*** a/doc/src/sgml/ref/pg_basebackup.sgml
--- b/doc/src/sgml/ref/pg_basebackup.sgml
***************
*** 143,150 **** PostgreSQL documentation
       </varlistentry>
  
       <varlistentry>
!       <term><option>-x</option></term>
!       <term><option>--xlog</option></term>
        <listitem>
         <para>
          Includes the required transaction log files (WAL files) in the
--- 143,150 ----
       </varlistentry>
  
       <varlistentry>
!       <term><option>-x <replaceable class="parameter">method</replaceable></option></term>
!       <term><option>--xlog=<replaceable class="parameter">method</replaceable></option></term>
        <listitem>
         <para>
          Includes the required transaction log files (WAL files) in the
***************
*** 154,169 **** PostgreSQL documentation
          to consult the log archive, thus making this a completely standalone
          backup.
         </para>
!        <note>
!         <para>
!          The transaction log files are collected at the end of the backup.
!          Therefore, it is necessary for the
!          <xref linkend="guc-wal-keep-segments"> parameter to be set high
!          enough that the log is not removed before the end of the backup.
!          If the log has been rotated when it's time to transfer it, the
!          backup will fail and be unusable.
!         </para>
!        </note>
        </listitem>
       </varlistentry>
  
--- 154,196 ----
          to consult the log archive, thus making this a completely standalone
          backup.
         </para>
!        <para>
!         The following methods for collecting the transaction logs are
!         supported:
! 
!         <variablelist>
!          <varlistentry>
!           <term><literal>f</literal></term>
!           <term><literal>fetch</literal></term>
!           <listitem>
!            <para>
!             The transaction log files are collected at the end of the backup.
!             Therefore, it is necessary for the
!             <xref linkend="guc-wal-keep-segments"> parameter to be set high
!              enough that the log is not removed before the end of the backup.
!              If the log has been rotated when it's time to transfer it, the
!              backup will fail and be unusable.
!            </para>
!           </listitem>
!          </varlistentry>
! 
!          <varlistentry>
!           <term><literal>s</literal></term>
!           <term><literal>stream</literal></term>
!           <listitem>
!            <para>
!             Stream the transaction log while the backup is created. This will
!             open a second connection to the server and start streaming the
!             transaction log in parallel while running the backup. Therefore,
!             it will use up two slots configured by the
!             <xref linkend="guc-max-wal-senders"> parameter. As long as the
!              client can keep up with the transaction log it receives, using this mode
!              requires no extra transaction logs to be saved on the master.
!            </para>
!           </listitem>
!          </varlistentry>
!         </variablelist>
!        </para>
        </listitem>
       </varlistentry>
  
***************
*** 261,266 **** PostgreSQL documentation
--- 288,307 ----
  
      <variablelist>
       <varlistentry>
+       <term><option>-s <replaceable class="parameter">interval</replaceable></option></term>
+       <term><option>--statusint=<replaceable class="parameter">interval</replaceable></option></term>
+       <listitem>
+        <para>
+         Specifies the number of seconds between status packets sent back to the
+         server. This is required when streaming the transaction log (using
+         <literal>--xlog=stream</literal>) if replication timeout is configured
+         on the server, and allows for easier monitoring. The default value is
+         10 seconds.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
        <term><option>-h <replaceable class="parameter">host</replaceable></option></term>
        <term><option>--host=<replaceable class="parameter">host</replaceable></option></term>
        <listitem>
*** /dev/null
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 0 ****
--- 1,270 ----
+ <!--
+ doc/src/sgml/ref/pg_receivexlog.sgml
+ PostgreSQL documentation
+ -->
+ 
+ <refentry id="app-pgreceivexlog">
+  <refmeta>
+   <refentrytitle>pg_receivexlog</refentrytitle>
+   <manvolnum>1</manvolnum>
+   <refmiscinfo>Application</refmiscinfo>
+  </refmeta>
+ 
+  <refnamediv>
+   <refname>pg_receivexlog</refname>
+   <refpurpose>streams transaction logs from a <productname>PostgreSQL</productname> cluster</refpurpose>
+  </refnamediv>
+ 
+  <indexterm zone="app-pgreceivexlog">
+   <primary>pg_receivexlog</primary>
+  </indexterm>
+ 
+  <refsynopsisdiv>
+   <cmdsynopsis>
+    <command>pg_receivexlog</command>
+    <arg rep="repeat"><replaceable>option</></arg>
+   </cmdsynopsis>
+  </refsynopsisdiv>
+ 
+  <refsect1>
+   <title>
+    Description
+   </title>
+   <para>
+    <application>pg_receivexlog</application> is used to stream the transaction log
+    from a running <productname>PostgreSQL</productname> cluster. The transaction
+    log is streamed using the streaming replication protocol, and is written
+    to a local directory of files. This directory can be used as the archive
+    location for doing a restore using point-in-time recovery (see
+    <xref linkend="continuous-archiving">).
+   </para>
+ 
+   <para>
+    <application>pg_receivexlog</application> streams the transaction
+    log in real time as it's being generated on the server, and does not wait
+    for segments to complete like <xref linkend="guc-archive-command"> does.
+    For this reason, it is not necessary to set
+    <xref linkend="guc-archive-timeout"> when using
+     <application>pg_receivexlog</application>.
+   </para>
+ 
+   <para>
+    The transaction log is streamed over a regular
+    <productname>PostgreSQL</productname> connection, and uses the
+    replication protocol. The connection must be
+    made with a user having <literal>REPLICATION</literal> permissions (see
+    <xref linkend="role-attributes">), and the user must be granted explicit
+    permissions in <filename>pg_hba.conf</filename>. The server must also
+    be configured with <xref linkend="guc-max-wal-senders"> set high enough
+    to leave at least one session available for the stream.
+   </para>
+  </refsect1>
+ 
+  <refsect1>
+   <title>Options</title>
+ 
+    <para>
+     The following command-line options control the location and format of the
+     output.
+ 
+     <variablelist>
+      <varlistentry>
+       <term><option>-D <replaceable class="parameter">directory</replaceable></option></term>
+       <term><option>--dir=<replaceable class="parameter">directory</replaceable></option></term>
+       <listitem>
+        <para>
+         Directory to write the output to.
+        </para>
+        <para>
+         This parameter is required.
+        </para>
+       </listitem>
+      </varlistentry>
+     </variablelist>
+    </para>
+    <para>
+     The following command-line options control the running of the program.
+ 
+     <variablelist>
+      <varlistentry>
+       <term><option>-v</option></term>
+       <term><option>--verbose</option></term>
+       <listitem>
+        <para>
+         Enables verbose mode.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+     </variablelist>
+    </para>
+ 
+    <para>
+     The following command-line options control the database connection parameters.
+ 
+     <variablelist>
+      <varlistentry>
+       <term><option>-s <replaceable class="parameter">interval</replaceable></option></term>
+       <term><option>--statusint=<replaceable class="parameter">interval</replaceable></option></term>
+       <listitem>
+        <para>
+         Specifies the number of seconds between status packets sent back to the
+         server. This is required if replication timeout is configured on the
+         server, and allows for easier monitoring. The default value is
+         10 seconds.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
+       <term><option>-h <replaceable class="parameter">host</replaceable></option></term>
+       <term><option>--host=<replaceable class="parameter">host</replaceable></option></term>
+       <listitem>
+        <para>
+         Specifies the host name of the machine on which the server is
+         running.  If the value begins with a slash, it is used as the
+         directory for the Unix domain socket. The default is taken
+         from the <envar>PGHOST</envar> environment variable, if set,
+         else a Unix domain socket connection is attempted.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
+       <term><option>-p <replaceable class="parameter">port</replaceable></option></term>
+       <term><option>--port=<replaceable class="parameter">port</replaceable></option></term>
+       <listitem>
+        <para>
+         Specifies the TCP port or local Unix domain socket file
+         extension on which the server is listening for connections.
+         Defaults to the <envar>PGPORT</envar> environment variable, if
+         set, or a compiled-in default.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
+       <term><option>-U <replaceable>username</replaceable></option></term>
+       <term><option>--username=<replaceable class="parameter">username</replaceable></option></term>
+       <listitem>
+        <para>
+         User name to connect as.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
+       <term><option>-w</></term>
+       <term><option>--no-password</></term>
+       <listitem>
+        <para>
+         Never issue a password prompt.  If the server requires
+         password authentication and a password is not available by
+         other means such as a <filename>.pgpass</filename> file, the
+         connection attempt will fail.  This option can be useful in
+         batch jobs and scripts where no user is present to enter a
+         password.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
+       <term><option>-W</option></term>
+       <term><option>--password</option></term>
+       <listitem>
+        <para>
+         Force <application>pg_receivexlog</application> to prompt for a
+         password before connecting to a database.
+        </para>
+ 
+        <para>
+         This option is never essential, since
+         <application>pg_receivexlog</application> will automatically prompt
+         for a password if the server demands password authentication.
+         However, <application>pg_receivexlog</application> will waste a
+         connection attempt finding out that the server wants a password.
+         In some cases it is worth typing <option>-W</> to avoid the extra
+         connection attempt.
+        </para>
+       </listitem>
+      </varlistentry>
+     </variablelist>
+    </para>
+ 
+    <para>
+     Other, less commonly used, parameters are also available:
+ 
+     <variablelist>
+      <varlistentry>
+        <term><option>-V</></term>
+        <term><option>--version</></term>
+        <listitem>
+        <para>
+        Print the <application>pg_receivexlog</application> version and exit.
+        </para>
+        </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
+        <term><option>-?</></term>
+        <term><option>--help</></term>
+        <listitem>
+        <para>
+        Show help about <application>pg_receivexlog</application> command line
+        arguments, and exit.
+        </para>
+        </listitem>
+      </varlistentry>
+ 
+     </variablelist>
+    </para>
+ 
+  </refsect1>
+ 
+  <refsect1>
+   <title>Environment</title>
+ 
+   <para>
+    This utility, like most other <productname>PostgreSQL</> utilities,
+    uses the environment variables supported by <application>libpq</>
+    (see <xref linkend="libpq-envars">).
+   </para>
+ 
+  </refsect1>
+ 
+  <refsect1>
+   <title>Notes</title>
+ 
+   <para>
+    When using <application>pg_receivexlog</application> instead of
+    <xref linkend="guc-archive-command">, the server will continue to
+     recycle transaction log files even if the backups are not properly
+     archived, since there is no command that fails. This can be worked
+     around by having an <xref linkend="guc-archive-command"> that fails
+     when the file has not been properly archived yet.
+   </para>
+ 
+  </refsect1>
+ 
+  <refsect1>
+   <title>Examples</title>
+ 
+   <para>
+    To stream the transaction log from the server at
+    <literal>mydbserver</literal> and store it in the local directory
+    <filename>/usr/local/pgsql/archive</filename>:
+    <screen>
+     <prompt>$</prompt> <userinput>pg_receivexlog -h mydbserver -D /usr/local/pgsql/archive</userinput>
+    </screen>
+   </para>
+  </refsect1>
+ 
+  <refsect1>
+   <title>See Also</title>
+ 
+   <simplelist type="inline">
+    <member><xref linkend="APP-PGBASEBACKUP"></member>
+   </simplelist>
+  </refsect1>
+ 
+ </refentry>
*** a/doc/src/sgml/reference.sgml
--- b/doc/src/sgml/reference.sgml
***************
*** 212,217 ****
--- 212,218 ----
     &pgConfig;
     &pgDump;
     &pgDumpall;
