diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index c8f7e781c6..625c8ddde3 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -17,6 +17,7 @@
 #include "access/genam.h"
 #include "access/gist_private.h"
 #include "access/relscan.h"
+#include "access/heapam_xlog.h"
 #include "lib/pairingheap.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -87,7 +88,9 @@ gistkillitems(IndexScanDesc scan)
 	if (killedsomething)
 	{
 		GistMarkPageHasGarbage(page);
-		MarkBufferDirtyHint(buffer, true);
+		MarkBufferDirtyIndexHint(buffer, true,
+								 scan->indexRelation,
+								 so->killedLatestRemovedXid);
 	}
 
 	UnlockReleaseBuffer(buffer);
@@ -666,8 +669,12 @@ gistgettuple(IndexScanDesc scan, ScanDirection dir)
 						MemoryContextSwitchTo(oldCxt);
 					}
 					if (so->numKilled < MaxIndexTuplesPerPage)
+					{
 						so->killedItems[so->numKilled++] =
 							so->pageData[so->curPageData - 1].offnum;
+						IndexHintBitAdvanceLatestRemovedXid(scan->prior_tuple_removed_xid,
+															&so->killedLatestRemovedXid);
+					}
 				}
 				/* continuing to return tuples from a leaf page */
 				scan->xs_heaptid = so->pageData[so->curPageData].heapPtr;
@@ -703,8 +710,12 @@ gistgettuple(IndexScanDesc scan, ScanDirection dir)
 					MemoryContextSwitchTo(oldCxt);
 				}
 				if (so->numKilled < MaxIndexTuplesPerPage)
+				{
 					so->killedItems[so->numKilled++] =
 						so->pageData[so->curPageData - 1].offnum;
+					IndexHintBitAdvanceLatestRemovedXid(scan->prior_tuple_removed_xid,
+														&so->killedLatestRemovedXid);
+				}
 			}
 			/* find and process the next index page */
 			do
diff --git a/src/backend/access/gist/gistscan.c b/src/backend/access/gist/gistscan.c
index 61e92cf0f5..b959ac5f17 100644
--- a/src/backend/access/gist/gistscan.c
+++ b/src/backend/access/gist/gistscan.c
@@ -107,6 +107,7 @@ gistbeginscan(Relation r, int nkeys, int norderbys)
 	}
 
 	so->killedItems = NULL;		/* until needed */
+	so->killedLatestRemovedXid = InvalidTransactionId;
 	so->numKilled = 0;
 	so->curBlkno = InvalidBlockNumber;
 	so->curPageLSN = InvalidXLogRecPtr;
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index 0752fb38a9..c0e9fbac89 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -311,7 +311,11 @@ hashgettuple(IndexScanDesc scan, ScanDirection dir)
 					palloc(MaxIndexTuplesPerPage * sizeof(int));
 
 			if (so->numKilled < MaxIndexTuplesPerPage)
+			{
 				so->killedItems[so->numKilled++] = so->currPos.itemIndex;
+				IndexHintBitAdvanceLatestRemovedXid(scan->prior_tuple_removed_xid,
+													&so->killedLatestRemovedXid);
+			}
 		}
 
 		/*
@@ -379,6 +383,7 @@ hashbeginscan(Relation rel, int nkeys, int norderbys)
 	so->hashso_buc_split = false;
 
 	so->killedItems = NULL;
+	so->killedLatestRemovedXid = InvalidTransactionId;
 	so->numKilled = 0;
 
 	scan->opaque = so;
diff --git a/src/backend/access/hash/hashutil.c b/src/backend/access/hash/hashutil.c
index 519872850e..5617144221 100644
--- a/src/backend/access/hash/hashutil.c
+++ b/src/backend/access/hash/hashutil.c
@@ -611,7 +611,9 @@ _hash_kill_items(IndexScanDesc scan)
 	if (killedsomething)
 	{
 		opaque->hasho_flag |= LH_PAGE_HAS_DEAD_TUPLES;
-		MarkBufferDirtyHint(buf, true);
+		MarkBufferDirtyIndexHint(buf, true,
+								 scan->indexRelation,
+								 so->killedLatestRemovedXid);
 	}
 
 	if (so->hashso_bucket_buf == so->currPos.buf ||
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index faffbb1865..835ff8a2c0 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1517,7 +1517,8 @@ heap_fetch(Relation relation,
  *
  * If all_dead is not NULL, we check non-visible tuples to see if they are
  * globally dead; *all_dead is set true if all members of the HOT chain
- * are vacuumable, false if not.
+ * are vacuumable, false if not.  When *all_dead ends up true,
+ * *latest_removed_xid holds the latest removed xid found in the HOT chain.
  *
  * Unlike heap_fetch, the caller must already have pin and (at least) share
  * lock on the buffer; it is still pinned/locked at exit.  Also unlike
@@ -1526,7 +1527,7 @@ heap_fetch(Relation relation,
 bool
 heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 					   Snapshot snapshot, HeapTuple heapTuple,
-					   bool *all_dead, bool first_call)
+					   bool *all_dead, TransactionId *latest_removed_xid, bool first_call)
 {
 	Page		dp = (Page) BufferGetPage(buffer);
 	TransactionId prev_xmax = InvalidTransactionId;
@@ -1537,9 +1538,12 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 	bool		skip;
 	GlobalVisState *vistest = NULL;
 
+	Assert((!all_dead) || (all_dead && latest_removed_xid));
 	/* If this is not the first call, previous call returned a (live!) tuple */
 	if (all_dead)
 		*all_dead = first_call;
+	if (latest_removed_xid)
+		*latest_removed_xid = InvalidTransactionId;
 
 	blkno = ItemPointerGetBlockNumber(tid);
 	offnum = ItemPointerGetOffsetNumber(tid);
@@ -1622,7 +1626,10 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 				PredicateLockTID(relation, &heapTuple->t_self, snapshot,
 								 HeapTupleHeaderGetXmin(heapTuple->t_data));
 				if (all_dead)
+				{
 					*all_dead = false;
+					*latest_removed_xid = InvalidTransactionId;
+				}
 				return true;
 			}
 		}
@@ -1642,7 +1649,12 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 				vistest = GlobalVisTestFor(relation);
 
 			if (!HeapTupleIsSurelyDead(heapTuple, vistest))
+			{
 				*all_dead = false;
+				*latest_removed_xid = InvalidTransactionId;
+			}
+			else
+				HeapTupleHeaderAdvanceLatestRemovedXid(heapTuple->t_data, latest_removed_xid);
 		}
 
 		/*
@@ -7005,6 +7017,20 @@ HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
 	/* *latestRemovedXid may still be invalid at end */
 }
 
