diff -dcrpN pgsql.orig/contrib/Makefile pgsql/contrib/Makefile
*** pgsql.orig/contrib/Makefile	2009-11-18 22:57:56.000000000 +0100
--- pgsql/contrib/Makefile	2010-04-29 12:17:37.000000000 +0200
*************** SUBDIRS = \
*** 37,42 ****
--- 37,43 ----
  		pgstattuple	\
  		seg		\
  		spi		\
+ 		syncreputils	\
  		tablefunc	\
  		test_parser	\
  		tsearch2	\
diff -dcrpN pgsql.orig/contrib/syncreputils/Makefile pgsql/contrib/syncreputils/Makefile
*** pgsql.orig/contrib/syncreputils/Makefile	1970-01-01 01:00:00.000000000 +0100
--- pgsql/contrib/syncreputils/Makefile	2010-04-29 12:17:37.000000000 +0200
***************
*** 0 ****
--- 1,17 ----
+ # contrib/syncreputils/Makefile
+ 
+ MODULE_big = syncreputils
+ DATA_built = syncreputils.sql
+ DATA = uninstall_syncreputils.sql
+ OBJS = syncreputils.o
+ 
+ ifdef USE_PGXS
+ PG_CONFIG = pg_config
+ PGXS := $(shell $(PG_CONFIG) --pgxs)
+ include $(PGXS)
+ else
+ subdir = contrib/syncreputils
+ top_builddir = ../..
+ include $(top_builddir)/src/Makefile.global
+ include $(top_srcdir)/contrib/contrib-global.mk
+ endif
diff -dcrpN pgsql.orig/contrib/syncreputils/syncreputils.c pgsql/contrib/syncreputils/syncreputils.c
*** pgsql.orig/contrib/syncreputils/syncreputils.c	1970-01-01 01:00:00.000000000 +0100
--- pgsql/contrib/syncreputils/syncreputils.c	2010-04-29 12:17:37.000000000 +0200
***************
*** 0 ****
--- 1,223 ----
+ #include "postgres.h"
+ #include "fmgr.h"
+ #include "funcapi.h"
+ #include "miscadmin.h"
+ #include "replication/walsender.h"
+ #include "replication/walreceiver.h"
+ #include "storage/lwlock.h"
+ #include "storage/proc.h"
+ #include "storage/procarray.h"
+ 
+ #include "syncreputils.h"
+ 
+ PG_MODULE_MAGIC;
+ 
+ PG_FUNCTION_INFO_V1(syncrep_waiting_xact);
+ PG_FUNCTION_INFO_V1(syncrep_queued_xact);
+ 
+ /*
+  *	syncrep_waiting_xact(out pid int4, out xact xid,
+  *						 out reports int4, out min_reports int4)
+  *
+  *	Primary only: one row per backend whose COMMIT/PREPARE waits for sync
+  *	replication reports, with the reports received so far and required.
+  */
+ Datum
+ syncrep_waiting_xact(PG_FUNCTION_ARGS)
+ {
+ 	FuncCallContext	   *funcctx;
+ 	TupleDesc		ret_tupdesc;
+ 	AttInMetadata	   *attinmeta;
+ 	MemoryContext		oldcontext;
+ 	int			call_cntr;
+ 	int			max_calls;
+ 	Datum			result[4];
+ 	bool		isnull[4] = {false, false, false, false};
+ 	srwx		   *fctx;
+ 
+ 	/* stuff done only on the first call of the function */
+ 	if (SRF_IS_FIRSTCALL())
+ 	{
+ 		TupleDesc	tupdesc;
+ 		int		i, found;
+ 		int		min_reports;
+ 		ProcArrayStruct *arrayP = getProcArray();
+ 
+ 		if (RecoveryInProgress())
+ 			elog(ERROR, "this function is only usable on the primary server");
+ 
+ 		if (max_wal_senders == 0)
+ 			elog(ERROR, "this function is useless on a standalone server");
+ 
+ 		if (WalSndMinSyncSlaves == 0)
+ 			elog(ERROR, "this function is useless without synchronous replication");
+ 
+ 		/* create a function context for cross-call persistence */
+ 		funcctx = SRF_FIRSTCALL_INIT();
+ 
+ 		/*
+ 		 * switch to memory context appropriate for multiple function calls
+ 		 */
+ 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+ 
+ 		switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+ 		{
+ 			case TYPEFUNC_COMPOSITE:
+ 				/* success */
+ 				break;
+ 			case TYPEFUNC_RECORD:
+ 				/* failed to determine actual type of RECORD */
+ 				ereport(ERROR,
+ 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ 						 errmsg("function returning record called in context "
+ 						 	"that cannot accept type record")));
+ 				break;
+ 			default:
+ 				/* result type isn't composite */
+ 				elog(ERROR, "return type must be a row type");
+ 				break;
+ 		}
+ 
+ 		/* make sure we have a persistent copy of the tupdesc */
+ 		tupdesc = CreateTupleDescCopy(tupdesc);
+ 
+ 		/*
+ 		 * attinmeta is only used to carry the tupdesc to later calls; tuples
+ 		 * are built from Datums with heap_form_tuple(), not from C strings.
+ 		 */
+ 		attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ 		funcctx->attinmeta = attinmeta;
+ 
+ 		/* size by maxProcs: procs[] may hold more than MaxConnections entries */
+ 		fctx = (srwx *) palloc(arrayP->maxProcs * sizeof(srwx));
+ 
+ 		min_reports = ProcMinSyncReports();
+ 
+ 		LWLockAcquire(ProcArrayLock, LW_SHARED);
+ 
+ 		for (i = found = 0; i < arrayP->numProcs; i++)
+ 		{
+ 			SpinLockAcquire(&arrayP->procs[i]->mutex);
+ 			if (arrayP->procs[i]->locked_for_sync)
+ 			{
+ 				fctx[found].pid = arrayP->procs[i]->pid;
+ 				fctx[found].xid = arrayP->procs[i]->committed_xid;
+ 				fctx[found].reports = arrayP->procs[i]->sync_reports;
+ 				fctx[found].min_reports = min_reports;
+ 
+ 				found++;
+ 			}
+ 			SpinLockRelease(&arrayP->procs[i]->mutex);
+ 		}
+ 
+ 		LWLockRelease(ProcArrayLock);
+ 
+ 		/* total number of tuples to be returned */
+ 		funcctx->max_calls = found;
+ 
+ 		funcctx->user_fctx = fctx;
+ 
+ 		MemoryContextSwitchTo(oldcontext);
+ 	}
+ 
+ 	funcctx = SRF_PERCALL_SETUP();
+ 
+ 	call_cntr = funcctx->call_cntr;
+ 	max_calls = funcctx->max_calls;
+ 	fctx = funcctx->user_fctx;
+ 
+ 	/* attribute return type and return tuple description */
+ 	attinmeta = funcctx->attinmeta;
+ 	ret_tupdesc = attinmeta->tupdesc;
+ 
+ 	/* are there any records left? */
+ 	if (call_cntr < max_calls)	/* do when there is more left to send */
+ 	{
+ 		HeapTuple	tuple;
+ 
+ 		result[0] = Int32GetDatum(fctx[call_cntr].pid);
+ 		result[1] = TransactionIdGetDatum(fctx[call_cntr].xid);
+ 		result[2] = Int32GetDatum(fctx[call_cntr].reports);
+ 		result[3] = Int32GetDatum(fctx[call_cntr].min_reports);
+ 
+ 		tuple = heap_form_tuple(ret_tupdesc, result, isnull);
+ 
+ 		/* send the result */
+ 		SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
+ 	}
+ 	else
+ 	{
+ 		/* do when there is no more left */
+ 		SRF_RETURN_DONE(funcctx);
+ 	}
+ }
+ 
+ /*
+  *	syncrep_queued_xact() RETURNS SETOF xid -- standby: XIDs awaiting report to primary
+  */
+ Datum
+ syncrep_queued_xact(PG_FUNCTION_ARGS)
+ {
+ 	FuncCallContext	   *funcctx;
+ 	MemoryContext		oldcontext;
+ 	int			call_cntr;
+ 	int			max_calls;
+ 	int			i;
+ 	TransactionId	   *fctx;
+ 
+ 	/* stuff done only on the first call of the function */
+ 	if (SRF_IS_FIRSTCALL())
+ 	{
+ 		/* create a function context for cross-call persistence */
+ 		funcctx = SRF_FIRSTCALL_INIT();
+ 
+ 		if (!RecoveryInProgress())
+ 			elog(ERROR, "this function is only usable on a secondary server");
+ 
+ 		if (!WalRcvInProgress())
+ 			elog(ERROR, "this function requires a running WAL receiver");
+ 
+ 		/*
+ 		 * switch to memory context appropriate for multiple function calls
+ 		 */
+ 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+ 
+ 		LWLockAcquire(SynchronousReplicationLock, LW_SHARED);
+ 
+ 		/* snapshot WalRcv->xids[]; we only read it, so LW_SHARED suffices */
+ 		fctx = (TransactionId *)palloc(WalRcv->n_xids * sizeof(TransactionId));
+ 
+ 		/* copy it so the per-call code needs no lock */
+ 		for (i = 0; i < WalRcv->n_xids; i++)
+ 			fctx[i] = WalRcv->xids[i];
+ 
+ 		/* total number of tuples to be returned */
+ 		funcctx->max_calls = WalRcv->n_xids;
+ 
+ 		funcctx->user_fctx = fctx;
+ 
+ 		LWLockRelease(SynchronousReplicationLock);
+ 
+ 		MemoryContextSwitchTo(oldcontext);
+ 	}
+ 
+ 	funcctx = SRF_PERCALL_SETUP();
+ 
+ 	call_cntr = funcctx->call_cntr;
+ 	max_calls = funcctx->max_calls;
+ 	fctx = funcctx->user_fctx;
+ 
+ 	/* are there any records left? */
+ 	if (call_cntr < max_calls)	/* do when there is more left to send */
+ 	{
+ 		SRF_RETURN_NEXT(funcctx, TransactionIdGetDatum(fctx[call_cntr]));
+ 	}
+ 	else
+ 	{
+ 		/* do when there is no more left */
+ 		pfree(fctx);
+ 		funcctx->user_fctx = NULL;
+ 
+ 		SRF_RETURN_DONE(funcctx);
+ 	}
+ }
diff -dcrpN pgsql.orig/contrib/syncreputils/syncreputils.h pgsql/contrib/syncreputils/syncreputils.h
*** pgsql.orig/contrib/syncreputils/syncreputils.h	1970-01-01 01:00:00.000000000 +0100
--- pgsql/contrib/syncreputils/syncreputils.h	2010-04-29 12:17:37.000000000 +0200
***************
*** 0 ****
--- 1,38 ----
+ #ifndef SYNCREPUTILS_H
+ #define SYNCREPUTILS_H
+ 
+ /* Copy of the private ProcArrayStruct from 9.0's procarray.c; layout must match it */
+ typedef struct ProcArrayStruct
+ {
+ 	int			numProcs;		/* number of valid procs entries */
+ 	int			maxProcs;		/* allocated size of procs array */
+ 
+ 	int			numKnownAssignedXids;	/* current number of known assigned
+ 										 * xids */
+ 	int			maxKnownAssignedXids;	/* allocated size of known assigned
+ 										 * xids */
+ 
+ 	/*
+ 	 * Highest subxid that overflowed KnownAssignedXids array. Similar to
+ 	 * overflowing cached subxids in PGPROC entries.
+ 	 */
+ 	TransactionId lastOverflowedXid;
+ 
+ 	/*
+ 	 * We declare procs[] as 1 entry because C wants a fixed-size array, but
+ 	 * actually it is maxProcs entries long.
+ 	 */
+ 	PGPROC	   *procs[1];		/* VARIABLE LENGTH ARRAY */
+ } ProcArrayStruct;
+ 
+ typedef struct {			/* one row of syncrep_waiting_xact() output */
+ 	int4		pid;			/* pid of the waiting backend's PGPROC */
+ 	TransactionId	xid;		/* that PGPROC's committed_xid */
+ 	int4		reports;		/* that PGPROC's sync_reports so far */
+ 	int4		min_reports;	/* ProcMinSyncReports() when sampled */
+ } srwx;
+ 
+ extern Datum syncrep_waiting_xact(PG_FUNCTION_ARGS);
+ extern Datum syncrep_queued_xact(PG_FUNCTION_ARGS);
+ 
+ #endif   /* SYNCREPUTILS_H */
diff -dcrpN pgsql.orig/contrib/syncreputils/syncreputils.sql.in pgsql/contrib/syncreputils/syncreputils.sql.in
*** pgsql.orig/contrib/syncreputils/syncreputils.sql.in	1970-01-01 01:00:00.000000000 +0100
--- pgsql/contrib/syncreputils/syncreputils.sql.in	2010-04-29 12:17:37.000000000 +0200
***************
*** 0 ****
--- 1,9 ----
+ -- Lists the PIDs and XIDs of transactions waiting for synchronous replication reports
+ -- Runnable on the primary server only
+ CREATE OR REPLACE FUNCTION syncrep_waiting_xact(out pid int4, out xact xid, out reports int4, out min_reports int4)
+ RETURNS SETOF RECORD AS 'MODULE_PATHNAME' LANGUAGE C VOLATILE;
+ 
+ -- Lists the XIDs of replayed commits/prepares not yet reported to the primary
+ -- Runnable on the secondary (standby) server only
+ CREATE OR REPLACE FUNCTION syncrep_queued_xact()
+ RETURNS SETOF xid AS 'MODULE_PATHNAME' LANGUAGE C VOLATILE;
diff -dcrpN pgsql.orig/contrib/syncreputils/uninstall_syncreputils.sql pgsql/contrib/syncreputils/uninstall_syncreputils.sql
*** pgsql.orig/contrib/syncreputils/uninstall_syncreputils.sql	1970-01-01 01:00:00.000000000 +0100
--- pgsql/contrib/syncreputils/uninstall_syncreputils.sql	2010-04-29 12:17:37.000000000 +0200
***************
*** 0 ****
--- 1,2 ----
+ DROP FUNCTION syncrep_waiting_xact();	-- OUT parameters are not part of the signature
+ DROP FUNCTION syncrep_queued_xact();
diff -dcrpN pgsql.orig/doc/src/sgml/config.sgml pgsql/doc/src/sgml/config.sgml
*** pgsql.orig/doc/src/sgml/config.sgml	2010-04-29 12:08:59.000000000 +0200
--- pgsql/doc/src/sgml/config.sgml	2010-04-29 12:17:37.000000000 +0200
*************** SET ENABLE_SEQSCAN TO OFF;
*** 1913,1918 ****
--- 1913,1955 ----
         </para>
         </listitem>
        </varlistentry>
