From 5db15d85ce237c5b288c093bcdad53082a78fab3 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sun, 20 Jan 2019 00:02:52 -0800
Subject: [PATCH v18 12/18] tableam: VACUUM and ANALYZE.

Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/access/heap/heapam_handler.c | 160 ++++++++++++++++++
 src/backend/commands/analyze.c           | 205 +++++------------------
 src/backend/commands/vacuum.c            |   2 +-
 src/include/access/tableam.h             |  25 +++
 4 files changed, 229 insertions(+), 163 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 9e67d48b6ea..b35ed21581e 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -678,6 +678,163 @@ heapam_relation_copy_data(Relation rel, RelFileNode newrnode)
 	smgrclose(dstrel);
 }
 
+static void
+heapam_scan_analyze_next_block(TableScanDesc sscan, BlockNumber blockno, BufferAccessStrategy bstrategy)
+{
+	HeapScanDesc scan = (HeapScanDesc) sscan;
+
+	/*
+	 * Position the scan on blockno for heapam_scan_analyze_next_tuple().  We
+	 * must maintain a pin on the target page's buffer to ensure that the
+	 * maxoffset value stays good (else concurrent VACUUM might delete tuples
+	 * out from under us).  We also choose to hold sharelock on the buffer
+	 * throughout --- re-locking for each tuple would be extra lock traffic
+	 * for very little per-tuple work.  Both pin and lock are still held on
+	 * return; heapam_scan_analyze_next_tuple() drops them at end of page.
+	 */
+	scan->rs_cblock = blockno;
+	scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, blockno,
+									   RBM_NORMAL, bstrategy);
+	scan->rs_cindex = FirstOffsetNumber;
+	LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
+}
+
+/*
+ * Return the next tuple worth sampling from the block selected by
+ * heapam_scan_analyze_next_block(), stored in slot; return false once the
+ * block is exhausted (releasing its lock and pin).  *liverows and *deadrows
+ * are bumped for every tuple classified along the way.
+ */
+static bool
+heapam_scan_analyze_next_tuple(TableScanDesc sscan, TransactionId OldestXmin, double *liverows, double *deadrows, TupleTableSlot *slot)
+{
+	HeapScanDesc scan = (HeapScanDesc) sscan;
+	BufferHeapTupleTableSlot *hslot = (BufferHeapTupleTableSlot *) slot;
+	Page		targpage = BufferGetPage(scan->rs_cbuf);
+	OffsetNumber maxoffset = PageGetMaxOffsetNumber(targpage);
+
+	Assert(TTS_IS_BUFFERTUPLE(slot));
+
+	/* Inner loop over all tuples on the selected page */
+	for (; scan->rs_cindex <= maxoffset; scan->rs_cindex++)
+	{
+		ItemId		itemid = PageGetItemId(targpage, scan->rs_cindex);
+		HeapTuple	targtuple = &hslot->base.tupdata;
+		bool		sample_it = false;
+
+		/*
+		 * We ignore unused and redirect line pointers.  DEAD line pointers
+		 * should be counted as dead, because we need vacuum to run to get rid
+		 * of them.  Note that this rule agrees with the way that
+		 * heap_page_prune() counts things.
+		 */
+		if (!ItemIdIsNormal(itemid))
+		{
+			if (ItemIdIsDead(itemid))
+				*deadrows += 1;
+			continue;
+		}
+
+		ItemPointerSet(&targtuple->t_self, scan->rs_cblock, scan->rs_cindex);
+
+		targtuple->t_tableOid = RelationGetRelid(scan->rs_base.rs_rd);
+		targtuple->t_data = (HeapTupleHeader) PageGetItem(targpage, itemid);
+		targtuple->t_len = ItemIdGetLength(itemid);
+
+		switch (HeapTupleSatisfiesVacuum(targtuple, OldestXmin, scan->rs_cbuf))
+		{
+			case HEAPTUPLE_LIVE:
+				sample_it = true;
+				*liverows += 1;
+				break;
+
+			case HEAPTUPLE_DEAD:
+			case HEAPTUPLE_RECENTLY_DEAD:
+				/* Count dead and recently-dead rows */
+				*deadrows += 1;
+				break;
+
+			case HEAPTUPLE_INSERT_IN_PROGRESS:
+
+				/*
+				 * Insert-in-progress rows are not counted.  We assume that
+				 * when the inserting transaction commits or aborts, it will
+				 * send a stats message to increment the proper count.  This
+				 * works right only if that transaction ends after we finish
+				 * analyzing the table; if things happen in the other order,
+				 * its stats update will be overwritten by ours.  However, the
+				 * error will be large only if the other transaction runs long
+				 * enough to insert many tuples, so assuming it will finish
+				 * after us is the safer option.
+				 *
+				 * A special case is that the inserting transaction might be
+				 * our own.  In this case we should count and sample the row,
+				 * to accommodate users who load a table and analyze it in one
+				 * transaction.  (pgstat_report_analyze has to adjust the
+				 * numbers we send to the stats collector to make this come
+				 * out right.)
+				 */
+				if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(targtuple->t_data)))
+				{
+					sample_it = true;
+					*liverows += 1;
+				}
+				break;
+
+			case HEAPTUPLE_DELETE_IN_PROGRESS:
+
+				/*
+				 * We count and sample delete-in-progress rows the same as
+				 * live ones, so that the stats counters come out right if the
+				 * deleting transaction commits after us, per the same
+				 * reasoning given above.
+				 *
+				 * If the delete was done by our own transaction, however, we
+				 * must count the row as dead to make pgstat_report_analyze's
+				 * stats adjustments come out right.  (Note: this works out
+				 * properly when the row was both inserted and deleted in our
+				 * xact.)
+				 *
+				 * The net effect of these choices is that we act as though an
+				 * IN_PROGRESS transaction hasn't happened yet, except if it
+				 * is our own transaction, which we assume has happened.
+				 *
+				 * This approach ensures that we behave sanely if we see both
+				 * the pre-image and post-image rows for a row being updated
+				 * by a concurrent transaction: we will sample the pre-image
+				 * but not the post-image.  We also get sane results if the
+				 * concurrent transaction never commits.
+				 */
+				if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(targtuple->t_data)))
+					*deadrows += 1;
+				else
+				{
+					sample_it = true;
+					*liverows += 1;
+				}
+				break;
+
+			default:
+				elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
+				break;
+		}
+
+		if (sample_it)
+		{
+			ExecStoreBufferHeapTuple(targtuple, slot, scan->rs_cbuf);
+			scan->rs_cindex++;
+
+			/* note that we leave the buffer locked here! */
+			return true;
+		}
+	}
+
+	/* Now release the lock and pin on the page */
+	UnlockReleaseBuffer(scan->rs_cbuf);
+	scan->rs_cbuf = InvalidBuffer;
+
+	return false;
+}
 
 static void
 heapam_copy_for_cluster(Relation OldHeap, Relation NewHeap, Relation OldIndex,
@@ -1721,6 +1878,9 @@ static const TableAmRoutine heapam_methods = {
 	.relation_set_new_filenode = heapam_set_new_filenode,
 	.relation_nontransactional_truncate = heapam_relation_nontransactional_truncate,
 	.relation_copy_data = heapam_relation_copy_data,
+	.relation_vacuum = heap_vacuum_rel,
+	.scan_analyze_next_block = heapam_scan_analyze_next_block,
+	.scan_analyze_next_tuple = heapam_scan_analyze_next_tuple,
 	.relation_copy_for_cluster = heapam_copy_for_cluster,
 	.index_build_range_scan = heapam_index_build_range_scan,
 	.index_validate_scan = heapam_index_validate_scan,
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index c8192353ebe..996dc500a8f 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -17,11 +17,11 @@
 #include <math.h>
 
 #include "access/genam.h"
-#include "access/heapam.h"
 #include "access/multixact.h"
 #include "access/relation.h"
 #include "access/sysattr.h"
 #include "access/table.h"
+#include "access/tableam.h"
 #include "access/transam.h"
 #include "access/tupconvert.h"
 #include "access/tuptoaster.h"
@@ -1014,6 +1014,8 @@ acquire_sample_rows(Relation onerel, int elevel,
 	TransactionId OldestXmin;
 	BlockSamplerData bs;
 	ReservoirStateData rstate;
+	TupleTableSlot *slot;
+	TableScanDesc scan;
 
 	Assert(targrows > 0);
 
@@ -1027,193 +1029,72 @@ acquire_sample_rows(Relation onerel, int elevel,
 	/* Prepare for sampling rows */
 	reservoir_init_selection_state(&rstate, targrows);
 
+	scan = table_beginscan_analyze(onerel);
+	slot = table_slot_create(onerel, NULL);
+
 	/* Outer loop over blocks to sample */
 	while (BlockSampler_HasMore(&bs))
 	{
 		BlockNumber targblock = BlockSampler_Next(&bs);
-		Buffer		targbuffer;
-		Page		targpage;
-		OffsetNumber targoffset,
-					maxoffset;
 
 		vacuum_delay_point();
 
 		/*
-		 * We must maintain a pin on the target page's buffer to ensure that
-		 * the maxoffset value stays good (else concurrent VACUUM might delete
-		 * tuples out from under us).  Hence, pin the page until we are done
-		 * looking at it.  We also choose to hold sharelock on the buffer
-		 * throughout --- we could release and re-acquire sharelock for each
-		 * tuple, but since we aren't doing much work per tuple, the extra
-		 * lock traffic is probably better avoided.
+		 * XXX: we could have this function return a boolean, instead of
+		 * forcing such checks to happen in next_tuple().
 		 */
-		targbuffer = ReadBufferExtended(onerel, MAIN_FORKNUM, targblock,
-										RBM_NORMAL, vac_strategy);
-		LockBuffer(targbuffer, BUFFER_LOCK_SHARE);
-		targpage = BufferGetPage(targbuffer);
-		maxoffset = PageGetMaxOffsetNumber(targpage);
+		table_scan_analyze_next_block(scan, targblock, vac_strategy);
 
-		/* Inner loop over all tuples on the selected page */
-		for (targoffset = FirstOffsetNumber; targoffset <= maxoffset; targoffset++)
+		while (table_scan_analyze_next_tuple(scan, OldestXmin, &liverows, &deadrows, slot))
 		{
-			ItemId		itemid;
-			HeapTupleData targtuple;
-			bool		sample_it = false;
-
-			itemid = PageGetItemId(targpage, targoffset);
-
 			/*
-			 * We ignore unused and redirect line pointers.  DEAD line
-			 * pointers should be counted as dead, because we need vacuum to
-			 * run to get rid of them.  Note that this rule agrees with the
-			 * way that heap_page_prune() counts things.
+			 * The first targrows sample rows are simply copied into the
+			 * reservoir. Then we start replacing tuples in the sample
+			 * until we reach the end of the relation.  This algorithm is
+			 * from Jeff Vitter's paper (see full citation below). It
+			 * works by repeatedly computing the number of tuples to skip
+			 * before selecting a tuple, which replaces a randomly chosen
+			 * element of the reservoir (current set of tuples).  At all
+			 * times the reservoir is a true random sample of the tuples
+			 * we've passed over so far, so when we fall off the end of
+			 * the relation we're done.
 			 */
-			if (!ItemIdIsNormal(itemid))
-			{
-				if (ItemIdIsDead(itemid))
-					deadrows += 1;
-				continue;
-			}
-
-			ItemPointerSet(&targtuple.t_self, targblock, targoffset);
-
-			targtuple.t_tableOid = RelationGetRelid(onerel);
-			targtuple.t_data = (HeapTupleHeader) PageGetItem(targpage, itemid);
-			targtuple.t_len = ItemIdGetLength(itemid);
-
-			switch (HeapTupleSatisfiesVacuum(&targtuple,
-											 OldestXmin,
-											 targbuffer))
-			{
-				case HEAPTUPLE_LIVE:
-					sample_it = true;
-					liverows += 1;
-					break;
-
-				case HEAPTUPLE_DEAD:
-				case HEAPTUPLE_RECENTLY_DEAD:
-					/* Count dead and recently-dead rows */
-					deadrows += 1;
-					break;
-
-				case HEAPTUPLE_INSERT_IN_PROGRESS:
-
-					/*
-					 * Insert-in-progress rows are not counted.  We assume
-					 * that when the inserting transaction commits or aborts,
-					 * it will send a stats message to increment the proper
-					 * count.  This works right only if that transaction ends
-					 * after we finish analyzing the table; if things happen
-					 * in the other order, its stats update will be
-					 * overwritten by ours.  However, the error will be large
-					 * only if the other transaction runs long enough to
-					 * insert many tuples, so assuming it will finish after us
-					 * is the safer option.
-					 *
-					 * A special case is that the inserting transaction might
-					 * be our own.  In this case we should count and sample
-					 * the row, to accommodate users who load a table and
-					 * analyze it in one transaction.  (pgstat_report_analyze
-					 * has to adjust the numbers we send to the stats
-					 * collector to make this come out right.)
-					 */
-					if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(targtuple.t_data)))
-					{
-						sample_it = true;
-						liverows += 1;
-					}
-					break;
-
-				case HEAPTUPLE_DELETE_IN_PROGRESS:
-
-					/*
-					 * We count and sample delete-in-progress rows the same as
-					 * live ones, so that the stats counters come out right if
-					 * the deleting transaction commits after us, per the same
-					 * reasoning given above.
-					 *
-					 * If the delete was done by our own transaction, however,
-					 * we must count the row as dead to make
-					 * pgstat_report_analyze's stats adjustments come out
-					 * right.  (Note: this works out properly when the row was
-					 * both inserted and deleted in our xact.)
-					 *
-					 * The net effect of these choices is that we act as
-					 * though an IN_PROGRESS transaction hasn't happened yet,
-					 * except if it is our own transaction, which we assume
-					 * has happened.
-					 *
-					 * This approach ensures that we behave sanely if we see
-					 * both the pre-image and post-image rows for a row being
-					 * updated by a concurrent transaction: we will sample the
-					 * pre-image but not the post-image.  We also get sane
-					 * results if the concurrent transaction never commits.
-					 */
-					if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(targtuple.t_data)))
-						deadrows += 1;
-					else
-					{
-						sample_it = true;
-						liverows += 1;
-					}
-					break;
-
-				default:
-					elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
-					break;
-			}
-
-			if (sample_it)
+			if (numrows < targrows)
+				rows[numrows++] = ExecCopySlotHeapTuple(slot);
+			else
 			{
 				/*
-				 * The first targrows sample rows are simply copied into the
-				 * reservoir. Then we start replacing tuples in the sample
-				 * until we reach the end of the relation.  This algorithm is
-				 * from Jeff Vitter's paper (see full citation below). It
-				 * works by repeatedly computing the number of tuples to skip
-				 * before selecting a tuple, which replaces a randomly chosen
-				 * element of the reservoir (current set of tuples).  At all
-				 * times the reservoir is a true random sample of the tuples
-				 * we've passed over so far, so when we fall off the end of
-				 * the relation we're done.
+				 * t in Vitter's paper is the number of records already
+				 * processed.  If we need to compute a new S value, we
+				 * must use the not-yet-incremented value of samplerows as
+				 * t.
 				 */
-				if (numrows < targrows)
-					rows[numrows++] = heap_copytuple(&targtuple);
-				else
+				if (rowstoskip < 0)
+					rowstoskip = reservoir_get_next_S(&rstate, samplerows, targrows);
+
+				if (rowstoskip <= 0)
 				{
 					/*
-					 * t in Vitter's paper is the number of records already
-					 * processed.  If we need to compute a new S value, we
-					 * must use the not-yet-incremented value of samplerows as
-					 * t.
+					 * Found a suitable tuple, so save it, replacing one
+					 * old tuple at random
 					 */
-					if (rowstoskip < 0)
-						rowstoskip = reservoir_get_next_S(&rstate, samplerows, targrows);
+					int			k = (int) (targrows * sampler_random_fract(rstate.randstate));
 
-					if (rowstoskip <= 0)
-					{
-						/*
-						 * Found a suitable tuple, so save it, replacing one
-						 * old tuple at random
-						 */
-						int			k = (int) (targrows * sampler_random_fract(rstate.randstate));
-
-						Assert(k >= 0 && k < targrows);
-						heap_freetuple(rows[k]);
-						rows[k] = heap_copytuple(&targtuple);
-					}
-
-					rowstoskip -= 1;
+					Assert(k >= 0 && k < targrows);
+					heap_freetuple(rows[k]);
+					rows[k] = ExecCopySlotHeapTuple(slot);
 				}
 
-				samplerows += 1;
+				rowstoskip -= 1;
 			}
-		}
 
-		/* Now release the lock and pin on the page */
-		UnlockReleaseBuffer(targbuffer);
+			samplerows += 1;
+		}
 	}
 
