diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 094d0c9..68649f0 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -56,6 +56,7 @@
 #include "pg_trace.h"
 #include "pgstat.h"
 #include "replication/walsender.h"
+#include "replication/syncrep.h"
 #include "storage/fd.h"
 #include "storage/procarray.h"
 #include "storage/sinvaladt.h"
@@ -2027,6 +2028,14 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	MyProc->inCommit = false;
 
 	END_CRIT_SECTION();
+
+	/*
+	 * Wait for synchronous replication, if required.
+	 *
+	 * Note that at this stage we have marked clog, but still show as
+	 * running in the procarray and continue to hold locks.
+	 */
+	SyncRepWaitForLSN(recptr);
 }
 
 /*
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 942d5c2..69c1fbc 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -37,6 +37,7 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "replication/walsender.h"
+#include "replication/syncrep.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/lmgr.h"
@@ -1118,6 +1119,14 @@ RecordTransactionCommit(void)
 	/* Compute latestXid while we have the child XIDs handy */
 	latestXid = TransactionIdLatest(xid, nchildren, children);
 
+	/*
+	 * Wait for synchronous replication, if required.
+	 *
+	 * Note that at this stage we have marked clog, but still show as
+	 * running in the procarray and continue to hold locks.
+	 */
+	SyncRepWaitForLSN(XactLastRecEnd);
+
 	/* Reset XactLastRecEnd until the next transaction writes something */
 	XactLastRecEnd.xrecoff = 0;
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index ddf7d79..6d989bb 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -41,6 +41,7 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/bgwriter.h"
+#include "replication/syncrep.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
@@ -152,6 +153,9 @@ HotStandbyState standbyState = STANDBY_DISABLED;
 
 static XLogRecPtr LastRec;
 
+/* The end of the next block of WAL received */
+static XLogRecPtr receivedUpto = {0, 0};
+
 /*
  * Local copy of SharedRecoveryInProgress variable. True actually means "not
  * known, need to check the shared state".
@@ -5251,6 +5255,42 @@ readRecoveryCommandFile(void)
 					(errmsg("primary_conninfo = '%s'",
 							PrimaryConnInfo)));
 		}
+		else if (strcmp(tok1, "synchronous_replication_service") == 0)
+		{
+			if (strcmp(tok2, "recv") == 0)
+			{
+				sync_rep_service = SYNC_REP_RECV;
+				ereport(DEBUG2,
+					(errmsg("synchronous_replication_service = '%s'",
+								"recv")));
+			}
+			else if (strcmp(tok2, "fsync") == 0)
+			{
+				sync_rep_service = SYNC_REP_FSYNC;
+				ereport(DEBUG2,
+					(errmsg("synchronous_replication_service = '%s'",
+								"fsync")));
+			}
+			else if (strcmp(tok2, "apply") == 0)
+			{
+				sync_rep_service = SYNC_REP_APPLY;
+				ereport(DEBUG2,
+					(errmsg("synchronous_replication_service = '%s'",
+								"apply")));
+			}
+			else if (strcmp(tok2, "async") == 0)
+			{
+				sync_rep_service = SYNC_REP_ASYNC;
+				ereport(DEBUG2,
+					(errmsg("synchronous_replication_service = '%s'",
+								"async")));
+			}
+			else
+				ereport(FATAL,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("invalid value for parameter \"%s\": \"%s\"",
+						"synchronous_replication_service", tok2)));
+		}
 		else if (strcmp(tok1, "trigger_file") == 0)
 		{
 			TriggerFile = pstrdup(tok2);
@@ -5282,6 +5322,12 @@ readRecoveryCommandFile(void)
 					(errmsg("recovery command file \"%s\" specified neither primary_conninfo nor restore_command",
 							RECOVERY_COMMAND_FILE),
 					 errhint("The database server will regularly poll the pg_xlog subdirectory to check for files placed there.")));
+
+		if (PrimaryConnInfo == NULL && sync_rep_service != SYNC_REP_ASYNC)
+			ereport(WARNING,
+					(errmsg("recovery command file \"%s\" specified synchronous_replication_service yet streaming was not requested",
+							RECOVERY_COMMAND_FILE),
+					 errhint("Specify primary_conninfo to allow synchronous replication.")));
 	}
 	else
 	{
@@ -6239,6 +6285,16 @@ StartupXLOG(void)
 				xlogctl->recoveryLastRecPtr = EndRecPtr;
 				SpinLockRelease(&xlogctl->info_lck);
 
+				/*
+				 * If we are offering sync_rep_service then signal WALReceiver
+				 * when we reach end of next chunk.  This allows an otherwise
+				 * quiet WALReceiver to wake when it's time to reply with latest
+				 * state.
+				 */
+				if (sync_rep_service == SYNC_REP_APPLY &&
+					XLByteLE(receivedUpto, EndRecPtr))
+					WalRcvWakeup(AUX_PROC_WAL_RECEIVER);
+
 				LastRec = ReadRecPtr;
 
 				record = ReadRecord(NULL, LOG, false);
@@ -8820,22 +8876,39 @@ pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
 }
 
 /*
+ * Get latest redo apply position.
+ *
+ * Exported to allow WALReceiver to read the pointer directly.
+ */
+XLogRecPtr
+GetXLogReplayRecPtr(void)
+{
+	XLogRecPtr	lastReplayed;
+	/* volatile access keeps the compiler from moving code around the lock */
+	volatile XLogCtlData *shared = XLogCtl;
+
+	/* Copy the shared replay position while holding info_lck. */
+	SpinLockAcquire(&shared->info_lck);
+	lastReplayed = shared->recoveryLastRecPtr;
+	SpinLockRelease(&shared->info_lck);
+	return lastReplayed;
+}
+
+/*
  * Report the last WAL replay location (same format as pg_start_backup etc)
  *
  * This is useful for determining how much of WAL is visible to read-only
  * connections during recovery.
+ *
+ * PATCH: required refactoring to separate out GetXLogReplayRecPtr()
  */
 Datum
 pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
 {
-	/* use volatile pointer to prevent code rearrangement */
-	volatile XLogCtlData *xlogctl = XLogCtl;
 	XLogRecPtr	recptr;
 	char		location[MAXFNAMELEN];
 
-	SpinLockAcquire(&xlogctl->info_lck);
-	recptr = xlogctl->recoveryLastRecPtr;
-	SpinLockRelease(&xlogctl->info_lck);
+	recptr = GetXLogReplayRecPtr();
 
 	if (recptr.xlogid == 0 && recptr.xrecoff == 0)
 		PG_RETURN_NULL();
@@ -9232,6 +9305,8 @@ StartupProcessMain(void)
 
 	StartupXLOG();
 
+	WalRcvDisownLatch();
+
 	/*
 	 * Exit normally. Exit code 0 tells postmaster that we completed recovery
 	 * successfully.
@@ -9264,7 +9339,6 @@ static bool
 XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
 			 bool randAccess)
 {
-	static XLogRecPtr receivedUpto = {0, 0};
 	bool		switched_segment = false;
 	uint32		targetPageOff;
 	uint32		targetRecOff;
@@ -9403,10 +9477,9 @@ retry:
 						goto triggered;
 
 					/*
-					 * When streaming is active, we want to react quickly when
-					 * the next WAL record arrives, so sleep only a bit.
+					 * Wait until someone wakes us with new data to apply.
 					 */
-					pg_usleep(100000L); /* 100ms */
+					WalRcvWaitLatch();
 				}
 				else
 				{
@@ -9465,9 +9538,28 @@ retry:
 						 */
 						if (PrimaryConnInfo)
 						{
-							RequestXLogStreaming(
-									  fetching_ckpt ? RedoStartLSN : *RecPtr,
-												 PrimaryConnInfo);
+							WalRcvConnOptions connopts;
+
+							WalRcvOwnLatch(AUX_PROC_STARTUP);
+
+							/*
+							 * Set connection options structure
+							 *
+							 * PATCH: refactor to use an options structure,
+							 * rather than simply extend the APIs each time.
+							 * Now we can send sync rep mode into shmem, so it
+							 * can be read by WALReceiver and sent to master.
+							 */
+							strlcpy((char *) connopts.conninfo, PrimaryConnInfo, MAXCONNINFO);
+							if (fetching_ckpt)
+								connopts.startpoint = RedoStartLSN;
+							else
+								connopts.startpoint = *RecPtr;
+
+							connopts.sync_rep_service =
+										sync_rep_service;
+
+							RequestXLogStreaming(&connopts);
 							continue;
 						}
 					}
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 20314a7..a20b9db 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -422,7 +422,6 @@ AuxiliaryProcessMain(int argc, char *argv[])
 
 		case WalWriterProcess:
 			/* don't set signals, walwriter has its own agenda */
-			InitXLOGAccess();
 			WalWriterMain();
 			proc_exit(1);		/* should never return */
 
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index db61569..fdde3d1 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2884,6 +2884,9 @@ PostmasterStateMachine(void)
 				signal_child(StartupPID, SIGTERM);
 			if (WalReceiverPID != 0)
 				signal_child(WalReceiverPID, SIGTERM);
