From 5adc1d3c6c4526b723a114ca00eacdea20143861 Mon Sep 17 00:00:00 2001
From: Himanshu Upadhyaya <himanshu.upadhyaya@enterprisedb.com>
Date: Tue, 6 Sep 2022 15:59:40 +0530
Subject: [PATCH v2] HOT chain validation in verify_heapam.

---
 contrib/amcheck/verify_heapam.c | 228 +++++++++++++++++++++++++++++---
 1 file changed, 206 insertions(+), 22 deletions(-)

diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index c875f3e5a2..cc69ccb239 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -158,6 +158,7 @@ static void check_toast_tuple(HeapTuple toasttup, HeapCheckContext *ctx,
 static bool check_tuple_attribute(HeapCheckContext *ctx);
 static void check_toasted_attribute(HeapCheckContext *ctx,
 									ToastedAttribute *ta);
+static bool check_lp(HeapCheckContext *ctx, uint16 lp_len, uint16 lp_off);
 
 static bool check_tuple_header(HeapCheckContext *ctx);
 static bool check_tuple_visibility(HeapCheckContext *ctx);
@@ -399,6 +400,7 @@ verify_heapam(PG_FUNCTION_ARGS)
 	for (ctx.blkno = first_block; ctx.blkno <= last_block; ctx.blkno++)
 	{
 		OffsetNumber maxoff;
+		OffsetNumber predecessor[MaxOffsetNumber] = {0};
 
 		CHECK_FOR_INTERRUPTS();
 
@@ -433,6 +435,10 @@ verify_heapam(PG_FUNCTION_ARGS)
 		for (ctx.offnum = FirstOffsetNumber; ctx.offnum <= maxoff;
 			 ctx.offnum = OffsetNumberNext(ctx.offnum))
 		{
+			OffsetNumber nextTupOffnum;
+			HeapTupleHeader htup;
+			OffsetNumber	currOffnum = ctx.offnum;
+
 			ctx.itemid = PageGetItemId(ctx.page, ctx.offnum);
 
 			/* Skip over unused/dead line pointers */
@@ -469,44 +475,178 @@ verify_heapam(PG_FUNCTION_ARGS)
 					report_corruption(&ctx,
 									  psprintf("line pointer redirection to unused item at offset %u",
 											   (unsigned) rdoffnum));
+
+				ctx.offnum = rdoffnum;
+				if (!check_lp(&ctx, ItemIdGetLength(rditem), ItemIdGetOffset(rditem)))
+					continue;
+				ctx.offnum = currOffnum;
+
+				htup = (HeapTupleHeader) PageGetItem(ctx.page, rditem);
+				if (!HeapTupleHeaderIsHeapOnly(htup))
+				{
+					report_corruption(&ctx,
+									  psprintf("redirected tuple at line pointer offset %u is not heap only tuple",
+											   (unsigned) rdoffnum));
+				}
+				if ((htup->t_infomask & HEAP_UPDATED) == 0)
+				{
+					report_corruption(&ctx,
+									  psprintf("redirected tuple at line pointer offset %u is not heap updated tuple",
+											   (unsigned) rdoffnum));
+				}
 				continue;
 			}
 
 			/* Sanity-check the line pointer's offset and length values */
 			ctx.lp_len = ItemIdGetLength(ctx.itemid);
 			ctx.lp_off = ItemIdGetOffset(ctx.itemid);
+			/*
+			 * A nonzero predecessor[] entry for this offset means its line
+			 * pointer was already sanity-checked, so don't check it again.
+			 */
+			if (predecessor[ctx.offnum] == 0 && !check_lp(&ctx, ctx.lp_len, ctx.lp_off))
+				continue;
+
+			/* It should be safe to examine the tuple's header, at least */
+			ctx.tuphdr = (HeapTupleHeader) PageGetItem(ctx.page, ctx.itemid);
+			ctx.natts = HeapTupleHeaderGetNatts(ctx.tuphdr);
+
+			/* Ok, ready to check this next tuple */
+			check_tuple(&ctx);
 
-			if (ctx.lp_off != MAXALIGN(ctx.lp_off))
+			/*
+			 * Record this tuple as the predecessor of the tuple its t_ctid
+			 * points to, provided that (1) the next tuple is on the same page
+			 * and (2) this tuple's xmax matches the next tuple's xmin.  Report
+			 * corruption if the next tuple already has a predecessor.
+			 *
+			 * The entry is recorded irrespective of the transaction status of
+			 * t_xmin.  Checks that depend on transaction status, and all other
+			 * chain checks, are done later when we loop over the predecessor
+			 * array.
+			 */
+			nextTupOffnum = ItemPointerGetOffsetNumber(&(ctx.tuphdr)->t_ctid);
+			if (ItemPointerGetBlockNumber(&(ctx.tuphdr)->t_ctid) == ctx.blkno &&
+				nextTupOffnum != ctx.offnum)
 			{
-				report_corruption(&ctx,
-								  psprintf("line pointer to page offset %u is not maximally aligned",
-										   ctx.lp_off));
+				TransactionId currTupXmax;
+				ItemId		lp;
+
+				currTupXmax = HeapTupleHeaderGetUpdateXid(ctx.tuphdr);
+				lp = PageGetItemId(ctx.page, nextTupOffnum);
+				/*
+				 * Sanity-check the next line pointer's offset and length
+				 * values.  ctx.offnum must point at the next tuple's offset
+				 * while doing so, since report_corruption (called by
+				 * check_lp) may refer to ctx.offnum.  The other ctx fields
+				 * (itemid, natts, etc.) are not updated, as we just move on
+				 * to the next tuple after populating the predecessor array.
+				 */
+				ctx.offnum = nextTupOffnum;
+				ctx.attnum = -1;
+				if (!check_lp(&ctx, ItemIdGetLength(lp), ItemIdGetOffset(lp)))
+					continue;
+				htup = (HeapTupleHeader) PageGetItem(ctx.page, lp);
+
+				/* Revert back to old offset for below corruption. */
+				ctx.offnum = currOffnum;
+
+				if (TransactionIdIsValid(currTupXmax) &&
+					TransactionIdEquals(HeapTupleHeaderGetXmin(htup), currTupXmax))
+				{
+					if (predecessor[nextTupOffnum] != 0)
+					{
+						report_corruption(&ctx,
+										  psprintf("updated version at offset %u is also the updated version of tuple at offset %u",
+												   (unsigned) nextTupOffnum, (unsigned) predecessor[nextTupOffnum]));
+						continue;
+					}
+					predecessor[nextTupOffnum] = ctx.offnum;
+				}
+				/* Non matching XMAX with XMIN is not a corruption */
+			}
+		}
+		/* Validate every tuple on this page that has a recorded predecessor. */
+		for (OffsetNumber currentoffnum = FirstOffsetNumber; currentoffnum <= maxoff;
+			 currentoffnum = OffsetNumberNext(currentoffnum))
+		{
+			HeapTupleHeader pred_htup;
+			HeapTupleHeader	curr_htup;
+			TransactionId pred_xmin;
+			TransactionId curr_xmin;
+			CommandId	pred_cmin;
+			CommandId	curr_cmin;
+			ItemId		pred_lp;
+			ItemId		curr_lp;
+
+			ctx.offnum = predecessor[currentoffnum];
+			ctx.attnum = -1;
+
+			if (ctx.offnum == 0)
+			{
+				/*
+				 * Either root of chain or xmin-aborted tuple from an
+				 * abandoned portion of a HOT chain.
+				 */
 				continue;
 			}
-			if (ctx.lp_len < MAXALIGN(SizeofHeapTupleHeader))
+
+			/*
+			 * Validation via predecessor array: 1) If the predecessor's xmin
+			 * is aborted or in progress, the current tuple's xmin must be the
+			 * same transaction (and hence also aborted or in progress). 2) If
+			 * the predecessor's xmin is not frozen, then the current tuple's
+			 * xmin shouldn't be either. 3) If the predecessor's xmin is equal
+			 * to the current tuple's xmin, the current tuple's cmin should be
+			 * greater than the predecessor's cmin. 4) If the current tuple is
+			 * not HOT then its predecessor must not be HEAP_HOT_UPDATED. 5)
+			 * If the current tuple is HOT then its predecessor must be
+			 * HEAP_HOT_UPDATED.  Here currentoffnum is the current tuple and
+			 * ctx.offnum (predecessor[currentoffnum]) is its predecessor.
+			 */
+			curr_lp = PageGetItemId(ctx.page, currentoffnum);
+			curr_htup = (HeapTupleHeader) PageGetItem(ctx.page, curr_lp);
+			curr_xmin = HeapTupleHeaderGetXmin(curr_htup);
+			curr_cmin = HeapTupleHeaderGetRawCommandId(curr_htup);
+
+			ctx.itemid = pred_lp = PageGetItemId(ctx.page, ctx.offnum);
+			pred_htup = (HeapTupleHeader) PageGetItem(ctx.page, pred_lp);
+			pred_xmin = HeapTupleHeaderGetXmin(pred_htup);
+			pred_cmin = HeapTupleHeaderGetRawCommandId(pred_htup);
+
+			if ((TransactionIdDidAbort(pred_xmin) || TransactionIdIsInProgress(pred_xmin))
+				&& !TransactionIdEquals(pred_xmin, curr_xmin))
 			{
 				report_corruption(&ctx,
-								  psprintf("line pointer length %u is less than the minimum tuple header size %u",
-										   ctx.lp_len,
-										   (unsigned) MAXALIGN(SizeofHeapTupleHeader)));
-				continue;
+								  psprintf("tuple with uncommitted xmin %u was updated to produce a tuple at offset %u with differing xmin %u",
+										   (unsigned) pred_xmin, (unsigned) currentoffnum, (unsigned) curr_xmin));
 			}
-			if (ctx.lp_off + ctx.lp_len > BLCKSZ)
+			if (pred_xmin != FrozenTransactionId && curr_xmin == FrozenTransactionId)
 			{
 				report_corruption(&ctx,
-								  psprintf("line pointer to page offset %u with length %u ends beyond maximum page offset %u",
-										   ctx.lp_off,
-										   ctx.lp_len,
-										   (unsigned) BLCKSZ));
-				continue;
+								  psprintf("unfrozen tuple was updated to produce a tuple at offset %u which is frozen",
+										   (unsigned) currentoffnum));
+			}
+			if (pred_xmin == curr_xmin && pred_cmin >= curr_cmin)
+			{
+				report_corruption(&ctx,
+								  psprintf("tuple with xmin %u has cmin %u was updated to produce a tuple at offset %u with same xmin but has cmin %u",
+										   (unsigned) pred_xmin, (unsigned) pred_cmin, (unsigned) currentoffnum, (unsigned) curr_cmin));
+			}
+			if (HeapTupleHeaderIsHeapOnly(curr_htup) &&
+				!HeapTupleHeaderIsHotUpdated(pred_htup))
+			{
+				report_corruption(&ctx,
+								  psprintf("non-heap-only update produced a heap-only tuple at offset %u",
+										   (unsigned) currentoffnum));
+			}
+			if (!HeapTupleHeaderIsHeapOnly(curr_htup) &&
+				HeapTupleHeaderIsHotUpdated(pred_htup))
+			{
+				report_corruption(&ctx,
+								  psprintf("heap-only update produced a non-heap only tuple at offset %u",
+										   (unsigned) currentoffnum));
 			}
