From 69bb70049d8ba74af60e8554fd6379499fbd29ff Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Fri, 1 Jun 2018 14:30:55 -0400
Subject: [PATCH 1/2] Fix and document lock handling for in-memory replication
 xslot data

While debugging issues on HEAD for the new slot forwarding feature of
Postgres 11, some monitoring of the code surrounding in-memory slot data
has proved that the lock handling may cause inconsistent data to be read
by read-only callers of slot functions, particularly
pg_get_replication_slots() which fetches data for the system view
pg_replication_slots.

The code paths involved in those problems concern the WAL sender,
logical decoding initialization and WAL reservation for slots.

A set of comments documenting all the lock handlings, particularly the
dependency with LW locks for slots and the in_use flag as well as the
internal mutex lock is added, based on a suggested by Simon Riggs.

Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9GLCw@mail.gmail.com
---
 src/backend/replication/logical/logical.c | 13 +++++++++----
 src/backend/replication/slot.c            |  4 ++++
 src/backend/replication/walsender.c       | 14 +++++++++++---
 src/include/replication/slot.h            | 10 ++++++++++
 4 files changed, 34 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1393591538..61588d626f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -297,10 +297,12 @@ CreateInitDecodingContext(char *plugin,
 
 	xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
 
+	SpinLockAcquire(&slot->mutex);
 	slot->effective_catalog_xmin = xmin_horizon;
 	slot->data.catalog_xmin = xmin_horizon;
 	if (need_full_snapshot)
 		slot->effective_xmin = xmin_horizon;
+	SpinLockRelease(&slot->mutex);
 
 	ReplicationSlotsComputeRequiredXmin(true);
 
@@ -445,13 +447,14 @@ void
 DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 {
 	XLogRecPtr	startptr;
+	ReplicationSlot *slot = ctx->slot;
 
 	/* Initialize from where to start reading WAL. */
-	startptr = ctx->slot->data.restart_lsn;
+	startptr = slot->data.restart_lsn;
 
 	elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
-		 (uint32) (ctx->slot->data.restart_lsn >> 32),
-		 (uint32) ctx->slot->data.restart_lsn);
+		 (uint32) (slot->data.restart_lsn >> 32),
+		 (uint32) slot->data.restart_lsn);
 
 	/* Wait for a consistent starting point */
 	for (;;)
@@ -477,7 +480,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 		CHECK_FOR_INTERRUPTS();
 	}
 
-	ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	SpinLockAcquire(&slot->mutex);
+	slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	SpinLockRelease(&slot->mutex);
 }
 
 /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 056628fe8e..79d7a57d67 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void)
 			XLogRecPtr	flushptr;
 
 			/* start at current insert position */
+			SpinLockAcquire(&slot->mutex);
 			slot->data.restart_lsn = GetXLogInsertRecPtr();
+			SpinLockRelease(&slot->mutex);
 
 			/* make sure we have enough information to start */
 			flushptr = LogStandbySnapshot();
@@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void)
 		}
 		else
 		{
+			SpinLockAcquire(&slot->mutex);
 			slot->data.restart_lsn = GetRedoRecPtr();
+			SpinLockRelease(&slot->mutex);
 		}
 
 		/* prevent WAL removal as fast as possible */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e47ddca6bc..0b1f1ba3c1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1040,7 +1040,9 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
 static void
 StartLogicalReplication(StartReplicationCmd *cmd)
 {
-	StringInfoData buf;
+	StringInfoData	buf;
+	XLogRecPtr		restart_lsn;
+	XLogRecPtr		confirmed_lsn;
 
 	/* make sure that our requirements are still fulfilled */
 	CheckLogicalDecodingRequirements();
@@ -1081,14 +1083,20 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 												 WalSndWriteData,
 												 WalSndUpdateProgress);
 
+	/* Fetch all needed values from the slot */
+	SpinLockAcquire(&MyReplicationSlot->mutex);
+	restart_lsn = MyReplicationSlot->data.restart_lsn;
+	confirmed_lsn = MyReplicationSlot->data.confirmed_flush;
+	SpinLockRelease(&MyReplicationSlot->mutex);
+
 	/* Start reading WAL from the oldest required WAL. */
-	logical_startptr = MyReplicationSlot->data.restart_lsn;
+	logical_startptr = restart_lsn;
 
 	/*
 	 * Report the location after which we'll send out further commits as the
 	 * current sentPtr.
 	 */
-	sentPtr = MyReplicationSlot->data.confirmed_flush;
+	sentPtr = confirmed_lsn;
 
 	/* Also update the sent position status in shared memory */
 	SpinLockAcquire(&MyWalSnd->mutex);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 76a88c6de7..6fa9df5553 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -86,6 +86,16 @@ typedef struct ReplicationSlotPersistentData
 
 /*
  * Shared memory state of a single replication slot.
+ *
+ * The data included in this structure, including the contents within
+ * ReplicationSlotPersistentData, are protected by mutex when read from
+ * other backends than the one registering the slot as in_use.  If the
+ * slot is not marked as in_use, then no code paths refer or should refer
+ * to the in-memory data of a slot.
+ *
+ * Note that a slot is switched as in_use only with
+ * ReplicationSlotControlLock held in exclusive mode, protecting from any
+ * while readers have to hold this lock at least in shared mode.
  */
 typedef struct ReplicationSlot
 {
-- 
2.17.0

