*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 2018,2023 **** SET ENABLE_SEQSCAN TO OFF;
--- 2018,2131 ----
       </variablelist>
      </sect2>
  
+     <sect2 id="runtime-config-sync-rep">
+      <title>Synchronous Replication</title>
+ 
+      <para>
+       These settings control the behavior of the built-in
+       <firstterm>synchronous replication</> feature.
+       These parameters would be set on the primary server that is
+       to send replication data to one or more standby servers.
+      </para>
+ 
+      <variablelist>
+      <varlistentry id="guc-synchronous-replication" xreflabel="synchronous_replication">
+       <term><varname>synchronous_replication</varname> (<type>boolean</type>)</term>
+       <indexterm>
+        <primary><varname>synchronous_replication</> configuration parameter</primary>
+       </indexterm>
+       <listitem>
+        <para>
+         Specifies whether transaction commit will wait for WAL records
+         to be replicated before the command returns a <quote>success</>
+         indication to the client.  The default setting is <literal>off</>.
+         When <literal>on</>, there will be a delay while the client waits
+         for confirmation of successful replication. That delay will
+         increase depending upon the physical distance and network activity
+         between primary and standby. The commit wait will last until a
+         reply from the current synchronous standby indicates it has received
+         the commit record of the transaction. Synchronous standbys must
+         already have been defined (see <xref linkend="guc-sync-standby-names">).
+        </para>
+        <para>
+         This parameter can be changed at any time; the
+         behavior for any one transaction is determined by the setting in
+         effect when it commits.  It is therefore possible, and useful, to have
+         some transactions replicate synchronously and others asynchronously.
+         For example, to make a single multistatement transaction commit
+         asynchronously when the default is synchronous replication, issue
+         <command>SET LOCAL synchronous_replication TO OFF</> within the
+         transaction.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry id="guc-sync-replication-timeout-client" xreflabel="sync_replication_timeout">
+       <term><varname>sync_replication_timeout</varname> (<type>integer</type>)</term>
+       <indexterm>
+        <primary><varname>sync_replication_timeout</> configuration parameter</primary>
+       </indexterm>
+       <listitem>
+        <para>
+         If the client has <varname>synchronous_replication</varname> set,
+         and a synchronous standby is currently available
+         then the commit will wait for up to <varname>replication_timeout_client</>
+         seconds before it returns a <quote>success</>. The commit will wait
+         forever for a confirmation when <varname>replication_timeout_client</>
+         is set to 0.
+        </para>
+        <para>
+         If the client has <varname>synchronous_replication</varname> set,
+ 		and yet no synchronous standby is available when we commit then we
+ 		don't wait at all.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry id="guc-sync-standby-names" xreflabel="synchronous_standby_names">
+       <term><varname>synchronous_standby_names</varname> (<type>integer</type>)</term>
+       <indexterm>
+        <primary><varname>synchronous_standby_names</> configuration parameter</primary>
+       </indexterm>
+       <listitem>
+        <para>
+         Specifies a priority ordered list of standby names that can offer
+         synchronous replication.  At any one time there will be just one
+         synchronous standby that will wake sleeping users following commit.
+         The synchronous standby will be the first named standby that is
+         both currently connected and streaming in real-time to the standby
+         (as shown by a state of "STREAMING").  Other standby servers
+         with listed later will become potential synchronous standbys.
+         If the current synchronous standby disconnects for whatever reason
+         it will be replaced immediately with the next highest priority standby.
+         Specifying more than one standby name can allow very high availability.
+        </para>
+        <para>
+         The standby name is currently taken as the application_name of the
+         standby, as set in the primary_conninfo on the standby. Names are
+         not enforced for uniqueness. In case of duplicates one of the standbys
+         will be chosen to be the synchronous standby, though exactly which
+         one is indeterminate.
+        </para>
+        <para>
+         The default is the special entry <literal>*</> which matches any
+         application_name, including the default application name of
+         <literal>walsender</>. This is not recommended and a more carefully
+         thought through configuration will be desirable.
+        </para>
+        <para>
+         If a standby is removed from the list of servers then it will stop
+         being the synchronous standby, allowing another to take it's place.
+         If the list is empty, synchronous replication will not be
+         possible, whatever the setting of <varname>synchronous_replication</>.
+         Standbys may also be added to the list without restarting the server.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      </variablelist>
+     </sect2>
+ 
      <sect2 id="runtime-config-standby">
      <title>Standby Servers</title>
  
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 875,880 **** primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
--- 875,1107 ----
     </sect3>
  
    </sect2>
