From b33cd81ba0e92e02077d133009ac2ac9addc0983 Mon Sep 17 00:00:00 2001
From: "Andrey V. Lepikhov" <a.lepikhov@postgrespro.ru>
Date: Wed, 27 Jun 2018 10:14:19 +0500
Subject: [PATCH 2/2] Quick Vacuum Strategy patch v.3

---
 src/backend/access/heap/heapam.c    |  31 +++++++
 src/backend/access/heap/pruneheap.c |  40 ++++++++-
 src/backend/commands/vacuumlazy.c   | 162 +++++++++++++++++++++++++++++++++++-
 src/backend/utils/misc/guc.c        |  10 ++-
 src/include/access/heapam.h         |   1 +
 5 files changed, 235 insertions(+), 9 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 72395a5..bf964f8 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -9393,6 +9393,36 @@ heap_sync(Relation rel)
 }
 
 /*
+ * Mask DEAD tuples on a heap page.
+ *
+ * We introduce this function to satisfy wal_consistency_checking on a
+ * Hot Standby node.
+ *
+ * A DEAD tuple on the master node still has t_cid and t_infomask set, which
+ * cannot be reconstructed by applying WAL records on a Hot Standby node.
+ */
+static void
+mask_dead_tuples(Page page)
+{
+	OffsetNumber	offnum;
+
+	for (offnum = FirstOffsetNumber; offnum <= PageGetMaxOffsetNumber(page); offnum = OffsetNumberNext(offnum))
+	{
+		ItemId	lp = PageGetItemId(page, offnum);
+		char*	htup_begin;
+
+		if (!ItemIdIsDead(lp))
+			continue;
+
+		if (!ItemIdHasStorage(lp))
+			continue;
+
+		htup_begin = (char *) PageGetItem(page, lp);
+		memset(htup_begin, MASK_MARKER, ItemIdGetLength(lp));
+	}
+}
+
+/*
  * Mask a heap page before performing consistency checks on it.
  */
 void
@@ -9405,6 +9435,7 @@ heap_mask(char *pagedata, BlockNumber blkno)
 
 	mask_page_hint_bits(page);
 	mask_unused_space(page);
+	mask_dead_tuples(page);
 
 	for (off = 1; off <= PageGetMaxOffsetNumber(page); off++)
 	{
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index c2f5343..ebbafe3 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -43,6 +43,9 @@ typedef struct
 	bool		marked[MaxHeapTuplesPerPage + 1];
 } PruneState;
 
+/* Parameter for target deletion strategy in lazy vacuum */
+double target_index_deletion_factor = 0.01;
+
 /* Local functions */
 static int heap_prune_chain(Relation relation, Buffer buffer,
 				 OffsetNumber rootoffnum,
@@ -405,7 +408,7 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 			if (HeapTupleSatisfiesVacuum(&tup, OldestXmin, buffer)
 				== HEAPTUPLE_DEAD && !HeapTupleHeaderIsHotUpdated(htup))
 			{
-				heap_prune_record_unused(prstate, rootoffnum);
+				heap_prune_record_dead(prstate, rootoffnum);
 				HeapTupleHeaderAdvanceLatestRemovedXid(htup,
 													   &prstate->latestRemovedXid);
 				ndeleted++;
@@ -580,8 +583,10 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 		 */
 		for (i = 1; (i < nchain) && (chainitems[i - 1] != latestdead); i++)
 		{
-			heap_prune_record_unused(prstate, chainitems[i]);
 			ndeleted++;
+			if (chainitems[i] == latestdead)
+				continue;
+			heap_prune_record_unused(prstate, chainitems[i]);
 		}
 
 		/*
@@ -598,9 +603,28 @@ heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
 		 * redirect the root to the correct chain member.
 		 */
 		if (i >= nchain)
+		{
+			if (rootoffnum != latestdead)
+			{
+				if (ItemIdIsNormal(rootlp))
+					heap_prune_record_unused(prstate, latestdead);
+				else
+				{
+					/*
+					 * We allow a redirected item to also be marked dead.
+					 */
+					heap_prune_record_redirect(prstate, rootoffnum, latestdead);
+					heap_prune_record_dead(prstate, latestdead);
+				}
+			}
 			heap_prune_record_dead(prstate, rootoffnum);
+		}
 		else
+		{
+			if (rootoffnum != latestdead)
+				heap_prune_record_unused(prstate, latestdead);
 			heap_prune_record_redirect(prstate, rootoffnum, chainitems[i]);
+		}
 	}
 	else if (nchain < 2 && ItemIdIsRedirected(rootlp))
 	{
@@ -653,7 +677,12 @@ heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum)
 	Assert(prstate->ndead < MaxHeapTuplesPerPage);
 	prstate->nowdead[prstate->ndead] = offnum;
 	prstate->ndead++;
-	Assert(!prstate->marked[offnum]);
+
+	/*
+	 * We deliberately skip the Assert on prstate->marked[offnum]. Not the
+	 * best idea, but it is the simplest way to enable Dead Redirecting by
+	 * letting the Dead and Redirected states overlap.
+	 */
 	prstate->marked[offnum] = true;
 }
 