+/* Fold killedTupleRemovedXid into *latestRemovedXid, keeping the later one; non-normal xids are ignored. */
+void
+IndexHintBitAdvanceLatestRemovedXid(TransactionId killedTupleRemovedXid,
+									TransactionId *latestRemovedXid)
+{
+	TransactionId newest = killedTupleRemovedXid;
+
+	if (!TransactionIdIsNormal(killedTupleRemovedXid))
+		return;
+	if (TransactionIdIsValid(*latestRemovedXid))
+		newest = TransactionIdLatest(killedTupleRemovedXid, 1, latestRemovedXid);
+	*latestRemovedXid = newest;
+}
+
 #ifdef USE_PREFETCH
 /*
  * Helper function for heap_index_delete_tuples.  Issues prefetch requests for
@@ -7254,7 +7280,7 @@ heap_index_delete_tuples(Relation rel, TM_IndexDeleteOp *delstate)
 
 			/* Are any tuples from this HOT chain non-vacuumable? */
 			if (heap_hot_search_buffer(&tmp, rel, buf, &SnapshotNonVacuumable,
-									   &heapTuple, NULL, true))
+									   &heapTuple, NULL, NULL, true))
 				continue;		/* can't delete entry */
 
 			/* Caller will delete, since whole HOT chain is vacuumable */
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 4a70e20a14..762e3cfd8c 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -113,7 +113,8 @@ heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
 						 ItemPointer tid,
 						 Snapshot snapshot,
 						 TupleTableSlot *slot,
-						 bool *call_again, bool *all_dead)
+						 bool *call_again, bool *all_dead,
+						 TransactionId *latest_removed_xid)
 {
 	IndexFetchHeapData *hscan = (IndexFetchHeapData *) scan;
 	BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
@@ -146,6 +147,7 @@ heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
 											snapshot,
 											&bslot->base.tupdata,
 											all_dead,
+											latest_removed_xid,
 											!*call_again);
 	bslot->base.tupdata.t_self = *tid;
 	LockBuffer(hscan->xs_cbuf, BUFFER_LOCK_UNLOCK);
@@ -2173,7 +2175,7 @@ heapam_scan_bitmap_next_block(TableScanDesc scan,
 
 			ItemPointerSet(&tid, page, offnum);
 			if (heap_hot_search_buffer(&tid, scan->rs_rd, buffer, snapshot,
-									   &heapTuple, NULL, true))
+									   &heapTuple, NULL, NULL, true))
 				hscan->rs_vistuples[ntup++] = ItemPointerGetOffsetNumber(&tid);
 		}
 	}
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index c911c705ba..e4c1cd114c 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -20,7 +20,6 @@
 #include "postgres.h"
 
 #include "access/genam.h"
-#include "access/heapam.h"
 #include "access/relscan.h"
 #include "access/tableam.h"
 #include "access/transam.h"
@@ -28,6 +27,7 @@
 #include "lib/stringinfo.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
+#include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -106,18 +106,16 @@ RelationGetIndexScan(Relation indexRelation, int nkeys, int norderbys)
 	scan->xs_want_itup = false; /* may be set later */
 
 	/*
-	 * During recovery we ignore killed tuples and don't bother to kill them
-	 * either. We do this because the xmin on the primary node could easily be
-	 * later than the xmin on the standby node, so that what the primary
-	 * thinks is killed is supposed to be visible on standby. So for correct
-	 * MVCC for queries during recovery we must ignore these hints and check
-	 * all tuples. Do *not* set ignore_killed_tuples to true when running in a
-	 * transaction that was started during recovery. xactStartedInRecovery
-	 * should not be altered by index AMs.
-	 */
+	 * Queries running during recovery could use index hint bits just as on
+	 * the primary and still get correct MVCC results.  But to avoid frequent
+	 * query cancellation we do so only if hot_standby_feedback is active and
+	 * our xmin is honored on the primary.
+	 *
+	 * The decision is made in GetSnapshotIndexIgnoreKilledTuples().
+	 */
 	scan->kill_prior_tuple = false;
-	scan->xactStartedInRecovery = TransactionStartedDuringRecovery();
-	scan->ignore_killed_tuples = !scan->xactStartedInRecovery;
+	scan->prior_tuple_removed_xid = InvalidTransactionId;
+	scan->ignore_killed_tuples = MyProc->indexIgnoreKilledTuples;
 
 	scan->opaque = NULL;
 
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 3d2dbed708..bc5c11def3 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -309,6 +309,7 @@ index_rescan(IndexScanDesc scan,
 		table_index_fetch_reset(scan->xs_heapfetch);
 
 	scan->kill_prior_tuple = false; /* for safety */
+	scan->prior_tuple_removed_xid = InvalidTransactionId;
 	scan->xs_heap_continue = false;
 
 	scan->indexRelation->rd_indam->amrescan(scan, keys, nkeys,
@@ -386,6 +387,7 @@ index_restrpos(IndexScanDesc scan)
 		table_index_fetch_reset(scan->xs_heapfetch);
 
 	scan->kill_prior_tuple = false; /* for safety */
+	scan->prior_tuple_removed_xid = InvalidTransactionId;
 	scan->xs_heap_continue = false;
 
 	scan->indexRelation->rd_indam->amrestrpos(scan);
@@ -534,6 +536,7 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
 
 	/* Reset kill flag immediately for safety */
 	scan->kill_prior_tuple = false;
+	scan->prior_tuple_removed_xid = InvalidTransactionId;
 	scan->xs_heap_continue = false;
 
 	/* If we're out of index entries, we're done */
@@ -574,12 +577,14 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
 bool
 index_fetch_heap(IndexScanDesc scan, TupleTableSlot *slot)
 {
-	bool		all_dead = false;
-	bool		found;
+	bool			all_dead = false;
+	TransactionId	latest_removed_xid = InvalidTransactionId;
+	bool			found;
 
 	found = table_index_fetch_tuple(scan->xs_heapfetch, &scan->xs_heaptid,
 									scan->xs_snapshot, slot,
-									&scan->xs_heap_continue, &all_dead);
+									&scan->xs_heap_continue,
+									&all_dead, &latest_removed_xid);
 
 	if (found)
 		pgstat_count_heap_fetch(scan->indexRelation);
@@ -587,13 +592,13 @@ index_fetch_heap(IndexScanDesc scan, TupleTableSlot *slot)
 	/*
 	 * If we scanned a whole HOT chain and found only dead tuples, tell index
 	 * AM to kill its entry for that TID (this will take effect in the next
-	 * amgettuple call, in index_getnext_tid).  We do not do this when in
-	 * recovery because it may violate MVCC to do so.  See comments in
-	 * RelationGetIndexScan().
+	 * amgettuple call, in index_getnext_tid).
 	 */
-	if (!scan->xactStartedInRecovery)
+	if (scan->ignore_killed_tuples)
+	{
 		scan->kill_prior_tuple = all_dead;
-
+		scan->prior_tuple_removed_xid = latest_removed_xid;
+	}
 	return found;
 }
 
@@ -667,6 +672,7 @@ index_getbitmap(IndexScanDesc scan, TIDBitmap *bitmap)
 
 	/* just make sure this is false... */
 	scan->kill_prior_tuple = false;
+	scan->prior_tuple_removed_xid = InvalidTransactionId;
 
 	/*
 	 * have the am's getbitmap proc do all the work.
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index e333603912..5282831c40 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -502,7 +502,8 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 			if (inposting || !ItemIdIsDead(curitemid))
 			{
 				ItemPointerData htid;
-				bool		all_dead = false;
+				bool			all_dead = false;
+				TransactionId	latest_removed_xid = InvalidTransactionId;
 
 				if (!inposting)
 				{
@@ -556,7 +557,7 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 				 */
 				else if (table_index_fetch_tuple_check(heapRel, &htid,
 													   &SnapshotDirty,
-													   &all_dead))
+													   &all_dead, &latest_removed_xid))
 				{
 					TransactionId xwait;
 
@@ -613,7 +614,7 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 					 */
 					htid = itup->t_tid;
 					if (table_index_fetch_tuple_check(heapRel, &htid,
-													  SnapshotSelf, NULL))
+													  SnapshotSelf, NULL, NULL))
 					{
 						/* Normal case --- it's still live */
 					}
