From f12b1105e4a0c356083a8dbae81e6a2c11da7bbe Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 5 Apr 2019 22:01:51 -0700
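Subject: [PATCH] Convert gist to compute page level xid horizon on primary.

gistprunepage() now computes latestRemovedXid on the primary, using
index_compute_xid_horizon_for_tuples(), and stores it in the
XLOG_GIST_DELETE record.  Replay passes that value straight to
ResolveRecoveryConflictWithSnapshot(), so
gistRedoDeleteRecordGetLatestRemovedXid(), which derived the value by
visiting heap pages during recovery, is removed.

The hnode field of gistxlogDelete is replaced by latestRemovedXid.  This
changes the WAL record layout, hence the XLOG_PAGE_MAGIC bump.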

Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/access/gist/gist.c         |   8 +-
 src/backend/access/gist/gistxlog.c     | 153 +------------------------
 src/backend/access/rmgrdesc/gistdesc.c |   7 +-
 src/include/access/gist_private.h      |   2 +-
 src/include/access/gistxlog.h          |   3 +-
 src/include/access/xlog_internal.h     |   2 +-
 6 files changed, 18 insertions(+), 157 deletions(-)

diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 2db790c840c..769aca42e3b 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -1616,6 +1616,7 @@ gistprunepage(Relation rel, Page page, Buffer buffer, Relation heapRel)
 	int			ndeletable = 0;
 	OffsetNumber offnum,
 				maxoff;
+	TransactionId latestRemovedXid = InvalidTransactionId;
 
 	Assert(GistPageIsLeaf(page));
 
@@ -1634,6 +1635,11 @@ gistprunepage(Relation rel, Page page, Buffer buffer, Relation heapRel)
 			deletable[ndeletable++] = offnum;
 	}
 
 	if (ndeletable > 0)
 	{
+		/* Derive the xid horizon only when there is something to delete. */
+		if (XLogStandbyInfoActive() && RelationNeedsWAL(rel))
+			latestRemovedXid =
+				index_compute_xid_horizon_for_tuples(rel, heapRel, buffer,
+													 deletable, ndeletable);
 		START_CRIT_SECTION();
@@ -1658,7 +1664,7 @@ gistprunepage(Relation rel, Page page, Buffer buffer, Relation heapRel)
 
 			recptr = gistXLogDelete(buffer,
 									deletable, ndeletable,
-									heapRel->rd_node);
+									latestRemovedXid);
 
 			PageSetLSN(page, recptr);
 		}
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 4fb1855e890..503db34d863 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -165,152 +165,6 @@ gistRedoPageUpdateRecord(XLogReaderState *record)
 		UnlockReleaseBuffer(buffer);
 }
 
-/*
- * Get the latestRemovedXid from the heap pages pointed at by the index
- * tuples being deleted. See also btree_xlog_delete_get_latestRemovedXid,
- * on which this function is based.
- */
-static TransactionId
-gistRedoDeleteRecordGetLatestRemovedXid(XLogReaderState *record)
-{
-	gistxlogDelete *xlrec = (gistxlogDelete *) XLogRecGetData(record);
-	OffsetNumber *todelete;
-	Buffer		ibuffer,
-				hbuffer;
-	Page		ipage,
-				hpage;
-	RelFileNode rnode;
-	BlockNumber blkno;
-	ItemId		iitemid,
-				hitemid;
-	IndexTuple	itup;
-	HeapTupleHeader htuphdr;
-	BlockNumber hblkno;
-	OffsetNumber hoffnum;
-	TransactionId latestRemovedXid = InvalidTransactionId;
-	int			i;
-
-	/*
-	 * If there's nothing running on the standby we don't need to derive a
-	 * full latestRemovedXid value, so use a fast path out of here.  This
-	 * returns InvalidTransactionId, and so will conflict with all HS
-	 * transactions; but since we just worked out that that's zero people,
-	 * it's OK.
-	 *
-	 * XXX There is a race condition here, which is that a new backend might
-	 * start just after we look.  If so, it cannot need to conflict, but this
-	 * coding will result in throwing a conflict anyway.
-	 */
-	if (CountDBBackends(InvalidOid) == 0)
-		return latestRemovedXid;
-
-	/*
-	 * In what follows, we have to examine the previous state of the index
-	 * page, as well as the heap page(s) it points to.  This is only valid if
-	 * WAL replay has reached a consistent database state; which means that
-	 * the preceding check is not just an optimization, but is *necessary*. We
-	 * won't have let in any user sessions before we reach consistency.
-	 */
-	if (!reachedConsistency)
-		elog(PANIC, "gistRedoDeleteRecordGetLatestRemovedXid: cannot operate with inconsistent data");
-
-	/*
-	 * Get index page.  If the DB is consistent, this should not fail, nor
-	 * should any of the heap page fetches below.  If one does, we return
-	 * InvalidTransactionId to cancel all HS transactions.  That's probably
-	 * overkill, but it's safe, and certainly better than panicking here.
-	 */
-	XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
-	ibuffer = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL);
-	if (!BufferIsValid(ibuffer))
-		return InvalidTransactionId;
-	LockBuffer(ibuffer, BUFFER_LOCK_EXCLUSIVE);
-	ipage = (Page) BufferGetPage(ibuffer);
-
-	/*
-	 * Loop through the deleted index items to obtain the TransactionId from
-	 * the heap items they point to.
-	 */
-	todelete = (OffsetNumber *) ((char *) xlrec + SizeOfGistxlogDelete);
-
-	for (i = 0; i < xlrec->ntodelete; i++)
-	{
-		/*
-		 * Identify the index tuple about to be deleted
-		 */
-		iitemid = PageGetItemId(ipage, todelete[i]);
-		itup = (IndexTuple) PageGetItem(ipage, iitemid);
-
-		/*
-		 * Locate the heap page that the index tuple points at
-		 */
-		hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
-		hbuffer = XLogReadBufferExtended(xlrec->hnode, MAIN_FORKNUM, hblkno, RBM_NORMAL);
-		if (!BufferIsValid(hbuffer))
-		{
-			UnlockReleaseBuffer(ibuffer);
-			return InvalidTransactionId;
-		}
-		LockBuffer(hbuffer, BUFFER_LOCK_SHARE);
-		hpage = (Page) BufferGetPage(hbuffer);
-
-		/*
-		 * Look up the heap tuple header that the index tuple points at by
-		 * using the heap node supplied with the xlrec. We can't use
-		 * heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
-		 * Note that we are not looking at tuple data here, just headers.
-		 */
-		hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
-		hitemid = PageGetItemId(hpage, hoffnum);
-
-		/*
-		 * Follow any redirections until we find something useful.
-		 */
-		while (ItemIdIsRedirected(hitemid))
-		{
-			hoffnum = ItemIdGetRedirect(hitemid);
-			hitemid = PageGetItemId(hpage, hoffnum);
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/*
-		 * If the heap item has storage, then read the header and use that to
-		 * set latestRemovedXid.
-		 *
-		 * Some LP_DEAD items may not be accessible, so we ignore them.
-		 */
-		if (ItemIdHasStorage(hitemid))
-		{
-			htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);
-
-			HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
-		}
-		else if (ItemIdIsDead(hitemid))
-		{
-			/*
-			 * Conjecture: if hitemid is dead then it had xids before the xids
-			 * marked on LP_NORMAL items. So we just ignore this item and move
-			 * onto the next, for the purposes of calculating
-			 * latestRemovedxids.
-			 */
-		}
-		else
-			Assert(!ItemIdIsUsed(hitemid));
-
-		UnlockReleaseBuffer(hbuffer);
-	}
-
-	UnlockReleaseBuffer(ibuffer);
-
-	/*
-	 * If all heap tuples were LP_DEAD then we will be returning
-	 * InvalidTransactionId here, which avoids conflicts. This matches
-	 * existing logic which assumes that LP_DEAD tuples must already be older
-	 * than the latestRemovedXid on the cleanup record that set them as
-	 * LP_DEAD, hence must already have generated a conflict.
-	 */
-	return latestRemovedXid;
-}
 
 /*
  * redo delete on gist index page to remove tuples marked as DEAD during index
@@ -337,12 +191,11 @@ gistRedoDeleteRecord(XLogReaderState *record)
 	 */
 	if (InHotStandby)
 	{
-		TransactionId latestRemovedXid = gistRedoDeleteRecordGetLatestRemovedXid(record);
 		RelFileNode rnode;
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
 
-		ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
 	}
 
 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
@@ -800,12 +653,12 @@ gistXLogUpdate(Buffer buffer,
  */
 XLogRecPtr
 gistXLogDelete(Buffer buffer, OffsetNumber *todelete, int ntodelete,
-			   RelFileNode hnode)
+			   TransactionId latestRemovedXid)
 {
 	gistxlogDelete xlrec;
 	XLogRecPtr	recptr;
 
-	xlrec.hnode = hnode;
+	xlrec.latestRemovedXid = latestRemovedXid;
 	xlrec.ntodelete = ntodelete;
 
 	XLogBeginInsert();
diff --git a/src/backend/access/rmgrdesc/gistdesc.c b/src/backend/access/rmgrdesc/gistdesc.c
index eb308c72d6b..767864b58e6 100644
--- a/src/backend/access/rmgrdesc/gistdesc.c
+++ b/src/backend/access/rmgrdesc/gistdesc.c
@@ -33,8 +33,11 @@ out_gistxlogPageReuse(StringInfo buf, gistxlogPageReuse *xlrec)
 }
 
 static void
-out_gistxlogDelete(StringInfo buf, gistxlogPageUpdate *xlrec)
+out_gistxlogDelete(StringInfo buf, gistxlogDelete *xlrec)
 {
+	appendStringInfo(buf, "delete: latestRemovedXid %u, nitems: %u",
+					 xlrec->latestRemovedXid, xlrec->ntodelete);
+
 }
 
 static void
@@ -66,7 +69,7 @@ gist_desc(StringInfo buf, XLogReaderState *record)
 			out_gistxlogPageReuse(buf, (gistxlogPageReuse *) rec);
 			break;
 		case XLOG_GIST_DELETE:
-			out_gistxlogDelete(buf, (gistxlogPageUpdate *) rec);
+			out_gistxlogDelete(buf, (gistxlogDelete *) rec);
 			break;
 		case XLOG_GIST_PAGE_SPLIT:
 			out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 78e2e3fb312..ccf050cd62d 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -431,7 +431,7 @@ extern XLogRecPtr gistXLogUpdate(Buffer buffer,
 			   Buffer leftchild);
 
 extern XLogRecPtr gistXLogDelete(Buffer buffer, OffsetNumber *todelete,
-			   int ntodelete, RelFileNode hnode);
+			   int ntodelete, TransactionId latestRemovedXid);
 
 extern XLogRecPtr gistXLogSplit(bool page_is_leaf,
 			  SplitedPageLayout *dist,
diff --git a/src/include/access/gistxlog.h b/src/include/access/gistxlog.h
index 9990d97cbd3..e66b034d7b8 100644
--- a/src/include/access/gistxlog.h
+++ b/src/include/access/gistxlog.h
@@ -47,8 +47,7 @@ typedef struct gistxlogPageUpdate
  */
 typedef struct gistxlogDelete
 {
-	RelFileNode hnode;			/* RelFileNode of the heap the index currently
-								 * points at */
+	TransactionId latestRemovedXid;
 	uint16		ntodelete;		/* number of deleted offsets */
 
 	/*
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 8b1348c36db..39a474c4996 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -31,7 +31,7 @@
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD100	/* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD101	/* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {
-- 
2.21.0.dirty

