From 51e9988ff44b7c2b716e3a0da3f1d1c9359a1d79 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@otacoo.com>
Date: Thu, 30 Oct 2014 22:01:10 +0900
Subject: [PATCH 1/2] Refactor code to detect synchronous node in WAL sender
 array

This patch removes the code duplicated between walsender.c and syncrep.c
that detects the WAL sender with the lowest strictly-positive priority,
moving it into a single function to make this code easier to maintain.
---
 src/backend/replication/syncrep.c   | 97 +++++++++++++++++++++++++++----------
 src/backend/replication/walsender.c | 35 +++----------
 src/include/replication/syncrep.h   |  1 +
 3 files changed, 78 insertions(+), 55 deletions(-)

diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index aa54bfb..f838ad0 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -357,6 +357,70 @@ SyncRepInitConfig(void)
 	}
 }
 
+
+/*
+ * SyncRepGetSynchronousStandby
+ *
+ * Return the index in WalSndCtl->walsnds[] of the current synchronous
+ * standby, or -1 if there is none.  The synchronous standby is the active,
+ * streaming WAL sender with a valid flush position and the lowest
+ * strictly-positive sync_standby_priority; if several WAL senders share
+ * that priority, the first one found in the array is selected.  The
+ * caller must hold SyncRepLock.
+ *
+ * sync_priority may be NULL.  Otherwise it must point to an array of at
+ * least max_wal_senders elements, which is filled with the priority of
+ * every WAL sender (0 for those with an invalid flush position).  Doing
+ * this in the same pass lets callers needing both results scan the WAL
+ * sender array only once, with a single acquisition of SyncRepLock.
+ */
+int
+SyncRepGetSynchronousStandby(int *sync_priority)
+{
+	int			sync_node = -1;	/* index of best candidate so far */
+	int			priority = 0;	/* its priority, 0 while none found */
+	int			i;
+
+	for (i = 0; i < max_wal_senders; i++)
+	{
+		/* Use volatile pointer to prevent code rearrangement */
+		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+		/*
+		 * Report the priority of every slot.  Treat a standby such as a
+		 * pg_basebackup background process, which always returns an
+		 * invalid flush location, as an asynchronous standby.
+		 */
+		if (sync_priority)
+			sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
+				0 : walsnd->sync_standby_priority;
+
+		/* Skip if not active or not streaming */
+		if (walsnd->pid == 0 ||
+			walsnd->state != WALSNDSTATE_STREAMING)
+			continue;
+
+		/* Skip asynchronous ones: only positive priorities count */
+		if (walsnd->sync_standby_priority <= 0)
+			continue;
+
+		/* Skip unless strictly lower; on ties keep the first node */
+		if (priority != 0 &&
+			priority <= walsnd->sync_standby_priority)
+			continue;
+
+		/* Skip if flush position is invalid */
+		if (XLogRecPtrIsInvalid(walsnd->flush))
+			continue;
+
+		/* Best candidate so far; keep scanning for a lower priority */
+		sync_node = i;
+		priority = walsnd->sync_standby_priority;
+	}
+
+	return sync_node;
+}
+
 /*
  * Update the LSNs on each queue based upon our latest state. This
  * implements a simple policy of first-valid-standby-releases-waiter.
@@ -369,10 +433,9 @@ SyncRepReleaseWaiters(void)
 {
 	volatile WalSndCtlData *walsndctl = WalSndCtl;
 	volatile WalSnd *syncWalSnd = NULL;
+	int			sync_node;
 	int			numwrite = 0;
 	int			numflush = 0;
-	int			priority = 0;
-	int			i;
 
 	/*
 	 * If this WALSender is serving a standby that is not on the list of
@@ -388,32 +451,14 @@ SyncRepReleaseWaiters(void)
 	/*
 	 * We're a potential sync standby. Release waiters if we are the highest
 	 * priority standby. If there are multiple standbys with same priorities
-	 * then we use the first mentioned standby. If you change this, also
-	 * change pg_stat_get_wal_senders().
+	 * then we use the first mentioned standby.
 	 */
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+	sync_node = SyncRepGetSynchronousStandby(NULL);
 
