From 9fa01789f663975b963c26875b70857055cadb9b Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunklau@aiven.io>
Date: Wed, 28 Jul 2021 16:34:54 +0200
Subject: [PATCH v3 1/3] Add READ_REPLICATION_SLOT command.

This commit introduces a new READ_REPLICATION_SLOT <slot_name> command.
This command is used to read information about a replication slot when
using a physical replication connection.

In this first version it returns the slot type, restart_lsn, confirmed_flush_lsn
and the timelines of restart_lsn and confirmed_flush_lsn, which are obtained by
following the current timeline history.
---
 doc/src/sgml/protocol.sgml                  |  66 ++++++++++
 src/backend/replication/repl_gram.y         |  16 ++-
 src/backend/replication/repl_scanner.l      |   1 +
 src/backend/replication/walsender.c         | 129 ++++++++++++++++++++
 src/include/nodes/nodes.h                   |   1 +
 src/include/nodes/replnodes.h               |  10 ++
 src/test/recovery/t/001_stream_rep.pl       |  46 ++++++-
 src/test/recovery/t/006_logical_decoding.pl |  13 +-
 8 files changed, 279 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index a232546b1d..0e6fb01054 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2052,6 +2052,72 @@ The commands accepted in replication mode are:
     </listitem>
   </varlistentry>
 
+  <varlistentry>
+    <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
+      <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
+    </term>
+    <listitem>
+      <para>
+      Read information about the named replication slot.
+      This is useful to determine which WAL location we should be asking the
+      server to start streaming at.
+      </para>
+      <para>
+      In response to this command, the server will return a result set with one row, or no rows if
+      the slot does not exist. LSN and timeline fields are NULL when not set. The fields are:
+        <variablelist>
+          <varlistentry>
+            <term><literal>type</literal> (<type>text</type>)</term>
+            <listitem>
+              <para>
+               The replication slot's type, either <literal>physical</literal> or
+               <literal>logical</literal>
+              </para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term><literal>restart_lsn</literal> (<type>text</type>)</term>
+            <listitem>
+              <para>
+               The replication slot's <literal>restart_lsn</literal>.
+              </para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term><literal>confirmed_flush_lsn</literal> (<type>text</type>)</term>
+            <listitem>
+              <para>
+               The replication slot's <literal>confirmed_flush_lsn</literal>.
+              </para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term><literal>restart_tli</literal> (<type>int4</type>)</term>
+            <listitem>
+              <para>
+               The timeline ID for the <literal>restart_lsn</literal> position,
+               when following the current timeline history
+              </para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term><literal>confirmed_flush_tli</literal> (<type>int4</type>)</term>
+            <listitem>
+              <para>
+               The timeline ID for the <literal>confirmed_flush_lsn</literal>
+               position, when following the current timeline history
+              </para>
+            </listitem>
+          </varlistentry>
+        </variablelist>
+      </para>
+    </listitem>
+  </varlistentry>
+
   <varlistentry>
     <term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
      <indexterm><primary>START_REPLICATION</primary></indexterm>
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index e1e8ec29cc..5de003b7dc 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void);
 /* Keyword tokens. */
 %token K_BASE_BACKUP
 %token K_IDENTIFY_SYSTEM
+%token K_READ_REPLICATION_SLOT
 %token K_SHOW
 %token K_START_REPLICATION
 %token K_CREATE_REPLICATION_SLOT
@@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
-				timeline_history show sql_cmd
+				read_replication_slot timeline_history show sql_cmd
 %type <list>	base_backup_opt_list
 %type <defelt>	base_backup_opt
 %type <uintval>	opt_timeline
@@ -120,6 +121,7 @@ opt_semicolon:	';'
 
 command:
 			identify_system
+			| read_replication_slot
 			| base_backup
 			| start_replication
 			| start_logical_replication
@@ -140,6 +142,18 @@ identify_system:
 				}
 			;
 
+/*
+ * READ_REPLICATION_SLOT slot
+ */
+read_replication_slot:
+			K_READ_REPLICATION_SLOT var_name
+				{
+					ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd);
+					n->slotname = $2;
+					$$ = (Node *) n;
+				}
+			;
+
 /*
  * SHOW setting
  */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index c038a636c3..1b599c255e 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -85,6 +85,7 @@ identifier		{ident_start}{ident_cont}*
 BASE_BACKUP			{ return K_BASE_BACKUP; }
 FAST			{ return K_FAST; }
 IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
