From 94f7f2fe6b6b7fdcbe4d60a157d87e607ef70dc0 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Tue, 19 Mar 2024 18:50:33 -0400
Subject: [PATCH v4 14/19] Vacuum second pass emits XLOG_HEAP2_PRUNE record

Remove the XLOG_HEAP2_VACUUM record and update vacuum's second pass to
emit an XLOG_HEAP2_PRUNE record. This temporarily wastes some space, but a
future commit will streamline xl_heap_prune and ensure that no unused
members are included in the WAL record.
---
 src/backend/access/heap/heapam.c         | 94 ++++--------------------
 src/backend/access/heap/pruneheap.c      | 67 ++++++++++-------
 src/backend/access/heap/vacuumlazy.c     | 12 ++-
 src/backend/access/rmgrdesc/heapdesc.c   | 20 -----
 src/backend/replication/logical/decode.c |  1 -
 src/include/access/heapam.h              |  2 +-
 src/include/access/heapam_xlog.h         | 33 +++++----
 7 files changed, 85 insertions(+), 144 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 532868039d5..16bab55ba02 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8717,23 +8717,34 @@ heap_xlog_prune(XLogReaderState *record)
 	BlockNumber blkno;
 	XLogRedoAction action;
 	bool		get_cleanup_lock;
+	bool		lp_truncate_only;
 
 	XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
 
+	lp_truncate_only = xlrec->flags & XLHP_LP_TRUNCATE_ONLY;
+
 	/*
 	 * If there are dead, redirected, or unused items set unused by
 	 * heap_page_prune_and_freeze(), heap_page_prune_execute() will call
 	 * PageRepairFragementation() which expects a full cleanup lock.
 	 */
 	get_cleanup_lock = xlrec->nredirected > 0 ||
-		xlrec->ndead > 0 || xlrec->nunused > 0;
+		xlrec->ndead > 0 ||
+		(xlrec->nunused > 0 && !lp_truncate_only);
+
+	if (lp_truncate_only)
+	{
+		Assert(xlrec->nredirected == 0);
+		Assert(xlrec->ndead == 0);
+		Assert(xlrec->nunused > 0);
+	}
 
 	/*
 	 * We are either about to remove tuples or freeze them. In Hot Standby
 	 * mode, ensure that there's no queries running for which any removed
 	 * tuples are still visible or which consider the frozen xids as running.
 	 */
-	if (InHotStandby)
+	if (xlrec->flags & XLHP_HAS_CONFLICT_HORIZON && InHotStandby)
 		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
 											xlrec->isCatalogRel,
 											rlocator);
@@ -8772,7 +8783,7 @@ heap_xlog_prune(XLogReaderState *record)
 
 		/* Update all line pointers per the record, and repair fragmentation */
 		if (nredirected > 0 || ndead > 0 || nunused > 0)