+   <sect2 id="synchronous-replication">
+    <title>Synchronous Replication</title>
+ 
+    <indexterm zone="high-availability">
+     <primary>Synchronous Replication</primary>
+    </indexterm>
+ 
+    <para>
+     <productname>PostgreSQL</> streaming replication is asynchronous by
+     default. If the primary server
+     crashes then some transactions that were committed may not have been
+     replicated to the standby server, causing data loss. The amount
+     of data loss is proportional to the replication delay at the time of
+     failover.
+    </para>
+ 
+    <para>
+ 	Synchronous replication offers the ability to confirm that all changes
+ 	made by a transaction have been transferred to one synchronous standby
+ 	server. This extends the standard level of durability
+ 	offered by a transaction commit. This level of protection is referred
+ 	to as 2-safe replication in computer science theory.
+    </para>
+ 
+    <para>
+ 	When requesting synchronous replication, each commit of a
+ 	write transaction will wait until confirmation is
+ 	received that the commit has been written to the transaction log on disk
+ 	of both the primary and standby server. The only possibility that data
+ 	can be lost is if both the primary and the standby suffer crashes at the
+ 	same time. This can provide a much higher level of durability, though only
+ 	if the sysadmin is cautious about the placement and management of the two
+ 	servers.  Waiting for confirmation increases the user's confidence that the
+ 	changes will not be lost in the event of server crashes but it also
+ 	necessarily increases the response time for the requesting transaction.
+ 	The minimum wait time is the roundtrip time between primary to standby.
+    </para>
+ 
+    <para>
+ 	Read only transactions and transaction rollbacks need not wait for
+ 	replies from standby servers. Subtransaction commits do not wait for
+ 	responses from standby servers, only top-level commits. Long
+ 	running actions such as data loading or index building do not wait
+ 	until the very final commit message. All two-phase commit actions
+ 	require commit waits, including both prepare and commit.
+    </para>
+ 
+    <sect3 id="synchronous-replication-config">
+     <title>Basic Configuration</title>
+ 
+    <para>
+     All parameters have useful default values, so we can enable
+     synchronous replication easily just by setting this on the primary
+ 
+ <programlisting>
+ synchronous_replication = on
+ </programlisting>
+ 
+ 	When <varname>synchronous_replication</> is set, a commit will wait
+ 	for up to <varname>synchronous_replication_timeout</> seconds to
+ 	confirm that the standby has received the commit record. Both
+ 	<varname>synchronous_replication</> and
+ 	<varname>synchronous_replication_timeout</> can be set by individual
+ 	users, so can be configured in the configuration file, for particular
+ 	users or databases, or dynamically by applications programs.
+ 	It is possible for user sessions to reach timeout even though
+ 	standbys are communicating normally. In that case, the setting of
+ 	<varname>synchronous_replication_timeout</> is probably too low though
+ 	you probably have other system or network issues as well.
+    </para>
+ 
+    <para>
+     After a commit record has been written to disk on the primary the
+     WAL record is then sent to the standby. The standby sends reply
+     messages each time a new batch of WAL data is received, unless
+ 	<varname>wal_receiver_status_interval</> is set to zero on the standby.
+ 	If the standby is the first matching standby, as specified in
+ 	<varname>synchronous_standby_names</> on the primary, the reply
+ 	messages from that standby will be used to wake users waiting for
+ 	confirmation the commit record has been received. These parameters
+ 	allow the administrator to specify which standby servers should be
+ 	synchronous standbys. Note that the configuration of synchronous
+ 	replication is mainly on the master.
+    </para>
+ 
+    <para>
+     The default setting of <varname>synchronous_replication_timeout</> is
+     120 seconds to ensure that users do not wait forever if all specified
+     standby servers go down. If you wish to have stronger guarantees the
+     timeout can be set higher, or even to zero, meaning wait forever.
+     Users will stop waiting if a fast shutdown is requested, though the
+     server does not fully shutdown until all outstanding WAL records are
+     transferred to standby servers.
+    </para>
+ 
+    <para>
+     Note also that <varname>synchronous_commit</> is used when the user
+     specifies <varname>synchronous_replication</>, overriding even an
+     explicit setting of <varname>synchronous_commit</> to <literal>off</>.
+     This is because we must write WAL to disk on primary before we replicate
+     to ensure the standby never gets ahead of the primary.
+    </para>
+ 
+    </sect3>
+ 
+    <sect3 id="synchronous-replication-performance">
+     <title>Planning for Performance</title>
+ 
+    <para>
+ 	Synchronous replication usually requires carefully planned and placed
+ 	standby servers to ensure applications perform acceptably. Waiting
+ 	doesn't utilise system resources, but transaction locks continue to be
+ 	held until the transfer is confirmed. As a result, incautious use of
+ 	synchronous replication will reduce performance for database
+ 	applications because of increased response times and higher contention.
+    </para>
+ 
+    <para>
+ 	<productname>PostgreSQL</> allows the application developer
+ 	to specify the durability level required via replication. This can be
+ 	specified for the system overall, though it can also be specified for
+ 	specific users or connections, or even individual transactions.
+    </para>
+ 
+    <para>
+ 	For example, an application workload might consist of:
+ 	10% of changes are important customer details, while
+ 	90% of changes are less important data that the business can more
+ 	easily survive if it is lost, such as chat messages between users.
+    </para>
+ 
+    <para>
+ 	With synchronous replication options specified at the application level
+ 	(on the primary) we can offer sync rep for the most important changes,
+ 	without slowing down the bulk of the total workload. Application level
+ 	options are an important and practical tool for allowing the benefits of
+ 	synchronous replication for high performance applications.
+    </para>
+ 
+    <para>
+ 	You should consider that the network bandwidth must be higher than
+ 	the rate of generation of WAL data.
+ 	10% of changes are important customer details, while
+ 	90% of changes are less important data that the business can more
+ 	easily survive if it is lost, such as chat messages between users.
+    </para>
+ 
+    </sect3>
+ 
+    <sect3 id="synchronous-replication-ha">
+     <title>Planning for High Availability</title>
+ 
+    <para>
+     The easiest and safest method of gaining High Availability using
+     synchronous replication is to configure at least two standby servers.
+     To understand why, we need to examine what can happen when you lose all
+     standby servers.
+    </para>
+ 
+    <para>
+     Commits made when synchronous_replication is set will wait until at
+     the sync standby responds. The response may never occur if the last,
+     or only, standby should crash or the network drops. What should we do in
+     that situation?
+    </para>
+ 
+    <para>
+     If a standby was available immediately after commit we will wait.
+     Sitting and waiting will typically cause operational problems
+ 	because it is an effective outage of the primary server should all
+ 	sessions end up waiting. This is why we offer the facility to set
+ 	<varname>synchronous_replication_timeout</>.
+    </para>
+ 
+    <para>
+     Once the last synchronous standby has been lost we allow transactions
+     to skip waiting, since we know there isn't anybody to reply, or at
+     least we might expect it to be some time before one returns. You will
+     note that this provides high availability but a primary server working
+     alone could allow changes that are not replicated to other servers,
+     placing your data at risk if the primary fails also.
+    </para>
+ 
+    <para>
+ 	The best solution for avoiding data loss is to ensure you don't lose
+ 	your last remaining sync standby. This can be achieved by naming multiple
+ 	potential synchronous standbys using <varname>synchronous_standby_names</>.
+ 	The first named standby will be used as the synchronous standby. Standbys
+ 	listed after this will takeover the role of synchronous standby if the
+ 	first one should fail.
+    </para>
+ 
+    <para>
+ 	When a standby first attaches to the primary, it will not yet be properly
+ 	synchronized. This is described as <literal>CATCHUP</> mode. Once
+ 	the lag between standby and primary reaches zero for the first time
+ 	we move to real-time <literal>STREAMING</> state.
+ 	The catch-up duration may be long immediately after the standby has
+ 	been created. If the standby is shutdown, then the catch-up period
+ 	will increase according to the length of time the standby has been down.
+ 	The standby is only able to become a synchronous standby
+ 	once it has reached <literal>STREAMING</> state.
+    </para>
+ 
+    <para>
+ 	If primary crashes while commits are waiting for acknowledgement, those
+ 	waiting transactions will be marked fully committed once the primary
+ 	database recovers.
+ 	There is no way to be certain that all standbys have received all
+ 	outstanding WAL data at time of the crash of the primary. Some
+ 	transactions may not show as committed on the standby, even though
+ 	they show as committed on the primary. The guarantee we offer is that
+ 	the application will not receive explicit acknowledgement of the
+ 	successful commit of a transaction until the WAL data is known to be
+ 	safely received by the standby.
+    </para>
+ 
+    <para>
+ 	If you need to re-create a standby server while transactions are
+ 	waiting, make sure that the commands to run pg_start_backup() and
+ 	pg_stop_backup() are run in a session with
+ 	synchronous_replication = off, otherwise those requests will wait
+ 	forever for the standby to appear.
+    </para>
+ 
+    </sect3>
+   </sect2>
    </sect1>
  
    <sect1 id="warm-standby-failover">
*** a/src/backend/access/transam/twophase.c
--- b/src/backend/access/transam/twophase.c
***************
*** 56,61 ****
--- 56,62 ----
  #include "pg_trace.h"
  #include "pgstat.h"
  #include "replication/walsender.h"
+ #include "replication/syncrep.h"
  #include "storage/fd.h"
  #include "storage/predicate.h"
  #include "storage/procarray.h"
***************
*** 1071,1076 **** EndPrepare(GlobalTransaction gxact)
--- 1072,1085 ----
  
  	END_CRIT_SECTION();
  
+ 	/*
+ 	 * Wait for synchronous replication, if required.
+ 	 *
+ 	 * Note that at this stage we have marked the prepare, but still show as
+ 	 * running in the procarray (twice!) and continue to hold locks.
+ 	 */
+ 	SyncRepWaitForLSN(gxact->prepare_lsn);
+ 
  	records.tail = records.head = NULL;
  }
  
***************
*** 2030,2035 **** RecordTransactionCommitPrepared(TransactionId xid,
--- 2039,2052 ----
  	MyProc->inCommit = false;
  
  	END_CRIT_SECTION();
+ 
+ 	/*
+ 	 * Wait for synchronous replication, if required.
+ 	 *
+ 	 * Note that at this stage we have marked clog, but still show as
+ 	 * running in the procarray and continue to hold locks.
+ 	 */
+ 	SyncRepWaitForLSN(recptr);
  }
  
  /*
***************
*** 2109,2112 **** RecordTransactionAbortPrepared(TransactionId xid,
--- 2126,2137 ----
  	TransactionIdAbortTree(xid, nchildren, children);
  
  	END_CRIT_SECTION();
+ 
+ 	/*
+ 	 * Wait for synchronous replication, if required.
+ 	 *
+ 	 * Note that at this stage we have marked clog, but still show as
+ 	 * running in the procarray and continue to hold locks.
+ 	 */
+ 	SyncRepWaitForLSN(recptr);
  }
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 37,42 ****
--- 37,43 ----
  #include "miscadmin.h"
  #include "pgstat.h"
  #include "replication/walsender.h"
+ #include "replication/syncrep.h"
  #include "storage/bufmgr.h"
  #include "storage/fd.h"
  #include "storage/lmgr.h"
***************
*** 1055,1061 **** RecordTransactionCommit(void)
  	 * if all to-be-deleted tables are temporary though, since they are lost
  	 * anyway if we crash.)
  	 */
