From ccc7defbeec7653240c76e6e4dcdb9349b49284d Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.kumar@enterprisedb.com>
Date: Mon, 18 Nov 2019 16:26:33 +0530
Subject: [PATCH v6 02/12] Issue individual invalidations with
 wal_level=logical

When wal_level=logical, write individual invalidations into WAL so
that decoding can use this information.

We still add the invalidations to the cache, and write them to WAL
at commit time in RecordTransactionCommit(). This uses the existing
XLOG_INVALIDATIONS xlog record type, from the RM_STANDBY_ID resource
manager (see LogStandbyInvalidations for details).

So existing code relying on those invalidations (e.g. redo) does not
need to be changed.

The individual invalidations are written using a new xlog record
type, XLOG_XACT_INVALIDATIONS, from the RM_XACT_ID resource manager.
See LogLogicalInvalidations for details.

These new xlog records are ignored by existing redo procedures,
which still rely on the invalidations written to commit records.

The invalidations are decoded and added as a new ReorderBufferChange
type (REORDER_BUFFER_CHANGE_INVALIDATION), and then executed during
replay, unlike the existing invalidations (which are either decoded
as part of commit record, or executed immediately during decoding
and not added to reorderbuffer at all).

LogStandbyInvalidations accumulates all the invalidations in
memory and writes them only once, at commit time, which may
reduce the performance impact by amortizing the overhead and
deduplicating the invalidations.

The new invalidations are written to WAL immediately, without any
such caching. Perhaps it would be possible to add similar caching,
e.g. at the command level, or something like that?
---
 src/backend/access/rmgrdesc/xactdesc.c        | 50 +++++++++++++
 src/backend/access/transam/xact.c             |  7 ++
 src/backend/replication/logical/decode.c      | 23 ++++++
 .../replication/logical/reorderbuffer.c       | 55 ++++++++++++--
 src/backend/utils/cache/inval.c               | 75 +++++++++++++++++++
 src/include/access/xact.h                     | 18 ++++-
 src/include/replication/reorderbuffer.h       | 14 ++++
 7 files changed, 234 insertions(+), 8 deletions(-)

diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index e388cc714a..6e46d19168 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -20,6 +20,11 @@
 #include "storage/standbydefs.h"
 #include "utils/timestamp.h"
 
+static void xact_desc_invalidations(StringInfo buf,
+						int nmsgs, SharedInvalidationMessage *msgs,
+						Oid dbId, Oid tsId,
+						bool relcacheInitFileInval);
+
 /*
  * Parse the WAL format of an xact commit and abort records into an easier to
  * understand format.
@@ -397,6 +402,14 @@ xact_desc(StringInfo buf, XLogReaderState *record)
 		appendStringInfo(buf, "xtop %u: ", xlrec->xtop);
 		xact_desc_assignment(buf, xlrec);
 	}
+	else if (info == XLOG_XACT_INVALIDATIONS)
+	{
+		xl_xact_invalidations *xlrec = (xl_xact_invalidations *) rec;
+
+		xact_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs,
+								xlrec->dbId, xlrec->tsId,
+								xlrec->relcacheInitFileInval);
+	}
 }
 
 const char *
@@ -424,7 +437,44 @@ xact_identify(uint8 info)
 		case XLOG_XACT_ASSIGNMENT:
 			id = "ASSIGNMENT";
 			break;
+		case XLOG_XACT_INVALIDATIONS:
+			id = "INVALIDATION";
+			break;
 	}
 
 	return id;
 }
+
+static void
+xact_desc_invalidations(StringInfo buf,
+						int nmsgs, SharedInvalidationMessage *msgs,
+						Oid dbId, Oid tsId,
+						bool relcacheInitFileInval)
+{
+	int			i;
+
+	if (relcacheInitFileInval)
+		appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
+						 dbId, tsId);
+
+	appendStringInfoString(buf, "; inval msgs:");
+	for (i = 0; i < nmsgs; i++)
+	{
+		SharedInvalidationMessage *msg = &msgs[i];
+
+		if (msg->id >= 0)
+			appendStringInfo(buf, " catcache %d", msg->id);
+		else if (msg->id == SHAREDINVALCATALOG_ID)
+			appendStringInfo(buf, " catalog %u", msg->cat.catId);
+		else if (msg->id == SHAREDINVALRELCACHE_ID)
+			appendStringInfo(buf, " relcache %u", msg->rc.relId);
+		/* not expected, but print something anyway */
+		else if (msg->id == SHAREDINVALSMGR_ID)
+			appendStringInfoString(buf, " smgr");
+		/* not expected, but print something anyway */
+		else if (msg->id == SHAREDINVALRELMAP_ID)
+			appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
+		else
+			appendStringInfo(buf, " unrecognized id %d", msg->id);
+	}
+}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 51557e2951..dd3d36ffb1 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -6001,6 +6001,13 @@ xact_redo(XLogReaderState *record)
 			ProcArrayApplyXidAssignment(xlrec->xtop,
 										xlrec->nsubxacts, xlrec->xsub);
 	}