@@ -687,9 +688,9 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 					 * crucial. Be sure to mark the proper buffer dirty.
 					 */
 					if (nbuf != InvalidBuffer)
-						MarkBufferDirtyHint(nbuf, true);
+						MarkBufferDirtyIndexHint(nbuf, true, rel, latest_removed_xid);
 					else
-						MarkBufferDirtyHint(insertstate->buf, true);
+						MarkBufferDirtyIndexHint(insertstate->buf, true, rel, latest_removed_xid);
 				}
 
 				/*
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 289bd3c15d..c35a34003b 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -21,7 +21,7 @@
 #include "access/nbtree.h"
 #include "access/nbtxlog.h"
 #include "access/relscan.h"
-#include "access/xlog.h"
+#include "access/heapam_xlog.h"
 #include "commands/progress.h"
 #include "commands/vacuum.h"
 #include "miscadmin.h"
@@ -272,7 +272,11 @@ btgettuple(IndexScanDesc scan, ScanDirection dir)
 					so->killedItems = (int *)
 						palloc(MaxTIDsPerBTreePage * sizeof(int));
 				if (so->numKilled < MaxTIDsPerBTreePage)
+				{
 					so->killedItems[so->numKilled++] = so->currPos.itemIndex;
+					IndexHintBitAdvanceLatestRemovedXid(scan->prior_tuple_removed_xid,
+														&so->killedLatestRemovedXid);
+				}
 			}
 
 			/*
@@ -378,6 +382,7 @@ btbeginscan(Relation rel, int nkeys, int norderbys)
 	so->arrayContext = NULL;
 
 	so->killedItems = NULL;		/* until needed */
+	so->killedLatestRemovedXid = InvalidTransactionId;
 	so->numKilled = 0;
 
 	/*
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index d524310723..0ce93551ac 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -1883,7 +1883,9 @@ _bt_killitems(IndexScanDesc scan)
 	if (killedsomething)
 	{
 		opaque->btpo_flags |= BTP_HAS_GARBAGE;
-		MarkBufferDirtyHint(so->currPos.buf, true);
+		MarkBufferDirtyIndexHint(so->currPos.buf, true,
+								 scan->indexRelation,
+								 so->killedLatestRemovedXid);
 	}
 
 	_bt_unlockbuf(scan->indexRelation, so->currPos.buf);
diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index 01ee7ac6d2..32ebd730f1 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -36,6 +36,16 @@ standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
 		appendStringInfoString(buf, "; subxid ovf");
 }
 
+/* Describe an XLOG_INDEX_HINT_BITS_HORIZON record: its horizon xid and relation path. */
+static void
+standby_desc_index_hint_bits_horizon(StringInfo buf, xl_index_hint_bits_horizon *xlrec)
+{
+	char		*path = relpathperm(xlrec->rnode, MAIN_FORKNUM);
+
+	appendStringInfo(buf, "latestRemovedXid %u in %s", xlrec->latestRemovedXid, path);
+	pfree(path);				/* relpathperm() hands back an allocated string */
+}
+
 void
 standby_desc(StringInfo buf, XLogReaderState *record)
 {
@@ -66,6 +76,12 @@ standby_desc(StringInfo buf, XLogReaderState *record)
 								   xlrec->dbId, xlrec->tsId,
 								   xlrec->relcacheInitFileInval);
 	}
+	else if (info == XLOG_INDEX_HINT_BITS_HORIZON)
+	{
+		xl_index_hint_bits_horizon *xlrec = (xl_index_hint_bits_horizon *) rec;
+
+		standby_desc_index_hint_bits_horizon(buf, xlrec);
+	}
 }
 
 const char *
@@ -84,6 +100,9 @@ standby_identify(uint8 info)
 		case XLOG_INVALIDATIONS:
 			id = "INVALIDATIONS";
 			break;
+		case XLOG_INDEX_HINT_BITS_HORIZON:
+			id = "INDEX_HINT_BITS_HORIZON";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 5ea5bdd810..be02cad7e5 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -219,7 +219,8 @@ bool
 table_index_fetch_tuple_check(Relation rel,
 							  ItemPointer tid,
 							  Snapshot snapshot,
-							  bool *all_dead)
+							  bool *all_dead,
+							  TransactionId *latest_removed_xid)
 {
 	IndexFetchTableData *scan;
 	TupleTableSlot *slot;
@@ -229,7 +230,7 @@ table_index_fetch_tuple_check(Relation rel,
 	slot = table_slot_create(rel, NULL);
 	scan = table_index_fetch_begin(rel);
 	found = table_index_fetch_tuple(scan, tid, snapshot, slot, &call_again,
-									all_dead);
+									all_dead, latest_removed_xid);
 	table_index_fetch_end(scan);
 	ExecDropSingleTupleTableSlot(slot);
 
diff --git a/src/backend/commands/constraint.c b/src/backend/commands/constraint.c
index d0063164a7..9e72af54ba 100644
--- a/src/backend/commands/constraint.c
+++ b/src/backend/commands/constraint.c
@@ -112,7 +112,7 @@ unique_key_recheck(PG_FUNCTION_ARGS)
 		bool		call_again = false;
 
 		if (!table_index_fetch_tuple(scan, &tmptid, SnapshotSelf, slot,
-									 &call_again, NULL))
+									 &call_again, NULL, NULL))
 		{
 			/*
 			 * All rows referenced by the index entry are dead, so skip the
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index f75b52719d..f4c1f830d6 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -4104,6 +4104,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT:
 			event_name = "RecoveryConflictSnapshot";
 			break;
+		case WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT_INDEX_HINT_BITS:
+			event_name = "RecoveryConflictSnapshotIndexHintBits";
+			break;
 		case WAIT_EVENT_RECOVERY_CONFLICT_TABLESPACE:
 			event_name = "RecoveryConflictTablespace";
 			break;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index afa1df00d0..8289f055b0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -411,6 +411,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 * XLOG_XACT_INVALIDATIONS.  So we don't need to do anything here.
 			 */
 			break;
+		case XLOG_INDEX_HINT_BITS_HORIZON:
+			break;
 		default:
 			elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
 	}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 71d510e305..07681fdc6d 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -577,6 +577,8 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 #endif
 
 	MyProc->xmin = snap->xmin;
