diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index c8f7e781c6..8e7c7fe6ae 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -17,6 +17,7 @@
 #include "access/genam.h"
 #include "access/gist_private.h"
 #include "access/relscan.h"
+#include "access/heapam_xlog.h"
 #include "lib/pairingheap.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -67,6 +68,7 @@ gistkillitems(IndexScanDesc scan)
 	{
 		UnlockReleaseBuffer(buffer);
 		so->numKilled = 0;		/* reset counter */
+		so->killedLatestRemovedXid = InvalidTransactionId;
 		return;
 	}
 
@@ -87,7 +89,9 @@ gistkillitems(IndexScanDesc scan)
 	if (killedsomething)
 	{
 		GistMarkPageHasGarbage(page);
-		MarkBufferDirtyHint(buffer, true);
+		MarkBufferDirtyIndexHint(buffer, true,
+								 scan->indexRelation,
+								 so->killedLatestRemovedXid);
 	}
 
 	UnlockReleaseBuffer(buffer);
@@ -97,6 +101,7 @@ gistkillitems(IndexScanDesc scan)
 	 * pages.
 	 */
 	so->numKilled = 0;
+	so->killedLatestRemovedXid = InvalidTransactionId;
 }
 
 /*
@@ -666,8 +671,12 @@ gistgettuple(IndexScanDesc scan, ScanDirection dir)
 						MemoryContextSwitchTo(oldCxt);
 					}
 					if (so->numKilled < MaxIndexTuplesPerPage)
+					{
 						so->killedItems[so->numKilled++] =
 							so->pageData[so->curPageData - 1].offnum;
+						IndexHintBitAdvanceLatestRemovedXid(scan->prior_tuple_removed_xid,
+															&so->killedLatestRemovedXid);
+					}
 				}
 				/* continuing to return tuples from a leaf page */
 				scan->xs_heaptid = so->pageData[so->curPageData].heapPtr;
@@ -703,8 +712,12 @@ gistgettuple(IndexScanDesc scan, ScanDirection dir)
 					MemoryContextSwitchTo(oldCxt);
 				}
 				if (so->numKilled < MaxIndexTuplesPerPage)
+				{
 					so->killedItems[so->numKilled++] =
 						so->pageData[so->curPageData - 1].offnum;
+					IndexHintBitAdvanceLatestRemovedXid(scan->prior_tuple_removed_xid,
+														&so->killedLatestRemovedXid);
+				}
 			}
 			/* find and process the next index page */
 			do
diff --git a/src/backend/access/gist/gistscan.c b/src/backend/access/gist/gistscan.c
index 61e92cf0f5..b959ac5f17 100644
--- a/src/backend/access/gist/gistscan.c
+++ b/src/backend/access/gist/gistscan.c
@@ -107,6 +107,7 @@ gistbeginscan(Relation r, int nkeys, int norderbys)
 	}
 
 	so->killedItems = NULL;		/* until needed */
+	so->killedLatestRemovedXid = InvalidTransactionId;
 	so->numKilled = 0;
 	so->curBlkno = InvalidBlockNumber;
 	so->curPageLSN = InvalidXLogRecPtr;
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index 0752fb38a9..2d3ae80ca7 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -20,6 +20,7 @@
 
 #include "access/hash.h"
 #include "access/hash_xlog.h"
+#include "access/heapam_xlog.h"
 #include "access/relscan.h"
 #include "access/tableam.h"
 #include "catalog/index.h"
@@ -311,7 +312,11 @@ hashgettuple(IndexScanDesc scan, ScanDirection dir)
 					palloc(MaxIndexTuplesPerPage * sizeof(int));
 
 			if (so->numKilled < MaxIndexTuplesPerPage)
+			{
 				so->killedItems[so->numKilled++] = so->currPos.itemIndex;
+				IndexHintBitAdvanceLatestRemovedXid(scan->prior_tuple_removed_xid,
+													&so->killedLatestRemovedXid);
+			}
 		}
 
 		/*
@@ -379,6 +384,7 @@ hashbeginscan(Relation rel, int nkeys, int norderbys)
 	so->hashso_buc_split = false;
 
 	so->killedItems = NULL;
+	so->killedLatestRemovedXid = InvalidTransactionId;
 	so->numKilled = 0;
 
 	scan->opaque = so;
diff --git a/src/backend/access/hash/hashutil.c b/src/backend/access/hash/hashutil.c
index 519872850e..7f9f13f115 100644
--- a/src/backend/access/hash/hashutil.c
+++ b/src/backend/access/hash/hashutil.c
@@ -545,6 +545,7 @@ _hash_kill_items(IndexScanDesc scan)
 	OffsetNumber offnum,
 				maxoff;
 	int			numKilled = so->numKilled;
+	TransactionId killedLatestRemovedXid = so->killedLatestRemovedXid;
 	int			i;
 	bool		killedsomething = false;
 	bool		havePin = false;
@@ -558,6 +559,7 @@ _hash_kill_items(IndexScanDesc scan)
 	 * pages.
 	 */
 	so->numKilled = 0;
+	so->killedLatestRemovedXid = InvalidTransactionId;
 
 	blkno = so->currPos.currPage;
 	if (HashScanPosIsPinned(so->currPos))
@@ -611,7 +613,9 @@ _hash_kill_items(IndexScanDesc scan)
 	if (killedsomething)
 	{
 		opaque->hasho_flag |= LH_PAGE_HAS_DEAD_TUPLES;
-		MarkBufferDirtyHint(buf, true);
+		MarkBufferDirtyIndexHint(buf, true,
+								 scan->indexRelation,
+								 killedLatestRemovedXid);
 	}
 
 	if (so->hashso_bucket_buf == so->currPos.buf ||
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 9926e2bd54..c4f55b8268 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1527,9 +1527,11 @@ heap_fetch(Relation relation,
  * the tuple here, in addition to updating *tid.  If no match is found, the
  * contents of this buffer on return are undefined.
  *
- * If all_dead is not NULL, we check non-visible tuples to see if they are
- * globally dead; *all_dead is set true if all members of the HOT chain
- * are vacuumable, false if not.
+ * If indexHintBitsData is not NULL, we check non-visible tuples to see if they
+ * are globally dead; *all_dead is set true if all members of the HOT chain
+ * are vacuumable, false if not. Also, *latest_removed_xid is set to the
+ * latest removed xid in a HOT chain, if known. *page_lsn is set to current page
+ * LSN value.
  *
  * Unlike heap_fetch, the caller must already have pin and (at least) share
  * lock on the buffer; it is still pinned/locked at exit.  Also unlike
@@ -1538,7 +1540,7 @@ heap_fetch(Relation relation,
 bool
 heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 					   Snapshot snapshot, HeapTuple heapTuple,
-					   bool *all_dead, bool first_call)
+					   IndexHintBitsData *indexHintBitsData, bool first_call)
 {
 	Page		dp = (Page) BufferGetPage(buffer);
 	TransactionId prev_xmax = InvalidTransactionId;
@@ -1550,8 +1552,12 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 	GlobalVisState *vistest = NULL;
 
 	/* If this is not the first call, previous call returned a (live!) tuple */
-	if (all_dead)
-		*all_dead = first_call;
+	if (indexHintBitsData)
+	{
+		indexHintBitsData->all_dead = first_call;
+		indexHintBitsData->latest_removed_xid = InvalidTransactionId;
+		indexHintBitsData->page_lsn = PageGetLSN(dp);
+	}
 
 	blkno = ItemPointerGetBlockNumber(tid);
 	offnum = ItemPointerGetOffsetNumber(tid);
@@ -1584,6 +1590,13 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 				at_chain_start = false;
 				continue;
 			}
+			/*
+			 * Even if all items are dead we are not sure about latest_removed_xid
+			 * value. In theory, some newer items of the chain could be vacuumed
+			 * while older are not (pure paranoia, probably).
+			 */
+			if (indexHintBitsData)
+				indexHintBitsData->latest_removed_xid = InvalidTransactionId;
 			/* else must be end of chain */
 			break;
 		}