+	else if (info == XLOG_XACT_INVALIDATIONS)
+	{
+		/*
+		 * XXX Ignored for now; redo still relies on the invalidations
+		 * written into the commit record.
+		 */
+	}
 	else
 		elog(PANIC, "xact_redo: unknown op code %u", info);
 }
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index a99fcaf0a1..13a11ac782 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -281,6 +281,29 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			}
 		case XLOG_XACT_ASSIGNMENT:
 			break;
+		case XLOG_XACT_INVALIDATIONS:
+			{
+				TransactionId xid;
+				xl_xact_invalidations *invals;
+
+				xid = XLogRecGetXid(r);
+				invals = (xl_xact_invalidations *) XLogRecGetData(r);
+
+				/* XXX for now we're issuing invalidations one by one */
+				Assert(invals->nmsgs == 1);
+
+				if (!TransactionIdIsValid(xid))
+					break;
+
+				ReorderBufferAddInvalidation(reorder, xid, buf->origptr,
+											 invals->dbId, invals->tsId,
+											 invals->relcacheInitFileInval,
+											 invals->msgs[0]);
+
+
+				ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+			}
+			break;
 		case XLOG_XACT_PREPARE:
 
 			/*
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index bbd908af05..531897cf05 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -473,6 +473,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+		case REORDER_BUFFER_CHANGE_INVALIDATION:
 			break;
 	}
 
@@ -1822,17 +1823,22 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 
 						TeardownHistoricSnapshot(false);
 						SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
-
-						/*
-						 * Every time the CommandId is incremented, we could
-						 * see new catalog contents, so execute all
-						 * invalidations.
-						 */
-						ReorderBufferExecuteInvalidations(rb, txn);
 					}
 
 					break;
 
+				case REORDER_BUFFER_CHANGE_INVALIDATION:
+
+					/*
+					 * Execute the invalidation message locally.
+					 *
+					 * XXX Do we need to care about relcacheInitFileInval and
+					 * the other fields added to ReorderBufferChange, or just
+					 * about the message itself?
+					 */
+					LocalExecuteInvalidationMessage(&change->data.inval.msg);
+					break;
+
 				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
@@ -2225,6 +2231,38 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
 	txn->ntuplecids++;
 }
 
+/*
+ * Queue a single invalidation message as a change of the given transaction.
+ */
+void
+ReorderBufferAddInvalidation(ReorderBuffer *rb, TransactionId xid,
+							 XLogRecPtr lsn,
+							 Oid dbId, Oid tsId, bool relcacheInitFileInval,
+							 SharedInvalidationMessage msg)
+{
+	MemoryContext oldcontext;
+	ReorderBufferChange *change;
+
+	/* XXX Should we even write invalidations without valid XID? */
+	if (xid == InvalidTransactionId)
+		return;
+
+	Assert(xid != InvalidTransactionId);
+
+	oldcontext = MemoryContextSwitchTo(rb->context);
+
+	change = ReorderBufferGetChange(rb);
+	change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
+	change->data.inval.dbId = dbId;
+	change->data.inval.tsId = tsId;
+	change->data.inval.relcacheInitFileInval = relcacheInitFileInval;
+	change->data.inval.msg = msg;
+
+	ReorderBufferQueueChange(rb, xid, lsn, change);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
 /*
  * Setup the invalidation of the toplevel transaction.
  *
@@ -2674,6 +2712,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+		case REORDER_BUFFER_CHANGE_INVALIDATION:
 			/* ReorderBufferChange contains everything important */
 			break;
 	}