! 	if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0)
  	{
  		/*
  		 * Synchronous commit case:
--- 1056,1062 ----
  	 * if all to-be-deleted tables are temporary though, since they are lost
  	 * anyway if we crash.)
  	 */
! 	if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0 || SyncRepRequested())
  	{
  		/*
  		 * Synchronous commit case:
***************
*** 1125,1130 **** RecordTransactionCommit(void)
--- 1126,1139 ----
  	/* Compute latestXid while we have the child XIDs handy */
  	latestXid = TransactionIdLatest(xid, nchildren, children);
  
+ 	/*
+ 	 * Wait for synchronous replication, if required.
+ 	 *
+ 	 * Note that at this stage we have marked clog, but still show as
+ 	 * running in the procarray and continue to hold locks.
+ 	 */
+ 	SyncRepWaitForLSN(XactLastRecEnd);
+ 
  	/* Reset XactLastRecEnd until the next transaction writes something */
  	XactLastRecEnd.xrecoff = 0;
  
*** a/src/backend/catalog/system_views.sql
--- b/src/backend/catalog/system_views.sql
***************
*** 521,526 **** CREATE VIEW pg_stat_replication AS
--- 521,527 ----
              W.write_location,
              W.flush_location,
              W.replay_location
+             W.sync_priority
      FROM pg_stat_get_activity(NULL) AS S, pg_authid U,
              pg_stat_get_wal_senders() AS W
      WHERE S.usesysid = U.oid AND
*** a/src/backend/postmaster/autovacuum.c
--- b/src/backend/postmaster/autovacuum.c
***************
*** 1527,1532 **** AutoVacWorkerMain(int argc, char *argv[])
--- 1527,1539 ----
  	SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
  
  	/*
+ 	 * Force synchronous replication off to allow regular maintenance even
+ 	 * if we are waiting for standbys to connect. This is important to
+ 	 * ensure we aren't blocked from performing anti-wraparound tasks.
+ 	 */
+ 	SetConfigOption("synchronous_replication", "off", PGC_SUSET, PGC_S_OVERRIDE);
+ 
+ 	/*
  	 * Get the info about the database we're going to work on.
  	 */
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
*** a/src/backend/postmaster/postmaster.c
--- b/src/backend/postmaster/postmaster.c
***************
*** 1836,1842 **** retry1:
  					 errmsg("the database system is starting up")));
  			break;
  		case CAC_SHUTDOWN:
! 			ereport(FATAL,
  					(errcode(ERRCODE_CANNOT_CONNECT_NOW),
  					 errmsg("the database system is shutting down")));
  			break;
--- 1836,1843 ----
  					 errmsg("the database system is starting up")));
  			break;
  		case CAC_SHUTDOWN:
! 			if (!am_walsender)
! 				ereport(FATAL,
  					(errcode(ERRCODE_CANNOT_CONNECT_NOW),
  					 errmsg("the database system is shutting down")));
  			break;
*** a/src/backend/replication/Makefile
--- b/src/backend/replication/Makefile
***************
*** 13,19 **** top_builddir = ../../..
  include $(top_builddir)/src/Makefile.global
  
  OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
! 	repl_gram.o
  
  include $(top_srcdir)/src/backend/common.mk
  
--- 13,19 ----
  include $(top_builddir)/src/Makefile.global
  
  OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
! 	repl_gram.o syncrep.o
  
  include $(top_srcdir)/src/backend/common.mk
  
