From aeb75d82acc875252dab800009bba540ab89fec6 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@otacoo.com>
Date: Sun, 21 Sep 2014 18:16:22 +0900
Subject: [PATCH] Support dump from replication slot creation state

pg_dump can now open a replication connection and create a logical
replication slot through it (new options --slot and --plugin-name). This
is the only easy way to obtain a snapshot consistent with the point at
which the slot was created, and the dump is taken using that snapshot.
This is one piece of the puzzle for online upgrades, and is also useful
by itself.
---
 doc/src/sgml/ref/pg_dump.sgml        | 29 ++++++++++++
 src/bin/pg_dump/pg_backup.h          |  6 ++-
 src/bin/pg_dump/pg_backup_archiver.c |  9 ++--
 src/bin/pg_dump/pg_backup_archiver.h |  3 ++
 src/bin/pg_dump/pg_backup_db.c       | 86 ++++++++++++++++++++++++++---------
 src/bin/pg_dump/pg_dump.c            | 88 +++++++++++++++++++++++++++++++++---
 6 files changed, 188 insertions(+), 33 deletions(-)

diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index eabdc62..c2d1097 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -801,6 +801,16 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+       <term><option>--plugin-name=<replaceable class="parameter">plugin_name</replaceable></option></term>
+       <listitem>
+         <para>
+          Specify the output plugin (see <xref linkend="logicaldecoding-output-plugin">)
+          used to create the replication slot requested with <option>--slot</>.
+         </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>--quote-all-identifiers</></term>
       <listitem>
        <para>
@@ -866,6 +876,25 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+       <term><option>--slot=<replaceable class="parameter">slot_name</replaceable></option></term>
+       <listitem>
+         <para>
+          Create a logical replication slot with the given name (see
+          <xref linkend="streaming-replication-slots"> for more details) and
+          take the dump using the snapshot exported when the slot was created,
+          so that the dumped data is consistent with the point at which the
+          slot's stream of changes begins. This is useful to get a consistent
+          image of a database before applying the changes decoded from the
+          slot to it.
+         </para>
+         <para>
+          This option requires a server running version 9.4 or later, and an
+          output plugin (see <xref linkend="logicaldecoding-output-plugin">) specified with <option>--plugin-name</>.
+         </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>--use-set-session-authorization</></term>
       <listitem>
        <para>
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 921bc1b..b1628e1 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -85,6 +85,8 @@ struct Archive
 	int			numWorkers;		/* number of parallel processes */
 	char	   *sync_snapshot_id;		/* sync snapshot id for parallel
 										 * operation */
+	char	   *slot_name;		/* Slot used for dump */
+	char	   *plugin_name;	/* Plugin name for slot creation */
 
 	/* info needed for string escaping */
 	int			encoding;		/* libpq code for client_encoding */
@@ -164,9 +166,11 @@ extern void ConnectDatabase(Archive *AH,
 				const char *pghost,
 				const char *pgport,
 				const char *username,
-				enum trivalue prompt_password);
+				enum trivalue prompt_password,
+				bool is_replication);
 extern void DisconnectDatabase(Archive *AHX);
 extern PGconn *GetConnection(Archive *AHX);
+extern PGconn *GetReplicationConnection(Archive *AHX);
 
 /* Called to add a TOC entry */
 extern void ArchiveEntry(Archive *AHX,
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 5476a1e..f3e6abf 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -307,7 +307,7 @@ RestoreArchive(Archive *AHX)
 
 		ConnectDatabase(AHX, ropt->dbname,
 						ropt->pghost, ropt->pgport, ropt->username,
-						ropt->promptPassword);
+						ropt->promptPassword, false);
 
 		/*
 		 * If we're talking to the DB directly, don't send comments since they
@@ -3730,7 +3730,7 @@ restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
 	 */
 	ConnectDatabase((Archive *) AH, ropt->dbname,
 					ropt->pghost, ropt->pgport, ropt->username,
-					ropt->promptPassword);
+					ropt->promptPassword, false);
 
 	_doSetFixedOutputState(AH);
 