+			if (WalWriterPID != 0)
+				signal_child(WalWriterPID, SIGTERM);
+
 			pmState = PM_WAIT_BACKENDS;
 		}
 	}
@@ -4227,6 +4230,16 @@ sigusr1_handler(SIGNAL_ARGS)
 	{
 		/* Startup Process wants us to start the walreceiver process. */
 		WalReceiverPID = StartWalReceiver();
+
+		/*
+		 * Also start the WALWriter to help do most of the fsyncing
+		 * for the WALReceiver. This is unconditional, since it is
+		 * helpful whether or not we are offering a sync replication
+		 * service from this standby. If WALReceiver is starting for
+		 * second or subsequent time then WALWriter will already exist.
+		 */
+		if (WalWriterPID == 0)
+			WalWriterPID = StartWalWriter();
 	}
 
 	PG_SETMASK(&UnBlockSig);
diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c
index 1debe92..3a6e664 100644
--- a/src/backend/postmaster/walwriter.c
+++ b/src/backend/postmaster/walwriter.c
@@ -49,6 +49,7 @@
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "postmaster/walwriter.h"
+#include "replication/walreceiver.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -90,6 +91,9 @@ WalWriterMain(void)
 	sigjmp_buf	local_sigjmp_buf;
 	MemoryContext walwriter_context;
 
+	elog(DEBUG2, "WAL writer starting");
+	WalRcvOwnLatch(AUX_PROC_WAL_WRITER);
+
 	/*
 	 * If possible, make this process a group leader, so that the postmaster
 	 * can signal any child processes too.	(walwriter probably never has any
@@ -241,28 +245,54 @@ WalWriterMain(void)
 		if (shutdown_requested)
 		{
 			/* Normal exit from the walwriter is here */
+			WalRcvDisownLatch();
 			proc_exit(0);		/* done */
 		}
 
 		/*
-		 * Do what we're here for...
-		 */
-		XLogBackgroundFlush();
-
-		/*
-		 * Delay until time to do something more, but fall out of delay
-		 * reasonably quickly if signaled.
+		 * If we startup during recovery it will be because streaming
+		 * replication is enabled, so we expect a WALreceiver process
+		 * to interact with. Should the WALreceiver process terminate
+		 * then we don't have anything to do, so waiting is OK. A new
+		 * WALreceiver may eventually start, in which case we will
+		 * begin working again.
+		 *
+		 * If we startup after recovery then this call will perform
+		 * InitXLOGAccess() for us, so no need to do that before now.
 		 */
-		udelay = WalWriterDelay * 1000L;
-		while (udelay > 999999L)
+		if (RecoveryInProgress())
+		{
+			/*
+			 * Flush the WAL already written by the WAL Receiver or wait.
+			 *
+			 * XXX on shutdown on standby we must flush up to date
+			 * before we close.
+			 */
+			if (!XLogWalRcvBackgroundFlush())
+				WalRcvWaitLatch();
+		}
+		else
 		{
-			if (got_SIGHUP || shutdown_requested)
-				break;
-			pg_usleep(1000000L);
-			udelay -= 1000000L;
+			/*
+			 * Flush the WAL as a background task during normal running.
+			 */
+			XLogBackgroundFlush();
+
+			/*
+			 * Delay until time to do something more, but fall out of delay
+			 * reasonably quickly if signaled.
+			 */
+			udelay = WalWriterDelay * 1000L;
+			while (udelay > 999999L)
+			{
+				if (got_SIGHUP || shutdown_requested)
+					break;
+				pg_usleep(1000000L);
+				udelay -= 1000000L;
+			}
+			if (!(got_SIGHUP || shutdown_requested))
+				pg_usleep(udelay);
 		}
-		if (!(got_SIGHUP || shutdown_requested))
-			pg_usleep(udelay);
 	}
 }
 
@@ -309,6 +339,7 @@ static void
 WalSigHupHandler(SIGNAL_ARGS)
 {
 	got_SIGHUP = true;
+	WalRcvWakeup(AUX_PROC_WAL_WRITER);
 }
 
 /* SIGTERM: set flag to exit normally */
@@ -316,4 +347,5 @@ static void
 WalShutdownHandler(SIGNAL_ARGS)
 {
 	shutdown_requested = true;
+	WalRcvWakeup(AUX_PROC_WAL_WRITER);
 }
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index 113dc3f..b2b8bcc 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/replication
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = walsender.o walreceiverfuncs.o walreceiver.o
+OBJS = walsender.o walreceiverfuncs.o walreceiver.o syncrep.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 38a96c6..7349d3b 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -47,9 +47,10 @@ static bool justconnected = false;
 static char *recvBuf = NULL;
 
 /* Prototypes for interface functions */
-static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
+static bool libpqrcv_connect(WalRcvConnOptions *connopts);
 static bool libpqrcv_receive(int timeout, unsigned char *type,
 				 char **buffer, int *len);
+static bool libpqrcv_send(char *buffer, int len);
 static void libpqrcv_disconnect(void);
 
 /* Prototypes for private functions */
@@ -68,14 +69,17 @@ _PG_init(void)
 		elog(ERROR, "libpqwalreceiver already loaded");
 	walrcv_connect = libpqrcv_connect;
 	walrcv_receive = libpqrcv_receive;
+	walrcv_send = libpqrcv_send;
 	walrcv_disconnect = libpqrcv_disconnect;
 }
 
 /*
  * Establish the connection to the primary server for XLOG streaming
+ *
+ * In 9.1 we accept options in a structure, to allow easier extension.
  */
 static bool
-libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
+libpqrcv_connect(WalRcvConnOptions *connopts)
 {
 	char		conninfo_repl[MAXCONNINFO + 37];
 	char	   *primary_sysid;
@@ -92,7 +96,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
 	 */
 	snprintf(conninfo_repl, sizeof(conninfo_repl),
 			 "%s dbname=replication replication=true",
-			 conninfo);
+			 connopts->conninfo);
 
 	streamConn = PQconnectdb(conninfo_repl);
 	if (PQstatus(streamConn) != CONNECTION_OK)
@@ -153,9 +157,20 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
 						primary_tli, standby_tli)));
 	ThisTimeLineID = primary_tli;
 
+	/*
+	 * If we want to offer sync rep then enable duplex copy. If it fails, error out.
+	 */
+	sync_rep_service = connopts->sync_rep_service;
+	if (StandbyOffersSyncRepService())
+	{
+		if (PQsetDuplexCopy(streamConn, connopts->sync_rep_service) < 1)
+			ereport(ERROR,
+					(errmsg("unable to establish duplex copy path")));
+	}
+
 	/* Start streaming from the point requested by startup process */
 	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
-			 startpoint.xlogid, startpoint.xrecoff);
+			 connopts->startpoint.xlogid, connopts->startpoint.xrecoff);
 	res = libpqrcv_PQexec(cmd);
 	if (PQresultStatus(res) != PGRES_COPY_OUT)
 	{
@@ -398,3 +413,25 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
 
 	return true;
 }
