Synchronous replication
Hi,
The attached patch provides the core of the synchronous replication
feature based on streaming replication. I have added this patch to
CF 2010-07.
The code is also available in my git repository:
git://git.postgresql.org/git/users/fujii/postgres.git
branch: synchrep
Synchronization levels
----------------------
The patch provides the replication_mode parameter in recovery.conf. It
specifies the replication mode, which controls how long a transaction
commit on the master server waits for replication before the command
returns a "success" indication to the client. Valid modes are:
1. async
doesn't make transaction commit wait for replication, i.e.,
asynchronous replication. This mode is already supported in
9.0.
2. recv
makes transaction commit wait until the standby has received WAL
records.
3. fsync
makes transaction commit wait until the standby has received and
flushed WAL records to disk.
4. replay
makes transaction commit wait until the standby has replayed WAL
records after receiving and flushing them to disk.
You can choose the synchronization level per standby.
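As a rough illustration of the four levels, here is a minimal C sketch of mapping a recovery.conf value to a mode. The REPLICATION_MODE_* names come from the patch; the numeric values and the helpers parse_replication_mode() and mode_is_synchronous() are assumptions made for this example, not code from the patch.

```c
#include <assert.h>
#include <string.h>

/*
 * Mode names follow the patch's REPLICATION_MODE_* constants.
 * The numeric values are assumed for illustration only.
 */
typedef enum ReplicationMode
{
    REPLICATION_MODE_ASYNC = 0,   /* commit does not wait */
    REPLICATION_MODE_RECV,        /* wait until standby has received WAL */
    REPLICATION_MODE_FSYNC,       /* wait until standby has flushed WAL */
    REPLICATION_MODE_REPLAY       /* wait until standby has replayed WAL */
} ReplicationMode;

/*
 * Hypothetical helper: translate a replication_mode string into a mode.
 * Returns 0 on success, -1 if the string is not one of the valid modes.
 */
int
parse_replication_mode(const char *value, ReplicationMode *mode)
{
    assert(value != NULL && mode != NULL);

    if (strcmp(value, "async") == 0)
        *mode = REPLICATION_MODE_ASYNC;
    else if (strcmp(value, "recv") == 0)
        *mode = REPLICATION_MODE_RECV;
    else if (strcmp(value, "fsync") == 0)
        *mode = REPLICATION_MODE_FSYNC;
    else if (strcmp(value, "replay") == 0)
        *mode = REPLICATION_MODE_REPLAY;
    else
        return -1;
    return 0;
}

/* Hypothetical helper: 1 if commit may wait for this standby, else 0. */
int
mode_is_synchronous(ReplicationMode mode)
{
    return mode != REPLICATION_MODE_ASYNC;
}
```

For example, parse_replication_mode("fsync", &m) sets m to REPLICATION_MODE_FSYNC, while an unknown string such as "sync" is rejected with -1.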
Quorum commit
-------------
In the previous discussion about synchronous replication, some people
wanted a quorum commit feature. This feature is also included in
Zontan's synchronous replication patch, so I decided to implement it.
The patch provides the quorum parameter in postgresql.conf, which
specifies how many standby servers a transaction commit waits for
WAL records to be replicated to before the command returns a
"success" indication to the client. The default value is zero, in
which case transaction commit never waits for replication, regardless
of replication_mode. Likewise, transaction commit never waits for
replication to an asynchronous standby (i.e., one whose
replication_mode is set to async), regardless of this parameter. If
quorum is greater than the number of synchronous standbys, transaction
commit returns "success" once the ACK has arrived from all of the
synchronous standbys.
Currently the quorum parameter is defined as PGC_USERSET, so you can
have some transactions replicate synchronously and others
asynchronously.
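The quorum rules above can be sketched in C as follows. This is only one reading of the description, not code from the patch: the helpers acks_required() and commit_may_return() are hypothetical, and asynchronous standbys are assumed to be excluded from num_sync_standbys already.

```c
#include <assert.h>

/*
 * Hypothetical helper: how many standby ACKs a commit has to collect.
 *
 * quorum == 0 means "never wait". A quorum larger than the number of
 * synchronous standbys is capped, so the commit waits for all of them.
 */
int
acks_required(int quorum, int num_sync_standbys)
{
    assert(quorum >= 0 && num_sync_standbys >= 0);

    if (quorum == 0)
        return 0;
    return (quorum < num_sync_standbys) ? quorum : num_sync_standbys;
}

/*
 * Hypothetical helper: 1 if the commit can report "success" given the
 * number of synchronous standbys that have acknowledged it so far.
 */
int
commit_may_return(int acks_received, int quorum, int num_sync_standbys)
{
    return acks_received >= acks_required(quorum, num_sync_standbys);
}
```

With three synchronous standbys, quorum = 2 waits for two ACKs, quorum = 5 waits for all three, and quorum = 0 does not wait at all.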
Protocol
--------
I extended the handshake message "START_REPLICATION" so that it
includes the replication_mode read from recovery.conf. If 'async' is
passed, the master knows that it doesn't need to wait for an ACK
from the standby.
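A minimal C sketch of the extended handshake string follows. The "START_REPLICATION %X/%X %d" format is the one used in the patch; the helper names format_start_replication() and parse_start_replication() are made up for this example, and the numeric mode value passed in depends on how the REPLICATION_MODE_* constants are defined.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper (standby side): build the handshake command.
 * Returns 0 on success, -1 if the buffer is too small.
 */
int
format_start_replication(char *cmd, size_t size,
                         unsigned int xlogid, unsigned int xrecoff, int mode)
{
    int n;

    assert(cmd != NULL && size > 0);
    n = snprintf(cmd, size, "START_REPLICATION %X/%X %d",
                 xlogid, xrecoff, mode);
    return (n >= 0 && (size_t) n < size) ? 0 : -1;
}

/*
 * Hypothetical helper (master side): parse the handshake command.
 * Returns 0 only if the position and the mode were all present.
 */
int
parse_start_replication(const char *cmd,
                        unsigned int *xlogid, unsigned int *xrecoff, int *mode)
{
    assert(cmd != NULL);
    return (sscanf(cmd, "START_REPLICATION %X/%X %d",
                   xlogid, xrecoff, mode) == 3) ? 0 : -1;
}
```

Note that a command in the old two-field form, without the trailing mode, does not parse under this sketch, which matches the patch's check that sscanf() returns 3.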
I added the XLogRecPtr message, which walreceiver uses to send
walsender the ACK indicating how far replication has completed. If
replication_mode = 'async', this message is never sent. The XLogRecPtr
message carries the current receive location if the mode is 'recv',
the current flush location if the mode is 'fsync', and the current
replay location if the mode is 'replay'.
Then, if the location in the ACK is greater than or equal to the
location of the COMMIT record, the transaction breaks out of the wait
loop and returns "success" to the client.
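The ACK handling described above might look roughly like this in C. The two-field XLogRecPtr layout and the 'l' message type follow the patch; the helpers recptr_le(), commit_is_acked() and build_ack_message() are hypothetical, and the payload is copied in host byte order as a simplifying assumption.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* WAL location as a (log file id, byte offset) pair. */
typedef struct XLogRecPtr
{
    uint32_t xlogid;    /* log file number */
    uint32_t xrecoff;   /* byte offset within the log file */
} XLogRecPtr;

/* One type byte ('l') followed by the 8-byte location. */
#define ACK_MSG_LEN (1 + sizeof(XLogRecPtr))

/* Hypothetical helper: 1 if a <= b in WAL order, else 0. */
int
recptr_le(XLogRecPtr a, XLogRecPtr b)
{
    return a.xlogid < b.xlogid ||
        (a.xlogid == b.xlogid && a.xrecoff <= b.xrecoff);
}

/*
 * Hypothetical helper: 1 if the ACKed location covers the COMMIT record,
 * i.e. the waiting transaction may leave its wait loop.
 */
int
commit_is_acked(XLogRecPtr acked, XLogRecPtr commit_end)
{
    return recptr_le(commit_end, acked);
}

/*
 * Hypothetical helper: fill buf (at least ACK_MSG_LEN bytes) with an ACK
 * message for the given location.
 */
void
build_ack_message(char *buf, XLogRecPtr ack_end)
{
    assert(buf != NULL);
    buf[0] = 'l';
    memcpy(buf + 1, &ack_end, sizeof(XLogRecPtr));
}
```

For a COMMIT record ending at 0/2000, an ACK of 0/1FFF keeps the transaction waiting, while an ACK of 0/2000 or 1/0 releases it.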
TODO
----
The patch has no features for improving the performance of synchronous
replication. I admit that the performance overhead on the master is
currently terrible. We need to address the following TODO items in a
subsequent CF.
* Change the poll loop in the walsender
* Change the poll loop in the backend
* Change the poll loop in the startup process
* Change the poll loop in the walreceiver
* Perform the WAL write and replication concurrently
* Send WAL from not only disk but also WAL buffers
To handle a network outage or a standby failure, we should expose the
maximum time to wait for replication as a parameter. Furthermore, you
might want to specify the reaction to the timeout. Neither is in the
patch, so we need to address them in a subsequent CF, too.
In synchronous replication, it's important to check whether the standby
is in sync with the master. But such a monitoring feature is also not
in the patch. That's a TODO item.
It would be difficult to commit the whole synchronous replication
feature at one time. I'm planning to develop it in stages.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
synch_rep_0714.patch (application/octet-stream)
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 1932,1937 **** SET ENABLE_SEQSCAN TO OFF;
--- 1932,1962 ----
</listitem>
</varlistentry>
+ <varlistentry id="guc-quorum" xreflabel="quorum">
+ <term><varname>quorum</varname> (<type>integer</type>)</term>
+ <indexterm>
+ <primary><varname>quorum</> configuration parameter</primary>
+ </indexterm>
+ <listitem>
+ <para>
+ Specifies how many standby servers transaction commit will wait
+ for WAL records to be replicated to, before the command returns
+ a <quote>success</> indication to the client. The default value
+ is zero, which always doesn't make transaction commit wait for
+ replication without regard to <xref linkend="replication-mode">.
+ Also transaction commit always doesn't wait for replication to
+ asynchronous standby (i.e., its <varname>replication_mode</> is
+ set to <literal>async</>) without regard to this parameter.
+ </para>
+ <para>
+ This parameter can be changed at any time; the behavior for any
+ one transaction is determined by the setting in effect when it
+ commits. It is therefore possible, and useful, to have some
+ transactions replicate synchronously and others asynchronously.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-vacuum-defer-cleanup-age" xreflabel="vacuum_defer_cleanup_age">
<term><varname>vacuum_defer_cleanup_age</varname> (<type>integer</type>)</term>
<indexterm>
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 148,161 **** protocol to make nodes agree on a serializable transactional order.
stream of write-ahead log (<acronym>WAL</>)
records. If the main server fails, the standby contains
almost all of the data of the main server, and can be quickly
! made the new master database server. This is asynchronous and
! can only be done for the entire database server.
</para>
<para>
A PITR standby server can be implemented using file-based log shipping
(<xref linkend="warm-standby">) or streaming replication (see
! <xref linkend="streaming-replication">), or a combination of both. For
! information on hot standby, see <xref linkend="hot-standby">.
</para>
</listitem>
</varlistentry>
--- 148,163 ----
stream of write-ahead log (<acronym>WAL</>)
records. If the main server fails, the standby contains
almost all of the data of the main server, and can be quickly
! made the new master database server. This can only be done for
! the entire database server.
</para>
<para>
A PITR standby server can be implemented using file-based log shipping
(<xref linkend="warm-standby">) or streaming replication (see
! <xref linkend="streaming-replication">), or a combination of both.
! While file-based log shipping is asynchronous, synchronization mode can
! be chosen in streaming replication (see <xref linkend="replication-mode">).
! For information on hot standby, see <xref linkend="hot-standby">.
</para>
</listitem>
</varlistentry>
***************
*** 348,354 **** protocol to make nodes agree on a serializable transactional order.
<entry>No master server overhead</entry>
<entry align="center">•</entry>
<entry align="center"></entry>
! <entry align="center">•</entry>
<entry align="center"></entry>
<entry align="center">•</entry>
<entry align="center"></entry>
--- 350,356 ----
<entry>No master server overhead</entry>
<entry align="center">•</entry>
<entry align="center"></entry>
! <entry align="center">Optional</entry>
<entry align="center"></entry>
<entry align="center">•</entry>
<entry align="center"></entry>
***************
*** 359,365 **** protocol to make nodes agree on a serializable transactional order.
<entry>No waiting for multiple servers</entry>
<entry align="center">•</entry>
<entry align="center"></entry>
! <entry align="center">•</entry>
<entry align="center">•</entry>
<entry align="center"></entry>
<entry align="center">•</entry>
--- 361,367 ----
<entry>No waiting for multiple servers</entry>
<entry align="center">•</entry>
<entry align="center"></entry>
! <entry align="center">Optional</entry>
<entry align="center">•</entry>
<entry align="center"></entry>
<entry align="center">•</entry>
***************
*** 370,376 **** protocol to make nodes agree on a serializable transactional order.
<entry>Master failure will never lose data</entry>
<entry align="center">•</entry>
<entry align="center">•</entry>
! <entry align="center"></entry>
<entry align="center"></entry>
<entry align="center">•</entry>
<entry align="center"></entry>
--- 372,378 ----
<entry>Master failure will never lose data</entry>
<entry align="center">•</entry>
<entry align="center">•</entry>
! <entry align="center">Optional</entry>
<entry align="center"></entry>
<entry align="center">•</entry>
<entry align="center"></entry>
*** a/doc/src/sgml/protocol.sgml
--- b/doc/src/sgml/protocol.sgml
***************
*** 1337,1347 **** The commands accepted in walsender mode are:
</varlistentry>
<varlistentry>
! <term>START_REPLICATION <replaceable>XXX</>/<replaceable>XXX</></term>
<listitem>
<para>
Instructs server to start streaming WAL, starting at
! WAL position <replaceable>XXX</>/<replaceable>XXX</>.
The server can reply with an error, e.g. if the requested section of WAL
has already been recycled. On success, server responds with a
CopyOutResponse message, and then starts to stream WAL to the frontend.
--- 1337,1348 ----
</varlistentry>
<varlistentry>
! <term>START_REPLICATION <replaceable>XXX</>/<replaceable>XXX</> <replaceable>N</></term>
<listitem>
<para>
Instructs server to start streaming WAL, starting at
! WAL position <replaceable>XXX</>/<replaceable>XXX</>
! with the replication mode <replaceable>N</>.
The server can reply with an error, e.g. if the requested section of WAL
has already been recycled. On success, server responds with a
CopyOutResponse message, and then starts to stream WAL to the frontend.
***************
*** 1360,1365 **** The commands accepted in walsender mode are:
--- 1361,1401 ----
<variablelist>
<varlistentry>
<term>
+ XLogRecPtr (F)
+ </term>
+ <listitem>
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term>
+ Byte1('l')
+ </term>
+ <listitem>
+ <para>
+ Identifies the message as an acknowledgment of replication.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+ Byte8
+ </term>
+ <listitem>
+ <para>
+ The end of the WAL data replicated to the standby, given in
+ XLogRecPtr format.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ <variablelist>
+ <varlistentry>
+ <term>
XLogData (B)
</term>
<listitem>
*** a/doc/src/sgml/recovery-config.sgml
--- b/doc/src/sgml/recovery-config.sgml
***************
*** 280,285 **** restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows
--- 280,343 ----
</para>
</listitem>
</varlistentry>
+ <varlistentry id="replication-mode" xreflabel="replication_mode">
+ <term><varname>replication_mode</varname> (<type>string</type>)</term>
+ <indexterm>
+ <primary><varname>replication_mode</> recovery parameter</primary>
+ </indexterm>
+ <listitem>
+ <para>
+ Specifies the replication mode which can control how long transaction
+ commit on the master server waits for replication before the command
+ returns a <quote>success</> indication to the client. Valid modes are:
+ <itemizedlist>
+ <listitem>
+ <para>
+ <literal>async</> (doesn't make transaction commit wait for replication,
+ i.e., asynchronous replication)
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>recv</> (makes transaction commit wait until the standby has
+ received WAL records)
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>fsync</> (makes transaction commit wait until the standby has
+ received and flushed WAL records to disk)
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>replay</> (makes transaction commit wait until the standby has
+ replayed WAL records after receiving and flushing them to disk)
+ </para>
+ </listitem>
+ </itemizedlist>
+ </para>
+ <para>
+ In asynchronous replication, there can be a delay between when a
+ <quote>success</> is reported to the client and when the transaction
+ is really guaranteed to be safe against a failover. Setting this
+ parameter to <literal>async</> does not create any risk of database
+ inconsistency: a crash at the master server might result in some recent
+ allegedly-committed transactions being lost at the standby server,
+ but the database state of the standby will be just the same as if
+ those transactions had been aborted cleanly. So, turning
+ <varname>replication_mode</> <literal>async</> can be a useful
+ alternative when performance is more important than exact certainty
+ about the durability of a transaction.
+ The default setting is <literal>async</>.
+ </para>
+ <para>
+ If <xref linkend="guc-quorum"> is set to zero in the master server,
+ transaction commit always doesn't wait for replication without regard
+ to this parameter.
+ </para>
+ </listitem>
+ </varlistentry>
<varlistentry id="trigger-file" xreflabel="trigger_file">
<term><varname>trigger_file</varname> (<type>string</type>)</term>
<indexterm>
*** a/src/backend/access/transam/recovery.conf.sample
--- b/src/backend/access/transam/recovery.conf.sample
***************
*** 100,105 ****
--- 100,110 ----
#primary_conninfo = '' # e.g. 'host=localhost port=5432'
#
#
+ # Specifies the synchronization mode of replication.
+ #
+ #replication_mode = 'async' # 'async', 'recv', 'fsync' or 'replay'
+ #
+ #
# By default, a standby server keeps streaming XLOG records from the
# primary indefinitely. If you want to stop streaming and finish recovery,
# opening up the system in read/write mode, specify path to a trigger file.
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 36,41 ****
--- 36,42 ----
#include "libpq/be-fsstubs.h"
#include "miscadmin.h"
#include "pgstat.h"
+ #include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
***************
*** 1088,1093 **** RecordTransactionCommit(void)
--- 1089,1106 ----
/* Compute latestXid while we have the child XIDs handy */
latestXid = TransactionIdLatest(xid, nchildren, children);
+ /*
+ * Wait for WAL to be replicated up to the COMMIT record if replication
+ * is enabled and quorum > 0. This operation has to be performed after
+ * the COMMIT record is generated and before other transactions know that
+ * this one has been committed.
+ *
+ * XXX: Since the caller prevents cancel/die interrupt, we cannot
+ * process that while waiting. Should we remove this restriction?
+ */
+ if (max_wal_senders > 0 && quorum > 0)
+ WaitXLogSend(XactLastRecEnd);
+
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd.xrecoff = 0;
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 189,194 **** static TimestampTz recoveryTargetTime;
--- 189,195 ----
static bool StandbyMode = false;
static char *PrimaryConnInfo = NULL;
static char *TriggerFile = NULL;
+ int rplMode = REPLICATION_MODE_ASYNC;
/* if recoveryStopsHere returns true, it saves actual stop xid/time here */
static TransactionId recoveryStopXid;
***************
*** 5258,5263 **** readRecoveryCommandFile(void)
--- 5259,5282 ----
(errmsg("trigger_file = '%s'",
TriggerFile)));
}
+ else if (strcmp(tok1, "replication_mode") == 0)
+ {
+ if (strcmp(tok2, "async") == 0)
+ rplMode = REPLICATION_MODE_ASYNC;
+ else if (strcmp(tok2, "recv") == 0)
+ rplMode = REPLICATION_MODE_RECV;
+ else if (strcmp(tok2, "fsync") == 0)
+ rplMode = REPLICATION_MODE_FSYNC;
+ else if (strcmp(tok2, "replay") == 0)
+ rplMode = REPLICATION_MODE_REPLAY;
+ else
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid value for parameter \"replication_mode\": \"%s\"",
+ tok2)));
+ ereport(DEBUG2,
+ (errmsg("replication_mode = '%s'", tok2)));
+ }
else
ereport(FATAL,
(errmsg("unrecognized recovery parameter \"%s\"",
***************
*** 6867,6872 **** GetFlushRecPtr(void)
--- 6886,6908 ----
}
/*
+ * GetReplayRecPtr -- Returns the last replay position.
+ */
+ XLogRecPtr
+ GetReplayRecPtr(void)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+ XLogRecPtr recptr;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ recptr = xlogctl->recoveryLastRecPtr;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ return recptr;
+ }
+
+ /*
* Get the time of the last xlog segment switch
*/
pg_time_t
***************
*** 8824,8837 **** pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
Datum
pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr recptr;
char location[MAXFNAMELEN];
! SpinLockAcquire(&xlogctl->info_lck);
! recptr = xlogctl->recoveryLastRecPtr;
! SpinLockRelease(&xlogctl->info_lck);
if (recptr.xlogid == 0 && recptr.xrecoff == 0)
PG_RETURN_NULL();
--- 8860,8869 ----
Datum
pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
{
XLogRecPtr recptr;
char location[MAXFNAMELEN];
! recptr = GetReplayRecPtr();
if (recptr.xlogid == 0 && recptr.xrecoff == 0)
PG_RETURN_NULL();
***************
*** 9463,9469 **** retry:
{
RequestXLogStreaming(
fetching_ckpt ? RedoStartLSN : *RecPtr,
! PrimaryConnInfo);
continue;
}
}
--- 9495,9501 ----
{
RequestXLogStreaming(
fetching_ckpt ? RedoStartLSN : *RecPtr,
! PrimaryConnInfo, rplMode);
continue;
}
}
*** a/src/backend/libpq/be-secure.c
--- b/src/backend/libpq/be-secure.c
***************
*** 71,76 ****
--- 71,86 ----
#endif
#endif /* USE_SSL */
+ #ifdef HAVE_POLL_H
+ #include <poll.h>
+ #endif
+ #ifdef HAVE_SYS_POLL_H
+ #include <sys/poll.h>
+ #endif
+ #ifdef HAVE_SYS_SELECT_H
+ #include <sys/select.h>
+ #endif
+
#include "libpq/libpq.h"
#include "tcop/tcopprot.h"
***************
*** 397,402 **** wloop:
--- 407,472 ----
return n;
}
+ /*
+ * Checks a socket, using poll or select, for data to be read.
+ * Returns >0 if there is data to read, 0 if it timed out, -1
+ * if an error occurred (including the interrupt).
+ *
+ * Timeout is specified in millisec. Timeout is infinite if
+ * timeout_ms is negative. Timeout is immediate (no blocking)
+ * if timeout_ms is 0.
+ *
+ * If SSL is in use, the SSL buffer is checked prior to
+ * checking the socket for read data directly.
+ *
+ * This function is based on pqSocketCheck and pqSocketPoll.
+ */
+ int
+ secure_poll(Port *port, int timeout_ms)
+ {
+ #ifdef USE_SSL
+ /* Check for SSL library buffering read bytes */
+ if (port->ssl && SSL_pending(port->ssl) > 0)
+ {
+ /* short-circuit the select */
+ return 1;
+ }
+ #endif
+
+ {
+ /* We use poll(2) if available, otherwise select(2) */
+ #ifdef HAVE_POLL
+ struct pollfd input_fd;
+
+ input_fd.fd = port->sock;
+ input_fd.events = POLLIN | POLLERR;
+ input_fd.revents = 0;
+
+ return poll(&input_fd, 1, timeout_ms);
+ #else /* !HAVE_POLL */
+
+ fd_set input_mask;
+ struct timeval timeout;
+ struct timeval *ptr_timeout;
+
+ FD_ZERO(&input_mask);
+ FD_SET(port->sock, &input_mask);
+
+ if (timeout_ms < 0)
+ ptr_timeout = NULL;
+ else
+ {
+ timeout.tv_sec = timeout_ms / 1000;
+ timeout.tv_usec = (timeout_ms % 1000) * 1000;
+ ptr_timeout = &timeout;
+ }
+
+ return select(port->sock + 1, &input_mask,
+ NULL, NULL, ptr_timeout);
+ #endif /* HAVE_POLL */
+ }
+ }
+
/* ------------------------------------------------------------ */
/* SSL specific code */
/* ------------------------------------------------------------ */
*** a/src/backend/libpq/pqcomm.c
--- b/src/backend/libpq/pqcomm.c
***************
*** 56,61 ****
--- 56,62 ----
* pq_putbytes - send bytes to connection (not flushed until pq_flush)
* pq_flush - flush pending output
* pq_getbyte_if_available - get a byte if available without blocking
+ * pq_wait - wait until we can read connection
*
* message-level I/O (and old-style-COPY-OUT cruft):
* pq_putmessage - send a normal message (suppressed in COPY OUT mode)
***************
*** 911,916 **** pq_getbyte_if_available(unsigned char *c)
--- 912,959 ----
}
/* --------------------------------
+ * pq_wait - wait until we can read the connection socket.
+ *
+ * returns >0 if there is data to read, 0 if it timed out or
+ * interrupted, -1 if an error occurred.
+ *
+ * this function is based on pqSocketCheck.
+ * --------------------------------
+ */
+ int
+ pq_wait(int timeout_ms)
+ {
+ int result;
+
+ if (!MyProcPort)
+ return -1;
+ if (MyProcPort->sock < 0)
+ {
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("socket not open")));
+ return -1;
+ }
+
+ result = secure_poll(MyProcPort, timeout_ms);
+ if (result < 0)
+ {
+ if (errno == EINTR)
+ return 0; /* interrupted */
+
+ /*
+ * XXX: Should we suppress duplicate log messages also here,
+ * like internal_flush?
+ */
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("select() failed: %m")));
+ }
+
+ return result;
+ }
+
+ /* --------------------------------
* pq_getbytes - get a known number of bytes from connection
*
* returns 0 if OK, EOF if trouble
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 47,55 **** static bool justconnected = false;
static char *recvBuf = NULL;
/* Prototypes for interface functions */
! static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
static bool libpqrcv_receive(int timeout, unsigned char *type,
char **buffer, int *len);
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
--- 47,57 ----
static char *recvBuf = NULL;
/* Prototypes for interface functions */
! static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint,
! int mode);
static bool libpqrcv_receive(int timeout, unsigned char *type,
char **buffer, int *len);
+ static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
***************
*** 64,73 **** _PG_init(void)
{
/* Tell walreceiver how to reach us */
if (walrcv_connect != NULL || walrcv_receive != NULL ||
! walrcv_disconnect != NULL)
elog(ERROR, "libpqwalreceiver already loaded");
walrcv_connect = libpqrcv_connect;
walrcv_receive = libpqrcv_receive;
walrcv_disconnect = libpqrcv_disconnect;
}
--- 66,76 ----
{
/* Tell walreceiver how to reach us */
if (walrcv_connect != NULL || walrcv_receive != NULL ||
! walrcv_send != NULL || walrcv_disconnect != NULL)
elog(ERROR, "libpqwalreceiver already loaded");
walrcv_connect = libpqrcv_connect;
walrcv_receive = libpqrcv_receive;
+ walrcv_send = libpqrcv_send;
walrcv_disconnect = libpqrcv_disconnect;
}
***************
*** 75,81 **** _PG_init(void)
* Establish the connection to the primary server for XLOG streaming
*/
static bool
! libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
{
char conninfo_repl[MAXCONNINFO + 37];
char *primary_sysid;
--- 78,84 ----
* Establish the connection to the primary server for XLOG streaming
*/
static bool
! libpqrcv_connect(char *conninfo, XLogRecPtr startpoint, int mode)
{
char conninfo_repl[MAXCONNINFO + 37];
char *primary_sysid;
***************
*** 154,161 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
ThisTimeLineID = primary_tli;
/* Start streaming from the point requested by startup process */
! snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
! startpoint.xlogid, startpoint.xrecoff);
res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) != PGRES_COPY_OUT)
{
--- 157,164 ----
ThisTimeLineID = primary_tli;
/* Start streaming from the point requested by startup process */
! snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X %d",
! startpoint.xlogid, startpoint.xrecoff, mode);
res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) != PGRES_COPY_OUT)
{
***************
*** 398,400 **** libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
--- 401,418 ----
return true;
}
+
+ /*
+ * Send a message to XLOG stream.
+ *
+ * ereports on error.
+ */
+ static void
+ libpqrcv_send(const char *buffer, int nbytes)
+ {
+ if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
+ PQflush(streamConn))
+ ereport(ERROR,
+ (errmsg("could not send data to WAL stream: %s",
+ PQerrorMessage(streamConn))));
+ }
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 57,62 **** bool am_walreceiver;
--- 57,63 ----
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
walrcv_receive_type walrcv_receive = NULL;
+ walrcv_send_type walrcv_send = NULL;
walrcv_disconnect_type walrcv_disconnect = NULL;
#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
***************
*** 113,118 **** static void WalRcvDie(int code, Datum arg);
--- 114,120 ----
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(void);
+ static void XLogWalRcvSendRecPtr(XLogRecPtr recptr);
/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
***************
*** 159,164 **** WalReceiverMain(void)
--- 161,167 ----
{
char conninfo[MAXCONNINFO];
XLogRecPtr startpoint;
+ XLogRecPtr ackedpoint = {0, 0};
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
***************
*** 206,211 **** WalReceiverMain(void)
--- 209,215 ----
/* Fetch information required to start streaming */
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+ rplMode = walrcv->rplMode;
startpoint = walrcv->receivedUpto;
SpinLockRelease(&walrcv->mutex);
***************
*** 247,253 **** WalReceiverMain(void)
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
if (walrcv_connect == NULL || walrcv_receive == NULL ||
! walrcv_disconnect == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/*
--- 251,257 ----
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
if (walrcv_connect == NULL || walrcv_receive == NULL ||
! walrcv_send == NULL || walrcv_disconnect == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/*
***************
*** 261,267 **** WalReceiverMain(void)
/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
! walrcv_connect(conninfo, startpoint);
DisableWalRcvImmediateExit();
/* Loop until end-of-streaming or error */
--- 265,271 ----
/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
! walrcv_connect(conninfo, startpoint, rplMode);
DisableWalRcvImmediateExit();
/* Loop until end-of-streaming or error */
***************
*** 311,316 **** WalReceiverMain(void)
--- 315,339 ----
*/
XLogWalRcvFlush();
}
+
+ /*
+ * If replication_mode is "replay", send the last WAL replay location
+ * to the primary, to acknowledge that replication has been completed
+ * up to that. This occurs only when WAL records were replayed since
+ * the last acknowledgement.
+ */
+ if (rplMode == REPLICATION_MODE_REPLAY &&
+ XLByteLT(ackedpoint, LogstreamResult.Flush))
+ {
+ XLogRecPtr recptr;
+
+ recptr = GetReplayRecPtr();
+ if (XLByteLT(ackedpoint, recptr))
+ {
+ XLogWalRcvSendRecPtr(recptr);
+ ackedpoint = recptr;
+ }
+ }
}
}
***************
*** 406,411 **** XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
--- 429,447 ----
buf += sizeof(WalDataMessageHeader);
len -= sizeof(WalDataMessageHeader);
+ /*
+ * If replication_mode is "recv", send the last WAL receive
+ * location to the primary, to acknowledge that replication
+ * has been completed up to that.
+ */
+ if (rplMode == REPLICATION_MODE_RECV)
+ {
+ XLogRecPtr endptr = msghdr.dataStart;
+
+ XLByteAdvance(endptr, len);
+ XLogWalRcvSendRecPtr(endptr);
+ }
+
XLogWalRcvWrite(buf, len, msghdr.dataStart);
break;
}
***************
*** 523,528 **** XLogWalRcvFlush(void)
--- 559,572 ----
LogstreamResult.Flush = LogstreamResult.Write;
+ /*
+ * If replication_mode is "fsync", send the last WAL flush
+ * location to the primary, to acknowledge that replication
+ * has been completed up to that.
+ */
+ if (rplMode == REPLICATION_MODE_FSYNC)
+ XLogWalRcvSendRecPtr(LogstreamResult.Flush);
+
/* Update shared-memory status */
SpinLockAcquire(&walrcv->mutex);
walrcv->latestChunkStart = walrcv->receivedUpto;
***************
*** 541,543 **** XLogWalRcvFlush(void)
--- 585,608 ----
}
}
}
+
+ /* Send the lsn to the primary server */
+ static void
+ XLogWalRcvSendRecPtr(XLogRecPtr recptr)
+ {
+ static char *msgbuf = NULL;
+ WalAckMessageData msgdata;
+
+ /*
+ * Allocate buffer that will be used for each output message if first
+ * time through. We do this just once to reduce palloc overhead.
+ * The buffer must be made large enough for maximum-sized messages.
+ */
+ if (msgbuf == NULL)
+ msgbuf = palloc(1 + sizeof(WalAckMessageData));
+
+ msgbuf[0] = 'l';
+ msgdata.ackEnd = recptr;
+ memcpy(msgbuf + 1, &msgdata, sizeof(WalAckMessageData));
+ walrcv_send(msgbuf, 1 + sizeof(WalAckMessageData));
+ }
*** a/src/backend/replication/walreceiverfuncs.c
--- b/src/backend/replication/walreceiverfuncs.c
***************
*** 168,178 **** ShutdownWalRcv(void)
/*
* Request postmaster to start walreceiver.
*
! * recptr indicates the position where streaming should begin, and conninfo
! * is a libpq connection string to use.
*/
void
! RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
--- 168,178 ----
/*
* Request postmaster to start walreceiver.
*
! * recptr indicates the position where streaming should begin, conninfo is
! * a libpq connection string to use, and mode is a replication mode.
*/
void
! RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo, int mode)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
***************
*** 196,201 **** RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
--- 196,202 ----
strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
else
walrcv->conninfo[0] = '\0';
+ walrcv->rplMode = mode;
walrcv->walRcvState = WALRCV_STARTING;
walrcv->startTime = now;
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 65,73 **** bool am_walsender = false; /* Am I a walsender process ? */
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int WalSndDelay = 200; /* max sleep time between some actions */
! #define NAPTIME_PER_CYCLE 100000L /* max sleep time between cycles
! * (100ms) */
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
--- 65,73 ----
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int WalSndDelay = 200; /* max sleep time between some actions */
+ int quorum = 0; /* the maximum number of synchronous walsenders */
! #define NAPTIME_PER_CYCLE 100L /* max sleep time between cycles (100ms) */
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
***************
*** 84,89 **** static uint32 sendOff = 0;
--- 84,96 ----
*/
static XLogRecPtr sentPtr = {0, 0};
+ /*
+ * How far have we completed replication already? This is also
+ * advertised in MyWalSnd->ackdPtr. This is not used in asynchronous
+ * replication case.
+ */
+ static XLogRecPtr ackdPtr = {0, 0};
+
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t shutdown_requested = false;
***************
*** 101,107 **** static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
static bool XLogSend(char *msgbuf, bool *caughtup);
! static void CheckClosedConnection(void);
/* Main entry point for walsender process */
--- 108,114 ----
static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
static bool XLogSend(char *msgbuf, bool *caughtup);
! static void ProcessStreamMsgs(StringInfo inMsg);
/* Main entry point for walsender process */
***************
*** 255,262 **** WalSndHandshake(void)
ReadyForQuery(DestRemote);
/* ReadyForQuery did pq_flush for us */
}
! else if (sscanf(query_string, "START_REPLICATION %X/%X",
! &recptr.xlogid, &recptr.xrecoff) == 2)
{
StringInfoData buf;
--- 262,269 ----
ReadyForQuery(DestRemote);
/* ReadyForQuery did pq_flush for us */
}
! else if (sscanf(query_string, "START_REPLICATION %X/%X %d",
! &recptr.xlogid, &recptr.xrecoff, &rplMode) == 3)
{
StringInfoData buf;
***************
*** 277,282 **** WalSndHandshake(void)
--- 284,318 ----
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("standby connections not allowed because wal_level=minimal")));
+ /* Verify that the specified replication mode is valid */
+ switch (rplMode)
+ {
+ case REPLICATION_MODE_ASYNC:
+ break;
+ case REPLICATION_MODE_RECV:
+ case REPLICATION_MODE_FSYNC:
+ case REPLICATION_MODE_REPLAY:
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+
+ /*
+ * Update the current number of synchronous standbys
+ * if replication mode is "synchronous"
+ */
+ SpinLockAcquire(&walsndctl->info_lck);
+ walsndctl->num_sync_sbys++;
+ Assert(walsndctl->num_sync_sbys > 0);
+ SpinLockRelease(&walsndctl->info_lck);
+ break;
+ }
+ default:
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid replication mode: %d", rplMode)));
+ }
+ MyWalSnd->rplMode = rplMode;
+
/* Send a CopyOutResponse message, and start streaming */
pq_beginmessage(&buf, 'H');
pq_sendbyte(&buf, 0);
***************
*** 285,294 **** WalSndHandshake(void)
pq_flush();
/*
! * Initialize position to the received one, then the
* xlog records begin to be shipped from that position
*/
! sentPtr = recptr;
/* break out of the loop */
replication_started = true;
--- 321,330 ----
pq_flush();
/*
! * Initialize positions to the received one, then the
* xlog records begin to be shipped from that position
*/
! sentPtr = ackdPtr = recptr;
/* break out of the loop */
replication_started = true;
***************
*** 322,364 **** WalSndHandshake(void)
}
/*
! * Check if the remote end has closed the connection.
*/
static void
! CheckClosedConnection(void)
{
! unsigned char firstchar;
! int r;
! r = pq_getbyte_if_available(&firstchar);
! if (r < 0)
! {
! /* unexpected error or EOF */
! ereport(COMMERROR,
! (errcode(ERRCODE_PROTOCOL_VIOLATION),
! errmsg("unexpected EOF on standby connection")));
! proc_exit(0);
! }
! if (r == 0)
{
! /* no data available without blocking */
! return;
! }
- /* Handle the very limited subset of commands expected in this phase */
- switch (firstchar)
- {
/*
* 'X' means that the standby is closing down the socket.
*/
! case 'X':
! proc_exit(0);
! default:
! ereport(FATAL,
! (errcode(ERRCODE_PROTOCOL_VIOLATION),
! errmsg("invalid standby closing message type %d",
! firstchar)));
}
}
--- 358,466 ----
}
/*
! * Process messages received from the standby.
! *
! * ereports on error.
*/
static void
! ProcessStreamMsgs(StringInfo inMsg)
{
! bool acked = false;
! /* Loop to process successive complete messages available */
! for (;;)
{
! unsigned char firstchar;
! int r;
!
! r = pq_getbyte_if_available(&firstchar);
! if (r < 0)
! {
! /* unexpected error or EOF */
! ereport(COMMERROR,
! (errcode(ERRCODE_PROTOCOL_VIOLATION),
! errmsg("unexpected EOF on standby connection")));
! proc_exit(0);
! }
! if (r == 0)
! {
! /* no data available without blocking */
! break;
! }
!
! /* Handle the very limited subset of commands expected in this phase */
! switch (firstchar)
! {
! case 'd': /* CopyData message */
! {
! unsigned char rpltype;
!
! /*
! * Read the message contents. This is expected to be done without
! * blocking because we've been able to get message type code.
! */
! if (pq_getmessage(inMsg, 0))
! proc_exit(0); /* suitable message already logged */
!
! /* Read the replication message type from CopyData message */
! rpltype = pq_getmsgbyte(inMsg);
! switch (rpltype)
! {
! case 'l':
! {
! WalAckMessageData *msgdata;
!
! msgdata = (WalAckMessageData *) pq_getmsgbytes(inMsg, sizeof(WalAckMessageData));
!
! /*
! * Update local status.
! *
! * The ackd ptr received from standby should not
! * go backwards.
! */
! if (XLByteLE(ackdPtr, msgdata->ackEnd))
! ackdPtr = msgdata->ackEnd;
! else
! ereport(FATAL,
! (errmsg("replication completion location went back from "
! "%X/%X to %X/%X",
! ackdPtr.xlogid, ackdPtr.xrecoff,
! msgdata->ackEnd.xlogid, msgdata->ackEnd.xrecoff)));
!
! acked = true; /* also need to update shared position */
! break;
! }
! default:
! ereport(FATAL,
! (errcode(ERRCODE_PROTOCOL_VIOLATION),
! errmsg("invalid replication message type %d",
! rpltype)));
! }
! break;
! }
/*
* 'X' means that the standby is closing down the socket.
*/
! case 'X':
! proc_exit(0);
! default:
! ereport(FATAL,
! (errcode(ERRCODE_PROTOCOL_VIOLATION),
! errmsg("invalid standby closing message type %d",
! firstchar)));
! }
! }
!
! if (acked)
! {
! /* use volatile pointer to prevent code rearrangement */
! volatile WalSnd *walsnd = MyWalSnd;
!
! SpinLockAcquire(&walsnd->mutex);
! walsnd->ackdPtr = ackdPtr;
! SpinLockRelease(&walsnd->mutex);
}
}
***************
*** 366,374 **** CheckClosedConnection(void)
--- 468,479 ----
static int
WalSndLoop(void)
{
+ StringInfoData input_message;
char *output_message;
bool caughtup = false;
+ initStringInfo(&input_message);
+
/*
* Allocate buffer that will be used for each output message. We do this
* just once to reduce palloc overhead. The buffer must be made large
***************
*** 428,443 **** WalSndLoop(void)
*/
if (caughtup)
{
! remain = WalSndDelay * 1000L;
while (remain > 0)
{
/* Check for interrupts */
if (got_SIGHUP || shutdown_requested || ready_to_stop)
break;
! /* Sleep and check that the connection is still alive */
! pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
! CheckClosedConnection();
remain -= NAPTIME_PER_CYCLE;
}
--- 533,562 ----
*/
if (caughtup)
{
! remain = WalSndDelay;
while (remain > 0)
{
+ int res;
+
/* Check for interrupts */
if (got_SIGHUP || shutdown_requested || ready_to_stop)
break;
! /*
! * Check to see whether a message from the standby or an interrupt
! * from other processes has arrived.
! */
! res = pq_wait(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
! if (res < 0)
! {
! /* unexpected error or EOF */
! ereport(COMMERROR,
! (errcode(ERRCODE_PROTOCOL_VIOLATION),
! errmsg("unexpected EOF on standby connection")));
! proc_exit(0);
! }
! if (res > 0)
! ProcessStreamMsgs(&input_message);
remain -= NAPTIME_PER_CYCLE;
}
***************
*** 496,501 **** InitWalSnd(void)
--- 615,621 ----
MyWalSnd = (WalSnd *) walsnd;
walsnd->pid = MyProcPid;
MemSet(&MyWalSnd->sentPtr, 0, sizeof(XLogRecPtr));
+ MemSet(&MyWalSnd->ackdPtr, 0, sizeof(XLogRecPtr));
SpinLockRelease(&walsnd->mutex);
break;
}
***************
*** 523,528 **** WalSndKill(int code, Datum arg)
--- 643,663 ----
*/
MyWalSnd->pid = 0;
+ /*
+ * Update the current number of synchronous standbys if replication
+ * mode is "synchronous"
+ */
+ if (rplMode >= REPLICATION_MODE_RECV)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+
+ SpinLockAcquire(&walsndctl->info_lck);
+ Assert(walsndctl->num_sync_sbys > 0);
+ walsndctl->num_sync_sbys--;
+ SpinLockRelease(&walsndctl->info_lck);
+ }
+
/* WalSnd struct isn't mine anymore */
MyWalSnd = NULL;
}
***************
*** 884,889 **** WalSndShmemInit(void)
--- 1019,1025 ----
{
/* First time through, so initialize */
MemSet(WalSndCtl, 0, WalSndShmemSize());
+ SpinLockInit(&WalSndCtl->info_lck);
for (i = 0; i < max_wal_senders; i++)
{
***************
*** 895,937 **** WalSndShmemInit(void)
}
/*
! * This isn't currently used for anything. Monitoring tools might be
! * interested in the future, and we'll need something like this in the
! * future for synchronous replication.
*/
! #ifdef NOT_USED
! /*
! * Returns the oldest Send position among walsenders. Or InvalidXLogRecPtr
! * if none.
! */
! XLogRecPtr
! GetOldestWALSendPointer(void)
{
! XLogRecPtr oldest = {0, 0};
! int i;
! bool found = false;
! for (i = 0; i < max_wal_senders; i++)
{
/* use volatile pointer to prevent code rearrangement */
! volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
! XLogRecPtr recptr;
! if (walsnd->pid == 0)
! continue;
! SpinLockAcquire(&walsnd->mutex);
! recptr = walsnd->sentPtr;
! SpinLockRelease(&walsnd->mutex);
! if (recptr.xlogid == 0 && recptr.xrecoff == 0)
! continue;
! if (!found || XLByteLT(recptr, oldest))
! oldest = recptr;
! found = true;
}
- return oldest;
}
-
- #endif
--- 1031,1102 ----
}
/*
! * Ensure that all xlog records through the given position is
! * replicated to the standby
*/
! void
! WaitXLogSend(XLogRecPtr record)
{
! Assert(max_wal_senders > 0);
! Assert(quorum > 0);
! for (;;)
{
/* use volatile pointer to prevent code rearrangement */
! volatile WalSndCtlData *walsndctl = WalSndCtl;
! int i;
! int already_acked = 0;
! bool unacked = false;
! /* Don't need to wait if there is no synchronous standbys */
! if (walsndctl->num_sync_sbys == 0)
! return;
! /*
! * Count walsenders which have already received the ACK meaning
! * completion of replication up to the given position. If the
! * sum is more than or equal to the "quorum", the backend breaks
! * out of this loop and returns a "success" of the transaction
! * to a client.
! */
! for (i = 0; i < max_wal_senders; i++)
! {
! /* use volatile pointer to prevent code rearrangement */
! volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
! XLogRecPtr recptr;
! /* Don't count inactive or asynchronous walsenders */
! if (walsnd->pid == 0 ||
! walsnd->rplMode == REPLICATION_MODE_ASYNC)
! continue;
! SpinLockAcquire(&walsnd->mutex);
! recptr = walsnd->ackdPtr;
! SpinLockRelease(&walsnd->mutex);
!
! if ((recptr.xlogid == 0 && recptr.xrecoff == 0) ||
! XLByteLT(recptr, record))
! {
! unacked = true;
! continue;
! }
!
! if (++already_acked >= quorum)
! return;
! }
!
! /*
! * If synchronous walsender was not found in the WalSnd array,
! * we no longer need to wait. This can happen if all synchronous
! * walsenders are terminated while searching the array.
! *
! * If all synchronous walsenders have already received the ACK,
! * we no longer need to wait, too. This can happen when the
! * "quorum" is more than max_wal_senders.
! */
! if (!unacked)
! return;
!
! pg_usleep(100000L); /* 100ms */
}
}
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 1756,1761 **** static struct config_int ConfigureNamesInt[] =
--- 1756,1770 ----
},
{
+ {"quorum", PGC_USERSET, WAL_REPLICATION,
+ gettext_noop("Sets the maximum number of synchronous standby servers."),
+ NULL
+ },
+ &quorum,
+ 0, 0, INT_MAX / 4, NULL, NULL
+ },
+
+ {
{"commit_delay", PGC_USERSET, WAL_SETTINGS,
gettext_noop("Sets the delay in microseconds between transaction commit and "
"flushing WAL to disk."),
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 188,193 ****
--- 188,194 ----
#max_wal_senders = 0 # max number of walsender processes
#wal_sender_delay = 200ms # walsender cycle time, 1-10000 milliseconds
#wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables
+ #quorum = 0 # max number of synchronous standbys
#vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed
# - Standby Servers -
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 261,266 **** typedef struct CheckpointStatsData
--- 261,299 ----
extern CheckpointStatsData CheckpointStats;
+ /*
+ * Synchronization mode of replication. These modes identify how far
+ * we should wait for replication.
+ */
+ typedef enum
+ {
+ /*
+ * doesn't make transaction commit wait for replication, i.e.,
+ * asynchronous replication.
+ */
+ REPLICATION_MODE_ASYNC,
+
+ /*
+ * makes transaction commit wait for XLOG records to be received
+ * on the standby server.
+ */
+ REPLICATION_MODE_RECV,
+
+ /*
+ * makes transaction commit wait for XLOG records to be received
+ * and fsync'd on the standby server.
+ */
+ REPLICATION_MODE_FSYNC,
+
+ /*
+ * makes transaction commit wait for XLOG records to be received,
+ * fsync'd and replayed on the standby server.
+ */
+ REPLICATION_MODE_REPLAY
+ } ReplicationMode;
+ extern int rplMode;
+
+
extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
extern void XLogFlush(XLogRecPtr RecPtr);
extern void XLogBackgroundFlush(void);
***************
*** 298,303 **** extern void XLogPutNextOid(Oid nextOid);
--- 331,337 ----
extern XLogRecPtr GetRedoRecPtr(void);
extern XLogRecPtr GetInsertRecPtr(void);
extern XLogRecPtr GetFlushRecPtr(void);
+ extern XLogRecPtr GetReplayRecPtr(void);
extern void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch);
extern TimeLineID GetRecoveryTargetTLI(void);
*** a/src/include/libpq/libpq.h
--- b/src/include/libpq/libpq.h
***************
*** 58,63 **** extern int pq_getmessage(StringInfo s, int maxlen);
--- 58,64 ----
extern int pq_getbyte(void);
extern int pq_peekbyte(void);
extern int pq_getbyte_if_available(unsigned char *c);
+ extern int pq_wait(int timeout_ms);
extern int pq_putbytes(const char *s, size_t len);
extern int pq_flush(void);
extern int pq_putmessage(char msgtype, const char *s, size_t len);
***************
*** 74,78 **** extern int secure_open_server(Port *port);
--- 75,80 ----
extern void secure_close(Port *port);
extern ssize_t secure_read(Port *port, void *ptr, size_t len);
extern ssize_t secure_write(Port *port, void *ptr, size_t len);
+ extern int secure_poll(Port *port, int timeout_ms);
#endif /* LIBPQ_H */
*** a/src/include/replication/walprotocol.h
--- b/src/include/replication/walprotocol.h
***************
*** 50,53 **** typedef struct
--- 50,63 ----
*/
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
+ /*
+ * Body for a WAL acknowledgment message (message type 'l'). This is wrapped
+ * within a CopyData message at the FE/BE protocol level.
+ */
+ typedef struct
+ {
+ /* End of WAL replicated to the standby */
+ XLogRecPtr ackEnd;
+ } WalAckMessageData;
+
#endif /* _WALPROTOCOL_H */
*** a/src/include/replication/walreceiver.h
--- b/src/include/replication/walreceiver.h
***************
*** 71,89 **** typedef struct
*/
char conninfo[MAXCONNINFO];
slock_t mutex; /* locks shared variables shown above */
} WalRcvData;
extern WalRcvData *WalRcv;
/* libpqwalreceiver hooks */
! typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
char **buffer, int *len);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
typedef void (*walrcv_disconnect_type) (void);
extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
--- 71,99 ----
*/
char conninfo[MAXCONNINFO];
+ /*
+ * replication mode; controls how long transaction commit on the primary
+ * server waits for replication.
+ */
+ int rplMode;
+
slock_t mutex; /* locks shared variables shown above */
} WalRcvData;
extern WalRcvData *WalRcv;
/* libpqwalreceiver hooks */
! typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint,
! int mode);
extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
char **buffer, int *len);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
+ typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
+ extern PGDLLIMPORT walrcv_send_type walrcv_send;
+
typedef void (*walrcv_disconnect_type) (void);
extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
***************
*** 93,99 **** extern void WalRcvShmemInit(void);
extern void ShutdownWalRcv(void);
extern bool WalRcvInProgress(void);
extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
! extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
#endif /* _WALRECEIVER_H */
--- 103,110 ----
extern void ShutdownWalRcv(void);
extern bool WalRcvInProgress(void);
extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
! extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo,
! int mode);
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
#endif /* _WALRECEIVER_H */
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 22,27 **** typedef struct WalSnd
--- 22,30 ----
{
pid_t pid; /* this walsender's process id, or 0 */
XLogRecPtr sentPtr; /* WAL has been sent up to this point */
+ XLogRecPtr ackdPtr; /* WAL has been replicated up to this point */
+
+ int rplMode; /* replication mode */
slock_t mutex; /* locks shared variables shown above */
} WalSnd;
***************
*** 29,34 **** typedef struct WalSnd
--- 32,41 ----
/* There is one WalSndCtl struct for the whole database cluster */
typedef struct
{
+ int num_sync_sbys; /* current # of synchronous standbys */
+
+ slock_t info_lck; /* protects the variable shown above */
+
WalSnd walsnds[1]; /* VARIABLE LENGTH ARRAY */
} WalSndCtlData;
***************
*** 40,49 **** extern bool am_walsender;
--- 47,58 ----
/* user-settable parameters */
extern int WalSndDelay;
extern int max_wal_senders;
+ extern int quorum;
extern int WalSenderMain(void);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
+ extern void WaitXLogSend(XLogRecPtr recptr);
#endif /* _WALSENDER_H */
*** a/src/interfaces/libpq/fe-exec.c
--- b/src/interfaces/libpq/fe-exec.c
***************
*** 2002,2007 **** PQnotifies(PGconn *conn)
--- 2002,2010 ----
/*
* PQputCopyData - send some data to the backend during COPY IN
*
+ * Note that this function might be called by walreceiver even during
+ * COPY OUT to send a message to XLOG stream.
+ *
* Returns 1 if successful, 0 if data could not be sent (only possible
* in nonblock mode), or -1 if an error occurs.
*/
***************
*** 2010,2016 **** PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
{
if (!conn)
return -1;
! if (conn->asyncStatus != PGASYNC_COPY_IN)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("no COPY in progress\n"));
--- 2013,2020 ----
{
if (!conn)
return -1;
! if (conn->asyncStatus != PGASYNC_COPY_IN &&
! conn->asyncStatus != PGASYNC_COPY_OUT)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("no COPY in progress\n"));
On Wed, Jul 14, 2010 at 2:50 AM, Fujii Masao <masao.fujii@gmail.com> wrote:
The patch have no features for performance improvement of synchronous
replication. I admit that currently the performance overhead in the
master is terrible. We need to address the following TODO items in the
subsequent CF.
* Change the poll loop in the walsender
* Change the poll loop in the backend
* Change the poll loop in the startup process
* Change the poll loop in the walreceiver
* Perform the WAL write and replication concurrently
* Send WAL from not only disk but also WAL buffers
I have a feeling that if we don't have a design for these last two
before we start committing things, we're possibly going to regret it
later.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise Postgres Company
On Thu, Jul 15, 2010 at 12:16 AM, Robert Haas <robertmhaas@gmail.com> wrote:
On Wed, Jul 14, 2010 at 2:50 AM, Fujii Masao <masao.fujii@gmail.com> wrote:
The patch have no features for performance improvement of synchronous
replication. I admit that currently the performance overhead in the
master is terrible. We need to address the following TODO items in the
subsequent CF.
* Change the poll loop in the walsender
* Change the poll loop in the backend
* Change the poll loop in the startup process
* Change the poll loop in the walreceiver
* Perform the WAL write and replication concurrently
* Send WAL from not only disk but also WAL buffers
I have a feeling that if we don't have a design for these last two
before we start committing things, we're possibly going to regret it
later.
Yeah, I'll give it a try.
The problem is that the standby could apply WAL that has not yet been
fsync'd on the master. So if we allow walsender to send non-fsync'd WAL,
walsender should also send the current fsync location, and the standby
must not apply WAL newer than that location.
A new message type for sending the fsync location would be required in
the Streaming Replication Protocol. But sometimes the location might be
sent along with the XLogData message.
After the master crashes and walreceiver is terminated, the standby
currently attempts to replay the WAL in pg_xlog and the archive.
Since WAL in the archive is guaranteed to have already been fsync'd by
the master, it's not a problem for the standby to apply that WAL. OTOH,
WAL records in the standby's pg_xlog directory might not exist on the
crashed master. So we should always prevent the standby from applying
any WAL in pg_xlog unless walreceiver is in progress. That is, if there
is no WAL available in the archive, the standby ignores pg_xlog and
starts the walreceiver process to request WAL streaming.
This idea is a little inefficient because already-sent WAL might be
sent again when the master is restarted. But since it ensures that the
standby will not apply WAL that was never fsync'd on the master, it's
quite safe.
What about this idea?
This idea doesn't conflict with the patch I submitted for CF 2010-07.
So please feel free to review the patch :) But if you think that the
patch is not reviewable until that idea has been implemented, I'll
try to implement that ASAP.
PS. I probably won't be able to reply to mail until July 21. Sorry.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On 16/07/10 10:40, Fujii Masao wrote:
So we should always prevent the standby from applying any WAL in pg_xlog
unless walreceiver is in progress. That is, if there is no WAL available
in the archive, the standby ignores pg_xlog and starts walreceiver
process to request for WAL streaming.
That completely defeats the purpose of storing streamed WAL in pg_xlog
in the first place. The reason it's written and fsync'd to pg_xlog is
that if the standby subsequently crashes, you can use the WAL from
pg_xlog to reapply the WAL up to minRecoveryPoint. Otherwise you can't
start up the standby anymore.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
On 16 Jul 2010, at 12:43, Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> wrote:
On 16/07/10 10:40, Fujii Masao wrote:
So we should always prevent the standby from applying any WAL in pg_xlog
unless walreceiver is in progress. That is, if there is no WAL available
in the archive, the standby ignores pg_xlog and starts walreceiver
process to request for WAL streaming.
That completely defeats the purpose of storing streamed WAL in pg_xlog in the first place. The reason it's written and fsync'd to pg_xlog is that if the standby subsequently crashes, you can use the WAL from pg_xlog to reapply the WAL up to minRecoveryPoint. Otherwise you can't start up the standby anymore.
I guess we know for sure that this point has been fsync()ed on the Master, or that we could arrange it so that we know that?
On 16/07/10 20:26, Dimitri Fontaine wrote:
On 16 Jul 2010, at 12:43, Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> wrote:
On 16/07/10 10:40, Fujii Masao wrote:
So we should always prevent the standby from applying any WAL in pg_xlog
unless walreceiver is in progress. That is, if there is no WAL available
in the archive, the standby ignores pg_xlog and starts walreceiver
process to request for WAL streaming.
That completely defeats the purpose of storing streamed WAL in pg_xlog in the first place. The reason it's written and fsync'd to pg_xlog is that if the standby subsequently crashes, you can use the WAL from pg_xlog to reapply the WAL up to minRecoveryPoint. Otherwise you can't start up the standby anymore.
I guess we know for sure that this point has been fsync()ed on the Master, or that we could arrange it so that we know that?
At the moment we only stream WAL that's already been fsync()ed on the
master, so we don't have this problem, but Fujii is proposing to change
that.
I think that's a premature optimization, and we should not try to change
that. There is no evidence from the field (granted, streaming replication is
a new feature) or from performance tests that it is a problem in
practice, or that sending WAL earlier would help. Let's concentrate on
the bare minimum required to make synchronous replication work.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
On 14/07/10 09:50, Fujii Masao wrote:
TODO
----
The patch have no features for performance improvement of synchronous
replication. I admit that currently the performance overhead in the
master is terrible. We need to address the following TODO items in the
subsequent CF.
* Change the poll loop in the walsender
* Change the poll loop in the backend
* Change the poll loop in the startup process
* Change the poll loop in the walreceiver
I was actually hoping to see a patch for these things first, before any
of the synchronous replication stuff. Eliminating the polling loops is
important; latency will be laughable otherwise, and it will help the
synchronous case too.
* Perform the WAL write and replication concurrently
* Send WAL from not only disk but also WAL buffers
IMHO these are premature optimizations that we should not spend any
effort on now. Maybe later, if ever.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
On 14/07/10 09:50, Fujii Masao wrote:
Quorum commit
-------------
In previous discussion about synchronous replication, some people
wanted the quorum commit feature. This feature is included in also
Zontan's synchronous replication patch, so I decided to create it.
The patch provides quorum parameter in postgresql.conf, which
specifies how many standby servers transaction commit will wait for
WAL records to be replicated to, before the command returns a
"success" indication to the client. The default value is zero, which
always doesn't make transaction commit wait for replication without
regard to replication_mode. Also transaction commit always doesn't
wait for replication to asynchronous standby (i.e., replication_mode
is set to async) without regard to this parameter. If quorum is more
than the number of synchronous standbys, transaction commit returns
a "success" when the ACK has arrived from all of synchronous standbys.
There should be a way to specify "wait for *all* connected standby
servers to acknowledge".
Protocol
--------
I extended the handshake message "START_REPLICATION" so that it
includes replication_mode read from recovery.conf. If 'async' is
passed, the master thinks that it doesn't need to wait for the ACK
from the standby.
Please use self-explanatory names for the modes in START_REPLICATION
command, instead of just an integer.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
On Fri, Jul 16, 2010 at 7:43 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
On 16/07/10 10:40, Fujii Masao wrote:
So we should always prevent the standby from applying any WAL in pg_xlog
unless walreceiver is in progress. That is, if there is no WAL available
in the archive, the standby ignores pg_xlog and starts walreceiver
process to request for WAL streaming.
That completely defeats the purpose of storing streamed WAL in pg_xlog in
the first place. The reason it's written and fsync'd to pg_xlog is that if
the standby subsequently crashes, you can use the WAL from pg_xlog to
reapply the WAL up to minRecoveryPoint. Otherwise you can't start up the
standby anymore.
But the standby can start up by reading the missing WAL files from the
master, no?
On second thought, minRecoveryPoint can be guaranteed not to be ahead
of the fsync location on the master if we prevent the standby from
applying WAL beyond the fsync location. So we can safely apply the WAL
files in pg_xlog up to minRecoveryPoint.
Consequently, we should always prevent the standby from applying any
WAL in pg_xlog newer than minRecoveryPoint unless walreceiver is in
progress. Thoughts?
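
To make the proposed rule concrete, here is a minimal sketch of how the
standby could cap replay. It is only an illustration under stated
assumptions: Lsn is a simplified 64-bit stand-in for XLogRecPtr, and
replay_limit() and its parameters are hypothetical names that do not
exist in the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-in for XLogRecPtr (assumption for this sketch). */
typedef uint64_t Lsn;

/*
 * Return the position up to which the standby may replay WAL.
 *
 * local_end          : end of the WAL the standby has in pg_xlog
 * master_fsync       : last fsync location reported by the master
 * min_recovery_point : the standby's minRecoveryPoint
 * walreceiver_active : true while streaming is in progress
 *
 * While walreceiver is running, replay is capped at the master's fsync
 * location. Otherwise, WAL in pg_xlog is applied only up to
 * minRecoveryPoint.
 */
Lsn
replay_limit(Lsn local_end, Lsn master_fsync, Lsn min_recovery_point,
             bool walreceiver_active)
{
    Lsn cap = walreceiver_active ? master_fsync : min_recovery_point;

    /* The standby must hold the WAL needed to reach minRecoveryPoint. */
    assert(min_recovery_point <= local_end);

    return local_end < cap ? local_end : cap;
}
```

For example, with 500 bytes of WAL in pg_xlog, a reported fsync location
of 400 and minRecoveryPoint at 300, the limit is 400 while streaming and
300 once walreceiver has stopped.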
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Sat, Jul 17, 2010 at 3:25 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
On 14/07/10 09:50, Fujii Masao wrote:
TODO
----
The patch have no features for performance improvement of synchronous
replication. I admit that currently the performance overhead in the
master is terrible. We need to address the following TODO items in the
subsequent CF.
* Change the poll loop in the walsender
* Change the poll loop in the backend
* Change the poll loop in the startup process
* Change the poll loop in the walreceiver
I was actually hoping to see a patch for these things first, before any of
the synchronous replication stuff. Eliminating the polling loops is
important, latency will be laughable otherwise, and it will help the
synchronous case too.
First, note that the poll loops in the backend and walreceiver don't
exist without the synchronous replication patch.
Yeah, I'll start with the change of the poll loop in the walsender. I'm
thinking that we should make the backend signal the walsender to send the
outstanding WAL immediately, as the synchronous replication patch I
submitted last year did. I use a signal here because, in synchronous
replication, walsender needs to wait for the request from the backend and
the ack message from the standby *concurrently*. If we used a semaphore
instead of a signal, the walsender would not be able to respond to the
ack immediately, which would also degrade performance.
The problem with this idea is that a signal can be sent for every
transaction commit. I'm not sure whether such frequent signaling really
harms replication performance. BTW, when I benchmarked the previous
synchronous replication patch based on this idea, AFAIR the results
showed no impact from the signaling. But... Thoughts? Do you have a
better idea?
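
One way to wait for both event sources at once is the classic self-pipe
trick: the signal handler writes a byte to a pipe, and the walsender
polls the standby socket and the pipe together. The sketch below only
illustrates that general technique and is not code from the patch. All
names (wait_for_event, wake_handler, install_wake_handler, EV_*) are
hypothetical, and the choice of SIGUSR1 is an assumption.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>

#define EV_TIMEOUT 0    /* nothing happened within the timeout */
#define EV_STANDBY 1    /* standby socket is readable (e.g. an ack) */
#define EV_BACKEND 2    /* a backend asked us to send WAL */

static int wake_write_fd = -1;  /* write end of the self-pipe */

/* Signal handler: only async-signal-safe calls are used here. */
void
wake_handler(int signo)
{
    int saved_errno = errno;
    ssize_t n;

    (void) signo;
    if (wake_write_fd >= 0)
    {
        n = write(wake_write_fd, "x", 1);
        (void) n;               /* a full pipe already means "wake up" */
    }
    errno = saved_errno;
}

/* Install wake_handler for SIGUSR1 (signal choice is an assumption). */
int
install_wake_handler(int write_fd)
{
    struct sigaction sa;

    wake_write_fd = write_fd;
    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = wake_handler;
    sigemptyset(&sa.sa_mask);
    return sigaction(SIGUSR1, &sa, NULL);
}

/*
 * Block until the standby socket is readable, a wakeup byte arrives on
 * wake_fd, or timeout_ms elapses. Returns a bitmask of EV_* values, or
 * -1 on error. Note that a retry after EINTR restarts the full timeout.
 */
int
wait_for_event(int standby_fd, int wake_fd, int timeout_ms)
{
    struct pollfd fds[2];
    int events = EV_TIMEOUT;
    int rc;

    fds[0].fd = standby_fd;
    fds[0].events = POLLIN;
    fds[0].revents = 0;
    fds[1].fd = wake_fd;
    fds[1].events = POLLIN;
    fds[1].revents = 0;

    do
    {
        rc = poll(fds, 2, timeout_ms);
    } while (rc < 0 && errno == EINTR);
    if (rc < 0)
        return -1;

    if (fds[0].revents & (POLLIN | POLLHUP))
        events |= EV_STANDBY;
    if (fds[1].revents & POLLIN)
    {
        char buf[64];
        ssize_t n = read(wake_fd, buf, sizeof(buf));   /* drain wakeups */

        if (n > 0)
            events |= EV_BACKEND;
    }
    return events;
}
```

Because several wakeups collapse into one readable pipe, frequent
per-commit signals cost the walsender at most one extra read per loop
iteration in this scheme.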
* Perform the WAL write and replication concurrently
* Send WAL from not only disk but also WAL buffers
IMHO these are premature optimizations that we should not spend any effort
on now. Maybe later, if ever.
Yep!
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Sun, Jul 18, 2010 at 3:14 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
On 14/07/10 09:50, Fujii Masao wrote:
Quorum commit
-------------
In previous discussion about synchronous replication, some people
wanted the quorum commit feature. This feature is included in also
Zontan's synchronous replication patch, so I decided to create it.
The patch provides quorum parameter in postgresql.conf, which
specifies how many standby servers transaction commit will wait for
WAL records to be replicated to, before the command returns a
"success" indication to the client. The default value is zero, which
always doesn't make transaction commit wait for replication without
regard to replication_mode. Also transaction commit always doesn't
wait for replication to asynchronous standby (i.e., replication_mode
is set to async) without regard to this parameter. If quorum is more
than the number of synchronous standbys, transaction commit returns
a "success" when the ACK has arrived from all of synchronous standbys.
There should be a way to specify "wait for *all* connected standby servers
to acknowledge"
Agreed. I'll allow -1 as a valid value of the quorum parameter, meaning
that transaction commit waits for all connected standbys.
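
As a rough sketch of the resulting semantics (0 = never wait, -1 = wait
for all connected synchronous standbys, N = wait for N of them, capped
at the number connected), the decision could look like the following.
QUORUM_ALL and quorum_satisfied() are hypothetical names used only for
illustration. The patch itself makes this decision inside WaitXLogSend().

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical constant for the proposed "wait for all" setting. */
#define QUORUM_ALL (-1)

/*
 * Decide whether a committing backend may stop waiting.
 *
 * quorum    : configured value (0, QUORUM_ALL, or a positive count)
 * num_sync  : synchronous standbys currently connected
 * num_acked : how many of those have acknowledged the commit record
 */
bool
quorum_satisfied(int quorum, int num_sync, int num_acked)
{
    int required;

    assert(quorum >= QUORUM_ALL);
    assert(num_sync >= 0);
    assert(num_acked >= 0 && num_acked <= num_sync);

    /* quorum = 0 or no synchronous standby: nothing to wait for */
    if (quorum == 0 || num_sync == 0)
        return true;

    /* -1, or a quorum larger than the standby count, means "all" */
    if (quorum == QUORUM_ALL || quorum > num_sync)
        required = num_sync;
    else
        required = quorum;

    return num_acked >= required;
}
```

With quorum = -1 and three synchronous standbys, two acks are not
enough, while quorum = 1 with two standbys is satisfied by a single ack.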
Protocol
--------
I extended the handshake message "START_REPLICATION" so that it
includes replication_mode read from recovery.conf. If 'async' is
passed, the master thinks that it doesn't need to wait for the ACK
from the standby.
Please use self-explanatory names for the modes in START_REPLICATION
command, instead of just an integer.
Agreed. What about changing the START_REPLICATION message to the following?
START_REPLICATION XXX/XXX SYNC_LEVEL { async | recv | fsync | replay }
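To make the proposed syntax concrete, here is a minimal sketch of how the mode names could be mapped and the command string built. This is only an illustration under assumptions: SyncLevel, parse_sync_level and build_start_replication are hypothetical names and are not taken from the patch; only the command layout and the four mode strings come from the proposal above.

```c
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical enum for illustration; the patch uses its own constants. */
typedef enum
{
    SYNC_ASYNC,
    SYNC_RECV,
    SYNC_FSYNC,
    SYNC_REPLAY,
    SYNC_INVALID
} SyncLevel;

/* Mode strings as proposed for the SYNC_LEVEL clause. */
static const char *const sync_level_names[] = {
    "async", "recv", "fsync", "replay"
};

/* Map a mode string to its level; unknown strings give SYNC_INVALID. */
SyncLevel
parse_sync_level(const char *name)
{
    int i;

    for (i = 0; i < (int) SYNC_INVALID; i++)
    {
        if (strcmp(name, sync_level_names[i]) == 0)
            return (SyncLevel) i;
    }
    return SYNC_INVALID;
}

/*
 * Build "START_REPLICATION XXX/XXX SYNC_LEVEL mode" into buf.
 * Returns 0 on success, -1 on an invalid level or a too-small buffer.
 */
int
build_start_replication(char *buf, size_t buflen,
                        uint32_t xlogid, uint32_t xrecoff, SyncLevel level)
{
    int n;

    if ((unsigned) level >= (unsigned) SYNC_INVALID)
        return -1;

    n = snprintf(buf, buflen, "START_REPLICATION %X/%X SYNC_LEVEL %s",
                 (unsigned) xlogid, (unsigned) xrecoff,
                 sync_level_names[level]);
    return (n < 0 || (size_t) n >= buflen) ? -1 : 0;
}
```

With names instead of integers, a typo in the mode is rejected up front rather than silently selecting another level.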
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
* Fujii Masao <masao.fujii@gmail.com> [100721 03:49]:
The patch provides quorum parameter in postgresql.conf, which
specifies how many standby servers transaction commit will wait for
WAL records to be replicated to, before the command returns a
"success" indication to the client. The default value is zero, which
always doesn't make transaction commit wait for replication without
regard to replication_mode. Also transaction commit always doesn't
wait for replication to asynchronous standby (i.e., replication_mode
is set to async) without regard to this parameter. If quorum is more
than the number of synchronous standbys, transaction commit returns
a "success" when the ACK has arrived from all of synchronous standbys.
There should be a way to specify "wait for *all* connected standby servers
to acknowledge"
Agreed. I'll allow -1 as the valid value of the quorum parameter, which
means that transaction commit waits for all connected standbys.
Hm... so if my 1 synchronous standby is operating normally, and quorum
is set to 1, I'll get what I want (commit waits until it's safely on both
servers). But what happens if my standby goes bad? Suddenly the quorum
setting is ignored (because it's > number of connected standby
servers?). Is there a way for me to not allow any commits if the quorum
number of standbys is *not* available? Yes, I want my db to
"halt" in that situation, and yes, alarm bells will be ringing...
In reality, I'm likely to run 2 synchronous slaves, with quorum of 1.
So 1 slave can fail and I can still have 2 going. But if that 2nd slave
ever failed while the other was down, I definitely don't want the master
to forge on ahead!
Of course, this won't be for everyone, just as the current "just
connected standbys" isn't for everything either...
a.
--
Aidan Van Dyk Create like a god,
aidan@highrise.ca command like a king,
http://www.highrise.ca/ work like a slave.
On Wed, Jul 21, 2010 at 9:52 PM, Aidan Van Dyk <aidan@highrise.ca> wrote:
* Fujii Masao <masao.fujii@gmail.com> [100721 03:49]:
The patch provides quorum parameter in postgresql.conf, which
specifies how many standby servers transaction commit will wait for
WAL records to be replicated to, before the command returns a
"success" indication to the client. The default value is zero, which
always doesn't make transaction commit wait for replication without
regard to replication_mode. Also transaction commit always doesn't
wait for replication to asynchronous standby (i.e., replication_mode
is set to async) without regard to this parameter. If quorum is more
than the number of synchronous standbys, transaction commit returns
a "success" when the ACK has arrived from all of synchronous standbys.
There should be a way to specify "wait for *all* connected standby servers
to acknowledge"
Agreed. I'll allow -1 as the valid value of the quorum parameter, which
means that transaction commit waits for all connected standbys.
Hm... so if my 1 synchronous standby is operating normally, and quorum
is set to 1, I'll get what I want (commit waits until it's safely on both
servers). But what happens if my standby goes bad? Suddenly the quorum
setting is ignored (because it's > number of connected standby
servers?). Is there a way for me to not allow any commits if the quorum
number of standbys is *not* available? Yes, I want my db to
"halt" in that situation, and yes, alarm bells will be ringing...
In reality, I'm likely to run 2 synchronous slaves, with quorum of 1.
So 1 slave can fail and I can still have 2 going. But if that 2nd slave
ever failed while the other was down, I definitely don't want the master
to forge on ahead!
Of course, this won't be for everyone, just as the current "just
connected standbys" isn't for everything either...
Yeah, we need to clear up the detailed design of the quorum commit feature
and reach consensus on it.
How should synchronous replication behave when the number of connected
standby servers is less than quorum?
1. Ignore quorum. The current patch adopts this. If the ACKs from all
connected standbys have arrived, transaction commit is successful
even if the number of standbys is less than quorum. If there is no
connected standby, transaction commit is always successful without
regard to quorum.
2. Observe quorum. Aidan wants this. Transaction commit waits until
the number of connected standbys is greater than or equal to quorum.
Which is the right behavior for quorum commit? Or should we add a new
parameter specifying the behavior of quorum commit?
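To compare the two behaviors side by side, here is a rough sketch of the commit decision as a pure function. It is an assumption-laden illustration, not code from the patch: QuorumPolicy and commit_may_return are hypothetical names, "connected" counts only synchronous standbys, and a quorum of -1 with no connected standby is assumed not to wait.

```c
#include <stdbool.h>

/* Hypothetical switch between the two behaviors discussed above. */
typedef enum
{
    QUORUM_IGNORE,   /* 1. fall back to "all connected standbys" */
    QUORUM_OBSERVE   /* 2. keep waiting until quorum standbys are connected */
} QuorumPolicy;

/*
 * Decide whether a waiting commit may report success.
 *
 * quorum:    0 = never wait, -1 = wait for all connected standbys,
 *            >0 = number of ACKs required
 * connected: number of connected synchronous standbys
 * acked:     how many of them have acknowledged the commit record
 */
bool
commit_may_return(int quorum, int connected, int acked, QuorumPolicy policy)
{
    int required;

    if (quorum == 0)
        return true;                /* replication is never waited for */

    if (quorum < 0)
        return acked >= connected;  /* -1: every connected standby */

    if (quorum > connected)
    {
        if (policy == QUORUM_OBSERVE)
            return false;           /* halt until enough standbys connect */
        required = connected;       /* ignore quorum: all connected suffice */
    }
    else
        required = quorum;

    return acked >= required;
}
```

In Aidan's scenario (quorum = 1, no standby connected), QUORUM_IGNORE lets the commit through while QUORUM_OBSERVE blocks it, which is exactly the difference a new parameter would have to select.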
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Wed, Jul 21, 2010 at 4:48 PM, Fujii Masao <masao.fujii@gmail.com> wrote:
There should be a way to specify "wait for *all* connected standby servers
to acknowledge"
Agreed. I'll allow -1 as the valid value of the quorum parameter, which
means that transaction commit waits for all connected standbys.
Done.
Please use self-explanatory names for the modes in START_REPLICATION
command, instead of just an integer.
Agreed. What about changing the START_REPLICATION message to the following?
START_REPLICATION XXX/XXX SYNC_LEVEL { async | recv | fsync | replay }
Done.
I attached the updated version of the patch.
The code is also available in my git repository:
git://git.postgresql.org/git/users/fujii/postgres.git
branch: synchrep
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
synch_rep_0722.patch (application/octet-stream)
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 1933,1938 **** SET ENABLE_SEQSCAN TO OFF;
--- 1933,1965 ----
</listitem>
</varlistentry>
+ <varlistentry id="guc-quorum" xreflabel="quorum">
+ <term><varname>quorum</varname> (<type>integer</type>)</term>
+ <indexterm>
+ <primary><varname>quorum</> configuration parameter</primary>
+ </indexterm>
+ <listitem>
+ <para>
+ Specifies how many standby servers transaction commit will wait
+ for WAL records to be replicated to, before the command returns
+ a <quote>success</> indication to the client. The default value
+ is zero, which always doesn't make transaction commit wait for
+ replication without regard to <xref linkend="replication-mode">.
+ Also transaction commit always doesn't wait for replication to
+ asynchronous standby (i.e., its <varname>replication_mode</> is
+ set to <literal>async</>) without regard to this parameter.
+ A value of <literal>-1</> regards all connected standby servers
+ as synchronous.
+ </para>
+ <para>
+ This parameter can be changed at any time; the behavior for any
+ one transaction is determined by the setting in effect when it
+ commits. It is therefore possible, and useful, to have some
+ transactions replicate synchronously and others asynchronously.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-vacuum-defer-cleanup-age" xreflabel="vacuum_defer_cleanup_age">
<term><varname>vacuum_defer_cleanup_age</varname> (<type>integer</type>)</term>
<indexterm>
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 148,161 **** protocol to make nodes agree on a serializable transactional order.
stream of write-ahead log (<acronym>WAL</>)
records. If the main server fails, the standby contains
almost all of the data of the main server, and can be quickly
! made the new master database server. This is asynchronous and
! can only be done for the entire database server.
</para>
<para>
A PITR standby server can be implemented using file-based log shipping
(<xref linkend="warm-standby">) or streaming replication (see
! <xref linkend="streaming-replication">), or a combination of both. For
! information on hot standby, see <xref linkend="hot-standby">.
</para>
</listitem>
</varlistentry>
--- 148,163 ----
stream of write-ahead log (<acronym>WAL</>)
records. If the main server fails, the standby contains
almost all of the data of the main server, and can be quickly
! made the new master database server. This can only be done for
! the entire database server.
</para>
<para>
A PITR standby server can be implemented using file-based log shipping
(<xref linkend="warm-standby">) or streaming replication (see
! <xref linkend="streaming-replication">), or a combination of both.
! While file-based log shipping is asynchronous, synchronization mode can
! be chosen in streaming replication (see <xref linkend="replication-mode">).
! For information on hot standby, see <xref linkend="hot-standby">.
</para>
</listitem>
</varlistentry>
***************
*** 348,354 **** protocol to make nodes agree on a serializable transactional order.
<entry>No master server overhead</entry>
<entry align="center">•</entry>
<entry align="center"></entry>
! <entry align="center">•</entry>
<entry align="center"></entry>
<entry align="center">•</entry>
<entry align="center"></entry>
--- 350,356 ----
<entry>No master server overhead</entry>
<entry align="center">•</entry>
<entry align="center"></entry>
! <entry align="center">Optional</entry>
<entry align="center"></entry>
<entry align="center">•</entry>
<entry align="center"></entry>
***************
*** 359,365 **** protocol to make nodes agree on a serializable transactional order.
<entry>No waiting for multiple servers</entry>
<entry align="center">•</entry>
<entry align="center"></entry>
! <entry align="center">•</entry>
<entry align="center">•</entry>
<entry align="center"></entry>
<entry align="center">•</entry>
--- 361,367 ----
<entry>No waiting for multiple servers</entry>
<entry align="center">•</entry>
<entry align="center"></entry>
! <entry align="center">Optional</entry>
<entry align="center">•</entry>
<entry align="center"></entry>
<entry align="center">•</entry>
***************
*** 370,376 **** protocol to make nodes agree on a serializable transactional order.
<entry>Master failure will never lose data</entry>
<entry align="center">•</entry>
<entry align="center">•</entry>
! <entry align="center"></entry>
<entry align="center"></entry>
<entry align="center">•</entry>
<entry align="center"></entry>
--- 372,378 ----
<entry>Master failure will never lose data</entry>
<entry align="center">•</entry>
<entry align="center">•</entry>
! <entry align="center">Optional</entry>
<entry align="center"></entry>
<entry align="center">•</entry>
<entry align="center"></entry>
*** a/doc/src/sgml/protocol.sgml
--- b/doc/src/sgml/protocol.sgml
***************
*** 1337,1347 **** The commands accepted in walsender mode are:
</varlistentry>
<varlistentry>
! <term>START_REPLICATION <replaceable>XXX</>/<replaceable>XXX</></term>
<listitem>
<para>
Instructs server to start streaming WAL, starting at
! WAL position <replaceable>XXX</>/<replaceable>XXX</>.
The server can reply with an error, e.g. if the requested section of WAL
has already been recycled. On success, server responds with a
CopyOutResponse message, and then starts to stream WAL to the frontend.
--- 1337,1348 ----
</varlistentry>
<varlistentry>
! <term>START_REPLICATION <replaceable>XXX</>/<replaceable>XXX</> SYNC_LEVEL <replaceable>mode-string</></term>
<listitem>
<para>
Instructs server to start streaming WAL, starting at
! WAL position <replaceable>XXX</>/<replaceable>XXX</>
! with the replication mode <replaceable>mode-string</>.
The server can reply with an error, e.g. if the requested section of WAL
has already been recycled. On success, server responds with a
CopyOutResponse message, and then starts to stream WAL to the frontend.
***************
*** 1360,1365 **** The commands accepted in walsender mode are:
--- 1361,1401 ----
<variablelist>
<varlistentry>
<term>
+ XLogRecPtr (F)
+ </term>
+ <listitem>
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term>
+ Byte1('l')
+ </term>
+ <listitem>
+ <para>
+ Identifies the message as an acknowledgment of replication.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+ Byte8
+ </term>
+ <listitem>
+ <para>
+ The end of the WAL data replicated to the standby, given in
+ XLogRecPtr format.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ <variablelist>
+ <varlistentry>
+ <term>
XLogData (B)
</term>
<listitem>
*** a/doc/src/sgml/recovery-config.sgml
--- b/doc/src/sgml/recovery-config.sgml
***************
*** 280,285 **** restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows
--- 280,343 ----
</para>
</listitem>
</varlistentry>
+ <varlistentry id="replication-mode" xreflabel="replication_mode">
+ <term><varname>replication_mode</varname> (<type>string</type>)</term>
+ <indexterm>
+ <primary><varname>replication_mode</> recovery parameter</primary>
+ </indexterm>
+ <listitem>
+ <para>
+ Specifies the replication mode which can control how long transaction
+ commit on the master server waits for replication before the command
+ returns a <quote>success</> indication to the client. Valid modes are:
+ <itemizedlist>
+ <listitem>
+ <para>
+ <literal>async</> (doesn't make transaction commit wait for replication,
+ i.e., asynchronous replication)
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>recv</> (makes transaction commit wait until the standby has
+ received WAL records)
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>fsync</> (makes transaction commit wait until the standby has
+ received and flushed WAL records to disk)
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>replay</> (makes transaction commit wait until the standby has
+ replayed WAL records after receiving and flushing them to disk)
+ </para>
+ </listitem>
+ </itemizedlist>
+ </para>
+ <para>
+ In asynchronous replication, there can be a delay between when a
+ <quote>success</> is reported to the client and when the transaction
+ is really guaranteed to be safe against a failover. Setting this
+ parameter to <literal>async</> does not create any risk of database
+ inconsistency: a crash at the master server might result in some recent
+ allegedly-committed transactions being lost at the standby server,
+ but the database state of the standby will be just the same as if
+ those transactions had been aborted cleanly. So, turning
+ <varname>replication_mode</> <literal>async</> can be a useful
+ alternative when performance is more important than exact certainty
+ about the durability of a transaction.
+ The default setting is <literal>async</>.
+ </para>
+ <para>
+ If <xref linkend="guc-quorum"> is set to zero in the master server,
+ transaction commit always doesn't wait for replication without regard
+ to this parameter.
+ </para>
+ </listitem>
+ </varlistentry>
<varlistentry id="trigger-file" xreflabel="trigger_file">
<term><varname>trigger_file</varname> (<type>string</type>)</term>
<indexterm>
*** a/src/backend/access/transam/recovery.conf.sample
--- b/src/backend/access/transam/recovery.conf.sample
***************
*** 100,105 ****
--- 100,110 ----
#primary_conninfo = '' # e.g. 'host=localhost port=5432'
#
#
+ # Specifies the synchronization mode of replication.
+ #
+ #replication_mode = 'async' # 'async', 'recv', 'fsync' or 'replay'
+ #
+ #
# By default, a standby server keeps streaming XLOG records from the
# primary indefinitely. If you want to stop streaming and finish recovery,
# opening up the system in read/write mode, specify path to a trigger file.
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 36,41 ****
--- 36,42 ----
#include "libpq/be-fsstubs.h"
#include "miscadmin.h"
#include "pgstat.h"
+ #include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
***************
*** 1088,1093 **** RecordTransactionCommit(void)
--- 1089,1106 ----
/* Compute latestXid while we have the child XIDs handy */
latestXid = TransactionIdLatest(xid, nchildren, children);
+ /*
+ * Wait for WAL to be replicated up to the COMMIT record if replication
+ * is enabled and quorum != 0. This operation has to be performed after
+ * the COMMIT record is generated and before other transactions know that
+ * this one has been committed.
+ *
+ * XXX: Since the caller prevents cancel/die interrupt, we cannot
+ * process that while waiting. Should we remove this restriction?
+ */
+ if (max_wal_senders > 0 && quorum != 0)
+ WaitXLogSend(XactLastRecEnd);
+
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd.xrecoff = 0;
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 189,194 **** static TimestampTz recoveryTargetTime;
--- 189,202 ----
static bool StandbyMode = false;
static char *PrimaryConnInfo = NULL;
static char *TriggerFile = NULL;
+ const struct config_enum_entry replication_mode_options[] = {
+ {"async", REPLICATION_MODE_ASYNC, false},
+ {"recv", REPLICATION_MODE_RECV, false},
+ {"fsync", REPLICATION_MODE_FSYNC, false},
+ {"replay", REPLICATION_MODE_REPLAY, false},
+ {NULL, 0, false}
+ };
+ int rplMode = REPLICATION_MODE_ASYNC;
/* if recoveryStopsHere returns true, it saves actual stop xid/time here */
static TransactionId recoveryStopXid;
***************
*** 5258,5263 **** readRecoveryCommandFile(void)
--- 5266,5291 ----
(errmsg("trigger_file = '%s'",
TriggerFile)));
}
+ else if (strcmp(tok1, "replication_mode") == 0)
+ {
+ const struct config_enum_entry *entry;
+
+ for (entry = replication_mode_options; entry->name; entry++)
+ {
+ if (strcmp(tok2, entry->name) == 0)
+ {
+ rplMode = entry->val;
+ break;
+ }
+ }
+ if (entry->name == NULL)
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid value for parameter \"replication_mode\": \"%s\"",
+ tok2)));
+ ereport(DEBUG2,
+ (errmsg("replication_mode = '%s'", tok2)));
+ }
else
ereport(FATAL,
(errmsg("unrecognized recovery parameter \"%s\"",
***************
*** 6867,6872 **** GetFlushRecPtr(void)
--- 6895,6917 ----
}
/*
+ * GetReplayRecPtr -- Returns the last replay position.
+ */
+ XLogRecPtr
+ GetReplayRecPtr(void)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+ XLogRecPtr recptr;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ recptr = xlogctl->recoveryLastRecPtr;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ return recptr;
+ }
+
+ /*
* Get the time of the last xlog segment switch
*/
pg_time_t
***************
*** 8824,8837 **** pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
Datum
pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr recptr;
char location[MAXFNAMELEN];
! SpinLockAcquire(&xlogctl->info_lck);
! recptr = xlogctl->recoveryLastRecPtr;
! SpinLockRelease(&xlogctl->info_lck);
if (recptr.xlogid == 0 && recptr.xrecoff == 0)
PG_RETURN_NULL();
--- 8869,8878 ----
Datum
pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
{
XLogRecPtr recptr;
char location[MAXFNAMELEN];
! recptr = GetReplayRecPtr();
if (recptr.xlogid == 0 && recptr.xrecoff == 0)
PG_RETURN_NULL();
***************
*** 9463,9469 **** retry:
{
RequestXLogStreaming(
fetching_ckpt ? RedoStartLSN : *RecPtr,
! PrimaryConnInfo);
continue;
}
}
--- 9504,9510 ----
{
RequestXLogStreaming(
fetching_ckpt ? RedoStartLSN : *RecPtr,
! PrimaryConnInfo, rplMode);
continue;
}
}
*** a/src/backend/libpq/be-secure.c
--- b/src/backend/libpq/be-secure.c
***************
*** 71,76 ****
--- 71,86 ----
#endif
#endif /* USE_SSL */
+ #ifdef HAVE_POLL_H
+ #include <poll.h>
+ #endif
+ #ifdef HAVE_SYS_POLL_H
+ #include <sys/poll.h>
+ #endif
+ #ifdef HAVE_SYS_SELECT_H
+ #include <sys/select.h>
+ #endif
+
#include "libpq/libpq.h"
#include "tcop/tcopprot.h"
***************
*** 397,402 **** wloop:
--- 407,472 ----
return n;
}
+ /*
+ * Checks a socket, using poll or select, for data to be read.
+ * Returns >0 if there is data to read, 0 if it timed out, -1
+ * if an error occurred (including the interrupt).
+ *
+ * Timeout is specified in millisec. Timeout is infinite if
+ * timeout_ms is negative. Timeout is immediate (no blocking)
+ * if timeout_ms is 0.
+ *
+ * If SSL is in use, the SSL buffer is checked prior to
+ * checking the socket for read data directly.
+ *
+ * This function is based on pqSocketCheck and pqSocketPoll.
+ */
+ int
+ secure_poll(Port *port, int timeout_ms)
+ {
+ #ifdef USE_SSL
+ /* Check for SSL library buffering read bytes */
+ if (port->ssl && SSL_pending(port->ssl) > 0)
+ {
+ /* short-circuit the select */
+ return 1;
+ }
+ #endif
+
+ {
+ /* We use poll(2) if available, otherwise select(2) */
+ #ifdef HAVE_POLL
+ struct pollfd input_fd;
+
+ input_fd.fd = port->sock;
+ input_fd.events = POLLIN | POLLERR;
+ input_fd.revents = 0;
+
+ return poll(&input_fd, 1, timeout_ms);
+ #else /* !HAVE_POLL */
+
+ fd_set input_mask;
+ struct timeval timeout;
+ struct timeval *ptr_timeout;
+
+ FD_ZERO(&input_mask);
+ FD_SET(port->sock, &input_mask);
+
+ if (timeout_ms < 0)
+ ptr_timeout = NULL;
+ else
+ {
+ timeout.tv_sec = timeout_ms / 1000;
+ timeout.tv_usec = (timeout_ms % 1000) * 1000;
+ ptr_timeout = &timeout;
+ }
+
+ return select(port->sock + 1, &input_mask,
+ NULL, NULL, ptr_timeout);
+ #endif /* HAVE_POLL */
+ }
+ }
+
/* ------------------------------------------------------------ */
/* SSL specific code */
/* ------------------------------------------------------------ */
*** a/src/backend/libpq/pqcomm.c
--- b/src/backend/libpq/pqcomm.c
***************
*** 56,61 ****
--- 56,62 ----
* pq_putbytes - send bytes to connection (not flushed until pq_flush)
* pq_flush - flush pending output
* pq_getbyte_if_available - get a byte if available without blocking
+ * pq_wait - wait until we can read connection
*
* message-level I/O (and old-style-COPY-OUT cruft):
* pq_putmessage - send a normal message (suppressed in COPY OUT mode)
***************
*** 911,916 **** pq_getbyte_if_available(unsigned char *c)
--- 912,959 ----
}
/* --------------------------------
+ * pq_wait - wait until we can read the connection socket.
+ *
+ * returns >0 if there is data to read, 0 if it timed out or
+ * interrupted, -1 if an error occurred.
+ *
+ * this function is based on pqSocketCheck.
+ * --------------------------------
+ */
+ int
+ pq_wait(int timeout_ms)
+ {
+ int result;
+
+ if (!MyProcPort)
+ return -1;
+ if (MyProcPort->sock < 0)
+ {
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("socket not open")));
+ return -1;
+ }
+
+ result = secure_poll(MyProcPort, timeout_ms);
+ if (result < 0)
+ {
+ if (errno == EINTR)
+ return 0; /* interrupted */
+
+ /*
+ * XXX: Should we suppress duplicate log messages also here,
+ * like internal_flush?
+ */
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("select() failed: %m")));
+ }
+
+ return result;
+ }
+
+ /* --------------------------------
* pq_getbytes - get a known number of bytes from connection
*
* returns 0 if OK, EOF if trouble
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 47,55 **** static bool justconnected = false;
static char *recvBuf = NULL;
/* Prototypes for interface functions */
! static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
static bool libpqrcv_receive(int timeout, unsigned char *type,
char **buffer, int *len);
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
--- 47,57 ----
static char *recvBuf = NULL;
/* Prototypes for interface functions */
! static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint,
! const char *modename);
static bool libpqrcv_receive(int timeout, unsigned char *type,
char **buffer, int *len);
+ static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
***************
*** 64,73 **** _PG_init(void)
{
/* Tell walreceiver how to reach us */
if (walrcv_connect != NULL || walrcv_receive != NULL ||
! walrcv_disconnect != NULL)
elog(ERROR, "libpqwalreceiver already loaded");
walrcv_connect = libpqrcv_connect;
walrcv_receive = libpqrcv_receive;
walrcv_disconnect = libpqrcv_disconnect;
}
--- 66,76 ----
{
/* Tell walreceiver how to reach us */
if (walrcv_connect != NULL || walrcv_receive != NULL ||
! walrcv_send != NULL || walrcv_disconnect != NULL)
elog(ERROR, "libpqwalreceiver already loaded");
walrcv_connect = libpqrcv_connect;
walrcv_receive = libpqrcv_receive;
+ walrcv_send = libpqrcv_send;
walrcv_disconnect = libpqrcv_disconnect;
}
***************
*** 75,81 **** _PG_init(void)
* Establish the connection to the primary server for XLOG streaming
*/
static bool
! libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
{
char conninfo_repl[MAXCONNINFO + 37];
char *primary_sysid;
--- 78,84 ----
* Establish the connection to the primary server for XLOG streaming
*/
static bool
! libpqrcv_connect(char *conninfo, XLogRecPtr startpoint, const char *modename)
{
char conninfo_repl[MAXCONNINFO + 37];
char *primary_sysid;
***************
*** 154,161 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
ThisTimeLineID = primary_tli;
/* Start streaming from the point requested by startup process */
! snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
! startpoint.xlogid, startpoint.xrecoff);
res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) != PGRES_COPY_OUT)
{
--- 157,164 ----
ThisTimeLineID = primary_tli;
/* Start streaming from the point requested by startup process */
! snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X SYNC_LEVEL %s",
! startpoint.xlogid, startpoint.xrecoff, modename);
res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) != PGRES_COPY_OUT)
{
***************
*** 398,400 **** libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
--- 401,418 ----
return true;
}
+
+ /*
+ * Send a message to XLOG stream.
+ *
+ * ereports on error.
+ */
+ static void
+ libpqrcv_send(const char *buffer, int nbytes)
+ {
+ if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
+ PQflush(streamConn))
+ ereport(ERROR,
+ (errmsg("could not send data to WAL stream: %s",
+ PQerrorMessage(streamConn))));
+ }
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 57,62 **** bool am_walreceiver;
--- 57,63 ----
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
walrcv_receive_type walrcv_receive = NULL;
+ walrcv_send_type walrcv_send = NULL;
walrcv_disconnect_type walrcv_disconnect = NULL;
#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
***************
*** 113,118 **** static void WalRcvDie(int code, Datum arg);
--- 114,120 ----
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(void);
+ static void XLogWalRcvSendRecPtr(XLogRecPtr recptr);
/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
***************
*** 158,164 **** void
--- 160,168 ----
WalReceiverMain(void)
{
char conninfo[MAXCONNINFO];
+ const char *modename;
XLogRecPtr startpoint;
+ XLogRecPtr ackedpoint = {0, 0};
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
***************
*** 206,211 **** WalReceiverMain(void)
--- 210,216 ----
/* Fetch information required to start streaming */
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+ rplMode = walrcv->rplMode;
startpoint = walrcv->receivedUpto;
SpinLockRelease(&walrcv->mutex);
***************
*** 247,253 **** WalReceiverMain(void)
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
if (walrcv_connect == NULL || walrcv_receive == NULL ||
! walrcv_disconnect == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/*
--- 252,258 ----
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
if (walrcv_connect == NULL || walrcv_receive == NULL ||
! walrcv_send == NULL || walrcv_disconnect == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/*
***************
*** 259,267 **** WalReceiverMain(void)
/* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig);
/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
! walrcv_connect(conninfo, startpoint);
DisableWalRcvImmediateExit();
/* Loop until end-of-streaming or error */
--- 264,287 ----
/* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig);
+ /* Lookup the name of replication mode with the "rplMode" */
+ {
+ const struct config_enum_entry *entry;
+
+ for (entry = replication_mode_options; entry->name; entry++)
+ {
+ if (entry->val == rplMode)
+ {
+ modename = entry->name;
+ break;
+ }
+ }
+ Assert(entry->name != NULL);
+ }
+
/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
! walrcv_connect(conninfo, startpoint, modename);
DisableWalRcvImmediateExit();
/* Loop until end-of-streaming or error */
***************
*** 311,316 **** WalReceiverMain(void)
--- 331,355 ----
*/
XLogWalRcvFlush();
}
+
+ /*
+ * If replication_mode is "replay", send the last WAL replay location
+ * to the primary, to acknowledge that replication has been completed
+ * up to that. This occurs only when WAL records were replayed since
+ * the last acknowledgement.
+ */
+ if (rplMode == REPLICATION_MODE_REPLAY &&
+ XLByteLT(ackedpoint, LogstreamResult.Flush))
+ {
+ XLogRecPtr recptr;
+
+ recptr = GetReplayRecPtr();
+ if (XLByteLT(ackedpoint, recptr))
+ {
+ XLogWalRcvSendRecPtr(recptr);
+ ackedpoint = recptr;
+ }
+ }
}
}
***************
*** 406,411 **** XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
--- 445,463 ----
buf += sizeof(WalDataMessageHeader);
len -= sizeof(WalDataMessageHeader);
+ /*
+ * If replication_mode is "recv", send the last WAL receive
+ * location to the primary, to acknowledge that replication
+ * has been completed up to that.
+ */
+ if (rplMode == REPLICATION_MODE_RECV)
+ {
+ XLogRecPtr endptr = msghdr.dataStart;
+
+ XLByteAdvance(endptr, len);
+ XLogWalRcvSendRecPtr(endptr);
+ }
+
XLogWalRcvWrite(buf, len, msghdr.dataStart);
break;
}
***************
*** 523,528 **** XLogWalRcvFlush(void)
--- 575,588 ----
LogstreamResult.Flush = LogstreamResult.Write;
+ /*
+ * If replication_mode is "fsync", send the last WAL flush
+ * location to the primary, to acknowledge that replication
+ * has been completed up to that.
+ */
+ if (rplMode == REPLICATION_MODE_FSYNC)
+ XLogWalRcvSendRecPtr(LogstreamResult.Flush);
+
/* Update shared-memory status */
SpinLockAcquire(&walrcv->mutex);
walrcv->latestChunkStart = walrcv->receivedUpto;
***************
*** 541,543 **** XLogWalRcvFlush(void)
--- 601,624 ----
}
}
}
+
+ /* Send the lsn to the primary server */
+ static void
+ XLogWalRcvSendRecPtr(XLogRecPtr recptr)
+ {
+ static char *msgbuf = NULL;
+ WalAckMessageData msgdata;
+
+ /*
+ * Allocate buffer that will be used for each output message if first
+ * time through. We do this just once to reduce palloc overhead.
+ * The buffer must be made large enough for maximum-sized messages.
+ */
+ if (msgbuf == NULL)
+ msgbuf = palloc(1 + sizeof(WalAckMessageData));
+
+ msgbuf[0] = 'l';
+ msgdata.ackEnd = recptr;
+ memcpy(msgbuf + 1, &msgdata, sizeof(WalAckMessageData));
+ walrcv_send(msgbuf, 1 + sizeof(WalAckMessageData));
+ }
*** a/src/backend/replication/walreceiverfuncs.c
--- b/src/backend/replication/walreceiverfuncs.c
***************
*** 168,178 **** ShutdownWalRcv(void)
/*
* Request postmaster to start walreceiver.
*
! * recptr indicates the position where streaming should begin, and conninfo
! * is a libpq connection string to use.
*/
void
! RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
--- 168,178 ----
/*
* Request postmaster to start walreceiver.
*
! * recptr indicates the position where streaming should begin, conninfo is
! * a libpq connection string to use, and mode is a replication mode.
*/
void
! RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo, int mode)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
***************
*** 196,201 **** RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
--- 196,202 ----
strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
else
walrcv->conninfo[0] = '\0';
+ walrcv->rplMode = mode;
walrcv->walRcvState = WALRCV_STARTING;
walrcv->startTime = now;
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 65,73 **** bool am_walsender = false; /* Am I a walsender process ? */
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int WalSndDelay = 200; /* max sleep time between some actions */
! #define NAPTIME_PER_CYCLE 100000L /* max sleep time between cycles
! * (100ms) */
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
--- 65,73 ----
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int WalSndDelay = 200; /* max sleep time between some actions */
+ int quorum = 0; /* the maximum number of synchronous walsenders */
! #define NAPTIME_PER_CYCLE 100L /* max sleep time between cycles (100ms) */
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
***************
*** 84,89 **** static uint32 sendOff = 0;
--- 84,96 ----
*/
static XLogRecPtr sentPtr = {0, 0};
+ /*
+ * How far have we completed replication already? This is also
+ * advertised in MyWalSnd->ackdPtr. This is not used in asynchronous
+ * replication case.
+ */
+ static XLogRecPtr ackdPtr = {0, 0};
+
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t shutdown_requested = false;
***************
*** 101,107 **** static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
static bool XLogSend(char *msgbuf, bool *caughtup);
! static void CheckClosedConnection(void);
/* Main entry point for walsender process */
--- 108,114 ----
static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
static bool XLogSend(char *msgbuf, bool *caughtup);
! static void ProcessStreamMsgs(StringInfo inMsg);
/* Main entry point for walsender process */
***************
*** 195,200 **** WalSndHandshake(void)
--- 202,209 ----
/* Handle the very limited subset of commands expected in this phase */
switch (firstchar)
{
+ char modename[8];
+
case 'Q': /* Query message */
{
const char *query_string;
***************
*** 255,262 **** WalSndHandshake(void)
ReadyForQuery(DestRemote);
/* ReadyForQuery did pq_flush for us */
}
! else if (sscanf(query_string, "START_REPLICATION %X/%X",
! &recptr.xlogid, &recptr.xrecoff) == 2)
{
StringInfoData buf;
--- 264,271 ----
ReadyForQuery(DestRemote);
/* ReadyForQuery did pq_flush for us */
}
! else if (sscanf(query_string, "START_REPLICATION %X/%X SYNC_LEVEL %8s",
! &recptr.xlogid, &recptr.xrecoff, modename) == 3)
{
StringInfoData buf;
***************
*** 277,282 **** WalSndHandshake(void)
--- 286,325 ----
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("standby connections not allowed because wal_level=minimal")));
+ /* Verify that the specified replication mode is valid */
+ {
+ const struct config_enum_entry *entry;
+
+ for (entry = replication_mode_options; entry->name; entry++)
+ {
+ if (strcmp(modename, entry->name) == 0)
+ {
+ rplMode = entry->val;
+ break;
+ }
+ }
+ if (entry->name == NULL)
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid replication mode: %s", modename)));
+ }
+ MyWalSnd->rplMode = rplMode;
+
+ /*
+ * Update the current number of synchronous standbys if replication
+ * mode is "synchronous"
+ */
+ if (rplMode >= REPLICATION_MODE_RECV)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+
+ SpinLockAcquire(&walsndctl->info_lck);
+ walsndctl->num_sync_sbys++;
+ Assert(walsndctl->num_sync_sbys > 0);
+ SpinLockRelease(&walsndctl->info_lck);
+ }
+
/* Send a CopyOutResponse message, and start streaming */
pq_beginmessage(&buf, 'H');
pq_sendbyte(&buf, 0);
***************
*** 285,294 **** WalSndHandshake(void)
pq_flush();
/*
! * Initialize position to the received one, then the
* xlog records begin to be shipped from that position
*/
! sentPtr = recptr;
/* break out of the loop */
replication_started = true;
--- 328,337 ----
pq_flush();
/*
! * Initialize positions to the received one, then the
* xlog records begin to be shipped from that position
*/
! sentPtr = ackdPtr = recptr;
/* break out of the loop */
replication_started = true;
***************
*** 322,364 **** WalSndHandshake(void)
}
/*
! * Check if the remote end has closed the connection.
*/
static void
! CheckClosedConnection(void)
{
! unsigned char firstchar;
! int r;
! r = pq_getbyte_if_available(&firstchar);
! if (r < 0)
! {
! /* unexpected error or EOF */
! ereport(COMMERROR,
! (errcode(ERRCODE_PROTOCOL_VIOLATION),
! errmsg("unexpected EOF on standby connection")));
! proc_exit(0);
! }
! if (r == 0)
{
! /* no data available without blocking */
! return;
! }
- /* Handle the very limited subset of commands expected in this phase */
- switch (firstchar)
- {
/*
* 'X' means that the standby is closing down the socket.
*/
! case 'X':
! proc_exit(0);
! default:
! ereport(FATAL,
! (errcode(ERRCODE_PROTOCOL_VIOLATION),
! errmsg("invalid standby closing message type %d",
! firstchar)));
}
}
--- 365,473 ----
}
/*
! * Process messages received from the standby.
! *
! * ereports on error.
*/
static void
! ProcessStreamMsgs(StringInfo inMsg)
{
! bool acked = false;
! /* Loop to process successive complete messages available */
! for (;;)
{
! unsigned char firstchar;
! int r;
!
! r = pq_getbyte_if_available(&firstchar);
! if (r < 0)
! {
! /* unexpected error or EOF */
! ereport(COMMERROR,
! (errcode(ERRCODE_PROTOCOL_VIOLATION),
! errmsg("unexpected EOF on standby connection")));
! proc_exit(0);
! }
! if (r == 0)
! {
! /* no data available without blocking */
! break;
! }
!
! /* Handle the very limited subset of commands expected in this phase */
! switch (firstchar)
! {
! case 'd': /* CopyData message */
! {
! unsigned char rpltype;
!
! /*
! * Read the message contents. This is expected to be done without
! * blocking because we've been able to get message type code.
! */
! if (pq_getmessage(inMsg, 0))
! proc_exit(0); /* suitable message already logged */
!
! /* Read the replication message type from CopyData message */
! rpltype = pq_getmsgbyte(inMsg);
! switch (rpltype)
! {
! case 'l':
! {
! WalAckMessageData *msgdata;
!
! msgdata = (WalAckMessageData *) pq_getmsgbytes(inMsg, sizeof(WalAckMessageData));
!
! /*
! * Update local status.
! *
! * The ackd ptr received from standby should not
! * go backwards.
! */
! if (XLByteLE(ackdPtr, msgdata->ackEnd))
! ackdPtr = msgdata->ackEnd;
! else
! ereport(FATAL,
! (errmsg("replication completion location went back from "
! "%X/%X to %X/%X",
! ackdPtr.xlogid, ackdPtr.xrecoff,
! msgdata->ackEnd.xlogid, msgdata->ackEnd.xrecoff)));
!
! acked = true; /* also need to update shared position */
! break;
! }
! default:
! ereport(FATAL,
! (errcode(ERRCODE_PROTOCOL_VIOLATION),
! errmsg("invalid replication message type %d",
! rpltype)));
! }
! break;
! }
/*
* 'X' means that the standby is closing down the socket.
*/
! case 'X':
! proc_exit(0);
! default:
! ereport(FATAL,
! (errcode(ERRCODE_PROTOCOL_VIOLATION),
! errmsg("invalid standby closing message type %d",
! firstchar)));
! }
! }
!
! if (acked)
! {
! /* use volatile pointer to prevent code rearrangement */
! volatile WalSnd *walsnd = MyWalSnd;
!
! SpinLockAcquire(&walsnd->mutex);
! walsnd->ackdPtr = ackdPtr;
! SpinLockRelease(&walsnd->mutex);
}
}
***************
*** 366,374 **** CheckClosedConnection(void)
--- 475,486 ----
static int
WalSndLoop(void)
{
+ StringInfoData input_message;
char *output_message;
bool caughtup = false;
+ initStringInfo(&input_message);
+
/*
* Allocate buffer that will be used for each output message. We do this
* just once to reduce palloc overhead. The buffer must be made large
***************
*** 428,443 **** WalSndLoop(void)
*/
if (caughtup)
{
! remain = WalSndDelay * 1000L;
while (remain > 0)
{
/* Check for interrupts */
if (got_SIGHUP || shutdown_requested || ready_to_stop)
break;
! /* Sleep and check that the connection is still alive */
! pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
! CheckClosedConnection();
remain -= NAPTIME_PER_CYCLE;
}
--- 540,569 ----
*/
if (caughtup)
{
! remain = WalSndDelay;
while (remain > 0)
{
+ int res;
+
/* Check for interrupts */
if (got_SIGHUP || shutdown_requested || ready_to_stop)
break;
! /*
! * Check to see whether a message from the standby or an interrupt
! * from other processes has arrived.
! */
! res = pq_wait(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
! if (res < 0)
! {
! /* unexpected error or EOF */
! ereport(COMMERROR,
! (errcode(ERRCODE_PROTOCOL_VIOLATION),
! errmsg("unexpected EOF on standby connection")));
! proc_exit(0);
! }
! if (res > 0)
! ProcessStreamMsgs(&input_message);
remain -= NAPTIME_PER_CYCLE;
}
***************
*** 496,501 **** InitWalSnd(void)
--- 622,628 ----
MyWalSnd = (WalSnd *) walsnd;
walsnd->pid = MyProcPid;
MemSet(&MyWalSnd->sentPtr, 0, sizeof(XLogRecPtr));
+ MemSet(&MyWalSnd->ackdPtr, 0, sizeof(XLogRecPtr));
SpinLockRelease(&walsnd->mutex);
break;
}
***************
*** 523,528 **** WalSndKill(int code, Datum arg)
--- 650,670 ----
*/
MyWalSnd->pid = 0;
+ /*
+ * Update the current number of synchronous standbys if replication
+ * mode is "synchronous"
+ */
+ if (rplMode >= REPLICATION_MODE_RECV)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSndCtlData *walsndctl = WalSndCtl;
+
+ SpinLockAcquire(&walsndctl->info_lck);
+ Assert(walsndctl->num_sync_sbys > 0);
+ walsndctl->num_sync_sbys--;
+ SpinLockRelease(&walsndctl->info_lck);
+ }
+
/* WalSnd struct isn't mine anymore */
MyWalSnd = NULL;
}
***************
*** 884,889 **** WalSndShmemInit(void)
--- 1026,1032 ----
{
/* First time through, so initialize */
MemSet(WalSndCtl, 0, WalSndShmemSize());
+ SpinLockInit(&WalSndCtl->info_lck);
for (i = 0; i < max_wal_senders; i++)
{
***************
*** 895,937 **** WalSndShmemInit(void)
}
/*
! * This isn't currently used for anything. Monitoring tools might be
! * interested in the future, and we'll need something like this in the
! * future for synchronous replication.
! */
! #ifdef NOT_USED
! /*
! * Returns the oldest Send position among walsenders. Or InvalidXLogRecPtr
! * if none.
*/
! XLogRecPtr
! GetOldestWALSendPointer(void)
{
! XLogRecPtr oldest = {0, 0};
! int i;
! bool found = false;
! for (i = 0; i < max_wal_senders; i++)
{
/* use volatile pointer to prevent code rearrangement */
! volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
! XLogRecPtr recptr;
! if (walsnd->pid == 0)
! continue;
! SpinLockAcquire(&walsnd->mutex);
! recptr = walsnd->sentPtr;
! SpinLockRelease(&walsnd->mutex);
! if (recptr.xlogid == 0 && recptr.xrecoff == 0)
! continue;
! if (!found || XLByteLT(recptr, oldest))
! oldest = recptr;
! found = true;
}
- return oldest;
}
-
- #endif
--- 1038,1109 ----
}
/*
! * Ensure that all xlog records through the given position are
! * replicated to the standby
*/
! void
! WaitXLogSend(XLogRecPtr record)
{
! Assert(max_wal_senders > 0);
! Assert(quorum != 0);
! for (;;)
{
/* use volatile pointer to prevent code rearrangement */
! volatile WalSndCtlData *walsndctl = WalSndCtl;
! int i;
! int already_acked = 0;
! bool unacked = false;
! /* Don't need to wait if there are no synchronous standbys */
! if (walsndctl->num_sync_sbys == 0)
! return;
! /*
! * Count walsenders which have already received the ACK meaning
! * completion of replication up to the given position. If the
! * sum is more than or equal to the "quorum", the backend breaks
! * out of this loop and returns a "success" of the transaction
! * to a client.
! */
! for (i = 0; i < max_wal_senders; i++)
! {
! /* use volatile pointer to prevent code rearrangement */
! volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
! XLogRecPtr recptr;
! /* Don't count inactive or asynchronous walsenders */
! if (walsnd->pid == 0 ||
! walsnd->rplMode == REPLICATION_MODE_ASYNC)
! continue;
!
! SpinLockAcquire(&walsnd->mutex);
! recptr = walsnd->ackdPtr;
! SpinLockRelease(&walsnd->mutex);
! if ((recptr.xlogid == 0 && recptr.xrecoff == 0) ||
! XLByteLT(recptr, record))
! {
! unacked = true;
! continue;
! }
!
! if (quorum > -1 && ++already_acked >= quorum)
! return;
! }
!
! /*
! * If synchronous walsender was not found in the WalSnd array,
! * we no longer need to wait. This can happen if all synchronous
! * walsenders are terminated while searching the array.
! *
! * If all synchronous walsenders have already received the ACK,
! * we no longer need to wait, too. This can happen when the
! * "quorum" is more than max_wal_senders or is equal to -1.
! */
! if (!unacked)
! return;
!
! pg_usleep(100000L); /* 100ms */
}
}
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 1782,1787 **** static struct config_int ConfigureNamesInt[] =
--- 1782,1796 ----
},
{
+ {"quorum", PGC_USERSET, WAL_REPLICATION,
+ gettext_noop("Sets the maximum number of synchronous standby servers."),
+ NULL
+ },
+ &quorum,
+ 0, -1, INT_MAX / 4, NULL, NULL
+ },
+
+ {
{"commit_delay", PGC_USERSET, WAL_SETTINGS,
gettext_noop("Sets the delay in microseconds between transaction commit and "
"flushing WAL to disk."),
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 188,193 ****
--- 188,195 ----
#max_wal_senders = 0 # max number of walsender processes
#wal_sender_delay = 200ms # walsender cycle time, 1-10000 milliseconds
#wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables
+ #quorum = 0 # max number of synchronous standbys;
+ # -1 regards all connected standbys as synchronous
#vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed
# - Standby Servers -
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 15,20 ****
--- 15,21 ----
#include "access/xlogdefs.h"
#include "lib/stringinfo.h"
#include "storage/buf.h"
+ #include "utils/guc.h"
#include "utils/pg_crc.h"
#include "utils/timestamp.h"
***************
*** 261,266 **** typedef struct CheckpointStatsData
--- 262,291 ----
extern CheckpointStatsData CheckpointStats;
+ /*
+ * Synchronization mode of replication. These modes identify how far
+ * we should wait for replication.
+ *
+ * REPLICATION_MODE_ASYNC doesn't make transaction commit wait for
+ * replication, i.e., asynchronous replication.
+ *
+ * REPLICATION_MODE_RECV makes transaction commit wait for XLOG
+ * records to be received on the standby server.
+ *
+ * REPLICATION_MODE_FSYNC makes transaction commit wait for XLOG
+ * records to be received and fsync'd on the standby server.
+ *
+ * REPLICATION_MODE_REPLAY makes transaction commit wait for XLOG
+ * records to be received, fsync'd and replayed on the standby server.
+ */
+ #define REPLICATION_MODE_ASYNC 0
+ #define REPLICATION_MODE_RECV 1
+ #define REPLICATION_MODE_FSYNC 2
+ #define REPLICATION_MODE_REPLAY 3
+ extern int rplMode;
+ extern const struct config_enum_entry replication_mode_options[];
+
+
extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
extern void XLogFlush(XLogRecPtr RecPtr);
extern void XLogBackgroundFlush(void);
***************
*** 298,303 **** extern void XLogPutNextOid(Oid nextOid);
--- 323,329 ----
extern XLogRecPtr GetRedoRecPtr(void);
extern XLogRecPtr GetInsertRecPtr(void);
extern XLogRecPtr GetFlushRecPtr(void);
+ extern XLogRecPtr GetReplayRecPtr(void);
extern void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch);
extern TimeLineID GetRecoveryTargetTLI(void);
*** a/src/include/libpq/libpq.h
--- b/src/include/libpq/libpq.h
***************
*** 58,63 **** extern int pq_getmessage(StringInfo s, int maxlen);
--- 58,64 ----
extern int pq_getbyte(void);
extern int pq_peekbyte(void);
extern int pq_getbyte_if_available(unsigned char *c);
+ extern int pq_wait(int timeout_ms);
extern int pq_putbytes(const char *s, size_t len);
extern int pq_flush(void);
extern int pq_putmessage(char msgtype, const char *s, size_t len);
***************
*** 74,78 **** extern int secure_open_server(Port *port);
--- 75,80 ----
extern void secure_close(Port *port);
extern ssize_t secure_read(Port *port, void *ptr, size_t len);
extern ssize_t secure_write(Port *port, void *ptr, size_t len);
+ extern int secure_poll(Port *port, int timeout_ms);
#endif /* LIBPQ_H */
*** a/src/include/replication/walprotocol.h
--- b/src/include/replication/walprotocol.h
***************
*** 50,53 **** typedef struct
--- 50,63 ----
*/
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
+ /*
+ * Body for a WAL acknowledgment message (message type 'l'). This is wrapped
+ * within a CopyData message at the FE/BE protocol level.
+ */
+ typedef struct
+ {
+ /* End of WAL replicated to the standby */
+ XLogRecPtr ackEnd;
+ } WalAckMessageData;
+
#endif /* _WALPROTOCOL_H */
*** a/src/include/replication/walreceiver.h
--- b/src/include/replication/walreceiver.h
***************
*** 71,89 **** typedef struct
*/
char conninfo[MAXCONNINFO];
slock_t mutex; /* locks shared variables shown above */
} WalRcvData;
extern WalRcvData *WalRcv;
/* libpqwalreceiver hooks */
! typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
char **buffer, int *len);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
typedef void (*walrcv_disconnect_type) (void);
extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
--- 71,99 ----
*/
char conninfo[MAXCONNINFO];
+ /*
+ * replication mode; controls how long transaction commit on the primary
+ * server waits for replication.
+ */
+ int rplMode;
+
slock_t mutex; /* locks shared variables shown above */
} WalRcvData;
extern WalRcvData *WalRcv;
/* libpqwalreceiver hooks */
! typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint,
! const char *modename);
extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
char **buffer, int *len);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
+ typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
+ extern PGDLLIMPORT walrcv_send_type walrcv_send;
+
typedef void (*walrcv_disconnect_type) (void);
extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
***************
*** 93,99 **** extern void WalRcvShmemInit(void);
extern void ShutdownWalRcv(void);
extern bool WalRcvInProgress(void);
extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
! extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
#endif /* _WALRECEIVER_H */
--- 103,110 ----
extern void ShutdownWalRcv(void);
extern bool WalRcvInProgress(void);
extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
! extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo,
! int mode);
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
#endif /* _WALRECEIVER_H */
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 22,27 **** typedef struct WalSnd
--- 22,30 ----
{
pid_t pid; /* this walsender's process id, or 0 */
XLogRecPtr sentPtr; /* WAL has been sent up to this point */
+ XLogRecPtr ackdPtr; /* WAL has been replicated up to this point */
+
+ int rplMode; /* replication mode */
slock_t mutex; /* locks shared variables shown above */
} WalSnd;
***************
*** 29,34 **** typedef struct WalSnd
--- 32,41 ----
/* There is one WalSndCtl struct for the whole database cluster */
typedef struct
{
+ int num_sync_sbys; /* current # of synchronous standbys */
+
+ slock_t info_lck; /* protects the variable shown above */
+
WalSnd walsnds[1]; /* VARIABLE LENGTH ARRAY */
} WalSndCtlData;
***************
*** 40,49 **** extern bool am_walsender;
--- 47,58 ----
/* user-settable parameters */
extern int WalSndDelay;
extern int max_wal_senders;
+ extern int quorum;
extern int WalSenderMain(void);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
+ extern void WaitXLogSend(XLogRecPtr recptr);
#endif /* _WALSENDER_H */
*** a/src/interfaces/libpq/fe-exec.c
--- b/src/interfaces/libpq/fe-exec.c
***************
*** 2002,2007 **** PQnotifies(PGconn *conn)
--- 2002,2010 ----
/*
* PQputCopyData - send some data to the backend during COPY IN
*
+ * Note that this function might be called by walreceiver even during
+ * COPY OUT to send a message to XLOG stream.
+ *
* Returns 1 if successful, 0 if data could not be sent (only possible
* in nonblock mode), or -1 if an error occurs.
*/
***************
*** 2010,2016 **** PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
{
if (!conn)
return -1;
! if (conn->asyncStatus != PGASYNC_COPY_IN)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("no COPY in progress\n"));
--- 2013,2020 ----
{
if (!conn)
return -1;
! if (conn->asyncStatus != PGASYNC_COPY_IN &&
! conn->asyncStatus != PGASYNC_COPY_OUT)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("no COPY in progress\n"));
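For readers who want to see the acknowledgment path of the patch in isolation, the following is a minimal standalone sketch, not the patch code itself. It mirrors the 'l' message built in XLogWalRcvSendRecPtr (one type byte followed by the raw position) and the "must not go backwards" check in ProcessStreamMsgs. RecPtr, build_ack, recptr_le and apply_ack are hypothetical names introduced here for illustration. Like the patch, the sketch copies the struct in host byte order, so it assumes both ends share the same layout.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for the two-part XLogRecPtr used by the patch. */
typedef struct
{
    uint32_t xlogid;   /* log file number */
    uint32_t xrecoff;  /* byte offset within the log file */
} RecPtr;

#define ACK_MSG_TYPE 'l'
#define ACK_MSG_LEN  (1 + sizeof(RecPtr))

/* Build the acknowledgment: one type byte followed by the raw position. */
static size_t
build_ack(char *buf, size_t buflen, RecPtr pos)
{
    assert(buflen >= ACK_MSG_LEN);
    buf[0] = ACK_MSG_TYPE;
    memcpy(buf + 1, &pos, sizeof(RecPtr));
    return ACK_MSG_LEN;
}

/* a <= b, comparing the high half first (the ordering XLByteLE tests). */
static int
recptr_le(RecPtr a, RecPtr b)
{
    return a.xlogid < b.xlogid ||
        (a.xlogid == b.xlogid && a.xrecoff <= b.xrecoff);
}

/*
 * Apply a received message to the sender's acknowledged position.
 * Returns 0 on success, -1 for a malformed message, and -2 if the
 * position would move backwards (the patch raises FATAL in that case).
 */
static int
apply_ack(const char *buf, size_t len, RecPtr *acked)
{
    RecPtr pos;

    if (len != ACK_MSG_LEN || buf[0] != ACK_MSG_TYPE)
        return -1;
    memcpy(&pos, buf + 1, sizeof(RecPtr));
    if (!recptr_le(*acked, pos))
        return -2;
    *acked = pos;
    return 0;
}
```

In the patch the equivalent of apply_ack runs in the walsender, and the updated position is then published in shared memory under the walsender's spinlock.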
Fujii Masao wrote:
How should the synchronous replication behave when the number of connected
standby servers is less than quorum?
1. Ignore quorum. The current patch adopts this. If the ACKs from all
connected standbys have arrived, transaction commit is successful
even if the number of standbys is less than quorum. If there is no
connected standby, transaction commit always is successful without
regard to quorum.
2. Observe quorum. Aidan wants this. Until the number of connected
standbys has become more than or equal to quorum, transaction commit
waits.
Which is the right behavior of quorum commit? Or we should add new
parameter specifying the behavior of quorum commit?
Initially I also expected the quorum to behave as described by
Aidan/option 2. Also, IMHO the name "quorum" is a bit short, like having
"maximum" but not saying a max_something.
quorum_min_sync_standbys
quorum_max_sync_standbys
The question remains: what are the sync standbys? Does it mean not-async?
Intuitively, looking at the enumeration of replication_mode, I'd think
that the sync standbys are all standbys that operate in a non-async
mode. That would be clearer with a boolean sync (or not), plus the
replication_mode specified for the sync standbys.
regards,
Yeb Havinga
On Thu, Jul 22, 2010 at 5:37 PM, Yeb Havinga <yebhavinga@gmail.com> wrote:
Fujii Masao wrote:
How should the synchronous replication behave when the number of connected
standby servers is less than quorum?
1. Ignore quorum. The current patch adopts this. If the ACKs from all
connected standbys have arrived, transaction commit is successful
even if the number of standbys is less than quorum. If there is no
connected standby, transaction commit always is successful without
regard to quorum.
2. Observe quorum. Aidan wants this. Until the number of connected
standbys has become more than or equal to quorum, transaction commit
waits.
Which is the right behavior of quorum commit? Or we should add new
parameter specifying the behavior of quorum commit?
Initially I also expected the quorum to behave like described by
Aidan/option 2.
OK. But some people (including me) would like to prevent the master
from halting when the standby fails, so I think that 1. also should
be supported. So I'm inclined to add new parameter specifying the
behavior of quorum commit when the number of synchronous standbys
becomes less than quorum.
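To make the two options concrete, here is a minimal sketch of the commit-release decision under each of them. It is an illustration only: quorum_decide and the strict flag are hypothetical names standing in for the new parameter proposed above, the sketch does not model quorum = -1, and it is not code from the patch.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum { COMMIT_RELEASE, COMMIT_WAIT } CommitDecision;

/*
 * connected: number of synchronous standbys currently connected
 * acked:     how many of them have acknowledged the commit record
 * quorum:    required number of acknowledgments (> 0)
 * strict:    hypothetical switch; true = "observe quorum" (option 2),
 *            false = "ignore quorum" (option 1, the current patch)
 */
static CommitDecision
quorum_decide(int connected, int acked, int quorum, bool strict)
{
    assert(quorum > 0);
    assert(acked >= 0 && acked <= connected);

    if (acked >= quorum)
        return COMMIT_RELEASE;      /* enough ACKs under either option */
    if (strict)
        return COMMIT_WAIT;         /* option 2: never settle for fewer */

    /* option 1: release once every connected standby has acknowledged */
    return (acked == connected) ? COMMIT_RELEASE : COMMIT_WAIT;
}
```

With quorum = 2 and a single connected standby that has acknowledged, option 1 releases the commit while option 2 keeps waiting, which is exactly the case where the master would halt after a standby failure.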
Also, IMHO the name "quorum" is a bit short, like having
"maximum" but not saying a max_something.
quorum_min_sync_standbys
quorum_max_sync_standbys
What about quorum_standbys?
The question remains what are the sync standbys? Does it mean not-async?
A sync standby is one that sets replication_mode to "recv", "fsync", or "replay".
Intuitively by looking at the enumeration of replication_mode I'd think that
the sync standbys are all standby's that operate in a not async mode. That
would be clearer with a boolean sync (or not) and for sync standbys the
replication_mode specified.
You mean that something like synchronous_replication should be added as a
recovery.conf parameter in addition to replication_mode? Since increasing
the number of similar parameters would confuse users, I'd rather not do that.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Fujii Masao wrote:
Intuitively by looking at the enumeration of replication_mode I'd think that
the sync standbys are all standby's that operate in a not async mode. That
would be clearer with a boolean sync (or not) and for sync standbys the
replication_mode specified.
You mean that something like synchronous_replication as the recovery.conf
parameter should be added in addition to replication_mode? Since increasing
the number of similar parameters would confuse users, I don't like do that.
I think what would be confusing is a mismatch between the implemented
concepts and the parameters.
1 does the master wait for standby servers on commit?
2 how many acknowledgements must the master receive before it can continue?
3 is a standby server a synchronous one, i.e. does it acknowledge a commit?
4 when do standby servers acknowledge a commit?
5 does it only wait when the standbys are connected, or also when they
are not connected?
6..?
When trying to match parameter names for the concepts above:
1 - does not exist, but can be answered with quorum_standbys = 0
2 - quorum_standbys
3 - yes, if replication_mode != async (here is where I thought I had to
think too much)
4 - replication modes recv, fsync and replay but not async
5 - Zoltan's strict_sync_replication parameter
Just an idea, what about
for 4: acknowledge_commit = {no|recv|fsync|replay}
then 3 = yes, if acknowledge_commit != no
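The relationship between concepts 3 and 4 can be sketched in a few lines of C: a single enumerated setting answers both "when does the standby acknowledge?" and "is it synchronous at all?". This is an illustration under assumed names (ReplMode, parse_mode, acknowledges_commit are hypothetical), loosely modeled on the name lookup the patch performs in WalSndHandshake. It uses the replication_mode spelling of the values.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

typedef enum
{
    MODE_INVALID = -1,  /* name not recognized */
    MODE_ASYNC = 0,     /* never acknowledges */
    MODE_RECV,          /* acknowledges after receiving WAL */
    MODE_FSYNC,         /* acknowledges after flushing WAL to disk */
    MODE_REPLAY         /* acknowledges after replaying WAL */
} ReplMode;

static const struct
{
    const char *name;
    ReplMode    mode;
} mode_names[] = {
    {"async", MODE_ASYNC},
    {"recv", MODE_RECV},
    {"fsync", MODE_FSYNC},
    {"replay", MODE_REPLAY}
};

/* Concept 4: map the configured name to the acknowledgment point. */
static ReplMode
parse_mode(const char *name)
{
    size_t i;

    assert(name != NULL);
    for (i = 0; i < sizeof(mode_names) / sizeof(mode_names[0]); i++)
    {
        if (strcmp(name, mode_names[i].name) == 0)
            return mode_names[i].mode;
    }
    return MODE_INVALID;
}

/* Concept 3: a standby is synchronous iff it acknowledges at all. */
static bool
acknowledges_commit(ReplMode mode)
{
    return mode >= MODE_RECV;
}
```

Whether the setting is called replication_mode or acknowledge_commit, the derived boolean is the same, so a separate on/off parameter would carry no extra information.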
regards,
Yeb Havinga
On Mon, Jul 26, 2010 at 5:27 PM, Yeb Havinga <yebhavinga@gmail.com> wrote:
Fujii Masao wrote:
Intuitively by looking at the enumeration of replication_mode I'd think that
the sync standbys are all standby's that operate in a not async mode. That
would be clearer with a boolean sync (or not) and for sync standbys the
replication_mode specified.
You mean that something like synchronous_replication as the recovery.conf
parameter should be added in addition to replication_mode? Since increasing
the number of similar parameters would confuse users, I don't like do that.
I think what would be confusing if there is a mismatch between implemented
concepts and parameters.
1 does the master wait for standby servers on commit?
2 how many acknowledgements must the master receive before it can continue?
3 is a standby server a synchronous one, i.e. does it acknowledge a commit?
4 when do standby servers acknowledge a commit?
5 does it only wait when the standby's are connected, or also when they are
not connected?
6..?
When trying to match parameter names for the concepts above:
1 - does not exist, but can be answered with quorum_standbys = 0
2 - quorum_standbys
3 - yes, if replication_mode != async (here is where I thought I had to think
too much)
4 - replication modes recv, fsync and replay but not async
5 - Zoltan's strict_sync_replication parameter
Just an idea, what about
for 4: acknowledge_commit = {no|recv|fsync|replay}
then 3 = yes, if acknowledge_commit != no
Thanks for the clarification.
I still like
replication_mode = {async|recv|fsync|replay}
rather than
synchronous_replication = {on|off}
acknowledge_commit = {no|recv|fsync|replay}
because the former is more intuitive for me and I don't want
to increase the number of parameters.
We need to hear from some users in this respect. If most want
the latter, of course, I'd love to adopt it.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Fujii Masao wrote:
I still like
replication_mode = {async|recv|fsync|replay}
rather than
synchronous_replication = {on|off}
acknowledge_commit = {no|recv|fsync|replay}
Hello Fujii,
I wasn't entirely clear. My suggestion was to have only
acknowledge_commit = {no|recv|fsync|replay}
instead of
replication_mode = {async|recv|fsync|replay}
regards,
Yeb Havinga
On Mon, Jul 26, 2010 at 6:36 PM, Yeb Havinga <yebhavinga@gmail.com> wrote:
Fujii Masao wrote:
I still like
replication_mode = {async|recv|fsync|replay}
rather than
synchronous_replication = {on|off}
acknowledge_commit = {no|recv|fsync|replay}
Hello Fujii,
I wasn't entirely clear. My suggestion was to have only
acknowledge_commit = {no|recv|fsync|replay}
instead of
replication_mode = {async|recv|fsync|replay}
Okay, I'll change the patch accordingly.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On 7/26/10 1:44 PM +0300, Fujii Masao wrote:
On Mon, Jul 26, 2010 at 6:36 PM, Yeb Havinga<yebhavinga@gmail.com> wrote:
I wasn't entirely clear. My suggestion was to have only
acknowledge_commit = {no|recv|fsync|replay}
instead of
replication_mode = {async|recv|fsync|replay}
Okay, I'll change the patch accordingly.
For what it's worth, I think replication_mode is a lot clearer.
Acknowledge_commit sounds like it would do something similar to
asynchronous_commit.
Regards,
Marko Tiikkaja
On Mon, Jul 26, 2010 at 6:48 AM, Marko Tiikkaja
<marko.tiikkaja@cs.helsinki.fi> wrote:
On 7/26/10 1:44 PM +0300, Fujii Masao wrote:
On Mon, Jul 26, 2010 at 6:36 PM, Yeb Havinga<yebhavinga@gmail.com> wrote:
I wasn't entirely clear. My suggestion was to have only
acknowledge_commit = {no|recv|fsync|replay}
instead of
replication_mode = {async|recv|fsync|replay}
Okay, I'll change the patch accordingly.
For what it's worth, I think replication_mode is a lot clearer.
Acknowledge_commit sounds like it would do something similar to
asynchronous_commit.
I agree.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise Postgres Company
On Thu, Jul 22, 2010 at 10:37:12AM +0200, Yeb Havinga wrote:
Fujii Masao wrote:
Initially I also expected the quorum to behave like described by
Aidan/option 2. Also, IMHO the name "quorum" is a bit short, like having
"maximum" but not saying a max_something.
quorum_min_sync_standbys
quorum_max_sync_standbys
Perhaps I'm hijacking the wrong thread for this, but I wonder if the quorum
idea is really the best thing for us. I've been thinking about Oracle's way of
doing things[1]. In short, there are three different modes: availability,
performance, and protection. "Protection" appears to mean that at least one
standby has applied the log; "availability" means at least one standby has
received the log info (it doesn't specify whether that info has been fsynced
or applied, but presumably does not mean "applied", since it's distinct from
"protection" mode); "performance" means replication is asynchronous. I'm not
sure this method is perfect, but it might be simpler than the quorum behavior
that has been considered, and adequate for actual use cases.
[1]: http://download.oracle.com/docs/cd/B28359_01/server.111/b28294/protection.htm#SBYDB02000 alternatively, http://is.gd/dLkq4
--
Joshua Tolley / eggyknap
End Point Corporation
http://www.endpoint.com
On Tue, Jul 27, 2010 at 12:36 PM, Joshua Tolley <eggyknap@gmail.com> wrote:
Perhaps I'm hijacking the wrong thread for this, but I wonder if the quorum
idea is really the best thing for us. I've been thinking about Oracle's way of
doing things[1]. In short, there are three different modes: availability,
performance, and protection. "Protection" appears to mean that at least one
standby has applied the log; "availability" means at least one standby has
received the log info (it doesn't specify whether that info has been fsynced
or applied, but presumably does not mean "applied", since it's distinct from
"protection" mode); "performance" means replication is asynchronous. I'm not
sure this method is perfect, but it might be simpler than the quorum behavior
that has been considered, and adequate for actual use cases.
In my case, I'd like to set up one synchronous standby on the near rack for
high-availability, and one asynchronous standby on the remote site for disaster
recovery. Can Oracle's way cover the case?
"availability" mode with two standbys might create a sort of similar situation.
That is, since the ACK from the near standby arrives first, the near standby
acts as synchronous and the remote one as asynchronous. But the ACK from the
remote standby can arrive first, so it's not guaranteed that the near standby
has received the log info before transaction commit returns a "success" to the
client. In this case, we have to failover to the remote standby even if it's not
under control of a clusterware. This is a problem for me.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Mon, Jul 26, 2010 at 8:25 PM, Robert Haas <robertmhaas@gmail.com> wrote:
On Mon, Jul 26, 2010 at 6:48 AM, Marko Tiikkaja
<marko.tiikkaja@cs.helsinki.fi> wrote:
On 7/26/10 1:44 PM +0300, Fujii Masao wrote:
On Mon, Jul 26, 2010 at 6:36 PM, Yeb Havinga<yebhavinga@gmail.com> wrote:
I wasn't entirely clear. My suggestion was to have only
acknowledge_commit = {no|recv|fsync|replay}
instead of
replication_mode = {async|recv|fsync|replay}
Okay, I'll change the patch accordingly.
For what it's worth, I think replication_mode is a lot clearer.
Acknowledge_commit sounds like it would do something similar to
asynchronous_commit.
I agree.
As the result of the vote, I'll leave the parameter "replication_mode"
as it is.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Wed, Jul 21, 2010 at 4:36 PM, Fujii Masao <masao.fujii@gmail.com> wrote:
I was actually hoping to see a patch for these things first, before any of
the synchronous replication stuff. Eliminating the polling loops is
important, latency will be laughable otherwise, and it will help the
synchronous case too.
At first, note that the poll loop in the backend and walreceiver doesn't
exist without synchronous replication stuff.
Yeah, I'll start with the change of the poll loop in the walsender. I'm
thinking that we should make the backend signal the walsender to send the
outstanding WAL immediately as the previous synchronous replication patch
I submitted in the past year did. I use the signal here because walsender
needs to wait for the request from the backend and the ack message from
the standby *concurrently* in synchronous replication. If we use the
semaphore instead of the signal, the walsender would not be able to
respond to the ack immediately, which also degrades the performance.
The problem of this idea is that signal can be sent per transaction commit.
I'm not sure if this frequent signaling really harms the performance of
replication. BTW, when I benchmarked the previous synchronous replication
patch based on the idea, AFAIR the result showed no impact of the
signaling. But... Thoughts? Do you have another better idea?
The attached patch changes the backend so that it signals walsender to
wake up from the sleep and send WAL immediately. It doesn't include any
other synchronous replication stuff.
The signal is sent right after a COMMIT, PREPARE TRANSACTION,
COMMIT PREPARED or ABORT PREPARED record has been fsync'd.
To suppress redundant signaling, I added the flag which indicates whether
walsender is ready for sending WAL up to the currently-fsync'd location.
Only when the flag is false, the backend sets it to true and sends the
signal to walsender. When the flag is true, the signal doesn't need to be
sent. The flag is set to false right before walsender sends WAL.
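The flag handshake described above can be looked at in isolation. What follows is only a minimal single-process sketch, not code from the patch: SenderSlot, backend_request_send() and sender_begin_send() are hypothetical names, and a counter stands in for the real kill(pid, SIGUSR1) call.

```c
#include <stdbool.h>

/* Hypothetical stand-in for one walsender's shared-memory slot. */
typedef struct SenderSlot
{
    bool send_requested;    /* plays the role of sndrqst in the patch */
    int  signals_delivered; /* counts what would be kill(pid, SIGUSR1) calls */
} SenderSlot;

/* Backend side: called after a commit-type record has been flushed. */
static void
backend_request_send(SenderSlot *slot)
{
    /* Signal only if no request is already pending. */
    if (!slot->send_requested)
    {
        slot->send_requested = true;
        slot->signals_delivered++;
    }
}

/* Walsender side: called right before it starts sending WAL. */
static void
sender_begin_send(SenderSlot *slot)
{
    slot->send_requested = false;
}
```

In this model, three commits that arrive while the sender is still asleep cost one signal rather than three. The next commit after sender_begin_send() signals again.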
The code is also available in my git repository:
git://git.postgresql.org/git/users/fujii/postgres.git
branch: wakeup-walsnd
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
change_poll_loop_in_walsender_0727.patch (application/octet-stream)
*** a/src/backend/access/transam/twophase.c
--- b/src/backend/access/transam/twophase.c
***************
*** 55,60 ****
--- 55,61 ----
#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
+ #include "replication/walsender.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
***************
*** 1025,1030 **** EndPrepare(GlobalTransaction gxact)
--- 1026,1038 ----
/* If we crash now, we have prepared: WAL replay will fix things */
+ /*
+ * Wake up all walsenders to send WAL up to the PREPARE record
+ * immediately if replication is enabled
+ */
+ if (max_wal_senders > 0)
+ WalSndWakeup();
+
/* write correct CRC and close file */
if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
{
***************
*** 2005,2010 **** RecordTransactionCommitPrepared(TransactionId xid,
--- 2013,2025 ----
/* Flush XLOG to disk */
XLogFlush(recptr);
+ /*
+ * Wake up all walsenders to send WAL up to the COMMIT PREPARED record
+ * immediately if replication is enabled
+ */
+ if (max_wal_senders > 0)
+ WalSndWakeup();
+
/* Mark the transaction committed in pg_clog */
TransactionIdCommitTree(xid, nchildren, children);
***************
*** 2078,2083 **** RecordTransactionAbortPrepared(TransactionId xid,
--- 2093,2105 ----
XLogFlush(recptr);
/*
+ * Wake up all walsenders to send WAL up to the ABORT PREPARED record
+ * immediately if replication is enabled
+ */
+ if (max_wal_senders > 0)
+ WalSndWakeup();
+
+ /*
* Mark the transaction aborted in clog. This is not absolutely necessary
* but we may as well do it while we are here.
*/
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 36,41 ****
--- 36,42 ----
#include "libpq/be-fsstubs.h"
#include "miscadmin.h"
#include "pgstat.h"
+ #include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
***************
*** 1068,1073 **** RecordTransactionCommit(void)
--- 1069,1081 ----
XLogFlush(XactLastRecEnd);
/*
+ * Wake up all walsenders to send WAL up to the COMMIT record
+ * immediately if replication is enabled
+ */
+ if (max_wal_senders > 0)
+ WalSndWakeup();
+
+ /*
* Now we may update the CLOG, if we wrote a COMMIT record above
*/
if (markXidCommitted)
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 87,98 **** static XLogRecPtr sentPtr = {0, 0};
--- 87,100 ----
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t shutdown_requested = false;
+ static volatile sig_atomic_t xlogsend_requested = false;
static volatile sig_atomic_t ready_to_stop = false;
/* Signal handlers */
static void WalSndSigHupHandler(SIGNAL_ARGS);
static void WalSndShutdownHandler(SIGNAL_ARGS);
static void WalSndQuickDieHandler(SIGNAL_ARGS);
+ static void WalSndXLogSendHandler(SIGNAL_ARGS);
static void WalSndLastCycleHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
***************
*** 418,423 **** WalSndLoop(void)
--- 420,428 ----
proc_exit(0);
}
+ if (xlogsend_requested)
+ xlogsend_requested = false;
+
/*
* If we had sent all accumulated WAL in last round, nap for the
* configured time before retrying.
***************
*** 433,439 **** WalSndLoop(void)
while (remain > 0)
{
/* Check for interrupts */
! if (got_SIGHUP || shutdown_requested || ready_to_stop)
break;
/* Sleep and check that the connection is still alive */
--- 438,445 ----
while (remain > 0)
{
/* Check for interrupts */
! if (got_SIGHUP || shutdown_requested || xlogsend_requested ||
! ready_to_stop)
break;
/* Sleep and check that the connection is still alive */
***************
*** 665,670 **** XLogSend(char *msgbuf, bool *caughtup)
--- 671,678 ----
Size nbytes;
WalDataMessageHeader msghdr;
+ MyWalSnd->sndrqst = false;
+
/*
* Attempt to send all data that's already been written out and fsync'd to
* disk. We cannot go further than what's been written out given the
***************
*** 678,684 **** XLogSend(char *msgbuf, bool *caughtup)
/* Quick exit if nothing to do */
if (XLByteLE(SendRqstPtr, sentPtr))
{
! *caughtup = true;
return true;
}
--- 686,692 ----
/* Quick exit if nothing to do */
if (XLByteLE(SendRqstPtr, sentPtr))
{
! *caughtup = !MyWalSnd->sndrqst;
return true;
}
***************
*** 718,730 **** XLogSend(char *msgbuf, bool *caughtup)
if (XLByteLE(SendRqstPtr, endptr))
{
endptr = SendRqstPtr;
! *caughtup = true;
}
else
{
/* round down to page boundary. */
endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
*caughtup = false;
}
nbytes = endptr.xrecoff - startptr.xrecoff;
--- 726,739 ----
if (XLByteLE(SendRqstPtr, endptr))
{
endptr = SendRqstPtr;
! *caughtup = !MyWalSnd->sndrqst;
}
else
{
/* round down to page boundary. */
endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
*caughtup = false;
+ MyWalSnd->sndrqst = true;
}
nbytes = endptr.xrecoff - startptr.xrecoff;
***************
*** 828,833 **** WalSndQuickDieHandler(SIGNAL_ARGS)
--- 837,849 ----
exit(2);
}
+ /* SIGUSR1: set flag to send WAL records */
+ static void
+ WalSndXLogSendHandler(SIGNAL_ARGS)
+ {
+ xlogsend_requested = true;
+ }
+
/* SIGUSR2: set flag to do a last cycle and shut down afterwards */
static void
WalSndLastCycleHandler(SIGNAL_ARGS)
***************
*** 847,853 **** WalSndSignals(void)
pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
! pqsignal(SIGUSR1, SIG_IGN); /* not used */
pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
* shutdown */
--- 863,869 ----
pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
! pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
* shutdown */
***************
*** 895,900 **** WalSndShmemInit(void)
--- 911,937 ----
}
}
+ /* Wake all walsenders up by signaling */
+ void
+ WalSndWakeup(void)
+ {
+ int i;
+
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ pid_t pid = walsnd->pid;
+
+ /* we don't need to signal to waking walsenders */
+ if (pid != 0 && !walsnd->sndrqst)
+ {
+ walsnd->sndrqst = true;
+ kill(pid, SIGUSR1);
+ }
+ }
+ }
+
/*
* This isn't currently used for anything. Monitoring tools might be
* interested in the future, and we'll need something like this in the
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 12,17 ****
--- 12,21 ----
#ifndef _WALSENDER_H
#define _WALSENDER_H
+ #include "postgres.h"
+
+ #include <signal.h>
+
#include "access/xlog.h"
#include "storage/spin.h"
***************
*** 23,28 **** typedef struct WalSnd
--- 27,42 ----
pid_t pid; /* this walsender's process id, or 0 */
XLogRecPtr sentPtr; /* WAL has been sent up to this point */
+ /*
+ * The request for WAL sending has already been sent? If false,
+ * we set sndrqst to true and signal walsender to send WAL up to
+ * the current WAL write location immediately. If true, since
+ * walsender is ready for sending all the currently-written WAL,
+ * we don't need to signal. This value is used to suppress
+ * redundant signaling.
+ */
+ sig_atomic_t sndrqst;
+
slock_t mutex; /* locks shared variables shown above */
} WalSnd;
***************
*** 45,49 **** extern int WalSenderMain(void);
--- 59,64 ----
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
+ extern void WalSndWakeup(void);
#endif /* _WALSENDER_H */
Fujii Masao wrote:
On Mon, Jul 26, 2010 at 8:25 PM, Robert Haas <robertmhaas@gmail.com> wrote:
On Mon, Jul 26, 2010 at 6:48 AM, Marko Tiikkaja
<marko.tiikkaja@cs.helsinki.fi> wrote:
On 7/26/10 1:44 PM +0300, Fujii Masao wrote:
On Mon, Jul 26, 2010 at 6:36 PM, Yeb Havinga<yebhavinga@gmail.com> wrote:
I wasn't entirely clear. My suggestion was to have only
acknowledge_commit = {no|recv|fsync|replay}
instead of
replication_mode = {async|recv|fsync|replay}
Okay, I'll change the patch accordingly.
For what it's worth, I think replication_mode is a lot clearer.
Acknowledge_commit sounds like it would do something similar to
asynchronous_commit.
I agree.
As the result of the vote, I'll leave the parameter "replication_mode"
as it is.
I'd like to bring forward another suggestion (please tell me when it is
becoming spam). My feeling about replication_mode as is, is that it says
in the same parameter something about async or sync, as well as, if
sync, which method of feedback to the master. OTOH having two parameters
would need documentation that the feedback method may only be set if the
replication_mode was sync, as well as checks. So it is actually good to
have it all in one parameter.
But somehow the shoe pinches, because async feels different from the
other three parameters. There is a way to move async out of the enumeration:
synchronous_replication_mode = off | recv | fsync | replay
This also looks a bit like the "synchronous_replication = N # similar in
name to synchronous_commit" Simon Riggs proposed in
http://archives.postgresql.org/pgsql-hackers/2010-05/msg01418.php
regards,
Yeb Havinga
PS: Please bear with me, I thought a bit about a way to make clear what
deduction users must make when figuring out if the replication mode is
synchronous. That question might be important when counting 'which
servers are the synchronous standbys' to debug quorum settings.
replication_mode
from the assumption !async -> sync
and !async -> recv|fsync|replay
to infer recv|fsync|replay -> synchronous_replication.
synchronous_replication_mode
from the assumption !off -> on
and !off -> recv|fsync|replay
to infer recv|fsync|replay -> synchronous_replication.
I think the last one is easier for humans to make, since everybody will make
the !off -> on assumption, but not the !async -> sync one without having it
verified in the documentation.
Joshua Tolley wrote:
Perhaps I'm hijacking the wrong thread for this, but I wonder if the quorum
idea is really the best thing for us.
For reference: it appeared in a long thread a while ago
http://archives.postgresql.org/pgsql-hackers/2010-05/msg01226.php.
In short, there are three different modes: availability,
performance, and protection. "Protection" appears to mean that at least one
standby has applied the log; "availability" means at least one standby has
received the log info
Maybe we could do both, by describing use cases along the availability,
performance and protection setups in the documentation and how they
would be reflected with the standby related parameters.
regards,
Yeb Havinga
Fujii Masao wrote:
The attached patch changes the backend so that it signals walsender to
wake up from the sleep and send WAL immediately. It doesn't include any
other synchronous replication stuff.
Hello Fujii,
I noted the changes in XLogSend() where instead of *caughtup = true/false
it now returns !MyWalSnd->sndrqst. That value is initialized to false in
that procedure and it cannot be changed to true during execution of that
procedure, or can it?
regards,
Yeb Havinga
On Tue, Jul 27, 2010 at 7:39 PM, Yeb Havinga <yebhavinga@gmail.com> wrote:
Fujii Masao wrote:
The attached patch changes the backend so that it signals walsender to
wake up from the sleep and send WAL immediately. It doesn't include any
other synchronous replication stuff.
Hello Fujii,
Thanks for the review!
I noted the changes in XLogSend() where instead of *caughtup = true/false it
now returns !MyWalSnd->sndrqst. That value is initialized to false in that
procedure and it cannot be changed to true during execution of that
procedure, or can it?
That value is set to true in WalSndWakeup(). If WalSndWakeup() is called
after initialization of that value in XLogSend(), *caughtup is set to false.
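To make that interleaving concrete, here is a small model of it. It is a sketch under stated assumptions rather than the patch's code: Sender, send_available_wal() and wakeup_during_send() are hypothetical names, WAL positions are plain integers, and a callback stands in for a backend calling WalSndWakeup() while the send is in progress.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified view of a walsender's state. */
typedef struct Sender
{
    bool send_requested; /* plays the role of sndrqst */
    long sent_upto;      /* WAL position sent so far (plain integer here) */
} Sender;

/* Hook that models a backend committing while the sender is busy. */
typedef void (*RaceHook)(Sender *);

/* Returns the value that would be stored in *caughtup. */
static bool
send_available_wal(Sender *s, long flushed_upto, RaceHook concurrent_commit)
{
    s->send_requested = false;       /* reset on entry, as in XLogSend() */

    if (concurrent_commit != NULL)
        concurrent_commit(s);        /* a commit lands after the reset */

    if (flushed_upto > s->sent_upto)
        s->sent_upto = flushed_upto; /* "send" what was known at entry */

    return !s->send_requested;       /* not caught up if a request arrived */
}

/* Models WalSndWakeup() setting the flag from another process. */
static void
wakeup_during_send(Sender *s)
{
    s->send_requested = true;
}
```

Without the hook the function reports caught up. With it, the result is false, so the sender would loop again instead of napping.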
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Tue, Jul 27, 2010 at 5:42 PM, Yeb Havinga <yebhavinga@gmail.com> wrote:
I'd like to bring forward another suggestion (please tell me when it is
becoming spam). My feeling about replication_mode as is, is that it says in
the same parameter something about async or sync, as well as, if sync, which
method of feedback to the master. OTOH having two parameters would need
documentation that the feedback method may only be set if the
replication_mode was sync, as well as checks. So it is actually good to have
it all in one parameter.
But somehow the shoe pinches, because async feels different from the other
three parameters. There is a way to move async out of the enumeration:
synchronous_replication_mode = off | recv | fsync | replay
ISTM that we need to get more feedback from users to determine which
is the best. So, how about leaving the parameter as it is and revisiting
this topic later? Since it's not difficult to change the parameter later,
we will not regret even if we delay that determination.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Fujii Masao wrote:
I noted the changes in XLogSend() where instead of *caughtup = true/false it
now returns !MyWalSnd->sndrqst. That value is initialized to false in that
procedure and it cannot be changed to true during execution of that
procedure, or can it?
That value is set to true in WalSndWakeup(). If WalSndWakeup() is called
after initialization of that value in XLogSend(), *caughtup is set to false.
Ah, so it can be changed by another backend process.
Another question:
Is there a reason not to send the signal in XLogFlush() itself, so it
would be called at
CreateCheckPoint(), EndPrepare(), FlushBuffer(),
RecordTransactionAbortPrepared(), RecordTransactionCommit(),
RecordTransactionCommitPrepared(), RelationTruncate(),
SlruPhysicalWritePage(), write_relmap_file(), WriteTruncateXlogRec(),
and xact_redo_commit()?
regards,
Yeb Havinga
On Tue, Jul 27, 2010 at 01:41:10PM +0900, Fujii Masao wrote:
On Tue, Jul 27, 2010 at 12:36 PM, Joshua Tolley <eggyknap@gmail.com> wrote:
Perhaps I'm hijacking the wrong thread for this, but I wonder if the quorum
idea is really the best thing for us. I've been thinking about Oracle's way of
doing things[1]. In short, there are three different modes: availability,
performance, and protection. "Protection" appears to mean that at least one
standby has applied the log; "availability" means at least one standby has
received the log info (it doesn't specify whether that info has been fsynced
or applied, but presumably does not mean "applied", since it's distinct from
"protection" mode); "performance" means replication is asynchronous. I'm not
sure this method is perfect, but it might be simpler than the quorum behavior
that has been considered, and adequate for actual use cases.
In my case, I'd like to set up one synchronous standby on the near rack for
high-availability, and one asynchronous standby on the remote site for disaster
recovery. Can Oracle's way cover the case?
I don't think it can support the case you're interested in, though I'm not
terribly expert on it. I'm definitely not arguing for the syntax Oracle uses,
or something similar; I much prefer the flexibility we're proposing, and agree
with Yeb Havinga in another email who suggests we spell out in documentation
some recipes for achieving various possible scenarios given whatever GUCs we
settle on.
"availability" mode with two standbys might create a sort of similar situation.
That is, since the ACK from the near standby arrives first, the near standby
acts as synchronous and the remote one as asynchronous. But the ACK from the
remote standby can arrive first, so it's not guaranteed that the near standby
has received the log info before transaction commit returns a "success" to the
client. In this case, we have to failover to the remote standby even if it's not
under control of a clusterware. This is a problem for me.
My concern is that in a quorum system, if the quorum number is less than the
total number of replicas, there's no way to know *which* replicas composed the
quorum for any given transaction, so we can't know which servers to fail to if
the master dies. This isn't different from Oracle, where it looks like
essentially the "quorum" value is always 1. Your scenario shows that all
replicas are not created equal, and that sometimes we'll be interested in WAL
getting committed on a specific subset of the available servers. If I had two
nearby replicas called X and Y, and one at a remote site called Z, for
instance, I'd set quorum to 2, but really I'd want to say "wait for server X
and Y before committing, but don't worry about Z".
I have no idea how to set up our GUCs to encode a situation like that :)
--
Joshua Tolley / eggyknap
End Point Corporation
http://www.endpoint.com
On Tue, Jul 27, 2010 at 8:48 PM, Yeb Havinga <yebhavinga@gmail.com> wrote:
Is there a reason not to send the signal in XLogFlush() itself, so it would be
called at
CreateCheckPoint(), EndPrepare(), FlushBuffer(),
RecordTransactionAbortPrepared(), RecordTransactionCommit(),
RecordTransactionCommitPrepared(), RelationTruncate(),
SlruPhysicalWritePage(), write_relmap_file(), WriteTruncateXlogRec(), and
xact_redo_commit().
Yes, it's because WAL needs to be sent immediately only from the
following functions:
* EndPrepare()
* RecordTransactionAbortPrepared()
* RecordTransactionCommit()
* RecordTransactionCommitPrepared()
The other functions call XLogFlush() only to follow the basic WAL rule. The
standby enforces that rule by itself: WAL records are always flushed to disk
there prior to any corresponding data-file change. So, we don't need to
replicate the result of those XLogFlush() calls immediately for the WAL rule.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Tue, Jul 27, 2010 at 10:12 PM, Joshua Tolley <eggyknap@gmail.com> wrote:
I don't think it can support the case you're interested in, though I'm not
terribly expert on it. I'm definitely not arguing for the syntax Oracle uses,
or something similar; I much prefer the flexibility we're proposing, and agree
with Yeb Havinga in another email who suggests we spell out in documentation
some recipes for achieving various possible scenarios given whatever GUCs we
settle on.
Agreed. I'll add it to my TODO list.
My concern is that in a quorum system, if the quorum number is less than the
total number of replicas, there's no way to know *which* replicas composed the
quorum for any given transaction, so we can't know which servers to fail to if
the master dies.
What about checking the current WAL receive location of each standby by
using pg_last_xlog_receive_location()? The standby which has the newest
location should be failed over to.
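The comparison itself is simple once each standby's pg_last_xlog_receive_location() result has been collected. As an illustration only, the sketch below assumes the values arrive as text in the usual "xlogid/xrecoff" hexadecimal form (for example "0/3000060"); how they are gathered from the standbys is left out, and WalLocation, parse_wal_location(), wal_location_cmp() and most_advanced_standby() are hypothetical helper names.

```c
#include <stdint.h>
#include <stdio.h>

/* Two-part WAL location, as printed in "xlogid/xrecoff" hex form. */
typedef struct WalLocation
{
    uint32_t xlogid;
    uint32_t xrecoff;
} WalLocation;

/* Parse text such as "0/3000060"; returns 0 on success, -1 on bad input. */
static int
parse_wal_location(const char *text, WalLocation *out)
{
    unsigned int hi;
    unsigned int lo;

    if (sscanf(text, "%X/%X", &hi, &lo) != 2)
        return -1;
    out->xlogid = hi;
    out->xrecoff = lo;
    return 0;
}

/* Negative, zero or positive as a is older than, equal to or newer than b. */
static int
wal_location_cmp(WalLocation a, WalLocation b)
{
    if (a.xlogid != b.xlogid)
        return a.xlogid < b.xlogid ? -1 : 1;
    if (a.xrecoff != b.xrecoff)
        return a.xrecoff < b.xrecoff ? -1 : 1;
    return 0;
}

/*
 * Return the index of the standby with the newest receive location, or -1
 * if no entry could be parsed. Ties go to the earliest index.
 */
static int
most_advanced_standby(const char *const locations[], int n)
{
    int         best = -1;
    WalLocation best_loc = {0, 0};
    int         i;

    for (i = 0; i < n; i++)
    {
        WalLocation loc;

        if (parse_wal_location(locations[i], &loc) != 0)
            continue;           /* skip entries that cannot be parsed */
        if (best < 0 || wal_location_cmp(loc, best_loc) > 0)
        {
            best = i;
            best_loc = loc;
        }
    }
    return best;
}
```

Note that xlogid is compared first, so "1/0" is newer than "0/FFFFFFFF".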
This isn't different from Oracle, where it looks like
essentially the "quorum" value is always 1. Your scenario shows that all
replicas are not created equal, and that sometimes we'll be interested in WAL
getting committed on a specific subset of the available servers. If I had two
nearby replicas called X and Y, and one at a remote site called Z, for
instance, I'd set quorum to 2, but really I'd want to say "wait for server X
and Y before committing, but don't worry about Z".
I have no idea how to set up our GUCs to encode a situation like that :)
Yeah, quorum commit alone cannot cover that situation. I think that
current approach (i.e., quorum commit plus replication mode per standby)
would cover that. In your example, you can choose "recv", "fsync" or
"replay" as replication_mode in X and Y, and choose "async" in Z.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Tue, Jul 27, 2010 at 10:53:45PM +0900, Fujii Masao wrote:
On Tue, Jul 27, 2010 at 10:12 PM, Joshua Tolley <eggyknap@gmail.com> wrote:
My concern is that in a quorum system, if the quorum number is less than the
total number of replicas, there's no way to know *which* replicas composed the
quorum for any given transaction, so we can't know which servers to fail to if
the master dies.
What about checking the current WAL receive location of each standby by
using pg_last_xlog_receive_location()? The standby which has the newest
location should be failed over to.
That makes sense. Thanks.
This isn't different from Oracle, where it looks like
essentially the "quorum" value is always 1. Your scenario shows that all
replicas are not created equal, and that sometimes we'll be interested in WAL
getting committed on a specific subset of the available servers. If I had two
nearby replicas called X and Y, and one at a remote site called Z, for
instance, I'd set quorum to 2, but really I'd want to say "wait for server X
and Y before committing, but don't worry about Z".
I have no idea how to set up our GUCs to encode a situation like that :)
Yeah, quorum commit alone cannot cover that situation. I think that
current approach (i.e., quorum commit plus replication mode per standby)
would cover that. In your example, you can choose "recv", "fsync" or
"replay" as replication_mode in X and Y, and choose "async" in Z.
Clearly I need to read through the GUCs and docs better. I'll try to keep
quiet until that's finished :)
--
Joshua Tolley / eggyknap
End Point Corporation
http://www.endpoint.com
Le 27 juil. 2010 à 15:12, Joshua Tolley <eggyknap@gmail.com> a écrit :
My concern is that in a quorum system, if the quorum number is less than the
total number of replicas, there's no way to know *which* replicas composed the
quorum for any given transaction, so we can't know which servers to fail to if
the master dies. This isn't different from Oracle, where it looks like
essentially the "quorum" value is always 1. Your scenario shows that all
replicas are not created equal, and that sometimes we'll be interested in WAL
getting committed on a specific subset of the available servers. If I had two
nearby replicas called X and Y, and one at a remote site called Z, for
instance, I'd set quorum to 2, but really I'd want to say "wait for server X
and Y before committing, but don't worry about Z".
I have no idea how to set up our GUCs to encode a situation like that :)
You make it so that Z does not take a vote, by setting it async.
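The way quorum and the per-standby replication_mode combine can be sketched as one predicate. This is a rough model of the behaviour described at the start of the thread, not the patch's implementation: the type and function names are hypothetical, and "acked" is assumed to mean that the standby has confirmed the commit at the level its own mode asks for.

```c
#include <stdbool.h>

/* Hypothetical mirror of the replication_mode values. */
typedef enum ReplicationMode
{
    MODE_ASYNC,
    MODE_RECV,
    MODE_FSYNC,
    MODE_REPLAY
} ReplicationMode;

typedef struct Standby
{
    ReplicationMode mode;
    bool            acked; /* ACK received at the level "mode" requires */
} Standby;

/* May the commit return "success" to the client yet? */
static bool
commit_may_return(const Standby *standbys, int n, int quorum)
{
    int sync_standbys = 0;
    int acks = 0;
    int needed;
    int i;

    for (i = 0; i < n; i++)
    {
        if (standbys[i].mode == MODE_ASYNC)
            continue;           /* async standbys never take a vote */
        sync_standbys++;
        if (standbys[i].acked)
            acks++;
    }

    /* A quorum above the number of sync standbys means "wait for all". */
    needed = quorum < sync_standbys ? quorum : sync_standbys;
    return acks >= needed;
}
```

With X and Y in a synchronous mode, Z set to async and quorum = 2, an ACK from Z alone never releases the commit, while ACKs from X and Y do. With quorum = 0 the commit never waits.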
Regards,
--
dim
On 27/07/10 16:12, Joshua Tolley wrote:
My concern is that in a quorum system, if the quorum number is less than the
total number of replicas, there's no way to know *which* replicas composed the
quorum for any given transaction, so we can't know which servers to fail to if
the master dies.
In fact, it's possible for one standby to sync up to X, then disconnect
and reconnect, and have the master count it a second time in the quorum.
Especially if the master doesn't notice that the standby disconnected,
e.g. because of a network problem.
I don't think any of this quorum stuff makes much sense without
explicitly registering standbys in the master.
That would also solve the fuzziness with wal_keep_segments - if the
master knew what standbys exist, it could keep track of how far each
standby has received WAL, and keep just enough WAL for each standby to
catch up.
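As a rough illustration of that idea: if the master kept a per-standby record of how far each registered standby has received WAL, the oldest position it must retain is just the minimum over those records. The names below (RegisteredStandby, oldest_needed_wal()) are hypothetical, and WAL positions are simplified to 64-bit integers.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical per-standby bookkeeping kept on the master. */
typedef struct RegisteredStandby
{
    const char *name;
    uint64_t    received_upto; /* WAL position this standby has received */
} RegisteredStandby;

/*
 * Oldest WAL position that some registered standby still needs. With no
 * standbys registered, nothing older than the current position is needed.
 */
static uint64_t
oldest_needed_wal(const RegisteredStandby *standbys, size_t n,
                  uint64_t current_pos)
{
    uint64_t oldest = current_pos;
    size_t   i;

    for (i = 0; i < n; i++)
    {
        if (standbys[i].received_upto < oldest)
            oldest = standbys[i].received_upto;
    }
    return oldest;
}
```

Everything before the returned position could be recycled, instead of keeping a fixed wal_keep_segments worth of WAL.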
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
On Sun, Aug 1, 2010 at 7:11 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
In fact, it's possible for one standby to sync up to X, then disconnect and
reconnect, and have the master count it a second time in the quorum.
Especially if the master doesn't notice that the standby disconnected, e.g.
because of a network problem.
I don't think any of this quorum stuff makes much sense without explicitly
registering standbys in the master.
This doesn't have to be done manually. The streaming protocol could
include the standby sending its system id to the master. The master
could just keep a list of system ids with the last record they've been
sent and the last they've confirmed receipt, fsync, application,
whatever the protocol covers. If the same system reconnects it just
overwrites the existing data for that system id.
--
greg
On Sun, Aug 1, 2010 at 8:30 AM, Greg Stark <gsstark@mit.edu> wrote:
On Sun, Aug 1, 2010 at 7:11 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
In fact, it's possible for one standby to sync up to X, then disconnect and
reconnect, and have the master count it a second time in the quorum.
Especially if the master doesn't notice that the standby disconnected, e.g.
because of a network problem.
I don't think any of this quorum stuff makes much sense without explicitly
registering standbys in the master.
This doesn't have to be done manually. The streaming protocol could
include the standby sending its system id to the master. The master
could just keep a list of system ids with the last record they've been
sent and the last they've confirmed receipt, fsync, application,
whatever the protocol covers. If the same system reconnects it just
overwrites the existing data for that system id.
That seems entirely too clever. Where are you going to store this
data? What if you want to clean out the list?
I've felt from the beginning that the idea of doing synchronous
replication without having an explicit notion of what standbys are out
there was not on very sound footing, and I think the difficulties of
making quorum commit work properly are only further evidence of that.
Much has been made of the notion of "wait for N votes, but allow
standbys to explicitly give up their vote", but that's still not fully
general - for example, you can't implement A && (B || C).
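That claim can be checked by brute force for the scheme as discussed here. The sketch below assumes each standby's vote is either 0 or 1 and the commit returns once the acked votes reach quorum; it then tries every quorum from 0 to 3 with every vote assignment for standbys A, B and C. All names are hypothetical.

```c
#include <stdbool.h>

/* Desired policy: A must ack, plus at least one of B and C. */
static bool
policy_wanted(bool a, bool b, bool c)
{
    return a && (b || c);
}

/* Quorum scheme: commit returns once the acked votes reach quorum. */
static bool
policy_quorum(int quorum, const int votes[3], bool a, bool b, bool c)
{
    int acked = (a ? votes[0] : 0) + (b ? votes[1] : 0) + (c ? votes[2] : 0);

    return acked >= quorum;
}

/*
 * Count the (quorum, votes) settings that reproduce the wanted policy for
 * all 8 combinations of which standbys have acked.
 */
static int
count_matching_settings(void)
{
    int matches = 0;
    int quorum;
    int mask;
    int state;

    for (quorum = 0; quorum <= 3; quorum++)
    {
        for (mask = 0; mask < 8; mask++)
        {
            int  votes[3];
            bool same = true;

            votes[0] = (mask >> 0) & 1;
            votes[1] = (mask >> 1) & 1;
            votes[2] = (mask >> 2) & 1;

            for (state = 0; state < 8; state++)
            {
                bool a = (state & 1) != 0;
                bool b = (state & 2) != 0;
                bool c = (state & 4) != 0;

                if (policy_quorum(quorum, votes, a, b, c) !=
                    policy_wanted(a, b, c))
                    same = false;
            }
            if (same)
                matches++;
        }
    }
    return matches;
}
```

Under these assumptions no setting matches. Any setting that accepts A+B and A+C while rejecting A alone also accepts either B+C or B alone.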
Perhaps someone will claim that nobody wants to do that anyway (which
I don't believe, BTW), but even in simpler cases it would be nicer to
have an explicit policy rather than - in effect - inferring a policy
from a soup of GUC settings. For example, suppose you want one synchronous
standby (A) and two asynchronous standbys (B and C). You can say
quorum=1 on the master and then configure vote=1 on A and vote=0 on B
and C, but now you have to look at four machines to figure out what
the policy is, and a change on any one of those machines can break it.
ISTM that if you can just write synchronous_standbys=A on the master,
that's a whole lot more clear and less error-prone.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise Postgres Company
On Sun, Aug 1, 2010 at 9:30 PM, Greg Stark <gsstark@mit.edu> wrote:
This doesn't have to be done manually.
Agreed, if we register standbys in the master.
The streaming protocol could
include the standby sending its system id to the master. The master
could just keep a list of system ids with the last record they've been
sent and the last they've confirmed receipt, fsync, application,
whatever the protocol covers. If the same system reconnects it just
overwrites the existing data for that system id.
Since every standby has the same system id, we cannot distinguish
them by that id. ISTM that the master should assign the unique id
for each standby, and they should save it in pg_control.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Sun, Aug 1, 2010 at 10:08 PM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Sun, Aug 1, 2010 at 9:30 PM, Greg Stark <gsstark@mit.edu> wrote:
This doesn't have to be done manually.
Agreed, if we register standbys in the master.
The streaming protocol could
include the standby sending its system id to the master. The master
could just keep a list of system ids with the last record they've been
sent and the last they've confirmed receipt, fsync, application,
whatever the protocol covers. If the same system reconnects it just
overwrites the existing data for that system id.
Since every standby has the same system id, we cannot distinguish
them by that id. ISTM that the master should assign the unique id
for each standby, and they should save it in pg_control.
Another option might be to let the user name them.
standby_name='near'
standby_name='far1'
standby_name='far2'
...or whatever.
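As a rough illustration of how a master could track standbys by a user-chosen name, with a reconnect overwriting the earlier entry, here is a small C sketch. Everything in it (StandbyEntry, standby_connect, the fixed-size table, plain 64-bit positions instead of XLogRecPtr) is hypothetical and only meant to show the shape of the idea.

```c
#include <stdint.h>
#include <string.h>

#define MAX_STANDBYS 8
#define NAME_LEN     64

/* Hypothetical per-standby bookkeeping kept on the master. */
typedef struct
{
    char     name[NAME_LEN];  /* user-assigned, e.g. "near" or "far1" */
    uint64_t sent_lsn;        /* last WAL position sent */
    uint64_t flushed_lsn;     /* last WAL position confirmed flushed */
    int      in_use;
} StandbyEntry;

static StandbyEntry registry[MAX_STANDBYS];

/*
 * Called when a standby connects. If the name is already known, its entry
 * is overwritten; otherwise a free slot is claimed. Returns NULL if the
 * table is full or the name does not fit.
 */
static StandbyEntry *
standby_connect(const char *name, uint64_t start_lsn)
{
    StandbyEntry *slot = NULL;

    if (strlen(name) >= NAME_LEN)
        return NULL;

    for (int i = 0; i < MAX_STANDBYS; i++)
    {
        if (registry[i].in_use && strcmp(registry[i].name, name) == 0)
        {
            slot = &registry[i];    /* same standby reconnecting */
            break;
        }
        if (!registry[i].in_use && slot == NULL)
            slot = &registry[i];    /* remember the first free slot */
    }
    if (slot == NULL)
        return NULL;

    strcpy(slot->name, name);
    slot->sent_lsn = start_lsn;
    slot->flushed_lsn = start_lsn;
    slot->in_use = 1;
    return slot;
}
```

Because the key is the name rather than the system id, two standbys cloned from the same master get separate entries.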
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise Postgres Company
On Sun, Aug 1, 2010 at 3:11 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
I don't think any of this quorum stuff makes much sense without explicitly
registering standbys in the master.
I'm not sure if this is a good idea. It requires users to do more
manual work than before when setting up replication: assign a
unique name (or ID) to each standby, register them in the master,
specify the names in each recovery.conf (or elsewhere), and remove
the registration from the master when getting rid of the standby.
But this is similar to how MySQL replication is set up, so some
people (excluding me) may be familiar with it.
That would also solve the fuzziness with wal_keep_segments - if the master
knew what standbys exist, it could keep track of how far each standby has
received WAL, and keep just enough WAL for each standby to catch up.
What if the registered standby stays down for a long time?
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Sun, Aug 1, 2010 at 9:51 PM, Robert Haas <robertmhaas@gmail.com> wrote:
Perhaps someone will claim that nobody wants to do that anyway (which
I don't believe, BTW), but even in simpler cases it would be nicer to
have an explicit policy rather than - in effect - inferring a policy
from a soup of GUC settings. For example, if you want one synchronous
standby (A) and two asynchronous standbys (B and C). You can say
quorum=1 on the master and then configure vote=1 on A and vote=0 on B
and C, but now you have to look at four machines to figure out what
the policy is, and a change on any one of those machines can break it.
ISTM that if you can just write synchronous_standbys=A on the master,
that's a whole lot more clear and less error-prone.
Some standbys may become the master later by failover. So we would
need to write something like synchronous_standbys=A not only on the
current master but also on those standbys. Changing
synchronous_standbys would require a change on all those servers.
Or should the master replicate even that change to the standbys?
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Mon, Aug 2, 2010 at 5:02 AM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Sun, Aug 1, 2010 at 9:51 PM, Robert Haas <robertmhaas@gmail.com> wrote:
Perhaps someone will claim that nobody wants to do that anyway (which
I don't believe, BTW), but even in simpler cases it would be nicer to
have an explicit policy rather than - in effect - inferring a policy
from a soup of GUC settings. For example, if you want one synchronous
standby (A) and two asynchronous standbys (B and C). You can say
quorum=1 on the master and then configure vote=1 on A and vote=0 on B
and C, but now you have to look at four machines to figure out what
the policy is, and a change on any one of those machines can break it.
ISTM that if you can just write synchronous_standbys=A on the master,
that's a whole lot more clear and less error-prone.
Some standbys may become master later by failover. So we would
need to write something like synchronous_standbys=A on not only
current one master but also those standbys. Changing
synchronous_standbys would require change on all those servers.
Or the master should replicate even that change to the standbys?
Let's not get *the manner of specifying the policy* confused with *the
need to update the policy when the master changes*. It doesn't seem
likely you would want the same value for synchronous_standbys on all
your machines. In the most common configuration, you'd probably have:
on A: synchronous_standbys=B
on B: synchronous_standbys=A
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise Postgres Company
On Mon, Aug 2, 2010 at 7:53 PM, Robert Haas <robertmhaas@gmail.com> wrote:
Let's not get *the manner of specifying the policy* confused with *the
need to update the policy when the master changes*. It doesn't seem
likely you would want the same value for synchronous_standbys on all
your machines. In the most common configuration, you'd probably have:
on A: synchronous_standbys=B
on B: synchronous_standbys=A
Oh, true. But what if we have another synchronous standby called C?
Would we specify the policy as follows?
on A: synchronous_standbys=B,C
on B: synchronous_standbys=A,C
on C: synchronous_standbys=A,B
We would need to change the setting on both A and B when we want to
change the name of the third standby from C to D, for example. No?
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Mon, Aug 2, 2010 at 7:06 AM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Mon, Aug 2, 2010 at 7:53 PM, Robert Haas <robertmhaas@gmail.com> wrote:
Let's not get *the manner of specifying the policy* confused with *the
need to update the policy when the master changes*. It doesn't seem
likely you would want the same value for synchronous_standbys on all
your machines. In the most common configuration, you'd probably have:
on A: synchronous_standbys=B
on B: synchronous_standbys=A
Oh, true. But, what if we have another synchronous standby called C?
We specify the policy as follows?:
on A: synchronous_standbys=B,C
on B: synchronous_standbys=A,C
on C: synchronous_standbys=A,B
We would need to change the setting on both A and B when we want to
change the name of the third standby from C to D, for example. No?
Sure. If you give the standbys names, then if people change the
names, they'll have to update their configuration. But I can't see
that as an argument against doing it. You can remove the possibility
that someone will have a hassle if they rename a server by not
allowing them to give it a name in the first place, but that doesn't
seem like a win from a usability perspective.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise Postgres Company
On Mon, Aug 2, 2010 at 8:32 PM, Robert Haas <robertmhaas@gmail.com> wrote:
Sure. If you give the standbys names, then if people change the
names, they'll have to update their configuration. But I can't see
that as an argument against doing it. You can remove the possibility
that someone will have a hassle if they rename a server by not
allowing them to give it a name in the first place, but that doesn't
seem like a win from a usability perspective.
I'm just comparing your idea (i.e., set synchronous_standbys on
each possible master) with my idea (i.e., set replication_mode on
each standby). Though your idea has the advantage described in the
following post, it seems to make the setup of the standbys more
complicated, as I described. So I'm trying to come up with a better idea.
http://archives.postgresql.org/pgsql-hackers/2010-08/msg00007.php
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Fujii Masao wrote:
On Mon, Aug 2, 2010 at 7:53 PM, Robert Haas <robertmhaas@gmail.com> wrote:
Let's not get *the manner of specifying the policy* confused with *the
need to update the policy when the master changes*. It doesn't seem
likely you would want the same value for synchronous_standbys on all
your machines. In the most common configuration, you'd probably have:
on A: synchronous_standbys=B
on B: synchronous_standbys=A
Oh, true. But, what if we have another synchronous standby called C?
We specify the policy as follows?:
on A: synchronous_standbys=B,C
on B: synchronous_standbys=A,C
on C: synchronous_standbys=A,B
We would need to change the setting on both A and B when we want to
change the name of the third standby from C to D, for example. No?
What if the master is named as well in the 'pool of servers that are in
sync'? In the scenario above this pool would be A,B,C. The benefit of this
concept is that the same setting can be copied to all the other
servers, and it is invariant under any number of failures or
switchovers. The same could also hold for quorum expressions like A &&
(B || C), if A,B,C are either master or standby.
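A small C sketch of how such a shared pool setting could be interpreted on each node: every server carries the same list and simply removes its own name to find the peers it would wait for when it is the master. The function name peers_from_pool and the idea that a server knows its own name are assumptions for illustration; nothing like this exists in the patch.

```c
#include <stddef.h>
#include <string.h>

/*
 * Copy every name from the comma-separated "pool" except "self" into "out"
 * (again comma-separated). Returns the number of peers, or -1 if "out" is
 * too small. Names are compared exactly; whitespace is not trimmed.
 */
static int
peers_from_pool(const char *pool, const char *self, char *out, size_t outlen)
{
    int         count = 0;
    size_t      used = 0;
    size_t      selflen = strlen(self);
    const char *p = pool;

    if (outlen == 0)
        return -1;
    out[0] = '\0';

    while (*p != '\0')
    {
        size_t len = strcspn(p, ",");   /* length of the current name */
        int    is_self = (len == selflen && strncmp(p, self, len) == 0);

        if (len > 0 && !is_self)
        {
            size_t need = len + (count > 0 ? 1 : 0);

            if (used + need + 1 > outlen)
                return -1;
            if (count > 0)
                out[used++] = ',';
            memcpy(out + used, p, len);
            used += len;
            out[used] = '\0';
            count++;
        }
        p += len;
        if (*p == ',')
            p++;
    }
    return count;
}
```

With the pool "A,B,C", node A derives "B,C" and node B derives "A,C", so the same line can be copied to every server and still gives the right answer after a switchover.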
I initially thought that once the definitions could be the same on all
servers, having them in a system catalog would be a good thing. However,
that would probably be hard to set up, and in the case of failures during
a change of the parameters it could become very messy.
regards,
Yeb Havinga
On Mon, Aug 2, 2010 at 8:57 AM, Yeb Havinga <yebhavinga@gmail.com> wrote:
Fujii Masao wrote:
On Mon, Aug 2, 2010 at 7:53 PM, Robert Haas <robertmhaas@gmail.com> wrote:
Let's not get *the manner of specifying the policy* confused with *the
need to update the policy when the master changes*. It doesn't seem
likely you would want the same value for synchronous_standbys on all
your machines. In the most common configuration, you'd probably have:
on A: synchronous_standbys=B
on B: synchronous_standbys=A
Oh, true. But, what if we have another synchronous standby called C?
We specify the policy as follows?:
on A: synchronous_standbys=B,C
on B: synchronous_standbys=A,C
on C: synchronous_standbys=A,B
We would need to change the setting on both A and B when we want to
change the name of the third standby from C to D, for example. No?
What if the master is named as well in the 'pool of servers that are in
sync'? In the scenario above this pool would be A,B,C. Working with this
concept has as benefit that the setting can be copied to all other servers
as well, and is invariant under any number of failures or switchovers. The
same could also hold for quorum expressions like A && (B || C), if A,B,C are
either master or standby.
I initially thought that once the definitions could be the same on all
servers, having them in a system catalog would be a good thing. However,
that would probably be hard to set up, and in the case of failures during
a change of the parameters it could become very messy.
Yeah, I think this information has to be stored either in GUCs or in a
flat-file somewhere. Putting it in a system catalog will cause major
problems when trying to get a down system back up, I think.
I suspect that for complex setups, people will need to use some kind
of cluster-ware to update the settings as nodes go up and down. But I
think it will still be simpler if the nodes are named.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise Postgres Company
On 27/07/10 13:29, Fujii Masao wrote:
On Tue, Jul 27, 2010 at 7:39 PM, Yeb Havinga<yebhavinga@gmail.com> wrote:
Fujii Masao wrote:
I noted the changes in XlogSend where instead of *caughtup = true/false it
now returns !MyWalSnd->sndrqst. That value is initialized to false in that
procedure and it cannot be changed to true during execution of that
procedure, or can it?
That value is set to true in WalSndWakeup(). If WalSndWakeup() is called
after initialization of that value in XLogSend(), *caughtup is set to false.
There are some race conditions with the signaling. If another process
finishes XLOG flush and sends the signal when a walsender has just
finished one iteration of its main loop, walsender will reset
xlogsend_requested and go to sleep. It should not sleep but send the
pending WAL immediately.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
On 02/08/10 11:45, Fujii Masao wrote:
On Sun, Aug 1, 2010 at 3:11 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
I don't think any of this quorum stuff makes much sense without explicitly
registering standbys in the master.
I'm not sure if this is a good idea. This requires users to do more
manual operations than ever when setting up the replication; assign
unique name (or ID) to each standby, register them in the master,
specify the names in each recovery.conf (or elsewhere), and remove
the registration from the master when getting rid of the standby.
But this is similar to the way of MySQL replication setup, so some
people (excluding me) may be familiar with it.
That would also solve the fuzziness with wal_keep_segments - if the master
knew what standbys exist, it could keep track of how far each standby has
received WAL, and keep just enough WAL for each standby to catch up.
What if the registered standby stays down for a long time?
Then you risk running out of disk space. Similar to having an archive
command that fails for some reason.
That's one reason the registration should not be too automatic - there
are serious repercussions if the standby just disappears. If the standby
is a synchronous one, the master will stop committing or delay
acknowledging commits, depending on the configuration, and the master
needs to keep extra WAL around.
Of course, we can still support unregistered standbys, with the current
semantics.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
On Wed, Aug 4, 2010 at 12:35 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
There's some race conditions with the signaling. If another process finishes
XLOG flush and sends the signal when a walsender has just finished one
iteration of its main loop, walsender will reset xlogsend_requested and go
to sleep. It should not sleep but send the pending WAL immediately.
Yep. To avoid that race condition, xlogsend_requested should be reset to
false after sleep and before calling XLogSend(). I attached the updated
version of the patch.
Of course, the code is also available in my git repository:
git://git.postgresql.org/git/users/fujii/postgres.git
branch: wakeup-walsnd
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
change_poll_loop_in_walsender_0805.patch (application/octet-stream)
*** a/src/backend/access/transam/twophase.c
--- b/src/backend/access/transam/twophase.c
***************
*** 55,60 ****
--- 55,61 ----
#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
+ #include "replication/walsender.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
***************
*** 1025,1030 **** EndPrepare(GlobalTransaction gxact)
--- 1026,1038 ----
/* If we crash now, we have prepared: WAL replay will fix things */
+ /*
+ * Wake up all walsenders to send WAL up to the PREPARE record
+ * immediately if replication is enabled
+ */
+ if (max_wal_senders > 0)
+ WalSndWakeup();
+
/* write correct CRC and close file */
if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
{
***************
*** 2005,2010 **** RecordTransactionCommitPrepared(TransactionId xid,
--- 2013,2025 ----
/* Flush XLOG to disk */
XLogFlush(recptr);
+ /*
+ * Wake up all walsenders to send WAL up to the COMMIT PREPARED record
+ * immediately if replication is enabled
+ */
+ if (max_wal_senders > 0)
+ WalSndWakeup();
+
/* Mark the transaction committed in pg_clog */
TransactionIdCommitTree(xid, nchildren, children);
***************
*** 2078,2083 **** RecordTransactionAbortPrepared(TransactionId xid,
--- 2093,2105 ----
XLogFlush(recptr);
/*
+ * Wake up all walsenders to send WAL up to the ABORT PREPARED record
+ * immediately if replication is enabled
+ */
+ if (max_wal_senders > 0)
+ WalSndWakeup();
+
+ /*
* Mark the transaction aborted in clog. This is not absolutely necessary
* but we may as well do it while we are here.
*/
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 36,41 ****
--- 36,42 ----
#include "libpq/be-fsstubs.h"
#include "miscadmin.h"
#include "pgstat.h"
+ #include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
***************
*** 1068,1073 **** RecordTransactionCommit(void)
--- 1069,1081 ----
XLogFlush(XactLastRecEnd);
/*
+ * Wake up all walsenders to send WAL up to the COMMIT record
+ * immediately if replication is enabled
+ */
+ if (max_wal_senders > 0)
+ WalSndWakeup();
+
+ /*
* Now we may update the CLOG, if we wrote a COMMIT record above
*/
if (markXidCommitted)
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 87,98 **** static XLogRecPtr sentPtr = {0, 0};
--- 87,100 ----
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t shutdown_requested = false;
+ static volatile sig_atomic_t xlogsend_requested = false;
static volatile sig_atomic_t ready_to_stop = false;
/* Signal handlers */
static void WalSndSigHupHandler(SIGNAL_ARGS);
static void WalSndShutdownHandler(SIGNAL_ARGS);
static void WalSndQuickDieHandler(SIGNAL_ARGS);
+ static void WalSndXLogSendHandler(SIGNAL_ARGS);
static void WalSndLastCycleHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
***************
*** 433,439 **** WalSndLoop(void)
while (remain > 0)
{
/* Check for interrupts */
! if (got_SIGHUP || shutdown_requested || ready_to_stop)
break;
/* Sleep and check that the connection is still alive */
--- 435,442 ----
while (remain > 0)
{
/* Check for interrupts */
! if (got_SIGHUP || shutdown_requested || xlogsend_requested ||
! ready_to_stop)
break;
/* Sleep and check that the connection is still alive */
***************
*** 444,449 **** WalSndLoop(void)
--- 447,455 ----
}
}
+ if (xlogsend_requested)
+ xlogsend_requested = false;
+
/* Attempt to send the log once every loop */
if (!XLogSend(output_message, &caughtup))
break;
***************
*** 665,670 **** XLogSend(char *msgbuf, bool *caughtup)
--- 671,678 ----
Size nbytes;
WalDataMessageHeader msghdr;
+ MyWalSnd->sndrqst = false;
+
/*
* Attempt to send all data that's already been written out and fsync'd to
* disk. We cannot go further than what's been written out given the
***************
*** 678,684 **** XLogSend(char *msgbuf, bool *caughtup)
/* Quick exit if nothing to do */
if (XLByteLE(SendRqstPtr, sentPtr))
{
! *caughtup = true;
return true;
}
--- 686,692 ----
/* Quick exit if nothing to do */
if (XLByteLE(SendRqstPtr, sentPtr))
{
! *caughtup = !MyWalSnd->sndrqst;
return true;
}
***************
*** 718,730 **** XLogSend(char *msgbuf, bool *caughtup)
if (XLByteLE(SendRqstPtr, endptr))
{
endptr = SendRqstPtr;
! *caughtup = true;
}
else
{
/* round down to page boundary. */
endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
*caughtup = false;
}
nbytes = endptr.xrecoff - startptr.xrecoff;
--- 726,739 ----
if (XLByteLE(SendRqstPtr, endptr))
{
endptr = SendRqstPtr;
! *caughtup = !MyWalSnd->sndrqst;
}
else
{
/* round down to page boundary. */
endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
*caughtup = false;
+ MyWalSnd->sndrqst = true;
}
nbytes = endptr.xrecoff - startptr.xrecoff;
***************
*** 828,833 **** WalSndQuickDieHandler(SIGNAL_ARGS)
--- 837,849 ----
exit(2);
}
+ /* SIGUSR1: set flag to send WAL records */
+ static void
+ WalSndXLogSendHandler(SIGNAL_ARGS)
+ {
+ xlogsend_requested = true;
+ }
+
/* SIGUSR2: set flag to do a last cycle and shut down afterwards */
static void
WalSndLastCycleHandler(SIGNAL_ARGS)
***************
*** 847,853 **** WalSndSignals(void)
pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
! pqsignal(SIGUSR1, SIG_IGN); /* not used */
pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
* shutdown */
--- 863,869 ----
pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
! pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
* shutdown */
***************
*** 895,900 **** WalSndShmemInit(void)
--- 911,937 ----
}
}
+ /* Wake all walsenders up by signaling */
+ void
+ WalSndWakeup(void)
+ {
+ int i;
+
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ pid_t pid = walsnd->pid;
+
+ /* we don't need to signal to waking walsenders */
+ if (pid != 0 && !walsnd->sndrqst)
+ {
+ walsnd->sndrqst = true;
+ kill(pid, SIGUSR1);
+ }
+ }
+ }
+
/*
* This isn't currently used for anything. Monitoring tools might be
* interested in the future, and we'll need something like this in the
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 12,17 ****
--- 12,21 ----
#ifndef _WALSENDER_H
#define _WALSENDER_H
+ #include "postgres.h"
+
+ #include <signal.h>
+
#include "access/xlog.h"
#include "storage/spin.h"
***************
*** 23,28 **** typedef struct WalSnd
--- 27,42 ----
pid_t pid; /* this walsender's process id, or 0 */
XLogRecPtr sentPtr; /* WAL has been sent up to this point */
+ /*
+ * The request for WAL sending has already been sent? If false,
+ * we set sndrqst to true and signal walsender to send WAL up to
+ * the current WAL write location immediately. If true, since
+ * walsender is ready for sending all the currently-written WAL,
+ * we don't need to signal. This value is used to suppress
+ * redundant signaling.
+ */
+ sig_atomic_t sndrqst;
+
slock_t mutex; /* locks shared variables shown above */
} WalSnd;
***************
*** 45,49 **** extern int WalSenderMain(void);
--- 59,64 ----
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
+ extern void WalSndWakeup(void);
#endif /* _WALSENDER_H */
On Wed, Aug 4, 2010 at 10:38 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
Then you risk running out of disk space. Similar to having an archive
command that fails for some reason.
That's one reason the registration should not be too automatic - there is
serious repercussions if the standby just disappears. If the standby is a
synchronous one, the master will stop committing or delay acknowledging
commits, depending on the configuration, and the master needs to keep extra
WAL around.
Umm... in addition to registration of each standby, I think we should allow
users to set an upper limit on the number of WAL files kept in pg_xlog to
avoid running out of disk space. If the limit is exceeded, the master
disconnects the standbys that have fallen too far behind and removes all the
WAL files not required by the currently connected standbys. If you don't want
any standby to disappear unexpectedly because of the upper limit, you can set
it to 0 (= no limit).
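The retention rule could look roughly like the following C sketch. The Standby struct, the attached flag and oldest_segment_to_keep() are hypothetical names, and WAL positions are reduced to plain segment numbers; the real bookkeeping would of course use XLogRecPtr and live in shared memory.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical view of one registered standby. */
typedef struct
{
    uint64_t received_segno;  /* newest WAL segment it has confirmed */
    int      attached;        /* 0 once the master has given up on it */
} Standby;

/*
 * Return the oldest segment number the master still has to keep.
 * A standby lagging more than max_kept segments behind current_segno is
 * detached and no longer holds WAL back; max_kept == 0 means no limit.
 */
static uint64_t
oldest_segment_to_keep(Standby *standbys, size_t n,
                       uint64_t current_segno, uint64_t max_kept)
{
    uint64_t oldest = current_segno;

    for (size_t i = 0; i < n; i++)
    {
        Standby *s = &standbys[i];

        if (!s->attached)
            continue;

        if (max_kept > 0 &&
            s->received_segno < current_segno &&
            current_segno - s->received_segno > max_kept)
        {
            s->attached = 0;    /* too far behind: drop it */
            continue;
        }

        if (s->received_segno < oldest)
            oldest = s->received_segno;
    }
    return oldest;
}
```

With no limit, a standby stuck at segment 40 pins everything from 40 onward; with a limit of 16 segments and the master at segment 100, that standby is detached and only the segments needed by the remaining standbys are kept.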
I'm thinking of making users register and unregister each standby via SQL
functions like register_standby() and unregister_standby():
void register_standby(standby_name text, streaming_start_lsn text)
void unregister_standby(standby_name text)
Note that standby_name should be specified in recovery.conf of each
standby.
Using them, we can easily mark which WAL files must be retained for a
new standby when taking its base backup, as follows:
SELECT register_standby('foo', pg_start_backup())
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On 05/08/10 17:14, Fujii Masao wrote:
I'm thinking to make users register and unregister each standbys via SQL
functions like register_standby() and unregister_standby():
The register/unregister facility should be accessible from the streaming
replication connection, so that you don't need to connect to any
particular database in addition to the streaming connection.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
On 01/08/10 15:30, Greg Stark wrote:
On Sun, Aug 1, 2010 at 7:11 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
I don't think any of this quorum stuff makes much sense without explicitly
registering standbys in the master.
This doesn't have to be done manually. The streaming protocol could
include the standby sending its system id to the master. The master
could just keep a list of system ids with the last record they've been
sent and the last they've confirmed receipt, fsync, application,
whatever the protocol covers. If the same system reconnects it just
overwrites the existing data for that system id.
Systemid doesn't work for that. Systemid is assigned at initdb time, so
all the standbys have the same systemid as the master.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
I wonder if we can continue to rely on the pg_usleep() loop for sleeping
in walsender. On those platforms where signals don't interrupt sleep,
sending the signal is not going to promptly wake up walsender. That was
fine before, but any delay is going to be poison to synchronous
replication performance.
Thoughts?
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
Fujii Masao wrote:
On Wed, Aug 4, 2010 at 10:38 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
Then you risk running out of disk space. Similar to having an archive
command that fails for some reason.
That's one reason the registration should not be too automatic - there is
serious repercussions if the standby just disappears. If the standby is a
synchronous one, the master will stop committing or delay acknowledging
commits, depending on the configuration, and the master needs to keep extra
WAL around.
Umm... in addition to registration of each standby, I think we should allow
users to set the upper limit of the number of WAL files kept in pg_xlog to
avoid running out of disk space. If it exceeds the upper limit, the master
disconnects too old standbys from the cluster and removes all the WAL files
not required for current connected standbys. If you don't want any standby
to disappear unexpectedly because of the upper limit, you can set it to 0
(= no limit).
I'm thinking to make users register and unregister each standbys via SQL
functions like register_standby() and unregister_standby():
void register_standby(standby_name text, streaming_start_lsn text)
void unregister_standby(standby_name text)
Note that standby_name should be specified in recovery.conf of each
standby.
By using them we can easily specify which WAL files are unremovable because
of new standby when taking the base backup for it as follows:
SELECT register_standby('foo', pg_start_backup())
I know there has been discussion about how to identify the standby
servers --- how about using the connection application_name in
recovery.conf:
primary_conninfo = 'host=localhost port=5432 application_name=slave1'
The good part is that once recovery.conf goes away because the server
isn't a standby anymore, the application_name is gone too.
An even more interesting approach would be to specify the replication
mode in the application_name:
primary_conninfo = 'host=localhost port=5432 application_name=replay'
and imagine being able to view the status of standby servers from
pg_stat_activity. (Right now standby servers do not appear in
pg_stat_activity.)
--
Bruce Momjian <bruce@momjian.us> http://momjian.us
EnterpriseDB http://enterprisedb.com
+ It's impossible for everything to be true. +
On 05/08/10 13:40, Fujii Masao wrote:
On Wed, Aug 4, 2010 at 12:35 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
There's some race conditions with the signaling. If another process finishes
XLOG flush and sends the signal when a walsender has just finished one
iteration of its main loop, walsender will reset xlogsend_requested and go
to sleep. It should not sleep but send the pending WAL immediately.
Yep. To avoid that race condition, xlogsend_requested should be reset to
false after sleep and before calling XLogSend(). I attached the updated
version of the patch.
There's still a small race condition: if you receive the signal just
before entering pg_usleep(), it will not be interrupted.
Of course, on platforms where signals don't interrupt sleep, the problem
is even bigger. Magnus reminded me that we can use select() instead of
pg_usleep() on such platforms, but that's still vulnerable to the race
condition.
ppoll() or pselect() could be used, but I don't think they're fully
portable. I think we'll have to resort to the self-pipe trick mentioned
in the Linux select(3) man page:
On systems that lack pselect(), reliable (and
more portable) signal trapping can be achieved using the self-pipe
trick (where a signal handler writes a byte to a pipe whose other end
is monitored by select() in the main program.)
Another idea is to use something different than Unix signals, like
ProcSendSignal/ProcWaitForSignal which are implemented using semaphores.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com