Streaming replication and message type header
On Tue, Jan 19, 2010 at 12:20 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
Tom Lane wrote:
Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:
Simon Riggs wrote:
Do we need a new record type for that, is there a handy record type to
bounce from?
After starting streaming, slices of WAL are sent as CopyData messages.
The CopyData payload begins with an XLogRecPtr, followed by the WAL
data. That payload format needs to be extended with a 'message type'
field and a new message type for the timestamps needs to be added.
Whether or not anyone bothers with the timestamp message, I think adding
a message type header is a Must Fix item. A protocol with no provision
for extension is certainly going to bite us in the rear before long.
Agreed a message type header is a good idea, although we don't expect
streaming replication and the protocol to work across different major
versions anyway.
The attached patch adds a message type header into the payload in
CopyData message sent from walsender to walreceiver, to make the
replication protocol more extensible.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
add_msgtype_20100203.patch (text/x-patch; charset=US-ASCII; name=add_msgtype_20100203.patch) Download
*** a/doc/src/sgml/protocol.sgml
--- b/doc/src/sgml/protocol.sgml
***************
*** 4179,4190 **** The commands accepted in walsender mode are:
already been recycled. On success, server responds with a
CopyOutResponse message, and backend starts to stream WAL as CopyData
messages.
</para>
<para>
! The payload in each CopyData message consists of an XLogRecPtr,
! indicating the starting point of the WAL in the message, immediately
! followed by the WAL data itself.
</para>
<para>
A single WAL record is never split across two CopyData messages. When
--- 4179,4243 ----
already been recycled. On success, server responds with a
CopyOutResponse message, and backend starts to stream WAL as CopyData
messages.
+ The payload in CopyData message consists of the following format.
</para>
<para>
! <variablelist>
! <varlistentry>
! <term>
! XLogData (B)
! </term>
! <listitem>
! <para>
! <variablelist>
! <varlistentry>
! <term>
! Byte1('w')
! </term>
! <listitem>
! <para>
! Identifies the message as WAL data.
! </para>
! </listitem>
! </varlistentry>
! <varlistentry>
! <term>
! Int32
! </term>
! <listitem>
! <para>
! The log file number of the LSN, indicating the starting point of
! the WAL in the message.
! </para>
! </listitem>
! </varlistentry>
! <varlistentry>
! <term>
! Int32
! </term>
! <listitem>
! <para>
! The byte offset of the LSN, indicating the starting point of
! the WAL in the message.
! </para>
! </listitem>
! </varlistentry>
! <varlistentry>
! <term>
! Byte<replaceable>n</replaceable>
! </term>
! <listitem>
! <para>
! Data that forms part of WAL data stream.
! </para>
! </listitem>
! </varlistentry>
! </variablelist>
! </para>
! </listitem>
! </varlistentry>
! </variablelist>
</para>
<para>
A single WAL record is never split across two CopyData messages. When
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 48,55 **** static char *recvBuf = NULL;
/* Prototypes for interface functions */
static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
! static bool libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer,
! int *len);
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
--- 48,55 ----
/* Prototypes for interface functions */
static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
! static bool libpqrcv_receive(int timeout, unsigned char *type,
! char **buffer, int *len);
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
***************
*** 236,248 **** libpqrcv_disconnect(void)
}
/*
! * Receive any WAL records available from XLOG stream, blocking for
* maximum of 'timeout' ms.
*
* Returns:
*
! * True if data was received. *recptr, *buffer and *len are set to
! * the WAL location of the received data, buffer holding it, and length,
* respectively.
*
* False if no data was available within timeout, or wait was interrupted
--- 236,248 ----
}
/*
! * Receive any messages available from XLOG stream, blocking for
* maximum of 'timeout' ms.
*
* Returns:
*
! * True if data was received. *type, *buffer and *len are set to
! * the type of the received data, buffer holding it, and length,
* respectively.
*
* False if no data was available within timeout, or wait was interrupted
***************
*** 254,260 **** libpqrcv_disconnect(void)
* ereports on error.
*/
static bool
! libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
{
int rawlen;
--- 254,260 ----
* ereports on error.
*/
static bool
! libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
{
int rawlen;
***************
*** 275,288 **** libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
if (PQconsumeInput(streamConn) == 0)
ereport(ERROR,
! (errmsg("could not read xlog records: %s",
PQerrorMessage(streamConn))));
}
justconnected = false;
/* Receive CopyData message */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
! if (rawlen == 0) /* no records available yet, then return */
return false;
if (rawlen == -1) /* end-of-streaming or error */
{
--- 275,288 ----
if (PQconsumeInput(streamConn) == 0)
ereport(ERROR,
! (errmsg("could not receive data from XLOG stream: %s",
PQerrorMessage(streamConn))));
}
justconnected = false;
/* Receive CopyData message */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
! if (rawlen == 0) /* no data available yet, then return */
return false;
if (rawlen == -1) /* end-of-streaming or error */
{
***************
*** 297,318 **** libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
}
PQclear(res);
ereport(ERROR,
! (errmsg("could not read xlog records: %s",
PQerrorMessage(streamConn))));
}
if (rawlen < -1)
ereport(ERROR,
! (errmsg("could not read xlog records: %s",
PQerrorMessage(streamConn))));
! if (rawlen < sizeof(XLogRecPtr))
! ereport(ERROR,
! (errmsg("invalid WAL message received from primary")));
!
! /* Return received WAL records to caller */
! *recptr = *((XLogRecPtr *) recvBuf);
! *buffer = recvBuf + sizeof(XLogRecPtr);
! *len = rawlen - sizeof(XLogRecPtr);
return true;
}
--- 297,314 ----
}
PQclear(res);
ereport(ERROR,
! (errmsg("could not receive data from XLOG stream: %s",
PQerrorMessage(streamConn))));
}
if (rawlen < -1)
ereport(ERROR,
! (errmsg("could not receive data from XLOG stream: %s",
PQerrorMessage(streamConn))));
! /* Return received messages to caller */
! *type = *((unsigned char *) recvBuf);
! *buffer = recvBuf + sizeof(*type);
! *len = rawlen - sizeof(*type);
return true;
}
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 135,140 **** static void WalRcvQuickDieHandler(SIGNAL_ARGS);
--- 135,141 ----
/* Prototypes for private functions */
static void WalRcvDie(int code, Datum arg);
+ static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(void);
***************
*** 258,264 **** WalReceiverMain(void)
/* Loop until end-of-streaming or error */
for (;;)
{
! XLogRecPtr recptr;
char *buf;
int len;
--- 259,265 ----
/* Loop until end-of-streaming or error */
for (;;)
{
! unsigned char type;
char *buf;
int len;
***************
*** 287,303 **** WalReceiverMain(void)
}
/* Wait a while for data to arrive */
! if (walrcv_receive(NAPTIME_PER_CYCLE, &recptr, &buf, &len))
{
! /* Write received WAL records to disk */
! XLogWalRcvWrite(buf, len, recptr);
! /* Receive any more WAL records we can without sleeping */
! while(walrcv_receive(0, &recptr, &buf, &len))
! XLogWalRcvWrite(buf, len, recptr);
/*
! * Now that we've written some records, flush them to disk and
* let the startup process know about them.
*/
XLogWalRcvFlush();
--- 288,304 ----
}
/* Wait a while for data to arrive */
! if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
{
! /* Accept the received data, and process it */
! XLogWalRcvProcessMsg(type, buf, len);
! /* Receive any more data we can without sleeping */
! while(walrcv_receive(0, &type, &buf, &len))
! XLogWalRcvProcessMsg(type, buf, len);
/*
! * If we've written some records, flush them to disk and
* let the startup process know about them.
*/
XLogWalRcvFlush();
***************
*** 376,381 **** WalRcvQuickDieHandler(SIGNAL_ARGS)
--- 377,412 ----
}
/*
+ * Accept the message from XLOG stream, and process it.
+ */
+ static void
+ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
+ {
+ switch (type)
+ {
+ case 'w': /* WAL records */
+ {
+ XLogRecPtr recptr;
+
+ if (len < sizeof(XLogRecPtr))
+ ereport(ERROR,
+ (errmsg("invalid WAL message received from primary")));
+
+ recptr = *((XLogRecPtr *) buf);
+ buf += sizeof(XLogRecPtr);
+ len -= sizeof(XLogRecPtr);
+ XLogWalRcvWrite(buf, len, recptr);
+ break;
+ }
+ default:
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid replication message type %d",
+ type)));
+ }
+ }
+
+ /*
* Write XLOG data to disk.
*/
static void
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 659,664 **** XLogSend(StringInfo outMsg)
--- 659,665 ----
* have the same byte order. If they have different byte order, we
* don't reach here.
*/
+ pq_sendbyte(outMsg, 'w');
pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
if (endptr.xlogid != startptr.xlogid)
*** a/src/include/replication/walreceiver.h
--- b/src/include/replication/walreceiver.h
***************
*** 66,72 **** extern WalRcvData *WalRcv;
typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
! typedef bool (*walrcv_receive_type) (int timeout, XLogRecPtr *recptr, char **buffer, int *len);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
typedef void (*walrcv_disconnect_type) (void);
--- 66,73 ----
typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
! typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
! char **buffer, int *len);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
typedef void (*walrcv_disconnect_type) (void);
Fujii Masao wrote:
On Tue, Jan 19, 2010 at 12:20 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
Tom Lane wrote:
Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:
Simon Riggs wrote:
Do we need a new record type for that, is there a handy record type to
bounce from?
After starting streaming, slices of WAL are sent as CopyData messages.
The CopyData payload begins with an XLogRecPtr, followed by the WAL
data. That payload format needs to be extended with a 'message type'
field and a new message type for the timestamps needs to be added.
Whether or not anyone bothers with the timestamp message, I think adding
a message type header is a Must Fix item. A protocol with no provision
for extension is certainly going to bite us in the rear before long.
Agreed a message type header is a good idea, although we don't expect
streaming replication and the protocol to work across different major
versions anyway.
The attached patch adds a message type header into the payload in
CopyData message sent from walsender to walreceiver, to make the
replication protocol more extensible.
Ok, committed.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com