>From e4e5016a34411c2a18cccd6ab4b3f749fe283ce1 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Mon, 19 Aug 2013 13:24:30 +0200
Subject: [PATCH 6/8] wal_decoding: pg_receivellog: Introduce pg_receivexlog
 equivalent for logical changes

---
 src/backend/utils/cache/relcache.c     |   3 +
 src/bin/pg_basebackup/.gitignore       |   1 +
 src/bin/pg_basebackup/Makefile         |  11 +-
 src/bin/pg_basebackup/pg_receivellog.c | 860 +++++++++++++++++++++++++++++++++
 src/bin/pg_basebackup/receivelog.c     | 137 +-----
 src/bin/pg_basebackup/receivelog.h     |   2 +
 src/bin/pg_basebackup/streamutil.c     | 123 ++++-
 src/bin/pg_basebackup/streamutil.h     |  10 +
 8 files changed, 1023 insertions(+), 124 deletions(-)
 create mode 100644 src/bin/pg_basebackup/pg_receivellog.c

diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 5d304ce..1b66e64 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -1577,6 +1577,9 @@ RelationIdGetRelation(Oid relationId)
 {
 	Relation	rd;
 
+	/* Make sure we're in a xact, even if this ends up being a cache hit */
+	Assert(IsTransactionState());
+
 	/*
 	 * first try to find reldesc in the cache
 	 */
diff --git a/src/bin/pg_basebackup/.gitignore b/src/bin/pg_basebackup/.gitignore
index 1334a1f..eb2978c 100644
--- a/src/bin/pg_basebackup/.gitignore
+++ b/src/bin/pg_basebackup/.gitignore
@@ -1,2 +1,3 @@
 /pg_basebackup
 /pg_receivexlog
+/pg_receivellog
diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index a707c93..c251249 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -20,7 +20,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 
 OBJS=receivelog.o streamutil.o $(WIN32RES)
 
-all: pg_basebackup pg_receivexlog
+all: pg_basebackup pg_receivexlog pg_receivellog
 
 pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport
 	$(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
@@ -28,9 +28,13 @@ pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport
 pg_receivexlog: pg_receivexlog.o $(OBJS) | submake-libpq submake-libpgport
 	$(CC) $(CFLAGS) pg_receivexlog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
 
+pg_receivellog: pg_receivellog.o $(OBJS) | submake-libpq submake-libpgport
+	$(CC) $(CFLAGS) pg_receivellog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
 install: all installdirs
 	$(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
 	$(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
+	$(INSTALL_PROGRAM) pg_receivellog$(X) '$(DESTDIR)$(bindir)/pg_receivellog$(X)'
 
 installdirs:
 	$(MKDIR_P) '$(DESTDIR)$(bindir)'
@@ -38,6 +42,9 @@ installdirs:
 uninstall:
 	rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
 	rm -f '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
+	rm -f '$(DESTDIR)$(bindir)/pg_receivellog$(X)'
 
 clean distclean maintainer-clean:
-	rm -f pg_basebackup$(X) pg_receivexlog$(X) $(OBJS) pg_basebackup.o pg_receivexlog.o
+	rm -f pg_basebackup$(X) pg_receivexlog$(X) pg_receivellog$(X) \
+		pg_basebackup.o pg_receivexlog.o pg_receivellog.o \
+		$(OBJS)
diff --git a/src/bin/pg_basebackup/pg_receivellog.c b/src/bin/pg_basebackup/pg_receivellog.c
new file mode 100644
index 0000000..fc81608
--- /dev/null
+++ b/src/bin/pg_basebackup/pg_receivellog.c
@@ -0,0 +1,860 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_receivellog.c - receive streaming logical log data and write it
+ *					  to a local file.
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/pg_receivellog.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include "streamutil.h"
+
+#include "getopt_long.h"
+
+#include "libpq-fe.h"
+#include "libpq/pqsignal.h"
+
+#include "access/xlog_internal.h"
+#include "common/fe_memutils.h"
+
+#include <dirent.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+/* Time to sleep between reconnection attempts */
+#define RECONNECT_SLEEP_TIME 5
+
+/* Global Options */
+static char    *outfile = NULL;
+static int		verbose = 0;
+static int		noloop = 0;
+static int		standby_message_timeout = 10 * 1000;		/* 10 sec = default */
+static const char *slot = NULL;
+static XLogRecPtr startpos = InvalidXLogRecPtr;
+static bool 	do_init_slot = false;
+static bool 	do_start_slot = false;
+static bool 	do_stop_slot = false;
+
+/* filled pairwise with option, value. value may be NULL */
+static char	  **options;
+static size_t	noptions = 0;
+static const char *plugin = "test_decoding";
+
+/* Global State */
+static int		outfd = -1;
+static volatile bool time_to_abort = false;
+
+static void usage(void);
+static void StreamLog();
+
+static void
+usage(void)
+{
+	printf(_("%s receives PostgreSQL logical change stream.\n\n"),
+		   progname);
+	printf(_("Usage:\n"));
+	printf(_("  %s [OPTION]...\n"), progname);
+	printf(_("\nOptions:\n"));
+	printf(_("  -f, --file=FILE        receive log into this file. - for stdout\n"));
+	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
+	printf(_("  -v, --verbose          output verbose messages\n"));
+	printf(_("  -V, --version          output version information, then exit\n"));
+	printf(_("  -?, --help             show this help, then exit\n"));
+	printf(_("\nConnection options:\n"));
+	printf(_("  -d, --database=DBNAME  database to connect to\n"));
+	printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
+	printf(_("  -p, --port=PORT        database server port number\n"));
+	printf(_("  -U, --username=NAME    connect as specified database user\n"));
+	printf(_("  -w, --no-password      never prompt for password\n"));
+	printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
+	printf(_("\nReplication options:\n"));
+	printf(_("  -o, --option=NAME[=VALUE]\n"
+			 "                         Specify option NAME with optional value VAL, to be passed\n"
+			 "                         to the output plugin\n"));
+	printf(_("  -P, --plugin=PLUGIN    use output plugin PLUGIN (defaults to test_decoding)\n"));
+	printf(_("  -s, --status-interval=INTERVAL\n"
+			 "                         time between status packets sent to server (in seconds)\n"));
+	printf(_("  -S, --slot=SLOT        use existing replication slot SLOT instead of starting a new one\n"));
+	printf(_("  -I, --startpos=PTR     Where in an existing slot should the streaming start"));
+	printf(_("\nAction to be performed:\n"));
+	printf(_("      --init             initiate a new replication slot (for the slotname see --slot)\n"));
+	printf(_("      --start            start streaming in a replication slot (for the slotname see --slot)\n"));
+	printf(_("      --stop             stop the replication slot (for the slotname see --slot)\n"));
+	printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
+}
+
+/*
+ * Send a Standby Status Update message to server.
+ */
+static bool
+sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool force, bool replyRequested)
+{
+	char		replybuf[1 + 8 + 8 + 8 + 8 + 1];
+	int			len = 0;
+
+	/*
+	 * we normally don't want to send superflous feedbacks, but if
+	 * it's because of a timeout we need to, otherwise
+	 * replication_timeout will kill us.
+	 */
+	if (blockpos == startpos && !force)
+		return true;
+
+	if (verbose)
+		fprintf(stderr,
+				_("%s: confirming flush up to %X/%X (slot %s)\n"),
+				progname, (uint32) (blockpos >> 32), (uint32) blockpos,
+				slot);
+
+	replybuf[len] = 'r';
+	len += 1;
+	fe_sendint64(blockpos, &replybuf[len]);		/* write */
+	len += 8;
+	fe_sendint64(blockpos, &replybuf[len]);		/* flush */
+	len += 8;
+	fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);		/* apply */
+	len += 8;
+	fe_sendint64(now, &replybuf[len]);		/* sendTime */
+	len += 8;
+	replybuf[len] = replyRequested ? 1 : 0;		/* replyRequested */
+	len += 1;
+
+	startpos = blockpos;
+
+	if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
+	{
+		fprintf(stderr, _("%s: could not send feedback packet: %s"),
+				progname, PQerrorMessage(conn));
+		return false;
+	}
+
+	return true;
+}
+
+/*
+ * Start the log streaming
+ */
+static void
+StreamLog(void)
+{
+	PGresult   *res;
+	char		query[512];
+	char	   *copybuf = NULL;
+	int64		last_status = -1;
+	XLogRecPtr	logoff = InvalidXLogRecPtr;
+	int			written;
+	int			i;
+
+	/*
+	 * Connect in replication mode to the server
+	 */
+	if (!conn)
+		conn = GetConnection();
+	if (!conn)
+		/* Error message already written in GetConnection() */
+		return;
+
+	/*
+	 * Start the replication
+	 */
+	if (verbose)
+		fprintf(stderr,
+				_("%s: starting log streaming at %X/%X (slot %s)\n"),
+				progname, (uint32) (startpos >> 32), (uint32) startpos,
+				slot);
+
+	/* Initiate the replication stream at specified location */
+	written = snprintf(query, sizeof(query), "START_LOGICAL_REPLICATION \"%s\" %X/%X",
+			 slot, (uint32) (startpos >> 32), (uint32) startpos);
+
+	/*
+	 * add options to string, if present
+	 * Oh, if we just had stringinfo in src/common...
+	 */
+	if (noptions)
+		written += snprintf(query + written, sizeof(query) - written, " (");
+
+	for (i = 0; i < noptions; i++)
+	{
+		/* separator */
+		if (i > 0)
+			written += snprintf(query + written, sizeof(query) - written, ", ");
+
+		/* write option name */
+		written += snprintf(query + written, sizeof(query) - written, "\"%s\"",
+							options[(i * 2)]);
+
+		if (written >= sizeof(query) - 1)
+		{
+			fprintf(stderr, _("%s: option string too long\n"), progname);
+			exit(1); /* no point in retrying, fatal error */
+		}
+
+		/* write option name if specified */
+		if (options[(i * 2) + 1] != NULL)
+		{
+			written += snprintf(query + written, sizeof(query) - written, " '%s'",
+								options[(i * 2) + 1]);
+
+			if (written >= sizeof(query) - 1)
+			{
+				fprintf(stderr, _("%s: option string too long\n"), progname);
+				exit(1); /* no point in retrying, fatal error */
+			}
+		}
+	}
+
+	if (noptions)
+	{
+		written += snprintf(query + written, sizeof(query) - written, ")");
+		if (written >= sizeof(query) - 1)
+		{
+			fprintf(stderr, _("%s: option string too long\n"), progname);
+			exit(1); /* no point in retrying, fatal error */
+		}
+	}
+
+	res = PQexec(conn, query);
+	if (PQresultStatus(res) != PGRES_COPY_BOTH)
+	{
+		fprintf(stderr, _("%s: could not send replication command \"%s\": %s\n"),
+				progname, query, PQresultErrorMessage(res));
+		PQclear(res);
+		goto error;
+	}
+	PQclear(res);
+
+	if (verbose)
+		fprintf(stderr,
+				_("%s: initiated streaming\n"),
+				progname);
+
+	while (!time_to_abort)
+	{
+		int			r;
+		int			bytes_left;
+		int			bytes_written;
+		int64		now;
+		int			hdr_len;
+
+		if (copybuf != NULL)
+		{
+			PQfreemem(copybuf);
+			copybuf = NULL;
+		}
+
+		/*
+		 * Potentially send a status message to the master
+		 */
+		now = feGetCurrentTimestamp();
+		if (standby_message_timeout > 0 &&
+			feTimestampDifferenceExceeds(last_status, now,
+										 standby_message_timeout))
+		{
+			/* Time to send feedback! */
+			if (!sendFeedback(conn, logoff, now, true, false))
+				goto error;
+
+			last_status = now;
+		}
+
+		r = PQgetCopyData(conn, &copybuf, 1);
+		if (r == 0)
+		{
+			/*
+			 * In async mode, and no data available. We block on reading but
+			 * not more than the specified timeout, so that we can send a
+			 * response back to the client.
+			 */
+			fd_set		input_mask;
+			struct timeval timeout;
+			struct timeval *timeoutptr;
+
+			FD_ZERO(&input_mask);
+			FD_SET(PQsocket(conn), &input_mask);
+			if (standby_message_timeout)
+			{
+				int64		targettime;
+				long		secs;
+				int			usecs;
+
+				targettime = last_status + (standby_message_timeout - 1) *
+					((int64) 1000);
+				feTimestampDifference(now,
+									  targettime,
+									  &secs,
+									  &usecs);
+				if (secs <= 0)
+					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
+				else
+					timeout.tv_sec = secs;
+				timeout.tv_usec = usecs;
+				timeoutptr = &timeout;
+			}
+			else
+				timeoutptr = NULL;
+
+			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+			if (r == 0 || (r < 0 && errno == EINTR))
+			{
+				/*
+				 * Got a timeout or signal. Continue the loop and either
+				 * deliver a status packet to the server or just go back into
+				 * blocking.
+				 */
+				continue;
+			}
+			else if (r < 0)
+			{
+				fprintf(stderr, _("%s: select() failed: %s\n"),
+						progname, strerror(errno));
+				goto error;
+			}
+			/* Else there is actually data on the socket */
+			if (PQconsumeInput(conn) == 0)
+			{
+				fprintf(stderr,
+						_("%s: could not receive data from WAL stream: %s"),
+						progname, PQerrorMessage(conn));
+				goto error;
+			}
+			continue;
+		}
+		if (r == -1)
+			/* End of copy stream */
+			break;
+		if (r == -2)
+		{
+			fprintf(stderr, _("%s: could not read COPY data: %s"),
+					progname, PQerrorMessage(conn));
+			goto error;
+		}
+
+		/* Check the message type. */
+		if (copybuf[0] == 'k')
+		{
+			int			pos;
+			bool		replyRequested;
+
+			/*
+			 * Parse the keepalive message, enclosed in the CopyData message.
+			 * We just check if the server requested a reply, and ignore the
+			 * rest.
+			 */
+			pos = 1;			/* skip msgtype 'k' */
+			pos += 8;			/* skip walEnd */
+			pos += 8;			/* skip sendTime */
+
+			if (r < pos + 1)
+			{
+				fprintf(stderr, _("%s: streaming header too small: %d\n"),
+						progname, r);
+				goto error;
+			}
+			replyRequested = copybuf[pos];
+
+			/* If the server requested an immediate reply, send one. */
+			if (replyRequested)
+			{
+				now = feGetCurrentTimestamp();
+				if (!sendFeedback(conn, logoff, now, false, false))
+					goto error;
+				last_status = now;
+			}
+			continue;
+		}
+		else if (copybuf[0] != 'w')
+		{
+			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+					progname, copybuf[0]);
+			goto error;
+		}
+
+
+		/*
+		 * Read the header of the XLogData message, enclosed in the CopyData
+		 * message. We only need the WAL location field (dataStart), the rest
+		 * of the header is ignored.
+		 */
+		hdr_len = 1;			/* msgtype 'w' */
+		hdr_len += 8;			/* dataStart */
+		hdr_len += 8;			/* walEnd */
+		hdr_len += 8;			/* sendTime */
+		if (r < hdr_len + 1)
+		{
+			fprintf(stderr, _("%s: streaming header too small: %d\n"),
+					progname, r);
+			goto error;
+		}
+
+		/* Extract WAL location for this block */
+		{
+			XLogRecPtr	temp = fe_recvint64(&copybuf[1]);
+
+			logoff = Max(temp, logoff);
+		}
+
+		if (outfd == -1 && strcmp(outfile, "-") == 0)
+		{
+			outfd = fileno(stdout);
+		}
+		else if (outfd == -1)
+		{
+			outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
+						 S_IRUSR | S_IWUSR);
+			if (outfd == -1)
+			{
+				fprintf(stderr,
+						_("%s: could not open log file \"%s\": %s\n"),
+						progname, outfile, strerror(errno));
+				goto error;
+			}
+		}
+
+		bytes_left = r - hdr_len;
+		bytes_written = 0;
+
+
+		while (bytes_left)
+		{
+			int			ret;
+
+			ret = write(outfd,
+						copybuf + hdr_len + bytes_written,
+						bytes_left);
+
+			if (ret < 0)
+			{
+				fprintf(stderr,
+				  _("%s: could not write %u bytes to log file \"%s\": %s\n"),
+						progname, bytes_left, outfile,
+						strerror(errno));
+				goto error;
+			}
+
+			/* Write was successful, advance our position */
+			bytes_written += ret;
+			bytes_left -= ret;
+		}
+
+		if (write(outfd, "\n", 1) != 1)
+		{
+			fprintf(stderr,
+				  _("%s: could not write %u bytes to log file \"%s\": %s\n"),
+					progname, 1, outfile,
+					strerror(errno));
+			goto error;
+		}
+	}
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr,
+				_("%s: unexpected termination of replication stream: %s"),
+				progname, PQresultErrorMessage(res));
+		goto error;
+	}
+	PQclear(res);
+
+	if (copybuf != NULL)
+		PQfreemem(copybuf);
+
+	if (outfd != -1 && close(outfd) != 0)
+		fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
+				progname, outfile, strerror(errno));
+	outfd = -1;
+error:
+	PQfinish(conn);
+	conn = NULL;
+}
+
+/*
+ * When sigint is called, just tell the system to exit at the next possible
+ * moment.
+ */
+#ifndef WIN32
+
+static void
+sigint_handler(int signum)
+{
+	time_to_abort = true;
+}
+#endif
+
+int
+main(int argc, char **argv)
+{
+	PGresult   *res;
+	static struct option long_options[] = {
+/* general options */
+		{"file", required_argument, NULL, 'f'},
+		{"no-loop", no_argument, NULL, 'n'},
+		{"verbose", no_argument, NULL, 'v'},
+		{"version", no_argument, NULL, 'V'},
+		{"help", no_argument, NULL, '?'},
+/* connnection options */
+		{"database", required_argument, NULL, 'd'},
+		{"host", required_argument, NULL, 'h'},
+		{"port", required_argument, NULL, 'p'},
+		{"username", required_argument, NULL, 'U'},
+		{"no-password", no_argument, NULL, 'w'},
+		{"password", no_argument, NULL, 'W'},
+/* replication options */
+		{"option", required_argument, NULL, 'o'},
+		{"plugin", required_argument, NULL, 'P'},
+		{"status-interval", required_argument, NULL, 's'},
+		{"slot", required_argument, NULL, 'S'},
+		{"startpos", required_argument, NULL, 'I'},
+/* action */
+		{"init", no_argument, NULL, 1},
+		{"start", no_argument, NULL, 2},
+		{"stop", no_argument, NULL, 3},
+		{NULL, 0, NULL, 0}
+	};
+	int			c;
+	int			option_index;
+	uint32		hi,
+				lo;
+
+	progname = get_progname(argv[0]);
+	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivellog"));
+
+	if (argc > 1)
+	{
+		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+		{
+			usage();
+			exit(0);
+		}
+		else if (strcmp(argv[1], "-V") == 0 ||
+				 strcmp(argv[1], "--version") == 0)
+		{
+			puts("pg_receivellog (PostgreSQL) " PG_VERSION);
+			exit(0);
+		}
+	}
+
+	while ((c = getopt_long(argc, argv, "f:nvd:h:o:p:U:wWP:s:S:",
+							long_options, &option_index)) != -1)
+	{
+		switch (c)
+		{
+/* general options */
+			case 'f':
+				outfile = pg_strdup(optarg);
+				break;
+			case 'n':
+				noloop = 1;
+				break;
+			case 'v':
+				verbose++;
+				break;
+/* connnection options */
+			case 'd':
+				dbname = pg_strdup(optarg);
+				break;
+			case 'h':
+				dbhost = pg_strdup(optarg);
+				break;
+			case 'p':
+				if (atoi(optarg) <= 0)
+				{
+					fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				dbport = pg_strdup(optarg);
+				break;
+			case 'U':
+				dbuser = pg_strdup(optarg);
+				break;
+			case 'w':
+				dbgetpassword = -1;
+				break;
+			case 'W':
+				dbgetpassword = 1;
+				break;
+/* replication options */
+			case 'o':
+				{
+					char *data = pg_strdup(optarg);
+					char *val = strchr(data, '=');
+
+					if (val != NULL)
+					{
+						/* remove =; separate data from val */
+						*val = '\0';
+						val++;
+					}
+
+					noptions += 1;
+					options = pg_realloc(options, sizeof(char*) * noptions * 2);
+
+					options[(noptions - 1) * 2] = data;
+					options[(noptions - 1) * 2 + 1] = val;
+				}
+
+				break;
+			case 'P':
+				plugin = pg_strdup(optarg);
+				break;
+			case 's':
+				standby_message_timeout = atoi(optarg) * 1000;
+				if (standby_message_timeout < 0)
+				{
+					fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				break;
+			case 'S':
+				slot = pg_strdup(optarg);
+				break;
+			case 'I':
+				if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
+				{
+					fprintf(stderr,
+							_("%s: could not parse start position \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				startpos = ((uint64) hi) << 32 | lo;
+				break;
+/* action */
+			case 1:
+				do_init_slot = true;
+				break;
+			case 2:
+				do_start_slot = true;
+				break;
+			case 3:
+				do_stop_slot = true;
+				break;
+
+			default:
+
+				/*
+				 * getopt_long already emitted a complaint
+				 */
+				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+						progname);
+				exit(1);
+		}
+	}
+
+	/*
+	 * Any non-option arguments?
+	 */
+	if (optind < argc)
+	{
+		fprintf(stderr,
+				_("%s: too many command-line arguments (first is \"%s\")\n"),
+				progname, argv[optind]);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	/*
+	 * Required arguments
+	 */
+	if (slot == NULL)
+	{
+		fprintf(stderr, _("%s: no slot specified\n"), progname);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	if (!do_stop_slot && outfile == NULL)
+	{
+		fprintf(stderr, _("%s: no target file specified\n"), progname);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	if (!do_stop_slot && dbname == NULL)
+	{
+		fprintf(stderr, _("%s: no database specified\n"), progname);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	if (!do_stop_slot && !do_init_slot && !do_start_slot)
+	{
+		fprintf(stderr, _("%s: at least one action needs to be specified\n"), progname);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	if (do_stop_slot && (do_init_slot || do_start_slot))
+	{
+		fprintf(stderr, _("%s: --stop cannot be combined with --init or --start\n"), progname);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	if (startpos && (do_init_slot || do_stop_slot))
+	{
+		fprintf(stderr, _("%s: --startpos cannot be combined with --init or --stop\n"), progname);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+#ifndef WIN32
+	pqsignal(SIGINT, sigint_handler);
+#endif
+
+	/*
+	 * don't really need this but it actually helps to get more precise error
+	 * messages about authentication, required GUCs and such without starting
+	 * to loop around connection attempts lateron.
+	 */
+	{
+		conn = GetConnection();
+		if (!conn)
+			/* Error message already written in GetConnection() */
+			exit(1);
+
+		/*
+		 * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
+		 * position.
+		 */
+		res = PQexec(conn, "IDENTIFY_SYSTEM");
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+					progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
+			disconnect_and_exit(1);
+		}
+
+		if (PQntuples(res) != 1 || PQnfields(res) != 4)
+		{
+			fprintf(stderr,
+					_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
+					progname, PQntuples(res), PQnfields(res), 1, 4);
+			disconnect_and_exit(1);
+		}
+		PQclear(res);
+	}
+
+
+	/*
+	 * stop a replication slot
+	 */
+	if (do_stop_slot)
+	{
+		char		query[256];
+
+		if (verbose)
+			fprintf(stderr,
+					_("%s: init replication slot \"%s\"\n"),
+					progname, slot);
+
+		snprintf(query, sizeof(query), "FREE_LOGICAL_REPLICATION \"%s\"",
+				 slot);
+		res = PQexec(conn, query);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+					progname, query, PQerrorMessage(conn));
+			disconnect_and_exit(1);
+		}
+
+		if (PQntuples(res) != 0 || PQnfields(res) != 0)
+		{
+			fprintf(stderr,
+					_("%s: could not stop logical rep: got %d rows and %d fields, expected %d rows and %d fields\n"),
+					progname, PQntuples(res), PQnfields(res), 0, 0);
+			disconnect_and_exit(1);
+		}
+
+		PQclear(res);
+		disconnect_and_exit(0);
+	}
+
+	/*
+	 * init a replication slot
+	 */
+	if (do_init_slot)
+	{
+		char		query[256];
+
+		if (verbose)
+			fprintf(stderr,
+					_("%s: init replication slot \"%s\"\n"),
+					progname, slot);
+
+		snprintf(query, sizeof(query), "INIT_LOGICAL_REPLICATION \"%s\" \"%s\"",
+				 slot, plugin);
+
+		res = PQexec(conn, query);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+					progname, query, PQerrorMessage(conn));
+			disconnect_and_exit(1);
+		}
+
+		if (PQntuples(res) != 1 || PQnfields(res) != 4)
+		{
+			fprintf(stderr,
+					_("%s: could not init logical rep: got %d rows and %d fields, expected %d rows and %d fields\n"),
+					progname, PQntuples(res), PQnfields(res), 1, 4);
+			disconnect_and_exit(1);
+		}
+
+		if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+		{
+			fprintf(stderr,
+					_("%s: could not parse log location \"%s\"\n"),
+					progname, PQgetvalue(res, 0, 1));
+			disconnect_and_exit(1);
+		}
+		startpos = ((uint64) hi) << 32 | lo;
+
+		slot = strdup(PQgetvalue(res, 0, 0));
+		PQclear(res);
+	}
+
+
+	if (!do_start_slot)
+		disconnect_and_exit(0);
+
+	while (true)
+	{
+		StreamLog();
+		if (time_to_abort)
+		{
+			/*
+			 * We've been Ctrl-C'ed. That's not an error, so exit without an
+			 * errorcode.
+			 */
+			disconnect_and_exit(0);
+		}
+		else if (noloop)
+		{
+			fprintf(stderr, _("%s: disconnected.\n"), progname);
+			exit(1);
+		}
+		else
+		{
+			fprintf(stderr,
+			/* translator: check source for value for %d */
+					_("%s: disconnected. Waiting %d seconds to try again.\n"),
+					progname, RECONNECT_SLEEP_TIME);
+			pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
+		}
+	}
+}
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 22a5340..f027e1e 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -11,21 +11,18 @@
  *		  src/bin/pg_basebackup/receivelog.c
  *-------------------------------------------------------------------------
  */
+
 #include "postgres_fe.h"
 
-#include <sys/stat.h>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <unistd.h>
-/* for ntohl/htonl */
-#include <netinet/in.h>
-#include <arpa/inet.h>
+/* local includes */
+#include "receivelog.h"
+#include "streamutil.h"
 
 #include "libpq-fe.h"
 #include "access/xlog_internal.h"
 
-#include "receivelog.h"
-#include "streamutil.h"
+#include <sys/stat.h>
+#include <unistd.h>
 
 
 /* fd and filename for currently open WAL file */
@@ -193,63 +190,6 @@ close_walfile(char *basedir, char *partial_suffix)
 
 
 /*
- * Local version of GetCurrentTimestamp(), since we are not linked with
- * backend code. The protocol always uses integer timestamps, regardless of
- * server setting.
- */
-static int64
-localGetCurrentTimestamp(void)
-{
-	int64		result;
-	struct timeval tp;
-
-	gettimeofday(&tp, NULL);
-
-	result = (int64) tp.tv_sec -
-		((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
-
-	result = (result * USECS_PER_SEC) + tp.tv_usec;
-
-	return result;
-}
-
-/*
- * Local version of TimestampDifference(), since we are not linked with
- * backend code.
- */
-static void
-localTimestampDifference(int64 start_time, int64 stop_time,
-						 long *secs, int *microsecs)
-{
-	int64		diff = stop_time - start_time;
-
-	if (diff <= 0)
-	{
-		*secs = 0;
-		*microsecs = 0;
-	}
-	else
-	{
-		*secs = (long) (diff / USECS_PER_SEC);
-		*microsecs = (int) (diff % USECS_PER_SEC);
-	}
-}
-
-/*
- * Local version of TimestampDifferenceExceeds(), since we are not
- * linked with backend code.
- */
-static bool
-localTimestampDifferenceExceeds(int64 start_time,
-								int64 stop_time,
-								int msec)
-{
-	int64		diff = stop_time - start_time;
-
-	return (diff >= msec * INT64CONST(1000));
-}
-
-/*
  * Check if a timeline history file exists.
  */
 static bool
@@ -369,47 +309,6 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co
 }
 
 /*
- * Converts an int64 to network byte order.
- */
-static void
-sendint64(int64 i, char *buf)
-{
-	uint32		n32;
-
-	/* High order half first, since we're doing MSB-first */
-	n32 = (uint32) (i >> 32);
-	n32 = htonl(n32);
-	memcpy(&buf[0], &n32, 4);
-
-	/* Now the low order half */
-	n32 = (uint32) i;
-	n32 = htonl(n32);
-	memcpy(&buf[4], &n32, 4);
-}
-
-/*
- * Converts an int64 from network byte order to native format.
- */
-static int64
-recvint64(char *buf)
-{
-	int64		result;
-	uint32		h32;
-	uint32		l32;
-
-	memcpy(&h32, buf, 4);
-	memcpy(&l32, buf + 4, 4);
-	h32 = ntohl(h32);
-	l32 = ntohl(l32);
-
-	result = h32;
-	result <<= 32;
-	result |= l32;
-
-	return result;
-}
-
-/*
  * Send a Standby Status Update message to server.
  */
 static bool
@@ -420,13 +319,13 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
 
 	replybuf[len] = 'r';
 	len += 1;
-	sendint64(blockpos, &replybuf[len]);		/* write */
+	fe_sendint64(blockpos, &replybuf[len]);		/* write */
 	len += 8;
-	sendint64(InvalidXLogRecPtr, &replybuf[len]);		/* flush */
+	fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);		/* flush */
 	len += 8;
-	sendint64(InvalidXLogRecPtr, &replybuf[len]);		/* apply */
+	fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);		/* apply */
 	len += 8;
-	sendint64(now, &replybuf[len]);		/* sendTime */
+	fe_sendint64(now, &replybuf[len]);		/* sendTime */
 	len += 8;
 	replybuf[len] = replyRequested ? 1 : 0;		/* replyRequested */
 	len += 1;
@@ -828,9 +727,9 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 		/*
 		 * Potentially send a status message to the master
 		 */
-		now = localGetCurrentTimestamp();
+		now = feGetCurrentTimestamp();
 		if (still_sending && standby_message_timeout > 0 &&
-			localTimestampDifferenceExceeds(last_status, now,
+			feTimestampDifferenceExceeds(last_status, now,
 											standby_message_timeout))
 		{
 			/* Time to send feedback! */
@@ -859,10 +758,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 				int			usecs;
 
 				targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
-				localTimestampDifference(now,
-										 targettime,
-										 &secs,
-										 &usecs);
+				feTimestampDifference(now,
+									  targettime,
+									  &secs,
+									  &usecs);
 				if (secs <= 0)
 					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
 				else
@@ -966,7 +865,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			/* If the server requested an immediate reply, send one. */
 			if (replyRequested && still_sending)
 			{
-				now = localGetCurrentTimestamp();
+				now = feGetCurrentTimestamp();
 				if (!sendFeedback(conn, blockpos, now, false))
 					goto error;
 				last_status = now;
@@ -996,7 +895,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 						progname, r);
 				goto error;
 			}
-			blockpos = recvint64(&copybuf[1]);
+			blockpos = fe_recvint64(&copybuf[1]);
 
 			/* Extract WAL location for this block */
 			xlogoff = blockpos % XLOG_SEG_SIZE;
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index 7c983cd..f4789a5 100644
--- a/src/bin/pg_basebackup/receivelog.h
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -1,3 +1,5 @@
+#include "libpq-fe.h"
+
 #include "access/xlogdefs.h"
 
 /*
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index 1dfb80f..c8d436d 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -11,17 +11,35 @@
  *-------------------------------------------------------------------------
  */
 
-#include "postgres_fe.h"
+/*
+ * We have to use postgres.h not postgres_fe.h here, because there's
+ * backend-only stuff in the datetime include files we need.  But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+#define FRONTEND 1
+#include "postgres.h"
+
 #include "streamutil.h"
 
+#include "common/fe_memutils.h"
+#include "utils/datetime.h"
+
 #include <stdio.h>
 #include <string.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+/* for ntohl/htonl */
+#include <netinet/in.h>
+#include <arpa/inet.h>
 
 const char *progname;
 char	   *connection_string = NULL;
 char	   *dbhost = NULL;
 char	   *dbuser = NULL;
 char	   *dbport = NULL;
+char	   *dbname = NULL;
 int			dbgetpassword = 0;	/* 0=auto, -1=never, 1=always */
 static char *dbpassword = NULL;
 PGconn	   *conn = NULL;
@@ -86,10 +104,10 @@ GetConnection(void)
 	}
 
 	keywords[i] = "dbname";
-	values[i] = "replication";
+	values[i] = dbname == NULL ? "replication" : dbname;
 	i++;
 	keywords[i] = "replication";
-	values[i] = "true";
+	values[i] = dbname == NULL ? "true" : "database";
 	i++;
 	keywords[i] = "fallback_application_name";
 	values[i] = progname;
@@ -210,3 +228,102 @@ GetConnection(void)
 		return tmpconn;
 	}
 }
+
+
+/*
+ * Frontend version of GetCurrentTimestamp(), since we are not linked with
+ * backend code. The protocol always uses integer timestamps, regardless of
+ * server setting.
+ */
+int64
+feGetCurrentTimestamp(void)
+{
+	int64		result;
+	struct timeval tp;
+
+	gettimeofday(&tp, NULL);
+
+	result = (int64) tp.tv_sec -
+		((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
+
+	result = (result * USECS_PER_SEC) + tp.tv_usec;
+
+	return result;
+}
+
+/*
+ * Frontend version of TimestampDifference(), since we are not linked with
+ * backend code.
+ */
+void
+feTimestampDifference(int64 start_time, int64 stop_time,
+						 long *secs, int *microsecs)
+{
+	int64		diff = stop_time - start_time;
+
+	if (diff <= 0)
+	{
+		*secs = 0;
+		*microsecs = 0;
+	}
+	else
+	{
+		*secs = (long) (diff / USECS_PER_SEC);
+		*microsecs = (int) (diff % USECS_PER_SEC);
+	}
+}
+
+/*
+ * Frontend version of TimestampDifferenceExceeds(), since we are not
+ * linked with backend code.
+ */
+bool
+feTimestampDifferenceExceeds(int64 start_time,
+								int64 stop_time,
+								int msec)
+{
+	int64		diff = stop_time - start_time;
+
+	return (diff >= msec * INT64CONST(1000));
+}
+
+/*
+ * Converts an int64 to network byte order.
+ */
+void
+fe_sendint64(int64 i, char *buf)
+{
+	uint32		n32;
+
+	/* High order half first, since we're doing MSB-first */
+	n32 = (uint32) (i >> 32);
+	n32 = htonl(n32);
+	memcpy(&buf[0], &n32, 4);
+
+	/* Now the low order half */
+	n32 = (uint32) i;
+	n32 = htonl(n32);
+	memcpy(&buf[4], &n32, 4);
+}
+
+/*
+ * Converts an int64 from network byte order to native format.
+ */
+int64
+fe_recvint64(char *buf)
+{
+	int64		result;
+	uint32		h32;
+	uint32		l32;
+
+	memcpy(&h32, buf, 4);
+	memcpy(&l32, buf + 4, 4);
+	h32 = ntohl(h32);
+	l32 = ntohl(l32);
+
+	result = h32;
+	result <<= 32;
+	result |= l32;
+
+	return result;
+}
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 77d6b86..4286df8 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -5,6 +5,7 @@ extern char *connection_string;
 extern char *dbhost;
 extern char *dbuser;
 extern char *dbport;
+extern char *dbname;
 extern int	dbgetpassword;
 
 /* Connection kept global so we can disconnect easily */
@@ -17,3 +18,12 @@ extern PGconn *conn;
 	}
 
 extern PGconn *GetConnection(void);
+
+extern int64 feGetCurrentTimestamp(void);
+extern void feTimestampDifference(int64 start_time, int64 stop_time,
+									 long *secs, int *microsecs);
+
+extern bool feTimestampDifferenceExceeds(int64 start_time, int64 stop_time,
+											int msec);
+extern void fe_sendint64(int64 i, char *buf);
+extern int64 fe_recvint64(char *buf);
-- 
1.8.4.21.g992c386.dirty