+    &pgReceivexlog;
     &pgRestore;
     &psqlRef;
     &reindexdb;
*** a/src/bin/pg_basebackup/.gitignore
--- b/src/bin/pg_basebackup/.gitignore
***************
*** 1 ****
--- 1,2 ----
  /pg_basebackup
+ /pg_receivexlog
*** a/src/bin/pg_basebackup/Makefile
--- b/src/bin/pg_basebackup/Makefile
***************
*** 18,38 **** include $(top_builddir)/src/Makefile.global
  
  override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
  
! OBJS=	pg_basebackup.o $(WIN32RES)
  
! all: pg_basebackup
  
! pg_basebackup: $(OBJS) | submake-libpq submake-libpgport
! 	$(CC) $(CFLAGS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
  
  install: all installdirs
  	$(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
  
  installdirs:
  	$(MKDIR_P) '$(DESTDIR)$(bindir)'
  
  uninstall:
  	rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
  
  clean distclean maintainer-clean:
! 	rm -f pg_basebackup$(X) $(OBJS)
--- 18,43 ----
  
  override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
  
! OBJS=receivelog.o streamutil.o $(WIN32RES)
  
! all: pg_basebackup pg_receivexlog
  
! pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport
! 	$(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
! 
! pg_receivexlog: pg_receivexlog.o $(OBJS) | submake-libpq submake-libpgport
! 	$(CC) $(CFLAGS) pg_receivexlog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
  
  install: all installdirs
  	$(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+ 	$(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
  
  installdirs:
  	$(MKDIR_P) '$(DESTDIR)$(bindir)'
  
  uninstall:
  	rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+ 	rm -f '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
  
  clean distclean maintainer-clean:
! 	rm -f pg_basebackup$(X) pg_receivexlog$(X) $(OBJS) pg_basebackup.o pg_receivexlog.o
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 11,22 ****
   *-------------------------------------------------------------------------
   */
  
! #include "postgres_fe.h"
  #include "libpq-fe.h"
  
  #include <unistd.h>
  #include <dirent.h>
  #include <sys/stat.h>
  
  #ifdef HAVE_LIBZ
  #include <zlib.h>
--- 11,30 ----
   *-------------------------------------------------------------------------
   */
  
! /*
!  * We have to use postgres.h not postgres_fe.h here, because there's so much
!  * backend-only stuff in the XLOG include files we need.  But we need a
!  * frontend-ish environment otherwise.	Hence this ugly hack.
!  */
! #define FRONTEND 1
! #include "postgres.h"
  #include "libpq-fe.h"
  
  #include <unistd.h>
  #include <dirent.h>
  #include <sys/stat.h>
+ #include <sys/types.h>
+ #include <sys/wait.h>
  
  #ifdef HAVE_LIBZ
  #include <zlib.h>
***************
*** 24,32 ****
  
  #include "getopt_long.h"
  
  
  /* Global options */
- static const char *progname;
  char	   *basedir = NULL;
  char		format = 'p';		/* p(lain)/t(ar) */
  char	   *label = "pg_basebackup base backup";
--- 32,42 ----
  
  #include "getopt_long.h"
  
+ #include "receivelog.h"
+ #include "streamutil.h"
+ 
  
  /* Global options */
  char	   *basedir = NULL;
  char		format = 'p';		/* p(lain)/t(ar) */
  char	   *label = "pg_basebackup base backup";
***************
*** 34,71 **** bool		showprogress = false;
  int			verbose = 0;
  int			compresslevel = 0;
  bool		includewal = false;
  bool		fastcheckpoint = false;
! char	   *dbhost = NULL;
! char	   *dbuser = NULL;
! char	   *dbport = NULL;
! int			dbgetpassword = 0;	/* 0=auto, -1=never, 1=always */
  
  /* Progress counters */
  static uint64 totalsize;
  static uint64 totaldone;
  static int	tablespacecount;
  
! /* Connection kept global so we can disconnect easily */
! static PGconn *conn = NULL;
  
! #define disconnect_and_exit(code)				\
! 	{											\
! 	if (conn != NULL) PQfinish(conn);			\
! 	exit(code);									\
! 	}
  
  /* Function headers */
- static char *xstrdup(const char *s);
- static void *xmalloc0(int size);
  static void usage(void);
  static void verify_dir_is_empty_or_create(char *dirname);
  static void progress_report(int tablespacenum, const char *filename);
- static PGconn *GetConnection(void);
  
  static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
  static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
  static void BaseBackup(void);
  
  #ifdef HAVE_LIBZ
  static const char *
  get_gz_error(gzFile *gzf)
--- 44,81 ----
  int			verbose = 0;
  int			compresslevel = 0;
  bool		includewal = false;
+ bool		streamwal = false;
  bool		fastcheckpoint = false;
! int			standby_message_timeout = 10;		/* 10 sec = default */
  
  /* Progress counters */
  static uint64 totalsize;
  static uint64 totaldone;
  static int	tablespacecount;
  
! /* Pipe to communicate with background wal receiver process */
! #ifndef WIN32
! static int	bgpipe[2] = {-1, -1};
! #endif
  
! /* Handle to child process */
! static pid_t bgchild = -1;
! 
! /* End position for xlog streaming; only valid once has_xlogendptr is set */
! static XLogRecPtr xlogendptr;
! static int	has_xlogendptr = 0;
  
  /* Function headers */
  static void usage(void);
  static void verify_dir_is_empty_or_create(char *dirname);
  static void progress_report(int tablespacenum, const char *filename);
  
  static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
  static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
  static void BaseBackup(void);
  
+ static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+ 
  #ifdef HAVE_LIBZ
  static const char *
  get_gz_error(gzFile *gzf)
***************
*** 81,119 **** get_gz_error(gzFile *gzf)
  }
  #endif
  
- /*
-  * strdup() and malloc() replacements that prints an error and exits
-  * if something goes wrong. Can never return NULL.
-  */
- static char *
- xstrdup(const char *s)
- {
- 	char	   *result;
- 
- 	result = strdup(s);
- 	if (!result)
- 	{
- 		fprintf(stderr, _("%s: out of memory\n"), progname);
- 		exit(1);
- 	}
- 	return result;
- }
- 
- static void *
- xmalloc0(int size)
- {
- 	void	   *result;
- 
- 	result = malloc(size);
- 	if (!result)
- 	{
- 		fprintf(stderr, _("%s: out of memory\n"), progname);
- 		exit(1);
- 	}
- 	MemSet(result, 0, size);
- 	return result;
- }
- 
  
  static void
  usage(void)
--- 91,96 ----
***************
*** 125,131 **** usage(void)
  	printf(_("\nOptions controlling the output:\n"));
  	printf(_("  -D, --pgdata=DIRECTORY   receive base backup into directory\n"));
  	printf(_("  -F, --format=p|t         output format (plain, tar)\n"));
! 	printf(_("  -x, --xlog               include required WAL files in backup\n"));
  	printf(_("  -z, --gzip               compress tar output\n"));
  	printf(_("  -Z, --compress=0-9       compress tar output with given compression level\n"));
  	printf(_("\nGeneral options:\n"));
--- 102,108 ----
  	printf(_("\nOptions controlling the output:\n"));
  	printf(_("  -D, --pgdata=DIRECTORY   receive base backup into directory\n"));
  	printf(_("  -F, --format=p|t         output format (plain, tar)\n"));
! 	printf(_("  -x, --xlog=fetch|stream  include required WAL files in backup\n"));
  	printf(_("  -z, --gzip               compress tar output\n"));
  	printf(_("  -Z, --compress=0-9       compress tar output with given compression level\n"));
  	printf(_("\nGeneral options:\n"));
***************
*** 137,142 **** usage(void)
--- 114,120 ----
  	printf(_("  --help                   show this help, then exit\n"));
  	printf(_("  --version                output version information, then exit\n"));
  	printf(_("\nConnection options:\n"));
+ 	printf(_("  -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n"));
  	printf(_("  -h, --host=HOSTNAME      database server host or socket directory\n"));
  	printf(_("  -p, --port=PORT          database server port number\n"));
  	printf(_("  -U, --username=NAME      connect as specified database user\n"));
***************
*** 147,152 **** usage(void)
--- 125,321 ----
  
  
  /*
+  * Called in the background process whenever a complete segment of WAL
+  * has been received.
+  * On Unix, we check to see if there is any data on our pipe
+  * (which would mean we have a stop position), and if there is, check if
+  * it is time to stop.
+  * On Windows, we are in a single process, so we can just check if it's
+  * time to stop.
+  */
+ static bool
+ segment_callback(XLogRecPtr segendpos, uint32 timeline)
+ {
+ 	if (!has_xlogendptr)
+ 	{
+ #ifndef WIN32
+ 		fd_set		fds;
+ 		struct timeval tv;
+ 		int			r;
+ 
+ 		/*
+ 		 * Don't have the end pointer yet - check our pipe to see if it has
+ 		 * been sent yet.
+ 		 */
+ 		FD_ZERO(&fds);
+ 		FD_SET(bgpipe[0], &fds);
+ 
+ 		MemSet(&tv, 0, sizeof(tv));
+ 
+ 		r = select(bgpipe[0] + 1, &fds, NULL, NULL, &tv);
+ 		if (r == 1)
+ 		{
+ 			char		xlogend[64];
+ 
+ 			MemSet(xlogend, 0, sizeof(xlogend));
+ 			r = piperead(bgpipe[0], xlogend, sizeof(xlogend));
+ 			if (r < 0)
+ 			{
+ 				fprintf(stderr, _("%s: could not read from ready pipe: %s\n"),
+ 						progname, strerror(errno));
+ 				exit(1);
+ 			}
+ 
+ 			if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2)
+ 			{
+ 				fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"),
+ 						progname, xlogend);
+ 				exit(1);
+ 			}
+ 			has_xlogendptr = 1;
+ 
+ 			/*
+ 			 * Fall through to check whether we have already reached that
+ 			 * position.
+ 			 */
+ 		}
+ 		else
+ 		{
+ 			/*
+ 			 * No data received on the pipe means we don't know the end
+ 			 * position yet - so just say it's not time to stop yet.
+ 			 */
+ 			return false;
+ 		}
+ #else
+ 
+ 		/*
+ 		 * On win32, has_xlogendptr is set by the main thread, so if it's not
+ 		 * set here, we just go back and wait until it shows up.
+ 		 */
+ 		return false;
+ #endif
+ 	}
+ 
+ 	/*
+ 	 * At this point we have an end pointer, so compare it to the current
+ 	 * position to figure out if it's time to stop.
+ 	 */
+ 	if (segendpos.xlogid > xlogendptr.xlogid ||
+ 		(segendpos.xlogid == xlogendptr.xlogid &&
+ 		 segendpos.xrecoff >= xlogendptr.xrecoff))
+ 		return true;
+ 
+ 	/*
+ 	 * Have end pointer, but haven't reached it yet - so tell the caller to
+ 	 * keep streaming.
+ 	 */
+ 	return false;
+ }
+ 
+ typedef struct
+ {
+ 	PGconn	   *bgconn;
+ 	XLogRecPtr	startptr;
+ 	char		xlogdir[MAXPGPATH];
+ 	int			timeline;
+ }	logstreamer_param;
+ 
+ static int
+ LogStreamerMain(logstreamer_param * param)
+ {
+ 	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
+ 						   param->xlogdir, segment_callback, NULL,
+ 						   standby_message_timeout))
+ 
+ 		/*
+ 		 * Any errors will already have been reported by the function itself,
+ 		 * but we need to tell the parent that we didn't shut down in a nice
+ 		 * way.
+ 		 */
+ 		return 1;
+ 
+ 	PQfinish(param->bgconn);
+ 	return 0;
+ }
+ 
+ /*
+  * Initiate background process for receiving xlog during the backup.
+  * The background stream will use its own database connection so we can
+  * stream the logfile in parallel with the backups.
+  */
+ static void
+ StartLogStreamer(char *startpos, uint32 timeline)
+ {
+ 	logstreamer_param *param;
+ 
+ 	param = xmalloc0(sizeof(logstreamer_param));
+ 	param->timeline = timeline;
+ 
+ 	/* Convert the starting position */
+ 	if (sscanf(startpos, "%X/%X", &param->startptr.xlogid, &param->startptr.xrecoff) != 2)
+ 	{
+ 		fprintf(stderr, _("%s: invalid format of xlog location: %s\n"),
+ 				progname, startpos);
+ 		disconnect_and_exit(1);
+ 	}
+ 	/* Round off to even segment position */
+ 	param->startptr.xrecoff -= param->startptr.xrecoff % XLOG_SEG_SIZE;
+ 
+ #ifndef WIN32
+ 	/* Create our background pipe */
+ 	if (pgpipe(bgpipe) < 0)
+ 	{
+ 		fprintf(stderr, _("%s: could not create pipe for background process: %s\n"),
+ 				progname, strerror(errno));
+ 		disconnect_and_exit(1);
+ 	}
+ #endif
+ 
+ 	/* Get a second connection */
+ 	param->bgconn = GetConnection();
+ 
+ 	/*
+ 	 * Always in plain format, so we can write to basedir/pg_xlog. But the
+ 	 * directory entry in the tar file may arrive later, so make sure it's
+ 	 * created before we start.
+ 	 */
+ 	snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
+ 	verify_dir_is_empty_or_create(param->xlogdir);
+ 
+ 	/*
+ 	 * Start a child process and tell it to start streaming. On Unix, this is
+ 	 * a fork(). On Windows, we create a thread.
+ 	 */
+ #ifndef WIN32
+ 	bgchild = fork();
+ 	if (bgchild == 0)
+ 	{
+ 		/* in child process */
+ 		exit(LogStreamerMain(param));
+ 	}
+ 	else if (bgchild < 0)
+ 	{
+ 		fprintf(stderr, _("%s: could not create background process: %s\n"),
+ 				progname, strerror(errno));
+ 		disconnect_and_exit(1);
+ 	}
+ 
+ 	/*
+ 	 * Else we are in the parent process and all is well.
+ 	 */
+ #else							/* WIN32 */
+ 	bgchild = _beginthreadex(NULL, 0, (void *) LogStreamerMain, param, 0, NULL);
+ 	if (bgchild == 0)
+ 	{
+ 		fprintf(stderr, _("%s: could not create background thread: %s\n"),
+ 				progname, strerror(errno));
+ 		disconnect_and_exit(1);
+ 	}
+ #endif
+ }
+ 
+ /*
   * Verify that the given directory exists and is empty. If it does not
   * exist, it is created. If it exists but is not empty, an error will
   * be give and the process ended.
***************
*** 492,502 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
  		strcpy(current_path, PQgetvalue(res, rownum, 1));
  
  	/*
- 	 * Make sure we're unpacking into an empty directory
- 	 */
- 	verify_dir_is_empty_or_create(current_path);
- 
- 	/*
  	 * Get the COPY data
  	 */
  	res = PQgetResult(conn);