+
+/*
+ * Send "len" bytes from "buffer" back to the primary. The user-visible
+ * libpq knows that we sent an 'x' message to the primary server via
+ * PQsetDuplexCopy(), so it will let us send a COPY IN message while
+ * it's still in COPY OUT state.
+ *
+ * Raises ERROR if the data cannot be queued or flushed, else returns true.
+ */
+static bool
+libpqrcv_send(char *buffer, int len)
+{
+	/*
+	 * PQputCopyData() returns 1 only once the data is queued; PQflush()
+	 * returns nonzero if the queued data could not all be sent.
+	 */
+	if (PQputCopyData(streamConn, buffer, len) <= 0 || PQflush(streamConn))
+		ereport(ERROR,
+				(errmsg("could not send data to primary: %s",
+						PQerrorMessage(streamConn))));
+	return true;
+}
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
new file mode 100644
index 0000000..109fc43
--- /dev/null
+++ b/src/backend/replication/syncrep.c
@@ -0,0 +1,437 @@
+/*-------------------------------------------------------------------------
+ *
+ * syncrep.c
+ *
+ * Synchronous replication is new as of PostgreSQL 9.1.
+ *
+ * Transaction commits wait while their commit LSN is confirmed
+ * received, fsynced or applied to a standby, as requested.
+ *
+ * This module contains the code for waiting on the master and also
+ * for the release mechanism. The core streaming replication transport
+ * remains within WALreceiver and WALsender modules. Sync rep requires
+ * that the standby replies to the master. This requires the addition
+ * of a duplex mode to the COPY protocol, allowing a replication
+ * session to communicate in both directions, see PQsetDuplexCopy(),
+ * with some changes to the initial handshake in libpqwalreceiver.c
+ * as well as logic to send up the back channel.
+ *
+ * The essence of this design is that it isolates all logic about
+ * waiting/releasing onto the master. The master is aware of which
+ * standby servers offer a synchronisation service. The standby is
+ * completely unaware of the durability requirements of transactions
+ * on the master, reducing the complexity of the code and streamlining
+ * both standby operations and network bandwidth because there is no
+ * requirement to ship per-transaction state information.
+ *
+ * None of the code in this module executes on any of the standbys.
+ *
+ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <unistd.h>
+
+#include "access/xact.h"
+#include "access/xlog_internal.h"
+#include "miscadmin.h"
+#include "replication/syncrep.h"
+#include "replication/walsender.h"
+#include "storage/latch.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "utils/guc.h"
+#include "utils/guc_tables.h"
+#include "utils/memutils.h"
+
+LogstreamState SyncRepReplyFromStandby;	/* read by SyncRepReleaseWaiters() */
+
+/* User-settable parameters for sync rep */
+int			sync_rep_mode = SYNC_REP_ASYNC;		/* Only set in user backends */
+int			sync_rep_timeout = 30000;			/* Only set in user backends */
+int			sync_rep_service = SYNC_REP_ASYNC;	/* Not set in user backends */
+
+#define SYNC_REP_NOT_ON_QUEUE 	-1
+#define	IsOnSyncRepQueue()		(current_queue > SYNC_REP_NOT_ON_QUEUE)
+/*
+ * Queue this backend currently waits on, or SYNC_REP_NOT_ON_QUEUE if none.
+ */
+static int current_queue = SYNC_REP_NOT_ON_QUEUE;
+
+const struct config_enum_entry sync_rep_options[] = {
+	{"recv",  SYNC_REP_RECV,  false},
+	{"fsync", SYNC_REP_FSYNC, false},
+	{"apply", SYNC_REP_APPLY, false},
+	{"async", SYNC_REP_ASYNC, false},
+	{NULL, 0, false}
+};
+
+static void SyncRepWaitOnQueue(XLogRecPtr XactCommitLSN, int qid);
+static void SyncRepRemoveFromQueue(void);
+static void SyncRepAddToQueue(SyncRepQueue *queue);
+static int SyncRepGetSyncMode(int sync_rep_mode);
+static int SyncRepGetWaitTimeout(void);
+
+static void SyncRepWakeFromQueue(int wait_queue, XLogRecPtr lsn);
+
+
+/*
+ * ===========================================================
+ * Synchronous Replication functions for normal user backends
+ * ===========================================================
+ */
+
+/*
+ * Wait for synchronous replication, if any requested by user.
+ *
+ * XactCommitLSN is the WAL position that must be confirmed; returns at once
+ * when no wait applies.
+ */
+void
+SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
+{
+	int	sync_mode;
+
+	/*
+	 * Fast exit if user has requested async replication, or
+	 * streaming replication is inactive in this server.
+	 */
+	if (max_wal_senders == 0 || sync_rep_mode == SYNC_REP_ASYNC)
+		return;
+
+	/* We must not sleep holding an LWLock; normal locks are no problem. */
+	Assert(LWNumLocksHeldByMe() == 0);
+	Assert(sync_rep_mode == SYNC_REP_RECV ||
+			sync_rep_mode == SYNC_REP_FSYNC ||
+			sync_rep_mode == SYNC_REP_APPLY);
+
+	/*
+	 * Assign a wait mode to this request, possibly downgrading the user's
+	 * requested level if no wal sender is active that offers it.
+	 */
+	sync_mode = SyncRepGetSyncMode(sync_rep_mode);
+
+	/*
+	 * A downgrade all the way to async means there is nothing to wait for:
+	 * SyncRepReleaseWaiters() only services the recv/fsync/apply queues.
+	 */
+	if (sync_mode == SYNC_REP_ASYNC)
+		return;
+
+	SyncRepWaitOnQueue(XactCommitLSN, sync_mode);
+}
+
+/*
+ * Wait for specified LSN to be confirmed at the requested level
+ * of durability. Each proc has its own wait latch, so we perform
+ * a normal latch check/wait loop here. qid picks the queue to wait on;
+ * its spinlock is released on every path before we sleep or return.
+ */
+static void
+SyncRepWaitOnQueue(XLogRecPtr XactCommitLSN, int qid)
+{
+	volatile WalSndCtlData *walsndctl = WalSndCtl;
+	SyncRepQueue *queue = (SyncRepQueue *) &(walsndctl->sync_rep_queue[qid]);
+
+	for (;;)
+	{
+		ResetLatch(&MyProc->waitLatch);
+
+		/*
+		 * Check the LSN on our queue. First time through this is
+		 * unlikely to be far enough, but next time we are woken we
+		 * will hopefully be successful.
+		 */
+		SpinLockAcquire(&queue->mutex);
+
+		if (XLByteLE(XactCommitLSN, queue->LSN))
+		{
+			/*
+			 * Our LSN is confirmed on at least one standby, so lets get
+			 * back to work, leaving the queue if an earlier pass joined it.
+			 */
+			if (IsOnSyncRepQueue())
+				SyncRepRemoveFromQueue();
+			SpinLockRelease(&queue->mutex);
+
+			/* XXX Do we need to unset the latch? */
+			return;
+		}
+
+		if (!IsOnSyncRepQueue())
+		{
+			SyncRepAddToQueue(queue);
+			current_queue = qid; /* Remember which queue we're on */
+			/* Set our waitLSN so WALSender will know when to wake us */
+			MyProc->waitLSN = XactCommitLSN;
+		}
+
+		/* Already queued or not, never sleep holding the spinlock. */
+		SpinLockRelease(&queue->mutex);
+
+		WaitLatch(&MyProc->waitLatch, 1000000L * SyncRepGetWaitTimeout());
+
+#ifdef NOT_USED
+		/* Retest timeout, if so do this */
+		{
+			/*
+			 * Our response to the timeout is to simply post a NOTICE and
+			 * then return to the user. The commit has happened, we just
+			 * haven't been able to verify it has been replicated to the
+			 * level requested.
+			 *
+			 * XXX We could check here to see if our LSN has been sent to
+			 * another standby that offers a lower level of service. That
+			 * could be true if we had, for example, requested 'apply'
+			 * with two standbys, one at 'apply' and one at 'recv' and the
+			 * apply standby has just gone down. Something for the weekend.
+			 */
+			ereport(NOTICE, (errmsg("synchronous replication timeout reached")));
+			break;
+		}
+#endif
+	}
+}
+
+/*
+ * Remove myself from sync rep wait queue.
+ *
+ * Assume on queue at start; will not be on queue at end.
+ * Queue is already locked at start and remains locked on exit.
+ *
+ * The queue is a singly linked list threaded through PGPROC.lwWaitLink,
+ * with head and tail pointers kept in the SyncRepQueue. Besides unlinking
+ * ourselves we clear our own lwWaitLink, because SyncRepAddToQueue()
+ * asserts that it is NULL when we next join a queue.
+ *
+ * XXX Implements design pattern "Reinvent Wheel", think about changing
+ */
+static void
+SyncRepRemoveFromQueue(void)
+{
+	volatile WalSndCtlData *walsndctl = WalSndCtl;
+	SyncRepQueue *queue;
+	PGPROC	*prev;
+
+	Assert(IsOnSyncRepQueue());
+	queue = (SyncRepQueue *) &(walsndctl->sync_rep_queue[current_queue]);
+
+	if (queue->head == MyProc)
+	{
+		/*
+		 * We are first in line, so head simply moves to whoever follows
+		 * us, which is nobody if we were the only waiter on the queue.
+		 */
+		queue->head = MyProc->lwWaitLink;
+		if (queue->head == NULL)
+		{
+			/* Queue is now empty. Reset tail as well. */
+			Assert(queue->tail == MyProc);
+			queue->tail = NULL;
+		}
+	}
+	else
+	{
+		/*
+		 * Walk the queue looking for the proc that links to us. Stop
+		 * there, so that nothing past our own position is disturbed.
+		 */
+		prev = queue->head;
+		while (prev != NULL && prev->lwWaitLink != MyProc)
+			prev = prev->lwWaitLink;
+
+		/* Running off the end means we were never on this queue. */
+		if (prev == NULL)
+			elog(FATAL, "could not locate ourselves on wait queue");
+
+		/*
+		 * Unlink ourselves. If we were at the tail of the queue then our
+		 * predecessor becomes the new tail; otherwise there is no need
+		 * to touch head or tail.
+		 */
+		prev->lwWaitLink = MyProc->lwWaitLink;
+		if (MyProc->lwWaitLink == NULL)
+		{
+			Assert(queue->tail == MyProc);
+			queue->tail = prev;
+		}
+	}
+
+	MyProc->lwWaitLink = NULL;
+	current_queue = SYNC_REP_NOT_ON_QUEUE;
+}
+
+/*
+ * Add myself to sync rep wait queue.
+ *
+ * Preconditions: we are not on any queue, and the caller has locked this
+ * one. On return the lock is still held and we are the queue's tail.
+ * New waiters are always appended; see the XXX about ordering below.
+ */
+static void
+SyncRepAddToQueue(SyncRepQueue *queue)
+{
+	PGPROC	*last = queue->tail;
+
+	if (last != NULL)
+	{
+		/*
+		 * XXX extra code needed here to maintain sorted invariant.
+		 * Our approach should be same as racing car - slow in, fast out.
+		 */
+		Assert(last->lwWaitLink == NULL);
+		last->lwWaitLink = MyProc;
+	}
+	else
+	{
+		/* Queue was empty, so we become its head as well. */
+		queue->head = MyProc;
+	}
+
+	/* Either way we are now the last entry. */
+	queue->tail = MyProc;
+	Assert(MyProc->lwWaitLink == NULL);	/* to be sure */
+}
+
+/*
+ * Decide which service level a commit asking for sync_rep_mode can wait for.
+ *
+ * Returns sync_rep_mode if an active wal sender offers at least that level,
+ * else the highest level offered by any of them, starting from
+ * SYNC_REP_ASYNC when none offers anything.
+ */
+static int
+SyncRepGetSyncMode(int sync_rep_mode)
+{
+	int i;
+	int max_sync_rep_mode = SYNC_REP_ASYNC;
+
+	/* Is there a walsender that offers the requested level of service? */
+	for (i = 0; i < max_wal_senders; i++)
+	{
+		/* use volatile pointer to prevent code rearrangement */
+		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+		int		offered;
+
+		SpinLockAcquire(&walsnd->mutex);
+
+		/* pid == 0 marks a slot no wal sender is using, so skip it. */
+		if (walsnd->pid == 0)
+		{
+			SpinLockRelease(&walsnd->mutex);
+			continue;
+		}
+
+		offered = walsnd->sync_rep_service;
+		SpinLockRelease(&walsnd->mutex);
+
+		/*
+		 * Found an active wal sender. If it has the service level
+		 * we require, return quickly.
+		 */
+		if (offered >= sync_rep_mode)
+			return sync_rep_mode;
+
+		if (offered >= max_sync_rep_mode)
+			max_sync_rep_mode = offered;
+	}
+
+	/* Accept a reduced service level: the highest any wal sender offers. */
+	return max_sync_rep_mode;
+}
+
+/* Wait timeout hook: allows more complex decision making later on. */
+static int
+SyncRepGetWaitTimeout(void)
+{
+	int		timeout = sync_rep_timeout;	/* currently just the setting */
+
+	return timeout;
+}
+
+void
+SyncRepCleanupAtProcExit(int code, Datum arg)
+{
+	/*
+	 * Proc-exit cleanup (code and arg are unused). Removing ourselves from
+	 * sync_rep_queue[current_queue] is disabled for now:
+	 *	if (IsOnSyncRepQueue())
+	 *	{
+	 *		SpinLockAcquire(&queue->mutex);
+	 *		SyncRepRemoveFromQueue();
+	 *		SpinLockRelease(&queue->mutex);
+	 *	}
+	 */
+
+	/* Nothing to hand back unless we exist and own our wait latch. */
+	if (MyProc == NULL || !MyProc->ownLatch)
+		return;
+	DisownLatch(&MyProc->waitLatch);
+	MyProc->ownLatch = false;
+}
+
+/*
+ * ===========================================================
+ * Synchronous Replication functions for wal sender processes
+ * ===========================================================
+ */
+
+/*
+ * Update the LSNs on each queue based upon our latest state. This
+ * implements a simple policy of first-valid-standby-releases-waiter.
+ *
+ * Other policies are possible, which would change what we do here and
+ * perhaps also which information we store.
+ */
+void
+SyncRepReleaseWaiters(void)
+{
+	volatile WalSndCtlData *walsndctl = WalSndCtl;
+	XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODES];
+	int mode;
+
+	lsn[SYNC_REP_RECV]  = SyncRepReplyFromStandby.Write;
+	lsn[SYNC_REP_FSYNC] = SyncRepReplyFromStandby.Flush;
+	lsn[SYNC_REP_APPLY] = SyncRepReplyFromStandby.Apply;
+
+	/*
+	 * Only maintain LSNs for the queues we advertise a service for (every
+	 * mode up to and including our own), so users are only woken when a
+	 * preferred standby has reached the required LSN.
+	 */
+	for (mode = SYNC_REP_RECV; mode <= sync_rep_service; mode++)
+	{
+		SyncRepQueue *queue = (SyncRepQueue *) &(walsndctl->sync_rep_queue[mode]);
+		SpinLockAcquire(&queue->mutex);
+		queue->LSN = lsn[mode];
+		SyncRepWakeFromQueue(mode, lsn[mode]);
+		SpinLockRelease(&queue->mutex);
+	}
+}
+
+/*
+ * Walk queue from head setting the latches of any procs that need
+ * to be woken. We don't modify the queue, we leave that for individual
+ * procs to release themselves.
+ *
+ * The whole queue is walked since nothing keeps it sorted by waitLSN.
+ * Caller is expected to hold the queue's mutex.
+ */
+static void
+SyncRepWakeFromQueue(int wait_queue, XLogRecPtr lsn)
+{
+	volatile WalSndCtlData *walsndctl = WalSndCtl;
+	PGPROC	*proc;
+
+	for (proc = walsndctl->sync_rep_queue[wait_queue].head; proc != NULL;
+		 proc = proc->lwWaitLink)
+	{
+		if (XLByteLE(proc->waitLSN, lsn))
+			SetLatch(&proc->waitLatch);
+	}
+}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b868707..8dc48f8 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -57,9 +57,13 @@ bool		am_walreceiver;
 /* libpqreceiver hooks to these when loaded */
 walrcv_connect_type walrcv_connect = NULL;
 walrcv_receive_type walrcv_receive = NULL;
