From ae7bfc6848143185d13adaa5532c13e9f3d730ca Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.kumar@enterprisedb.com>
Date: Fri, 10 Jan 2020 09:01:35 +0530
Subject: [PATCH v6 06/12] Fix speculative insert bug.

---
 .../replication/logical/reorderbuffer.c       | 23 +++++++++++++++----
 src/include/replication/reorderbuffer.h       |  6 +++++
 2 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 50341a6d9e..8e4744f73a 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1701,6 +1701,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		ReorderBufferChange *change;
 		ReorderBufferChange *specinsert = NULL;
 
+		/*
+		 * Restore any previously saved speculative insert tuple if we are
+		 * running in streaming mode.
+		 */
+		if (streaming && txn->specinsert != NULL)
+		{
+			specinsert = txn->specinsert;
+			txn->specinsert = NULL;
+		}
+
 		if (using_subtxn)
 			BeginInternalSubTransaction("stream");
 		else
@@ -2029,13 +2039,18 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		}
 
 		/*
-		 * There's a speculative insertion remaining, just clean in up, it
-		 * can't have been successful, otherwise we'd gotten a confirmation
-		 * record.
+		 * In non-streaming mode if there's a speculative insertion remaining,
+		 * just clean it up; it can't have been successful, otherwise we'd have
+		 * gotten a confirmation record.  For streaming mode, remember the tuple
+		 * so that if we get the confirmation in the next stream we can stream
+		 * it.
 		 */
 		if (specinsert)
 		{
-			ReorderBufferReturnChange(rb, specinsert);
+			if (streaming)
+				txn->specinsert = specinsert;
+			else
+				ReorderBufferReturnChange(rb, specinsert);
 			specinsert = NULL;
 		}
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 629eeca7f6..0510d3831f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -343,6 +343,12 @@ typedef struct ReorderBufferTXN
 	uint32		ninvalidations;
 	SharedInvalidationMessage *invalidations;
 
+	/*
+	 * Speculative insert saved from the last streamed run if the speculative
+	 * confirm has not been received in the same stream.
+	 */
+	ReorderBufferChange *specinsert;
+
 	/* ---
 	 * Position in one of three lists:
 	 * * list of subtransactions if we are *known* to be subxact
-- 
2.20.1

