From fff8786049326864d3ef8fe4539e1829f933f32f Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunklau@aiven.io>
Date: Wed, 28 Jul 2021 16:34:54 +0200
Subject: [PATCH v2 1/3] Add READ_REPLICATION_SLOT command.

This commit introduces a new READ_REPLICATION_SLOT <slot_name> command.
This command is used to read information about a replication slot when
using a physical replication connection.

In this first version it returns the slot type, restart_lsn and
confirmed_flush_lsn, plus the timeline of each of those two LSNs, obtained
by following the current timeline history.
---
 doc/src/sgml/protocol.sgml             |  62 +++++++++++++++
 src/backend/replication/repl_gram.y    |  18 ++++-
 src/backend/replication/repl_scanner.l |   1 +
 src/backend/replication/walsender.c    | 106 +++++++++++++++++++++++++
 src/include/nodes/nodes.h              |   1 +
 src/include/nodes/replnodes.h          |  10 +++
 6 files changed, 197 insertions(+), 1 deletion(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index a232546b1d..6207171426 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2052,6 +2052,68 @@ The commands accepted in replication mode are:
     </listitem>
   </varlistentry>
 
+  <varlistentry>
+    <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
+      <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
+    </term>
+    <listitem>
+      <para>
+      Read information about the named replication slot.  This is useful to determine the WAL location from which to ask the server to start streaming.
+      </para>
+      <para>
+      In response to this command, the server will return a one-row result set, containing the following fields:
+        <variablelist>
+          <varlistentry>
+            <term><literal>type</literal> (<type>text</type>)</term>
+            <listitem>
+              <para>
+               The replication slot's type, either <literal>physical</literal> or <literal>logical</literal>.
+              </para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term><literal>restart_lsn</literal> (<type>text</type>)</term>
+            <listitem>
+              <para>
+               The replication slot's restart_lsn.
+              </para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term><literal>confirmed_flush_lsn</literal> (<type>text</type>)</term>
+            <listitem>
+              <para>
+               The replication slot's confirmed_flush_lsn.
+              </para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term><literal>restart_lsn_timeline</literal> (<type>int4</type>)</term>
+            <listitem>
+              <para>
+               The timeline ID for the restart_lsn position, when following the current timeline
+               history.
+              </para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term><literal>confirmed_flush_lsn_timeline</literal> (<type>int4</type>)</term>
+            <listitem>
+              <para>
+               The timeline ID for the confirmed_flush_lsn position, when following the current timeline
+               history.
+              </para>
+            </listitem>
+          </varlistentry>
+        </variablelist>
+      </para>
+    </listitem>
+  </varlistentry>
+
   <varlistentry>
     <term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
      <indexterm><primary>START_REPLICATION</primary></indexterm>
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index e1e8ec29cc..7298f44008 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void);
 /* Keyword tokens. */
 %token K_BASE_BACKUP
 %token K_IDENTIFY_SYSTEM
+%token K_READ_REPLICATION_SLOT
 %token K_SHOW
 %token K_START_REPLICATION
 %token K_CREATE_REPLICATION_SLOT
@@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
-				timeline_history show sql_cmd
+				read_replication_slot timeline_history show sql_cmd
 %type <list>	base_backup_opt_list
 %type <defelt>	base_backup_opt
 %type <uintval>	opt_timeline
@@ -120,6 +121,7 @@ opt_semicolon:	';'
 
 command:
 			identify_system
+			| read_replication_slot
 			| base_backup
 			| start_replication
 			| start_logical_replication
@@ -140,6 +142,18 @@ identify_system:
 				}
 			;
 
+/*
+ * READ_REPLICATION_SLOT %s
+ */
+read_replication_slot:
+			K_READ_REPLICATION_SLOT var_name
+				{
+					ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd);
+					n->slotname = $2;
+					$$ = (Node *) n;
+				}
+			;
+
+
 /*
  * SHOW setting
  */
@@ -361,6 +375,8 @@ timeline_history:
 				}
 			;
 
+
+
 opt_physical:
 			K_PHYSICAL
 			| /* EMPTY */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index c038a636c3..1b599c255e 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -85,6 +85,7 @@ identifier		{ident_start}{ident_cont}*
 BASE_BACKUP			{ return K_BASE_BACKUP; }
 FAST			{ return K_FAST; }
 IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
+READ_REPLICATION_SLOT	{ return K_READ_REPLICATION_SLOT; }
 SHOW		{ return K_SHOW; }
 LABEL			{ return K_LABEL; }
 NOWAIT			{ return K_NOWAIT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3ca2a11389..9a13d1c186 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
 static void WalSndDone(WalSndSendDataCallback send_data);
 static XLogRecPtr GetStandbyFlushRecPtr(void);
 static void IdentifySystem(void);
+static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
 static void StartReplication(StartReplicationCmd *cmd);
@@ -457,6 +458,104 @@ IdentifySystem(void)
 	end_tup_output(tstate);
 }
 
