From c445308ead8aa99419820769fda8bbccc6eeb980 Mon Sep 17 00:00:00 2001
From: Peter Geoghegan <peter.geoghegan86@gmail.com>
Date: Sun, 22 Feb 2015 17:02:14 -0800
Subject: [PATCH 4/8] Logical decoding support for ON CONFLICT UPDATE

Transaction commit reassembly now consolidates changes related to
speculative insertion.  By peeking ahead at the next change, it determines
whether a speculatively inserted tuple went on to be super-deleted.  If so,
neither the insertion nor the super deletion is reported; otherwise, the
insertion is reported to decoding plugins as an ordinary insertion.
---
 src/backend/replication/logical/decode.c        | 11 +++-
 src/backend/replication/logical/reorderbuffer.c | 83 ++++++++++++++++++++++++-
 src/include/replication/reorderbuffer.h         | 11 +++-
 3 files changed, 100 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index e7614bd..3c05866 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -591,7 +591,11 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
-	change->action = REORDER_BUFFER_CHANGE_INSERT;
+	if (!(xlrec->flags & XLOG_HEAP_SPECULATIVE_TUPLE))
+		change->action = REORDER_BUFFER_CHANGE_INSERT;
+	else
+		change->action = REORDER_BUFFER_CHANGE_INTERNAL_INSERT;
+
 	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
 
 	if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
@@ -682,7 +686,10 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
-	change->action = REORDER_BUFFER_CHANGE_DELETE;
+	if (!(xlrec->flags & XLOG_HEAP_SPECULATIVE_TUPLE))
+		change->action = REORDER_BUFFER_CHANGE_DELETE;
+	else
+		change->action = REORDER_BUFFER_CHANGE_INTERNAL_DELETE;
 
 	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 20bb3b7..e8ad2ee 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -401,6 +401,8 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
 		case REORDER_BUFFER_CHANGE_INSERT:
 		case REORDER_BUFFER_CHANGE_UPDATE:
 		case REORDER_BUFFER_CHANGE_DELETE:
+		case REORDER_BUFFER_CHANGE_INTERNAL_INSERT:
+		case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
 			if (change->data.tp.newtuple)
 			{
 				ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
@@ -1314,6 +1316,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 	PG_TRY();
 	{
 		ReorderBufferChange *change;
+		ReorderBufferChange *peekchange = NULL;
 
 		if (using_subtxn)
 			BeginInternalSubTransaction("replay");
@@ -1323,16 +1326,26 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		rb->begin(rb, txn);
 
 		iterstate = ReorderBufferIterTXNInit(rb, txn);
-		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
+		while ((change = peekchange ? peekchange :
+				ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
 		{
 			Relation	relation = NULL;
 			Oid			reloid;
 
+			/* Forget about previous peek ahead */
+			if (peekchange)
+				peekchange = NULL;
+			else
+				Assert(change->action !=
+					   REORDER_BUFFER_CHANGE_INTERNAL_DELETE);
+
 			switch (change->action)
 			{
 				case REORDER_BUFFER_CHANGE_INSERT:
 				case REORDER_BUFFER_CHANGE_UPDATE:
 				case REORDER_BUFFER_CHANGE_DELETE:
+				case REORDER_BUFFER_CHANGE_INTERNAL_INSERT:
+				case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
 					Assert(snapshot_now);
 
 					reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
@@ -1374,7 +1387,65 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 						else if (!IsToastRelation(relation))
 						{
 							ReorderBufferToastReplace(rb, txn, relation, change);
-							rb->apply_change(rb, txn, relation, change);
+
+							/*
+							 * Kludge:  Speculative insertion occasionally
+							 * makes use of "super deletion" -- an
+							 * implementation defined delete of a speculatively
+							 * inserted tuple.  Neither the super deletion, nor
+							 * the insertion (which must be the prior record
+							 * type) are included in the final assembly when
+							 * the tuple was super-deleted.  Otherwise, an
+							 * ordinary insertion is assembled.
+							 */
+							if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_INSERT)
+							{
+								/*
+								 * Need to ensure the memory used by promise
+								 * tuple isn't freed till we're done verifying
+								 * that there is no super deletion that
+								 * immediately follows.  Otherwise it could get
+								 * freed/reused while restoring spooled data
+								 * from disk.
+								 */
+								dlist_delete(&change->node);
+								peekchange = ReorderBufferIterTXNNext(rb, iterstate);
+								if (!peekchange || peekchange->action !=
+									REORDER_BUFFER_CHANGE_INTERNAL_DELETE)
+								{
+									/* Report as proper insert to client */
+									change->action = REORDER_BUFFER_CHANGE_INSERT;
+									rb->apply_change(rb, txn, relation,
+													 change);
+								}
+								else if (peekchange)
+								{
+									Assert(RelFileNodeEquals(change->data.tp.relnode,
+															 peekchange->data.tp.relnode));
+								}
+
+								ReorderBufferReturnChange(rb, change);
+							}
+							else if (change->action ==
+									 REORDER_BUFFER_CHANGE_INTERNAL_DELETE)
+							{
+								/*
+								 * The REORDER_BUFFER_CHANGE_INTERNAL_INSERT
+								 * case makes an assumption that
+								 * REORDER_BUFFER_CHANGE_INTERNAL_DELETE
+								 * changes immediately follows reliably iff a
+								 * speculatively inserted tuple was actually
+								 * super-deleted.
+								 */
+							}
+							else
+							{
+								/*
+								 * Handle non-speculative insertion related
+								 * changes
+								 */
+								rb->apply_change(rb, txn, relation, change);
+							}
 
 							/*
 							 * Only clear reassembled toast chunks if we're
@@ -2003,6 +2074,10 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			/* fall through */
 		case REORDER_BUFFER_CHANGE_DELETE:
+			/* fall through */
+		case REORDER_BUFFER_CHANGE_INTERNAL_INSERT:
+			/* fall through */
+		case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
 			{
 				char	   *data;
 				ReorderBufferTupleBuf *oldtup,
@@ -2258,6 +2333,10 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			/* fall through */
 		case REORDER_BUFFER_CHANGE_DELETE:
+			/* fall through */
+		case REORDER_BUFFER_CHANGE_INTERNAL_INSERT:
+			/* fall through */
+		case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
 			if (change->data.tp.newtuple)
 			{
 				Size		len = offsetof(ReorderBufferTupleBuf, t_data) +
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index f1e0f57..d694981 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -43,6 +43,13 @@ typedef struct ReorderBufferTupleBuf
  * and ComboCids in the same list with the user visible INSERT/UPDATE/DELETE
  * changes. Users of the decoding facilities will never see changes with
  * *_INTERNAL_* actions.
+ *
+ * The REORDER_BUFFER_CHANGE_INTERNAL_INSERT and
+ * REORDER_BUFFER_CHANGE_INTERNAL_DELETE changes concern "super deletion",
+ * a mechanism that speculative insertion uses to handle conflicts.  They are
+ * consolidated at transaction reassembly: a speculative insertion reaches
+ * decoding plugins as a REORDER_BUFFER_CHANGE_INSERT unless it was
+ * super-deleted, in which case neither change is passed to plugins at all.
  */
 enum ReorderBufferChangeType
 {
@@ -51,7 +58,9 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_DELETE,
 	REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
 	REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
-	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
+	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
+	REORDER_BUFFER_CHANGE_INTERNAL_INSERT,
+	REORDER_BUFFER_CHANGE_INTERNAL_DELETE
 };
 
 /*
-- 
1.9.1