@@ -1633,8 +1646,11 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 				ItemPointerSetOffsetNumber(tid, offnum);
 				PredicateLockTID(relation, &heapTuple->t_self, snapshot,
 								 HeapTupleHeaderGetXmin(heapTuple->t_data));
-				if (all_dead)
-					*all_dead = false;
+				if (indexHintBitsData)
+				{
+					indexHintBitsData->all_dead = false;
+					indexHintBitsData->latest_removed_xid = InvalidTransactionId;
+				}
 				return true;
 			}
 		}
@@ -1648,13 +1664,19 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 		 * Note: if you change the criterion here for what is "dead", fix the
 		 * planner's get_actual_variable_range() function to match.
 		 */
-		if (all_dead && *all_dead)
+		if (indexHintBitsData && indexHintBitsData->all_dead)
 		{
 			if (!vistest)
 				vistest = GlobalVisTestFor(relation);
 
 			if (!HeapTupleIsSurelyDead(heapTuple, vistest))
-				*all_dead = false;
+			{
+				indexHintBitsData->all_dead = false;
+				indexHintBitsData->latest_removed_xid = InvalidTransactionId;
+			}
+			else
+				HeapTupleHeaderAdvanceLatestRemovedXid(heapTuple->t_data,
+											&indexHintBitsData->latest_removed_xid);
 		}
 
 		/*
@@ -7085,6 +7107,20 @@ HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
 	/* *latestRemovedXid may still be invalid at end */
 }
 
+void
+IndexHintBitAdvanceLatestRemovedXid(TransactionId killedTupleRemovedXid,
+									TransactionId *latestRemovedXid)
+{
+	if (TransactionIdIsNormal(killedTupleRemovedXid))
+	{
+		if (!TransactionIdIsValid(*latestRemovedXid))
+			*latestRemovedXid = killedTupleRemovedXid;
+		else
+			*latestRemovedXid =
+			TransactionIdLatest(killedTupleRemovedXid, 1, latestRemovedXid);
+	}
+}
+
 #ifdef USE_PREFETCH
 /*
  * Helper function for heap_index_delete_tuples.  Issues prefetch requests for
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 4a70e20a14..6362a71d35 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -113,7 +113,8 @@ heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
 						 ItemPointer tid,
 						 Snapshot snapshot,
 						 TupleTableSlot *slot,
-						 bool *call_again, bool *all_dead)
+						 bool *call_again,
+						 IndexHintBitsData *indexHintBitsData)
 {
 	IndexFetchHeapData *hscan = (IndexFetchHeapData *) scan;
 	BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
@@ -145,7 +146,7 @@ heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
 											hscan->xs_cbuf,
 											snapshot,
 											&bslot->base.tupdata,
-											all_dead,
+											indexHintBitsData,
 											!*call_again);
 	bslot->base.tupdata.t_self = *tid;
 	LockBuffer(hscan->xs_cbuf, BUFFER_LOCK_UNLOCK);
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index 1c3e937c61..359a39e1d8 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -20,7 +20,6 @@
 #include "postgres.h"
 
 #include "access/genam.h"
-#include "access/heapam.h"
 #include "access/relscan.h"
 #include "access/tableam.h"
 #include "access/transam.h"
@@ -28,6 +27,7 @@
 #include "lib/stringinfo.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
+#include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -106,18 +106,16 @@ RelationGetIndexScan(Relation indexRelation, int nkeys, int norderbys)
 	scan->xs_want_itup = false; /* may be set later */
 
 	/*
-	 * During recovery we ignore killed tuples and don't bother to kill them
-	 * either. We do this because the xmin on the primary node could easily be
-	 * later than the xmin on the standby node, so that what the primary
-	 * thinks is killed is supposed to be visible on standby. So for correct
-	 * MVCC for queries during recovery we must ignore these hints and check
-	 * all tuples. Do *not* set ignore_killed_tuples to true when running in a
-	 * transaction that was started during recovery. xactStartedInRecovery
-	 * should not be altered by index AMs.
-	 */
+	 * For correct MVCC for queries during recovery, we could use
+	 * index hint bits as on the primary. But to avoid frequent query
+	 * cancellation we do it only if hot_standby_feedback is active and
+	 * our xmin is honored on the primary.
+	 *
+	 * The decision is made in GetSnapshotIndexIgnoreKilledTuples.
+	*/
 	scan->kill_prior_tuple = false;
