pg_basebackup and wal streaming

Started by Magnus Haganderalmost 15 years ago16 messages
#1Magnus Hagander
magnus@hagander.net
1 attachment(s)

Better late than never (or?), here's the final cleanup of
pg_streamrecv for moving into the main distribution, per discussion
back in late dec or early jan. It also includes the "stream logs in
parallel to backup" part that was not completed on pg_basebackup.

Other than that, the only changes to pg_basebackup are the moving of a
couple of functions into streamutil.c to make them usable from both,
and the progress format output fix Fujii-san mentioned recently.

Should be complete except for Win32 support (needs thread/fork thing
for the background streaming feature. Shouldn't be too hard, and I
guess that falls on me anyway..) and the reference documentation.

And with no feedback to my question here
(http://archives.postgresql.org/pgsql-hackers/2011-02/msg00805.php), I
went with the "duplicate the macros that are needed to avoid loading
postgres.h" path.

Yes, I realize that this is far too late in the CF process really, but
I wanted to post it anyway... If it's too late to be acceptable it
should be possible to maintain this outside the main repository until
9.2, since it only changes frontend binaries. So I'm not actually
going to put it on the CF page unless someone else says that's a good
idea, to at least share the blame from Robert ;)

--
 Magnus Hagander
 Me: http://www.hagander.net/
 Work: http://www.redpill-linpro.com/

Attachments:

streaming_wal.patchtext/x-patch; charset=US-ASCII; name=streaming_wal.patchDownload
diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index ccb1502..5bbe52d 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -18,21 +18,26 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 
-OBJS=	pg_basebackup.o $(WIN32RES)
+OBJS=receivelog.o streamutil.o $(WIN32RES)
 
-all: pg_basebackup
+all: pg_basebackup pg_receivexlog
 
-pg_basebackup: $(OBJS) | submake-libpq submake-libpgport
-	$(CC) $(CFLAGS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport
+	$(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
+pg_receivexlog: pg_receivexlog.o $(OBJS) | submake-libpq submake-libpgport
+	$(CC) $(CFLAGS) pg_receivexlog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
 
 install: all installdirs
 	$(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+	$(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog(X)'
 
 installdirs:
 	$(MKDIR_P) '$(DESTDIR)$(bindir)'
 
 uninstall:
 	rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+	rm -f '$(DESTDIR)$(bindir)/pg_receivexlog(X)'
 
 clean distclean maintainer-clean:
-	rm -f pg_basebackup$(X) $(OBJS)
+	rm -f pg_basebackup$(X) pg_receivexlog$(X) $(OBJS) pg_basebackup.o pg_receivexlog.o
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 61aa1d3..7442bbe 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -17,6 +17,8 @@
 #include <unistd.h>
 #include <dirent.h>
 #include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/wait.h>
 
 #ifdef HAVE_LIBZ
 #include <zlib.h>
@@ -24,9 +26,11 @@
 
 #include "getopt_long.h"
 
+#include "receivelog.h"
+#include "streamutil.h"
+
 
 /* Global options */
-static const char *progname;
 char	   *basedir = NULL;
 char		format = 'p';		/* p(lain)/t(ar) */
 char	   *label = "pg_basebackup base backup";
@@ -34,38 +38,35 @@ bool		showprogress = false;
 int			verbose = 0;
 int			compresslevel = 0;
 bool		includewal = false;
+bool		streamwal = false;
 bool		fastcheckpoint = false;
-char	   *dbhost = NULL;
-char	   *dbuser = NULL;
-char	   *dbport = NULL;
-int			dbgetpassword = 0;	/* 0=auto, -1=never, 1=always */
 
 /* Progress counters */
 static uint64 totalsize;
 static uint64 totaldone;
 static int	tablespacecount;
 
-/* Connection kept global so we can disconnect easily */
-static PGconn *conn = NULL;
+/* Pipe to communicate with background wal receiver process */
+static int	bgpipe[2] = {-1, -1};
 
-#define disconnect_and_exit(code)				\
-	{											\
-	if (conn != NULL) PQfinish(conn);			\
-	exit(code);									\
-	}
+/* Handle to child process */
+static pid_t bgchild = -1;
+
+/* End position for xlog streaming, empty string if unknown yet */
+static XLogRecPtr xlogendptr;
+static bool has_xlogendptr = false;
 
 /* Function headers */
-static char *xstrdup(const char *s);
-static void *xmalloc0(int size);
 static void usage(void);
 static void verify_dir_is_empty_or_create(char *dirname);
 static void progress_report(int tablespacenum, char *fn);
-static PGconn *GetConnection(void);
 
 static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
 static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
 static void BaseBackup();
 
+static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+
 #ifdef HAVE_LIBZ
 static const char *
 get_gz_error(gzFile *gzf)
@@ -81,39 +82,6 @@ get_gz_error(gzFile *gzf)
 }
 #endif
 
-/*
- * strdup() and malloc() replacements that prints an error and exits
- * if something goes wrong. Can never return NULL.
- */
-static char *
-xstrdup(const char *s)
-{
-	char	   *result;
-
-	result = strdup(s);
-	if (!result)
-	{
-		fprintf(stderr, _("%s: out of memory\n"), progname);
-		exit(1);
-	}
-	return result;
-}
-
-static void *
-xmalloc0(int size)
-{
-	void	   *result;
-
-	result = malloc(size);
-	if (!result)
-	{
-		fprintf(stderr, _("%s: out of memory\n"), progname);
-		exit(1);
-	}
-	MemSet(result, 0, size);
-	return result;
-}
-
 
 static void
 usage(void)
@@ -125,7 +93,7 @@ usage(void)
 	printf(_("\nOptions controlling the output:\n"));
 	printf(_("  -D, --pgdata=directory    receive base backup into directory\n"));
 	printf(_("  -F, --format=p|t          output format (plain, tar)\n"));
-	printf(_("  -x, --xlog                include required WAL files in backup\n"));
+	printf(_("  -x, --xlog[=stream]       include required WAL files in backup\n"));
 	printf(_("  -Z, --compress=0-9        compress tar output\n"));
 	printf(_("\nGeneral options:\n"));
 	printf(_("  -c, --checkpoint=fast|spread\n"
@@ -146,6 +114,140 @@ usage(void)
 
 
 /*
+ * Called in the background process whenever a complete segment of WAL
+ * has been received. Check to see if there is any data on our pipe
+ * (which would mean we have a stop position), and if it is, check if
+ * it is time to stop.
+ */
+static bool
+segment_callback(XLogRecPtr segendpos, uint32 timeline)
+{
+	fd_set		fds;
+	struct timeval tv;
+	int			r;
+
+	if (has_xlogendptr)
+	{
+		/* Already know when to stop, compare to the position we got */
+		if (segendpos.xlogid > xlogendptr.xlogid ||
+			(segendpos.xlogid == xlogendptr.xlogid &&
+			 segendpos.xrecoff >= xlogendptr.xrecoff))
+			return true;
+	}
+
+	/*
+	 * Don't have the end pointer yet - check our pipe to see if it has been
+	 * sent now.
+	 */
+	FD_ZERO(&fds);
+	FD_SET(bgpipe[0], &fds);
+
+	MemSet(&tv, 0, sizeof(tv));
+
+	r = select(bgpipe[0] + 1, &fds, NULL, NULL, &tv);
+	if (r == 1)
+	{
+		char		xlogend[64];
+
+		MemSet(xlogend, 0, sizeof(xlogend));
+		r = piperead(bgpipe[0], xlogend, sizeof(xlogend));
+		if (r < 0)
+		{
+			fprintf(stderr, _("%s: could not read from ready pipe: %s\n"),
+					progname, strerror(errno));
+			exit(1);
+		}
+
+		if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2)
+		{
+			fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"),
+					progname, xlogend);
+			exit(1);
+		}
+		has_xlogendptr = true;
+
+		/* since we have a value now, call ourselves to make the comparison */
+		return segment_callback(segendpos, timeline);
+	}
+
+	/* Else nothing happened, so don't exit */
+	return false;
+}
+
+/*
+ * Initiate background process for receiving xlog during the backup.
+ * The background stream will use it's own database connection so we can
+ * stream the logfile in parallel with the backups.
+ */
+static void
+StartLogStreamer(char *startpos, uint32 timeline)
+{
+	PGconn	   *bgconn;
+	XLogRecPtr	startptr;
+	char		xlogdir[MAXPGPATH];
+
+	/* Convert the starting position */
+	if (sscanf(startpos, "%X/%X", &startptr.xlogid, &startptr.xrecoff) != 2)
+	{
+		fprintf(stderr, _("%s: invalid format of xlog location: %s\n"),
+				progname, startpos);
+		disconnect_and_exit(1);
+	}
+	/* Round off to even segment position */
+	startptr.xrecoff -= startptr.xrecoff % XLOG_SEG_SIZE;
+
+	/* Create our background pipe */
+	if (pgpipe(bgpipe) < 0)
+	{
+		fprintf(stderr, _("%s: could not create pipe for background process: %s\n"),
+				progname, strerror(errno));
+		disconnect_and_exit(1);
+	}
+
+	/* Get a second connection */
+	bgconn = GetConnection();
+
+	/*
+	 * Always in plain format, so we can write to basedir/pg_xlog. But the
+	 * directory entry in the tar file may arrive later, so make sure it's
+	 * created before we start.
+	 */
+	snprintf(xlogdir, sizeof(xlogdir), "%s/pg_xlog", basedir);
+	verify_dir_is_empty_or_create(xlogdir);
+
+	/* Fork off the child process and tell it to go about it's business */
+	/* XXX: win32 */
+	bgchild = fork();
+	if (bgchild == 0)
+	{
+		/* in child process */
+
+		if (!ReceiveXlogStream(bgconn, startptr, timeline, xlogdir, segment_callback))
+
+			/*
+			 * Any errors will already have been reported in the function
+			 * process, but we need to tell the parent that we didn't shutdown
+			 * in a nice way. Do this by exiting with an error code and expect
+			 * it to be picked up.
+			 */
+			exit(1);
+
+		PQfinish(bgconn);
+		exit(0);
+	}
+	else if (bgchild < 0)
+	{
+		fprintf(stderr, _("%s: could not create background process: %s\n"),
+				progname, strerror(errno));
+		disconnect_and_exit(1);
+	}
+
+	/*
+	 * Else we are in the parent process and all is well.
+	 */
+}
+
+/*
  * Verify that the given directory exists and is empty. If it does not
  * exist, it is created. If it exists but is not empty, an error will
  * be give and the process ended.
@@ -202,13 +304,19 @@ verify_dir_is_empty_or_create(char *dirname)
 static void
 progress_report(int tablespacenum, char *fn)
 {
-	int percent = (int) ((totaldone / 1024) * 100 / totalsize);
+	int			percent = (int) ((totaldone / 1024) * 100 / totalsize);
+
 	if (percent > 100)
 		percent = 100;
 
-	if (verbose)
+	if (!fn)
+		fprintf(stderr,
+		INT64_FORMAT "/" INT64_FORMAT " kb g(100%%) %i/%i tablespaces %35s\r",
+				totaldone / 1024, totalsize,
+				tablespacenum, tablespacecount, "");
+	else if (verbose)
 		fprintf(stderr,
-				INT64_FORMAT "/" INT64_FORMAT " kB (%i%%) %i/%i tablespaces (%-30s)\r",
+				INT64_FORMAT "/" INT64_FORMAT " kB (%i%%) %i/%i tablespaces (%-30.30s)\r",
 				totaldone / 1024, totalsize,
 				percent,
 				tablespacenum, tablespacecount, fn);
@@ -443,11 +551,6 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 		strcpy(current_path, PQgetvalue(res, rownum, 1));
 
 	/*
-	 * Make sure we're unpacking into an empty directory
-	 */
-	verify_dir_is_empty_or_create(current_path);
-
-	/*
 	 * Get the COPY data
 	 */
 	res = PQgetResult(conn);
@@ -540,10 +643,18 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 					fn[strlen(fn) - 1] = '\0';	/* Remove trailing slash */
 					if (mkdir(fn, S_IRWXU) != 0)
 					{
-						fprintf(stderr,
-							_("%s: could not create directory \"%s\": %s\n"),
-								progname, fn, strerror(errno));
-						disconnect_and_exit(1);
+						/*
+						 * When streaming WAL, pg_xlog will have been created
+						 * by the wal receiver process, so just ignore failure
+						 * on that.
+						 */
+						if (!streamwal || strcmp(fn + strlen(fn) - 8, "/pg_xlog") != 0)
+						{
+							fprintf(stderr,
+									_("%s: could not create directory \"%s\": %s\n"),
+									progname, fn, strerror(errno));
+							disconnect_and_exit(1);
+						}
 					}
 #ifndef WIN32
 					if (chmod(fn, (mode_t) filemode))
@@ -654,90 +765,6 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 }
 
 
-static PGconn *
-GetConnection(void)
-{
-	PGconn	   *tmpconn;
-	int			argcount = 4;	/* dbname, replication, fallback_app_name,
-								 * password */
-	int			i;
-	const char **keywords;
-	const char **values;
-	char	   *password = NULL;
-
-	if (dbhost)
-		argcount++;
-	if (dbuser)
-		argcount++;
-	if (dbport)
-		argcount++;
-
-	keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
-	values = xmalloc0((argcount + 1) * sizeof(*values));
-
-	keywords[0] = "dbname";
-	values[0] = "replication";
-	keywords[1] = "replication";
-	values[1] = "true";
-	keywords[2] = "fallback_application_name";
-	values[2] = progname;
-	i = 3;
-	if (dbhost)
-	{
-		keywords[i] = "host";
-		values[i] = dbhost;
-		i++;
-	}
-	if (dbuser)
-	{
-		keywords[i] = "user";
-		values[i] = dbuser;
-		i++;
-	}
-	if (dbport)
-	{
-		keywords[i] = "port";
-		values[i] = dbport;
-		i++;
-	}
-
-	while (true)
-	{
-		if (dbgetpassword == 1)
-		{
-			/* Prompt for a password */
-			password = simple_prompt(_("Password: "), 100, false);
-			keywords[argcount - 1] = "password";
-			values[argcount - 1] = password;
-		}
-
-		tmpconn = PQconnectdbParams(keywords, values, true);
-		if (password)
-			free(password);
-
-		if (PQstatus(tmpconn) == CONNECTION_BAD &&
-			PQconnectionNeedsPassword(tmpconn) &&
-			dbgetpassword != -1)
-		{
-			dbgetpassword = 1;	/* ask for password next time */
-			PQfinish(tmpconn);
-			continue;
-		}
-
-		if (PQstatus(tmpconn) != CONNECTION_OK)
-		{
-			fprintf(stderr, _("%s: could not connect to server: %s\n"),
-					progname, PQerrorMessage(tmpconn));
-			exit(1);
-		}
-
-		/* Connection ok! */
-		free(values);
-		free(keywords);
-		return tmpconn;
-	}
-}
-
 static void
 BaseBackup()
 {
@@ -780,7 +807,7 @@ BaseBackup()
 	snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s %s",
 			 escaped_label,
 			 showprogress ? "PROGRESS" : "",
-			 includewal ? "WAL" : "",
+			 includewal && !streamwal ? "WAL" : "",
 			 fastcheckpoint ? "FAST" : "",
 	         includewal ? "NOWAIT" : "");
 
@@ -859,6 +886,18 @@ BaseBackup()
 	}
 
 	/*
+	 * If we're streaming WAL, start the streaming session before we start
+	 * receiving the actual data chunks.
+	 */
+	if (streamwal)
+	{
+		if (verbose)
+			fprintf(stderr, _("%s: starting background WAL receiver\n"),
+					progname);
+		StartLogStreamer(xlogstart, timeline);
+	}
+
+	/*
 	 * Start receiving chunks
 	 */
 	for (i = 0; i < PQntuples(res); i++)
@@ -871,7 +910,7 @@ BaseBackup()
 
 	if (showprogress)
 	{
-		progress_report(PQntuples(res), "");
+		progress_report(PQntuples(res), NULL);
 		fprintf(stderr, "\n");	/* Need to move to next line */
 	}
 	PQclear(res);
@@ -905,6 +944,49 @@ BaseBackup()
 		disconnect_and_exit(1);
 	}
 