@@ -706,7 +735,10 @@ heap_page_prune_execute(Buffer buffer,
 		OffsetNumber off = *offnum++;
 		ItemId		lp = PageGetItemId(page, off);
 
-		ItemIdSetDead(lp);
+		if (target_index_deletion_factor > 0)
+			ItemIdMarkDead(lp);
+		else
+			ItemIdSetDead(lp);
 	}
 
 	/* Update all now-unused line pointers */
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 5649a70..15f7e32 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -139,7 +139,6 @@ typedef struct LVRelStats
 	bool		lock_waiter_detected;
 } LVRelStats;
 
-
 /* A few variables that don't seem worth passing around as parameters */
 static int	elevel = -1;
 
@@ -156,6 +155,8 @@ static void lazy_scan_heap(Relation onerel, int options,
 			   bool aggressive);
 static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
 static bool lazy_check_needs_freeze(Buffer buf, bool *hastup);
+static void quick_vacuum_index(Relation irel, Relation hrel,
+					LVRelStats *vacrelstats);
 static void lazy_vacuum_index(Relation indrel,
 				  IndexBulkDeleteResult **stats,
 				  LVRelStats *vacrelstats);
@@ -734,10 +735,17 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 
 			/* Remove index entries */
 			for (i = 0; i < nindexes; i++)
