pg_recvlogical --endpos
Hi all
Here's a rebased version of my pg_recvlogical --endpos patch from the
9.5 series, updated to incorporate Álvaro's changes.
This will be mainly useful for recovery tests as we start adding more
logical decoding testing.
See original post for more detail:
/messages/by-id/CAMsr+YHBm3mUtXb2_RD=QsnUpdT0dR8K-+GTbBgpRdYuZFmXtw@mail.gmail.com
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
0001-Add-an-optional-endpos-LSN-argument-to-pg_reclogical.patch (text/x-patch; charset=UTF-8)
From 464e487e6235e1f86d06aa82a4a57716ff188579 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Thu, 1 Sep 2016 12:37:40 +0800
Subject: [PATCH] Add an optional --endpos LSN argument to pg_reclogical
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
pg_recvlogical usually just runs until cancelled or until the upstream
server disconnects. For some purposes, especially testing, it's useful
to have the ability to stop receive at a specified LSN without having
to parse the output and deal with buffering issues, etc.
Add a --endpos parameter that takes the LSN at which no further
messages should be written and receive should stop.
Craig Ringer, Álvaro Herrera
---
doc/src/sgml/ref/pg_recvlogical.sgml | 35 +++++++++
src/bin/pg_basebackup/pg_recvlogical.c | 137 +++++++++++++++++++++++++++++----
2 files changed, 157 insertions(+), 15 deletions(-)
diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml
index b35881f..00829a7 100644
--- a/doc/src/sgml/ref/pg_recvlogical.sgml
+++ b/doc/src/sgml/ref/pg_recvlogical.sgml
@@ -155,6 +155,41 @@ PostgreSQL documentation
</varlistentry>
<varlistentry>
+ <term><option>-E <replaceable>lsn</replaceable></option></term>
+ <term><option>--endpos=<replaceable>lsn</replaceable></option></term>
+ <listitem>
+ <para>
+ In <option>--start</option> mode, automatically stop replication
+ and exit with normal exit status 0 when receiving reaches the
+ specified LSN. If specified when not in <option>--start</option>
+ mode, an error is raised.
+ </para>
+
+ <para>
+ Note the following points:
+ <itemizedlist>
+ <listitem>
+ <para>
+ If there's a record with LSN exactly equal to <replaceable>lsn</>,
+ the record will not be output. If you want to receive up to and
+ including a given LSN, specify LSN + 1 as the desired stop point.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ The <option>--endpos</option> option is not aware of transaction
+ boundaries and may truncate output partway through a transaction.
+ Any partially output transaction will not be consumed and will be
+ replayed again when the slot is next read from. Individual messages
+ are never truncated.
+ </para>
+ </listitem>
+ </itemizedlist>
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><option>--if-not-exists</option></term>
<listitem>
<para>
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 4c6cf70..5108222 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -37,6 +37,7 @@ static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 10 * 1000; /* 10 sec = default */
static XLogRecPtr startpos = InvalidXLogRecPtr;
+static XLogRecPtr endpos = InvalidXLogRecPtr;
static bool do_create_slot = false;
static bool slot_exists_ok = false;
static bool do_start_slot = false;
@@ -60,6 +61,9 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
static void usage(void);
static void StreamLogicalLog(void);
static void disconnect_and_exit(int code);
+static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
+static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
+ bool keepalive, XLogRecPtr lsn);
static void
usage(void)
@@ -78,6 +82,7 @@ usage(void)
" time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
+ printf(_(" -E, --endpos=LSN exit upon receiving the specified LSN\n"));
printf(_(" -n, --no-loop do not loop on connection lost\n"));
printf(_(" -o, --option=NAME[=VALUE]\n"
" pass option NAME with optional value VALUE to the\n"
@@ -278,6 +283,7 @@ StreamLogicalLog(void)
int bytes_written;
int64 now;
int hdr_len;
+ XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
if (copybuf != NULL)
{
@@ -451,6 +457,7 @@ StreamLogicalLog(void)
int pos;
bool replyRequested;
XLogRecPtr walEnd;
+ bool endposReached = false;
/*
* Parse the keepalive message, enclosed in the CopyData message.
@@ -473,18 +480,32 @@ StreamLogicalLog(void)
}
replyRequested = copybuf[pos];
- /* If the server requested an immediate reply, send one. */
- if (replyRequested)
+ if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
{
- /* fsync data, so we send a recent flush pointer */
- if (!OutputFsync(now))
- goto error;
+ /*
+ * If there's nothing to read on the socket until a keepalive
+ * we know that the server has nothing to send us; and if
+ * walEnd has passed endpos, we know nothing else can have
+ * committed before endpos. So we can bail out now.
+ */
+ endposReached = true;
+ }
- now = feGetCurrentTimestamp();
- if (!sendFeedback(conn, now, true, false))
+ /* Send a reply, if necessary */
+ if (replyRequested || endposReached)
+ {
+ if (!flushAndSendFeedback(conn, &now))
goto error;
last_status = now;
}
+
+ if (endposReached)
+ {
+ prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
+ time_to_abort = true;
+ break;
+ }
+
continue;
}
else if (copybuf[0] != 'w')
@@ -494,7 +515,6 @@ StreamLogicalLog(void)
goto error;
}
-
/*
* Read the header of the XLogData message, enclosed in the CopyData
* message. We only need the WAL location field (dataStart), the rest
@@ -512,10 +532,23 @@ StreamLogicalLog(void)
}
/* Extract WAL location for this block */
- {
- XLogRecPtr temp = fe_recvint64(&copybuf[1]);
+ cur_record_lsn = fe_recvint64(&copybuf[1]);
- output_written_lsn = Max(temp, output_written_lsn);
+ if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
+ {
+ /*
+ * We've read past our endpoint, so prepare to go away being
+ * cautious about what happens to our output data.
+ */
+ if (!flushAndSendFeedback(conn, &now))
+ goto error;
+ prepareToTerminate(conn, endpos, false, cur_record_lsn);
+ time_to_abort = true;
+ break;
+ }
+ else
+ {
+ output_written_lsn = Max(cur_record_lsn, output_written_lsn);
}
bytes_left = r - hdr_len;
@@ -557,7 +590,16 @@ StreamLogicalLog(void)
}
res = PQgetResult(conn);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ if (PQresultStatus(res) == PGRES_COPY_OUT)
+ {
+ /*
+ * We're doing a client-initiated clean exit and have sent CopyDone to
+ * the server. We've already sent replay confirmation and fsync'd so
+ * we can just clean up the connection now.
+ */
+ goto error;
+ }
+ else if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr,
_("%s: unexpected termination of replication stream: %s"),
@@ -635,6 +677,7 @@ main(int argc, char **argv)
{"password", no_argument, NULL, 'W'},
/* replication options */
{"startpos", required_argument, NULL, 'I'},
+ {"endpos", required_argument, NULL, 'E'},
{"option", required_argument, NULL, 'o'},
{"plugin", required_argument, NULL, 'P'},
{"status-interval", required_argument, NULL, 's'},
@@ -670,7 +713,7 @@ main(int argc, char **argv)
}
}
- while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:o:P:s:S:",
+ while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:E:o:P:s:S:",
long_options, &option_index)) != -1)
{
switch (c)
@@ -730,6 +773,16 @@ main(int argc, char **argv)
}
startpos = ((uint64) hi) << 32 | lo;
break;
+ case 'E':
+ if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
+ {
+ fprintf(stderr,
+ _("%s: could not parse end position \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ endpos = ((uint64) hi) << 32 | lo;
+ break;
case 'o':
{
char *data = pg_strdup(optarg);
@@ -854,6 +907,16 @@ main(int argc, char **argv)
exit(1);
}
+ if (endpos != InvalidXLogRecPtr && !do_start_slot)
+ {
+ fprintf(stderr,
+ _("%s: cannot use --create-slot or --drop-slot together with --endpos\n"),
+ progname);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
#ifndef WIN32
pqsignal(SIGINT, sigint_handler);
pqsignal(SIGHUP, sighup_handler);
@@ -920,8 +983,8 @@ main(int argc, char **argv)
if (time_to_abort)
{
/*
- * We've been Ctrl-C'ed. That's not an error, so exit without an
- * errorcode.
+ * We've been Ctrl-C'ed or reached an exit limit condition. That's
+ * not an error, so exit without an errorcode.
*/
disconnect_and_exit(0);
}
@@ -940,3 +1003,47 @@ main(int argc, char **argv)
}
}
}
+
+/*
+ * Fsync our output data, and send a feedback message to the server. Returns
+ * true if successful, false otherwise.
+ *
+ * If successful, *now is updated to the current timestamp just before sending
+ * feedback.
+ */
+static bool
+flushAndSendFeedback(PGconn *conn, TimestampTz *now)
+{
+ /* flush data, so that we send a recent flush pointer */
+ if (!OutputFsync(*now))
+ return false;
+ *now = feGetCurrentTimestamp();
+ if (!sendFeedback(conn, *now, true, false))
+ return false;
+
+ return true;
+}
+
+/*
+ * Try to inform the server about our upcoming demise, but don't wait around or
+ * retry on failure.
+ */
+static void
+prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
+{
+ (void) PQputCopyEnd(conn, NULL);
+ (void) PQflush(conn);
+
+ if (verbose)
+ {
+ if (keepalive)
+ fprintf(stderr, "%s: endpos %X/%X reached by keepalive\n",
+ progname,
+ (uint32) (endpos >> 32), (uint32) endpos);
+ else
+ fprintf(stderr, "%s: endpos %X/%X reached by record at %X/%X\n",
+ progname, (uint32) (endpos >> 32), (uint32) (endpos),
+ (uint32) (lsn >> 32), (uint32) lsn);
+
+ }
+}
--
2.5.5
On 01-09-2016 01:41, Craig Ringer wrote:
Here's a rebased version of my pg_recvlogical --endpos patch from the
9.5 series, updated to incorporate Álvaro's changes.
I should review this patch in the other commitfest but was swamped with
work. The patch is almost ready but I have some points.
* We usually don't include itemlist into binary options. I suggest you
to add a new paragraph for the first item and the second one you could
add a note;
* I think you should add a small note explaining the 'endpos' behavior.
Also, I think endpos could be inclusive (since recovery also has this
behavior by default);
* I think there is a typo in those pieces of code:
+ if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
+ if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
If you decide to be inclusive, it should be > otherwise >=.
There could be TAP tests for pg_recvlogical but it is material for
another patch.
I'll mark this patch waiting on author for your considerations.
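
To make the boundary question above concrete, here is a minimal sketch of the two possible stop rules. It is not code from the patch: `lsn_t`, `parse_lsn`, `stop_exclusive` and `stop_inclusive` are hypothetical names chosen for illustration, and only the `%X/%X` parsing format is taken from the patch's `-E` option handling.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Stand-in for PostgreSQL's XLogRecPtr: a 64-bit WAL position (assumption). */
typedef uint64_t lsn_t;

/* Parse "hi/lo" hex notation such as "0/16B3748"; false on malformed input. */
static bool
parse_lsn(const char *str, lsn_t *out)
{
    unsigned int hi;
    unsigned int lo;

    assert(str != NULL && out != NULL);
    if (sscanf(str, "%X/%X", &hi, &lo) != 2)
        return false;
    *out = ((lsn_t) hi) << 32 | lo;
    return true;
}

/* Exclusive endpos: a record at exactly endpos is NOT output, so stop on >=. */
static bool
stop_exclusive(lsn_t record_lsn, lsn_t endpos)
{
    return record_lsn >= endpos;
}

/* Inclusive endpos: a record at exactly endpos IS output, so stop only on >. */
static bool
stop_inclusive(lsn_t record_lsn, lsn_t endpos)
{
    return record_lsn > endpos;
}
```

Under these definitions, `stop_exclusive(x, endpos + 1)` and `stop_inclusive(x, endpos)` agree for every `x`, which is the "specify LSN + 1" workaround that the first version's documentation describes.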
--
Euler Taveira Timbira - http://www.timbira.com.br/
PostgreSQL: Consultoria, Desenvolvimento, Suporte 24x7 e Treinamento
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 19 November 2016 at 10:04, Euler Taveira <euler@timbira.com.br> wrote:
On 01-09-2016 01:41, Craig Ringer wrote:
Here's a rebased version of my pg_recvlogical --endpos patch from the
9.5 series, updated to incorporate Álvaro's changes.
I should review this patch in the other commitfest but was swamped with
work. The patch is almost ready but I have some points.
* We usually don't include itemlist into binary options. I suggest you
to add a new paragraph for the first item and the second one you could
add a note;
* I think you should add a small note explaining the 'endpos' behavior.
Also, I think endpos could be inclusive (since recovery also has this
behavior by default);
* I think there is a typo in those pieces of code:
+ if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
+ if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
If you decide to be inclusive, it should be > otherwise >=.
There could be TAP tests for pg_recvlogical but it is material for
another patch.
I'll mark this patch waiting on author for your considerations.
Thanks.
I've updated the patch for this. It's already posted on the logical
decoding timeline following thread, so I'll avoid repeating it here.
/messages/by-id/CAMsr+YGd5dv3zPNch6BU4UXX49NJDC9m3-Y=V5q=TNcE9QgSaQ@mail.gmail.com
I addressed the docs formatting and made endpos inclusive, and added tests.
You're right about the keepalive boundary; it should've been > not >=.
In the updated patch it's >= because endpos is now inclusive.
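
As a rough illustration of the inclusive behaviour described here, the sketch below walks a list of messages and applies the same three checks that appear in the updated patch: keepalive `walEnd >= endpos`, record `> endpos` before writing, and record `== endpos` after writing. The types and the function `consume_until_endpos` are hypothetical stand-ins, not the patch's `StreamLogicalLog` loop, and no network or file I/O is modelled.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins for XLogRecPtr and InvalidXLogRecPtr (assumptions). */
typedef uint64_t lsn_t;
#define INVALID_LSN ((lsn_t) 0)

typedef enum { MSG_KEEPALIVE, MSG_RECORD } msg_kind;

typedef struct
{
    msg_kind kind;
    lsn_t    lsn;   /* walEnd for a keepalive, start LSN for a record */
} stream_msg;

/*
 * Process messages in order and return how many records were written.
 * *stopped is set to true when endpos ended the stream early.
 */
static size_t
consume_until_endpos(const stream_msg *msgs, size_t n, lsn_t endpos,
                     bool *stopped)
{
    size_t written = 0;

    assert(msgs != NULL && stopped != NULL);
    *stopped = false;

    for (size_t i = 0; i < n; i++)
    {
        bool limited = (endpos != INVALID_LSN);

        if (msgs[i].kind == MSG_KEEPALIVE)
        {
            /* Server has sent everything up to walEnd; nothing more is due. */
            if (limited && msgs[i].lsn >= endpos)
            {
                *stopped = true;
                break;
            }
            continue;
        }

        /* A record past endpos is never written. */
        if (limited && msgs[i].lsn > endpos)
        {
            *stopped = true;
            break;
        }

        written++;

        /* A record exactly at endpos is written, then we stop. */
        if (limited && msgs[i].lsn == endpos)
        {
            *stopped = true;
            break;
        }
    }
    return written;
}
```

For records at LSNs 10, 20 and 30, an endpos of 20 writes two records and stops; an endpos of 25 also writes two, because the record at 30 is rejected before output.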
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On Monday, November 21, 2016 1:08 PM Craig Ringer wrote:
I've updated the patch for this. It's already posted on the logical
decoding timeline following thread, so I'll avoid repeating it here.
/messages/by-id/CAMsr+YGd5dv3zPNch6BU4UXX49NJDC9m3-Y=V5q=TNcE9QgSaQ@mail.gmail.com
I checked the latest patch.
I think that the error message shown below is a typo.
+ if (endpos != InvalidXLogRecPtr && !do_start_slot)
+ {
+ fprintf(stderr,
+ _("%s: cannot use --create-slot or --drop-slot together with --endpos\n"),
The condition '!do_start_slot' is not reflected in the error message.
The patch should allow --endpos to work with --create-slot.
Also, the document explains as follows.
+ specified LSN. If specified when not in <option>--start</option>
+ mode, an error is raised.
So, it is better to output an error message that matches the document when changing the error message.
Regards,
Okano Naoki
Fujitsu
On 22 November 2016 at 16:52, Okano, Naoki <okano.naoki@jp.fujitsu.com> wrote:
On Monday, November 21, 2016 1:08 PM Craig Ringer wrote:
I've updated the patch for this. It's already posted on the logical
decoding timeline following thread, so I'll avoid repeating it here.
/messages/by-id/CAMsr+YGd5dv3zPNch6BU4UXX49NJDC9m3-Y=V5q=TNcE9QgSaQ@mail.gmail.com
I checked the latest patch.
I think that the error message shown below is a typo.
+ if (endpos != InvalidXLogRecPtr && !do_start_slot)
+ {
+ fprintf(stderr,
+ _("%s: cannot use --create-slot or --drop-slot together with --endpos\n"),
The condition '!do_start_slot' is not reflected in the error message.
Would it be better rephrased as "--endpos can only be used with --start" ?
The patch should allow --endpos to work with --create-slot.
How? It doesn't make sense with --create-slot .
Also, the document explains as follows.
+ specified LSN. If specified when not in <option>--start</option>
+ mode, an error is raised.
So, it is better to output an error message that matches the document when changing the error message.
Not sure I understand this.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On November 29, 2016 at 5:03 PM Craig Ringer wrote:
Would it be better rephrased as "--endpos can only be used with --start" ?
OK. I think this phrase is better than the previous phrase.
The patch should allow --endpos to work with --create-slot.
How? It doesn't make sense with --create-slot .
This patch is updated to incorporate Alvaro's changes shown below, isn't it?
/messages/by-id/20160503180658.GA59498@alvherre.pgsql
I thought that the consent in community was taken with this specification...
I could not find any mention that it did not make sense with --create-slot.
But if exists, would you let me know?
Regards,
Okano Naoki
Fujitsu
On 30 November 2016 at 09:18, Okano, Naoki <okano.naoki@jp.fujitsu.com> wrote:
On November 29, 2016 at 5:03 PM Craig Ringer wrote:
Would it be better rephrased as "--endpos can only be used with --start" ?
OK. I think this phrase is better than the previous phrase.
The patch should allow --endpos to work with --create-slot.
How? It doesn't make sense with --create-slot .
This patch is updated to incorporate Alvaro's changes shown below, isn't it?
/messages/by-id/20160503180658.GA59498@alvherre.pgsql
I thought that the consent in community was taken with this specification...
I could not find any mention that it did not make sense with --create-slot.
What would --endpos with --create-slot do?
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On Wednesday, November 30, 2016 10:34 AM Craig Ringer wrote:
On 30 November 2016 at 09:18, Okano, Naoki <okano(dot)naoki(at)jp(dot)fujitsu(dot)com> wrote:
On November 29, 2016 at 5:03 PM Craig Ringer wrote:
Would it be better rephrased as "--endpos can only be used with --start" ?
OK. I think this phrase is better than the previous phrase.
The patch should allow --endpos to work with --create-slot.
How? It doesn't make sense with --create-slot .
This patch is updated to incorporate Alvaro's changes shown below, isn't it?
/messages/by-id/20160503180658.GA59498@alvherre.pgsql
I thought that the consent in community was taken with this specification...
I could not find any mention that it did not make sense with --create-slot.
What would --endpos with --create-slot do?
Sorry, I was misunderstanding. you are right.
I have noticed that --endpos doesn't make sense unless it is with --start.
I checked --endpos works with --create-slot and --start.
So, there is no problem with this patch.
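
The rule the thread settles on can be sketched as a small validation function. This is an illustrative assumption, not the patch's `main()`: the struct `recv_options` and the function `check_endpos_usage` are hypothetical, while the condition and the message text follow the reworded check in the updated patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins for XLogRecPtr and InvalidXLogRecPtr (assumptions). */
typedef uint64_t lsn_t;
#define INVALID_LSN ((lsn_t) 0)

/* Hypothetical bundle of the action flags and the parsed --endpos value. */
typedef struct
{
    bool  do_create_slot;
    bool  do_start_slot;
    bool  do_drop_slot;
    lsn_t endpos;
} recv_options;

/*
 * Return NULL when the combination is acceptable, otherwise an error message.
 * Only the presence of --start matters: --create-slot may accompany it.
 */
static const char *
check_endpos_usage(const recv_options *opts)
{
    assert(opts != NULL);
    if (opts->endpos != INVALID_LSN && !opts->do_start_slot)
        return "--endpos may only be specified with --start";
    return NULL;
}
```

With this check, `--create-slot --start --endpos` is accepted, while `--create-slot --endpos` or `--drop-slot --endpos` without `--start` is rejected, which matches what was verified above.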
Regards,
Okano Naoki
Fujitsu
On Wed, Nov 30, 2016 at 4:26 PM, Okano, Naoki <okano.naoki@jp.fujitsu.com>
wrote:
On Wednesday, November 30, 2016 10:34 AM Craig Ringer wrote:
On 30 November 2016 at 09:18, Okano, Naoki <okano(dot)naoki(at)jp(dot)fujitsu(dot)com>
wrote:
On November 29, 2016 at 5:03 PM Craig Ringer wrote:
Would it be better rephrased as "--endpos can only be used with
--start" ?
OK. I think this phrase is better than the previous phrase.
The patch should allow --endpos to work with --create-slot.
How? It doesn't make sense with --create-slot .
This patch is updated to incorporate Alvaro's changes shown below,
isn't it?
GA59498%40alvherre.pgsql
I thought that the consent in community was taken with this
specification...
I could not find any mention that it did not make sense with
--create-slot.
What would --endpos with --create-slot do?
Sorry, I was misunderstanding. you are right.
I have noticed that --endpos doesn't make sense unless it is with --start.
I checked --endpos works with --create-slot and --start.
So, there is no problem with this patch.
Moved to next CF with "needs review" state.
Regards,
Hari Babu
Fujitsu Australia
Moved to next CF with "needs review" state.
Here's an updated series. It's on top of the entry
https://commitfest.postgresql.org/12/883/ for PostgresNode TAP test
enhancements.
It corresponds exactly to patches [2,3,4] in the logical decoding on
standby post at
/messages/by-id/CAMsr+YEzC=-+eV09A=ra150FjtkmTqT5Q70PiqBwytbOR3cshg@mail.gmail.com
If this is committed, the remaining decoding on standby patches will
just be the meat of the feature.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Attachments:
0001-Add-an-optional-endpos-LSN-argument-to-pg_recvlogica.patch (text/x-patch; charset=UTF-8)
From b7458118d98204d4b44f0d3e2953a117f1ed876a Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Thu, 1 Sep 2016 12:37:40 +0800
Subject: [PATCH 1/3] Add an optional --endpos LSN argument to pg_recvlogical
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
pg_recvlogical usually just runs until cancelled or until the upstream
server disconnects. For some purposes, especially testing, it's useful
to have the ability to stop receive at a specified LSN without having
to parse the output and deal with buffering issues, etc.
Add a --endpos parameter that takes the LSN at which no further
messages should be written and receive should stop.
Craig Ringer, Álvaro Herrera
---
doc/src/sgml/ref/pg_recvlogical.sgml | 34 ++++++++
src/bin/pg_basebackup/pg_recvlogical.c | 145 +++++++++++++++++++++++++++++----
2 files changed, 164 insertions(+), 15 deletions(-)
diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml
index b35881f..d066ce8 100644
--- a/doc/src/sgml/ref/pg_recvlogical.sgml
+++ b/doc/src/sgml/ref/pg_recvlogical.sgml
@@ -38,6 +38,14 @@ PostgreSQL documentation
constraints as <xref linkend="app-pgreceivexlog">, plus those for logical
replication (see <xref linkend="logicaldecoding">).
</para>
+
+ <para>
+ <command>pg_recvlogical</> has no equivalent to the logical decoding
+ SQL interface's peek and get modes. It sends replay confirmations for
+ data lazily as it receives it and on clean exit. To examine pending data on
+ a slot without consuming it, use
+ <link linkend="functions-replication"><function>pg_logical_slot_peek_changes</></>.
+ </para>
</refsect1>
<refsect1>
@@ -155,6 +163,32 @@ PostgreSQL documentation
</varlistentry>
<varlistentry>
+ <term><option>-E <replaceable>lsn</replaceable></option></term>
+ <term><option>--endpos=<replaceable>lsn</replaceable></option></term>
+ <listitem>
+ <para>
+ In <option>--start</option> mode, automatically stop replication
+ and exit with normal exit status 0 when receiving reaches the
+ specified LSN. If specified when not in <option>--start</option>
+ mode, an error is raised.
+ </para>
+
+ <para>
+ If there's a record with LSN exactly equal to <replaceable>lsn</>,
+ the record will be output.
+ </para>
+
+ <para>
+ The <option>--endpos</option> option is not aware of transaction
+ boundaries and may truncate output partway through a transaction.
+ Any partially output transaction will not be consumed and will be
+ replayed again when the slot is next read from. Individual messages
+ are never truncated.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><option>--if-not-exists</option></term>
<listitem>
<para>
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index cb5f989..4e6a8c2 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -40,6 +40,7 @@ static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 10 * 1000; /* 10 sec = default */
static XLogRecPtr startpos = InvalidXLogRecPtr;
+static XLogRecPtr endpos = InvalidXLogRecPtr;
static bool do_create_slot = false;
static bool slot_exists_ok = false;
static bool do_start_slot = false;
@@ -63,6 +64,9 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
static void usage(void);
static void StreamLogicalLog(void);
static void disconnect_and_exit(int code);
+static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
+static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
+ bool keepalive, XLogRecPtr lsn);
static void
usage(void)
@@ -81,6 +85,7 @@ usage(void)
" time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
+ printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
printf(_(" -n, --no-loop do not loop on connection lost\n"));
printf(_(" -o, --option=NAME[=VALUE]\n"
" pass option NAME with optional value VALUE to the\n"
@@ -281,6 +286,7 @@ StreamLogicalLog(void)
int bytes_written;
int64 now;
int hdr_len;
+ XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
if (copybuf != NULL)
{
@@ -454,6 +460,7 @@ StreamLogicalLog(void)
int pos;
bool replyRequested;
XLogRecPtr walEnd;
+ bool endposReached = false;
/*
* Parse the keepalive message, enclosed in the CopyData message.
@@ -476,18 +483,32 @@ StreamLogicalLog(void)
}
replyRequested = copybuf[pos];
- /* If the server requested an immediate reply, send one. */
- if (replyRequested)
+ if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
{
- /* fsync data, so we send a recent flush pointer */
- if (!OutputFsync(now))
- goto error;
+ /*
+ * If there's nothing to read on the socket until a keepalive
+ * we know that the server has nothing to send us; and if
+ * walEnd has passed endpos, we know nothing else can have
+ * committed before endpos. So we can bail out now.
+ */
+ endposReached = true;
+ }
- now = feGetCurrentTimestamp();
- if (!sendFeedback(conn, now, true, false))
+ /* Send a reply, if necessary */
+ if (replyRequested || endposReached)
+ {
+ if (!flushAndSendFeedback(conn, &now))
goto error;
last_status = now;
}
+
+ if (endposReached)
+ {
+ prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
+ time_to_abort = true;
+ break;
+ }
+
continue;
}
else if (copybuf[0] != 'w')
@@ -497,7 +518,6 @@ StreamLogicalLog(void)
goto error;
}
-
/*
* Read the header of the XLogData message, enclosed in the CopyData
* message. We only need the WAL location field (dataStart), the rest
@@ -515,12 +535,23 @@ StreamLogicalLog(void)
}
/* Extract WAL location for this block */
- {
- XLogRecPtr temp = fe_recvint64(&copybuf[1]);
+ cur_record_lsn = fe_recvint64(&copybuf[1]);
- output_written_lsn = Max(temp, output_written_lsn);
+ if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
+ {
+ /*
+ * We've read past our endpoint, so prepare to go away being
+ * cautious about what happens to our output data.
+ */
+ if (!flushAndSendFeedback(conn, &now))
+ goto error;
+ prepareToTerminate(conn, endpos, false, cur_record_lsn);
+ time_to_abort = true;
+ break;
}
+ output_written_lsn = Max(cur_record_lsn, output_written_lsn);
+
bytes_left = r - hdr_len;
bytes_written = 0;
@@ -557,10 +588,29 @@ StreamLogicalLog(void)
strerror(errno));
goto error;
}
+
+ if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
+ {
+ /* endpos was exactly the record we just processed, we're done */
+ if (!flushAndSendFeedback(conn, &now))
+ goto error;
+ prepareToTerminate(conn, endpos, false, cur_record_lsn);
+ time_to_abort = true;
+ break;
+ }
}
res = PQgetResult(conn);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ if (PQresultStatus(res) == PGRES_COPY_OUT)
+ {
+ /*
+ * We're doing a client-initiated clean exit and have sent CopyDone to
+ * the server. We've already sent replay confirmation and fsync'd so
+ * we can just clean up the connection now.
+ */
+ goto error;
+ }
+ else if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr,
_("%s: unexpected termination of replication stream: %s"),
@@ -638,6 +688,7 @@ main(int argc, char **argv)
{"password", no_argument, NULL, 'W'},
/* replication options */
{"startpos", required_argument, NULL, 'I'},
+ {"endpos", required_argument, NULL, 'E'},
{"option", required_argument, NULL, 'o'},
{"plugin", required_argument, NULL, 'P'},
{"status-interval", required_argument, NULL, 's'},
@@ -673,7 +724,7 @@ main(int argc, char **argv)
}
}
- while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:o:P:s:S:",
+ while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:E:o:P:s:S:",
long_options, &option_index)) != -1)
{
switch (c)
@@ -733,6 +784,16 @@ main(int argc, char **argv)
}
startpos = ((uint64) hi) << 32 | lo;
break;
+ case 'E':
+ if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
+ {
+ fprintf(stderr,
+ _("%s: could not parse end position \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ endpos = ((uint64) hi) << 32 | lo;
+ break;
case 'o':
{
char *data = pg_strdup(optarg);
@@ -857,6 +918,16 @@ main(int argc, char **argv)
exit(1);
}
+ if (endpos != InvalidXLogRecPtr && !do_start_slot)
+ {
+ fprintf(stderr,
+ _("%s: --endpos may only be specified with --start\n"),
+ progname);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
#ifndef WIN32
pqsignal(SIGINT, sigint_handler);
pqsignal(SIGHUP, sighup_handler);
@@ -923,8 +994,8 @@ main(int argc, char **argv)
if (time_to_abort)
{
/*
- * We've been Ctrl-C'ed. That's not an error, so exit without an
- * errorcode.
+ * We've been Ctrl-C'ed or reached an exit limit condition. That's
+ * not an error, so exit without an errorcode.
*/
disconnect_and_exit(0);
}
@@ -943,3 +1014,47 @@ main(int argc, char **argv)
}
}
}
+
+/*
+ * Fsync our output data, and send a feedback message to the server. Returns
+ * true if successful, false otherwise.
+ *
+ * If successful, *now is updated to the current timestamp just before sending
+ * feedback.
+ */
+static bool
+flushAndSendFeedback(PGconn *conn, TimestampTz *now)
+{
+ /* flush data to disk, so that we send a recent flush pointer */
+ if (!OutputFsync(*now))
+ return false;
+ *now = feGetCurrentTimestamp();
+ if (!sendFeedback(conn, *now, true, false))
+ return false;
+
+ return true;
+}
+
+/*
+ * Try to inform the server about our upcoming demise, but don't wait around or
+ * retry on failure.
+ */
+static void
+prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
+{
+ (void) PQputCopyEnd(conn, NULL);
+ (void) PQflush(conn);
+
+ if (verbose)
+ {
+ if (keepalive)
+ fprintf(stderr, "%s: endpos %X/%X reached by keepalive\n",
+ progname,
+ (uint32) (endpos >> 32), (uint32) endpos);
+ else
+ fprintf(stderr, "%s: endpos %X/%X reached by record at %X/%X\n",
+ progname, (uint32) (endpos >> 32), (uint32) (endpos),
+ (uint32) (lsn >> 32), (uint32) lsn);
+
+ }
+}
--
2.5.5
0002-Add-some-minimal-tests-for-pg_recvlogical.patch (text/x-patch; charset=US-ASCII)
From 4a58bbc76031b966cce8b66227ffdc6785b08e33 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Tue, 3 Jan 2017 18:21:48 +0800
Subject: [PATCH 2/3] Add some minimal tests for pg_recvlogical
---
src/bin/pg_basebackup/Makefile | 2 ++
src/bin/pg_basebackup/t/030_pg_recvlogical.pl | 46 +++++++++++++++++++++++++++
2 files changed, 48 insertions(+)
create mode 100644 src/bin/pg_basebackup/t/030_pg_recvlogical.pl
diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index 52ac9e9..1e54b19 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -12,6 +12,8 @@
PGFILEDESC = "pg_basebackup/pg_receivexlog/pg_recvlogical - streaming WAL and backup receivers"
PGAPPICON=win32
+EXTRA_INSTALL=contrib/test_decoding
+
subdir = src/bin/pg_basebackup
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
diff --git a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
new file mode 100644
index 0000000..dca5ef2
--- /dev/null
+++ b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
@@ -0,0 +1,46 @@
+use strict;
+use warnings;
+use TestLib;
+use PostgresNode;
+use Test::More tests => 15;
+
+program_help_ok('pg_recvlogical');
+program_version_ok('pg_recvlogical');
+program_options_handling_ok('pg_recvlogical');
+
+my $node = get_new_node('main');
+
+# Initialize node with streaming and archiving enabled
+$node->init(allows_streaming => 1, has_archiving => 1);
+$node->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug1'
+log_error_verbosity = verbose
+});
+$node->dump_info;
+$node->start;
+
+$node->command_fails(['pg_recvlogical'],
+ 'pg_recvlogical needs a slot name');
+$node->command_fails(['pg_recvlogical', '-S', 'test'],
+ 'pg_recvlogical needs a database');
+$node->command_fails(['pg_recvlogical', '-S', 'test', '-d', 'postgres'],
+ 'pg_recvlogical needs an action');
+$node->command_fails(['pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'), '--start'],
+ 'no destination file');
+
+$node->command_ok(['pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'), '--create-slot'],
+ 'slot created');
+
+my $slot = $node->slot('test');
+isnt($slot->{'restart_lsn'}, '', 'restart lsn is defined for new slot');
+
+$node->psql('postgres', 'CREATE TABLE test_table(x integer)');
+$node->psql('postgres', 'INSERT INTO test_table(x) SELECT y FROM generate_series(1, 10) a(y);');
+my $nextlsn = $node->safe_psql('postgres', 'SELECT pg_current_xlog_insert_location()');
+chomp($nextlsn);
+
+$node->command_ok(['pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'), '--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-'],
+ 'replayed a transaction');
--
2.5.5
0003-Add-a-pg_recvlogical-wrapper-to-PostgresNode.patch (text/x-patch; charset=US-ASCII)
From d3b3ef86b5a91cea404c555d7a029bf2b21fb4f4 Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Tue, 15 Nov 2016 16:06:16 +0800
Subject: [PATCH 3/3] Add a pg_recvlogical wrapper to PostgresNode
---
src/test/perl/PostgresNode.pm | 75 ++++++++++++++++++++++++++++-
src/test/recovery/t/006_logical_decoding.pl | 31 +++++++++++-
2 files changed, 104 insertions(+), 2 deletions(-)
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 2f009d4..5197e80 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -1124,7 +1124,7 @@ sub psql
# IPC::Run::run threw an exception. re-throw unless it's a
# timeout, which we'll handle by testing is_expired
die $exc_save
- if (blessed($exc_save) || $exc_save ne $timeout_exception);
+ if (blessed($exc_save) || $exc_save !~ qr/$timeout_exception/);
$ret = undef;
@@ -1493,6 +1493,79 @@ sub slot
return $self->query_hash('postgres', "SELECT __COLUMNS__ FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'", @columns);
}
+=pod $node->pg_recvlogical_upto(self, dbname, slot_name, endpos, timeout_secs, ...)
+
+Invoke pg_recvlogical to read from slot_name on dbname until LSN endpos, which
+corresponds to pg_recvlogical --endpos. Gives up after timeout (if nonzero).
+
+Disallows pg_recvlogical from internally retrying on error by passing --no-loop.
+
+Plugin options are passed as additional keyword arguments.
+
+If called in scalar context, returns stdout, and die()s on timeout or nonzero return.
+
+If called in array context, returns a tuple of (retval, stdout, stderr, timeout).
+timeout is the IPC::Run::Timeout object whose is_expired method can be tested
+to check for timeout. retval is undef on timeout.
+
+=cut
+
+sub pg_recvlogical_upto
+{
+ my ($self, $dbname, $slot_name, $endpos, $timeout_secs, %plugin_options) = @_;
+ my ($stdout, $stderr);
+
+ my $timeout_exception = 'pg_recvlogical timed out';
+
+ my @cmd = ('pg_recvlogical', '-S', $slot_name, '--dbname', $self->connstr($dbname));
+ push @cmd, '--endpos', $endpos if ($endpos);
+ push @cmd, '-f', '-', '--no-loop', '--start';
+
+ while (my ($k, $v) = each %plugin_options)
+ {
+ die "= is not permitted to appear in replication option name" if ($k =~ qr/=/);
+ push @cmd, "-o", "$k=$v";
+ }
+
+ my $timeout;
+ $timeout = IPC::Run::timeout($timeout_secs, exception => $timeout_exception ) if $timeout_secs;
+ my $ret = 0;
+
+ do {
+ local $@;
+ eval {
+ IPC::Run::run(\@cmd, ">", \$stdout, "2>", \$stderr, $timeout);
+ $ret = $?;
+ };
+ my $exc_save = $@;
+ if ($exc_save)
+ {
+ # IPC::Run::run threw an exception. re-throw unless it's a
+ # timeout, which we'll handle by testing is_expired
+ die $exc_save
+ if (blessed($exc_save) || $exc_save !~ qr/$timeout_exception/);
+
+ $ret = undef;
+
+ die "Got timeout exception '$exc_save' but timer not expired?!"
+ unless $timeout->is_expired;
+
+ die "$exc_save waiting for endpos $endpos with stdout '$stdout', stderr '$stderr'"
+ unless wantarray;
+ }
+ };
+
+ if (wantarray)
+ {
+ return ($ret, $stdout, $stderr, $timeout);
+ }
+ else
+ {
+ die "pg_recvlogical exited with code '$ret', stdout '$stdout' and stderr '$stderr'" if $ret;
+ return $stdout;
+ }
+}
+
=pod
=back
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index b80a9a9..d8cc8d3 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -1,9 +1,13 @@
# Testing of logical decoding using SQL interface and/or pg_recvlogical
+#
+# Most logical decoding tests are in contrib/test_decoding. This module
+# is for work that doesn't fit well there, like where server restarts
+# are required.
use strict;
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 2;
+use Test::More tests => 5;
# Initialize master node
my $node_master = get_new_node('master');
@@ -36,5 +40,30 @@ $result = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_chan
chomp($result);
is($result, '', 'Decoding after fast restart repeats no rows');
+# Insert some rows and verify that we get the same results from pg_recvlogical
+# and the SQL interface.
+$node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]);
+
+my $expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT};
+
+my $stdout_sql = $node_master->safe_psql('postgres', qq[SELECT data FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]);
+is($stdout_sql, $expected, 'got expected output from SQL decoding session');
+
+my $endpos = $node_master->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;");
+diag "waiting to replay $endpos";
+
+my $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, $expected, 'got same expected output from pg_recvlogical decoding session');
+
+$stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot');
+
# done with the node
$node_master->stop;
--
2.5.5
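For readers following the series: the verbose messages in the first patch print LSNs in the two-part hex form %X/%X, splitting the 64-bit value into its high and low 32 bits, and the tests pass --endpos in that same text form. Below is a minimal sketch of that round trip. It is not part of the patches; parse_lsn and format_lsn are hypothetical helper names chosen for illustration, and the sketch assumes an LSN is simply a 64-bit unsigned integer.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper (not in the patch): parse LSN text such as
 * "0/16B3748" into a 64-bit value. Returns 0 on success, -1 if the
 * text does not match the hi/lo hex form.
 */
int
parse_lsn(const char *text, uint64_t *lsn)
{
	unsigned int hi;
	unsigned int lo;

	assert(text != NULL && lsn != NULL);

	if (sscanf(text, "%X/%X", &hi, &lo) != 2)
		return -1;

	/* high half goes in the upper 32 bits, low half in the lower 32 */
	*lsn = ((uint64_t) hi << 32) | lo;
	return 0;
}

/*
 * Hypothetical helper: format an LSN the same way the verbose
 * messages above do, as high 32 bits, a slash, then low 32 bits.
 * Returns the snprintf result.
 */
int
format_lsn(uint64_t lsn, char *buf, size_t buflen)
{
	assert(buf != NULL);

	return snprintf(buf, buflen, "%X/%X",
					(unsigned int) (lsn >> 32),
					(unsigned int) (lsn & 0xFFFFFFFFu));
}
```

Parsing "1/16B3748" with this sketch gives ((uint64) 1 << 32) | 0x16B3748, and formatting that value yields the same text again, so a value read from the server in this form can be passed straight to --endpos.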
On 4 January 2017 at 13:37, Craig Ringer <craig@2ndquadrant.com> wrote:
Moved to next CF with "needs review" state.
Here's an updated series. It's on top of the entry
https://commitfest.postgresql.org/12/883/ for PostgresNode TAP test
enhancements.
It corresponds exactly to patches [2,3,4] in the logical decoding on
standby post at
/messages/by-id/CAMsr+YEzC=-+eV09A=ra150FjtkmTqT5Q70PiqBwytbOR3cshg@mail.gmail.com
If this is committed, the remaining decoding on standby patches will
just be the meat of the feature.
Patches 2 and 3 committed for now. Please do everything else on the
logical decoding on standby posts. Thanks.
--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)