From fd10eea9ddffb0161fb2fb62d6547723d92900c9 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 4 Mar 2016 13:13:44 -0800
Subject: [PATCH 1/3] logical decoding: Tell reorderbuffer about all xids.

Logical decoding's reorderbuffer keeps transactions in an LSN ordered
list for efficiency. To make that efficiently possible, upper-level xids
are forced to be logged before nested subtransaction xids.  That only works
if these records are all looked at. Unfortunately we didn't do so for e.g.
row level locks, which are otherwise uninteresting for logical decoding.

This could lead to errors like:
"ERROR: subxact logged without previous toplevel record".

It's not sufficient to just look at row locking records, the xid could
appear first due to a lot of other types of records (which will trigger
the transaction to be marked logged with MarkCurrentTransactionIdLoggedIfAny).
So invent infrastructure to tell reorderbuffer about xids seen, when
they'd otherwise not pass through reorderbuffer.c.

Reported-By: Jarred Ward
Bug: #13844
Discussion: 20160105033249.1087.66040@wrigleys.postgresql.org
Backpatch: 9.4, where logical decoding was added
---
 contrib/test_decoding/Makefile                  |  4 +--
 contrib/test_decoding/expected/xact.out         | 42 +++++++++++++++++++++++++
 contrib/test_decoding/sql/xact.sql              | 22 +++++++++++++
 src/backend/replication/logical/decode.c        | 28 ++++++++++++++++-
 src/backend/replication/logical/reorderbuffer.c | 21 ++++++++-----
 src/backend/replication/logical/snapbuild.c     |  6 +---
 src/include/replication/reorderbuffer.h         |  2 +-
 7 files changed, 108 insertions(+), 17 deletions(-)
 create mode 100644 contrib/test_decoding/expected/xact.out
 create mode 100644 contrib/test_decoding/sql/xact.sql

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 78816bf..200c43e 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -37,8 +37,8 @@ submake-isolation:
 submake-test_decoding:
 	$(MAKE) -C $(top_builddir)/contrib/test_decoding
 
-REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \
-	binary prepared replorigin time
+REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
+	decoding_into_rel binary prepared replorigin time
 
 regresscheck: | submake-regress submake-test_decoding temp-install
 	$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/xact.out b/contrib/test_decoding/expected/xact.out
new file mode 100644
index 0000000..507b701
--- /dev/null
+++ b/contrib/test_decoding/expected/xact.out
@@ -0,0 +1,42 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- bug #13844, xids in non-decoded records need to be inspected
+CREATE TABLE xact_test(data text);
+INSERT INTO xact_test VALUES ('before-test');
+BEGIN;
+-- perform operation in xact that creates and logs xid, but isn't decoded
+SELECT * FROM xact_test FOR UPDATE;
+    data     
+-------------
+ before-test
+(1 row)
+
+SAVEPOINT foo;
+-- and now actually insert in subxact, xid is expected to be known
+INSERT INTO xact_test VALUES ('after-assignment');
+COMMIT;
+-- and now show those changes
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                             data                              
+---------------------------------------------------------------
+ BEGIN
+ table public.xact_test: INSERT: data[text]:'before-test'
+ COMMIT
+ BEGIN
+ table public.xact_test: INSERT: data[text]:'after-assignment'
+ COMMIT
+(6 rows)
+
+DROP TABLE xact_test;
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/xact.sql b/contrib/test_decoding/sql/xact.sql
new file mode 100644
index 0000000..9ce238f
--- /dev/null
+++ b/contrib/test_decoding/sql/xact.sql
@@ -0,0 +1,22 @@
+-- predictability
+SET synchronous_commit = on;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+-- bug #13844, xids in non-decoded records need to be inspected
+CREATE TABLE xact_test(data text);
+INSERT INTO xact_test VALUES ('before-test');
+
+BEGIN;
+-- perform operation in xact that creates and logs xid, but isn't decoded
+SELECT * FROM xact_test FOR UPDATE;
+SAVEPOINT foo;
+-- and now actually insert in subxact, xid is expected to be known
+INSERT INTO xact_test VALUES ('after-assignment');
+COMMIT;
+-- and now show those changes
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+DROP TABLE xact_test;
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 56be1ed..33e4343 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -78,6 +78,14 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
  * Take every XLogReadRecord()ed record and perform the actions required to
  * decode it using the output plugin already setup in the logical decoding
  * context.