--- 661,666 ----
***************
*** 586,598 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
  					/*
  					 * Directory
  					 */
! 					filename[strlen(filename) - 1] = '\0';	/* Remove trailing slash */
  					if (mkdir(filename, S_IRWXU) != 0)
  					{
! 						fprintf(stderr,
  							_("%s: could not create directory \"%s\": %s\n"),
! 								progname, filename, strerror(errno));
! 						disconnect_and_exit(1);
  					}
  #ifndef WIN32
  					if (chmod(filename, (mode_t) filemode))
--- 750,770 ----
  					/*
  					 * Directory
  					 */
! 					filename[strlen(filename) - 1] = '\0';		/* Remove trailing slash */
  					if (mkdir(filename, S_IRWXU) != 0)
  					{
! 						/*
! 						 * When streaming WAL, pg_xlog will have been created
! 						 * by the wal receiver process, so just ignore failure
! 						 * on that.
! 						 */
! 						if (!streamwal || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0)
! 						{
! 							fprintf(stderr,
  							_("%s: could not create directory \"%s\": %s\n"),
! 									progname, filename, strerror(errno));
! 							disconnect_and_exit(1);
! 						}
  					}
  #ifndef WIN32
  					if (chmod(filename, (mode_t) filemode))
***************
*** 605,616 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
  					/*
  					 * Symbolic link
  					 */
! 					filename[strlen(filename) - 1] = '\0';	/* Remove trailing slash */
  					if (symlink(&copybuf[157], filename) != 0)
  					{
  						fprintf(stderr,
  								_("%s: could not create symbolic link from \"%s\" to \"%s\": %s\n"),
! 								progname, filename, &copybuf[157], strerror(errno));
  						disconnect_and_exit(1);
  					}
  				}
--- 777,788 ----
  					/*
  					 * Symbolic link
  					 */
