libpq changes for synchronous replication

Started by Fujii Masao over 15 years ago, 22 messages
#1 Fujii Masao
masao.fujii@gmail.com
1 attachment(s)

On Fri, Sep 17, 2010 at 5:09 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:

That said, there's a few small things that can be progressed regardless of
the details of synchronous replication. There's the changes to trigger
failover with a signal, and it seems that we'll need some libpq changes to
allow acknowledgments to be sent back to the master regardless of the rest
of the design. We can discuss those in separate threads in parallel.

Agreed. The attached patch introduces a new function that is used
to send an ACK back from walreceiver. The function sends a message
to the XLOG stream by calling PQputCopyData. I also allowed PQputCopyData
to be called even during COPY OUT.
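
For readers unfamiliar with what PQputCopyData puts on the wire, here is a minimal sketch of framing a payload as a protocol 3.0 CopyData message (type byte 'd', then a four-byte big-endian length that counts itself, then the payload). The helper name frame_copy_data and its buffer interface are assumptions made for illustration; they are not part of libpq or of the attached patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Frame a payload as a protocol 3.0 CopyData message:
 *   Byte1('d'), Int32 length (counts itself plus the payload, but not the
 *   type byte), then the payload bytes.
 *
 * Returns the number of bytes written to out, or 0 if out is too small
 * or the payload is too large for the length field.
 *
 * frame_copy_data is a hypothetical helper, not a libpq function.
 */
size_t
frame_copy_data(const char *payload, uint32_t nbytes,
				unsigned char *out, size_t outsize)
{
	uint32_t	len;
	size_t		total;

	assert(out != NULL);
	assert(payload != NULL || nbytes == 0);

	if (nbytes > UINT32_MAX - 4)
		return 0;				/* length field would overflow */

	len = nbytes + 4;			/* length includes its own 4 bytes */
	total = (size_t) nbytes + 5;	/* plus the type byte */
	if (outsize < total)
		return 0;

	out[0] = 'd';
	out[1] = (unsigned char) (len >> 24);	/* network byte order */
	out[2] = (unsigned char) (len >> 16);
	out[3] = (unsigned char) (len >> 8);
	out[4] = (unsigned char) len;
	if (nbytes > 0)
		memcpy(out + 5, payload, nbytes);

	return total;
}
```

A three-byte payload therefore occupies eight bytes on the wire: one type byte, four length bytes holding the value 7, and the payload itself.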

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

Attachments:

libpqrcv_send_v1.patch (application/octet-stream)
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 50,55 **** static char *recvBuf = NULL;
--- 50,56 ----
  static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
  static bool libpqrcv_receive(int timeout, unsigned char *type,
  				 char **buffer, int *len);
+ static void libpqrcv_send(const char *buffer, int nbytes);
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
***************
*** 64,73 **** _PG_init(void)
  {
  	/* Tell walreceiver how to reach us */
  	if (walrcv_connect != NULL || walrcv_receive != NULL ||
! 		walrcv_disconnect != NULL)
  		elog(ERROR, "libpqwalreceiver already loaded");
  	walrcv_connect = libpqrcv_connect;
  	walrcv_receive = libpqrcv_receive;
  	walrcv_disconnect = libpqrcv_disconnect;
  }
  
--- 65,75 ----
  {
  	/* Tell walreceiver how to reach us */
  	if (walrcv_connect != NULL || walrcv_receive != NULL ||
! 		walrcv_send != NULL || walrcv_disconnect != NULL)
  		elog(ERROR, "libpqwalreceiver already loaded");
  	walrcv_connect = libpqrcv_connect;
  	walrcv_receive = libpqrcv_receive;
+ 	walrcv_send = libpqrcv_send;
  	walrcv_disconnect = libpqrcv_disconnect;
  }
  
***************
*** 398,400 **** libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
--- 400,417 ----
  
  	return true;
  }
+ 
+ /*
+  * Send a message to XLOG stream.
+  *
+  * ereports on error.
+  */
+ static void
+ libpqrcv_send(const char *buffer, int nbytes)
+ {
+ 	if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
+ 		PQflush(streamConn))
+ 		ereport(ERROR,
+ 				(errmsg("could not send data to WAL stream: %s",
+ 						PQerrorMessage(streamConn))));
+ }
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 57,62 **** bool		am_walreceiver;
--- 57,63 ----
  /* libpqreceiver hooks to these when loaded */
  walrcv_connect_type walrcv_connect = NULL;
  walrcv_receive_type walrcv_receive = NULL;
+ walrcv_send_type walrcv_send = NULL;
  walrcv_disconnect_type walrcv_disconnect = NULL;
  
  #define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */
***************
*** 247,253 **** WalReceiverMain(void)
  	/* Load the libpq-specific functions */
  	load_file("libpqwalreceiver", false);
  	if (walrcv_connect == NULL || walrcv_receive == NULL ||
! 		walrcv_disconnect == NULL)
  		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
  
  	/*
--- 248,254 ----
  	/* Load the libpq-specific functions */
  	load_file("libpqwalreceiver", false);
  	if (walrcv_connect == NULL || walrcv_receive == NULL ||
! 		walrcv_send == NULL || walrcv_disconnect == NULL)
  		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
  
  	/*
*** a/src/include/replication/walreceiver.h
--- b/src/include/replication/walreceiver.h
***************
*** 84,89 **** typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
--- 84,92 ----
  												 char **buffer, int *len);
  extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
  
+ typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
+ extern PGDLLIMPORT walrcv_send_type walrcv_send;
+ 
  typedef void (*walrcv_disconnect_type) (void);
  extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
  
*** a/src/interfaces/libpq/fe-exec.c
--- b/src/interfaces/libpq/fe-exec.c
***************
*** 2002,2007 **** PQnotifies(PGconn *conn)
--- 2002,2010 ----
  /*
   * PQputCopyData - send some data to the backend during COPY IN
   *
+  * This function can be called by walreceiver even during COPY OUT
+  * to send a message to the master.
+  *
   * Returns 1 if successful, 0 if data could not be sent (only possible
   * in nonblock mode), or -1 if an error occurs.
   */
***************
*** 2010,2016 **** PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
  {
  	if (!conn)
  		return -1;
! 	if (conn->asyncStatus != PGASYNC_COPY_IN)
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
--- 2013,2020 ----
  {
  	if (!conn)
  		return -1;
! 	if (conn->asyncStatus != PGASYNC_COPY_IN &&
! 		conn->asyncStatus != PGASYNC_COPY_OUT)
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));

#2 Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Fujii Masao (#1)
Re: libpq changes for synchronous replication

On 17/09/10 12:22, Fujii Masao wrote:

On Fri, Sep 17, 2010 at 5:09 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:

That said, there's a few small things that can be progressed regardless of
the details of synchronous replication. There's the changes to trigger
failover with a signal, and it seems that we'll need some libpq changes to
allow acknowledgments to be sent back to the master regardless of the rest
of the design. We can discuss those in separate threads in parallel.

Agreed. The attached patch introduces a new function that is used
to send an ACK back from walreceiver. The function sends a message
to the XLOG stream by calling PQputCopyData. I also allowed PQputCopyData
to be called even during COPY OUT.

Oh, that's simple.

It doesn't feel right to always accept PQputCopyData in COPY OUT mode,
though. IMHO there should be a new COPY IN+OUT mode.

It should be pretty safe to add a CopyInOutResponse message to the
protocol without a protocol version bump. Thoughts on that?

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com

#3 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Heikki Linnakangas (#2)
Re: libpq changes for synchronous replication

Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:

It doesn't feel right to always accept PQputCopyData in COPY OUT mode,
though. IMHO there should be a new COPY IN+OUT mode.

Yeah, I was going to make the same complaint. Breaking basic
error-checking functionality in libpq is not very acceptable.

It should be pretty safe to add a CopyInOutResponse message to the
protocol without a protocol version bump. Thoughts on that?

Not if it's something that an existing application might see. If
it can only happen in replication mode it's OK.

Personally I think this demonstrates that piggybacking replication
data transfer on the COPY protocol was a bad design to start with.
It's probably time to split them apart.

regards, tom lane

#4 Simon Riggs
simon@2ndQuadrant.com
In reply to: Fujii Masao (#1)
Re: libpq changes for synchronous replication

On Fri, 2010-09-17 at 18:22 +0900, Fujii Masao wrote:

On Fri, Sep 17, 2010 at 5:09 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:

That said, there's a few small things that can be progressed regardless of
the details of synchronous replication. There's the changes to trigger
failover with a signal, and it seems that we'll need some libpq changes to
allow acknowledgments to be sent back to the master regardless of the rest
of the design. We can discuss those in separate threads in parallel.

Agreed. The attached patch introduces a new function that is used
to send an ACK back from walreceiver. The function sends a message
to the XLOG stream by calling PQputCopyData. I also allowed PQputCopyData
to be called even during COPY OUT.

Does this differ from Zoltan's code?

--
Simon Riggs www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Training and Services

#5 Fujii Masao
masao.fujii@gmail.com
In reply to: Heikki Linnakangas (#2)
Re: libpq changes for synchronous replication

On Mon, Sep 20, 2010 at 11:55 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:

It doesn't feel right to always accept PQputCopyData in COPY OUT mode,
though. IMHO there should be a new COPY IN+OUT mode.

It should be pretty safe to add a CopyInOutResponse message to the protocol
without a protocol version bump. Thoughts on that?

Or could we check the "replication" field in PGconn, and accept PQputCopyData
in COPY OUT mode only if it is set? This is much simpler, but maybe
not versatile.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

#6 Boszormenyi Zoltan
zb@cybertec.at
In reply to: Tom Lane (#3)
Re: libpq changes for synchronous replication

Hi,

Tom Lane wrote:

Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:

It doesn't feel right to always accept PQputCopyData in COPY OUT mode,
though. IMHO there should be a new COPY IN+OUT mode.

Yeah, I was going to make the same complaint. Breaking basic
error-checking functionality in libpq is not very acceptable.

If you look at my sync replication patch, basically I only added
a check in PQputCopyData so that it's allowed in COPY OUT mode
iff the PGconn was set up for replication. I introduced a new libpq
function, PQsetDuplexCopy(), at the time, but Fujii's idea was that
it can be omitted and the conn->replication pointer used instead.
It seems he forgot about it. Something like this might work:

    if (conn->asyncStatus != PGASYNC_COPY_IN &&
        !(conn->asyncStatus == PGASYNC_COPY_OUT &&
          conn->replication && conn->replication[0]))
        ...

This way the original error checking is still in place and only
a replication client can do a duplex COPY.
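
As a rough illustration of that check, here is a self-contained sketch that models it outside libpq. The ModelConn struct, the ModelAsyncStatus enum and model_put_copy_allowed are hypothetical stand-ins for PGconn, PGAsyncStatusType and the test inside PQputCopyData; only the shape of the condition comes from the snippet above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the relevant PGAsyncStatusType values. */
typedef enum
{
	MODEL_IDLE,
	MODEL_COPY_IN,
	MODEL_COPY_OUT
} ModelAsyncStatus;

/* Hypothetical stand-in for the two PGconn fields the check reads. */
typedef struct
{
	ModelAsyncStatus asyncStatus;
	const char *replication;	/* NULL or "" for an ordinary client */
} ModelConn;

/*
 * Returns true when sending copy data would be accepted:
 * always in COPY IN, and in COPY OUT only on a replication connection.
 */
bool
model_put_copy_allowed(const ModelConn *conn)
{
	assert(conn != NULL);

	if (conn->asyncStatus == MODEL_COPY_IN)
		return true;

	return conn->asyncStatus == MODEL_COPY_OUT &&
		conn->replication != NULL &&
		conn->replication[0] != '\0';
}
```

An ordinary client in COPY OUT is still rejected, so the existing "no COPY in progress" error path would stay reachable for it.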

It should be pretty safe to add a CopyInOutResponse message to the
protocol without a protocol version bump. Thoughts on that?

Not if it's something that an existing application might see. If
it can only happen in replication mode it's OK.

My PQsetDuplexCopy() call was only usable for a replication client;
it resulted in an "unknown protocol message" for a regular client.
For a replication client, walsender sent an ack and libpq set
the "duplex copy" flag, so it allowed PQputCopyData while in
COPY OUT. I'd like a comment from you on whether that's a
good idea, or whether the above check is enough.

Personally I think this demonstrates that piggybacking replication
data transfer on the COPY protocol was a bad design to start with.
It's probably time to split them apart.

Best regards,
Zoltán Böszörményi

--
----------------------------------
Zoltán Böszörményi
Cybertec Schönig & Schönig GmbH
Gröhrmühlgasse 26
A-2700 Wiener Neustadt, Austria
Web: http://www.postgresql-support.de
http://www.postgresql.at/

#7 Boszormenyi Zoltan
zb@cybertec.at
In reply to: Simon Riggs (#4)
Re: libpq changes for synchronous replication

Simon Riggs wrote:

On Fri, 2010-09-17 at 18:22 +0900, Fujii Masao wrote:

On Fri, Sep 17, 2010 at 5:09 PM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:

That said, there's a few small things that can be progressed regardless of
the details of synchronous replication. There's the changes to trigger
failover with a signal, and it seems that we'll need some libpq changes to
allow acknowledgments to be sent back to the master regardless of the rest
of the design. We can discuss those in separate threads in parallel.

Agreed. The attached patch introduces a new function that is used
to send an ACK back from walreceiver. The function sends a message
to the XLOG stream by calling PQputCopyData. I also allowed PQputCopyData
to be called even during COPY OUT.

Does this differ from Zoltan's code?

Somewhat. See my other mail to Tom.

Best regards,
Zoltán Böszörményi

--
----------------------------------
Zoltán Böszörményi
Cybertec Schönig & Schönig GmbH
Gröhrmühlgasse 26
A-2700 Wiener Neustadt, Austria
Web: http://www.postgresql-support.de
http://www.postgresql.at/

#8 Fujii Masao
masao.fujii@gmail.com
In reply to: Tom Lane (#3)
1 attachment(s)
Re: libpq changes for synchronous replication

On Tue, Sep 21, 2010 at 1:17 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:

It doesn't feel right to always accept PQputCopyData in COPY OUT mode,
though. IMHO there should be a new COPY IN+OUT mode.

Yeah, I was going to make the same complaint.  Breaking basic
error-checking functionality in libpq is not very acceptable.

It should be pretty safe to add a CopyInOutResponse message to the
protocol without a protocol version bump. Thoughts on that?

Not if it's something that an existing application might see.  If
it can only happen in replication mode it's OK.

The attached patch adds a CopyXLogResponse message. The walsender sends
it after processing the START_REPLICATION command, instead of CopyOutResponse.
In Copy XLog mode, walreceiver can both receive data from walsender
and send data to walsender.
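
To make the effect on the libpq checks concrete, here is a small standalone model of which operations each state permits under this patch. The SketchAsyncStatus enum and the two sketch_can_* functions are hypothetical names for illustration only; the real checks live in PQputCopyData and PQgetCopyData.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for PGAsyncStatusType with the new state added. */
typedef enum
{
	SKETCH_IDLE,
	SKETCH_COPY_IN,
	SKETCH_COPY_OUT,
	SKETCH_COPY_XLOG
} SketchAsyncStatus;

/* Models the PQputCopyData check: sending needs COPY IN or COPY XLOG. */
bool
sketch_can_put(SketchAsyncStatus s)
{
	assert((int) s >= 0 && (int) s <= SKETCH_COPY_XLOG);
	return s == SKETCH_COPY_IN || s == SKETCH_COPY_XLOG;
}

/* Models the PQgetCopyData check: receiving needs COPY OUT or COPY XLOG. */
bool
sketch_can_get(SketchAsyncStatus s)
{
	assert((int) s >= 0 && (int) s <= SKETCH_COPY_XLOG);
	return s == SKETCH_COPY_OUT || s == SKETCH_COPY_XLOG;
}
```

Only the new state allows both directions, so ordinary COPY IN and COPY OUT keep their existing one-way error checking.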

Personally I think this demonstrates that piggybacking replication
data transfer on the COPY protocol was a bad design to start with.
It's probably time to split them apart.

In the patch, replication data is still transferred over the COPY protocol.
If we transferred it over a dedicated replication protocol, we would
need to duplicate PQgetCopyData and PQputCopyData and define the
duplicates as something like PQgetXLogData and PQputXLogData.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

Attachments:

libpqrcv_send_v2.patch (application/octet-stream)
*** a/doc/src/sgml/protocol.sgml
--- b/doc/src/sgml/protocol.sgml
***************
*** 1344,1350 **** The commands accepted in walsender mode are:
        WAL position <replaceable>XXX</>/<replaceable>XXX</>.
        The server can reply with an error, e.g. if the requested section of WAL
        has already been recycled. On success, server responds with a
!       CopyOutResponse message, and then starts to stream WAL to the frontend.
        WAL will continue to be streamed until the connection is broken;
        no further commands will be accepted.
       </para>
--- 1344,1350 ----
        WAL position <replaceable>XXX</>/<replaceable>XXX</>.
        The server can reply with an error, e.g. if the requested section of WAL
        has already been recycled. On success, server responds with a
!       CopyXLogResponse message, and then starts to stream WAL to the frontend.
        WAL will continue to be streamed until the connection is broken;
        no further commands will be accepted.
       </para>
***************
*** 2696,2701 **** CopyOutResponse (B)
--- 2696,2737 ----
  
  <varlistentry>
  <term>
+ CopyXLogResponse (B)
+ </term>
+ <listitem>
+ <para>
+ 
+ <variablelist>
+ <varlistentry>
+ <term>
+         Byte1('W')
+ </term>
+ <listitem>
+ <para>
+                 Identifies the message as a Start Copy XLog response.
+                 This message is used only for Streaming Replication.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+         Int32
+ </term>
+ <listitem>
+ <para>
+                 Length of message contents in bytes, including self.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ 
+ </para>
+ </listitem>
+ </varlistentry>
+ 
+ 
+ <varlistentry>
+ <term>
  DataRow (B)
  </term>
  <listitem>
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 50,55 **** static char *recvBuf = NULL;
--- 50,56 ----
  static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
  static bool libpqrcv_receive(int timeout, unsigned char *type,
  				 char **buffer, int *len);
+ static void libpqrcv_send(const char *buffer, int nbytes);
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
***************
*** 64,73 **** _PG_init(void)
  {
  	/* Tell walreceiver how to reach us */
  	if (walrcv_connect != NULL || walrcv_receive != NULL ||
! 		walrcv_disconnect != NULL)
  		elog(ERROR, "libpqwalreceiver already loaded");
  	walrcv_connect = libpqrcv_connect;
  	walrcv_receive = libpqrcv_receive;
  	walrcv_disconnect = libpqrcv_disconnect;
  }
  
--- 65,75 ----
  {
  	/* Tell walreceiver how to reach us */
  	if (walrcv_connect != NULL || walrcv_receive != NULL ||
! 		walrcv_send != NULL || walrcv_disconnect != NULL)
  		elog(ERROR, "libpqwalreceiver already loaded");
  	walrcv_connect = libpqrcv_connect;
  	walrcv_receive = libpqrcv_receive;
+ 	walrcv_send = libpqrcv_send;
  	walrcv_disconnect = libpqrcv_disconnect;
  }
  
***************
*** 398,400 **** libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
--- 400,417 ----
  
  	return true;
  }
+ 
+ /*
+  * Send a message to XLOG stream.
+  *
+  * ereports on error.
+  */
+ static void
+ libpqrcv_send(const char *buffer, int nbytes)
+ {
+ 	if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
+ 		PQflush(streamConn))
+ 		ereport(ERROR,
+ 				(errmsg("could not send data to WAL stream: %s",
+ 						PQerrorMessage(streamConn))));
+ }
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 57,62 **** bool		am_walreceiver;
--- 57,63 ----
  /* libpqreceiver hooks to these when loaded */
  walrcv_connect_type walrcv_connect = NULL;
  walrcv_receive_type walrcv_receive = NULL;
+ walrcv_send_type walrcv_send = NULL;
  walrcv_disconnect_type walrcv_disconnect = NULL;
  
  #define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */
***************
*** 247,253 **** WalReceiverMain(void)
  	/* Load the libpq-specific functions */
  	load_file("libpqwalreceiver", false);
  	if (walrcv_connect == NULL || walrcv_receive == NULL ||
! 		walrcv_disconnect == NULL)
  		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
  
  	/*
--- 248,254 ----
  	/* Load the libpq-specific functions */
  	load_file("libpqwalreceiver", false);
  	if (walrcv_connect == NULL || walrcv_receive == NULL ||
! 		walrcv_send == NULL || walrcv_disconnect == NULL)
  		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
  
  	/*
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 287,296 **** WalSndHandshake(void)
  									(errcode(ERRCODE_CANNOT_CONNECT_NOW),
  									 errmsg("standby connections not allowed because wal_level=minimal")));
  
! 						/* Send a CopyOutResponse message, and start streaming */
! 						pq_beginmessage(&buf, 'H');
! 						pq_sendbyte(&buf, 0);
! 						pq_sendint(&buf, 0, 2);
  						pq_endmessage(&buf);
  						pq_flush();
  
--- 287,294 ----
  									(errcode(ERRCODE_CANNOT_CONNECT_NOW),
  									 errmsg("standby connections not allowed because wal_level=minimal")));
  
! 						/* Send a CopyXLogResponse message, and start streaming */
! 						pq_beginmessage(&buf, 'W');
  						pq_endmessage(&buf);
  						pq_flush();
  
*** a/src/include/replication/walreceiver.h
--- b/src/include/replication/walreceiver.h
***************
*** 84,89 **** typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
--- 84,92 ----
  												 char **buffer, int *len);
  extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
  
+ typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
+ extern PGDLLIMPORT walrcv_send_type walrcv_send;
+ 
  typedef void (*walrcv_disconnect_type) (void);
  extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
  
*** a/src/interfaces/libpq/fe-exec.c
--- b/src/interfaces/libpq/fe-exec.c
***************
*** 1586,1591 **** PQgetResult(PGconn *conn)
--- 1586,1592 ----
  				res = PQmakeEmptyPGresult(conn, PGRES_COPY_IN);
  			break;
  		case PGASYNC_COPY_OUT:
+ 		case PGASYNC_COPY_XLOG:
  			if (conn->result && conn->result->resultStatus == PGRES_COPY_OUT)
  				res = pqPrepareAsyncResult(conn);
  			else
***************
*** 2000,2006 **** PQnotifies(PGconn *conn)
  }
  
  /*
!  * PQputCopyData - send some data to the backend during COPY IN
   *
   * Returns 1 if successful, 0 if data could not be sent (only possible
   * in nonblock mode), or -1 if an error occurs.
--- 2001,2007 ----
  }
  
  /*
!  * PQputCopyData - send some data to the backend during COPY IN or COPY XLOG
   *
   * Returns 1 if successful, 0 if data could not be sent (only possible
   * in nonblock mode), or -1 if an error occurs.
***************
*** 2010,2016 **** PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
  {
  	if (!conn)
  		return -1;
! 	if (conn->asyncStatus != PGASYNC_COPY_IN)
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
--- 2011,2018 ----
  {
  	if (!conn)
  		return -1;
! 	if (conn->asyncStatus != PGASYNC_COPY_IN &&
! 		conn->asyncStatus != PGASYNC_COPY_XLOG)
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
***************
*** 2148,2153 **** PQputCopyEnd(PGconn *conn, const char *errormsg)
--- 2150,2156 ----
  
  /*
   * PQgetCopyData - read a row of data from the backend during COPY OUT
+  * or COPY XLOG
   *
   * If successful, sets *buffer to point to a malloc'd row of data, and
   * returns row length (always > 0) as result.
***************
*** 2161,2167 **** PQgetCopyData(PGconn *conn, char **buffer, int async)
  	*buffer = NULL;				/* for all failure cases */
  	if (!conn)
  		return -2;
! 	if (conn->asyncStatus != PGASYNC_COPY_OUT)
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
--- 2164,2171 ----
  	*buffer = NULL;				/* for all failure cases */
  	if (!conn)
  		return -2;
! 	if (conn->asyncStatus != PGASYNC_COPY_OUT &&
! 		conn->asyncStatus != PGASYNC_COPY_XLOG)
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
*** a/src/interfaces/libpq/fe-protocol2.c
--- b/src/interfaces/libpq/fe-protocol2.c
***************
*** 541,546 **** pqParseInput2(PGconn *conn)
--- 541,550 ----
  				case 'H':		/* Start Copy Out */
  					conn->asyncStatus = PGASYNC_COPY_OUT;
  					break;
+ 					/*
+ 					 * Don't need to process CopyXLogResponse here because
+ 					 * it never arrives from the server during protocol 2.0.
+ 					 */
  				default:
  					printfPQExpBuffer(&conn->errorMessage,
  									  libpq_gettext(
*** a/src/interfaces/libpq/fe-protocol3.c
--- b/src/interfaces/libpq/fe-protocol3.c
***************
*** 358,363 **** pqParseInput3(PGconn *conn)
--- 358,375 ----
  					conn->asyncStatus = PGASYNC_COPY_OUT;
  					conn->copy_already_done = 0;
  					break;
+ 				case 'W':		/* Start Copy XLog */
+ 					/*
+ 					 * We don't need to use getCopyStart here since CopyXLogResponse
+ 					 * specifies neither the copy format nor the number of columns in
+ 					 * the Copy data. They should be always zero.
+ 					 */
+ 					conn->result = PQmakeEmptyPGresult(conn, PGRES_COPY_OUT);
+ 					if (!conn->result)
+ 						return;
+ 					conn->asyncStatus = PGASYNC_COPY_XLOG;
+ 					conn->copy_already_done = 0;
+ 					break;
  				case 'd':		/* Copy Data */
  
  					/*
*** a/src/interfaces/libpq/libpq-int.h
--- b/src/interfaces/libpq/libpq-int.h
***************
*** 218,224 **** typedef enum
  	PGASYNC_BUSY,				/* query in progress */
  	PGASYNC_READY,				/* result ready for PQgetResult */
  	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
! 	PGASYNC_COPY_OUT			/* Copy Out data transfer in progress */
  } PGAsyncStatusType;
  
  /* PGQueryClass tracks which query protocol we are now executing */
--- 218,225 ----
  	PGASYNC_BUSY,				/* query in progress */
  	PGASYNC_READY,				/* result ready for PQgetResult */
  	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
! 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
! 	PGASYNC_COPY_XLOG			/* Copy XLog data transfer in progress */
  } PGAsyncStatusType;
  
  /* PGQueryClass tracks which query protocol we are now executing */

#9 Robert Haas
robertmhaas@gmail.com
In reply to: Tom Lane (#3)
Re: libpq changes for synchronous replication

On Mon, Sep 20, 2010 at 12:17 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

Personally I think this demonstrates that piggybacking replication
data transfer on the COPY protocol was a bad design to start with.
It's probably time to split them apart.

This appears to be the only obvious unresolved issue regarding this patch:

https://commitfest.postgresql.org/action/patch_view?id=412

I don't have a strong personal position on whether or not we should do
this, but it strikes me that Tom hasn't given much justification for
why he thinks we should do this, what benefit we'd get from it, or
what the design should look like. So I guess the question is whether
Tom - or anyone - would like to make a case for a more serious
protocol overhaul, or whether we should just go with the approach
proposed here.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

#10 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Robert Haas (#9)
Re: libpq changes for synchronous replication

Robert Haas <robertmhaas@gmail.com> writes:

On Mon, Sep 20, 2010 at 12:17 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

Personally I think this demonstrates that piggybacking replication
data transfer on the COPY protocol was a bad design to start with.
It's probably time to split them apart.

This appears to be the only obvious unresolved issue regarding this patch:

https://commitfest.postgresql.org/action/patch_view?id=412

I don't have a strong personal position on whether or not we should do
this, but it strikes me that Tom hasn't given much justification for
why he thinks we should do this, what benefit we'd get from it, or
what the design should look like. So I guess the question is whether
Tom - or anyone - would like to make a case for a more serious
protocol overhaul, or whether we should just go with the approach
proposed here.

I was objecting to v1 of the patch. v2 seems somewhat cleaner --- it at
least avoids changing the behavior of libpq for normal COPY operation.
I'm still a bit concerned by the prospect of having to shove further
warts into the COPY data path in future, but maybe it's premature to
complain about that when it hasn't happened yet.

Just in a quick scan, I don't have any objection to v2 except that the
protocol documentation is lacking.

regards, tom lane

#11 Robert Haas
robertmhaas@gmail.com
In reply to: Tom Lane (#10)
Re: libpq changes for synchronous replication

On Mon, Nov 15, 2010 at 7:26 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

Robert Haas <robertmhaas@gmail.com> writes:

On Mon, Sep 20, 2010 at 12:17 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

Personally I think this demonstrates that piggybacking replication
data transfer on the COPY protocol was a bad design to start with.
It's probably time to split them apart.

This appears to be the only obvious unresolved issue regarding this patch:

https://commitfest.postgresql.org/action/patch_view?id=412

I don't have a strong personal position on whether or not we should do
this, but it strikes me that Tom hasn't given much justification for
why he thinks we should do this, what benefit we'd get from it, or
what the design should look like.  So I guess the question is whether
Tom - or anyone - would like to make a case for a more serious
protocol overhaul, or whether we should just go with the approach
proposed here.

I was objecting to v1 of the patch.  v2 seems somewhat cleaner --- it at
least avoids changing the behavior of libpq for normal COPY operation.
I'm still a bit concerned by the prospect of having to shove further
warts into the COPY data path in future, but maybe it's premature to
complain about that when it hasn't happened yet.

It's not an unreasonable complaint, but I don't have a very clear idea
what to do about it.

Just in a quick scan, I don't have any objection to v2 except that the
protocol documentation is lacking.

OK, I'll mark it Waiting on Author pending that issue.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

#12 Fujii Masao
masao.fujii@gmail.com
In reply to: Robert Haas (#11)
Re: libpq changes for synchronous replication

On Tue, Nov 16, 2010 at 10:49 AM, Robert Haas <robertmhaas@gmail.com> wrote:

Just in a quick scan, I don't have any objection to v2 except that the
protocol documentation is lacking.

OK, I'll mark it Waiting on Author pending that issue.

The patch is touching protocol.sgml as follows. Isn't this enough?

-----------------------------
*** a/doc/src/sgml/protocol.sgml
--- b/doc/src/sgml/protocol.sgml
***************
*** 1344,1350 **** The commands accepted in walsender mode are:
        WAL position <replaceable>XXX</>/<replaceable>XXX</>.
        The server can reply with an error, e.g. if the requested section of WAL
        has already been recycled. On success, server responds with a
!       CopyOutResponse message, and then starts to stream WAL to the frontend.
        WAL will continue to be streamed until the connection is broken;
        no further commands will be accepted.
       </para>
--- 1344,1350 ----
        WAL position <replaceable>XXX</>/<replaceable>XXX</>.
        The server can reply with an error, e.g. if the requested section of WAL
        has already been recycled. On success, server responds with a
!       CopyXLogResponse message, and then starts to stream WAL to the frontend.
        WAL will continue to be streamed until the connection is broken;
        no further commands will be accepted.
       </para>
***************
*** 2696,2701 **** CopyOutResponse (B)
--- 2696,2737 ----
  <varlistentry>
  <term>
+ CopyXLogResponse (B)
+ </term>
+ <listitem>
+ <para>
+
+ <variablelist>
+ <varlistentry>
+ <term>
+         Byte1('W')
+ </term>
+ <listitem>
+ <para>
+                 Identifies the message as a Start Copy XLog response.
+                 This message is used only for Streaming Replication.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+         Int32
+ </term>
+ <listitem>
+ <para>
+                 Length of message contents in bytes, including self.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ </para>
+ </listitem>
+ </varlistentry>
+
+
+ <varlistentry>
+ <term>
  DataRow (B)
  </term>
  <listitem>
-----------------------------

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

#13 Robert Haas
robertmhaas@gmail.com
In reply to: Fujii Masao (#12)
Re: libpq changes for synchronous replication

On Thu, Nov 18, 2010 at 7:43 AM, Fujii Masao <masao.fujii@gmail.com> wrote:

On Tue, Nov 16, 2010 at 10:49 AM, Robert Haas <robertmhaas@gmail.com> wrote:

Just in a quick scan, I don't have any objection to v2 except that the
protocol documentation is lacking.

OK, I'll mark it Waiting on Author pending that issue.

The patch is touching protocol.sgml as follows. Isn't this enough?

How about some updates to the "Message Flow" section, especially the
section on "COPY Operations"?

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

#14 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Robert Haas (#13)
Re: libpq changes for synchronous replication

Robert Haas <robertmhaas@gmail.com> writes:

On Thu, Nov 18, 2010 at 7:43 AM, Fujii Masao <masao.fujii@gmail.com> wrote:

The patch is touching protocol.sgml as follows. Isn't this enough?

How about some updates to the "Message Flow" section, especially the
section on "COPY Operations"?

Yeah. You're adding a new fundamental state to the protocol; it's not
enough to bury that in the description of a message format. I don't
think a whole lot of new verbiage is needed, but the COPY section needs
to point out that this is a different state that allows both send and
receive, and explain what the conditions are for getting into and out of
that state.
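
As a rough illustration of the point above, the states can be modelled by which directions of CopyData traffic each one permits. This is a minimal sketch, not libpq code: `CopyState`, `frontend_may_send`, `frontend_may_receive` and `copy_state_for_response` are hypothetical names chosen for illustration. The message type bytes 'G' (CopyInResponse) and 'H' (CopyOutResponse) are the existing ones; 'W' is the byte proposed in the patch under review.

```c
#include <stdbool.h>

/* Hypothetical names for illustration; these are not libpq identifiers. */
typedef enum
{
	COPY_STATE_NONE,			/* no COPY in progress */
	COPY_STATE_IN,				/* only the frontend sends CopyData */
	COPY_STATE_OUT,				/* only the backend sends CopyData */
	COPY_STATE_BOTH				/* both sides send CopyData */
} CopyState;

/* May the frontend send CopyData in this state? */
static bool
frontend_may_send(CopyState state)
{
	return state == COPY_STATE_IN || state == COPY_STATE_BOTH;
}

/* May the frontend receive CopyData in this state? */
static bool
frontend_may_receive(CopyState state)
{
	return state == COPY_STATE_OUT || state == COPY_STATE_BOTH;
}

/*
 * State entered when the frontend sees a Copy*Response type byte.
 * 'W' is the new response proposed in this thread.
 */
static CopyState
copy_state_for_response(char msgtype)
{
	switch (msgtype)
	{
		case 'G':
			return COPY_STATE_IN;
		case 'H':
			return COPY_STATE_OUT;
		case 'W':
			return COPY_STATE_BOTH;
		default:
			return COPY_STATE_NONE;
	}
}
```

The sketch covers only how the state is entered. How it is left is exactly what the documentation has to spell out; in the patch as posted, the stream runs until the connection is broken.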

regards, tom lane

#15Alvaro Herrera
alvherre@commandprompt.com
In reply to: Tom Lane (#14)
Re: libpq changes for synchronous replication

Excerpts from Tom Lane's message of Fri Nov 19 12:25:13 -0300 2010:

Robert Haas <robertmhaas@gmail.com> writes:

On Thu, Nov 18, 2010 at 7:43 AM, Fujii Masao <masao.fujii@gmail.com> wrote:

The patch is touching protocol.sgml as follows. Isn't this enough?

How about some updates to the "Message Flow" section, especially the
section on "COPY Operations"?

Yeah. You're adding a new fundamental state to the protocol; it's not
enough to bury that in the description of a message format. I don't
think a whole lot of new verbiage is needed, but the COPY section needs
to point out that this is a different state that allows both send and
receive, and explain what the conditions are for getting into and out of
that state.

Is it sane that the new message has so specific a name?

--
Álvaro Herrera <alvherre@commandprompt.com>
The PostgreSQL Company - Command Prompt, Inc.
PostgreSQL Replication, Consulting, Custom Development, 24x7 support

#16Tom Lane
tgl@sss.pgh.pa.us
In reply to: Alvaro Herrera (#15)
Re: libpq changes for synchronous replication

Alvaro Herrera <alvherre@commandprompt.com> writes:

Excerpts from Tom Lane's message of Fri Nov 19 12:25:13 -0300 2010:

Yeah. You're adding a new fundamental state to the protocol; it's not
enough to bury that in the description of a message format. I don't
think a whole lot of new verbiage is needed, but the COPY section needs
to point out that this is a different state that allows both send and
receive, and explain what the conditions are for getting into and out of
that state.

Is it sane that the new message has so specific a name?

Yeah, it might be better to call it something generic like CopyBoth.

regards, tom lane

#17Fujii Masao
masao.fujii@gmail.com
In reply to: Tom Lane (#16)
1 attachment(s)
Re: libpq changes for synchronous replication

On Sat, Nov 20, 2010 at 2:04 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

Alvaro Herrera <alvherre@commandprompt.com> writes:

Excerpts from Tom Lane's message of Fri Nov 19 12:25:13 -0300 2010:

Yeah.  You're adding a new fundamental state to the protocol; it's not
enough to bury that in the description of a message format.  I don't
think a whole lot of new verbiage is needed, but the COPY section needs
to point out that this is a different state that allows both send and
receive, and explain what the conditions are for getting into and out of
that state.

Is it sane that the new message has so specific a name?

Yeah, it might be better to call it something generic like CopyBoth.

Thanks for the review!

The attached patch s/CopyXLog/CopyBoth/g and adds the description
about CopyBoth into the COPY section.

While modifying the code, it occurred to me that we might have to add a new
ExecStatusType like PGRES_COPY_BOTH and use that for CopyBoth mode,
for the sake of consistency. But since it would just be an alias of
PGRES_COPY_OUT for now, i.e., there is no specific behavior for that
ExecStatusType, I don't think that it's worth adding that yet.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

Attachments:

libpqrcv_send_v3.patch (application/octet-stream)
*** a/doc/src/sgml/protocol.sgml
--- b/doc/src/sgml/protocol.sgml
***************
*** 1039,1044 ****
--- 1039,1057 ----
      all columns in a given <command>COPY</> operation will use the same
      format, but the message design does not assume this.)
     </para>
+ 
+    <para>
+     There is another Copy-related mode called Copy-both, which allows
+     high-speed bulk data transfer to <emphasis>and</> from the server.
+     Copy-both mode is initiated when the backend with walsender mode
+     executes a <command>START_REPLICATION</command> statement.  The
+     backend sends a CopyBothResponse message to the frontend.  Both
+     the backend and the frontend should then send CopyData messages
+     between each other until the connection is terminated.  Copy-both
+     mode is used by streaming replication (see <xref
+     linkend="protocol-replication">).
+    </para>
+ 
    </sect2>
  
    <sect2 id="protocol-async">
***************
*** 1344,1350 **** The commands accepted in walsender mode are:
        WAL position <replaceable>XXX</>/<replaceable>XXX</>.
        The server can reply with an error, e.g. if the requested section of WAL
        has already been recycled. On success, server responds with a
!       CopyOutResponse message, and then starts to stream WAL to the frontend.
        WAL will continue to be streamed until the connection is broken;
        no further commands will be accepted.
       </para>
--- 1357,1363 ----
        WAL position <replaceable>XXX</>/<replaceable>XXX</>.
        The server can reply with an error, e.g. if the requested section of WAL
        has already been recycled. On success, server responds with a
!       CopyBothResponse message, and then starts to stream WAL to the frontend.
        WAL will continue to be streamed until the connection is broken;
        no further commands will be accepted.
       </para>
***************
*** 2696,2701 **** CopyOutResponse (B)
--- 2709,2750 ----
  
  <varlistentry>
  <term>
+ CopyBothResponse (B)
+ </term>
+ <listitem>
+ <para>
+ 
+ <variablelist>
+ <varlistentry>
+ <term>
+         Byte1('W')
+ </term>
+ <listitem>
+ <para>
+                 Identifies the message as a Start Copy Both response.
+                 This message is used only for Streaming Replication.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+         Int32
+ </term>
+ <listitem>
+ <para>
+                 Length of message contents in bytes, including self.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ 
+ </para>
+ </listitem>
+ </varlistentry>
+ 
+ 
+ <varlistentry>
+ <term>
  DataRow (B)
  </term>
  <listitem>
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 50,55 **** static char *recvBuf = NULL;
--- 50,56 ----
  static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
  static bool libpqrcv_receive(int timeout, unsigned char *type,
  				 char **buffer, int *len);
+ static void libpqrcv_send(const char *buffer, int nbytes);
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
***************
*** 64,73 **** _PG_init(void)
  {
  	/* Tell walreceiver how to reach us */
  	if (walrcv_connect != NULL || walrcv_receive != NULL ||
! 		walrcv_disconnect != NULL)
  		elog(ERROR, "libpqwalreceiver already loaded");
  	walrcv_connect = libpqrcv_connect;
  	walrcv_receive = libpqrcv_receive;
  	walrcv_disconnect = libpqrcv_disconnect;
  }
  
--- 65,75 ----
  {
  	/* Tell walreceiver how to reach us */
  	if (walrcv_connect != NULL || walrcv_receive != NULL ||
! 		walrcv_send != NULL || walrcv_disconnect != NULL)
  		elog(ERROR, "libpqwalreceiver already loaded");
  	walrcv_connect = libpqrcv_connect;
  	walrcv_receive = libpqrcv_receive;
+ 	walrcv_send = libpqrcv_send;
  	walrcv_disconnect = libpqrcv_disconnect;
  }
  
***************
*** 398,400 **** libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
--- 400,417 ----
  
  	return true;
  }
+ 
+ /*
+  * Send a message to XLOG stream.
+  *
+  * ereports on error.
+  */
+ static void
+ libpqrcv_send(const char *buffer, int nbytes)
+ {
+ 	if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
+ 		PQflush(streamConn))
+ 		ereport(ERROR,
+ 				(errmsg("could not send data to WAL stream: %s",
+ 						PQerrorMessage(streamConn))));
+ }
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 57,62 **** bool		am_walreceiver;
--- 57,63 ----
  /* libpqreceiver hooks to these when loaded */
  walrcv_connect_type walrcv_connect = NULL;
  walrcv_receive_type walrcv_receive = NULL;
+ walrcv_send_type walrcv_send = NULL;
  walrcv_disconnect_type walrcv_disconnect = NULL;
  
  #define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */
***************
*** 247,253 **** WalReceiverMain(void)
  	/* Load the libpq-specific functions */
  	load_file("libpqwalreceiver", false);
  	if (walrcv_connect == NULL || walrcv_receive == NULL ||
! 		walrcv_disconnect == NULL)
  		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
  
  	/*
--- 248,254 ----
  	/* Load the libpq-specific functions */
  	load_file("libpqwalreceiver", false);
  	if (walrcv_connect == NULL || walrcv_receive == NULL ||
! 		walrcv_send == NULL || walrcv_disconnect == NULL)
  		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
  
  	/*
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 287,296 **** WalSndHandshake(void)
  									(errcode(ERRCODE_CANNOT_CONNECT_NOW),
  									 errmsg("standby connections not allowed because wal_level=minimal")));
  
! 						/* Send a CopyOutResponse message, and start streaming */
! 						pq_beginmessage(&buf, 'H');
! 						pq_sendbyte(&buf, 0);
! 						pq_sendint(&buf, 0, 2);
  						pq_endmessage(&buf);
  						pq_flush();
  
--- 287,294 ----
  									(errcode(ERRCODE_CANNOT_CONNECT_NOW),
  									 errmsg("standby connections not allowed because wal_level=minimal")));
  
! 						/* Send a CopyBothResponse message, and start streaming */
! 						pq_beginmessage(&buf, 'W');
  						pq_endmessage(&buf);
  						pq_flush();
  
*** a/src/include/replication/walreceiver.h
--- b/src/include/replication/walreceiver.h
***************
*** 84,89 **** typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
--- 84,92 ----
  												 char **buffer, int *len);
  extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
  
+ typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
+ extern PGDLLIMPORT walrcv_send_type walrcv_send;
+ 
  typedef void (*walrcv_disconnect_type) (void);
  extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
  
*** a/src/interfaces/libpq/fe-exec.c
--- b/src/interfaces/libpq/fe-exec.c
***************
*** 1586,1591 **** PQgetResult(PGconn *conn)
--- 1586,1592 ----
  				res = PQmakeEmptyPGresult(conn, PGRES_COPY_IN);
  			break;
  		case PGASYNC_COPY_OUT:
+ 		case PGASYNC_COPY_BOTH:
  			if (conn->result && conn->result->resultStatus == PGRES_COPY_OUT)
  				res = pqPrepareAsyncResult(conn);
  			else
***************
*** 2000,2006 **** PQnotifies(PGconn *conn)
  }
  
  /*
!  * PQputCopyData - send some data to the backend during COPY IN
   *
   * Returns 1 if successful, 0 if data could not be sent (only possible
   * in nonblock mode), or -1 if an error occurs.
--- 2001,2007 ----
  }
  
  /*
!  * PQputCopyData - send some data to the backend during COPY IN or COPY BOTH
   *
   * Returns 1 if successful, 0 if data could not be sent (only possible
   * in nonblock mode), or -1 if an error occurs.
***************
*** 2010,2016 **** PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
  {
  	if (!conn)
  		return -1;
! 	if (conn->asyncStatus != PGASYNC_COPY_IN)
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
--- 2011,2018 ----
  {
  	if (!conn)
  		return -1;
! 	if (conn->asyncStatus != PGASYNC_COPY_IN &&
! 		conn->asyncStatus != PGASYNC_COPY_BOTH)
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
***************
*** 2148,2153 **** PQputCopyEnd(PGconn *conn, const char *errormsg)
--- 2150,2156 ----
  
  /*
   * PQgetCopyData - read a row of data from the backend during COPY OUT
+  * or COPY BOTH
   *
   * If successful, sets *buffer to point to a malloc'd row of data, and
   * returns row length (always > 0) as result.
***************
*** 2161,2167 **** PQgetCopyData(PGconn *conn, char **buffer, int async)
  	*buffer = NULL;				/* for all failure cases */
  	if (!conn)
  		return -2;
! 	if (conn->asyncStatus != PGASYNC_COPY_OUT)
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
--- 2164,2171 ----
  	*buffer = NULL;				/* for all failure cases */
  	if (!conn)
  		return -2;
! 	if (conn->asyncStatus != PGASYNC_COPY_OUT &&
! 		conn->asyncStatus != PGASYNC_COPY_BOTH)
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
*** a/src/interfaces/libpq/fe-protocol2.c
--- b/src/interfaces/libpq/fe-protocol2.c
***************
*** 541,546 **** pqParseInput2(PGconn *conn)
--- 541,550 ----
  				case 'H':		/* Start Copy Out */
  					conn->asyncStatus = PGASYNC_COPY_OUT;
  					break;
+ 					/*
+ 					 * Don't need to process CopyBothResponse here because
+ 					 * it never arrives from the server during protocol 2.0.
+ 					 */
  				default:
  					printfPQExpBuffer(&conn->errorMessage,
  									  libpq_gettext(
*** a/src/interfaces/libpq/fe-protocol3.c
--- b/src/interfaces/libpq/fe-protocol3.c
***************
*** 358,363 **** pqParseInput3(PGconn *conn)
--- 358,375 ----
  					conn->asyncStatus = PGASYNC_COPY_OUT;
  					conn->copy_already_done = 0;
  					break;
+ 				case 'W':		/* Start Copy Both */
+ 					/*
+ 					 * We don't need to use getCopyStart here since CopyBothResponse
+ 					 * specifies neither the copy format nor the number of columns in
+ 					 * the Copy data. They should be always zero.
+ 					 */
+ 					conn->result = PQmakeEmptyPGresult(conn, PGRES_COPY_OUT);
+ 					if (!conn->result)
+ 						return;
+ 					conn->asyncStatus = PGASYNC_COPY_BOTH;
+ 					conn->copy_already_done = 0;
+ 					break;
  				case 'd':		/* Copy Data */
  
  					/*
*** a/src/interfaces/libpq/libpq-int.h
--- b/src/interfaces/libpq/libpq-int.h
***************
*** 218,224 **** typedef enum
  	PGASYNC_BUSY,				/* query in progress */
  	PGASYNC_READY,				/* result ready for PQgetResult */
  	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
! 	PGASYNC_COPY_OUT			/* Copy Out data transfer in progress */
  } PGAsyncStatusType;
  
  /* PGQueryClass tracks which query protocol we are now executing */
--- 218,225 ----
  	PGASYNC_BUSY,				/* query in progress */
  	PGASYNC_READY,				/* result ready for PQgetResult */
  	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
! 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
! 	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
  } PGAsyncStatusType;
  
  /* PGQueryClass tracks which query protocol we are now executing */
#18Alvaro Herrera
alvherre@commandprompt.com
In reply to: Fujii Masao (#17)
Re: libpq changes for synchronous replication

Excerpts from Fujii Masao's message of Thu Nov 25 10:47:12 -0300 2010:

The attached patch s/CopyXLog/CopyBoth/g and adds the description
about CopyBoth into the COPY section.

I gave this a look. It seems good, but I'm not sure about this bit:

+               case 'W':       /* Start Copy Both */
+                   /*
+                    * We don't need to use getCopyStart here since CopyBothResponse
+                    * specifies neither the copy format nor the number of columns in
+                    * the Copy data. They should be always zero.
+                    */
+                   conn->result = PQmakeEmptyPGresult(conn, PGRES_COPY_OUT);
+                   if (!conn->result)
+                       return;
+                   conn->asyncStatus = PGASYNC_COPY_BOTH;
+                   conn->copy_already_done = 0;
+                   break;

I guess this was OK when this was conceived as CopyXlog, but since it's
now a generic mechanism, this seems a bit unwise. Should this be
reconsidered so that it's possible to change the format or number of
columns?

(The paragraph added to the docs is also a bit too specific about this
being used exclusively in streaming replication, ISTM)

While modifying the code, it occurred to me that we might have to add a new
ExecStatusType like PGRES_COPY_BOTH and use that for CopyBoth mode,
for the sake of consistency. But since it would just be an alias of
PGRES_COPY_OUT for now, i.e., there is no specific behavior for that
ExecStatusType, I don't think that it's worth adding that yet.

I'm not so sure about this. If we think that it's worth adding a new
possible state, we should do so now; we will not be able to change this
behavior later.

--
Álvaro Herrera <alvherre@commandprompt.com>
The PostgreSQL Company - Command Prompt, Inc.
PostgreSQL Replication, Consulting, Custom Development, 24x7 support

#19Greg Smith
greg@2ndquadrant.com
In reply to: Alvaro Herrera (#18)
Re: libpq changes for synchronous replication

The one time this year top-posting seems appropriate...this patch seems
stalled waiting for some sort of response to the concerns Alvaro raised
here.

Alvaro Herrera wrote:

Excerpts from Fujii Masao's message of Thu Nov 25 10:47:12 -0300 2010:

The attached patch s/CopyXLog/CopyBoth/g and adds the description
about CopyBoth into the COPY section.

I gave this a look. It seems good, but I'm not sure about this bit:

+               case 'W':       /* Start Copy Both */
+                   /*
+                    * We don't need to use getCopyStart here since CopyBothResponse
+                    * specifies neither the copy format nor the number of columns in
+                    * the Copy data. They should be always zero.
+                    */
+                   conn->result = PQmakeEmptyPGresult(conn, PGRES_COPY_OUT);
+                   if (!conn->result)
+                       return;
+                   conn->asyncStatus = PGASYNC_COPY_BOTH;
+                   conn->copy_already_done = 0;
+                   break;

I guess this was OK when this was conceived as CopyXlog, but since it's
now a generic mechanism, this seems a bit unwise. Should this be
reconsidered so that it's possible to change the format or number of
columns?

(The paragraph added to the docs is also a bit too specific about this
being used exclusively in streaming replication, ISTM)

While modifying the code, it occurred to me that we might have to add a new
ExecStatusType like PGRES_COPY_BOTH and use that for CopyBoth mode,
for the sake of consistency. But since it would just be an alias of
PGRES_COPY_OUT for now, i.e., there is no specific behavior for that
ExecStatusType, I don't think that it's worth adding that yet.

I'm not so sure about this. If we think that it's worth adding a new
possible state, we should do so now; we will not be able to change this
behavior later.

#20Fujii Masao
masao.fujii@gmail.com
In reply to: Greg Smith (#19)
1 attachment(s)
Re: libpq changes for synchronous replication

On Mon, Dec 6, 2010 at 3:07 AM, Greg Smith <greg@2ndquadrant.com> wrote:

The one time this year top-posting seems appropriate...this patch seems
stalled waiting for some sort of response to the concerns Alvaro raised
here.

Sorry for the delay. I didn't have the time.

I gave this a look. It seems good, but I'm not sure about this bit:

Thanks for the review!

I guess this was OK when this was conceived as CopyXlog, but since it's
now a generic mechanism, this seems a bit unwise. Should this be
reconsidered so that it's possible to change the format or number of
columns?

I changed CopyBothResponse message so that it includes the format
and number of columns of copy data. Please see the attached patch.
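
To make the new layout concrete, here is a minimal sketch of how a CopyBothResponse could be serialized according to the field list in the attached protocol.sgml change: Byte1('W'), Int32 length (including itself), Int8 overall format, Int16 column count, then one Int16 format code per column, all in network byte order. The helper `encode_copy_both_response` is hypothetical and written only for illustration; the server itself builds the message with pq_beginmessage, pq_sendbyte and pq_sendint as shown in the walsender.c hunk.

```c
#include <stddef.h>
#include <stdint.h>

/*
 * Encode a CopyBothResponse into buf.  Returns the total number of bytes
 * written (type byte included), or 0 if buf is too small.
 * Hypothetical helper for illustration; not part of PostgreSQL.
 */
static size_t
encode_copy_both_response(uint8_t *buf, size_t buflen,
						  uint8_t overall_format,
						  const uint16_t *col_formats, uint16_t ncols)
{
	/* length field counts itself (4) + format (1) + ncols (2) + codes */
	uint32_t	len = 4 + 1 + 2 + 2 * (uint32_t) ncols;
	size_t		total = 1 + (size_t) len;	/* plus the type byte */
	size_t		pos = 0;
	uint16_t	i;

	if (buflen < total)
		return 0;

	buf[pos++] = 'W';						/* message type */
	buf[pos++] = (uint8_t) (len >> 24);		/* Int32 length, big-endian */
	buf[pos++] = (uint8_t) (len >> 16);
	buf[pos++] = (uint8_t) (len >> 8);
	buf[pos++] = (uint8_t) len;
	buf[pos++] = overall_format;			/* 0 = text, 1 = binary */
	buf[pos++] = (uint8_t) (ncols >> 8);	/* Int16 column count */
	buf[pos++] = (uint8_t) ncols;
	for (i = 0; i < ncols; i++)
	{
		buf[pos++] = (uint8_t) (col_formats[i] >> 8);
		buf[pos++] = (uint8_t) col_formats[i];
	}
	return pos;
}
```

With the values walsender sends in this patch (format 0, zero columns), the call produces eight bytes: 'W', a length of 7, one format byte and a two-byte column count of zero.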

(The paragraph added to the docs is also a bit too specific about this
being used exclusively in streaming replication, ISTM)

Yes. But it seems difficult to generalize the docs further, because currently
only SR uses Copy-both. So I had to write, for example, that the only way
to get into that state is the START_REPLICATION command.

While modifying the code, it occurred to me that we might have to add a new
ExecStatusType like PGRES_COPY_BOTH and use that for CopyBoth mode,
for the sake of consistency. But since it would just be an alias of
PGRES_COPY_OUT for now, i.e., there is no specific behavior for that
ExecStatusType, I don't think that it's worth adding that yet.

I'm not so sure about this. If we think that it's worth adding a new
possible state, we should do so now; we will not be able to change this
behavior later.

OK. I added that new state.
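
The difference from the previous version can be sketched as a mapping from the connection's async state to the result status that the application sees. This is only an illustration with hypothetical stand-in enums (`AsyncStatus`, `ResultStatus`) and functions (`result_status_v3`, `result_status_v4`), not the libpq types; the real logic is in the PQgetResult hunk of the attached patch.

```c
/* Hypothetical stand-ins for PGAsyncStatusType and ExecStatusType. */
typedef enum
{
	ASYNC_COPY_IN,
	ASYNC_COPY_OUT,
	ASYNC_COPY_BOTH
} AsyncStatus;

typedef enum
{
	RES_COPY_OUT,
	RES_COPY_IN,
	RES_COPY_BOTH
} ResultStatus;

/* v3 behavior: Copy-both was reported to the application as COPY OUT. */
static ResultStatus
result_status_v3(AsyncStatus s)
{
	if (s == ASYNC_COPY_IN)
		return RES_COPY_IN;
	return RES_COPY_OUT;		/* covers COPY_OUT and COPY_BOTH */
}

/* v4 behavior: Copy-both gets a result status of its own. */
static ResultStatus
result_status_v4(AsyncStatus s)
{
	switch (s)
	{
		case ASYNC_COPY_IN:
			return RES_COPY_IN;
		case ASYNC_COPY_OUT:
			return RES_COPY_OUT;
		case ASYNC_COPY_BOTH:
			return RES_COPY_BOTH;
	}
	return RES_COPY_OUT;		/* not reached for valid input */
}
```

This is why the caller in libpqrcv_connect now compares against PGRES_COPY_BOTH instead of PGRES_COPY_OUT after issuing START_REPLICATION.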

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

Attachments:

libpqrcv_send_v4.patch (application/octet-stream)
*** a/doc/src/sgml/protocol.sgml
--- b/doc/src/sgml/protocol.sgml
***************
*** 1033,1044 ****
     </para>
  
     <para>
!     The CopyInResponse and CopyOutResponse messages include fields that
!     inform the frontend of the number of columns per row and the format
!     codes being used for each column.  (As of the present implementation,
!     all columns in a given <command>COPY</> operation will use the same
!     format, but the message design does not assume this.)
     </para>
    </sect2>
  
    <sect2 id="protocol-async">
--- 1033,1058 ----
     </para>
  
     <para>
!     There is another Copy-related mode called Copy-both, which allows
!     high-speed bulk data transfer to <emphasis>and</> from the server.
!     Copy-both mode is initiated when the backend with walsender mode
!     executes a <command>START_REPLICATION</command> statement.  The
!     backend sends a CopyBothResponse message to the frontend.  Both
!     the backend and the frontend should then send CopyData messages
!     between each other until the connection is terminated.  Copy-both
!     mode is used by streaming replication (see <xref
!     linkend="protocol-replication">).
     </para>
+ 
+    <para>
+     The CopyInResponse, CopyOutResponse and CopyBothResponse messages
+     include fields that inform the frontend of the number of columns
+     per row and the format codes being used for each column.  (As of
+     the present implementation, all columns in a given <command>COPY</>
+     operation will use the same format, but the message design does not
+     assume this.)
+    </para>
+ 
    </sect2>
  
    <sect2 id="protocol-async">
***************
*** 1344,1350 **** The commands accepted in walsender mode are:
        WAL position <replaceable>XXX</>/<replaceable>XXX</>.
        The server can reply with an error, e.g. if the requested section of WAL
        has already been recycled. On success, server responds with a
!       CopyOutResponse message, and then starts to stream WAL to the frontend.
        WAL will continue to be streamed until the connection is broken;
        no further commands will be accepted.
       </para>
--- 1358,1364 ----
        WAL position <replaceable>XXX</>/<replaceable>XXX</>.
        The server can reply with an error, e.g. if the requested section of WAL
        has already been recycled. On success, server responds with a
!       CopyBothResponse message, and then starts to stream WAL to the frontend.
        WAL will continue to be streamed until the connection is broken;
        no further commands will be accepted.
       </para>
***************
*** 2696,2701 **** CopyOutResponse (B)
--- 2710,2788 ----
  
  <varlistentry>
  <term>
+ CopyBothResponse (B)
+ </term>
+ <listitem>
+ <para>
+ 
+ <variablelist>
+ <varlistentry>
+ <term>
+         Byte1('W')
+ </term>
+ <listitem>
+ <para>
+                 Identifies the message as a Start Copy Both response.
+                 This message is used only for Streaming Replication.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+         Int32
+ </term>
+ <listitem>
+ <para>
+                 Length of message contents in bytes, including self.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+         Int8
+ </term>
+ <listitem>
+ <para>
+                 0 indicates the overall <command>COPY</command> format
+                 is textual (rows separated by newlines, columns
+                 separated by separator characters, etc). 1 indicates
+                 the overall copy format is binary (similar to DataRow
+                 format). See <xref linkend="sql-copy"> for more information.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+         Int16
+ </term>
+ <listitem>
+ <para>
+                 The number of columns in the data to be copied
+                 (denoted <replaceable>N</> below).
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+         Int16[<replaceable>N</>]
+ </term>
+ <listitem>
+ <para>
+                 The format codes to be used for each column.
+                 Each must presently be zero (text) or one (binary).
+                 All must be zero if the overall copy format is textual.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ 
+ </para>
+ </listitem>
+ </varlistentry>
+ 
+ 
+ <varlistentry>
+ <term>
  DataRow (B)
  </term>
  <listitem>
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 50,55 **** static char *recvBuf = NULL;
--- 50,56 ----
  static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
  static bool libpqrcv_receive(int timeout, unsigned char *type,
  				 char **buffer, int *len);
+ static void libpqrcv_send(const char *buffer, int nbytes);
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
***************
*** 64,73 **** _PG_init(void)
  {
  	/* Tell walreceiver how to reach us */
  	if (walrcv_connect != NULL || walrcv_receive != NULL ||
! 		walrcv_disconnect != NULL)
  		elog(ERROR, "libpqwalreceiver already loaded");
  	walrcv_connect = libpqrcv_connect;
  	walrcv_receive = libpqrcv_receive;
  	walrcv_disconnect = libpqrcv_disconnect;
  }
  
--- 65,75 ----
  {
  	/* Tell walreceiver how to reach us */
  	if (walrcv_connect != NULL || walrcv_receive != NULL ||
! 		walrcv_send != NULL || walrcv_disconnect != NULL)
  		elog(ERROR, "libpqwalreceiver already loaded");
  	walrcv_connect = libpqrcv_connect;
  	walrcv_receive = libpqrcv_receive;
+ 	walrcv_send = libpqrcv_send;
  	walrcv_disconnect = libpqrcv_disconnect;
  }
  
***************
*** 157,163 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
  	res = libpqrcv_PQexec(cmd);
! 	if (PQresultStatus(res) != PGRES_COPY_OUT)
  	{
  		PQclear(res);
  		ereport(ERROR,
--- 159,165 ----
  	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
  			 startpoint.xlogid, startpoint.xrecoff);
  	res = libpqrcv_PQexec(cmd);
! 	if (PQresultStatus(res) != PGRES_COPY_BOTH)
  	{
  		PQclear(res);
  		ereport(ERROR,
***************
*** 303,308 **** libpqrcv_PQexec(const char *query)
--- 305,311 ----
  
  		if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
  			PQresultStatus(lastResult) == PGRES_COPY_OUT ||
+ 			PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
  			PQstatus(streamConn) == CONNECTION_BAD)
  			break;
  	}
***************
*** 398,400 **** libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
--- 401,418 ----
  
  	return true;
  }
+ 
+ /*
+  * Send a message to XLOG stream.
+  *
+  * ereports on error.
+  */
+ static void
+ libpqrcv_send(const char *buffer, int nbytes)
+ {
+ 	if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
+ 		PQflush(streamConn))
+ 		ereport(ERROR,
+ 				(errmsg("could not send data to WAL stream: %s",
+ 						PQerrorMessage(streamConn))));
+ }
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 57,62 **** bool		am_walreceiver;
--- 57,63 ----
  /* libpqreceiver hooks to these when loaded */
  walrcv_connect_type walrcv_connect = NULL;
  walrcv_receive_type walrcv_receive = NULL;
+ walrcv_send_type walrcv_send = NULL;
  walrcv_disconnect_type walrcv_disconnect = NULL;
  
  #define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */
***************
*** 247,253 **** WalReceiverMain(void)
  	/* Load the libpq-specific functions */
  	load_file("libpqwalreceiver", false);
  	if (walrcv_connect == NULL || walrcv_receive == NULL ||
! 		walrcv_disconnect == NULL)
  		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
  
  	/*
--- 248,254 ----
  	/* Load the libpq-specific functions */
  	load_file("libpqwalreceiver", false);
  	if (walrcv_connect == NULL || walrcv_receive == NULL ||
! 		walrcv_send == NULL || walrcv_disconnect == NULL)
  		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
  
  	/*
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 287,294 **** WalSndHandshake(void)
  									(errcode(ERRCODE_CANNOT_CONNECT_NOW),
  									 errmsg("standby connections not allowed because wal_level=minimal")));
  
! 						/* Send a CopyOutResponse message, and start streaming */
! 						pq_beginmessage(&buf, 'H');
  						pq_sendbyte(&buf, 0);
  						pq_sendint(&buf, 0, 2);
  						pq_endmessage(&buf);
--- 287,294 ----
  									(errcode(ERRCODE_CANNOT_CONNECT_NOW),
  									 errmsg("standby connections not allowed because wal_level=minimal")));
  
! 						/* Send a CopyBothResponse message, and start streaming */
! 						pq_beginmessage(&buf, 'W');
  						pq_sendbyte(&buf, 0);
  						pq_sendint(&buf, 0, 2);
  						pq_endmessage(&buf);
*** a/src/include/replication/walreceiver.h
--- b/src/include/replication/walreceiver.h
***************
*** 84,89 **** typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
--- 84,92 ----
  												 char **buffer, int *len);
  extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
  
+ typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
+ extern PGDLLIMPORT walrcv_send_type walrcv_send;
+ 
  typedef void (*walrcv_disconnect_type) (void);
  extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
  
*** a/src/interfaces/libpq/fe-exec.c
--- b/src/interfaces/libpq/fe-exec.c
***************
*** 35,40 **** char	   *const pgresStatus[] = {
--- 35,41 ----
  	"PGRES_TUPLES_OK",
  	"PGRES_COPY_OUT",
  	"PGRES_COPY_IN",
+ 	"PGRES_COPY_BOTH",
  	"PGRES_BAD_RESPONSE",
  	"PGRES_NONFATAL_ERROR",
  	"PGRES_FATAL_ERROR"
***************
*** 174,179 **** PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
--- 175,181 ----
  			case PGRES_TUPLES_OK:
  			case PGRES_COPY_OUT:
  			case PGRES_COPY_IN:
+ 			case PGRES_COPY_BOTH:
  				/* non-error cases */
  				break;
  			default:
***************
*** 1591,1596 **** PQgetResult(PGconn *conn)
--- 1593,1604 ----
  			else
  				res = PQmakeEmptyPGresult(conn, PGRES_COPY_OUT);
  			break;
+ 		case PGASYNC_COPY_BOTH:
+ 			if (conn->result && conn->result->resultStatus == PGRES_COPY_BOTH)
+ 				res = pqPrepareAsyncResult(conn);
+ 			else
+ 				res = PQmakeEmptyPGresult(conn, PGRES_COPY_BOTH);
+ 			break;
  		default:
  			printfPQExpBuffer(&conn->errorMessage,
  							  libpq_gettext("unexpected asyncStatus: %d\n"),
***************
*** 1775,1780 **** PQexecStart(PGconn *conn)
--- 1783,1795 ----
  				return false;
  			}
  		}
+ 		else if (resultStatus == PGRES_COPY_BOTH)
+ 		{
+ 			/* We don't allow PQexec during COPY BOTH */
+ 			printfPQExpBuffer(&conn->errorMessage,
+ 			 libpq_gettext("PQexec not allowed during COPY BOTH\n"));
+ 			return false;			
+ 		}
  		/* check for loss of connection, too */
  		if (conn->status == CONNECTION_BAD)
  			return false;
***************
*** 1798,1804 **** PQexecFinish(PGconn *conn)
  	 * than one --- but merge error messages if we get more than one error
  	 * result.
  	 *
! 	 * We have to stop if we see copy in/out, however. We will resume parsing
  	 * after application performs the data transfer.
  	 *
  	 * Also stop if the connection is lost (else we'll loop infinitely).
--- 1813,1819 ----
  	 * than one --- but merge error messages if we get more than one error
  	 * result.
  	 *
! 	 * We have to stop if we see copy in/out/both, however. We will resume parsing
  	 * after application performs the data transfer.
  	 *
  	 * Also stop if the connection is lost (else we'll loop infinitely).
***************
*** 1827,1832 **** PQexecFinish(PGconn *conn)
--- 1842,1848 ----
  		lastResult = result;
  		if (result->resultStatus == PGRES_COPY_IN ||
  			result->resultStatus == PGRES_COPY_OUT ||
+ 			result->resultStatus == PGRES_COPY_BOTH ||
  			conn->status == CONNECTION_BAD)
  			break;
  	}
***************
*** 2000,2006 **** PQnotifies(PGconn *conn)
  }
  
  /*
!  * PQputCopyData - send some data to the backend during COPY IN
   *
   * Returns 1 if successful, 0 if data could not be sent (only possible
   * in nonblock mode), or -1 if an error occurs.
--- 2016,2022 ----
  }
  
  /*
!  * PQputCopyData - send some data to the backend during COPY IN or COPY BOTH
   *
   * Returns 1 if successful, 0 if data could not be sent (only possible
   * in nonblock mode), or -1 if an error occurs.
***************
*** 2010,2016 **** PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
  {
  	if (!conn)
  		return -1;
! 	if (conn->asyncStatus != PGASYNC_COPY_IN)
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
--- 2026,2033 ----
  {
  	if (!conn)
  		return -1;
! 	if (conn->asyncStatus != PGASYNC_COPY_IN &&
! 		conn->asyncStatus != PGASYNC_COPY_BOTH)
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
***************
*** 2148,2153 **** PQputCopyEnd(PGconn *conn, const char *errormsg)
--- 2165,2171 ----
  
  /*
   * PQgetCopyData - read a row of data from the backend during COPY OUT
+  * or COPY BOTH
   *
   * If successful, sets *buffer to point to a malloc'd row of data, and
   * returns row length (always > 0) as result.
***************
*** 2161,2167 **** PQgetCopyData(PGconn *conn, char **buffer, int async)
  	*buffer = NULL;				/* for all failure cases */
  	if (!conn)
  		return -2;
! 	if (conn->asyncStatus != PGASYNC_COPY_OUT)
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
--- 2179,2186 ----
  	*buffer = NULL;				/* for all failure cases */
  	if (!conn)
  		return -2;
! 	if (conn->asyncStatus != PGASYNC_COPY_OUT &&
! 		conn->asyncStatus != PGASYNC_COPY_BOTH)
  	{
  		printfPQExpBuffer(&conn->errorMessage,
  						  libpq_gettext("no COPY in progress\n"));
*** a/src/interfaces/libpq/fe-protocol2.c
--- b/src/interfaces/libpq/fe-protocol2.c
***************
*** 541,546 **** pqParseInput2(PGconn *conn)
--- 541,550 ----
  				case 'H':		/* Start Copy Out */
  					conn->asyncStatus = PGASYNC_COPY_OUT;
  					break;
+ 					/*
+ 					 * Don't need to process CopyBothResponse here because
+ 					 * it never arrives from the server during protocol 2.0.
+ 					 */
  				default:
  					printfPQExpBuffer(&conn->errorMessage,
  									  libpq_gettext(
*** a/src/interfaces/libpq/fe-protocol3.c
--- b/src/interfaces/libpq/fe-protocol3.c
***************
*** 358,363 **** pqParseInput3(PGconn *conn)
--- 358,369 ----
  					conn->asyncStatus = PGASYNC_COPY_OUT;
  					conn->copy_already_done = 0;
  					break;
+ 				case 'W':		/* Start Copy Both */
+ 					if (getCopyStart(conn, PGRES_COPY_BOTH))
+ 						return;
+ 					conn->asyncStatus = PGASYNC_COPY_BOTH;
+ 					conn->copy_already_done = 0;
+ 					break;
  				case 'd':		/* Copy Data */
  
  					/*
***************
*** 1196,1202 **** getNotify(PGconn *conn)
  }
  
  /*
!  * getCopyStart - process CopyInResponse or CopyOutResponse message
   *
   * parseInput already read the message type and length.
   */
--- 1202,1209 ----
  }
  
  /*
!  * getCopyStart - process CopyInResponse, CopyOutResponse or
!  * CopyBothResponse message
   *
   * parseInput already read the message type and length.
   */
***************
*** 1367,1372 **** getCopyDataMessage(PGconn *conn)
--- 1374,1380 ----
  
  /*
   * PQgetCopyData - read a row of data from the backend during COPY OUT
+  * or COPY BOTH
   *
   * If successful, sets *buffer to point to a malloc'd row of data, and
   * returns row length (always > 0) as result.
***************
*** 1390,1399 **** pqGetCopyData3(PGconn *conn, char **buffer, int async)
  		if (msgLength < 0)
  		{
  			/*
! 			 * On end-of-copy, exit COPY_OUT mode and let caller read status
! 			 * with PQgetResult().	The normal case is that it's Copy Done,
! 			 * but we let parseInput read that.  If error, we expect the state
! 			 * was already changed.
  			 */
  			if (msgLength == -1)
  				conn->asyncStatus = PGASYNC_BUSY;
--- 1398,1407 ----
  		if (msgLength < 0)
  		{
  			/*
! 			 * On end-of-copy, exit COPY_OUT or COPY_BOTH mode and let caller
! 			 * read status with PQgetResult().	The normal case is that it's
! 			 * Copy Done, but we let parseInput read that.  If error, we expect
! 			 * the state was already changed.
  			 */
  			if (msgLength == -1)
  				conn->asyncStatus = PGASYNC_BUSY;
*** a/src/interfaces/libpq/libpq-fe.h
--- b/src/interfaces/libpq/libpq-fe.h
***************
*** 85,90 **** typedef enum
--- 85,91 ----
  								 * contains the result tuples */
  	PGRES_COPY_OUT,				/* Copy Out data transfer in progress */
  	PGRES_COPY_IN,				/* Copy In data transfer in progress */
+ 	PGRES_COPY_BOTH,			/* Copy In/Out data transfer in progress */
  	PGRES_BAD_RESPONSE,			/* an unexpected response was recv'd from the
  								 * backend */
  	PGRES_NONFATAL_ERROR,		/* notice or warning message */
*** a/src/interfaces/libpq/libpq-int.h
--- b/src/interfaces/libpq/libpq-int.h
***************
*** 218,224 **** typedef enum
  	PGASYNC_BUSY,				/* query in progress */
  	PGASYNC_READY,				/* result ready for PQgetResult */
  	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
! 	PGASYNC_COPY_OUT			/* Copy Out data transfer in progress */
  } PGAsyncStatusType;
  
  /* PGQueryClass tracks which query protocol we are now executing */
--- 218,225 ----
  	PGASYNC_BUSY,				/* query in progress */
  	PGASYNC_READY,				/* result ready for PQgetResult */
  	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
! 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
! 	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
  } PGAsyncStatusType;
  
  /* PGQueryClass tracks which query protocol we are now executing */
#21Robert Haas
robertmhaas@gmail.com
In reply to: Fujii Masao (#20)
Re: libpq changes for synchronous replication

On Mon, Dec 6, 2010 at 12:54 AM, Fujii Masao <masao.fujii@gmail.com> wrote:

On Mon, Dec 6, 2010 at 3:07 AM, Greg Smith <greg@2ndquadrant.com> wrote:

The one time this year top-posting seems appropriate...this patch seems
stalled waiting for some sort of response to the concerns Alvaro raised
here.

Sorry for the delay. I didn't have the time.

I gave this a look.  It seems good, but I'm not sure about this bit:

Thanks for the review!

I guess this was OK when this was conceived as CopyXlog, but since it's
now a generic mechanism, this seems a bit unwise.  Should this be
reconsidered so that it's possible to change the format or number of
columns?

I changed the CopyBothResponse message so that it includes the format
and number of columns of the copy data. Please see the attached patch.
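
For readers following along, here is a minimal sketch of how such a message body could be decoded. It assumes CopyBothResponse uses the same body layout as CopyInResponse and CopyOutResponse (an Int8 overall format, an Int16 column count, then one Int16 format code per column), which is consistent with the patch reusing getCopyStart for it. The names CopyStartInfo, MAX_COPY_COLUMNS and parse_copy_both_body are hypothetical and are not part of libpq.

```c
#include <assert.h>
#include <stddef.h>

/* Illustrative cap on columns; an assumption of this sketch, not a protocol limit. */
#define MAX_COPY_COLUMNS 16

/* Hypothetical holder for a decoded CopyBothResponse body (not a libpq type). */
typedef struct
{
	int			overall_format;		/* 0 = textual, 1 = binary */
	int			ncolumns;			/* number of columns in the copy data */
	int			column_formats[MAX_COPY_COLUMNS];	/* per-column format codes */
} CopyStartInfo;

/* Read a signed 16-bit integer in network byte order. */
static int
read_int16(const unsigned char *p)
{
	int			v = (p[0] << 8) | p[1];

	return (v >= 0x8000) ? v - 0x10000 : v;
}

/*
 * Decode the body that follows the 'W' type byte and the Int32 length word.
 * Returns 0 on success, or -1 if the body is too short or the column count
 * is negative or exceeds MAX_COPY_COLUMNS.
 */
static int
parse_copy_both_body(const unsigned char *body, size_t len, CopyStartInfo *out)
{
	size_t		need;
	int			i;

	assert(body != NULL && out != NULL);

	if (len < 3)
		return -1;				/* need Int8 format + Int16 column count */

	out->overall_format = body[0];
	out->ncolumns = read_int16(body + 1);
	if (out->ncolumns < 0 || out->ncolumns > MAX_COPY_COLUMNS)
		return -1;

	need = 3 + 2 * (size_t) out->ncolumns;
	if (len < need)
		return -1;				/* per-column format codes are truncated */

	for (i = 0; i < out->ncolumns; i++)
		out->column_formats[i] = read_int16(body + 3 + 2 * i);

	return 0;
}
```

In libpq itself this work is done inside getCopyStart on the connection's input buffer; the standalone function above only illustrates the layout.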

(The paragraph added to the docs is also a bit too specific about this
being used exclusively in streaming replication, ISTM)

Yes. But it seems difficult to generalize the docs more because currently
only SR uses Copy-both. So I had to write that, for example, the condition
to get into the state is only "START REPLICATION" command.

While modifying the code, it occurred to me that we might have to add a new
ExecStatusType like PGRES_COPY_BOTH and use that for CopyBoth mode,
for the sake of consistency. But since it's just an alias of PGRES_COPY_OUT
for now, i.e., there is no specific behavior for that ExecStatusType, I don't
think that it's worth adding that yet.

I'm not so sure about this.  If we think that it's worth adding a new
possible state, we should do so now; we will not be able to change this
behavior later.

OK. I added that new state.
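
As a rough illustration of what the new state changes, the sketch below models the state checks that the patch adds to PQputCopyData and PQgetCopyData. It is a simplified stand-in, not libpq code: the AsyncState enum and the two helper functions are hypothetical names that mirror PGAsyncStatusType and the asyncStatus tests in fe-exec.c.

```c
#include <assert.h>
#include <stdbool.h>

/* Simplified stand-in for libpq's internal PGAsyncStatusType. */
typedef enum
{
	ASYNC_IDLE,
	ASYNC_BUSY,
	ASYNC_COPY_IN,				/* client sends data only */
	ASYNC_COPY_OUT,				/* client receives data only */
	ASYNC_COPY_BOTH				/* client may send and receive */
} AsyncState;

/* Mirrors the patched check in PQputCopyData: sending needs COPY IN or COPY BOTH. */
static bool
can_put_copy_data(AsyncState s)
{
	return s == ASYNC_COPY_IN || s == ASYNC_COPY_BOTH;
}

/* Mirrors the patched check in PQgetCopyData: receiving needs COPY OUT or COPY BOTH. */
static bool
can_get_copy_data(AsyncState s)
{
	return s == ASYNC_COPY_OUT || s == ASYNC_COPY_BOTH;
}
```

With a separate state, only COPY BOTH passes both checks, so walreceiver can read WAL data and send acknowledgments on the same connection, while plain COPY OUT still rejects PQputCopyData.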

Committed with just a few changes to the documentation.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

#22Fujii Masao
masao.fujii@gmail.com
In reply to: Robert Haas (#21)
Re: libpq changes for synchronous replication

On Sat, Dec 11, 2010 at 11:37 PM, Robert Haas <robertmhaas@gmail.com> wrote:

Committed with just a few changes to the documentation.

Thanks a lot!

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center