+ 
+       <varlistentry id="guc-min-sync-replication-clients" xreflabel="min_sync_replication_clients">
+        <term><varname>min_sync_replication_clients</varname> (<type>integer</type>)</term>
+        <indexterm>
+         <primary><varname>min_sync_replication_clients</> configuration parameter</primary>
+        </indexterm>
+        <listitem>
+        <para>
+         Specifies the number of replication clients that must report back
+         in synchronous replication. A committing transaction waits until
+         enough clients have reported back. Meaningful values are 0 through <varname>max_wal_senders</>.
+         Zero (the default) means asynchronous replication.
+         Larger values may hold transactions (and their clients) back longer.
+        </para>
+        </listitem>
+       </varlistentry>
+ 
+       <varlistentry id="guc-strict-sync-replication" xreflabel="strict_sync_replication">
+        <term><varname>strict_sync_replication</varname> (<type>boolean</type>)</term>
+        <indexterm>
+         <primary><varname>strict_sync_replication</> configuration parameter</primary>
+        </indexterm>
+        <listitem>
+        <para>
+         Specifies whether synchronous replication strictly expects
+         <varname>min_sync_replication_clients</> reports from replication clients,
+         or whether the number of expected reports is limited to the actual number of
+         connected synchronous replication clients. The default is off,
+         meaning that transactions and SQL clients will not be held back
+         if the number of connected synchronous replication clients is
+         smaller than <varname>min_sync_replication_clients</>. Turning it on
+         causes transactions and SQL clients to be held back (stalled) until
+         enough synchronous replication clients connect and report back.
+        </para>
+        </listitem>
+       </varlistentry>
+ 
       </variablelist>
      </sect2>
      <sect2 id="runtime-config-standby">
diff -dcrpN pgsql.orig/doc/src/sgml/protocol.sgml pgsql/doc/src/sgml/protocol.sgml
*** pgsql.orig/doc/src/sgml/protocol.sgml	2010-04-03 09:22:55.000000000 +0200
--- pgsql/doc/src/sgml/protocol.sgml	2010-04-29 12:17:37.000000000 +0200
*************** Terminate (F)
*** 3953,3958 ****
--- 3953,3996 ----
  </varlistentry>
  
  
+ <varlistentry>
+ <term>
+ Set Duplex Copy (F)
+ </term>
+ <listitem>
+ <para>
+ 
+ <variablelist>
+ <varlistentry>
+ <term>
+         Byte1('x')
+ </term>
+ <listitem>
+ <para>
+                 Identifies the message as an indicator that the frontend is
+                 a synchronous replication client. Only used between the primary
+                 and the standby servers during replication handshake. Regular
+                 frontends receive a protocol error.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+         Int32(4)
+ </term>
+ <listitem>
+ <para>
+                 Length of message contents in bytes, including self.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ 
+ </para>
+ </listitem>
+ </varlistentry>
+ 
+ 
  </variablelist>
  
  </sect1>
