From 2c8c6c737259d3fff0026187f029987a57517b76 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Fri, 29 Mar 2024 15:22:18 -0400
Subject: [PATCH v10 03/10] Mark all line pointers during pruning

In anticipation of preparing to freeze and counting tuples which are not
candidates for pruning, this commit introduces heap_prune_record*()
functions for marking a line pointer which will not change. It also
introduces an assert to check that every offset was marked once.
---
 src/backend/access/heap/pruneheap.c | 108 +++++++++++++++++++++++++---
 1 file changed, 98 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index c89b0f613d..6188c5b2f3 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -69,6 +69,11 @@ static void heap_prune_record_redirect(PruneState *prstate,
 static void heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum, bool was_normal);
 static void heap_prune_record_dead_or_unused(PruneState *prstate, OffsetNumber offnum, bool was_normal);
 static void heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum, bool was_normal);
+
+static void heap_prune_record_unchanged(Page page, PruneState *prstate, OffsetNumber offnum);
+static void heap_prune_record_unchanged_lp_dead(PruneState *prstate, OffsetNumber offnum);
+static void heap_prune_record_unchanged_lp_redirect(PruneState *prstate, OffsetNumber offnum);
+
 static void page_verify_redirects(Page page);
 
 
@@ -330,6 +335,34 @@ heap_page_prune(Relation relation, Buffer buffer,
 		heap_prune_chain(buffer, offnum, presult->htsv, &prstate);
 	}
 
+	for (offnum = FirstOffsetNumber;
+		 offnum <= maxoff;
+		 offnum = OffsetNumberNext(offnum))
+	{
+		ItemId		itemid = PageGetItemId(page, offnum);
+
+		if (ItemIdIsUsed(itemid) && !prstate.marked[offnum])
+			heap_prune_record_unchanged(page, &prstate, offnum);
+	}
+
+/* We should now have processed every tuple exactly once  */
+#ifdef USE_ASSERT_CHECKING
+	for (offnum = FirstOffsetNumber;
+		 offnum <= maxoff;
+		 offnum = OffsetNumberNext(offnum))
+	{
+		ItemId		itemid;
+
+		if (off_loc)
+			*off_loc = offnum;
+		itemid = PageGetItemId(page, offnum);
+		if (ItemIdIsUsed(itemid))
+			Assert(prstate.marked[offnum]);
+		else
+			Assert(!prstate.marked[offnum]);
+	}
+#endif
+
 	/* Clear the offset information once we have processed the given page. */
 	if (off_loc)
 		*off_loc = InvalidOffsetNumber;
@@ -583,6 +616,8 @@ heap_prune_chain(Buffer buffer, OffsetNumber rootoffnum,
 			 */
 			if (unlikely(prstate->mark_unused_now))
 				heap_prune_record_unused(prstate, offnum, false);
+			else
+				heap_prune_record_unchanged_lp_dead(prstate, offnum);
 
 			break;
 		}
@@ -702,10 +737,18 @@ process_chains:
 
 	if (ndeadchain == 0)
 	{
+		int			i;
+
 		/*
-		 * If no DEAD tuple was found, the chain is entirely composed of
-		 * normal, unchanged tuples, leave it alone.
+		 * If no DEAD tuple was found, and the root is redirected, mark it as
+		 * such.
 		 */
+		if ((i = ItemIdIsRedirected(rootlp)))
+			heap_prune_record_unchanged_lp_redirect(prstate, rootoffnum);
+
+		/* the rest of tuples in the chain are normal, unchanged tuples */
+		for (; i < nchain; i++)
+			heap_prune_record_unchanged(dp, prstate, chainitems[i]);
 	}
 	else if (ndeadchain == nchain)
 	{
@@ -736,6 +779,8 @@ process_chains:
 			heap_prune_record_unused(prstate, chainitems[i], true);
 
 		/* the rest of tuples in the chain are normal, unchanged tuples */
+		for (int i = ndeadchain; i < nchain; i++)
+			heap_prune_record_unchanged(dp, prstate, chainitems[i]);
 	}
 }
 
@@ -759,14 +804,19 @@ heap_prune_record_redirect(PruneState *prstate,
 						   OffsetNumber offnum, OffsetNumber rdoffnum,
 						   bool was_normal)
 {
+	Assert(!prstate->marked[offnum]);
+	prstate->marked[offnum] = true;
+
+	/*
+	 * Do not mark the redirect target here.  It needs to be counted
+	 * separately as an unchanged tuple.
+	 */
+
 	Assert(prstate->nredirected < MaxHeapTuplesPerPage);
 	prstate->redirected[prstate->nredirected * 2] = offnum;
 	prstate->redirected[prstate->nredirected * 2 + 1] = rdoffnum;
+
 	prstate->nredirected++;
-	Assert(!prstate->marked[offnum]);
-	prstate->marked[offnum] = true;
-	Assert(!prstate->marked[rdoffnum]);
-	prstate->marked[rdoffnum] = true;
 
 	/*
 	 * If the root entry had been a normal tuple, we are deleting it, so count
@@ -782,11 +832,12 @@ static void
 heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum,
 					   bool was_normal)
 {
+	Assert(!prstate->marked[offnum]);
+	prstate->marked[offnum] = true;
+
 	Assert(prstate->ndead < MaxHeapTuplesPerPage);
 	prstate->nowdead[prstate->ndead] = offnum;
 	prstate->ndead++;
-	Assert(!prstate->marked[offnum]);
-	prstate->marked[offnum] = true;
 
 	/*
 	 * If the root entry had been a normal tuple, we are deleting it, so count
@@ -823,11 +874,12 @@ heap_prune_record_dead_or_unused(PruneState *prstate, OffsetNumber offnum,
 static void
 heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum, bool was_normal)
 {
+	Assert(!prstate->marked[offnum]);
+	prstate->marked[offnum] = true;
+
 	Assert(prstate->nunused < MaxHeapTuplesPerPage);
 	prstate->nowunused[prstate->nunused] = offnum;
 	prstate->nunused++;
-	Assert(!prstate->marked[offnum]);
-	prstate->marked[offnum] = true;
 
 	/*
 	 * If the root entry had been a normal tuple, we are deleting it, so count
@@ -838,6 +890,42 @@ heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum, bool was_norm
 		prstate->ndeleted++;
 }
 
+/*
+ * Record LP_NORMAL line pointer that is left unchanged.
+ */
+static void
+heap_prune_record_unchanged(Page page, PruneState *prstate, OffsetNumber offnum)
+{
+	Assert(!prstate->marked[offnum]);
+	prstate->marked[offnum] = true;
+}
+
+
+/*
+ * Record line pointer that was already LP_DEAD and is left unchanged.
+ */
+static void
+heap_prune_record_unchanged_lp_dead(PruneState *prstate, OffsetNumber offnum)
+{
+	Assert(!prstate->marked[offnum]);
+	prstate->marked[offnum] = true;
+}
+
+
+static void
+heap_prune_record_unchanged_lp_redirect(PruneState *prstate, OffsetNumber offnum)
+{
+	/*
+	 * A redirect line pointer doesn't count as a live tuple.
+	 *
+	 * If we leave a redirect line pointer in place, there will be another
+	 * tuple on the page that it points to.  We will do the bookkeeping for
+	 * that separately. So we have nothing to do here, except remember that we
+	 * processed this item.
+	 */
+	Assert(!prstate->marked[offnum]);
+	prstate->marked[offnum] = true;
+}
 
 /*
  * Perform the actual page changes needed by heap_page_prune.
-- 
2.40.1