-	scan->xactStartedInRecovery = TransactionStartedDuringRecovery();
-	scan->ignore_killed_tuples = !scan->xactStartedInRecovery;
+	scan->prior_tuple_removed_xid = InvalidTransactionId;
+	scan->ignore_killed_tuples = MyProc->indexIgnoreKilledTuples;
 
 	scan->opaque = NULL;
 
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 3d2dbed708..dee80b8ef7 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -309,6 +309,7 @@ index_rescan(IndexScanDesc scan,
 		table_index_fetch_reset(scan->xs_heapfetch);
 
 	scan->kill_prior_tuple = false; /* for safety */
+	scan->prior_tuple_removed_xid = InvalidTransactionId;
 	scan->xs_heap_continue = false;
 
 	scan->indexRelation->rd_indam->amrescan(scan, keys, nkeys,
@@ -386,6 +387,7 @@ index_restrpos(IndexScanDesc scan)
 		table_index_fetch_reset(scan->xs_heapfetch);
 
 	scan->kill_prior_tuple = false; /* for safety */
+	scan->prior_tuple_removed_xid = InvalidTransactionId;
 	scan->xs_heap_continue = false;
 
 	scan->indexRelation->rd_indam->amrestrpos(scan);
@@ -534,6 +536,7 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
 
 	/* Reset kill flag immediately for safety */
 	scan->kill_prior_tuple = false;
+	scan->prior_tuple_removed_xid = InvalidTransactionId;
 	scan->xs_heap_continue = false;
 
 	/* If we're out of index entries, we're done */
@@ -574,12 +577,17 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
 bool
 index_fetch_heap(IndexScanDesc scan, TupleTableSlot *slot)
 {
-	bool		all_dead = false;
-	bool		found;
+	IndexHintBitsData	ihbd;
+	bool				found;
+
+	ihbd.all_dead = false;
+	ihbd.latest_removed_xid = InvalidTransactionId;
+	ihbd.page_lsn = InvalidXLogRecPtr;
 
 	found = table_index_fetch_tuple(scan->xs_heapfetch, &scan->xs_heaptid,
 									scan->xs_snapshot, slot,
-									&scan->xs_heap_continue, &all_dead);
+									&scan->xs_heap_continue,
+									&ihbd);
 
 	if (found)
 		pgstat_count_heap_fetch(scan->indexRelation);
@@ -587,13 +595,15 @@ index_fetch_heap(IndexScanDesc scan, TupleTableSlot *slot)
 	/*
 	 * If we scanned a whole HOT chain and found only dead tuples, tell index
 	 * AM to kill its entry for that TID (this will take effect in the next
-	 * amgettuple call, in index_getnext_tid).  We do not do this when in
-	 * recovery because it may violate MVCC to do so.  See comments in
-	 * RelationGetIndexScan().
+	 * amgettuple call, in index_getnext_tid). We do this when in
+	 * recovery only in certain conditions because it may violate MVCC.
 	 */
-	if (!scan->xactStartedInRecovery)
-		scan->kill_prior_tuple = all_dead;
-
+	if (scan->ignore_killed_tuples)
+	{
+		scan->kill_prior_tuple = IsMarkBufferDirtyIndexHintAllowed(&ihbd);
+		scan->prior_tuple_removed_xid = scan->kill_prior_tuple ?
+								ihbd.latest_removed_xid : InvalidTransactionId;
+	}
 	return found;
 }
 
@@ -667,6 +677,7 @@ index_getbitmap(IndexScanDesc scan, TIDBitmap *bitmap)
 
 	/* just make sure this is false... */
 	scan->kill_prior_tuple = false;
+	scan->prior_tuple_removed_xid = InvalidTransactionId;
 
 	/*
 	 * have the am's getbitmap proc do all the work.
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index e333603912..954cbe5562 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -502,7 +502,11 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 			if (inposting || !ItemIdIsDead(curitemid))
 			{
 				ItemPointerData htid;
-				bool		all_dead = false;
+				IndexHintBitsData ihbd;
+
+				ihbd.all_dead = false;
+				ihbd.latest_removed_xid = InvalidTransactionId;
+				ihbd.page_lsn = InvalidXLogRecPtr;
 
 				if (!inposting)
 				{
@@ -556,7 +560,7 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 				 */
 				else if (table_index_fetch_tuple_check(heapRel, &htid,
 													   &SnapshotDirty,
-													   &all_dead))
+													   &ihbd))
 				{
 					TransactionId xwait;
 
@@ -670,7 +674,7 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 													RelationGetRelationName(rel))));
 					}
 				}
-				else if (all_dead && (!inposting ||
+				else if (ihbd.all_dead && (!inposting ||
 									  (prevalldead &&
 									   curposti == BTreeTupleGetNPosting(curitup) - 1)))
 				{
@@ -687,16 +691,18 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 					 * crucial. Be sure to mark the proper buffer dirty.
 					 */
 					if (nbuf != InvalidBuffer)
-						MarkBufferDirtyHint(nbuf, true);
+						MarkBufferDirtyIndexHint(nbuf, true,
+												 rel, ihbd.latest_removed_xid);
 					else
-						MarkBufferDirtyHint(insertstate->buf, true);
+						MarkBufferDirtyIndexHint(insertstate->buf, true,
+												 rel, ihbd.latest_removed_xid);
 				}
 
 				/*
 				 * Remember if posting list tuple has even a single HOT chain
 				 * whose members are not all dead
 				 */
-				if (!all_dead && inposting)
+				if (!ihbd.all_dead && inposting)
 					prevalldead = false;
 			}
 		}
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 289bd3c15d..c35a34003b 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -21,7 +21,7 @@
 #include "access/nbtree.h"
 #include "access/nbtxlog.h"
 #include "access/relscan.h"
-#include "access/xlog.h"
+#include "access/heapam_xlog.h"
 #include "commands/progress.h"
 #include "commands/vacuum.h"
 #include "miscadmin.h"
@@ -272,7 +272,11 @@ btgettuple(IndexScanDesc scan, ScanDirection dir)
 					so->killedItems = (int *)
 						palloc(MaxTIDsPerBTreePage * sizeof(int));
 				if (so->numKilled < MaxTIDsPerBTreePage)
+				{
 					so->killedItems[so->numKilled++] = so->currPos.itemIndex;
+					IndexHintBitAdvanceLatestRemovedXid(scan->prior_tuple_removed_xid,
+														&so->killedLatestRemovedXid);
+				}
 			}
 
 			/*
@@ -378,6 +382,7 @@ btbeginscan(Relation rel, int nkeys, int norderbys)
 	so->arrayContext = NULL;
 
 	so->killedItems = NULL;		/* until needed */
+	so->killedLatestRemovedXid = InvalidTransactionId;
 	so->numKilled = 0;
 
 	/*
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index d524310723..bfa1c357c3 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -1724,6 +1724,7 @@ _bt_killitems(IndexScanDesc scan)
 	OffsetNumber maxoff;
 	int			i;
 	int			numKilled = so->numKilled;
+	TransactionId killedLatestRemovedXid = so->killedLatestRemovedXid;
 	bool		killedsomething = false;
 	bool		droppedpin PG_USED_FOR_ASSERTS_ONLY;
 
@@ -1734,6 +1735,7 @@ _bt_killitems(IndexScanDesc scan)
 	 * pages.
 	 */
 	so->numKilled = 0;
+	so->killedLatestRemovedXid = InvalidTransactionId;
 
 	if (BTScanPosIsPinned(so->currPos))
 	{
@@ -1883,7 +1885,9 @@ _bt_killitems(IndexScanDesc scan)
 	if (killedsomething)
 	{
 		opaque->btpo_flags |= BTP_HAS_GARBAGE;
-		MarkBufferDirtyHint(so->currPos.buf, true);
+		MarkBufferDirtyIndexHint(so->currPos.buf, true,
+								 scan->indexRelation,
+								 killedLatestRemovedXid);
 	}
 
 	_bt_unlockbuf(scan->indexRelation, so->currPos.buf);
diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index 01ee7ac6d2..32ebd730f1 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -36,6 +36,16 @@ standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
 		appendStringInfoString(buf, "; subxid ovf");
 }
 
+static void
+standby_desc_index_hint_bits_horizon(StringInfo buf,
+									 xl_index_hint_bits_horizon *xlrec)
+{
+	char		*path = relpathperm(xlrec->rnode, MAIN_FORKNUM);
+
+	appendStringInfo(buf, "latestRemovedXid %u in %s",
+					 xlrec->latestRemovedXid, path);
+}
+
 void
 standby_desc(StringInfo buf, XLogReaderState *record)
 {
@@ -66,6 +76,12 @@ standby_desc(StringInfo buf, XLogReaderState *record)
 								   xlrec->dbId, xlrec->tsId,
 								   xlrec->relcacheInitFileInval);
 	}
+	else if (info == XLOG_INDEX_HINT_BITS_HORIZON)
+	{
+		xl_index_hint_bits_horizon *xlrec = (xl_index_hint_bits_horizon *) rec;
+
+		standby_desc_index_hint_bits_horizon(buf, xlrec);
+	}
 }
 
 const char *