@@ -4282,7 +4282,7 @@ CloneArchive(ArchiveHandle *AH)
 		/* this also sets clone->connection */
 		ConnectDatabase((Archive *) clone, ropt->dbname,
 						ropt->pghost, ropt->pgport, ropt->username,
-						ropt->promptPassword);
+						ropt->promptPassword, false);
 	}
 	else
 	{
@@ -4307,7 +4307,15 @@ CloneArchive(ArchiveHandle *AH)
 		encname = pg_encoding_to_char(AH->public.encoding);
 
 		/* this also sets clone->connection */
-		ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username, TRI_NO);
+		ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username,
+						TRI_NO, false);
+
+		/*
+		 * The replication connection used to create a slot belongs to the
+		 * parent only.  The clone must not share it, or DisconnectDatabase()
+		 * on the clone would PQfinish() the parent's replication connection.
+		 */
+		clone->repConnection = NULL;
 
 		/*
 		 * Set the same encoding, whatever we set here is what we got from
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index c163f29..832d956 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -293,6 +293,9 @@ typedef struct _archiveHandle
 	ArchiverOutput outputKind;	/* Flag for what we're currently writing */
 	bool		pgCopyIn;		/* Currently in libpq 'COPY IN' mode. */
 
+	/* Replication connection */
+	PGconn	   *repConnection;	/* Connection using replication protocol */
+
 	int			loFd;			/* BLOB fd */
 	int			writingBlob;	/* Flag */
 	int			blobCount;		/* # of blobs restored */
diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c
index 4d1d14f..d7ce0c5 100644
--- a/src/bin/pg_dump/pg_backup_db.c
+++ b/src/bin/pg_dump/pg_backup_db.c
@@ -27,18 +27,19 @@
 /* translator: this is a module name */
 static const char *modulename = gettext_noop("archiver (db)");
 
-static void _check_database_version(ArchiveHandle *AH);
+static void _check_database_version(ArchiveHandle *AH, bool is_replication);
 static PGconn *_connectDB(ArchiveHandle *AH, const char *newdbname, const char *newUser);
 static void notice_processor(void *arg, const char *message);
 
 static void
-_check_database_version(ArchiveHandle *AH)
+_check_database_version(ArchiveHandle *AH, bool is_replication)
 {
 	const char *remoteversion_str;
 	int			remoteversion;
+	PGconn	   *conn = is_replication ? AH->repConnection : AH->connection;
 
-	remoteversion_str = PQparameterStatus(AH->connection, "server_version");
-	remoteversion = PQserverVersion(AH->connection);
+	remoteversion_str = PQparameterStatus(conn, "server_version");
+	remoteversion = PQserverVersion(conn);
 	if (remoteversion == 0 || !remoteversion_str)
 		exit_horribly(modulename, "could not get server_version from libpq\n");
 
@@ -194,7 +195,7 @@ _connectDB(ArchiveHandle *AH, const char *reqdb, const char *requser)
 	AH->savedPassword = password;
 
 	/* check for version mismatch */
-	_check_database_version(AH);
+	_check_database_version(AH, false);
 
 	PQsetNoticeProcessor(newConn, notice_processor, NULL);
 
@@ -217,13 +218,21 @@ ConnectDatabase(Archive *AHX,
 				const char *pghost,
 				const char *pgport,
 				const char *username,
-				enum trivalue prompt_password)
+				enum trivalue prompt_password,
+				bool is_replication)
 {
 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
 	char	   *password = AH->savedPassword;
 	bool		new_pass;
+	PGconn	   *conn = is_replication ? AH->repConnection : AH->connection;
 
-	if (AH->connection)
+	/*
+	 * Replication connection cannot be established before a normal connection
+	 * as a check on the remote server version is necessary for compatibility.
+	 */
+	Assert((is_replication && AH->connection) || !is_replication);
+
+	if (conn)
 		exit_horribly(modulename, "already connected to a database\n");
 
 	if (prompt_password == TRI_YES && password == NULL)
@@ -240,9 +249,9 @@ ConnectDatabase(Archive *AHX,
 	 */
 	do
 	{
-#define PARAMS_ARRAY_SIZE	7
-		const char **keywords = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*keywords));
-		const char **values = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*values));
+#define PARAMS_CONNECT_SIZE	8
+		const char **keywords = pg_malloc(PARAMS_CONNECT_SIZE * sizeof(*keywords));
+		const char **values = pg_malloc(PARAMS_CONNECT_SIZE * sizeof(*values));
 
 		keywords[0] = "host";
 		values[0] = pghost;