diff -dcrpN pgsql.orig/doc/src/sgml/recovery-config.sgml pgsql/doc/src/sgml/recovery-config.sgml
*** pgsql.orig/doc/src/sgml/recovery-config.sgml	2010-04-29 12:09:00.000000000 +0200
--- pgsql/doc/src/sgml/recovery-config.sgml	2010-04-29 12:17:37.000000000 +0200
*************** restore_command = 'copy "C:\\server\\arc
*** 279,284 ****
--- 279,297 ----
           </para>
          </listitem>
         </varlistentry>
+        <varlistentry id="synchronous-slave" xreflabel="synchronous_slave">
+         <term><varname>synchronous_slave</varname> (<type>boolean</type>)</term>
+         <listitem>
+          <para>
+           Specifies whether this standby acts as a synchronous
+           replication client. If so, while replaying the WAL stream
+           the standby server looks for COMMIT and PREPARE TRANSACTION
+           WAL records and sends the transaction IDs of such records
+           back to the primary server. The default is off, meaning
+           that the replication client acts asynchronously.
+          </para>
+         </listitem>
+        </varlistentry>
         <varlistentry id="trigger-file" xreflabel="trigger_file">
          <term><varname>trigger_file</varname> (<type>string</type>)</term>
          <indexterm>
diff -dcrpN pgsql.orig/src/backend/access/transam/twophase.c pgsql/src/backend/access/transam/twophase.c
*** pgsql.orig/src/backend/access/transam/twophase.c	2010-04-28 09:08:17.000000000 +0200
--- pgsql/src/backend/access/transam/twophase.c	2010-04-29 14:07:40.000000000 +0200
***************
*** 58,63 ****
--- 58,64 ----
  #include "storage/fd.h"
  #include "storage/procarray.h"
  #include "storage/sinvaladt.h"
+ #include "storage/spin.h"
  #include "storage/smgr.h"
  #include "utils/builtins.h"
  #include "utils/memutils.h"
*************** EndPrepare(GlobalTransaction gxact)
*** 1019,1024 ****
--- 1020,1038 ----
  
  	MyProc->inCommit = true;
  
+ 	/*
+ 	 * We want synchronous replication here, mark the backend
+ 	 * to wait at the end of the transaction.
+ 	 */
+ 	if (ProcMinSyncReports() > 0)
+ 	{
+ 		SpinLockAcquire(&MyProc->mutex);
+ 		MyProc->locked_for_sync = true;
+ 		MyProc->committed_xid = xid;
+ 		MyProc->sync_reports = 0;
+ 		SpinLockRelease(&MyProc->mutex);
+ 	}
+ 
  	gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
  									records.head);
  	XLogFlush(gxact->prepare_lsn);
diff -dcrpN pgsql.orig/src/backend/access/transam/xact.c pgsql/src/backend/access/transam/xact.c
*** pgsql.orig/src/backend/access/transam/xact.c	2010-02-26 03:00:34.000000000 +0100
--- pgsql/src/backend/access/transam/xact.c	2010-04-29 14:05:52.000000000 +0200
***************
*** 36,41 ****
--- 36,42 ----
  #include "libpq/be-fsstubs.h"
  #include "miscadmin.h"
  #include "pgstat.h"
+ #include "replication/walsender.h"
  #include "storage/bufmgr.h"
  #include "storage/fd.h"
  #include "storage/lmgr.h"
*************** RecordTransactionCommit(void)
*** 1012,1017 ****
--- 1013,1038 ----
  		}
  		rdata[lastrdata].next = NULL;
  
+ 		/*
+ 		 * Compute latestXid before we actually COMMIT, so we have a chance
+ 		 * to lock our transaction before the XID notify comes back from
+ 		 * the secondary server(s).
+ 		 */
+ 		latestXid = TransactionIdLatest(xid, nchildren, children);
+ 
+ 		/*
+ 		 * We want synchronous replication here, mark the backend
+ 		 * to wait at the end of the transaction.
+ 		 */
+ 		if (ProcMinSyncReports() > 0)
+ 		{
+ 			SpinLockAcquire(&MyProc->mutex);
+ 			MyProc->locked_for_sync = true;
+ 			MyProc->committed_xid = latestXid;
+ 			MyProc->sync_reports = 0;
+ 			SpinLockRelease(&MyProc->mutex);
+ 		}
+ 
  		(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
  	}
  
*************** RecordTransactionCommit(void)
*** 1080,1088 ****
  		END_CRIT_SECTION();
  	}
  
- 	/* Compute latestXid while we have the child XIDs handy */
- 	latestXid = TransactionIdLatest(xid, nchildren, children);
- 
  	/* Reset XactLastRecEnd until the next transaction writes something */
  	XactLastRecEnd.xrecoff = 0;
  
--- 1101,1106 ----
*************** StartTransaction(void)
*** 1675,1680 ****
--- 1693,1721 ----
  	ShowTransactionState("StartTransaction");
  }
  
+ /* Block until enough standbys have reported our commit/prepare XID. */
+ static void
+ WaitForSynchronousReport(void)
+ {
+ 	volatile PGPROC *proc = MyProc;
+ 	bool		wait = true;
+ 
+ 	if (!TransactionIdIsValid(proc->committed_xid) || !proc->locked_for_sync)
+ 		return;				/* nothing was marked for synchronous replication */
+ 
+ 	while (wait)
+ 	{
+ 		/* Don't call out while holding a spinlock: compute the target first. */
+ 		int			min_reports = ProcMinSyncReports();
+ 		SpinLockAcquire(&proc->mutex);
+ 		wait = (proc->locked_for_sync && proc->sync_reports < min_reports);
+ 		if (!wait)
+ 			proc->locked_for_sync = false;	/* done; must not leak into later xacts */
+ 		SpinLockRelease(&proc->mutex);
+ 		if (wait)
+ 			pg_usleep(1000L);
+ 	}
+ }
  
  /*
   *	CommitTransaction
*************** CommitTransaction(void)
*** 1865,1870 ****
--- 1906,1913 ----
  	s->state = TRANS_DEFAULT;
  
  	RESUME_INTERRUPTS();
+ 
+ 	WaitForSynchronousReport();
  }
  
  
*************** PrepareTransaction(void)
*** 2099,2104 ****
--- 2142,2149 ----
  	s->state = TRANS_DEFAULT;
  
  	RESUME_INTERRUPTS();
+ 
+ 	WaitForSynchronousReport();
  }
  
  
diff -dcrpN pgsql.orig/src/backend/access/transam/xlog.c pgsql/src/backend/access/transam/xlog.c
*** pgsql.orig/src/backend/access/transam/xlog.c	2010-04-29 12:09:01.000000000 +0200
--- pgsql/src/backend/access/transam/xlog.c	2010-04-29 12:17:37.000000000 +0200
*************** static void XLogArchiveNotifySeg(uint32 
*** 538,544 ****
  static bool XLogArchiveCheckDone(const char *xlog);
  static bool XLogArchiveIsBusy(const char *xlog);
  static void XLogArchiveCleanup(const char *xlog);
- static void readRecoveryCommandFile(void);
  static void exitArchiveRecovery(TimeLineID endTLI,
  					uint32 endLogId, uint32 endLogSeg);
  static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
--- 538,543 ----
*************** parseRecoveryCommandFileLine(char *cmdli
*** 5076,5083 ****
   * cater for additional parameters and controls
   * possibly use a flex lexer similar to the GUC one
   */
! static void
! readRecoveryCommandFile(void)
  {
  	FILE	   *fd;
  	char		cmdline[MAXPGPATH];
--- 5075,5082 ----
   * cater for additional parameters and controls
   * possibly use a flex lexer similar to the GUC one
   */
! void
! ReadRecoveryCommandFile(void)
  {
  	FILE	   *fd;
  	char		cmdline[MAXPGPATH];
*************** readRecoveryCommandFile(void)
*** 5217,5222 ****
--- 5216,5230 ----
  					(errmsg("primary_conninfo = '%s'",
  							PrimaryConnInfo)));
  		}