@@ -84,6 +100,9 @@ standby_identify(uint8 info)
 		case XLOG_INVALIDATIONS:
 			id = "INVALIDATIONS";
 			break;
+		case XLOG_INDEX_HINT_BITS_HORIZON:
+			id = "INDEX_HINT_BITS_HORIZON";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 5ea5bdd810..bb9a0ddc18 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -219,7 +219,7 @@ bool
 table_index_fetch_tuple_check(Relation rel,
 							  ItemPointer tid,
 							  Snapshot snapshot,
-							  bool *all_dead)
+							  IndexHintBitsData *indexHintBitsData)
 {
 	IndexFetchTableData *scan;
 	TupleTableSlot *slot;
@@ -229,7 +229,7 @@ table_index_fetch_tuple_check(Relation rel,
 	slot = table_slot_create(rel, NULL);
 	scan = table_index_fetch_begin(rel);
 	found = table_index_fetch_tuple(scan, tid, snapshot, slot, &call_again,
-									all_dead);
+									indexHintBitsData);
 	table_index_fetch_end(scan);
 	ExecDropSingleTupleTableSlot(slot);
 
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index f75b52719d..f4c1f830d6 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -4104,6 +4104,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT:
 			event_name = "RecoveryConflictSnapshot";
 			break;
+		case WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT_INDEX_HINT_BITS:
+			event_name = "RecoveryConflictSnapshotIndexHintBits";
+			break;
 		case WAIT_EVENT_RECOVERY_CONFLICT_TABLESPACE:
 			event_name = "RecoveryConflictTablespace";
 			break;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index afa1df00d0..8289f055b0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -411,6 +411,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 * XLOG_XACT_INVALIDATIONS.  So we don't need to do anything here.
 			 */
 			break;
+		case XLOG_INDEX_HINT_BITS_HORIZON:
+			break;
 		default:
 			elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
 	}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index e903e561af..401a7d5693 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -577,6 +577,8 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 #endif
 
 	MyProc->xmin = snap->xmin;
+	// to keep it simple use index hint bits on the primary only
+	MyProc->indexIgnoreKilledTuples = !RecoveryInProgress();
 
 	/* allocate in transaction context */
 	newxip = (TransactionId *)
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 723f513d8b..1199dffc45 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -225,6 +225,9 @@ WalReceiverMain(void)
 	/* Advertise our PID so that the startup process can kill us */
 	walrcv->pid = MyProcPid;
 	walrcv->walRcvState = WALRCV_STREAMING;
+	/* Initially true so we always send at least one feedback message */
+	walrcv->sender_has_standby_xmin = true;
+	walrcv->sender_propagates_feedback_to_primary = false;
 
 	/* Fetch information required to start streaming */
 	walrcv->ready_to_display = false;
@@ -806,6 +809,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 	XLogRecPtr	walEnd;
 	TimestampTz sendTime;
 	bool		replyRequested;
+	bool		senderPropagatesFeedbackToPrimary;
 
 	resetStringInfo(&incoming_message);
 
@@ -835,7 +839,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 		case 'k':				/* Keepalive */
 			{
 				/* copy message to StringInfo */
-				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
+				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char) + sizeof(char);
 				if (len != hdrlen)
 					ereport(ERROR,
 							(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -846,8 +850,10 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 				walEnd = pq_getmsgint64(&incoming_message);
 				sendTime = pq_getmsgint64(&incoming_message);
 				replyRequested = pq_getmsgbyte(&incoming_message);
+				senderPropagatesFeedbackToPrimary = pq_getmsgbyte(&incoming_message);
 
 				ProcessWalSndrMessage(walEnd, sendTime);
+				WalRcv->sender_propagates_feedback_to_primary = senderPropagatesFeedbackToPrimary;
 
 				/* If the primary requested a reply, send one immediately */
 				if (replyRequested)
@@ -1110,15 +1116,13 @@ XLogWalRcvSendHSFeedback(bool immed)
 				catalog_xmin;
 	static TimestampTz sendTime = 0;
 
-	/* initially true so we always send at least one feedback message */
-	static bool primary_has_standby_xmin = true;
 
 	/*
 	 * If the user doesn't want status to be reported to the primary, be sure
 	 * to exit before doing anything at all.
 	 */
 	if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
-		!primary_has_standby_xmin)
+		!WalRcv->sender_has_standby_xmin)
 		return;
 
 	/* Get current timestamp. */
@@ -1188,9 +1192,9 @@ XLogWalRcvSendHSFeedback(bool immed)
 	pq_sendint32(&reply_message, catalog_xmin_epoch);
 	walrcv_send(wrconn, reply_message.data, reply_message.len);
 	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
-		primary_has_standby_xmin = true;
+		WalRcv->sender_has_standby_xmin = true;
 	else
-		primary_has_standby_xmin = false;
+		WalRcv->sender_has_standby_xmin = false;
 }
 
 /*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 8545c6c423..71fe08e1ab 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2169,6 +2169,12 @@ ProcessStandbyHSFeedbackMessage(void)
 		else
 			MyProc->xmin = feedbackXmin;
 	}
+
+	/*
+	 * Always send keep-alive after feedback to allow standby to maintain
+	 * WalRcv->sender_propagates_feedback_to_primary.
+	 */
+	WalSndKeepalive(false);
 }
 
 /*
@@ -3450,7 +3456,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 static void
 WalSndKeepalive(bool requestReply)
 {
+	bool am_propagating_feedback_to_primary;
 	elog(DEBUG2, "sending replication keepalive");
+	am_propagating_feedback_to_primary = !am_cascading_walsender
+		|| (WalRcv->sender_has_standby_xmin && WalRcv->sender_propagates_feedback_to_primary);
 
 	/* construct the message... */
 	resetStringInfo(&output_message);
@@ -3458,6 +3467,7 @@ WalSndKeepalive(bool requestReply)
 	pq_sendint64(&output_message, sentPtr);
 	pq_sendint64(&output_message, GetCurrentTimestamp());
 	pq_sendbyte(&output_message, requestReply ? 1 : 0);
+	pq_sendbyte(&output_message, am_propagating_feedback_to_primary ? 1 : 0);
 
 	/* ... and send it wrapped in CopyData */
 	pq_putmessage_noblock('d', output_message.data, output_message.len);
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 561c212092..63e60613c4 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -3898,6 +3898,56 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 	}
 }
 