+	if (bgchild != -1)
+	{
+		int			status;
+		int			r;
+
+		if (verbose)
+			fprintf(stderr, _("%s: waiting for background process to finish streaming...\n"), progname);
+		if (pipewrite(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend))
+		{
+			fprintf(stderr, _("%s: could not send command to background pipe: %s\n"),
+					progname, strerror(errno));
+			disconnect_and_exit(1);
+		}
+
+		/* Just wait for the background process to exit */
+		r = waitpid(bgchild, &status, 0);
+		if (r == -1)
+		{
+			fprintf(stderr, _("%s: could not wait for child process: %s\n"),
+					progname, strerror(errno));
+			disconnect_and_exit(1);
+		}
+		if (r != bgchild)
+		{
+			fprintf(stderr, "%s: child %i died, expected %i\n",
+					progname, r, bgchild);
+			disconnect_and_exit(1);
+		}
+		if (!WIFEXITED(status))
+		{
+			fprintf(stderr, "%s: child process did not exit normally\n",
+					progname);
+			disconnect_and_exit(1);
+		}
+		if (WEXITSTATUS(status) != 0)
+		{
+			fprintf(stderr, "%s: child process exited with error %i\n",
+					progname, WEXITSTATUS(status));
+			disconnect_and_exit(1);
+		}
+		/* Exited normally, we're happy! */
+	}
+
 	/*
 	 * End of copy data. Final result is already checked inside the loop.
 	 */
@@ -924,7 +1006,7 @@ main(int argc, char **argv)
 		{"pgdata", required_argument, NULL, 'D'},
 		{"format", required_argument, NULL, 'F'},
 		{"checkpoint", required_argument, NULL, 'c'},
-		{"xlog", no_argument, NULL, 'x'},
+		{"xlog", optional_argument, NULL, 'x'},
 		{"compress", required_argument, NULL, 'Z'},
 		{"label", required_argument, NULL, 'l'},
 		{"host", required_argument, NULL, 'h'},
@@ -980,6 +1062,18 @@ main(int argc, char **argv)
 				break;
 			case 'x':
 				includewal = true;
+				if (optarg)
+				{
+					if (strcmp(optarg, "s") == 0 ||
+						strcmp(optarg, "stream") == 0)
+						streamwal = true;
+					else
+					{
+						fprintf(stderr, _("%s: invalid xlog option \"%s\", must be empty or \"stream\"\n"),
+								progname, optarg);
+						exit(1);
+					}
+				}
 				break;
 			case 'l':
 				label = xstrdup(optarg);