+ 		else if (strcmp(tok1, "synchronous_slave") == 0)
+ 		{
+ 			if (!parse_bool(tok2, &SynchronousSlave))
+ 				ereport(ERROR,
+ 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ 						 errmsg("parameter \"synchronous_slave\" requires a Boolean value")));
+ 			ereport(DEBUG2,
+ 					(errmsg("synchronous_slave = '%s'", tok2)));
+ 		}
  		else if (strcmp(tok1, "trigger_file") == 0)
  		{
  			TriggerFile = pstrdup(tok2);
*************** StartupXLOG(void)
*** 5712,5718 ****
  	 * Check for recovery control file, and if so set up state for offline
  	 * recovery
  	 */
! 	readRecoveryCommandFile();
  
  	/* Now we can determine the list of expected TLIs */
  	expectedTLIs = readTimeLineHistory(recoveryTargetTLI);
--- 5720,5726 ----
  	 * Check for recovery control file, and if so set up state for offline
  	 * recovery
  	 */
! 	ReadRecoveryCommandFile();
  
  	/* Now we can determine the list of expected TLIs */
  	expectedTLIs = readTimeLineHistory(recoveryTargetTLI);
*************** StartupXLOG(void)
*** 6075,6080 ****
--- 6083,6090 ----
  		{
  			bool		recoveryContinue = true;
  			bool		recoveryApply = true;
+ 			TransactionId	   *xids = NULL;
+ 			int		n_xids, max_xids;
  			ErrorContextCallback errcontext;
  
  			InRedo = true;
*************** StartupXLOG(void)
*** 6083,6088 ****
--- 6093,6106 ----
  					(errmsg("redo starts at %X/%X",
  							ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
  
+ 			if (PrimaryConnInfo && SynchronousSlave)
+ 			{
+ #define XIDS_CHUNK	(256)
+ 				max_xids = XIDS_CHUNK;
+ 				n_xids = 0;
+ 				xids = palloc(max_xids * sizeof(TransactionId));
+ 			}
+ 
  			/*
  			 * main redo apply loop
  			 */
*************** StartupXLOG(void)
*** 6155,6160 ****
--- 6173,6229 ----
  
  				RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
  
+ 				/*
+ 				 * If we are using streaming replication and are a synchronous slave...
+ 				 */
+ 				if (PrimaryConnInfo && SynchronousSlave)
+ 				{
+ 					int	processed;
+ 					uint8	info = record->xl_info & ~XLR_INFO_MASK;
+ 
+ 					/*
+ 					 * If we have already encountered some XIDs previously,
+ 					 * then try to pass them to the walreceiver first.
+ 					 */
+ 					processed = WalReceiverAddXid(xids, n_xids);
+ 					if (processed > 0)
+ 					{
+ 						int	i;
+ 
+ 						for (i = processed; i < n_xids; i++)
+ 							xids[i - processed] = xids[i];
+ 						n_xids -= processed;
+ 					}
+ 
+ 					/*
+ 					 * If we are seeing a COMMIT record info and we don't
+ 					 * have a backlog, then try to pass it to the walreceiver.
+ 					 * If unsuccessful, keep it locally.
+ 					 */
+ 					if ((record->xl_rmid == RM_XACT_ID) &&
+ 						(info == XLOG_XACT_COMMIT || info == XLOG_XACT_PREPARE) &&
+ 						TransactionIdIsValid(record->xl_xid))
+ 					{
+ 						bool	backlog_append = (n_xids > 0);
+ 
+ 						if (!backlog_append)
+ 						{
+ 							processed = WalReceiverAddXid(&record->xl_xid, 1);
+ 							backlog_append = (processed == 0);
+ 						}
+ 
+ 						if (backlog_append)
+ 						{
+ 							xids[n_xids++] = record->xl_xid;
+ 							if (n_xids >= max_xids)
+ 							{
+ 								max_xids += XIDS_CHUNK;
+ 								xids = repalloc(xids, max_xids * sizeof(TransactionId));
+ 							}
+ 						}
+ 					}
+ 				}
+ 
  				/* Pop the error context stack */
  				error_context_stack = errcontext.previous;
  
*************** StartupXLOG(void)
*** 6171,6176 ****
--- 6240,6249 ----
  				record = ReadRecord(NULL, LOG, false);
  			} while (record != NULL && recoveryContinue);
  
+ 			if (PrimaryConnInfo && SynchronousSlave)
+ 				pfree(xids);
+ #undef XIDS_CHUNK
+ 
  			/*
  			 * end of main redo apply loop
  			 */
diff -dcrpN pgsql.orig/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
*** pgsql.orig/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c	2010-04-21 08:43:42.000000000 +0200
--- pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c	2010-04-29 12:24:11.000000000 +0200
*************** static bool justconnected = false;
*** 47,55 ****
  static char *recvBuf = NULL;
  
  /* Prototypes for interface functions */
! static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
  static bool libpqrcv_receive(int timeout, unsigned char *type,
  				 char **buffer, int *len);
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
--- 47,56 ----
  static char *recvBuf = NULL;
  
  /* Prototypes for interface functions */
! static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint, bool *sync);
  static bool libpqrcv_receive(int timeout, unsigned char *type,
  				 char **buffer, int *len);
+ static bool libpqrcv_send(char *buffer, int len);
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
*************** _PG_init(void)
*** 68,73 ****
--- 69,75 ----
  		elog(ERROR, "libpqwalreceiver already loaded");
  	walrcv_connect = libpqrcv_connect;
  	walrcv_receive = libpqrcv_receive;
+ 	walrcv_send = libpqrcv_send;
  	walrcv_disconnect = libpqrcv_disconnect;
  }
  
*************** _PG_init(void)
*** 75,81 ****
   * Establish the connection to the primary server for XLOG streaming
   */
  static bool
! libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  {
  	char		conninfo_repl[MAXCONNINFO + 18];
  	char	   *primary_sysid;
--- 77,83 ----
   * Establish the connection to the primary server for XLOG streaming
   */
  static bool
! libpqrcv_connect(char *conninfo, XLogRecPtr startpoint, bool *sync)
  {
  	char		conninfo_repl[MAXCONNINFO + 18];
  	char	   *primary_sysid;
*************** libpqrcv_connect(char *conninfo, XLogRec
*** 147,152 ****
--- 149,158 ----
  						primary_tli, standby_tli)));
  	ThisTimeLineID = primary_tli;
  
+ 	/* Tell the server if we are a synchronous client. */
+ 	if (*sync)
+ 		*sync = PQsetDuplexCopy(streamConn);
+ 
  	/* Start streaming from the point requested by startup process */
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
*************** libpqrcv_receive(int timeout, unsigned c
*** 394,396 ****
--- 400,424 ----
  
  	return true;
  }
+ 
+ /*
+  * Send a message back to the primary.  PQsetDuplexCopy() sent the 'x'
+  * message during connection setup, so libpq lets us send COPY data
+  * while the connection is still in COPY OUT state.
+  * Returns true if the data was queued; ereports ERROR on send/flush failure.
+  */
+ static bool
+ libpqrcv_send(char *buffer, int len)
+ {
+ 	int			result = PQputCopyData(streamConn, buffer, len);
+ 
+ 	if (result < 0)
+ 		ereport(ERROR,
+ 				(errmsg("could not send data to primary: %s",
+ 						PQerrorMessage(streamConn))));
+ 	if (PQflush(streamConn) < 0)
+ 		ereport(ERROR,
+ 				(errmsg("could not flush data to primary: %s",
+ 						PQerrorMessage(streamConn))));
+ 	return (result > 0);		/* 0 = not queued; only possible in nonblocking mode */
+ }
diff -dcrpN pgsql.orig/src/backend/replication/walreceiver.c pgsql/src/backend/replication/walreceiver.c
*** pgsql.orig/src/backend/replication/walreceiver.c	2010-04-21 08:43:42.000000000 +0200
--- pgsql/src/backend/replication/walreceiver.c	2010-04-29 12:22:10.000000000 +0200
***************
*** 43,48 ****
--- 43,49 ----
  #include "miscadmin.h"
  #include "replication/walreceiver.h"
  #include "storage/ipc.h"
+ #include "storage/lwlock.h"
  #include "storage/pmsignal.h"
  #include "utils/builtins.h"
  #include "utils/guc.h"
***************
*** 52,61 ****
--- 53,64 ----
  
  /* Global variable to indicate if this process is a walreceiver process */
  bool		am_walreceiver;
+ bool		SynchronousSlave;
  
  /* libpqreceiver hooks to these when loaded */
  walrcv_connect_type walrcv_connect = NULL;
  walrcv_receive_type walrcv_receive = NULL;
+ walrcv_send_type walrcv_send = NULL;
  walrcv_disconnect_type walrcv_disconnect = NULL;
  
  #define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */
*************** static volatile sig_atomic_t got_SIGHUP 
*** 77,82 ****
--- 80,162 ----
  static volatile sig_atomic_t got_SIGTERM = false;
  
  /*
+  *		WalReceiverAddXid
+  *
+  * Append XIDs of replayed COMMIT/PREPARE records to the shared array of
+  * XIDs in flight; WalRcvCheckCompletedXid() later reports them to the
+  * primary.
+  *
+  * Called by StartupXLOG() with XIDs already known to be valid and not
+  * seen before, so neither condition is re-checked here.  The shared
+  * array holds at most MaxConnections entries (see WalRcvShmemSize());
+  * XIDs that don't fit are kept by the caller in a local backlog.
+  *
+  * Returns how many leading elements of xids[] were added (0..n_xids).
+  */
+ int
+ WalReceiverAddXid(TransactionId *xids, int n_xids)
+ {
+ 	volatile WalRcvData *walrcv = WalRcv;
+ 	int			i = 0;
+ 
+ 	/*
+ 	 * StartupXLOG() calls us after every replayed record, usually with an
+ 	 * empty backlog (n_xids == 0); don't take the exclusive lock when there
+ 	 * is nothing to add.
+ 	 */
+ 	if (n_xids <= 0)
+ 		return 0;
+ 
+ 	LWLockAcquire(SynchronousReplicationLock, LW_EXCLUSIVE);
+ 
+ 	/*
+ 	 * The array should not need more than MaxConnections entries as long
+ 	 * as max_connections is the same on the primary and the standby.
+ 	 */
+ 	while (i < n_xids && walrcv->n_xids < MaxConnections)
+ 		walrcv->xids[walrcv->n_xids++] = xids[i++];
+ 
+ 	LWLockRelease(SynchronousReplicationLock);
+ 
+ 	return i;
+ }
+ 
+ /*
+  * Report XIDs of replayed COMMIT/PREPARE records to the primary, at most
+  * MAX_SYNC_XIDS_SEND per call, in network byte order.
+  *
+  * XIDs are converted into a local buffer, so a failed send (walrcv_send
+  * ereports ERROR) cannot leave byte-swapped XIDs in shared memory, and the
+  * lock is not held across network I/O, which would stall WalReceiverAddXid().
+  */
+ static void
+ WalRcvCheckCompletedXid(void)
+ {
+ 	volatile WalRcvData *walrcv = WalRcv;
+ 	TransactionId buf[MAX_SYNC_XIDS_SEND];
+ 	int			send;
+ 	int			i;
+ 
+ 	LWLockAcquire(SynchronousReplicationLock, LW_SHARED);
+ 	send = Min(walrcv->n_xids, MAX_SYNC_XIDS_SEND);
+ 	for (i = 0; i < send; i++)
+ 		buf[i] = htonl(walrcv->xids[i]);
+ 	LWLockRelease(SynchronousReplicationLock);
+ 
+ 	if (send == 0)
+ 		return;
+ 	walrcv_send((char *) buf, send * sizeof(TransactionId));
+ 
+ 	/* Drop the sent XIDs; concurrent WalReceiverAddXid() calls only append. */
+ 	LWLockAcquire(SynchronousReplicationLock, LW_EXCLUSIVE);
+ 	send = Min(send, walrcv->n_xids);
+ 	for (i = send; i < walrcv->n_xids; i++)
+ 		walrcv->xids[i - send] = walrcv->xids[i];
+ 	walrcv->n_xids -= send;
+ 	LWLockRelease(SynchronousReplicationLock);
+ }
+ 
+ /*
   * LogstreamResult indicates the byte positions that we have already
   * written/fsynced.
   */
*************** WalReceiverMain(void)
*** 164,169 ****
--- 244,255 ----
  	am_walreceiver = true;
  
  	/*
+ 	 * Read the recovery.conf file so the parameters are valid
+ 	 * in the walreceiver process, too. Especially synchronous_slave.
+ 	 */
+ 	ReadRecoveryCommandFile();
+ 
+ 	/*
  	 * WalRcv should be set up already (if we are a backend, we inherit this
  	 * by fork() or EXEC_BACKEND mechanism from the postmaster).
  	 */
*************** WalReceiverMain(void)
*** 245,251 ****
  	/* Load the libpq-specific functions */
  	load_file("libpqwalreceiver", false);
  	if (walrcv_connect == NULL || walrcv_receive == NULL ||
! 		walrcv_disconnect == NULL)
  		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
  
  	/*
--- 331,337 ----
  	/* Load the libpq-specific functions */
  	load_file("libpqwalreceiver", false);
  	if (walrcv_connect == NULL || walrcv_receive == NULL ||
! 		walrcv_send == NULL || walrcv_disconnect == NULL)
  		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
  
  	/*
*************** WalReceiverMain(void)
*** 259,265 ****
  
  	/* Establish the connection to the primary for XLOG streaming */
  	EnableWalRcvImmediateExit();
! 	walrcv_connect(conninfo, startpoint);
  	DisableWalRcvImmediateExit();
  
  	/* Loop until end-of-streaming or error */
--- 345,351 ----
  
  	/* Establish the connection to the primary for XLOG streaming */
  	EnableWalRcvImmediateExit();
! 	walrcv_connect(conninfo, startpoint, &SynchronousSlave);
  	DisableWalRcvImmediateExit();
  
  	/* Loop until end-of-streaming or error */
*************** WalReceiverMain(void)
*** 309,314 ****
--- 395,403 ----
  			 */
  			XLogWalRcvFlush();
  		}