+/* Handle READ_REPLICATION_SLOT: send one row describing the named slot, or ERROR if it does not exist */
+static void
+ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
+{
+	ReplicationSlot *slot;
+	ReplicationSlot slot_contents;
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc tupdesc;
+	Datum values[5];	/* one entry per result column */
+	bool  nulls[5];
+	char  xloc[MAXFNAMELEN];
+	int i = 0;	/* index of the next column to fill */
+	List *timeline_history = NIL;	/* loaded lazily, shared by both timeline columns */
+	TimeLineID slots_position_timeline;
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	slot = SearchNamedReplicationSlot(cmd->slotname, false);
+	if (slot == NULL || !slot->in_use)
+	{
+		LWLockRelease(ReplicationSlotControlLock);
+
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("replication slot \"%s\" does not exist",
+						cmd->slotname)));
+	}
+	/* Copy the slot under its spinlock; everything below reads only this local copy */
+	SpinLockAcquire(&slot->mutex);
+	slot_contents = *slot;
+	SpinLockRelease(&slot->mutex);
+	LWLockRelease(ReplicationSlotControlLock);
+
+	tupdesc = CreateTemplateTupleDesc(5);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "type",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "confirmed_flush_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "restart_lsn_timeline",
+							  INT4OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "confirmed_flush_lsn_timeline",
+							  INT4OID, -1, 0);
+
+	if (slot_contents.data.database == InvalidOid)	/* no database: physical slot */
+		values[i] = CStringGetTextDatum("physical");
+	else
+		values[i] = CStringGetTextDatum("logical");
+	nulls[i] = false;
+	i++;
+
+	snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
+	values[i] = CStringGetTextDatum(xloc);
+	nulls[i] = false;	/* NOTE(review): emitted even if restart_lsn is invalid */
+	i++;
+
+	snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(slot_contents.data.confirmed_flush));
+	values[i] = CStringGetTextDatum(xloc);
+	nulls[i] = false;	/* NOTE(review): emitted even if confirmed_flush is invalid */
+	i++;
+
+	/*
+	 * Report the timeline each LSN belongs to, per tliOfPointInHistory() on
+	 * the history of ThisTimeLineID; an invalid LSN yields a NULL timeline.
+	 * XXX: should the caller be allowed to choose the target timeline?
+	 */
+	if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
+	{
+		timeline_history = readTimeLineHistory(ThisTimeLineID);
+		slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn, timeline_history);
+		values[i] = Int32GetDatum(slots_position_timeline);
+		nulls[i] = false;
+	} else {
+		values[i] = 0;
+		nulls[i] = true;
+	}
+	i++;
+
+	if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
+	{
+		if (!timeline_history)	/* not already loaded for restart_lsn */
+			timeline_history = readTimeLineHistory(ThisTimeLineID);
+		slots_position_timeline = tliOfPointInHistory(slot_contents.data.confirmed_flush, timeline_history);
+		values[i] = Int32GetDatum(slots_position_timeline);
+		nulls[i] = false;
+	} else {
+		values[i] = 0;
+		nulls[i] = true;
+	}
+	i++;
+
+	dest = CreateDestReceiver(DestRemoteSimple);
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+	do_tup_output(tstate, values, nulls);
+	end_tup_output(tstate);
+}
+
 
 /*
  * Handle TIMELINE_HISTORY command.
@@ -1618,6 +1717,13 @@ exec_replication_command(const char *cmd_string)
 			EndReplicationCommand(cmdtag);
 			break;
 
+		case T_ReadReplicationSlotCmd:
+			cmdtag = "READ_REPLICATION_SLOT";
+			set_ps_display(cmdtag);
+			ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
+			EndReplicationCommand(cmdtag);
+			break;
+
 		case T_BaseBackupCmd:
 			cmdtag = "BASE_BACKUP";
 			set_ps_display(cmdtag);
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 6a4d82f0a8..5f78bdd573 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -495,6 +495,7 @@ typedef enum NodeTag
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
 	 */
 	T_IdentifySystemCmd,
+	T_ReadReplicationSlotCmd,
 	T_BaseBackupCmd,
 	T_CreateReplicationSlotCmd,
 	T_DropReplicationSlotCmd,
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a251f2..ec85b7d993 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -33,6 +33,16 @@ typedef struct IdentifySystemCmd
 	NodeTag		type;
 } IdentifySystemCmd;
 
+/* ----------------------
+ *		READ_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct ReadReplicationSlotCmd
+{
+	NodeTag		type;
+	char		*slotname;	/* slot name given as the command's argument */
+} ReadReplicationSlotCmd;
+
 
 /* ----------------------
  *		BASE_BACKUP command
-- 
2.32.0