-				lazy_vacuum_index(Irel[i],
+			{
+				bool use_quick_strategy = (vacrelstats->num_dead_tuples/vacrelstats->old_live_tuples < target_index_deletion_factor);
+
+				if (use_quick_strategy)
+					quick_vacuum_index(Irel[i], onerel, vacrelstats);
+				else
+					lazy_vacuum_index(Irel[i],
 								  &indstats[i],
 								  vacrelstats);
 
+			}
 			/*
 			 * Report that we are now vacuuming the heap.  We also increase
 			 * the number of index scans here; note that by using
@@ -1379,10 +1387,16 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 
 		/* Remove index entries */
 		for (i = 0; i < nindexes; i++)
-			lazy_vacuum_index(Irel[i],
+		{
+			bool use_quick_strategy = (vacrelstats->num_dead_tuples/vacrelstats->old_live_tuples < target_index_deletion_factor);
+
+			if (use_quick_strategy)
+				quick_vacuum_index(Irel[i], onerel, vacrelstats);
+			else
+				lazy_vacuum_index(Irel[i],
 							  &indstats[i],
 							  vacrelstats);
-
+		}
 		/* Report that we are now vacuuming the heap */
 		hvp_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_HEAP;
 		hvp_val[1] = vacrelstats->num_index_scans + 1;
@@ -1671,6 +1685,152 @@ lazy_check_needs_freeze(Buffer buf, bool *hastup)
 	return false;
 }
 
+/*
+ * Fetch a tuple from the heap for building a scan key.
+ */
+static HeapTuple
+get_tuple_by_tid(Relation rel, ItemPointer tid)
+{
+	Buffer			buffer;
+	Page			page;
+	OffsetNumber	offnum;
+	ItemId			lp;
+	HeapTuple		tuple;
+
+	buffer = ReadBufferExtended(rel, MAIN_FORKNUM, ItemPointerGetBlockNumber(tid), RBM_NORMAL, NULL);
+	LockBuffer(buffer, BUFFER_LOCK_SHARE);
+
+	page = (Page) BufferGetPage(buffer);
+	offnum = ItemPointerGetOffsetNumber(tid);
+	lp = PageGetItemId(page, offnum);
+
+	/*
+	 * VACUUM race: someone already removed the tuple from the heap. Ignore it.
+	 */
+	if (!ItemIdIsUsed(lp))
+	{
+		UnlockReleaseBuffer(buffer);
+		return NULL;
+	}
+	/* Walk along the chain */
+	while (!ItemIdHasStorage(lp))
+	{
+		offnum = ItemIdGetRedirect(lp);
+		lp = PageGetItemId(page, offnum);
+		Assert(ItemIdIsUsed(lp));
+	}
+
+	/* Form a tuple */
+	tuple = palloc(sizeof(HeapTupleData));
+	ItemPointerSet(&(tuple->t_self), BufferGetBlockNumber(buffer), offnum);
+	tuple->t_tableOid = RelationGetRelid(rel);
+	tuple->t_data = (HeapTupleHeader) PageGetItem(page, lp);
+	tuple->t_len = ItemIdGetLength(lp);
+	UnlockReleaseBuffer(buffer);
+	return tuple;
+}
+
+#include "access/nbtree.h"
+#include "catalog/index.h"
+#include "executor/executor.h"
+
+static int tid_comparator(const void* a, const void* b)
+{
+	return ItemPointerCompare((ItemPointer)a, (ItemPointer)b);
+}
+
+/*
+ *	quick_vacuum_index() -- quick vacuum one index relation.
+ *
+ *		Delete all the index entries pointing to tuples listed in
+ *		vacrelstats->dead_tuples.
+ */
+static void
+quick_vacuum_index(Relation irel, Relation hrel,
+				   LVRelStats *vacrelstats)
+{
+	IndexTargetDeleteResult	stats;
+	IndexTargetDeleteInfo	ivinfo;
+
+	if (irel->rd_amroutine->amtargetdelete != NULL)
+	{
+		int				tnum;
+		bool*			found = palloc0(vacrelstats->num_dead_tuples*sizeof(bool));
+		IndexInfo* 		indexInfo = BuildIndexInfo(irel);
+		EState*			estate = CreateExecutorState();
+		ExprContext*	econtext = GetPerTupleExprContext(estate);
+		ExprState*		predicate = ExecPrepareQual(indexInfo->ii_Predicate, estate);
+
+		ivinfo.indexRelation = irel;
+		ivinfo.heapRelation = hrel;
+		qsort((void *)vacrelstats->dead_tuples, vacrelstats->num_dead_tuples, sizeof(ItemPointerData), tid_comparator);
+		ivinfo.isSorted = true;
+
+		/* Get tuple from heap */
+		for (tnum = 0; tnum < vacrelstats->num_dead_tuples; tnum++)
+		{
+			HeapTuple		tuple;
+			TupleTableSlot*	slot;
+			Datum			values[INDEX_MAX_KEYS];
+			bool			isnull[INDEX_MAX_KEYS];
+
+			/* The index entry for this TID was already deleted earlier */
+			if (found[tnum])
+				continue;
+
+			/* Get a tuple from heap */
+			if ((tuple = get_tuple_by_tid(hrel, &(vacrelstats->dead_tuples[tnum]))) == NULL)
+			{
+				/*
+				 * The tuple has 'not used' status; nothing to delete.
+				 */
+				found[tnum] = true;
+				continue;
+			}
+
+			/*
+			 * Form the values[] and isnull[] arrays for the index tuple
+			 * from the heap tuple.
+			 */
+			slot = MakeSingleTupleTableSlot(RelationGetDescr(hrel));
+			econtext->ecxt_scantuple = slot;
+
+			ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+
+			/*
+			 * In a partial index, ignore tuples that don't satisfy the
+			 * predicate.
+			 */
+			if ((predicate != NULL) && (!ExecQual(predicate, econtext)))
+			{
+				found[tnum] = true;
+				continue;
+			}
+
+			FormIndexDatum(indexInfo, slot, estate, values, isnull);
+
+			ExecDropSingleTupleTableSlot(slot);
+
+			/*
+			 * Attempt to delete several index entries in a single tree
+			 * descent, using only the part of the TID list not yet found.
+			 */
+			ivinfo.dead_tuples = &(vacrelstats->dead_tuples[tnum]);
+			ivinfo.num_dead_tuples = vacrelstats->num_dead_tuples-tnum;
+			ivinfo.found_dead_tuples = found+tnum;
+			index_target_delete(&ivinfo, &stats, values, isnull);
+		}
+
+		pfree(found);
+		FreeExecutorState(estate);
+	}
+	else
+	{
+		IndexBulkDeleteResult **stats;
+
+		lazy_vacuum_index(irel, stats, vacrelstats);
+	}
+}
 
 /*
  *	lazy_vacuum_index() -- vacuum one index relation.
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index fa3c8a7..3f6c83b 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -3236,7 +3236,15 @@ static struct config_real ConfigureNamesReal[] =
 		0.1, 0.0, 100.0,
 		NULL, NULL, NULL
 	},
-
+	{
+		{"target_index_deletion_factor", PGC_SIGHUP, AUTOVACUUM,
+			gettext_noop("Maximum number of vacuumed tuples as a fraction of reltuples where we can use target index vacuum strategy."),
+			NULL
+		},
+		&target_index_deletion_factor,
+		0.01, 0.0, 1.0,
+		NULL, NULL, NULL
+	},
 	{
 		{"checkpoint_completion_target", PGC_SIGHUP, WAL_CHECKPOINTS,
 			gettext_noop("Time spent flushing dirty buffers during checkpoint, as fraction of checkpoint interval."),
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index ca5cad7..c5a7bc8 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -100,6 +100,7 @@ extern Relation heap_openrv_extended(const RangeVar *relation,
 typedef struct HeapScanDescData *HeapScanDesc;
 typedef struct ParallelHeapScanDescData *ParallelHeapScanDesc;
 
+extern double target_index_deletion_factor;
 /*
  * HeapScanIsValid
  *		True iff the heap scan is valid.
-- 
2.7.4