+ 
+ 		if (SynchronousSlave)
+ 			WalRcvCheckCompletedXid();
  	}
  }
  
diff -dcrpN pgsql.orig/src/backend/replication/walreceiverfuncs.c pgsql/src/backend/replication/walreceiverfuncs.c
*** pgsql.orig/src/backend/replication/walreceiverfuncs.c	2010-04-29 12:09:03.000000000 +0200
--- pgsql/src/backend/replication/walreceiverfuncs.c	2010-04-29 12:17:37.000000000 +0200
***************
*** 24,29 ****
--- 24,30 ----
  #include <signal.h>
  
  #include "access/xlog_internal.h"
+ #include "miscadmin.h"
  #include "replication/walreceiver.h"
  #include "storage/fd.h"
  #include "storage/pmsignal.h"
*************** WalRcvShmemSize(void)
*** 45,50 ****
--- 46,60 ----
  	Size		size = 0;
  
  	size = add_size(size, sizeof(WalRcvData));
+ 	/*
+ 	 * Optimally, when the primary and secondary servers are configured
+ 	 * identically, we shouldn't see more than MaxConnections number of
+ 	 * XIDs in flight visible by the slave during synchronous replication.
+ 	 * The "-1" is included in the above sizeof(WalRcvData).
+ 	 * (As we don't live in an optimal world, the StartupXLOG() code keeps
+ 	 * a local backlog of the XIDs not yet passed.)
+ 	 */
+ 	size = add_size(size, (MaxConnections - 1) * sizeof(TransactionId));
  
  	return size;
  }
diff -dcrpN pgsql.orig/src/backend/replication/walsender.c pgsql/src/backend/replication/walsender.c
*** pgsql.orig/src/backend/replication/walsender.c	2010-04-29 12:09:03.000000000 +0200
--- pgsql/src/backend/replication/walsender.c	2010-04-29 12:17:37.000000000 +0200
***************
*** 44,53 ****
--- 44,55 ----
  #include "libpq/pqformat.h"
  #include "libpq/pqsignal.h"
  #include "miscadmin.h"
+ #include "replication/walreceiver.h"
  #include "replication/walsender.h"
  #include "storage/fd.h"
  #include "storage/ipc.h"
  #include "storage/pmsignal.h"
+ #include "storage/procarray.h"
  #include "tcop/tcopprot.h"
  #include "utils/guc.h"
  #include "utils/memutils.h"
*************** bool		am_walsender = false;		/* Am I a w
*** 66,71 ****
--- 68,76 ----
  /* User-settable parameters for walsender */
  int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
  int			WalSndDelay = 200;	/* max sleep time between some actions */
+ int			WalSndMinSyncSlaves = 0;	/* minimum number of clients to report back
+ 										in sync replication */
+ bool			StrictSyncRepl = false;	/* look for ProcSyncReplicationUnlock() for semantics */
  
  #define NAPTIME_PER_CYCLE 100000L	/* max sleep time between cycles (100ms) */
  
*************** static void WalSndHandshake(void);
*** 101,107 ****
  static void WalSndKill(int code, Datum arg);
  static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
  static bool XLogSend(StringInfo outMsg);
! static void CheckClosedConnection(void);
  
  /*
   * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
--- 106,112 ----
  static void WalSndKill(int code, Datum arg);
  static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
  static bool XLogSend(StringInfo outMsg);
! static void CheckMsgFromStandby(void);
  
  /*
   * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
*************** WalSndHandshake(void)
*** 295,300 ****
--- 300,317 ----
  					break;
  				}
  
+ 			case 'x':
+ 				/*
+ 				 * The other side of the wire indicates that it will send us
+ 				 * the finished XIDs. Send the response as is was an empty query.
+ 				 */
+ 				MyWalSnd->sync = true;
+ 				elog(LOG, "standby server indicated synchronous replication");
+ 
+ 				EndCommand("SYNC", DestRemote);
+ 				ReadyForQuery(DestRemote);
+ 				break;
+ 
  			case 'X':
  				/* standby is closing the connection */
  				proc_exit(0);
*************** WalSndHandshake(void)
*** 315,324 ****
  }
  
  /*
!  * Check if the remote end has closed the connection.
   */
  static void
