From 54266c60d6d0186f6ff01807ce8315a61b60ce7d Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sat, 19 Jan 2019 23:45:07 -0800
Subject: [PATCH v18 10/18] tableam: index builds.

Add table AM callbacks for the scans that index builds perform over the
to-be-indexed table: index_build_range_scan subsumes IndexBuildHeapScan
and IndexBuildHeapRangeScan, and index_validate_scan replaces
validate_index_heapscan.  The heap implementations of both move from
catalog/index.c into heapam_handler.c, and callers now go through the
table_index_build_scan / table_index_build_range_scan /
table_index_validate_scan wrappers.  The validate_index bulkdelete state
struct is renamed from v_i_state to ValidateIndexState so that table AMs
can share it, and itemptr_encode / itemptr_decode move out of index.c
for the same reason.

Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 contrib/amcheck/verify_nbtree.c          |   6 +-
 contrib/bloom/blinsert.c                 |   9 +-
 src/backend/access/brin/brin.c           |  18 +-
 src/backend/access/gin/gininsert.c       |   6 +-
 src/backend/access/gist/gistbuild.c      |   8 +-
 src/backend/access/hash/hash.c           |   8 +-
 src/backend/access/heap/heapam_handler.c | 759 ++++++++++++++++++++
 src/backend/access/nbtree/nbtsort.c      |  14 +-
 src/backend/access/spgist/spginsert.c    |   9 +-
 src/backend/catalog/index.c              | 861 +----------------------
 src/include/access/tableam.h             |  89 +++
 src/include/catalog/index.h              |  71 +-
 12 files changed, 950 insertions(+), 908 deletions(-)

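Overview, before the per-file changes: the scans that index builds perform
over the to-be-indexed table move behind the table AM interface.  tableam.h
(see the diffstat) gains index_build_range_scan and index_validate_scan
callbacks, plus table_index_build_scan / table_index_build_range_scan /
table_index_validate_scan wrappers that the index AMs below call.  The
tableam.h hunk is not reproduced in this excerpt, but judging from the
IndexBuildHeapScan wrapper this patch removes from index.c, the whole-table
wrapper is presumably a thin shim along these lines (a sketch only; the
exact signature is an assumption):

static inline double
table_index_build_scan(Relation heap_rel,
					   Relation index_rel,
					   IndexInfo *index_info,
					   bool allow_sync,
					   IndexBuildCallback callback,
					   void *callback_state,
					   TableScanDesc scan)
{
	/* whole-table build scan: block 0 through end-of-rel, not "anyvisible" */
	return heap_rel->rd_tableam->index_build_range_scan(heap_rel,
														index_rel,
														index_info,
														allow_sync,
														false,
														0,
														InvalidBlockNumber,
														callback,
														callback_state,
														scan);
}
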
diff --git a/contrib/amcheck/verify_nbtree.c b/contrib/amcheck/verify_nbtree.c
index bb6442de82d..cd261983690 100644
--- a/contrib/amcheck/verify_nbtree.c
+++ b/contrib/amcheck/verify_nbtree.c
@@ -23,9 +23,9 @@
  */
 #include "postgres.h"
 
-#include "access/heapam.h"
 #include "access/htup_details.h"
 #include "access/nbtree.h"
+#include "access/table.h"
 #include "access/tableam.h"
 #include "access/transam.h"
 #include "access/xact.h"
@@ -535,8 +535,8 @@ bt_check_every_level(Relation rel, Relation heaprel, bool readonly,
 			 RelationGetRelationName(state->rel),
 			 RelationGetRelationName(state->heaprel));
 
-		IndexBuildHeapScan(state->heaprel, state->rel, indexinfo, true,
-						   bt_tuple_present_callback, (void *) state, scan);
+		table_index_build_scan(state->heaprel, state->rel, indexinfo, true,
+							   bt_tuple_present_callback, (void *) state, scan);
 
 		ereport(DEBUG1,
 				(errmsg_internal("finished verifying presence of " INT64_FORMAT " tuples from table \"%s\" with bitset %.2f%% set",
diff --git a/contrib/bloom/blinsert.c b/contrib/bloom/blinsert.c
index e43fbe0005f..3b704312665 100644
--- a/contrib/bloom/blinsert.c
+++ b/contrib/bloom/blinsert.c
@@ -14,6 +14,7 @@
 
 #include "access/genam.h"
 #include "access/generic_xlog.h"
+#include "access/tableam.h"
 #include "catalog/index.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
@@ -69,7 +70,7 @@ initCachedPage(BloomBuildState *buildstate)
 }
 
 /*
- * Per-tuple callback from IndexBuildHeapScan.
+ * Per-tuple callback from table_index_build_scan.
  */
 static void
 bloomBuildCallback(Relation index, HeapTuple htup, Datum *values,
@@ -141,9 +142,9 @@ blbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	initCachedPage(&buildstate);
 
 	/* Do the heap scan */
-	reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
-								   bloomBuildCallback, (void *) &buildstate,
-								   NULL);
+	reltuples = table_index_build_scan(heap, index, indexInfo, true,
+									   bloomBuildCallback, (void *) &buildstate,
+									   NULL);
 
 	/* Flush last page if needed (it will be, unless heap was empty) */
 	if (buildstate.count > 0)
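
The callback contract is unchanged by this patch: each index AM still
supplies an IndexBuildCallback that receives one heap tuple at a time,
together with the already-extracted values[]/isnull[] arrays.  A minimal
hypothetical callback, modeled on bloomBuildCallback above (MyBuildState
and my_build_callback are illustrative names, not from the patch):

typedef struct MyBuildState
{
	double		indtuples;		/* tuples accepted into the index */
} MyBuildState;

/* Per-tuple callback from table_index_build_scan (sketch only). */
static void
my_build_callback(Relation index, HeapTuple htup, Datum *values,
				  bool *isnull, bool tupleIsAlive, void *state)
{
	MyBuildState *buildstate = (MyBuildState *) state;

	/*
	 * A real AM would form and insert an index entry here, working from
	 * the values[]/isnull[] arrays rather than the raw tuple; the TID to
	 * point the entry at is htup->t_self.  tupleIsAlive says whether the
	 * tuple should participate in uniqueness checking.
	 */
	buildstate->indtuples += 1;
}
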
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 8f008dd0080..6e96d24ca22 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -23,6 +23,7 @@
 #include "access/reloptions.h"
 #include "access/relscan.h"
 #include "access/table.h"
+#include "access/tableam.h"
 #include "access/xloginsert.h"
 #include "catalog/index.h"
 #include "catalog/pg_am.h"
@@ -587,7 +588,7 @@ brinendscan(IndexScanDesc scan)
 }
 
 /*
- * Per-heap-tuple callback for IndexBuildHeapScan.
+ * Per-heap-tuple callback for table_index_build_scan.
  *
  * Note we don't worry about the page range at the end of the table here; it is
  * present in the build state struct after we're called the last time, but not
@@ -718,8 +719,8 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	 * Now scan the relation.  No syncscan allowed here because we want the
 	 * heap blocks in physical order.
 	 */
-	reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
-								   brinbuildCallback, (void *) state, NULL);
+	reltuples = table_index_build_scan(heap, index, indexInfo, false,
+									   brinbuildCallback, (void *) state, NULL);
 
 	/* process the final batch */
 	form_and_insert_tuple(state);
@@ -1230,13 +1231,14 @@ summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
 	 * short of brinbuildCallback creating the new index entry.
 	 *
 	 * Note that it is critical we use the "any visible" mode of
-	 * IndexBuildHeapRangeScan here: otherwise, we would miss tuples inserted
-	 * by transactions that are still in progress, among other corner cases.
+	 * table_index_build_range_scan here: otherwise, we would miss tuples
+	 * inserted by transactions that are still in progress, among other corner
+	 * cases.
 	 */
 	state->bs_currRangeStart = heapBlk;
-	IndexBuildHeapRangeScan(heapRel, state->bs_irel, indexInfo, false, true,
-							heapBlk, scanNumBlks,
-							brinbuildCallback, (void *) state, NULL);
+	table_index_build_range_scan(heapRel, state->bs_irel, indexInfo, false, true,
+								 heapBlk, scanNumBlks,
+								 brinbuildCallback, (void *) state, NULL);
 
 	/*
 	 * Now we update the values obtained by the scan with the placeholder
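
Restating the summarize_range call with each argument annotated, since this
is the one caller that exercises both the block-range restriction and
"anyvisible" mode: syncscan must be off because the scan has fixed
endpoints, and the trailing NULL scan descriptor tells the implementation
to begin and end its own serial scan.

/* Summarize one page range, seeing tuples of in-progress transactions too */
table_index_build_range_scan(heapRel,
							 state->bs_irel,	/* the BRIN index */
							 indexInfo,
							 false,			/* allow_sync: range is fixed */
							 true,			/* anyvisible */
							 heapBlk,		/* start_blockno */
							 scanNumBlks,	/* numblocks */
							 brinbuildCallback,
							 (void *) state,
							 NULL);			/* start our own scan */
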
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index 524ac5be8b5..b02f69b0dcb 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -17,6 +17,7 @@
 #include "access/gin_private.h"
 #include "access/ginxlog.h"
 #include "access/xloginsert.h"
+#include "access/tableam.h"
 #include "catalog/index.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
@@ -394,8 +395,9 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	 * Do the heap scan.  We disallow sync scan here because dataPlaceToPage
 	 * prefers to receive tuples in TID order.
 	 */
-	reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
-								   ginBuildCallback, (void *) &buildstate, NULL);
+	reltuples = table_index_build_scan(heap, index, indexInfo, false,
+									   ginBuildCallback, (void *) &buildstate,
+									   NULL);
 
 	/* dump remaining entries to the index */
 	oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx);
diff --git a/src/backend/access/gist/gistbuild.c b/src/backend/access/gist/gistbuild.c
index bd142a3560d..b76a587e945 100644
--- a/src/backend/access/gist/gistbuild.c
+++ b/src/backend/access/gist/gistbuild.c
@@ -19,6 +19,7 @@
 #include "access/genam.h"
 #include "access/gist_private.h"
 #include "access/gistxlog.h"
+#include "access/tableam.h"
 #include "access/xloginsert.h"
 #include "catalog/index.h"
 #include "miscadmin.h"
@@ -204,8 +205,9 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	/*
 	 * Do the heap scan.
 	 */
-	reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
-								   gistBuildCallback, (void *) &buildstate, NULL);
+	reltuples = table_index_build_scan(heap, index, indexInfo, true,
+									   gistBuildCallback,
+									   (void *) &buildstate, NULL);
 
 	/*
 	 * If buffering was used, flush out all the tuples that are still in the
@@ -454,7 +456,7 @@ calculatePagesPerBuffer(GISTBuildState *buildstate, int levelStep)
 }
 
 /*
- * Per-tuple callback from IndexBuildHeapScan.
+ * Per-tuple callback from table_index_build_scan.
  */
 static void
 gistBuildCallback(Relation index,
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index f1f01a0956d..38747ba3d7a 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -21,6 +21,7 @@
 #include "access/hash.h"
 #include "access/hash_xlog.h"
 #include "access/relscan.h"
+#include "access/tableam.h"
 #include "catalog/index.h"
 #include "commands/vacuum.h"
 #include "miscadmin.h"
@@ -159,8 +160,9 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	buildstate.heapRel = heap;
 
 	/* do the heap scan */
-	reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
-								   hashbuildCallback, (void *) &buildstate, NULL);
+	reltuples = table_index_build_scan(heap, index, indexInfo, true,
+									   hashbuildCallback,
+									   (void *) &buildstate, NULL);
 
 	if (buildstate.spool)
 	{
@@ -190,7 +192,7 @@ hashbuildempty(Relation index)
 }
 
 /*
- * Per-tuple callback from IndexBuildHeapScan
+ * Per-tuple callback from table_index_build_scan
  */
 static void
 hashbuildCallback(Relation index,
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 3098cb96b60..1bf08b47250 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -19,11 +19,19 @@
  */
 #include "postgres.h"
 
+#include "miscadmin.h"
+
+#include "access/genam.h"
 #include "access/heapam.h"
 #include "access/tableam.h"
 #include "access/xact.h"
+#include "catalog/catalog.h"
+#include "catalog/index.h"
+#include "executor/executor.h"
 #include "storage/bufmgr.h"
+#include "storage/bufpage.h"
 #include "storage/lmgr.h"
+#include "storage/procarray.h"
 #include "utils/builtins.h"
 
 
@@ -558,6 +566,754 @@ heapam_finish_bulk_insert(Relation relation, int options)
  * ------------------------------------------------------------------------
  */
 
+/*
+ * Scan a (range of a) heap relation to find tuples to be indexed, passing
+ * each one to the index AM's build callback.  The count of live heap
+ * tuples is returned; indexInfo->ii_BrokenHotChain is set if any
+ * potentially broken HOT chains are detected.  Scan to end-of-rel can be
+ * signalled by passing InvalidBlockNumber as numblocks; note that
+ * restricting the range to scan cannot be done when requesting syncscan.
+ *
+ * When "anyvisible" mode is requested, all tuples visible to any transaction
+ * are indexed and counted as live, including those inserted or deleted by
+ * transactions that are still in progress.
+ */
+static double
+heapam_index_build_range_scan(Relation heapRelation,
+							  Relation indexRelation,
+							  IndexInfo *indexInfo,
+							  bool allow_sync,
+							  bool anyvisible,
+							  BlockNumber start_blockno,
+							  BlockNumber numblocks,
+							  IndexBuildCallback callback,
+							  void *callback_state,
+							  TableScanDesc sscan)
+{
+	HeapScanDesc scan = (HeapScanDesc) sscan;
+	bool		is_system_catalog;
+	bool		checking_uniqueness;
+	HeapTuple	heapTuple;
+	Datum		values[INDEX_MAX_KEYS];
+	bool		isnull[INDEX_MAX_KEYS];
+	double		reltuples;
+	ExprState  *predicate;
+	TupleTableSlot *slot;
+	EState	   *estate;
+	ExprContext *econtext;
+	Snapshot	snapshot;
+	bool		need_unregister_snapshot = false;
+	TransactionId OldestXmin;
+	BlockNumber root_blkno = InvalidBlockNumber;
+	OffsetNumber root_offsets[MaxHeapTuplesPerPage];
+
+	/*
+	 * sanity checks
+	 */
+	Assert(OidIsValid(indexRelation->rd_rel->relam));
+
+	/* Remember if it's a system catalog */
+	is_system_catalog = IsSystemRelation(heapRelation);
+
+	/* See whether we're verifying uniqueness/exclusion properties */
+	checking_uniqueness = (indexInfo->ii_Unique ||
+						   indexInfo->ii_ExclusionOps != NULL);
+
+	/*
+	 * "Any visible" mode is not compatible with uniqueness checks; make sure
+	 * only one of those is requested.
+	 */
+	Assert(!(anyvisible && checking_uniqueness));
+
+	/*
+	 * Need an EState for evaluation of index expressions and partial-index
+	 * predicates.  Also a slot to hold the current tuple.
+	 */
+	estate = CreateExecutorState();
+	econtext = GetPerTupleExprContext(estate);
+	slot = MakeSingleTupleTableSlot(RelationGetDescr(heapRelation),
+									&TTSOpsHeapTuple);
+
+	/* Arrange for econtext's scan tuple to be the tuple under test */
+	econtext->ecxt_scantuple = slot;
+
+	/* Set up execution state for predicate, if any. */
+	predicate = ExecPrepareQual(indexInfo->ii_Predicate, estate);
+
+	/*
+	 * Prepare for scan of the base relation.  In a normal index build, we use
+	 * SnapshotAny because we must retrieve all tuples and do our own time
+	 * qual checks (because we have to index RECENTLY_DEAD tuples). In a
+	 * concurrent build, or during bootstrap, we take a regular MVCC snapshot
+	 * and index whatever's live according to that.
+	 */
+	OldestXmin = InvalidTransactionId;
+
+	/* okay to ignore lazy VACUUMs here */
+	if (!IsBootstrapProcessingMode() && !indexInfo->ii_Concurrent)
+		OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM);
+
+	if (!scan)
+	{
+		/*
+		 * Serial index build.
+		 *
+		 * Must begin our own heap scan in this case.  We may also need to
+		 * register a snapshot whose lifetime is under our direct control.
+		 */
+		if (!TransactionIdIsValid(OldestXmin))
+		{
+			snapshot = RegisterSnapshot(GetTransactionSnapshot());
+			need_unregister_snapshot = true;
+		}
+		else
+			snapshot = SnapshotAny;
+
+		sscan = table_beginscan_strat(heapRelation, /* relation */
+									  snapshot, /* snapshot */
+									  0,	/* number of keys */
+									  NULL, /* scan key */
+									  true, /* buffer access strategy OK */
+									  allow_sync);	/* syncscan OK? */
+		scan = (HeapScanDesc) sscan;
+	}
+	else
+	{
+		/*
+		 * Parallel index build.
+		 *
+		 * Parallel case never registers/unregisters own snapshot.  Snapshot
+		 * is taken from parallel heap scan, and is SnapshotAny or an MVCC
+		 * snapshot, based on same criteria as serial case.
+		 */
+		Assert(!IsBootstrapProcessingMode());
+		Assert(allow_sync);
+		snapshot = scan->rs_base.rs_snapshot;
+	}
+
+	/*
+	 * Must call GetOldestXmin() with SnapshotAny.  Should never call
+	 * GetOldestXmin() with MVCC snapshot. (It's especially worth checking
+	 * this for parallel builds, since ambuild routines that support parallel
+	 * builds must work these details out for themselves.)
+	 */
+	Assert(snapshot == SnapshotAny || IsMVCCSnapshot(snapshot));
+	Assert(snapshot == SnapshotAny ? TransactionIdIsValid(OldestXmin) :
+		   !TransactionIdIsValid(OldestXmin));
+	Assert(snapshot == SnapshotAny || !anyvisible);
+
+	/* set our scan endpoints */
+	if (!allow_sync)
+		heap_setscanlimits(sscan, start_blockno, numblocks);
+	else
+	{
+		/* syncscan can only be requested on whole relation */
+		Assert(start_blockno == 0);
+		Assert(numblocks == InvalidBlockNumber);
+	}
+
+	reltuples = 0;
+
+	/*
+	 * Scan all tuples in the base relation.
+	 */
+	while ((heapTuple = heap_getnext(sscan, ForwardScanDirection)) != NULL)
+	{
+		bool		tupleIsAlive;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * When dealing with a HOT-chain of updated tuples, we want to index
+		 * the values of the live tuple (if any), but index it under the TID
+		 * of the chain's root tuple.  This approach is necessary to preserve
+		 * the HOT-chain structure in the heap. So we need to be able to find
+		 * the root item offset for every tuple that's in a HOT-chain.  When
+		 * first reaching a new page of the relation, call
+		 * heap_get_root_tuples() to build a map of root item offsets on the
+		 * page.
+		 *
+		 * It might look unsafe to use this information across buffer
+		 * lock/unlock.  However, we hold ShareLock on the table so no
+		 * ordinary insert/update/delete should occur; and we hold pin on the
+		 * buffer continuously while visiting the page, so no pruning
+		 * operation can occur either.
+		 *
+		 * Also, although our opinions about tuple liveness could change while
+		 * we scan the page (due to concurrent transaction commits/aborts),
+		 * the chain root locations won't, so this info doesn't need to be
+		 * rebuilt after waiting for another transaction.
+		 *
+		 * Note the implied assumption that there is no more than one live
+		 * tuple per HOT-chain --- else we could create more than one index
+		 * entry pointing to the same root tuple.
+		 */
+		if (scan->rs_cblock != root_blkno)
+		{
+			Page		page = BufferGetPage(scan->rs_cbuf);
+
+			LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
+			heap_get_root_tuples(page, root_offsets);
+			LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
+
+			root_blkno = scan->rs_cblock;
+		}
+
+		if (snapshot == SnapshotAny)
+		{
+			/* do our own time qual check */
+			bool		indexIt;
+			TransactionId xwait;
+
+	recheck:
+
+			/*
+			 * We could possibly get away with not locking the buffer here,
+			 * since caller should hold ShareLock on the relation, but let's
+			 * be conservative about it.  (This remark is still correct even
+			 * with HOT-pruning: our pin on the buffer prevents pruning.)
+			 */
+			LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
+
+			/*
+			 * The criteria for counting a tuple as live in this block need to
+			 * match what analyze.c's acquire_sample_rows() does, otherwise
+			 * CREATE INDEX and ANALYZE may produce wildly different reltuples
+			 * values, e.g. when there are many recently-dead tuples.
+			 */
+			switch (HeapTupleSatisfiesVacuum(heapTuple, OldestXmin, scan->rs_cbuf))
+			{
+				case HEAPTUPLE_DEAD:
+					/* Definitely dead, we can ignore it */
+					indexIt = false;
+					tupleIsAlive = false;
+					break;
+				case HEAPTUPLE_LIVE:
+					/* Normal case, index and unique-check it */
+					indexIt = true;
+					tupleIsAlive = true;
+					/* Count it as live, too */
+					reltuples += 1;
+					break;
+				case HEAPTUPLE_RECENTLY_DEAD:
+
+					/*
+					 * If tuple is recently deleted then we must index it
+					 * anyway to preserve MVCC semantics.  (Pre-existing
+					 * transactions could try to use the index after we finish
+					 * building it, and may need to see such tuples.)
+					 *
+					 * However, if it was HOT-updated then we must only index
+					 * the live tuple at the end of the HOT-chain.  Since this
+					 * breaks semantics for pre-existing snapshots, mark the
+					 * index as unusable for them.
+					 *
+					 * We don't count recently-dead tuples in reltuples, even
+					 * if we index them; see acquire_sample_rows().
+					 */
+					if (HeapTupleIsHotUpdated(heapTuple))
+					{
+						indexIt = false;
+						/* mark the index as unsafe for old snapshots */
+						indexInfo->ii_BrokenHotChain = true;
+					}
+					else
+						indexIt = true;
+					/* In any case, exclude the tuple from unique-checking */
+					tupleIsAlive = false;
+					break;
+				case HEAPTUPLE_INSERT_IN_PROGRESS:
+
+					/*
+					 * In "anyvisible" mode, this tuple is visible and we
+					 * don't need any further checks.
+					 */
+					if (anyvisible)
+					{
+						indexIt = true;
+						tupleIsAlive = true;
+						reltuples += 1;
+						break;
+					}
+
+					/*
+					 * Since caller should hold ShareLock or better, normally
+					 * the only way to see this is if it was inserted earlier
+					 * in our own transaction.  However, it can happen in
+					 * system catalogs, since we tend to release write lock
+					 * before commit there.  Give a warning if neither case
+					 * applies.
+					 */
+					xwait = HeapTupleHeaderGetXmin(heapTuple->t_data);
+					if (!TransactionIdIsCurrentTransactionId(xwait))
+					{
+						if (!is_system_catalog)
+							elog(WARNING, "concurrent insert in progress within table \"%s\"",
+								 RelationGetRelationName(heapRelation));
+
+						/*
+						 * If we are performing uniqueness checks, indexing
+						 * such a tuple could lead to a bogus uniqueness
+						 * failure.  In that case we wait for the inserting
+						 * transaction to finish and check again.
+						 */
+						if (checking_uniqueness)
+						{
+							/*
+							 * Must drop the lock on the buffer before we wait
+							 */
+							LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
+							XactLockTableWait(xwait, heapRelation,
+											  &heapTuple->t_self,
+											  XLTW_InsertIndexUnique);
+							CHECK_FOR_INTERRUPTS();
+							goto recheck;
+						}
+					}
+					else
+					{
+						/*
+						 * For consistency with acquire_sample_rows(), count
+						 * HEAPTUPLE_INSERT_IN_PROGRESS tuples as live only
+						 * when inserted by our own transaction.
+						 */
+						reltuples += 1;
+					}
+
+					/*
+					 * We must index such tuples, since if the index build
+					 * commits then they're good.
+					 */
+					indexIt = true;
+					tupleIsAlive = true;
+					break;
+				case HEAPTUPLE_DELETE_IN_PROGRESS:
+
+					/*
+					 * As with INSERT_IN_PROGRESS case, this is unexpected
+					 * unless it's our own deletion or a system catalog; but
+					 * in anyvisible mode, this tuple is visible.
+					 */
+					if (anyvisible)
+					{
+						indexIt = true;
+						tupleIsAlive = false;
+						reltuples += 1;
+						break;
+					}
+
+					xwait = HeapTupleHeaderGetUpdateXid(heapTuple->t_data);
+					if (!TransactionIdIsCurrentTransactionId(xwait))
+					{
+						if (!is_system_catalog)
+							elog(WARNING, "concurrent delete in progress within table \"%s\"",
+								 RelationGetRelationName(heapRelation));
+
+						/*
+						 * If we are performing uniqueness checks, assuming
+						 * the tuple is dead could lead to missing a
+						 * uniqueness violation.  In that case we wait for the
+						 * deleting transaction to finish and check again.
+						 *
+						 * Also, if it's a HOT-updated tuple, we should not
+						 * index it but rather the live tuple at the end of
+						 * the HOT-chain.  However, the deleting transaction
+						 * could abort, possibly leaving this tuple as live
+						 * after all, in which case it has to be indexed. The
+						 * only way to know what to do is to wait for the
+						 * deleting transaction to finish and check again.
+						 */
+						if (checking_uniqueness ||
+							HeapTupleIsHotUpdated(heapTuple))
+						{
+							/*
+							 * Must drop the lock on the buffer before we wait
+							 */
+							LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
+							XactLockTableWait(xwait, heapRelation,
+											  &heapTuple->t_self,
+											  XLTW_InsertIndexUnique);
+							CHECK_FOR_INTERRUPTS();
+							goto recheck;
+						}
+
+						/*
+						 * Otherwise index it but don't check for uniqueness,
+						 * the same as a RECENTLY_DEAD tuple.
+						 */
+						indexIt = true;
+
+						/*
+						 * Count HEAPTUPLE_DELETE_IN_PROGRESS tuples as live,
+						 * if they were not deleted by the current
+						 * transaction.  That's what acquire_sample_rows()
+						 * does, and we want the behavior to be consistent.
+						 */
+						reltuples += 1;
+					}
+					else if (HeapTupleIsHotUpdated(heapTuple))
+					{
+						/*
+						 * It's a HOT-updated tuple deleted by our own xact.
+						 * We can assume the deletion will commit (else the
+						 * index contents don't matter), so treat the same as
+						 * RECENTLY_DEAD HOT-updated tuples.
+						 */
+						indexIt = false;
+						/* mark the index as unsafe for old snapshots */
+						indexInfo->ii_BrokenHotChain = true;
+					}
+					else
+					{
+						/*
+						 * It's a regular tuple deleted by our own xact. Index
+						 * it, but don't check for uniqueness nor count in
+						 * reltuples, the same as a RECENTLY_DEAD tuple.
+						 */
+						indexIt = true;
+					}
+					/* In any case, exclude the tuple from unique-checking */
+					tupleIsAlive = false;
+					break;
+				default:
+					elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
+					indexIt = tupleIsAlive = false; /* keep compiler quiet */
+					break;
+			}
+
+			LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
+
+			if (!indexIt)
+				continue;
+		}
+		else
+		{
+			/* heap_getnext did the time qual check */
+			tupleIsAlive = true;
+			reltuples += 1;
+		}
+
+		MemoryContextReset(econtext->ecxt_per_tuple_memory);
+
+		/* Set up for predicate or expression evaluation */
+		ExecStoreHeapTuple(heapTuple, slot, false);
+
+		/*
+		 * In a partial index, discard tuples that don't satisfy the
+		 * predicate.
+		 */
+		if (predicate != NULL)
+		{
+			if (!ExecQual(predicate, econtext))
+				continue;
+		}
+
+		/*
+		 * For the current heap tuple, extract all the attributes we use in
+		 * this index, and note which are null.  This also performs evaluation
+		 * of any expressions needed.
+		 */
+		FormIndexDatum(indexInfo,
+					   slot,
+					   estate,
+					   values,
+					   isnull);
+
+		/*
+		 * You'd think we should go ahead and build the index tuple here, but
+		 * some index AMs want to do further processing on the data first.  So
+		 * pass the values[] and isnull[] arrays, instead.
+		 */
+
+		if (HeapTupleIsHeapOnly(heapTuple))
+		{
+			/*
+			 * For a heap-only tuple, pretend its TID is that of the root. See
+			 * src/backend/access/heap/README.HOT for discussion.
+			 */
+			HeapTupleData rootTuple;
+			OffsetNumber offnum;
+
+			rootTuple = *heapTuple;
+			offnum = ItemPointerGetOffsetNumber(&heapTuple->t_self);
+
+			if (!OffsetNumberIsValid(root_offsets[offnum - 1]))
+				ereport(ERROR,
+						(errcode(ERRCODE_DATA_CORRUPTED),
+						 errmsg_internal("failed to find parent tuple for heap-only tuple at (%u,%u) in table \"%s\"",
+										 ItemPointerGetBlockNumber(&heapTuple->t_self),
+										 offnum,
+										 RelationGetRelationName(heapRelation))));
+
+			ItemPointerSetOffsetNumber(&rootTuple.t_self,
+									   root_offsets[offnum - 1]);
+
+			/* Call the AM's callback routine to process the tuple */
+			callback(indexRelation, &rootTuple, values, isnull, tupleIsAlive,
+					 callback_state);
+		}
+		else
+		{
+			/* Call the AM's callback routine to process the tuple */
+			callback(indexRelation, heapTuple, values, isnull, tupleIsAlive,
+					 callback_state);
+		}
+	}
+
+	table_endscan(sscan);
+
+	/* we can now forget our snapshot, if set and registered by us */
+	if (need_unregister_snapshot)
+		UnregisterSnapshot(snapshot);
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	FreeExecutorState(estate);
+
+	/* These may have been pointing to the now-gone estate */
+	indexInfo->ii_ExpressionsState = NIL;
+	indexInfo->ii_PredicateState = NULL;
+
+	return reltuples;
+}
+
+/*
+ * Second table scan for concurrent index build
+ *
+ * This has much code in common with heapam_index_build_range_scan, but
+ * it's enough different that it seems cleaner to have two routines not one.
+ */
+static void
+heapam_index_validate_scan(Relation heapRelation,
+						   Relation indexRelation,
+						   IndexInfo *indexInfo,
+						   Snapshot snapshot,
+						   ValidateIndexState *state)
+{
+	TableScanDesc sscan;
+	HeapScanDesc scan;
+	HeapTuple	heapTuple;
+	Datum		values[INDEX_MAX_KEYS];
+	bool		isnull[INDEX_MAX_KEYS];
+	ExprState  *predicate;
+	TupleTableSlot *slot;
+	EState	   *estate;
+	ExprContext *econtext;
+	BlockNumber root_blkno = InvalidBlockNumber;
+	OffsetNumber root_offsets[MaxHeapTuplesPerPage];
+	bool		in_index[MaxHeapTuplesPerPage];
+
+	/* state variables for the merge */
+	ItemPointer indexcursor = NULL;
+	ItemPointerData decoded;
+	bool		tuplesort_empty = false;
+
+	/*
+	 * sanity checks
+	 */
+	Assert(OidIsValid(indexRelation->rd_rel->relam));
+
+	/*
+	 * Need an EState for evaluation of index expressions and partial-index
+	 * predicates.  Also a slot to hold the current tuple.
+	 */
+	estate = CreateExecutorState();
+	econtext = GetPerTupleExprContext(estate);
+	slot = MakeSingleTupleTableSlot(RelationGetDescr(heapRelation),
+									&TTSOpsHeapTuple);
+
+	/* Arrange for econtext's scan tuple to be the tuple under test */
+	econtext->ecxt_scantuple = slot;
+
+	/* Set up execution state for predicate, if any. */
+	predicate = ExecPrepareQual(indexInfo->ii_Predicate, estate);
+
+	/*
+	 * Prepare for scan of the base relation.  We need just those tuples
+	 * satisfying the passed-in reference snapshot.  We must disable syncscan
+	 * here, because it's critical that we read from block zero forward to
+	 * match the sorted TIDs.
+	 */
+	sscan = table_beginscan_strat(heapRelation, /* relation */
+								  snapshot, /* snapshot */
+								  0,	/* number of keys */
+								  NULL, /* scan key */
+								  true, /* buffer access strategy OK */
+								  false);	/* syncscan not OK */
+	scan = (HeapScanDesc) sscan;
+
+	/*
+	 * Scan all tuples matching the snapshot.
+	 */
+	while ((heapTuple = heap_getnext(sscan, ForwardScanDirection)) != NULL)
+	{
+		ItemPointer heapcursor = &heapTuple->t_self;
+		ItemPointerData rootTuple;
+		OffsetNumber root_offnum;
+
+		CHECK_FOR_INTERRUPTS();
+
+		state->htups += 1;
+
+		/*
+		 * As commented in table_index_build_scan, we should index heap-only
+		 * tuples under the TIDs of their root tuples; so when we advance onto
+		 * a new heap page, build a map of root item offsets on the page.
+		 *
+		 * This complicates merging against the tuplesort output: we will
+		 * visit the live tuples in order by their offsets, but the root
+		 * offsets that we need to compare against the index contents might be
+		 * ordered differently.  So we might have to "look back" within the
+		 * tuplesort output, but only within the current page.  We handle that
+		 * by keeping a bool array in_index[] showing all the
+		 * already-passed-over tuplesort output TIDs of the current page. We
+		 * clear that array here, when advancing onto a new heap page.
+		 */
+		if (scan->rs_cblock != root_blkno)
+		{
+			Page		page = BufferGetPage(scan->rs_cbuf);
+
+			LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
+			heap_get_root_tuples(page, root_offsets);
+			LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
+
+			memset(in_index, 0, sizeof(in_index));
+
+			root_blkno = scan->rs_cblock;
+		}
+
+		/* Convert actual tuple TID to root TID */
+		rootTuple = *heapcursor;
+		root_offnum = ItemPointerGetOffsetNumber(heapcursor);
+
+		if (HeapTupleIsHeapOnly(heapTuple))
+		{
+			root_offnum = root_offsets[root_offnum - 1];
+			if (!OffsetNumberIsValid(root_offnum))
+				ereport(ERROR,
+						(errcode(ERRCODE_DATA_CORRUPTED),
+						 errmsg_internal("failed to find parent tuple for heap-only tuple at (%u,%u) in table \"%s\"",
+										 ItemPointerGetBlockNumber(heapcursor),
+										 ItemPointerGetOffsetNumber(heapcursor),
+										 RelationGetRelationName(heapRelation))));
+			ItemPointerSetOffsetNumber(&rootTuple, root_offnum);
+		}
+
+		/*
+		 * "merge" by skipping through the index tuples until we find or pass
+		 * the current root tuple.
+		 */
+		while (!tuplesort_empty &&
+			   (!indexcursor ||
+				ItemPointerCompare(indexcursor, &rootTuple) < 0))
+		{
+			Datum		ts_val;
+			bool		ts_isnull;
+
+			if (indexcursor)
+			{
+				/*
+				 * Remember index items seen earlier on the current heap page
+				 */
+				if (ItemPointerGetBlockNumber(indexcursor) == root_blkno)
+					in_index[ItemPointerGetOffsetNumber(indexcursor) - 1] = true;
+			}
+
+			tuplesort_empty = !tuplesort_getdatum(state->tuplesort, true,
+												  &ts_val, &ts_isnull, NULL);
+			Assert(tuplesort_empty || !ts_isnull);
+			if (!tuplesort_empty)
+			{
+				itemptr_decode(&decoded, DatumGetInt64(ts_val));
+				indexcursor = &decoded;
+
+				/* If int8 is pass-by-ref, free (encoded) TID Datum memory */
+#ifndef USE_FLOAT8_BYVAL
+				pfree(DatumGetPointer(ts_val));
+#endif
+			}
+			else
+			{
+				/* Be tidy */
+				indexcursor = NULL;
+			}
+		}
+
+		/*
+		 * If the tuplesort has overshot *and* we didn't see a match earlier,
+		 * then this tuple is missing from the index, so insert it.
+		 */
+		if ((tuplesort_empty ||
+			 ItemPointerCompare(indexcursor, &rootTuple) > 0) &&
+			!in_index[root_offnum - 1])
+		{
+			MemoryContextReset(econtext->ecxt_per_tuple_memory);
+
+			/* Set up for predicate or expression evaluation */
+			ExecStoreHeapTuple(heapTuple, slot, false);
+
+			/*
+			 * In a partial index, discard tuples that don't satisfy the
+			 * predicate.
+			 */
+			if (predicate != NULL)
+			{
+				if (!ExecQual(predicate, econtext))
+					continue;
+			}
+
+			/*
+			 * For the current heap tuple, extract all the attributes we use
+			 * in this index, and note which are null.  This also performs
+			 * evaluation of any expressions needed.
+			 */
+			FormIndexDatum(indexInfo,
+						   slot,
+						   estate,
+						   values,
+						   isnull);
+
+			/*
+			 * You'd think we should go ahead and build the index tuple here,
+			 * but some index AMs want to do further processing on the data
+			 * first. So pass the values[] and isnull[] arrays, instead.
+			 */
+
+			/*
+			 * If the tuple is already committed dead, you might think we
+			 * could suppress uniqueness checking, but this is no longer true
+			 * in the presence of HOT, because the insert is actually a proxy
+			 * for a uniqueness check on the whole HOT-chain.  That is, the
+			 * tuple we have here could be dead because it was already
+			 * HOT-updated, and if so the updating transaction will not have
+			 * thought it should insert index entries.  The index AM will
+			 * check the whole HOT-chain and correctly detect a conflict if
+			 * there is one.
+			 */
+
+			index_insert(indexRelation,
+						 values,
+						 isnull,
+						 &rootTuple,
+						 heapRelation,
+						 indexInfo->ii_Unique ?
+						 UNIQUE_CHECK_YES : UNIQUE_CHECK_NO,
+						 indexInfo);
+
+			state->tups_inserted += 1;
+		}
+	}
+
+	table_endscan(sscan);
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	FreeExecutorState(estate);
+
+	/* These may have been pointing to the now-gone estate */
+	indexInfo->ii_ExpressionsState = NIL;
+	indexInfo->ii_PredicateState = NULL;
+}
 static const TableAmRoutine heapam_methods = {
 	.type = T_TableAmRoutine,
 
@@ -589,6 +1345,9 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_fetch_row_version = heapam_fetch_row_version,
 	.tuple_get_latest_tid = heap_get_latest_tid,
 	.tuple_satisfies_snapshot = heapam_tuple_satisfies_snapshot,
+
+	.index_build_range_scan = heapam_index_build_range_scan,
+	.index_validate_scan = heapam_index_validate_scan,
 };
 
 
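The two heapam_methods entries above are the whole registration a table AM
needs for index builds; catalog/index.c no longer contains anything
heap-specific.  A hypothetical other table AM would wire up its own
implementations the same way (everything here apart from the field names is
invented for illustration):

static const TableAmRoutine myam_methods = {
	.type = T_TableAmRoutine,

	/* ... scan, fetch, and modification callbacks elided ... */

	.index_build_range_scan = myam_index_build_range_scan,
	.index_validate_scan = myam_index_validate_scan,
};
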
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 05a9b03aed5..dcdd142367b 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -471,9 +471,9 @@ _bt_spools_heapscan(Relation heap, Relation index, BTBuildState *buildstate,
 
 	/* Fill spool using either serial or parallel heap scan */
 	if (!buildstate->btleader)
-		reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
-									   _bt_build_callback, (void *) buildstate,
-									   NULL);
+		reltuples = table_index_build_scan(heap, index, indexInfo, true,
+										   _bt_build_callback, (void *) buildstate,
+										   NULL);
 	else
 		reltuples = _bt_parallel_heapscan(buildstate,
 										  &indexInfo->ii_BrokenHotChain);
@@ -548,7 +548,7 @@ _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
 }
 
 /*
- * Per-tuple callback from IndexBuildHeapScan
+ * Per-tuple callback from table_index_build_scan
  */
 static void
 _bt_build_callback(Relation index,
@@ -1673,9 +1673,9 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
 	indexInfo = BuildIndexInfo(btspool->index);
 	indexInfo->ii_Concurrent = btshared->isconcurrent;
 	scan = table_beginscan_parallel(btspool->heap, &btshared->heapdesc);
-	reltuples = IndexBuildHeapScan(btspool->heap, btspool->index, indexInfo,
-								   true, _bt_build_callback,
-								   (void *) &buildstate, scan);
+	reltuples = table_index_build_scan(btspool->heap, btspool->index, indexInfo,
+									   true, _bt_build_callback,
+									   (void *) &buildstate, scan);
 
 	/*
 	 * Execute this worker's part of the sort.
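
Note the division of labor that the final scan argument expresses in the two
call sites above: passing NULL (the serial case) makes the table AM begin
and end its own scan, choosing SnapshotAny or a registered MVCC snapshot
itself, while a parallel worker passes the scan obtained from the shared
parallel descriptor, whose snapshot the leader already set up.  The
worker-side pattern, condensed from _bt_parallel_scan_and_sort above:

/* Snapshot comes with the shared scan; the table AM must not start its own */
scan = table_beginscan_parallel(btspool->heap, &btshared->heapdesc);
reltuples = table_index_build_scan(btspool->heap, btspool->index,
								   indexInfo,
								   true,	/* allow_sync: required here */
								   _bt_build_callback,
								   (void *) &buildstate,
								   scan);	/* non-NULL: parallel scan */
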
diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c
index f428a151385..390ad9ac51f 100644
--- a/src/backend/access/spgist/spginsert.c
+++ b/src/backend/access/spgist/spginsert.c
@@ -19,6 +19,7 @@
 #include "access/genam.h"
 #include "access/spgist_private.h"
 #include "access/spgxlog.h"
+#include "access/tableam.h"
 #include "access/xlog.h"
 #include "access/xloginsert.h"
 #include "catalog/index.h"
@@ -37,7 +38,7 @@ typedef struct
 } SpGistBuildState;
 
 
-/* Callback to process one heap tuple during IndexBuildHeapScan */
+/* Callback to process one heap tuple during table_index_build_scan */
 static void
 spgistBuildCallback(Relation index, HeapTuple htup, Datum *values,
 					bool *isnull, bool tupleIsAlive, void *state)
@@ -142,9 +143,9 @@ spgbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 											  "SP-GiST build temporary context",
 											  ALLOCSET_DEFAULT_SIZES);
 
-	reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
-								   spgistBuildCallback, (void *) &buildstate,
-								   NULL);
+	reltuples = table_index_build_scan(heap, index, indexInfo, true,
+									   spgistBuildCallback, (void *) &buildstate,
+									   NULL);
 
 	MemoryContextDelete(buildstate.tmpCtx);
 
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index ff1a18c4d4e..f9ae483ab97 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -80,16 +80,6 @@
 /* Potentially set by pg_upgrade_support functions */
 Oid			binary_upgrade_next_index_pg_class_oid = InvalidOid;
 
-/* state info for validate_index bulkdelete callback */
-typedef struct
-{
-	Tuplesortstate *tuplesort;	/* for sorting the index TIDs */
-	/* statistics (for debug purposes only): */
-	double		htups,
-				itups,
-				tups_inserted;
-} v_i_state;
-
 /*
  * Pointer-free representation of variables used when reindexing system
  * catalogs; we use this to propagate those values to parallel workers.
@@ -130,14 +120,7 @@ static void index_update_stats(Relation rel,
 static void IndexCheckExclusion(Relation heapRelation,
 					Relation indexRelation,
 					IndexInfo *indexInfo);
-static inline int64 itemptr_encode(ItemPointer itemptr);
-static inline void itemptr_decode(ItemPointer itemptr, int64 encoded);
 static bool validate_index_callback(ItemPointer itemptr, void *opaque);
-static void validate_index_heapscan(Relation heapRelation,
-						Relation indexRelation,
-						IndexInfo *indexInfo,
-						Snapshot snapshot,
-						v_i_state *state);
 static bool ReindexIsCurrentlyProcessingIndex(Oid indexOid);
 static void SetReindexProcessing(Oid heapOid, Oid indexOid);
 static void ResetReindexProcessing(void);
@@ -2401,557 +2384,6 @@ index_build(Relation heapRelation,
 	SetUserIdAndSecContext(save_userid, save_sec_context);
 }
 
-
-/*
- * IndexBuildHeapScan - scan the heap relation to find tuples to be indexed
- *
- * This is called back from an access-method-specific index build procedure
- * after the AM has done whatever setup it needs.  The parent heap relation
- * is scanned to find tuples that should be entered into the index.  Each
- * such tuple is passed to the AM's callback routine, which does the right
- * things to add it to the new index.  After we return, the AM's index
- * build procedure does whatever cleanup it needs.
- *
- * The total count of live heap tuples is returned.  This is for updating
- * pg_class statistics.  (It's annoying not to be able to do that here, but we
- * want to merge that update with others; see index_update_stats.)  Note that
- * the index AM itself must keep track of the number of index tuples; we don't
- * do so here because the AM might reject some of the tuples for its own
- * reasons, such as being unable to store NULLs.
- *
- * A side effect is to set indexInfo->ii_BrokenHotChain to true if we detect
- * any potentially broken HOT chains.  Currently, we set this if there are
- * any RECENTLY_DEAD or DELETE_IN_PROGRESS entries in a HOT chain, without
- * trying very hard to detect whether they're really incompatible with the
- * chain tip.
- */
-double
-IndexBuildHeapScan(Relation heapRelation,
-				   Relation indexRelation,
-				   IndexInfo *indexInfo,
-				   bool allow_sync,
-				   IndexBuildCallback callback,
-				   void *callback_state,
-				   TableScanDesc scan)
-{
-	return IndexBuildHeapRangeScan(heapRelation, indexRelation,
-								   indexInfo, allow_sync,
-								   false,
-								   0, InvalidBlockNumber,
-								   callback, callback_state, scan);
-}
-
-/*
- * As above, except that instead of scanning the complete heap, only the given
- * number of blocks are scanned.  Scan to end-of-rel can be signalled by
- * passing InvalidBlockNumber as numblocks.  Note that restricting the range
- * to scan cannot be done when requesting syncscan.
- *
- * When "anyvisible" mode is requested, all tuples visible to any transaction
- * are indexed and counted as live, including those inserted or deleted by
- * transactions that are still in progress.
- */
-double
-IndexBuildHeapRangeScan(Relation heapRelation,
-						Relation indexRelation,
-						IndexInfo *indexInfo,
-						bool allow_sync,
-						bool anyvisible,
-						BlockNumber start_blockno,
-						BlockNumber numblocks,
-						IndexBuildCallback callback,
-						void *callback_state,
-						TableScanDesc scan)
-{
-	HeapScanDesc hscan;
-	bool		is_system_catalog;
-	bool		checking_uniqueness;
-	HeapTuple	heapTuple;
-	Datum		values[INDEX_MAX_KEYS];
-	bool		isnull[INDEX_MAX_KEYS];
-	double		reltuples;
-	ExprState  *predicate;
-	TupleTableSlot *slot;
-	EState	   *estate;
-	ExprContext *econtext;
-	Snapshot	snapshot;
-	bool		need_unregister_snapshot = false;
-	TransactionId OldestXmin;
-	BlockNumber root_blkno = InvalidBlockNumber;
-	OffsetNumber root_offsets[MaxHeapTuplesPerPage];
-
-	/*
-	 * sanity checks
-	 */
-	Assert(OidIsValid(indexRelation->rd_rel->relam));
-
-	/* Remember if it's a system catalog */
-	is_system_catalog = IsSystemRelation(heapRelation);
-
-	/* See whether we're verifying uniqueness/exclusion properties */
-	checking_uniqueness = (indexInfo->ii_Unique ||
-						   indexInfo->ii_ExclusionOps != NULL);
-
-	/*
-	 * "Any visible" mode is not compatible with uniqueness checks; make sure
-	 * only one of those is requested.
-	 */
-	Assert(!(anyvisible && checking_uniqueness));
-
-	/*
-	 * Need an EState for evaluation of index expressions and partial-index
-	 * predicates.  Also a slot to hold the current tuple.
-	 */
-	estate = CreateExecutorState();
-	econtext = GetPerTupleExprContext(estate);
-	slot = table_slot_create(heapRelation, NULL);
-
-	/* Arrange for econtext's scan tuple to be the tuple under test */
-	econtext->ecxt_scantuple = slot;
-
-	/* Set up execution state for predicate, if any. */
-	predicate = ExecPrepareQual(indexInfo->ii_Predicate, estate);
-
-	/*
-	 * Prepare for scan of the base relation.  In a normal index build, we use
-	 * SnapshotAny because we must retrieve all tuples and do our own time
-	 * qual checks (because we have to index RECENTLY_DEAD tuples). In a
-	 * concurrent build, or during bootstrap, we take a regular MVCC snapshot
-	 * and index whatever's live according to that.
-	 */
-	OldestXmin = InvalidTransactionId;
-
-	/* okay to ignore lazy VACUUMs here */
-	if (!IsBootstrapProcessingMode() && !indexInfo->ii_Concurrent)
-		OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM);
-
-	if (!scan)
-	{
-		/*
-		 * Serial index build.
-		 *
-		 * Must begin our own heap scan in this case.  We may also need to
-		 * register a snapshot whose lifetime is under our direct control.
-		 */
-		if (!TransactionIdIsValid(OldestXmin))
-		{
-			snapshot = RegisterSnapshot(GetTransactionSnapshot());
-			need_unregister_snapshot = true;
-		}
-		else
-			snapshot = SnapshotAny;
-
-		scan = table_beginscan_strat(heapRelation,	/* relation */
-									 snapshot,	/* snapshot */
-									 0,	/* number of keys */
-									 NULL,	/* scan key */
-									 true,	/* buffer access strategy OK */
-									 allow_sync);	/* syncscan OK? */
-	}
-	else
-	{
-		/*
-		 * Parallel index build.
-		 *
-		 * Parallel case never registers/unregisters own snapshot.  Snapshot
-		 * is taken from parallel heap scan, and is SnapshotAny or an MVCC
-		 * snapshot, based on same criteria as serial case.
-		 */
-		Assert(!IsBootstrapProcessingMode());
-		Assert(allow_sync);
-		snapshot = scan->rs_snapshot;
-	}
-
-	hscan = (HeapScanDesc) scan;
-
-	/*
-	 * Must call GetOldestXmin() with SnapshotAny.  Should never call
-	 * GetOldestXmin() with MVCC snapshot. (It's especially worth checking
-	 * this for parallel builds, since ambuild routines that support parallel
-	 * builds must work these details out for themselves.)
-	 */
-	Assert(snapshot == SnapshotAny || IsMVCCSnapshot(snapshot));
-	Assert(snapshot == SnapshotAny ? TransactionIdIsValid(OldestXmin) :
-		   !TransactionIdIsValid(OldestXmin));
-	Assert(snapshot == SnapshotAny || !anyvisible);
-
-	/* set our scan endpoints */
-	if (!allow_sync)
-		heap_setscanlimits(scan, start_blockno, numblocks);
-	else
-	{
-		/* syncscan can only be requested on whole relation */
-		Assert(start_blockno == 0);
-		Assert(numblocks == InvalidBlockNumber);
-	}
-
-	reltuples = 0;
-
-	/*
-	 * Scan all tuples in the base relation.
-	 */
-	while ((heapTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
-	{
-		bool		tupleIsAlive;
-
-		CHECK_FOR_INTERRUPTS();
-
-		/*
-		 * When dealing with a HOT-chain of updated tuples, we want to index
-		 * the values of the live tuple (if any), but index it under the TID
-		 * of the chain's root tuple.  This approach is necessary to preserve
-		 * the HOT-chain structure in the heap. So we need to be able to find
-		 * the root item offset for every tuple that's in a HOT-chain.  When
-		 * first reaching a new page of the relation, call
-		 * heap_get_root_tuples() to build a map of root item offsets on the
-		 * page.
-		 *
-		 * It might look unsafe to use this information across buffer
-		 * lock/unlock.  However, we hold ShareLock on the table so no
-		 * ordinary insert/update/delete should occur; and we hold pin on the
-		 * buffer continuously while visiting the page, so no pruning
-		 * operation can occur either.
-		 *
-		 * Also, although our opinions about tuple liveness could change while
-		 * we scan the page (due to concurrent transaction commits/aborts),
-		 * the chain root locations won't, so this info doesn't need to be
-		 * rebuilt after waiting for another transaction.
-		 *
-		 * Note the implied assumption that there is no more than one live
-		 * tuple per HOT-chain --- else we could create more than one index
-		 * entry pointing to the same root tuple.
-		 */
-		if (hscan->rs_cblock != root_blkno)
-		{
-			Page		page = BufferGetPage(hscan->rs_cbuf);
-
-			LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_SHARE);
-			heap_get_root_tuples(page, root_offsets);
-			LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_UNLOCK);
-
-			root_blkno = hscan->rs_cblock;
-		}
-
-		if (snapshot == SnapshotAny)
-		{
-			/* do our own time qual check */
-			bool		indexIt;
-			TransactionId xwait;
-
-	recheck:
-
-			/*
-			 * We could possibly get away with not locking the buffer here,
-			 * since caller should hold ShareLock on the relation, but let's
-			 * be conservative about it.  (This remark is still correct even
-			 * with HOT-pruning: our pin on the buffer prevents pruning.)
-			 */
-			LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_SHARE);
-
-			/*
-			 * The criteria for counting a tuple as live in this block need to
-			 * match what analyze.c's acquire_sample_rows() does, otherwise
-			 * CREATE INDEX and ANALYZE may produce wildly different reltuples
-			 * values, e.g. when there are many recently-dead tuples.
-			 */
-			switch (HeapTupleSatisfiesVacuum(heapTuple, OldestXmin,
-											 hscan->rs_cbuf))
-			{
-				case HEAPTUPLE_DEAD:
-					/* Definitely dead, we can ignore it */
-					indexIt = false;
-					tupleIsAlive = false;
-					break;
-				case HEAPTUPLE_LIVE:
-					/* Normal case, index and unique-check it */
-					indexIt = true;
-					tupleIsAlive = true;
-					/* Count it as live, too */
-					reltuples += 1;
-					break;
-				case HEAPTUPLE_RECENTLY_DEAD:
-
-					/*
-					 * If tuple is recently deleted then we must index it
-					 * anyway to preserve MVCC semantics.  (Pre-existing
-					 * transactions could try to use the index after we finish
-					 * building it, and may need to see such tuples.)
-					 *
-					 * However, if it was HOT-updated then we must only index
-					 * the live tuple at the end of the HOT-chain.  Since this
-					 * breaks semantics for pre-existing snapshots, mark the
-					 * index as unusable for them.
-					 *
-					 * We don't count recently-dead tuples in reltuples, even
-					 * if we index them; see acquire_sample_rows().
-					 */
-					if (HeapTupleIsHotUpdated(heapTuple))
-					{
-						indexIt = false;
-						/* mark the index as unsafe for old snapshots */
-						indexInfo->ii_BrokenHotChain = true;
-					}
-					else
-						indexIt = true;
-					/* In any case, exclude the tuple from unique-checking */
-					tupleIsAlive = false;
-					break;
-				case HEAPTUPLE_INSERT_IN_PROGRESS:
-
-					/*
-					 * In "anyvisible" mode, this tuple is visible and we
-					 * don't need any further checks.
-					 */
-					if (anyvisible)
-					{
-						indexIt = true;
-						tupleIsAlive = true;
-						reltuples += 1;
-						break;
-					}
-
-					/*
-					 * Since caller should hold ShareLock or better, normally
-					 * the only way to see this is if it was inserted earlier
-					 * in our own transaction.  However, it can happen in
-					 * system catalogs, since we tend to release write lock
-					 * before commit there.  Give a warning if neither case
-					 * applies.
-					 */
-					xwait = HeapTupleHeaderGetXmin(heapTuple->t_data);
-					if (!TransactionIdIsCurrentTransactionId(xwait))
-					{
-						if (!is_system_catalog)
-							elog(WARNING, "concurrent insert in progress within table \"%s\"",
-								 RelationGetRelationName(heapRelation));
-
-						/*
-						 * If we are performing uniqueness checks, indexing
-						 * such a tuple could lead to a bogus uniqueness
-						 * failure.  In that case we wait for the inserting
-						 * transaction to finish and check again.
-						 */
-						if (checking_uniqueness)
-						{
-							/*
-							 * Must drop the lock on the buffer before we wait
-							 */
-							LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_UNLOCK);
-							XactLockTableWait(xwait, heapRelation,
-											  &heapTuple->t_self,
-											  XLTW_InsertIndexUnique);
-							CHECK_FOR_INTERRUPTS();
-							goto recheck;
-						}
-					}
-					else
-					{
-						/*
-						 * For consistency with acquire_sample_rows(), count
-						 * HEAPTUPLE_INSERT_IN_PROGRESS tuples as live only
-						 * when inserted by our own transaction.
-						 */
-						reltuples += 1;
-					}
-
-					/*
-					 * We must index such tuples, since if the index build
-					 * commits then they're good.
-					 */
-					indexIt = true;
-					tupleIsAlive = true;
-					break;
-				case HEAPTUPLE_DELETE_IN_PROGRESS:
-
-					/*
-					 * As with INSERT_IN_PROGRESS case, this is unexpected
-					 * unless it's our own deletion or a system catalog; but
-					 * in anyvisible mode, this tuple is visible.
-					 */
-					if (anyvisible)
-					{
-						indexIt = true;
-						tupleIsAlive = false;
-						reltuples += 1;
-						break;
-					}
-
-					xwait = HeapTupleHeaderGetUpdateXid(heapTuple->t_data);
-					if (!TransactionIdIsCurrentTransactionId(xwait))
-					{
-						if (!is_system_catalog)
-							elog(WARNING, "concurrent delete in progress within table \"%s\"",
-								 RelationGetRelationName(heapRelation));
-
-						/*
-						 * If we are performing uniqueness checks, assuming
-						 * the tuple is dead could lead to missing a
-						 * uniqueness violation.  In that case we wait for the
-						 * deleting transaction to finish and check again.
-						 *
-						 * Also, if it's a HOT-updated tuple, we should not
-						 * index it but rather the live tuple at the end of
-						 * the HOT-chain.  However, the deleting transaction
-						 * could abort, possibly leaving this tuple as live
-						 * after all, in which case it has to be indexed. The
-						 * only way to know what to do is to wait for the
-						 * deleting transaction to finish and check again.
-						 */
-						if (checking_uniqueness ||
-							HeapTupleIsHotUpdated(heapTuple))
-						{
-							/*
-							 * Must drop the lock on the buffer before we wait
-							 */
-							LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_UNLOCK);
-							XactLockTableWait(xwait, heapRelation,
-											  &heapTuple->t_self,
-											  XLTW_InsertIndexUnique);
-							CHECK_FOR_INTERRUPTS();
-							goto recheck;
-						}
-
-						/*
-						 * Otherwise index it but don't check for uniqueness,
-						 * the same as a RECENTLY_DEAD tuple.
-						 */
-						indexIt = true;
-
-						/*
-						 * Count HEAPTUPLE_DELETE_IN_PROGRESS tuples as live,
-						 * if they were not deleted by the current
-						 * transaction.  That's what acquire_sample_rows()
-						 * does, and we want the behavior to be consistent.
-						 */
-						reltuples += 1;
-					}
-					else if (HeapTupleIsHotUpdated(heapTuple))
-					{
-						/*
-						 * It's a HOT-updated tuple deleted by our own xact.
-						 * We can assume the deletion will commit (else the
-						 * index contents don't matter), so treat the same as
-						 * RECENTLY_DEAD HOT-updated tuples.
-						 */
-						indexIt = false;
-						/* mark the index as unsafe for old snapshots */
-						indexInfo->ii_BrokenHotChain = true;
-					}
-					else
-					{
-						/*
-						 * It's a regular tuple deleted by our own xact. Index
-						 * it, but don't check for uniqueness nor count in
-						 * reltuples, the same as a RECENTLY_DEAD tuple.
-						 */
-						indexIt = true;
-					}
-					/* In any case, exclude the tuple from unique-checking */
-					tupleIsAlive = false;
-					break;
-				default:
-					elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
-					indexIt = tupleIsAlive = false; /* keep compiler quiet */
-					break;
-			}
-
-			LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_UNLOCK);
-
-			if (!indexIt)
-				continue;
-		}
-		else
-		{
-			/* heap_getnext did the time qual check */
-			tupleIsAlive = true;
-			reltuples += 1;
-		}
-
-		MemoryContextReset(econtext->ecxt_per_tuple_memory);
-
-		/* Set up for predicate or expression evaluation */
-		ExecStoreBufferHeapTuple(heapTuple, slot, hscan->rs_cbuf);
-
-		/*
-		 * In a partial index, discard tuples that don't satisfy the
-		 * predicate.
-		 */
-		if (predicate != NULL)
-		{
-			if (!ExecQual(predicate, econtext))
-				continue;
-		}
-
-		/*
-		 * For the current heap tuple, extract all the attributes we use in
-		 * this index, and note which are null.  This also performs evaluation
-		 * of any expressions needed.
-		 */
-		FormIndexDatum(indexInfo,
-					   slot,
-					   estate,
-					   values,
-					   isnull);
-
-		/*
-		 * You'd think we should go ahead and build the index tuple here, but
-		 * some index AMs want to do further processing on the data first.  So
-		 * pass the values[] and isnull[] arrays, instead.
-		 */
-
-		if (HeapTupleIsHeapOnly(heapTuple))
-		{
-			/*
-			 * For a heap-only tuple, pretend its TID is that of the root. See
-			 * src/backend/access/heap/README.HOT for discussion.
-			 */
-			HeapTupleData rootTuple;
-			OffsetNumber offnum;
-
-			rootTuple = *heapTuple;
-			offnum = ItemPointerGetOffsetNumber(&heapTuple->t_self);
-
-			if (!OffsetNumberIsValid(root_offsets[offnum - 1]))
-				ereport(ERROR,
-						(errcode(ERRCODE_DATA_CORRUPTED),
-						 errmsg_internal("failed to find parent tuple for heap-only tuple at (%u,%u) in table \"%s\"",
-										 ItemPointerGetBlockNumber(&heapTuple->t_self),
-										 offnum,
-										 RelationGetRelationName(heapRelation))));
-
-			ItemPointerSetOffsetNumber(&rootTuple.t_self,
-									   root_offsets[offnum - 1]);
-
-			/* Call the AM's callback routine to process the tuple */
-			callback(indexRelation, &rootTuple, values, isnull, tupleIsAlive,
-					 callback_state);
-		}
-		else
-		{
-			/* Call the AM's callback routine to process the tuple */
-			callback(indexRelation, heapTuple, values, isnull, tupleIsAlive,
-					 callback_state);
-		}
-	}
-
-	table_endscan(scan);
-
-	/* we can now forget our snapshot, if set and registered by us */
-	if (need_unregister_snapshot)
-		UnregisterSnapshot(snapshot);
-
-	ExecDropSingleTupleTableSlot(slot);
-
-	FreeExecutorState(estate);
-
-	/* These may have been pointing to the now-gone estate */
-	indexInfo->ii_ExpressionsState = NIL;
-	indexInfo->ii_PredicateState = NULL;
-
-	return reltuples;
-}
-
-
 /*
  * IndexCheckExclusion - verify that a new exclusion constraint is satisfied
  *
@@ -3127,7 +2559,7 @@ validate_index(Oid heapId, Oid indexId, Snapshot snapshot)
 				indexRelation;
 	IndexInfo  *indexInfo;
 	IndexVacuumInfo ivinfo;
-	v_i_state	state;
+	ValidateIndexState state;
 	Oid			save_userid;
 	int			save_sec_context;
 	int			save_nestlevel;
@@ -3188,11 +2620,11 @@ validate_index(Oid heapId, Oid indexId, Snapshot snapshot)
 	/*
 	 * Now scan the heap and "merge" it with the index
 	 */
-	validate_index_heapscan(heapRelation,
-							indexRelation,
-							indexInfo,
-							snapshot,
-							&state);
+	table_index_validate_scan(heapRelation,
+							  indexRelation,
+							  indexInfo,
+							  snapshot,
+							  &state);
 
 	/* Done with tuplesort object */
 	tuplesort_end(state.tuplesort);
@@ -3212,53 +2644,13 @@ validate_index(Oid heapId, Oid indexId, Snapshot snapshot)
 	table_close(heapRelation, NoLock);
 }
 
-/*
- * itemptr_encode - Encode ItemPointer as int64/int8
- *
- * This representation must produce values encoded as int64 that sort in the
- * same order as their corresponding original TID values would (using the
- * default int8 opclass to produce a result equivalent to the default TID
- * opclass).
- *
- * As noted in validate_index(), this can be significantly faster.
- */
-static inline int64
-itemptr_encode(ItemPointer itemptr)
-{
-	BlockNumber block = ItemPointerGetBlockNumber(itemptr);
-	OffsetNumber offset = ItemPointerGetOffsetNumber(itemptr);
-	int64		encoded;
-
-	/*
-	 * Use the 16 least significant bits for the offset.  32 adjacent bits are
-	 * used for the block number.  Since remaining bits are unused, there
-	 * cannot be negative encoded values (We assume a two's complement
-	 * representation).
-	 */
-	encoded = ((uint64) block << 16) | (uint16) offset;
-
-	return encoded;
-}
-
-/*
- * itemptr_decode - Decode int64/int8 representation back to ItemPointer
- */
-static inline void
-itemptr_decode(ItemPointer itemptr, int64 encoded)
-{
-	BlockNumber block = (BlockNumber) (encoded >> 16);
-	OffsetNumber offset = (OffsetNumber) (encoded & 0xFFFF);
-
-	ItemPointerSet(itemptr, block, offset);
-}
-
 /*
  * validate_index_callback - bulkdelete callback to collect the index TIDs
  */
 static bool
 validate_index_callback(ItemPointer itemptr, void *opaque)
 {
-	v_i_state  *state = (v_i_state *) opaque;
+	ValidateIndexState *state = (ValidateIndexState *) opaque;
 	int64		encoded = itemptr_encode(itemptr);
 
 	tuplesort_putdatum(state->tuplesort, Int64GetDatum(encoded), false);
@@ -3266,245 +2658,6 @@ validate_index_callback(ItemPointer itemptr, void *opaque)
 	return false;				/* never actually delete anything */
 }
 
-/*
- * validate_index_heapscan - second table scan for concurrent index build
- *
- * This has much code in common with IndexBuildHeapScan, but it's enough
- * different that it seems cleaner to have two routines not one.
- */
-static void
-validate_index_heapscan(Relation heapRelation,
-						Relation indexRelation,
-						IndexInfo *indexInfo,
-						Snapshot snapshot,
-						v_i_state *state)
-{
-	TableScanDesc scan;
-	HeapScanDesc hscan;
-	HeapTuple	heapTuple;
-	Datum		values[INDEX_MAX_KEYS];
-	bool		isnull[INDEX_MAX_KEYS];
-	ExprState  *predicate;
-	TupleTableSlot *slot;
-	EState	   *estate;
-	ExprContext *econtext;
-	BlockNumber root_blkno = InvalidBlockNumber;
-	OffsetNumber root_offsets[MaxHeapTuplesPerPage];
-	bool		in_index[MaxHeapTuplesPerPage];
-
-	/* state variables for the merge */
-	ItemPointer indexcursor = NULL;
-	ItemPointerData decoded;
-	bool		tuplesort_empty = false;
-
-	/*
-	 * sanity checks
-	 */
-	Assert(OidIsValid(indexRelation->rd_rel->relam));
-
-	/*
-	 * Need an EState for evaluation of index expressions and partial-index
-	 * predicates.  Also a slot to hold the current tuple.
-	 */
-	estate = CreateExecutorState();
-	econtext = GetPerTupleExprContext(estate);
-	slot = MakeSingleTupleTableSlot(RelationGetDescr(heapRelation),
-									&TTSOpsHeapTuple);
-
-	/* Arrange for econtext's scan tuple to be the tuple under test */
-	econtext->ecxt_scantuple = slot;
-
-	/* Set up execution state for predicate, if any. */
-	predicate = ExecPrepareQual(indexInfo->ii_Predicate, estate);
-
-	/*
-	 * Prepare for scan of the base relation.  We need just those tuples
-	 * satisfying the passed-in reference snapshot.  We must disable syncscan
-	 * here, because it's critical that we read from block zero forward to
-	 * match the sorted TIDs.
-	 */
-	scan = table_beginscan_strat(heapRelation,	/* relation */
-								 snapshot,	/* snapshot */
-								 0,	/* number of keys */
-								 NULL,	/* scan key */
-								 true,	/* buffer access strategy OK */
-								 false); /* syncscan not OK */
-	hscan = (HeapScanDesc) scan;
-
-	/*
-	 * Scan all tuples matching the snapshot.
-	 */
-	while ((heapTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
-	{
-		ItemPointer heapcursor = &heapTuple->t_self;
-		ItemPointerData rootTuple;
-		OffsetNumber root_offnum;
-
-		CHECK_FOR_INTERRUPTS();
-
-		state->htups += 1;
-
-		/*
-		 * As commented in IndexBuildHeapScan, we should index heap-only
-		 * tuples under the TIDs of their root tuples; so when we advance onto
-		 * a new heap page, build a map of root item offsets on the page.
-		 *
-		 * This complicates merging against the tuplesort output: we will
-		 * visit the live tuples in order by their offsets, but the root
-		 * offsets that we need to compare against the index contents might be
-		 * ordered differently.  So we might have to "look back" within the
-		 * tuplesort output, but only within the current page.  We handle that
-		 * by keeping a bool array in_index[] showing all the
-		 * already-passed-over tuplesort output TIDs of the current page. We
-		 * clear that array here, when advancing onto a new heap page.
-		 */
-		if (hscan->rs_cblock != root_blkno)
-		{
-			Page		page = BufferGetPage(hscan->rs_cbuf);
-
-			LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_SHARE);
-			heap_get_root_tuples(page, root_offsets);
-			LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_UNLOCK);
-
-			memset(in_index, 0, sizeof(in_index));
-
-			root_blkno = hscan->rs_cblock;
-		}
-
-		/* Convert actual tuple TID to root TID */
-		rootTuple = *heapcursor;
-		root_offnum = ItemPointerGetOffsetNumber(heapcursor);
-
-		if (HeapTupleIsHeapOnly(heapTuple))
-		{
-			root_offnum = root_offsets[root_offnum - 1];
-			if (!OffsetNumberIsValid(root_offnum))
-				ereport(ERROR,
-						(errcode(ERRCODE_DATA_CORRUPTED),
-						 errmsg_internal("failed to find parent tuple for heap-only tuple at (%u,%u) in table \"%s\"",
-										 ItemPointerGetBlockNumber(heapcursor),
-										 ItemPointerGetOffsetNumber(heapcursor),
-										 RelationGetRelationName(heapRelation))));
-			ItemPointerSetOffsetNumber(&rootTuple, root_offnum);
-		}
-
-		/*
-		 * "merge" by skipping through the index tuples until we find or pass
-		 * the current root tuple.
-		 */
-		while (!tuplesort_empty &&
-			   (!indexcursor ||
-				ItemPointerCompare(indexcursor, &rootTuple) < 0))
-		{
-			Datum		ts_val;
-			bool		ts_isnull;
-
-			if (indexcursor)
-			{
-				/*
-				 * Remember index items seen earlier on the current heap page
-				 */
-				if (ItemPointerGetBlockNumber(indexcursor) == root_blkno)
-					in_index[ItemPointerGetOffsetNumber(indexcursor) - 1] = true;
-			}
-
-			tuplesort_empty = !tuplesort_getdatum(state->tuplesort, true,
-												  &ts_val, &ts_isnull, NULL);
-			Assert(tuplesort_empty || !ts_isnull);
-			if (!tuplesort_empty)
-			{
-				itemptr_decode(&decoded, DatumGetInt64(ts_val));
-				indexcursor = &decoded;
-
-				/* If int8 is pass-by-ref, free (encoded) TID Datum memory */
-#ifndef USE_FLOAT8_BYVAL
-				pfree(DatumGetPointer(ts_val));
-#endif
-			}
-			else
-			{
-				/* Be tidy */
-				indexcursor = NULL;
-			}
-		}
-
-		/*
-		 * If the tuplesort has overshot *and* we didn't see a match earlier,
-		 * then this tuple is missing from the index, so insert it.
-		 */
-		if ((tuplesort_empty ||
-			 ItemPointerCompare(indexcursor, &rootTuple) > 0) &&
-			!in_index[root_offnum - 1])
-		{
-			MemoryContextReset(econtext->ecxt_per_tuple_memory);
-
-			/* Set up for predicate or expression evaluation */
-			ExecStoreHeapTuple(heapTuple, slot, false);
-
-			/*
-			 * In a partial index, discard tuples that don't satisfy the
-			 * predicate.
-			 */
-			if (predicate != NULL)
-			{
-				if (!ExecQual(predicate, econtext))
-					continue;
-			}
-
-			/*
-			 * For the current heap tuple, extract all the attributes we use
-			 * in this index, and note which are null.  This also performs
-			 * evaluation of any expressions needed.
-			 */
-			FormIndexDatum(indexInfo,
-						   slot,
-						   estate,
-						   values,
-						   isnull);
-
-			/*
-			 * You'd think we should go ahead and build the index tuple here,
-			 * but some index AMs want to do further processing on the data
-			 * first. So pass the values[] and isnull[] arrays, instead.
-			 */
-
-			/*
-			 * If the tuple is already committed dead, you might think we
-			 * could suppress uniqueness checking, but this is no longer true
-			 * in the presence of HOT, because the insert is actually a proxy
-			 * for a uniqueness check on the whole HOT-chain.  That is, the
-			 * tuple we have here could be dead because it was already
-			 * HOT-updated, and if so the updating transaction will not have
-			 * thought it should insert index entries.  The index AM will
-			 * check the whole HOT-chain and correctly detect a conflict if
-			 * there is one.
-			 */
-
-			index_insert(indexRelation,
-						 values,
-						 isnull,
-						 &rootTuple,
-						 heapRelation,
-						 indexInfo->ii_Unique ?
-						 UNIQUE_CHECK_YES : UNIQUE_CHECK_NO,
-						 indexInfo);
-
-			state->tups_inserted += 1;
-		}
-	}
-
-	table_endscan(scan);
-
-	ExecDropSingleTupleTableSlot(slot);
-
-	FreeExecutorState(estate);
-
-	/* These may have been pointing to the now-gone estate */
-	indexInfo->ii_ExpressionsState = NIL;
-	indexInfo->ii_PredicateState = NULL;
-}
-
-
 /*
  * index_set_state_flags - adjust pg_index state flags
  *
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 2c2d388dda6..bd2cdd34e08 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -16,6 +16,7 @@
 
 #include "access/relscan.h"
 #include "access/sdir.h"
+#include "catalog/index.h"
 #include "utils/guc.h"
 #include "utils/rel.h"
 #include "utils/snapshot.h"
@@ -27,6 +28,7 @@ extern char *default_table_access_method;
 extern bool synchronize_seqscans;
 
 
+struct ValidateIndexState;
 struct BulkInsertStateData;
 
 /*
@@ -294,6 +296,28 @@ typedef struct TableAmRoutine
 	 */
 	void		(*finish_bulk_insert) (Relation rel, int options);
 
+
+	/* ------------------------------------------------------------------------
+	 * DDL related functionality.
+	 * ------------------------------------------------------------------------
+	 */
+
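+	/* see table_index_build_range_scan for reference about parameters */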
+	double		(*index_build_range_scan) (Relation heap_rel,
+										   Relation index_rel,
+										   IndexInfo *index_info,
+										   bool allow_sync,
+										   bool anyvisible,
+										   BlockNumber start_blockno,
+										   BlockNumber numblocks,
+										   IndexBuildCallback callback,
+										   void *callback_state,
+										   TableScanDesc scan);
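+
+	/* see table_index_validate_scan for reference about parameters */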
+	void		(*index_validate_scan) (Relation heap_rel,
+										Relation index_rel,
+										IndexInfo *index_info,
+										Snapshot snapshot,
+										struct ValidateIndexState *state);
+
 } TableAmRoutine;
 
 
@@ -706,6 +730,71 @@ table_finish_bulk_insert(Relation rel, int options)
 }
 
 
+/* ------------------------------------------------------------------------
+ * DDL related functionality.
+ * ------------------------------------------------------------------------
+ */
+
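+/*
+ * table_index_build_scan - scan the table to find tuples to be indexed
+ *
+ * Shorthand for table_index_build_range_scan() over the entire relation,
+ * with anyvisible set to false.  The AM's callback is invoked once for
+ * each tuple that should be present in the index; the return value is the
+ * number of live tuples scanned, suitable for use as a reltuples estimate.
+ */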
+static inline double
+table_index_build_scan(Relation heap_rel,
+					   Relation index_rel,
+					   IndexInfo *index_info,
+					   bool allow_sync,
+					   IndexBuildCallback callback,
+					   void *callback_state,
+					   TableScanDesc scan)
+{
+	return heap_rel->rd_tableam->index_build_range_scan(heap_rel,
+														index_rel,
+														index_info,
+														allow_sync,
+														false,
+														0,
+														InvalidBlockNumber,
+														callback,
+														callback_state,
+														scan);
+}
+
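+/*
+ * table_index_validate_scan - second table scan for concurrent index build
+ *
+ * Scans the table with the given snapshot and "merges" it against the
+ * sorted index TIDs collected in *state, inserting index entries for any
+ * visible tuples not already present.  See validate_index().
+ */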
+static inline void
+table_index_validate_scan(Relation heap_rel,
+						  Relation index_rel,
+						  IndexInfo *index_info,
+						  Snapshot snapshot,
+						  struct ValidateIndexState *state)
+{
+	heap_rel->rd_tableam->index_validate_scan(heap_rel,
+											  index_rel,
+											  index_info,
+											  snapshot,
+											  state);
+}
+
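+/*
+ * table_index_build_range_scan - as table_index_build_scan(), but scans just
+ * the given block range (numblocks == InvalidBlockNumber means "to the end"),
+ * and optionally uses "anyvisible" mode, in which all tuples visible to any
+ * transaction are indexed and counted as live, including those inserted or
+ * deleted by transactions that are still in progress.
+ */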
+static inline double
+table_index_build_range_scan(Relation heap_rel,
+							 Relation index_rel,
+							 IndexInfo *index_info,
+							 bool allow_sync,
+							 bool anyvisible,
+							 BlockNumber start_blockno,
+							 BlockNumber numblocks,
+							 IndexBuildCallback callback,
+							 void *callback_state,
+							 TableScanDesc scan)
+{
+	return heap_rel->rd_tableam->index_build_range_scan(heap_rel,
+														index_rel,
+														index_info,
+														allow_sync,
+														anyvisible,
+														start_blockno,
+														numblocks,
+														callback,
+														callback_state,
+														scan);
+}
+
+
 /* ----------------------------------------------------------------------------
  * Functions to make modifications a bit simpler.
  * ----------------------------------------------------------------------------
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index 29f7ed62379..8ac0e11f5fe 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -20,7 +20,7 @@
 
 #define DEFAULT_INDEX_TYPE	"btree"
 
-/* Typedef for callback function for IndexBuildHeapScan */
+/* Typedef for callback function for table_index_build_scan */
 typedef void (*IndexBuildCallback) (Relation index,
 									HeapTuple htup,
 									Datum *values,
@@ -37,6 +37,15 @@ typedef enum
 	INDEX_DROP_SET_DEAD
 } IndexStateFlagsAction;
 
+/* state info for validate_index bulkdelete callback */
+typedef struct ValidateIndexState
+{
+	Tuplesortstate *tuplesort;	/* for sorting the index TIDs */
+	/* statistics (for debug purposes only): */
+	double		htups,
+				itups,
+				tups_inserted;
+} ValidateIndexState;
 
 extern void index_check_primary_key(Relation heapRel,
 						IndexInfo *indexInfo,
@@ -110,25 +119,6 @@ extern void index_build(Relation heapRelation,
 			bool isreindex,
 			bool parallel);
 
-struct TableScanDescData;
-extern double IndexBuildHeapScan(Relation heapRelation,
-				   Relation indexRelation,
-				   IndexInfo *indexInfo,
-				   bool allow_sync,
-				   IndexBuildCallback callback,
-				   void *callback_state,
-				   struct TableScanDescData *scan);
-extern double IndexBuildHeapRangeScan(Relation heapRelation,
-						Relation indexRelation,
-						IndexInfo *indexInfo,
-						bool allow_sync,
-						bool anyvisible,
-						BlockNumber start_blockno,
-						BlockNumber end_blockno,
-						IndexBuildCallback callback,
-						void *callback_state,
-						struct TableScanDescData *scan);
-
 extern void validate_index(Oid heapId, Oid indexId, Snapshot snapshot);
 
 extern void index_set_state_flags(Oid indexId, IndexStateFlagsAction action);
@@ -155,4 +145,45 @@ extern void RestoreReindexState(void *reindexstate);
 
 extern void IndexSetParentIndex(Relation idx, Oid parentOid);
 
+
+/*
+ * itemptr_encode - Encode ItemPointer as int64/int8
+ *
+ * This representation must produce values encoded as int64 that sort in the
+ * same order as their corresponding original TID values would (using the
+ * default int8 opclass to produce a result equivalent to the default TID
+ * opclass).
+ *
+ * As noted in validate_index(), sorting the encoded int64 values can be
+ * significantly faster than sorting TIDs directly, since TID is a
+ * pass-by-reference type.
+ */
+static inline int64
+itemptr_encode(ItemPointer itemptr)
+{
+	BlockNumber block = ItemPointerGetBlockNumber(itemptr);
+	OffsetNumber offset = ItemPointerGetOffsetNumber(itemptr);
+	int64		encoded;
+
+	/*
+	 * Use the 16 least significant bits for the offset.  32 adjacent bits are
+	 * used for the block number.  Since the remaining bits are unused, the
+	 * encoded values cannot be negative (we assume a two's complement
+	 * representation).
+	 */
+	encoded = ((uint64) block << 16) | (uint16) offset;
+
+	return encoded;
+}
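+
+/*
+ * For example, the TID (block 1, offset 3) encodes to
+ * ((uint64) 1 << 16) | 3 == 65539; comparing encoded values with the int8
+ * comparator thus yields the same ordering as ItemPointerCompare().
+ */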
+
+/*
+ * itemptr_decode - Decode int64/int8 representation back to ItemPointer
+ */
+static inline void
+itemptr_decode(ItemPointer itemptr, int64 encoded)
+{
+	BlockNumber block = (BlockNumber) (encoded >> 16);
+	OffsetNumber offset = (OffsetNumber) (encoded & 0xFFFF);
+
+	ItemPointerSet(itemptr, block, offset);
+}
+
 #endif							/* INDEX_H */
-- 
2.21.0.dirty

