>From 078dcdd696604801c898decbe478e3c99fe257a6 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Mon, 19 Aug 2013 13:24:30 +0200
Subject: [PATCH 1/8] wal_decoding: Allow walsenders to connect to a specific
 database

Extend the existing 'replication' parameter to not only allow a boolean value
but also "database". If the latter is specified we connect to the database
specified in 'dbname'.

This is useful for future walsender commands which need database interaction,
e.g. changeset extraction.
---
 doc/src/sgml/protocol.sgml                         | 24 +++++++++---
 src/backend/postmaster/postmaster.c                | 23 ++++++++++--
 .../libpqwalreceiver/libpqwalreceiver.c            |  4 +-
 src/backend/replication/walsender.c                | 43 +++++++++++++++++++---
 src/backend/utils/init/postinit.c                  |  5 +++
 src/bin/pg_basebackup/pg_basebackup.c              |  4 +-
 src/bin/pg_basebackup/pg_receivexlog.c             |  4 +-
 src/bin/pg_basebackup/receivelog.c                 |  4 +-
 src/include/replication/walsender.h                |  1 +
 9 files changed, 89 insertions(+), 23 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 0b2e60e..2ea14e5 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1301,10 +1301,13 @@
 
 <para>
 To initiate streaming replication, the frontend sends the
-<literal>replication</> parameter in the startup message. This tells the
-backend to go into walsender mode, wherein a small set of replication commands
-can be issued instead of SQL statements. Only the simple query protocol can be
-used in walsender mode.
+<literal>replication</> parameter in the startup message. A boolean value
+of <literal>true</> tells the backend to go into walsender mode, wherein a
+small set of replication commands can be issued instead of SQL statements. Only
+the simple query protocol can be used in walsender mode.
+Passing <literal>database</> as the value instructs walsender to connect to
+the database specified in the <literal>dbname</> parameter, which will in the
+future allow additional commands, beyond those listed below, to be run.
 
 The commands accepted in walsender mode are:
 
@@ -1314,7 +1317,7 @@ The commands accepted in walsender mode are:
     <listitem>
      <para>
       Requests the server to identify itself. Server replies with a result
-      set of a single row, containing three fields:
+      set of a single row, containing four fields:
      </para>
 
      <para>
@@ -1356,6 +1359,17 @@ The commands accepted in walsender mode are:
       </listitem>
       </varlistentry>
 
+      <varlistentry>
+      <term>
+       dbname
+      </term>
+      <listitem>
+      <para>
+       Database connected to or NULL.
+      </para>
+      </listitem>
+      </varlistentry>
+
       </variablelist>
      </para>
     </listitem>
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 01d2618..a31b01d 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1894,10 +1894,21 @@ retry1:
 				port->cmdline_options = pstrdup(valptr);
 			else if (strcmp(nameptr, "replication") == 0)
 			{
-				if (!parse_bool(valptr, &am_walsender))
+				/*
+				 * Due to backward compatibility concerns replication is a
+				 * hybrid beast which allows the value to be either a boolean
+				 * or the string 'database'. The latter connects to a specific
+				 * database which is e.g. required for changeset extraction.
+				 */
+				if (strcmp(valptr, "database") == 0)
+				{
+					am_walsender = true;
+					am_db_walsender = true;
+				}
+				else if (!parse_bool(valptr, &am_walsender))
 					ereport(FATAL,
 							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-							 errmsg("invalid value for boolean option \"replication\"")));