@@ -1080,6 +1174,16 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+	if (format != 'p' && streamwal)
+	{
+		fprintf(stderr,
+				_("%s: wal streaming can only be used in plain mode\n"),
+				progname);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
 #ifndef HAVE_LIBZ
 	if (compresslevel > 0)
 	{
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
new file mode 100644
index 0000000..41b5bb7
--- /dev/null
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -0,0 +1,407 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_receivexlog.c - receive streaming transaction log data and write it
+ *					  to a local file.
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/pg_receivexlog.c
+ *-------------------------------------------------------------------------
+ */
+
+
+#include "postgres_fe.h"
+#include "libpq-fe.h"
+
+#include <dirent.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "getopt_long.h"
+
+#include "receivelog.h"
+#include "streamutil.h"
+
+
+/* Global options */
+char	   *basedir = NULL;
+int			verbose = 0;
+
+
+static void usage(void);
+static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline);
+static void StreamLog();
+static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+
+/*
+ * XXX: from xlog_internal.h
+ */
+#define XLogSegsPerFile (((uint32) 0xffffffff) / XLOG_SEG_SIZE)
+#define PrevLogSeg(logId, logSeg)       \
+        do { \
+                if (logSeg) \
+                        (logSeg)--; \
+                else \
+                { \
+                        (logId)--; \
+                        (logSeg) = XLogSegsPerFile-1; \
+                } \
+        } while (0)
+
+
+static void
+usage(void)
+{
+	printf(_("%s receives PostgreSQL streaming transaction logs\n\n"),
+		   progname);
+	printf(_("Usage:\n"));
+	printf(_("  %s [OPTION]...\n"), progname);
+	printf(_("\nOptions controlling the output:\n"));
+	printf(_("  -D, --dir=directory       receive xlog files into this directory\n"));
+	printf(_("\nGeneral options:\n"));
+	printf(_("  -v, --verbose             output verbose messages\n"));
+	printf(_("  -?, --help                show this help, then exit\n"));
+	printf(_("  -V, --version             output version information, then exit\n"));
+	printf(_("\nConnection options:\n"));
+	printf(_("  -h, --host=HOSTNAME      database server host or socket directory\n"));
+	printf(_("  -p, --port=PORT          database server port number\n"));
+	printf(_("  -U, --username=NAME      connect as specified database user\n"));
+	printf(_("  -w, --no-password        never prompt for password\n"));
+	printf(_("  -W, --password           force password prompt (should happen automatically)\n"));
+	printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
+}
+
+static bool
+segment_callback(XLogRecPtr segendpos, uint32 timeline)
+{
+	char		fn[MAXPGPATH];
+	struct stat statbuf;
+
+	if (verbose)
+		fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
+				progname, segendpos.xlogid, segendpos.xrecoff, timeline);
+
+	/*
+	 * Check if there is a partial file for the name we just finished, and if
+	 * there is, remove it under the assumption that we have now got all the
+	 * data we need.
+	 */
+	PrevLogSeg(segendpos.xlogid, segendpos.xrecoff);
+	snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial",
+			 basedir, timeline,
+			 segendpos.xlogid,
+			 segendpos.xrecoff / XLOG_SEG_SIZE);
+	if (stat(fn, &statbuf) == 0)
+	{
+		/* File existed, get rid of it */
+		if (verbose)
+			fprintf(stderr, _("%s: removing file \"%s\"\n"),
+					progname, fn);
+		unlink(fn);
+	}
+
+	/* Never abort */
+	return false;
+}
+
+/*
+ * Determine starting location for streaming, based on:
+ * 1. If there are existing xlog segments, start at the end of the last one
+ * 2. If the last one is a partial segment, rename it and start over, since
+ *	  we don't sync after every write.
+ * 3. If no existing xlog exists, start from the beginning of the current
+ *	  WAL segment.
+ */
+static XLogRecPtr
+FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
+{
+	DIR		   *dir;
+	struct dirent *dirent;
+	int			i;
+	bool		b;
+	XLogRecPtr	high = {0, 0};
+
+	dir = opendir(basedir);
+	if (dir == NULL)
+	{
+		fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
+				progname, basedir, strerror(errno));
+		disconnect_and_exit(1);
+	}
+
+	while ((dirent = readdir(dir)) != NULL)
+	{
+		char		fullpath[MAXPGPATH];
+		struct stat statbuf;
+		uint32		tli,
+					log,
+					seg;
+
+		if (!strcmp(dirent->d_name, ".") || !strcmp(dirent->d_name, ".."))
+			continue;
+
+		/* xlog files are always 24 characters */
+		if (strlen(dirent->d_name) != 24)
+			continue;
+
+		/* Filenames are always made out of 0-9 and A-F */
+		b = false;
+		for (i = 0; i < 24; i++)
+		{
+			if (!(dirent->d_name[i] >= '0' && dirent->d_name[i] <= '9') &&
+				!(dirent->d_name[i] >= 'A' && dirent->d_name[i] <= 'F'))
+			{
+				b = true;
+				break;
+			}
+		}
+		if (b)
+			continue;
+
+		/*
+		 * Looks like an xlog file. Parse it's position.
+		 */
+		if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3)
+		{
+			fprintf(stderr, _("%s: could not parse xlog filename \"%s\"\n"),
+					progname, dirent->d_name);
+			disconnect_and_exit(1);
+		}
+		log *= XLOG_SEG_SIZE;
+
+		/* Ignore any files that are for another timeline */
+		if (tli != currenttimeline)
+			continue;
+
+		/* Check if this is a completed segment or not */
+		snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+		if (stat(fullpath, &statbuf) != 0)
+		{
+			fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"),
+					progname, fullpath, strerror(errno));
+			disconnect_and_exit(1);
+		}
+
+		if (statbuf.st_size == 16 * 1024 * 1024)
+		{
+			/* Completed segment */
+			if (log > high.xlogid ||
+				(log == high.xlogid && seg > high.xrecoff))
+			{
+				high.xlogid = log;
+				high.xrecoff = seg;
+				continue;
+			}
+		}
+		else
+		{
+			/*
+			 * This is a partial file. Rename it out of the way.
+			 */
+			char		newfn[MAXPGPATH];
+
+			fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
+					progname, dirent->d_name, dirent->d_name);
+
+			snprintf(newfn, sizeof(newfn), "%s/%s.partial",
+					 basedir, dirent->d_name);
+
+			if (stat(newfn, &statbuf) == 0)
+			{
+				fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"),
+						progname, newfn);
+				disconnect_and_exit(1);
+			}
+			if (rename(fullpath, newfn) != 0)
+			{
+				fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"),
+						progname, fullpath, newfn, strerror(errno));
+				disconnect_and_exit(1);
+			}
+
+			/* Don't continue looking for more, we assume this is the last */
+			break;
+		}
+	}
+
+	closedir(dir);
+
+	if (high.xlogid > 0 && high.xrecoff > 0)
+		return high;
+
+	return currentpos;
+}
+
+/*
+ * Start the log streaming
+ */
+static void
+StreamLog(void)
+{
+	PGresult   *res;
+	uint32		timeline;
+	XLogRecPtr	startpos;
+
+	/*
+	 * Connect in replication mode to the server
+	 */
+	conn = GetConnection();
+
+	/*
+	 * Run IDENFITY_SYSTEM so we can get the timeline and current xlog
+	 * position.
+	 */
+	res = PQexec(conn, "IDENTIFY_SYSTEM");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, _("%s: could not identify system: %s\n"),
+				progname, PQerrorMessage(conn));
+		disconnect_and_exit(1);
+	}
+	if (PQntuples(res) != 1)
+	{
+		fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
+				progname, PQntuples(res));
+		disconnect_and_exit(1);
+	}
+	timeline = atoi(PQgetvalue(res, 0, 1));
+	if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &startpos.xlogid, &startpos.xrecoff) != 2)
+	{
+		fprintf(stderr, _("%s: could not parse log start position from value \"%s\"\n"),
+				progname, PQgetvalue(res, 0, 2));
+		disconnect_and_exit(1);
+	}
+	PQclear(res);
+
+	/*
+	 * Figure out where to start streaming.
+	 */
+	startpos = FindStreamingStart(startpos, timeline);
+
+	/*
+	 * Always start streaming at the beginning of a segment
+	 */
+	startpos.xrecoff -= startpos.xrecoff % XLOG_SEG_SIZE;
+
+	/*
+	 * Start the replication
+	 */
+	if (verbose)
+		fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"),
+				progname, startpos.xlogid, startpos.xrecoff, timeline);
+
+	ReceiveXlogStream(conn, startpos, timeline, basedir, segment_callback);
+}
+
+int
+main(int argc, char **argv)
+{
+	static struct option long_options[] = {
+		{"help", no_argument, NULL, '?'},
+		{"version", no_argument, NULL, 'V'},
+		{"dir", required_argument, NULL, 'D'},
+		{"host", required_argument, NULL, 'h'},
+		{"port", required_argument, NULL, 'p'},
+		{"username", required_argument, NULL, 'U'},
+		{"no-password", no_argument, NULL, 'w'},
+		{"password", no_argument, NULL, 'W'},
+		{"verbose", no_argument, NULL, 'v'},
+		{NULL, 0, NULL, 0}
+	};
+	int			c;
+
+	int			option_index;
+
+	progname = get_progname(argv[0]);
+	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog"));
+
+	if (argc > 1)
+	{
+		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+		{
+			usage();
+			exit(0);
+		}
+		else if (strcmp(argv[1], "-V") == 0
+				 || strcmp(argv[1], "--version") == 0)
+		{
+			puts("pg_receivexlog (PostgreSQL) " PG_VERSION);
+			exit(0);
+		}
+	}
+
+	while ((c = getopt_long(argc, argv, "D:h:p:U:wWv",
+							long_options, &option_index)) != -1)
+	{
+		switch (c)
+		{
+			case 'D':
+				basedir = xstrdup(optarg);
+				break;
+			case 'h':
+				dbhost = xstrdup(optarg);
+				break;
+			case 'p':
+				if (atoi(optarg) <= 0)
+				{
+					fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				dbport = xstrdup(optarg);
+				break;
+			case 'U':
+				dbuser = xstrdup(optarg);
+				break;
+			case 'w':
+				dbgetpassword = -1;
+				break;
+			case 'W':
+				dbgetpassword = 1;
+				break;
+			case 'v':
+				verbose++;
+				break;
+			default:
+
+				/*
+				 * getopt_long already emitted a complaint
+				 */
+				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+						progname);
+				exit(1);
+		}
+	}
+
+	/*
+	 * Any non-option arguments?
+	 */
+	if (optind < argc)
+	{
+		fprintf(stderr,
+				_("%s: too many command-line arguments (first is \"%s\")\n"),
+				progname, argv[optind]);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	/*
+	 * Required arguments
+	 */
+	if (basedir == NULL)
+	{
+		fprintf(stderr, _("%s: no target directory specified\n"), progname);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	StreamLog();
+
+	exit(0);
+}
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
new file mode 100644
index 0000000..3be9692
--- /dev/null
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -0,0 +1,207 @@
+/*-------------------------------------------------------------------------
+ *
+ * receivelog.c - receive transaction log files using the streaming
+ *				  replication protocol.
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/receivelog.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include "libpq-fe.h"
+
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "receivelog.h"
+#include "streamutil.h"
+
+/* XXX: from xlog_internal.h */
+#define MAXFNAMELEN		64
+#define XLogFileName(fname, tli, log, seg)	\
+	snprintf(fname, MAXFNAMELEN, "%08X%08X%08X", tli, log, seg)
+
+/* Size of the streaming replication protocol header */
+#define STREAMING_HEADER_SIZE (1+8+8+8)
+
+/*
+ * Open a new WAL file in the specified directory. Store the name
+ * (not including the full directory) in namebuf. Assumes there is
+ * enough room in this buffer...
+ */
+static int
+open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
+{
+	int			f;
+	char		fn[MAXPGPATH];
+
+	XLogFileName(namebuf, timeline, startpoint.xlogid,
+				 startpoint.xrecoff / XLOG_SEG_SIZE);
+
+	snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
+	f = open(fn, O_WRONLY | O_CREAT | O_EXCL, 0666);
+	if (f == -1)
+		fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
+				progname, namebuf, strerror(errno));
+	return f;
+}
+
+/*
+ * Receive a log stream starting at the specified position.
+ *
+ * Note: The log position *must* be at a log segment change, or we will
+ * end up streaming an incomplete file.
+ */
+bool
+ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, segment_finish_callback segment_finish)
+{
+	char		query[128];
+	char		current_walfile_name[MAXPGPATH];
+	PGresult   *res;
+	char	   *copybuf = NULL;
+	int			walfile = -1;
+
+	/* Initiate the replication stream at specified location */
+	snprintf(query, sizeof(query), "START_REPLICATION %X/%X", startpos.xlogid, startpos.xrecoff);
+	res = PQexec(conn, query);
+	if (PQresultStatus(res) != PGRES_COPY_BOTH)
+	{
+		fprintf(stderr, _("%s: could not start replication: %s\n"),
+				progname, PQresultErrorMessage(res));
+		return false;
+	}
+	PQclear(res);
+
+	/*
+	 * Receive the actual xlog data
+	 */
+	while (1)
+	{
+		XLogRecPtr	blockstart;
+		int			r;
+		int			xlogoff;
+
+		if (copybuf != NULL)
+		{
+			PQfreemem(copybuf);
+			copybuf = NULL;
+		}
+
+		r = PQgetCopyData(conn, &copybuf, 0);
+		if (r == -1)
+			/* End of copy stream */
+			break;
+		if (r == -2)
+		{
+			fprintf(stderr, _("%s: could not read copy data: %s\n"),
+					progname, PQerrorMessage(conn));
+			return false;
+		}
+		if (r < STREAMING_HEADER_SIZE + 1)
+		{
+			fprintf(stderr, _("%s: streaming header too small: %i\n"),
+					progname, r);
+			return false;
+		}
+		if (copybuf[0] != 'w')
+		{
+			fprintf(stderr, _("%s: streaming header corrupt: \"%c\"\n"),
+					progname, copybuf[0]);
+			return false;
+		}
+
+		/* Extract WAL location for this block */
+		memcpy(&blockstart, copybuf + 1, 8);
+
+		xlogoff = blockstart.xrecoff % XLOG_SEG_SIZE;
+
+		if (walfile == -1)
+		{
+			/* No file open yet */
+			if (xlogoff != 0)
+			{
+				fprintf(stderr, _("%s: received xlog record for offset %u with no file open\n"),
+						progname, xlogoff);
+				return false;
+			}
+			walfile = open_walfile(blockstart, timeline,
+								   basedir, current_walfile_name);
+			if (walfile == -1)
+				return false;
+		}
+		else
+		{
+			/* More data in existing segment */
+			/* XXX: store seek value don't reseek all the time */
+			if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+			{
+				fprintf(stderr, _("%s: got WAL data offset %i, expected %i\n"),
+						progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+				return false;
+			}
+			/* Position matches, write happens lower down */
+		}
+
+		/* We have a file open in the correct position */
+		if (write(walfile, copybuf + STREAMING_HEADER_SIZE,
+				  r - STREAMING_HEADER_SIZE) != r - STREAMING_HEADER_SIZE)
+		{
+			fprintf(stderr, _("%s: could not write %u bytes to WAL file %s: %s\n"),
+					progname,
+					r - STREAMING_HEADER_SIZE,
+					current_walfile_name,
+					strerror(errno));
+			return false;
+		}
+
+		/* XXX: callback after each write */
+
+		/* Check if we are at the end of a segment */
+		if (lseek(walfile, 0, SEEK_CUR) == XLOG_SEG_SIZE)
+		{
+			/* Offset zero in new file, close and sync the old one */
+			fsync(walfile);
+			close(walfile);
+			walfile = -1;
+
+			if (segment_finish != NULL)
+			{
+				/*
+				 * Callback when the segment finished, and return if it told
+				 * us to.
+				 *
+				 * A block in the wal stream can never cross a segment
+				 * boundary, so we can safely just add the current block size
+				 * to the offset, so the xlog pointer points to what we have
+				 * actually written.
+				 */
+				blockstart.xrecoff += r - STREAMING_HEADER_SIZE;
+				if (segment_finish(blockstart, timeline))
+					return true;
+			}
+		}
+	}
+
+	/*
+	 * The only way to get out of the loop is if the server shut down the
+	 * replication stream. If it's a controlled shutdown, the server will send
+	 * a shutdown message, and we'll return the latest xlog location that has
+	 * been streamed.
+	 */
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, _("%s: unexpected termination of replication stream: %s\n"),
+				progname, PQresultErrorMessage(res));
+		return false;
+	}
+	PQclear(res);
+	return true;
+}
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
new file mode 100644
index 0000000..ae34dd6
--- /dev/null
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -0,0 +1,13 @@
+#include "access/xlogdefs.h"
+
+/*
+ * Called whenever a segment is finished, return true to stop
+ * the streaming at this point.
+ */
+typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline);
+
+bool ReceiveXlogStream(PGconn *conn,
+					   XLogRecPtr startpos,
+					   uint32 timeline,
+					   char *basedir,
+					   segment_finish_callback segment_finish);
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
new file mode 100644
index 0000000..9f5c36f
--- /dev/null
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -0,0 +1,160 @@
+/*-------------------------------------------------------------------------
+ *
+ * streamutil.c - utility functions for pg_basebackup and pg_receivelog
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/streamutil.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <stdio.h>
+#include <string.h>
+
+#include "streamutil.h"
+
+const char *progname;
+char	   *dbhost = NULL;
+char	   *dbuser = NULL;
+char	   *dbport = NULL;
+int			dbgetpassword = 0;	/* 0=auto, -1=never, 1=always */
+static char *dbpassword = NULL;
+PGconn	   *conn = NULL;
+
+/*
+ * strdup() and malloc() replacements that prints an error and exits
+ * if something goes wrong. Can never return NULL.
+ */
+char *
+xstrdup(const char *s)
+{
+	char	   *result;
+
+	result = strdup(s);
+	if (!result)
+	{
+		fprintf(stderr, _("%s: out of memory\n"), progname);
+		exit(1);
+	}
+	return result;
+}
+
+void *
+xmalloc0(int size)
+{
+	void	   *result;
+
+	result = malloc(size);
+	if (!result)
+	{
+		fprintf(stderr, _("%s: out of memory\n"), progname);
+		exit(1);
+	}
+	MemSet(result, 0, size);
+	return result;
+}
+
+
+PGconn *
+GetConnection(void)
+{
+	PGconn	   *tmpconn;
+	int			argcount = 4;	/* dbname, replication, fallback_app_name,
+								 * password */
+	int			i;
+	const char **keywords;
+	const char **values;
+	char	   *password = NULL;
+
+	if (dbhost)
+		argcount++;
+	if (dbuser)
+		argcount++;
+	if (dbport)
+		argcount++;
+
+	keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
+	values = xmalloc0((argcount + 1) * sizeof(*values));
+
+	keywords[0] = "dbname";
+	values[0] = "replication";
+	keywords[1] = "replication";
+	values[1] = "true";
+	keywords[2] = "fallback_application_name";
+	values[2] = progname;
+	i = 3;
+	if (dbhost)
+	{
+		keywords[i] = "host";
+		values[i] = dbhost;
+		i++;
+	}
+	if (dbuser)
+	{
+		keywords[i] = "user";
+		values[i] = dbuser;
+		i++;
+	}
+	if (dbport)
+	{
+		keywords[i] = "port";
+		values[i] = dbport;
+		i++;
+	}
+
+	while (true)
+	{
+		if (password)
+			free(password);
+
+		if (dbpassword)
+		{
+			/*
+			 * We've saved a password when a previous connection succeeded,
+			 * meaning this is the call for a second session to the same
+			 * database, so just forcibly reuse that password.
+			 */
+			keywords[argcount - 1] = "password";
+			values[argcount - 1] = dbpassword;
+			dbgetpassword = -1; /* Don't try again if this fails */
+		}
+		else if (dbgetpassword == 1)
+		{
+			password = simple_prompt(_("Password: "), 100, false);
+			keywords[argcount - 1] = "password";
+			values[argcount - 1] = password;
+		}
+
+		tmpconn = PQconnectdbParams(keywords, values, true);
+
+		if (PQstatus(tmpconn) == CONNECTION_BAD &&
+			PQconnectionNeedsPassword(tmpconn) &&
+			dbgetpassword != -1)
+		{
+			dbgetpassword = 1;	/* ask for password next time */
+			PQfinish(tmpconn);
+			continue;
+		}
+
+		if (PQstatus(tmpconn) != CONNECTION_OK)
+		{
+			fprintf(stderr, _("%s: could not connect to server: %s\n"),
+					progname, PQerrorMessage(tmpconn));
+			exit(1);
+		}
+
+		/* Connection ok! */
+		free(values);
+		free(keywords);
+
+		/* Store the password for next run */
+		if (password)
+			dbpassword = password;
+		return tmpconn;
+	}
+}
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
new file mode 100644
index 0000000..cef529a
--- /dev/null
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -0,0 +1,23 @@
+#include "access/xlogdefs.h"
+#include "libpq-fe.h"
+
+extern const char *progname;
+extern char *dbhost;
+extern char *dbuser;
+extern char *dbport;
+extern int	dbgetpassword;
+
+/* Connection kept global so we can disconnect easily */
+extern PGconn *conn;
+
+#define disconnect_and_exit(code)				\
+	{											\
+	if (conn != NULL) PQfinish(conn);			\
+	exit(code);									\
+	}
+
+
+char	   *xstrdup(const char *s);
+void	   *xmalloc0(int size);
+
+PGconn	   *GetConnection(void);
#2Bruce Momjian
bruce@momjian.us
In reply to: Magnus Hagander (#1)
Re: pg_basebackup and wal streaming

Magnus Hagander wrote:

Better late than never (or?), here's the final cleanup of
pg_streamrecv for moving into the main distribution, per discussion
back in late dec or early jan. It also includes the "stream logs in
parallel to backup" part that was not completed on pg_basebackup.

Other than that, the only changes to pg_basebackup are the moving of a
couple of functions into streamutil.c to make them usable from both,
and the progress format output fix Fujii-san mentioned recently.

Should be complete except for Win32 support (needs thread/fork thing
for the background streaming feature. Shouldn't be too hard, and I
guess that falls on me anyway..) and the reference documentation.

And with no feedback to my question here
(http://archives.postgresql.org/pgsql-hackers/2011-02/msg00805.php), I
went with the "duplicate the macros that are needed to avoid loading
postgres.h" path.

Yes, I realize that this is far too late in the CF process really, but
I wanted to post it anyway... If it's too late to be acceptable it
should be possible to maintain this outside the main repository until
9.2, since it only changes frontend binaries. So I'm not actually
going to put it on the CF page unless someone else says that's a good
idea, to at least share the blame from Robert ;)

Well, if you are going to stand behind it, the CF is not a requirement
and you can apply it.

--
Bruce Momjian <bruce@momjian.us> http://momjian.us
EnterpriseDB http://enterprisedb.com

+ It's impossible for everything to be true. +

#3Magnus Hagander
magnus@hagander.net
In reply to: Bruce Momjian (#2)
Re: pg_basebackup and wal streaming

On Fri, Feb 18, 2011 at 20:33, Bruce Momjian <bruce@momjian.us> wrote:

Magnus Hagander wrote:

Better late than never (or?), here's the final cleanup of
pg_streamrecv for moving into the main distribution, per discussion
back in late dec or early jan. It also includes the "stream logs in
parallel to backup" part that was not completed on pg_basebackup.

Other than that, the only changes to pg_basebackup are the moving of a
couple of functions into streamutil.c to make them usable from both,
and the progress format output fix Fujii-san mentioned recently.

Should be complete except for Win32 support (needs thread/fork thing
for the  background streaming feature. Shouldn't be too hard, and I
guess that falls on me anyway..) and the reference documentation.

And with no feedback to my question here
(http://archives.postgresql.org/pgsql-hackers/2011-02/msg00805.php), I
went with the "duplicate the macros that are needed to avoid loading
postgres.h" path.

Yes, I realize that this is far too late in the CF process really, but
I wanted to post it anyway... If it's too late to be acceptable it
should be possible to maintain this outside the main repository until
9.2, since it only changes frontend binaries. So I'm not actually
going to put it on the CF page unless someone else says that's a good
idea, to at least share the blame from Robert ;)

Well, if you are going to stand behind it, the CF is not a requirement
and you can apply it.

I am, but I'm posting it here because I'd appreciate some review
before it goes in, to protect our users from my bugs :-)

--
 Magnus Hagander
 Me: http://www.hagander.net/
 Work: http://www.redpill-linpro.com/

#4Dimitri Fontaine
dimitri@2ndQuadrant.fr
In reply to: Magnus Hagander (#1)
Re: pg_basebackup and wal streaming

Hi,

Magnus Hagander <magnus@hagander.net> writes:

Better late than never (or?), here's the final cleanup of
pg_streamrecv for moving into the main distribution, per discussion
back in late dec or early jan. It also includes the "stream logs in
parallel to backup" part that was not completed on pg_basebackup.

And that's something I've been so interested in! It's only fair game
that I spend time reviewing after my insisting for having it :)

The programs (pg_basebackup and pg_receivexlog) both work as expected,
and show in the pg_stat_replication system view.

pg_basebackup -x option should be revised so that it's easier to make
the difference between streaming WAL while the base backup is ongoing
and fetching them at the end, taking the risk to corrupt the whole
backup as soon as wal_keep_segments is undersized.

-x, --xlog[=stream] include required WAL files in backup

It could be --xlog=stream|fetch or something that reads better.

The sent patch includes a binary called pg_receivexlog\(X\), but Magnus
told me on IRC that this is fixed already in his branch (a missing $ at
several places in the Makefile).

Now, on to the code itself.

I wonder if the segment_callback() routine would better be a while{}
loop rather than a recursive construct. Also, it looks like a lib
function but it's doing exit(1)…

Unfortunately I can't comment (or won't risk learning enough details
tonight to try to be smart here) on FindStreamingStart() implementation,
that seems crucial.

