From b13f4d1d5fb4e8fcb3f97fe1f0043fdfaf319b4c Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Sun, 7 Jan 2024 17:55:31 -0500
Subject: [PATCH v4 13/19] Merge prune and freeze records

Eliminate the xl_heap_freeze_page struct and the XLOG_HEAP2_FREEZE_PAGE
record. When vacuum freezes tuples, the information needed to replay
those changes is now recorded in the xl_heap_prune record.
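
For reference, the redo routine now walks block reference 0's data in
the following order (excerpted from the heap_xlog_prune() changes
below):

    plans = (xl_heap_freeze_plan *) XLogRecGetBlockData(record, 0, &datalen);
    redirected = (OffsetNumber *) &plans[nplans];
    nowdead = redirected + (nredirected * 2);
    nowunused = nowdead + ndead;
    frz_offsets = nowunused + nunused;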

When both pruning and freezing are done, a single, combined WAL record
is emitted for both operations, reducing the total number of WAL
records emitted.

When the record contains only tuples to freeze, we can avoid taking a
full cleanup lock when replaying it.
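
Concretely, replay now requires a cleanup lock only when line pointers
will change (excerpted from the heap_xlog_prune() changes below):

    get_cleanup_lock = xlrec->nredirected > 0 ||
        xlrec->ndead > 0 || xlrec->nunused > 0;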

The XLOG_HEAP2_PRUNE record is now larger than it was previously and
larger than the XLOG_HEAP2_FREEZE_PAGE record was. A future commit will
streamline the record.
---
 src/backend/access/heap/heapam.c         | 146 ++++------
 src/backend/access/heap/pruneheap.c      | 326 ++++++++++++-----------
 src/backend/access/rmgrdesc/heapdesc.c   |  95 ++++---
 src/backend/replication/logical/decode.c |   1 -
 src/include/access/heapam_xlog.h         |  97 ++++---
 5 files changed, 318 insertions(+), 347 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index e47b56e7856..532868039d5 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8706,8 +8706,6 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 
 /*
  * Handles XLOG_HEAP2_PRUNE record type.
- *
- * Acquires a full cleanup lock.
  */
 static void
 heap_xlog_prune(XLogReaderState *record)
@@ -8718,12 +8716,22 @@ heap_xlog_prune(XLogReaderState *record)
 	RelFileLocator rlocator;
 	BlockNumber blkno;
 	XLogRedoAction action;
+	bool		get_cleanup_lock;
 
 	XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
 
 	/*
-	 * We're about to remove tuples. In Hot Standby mode, ensure that there's
-	 * no queries running for which the removed tuples are still visible.
+	 * If heap_page_prune_and_freeze() recorded any redirected, now-dead,
+	 * or now-unused items, heap_page_prune_execute() will call
+	 * PageRepairFragmentation(), which expects a full cleanup lock.
+	 */
+	get_cleanup_lock = xlrec->nredirected > 0 ||
+		xlrec->ndead > 0 || xlrec->nunused > 0;
+
+	/*
+	 * We are about to remove and/or freeze tuples. In Hot Standby mode,
+	 * ensure that there are no queries running for which any removed
+	 * tuples are still visible or which consider the frozen xids as running.
 	 */
 	if (InHotStandby)
 		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
@@ -8731,38 +8739,69 @@ heap_xlog_prune(XLogReaderState *record)
 											rlocator);
 
 	/*
-	 * If we have a full-page image, restore it (using a cleanup lock) and
-	 * we're done.
+	 * If we have a full-page image, restore it and we're done.
 	 */
-	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true,
-										   &buffer);
+	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL,
+										   get_cleanup_lock, &buffer);
+
 	if (action == BLK_NEEDS_REDO)
 	{
 		Page		page = (Page) BufferGetPage(buffer);
-		OffsetNumber *end;
 		OffsetNumber *redirected;
 		OffsetNumber *nowdead;
 		OffsetNumber *nowunused;
 		int			nredirected;
 		int			ndead;
 		int			nunused;
+		int			nplans;
 		Size		datalen;
+		xl_heap_freeze_plan *plans;
+		OffsetNumber *frz_offsets;
+		int			curoff = 0;
 
-		redirected = (OffsetNumber *) XLogRecGetBlockData(record, 0, &datalen);
-
+		nplans = xlrec->nplans;
 		nredirected = xlrec->nredirected;
 		ndead = xlrec->ndead;
-		end = (OffsetNumber *) ((char *) redirected + datalen);
+		nunused = xlrec->nunused;
+
+		plans = (xl_heap_freeze_plan *) XLogRecGetBlockData(record, 0, &datalen);
+		redirected = (OffsetNumber *) &plans[nplans];
 		nowdead = redirected + (nredirected * 2);
 		nowunused = nowdead + ndead;
-		nunused = (end - nowunused);
-		Assert(nunused >= 0);
+		frz_offsets = nowunused + nunused;
 
 		/* Update all line pointers per the record, and repair fragmentation */
-		heap_page_prune_execute(buffer,
-								redirected, nredirected,
-								nowdead, ndead,
-								nowunused, nunused);
+		if (nredirected > 0 || ndead > 0 || nunused > 0)
+			heap_page_prune_execute(buffer,
+									redirected, nredirected,
+									nowdead, ndead,
+									nowunused, nunused);
+
+		for (int p = 0; p < nplans; p++)
+		{
+			HeapTupleFreeze frz;
+
+			/*
+			 * Convert freeze plan representation from WAL record into
+			 * per-tuple format used by heap_execute_freeze_tuple
+			 */
+			frz.xmax = plans[p].xmax;
+			frz.t_infomask2 = plans[p].t_infomask2;
+			frz.t_infomask = plans[p].t_infomask;
+			frz.frzflags = plans[p].frzflags;
+			frz.offset = InvalidOffsetNumber;	/* unused, but be tidy */
+
+			for (int i = 0; i < plans[p].ntuples; i++)
+			{
+				OffsetNumber offset = frz_offsets[curoff++];
+				ItemId		lp;
+				HeapTupleHeader tuple;
+
+				lp = PageGetItemId(page, offset);
+				tuple = (HeapTupleHeader) PageGetItem(page, lp);
+				heap_execute_freeze_tuple(tuple, &frz);
+			}
+		}
 
 		/*
 		 * Note: we don't worry about updating the page's prunability hints.
@@ -9001,74 +9040,6 @@ heap_xlog_visible(XLogReaderState *record)
 		UnlockReleaseBuffer(vmbuffer);
 }
 
-/*
- * Replay XLOG_HEAP2_FREEZE_PAGE records
- */
-static void
-heap_xlog_freeze_page(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_freeze_page *xlrec = (xl_heap_freeze_page *) XLogRecGetData(record);
-	Buffer		buffer;
-
-	/*
-	 * In Hot Standby mode, ensure that there's no queries running which still
-	 * consider the frozen xids as running.
-	 */
-	if (InHotStandby)
-	{
-		RelFileLocator rlocator;
-
-		XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
-		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
-											xlrec->isCatalogRel,
-											rlocator);
-	}
-
-	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
-	{
-		Page		page = BufferGetPage(buffer);
-		xl_heap_freeze_plan *plans;
-		OffsetNumber *offsets;
-		int			curoff = 0;
-
-		plans = (xl_heap_freeze_plan *) XLogRecGetBlockData(record, 0, NULL);
-		offsets = (OffsetNumber *) ((char *) plans +
-									(xlrec->nplans *
-									 sizeof(xl_heap_freeze_plan)));
-		for (int p = 0; p < xlrec->nplans; p++)
-		{
-			HeapTupleFreeze frz;
-
-			/*
-			 * Convert freeze plan representation from WAL record into
-			 * per-tuple format used by heap_execute_freeze_tuple
-			 */
-			frz.xmax = plans[p].xmax;
-			frz.t_infomask2 = plans[p].t_infomask2;
-			frz.t_infomask = plans[p].t_infomask;
-			frz.frzflags = plans[p].frzflags;
-			frz.offset = InvalidOffsetNumber;	/* unused, but be tidy */
-
-			for (int i = 0; i < plans[p].ntuples; i++)
-			{
-				OffsetNumber offset = offsets[curoff++];
-				ItemId		lp;
-				HeapTupleHeader tuple;
-
-				lp = PageGetItemId(page, offset);
-				tuple = (HeapTupleHeader) PageGetItem(page, lp);
-				heap_execute_freeze_tuple(tuple, &frz);
-			}
-		}
-
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(buffer);
-	}
-	if (BufferIsValid(buffer))
-		UnlockReleaseBuffer(buffer);
-}
-
 /*
  * Given an "infobits" field from an XLog record, set the correct bits in the
  * given infomask and infomask2 for the tuple touched by the record.
@@ -9975,9 +9946,6 @@ heap2_redo(XLogReaderState *record)
 		case XLOG_HEAP2_VACUUM:
 			heap_xlog_vacuum(record);
 			break;
-		case XLOG_HEAP2_FREEZE_PAGE:
-			heap_xlog_freeze_page(record);
-			break;
 		case XLOG_HEAP2_VISIBLE:
 			heap_xlog_visible(record);
 			break;
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 7bd479cfd4e..19b50931b90 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -79,6 +79,9 @@ static void heap_prune_record_dead_or_unused(PruneState *prstate, OffsetNumber o
 static void heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum);
 static void page_verify_redirects(Page page);
 
+static void log_heap_prune_and_freeze(Relation relation, Buffer buffer,
+									  PruneState *prstate, PruneFreezeResult *presult);
+
 
 /*
  * Optionally prune and repair fragmentation in the specified page.
@@ -247,9 +250,9 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 	HeapTupleData tup;
 	bool		do_freeze;
 	bool		do_prune;
+	bool		do_hint;
 	bool		whole_page_freezable;
 	bool		hint_bit_fpi;
-	bool		prune_fpi = false;
 	int64		fpi_before = pgWalUsage.wal_fpi;
 
 	/*
@@ -445,10 +448,9 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 
 	/*
 	 * If checksums are enabled, heap_prune_satisfies_vacuum() may have caused
-	 * an FPI to be emitted. Then reset fpi_before for no prune case.
+	 * an FPI to be emitted.
 	 */
 	hint_bit_fpi = fpi_before != pgWalUsage.wal_fpi;
-	fpi_before = pgWalUsage.wal_fpi;
 
 	/*
 	 * For vacuum, if the whole page will become frozen, we consider
@@ -498,14 +500,18 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 		prstate.ndead > 0 ||
 		prstate.nunused > 0;
 
+	/* Record number of newly-set-LP_DEAD items for caller */
+	presult->nnewlpdead = prstate.ndead;
+
 	/*
-	 * Only incur overhead of checking if we will do an FPI if we might use
-	 * the information.
+	 * Even if we don't prune anything, if we found a new value for the
+	 * pd_prune_xid field or the page was marked full, we will update the
+	 * page's hints.
 	 */
-	if (do_prune && pagefrz)
-		prune_fpi = XLogCheckBufferNeedsBackup(buffer);
+	do_hint = ((PageHeader) page)->pd_prune_xid != prstate.new_prune_xid ||
+		PageIsFull(page);
 
-	/* Is the whole page freezable? And is there something to freeze */
+	/* Is the whole page freezable? And is there something to freeze? */
 	whole_page_freezable = presult->all_visible_except_removable &&
 		presult->all_frozen;
 
@@ -520,43 +526,51 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 	 * opportunistic freeze heuristic must be improved; however, for now, try
 	 * to approximate it.
 	 */
-	do_freeze = pagefrz &&
-		(pagefrz->freeze_required ||
-		 (whole_page_freezable && presult->nfrozen > 0 && (prune_fpi || hint_bit_fpi)));
+	do_freeze = false;
+	if (pagefrz)
+	{
+		if (pagefrz->freeze_required)
+			do_freeze = true;
+		else if (whole_page_freezable && presult->nfrozen > 0)
+		{
+			/*
+			 * Freezing would make the page all-frozen. In this case, we will
+			 * freeze if we have already emitted an FPI or will do so anyway.
+			 * Be sure only to incur the overhead of checking if we will do an
+			 * FPI if we may use that information.
+			 */
+			if (hint_bit_fpi ||
+				((do_prune || do_hint) && XLogCheckBufferNeedsBackup(buffer)))
+			{
+				do_freeze = true;
+			}
+		}
+	}
 
+	/*
+	 * Validate the tuples we are considering freezing. We do this even if
+	 * pruning and hint bit setting have not emitted an FPI so far because we
+	 * still may emit an FPI while setting the page hint bit later. But we
+	 * want to avoid doing the pre-freeze checks in a critical section.
+	 */
 	if (do_freeze)
-	{
 		heap_pre_freeze_checks(buffer, prstate.frozen, presult->nfrozen);
 
+	if (!do_freeze && (!pagefrz || !presult->all_frozen || presult->nfrozen > 0))
+	{
 		/*
-		 * We can use frz_conflict_horizon as our cutoff for conflicts when
-		 * the whole page is eligible to become all-frozen in the VM once
-		 * we're done with it.  Otherwise we generate a conservative cutoff by
-		 * stepping back from OldestXmin.
+		 * If we will neither freeze tuples on the page nor set the page all
+		 * frozen in the visibility map, the page is not all-frozen and there
+		 * will be no newly frozen tuples.
 		 */
-		if (!(presult->all_visible_except_removable && presult->all_frozen))
-		{
-			/* Avoids false conflicts when hot_standby_feedback in use */
-			presult->frz_conflict_horizon = pagefrz->cutoffs->OldestXmin;
-			TransactionIdRetreat(presult->frz_conflict_horizon);
-		}
+		presult->all_frozen = false;
+		presult->nfrozen = 0;	/* avoid miscounts in instrumentation */
 	}
 
-	/* Any error while applying the changes is critical */
 	START_CRIT_SECTION();
 
-	/* Have we found any prunable items? */
-	if (do_prune)
+	if (do_hint)
 	{
-		/*
-		 * Apply the planned item changes, then repair page fragmentation, and
-		 * update the page's hint bit about whether it has free line pointers.
-		 */
-		heap_page_prune_execute(buffer,
-								prstate.redirected, prstate.nredirected,
-								prstate.nowdead, prstate.ndead,
-								prstate.nowunused, prstate.nunused);
-
 		/*
 		 * Update the page's pd_prune_xid field to either zero, or the lowest
 		 * XID of any soon-prunable tuple.
@@ -564,163 +578,159 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 		((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;
 
 		/*
-		 * Also clear the "page is full" flag, since there's no point in
-		 * repeating the prune/defrag process until something else happens to
-		 * the page.
+		 * Clear the "page is full" flag if it is set since there's no point
+		 * in repeating the prune/defrag process until something else happens
+		 * to the page.
 		 */
 		PageClearFull(page);
 
-		MarkBufferDirty(buffer);
+		/*
+		 * We only needed to update pd_prune_xid and clear the page-is-full
+		 * hint bit; this is a non-WAL-logged hint. If we will also freeze or
+		 * prune the page, we will mark the buffer dirty below.
+		 */
+		if (!do_freeze && !do_prune)
+			MarkBufferDirtyHint(buffer, true);
+	}
 
+	if (do_prune || do_freeze)
+	{
 		/*
-		 * Emit a WAL XLOG_HEAP2_PRUNE record showing what we did
+		 * Apply the planned item changes, then repair page fragmentation, and
+		 * update the page's hint bit about whether it has free line pointers.
 		 */
-		if (RelationNeedsWAL(relation))
+		if (do_prune)
 		{
-			xl_heap_prune xlrec;
-			XLogRecPtr	recptr;
-
-			xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(relation);
-			xlrec.snapshotConflictHorizon = prstate.snapshotConflictHorizon;
-			xlrec.nredirected = prstate.nredirected;
-			xlrec.ndead = prstate.ndead;
-
-			XLogBeginInsert();
-			XLogRegisterData((char *) &xlrec, SizeOfHeapPrune);
-
-			XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+			heap_page_prune_execute(buffer,
+									prstate.redirected, prstate.nredirected,
+									prstate.nowdead, prstate.ndead,
+									prstate.nowunused, prstate.nunused);
+		}
 
+		if (do_freeze)
+		{
 			/*
-			 * The OffsetNumber arrays are not actually in the buffer, but we
-			 * pretend that they are.  When XLogInsert stores the whole
-			 * buffer, the offset arrays need not be stored too.
+			 * We can use frz_conflict_horizon as our cutoff for conflicts
+			 * when the whole page is eligible to become all-frozen in the VM
+			 * once we're done with it.  Otherwise we generate a conservative
+			 * cutoff by stepping back from OldestXmin. This avoids false
+			 * conflicts when hot_standby_feedback is in use.
 			 */
-			if (prstate.nredirected > 0)
-				XLogRegisterBufData(0, (char *) prstate.redirected,
-									prstate.nredirected *
-									sizeof(OffsetNumber) * 2);
-
-			if (prstate.ndead > 0)
-				XLogRegisterBufData(0, (char *) prstate.nowdead,
-									prstate.ndead * sizeof(OffsetNumber));
-
-			if (prstate.nunused > 0)
-				XLogRegisterBufData(0, (char *) prstate.nowunused,
-									prstate.nunused * sizeof(OffsetNumber));
+			if (!(presult->all_visible_except_removable && presult->all_frozen))
+			{
+				presult->frz_conflict_horizon = pagefrz->cutoffs->OldestXmin;
+				TransactionIdRetreat(presult->frz_conflict_horizon);
+			}
+			heap_freeze_prepared_tuples(buffer, prstate.frozen, presult->nfrozen);
+		}
 
-			recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_PRUNE);
+		MarkBufferDirty(buffer);
 
-			PageSetLSN(BufferGetPage(buffer), recptr);
-		}
-	}
-	else
-	{
 		/*
-		 * If we didn't prune anything, but have found a new value for the
-		 * pd_prune_xid field, update it and mark the buffer dirty. This is
-		 * treated as a non-WAL-logged hint.
-		 *
-		 * Also clear the "page is full" flag if it is set, since there's no
-		 * point in repeating the prune/defrag process until something else
-		 * happens to the page.
+		 * Emit a WAL XLOG_HEAP2_PRUNE record showing what we did
 		 */
-		if (((PageHeader) page)->pd_prune_xid != prstate.new_prune_xid ||
-			PageIsFull(page))
-		{
-			((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;
-			PageClearFull(page);
-			MarkBufferDirtyHint(buffer, true);
-		}
+		if (RelationNeedsWAL(relation))
+			log_heap_prune_and_freeze(relation, buffer, &prstate, presult);
 	}
 
 	END_CRIT_SECTION();
 
-	/* Record number of newly-set-LP_DEAD items for caller */
-	presult->nnewlpdead = prstate.ndead;
-
-	if (do_freeze)
+	/*
+	 * If we froze tuples on the page, the caller can advance relfrozenxid and
+	 * relminmxid to the values in pagefrz->FreezePageRelfrozenXid and
+	 * pagefrz->FreezePageRelminMxid. Otherwise, it is only safe to advance to
+	 * the values in pagefrz->NoFreezePage[RelfrozenXid|RelminMxid].
+	 */
+	if (pagefrz)
 	{
-		START_CRIT_SECTION();
+		if (presult->nfrozen > 0)
+		{
+			presult->new_relfrozenxid = pagefrz->FreezePageRelfrozenXid;
+			presult->new_relminmxid = pagefrz->FreezePageRelminMxid;
+		}
+		else
+		{
+			presult->new_relfrozenxid = pagefrz->NoFreezePageRelfrozenXid;
+			presult->new_relminmxid = pagefrz->NoFreezePageRelminMxid;
+		}
+	}
+}
 
-		Assert(presult->nfrozen > 0);
 
-		heap_freeze_prepared_tuples(buffer, prstate.frozen, presult->nfrozen);
+static void
+log_heap_prune_and_freeze(Relation relation, Buffer buffer,
+						  PruneState *prstate, PruneFreezeResult *presult)
+{
+	xl_heap_prune xlrec;
+	XLogRecPtr	recptr;
 
-		MarkBufferDirty(buffer);
+	xl_heap_freeze_plan plans[MaxHeapTuplesPerPage];
+	OffsetNumber offsets[MaxHeapTuplesPerPage];
+	bool		do_freeze = presult->nfrozen > 0;
 
-		/* Now WAL-log freezing if necessary */
-		if (RelationNeedsWAL(relation))
-		{
-			xl_heap_freeze_plan plans[MaxHeapTuplesPerPage];
-			OffsetNumber offsets[MaxHeapTuplesPerPage];
-			int			nplans;
-			xl_heap_freeze_page xlrec;
-			XLogRecPtr	recptr;
+	xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(relation);
+	xlrec.nredirected = prstate->nredirected;
+	xlrec.ndead = prstate->ndead;
+	xlrec.nunused = prstate->nunused;
+	xlrec.nplans = 0;
 
-			/*
-			 * Prepare deduplicated representation for use in WAL record
-			 * Destructively sorts tuples array in-place.
-			 */
-			nplans = heap_log_freeze_plan(prstate.frozen, presult->nfrozen, plans, offsets);
+	/*
+	 * The snapshotConflictHorizon for the whole record should be the most
+	 * conservative of the horizons calculated for each of the possible
+	 * modifications. If this record prunes tuples, any transactions on
+	 * the standby older than the newest xmax among the tuples it removes
+	 * will conflict. If this record freezes tuples, any transactions on
+	 * the standby with xids older than the newest xid among the tuples it
+	 * freezes will conflict.
+	 */
+	if (do_freeze)
+		xlrec.snapshotConflictHorizon = Max(prstate->snapshotConflictHorizon,
+											presult->frz_conflict_horizon);
+	else
+		xlrec.snapshotConflictHorizon = prstate->snapshotConflictHorizon;
 
-			xlrec.snapshotConflictHorizon = presult->frz_conflict_horizon;
-			xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(relation);
-			xlrec.nplans = nplans;
+	/*
+	 * Prepare deduplicated representation for use in the WAL record. This
+	 * destructively sorts the tuples array in-place.
+	 */
+	if (do_freeze)
+		xlrec.nplans = heap_log_freeze_plan(prstate->frozen,
+											presult->nfrozen, plans, offsets);
 
-			XLogBeginInsert();
-			XLogRegisterData((char *) &xlrec, SizeOfHeapFreezePage);
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfHeapPrune);
 
-			/*
-			 * The freeze plan array and offset array are not actually in the
-			 * buffer, but pretend that they are.  When XLogInsert stores the
-			 * whole buffer, the arrays need not be stored too.
-			 */
-			XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
-			XLogRegisterBufData(0, (char *) plans,
-								nplans * sizeof(xl_heap_freeze_plan));
-			XLogRegisterBufData(0, (char *) offsets,
-								presult->nfrozen * sizeof(OffsetNumber));
+	XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
 
-			recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_FREEZE_PAGE);
+	/*
+	 * The freeze plan array and the offset arrays are not actually in the
+	 * buffer, but we pretend that they are.  When XLogInsert stores the
+	 * whole buffer, these arrays need not be stored too.
+	 */
+	if (xlrec.nplans > 0)
+		XLogRegisterBufData(0, (char *) plans,
+							xlrec.nplans * sizeof(xl_heap_freeze_plan));
 
-			PageSetLSN(page, recptr);
-		}
+	if (prstate->nredirected > 0)
+		XLogRegisterBufData(0, (char *) prstate->redirected,
+							prstate->nredirected *
+							sizeof(OffsetNumber) * 2);
 
-		END_CRIT_SECTION();
-	}
-	else if (!pagefrz || !presult->all_frozen || presult->nfrozen > 0)
-	{
-		/*
-		 * If we will neither freeze tuples on the page nor set the page all
-		 * frozen in the visibility map, the page is not all frozen and there
-		 * will be no newly frozen tuples.
-		 */
-		presult->all_frozen = false;
-		presult->nfrozen = 0;	/* avoid miscounts in instrumentation */
-	}
+	if (prstate->ndead > 0)
+		XLogRegisterBufData(0, (char *) prstate->nowdead,
+							prstate->ndead * sizeof(OffsetNumber));
 
-	/* Caller won't update new_relfrozenxid and new_relminmxid */
-	if (!pagefrz)
-		return;
+	if (prstate->nunused > 0)
+		XLogRegisterBufData(0, (char *) prstate->nowunused,
+							prstate->nunused * sizeof(OffsetNumber));
+	if (xlrec.nplans > 0)
+		XLogRegisterBufData(0, (char *) offsets,
+							presult->nfrozen * sizeof(OffsetNumber));
 
-	/*
-	 * If we will freeze tuples on the page or, even if we don't freeze tuples
-	 * on the page, if we will set the page all-frozen in the visibility map,
-	 * we can advance relfrozenxid and relminmxid to the values in
-	 * pagefrz->FreezePageRelfrozenXid and pagefrz->FreezePageRelminMxid.
-	 */
-	if (presult->all_frozen || presult->nfrozen > 0)
-	{
-		presult->new_relfrozenxid = pagefrz->FreezePageRelfrozenXid;
-		presult->new_relminmxid = pagefrz->FreezePageRelminMxid;
-	}
-	else
-	{
-		presult->new_relfrozenxid = pagefrz->NoFreezePageRelfrozenXid;
-		presult->new_relminmxid = pagefrz->NoFreezePageRelminMxid;
-	}
-}
+	recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_PRUNE);
 
+	PageSetLSN(BufferGetPage(buffer), recptr);
+}
 
 /*
  * Perform visibility checks for heap pruning.
diff --git a/src/backend/access/rmgrdesc/heapdesc.c b/src/backend/access/rmgrdesc/heapdesc.c
index 36a3d83c8c2..9f0a0341d40 100644
--- a/src/backend/access/rmgrdesc/heapdesc.c
+++ b/src/backend/access/rmgrdesc/heapdesc.c
@@ -179,43 +179,67 @@ heap2_desc(StringInfo buf, XLogReaderState *record)
 	{
 		xl_heap_prune *xlrec = (xl_heap_prune *) rec;
 
-		appendStringInfo(buf, "snapshotConflictHorizon: %u, nredirected: %u, ndead: %u, isCatalogRel: %c",
+		appendStringInfo(buf, "snapshotConflictHorizon: %u, isCatalogRel: %c",
 						 xlrec->snapshotConflictHorizon,
-						 xlrec->nredirected,
-						 xlrec->ndead,
 						 xlrec->isCatalogRel ? 'T' : 'F');
 
 		if (XLogRecHasBlockData(record, 0))
 		{
-			OffsetNumber *end;
 			OffsetNumber *redirected;
 			OffsetNumber *nowdead;
 			OffsetNumber *nowunused;
 			int			nredirected;
+			int			ndead;
 			int			nunused;
+			int			nplans;
 			Size		datalen;
+			xl_heap_freeze_plan *plans;
+			OffsetNumber *frz_offsets;
 
-			redirected = (OffsetNumber *) XLogRecGetBlockData(record, 0,
-															  &datalen);
-
+			nplans = xlrec->nplans;
 			nredirected = xlrec->nredirected;
-			end = (OffsetNumber *) ((char *) redirected + datalen);
-			nowdead = redirected + (nredirected * 2);
-			nowunused = nowdead + xlrec->ndead;
-			nunused = (end - nowunused);
-			Assert(nunused >= 0);
+			ndead = xlrec->ndead;
+			nunused = xlrec->nunused;
 
-			appendStringInfo(buf, ", nunused: %d", nunused);
-
-			appendStringInfoString(buf, ", redirected:");
-			array_desc(buf, redirected, sizeof(OffsetNumber) * 2,
-					   nredirected, &redirect_elem_desc, NULL);
-			appendStringInfoString(buf, ", dead:");
-			array_desc(buf, nowdead, sizeof(OffsetNumber), xlrec->ndead,
-					   &offset_elem_desc, NULL);
-			appendStringInfoString(buf, ", unused:");
-			array_desc(buf, nowunused, sizeof(OffsetNumber), nunused,
-					   &offset_elem_desc, NULL);
+			plans = (xl_heap_freeze_plan *) XLogRecGetBlockData(record, 0, &datalen);
+			redirected = (OffsetNumber *) &plans[nplans];
+			nowdead = redirected + (nredirected * 2);
+			nowunused = nowdead + ndead;
+			frz_offsets = nowunused + nunused;
+
+			appendStringInfo(buf, ", nredirected: %u, ndead: %u, nunused: %u, nplans: %u",
+							 nredirected,
+							 ndead,
+							 nunused,
+							 nplans);
+
+			if (nredirected > 0)
+			{
+				appendStringInfoString(buf, ", redirected:");
+				array_desc(buf, redirected, sizeof(OffsetNumber) * 2,
+						   nredirected, &redirect_elem_desc, NULL);
+			}
+
+			if (ndead > 0)
+			{
+				appendStringInfoString(buf, ", dead:");
+				array_desc(buf, nowdead, sizeof(OffsetNumber), ndead,
+						   &offset_elem_desc, NULL);
+			}
+
+			if (nunused > 0)
+			{
+				appendStringInfoString(buf, ", unused:");
+				array_desc(buf, nowunused, sizeof(OffsetNumber), nunused,
+						   &offset_elem_desc, NULL);
+			}
+
+			if (nplans > 0)
+			{
+				appendStringInfoString(buf, ", plans:");
+				array_desc(buf, plans, sizeof(xl_heap_freeze_plan), nplans,
+						   &plan_elem_desc, &frz_offsets);
+			}
 		}
 	}
 	else if (info == XLOG_HEAP2_VACUUM)
@@ -235,28 +259,6 @@ heap2_desc(StringInfo buf, XLogReaderState *record)
 					   &offset_elem_desc, NULL);
 		}
 	}
-	else if (info == XLOG_HEAP2_FREEZE_PAGE)
-	{
-		xl_heap_freeze_page *xlrec = (xl_heap_freeze_page *) rec;
-
-		appendStringInfo(buf, "snapshotConflictHorizon: %u, nplans: %u, isCatalogRel: %c",
-						 xlrec->snapshotConflictHorizon, xlrec->nplans,
-						 xlrec->isCatalogRel ? 'T' : 'F');
-
-		if (XLogRecHasBlockData(record, 0))
-		{
-			xl_heap_freeze_plan *plans;
-			OffsetNumber *offsets;
-
-			plans = (xl_heap_freeze_plan *) XLogRecGetBlockData(record, 0, NULL);
-			offsets = (OffsetNumber *) ((char *) plans +
-										(xlrec->nplans *
-										 sizeof(xl_heap_freeze_plan)));
-			appendStringInfoString(buf, ", plans:");
-			array_desc(buf, plans, sizeof(xl_heap_freeze_plan), xlrec->nplans,
-					   &plan_elem_desc, &offsets);
-		}
-	}
 	else if (info == XLOG_HEAP2_VISIBLE)
 	{
 		xl_heap_visible *xlrec = (xl_heap_visible *) rec;
@@ -361,9 +363,6 @@ heap2_identify(uint8 info)
 		case XLOG_HEAP2_VACUUM:
 			id = "VACUUM";
 			break;
-		case XLOG_HEAP2_FREEZE_PAGE:
-			id = "FREEZE_PAGE";
-			break;
 		case XLOG_HEAP2_VISIBLE:
 			id = "VISIBLE";
 			break;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index e5ab7b78b78..f77051572fd 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -445,7 +445,6 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 * Everything else here is just low level physical stuff we're not
 			 * interested in.
 			 */
-		case XLOG_HEAP2_FREEZE_PAGE:
 		case XLOG_HEAP2_PRUNE:
 		case XLOG_HEAP2_VACUUM:
 		case XLOG_HEAP2_VISIBLE:
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 6488dad5e64..fe4a8ff0620 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -53,11 +53,10 @@
 #define XLOG_HEAP2_REWRITE		0x00
 #define XLOG_HEAP2_PRUNE		0x10
 #define XLOG_HEAP2_VACUUM		0x20
-#define XLOG_HEAP2_FREEZE_PAGE	0x30
-#define XLOG_HEAP2_VISIBLE		0x40
-#define XLOG_HEAP2_MULTI_INSERT 0x50
-#define XLOG_HEAP2_LOCK_UPDATED 0x60
-#define XLOG_HEAP2_NEW_CID		0x70
+#define XLOG_HEAP2_VISIBLE		0x30
+#define XLOG_HEAP2_MULTI_INSERT 0x40
+#define XLOG_HEAP2_LOCK_UPDATED 0x50
+#define XLOG_HEAP2_NEW_CID		0x60
 
 /*
  * xl_heap_insert/xl_heap_multi_insert flag values, 8 bits are available.
@@ -226,28 +225,65 @@ typedef struct xl_heap_update
 
 #define SizeOfHeapUpdate	(offsetof(xl_heap_update, new_offnum) + sizeof(OffsetNumber))
 
+/*
+ * This struct represents a 'freeze plan', which describes how to freeze a
+ * group of one or more heap tuples (appears in xl_heap_prune record)
+ */
+/* 0x01 was XLH_FREEZE_XMIN */
+#define		XLH_FREEZE_XVAC		0x02
+#define		XLH_INVALID_XVAC	0x04
+
+typedef struct xl_heap_freeze_plan
+{
+	TransactionId xmax;
+	uint16		t_infomask2;
+	uint16		t_infomask;
+	uint8		frzflags;
+
+	/* Length of individual page offset numbers array for this plan */
+	uint16		ntuples;
+} xl_heap_freeze_plan;
+
+/*
+ * As of Postgres 17, XLOG_HEAP2_PRUNE records replace
+ * XLOG_HEAP2_FREEZE_PAGE records.
+ */
+
 /*
  * This is what we need to know about page pruning (both during VACUUM and
  * during opportunistic pruning)
  *
  * The array of OffsetNumbers following the fixed part of the record contains:
+ *	* for each freeze plan: the freeze plan
  *	* for each redirected item: the item offset, then the offset redirected to
  *	* for each now-dead item: the item offset
  *	* for each now-unused item: the item offset
- * The total number of OffsetNumbers is therefore 2*nredirected+ndead+nunused.
- * Note that nunused is not explicitly stored, but may be found by reference
- * to the total record length.
+ *	* for each tuple frozen by the plans: the offset of the corresponding item
+ * The total number of OffsetNumbers is therefore
+ * (2*nredirected) + ndead + nunused + (sum[plan.ntuples for plan in plans])
  *
- * Acquires a full cleanup lock.
+ * Acquires a full cleanup lock if heap_page_prune_execute() must be called.
  */
 typedef struct xl_heap_prune
 {
 	TransactionId snapshotConflictHorizon;
+	uint16		nplans;
 	uint16		nredirected;
 	uint16		ndead;
+	uint16		nunused;
 	bool		isCatalogRel;	/* to handle recovery conflict during logical
 								 * decoding on standby */
-	/* OFFSET NUMBERS are in the block reference 0 */
+	/*--------------------------------------------------------------------
+	 * The freeze plans and offset number arrays are in block reference 0
+	 * in the following order:
+	 *
+	 *		* xl_heap_freeze_plan plans[nplans];
+	 *		* OffsetNumber redirected[2 * nredirected];
+	 *		* OffsetNumber nowdead[ndead];
+	 *		* OffsetNumber nowunused[nunused];
+	 *		* OffsetNumber frz_offsets[...];
+	 *--------------------------------------------------------------------
+	 */
 } xl_heap_prune;
 
 #define SizeOfHeapPrune (offsetof(xl_heap_prune, isCatalogRel) + sizeof(bool))
@@ -315,47 +351,6 @@ typedef struct xl_heap_inplace
 
 #define SizeOfHeapInplace	(offsetof(xl_heap_inplace, offnum) + sizeof(OffsetNumber))
 
-/*
- * This struct represents a 'freeze plan', which describes how to freeze a
- * group of one or more heap tuples (appears in xl_heap_freeze_page record)
- */
-/* 0x01 was XLH_FREEZE_XMIN */
-#define		XLH_FREEZE_XVAC		0x02
-#define		XLH_INVALID_XVAC	0x04
-
-typedef struct xl_heap_freeze_plan
-{
-	TransactionId xmax;
-	uint16		t_infomask2;
-	uint16		t_infomask;
-	uint8		frzflags;
-
-	/* Length of individual page offset numbers array for this plan */
-	uint16		ntuples;
-} xl_heap_freeze_plan;
-
-/*
- * This is what we need to know about a block being frozen during vacuum
- *
- * Backup block 0's data contains an array of xl_heap_freeze_plan structs
- * (with nplans elements), followed by one or more page offset number arrays.
- * Each such page offset number array corresponds to a single freeze plan
- * (REDO routine freezes corresponding heap tuples using freeze plan).
- */
-typedef struct xl_heap_freeze_page
-{
-	TransactionId snapshotConflictHorizon;
-	uint16		nplans;
-	bool		isCatalogRel;	/* to handle recovery conflict during logical
-								 * decoding on standby */
-
-	/*
-	 * In payload of blk 0 : FREEZE PLANS and OFFSET NUMBER ARRAY
-	 */
-} xl_heap_freeze_page;
-
-#define SizeOfHeapFreezePage	(offsetof(xl_heap_freeze_page, isCatalogRel) + sizeof(bool))
-
 /*
  * This is what we need to know about setting a visibility map bit
  *
-- 
2.40.1

