From 98ffc5a3705f3cf99f958c696541896825fb2a01 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Fri, 1 Jun 2018 14:37:50 -0400
Subject: [PATCH 2/2] Fix a couple of bugs with replication slot advancing
 feature

A review of the code has shown a couple of issues, which are fixed by
this commit:
- Physical slots have been using the confirmed LSN position as a start
comparison point, which is always 0/0; instead use the restart LSN
position (logical slots need to use the confirmed LSN position, which
was correct).
- The actual slot update was incorrect for both physical and logical
slots.  Physical slots need to use their restart_lsn as base comparison
point (confirmed_flush was used because of previous point), and logical
slots need to begin reading WAL from restart_lsn (confirmed_flush was
used as well), while confirmed_flush is compiled depending on the
decoding context and record read.
- Never return 0/0 if a slot cannot be advanced.  This way, if a slot is
advanced while the activity is idle, then the same position is returned
to the caller over and over without raising an error.  Instead return the
LSN the slot has been advanced to.  With repetitive calls, the same
position is returned.

Note that as the slot is owned by the backend advancing it, reads of
those fields are fine lockless, while updates need to happen while the
slot mutex is held, so fix that on the way.

Author: Michael Paquier
Reviewed-by: Peter Jelinek, Simon Riggs

Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9GLCw@mail.gmail.com
Discussion: https://www.postgresql.org/message-id/2840048a-1184-417a-9da8-3299d207a1d7%40postgrespro.ru
---
 src/backend/replication/slotfuncs.c | 50 +++++++++++++++++++++--------
 1 file changed, 36 insertions(+), 14 deletions(-)

diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb..63cada0786 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -318,32 +318,43 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 /*
  * Helper function for advancing physical replication slot forward.
+ * The LSN position to move to is compared simply to the slot's
+ * restart_lsn, knowing that any position older than that would be
+ * removed by successive checkpoints.
  */
 static XLogRecPtr
-pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
+pg_physical_replication_slot_advance(XLogRecPtr moveto)
 {
-	XLogRecPtr	retlsn = InvalidXLogRecPtr;
+	XLogRecPtr	startlsn = MyReplicationSlot->data.restart_lsn;
+	XLogRecPtr	retlsn = startlsn;
 
-	SpinLockAcquire(&MyReplicationSlot->mutex);
-	if (MyReplicationSlot->data.restart_lsn < moveto)
+	if (startlsn < moveto)
 	{
+		SpinLockAcquire(&MyReplicationSlot->mutex);
 		MyReplicationSlot->data.restart_lsn = moveto;
+		SpinLockRelease(&MyReplicationSlot->mutex);
 		retlsn = moveto;
 	}
-	SpinLockRelease(&MyReplicationSlot->mutex);
 
 	return retlsn;
 }
 
 /*
  * Helper function for advancing logical replication slot forward.
+ * The slot's restart_lsn is used as start point for reading records,
+ * while confirmed_flush is used as base point for the decoding context.
+ * The LSN position to move to is checked by doing a per-record scan and
+ * logical decoding which makes sure that confirmed_flush is updated to an
+ * LSN which allows the future slot consumer to get consistent logical
+ * changes.
  */
 static XLogRecPtr
-pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
+pg_logical_replication_slot_advance(XLogRecPtr moveto)
 {
 	LogicalDecodingContext *ctx;
 	ResourceOwner old_resowner = CurrentResourceOwner;
-	XLogRecPtr	retlsn = InvalidXLogRecPtr;
+	XLogRecPtr	startlsn = MyReplicationSlot->data.restart_lsn;
+	XLogRecPtr	retlsn = startlsn;
 
 	PG_TRY();
 	{
@@ -384,7 +395,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 			if (record != NULL)
 				LogicalDecodingProcessRecord(ctx, ctx->reader);
 
-			/* check limits */
+			/* Stop once the moving point wanted by caller has been reached */
 			if (moveto <= ctx->reader->EndRecPtr)
 				break;
 
@@ -441,7 +452,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	Name		slotname = PG_GETARG_NAME(0);
 	XLogRecPtr	moveto = PG_GETARG_LSN(1);
 	XLogRecPtr	endlsn;
-	XLogRecPtr	startlsn;
+	XLogRecPtr	minlsn;
 	TupleDesc	tupdesc;
 	Datum		values[2];
 	bool		nulls[2];
@@ -472,21 +483,32 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	/* Acquire the slot so we "own" it */
 	ReplicationSlotAcquire(NameStr(*slotname), true);
 
-	startlsn = MyReplicationSlot->data.confirmed_flush;
-	if (moveto < startlsn)
+	/*
+	 * Check if the slot is not moving backwards.  Physical slots rely simply
+	 * on restart_lsn as a minimum point, while logical slots have confirmed
+	 * consumption up to confirmed_flush, meaning that in both cases data older
+	 * than that is not available anymore.
+	 */
+	if (OidIsValid(MyReplicationSlot->data.database))
+		minlsn = MyReplicationSlot->data.confirmed_flush;
+	else
+		minlsn = MyReplicationSlot->data.restart_lsn;
+
+	if (moveto < minlsn)
 	{
 		ReplicationSlotRelease();
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("cannot move slot to %X/%X, minimum is %X/%X",
 						(uint32) (moveto >> 32), (uint32) moveto,
-						(uint32) (startlsn >> 32), (uint32) startlsn)));
+						(uint32) (minlsn >> 32), (uint32) minlsn)));
 	}
 
+	/* Do the actual slot update, depending on the slot type */
 	if (OidIsValid(MyReplicationSlot->data.database))
-		endlsn = pg_logical_replication_slot_advance(startlsn, moveto);
+		endlsn = pg_logical_replication_slot_advance(moveto);
 	else
-		endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
+		endlsn = pg_physical_replication_slot_advance(moveto);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	nulls[0] = false;
-- 
2.17.1