@@ -259,21 +268,31 @@ ConnectDatabase(Archive *AHX,
 		keywords[6] = NULL;
 		values[6] = NULL;
 
+		/* Process replication connection for logical slot */
+		if (is_replication)
+		{
+			keywords[6] = "replication";
+			values[6] = "database";
+			keywords[7] = NULL;
+			values[7] = NULL;
+		}
+
+		/* Process regular connection */
 		new_pass = false;
-		AH->connection = PQconnectdbParams(keywords, values, true);
+		conn = PQconnectdbParams(keywords, values, true);
 
 		free(keywords);
 		free(values);
 
-		if (!AH->connection)
+		if (!conn)
 			exit_horribly(modulename, "failed to connect to database\n");
 
-		if (PQstatus(AH->connection) == CONNECTION_BAD &&
-			PQconnectionNeedsPassword(AH->connection) &&
+		if (PQstatus(conn) == CONNECTION_BAD &&
+			PQconnectionNeedsPassword(conn) &&
 			password == NULL &&
 			prompt_password != TRI_NO)
 		{
-			PQfinish(AH->connection);
+			PQfinish(conn);
 			password = simple_prompt("Password: ", 100, false);
 			if (password == NULL)
 				exit_horribly(modulename, "out of memory\n");
@@ -281,18 +300,25 @@ ConnectDatabase(Archive *AHX,
 		}
 	} while (new_pass);
 
-	AH->savedPassword = password;
-
 	/* check to see that the backend connection was successfully made */
-	if (PQstatus(AH->connection) == CONNECTION_BAD)
+	if (PQstatus(conn) == CONNECTION_BAD)
 		exit_horribly(modulename, "connection to database \"%s\" failed: %s",
-					  PQdb(AH->connection) ? PQdb(AH->connection) : "",
-					  PQerrorMessage(AH->connection));
+					  PQdb(conn) ? PQdb(conn) : "",
+					  PQerrorMessage(conn));
+
+	/* Save obtained connection to correct slot */
+	if (is_replication)
+		AH->repConnection = conn;
+	else
+		AH->connection = conn;
+
+	AH->savedPassword = password;
 
 	/* check for version mismatch */
-	_check_database_version(AH);
+	_check_database_version(AH, is_replication);
 
-	PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
+	if (!is_replication)
+		PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
 }
 
 /*
@@ -306,6 +332,14 @@ DisconnectDatabase(Archive *AHX)
 	PGcancel   *cancel;
 	char		errbuf[1];
 
+	/* Disconnect replication connection if there is one */
+	if (AH->repConnection)
+	{
+		PQfinish(AH->repConnection);
+		AH->repConnection = NULL;
+	}
+
+	/* Leave if no connection */
 	if (!AH->connection)
 		return;
 
@@ -330,6 +364,14 @@ GetConnection(Archive *AHX)
 	return AH->connection;
 }
 
+PGconn *
+GetReplicationConnection(Archive *AHX)
+{
+	ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+	return AH->repConnection;
+}
+
 static void
 notice_processor(void *arg, const char *message)
 {
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 2915329..fdbdcaa 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -143,6 +143,7 @@ static int	enable_row_security = 0;
 static void help(const char *progname);
 static void setup_connection(Archive *AH, const char *dumpencoding,
 				 char *use_role);
+static void setup_replication_connection(Archive *AH);
 static ArchiveFormat parseArchiveFormat(const char *format, ArchiveMode *mode);
 static void expand_schema_name_patterns(Archive *fout,
 							SimpleStringList *patterns,
@@ -280,6 +281,8 @@ main(int argc, char **argv)
 	const char *pgport = NULL;
 	const char *username = NULL;
 	const char *dumpencoding = NULL;
+	char	   *slot_name = NULL;
+	char	   *plugin_name = NULL;
 	bool		oids = false;
 	TableInfo  *tblinfo;
 	int			numTables;
@@ -361,6 +364,8 @@ main(int argc, char **argv)
 		{"no-security-labels", no_argument, &no_security_labels, 1},
 		{"no-synchronized-snapshots", no_argument, &no_synchronized_snapshots, 1},
 		{"no-unlogged-table-data", no_argument, &no_unlogged_table_data, 1},
+		{"slot", required_argument, NULL, 6},
+		{"plugin-name", required_argument, NULL, 7},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -538,6 +543,14 @@ main(int argc, char **argv)
 				set_dump_section(optarg, &dumpSections);
 				break;
 
+			case 6:				/* Name of slot to be created for the dump */
+				slot_name = pg_strdup(optarg);
+				break;
+
+			case 7:				/* Plugin associated with slot created */
+				plugin_name = pg_strdup(optarg);
+				break;
+
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
 				exit_nicely(1);
@@ -645,10 +658,41 @@ main(int argc, char **argv)
 	 * Open the database using the Archiver, so it knows about it. Errors mean
 	 * death.
 	 */
-	ConnectDatabase(fout, dbname, pghost, pgport, username, prompt_password);
+	ConnectDatabase(fout, dbname, pghost, pgport, username,
+					prompt_password, false);
+
+	/* --slot and --plugin-name are only meaningful together */
+	if (slot_name && !plugin_name)
+		exit_horribly(NULL, "option --slot requires option --plugin-name\n");
+	if (plugin_name && !slot_name)
+		exit_horribly(NULL, "option --plugin-name requires option --slot\n");
+	fout->slot_name = slot_name;
+	fout->plugin_name = plugin_name;
+
+	/*
+	 * Logical replication slots need a 9.4 or later server.  The slot is
+	 * created over a separate replication connection, and the snapshot it
+	 * exports is stored in fout->sync_snapshot_id for setup_connection().
+	 */
+	if (slot_name)
+	{
+		if (fout->remoteVersion < 90400)
+			exit_horribly(NULL,
+						  "logical replication slots are not supported by this server version\n");
+
+		ConnectDatabase(fout, dbname, pghost, pgport, username,
+						prompt_password, true);
+		setup_replication_connection(fout);
+	}
+
+	/*
+	 * Set up the dump connection only now: when a slot was created above,
+	 * setup_connection() makes the dump transaction use the snapshot that
+	 * the slot exported.
+	 */
 	setup_connection(fout, dumpencoding, use_role);
 
 	/*
 	 * Disable security label support if server version < v9.1.x (prevents
 	 * access to nonexistent pg_seclabel catalog)
 	 */
@@ -916,6 +960,10 @@ help(const char *progname)
 			 "                               use SET SESSION AUTHORIZATION commands instead of\n"
 			 "                               ALTER OWNER commands to set ownership\n"));
 
