From 9ea738827c142f0a8400ab962a285c1961822c7b Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Wed, 26 Nov 2003 20:50:11 +0000
Subject: [PATCH] Second try at fixing no-room-to-move-down PANIC in
 compact_fsm_storage. Ward's report that it can still happen in RC2 forces me
 to realize that this is not a can't-happen condition after all, and that the
 compaction code had better cope rather than panicking.

---
 src/backend/storage/freespace/freespace.c | 64 +++++++++++++++++------
 1 file changed, 47 insertions(+), 17 deletions(-)
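
A note for reviewers: the crux of the fix is that a single
push_fsm_rels_after() pass no longer guarantees enough room, since
several rels can land on the losing end of roundoff choices at once.
Instead of PANICing, the code now clamps the current rel's allocation
to whatever space remains before the next physical rel.  Below is a
minimal standalone sketch of that clamp logic; the names and the
CHUNKPAGES value are simplified stand-ins, not the real FSM fields.

	#include <stdio.h>

	#define CHUNKPAGES	16		/* stand-in for the real constant */

	/*
	 * Clamp a rel's chunk allocation to the space available before
	 * limitChunkIndex (the next rel's firstChunk, or totalChunks
	 * when the rel is physically last).
	 */
	static int
	clamp_alloc(int newChunkIndex, int curChunks, int limitChunkIndex)
	{
		int		newAlloc;

		if (newChunkIndex + curChunks <= limitChunkIndex)
			return curChunks;	/* everything fits; keep it all */
		newAlloc = limitChunkIndex - newChunkIndex;

		/*
		 * newAlloc < 0 means firstChunk lands in territory still
		 * assigned to a later rel; that is tolerable only as long
		 * as no data at all is copied for this rel.
		 */
		if (newAlloc < 0)
			newAlloc = 0;
		return newAlloc;
	}

	int
	main(void)
	{
		/* a rel with 4 chunks at index 10, but the next rel starts at 12 */
		printf("kept %d chunks\n", clamp_alloc(10, 4, 12));	/* kept 2 */
		return 0;
	}

In the patch itself the clamped newAlloc then feeds back into
storedPages and curChunks, so the subsequent memmove() copies only the
data that actually fits (possibly none).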

diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index b5e1a53b35..62a0044f09 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/storage/freespace/freespace.c,v 1.24 2003/10/29 17:36:57 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/storage/freespace/freespace.c,v 1.25 2003/11/26 20:50:11 tgl Exp $
  *
  *
  * NOTES:
@@ -1394,6 +1394,7 @@ static void
 compact_fsm_storage(void)
 {
 	int			nextChunkIndex = 0;
+	bool		did_push = false;
 	FSMRelation *fsmrel;
 
 	for (fsmrel = FreeSpaceMap->firstRel;
@@ -1419,16 +1420,15 @@ compact_fsm_storage(void)
 			newAllocPages = newAlloc * INDEXCHUNKPAGES;
 		else
 			newAllocPages = newAlloc * CHUNKPAGES;
-		newChunkIndex = nextChunkIndex;
-		nextChunkIndex += newAlloc;
 
 		/*
 		 * Determine current size, current and new locations
 		 */
 		curChunks = fsm_current_chunks(fsmrel);
 		oldChunkIndex = fsmrel->firstChunk;
-		newLocation = FreeSpaceMap->arena + newChunkIndex * CHUNKBYTES;
 		oldLocation = FreeSpaceMap->arena + oldChunkIndex * CHUNKBYTES;
+		newChunkIndex = nextChunkIndex;
+		newLocation = FreeSpaceMap->arena + newChunkIndex * CHUNKBYTES;
 
 		/*
 		 * It's possible that we have to move data down, not up, if the
@@ -1440,10 +1440,16 @@ compact_fsm_storage(void)
 		 * more than once, so pack everything against the end of the arena
 		 * if so.
 		 *
-		 * In corner cases where roundoff has affected our allocation, it's
-		 * possible that we have to move down and compress our data too.
-		 * Since this case is extremely infrequent, we do not try to be smart
-		 * about it --- we just drop pages from the end of the rel's data.
+		 * In corner cases where we are on the short end of a roundoff choice
+		 * that we were formerly on the long end of, it's possible that we
+		 * have to move down and compress our data too.  In fact, even after
+		 * pushing down the following rels, there might not be as much space
+		 * as we computed for this rel above --- that would imply that some
+		 * following rel(s) are also on the losing end of roundoff choices.
+		 * We could handle this fairly by doing the per-rel compactions
+		 * out-of-order, but that seems like way too much complexity to deal
+		 * with a very infrequent corner case.  Instead, we simply drop pages
+		 * from the end of the current rel's data until it fits.
 		 */
 		if (newChunkIndex > oldChunkIndex)
 		{
@@ -1455,21 +1461,44 @@ compact_fsm_storage(void)
 				fsmrel->storedPages = newAllocPages;
 				curChunks = fsm_current_chunks(fsmrel);
 			}
+			/* is there enough space? */
 			if (fsmrel->nextPhysical != NULL)
 				limitChunkIndex = fsmrel->nextPhysical->firstChunk;
 			else
 				limitChunkIndex = FreeSpaceMap->totalChunks;
 			if (newChunkIndex + curChunks > limitChunkIndex)
 			{
-				/* need to push down additional rels */
-				push_fsm_rels_after(fsmrel);
-				/* recheck for safety */
+				/* not enough space, push down following rels */
+				if (!did_push)
+				{
+					push_fsm_rels_after(fsmrel);
+					did_push = true;
+				}
+				/* now is there enough space? */
 				if (fsmrel->nextPhysical != NULL)
 					limitChunkIndex = fsmrel->nextPhysical->firstChunk;
 				else
 					limitChunkIndex = FreeSpaceMap->totalChunks;
 				if (newChunkIndex + curChunks > limitChunkIndex)
-					elog(PANIC, "insufficient room in FSM");
+				{
+					/* uh-oh, forcibly cut the allocation to fit */
+					newAlloc = limitChunkIndex - newChunkIndex;
+					/*
+					 * If newAlloc < 0 at this point, we are moving the rel's
+					 * firstChunk into territory currently assigned to a later
+					 * rel.  This is okay so long as we do not copy any data.
+					 * The rels will be back in nondecreasing firstChunk order
+					 * at completion of the compaction pass.
+					 */
+					if (newAlloc < 0)
+						newAlloc = 0;
+					if (fsmrel->isIndex)
+						newAllocPages = newAlloc * INDEXCHUNKPAGES;
+					else
+						newAllocPages = newAlloc * CHUNKPAGES;
+					fsmrel->storedPages = newAllocPages;
+					curChunks = fsm_current_chunks(fsmrel);
+				}
 			}
 			memmove(newLocation, oldLocation, curChunks * CHUNKBYTES);
 		}
@@ -1504,6 +1533,7 @@ compact_fsm_storage(void)
 			memmove(newLocation, oldLocation, curChunks * CHUNKBYTES);
 		}
 		fsmrel->firstChunk = newChunkIndex;
+		nextChunkIndex += newAlloc;
 	}
 	Assert(nextChunkIndex <= FreeSpaceMap->totalChunks);
 	FreeSpaceMap->usedChunks = nextChunkIndex;
@@ -1544,8 +1574,8 @@ push_fsm_rels_after(FSMRelation *afterRel)
 		oldChunkIndex = fsmrel->firstChunk;
 		if (newChunkIndex < oldChunkIndex)
 		{
-			/* trouble... */
-			elog(PANIC, "insufficient room in FSM");
+			/* we're pushing down, how can it move up? */
+			elog(PANIC, "inconsistent entry sizes in FSM");
 		}
 		else if (newChunkIndex > oldChunkIndex)
 		{
@@ -1758,14 +1788,14 @@ fsm_current_chunks(FSMRelation *fsmrel)
 {
 	int			chunkCount;
 
+	/* Make sure storedPages==0 produces right answer */
+	if (fsmrel->storedPages <= 0)
+		return 0;
 	/* Convert page count to chunk count */
 	if (fsmrel->isIndex)
 		chunkCount = (fsmrel->storedPages - 1) / INDEXCHUNKPAGES + 1;
 	else
 		chunkCount = (fsmrel->storedPages - 1) / CHUNKPAGES + 1;
-	/* Make sure storedPages==0 produces right answer */
-	if (chunkCount < 0)
-		chunkCount = 0;
 	return chunkCount;
 }
 
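
A closing note on the fsm_current_chunks() change: it is not just
cosmetic.  With truncate-toward-zero integer division (mandated by C99,
and near-universal in practice), storedPages == 0 made the old code
compute (0 - 1) / CHUNKPAGES + 1 == 1, so the trailing "chunkCount < 0"
guard never fired and an empty rel appeared to occupy one chunk.  Since
the compaction fix above can now legitimately cut storedPages to zero,
the test has to happen before the division.  A quick standalone check
(the CHUNKPAGES value is again an arbitrary stand-in):

	#include <assert.h>

	#define CHUNKPAGES	16		/* stand-in for the real constant */

	static int
	chunks_old(int storedPages)
	{
		int		chunkCount = (storedPages - 1) / CHUNKPAGES + 1;

		if (chunkCount < 0)		/* never true for storedPages == 0 */
			chunkCount = 0;
		return chunkCount;
	}

	static int
	chunks_new(int storedPages)
	{
		if (storedPages <= 0)
			return 0;
		return (storedPages - 1) / CHUNKPAGES + 1;
	}

	int
	main(void)
	{
		assert(chunks_old(0) == 1);		/* the old bug: one phantom chunk */
		assert(chunks_new(0) == 0);		/* fixed */
		assert(chunks_new(16) == 1 && chunks_new(17) == 2);
		return 0;
	}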