-
-			/* It should be safe to examine the tuple's header, at least */
-			ctx.tuphdr = (HeapTupleHeader) PageGetItem(ctx.page, ctx.itemid);
-			ctx.natts = HeapTupleHeaderGetNatts(ctx.tuphdr);
-
-			/* Ok, ready to check this next tuple */
-			check_tuple(&ctx);
 		}
 
 		/* clean up */
@@ -612,6 +752,43 @@ report_toast_corruption(HeapCheckContext *ctx, ToastedAttribute *ta,
 	ctx->is_corrupt = true;
 }
 
+
+/*
+ * Validate a line pointer's offset and length.  Every failed check is
+ * reported through report_corruption(); returns false if any check failed.
+ */
+static bool
+check_lp(HeapCheckContext *ctx, uint16 lp_len, uint16 lp_off)
+{
+	bool		lp_ok = true;
+
+	/* the item's page offset must be maximally aligned */
+	if (MAXALIGN(lp_off) != lp_off)
+	{
+		lp_ok = false;
+		report_corruption(ctx,
+						  psprintf("line pointer to page offset %u is not maximally aligned",
+								   lp_off));
+	}
+	/* the item must be at least as long as a minimal tuple header */
+	if (MAXALIGN(SizeofHeapTupleHeader) > lp_len)
+	{
+		lp_ok = false;
+		report_corruption(ctx,
+						  psprintf("line pointer length %u is less than the minimum tuple header size %u",
+								   lp_len, (unsigned) MAXALIGN(SizeofHeapTupleHeader)));
+	}
+	/* the item must not extend past BLCKSZ */
+	if (lp_off + lp_len > BLCKSZ)
+	{
+		lp_ok = false;
+		report_corruption(ctx,
+						  psprintf("line pointer to page offset %u with length %u ends beyond maximum page offset %u",
+								   lp_off, lp_len, (unsigned) BLCKSZ));
+	}
+	return lp_ok;
+}
+
 /*
  * Check for tuple header corruption.
  *
@@ -640,6 +817,7 @@ check_tuple_header(HeapCheckContext *ctx)
 {
 	HeapTupleHeader tuphdr = ctx->tuphdr;
 	uint16		infomask = tuphdr->t_infomask;
+	TransactionId curr_xmax = HeapTupleHeaderGetUpdateXid(tuphdr);
 	bool		result = true;
 	unsigned	expected_hoff;
 
@@ -651,6 +829,12 @@ check_tuple_header(HeapCheckContext *ctx)
 		result = false;
 	}
 
+	if (!TransactionIdIsValid(curr_xmax) && HeapTupleHeaderIsHotUpdated(tuphdr))
+	{
+		report_corruption(ctx,
+						  psprintf("tuple has been updated, but xmax is 0"));
+		result = false;
+	}
 	if ((ctx->tuphdr->t_infomask & HEAP_XMAX_COMMITTED) &&
 		(ctx->tuphdr->t_infomask & HEAP_XMAX_IS_MULTI))
 	{
-- 
2.25.1