+	/* To keep it simple, use index hint bits on the primary only. */
+	MyProc->indexIgnoreKilledTuples = !RecoveryInProgress();
 
 	/* allocate in transaction context */
 	newxip = (TransactionId *)
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 723f513d8b..1199dffc45 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -225,6 +225,9 @@ WalReceiverMain(void)
 	/* Advertise our PID so that the startup process can kill us */
 	walrcv->pid = MyProcPid;
 	walrcv->walRcvState = WALRCV_STREAMING;
+	/* Initially true so we always send at least one feedback message */
+	walrcv->sender_has_standby_xmin = true;
+	walrcv->sender_propagates_feedback_to_primary = false;
 
 	/* Fetch information required to start streaming */
 	walrcv->ready_to_display = false;
@@ -806,6 +809,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 	XLogRecPtr	walEnd;
 	TimestampTz sendTime;
 	bool		replyRequested;
+	bool		senderPropagatesFeedbackToPrimary;
 
 	resetStringInfo(&incoming_message);
 
@@ -835,7 +839,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 		case 'k':				/* Keepalive */
 			{
 				/* copy message to StringInfo */
-				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
+				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char) + sizeof(char);
 				if (len != hdrlen)
 					ereport(ERROR,
 							(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -846,8 +850,10 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 				walEnd = pq_getmsgint64(&incoming_message);
 				sendTime = pq_getmsgint64(&incoming_message);
 				replyRequested = pq_getmsgbyte(&incoming_message);
+				senderPropagatesFeedbackToPrimary = pq_getmsgbyte(&incoming_message);
 
 				ProcessWalSndrMessage(walEnd, sendTime);
+				WalRcv->sender_propagates_feedback_to_primary = senderPropagatesFeedbackToPrimary;
 
 				/* If the primary requested a reply, send one immediately */
 				if (replyRequested)
@@ -1110,15 +1116,13 @@ XLogWalRcvSendHSFeedback(bool immed)
 				catalog_xmin;
 	static TimestampTz sendTime = 0;
 
-	/* initially true so we always send at least one feedback message */
-	static bool primary_has_standby_xmin = true;
 
 	/*
 	 * If the user doesn't want status to be reported to the primary, be sure
 	 * to exit before doing anything at all.
 	 */
 	if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
-		!primary_has_standby_xmin)
+		!WalRcv->sender_has_standby_xmin)
 		return;
 
 	/* Get current timestamp. */
@@ -1188,9 +1192,9 @@ XLogWalRcvSendHSFeedback(bool immed)
 	pq_sendint32(&reply_message, catalog_xmin_epoch);
 	walrcv_send(wrconn, reply_message.data, reply_message.len);
 	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
-		primary_has_standby_xmin = true;
+		WalRcv->sender_has_standby_xmin = true;
 	else
-		primary_has_standby_xmin = false;
+		WalRcv->sender_has_standby_xmin = false;
 }
 
 /*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 8545c6c423..71fe08e1ab 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2169,6 +2169,12 @@ ProcessStandbyHSFeedbackMessage(void)
 		else
 			MyProc->xmin = feedbackXmin;
 	}
+
+	/*
+	 * Always send keep-alive after feedback to allow standby to maintain
+	 * WalRcv->sender_propagates_feedback_to_primary.
+	 */
+	WalSndKeepalive(false);
 }
 
 /*
@@ -3450,7 +3456,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 static void
 WalSndKeepalive(bool requestReply)
 {
+	bool am_propagating_feedback_to_primary;
 	elog(DEBUG2, "sending replication keepalive");
+	am_propagating_feedback_to_primary = !am_cascading_walsender
+		|| (WalRcv->sender_has_standby_xmin && WalRcv->sender_propagates_feedback_to_primary);
 
 	/* construct the message... */
 	resetStringInfo(&output_message);
@@ -3458,6 +3467,7 @@ WalSndKeepalive(bool requestReply)
 	pq_sendint64(&output_message, sentPtr);
 	pq_sendint64(&output_message, GetCurrentTimestamp());
 	pq_sendbyte(&output_message, requestReply ? 1 : 0);
+	pq_sendbyte(&output_message, am_propagating_feedback_to_primary ? 1 : 0);
 
 	/* ... and send it wrapped in CopyData */
 	pq_putmessage_noblock('d', output_message.data, output_message.len);
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 561c212092..1e7c1797c1 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -3898,6 +3898,22 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 	}
 }
 
+/*
+ * MarkBufferDirtyIndexHint
+ *
+ * This is essentially the same as MarkBufferDirtyHint, except that it first
+ * WAL-logs a new index hint bits horizon value if one is required.
+ *
+ * Should be used instead of MarkBufferDirtyHint for LP_DEAD hints in indexes.
+ */
+void
+MarkBufferDirtyIndexHint(Buffer buffer, bool buffer_std,
+						 Relation rel, TransactionId latestRemovedXid)
+{
+	LogIndexHintBitsHorizonIfNeeded(rel, latestRemovedXid);
+	MarkBufferDirtyHint(buffer, buffer_std);
+}
+
 /*
  * Release buffer content locks for shared buffers.
  *
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index f9bbe97b50..ce1b8f628a 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -267,6 +267,7 @@ CreateSharedMemoryAndSemaphores(void)
 	BTreeShmemInit();
 	SyncScanShmemInit();
 	AsyncShmemInit();
+	StandByShmemInit();
 
 #ifdef EXEC_BACKEND
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index cf12eda504..07863dbb2d 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -65,8 +65,10 @@
 #include "utils/builtins.h"
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
+#include "replication/walreceiver.h"
 
 #define UINT32_ACCESS_ONCE(var)		 ((uint32)(*((volatile uint32 *)&(var))))
+#define BOOL_ACCESS_ONCE(var)		 ((bool)(*((volatile bool *)&(var))))
 
 /* Our shared memory area */
 typedef struct ProcArrayStruct