+/*
+ * MarkBufferDirtyIndexHint
+ *
+ * This is essentially the same as MarkBufferDirtyHint, except it WAL log
+ * new value for index hint bits horizon if required.
+ *
+ * Should be used instead of MarkBufferDirtyHint for LP_DEAD hints in indexes.
+ */
+void
+MarkBufferDirtyIndexHint(Buffer buffer, bool buffer_std,
+						 Relation rel, TransactionId latestRemovedXid)
+{
+	LogIndexHintBitsHorizonIfNeeded(rel, latestRemovedXid);
+	MarkBufferDirtyHint(buffer, buffer_std);
+}
+
+/*
+ * IsMarkBufferDirtyIndexHintAllowed
+ *
+ * Checks is it allowed to set index hint bit for the tuple.
+ */
+bool
+IsMarkBufferDirtyIndexHintAllowed(IndexHintBitsData *indexHintBitsData)
+{
+	if (!indexHintBitsData->all_dead)
+		return false;
+	// it all always allowed on primary if *all_dead
+	if (!RecoveryInProgress())
+		return true;
+
+	if (TransactionIdIsValid(indexHintBitsData->latest_removed_xid)) {
+		/*
+		 * If latest_removed_xid is known - make sure its commit record
+		 * less than minRecoveryPoint to avoid MVCC failure after crash recovery.
+		 */
+		XLogRecPtr commitLSN
+				= TransactionIdGetCommitLSN(indexHintBitsData->latest_removed_xid);
+
+		return !XLogNeedsFlush(commitLSN);
+	} else {
+		/*
+		 * Looks like it is tuple cleared by heap_page_prune_execute,
+		 * so conflict resolution already done. But we must be sure if
+		 * LSN of XLOG_HEAP2_CLEAN (or any subsequent updates) less than
+		 * minRecoveryPoint to avoid MVCC failure after crash recovery.
+		 */
+		return !XLogNeedsFlush(indexHintBitsData->page_lsn);
+	}
+}
+
 /*
  * Release buffer content locks for shared buffers.
  *
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index f9bbe97b50..ce1b8f628a 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -267,6 +267,7 @@ CreateSharedMemoryAndSemaphores(void)
 	BTreeShmemInit();
 	SyncScanShmemInit();
 	AsyncShmemInit();
+	StandByShmemInit();
 
 #ifdef EXEC_BACKEND
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index cf12eda504..07863dbb2d 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -65,8 +65,10 @@
 #include "utils/builtins.h"
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
+#include "replication/walreceiver.h"
 
 #define UINT32_ACCESS_ONCE(var)		 ((uint32)(*((volatile uint32 *)&(var))))
+#define BOOL_ACCESS_ONCE(var)		 ((bool)(*((volatile bool *)&(var))))
 
 /* Our shared memory area */
 typedef struct ProcArrayStruct
@@ -655,6 +657,7 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
 
 		proc->lxid = InvalidLocalTransactionId;
 		proc->xmin = InvalidTransactionId;
+		proc->indexIgnoreKilledTuples = false;
 		proc->delayChkpt = false;	/* be sure this is cleared in abort */
 		proc->recoveryConflictPending = false;
 
@@ -694,6 +697,7 @@ ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid)
 	proc->xid = InvalidTransactionId;
 	proc->lxid = InvalidLocalTransactionId;
 	proc->xmin = InvalidTransactionId;
+	proc->indexIgnoreKilledTuples = false;
 	proc->delayChkpt = false;	/* be sure this is cleared in abort */
 	proc->recoveryConflictPending = false;
 
@@ -877,6 +881,7 @@ ProcArrayClearTransaction(PGPROC *proc)
 
 	proc->lxid = InvalidLocalTransactionId;
 	proc->xmin = InvalidTransactionId;
+	proc->indexIgnoreKilledTuples = false;
 	proc->recoveryConflictPending = false;
 
 	Assert(!(proc->statusFlags & PROC_VACUUM_STATE_MASK));
@@ -2013,6 +2018,23 @@ GetSnapshotDataInitOldSnapshot(Snapshot snapshot)
 	}
 }
 
+static bool
+GetSnapshotIndexIgnoreKilledTuples(Snapshot snapshot)
+{
+	/*
+	 * Always use and set LP_DEAD bits on primary. In case of standby
+	 * only if hot_standby_feedback enabled, walsender has our xmin
+	 * and walsender propagates feedback up to the primary (to avoid
+	 * unnecessary cancellations).
+	 *
+	 * It is always safe to set it to true but could cause high
+	 * rate of conflicts.
+	*/
+	Assert(!RecoveryInProgress() || WalRcv);
+	return !snapshot->takenDuringRecovery ||
+		(WalRcv->sender_propagates_feedback_to_primary && WalRcv->sender_has_standby_xmin);
+}
+
 /*
  * Helper function for GetSnapshotData() that checks if the bulk of the
  * visibility information in the snapshot is still valid. If so, it updates
@@ -2057,7 +2079,10 @@ GetSnapshotDataReuse(Snapshot snapshot)
 	 * xmin.
 	 */
 	if (!TransactionIdIsValid(MyProc->xmin))
+	{
 		MyProc->xmin = TransactionXmin = snapshot->xmin;
+		MyProc->indexIgnoreKilledTuples = GetSnapshotIndexIgnoreKilledTuples(snapshot);
+	}
 
 	RecentXmin = snapshot->xmin;
 	Assert(TransactionIdPrecedesOrEquals(TransactionXmin, RecentXmin));
@@ -2345,7 +2370,10 @@ GetSnapshotData(Snapshot snapshot)
 	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
 
 	if (!TransactionIdIsValid(MyProc->xmin))
+	{
 		MyProc->xmin = TransactionXmin = xmin;
+		MyProc->indexIgnoreKilledTuples = GetSnapshotIndexIgnoreKilledTuples(snapshot);
+	}
 
 	LWLockRelease(ProcArrayLock);
 
@@ -2524,6 +2552,7 @@ ProcArrayInstallImportedXmin(TransactionId xmin,
 		 * we don't check that.)
 		 */
 		MyProc->xmin = TransactionXmin = xmin;
+		// no need to change indexIgnoreKilledTuples because restriction is relaxed.
 
 		result = true;
 		break;
@@ -2567,6 +2596,8 @@ ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
 		TransactionIdPrecedesOrEquals(xid, xmin))
 	{
 		MyProc->xmin = TransactionXmin = xmin;
+		// we could also copy indexIgnoreKilledTuples, could be useful for parallel scans
+		MyProc->indexIgnoreKilledTuples = proc->indexIgnoreKilledTuples;
 		result = true;
 	}
 
@@ -3245,11 +3276,15 @@ GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
  *
  * If dbOid is valid we skip backends attached to other databases.
  *
+ * If onlyIndexIgnoreKilledTuples is true we include only backends
+ * with indexIgnoreKilledTuples set.
+ *
  * Be careful to *not* pfree the result from this function. We reuse
  * this array sufficiently often that we use malloc for the result.
  */
 VirtualTransactionId *
-GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
+GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid,
+						  bool onlyIndexIgnoreKilledTuples)
 {
 	static VirtualTransactionId *vxids;
 	ProcArrayStruct *arrayP = procArray;
@@ -3287,6 +3322,8 @@ GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
 		{
 			/* Fetch xmin just once - can't change on us, but good coding */
 			TransactionId pxmin = UINT32_ACCESS_ONCE(proc->xmin);
+			bool indexIgnoreKilledTuples =
+				BOOL_ACCESS_ONCE(proc->indexIgnoreKilledTuples);
 
 			/*
 			 * We ignore an invalid pxmin because this means that backend has
@@ -3297,7 +3334,8 @@ GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
 			 * test here.
 			 */
 			if (!TransactionIdIsValid(limitXmin) ||
-				(TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin)))
+				(TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin) &&
+					(!onlyIndexIgnoreKilledTuples || indexIgnoreKilledTuples)))
 			{
 				VirtualTransactionId vxid;
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 39a30c00f7..3cffd64161 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -42,6 +42,7 @@ int			max_standby_streaming_delay = 30 * 1000;
 bool		log_recovery_conflict_waits = false;
 
 static HTAB *RecoveryLockLists;
+static HTAB *IndexHintBitsHorizons;
 
 /* Flags set by timeout handlers */
 static volatile sig_atomic_t got_standby_deadlock_timeout = false;
@@ -65,6 +66,12 @@ typedef struct RecoveryLockListsEntry
 	List	   *locks;
 } RecoveryLockListsEntry;
 
