From a89d06fbca4b350192ad7895796783bf50a17757 Mon Sep 17 00:00:00 2001
From: Amit Khandekar <amit.khandekar@enterprisedb.com>
Date: Thu, 16 Jan 2020 10:05:16 +0530
Subject: [PATCH v6 2/5] Add info in WAL records in preparation for logical
 slot conflict handling.

When a WAL replay on standby indicates that a catalog table tuple is
to be deleted by an xid that is greater than a logical slot's
catalog_xmin, then that means the slot's catalog_xmin conflicts with
the xid, and we need to handle the conflict.  While subsequent commits
will do the actual conflict handling, this commit adds a new field
onCatalogTable in such WAL records, that is true for catalog tables,
so as to arrange for conflict handling.

Andres Freund.
---
 src/backend/access/gist/gist.c          |  2 +-
 src/backend/access/gist/gistbuild.c     |  2 +-
 src/backend/access/gist/gistutil.c      |  4 ++--
 src/backend/access/gist/gistxlog.c      |  4 +++-
 src/backend/access/hash/hashinsert.c    |  2 ++
 src/backend/access/heap/heapam.c        | 10 +++++++---
 src/backend/access/heap/vacuumlazy.c    |  2 +-
 src/backend/access/heap/visibilitymap.c |  2 +-
 src/backend/access/nbtree/nbtpage.c     |  4 ++++
 src/backend/access/spgist/spgvacuum.c   |  8 ++++++++
 src/backend/utils/cache/lsyscache.c     | 16 ++++++++++++++++
 src/include/access/gist_private.h       |  6 +++---
 src/include/access/gistxlog.h           |  3 ++-
 src/include/access/hash_xlog.h          |  1 +
 src/include/access/heapam_xlog.h        |  8 ++++++--
 src/include/access/nbtxlog.h            |  2 ++
 src/include/access/spgxlog.h            |  1 +
 src/include/utils/lsyscache.h           |  1 +
 src/include/utils/rel.h                 |  9 +++++++++
 19 files changed, 71 insertions(+), 16 deletions(-)

diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 90c46e86a1..43ce813a50 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -342,7 +342,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
 		for (; ptr; ptr = ptr->next)
 		{
 			/* Allocate new page */
-			ptr->buffer = gistNewBuffer(rel);
+			ptr->buffer = gistNewBuffer(heapRel, rel);
 			GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
 			ptr->page = BufferGetPage(ptr->buffer);
 			ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
diff --git a/src/backend/access/gist/gistbuild.c b/src/backend/access/gist/gistbuild.c
index 671b5e9186..c4c5f41a2a 100644
--- a/src/backend/access/gist/gistbuild.c
+++ b/src/backend/access/gist/gistbuild.c
@@ -171,7 +171,7 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	buildstate.giststate->tempCxt = createTempGistContext();
 
 	/* initialize the root page */
-	buffer = gistNewBuffer(index);
+	buffer = gistNewBuffer(heap, index);
 	Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
 	page = BufferGetPage(buffer);
 
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
index dd975b164c..62b4e9e8db 100644
--- a/src/backend/access/gist/gistutil.c
+++ b/src/backend/access/gist/gistutil.c
@@ -806,7 +806,7 @@ gistcheckpage(Relation rel, Buffer buf)
  * Caller is responsible for initializing the page by calling GISTInitBuffer
  */
 Buffer
-gistNewBuffer(Relation r)
+gistNewBuffer(Relation heapRel, Relation r)
 {
 	Buffer		buffer;
 	bool		needLock;
@@ -850,7 +850,7 @@ gistNewBuffer(Relation r)
 				 * page's deleteXid.
 				 */
 				if (XLogStandbyInfoActive() && RelationNeedsWAL(r))
-					gistXLogPageReuse(r, blkno, GistPageGetDeleteXid(page));
+					gistXLogPageReuse(heapRel, r, blkno, GistPageGetDeleteXid(page));
 
 				return buffer;
 			}
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index d3f3a7b803..90bc4895b2 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -596,7 +596,8 @@ gistXLogPageDelete(Buffer buffer, FullTransactionId xid,
  * Write XLOG record about reuse of a deleted page.
  */
 void
-gistXLogPageReuse(Relation rel, BlockNumber blkno, FullTransactionId latestRemovedXid)
+gistXLogPageReuse(Relation heapRel, Relation rel,
+				  BlockNumber blkno, FullTransactionId latestRemovedXid)
 {
 	gistxlogPageReuse xlrec_reuse;
 
@@ -607,6 +608,7 @@ gistXLogPageReuse(Relation rel, BlockNumber blkno, FullTransactionId latestRemov
 	 */
 
 	/* XLOG stuff */
+	xlrec_reuse.onCatalogTable = RelationIsAccessibleInLogicalDecoding(heapRel);
 	xlrec_reuse.node = rel->rd_node;
 	xlrec_reuse.block = blkno;
 	xlrec_reuse.latestRemovedFullXid = latestRemovedXid;
diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c
index 2ebe671967..e6cef08330 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -17,6 +17,7 @@
 
 #include "access/hash.h"
 #include "access/hash_xlog.h"
+#include "catalog/catalog.h"
 #include "miscadmin.h"
 #include "storage/buf_internals.h"
 #include "storage/lwlock.h"
@@ -398,6 +399,7 @@ _hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf)
 			xl_hash_vacuum_one_page xlrec;
 			XLogRecPtr	recptr;
 
+			xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(hrel);
 			xlrec.latestRemovedXid = latestRemovedXid;
 			xlrec.ntuples = ndeletable;
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 29694b8aa4..bf6bfe69dc 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -7145,12 +7145,13 @@ heap_compute_xid_horizon_for_tuples(Relation rel,
  * see comments for vacuum_log_cleanup_info().
  */
 XLogRecPtr
-log_heap_cleanup_info(RelFileNode rnode, TransactionId latestRemovedXid)
+log_heap_cleanup_info(Relation rel, TransactionId latestRemovedXid)
 {
 	xl_heap_cleanup_info xlrec;
 	XLogRecPtr	recptr;
 
-	xlrec.node = rnode;
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
+	xlrec.node = rel->rd_node;
 	xlrec.latestRemovedXid = latestRemovedXid;
 
 	XLogBeginInsert();
@@ -7186,6 +7187,7 @@ log_heap_clean(Relation reln, Buffer buffer,
 	/* Caller should not call me on a non-WAL-logged relation */
 	Assert(RelationNeedsWAL(reln));
 
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln);
 	xlrec.latestRemovedXid = latestRemovedXid;
 	xlrec.nredirected = nredirected;
 	xlrec.ndead = ndead;
@@ -7236,6 +7238,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
 	/* nor when there are no tuples to freeze */
 	Assert(ntuples > 0);
 
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(reln);
 	xlrec.cutoff_xid = cutoff_xid;
 	xlrec.ntuples = ntuples;
 
@@ -7266,7 +7269,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
  * heap_buffer, if necessary.
  */
 XLogRecPtr
-log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
+log_heap_visible(Relation rel, Buffer heap_buffer, Buffer vm_buffer,
 				 TransactionId cutoff_xid, uint8 vmflags)
 {
 	xl_heap_visible xlrec;
@@ -7276,6 +7279,7 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
 	Assert(BufferIsValid(heap_buffer));
 	Assert(BufferIsValid(vm_buffer));
 
+	xlrec.onCatalogTable = RelationIsAccessibleInLogicalDecoding(rel);
 	xlrec.cutoff_xid = cutoff_xid;
 	xlrec.flags = vmflags;
 	XLogBeginInsert();
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 03c43efc32..b1014ef7b9 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -659,7 +659,7 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
 	 * No need to write the record at all unless it contains a valid value
 	 */
 	if (TransactionIdIsValid(vacrelstats->latestRemovedXid))
-		(void) log_heap_cleanup_info(rel->rd_node, vacrelstats->latestRemovedXid);
+		(void) log_heap_cleanup_info(rel, vacrelstats->latestRemovedXid);
 }
 
 /*
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index 0a51678c40..89f8955f36 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -282,7 +282,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
 			if (XLogRecPtrIsInvalid(recptr))
 			{
 				Assert(!InRecovery);
-				recptr = log_heap_visible(rel->rd_node, heapBuf, vmBuf,
+				recptr = log_heap_visible(rel, heapBuf, vmBuf,
 										  cutoff_xid, flags);
 
 				/*
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 39b8f17f4b..22ea714382 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -32,6 +32,7 @@
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
+#include "utils/lsyscache.h"
 #include "utils/snapmgr.h"
 
 static BTMetaPageData *_bt_getmeta(Relation rel, Buffer metabuf);
@@ -760,6 +761,7 @@ _bt_log_reuse_page(Relation rel, BlockNumber blkno, TransactionId latestRemovedX
 	 */
 
 	/* XLOG stuff */
+	xlrec_reuse.onCatalogTable = get_rel_logical_catalog(rel->rd_index->indrelid);
 	xlrec_reuse.node = rel->rd_node;
 	xlrec_reuse.block = blkno;
 	xlrec_reuse.latestRemovedXid = latestRemovedXid;
@@ -1210,6 +1212,8 @@ _bt_delitems_delete(Relation rel, Buffer buf,
 		XLogRecPtr	recptr;
 		xl_btree_delete xlrec_delete;
 
+		xlrec_delete.onCatalogTable =
+			RelationIsAccessibleInLogicalDecoding(heapRel);
 		xlrec_delete.latestRemovedXid = latestRemovedXid;
 		xlrec_delete.ndeleted = ndeletable;
 
diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c
index bd98707f3c..f97597dba1 100644
--- a/src/backend/access/spgist/spgvacuum.c
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -27,6 +27,7 @@
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
 #include "utils/snapmgr.h"
+#include "utils/lsyscache.h"
 
 
 /* Entry in pending-list of TIDs we need to revisit */
@@ -502,6 +503,13 @@ vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
 	OffsetNumber itemnos[MaxIndexTuplesPerPage];
 	spgxlogVacuumRedirect xlrec;
 
+	/*
+	 * There is no chance of endless recursion even when we are doing catalog
+	 * acceses here; because, spgist is never used for catalogs. Check
+	 * comments in RelationIsAccessibleInLogicalDecoding().
+	 */
+	xlrec.onCatalogTable = get_rel_logical_catalog(index->rd_index->indrelid);
+
 	xlrec.nToPlaceholder = 0;
 	xlrec.newestRedirectXid = InvalidTransactionId;
 
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index 27bbb58f56..e663523af1 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -18,7 +18,9 @@
 #include "access/hash.h"
 #include "access/htup_details.h"
 #include "access/nbtree.h"
+#include "access/table.h"
 #include "bootstrap/bootstrap.h"
+#include "catalog/catalog.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_am.h"
 #include "catalog/pg_amop.h"
@@ -1919,6 +1921,20 @@ get_rel_persistence(Oid relid)
 	return result;
 }
 
+bool
+get_rel_logical_catalog(Oid relid)
+{
+	bool	res;
+	Relation rel;
+
+	/* assume previously locked */
+	rel = table_open(relid, NoLock);
+	res = RelationIsAccessibleInLogicalDecoding(rel);
+	table_close(rel, NoLock);
+
+	return res;
+}
+
 
 /*				---------- TRANSFORM CACHE ----------						 */
 
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 18f2b0d98e..eff9696b18 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -439,8 +439,8 @@ extern XLogRecPtr gistXLogPageDelete(Buffer buffer,
 									 FullTransactionId xid, Buffer parentBuffer,
 									 OffsetNumber downlinkOffset);
 
-extern void gistXLogPageReuse(Relation rel, BlockNumber blkno,
-							  FullTransactionId latestRemovedXid);
+extern void gistXLogPageReuse(Relation heapRel, Relation rel,
+				  BlockNumber blkno, FullTransactionId latestRemovedXid);
 
 extern XLogRecPtr gistXLogUpdate(Buffer buffer,
 								 OffsetNumber *todelete, int ntodelete,
@@ -478,7 +478,7 @@ extern bool gistproperty(Oid index_oid, int attno,
 extern bool gistfitpage(IndexTuple *itvec, int len);
 extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete, Size freespace);
 extern void gistcheckpage(Relation rel, Buffer buf);
-extern Buffer gistNewBuffer(Relation r);
+extern Buffer gistNewBuffer(Relation heapRel, Relation r);
 extern bool gistPageRecyclable(Page page);
 extern void gistfillbuffer(Page page, IndexTuple *itup, int len,
 						   OffsetNumber off);
diff --git a/src/include/access/gistxlog.h b/src/include/access/gistxlog.h
index 55fc843d3a..390b8b65f6 100644
--- a/src/include/access/gistxlog.h
+++ b/src/include/access/gistxlog.h
@@ -48,9 +48,9 @@ typedef struct gistxlogPageUpdate
  */
 typedef struct gistxlogDelete
 {
+	bool		onCatalogTable;
 	TransactionId latestRemovedXid;
 	uint16		ntodelete;		/* number of deleted offsets */
-
 	/*
 	 * In payload of blk 0 : todelete OffsetNumbers
 	 */
@@ -96,6 +96,7 @@ typedef struct gistxlogPageDelete
  */
 typedef struct gistxlogPageReuse
 {
+	bool		onCatalogTable;
 	RelFileNode node;
 	BlockNumber block;
 	FullTransactionId latestRemovedFullXid;
diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h
index d1aa6daa40..b44892cc48 100644
--- a/src/include/access/hash_xlog.h
+++ b/src/include/access/hash_xlog.h
@@ -250,6 +250,7 @@ typedef struct xl_hash_init_bitmap_page
  */
 typedef struct xl_hash_vacuum_one_page
 {
+	bool		onCatalogTable;
 	TransactionId latestRemovedXid;
 	int			ntuples;
 
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 95d18cdb12..5edeb99808 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -237,6 +237,7 @@ typedef struct xl_heap_update
  */
 typedef struct xl_heap_clean
 {
+	bool		onCatalogTable;
 	TransactionId latestRemovedXid;
 	uint16		nredirected;
 	uint16		ndead;
@@ -252,6 +253,7 @@ typedef struct xl_heap_clean
  */
 typedef struct xl_heap_cleanup_info
 {
+	bool		onCatalogTable;
 	RelFileNode node;
 	TransactionId latestRemovedXid;
 } xl_heap_cleanup_info;
@@ -332,6 +334,7 @@ typedef struct xl_heap_freeze_tuple
  */
 typedef struct xl_heap_freeze_page
 {
+	bool		onCatalogTable;
 	TransactionId cutoff_xid;
 	uint16		ntuples;
 } xl_heap_freeze_page;
@@ -346,6 +349,7 @@ typedef struct xl_heap_freeze_page
  */
 typedef struct xl_heap_visible
 {
+	bool		onCatalogTable;
 	TransactionId cutoff_xid;
 	uint8		flags;
 } xl_heap_visible;
@@ -395,7 +399,7 @@ extern void heap2_desc(StringInfo buf, XLogReaderState *record);
 extern const char *heap2_identify(uint8 info);
 extern void heap_xlog_logical_rewrite(XLogReaderState *r);
 
-extern XLogRecPtr log_heap_cleanup_info(RelFileNode rnode,
+extern XLogRecPtr log_heap_cleanup_info(Relation rel,
 										TransactionId latestRemovedXid);
 extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
 								 OffsetNumber *redirected, int nredirected,
@@ -414,7 +418,7 @@ extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple,
 									  bool *totally_frozen);
 extern void heap_execute_freeze_tuple(HeapTupleHeader tuple,
 									  xl_heap_freeze_tuple *xlrec_tp);
-extern XLogRecPtr log_heap_visible(RelFileNode rnode, Buffer heap_buffer,
+extern XLogRecPtr log_heap_visible(Relation rel, Buffer heap_buffer,
 								   Buffer vm_buffer, TransactionId cutoff_xid, uint8 flags);
 
 #endif							/* HEAPAM_XLOG_H */
diff --git a/src/include/access/nbtxlog.h b/src/include/access/nbtxlog.h
index 347976c532..fe69dabca0 100644
--- a/src/include/access/nbtxlog.h
+++ b/src/include/access/nbtxlog.h
@@ -186,6 +186,7 @@ typedef struct xl_btree_dedup
  */
 typedef struct xl_btree_delete
 {
+	bool		onCatalogTable;
 	TransactionId latestRemovedXid;
 	uint32		ndeleted;
 
@@ -203,6 +204,7 @@ typedef struct xl_btree_delete
  */
 typedef struct xl_btree_reuse_page
 {
+	bool		onCatalogTable;
 	RelFileNode node;
 	BlockNumber block;
 	TransactionId latestRemovedXid;
diff --git a/src/include/access/spgxlog.h b/src/include/access/spgxlog.h
index 63d3c63db2..83121c7266 100644
--- a/src/include/access/spgxlog.h
+++ b/src/include/access/spgxlog.h
@@ -237,6 +237,7 @@ typedef struct spgxlogVacuumRoot
 
 typedef struct spgxlogVacuumRedirect
 {
+	bool		onCatalogTable;
 	uint16		nToPlaceholder; /* number of redirects to make placeholders */
 	OffsetNumber firstPlaceholder;	/* first placeholder tuple to remove */
 	TransactionId newestRedirectXid;	/* newest XID of removed redirects */
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index 4e646c55e9..ab0d16c1f9 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -132,6 +132,7 @@ extern char get_rel_relkind(Oid relid);
 extern bool get_rel_relispartition(Oid relid);
 extern Oid	get_rel_tablespace(Oid relid);
 extern char get_rel_persistence(Oid relid);
+extern bool get_rel_logical_catalog(Oid relid);
 extern Oid	get_transform_fromsql(Oid typid, Oid langid, List *trftypes);
 extern Oid	get_transform_tosql(Oid typid, Oid langid, List *trftypes);
 extern bool get_typisdefined(Oid typid);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 44ed04dd3f..b6dc5dfc7d 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -16,6 +16,7 @@
 
 #include "access/tupdesc.h"
 #include "access/xlog.h"
+#include "catalog/catalog.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_index.h"
 #include "catalog/pg_publication.h"
@@ -316,6 +317,9 @@ typedef struct StdRdOptions
  * RelationIsUsedAsCatalogTable
  *		Returns whether the relation should be treated as a catalog table
  *		from the pov of logical decoding.  Note multiple eval of argument!
+ *		This definition should not invoke anything that performs catalog
+ *		access; otherwise it may cause infinite recursion. Check the comments
+ *		in RelationIsAccessibleInLogicalDecoding() for details.
  */
 #define RelationIsUsedAsCatalogTable(relation)	\
 	((relation)->rd_options && \
@@ -580,6 +584,11 @@ typedef struct ViewOptions
  * RelationIsAccessibleInLogicalDecoding
  *		True if we need to log enough information to have access via
  *		decoding snapshot.
+ *		This definition should not invoke anything that performs catalog
+ *		access. Otherwise, e.g. logging a WAL entry for catalog relation may
+ *		invoke this function, which will in turn do catalog access, which may
+ *		in turn cause another similar WAL entry to be logged, leading to
+ *		infinite recursion.
  */
 #define RelationIsAccessibleInLogicalDecoding(relation) \
 	(XLogLogicalInfoActive() && \
-- 
2.20.1