! 					filename[strlen(filename) - 1] = '\0';		/* Remove trailing slash */
  					if (symlink(&copybuf[157], filename) != 0)
  					{
  						fprintf(stderr,
  								_("%s: could not create symbolic link from \"%s\" to \"%s\": %s\n"),
! 						 progname, filename, &copybuf[157], strerror(errno));
  						disconnect_and_exit(1);
  					}
  				}
***************
*** 703,796 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
  }
  
  
- static PGconn *
- GetConnection(void)
- {
- 	PGconn	   *tmpconn;
- 	int			argcount = 4;	/* dbname, replication, fallback_app_name,
- 								 * password */
- 	int			i;
- 	const char **keywords;
- 	const char **values;
- 	char	   *password = NULL;
- 
- 	if (dbhost)
- 		argcount++;
- 	if (dbuser)
- 		argcount++;
- 	if (dbport)
- 		argcount++;
- 
- 	keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
- 	values = xmalloc0((argcount + 1) * sizeof(*values));
- 
- 	keywords[0] = "dbname";
- 	values[0] = "replication";
- 	keywords[1] = "replication";
- 	values[1] = "true";
- 	keywords[2] = "fallback_application_name";
- 	values[2] = progname;
- 	i = 3;
- 	if (dbhost)
- 	{
- 		keywords[i] = "host";
- 		values[i] = dbhost;
- 		i++;
- 	}
- 	if (dbuser)
- 	{
- 		keywords[i] = "user";
- 		values[i] = dbuser;
- 		i++;
- 	}
- 	if (dbport)
- 	{
- 		keywords[i] = "port";
- 		values[i] = dbport;
- 		i++;
- 	}
- 
- 	while (true)
- 	{
- 		if (dbgetpassword == 1)
- 		{
- 			/* Prompt for a password */
- 			password = simple_prompt(_("Password: "), 100, false);
- 			keywords[argcount - 1] = "password";
- 			values[argcount - 1] = password;
- 		}
- 
- 		tmpconn = PQconnectdbParams(keywords, values, true);
- 		if (password)
- 			free(password);
- 
- 		if (PQstatus(tmpconn) == CONNECTION_BAD &&
- 			PQconnectionNeedsPassword(tmpconn) &&
- 			dbgetpassword != -1)
- 		{
- 			dbgetpassword = 1;	/* ask for password next time */
- 			PQfinish(tmpconn);
- 			continue;
- 		}
- 
- 		if (PQstatus(tmpconn) != CONNECTION_OK)
- 		{
- 			fprintf(stderr, _("%s: could not connect to server: %s"),
- 					progname, PQerrorMessage(tmpconn));
- 			exit(1);
- 		}
- 
- 		/* Connection ok! */
- 		free(values);
- 		free(keywords);
- 		return tmpconn;
- 	}
- }
- 
  static void
  BaseBackup(void)
  {
  	PGresult   *res;
  	char		current_path[MAXPGPATH];
  	char		escaped_label[MAXPGPATH];
  	int			i;
--- 875,885 ----
  }
  
  
  static void
  BaseBackup(void)
  {
  	PGresult   *res;
+ 	uint32		timeline;
  	char		current_path[MAXPGPATH];
  	char		escaped_label[MAXPGPATH];
  	int			i;
***************
*** 803,815 **** BaseBackup(void)
  	conn = GetConnection();
  
  	/*
  	 * Start the actual backup
  	 */
  	PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
  	snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s %s",
  			 escaped_label,
  			 showprogress ? "PROGRESS" : "",
! 			 includewal ? "WAL" : "",
  			 fastcheckpoint ? "FAST" : "",
  			 includewal ? "NOWAIT" : "");
  
--- 892,923 ----
  	conn = GetConnection();
  
  	/*
+ 	 * Run IDENTIFY_SYSTEM so we can get the timeline
+ 	 */
+ 	res = PQexec(conn, "IDENTIFY_SYSTEM");
+ 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ 	{
+ 		fprintf(stderr, _("%s: could not identify system: %s\n"),
+ 				progname, PQerrorMessage(conn));
+ 		disconnect_and_exit(1);
+ 	}
+ 	if (PQntuples(res) != 1)
+ 	{
+ 		fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
+ 				progname, PQntuples(res));
+ 		disconnect_and_exit(1);
+ 	}
+ 	timeline = atoi(PQgetvalue(res, 0, 1));
+ 	PQclear(res);
+ 
+ 	/*
  	 * Start the actual backup
  	 */
  	PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
  	snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s %s",
  			 escaped_label,
  			 showprogress ? "PROGRESS" : "",
! 			 includewal && !streamwal ? "WAL" : "",
  			 fastcheckpoint ? "FAST" : "",
  			 includewal ? "NOWAIT" : "");
  
***************
*** 888,893 **** BaseBackup(void)
--- 996,1013 ----
  	}
  
  	/*
+ 	 * If we're streaming WAL, start the streaming session before we start
+ 	 * receiving the actual data chunks.
+ 	 */
+ 	if (streamwal)
+ 	{
+ 		if (verbose)
+ 			fprintf(stderr, _("%s: starting background WAL receiver\n"),
+ 					progname);
+ 		StartLogStreamer(xlogstart, timeline);
+ 	}
+ 
+ 	/*
  	 * Start receiving chunks
  	 */
  	for (i = 0; i < PQntuples(res); i++)
***************
*** 934,939 **** BaseBackup(void)
--- 1054,1145 ----
  		disconnect_and_exit(1);
  	}
  
+ 	if (bgchild > 0)
+ 	{
+ 		int			status;
+ 
+ #ifndef WIN32
+ 		int			r;
+ #endif
+ 
+ 		if (verbose)
+ 			fprintf(stderr, _("%s: waiting for background process to finish streaming...\n"), progname);
+ 
+ #ifndef WIN32
+ 		if (pipewrite(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend))
+ 		{
+ 			fprintf(stderr, _("%s: could not send command to background pipe: %s\n"),
+ 					progname, strerror(errno));
+ 			disconnect_and_exit(1);
+ 		}
+ 
+ 		/* Just wait for the background process to exit */
+ 		r = waitpid(bgchild, &status, 0);
+ 		if (r == -1)
+ 		{
+ 			fprintf(stderr, _("%s: could not wait for child process: %s\n"),
+ 					progname, strerror(errno));
+ 			disconnect_and_exit(1);
+ 		}
+ 		if (r != bgchild)
+ 		{
+ 			fprintf(stderr, _("%s: child %i died, expected %i\n"),
+ 					progname, r, bgchild);
+ 			disconnect_and_exit(1);
+ 		}
+ 		if (!WIFEXITED(status))
+ 		{
+ 			fprintf(stderr, _("%s: child process did not exit normally\n"),
+ 					progname);
+ 			disconnect_and_exit(1);
+ 		}
+ 		if (WEXITSTATUS(status) != 0)
+ 		{
+ 			fprintf(stderr, _("%s: child process exited with error %i\n"),
+ 					progname, WEXITSTATUS(status));
+ 			disconnect_and_exit(1);
+ 		}
+ 		/* Exited normally, we're happy! */
+ #else							/* WIN32 */
+ 
+ 		/*
+ 		 * On Windows, since we are in the same process, we can just store the
+ 		 * value directly in the variable, and then set the flag that says
+ 		 * it's there.
+ 		 */
+ 		if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2)
+ 		{
+ 			fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"),
+ 					progname, xlogend);
+ 			exit(1);
+ 		}
+ 		InterlockedIncrement(&has_xlogendptr);
+ 
+ 		/* First wait for the thread to exit */
+ 		if (WaitForSingleObjectEx((HANDLE) bgchild, INFINITE, FALSE) != WAIT_OBJECT_0)
+ 		{
+ 			_dosmaperr(GetLastError());
+ 			fprintf(stderr, _("%s: could not wait for child thread: %s\n"),
+ 					progname, strerror(errno));
+ 			disconnect_and_exit(1);
+ 		}
+ 		if (GetExitCodeThread((HANDLE) bgchild, &status) == 0)
+ 		{
+ 			_dosmaperr(GetLastError());
+ 			fprintf(stderr, _("%s: could not get child thread exit status: %s\n"),
+ 					progname, strerror(errno));
+ 			disconnect_and_exit(1);
+ 		}
+ 		if (status != 0)
+ 		{
+ 			fprintf(stderr, _("%s: child thread exited with error %u\n"),
+ 					progname, status);
+ 			disconnect_and_exit(1);
+ 		}
+ 		/* Exited normally, we're happy */
+ #endif
+ 	}
+ 
  	/*
  	 * End of copy data. Final result is already checked inside the loop.
  	 */
***************
*** 953,959 **** main(int argc, char **argv)
  		{"pgdata", required_argument, NULL, 'D'},
  		{"format", required_argument, NULL, 'F'},
  		{"checkpoint", required_argument, NULL, 'c'},
! 		{"xlog", no_argument, NULL, 'x'},
  		{"gzip", no_argument, NULL, 'z'},
  		{"compress", required_argument, NULL, 'Z'},
  		{"label", required_argument, NULL, 'l'},
--- 1159,1165 ----
  		{"pgdata", required_argument, NULL, 'D'},
  		{"format", required_argument, NULL, 'F'},
  		{"checkpoint", required_argument, NULL, 'c'},
! 		{"xlog", required_argument, NULL, 'x'},
  		{"gzip", no_argument, NULL, 'z'},
  		{"compress", required_argument, NULL, 'Z'},
  		{"label", required_argument, NULL, 'l'},
***************
*** 962,967 **** main(int argc, char **argv)
--- 1168,1174 ----
  		{"username", required_argument, NULL, 'U'},
  		{"no-password", no_argument, NULL, 'w'},
  		{"password", no_argument, NULL, 'W'},