+typedef struct IndexHintBitsHorizonsEntry
+{
+	Oid				dbOid;
+	TransactionId	hintHorizonXid;
+} IndexHintBitsHorizonsEntry;
+
 /*
  * InitRecoveryTransactionEnvironment
  *		Initialize tracking of our primary's in-progress transactions.
@@ -425,7 +432,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
 }
 
 void
-ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
+ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
+									RelFileNode node)
 {
 	VirtualTransactionId *backends;
 
@@ -444,7 +452,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode
 		return;
 
 	backends = GetConflictingVirtualXIDs(latestRemovedXid,
-										 node.dbNode);
+										 node.dbNode, false);
 
 	ResolveRecoveryConflictWithVirtualXIDs(backends,
 										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
@@ -452,6 +460,22 @@ ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode
 										   true);
 }
 
+void
+ResolveIndexHintBitsRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
+												 RelFileNode node)
+{
+	VirtualTransactionId *backends;
+
+	backends = GetConflictingVirtualXIDs(latestRemovedXid,
+										 node.dbNode, true);
+
+	ResolveRecoveryConflictWithVirtualXIDs(
+			backends,
+			PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+			WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT_INDEX_HINT_BITS,
+			true);
+}
+
 void
 ResolveRecoveryConflictWithTablespace(Oid tsid)
 {
@@ -475,7 +499,7 @@ ResolveRecoveryConflictWithTablespace(Oid tsid)
 	 * We don't wait for commit because drop tablespace is non-transactional.
 	 */
 	temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
-												InvalidOid);
+												InvalidOid, false);
 	ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
 										   PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
 										   WAIT_EVENT_RECOVERY_CONFLICT_TABLESPACE,
@@ -1026,6 +1050,43 @@ StandbyReleaseOldLocks(TransactionId oldxid)
 	}
 }
 
+static bool
+IsNewerIndexHintBitsHorizonXid(Oid dbOid, TransactionId latestRemovedXid)
+{
+	bool found, result;
+	IndexHintBitsHorizonsEntry* entry;
+	Assert(TransactionIdIsNormal(latestRemovedXid));
+
+	LWLockAcquire(IndexHintBitsHorizonShmemLock, LW_SHARED);
+	entry = (IndexHintBitsHorizonsEntry *) hash_search(IndexHintBitsHorizons, &dbOid,
+													   HASH_FIND, &found);
+
+	result = !found || TransactionIdPrecedes(entry->hintHorizonXid, latestRemovedXid);
+	LWLockRelease(IndexHintBitsHorizonShmemLock);
+
+	return result;
+}
+
+static void
+UpsertLatestIndexHintBitsHorizonXid(Oid dbOid, TransactionId latestRemovedXid)
+{
+
+	bool found;
+	IndexHintBitsHorizonsEntry* entry;
+	Assert(TransactionIdIsNormal(latestRemovedXid));
+
+	LWLockAcquire(IndexHintBitsHorizonShmemLock, LW_EXCLUSIVE);
+
+	entry = (IndexHintBitsHorizonsEntry *) hash_search(IndexHintBitsHorizons, &dbOid,
+													   HASH_ENTER, &found);
+
+	if (!found || TransactionIdPrecedes(entry->hintHorizonXid, latestRemovedXid))
+		entry->hintHorizonXid = latestRemovedXid;
+
+	LWLockRelease(IndexHintBitsHorizonShmemLock);
+}
+
+
 /*
  * --------------------------------------------------------------------
  *		Recovery handling for Rmgr RM_STANDBY_ID
@@ -1081,6 +1142,16 @@ standby_redo(XLogReaderState *record)
 											 xlrec->dbId,
 											 xlrec->tsId);
 	}
+	else if (info == XLOG_INDEX_HINT_BITS_HORIZON) {
+		if (InHotStandby) {
+			xl_index_hint_bits_horizon *xlrec =
+					(xl_index_hint_bits_horizon *) XLogRecGetData(record);
+
+			ResolveIndexHintBitsRecoveryConflictWithSnapshot(
+												xlrec->latestRemovedXid,
+												xlrec->rnode);
+		}
+	}
 	else
 		elog(PANIC, "standby_redo: unknown op code %u", info);
 }
@@ -1381,3 +1452,49 @@ get_recovery_conflict_desc(ProcSignalReason reason)
 
 	return reasonDesc;
 }
+
+static void
+LogIndexHintBitsHorizon(RelFileNode rnode, TransactionId latestRemovedXid)
+{
+	xl_index_hint_bits_horizon xlrec;
+
+	xlrec.rnode = rnode;
+	xlrec.latestRemovedXid = latestRemovedXid;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xl_index_hint_bits_horizon));
+
+	XLogInsert(RM_STANDBY_ID, XLOG_INDEX_HINT_BITS_HORIZON);
+}
+
+void
+LogIndexHintBitsHorizonIfNeeded(Relation rel, TransactionId latestRemovedXid)
+{
+	if (!RecoveryInProgress() && XLogStandbyInfoActive() &&
+			TransactionIdIsNormal(latestRemovedXid) && RelationNeedsWAL(rel)) {
+		if (IsNewerIndexHintBitsHorizonXid(rel->rd_node.dbNode, latestRemovedXid))
+		{
+			LogIndexHintBitsHorizon(rel->rd_node, latestRemovedXid);
+			UpsertLatestIndexHintBitsHorizonXid(rel->rd_node.dbNode,
+												latestRemovedXid);
+		}
+	}
+}
+
+void
+StandByShmemInit(void)
+{
+	HASHCTL		info;
+
+	MemSet(&info, 0, sizeof(info));
+	info.keysize = sizeof(Oid);
+	info.entrysize = sizeof(IndexHintBitsHorizonsEntry);
+
+	LWLockAcquire(IndexHintBitsHorizonShmemLock, LW_EXCLUSIVE);
+
+	IndexHintBitsHorizons = ShmemInitHash("IndexHintBitsHorizons",
+										  64, 64,
+										  &info, HASH_ELEM | HASH_BLOBS);
+
+	LWLockRelease(IndexHintBitsHorizonShmemLock);
+}
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index 6c7cf6c295..dcaecee8f1 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -53,3 +53,4 @@ XactTruncationLock					44
 # 45 was XactTruncationLock until removal of BackendRandomLock
 WrapLimitsVacuumLock				46
 NotifyQueueTailLock					47
+IndexHintBitsHorizonShmemLock		48
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index c87ffc6549..2da7eb69da 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -386,6 +386,7 @@ InitProcess(void)
 	MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
 	MyProc->xid = InvalidTransactionId;
 	MyProc->xmin = InvalidTransactionId;
+	MyProc->indexIgnoreKilledTuples = false;
 	MyProc->pid = MyProcPid;
 	/* backendId, databaseId and roleId will be filled in later */
 	MyProc->backendId = InvalidBackendId;