+walrcv_send_type walrcv_send = NULL;
 walrcv_disconnect_type walrcv_disconnect = NULL;
 
-#define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */
+/* Latch handling */
+static int my_auxproc_id = -1;
+
+#define NAPTIME_PER_CYCLE 100000L	/* max sleep time between cycles (100ms) */
 
 /*
  * These variables are used similarly to openLogFile/Id/Seg/Off,
@@ -81,11 +85,8 @@ static volatile sig_atomic_t got_SIGTERM = false;
  * LogstreamResult indicates the byte positions that we have already
  * written/fsynced.
  */
-static struct
-{
-	XLogRecPtr	Write;			/* last byte + 1 written out in the standby */
-	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
-}	LogstreamResult;
+static LogstreamState LogstreamResult;
+
 
 /*
  * About SIGTERM handling:
@@ -113,6 +114,7 @@ static void WalRcvDie(int code, Datum arg);
 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(void);
+static bool XLogWalRcvSendReply(void);
 
 /* Signal handlers */
 static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -157,8 +159,7 @@ DisableWalRcvImmediateExit(void)
 void
 WalReceiverMain(void)
 {
-	char		conninfo[MAXCONNINFO];
-	XLogRecPtr	startpoint;
+	WalRcvConnOptions connopts;
 
 	/* use volatile pointer to prevent code rearrangement */
 	volatile WalRcvData *walrcv = WalRcv;
@@ -203,10 +204,13 @@ WalReceiverMain(void)
 	/* Advertise our PID so that the startup process can kill us */
 	walrcv->pid = MyProcPid;
 	walrcv->walRcvState = WALRCV_RUNNING;
+	elog(DEBUG2, "WALreceiver starting");
+	WalRcvOwnLatch(AUX_PROC_WAL_RECEIVER); /* Run before signals enabled */
 
 	/* Fetch information required to start streaming */
-	strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
-	startpoint = walrcv->receivedUpto;
+	strlcpy(connopts.conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+	connopts.startpoint = walrcv->receivedUpto;
+	connopts.sync_rep_service = walrcv->sync_rep_service;
 	SpinLockRelease(&walrcv->mutex);
 
 	/* Arrange to clean up at walreceiver exit */
@@ -247,7 +251,7 @@ WalReceiverMain(void)
 	/* Load the libpq-specific functions */
 	load_file("libpqwalreceiver", false);
 	if (walrcv_connect == NULL || walrcv_receive == NULL ||
-		walrcv_disconnect == NULL)
+		walrcv_send == NULL || walrcv_disconnect == NULL)
 		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
 
 	/*
@@ -261,7 +265,7 @@ WalReceiverMain(void)
 
 	/* Establish the connection to the primary for XLOG streaming */
 	EnableWalRcvImmediateExit();
-	walrcv_connect(conninfo, startpoint);
+	walrcv_connect(&connopts);
 	DisableWalRcvImmediateExit();
 
 	/* Loop until end-of-streaming or error */
@@ -298,18 +302,31 @@ WalReceiverMain(void)
 		/* Wait a while for data to arrive */
 		if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
 		{
+			bool received_all = false;
+			bool applied_all = false;
+
 			/* Accept the received data, and process it */
 			XLogWalRcvProcessMsg(type, buf, len);
-
-			/* Receive any more data we can without sleeping */
-			while (walrcv_receive(0, &type, &buf, &len))
-				XLogWalRcvProcessMsg(type, buf, len);
-
-			/*
-			 * If we've written some records, flush them to disk and let the
-			 * startup process know about them.
-			 */
-			XLogWalRcvFlush();
+			LogstreamResult.Apply = GetXLogReplayRecPtr();
+			if (XLogWalRcvSendReply())
+			{
+				while (!received_all || !applied_all)
+				{
+					/* Receive any more data we can without sleeping */
+					if (walrcv_receive(0, &type, &buf, &len))
+						XLogWalRcvProcessMsg(type, buf, len);
+					else
+						received_all = true;
+
+					/* Keep replying until we have applied all outstanding WAL */
+					LogstreamResult.Apply = GetXLogReplayRecPtr();
+					if (XLByteEQ(LogstreamResult.Apply, LogstreamResult.Write))
+						applied_all = true;
+
+					if (!XLogWalRcvSendReply())
+						break;
+				}
+			}
 		}
 	}
 }