+ 		{"statusint", required_argument, NULL, 's'},
  		{"verbose", no_argument, NULL, 'v'},
  		{"progress", no_argument, NULL, 'P'},
  		{NULL, 0, NULL, 0}
***************
*** 988,994 **** main(int argc, char **argv)
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:F:xl:zZ:c:h:p:U:wWvP",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
--- 1195,1201 ----
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:F:x:l:zZ:c:h:p:U:s:wWvP",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
***************
*** 1010,1015 **** main(int argc, char **argv)
--- 1217,1234 ----
  				break;
  			case 'x':
  				includewal = true;
+ 				if (strcmp(optarg, "f") == 0 ||
+ 					strcmp(optarg, "fetch") == 0)
+ 					streamwal = false;
+ 				else if (strcmp(optarg, "s") == 0 ||
+ 						 strcmp(optarg, "stream") == 0)
+ 					streamwal = true;
+ 				else
+ 				{
+ 					fprintf(stderr, _("%s: invalid xlog option \"%s\", must be empty, \"fetch\" or \"stream\"\n"),
+ 							progname, optarg);
+ 					exit(1);
+ 				}
  				break;
  			case 'l':
  				label = xstrdup(optarg);
***************
*** 1057,1062 **** main(int argc, char **argv)
--- 1276,1290 ----
  			case 'W':
  				dbgetpassword = 1;
  				break;
+ 			case 's':
+ 				standby_message_timeout = atoi(optarg);
+ 				if (standby_message_timeout < 0)
+ 				{
+ 					fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
+ 							progname, optarg);
+ 					exit(1);
+ 				}
+ 				break;
  			case 'v':
  				verbose++;
  				break;
***************
*** 1111,1116 **** main(int argc, char **argv)
--- 1339,1354 ----
  		exit(1);
  	}
  
