Timeout for asynchronous replication Re: Timeout and wait-forever in sync rep
On Mon, Dec 6, 2010 at 3:42 PM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Fri, Oct 15, 2010 at 9:41 PM, Fujii Masao <masao.fujii@gmail.com> wrote:
The timeout is not at odds with 'wait-forever'. Even if you choose
'wait-forever' (i.e., you set allow_standalone_master to false), the master
should detect a standby crash as soon as possible by using the
timeout. For example, imagine that max_wal_senders is set to one and
the master cannot detect the standby crash because there is no
timeout. In this case, even if you start a new standby, it will not be
able to connect to the master since there is no free walsender slot.
As a result, the master actually waits forever.
It occurred to me that the timeout would be required even for
asynchronous streaming replication. So, how about implementing the
replication timeout feature before synchronous replication itself?
Here is the patch. This is one of the features required for synchronous
replication, so I added it to the current CF as a part of synchronous
replication.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
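
The slot-exhaustion scenario described in this message can be sketched in a few lines of C. This is only an illustration, not code from the patch: `SenderSlot`, `slot_acquire` and `slots_reap` are hypothetical names, and the single-slot array stands in for max_wal_senders = 1.

```c
#include <stdbool.h>
#include <stdint.h>

#define MAX_WAL_SENDERS 1       /* mirrors the max_wal_senders = 1 scenario */

/* Hypothetical bookkeeping for one walsender slot. */
typedef struct SenderSlot
{
    bool    in_use;
    int64_t last_progress_ms;   /* last time data was sent successfully */
} SenderSlot;

static SenderSlot slots[MAX_WAL_SENDERS];

/* Returns the index of a newly claimed slot, or -1 if none is free. */
static int
slot_acquire(int64_t now_ms)
{
    for (int i = 0; i < MAX_WAL_SENDERS; i++)
    {
        if (!slots[i].in_use)
        {
            slots[i].in_use = true;
            slots[i].last_progress_ms = now_ms;
            return i;
        }
    }
    return -1;
}

/*
 * Frees every slot that has made no progress for longer than timeout_ms.
 * A timeout of zero disables reaping, as with the proposed GUC default.
 * Returns the number of slots freed.
 */
static int
slots_reap(int64_t now_ms, int64_t timeout_ms)
{
    int freed = 0;

    if (timeout_ms <= 0)
        return 0;
    for (int i = 0; i < MAX_WAL_SENDERS; i++)
    {
        if (slots[i].in_use && now_ms - slots[i].last_progress_ms > timeout_ms)
        {
            slots[i].in_use = false;
            freed++;
        }
    }
    return freed;
}
```

With the timeout disabled, a second `slot_acquire` keeps failing for as long as the dead connection holds the slot; once `slots_reap` runs with a positive timeout, the new standby can connect.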
Attachments:
replication_timeout_v1.patch (application/octet-stream)
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 1988,1993 **** SET ENABLE_SEQSCAN TO OFF;
--- 1988,2010 ----
</para>
</listitem>
</varlistentry>
+
+ <varlistentry id="guc-replication-timeout" xreflabel="replication_timeout">
+ <term><varname>replication_timeout</varname> (<type>integer</type>)</term>
+ <indexterm>
+ <primary><varname>replication_timeout</> configuration parameter</primary>
+ </indexterm>
+ <listitem>
+ <para>
+ Specifies the maximum time to send one WAL data message, in milliseconds.
+ If the WAL sender process cannot send a pending message within this time,
+ it terminates replication. This is useful for the primary server to
+ detect a standby crash or a network outage. A value of zero (the default)
+ turns this off. This parameter can only be set in the
+ <filename>postgresql.conf</> file or on the server command line.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</sect2>
*** a/src/backend/libpq/pqcomm.c
--- b/src/backend/libpq/pqcomm.c
***************
*** 56,61 ****
--- 56,63 ----
* pq_putbytes - send bytes to connection (not flushed until pq_flush)
* pq_flush - flush pending output
* pq_getbyte_if_available - get a byte if available without blocking
+ * pq_putbytes_if_writable - send bytes to connection if writable without blocking
+ * pq_flush_if_writable - flush pending output if writable without blocking
*
* message-level I/O (and old-style-COPY-OUT cruft):
* pq_putmessage - send a normal message (suppressed in COPY OUT mode)
***************
*** 112,117 **** static char sock_path[MAXPGPATH];
--- 114,120 ----
static char PqSendBuffer[PQ_BUFFER_SIZE];
static int PqSendPointer; /* Next index to store a byte in PqSendBuffer */
+ static int PqSendStart; /* Next index to send a byte in PqSendBuffer */
static char PqRecvBuffer[PQ_BUFFER_SIZE];
static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
***************
*** 1153,1158 **** internal_putbytes(const char *s, size_t len)
--- 1156,1211 ----
}
/* --------------------------------
+ * pq_putbytes_if_writable - send bytes to connection (not flushed
+ * until pq_flush), if writable
+ *
+ * Returns the number of bytes written without blocking, or EOF if trouble.
+ * --------------------------------
+ */
+ int
+ pq_putbytes_if_writable(const char *s, size_t len)
+ {
+ size_t amount;
+ size_t nwritten = 0;
+
+ /* Should not be called by old-style COPY OUT */
+ Assert(!DoingCopyOut);
+ /* No-op if reentrant call */
+ if (PqCommBusy)
+ return 0;
+ PqCommBusy = true;
+
+ while (len > 0)
+ {
+ /* If buffer is full, then flush it out */
+ if (PqSendPointer >= PQ_BUFFER_SIZE)
+ {
+ int r;
+
+ r = pq_flush_if_writable();
+ if (r == 0)
+ break;
+ if (r == EOF)
+ {
+ PqCommBusy = false;
+ return r;
+ }
+ }
+ amount = PQ_BUFFER_SIZE - PqSendPointer;
+ if (amount > len)
+ amount = len;
+ memcpy(PqSendBuffer + PqSendPointer, s, amount);
+ PqSendPointer += amount;
+ s += amount;
+ len -= amount;
+ nwritten += amount;
+ }
+
+ PqCommBusy = false;
+ return (int) nwritten;
+ }
+
+ /* --------------------------------
* pq_flush - flush pending output
*
* returns 0 if OK, EOF if trouble
***************
*** 1224,1229 **** internal_flush(void)
--- 1277,1396 ----
return 0;
}
+ /* --------------------------------
+ * pq_flush_if_writable - flush pending output if writable
+ *
+ * Returns 1 if OK, 0 if pending output cannot be written without blocking,
+ * or EOF if trouble.
+ * --------------------------------
+ */
+ int
+ pq_flush_if_writable(void)
+ {
+ static int last_reported_send_errno = 0;
+
+ char *bufptr = PqSendBuffer + PqSendStart;
+ char *bufend = PqSendBuffer + PqSendPointer;
+
+ while (bufptr < bufend)
+ {
+ int r;
+
+ /* Temporarily put the socket into non-blocking mode */
+ #ifdef WIN32
+ pgwin32_noblock = 1;
+ #else
+ if (!pg_set_noblock(MyProcPort->sock))
+ ereport(ERROR,
+ (errmsg("could not set socket to non-blocking mode: %m")));
+ #endif
+ MyProcPort->noblock = true;
+ PG_TRY();
+ {
+ r = secure_write(MyProcPort, bufptr, bufend - bufptr);
+
+ if (r < 0)
+ {
+ /*
+ * Ok if no data writable without blocking or interrupted (though
+ * EINTR really shouldn't happen with a non-blocking socket).
+ * Report other errors.
+ */
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
+ r = 0;
+ else
+ {
+ if (errno != last_reported_send_errno)
+ {
+ /*
+ * Careful: an ereport() that tries to write to the
+ * client would cause recursion to here, leading to
+ * stack overflow and core dump! This message must
+ * go *only* to the postmaster log.
+ *
+ * If a client disconnects while we're in the midst
+ * of output, we might write quite a bit of data before
+ * we get to a safe query abort point. So, suppress
+ * duplicate log messages.
+ */
+ last_reported_send_errno = errno;
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("could not send data to client: %m")));
+ }
+
+ /*
+ * We drop the buffered data anyway so that processing can
+ * continue, even though we'll probably quit soon.
+ */
+ PqSendStart = PqSendPointer = 0;
+ r = EOF;
+ }
+ }
+ else if (r == 0)
+ {
+ /* EOF detected */
+ r = EOF;
+ }
+ }
+ PG_CATCH();
+ {
+ /*
+ * The rest of the backend code assumes the socket is in blocking
+ * mode, so treat failure as FATAL.
+ */
+ #ifdef WIN32
+ pgwin32_noblock = 0;
+ #else
+ if (!pg_set_block(MyProcPort->sock))
+ ereport(FATAL,
+ (errmsg("could not set socket to blocking mode: %m")));
+ #endif
+ MyProcPort->noblock = false;
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ #ifdef WIN32
+ pgwin32_noblock = 0;
+ #else
+ if (!pg_set_block(MyProcPort->sock))
+ ereport(FATAL,
+ (errmsg("could not set socket to blocking mode: %m")));
+ #endif
+ MyProcPort->noblock = false;
+
+ if (r == 0 || r == EOF)
+ return r;
+
+ last_reported_send_errno = 0; /* reset after any successful send */
+ bufptr += r;
+ PqSendStart += r;
+ }
+
+ PqSendStart = PqSendPointer = 0;
+ return 1;
+ }
+
/* --------------------------------
* Message-level I/O routines begin here.
*** a/src/backend/port/unix_latch.c
--- b/src/backend/port/unix_latch.c
***************
*** 193,211 **** DisownLatch(volatile Latch *latch)
bool
WaitLatch(volatile Latch *latch, long timeout)
{
! return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
}
/*
* Like WaitLatch, but will also return when there's data available in
! * 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch
! * was set, or 2 if the scoket became readable.
*/
int
! WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
{
struct timeval tv, *tvp = NULL;
fd_set input_mask;
int rc;
int result = 0;
--- 193,214 ----
bool
WaitLatch(volatile Latch *latch, long timeout)
{
! return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
}
/*
* Like WaitLatch, but will also return when there's data available in
! * 'sock' for reading or writing. Returns 0 if timeout was reached,
! * 1 if the latch was set, 2 if the socket became readable, or 3 if
! * the socket became writable.
*/
int
! WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead,
! bool forWrite, long timeout)
{
struct timeval tv, *tvp = NULL;
fd_set input_mask;
+ fd_set output_mask;
int rc;
int result = 0;
***************
*** 241,254 **** WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
FD_ZERO(&input_mask);
FD_SET(selfpipe_readfd, &input_mask);
hifd = selfpipe_readfd;
! if (sock != PGINVALID_SOCKET)
{
FD_SET(sock, &input_mask);
if (sock > hifd)
hifd = sock;
}
! rc = select(hifd + 1, &input_mask, NULL, NULL, tvp);
if (rc < 0)
{
if (errno == EINTR)
--- 244,265 ----
FD_ZERO(&input_mask);
FD_SET(selfpipe_readfd, &input_mask);
hifd = selfpipe_readfd;
! if (sock != PGINVALID_SOCKET && forRead)
{
FD_SET(sock, &input_mask);
if (sock > hifd)
hifd = sock;
}
! FD_ZERO(&output_mask);
! if (sock != PGINVALID_SOCKET && forWrite)
! {
! FD_SET(sock, &output_mask);
! if (sock > hifd)
! hifd = sock;
! }
!
! rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp);
if (rc < 0)
{
if (errno == EINTR)
***************
*** 263,273 **** WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
result = 0;
break;
}
! if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask))
{
result = 2;
break; /* data available in socket */
}
}
waiting = false;
--- 274,291 ----
result = 0;
break;
}
! if (sock != PGINVALID_SOCKET && forRead &&
! FD_ISSET(sock, &input_mask))
{
result = 2;
break; /* data available in socket */
}
+ if (sock != PGINVALID_SOCKET && forWrite &&
+ FD_ISSET(sock, &output_mask))
+ {
+ result = 3;
+ break; /* data writable in socket */
+ }
}
waiting = false;
*** a/src/backend/port/win32/socket.c
--- b/src/backend/port/win32/socket.c
***************
*** 14,20 ****
#include "postgres.h"
/*
! * Indicate if pgwin32_recv() should operate in non-blocking mode.
*
* Since the socket emulation layer always sets the actual socket to
* non-blocking mode in order to be able to deliver signals, we must
--- 14,21 ----
#include "postgres.h"
/*
! * Indicate if pgwin32_recv() and pgwin32_send() should operate
! * in non-blocking mode.
*
* Since the socket emulation layer always sets the actual socket to
* non-blocking mode in order to be able to deliver signals, we must
***************
*** 399,404 **** pgwin32_send(SOCKET s, char *buf, int len, int flags)
--- 400,415 ----
return -1;
}
+ if (pgwin32_noblock)
+ {
+ /*
+ * No data sent, and we are in "emulated non-blocking mode", so
+ * return indicating that we'd block if we were to continue.
+ */
+ errno = EWOULDBLOCK;
+ return -1;
+ }
+
/* No error, zero bytes (win2000+) or error+WSAEWOULDBLOCK (<=nt4) */
if (pgwin32_waitforsinglesocket(s, FD_WRITE | FD_CLOSE, INFINITE) == 0)
*** a/src/backend/port/win32_latch.c
--- b/src/backend/port/win32_latch.c
***************
*** 85,95 **** DisownLatch(volatile Latch *latch)
bool
WaitLatch(volatile Latch *latch, long timeout)
{
! return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
}
int
! WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
{
DWORD rc;
HANDLE events[3];
--- 85,96 ----
bool
WaitLatch(volatile Latch *latch, long timeout)
{
! return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
}
int
! WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, bool forRead,
! bool forWrite, long timeout)
{
DWORD rc;
HANDLE events[3];
***************
*** 103,112 **** WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
events[0] = latchevent;
events[1] = pgwin32_signal_event;
numevents = 2;
! if (sock != PGINVALID_SOCKET)
{
sockevent = WSACreateEvent();
! WSAEventSelect(sock, sockevent, FD_READ);
events[numevents++] = sockevent;
}
--- 104,120 ----
events[0] = latchevent;
events[1] = pgwin32_signal_event;
numevents = 2;
! if (sock != PGINVALID_SOCKET && (forRead || forWrite))
{
+ int flags = 0;
+
+ if (forRead)
+ flags |= FD_READ;
+ if (forWrite)
+ flags |= FD_WRITE;
+
sockevent = WSACreateEvent();
! WSAEventSelect(sock, sockevent, flags);
events[numevents++] = sockevent;
}
***************
*** 139,146 **** WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
pgwin32_dispatch_queued_signals();
else if (rc == WAIT_OBJECT_0 + 2)
{
Assert(sock != PGINVALID_SOCKET);
! result = 2;
break;
}
else if (rc != WAIT_OBJECT_0)
--- 147,165 ----
pgwin32_dispatch_queued_signals();
else if (rc == WAIT_OBJECT_0 + 2)
{
+ WSANETWORKEVENTS resEvents;
+
Assert(sock != PGINVALID_SOCKET);
!
! ZeroMemory(&resEvents, sizeof(resEvents));
! if (WSAEnumNetworkEvents(sock, sockevent, &resEvents) == SOCKET_ERROR)
! ereport(FATAL,
! (errmsg_internal("failed to enumerate network events: %i", (int) GetLastError())));
!
! if (forRead && resEvents.lNetworkEvents & FD_READ)
! result = 2;
! if (forWrite && resEvents.lNetworkEvents & FD_WRITE)
! result = 3;
break;
}
else if (rc != WAIT_OBJECT_0)
***************
*** 148,154 **** WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
}
/* Clean up the handle we created for the socket */
! if (sock != PGINVALID_SOCKET)
{
WSAEventSelect(sock, sockevent, 0);
WSACloseEvent(sockevent);
--- 167,173 ----
}
/* Clean up the handle we created for the socket */
! if (sock != PGINVALID_SOCKET && (forRead || forWrite))
{
WSAEventSelect(sock, sockevent, 0);
WSACloseEvent(sockevent);
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 66,71 **** bool am_walsender = false; /* Am I a walsender process ? */
--- 66,83 ----
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int WalSndDelay = 200; /* max sleep time between some actions */
+ int replication_timeout = 0; /* maximum time to send one WAL data message */
+
+ /*
+ * Buffer for WAL sending
+ *
+ * WalSndOutBuffer is a work area in which the output message is constructed.
+ * It's used in just so we can avoid re-palloc'ing the buffer on each cycle.
+ * It must be of size 6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
+ */
+ static char *WalSndOutBuffer;
+ static int WalSndOutHead; /* head of pending output */
+ static int WalSndOutTail; /* tail of pending output */
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
***************
*** 100,108 **** static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
! static bool XLogSend(char *msgbuf, bool *caughtup);
static void CheckClosedConnection(void);
/* Main entry point for walsender process */
int
--- 112,125 ----
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
! static bool XLogSend(bool *caughtup, bool *pending);
static void CheckClosedConnection(void);
+ static void RegisterWalSndWaiter(BackendId backendId, XLogRecPtr record,
+ Latch *latch);
+ static void WakeupWalSndWaiters(XLogRecPtr record);
+ static XLogRecPtr GetOldestAckdPtr(void);
+
/* Main entry point for walsender process */
int
***************
*** 376,390 **** CheckClosedConnection(void)
static int
WalSndLoop(void)
{
! char *output_message;
bool caughtup = false;
/*
* Allocate buffer that will be used for each output message. We do this
* just once to reduce palloc overhead. The buffer must be made large
* enough for maximum-sized messages.
*/
! output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
/* Loop forever, unless we get an error */
for (;;)
--- 393,409 ----
static int
WalSndLoop(void)
{
! StringInfoData input_message;
bool caughtup = false;
+ bool pending = false;
/*
* Allocate buffer that will be used for each output message. We do this
* just once to reduce palloc overhead. The buffer must be made large
* enough for maximum-sized messages.
*/
! WalSndOutBuffer = palloc(6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
! WalSndOutHead = WalSndOutTail = 0;
/* Loop forever, unless we get an error */
for (;;)
***************
*** 409,417 **** WalSndLoop(void)
*/
if (ready_to_stop)
{
! if (!XLogSend(output_message, &caughtup))
break;
! if (caughtup)
shutdown_requested = true;
}
--- 428,436 ----
*/
if (ready_to_stop)
{
! if (!XLogSend(&caughtup, &pending))
break;
! if (caughtup && !pending)
shutdown_requested = true;
}
***************
*** 426,435 **** WalSndLoop(void)
}
/*
! * If we had sent all accumulated WAL in last round, nap for the
! * configured time before retrying.
*/
! if (caughtup)
{
/*
* Even if we wrote all the WAL that was available when we started
--- 445,455 ----
}
/*
! * If we had sent all accumulated WAL in last round or could not
! * flush pending WAL in output buffer because the socket was not
! * writable, nap for the configured time before retrying.
*/
! if (caughtup || pending)
{
/*
* Even if we wrote all the WAL that was available when we started
***************
*** 440,458 **** WalSndLoop(void)
*/
ResetLatch(&MyWalSnd->latch);
! if (!XLogSend(output_message, &caughtup))
break;
! if (caughtup && !got_SIGHUP && !ready_to_stop && !shutdown_requested)
{
/*
* XXX: We don't really need the periodic wakeups anymore,
* WaitLatchOrSocket should reliably wake up as soon as
* something interesting happens.
*/
/* Sleep */
! WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
! WalSndDelay * 1000L);
}
/* Check if the connection was closed */
--- 460,507 ----
*/
ResetLatch(&MyWalSnd->latch);
! if (!XLogSend(&caughtup, &pending))
break;
! if ((caughtup || pending) && !got_SIGHUP && !ready_to_stop &&
! !shutdown_requested)
{
+ bool check_timeout;
+ long sleeptime;
+ int res;
+
/*
* XXX: We don't really need the periodic wakeups anymore,
* WaitLatchOrSocket should reliably wake up as soon as
* something interesting happens.
*/
+ /*
+ * Check for replication timeout if it's enabled and we need to
+ * wait for the socket to be writable to flush pending WAL in
+ * output buffer.
+ */
+ check_timeout = replication_timeout > 0 && pending;
+ if (check_timeout)
+ sleeptime = replication_timeout;
+ else
+ sleeptime = WalSndDelay;
+
/* Sleep */
! res = WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
! true, (WalSndOutTail > 0),
! sleeptime * 1000L);
!
! if (res == 0 && check_timeout)
! {
! /*
! * Since typically expiration of replication timeout means
! * communication problem, we don't send the error message
! * to the standby.
! */
! ereport(COMMERROR,
! (errmsg("terminating walsender process due to replication timeout")));
! break;
! }
}
/* Check if the connection was closed */
***************
*** 461,467 **** WalSndLoop(void)
else
{
/* Attempt to send the log once every loop */
! if (!XLogSend(output_message, &caughtup))
break;
}
}
--- 510,516 ----
else
{
/* Attempt to send the log once every loop */
! if (!XLogSend(&caughtup, &pending))
break;
}
}
***************
*** 670,693 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
* Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
* but not yet sent to the client, and send it.
*
- * msgbuf is a work area in which the output message is constructed. It's
- * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
- * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
- *
* If there is no unsent WAL remaining, *caughtup is set to true, otherwise
* *caughtup is set to false.
*
* Returns true if OK, false if trouble.
*/
static bool
! XLogSend(char *msgbuf, bool *caughtup)
{
XLogRecPtr SendRqstPtr;
XLogRecPtr startptr;
! XLogRecPtr endptr;
Size nbytes;
WalDataMessageHeader msghdr;
/*
* Attempt to send all data that's already been written out and fsync'd to
* disk. We cannot go further than what's been written out given the
--- 719,766 ----
* Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
* but not yet sent to the client, and send it.
*
* If there is no unsent WAL remaining, *caughtup is set to true, otherwise
* *caughtup is set to false.
*
+ * If there is pending WAL in output buffer, *pending is set to true,
+ * otherwise *pending is set to false.
+ *
* Returns true if OK, false if trouble.
*/
static bool
! XLogSend(bool *caughtup, bool *pending)
{
XLogRecPtr SendRqstPtr;
XLogRecPtr startptr;
! static XLogRecPtr endptr;
Size nbytes;
+ uint32 n32;
+ int res;
WalDataMessageHeader msghdr;
+ /* Attempt to flush pending WAL in output buffer */
+ if (*pending)
+ {
+ if (WalSndOutHead != WalSndOutTail)
+ {
+ res = pq_putbytes_if_writable(WalSndOutBuffer + WalSndOutHead,
+ WalSndOutTail - WalSndOutHead);
+ if (res == EOF)
+ return false;
+ WalSndOutHead += res;
+ if (WalSndOutHead != WalSndOutTail)
+ return true;
+ }
+
+ res = pq_flush_if_writable();
+ if (res == EOF)
+ return false;
+ if (res == 0)
+ return true;
+
+ goto updt;
+ }
+
/*
* Attempt to send all data that's already been written out and fsync'd to
* disk. We cannot go further than what's been written out given the
***************
*** 756,768 **** XLogSend(char *msgbuf, bool *caughtup)
/*
* OK to read and send the slice.
*/
! msgbuf[0] = 'w';
/*
* Read the log directly into the output buffer to avoid extra memcpy
* calls.
*/
! XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
/*
* We fill the message header last so that the send timestamp is taken as
--- 829,847 ----
/*
* OK to read and send the slice.
*/
! WalSndOutBuffer[0] = 'd';
! WalSndOutBuffer[5] = 'w';
! WalSndOutHead = 0;
! WalSndOutTail = 6 + sizeof(WalDataMessageHeader) + nbytes;
!
! n32 = htonl((uint32) WalSndOutTail - 1);
! memcpy(WalSndOutBuffer + 1, &n32, 4);
/*
* Read the log directly into the output buffer to avoid extra memcpy
* calls.
*/
! XLogRead(WalSndOutBuffer + 6 + sizeof(WalDataMessageHeader), startptr, nbytes);
/*
* We fill the message header last so that the send timestamp is taken as
***************
*** 772,784 **** XLogSend(char *msgbuf, bool *caughtup)
msghdr.walEnd = SendRqstPtr;
msghdr.sendTime = GetCurrentTimestamp();
! memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
! pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
/* Flush pending output to the client */
! if (pq_flush())
return false;
sentPtr = endptr;
--- 851,884 ----
msghdr.walEnd = SendRqstPtr;
msghdr.sendTime = GetCurrentTimestamp();
! memcpy(WalSndOutBuffer + 6, &msghdr, sizeof(WalDataMessageHeader));
! res = pq_putbytes_if_writable(WalSndOutBuffer, WalSndOutTail);
! if (res == EOF)
! return false;
!
! WalSndOutHead = res;
! if (WalSndOutHead != WalSndOutTail)
! {
! *caughtup = false;
! *pending = true;
! return true;
! }
/* Flush pending output to the client */
! res = pq_flush_if_writable();
! if (res == EOF)
return false;
+ if (res == 0)
+ {
+ *caughtup = false;
+ *pending = true;
+ return true;
+ }
+
+ updt:
+ WalSndOutHead = WalSndOutTail = 0;
+ *pending = false;
sentPtr = endptr;
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 1800,1805 **** static struct config_int ConfigureNamesInt[] =
--- 1800,1815 ----
},
{
+ {"replication_timeout", PGC_SIGHUP, WAL_REPLICATION,
+ gettext_noop("Sets the maximum time to wait for WAL replication."),
+ NULL,
+ GUC_UNIT_MS
+ },
+ &replication_timeout,
+ 0, 0, INT_MAX, NULL, NULL
+ },
+
+ {
{"commit_delay", PGC_USERSET, WAL_SETTINGS,
gettext_noop("Sets the delay in microseconds between transaction commit and "
"flushing WAL to disk."),
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 191,196 ****
--- 191,197 ----
#wal_sender_delay = 200ms # walsender cycle time, 1-10000 milliseconds
#wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables
#vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed
+ #replication_timeout = 0 # in milliseconds, 0 is disabled
# - Standby Servers -
*** a/src/include/libpq/libpq.h
--- b/src/include/libpq/libpq.h
***************
*** 59,65 **** extern int pq_getbyte(void);
--- 59,67 ----
extern int pq_peekbyte(void);
extern int pq_getbyte_if_available(unsigned char *c);
extern int pq_putbytes(const char *s, size_t len);
+ extern int pq_putbytes_if_writable(const char *s, size_t len);
extern int pq_flush(void);
+ extern int pq_flush_if_writable(void);
extern int pq_putmessage(char msgtype, const char *s, size_t len);
extern void pq_startcopyout(void);
extern void pq_endcopyout(bool errorAbort);
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 47,52 **** extern bool am_walsender;
--- 47,53 ----
/* user-settable parameters */
extern int WalSndDelay;
extern int max_wal_senders;
+ extern int replication_timeout;
extern int WalSenderMain(void);
extern void WalSndSignals(void);
*** a/src/include/storage/latch.h
--- b/src/include/storage/latch.h
***************
*** 40,46 **** extern void OwnLatch(volatile Latch *latch);
extern void DisownLatch(volatile Latch *latch);
extern bool WaitLatch(volatile Latch *latch, long timeout);
extern int WaitLatchOrSocket(volatile Latch *latch, pgsocket sock,
! long timeout);
extern void SetLatch(volatile Latch *latch);
extern void ResetLatch(volatile Latch *latch);
#define TestLatch(latch) (((volatile Latch *) latch)->is_set)
--- 40,46 ----
extern void DisownLatch(volatile Latch *latch);
extern bool WaitLatch(volatile Latch *latch, long timeout);
extern int WaitLatchOrSocket(volatile Latch *latch, pgsocket sock,
! bool forRead, bool forWrite, long timeout);
extern void SetLatch(volatile Latch *latch);
extern void ResetLatch(volatile Latch *latch);
#define TestLatch(latch) (((volatile Latch *) latch)->is_set)
On 06.12.2010 08:51, Fujii Masao wrote:
On Mon, Dec 6, 2010 at 3:42 PM, Fujii Masao<masao.fujii@gmail.com> wrote:
On Fri, Oct 15, 2010 at 9:41 PM, Fujii Masao<masao.fujii@gmail.com> wrote:
The timeout is not at odds with 'wait-forever'. Even if you choose
'wait-forever' (i.e., you set allow_standalone_master to false), the master
should detect a standby crash as soon as possible by using the
timeout. For example, imagine that max_wal_senders is set to one and
the master cannot detect the standby crash because there is no
timeout. In this case, even if you start a new standby, it will not be
able to connect to the master since there is no free walsender slot.
As a result, the master actually waits forever.
It occurred to me that the timeout would be required even for
asynchronous streaming replication. So, how about implementing the
replication timeout feature before synchronous replication itself?
Here is the patch. This is one of the features required for synchronous
replication, so I added it to the current CF as a part of synchronous
replication.
Hmm, that's actually a quite different timeout than what's required for
synchronous replication. In synchronous replication, you need to get an
acknowledgment within a timeout. This patch only puts a timeout on how
long we wait to have enough room in the TCP send buffer. That doesn't
seem all that useful.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
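
The objection above rests on how a send buffer behaves: the kernel keeps accepting data for a peer that has stopped reading until the buffer is full, so a send-side timeout cannot fire before then. A minimal sketch of that behavior follows. It assumes a POSIX system and uses a local AF_UNIX socket pair in place of a real TCP connection; `bytes_accepted_before_block` is a hypothetical helper, not part of the patch.

```c
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Write to a peer that never reads until the kernel refuses more data.
 * Returns the number of bytes accepted before the first EAGAIN/EWOULDBLOCK,
 * or -1 on an unexpected error.
 */
static long
bytes_accepted_before_block(void)
{
    int     sv[2];
    int     flags;
    char    chunk[4096];
    long    total = 0;

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
        return -1;

    /* Put the sending end into non-blocking mode. */
    flags = fcntl(sv[0], F_GETFL, 0);
    if (flags < 0 || fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) < 0)
    {
        close(sv[0]);
        close(sv[1]);
        return -1;
    }

    memset(chunk, 'w', sizeof(chunk));
    for (;;)
    {
        ssize_t n = send(sv[0], chunk, sizeof(chunk), 0);

        if (n > 0)
        {
            total += n;         /* accepted by the kernel, not by the peer */
            continue;
        }
        if (n < 0 && errno == EINTR)
            continue;
        if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
            break;              /* buffer full: only now would a sender wait */
        total = -1;
        break;
    }

    close(sv[0]);
    close(sv[1]);
    return total;
}
```

The returned byte count depends on the platform's buffer sizes, which is why the time until such a timeout triggers is hard to predict from the sender's side.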
On Mon, Dec 6, 2010 at 9:54 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
It occurred to me that the timeout would be required even for
asynchronous streaming replication. So, how about implementing the
replication timeout feature before synchronous replication itself?
Here is the patch. This is one of the features required for synchronous
replication, so I added it to the current CF as a part of synchronous
replication.
Hmm, that's actually a quite different timeout than what's required for
synchronous replication. In synchronous replication, you need to get an
acknowledgment within a timeout. This patch only puts a timeout on how long
we wait to have enough room in the TCP send buffer. That doesn't seem all
that useful.
Yeah. If we rely on the TCP send buffer filling up, then the amount
of time the master takes to notice a dead standby is going to be hard
for the user to predict. I think the standby ought to send some sort
of heartbeat and the master should declare the standby dead if it
doesn't see a heartbeat soon enough. Maybe the heartbeat could even
include the receive/fsync/replay LSNs, so that sync rep can use the
same machinery but with more aggressive policies about when they must
be sent.
I also can't help noticing that this approach requires drilling a hole
through the abstraction stack. We just invented latches; if the API
is going to have to change every time someone wants to implement a
feature, we've built ourselves an awfully porous abstraction layer.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
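
One way to picture the heartbeat suggested above is a small fixed-size message carrying the three LSNs, plus a check on the master for an overdue heartbeat. The sketch below is an assumption-laden illustration, not a proposed wire format: `StandbyHeartbeat`, the 24-byte layout and the helper names are all hypothetical, and LSNs are simplified to 64-bit integers.

```c
#include <stdbool.h>
#include <stdint.h>

#define HEARTBEAT_WIRE_SIZE 24  /* three 64-bit LSNs, big-endian (assumed) */

/* Hypothetical heartbeat payload sent by the standby. */
typedef struct StandbyHeartbeat
{
    uint64_t    receive_lsn;
    uint64_t    fsync_lsn;
    uint64_t    replay_lsn;
} StandbyHeartbeat;

static void
put_u64_be(unsigned char *out, uint64_t v)
{
    for (int i = 0; i < 8; i++)
        out[i] = (unsigned char) (v >> (56 - 8 * i));
}

static uint64_t
get_u64_be(const unsigned char *in)
{
    uint64_t v = 0;

    for (int i = 0; i < 8; i++)
        v = (v << 8) | in[i];
    return v;
}

static void
heartbeat_encode(const StandbyHeartbeat *hb,
                 unsigned char out[HEARTBEAT_WIRE_SIZE])
{
    put_u64_be(out, hb->receive_lsn);
    put_u64_be(out + 8, hb->fsync_lsn);
    put_u64_be(out + 16, hb->replay_lsn);
}

static void
heartbeat_decode(const unsigned char in[HEARTBEAT_WIRE_SIZE],
                 StandbyHeartbeat *hb)
{
    hb->receive_lsn = get_u64_be(in);
    hb->fsync_lsn = get_u64_be(in + 8);
    hb->replay_lsn = get_u64_be(in + 16);
}

/*
 * Master side: true when no heartbeat has been seen for longer than
 * timeout_ms. A timeout of zero disables the check.
 */
static bool
heartbeat_overdue(int64_t last_heartbeat_ms, int64_t now_ms,
                  int64_t timeout_ms)
{
    return timeout_ms > 0 && now_ms - last_heartbeat_ms > timeout_ms;
}
```

Because the same message carries the LSNs, synchronous replication could reuse it and only change how often the standby is required to send it.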
On Mon, Dec 6, 2010 at 11:54 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
Hmm, that's actually a quite different timeout than what's required for
synchronous replication. In synchronous replication, you need to get an
acknowledgment within a timeout. This patch only puts a timeout on how long
we wait to have enough room in the TCP send buffer. That doesn't seem all
that useful.
Yeah, I'm planning to implement that timeout for synchronous replication later.
Since I thought that we should implement the timeout for *asynchronous*
replication first and then extend it for synchronous replication, I created this
patch. This kind of timeout is required for asynchronous replication, since
there is no acknowledgement from the standby in it.
Most of the patch implements a non-blocking send function and
changes walsender so that it uses that function instead of the existing blocking
one. This will be infrastructure for the synchronous replication timeout.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
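
The non-blocking send infrastructure described above boils down to "try to send, and if the socket is not writable, wait for it with a deadline". The following is a rough standalone sketch of that pattern using plain POSIX `select()`, not the patch's `pq_flush_if_writable` or `WaitLatchOrSocket`; `set_nonblocking`, `wait_writable` and `send_with_timeout` are hypothetical names.

```c
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>

/* Returns 0 on success, -1 on failure. */
static int
set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags < 0)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0 ? -1 : 0;
}

/* Returns 1 if fd became writable, 0 on timeout, -1 on error. */
static int
wait_writable(int fd, long timeout_ms)
{
    for (;;)
    {
        fd_set          wfds;
        struct timeval  tv;
        int             rc;

        FD_ZERO(&wfds);
        FD_SET(fd, &wfds);
        tv.tv_sec = timeout_ms / 1000;
        tv.tv_usec = (timeout_ms % 1000) * 1000;

        rc = select(fd + 1, NULL, &wfds, NULL, &tv);
        if (rc < 0 && errno == EINTR)
            continue;           /* simplification: restarts the full timeout */
        if (rc < 0)
            return -1;
        return rc > 0 ? 1 : 0;
    }
}

/*
 * Send len bytes on a non-blocking fd. Gives up if the socket stays
 * unwritable for timeout_ms. Returns 0 on success, 1 on timeout, -1 on error.
 */
static int
send_with_timeout(int fd, const char *buf, size_t len, long timeout_ms)
{
    while (len > 0)
    {
        ssize_t n = send(fd, buf, len, 0);

        if (n > 0)
        {
            buf += n;
            len -= (size_t) n;
            continue;
        }
        if (n < 0 && errno == EINTR)
            continue;
        if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
        {
            int rc = wait_writable(fd, timeout_ms);

            if (rc == 1)
                continue;
            return rc == 0 ? 1 : -1;
        }
        return -1;
    }
    return 0;
}
```

A caller would treat a return value of 1 the way the patch treats an expired replication_timeout: log the condition and terminate the connection rather than report the error to the peer.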
On Tue, Dec 7, 2010 at 12:20 AM, Robert Haas <robertmhaas@gmail.com> wrote:
Yeah. If we rely on the TCP send buffer filling up, then the amount
of time the master takes to notice a dead standby is going to be hard
for the user to predict. I think the standby ought to send some sort
of heartbeat and the master should declare the standby dead if it
doesn't see a heartbeat soon enough. Maybe the heartbeat could even
include the receive/fsync/replay LSNs, so that sync rep can use the
same machinery but with more aggressive policies about when they must
be sent.
OK. How about keepalive-like parameters and behaviors?
replication_keepalives_idle
replication_keepalives_interval
replication_keepalives_count
The master sends a keepalive packet if replication_keepalives_idle
has elapsed since it received the last ACK packet (which includes the
receive/fsync/replay LSNs) from the standby. The standby, in turn, sends
an ACK packet back to the master as soon as it receives the keepalive
packet.
If the master does not receive an ACK packet within
replication_keepalives_interval, it repeats sending the keepalive
packet and waiting for the ACK up to replication_keepalives_count - 1
more times. If no ACK packet has arrived by then, the master considers
the standby dead.
One obvious advantage over my original proposal is that the master
can notice the death of the standby even when there are no WAL
records to send. One drawback is that the standby needs to send
some packets even in asynchronous replication.
Thoughts?
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
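
The idle/interval/count behavior proposed above can be written as a small timer-driven state machine on the master. This is a sketch of one reading of the proposal, not an implementation of it: `KeepaliveState`, `keepalive_tick` and `keepalive_on_ack` are hypothetical names, and times are plain millisecond counters.

```c
#include <stdint.h>

/* Hypothetical per-standby keepalive bookkeeping on the master. */
typedef struct KeepaliveState
{
    int64_t last_ack_ms;    /* when the last ACK arrived */
    int64_t last_probe_ms;  /* when the last keepalive was sent */
    int     probes_sent;    /* keepalives sent since the last ACK */
} KeepaliveState;

typedef enum
{
    KA_NOTHING,             /* no timer has expired */
    KA_SEND_PROBE,          /* caller should send a keepalive packet */
    KA_STANDBY_DEAD         /* all probes went unanswered */
} KeepaliveAction;

/*
 * Decide what the master should do at time now_ms.
 * idle_ms, interval_ms and count play the roles of
 * replication_keepalives_idle, _interval and _count.
 */
static KeepaliveAction
keepalive_tick(KeepaliveState *st, int64_t now_ms,
               int64_t idle_ms, int64_t interval_ms, int count)
{
    if (st->probes_sent == 0)
    {
        /* No probe outstanding: wait out the idle period first. */
        if (now_ms - st->last_ack_ms < idle_ms)
            return KA_NOTHING;
    }
    else
    {
        /* A probe is outstanding: give the standby interval_ms to answer. */
        if (now_ms - st->last_probe_ms < interval_ms)
            return KA_NOTHING;
        if (st->probes_sent >= count)
            return KA_STANDBY_DEAD;
    }

    st->last_probe_ms = now_ms;
    st->probes_sent++;
    return KA_SEND_PROBE;
}

/* Any ACK from the standby resets the probe sequence. */
static void
keepalive_on_ack(KeepaliveState *st, int64_t now_ms)
{
    st->last_ack_ms = now_ms;
    st->probes_sent = 0;
}
```

Under this reading, a silent standby is declared dead after idle + count * interval, for example 1000 + 3 * 500 = 2500 ms with idle = 1000, interval = 500 and count = 3.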
On Mon, Dec 20, 2010 at 3:17 AM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Tue, Dec 7, 2010 at 12:20 AM, Robert Haas <robertmhaas@gmail.com> wrote:
Yeah. If we rely on the TCP send buffer filling up, then the amount
of time the master takes to notice a dead standby is going to be hard
for the user to predict. I think the standby ought to send some sort
of heartbeat and the master should declare the standby dead if it
doesn't see a heartbeat soon enough. Maybe the heartbeat could even
include the receive/fsync/replay LSNs, so that sync rep can use the
same machinery but with more aggressive policies about when they must
be sent.
OK. How about keepalive-like parameters and behaviors?
replication_keepalives_idle
replication_keepalives_interval
replication_keepalives_count
The master sends the keepalive packet if replication_keepalives_idle
elapsed after receiving the last ACK packet including the receive/
fsync/replay LSNs from the standby. OTOH, the standby sends the
ACK packet back to the master as soon as receiving the keepalive
packet.
If the master could not receive the ACK packet for
replication_keepalives_interval, it repeats sending the keepalive
packet and receiving the ACK replication_keepalives_count -1
times. If no ACK packet has finally arrived, the master thinks the
standby has been dead.
This doesn't really make sense, because you're connecting over a TCP
connection. Once you send the first keepalive, TCP will keep retrying
in some way that we have no control over. If those packets aren't
getting through, adding more data to what has to be transmitted seems
unlikely to do anything useful. I think the parameters we can
usefully set are:
- how long does the master wait before sending a keepalive request?
- how long does the master wait after sending a keepalive before
declaring the slave dead and closing the connection?
But this can be further simplified. The slave doesn't really need the
master to prompt it to send acknowledgments. It only needs to send
them sufficiently often. As part of the start-replication sequence,
let's have the master tell the slave "send me an acknowledgment at
least every N seconds". And then the slave must do that. The master
then has some value K > N, such that if no acknowledgment is received
after K seconds, the connection is disconnected.
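
A minimal sketch of this N/K scheme in C might look as follows. The type and function names are hypothetical and times are plain seconds; it only shows the two timer checks, not the protocol messages.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical policy: N is the interval the master hands to the slave at
 * replication start, K is the master's own cutoff. The scheme needs K > N. */
typedef struct {
    int64_t ack_interval_n;  /* slave must acknowledge at least this often */
    int64_t dead_after_k;    /* master disconnects after this much silence */
} AckPolicy;

/* Slave side: is an acknowledgment due, given when the last one was sent? */
bool slave_ack_due(const AckPolicy *p, int64_t last_sent, int64_t now)
{
    return now - last_sent >= p->ack_interval_n;
}

/* Master side: has the slave been silent for K seconds or more? */
bool master_should_disconnect(const AckPolicy *p,
                              int64_t last_received, int64_t now)
{
    assert(p->dead_after_k > p->ack_interval_n);
    return now - last_received >= p->dead_after_k;
}
```

The master never has to send anything on a timer here; it only compares the time of the last acknowledgment against K.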
The only reason to have the master send explicit keepalive requests
(vs. just telling the client the interval) is if the master might
request them for some reason other than timer expiration. Since the
main point of this is to detect the situation where the slave has e.g.
power cycled so that the connection is gone but the master doesn't
know it, you could imagine a system where, when a new replication
connection is received, we request keepalives on all of the existing
connections to see if any of them are defunct. But I don't really
think it needs to be quite that complicated.
Another consideration is that you could configure the
keepalive-frequency on the slave and the declare-dead-time on the
master. Then the master wouldn't need to tell the slave the
keepalive-frequency at replication start-up time. But that might also
increase the chances of incompatible settings (e.g. slave's keepalive
frequency is >= master's declare-dead-time), which would result in a
lot of unnecessary reconnects. If both parameters are configured on
the master, then we can enforce that declare-dead-time >
keepalive-frequency.
So I suggest:
replication_keepalive_time - how often the slave is instructed to send
acknowledgments when idle
replication_idle_timeout - the period of inactivity after which the
master closes the connection to the slave
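
If both settings live on the master, the constraint above can be checked in one place. The following is only a sketch in C with hypothetical function names, not PostgreSQL's actual GUC machinery; it treats both values as positive whole seconds and ignores network latency.

```c
#include <assert.h>
#include <stddef.h>

/* Returns NULL when the pair is usable, otherwise a description of the
 * problem. Hypothetical helper, not an existing PostgreSQL function. */
const char *check_replication_timeouts(int keepalive_time, int idle_timeout)
{
    if (keepalive_time <= 0)
        return "replication_keepalive_time must be positive";
    if (idle_timeout <= keepalive_time)
        return "replication_idle_timeout must be greater than "
               "replication_keepalive_time";
    return NULL;
}

/* How many consecutive scheduled acknowledgments can go missing before the
 * master's idle timeout fires, in an idealized model with no latency. */
int missed_acks_tolerated(int keepalive_time, int idle_timeout)
{
    assert(check_replication_timeouts(keepalive_time, idle_timeout) == NULL);
    return (idle_timeout - 1) / keepalive_time;
}
```

For example, with a keepalive time of 10 and an idle timeout of 30, two scheduled acknowledgments can be lost before the connection is closed.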
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Mon, Dec 20, 2010 at 3:17 AM, Fujii Masao <masao.fujii@gmail.com> wrote:
OK. How about keepalive-like parameters and behaviors?
replication_keepalives_idle
replication_keepalives_interval
replication_keepalives_count
The master sends the keepalive packet if replication_keepalives_idle
elapsed after receiving the last ACK packet including the receive/
fsync/replay LSNs from the standby. OTOH, the standby sends the
ACK packet back to the master as soon as receiving the keepalive
packet.
If the master could not receive the ACK packet for
replication_keepalives_interval, it repeats sending the keepalive
packet and receiving the ACK replication_keepalives_count -1
times. If no ACK packet has finally arrived, the master thinks the
standby has been dead.
I thought we were using a single TCP session per standby/slave? So
adding another "KEEPALIVE" into the local buffer side of the TCP
stream isn't going to help a "stuck" one arrive earlier.
You really only have a few situations:
1) Network problems. Stuffing more stuff into the local buffers isn't
going to help get packets from the remote that it would like to send
(I say like to send, because network problems could be on either/both
directions, the remote may or may not have seen our keepalive
request).
2) The remote is getting them, and is swamped. It's not going to get
to processing our 2nd keepalive any sooner than our 1st.
If a walreceiver reads a "keepalive" request, just declare that it
must reply immediately. Then the master config can trust that a
keepalive should be replied to pretty quickly if the network is OK. TCP
will make it get there "eventually" if it's a bad network, and the
admins have set it to be very network tolerant.
The ACK might report that the slave is hopelessly behind on
fsyncing/applying its WAL, but that's good too. At least then the
ACK comes back, and the master knows the slave is still churning away
on the last batch of WAL, and can decide if it wants to think the
slave is too far behind and boot it out.
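
As a rough sketch of that last decision in C: the ACK layout, the function names, and the byte-based lag threshold are all assumptions for illustration, with WAL positions simplified to plain 64-bit byte offsets.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical ACK payload carrying the three positions discussed above. */
typedef struct {
    uint64_t received;  /* WAL received by the slave */
    uint64_t fsynced;   /* WAL flushed to disk on the slave */
    uint64_t replayed;  /* WAL applied on the slave */
} StandbyAck;

/* Bytes of WAL the slave has not yet applied, relative to the master. */
uint64_t replay_lag_bytes(uint64_t master_lsn, const StandbyAck *ack)
{
    assert(ack->replayed <= master_lsn);
    return master_lsn - ack->replayed;
}

/* The slave is alive (it answered), but the master may still choose to
 * drop it if it has fallen further behind than a configured limit. */
bool standby_too_far_behind(uint64_t master_lsn, const StandbyAck *ack,
                            uint64_t max_lag_bytes)
{
    return replay_lag_bytes(master_lsn, ack) > max_lag_bytes;
}
```

The point is that liveness (an ACK arrived in time) and lag (what the ACK reports) are separate checks, and only the first one needs a timer.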
--
Aidan Van Dyk Create like a god,
aidan@highrise.ca command like a king,
http://www.highrise.ca/ work like a slave.