@@ -330,6 +347,8 @@ WalRcvDie(int code, Datum arg)
 	walrcv->pid = 0;
 	SpinLockRelease(&walrcv->mutex);
 
+	WalRcvDisownLatch();
+
 	/* Terminate the connection gracefully. */
 	if (walrcv_disconnect != NULL)
 		walrcv_disconnect();
@@ -340,6 +359,7 @@ static void
 WalRcvSigHupHandler(SIGNAL_ARGS)
 {
 	got_SIGHUP = true;
+	WalRcvWakeup(AUX_PROC_WAL_RECEIVER);
 }
 
 /* SIGTERM: set flag for main loop, or shutdown immediately if safe */
@@ -347,6 +367,7 @@ static void
 WalRcvShutdownHandler(SIGNAL_ARGS)
 {
 	got_SIGTERM = true;
+	WalRcvWakeup(AUX_PROC_WAL_RECEIVER);
 
 	/* Don't joggle the elbow of proc_exit */
 	if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
@@ -407,6 +428,11 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 				len -= sizeof(WalDataMessageHeader);
 
 				XLogWalRcvWrite(buf, len, msghdr.dataStart);
+				/*
+				 * As soon as we wrote WAL to the xlog files wake the
+				 * wal writer to do some work, if any.
+				 */
+				WalRcvWakeup(AUX_PROC_WAL_WRITER);
 				break;
 			}
 		default:
@@ -507,13 +533,39 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 		buf += byteswritten;
 
 		LogstreamResult.Write = recptr;
+
+		{
+			/* use volatile pointer to prevent code rearrangement */
+			volatile WalRcvData *walrcv = WalRcv;
+
+			/*
+			 * Update shared-memory status following write.
+			 * This allows the WALWriter to fsync while WALReceiver continues.
+			 */
+			SpinLockAcquire(&walrcv->mutex);
+			walrcv->writtenUpto = LogstreamResult.Write;
+			SpinLockRelease(&walrcv->mutex);
+		}
 	}
 }
 
-/* Flush the log to disk */
+/* Flush the log to disk - run from WALReceiver */
 static void
 XLogWalRcvFlush(void)
 {
+	{
+		/* use volatile pointer to prevent code rearrangement */
+		volatile WalRcvData *walrcv = WalRcv;
+
+		/*
+		 * Update local status from shared-memory, in case WALWriter
+		 * has already flushed this for us.
+		 */
+		SpinLockAcquire(&walrcv->mutex);
+		LogstreamResult.Flush = walrcv->receivedUpto;
+		SpinLockRelease(&walrcv->mutex);
+	}
+
 	if (XLByteLT(LogstreamResult.Flush, LogstreamResult.Write))
 	{
 		/* use volatile pointer to prevent code rearrangement */
@@ -529,6 +581,16 @@ XLogWalRcvFlush(void)
 		walrcv->receivedUpto = LogstreamResult.Flush;
 		SpinLockRelease(&walrcv->mutex);
 
+		/*
+		 * Wakeup the Startup process, so it can apply new WAL.
+		 * This call is essential and cannot be optimised away.
+		 */
+		WalRcvWakeup(AUX_PROC_STARTUP);
+
+		/*
+		 * Don't wake the walwriter since we just did that work.
+		 */
+
 		/* Report XLOG streaming progress in PS display */
 		if (update_process_title)
 		{
@@ -541,3 +603,201 @@ XLogWalRcvFlush(void)
 		}
 	}
 }
+
+/*
+ * Flush the log to disk in the background - used only by WALWriter
+ *
+ * Reads the write and flush positions published in shared memory. If there
+ * is unflushed WAL, fsyncs the segment file holding the flush position
+ * (opening it first if needed), advances receivedUpto to writtenUpto and
+ * wakes the WALReceiver and Startup processes.
+ *
+ * Returns true if WAL was flushed.
+ */
+bool
+XLogWalRcvBackgroundFlush(void)
+{
+	/* use volatile pointer to prevent code rearrangement */
+	volatile WalRcvData *walrcv = WalRcv;
+
+	for (;;)
+	{
+		char		path[MAXPGPATH];
+		uint32 flushId = 0;
+		uint32 flushSeg = 0;
+
+		/*
+		 * Update local status from shared-memory so we know
+		 * how far WALReceiver has written.
+		 */
+		SpinLockAcquire(&walrcv->mutex);
+		LogstreamResult.Flush = walrcv->receivedUpto;
+		LogstreamResult.Write = walrcv->writtenUpto;
+		SpinLockRelease(&walrcv->mutex);
+
+		/*
+		 * If we are already up to date, leave quickly.
+		 */
+		if (XLByteLE(LogstreamResult.Write, LogstreamResult.Flush))
+			return false;
+
+		XLByteToSeg(LogstreamResult.Flush, flushId, flushSeg);
+
+		if (recvFile >= 0)
+		{
+			/* Same WAL file only if both log id and segment match */
+			if (flushId == recvId && flushSeg == recvSeg)
+			{
+				issue_xlog_fsync(recvFile, recvId, recvSeg);
+				elog(DEBUG4, "writer flushed %X/%X",
+							LogstreamResult.Flush.xlogid, LogstreamResult.Flush.xrecoff);
+
+				LogstreamResult.Flush = LogstreamResult.Write;
+
+				/* Update shared-memory status */
+				SpinLockAcquire(&walrcv->mutex);
+				walrcv->receivedUpto = LogstreamResult.Flush;
+				SpinLockRelease(&walrcv->mutex);
+
+				/*
+				 * Wakeup the WALReceiver, so it can reply.
+				 *
+				 * XXX: We could optimise away this wakeup if
+				 * sync_rep_service = async or recv but we don't know
+				 * about that at present in walwriter.
+				 */
+				WalRcvWakeup(AUX_PROC_WAL_RECEIVER);
+
+				/*
+				 * Wakeup the Startup process, so it can apply new WAL.
+				 * This call is essential and cannot be optimised away.
+				 */
+				WalRcvWakeup(AUX_PROC_STARTUP);
+
+				/* Report XLOG streaming progress in PS display */
+				if (update_process_title)
+				{
+					char		activitymsg[50];
+
+					snprintf(activitymsg, sizeof(activitymsg), "flushed to %X/%X",
+							 LogstreamResult.Flush.xlogid,
+							 LogstreamResult.Flush.xrecoff);
+					set_ps_display(activitymsg, false);
+				}
+				return true;
+			}
+			else
+			{
+				/*
+				 * XLOG segment files will be re-read by recovery in startup
+				 * process soon, so we don't advise the OS to release cache
+				 * pages associated with the file like XLogFileClose() does.
+				 */
+				if (close(recvFile) != 0)
+					ereport(PANIC,
+							(errcode_for_file_access(),
+						errmsg("could not close log file %u, segment %u: %m",
+							   recvId, recvSeg)));
+				recvFile = -1;
+			}
+		}
+
+		/*
+		 * Open the next WAL file.
+		 */
+		XLogFilePath(path, GetRecoveryTargetTLI(), flushId, flushSeg);
+		recvFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
+		if (recvFile < 0)
+			ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not open file \"%s\": %m", path)));
+
+		recvId = flushId;
+		recvSeg = flushSeg;
+	}
+}
+
+/*
+ * Report the standby's current LogstreamResult to the master.
+ *
+ * Nothing is sent, and true is returned, when StandbyOffersSyncRepService()
+ * is false. Otherwise the result of walrcv_send() is returned, so false
+ * means the send failed. The reply is just a snapshot of standby state, so
+ * the standby keeps no per-transaction bookkeeping for the master.
+ */
+static bool
+XLogWalRcvSendReply(void)
+{
+	if (StandbyOffersSyncRepService())
+		return walrcv_send((char *) &LogstreamResult, sizeof(LogstreamState));
+
+	return true;
+}
+
+/*
+ * Latch handling for WALReceiver, WALWriter and Startup processes,
+ * which are referred to here as replication auxiliary procs.
+ * Those processes work together when we are in streaming replication
+ * only, so it's OK to assume they all exist for processing loops etc.
+ * The WALRcv data structure exists in shared memory whether or not
+ * a WALReceiver actually exists currently. If it doesn't then we
+ * avoid waiting for it.
+ */
+
+/* Set the shared latch of recovery aux proc "auxproc" (an AUX_PROC_* id) */
+void
+WalRcvWakeup(int auxproc)
+{
+	/* use volatile pointer to prevent code rearrangement */
+	volatile WalRcvData *walrcv = WalRcv;
+
+	Assert(auxproc == AUX_PROC_WAL_RECEIVER	||
+			auxproc == AUX_PROC_WAL_WRITER  ||
+			auxproc == AUX_PROC_STARTUP);
+
+	SetLatch((Latch *) &walrcv->latch[auxproc]);
+}
+
+/*
+ * Wait on this process's latch, previously claimed by WalRcvOwnLatch()
+ */
+void
+WalRcvWaitLatch(void)
+{
+	volatile WalRcvData *walrcv = WalRcv;
+
+	Assert(my_auxproc_id >= 0);		/* latch slot must have been claimed */
+	WaitLatch((Latch *) &walrcv->latch[my_auxproc_id], NAPTIME_PER_CYCLE);
+}
+
+/* Claim the shared latch slot "auxproc" for this process, at most once */
+void
+WalRcvOwnLatch(int auxproc)
+{
+	volatile WalRcvData *shared = WalRcv;
+
+	Assert(auxproc == AUX_PROC_WAL_RECEIVER ||
+		   auxproc == AUX_PROC_WAL_WRITER ||
+		   auxproc == AUX_PROC_STARTUP);
+
+	/*
+	 * This can be reached several times while wal receiver starts, so
+	 * ignore the request once my_auxproc_id records a claimed latch.
+	 */
+	if (my_auxproc_id >= 0)
+		return;
+
+	elog(DEBUG2, "OwnLatch %d", auxproc);
+	OwnLatch((Latch *) &shared->latch[auxproc]);
+	my_auxproc_id = auxproc;
+}
+
+void
+WalRcvDisownLatch(void)
+{
+	/* Nothing to release unless WalRcvOwnLatch() recorded a latch for us */
+	if (my_auxproc_id < 0)
+		return;
+
+	DisownLatch(&WalRcv->latch[my_auxproc_id]);
+}
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index b206885..26ec086 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -60,10 +60,14 @@ WalRcvShmemInit(void)
 
 	if (!found)
 	{
+		int i;
+
 		/* First time through, so initialize */
 		MemSet(WalRcv, 0, WalRcvShmemSize());
 		WalRcv->walRcvState = WALRCV_STOPPED;
 		SpinLockInit(&WalRcv->mutex);
+		for (i = 0; i < NUM_RECOVERY_AUX_PROCS; i++)
+			InitSharedLatch(&WalRcv->latch[i]);
 	}
 }
 