-			heap_page_prune_execute(buffer,
+			heap_page_prune_execute(buffer, lp_truncate_only,
 									redirected, nredirected,
 									nowdead, ndead,
 									nowunused, nunused);
@@ -8819,7 +8830,7 @@ heap_xlog_prune(XLogReaderState *record)
 		UnlockReleaseBuffer(buffer);
 
 		/*
-		 * After pruning records from a page, it's useful to update the FSM
+		 * After modifying records on a page, it's useful to update the FSM
 		 * about it, as it may cause the page become target for insertions
 		 * later even if vacuum decides not to visit it (which is possible if
 		 * gets marked all-visible.)
@@ -8831,78 +8842,6 @@ heap_xlog_prune(XLogReaderState *record)
 	}
 }
 
-/*
- * Handles XLOG_HEAP2_VACUUM record type.
- *
- * Acquires an ordinary exclusive lock only.
- */
-static void
-heap_xlog_vacuum(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_vacuum *xlrec = (xl_heap_vacuum *) XLogRecGetData(record);
-	Buffer		buffer;
-	BlockNumber blkno;
-	XLogRedoAction action;
-
-	/*
-	 * If we have a full-page image, restore it	(without using a cleanup lock)
-	 * and we're done.
-	 */
-	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, false,
-										   &buffer);
-	if (action == BLK_NEEDS_REDO)
-	{
-		Page		page = (Page) BufferGetPage(buffer);
-		OffsetNumber *nowunused;
-		Size		datalen;
-		OffsetNumber *offnum;
-
-		nowunused = (OffsetNumber *) XLogRecGetBlockData(record, 0, &datalen);
-
-		/* Shouldn't be a record unless there's something to do */
-		Assert(xlrec->nunused > 0);
-
-		/* Update all now-unused line pointers */
-		offnum = nowunused;
-		for (int i = 0; i < xlrec->nunused; i++)
-		{
-			OffsetNumber off = *offnum++;
-			ItemId		lp = PageGetItemId(page, off);
-
-			Assert(ItemIdIsDead(lp) && !ItemIdHasStorage(lp));
-			ItemIdSetUnused(lp);
-		}
-
-		/* Attempt to truncate line pointer array now */
-		PageTruncateLinePointerArray(page);
-
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(buffer);
-	}
-
-	if (BufferIsValid(buffer))
-	{
-		Size		freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
-		RelFileLocator rlocator;
-
-		XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
-
-		UnlockReleaseBuffer(buffer);
-
-		/*
-		 * After vacuuming LP_DEAD items from a page, it's useful to update
-		 * the FSM about it, as it may cause the page become target for
-		 * insertions later even if vacuum decides not to visit it (which is
-		 * possible if gets marked all-visible.)
-		 *
-		 * Do this regardless of a full-page image being applied, since the
-		 * FSM data is not in the page anyway.
-		 */
-		XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
-	}
-}
-
 /*
  * Replay XLOG_HEAP2_VISIBLE record.
  *
@@ -9943,9 +9882,6 @@ heap2_redo(XLogReaderState *record)
 		case XLOG_HEAP2_PRUNE:
 			heap_xlog_prune(record);
 			break;
-		case XLOG_HEAP2_VACUUM:
-			heap_xlog_vacuum(record);
-			break;
 		case XLOG_HEAP2_VISIBLE:
 			heap_xlog_visible(record);
 			break;
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 19b50931b90..135fe2dba3e 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -601,7 +601,7 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 		 */
 		if (do_prune)
 		{
-			heap_page_prune_execute(buffer,
+			heap_page_prune_execute(buffer, false,
 									prstate.redirected, prstate.nredirected,
 									prstate.nowdead, prstate.ndead,
 									prstate.nowunused, prstate.nunused);
@@ -668,12 +668,16 @@ log_heap_prune_and_freeze(Relation relation, Buffer buffer,
 	OffsetNumber offsets[MaxHeapTuplesPerPage];
 	bool		do_freeze = presult->nfrozen > 0;
 
+	xlrec.flags = 0;
+
 	xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(relation);
 	xlrec.nredirected = prstate->nredirected;
 	xlrec.ndead = prstate->ndead;
 	xlrec.nunused = prstate->nunused;
 	xlrec.nplans = 0;
 
+	xlrec.flags |= XLHP_HAS_CONFLICT_HORIZON;
+
 	/*
 	 * The snapshotConflictHorizon for the whole record should be the most
 	 * conservative of all the horizons calculated for any of the possible
@@ -1149,7 +1153,7 @@ heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum)
  * cleanup lock on the buffer.
  */
 void
-heap_page_prune_execute(Buffer buffer,
+heap_page_prune_execute(Buffer buffer, bool lp_truncate_only,
 						OffsetNumber *redirected, int nredirected,
 						OffsetNumber *nowdead, int ndead,
 						OffsetNumber *nowunused, int nunused)
@@ -1171,6 +1175,7 @@ heap_page_prune_execute(Buffer buffer,
 		ItemId		tolp PG_USED_FOR_ASSERTS_ONLY;
 
 #ifdef USE_ASSERT_CHECKING
+		Assert(!lp_truncate_only);
 
 		/*
 		 * Any existing item that we set as an LP_REDIRECT (any 'from' item)
@@ -1226,6 +1231,7 @@ heap_page_prune_execute(Buffer buffer,
 		ItemId		lp = PageGetItemId(page, off);
 
 #ifdef USE_ASSERT_CHECKING
+		Assert(!lp_truncate_only);
 
 		/*
 		 * An LP_DEAD line pointer must be left behind when the original item
@@ -1259,23 +1265,29 @@ heap_page_prune_execute(Buffer buffer,
 
 #ifdef USE_ASSERT_CHECKING
 
-		/*
-		 * When heap_page_prune_and_freeze() was called, mark_unused_now may
-		 * have been passed as true, which allows would-be LP_DEAD items to be
-		 * made LP_UNUSED instead. This is only possible if the relation has
-		 * no indexes. If there are any dead items, then mark_unused_now was
-		 * not true and every item being marked LP_UNUSED must refer to a
-		 * heap-only tuple.
-		 */
-		if (ndead > 0)
+		if (lp_truncate_only)
 		{
-			Assert(ItemIdHasStorage(lp) && ItemIdIsNormal(lp));
-			htup = (HeapTupleHeader) PageGetItem(page, lp);
-			Assert(HeapTupleHeaderIsHeapOnly(htup));
+			/* Setting LP_DEAD to LP_UNUSED in vacuum's second pass */
+			Assert(ItemIdIsDead(lp) && !ItemIdHasStorage(lp));
 		}
 		else
 		{
-			Assert(ItemIdIsUsed(lp));
+			/*
+			 * When heap_page_prune_and_freeze() was called, mark_unused_now
+			 * may have been passed as true, which allows would-be LP_DEAD
+			 * items to be made LP_UNUSED instead. This is only possible if
+			 * the relation has no indexes. If there are any dead items, then
+			 * mark_unused_now was not true and every item being marked
+			 * LP_UNUSED must refer to a heap-only tuple.
+			 */
+			if (ndead > 0)
+			{
+				Assert(ItemIdHasStorage(lp) && ItemIdIsNormal(lp));
+				htup = (HeapTupleHeader) PageGetItem(page, lp);
+				Assert(HeapTupleHeaderIsHeapOnly(htup));
+			}
+			else
+				Assert(ItemIdIsUsed(lp));
 		}
 
 #endif
@@ -1283,17 +1295,22 @@ heap_page_prune_execute(Buffer buffer,
 		ItemIdSetUnused(lp);
 	}
 
-	/*
-	 * Finally, repair any fragmentation, and update the page's hint bit about
-	 * whether it has free pointers.
-	 */
-	PageRepairFragmentation(page);
+	if (lp_truncate_only)
+		PageTruncateLinePointerArray(page);
+	else
+	{
+		/*
+		 * Finally, repair any fragmentation, and update the page's hint bit
+		 * about whether it has free pointers.
+		 */
+		PageRepairFragmentation(page);
 
-	/*
-	 * Now that the page has been modified, assert that redirect items still
-	 * point to valid targets.
-	 */
-	page_verify_redirects(page);
+		/*
+		 * Now that the page has been modified, assert that redirect items
+		 * still point to valid targets.
+		 */
+		page_verify_redirects(page);
+	}
 }
 
 
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index c4553a4159c..9dfb56475cf 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -2394,18 +2394,24 @@ lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer,
 	/* XLOG stuff */
 	if (RelationNeedsWAL(vacrel->rel))
 	{
-		xl_heap_vacuum xlrec;
+		xl_heap_prune xlrec;
 		XLogRecPtr	recptr;
 
+		xlrec.flags = XLHP_LP_TRUNCATE_ONLY;
+		xlrec.snapshotConflictHorizon = InvalidTransactionId;
+		xlrec.nplans = 0;
+		xlrec.nredirected = 0;
+		xlrec.ndead = 0;
 		xlrec.nunused = nunused;
+		xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(vacrel->rel);
 
 		XLogBeginInsert();
-		XLogRegisterData((char *) &xlrec, SizeOfHeapVacuum);
+		XLogRegisterData((char *) &xlrec, SizeOfHeapPrune);
 
 		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
 		XLogRegisterBufData(0, (char *) unused, nunused * sizeof(OffsetNumber));
 
-		recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_VACUUM);
+		recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_PRUNE);
 
 		PageSetLSN(page, recptr);
 	}
diff --git a/src/backend/access/rmgrdesc/heapdesc.c b/src/backend/access/rmgrdesc/heapdesc.c
index 9f0a0341d40..ea03f902fc4 100644
--- a/src/backend/access/rmgrdesc/heapdesc.c
+++ b/src/backend/access/rmgrdesc/heapdesc.c
@@ -242,23 +242,6 @@ heap2_desc(StringInfo buf, XLogReaderState *record)
 			}
 		}
 	}