! CheckClosedConnection(void)
  {
  	unsigned char firstchar;
  	int			r;
--- 332,380 ----
  }
  
  /*
!  * Read a batch of completed XIDs from the standby and release their waiters
   */
  static void
! WalSndHandleCompletedXids(void)
! {
! 	int			n_xids;
! 	int			i;
! 	StringInfoData s;
! 	static TransactionId *xids = NULL;	/* TopMemoryContext, reused */
! 
! 	/* Allocate once; pq_getmessage's length limit bounds n_xids. */
! 	if (xids == NULL)
! 		xids = (TransactionId *) MemoryContextAlloc(TopMemoryContext,
! 							MAX_SYNC_XIDS_SEND * sizeof(TransactionId));
! 
! 	initStringInfo(&s);	/* pq_getmessage needs an initialized buffer */
! 	/* the limit counts the 4-byte length word too */
! 	if (pq_getmessage(&s, 4 + MAX_SYNC_XIDS_SEND * sizeof(TransactionId)))
! 	{
! 		ereport(COMMERROR,
! 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
! 				 errmsg("unexpected EOF on standby connection")));
! 		proc_exit(0);
! 	}
! 	if (s.len % sizeof(TransactionId) != 0)
! 		ereport(FATAL,
! 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
! 				 errmsg("invalid message length on standby connection")));
! 
! 	n_xids = s.len / sizeof(TransactionId);
! 	for (i = 0; i < n_xids; i++)
! 		xids[i] = pq_getmsgint(&s, 4);	/* 4 == sizeof(TransactionId) */
! 	pfree(s.data);
! 
! 	ProcSyncReplicationUnlock(xids, n_xids);
! }
! 
! /*
!  * Check if the remote end has sent us a message,
!  * like completed XIDs or closed the connection.
!  */
! static void
! CheckMsgFromStandby(void)
  {
  	unsigned char firstchar;
  	int			r;
*************** CheckClosedConnection(void)
*** 347,352 ****
--- 403,416 ----
  		case 'X':
  			proc_exit(0);
  
+ 			/*
+ 			 * 'd' means the standby is sending us some completed XIDs
+ 			 * in a COPY IN packet.
+ 			 */
+ 		case 'd':
+ 			WalSndHandleCompletedXids();
+ 			break;
+ 
  		default:
  			ereport(FATAL,
  					(errcode(ERRCODE_PROTOCOL_VIOLATION),
*************** WalSndLoop(void)
*** 420,426 ****
  			 * from other processes has arrived.
  			 */
  			pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
! 			CheckClosedConnection();
  
  			remain -= NAPTIME_PER_CYCLE;
  		}
--- 484,490 ----
  			 * from other processes has arrived.
  			 */
  			pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
! 			CheckMsgFromStandby();
  
  			remain -= NAPTIME_PER_CYCLE;
  		}
*************** InitWalSnd(void)
*** 496,501 ****
--- 560,585 ----
  	on_shmem_exit(WalSndKill, 0);
  }
  
+ /* Return how many live walsenders are serving a synchronous standby */
+ int
+ WalSenderSyncCount(void)
+ {
+ 	int			count = 0;
+ 	int			idx;
+ 
+ 	for (idx = 0; idx < max_wal_senders; idx++)
+ 	{
+ 		volatile WalSnd *snd = &WalSndCtl->walsnds[idx];
+ 		bool		is_sync;
+ 
+ 		SpinLockAcquire(&snd->mutex);
+ 		is_sync = (snd->pid != 0 && snd->sync);
+ 		SpinLockRelease(&snd->mutex);
+ 		count += is_sync ? 1 : 0;
+ 	}
+ 	return count;
+ }
+ 
  /* Destroy the per-walsender data structure for this walsender process */
  static void
  WalSndKill(int code, Datum arg)
diff -dcrpN pgsql.orig/src/backend/storage/ipc/procarray.c pgsql/src/backend/storage/ipc/procarray.c
*** pgsql.orig/src/backend/storage/ipc/procarray.c	2010-04-28 09:08:18.000000000 +0200
--- pgsql/src/backend/storage/ipc/procarray.c	2010-04-29 12:17:37.000000000 +0200
***************
*** 51,56 ****
--- 51,57 ----
  #include "access/xact.h"
  #include "access/twophase.h"
  #include "miscadmin.h"
+ #include "replication/walsender.h"
  #include "storage/procarray.h"
  #include "storage/spin.h"
  #include "storage/standby.h"
*************** KnownAssignedXidsDisplay(int trace_level
*** 2966,2968 ****
--- 2967,3054 ----
  
  	pfree(buf.data);
  }
+ 
+ /*
+  * getProcArray - opaque pointer to the shared ProcArrayStruct, for contrib
+  */
+ void *
+ getProcArray(void)
+ {
+ 	return (void *) procArray;
+ }
+ 
+ /*
+  * Number of synchronous-standby reports a committing backend must collect:
+  * min_sync_replication_clients, capped at max_wal_senders and, unless
+  * strict_sync_replication is on, at the number of active sync walsenders.
+  * ProcSyncReplicationUnlock() releases a waiter once this many have arrived.
+  */
+ int
+ ProcMinSyncReports(void)
+ {
+ 	int			needed = WalSndMinSyncSlaves;
+ 
+ 	/* There can never be more reports than walsender slots. */
+ 	if (needed > max_wal_senders)
+ 		needed = max_wal_senders;
+ 
+ 	/*
+ 	 * Unless strict_sync_replication is set, don't wait for standbys that
+ 	 * are not connected: cap the requirement at the number of walsenders
+ 	 * currently serving a synchronous standby.
+ 	 */
+ 	if (!StrictSyncRepl)
+ 	{
+ 		int			connected = WalSenderSyncCount();
+ 
+ 		if (needed > connected)
+ 			needed = connected;
+ 	}
+ 
+ 	return needed;
+ }
+ 
+ /*
+  * Count a synchronous report for every waiting backend whose committed XID
+  * is in xids[]; release and signal those reaching ProcMinSyncReports().
+  */
+ void
+ ProcSyncReplicationUnlock(TransactionId *xids, int n_xids)
+ {
+ 	ProcArrayStruct *arrayP = procArray;
+ 	int			min_reports = ProcMinSyncReports();
+ 	int			i;
+ 
+ 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ 	for (i = 0; i < arrayP->numProcs; i++)
+ 	{
+ 		/* volatile: fields below are protected by the per-proc spinlock */
+ 		volatile PGPROC *proc = arrayP->procs[i];
+ 		bool		wake = false;
+ 		int			j;
+ 
+ 		SpinLockAcquire(&proc->mutex);
+ 		if (proc->locked_for_sync && TransactionIdIsValid(proc->committed_xid))
+ 		{
+ 			for (j = 0; j < n_xids; j++)
+ 			{
+ 				if (xids[j] != proc->committed_xid)
+ 					continue;
+ 				if (++proc->sync_reports >= min_reports)
+ 				{
+ 					proc->locked_for_sync = false;
+ 					proc->committed_xid = InvalidTransactionId;
+ 					proc->sync_reports = 0;
+ 					wake = true;
+ 				}
+ 				break;
+ 			}
+ 		}
+ 		SpinLockRelease(&proc->mutex);
+ 
+ 		/* Never make a kernel call (kill) while holding a spinlock. */
+ 		if (wake)
+ 			SendProcSignal(proc->pid, PROCSIG_SYNCREP_UNLOCKED, proc->backendId);
+ 	}
+ 	LWLockRelease(ProcArrayLock);
+ }
diff -dcrpN pgsql.orig/src/backend/storage/ipc/procsignal.c pgsql/src/backend/storage/ipc/procsignal.c
*** pgsql.orig/src/backend/storage/ipc/procsignal.c	2010-02-26 03:01:00.000000000 +0100
--- pgsql/src/backend/storage/ipc/procsignal.c	2010-04-29 12:17:37.000000000 +0200
*************** procsignal_sigusr1_handler(SIGNAL_ARGS)
*** 278,282 ****
--- 278,287 ----
  	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
  		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
  
+ 	/*
+ 	 * PROCSIG_SYNCREP_UNLOCKED doesn't need any special attention,
+ 	 * it only wakes up the backend from pg_usleep()
+ 	 */
+ 
  	errno = save_errno;
  }
diff -dcrpN pgsql.orig/src/backend/storage/lmgr/proc.c pgsql/src/backend/storage/lmgr/proc.c
*** pgsql.orig/src/backend/storage/lmgr/proc.c	2010-04-29 12:09:03.000000000 +0200
--- pgsql/src/backend/storage/lmgr/proc.c	2010-04-29 12:17:37.000000000 +0200
*************** InitProcess(void)
*** 331,336 ****
--- 331,341 ----
  		SHMQueueInit(&(MyProc->myProcLocks[i]));
  	MyProc->recoveryConflictPending = false;
  
