From 935cdeef089c985867c9d18df96caca1c751f02d Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunklau@aiven.io>
Date: Wed, 28 Jul 2021 16:35:39 +0200
Subject: [PATCH v2 2/3] Use READ_REPLICATION_SLOT command in pg_receivewal.

Prior to this patch, when running pg_receivewal, the start LSN is determined by looking at
the WAL files currently stored on disk, then using the current flush lsn
from the server.

If for some reason the WAL files from pg_receivewal were moved, we want
to restart where we left at, which is the replication slot's restart_lsn
instead of skipping right to the current flush location.

To keep compatibility with prior server versions, we only attempt it if
the version is < 15.
---
 src/bin/pg_basebackup/pg_receivewal.c        | 26 ++++++++-
 src/bin/pg_basebackup/streamutil.c           | 61 ++++++++++++++++++++
 src/bin/pg_basebackup/streamutil.h           |  2 +
 src/bin/pg_basebackup/t/020_pg_receivewal.pl | 46 ++++++++++++++-
 4 files changed, 131 insertions(+), 4 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index 4474273daf..453b3644ca 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -26,6 +26,7 @@
 #include "fe_utils/option_utils.h"
 #include "getopt_long.h"
 #include "libpq-fe.h"
+#include "pqexpbuffer.h"
 #include "receivelog.h"
 #include "streamutil.h"
 
@@ -191,6 +192,7 @@ close_destination_dir(DIR *dest_dir, char *dest_folder)
 }
 
 