Other than that, the only changes to pg_basebackup are the moving of a
couple of functions into streamutil.c to make them usable from both,
and the progress format output fix Fujii-san mentioned recently.

Check.

Should be complete except for Win32 support (needs thread/fork thing
for the background streaming feature. Shouldn't be too hard, and I
guess that falls on me anyway..) and the reference documentation.

Yeah, StartLogStreamer() is still using fork() at the moment, I guess
you will have to change that prior to commit. Maybe you can reuse the
code found in src/bin/pg_dump/pg_backup_archiver.c, spawn_restore.

I can't wait to see the new streaming replication setup docs using that
integrated tool. Even if baring another step here, we still need to
rely on wal_keep_segments (for switching from the base backup to the
live standby), the critical window is so much reduced… and it's now
possible to prepare the standby using a single integrated command line.

Will the server refrain from recycling a WAL file when all receivers
sent_location are not known to be past the positions contained in it?
If that's the case, the documentation should talk about pg_receivexlog
as an alternative to archiving, relying on libpq. It that's not the
case, is there a good reason for that not being the case? (even if
that's not on this patch to fix that).

Regards,
--
Dimitri Fontaine
http://2ndQuadrant.fr PostgreSQL : Expertise, Formation et Support

#5Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Magnus Hagander (#1)
Re: pg_basebackup and wal streaming

On 18.02.2011 12:02, Magnus Hagander wrote:

Better late than never (or?), here's the final cleanup of
pg_streamrecv for moving into the main distribution, per discussion
back in late dec or early jan. It also includes the "stream logs in
parallel to backup" part that was not completed on pg_basebackup.

Looks reasonable at a quick glance.

+ /* Already know when to stop, compare to the position we got */

That sentence sounds broken.

+ * The background stream will use it's own database connection so we can

s/it's/its/

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com

#6Yeb Havinga
yebhavinga@gmail.com
In reply to: Dimitri Fontaine (#4)
Re: pg_basebackup and wal streaming

On 2011-02-20 21:37, Dimitri Fontaine wrote:

Hi,

Magnus Hagander<magnus@hagander.net> writes:

Better late than never (or?), here's the final cleanup of
pg_streamrecv for moving into the main distribution, per discussion
back in late dec or early jan. It also includes the "stream logs in
parallel to backup" part that was not completed on pg_basebackup.

I can't wait to see the new streaming replication setup docs using that
integrated tool.

I just did some initial playing around with this tool to start testing
the latest syncrep patch. I'm time boxed for today, but just wanted to
say: great tool.

mgrid@standby1:~/off/postgresql$ pg_basebackup -x -D /data -vP -h
192.168.73.34
xlog start point: 0/7000020
34537/18152 kb g(100%) 1/1 tablespaces
xlog end point: 0/7014740
pg_basebackup: base backup completed.
mgrid@standby1:~/off/postgresql$ pg_ctl -D /data -l logfile start
server starting
mgrid@standby1:~/off/postgresql$ psql postgres
psql (9.1devel)
Type "help" for help.

postgres=# \d
List of relations
Schema | Name | Type | Owner
--------+------+-------+-------
public | aap | table | mgrid
(1 row)

regards,
Yeb Havinga

#7Magnus Hagander
magnus@hagander.net
In reply to: Dimitri Fontaine (#4)
Re: pg_basebackup and wal streaming

On Sun, Feb 20, 2011 at 21:37, Dimitri Fontaine <dimitri@2ndquadrant.fr> wrote:

Hi,

Magnus Hagander <magnus@hagander.net> writes:

Better late than never (or?), here's the final cleanup of
pg_streamrecv for moving into the main distribution, per discussion
back in late dec or early jan. It also includes the "stream logs in
parallel to backup" part that was not completed on pg_basebackup.

And that's something I've been so interested in!  It's only fair game
that I spend time reviewing after my insisting for having it :)

The programs (pg_basebackup and pg_receivexlog) both work as expected,
and show in the pg_stat_replication system view.

pg_basebackup -x option should be revised so that it's easier to make
the difference between streaming WAL while the base backup is ongoing
and fetching them at the end, taking the risk to corrupt the whole
backup as soon as wal_keep_segments is undersized.

 -x, --xlog[=stream]       include required WAL files in backup

It could be --xlog=stream|fetch or something that reads better.

Yeha, that's probably true. I wanted to avoid making it mandatory, but
it's actually easier this way. Will change it to that.

Now, on to the code itself.

I wonder if the segment_callback() routine would better be a while{}
loop rather than a recursive construct.  Also, it looks like a lib
function but it's doing exit(1)…

Actually, it's even better to just reorder the checks in the other
order - that way we don't need a loop *or* a self-call.

Unfortunately I can't comment (or won't risk learning enough details
tonight to try to be smart here) on FindStreamingStart() implementation,
that seems crucial.

It is - so if you can find the time to, that would be great...

Will the server refrain from recycling a WAL file when all receivers
sent_location are not known to be past the positions contained in it?
If that's the case, the documentation should talk about pg_receivexlog
as an alternative to archiving, relying on libpq.  It that's not the
case, is there a good reason for that not being the case? (even if
that's not on this patch to fix that).

No, not at this point. It would be nice to have that option in the future...

--
 Magnus Hagander
 Me: http://www.hagander.net/
 Work: http://www.redpill-linpro.com/

#8Magnus Hagander
magnus@hagander.net
In reply to: Magnus Hagander (#7)
1 attachment(s)
Re: pg_basebackup and wal streaming

On Sat, Feb 26, 2011 at 16:28, Magnus Hagander <magnus@hagander.net> wrote:

On Sun, Feb 20, 2011 at 21:37, Dimitri Fontaine <dimitri@2ndquadrant.fr> wrote:

Hi,

Magnus Hagander <magnus@hagander.net> writes:

Better late than never (or?), here's the final cleanup of
pg_streamrecv for moving into the main distribution, per discussion
back in late dec or early jan. It also includes the "stream logs in
parallel to backup" part that was not completed on pg_basebackup.

And that's something I've been so interested in!  It's only fair game
that I spend time reviewing after my insisting for having it :)

The programs (pg_basebackup and pg_receivexlog) both work as expected,
and show in the pg_stat_replication system view.

pg_basebackup -x option should be revised so that it's easier to make
the difference between streaming WAL while the base backup is ongoing
and fetching them at the end, taking the risk to corrupt the whole
backup as soon as wal_keep_segments is undersized.

 -x, --xlog[=stream]       include required WAL files in backup

It could be --xlog=stream|fetch or something that reads better.

Yeha, that's probably true. I wanted to avoid making it mandatory, but
it's actually easier this way. Will change it to that.

Now, on to the code itself.

I wonder if the segment_callback() routine would better be a while{}
loop rather than a recursive construct.  Also, it looks like a lib
function but it's doing exit(1)…

Actually, it's even better to just reorder the checks in the other
order - that way we don't need a loop *or* a self-call.

Unfortunately I can't comment (or won't risk learning enough details
tonight to try to be smart here) on FindStreamingStart() implementation,
that seems crucial.

It is - so if you can find the time to, that would be great...

Attached is an updated version of the patch that includes these
changes, as well as Windows support and an initial cut at a ref page
for pg_receivexlog (needs some more detail still).

--
 Magnus Hagander
 Me: http://www.hagander.net/
 Work: http://www.redpill-linpro.com/

Attachments:

streaming_xlog.patchtext/x-patch; charset=US-ASCII; name=streaming_xlog.patchDownload
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index ac6ac5b..c91824a 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -172,6 +172,7 @@ Complete list of usable sgml source files in this directory.
 <!entity pgCtl              system "pg_ctl-ref.sgml">
 <!entity pgDump             system "pg_dump.sgml">
 <!entity pgDumpall          system "pg_dumpall.sgml">
+<!entity pgReceivexlog      system "pg_receivexlog.sgml">
 <!entity pgResetxlog        system "pg_resetxlog.sgml">
 <!entity pgRestore          system "pg_restore.sgml">
 <!entity postgres           system "postgres-ref.sgml">
diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index bbca5f5..566f506 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -144,8 +144,8 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
-      <term><option>-x</option></term>
-      <term><option>--xlog</option></term>
+      <term><option>-x <replaceable class="parameter">method</replaceable></option></term>
+      <term><option>--xlog=<replaceable class="parameter">method</replaceable></option></term>
       <listitem>
        <para>
         Includes the required transaction log files (WAL files) in the
@@ -155,16 +155,43 @@ PostgreSQL documentation
         to consult the log archive, thus making this a completely standalone
         backup.
        </para>
-       <note>
-        <para>
-         The transaction log files are collected at the end of the backup.
-         Therefore, it is necessary for the
-         <xref linkend="guc-wal-keep-segments"> parameter to be set high
-         enough that the log is not removed before the end of the backup.
-         If the log has been rotated when it's time to transfer it, the
-         backup will fail and be unusable.
-        </para>
-       </note>
+       <para>
+        The following methods for collecting the transaction logs are
+        supported:
+
+        <variablelist>
+         <varlistentry>
+          <term><literal>f</literal></term>
+          <term><literal>fetch</literal></term>
+          <listitem>
+           <para>
+            The transaction log files are collected at the end of the backup.
+            Therefore, it is necessary for the
+            <xref linkend="guc-wal-keep-segments"> parameter to be set high
+             enough that the log is not removed before the end of the backup.
+             If the log has been rotated when it's time to transfer it, the
+             backup will fail and be unusable.
+           </para>
+          </listitem>
+         </varlistentry>
+
+         <varlistentry>
+          <term><literal>s</literal></term>
+          <term><literal>stream</literal></term>
+          <listitem>
+           <para>
+            Stream the transaction log while the backup is created. This will
+            open a second connection to the server and start streaming the
+            transaction log in parallel while running the backup. Therefore,
+            it will use up two slots configured by the
+            <xref linkend="guc-max-wal-senders"> parameter. As long as the
+             client can keep up with transaction log received, using this mode
+             requires no extra transaction logs to be saved on the master.
+           </para>
+          </listitem>
+         </varlistentry>
+        </variablelist>
+       </para>
       </listitem>
      </varlistentry>
 
diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml
new file mode 100644
index 0000000..fc07ae1
--- /dev/null
+++ b/doc/src/sgml/ref/pg_receivexlog.sgml
@@ -0,0 +1,257 @@
+<!--
+doc/src/sgml/ref/pg_receivexlog.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="app-pgreceivexlog">
+ <refmeta>
+  <refentrytitle>pg_receivexlog</refentrytitle>
+  <manvolnum>1</manvolnum>
+  <refmiscinfo>Application</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+  <refname>pg_receivexlog</refname>
+  <refpurpose>streams transaction logs from a <productname>PostgreSQL</productname> cluster</refpurpose>
+ </refnamediv>
+
+ <indexterm zone="app-pgreceivexlog">
+  <primary>pg_receivexlog</primary>
+ </indexterm>
+
+ <refsynopsisdiv>
+  <cmdsynopsis>
+   <command>pg_receivexlog</command>
+   <arg rep="repeat"><replaceable>option</></arg>
+  </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>
+   Description
+  </title>
+  <para>
+   <application>pg_receivexlog</application> is used to stream transaction log
+   from a running <productname>PostgreSQL</productname> cluster. The transaction
+   log is streamed using the streaming replication protocol, and is written
+   to a local directory of files. This directory can be used as the archive
+   location for doing a restore using point-in-time recovery (see
+   <xref linkend="continuous-archiving">).
+  </para>
+
+  <para>
+   <application>pg_receivexlog</application> streams the transaction
+   log in real time as it's being generated on the server, and does not wait
+   for segments to complete like <xref linkend="guc-archive-command"> does.
+   For this reason, it is not necessary to set
+   <xref linkend="guc-archive-timeout"> when using
+    <application>pg_receivexlog</application>.
+  </para>
+
+  <para>
+   The transaction log is streamed over a regular
+   <productname>PostgreSQL</productname> connection, and uses the
+   replication protocol. The connection must be
+   made with a user having <literal>REPLICATION</literal> permissions (see
+   <xref linkend="role-attributes">), and the user must be granted explicit
+   permissions in <filename>pg_hba.conf</filename>. The server must also
+   be configured with <xref linkend="guc-max-wal-senders"> set high enough
+   to leave at least one session available for the stream.
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Options</title>
+
+   <para>
+    The following command-line options control the location and format of the
+    output.
+
+    <variablelist>
+     <varlistentry>
+      <term><option>-D <replaceable class="parameter">directory</replaceable></option></term>
+      <term><option>--dir=<replaceable class="parameter">directory</replaceable></option></term>
+      <listitem>
+       <para>
+        Directory to write the output to.
+       </para>
+       <para>
+        This parameter is required.
+       </para>
+      </listitem>
+     </varlistentry>
+    </variablelist>
+   </para>
+   <para>
+    The following command-line options control the running of the program.
+
+    <variablelist>
+     <varlistentry>
+      <term><option>-v</option></term>
+      <term><option>--verbose</option></term>
+      <listitem>
+       <para>
+        Enables verbose mode.
+       </para>
+      </listitem>
+     </varlistentry>
+
+    </variablelist>
+   </para>
+
+   <para>
+    The following command-line options control the database connection parameters.
+
+    <variablelist>
+     <varlistentry>
+      <term><option>-h <replaceable class="parameter">host</replaceable></option></term>
+      <term><option>--host=<replaceable class="parameter">host</replaceable></option></term>
+      <listitem>
+       <para>
+        Specifies the host name of the machine on which the server is
+        running.  If the value begins with a slash, it is used as the
+        directory for the Unix domain socket. The default is taken
+        from the <envar>PGHOST</envar> environment variable, if set,
+        else a Unix domain socket connection is attempted.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-p <replaceable class="parameter">port</replaceable></option></term>
+      <term><option>--port=<replaceable class="parameter">port</replaceable></option></term>
+      <listitem>
+       <para>
+        Specifies the TCP port or local Unix domain socket file
+        extension on which the server is listening for connections.
+        Defaults to the <envar>PGPORT</envar> environment variable, if
+        set, or a compiled-in default.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-U <replaceable>username</replaceable></option></term>
+      <term><option>--username=<replaceable class="parameter">username</replaceable></option></term>
+      <listitem>
+       <para>
+        User name to connect as.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-w</></term>
+      <term><option>--no-password</></term>
+      <listitem>
+       <para>
+        Never issue a password prompt.  If the server requires
+        password authentication and a password is not available by
+        other means such as a <filename>.pgpass</filename> file, the
+        connection attempt will fail.  This option can be useful in
+        batch jobs and scripts where no user is present to enter a
+        password.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
+      <term><option>-W</option></term>
+      <term><option>--password</option></term>
+      <listitem>
+       <para>
+        Force <application>pg_basebackup</application> to prompt for a
+        password before connecting to a database.
+       </para>
+
+       <para>
+        This option is never essential, since
+        <application>pg_bsaebackup</application> will automatically prompt
+        for a password if the server demands password authentication.
+        However, <application>pg_basebackup</application> will waste a
+        connection attempt finding out that the server wants a password.
+        In some cases it is worth typing <option>-W</> to avoid the extra
+        connection attempt.
+       </para>
+      </listitem>
+     </varlistentry>
+    </variablelist>
+   </para>
+
+   <para>
+    Other, less commonly used, parameters are also available:
+
+    <variablelist>
+     <varlistentry>
+       <term><option>-V</></term>
+       <term><option>--version</></term>
+       <listitem>
+       <para>
+       Print the <application>pg_receivexlog</application> version and exit.
+       </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
+       <term><option>-?</></term>
+       <term><option>--help</></term>
+       <listitem>
+       <para>
+       Show help about <application>pg_receivexlog</application> command line
+       arguments, and exit.
+       </para>
+       </listitem>
+     </varlistentry>
+
+    </variablelist>
+   </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Environment</title>
+
+  <para>
+   This utility, like most other <productname>PostgreSQL</> utilities,
+   uses the environment variables supported by <application>libpq</>
+   (see <xref linkend="libpq-envars">).
+  </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Notes</title>
+
+  <para>
+   When using <application>pg_receivexlog</application> instead of
+   <xref linkend="guc-archive-command">, the server will continue to
+    recycle transaction log files even if the backups are not properly
+    archived, since there is no command that fails. This can be worked
+    around by having an <xref linkend="guc-archive-command"> that fails
+    when the file has not been properly archived yet.
+  </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Examples</title>
+
+  <para>
+   To stream the transaction log from the server at
+   <literal>mydbserver</literal> and store it in the local directory
+   <filename>/usr/local/pgsql/archive</filename>:
+   <screen>
+    <prompt>$</prompt> <userinput>pg_receivexlog -h mydbserver -D /usr/local/pgsql/archive</userinput>
+   </screen>
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>See Also</title>
+
+  <simplelist type="inline">
+   <member><xref linkend="APP-PGBASEBACKUP"></member>
+  </simplelist>
+ </refsect1>
+
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index 9ae8000..91d9820 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -212,6 +212,7 @@
    &pgConfig;
    &pgDump;
    &pgDumpall;
+   &pgReceivexlog;
    &pgRestore;
    &psqlRef;
    &reindexdb;
diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index ccb1502..38c9b74 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -18,21 +18,26 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 
-OBJS=	pg_basebackup.o $(WIN32RES)
+OBJS=receivelog.o streamutil.o $(WIN32RES)
 
-all: pg_basebackup
+all: pg_basebackup pg_receivexlog
 
-pg_basebackup: $(OBJS) | submake-libpq submake-libpgport
-	$(CC) $(CFLAGS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport
+	$(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
+pg_receivexlog: pg_receivexlog.o $(OBJS) | submake-libpq submake-libpgport
+	$(CC) $(CFLAGS) pg_receivexlog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
 
 install: all installdirs
 	$(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+	$(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog(X)'
 
 installdirs:
 	$(MKDIR_P) '$(DESTDIR)$(bindir)'
 
 uninstall:
 	rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+	rm -f '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
 
 clean distclean maintainer-clean:
-	rm -f pg_basebackup$(X) $(OBJS)
+	rm -f pg_basebackup$(X) pg_receivexlog$(X) $(OBJS) pg_basebackup.o pg_receivexlog.o
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 61aa1d3..e56bbaa 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -17,6 +17,8 @@
 #include <unistd.h>
 #include <dirent.h>
 #include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/wait.h>
 
 #ifdef HAVE_LIBZ
 #include <zlib.h>
@@ -24,9 +26,11 @@
 
 #include "getopt_long.h"
 
+#include "receivelog.h"
+#include "streamutil.h"
+
 
 /* Global options */
-static const char *progname;
 char	   *basedir = NULL;
 char		format = 'p';		/* p(lain)/t(ar) */
 char	   *label = "pg_basebackup base backup";
@@ -34,38 +38,37 @@ bool		showprogress = false;
 int			verbose = 0;
 int			compresslevel = 0;
 bool		includewal = false;
+bool		streamwal = false;
 bool		fastcheckpoint = false;
-char	   *dbhost = NULL;
-char	   *dbuser = NULL;
-char	   *dbport = NULL;
-int			dbgetpassword = 0;	/* 0=auto, -1=never, 1=always */
 
 /* Progress counters */
 static uint64 totalsize;
 static uint64 totaldone;
 static int	tablespacecount;
 
-/* Connection kept global so we can disconnect easily */
-static PGconn *conn = NULL;
+/* Pipe to communicate with background wal receiver process */
+#ifndef WIN32
+static int	bgpipe[2] = {-1, -1};
+#endif
 
-#define disconnect_and_exit(code)				\
-	{											\
-	if (conn != NULL) PQfinish(conn);			\
-	exit(code);									\
-	}
+/* Handle to child process */
+static pid_t bgchild = -1;
+
+/* End position for xlog streaming, empty string if unknown yet */
+static XLogRecPtr xlogendptr;
+static int	has_xlogendptr = 0;
 
 /* Function headers */
-static char *xstrdup(const char *s);
-static void *xmalloc0(int size);
 static void usage(void);
 static void verify_dir_is_empty_or_create(char *dirname);
 static void progress_report(int tablespacenum, char *fn);
-static PGconn *GetConnection(void);
 
 static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
 static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
 static void BaseBackup();
 
+static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+
 #ifdef HAVE_LIBZ
 static const char *
 get_gz_error(gzFile *gzf)
@@ -81,39 +84,6 @@ get_gz_error(gzFile *gzf)
 }
 #endif
 
-/*
- * strdup() and malloc() replacements that prints an error and exits
- * if something goes wrong. Can never return NULL.
- */
-static char *
-xstrdup(const char *s)
-{
-	char	   *result;
-
-	result = strdup(s);
-	if (!result)
-	{
-		fprintf(stderr, _("%s: out of memory\n"), progname);
-		exit(1);
-	}
-	return result;
-}
-
-static void *
-xmalloc0(int size)
-{
-	void	   *result;
-
-	result = malloc(size);
-	if (!result)
-	{
-		fprintf(stderr, _("%s: out of memory\n"), progname);
-		exit(1);
-	}
-	MemSet(result, 0, size);
-	return result;
-}
-
 
 static void
 usage(void)
@@ -125,7 +95,7 @@ usage(void)
 	printf(_("\nOptions controlling the output:\n"));
 	printf(_("  -D, --pgdata=directory    receive base backup into directory\n"));
 	printf(_("  -F, --format=p|t          output format (plain, tar)\n"));
-	printf(_("  -x, --xlog                include required WAL files in backup\n"));
+	printf(_("  -x, --xlog=fetch|stream   include required WAL files in backup\n"));
 	printf(_("  -Z, --compress=0-9        compress tar output\n"));
 	printf(_("\nGeneral options:\n"));
 	printf(_("  -c, --checkpoint=fast|spread\n"
@@ -146,6 +116,195 @@ usage(void)
 
 
 /*
+ * Called in the background process whenever a complete segment of WAL
+ * has been received.
+ * On Unix, we check to see if there is any data on our pipe
+ * (which would mean we have a stop position), and if it is, check if
+ * it is time to stop.
+ * On Windows, we are in a single process, so we can just check if it's
+ * time to stop.
+ */
+static bool
+segment_callback(XLogRecPtr segendpos, uint32 timeline)
+{
+	if (!has_xlogendptr)
+	{
+#ifndef WIN32
+		fd_set		fds;
+		struct timeval tv;
+		int			r;
+
+		/*
+		 * Don't have the end pointer yet - check our pipe to see if it has
+		 * been sent yet.
+		 */
+		FD_ZERO(&fds);
+		FD_SET(bgpipe[0], &fds);
+
+		MemSet(&tv, 0, sizeof(tv));
+
+		r = select(bgpipe[0] + 1, &fds, NULL, NULL, &tv);
+		if (r == 1)
+		{
+			char		xlogend[64];
+
+			MemSet(xlogend, 0, sizeof(xlogend));
+			r = piperead(bgpipe[0], xlogend, sizeof(xlogend));
+			if (r < 0)
+			{
+				fprintf(stderr, _("%s: could not read from ready pipe: %s\n"),
+						progname, strerror(errno));
+				exit(1);
+			}
+
+			if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2)
+			{
+				fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"),
+						progname, xlogend);
+				exit(1);
+			}
+			has_xlogendptr = 1;
+
+			/*
+			 * Fall through to check if we've reached the point further
+			 * already.
+			 */
+		}
+		else
+		{
+			/*
+			 * No data received on the pipe means we don't know the end
+			 * position yet - so just say it's not time to stop yet.
+			 */
+			return false;
+		}
+#else
+
+		/*
+		 * On win32, has_xlogendptr is set by the main thread, so if it's not
+		 * set here, we just go back and wait until it shows up.
+		 */
+		return false;
+#endif
+	}
+
+	/*
+	 * At this point we have an end pointer, so compare it to the current
+	 * position to figure out if it's time to stop.
+	 */
+	if (segendpos.xlogid > xlogendptr.xlogid ||
+		(segendpos.xlogid == xlogendptr.xlogid &&
+		 segendpos.xrecoff >= xlogendptr.xrecoff))
+		return true;
+
+	/*
+	 * Have end pointer, but haven't reached it yet - so tell the caller to
+	 * keep streaming.
+	 */
+	return false;
+}
+
+typedef struct
+{
+	PGconn	   *bgconn;
+	XLogRecPtr	startptr;
+	char		xlogdir[MAXPGPATH];
+	int			timeline;
+}	logstreamer_param;
+
+static int
+LogStreamerMain(logstreamer_param * param)
+{
+	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->xlogdir, segment_callback))
+
+		/*
+		 * Any errors will already have been reported in the function process,
+		 * but we need to tell the parent that we didn't shutdown in a nice
+		 * way.
+		 */
+		return 1;
+
+	PQfinish(param->bgconn);
+	return 0;
+}
+
+/*
+ * Initiate background process for receiving xlog during the backup.
+ * The background stream will use its own database connection so we can
+ * stream the logfile in parallel with the backups.
+ */
+static void
+StartLogStreamer(char *startpos, uint32 timeline)
+{
+	logstreamer_param *param;
+
+	param = xmalloc0(sizeof(logstreamer_param));
+	param->timeline = timeline;
+
+	/* Convert the starting position */
+	if (sscanf(startpos, "%X/%X", &param->startptr.xlogid, &param->startptr.xrecoff) != 2)
+	{
+		fprintf(stderr, _("%s: invalid format of xlog location: %s\n"),
+				progname, startpos);
+		disconnect_and_exit(1);
+	}
+	/* Round off to even segment position */
+	param->startptr.xrecoff -= param->startptr.xrecoff % XLOG_SEG_SIZE;
+
+#ifndef WIN32
+	/* Create our background pipe */
+	if (pgpipe(bgpipe) < 0)
+	{
+		fprintf(stderr, _("%s: could not create pipe for background process: %s\n"),
+				progname, strerror(errno));
+		disconnect_and_exit(1);
+	}
+#endif
+
+	/* Get a second connection */
+	param->bgconn = GetConnection();
+
+	/*
+	 * Always in plain format, so we can write to basedir/pg_xlog. But the
+	 * directory entry in the tar file may arrive later, so make sure it's
+	 * created before we start.
+	 */
+	snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
+	verify_dir_is_empty_or_create(param->xlogdir);
+
+	/*
+	 * Start a child process and tell it to start streaming. On Unix, this is
+	 * a fork(). On Windows, we create a thread.
+	 */
+#ifndef WIN32
+	bgchild = fork();
+	if (bgchild == 0)
+	{
+		/* in child process */
+		exit(LogStreamerMain(param));
+	}
+	else if (bgchild < 0)
+	{
+		fprintf(stderr, _("%s: could not create background process: %s\n"),
+				progname, strerror(errno));
+		disconnect_and_exit(1);
+	}
+
+	/*
+	 * Else we are in the parent process and all is well.
+	 */
+#else							/* WIN32 */
+	bgchild = _beginthreadex(NULL, 0, (void *) LogStreamerMain, param, 0, NULL);
+	if (bgchild == 0)
+	{
+		fprintf(stderr, _("%s: could not create background thread: %s\n"),
+				progname, strerror(errno));
+		disconnect_and_exit(1);
+	}
+#endif
+}
+
+/*
  * Verify that the given directory exists and is empty. If it does not
  * exist, it is created. If it exists but is not empty, an error will
  * be give and the process ended.
@@ -202,13 +361,19 @@ verify_dir_is_empty_or_create(char *dirname)
 static void
 progress_report(int tablespacenum, char *fn)
 {
-	int percent = (int) ((totaldone / 1024) * 100 / totalsize);
+	int			percent = (int) ((totaldone / 1024) * 100 / totalsize);
+
 	if (percent > 100)
 		percent = 100;
 
-	if (verbose)
+	if (!fn)
+		fprintf(stderr,
+		INT64_FORMAT "/" INT64_FORMAT " kb g(100%%) %i/%i tablespaces %35s\r",
+				totaldone / 1024, totalsize,
+				tablespacenum, tablespacecount, "");
+	else if (verbose)
 		fprintf(stderr,
-				INT64_FORMAT "/" INT64_FORMAT " kB (%i%%) %i/%i tablespaces (%-30s)\r",
+				INT64_FORMAT "/" INT64_FORMAT " kB (%i%%) %i/%i tablespaces (%-30.30s)\r",
 				totaldone / 1024, totalsize,
 				percent,
 				tablespacenum, tablespacecount, fn);
@@ -443,11 +608,6 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 		strcpy(current_path, PQgetvalue(res, rownum, 1));
 
 	/*
-	 * Make sure we're unpacking into an empty directory
-	 */
-	verify_dir_is_empty_or_create(current_path);
-
-	/*
 	 * Get the COPY data
 	 */
 	res = PQgetResult(conn);
@@ -540,10 +700,18 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 					fn[strlen(fn) - 1] = '\0';	/* Remove trailing slash */
 					if (mkdir(fn, S_IRWXU) != 0)
 					{
-						fprintf(stderr,
-							_("%s: could not create directory \"%s\": %s\n"),
-								progname, fn, strerror(errno));
-						disconnect_and_exit(1);
+						/*
+						 * When streaming WAL, pg_xlog will have been created
+						 * by the wal receiver process, so just ignore failure
+						 * on that.
+						 */
+						if (!streamwal || strcmp(fn + strlen(fn) - 8, "/pg_xlog") != 0)
+						{
+							fprintf(stderr,
+									_("%s: could not create directory \"%s\": %s\n"),
+									progname, fn, strerror(errno));
+							disconnect_and_exit(1);
+						}
 					}
 #ifndef WIN32
 					if (chmod(fn, (mode_t) filemode))
@@ -654,90 +822,6 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 }
 
 
-static PGconn *
-GetConnection(void)
-{
-	PGconn	   *tmpconn;
-	int			argcount = 4;	/* dbname, replication, fallback_app_name,
-								 * password */
-	int			i;
-	const char **keywords;
-	const char **values;
-	char	   *password = NULL;
-
-	if (dbhost)
-		argcount++;
-	if (dbuser)
-		argcount++;
-	if (dbport)
-		argcount++;
-
-	keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
-	values = xmalloc0((argcount + 1) * sizeof(*values));
-
-	keywords[0] = "dbname";
-	values[0] = "replication";
-	keywords[1] = "replication";
-	values[1] = "true";
-	keywords[2] = "fallback_application_name";
-	values[2] = progname;
-	i = 3;
-	if (dbhost)
-	{
-		keywords[i] = "host";
-		values[i] = dbhost;
-		i++;
-	}
-	if (dbuser)
-	{
-		keywords[i] = "user";
-		values[i] = dbuser;
-		i++;
-	}
-	if (dbport)
-	{
-		keywords[i] = "port";
-		values[i] = dbport;
-		i++;
-	}
-
-	while (true)
-	{
-		if (dbgetpassword == 1)
-		{
-			/* Prompt for a password */
-			password = simple_prompt(_("Password: "), 100, false);
-			keywords[argcount - 1] = "password";
-			values[argcount - 1] = password;
-		}
-
-		tmpconn = PQconnectdbParams(keywords, values, true);
-		if (password)
-			free(password);
-
-		if (PQstatus(tmpconn) == CONNECTION_BAD &&
-			PQconnectionNeedsPassword(tmpconn) &&
-			dbgetpassword != -1)
-		{
-			dbgetpassword = 1;	/* ask for password next time */
-			PQfinish(tmpconn);
-			continue;
-		}
-
-		if (PQstatus(tmpconn) != CONNECTION_OK)
-		{
-			fprintf(stderr, _("%s: could not connect to server: %s\n"),
-					progname, PQerrorMessage(tmpconn));
-			exit(1);
-		}
-
-		/* Connection ok! */
-		free(values);
-		free(keywords);
-		return tmpconn;
-	}
-}
-
 static void
 BaseBackup()
 {
@@ -780,7 +864,7 @@ BaseBackup()
 	snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s %s",
 			 escaped_label,
 			 showprogress ? "PROGRESS" : "",
-			 includewal ? "WAL" : "",
+			 includewal && !streamwal ? "WAL" : "",
 			 fastcheckpoint ? "FAST" : "",
 	         includewal ? "NOWAIT" : "");
 
@@ -859,6 +943,18 @@ BaseBackup()
 	}
 
 	/*
+	 * If we're streaming WAL, start the streaming session before we start
+	 * receiving the actual data chunks.
+	 */
+	if (streamwal)
+	{
+		if (verbose)
+			fprintf(stderr, _("%s: starting background WAL receiver\n"),
+					progname);
+		StartLogStreamer(xlogstart, timeline);
+	}
+
+	/*
 	 * Start receiving chunks
 	 */
 	for (i = 0; i < PQntuples(res); i++)