@@ -569,6 +570,7 @@ InitAuxiliaryProcess(void)
 	MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
 	MyProc->xid = InvalidTransactionId;
 	MyProc->xmin = InvalidTransactionId;
+	MyProc->indexIgnoreKilledTuples = false;
 	MyProc->backendId = InvalidBackendId;
 	MyProc->databaseId = InvalidOid;
 	MyProc->roleId = InvalidOid;
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index ae16c3ed7d..bed98d6436 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -528,6 +528,10 @@ SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
 	 * the state for GlobalVis*.
 	 */
 	CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
+	/* To keep it simple, use index hint bits only on the primary for imported
+	 * snapshots.
+	 */
+	MyProc->indexIgnoreKilledTuples = !RecoveryInProgress();
 
 	/*
 	 * Now copy appropriate fields from the source snapshot.
@@ -932,6 +936,7 @@ SnapshotResetXmin(void)
 	if (pairingheap_is_empty(&RegisteredSnapshots))
 	{
 		MyProc->xmin = InvalidTransactionId;
+		MyProc->indexIgnoreKilledTuples = false;
 		return;
 	}
 
@@ -939,6 +944,7 @@ SnapshotResetXmin(void)
 										pairingheap_first(&RegisteredSnapshots));
 
 	if (TransactionIdPrecedes(MyProc->xmin, minSnapshot->xmin))
+		// no need to change indexIgnoreKilledTuples here because xmin restriction is relaxed
 		MyProc->xmin = minSnapshot->xmin;
 }
 
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 553d364e2d..97c97c13c2 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -165,8 +165,9 @@ typedef struct GISTScanOpaqueData
 	IndexOrderByDistance *distances;	/* output area for gistindex_keytest */
 
 	/* info about killed items if any (killedItems is NULL if never used) */
-	OffsetNumber *killedItems;	/* offset numbers of killed items */
-	int			numKilled;		/* number of currently stored items */
+	OffsetNumber *killedItems;			  /* offset numbers of killed items */
+	TransactionId killedLatestRemovedXid; /* latest removed xid of all killed items */
+	int			  numKilled;			  /* number of currently stored items */
 	BlockNumber curBlkno;		/* current number of block */
 	GistNSN		curPageLSN;		/* pos in the WAL stream when page was read */
 
diff --git a/src/include/access/hash.h b/src/include/access/hash.h
index 1cce865be2..a3fc82192e 100644
--- a/src/include/access/hash.h
+++ b/src/include/access/hash.h
@@ -177,8 +177,9 @@ typedef struct HashScanOpaqueData
 	 */
 	bool		hashso_buc_split;
 	/* info about killed items if any (killedItems is NULL if never used) */
-	int		   *killedItems;	/* currPos.items indexes of killed items */
-	int			numKilled;		/* number of currently stored items */
+	int			 *killedItems;			  /* currPos.items indexes of killed items */
+	TransactionId killedLatestRemovedXid; /* latest removed xid of all killed items */
+	int			  numKilled;			  /* number of currently stored items */
 
 	/*
 	 * Identify all the matching items on a page and save them in
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index d96a47b1ce..7ffaac53ec 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -126,7 +126,7 @@ extern bool heap_fetch(Relation relation, Snapshot snapshot,
 					   HeapTuple tuple, Buffer *userbuf);
 extern bool heap_hot_search_buffer(ItemPointer tid, Relation relation,
 								   Buffer buffer, Snapshot snapshot, HeapTuple heapTuple,
-								   bool *all_dead, bool first_call);
+								   IndexHintBitsData *indexHintBitsData, bool first_call);
 
 extern void heap_get_latest_tid(TableScanDesc scan, ItemPointer tid);
 
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 178d49710a..b49c3b4dc7 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -387,6 +387,8 @@ typedef struct xl_heap_rewrite_mapping
 
 extern void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
 												   TransactionId *latestRemovedXid);
+extern void IndexHintBitAdvanceLatestRemovedXid(TransactionId killedTupleRemovedXid,
+												TransactionId *latestRemovedXid);
 
 extern void heap_redo(XLogReaderState *record);
 extern void heap_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index cad4f2bdeb..10257821fa 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -920,8 +920,9 @@ typedef struct BTScanOpaqueData
 	MemoryContext arrayContext; /* scan-lifespan context for array data */
 
 	/* info about killed items if any (killedItems is NULL if never used) */
-	int		   *killedItems;	/* currPos.items indexes of killed items */
-	int			numKilled;		/* number of currently stored items */
+	int				*killedItems;			/* currPos.items indexes of killed items */
+	TransactionId	 killedLatestRemovedXid;/* latest removed xid of all killed items */
+	int				 numKilled;				/* number of currently stored items */
 
 	/*
 	 * If we are doing an index-only scan, these are the tuple storage
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 005f3fdd2b..82383f02f3 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -122,10 +122,9 @@ typedef struct IndexScanDescData
 	bool		xs_temp_snap;	/* unregister snapshot at scan end? */
 
 	/* signaling to index AM about killing index tuples */
-	bool		kill_prior_tuple;	/* last-returned tuple is dead */
-	bool		ignore_killed_tuples;	/* do not return killed entries */
-	bool		xactStartedInRecovery;	/* prevents killing/seeing killed
-										 * tuples */
+	bool			kill_prior_tuple;		 /* last-returned tuple is dead */
+	TransactionId	prior_tuple_removed_xid; /* removed fix for dead tuple */
+	bool			ignore_killed_tuples;	 /* do not return killed entries */
 
 	/* index access method's private state */
 	void	   *opaque;			/* access-method-specific info */
@@ -185,4 +184,12 @@ typedef struct SysScanDescData
 	struct TupleTableSlot *slot;
 }			SysScanDescData;
 
+/* Struct for data about visibility of tuple */
+typedef struct IndexHintBitsData
+{
+	bool			all_dead;			/* guaranteed not visible for all backends */
+	TransactionId	latest_removed_xid;	/* latest removed xid if known */
+	XLogRecPtr		page_lsn;			/* lsn of page where dead tuple located */
+}			IndexHintBitsData;
+
 #endif							/* RELSCAN_H */
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 33bffb6815..31659fb1c4 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -394,7 +394,7 @@ typedef struct TableAmRoutine
 	 * needs to be set to true by index_fetch_tuple, signaling to the caller
 	 * that index_fetch_tuple should be called again for the same tid.
 	 *
-	 * *all_dead, if all_dead is not NULL, should be set to true by
+	 * *indexHintBitsData, if value is not NULL, should be filled by
 	 * index_fetch_tuple iff it is guaranteed that no backend needs to see
 	 * that tuple. Index AMs can use that to avoid returning that tid in
 	 * future searches.
@@ -403,7 +403,8 @@ typedef struct TableAmRoutine
 									  ItemPointer tid,
 									  Snapshot snapshot,
 									  TupleTableSlot *slot,