*** /dev/null
--- b/src/backend/replication/syncrep.c
***************
*** 0 ****
--- 1,617 ----
+ /*-------------------------------------------------------------------------
+  *
+  * syncrep.c
+  *
+  * Synchronous replication is new as of PostgreSQL 9.1.
+  *
+  * If requested, transaction commits wait until their commit LSN is
+  * acknowledged by the standby, or the wait hits timeout.
+  *
+  * This module contains the code for waiting and release of backends.
+  * All code in this module executes on the primary. The core streaming
+  * replication transport remains within WALreceiver/WALsender modules.
+  *
+  * The essence of this design is that it isolates all logic about
+  * waiting/releasing onto the primary. The primary defines which standbys
+  * it wishes to wait for. The standby is completely unaware of the
+  * durability requirements of transactions on the primary, reducing the
+  * complexity of the code and streamlining both standby operations and
+  * network bandwidth because there is no requirement to ship
+  * per-transaction state information.
+  *
+  * The bookeeping approach we take is that a commit is either synchronous
+  * or not synchronous (async). If it is async, we just fastpath out of
+  * here. If it is sync, then in 9.1 we wait for the flush location on the
+  * standby before releasing the waiting backend. Further complexity
+  * in that interaction is expected in later releases.
+  *
+  * The best performing way to manage the waiting backends is to have a
+  * single ordered queue of waiting backends, so that we can avoid
+  * searching the through all waiters each time we receive a reply.
+  *
+  * Starting sync replication is a multi stage process. First, the standby
+  * must be a potential synchronous standby. Next, we must have caught up
+  * with the primary; that may take some time. If there is no current
+  * synchronous standby then the WALsender will offer a sync rep service.
+  *
+  * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
+  *
+  * IDENTIFICATION
+  *	  $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ #include <unistd.h>
+ 
+ #include "access/xact.h"
+ #include "access/xlog_internal.h"
+ #include "miscadmin.h"
+ #include "postmaster/autovacuum.h"
+ #include "replication/syncrep.h"
+ #include "replication/walsender.h"
+ #include "storage/latch.h"
+ #include "storage/ipc.h"
+ #include "storage/pmsignal.h"
+ #include "storage/proc.h"
+ #include "utils/builtins.h"
+ #include "utils/guc.h"
+ #include "utils/guc_tables.h"
+ #include "utils/memutils.h"
+ #include "utils/ps_status.h"
+ 
+ /* User-settable parameters for sync rep */
+ bool	sync_rep_mode = false;			/* Only set in user backends */
+ int		sync_rep_timeout = 120;			/* Only set in user backends */
+ char 	*SyncRepStandbyNames;
+ 
+ bool	WaitingForSyncRep = false;	/* Global state for some exit methods */
+ 
+ #define	IsOnSyncRepQueue()		(MyProc->lwWaiting)
+ 
+ static bool announce_next_takeover = true;
+ 
+ static void SyncRepWaitOnQueue(XLogRecPtr XactCommitLSN);
+ static void SyncRepRemoveFromQueue(void);
+ static void SyncRepAddToQueue(void);
+ static long SyncRepGetWaitTimeout(void);
+ 
+ static int SyncRepGetStandbyPriority(void);
+ static int SyncRepWakeQueue(void);
+ 
+ 
+ /*
+  * ===========================================================
+  * Synchronous Replication functions for normal user backends
+  * ===========================================================
+  */
+ 
+ /*
+  * Wait for synchronous replication, if requested by user.
+  */
+ void
+ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
+ {
+ 	/*
+ 	 * Fast exit if user has not requested sync replication, or
+ 	 * streaming replication is inactive in this server.
+ 	 */
+ 	if (!SyncRepRequested() || max_wal_senders == 0)
+ 		return;
+ 
+ 	/*
+ 	 * Wait on queue. We check for a fast exit once we have the lock.
+ 	 */
+ 	SyncRepWaitOnQueue(XactCommitLSN);
+ }
+ 
+ void
+ SyncRepCleanupAtProcExit(int code, Datum arg)
+ {
+ 	if (IsOnSyncRepQueue())
+ 	{
+ 		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ 		SyncRepRemoveFromQueue();
+ 		LWLockRelease(SyncRepLock);
+ 	}
+ 
+ 	if (MyProc != NULL)
+ 		DisownLatch(&MyProc->waitLatch);
+ }
+ 
+ /*
+  * Wait for specified LSN to be confirmed at the requested level
+  * of durability. Each proc has its own wait latch, so we perform
+  * a normal latch check/wait loop here.
+  */
+ static void
+ SyncRepWaitOnQueue(XLogRecPtr XactCommitLSN)
+ {
+ 	volatile WalSndCtlData *walsndctl = WalSndCtl;
+ 	volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue);
+ 	TimestampTz	now = GetCurrentTransactionStopTimestamp();
+ 	long		timeout = SyncRepGetWaitTimeout();
+ 	char 		*new_status = NULL;
+ 	const char *old_status;
+ 	int			len;
+ 	bool		wait_on_queue = false;
+ 
+ 	ereport(DEBUG3,
+ 			(errmsg("synchronous replication waiting for %X/%X starting at %s",
+ 						XactCommitLSN.xlogid,
+ 						XactCommitLSN.xrecoff,
+ 						timestamptz_to_str(GetCurrentTransactionStopTimestamp()))));
+ 
+ 	for (;;)
+ 	{
+ 		ResetLatch(&MyProc->waitLatch);
+ 
+ 		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ 
+ 		/*
+ 		 * First time through, add ourselves to the queue.
+ 		 */
+ 		if (!IsOnSyncRepQueue())
+ 		{
+ 			int i;
+ 
+ 			/*
+ 			 * Wait no longer if we have already reached our LSN
+ 			 */
+ 			if (XLByteLE(XactCommitLSN, queue->lsn))
+ 			{
+ 				/* No need to wait */
+ 				LWLockRelease(SyncRepLock);
+ 				return;
+ 			}
+ 
+ 			/*
+ 			 * Check that we have at least one sync standby active that
+ 			 * has caught up with the primary.
+ 			 */
+ 			for (i = 0; i < max_wal_senders; i++)
+ 			{
+ 				/* use volatile pointer to prevent code rearrangement */
+ 				volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ 
+ 				if (walsnd->pid != 0 &&
+ 					walsnd->sync_standby_priority > 0 &&
+ 					walsnd->state == WALSNDSTATE_STREAMING)
+ 				{
+ 					wait_on_queue = true;
+ 					break;
+ 				}
+ 			}
+ 
+ 			/*
+ 			 * Leave quickly if we don't have a sync standby that will
+ 			 * confirm it has received our commit.
+ 			 */
+ 			if (!wait_on_queue)
+ 			{
+ 				LWLockRelease(SyncRepLock);
+ 				return;
+ 			}
+ 
+ 			/*
+ 			 * Set our waitLSN so WALSender will know when to wake us.
+ 			 * We set this before we add ourselves to queue, so that
+ 			 * any proc on the queue can be examined freely without
+ 			 * taking a lock on each process in the queue.
+ 			 */
+ 			MyProc->waitLSN = XactCommitLSN;
+ 			SyncRepAddToQueue();
+ 			LWLockRelease(SyncRepLock);
+ 			WaitingForSyncRep = true;
+ 
+ 			/*
+ 			 * Alter ps display to show waiting for sync rep.
+ 			 */
+ 			if (update_process_title)
+ 			{
+ 				old_status = get_ps_display(&len);
+ 				new_status = (char *) palloc(len + 21 + 1);
+ 				memcpy(new_status, old_status, len);
+ 				strcpy(new_status + len, " waiting for sync rep");
+ 				set_ps_display(new_status, false);
+ 				new_status[len] = '\0'; /* truncate off " waiting" */
+ 			}
+ 		}
+ 		else
+ 		{
+ 			bool release = false;
+ 			bool timed_out = false;
+ 
+ 			/*
+ 			 * Check the LSN on our queue and if it's moved far enough then
+ 			 * remove us from the queue. First time through this is
+ 			 * unlikely to be far enough, yet is possible. Next time we are
+ 			 * woken we should be more lucky.
+ 			 */
+ 			if (XLByteLE(XactCommitLSN, queue->lsn))
+ 				release = true;
+ 			else if (timeout > 0 &&
+ 				TimestampDifferenceExceeds(GetCurrentTransactionStopTimestamp(),
+ 											now, timeout))
+ 			{
+ 				release = true;
+ 				timed_out = true;
+ 			}
+ 
+ 			if (release)
+ 			{
+ 				SyncRepRemoveFromQueue();
+ 				LWLockRelease(SyncRepLock);
+ 				WaitingForSyncRep = false;
+ 
+ 				/*
+ 				 * Reset our waitLSN.
+ 				 */
+ 				MyProc->waitLSN.xlogid = 0;
+ 				MyProc->waitLSN.xrecoff = 0;
+ 
+ 				if (new_status)
+ 				{
+ 					/* Reset ps display */
+ 					set_ps_display(new_status, false);
+ 					pfree(new_status);
+ 				}
+ 
+ 				/*
+ 				 * Our response to the timeout is to simply post a NOTICE and
+ 				 * then return to the user. The commit has happened, we just
+ 				 * haven't been able to verify it has been replicated in the
+ 				 * way requested.
+ 				 */
+ 				if (timed_out)
+ 					ereport(NOTICE,
+ 							(errmsg("synchronous replication timeout at %s",
+ 										timestamptz_to_str(now))));
+ 				else
+ 					ereport(DEBUG3,
+ 							(errmsg("synchronous replication wait complete at %s",
+ 										timestamptz_to_str(now))));
+ 				return;
+ 			}
+ 
+ 			LWLockRelease(SyncRepLock);
+ 		}
+ 
+ 		WaitLatch(&MyProc->waitLatch, timeout);
+ 		now = GetCurrentTimestamp();
+ 	}
+ }
+ 
+ /*
+  * Remove myself from sync rep wait queue.
+  *
+  * Assume on queue at start; will not be on queue at end.
+  * Queue is already locked at start and remains locked on exit.
+  */
+ static void
+ SyncRepRemoveFromQueue(void)
+ {
+ 	volatile WalSndCtlData *walsndctl = WalSndCtl;
+ 	volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue);
+ 	PGPROC	*proc = queue->head;
+ 
+ 	Assert(IsOnSyncRepQueue());
+ 
+ 	proc = queue->head;
+ 
+ 	if (proc == MyProc)
+ 	{
+ 		if (MyProc->lwWaitLink == NULL)
+ 		{
+ 			/*
+ 			 * We were the only waiter on the queue. Reset head and tail.
+ 			 */
+ 			Assert(queue->tail == MyProc);
+ 			queue->head = NULL;
+ 			queue->tail = NULL;
+ 		}
+ 		else
+ 			/*
+ 			 * Move head to next proc on the queue.
+ 			 */
+ 			queue->head = MyProc->lwWaitLink;
+ 	}
+ 	else
+ 	{
+ 		bool	found = false;
+ 
+ 		while (proc->lwWaitLink != NULL)
+ 		{
+ 			/* Are we the next proc in our traversal of the queue? */
+ 			if (proc->lwWaitLink == MyProc)
+ 			{
+ 				/*
+ 				 * Remove ourselves from middle of queue.
+ 				 * No need to touch head or tail.
+ 				 */
+ 				proc->lwWaitLink = MyProc->lwWaitLink;
+ 				found = true;
+ 				break;
+ 			}
+ 
+ 			proc = proc->lwWaitLink;
+ 		}
+ 
+ 		if (!found)
+ 			elog(WARNING, "could not locate ourselves on wait queue");
+ 
+ 		if (proc->lwWaitLink == NULL)	/* At tail */
+ 		{
+ 			Assert(proc != MyProc);
+ 			/* Remove ourselves from tail of queue */
+ 			Assert(queue->tail == MyProc);
+ 			queue->tail = proc;
+ 			proc->lwWaitLink = NULL;
+ 		}
+ 	}
+ 	MyProc->lwWaitLink = NULL;
+ 	MyProc->lwWaiting = false;
+ }
+ 
+ /*
+  * Add myself to sync rep wait queue.
+  *
+  * Assume not on queue at start; will be on queue at end.
+  * Queue is already locked at start and remains locked on exit.
+  */
+ static void
+ SyncRepAddToQueue(void)
+ {
+ 	volatile WalSndCtlData *walsndctl = WalSndCtl;
+ 	volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue);
+ 	PGPROC	*tail = queue->tail;
+ 
+ 	/*
+ 	 * Add myself to tail of wait queue.
+ 	 */
+ 	if (tail == NULL)
+ 	{
+ 		queue->head = MyProc;
+ 		queue->tail = MyProc;
+ 	}
+ 	else
+ 	{
+ 		/*
+ 		 * XXX extra code needed here to maintain sorted invariant.
+ 		 * Our approach should be same as racing car - slow in, fast out.
+ 		 */
+ 		Assert(tail->lwWaitLink == NULL);
+ 		tail->lwWaitLink = MyProc;
+ 	}
+ 	queue->tail = MyProc;
+ 
+ 	MyProc->lwWaiting = true;
+ 	MyProc->lwWaitLink = NULL;
+ }
+ 
+ /*
+  * Return a value that we can use directly in WaitLatch(). We need to
+  * handle special values, plus convert from seconds to microseconds.
+  *
+  */
+ static long
+ SyncRepGetWaitTimeout(void)
+ {
+ 	if (sync_rep_timeout == 0)
+ 		return -1L;
+ 
+ 	return 1000000L * sync_rep_timeout;
+ }
+ 
+ /*
+  * ===========================================================
+  * Synchronous Replication functions for wal sender processes
+  * ===========================================================
+  */
+ 
+ /*
+  * Take any action required to initialise sync rep state from config
+  * data. Called at WALSender startup and after each SIGHUP.
+  */
+ void
+ SyncRepInitConfig(void)
+ {
+ 	int priority;
+ 
+ 	/*
+ 	 * Determine if we are a potential sync standby and remember the result
+ 	 * for handling replies from standby.
+ 	 */
+ 	priority = SyncRepGetStandbyPriority();
+ 	if (MyWalSnd->sync_standby_priority != priority)
+ 	{
+ 		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ 		MyWalSnd->sync_standby_priority = priority;
+ 		LWLockRelease(SyncRepLock);
+ 		ereport(DEBUG1,
+ 				(errmsg("standby \"%s\" now has synchronous standby priority %u",
+ 						application_name, priority)));
+ 	}
+ }
+ 
+ /*
+  * Update the LSNs on each queue based upon our latest state. This
+  * implements a simple policy of first-valid-standby-releases-waiter.
+  *
+  * Other policies are possible, which would change what we do here and what
+  * perhaps also which information we store as well.
+  */
+ void
+ SyncRepReleaseWaiters(void)
+ {
+ 	volatile WalSndCtlData *walsndctl = WalSndCtl;
+ 	volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue);
+ 	volatile WalSnd *syncWalSnd = NULL;
+ 	int 		numprocs = 0;
+ 	int			priority = 0;
+ 	int			i;
+ 
+ 	/*
+ 	 * If this WALSender is serving a standby that is not on the list of
+ 	 * potential standbys then we have nothing to do. If we are still
+ 	 * starting up or still running base backup, then leave quicly also.
+ 	 */
+ 	if (MyWalSnd->sync_standby_priority == 0 ||
+ 		MyWalSnd->state < WALSNDSTATE_CATCHUP)
+ 		return;
+ 
+ 	/*
+ 	 * We're a potential sync standby. Release waiters if we are the
+ 	 * highest priority standby. We do this even if the standby is not yet
+ 	 * caught up, in case this is a restart situation and
+ 	 * there are backends waiting for us. That allows backends to exit the
+ 	 * wait state even if new backends cannot yet enter the wait state.
+ 	 */
+ 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ 
+ 	for (i = 0; i < max_wal_senders; i++)
+ 	{
+ 		/* use volatile pointer to prevent code rearrangement */
+ 		volatile WalSnd *walsnd = &walsndctl->walsnds[i];
+ 
+ 		if (walsnd->pid != 0 &&
+ 			walsnd->sync_standby_priority > 0 &&
+ 			(priority == 0 ||
+ 			 priority < walsnd->sync_standby_priority))
+ 		{
+ 			 priority = walsnd->sync_standby_priority;
+ 			 syncWalSnd = walsnd;
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * We should have found ourselves at least.
+ 	 */
+ 	Assert(syncWalSnd);
+ 
+ 	/*
+ 	 * If we aren't managing the highest priority standby then just leave.
+ 	 */
+ 	if (syncWalSnd != MyWalSnd)
+ 	{
+ 		LWLockRelease(SyncRepLock);
+ 		announce_next_takeover = true;
+ 		return;
+ 	}
+ 
+ 	if (XLByteLT(queue->lsn, MyWalSnd->flush))
+ 	{
+ 		/*
+ 		 * Set the lsn first so that when we wake backends they will
+ 		 * release up to this location.
+ 		 */
+ 		queue->lsn = MyWalSnd->flush;
+ 		numprocs = SyncRepWakeQueue();
+ 	}
+ 
+ 	LWLockRelease(SyncRepLock);
+ 
+ 	elog(DEBUG3, "released %d procs up to %X/%X",
+ 					numprocs,
+ 					MyWalSnd->flush.xlogid,
+ 					MyWalSnd->flush.xrecoff);
+ 
+ 	/*
+ 	 * If we are managing the highest priority standby, though we weren't
+ 	 * prior to this, then announce we are now the sync standby.
+ 	 */
+ 	if (announce_next_takeover)
+ 	{
+ 		announce_next_takeover = false;
+ 		ereport(LOG,
+ 				(errmsg("standby \"%s\" is now the synchronous standby with priority %u",
+ 						application_name, MyWalSnd->sync_standby_priority)));
+ 	}
+ }
+ 
+ /*
+  * Check if we are in the list of sync standbys, and if so, determine
+  * priority sequence. Return priority if set, or zero to indicate that
+  * we are not a potential sync standby.
+  *
+  * Compare the parameter SyncRepStandbyNames against the application_name
+  * for this WALSender, or allow any name if we find a wildcard "*".
+  */
+ static int
+ SyncRepGetStandbyPriority(void)
+ {
+ 	char	   *rawstring;
+ 	List	   *elemlist;
+ 	ListCell   *l;
+ 	int			priority = 0;
+ 	bool		found = false;
+ 
+ 	/* Need a modifiable copy of string */
+ 	rawstring = pstrdup(SyncRepStandbyNames);
+ 
+ 	/* Parse string into list of identifiers */
+ 	if (!SplitIdentifierString(rawstring, ',', &elemlist))
+ 	{
+ 		/* syntax error in list */
+ 		pfree(rawstring);
+ 		list_free(elemlist);
+ 		ereport(FATAL,
+ 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ 		   errmsg("invalid list syntax for parameter \"synchronous_standby_names\"")));
+ 		return 0;
+ 	}
+ 
+ 	foreach(l, elemlist)
+ 	{
+ 		char	   *standby_name = (char *) lfirst(l);
+ 
+ 		priority++;
+ 
+ 		if (pg_strcasecmp(standby_name, application_name) == 0 ||
+ 			pg_strcasecmp(standby_name, "*") == 0)
+ 		{
+ 			found = true;
+ 			break;
+ 		}
+ 	}
+ 
+ 	pfree(rawstring);
+ 	list_free(elemlist);
+ 
+ 	return (found ? priority : 0);
+ }
+ 
+ /*
+  * Walk queue from head setting the latches of any procs that need
+  * to be woken. We don't modify the queue, we leave that for individual
+  * procs to release themselves.
+  *
+  * Must hold SyncRepLock
+  */
+ static int
+ SyncRepWakeQueue(void)
+ {
+ 	volatile WalSndCtlData *walsndctl = WalSndCtl;
+ 	volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue);
+ 	PGPROC	*proc = queue->head;
+ 	int		numprocs = 0;
+ 
+ 	/* fast exit for empty queue */
+ 	if (proc == NULL)
+ 		return 0;
+ 
+ 	for (; proc != NULL; proc = proc->lwWaitLink)
+ 	{
+ 		/*
+ 		 * Assume the queue is ordered by LSN
+ 		 */
+ 		if (XLByteLT(queue->lsn, proc->waitLSN))
+ 			return numprocs;
+ 
+ 		numprocs++;
+ 		SetLatch(&proc->waitLatch);
+ 	}
+ 
+ 	return numprocs;
+ }
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 66,72 ****
  WalSndCtlData *WalSndCtl = NULL;
  
  /* My slot in the shared memory array */
