From 53784c818b39d39c6a1f3588f7823a9010074869 Mon Sep 17 00:00:00 2001
From: Peter Geoghegan <peter.geoghegan86@gmail.com>
Date: Sun, 22 Feb 2015 17:02:14 -0800
Subject: [PATCH 7/7] Logical decoding support for ON CONFLICT UPDATE

---
 src/backend/replication/logical/decode.c        |  5 +++-
 src/backend/replication/logical/reorderbuffer.c | 35 +++++++++++++++++++++++--
 src/include/replication/reorderbuffer.h         |  3 ++-
 3 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index e7614bd..a9cd0df 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -682,7 +682,10 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
-	change->action = REORDER_BUFFER_CHANGE_DELETE;
+	if (!(xlrec->flags & XLOG_HEAP_KILLED_SPECULATIVE_TUPLE))
+		change->action = REORDER_BUFFER_CHANGE_DELETE;
+	else
+		change->action = REORDER_BUFFER_CHANGE_INTERNAL_DELETE;
 
 	memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 20bb3b7..8f9101e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -401,6 +401,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
 		case REORDER_BUFFER_CHANGE_INSERT:
 		case REORDER_BUFFER_CHANGE_UPDATE:
 		case REORDER_BUFFER_CHANGE_DELETE:
+		case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
 			if (change->data.tp.newtuple)
 			{
 				ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
@@ -1313,7 +1314,12 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 
 	PG_TRY();
 	{
+		/*
+		 * To merge super delete and insert records, peek-ahead is occasionally
+		 * required
+		 */
 		ReorderBufferChange *change;
+		ReorderBufferChange *nextchange = NULL;
 
 		if (using_subtxn)
 			BeginInternalSubTransaction("replay");
@@ -1323,16 +1329,21 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		rb->begin(rb, txn);
 
 		iterstate = ReorderBufferIterTXNInit(rb, txn);
-		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
+		while ((change = nextchange ? nextchange :
+				ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
 		{
 			Relation	relation = NULL;
 			Oid			reloid;
 
+			/* Forget about previous peek ahead */
+			nextchange = NULL;
+
 			switch (change->action)
 			{
 				case REORDER_BUFFER_CHANGE_INSERT:
 				case REORDER_BUFFER_CHANGE_UPDATE:
 				case REORDER_BUFFER_CHANGE_DELETE:
+				case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
 					Assert(snapshot_now);
 
 					reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
@@ -1374,7 +1385,23 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 						else if (!IsToastRelation(relation))
 						{
 							ReorderBufferToastReplace(rb, txn, relation, change);
-							rb->apply_change(rb, txn, relation, change);
+
+							/*
+							 * Speculative insertion occasionally makes use of
+							 * "super deletion" -- an implementation defined
+							 * delete of a speculatively inserted tuple.
+							 * Neither the super deletion, nor the insertion
+							 * (which must be the prior record type) are
+							 * included in the final assembly.
+							 */
+							if (change->action == REORDER_BUFFER_CHANGE_INSERT)
+								nextchange = ReorderBufferIterTXNNext(rb, iterstate);
+
+							if (change->action !=
+								REORDER_BUFFER_CHANGE_INTERNAL_DELETE &&
+								(!nextchange || nextchange->action !=
+								 REORDER_BUFFER_CHANGE_INTERNAL_DELETE))
+								rb->apply_change(rb, txn, relation, change);
 
 							/*
 							 * Only clear reassembled toast chunks if we're
@@ -2003,6 +2030,8 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			/* fall through */
 		case REORDER_BUFFER_CHANGE_DELETE:
+			/* fall through */
+		case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
 			{
 				char	   *data;
 				ReorderBufferTupleBuf *oldtup,
@@ -2258,6 +2287,8 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			/* fall through */
 		case REORDER_BUFFER_CHANGE_DELETE:
+			/* fall through */
+		case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
 			if (change->data.tp.newtuple)
 			{
 				Size		len = offsetof(ReorderBufferTupleBuf, t_data) +
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index f1e0f57..c1a8819 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -51,7 +51,8 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_DELETE,
 	REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
 	REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
-	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
+	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
+	REORDER_BUFFER_CHANGE_INTERNAL_DELETE
 };
 
 /*
-- 
1.9.1