@@ -2770,6 +2809,7 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+		case REORDER_BUFFER_CHANGE_INVALIDATION:
 			/* ReorderBufferChange contains everything important */
 			break;
 	}
@@ -3055,6 +3095,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+		case REORDER_BUFFER_CHANGE_INVALIDATION:
 			break;
 	}
 
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 591dd33be6..e0d04b9850 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -85,6 +85,12 @@
  *	worth trying to avoid sending such inval traffic in the future, if those
  *	problems can be overcome cheaply.
  *
+ *	When wal_level=logical, write individual invalidations into WAL to support
+ *	decoding of in-progress transactions.  Until now it was enough to log
+ *	invalidations only at commit, because transactions were decoded only at
+ *	commit time.  We only need to log catalog cache and relcache
+ *	invalidations.  There cannot be any active MVCC scan in logical decoding,
+ *	so we don't need to log snapshot invalidations.
  *
  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -104,6 +110,7 @@
 #include "catalog/pg_constraint.h"
 #include "miscadmin.h"
 #include "storage/sinval.h"
+#include "storage/standby.h"
 #include "storage/smgr.h"
 #include "utils/catcache.h"
 #include "utils/inval.h"
@@ -210,6 +217,9 @@ static struct RELCACHECALLBACK
 
 static int	relcache_callback_count = 0;
 
+static void LogLogicalInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
+						bool relcacheInitFileInval);
+
 /* ----------------------------------------------------------------
  *				Invalidation list support functions
  *
@@ -489,6 +499,18 @@ RegisterCatcacheInvalidation(int cacheId,
 {
 	AddCatcacheInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs,
 								   cacheId, hashValue, dbId);
+
+	/* Issue an invalidation WAL record (when wal_level=logical) */
+	if (XLogLogicalInfoActive())
+	{
+		SharedInvalidationMessage msg;
+
+		msg.cc.id = (int8) cacheId;
+		msg.cc.dbId = dbId;
+		msg.cc.hashValue = hashValue;
+
+		LogLogicalInvalidations(1, &msg, false);
+	}
 }
 
 /*
@@ -501,6 +523,18 @@ RegisterCatalogInvalidation(Oid dbId, Oid catId)
 {
 	AddCatalogInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs,
 								  dbId, catId);
+
+	/* Issue an invalidation WAL record (when wal_level=logical) */
+	if (XLogLogicalInfoActive())
+	{
+		SharedInvalidationMessage msg;
+
+		msg.cat.id = SHAREDINVALCATALOG_ID;
+		msg.cat.dbId = dbId;
+		msg.cat.catId = catId;
+
+		LogLogicalInvalidations(1, &msg, false);
+	}
 }
 
 /*
@@ -511,6 +545,8 @@ RegisterCatalogInvalidation(Oid dbId, Oid catId)
 static void
 RegisterRelcacheInvalidation(Oid dbId, Oid relId)
 {
+	bool		RelcacheInitFileInval = false;
+
 	AddRelcacheInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs,
 								   dbId, relId);
 
@@ -529,7 +565,22 @@ RegisterRelcacheInvalidation(Oid dbId, Oid relId)
 	 * as well.  Also zap when we are invalidating whole relcache.
 	 */
 	if (relId == InvalidOid || RelationIdIsInInitFile(relId))