! static WalSnd *MyWalSnd = NULL;
  
  /* Global state */
  bool		am_walsender = false;		/* Am I a walsender process ? */
--- 66,72 ----
  WalSndCtlData *WalSndCtl = NULL;
  
  /* My slot in the shared memory array */
! WalSnd *MyWalSnd = NULL;
  
  /* Global state */
  bool		am_walsender = false;		/* Am I a walsender process ? */
***************
*** 174,179 **** WalSenderMain(void)
--- 174,181 ----
  		SpinLockRelease(&walsnd->mutex);
  	}
  
+ 	SyncRepInitConfig();
+ 
  	/* Main loop of walsender */
  	return WalSndLoop();
  }
***************
*** 584,589 **** ProcessStandbyReplyMessage(void)
--- 586,593 ----
  		walsnd->apply = reply.apply;
  		SpinLockRelease(&walsnd->mutex);
  	}
+ 
+ 	SyncRepReleaseWaiters();
  }
  
  /*
***************
*** 700,705 **** WalSndLoop(void)
--- 704,710 ----
  		{
  			got_SIGHUP = false;
  			ProcessConfigFile(PGC_SIGHUP);
+ 			SyncRepInitConfig();
  		}
  
  		/*
***************
*** 771,777 **** WalSndLoop(void)
--- 776,787 ----
  		 * that point might wait for some time.
  		 */
  		if (MyWalSnd->state == WALSNDSTATE_CATCHUP && caughtup)