@@ -655,6 +657,7 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
 
 		proc->lxid = InvalidLocalTransactionId;
 		proc->xmin = InvalidTransactionId;
+		proc->indexIgnoreKilledTuples = false;
 		proc->delayChkpt = false;	/* be sure this is cleared in abort */
 		proc->recoveryConflictPending = false;
 
@@ -694,6 +697,7 @@ ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid)
 	proc->xid = InvalidTransactionId;
 	proc->lxid = InvalidLocalTransactionId;
 	proc->xmin = InvalidTransactionId;
+	proc->indexIgnoreKilledTuples = false;
 	proc->delayChkpt = false;	/* be sure this is cleared in abort */
 	proc->recoveryConflictPending = false;
 
@@ -877,6 +881,7 @@ ProcArrayClearTransaction(PGPROC *proc)
 
 	proc->lxid = InvalidLocalTransactionId;
 	proc->xmin = InvalidTransactionId;
+	proc->indexIgnoreKilledTuples = false;
 	proc->recoveryConflictPending = false;
 
 	Assert(!(proc->statusFlags & PROC_VACUUM_STATE_MASK));
@@ -2013,6 +2018,23 @@ GetSnapshotDataInitOldSnapshot(Snapshot snapshot)
 	}
 }
 
+/* Decide whether scans under this snapshot use and set index LP_DEAD bits. */
+static bool
+GetSnapshotIndexIgnoreKilledTuples(Snapshot snapshot)
+{
+	Assert(!RecoveryInProgress() || WalRcv);
+
+	/* Always use and set LP_DEAD bits on the primary */
+	if (!snapshot->takenDuringRecovery)
+		return true;
+	/*
+	 * On a standby, only if hot_standby_feedback is enabled, the walsender
+	 * has our xmin and it propagates feedback up to the primary (to avoid
+	 * unnecessary cancellations).  True is always safe, but conflict-prone.
+	 */
+	return WalRcv->sender_propagates_feedback_to_primary && WalRcv->sender_has_standby_xmin;
+}
+
 /*
  * Helper function for GetSnapshotData() that checks if the bulk of the
  * visibility information in the snapshot is still valid. If so, it updates
@@ -2057,7 +2079,10 @@ GetSnapshotDataReuse(Snapshot snapshot)
 	 * xmin.
 	 */
 	if (!TransactionIdIsValid(MyProc->xmin))
+	{
 		MyProc->xmin = TransactionXmin = snapshot->xmin;
+		MyProc->indexIgnoreKilledTuples = GetSnapshotIndexIgnoreKilledTuples(snapshot);
+	}
 
 	RecentXmin = snapshot->xmin;
 	Assert(TransactionIdPrecedesOrEquals(TransactionXmin, RecentXmin));
@@ -2345,7 +2370,10 @@ GetSnapshotData(Snapshot snapshot)
 	replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
 
 	if (!TransactionIdIsValid(MyProc->xmin))
+	{
 		MyProc->xmin = TransactionXmin = xmin;
+		MyProc->indexIgnoreKilledTuples = GetSnapshotIndexIgnoreKilledTuples(snapshot);
+	}
 
 	LWLockRelease(ProcArrayLock);
 
@@ -2524,6 +2552,7 @@ ProcArrayInstallImportedXmin(TransactionId xmin,
 		 * we don't check that.)
 		 */
 		MyProc->xmin = TransactionXmin = xmin;
+		/* No need to change indexIgnoreKilledTuples because the restriction is relaxed. */
 
 		result = true;
 		break;
@@ -2567,6 +2596,8 @@ ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
 		TransactionIdPrecedesOrEquals(xid, xmin))
 	{
 		MyProc->xmin = TransactionXmin = xmin;
+		/* Also copy indexIgnoreKilledTuples; this could be useful for parallel scans. */
+		MyProc->indexIgnoreKilledTuples = proc->indexIgnoreKilledTuples;
 		result = true;
 	}
 
@@ -3245,11 +3276,15 @@ GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
  *
  * If dbOid is valid we skip backends attached to other databases.
  *
+ * If onlyIndexIgnoreKilledTuples is true we include only backends
+ * with indexIgnoreKilledTuples set.
+ *
  * Be careful to *not* pfree the result from this function. We reuse
  * this array sufficiently often that we use malloc for the result.
  */
 VirtualTransactionId *
-GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
+GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid,
+						  bool onlyIndexIgnoreKilledTuples)
 {
 	static VirtualTransactionId *vxids;
 	ProcArrayStruct *arrayP = procArray;
@@ -3287,6 +3322,8 @@ GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
 		{
 			/* Fetch xmin just once - can't change on us, but good coding */
 			TransactionId pxmin = UINT32_ACCESS_ONCE(proc->xmin);
+			bool indexIgnoreKilledTuples =
+				BOOL_ACCESS_ONCE(proc->indexIgnoreKilledTuples);
 
 			/*
 			 * We ignore an invalid pxmin because this means that backend has
@@ -3297,7 +3334,8 @@ GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
 			 * test here.
 			 */
 			if (!TransactionIdIsValid(limitXmin) ||
-				(TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin)))
+				(TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin) &&
+					(!onlyIndexIgnoreKilledTuples || indexIgnoreKilledTuples)))
 			{
 				VirtualTransactionId vxid;
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 39a30c00f7..3cffd64161 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -42,6 +42,7 @@ int			max_standby_streaming_delay = 30 * 1000;
 bool		log_recovery_conflict_waits = false;
 
 static HTAB *RecoveryLockLists;
+static HTAB *IndexHintBitsHorizons;
 
 /* Flags set by timeout handlers */
 static volatile sig_atomic_t got_standby_deadlock_timeout = false;
@@ -65,6 +66,12 @@ typedef struct RecoveryLockListsEntry
 	List	   *locks;
 } RecoveryLockListsEntry;
 