-	else if (info == XLOG_HEAP2_VACUUM)
-	{
-		xl_heap_vacuum *xlrec = (xl_heap_vacuum *) rec;
-
-		appendStringInfo(buf, "nunused: %u", xlrec->nunused);
-
-		if (XLogRecHasBlockData(record, 0))
-		{
-			OffsetNumber *nowunused;
-
-			nowunused = (OffsetNumber *) XLogRecGetBlockData(record, 0, NULL);
-
-			appendStringInfoString(buf, ", unused:");
-			array_desc(buf, nowunused, sizeof(OffsetNumber), xlrec->nunused,
-					   &offset_elem_desc, NULL);
-		}
-	}
 	else if (info == XLOG_HEAP2_VISIBLE)
 	{
 		xl_heap_visible *xlrec = (xl_heap_visible *) rec;
@@ -360,9 +343,6 @@ heap2_identify(uint8 info)
 		case XLOG_HEAP2_PRUNE:
 			id = "PRUNE";
 			break;
-		case XLOG_HEAP2_VACUUM:
-			id = "VACUUM";
-			break;
 		case XLOG_HEAP2_VISIBLE:
 			id = "VISIBLE";
 			break;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index f77051572fd..38d1bdd825e 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -446,7 +446,6 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 * interested in.
 			 */
 		case XLOG_HEAP2_PRUNE:
-		case XLOG_HEAP2_VACUUM:
 		case XLOG_HEAP2_VISIBLE:
 		case XLOG_HEAP2_LOCK_UPDATED:
 			break;
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 321a46185e1..d5cb8f99cac 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -349,7 +349,7 @@ extern void heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 									   HeapPageFreeze *pagefrz,
 									   PruneFreezeResult *presult,
 									   OffsetNumber *off_loc);