@@ -871,7 +967,7 @@ BaseBackup()
 
 	if (showprogress)
 	{
-		progress_report(PQntuples(res), "");
+		progress_report(PQntuples(res), NULL);
 		fprintf(stderr, "\n");	/* Need to move to next line */
 	}
 	PQclear(res);
@@ -905,6 +1001,89 @@ BaseBackup()
 		disconnect_and_exit(1);
 	}
 
+	if (bgchild > 0)
+	{
+		int			status;
+		int			r;
+
+		if (verbose)
+			fprintf(stderr, _("%s: waiting for background process to finish streaming...\n"), progname);
+
+#ifndef WIN32
+		if (pipewrite(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend))
+		{
+			fprintf(stderr, _("%s: could not send command to background pipe: %s\n"),
+					progname, strerror(errno));
+			disconnect_and_exit(1);
+		}
+
+		/* Just wait for the background process to exit */
+		r = waitpid(bgchild, &status, 0);
+		if (r == -1)
+		{
+			fprintf(stderr, _("%s: could not wait for child process: %s\n"),
+					progname, strerror(errno));
+			disconnect_and_exit(1);
+		}
+		if (r != bgchild)
+		{
+			fprintf(stderr, _("%s: child %i died, expected %i\n"),
+					progname, r, bgchild);
+			disconnect_and_exit(1);
+		}
+		if (!WIFEXITED(status))
+		{
+			fprintf(stderr, _("%s: child process did not exit normally\n"),
+					progname);
+			disconnect_and_exit(1);
+		}
+		if (WEXITSTATUS(status) != 0)
+		{
+			fprintf(stderr, _("%s: child process exited with error %i\n"),
+					progname, WEXITSTATUS(status));
+			disconnect_and_exit(1);
+		}
+		/* Exited normally, we're happy! */
+#else							/* WIN32 */
+
+		/*
+		 * On Windows, since we are in the same thread, we can just store the
+		 * value directly in the variable, and then set the flag that says
+		 * it's there.
+		 */
+		if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2)
+		{
+			fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"),
+					progname, xlogend);
+			exit(1);
+		}
+		InterlockedIncrement(&has_xlogendptr);
+
+		/* First wait for the thread to exit */
+		if (WaitForSingleObjectEx((HANDLE) bgchild, INFINITE, FALSE) != WAIT_OBJECT_0)
+		{
+			_dosmaperr(GetLastError());
+			fprintf(stderr, _("%s: could not wait for child thread: %s\n"),
+					progname, strerror(errno));
+			disconnect_and_exit(1);
+		}
+		if (GetExitCodeThread((HANDLE) bgchild, &status) == 0)
+		{
+			_dosmaperr(GetLastError());
+			fprintf(stderr, _("%s: could not get child thread exit status: %s\n"),
+					progname, strerror(errno));
+			disconnect_and_exit(1);
+		}
+		if (status != 0)
+		{
+			fprintf(stderr, _("%s: child thread exited with error %u\n"),
+					progname, status);
+			disconnect_and_exit(1);
+		}
+		/* Exited normally, we're happy */
+#endif
+	}
+
 	/*
 	 * End of copy data. Final result is already checked inside the loop.
 	 */