+typedef struct IndexHintBitsHorizonsEntry
+{
+	Oid				dbOid;			/* database OID; presumably the hash key -- confirm at table creation */
+	TransactionId	hintHorizonXid;	/* horizon xid stored for that database */
+} IndexHintBitsHorizonsEntry;
+
 /*
  * InitRecoveryTransactionEnvironment
  *		Initialize tracking of our primary's in-progress transactions.
@@ -425,7 +432,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
 }
 
 void
-ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
+ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
+									RelFileNode node)
 {
 	VirtualTransactionId *backends;
 
@@ -444,7 +452,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode
 		return;
 
 	backends = GetConflictingVirtualXIDs(latestRemovedXid,
-										 node.dbNode);
+										 node.dbNode, false);
 
 	ResolveRecoveryConflictWithVirtualXIDs(backends,
 										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
@@ -452,6 +460,22 @@ ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode
 										   true);
 }
 
+void
+ResolveIndexHintBitsRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
+												 RelFileNode node)
+{
+	/* Resolve snapshot conflicts raised by index hint bits during replay. */
+	VirtualTransactionId *backends;
+
+	/* Invalid xid bypasses the xmin/flag tests below; treat it as no conflict. */
+	if (!TransactionIdIsValid(latestRemovedXid))
+		return;
+	/* "true": only backends with indexIgnoreKilledTuples set can conflict. */
+	backends = GetConflictingVirtualXIDs(latestRemovedXid, node.dbNode, true);
+	ResolveRecoveryConflictWithVirtualXIDs(backends, PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+			WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT_INDEX_HINT_BITS, true);
+}
+
 void
 ResolveRecoveryConflictWithTablespace(Oid tsid)
 {
@@ -475,7 +499,7 @@ ResolveRecoveryConflictWithTablespace(Oid tsid)
 	 * We don't wait for commit because drop tablespace is non-transactional.
 	 */
 	temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
-												InvalidOid);
+												InvalidOid, false);
 	ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
 										   PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
 										   WAIT_EVENT_RECOVERY_CONFLICT_TABLESPACE,
@@ -1026,6 +1050,43 @@ StandbyReleaseOldLocks(TransactionId oldxid)
 	}
 }
 
+static bool
+IsNewerIndexHintBitsHorizonXid(Oid dbOid, TransactionId latestRemovedXid)
+{
+	IndexHintBitsHorizonsEntry *horizon;
+	bool		known;
+	bool		isNewer;
+
+	Assert(TransactionIdIsNormal(latestRemovedXid));
+	LWLockAcquire(IndexHintBitsHorizonShmemLock, LW_SHARED);
+	horizon = (IndexHintBitsHorizonsEntry *)
+		hash_search(IndexHintBitsHorizons, &dbOid, HASH_FIND, &known);
+	/* Newer if dbOid has no entry yet or its stored horizon precedes the xid. */
+	isNewer = !known || TransactionIdPrecedes(horizon->hintHorizonXid, latestRemovedXid);
+	LWLockRelease(IndexHintBitsHorizonShmemLock);
+	return isNewer;
+}
+
+static void
+UpsertLatestIndexHintBitsHorizonXid(Oid dbOid, TransactionId latestRemovedXid)
+{
+	IndexHintBitsHorizonsEntry *horizon;
+	bool		existed;
+
+	Assert(TransactionIdIsNormal(latestRemovedXid));
+	/* Store the xid as dbOid's horizon; the lookup below may add an entry. */
+	LWLockAcquire(IndexHintBitsHorizonShmemLock, LW_EXCLUSIVE);
+	horizon = (IndexHintBitsHorizonsEntry *)
+		hash_search(IndexHintBitsHorizons, &dbOid, HASH_ENTER, &existed);
+
+	/* Set a new entry unconditionally; move an existing one forward only. */
+	if (!existed || TransactionIdPrecedes(horizon->hintHorizonXid, latestRemovedXid))
+		horizon->hintHorizonXid = latestRemovedXid;
+
+	LWLockRelease(IndexHintBitsHorizonShmemLock);
+}
+
+
 /*
  * --------------------------------------------------------------------
  *		Recovery handling for Rmgr RM_STANDBY_ID
@@ -1081,6 +1142,16 @@ standby_redo(XLogReaderState *record)
 											 xlrec->dbId,
 											 xlrec->tsId);
 	}
+	else if (info == XLOG_INDEX_HINT_BITS_HORIZON) {
+		if (InHotStandby) {
+			xl_index_hint_bits_horizon *xlrec =
+					(xl_index_hint_bits_horizon *) XLogRecGetData(record);
+
+			ResolveIndexHintBitsRecoveryConflictWithSnapshot(
+												xlrec->latestRemovedXid,
+												xlrec->rnode);
+		}
+	}
 	else
 		elog(PANIC, "standby_redo: unknown op code %u", info);
 }
@@ -1381,3 +1452,49 @@ get_recovery_conflict_desc(ProcSignalReason reason)
 
 	return reasonDesc;
 }
+
+static void
+LogIndexHintBitsHorizon(RelFileNode rnode, TransactionId latestRemovedXid)
+{
+	xl_index_hint_bits_horizon horizonRec;
+
+	/* The record payload is just the relation's file node and the horizon xid. */
+	horizonRec.latestRemovedXid = latestRemovedXid;
+	horizonRec.rnode = rnode;
+	/* Emit it as an XLOG_INDEX_HINT_BITS_HORIZON record under RM_STANDBY_ID. */
+	XLogBeginInsert();
+	XLogRegisterData((char *) &horizonRec, sizeof(horizonRec));
+	(void) XLogInsert(RM_STANDBY_ID, XLOG_INDEX_HINT_BITS_HORIZON);
+}
+
+void
+LogIndexHintBitsHorizonIfNeeded(Relation rel, TransactionId latestRemovedXid)
+{
+	/* Log only: not in recovery, standby info active, normal xid, WAL-logged rel. */
+	if (RecoveryInProgress() || !XLogStandbyInfoActive() ||
+		!TransactionIdIsNormal(latestRemovedXid) || !RelationNeedsWAL(rel))
+		return;
+	/* Skip xids that do not advance the horizon already recorded for the DB. */
+	if (!IsNewerIndexHintBitsHorizonXid(rel->rd_node.dbNode, latestRemovedXid))
+		return;
+	LogIndexHintBitsHorizon(rel->rd_node, latestRemovedXid);
+	UpsertLatestIndexHintBitsHorizonXid(rel->rd_node.dbNode, latestRemovedXid);
+}
+
+void
+StandByShmemInit(void)
+{
+	/* Initialize the IndexHintBitsHorizons table through ShmemInitHash(). */
+	HASHCTL		hctl;
+
+	/* Entries are keyed by an Oid-sized binary key (HASH_BLOBS). */
+	MemSet(&hctl, 0, sizeof(hctl));
+	hctl.entrysize = sizeof(IndexHintBitsHorizonsEntry);
+	hctl.keysize = sizeof(Oid);
+
+	/* NOTE(review): both size arguments are 64 -- confirm this bound suffices. */
+	LWLockAcquire(IndexHintBitsHorizonShmemLock, LW_EXCLUSIVE);
+	IndexHintBitsHorizons = ShmemInitHash("IndexHintBitsHorizons", 64, 64,
+										  &hctl, HASH_ELEM | HASH_BLOBS);
+	LWLockRelease(IndexHintBitsHorizonShmemLock);
+}
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index 774292fd94..e66f8fbb8b 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -53,3 +53,4 @@ XactTruncationLock					44
 # 45 was XactTruncationLock until removal of BackendRandomLock
 WrapLimitsVacuumLock				46
 NotifyQueueTailLock					47