-	for (i = 0; i < max_wal_senders; i++)
-	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile WalSnd *walsnd = &walsndctl->walsnds[i];
-
-		if (walsnd->pid != 0 &&
-			walsnd->state == WALSNDSTATE_STREAMING &&
-			walsnd->sync_standby_priority > 0 &&
-			(priority == 0 ||
-			 priority > walsnd->sync_standby_priority) &&
-			!XLogRecPtrIsInvalid(walsnd->flush))
-		{
-			priority = walsnd->sync_standby_priority;
-			syncWalSnd = walsnd;
-		}
-	}
-
-	/*
-	 * We should have found ourselves at least.
-	 */
-	Assert(syncWalSnd);
+	/* We should have found ourselves at least */
+	Assert(sync_node >= 0 && sync_node < max_wal_senders);
+	syncWalSnd = &WalSndCtl->walsnds[sync_node];
 
 	/*
 	 * If we aren't managing the highest priority standby then just leave.
@@ -444,7 +489,7 @@ SyncRepReleaseWaiters(void)
 
 	elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X",
 		 numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
-	   numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
+		 numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush);
 
 	/*
 	 * If we are managing the highest priority standby, though we weren't
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 385d18b..2d35cc9 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2742,8 +2742,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	MemoryContext per_query_ctx;
 	MemoryContext oldcontext;
 	int		   *sync_priority;
-	int			priority = 0;
-	int			sync_standby = -1;
+	int			sync_standby;
 	int			i;
 
 	/* check to see if caller supports us returning a tuplestore */
@@ -2774,36 +2773,13 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	/*
 	 * Get the priorities of sync standbys all in one go, to minimise lock
 	 * acquisitions and to allow us to evaluate who is the current sync
-	 * standby. This code must match the code in SyncRepReleaseWaiters().
+	 * standby.
 	 */
-	sync_priority = palloc(sizeof(int) * max_wal_senders);
+	sync_priority = (int *) palloc(sizeof(int) * max_wal_senders);
 	LWLockAcquire(SyncRepLock, LW_SHARED);
-	for (i = 0; i < max_wal_senders; i++)
-	{
-		/* use volatile pointer to prevent code rearrangement */
-		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
 
-		if (walsnd->pid != 0)
-		{
-			/*
-			 * Treat a standby such as a pg_basebackup background process
-			 * which always returns an invalid flush location, as an
-			 * asynchronous standby.
-			 */
-			sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
-				0 : walsnd->sync_standby_priority;
-
-			if (walsnd->state == WALSNDSTATE_STREAMING &&
-				walsnd->sync_standby_priority > 0 &&
-				(priority == 0 ||
-				 priority > walsnd->sync_standby_priority) &&
-				!XLogRecPtrIsInvalid(walsnd->flush))
-			{
-				priority = walsnd->sync_standby_priority;
-				sync_standby = i;
-			}
-		}
-	}
+	/* Look for sync standby if any */
+	sync_standby = SyncRepGetSynchronousStandby(sync_priority);
 	LWLockRelease(SyncRepLock);
 
 	for (i = 0; i < max_wal_senders; i++)
@@ -2873,6 +2849,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
+
 	pfree(sync_priority);
 
 	/* clean up and return the tuplestore */
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 7eeaf3b..8361c2f 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -49,6 +49,7 @@ extern void SyncRepUpdateSyncStandbysDefined(void);
 
 /* called by various procs */
 extern int	SyncRepWakeQueue(bool all, int mode);
+extern int	SyncRepGetSynchronousStandby(int *sync_priority);
 
 extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
 extern void assign_synchronous_commit(int newval, void *extra);
-- 
2.1.3

