From 20af21231112a31f9d825d379bf75ce1e0aecf54 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Fri, 1 Jun 2018 14:37:50 -0400
Subject: [PATCH 2/2] Fix a couple of bugs with replication slot advancing
 feature

A review of the code has revealed two issues fixed by this commit:
- Physical slots have been using the confirmed LSN position as a start
comparison point, which is always 0/0; instead, use the restart LSN
position (logical slots need to use the confirmed LSN position).
- Never return 0/0 if a slot cannot be advanced.  This way, if a slot is
advanced while the activity is idle, then the same position is returned
to the caller over and over without raising an error.

Note that as the slot is owned by the backend advancing it, reads of
those fields are fine without a lock, while updates need to happen while
the slot mutex is held.

Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9GLCw@mail.gmail.com
---
 src/backend/replication/slotfuncs.c | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb..151151b1a8 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -322,15 +322,15 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 static XLogRecPtr
 pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 {
-	XLogRecPtr	retlsn = InvalidXLogRecPtr;
+	XLogRecPtr	retlsn = startlsn;
 
-	SpinLockAcquire(&MyReplicationSlot->mutex);
 	if (MyReplicationSlot->data.restart_lsn < moveto)
 	{
+		SpinLockAcquire(&MyReplicationSlot->mutex);
 		MyReplicationSlot->data.restart_lsn = moveto;
+		SpinLockRelease(&MyReplicationSlot->mutex);
 		retlsn = moveto;
 	}
-	SpinLockRelease(&MyReplicationSlot->mutex);
 
 	return retlsn;
 }
@@ -343,7 +343,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 {
 	LogicalDecodingContext *ctx;
 	ResourceOwner old_resowner = CurrentResourceOwner;
-	XLogRecPtr	retlsn = InvalidXLogRecPtr;
+	XLogRecPtr	retlsn = startlsn;
 
 	PG_TRY();
 	{
@@ -472,7 +472,11 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	/* Acquire the slot so we "own" it */
 	ReplicationSlotAcquire(NameStr(*slotname), true);
 
-	startlsn = MyReplicationSlot->data.confirmed_flush;
+	if (OidIsValid(MyReplicationSlot->data.database))
+		startlsn = MyReplicationSlot->data.confirmed_flush;
+	else
+		startlsn = MyReplicationSlot->data.restart_lsn;
+
 	if (moveto < startlsn)
 	{
 		ReplicationSlotRelease();
-- 
2.17.0