+IndexHintBitsHorizonShmemLock		48
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index c87ffc6549..2da7eb69da 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -386,6 +386,7 @@ InitProcess(void)
 	MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
 	MyProc->xid = InvalidTransactionId;
 	MyProc->xmin = InvalidTransactionId;
+	MyProc->indexIgnoreKilledTuples = false;
 	MyProc->pid = MyProcPid;
 	/* backendId, databaseId and roleId will be filled in later */
 	MyProc->backendId = InvalidBackendId;
@@ -569,6 +570,7 @@ InitAuxiliaryProcess(void)
 	MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
 	MyProc->xid = InvalidTransactionId;
 	MyProc->xmin = InvalidTransactionId;
+	MyProc->indexIgnoreKilledTuples = false;
 	MyProc->backendId = InvalidBackendId;
 	MyProc->databaseId = InvalidOid;
 	MyProc->roleId = InvalidOid;
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index ae16c3ed7d..bed98d6436 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -528,6 +528,10 @@ SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
 	 * the state for GlobalVis*.
 	 */
 	CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
+	/*
+	 * For simplicity, imported snapshots use index hint bits only on a primary.
+	 */
+	MyProc->indexIgnoreKilledTuples = !RecoveryInProgress();
 
 	/*
 	 * Now copy appropriate fields from the source snapshot.
@@ -932,6 +936,7 @@ SnapshotResetXmin(void)
 	if (pairingheap_is_empty(&RegisteredSnapshots))
 	{
 		MyProc->xmin = InvalidTransactionId;
+		MyProc->indexIgnoreKilledTuples = false;
 		return;
 	}
 
@@ -939,6 +944,7 @@ SnapshotResetXmin(void)
 										pairingheap_first(&RegisteredSnapshots));
 
 	if (TransactionIdPrecedes(MyProc->xmin, minSnapshot->xmin))
+		/* No need to change indexIgnoreKilledTuples here: the xmin restriction is only relaxed. */
 		MyProc->xmin = minSnapshot->xmin;
 }
 
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 553d364e2d..97c97c13c2 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -165,8 +165,9 @@ typedef struct GISTScanOpaqueData
 	IndexOrderByDistance *distances;	/* output area for gistindex_keytest */
 
 	/* info about killed items if any (killedItems is NULL if never used) */
-	OffsetNumber *killedItems;	/* offset numbers of killed items */
-	int			numKilled;		/* number of currently stored items */
+	OffsetNumber *killedItems;			  /* offset numbers of killed items */
+	TransactionId killedLatestRemovedXid; /* latest removed xid of all killed items */
+	int			  numKilled;			  /* number of currently stored items */
 	BlockNumber curBlkno;		/* current number of block */
 	GistNSN		curPageLSN;		/* pos in the WAL stream when page was read */
 
diff --git a/src/include/access/hash.h b/src/include/access/hash.h
index 1cce865be2..a3fc82192e 100644
--- a/src/include/access/hash.h
+++ b/src/include/access/hash.h
@@ -177,8 +177,9 @@ typedef struct HashScanOpaqueData
 	 */
 	bool		hashso_buc_split;
 	/* info about killed items if any (killedItems is NULL if never used) */
-	int		   *killedItems;	/* currPos.items indexes of killed items */
-	int			numKilled;		/* number of currently stored items */
+	int			 *killedItems;			  /* currPos.items indexes of killed items */
+	TransactionId killedLatestRemovedXid; /* latest removed xid of all killed items */
+	int			  numKilled;			  /* number of currently stored items */
 
 	/*
 	 * Identify all the matching items on a page and save them in
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index d96a47b1ce..a9ed0e0918 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -126,7 +126,8 @@ extern bool heap_fetch(Relation relation, Snapshot snapshot,
 					   HeapTuple tuple, Buffer *userbuf);
 extern bool heap_hot_search_buffer(ItemPointer tid, Relation relation,
 								   Buffer buffer, Snapshot snapshot, HeapTuple heapTuple,
-								   bool *all_dead, bool first_call);
+								   bool *all_dead, TransactionId *latest_removed_xid,
+								   bool first_call);
 
 extern void heap_get_latest_tid(TableScanDesc scan, ItemPointer tid);
 
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 178d49710a..b49c3b4dc7 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -387,6 +387,8 @@ typedef struct xl_heap_rewrite_mapping
 
 extern void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
 												   TransactionId *latestRemovedXid);
+extern void IndexHintBitAdvanceLatestRemovedXid(TransactionId killedTupleRemovedXid,
+												TransactionId *latestRemovedXid);
 
 extern void heap_redo(XLogReaderState *record);
 extern void heap_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index cad4f2bdeb..10257821fa 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -920,8 +920,9 @@ typedef struct BTScanOpaqueData
 	MemoryContext arrayContext; /* scan-lifespan context for array data */
 
 	/* info about killed items if any (killedItems is NULL if never used) */
-	int		   *killedItems;	/* currPos.items indexes of killed items */
-	int			numKilled;		/* number of currently stored items */
+	int				*killedItems;			/* currPos.items indexes of killed items */
+	TransactionId	 killedLatestRemovedXid;/* latest removed xid of all killed items */
+	int				 numKilled;				/* number of currently stored items */
 
 	/*
 	 * If we are doing an index-only scan, these are the tuple storage
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 005f3fdd2b..7038e7fdae 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -122,10 +122,9 @@ typedef struct IndexScanDescData
 	bool		xs_temp_snap;	/* unregister snapshot at scan end? */
 
 	/* signaling to index AM about killing index tuples */
-	bool		kill_prior_tuple;	/* last-returned tuple is dead */
-	bool		ignore_killed_tuples;	/* do not return killed entries */
-	bool		xactStartedInRecovery;	/* prevents killing/seeing killed
-										 * tuples */
+	bool			kill_prior_tuple;		 /* last-returned tuple is dead */
+	TransactionId	prior_tuple_removed_xid; /* removed xid for dead tuple */
+	bool			ignore_killed_tuples;	 /* do not return killed entries */
 
 	/* index access method's private state */
 	void	   *opaque;			/* access-method-specific info */
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 33bffb6815..2a90040985 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -398,12 +398,17 @@ typedef struct TableAmRoutine
 	 * index_fetch_tuple iff it is guaranteed that no backend needs to see
 	 * that tuple. Index AMs can use that to avoid returning that tid in
 	 * future searches.