@@ -172,11 +176,13 @@ ShutdownWalRcv(void)
  * is a libpq connection string to use.
  */
 void
-RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
+RequestXLogStreaming(WalRcvConnOptions *connopts)
 {
 	/* use volatile pointer to prevent code rearrangement */
 	volatile WalRcvData *walrcv = WalRcv;
 	pg_time_t	now = (pg_time_t) time(NULL);
+	XLogRecPtr recptr = connopts->startpoint;
+	char *conninfo = connopts->conninfo;
 
 	/*
 	 * We always start at the beginning of the segment. That prevents a broken
@@ -202,8 +208,14 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
 	walrcv->receivedUpto = recptr;
 	walrcv->latestChunkStart = recptr;
 
+	walrcv->sync_rep_service = connopts->sync_rep_service;
+
 	SpinLockRelease(&walrcv->mutex);
 
+	/*
+	 * Note that we always start the WALwriter as well, so there is
+	 * no need to let postmaster know the exact sync_rep_service level.
+	 */
 	SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
 }
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 793ac33..8beffab 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -82,6 +82,8 @@ static uint32 sendOff = 0;
  */
 static XLogRecPtr sentPtr = {0, 0};
 
+static StringInfoData		reply;
+
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t shutdown_requested = false;
@@ -101,7 +103,8 @@ static void WalSndHandshake(void);
 static void WalSndKill(int code, Datum arg);
 static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
 static bool XLogSend(char *msgbuf, bool *caughtup);
-static void CheckClosedConnection(void);
+static void WalSndReceiveReply(void);
+static void CheckMsgFromStandby(void);
 
 
 /* Main entry point for walsender process */
@@ -167,6 +170,7 @@ WalSndHandshake(void)
 	bool		replication_started = false;
 
 	initStringInfo(&input_message);
+	initStringInfo(&reply);
 
 	while (!replication_started)
 	{
@@ -312,6 +316,20 @@ WalSndHandshake(void)
 					break;
 				}
 
+			case 'x':
+				/*
+				 * The other side of the wire indicates that it will send us
+				 * the current LSNs and that synchronous replication service is
+				 * available to backends that request it.
+				 */
+				MyWalSnd->sync_rep_service = pq_getmsgint(&input_message, 4);
+				elog(LOG, "standby server offers synchronous replication service up to mode %d",
+								MyWalSnd->sync_rep_service);
+
+				EndCommand("SYNC", DestRemote);
+				ReadyForQuery(DestRemote);
+				break;
+
 			case 'X':
 				/* standby is closing the connection */
 				proc_exit(0);
@@ -332,10 +350,63 @@ WalSndHandshake(void)
 }
 
 /*
+ * Receive reply message from standby and advance SyncRepReplyFromStandby.
+ */
+static void
+WalSndReceiveReply(void)
+{
+	XLogRecPtr	recptr;
+	bool		changed = false;	/* set when any position moves forward */
+
+	if (pq_getmessage(&reply, 4 + sizeof(LogstreamState)))
+		ereport(COMMERROR,
+						(errcode(ERRCODE_PROTOCOL_VIOLATION),
+						 errmsg("unexpected EOF on standby connection")));
+
+	if (reply.len != sizeof(LogstreamState))
+		ereport(COMMERROR,
+						(errcode(ERRCODE_PROTOCOL_VIOLATION),
+						 errmsg("invalid message length on standby connection")));
+
+	recptr.xlogid  = pq_getmsgint(&reply, 4);
+	recptr.xrecoff = pq_getmsgint(&reply, 4);
+	if (XLByteLT(SyncRepReplyFromStandby.Write, recptr))
+	{
+		changed = true;
+		SyncRepReplyFromStandby.Write = recptr;
+	}
+
+	recptr.xlogid  = pq_getmsgint(&reply, 4);
+	recptr.xrecoff = pq_getmsgint(&reply, 4);
+	if (XLByteLT(SyncRepReplyFromStandby.Flush, recptr))
+	{
+		changed = true;
+		SyncRepReplyFromStandby.Flush = recptr;
+	}
+
+	recptr.xlogid  = pq_getmsgint(&reply, 4);
+	recptr.xrecoff = pq_getmsgint(&reply, 4);
+	if (XLByteLT(SyncRepReplyFromStandby.Apply, recptr))
+	{
+		changed = true;
+		SyncRepReplyFromStandby.Apply = recptr;
+	}
+
+	if (changed)
+		elog(LOG, "sync rep state: recv = %X/%X fsync = %X/%X apply = %X/%X",
+					SyncRepReplyFromStandby.Write.xlogid,
+					SyncRepReplyFromStandby.Write.xrecoff,
+					SyncRepReplyFromStandby.Flush.xlogid,
+					SyncRepReplyFromStandby.Flush.xrecoff,
+					SyncRepReplyFromStandby.Apply.xlogid,
+					SyncRepReplyFromStandby.Apply.xrecoff);
+}
+
+/*
  * Check if the remote end has closed the connection.
  */
 static void
-CheckClosedConnection(void)
+CheckMsgFromStandby(void)
 {
 	unsigned char firstchar;
 	int			r;
@@ -364,6 +435,15 @@ CheckClosedConnection(void)
 		case 'X':
 			proc_exit(0);
 
+			/*
+			 * 'd' means the standby is sending us its current write, flush
+			 * and apply positions in a COPY IN packet.
+			 */
+		case 'd':
+			WalSndReceiveReply();
+			SyncRepReleaseWaiters();
+			break;
+
 		default:
 			ereport(FATAL,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -425,6 +505,8 @@ WalSndLoop(void)
 			proc_exit(0);
 		}
 
+		CheckMsgFromStandby();
+
 		/*
 		 * If we had sent all accumulated WAL in last round, nap for the
 		 * configured time before retrying.
@@ -449,14 +531,12 @@ WalSndLoop(void)
 				 * WaitLatchOrSocket should reliably wake up as soon as
 				 * something interesting happens.
 				 */
+				CheckMsgFromStandby();
 
 				/* Sleep */
 				WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
 								  WalSndDelay);
 			}