@@ -924,7 +1103,7 @@ main(int argc, char **argv)
 		{"pgdata", required_argument, NULL, 'D'},
 		{"format", required_argument, NULL, 'F'},
 		{"checkpoint", required_argument, NULL, 'c'},
-		{"xlog", no_argument, NULL, 'x'},
+		{"xlog", required_argument, NULL, 'x'},
 		{"compress", required_argument, NULL, 'Z'},
 		{"label", required_argument, NULL, 'l'},
 		{"host", required_argument, NULL, 'h'},
@@ -958,7 +1137,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "D:F:l:Z:c:h:p:U:xwWvP",
+	while ((c = getopt_long(argc, argv, "D:F:l:Z:c:h:p:U:x:wWvP",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -980,6 +1159,18 @@ main(int argc, char **argv)
 				break;
 			case 'x':
 				includewal = true;
+				if (strcmp(optarg, "f") == 0 ||
+					strcmp(optarg, "fetch") == 0)
+					streamwal = false;
+				else if (strcmp(optarg, "s") == 0 ||
+						 strcmp(optarg, "stream") == 0)
+					streamwal = true;
+				else
+				{
+					fprintf(stderr, _("%s: invalid xlog option \"%s\", must be empty or \"stream\"\n"),
+							progname, optarg);
+					exit(1);
+				}
 				break;
 			case 'l':
 				label = xstrdup(optarg);
@@ -1080,6 +1271,16 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+	if (format != 'p' && streamwal)
+	{
+		fprintf(stderr,
+				_("%s: wal streaming can only be used in plain mode\n"),
+				progname);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
 #ifndef HAVE_LIBZ
 	if (compresslevel > 0)
 	{
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
new file mode 100644
index 0000000..41b5bb7
--- /dev/null
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -0,0 +1,407 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_receivexlog.c - receive streaming transaction log data and write it
+ *					  to a local file.
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/pg_receivexlog.c
+ *-------------------------------------------------------------------------
+ */
+
+
+#include "postgres_fe.h"
+#include "libpq-fe.h"
+
+#include <dirent.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "getopt_long.h"
+
+#include "receivelog.h"
+#include "streamutil.h"
+
+
+/* Global options */
+char	   *basedir = NULL;
+int			verbose = 0;
+
+
+static void usage(void);
+static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline);
+static void StreamLog();
+static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+
+/*
+ * XXX: from xlog_internal.h
+ */
+#define XLogSegsPerFile (((uint32) 0xffffffff) / XLOG_SEG_SIZE)
+#define PrevLogSeg(logId, logSeg)       \
+        do { \
+                if (logSeg) \
+                        (logSeg)--; \
+                else \
+                { \
+                        (logId)--; \
+                        (logSeg) = XLogSegsPerFile-1; \
+                } \
+        } while (0)
+
+
+static void
+usage(void)
+{
+	printf(_("%s receives PostgreSQL streaming transaction logs\n\n"),
+		   progname);
+	printf(_("Usage:\n"));
+	printf(_("  %s [OPTION]...\n"), progname);
+	printf(_("\nOptions controlling the output:\n"));
+	printf(_("  -D, --dir=directory       receive xlog files into this directory\n"));
+	printf(_("\nGeneral options:\n"));
+	printf(_("  -v, --verbose             output verbose messages\n"));
+	printf(_("  -?, --help                show this help, then exit\n"));
+	printf(_("  -V, --version             output version information, then exit\n"));
+	printf(_("\nConnection options:\n"));
+	printf(_("  -h, --host=HOSTNAME      database server host or socket directory\n"));
+	printf(_("  -p, --port=PORT          database server port number\n"));
+	printf(_("  -U, --username=NAME      connect as specified database user\n"));
+	printf(_("  -w, --no-password        never prompt for password\n"));
+	printf(_("  -W, --password           force password prompt (should happen automatically)\n"));
+	printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
+}
+
+static bool
+segment_callback(XLogRecPtr segendpos, uint32 timeline)
+{
+	char		fn[MAXPGPATH];
+	struct stat statbuf;
+
+	if (verbose)
+		fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
+				progname, segendpos.xlogid, segendpos.xrecoff, timeline);
+
+	/*
+	 * Check if there is a partial file for the name we just finished, and if
+	 * there is, remove it under the assumption that we have now got all the
+	 * data we need.
+	 */
+	PrevLogSeg(segendpos.xlogid, segendpos.xrecoff);
+	snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial",
+			 basedir, timeline,
+			 segendpos.xlogid,
+			 segendpos.xrecoff / XLOG_SEG_SIZE);
+	if (stat(fn, &statbuf) == 0)
+	{
+		/* File existed, get rid of it */
+		if (verbose)
+			fprintf(stderr, _("%s: removing file \"%s\"\n"),
+					progname, fn);
+		unlink(fn);
+	}
+
+	/* Never abort */
+	return false;
+}
+
+/*
+ * Determine starting location for streaming, based on:
+ * 1. If there are existing xlog segments, start at the end of the last one
+ * 2. If the last one is a partial segment, rename it and start over, since
+ *	  we don't sync after every write.
+ * 3. If no existing xlog exists, start from the beginning of the current
+ *	  WAL segment.
+ */
+static XLogRecPtr
+FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
+{
+	DIR		   *dir;
+	struct dirent *dirent;
+	int			i;
+	bool		b;
+	XLogRecPtr	high = {0, 0};
+
+	dir = opendir(basedir);
+	if (dir == NULL)
+	{
+		fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
+				progname, basedir, strerror(errno));
+		disconnect_and_exit(1);
+	}
+
+	while ((dirent = readdir(dir)) != NULL)
+	{
+		char		fullpath[MAXPGPATH];
+		struct stat statbuf;
+		uint32		tli,
+					log,
+					seg;
+
+		if (!strcmp(dirent->d_name, ".") || !strcmp(dirent->d_name, ".."))
+			continue;
+
+		/* xlog files are always 24 characters */
+		if (strlen(dirent->d_name) != 24)
+			continue;
+
+		/* Filenames are always made out of 0-9 and A-F */
+		b = false;
+		for (i = 0; i < 24; i++)
+		{
+			if (!(dirent->d_name[i] >= '0' && dirent->d_name[i] <= '9') &&
+				!(dirent->d_name[i] >= 'A' && dirent->d_name[i] <= 'F'))
+			{
+				b = true;
+				break;
+			}
+		}
+		if (b)
+			continue;
+
+		/*
+		 * Looks like an xlog file. Parse it's position.
+		 */
+		if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3)
+		{
+			fprintf(stderr, _("%s: could not parse xlog filename \"%s\"\n"),
+					progname, dirent->d_name);
+			disconnect_and_exit(1);
+		}
+		log *= XLOG_SEG_SIZE;
+
+		/* Ignore any files that are for another timeline */
+		if (tli != currenttimeline)
+			continue;
+
+		/* Check if this is a completed segment or not */
+		snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+		if (stat(fullpath, &statbuf) != 0)
+		{
+			fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"),
+					progname, fullpath, strerror(errno));
+			disconnect_and_exit(1);
+		}
+
+		if (statbuf.st_size == 16 * 1024 * 1024)
+		{
+			/* Completed segment */
+			if (log > high.xlogid ||
+				(log == high.xlogid && seg > high.xrecoff))
+			{
+				high.xlogid = log;
+				high.xrecoff = seg;
+				continue;
+			}
+		}
+		else
+		{
+			/*
+			 * This is a partial file. Rename it out of the way.
+			 */
+			char		newfn[MAXPGPATH];
+
+			fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
+					progname, dirent->d_name, dirent->d_name);
+
+			snprintf(newfn, sizeof(newfn), "%s/%s.partial",
+					 basedir, dirent->d_name);
+
+			if (stat(newfn, &statbuf) == 0)
+			{
+				fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"),
+						progname, newfn);
+				disconnect_and_exit(1);
+			}
+			if (rename(fullpath, newfn) != 0)
+			{
+				fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"),
+						progname, fullpath, newfn, strerror(errno));
+				disconnect_and_exit(1);
+			}
+
+			/* Don't continue looking for more, we assume this is the last */
+			break;
+		}
+	}
+
+	closedir(dir);
+
+	if (high.xlogid > 0 && high.xrecoff > 0)
+		return high;
+
+	return currentpos;
+}
+
+/*
+ * Start the log streaming
+ */
+static void
+StreamLog(void)
+{
+	PGresult   *res;
+	uint32		timeline;
+	XLogRecPtr	startpos;
+
+	/*
+	 * Connect in replication mode to the server
+	 */
+	conn = GetConnection();
+
+	/*
+	 * Run IDENFITY_SYSTEM so we can get the timeline and current xlog
+	 * position.
+	 */
+	res = PQexec(conn, "IDENTIFY_SYSTEM");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, _("%s: could not identify system: %s\n"),
+				progname, PQerrorMessage(conn));
+		disconnect_and_exit(1);
+	}
+	if (PQntuples(res) != 1)
+	{
+		fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
+				progname, PQntuples(res));
+		disconnect_and_exit(1);
+	}
+	timeline = atoi(PQgetvalue(res, 0, 1));
+	if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &startpos.xlogid, &startpos.xrecoff) != 2)
+	{
+		fprintf(stderr, _("%s: could not parse log start position from value \"%s\"\n"),
+				progname, PQgetvalue(res, 0, 2));
+		disconnect_and_exit(1);
+	}
+	PQclear(res);
+
+	/*
+	 * Figure out where to start streaming.
+	 */
+	startpos = FindStreamingStart(startpos, timeline);
+
+	/*
+	 * Always start streaming at the beginning of a segment
+	 */
+	startpos.xrecoff -= startpos.xrecoff % XLOG_SEG_SIZE;
+
+	/*
+	 * Start the replication
+	 */
+	if (verbose)
+		fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"),
+				progname, startpos.xlogid, startpos.xrecoff, timeline);
+
+	ReceiveXlogStream(conn, startpos, timeline, basedir, segment_callback);
+}
+
+int
+main(int argc, char **argv)
+{
+	static struct option long_options[] = {
+		{"help", no_argument, NULL, '?'},
+		{"version", no_argument, NULL, 'V'},
+		{"dir", required_argument, NULL, 'D'},
+		{"host", required_argument, NULL, 'h'},
+		{"port", required_argument, NULL, 'p'},
+		{"username", required_argument, NULL, 'U'},
+		{"no-password", no_argument, NULL, 'w'},
+		{"password", no_argument, NULL, 'W'},
+		{"verbose", no_argument, NULL, 'v'},
+		{NULL, 0, NULL, 0}
+	};
+	int			c;
+
+	int			option_index;
+
+	progname = get_progname(argv[0]);
+	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog"));
+
+	if (argc > 1)
+	{
+		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+		{
+			usage();
+			exit(0);
+		}
+		else if (strcmp(argv[1], "-V") == 0
+				 || strcmp(argv[1], "--version") == 0)
+		{
+			puts("pg_receivexlog (PostgreSQL) " PG_VERSION);
+			exit(0);
+		}
+	}
+
+	while ((c = getopt_long(argc, argv, "D:h:p:U:wWv",
+							long_options, &option_index)) != -1)
+	{
+		switch (c)
+		{
+			case 'D':
+				basedir = xstrdup(optarg);
+				break;
+			case 'h':
+				dbhost = xstrdup(optarg);
+				break;
+			case 'p':
+				if (atoi(optarg) <= 0)
+				{
+					fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				dbport = xstrdup(optarg);
+				break;
+			case 'U':
+				dbuser = xstrdup(optarg);
+				break;
+			case 'w':
+				dbgetpassword = -1;
+				break;
+			case 'W':
+				dbgetpassword = 1;
+				break;
+			case 'v':
+				verbose++;
+				break;
+			default:
+
+				/*
+				 * getopt_long already emitted a complaint
+				 */
+				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+						progname);
+				exit(1);
+		}
+	}
+
+	/*
+	 * Any non-option arguments?
+	 */
+	if (optind < argc)
+	{
+		fprintf(stderr,
+				_("%s: too many command-line arguments (first is \"%s\")\n"),
+				progname, argv[optind]);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	/*
+	 * Required arguments
+	 */
+	if (basedir == NULL)
+	{
+		fprintf(stderr, _("%s: no target directory specified\n"), progname);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	StreamLog();
+
+	exit(0);
+}
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
new file mode 100644
index 0000000..3be9692
--- /dev/null
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -0,0 +1,207 @@
+/*-------------------------------------------------------------------------
+ *
+ * receivelog.c - receive transaction log files using the streaming
+ *				  replication protocol.
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/receivelog.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include "libpq-fe.h"
+
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "receivelog.h"
+#include "streamutil.h"
+
+/* XXX: from xlog_internal.h */
+#define MAXFNAMELEN		64
+#define XLogFileName(fname, tli, log, seg)	\
+	snprintf(fname, MAXFNAMELEN, "%08X%08X%08X", tli, log, seg)
+
+/* Size of the streaming replication protocol header */
+#define STREAMING_HEADER_SIZE (1+8+8+8)
+
+/*
+ * Open a new WAL file in the specified directory. Store the name
+ * (not including the full directory) in namebuf. Assumes there is
+ * enough room in this buffer...
+ */
+static int
+open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
+{
+	int			f;
+	char		fn[MAXPGPATH];
+
+	XLogFileName(namebuf, timeline, startpoint.xlogid,
+				 startpoint.xrecoff / XLOG_SEG_SIZE);
+
+	snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
+	f = open(fn, O_WRONLY | O_CREAT | O_EXCL, 0666);
+	if (f == -1)
+		fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
+				progname, namebuf, strerror(errno));
+	return f;
+}
+
+/*
+ * Receive a log stream starting at the specified position.
+ *
+ * Note: The log position *must* be at a log segment change, or we will
+ * end up streaming an incomplete file.
+ */
+bool
+ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, segment_finish_callback segment_finish)
+{
+	char		query[128];
+	char		current_walfile_name[MAXPGPATH];
+	PGresult   *res;
+	char	   *copybuf = NULL;
+	int			walfile = -1;
+
+	/* Initiate the replication stream at specified location */
+	snprintf(query, sizeof(query), "START_REPLICATION %X/%X", startpos.xlogid, startpos.xrecoff);
+	res = PQexec(conn, query);
+	if (PQresultStatus(res) != PGRES_COPY_BOTH)
+	{
+		fprintf(stderr, _("%s: could not start replication: %s\n"),
+				progname, PQresultErrorMessage(res));
+		return false;
+	}
+	PQclear(res);
+
+	/*
+	 * Receive the actual xlog data
+	 */
+	while (1)
+	{
+		XLogRecPtr	blockstart;
+		int			r;
+		int			xlogoff;
+
+		if (copybuf != NULL)
+		{
+			PQfreemem(copybuf);
+			copybuf = NULL;
+		}
+
+		r = PQgetCopyData(conn, &copybuf, 0);
+		if (r == -1)
+			/* End of copy stream */
+			break;
+		if (r == -2)
+		{
+			fprintf(stderr, _("%s: could not read copy data: %s\n"),
+					progname, PQerrorMessage(conn));
+			return false;
+		}
+		if (r < STREAMING_HEADER_SIZE + 1)
+		{
+			fprintf(stderr, _("%s: streaming header too small: %i\n"),
+					progname, r);
+			return false;
+		}
+		if (copybuf[0] != 'w')
+		{
+			fprintf(stderr, _("%s: streaming header corrupt: \"%c\"\n"),
+					progname, copybuf[0]);
+			return false;
+		}
+
+		/* Extract WAL location for this block */
+		memcpy(&blockstart, copybuf + 1, 8);
+
+		xlogoff = blockstart.xrecoff % XLOG_SEG_SIZE;
+
+		if (walfile == -1)
+		{
+			/* No file open yet */
+			if (xlogoff != 0)
+			{
+				fprintf(stderr, _("%s: received xlog record for offset %u with no file open\n"),
+						progname, xlogoff);
+				return false;
+			}
+			walfile = open_walfile(blockstart, timeline,
+								   basedir, current_walfile_name);
+			if (walfile == -1)
+				return false;
+		}
+		else
+		{
+			/* More data in existing segment */
+			/* XXX: store seek value don't reseek all the time */
+			if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+			{
+				fprintf(stderr, _("%s: got WAL data offset %i, expected %i\n"),
+						progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+				return false;
+			}
+			/* Position matches, write happens lower down */
+		}
+
+		/* We have a file open in the correct position */
+		if (write(walfile, copybuf + STREAMING_HEADER_SIZE,
+				  r - STREAMING_HEADER_SIZE) != r - STREAMING_HEADER_SIZE)
+		{
+			fprintf(stderr, _("%s: could not write %u bytes to WAL file %s: %s\n"),
+					progname,
+					r - STREAMING_HEADER_SIZE,
+					current_walfile_name,
+					strerror(errno));
+			return false;
+		}
+
+		/* XXX: callback after each write */
+
+		/* Check if we are at the end of a segment */
+		if (lseek(walfile, 0, SEEK_CUR) == XLOG_SEG_SIZE)
+		{
+			/* Offset zero in new file, close and sync the old one */
+			fsync(walfile);
+			close(walfile);
+			walfile = -1;
+
+			if (segment_finish != NULL)
+			{
+				/*
+				 * Callback when the segment finished, and return if it told
+				 * us to.
+				 *
+				 * A block in the wal stream can never cross a segment
+				 * boundary, so we can safely just add the current block size
+				 * to the offset, so the xlog pointer points to what we have
+				 * actually written.
+				 */
+				blockstart.xrecoff += r - STREAMING_HEADER_SIZE;
+				if (segment_finish(blockstart, timeline))
+					return true;
+			}
+		}
+	}
+
+	/*
+	 * The only way to get out of the loop is if the server shut down the
+	 * replication stream. If it's a controlled shutdown, the server will send
+	 * a shutdown message, and we'll return the latest xlog location that has
+	 * been streamed.
+	 */
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, _("%s: unexpected termination of replication stream: %s\n"),
+				progname, PQresultErrorMessage(res));
+		return false;
+	}
+	PQclear(res);
+	return true;
+}
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
new file mode 100644
index 0000000..ae34dd6
--- /dev/null
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -0,0 +1,13 @@
+#include "access/xlogdefs.h"
+
+/*
+ * Called whenever a segment is finished, return true to stop
+ * the streaming at this point.
+ */
+typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline);
+
+bool ReceiveXlogStream(PGconn *conn,
+					   XLogRecPtr startpos,
+					   uint32 timeline,
+					   char *basedir,
+					   segment_finish_callback segment_finish);
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
new file mode 100644
index 0000000..9f5c36f
--- /dev/null
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -0,0 +1,160 @@
+/*-------------------------------------------------------------------------
+ *
+ * streamutil.c - utility functions for pg_basebackup and pg_receivelog
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/streamutil.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <stdio.h>
+#include <string.h>
+
+#include "streamutil.h"
+
+const char *progname;
+char	   *dbhost = NULL;
+char	   *dbuser = NULL;
+char	   *dbport = NULL;
+int			dbgetpassword = 0;	/* 0=auto, -1=never, 1=always */
+static char *dbpassword = NULL;
+PGconn	   *conn = NULL;
+
+/*
+ * strdup() and malloc() replacements that prints an error and exits
+ * if something goes wrong. Can never return NULL.
+ */
+char *
+xstrdup(const char *s)
+{
+	char	   *result;
+
+	result = strdup(s);
+	if (!result)
+	{
+		fprintf(stderr, _("%s: out of memory\n"), progname);
+		exit(1);
+	}
+	return result;
+}
+
+void *
+xmalloc0(int size)
+{
+	void	   *result;
+
+	result = malloc(size);
+	if (!result)
+	{
+		fprintf(stderr, _("%s: out of memory\n"), progname);
+		exit(1);
+	}
+	MemSet(result, 0, size);
+	return result;
+}
+
+
+PGconn *
+GetConnection(void)
+{
+	PGconn	   *tmpconn;
+	int			argcount = 4;	/* dbname, replication, fallback_app_name,
+								 * password */
+	int			i;
+	const char **keywords;
+	const char **values;
+	char	   *password = NULL;
+
+	if (dbhost)
+		argcount++;
+	if (dbuser)
+		argcount++;
+	if (dbport)
+		argcount++;
+
+	keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
+	values = xmalloc0((argcount + 1) * sizeof(*values));
+
+	keywords[0] = "dbname";
+	values[0] = "replication";
+	keywords[1] = "replication";
+	values[1] = "true";
+	keywords[2] = "fallback_application_name";
+	values[2] = progname;
+	i = 3;
+	if (dbhost)
+	{
+		keywords[i] = "host";
+		values[i] = dbhost;
+		i++;
+	}
+	if (dbuser)
+	{
+		keywords[i] = "user";
+		values[i] = dbuser;
+		i++;
+	}
+	if (dbport)
+	{
+		keywords[i] = "port";
+		values[i] = dbport;
+		i++;
+	}
+
+	while (true)
+	{
+		if (password)
+			free(password);
+
+		if (dbpassword)
+		{
+			/*
+			 * We've saved a password when a previous connection succeeded,
+			 * meaning this is the call for a second session to the same
+			 * database, so just forcibly reuse that password.
+			 */
+			keywords[argcount - 1] = "password";
+			values[argcount - 1] = dbpassword;
+			dbgetpassword = -1; /* Don't try again if this fails */
+		}
+		else if (dbgetpassword == 1)
+		{
+			password = simple_prompt(_("Password: "), 100, false);
+			keywords[argcount - 1] = "password";
+			values[argcount - 1] = password;
+		}
+
+		tmpconn = PQconnectdbParams(keywords, values, true);
+
+		if (PQstatus(tmpconn) == CONNECTION_BAD &&
+			PQconnectionNeedsPassword(tmpconn) &&
+			dbgetpassword != -1)
+		{
+			dbgetpassword = 1;	/* ask for password next time */
+			PQfinish(tmpconn);
+			continue;
+		}
+
+		if (PQstatus(tmpconn) != CONNECTION_OK)
+		{
+			fprintf(stderr, _("%s: could not connect to server: %s\n"),
+					progname, PQerrorMessage(tmpconn));
+			exit(1);
+		}
+
+		/* Connection ok! */
+		free(values);
+		free(keywords);
+
+		/* Store the password for next run */
+		if (password)
+			dbpassword = password;
+		return tmpconn;
+	}
+}
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
new file mode 100644
index 0000000..cef529a
--- /dev/null
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -0,0 +1,23 @@
+#include "access/xlogdefs.h"
+#include "libpq-fe.h"
+
+extern const char *progname;
+extern char *dbhost;
+extern char *dbuser;
+extern char *dbport;
+extern int	dbgetpassword;
+
+/* Connection kept global so we can disconnect easily */
+extern PGconn *conn;
+
+#define disconnect_and_exit(code)				\
+	{											\
+	if (conn != NULL) PQfinish(conn);			\
+	exit(code);									\
+	}
+
+
+char	   *xstrdup(const char *s);
+void	   *xmalloc0(int size);
+
+PGconn	   *GetConnection(void);
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 21c11df..1190ef4 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -277,6 +277,11 @@ sub mkvcbuild
     $initdb->AddLibrary('ws2_32.lib');
 
     my $pgbasebackup = AddSimpleFrontend('pg_basebackup', 1);