+							 errmsg("invalid value for option \"replication\", legal values are false, 0, true, 1 or database")));
 			}
 			else
 			{
@@ -1983,8 +1994,12 @@ retry1:
 	if (strlen(port->user_name) >= NAMEDATALEN)
 		port->user_name[NAMEDATALEN - 1] = '\0';
 
-	/* Walsender is not related to a particular database */
-	if (am_walsender)
+	/*
+	 * Generic walsender, e.g. for streaming replication, is not connected to a
+	 * particular database. But walsenders used for logical replication need to
+	 * connect to a specific database.
+	 */
+	if (am_walsender && !am_db_walsender)
 		port->database_name[0] = '\0';
 
 	/*
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 6bc0aa1..ee0f1fe 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -130,7 +130,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
 						"the primary server: %s",
 						PQerrorMessage(streamConn))));
 	}
-	if (PQnfields(res) != 3 || PQntuples(res) != 1)
+	if (PQnfields(res) != 4 || PQntuples(res) != 1)
 	{
 		int			ntuples = PQntuples(res);
 		int			nfields = PQnfields(res);
@@ -138,7 +138,7 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
 		PQclear(res);
 		ereport(ERROR,
 				(errmsg("invalid response from primary server"),
-				 errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.",
+				 errdetail("Expected 1 tuple with 4 fields, got %d tuples with %d fields.",
 						   ntuples, nfields)));
 	}
 	primary_sysid = PQgetvalue(res, 0, 0);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index afd559d..b00a91a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -46,7 +46,10 @@
 #include "access/timeline.h"
 #include "access/transam.h"
 #include "access/xlog_internal.h"
+#include "access/xact.h"
+
 #include "catalog/pg_type.h"
+#include "commands/dbcommands.h"
 #include "funcapi.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -89,9 +92,10 @@ WalSndCtlData *WalSndCtl = NULL;
 WalSnd	   *MyWalSnd = NULL;
 
 /* Global state */
-bool		am_walsender = false;		/* Am I a walsender process ? */
+bool		am_walsender = false;		/* Am I a walsender process? */
 bool		am_cascading_walsender = false;		/* Am I cascading WAL to
-												 * another standby ? */
+												 * another standby? */
+bool		am_db_walsender = false;		/* connect to database? */
 
 /* User-settable parameters for walsender */
 int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
@@ -243,10 +247,12 @@ IdentifySystem(void)
 	char		tli[11];
 	char		xpos[MAXFNAMELEN];
 	XLogRecPtr	logptr;
+	char*        dbname = NULL;
 
 	/*
-	 * Reply with a result set with one row, three columns. First col is
-	 * system ID, second is timeline ID, and third is current xlog location.
+	 * Reply with a result set with one row, four columns. First col is system
+	 * ID, second is timeline ID, third is current xlog location and the fourth
+	 * contains the database name if we are connected to one.
 	 */
 
 	snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
@@ -265,9 +271,23 @@ IdentifySystem(void)
 
 	snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
 
+	if (MyDatabaseId != InvalidOid)
+	{
+		MemoryContext cur = CurrentMemoryContext;
+
+		/* syscache access needs a transaction env. */
+		StartTransactionCommand();
+		/* make dbname live outside TX context */
+		MemoryContextSwitchTo(cur);
+		dbname = get_database_name(MyDatabaseId);
+		CommitTransactionCommand();
+		/* CommitTransactionCommand switches to TopMemoryContext */
+		MemoryContextSwitchTo(cur);
+	}
+
 	/* Send a RowDescription message */
 	pq_beginmessage(&buf, 'T');
-	pq_sendint(&buf, 3, 2);		/* 3 fields */
+	pq_sendint(&buf, 4, 2);		/* 4 fields */
 
 	/* first field */
 	pq_sendstring(&buf, "systemid");	/* col name */
@@ -295,17 +315,34 @@ IdentifySystem(void)
 	pq_sendint(&buf, -1, 2);
 	pq_sendint(&buf, 0, 4);
 	pq_sendint(&buf, 0, 2);
+
+	/* fourth field */
+	pq_sendstring(&buf, "dbname");	/* col name */
+	pq_sendint(&buf, 0, 4);		/* table oid */
+	pq_sendint(&buf, 0, 2);		/* attnum */
+	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
+	pq_sendint(&buf, -1, 2);	/* typlen */
+	pq_sendint(&buf, 0, 4);		/* typmod */
+	pq_sendint(&buf, 0, 2);		/* format code */
 	pq_endmessage(&buf);
 
 	/* Send a DataRow message */
 	pq_beginmessage(&buf, 'D');
-	pq_sendint(&buf, 3, 2);		/* # of columns */
+	pq_sendint(&buf, 4, 2);		/* # of columns */
 	pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
 	pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
 	pq_sendint(&buf, strlen(tli), 4);	/* col2 len */
 	pq_sendbytes(&buf, (char *) tli, strlen(tli));
 	pq_sendint(&buf, strlen(xpos), 4);	/* col3 len */
 	pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
+	/* dbname stays NULL when not connected to a database; send SQL NULL then */
+	if (dbname != NULL)
+	{
+		pq_sendint(&buf, strlen(dbname), 4);	/* col4 len */
+		pq_sendbytes(&buf, (char *) dbname, strlen(dbname));
+	}
+	else
+		pq_sendint(&buf, -1, 4);	/* col4 len, -1 means NULL */
 
 	pq_endmessage(&buf);
 }
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 2c7f0f1..56c352c 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -725,7 +725,12 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 			ereport(FATAL,
 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 					 errmsg("must be superuser or replication role to start walsender")));
+	}
 
+	if (am_walsender &&
+	    (in_dbname == NULL || in_dbname[0] == '\0') &&
+	    dboid == InvalidOid)
+	{
 		/* process any options passed in the startup packet */
 		if (MyProcPort != NULL)
 			process_startup_options(MyProcPort, am_superuser);
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index a1e12a8..89e2376 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -1361,11 +1361,11 @@ BaseBackup(void)
 				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
 		disconnect_and_exit(1);
 	}
-	if (PQntuples(res) != 1 || PQnfields(res) != 3)
+	if (PQntuples(res) != 1 || PQnfields(res) != 4)
 	{
 		fprintf(stderr,
 				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
-				progname, PQntuples(res), PQnfields(res), 1, 3);
+				progname, PQntuples(res), PQnfields(res), 1, 4);
 		disconnect_and_exit(1);
 	}
 	sysidentifier = pg_strdup(PQgetvalue(res, 0, 0));
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index 787a395..fe8aef6 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -252,11 +252,11 @@ StreamLog(void)
 				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
 		disconnect_and_exit(1);
 	}
-	if (PQntuples(res) != 1 || PQnfields(res) != 3)
+	if (PQntuples(res) != 1 || PQnfields(res) != 4)
 	{
 		fprintf(stderr,
 				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
-				progname, PQntuples(res), PQnfields(res), 1, 3);
+				progname, PQntuples(res), PQnfields(res), 1, 4);
 		disconnect_and_exit(1);
 	}
 	servertli = atoi(PQgetvalue(res, 0, 1));
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index d56a4d7..22a5340 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -534,11 +534,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			PQclear(res);
 			return false;
 		}
-		if (PQnfields(res) != 3 || PQntuples(res) != 1)
+		if (PQnfields(res) != 4 || PQntuples(res) != 1)
 		{
 			fprintf(stderr,
 					_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
-					progname, PQntuples(res), PQnfields(res), 1, 3);
+					progname, PQntuples(res), PQnfields(res), 1, 4);
 			PQclear(res);
 			return false;
 		}
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 2cc7ddf..5097235 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -19,6 +19,7 @@
 /* global state */
 extern bool am_walsender;
 extern bool am_cascading_walsender;
+extern bool am_db_walsender;
 extern bool wake_wal_senders;
 
 /* user-settable parameters */
-- 
1.8.4.21.g992c386.dirty

