*** a/doc/src/sgml/protocol.sgml
--- b/doc/src/sgml/protocol.sgml
***************
*** 1033,1044 ****
! The CopyInResponse and CopyOutResponse messages include fields that
! inform the frontend of the number of columns per row and the format
! codes being used for each column. (As of the present implementation,
! all columns in a given COPY> operation will use the same
! format, but the message design does not assume this.)
--- 1033,1058 ----
! There is another Copy-related mode called Copy-both, which allows
! high-speed bulk data transfer to and> from the server.
! Copy-both mode is initiated when the backend with walsender mode
! executes a START_REPLICATION statement. The
! backend sends a CopyBothResponse message to the frontend. Both
! the backend and the frontend should then send CopyData messages
! between each other until the connection is terminated. Copy-both
! mode is used by streaming replication (see ).
+
+
+ The CopyInResponse, CopyOutResponse and CopyBothResponse messages
+ include fields that inform the frontend of the number of columns
+ per row and the format codes being used for each column. (As of
+ the present implementation, all columns in a given COPY>
+ operation will use the same format, but the message design does not
+ assume this.)
+
+
***************
*** 1344,1350 **** The commands accepted in walsender mode are:
WAL position XXX>/XXX>.
The server can reply with an error, e.g. if the requested section of WAL
has already been recycled. On success, server responds with a
! CopyOutResponse message, and then starts to stream WAL to the frontend.
WAL will continue to be streamed until the connection is broken;
no further commands will be accepted.
--- 1358,1364 ----
WAL position XXX>/XXX>.
The server can reply with an error, e.g. if the requested section of WAL
has already been recycled. On success, server responds with a
! CopyBothResponse message, and then starts to stream WAL to the frontend.
WAL will continue to be streamed until the connection is broken;
no further commands will be accepted.
***************
*** 2696,2701 **** CopyOutResponse (B)
--- 2710,2788 ----
+ CopyBothResponse (B)
+
+
+
+
+
+
+
+ Byte1('W')
+
+
+
+ Identifies the message as a Start Copy Both response.
+ This message is used only for Streaming Replication.
+
+
+
+
+
+ Int32
+
+
+
+ Length of message contents in bytes, including self.
+
+
+
+
+
+ Int8
+
+
+
+ 0 indicates the overall COPY format
+ is textual (rows separated by newlines, columns
+ separated by separator characters, etc). 1 indicates
+ the overall copy format is binary (similar to DataRow
+ format). See for more information.
+
+
+
+
+
+ Int16
+
+
+
+ The number of columns in the data to be copied
+ (denoted N> below).
+
+
+
+
+
+ Int16[N>]
+
+
+
+ The format codes to be used for each column.
+ Each must presently be zero (text) or one (binary).
+ All must be zero if the overall copy format is textual.
+
+
+
+
+
+
+
+
+
+
+
+
DataRow (B)
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 50,55 **** static char *recvBuf = NULL;
--- 50,56 ----
static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
static bool libpqrcv_receive(int timeout, unsigned char *type,
char **buffer, int *len);
+ static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
***************
*** 64,73 **** _PG_init(void)
{
/* Tell walreceiver how to reach us */
if (walrcv_connect != NULL || walrcv_receive != NULL ||
! walrcv_disconnect != NULL)
elog(ERROR, "libpqwalreceiver already loaded");
walrcv_connect = libpqrcv_connect;
walrcv_receive = libpqrcv_receive;
walrcv_disconnect = libpqrcv_disconnect;
}
--- 65,75 ----
{
/* Tell walreceiver how to reach us */
if (walrcv_connect != NULL || walrcv_receive != NULL ||
! walrcv_send != NULL || walrcv_disconnect != NULL)
elog(ERROR, "libpqwalreceiver already loaded");
walrcv_connect = libpqrcv_connect;
walrcv_receive = libpqrcv_receive;
+ walrcv_send = libpqrcv_send;
walrcv_disconnect = libpqrcv_disconnect;
}
***************
*** 157,163 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
startpoint.xlogid, startpoint.xrecoff);
res = libpqrcv_PQexec(cmd);
! if (PQresultStatus(res) != PGRES_COPY_OUT)
{
PQclear(res);
ereport(ERROR,
--- 159,165 ----
snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
startpoint.xlogid, startpoint.xrecoff);
res = libpqrcv_PQexec(cmd);
! if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
PQclear(res);
ereport(ERROR,
***************
*** 303,308 **** libpqrcv_PQexec(const char *query)
--- 305,311 ----
if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
PQresultStatus(lastResult) == PGRES_COPY_OUT ||
+ PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
PQstatus(streamConn) == CONNECTION_BAD)
break;
}
***************
*** 398,400 **** libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
--- 401,418 ----
return true;
}
+
+ /*
+ * Send a message to XLOG stream.
+ *
+ * ereports on error.
+ */
+ static void
+ libpqrcv_send(const char *buffer, int nbytes)
+ {
+ if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
+ PQflush(streamConn))
+ ereport(ERROR,
+ (errmsg("could not send data to WAL stream: %s",
+ PQerrorMessage(streamConn))));
+ }
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 57,62 **** bool am_walreceiver;
--- 57,63 ----
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
walrcv_receive_type walrcv_receive = NULL;
+ walrcv_send_type walrcv_send = NULL;
walrcv_disconnect_type walrcv_disconnect = NULL;
#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
***************
*** 247,253 **** WalReceiverMain(void)
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
if (walrcv_connect == NULL || walrcv_receive == NULL ||
! walrcv_disconnect == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/*
--- 248,254 ----
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
if (walrcv_connect == NULL || walrcv_receive == NULL ||
! walrcv_send == NULL || walrcv_disconnect == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/*
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 287,294 **** WalSndHandshake(void)
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("standby connections not allowed because wal_level=minimal")));
! /* Send a CopyOutResponse message, and start streaming */
! pq_beginmessage(&buf, 'H');
pq_sendbyte(&buf, 0);
pq_sendint(&buf, 0, 2);
pq_endmessage(&buf);
--- 287,294 ----
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("standby connections not allowed because wal_level=minimal")));
! /* Send a CopyBothResponse message, and start streaming */
! pq_beginmessage(&buf, 'W');
pq_sendbyte(&buf, 0);
pq_sendint(&buf, 0, 2);
pq_endmessage(&buf);
*** a/src/include/replication/walreceiver.h
--- b/src/include/replication/walreceiver.h
***************
*** 84,89 **** typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
--- 84,92 ----
char **buffer, int *len);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
+ typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
+ extern PGDLLIMPORT walrcv_send_type walrcv_send;
+
typedef void (*walrcv_disconnect_type) (void);
extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
*** a/src/interfaces/libpq/fe-exec.c
--- b/src/interfaces/libpq/fe-exec.c
***************
*** 35,40 **** char *const pgresStatus[] = {
--- 35,41 ----
"PGRES_TUPLES_OK",
"PGRES_COPY_OUT",
"PGRES_COPY_IN",
+ "PGRES_COPY_BOTH",
"PGRES_BAD_RESPONSE",
"PGRES_NONFATAL_ERROR",
"PGRES_FATAL_ERROR"
***************
*** 174,179 **** PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
--- 175,181 ----
case PGRES_TUPLES_OK:
case PGRES_COPY_OUT:
case PGRES_COPY_IN:
+ case PGRES_COPY_BOTH:
/* non-error cases */
break;
default:
***************
*** 1591,1596 **** PQgetResult(PGconn *conn)
--- 1593,1604 ----
else
res = PQmakeEmptyPGresult(conn, PGRES_COPY_OUT);
break;
+ case PGASYNC_COPY_BOTH:
+ if (conn->result && conn->result->resultStatus == PGRES_COPY_BOTH)
+ res = pqPrepareAsyncResult(conn);
+ else
+ res = PQmakeEmptyPGresult(conn, PGRES_COPY_BOTH);
+ break;
default:
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("unexpected asyncStatus: %d\n"),
***************
*** 1775,1780 **** PQexecStart(PGconn *conn)
--- 1783,1795 ----
return false;
}
}
+ else if (resultStatus == PGRES_COPY_BOTH)
+ {
+ /* We don't allow PQexec during COPY BOTH */
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("PQexec not allowed during COPY BOTH\n"));
+ return false;
+ }
/* check for loss of connection, too */
if (conn->status == CONNECTION_BAD)
return false;
***************
*** 1798,1804 **** PQexecFinish(PGconn *conn)
* than one --- but merge error messages if we get more than one error
* result.
*
! * We have to stop if we see copy in/out, however. We will resume parsing
* after application performs the data transfer.
*
* Also stop if the connection is lost (else we'll loop infinitely).
--- 1813,1819 ----
* than one --- but merge error messages if we get more than one error
* result.
*
! * We have to stop if we see copy in/out/both, however. We will resume parsing
* after application performs the data transfer.
*
* Also stop if the connection is lost (else we'll loop infinitely).
***************
*** 1827,1832 **** PQexecFinish(PGconn *conn)
--- 1842,1848 ----
lastResult = result;
if (result->resultStatus == PGRES_COPY_IN ||
result->resultStatus == PGRES_COPY_OUT ||
+ result->resultStatus == PGRES_COPY_BOTH ||
conn->status == CONNECTION_BAD)
break;
}
***************
*** 2000,2006 **** PQnotifies(PGconn *conn)
}
/*
! * PQputCopyData - send some data to the backend during COPY IN
*
* Returns 1 if successful, 0 if data could not be sent (only possible
* in nonblock mode), or -1 if an error occurs.
--- 2016,2022 ----
}
/*
! * PQputCopyData - send some data to the backend during COPY IN or COPY BOTH
*
* Returns 1 if successful, 0 if data could not be sent (only possible
* in nonblock mode), or -1 if an error occurs.
***************
*** 2010,2016 **** PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
{
if (!conn)
return -1;
! if (conn->asyncStatus != PGASYNC_COPY_IN)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("no COPY in progress\n"));
--- 2026,2033 ----
{
if (!conn)
return -1;
! if (conn->asyncStatus != PGASYNC_COPY_IN &&
! conn->asyncStatus != PGASYNC_COPY_BOTH)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("no COPY in progress\n"));
***************
*** 2148,2153 **** PQputCopyEnd(PGconn *conn, const char *errormsg)
--- 2165,2171 ----
/*
* PQgetCopyData - read a row of data from the backend during COPY OUT
+ * or COPY BOTH
*
* If successful, sets *buffer to point to a malloc'd row of data, and
* returns row length (always > 0) as result.
***************
*** 2161,2167 **** PQgetCopyData(PGconn *conn, char **buffer, int async)
*buffer = NULL; /* for all failure cases */
if (!conn)
return -2;
! if (conn->asyncStatus != PGASYNC_COPY_OUT)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("no COPY in progress\n"));
--- 2179,2186 ----
*buffer = NULL; /* for all failure cases */
if (!conn)
return -2;
! if (conn->asyncStatus != PGASYNC_COPY_OUT &&
! conn->asyncStatus != PGASYNC_COPY_BOTH)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("no COPY in progress\n"));
*** a/src/interfaces/libpq/fe-protocol2.c
--- b/src/interfaces/libpq/fe-protocol2.c
***************
*** 541,546 **** pqParseInput2(PGconn *conn)
--- 541,550 ----
case 'H': /* Start Copy Out */
conn->asyncStatus = PGASYNC_COPY_OUT;
break;
+ /*
+ * Don't need to process CopyBothResponse here because
+ * it never arrives from the server during protocol 2.0.
+ */
default:
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext(
*** a/src/interfaces/libpq/fe-protocol3.c
--- b/src/interfaces/libpq/fe-protocol3.c
***************
*** 358,363 **** pqParseInput3(PGconn *conn)
--- 358,369 ----
conn->asyncStatus = PGASYNC_COPY_OUT;
conn->copy_already_done = 0;
break;
+ case 'W': /* Start Copy Both */
+ if (getCopyStart(conn, PGRES_COPY_BOTH))
+ return;
+ conn->asyncStatus = PGASYNC_COPY_BOTH;
+ conn->copy_already_done = 0;
+ break;
case 'd': /* Copy Data */
/*
***************
*** 1196,1202 **** getNotify(PGconn *conn)
}
/*
! * getCopyStart - process CopyInResponse or CopyOutResponse message
*
* parseInput already read the message type and length.
*/
--- 1202,1209 ----
}
/*
! * getCopyStart - process CopyInResponse, CopyOutResponse or
! * CopyBothResponse message
*
* parseInput already read the message type and length.
*/
***************
*** 1367,1372 **** getCopyDataMessage(PGconn *conn)
--- 1374,1380 ----
/*
* PQgetCopyData - read a row of data from the backend during COPY OUT
+ * or COPY BOTH
*
* If successful, sets *buffer to point to a malloc'd row of data, and
* returns row length (always > 0) as result.
***************
*** 1390,1399 **** pqGetCopyData3(PGconn *conn, char **buffer, int async)
if (msgLength < 0)
{
/*
! * On end-of-copy, exit COPY_OUT mode and let caller read status
! * with PQgetResult(). The normal case is that it's Copy Done,
! * but we let parseInput read that. If error, we expect the state
! * was already changed.
*/
if (msgLength == -1)
conn->asyncStatus = PGASYNC_BUSY;
--- 1398,1407 ----
if (msgLength < 0)
{
/*
! * On end-of-copy, exit COPY_OUT or COPY_BOTH mode and let caller
! * read status with PQgetResult(). The normal case is that it's
! * Copy Done, but we let parseInput read that. If error, we expect
! * the state was already changed.
*/
if (msgLength == -1)
conn->asyncStatus = PGASYNC_BUSY;
*** a/src/interfaces/libpq/libpq-fe.h
--- b/src/interfaces/libpq/libpq-fe.h
***************
*** 85,90 **** typedef enum
--- 85,91 ----
* contains the result tuples */
PGRES_COPY_OUT, /* Copy Out data transfer in progress */
PGRES_COPY_IN, /* Copy In data transfer in progress */
+ PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */
PGRES_BAD_RESPONSE, /* an unexpected response was recv'd from the
* backend */
PGRES_NONFATAL_ERROR, /* notice or warning message */
*** a/src/interfaces/libpq/libpq-int.h
--- b/src/interfaces/libpq/libpq-int.h
***************
*** 218,224 **** typedef enum
PGASYNC_BUSY, /* query in progress */
PGASYNC_READY, /* result ready for PQgetResult */
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
! PGASYNC_COPY_OUT /* Copy Out data transfer in progress */
} PGAsyncStatusType;
/* PGQueryClass tracks which query protocol we are now executing */
--- 218,225 ----
PGASYNC_BUSY, /* query in progress */
PGASYNC_READY, /* result ready for PQgetResult */
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
! PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
! PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
} PGAsyncStatusType;
/* PGQueryClass tracks which query protocol we are now executing */