+	{
 		transInvalInfo->RelcacheInitFileInval = true;
+		RelcacheInitFileInval = true;
+	}
+
+	/* Issue an invalidation WAL record (when wal_level=logical) */
+	if (XLogLogicalInfoActive())
+	{
+		SharedInvalidationMessage msg;
+
+		msg.rc.id = SHAREDINVALRELCACHE_ID;
+		msg.rc.dbId = dbId;
+		msg.rc.relId = relId;
+
+		LogLogicalInvalidations(1, &msg, RelcacheInitFileInval);
+	}
 }
 
 /*
@@ -1501,3 +1552,27 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
 		i = ccitem->link - 1;
 	}
 }
+
+/*
+ * Emit WAL for invalidations.
+ */
+static void
+LogLogicalInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
+						bool relcacheInitFileInval)
+{
+	xl_xact_invalidations xlrec;
+
+	/* prepare record */
+	memset(&xlrec, 0, sizeof(xlrec));
+	xlrec.dbId = MyDatabaseId;
+	xlrec.tsId = MyDatabaseTableSpace;
+	xlrec.relcacheInitFileInval = relcacheInitFileInval;
+	xlrec.nmsgs = nmsgs;
+
+	/* perform insertion */
+	XLogBeginInsert();
+	XLogRegisterData((char *) (&xlrec), MinSizeOfXactInvalidations);
+	XLogRegisterData((char *) msgs,
+					 nmsgs * sizeof(SharedInvalidationMessage));
+	XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS);
+}
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 8645b3816c..6f2a5831ee 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -146,7 +146,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XLOG_XACT_COMMIT_PREPARED	0x30
 #define XLOG_XACT_ABORT_PREPARED	0x40
 #define XLOG_XACT_ASSIGNMENT		0x50
-/* free opcode 0x60 */
+#define XLOG_XACT_INVALIDATIONS		0x60
 /* free opcode 0x70 */
 
 /* mask for filtering opcodes out of xl_info */
@@ -197,6 +197,22 @@ typedef struct xl_xact_assignment
 
 #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
 
+/*
+ * Invalidations logged with wal_level=logical.
+ *
+ * XXX Currently nmsgs=1 but that might change in the future.
+ */
+typedef struct xl_xact_invalidations
+{
+	Oid			dbId;			/* MyDatabaseId */
+	Oid			tsId;			/* MyDatabaseTableSpace */
+	bool		relcacheInitFileInval;	/* invalidate relcache init file */
+	int			nmsgs;			/* number of shared inval msgs */
+	SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
+}			xl_xact_invalidations;
+
+#define MinSizeOfXactInvalidations offsetof(xl_xact_invalidations, msgs)
+
 /*
  * Commit and abort records can contain a lot of information. But a large
  * portion of the records won't need all possible pieces of information. So we
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 4f6c65d6f4..fa41115db9 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -57,6 +57,7 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_UPDATE,
 	REORDER_BUFFER_CHANGE_DELETE,
 	REORDER_BUFFER_CHANGE_MESSAGE,
+	REORDER_BUFFER_CHANGE_INVALIDATION,
 	REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
 	REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
 	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
@@ -149,6 +150,16 @@ typedef struct ReorderBufferChange
 			CommandId	cmax;
 			CommandId	combocid;
 		}			tuplecid;
+
+		/* Invalidation. */
+		struct
+		{
+			Oid			dbId;	/* MyDatabaseId */
+			Oid			tsId;	/* MyDatabaseTableSpace */
+			bool		relcacheInitFileInval;	/* invalidate relcache init
+												 * file */
+			SharedInvalidationMessage msg;	/* invalidation message */
+		}			inval;
 	}			data;
 
 	/*
@@ -458,6 +469,9 @@ void		ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr ls
 void		ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
 										 RelFileNode node, ItemPointerData pt,
 										 CommandId cmin, CommandId cmax, CommandId combocid);
+void ReorderBufferAddInvalidation(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
+								  Oid dbId, Oid tsId, bool relcacheInitFileInval,
+								  SharedInvalidationMessage msg);
 void		ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
 										  Size nmsgs, SharedInvalidationMessage *msgs);
 void		ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
-- 
2.20.1

