From 4476960e63b19f6b191dc5a832c8bfc69703864c Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunklau@aiven.io>
Date: Thu, 7 Mar 2024 09:40:17 +0100
Subject: [PATCH v3] Detect FSM corruption and repair it.

After WAL replay, the FSM can be left corrupted if it was WAL-logged
while the newly extended pages it points to were not.

Detect this case by comparing the block found by the FSM to the number
of blocks in the relation. For that check, we accept reading a stale
value from the cache even when not in recovery: in the worst case, we
will declare an entry invalid while an extension is in progress.
Truncation, on the other hand, updates the FSM, so we don't have to
worry about it.
---
 src/backend/storage/freespace/README      |  6 ++
 src/backend/storage/freespace/freespace.c | 78 ++++++++++++++++++++---
 2 files changed, 76 insertions(+), 8 deletions(-)

diff --git a/src/backend/storage/freespace/README b/src/backend/storage/freespace/README
index e7ff23b76f7..10b983c9319 100644
--- a/src/backend/storage/freespace/README
+++ b/src/backend/storage/freespace/README
@@ -188,6 +188,12 @@ goes through fsm_set_avail(), so that the upper nodes on those pages are
 immediately updated.  Periodically, VACUUM calls FreeSpaceMapVacuum[Range]
 to propagate the new free-space info into the upper pages of the FSM tree.
 
+Since the FSM can be WAL-logged while the empty pages it points to are not,
+WAL replay can leave the FSM pointing to a block that does not exist on
+disk. We detect this case by comparing the block number against the actual
+relation size, and mark the block as full in the FSM when it lies past the
+end of the relation.
+
 TODO
 ----
 
diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index bcdb1821938..07864492f64 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -112,6 +112,7 @@ static BlockNumber fsm_search(Relation rel, uint8 min_cat);
 static uint8 fsm_vacuum_page(Relation rel, FSMAddress addr,
 							 BlockNumber start, BlockNumber end,
 							 bool *eof_p);
+static bool fsm_does_block_exist(Relation rel, BlockNumber blknumber);
 
 
 /******** Public API ********/
@@ -128,6 +129,9 @@ static uint8 fsm_vacuum_page(Relation rel, FSMAddress addr,
  * amount of free space available on that page and then try again (see
  * RecordAndGetPageWithFreeSpace).  If InvalidBlockNumber is returned,
  * extend the relation.
+ *
+ * This can trigger FSM updates if any FSM entry is found to point to
+ * a non-existing block past the end of the relation.
  */
 BlockNumber
 GetPageWithFreeSpace(Relation rel, Size spaceNeeded)
@@ -145,6 +149,9 @@ GetPageWithFreeSpace(Relation rel, Size spaceNeeded)
  * also some effort to return a page close to the old page; if there's a
  * page with enough free space on the same FSM page where the old one page
  * is located, it is preferred.
+ *
+ * Additionally, we check whether the block number is past the known number
+ * of blocks in the relation, and fall back to a regular search if it is.
  */
 BlockNumber
 RecordAndGetPageWithFreeSpace(Relation rel, BlockNumber oldPage,
@@ -166,9 +173,16 @@ RecordAndGetPageWithFreeSpace(Relation rel, BlockNumber oldPage,
 	 * Otherwise, search as usual.
 	 */
 	if (search_slot != -1)
-		return fsm_get_heap_blk(addr, search_slot);
-	else
-		return fsm_search(rel, search_cat);
+	{
+		BlockNumber blknum = fsm_get_heap_blk(addr, search_slot);
+		/*
+		 * Check that blknum is actually within the relation.  If it is not,
+		 * don't try to update the FSM here; just fall back to a regular search.
+		 */
+		if (fsm_does_block_exist(rel, blknum))
+				return blknum;
+	}
+	return fsm_search(rel, search_cat);
 }
 
 /*
@@ -673,9 +687,13 @@ fsm_search(Relation rel, uint8 min_cat)
 			slot = fsm_search_avail(buf, min_cat,
 									(addr.level == FSM_BOTTOM_LEVEL),
 									false);
-			if (slot == -1)
+			if (slot == -1) {
 				max_avail = fsm_get_max_avail(BufferGetPage(buf));
-			UnlockReleaseBuffer(buf);
+				UnlockReleaseBuffer(buf);
+			} else {
+				/* Keep the pin for possible update below */
+				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+			}
 		}
 		else
 			slot = -1;
@@ -686,9 +704,29 @@ fsm_search(Relation rel, uint8 min_cat)
 			 * Descend the tree, or return the found block if we're at the
 			 * bottom.
 			 */
-			if (addr.level == FSM_BOTTOM_LEVEL)
-				return fsm_get_heap_blk(addr, slot);
-
-			addr = fsm_get_child(addr, slot);
+			if (addr.level == FSM_BOTTOM_LEVEL) {
+				BlockNumber blkno = fsm_get_heap_blk(addr, slot);
+				Page page;
+				if (fsm_does_block_exist(rel, blkno))
+				{
+					ReleaseBuffer(buf);
+					return blkno;
+				}
+				/*
+				 * The block is past the end of the relation: mark its slot
+				 * as full, then restart from the root (without descending).
+				 */
+				page = BufferGetPage(buf);
+				LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+				fsm_set_avail(page, slot, 0);
+				MarkBufferDirtyHint(buf, false);
+				UnlockReleaseBuffer(buf);
+				if (restarts++ > 10000)
+					return InvalidBlockNumber;
+				addr = FSM_ROOT_ADDRESS;
+			} else {
+				ReleaseBuffer(buf);
+				addr = fsm_get_child(addr, slot);
+			}
 		}
 		else if (addr.level == FSM_ROOT_LEVEL)
@@ -856,3 +894,27 @@ fsm_vacuum_page(Relation rel, FSMAddress addr,
 
 	return max_avail;
 }
+
+
+/*
+ * Return true if blknumber is below the relation's number of blocks,
+ * false if it is past the end of the relation. The latter can happen
+ * after WAL replay, if the FSM was WAL-logged but the newly extended
+ * pages it refers to were not.
+ * This function only detects that case; any repair is up to the caller.
+ */
+static bool
+fsm_does_block_exist(Relation rel, BlockNumber blknumber)
+{
+	SMgrRelation smgr = RelationGetSmgr(rel);
+	/*
+	 * A block below a valid cached relation size is taken to exist,
+	 * without asking for the relation size again.
+	 *
+	 * Otherwise (no valid cached size, or the block is at or beyond it),
+	 * request a fresh value and compare against that.
+	 */
+	return((BlockNumberIsValid(smgr->smgr_cached_nblocks[MAIN_FORKNUM]) &&
+			blknumber < smgr->smgr_cached_nblocks[MAIN_FORKNUM]) ||
+			blknumber < RelationGetNumberOfBlocks(rel));
+}
-- 
2.44.0