+ 		{
+ 			ereport(DEBUG1,
+ 					(errmsg("standby \"%s\" has now caught up with primary",
+ 									application_name)));
  			WalSndSetState(WALSNDSTATE_STREAMING);
+ 		}
  
  		ProcessRepliesIfAny();
  	}
***************
*** 1304,1310 **** WalSndGetStateString(WalSndState state)
  Datum
  pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  {
! #define PG_STAT_GET_WAL_SENDERS_COLS 	6
  	ReturnSetInfo	   *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
  	TupleDesc			tupdesc;
  	Tuplestorestate	   *tupstore;
--- 1314,1320 ----
  Datum
  pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  {
! #define PG_STAT_GET_WAL_SENDERS_COLS 	7
  	ReturnSetInfo	   *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
  	TupleDesc			tupdesc;
  	Tuplestorestate	   *tupstore;
***************
*** 1346,1351 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
--- 1356,1362 ----
  		XLogRecPtr	write;
  		XLogRecPtr	flush;
  		XLogRecPtr	apply;
+ 		int      	sync_priority;
  		WalSndState	state;
  		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
  		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS];
***************
*** 1361,1366 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
--- 1372,1381 ----
  		apply = walsnd->apply;
  		SpinLockRelease(&walsnd->mutex);
  
+ 		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ 		sync_priority = walsnd->sync_standby_priority;
+ 		LWLockRelease(SyncRepLock);
+ 
  		memset(nulls, 0, sizeof(nulls));
  		values[0] = Int32GetDatum(walsnd->pid);
  
***************
*** 1370,1380 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  			 * Only superusers can see details. Other users only get
  			 * the pid value to know it's a walsender, but no details.
  			 */
! 			nulls[1] = true;
! 			nulls[2] = true;
! 			nulls[3] = true;
! 			nulls[4] = true;
! 			nulls[5] = true;
  		}
  		else
  		{
--- 1385,1391 ----
  			 * Only superusers can see details. Other users only get
  			 * the pid value to know it's a walsender, but no details.
  			 */
! 			MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
  		}
  		else
  		{
***************
*** 1401,1406 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
--- 1412,1419 ----
  			snprintf(location, sizeof(location), "%X/%X",
  					 apply.xlogid, apply.xrecoff);
  			values[5] = CStringGetTextDatum(location);
+ 
+ 			values[6] = Int32GetDatum(sync_priority);
  		}
  
  		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
*** a/src/backend/storage/lmgr/proc.c
--- b/src/backend/storage/lmgr/proc.c
***************
*** 39,44 ****
--- 39,45 ----
  #include "access/xact.h"
  #include "miscadmin.h"
  #include "postmaster/autovacuum.h"
+ #include "replication/syncrep.h"
  #include "storage/ipc.h"
  #include "storage/lmgr.h"
  #include "storage/pmsignal.h"
***************
*** 196,201 **** InitProcGlobal(void)
--- 197,203 ----
  		PGSemaphoreCreate(&(procs[i].sem));
  		procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs;
  		ProcGlobal->freeProcs = &procs[i];
+ 		InitSharedLatch(&procs[i].waitLatch);
  	}
  
  	/*
***************
*** 214,219 **** InitProcGlobal(void)
--- 216,222 ----
  		PGSemaphoreCreate(&(procs[i].sem));
  		procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs;
  		ProcGlobal->autovacFreeProcs = &procs[i];
+ 		InitSharedLatch(&procs[i].waitLatch);
  	}
  
  	/*
***************
*** 224,229 **** InitProcGlobal(void)
--- 227,233 ----
  	{
  		AuxiliaryProcs[i].pid = 0;		/* marks auxiliary proc as not in use */
  		PGSemaphoreCreate(&(AuxiliaryProcs[i].sem));
+ 		InitSharedLatch(&procs[i].waitLatch);
  	}
  
  	/* Create ProcStructLock spinlock, too */
***************
*** 326,331 **** InitProcess(void)
--- 330,341 ----
  		SHMQueueInit(&(MyProc->myProcLocks[i]));
  	MyProc->recoveryConflictPending = false;
  
+ 	/* Initialise the waitLSN for sync rep */
+ 	MyProc->waitLSN.xlogid = 0;
+ 	MyProc->waitLSN.xrecoff = 0;
+ 
+ 	OwnLatch((Latch *) &MyProc->waitLatch);
+ 
  	/*
  	 * We might be reusing a semaphore that belonged to a failed process. So
  	 * be careful and reinitialize its value here.	(This is not strictly
***************
*** 365,370 **** InitProcessPhase2(void)
--- 375,381 ----
  	/*
  	 * Arrange to clean that up at backend exit.
  	 */
+ 	on_shmem_exit(SyncRepCleanupAtProcExit, 0);
  	on_shmem_exit(RemoveProcFromArray, 0);
  }
  
*** a/src/backend/tcop/postgres.c
--- b/src/backend/tcop/postgres.c
***************
*** 2861,2866 **** ProcessInterrupts(void)
--- 2861,2894 ----
  			ereport(FATAL,
  					(errcode(ERRCODE_ADMIN_SHUTDOWN),
  					 errmsg("terminating autovacuum process due to administrator command")));