-									  bool *call_again, bool *all_dead);
+									  bool *call_again,
+									  IndexHintBitsData *indexHintBitsData);
 
 
 	/* ------------------------------------------------------------------------
@@ -1107,7 +1108,7 @@ table_index_fetch_end(struct IndexFetchTableData *scan)
  * will be set to true, signaling that table_index_fetch_tuple() should be called
  * again for the same tid.
  *
- * *all_dead, if all_dead is not NULL, will be set to true by
+ * *index_hint_bits_data, if value is not NULL, will be filled by
  * table_index_fetch_tuple() iff it is guaranteed that no backend needs to see
  * that tuple. Index AMs can use that to avoid returning that tid in future
  * searches.
@@ -1124,7 +1125,8 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan,
 						ItemPointer tid,
 						Snapshot snapshot,
 						TupleTableSlot *slot,
-						bool *call_again, bool *all_dead)
+						bool *call_again, 
+						IndexHintBitsData *index_hint_bits_data)
 {
 	/*
 	 * We don't expect direct calls to table_index_fetch_tuple with valid
@@ -1136,7 +1138,7 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan,
 
 	return scan->rel->rd_tableam->index_fetch_tuple(scan, tid, snapshot,
 													slot, call_again,
-													all_dead);
+													index_hint_bits_data);
 }
 
 /*
@@ -1148,7 +1150,7 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan,
 extern bool table_index_fetch_tuple_check(Relation rel,
 										  ItemPointer tid,
 										  Snapshot snapshot,
-										  bool *all_dead);
+										  IndexHintBitsData *indexHintBitsData);
 
 
 /* ------------------------------------------------------------------------
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 724068cf87..ac649703cd 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -992,6 +992,7 @@ typedef enum
 	WAIT_EVENT_PROC_SIGNAL_BARRIER,
 	WAIT_EVENT_PROMOTE,
 	WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
+	WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT_INDEX_HINT_BITS,
 	WAIT_EVENT_RECOVERY_CONFLICT_TABLESPACE,
 	WAIT_EVENT_RECOVERY_PAUSE,
 	WAIT_EVENT_REPLICATION_ORIGIN_DROP,
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 4313f516d3..0371223c1e 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -156,6 +156,12 @@ typedef struct
 	 * store semantics, so use sig_atomic_t.
 	 */
 	sig_atomic_t force_reply;	/* used as a bool */
+
+	/* If sender has received our xmin. */
+	sig_atomic_t sender_has_standby_xmin;
+
+	/* Is senders feedback propagated through cascading replication chain up to the primary. */
+	sig_atomic_t sender_propagates_feedback_to_primary;
 } WalRcvData;
 
 extern WalRcvData *WalRcv;
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index fb00fda6a7..7cb4a92f9d 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -61,6 +61,8 @@ struct WritebackContext;
 /* forward declared, to avoid including smgr.h here */
 struct SMgrRelationData;
 
+struct IndexHintBitsData;
+
 /* in globals.c ... this duplicates miscadmin.h */
 extern PGDLLIMPORT int NBuffers;
 
@@ -222,6 +224,9 @@ extern void BufferGetTag(Buffer buffer, RelFileNode *rnode,
 						 ForkNumber *forknum, BlockNumber *blknum);
 
 extern void MarkBufferDirtyHint(Buffer buffer, bool buffer_std);
+extern void MarkBufferDirtyIndexHint(Buffer buffer, bool buffer_std,
+									 Relation rel, TransactionId latestRemovedXid);
+extern bool IsMarkBufferDirtyIndexHintAllowed(struct IndexHintBitsData	*indexHintBitsData);
 
 extern void UnlockBuffers(void);
 extern void LockBuffer(Buffer buffer, int mode);
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 683ab64f76..0a72160b61 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -165,6 +165,11 @@ struct PGPROC
 	 * though not required. Accessed without lock, if needed.
 	 */
 	bool		recoveryConflictPending;
+	/*
+	*  Flag allowing to read\set LP_DEAD bits in indexes.
+	*  Also used to raise recovery conflicts caused by index hint bits.
+	*/
+	bool		indexIgnoreKilledTuples;
 
 	/* Info about LWLock the process is currently waiting for, if any. */
 	bool		lwWaiting;		/* true if waiting for an LW lock */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index b01fa52139..3b922f3fcb 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -70,7 +70,8 @@ extern bool IsBackendPid(int pid);
 extern VirtualTransactionId *GetCurrentVirtualXIDs(TransactionId limitXmin,
 												   bool excludeXmin0, bool allDbs, int excludeVacuum,
 												   int *nvxids);
-extern VirtualTransactionId *GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid);
+extern VirtualTransactionId *GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid,
+													   bool onlyIndexIgnoreKilledTuples);
 extern pid_t CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode);
 extern pid_t SignalVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode,
 									  bool conflictPending);
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 94d33851d0..ed984082bf 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -19,6 +19,7 @@
 #include "storage/procsignal.h"
 #include "storage/relfilenode.h"
 #include "storage/standbydefs.h"
+#include "utils/relcache.h"
 
 /* User-settable GUC parameters */
 extern int	vacuum_defer_cleanup_age;
@@ -31,6 +32,9 @@ extern void ShutdownRecoveryTransactionEnvironment(void);
 
 extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
 												RelFileNode node);
+extern void ResolveIndexHintBitsRecoveryConflictWithSnapshot(
+												TransactionId latestRemovedXid,
+												RelFileNode node);
 extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
 extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
 
@@ -93,4 +97,8 @@ extern XLogRecPtr LogStandbySnapshot(void);
 extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
 									bool relcacheInitFileInval);
 
+extern void StandByShmemInit(void);
+extern void LogIndexHintBitsHorizonIfNeeded(Relation rel,
+											TransactionId latestRemovedXid);
+
 #endif							/* STANDBY_H */
diff --git a/src/include/storage/standbydefs.h b/src/include/storage/standbydefs.h
index d99e6f40c6..127de2e9eb 100644
--- a/src/include/storage/standbydefs.h
+++ b/src/include/storage/standbydefs.h
@@ -31,9 +31,10 @@ extern void standby_desc_invalidations(StringInfo buf,
 /*
  * XLOG message types
  */
-#define XLOG_STANDBY_LOCK			0x00
-#define XLOG_RUNNING_XACTS			0x10
-#define XLOG_INVALIDATIONS			0x20
+#define XLOG_STANDBY_LOCK				0x00
+#define XLOG_RUNNING_XACTS				0x10
+#define XLOG_INVALIDATIONS				0x20
+#define XLOG_INDEX_HINT_BITS_HORIZON	0x30
 
 typedef struct xl_standby_locks
 {
@@ -71,4 +72,10 @@ typedef struct xl_invalidations
 
 #define MinSizeOfInvalidations offsetof(xl_invalidations, msgs)
 
+typedef struct xl_index_hint_bits_horizon
+{
+	RelFileNode		rnode;
+	TransactionId	latestRemovedXid;
+} xl_index_hint_bits_horizon;
+
 #endif							/* STANDBYDEFS_H */