+    $pgbasebackup->AddFile('src\bin\pg_basebackup\pg_basebackup.c');
+
+    my $pgreceivexlog = AddSimpleFrontend('pg_basebackup', 1);
+    $pgreceivexlog->{name} = 'pg_receivexlog';
+    $pgreceivexlog->AddFile('src\bin\pg_basebackup\pg_receivexlog.c');
 
     my $pgconfig = AddSimpleFrontend('pg_config');
 
#9Yeb Havinga
yebhavinga@gmail.com
In reply to: Magnus Hagander (#8)
Re: pg_basebackup and wal streaming

On 2011-02-26 18:19, Magnus Hagander wrote:

Attached is an updated version of the patch that includes these
changes, as well as Windows support and an initial cut at a ref page
for pg_receivexlog (needs some more detail still).

I'm testing a bit more (with the previous version, sorry) and got the
following while doing a stream backup from a cluster that was at that
moment doing a pgbench run with 1 synchronous standby.

mgrid@mg79:~$ pg_basebackup --xlog=stream -D /data -vP -h mg73 -U repuser
Password:
xlog start point: 15/720000C8
pg_basebackup: starting background WAL receiver
pg_basebackup: got WAL data offset 14744, expected 16791960424 )
5148915/5148026 kb g(100%) 1/1 tablespaces
xlog end point: 15/80568878
pg_basebackup: waiting for background process to finish streaming...
pg_basebackup: child process exited with error 1

