From 420059da8bd03c8e1ee6cda6991e7745385caf29 Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Fri, 9 Jun 2017 13:39:37 +0900 Subject: [PATCH] Implement pg_receivewal --endpos This is primarily useful in making any regression test using this utility more deterministic as pg_receivewal cannot be started as a deamon in TAP tests. While this is less useful than the equivalent of pg_recvlogical, users can as well use it for example to enforce WAL streaming up to a end-of-backup position, to save only a minimal amount of WAL, though the end position of WAL is only known once the backup is finished, but some users may want to get that done as a two-step process with one single WAL receiver in use all the time. Use this new option to stream WAL data in a deterministic way within a new set of TAP tests. --- doc/src/sgml/ref/pg_receivewal.sgml | 16 ++++++++ src/bin/pg_basebackup/pg_receivewal.c | 35 +++++++++++++++- src/bin/pg_basebackup/t/020_pg_receivewal.pl | 61 +++++++++++++++++++++++++++- 3 files changed, 110 insertions(+), 2 deletions(-) diff --git a/doc/src/sgml/ref/pg_receivewal.sgml b/doc/src/sgml/ref/pg_receivewal.sgml index a17082bb11..80995c115b 100644 --- a/doc/src/sgml/ref/pg_receivewal.sgml +++ b/doc/src/sgml/ref/pg_receivewal.sgml @@ -93,6 +93,22 @@ PostgreSQL documentation + + + + + If specified, automatically stop replication and exit with normal + exit status 0 when receiving reaches the specified LSN. + + + + If there's a record with LSN exactly equal to lsn, + the record will be considered. + + + + + diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index 370d871660..8a272c9ef1 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -42,6 +42,8 @@ static bool slot_exists_ok = false; static bool do_drop_slot = false; static bool synchronous = false; static char *replication_slot = NULL; +static XLogRecPtr endpos = InvalidXLogRecPtr; +static bool exit_on_stop = false; static void usage(void); @@ -78,6 +80,7 @@ usage(void) printf(_("\nOptions:\n")); printf(_(" -D, --directory=DIR receive write-ahead log files into this directory\n")); printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n")); + printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n")); printf(_(" -n, --no-loop do not loop on connection lost\n")); printf(_(" -s, --status-interval=SECS\n" " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000)); @@ -112,6 +115,16 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished) progname, (uint32) (xlogpos >> 32), (uint32) xlogpos, timeline); + if (!XLogRecPtrIsInvalid(endpos) && endpos < xlogpos) + { + if (verbose) + fprintf(stderr, _("%s: stopped streaming at %X/%X (timeline %u)\n"), + progname, (uint32) (xlogpos >> 32), (uint32) xlogpos, + timeline); + exit_on_stop = true; + return true; + } + /* * Note that we report the previous, not current, position here. After a * timeline switch, xlogpos points to the beginning of the segment because @@ -459,6 +472,7 @@ main(int argc, char **argv) {"version", no_argument, NULL, 'V'}, {"directory", required_argument, NULL, 'D'}, {"dbname", required_argument, NULL, 'd'}, + {"endpos", required_argument, NULL, 'E'}, {"host", required_argument, NULL, 'h'}, {"port", required_argument, NULL, 'p'}, {"username", required_argument, NULL, 'U'}, @@ -480,6 +494,7 @@ main(int argc, char **argv) int c; int option_index; char *db_name; + uint32 hi, lo; progname = get_progname(argv[0]); set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup")); @@ -499,7 +514,7 @@ main(int argc, char **argv) } } - while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWvZ:", + while ((c = getopt_long(argc, argv, "D:d:E:h:p:U:s:S:nwWvZ:", long_options, &option_index)) != -1) { switch (c) @@ -543,6 +558,16 @@ main(int argc, char **argv) case 'S': replication_slot = pg_strdup(optarg); break; + case 'E': + if (sscanf(optarg, "%X/%X", &hi, &lo) != 2) + { + fprintf(stderr, + _("%s: could not parse end position \"%s\"\n"), + progname, optarg); + exit(1); + } + endpos = ((uint64) hi) << 32 | lo; + break; case 'n': noloop = 1; break; @@ -721,6 +746,14 @@ main(int argc, char **argv) */ exit(0); } + else if (exit_on_stop) + { + /* + * End of streaming position has been willingly reached, so exit + * without an error code. + */ + exit(0); + } else if (noloop) { fprintf(stderr, _("%s: disconnected\n"), progname); diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl index b4cb6f729d..db060cf4f5 100644 --- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl +++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl @@ -1,8 +1,67 @@ use strict; use warnings; use TestLib; -use Test::More tests => 8; +use PostgresNode; +use Test::More tests => 15; program_help_ok('pg_receivewal'); program_version_ok('pg_receivewal'); program_options_handling_ok('pg_receivewal'); + +my $primary = get_new_node('primary'); +$primary->init(allows_streaming => 1); +$primary->start; + +my $stream_dir = $primary->basedir . '/archive_wal'; +mkdir($stream_dir); + +# Sanity checks for command line options. +command_fails(['pg_receivewal'], + 'pg_receivewal needs target directory specified'); +command_fails(['pg_receivewal', '-D', $stream_dir, '--create-slot', + '--drop-slot'], + 'failure if both --create-slot and --drop-slot specified'); +command_fails(['pg_receivewal', '-D', $stream_dir, '--create-slot'], + 'failure if --create-slot defined without --slot'); + +# Slot creation and drop +my $slot_name = 'test'; +command_ok(['pg_receivewal', '--slot', $slot_name, + '-d', $primary->connstr(), '--create-slot' ], + 'creation of replication slot'); +command_ok(['pg_receivewal', '--slot', $slot_name, + '-d', $primary->connstr(), '--drop-slot' ], + 'drop of replication slot'); + +# Generate some WAL using non-compression mode. Use --synchronous +# at the same time to add more code coverage. Switch to the next +# segment first so as subsequent restarts of pg_receivewal will +# see this segment as full and non-compressed. +$primary->psql('postgres', 'CREATE TABLE test_table(x integer);'); +$primary->psql('postgres', 'SELECT pg_switch_wal();'); +my $nextlsn = + $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();'); +chomp($nextlsn); +$primary->psql('postgres', + 'INSERT INTO test_table VALUES (generate_series(1,100));'); + +# Stream up to the given position. +$primary->command_ok( + [ 'pg_receivewal', '-d', $primary->connstr(), '-D', $stream_dir, + '--verbose', '--endpos', $nextlsn, '--synchronous', '--no-loop' ], + 'streaming some WAL with --synchronous and without compression'); + +# Now generate more WAL, switch to a new segment and stream +# changes using the compression mode. +$primary->psql('postgres', + 'INSERT INTO test_table VALUES (generate_series(1,100));'); +$primary->psql('postgres', 'SELECT pg_switch_wal();'); +$nextlsn = + $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();'); +chomp($nextlsn); +$primary->psql('postgres', + 'INSERT INTO test_table VALUES (generate_series(1,100));'); +$primary->command_ok( + [ 'pg_receivewal', '-d', $primary->connstr(), '-D', $stream_dir, + '--verbose', '--endpos', $nextlsn, '--compress', '1', '--no-loop' ], + 'streaming some WAL with with compression'); -- 2.13.1