+ 	SpinLockInit(&MyProc->mutex);
+ 	MyProc->locked_for_sync = false;
+ 	MyProc->committed_xid = InvalidTransactionId;
+ 	MyProc->sync_reports = 0;
+ 
  	/*
  	 * We might be reusing a semaphore that belonged to a failed process. So
  	 * be careful and reinitialize its value here.	(This is not strictly
*************** LockWaitCancel(void)
*** 621,626 ****
--- 626,644 ----
  	 */
  }
  
+ /*
+  * Cancel waiting for synchronous replication (called from die()).
+  */
+ void
+ SyncWaitCancel(void)
+ {
+ 	/*
+ 	 * Don't take MyProc->mutex: we run inside a signal handler, and the
+ 	 * interrupted code may itself hold that spinlock while it loops waiting
+ 	 * for locked_for_sync to become false.
+ 	 */
+ 	MyProc->locked_for_sync = false;
+ }
  
  /*
   * ProcReleaseLocks() -- release locks associated with current transaction
diff -dcrpN pgsql.orig/src/backend/tcop/postgres.c pgsql/src/backend/tcop/postgres.c
*** pgsql.orig/src/backend/tcop/postgres.c	2010-04-20 21:22:20.000000000 +0200
--- pgsql/src/backend/tcop/postgres.c	2010-04-29 12:17:37.000000000 +0200
*************** die(SIGNAL_ARGS)
*** 2670,2675 ****
--- 2670,2676 ----
  			/* bump holdoff count to make ProcessInterrupts() a no-op */
  			/* until we are done getting ready for it */
  			InterruptHoldoffCount++;
+ 			SyncWaitCancel();	/* the process may be still waiting for sync replication */
  			LockWaitCancel();	/* prevent CheckDeadLock from running */
  			DisableNotifyInterrupt();
  			DisableCatchupInterrupt();
diff -dcrpN pgsql.orig/src/backend/utils/misc/guc.c pgsql/src/backend/utils/misc/guc.c
*** pgsql.orig/src/backend/utils/misc/guc.c	2010-04-29 12:09:04.000000000 +0200
--- pgsql/src/backend/utils/misc/guc.c	2010-04-29 12:17:37.000000000 +0200
*************** static struct config_bool ConfigureNames
*** 1263,1268 ****
--- 1263,1277 ----
  		false, NULL, NULL
  	},
  
+ 	{
+ 		{"strict_sync_replication", PGC_SUSET, WAL_REPLICATION,
+ 			gettext_noop("Enables strict synchronous replication."),
+ 			gettext_noop("Transactions will stall until at least \"min_sync_replication_clients\" number of slaves are connected"),
+ 		},
+ 		&StrictSyncRepl,
+ 		false, NULL, NULL
+ 	},
+ 
  	/* End-of-list marker */
  	{
  		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL
*************** static struct config_int ConfigureNamesI
*** 1738,1743 ****
--- 1747,1761 ----
  	},
  
  	{
+ 		{"min_sync_replication_clients", PGC_POSTMASTER, WAL_REPLICATION,
+ 			gettext_noop("Sets the minimum number of secondary servers to report back in synchronous replication."),
+ 			NULL
+ 		},
+ 		&WalSndMinSyncSlaves,
+ 		0, 0, INT_MAX / 4, NULL, NULL
+ 	},
+ 
+ 	{
  		{"commit_delay", PGC_USERSET, WAL_SETTINGS,
  			gettext_noop("Sets the delay in microseconds between transaction commit and "
  						 "flushing WAL to disk."),
diff -dcrpN pgsql.orig/src/backend/utils/misc/postgresql.conf.sample pgsql/src/backend/utils/misc/postgresql.conf.sample
*** pgsql.orig/src/backend/utils/misc/postgresql.conf.sample	2010-04-29 12:09:04.000000000 +0200
--- pgsql/src/backend/utils/misc/postgresql.conf.sample	2010-04-29 12:17:37.000000000 +0200
***************
*** 195,200 ****
--- 195,210 ----
  #max_wal_senders = 0		# max number of walsender processes
  #wal_sender_delay = 200ms	# 1-10000 milliseconds
  #wal_keep_segments = 0		# in logfile segments, 16MB each; 0 disables
+ #min_sync_replication_clients = 0
+ 				# min number of walreceiver processes needed
+ 				# to report back for synchronous replication
+ 				# 0 sets async replication; default: 0
+ #strict_sync_replication = off	# strict synchronous replication
+ 				# if the number of sync walsender processes is
+ 				# less than min_sync_replication_clients then
+ 				# the clients will be locked until enough
+ 				# secondary servers connect and report back
+ 				# default: off
  
  
  #------------------------------------------------------------------------------
diff -dcrpN pgsql.orig/src/include/access/xlog.h pgsql/src/include/access/xlog.h
*** pgsql.orig/src/include/access/xlog.h	2010-04-29 12:09:04.000000000 +0200
--- pgsql/src/include/access/xlog.h	2010-04-29 12:17:37.000000000 +0200
*************** extern uint64 GetSystemIdentifier(void);
*** 289,294 ****
--- 289,295 ----
  extern Size XLOGShmemSize(void);
  extern void XLOGShmemInit(void);
  extern void BootStrapXLOG(void);
+ extern void ReadRecoveryCommandFile(void);
  extern void StartupXLOG(void);
  extern void ShutdownXLOG(int code, Datum arg);
  extern void InitXLOGAccess(void);
diff -dcrpN pgsql.orig/src/include/replication/walreceiver.h pgsql/src/include/replication/walreceiver.h
*** pgsql.orig/src/include/replication/walreceiver.h	2010-02-26 03:01:27.000000000 +0100
--- pgsql/src/include/replication/walreceiver.h	2010-04-29 12:22:02.000000000 +0200
***************
*** 16,21 ****
--- 16,22 ----
  #include "storage/spin.h"
  
  extern bool am_walreceiver;
+ extern bool SynchronousSlave;
  
  /*
   * MAXCONNINFO: maximum size of a connection string.
*************** extern bool am_walreceiver;
*** 25,30 ****
--- 26,36 ----
  #define MAXCONNINFO		1024
  
  /*
+  * Maximum number of XIDs sent back at once by walreceiver
+  */
+ #define MAX_SYNC_XIDS_SEND	(256)
+ 
+ /*
   * Values for WalRcv->walRcvState.
   */
  typedef enum
*************** typedef struct
*** 60,77 ****
  	XLogRecPtr	receivedUpto;
  
  	slock_t		mutex;			/* locks shared variables shown above */
  } WalRcvData;
  
  extern WalRcvData *WalRcv;
  
  /* libpqwalreceiver hooks */
! typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
  extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
  
  typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
  												 char **buffer, int *len);
  extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
  
  typedef void (*walrcv_disconnect_type) (void);
  extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
  
--- 66,89 ----
  	XLogRecPtr	receivedUpto;
  
  	slock_t		mutex;			/* locks shared variables shown above */
+ 
+ 	int32		n_xids;			/* number of currently stored XIDs */
+ 	TransactionId	xids[1];		/* variable number of XIDs to report back to the primary */
  } WalRcvData;
  
  extern WalRcvData *WalRcv;
  
  /* libpqwalreceiver hooks */
! typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint, bool *sync);
  extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
  
  typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
  												 char **buffer, int *len);
  extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
  
+ typedef bool (*walrcv_send_type) (char *buffer, int len);
+ extern PGDLLIMPORT walrcv_send_type walrcv_send;
+ 
  typedef void (*walrcv_disconnect_type) (void);
  extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
  
*************** extern bool WalRcvInProgress(void);
*** 83,87 ****
--- 95,100 ----
  extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
  extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
  extern XLogRecPtr GetWalRcvWriteRecPtr(void);
+ extern int WalReceiverAddXid(TransactionId *xids, int n_xids);
  
  #endif   /* _WALRECEIVER_H */