+READ_REPLICATION_SLOT	{ return K_READ_REPLICATION_SLOT; }
 SHOW		{ return K_SHOW; }
 LABEL			{ return K_LABEL; }
 NOWAIT			{ return K_NOWAIT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3ca2a11389..de50f06e90 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
 static void WalSndDone(WalSndSendDataCallback send_data);
 static XLogRecPtr GetStandbyFlushRecPtr(void);
 static void IdentifySystem(void);
+static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
 static void StartReplication(StartReplicationCmd *cmd);
@@ -457,6 +458,127 @@ IdentifySystem(void)
 	end_tup_output(tstate);
 }
 
+/*
+ * Handle READ_REPLICATION_SLOT command: send one row describing the named
+ * slot, or no row at all if it does not exist.  LSN and timeline columns
+ * are NULL when the corresponding slot position is invalid.
+ */
+static void
+ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
+{
+#define READ_REPLICATION_SLOT_COLS 5
+	ReplicationSlot *slot;
+	ReplicationSlot slot_contents;
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc	tupdesc;
+	Datum		values[READ_REPLICATION_SLOT_COLS];
+	bool		nulls[READ_REPLICATION_SLOT_COLS];
+	char		xloc[MAXFNAMELEN];
+	int			i = 0;
+	List	   *timeline_history = NIL;
+	TimeLineID	current_tli;
+	bool		has_value = false;
+
+	tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "type",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "confirmed_flush_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "restart_tli",
+							  INT4OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "confirmed_flush_tli",
+							  INT4OID, -1, 0);
+
+	/* Every column starts out NULL; only valid positions are filled in. */
+	MemSet(values, 0, sizeof(values));
+	MemSet(nulls, true, sizeof(nulls));
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	slot = SearchNamedReplicationSlot(cmd->slotname, false);
+	if (slot == NULL || !slot->in_use)
+	{
+		/* Unknown slot: reply with an empty result set, not an error. */
+		LWLockRelease(ReplicationSlotControlLock);
+	}
+	else
+	{
+		/* Copy slot contents while holding spinlock */
+		SpinLockAcquire(&slot->mutex);
+		slot_contents = *slot;
+		SpinLockRelease(&slot->mutex);
+		LWLockRelease(ReplicationSlotControlLock);
+
+		/* type: only logical slots are bound to a database */
+		if (slot_contents.data.database == InvalidOid)
+			values[i] = CStringGetTextDatum("physical");
+		else
+			values[i] = CStringGetTextDatum("logical");
+		nulls[i] = false;
+		i++;
+
+		/* restart_lsn */
+		if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+		{
+			snprintf(xloc, sizeof(xloc), "%X/%X",
+					 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
+			values[i] = CStringGetTextDatum(xloc);
+			nulls[i] = false;
+		}
+		i++;
+
+		/* confirmed_flush_lsn */
+		if (!XLogRecPtrIsInvalid(slot_contents.data.confirmed_flush))
+		{
+			snprintf(xloc, sizeof(xloc), "%X/%X",
+					 LSN_FORMAT_ARGS(slot_contents.data.confirmed_flush));
+			values[i] = CStringGetTextDatum(xloc);
+			nulls[i] = false;
+		}
+		i++;
+
+		/*
+		 * restart_tli and confirmed_flush_tli: follow the current timeline's
+		 * history.  ThisTimeLineID may not be up to date in a walsender
+		 * running during recovery, so use the replay timeline there.
+		 */
+		if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) ||
+			!XLogRecPtrIsInvalid(slot_contents.data.confirmed_flush))
+		{
+			if (RecoveryInProgress())
+				(void) GetXLogReplayRecPtr(&current_tli);
+			else
+				current_tli = ThisTimeLineID;
+			timeline_history = readTimeLineHistory(current_tli);
+		}
+
+		if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+		{
+			values[i] = Int32GetDatum(tliOfPointInHistory(slot_contents.data.restart_lsn, timeline_history));
+			nulls[i] = false;
+		}
+		i++;
+
+		if (!XLogRecPtrIsInvalid(slot_contents.data.confirmed_flush))
+		{
+			values[i] = Int32GetDatum(tliOfPointInHistory(slot_contents.data.confirmed_flush, timeline_history));
+			nulls[i] = false;
+		}
+		i++;
+
+		Assert(i == READ_REPLICATION_SLOT_COLS);
+		has_value = true;
+	}
+
+	dest = CreateDestReceiver(DestRemoteSimple);
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+	if (has_value)
+		do_tup_output(tstate, values, nulls);
+	end_tup_output(tstate);
+}
+
 
 /*
  * Handle TIMELINE_HISTORY command.
@@ -1618,6 +1740,13 @@ exec_replication_command(const char *cmd_string)
 			EndReplicationCommand(cmdtag);
 			break;
 
+		case T_ReadReplicationSlotCmd:
+			cmdtag = "READ_REPLICATION_SLOT";
+			set_ps_display(cmdtag);
+			ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
+			EndReplicationCommand(cmdtag);
+			break;
+
 		case T_BaseBackupCmd:
 			cmdtag = "BASE_BACKUP";
 			set_ps_display(cmdtag);
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 6a4d82f0a8..5f78bdd573 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -495,6 +495,7 @@ typedef enum NodeTag
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
 	 */
 	T_IdentifySystemCmd,