+	 *
+	 * *latest_removed_xid, if all_dead is not NULL, will be set to
+	 * the latest removed xid of a HOT chain by table_index_fetch_tuple()
+	 * iff it is guaranteed that no backend needs to see that tuple.
 	 */
 	bool		(*index_fetch_tuple) (struct IndexFetchTableData *scan,
 									  ItemPointer tid,
 									  Snapshot snapshot,
 									  TupleTableSlot *slot,
-									  bool *call_again, bool *all_dead);
+									  bool *call_again, bool *all_dead,
+									  TransactionId *latest_removed_xid);
 
 
 	/* ------------------------------------------------------------------------
@@ -1112,6 +1117,10 @@ table_index_fetch_end(struct IndexFetchTableData *scan)
  * that tuple. Index AMs can use that to avoid returning that tid in future
  * searches.
  *
+ * *latest_removed_xid, if all_dead is not NULL, will be set to the latest removed
+ * xid of a HOT chain by table_index_fetch_tuple() iff it is guaranteed that no
+ * backend needs to see that tuple.
+ *
  * The difference between this function and table_tuple_fetch_row_version()
  * is that this function returns the currently visible version of a row if
  * the AM supports storing multiple row versions reachable via a single index
@@ -1124,7 +1133,8 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan,
 						ItemPointer tid,
 						Snapshot snapshot,
 						TupleTableSlot *slot,
-						bool *call_again, bool *all_dead)
+						bool *call_again, bool *all_dead,
+						TransactionId *latest_removed_xid)
 {
 	/*
 	 * We don't expect direct calls to table_index_fetch_tuple with valid
@@ -1136,7 +1146,7 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan,
 
 	return scan->rel->rd_tableam->index_fetch_tuple(scan, tid, snapshot,
 													slot, call_again,
-													all_dead);
+													all_dead, latest_removed_xid);
 }
 
 /*
@@ -1148,7 +1158,8 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan,
 extern bool table_index_fetch_tuple_check(Relation rel,
 										  ItemPointer tid,
 										  Snapshot snapshot,
-										  bool *all_dead);
+										  bool *all_dead,
+										  TransactionId *latest_removed_xid);
 
 
 /* ------------------------------------------------------------------------
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 724068cf87..ac649703cd 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -992,6 +992,7 @@ typedef enum
 	WAIT_EVENT_PROC_SIGNAL_BARRIER,
 	WAIT_EVENT_PROMOTE,
 	WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
+	WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT_INDEX_HINT_BITS,
 	WAIT_EVENT_RECOVERY_CONFLICT_TABLESPACE,
 	WAIT_EVENT_RECOVERY_PAUSE,
 	WAIT_EVENT_REPLICATION_ORIGIN_DROP,
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 4313f516d3..0371223c1e 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -156,6 +156,12 @@ typedef struct
 	 * store semantics, so use sig_atomic_t.
 	 */
 	sig_atomic_t force_reply;	/* used as a bool */
+
+	/* Set if the sender has received our xmin. */
+	sig_atomic_t sender_has_standby_xmin;
+
+	/* Is the sender's feedback propagated through the cascading replication chain up to the primary? */
+	sig_atomic_t sender_propagates_feedback_to_primary;
 } WalRcvData;
 
 extern WalRcvData *WalRcv;
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index fb00fda6a7..cb4c9e9233 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -222,6 +222,8 @@ extern void BufferGetTag(Buffer buffer, RelFileNode *rnode,
 						 ForkNumber *forknum, BlockNumber *blknum);
 
 extern void MarkBufferDirtyHint(Buffer buffer, bool buffer_std);
+extern void MarkBufferDirtyIndexHint(Buffer buffer, bool buffer_std,
+									 Relation rel, TransactionId latestRemovedXid);
 
 extern void UnlockBuffers(void);
 extern void LockBuffer(Buffer buffer, int mode);
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 683ab64f76..0a72160b61 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -165,6 +165,11 @@ struct PGPROC
 	 * though not required. Accessed without lock, if needed.
 	 */
 	bool		recoveryConflictPending;
+	/*
+	 * Flag allowing LP_DEAD bits in indexes to be read/set.
+	 * Also used to raise recovery conflicts caused by index hint bits.
+	 */
+	bool		indexIgnoreKilledTuples;
 
 	/* Info about LWLock the process is currently waiting for, if any. */
 	bool		lwWaiting;		/* true if waiting for an LW lock */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index b01fa52139..3b922f3fcb 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -70,7 +70,8 @@ extern bool IsBackendPid(int pid);
 extern VirtualTransactionId *GetCurrentVirtualXIDs(TransactionId limitXmin,
 												   bool excludeXmin0, bool allDbs, int excludeVacuum,
 												   int *nvxids);
-extern VirtualTransactionId *GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid);
+extern VirtualTransactionId *GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid,
+													   bool onlyIndexIgnoreKilledTuples);
 extern pid_t CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode);
 extern pid_t SignalVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode,
 									  bool conflictPending);
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 2b1f340b82..9758da768e 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -18,6 +18,7 @@
 #include "storage/procsignal.h"
 #include "storage/relfilenode.h"
 #include "storage/standbydefs.h"
+#include "utils/relcache.h"
 
 /* User-settable GUC parameters */
 extern int	vacuum_defer_cleanup_age;
@@ -30,6 +31,9 @@ extern void ShutdownRecoveryTransactionEnvironment(void);
 
 extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
 												RelFileNode node);
+extern void ResolveIndexHintBitsRecoveryConflictWithSnapshot(
+												TransactionId latestRemovedXid,
+												RelFileNode node);
 extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
 extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
 
@@ -92,4 +96,8 @@ extern XLogRecPtr LogStandbySnapshot(void);
 extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
 									bool relcacheInitFileInval);
 
+extern void StandByShmemInit(void);
+extern void LogIndexHintBitsHorizonIfNeeded(Relation rel,
+											TransactionId latestRemovedXid);
+
 #endif							/* STANDBY_H */
diff --git a/src/include/storage/standbydefs.h b/src/include/storage/standbydefs.h
index d99e6f40c6..127de2e9eb 100644
--- a/src/include/storage/standbydefs.h
+++ b/src/include/storage/standbydefs.h
@@ -31,9 +31,10 @@ extern void standby_desc_invalidations(StringInfo buf,
 /*
  * XLOG message types
  */
-#define XLOG_STANDBY_LOCK			0x00
-#define XLOG_RUNNING_XACTS			0x10
-#define XLOG_INVALIDATIONS			0x20
+#define XLOG_STANDBY_LOCK				0x00
+#define XLOG_RUNNING_XACTS				0x10
+#define XLOG_INVALIDATIONS				0x20
+#define XLOG_INDEX_HINT_BITS_HORIZON	0x30
 
 typedef struct xl_standby_locks
 {
@@ -71,4 +72,10 @@ typedef struct xl_invalidations
 
 #define MinSizeOfInvalidations offsetof(xl_invalidations, msgs)
 
+typedef struct xl_index_hint_bits_horizon
+{
+	RelFileNode		rnode;
+	TransactionId	latestRemovedXid;
+} xl_index_hint_bits_horizon;
+
 #endif							/* STANDBYDEFS_H */
