Synchronous replication patch built on SR

Started by Boszormenyi Zoltan over 15 years ago · 14 messages
#1 Boszormenyi Zoltan
zb@cybertec.at
1 attachment(s)

Hi,

attached is a patch that does $SUBJECT; we are submitting it for 9.1.
I have updated it to today's CVS after the "wal_level" GUC went in.

How does it work?

First, the walreceiver and the walsender are now able to communicate
in a duplex way on the same connection: while COPY OUT is
in progress from the primary server, the standby server is able to
issue PQputCopyData() to pass back the transaction IDs that were seen
with XLOG_XACT_COMMIT or XLOG_XACT_PREPARE
signatures. I did this by adding a new protocol message type, with letter
'x', that is acknowledged only by the walsender process. The regular
backend was intentionally left unchanged, so an SQL client gets a protocol
error. A new libpq call, PQsetDuplexCopy(), sends this
new message before sending START_REPLICATION. The primary
makes a note of it in the walsender process' entry.

I had to move the TransactionIdLatest(xid, nchildren, children) call
that computes latestXid earlier in RecordTransactionCommit(), so
it's now in the critical section, just before the
XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata)
call. Otherwise there was a race condition between the primary
and the standby server: the standby server might have seen
the XLOG_XACT_COMMIT record for some XIDs before the
transaction in the primary server marked itself waiting for those XIDs,
resulting in stuck transactions.

I have added 3 new options: two GUCs in postgresql.conf and one
setting in recovery.conf. These options are (an example configuration
follows the list):

1. min_sync_replication_clients = N

where N is the number of reports required for a given transaction before it's
released as committed synchronously. 0 means completely asynchronous
replication; the value is capped at max_wal_senders. Anything
between 0 and max_wal_senders means different levels of partially
synchronous replication.

2. strict_sync_replication = boolean

where, if the value of this GUC is false, the expected number of synchronous
reports from standby servers is further limited to the actual number of
connected synchronous standby servers. This means that if
no standby servers are connected yet, then replication is asynchronous
and transactions are allowed to finish without waiting for synchronous
reports. If the value of this GUC is true, then transactions wait until
enough synchronous standbys connect and report back.

3. synchronous_slave = boolean (in recovery.conf)

this instructs the standby server to tell the primary that it's a
synchronous replication standby and will send the committed XIDs back
to the primary.

I also added a contrib module for monitoring the synchronous replication,
but it abuses the procarray.c code by exposing the procArray pointer,
which is ugly. It either needs to be abandoned or moved to core if or when
this code is discussed enough. :-)

Best regards,
Zoltán Böszörményi

--
Bible has answers for everything. Proof:
"But let your communication be, Yea, yea; Nay, nay: for whatsoever is more
than these cometh of evil." (Matthew 5:37) - basics of digital technology.
"May your kingdom come" - superficial description of plate tectonics

----------------------------------
Zoltán Böszörményi
Cybertec Schönig & Schönig GmbH
http://www.postgresql.at/

Attachments:

pg91-syncrep-15-ctxdiff.patch (text/x-patch)
diff -dcrpN pgsql.orig/contrib/Makefile pgsql/contrib/Makefile
*** pgsql.orig/contrib/Makefile	2009-11-18 22:57:56.000000000 +0100
--- pgsql/contrib/Makefile	2010-04-29 12:17:37.000000000 +0200
*************** SUBDIRS = \
*** 37,42 ****
--- 37,43 ----
  		pgstattuple	\
  		seg		\
  		spi		\
+ 		syncreputils	\
  		tablefunc	\
  		test_parser	\
  		tsearch2	\
diff -dcrpN pgsql.orig/contrib/syncreputils/Makefile pgsql/contrib/syncreputils/Makefile
*** pgsql.orig/contrib/syncreputils/Makefile	1970-01-01 01:00:00.000000000 +0100
--- pgsql/contrib/syncreputils/Makefile	2010-04-29 12:17:37.000000000 +0200
***************
*** 0 ****
--- 1,17 ----
+ # $PostgreSQL: pgsql/contrib/syncreputils/Makefile $
+ 
+ MODULE_big = syncreputils
+ DATA_built = syncreputils.sql
+ DATA = uninstall_syncreputils.sql
+ OBJS = syncreputils.o
+ 
+ ifdef USE_PGXS
+ PG_CONFIG = pg_config
+ PGXS := $(shell $(PG_CONFIG) --pgxs)
+ include $(PGXS)
+ else
+ subdir = contrib/syncreputils
+ top_builddir = ../..
+ include $(top_builddir)/src/Makefile.global
+ include $(top_srcdir)/contrib/contrib-global.mk
+ endif
diff -dcrpN pgsql.orig/contrib/syncreputils/syncreputils.c pgsql/contrib/syncreputils/syncreputils.c
*** pgsql.orig/contrib/syncreputils/syncreputils.c	1970-01-01 01:00:00.000000000 +0100
--- pgsql/contrib/syncreputils/syncreputils.c	2010-04-29 12:17:37.000000000 +0200
***************
*** 0 ****
--- 1,223 ----
+ #include "postgres.h"
+ #include "fmgr.h"
+ #include "funcapi.h"
+ #include "miscadmin.h"
+ #include "replication/walsender.h"
+ #include "replication/walreceiver.h"
+ #include "storage/lwlock.h"
+ #include "storage/proc.h"
+ #include "storage/procarray.h"
+ 
+ #include "syncreputils.h"
+ 
+ PG_MODULE_MAGIC;
+ 
+ PG_FUNCTION_INFO_V1(syncrep_waiting_xact);
+ PG_FUNCTION_INFO_V1(syncrep_queued_xact);
+ 
+ /*
+  *	syncrep_waiting_xact(
+  *		out int4 pid,
+  *		out xid xact,
+  *		out reports int4,
+  *		out max_reports int4)
+  */
+ Datum
+ syncrep_waiting_xact(PG_FUNCTION_ARGS)
+ {
+ 	FuncCallContext	   *funcctx;
+ 	TupleDesc		ret_tupdesc;
+ 	AttInMetadata	   *attinmeta;
+ 	MemoryContext		oldcontext;
+ 	int			call_cntr;
+ 	int			max_calls;
+ 	Datum			result[4];
+ 	bool                    isnull[4] = { false, false, false, false };
+ 	srwx		   *fctx;
+ 
+ 	/* stuff done only on the first call of the function */
+ 	if (SRF_IS_FIRSTCALL())
+ 	{
+ 		TupleDesc	tupdesc;
+ 		int		i, found;
+ 		int		min_reports;
+ 		ProcArrayStruct *arrayP = getProcArray();
+ 
+ 		if (RecoveryInProgress())
+ 			elog(ERROR, "this function is only usable on the primary server");
+ 
+ 		if (max_wal_senders == 0)
+ 			elog(ERROR, "this function is useless on a standalone server");
+ 
+ 		if (WalSndMinSyncSlaves == 0)
+ 			elog(ERROR, "this function is useless without synchronous replication");
+ 
+ 		/* create a function context for cross-call persistence */
+ 		funcctx = SRF_FIRSTCALL_INIT();
+ 
+ 		/*
+ 		 * switch to memory context appropriate for multiple function calls
+ 		 */
+ 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+ 
+ 		switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+ 		{
+ 			case TYPEFUNC_COMPOSITE:
+ 				/* success */
+ 				break;
+ 			case TYPEFUNC_RECORD:
+ 				/* failed to determine actual type of RECORD */
+ 				ereport(ERROR,
+ 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ 						 errmsg("function returning record called in context "
+ 						 	"that cannot accept type record")));
+ 				break;
+ 			default:
+ 				/* result type isn't composite */
+ 				elog(ERROR, "return type must be a row type");
+ 				break;
+ 		}
+ 
+ 		/* make sure we have a persistent copy of the tupdesc */
+ 		tupdesc = CreateTupleDescCopy(tupdesc);
+ 
+ 		/*
+ 		 * Generate attribute metadata needed later to produce tuples from raw
+ 		 * C strings
+ 		 */
+ 		attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ 		funcctx->attinmeta = attinmeta;
+ 
+ 		/* allocate memory */
+ 		fctx = (srwx *)palloc(MaxConnections * sizeof(srwx));
+ 
+ 		min_reports = ProcMinSyncReports();
+ 
+ 		LWLockAcquire(ProcArrayLock, LW_SHARED);
+ 
+ 		for (i = found = 0; i < arrayP->numProcs; i++)
+ 		{
+ 			SpinLockAcquire(&arrayP->procs[i]->mutex);
+ 			if (arrayP->procs[i]->locked_for_sync)
+ 			{
+ 				fctx[found].pid = arrayP->procs[i]->pid;
+ 				fctx[found].xid = arrayP->procs[i]->committed_xid;
+ 				fctx[found].reports = arrayP->procs[i]->sync_reports;
+ 				fctx[found].min_reports = min_reports;
+ 
+ 				found++;
+ 			}
+ 			SpinLockRelease(&arrayP->procs[i]->mutex);
+ 		}
+ 
+ 		LWLockRelease(ProcArrayLock);
+ 
+ 		/* total number of tuples to be returned */
+ 		funcctx->max_calls = found;
+ 
+ 		funcctx->user_fctx = fctx;
+ 
+ 		MemoryContextSwitchTo(oldcontext);
+ 	}
+ 
+ 	funcctx = SRF_PERCALL_SETUP();
+ 
+ 	call_cntr = funcctx->call_cntr;
+ 	max_calls = funcctx->max_calls;
+ 	fctx = funcctx->user_fctx;
+ 
+ 	/* attribute return type and return tuple description */
+ 	attinmeta = funcctx->attinmeta;
+ 	ret_tupdesc = attinmeta->tupdesc;
+ 
+ 	/* are there any records left? */
+ 	if (call_cntr < max_calls)	/* do when there is more left to send */
+ 	{
+ 		HeapTuple	tuple;
+ 
+ 		result[0] = Int32GetDatum(fctx[call_cntr].pid);
+ 		result[1] = TransactionIdGetDatum(fctx[call_cntr].xid);
+ 		result[2] = Int32GetDatum(fctx[call_cntr].reports);
+ 		result[3] = Int32GetDatum(fctx[call_cntr].min_reports);
+ 
+ 		tuple = heap_form_tuple(ret_tupdesc, result, isnull);
+ 
+ 		/* send the result */
+ 		SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
+ 	}
+ 	else
+ 	{
+ 		/* do when there is no more left */
+ 		SRF_RETURN_DONE(funcctx);
+ 	}
+ }
+ 
+ /*
+  *	syncrep_queued_xact() RETURNS SETOF xid
+  */
+ Datum
+ syncrep_queued_xact(PG_FUNCTION_ARGS)
+ {
+ 	FuncCallContext	   *funcctx;
+ 	MemoryContext		oldcontext;
+ 	int			call_cntr;
+ 	int			max_calls;
+ 	int			i;
+ 	TransactionId	   *fctx;
+ 
+ 	/* stuff done only on the first call of the function */
+ 	if (SRF_IS_FIRSTCALL())
+ 	{
+ 		/* create a function context for cross-call persistence */
+ 		funcctx = SRF_FIRSTCALL_INIT();
+ 
+ 		if (!RecoveryInProgress())
+ 			elog(ERROR, "this function is only usable on a secondary server");
+ 
+ 		if (!WalRcvInProgress())
+ 			elog(ERROR, "this function is only usable on a secondary server");
+ 
+ 		/*
+ 		 * switch to memory context appropriate for multiple function calls
+ 		 */
+ 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+ 
+ 		LWLockAcquire(SynchronousReplicationLock, LW_EXCLUSIVE);
+ 
+ 		/* allocate memory */
+ 		fctx = (TransactionId *)palloc(WalRcv->n_xids * sizeof(TransactionId));
+ 
+ 		/* Copy current state */
+ 		for (i = 0; i < WalRcv->n_xids; i++)
+ 			fctx[i] = WalRcv->xids[i];
+ 
+ 		/* total number of tuples to be returned */
+ 		funcctx->max_calls = WalRcv->n_xids;
+ 
+ 		funcctx->user_fctx = fctx;
+ 
+ 		LWLockRelease(SynchronousReplicationLock);
+ 
+ 		MemoryContextSwitchTo(oldcontext);
+ 	}
+ 
+ 	funcctx = SRF_PERCALL_SETUP();
+ 
+ 	call_cntr = funcctx->call_cntr;
+ 	max_calls = funcctx->max_calls;
+ 	fctx = funcctx->user_fctx;
+ 
+ 	/* are there any records left? */
+ 	if (call_cntr < max_calls)	/* do when there is more left to send */
+ 	{
+ 		return TransactionIdGetDatum(fctx[call_cntr]);
+ 	}
+ 	else
+ 	{
+ 		/* do when there is no more left */
+ 		pfree(fctx);
+ 		funcctx->user_fctx = NULL;
+ 
+ 		SRF_RETURN_DONE(funcctx);
+ 	}
+ }
diff -dcrpN pgsql.orig/contrib/syncreputils/syncreputils.h pgsql/contrib/syncreputils/syncreputils.h
*** pgsql.orig/contrib/syncreputils/syncreputils.h	1970-01-01 01:00:00.000000000 +0100
--- pgsql/contrib/syncreputils/syncreputils.h	2010-04-29 12:17:37.000000000 +0200
***************
*** 0 ****
--- 1,38 ----
+ #ifndef SYNCREPUTILS_H
+ #define SYNCREPUTILS_H
+ 
+ /* Stolen from procarray.c in 9.0 */
+ typedef struct ProcArrayStruct
+ {
+ 	int			numProcs;		/* number of valid procs entries */
+ 	int			maxProcs;		/* allocated size of procs array */
+ 
+ 	int			numKnownAssignedXids;	/* current number of known assigned
+ 										 * xids */
+ 	int			maxKnownAssignedXids;	/* allocated size of known assigned
+ 										 * xids */
+ 
+ 	/*
+ 	 * Highest subxid that overflowed KnownAssignedXids array. Similar to
+ 	 * overflowing cached subxids in PGPROC entries.
+ 	 */
+ 	TransactionId lastOverflowedXid;
+ 
+ 	/*
+ 	 * We declare procs[] as 1 entry because C wants a fixed-size array, but
+ 	 * actually it is maxProcs entries long.
+ 	 */
+ 	PGPROC	   *procs[1];		/* VARIABLE LENGTH ARRAY */
+ } ProcArrayStruct;
+ 
+ typedef struct {
+ 	int4		pid;
+ 	TransactionId	xid;
+ 	int4		reports;
+ 	int4		min_reports;
+ } srwx;
+ 
+ extern Datum syncrep_waiting_xact(PG_FUNCTION_ARGS);
+ extern Datum syncrep_queued_xact(PG_FUNCTION_ARGS);
+ 
+ #endif
diff -dcrpN pgsql.orig/contrib/syncreputils/syncreputils.sql.in pgsql/contrib/syncreputils/syncreputils.sql.in
*** pgsql.orig/contrib/syncreputils/syncreputils.sql.in	1970-01-01 01:00:00.000000000 +0100
--- pgsql/contrib/syncreputils/syncreputils.sql.in	2010-04-29 12:17:37.000000000 +0200
***************
*** 0 ****
--- 1,9 ----
+ -- Lists the PIDs and XIDs that are waiting for synchronous replication report
+ -- Runnable on the primary server
+ CREATE OR REPLACE FUNCTION syncrep_waiting_xact(out pid int4, out xact xid, out reports int4, out min_reports int4)
+ RETURNS SETOF RECORD AS 'MODULE_PATHNAME' LANGUAGE C VOLATILE;
+ 
+ -- Lists the XIDs that are queued for synchronous replication
+ -- Runnable on the secondary server
+ CREATE OR REPLACE FUNCTION syncrep_queued_xact()
+ RETURNS SETOF xid AS 'MODULE_PATHNAME' LANGUAGE C VOLATILE;
diff -dcrpN pgsql.orig/contrib/syncreputils/uninstall_syncreputils.sql pgsql/contrib/syncreputils/uninstall_syncreputils.sql
*** pgsql.orig/contrib/syncreputils/uninstall_syncreputils.sql	1970-01-01 01:00:00.000000000 +0100
--- pgsql/contrib/syncreputils/uninstall_syncreputils.sql	2010-04-29 12:17:37.000000000 +0200
***************
*** 0 ****
--- 1,2 ----
+ DROP FUNCTION syncrep_waiting_xact();
+ DROP FUNCTION syncrep_queued_xact();
diff -dcrpN pgsql.orig/doc/src/sgml/config.sgml pgsql/doc/src/sgml/config.sgml
*** pgsql.orig/doc/src/sgml/config.sgml	2010-04-29 12:08:59.000000000 +0200
--- pgsql/doc/src/sgml/config.sgml	2010-04-29 12:17:37.000000000 +0200
*************** SET ENABLE_SEQSCAN TO OFF;
*** 1913,1918 ****
--- 1913,1955 ----
         </para>
         </listitem>
        </varlistentry>
+ 
+       <varlistentry id="guc-min-sync-replication-clients" xreflabel="min_sync_replication_clients">
+        <term><varname>min_sync_replication_clients</varname> (<type>integer</type>)</term>
+        <indexterm>
+         <primary><varname>min_sync_replication_clients</> configuration parameter</primary>
+        </indexterm>
+        <listitem>
+        <para>
+         Specifies the number of replication clients that need to report back
+         in synchronous replication. Committed transactions will be held back
+         until enough clients report back. Meaningful values range from zero to
+         <varname>max_wal_senders</>.
+         Zero (the default value) means asynchronous replication.
+         Greater values may mean transactions (and clients) are held back longer.
+        </para>
+        </listitem>
+       </varlistentry>
+ 
+       <varlistentry id="guc-strict-sync-replication" xreflabel="strict_sync_replication">
+        <term><varname>strict_sync_replication</varname> (<type>boolean</type>)</term>
+        <indexterm>
+         <primary><varname>strict_sync_replication</> configuration parameter</primary>
+        </indexterm>
+        <listitem>
+        <para>
+         Specifies whether synchronous replication strictly expects
+         <varname>min_sync_replication_clients</> reports from replication clients,
+         or whether the number of expected reports is limited to the actual number of
+         connected synchronous replication clients. The default value is off,
+         meaning that transactions and SQL clients will not be held back
+         if the number of connected synchronous replication clients is
+         smaller than <varname>min_sync_replication_clients</>. Turning it on
+         causes transactions and SQL clients to be held back (stalled) until
+         enough synchronous replication clients connect and report back.
+        </para>
+        </listitem>
+       </varlistentry>
+ 
       </variablelist>
      </sect2>
      <sect2 id="runtime-config-standby">
diff -dcrpN pgsql.orig/doc/src/sgml/protocol.sgml pgsql/doc/src/sgml/protocol.sgml
*** pgsql.orig/doc/src/sgml/protocol.sgml	2010-04-03 09:22:55.000000000 +0200
--- pgsql/doc/src/sgml/protocol.sgml	2010-04-29 12:17:37.000000000 +0200
*************** Terminate (F)
*** 3953,3958 ****
--- 3953,3996 ----
  </varlistentry>
  
  
+ <varlistentry>
+ <term>
+ Set Duplex Copy (F)
+ </term>
+ <listitem>
+ <para>
+ 
+ <variablelist>
+ <varlistentry>
+ <term>
+         Byte1('x')
+ </term>
+ <listitem>
+ <para>
+                 Identifies the message as an indicator that the frontend is
+                 a synchronous replication client. Only used between the primary
+                 and the standby servers during replication handshake. Regular
+                 frontends receive a protocol error.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+         Int32(4)
+ </term>
+ <listitem>
+ <para>
+                 Length of message contents in bytes, including self.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ 
+ </para>
+ </listitem>
+ </varlistentry>
+ 
+ 
  </variablelist>
  
  </sect1>
diff -dcrpN pgsql.orig/doc/src/sgml/recovery-config.sgml pgsql/doc/src/sgml/recovery-config.sgml
*** pgsql.orig/doc/src/sgml/recovery-config.sgml	2010-04-29 12:09:00.000000000 +0200
--- pgsql/doc/src/sgml/recovery-config.sgml	2010-04-29 12:17:37.000000000 +0200
*************** restore_command = 'copy "C:\\server\\arc
*** 279,284 ****
--- 279,297 ----
           </para>
          </listitem>
         </varlistentry>
+        <varlistentry id="synchronous-slave" xreflabel="synchronous_slave">
+         <term><varname>synchronous_slave</varname> (<type>boolean</type>)</term>
+         <listitem>
+          <para>
+           Specifies whether this replication client acts as a synchronous
+           replication client. This means that while replaying the WAL stream,
+           the standby server looks for WAL records with COMMIT or
+           PREPARE TRANSACTION signature. Transaction IDs of such WAL records
+           are sent back to the primary server. Default value is off, meaning
+           that the replication client acts asynchronously.
+          </para>
+         </listitem>
+        </varlistentry>
         <varlistentry id="trigger-file" xreflabel="trigger_file">
          <term><varname>trigger_file</varname> (<type>string</type>)</term>
          <indexterm>
diff -dcrpN pgsql.orig/src/backend/access/transam/twophase.c pgsql/src/backend/access/transam/twophase.c
*** pgsql.orig/src/backend/access/transam/twophase.c	2010-04-28 09:08:17.000000000 +0200
--- pgsql/src/backend/access/transam/twophase.c	2010-04-29 14:07:40.000000000 +0200
***************
*** 58,63 ****
--- 58,64 ----
  #include "storage/fd.h"
  #include "storage/procarray.h"
  #include "storage/sinvaladt.h"
+ #include "storage/spin.h"
  #include "storage/smgr.h"
  #include "utils/builtins.h"
  #include "utils/memutils.h"
*************** EndPrepare(GlobalTransaction gxact)
*** 1019,1024 ****
--- 1020,1038 ----
  
  	MyProc->inCommit = true;
  
+ 	/*
+ 	 * We want synchronous replication here, mark the backend
+ 	 * to wait at the end of the transaction.
+ 	 */
+ 	if (ProcMinSyncReports() > 0)
+ 	{
+ 		SpinLockAcquire(&MyProc->mutex);
+ 		MyProc->locked_for_sync = true;
+ 		MyProc->committed_xid = xid;
+ 		MyProc->sync_reports = 0;
+ 		SpinLockRelease(&MyProc->mutex);
+ 	}
+ 
  	gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
  									records.head);
  	XLogFlush(gxact->prepare_lsn);
diff -dcrpN pgsql.orig/src/backend/access/transam/xact.c pgsql/src/backend/access/transam/xact.c
*** pgsql.orig/src/backend/access/transam/xact.c	2010-02-26 03:00:34.000000000 +0100
--- pgsql/src/backend/access/transam/xact.c	2010-04-29 14:05:52.000000000 +0200
***************
*** 36,41 ****
--- 36,42 ----
  #include "libpq/be-fsstubs.h"
  #include "miscadmin.h"
  #include "pgstat.h"
+ #include "replication/walsender.h"
  #include "storage/bufmgr.h"
  #include "storage/fd.h"
  #include "storage/lmgr.h"
*************** RecordTransactionCommit(void)
*** 1012,1017 ****
--- 1013,1038 ----
  		}
  		rdata[lastrdata].next = NULL;
  
+ 		/*
+ 		 * Compute latestXid before we actually COMMIT, so we have a chance
+ 		 * to lock our transaction before the XID notify comes back from
+ 		 * the secondary server(s).
+ 		 */
+ 		latestXid = TransactionIdLatest(xid, nchildren, children);
+ 
+ 		/*
+ 		 * We want synchronous replication here, mark the backend
+ 		 * to wait at the end of the transaction.
+ 		 */
+ 		if (ProcMinSyncReports() > 0)
+ 		{
+ 			SpinLockAcquire(&MyProc->mutex);
+ 			MyProc->locked_for_sync = true;
+ 			MyProc->committed_xid = latestXid;
+ 			MyProc->sync_reports = 0;
+ 			SpinLockRelease(&MyProc->mutex);
+ 		}
+ 
  		(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
  	}
  
*************** RecordTransactionCommit(void)
*** 1080,1088 ****
  		END_CRIT_SECTION();
  	}
  
- 	/* Compute latestXid while we have the child XIDs handy */
- 	latestXid = TransactionIdLatest(xid, nchildren, children);
- 
  	/* Reset XactLastRecEnd until the next transaction writes something */
  	XactLastRecEnd.xrecoff = 0;
  
--- 1101,1106 ----
*************** StartTransaction(void)
*** 1675,1680 ****
--- 1693,1721 ----
  	ShowTransactionState("StartTransaction");
  }
  
+ static void
+ WaitForSynchronousReport(void)
+ {
+ 	if (TransactionIdIsValid(MyProc->committed_xid) && MyProc->locked_for_sync)
+ 	{
+ 		bool	wait;
+ 
+ 		/*
+ 		 * Loop on the pg_usleep() until some walsender unlocks and signals us
+ 		 * or every walsender has quit.
+ 		 */
+ 		SpinLockAcquire(&MyProc->mutex);
+ 		wait = (MyProc->locked_for_sync && (MyProc->sync_reports < ProcMinSyncReports()));
+ 		SpinLockRelease(&MyProc->mutex);
+ 		while (wait)
+ 		{
+ 			pg_usleep(1000);
+ 			SpinLockAcquire(&MyProc->mutex);
+ 			wait = (MyProc->locked_for_sync && (MyProc->sync_reports < ProcMinSyncReports()));
+ 			SpinLockRelease(&MyProc->mutex);
+ 		}
+ 	}
+ }
  
  /*
   *	CommitTransaction
*************** CommitTransaction(void)
*** 1865,1870 ****
--- 1906,1913 ----
  	s->state = TRANS_DEFAULT;
  
  	RESUME_INTERRUPTS();
+ 
+ 	WaitForSynchronousReport();
  }
  
  
*************** PrepareTransaction(void)
*** 2099,2104 ****
--- 2142,2149 ----
  	s->state = TRANS_DEFAULT;
  
  	RESUME_INTERRUPTS();
+ 
+ 	WaitForSynchronousReport();
  }
  
  
diff -dcrpN pgsql.orig/src/backend/access/transam/xlog.c pgsql/src/backend/access/transam/xlog.c
*** pgsql.orig/src/backend/access/transam/xlog.c	2010-04-29 12:09:01.000000000 +0200
--- pgsql/src/backend/access/transam/xlog.c	2010-04-29 12:17:37.000000000 +0200
*************** static void XLogArchiveNotifySeg(uint32 
*** 538,544 ****
  static bool XLogArchiveCheckDone(const char *xlog);
  static bool XLogArchiveIsBusy(const char *xlog);
  static void XLogArchiveCleanup(const char *xlog);
- static void readRecoveryCommandFile(void);
  static void exitArchiveRecovery(TimeLineID endTLI,
  					uint32 endLogId, uint32 endLogSeg);
  static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
--- 538,543 ----
*************** parseRecoveryCommandFileLine(char *cmdli
*** 5076,5083 ****
   * cater for additional parameters and controls
   * possibly use a flex lexer similar to the GUC one
   */
! static void
! readRecoveryCommandFile(void)
  {
  	FILE	   *fd;
  	char		cmdline[MAXPGPATH];
--- 5075,5082 ----
   * cater for additional parameters and controls
   * possibly use a flex lexer similar to the GUC one
   */
! void
! ReadRecoveryCommandFile(void)
  {
  	FILE	   *fd;
  	char		cmdline[MAXPGPATH];
*************** readRecoveryCommandFile(void)
*** 5217,5222 ****
--- 5216,5230 ----
  					(errmsg("primary_conninfo = '%s'",
  							PrimaryConnInfo)));
  		}
+ 		else if (strcmp(tok1, "synchronous_slave") == 0)
+ 		{
+ 			if (!parse_bool(tok2, &SynchronousSlave))
+ 				ereport(ERROR,
+ 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ 						 errmsg("parameter \"synchronous_slave\" requires a Boolean value")));
+ 			ereport(DEBUG2,
+ 					(errmsg("synchronous_slave = '%s'", tok2)));
+ 		}
  		else if (strcmp(tok1, "trigger_file") == 0)
  		{
  			TriggerFile = pstrdup(tok2);
*************** StartupXLOG(void)
*** 5712,5718 ****
  	 * Check for recovery control file, and if so set up state for offline
  	 * recovery
  	 */
! 	readRecoveryCommandFile();
  
  	/* Now we can determine the list of expected TLIs */
  	expectedTLIs = readTimeLineHistory(recoveryTargetTLI);
--- 5720,5726 ----
  	 * Check for recovery control file, and if so set up state for offline
  	 * recovery
  	 */
! 	ReadRecoveryCommandFile();
  
  	/* Now we can determine the list of expected TLIs */
  	expectedTLIs = readTimeLineHistory(recoveryTargetTLI);
*************** StartupXLOG(void)
*** 6075,6080 ****
--- 6083,6090 ----
  		{
  			bool		recoveryContinue = true;
  			bool		recoveryApply = true;
+ 			TransactionId	   *xids = NULL;
+ 			int		n_xids, max_xids;
  			ErrorContextCallback errcontext;
  
  			InRedo = true;
*************** StartupXLOG(void)
*** 6083,6088 ****
--- 6093,6106 ----
  					(errmsg("redo starts at %X/%X",
  							ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
  
+ 			if (PrimaryConnInfo && SynchronousSlave)
+ 			{
+ #define XIDS_CHUNK	(256)
+ 				max_xids = XIDS_CHUNK;
+ 				n_xids = 0;
+ 				xids = palloc(max_xids * sizeof(TransactionId));
+ 			}
+ 
  			/*
  			 * main redo apply loop
  			 */
*************** StartupXLOG(void)
*** 6155,6160 ****
--- 6173,6229 ----
  
  				RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
  
+ 				/*
+ 				 * If we are using streaming replication and are a synchronous slave...
+ 				 */
+ 				if (PrimaryConnInfo && SynchronousSlave)
+ 				{
+ 					int	processed;
+ 					uint8	info = record->xl_info & ~XLR_INFO_MASK;
+ 
+ 					/*
+ 					 * If we have already encountered some XIDs previously,
+ 					 * then try to pass them to the walreceiver first.
+ 					 */
+ 					processed = WalReceiverAddXid(xids, n_xids);
+ 					if (processed > 0)
+ 					{
+ 						int	i;
+ 
+ 						for (i = processed; i < n_xids; i++)
+ 							xids[i - processed] = xids[i];
+ 						n_xids -= processed;
+ 					}
+ 
+ 					/*
+ 					 * If we are seeing a COMMIT record info and we don't
+ 					 * have a backlog, then try to pass it to the walreceiver.
+ 					 * If unsuccessful, keep it locally.
+ 					 */
+ 					if ((record->xl_rmid == RM_XACT_ID) &&
+ 						(info == XLOG_XACT_COMMIT || info == XLOG_XACT_PREPARE) &&
+ 						TransactionIdIsValid(record->xl_xid))
+ 					{
+ 						bool	backlog_append = (n_xids > 0);
+ 
+ 						if (!backlog_append)
+ 						{
+ 							processed = WalReceiverAddXid(&record->xl_xid, 1);
+ 							backlog_append = (processed == 0);
+ 						}
+ 
+ 						if (backlog_append)
+ 						{
+ 							xids[n_xids++] = record->xl_xid;
+ 							if (n_xids >= max_xids)
+ 							{
+ 								max_xids += XIDS_CHUNK;
+ 								xids = repalloc(xids, max_xids * sizeof(TransactionId));
+ 							}
+ 						}
+ 					}
+ 				}
+ 
  				/* Pop the error context stack */
  				error_context_stack = errcontext.previous;
  
*************** StartupXLOG(void)
*** 6171,6176 ****
--- 6240,6249 ----
  				record = ReadRecord(NULL, LOG, false);
  			} while (record != NULL && recoveryContinue);
  
+ 			if (PrimaryConnInfo && SynchronousSlave)
+ 				pfree(xids);
+ #undef XIDS_CHUNK
+ 
  			/*
  			 * end of main redo apply loop
  			 */
diff -dcrpN pgsql.orig/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
*** pgsql.orig/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c	2010-04-21 08:43:42.000000000 +0200
--- pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c	2010-04-29 12:24:11.000000000 +0200
*************** static bool justconnected = false;
*** 47,55 ****
  static char *recvBuf = NULL;
  
  /* Prototypes for interface functions */
! static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
  static bool libpqrcv_receive(int timeout, unsigned char *type,
  				 char **buffer, int *len);
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
--- 47,56 ----
  static char *recvBuf = NULL;
  
  /* Prototypes for interface functions */
! static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint, bool *sync);
  static bool libpqrcv_receive(int timeout, unsigned char *type,
  				 char **buffer, int *len);
+ static bool libpqrcv_send(char *buffer, int len);
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
*************** _PG_init(void)
*** 68,73 ****
--- 69,75 ----
  		elog(ERROR, "libpqwalreceiver already loaded");
  	walrcv_connect = libpqrcv_connect;
  	walrcv_receive = libpqrcv_receive;
+ 	walrcv_send = libpqrcv_send;
  	walrcv_disconnect = libpqrcv_disconnect;
  }
  
*************** _PG_init(void)
*** 75,81 ****
   * Establish the connection to the primary server for XLOG streaming
   */
  static bool
! libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  {
  	char		conninfo_repl[MAXCONNINFO + 18];
  	char	   *primary_sysid;
--- 77,83 ----
   * Establish the connection to the primary server for XLOG streaming
   */
  static bool
! libpqrcv_connect(char *conninfo, XLogRecPtr startpoint, bool *sync)
  {
  	char		conninfo_repl[MAXCONNINFO + 18];
  	char	   *primary_sysid;
*************** libpqrcv_connect(char *conninfo, XLogRec
*** 147,152 ****
--- 149,158 ----
  						primary_tli, standby_tli)));
  	ThisTimeLineID = primary_tli;
  
+ 	/* Tell the server if we are a synchronous client. */
+ 	if (*sync)
+ 		*sync = PQsetDuplexCopy(streamConn);
+ 
  	/* Start streaming from the point requested by startup process */
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
*************** libpqrcv_receive(int timeout, unsigned c
*** 394,396 ****
--- 400,424 ----
  
  	return true;
  }
+ 
+ /*
+  * Send a message back to the primary. The user-visible libpq
+  * knows that we sent an 'x' message to the primary server via
+  * PQsetDuplexCopy(), so it will let us send a COPY IN message
+  * while it's still in COPY OUT state.
+  */
+ static bool
+ libpqrcv_send(char *buffer, int len)
+ {
+ 	int	result;
+ 
+ 	result = PQputCopyData(streamConn, buffer, len);
+ 	if (result < 0)
+ 		ereport(ERROR,
+ 				(errmsg("could not send data to primary: %s",
+ 						PQerrorMessage(streamConn))));
+ 	PQflush(streamConn);
+ 
+ 	/* We don't care about sending in async mode. */
+ 	return (result >= 0);
+ }
diff -dcrpN pgsql.orig/src/backend/replication/walreceiver.c pgsql/src/backend/replication/walreceiver.c
*** pgsql.orig/src/backend/replication/walreceiver.c	2010-04-21 08:43:42.000000000 +0200
--- pgsql/src/backend/replication/walreceiver.c	2010-04-29 12:22:10.000000000 +0200
***************
*** 43,48 ****
--- 43,49 ----
  #include "miscadmin.h"
  #include "replication/walreceiver.h"
  #include "storage/ipc.h"
+ #include "storage/lwlock.h"
  #include "storage/pmsignal.h"
  #include "utils/builtins.h"
  #include "utils/guc.h"
***************
*** 52,61 ****
--- 53,64 ----
  
  /* Global variable to indicate if this process is a walreceiver process */
  bool		am_walreceiver;
+ bool		SynchronousSlave;
  
  /* libpqreceiver hooks to these when loaded */
  walrcv_connect_type walrcv_connect = NULL;
  walrcv_receive_type walrcv_receive = NULL;
+ walrcv_send_type walrcv_send = NULL;
  walrcv_disconnect_type walrcv_disconnect = NULL;
  
  #define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */
*************** static volatile sig_atomic_t got_SIGHUP 
*** 77,82 ****
--- 80,162 ----
  static volatile sig_atomic_t got_SIGTERM = false;
  
  /*
+  *		WalReceiverAddXid
+  *
+  * Add a new set of XIDs to the array of XIDs in flight; eventually
+  * we need to report them to the primary when they are committed.
+  *
+  * Called by StartupXLOG() with a list of known valid and
+  * to-be-committed XIDs, so the checks
+  * 1. whether the XID is valid, or
+  * 2. the XID was encountered before
+  * can be omitted.
+  *
+  * Returns the number of XIDs that were added to the array.
+  */
+ int
+ WalReceiverAddXid(TransactionId *xids, int n_xids)
+ {
+ 	volatile WalRcvData *walrcv = WalRcv;
+ 
+ 	int	i = 0;
+ 
+ 	LWLockAcquire(SynchronousReplicationLock, LW_EXCLUSIVE);
+ 
+ 	/*
+ 	 * There shouldn't be more than MaxConnections number of elements.
+ 	 * Stay on the safe side and document that we must have the same
+ 	 * max_connections on both the primary and the secondary servers.
+ 	 */
+ 	if (walrcv->n_xids == MaxConnections)
+ 	{
+ 		LWLockRelease(SynchronousReplicationLock);
+ 		return 0;
+ 	}
+ 
+ 	while ((i < n_xids) && (walrcv->n_xids < MaxConnections))
+ 		walrcv->xids[walrcv->n_xids++] = xids[i++];
+ 
+ 	LWLockRelease(SynchronousReplicationLock);
+ 
+ 	return i;
+ }
+ 
+ /*
+  * Check for completed XIDs and
+  * send a message about them to the primary.
+  */
+ static void
+ WalRcvCheckCompletedXid(void)
+ {
+ 	int			i;
+ 	int			send;
+ 
+ 	volatile WalRcvData *walrcv = WalRcv;
+ 
+ 	LWLockAcquire(SynchronousReplicationLock, LW_EXCLUSIVE);
+ 
+ 	send = (walrcv->n_xids < MAX_SYNC_XIDS_SEND ? walrcv->n_xids : MAX_SYNC_XIDS_SEND);
+ 
+ 	if (send == 0)
+ 	{
+ 		LWLockRelease(SynchronousReplicationLock);
+ 		return;
+ 	}
+ 
+ 	for (i = 0; i < send; i++)
+ 		walrcv->xids[i] = htonl(walrcv->xids[i]);
+ 
+ 	walrcv_send((char *) walrcv->xids, send * sizeof(TransactionId));
+ 
+ 	for (i = send; i < walrcv->n_xids; i++)
+ 		walrcv->xids[i - send] = walrcv->xids[i];
+ 
+ 	walrcv->n_xids -= send;
+ 
+ 	LWLockRelease(SynchronousReplicationLock);
+ }
+ 
+ /*
   * LogstreamResult indicates the byte positions that we have already
   * written/fsynced.
   */
*************** WalReceiverMain(void)
*** 164,169 ****
--- 244,255 ----
  	am_walreceiver = true;
  
  	/*
+ 	 * Read the recovery.conf file so the parameters are valid
+ 	 * in the walreceiver process, too. Especially synchronous_slave.
+ 	 */
+ 	ReadRecoveryCommandFile();
+ 
+ 	/*
  	 * WalRcv should be set up already (if we are a backend, we inherit this
  	 * by fork() or EXEC_BACKEND mechanism from the postmaster).
  	 */
*************** WalReceiverMain(void)
*** 245,251 ****
  	/* Load the libpq-specific functions */
  	load_file("libpqwalreceiver", false);
  	if (walrcv_connect == NULL || walrcv_receive == NULL ||
! 		walrcv_disconnect == NULL)
  		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
  
  	/*
--- 331,337 ----
  	/* Load the libpq-specific functions */
  	load_file("libpqwalreceiver", false);
  	if (walrcv_connect == NULL || walrcv_receive == NULL ||
! 		walrcv_send == NULL || walrcv_disconnect == NULL)
  		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
  
  	/*
*************** WalReceiverMain(void)
*** 259,265 ****
  
  	/* Establish the connection to the primary for XLOG streaming */
  	EnableWalRcvImmediateExit();
! 	walrcv_connect(conninfo, startpoint);
  	DisableWalRcvImmediateExit();
  
  	/* Loop until end-of-streaming or error */
--- 345,351 ----
  
  	/* Establish the connection to the primary for XLOG streaming */
  	EnableWalRcvImmediateExit();
! 	walrcv_connect(conninfo, startpoint, &SynchronousSlave);
  	DisableWalRcvImmediateExit();
  
  	/* Loop until end-of-streaming or error */
*************** WalReceiverMain(void)
*** 309,314 ****
--- 395,403 ----
  			 */
  			XLogWalRcvFlush();
  		}
+ 
+ 		if (SynchronousSlave)
+ 			WalRcvCheckCompletedXid();
  	}
  }
  
diff -dcrpN pgsql.orig/src/backend/replication/walreceiverfuncs.c pgsql/src/backend/replication/walreceiverfuncs.c
*** pgsql.orig/src/backend/replication/walreceiverfuncs.c	2010-04-29 12:09:03.000000000 +0200
--- pgsql/src/backend/replication/walreceiverfuncs.c	2010-04-29 12:17:37.000000000 +0200
***************
*** 24,29 ****
--- 24,30 ----
  #include <signal.h>
  
  #include "access/xlog_internal.h"
+ #include "miscadmin.h"
  #include "replication/walreceiver.h"
  #include "storage/fd.h"
  #include "storage/pmsignal.h"
*************** WalRcvShmemSize(void)
*** 45,50 ****
--- 46,60 ----
  	Size		size = 0;
  
  	size = add_size(size, sizeof(WalRcvData));
+ 	/*
+ 	 * Optimally, when the primary and secondary servers are configured
+ 	 * identically, we shouldn't see more than MaxConnections number of
+ 	 * XIDs in flight visible to the slave during synchronous replication.
+ 	 * The "-1" is included in the above sizeof(WalRcvData).
+ 	 * (As we don't live in an optimal world, the StartupXLOG() code keeps
+ 	 * a local backlog of the XIDs not yet passed.)
+ 	 */
+ 	size = add_size(size, (MaxConnections - 1) * sizeof(TransactionId));
  
  	return size;
  }
diff -dcrpN pgsql.orig/src/backend/replication/walsender.c pgsql/src/backend/replication/walsender.c
*** pgsql.orig/src/backend/replication/walsender.c	2010-04-29 12:09:03.000000000 +0200
--- pgsql/src/backend/replication/walsender.c	2010-04-29 12:17:37.000000000 +0200
***************
*** 44,53 ****
--- 44,55 ----
  #include "libpq/pqformat.h"
  #include "libpq/pqsignal.h"
  #include "miscadmin.h"
+ #include "replication/walreceiver.h"
  #include "replication/walsender.h"
  #include "storage/fd.h"
  #include "storage/ipc.h"
  #include "storage/pmsignal.h"
+ #include "storage/procarray.h"
  #include "tcop/tcopprot.h"
  #include "utils/guc.h"
  #include "utils/memutils.h"
*************** bool		am_walsender = false;		/* Am I a w
*** 66,71 ****
--- 68,76 ----
  /* User-settable parameters for walsender */
  int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
  int			WalSndDelay = 200;	/* max sleep time between some actions */
+ int			WalSndMinSyncSlaves = 0;	/* minimum number of clients to report back
+ 										in sync replication */
+ bool			StrictSyncRepl = false;	/* look for ProcSyncReplicationUnlock() for semantics */
  
  #define NAPTIME_PER_CYCLE 100000L	/* max sleep time between cycles (100ms) */
  
*************** static void WalSndHandshake(void);
*** 101,107 ****
  static void WalSndKill(int code, Datum arg);
  static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
  static bool XLogSend(StringInfo outMsg);
! static void CheckClosedConnection(void);
  
  /*
   * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
--- 106,112 ----
  static void WalSndKill(int code, Datum arg);
  static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
  static bool XLogSend(StringInfo outMsg);
! static void CheckMsgFromStandby(void);
  
  /*
   * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
*************** WalSndHandshake(void)
*** 295,300 ****
--- 300,317 ----
  					break;
  				}
  
+ 			case 'x':
+ 				/*
+ 				 * The other side of the wire indicates that it will send us
+ 				 * the finished XIDs. Send the response as if it was an empty query.
+ 				 */
+ 				MyWalSnd->sync = true;
+ 				elog(LOG, "standby server indicated synchronous replication");
+ 
+ 				EndCommand("SYNC", DestRemote);
+ 				ReadyForQuery(DestRemote);
+ 				break;
+ 
  			case 'X':
  				/* standby is closing the connection */
  				proc_exit(0);
*************** WalSndHandshake(void)
*** 315,324 ****
  }
  
  /*
!  * Check if the remote end has closed the connection.
   */
  static void
! CheckClosedConnection(void)
  {
  	unsigned char firstchar;
  	int			r;
--- 332,380 ----
  }
  
  /*
!  * Handle the completed XIDs sent by the standby and unlock the waiting backends
   */
  static void
! WalSndHandleCompletedXids(void)
! {
! 	int			n_xids;
! 	int			i;
! 	StringInfoData		s;
! 	static TransactionId	   *xids = NULL; /* we won't deallocate this */
! 
! 	/* We allocate only once and keep this for the whole runtime of walsender. */
! 	if (xids == NULL)
! 	{
! 		xids = (TransactionId *) MemoryContextAlloc(TopMemoryContext, MAX_SYNC_XIDS_SEND * sizeof(TransactionId));
! 		if (xids == NULL)
! 			elog(PANIC, "cannot allocate memory for XID receive array");
! 	}
! 
! 	/* pq_getmessage() requires an initialized StringInfo */
! 	initStringInfo(&s);
! 	if (pq_getmessage(&s, 4 + MAX_SYNC_XIDS_SEND * sizeof(TransactionId)))
! 		ereport(COMMERROR,
! 						(errcode(ERRCODE_PROTOCOL_VIOLATION),
! 						 errmsg("unexpected EOF on standby connection")));
! 
! 	n_xids = s.len / sizeof(TransactionId);
! 	if (s.len % sizeof(TransactionId))
! 		ereport(COMMERROR,
! 						(errcode(ERRCODE_PROTOCOL_VIOLATION),
! 						 errmsg("invalid message length on standby connection")));
! 
! 	/* reuse the persistent xids buffer allocated above */
! 
! 	for (i = 0; i < n_xids; i++)
! 		xids[i] = pq_getmsgint(&s, 4);	/* 4 == sizeof(TransactionId) */
! 
! 	ProcSyncReplicationUnlock(xids, n_xids);
! }
! 
! /*
!  * Check if the remote end has sent us a message,
!  * like completed XIDs or closed the connection.
!  */
! static void
! CheckMsgFromStandby(void)
  {
  	unsigned char firstchar;
  	int			r;
*************** CheckClosedConnection(void)
*** 347,352 ****
--- 403,416 ----
  		case 'X':
  			proc_exit(0);
  
+ 			/*
+ 			 * 'd' means the standby is sending us some completed XIDs
+ 			 * in a COPY IN packet.
+ 			 */
+ 		case 'd':
+ 			WalSndHandleCompletedXids();
+ 			break;
+ 
  		default:
  			ereport(FATAL,
  					(errcode(ERRCODE_PROTOCOL_VIOLATION),
*************** WalSndLoop(void)
*** 420,426 ****
  			 * from other processes has arrived.
  			 */
  			pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
! 			CheckClosedConnection();
  
  			remain -= NAPTIME_PER_CYCLE;
  		}
--- 484,490 ----
  			 * from other processes has arrived.
  			 */
  			pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
! 			CheckMsgFromStandby();
  
  			remain -= NAPTIME_PER_CYCLE;
  		}
*************** InitWalSnd(void)
*** 496,501 ****
--- 560,585 ----
  	on_shmem_exit(WalSndKill, 0);
  }
  
+ /* Count active synchronous walsenders */
+ int
+ WalSenderSyncCount(void)
+ {
+ 	int	i;
+ 	int	found = 0;
+ 
+ 	for (i = 0; i < max_wal_senders; i++)
+ 	{
+ 		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ 
+ 		SpinLockAcquire(&walsnd->mutex);
+ 		if (walsnd->pid != 0 && walsnd->sync)
+ 			found++;
+ 		SpinLockRelease(&walsnd->mutex);
+ 	}
+ 
+ 	return found;
+ }
+ 
  /* Destroy the per-walsender data structure for this walsender process */
  static void
  WalSndKill(int code, Datum arg)
diff -dcrpN pgsql.orig/src/backend/storage/ipc/procarray.c pgsql/src/backend/storage/ipc/procarray.c
*** pgsql.orig/src/backend/storage/ipc/procarray.c	2010-04-28 09:08:18.000000000 +0200
--- pgsql/src/backend/storage/ipc/procarray.c	2010-04-29 12:17:37.000000000 +0200
***************
*** 51,56 ****
--- 51,57 ----
  #include "access/xact.h"
  #include "access/twophase.h"
  #include "miscadmin.h"
+ #include "replication/walsender.h"
  #include "storage/procarray.h"
  #include "storage/spin.h"
  #include "storage/standby.h"
*************** KnownAssignedXidsDisplay(int trace_level
*** 2966,2968 ****
--- 2967,3054 ----
  
  	pfree(buf.data);
  }
+ 
+ /*
+  * getProcArray - because our diagnostics are in contrib
+  */
+ void *
+ getProcArray(void)
+ {
+ 	return procArray;
+ }
+ 
+ /*
+  * Compute minimum expected reports from synchronous secondary servers
+  */
+ int
+ ProcMinSyncReports(void)
+ {
+ 	int	min_reports;
+ 
+ 	/*
+ 	 * Limit expected minimum reports to the maximum number of walsenders
+ 	 * if the former exceeds the latter.
+ 	 */
+ 	min_reports = WalSndMinSyncSlaves;
+ 	if (min_reports > max_wal_senders)
+ 		min_reports = max_wal_senders;
+ 
+ 	/*
+ 	 * If strict_sync_replication is not set, further limit the expected
+ 	 * minimum number of reports to the actual number of sync walsenders.
+ 	 */
+ 	if (!StrictSyncRepl)
+ 	{
+ 		int	wal_senders = WalSenderSyncCount();
+ 
+ 		if (min_reports > wal_senders)
+ 			min_reports = wal_senders;
+ 	}
+ 
+ 	return min_reports;
+ }
+ 
+ /*
+  * Increase the synchronous report count for the processes with
+  * the XIDs found in the array. Called by the walsender processes.
+  */
+ void
+ ProcSyncReplicationUnlock(TransactionId *xids, int n_xids)
+ {
+ 	ProcArrayStruct *arrayP = procArray;
+ 	int		i, j;
+ 	int		min_reports = ProcMinSyncReports();
+ 
+ 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ 
+ 	for (i = 0; i < arrayP->numProcs; i++)
+ 	{
+ 		bool	found = false;
+ 
+ 		SpinLockAcquire(&arrayP->procs[i]->mutex);
+ 		if (arrayP->procs[i]->locked_for_sync &&
+ 			TransactionIdIsValid(arrayP->procs[i]->committed_xid))
+ 		{
+ 			for (j = 0; j < n_xids; j++)
+ 			{
+ 				if (xids[j] == arrayP->procs[i]->committed_xid)
+ 				{
+ 					arrayP->procs[i]->sync_reports++;
+ 					if (arrayP->procs[i]->sync_reports >= min_reports)
+ 					{
+ 						arrayP->procs[i]->locked_for_sync = false;
+ 						arrayP->procs[i]->committed_xid = InvalidTransactionId;
+ 						arrayP->procs[i]->sync_reports = 0;
+ 						
+ 						SendProcSignal(arrayP->procs[i]->pid, PROCSIG_SYNCREP_UNLOCKED, arrayP->procs[i]->backendId);
+ 					}
+ 					found = true;
+ 					break;
+ 				}
+ 			}
+ 		}
+ 		SpinLockRelease(&arrayP->procs[i]->mutex);
+ 	}
+ 
+ 	LWLockRelease(ProcArrayLock);
+ }
diff -dcrpN pgsql.orig/src/backend/storage/ipc/procsignal.c pgsql/src/backend/storage/ipc/procsignal.c
*** pgsql.orig/src/backend/storage/ipc/procsignal.c	2010-02-26 03:01:00.000000000 +0100
--- pgsql/src/backend/storage/ipc/procsignal.c	2010-04-29 12:17:37.000000000 +0200
*************** procsignal_sigusr1_handler(SIGNAL_ARGS)
*** 278,282 ****
--- 278,287 ----
  	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
  		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
  
+ 	/*
+ 	 * PROCSIG_SYNCREP_UNLOCKED doesn't need any special attention,
+ 	 * it only wakes up the backend from pg_usleep()
+ 	 */
+ 
  	errno = save_errno;
  }
diff -dcrpN pgsql.orig/src/backend/storage/lmgr/proc.c pgsql/src/backend/storage/lmgr/proc.c
*** pgsql.orig/src/backend/storage/lmgr/proc.c	2010-04-29 12:09:03.000000000 +0200
--- pgsql/src/backend/storage/lmgr/proc.c	2010-04-29 12:17:37.000000000 +0200
*************** InitProcess(void)
*** 331,336 ****
--- 331,341 ----
  		SHMQueueInit(&(MyProc->myProcLocks[i]));
  	MyProc->recoveryConflictPending = false;
  
+ 	SpinLockInit(&MyProc->mutex);
+ 	MyProc->locked_for_sync = false;
+ 	MyProc->committed_xid = InvalidTransactionId;
+ 	MyProc->sync_reports = 0;
+ 
  	/*
  	 * We might be reusing a semaphore that belonged to a failed process. So
  	 * be careful and reinitialize its value here.	(This is not strictly
*************** LockWaitCancel(void)
*** 621,626 ****
--- 626,644 ----
  	 */
  }
  
+ /*
+  * Cancel waiting for the synchronous replication.
+  */
+ void
+ SyncWaitCancel(void)
+ {
+ 	/*
+ 	 * Don't bother to take the spinlock, we're called from die().
+ 	 * This backend may be inside SpinLockAcquire() already, because
+ 	 * we may be looping, waiting for locked_for_sync to become false.
+ 	 */
+ 	MyProc->locked_for_sync = false;
+ }
  
  /*
   * ProcReleaseLocks() -- release locks associated with current transaction
diff -dcrpN pgsql.orig/src/backend/tcop/postgres.c pgsql/src/backend/tcop/postgres.c
*** pgsql.orig/src/backend/tcop/postgres.c	2010-04-20 21:22:20.000000000 +0200
--- pgsql/src/backend/tcop/postgres.c	2010-04-29 12:17:37.000000000 +0200
*************** die(SIGNAL_ARGS)
*** 2670,2675 ****
--- 2670,2676 ----
  			/* bump holdoff count to make ProcessInterrupts() a no-op */
  			/* until we are done getting ready for it */
  			InterruptHoldoffCount++;
+ 			SyncWaitCancel();	/* the process may be still waiting for sync replication */
  			LockWaitCancel();	/* prevent CheckDeadLock from running */
  			DisableNotifyInterrupt();
  			DisableCatchupInterrupt();
diff -dcrpN pgsql.orig/src/backend/utils/misc/guc.c pgsql/src/backend/utils/misc/guc.c
*** pgsql.orig/src/backend/utils/misc/guc.c	2010-04-29 12:09:04.000000000 +0200
--- pgsql/src/backend/utils/misc/guc.c	2010-04-29 12:17:37.000000000 +0200
*************** static struct config_bool ConfigureNames
*** 1263,1268 ****
--- 1263,1277 ----
  		false, NULL, NULL
  	},
  
+ 	{
+ 		{"strict_sync_replication", PGC_SUSET, WAL_REPLICATION,
+ 			gettext_noop("Enables strict synchronous replication."),
+ 			gettext_noop("Transactions will stall until at least \"min_sync_replication_clients\" slaves are connected."),
+ 		},
+ 		&StrictSyncRepl,
+ 		false, NULL, NULL
+ 	},
+ 
  	/* End-of-list marker */
  	{
  		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL
*************** static struct config_int ConfigureNamesI
*** 1738,1743 ****
--- 1747,1761 ----
  	},
  
  	{
+ 		{"min_sync_replication_clients", PGC_POSTMASTER, WAL_REPLICATION,
+ 			gettext_noop("Sets the minimum number of secondary servers to report back in synchronous replication."),
+ 			NULL
+ 		},
+ 		&WalSndMinSyncSlaves,
+ 		0, 0, INT_MAX / 4, NULL, NULL
+ 	},
+ 
+ 	{
  		{"commit_delay", PGC_USERSET, WAL_SETTINGS,
  			gettext_noop("Sets the delay in microseconds between transaction commit and "
  						 "flushing WAL to disk."),
diff -dcrpN pgsql.orig/src/backend/utils/misc/postgresql.conf.sample pgsql/src/backend/utils/misc/postgresql.conf.sample
*** pgsql.orig/src/backend/utils/misc/postgresql.conf.sample	2010-04-29 12:09:04.000000000 +0200
--- pgsql/src/backend/utils/misc/postgresql.conf.sample	2010-04-29 12:17:37.000000000 +0200
***************
*** 195,200 ****
--- 195,210 ----
  #max_wal_senders = 0		# max number of walsender processes
  #wal_sender_delay = 200ms	# 1-10000 milliseconds
  #wal_keep_segments = 0		# in logfile segments, 16MB each; 0 disables
+ #min_sync_replication_clients = 0
+ 				# min number of walreceiver processes needed
+ 				# to report back for synchronous replication
+ 				# 0 sets async replication; default: 0
+ #strict_sync_replication = off	# strict synchronous replication
+ 				# if the number of sync walsender processes is
+ 				# less than min_sync_replication_clients then
+ 				# the clients will be locked until enough
+ 				# secondary servers connect and report back
+ 				# default: off
  
  
  #------------------------------------------------------------------------------
diff -dcrpN pgsql.orig/src/include/access/xlog.h pgsql/src/include/access/xlog.h
*** pgsql.orig/src/include/access/xlog.h	2010-04-29 12:09:04.000000000 +0200
--- pgsql/src/include/access/xlog.h	2010-04-29 12:17:37.000000000 +0200
*************** extern uint64 GetSystemIdentifier(void);
*** 289,294 ****
--- 289,295 ----
  extern Size XLOGShmemSize(void);
  extern void XLOGShmemInit(void);
  extern void BootStrapXLOG(void);
+ extern void ReadRecoveryCommandFile(void);
  extern void StartupXLOG(void);
  extern void ShutdownXLOG(int code, Datum arg);
  extern void InitXLOGAccess(void);
diff -dcrpN pgsql.orig/src/include/replication/walreceiver.h pgsql/src/include/replication/walreceiver.h
*** pgsql.orig/src/include/replication/walreceiver.h	2010-02-26 03:01:27.000000000 +0100
--- pgsql/src/include/replication/walreceiver.h	2010-04-29 12:22:02.000000000 +0200
***************
*** 16,21 ****
--- 16,22 ----
  #include "storage/spin.h"
  
  extern bool am_walreceiver;
+ extern bool SynchronousSlave;
  
  /*
   * MAXCONNINFO: maximum size of a connection string.
*************** extern bool am_walreceiver;
*** 25,30 ****
--- 26,36 ----
  #define MAXCONNINFO		1024
  
  /*
+  * Maximum number of XIDs sent back at once by walreceiver
+  */
+ #define MAX_SYNC_XIDS_SEND	(256)
+ 
+ /*
   * Values for WalRcv->walRcvState.
   */
  typedef enum
*************** typedef struct
*** 60,77 ****
  	XLogRecPtr	receivedUpto;
  
  	slock_t		mutex;			/* locks shared variables shown above */
  } WalRcvData;
  
  extern WalRcvData *WalRcv;
  
  /* libpqwalreceiver hooks */
! typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
  extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
  
  typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
  												 char **buffer, int *len);
  extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
  
  typedef void (*walrcv_disconnect_type) (void);
  extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
  
--- 66,89 ----
  	XLogRecPtr	receivedUpto;
  
  	slock_t		mutex;			/* locks shared variables shown above */
+ 
+ 	int32		n_xids;			/* number of currently stored XIDs */
+ 	TransactionId	xids[1];		/* variable number of XIDs to report back to the primary */
  } WalRcvData;
  
  extern WalRcvData *WalRcv;
  
  /* libpqwalreceiver hooks */
! typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint, bool *sync);
  extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
  
  typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
  												 char **buffer, int *len);
  extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
  
+ typedef bool (*walrcv_send_type) (char *buffer, int len);
+ extern PGDLLIMPORT walrcv_send_type walrcv_send;
+ 
  typedef void (*walrcv_disconnect_type) (void);
  extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
  
*************** extern bool WalRcvInProgress(void);
*** 83,87 ****
--- 95,100 ----
  extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
  extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
  extern XLogRecPtr GetWalRcvWriteRecPtr(void);
+ extern int WalReceiverAddXid(TransactionId *xids, int n_xids);
  
  #endif   /* _WALRECEIVER_H */
diff -dcrpN pgsql.orig/src/include/replication/walsender.h pgsql/src/include/replication/walsender.h
*** pgsql.orig/src/include/replication/walsender.h	2010-04-29 12:09:04.000000000 +0200
--- pgsql/src/include/replication/walsender.h	2010-04-29 12:18:46.000000000 +0200
***************
*** 21,26 ****
--- 21,27 ----
  typedef struct WalSnd
  {
  	pid_t		pid;			/* this walsender's process id, or 0 */
+ 	bool		sync;			/* the walreceiver on the other side is synchronous slave */
  	XLogRecPtr	sentPtr;		/* WAL has been sent up to this point */
  
  	slock_t		mutex;			/* locks shared variables shown above */
*************** extern bool am_walsender;
*** 40,47 ****
--- 41,51 ----
  /* user-settable parameters */
  extern int	WalSndDelay;
  extern int	max_wal_senders;
+ extern int	WalSndMinSyncSlaves;
+ extern bool	StrictSyncRepl;
  
  extern int	WalSenderMain(void);
+ extern int	WalSenderSyncCount(void);
  extern void WalSndSignals(void);
  extern Size WalSndShmemSize(void);
  extern void WalSndShmemInit(void);
diff -dcrpN pgsql.orig/src/include/storage/lwlock.h pgsql/src/include/storage/lwlock.h
*** pgsql.orig/src/include/storage/lwlock.h	2010-02-26 03:01:27.000000000 +0100
--- pgsql/src/include/storage/lwlock.h	2010-04-29 12:17:37.000000000 +0200
*************** typedef enum LWLockId
*** 70,75 ****
--- 70,76 ----
  	RelationMappingLock,
  	AsyncCtlLock,
  	AsyncQueueLock,
+ 	SynchronousReplicationLock,
  	/* Individual lock IDs end here */
  	FirstBufMappingLock,
  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
diff -dcrpN pgsql.orig/src/include/storage/procarray.h pgsql/src/include/storage/procarray.h
*** pgsql.orig/src/include/storage/procarray.h	2010-01-23 17:37:12.000000000 +0100
--- pgsql/src/include/storage/procarray.h	2010-04-29 12:17:37.000000000 +0200
*************** extern void XidCacheRemoveRunningXids(Tr
*** 71,74 ****
--- 71,78 ----
  						  int nxids, const TransactionId *xids,
  						  TransactionId latestXid);
  
+ extern void *getProcArray(void);
+ extern int	ProcMinSyncReports(void);
+ extern void ProcSyncReplicationUnlock(TransactionId *xids, int n_xids);
+ 
  #endif   /* PROCARRAY_H */
diff -dcrpN pgsql.orig/src/include/storage/proc.h pgsql/src/include/storage/proc.h
*** pgsql.orig/src/include/storage/proc.h	2010-02-26 03:01:27.000000000 +0100
--- pgsql/src/include/storage/proc.h	2010-04-29 12:17:37.000000000 +0200
***************
*** 15,20 ****
--- 15,21 ----
  #define _PROC_H_
  
  #include "storage/lock.h"
+ #include "storage/s_lock.h"
  #include "storage/pg_sema.h"
  #include "utils/timestamp.h"
  
*************** struct PGPROC
*** 123,128 ****
--- 124,135 ----
  	SHM_QUEUE	myProcLocks[NUM_LOCK_PARTITIONS];
  
  	struct XidCache subxids;	/* cache for subtransaction XIDs */
+ 
+ 	slock_t		mutex;		/* protects the shared variables below */
+ 	volatile bool	locked_for_sync;	/* the backend waits for synchronous replication */
+ 	TransactionId	committed_xid;	/* xid saved here after COMMIT,
+ 						we lock on it for synchronous replication */
+ 	int		sync_reports;	/* number of sync replication clients that reported COMMIT */
  };
  
  /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
*************** extern PGPROC *ProcWakeup(PGPROC *proc, 
*** 191,196 ****
--- 198,204 ----
  extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock);
  extern bool IsWaitingForLock(void);
  extern void LockWaitCancel(void);
+ extern void SyncWaitCancel(void);
  
  extern void ProcWaitForSignal(void);
  extern void ProcSendSignal(int pid);
diff -dcrpN pgsql.orig/src/include/storage/procsignal.h pgsql/src/include/storage/procsignal.h
*** pgsql.orig/src/include/storage/procsignal.h	2010-02-26 03:01:28.000000000 +0100
--- pgsql/src/include/storage/procsignal.h	2010-04-29 12:17:37.000000000 +0200
*************** typedef enum
*** 40,45 ****
--- 40,48 ----
  	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
  	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
  
+ 	/* Synchronous replication reason */
+ 	PROCSIG_SYNCREP_UNLOCKED,
+ 
  	NUM_PROCSIGNALS				/* Must be last! */
  } ProcSignalReason;
  
diff -dcrpN pgsql.orig/src/interfaces/libpq/exports.txt pgsql/src/interfaces/libpq/exports.txt
*** pgsql.orig/src/interfaces/libpq/exports.txt	2010-01-28 07:28:26.000000000 +0100
--- pgsql/src/interfaces/libpq/exports.txt	2010-04-29 12:17:37.000000000 +0200
*************** PQescapeLiteral           154
*** 157,159 ****
--- 157,160 ----
  PQescapeIdentifier        155
  PQconnectdbParams         156
  PQconnectStartParams      157
+ PQsetDuplexCopy           158
diff -dcrpN pgsql.orig/src/interfaces/libpq/fe-exec.c pgsql/src/interfaces/libpq/fe-exec.c
*** pgsql.orig/src/interfaces/libpq/fe-exec.c	2010-02-26 03:01:32.000000000 +0100
--- pgsql/src/interfaces/libpq/fe-exec.c	2010-04-29 12:17:38.000000000 +0200
***************
*** 16,21 ****
--- 16,22 ----
  
  #include <ctype.h>
  #include <fcntl.h>
+ #include <stdio.h>
  
  #include "libpq-fe.h"
  #include "libpq-int.h"
*************** PQputCopyData(PGconn *conn, const char *
*** 2010,2016 ****
  {
  	if (!conn)
  		return -1;
! 	if (conn->asyncStatus != PGASYNC_COPY_IN)
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
--- 2011,2018 ----
  {
  	if (!conn)
  		return -1;
! 	if (	(!conn->duplexCopy && conn->asyncStatus != PGASYNC_COPY_IN) ||
! 		( conn->duplexCopy && conn->asyncStatus != PGASYNC_COPY_OUT) )
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
*************** PQgetCopyData(PGconn *conn, char **buffe
*** 2174,2179 ****
--- 2176,2211 ----
  }
  
  /*
+  * PQsetDuplexCopy - set the duplex copy flag
+  *
+  * This makes PQputCopyData() able to pass an arbitrary message while
+  * COPY OUT is in progress. This is for synchronous replication.
+  */
+ int
+ PQsetDuplexCopy(PGconn *conn)
+ {
+ 	PGresult   *res;
+ 
+ 	if (pqPutMsgStart('x', false, conn) < 0 ||
+ 		pqPutMsgEnd(conn) < 0)
+ 		goto sendFailed;
+ 
+ 	conn->asyncStatus = PGASYNC_BUSY;
+ 
+ 	res = PQexecFinish(conn);
+ 
+ 	conn->duplexCopy = (PQresultStatus(res) == PGRES_COMMAND_OK);
+ 
+ 	PQclear(res);
+ 
+ 	return (conn->duplexCopy ? 1 : 0);
+ 
+ sendFailed:
+ 	pqHandleSendFailure(conn);
+ 	return 0;
+ }
+ 
+ /*
   * PQgetline - gets a newline-terminated string from the backend.
   *
   * Chiefly here so that applications can use "COPY <rel> to stdout"
diff -dcrpN pgsql.orig/src/interfaces/libpq/libpq-fe.h pgsql/src/interfaces/libpq/libpq-fe.h
*** pgsql.orig/src/interfaces/libpq/libpq-fe.h	2010-02-26 03:01:33.000000000 +0100
--- pgsql/src/interfaces/libpq/libpq-fe.h	2010-04-29 12:17:38.000000000 +0200
*************** extern PGnotify *PQnotifies(PGconn *conn
*** 391,396 ****
--- 391,397 ----
  extern int	PQputCopyData(PGconn *conn, const char *buffer, int nbytes);
  extern int	PQputCopyEnd(PGconn *conn, const char *errormsg);
  extern int	PQgetCopyData(PGconn *conn, char **buffer, int async);
+ extern int	PQsetDuplexCopy(PGconn *conn);
  
  /* Deprecated routines for copy in/out */
  extern int	PQgetline(PGconn *conn, char *string, int length);
diff -dcrpN pgsql.orig/src/interfaces/libpq/libpq-int.h pgsql/src/interfaces/libpq/libpq-int.h
*** pgsql.orig/src/interfaces/libpq/libpq-int.h	2010-03-13 15:55:57.000000000 +0100
--- pgsql/src/interfaces/libpq/libpq-int.h	2010-04-29 12:17:38.000000000 +0200
*************** struct pg_conn
*** 362,367 ****
--- 362,368 ----
  	pgParameterStatus *pstatus; /* ParameterStatus data */
  	int			client_encoding;	/* encoding id */
  	bool		std_strings;	/* standard_conforming_strings */
+ 	bool		duplexCopy;	/* COPY IN/OUT in duplex mode, for synchronous replication */
  	PGVerbosity verbosity;		/* error/notice message verbosity */
  	PGlobjfuncs *lobjfuncs;		/* private state for large-object access fns */
  
#2Fujii Masao
masao.fujii@gmail.com
In reply to: Boszormenyi Zoltan (#1)
Re: Synchronous replication patch built on SR

2010/4/29 Boszormenyi Zoltan <zb@cybertec.at>:

attached is a patch that does $SUBJECT, we are submitting it for 9.1.
I have updated it to today's CVS after the "wal_level" GUC went in.

I'm planning to create the synchronous replication patch for 9.0, too.
My design is outlined in the wiki. Let's work together on the design.
http://wiki.postgresql.org/wiki/Streaming_Replication#Synchronization_capability

Log-shipping replication has several synchronization levels, as follows.
Which are you going to work on?

The transaction commit on the master
#1 doesn't wait for replication (already supported in 9.0)
#2 waits for WAL to be received by the standby
#3 waits for WAL to be received and flushed by the standby
#4 waits for WAL to be received, flushed and replayed by the standby
..etc?

I'm planning to add #2 and #3 into 9.1. #4 is useful but is outside
the scope of my development for at least 9.1. In #4, a read-only query
can easily block recovery because of a lock conflict and make the
transaction commit on the master get stuck. This problem is difficult
to address within 9.1, I think. But the design and implementation
of #2 and #3 need to be easily extensible to #4.

How does it work?

First, the walreceiver and the walsender are now able to communicate
in a duplex way on the same connection, so while COPY OUT is
in progress from the primary server, the standby server is able to
issue PQputCopyData() to pass the transaction IDs that were seen
with XLOG_XACT_COMMIT or XLOG_XACT_PREPARE
signatures. I did by adding a new protocol message type, with letter
'x' that's only acknowledged by the walsender process. The regular
backend was intentionally unchanged so an SQL client gets a protocol
error. A new libpq call called PQsetDuplexCopy() which sends this
new message before sending START_REPLICATION. The primary
makes a note of it in the walsender process' entry.

I had to move the TransactionIdLatest(xid, nchildren, children) call
that computes latestXid earlier in RecordTransactionCommit(), so
it's in the critical section now, just before the
XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata)
call. Otherwise, there was a race condition between the primary
and the standby server, where the standby server might have seen
the XLOG_XACT_COMMIT record for some XIDs before the
transaction in the primary server marked itself waiting for this XID,
resulting in stuck transactions.

You seem to have chosen #4 as the synchronization level. Right?

In your design, the transaction commit on the master waits for its XID
to be read from the XLOG_XACT_COMMIT record and replied by the standby.
Right? This design seems not to be extensible to #2 and #3 since
walreceiver cannot read XID from the XLOG_XACT_COMMIT record. How about
using LSN instead of XID? That is, the transaction commit waits until
the standby has reached its LSN. The LSN is easier for the walreceiver
and startup process to handle, I think.

What if the "synchronous" standby starts up from a very old backup?
Does the transaction on the master need to wait until a large amount of
outstanding WAL has been applied? I think that synchronous replication
should start with *asynchronous* replication, and should switch to the
sync level after the gap between the servers has become small enough.
What's your opinion?

I have added 3 new options, two GUCs in postgresql.conf and one
setting in recovery.conf. These options are:

1. min_sync_replication_clients = N

where N is the number of reports for a given transaction before it's
released as committed synchronously. 0 means completely asynchronous,
the value is maximized by the value of max_wal_senders. Anything
in between 0 and max_wal_senders means different levels of partially
synchronous replication.

2. strict_sync_replication = boolean

where the expected number of synchronous reports from standby
servers is further limited to the actual number of connected synchronous
standby servers if the value of this GUC is false. This means that if
no standby servers are connected yet then the replication is asynchronous
and transactions are allowed to finish without waiting for synchronous
reports. If the value of this GUC is true, then transactions wait until
enough synchronous standbys connect and report back.

Why are these options necessary?

Can these options cover more than three synchronization levels?

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

#3Boszormenyi Zoltan
zb@cybertec.at
In reply to: Fujii Masao (#2)
Re: Synchronous replication patch built on SR

Fujii Masao wrote:

2010/4/29 Boszormenyi Zoltan <zb@cybertec.at>:

attached is a patch that does $SUBJECT, we are submitting it for 9.1.
I have updated it to today's CVS after the "wal_level" GUC went in.

I'm planning to create the synchronous replication patch for 9.0, too.
My design is outlined in the wiki. Let's work together on the design.
http://wiki.postgresql.org/wiki/Streaming_Replication#Synchronization_capability

Log-shipping replication has several synchronization levels, as follows.
Which are you going to work on?

The transaction commit on the master
#1 doesn't wait for replication (already supported in 9.0)
#2 waits for WAL to be received by the standby
#3 waits for WAL to be received and flushed by the standby
#4 waits for WAL to be received, flushed and replayed by the standby
..etc?

I'm planning to add #2 and #3 into 9.1. #4 is useful but is outside
the scope of my development for at least 9.1. In #4, a read-only query
can easily block recovery because of a lock conflict and make the
transaction commit on the master get stuck. This problem is difficult
to address within 9.1, I think. But the design and implementation
of #2 and #3 need to be easily extensible to #4.

How does it work?

First, the walreceiver and the walsender are now able to communicate
in a duplex way on the same connection, so while COPY OUT is
in progress from the primary server, the standby server is able to
issue PQputCopyData() to pass the transaction IDs that were seen
with XLOG_XACT_COMMIT or XLOG_XACT_PREPARE
signatures. I did by adding a new protocol message type, with letter
'x' that's only acknowledged by the walsender process. The regular
backend was intentionally unchanged so an SQL client gets a protocol
error. A new libpq call called PQsetDuplexCopy() which sends this
new message before sending START_REPLICATION. The primary
makes a note of it in the walsender process' entry.

I had to move the TransactionIdLatest(xid, nchildren, children) call
that computes latestXid earlier in RecordTransactionCommit(), so
it's in the critical section now, just before the
XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata)
call. Otherwise, there was a race condition between the primary
and the standby server, where the standby server might have seen
the XLOG_XACT_COMMIT record for some XIDs before the
transaction in the primary server marked itself waiting for this XID,
resulting in stuck transactions.

You seem to have chosen #4 as the synchronization level. Right?

Yes.

In your design, the transaction commit on the master waits for its XID
to be read from the XLOG_XACT_COMMIT record and replied by the standby.
Right? This design seems not to be extensible to #2 and #3 since
walreceiver cannot read XID from the XLOG_XACT_COMMIT record.

Yes, this was my problem, too. I would have had to
implement a custom interpreter in the walreceiver to
process the WAL records and extract the XIDs.

But at least the supporting details, i.e. not opening another
connection but instead being able to do duplex COPY operations in
a server-acknowledged way, are acceptable, no? :-)

How about
using LSN instead of XID? That is, the transaction commit waits until
the standby has reached its LSN. The LSN is easier for the walreceiver
and startup process to handle, I think.

Indeed, using the LSN seems more appropriate for
the walreceiver, but how would you extract the information
that a certain LSN means a COMMITted transaction? Or should
we release a locked transaction when the master receives
an LSN greater than or equal to the transaction's own LSN?
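
To make that concrete, on the master side the release step could look
like this minimal sketch (SyncWaiter and WakeupWaiter() are made-up
names for illustration, not code from the patch):

#include "postgres.h"
#include "access/xlogdefs.h"        /* XLogRecPtr, XLByteLE() */

typedef struct SyncWaiter
{
    XLogRecPtr  commit_lsn;         /* LSN of this backend's commit record */
    struct SyncWaiter *next;
} SyncWaiter;

extern void WakeupWaiter(SyncWaiter *w);    /* hypothetical */

/* release every waiter whose commit record the standby has confirmed */
static void
ReleaseSyncWaiters(SyncWaiter *queue, XLogRecPtr standby_lsn)
{
    SyncWaiter *w;

    for (w = queue; w != NULL; w = w->next)
    {
        if (XLByteLE(w->commit_lsn, standby_lsn))
            WakeupWaiter(w);        /* let the backend finish its COMMIT */
    }
}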

Sending back all the LSNs in the case of long transactions would
increase the network traffic compared to sending back only the
XIDs, but the amount is not clear to me. What I am more
worried about is the contention on the ProcArrayLock.
XIDs are rarer than LSNs, no?

What if the "synchronous" standby starts up from a very old backup?
Does the transaction on the master need to wait until a large amount of
outstanding WAL has been applied? I think that synchronous replication
should start with *asynchronous* replication, and should switch to the
sync level after the gap between the servers has become small enough.
What's your opinion?

It's certainly one option, which I think is partly addressed
by the "strict_sync_replication" knob below.
If strict_sync_replication = off, then the master doesn't make
its transactions wait for the synchronous reports, and the client(s)
can work through their WALs. IIRC, the walreceiver connects
to the master only very late in the recovery process, no?

It would be nicer if it could be made automatic. I simply thought
that there may be situations where the "strict" behaviour may be
desired. I was thinking about the transactions executed on the
master between the standby startup and walreceiver connection.
Someone may want to ensure the synchronous behaviour
for every xact, no matter the amount of time it needs. Someone
else will prefer synchronous behaviour whenever possible but
also ensure a quick enough response time even if the standbys aren't
started up yet. This dilemma cries out for such a GUC; it cannot
be decided automatically.

I have added 3 new options, two GUCs in postgresql.conf and one
setting in recovery.conf. These options are:

1. min_sync_replication_clients = N

where N is the number of reports for a given transaction before it's
released as committed synchronously. 0 means completely asynchronous,
the value is maximized by the value of max_wal_senders. Anything
in between 0 and max_wal_senders means different levels of partially
synchronous replication.

2. strict_sync_replication = boolean

where the expected number of synchronous reports from standby
servers is further limited to the actual number of connected synchronous
standby servers if the value of this GUC is false. This means that if
no standby servers are connected yet then the replication is asynchronous
and transactions are allowed to finish without waiting for synchronous
reports. If the value of this GUC is true, then transactions wait until
enough synchronous standbys connect and report back.

Why are these options necessary?

Can these options cover more than three synchronization levels?

I think I explained it in my mail.

If min_sync_replication_clients == 0, then the replication is async.
If min_sync_replication_clients == max_wal_senders then the
replication is fully synchronous.
If 0 < min_sync_replication_clients < max_wal_senders then
the replication is partially synchronous, i.e. the master can wait
only for say, 50% of the clients to report back before it's considered
synchronous and the relevant transactions get released from the wait.
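
In pseudo-C, the release test implied by these two GUCs is something
like the following sketch; WalSndMinSyncSlaves, StrictSyncRepl and
WalSenderSyncCount() are the names declared in the patch, but the body
here is only an illustration of the described semantics, not the
patch's code:

#include "postgres.h"               /* brings in the Min() macro */
#include "replication/walsender.h"

/* sketch only: have enough synchronous standbys reported this commit? */
static bool
SyncCommitSatisfied(int reports_received)
{
    int     needed = WalSndMinSyncSlaves;   /* min_sync_replication_clients */

    if (!StrictSyncRepl)
        /* non-strict mode: cap by the connected synchronous standbys */
        needed = Min(needed, WalSenderSyncCount());

    return reports_received >= needed;      /* needed == 0: fully async */
}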

Best regards,
Zoltán Böszörményi

--
Zoltán Böszörményi
Cybertec Schönig & Schönig GmbH
http://www.postgresql.at/

#4Robert Haas
robertmhaas@gmail.com
In reply to: Boszormenyi Zoltan (#3)
Re: Synchronous replication patch built on SR

On Fri, May 14, 2010 at 9:33 AM, Boszormenyi Zoltan <zb@cybertec.at> wrote:

If  min_sync_replication_clients == 0, then the replication is async.
If  min_sync_replication_clients == max_wal_senders then the
replication is fully synchronous.
If 0 < min_sync_replication_clients < max_wal_senders then
the replication is partially synchronous, i.e. the master can wait
only for say, 50% of the clients to report back before it's considered
synchronous and the relevant transactions get released from the wait.

That's an interesting design and in some ways pretty elegant, but it
rules out some things that people might easily want to do - for
example, synchronous replication to the other server in the same data
center that acts as a backup for the master; and asynchronous
replication to a reporting server located off-site.

One of the things that I think we will probably need/want to change
eventually is the fact that the master has no real knowledge of who
the replication slaves are. That might be something we want to change
in order to be able to support more configurability. Inventing syntax
out of whole cloth and leaving semantics to the imagination of the
reader:

CREATE REPLICATION SLAVE reporting_server (mode asynchronous, xid_feedback on);
CREATE REPLICATION SLAVE failover_server (mode synchronous,
xid_feedback off, break_synchrep_timeout 30);

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise Postgres Company

#5Boszormenyi Zoltan
zb@cybertec.at
In reply to: Robert Haas (#4)
Re: Synchronous replication patch built on SR

Robert Haas wrote:

On Fri, May 14, 2010 at 9:33 AM, Boszormenyi Zoltan <zb@cybertec.at> wrote:

If min_sync_replication_clients == 0, then the replication is async.
If min_sync_replication_clients == max_wal_senders then the
replication is fully synchronous.
If 0 < min_sync_replication_clients < max_wal_senders then
the replication is partially synchronous, i.e. the master can wait
only for say, 50% of the clients to report back before it's considered
synchronous and the relevant transactions get released from the wait.

That's an interesting design and in some ways pretty elegant, but it
rules out some things that people might easily want to do - for
example, synchronous replication to the other server in the same data
center that acts as a backup for the master; and asynchronous
replication to a reporting server located off-site.

No, it doesn't. :-) You didn't take into account the third knob
usable in recovery.conf:
synchronous_slave = on/off
The off-site reporting server can be an asynchronous standby,
while the on-site backup server can be synchronous. The only thing
you need to take into account is that min_sync_replication_clients
shouldn't ever exceed your actual number of synchronous standbys.
The setup these three knobs provide is pretty flexible, I think.
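
For example, your mixed setup could be configured like this (values
illustrative):

# primary's postgresql.conf
max_wal_senders = 2
min_sync_replication_clients = 1

# recovery.conf on the on-site backup standby
synchronous_slave = on

# recovery.conf on the off-site reporting standby
synchronous_slave = off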

One of the things that I think we will probably need/want to change
eventually is the fact that the master has no real knowledge of who
the replication slaves are.

The changes I made in my patch partly change that:
the server still doesn't know "who" the standbys are,
but there's a call that returns the number of connected
_synchronous_ standbys.
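
Given the per-walsender "sync" flag in the WalSnd struct, that call is
in essence something like this (a sketch based on the declarations in
the patch, not the verbatim code; WalSndCtl is the shared walsender
array):

int
WalSenderSyncCount(void)
{
    int     i;
    int     count = 0;

    for (i = 0; i < max_wal_senders; i++)
    {
        /* use a volatile pointer to prevent code rearrangement */
        volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];

        if (walsnd->pid != 0 && walsnd->sync)
            count++;
    }
    return count;
}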

That might be something we want to change
in order to be able to support more configurability. Inventing syntax
out of whole cloth and leaving semantics to the imagination of the
reader:

CREATE REPLICATION SLAVE reporting_server (mode asynchronous, xid_feedback on);
CREATE REPLICATION SLAVE failover_server (mode synchronous,
xid_feedback off, break_synchrep_timeout 30);

--
Zoltán Böszörményi
Cybertec Schönig & Schönig GmbH
http://www.postgresql.at/

#6Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Boszormenyi Zoltan (#5)
Re: Synchronous replication patch built on SR

BTW, what I'd like to see as a very first patch is to change the
current poll loops in walreceiver and walsender to, well, not poll.
That's a requirement for synchronous replication, is very useful on its
own, and requires some design and implementation effort to get right.
It would be nice to get that out of the way before or while we discuss
the more user-visible behavior.

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com

#7Simon Riggs
simon@2ndQuadrant.com
In reply to: Robert Haas (#4)
Re: Synchronous replication patch built on SR

On Fri, 2010-05-14 at 15:15 -0400, Robert Haas wrote:

On Fri, May 14, 2010 at 9:33 AM, Boszormenyi Zoltan <zb@cybertec.at> wrote:

If min_sync_replication_clients == 0, then the replication is async.
If min_sync_replication_clients == max_wal_senders then the
replication is fully synchronous.
If 0 < min_sync_replication_clients < max_wal_senders then
the replication is partially synchronous, i.e. the master can wait
only for say, 50% of the clients to report back before it's considered
synchronous and the relevant transactions get released from the wait.

That's an interesting design and in some ways pretty elegant, but it
rules out some things that people might easily want to do - for
example, synchronous replication to the other server in the same data
center that acts as a backup for the master; and asynchronous
replication to a reporting server located off-site.

The design above allows the case you mention:
min_sync_replication_clients = 1
max_wal_senders = 2

It works well in failure cases, such as the case where the local backup
server goes down.

It seems exactly what we need to me, though I'm not sure about the names.

One of the things that I think we will probably need/want to change
eventually is the fact that the master has no real knowledge of who
the replication slaves are. That might be something we want to change
in order to be able to support more configurability. Inventing syntax
out of whole cloth and leaving semantics to the imagination of the
reader:

CREATE REPLICATION SLAVE reporting_server (mode asynchronous, xid_feedback on);
CREATE REPLICATION SLAVE failover_server (mode synchronous,
xid_feedback off, break_synchrep_timeout 30);

I am against labelling servers as synchronous/asynchronous. We've had
this discussion a few times since 2008.

There is significant advantage in having the user specify the level of
robustness, so that it can vary from transaction to transaction, just as
already happens at commit. That way the user gets to say what happens.
Look for threads on "transaction controlled robustness".

As alluded to above, if you label the servers you also need to say what
happens when one or more of them are down. e.g. "synchronous to B AND
async to C, except when B is not available, in which case make C
synchronous". With N servers, you end up needing to specify O(N^2) rules
for what happens, so it only works neatly for 2, maybe 3 servers.

--
Simon Riggs www.2ndQuadrant.com

#8Fujii Masao
masao.fujii@gmail.com
In reply to: Boszormenyi Zoltan (#3)
Re: Synchronous replication patch built on SR

Thanks for your reply!

On Fri, May 14, 2010 at 10:33 PM, Boszormenyi Zoltan <zb@cybertec.at> wrote:

In your design, the transaction commit on the master waits for its XID
to be read from the XLOG_XACT_COMMIT record and replied by the standby.
Right? This design seems not to be extensible to #2 and #3 since
walreceiver cannot read XID from the XLOG_XACT_COMMIT record.

Yes, this was my problem, too. I would have had to
implement a custom interpreter in the walreceiver to
process the WAL records and extract the XIDs.

Isn't reading the same WAL twice (by walreceiver and startup process)
inefficient? In synchronous replication, the overhead of walreceiver
directly affects the performance of the master. We should not assign
such hard work to the walreceiver, I think.

But at least the supporting details, i.e. not opening another
connection but instead being able to do duplex COPY operations in
a server-acknowledged way, are acceptable, no? :-)

Though I might not understand your point (sorry), it's OK for the standby
to send the reply to the master by using a CopyData message. Currently
PQputCopyData() cannot be executed in COPY OUT, but we can relax
that.

 How about
using LSN instead of XID? That is, the transaction commit waits until
the standby has reached its LSN. The LSN is easier for the walreceiver
and startup process to handle, I think.

Indeed, using the LSN seems more appropriate for
the walreceiver, but how would you extract the information
that a certain LSN means a COMMITted transaction? Or should
we release a locked transaction when the master receives
an LSN greater than or equal to the transaction's own LSN?

Yep, we can ensure that the transaction has been replicated by
comparing its own LSN with the smallest of the latest LSNs
reported by each connected "synchronous" standby.
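
A sketch of that computation over the shared walsender array; the
per-walsender ackedPtr field is hypothetical (the patch's WalSnd only
tracks sentPtr), and XLByteLT() is the standard LSN comparison macro:

static XLogRecPtr
MinSyncStandbyLSN(void)
{
    XLogRecPtr  min = {0xFFFFFFFF, 0xFFFFFFFF};     /* "+infinity" */
    int         i;

    for (i = 0; i < max_wal_senders; i++)
    {
        volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];

        if (walsnd->pid != 0 && walsnd->sync)
        {
            SpinLockAcquire(&walsnd->mutex);
            if (XLByteLT(walsnd->ackedPtr, min))    /* ackedPtr: hypothetical */
                min = walsnd->ackedPtr;
            SpinLockRelease(&walsnd->mutex);
        }
    }
    return min;
}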

Sending back all the LSNs in the case of long transactions would
increase the network traffic compared to sending back only the
XIDs, but the amount is not clear to me. What I am more
worried about is the contention on the ProcArrayLock.
XIDs are rarer than LSNs, no?

No. For example, when a batch of WAL data sent by the walsender
contains two XLOG_XACT_COMMIT records, the XID approach requires
the walreceiver to send two replies. OTOH, in the LSN approach,
only one reply, indicating the last received location, would
need to be sent.

What if the "synchronous" standby starts up from a very old backup?
Does the transaction on the master need to wait until a large amount of
outstanding WAL has been applied? I think that synchronous replication
should start with *asynchronous* replication, and should switch to the
sync level after the gap between the servers has become small enough.
What's your opinion?

It's certainly one option, which I think is partly addressed
by the "strict_sync_replication" knob below.
If strict_sync_replication = off, then the master doesn't make
its transactions wait for the synchronous reports, and the client(s)
can work through their WALs. IIRC, the walreceiver connects
to the master only very late in the recovery process, no?

No, the master might have a large number of WAL files which
the standby doesn't have.

I have added 3 new options, two GUCs in postgresql.conf and one
setting in recovery.conf. These options are:

1. min_sync_replication_clients = N

where N is the number of reports for a given transaction before it's
released as committed synchronously. 0 means completely asynchronous,
the value is maximized by the value of max_wal_senders. Anything
in between 0 and max_wal_senders means different levels of partially
synchronous replication.

2. strict_sync_replication = boolean

where the expected number of synchronous reports from standby
servers is further limited to the actual number of connected synchronous
standby servers if the value of this GUC is false. This means that if
no standby servers are connected yet then the replication is asynchronous
and transactions are allowed to finish without waiting for synchronous
reports. If the value of this GUC is true, then transactions wait until
enough synchronous standbys connect and report back.

Why are these options necessary?

Can these options cover more than three synchronization levels?

I think I explained it in my mail.

If  min_sync_replication_clients == 0, then the replication is async.
If  min_sync_replication_clients == max_wal_senders then the
replication is fully synchronous.
If 0 < min_sync_replication_clients < max_wal_senders then
the replication is partially synchronous, i.e. the master can wait
only for say, 50% of the clients to report back before it's considered
synchronous and the relevant transactions get released from the wait.

Seems s/min_sync_replication_clients/max_sync_replication_clients

Is min_sync_replication_clients required to prevent an outside attacker
from connecting to the master as a "synchronous" standby and degrading
the performance on the master? Any other use case?

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

#9Fujii Masao
masao.fujii@gmail.com
In reply to: Heikki Linnakangas (#6)
Re: Synchronous replication patch built on SR

On Sat, May 15, 2010 at 4:59 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:

BTW, what I'd like to see as a very first patch is to change the
current poll loops in walreceiver and walsender to, well, not poll.
That's a requirement for synchronous replication, is very useful on its
own, and requires some design and implementation effort to get right.
It would be nice to get that out of the way before or while we discuss
the more user-visible behavior.

Yeah, we should wake up the walsender from sleep to send WAL data
as soon as it's flushed. But why do we need to change the loop of
the walreceiver? Or do you mean changing the poll loop in the startup process?

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

#10Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Fujii Masao (#9)
Re: Synchronous replication patch built on SR

On 18/05/10 07:41, Fujii Masao wrote:

On Sat, May 15, 2010 at 4:59 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:

BTW, what I'd like to see as a very first patch is to change the
current poll loops in walreceiver and walsender to, well, not poll.
That's a requirement for synchronous replication, is very useful on its
own, and requires some design and implementation effort to get right.
It would be nice to get that out of the way before or while we discuss
the more user-visible behavior.

Yeah, we should wake up the walsender from sleep to send WAL data
as soon as it's flushed. But why do we need to change the loop of
the walreceiver? Or do you mean changing the poll loop in the startup process?

Yeah, changing the poll loop in the startup process is what I meant.

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com

#11Boszormenyi Zoltan
zb@cybertec.at
In reply to: Fujii Masao (#8)
Re: Synchronous replication patch built on SR

Fujii Masao wrote:

Thanks for your reply!

On Fri, May 14, 2010 at 10:33 PM, Boszormenyi Zoltan <zb@cybertec.at> wrote:

In your design, the transaction commit on the master waits for its XID
to be read from the XLOG_XACT_COMMIT record and replied by the standby.
Right? This design seems not to be extensible to #2 and #3 since
walreceiver cannot read XID from the XLOG_XACT_COMMIT record.

Yes, this was my problem, too. I would have had to
implement a custom interpreter in the walreceiver to
process the WAL records and extract the XIDs.

Isn't reading the same WAL twice (by walreceiver and startup process)
inefficient?

Yes, and I didn't implement that because it's inefficient.
I implemented a minimal communication channel between
StartupXLOG() and the walreceiver.

In synchronous replication, the overhead of walreceiver
directly affects the performance of the master. We should not assign
such hard work to the walreceiver, I think.

Exactly.

But at least the supporting details, i.e. not opening another
connection but instead being able to do duplex COPY operations in
a server-acknowledged way, are acceptable, no? :-)

Though I might not understand your point (sorry), it's OK for the standby
to send the reply to the master by using a CopyData message.

I thought about the same.

Currently
PQputCopyData() cannot be executed in COPY OUT, but we can relax
that.

And I implemented just that: upon walreceiver startup,
it sends a new protocol message to the walsender by calling
PQsetDuplexCopy() (see my patch), and the walsender responds with an ACK.
This protocol message is intentionally not handled by the normal
backend, so plain libpq clients cannot mess up their COPY streams.
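
From the standby side the flow is then roughly this (the connection
string, LSN and XID variable are illustrative only; PQsetDuplexCopy()
is the new call from the patch):

#include <stdint.h>
#include "libpq-fe.h"

static void
standby_handshake_sketch(uint32_t last_seen_commit_xid)
{
    PGconn     *conn = PQconnectdb("host=primary user=repl replication=true");
    uint32_t    xid = last_seen_commit_xid;

    if (PQsetDuplexCopy(conn) != 1)     /* primary didn't ACK duplex COPY */
        return;

    /* the new message must go out before START_REPLICATION */
    PQexec(conn, "START_REPLICATION 0/20000000");

    /* later, while COPY OUT streams WAL to us, report a seen commit;
     * the patch currently sends the raw XID as the payload */
    PQputCopyData(conn, (char *) &xid, sizeof(xid));
}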

How about
using LSN instead of XID? That is, the transaction commit waits until
the standby has reached its LSN. The LSN is easier for the walreceiver
and startup process to handle, I think.

Indeed, using the LSN seems more appropriate for
the walreceiver, but how would you extract the information
that a certain LSN means a COMMITted transaction? Or should
we release a locked transaction when the master receives
an LSN greater than or equal to the transaction's own LSN?

Yep, we can ensure that the transaction has been replicated by
comparing its own LSN with the smallest of the latest LSNs
reported by each connected "synchronous" standby.

Sending back all the LSNs in the case of long transactions would
increase the network traffic compared to sending back only the
XIDs, but the amount is not clear to me. What I am more
worried about is the contention on the ProcArrayLock.
XIDs are rarer than LSNs, no?

No. For example, when a batch of WAL data sent by the walsender
contains two XLOG_XACT_COMMIT records, the XID approach requires
the walreceiver to send two replies. OTOH, in the LSN approach,
only one reply, indicating the last received location, would
need to be sent.

I see.

What if the "synchronous" standby starts up from a very old backup?
Does the transaction on the master need to wait until a large amount of
outstanding WAL has been applied? I think that synchronous replication
should start with *asynchronous* replication, and should switch to the
sync level after the gap between the servers has become small enough.
What's your opinion?

It's certainly one option, which I think is partly addressed
by the "strict_sync_replication" knob below.
If strict_sync_replication = off, then the master doesn't make
its transactions wait for the synchronous reports, and the client(s)
can work through their WALs. IIRC, the walreceiver connects
to the master only very late in the recovery process, no?

No, the master might have a large number of WAL files which
the standby doesn't have.

We can change the walreceiver so that it sends encapsulated
messages similar to the walsender's. In our patch, the walreceiver
currently sends the raw XIDs. If we add a minimal protocol
encapsulation, we can distinguish between the XIDs (or later LSNs)
and the "mark me synchronous from now on" message.
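
The encapsulation could be as small as a one-byte tag in front of the
payload, e.g. (purely illustrative, not something the patch defines):

typedef struct StandbyReply
{
    char    type;               /* 'x' = XID report, 'l' = LSN report,
                                 * 's' = "mark me synchronous from now on" */
    union
    {
        TransactionId   xid;
        XLogRecPtr      lsn;
    }       payload;
} StandbyReply;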

The only problem is: at what point should such a client
become synchronous from the master's POV, so that the XID/LSN reports
count and transactions are made to wait for this client?

As a side note, the async walreceivers' behaviour should be kept,
so they don't send anything back, and the message that
PQsetDuplexCopy() sends to the master would then only
warn the walsender that its client will become synchronous
in the near future.

I have added 3 new options, two GUCs in postgresql.conf and one
setting in recovery.conf. These options are:

1. min_sync_replication_clients = N

where N is the number of reports for a given transaction before it's
released as committed synchronously. 0 means completely asynchronous,
the value is maximized by the value of max_wal_senders. Anything
in between 0 and max_wal_senders means different levels of partially
synchronous replication.

2. strict_sync_replication = boolean

where the expected number of synchronous reports from standby
servers is further limited to the actual number of connected synchronous
standby servers if the value of this GUC is false. This means that if
no standby servers are connected yet then the replication is asynchronous
and transactions are allowed to finish without waiting for synchronous
reports. If the value of this GUC is true, then transactions wait until
enough synchronous standbys connect and report back.

Why are these options necessary?

Can these options cover more than three synchronization levels?

I think I explained it in my mail.

If min_sync_replication_clients == 0, then the replication is async.
If min_sync_replication_clients == max_wal_senders then the
replication is fully synchronous.
If 0 < min_sync_replication_clients < max_wal_senders then
the replication is partially synchronous, i.e. the master can wait
only for say, 50% of the clients to report back before it's considered
synchronous and the relevant transactions get released from the wait.

Seems s/min_sync_replication_clients/max_sync_replication_clients

No, "min" indicates the minimum number of walreceiver reports
needed before a transaction can be released from the wait.
Any further reports coming from walreceivers are ignored.

Is min_sync_replication_clients required to prevent an outside attacker
from connecting to the master as a "synchronous" standby and degrading
the performance on the master?

???

A properly configured pg_hba.conf prevents outside attackers
from connecting as replication clients, no?

Any other use case?

Regards,

--
Zoltán Böszörményi
Cybertec Schönig & Schönig GmbH
http://www.postgresql.at/

#12Fujii Masao
masao.fujii@gmail.com
In reply to: Boszormenyi Zoltan (#11)
Re: Synchronous replication patch built on SR

On Wed, May 19, 2010 at 5:41 PM, Boszormenyi Zoltan <zb@cybertec.at> wrote:

Isn't reading the same WAL twice (by walreceiver and startup process)
inefficient?

Yes, and I didn't implement that because it's inefficient.

So I'd like to propose using the LSN instead of the XID, since the LSN
can be easily handled by both the walreceiver and the startup process.

 Currently
PQputCopyData() cannot be executed in COPY OUT, but we can relax
that.

And I implemented just that: upon walreceiver startup,
it sends a new protocol message to the walsender by calling
PQsetDuplexCopy() (see my patch), and the walsender responds with an ACK.
This protocol message is intentionally not handled by the normal
backend, so plain libpq clients cannot mess up their COPY streams.

Is the newly-introduced message type "Set Duplex Copy" really required?
I think that the standby can send its replication mode to the master
via a Query or CopyData message, which are already used in SR. For example,
how about including the mode in the handshake message "START_REPLICATION"?
If we do that, we would not need to introduce the new libpq function
PQsetDuplexCopy(). BTW, I often got complaints about adding
new libpq functions when I implemented SR ;)

In the patch, PQputCopyData() checks the newly-introduced pg_conn field
"duplexCopy". Instead, how about checking the existing field "replication"?
Or we can just allow PQputCopyData() to proceed even in COPY OUT state.
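
The relaxed check in PQputCopyData() would then read roughly as follows,
mirroring the patch's modified test with duplexCopy swapped for the
existing replication field (a sketch):

/* allow COPY OUT as well, but only on a replication connection */
if (!((conn->asyncStatus == PGASYNC_COPY_IN) ||
      (conn->replication && conn->replication[0] &&
       conn->asyncStatus == PGASYNC_COPY_OUT)))
{
    printfPQExpBuffer(&conn->errorMessage,
                      libpq_gettext("no COPY in progress\n"));
    return -1;
}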

We can change the walreceiver so that it sends encapsulated
messages similar to the walsender's. In our patch, the walreceiver
currently sends the raw XIDs. If we add a minimal protocol
encapsulation, we can distinguish between the XIDs (or later LSNs)
and the "mark me synchronous from now on" message.

The only problem is: at what point should such a client
become synchronous from the master's POV, so that the XID/LSN reports
count and transactions are made to wait for this client?

One idea is to switch to "sync" when the LSN gap becomes less
than or equal to XLOG_SEG_SIZE (16MB by default). That is, the walsender
calculates the gap between the current write WAL location on the master
and the last receive/flush/replay location on the standby. And if
the gap <= XLOG_SEG_SIZE, it instructs the backends to wait for
replication from then on.
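
A sketch of that catch-up test, assuming the current two-field
XLogRecPtr; the macro and function names are made up:

/* compose a linear 64-bit byte position from xlogid/xrecoff */
#define LSN_AS_UINT64(p)  (((uint64) (p).xlogid << 32) | (p).xrecoff)

static bool
StandbyCaughtUp(XLogRecPtr write_ptr, XLogRecPtr ack_ptr)
{
    uint64  gap = LSN_AS_UINT64(write_ptr) - LSN_AS_UINT64(ack_ptr);

    return gap <= XLOG_SEG_SIZE;    /* 16MB by default */
}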

As a side note, the async walreceivers' behaviour should be kept,
so they don't send anything back, and the message that
PQsetDuplexCopy() sends to the master would then only
warn the walsender that its client will become synchronous
in the near future.

I agree that the walreceiver should send no replication ack if "async"
mode is chosen. OTOH, in the "sync" case, the walreceiver should always
send an ack even if the gap is large and the master doesn't wait for
replication yet. As mentioned above, the walsender needs to calculate
the gap from the ack.

Seems s/min_sync_replication_clients/max_sync_replication_clients

No, "min" indicates the minimum number of walreceiver reports
needed before a transaction can be released from the wait.
Any further reports coming from walreceivers are ignored.

Hmm... when min_sync_replication_clients = 2 and there are three
"synchronous" standbys, the master waits for only two standbys?

Is the standby which the master ignores fixed, or dynamically (or
randomly) changed?

Is min_sync_replication_clients required to prevent an outside attacker
from connecting to the master as a "synchronous" standby and degrading
the performance on the master?

???

A properly configured pg_hba.conf prevents outside attackers
from connecting as replication clients, no?

Yes :)

I'd just like to know the use case of min_sync_replication_clients.
Sorry, I haven't yet understood how useful this option is.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

#13Boszormenyi Zoltan
zb@cybertec.at
In reply to: Fujii Masao (#12)
Re: Synchronous replication patch built on SR

Fujii Masao wrote:

On Wed, May 19, 2010 at 5:41 PM, Boszormenyi Zoltan <zb@cybertec.at> wrote:

Isn't reading the same WAL twice (by walreceiver and startup process)
inefficient?

Yes, and I didn't implement that because it's inefficient.

So I'd like to propose using the LSN instead of the XID, since the LSN
can be easily handled by both the walreceiver and the startup process.

OK, I will look into replacing XIDs with LSNs.

Currently
PQputCopyData() cannot be executed in COPY OUT, but we can relax
that.

And I implemented just that: upon walreceiver startup,
it sends a new protocol message to the walsender by calling
PQsetDuplexCopy() (see my patch), and the walsender responds with an ACK.
This protocol message is intentionally not handled by the normal
backend, so plain libpq clients cannot mess up their COPY streams.

Is the newly-introduced message type "Set Duplex Copy" really required?
I think that the standby can send its replication mode to the master
via a Query or CopyData message, which are already used in SR. For example,
how about including the mode in the handshake message "START_REPLICATION"?
If we do that, we would not need to introduce the new libpq function
PQsetDuplexCopy(). BTW, I often got complaints about adding
new libpq functions when I implemented SR ;)

:-)

In the patch, PQputCopyData() checks the newly-introduced pg_conn field
"duplexCopy". Instead, how about checking the existing field "replication"?

I didn't see that there was such a new field. (looking...) I can see it now;
it was added in the middle of the structure. OK, we can then use it
to allow duplex COPY instead of my new field. I suppose it's non-NULL
if replication is on, right? Then the extra call is not needed.

Or we can just allow PQputCopyData() to proceed even in COPY OUT state.

I think this may not be too useful for SQL clients, but who knows? :-)
Use cases, anyone?

We can change the walreceiver so that it sends encapsulated
messages similar to the walsender's. In our patch, the walreceiver
currently sends the raw XIDs. If we add a minimal protocol
encapsulation, we can distinguish between the XIDs (or later LSNs)
and the "mark me synchronous from now on" message.

The only problem is: at what point should such a client
become synchronous from the master's POV, so that the XID/LSN reports
count and transactions are made to wait for this client?

One idea is to switch to "sync" when the LSN gap becomes less
than or equal to XLOG_SEG_SIZE (16MB by default). That is, the walsender
calculates the gap between the current write WAL location on the master
and the last receive/flush/replay location on the standby. And if
the gap <= XLOG_SEG_SIZE, it instructs the backends to wait for
replication from then on.

This is a sensible idea.

As a side note, the async walreceivers' behaviour should be kept,
so they don't send anything back, and the message that
PQsetDuplexCopy() sends to the master would then only
warn the walsender that its client will become synchronous
in the near future.

I agree that the walreceiver should send no replication ack if "async"
mode is chosen. OTOH, in the "sync" case, the walreceiver should always
send an ack even if the gap is large and the master doesn't wait for
replication yet. As mentioned above, the walsender needs to calculate
the gap from the ack.

Agreed.

Seems s/min_sync_replication_clients/max_sync_replication_clients

No, "min" indicates the minimum number of walreceiver reports
needed before a transaction can be released from the wait.
Any further reports coming from walreceivers are ignored.

Hmm... when min_sync_replication_clients = 2 and there are three
"synchronous" standbys, the master waits for only two standbys?

Yes. This is the idea: "partially synchronous replication".
I have heard anecdotes about replication solutions where,
if at least 50% of the machines across the
whole cluster report back synchronously, the transaction
is considered replicated "well enough".

Is the standby which the master ignores fixed, or dynamically (or
randomly) changed?

It may change randomly, depending on who sends the reports
first. The replication servers themselves may get very busy with
large queries, or they may be loaded in some other way and
be somewhat late in processing the WAL stream. The less loaded
servers answer first, and the transaction is considered properly
replicated.

Is min_sync_replication_clients required to prevent an outside attacker
from connecting to the master as a "synchronous" standby and degrading
the performance on the master?

???

A properly configured pg_hba.conf prevents outside attackers
from connecting as replication clients, no?

Yes :)

I'd just like to know the use case of min_sync_replication_clients.
Sorry, I haven't yet understood how useful this option is.

I hope I answered it. :-)

Best regards,
Zoltán Böszörményi

--
Zoltán Böszörményi
Cybertec Schönig & Schönig GmbH
http://www.postgresql.at/

#14Fujii Masao
masao.fujii@gmail.com
In reply to: Boszormenyi Zoltan (#13)
Re: Synchronous replication patch built on SR

On Wed, May 19, 2010 at 9:58 PM, Boszormenyi Zoltan <zb@cybertec.at> wrote:

In the patch, PQputCopyData() checks the newly-introduced pg_conn field
"duplexCopy". Instead, how about checking the existing field "replication"?

I didn't see that there was such a new field. (looking...) I can see it now;
it was added in the middle of the structure. OK, we can then use it
to allow duplex COPY instead of my new field. I suppose it's non-NULL
if replication is on, right? Then the extra call is not needed.

Right. Usually the first byte of the pg_conn field is also checked,
as follows, but I'm not sure whether that is valuable in this case.

if (conn->replication && conn->replication[0])

Or we can just allow PQputCopyData() to go even in COPY OUT state.

I think this may not be too useful for SQL clients, but who knows? :-)
Use cases, anyone?

It's only for replication.

Hmm... when min_sync_replication_clients = 2 and there are three
"synchronous" standbys, the master waits for only two standbys?

Yes. This is the idea: "partially synchronous replication".
I have heard anecdotes about replication solutions where,
if at least 50% of the machines across the
whole cluster report back synchronously, the transaction
is considered replicated "well enough".

Oh, I get it. That's the first time I've heard of such a use case.

We seem to have many ideas about the knobs to control synchronization
levels, and we need to clarify which ones should be implemented for 9.1.

I'd just like to know the use case of min_sync_replication_clients.
Sorry, I haven't yet understood how useful this option is.

I hope I answered it. :-)

Yep. Thanks!

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center