diff -dcrpN pgsql.orig/src/include/replication/walsender.h pgsql/src/include/replication/walsender.h
*** pgsql.orig/src/include/replication/walsender.h	2010-04-29 12:09:04.000000000 +0200
--- pgsql/src/include/replication/walsender.h	2010-04-29 12:18:46.000000000 +0200
***************
*** 21,26 ****
--- 21,27 ----
  typedef struct WalSnd
  {
  	pid_t		pid;			/* this walsender's process id, or 0 */
+ 	bool		sync;			/* the walreceiver on the other side is synchronous slave */
  	XLogRecPtr	sentPtr;		/* WAL has been sent up to this point */
  
  	slock_t		mutex;			/* locks shared variables shown above */
*************** extern bool am_walsender;
*** 40,47 ****
--- 41,51 ----
  /* user-settable parameters */
  extern int	WalSndDelay;
  extern int	max_wal_senders;
+ extern int	WalSndMinSyncSlaves;
+ extern bool	StrictSyncRepl;
  
  extern int	WalSenderMain(void);
+ extern int	WalSenderSyncCount(void);
  extern void WalSndSignals(void);
  extern Size WalSndShmemSize(void);
  extern void WalSndShmemInit(void);
diff -dcrpN pgsql.orig/src/include/storage/lwlock.h pgsql/src/include/storage/lwlock.h
*** pgsql.orig/src/include/storage/lwlock.h	2010-02-26 03:01:27.000000000 +0100
--- pgsql/src/include/storage/lwlock.h	2010-04-29 12:17:37.000000000 +0200
*************** typedef enum LWLockId
*** 70,75 ****
--- 70,76 ----
  	RelationMappingLock,
  	AsyncCtlLock,
  	AsyncQueueLock,
+ 	SynchronousReplicationLock,
  	/* Individual lock IDs end here */
  	FirstBufMappingLock,
  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
diff -dcrpN pgsql.orig/src/include/storage/procarray.h pgsql/src/include/storage/procarray.h
*** pgsql.orig/src/include/storage/procarray.h	2010-01-23 17:37:12.000000000 +0100
--- pgsql/src/include/storage/procarray.h	2010-04-29 12:17:37.000000000 +0200
*************** extern void XidCacheRemoveRunningXids(Tr
*** 71,74 ****
--- 71,78 ----
  						  int nxids, const TransactionId *xids,
  						  TransactionId latestXid);
  
+ extern void *getProcArray(void);
+ extern int	ProcMinSyncReports(void);
+ extern void ProcSyncReplicationUnlock(TransactionId *xids, int n_xids);
+ 
  #endif   /* PROCARRAY_H */
diff -dcrpN pgsql.orig/src/include/storage/proc.h pgsql/src/include/storage/proc.h
*** pgsql.orig/src/include/storage/proc.h	2010-02-26 03:01:27.000000000 +0100
--- pgsql/src/include/storage/proc.h	2010-04-29 12:17:37.000000000 +0200
***************
*** 15,20 ****
--- 15,21 ----
  #define _PROC_H_
  
  #include "storage/lock.h"
+ #include "storage/s_lock.h"
  #include "storage/pg_sema.h"
  #include "utils/timestamp.h"
  
*************** struct PGPROC
*** 123,128 ****
--- 124,135 ----
  	SHM_QUEUE	myProcLocks[NUM_LOCK_PARTITIONS];
  
  	struct XidCache subxids;	/* cache for subtransaction XIDs */
+ 
+ 	slock_t		mutex;		/* protects the shared variables below */
+ 	volatile bool	locked_for_sync;	/* the backend waits for synchronous replication */
+ 	TransactionId	committed_xid;	/* xid saved here after COMMIT,
+ 						we lock on it for synchronous replication */
+ 	int		sync_reports;	/* number of sync replication clients that reported COMMIT */
  };
  
  /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
*************** extern PGPROC *ProcWakeup(PGPROC *proc, 
*** 191,196 ****
--- 198,204 ----
  extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock);
  extern bool IsWaitingForLock(void);
  extern void LockWaitCancel(void);
+ extern void SyncWaitCancel(void);
  
  extern void ProcWaitForSignal(void);
  extern void ProcSendSignal(int pid);
diff -dcrpN pgsql.orig/src/include/storage/procsignal.h pgsql/src/include/storage/procsignal.h
*** pgsql.orig/src/include/storage/procsignal.h	2010-02-26 03:01:28.000000000 +0100
--- pgsql/src/include/storage/procsignal.h	2010-04-29 12:17:37.000000000 +0200
*************** typedef enum
*** 40,45 ****
--- 40,48 ----
  	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
  	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
  
+ 	/* Synchronous replication reason */
+ 	PROCSIG_SYNCREP_UNLOCKED,
+ 
  	NUM_PROCSIGNALS				/* Must be last! */
  } ProcSignalReason;
  
diff -dcrpN pgsql.orig/src/interfaces/libpq/exports.txt pgsql/src/interfaces/libpq/exports.txt
*** pgsql.orig/src/interfaces/libpq/exports.txt	2010-01-28 07:28:26.000000000 +0100
--- pgsql/src/interfaces/libpq/exports.txt	2010-04-29 12:17:37.000000000 +0200
*************** PQescapeLiteral           154
*** 157,159 ****
--- 157,160 ----
  PQescapeIdentifier        155
  PQconnectdbParams         156
  PQconnectStartParams      157
+ PQsetDuplexCopy           158
diff -dcrpN pgsql.orig/src/interfaces/libpq/fe-exec.c pgsql/src/interfaces/libpq/fe-exec.c
*** pgsql.orig/src/interfaces/libpq/fe-exec.c	2010-02-26 03:01:32.000000000 +0100
--- pgsql/src/interfaces/libpq/fe-exec.c	2010-04-29 12:17:38.000000000 +0200
***************
*** 16,21 ****
--- 16,22 ----
  
  #include <ctype.h>
  #include <fcntl.h>
+ #include <stdio.h>
  
  #include "libpq-fe.h"
  #include "libpq-int.h"
*************** PQputCopyData(PGconn *conn, const char *
*** 2010,2016 ****
  {
  	if (!conn)
  		return -1;
! 	if (conn->asyncStatus != PGASYNC_COPY_IN)
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
--- 2011,2018 ----
  {
  	if (!conn)
  		return -1;
! 	if (	(!conn->duplexCopy && conn->asyncStatus != PGASYNC_COPY_IN) ||
! 		( conn->duplexCopy && conn->asyncStatus != PGASYNC_COPY_OUT) )
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
*************** PQgetCopyData(PGconn *conn, char **buffe
*** 2174,2179 ****
--- 2176,2211 ----
  }
  
  /*
+  * PQsetDuplexCopy - ask the server to accept CopyData during COPY OUT
+  *
+  * Sends an 'x' message and waits for its result.  On success, later
+  * PQputCopyData() calls are allowed while COPY OUT is in progress (used by
+  * synchronous replication).  Returns 1 on success, 0 on failure.
+  */
+ int
+ PQsetDuplexCopy(PGconn *conn)
+ {
+ 	PGresult   *res;
+ 
+ 	/* As PQexec does: validate conn and drain any unread results first */
+ 	if (!PQexecStart(conn))
+ 		return 0;
+ 	if (pqPutMsgStart('x', false, conn) < 0 ||
+ 		pqPutMsgEnd(conn) < 0)
+ 	{
+ 		pqHandleSendFailure(conn);
+ 		return 0;
+ 	}
+ 	conn->asyncStatus = PGASYNC_BUSY;
+ 
+ 	res = PQexecFinish(conn);
+ 	conn->duplexCopy = (PQresultStatus(res) == PGRES_COMMAND_OK);
+ 	PQclear(res);
+ 
+ 	return conn->duplexCopy ? 1 : 0;
+ }
+ 
+ /*
   * PQgetline - gets a newline-terminated string from the backend.
   *
   * Chiefly here so that applications can use "COPY <rel> to stdout"
diff -dcrpN pgsql.orig/src/interfaces/libpq/libpq-fe.h pgsql/src/interfaces/libpq/libpq-fe.h
*** pgsql.orig/src/interfaces/libpq/libpq-fe.h	2010-02-26 03:01:33.000000000 +0100
--- pgsql/src/interfaces/libpq/libpq-fe.h	2010-04-29 12:17:38.000000000 +0200
*************** extern PGnotify *PQnotifies(PGconn *conn
*** 391,396 ****
--- 391,397 ----
  extern int	PQputCopyData(PGconn *conn, const char *buffer, int nbytes);
  extern int	PQputCopyEnd(PGconn *conn, const char *errormsg);
  extern int	PQgetCopyData(PGconn *conn, char **buffer, int async);
+ extern int	PQsetDuplexCopy(PGconn *conn);
  
  /* Deprecated routines for copy in/out */
  extern int	PQgetline(PGconn *conn, char *string, int length);
diff -dcrpN pgsql.orig/src/interfaces/libpq/libpq-int.h pgsql/src/interfaces/libpq/libpq-int.h
*** pgsql.orig/src/interfaces/libpq/libpq-int.h	2010-03-13 15:55:57.000000000 +0100
--- pgsql/src/interfaces/libpq/libpq-int.h	2010-04-29 12:17:38.000000000 +0200
*************** struct pg_conn
*** 362,367 ****
--- 362,368 ----
  	pgParameterStatus *pstatus; /* ParameterStatus data */
  	int			client_encoding;	/* encoding id */
  	bool		std_strings;	/* standard_conforming_strings */
+ 	bool		duplexCopy;	/* COPY IN/OUT in duplex mode, for synchronous replication */
  	PGVerbosity verbosity;		/* error/notice message verbosity */
  	PGlobjfuncs *lobjfuncs;		/* private state for large-object access fns */
  