Synchronous replication patch built on SR

Started by Noname over 15 years ago, 3 messages
#1 Noname
zb@cybertec.at
1 attachment(s)

Resending, my ISP lost my mail yesterday. :-(

===========================================================

Hi,

attached is a patch that does $SUBJECT; we are submitting it for 9.1.
I have updated it to today's CVS after the "wal_level" GUC went in.

How does it work?

First, the walreceiver and the walsender are now able to communicate
in a duplex way on the same connection, so while COPY OUT is
in progress from the primary server, the standby server is able to
issue PQputCopyData() to pass back the transaction IDs that were seen
with XLOG_XACT_COMMIT or XLOG_XACT_PREPARE
signatures. I did this by adding a new protocol message type, with letter
'x', that is only acknowledged by the walsender process. The regular
backend was intentionally left unchanged, so an SQL client sending it
gets a protocol error. A new libpq call, PQsetDuplexCopy(), sends this
new message before START_REPLICATION is sent. The primary
makes a note of it in the walsender process' entry.
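
To make the wire format concrete, here is a minimal sketch of how the new 'x' message could be laid out, based only on the protocol.sgml hunk in the patch (Byte1('x') followed by Int32(4), the length including itself, and no payload). The helper name encode_set_duplex_copy() is hypothetical and is not part of the patch or of libpq; the real PQsetDuplexCopy() presumably goes through libpq's internal message-building routines instead.

```c
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical helper, not from the patch: write the "Set Duplex Copy"
 * frontend message into buf. Returns the number of bytes written (5),
 * or 0 if the buffer is too small.
 */
static size_t
encode_set_duplex_copy(unsigned char *buf, size_t buflen)
{
	const uint32_t len = 4;		/* length field counts itself; no payload */

	if (buflen < 5)
		return 0;

	buf[0] = 'x';				/* message type byte */
	/* Int32 in network (big-endian) byte order */
	buf[1] = (unsigned char) ((len >> 24) & 0xFF);
	buf[2] = (unsigned char) ((len >> 16) & 0xFF);
	buf[3] = (unsigned char) ((len >> 8) & 0xFF);
	buf[4] = (unsigned char) (len & 0xFF);
	return 5;
}
```

The message carries no body, so the walsender only has to record that it arrived before START_REPLICATION.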

I had to move the TransactionIdLatest(xid, nchildren, children) call
that computes latestXid earlier in RecordTransactionCommit(), so
it's in the critical section now, just before the
XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata)
call. Otherwise, there was a race condition between the primary
and the standby server, where the standby server might have seen
the XLOG_XACT_COMMIT record for some XIDs before the
transaction in the primary server marked itself waiting for this XID,
resulting in stuck transactions.
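
The ordering problem can be illustrated with a small single-process model. This is only a sketch: the field names locked_for_sync, committed_xid and sync_reports are taken from the patch, but BackendSlot, mark_waiting(), deliver_report() and must_wait() are made-up names, and the assumption that the walsender drops a report for which no backend has registered yet is my reading of the race, not code from the patch.

```c
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-in for the per-backend fields the patch adds to PGPROC. */
typedef struct
{
	bool		locked_for_sync;	/* backend is waiting for standby reports */
	uint32_t	committed_xid;		/* XID the backend is waiting on */
	int			sync_reports;		/* reports received so far */
} BackendSlot;

/* Backend side: register interest in an XID (must happen before the WAL insert). */
static void
mark_waiting(BackendSlot *slot, uint32_t xid)
{
	slot->locked_for_sync = true;
	slot->committed_xid = xid;
	slot->sync_reports = 0;
}

/*
 * Walsender side (assumed behaviour): count a standby's report only if a
 * backend has already registered for that XID. Returns true if counted,
 * false if the report was dropped.
 */
static bool
deliver_report(BackendSlot *slot, uint32_t xid)
{
	if (!slot->locked_for_sync || slot->committed_xid != xid)
		return false;
	slot->sync_reports++;
	return true;
}

/* True while the backend still has to wait for min_reports reports. */
static bool
must_wait(const BackendSlot *slot, int min_reports)
{
	return slot->locked_for_sync && slot->sync_reports < min_reports;
}
```

If mark_waiting() runs first, the report is counted and the backend is released. If the report arrives first, it is dropped and the backend keeps waiting for a report that never comes again, which is the stuck transaction described above.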

I have added 3 new options, two GUCs in postgresql.conf and one
setting in recovery.conf. These options are:

1. min_sync_replication_clients = N

where N is the number of standby reports a given transaction needs before
it is released as synchronously committed. 0 means completely asynchronous;
the value is capped at max_wal_senders. Anything
in between 0 and max_wal_senders means different levels of partially
synchronous replication.

2. strict_sync_replication = boolean

where, if the value of this GUC is false, the expected number of
synchronous reports from standby servers is further limited to the number
of synchronous standby servers that are actually connected. This means
that if no standby servers are connected yet, then replication is
asynchronous and transactions are allowed to finish without waiting for
synchronous reports. If the value of this GUC is true, then transactions
wait until enough synchronous standbys connect and report back.

3. synchronous_slave = boolean (in recovery.conf)

this instructs the standby server to tell the primary that it's a
synchronous replication server and that it will send the committed XIDs
back to the primary.
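
How the first two options combine can be sketched as a single function. This is an assumption about what ProcMinSyncReports() in the patch computes; its body is not shown in this part of the diff, and effective_min_reports() with its parameter names is hypothetical.

```c
#include <stdbool.h>

/*
 * Hypothetical sketch: number of standby reports a committing backend
 * has to wait for, given the settings described above.
 */
static int
effective_min_reports(int min_sync_replication_clients,
					  int max_wal_senders,
					  int connected_sync_standbys,
					  bool strict_sync_replication)
{
	int			required = min_sync_replication_clients;

	/* The GUC is capped at max_wal_senders. */
	if (required > max_wal_senders)
		required = max_wal_senders;

	/* Non-strict mode: never expect more reports than standbys connected. */
	if (!strict_sync_replication && required > connected_sync_standbys)
		required = connected_sync_standbys;

	return required;
}
```

For example, with min_sync_replication_clients = 2 and no synchronous standby connected, the non-strict result is 0 (commits do not wait), while the strict result stays at 2 (commits wait until two standbys connect and report).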

I also added a contrib module for monitoring the synchronous replication,
but it abuses the procarray.c code by exposing the procArray pointer,
which is ugly. It either needs to be abandoned or moved to core if or when
this code is discussed enough. :-)

Best regards,
Zoltán Böszörményi

Attachments:

pg91-syncrep-15-ctxdiff.patch (text/x-patch)
diff -dcrpN pgsql.orig/contrib/Makefile pgsql/contrib/Makefile
*** pgsql.orig/contrib/Makefile	2009-11-18 22:57:56.000000000 +0100
--- pgsql/contrib/Makefile	2010-04-29 12:17:37.000000000 +0200
*************** SUBDIRS = \
*** 37,42 ****
--- 37,43 ----
  		pgstattuple	\
  		seg		\
  		spi		\
+ 		syncreputils	\
  		tablefunc	\
  		test_parser	\
  		tsearch2	\
diff -dcrpN pgsql.orig/contrib/syncreputils/Makefile pgsql/contrib/syncreputils/Makefile
*** pgsql.orig/contrib/syncreputils/Makefile	1970-01-01 01:00:00.000000000 +0100
--- pgsql/contrib/syncreputils/Makefile	2010-04-29 12:17:37.000000000 +0200
***************
*** 0 ****
--- 1,17 ----
+ # $PostgreSQL: pgsql/contrib/pg_stat_statements/Makefile,v 1.1 2009/01/04 22:19:59 tgl Exp $
+ 
+ MODULE_big = syncreputils
+ DATA_built = syncreputils.sql
+ DATA = uninstall_syncreputils.sql
+ OBJS = syncreputils.o
+ 
+ ifdef USE_PGXS
+ PG_CONFIG = pg_config
+ PGXS := $(shell $(PG_CONFIG) --pgxs)
+ include $(PGXS)
+ else
+ subdir = contrib/syncreputils
+ top_builddir = ../..
+ include $(top_builddir)/src/Makefile.global
+ include $(top_srcdir)/contrib/contrib-global.mk
+ endif
diff -dcrpN pgsql.orig/contrib/syncreputils/syncreputils.c pgsql/contrib/syncreputils/syncreputils.c
*** pgsql.orig/contrib/syncreputils/syncreputils.c	1970-01-01 01:00:00.000000000 +0100
--- pgsql/contrib/syncreputils/syncreputils.c	2010-04-29 12:17:37.000000000 +0200
***************
*** 0 ****
--- 1,223 ----
+ #include "postgres.h"
+ #include "fmgr.h"
+ #include "funcapi.h"
+ #include "miscadmin.h"
+ #include "replication/walsender.h"
+ #include "replication/walreceiver.h"
+ #include "storage/lwlock.h"
+ #include "storage/proc.h"
+ #include "storage/procarray.h"
+ 
+ #include "syncreputils.h"
+ 
+ PG_MODULE_MAGIC;
+ 
+ PG_FUNCTION_INFO_V1(syncrep_waiting_xact);
+ PG_FUNCTION_INFO_V1(syncrep_queued_xact);
+ 
+ /*
+  *	syncrep_waiting_xact(
+  *		out int4 pid,
+  *		out xid xact,
+  *		out reports int4,
+  *		out max_reports int4)
+  */
+ Datum
+ syncrep_waiting_xact(PG_FUNCTION_ARGS)
+ {
+ 	FuncCallContext	   *funcctx;
+ 	TupleDesc		ret_tupdesc;
+ 	AttInMetadata	   *attinmeta;
+ 	MemoryContext		oldcontext;
+ 	int			call_cntr;
+ 	int			max_calls;
+ 	Datum			result[4];
+ 	bool                    isnull[4] = { false, false, false, false };
+ 	srwx		   *fctx;
+ 
+ 	/* stuff done only on the first call of the function */
+ 	if (SRF_IS_FIRSTCALL())
+ 	{
+ 		TupleDesc	tupdesc;
+ 		int		i, found;
+ 		int		min_reports;
+ 		ProcArrayStruct *arrayP = getProcArray();
+ 
+ 		if (RecoveryInProgress())
+ 			elog(ERROR, "this function is only usable on the primary server");
+ 
+ 		if (max_wal_senders == 0)
+ 			elog(ERROR, "this function is useless on a standalone server");
+ 
+ 		if (WalSndMinSyncSlaves == 0)
+ 			elog(ERROR, "this function is useless without synchronous replication");
+ 
+ 		/* create a function context for cross-call persistence */
+ 		funcctx = SRF_FIRSTCALL_INIT();
+ 
+ 		/*
+ 		 * switch to memory context appropriate for multiple function calls
+ 		 */
+ 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+ 
+ 		switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+ 		{
+ 			case TYPEFUNC_COMPOSITE:
+ 				/* success */
+ 				break;
+ 			case TYPEFUNC_RECORD:
+ 				/* failed to determine actual type of RECORD */
+ 				ereport(ERROR,
+ 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ 						 errmsg("function returning record called in context "
+ 						 	"that cannot accept type record")));
+ 				break;
+ 			default:
+ 				/* result type isn't composite */
+ 				elog(ERROR, "return type must be a row type");
+ 				break;
+ 		}
+ 
+ 		/* make sure we have a persistent copy of the tupdesc */
+ 		tupdesc = CreateTupleDescCopy(tupdesc);
+ 
+ 		/*
+ 		 * Generate attribute metadata needed later to produce tuples from raw
+ 		 * C strings
+ 		 */
+ 		attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ 		funcctx->attinmeta = attinmeta;
+ 
+ 		/* allocate memory */
+ 		fctx = (srwx *)palloc(MaxConnections * sizeof(srwx));
+ 
+ 		min_reports = ProcMinSyncReports();
+ 
+ 		LWLockAcquire(ProcArrayLock, LW_SHARED);
+ 
+ 		for (i = found = 0; i < arrayP->numProcs; i++)
+ 		{
+ 			SpinLockAcquire(&arrayP->procs[i]->mutex);
+ 			if (arrayP->procs[i]->locked_for_sync)
+ 			{
+ 				fctx[found].pid = arrayP->procs[i]->pid;
+ 				fctx[found].xid = arrayP->procs[i]->committed_xid;
+ 				fctx[found].reports = arrayP->procs[i]->sync_reports;
+ 				fctx[found].min_reports = min_reports;
+ 
+ 				found++;
+ 			}
+ 			SpinLockRelease(&arrayP->procs[i]->mutex);
+ 		}
+ 
+ 		LWLockRelease(ProcArrayLock);
+ 
+ 		/* total number of tuples to be returned */
+ 		funcctx->max_calls = found;
+ 
+ 		funcctx->user_fctx = fctx;
+ 
+ 		MemoryContextSwitchTo(oldcontext);
+ 	}
+ 
+ 	funcctx = SRF_PERCALL_SETUP();
+ 
+ 	call_cntr = funcctx->call_cntr;
+ 	max_calls = funcctx->max_calls;
+ 	fctx = funcctx->user_fctx;
+ 
+ 	/* attribute return type and return tuple description */
+ 	attinmeta = funcctx->attinmeta;
+ 	ret_tupdesc = attinmeta->tupdesc;
+ 
+ 	/* are there any records left? */
+ 	if (call_cntr < max_calls)	/* do when there is more left to send */
+ 	{
+ 		HeapTuple	tuple;
+ 
+ 		result[0] = Int32GetDatum(fctx[call_cntr].pid);
+ 		result[1] = TransactionIdGetDatum(fctx[call_cntr].xid);
+ 		result[2] = Int32GetDatum(fctx[call_cntr].reports);
+ 		result[3] = Int32GetDatum(fctx[call_cntr].min_reports);
+ 
+ 		tuple = heap_form_tuple(ret_tupdesc, result, isnull);
+ 
+ 		/* send the result */
+ 		SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
+ 	}
+ 	else
+ 	{
+ 		/* do when there is no more left */
+ 		SRF_RETURN_DONE(funcctx);
+ 	}
+ }
+ 
+ /*
+  *	syncrep_queued_xact() RETURNS SETOF xid
+  */
+ Datum
+ syncrep_queued_xact(PG_FUNCTION_ARGS)
+ {
+ 	FuncCallContext	   *funcctx;
+ 	MemoryContext		oldcontext;
+ 	int			call_cntr;
+ 	int			max_calls;
+ 	int			i;
+ 	TransactionId	   *fctx;
+ 
+ 	/* stuff done only on the first call of the function */
+ 	if (SRF_IS_FIRSTCALL())
+ 	{
+ 		/* create a function context for cross-call persistence */
+ 		funcctx = SRF_FIRSTCALL_INIT();
+ 
+ 		if (!RecoveryInProgress())
+ 			elog(ERROR, "this function is only usable on a secondary server");
+ 
+ 		if (!WalRcvInProgress())
+ 			elog(ERROR, "this function is only usable on a secondary server");
+ 
+ 		/*
+ 		 * switch to memory context appropriate for multiple function calls
+ 		 */
+ 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+ 
+ 		LWLockAcquire(SynchronousReplicationLock, LW_EXCLUSIVE);
+ 
+ 		/* allocate memory */
+ 		fctx = (TransactionId *)palloc(WalRcv->n_xids * sizeof(TransactionId));
+ 
+ 		/* Copy current state */
+ 		for (i = 0; i < WalRcv->n_xids; i++)
+ 			fctx[i] = WalRcv->xids[i];
+ 
+ 		/* total number of tuples to be returned */
+ 		funcctx->max_calls = WalRcv->n_xids;
+ 
+ 		funcctx->user_fctx = fctx;
+ 
+ 		LWLockRelease(SynchronousReplicationLock);
+ 
+ 		MemoryContextSwitchTo(oldcontext);
+ 	}
+ 
+ 	funcctx = SRF_PERCALL_SETUP();
+ 
+ 	call_cntr = funcctx->call_cntr;
+ 	max_calls = funcctx->max_calls;
+ 	fctx = funcctx->user_fctx;
+ 
+ 	/* are there any records left? */
+ 	if (call_cntr < max_calls)	/* do when there is more left to send */
+ 	{
+ 		SRF_RETURN_NEXT(funcctx, TransactionIdGetDatum(fctx[call_cntr]));
+ 	}
+ 	else
+ 	{
+ 		/* do when there is no more left */
+ 		pfree(fctx);
+ 		funcctx->user_fctx = NULL;
+ 
+ 		SRF_RETURN_DONE(funcctx);
+ 	}
+ }
diff -dcrpN pgsql.orig/contrib/syncreputils/syncreputils.h pgsql/contrib/syncreputils/syncreputils.h
*** pgsql.orig/contrib/syncreputils/syncreputils.h	1970-01-01 01:00:00.000000000 +0100
--- pgsql/contrib/syncreputils/syncreputils.h	2010-04-29 12:17:37.000000000 +0200
***************
*** 0 ****
--- 1,38 ----
+ #ifndef SYNCREPUTILS_H
+ #define SYNCREPUTILS_H
+ 
+ /* Stolen from procarray.c in 9.0 */
+ typedef struct ProcArrayStruct
+ {
+ 	int			numProcs;		/* number of valid procs entries */
+ 	int			maxProcs;		/* allocated size of procs array */
+ 
+ 	int			numKnownAssignedXids;	/* current number of known assigned
+ 										 * xids */
+ 	int			maxKnownAssignedXids;	/* allocated size of known assigned
+ 										 * xids */
+ 
+ 	/*
+ 	 * Highest subxid that overflowed KnownAssignedXids array. Similar to
+ 	 * overflowing cached subxids in PGPROC entries.
+ 	 */
+ 	TransactionId lastOverflowedXid;
+ 
+ 	/*
+ 	 * We declare procs[] as 1 entry because C wants a fixed-size array, but
+ 	 * actually it is maxProcs entries long.
+ 	 */
+ 	PGPROC	   *procs[1];		/* VARIABLE LENGTH ARRAY */
+ } ProcArrayStruct;
+ 
+ typedef struct {
+ 	int4		pid;
+ 	TransactionId	xid;
+ 	int4		reports;
+ 	int4		min_reports;
+ } srwx;
+ 
+ extern Datum syncrep_waiting_xact(PG_FUNCTION_ARGS);
+ extern Datum syncrep_queued_xact(PG_FUNCTION_ARGS);
+ 
+ #endif
diff -dcrpN pgsql.orig/contrib/syncreputils/syncreputils.sql.in pgsql/contrib/syncreputils/syncreputils.sql.in
*** pgsql.orig/contrib/syncreputils/syncreputils.sql.in	1970-01-01 01:00:00.000000000 +0100
--- pgsql/contrib/syncreputils/syncreputils.sql.in	2010-04-29 12:17:37.000000000 +0200
***************
*** 0 ****
--- 1,9 ----
+ -- Lists the PIDs and XIDs that are waiting for synchronous replication report
+ -- Runnable on the primary server
+ CREATE OR REPLACE FUNCTION syncrep_waiting_xact(out pid int4, out xact xid, out reports int4, out min_reports int4)
+ RETURNS SETOF RECORD AS 'MODULE_PATHNAME' LANGUAGE C VOLATILE;
+ 
+ -- Lists the XIDs that are queued for synchronous replication
+ -- Runnable on the secondary server
+ CREATE OR REPLACE FUNCTION syncrep_queued_xact()
+ RETURNS SETOF xid AS 'MODULE_PATHNAME' LANGUAGE C VOLATILE;
diff -dcrpN pgsql.orig/contrib/syncreputils/uninstall_syncreputils.sql pgsql/contrib/syncreputils/uninstall_syncreputils.sql
*** pgsql.orig/contrib/syncreputils/uninstall_syncreputils.sql	1970-01-01 01:00:00.000000000 +0100
--- pgsql/contrib/syncreputils/uninstall_syncreputils.sql	2010-04-29 12:17:37.000000000 +0200
***************
*** 0 ****
--- 1,2 ----
+ DROP FUNCTION syncrep_waiting_xact(out int4, out xid);
+ DROP FUNCTION syncrep_queued_xact();
diff -dcrpN pgsql.orig/doc/src/sgml/config.sgml pgsql/doc/src/sgml/config.sgml
*** pgsql.orig/doc/src/sgml/config.sgml	2010-04-29 12:08:59.000000000 +0200
--- pgsql/doc/src/sgml/config.sgml	2010-04-29 12:17:37.000000000 +0200
*************** SET ENABLE_SEQSCAN TO OFF;
*** 1913,1918 ****
--- 1913,1955 ----
         </para>
         </listitem>
        </varlistentry>
+ 
+       <varlistentry id="guc-min-sync-replication-clients" xreflabel="min_sync_replication_clients">
+        <term><varname>min_sync_replication_clients</varname> (<type>integer</type>)</term>
+        <indexterm>
+         <primary><varname>min_sync_replication_clients</> configuration parameter</primary>
+        </indexterm>
+        <listitem>
+        <para>
+         Specifies the number of replication clients that need to report back
+         in synchronous replication. COMMITted transactions will hold until
+         enough clients report back. Meaningful values are zero ... <varname>max_wal_senders</>.
+         Zero (the default value) means asynchronous replication.
+         Greater values may mean transactions (and clients) held back longer.
+        </para>
+        </listitem>
+       </varlistentry>
+ 
+       <varlistentry id="guc-strict-sync-replication" xreflabel="strict_sync_replication">
+        <term><varname>strict_sync_replication</varname> (<type>boolean</type>)</term>
+        <indexterm>
+         <primary><varname>strict_sync_replication</> configuration parameter</primary>
+        </indexterm>
+        <listitem>
+        <para>
+         Specifies whether the synchronous replication strictly expects
+         <varname>min_sync_replication_clients</> reports from replication clients
+         or the number of expected reports is limited to the actual number of
+         connected synchronous replication clients. Default value is off
+         meaning that transactions and SQL clients will not be held back
+         if the number of connected synchronous replication clients is
+         smaller than <varname>min_sync_replication_clients</>. Turning it on
+         causes transactions and SQL clients to be held back (stalled) until
+         enough synchronous replication clients connect and report back.
+        </para>
+        </listitem>
+       </varlistentry>
+ 
       </variablelist>
      </sect2>
      <sect2 id="runtime-config-standby">
diff -dcrpN pgsql.orig/doc/src/sgml/protocol.sgml pgsql/doc/src/sgml/protocol.sgml
*** pgsql.orig/doc/src/sgml/protocol.sgml	2010-04-03 09:22:55.000000000 +0200
--- pgsql/doc/src/sgml/protocol.sgml	2010-04-29 12:17:37.000000000 +0200
*************** Terminate (F)
*** 3953,3958 ****
--- 3953,3996 ----
  </varlistentry>
  
  
+ <varlistentry>
+ <term>
+ Set Duplex Copy (F)
+ </term>
+ <listitem>
+ <para>
+ 
+ <variablelist>
+ <varlistentry>
+ <term>
+         Byte1('x')
+ </term>
+ <listitem>
+ <para>
+                 Identifies the message as an indicator that the frontend is
+                 a synchronous replication client. Only used between the primary
+                 and the standby servers during replication handshake. Regular
+                 frontends receive a protocol error.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+         Int32(4)
+ </term>
+ <listitem>
+ <para>
+                 Length of message contents in bytes, including self.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ 
+ </para>
+ </listitem>
+ </varlistentry>
+ 
+ 
  </variablelist>
  
  </sect1>
diff -dcrpN pgsql.orig/doc/src/sgml/recovery-config.sgml pgsql/doc/src/sgml/recovery-config.sgml
*** pgsql.orig/doc/src/sgml/recovery-config.sgml	2010-04-29 12:09:00.000000000 +0200
--- pgsql/doc/src/sgml/recovery-config.sgml	2010-04-29 12:17:37.000000000 +0200
*************** restore_command = 'copy "C:\\server\\arc
*** 279,284 ****
--- 279,297 ----
           </para>
          </listitem>
         </varlistentry>
+        <varlistentry id="synchronous-slave" xreflabel="synchronous_slave">
+         <term><varname>synchronous_slave</varname> (<type>boolean</type>)</term>
+         <listitem>
+          <para>
+           Specifies whether this replication client acts as a synchronous
+           replication client. This means that upon replaying the WAL stream
+           the standby server is looking for WAL records with COMMIT or
+           PREPARE TRANSACTION signature. Transaction IDs of such WAL records
+           are sent back to the primary server. Default value is off, meaning
+           that the replication client acts asynchronously.
+          </para>
+         </listitem>
+        </varlistentry>
         <varlistentry id="trigger-file" xreflabel="trigger_file">
          <term><varname>trigger_file</varname> (<type>string</type>)</term>
          <indexterm>
diff -dcrpN pgsql.orig/src/backend/access/transam/twophase.c pgsql/src/backend/access/transam/twophase.c
*** pgsql.orig/src/backend/access/transam/twophase.c	2010-04-28 09:08:17.000000000 +0200
--- pgsql/src/backend/access/transam/twophase.c	2010-04-29 14:07:40.000000000 +0200
***************
*** 58,63 ****
--- 58,64 ----
  #include "storage/fd.h"
  #include "storage/procarray.h"
  #include "storage/sinvaladt.h"
+ #include "storage/spin.h"
  #include "storage/smgr.h"
  #include "utils/builtins.h"
  #include "utils/memutils.h"
*************** EndPrepare(GlobalTransaction gxact)
*** 1019,1024 ****
--- 1020,1038 ----
  
  	MyProc->inCommit = true;
  
+ 	/*
+ 	 * We want synchronous replication here, mark the backend
+ 	 * to wait at the end of the transaction.
+ 	 */
+ 	if (ProcMinSyncReports() > 0)
+ 	{
+ 		SpinLockAcquire(&MyProc->mutex);
+ 		MyProc->locked_for_sync = true;
+ 		MyProc->committed_xid = xid;
+ 		MyProc->sync_reports = 0;
+ 		SpinLockRelease(&MyProc->mutex);
+ 	}
+ 
  	gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
  									records.head);
  	XLogFlush(gxact->prepare_lsn);
diff -dcrpN pgsql.orig/src/backend/access/transam/xact.c pgsql/src/backend/access/transam/xact.c
*** pgsql.orig/src/backend/access/transam/xact.c	2010-02-26 03:00:34.000000000 +0100
--- pgsql/src/backend/access/transam/xact.c	2010-04-29 14:05:52.000000000 +0200
***************
*** 36,41 ****
--- 36,42 ----
  #include "libpq/be-fsstubs.h"
  #include "miscadmin.h"
  #include "pgstat.h"
+ #include "replication/walsender.h"
  #include "storage/bufmgr.h"
  #include "storage/fd.h"
  #include "storage/lmgr.h"
*************** RecordTransactionCommit(void)
*** 1012,1017 ****
--- 1013,1038 ----
  		}
  		rdata[lastrdata].next = NULL;
  
+ 		/*
+ 		 * Compute latestXid before we actually COMMIT, so we have a chance
+ 		 * to lock our transaction before the XID notify comes back from
+ 		 * the secondary server(s).
+ 		 */
+ 		latestXid = TransactionIdLatest(xid, nchildren, children);
+ 
+ 		/*
+ 		 * We want synchronous replication here, mark the backend
+ 		 * to wait at the end of the transaction.
+ 		 */
+ 		if (ProcMinSyncReports() > 0)
+ 		{
+ 			SpinLockAcquire(&MyProc->mutex);
+ 			MyProc->locked_for_sync = true;
+ 			MyProc->committed_xid = latestXid;
+ 			MyProc->sync_reports = 0;
+ 			SpinLockRelease(&MyProc->mutex);
+ 		}
+ 
  		(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
  	}
  
*************** RecordTransactionCommit(void)
*** 1080,1088 ****
  		END_CRIT_SECTION();
  	}
  
- 	/* Compute latestXid while we have the child XIDs handy */
- 	latestXid = TransactionIdLatest(xid, nchildren, children);
- 
  	/* Reset XactLastRecEnd until the next transaction writes something */
  	XactLastRecEnd.xrecoff = 0;
  
--- 1101,1106 ----
*************** StartTransaction(void)
*** 1675,1680 ****
--- 1693,1721 ----
  	ShowTransactionState("StartTransaction");
  }
  
+ static void
+ WaitForSynchronousReport(void)
+ {
+ 	if (TransactionIdIsValid(MyProc->committed_xid) && MyProc->locked_for_sync)
+ 	{
+ 		bool	wait;
+ 
+ 		/*
+ 		 * Loop on the pg_usleep() until some walsender unlocks and signals us
+ 		 * or every walsender has quit.
+ 		 */
+ 		SpinLockAcquire(&MyProc->mutex);
+ 		wait = (MyProc->locked_for_sync && (MyProc->sync_reports < ProcMinSyncReports()));
+ 		SpinLockRelease(&MyProc->mutex);
+ 		while (wait)
+ 		{
+ 			pg_usleep(1000);
+ 			SpinLockAcquire(&MyProc->mutex);
+ 			wait = (MyProc->locked_for_sync && (MyProc->sync_reports < ProcMinSyncReports()));
+ 			SpinLockRelease(&MyProc->mutex);
+ 		}
+ 	}
+ }
  
  /*
   *	CommitTransaction
*************** CommitTransaction(void)
*** 1865,1870 ****
--- 1906,1913 ----
  	s->state = TRANS_DEFAULT;
  
  	RESUME_INTERRUPTS();
+ 
+ 	WaitForSynchronousReport();
  }
  
  
*************** PrepareTransaction(void)
*** 2099,2104 ****
--- 2142,2149 ----
  	s->state = TRANS_DEFAULT;
  
  	RESUME_INTERRUPTS();
+ 
+ 	WaitForSynchronousReport();
  }
  
  
diff -dcrpN pgsql.orig/src/backend/access/transam/xlog.c pgsql/src/backend/access/transam/xlog.c
*** pgsql.orig/src/backend/access/transam/xlog.c	2010-04-29 12:09:01.000000000 +0200
--- pgsql/src/backend/access/transam/xlog.c	2010-04-29 12:17:37.000000000 +0200
*************** static void XLogArchiveNotifySeg(uint32 
*** 538,544 ****
  static bool XLogArchiveCheckDone(const char *xlog);
  static bool XLogArchiveIsBusy(const char *xlog);
  static void XLogArchiveCleanup(const char *xlog);
- static void readRecoveryCommandFile(void);
  static void exitArchiveRecovery(TimeLineID endTLI,
  					uint32 endLogId, uint32 endLogSeg);
  static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
--- 538,543 ----
*************** parseRecoveryCommandFileLine(char *cmdli
*** 5076,5083 ****
   * cater for additional parameters and controls
   * possibly use a flex lexer similar to the GUC one
   */
! static void
! readRecoveryCommandFile(void)
  {
  	FILE	   *fd;
  	char		cmdline[MAXPGPATH];
--- 5075,5082 ----
   * cater for additional parameters and controls
   * possibly use a flex lexer similar to the GUC one
   */
! void
! ReadRecoveryCommandFile(void)
  {
  	FILE	   *fd;
  	char		cmdline[MAXPGPATH];
*************** readRecoveryCommandFile(void)
*** 5217,5222 ****
--- 5216,5230 ----
  					(errmsg("primary_conninfo = '%s'",
  							PrimaryConnInfo)));
  		}
+ 		else if (strcmp(tok1, "synchronous_slave") == 0)
+ 		{
+ 			if (!parse_bool(tok2, &SynchronousSlave))
+ 				ereport(ERROR,
+ 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ 						 errmsg("parameter \"synchronous_slave\" requires a Boolean value")));
+ 			ereport(DEBUG2,
+ 					(errmsg("synchronous_slave = '%s'", tok2)));
+ 		}
  		else if (strcmp(tok1, "trigger_file") == 0)
  		{
  			TriggerFile = pstrdup(tok2);
*************** StartupXLOG(void)
*** 5712,5718 ****
  	 * Check for recovery control file, and if so set up state for offline
  	 * recovery
  	 */
! 	readRecoveryCommandFile();
  
  	/* Now we can determine the list of expected TLIs */
  	expectedTLIs = readTimeLineHistory(recoveryTargetTLI);
--- 5720,5726 ----
  	 * Check for recovery control file, and if so set up state for offline
  	 * recovery
  	 */
! 	ReadRecoveryCommandFile();
  
  	/* Now we can determine the list of expected TLIs */
  	expectedTLIs = readTimeLineHistory(recoveryTargetTLI);
*************** StartupXLOG(void)
*** 6075,6080 ****
--- 6083,6090 ----
  		{
  			bool		recoveryContinue = true;
  			bool		recoveryApply = true;
+ 			TransactionId	   *xids = NULL;
+ 			int		n_xids, max_xids;
  			ErrorContextCallback errcontext;
  
  			InRedo = true;
*************** StartupXLOG(void)
*** 6083,6088 ****
--- 6093,6106 ----
  					(errmsg("redo starts at %X/%X",
  							ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
  
+ 			if (PrimaryConnInfo && SynchronousSlave)
+ 			{
+ #define XIDS_CHUNK	(256)
+ 				max_xids = XIDS_CHUNK;
+ 				n_xids = 0;
+ 				xids = palloc(max_xids * sizeof(TransactionId));
+ 			}
+ 
  			/*
  			 * main redo apply loop
  			 */
*************** StartupXLOG(void)
*** 6155,6160 ****
--- 6173,6229 ----
  
  				RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
  
+ 				/*
+ 				 * If we are using streaming replication and are a synchronous slave...
+ 				 */
+ 				if (PrimaryConnInfo && SynchronousSlave)
+ 				{
+ 					int	processed;
+ 					uint8	info = record->xl_info & ~XLR_INFO_MASK;
+ 
+ 					/*
+ 					 * If we have already encountered some XIDs previously,
+ 					 * then try to pass them to the walreceiver first.
+ 					 */
+ 					processed = WalReceiverAddXid(xids, n_xids);
+ 					if (processed > 0)
+ 					{
+ 						int	i;
+ 
+ 						for (i = processed; i < n_xids; i++)
+ 							xids[i - processed] = xids[i];
+ 						n_xids -= processed;
+ 					}
+ 
+ 					/*
+ 					 * If we are seeing a COMMIT record info and we don't
+ 					 * have a backlog, then try to pass it to the walreceiver.
+ 					 * If unsuccessful, keep it locally.
+ 					 */
+ 					if ((record->xl_rmid == RM_XACT_ID) &&
+ 						(info == XLOG_XACT_COMMIT || info == XLOG_XACT_PREPARE) &&
+ 						TransactionIdIsValid(record->xl_xid))
+ 					{
+ 						bool	backlog_append = (n_xids > 0);
+ 
+ 						if (!backlog_append)
+ 						{
+ 							processed = WalReceiverAddXid(&record->xl_xid, 1);
+ 							backlog_append = (processed == 0);
+ 						}
+ 
+ 						if (backlog_append)
+ 						{
+ 							xids[n_xids++] = record->xl_xid;
+ 							if (n_xids >= max_xids)
+ 							{
+ 								max_xids += XIDS_CHUNK;
+ 								xids = repalloc(xids, max_xids * sizeof(TransactionId));
+ 							}
+ 						}
+ 					}
+ 				}
+ 
  				/* Pop the error context stack */
  				error_context_stack = errcontext.previous;
  
*************** StartupXLOG(void)
*** 6171,6176 ****
--- 6240,6249 ----
  				record = ReadRecord(NULL, LOG, false);
  			} while (record != NULL && recoveryContinue);
  
+ 			if (PrimaryConnInfo && SynchronousSlave)
+ 				pfree(xids);
+ #undef XIDS_CHUNK
+ 
  			/*
  			 * end of main redo apply loop
  			 */
diff -dcrpN pgsql.orig/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
*** pgsql.orig/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c	2010-04-21 08:43:42.000000000 +0200
--- pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c	2010-04-29 12:24:11.000000000 +0200
*************** static bool justconnected = false;
*** 47,55 ****
  static char *recvBuf = NULL;
  
  /* Prototypes for interface functions */
! static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
  static bool libpqrcv_receive(int timeout, unsigned char *type,
  				 char **buffer, int *len);
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
--- 47,56 ----
  static char *recvBuf = NULL;
  
  /* Prototypes for interface functions */
! static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint, bool *sync);
  static bool libpqrcv_receive(int timeout, unsigned char *type,
  				 char **buffer, int *len);
+ static bool libpqrcv_send(char *buffer, int len);
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
*************** _PG_init(void)
*** 68,73 ****
--- 69,75 ----
  		elog(ERROR, "libpqwalreceiver already loaded");
  	walrcv_connect = libpqrcv_connect;
  	walrcv_receive = libpqrcv_receive;
+ 	walrcv_send = libpqrcv_send;
  	walrcv_disconnect = libpqrcv_disconnect;
  }
  
*************** _PG_init(void)
*** 75,81 ****
   * Establish the connection to the primary server for XLOG streaming
   */
  static bool
! libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  {
  	char		conninfo_repl[MAXCONNINFO + 18];
  	char	   *primary_sysid;
--- 77,83 ----
   * Establish the connection to the primary server for XLOG streaming
   */
  static bool
! libpqrcv_connect(char *conninfo, XLogRecPtr startpoint, bool *sync)
  {
  	char		conninfo_repl[MAXCONNINFO + 18];
  	char	   *primary_sysid;
*************** libpqrcv_connect(char *conninfo, XLogRec
*** 147,152 ****
--- 149,158 ----
  						primary_tli, standby_tli)));
  	ThisTimeLineID = primary_tli;
  
+ 	/* Tell the server if we are a synchronous client. */
+ 	if (*sync)
+ 		*sync = PQsetDuplexCopy(streamConn);
+ 
  	/* Start streaming from the point requested by startup process */
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
*************** libpqrcv_receive(int timeout, unsigned c
*** 394,396 ****
--- 400,424 ----
  
  	return true;
  }
+ 
+ /*
+  * Send a message back to the primary. The user-visible libpq
+  * knows that we sent an 'x' message to the primary server via
+  * PQsetDuplexCopy(), so it will let us send a COPY IN message
+  * while it's still in COPY OUT state.
+  */
+ static bool
+ libpqrcv_send(char *buffer, int len)
+ {
+ 	int	result;
+ 
+ 	result = PQputCopyData(streamConn, buffer, len);
+ 	if (result < 0)
+ 		ereport(ERROR,
+ 				(errmsg("could not send data to primary: %s",
+ 						PQerrorMessage(streamConn))));
+ 	PQflush(streamConn);
+ 
+ 	/* We don't care about sending in async mode. */
+ 	return (result >= 0);
+ }
diff -dcrpN pgsql.orig/src/backend/replication/walreceiver.c pgsql/src/backend/replication/walreceiver.c
*** pgsql.orig/src/backend/replication/walreceiver.c	2010-04-21 08:43:42.000000000 +0200
--- pgsql/src/backend/replication/walreceiver.c	2010-04-29 12:22:10.000000000 +0200
***************
*** 43,48 ****
--- 43,49 ----
  #include "miscadmin.h"
  #include "replication/walreceiver.h"
  #include "storage/ipc.h"
+ #include "storage/lwlock.h"
  #include "storage/pmsignal.h"
  #include "utils/builtins.h"
  #include "utils/guc.h"
***************
*** 52,61 ****
--- 53,64 ----
  
  /* Global variable to indicate if this process is a walreceiver process */
  bool		am_walreceiver;
+ bool		SynchronousSlave;
  
  /* libpqreceiver hooks to these when loaded */
  walrcv_connect_type walrcv_connect = NULL;
  walrcv_receive_type walrcv_receive = NULL;
+ walrcv_send_type walrcv_send = NULL;
  walrcv_disconnect_type walrcv_disconnect = NULL;
  
  #define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */
*************** static volatile sig_atomic_t got_SIGHUP 
*** 77,82 ****
--- 80,162 ----
  static volatile sig_atomic_t got_SIGTERM = false;
  
  /*
+  *		WalReceiverAddXid
+  *
+  * Add a new set of XIDs to the array of XIDs in flight, eventually
+  * we need to report them to the primary when they are committed.
+  *
+  * Called by StartupXLOG() with a list of known valid and
+  * to-be-committed XID, so the checks
+  * 1. whether the XID is valid, or
+  * 2. the XID was encountered before
+  * can be omitted.
+  *
+  * Returns the number of XIDs that were added to the array.
+  */
+ int
+ WalReceiverAddXid(TransactionId *xids, int n_xids)
+ {
+ 	volatile WalRcvData *walrcv = WalRcv;
+ 
+ 	int	i = 0;
+ 
+ 	LWLockAcquire(SynchronousReplicationLock, LW_EXCLUSIVE);
+ 
+ 	/*
+ 	 * There shouldn't be more than MaxConnections number of elements.
+ 	 * Stay on the safe side and document that we must have the same
+ 	 * max_connections on both the primary and the secondary servers.
+ 	 */
+ 	if (walrcv->n_xids == MaxConnections)
+ 	{
+ 		LWLockRelease(SynchronousReplicationLock);
+ 		return 0;
+ 	}
+ 
+ 	while ((i < n_xids) && (walrcv->n_xids < MaxConnections))
+ 		walrcv->xids[walrcv->n_xids++] = xids[i++];
+ 
+ 	LWLockRelease(SynchronousReplicationLock);
+ 
+ 	return i;
+ }
+ 
+ /*
+  * Check for completed XIDs and
+  * send a message about them to the primary.
+  */
+ static void
+ WalRcvCheckCompletedXid(void)
+ {
+ 	int			i;
+ 	int			send;
+ 
+ 	volatile WalRcvData *walrcv = WalRcv;
+ 
+ 	LWLockAcquire(SynchronousReplicationLock, LW_EXCLUSIVE);
+ 
+ 	send = (walrcv->n_xids < MAX_SYNC_XIDS_SEND ? walrcv->n_xids : MAX_SYNC_XIDS_SEND);
+ 
+ 	if (send == 0)
+ 	{
+ 		LWLockRelease(SynchronousReplicationLock);
+ 		return;
+ 	}
+ 
+ 	for (i = 0; i < send; i++)
+ 		walrcv->xids[i] = htonl(walrcv->xids[i]);
+ 
+ 	walrcv_send((char *) walrcv->xids, send * sizeof(TransactionId));
+ 
+ 	for (i = send; i < walrcv->n_xids; i++)
+ 		walrcv->xids[i - send] = walrcv->xids[i];
+ 
+ 	walrcv->n_xids -= send;
+ 
+ 	LWLockRelease(SynchronousReplicationLock);
+ }
+ 
+ /*
   * LogstreamResult indicates the byte positions that we have already
   * written/fsynced.
   */
*************** WalReceiverMain(void)
*** 164,169 ****
--- 244,255 ----
  	am_walreceiver = true;
  
  	/*
+ 	 * Read the recovery.conf file so the parameters are valid
+ 	 * in the walreceiver process, too. Especially synchronous_slave.
+ 	 */
+ 	ReadRecoveryCommandFile();
+ 
+ 	/*
  	 * WalRcv should be set up already (if we are a backend, we inherit this
  	 * by fork() or EXEC_BACKEND mechanism from the postmaster).
  	 */
*************** WalReceiverMain(void)
*** 245,251 ****
  	/* Load the libpq-specific functions */
  	load_file("libpqwalreceiver", false);
  	if (walrcv_connect == NULL || walrcv_receive == NULL ||
! 		walrcv_disconnect == NULL)
  		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
  
  	/*
--- 331,337 ----
  	/* Load the libpq-specific functions */
  	load_file("libpqwalreceiver", false);
  	if (walrcv_connect == NULL || walrcv_receive == NULL ||
! 		walrcv_send == NULL || walrcv_disconnect == NULL)
  		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
  
  	/*
*************** WalReceiverMain(void)
*** 259,265 ****
  
  	/* Establish the connection to the primary for XLOG streaming */
  	EnableWalRcvImmediateExit();
! 	walrcv_connect(conninfo, startpoint);
  	DisableWalRcvImmediateExit();
  
  	/* Loop until end-of-streaming or error */
--- 345,351 ----
  
  	/* Establish the connection to the primary for XLOG streaming */
  	EnableWalRcvImmediateExit();
! 	walrcv_connect(conninfo, startpoint, &SynchronousSlave);
  	DisableWalRcvImmediateExit();
  
  	/* Loop until end-of-streaming or error */
*************** WalReceiverMain(void)
*** 309,314 ****
--- 395,403 ----
  			 */
  			XLogWalRcvFlush();
  		}
+ 
+ 		if (SynchronousSlave)
+ 			WalRcvCheckCompletedXid();
  	}
  }
  
diff -dcrpN pgsql.orig/src/backend/replication/walreceiverfuncs.c pgsql/src/backend/replication/walreceiverfuncs.c
*** pgsql.orig/src/backend/replication/walreceiverfuncs.c	2010-04-29 12:09:03.000000000 +0200
--- pgsql/src/backend/replication/walreceiverfuncs.c	2010-04-29 12:17:37.000000000 +0200
***************
*** 24,29 ****
--- 24,30 ----
  #include <signal.h>
  
  #include "access/xlog_internal.h"
+ #include "miscadmin.h"
  #include "replication/walreceiver.h"
  #include "storage/fd.h"
  #include "storage/pmsignal.h"
*************** WalRcvShmemSize(void)
*** 45,50 ****
--- 46,60 ----
  	Size		size = 0;
  
  	size = add_size(size, sizeof(WalRcvData));
+ 	/*
+ 	 * Optimally, when the primary and secondary servers are configured
+ 	 * identically, we shouldn't see more than MaxConnections number of
+ 	 * XIDs in flight visible by the slave during synchronous replication.
+ 	 * The "-1" is included in the above sizeof(WalRcvData).
+ 	 * (As we don't live in an optimal world, the StartupXLOG() code keeps
+ 	 * a local backlog of the XIDs not yet passed.)
+ 	 */
+ 	size = add_size(size, (MaxConnections - 1) * sizeof(TransactionId));
  
  	return size;
  }
diff -dcrpN pgsql.orig/src/backend/replication/walsender.c pgsql/src/backend/replication/walsender.c
*** pgsql.orig/src/backend/replication/walsender.c	2010-04-29 12:09:03.000000000 +0200
--- pgsql/src/backend/replication/walsender.c	2010-04-29 12:17:37.000000000 +0200
***************
*** 44,53 ****
--- 44,55 ----
  #include "libpq/pqformat.h"
  #include "libpq/pqsignal.h"
  #include "miscadmin.h"
+ #include "replication/walreceiver.h"
  #include "replication/walsender.h"
  #include "storage/fd.h"
  #include "storage/ipc.h"
  #include "storage/pmsignal.h"
+ #include "storage/procarray.h"
  #include "tcop/tcopprot.h"
  #include "utils/guc.h"
  #include "utils/memutils.h"
*************** bool		am_walsender = false;		/* Am I a w
*** 66,71 ****
--- 68,76 ----
  /* User-settable parameters for walsender */
  int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
  int			WalSndDelay = 200;	/* max sleep time between some actions */
+ int			WalSndMinSyncSlaves = 0;	/* minimum number of clients to report back
+ 										in sync replication */
+ bool			StrictSyncRepl = false;	/* look for ProcSyncReplicationUnlock() for semantics */
  
  #define NAPTIME_PER_CYCLE 100000L	/* max sleep time between cycles (100ms) */
  
*************** static void WalSndHandshake(void);
*** 101,107 ****
  static void WalSndKill(int code, Datum arg);
  static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
  static bool XLogSend(StringInfo outMsg);
! static void CheckClosedConnection(void);
  
  /*
   * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
--- 106,112 ----
  static void WalSndKill(int code, Datum arg);
  static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
  static bool XLogSend(StringInfo outMsg);
! static void CheckMsgFromStandby(void);
  
  /*
   * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
*************** WalSndHandshake(void)
*** 295,300 ****
--- 300,317 ----
  					break;
  				}
  
+ 			case 'x':
+ 				/*
+ 				 * The other side of the wire indicates that it will send us
+ 				 * the finished XIDs. Send the response as if it were an empty query.
+ 				 */
+ 				MyWalSnd->sync = true;
+ 				elog(LOG, "standby server indicated synchronous replication");
+ 
+ 				EndCommand("SYNC", DestRemote);
+ 				ReadyForQuery(DestRemote);
+ 				break;
+ 
  			case 'X':
  				/* standby is closing the connection */
  				proc_exit(0);
*************** WalSndHandshake(void)
*** 315,324 ****
  }
  
  /*
!  * Check if the remote end has closed the connection.
   */
  static void
! CheckClosedConnection(void)
  {
  	unsigned char firstchar;
  	int			r;
--- 332,380 ----
  }
  
  /*
!  * Read the completed XIDs sent by the standby and unlock the waiting backends
   */
  static void
! WalSndHandleCompletedXids(void)
! {
! 	int			n_xids;
! 	int			i;
! 	StringInfoData		s;
! 	static TransactionId	   *xids = NULL; /* we won't deallocate this */
! 
! 	/* We allocate only once and keep this for the whole runtime of walsender. */
! 	if (xids == NULL)
! 	{
! 		xids = (TransactionId *) MemoryContextAlloc(TopMemoryContext, MAX_SYNC_XIDS_SEND * sizeof(TransactionId));
! 		if (xids == NULL)
! 			elog(PANIC, "cannot allocate memory for XID receive array");
! 	}
! 
! 	if (pq_getmessage(&s, 4 + MAX_SYNC_XIDS_SEND * sizeof(TransactionId)))
! 		ereport(COMMERROR,
! 						(errcode(ERRCODE_PROTOCOL_VIOLATION),
! 						 errmsg("unexpected EOF on standby connection")));
! 
! 	n_xids = s.len / sizeof(TransactionId);
! 	if (s.len % sizeof(TransactionId))
! 		ereport(COMMERROR,
! 						(errcode(ERRCODE_PROTOCOL_VIOLATION),
! 						 errmsg("invalid message length on standby connection")));
! 
! 	xids = (TransactionId *) palloc(s.len);
! 
! 	for (i = 0; i < n_xids; i++)
! 		xids[i] = pq_getmsgint(&s, 4);	/* 4 == sizeof(TransactionId) */
! 
! 	ProcSyncReplicationUnlock(xids, n_xids);
! }
! 
! /*
!  * Check if the remote end has sent us a message,
!  * like completed XIDs or closed the connection.
!  */
! static void
! CheckMsgFromStandby(void)
  {
  	unsigned char firstchar;
  	int			r;
*************** CheckClosedConnection(void)
*** 347,352 ****
--- 403,416 ----
  		case 'X':
  			proc_exit(0);
  
+ 			/*
+ 			 * 'd' means the standby is sending us some completed XIDs
+ 			 * in a COPY IN packet.
+ 			 */
+ 		case 'd':
+ 			WalSndHandleCompletedXids();
+ 			break;
+ 
  		default:
  			ereport(FATAL,
  					(errcode(ERRCODE_PROTOCOL_VIOLATION),
*************** WalSndLoop(void)
*** 420,426 ****
  			 * from other processes has arrived.
  			 */
  			pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
! 			CheckClosedConnection();
  
  			remain -= NAPTIME_PER_CYCLE;
  		}
--- 484,490 ----
  			 * from other processes has arrived.
  			 */
  			pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
! 			CheckMsgFromStandby();
  
  			remain -= NAPTIME_PER_CYCLE;
  		}
*************** InitWalSnd(void)
*** 496,501 ****
--- 560,585 ----
  	on_shmem_exit(WalSndKill, 0);
  }
  
+ /* Count active walsenders that serve synchronous standbys */
+ int
+ WalSenderSyncCount(void)
+ {
+ 	int	i;
+ 	int	found = 0;
+ 
+ 	for (i = 0; i < max_wal_senders; i++)
+ 	{
+ 		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ 
+ 		SpinLockAcquire(&walsnd->mutex);
+ 		if (walsnd->pid != 0 && walsnd->sync)
+ 			found++;
+ 		SpinLockRelease(&walsnd->mutex);
+ 	}
+ 
+ 	return found;
+ }
+ 
  /* Destroy the per-walsender data structure for this walsender process */
  static void
  WalSndKill(int code, Datum arg)
diff -dcrpN pgsql.orig/src/backend/storage/ipc/procarray.c pgsql/src/backend/storage/ipc/procarray.c
*** pgsql.orig/src/backend/storage/ipc/procarray.c	2010-04-28 09:08:18.000000000 +0200
--- pgsql/src/backend/storage/ipc/procarray.c	2010-04-29 12:17:37.000000000 +0200
***************
*** 51,56 ****
--- 51,57 ----
  #include "access/xact.h"
  #include "access/twophase.h"
  #include "miscadmin.h"
+ #include "replication/walsender.h"
  #include "storage/procarray.h"
  #include "storage/spin.h"
  #include "storage/standby.h"
*************** KnownAssignedXidsDisplay(int trace_level
*** 2966,2968 ****
--- 2967,3054 ----
  
  	pfree(buf.data);
  }
+ 
+ /*
+  * getProcArray - because our diagnostics are in contrib
+  */
+ void *
+ getProcArray(void)
+ {
+ 	return procArray;
+ }
+ 
+ /*
+  * Compute minimum expected reports from synchronous secondary servers
+  */
+ int
+ ProcMinSyncReports(void)
+ {
+ 	int	min_reports;
+ 
+ 	/*
+ 	 * Limit expected minimum reports to the maximum number of walsenders
+ 	 * if the former exceeds the latter.
+ 	 */
+ 	min_reports = WalSndMinSyncSlaves;
+ 	if (min_reports > max_wal_senders)
+ 		min_reports = max_wal_senders;
+ 
+ 	/*
+ 	 * If strict_sync_replication is not set, further limit the expected
+ 	 * minimum number of reports to the actual number of sync walsenders.
+ 	 */
+ 	if (!StrictSyncRepl)
+ 	{
+ 		int	wal_senders = WalSenderSyncCount();
+ 
+ 		if (min_reports > wal_senders)
+ 			min_reports = wal_senders;
+ 	}
+ 
+ 	return min_reports;
+ }
+ 
+ /*
+  * Increase the synchronous report count for the processes with
+  * the XIDs found in the array. Called by the walsender processes.
+  */
+ void
+ ProcSyncReplicationUnlock(TransactionId *xids, int n_xids)
+ {
+ 	ProcArrayStruct *arrayP = procArray;
+ 	int		i, j;
+ 	int		min_reports = ProcMinSyncReports();
+ 
+ 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ 
+ 	for (i = 0; i < arrayP->numProcs; i++)
+ 	{
+ 		bool	found = false;
+ 
+ 		SpinLockAcquire(&arrayP->procs[i]->mutex);
+ 		if (arrayP->procs[i]->locked_for_sync &&
+ 			TransactionIdIsValid(arrayP->procs[i]->committed_xid))
+ 		{
+ 			for (j = 0; j < n_xids; j++)
+ 			{
+ 				if (xids[j] == arrayP->procs[i]->committed_xid)
+ 				{
+ 					arrayP->procs[i]->sync_reports++;
+ 					if (arrayP->procs[i]->sync_reports >= min_reports)
+ 					{
+ 						arrayP->procs[i]->locked_for_sync = false;
+ 						arrayP->procs[i]->committed_xid = InvalidTransactionId;
+ 						arrayP->procs[i]->sync_reports = 0;
+ 						
+ 						SendProcSignal(arrayP->procs[i]->pid, PROCSIG_SYNCREP_UNLOCKED, arrayP->procs[i]->backendId);
+ 					}
+ 					found = true;
+ 					break;
+ 				}
+ 			}
+ 		}
+ 		SpinLockRelease(&arrayP->procs[i]->mutex);
+ 	}
+ 
+ 	LWLockRelease(ProcArrayLock);
+ }
diff -dcrpN pgsql.orig/src/backend/storage/ipc/procsignal.c pgsql/src/backend/storage/ipc/procsignal.c
*** pgsql.orig/src/backend/storage/ipc/procsignal.c	2010-02-26 03:01:00.000000000 +0100
--- pgsql/src/backend/storage/ipc/procsignal.c	2010-04-29 12:17:37.000000000 +0200
*************** procsignal_sigusr1_handler(SIGNAL_ARGS)
*** 278,282 ****
--- 278,287 ----
  	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
  		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
  
+ 	/*
+ 	 * PROCSIG_SYNCREP_UNLOCKED doesn't need any special attention,
+ 	 * it only wakes up the backend from pg_usleep()
+ 	 */
+ 
  	errno = save_errno;
  }
diff -dcrpN pgsql.orig/src/backend/storage/lmgr/proc.c pgsql/src/backend/storage/lmgr/proc.c
*** pgsql.orig/src/backend/storage/lmgr/proc.c	2010-04-29 12:09:03.000000000 +0200
--- pgsql/src/backend/storage/lmgr/proc.c	2010-04-29 12:17:37.000000000 +0200
*************** InitProcess(void)
*** 331,336 ****
--- 331,341 ----
  		SHMQueueInit(&(MyProc->myProcLocks[i]));
  	MyProc->recoveryConflictPending = false;
  
+ 	SpinLockInit(&MyProc->mutex);
+ 	MyProc->locked_for_sync = false;
+ 	MyProc->committed_xid = InvalidTransactionId;
+ 	MyProc->sync_reports = 0;
+ 
  	/*
  	 * We might be reusing a semaphore that belonged to a failed process. So
  	 * be careful and reinitialize its value here.	(This is not strictly
*************** LockWaitCancel(void)
*** 621,626 ****
--- 626,644 ----
  	 */
  }
  
+ /*
+  * Cancel waiting for the synchronous replication.
+  */
+ void
+ SyncWaitCancel(void)
+ {
+ 	/*
+ 	 * Don't bother to take the spinlock, we're called from die().
+ 	 * This backend may be under SpinLockAcquire() because we're
+ 	 * looping on locked_for_sync to become false.
+ 	 */
+ 	MyProc->locked_for_sync = false;
+ }
  
  /*
   * ProcReleaseLocks() -- release locks associated with current transaction
diff -dcrpN pgsql.orig/src/backend/tcop/postgres.c pgsql/src/backend/tcop/postgres.c
*** pgsql.orig/src/backend/tcop/postgres.c	2010-04-20 21:22:20.000000000 +0200
--- pgsql/src/backend/tcop/postgres.c	2010-04-29 12:17:37.000000000 +0200
*************** die(SIGNAL_ARGS)
*** 2670,2675 ****
--- 2670,2676 ----
  			/* bump holdoff count to make ProcessInterrupts() a no-op */
  			/* until we are done getting ready for it */
  			InterruptHoldoffCount++;
+ 			SyncWaitCancel();	/* the process may be still waiting for sync replication */
  			LockWaitCancel();	/* prevent CheckDeadLock from running */
  			DisableNotifyInterrupt();
  			DisableCatchupInterrupt();
diff -dcrpN pgsql.orig/src/backend/utils/misc/guc.c pgsql/src/backend/utils/misc/guc.c
*** pgsql.orig/src/backend/utils/misc/guc.c	2010-04-29 12:09:04.000000000 +0200
--- pgsql/src/backend/utils/misc/guc.c	2010-04-29 12:17:37.000000000 +0200
*************** static struct config_bool ConfigureNames
*** 1263,1268 ****
--- 1263,1277 ----
  		false, NULL, NULL
  	},
  
+ 	{
+ 		{"strict_sync_replication", PGC_SUSET, WAL_REPLICATION,
+ 			gettext_noop("Enables strict synchronous replication."),
+ 			gettext_noop("Transactions will stall until at least \"min_sync_replication_clients\" number of slaves are connected"),
+ 		},
+ 		&StrictSyncRepl,
+ 		false, NULL, NULL
+ 	},
+ 
  	/* End-of-list marker */
  	{
  		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL
*************** static struct config_int ConfigureNamesI
*** 1738,1743 ****
--- 1747,1761 ----
  	},
  
  	{
+ 		{"min_sync_replication_clients", PGC_POSTMASTER, WAL_REPLICATION,
+ 			gettext_noop("Sets the minimum number of secondary servers to report back in synchronous replication."),
+ 			NULL
+ 		},
+ 		&WalSndMinSyncSlaves,
+ 		0, 0, INT_MAX / 4, NULL, NULL
+ 	},
+ 
+ 	{
  		{"commit_delay", PGC_USERSET, WAL_SETTINGS,
  			gettext_noop("Sets the delay in microseconds between transaction commit and "
  						 "flushing WAL to disk."),
diff -dcrpN pgsql.orig/src/backend/utils/misc/postgresql.conf.sample pgsql/src/backend/utils/misc/postgresql.conf.sample
*** pgsql.orig/src/backend/utils/misc/postgresql.conf.sample	2010-04-29 12:09:04.000000000 +0200
--- pgsql/src/backend/utils/misc/postgresql.conf.sample	2010-04-29 12:17:37.000000000 +0200
***************
*** 195,200 ****
--- 195,210 ----
  #max_wal_senders = 0		# max number of walsender processes
  #wal_sender_delay = 200ms	# 1-10000 milliseconds
  #wal_keep_segments = 0		# in logfile segments, 16MB each; 0 disables
+ #min_sync_replication_clients = 0
+ 				# min number of walreceiver processes needed
+ 				# to report back for synchronous replication
+ 				# 0 sets async replication; default: 0
+ #strict_sync_replication = off	# strict synchronous replication
+ 				# if the number of sync walsender processes is
+ 				# less than min_sync_replication_clients then
+ 				# the clients will be locked until enough
+ 				# secondary servers connect and report back
+ 				# default: off
  
  
  #------------------------------------------------------------------------------
diff -dcrpN pgsql.orig/src/include/access/xlog.h pgsql/src/include/access/xlog.h
*** pgsql.orig/src/include/access/xlog.h	2010-04-29 12:09:04.000000000 +0200
--- pgsql/src/include/access/xlog.h	2010-04-29 12:17:37.000000000 +0200
*************** extern uint64 GetSystemIdentifier(void);
*** 289,294 ****
--- 289,295 ----
  extern Size XLOGShmemSize(void);
  extern void XLOGShmemInit(void);
  extern void BootStrapXLOG(void);
+ extern void ReadRecoveryCommandFile(void);
  extern void StartupXLOG(void);
  extern void ShutdownXLOG(int code, Datum arg);
  extern void InitXLOGAccess(void);
diff -dcrpN pgsql.orig/src/include/replication/walreceiver.h pgsql/src/include/replication/walreceiver.h
*** pgsql.orig/src/include/replication/walreceiver.h	2010-02-26 03:01:27.000000000 +0100
--- pgsql/src/include/replication/walreceiver.h	2010-04-29 12:22:02.000000000 +0200
***************
*** 16,21 ****
--- 16,22 ----
  #include "storage/spin.h"
  
  extern bool am_walreceiver;
+ extern bool SynchronousSlave;
  
  /*
   * MAXCONNINFO: maximum size of a connection string.
*************** extern bool am_walreceiver;
*** 25,30 ****
--- 26,36 ----
  #define MAXCONNINFO		1024
  
  /*
+  * Maximum number of XIDs sent back at once by walreceiver
+  */
+ #define MAX_SYNC_XIDS_SEND	(256)
+ 
+ /*
   * Values for WalRcv->walRcvState.
   */
  typedef enum
*************** typedef struct
*** 60,77 ****
  	XLogRecPtr	receivedUpto;
  
  	slock_t		mutex;			/* locks shared variables shown above */
  } WalRcvData;
  
  extern WalRcvData *WalRcv;
  
  /* libpqwalreceiver hooks */
! typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
  extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
  
  typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
  												 char **buffer, int *len);
  extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
  
  typedef void (*walrcv_disconnect_type) (void);
  extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
  
--- 66,89 ----
  	XLogRecPtr	receivedUpto;
  
  	slock_t		mutex;			/* locks shared variables shown above */
+ 
+ 	int32		n_xids;			/* number of currently stored XIDs */
+ 	TransactionId	xids[1];		/* variable number of XIDs to report back to the primary */
  } WalRcvData;
  
  extern WalRcvData *WalRcv;
  
  /* libpqwalreceiver hooks */
! typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint, bool *sync);
  extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
  
  typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
  												 char **buffer, int *len);
  extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
  
+ typedef bool (*walrcv_send_type) (char *buffer, int len);
+ extern PGDLLIMPORT walrcv_send_type walrcv_send;
+ 
  typedef void (*walrcv_disconnect_type) (void);
  extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
  
*************** extern bool WalRcvInProgress(void);
*** 83,87 ****
--- 95,100 ----
  extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
  extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
  extern XLogRecPtr GetWalRcvWriteRecPtr(void);
+ extern int WalReceiverAddXid(TransactionId *xids, int n_xids);
  
  #endif   /* _WALRECEIVER_H */
diff -dcrpN pgsql.orig/src/include/replication/walsender.h pgsql/src/include/replication/walsender.h
*** pgsql.orig/src/include/replication/walsender.h	2010-04-29 12:09:04.000000000 +0200
--- pgsql/src/include/replication/walsender.h	2010-04-29 12:18:46.000000000 +0200
***************
*** 21,26 ****
--- 21,27 ----
  typedef struct WalSnd
  {
  	pid_t		pid;			/* this walsender's process id, or 0 */
+ 	bool		sync;			/* the walreceiver on the other side is synchronous slave */
  	XLogRecPtr	sentPtr;		/* WAL has been sent up to this point */
  
  	slock_t		mutex;			/* locks shared variables shown above */
*************** extern bool am_walsender;
*** 40,47 ****
--- 41,51 ----
  /* user-settable parameters */
  extern int	WalSndDelay;
  extern int	max_wal_senders;
+ extern int	WalSndMinSyncSlaves;
+ extern bool	StrictSyncRepl;
  
  extern int	WalSenderMain(void);
+ extern int	WalSenderSyncCount(void);
  extern void WalSndSignals(void);
  extern Size WalSndShmemSize(void);
  extern void WalSndShmemInit(void);
diff -dcrpN pgsql.orig/src/include/storage/lwlock.h pgsql/src/include/storage/lwlock.h
*** pgsql.orig/src/include/storage/lwlock.h	2010-02-26 03:01:27.000000000 +0100
--- pgsql/src/include/storage/lwlock.h	2010-04-29 12:17:37.000000000 +0200
*************** typedef enum LWLockId
*** 70,75 ****
--- 70,76 ----
  	RelationMappingLock,
  	AsyncCtlLock,
  	AsyncQueueLock,
+ 	SynchronousReplicationLock,
  	/* Individual lock IDs end here */
  	FirstBufMappingLock,
  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
diff -dcrpN pgsql.orig/src/include/storage/procarray.h pgsql/src/include/storage/procarray.h
*** pgsql.orig/src/include/storage/procarray.h	2010-01-23 17:37:12.000000000 +0100
--- pgsql/src/include/storage/procarray.h	2010-04-29 12:17:37.000000000 +0200
*************** extern void XidCacheRemoveRunningXids(Tr
*** 71,74 ****
--- 71,78 ----
  						  int nxids, const TransactionId *xids,
  						  TransactionId latestXid);
  
+ extern void *getProcArray(void);
+ extern int	ProcMinSyncReports(void);
+ extern void ProcSyncReplicationUnlock(TransactionId *xids, int n_xids);
+ 
  #endif   /* PROCARRAY_H */
diff -dcrpN pgsql.orig/src/include/storage/proc.h pgsql/src/include/storage/proc.h
*** pgsql.orig/src/include/storage/proc.h	2010-02-26 03:01:27.000000000 +0100
--- pgsql/src/include/storage/proc.h	2010-04-29 12:17:37.000000000 +0200
***************
*** 15,20 ****
--- 15,21 ----
  #define _PROC_H_
  
  #include "storage/lock.h"
+ #include "storage/s_lock.h"
  #include "storage/pg_sema.h"
  #include "utils/timestamp.h"
  
*************** struct PGPROC
*** 123,128 ****
--- 124,135 ----
  	SHM_QUEUE	myProcLocks[NUM_LOCK_PARTITIONS];
  
  	struct XidCache subxids;	/* cache for subtransaction XIDs */
+ 
+ 	slock_t		mutex;		/* protects the shared variables below */
+ 	volatile bool	locked_for_sync;	/* the backend waits for synchronous replication */
+ 	TransactionId	committed_xid;	/* xid saved here after COMMIT,
+ 						we lock on it for synchronous replication */
+ 	int		sync_reports;	/* number of sync replication clients that reported COMMIT */
  };
  
  /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
*************** extern PGPROC *ProcWakeup(PGPROC *proc, 
*** 191,196 ****
--- 198,204 ----
  extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock);
  extern bool IsWaitingForLock(void);
  extern void LockWaitCancel(void);
+ extern void SyncWaitCancel(void);
  
  extern void ProcWaitForSignal(void);
  extern void ProcSendSignal(int pid);
diff -dcrpN pgsql.orig/src/include/storage/procsignal.h pgsql/src/include/storage/procsignal.h
*** pgsql.orig/src/include/storage/procsignal.h	2010-02-26 03:01:28.000000000 +0100
--- pgsql/src/include/storage/procsignal.h	2010-04-29 12:17:37.000000000 +0200
*************** typedef enum
*** 40,45 ****
--- 40,48 ----
  	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
  	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
  
+ 	/* Synchronous replication reason */
+ 	PROCSIG_SYNCREP_UNLOCKED,
+ 
  	NUM_PROCSIGNALS				/* Must be last! */
  } ProcSignalReason;
  
diff -dcrpN pgsql.orig/src/interfaces/libpq/exports.txt pgsql/src/interfaces/libpq/exports.txt
*** pgsql.orig/src/interfaces/libpq/exports.txt	2010-01-28 07:28:26.000000000 +0100
--- pgsql/src/interfaces/libpq/exports.txt	2010-04-29 12:17:37.000000000 +0200
*************** PQescapeLiteral           154
*** 157,159 ****
--- 157,160 ----
  PQescapeIdentifier        155
  PQconnectdbParams         156
  PQconnectStartParams      157
+ PQsetDuplexCopy           158
diff -dcrpN pgsql.orig/src/interfaces/libpq/fe-exec.c pgsql/src/interfaces/libpq/fe-exec.c
*** pgsql.orig/src/interfaces/libpq/fe-exec.c	2010-02-26 03:01:32.000000000 +0100
--- pgsql/src/interfaces/libpq/fe-exec.c	2010-04-29 12:17:38.000000000 +0200
***************
*** 16,21 ****
--- 16,22 ----
  
  #include <ctype.h>
  #include <fcntl.h>
+ #include <stdio.h>
  
  #include "libpq-fe.h"
  #include "libpq-int.h"
*************** PQputCopyData(PGconn *conn, const char *
*** 2010,2016 ****
  {
  	if (!conn)
  		return -1;
! 	if (conn->asyncStatus != PGASYNC_COPY_IN)
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
--- 2011,2018 ----
  {
  	if (!conn)
  		return -1;
! 	if (	(!conn->duplexCopy && conn->asyncStatus != PGASYNC_COPY_IN) ||
! 		( conn->duplexCopy && conn->asyncStatus != PGASYNC_COPY_OUT) )
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
*************** PQgetCopyData(PGconn *conn, char **buffe
*** 2174,2179 ****
--- 2176,2211 ----
  }
  
  /*
+  * PQsetDuplexCopy - set the duplex copy flag
+  *
+  * This makes PQputCopyData() able to pass an arbitrary message while
+  * COPY OUT is in progress. This is for synchronous replication.
+  */
+ int
+ PQsetDuplexCopy(PGconn *conn)
+ {
+ 	PGresult   *res;
+ 
+ 	if (pqPutMsgStart('x', false, conn) < 0 ||
+ 		pqPutMsgEnd(conn) < 0)
+ 		goto sendFailed;
+ 
+ 	conn->asyncStatus = PGASYNC_BUSY;
+ 
+ 	res = PQexecFinish(conn);
+ 
+ 	conn->duplexCopy = (PQresultStatus(res) == PGRES_COMMAND_OK);
+ 
+ 	PQclear(res);
+ 
+ 	return (conn->duplexCopy ? 1 : 0);
+ 
+ sendFailed:
+ 	pqHandleSendFailure(conn);
+ 	return 0;
+ }
+ 
+ /*
   * PQgetline - gets a newline-terminated string from the backend.
   *
   * Chiefly here so that applications can use "COPY <rel> to stdout"
diff -dcrpN pgsql.orig/src/interfaces/libpq/libpq-fe.h pgsql/src/interfaces/libpq/libpq-fe.h
*** pgsql.orig/src/interfaces/libpq/libpq-fe.h	2010-02-26 03:01:33.000000000 +0100
--- pgsql/src/interfaces/libpq/libpq-fe.h	2010-04-29 12:17:38.000000000 +0200
*************** extern PGnotify *PQnotifies(PGconn *conn
*** 391,396 ****
--- 391,397 ----
  extern int	PQputCopyData(PGconn *conn, const char *buffer, int nbytes);
  extern int	PQputCopyEnd(PGconn *conn, const char *errormsg);
  extern int	PQgetCopyData(PGconn *conn, char **buffer, int async);
+ extern int	PQsetDuplexCopy(PGconn *conn);
  
  /* Deprecated routines for copy in/out */
  extern int	PQgetline(PGconn *conn, char *string, int length);
diff -dcrpN pgsql.orig/src/interfaces/libpq/libpq-int.h pgsql/src/interfaces/libpq/libpq-int.h
*** pgsql.orig/src/interfaces/libpq/libpq-int.h	2010-03-13 15:55:57.000000000 +0100
--- pgsql/src/interfaces/libpq/libpq-int.h	2010-04-29 12:17:38.000000000 +0200
*************** struct pg_conn
*** 362,367 ****
--- 362,368 ----
  	pgParameterStatus *pstatus; /* ParameterStatus data */
  	int			client_encoding;	/* encoding id */
  	bool		std_strings;	/* standard_conforming_strings */
+ 	bool		duplexCopy;	/* COPY IN/OUT in duplex mode, for synchronous replication */
  	PGVerbosity verbosity;		/* error/notice message verbosity */
  	PGlobjfuncs *lobjfuncs;		/* private state for large-object access fns */
  
#2Bruce Momjian
bruce@momjian.us
In reply to: Noname (#1)
Re: Synchronous replication patch built on SR

Please add it to the next commit-fest:

https://commitfest.postgresql.org/action/commitfest_view/inprogress

---------------------------------------------------------------------------

zb@cybertec.at wrote:

Resending, my ISP lost my mail yesterday. :-(

===========================================================

Hi,

attached is a patch that does $SUBJECT, we are submitting it for 9.1.
I have updated it to today's CVS after the "wal_level" GUC went in.

How does it work?

First, the walreceiver and the walsender are now able to communicate
in a duplex way on the same connection, so while COPY OUT is
in progress from the primary server, the standby server is able to
issue PQputCopyData() to pass back the transaction IDs that were seen
in XLOG_XACT_COMMIT or XLOG_XACT_PREPARE
records. I did this by adding a new protocol message type, with letter
'x', that's only acknowledged by the walsender process. The regular
backend was intentionally left unchanged, so an SQL client gets a protocol
error. A new libpq call, PQsetDuplexCopy(), sends this
new message before sending START_REPLICATION. The primary
makes a note of it in the walsender process' entry.
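
As a rough illustration of the XID report described above, the sketch
below packs a batch of transaction IDs into network byte order, which is
the form the walreceiver in this patch appears to pass on for sending. The
helper names pack_xids and unpack_xid and the uint32_t stand-in for
TransactionId are assumptions made for this example; they are not part of
the patch or of libpq.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <arpa/inet.h>			/* htonl(), ntohl() */

/* Assumption: stand-in for PostgreSQL's 32-bit TransactionId. */
typedef uint32_t TransactionId;

/* Same batch limit as MAX_SYNC_XIDS_SEND in the patch. */
#define MAX_SYNC_XIDS_SEND 256

/*
 * pack_xids (hypothetical helper): copy at most MAX_SYNC_XIDS_SEND XIDs
 * into buf in network byte order. Returns the number of bytes written,
 * i.e. the length that would be given to PQputCopyData().
 */
static size_t
pack_xids(const TransactionId *xids, int n_xids,
		  unsigned char *buf, size_t buflen)
{
	int			send;
	int			i;

	if (n_xids <= 0)
		return 0;

	send = n_xids < MAX_SYNC_XIDS_SEND ? n_xids : MAX_SYNC_XIDS_SEND;
	assert(buflen >= (size_t) send * sizeof(TransactionId));

	for (i = 0; i < send; i++)
	{
		uint32_t	net = htonl(xids[i]);

		memcpy(buf + (size_t) i * sizeof(net), &net, sizeof(net));
	}

	return (size_t) send * sizeof(TransactionId);
}

/*
 * unpack_xid (hypothetical helper): read the idx-th XID back in host
 * byte order, as the receiving side has to do for each 4-byte value.
 */
static TransactionId
unpack_xid(const unsigned char *buf, int idx)
{
	uint32_t	net;

	memcpy(&net, buf + (size_t) idx * sizeof(net), sizeof(net));
	return ntohl(net);
}
```

Unlike the patch, which converts the shared XID array in place before
sending, this sketch writes into a separate buffer so the stored XIDs stay
in host byte order.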

I had to move the TransactionIdLatest(xid, nchildren, children) call
that computes latestXid earlier in RecordTransactionCommit(), so
it's in the critical section now, just before the
XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata)
call. Otherwise, there was a race condition between the primary
and the standby server, where the standby server might have seen
the XLOG_XACT_COMMIT record for some XIDs before the
transaction in the primary server marked itself waiting for this XID,
resulting in stuck transactions.

I have added 3 new options, two GUCs in postgresql.conf and one
setting in recovery.conf. These options are:

1. min_sync_replication_clients = N

where N is the number of standby reports required for a given transaction
before it's released as committed synchronously. 0 means completely
asynchronous; the value is capped at max_wal_senders. Anything
in between 0 and max_wal_senders means different levels of partially
synchronous replication.

2. strict_sync_replication = boolean

where the expected number of synchronous reports from standby
servers is further limited to the actual number of connected synchronous
standby servers if the value of this GUC is false. This means that if
no standby servers are connected yet then the replication is asynchronous
and transactions are allowed to finish without waiting for synchronous
reports. If the value of this GUC is true, then transactions wait until
enough synchronous standbys connect and report back.

3. synchronous_slave = boolean (in recovery.conf)

this instructs the standby server to tell the primary that it's a
synchronous replication server and it will send the committed XIDs
back to the primary.
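
Options 1 and 2 above combine into a single number of reports that a
committing transaction waits for; the patch computes it in
ProcMinSyncReports(). The following is a minimal standalone sketch of that
rule, assuming plain function arguments in place of the GUC variables and
of the walsender count. The names effective_min_reports and
commit_released are made up for this example.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * effective_min_reports (hypothetical name): how many standby reports a
 * commit has to collect before it is released.
 *
 * The arguments stand in for min_sync_replication_clients,
 * max_wal_senders, strict_sync_replication and the number of currently
 * connected synchronous standbys.
 */
static int
effective_min_reports(int min_sync_replication_clients,
					  int max_wal_senders,
					  bool strict_sync_replication,
					  int connected_sync_standbys)
{
	int			min_reports = min_sync_replication_clients;

	assert(min_reports >= 0);
	assert(max_wal_senders >= 0);
	assert(connected_sync_standbys >= 0);

	/* Never expect more reports than there can be walsenders. */
	if (min_reports > max_wal_senders)
		min_reports = max_wal_senders;

	/*
	 * In non-strict mode, also cap the expectation at the number of
	 * synchronous standbys that are actually connected.
	 */
	if (!strict_sync_replication && min_reports > connected_sync_standbys)
		min_reports = connected_sync_standbys;

	return min_reports;
}

/* A waiting commit is released once enough standbys have reported it. */
static bool
commit_released(int sync_reports, int min_reports)
{
	return sync_reports >= min_reports;
}
```

With min_sync_replication_clients = 2, max_wal_senders = 5 and no standby
connected, the non-strict result is 0 (commits behave asynchronously),
while the strict result stays at 2 and commits wait until two synchronous
standbys have reported.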

I also added a contrib module for monitoring the synchronous replication,
but it abuses the procarray.c code by exposing the procArray pointer,
which is ugly. It either needs to be abandoned or moved to core if or when
this code has been discussed enough. :-)

Best regards,
Zoltán Böszörményi

[ Attachment, skipping... ]

--
Bruce Momjian <bruce@momjian.us> http://momjian.us
EnterpriseDB http://enterprisedb.com

#3Boszormenyi Zoltan
zb@cybertec.at
In reply to: Bruce Momjian (#2)
Re: Synchronous replication patch built on SR

Hi,

Bruce Momjian wrote:

Please add it to the next commit-fest:

https://commitfest.postgresql.org/action/commitfest_view/inprogress

It was already added two days ago:

https://commitfest.postgresql.org/action/patch_view?id=297

Best regards,
Zoltán Böszörményi

--
Bible has answers for everything. Proof:
"But let your communication be, Yea, yea; Nay, nay: for whatsoever is more
than these cometh of evil." (Matthew 5:37) - basics of digital technology.
"May your kingdom come" - superficial description of plate tectonics

----------------------------------
Zoltán Böszörményi
Cybertec Schönig & Schönig GmbH
http://www.postgresql.at/