-
-			/* Check if the connection was closed */
-			CheckClosedConnection();
 		}
 		else
 		{
@@ -516,9 +596,10 @@ InitWalSnd(void)
 			 */
 			OwnLatch((Latch *) &walsnd->latch);
 			walsnd->pid = MyProcPid;
 			MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
+			walsnd->sync_rep_service = SYNC_REP_ASYNC;
 			/* Set MyWalSnd only after it's fully initialized. */
 			MyWalSnd = (WalSnd *) walsnd;
 			SpinLockRelease(&walsnd->mutex);
 			break;
 		}
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 3a5a25b..8357b45 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -697,3 +697,12 @@ LWLockHeldByMe(LWLockId lockid)
 	}
 	return false;
 }
+
+/*
+ * Return num_held_lwlocks; lets callers Assert(LWNumLocksHeldByMe() == 0).
+ */
+int
+LWNumLocksHeldByMe(void)
+{
+	return num_held_lwlocks;
+}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 14c8baf..df133ec 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -39,6 +39,7 @@
 #include "access/xact.h"
 #include "miscadmin.h"
 #include "postmaster/autovacuum.h"
+#include "replication/syncrep.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/pmsignal.h"
@@ -196,6 +197,7 @@ InitProcGlobal(void)
 		PGSemaphoreCreate(&(procs[i].sem));
 		procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs;
 		ProcGlobal->freeProcs = &procs[i];
+		InitSharedLatch(&procs[i].waitLatch);
 	}
 
 	/*
@@ -326,6 +328,13 @@ InitProcess(void)
 		SHMQueueInit(&(MyProc->myProcLocks[i]));
 	MyProc->recoveryConflictPending = false;
 
+	/* Initialise the waitLSN for sync rep */
+	MyProc->waitLSN.xlogid = 0;
+	MyProc->waitLSN.xrecoff = 0;
+
+	OwnLatch((Latch *) &MyProc->waitLatch);
+	MyProc->ownLatch = true;
+
 	/*
 	 * We might be reusing a semaphore that belonged to a failed process. So
 	 * be careful and reinitialize its value here.	(This is not strictly
@@ -365,6 +374,7 @@ InitProcessPhase2(void)
 	/*
 	 * Arrange to clean that up at backend exit.
 	 */
+	on_shmem_exit(SyncRepCleanupAtProcExit, 0);
 	on_shmem_exit(RemoveProcFromArray, 0);
 }
 
@@ -1604,7 +1614,6 @@ handle_sig_alarm(SIGNAL_ARGS)
 
 	if (statement_timeout_active)
 		(void) CheckStatementTimeout();
-
 	errno = save_errno;
 }
 
@@ -1784,3 +1793,4 @@ handle_standby_sig_alarm(SIGNAL_ARGS)
 
 	errno = save_errno;
 }
+
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 0048899..8c20c31 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -55,6 +55,7 @@
 #include "postmaster/postmaster.h"
 #include "postmaster/syslogger.h"
 #include "postmaster/walwriter.h"
+#include "replication/syncrep.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
 #include "storage/standby.h"
@@ -352,6 +353,7 @@ static const struct config_enum_entry constraint_exclusion_options[] = {
  */
 extern const struct config_enum_entry wal_level_options[];
 extern const struct config_enum_entry sync_method_options[];
+extern const struct config_enum_entry sync_rep_options[];
 
 /*
  * GUC option variables that are exported from this module
@@ -1457,6 +1459,16 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"synchronous_replication_timeout", PGC_USERSET, WAL_SETTINGS,
+			gettext_noop("If synchronous_replication waits, it will wait no more than this timeout."),
+			NULL,
+			GUC_UNIT_MS
+		},
+		&sync_rep_timeout,
+		30000, 0, INT_MAX / 2, NULL, NULL
+	},
+
+	{
 		{"temp_buffers", PGC_USERSET, RESOURCES_MEM,
 			gettext_noop("Sets the maximum number of temporary buffers used by each session."),
 			NULL,
@@ -2861,6 +2873,15 @@ static struct config_enum ConfigureNamesEnum[] =
 	},
 
 	{
+		{"synchronous_replication", PGC_USERSET, WAL_SETTINGS,
+			gettext_noop("Set the wait mode for synchronous replication."),
+			NULL
+		},
+		&sync_rep_mode,
+		SYNC_REP_ASYNC, sync_rep_options, NULL
+	},
+
+	{
 		{"wal_sync_method", PGC_SIGHUP, WAL_SETTINGS,
 			gettext_noop("Selects the method used for forcing WAL updates to disk."),
 			NULL
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 7d667d5..e119a9b 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -283,6 +283,7 @@ extern void issue_xlog_fsync(int fd, uint32 log, uint32 seg);
 extern bool RecoveryInProgress(void);
 extern bool XLogInsertAllowed(void);
 extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
+extern XLogRecPtr GetXLogReplayRecPtr(void);
 
 extern void UpdateControlFile(void);
 extern uint64 GetSystemIdentifier(void);
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 2c775c1..f7cc3af 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -244,6 +244,11 @@ extern void PreventCommandDuringRecovery(const char *cmdname);
 extern int	trace_recovery_messages;
 extern int	trace_recovery(int trace_level);
 
+/* in storage/ipc/ipc.c */
+extern void ResetProgressiveSleep(void);
+extern void ProgressiveSleep(long target_delay);
+
+
 /*****************************************************************************
  *	  pdir.h --																 *
  *			POSTGRES directory path definitions.							 *
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
new file mode 100644
index 0000000..5f75c77
--- /dev/null
+++ b/src/include/replication/syncrep.h
@@ -0,0 +1,74 @@
+/*-------------------------------------------------------------------------
+ *
+ * syncrep.h
+ *	  Exports from replication/syncrep.c.
+ *
+ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+ *
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _SYNCREP_H
+#define _SYNCREP_H
+
+#include "access/xlog.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+
+/* Sync rep wait modes; the non-negative values double as array offsets */
+typedef enum SyncRepWaitMode
+{
+	SYNC_REP_ASYNC = -1,	/* Never any data to represent, so invalid offset */
+	SYNC_REP_RECV,			/* First element in any arrays */
+	SYNC_REP_FSYNC,
+	SYNC_REP_APPLY
+} SyncRepWaitMode;
+
+/*
+ * There is no reply from standby to master for async mode, so the reply
+ * message needs one less slot than the maximum number of modes
+ */
+#define NUM_SYNC_REP_WAIT_MODES		3
+/* True if the standby offers some sync rep service level above async */
+#define StandbyOffersSyncRepService() (sync_rep_service > SYNC_REP_ASYNC)
+
+typedef struct LogstreamState
+{
+	XLogRecPtr	Write;			/* last byte + 1 written out in the standby */
+	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
+	XLogRecPtr	Apply;			/* last byte + 1 applied on the standby */
+}	LogstreamState;
+
+extern LogstreamState SyncRepReplyFromStandby;
+
+/*
+ * Each synchronous rep wait mode has one SyncRepQueue in shared memory.
+ * These queues live in the WAL sender shmem area.
+ */
+typedef struct SyncRepQueue
+{
+	XLogRecPtr	LSN;			/* waiting for WAL to be synced up to this point */
+
+	PGPROC		*head;
+	PGPROC		*tail;
+
+	slock_t		mutex;			/* locks shared variables shown above */
+} SyncRepQueue;
+
+/* user-settable parameters */
+extern int sync_rep_mode;
+extern int sync_rep_timeout;
+extern int sync_rep_service;
+
+/* called by user backend */
+extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN);
+
+/* called by wal sender */
+extern void SyncRepReleaseWaiters(void);
+
+/* callback at exit */
+extern void SyncRepCleanupAtProcExit(int code, Datum arg);
+
+#endif   /* _SYNCREP_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 267ef72..b81d3dd 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -13,6 +13,8 @@
 #define _WALRECEIVER_H
 
 #include "access/xlogdefs.h"
+#include "replication/syncrep.h"
+#include "storage/latch.h"
 #include "storage/spin.h"
 #include "pgtime.h"
 
@@ -25,6 +27,13 @@ extern bool am_walreceiver;
  */
 #define MAXCONNINFO		1024
 
+typedef struct WalRcvConnOptions
+{
+	char		conninfo[MAXCONNINFO];
+	XLogRecPtr	startpoint;
+	int			sync_rep_service;
+} WalRcvConnOptions;
+
 /*
  * Values for WalRcv->walRcvState.
  */
@@ -48,13 +57,21 @@ typedef struct
 	pid_t		pid;
 	WalRcvState walRcvState;
 	pg_time_t	startTime;
