From 9400c5c2f0ac5543b063782cf47b5ba6c7cc9dda Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunklau@aiven.io>
Date: Thu, 7 Mar 2024 09:40:17 +0100
Subject: [PATCH v2] Detect FSM corruption and repair it.

After WAL-replay, we can end up with a corrupted FSM
if it has been WAL-logged while the newly extended pages it
points to were not.

Detect this case by comparing the block found by the FSM to the number
of blocks in the relation. For that check, we accept reading a stale
value from the cache even when not in recovery: in the worst case, we
will declare an entry invalid while an extension is going on.
Truncation, on the other hand, updates the FSM, so we don't have to
worry about it.
---
 src/backend/storage/freespace/README      |  6 ++
 src/backend/storage/freespace/freespace.c | 83 ++++++++++++++++++++---
 2 files changed, 81 insertions(+), 8 deletions(-)

diff --git a/src/backend/storage/freespace/README b/src/backend/storage/freespace/README
index e7ff23b76f7..10b983c9319 100644
--- a/src/backend/storage/freespace/README
+++ b/src/backend/storage/freespace/README
@@ -188,6 +188,12 @@ goes through fsm_set_avail(), so that the upper nodes on those pages are
 immediately updated.  Periodically, VACUUM calls FreeSpaceMapVacuum[Range]
 to propagate the new free-space info into the upper pages of the FSM tree.
 
+Since the FSM can be WAL-logged while the newly extended pages it points to
+are not, the FSM can end up pointing to a block that does not exist on disk
+after WAL replay.  We detect this case by comparing the block number found
+in the FSM against the actual relation size, and mark the block as full in
+the FSM when it lies past the end of the relation.
+
 TODO
 ----
 
diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index fb9440ff72f..ace477d1fde 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -112,6 +112,7 @@ static BlockNumber fsm_search(Relation rel, uint8 min_cat);
 static uint8 fsm_vacuum_page(Relation rel, FSMAddress addr,
 							 BlockNumber start, BlockNumber end,
 							 bool *eof_p);
+static bool fsm_does_block_exist(Relation rel, BlockNumber blknumber);
 
 
 /******** Public API ********/
@@ -128,6 +129,9 @@ static uint8 fsm_vacuum_page(Relation rel, FSMAddress addr,
  * amount of free space available on that page and then try again (see
  * RecordAndGetPageWithFreeSpace).  If InvalidBlockNumber is returned,
  * extend the relation.
+ *
+ * This can trigger FSM updates if any FSM entry is found to point to
+ * a non-existing block past the end of the relation.
  */
 BlockNumber
 GetPageWithFreeSpace(Relation rel, Size spaceNeeded)
@@ -145,6 +149,9 @@ GetPageWithFreeSpace(Relation rel, Size spaceNeeded)
  * also some effort to return a page close to the old page; if there's a
  * page with enough free space on the same FSM page where the old one page
  * is located, it is preferred.
+ *
+ * Additionally, we check if the block number is past the known number of
+ * blocks in the relation, and fall back to a regular search if it is.
  */
 BlockNumber
 RecordAndGetPageWithFreeSpace(Relation rel, BlockNumber oldPage,
@@ -166,9 +173,16 @@ RecordAndGetPageWithFreeSpace(Relation rel, BlockNumber oldPage,
 	 * Otherwise, search as usual.
 	 */
 	if (search_slot != -1)
-		return fsm_get_heap_blk(addr, search_slot);
-	else
-		return fsm_search(rel, search_cat);
+	{
+		BlockNumber blknum = fsm_get_heap_blk(addr, search_slot);
+		/*
+		 * Check that the blknum is actually in the relation.  If it is not,
+		 * don't try to update the FSM here; just fall back to a regular search.
+		 */
+		if (fsm_does_block_exist(rel, blknum))
+				return blknum;
+	}
+	return fsm_search(rel, search_cat);
 }
 
 /*
@@ -680,9 +694,13 @@ fsm_search(Relation rel, uint8 min_cat)
 			slot = fsm_search_avail(buf, min_cat,
 									(addr.level == FSM_BOTTOM_LEVEL),
 									false);
-			if (slot == -1)
+			if (slot == -1) {
 				max_avail = fsm_get_max_avail(BufferGetPage(buf));
-			UnlockReleaseBuffer(buf);
+				UnlockReleaseBuffer(buf);
+			} else {
+				/* Keep the pin for possible update below */
+				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+			}
 		}
 		else
 			slot = -1;
@@ -693,9 +711,29 @@ fsm_search(Relation rel, uint8 min_cat)
 			 * Descend the tree, or return the found block if we're at the
 			 * bottom.
 			 */
-			if (addr.level == FSM_BOTTOM_LEVEL)
-				return fsm_get_heap_blk(addr, slot);
-
-			addr = fsm_get_child(addr, slot);
+			if (addr.level == FSM_BOTTOM_LEVEL) {
+				BlockNumber blkno = fsm_get_heap_blk(addr, slot);
+				if (fsm_does_block_exist(rel, blkno))
+				{
+					ReleaseBuffer(buf);
+					return blkno;
+				}
+				/*
+				 * The block is past the end of the relation: mark its slot
+				 * as full and restart the search from the root.  Give up
+				 * after too many restarts rather than retrying forever.
+				 */
+				LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+				fsm_set_avail(BufferGetPage(buf), slot, 0);
+				MarkBufferDirtyHint(buf, false);
+				UnlockReleaseBuffer(buf);
+				if (restarts++ > 10000)
+					return InvalidBlockNumber;
+				addr = FSM_ROOT_ADDRESS;
+			} else {
+				/* Descend only here; a restart above already reset addr. */
+				ReleaseBuffer(buf);
+				addr = fsm_get_child(addr, slot);
+			}
 		}
 		else if (addr.level == FSM_ROOT_LEVEL)
@@ -863,3 +901,32 @@ fsm_vacuum_page(Relation rel, FSMAddress addr,
 
 	return max_avail;
 }
+
+
+/*
+ * fsm_does_block_exist -- is blknumber within the main fork of rel?
+ *
+ * Returns true if blknumber is below the relation's number of blocks, and
+ * false if it lies past the end of the relation.  The latter can happen
+ * after WAL replay, if the FSM was WAL-logged but the newly extended pages
+ * it refers to were not.
+ *
+ * This only detects the problem; fixing the FSM entry is up to the caller.
+ */
+static bool
+fsm_does_block_exist(Relation rel, BlockNumber blknumber)
+{
+	SMgrRelation reln = RelationGetSmgr(rel);
+	BlockNumber cached_nblocks = reln->smgr_cached_nblocks[MAIN_FORKNUM];
+
+	/*
+	 * Use the cached relation size when there is one, and ask for the
+	 * current size only when nothing is cached.  A stale cached value is
+	 * tolerated: if the relation was truncated since, the FSM was updated,
+	 * and if it was extended we merely fail to recognize a new block.
+	 */
+	if (cached_nblocks == InvalidBlockNumber)
+		return blknumber < RelationGetNumberOfBlocks(rel);
+
+	return blknumber < cached_nblocks;
+}
-- 
2.44.0