+ *
+ * NB: Note that every record's xid needs to be processed by reorderbuffer
+ * (xids contained in the content of records are not relevant for this rule).
+ * That means that for records which would otherwise not go through the
+ * reorderbuffer, ReorderBufferProcessXid() has to be called. We don't want to
+ * call ReorderBufferProcessXid for each record type by default, because
+ * e.g. empty xacts can be handled more efficiently if there's no previous
+ * state for them.
  */
 void
 LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
@@ -135,6 +143,9 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 		case RM_BRIN_ID:
 		case RM_COMMIT_TS_ID:
 		case RM_REPLORIGIN_ID:
+			/* just deal with xid, and done */
+			ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
+									buf.origptr);
 			break;
 		case RM_NEXT_ID:
 			elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
@@ -150,6 +161,9 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	SnapBuild  *builder = ctx->snapshot_builder;
 	uint8		info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
 
+	ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
+							buf->origptr);
+
 	switch (info)
 	{
 			/* this is also used in END_OF_RECOVERY checkpoints */
@@ -191,7 +205,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	XLogReaderState *r = buf->record;
 	uint8		info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
 
-	/* no point in doing anything yet, data could not be decoded anyway */
+	/*
+	 * No point in doing anything yet, data could not be decoded anyway. It's
+	 * ok not to call ReorderBufferProcessXid() here: outside of the assignment
+	 * case there won't be any later records with the same xid, and in the
+	 * assignment case we won't decode those xacts anyway.
+	 */
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
 		return;
 
@@ -260,6 +279,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 * transactions in the changestream allowing for a kind of
 			 * distributed 2PC.
 			 */
+			ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
 			break;
 		default:
 			elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
@@ -276,6 +296,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	XLogReaderState *r = buf->record;
 	uint8		info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
 
+	ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+
 	switch (info)
 	{
 		case XLOG_RUNNING_XACTS:
@@ -313,6 +335,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	TransactionId xid = XLogRecGetXid(buf->record);
 	SnapBuild  *builder = ctx->snapshot_builder;
 
+	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
 	/* no point in doing anything yet */
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
 		return;
@@ -366,6 +390,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	TransactionId xid = XLogRecGetXid(buf->record);
 	SnapBuild  *builder = ctx->snapshot_builder;
 
+	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
 	/* no point in doing anything yet */
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
 		return;
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 78acced..513fcde 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1741,16 +1741,21 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
 
 /*
- * Check whether a transaction is already known in this module.xs
+ * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
+ * least once for every xid in XLogRecord->xl_xid (other places in records
+ * may, but do not have to be passed through here).
+ *
+ * Reorderbuffer keeps some data structures about transactions in LSN order,
+ * for efficiency. To do that it has to know about when transactions are seen
+ * first in the WAL. As many types of records are not actually interesting for
+ * logical decoding, they do not necessarily pass through here.
  */
-bool
-ReorderBufferIsXidKnown(ReorderBuffer *rb, TransactionId xid)
+void
+ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 {
-	ReorderBufferTXN *txn;
-
-	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
-								false);
-	return txn != NULL;
+	/* many records won't have an xid assigned, centralize check here */
+	if (xid != InvalidTransactionId)
+		ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 }
 
 /*
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index ed823ec..179b85a 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -635,8 +635,6 @@ SnapBuildClearExportedSnapshot(void)
 bool
 SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 {
-	bool		is_old_tx;
-
 	/*
 	 * We can't handle data in transactions if we haven't built a snapshot
 	 * yet, so don't store them.
@@ -657,9 +655,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 	 * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
 	 * be needed to decode the change we're currently processing.
 	 */
-	is_old_tx = ReorderBufferIsXidKnown(builder->reorder, xid);
-
-	if (!is_old_tx || !ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
+	if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
 	{
 		/* only build a new snapshot if we don't have a prebuilt one */
 		if (builder->snapshot == NULL)
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index d33ea27..b25eae8 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -366,7 +366,7 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn
 						 CommandId cmin, CommandId cmax, CommandId combocid);
 void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
 							  Size nmsgs, SharedInvalidationMessage *msgs);
-bool		ReorderBufferIsXidKnown(ReorderBuffer *, TransactionId xid);
+void		ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
 void		ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
 bool		ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
 bool		ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
-- 
2.7.0.229.g701fa7f