-extern void heap_page_prune_execute(Buffer buffer,
+extern void heap_page_prune_execute(Buffer buffer, bool lp_truncate_only,
 									OffsetNumber *redirected, int nredirected,
 									OffsetNumber *nowdead, int ndead,
 									OffsetNumber *nowunused, int nunused);
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index fe4a8ff0620..2393540cf68 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -52,11 +52,10 @@
  */
 #define XLOG_HEAP2_REWRITE		0x00
 #define XLOG_HEAP2_PRUNE		0x10
-#define XLOG_HEAP2_VACUUM		0x20
-#define XLOG_HEAP2_VISIBLE		0x30
-#define XLOG_HEAP2_MULTI_INSERT 0x40
-#define XLOG_HEAP2_LOCK_UPDATED 0x50
-#define XLOG_HEAP2_NEW_CID		0x60
+#define XLOG_HEAP2_VISIBLE		0x20
+#define XLOG_HEAP2_MULTI_INSERT 0x30
+#define XLOG_HEAP2_LOCK_UPDATED 0x40
+#define XLOG_HEAP2_NEW_CID		0x50
 
 /*
  * xl_heap_insert/xl_heap_multi_insert flag values, 8 bits are available.
@@ -266,6 +265,7 @@ typedef struct xl_heap_freeze_plan
  */
 typedef struct xl_heap_prune
 {
+	uint8		flags;
 	TransactionId snapshotConflictHorizon;
 	uint16		nplans;
 	uint16		nredirected;
@@ -288,19 +288,22 @@ typedef struct xl_heap_prune
 
 #define SizeOfHeapPrune (offsetof(xl_heap_prune, isCatalogRel) + sizeof(bool))
 
+/* Flags for xl_heap_prune */
+
 /*
- * The vacuum page record is similar to the prune record, but can only mark
- * already LP_DEAD items LP_UNUSED (during VACUUM's second heap pass)
- *
- * Acquires an ordinary exclusive lock only.
+ * During vacuum's second pass, which sets LP_DEAD items LP_UNUSED, we will
+ * only truncate the line pointer array, not call PageRepairFragmentation. We
+ * need this flag to differentiate what kind of lock (exclusive or cleanup) to
+ * take on the buffer and whether to call PageTruncateLinePointerArray() or
+ * PageRepairFragmentation().
  */
-typedef struct xl_heap_vacuum
-{
-	uint16		nunused;
-	/* OFFSET NUMBERS are in the block reference 0 */
-} xl_heap_vacuum;
+#define		XLHP_LP_TRUNCATE_ONLY       (1 << 1)
 
-#define SizeOfHeapVacuum (offsetof(xl_heap_vacuum, nunused) + sizeof(uint16))
+/*
+ * Vacuum's first pass and on-access pruning may need to include a snapshot
+ * conflict horizon.
+ */
+#define		XLHP_HAS_CONFLICT_HORIZON   (1 << 2)
 
 /* flags for infobits_set */
 #define XLHL_XMAX_IS_MULTI		0x01
-- 
2.40.1