+
 /*
  * Determine starting location for streaming, based on any existing xlog
  * segments in the directory. We start at the end of the last one that is
@@ -408,8 +410,28 @@ StreamLog(void)
 	stream.startpos = FindStreamingStart(&stream.timeline);
 	if (stream.startpos == InvalidXLogRecPtr)
 	{
-		stream.startpos = serverpos;
-		stream.timeline = servertli;
+		/* Try to get it from the slot if any, and the server supports it */
+		if (replication_slot)
+		{
+			if (PQserverVersion(conn) >= 150000)
+			{
+				if (!GetSlotInformation(conn, replication_slot, &stream.startpos, &stream.timeline))
+					pg_log_warning("Could not fetch the replication_slot \"%s\" information "
+								   "resuming from the current server position instead", replication_slot);
+			}
+			else
+				pg_log_warning("Server does not suport fetching the slot's position, "
+							   "resuming from the current server position instead");
+		}
+
+		/*
+		 * If it is still unknown, use the current flush value from the server
+		 */
+		if (stream.startpos == InvalidXLogRecPtr)
+		{
+			stream.startpos = serverpos;
+			stream.timeline = servertli;
+		}
 	}
 
 	/*
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index f5b3b476e5..c902824617 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -397,6 +397,8 @@ RetrieveDataDirCreatePerm(PGconn *conn)
 	return true;
 }
 
+
+
 /*
  * Run IDENTIFY_SYSTEM through a given connection and give back to caller
  * some result information if requested:
@@ -479,6 +481,65 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
 	return true;
 }
 
+
+/*
+ * Check a replication slot exists through a given connection, and give back to
+ * caller some result information if requested:
+ * 	- restart_lsn
+ * 	- timeline
+ */
+bool
+GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, uint32 *restart_lsn_timeline)
+{
+	PGresult   *res;
+	PQExpBuffer query;
+	uint32		hi,
+				lo;
+
+	if (slot_name == NULL)
+		return InvalidXLogRecPtr;
+
+	query = createPQExpBuffer();
+	appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s",
+					  slot_name);
+	res = PQexec(conn, query->data);
+	destroyPQExpBuffer(query);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not read replication slot : %s",
+					 PQerrorMessage(conn));
+		PQclear(res);
+		return InvalidXLogRecPtr;
+	}
+	if (PQntuples(res) == 0)
+	{
+		pg_log_error("replication slot %s does not exist", slot_name);
+		PQclear(0);
+		return false;
+	}
+
+	if (PQntuples(res) != 1 || PQnfields(res) < 4)
+	{
+		pg_log_error("could not fetch replication slot LSN: got %d rows and %d fields, expected %d rows and %d or more fields",
+					 PQntuples(res), PQnfields(res), 1, 2);
+		PQclear(res);
+		return false;
+	}
+	if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+	{
+		pg_log_error("could not parse slot's restart_lsn \"%s\"",
+					 PQgetvalue(res, 0, 0));
+		PQclear(res);
+		return false;
+	}
+	if (restart_lsn)
+		*restart_lsn = ((uint64) hi) << 32 | lo;
+	if (restart_lsn_timeline)
+		*restart_lsn_timeline = atoi(PQgetvalue(res, 0, 3));
+	PQclear(res);
+	return true;
+}
+
 /*
  * Create a replication slot for the given connection. This function
  * returns true in case of success.
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 504803b976..e34f986136 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -40,6 +40,8 @@ extern bool RunIdentifySystem(PGconn *conn, char **sysid,
 							  TimeLineID *starttli,
 							  XLogRecPtr *startpos,
 							  char **db_name);
+extern bool GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, uint32 *restart_lsn_timeline);
+
 extern bool RetrieveWalSegSize(PGconn *conn);
 extern TimestampTz feGetCurrentTimestamp(void);
 extern void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
index 0b33d73900..4f0f6956b5 100644
--- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl
+++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 use TestLib;
 use PostgresNode;
-use Test::More tests => 27;
+use Test::More tests => 30;
 
 program_help_ok('pg_receivewal');
 program_version_ok('pg_receivewal');
@@ -49,7 +49,7 @@ is($primary->slot($slot_name)->{'slot_type'},
 
 # Generate some WAL.  Use --synchronous at the same time to add more
 # code coverage.  Switch to the next segment first so that subsequent
-# restarts of pg_receivewal will see this segment as full..
+# restarts of pg_receivewal will see this segment as full../
 $primary->psql('postgres', 'CREATE TABLE test_table(x integer);');
 $primary->psql('postgres', 'SELECT pg_switch_wal();');
 my $nextlsn =
@@ -146,6 +146,48 @@ $primary->command_ok(
 $partial_wals[0] =~ s/(\.gz)?.partial//;
 ok(-e $partial_wals[0], "check that previously partial WAL is now complete");
 
+# Verify that if we use a replication slot, we resume where we left even in the
+# absence of WALs
+
+# Setup the slot, and connect to it a first time
+$primary->run_log(
+	[ 'pg_receivewal', '--slot', $slot_name, '--create-slot' ],
+	'creating a replication slot');
+$primary->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+$primary->run_log(
+	[ 'pg_receivewal', '-D', $stream_dir, '--slot', $slot_name, '--verbose', '--endpos', $nextlsn ],
+	"streaming some WAL");
+
+# Get the slot restart_lsn and make sure we retrieve the associated WAL file
+# even after deletion of the previous stored WAL files.
+$slot = $primary->slot($slot_name);
+my $restart_lsn = $slot->{'restart_lsn'};
+# Add one so that the restart_lsn doesn't correspond to the previous file.
+$restart_lsn =~ s/.$/1/;
+my $walfile_to_be_archived = $primary->safe_psql('postgres',
+	"SELECT pg_walfile_name('$restart_lsn');");
+isnt($restart_lsn, '', 'restart LSN of new slot is not null');
+
+unlink glob "'${stream_dir}/*'";
+
+$primary->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+$primary->command_ok(
+	[ 'pg_receivewal', '-D', $stream_dir, '--slot', $slot_name, '--verbose', '--endpos', $nextlsn ],
+	"Stream some wal starting from the slot's restart_lsn");
+$slot = $primary->slot($slot_name);
+my @walfiles = glob "${stream_dir}/*";
+ok(-e "$stream_dir/$walfile_to_be_archived", "WAL from the slot's restart_lsn has been archived");
+
 # Permissions on WAL files should be default
 SKIP:
 {
-- 
2.32.0