+	T_ReadReplicationSlotCmd,
 	T_BaseBackupCmd,
 	T_CreateReplicationSlotCmd,
 	T_DropReplicationSlotCmd,
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a251f2..ec85b7d993 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -33,6 +33,16 @@ typedef struct IdentifySystemCmd
 	NodeTag		type;
 } IdentifySystemCmd;
 
+/* ----------------------
+ *		READ_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct ReadReplicationSlotCmd
+{
+	NodeTag		type;
+	char	   *slotname;		/* name of the slot to read */
+} ReadReplicationSlotCmd;
+
 
 /* ----------------------
  *		BASE_BACKUP command
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index ac581c1c07..75a8c6e45e 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 49;
+use Test::More tests => 55;
 
 # Initialize primary node
 my $node_primary = PostgresNode->new('primary');
@@ -252,6 +252,50 @@ ok( $ret == 0,
 	"SHOW with superuser-settable parameter, replication role and logical replication"
 );
 
+note "testing READ_REPLICATION_SLOT command";
+
+my $slotname = 'test_read_replication_slot_physical';
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+  'postgres', 'READ_REPLICATION_SLOT non_existent_slot;',
+  extra_params => [ '-d', $connstr_rep ]);
+is($ret, 0,
+  "READ_REPLICATION_SLOT does not produce an error with non existent slot");
+is($stdout, '',
+    "READ_REPLICATION_SLOT returns no tuple if a slot is non existent");
+
+# safe_psql() dies on failure, so a slot that cannot be created stops
+# the test here instead of surfacing as a confusing mismatch below.
+$node_primary->safe_psql(
+  'postgres',
+  "CREATE_REPLICATION_SLOT $slotname PHYSICAL RESERVE_WAL;",
+  extra_params => [ '-d', $connstr_rep ]);
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+  'postgres',
+  "READ_REPLICATION_SLOT $slotname;",
+  extra_params => [ '-d', $connstr_rep ]);
+is($ret, 0,
+  "READ_REPLICATION_SLOT does not produce an error with existing slot");
+like($stdout, qr/^physical\|[^|]*\|\|1\|$/,
+    "READ_REPLICATION_SLOT returns tuple corresponding to the slot");
+
+# Likewise make sure the slot really is gone before checking that
+# READ_REPLICATION_SLOT no longer reports it.
+$node_primary->safe_psql(
+  'postgres',
+  "DROP_REPLICATION_SLOT $slotname;",
+  extra_params => [ '-d', $connstr_rep ]);
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+  'postgres',
+  "READ_REPLICATION_SLOT $slotname;",
+  extra_params => [ '-d', $connstr_rep ]);
+is($ret, 0,
+  "READ_REPLICATION_SLOT does not produce an error with dropped slot");
+is($stdout, '',
+    "READ_REPLICATION_SLOT returns no tuple if a slot has been dropped");
+
 note "switching to physical replication slot";
 
 # Switch to using a physical replication slot. We can do this without a new
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index cc116062c2..07396f40e8 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -10,7 +10,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 14;
+use Test::More tests => 16;
 use Config;
 
 # Initialize primary node
@@ -39,6 +39,17 @@ ok( $stderr =~
 	  m/replication slot "test_slot" was not created in this database/,
 	"Logical decoding correctly fails to start");
 
+($result, $stdout, $stderr) = $node_primary->psql('template1',
+  qq[READ_REPLICATION_SLOT test_slot;],
+  replication => 'database');
+like($stdout, qr/^logical\|[^|]*\|[^|]*\|1\|1$/,
+  'Logical replication slot can be read on any logical connection');
+($result, $stdout, $stderr) = $node_primary->psql('postgres',
+  qq[READ_REPLICATION_SLOT test_slot;],
+  replication => '1');
+like($stdout, qr/^logical\|[^|]*\|[^|]*\|1\|1$/,
+  'Logical replication slot can be read on a physical connection');
+
 # Check case of walsender not using a database connection.  Logical
 # decoding should not be allowed.
 ($result, $stdout, $stderr) = $node_primary->psql(
-- 
2.32.0