+ 	if (format != 'p' && streamwal)
+ 	{
+ 		fprintf(stderr,
+ 				_("%s: wal streaming can only be used in plain mode\n"),
+ 				progname);
+ 		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ 				progname);
+ 		exit(1);
+ 	}
+ 
  #ifndef HAVE_LIBZ
  	if (compresslevel != 0)
  	{
*** /dev/null
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 0 ****
--- 1,440 ----
+ /*-------------------------------------------------------------------------
+  *
+  * pg_receivexlog.c - receive streaming transaction log data and write it
+  *					  to a local file.
+  *
+  * Author: Magnus Hagander <magnus@hagander.net>
+  *
+  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+  *
+  * IDENTIFICATION
+  *		  src/bin/pg_basebackup/pg_receivexlog.c
+  *-------------------------------------------------------------------------
+  */
+ 
+ /*
+  * We have to use postgres.h not postgres_fe.h here, because there's so much
+  * backend-only stuff in the XLOG include files we need.  But we need a
+  * frontend-ish environment otherwise.	Hence this ugly hack.
+  */
+ #define FRONTEND 1
+ #include "postgres.h"
+ #include "libpq-fe.h"
+ #include "libpq/pqsignal.h"
+ #include "access/xlog_internal.h"
+ 
+ #include "receivelog.h"
+ #include "streamutil.h"
+ 
+ #include <dirent.h>
+ #include <sys/stat.h>
+ #include <sys/types.h>
+ #include <unistd.h>
+ 
+ #include "getopt_long.h"
+ 
+ /* Global options */
+ char	   *basedir = NULL;
+ int			verbose = 0;
+ int			standby_message_timeout = 10;		/* 10 sec = default */
+ volatile bool time_to_abort = false;
+ 
+ 
+ static void usage(void);
+ static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline);
+ static void StreamLog();
+ static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+ 
+ static void
+ usage(void)
+ {
+ 	printf(_("%s receives PostgreSQL streaming transaction logs\n\n"),
+ 		   progname);
+ 	printf(_("Usage:\n"));
+ 	printf(_("  %s [OPTION]...\n"), progname);
+ 	printf(_("\nOptions controlling the output:\n"));
+ 	printf(_("  -D, --dir=directory       receive xlog files into this directory\n"));
+ 	printf(_("\nGeneral options:\n"));
+ 	printf(_("  -v, --verbose             output verbose messages\n"));
+ 	printf(_("  -?, --help                show this help, then exit\n"));
+ 	printf(_("  -V, --version             output version information, then exit\n"));
+ 	printf(_("\nConnection options:\n"));
+ 	printf(_("  -s, --statusint=INTERVAL  time between status packets sent to server (in seconds)\n"));
+ 	printf(_("  -h, --host=HOSTNAME       database server host or socket directory\n"));
+ 	printf(_("  -p, --port=PORT           database server port number\n"));
+ 	printf(_("  -U, --username=NAME       connect as specified database user\n"));
+ 	printf(_("  -w, --no-password         never prompt for password\n"));
+ 	printf(_("  -W, --password            force password prompt (should happen automatically)\n"));
+ 	printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
+ }
+ 
+ static bool
+ segment_callback(XLogRecPtr segendpos, uint32 timeline)
+ {
+ 	char		fn[MAXPGPATH];
+ 	struct stat statbuf;
+ 	uint32		log = segendpos.xlogid;
+ 	uint32		seg = segendpos.xrecoff / XLOG_SEG_SIZE;
+ 
+ 	if (verbose)
+ 		fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
+ 				progname, segendpos.xlogid, segendpos.xrecoff, timeline);
+ 
+ 	/*
+ 	 * Check if there is a partial file for the name we just finished, and if
+ 	 * there is, remove it under the assumption that we have now got all the
+ 	 * data we need. segendpos is where the next segment starts, so step back
+ 	 * one whole segment (not one byte of xrecoff) to get the segment number
+ 	 * used in the file name; that also works across an xlogid boundary.
+ 	 */
+ 	PrevLogSeg(log, seg);
+ 	snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial",
+ 			 basedir, timeline, log, seg);
+ 	if (stat(fn, &statbuf) == 0)
+ 	{
+ 		/* File existed, get rid of it */
+ 		if (verbose)
+ 			fprintf(stderr, _("%s: removing file \"%s\"\n"),
+ 					progname, fn);
+ 		unlink(fn);
+ 	}
+ 
+ 	/* Never abort from this - we handle all aborting in continue_streaming() */
+ 	return false;
+ }
+ 
+ static bool
+ continue_streaming()
+ {
+ 	/* Keep going (return false) until the abort flag has been raised */
+ 	if (!time_to_abort)
+ 		return false;
+ 
+ 	fprintf(stderr, _("%s: received interrupt signal, exiting.\n"),
+ 			progname);
+ 	return true;
+ }
+ 
+ /*
+  * Determine starting location for streaming, based on:
+  * 1. If there are completed xlog segments for this timeline, start at the
+  *	  end of the newest one, i.e. at the beginning of the following segment.
+  * 2. If a partial segment is found, rename it out of the way and stop
+  *	  scanning; it has to be fetched again since we don't sync after every
+  *	  write.
+  * 3. If no completed xlog segment exists, return currentpos and leave it to
+  *	  the caller to round that down to the beginning of its WAL segment.
+  *
+  * Only files in basedir whose names look like xlog segments are considered,
+  * and those belonging to a timeline other than currenttimeline are ignored.
+  * Any failure to read the directory or to stat/rename a file is fatal, as
+  * is finding that the ".partial" name is already taken.
+  */
+ static XLogRecPtr
+ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
+ {
+ 	DIR		   *dir;
+ 	struct dirent *dirent;
+ 	bool		found = false;	/* seen any completed segment yet? */
+ 	XLogRecPtr	high = {0, 0};	/* start of newest completed segment */
+ 
+ 	dir = opendir(basedir);
+ 	if (dir == NULL)
+ 	{
+ 		fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
+ 				progname, basedir, strerror(errno));
+ 		disconnect_and_exit(1);
+ 	}
+ 
+ 	while ((dirent = readdir(dir)) != NULL)
+ 	{
+ 		char		fullpath[MAXPGPATH];
+ 		struct stat statbuf;
+ 		uint32		tli,
+ 					log,
+ 					seg;
+ 
+ 		/*
+ 		 * xlog files are always 24 characters, all of them 0-9 or A-F. This
+ 		 * also skips "." and "..".
+ 		 */
+ 		if (strlen(dirent->d_name) != 24 ||
+ 			strspn(dirent->d_name, "0123456789ABCDEF") != 24)
+ 			continue;
+ 
+ 		/*
+ 		 * Looks like an xlog file. Parse its position.
+ 		 */
+ 		if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3)
+ 		{
+ 			fprintf(stderr, _("%s: could not parse xlog filename \"%s\"\n"),
+ 					progname, dirent->d_name);
+ 			disconnect_and_exit(1);
+ 		}
+ 
+ 		/* The filename holds a segment number; convert it to a byte offset */
+ 		seg *= XLOG_SEG_SIZE;
+ 
+ 		/* Ignore any files that are for another timeline */
+ 		if (tli != currenttimeline)
+ 			continue;
+ 
+ 		/* Check if this is a completed segment or not */
+ 		snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+ 		if (stat(fullpath, &statbuf) != 0)
+ 		{
+ 			fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"),
+ 					progname, fullpath, strerror(errno));
+ 			disconnect_and_exit(1);
+ 		}
+ 
+ 		if (statbuf.st_size == XLOG_SEG_SIZE)
+ 		{
+ 			/* Completed segment, remember it if it is the newest so far */
+ 			if (!found || log > high.xlogid ||
+ 				(log == high.xlogid && seg > high.xrecoff))
+ 			{
+ 				high.xlogid = log;
+ 				high.xrecoff = seg;
+ 				found = true;
+ 			}
+ 		}
+ 		else
+ 		{
+ 			/*
+ 			 * This is a partial file. Rename it out of the way.
+ 			 */
+ 			char		newfn[MAXPGPATH];
+ 
+ 			fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
+ 					progname, dirent->d_name, dirent->d_name);
+ 
+ 			snprintf(newfn, sizeof(newfn), "%s/%s.partial",
+ 					 basedir, dirent->d_name);
+ 
+ 			if (stat(newfn, &statbuf) == 0)
+ 			{
+ 				fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"),
+ 						progname, newfn);
+ 				disconnect_and_exit(1);
+ 			}
+ 			if (rename(fullpath, newfn) != 0)
+ 			{
+ 				fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"),
+ 						progname, fullpath, newfn, strerror(errno));
+ 				disconnect_and_exit(1);
+ 			}
+ 
+ 			/* Don't continue looking for more, we assume this is the last */
+ 			break;
+ 		}
+ 	}
+ 
+ 	closedir(dir);
+ 
+ 	if (!found)
+ 		return currentpos;
+ 
+ 	/*
+ 	 * The newest segment we have is complete, so resume streaming at the
+ 	 * beginning of the segment that follows it (may cross into next xlogid).
+ 	 */
+ 	XLByteAdvance(high, XLOG_SEG_SIZE);
+ 	return high;
+ }
+ 
+ /*
+  * Start the log streaming: connect to the server, find out the timeline
+  * and where to start, then receive WAL until the stream ends or we are
+  * told to stop. Exits with status 1 if any step fails, including the
+  * streaming itself.
+  */
+ static void
+ StreamLog(void)
+ {
+ 	PGresult   *res;
+ 	uint32		timeline;
+ 	XLogRecPtr	startpos;
+ 
+ 	/* Connect in replication mode to the server */
+ 	conn = GetConnection();
+ 
+ 	/*
+ 	 * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
+ 	 * position.
+ 	 */
+ 	res = PQexec(conn, "IDENTIFY_SYSTEM");
+ 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ 	{
+ 		fprintf(stderr, _("%s: could not identify system: %s\n"),
+ 				progname, PQerrorMessage(conn));
+ 		disconnect_and_exit(1);
+ 	}
+ 	if (PQntuples(res) != 1)
+ 	{
+ 		fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
+ 				progname, PQntuples(res));
+ 		disconnect_and_exit(1);
+ 	}
+ 	timeline = atoi(PQgetvalue(res, 0, 1));
+ 	if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &startpos.xlogid, &startpos.xrecoff) != 2)
+ 	{
+ 		fprintf(stderr, _("%s: could not parse log start position from value \"%s\"\n"),
+ 				progname, PQgetvalue(res, 0, 2));
+ 		disconnect_and_exit(1);
+ 	}
+ 	PQclear(res);
+ 
+ 	/* Figure out where to start streaming. */
+ 	startpos = FindStreamingStart(startpos, timeline);
+ 
+ 	/* Always start streaming at the beginning of a segment */
+ 	startpos.xrecoff -= startpos.xrecoff % XLOG_SEG_SIZE;
+ 
+ 	/* Start the replication */
+ 	if (verbose)
+ 		fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"),
+ 				progname, startpos.xlogid, startpos.xrecoff, timeline);
+ 
+ 	/*
+ 	 * ReceiveXlogStream() has already reported the reason if it returns
+ 	 * false; make sure the failure shows up in our exit status as well.
+ 	 */
+ 	if (!ReceiveXlogStream(conn, startpos, timeline, basedir,
+ 						   segment_callback, continue_streaming,
+ 						   standby_message_timeout))
+ 		disconnect_and_exit(1);
+ }
+ 
+ /*
+  * SIGINT handler: only raise the time_to_abort flag, which
+  * continue_streaming() checks to stop at the next possible moment.
+  */
+ static void
+ sigint_handler(int signum)
+ {
+ 	time_to_abort = true;
+ }
+ 
+ /* Entry point: parse the options, install the SIGINT handler, stream WAL. */
+ int
+ main(int argc, char **argv)
+ {
+ 	static struct option long_options[] = {
+ 		{"help", no_argument, NULL, '?'},
+ 		{"version", no_argument, NULL, 'V'},
+ 		{"dir", required_argument, NULL, 'D'},
+ 		{"host", required_argument, NULL, 'h'},
+ 		{"port", required_argument, NULL, 'p'},
+ 		{"username", required_argument, NULL, 'U'},
+ 		{"no-password", no_argument, NULL, 'w'},
+ 		{"password", no_argument, NULL, 'W'},
+ 		{"statusint", required_argument, NULL, 's'},
+ 		{"verbose", no_argument, NULL, 'v'},
+ 		{NULL, 0, NULL, 0}
+ 	};
+ 	int			c;
+ 	int			option_index;
+ 
+ 	progname = get_progname(argv[0]);
+ 	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog"));
+ 
+ 	if (argc > 1)
+ 	{
+ 		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+ 		{
+ 			usage();
+ 			exit(0);
+ 		}
+ 		else if (strcmp(argv[1], "-V") == 0
+ 				 || strcmp(argv[1], "--version") == 0)
+ 		{
+ 			puts("pg_receivexlog (PostgreSQL) " PG_VERSION);
+ 			exit(0);
+ 		}
+ 	}
+ 
+ 	while ((c = getopt_long(argc, argv, "D:h:p:U:s:wWv",
+ 							long_options, &option_index)) != -1)
+ 	{
+ 		switch (c)
+ 		{
+ 			case 'D':
+ 				basedir = xstrdup(optarg);
+ 				break;
+ 			case 'h':
+ 				dbhost = xstrdup(optarg);
+ 				break;
+ 			case 'p':
+ 				if (atoi(optarg) <= 0)
+ 				{
+ 					fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
+ 							progname, optarg);
+ 					exit(1);
+ 				}
+ 				dbport = xstrdup(optarg);
+ 				break;
+ 			case 'U':
+ 				dbuser = xstrdup(optarg);
+ 				break;
+ 			case 'w':
+ 				dbgetpassword = -1;
+ 				break;
+ 			case 'W':
+ 				dbgetpassword = 1;
+ 				break;
+ 			case 's':
+ 				standby_message_timeout = atoi(optarg);
+ 				if (standby_message_timeout < 0)
+ 				{
+ 					fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
+ 							progname, optarg);
+ 					exit(1);
+ 				}
+ 				break;
+ 			case 'v':
+ 				verbose++;
+ 				break;
+ 			default:
+ 
+ 				/*
+ 				 * getopt_long already emitted a complaint
+ 				 */
+ 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ 						progname);
+ 				exit(1);
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * Any non-option arguments? None are accepted, so complain and exit.
+ 	 */
+ 	if (optind < argc)
+ 	{
+ 		fprintf(stderr,
+ 				_("%s: too many command-line arguments (first is \"%s\")\n"),
+ 				progname, argv[optind]);
+ 		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ 				progname);
+ 		exit(1);
+ 	}
+ 
+ 	/*
+ 	 * Required arguments: the target directory (-D) must have been given.
+ 	 */
+ 	if (basedir == NULL)
+ 	{
+ 		fprintf(stderr, _("%s: no target directory specified\n"), progname);
+ 		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ 				progname);
+ 		exit(1);
+ 	}
+ 
+ #ifndef WIN32
+ 	pqsignal(SIGINT, sigint_handler);
+ #endif
+ 
+ 	StreamLog();
+ 
+ 	exit(0);
+ }
*** /dev/null
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 0 ****
--- 1,353 ----
+ /*-------------------------------------------------------------------------
+  *
+  * receivelog.c - receive transaction log files using the streaming
+  *				  replication protocol.
+  *
+  * Author: Magnus Hagander <magnus@hagander.net>
+  *
+  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+  *
+  * IDENTIFICATION
+  *		  src/bin/pg_basebackup/receivelog.c
+  *-------------------------------------------------------------------------
+  */
+ 
+ /*
+  * We have to use postgres.h not postgres_fe.h here, because there's so much
+  * backend-only stuff in the XLOG include files we need.  But we need a
+  * frontend-ish environment otherwise.	Hence this ugly hack.
+  */
+ #define FRONTEND 1
+ #include "postgres.h"
+ #include "libpq-fe.h"
+ #include "access/xlog_internal.h"
+ #include "replication/walprotocol.h"
+ #include "utils/datetime.h"
+ 
+ #include "receivelog.h"
+ #include "streamutil.h"
+ 
+ #include <sys/time.h>
+ #include <sys/types.h>
+ #include <unistd.h>
+ 
+ 
+ /* Size of the streaming replication protocol header */
+ #define STREAMING_HEADER_SIZE (1+8+8+8)
+ 
+ const XLogRecPtr InvalidXLogRecPtr = {0, 0};
+ 
+ /*
+  * Open a new WAL file in the specified directory. Store the name (not
+  * including the directory) in namebuf, which must have room for it. The
+  * file must not exist yet (O_EXCL). Returns the file descriptor, or -1
+  * after logging the reason if the file could not be created.
+  */
+ static int
+ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
+ {
+ 	int			f;
+ 	char		fn[MAXPGPATH];
+ 
+ 	XLogFileName(namebuf, timeline, startpoint.xlogid,
+ 				 startpoint.xrecoff / XLOG_SEG_SIZE);
+ 	snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
+ 	f = open(fn, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, 0666);
+ 	if (f == -1)
+ 		fprintf(stderr, _("%s: could not open WAL segment %s: %s\n"),
+ 				progname, fn, strerror(errno));
+ 	return f;
+ }
+ 
+ /*
+  * Local version of GetCurrentTimestamp(), since we are not linked with
+  * backend code. Reads the clock with gettimeofday() and rebases the
+  * result from the Unix epoch to the PostgreSQL epoch.
+  */
+ static TimestampTz
+ localGetCurrentTimestamp(void)
+ {
+ 	struct timeval now;
+ 	TimestampTz secs;
+ 
+ 	gettimeofday(&now, NULL);
+ 
+ 	secs = (TimestampTz) now.tv_sec -
+ 		((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
+ 
+ 	/* Add the sub-second part in the unit the timestamp type uses */
+ #ifdef HAVE_INT64_TIMESTAMP
+ 	return (secs * USECS_PER_SEC) + now.tv_usec;
+ #else
+ 	return secs + (now.tv_usec / 1000000.0);
+ #endif
+ }
+ 
+ /*
+  * Receive a log stream starting at the specified position and write it to
+  * segment files in basedir. segment_finish (after each completed segment)
+  * and stream_continue (before each read) may be NULL; streaming stops when
+  * either returns true. A status packet is sent to the server every
+  * standby_message_timeout seconds unless that is 0.
+  * Returns true if a callback stopped the stream or the server ended it
+  * normally, false (after logging the reason) on error.
+  *
+  * Note: The log position *must* be at a log segment start!
+  */
+ bool
+ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, segment_finish_callback segment_finish, stream_continue_callback stream_continue, int standby_message_timeout)
+ {
+ 	char		query[128];
+ 	char		current_walfile_name[MAXPGPATH];
+ 	PGresult   *res;
+ 	char	   *copybuf = NULL;
+ 	int			walfile = -1;
+ 	int64		last_status = -1;
+ 	XLogRecPtr	blockpos = InvalidXLogRecPtr;
+ 
+ 	/* Initiate the replication stream at specified location */
+ 	snprintf(query, sizeof(query), "START_REPLICATION %X/%X", startpos.xlogid, startpos.xrecoff);
+ 	res = PQexec(conn, query);
+ 	if (PQresultStatus(res) != PGRES_COPY_BOTH)
+ 	{
+ 		fprintf(stderr, _("%s: could not start replication: %s\n"),
+ 				progname, PQresultErrorMessage(res));
+ 		PQclear(res);
+ 		return false;
+ 	}
+ 	PQclear(res);
+ 
+ 	/* Receive the actual xlog data */
+ 	while (1)
+ 	{
+ 		int			r;
+ 		int			xlogoff;
+ 		int			bytes_left;
+ 		int			bytes_written;
+ 		int64		now;
+ 
+ 		if (copybuf != NULL)
+ 		{
+ 			PQfreemem(copybuf);
+ 			copybuf = NULL;
+ 		}
+ 
+ 		/* Check if we should continue streaming, or abort at this point. */
+ 		if (stream_continue && stream_continue())
+ 		{
+ 			if (walfile != -1)
+ 			{
+ 				fsync(walfile);
+ 				close(walfile);
+ 			}
+ 			return true;
+ 		}
+ 
+ 		/* Potentially send a status message to the master */
+ 		now = localGetCurrentTimestamp();
+ 		if (standby_message_timeout > 0 &&
+ 			last_status < now - (int64) standby_message_timeout * 1000000)
+ 		{
+ 			/* Time to send feedback! */
+ 			char		replybuf[sizeof(StandbyReplyMessage) + 1];
+ 			StandbyReplyMessage *replymsg = (StandbyReplyMessage *) (replybuf + 1);
+ 
+ 			replymsg->write = blockpos;
+ 			replymsg->flush = InvalidXLogRecPtr;
+ 			replymsg->apply = InvalidXLogRecPtr;
+ 			replymsg->sendTime = now;
+ 			replybuf[0] = 'r';
+ 
+ 			if (PQputCopyData(conn, replybuf, sizeof(replybuf)) <= 0 ||
+ 				PQflush(conn))
+ 			{
+ 				fprintf(stderr, _("%s: could not send feedback packet: %s"),
+ 						progname, PQerrorMessage(conn));
+ 				return false;
+ 			}
+ 
+ 			last_status = now;
+ 		}
+ 
+ 		r = PQgetCopyData(conn, &copybuf, 1);
+ 		if (r == 0)
+ 		{
+ 			/*
+ 			 * In async mode, and no data available. We block on reading but
+ 			 * not more than the specified timeout, so that we can send a
+ 			 * response back to the server.
+ 			 */
+ 			fd_set		input_mask;
+ 			struct timeval timeout;
+ 			struct timeval *timeoutptr;
+ 
+ 			FD_ZERO(&input_mask);
+ 			FD_SET(PQsocket(conn), &input_mask);
+ 			if (standby_message_timeout)
+ 			{
+ 				/* now and last_status are in microseconds, the rest in s */
+ 				timeout.tv_sec = standby_message_timeout - 1 -
+ 					(now - last_status) / 1000000;
+ 				if (timeout.tv_sec <= 0)
+ 					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
+ 				timeout.tv_usec = 0;
+ 				timeoutptr = &timeout;
+ 			}
+ 			else
+ 				timeoutptr = NULL;
+ 
+ 			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+ 			if (r == 0 || (r < 0 && errno == EINTR))
+ 			{
+ 				/*
+ 				 * Got a timeout or signal. Continue the loop and either
+ 				 * deliver a status packet to the server or just go back into
+ 				 * blocking.
+ 				 */
+ 				continue;
+ 			}
+ 			else if (r < 0)
+ 			{
+ 				fprintf(stderr, _("%s: select() failed: %s\n"),
+ 						progname, strerror(errno));
+ 				return false;
+ 			}
+ 			/* Else there is actually data on the socket */
+ 			if (PQconsumeInput(conn) == 0)
+ 			{
+ 				fprintf(stderr, _("%s: could not receive data from WAL stream: %s\n"),
+ 						progname, PQerrorMessage(conn));
+ 				return false;
+ 			}
+ 			continue;
+ 		}
+ 		if (r == -1)
+ 			/* End of copy stream */
+ 			break;
+ 		if (r == -2)
+ 		{
+ 			fprintf(stderr, _("%s: could not read copy data: %s\n"),
+ 					progname, PQerrorMessage(conn));
+ 			return false;
+ 		}
+ 		if (r < STREAMING_HEADER_SIZE + 1)
+ 		{
+ 			fprintf(stderr, _("%s: streaming header too small: %i\n"),
+ 					progname, r);
+ 			return false;
+ 		}
+ 		if (copybuf[0] != 'w')
+ 		{
+ 			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+ 					progname, copybuf[0]);
+ 			return false;
+ 		}
+ 
+ 		/* Extract WAL location for this block */
+ 		memcpy(&blockpos, copybuf + 1, 8);
+ 		xlogoff = blockpos.xrecoff % XLOG_SEG_SIZE;
+ 
+ 		/*
+ 		 * Verify that the initial location in the stream matches where we
+ 		 * think we are.
+ 		 */
+ 		if (walfile == -1)
+ 		{
+ 			/* No file open yet */
+ 			if (xlogoff != 0)
+ 			{
+ 				fprintf(stderr, _("%s: received xlog record for offset %u with no file open\n"),
+ 						progname, xlogoff);
+ 				return false;
+ 			}
+ 		}
+ 		else
+ 		{
+ 			/* More data in existing segment */
+ 			/* XXX: store seek value don't reseek all the time */
+ 			if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+ 			{
+ 				fprintf(stderr, _("%s: got WAL data offset %08x, expected %08x\n"),
+ 						progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+ 				return false;
+ 			}
+ 		}
+ 
+ 		bytes_left = r - STREAMING_HEADER_SIZE;
+ 		bytes_written = 0;
+ 
+ 		while (bytes_left)
+ 		{
+ 			int			bytes_to_write;
+ 
+ 			/*
+ 			 * If crossing a WAL boundary, only write up until we reach
+ 			 * XLOG_SEG_SIZE.
+ 			 */
+ 			if (xlogoff + bytes_left > XLOG_SEG_SIZE)
+ 				bytes_to_write = XLOG_SEG_SIZE - xlogoff;
+ 			else
+ 				bytes_to_write = bytes_left;
+ 
+ 			if (walfile == -1)
+ 			{
+ 				walfile = open_walfile(blockpos, timeline,
+ 									   basedir, current_walfile_name);
+ 				if (walfile == -1)
+ 					/* Error logged by open_walfile */
+ 					return false;
+ 			}
+ 
+ 			if (write(walfile,
+ 					  copybuf + STREAMING_HEADER_SIZE + bytes_written,
+ 					  bytes_to_write) != bytes_to_write)
+ 			{
+ 				fprintf(stderr, _("%s: could not write %u bytes to WAL file %s: %s\n"),
+ 						progname,
+ 						bytes_to_write,
+ 						current_walfile_name,
+ 						strerror(errno));
+ 				return false;
+ 			}
+ 
+ 			/* Write was successful, advance our position */
+ 			bytes_written += bytes_to_write;
+ 			bytes_left -= bytes_to_write;
+ 			XLByteAdvance(blockpos, bytes_to_write);
+ 			xlogoff += bytes_to_write;
+ 
+ 			/* Did we reach the end of a WAL segment? */
+ 			if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
+ 			{
+ 				fsync(walfile);
+ 				close(walfile);
+ 				walfile = -1;
+ 				xlogoff = 0;
+ 
+ 				if (segment_finish != NULL)
+ 				{
+ 					/* Tell the callback, and return if it told us to */
+ 					if (segment_finish(blockpos, timeline))
+ 						return true;
+ 				}
+ 			}
+ 		}
+ 		/* No more data left to write, start receiving next copy packet */
+ 	}
+ 
+ 	/*
+ 	 * The only way to get out of the loop is if the server shut down the
+ 	 * replication stream. If it was a controlled shutdown the final result
+ 	 * is PGRES_COMMAND_OK; anything else is reported as an error.
+ 	 */
+ 	res = PQgetResult(conn);
+ 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ 	{
+ 		fprintf(stderr, _("%s: unexpected termination of replication stream: %s\n"),
+ 				progname, PQresultErrorMessage(res));
+ 		PQclear(res);
+ 		return false;
+ 	}
+ 	PQclear(res);
+ 	return true;
+ }
*** /dev/null
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 0 ****
--- 1,21 ----
+ #include "access/xlogdefs.h"
+ 
+ /*
+  * Called whenever a segment is finished, return true to stop
+  * the streaming at this point.
+  */
+ typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline);
+ 
+ /*
+  * Called before trying to read more data. Return true to stop
+  * the streaming at this point.
+  */
+ typedef bool (*stream_continue_callback)(void);
+ 
+ bool ReceiveXlogStream(PGconn *conn,
+ 					   XLogRecPtr startpos,
+ 					   uint32 timeline,
+ 					   char *basedir,
+ 					   segment_finish_callback segment_finish,
+ 					   stream_continue_callback stream_continue,
+ 					   int standby_message_timeout);
*** /dev/null
--- b/src/bin/pg_basebackup/streamutil.c
***************
*** 0 ****
--- 1,165 ----
+ /*-------------------------------------------------------------------------
+  *
+  * streamutil.c - utility functions for pg_basebackup and pg_receivexlog
+  *
+  * Author: Magnus Hagander <magnus@hagander.net>
+  *
+  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+  *
+  * IDENTIFICATION
+  *		  src/bin/pg_basebackup/streamutil.c
+  *-------------------------------------------------------------------------
+  */
+ 
+ /*
+  * We have to use postgres.h not postgres_fe.h here, because there's so much
+  * backend-only stuff in the XLOG include files we need.  But we need a
+  * frontend-ish environment otherwise.	Hence this ugly hack.
+  */
+ #define FRONTEND 1
+ #include "postgres.h"
+ #include "streamutil.h"
+ 
+ #include <stdio.h>
+ #include <string.h>
+ 
+ const char *progname;
+ char	   *dbhost = NULL;
+ char	   *dbuser = NULL;
+ char	   *dbport = NULL;
+ int			dbgetpassword = 0;	/* 0=auto, -1=never, 1=always */
+ static char *dbpassword = NULL;
+ PGconn	   *conn = NULL;
+ 
+ /*
+  * strdup() and malloc() replacements that print an error and exit the
+  * program if the allocation fails, so they never return NULL.
+  */
+ char *
+ xstrdup(const char *s)
+ {
+ 	char	   *copy = strdup(s);
+ 
+ 	if (copy != NULL)
+ 		return copy;
+ 
+ 	/* Allocation failed; report it and give up */
+ 	fprintf(stderr, _("%s: out of memory\n"), progname);
+ 	exit(1);
+ 	return NULL;				/* not reached, keeps the compiler quiet */
+ }
+ 
+ void *
+ xmalloc0(int size)
+ {
+ 	/* calloc() hands back memory that is already zero-filled */
+ 	void	   *zeroed = calloc(1, size);
+ 
+ 	if (zeroed == NULL)
+ 	{
+ 		fprintf(stderr, _("%s: out of memory\n"), progname);
+ 		exit(1);
+ 	}
+ 
+ 	return zeroed;
+ }
+ 
+ 
+ /*
+  * Open a replication connection to the server described by the dbhost,
+  * dbuser and dbport globals, prompting for a password when required (or
+  * forced with dbgetpassword == 1). A password that worked is remembered
+  * and reused for later calls. Exits the program if connecting fails.
+  */
+ PGconn *
+ GetConnection(void)
+ {
+ 	PGconn	   *tmpconn;
+ 	const char **keywords;
+ 	const char **values;
+ 	char	   *password = NULL;
+ 	int			argcount;
+ 	int			pwslot;
+ 	int			n = 0;
+ 
+ 	/* dbname, replication, fallback_app_name, password + the optional ones */
+ 	argcount = 4 + (dbhost != NULL) + (dbuser != NULL) + (dbport != NULL);
+ 	pwslot = argcount - 1;		/* password, if any, goes in the last slot */
+ 
+ 	/* One extra, zeroed entry terminates each array */
+ 	keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
+ 	values = xmalloc0((argcount + 1) * sizeof(*values));
+ 
+ 	keywords[n] = "dbname";
+ 	values[n++] = "replication";
+ 	keywords[n] = "replication";
+ 	values[n++] = "true";
+ 	keywords[n] = "fallback_application_name";
+ 	values[n++] = progname;
+ 	if (dbhost != NULL)
+ 	{
+ 		keywords[n] = "host";
+ 		values[n++] = dbhost;
+ 	}
+ 	if (dbuser != NULL)
+ 	{
+ 		keywords[n] = "user";
+ 		values[n++] = dbuser;
+ 	}
+ 	if (dbport != NULL)
+ 	{
+ 		keywords[n] = "port";
+ 		values[n++] = dbport;
+ 	}
+ 
+ 	for (;;)
+ 	{
+ 		/* Drop the password from a previous, failed attempt */
+ 		if (password != NULL)
+ 			free(password);
+ 
+ 		if (dbpassword != NULL)
+ 		{
+ 			/*
+ 			 * A previous connection succeeded with this password, so this
+ 			 * is a second session to the same database: forcibly reuse it
+ 			 * and don't try again if this fails.
+ 			 */
+ 			keywords[pwslot] = "password";
+ 			values[pwslot] = dbpassword;
+ 			dbgetpassword = -1;
+ 		}
+ 		else if (dbgetpassword == 1)
+ 		{
+ 			password = simple_prompt(_("Password: "), 100, false);
+ 			keywords[pwslot] = "password";
+ 			values[pwslot] = password;
+ 		}
+ 
+ 		tmpconn = PQconnectdbParams(keywords, values, true);
+ 
+ 		if (PQstatus(tmpconn) == CONNECTION_OK)
+ 		{
+ 			free(values);
+ 			free(keywords);
+ 
+ 			/* Store the password for next run */
+ 			if (password != NULL)
+ 				dbpassword = password;
+ 			return tmpconn;
+ 		}
+ 
+ 		if (PQstatus(tmpconn) == CONNECTION_BAD &&
+ 			PQconnectionNeedsPassword(tmpconn) &&
+ 			dbgetpassword != -1)
+ 		{
+ 			dbgetpassword = 1;	/* ask for password next time */
+ 			PQfinish(tmpconn);
+ 			continue;
+ 		}
+ 
+ 		fprintf(stderr, _("%s: could not connect to server: %s\n"),
+ 				progname, PQerrorMessage(tmpconn));
+ 		exit(1);
+ 	}
+ }
*** /dev/null
--- b/src/bin/pg_basebackup/streamutil.h
***************
*** 0 ****
--- 1,22 ----
+ #include "libpq-fe.h"
+ 
+ extern const char *progname;	/* program name used in error messages */
+ extern char *dbhost;			/* host to connect to, or NULL if not given */
+ extern char *dbuser;			/* user to connect as, or NULL if not given */
+ extern char *dbport;			/* port to connect to, or NULL if not given */
+ extern int	dbgetpassword;		/* 0=auto, -1=never, 1=always */
+ /* Connection kept global so we can disconnect easily */
+ extern PGconn *conn;
+ /* Finish conn (if any) and exit(code).  NB: expands to a bare { } block, so
+  * "if (x) disconnect_and_exit(1); else ..." does not compile; brace such ifs. */
+ #define disconnect_and_exit(code)				\
+ 	{											\
+ 	if (conn != NULL) PQfinish(conn);			\
+ 	exit(code);									\
+ 	}
+ 
+ /* Allocation helpers: print an error and exit instead of returning NULL */
+ char	   *xstrdup(const char *s);
+ void	   *xmalloc0(int size);
+ /* Open a replication connection; prints an error and exits on failure */
+ PGconn	   *GetConnection(void);
*** a/src/tools/msvc/Mkvcbuild.pm
--- b/src/tools/msvc/Mkvcbuild.pm
***************
*** 304,309 **** sub mkvcbuild
--- 304,316 ----
      $initdb->AddLibrary('ws2_32.lib');
  
      my $pgbasebackup = AddSimpleFrontend('pg_basebackup', 1);
+     $pgbasebackup->AddFile('src\bin\pg_basebackup\pg_basebackup.c');
+     $pgbasebackup->AddLibrary('ws2_32.lib');
+ 
+     my $pgreceivexlog = AddSimpleFrontend('pg_basebackup', 1);
+     $pgreceivexlog->{name} = 'pg_receivexlog';
+     $pgreceivexlog->AddFile('src\bin\pg_basebackup\pg_receivexlog.c');
+     $pgreceivexlog->AddLibrary('ws2_32.lib');
  
      my $pgconfig = AddSimpleFrontend('pg_config');
  