The fetch variant worked ok.

mgrid@mg79:~$ pg_basebackup --xlog -D /data -vP -h mg73 -U repuser
Password:
xlog start point: 15/A2000020
5482029/5153458 kb g(100%) 1/1 tablespaces
xlog end point: 15/B51D0230
pg_basebackup: base backup completed.

I'm in total monkey test mode here, so I don't even know if I'm not
supposed to do the streaming variant while other stuff is going on.

regards,
Yeb Havinga

#10Magnus Hagander
magnus@hagander.net
In reply to: Yeb Havinga (#9)
Re: pg_basebackup and wal streaming

On Sat, Feb 26, 2011 at 20:48, Yeb Havinga <yebhavinga@gmail.com> wrote:

On 2011-02-26 18:19, Magnus Hagander wrote:

Attached is an updated version of the patch that includes these
changes, as well as Windows support and an initial cut at a ref page
for pg_receivexlog (needs some more detail still).

I'm testing a bit more (with the previous version, sorry) and got the
following while doing a stream backup from a cluster that was at that moment
doing a pgbench run with 1 synchronous standby.

mgrid@mg79:~$ pg_basebackup --xlog=stream -D /data -vP -h mg73 -U repuser
Password:
xlog start point: 15/720000C8
pg_basebackup: starting background WAL receiver
pg_basebackup: got WAL data offset 14744, expected 16791960424        )
5148915/5148026 kb g(100%) 1/1 tablespaces
xlog end point: 15/80568878
pg_basebackup: waiting for background process to finish streaming...
pg_basebackup: child process exited with error 1

Hmm, strange. What platform are you on?

I saw something similar *once* on Windows, but it then passed my tests
a lot of times in a row so I figured it was just a "didn't clean
properly" thing. Clearly there's a bug around.

What's the size of the latest WAL file that it did work on? Is it
16791960424 bytes? That's way way to large, but perhaps it's not
switching segment properly? (that value is supposedly the current
write position in the file..)

I'm in total monkey test mode here, so I don't even know if I'm not supposed
to do the streaming variant while other stuff is going on.

Oh yes, that's one of the main reasons to use it, so you should
definitely be able to do that!

--
 Magnus Hagander
 Me: http://www.hagander.net/
 Work: http://www.redpill-linpro.com/

#11Yeb Havinga
yebhavinga@gmail.com
In reply to: Magnus Hagander (#10)
Re: pg_basebackup and wal streaming

On 2011-02-26 20:59, Magnus Hagander wrote:

On Sat, Feb 26, 2011 at 20:48, Yeb Havinga<yebhavinga@gmail.com> wrote:

On 2011-02-26 18:19, Magnus Hagander wrote:

Attached is an updated version of the patch that includes these
changes, as well as Windows support and an initial cut at a ref page
for pg_receivexlog (needs some more detail still).

I'm testing a bit more (with the previous version, sorry) and got the
following while doing a stream backup from a cluster that was at that moment
doing a pgbench run with 1 synchronous standby.

mgrid@mg79:~$ pg_basebackup --xlog=stream -D /data -vP -h mg73 -U repuser
Password:
xlog start point: 15/720000C8
pg_basebackup: starting background WAL receiver
pg_basebackup: got WAL data offset 14744, expected 16791960424 )
5148915/5148026 kb g(100%) 1/1 tablespaces
xlog end point: 15/80568878
pg_basebackup: waiting for background process to finish streaming...
pg_basebackup: child process exited with error 1

Hmm, strange. What platform are you on?

Both master and backup are on
Linux mg79 2.6.31-22-server #60-Ubuntu SMP Thu May 27 03:42:09 UTC 2010
x86_64 GNU/Linux

If I run a streaming backup with idle master server, things are ok.

I can repeat the error by doing a pgbench run on the master (with a
syncrep standby, don't know if that's of importance, other that there is
another walsender) and then running a streaming pg_basebackup.

I saw something similar *once* on Windows, but it then passed my tests
a lot of times in a row so I figured it was just a "didn't clean
properly" thing. Clearly there's a bug around.

What's the size of the latest WAL file that it did work on? Is it
16791960424 bytes? That's way way to large, but perhaps it's not
switching segment properly? (that value is supposedly the current
write position in the file..)

The files from that specific run are gone.

mgrid@mg79:~$ pg_basebackup --xlog=stream -D /data -vP -h mg73 -U repuser
Password:
xlog start point: 47/8E81F300
pg_basebackup: starting background WAL receiver
pg_basebackup: got WAL data offset 41432, expected 168186486424 )
4763109/4762428 kb g(100%) 1/1 tablespaces
xlog end point: 47/9D17FFE0
pg_basebackup: waiting for background process to finish streaming...
pg_basebackup: child process exited with error 1

About the file sizes, every WAL file on the master is 16M big, though
the sum of the size of all WAL files on the master is close to the 16G
number. Maybe a coincidence..
/dev/sdc1 20970612 16798508 4172104 81% /xlog

New initdb, pgbench with smaller db, fresh /xlog:

mgrid@mg79:~$ pg_basebackup --xlog=stream -D /data -vP -h mg73 -U repuser
Password:
xlog start point: 0/8C6474E8
pg_basebackup: starting background WAL receiver
pg_basebackup: got WAL data offset 10488, expected 167877046397 )
1564067/1563386 kb g(100%) 1/1 tablespaces
xlog end point: 0/99007798
pg_basebackup: waiting for background process to finish streaming...
pg_basebackup: child process exited with error 1

yes a coincidence..

/dev/sdc1 20G 2.6G 18G 13% /xlog

Another test with no sync standby server, only the master with a pgbench
running:

mgrid@mg79:~$ pg_basebackup --xlog=stream -D /data -vP -h mg73 -U repuser
Password:
xlog start point: 0/D5002120
pg_basebackup: starting background WAL receiver
pg_basebackup: got WAL data offset 23752, expected 168009686397 )
1572173/1570348 kb g(100%) 1/1 tablespaces
xlog end point: 0/EC456968
pg_basebackup: waiting for background process to finish streaming...
pg_basebackup: child process exited with error 1

Stopping pgbench and starting a basebackup., then it finishes correctly
after a while (with in between something that looks like ~ 2 minutes
inactivity).

regards,
Yeb Havinga

#12Yeb Havinga
yebhavinga@gmail.com
In reply to: Magnus Hagander (#1)
Re: pg_basebackup and wal streaming

On 2011-02-18 11:02, Magnus Hagander wrote:

Better late than never (or?), here's the final cleanup of
pg_streamrecv for moving into the main distribution, per discussion
back in late dec or early jan. It also includes the "stream logs in
parallel to backup" part that was not completed on pg_basebackup.

Is it a welcome idea to add a -X argument to specify a seperate xlog
directory like initdb -X ?

regards,
Yeb Havinga

#13Magnus Hagander
magnus@hagander.net
In reply to: Yeb Havinga (#12)
Re: pg_basebackup and wal streaming

On Fri, Mar 4, 2011 at 15:23, Yeb Havinga <yebhavinga@gmail.com> wrote:

On 2011-02-18 11:02, Magnus Hagander wrote:

Better late than never (or?), here's the final cleanup of
pg_streamrecv for moving into the main distribution, per discussion
back in late dec or early jan. It also includes the "stream logs in
parallel to backup" part that was not completed on pg_basebackup.

Is it a welcome idea to add a -X argument to specify a seperate xlog
directory like initdb -X ?

Probably not a bad idea - for a future enhancement ;)

--
 Magnus Hagander
 Me: http://www.hagander.net/
 Work: http://www.redpill-linpro.com/

#14Bruce Momjian
bruce@momjian.us
In reply to: Magnus Hagander (#13)
Re: pg_basebackup and wal streaming

Magnus Hagander wrote:

On Fri, Mar 4, 2011 at 15:23, Yeb Havinga <yebhavinga@gmail.com> wrote:

On 2011-02-18 11:02, Magnus Hagander wrote:

Better late than never (or?), here's the final cleanup of
pg_streamrecv for moving into the main distribution, per discussion
back in late dec or early jan. It also includes the "stream logs in
parallel to backup" part that was not completed on pg_basebackup.

Is it a welcome idea to add a -X argument to specify a seperate xlog
directory like initdb -X ?

Probably not a bad idea - for a future enhancement ;)

A TODO? Wording?

--
Bruce Momjian <bruce@momjian.us> http://momjian.us
EnterpriseDB http://enterprisedb.com

+ It's impossible for everything to be true. +

#15Magnus Hagander
magnus@hagander.net
In reply to: Bruce Momjian (#14)
Re: pg_basebackup and wal streaming

On Fri, Mar 11, 2011 at 16:19, Bruce Momjian <bruce@momjian.us> wrote:

Magnus Hagander wrote:

On Fri, Mar 4, 2011 at 15:23, Yeb Havinga <yebhavinga@gmail.com> wrote:

On 2011-02-18 11:02, Magnus Hagander wrote:

Better late than never (or?), here's the final cleanup of
pg_streamrecv for moving into the main distribution, per discussion
back in late dec or early jan. It also includes the "stream logs in
parallel to backup" part that was not completed on pg_basebackup.

Is it a welcome idea to add a -X argument to specify a seperate xlog
directory like initdb -X ?

Probably not a bad idea - for a future enhancement ;)

A TODO?  Wording?

"Add -X parameter to pg_basebackup to specify a different directory
for px_xlog, like initdb"

--
 Magnus Hagander
 Me: http://www.hagander.net/
 Work: http://www.redpill-linpro.com/

#16Magnus Hagander
magnus@hagander.net
In reply to: Magnus Hagander (#10)
Re: pg_basebackup and wal streaming

On Sat, Feb 26, 2011 at 20:59, Magnus Hagander <magnus@hagander.net> wrote:

On Sat, Feb 26, 2011 at 20:48, Yeb Havinga <yebhavinga@gmail.com> wrote:

On 2011-02-26 18:19, Magnus Hagander wrote:

Attached is an updated version of the patch that includes these
changes, as well as Windows support and an initial cut at a ref page
for pg_receivexlog (needs some more detail still).

I'm testing a bit more (with the previous version, sorry) and got the
following while doing a stream backup from a cluster that was at that moment
doing a pgbench run with 1 synchronous standby.

mgrid@mg79:~$ pg_basebackup --xlog=stream -D /data -vP -h mg73 -U repuser
Password:
xlog start point: 15/720000C8
pg_basebackup: starting background WAL receiver
pg_basebackup: got WAL data offset 14744, expected 16791960424        )
5148915/5148026 kb g(100%) 1/1 tablespaces
xlog end point: 15/80568878
pg_basebackup: waiting for background process to finish streaming...
pg_basebackup: child process exited with error 1

Hmm, strange. What platform are you on?

I saw something similar *once* on Windows, but it then passed my tests
a lot of times in a row so I figured it was just a "didn't clean
properly" thing. Clearly there's a bug around.

What's the size of the latest WAL file that it did work on? Is it
16791960424 bytes? That's way way to large, but perhaps it's not
switching segment properly? (that value is supposedly the current
write position in the file..)

I'm in total monkey test mode here, so I don't even know if I'm not supposed
to do the streaming variant while other stuff is going on.

Oh yes, that's one of the main reasons to use it, so you should
definitely be able to do that!

I've posted a new version of this patch at
http://archives.postgresql.org/pgsql-hackers/2011-08/msg00776.php -
forgot there was an open thread on this, sorry.

--
 Magnus Hagander
 Me: http://www.hagander.net/
 Work: http://www.redpill-linpro.com/