From 0258bb4e628c5dc5fb35b8b289d8599740e42b17 Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunklau@aiven.io>
Date: Wed, 28 Jul 2021 16:35:39 +0200
Subject: [PATCH v1 2/2] Use READ_REPLICATION_SLOT command in pg_receivewal.

Prior to this patch, when running pg_receivewal, the start LSN is
determined by looking at the WAL files currently stored on disk, falling
back to the current flush LSN from the server if none are found.

If for some reason the WAL files from pg_receivewal were moved, we want
to restart where we left off, which is the replication slot's restart_lsn,
instead of skipping right to the current flush location.

To keep compatibility with prior server versions, we only attempt it if
the server version is >= 15.
---
 src/bin/pg_basebackup/pg_receivewal.c        | 61 +++++++++++++++++++-
 src/bin/pg_basebackup/t/020_pg_receivewal.pl | 46 ++++++++++++++-
 2 files changed, 103 insertions(+), 4 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index 4474273daf..02c4d4f1a5 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -26,6 +26,7 @@
 #include "fe_utils/option_utils.h"
 #include "getopt_long.h"
 #include "libpq-fe.h"
+#include "pqexpbuffer.h"
 #include "receivelog.h"
 #include "streamutil.h"
 
@@ -190,6 +191,48 @@ close_destination_dir(DIR *dest_dir, char *dest_folder)
 	}
 }
 
+/*
+ * Run READ_REPLICATION_SLOT and return the slot's restart_lsn (column 1),
+ * storing its timeline (column 3) in *timeline; InvalidXLogRecPtr on failure.
+ */
+static XLogRecPtr
+GetSlotRestartLSN(const char *slot_name, uint32 *timeline)
+{
+	PGresult *res;
+	PQExpBuffer query;
+	uint32 hi, lo;
+	XLogRecPtr startpos;
+
+	if (slot_name == NULL)
+		return InvalidXLogRecPtr;
+	query = createPQExpBuffer();
+	appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name);
+	res = PQexec(conn, query->data);
+	destroyPQExpBuffer(query);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not read replication slot \"%s\": %s", slot_name, PQerrorMessage(conn));
+		PQclear(res);
+		return InvalidXLogRecPtr;
+	}
+	if (PQntuples(res) != 1 || PQnfields(res) < 4)
+	{
+		pg_log_error("could not fetch replication slot LSN: got %d rows and %d fields, expected %d rows and %d or more fields",
+					 PQntuples(res), PQnfields(res), 1, 4);
+		PQclear(res);
+		return InvalidXLogRecPtr;
+	}
+	if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+	{
+		pg_log_error("could not parse slot's restart_lsn \"%s\"", PQgetvalue(res, 0, 1));
+		PQclear(res);
+		return InvalidXLogRecPtr;
+	}
+	startpos = ((uint64) hi) << 32 | lo;
+	*timeline = atoi(PQgetvalue(res, 0, 3));
+	PQclear(res);
+	return startpos;
+}
 
 /*
  * Determine starting location for streaming, based on any existing xlog
@@ -408,8 +451,22 @@ StreamLog(void)
 	stream.startpos = FindStreamingStart(&stream.timeline);
 	if (stream.startpos == InvalidXLogRecPtr)
 	{
-		stream.startpos = serverpos;
-		stream.timeline = servertli;
+		/* No local WAL found: try the slot's restart_lsn, if the server supports it */
+		if (replication_slot)
+		{
+			if (PQserverVersion(conn) >= 150000)
+				stream.startpos = GetSlotRestartLSN(replication_slot, &stream.timeline);
+			else
+				pg_log_warning("Server does not support fetching the slot's position, "
+							   "resuming from the current server position instead");
+		}
+		/* If it is still unknown (no slot, older server, or failed lookup),
+		 * use the current flush value from the server */
+		if (stream.startpos == InvalidXLogRecPtr)
+		{
+			stream.startpos = serverpos;
+			stream.timeline = servertli;
+		}
 	}
 
 	/*
diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
index 950083d21c..537a0d9602 100644
--- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl
+++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 use TestLib;
 use PostgresNode;
-use Test::More tests => 27;
+use Test::More tests => 30;
 
 program_help_ok('pg_receivewal');
 program_version_ok('pg_receivewal');
@@ -49,7 +49,7 @@ is($primary->slot($slot_name)->{'slot_type'},
 
 # Generate some WAL.  Use --synchronous at the same time to add more
 # code coverage.  Switch to the next segment first so that subsequent
-# restarts of pg_receivewal will see this segment as full..
+# restarts of pg_receivewal will see this segment as full.
 $primary->psql('postgres', 'CREATE TABLE test_table(x integer);');
 $primary->psql('postgres', 'SELECT pg_switch_wal();');
 my $nextlsn =
@@ -144,6 +144,48 @@ $primary->command_ok(
 $partial_wals[0] =~ s/(\.gz)?.partial//;
 ok(-e $partial_wals[0], "check that previously partial WAL is now complete");
 
+# Verify that if we use a replication slot, we resume where we left off even
+# in the absence of WAL files in the target directory.
+
+# Set up the slot, and stream through it a first time.
+$primary->run_log(
+	[ 'pg_receivewal', '--slot', $slot_name, '--create-slot' ],
+	'creating a replication slot');
+$primary->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+$primary->run_log(
+	[ 'pg_receivewal', '-D', $stream_dir, '--slot', $slot_name, '--verbose', '--endpos', $nextlsn ],
+	"streaming some WAL");
+
+# Get the slot restart_lsn and make sure we retrieve the associated WAL file
+# even after deletion of the previous stored WAL files.
+my $restart_lsn = $primary->slot($slot_name)->{'restart_lsn'};
+# Check the value before it is modified and used in the query below, so that
+# an empty restart_lsn is reported by the test itself.
+isnt($restart_lsn, '', 'restart LSN of new slot is not null');
+# Force the last hex digit to 1 (rather than truly adding one) so that the
+# restart_lsn doesn't correspond to the previous file.
+$restart_lsn =~ s/.$/1/;
+my $walfile_to_be_archived = $primary->safe_psql('postgres',
+	"SELECT pg_walfile_name('$restart_lsn');");
+
+unlink glob "'${stream_dir}/*'";
+
+$primary->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+$primary->command_ok(
+	[ 'pg_receivewal', '-D', $stream_dir, '--slot', $slot_name, '--verbose', '--endpos', $nextlsn ],
+	"Stream some wal starting from the slot's restart_lsn");
+ok(-e "$stream_dir/$walfile_to_be_archived", "WAL from the slot's restart_lsn has been archived");
+
 # Permissions on WAL files should be default
 SKIP:
 {
-- 
2.32.0