+ 		else if (WaitingForSyncRep)
+ 		{
+ 			/*
+ 			 * This must NOT be a FATAL message. We want the state of the
+ 			 * transaction being aborted to be indeterminate to ensure that
+ 			 * the transaction completion guarantee is never broken.
+ 			 */
+ 			ereport(WARNING,
+ 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+ 					 errmsg("terminating connection because fast shutdown is requested"),
+ 			errdetail("This connection requested synchronous replication at commit"
+ 					  " yet confirmation of replication has not been received."
+ 					  " The transaction has committed locally and might be committed"
+ 					  " on recently disconnected standby servers also.")));
+ 
+ 			/*
+ 			 * We DO NOT want to run proc_exit() callbacks -- we're here because
+ 			 * we are shutting down and don't want any code to stall or
+ 			 * prevent that.
+ 			 */
+ 			on_exit_reset();
+ 
+ 			/*
+ 			 * Note we do exit(0) not exit(>0). This is to avoid forcing
+ 			 * postmaster into a system reset cycle.
+ 			 */
+ 			exit(0);
+ 		}
  		else if (RecoveryConflictPending && RecoveryConflictRetryable)
  		{
  			pgstat_report_recovery_conflict(RecoveryConflictReason);
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 55,60 ****
--- 55,61 ----
  #include "postmaster/postmaster.h"
  #include "postmaster/syslogger.h"
  #include "postmaster/walwriter.h"
+ #include "replication/syncrep.h"
  #include "replication/walreceiver.h"
  #include "replication/walsender.h"
  #include "storage/bufmgr.h"
***************
*** 754,759 **** static struct config_bool ConfigureNamesBool[] =
--- 755,768 ----
  		true, NULL, NULL
  	},
  	{
+ 		{"synchronous_replication", PGC_USERSET, WAL_REPLICATION,
+ 			gettext_noop("Requests synchronous replication."),
+ 			NULL
+ 		},
+ 		&sync_rep_mode,
+ 		false, NULL, NULL
+ 	},
+ 	{
  		{"zero_damaged_pages", PGC_SUSET, DEVELOPER_OPTIONS,
  			gettext_noop("Continues processing past damaged page headers."),
  			gettext_noop("Detection of a damaged page header normally causes PostgreSQL to "
***************
*** 2161,2166 **** static struct config_int ConfigureNamesInt[] =
--- 2170,2185 ----
  	},
  
  	{
+ 		{"sync_replication_timeout", PGC_USERSET, WAL_REPLICATION,
+ 			gettext_noop("Sets the maximum wait time for a response from synchronous replication."),
+ 			gettext_noop("A value of 0 turns off the timeout."),
+ 			GUC_UNIT_S
+ 		},
+ 		&sync_rep_timeout,
+ 		120, 0, INT_MAX, NULL, NULL
+ 	},
+ 
+ 	{
  		{"track_activity_query_size", PGC_POSTMASTER, RESOURCES_MEM,
  			gettext_noop("Sets the size reserved for pg_stat_activity.current_query, in bytes."),
  			NULL,
***************
*** 2717,2722 **** static struct config_string ConfigureNamesString[] =
--- 2736,2751 ----
  	},
  
  	{
+ 		{"synchronous_standby_names", PGC_SIGHUP, WAL_REPLICATION,
+ 			gettext_noop("List of potential standby names to synchronise with."),
+ 			NULL,
+ 			GUC_LIST_INPUT
+ 		},
+ 		&SyncRepStandbyNames,
+ 		"*", NULL, NULL
+ 	},
+ 
+ 	{
  		{"default_text_search_config", PGC_USERSET, CLIENT_CONN_LOCALE,
  			gettext_noop("Sets default text search configuration."),
  			NULL
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 184,190 ****
  #archive_timeout = 0		# force a logfile segment switch after this
  				# number of seconds; 0 disables
  
! # - Streaming Replication -
  
  #max_wal_senders = 0		# max number of walsender processes
  				# (change requires restart)
--- 184,200 ----
  #archive_timeout = 0		# force a logfile segment switch after this
  				# number of seconds; 0 disables
  
! # - Replication - User Settings
! 
! #synchronous_replication = off		# does commit wait for reply from standby
! #replication_timeout_client = 120   # 0 means wait forever
! 
! # - Streaming Replication - Server Settings
! 
! #synchronous_standby_names = '*'	# standby servers that provide sync rep
! 				# comma-separated list of application_name from standby(s);
! 				# '*' = all (default)
! 
  
  #max_wal_senders = 0		# max number of walsender processes
  				# (change requires restart)
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
***************
*** 3078,3084 **** DATA(insert OID = 1936 (  pg_stat_get_backend_idset		PGNSP PGUID 12 1 100 0 f f
  DESCR("statistics: currently active backend IDs");
  DATA(insert OID = 2022 (  pg_stat_get_activity			PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,25,23}" "{i,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_hostname,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ ));
  DESCR("statistics: information about currently active backends");
! DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25,25,25,25}" "{o,o,o,o,o,o}" "{procpid,state,sent_location,write_location,flush_location,replay_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
  DESCR("statistics: information about currently active replication");
  DATA(insert OID = 2026 (  pg_backend_pid				PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
  DESCR("statistics: current backend PID");
--- 3078,3084 ----
  DESCR("statistics: currently active backend IDs");
  DATA(insert OID = 2022 (  pg_stat_get_activity			PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,25,23}" "{i,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_hostname,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ ));
  DESCR("statistics: information about currently active backends");
! DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25,25,25,25,23}" "{o,o,o,o,o,o,o}" "{procpid,state,sent_location,write_location,flush_location,replay_location,sync_priority}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
  DESCR("statistics: information about currently active replication");
  DATA(insert OID = 2026 (  pg_backend_pid				PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
  DESCR("statistics: current backend PID");
*** a/src/include/miscadmin.h
--- b/src/include/miscadmin.h
***************
*** 78,83 **** extern PGDLLIMPORT volatile uint32 CritSectionCount;
--- 78,86 ----
  /* in tcop/postgres.c */
  extern void ProcessInterrupts(void);
  
+ /* in replication/syncrep.c */
+ extern bool WaitingForSyncRep;
+ 
  #ifndef WIN32
  
  #define CHECK_FOR_INTERRUPTS() \
*** /dev/null
--- b/src/include/replication/syncrep.h
***************
*** 0 ****
--- 1,53 ----
+ /*-------------------------------------------------------------------------
+  *
+  * syncrep.h
+  *	  Exports from replication/syncrep.c.
+  *
+  * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+  *
+  * $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #ifndef _SYNCREP_H
+ #define _SYNCREP_H
+ 
+ #include "access/xlog.h"
+ #include "storage/proc.h"
+ #include "storage/shmem.h"
+ #include "storage/spin.h"
+ 
+ #define SyncRepRequested()				(sync_rep_mode)
+ 
+ /*
+  * Each synchronous rep queue lives in the WAL sender shmem area.
+  */
+ typedef struct SyncRepQueue
+ {
+ 	/*
+ 	 * Current location of the head of the queue. All waiters should have
+ 	 * a waitLSN that follows this value, or they are currently being woken
+ 	 * to remove themselves from the queue.
+ 	 */
+ 	XLogRecPtr	lsn;
+ 
+ 	PGPROC		*head;
+ 	PGPROC		*tail;
+ } SyncRepQueue;
+ 
+ /* user-settable parameters for synchronous replication */
+ extern bool sync_rep_mode;
+ extern int 	sync_rep_timeout;
+ extern char *SyncRepStandbyNames;
+ 
+ /* called by user backend */
+ extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN);
+ 
+ /* callback at backend exit */
+ extern void SyncRepCleanupAtProcExit(int code, Datum arg);
+ 
+ /* called by wal sender */
+ extern void SyncRepInitConfig(void);
+ extern void SyncRepReleaseWaiters(void);
+ 
+ #endif   /* _SYNCREP_H */
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 15,20 ****
--- 15,21 ----
  #include "access/xlog.h"
  #include "nodes/nodes.h"
  #include "storage/latch.h"
+ #include "replication/syncrep.h"
  #include "storage/spin.h"
  
  
***************
*** 52,62 **** typedef struct WalSnd
--- 53,77 ----
  	 * to do.
  	 */
  	Latch		latch;
+ 
+ 	/*
+ 	 * The priority order of the standby managed by this WALSender, as
+ 	 * listed in synchronous_standby_names, or 0 if not-listed.
+ 	 * Protected by SyncRepLock.
+ 	 */
+ 	 int	sync_standby_priority;
  } WalSnd;
  
+ extern WalSnd *MyWalSnd;
+ 
  /* There is one WalSndCtl struct for the whole database cluster */
  typedef struct
  {
+ 	/*
+ 	 * Synchronous replication queue, protected by SyncRepLock.
+ 	 */
+ 	SyncRepQueue	sync_rep_queue;			/* Proc queue, sorted by LSN */
+ 
  	WalSnd		walsnds[1];		/* VARIABLE LENGTH ARRAY */
  } WalSndCtlData;
  
*** a/src/include/storage/lwlock.h
--- b/src/include/storage/lwlock.h
***************
*** 78,83 **** typedef enum LWLockId
--- 78,84 ----
  	SerializableFinishedListLock,
  	SerializablePredicateLockListLock,
  	OldSerXidLock,
+ 	SyncRepLock,
  	/* Individual lock IDs end here */
  	FirstBufMappingLock,
  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
*** a/src/include/storage/proc.h
--- b/src/include/storage/proc.h
***************
*** 14,19 ****
--- 14,21 ----
  #ifndef _PROC_H_
  #define _PROC_H_
  
+ #include "access/xlog.h"
+ #include "storage/latch.h"
  #include "storage/lock.h"
  #include "storage/pg_sema.h"
  #include "utils/timestamp.h"
***************
*** 115,120 **** struct PGPROC
--- 117,126 ----
  	LOCKMASK	heldLocks;		/* bitmask for lock types already held on this
  								 * lock object by this backend */
  
+ 	/* Info to allow us to wait for synchronous replication, if needed. */
+ 	Latch		waitLatch;
+ 	XLogRecPtr	waitLSN;		/* waiting for this LSN or higher */
+ 
  	/*
  	 * All PROCLOCK objects for locks held or awaited by this backend are
  	 * linked into one of these lists, according to the partition number of
*** a/src/test/regress/expected/rules.out
--- b/src/test/regress/expected/rules.out
***************
*** 1298,1304 **** SELECT viewname, definition FROM pg_views WHERE schemaname <> 'information_schem
   pg_stat_bgwriter                | SELECT pg_stat_get_bgwriter_timed_checkpoints() AS checkpoints_timed, pg_stat_get_bgwriter_requested_checkpoints() AS checkpoints_req, pg_stat_get_bgwriter_buf_written_checkpoints() AS buffers_checkpoint, pg_stat_get_bgwriter_buf_written_clean() AS buffers_clean, pg_stat_get_bgwriter_maxwritten_clean() AS maxwritten_clean, pg_stat_get_buf_written_backend() AS buffers_backend, pg_stat_get_buf_fsync_backend() AS buffers_backend_fsync, pg_stat_get_buf_alloc() AS buffers_alloc, pg_stat_get_bgwriter_stat_reset_time() AS stats_reset;
   pg_stat_database                | SELECT d.oid AS datid, d.datname, pg_stat_get_db_numbackends(d.oid) AS numbackends, pg_stat_get_db_xact_commit(d.oid) AS xact_commit, pg_stat_get_db_xact_rollback(d.oid) AS xact_rollback, (pg_stat_get_db_blocks_fetched(d.oid) - pg_stat_get_db_blocks_hit(d.oid)) AS blks_read, pg_stat_get_db_blocks_hit(d.oid) AS blks_hit, pg_stat_get_db_tuples_returned(d.oid) AS tup_returned, pg_stat_get_db_tuples_fetched(d.oid) AS tup_fetched, pg_stat_get_db_tuples_inserted(d.oid) AS tup_inserted, pg_stat_get_db_tuples_updated(d.oid) AS tup_updated, pg_stat_get_db_tuples_deleted(d.oid) AS tup_deleted, pg_stat_get_db_conflict_all(d.oid) AS conflicts, pg_stat_get_db_stat_reset_time(d.oid) AS stats_reset FROM pg_database d;
   pg_stat_database_conflicts      | SELECT d.oid AS datid, d.datname, pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock FROM pg_database d;
!  pg_stat_replication             | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_hostname, s.client_port, s.backend_start, w.state, w.sent_location, w.write_location, w.flush_location, w.replay_location FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_hostname, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location, write_location, flush_location, replay_location) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid));
   pg_stat_sys_indexes             | SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, pg_stat_all_indexes.relname, pg_stat_all_indexes.indexrelname, pg_stat_all_indexes.idx_scan, pg_stat_all_indexes.idx_tup_read, pg_stat_all_indexes.idx_tup_fetch FROM pg_stat_all_indexes WHERE ((pg_stat_all_indexes.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_indexes.schemaname ~ '^pg_toast'::text));
   pg_stat_sys_tables              | SELECT pg_stat_all_tables.relid, pg_stat_all_tables.schemaname, pg_stat_all_tables.relname, pg_stat_all_tables.seq_scan, pg_stat_all_tables.seq_tup_read, pg_stat_all_tables.idx_scan, pg_stat_all_tables.idx_tup_fetch, pg_stat_all_tables.n_tup_ins, pg_stat_all_tables.n_tup_upd, pg_stat_all_tables.n_tup_del, pg_stat_all_tables.n_tup_hot_upd, pg_stat_all_tables.n_live_tup, pg_stat_all_tables.n_dead_tup, pg_stat_all_tables.last_vacuum, pg_stat_all_tables.last_autovacuum, pg_stat_all_tables.last_analyze, pg_stat_all_tables.last_autoanalyze, pg_stat_all_tables.vacuum_count, pg_stat_all_tables.autovacuum_count, pg_stat_all_tables.analyze_count, pg_stat_all_tables.autoanalyze_count FROM pg_stat_all_tables WHERE ((pg_stat_all_tables.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_tables.schemaname ~ '^pg_toast'::text));
   pg_stat_user_functions          | SELECT p.oid AS funcid, n.nspname AS schemaname, p.proname AS funcname, pg_stat_get_function_calls(p.oid) AS calls, (pg_stat_get_function_time(p.oid) / 1000) AS total_time, (pg_stat_get_function_self_time(p.oid) / 1000) AS self_time FROM (pg_proc p LEFT JOIN pg_namespace n ON ((n.oid = p.pronamespace))) WHERE ((p.prolang <> (12)::oid) AND (pg_stat_get_function_calls(p.oid) IS NOT NULL));
--- 1298,1304 ----
   pg_stat_bgwriter                | SELECT pg_stat_get_bgwriter_timed_checkpoints() AS checkpoints_timed, pg_stat_get_bgwriter_requested_checkpoints() AS checkpoints_req, pg_stat_get_bgwriter_buf_written_checkpoints() AS buffers_checkpoint, pg_stat_get_bgwriter_buf_written_clean() AS buffers_clean, pg_stat_get_bgwriter_maxwritten_clean() AS maxwritten_clean, pg_stat_get_buf_written_backend() AS buffers_backend, pg_stat_get_buf_fsync_backend() AS buffers_backend_fsync, pg_stat_get_buf_alloc() AS buffers_alloc, pg_stat_get_bgwriter_stat_reset_time() AS stats_reset;
   pg_stat_database                | SELECT d.oid AS datid, d.datname, pg_stat_get_db_numbackends(d.oid) AS numbackends, pg_stat_get_db_xact_commit(d.oid) AS xact_commit, pg_stat_get_db_xact_rollback(d.oid) AS xact_rollback, (pg_stat_get_db_blocks_fetched(d.oid) - pg_stat_get_db_blocks_hit(d.oid)) AS blks_read, pg_stat_get_db_blocks_hit(d.oid) AS blks_hit, pg_stat_get_db_tuples_returned(d.oid) AS tup_returned, pg_stat_get_db_tuples_fetched(d.oid) AS tup_fetched, pg_stat_get_db_tuples_inserted(d.oid) AS tup_inserted, pg_stat_get_db_tuples_updated(d.oid) AS tup_updated, pg_stat_get_db_tuples_deleted(d.oid) AS tup_deleted, pg_stat_get_db_conflict_all(d.oid) AS conflicts, pg_stat_get_db_stat_reset_time(d.oid) AS stats_reset FROM pg_database d;
   pg_stat_database_conflicts      | SELECT d.oid AS datid, d.datname, pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock FROM pg_database d;
!  pg_stat_replication             | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_hostname, s.client_port, s.backend_start, w.state, w.sent_location, w.write_location, w.flush_location, w.replay_location, w.sync_priority FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_hostname, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location, write_location, flush_location, replay_location, sync_priority) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid));
   pg_stat_sys_indexes             | SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, pg_stat_all_indexes.relname, pg_stat_all_indexes.indexrelname, pg_stat_all_indexes.idx_scan, pg_stat_all_indexes.idx_tup_read, pg_stat_all_indexes.idx_tup_fetch FROM pg_stat_all_indexes WHERE ((pg_stat_all_indexes.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_indexes.schemaname ~ '^pg_toast'::text));
   pg_stat_sys_tables              | SELECT pg_stat_all_tables.relid, pg_stat_all_tables.schemaname, pg_stat_all_tables.relname, pg_stat_all_tables.seq_scan, pg_stat_all_tables.seq_tup_read, pg_stat_all_tables.idx_scan, pg_stat_all_tables.idx_tup_fetch, pg_stat_all_tables.n_tup_ins, pg_stat_all_tables.n_tup_upd, pg_stat_all_tables.n_tup_del, pg_stat_all_tables.n_tup_hot_upd, pg_stat_all_tables.n_live_tup, pg_stat_all_tables.n_dead_tup, pg_stat_all_tables.last_vacuum, pg_stat_all_tables.last_autovacuum, pg_stat_all_tables.last_analyze, pg_stat_all_tables.last_autoanalyze, pg_stat_all_tables.vacuum_count, pg_stat_all_tables.autovacuum_count, pg_stat_all_tables.analyze_count, pg_stat_all_tables.autoanalyze_count FROM pg_stat_all_tables WHERE ((pg_stat_all_tables.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_tables.schemaname ~ '^pg_toast'::text));
   pg_stat_user_functions          | SELECT p.oid AS funcid, n.nspname AS schemaname, p.proname AS funcname, pg_stat_get_function_calls(p.oid) AS calls, (pg_stat_get_function_time(p.oid) / 1000) AS total_time, (pg_stat_get_function_self_time(p.oid) / 1000) AS self_time FROM (pg_proc p LEFT JOIN pg_namespace n ON ((n.oid = p.pronamespace))) WHERE ((p.prolang <> (12)::oid) AND (pg_stat_get_function_calls(p.oid) IS NOT NULL));