+	ExecDropSingleTupleTableSlot(slot);
+	table_endscan(scan);
+
 	/*
 	 * If we didn't find as many tuples as we wanted then we're done. No sort
 	 * is needed, since they're already in order.
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 3763a8c39e0..61d6d62e6d9 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -1711,7 +1711,7 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 		cluster_rel(relid, InvalidOid, cluster_options);
 	}
 	else
-		heap_vacuum_rel(onerel, options, params, vac_strategy);
+		table_vacuum_rel(onerel, options, params, vac_strategy);
 
 	/* Roll back any GUC changes executed by index functions */
 	AtEOXact_GUC(false, save_nestlevel);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 125ed1c012a..8df3abd90a2 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -28,6 +28,7 @@ extern char *default_table_access_method;
 extern bool synchronize_seqscans;
 
 
+struct VacuumParams;
 struct ValidateIndexState;
 struct BulkInsertStateData;
 
@@ -308,6 +309,12 @@ typedef struct TableAmRoutine
 											  MultiXactId *minmulti);
 	void		(*relation_nontransactional_truncate) (Relation rel);
 	void		(*relation_copy_data) (Relation rel, RelFileNode newrnode);
+	void		(*relation_vacuum) (Relation onerel, int options,
+									struct VacuumParams *params, BufferAccessStrategy bstrategy);
+	void		(*scan_analyze_next_block) (TableScanDesc scan, BlockNumber blockno,
+											BufferAccessStrategy bstrategy);
+	bool		(*scan_analyze_next_tuple) (TableScanDesc scan, TransactionId OldestXmin,
+											double *liverows, double *deadrows, TupleTableSlot *slot);
 	void		(*relation_copy_for_cluster) (Relation NewHeap, Relation OldHeap, Relation OldIndex,
 											  bool use_sort,
 											  TransactionId OldestXmin, TransactionId FreezeXid, MultiXactId MultiXactCutoff,
@@ -765,6 +772,24 @@ table_relation_copy_data(Relation rel, RelFileNode newrnode)
 	rel->rd_tableam->relation_copy_data(rel, newrnode);
 }
 
+static inline void
+table_vacuum_rel(Relation rel, int options,
+				 struct VacuumParams *params, BufferAccessStrategy bstrategy)
+{
+	rel->rd_tableam->relation_vacuum(rel, options, params, bstrategy);
+}
+
+static inline void
+table_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno, BufferAccessStrategy bstrategy)
+{
+	scan->rs_rd->rd_tableam->scan_analyze_next_block(scan, blockno, bstrategy);
+}
+
+static inline bool
+table_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin, double *liverows, double *deadrows, TupleTableSlot *slot)
+{
+	return scan->rs_rd->rd_tableam->scan_analyze_next_tuple(scan, OldestXmin, liverows, deadrows, slot);
+}
 
 /* XXX: Move arguments to struct? */
 static inline void
-- 
2.21.0.dirty

