>From d86b884c00fbb0eb52523b322c6d4cb83e0e351f Mon Sep 17 00:00:00 2001
From: Abhijit Menon-Sen <ams@2ndQuadrant.com>
Date: Tue, 11 Jun 2013 23:25:26 +0200
Subject: [PATCH 02/17] wal_decoding: Add pg_xlog_wait_remote_{apply,receive}
 functions

We want to use these in isolationtester tests, but they're more
generally useful for "inter-node synchronisation".
---
 src/backend/replication/walsender.c | 73 +++++++++++++++++++++++++++++++++++++
 src/include/catalog/pg_proc.h       |  5 +++
 src/include/replication/walsender.h |  2 +
 3 files changed, 80 insertions(+)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 717cbfd..9f5f766 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2083,3 +2083,76 @@ GetOldestWALSendPointer(void)
 }
 
 #endif
+
+/* Parse a "%X/%X" transaction log location from text; ERROR if malformed. */
+static XLogRecPtr
+text_to_xlogrecptr(text *str)
+{
+	char	   *locstr = text_to_cstring(str);
+	uint32		xlogid, xrecoff;
+
+	if (sscanf(locstr, "%X/%X", &xlogid, &xrecoff) != 2)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("could not parse transaction log location \"%s\"",
+						locstr)));
+	return (((uint64) xlogid) << 32) | xrecoff;	/* first field is high half */
+}
+
+/*
+ * Poll the walsender slots until none of the selected ones is behind ptr.
+ *
+ * pid selects a single walsender by process id; 0 selects every slot whose
+ * pid is nonzero.  When wait_for_apply is true the slot's apply position is
+ * compared against ptr, otherwise its flush position.  Slots with pid == 0
+ * are skipped, so this returns at once if no slot matches.
+ */
+static void
+wait_for_remote_lsn(int32 pid, XLogRecPtr ptr, bool wait_for_apply)
+{
+	for (;;)
+	{
+		bool		done = true;
+		int			i;
+
+		/* Stop scanning as soon as one selected walsender is behind. */
+		for (i = 0; i < max_wal_senders && done; i++)
+		{
+			volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+			SpinLockAcquire(&walsnd->mutex);
+			if (walsnd->pid != 0 && (pid == 0 || pid == walsnd->pid))
+				done = (wait_for_apply ? walsnd->apply : walsnd->flush) >= ptr;
+			SpinLockRelease(&walsnd->mutex);
+		}
+		if (done)
+			break;
+
+		CHECK_FOR_INTERRUPTS();		/* keep the wait cancellable */
+		pg_usleep(10 * 1000);		/* 10 ms between polls */
+	}
+}
+
+/* SQL-callable: wait on the apply position; args are (location, pid). */
+Datum
+pg_xlog_wait_remote_apply(PG_FUNCTION_ARGS)
+{
+	text	   *location = PG_GETARG_TEXT_P(0);
+	int32		sender_pid = PG_GETARG_INT32(1);	/* 0 = all walsenders */
+	XLogRecPtr	target = text_to_xlogrecptr(location);
+
+	wait_for_remote_lsn(sender_pid, target, true);
+	PG_RETURN_VOID();
+}
+
+/* SQL-callable: wait on the flush position; args are (location, pid). */
+Datum
+pg_xlog_wait_remote_receive(PG_FUNCTION_ARGS)
+{
+	text	   *location = PG_GETARG_TEXT_P(0);
+	int32		sender_pid = PG_GETARG_INT32(1);	/* 0 = all walsenders */
+	XLogRecPtr	target = text_to_xlogrecptr(location);
+
+	wait_for_remote_lsn(sender_pid, target, false);
+	PG_RETURN_VOID();
+}
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index b5be075..6d3d702 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -4722,6 +4722,11 @@ DATA(insert OID = 3473 (  spg_range_quad_leaf_consistent	PGNSP PGUID 12 1 0 0 0
 DESCR("SP-GiST support for quad tree over range");
 
 
+DATA(insert OID = 3781 (  pg_xlog_wait_remote_apply PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "25 23" _null_ _null_ _null_ _null_ pg_xlog_wait_remote_apply _null_ _null_ _null_ ));
+DESCR("wait for an lsn to be applied by a remote node");
+DATA(insert OID = 3782 (  pg_xlog_wait_remote_receive PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "25 23" _null_ _null_ _null_ _null_ pg_xlog_wait_remote_receive _null_ _null_ _null_ ));
+DESCR("wait for an lsn to be received by a remote node");
+
 /* event triggers */
 DATA(insert OID = 3566 (  pg_event_trigger_dropped_objects		PGNSP PGUID 12 10 100 0 0 f f f f t t s 0 0 2249 "" "{26,26,23,25,25,25,25}" "{o,o,o,o,o,o,o}" "{classid, objid, objsubid, object_type, schema_name, object_name, object_identity}" _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ ));
 DESCR("list objects dropped by the current command");
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 2cc7ddf..84a418a 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -35,6 +35,8 @@ extern void WalSndWakeup(void);
 extern void WalSndRqstFileReload(void);
 
 extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);
+extern Datum pg_xlog_wait_remote_apply(PG_FUNCTION_ARGS);
+extern Datum pg_xlog_wait_remote_receive(PG_FUNCTION_ARGS);
 
 /*
  * Remember that we want to wakeup walsenders later
-- 
1.8.2.rc2.4.g7799588.dirty