+	int			sync_rep_service;
+
+	/*
+	 * writtenUpto-1 is the last byte position that has already been
+	 * written to WAL files. walreceiver updates this whenever it writes
+	 * received WAL to disk.
+	 */
+	XLogRecPtr	writtenUpto;
 
 	/*
 	 * receivedUpto-1 is the last byte position that has already been
 	 * received.  When startup process starts the walreceiver, it sets
 	 * receivedUpto to the point where it wants the streaming to begin. After
-	 * that, walreceiver updates this whenever it flushes the received WAL to
-	 * disk.
+	 * that, walreceiver or walwriter updates this whenever WAL is flushed
+	 * to disk.
 	 */
 	XLogRecPtr	receivedUpto;
 
@@ -71,31 +88,49 @@ typedef struct
 	 */
 	char		conninfo[MAXCONNINFO];
 
+#define AUX_PROC_WAL_RECEIVER	0
+#define AUX_PROC_WAL_WRITER		1
+#define AUX_PROC_STARTUP		2
+#define NUM_RECOVERY_AUX_PROCS	3
+	/*
+	 * One latch per recovery aux proc, used to wake it when it has work.
+	 */
+	Latch		latch[NUM_RECOVERY_AUX_PROCS];
+
 	slock_t		mutex;			/* locks shared variables shown above */
 } WalRcvData;
 
 extern WalRcvData *WalRcv;
 
 /* libpqwalreceiver hooks */
-typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
+typedef bool (*walrcv_connect_type) (WalRcvConnOptions *connopts);
 extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
 
 typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
 												 char **buffer, int *len);
 extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
 
+typedef bool (*walrcv_send_type) (char *buffer, int len);
+extern PGDLLIMPORT walrcv_send_type walrcv_send;
+
 typedef void (*walrcv_disconnect_type) (void);
 extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
 
 /* prototypes for functions in walreceiver.c */
 extern void WalReceiverMain(void);
+extern bool XLogWalRcvBackgroundFlush(void);
+
+extern void WalRcvWakeup(int auxproc);
+extern void WalRcvWaitLatch(void);
+extern void WalRcvOwnLatch(int auxproc);
+extern void WalRcvDisownLatch(void);
 
 /* prototypes for functions in walreceiverfuncs.c */
 extern Size WalRcvShmemSize(void);
 extern void WalRcvShmemInit(void);
 extern void ShutdownWalRcv(void);
 extern bool WalRcvInProgress(void);
-extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
+extern void RequestXLogStreaming(WalRcvConnOptions *connopts);
 extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
 
 #endif   /* _WALRECEIVER_H */
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 73c5904..bb51c30 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -14,6 +14,7 @@
 
 #include "access/xlog.h"
 #include "storage/latch.h"
+#include "replication/syncrep.h"
 #include "storage/spin.h"
 
 /*
@@ -24,18 +25,32 @@ typedef struct WalSnd
 	pid_t		pid;			/* this walsender's process id, or 0 */
 	XLogRecPtr	sentPtr;		/* WAL has been sent up to this point */
 
-	slock_t		mutex;			/* locks shared variables shown above */
+	slock_t		mutex;
 
 	/*
 	 * Latch used by backends to wake up this walsender when it has work
 	 * to do.
 	 */
 	Latch		latch;
+	/*
+	 * Highest level of sync rep available from this standby.
+	 */
+	int		sync_rep_service;
+
 } WalSnd;
 
 /* There is one WalSndCtl struct for the whole database cluster */
 typedef struct
 {
+	/*
+	 * Sync rep wait queues with one queue per request type.
+	 * We use one queue per request type so that we can maintain the
+	 * invariant that the individual queues are sorted on LSN.
+	 * This may also help performance when multiple wal senders
+	 * offer different sync rep service levels.
+	 */
+	SyncRepQueue	sync_rep_queue[NUM_SYNC_REP_WAIT_MODES];
+
 	WalSnd		walsnds[1];		/* VARIABLE LENGTH ARRAY */
 } WalSndCtlData;
 
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 4eece8b..2cfdd84 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -98,6 +98,7 @@ extern bool LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode);
 extern void LWLockRelease(LWLockId lockid);
 extern void LWLockReleaseAll(void);
 extern bool LWLockHeldByMe(LWLockId lockid);
+extern int LWNumLocksHeldByMe(void);
 
 extern int	NumLWLocks(void);
 extern Size LWLockShmemSize(void);
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 9322337..24b920d 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -14,6 +14,8 @@
 #ifndef _PROC_H_
 #define _PROC_H_
 
+#include "access/xlog.h"
+#include "storage/latch.h"
 #include "storage/lock.h"
 #include "storage/pg_sema.h"
 #include "utils/timestamp.h"
@@ -115,6 +117,11 @@ struct PGPROC
 	LOCKMASK	heldLocks;		/* bitmask for lock types already held on this
 								 * lock object by this backend */
 
+	/* Info to allow us to wait for synchronous replication, if needed. */
+	Latch		waitLatch;
+	XLogRecPtr	waitLSN;		/* waiting for this LSN or higher */
+	bool		ownLatch;		/* do we own the above latch? */
+
 	/*
 	 * All PROCLOCK objects for locks held or awaited by this backend are
 	 * linked into one of these lists, according to the partition number of
@@ -153,11 +160,10 @@ typedef struct PROC_HDR
  * We set aside some extra PGPROC structures for auxiliary processes,
  * ie things that aren't full-fledged backends but need shmem access.
  *
- * Background writer and WAL writer run during normal operation. Startup
- * process and WAL receiver also consume 2 slots, but WAL writer is
- * launched only after startup has exited, so we only need 3 slots.
+ * Background writer and WAL writer can run at any time. Startup
+ * process and WAL receiver also consume 2 slots.
  */
-#define NUM_AUXILIARY_PROCS		3
+#define NUM_AUXILIARY_PROCS		4
 
 
 /* configurable options */
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 2b662e4..0be7b03 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -157,3 +157,4 @@ PQescapeLiteral           154
 PQescapeIdentifier        155
 PQconnectdbParams         156
 PQconnectStartParams      157
+PQsetDuplexCopy           158
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 07a7c70..71c4dc5 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -16,6 +16,7 @@
 
 #include <ctype.h>
 #include <fcntl.h>
+#include <stdio.h>
 
 #include "libpq-fe.h"
 #include "libpq-int.h"
@@ -2010,7 +2011,8 @@ PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
 {
 	if (!conn)
 		return -1;
-	if (conn->asyncStatus != PGASYNC_COPY_IN)
+	if (	(!conn->duplexCopy && conn->asyncStatus != PGASYNC_COPY_IN) ||
+		( conn->duplexCopy && conn->asyncStatus != PGASYNC_COPY_OUT) )
 	{
 		printfPQExpBuffer(&conn->errorMessage,
 						  libpq_gettext("no COPY in progress\n"));
@@ -2174,6 +2176,37 @@ PQgetCopyData(PGconn *conn, char **buffer, int async)
 }
 
 /*
+ * PQsetDuplexCopy - ask the server for duplex copy mode; returns 1 if set
+ *
+ * This makes PQputCopyData() able to pass an arbitrary message while
+ * COPY OUT is in progress. This is for synchronous replication.
+ */
+int
+PQsetDuplexCopy(PGconn *conn, int mode)
+{
+	PGresult   *res;
+
+	/* Like the other entry points, tolerate a NULL connection */
+	if (!conn)
+		return 0;
+
+	if (pqPutMsgStart('x', false, conn) < 0 ||
+		pqPutInt(mode, 4, conn) < 0 ||
+		pqPutMsgEnd(conn) < 0)
+		goto sendFailed;
+
+	conn->asyncStatus = PGASYNC_BUSY;
+	res = PQexecFinish(conn);
+	conn->duplexCopy = (PQresultStatus(res) == PGRES_COMMAND_OK);
+	PQclear(res);
+
+	return (conn->duplexCopy ? 1 : 0);
+
+sendFailed:
+	pqHandleSendFailure(conn);
+	return 0;
+}
+
+/*
  * PQgetline - gets a newline-terminated string from the backend.
  *
  * Chiefly here so that applications can use "COPY <rel> to stdout"
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 7a557d0..d3a2add 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -391,6 +391,7 @@ extern PGnotify *PQnotifies(PGconn *conn);
 extern int	PQputCopyData(PGconn *conn, const char *buffer, int nbytes);
 extern int	PQputCopyEnd(PGconn *conn, const char *errormsg);
 extern int	PQgetCopyData(PGconn *conn, char **buffer, int async);
+extern int	PQsetDuplexCopy(PGconn *conn, int mode);
 
 /* Deprecated routines for copy in/out */
 extern int	PQgetline(PGconn *conn, char *string, int length);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 72faa15..ba0aa85 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -368,6 +368,7 @@ struct pg_conn
 	pgParameterStatus *pstatus; /* ParameterStatus data */
 	int			client_encoding;	/* encoding id */
 	bool		std_strings;	/* standard_conforming_strings */
+	bool		duplexCopy;	/* COPY IN/OUT in duplex mode, for synchronous replication */
 	PGVerbosity verbosity;		/* error/notice message verbosity */
 	PGlobjfuncs *lobjfuncs;		/* private state for large-object access fns */
 