+	printf(_("\nReplication slot options:\n"));
+	printf(_("  --plugin-name=NAME       output plugin for the slot created by --slot\n"));
+	printf(_("  --slot=SLOTNAME          create logical slot and dump using its snapshot\n"));
+
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=DBNAME      database to dump\n"));
 	printf(_("  -h, --host=HOSTNAME      database server host or socket directory\n"));
@@ -1039,9 +1087,16 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role)
 		ExecuteSqlStatement(AH,
 							"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
 
-
-
-	if (AH->numWorkers > 1 && AH->remoteVersion >= 90200 && !no_synchronized_snapshots)
+	/*
+	 * Parallel and slot-based dumps share one snapshot: use sync_snapshot_id
+	 * when it is already set; otherwise export a new one, except with --slot,
+	 * whose id setup_replication_connection() has stored beforehand.
+	 */
+	if ((AH->numWorkers > 1 &&
+		 AH->remoteVersion >= 90200 &&
+		 !no_synchronized_snapshots) ||
+		(AH->remoteVersion >= 90400 &&
+		 AH->slot_name))
 	{
 		if (AH->sync_snapshot_id)
 		{
@@ -1052,7 +1107,7 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role)
 			ExecuteSqlStatement(AH, query->data);
 			destroyPQExpBuffer(query);
 		}
-		else
+		else if (!AH->slot_name)
 			AH->sync_snapshot_id = get_synchronized_snapshot(AH);
 	}
 
@@ -1066,6 +1121,41 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role)
 }
 
 static void
+setup_replication_connection(Archive *AH)
+{
+	PQExpBuffer query;
+	PGresult   *res;
+	PGconn	   *conn = GetReplicationConnection(AH);
+
+	/*
+	 * Create the logical slot.  On success the server returns one row whose
+	 * third column is the name of the snapshot exported when the slot became
+	 * consistent.  It is saved as sync_snapshot_id so that setup_connection()
+	 * makes the dump transaction use it; per the replication protocol, that
+	 * snapshot remains usable only while the replication connection stays
+	 * open and runs no other command.
+	 */
+	query = createPQExpBuffer();
+	appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
+					  AH->slot_name, AH->plugin_name);
+
+	res = PQexec(conn, query->data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		exit_horribly(NULL, "could not send replication command \"%s\": %s",
+					  query->data, PQerrorMessage(conn));
+
+	/* Check the result shape before reading the snapshot name column */
+	if (PQntuples(res) != 1 || PQnfields(res) < 3)
+		exit_horribly(NULL,
+					  "could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d or more fields\n",
+					  AH->slot_name, PQntuples(res), PQnfields(res), 1, 3);
+
+	AH->sync_snapshot_id = pg_strdup(PQgetvalue(res, 0, 2));
+	PQclear(res);
+	destroyPQExpBuffer(query);
+}
+
+static void
 setupDumpWorker(Archive *AHX, RestoreOptions *ropt)
 {
 	setup_connection(AHX, NULL, NULL);
-- 
2.1.0

