From 06d5ff5349a8aa95cbfd06a8043fe503b7b1bf7b Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 20 Mar 2024 14:50:14 +0200
Subject: [PATCH v5 01/26] Merge prune, freeze and vacuum WAL record formats

The new combined WAL record is now used for pruning, freezing, and the
2nd pass of vacuum. This is in preparation for changing vacuuming to
write a combined prune+freeze record per page, instead of two separate
records. The new WAL record format already supports that, but the code
still always writes separate records for pruning and freezing.
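
The resulting record layout is roughly:

    xl_heap_prune            flags
    xlhp_conflict_horizon    if XLHP_HAS_CONFLICT_HORIZON
    block reference 0 data:
      xlhp_freeze            if XLHP_HAS_FREEZE_PLANS
      xlhp_prune_items       if XLHP_HAS_REDIRECTIONS
      xlhp_prune_items       if XLHP_HAS_DEAD_ITEMS
      xlhp_prune_items       if XLHP_HAS_NOW_UNUSED_ITEMS
      freeze plan offsets    if XLHP_HAS_FREEZE_PLANS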

XXX I tried to lift-and-shift the code from the v4 patch set as
unchanged as possible, for easier review, but there are some noteworthy
changes:

- Instead of passing PruneState and PageFreezeResult to
  log_heap_prune_and_freeze(), pass the arrays of frozen, redirected,
  etc. offsets directly. That way, it can be called from other places
  (see the example call below).

- Moved heap_xlog_deserialize_prune_and_freeze() from xactdesc.c to
  heapdesc.c, because that's clearly where it belongs.
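
As an example of the new interface, heap_freeze_execute_prepared() now
logs a freeze-only record with:

    log_heap_prune_and_freeze(rel, buffer, snapshotConflictHorizon, false,
                              tuples, ntuples,
                              NULL, 0,    /* redirected */
                              NULL, 0,    /* dead */
                              NULL, 0);   /* unused */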

Author: Melanie Plageman <melanieplageman@gmail.com>
---
 src/backend/access/heap/heapam.c         | 433 +++++------------------
 src/backend/access/heap/pruneheap.c      | 396 ++++++++++++++++----
 src/backend/access/heap/vacuumlazy.c     |  20 +-
 src/backend/access/rmgrdesc/heapdesc.c   | 198 +++++++----
 src/backend/replication/logical/decode.c |   2 -
 src/include/access/heapam.h              |   9 +-
 src/include/access/heapam_xlog.h         | 172 +++++----
 7 files changed, 657 insertions(+), 573 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 34bc60f625f..e6cfffd9f3e 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -91,9 +91,6 @@ static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask,
 static TM_Result heap_lock_updated_tuple(Relation rel, HeapTuple tuple,
 										 ItemPointer ctid, TransactionId xid,
 										 LockTupleMode mode);
-static int	heap_log_freeze_plan(HeapTupleFreeze *tuples, int ntuples,
-								 xl_heap_freeze_plan *plans_out,
-								 OffsetNumber *offsets_out);
 static void GetMultiXactIdHintBits(MultiXactId multi, uint16 *new_infomask,
 								   uint16 *new_infomask2);
 static TransactionId MultiXactIdGetUpdateXid(TransactionId xmax,
@@ -6746,179 +6743,16 @@ heap_freeze_execute_prepared(Relation rel, Buffer buffer,
 	/* Now WAL-log freezing if necessary */
 	if (RelationNeedsWAL(rel))
 	{
-		xl_heap_freeze_plan plans[MaxHeapTuplesPerPage];
-		OffsetNumber offsets[MaxHeapTuplesPerPage];
-		int			nplans;
-		xl_heap_freeze_page xlrec;
-		XLogRecPtr	recptr;
-
-		/* Prepare deduplicated representation for use in WAL record */
-		nplans = heap_log_freeze_plan(tuples, ntuples, plans, offsets);
-
-		xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
-		xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(rel);
-		xlrec.nplans = nplans;
-
-		XLogBeginInsert();
-		XLogRegisterData((char *) &xlrec, SizeOfHeapFreezePage);
-
-		/*
-		 * The freeze plan array and offset array are not actually in the
-		 * buffer, but pretend that they are.  When XLogInsert stores the
-		 * whole buffer, the arrays need not be stored too.
-		 */
-		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
-		XLogRegisterBufData(0, (char *) plans,
-							nplans * sizeof(xl_heap_freeze_plan));
-		XLogRegisterBufData(0, (char *) offsets,
-							ntuples * sizeof(OffsetNumber));
-
-		recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_FREEZE_PAGE);
-
-		PageSetLSN(page, recptr);
+		log_heap_prune_and_freeze(rel, buffer, snapshotConflictHorizon, false,
+								  tuples, ntuples,
+								  NULL, 0,	/* redirected */
+								  NULL, 0,	/* dead */
+								  NULL, 0); /* unused */
 	}
 
 	END_CRIT_SECTION();
 }
 
-/*
- * Comparator used to deduplicate XLOG_HEAP2_FREEZE_PAGE freeze plans
- */
-static int
-heap_log_freeze_cmp(const void *arg1, const void *arg2)
-{
-	HeapTupleFreeze *frz1 = (HeapTupleFreeze *) arg1;
-	HeapTupleFreeze *frz2 = (HeapTupleFreeze *) arg2;
-
-	if (frz1->xmax < frz2->xmax)
-		return -1;
-	else if (frz1->xmax > frz2->xmax)
-		return 1;
-
-	if (frz1->t_infomask2 < frz2->t_infomask2)
-		return -1;
-	else if (frz1->t_infomask2 > frz2->t_infomask2)
-		return 1;
-
-	if (frz1->t_infomask < frz2->t_infomask)
-		return -1;
-	else if (frz1->t_infomask > frz2->t_infomask)
-		return 1;
-
-	if (frz1->frzflags < frz2->frzflags)
-		return -1;
-	else if (frz1->frzflags > frz2->frzflags)
-		return 1;
-
-	/*
-	 * heap_log_freeze_eq would consider these tuple-wise plans to be equal.
-	 * (So the tuples will share a single canonical freeze plan.)
-	 *
-	 * We tiebreak on page offset number to keep each freeze plan's page
-	 * offset number array individually sorted. (Unnecessary, but be tidy.)
-	 */
-	if (frz1->offset < frz2->offset)
-		return -1;
-	else if (frz1->offset > frz2->offset)
-		return 1;
-
-	Assert(false);
-	return 0;
-}
-
-/*
- * Compare fields that describe actions required to freeze tuple with caller's
- * open plan.  If everything matches then the frz tuple plan is equivalent to
- * caller's plan.
- */
-static inline bool
-heap_log_freeze_eq(xl_heap_freeze_plan *plan, HeapTupleFreeze *frz)
-{
-	if (plan->xmax == frz->xmax &&
-		plan->t_infomask2 == frz->t_infomask2 &&
-		plan->t_infomask == frz->t_infomask &&
-		plan->frzflags == frz->frzflags)
-		return true;
-
-	/* Caller must call heap_log_freeze_new_plan again for frz */
-	return false;
-}
-
-/*
- * Start new plan initialized using tuple-level actions.  At least one tuple
- * will have steps required to freeze described by caller's plan during REDO.
- */
-static inline void
-heap_log_freeze_new_plan(xl_heap_freeze_plan *plan, HeapTupleFreeze *frz)
-{
-	plan->xmax = frz->xmax;
-	plan->t_infomask2 = frz->t_infomask2;
-	plan->t_infomask = frz->t_infomask;
-	plan->frzflags = frz->frzflags;
-	plan->ntuples = 1;			/* for now */
-}
-
-/*
- * Deduplicate tuple-based freeze plans so that each distinct set of
- * processing steps is only stored once in XLOG_HEAP2_FREEZE_PAGE records.
- * Called during original execution of freezing (for logged relations).
- *
- * Return value is number of plans set in *plans_out for caller.  Also writes
- * an array of offset numbers into *offsets_out output argument for caller
- * (actually there is one array per freeze plan, but that's not of immediate
- * concern to our caller).
- */
-static int
-heap_log_freeze_plan(HeapTupleFreeze *tuples, int ntuples,
-					 xl_heap_freeze_plan *plans_out,
-					 OffsetNumber *offsets_out)
-{
-	int			nplans = 0;
-
-	/* Sort tuple-based freeze plans in the order required to deduplicate */
-	qsort(tuples, ntuples, sizeof(HeapTupleFreeze), heap_log_freeze_cmp);
-
-	for (int i = 0; i < ntuples; i++)
-	{
-		HeapTupleFreeze *frz = tuples + i;
-
-		if (i == 0)
-		{
-			/* New canonical freeze plan starting with first tup */
-			heap_log_freeze_new_plan(plans_out, frz);
-			nplans++;
-		}
-		else if (heap_log_freeze_eq(plans_out, frz))
-		{
-			/* tup matches open canonical plan -- include tup in it */
-			Assert(offsets_out[i - 1] < frz->offset);
-			plans_out->ntuples++;
-		}
-		else
-		{
-			/* Tup doesn't match current plan -- done with it now */
-			plans_out++;
-
-			/* New canonical freeze plan starting with this tup */
-			heap_log_freeze_new_plan(plans_out, frz);
-			nplans++;
-		}
-
-		/*
-		 * Save page offset number in dedicated buffer in passing.
-		 *
-		 * REDO routine relies on the record's offset numbers array grouping
-		 * offset numbers by freeze plan.  The sort order within each grouping
-		 * is ascending offset number order, just to keep things tidy.
-		 */
-		offsets_out[i] = frz->offset;
-	}
-
-	Assert(nplans > 0 && nplans <= ntuples);
-
-	return nplans;
-}
-
 /*
  * heap_freeze_tuple
  *		Freeze tuple in place, without WAL logging.
@@ -8754,8 +8588,6 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 
 /*
  * Handles XLOG_HEAP2_PRUNE record type.
- *
- * Acquires a full cleanup lock.
  */
 static void
 heap_xlog_prune(XLogReaderState *record)
@@ -8766,125 +8598,109 @@ heap_xlog_prune(XLogReaderState *record)
 	RelFileLocator rlocator;
 	BlockNumber blkno;
 	XLogRedoAction action;
+	bool		get_cleanup_lock;
+	bool		lp_truncate_only;
 
 	XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
 
-	/*
-	 * We're about to remove tuples. In Hot Standby mode, ensure that there's
-	 * no queries running for which the removed tuples are still visible.
-	 */
-	if (InHotStandby)
-		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
-											xlrec->isCatalogRel,
-											rlocator);
+	lp_truncate_only = xlrec->flags & XLHP_LP_TRUNCATE_ONLY;
 
 	/*
-	 * If we have a full-page image, restore it (using a cleanup lock) and
-	 * we're done.
+	 * If the record contains redirected or dead items, or items set unused
+	 * other than by line pointer truncation, heap_page_prune_execute() will
+	 * call PageRepairFragmentation(), which expects a full cleanup lock.
 	 */
-	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true,
-										   &buffer);
-	if (action == BLK_NEEDS_REDO)
-	{
-		Page		page = (Page) BufferGetPage(buffer);
-		OffsetNumber *end;
-		OffsetNumber *redirected;
-		OffsetNumber *nowdead;
-		OffsetNumber *nowunused;
-		int			nredirected;
-		int			ndead;
-		int			nunused;
-		Size		datalen;
-
-		redirected = (OffsetNumber *) XLogRecGetBlockData(record, 0, &datalen);
-
-		nredirected = xlrec->nredirected;
-		ndead = xlrec->ndead;
-		end = (OffsetNumber *) ((char *) redirected + datalen);
-		nowdead = redirected + (nredirected * 2);
-		nowunused = nowdead + ndead;
-		nunused = (end - nowunused);
-		Assert(nunused >= 0);
+	get_cleanup_lock = (xlrec->flags & XLHP_HAS_REDIRECTIONS) ||
+		(xlrec->flags & XLHP_HAS_DEAD_ITEMS) ||
+		((xlrec->flags & XLHP_HAS_NOW_UNUSED_ITEMS) && !lp_truncate_only);
 
-		/* Update all line pointers per the record, and repair fragmentation */
-		heap_page_prune_execute(buffer,
-								redirected, nredirected,
-								nowdead, ndead,
-								nowunused, nunused);
-
-		/*
-		 * Note: we don't worry about updating the page's prunability hints.
-		 * At worst this will cause an extra prune cycle to occur soon.
-		 */
-
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(buffer);
+	if (lp_truncate_only)
+	{
+		Assert(!(xlrec->flags & XLHP_HAS_REDIRECTIONS));
+		Assert(!(xlrec->flags & XLHP_HAS_DEAD_ITEMS));
+		Assert(xlrec->flags & XLHP_HAS_NOW_UNUSED_ITEMS);
 	}
 
-	if (BufferIsValid(buffer))
+	/*
+	 * We are either about to remove tuples or freeze them. In Hot Standby
+	 * mode, ensure that there are no queries running for which any removed
+	 * tuples are still visible, or which consider the frozen xids as running.
+	 */
+	if ((xlrec->flags & XLHP_HAS_CONFLICT_HORIZON) && InHotStandby)
 	{
-		Size		freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
-
-		UnlockReleaseBuffer(buffer);
+		xlhp_conflict_horizon *horizon = (xlhp_conflict_horizon *) ((char *) xlrec + SizeOfHeapPrune);
 
-		/*
-		 * After pruning records from a page, it's useful to update the FSM
-		 * about it, as it may cause the page become target for insertions
-		 * later even if vacuum decides not to visit it (which is possible if
-		 * gets marked all-visible.)
-		 *
-		 * Do this regardless of a full-page image being applied, since the
-		 * FSM data is not in the page anyway.
-		 */
-		XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
+		ResolveRecoveryConflictWithSnapshot(horizon->xid,
+											xlrec->flags & XLHP_IS_CATALOG_REL,
+											rlocator);
 	}
-}
-
-/*
- * Handles XLOG_HEAP2_VACUUM record type.
- *
- * Acquires an ordinary exclusive lock only.
- */
-static void
-heap_xlog_vacuum(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_vacuum *xlrec = (xl_heap_vacuum *) XLogRecGetData(record);
-	Buffer		buffer;
-	BlockNumber blkno;
-	XLogRedoAction action;
 
 	/*
-	 * If we have a full-page image, restore it	(without using a cleanup lock)
-	 * and we're done.
+	 * If we have a full-page image, restore it and we're done.
 	 */
-	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, false,
-										   &buffer);
+	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL,
+										   get_cleanup_lock, &buffer);
+
 	if (action == BLK_NEEDS_REDO)
 	{
 		Page		page = (Page) BufferGetPage(buffer);
-		OffsetNumber *nowunused;
+		OffsetNumber *redirected = NULL;
+		OffsetNumber *nowdead = NULL;
+		OffsetNumber *nowunused = NULL;
+		int			nredirected = 0;
+		int			ndead = 0;
+		int			nunused = 0;
+		int			nplans = 0;
 		Size		datalen;
-		OffsetNumber *offnum;
+		xl_heap_freeze_plan *plans = NULL;
+		OffsetNumber *frz_offsets = NULL;
+		int			curoff = 0;
 
-		nowunused = (OffsetNumber *) XLogRecGetBlockData(record, 0, &datalen);
+		char	   *cursor = XLogRecGetBlockData(record, 0, &datalen);
 
-		/* Shouldn't be a record unless there's something to do */
-		Assert(xlrec->nunused > 0);
+		heap_xlog_deserialize_prune_and_freeze(cursor, xlrec->flags,
+											   &nredirected, &redirected,
+											   &ndead, &nowdead,
+											   &nunused, &nowunused,
+											   &nplans, &plans, &frz_offsets);
+
+		/* Update all line pointers per the record, and repair fragmentation */
+		if (nredirected > 0 || ndead > 0 || nunused > 0)
+			heap_page_prune_execute(buffer, lp_truncate_only,
+									redirected, nredirected,
+									nowdead, ndead,
+									nowunused, nunused);
 
-		/* Update all now-unused line pointers */
-		offnum = nowunused;
-		for (int i = 0; i < xlrec->nunused; i++)
+		for (int p = 0; p < nplans; p++)
 		{
-			OffsetNumber off = *offnum++;
-			ItemId		lp = PageGetItemId(page, off);
+			HeapTupleFreeze frz;
 
-			Assert(ItemIdIsDead(lp) && !ItemIdHasStorage(lp));
-			ItemIdSetUnused(lp);
+			/*
+			 * Convert freeze plan representation from WAL record into
+			 * per-tuple format used by heap_execute_freeze_tuple
+			 */
+			frz.xmax = plans[p].xmax;
+			frz.t_infomask2 = plans[p].t_infomask2;
+			frz.t_infomask = plans[p].t_infomask;
+			frz.frzflags = plans[p].frzflags;
+			frz.offset = InvalidOffsetNumber;	/* unused, but be tidy */
+
+			for (int i = 0; i < plans[p].ntuples; i++)
+			{
+				OffsetNumber offset = frz_offsets[curoff++];
+				ItemId		lp;
+				HeapTupleHeader tuple;
+
+				lp = PageGetItemId(page, offset);
+				tuple = (HeapTupleHeader) PageGetItem(page, lp);
+				heap_execute_freeze_tuple(tuple, &frz);
+			}
 		}
 
-		/* Attempt to truncate line pointer array now */
-		PageTruncateLinePointerArray(page);
+		/*
+		 * Note: we don't worry about updating the page's prunability hints.
+		 * At worst this will cause an extra prune cycle to occur soon.
+		 */
 
 		PageSetLSN(page, lsn);
 		MarkBufferDirty(buffer);
@@ -8893,17 +8709,14 @@ heap_xlog_vacuum(XLogReaderState *record)
 	if (BufferIsValid(buffer))
 	{
 		Size		freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
-		RelFileLocator rlocator;
-
-		XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
 
 		UnlockReleaseBuffer(buffer);
 
 		/*
-		 * After vacuuming LP_DEAD items from a page, it's useful to update
-		 * the FSM about it, as it may cause the page become target for
-		 * insertions later even if vacuum decides not to visit it (which is
-		 * possible if gets marked all-visible.)
+		 * After modifying a page, it's useful to update the FSM about it, as
+		 * it may cause the page to become a target for insertions later even
+		 * if vacuum decides not to visit it (which is possible if it gets
+		 * marked all-visible).
 		 *
 		 * Do this regardless of a full-page image being applied, since the
 		 * FSM data is not in the page anyway.
@@ -9049,74 +8862,6 @@ heap_xlog_visible(XLogReaderState *record)
 		UnlockReleaseBuffer(vmbuffer);
 }
 
-/*
- * Replay XLOG_HEAP2_FREEZE_PAGE records
- */
-static void
-heap_xlog_freeze_page(XLogReaderState *record)
-{
-	XLogRecPtr	lsn = record->EndRecPtr;
-	xl_heap_freeze_page *xlrec = (xl_heap_freeze_page *) XLogRecGetData(record);
-	Buffer		buffer;
-
-	/*
-	 * In Hot Standby mode, ensure that there's no queries running which still
-	 * consider the frozen xids as running.
-	 */
-	if (InHotStandby)
-	{
-		RelFileLocator rlocator;
-
-		XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
-		ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
-											xlrec->isCatalogRel,
-											rlocator);
-	}
-
-	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
-	{
-		Page		page = BufferGetPage(buffer);
-		xl_heap_freeze_plan *plans;
-		OffsetNumber *offsets;
-		int			curoff = 0;
-
-		plans = (xl_heap_freeze_plan *) XLogRecGetBlockData(record, 0, NULL);
-		offsets = (OffsetNumber *) ((char *) plans +
-									(xlrec->nplans *
-									 sizeof(xl_heap_freeze_plan)));
-		for (int p = 0; p < xlrec->nplans; p++)
-		{
-			HeapTupleFreeze frz;
-
-			/*
-			 * Convert freeze plan representation from WAL record into
-			 * per-tuple format used by heap_execute_freeze_tuple
-			 */
-			frz.xmax = plans[p].xmax;
-			frz.t_infomask2 = plans[p].t_infomask2;
-			frz.t_infomask = plans[p].t_infomask;
-			frz.frzflags = plans[p].frzflags;
-			frz.offset = InvalidOffsetNumber;	/* unused, but be tidy */
-
-			for (int i = 0; i < plans[p].ntuples; i++)
-			{
-				OffsetNumber offset = offsets[curoff++];
-				ItemId		lp;
-				HeapTupleHeader tuple;
-
-				lp = PageGetItemId(page, offset);
-				tuple = (HeapTupleHeader) PageGetItem(page, lp);
-				heap_execute_freeze_tuple(tuple, &frz);
-			}
-		}
-
-		PageSetLSN(page, lsn);
-		MarkBufferDirty(buffer);
-	}
-	if (BufferIsValid(buffer))
-		UnlockReleaseBuffer(buffer);
-}
-
 /*
  * Given an "infobits" field from an XLog record, set the correct bits in the
  * given infomask and infomask2 for the tuple touched by the record.
@@ -10020,12 +9765,6 @@ heap2_redo(XLogReaderState *record)
 		case XLOG_HEAP2_PRUNE:
 			heap_xlog_prune(record);
 			break;
-		case XLOG_HEAP2_VACUUM:
-			heap_xlog_vacuum(record);
-			break;
-		case XLOG_HEAP2_FREEZE_PAGE:
-			heap_xlog_freeze_page(record);
-			break;
 		case XLOG_HEAP2_VISIBLE:
 			heap_xlog_visible(record);
 			break;
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 69332b0d25c..9773681868c 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -338,7 +338,7 @@ heap_page_prune(Relation relation, Buffer buffer,
 		 * Apply the planned item changes, then repair page fragmentation, and
 		 * update the page's hint bit about whether it has free line pointers.
 		 */
-		heap_page_prune_execute(buffer,
+		heap_page_prune_execute(buffer, false,
 								prstate.redirected, prstate.nredirected,
 								prstate.nowdead, prstate.ndead,
 								prstate.nowunused, prstate.nunused);
@@ -363,40 +363,13 @@ heap_page_prune(Relation relation, Buffer buffer,
 		 */
 		if (RelationNeedsWAL(relation))
 		{
-			xl_heap_prune xlrec;
-			XLogRecPtr	recptr;
-
-			xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(relation);
-			xlrec.snapshotConflictHorizon = prstate.snapshotConflictHorizon;
-			xlrec.nredirected = prstate.nredirected;
-			xlrec.ndead = prstate.ndead;
-
-			XLogBeginInsert();
-			XLogRegisterData((char *) &xlrec, SizeOfHeapPrune);
-
-			XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
-
-			/*
-			 * The OffsetNumber arrays are not actually in the buffer, but we
-			 * pretend that they are.  When XLogInsert stores the whole
-			 * buffer, the offset arrays need not be stored too.
-			 */
-			if (prstate.nredirected > 0)
-				XLogRegisterBufData(0, (char *) prstate.redirected,
-									prstate.nredirected *
-									sizeof(OffsetNumber) * 2);
-
-			if (prstate.ndead > 0)
-				XLogRegisterBufData(0, (char *) prstate.nowdead,
-									prstate.ndead * sizeof(OffsetNumber));
-
-			if (prstate.nunused > 0)
-				XLogRegisterBufData(0, (char *) prstate.nowunused,
-									prstate.nunused * sizeof(OffsetNumber));
-
-			recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_PRUNE);
-
-			PageSetLSN(BufferGetPage(buffer), recptr);
+			log_heap_prune_and_freeze(relation, buffer,
+									  prstate.snapshotConflictHorizon,
+									  false,
+									  NULL, 0,
+									  prstate.redirected, prstate.nredirected,
+									  prstate.nowdead, prstate.ndead,
+									  prstate.nowunused, prstate.nunused);
 		}
 	}
 	else
@@ -826,12 +799,14 @@ heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum)
 
 
 /*
- * Perform the actual page changes needed by heap_page_prune.
- * It is expected that the caller has a full cleanup lock on the
- * buffer.
+ * Perform the actual page pruning modifications needed by
+ * heap_page_prune().
+ *
+ * Unless 'lp_truncate_only' is set, it is expected that the caller has a full
+ * cleanup lock on the buffer.
  */
 void
-heap_page_prune_execute(Buffer buffer,
+heap_page_prune_execute(Buffer buffer, bool lp_truncate_only,
 						OffsetNumber *redirected, int nredirected,
 						OffsetNumber *nowdead, int ndead,
 						OffsetNumber *nowunused, int nunused)
@@ -843,6 +818,9 @@ heap_page_prune_execute(Buffer buffer,
 	/* Shouldn't be called unless there's something to do */
 	Assert(nredirected > 0 || ndead > 0 || nunused > 0);
 
+	/* With 'lp_truncate_only', we can only set LP_DEAD line pointers unused */
+	Assert(!lp_truncate_only || (nredirected == 0 && ndead == 0));
+
 	/* Update all redirected line pointers */
 	offnum = redirected;
 	for (int i = 0; i < nredirected; i++)
@@ -941,23 +919,29 @@ heap_page_prune_execute(Buffer buffer,
 
 #ifdef USE_ASSERT_CHECKING
 
-		/*
-		 * When heap_page_prune() was called, mark_unused_now may have been
-		 * passed as true, which allows would-be LP_DEAD items to be made
-		 * LP_UNUSED instead. This is only possible if the relation has no
-		 * indexes. If there are any dead items, then mark_unused_now was not
-		 * true and every item being marked LP_UNUSED must refer to a
-		 * heap-only tuple.
-		 */
-		if (ndead > 0)
+		if (lp_truncate_only)
 		{
-			Assert(ItemIdHasStorage(lp) && ItemIdIsNormal(lp));
-			htup = (HeapTupleHeader) PageGetItem(page, lp);
-			Assert(HeapTupleHeaderIsHeapOnly(htup));
+			/* Setting LP_DEAD to LP_UNUSED in vacuum's second pass */
+			Assert(ItemIdIsDead(lp) && !ItemIdHasStorage(lp));
 		}
 		else
 		{
-			Assert(ItemIdIsUsed(lp));
+			/*
+			 * When heap_page_prune() was called, mark_unused_now
+			 * may have been passed as true, which allows would-be LP_DEAD
+			 * items to be made LP_UNUSED instead. This is only possible if
+			 * the relation has no indexes. If there are any dead items, then
+			 * mark_unused_now was not true and every item being marked
+			 * LP_UNUSED must refer to a heap-only tuple.
+			 */
+			if (ndead > 0)
+			{
+				Assert(ItemIdHasStorage(lp) && ItemIdIsNormal(lp));
+				htup = (HeapTupleHeader) PageGetItem(page, lp);
+				Assert(HeapTupleHeaderIsHeapOnly(htup));
+			}
+			else
+				Assert(ItemIdIsUsed(lp));
 		}
 
 #endif
@@ -965,17 +949,22 @@ heap_page_prune_execute(Buffer buffer,
 		ItemIdSetUnused(lp);
 	}
 
-	/*
-	 * Finally, repair any fragmentation, and update the page's hint bit about
-	 * whether it has free pointers.
-	 */
-	PageRepairFragmentation(page);
+	if (lp_truncate_only)
+		PageTruncateLinePointerArray(page);
+	else
+	{
+		/*
+		 * Finally, repair any fragmentation, and update the page's hint bit
+		 * about whether it has free pointers.
+		 */
+		PageRepairFragmentation(page);
 
-	/*
-	 * Now that the page has been modified, assert that redirect items still
-	 * point to valid targets.
-	 */
-	page_verify_redirects(page);
+		/*
+		 * Now that the page has been modified, assert that redirect items
+		 * still point to valid targets.
+		 */
+		page_verify_redirects(page);
+	}
 }
 
 
@@ -1144,3 +1133,286 @@ heap_get_root_tuples(Page page, OffsetNumber *root_offsets)
 		}
 	}
 }
+
+
+/*
+ * Compare fields that describe actions required to freeze tuple with caller's
+ * open plan.  If everything matches then the frz tuple plan is equivalent to
+ * caller's plan.
+ */
+static inline bool
+heap_log_freeze_eq(xl_heap_freeze_plan *plan, HeapTupleFreeze *frz)
+{
+	if (plan->xmax == frz->xmax &&
+		plan->t_infomask2 == frz->t_infomask2 &&
+		plan->t_infomask == frz->t_infomask &&
+		plan->frzflags == frz->frzflags)
+		return true;
+
+	/* Caller must call heap_log_freeze_new_plan again for frz */
+	return false;
+}
+
+
+/*
+ * Comparator used to deduplicate the freeze plans in an XLOG_HEAP2_PRUNE record
+ */
+static int
+heap_log_freeze_cmp(const void *arg1, const void *arg2)
+{
+	HeapTupleFreeze *frz1 = (HeapTupleFreeze *) arg1;
+	HeapTupleFreeze *frz2 = (HeapTupleFreeze *) arg2;
+
+	if (frz1->xmax < frz2->xmax)
+		return -1;
+	else if (frz1->xmax > frz2->xmax)
+		return 1;
+
+	if (frz1->t_infomask2 < frz2->t_infomask2)
+		return -1;
+	else if (frz1->t_infomask2 > frz2->t_infomask2)
+		return 1;
+
+	if (frz1->t_infomask < frz2->t_infomask)
+		return -1;
+	else if (frz1->t_infomask > frz2->t_infomask)
+		return 1;
+
+	if (frz1->frzflags < frz2->frzflags)
+		return -1;
+	else if (frz1->frzflags > frz2->frzflags)
+		return 1;
+
+	/*
+	 * heap_log_freeze_eq would consider these tuple-wise plans to be equal.
+	 * (So the tuples will share a single canonical freeze plan.)
+	 *
+	 * We tiebreak on page offset number to keep each freeze plan's page
+	 * offset number array individually sorted. (Unnecessary, but be tidy.)
+	 */
+	if (frz1->offset < frz2->offset)
+		return -1;
+	else if (frz1->offset > frz2->offset)
+		return 1;
+
+	Assert(false);
+	return 0;
+}
+
+/*
+ * Start new plan initialized using tuple-level actions.  At least one tuple
+ * will have steps required to freeze described by caller's plan during REDO.
+ */
+static inline void
+heap_log_freeze_new_plan(xl_heap_freeze_plan *plan, HeapTupleFreeze *frz)
+{
+	plan->xmax = frz->xmax;
+	plan->t_infomask2 = frz->t_infomask2;
+	plan->t_infomask = frz->t_infomask;
+	plan->frzflags = frz->frzflags;
+	plan->ntuples = 1;			/* for now */
+}
+
+/*
+ * Deduplicate tuple-based freeze plans so that each distinct set of
+ * processing steps is only stored once in XLOG_HEAP2_PRUNE records.
+ * Called during original execution of freezing (for logged relations).
+ *
+ * Return value is number of plans set in *plans_out for caller.  Also writes
+ * an array of offset numbers into *offsets_out output argument for caller
+ * (actually there is one array per freeze plan, but that's not of immediate
+ * concern to our caller).
+ */
+static int
+heap_log_freeze_plan(HeapTupleFreeze *tuples, int ntuples,
+					 xl_heap_freeze_plan *plans_out,
+					 OffsetNumber *offsets_out)
+{
+	int			nplans = 0;
+
+	/* Sort tuple-based freeze plans in the order required to deduplicate */
+	qsort(tuples, ntuples, sizeof(HeapTupleFreeze), heap_log_freeze_cmp);
+
+	for (int i = 0; i < ntuples; i++)
+	{
+		HeapTupleFreeze *frz = tuples + i;
+
+		if (i == 0)
+		{
+			/* New canonical freeze plan starting with first tup */
+			heap_log_freeze_new_plan(plans_out, frz);
+			nplans++;
+		}
+		else if (heap_log_freeze_eq(plans_out, frz))
+		{
+			/* tup matches open canonical plan -- include tup in it */
+			Assert(offsets_out[i - 1] < frz->offset);
+			plans_out->ntuples++;
+		}
+		else
+		{
+			/* Tup doesn't match current plan -- done with it now */
+			plans_out++;
+
+			/* New canonical freeze plan starting with this tup */
+			heap_log_freeze_new_plan(plans_out, frz);
+			nplans++;
+		}
+
+		/*
+		 * Save page offset number in dedicated buffer in passing.
+		 *
+		 * REDO routine relies on the record's offset numbers array grouping
+		 * offset numbers by freeze plan.  The sort order within each grouping
+		 * is ascending offset number order, just to keep things tidy.
+		 */
+		offsets_out[i] = frz->offset;
+	}
+
+	Assert(nplans > 0 && nplans <= ntuples);
+
+	return nplans;
+}
+
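+/*
+ * Write an XLOG_HEAP2_PRUNE WAL record describing changes already made to
+ * the page in 'buffer': freezing tuples according to 'frozen', and/or
+ * marking items redirected, dead, or unused.  Any of the arrays may be
+ * empty.  Note that the 'frozen' array is sorted in-place.
+ */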
+void
+log_heap_prune_and_freeze(Relation relation, Buffer buffer,
+						  TransactionId conflict_xid,
+						  bool lp_truncate_only,
+						  HeapTupleFreeze *frozen, int nfrozen,
+						  OffsetNumber *redirected, int nredirected,
+						  OffsetNumber *dead, int ndead,
+						  OffsetNumber *unused, int nunused)
+{
+	xl_heap_prune xlrec;
+	xlhp_conflict_horizon horizon;
+	XLogRecPtr	recptr;
+	xlhp_freeze freeze;
+	xlhp_prune_items redirect_items,
+				dead_items,
+				unused_items;
+
+	int			nplans = 0;
+	xl_heap_freeze_plan plans[MaxHeapTuplesPerPage];
+	OffsetNumber frz_offsets[MaxHeapTuplesPerPage];
+	bool		do_freeze = (nfrozen > 0);
+
+	xlrec.flags = 0;
+
+	if (lp_truncate_only)
+	{
+		xlrec.flags |= XLHP_LP_TRUNCATE_ONLY;
+		Assert(nfrozen == 0 && nredirected == 0 && ndead == 0);
+	}
+
+	if (RelationIsAccessibleInLogicalDecoding(relation))
+		xlrec.flags |= XLHP_IS_CATALOG_REL;
+
+	if (TransactionIdIsValid(conflict_xid))
+		xlrec.flags |= XLHP_HAS_CONFLICT_HORIZON;
+
+	/*
+	 * Prepare the deduplicated representation for use in the WAL record.
+	 * Note that this destructively sorts the 'frozen' tuples array in-place.
+	 */
+	if (do_freeze)
+		nplans = heap_log_freeze_plan(frozen, nfrozen, plans, frz_offsets);
+	XLogBeginInsert();
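+
+	/*
+	 * Note: XLogInsert copies the registered data when the record is
+	 * inserted, not here, so it is OK that we continue to set flag bits in
+	 * xlrec.flags below, after registering xlrec.
+	 */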
+	XLogRegisterData((char *) &xlrec, SizeOfHeapPrune);
+
+	if (TransactionIdIsValid(conflict_xid))
+	{
+		horizon.xid = conflict_xid;
+		XLogRegisterData((char *) &horizon, SizeOfSnapshotConflictHorizon);
+	}
+
+	XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+
+	/*
+	 * The OffsetNumber arrays are not actually in the buffer, but we pretend
+	 * that they are.  When XLogInsert stores the whole buffer, the offset
+	 * arrays need not be stored too.
+	 */
+	if (nplans > 0)
+	{
+		xlrec.flags |= XLHP_HAS_FREEZE_PLANS;
+
+		freeze = (xlhp_freeze)
+		{
+			.nplans = nplans
+		};
+
+		XLogRegisterBufData(0, (char *) &freeze, offsetof(xlhp_freeze, plans));
+
+		XLogRegisterBufData(0, (char *) plans,
+							sizeof(xl_heap_freeze_plan) * freeze.nplans);
+	}
+
+
+	if (nredirected > 0)
+	{
+		xlrec.flags |= XLHP_HAS_REDIRECTIONS;
+
+		redirect_items = (xlhp_prune_items)
+		{
+			.ntargets = nredirected
+		};
+
+		XLogRegisterBufData(0, (char *) &redirect_items,
+							offsetof(xlhp_prune_items, data));
+
+		XLogRegisterBufData(0, (char *) redirected,
+							sizeof(OffsetNumber[2]) * nredirected);
+	}
+
+	if (ndead > 0)
+	{
+		xlrec.flags |= XLHP_HAS_DEAD_ITEMS;
+
+		dead_items = (xlhp_prune_items)
+		{
+			.ntargets = ndead
+		};
+
+		XLogRegisterBufData(0, (char *) &dead_items,
+							offsetof(xlhp_prune_items, data));
+
+		XLogRegisterBufData(0, (char *) dead,
+							sizeof(OffsetNumber) * dead_items.ntargets);
+	}
+
+	if (nunused > 0)
+	{
+		xlrec.flags |= XLHP_HAS_NOW_UNUSED_ITEMS;
+
+		unused_items = (xlhp_prune_items)
+		{
+			.ntargets = nunused
+		};
+
+		XLogRegisterBufData(0, (char *) &unused_items,
+							offsetof(xlhp_prune_items, data));
+
+		XLogRegisterBufData(0, (char *) unused,
+							sizeof(OffsetNumber) * unused_items.ntargets);
+	}
+
+	if (nplans > 0)
+		XLogRegisterBufData(0, (char *) frz_offsets,
+							sizeof(OffsetNumber) * nfrozen);
+
+	recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_PRUNE);
+
+	PageSetLSN(BufferGetPage(buffer), recptr);
+}
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 18004907750..25e8f0c30a7 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -2546,20 +2546,12 @@ lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer,
 	/* XLOG stuff */
 	if (RelationNeedsWAL(vacrel->rel))
 	{
-		xl_heap_vacuum xlrec;
-		XLogRecPtr	recptr;
-
-		xlrec.nunused = nunused;
-
-		XLogBeginInsert();
-		XLogRegisterData((char *) &xlrec, SizeOfHeapVacuum);
-
-		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
-		XLogRegisterBufData(0, (char *) unused, nunused * sizeof(OffsetNumber));
-
-		recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_VACUUM);
-
-		PageSetLSN(page, recptr);
+		log_heap_prune_and_freeze(vacrel->rel, buffer,
+								  InvalidTransactionId, true,
+								  NULL, 0,	/* frozen */
+								  NULL, 0,	/* redirected */
+								  NULL, 0,	/* dead */
+								  unused, nunused);
 	}
 
 	/*
diff --git a/src/backend/access/rmgrdesc/heapdesc.c b/src/backend/access/rmgrdesc/heapdesc.c
index 36a3d83c8c2..0d7edffff20 100644
--- a/src/backend/access/rmgrdesc/heapdesc.c
+++ b/src/backend/access/rmgrdesc/heapdesc.c
@@ -91,6 +91,74 @@ plan_elem_desc(StringInfo buf, void *plan, void *data)
 	appendStringInfoString(buf, " }");
 }
 
+
+/*
+ * Given the MAXALIGNed buffer returned by XLogRecGetBlockData(), pointed to
+ * by 'cursor', and the xl_heap_prune flags, deserialize the freeze plans and
+ * the arrays of OffsetNumbers contained in an xl_heap_prune record. This
+ * lives in this file so that it can be shared between the heap2_redo and
+ * heap2_desc code; the latter is also used in frontend code.
+ */
+void
+heap_xlog_deserialize_prune_and_freeze(char *cursor, uint8 flags,
+									   int *nredirected, OffsetNumber **redirected,
+									   int *ndead, OffsetNumber **nowdead,
+									   int *nunused, OffsetNumber **nowunused,
+									   int *nplans, xl_heap_freeze_plan **plans,
+									   OffsetNumber **frz_offsets)
+{
+	if (flags & XLHP_HAS_FREEZE_PLANS)
+	{
+		xlhp_freeze *freeze = (xlhp_freeze *) cursor;
+
+		*nplans = freeze->nplans;
+		Assert(*nplans > 0);
+		*plans = freeze->plans;
+
+		cursor += offsetof(xlhp_freeze, plans);
+		cursor += sizeof(xl_heap_freeze_plan) * freeze->nplans;
+	}
+
+	if (flags & XLHP_HAS_REDIRECTIONS)
+	{
+		xlhp_prune_items *subrecord = (xlhp_prune_items *) cursor;
+
+		*nredirected = subrecord->ntargets;
+		Assert(*nredirected > 0);
+		*redirected = &subrecord->data[0];
+
+		cursor += offsetof(xlhp_prune_items, data);
+		cursor += sizeof(OffsetNumber[2]) * *nredirected;
+	}
+
+	if (flags & XLHP_HAS_DEAD_ITEMS)
+	{
+		xlhp_prune_items *subrecord = (xlhp_prune_items *) cursor;
+
+		*ndead = subrecord->ntargets;
+		Assert(*ndead > 0);
+		*nowdead = subrecord->data;
+
+		cursor += offsetof(xlhp_prune_items, data);
+		cursor += sizeof(OffsetNumber) * *ndead;
+	}
+
+	if (flags & XLHP_HAS_NOW_UNUSED_ITEMS)
+	{
+		xlhp_prune_items *subrecord = (xlhp_prune_items *) cursor;
+
+		*nunused = subrecord->ntargets;
+		Assert(*nunused > 0);
+		*nowunused = subrecord->data;
+
+		cursor += offsetof(xlhp_prune_items, data);
+		cursor += sizeof(OffsetNumber) * *nunused;
+	}
+
+	if (*nplans > 0)
+		*frz_offsets = (OffsetNumber *) cursor;
+}
+
 void
 heap_desc(StringInfo buf, XLogReaderState *record)
 {
@@ -179,82 +247,68 @@ heap2_desc(StringInfo buf, XLogReaderState *record)
 	{
 		xl_heap_prune *xlrec = (xl_heap_prune *) rec;
 
-		appendStringInfo(buf, "snapshotConflictHorizon: %u, nredirected: %u, ndead: %u, isCatalogRel: %c",
-						 xlrec->snapshotConflictHorizon,
-						 xlrec->nredirected,
-						 xlrec->ndead,
-						 xlrec->isCatalogRel ? 'T' : 'F');
-
-		if (XLogRecHasBlockData(record, 0))
-		{
-			OffsetNumber *end;
-			OffsetNumber *redirected;
-			OffsetNumber *nowdead;
-			OffsetNumber *nowunused;
-			int			nredirected;
-			int			nunused;
-			Size		datalen;
-
-			redirected = (OffsetNumber *) XLogRecGetBlockData(record, 0,
-															  &datalen);
-
-			nredirected = xlrec->nredirected;
-			end = (OffsetNumber *) ((char *) redirected + datalen);
-			nowdead = redirected + (nredirected * 2);
-			nowunused = nowdead + xlrec->ndead;
-			nunused = (end - nowunused);
-			Assert(nunused >= 0);
-
-			appendStringInfo(buf, ", nunused: %d", nunused);
-
-			appendStringInfoString(buf, ", redirected:");
-			array_desc(buf, redirected, sizeof(OffsetNumber) * 2,
-					   nredirected, &redirect_elem_desc, NULL);
-			appendStringInfoString(buf, ", dead:");
-			array_desc(buf, nowdead, sizeof(OffsetNumber), xlrec->ndead,
-					   &offset_elem_desc, NULL);
-			appendStringInfoString(buf, ", unused:");
-			array_desc(buf, nowunused, sizeof(OffsetNumber), nunused,
-					   &offset_elem_desc, NULL);
-		}
-	}
-	else if (info == XLOG_HEAP2_VACUUM)
-	{
-		xl_heap_vacuum *xlrec = (xl_heap_vacuum *) rec;
-
-		appendStringInfo(buf, "nunused: %u", xlrec->nunused);
-
-		if (XLogRecHasBlockData(record, 0))
+		if (xlrec->flags & XLHP_HAS_CONFLICT_HORIZON)
 		{
-			OffsetNumber *nowunused;
-
-			nowunused = (OffsetNumber *) XLogRecGetBlockData(record, 0, NULL);
+			xlhp_conflict_horizon *horizon = (xlhp_conflict_horizon *) ((char *) xlrec + SizeOfHeapPrune);
 
-			appendStringInfoString(buf, ", unused:");
-			array_desc(buf, nowunused, sizeof(OffsetNumber), xlrec->nunused,
-					   &offset_elem_desc, NULL);
+			appendStringInfo(buf, "snapshotConflictHorizon: %u, ",
+							 horizon->xid);
 		}
-	}
-	else if (info == XLOG_HEAP2_FREEZE_PAGE)
-	{
-		xl_heap_freeze_page *xlrec = (xl_heap_freeze_page *) rec;
 
-		appendStringInfo(buf, "snapshotConflictHorizon: %u, nplans: %u, isCatalogRel: %c",
-						 xlrec->snapshotConflictHorizon, xlrec->nplans,
-						 xlrec->isCatalogRel ? 'T' : 'F');
+		appendStringInfo(buf, "isCatalogRel: %c",
+						 xlrec->flags & XLHP_IS_CATALOG_REL ? 'T' : 'F');
 
 		if (XLogRecHasBlockData(record, 0))
 		{
-			xl_heap_freeze_plan *plans;
-			OffsetNumber *offsets;
-
-			plans = (xl_heap_freeze_plan *) XLogRecGetBlockData(record, 0, NULL);
-			offsets = (OffsetNumber *) ((char *) plans +
-										(xlrec->nplans *
-										 sizeof(xl_heap_freeze_plan)));
-			appendStringInfoString(buf, ", plans:");
-			array_desc(buf, plans, sizeof(xl_heap_freeze_plan), xlrec->nplans,
-					   &plan_elem_desc, &offsets);
+			Size		datalen;
+			OffsetNumber *redirected = NULL;
+			OffsetNumber *nowdead = NULL;
+			OffsetNumber *nowunused = NULL;
+			int			nredirected = 0;
+			int			nunused = 0;
+			int			ndead = 0;
+			int			nplans = 0;
+			xl_heap_freeze_plan *plans = NULL;
+			OffsetNumber *frz_offsets;
+
+			char	   *cursor = XLogRecGetBlockData(record, 0, &datalen);
+
+			heap_xlog_deserialize_prune_and_freeze(cursor, xlrec->flags,
+												   &nredirected, &redirected,
+												   &ndead, &nowdead,
+												   &nunused, &nowunused,
+												   &nplans, &plans, &frz_offsets);
+
+			appendStringInfo(buf, ", nredirected: %d, ndead: %d, nunused: %d, nplans: %d",
+							 nredirected, ndead, nunused, nplans);
+
+			if (nredirected > 0)
+			{
+				appendStringInfoString(buf, ", redirected:");
+				array_desc(buf, redirected, sizeof(OffsetNumber) * 2,
+						   nredirected, &redirect_elem_desc, NULL);
+			}
+
+			if (ndead > 0)
+			{
+				appendStringInfoString(buf, ", dead:");
+				array_desc(buf, nowdead, sizeof(OffsetNumber), ndead,
+						   &offset_elem_desc, NULL);
+			}
+
+			if (nunused > 0)
+			{
+				appendStringInfoString(buf, ", unused:");
+				array_desc(buf, nowunused, sizeof(OffsetNumber), nunused,
+						   &offset_elem_desc, NULL);
+			}
+
+			if (nplans > 0)
+			{
+				appendStringInfoString(buf, ", plans:");
+				array_desc(buf, plans, sizeof(xl_heap_freeze_plan), nplans,
+						   &plan_elem_desc, &frz_offsets);
+			}
 		}
 	}
 	else if (info == XLOG_HEAP2_VISIBLE)
@@ -358,12 +412,6 @@ heap2_identify(uint8 info)
 		case XLOG_HEAP2_PRUNE:
 			id = "PRUNE";
 			break;
-		case XLOG_HEAP2_VACUUM:
-			id = "VACUUM";
-			break;
-		case XLOG_HEAP2_FREEZE_PAGE:
-			id = "FREEZE_PAGE";
-			break;
 		case XLOG_HEAP2_VISIBLE:
 			id = "VISIBLE";
 			break;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index e5ab7b78b78..38d1bdd825e 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -445,9 +445,7 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 * Everything else here is just low level physical stuff we're not
 			 * interested in.
 			 */
-		case XLOG_HEAP2_FREEZE_PAGE:
 		case XLOG_HEAP2_PRUNE:
-		case XLOG_HEAP2_VACUUM:
 		case XLOG_HEAP2_VISIBLE:
 		case XLOG_HEAP2_LOCK_UPDATED:
 			break;
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 4b133f68593..ca6ddab91ea 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -323,11 +323,18 @@ extern void heap_page_prune(Relation relation, Buffer buffer,
 							bool mark_unused_now,
 							PruneResult *presult,
 							OffsetNumber *off_loc);
-extern void heap_page_prune_execute(Buffer buffer,
+extern void heap_page_prune_execute(Buffer buffer, bool lp_truncate_only,
 									OffsetNumber *redirected, int nredirected,
 									OffsetNumber *nowdead, int ndead,
 									OffsetNumber *nowunused, int nunused);
 extern void heap_get_root_tuples(Page page, OffsetNumber *root_offsets);
+extern void log_heap_prune_and_freeze(Relation relation, Buffer buffer,
+									  TransactionId conflict_xid,
+									  bool lp_truncate_only,
+									  HeapTupleFreeze *frozen, int nfrozen,
+									  OffsetNumber *redirected, int nredirected,
+									  OffsetNumber *dead, int ndead,
+									  OffsetNumber *unused, int nunused);
 
 /* in heap/vacuumlazy.c */
 struct VacuumParams;
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 6488dad5e64..dfeb703d136 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -52,12 +52,10 @@
  */
 #define XLOG_HEAP2_REWRITE		0x00
 #define XLOG_HEAP2_PRUNE		0x10
-#define XLOG_HEAP2_VACUUM		0x20
-#define XLOG_HEAP2_FREEZE_PAGE	0x30
-#define XLOG_HEAP2_VISIBLE		0x40
-#define XLOG_HEAP2_MULTI_INSERT 0x50
-#define XLOG_HEAP2_LOCK_UPDATED 0x60
-#define XLOG_HEAP2_NEW_CID		0x70
+#define XLOG_HEAP2_VISIBLE		0x20
+#define XLOG_HEAP2_MULTI_INSERT 0x30
+#define XLOG_HEAP2_LOCK_UPDATED 0x40
+#define XLOG_HEAP2_NEW_CID		0x50
 
 /*
  * xl_heap_insert/xl_heap_multi_insert flag values, 8 bits are available.
@@ -227,44 +225,108 @@ typedef struct xl_heap_update
 #define SizeOfHeapUpdate	(offsetof(xl_heap_update, new_offnum) + sizeof(OffsetNumber))
 
 /*
- * This is what we need to know about page pruning (both during VACUUM and
- * during opportunistic pruning)
+ * This is what we need to know about page pruning and freezing, both during
+ * VACUUM and during opportunistic pruning.
  *
- * The array of OffsetNumbers following the fixed part of the record contains:
- *	* for each redirected item: the item offset, then the offset redirected to
- *	* for each now-dead item: the item offset
- *	* for each now-unused item: the item offset
- * The total number of OffsetNumbers is therefore 2*nredirected+ndead+nunused.
- * Note that nunused is not explicitly stored, but may be found by reference
- * to the total record length.
+ * If XLHP_HAS_REDIRECTIONS or XLHP_HAS_DEAD_ITEMS is set, or if
+ * XLHP_HAS_NOW_UNUSED_ITEMS is set without XLHP_LP_TRUNCATE_ONLY, replay
+ * acquires a full cleanup lock; otherwise an ordinary exclusive lock suffices.
  *
- * Acquires a full cleanup lock.
+ * An exclusive lock is enough when the record only truncates the line
+ * pointer array (vacuum's second pass, marking LP_DEAD items LP_UNUSED),
+ * or when freezing was the only modification to the page.
+ *
+ * The data for block reference 0 contains "sub-records" depending on which
+ * of the XLHP_HAS_* flags are set; see the xlhp_* struct definitions below.
+ * The sub-records appear in the same order as the XLHP_* flags, and all
+ * offset number arrays (including those for the freeze plans) are stored in
+ * the block reference 0 data.
  */
 typedef struct xl_heap_prune
 {
-	TransactionId snapshotConflictHorizon;
-	uint16		nredirected;
-	uint16		ndead;
-	bool		isCatalogRel;	/* to handle recovery conflict during logical
-								 * decoding on standby */
-	/* OFFSET NUMBERS are in the block reference 0 */
+	uint8		flags;
 } xl_heap_prune;
 
-#define SizeOfHeapPrune (offsetof(xl_heap_prune, isCatalogRel) + sizeof(bool))
+/* to handle recovery conflict during logical decoding on standby */
+#define		XLHP_IS_CATALOG_REL			(1 << 1)
+
+/*
+ * During vacuum's second pass, which sets LP_DEAD items LP_UNUSED, we only
+ * truncate the line pointer array rather than calling
+ * PageRepairFragmentation().  We need this flag to differentiate what kind of
+ * lock (exclusive or cleanup) to take on the buffer and whether to call
+ * PageTruncateLinePointerArray() or PageRepairFragmentation().
+ */
+#define		XLHP_LP_TRUNCATE_ONLY       (1 << 2)
+
+/*
+ * Vacuum's first pass and on-access pruning may need to include a snapshot
+ * conflict horizon.
+ */
+#define		XLHP_HAS_CONFLICT_HORIZON   (1 << 3)
+#define		XLHP_HAS_FREEZE_PLANS		(1 << 4)
+#define		XLHP_HAS_REDIRECTIONS		(1 << 5)
+#define		XLHP_HAS_DEAD_ITEMS	        (1 << 6)
+#define		XLHP_HAS_NOW_UNUSED_ITEMS   (1 << 7)
+
+#define SizeOfHeapPrune (offsetof(xl_heap_prune, flags) + sizeof(uint8))
+
+typedef struct xlhp_conflict_horizon
+{
+	TransactionId xid;
+} xlhp_conflict_horizon;
+
+#define SizeOfSnapshotConflictHorizon (offsetof(xlhp_conflict_horizon, xid) + sizeof(uint32))
 
 /*
- * The vacuum page record is similar to the prune record, but can only mark
- * already LP_DEAD items LP_UNUSED (during VACUUM's second heap pass)
+ * This struct represents a 'freeze plan', which describes how to freeze a
+ * group of one or more heap tuples (appears in xl_heap_prune's xlhp_freeze
+ * record)
+ */
+/* 0x01 was XLH_FREEZE_XMIN */
+#define		XLH_FREEZE_XVAC		0x02
+#define		XLH_INVALID_XVAC	0x04
+
+typedef struct xl_heap_freeze_plan
+{
+	TransactionId xmax;
+	uint16		t_infomask2;
+	uint16		t_infomask;
+	uint8		frzflags;
+
+	/* Length of individual page offset numbers array for this plan */
+	uint16		ntuples;
+} xl_heap_freeze_plan;
+
+/*
+ * As of Postgres 17, XLOG_HEAP2_PRUNE records replace
+ * XLOG_HEAP2_FREEZE_PAGE records.
  *
- * Acquires an ordinary exclusive lock only.
+ * This is what we need to know about the tuples being frozen on a page.
+ *
+ * The sub-record contains an array of xl_heap_freeze_plan structs (with
+ * nplans elements), followed by one page offset number array per freeze
+ * plan; the offset arrays are stored at the end of the block 0 data.  The
+ * REDO routine freezes the corresponding heap tuples using each plan.
+ */
+typedef struct xlhp_freeze
+{
+	uint16		nplans;
+	xl_heap_freeze_plan plans[FLEXIBLE_ARRAY_MEMBER];
+} xlhp_freeze;
+
+/*
+ * Sub-record type contained in block reference 0 of a prune record when
+ * XLHP_HAS_REDIRECTIONS, XLHP_HAS_DEAD_ITEMS, or XLHP_HAS_NOW_UNUSED_ITEMS
+ * is set.  Note that in the XLHP_HAS_REDIRECTIONS variant there are actually
+ * 2 * ntargets OffsetNumbers in the data: a pair for each redirected item.
  */
-typedef struct xl_heap_vacuum
+typedef struct xlhp_prune_items
 {
-	uint16		nunused;
-	/* OFFSET NUMBERS are in the block reference 0 */
-} xl_heap_vacuum;
+	uint16		ntargets;
+	OffsetNumber data[FLEXIBLE_ARRAY_MEMBER];
+} xlhp_prune_items;
 
-#define SizeOfHeapVacuum (offsetof(xl_heap_vacuum, nunused) + sizeof(uint16))
 
 /* flags for infobits_set */
 #define XLHL_XMAX_IS_MULTI		0x01
@@ -315,47 +377,6 @@ typedef struct xl_heap_inplace
 
 #define SizeOfHeapInplace	(offsetof(xl_heap_inplace, offnum) + sizeof(OffsetNumber))
 
-/*
- * This struct represents a 'freeze plan', which describes how to freeze a
- * group of one or more heap tuples (appears in xl_heap_freeze_page record)
- */
-/* 0x01 was XLH_FREEZE_XMIN */
-#define		XLH_FREEZE_XVAC		0x02
-#define		XLH_INVALID_XVAC	0x04
-
-typedef struct xl_heap_freeze_plan
-{
-	TransactionId xmax;
-	uint16		t_infomask2;
-	uint16		t_infomask;
-	uint8		frzflags;
-
-	/* Length of individual page offset numbers array for this plan */
-	uint16		ntuples;
-} xl_heap_freeze_plan;
-
-/*
- * This is what we need to know about a block being frozen during vacuum
- *
- * Backup block 0's data contains an array of xl_heap_freeze_plan structs
- * (with nplans elements), followed by one or more page offset number arrays.
- * Each such page offset number array corresponds to a single freeze plan
- * (REDO routine freezes corresponding heap tuples using freeze plan).
- */
-typedef struct xl_heap_freeze_page
-{
-	TransactionId snapshotConflictHorizon;
-	uint16		nplans;
-	bool		isCatalogRel;	/* to handle recovery conflict during logical
-								 * decoding on standby */
-
-	/*
-	 * In payload of blk 0 : FREEZE PLANS and OFFSET NUMBER ARRAY
-	 */
-} xl_heap_freeze_page;
-
-#define SizeOfHeapFreezePage	(offsetof(xl_heap_freeze_page, isCatalogRel) + sizeof(bool))
-
 /*
  * This is what we need to know about setting a visibility map bit
  *
@@ -418,4 +439,11 @@ extern XLogRecPtr log_heap_visible(Relation rel, Buffer heap_buffer,
 								   TransactionId snapshotConflictHorizon,
 								   uint8 vmflags);
 
+extern void heap_xlog_deserialize_prune_and_freeze(char *cursor, uint8 flags,
+												   int *nredirected, OffsetNumber **redirected,
+												   int *ndead, OffsetNumber **nowdead,
+												   int *nunused, OffsetNumber **nowunused,
+												   int *nplans, xl_heap_freeze_plan **plans,
+												   OffsetNumber **frz_offsets);
+
 #endif							/* HEAPAM_XLOG_H */
-- 
2.39.2

