Updated version of pg_receivexlog
Here's an updated version of pg_receivexlog, that should now actually
work (it previously failed miserably when a replication record crossed
a WAL file boundary - something which I at the time could not properly
reproduce, but when I restarted my work on it now could easily
reproduce every time :D).
It also contains an update to pg_basebackup that allows it to stream
the transaction log in the background while the backup is running,
thus reducing the need for wal_keep_segments (if the client can keep
up, it should eliminate the need completely).
In doing so, it moves a number of functions from pg_basebackup.c to
the new file streamutil.c, to be shared between both pg_basebackup and
pg_receivexlog.
So far at least, it's completely client-side, with no changes to the
server. This means that it can be dropped into src/bin on 9.1 as well
to get a version that runs there (since we're way way way past feature
freeze and can't actually stick it in there in the official tree)
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
Attachments:
pg_receivexlog.difftext/x-patch; charset=US-ASCII; name=pg_receivexlog.diffDownload
*** a/doc/src/sgml/ref/pg_basebackup.sgml
--- b/doc/src/sgml/ref/pg_basebackup.sgml
***************
*** 143,150 **** PostgreSQL documentation
</varlistentry>
<varlistentry>
! <term><option>-x</option></term>
! <term><option>--xlog</option></term>
<listitem>
<para>
Includes the required transaction log files (WAL files) in the
--- 143,150 ----
</varlistentry>
<varlistentry>
! <term><option>-x <replaceable class="parameter">method</replaceable></option></term>
! <term><option>--xlog=<replaceable class="parameter">method</replaceable></option></term>
<listitem>
<para>
Includes the required transaction log files (WAL files) in the
***************
*** 154,169 **** PostgreSQL documentation
to consult the log archive, thus making this a completely standalone
backup.
</para>
! <note>
! <para>
! The transaction log files are collected at the end of the backup.
! Therefore, it is necessary for the
! <xref linkend="guc-wal-keep-segments"> parameter to be set high
! enough that the log is not removed before the end of the backup.
! If the log has been rotated when it's time to transfer it, the
! backup will fail and be unusable.
! </para>
! </note>
</listitem>
</varlistentry>
--- 154,196 ----
to consult the log archive, thus making this a completely standalone
backup.
</para>
! <para>
! The following methods for collecting the transaction logs are
! supported:
!
! <variablelist>
! <varlistentry>
! <term><literal>f</literal></term>
! <term><literal>fetch</literal></term>
! <listitem>
! <para>
! The transaction log files are collected at the end of the backup.
! Therefore, it is necessary for the
! <xref linkend="guc-wal-keep-segments"> parameter to be set high
! enough that the log is not removed before the end of the backup.
! If the log has been rotated when it's time to transfer it, the
! backup will fail and be unusable.
! </para>
! </listitem>
! </varlistentry>
!
! <varlistentry>
! <term><literal>s</literal></term>
! <term><literal>stream</literal></term>
! <listitem>
! <para>
! Stream the transaction log while the backup is created. This will
! open a second connection to the server and start streaming the
! transaction log in parallel while running the backup. Therefore,
! it will use up two slots configured by the
! <xref linkend="guc-max-wal-senders"> parameter. As long as the
! client can keep up with transaction log received, using this mode
! requires no extra transaction logs to be saved on the master.
! </para>
! </listitem>
! </varlistentry>
! </variablelist>
! </para>
</listitem>
</varlistentry>
***************
*** 261,266 **** PostgreSQL documentation
--- 288,307 ----
<variablelist>
<varlistentry>
+ <term><option>-s <replaceable class="parameter">interval</replaceable></option></term>
+ <term><option>--statusint=<replaceable class="parameter">interval</replaceable></option></term>
+ <listitem>
+ <para>
+ Specifies the number of seconds between status packets sent back to the
+ server. This is required when streaming the transaction log (using
+ <literal>--xlog=stream</literal>) if replication timeout is configured
+ on the server, and allows for easier monitoring. The default value is
+ 10 seconds.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><option>-h <replaceable class="parameter">host</replaceable></option></term>
<term><option>--host=<replaceable class="parameter">host</replaceable></option></term>
<listitem>
*** /dev/null
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 0 ****
--- 1,270 ----
+ <!--
+ doc/src/sgml/ref/pg_receivexlog.sgml
+ PostgreSQL documentation
+ -->
+
+ <refentry id="app-pgreceivexlog">
+ <refmeta>
+ <refentrytitle>pg_receivexlog</refentrytitle>
+ <manvolnum>1</manvolnum>
+ <refmiscinfo>Application</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>pg_receivexlog</refname>
+ <refpurpose>streams transaction logs from a <productname>PostgreSQL</productname> cluster</refpurpose>
+ </refnamediv>
+
+ <indexterm zone="app-pgreceivexlog">
+ <primary>pg_receivexlog</primary>
+ </indexterm>
+
+ <refsynopsisdiv>
+ <cmdsynopsis>
+ <command>pg_receivexlog</command>
+ <arg rep="repeat"><replaceable>option</></arg>
+ </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>
+ Description
+ </title>
+ <para>
+ <application>pg_receivexlog</application> is used to stream transaction log
+ from a running <productname>PostgreSQL</productname> cluster. The transaction
+ log is streamed using the streaming replication protocol, and is written
+ to a local directory of files. This directory can be used as the archive
+ location for doing a restore using point-in-time recovery (see
+ <xref linkend="continuous-archiving">).
+ </para>
+
+ <para>
+ <application>pg_receivexlog</application> streams the transaction
+ log in real time as it's being generated on the server, and does not wait
+ for segments to complete like <xref linkend="guc-archive-command"> does.
+ For this reason, it is not necessary to set
+ <xref linkend="guc-archive-timeout"> when using
+ <application>pg_receivexlog</application>.
+ </para>
+
+ <para>
+ The transaction log is streamed over a regular
+ <productname>PostgreSQL</productname> connection, and uses the
+ replication protocol. The connection must be
+ made with a user having <literal>REPLICATION</literal> permissions (see
+ <xref linkend="role-attributes">), and the user must be granted explicit
+ permissions in <filename>pg_hba.conf</filename>. The server must also
+ be configured with <xref linkend="guc-max-wal-senders"> set high enough
+ to leave at least one session available for the stream.
+ </para>
+ </refsect1>
+
+ <refsect1>
+ <title>Options</title>
+
+ <para>
+ The following command-line options control the location and format of the
+ output.
+
+ <variablelist>
+ <varlistentry>
+ <term><option>-D <replaceable class="parameter">directory</replaceable></option></term>
+ <term><option>--dir=<replaceable class="parameter">directory</replaceable></option></term>
+ <listitem>
+ <para>
+ Directory to write the output to.
+ </para>
+ <para>
+ This parameter is required.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ <para>
+ The following command-line options control the running of the program.
+
+ <variablelist>
+ <varlistentry>
+ <term><option>-v</option></term>
+ <term><option>--verbose</option></term>
+ <listitem>
+ <para>
+ Enables verbose mode.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </para>
+
+ <para>
+ The following command-line options control the database connection parameters.
+
+ <variablelist>
+ <varlistentry>
+ <term><option>-s <replaceable class="parameter">interval</replaceable></option></term>
+ <term><option>--statusint=<replaceable class="parameter">interval</replaceable></option></term>
+ <listitem>
+ <para>
+ Specifies the number of seconds between status packets sent back to the
+ server. This is required if replication timeout is configured on the
+ server, and allows for easier monitoring. The default value is
+ 10 seconds.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>-h <replaceable class="parameter">host</replaceable></option></term>
+ <term><option>--host=<replaceable class="parameter">host</replaceable></option></term>
+ <listitem>
+ <para>
+ Specifies the host name of the machine on which the server is
+ running. If the value begins with a slash, it is used as the
+ directory for the Unix domain socket. The default is taken
+ from the <envar>PGHOST</envar> environment variable, if set,
+ else a Unix domain socket connection is attempted.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>-p <replaceable class="parameter">port</replaceable></option></term>
+ <term><option>--port=<replaceable class="parameter">port</replaceable></option></term>
+ <listitem>
+ <para>
+ Specifies the TCP port or local Unix domain socket file
+ extension on which the server is listening for connections.
+ Defaults to the <envar>PGPORT</envar> environment variable, if
+ set, or a compiled-in default.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>-U <replaceable>username</replaceable></option></term>
+ <term><option>--username=<replaceable class="parameter">username</replaceable></option></term>
+ <listitem>
+ <para>
+ User name to connect as.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>-w</></term>
+ <term><option>--no-password</></term>
+ <listitem>
+ <para>
+ Never issue a password prompt. If the server requires
+ password authentication and a password is not available by
+ other means such as a <filename>.pgpass</filename> file, the
+ connection attempt will fail. This option can be useful in
+ batch jobs and scripts where no user is present to enter a
+ password.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>-W</option></term>
+ <term><option>--password</option></term>
+ <listitem>
+ <para>
+ Force <application>pg_basebackup</application> to prompt for a
+ password before connecting to a database.
+ </para>
+
+ <para>
+ This option is never essential, since
+ <application>pg_bsaebackup</application> will automatically prompt
+ for a password if the server demands password authentication.
+ However, <application>pg_basebackup</application> will waste a
+ connection attempt finding out that the server wants a password.
+ In some cases it is worth typing <option>-W</> to avoid the extra
+ connection attempt.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+
+ <para>
+ Other, less commonly used, parameters are also available:
+
+ <variablelist>
+ <varlistentry>
+ <term><option>-V</></term>
+ <term><option>--version</></term>
+ <listitem>
+ <para>
+ Print the <application>pg_receivexlog</application> version and exit.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>-?</></term>
+ <term><option>--help</></term>
+ <listitem>
+ <para>
+ Show help about <application>pg_receivexlog</application> command line
+ arguments, and exit.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </para>
+
+ </refsect1>
+
+ <refsect1>
+ <title>Environment</title>
+
+ <para>
+ This utility, like most other <productname>PostgreSQL</> utilities,
+ uses the environment variables supported by <application>libpq</>
+ (see <xref linkend="libpq-envars">).
+ </para>
+
+ </refsect1>
+
+ <refsect1>
+ <title>Notes</title>
+
+ <para>
+ When using <application>pg_receivexlog</application> instead of
+ <xref linkend="guc-archive-command">, the server will continue to
+ recycle transaction log files even if the backups are not properly
+ archived, since there is no command that fails. This can be worked
+ around by having an <xref linkend="guc-archive-command"> that fails
+ when the file has not been properly archived yet.
+ </para>
+
+ </refsect1>
+
+ <refsect1>
+ <title>Examples</title>
+
+ <para>
+ To stream the transaction log from the server at
+ <literal>mydbserver</literal> and store it in the local directory
+ <filename>/usr/local/pgsql/archive</filename>:
+ <screen>
+ <prompt>$</prompt> <userinput>pg_receivexlog -h mydbserver -D /usr/local/pgsql/archive</userinput>
+ </screen>
+ </para>
+ </refsect1>
+
+ <refsect1>
+ <title>See Also</title>
+
+ <simplelist type="inline">
+ <member><xref linkend="APP-PGBASEBACKUP"></member>
+ </simplelist>
+ </refsect1>
+
+ </refentry>
*** a/doc/src/sgml/reference.sgml
--- b/doc/src/sgml/reference.sgml
***************
*** 212,217 ****
--- 212,218 ----
&pgConfig;
&pgDump;
&pgDumpall;
+ &pgReceivexlog;
&pgRestore;
&psqlRef;
&reindexdb;
*** a/src/bin/pg_basebackup/.gitignore
--- b/src/bin/pg_basebackup/.gitignore
***************
*** 1 ****
--- 1,2 ----
/pg_basebackup
+ /pg_receivexlog
*** a/src/bin/pg_basebackup/Makefile
--- b/src/bin/pg_basebackup/Makefile
***************
*** 18,38 **** include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
! OBJS= pg_basebackup.o $(WIN32RES)
! all: pg_basebackup
! pg_basebackup: $(OBJS) | submake-libpq submake-libpgport
! $(CC) $(CFLAGS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
install: all installdirs
$(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
installdirs:
$(MKDIR_P) '$(DESTDIR)$(bindir)'
uninstall:
rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
clean distclean maintainer-clean:
! rm -f pg_basebackup$(X) $(OBJS)
--- 18,43 ----
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
! OBJS=receivelog.o streamutil.o $(WIN32RES)
! all: pg_basebackup pg_receivexlog
! pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport
! $(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
!
! pg_receivexlog: pg_receivexlog.o $(OBJS) | submake-libpq submake-libpgport
! $(CC) $(CFLAGS) pg_receivexlog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
install: all installdirs
$(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+ $(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
installdirs:
$(MKDIR_P) '$(DESTDIR)$(bindir)'
uninstall:
rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+ rm -f '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
clean distclean maintainer-clean:
! rm -f pg_basebackup$(X) pg_receivexlog$(X) $(OBJS) pg_basebackup.o pg_receivexlog.o
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 11,22 ****
*-------------------------------------------------------------------------
*/
! #include "postgres_fe.h"
#include "libpq-fe.h"
#include <unistd.h>
#include <dirent.h>
#include <sys/stat.h>
#ifdef HAVE_LIBZ
#include <zlib.h>
--- 11,30 ----
*-------------------------------------------------------------------------
*/
! /*
! * We have to use postgres.h not postgres_fe.h here, because there's so much
! * backend-only stuff in the XLOG include files we need. But we need a
! * frontend-ish environment otherwise. Hence this ugly hack.
! */
! #define FRONTEND 1
! #include "postgres.h"
#include "libpq-fe.h"
#include <unistd.h>
#include <dirent.h>
#include <sys/stat.h>
+ #include <sys/types.h>
+ #include <sys/wait.h>
#ifdef HAVE_LIBZ
#include <zlib.h>
***************
*** 24,32 ****
#include "getopt_long.h"
/* Global options */
- static const char *progname;
char *basedir = NULL;
char format = 'p'; /* p(lain)/t(ar) */
char *label = "pg_basebackup base backup";
--- 32,42 ----
#include "getopt_long.h"
+ #include "receivelog.h"
+ #include "streamutil.h"
+
/* Global options */
char *basedir = NULL;
char format = 'p'; /* p(lain)/t(ar) */
char *label = "pg_basebackup base backup";
***************
*** 34,71 **** bool showprogress = false;
int verbose = 0;
int compresslevel = 0;
bool includewal = false;
bool fastcheckpoint = false;
! char *dbhost = NULL;
! char *dbuser = NULL;
! char *dbport = NULL;
! int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
/* Progress counters */
static uint64 totalsize;
static uint64 totaldone;
static int tablespacecount;
! /* Connection kept global so we can disconnect easily */
! static PGconn *conn = NULL;
! #define disconnect_and_exit(code) \
! { \
! if (conn != NULL) PQfinish(conn); \
! exit(code); \
! }
/* Function headers */
- static char *xstrdup(const char *s);
- static void *xmalloc0(int size);
static void usage(void);
static void verify_dir_is_empty_or_create(char *dirname);
static void progress_report(int tablespacenum, const char *filename);
- static PGconn *GetConnection(void);
static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
static void BaseBackup(void);
#ifdef HAVE_LIBZ
static const char *
get_gz_error(gzFile *gzf)
--- 44,81 ----
int verbose = 0;
int compresslevel = 0;
bool includewal = false;
+ bool streamwal = false;
bool fastcheckpoint = false;
! int standby_message_timeout = 10; /* 10 sec = default */
/* Progress counters */
static uint64 totalsize;
static uint64 totaldone;
static int tablespacecount;
! /* Pipe to communicate with background wal receiver process */
! #ifndef WIN32
! static int bgpipe[2] = {-1, -1};
! #endif
! /* Handle to child process */
! static pid_t bgchild = -1;
!
! /* End position for xlog streaming, empty string if unknown yet */
! static XLogRecPtr xlogendptr;
! static int has_xlogendptr = 0;
/* Function headers */
static void usage(void);
static void verify_dir_is_empty_or_create(char *dirname);
static void progress_report(int tablespacenum, const char *filename);
static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
static void BaseBackup(void);
+ static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+
#ifdef HAVE_LIBZ
static const char *
get_gz_error(gzFile *gzf)
***************
*** 81,119 **** get_gz_error(gzFile *gzf)
}
#endif
- /*
- * strdup() and malloc() replacements that prints an error and exits
- * if something goes wrong. Can never return NULL.
- */
- static char *
- xstrdup(const char *s)
- {
- char *result;
-
- result = strdup(s);
- if (!result)
- {
- fprintf(stderr, _("%s: out of memory\n"), progname);
- exit(1);
- }
- return result;
- }
-
- static void *
- xmalloc0(int size)
- {
- void *result;
-
- result = malloc(size);
- if (!result)
- {
- fprintf(stderr, _("%s: out of memory\n"), progname);
- exit(1);
- }
- MemSet(result, 0, size);
- return result;
- }
-
static void
usage(void)
--- 91,96 ----
***************
*** 125,131 **** usage(void)
printf(_("\nOptions controlling the output:\n"));
printf(_(" -D, --pgdata=DIRECTORY receive base backup into directory\n"));
printf(_(" -F, --format=p|t output format (plain, tar)\n"));
! printf(_(" -x, --xlog include required WAL files in backup\n"));
printf(_(" -z, --gzip compress tar output\n"));
printf(_(" -Z, --compress=0-9 compress tar output with given compression level\n"));
printf(_("\nGeneral options:\n"));
--- 102,108 ----
printf(_("\nOptions controlling the output:\n"));
printf(_(" -D, --pgdata=DIRECTORY receive base backup into directory\n"));
printf(_(" -F, --format=p|t output format (plain, tar)\n"));
! printf(_(" -x, --xlog=fetch|stream include required WAL files in backup\n"));
printf(_(" -z, --gzip compress tar output\n"));
printf(_(" -Z, --compress=0-9 compress tar output with given compression level\n"));
printf(_("\nGeneral options:\n"));
***************
*** 137,142 **** usage(void)
--- 114,120 ----
printf(_(" --help show this help, then exit\n"));
printf(_(" --version output version information, then exit\n"));
printf(_("\nConnection options:\n"));
+ printf(_(" -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n"));
printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
printf(_(" -p, --port=PORT database server port number\n"));
printf(_(" -U, --username=NAME connect as specified database user\n"));
***************
*** 147,152 **** usage(void)
--- 125,321 ----
/*
+ * Called in the background process whenever a complete segment of WAL
+ * has been received.
+ * On Unix, we check to see if there is any data on our pipe
+ * (which would mean we have a stop position), and if it is, check if
+ * it is time to stop.
+ * On Windows, we are in a single process, so we can just check if it's
+ * time to stop.
+ */
+ static bool
+ segment_callback(XLogRecPtr segendpos, uint32 timeline)
+ {
+ if (!has_xlogendptr)
+ {
+ #ifndef WIN32
+ fd_set fds;
+ struct timeval tv;
+ int r;
+
+ /*
+ * Don't have the end pointer yet - check our pipe to see if it has
+ * been sent yet.
+ */
+ FD_ZERO(&fds);
+ FD_SET(bgpipe[0], &fds);
+
+ MemSet(&tv, 0, sizeof(tv));
+
+ r = select(bgpipe[0] + 1, &fds, NULL, NULL, &tv);
+ if (r == 1)
+ {
+ char xlogend[64];
+
+ MemSet(xlogend, 0, sizeof(xlogend));
+ r = piperead(bgpipe[0], xlogend, sizeof(xlogend));
+ if (r < 0)
+ {
+ fprintf(stderr, _("%s: could not read from ready pipe: %s\n"),
+ progname, strerror(errno));
+ exit(1);
+ }
+
+ if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2)
+ {
+ fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"),
+ progname, xlogend);
+ exit(1);
+ }
+ has_xlogendptr = 1;
+
+ /*
+ * Fall through to check if we've reached the point further
+ * already.
+ */
+ }
+ else
+ {
+ /*
+ * No data received on the pipe means we don't know the end
+ * position yet - so just say it's not time to stop yet.
+ */
+ return false;
+ }
+ #else
+
+ /*
+ * On win32, has_xlogendptr is set by the main thread, so if it's not
+ * set here, we just go back and wait until it shows up.
+ */
+ return false;
+ #endif
+ }
+
+ /*
+ * At this point we have an end pointer, so compare it to the current
+ * position to figure out if it's time to stop.
+ */
+ if (segendpos.xlogid > xlogendptr.xlogid ||
+ (segendpos.xlogid == xlogendptr.xlogid &&
+ segendpos.xrecoff >= xlogendptr.xrecoff))
+ return true;
+
+ /*
+ * Have end pointer, but haven't reached it yet - so tell the caller to
+ * keep streaming.
+ */
+ return false;
+ }
+
+ typedef struct
+ {
+ PGconn *bgconn;
+ XLogRecPtr startptr;
+ char xlogdir[MAXPGPATH];
+ int timeline;
+ } logstreamer_param;
+
+ static int
+ LogStreamerMain(logstreamer_param * param)
+ {
+ if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
+ param->xlogdir, segment_callback, NULL,
+ standby_message_timeout))
+
+ /*
+ * Any errors will already have been reported in the function process,
+ * but we need to tell the parent that we didn't shutdown in a nice
+ * way.
+ */
+ return 1;
+
+ PQfinish(param->bgconn);
+ return 0;
+ }
+
+ /*
+ * Initiate background process for receiving xlog during the backup.
+ * The background stream will use its own database connection so we can
+ * stream the logfile in parallel with the backups.
+ */
+ static void
+ StartLogStreamer(char *startpos, uint32 timeline)
+ {
+ logstreamer_param *param;
+
+ param = xmalloc0(sizeof(logstreamer_param));
+ param->timeline = timeline;
+
+ /* Convert the starting position */
+ if (sscanf(startpos, "%X/%X", ¶m->startptr.xlogid, ¶m->startptr.xrecoff) != 2)
+ {
+ fprintf(stderr, _("%s: invalid format of xlog location: %s\n"),
+ progname, startpos);
+ disconnect_and_exit(1);
+ }
+ /* Round off to even segment position */
+ param->startptr.xrecoff -= param->startptr.xrecoff % XLOG_SEG_SIZE;
+
+ #ifndef WIN32
+ /* Create our background pipe */
+ if (pgpipe(bgpipe) < 0)
+ {
+ fprintf(stderr, _("%s: could not create pipe for background process: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+ #endif
+
+ /* Get a second connection */
+ param->bgconn = GetConnection();
+
+ /*
+ * Always in plain format, so we can write to basedir/pg_xlog. But the
+ * directory entry in the tar file may arrive later, so make sure it's
+ * created before we start.
+ */
+ snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
+ verify_dir_is_empty_or_create(param->xlogdir);
+
+ /*
+ * Start a child process and tell it to start streaming. On Unix, this is
+ * a fork(). On Windows, we create a thread.
+ */
+ #ifndef WIN32
+ bgchild = fork();
+ if (bgchild == 0)
+ {
+ /* in child process */
+ exit(LogStreamerMain(param));
+ }
+ else if (bgchild < 0)
+ {
+ fprintf(stderr, _("%s: could not create background process: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ /*
+ * Else we are in the parent process and all is well.
+ */
+ #else /* WIN32 */
+ bgchild = _beginthreadex(NULL, 0, (void *) LogStreamerMain, param, 0, NULL);
+ if (bgchild == 0)
+ {
+ fprintf(stderr, _("%s: could not create background thread: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+ #endif
+ }
+
+ /*
* Verify that the given directory exists and is empty. If it does not
* exist, it is created. If it exists but is not empty, an error will
* be give and the process ended.
***************
*** 492,502 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
strcpy(current_path, PQgetvalue(res, rownum, 1));
/*
- * Make sure we're unpacking into an empty directory
- */
- verify_dir_is_empty_or_create(current_path);
-
- /*
* Get the COPY data
*/
res = PQgetResult(conn);
--- 661,666 ----
***************
*** 586,598 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
/*
* Directory
*/
! filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
if (mkdir(filename, S_IRWXU) != 0)
{
! fprintf(stderr,
_("%s: could not create directory \"%s\": %s\n"),
! progname, filename, strerror(errno));
! disconnect_and_exit(1);
}
#ifndef WIN32
if (chmod(filename, (mode_t) filemode))
--- 750,770 ----
/*
* Directory
*/
! filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
if (mkdir(filename, S_IRWXU) != 0)
{
! /*
! * When streaming WAL, pg_xlog will have been created
! * by the wal receiver process, so just ignore failure
! * on that.
! */
! if (!streamwal || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0)
! {
! fprintf(stderr,
_("%s: could not create directory \"%s\": %s\n"),
! progname, filename, strerror(errno));
! disconnect_and_exit(1);
! }
}
#ifndef WIN32
if (chmod(filename, (mode_t) filemode))
***************
*** 605,616 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
/*
* Symbolic link
*/
! filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
if (symlink(©buf[157], filename) != 0)
{
fprintf(stderr,
_("%s: could not create symbolic link from \"%s\" to \"%s\": %s\n"),
! progname, filename, ©buf[157], strerror(errno));
disconnect_and_exit(1);
}
}
--- 777,788 ----
/*
* Symbolic link
*/
! filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
if (symlink(©buf[157], filename) != 0)
{
fprintf(stderr,
_("%s: could not create symbolic link from \"%s\" to \"%s\": %s\n"),
! progname, filename, ©buf[157], strerror(errno));
disconnect_and_exit(1);
}
}
***************
*** 703,796 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
}
- static PGconn *
- GetConnection(void)
- {
- PGconn *tmpconn;
- int argcount = 4; /* dbname, replication, fallback_app_name,
- * password */
- int i;
- const char **keywords;
- const char **values;
- char *password = NULL;
-
- if (dbhost)
- argcount++;
- if (dbuser)
- argcount++;
- if (dbport)
- argcount++;
-
- keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
- values = xmalloc0((argcount + 1) * sizeof(*values));
-
- keywords[0] = "dbname";
- values[0] = "replication";
- keywords[1] = "replication";
- values[1] = "true";
- keywords[2] = "fallback_application_name";
- values[2] = progname;
- i = 3;
- if (dbhost)
- {
- keywords[i] = "host";
- values[i] = dbhost;
- i++;
- }
- if (dbuser)
- {
- keywords[i] = "user";
- values[i] = dbuser;
- i++;
- }
- if (dbport)
- {
- keywords[i] = "port";
- values[i] = dbport;
- i++;
- }
-
- while (true)
- {
- if (dbgetpassword == 1)
- {
- /* Prompt for a password */
- password = simple_prompt(_("Password: "), 100, false);
- keywords[argcount - 1] = "password";
- values[argcount - 1] = password;
- }
-
- tmpconn = PQconnectdbParams(keywords, values, true);
- if (password)
- free(password);
-
- if (PQstatus(tmpconn) == CONNECTION_BAD &&
- PQconnectionNeedsPassword(tmpconn) &&
- dbgetpassword != -1)
- {
- dbgetpassword = 1; /* ask for password next time */
- PQfinish(tmpconn);
- continue;
- }
-
- if (PQstatus(tmpconn) != CONNECTION_OK)
- {
- fprintf(stderr, _("%s: could not connect to server: %s"),
- progname, PQerrorMessage(tmpconn));
- exit(1);
- }
-
- /* Connection ok! */
- free(values);
- free(keywords);
- return tmpconn;
- }
- }
-
static void
BaseBackup(void)
{
PGresult *res;
char current_path[MAXPGPATH];
char escaped_label[MAXPGPATH];
int i;
--- 875,885 ----
}
static void
BaseBackup(void)
{
PGresult *res;
+ uint32 timeline;
char current_path[MAXPGPATH];
char escaped_label[MAXPGPATH];
int i;
***************
*** 803,815 **** BaseBackup(void)
conn = GetConnection();
/*
* Start the actual backup
*/
PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s %s",
escaped_label,
showprogress ? "PROGRESS" : "",
! includewal ? "WAL" : "",
fastcheckpoint ? "FAST" : "",
includewal ? "NOWAIT" : "");
--- 892,923 ----
conn = GetConnection();
/*
+ * Run IDENFITY_SYSTEM so we can get the timeline
+ */
+ res = PQexec(conn, "IDENTIFY_SYSTEM");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, _("%s: could not identify system: %s\n"),
+ progname, PQerrorMessage(conn));
+ disconnect_and_exit(1);
+ }
+ if (PQntuples(res) != 1)
+ {
+ fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
+ progname, PQntuples(res));
+ disconnect_and_exit(1);
+ }
+ timeline = atoi(PQgetvalue(res, 0, 1));
+ PQclear(res);
+
+ /*
* Start the actual backup
*/
PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s %s",
escaped_label,
showprogress ? "PROGRESS" : "",
! includewal && !streamwal ? "WAL" : "",
fastcheckpoint ? "FAST" : "",
includewal ? "NOWAIT" : "");
***************
*** 888,893 **** BaseBackup(void)
--- 996,1013 ----
}
/*
+ * If we're streaming WAL, start the streaming session before we start
+ * receiving the actual data chunks.
+ */
+ if (streamwal)
+ {
+ if (verbose)
+ fprintf(stderr, _("%s: starting background WAL receiver\n"),
+ progname);
+ StartLogStreamer(xlogstart, timeline);
+ }
+
+ /*
* Start receiving chunks
*/
for (i = 0; i < PQntuples(res); i++)
***************
*** 934,939 **** BaseBackup(void)
--- 1054,1145 ----
disconnect_and_exit(1);
}
+ if (bgchild > 0)
+ {
+ int status;
+
+ #ifndef WIN32
+ int r;
+ #endif
+
+ if (verbose)
+ fprintf(stderr, _("%s: waiting for background process to finish streaming...\n"), progname);
+
+ #ifndef WIN32
+ if (pipewrite(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend))
+ {
+ fprintf(stderr, _("%s: could not send command to background pipe: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ /* Just wait for the background process to exit */
+ r = waitpid(bgchild, &status, 0);
+ if (r == -1)
+ {
+ fprintf(stderr, _("%s: could not wait for child process: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+ if (r != bgchild)
+ {
+ fprintf(stderr, _("%s: child %i died, expected %i\n"),
+ progname, r, bgchild);
+ disconnect_and_exit(1);
+ }
+ if (!WIFEXITED(status))
+ {
+ fprintf(stderr, _("%s: child process did not exit normally\n"),
+ progname);
+ disconnect_and_exit(1);
+ }
+ if (WEXITSTATUS(status) != 0)
+ {
+ fprintf(stderr, _("%s: child process exited with error %i\n"),
+ progname, WEXITSTATUS(status));
+ disconnect_and_exit(1);
+ }
+ /* Exited normally, we're happy! */
+ #else /* WIN32 */
+
+ /*
+ * On Windows, since we are in the same process, we can just store the
+ * value directly in the variable, and then set the flag that says
+ * it's there.
+ */
+ if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2)
+ {
+ fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"),
+ progname, xlogend);
+ exit(1);
+ }
+ InterlockedIncrement(&has_xlogendptr);
+
+ /* First wait for the thread to exit */
+ if (WaitForSingleObjectEx((HANDLE) bgchild, INFINITE, FALSE) != WAIT_OBJECT_0)
+ {
+ _dosmaperr(GetLastError());
+ fprintf(stderr, _("%s: could not wait for child thread: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+ if (GetExitCodeThread((HANDLE) bgchild, &status) == 0)
+ {
+ _dosmaperr(GetLastError());
+ fprintf(stderr, _("%s: could not get child thread exit status: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+ if (status != 0)
+ {
+ fprintf(stderr, _("%s: child thread exited with error %u\n"),
+ progname, status);
+ disconnect_and_exit(1);
+ }
+ /* Exited normally, we're happy */
+ #endif
+ }
+
/*
* End of copy data. Final result is already checked inside the loop.
*/
***************
*** 953,959 **** main(int argc, char **argv)
{"pgdata", required_argument, NULL, 'D'},
{"format", required_argument, NULL, 'F'},
{"checkpoint", required_argument, NULL, 'c'},
! {"xlog", no_argument, NULL, 'x'},
{"gzip", no_argument, NULL, 'z'},
{"compress", required_argument, NULL, 'Z'},
{"label", required_argument, NULL, 'l'},
--- 1159,1165 ----
{"pgdata", required_argument, NULL, 'D'},
{"format", required_argument, NULL, 'F'},
{"checkpoint", required_argument, NULL, 'c'},
! {"xlog", required_argument, NULL, 'x'},
{"gzip", no_argument, NULL, 'z'},
{"compress", required_argument, NULL, 'Z'},
{"label", required_argument, NULL, 'l'},
***************
*** 962,967 **** main(int argc, char **argv)
--- 1168,1174 ----
{"username", required_argument, NULL, 'U'},
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
+ {"statusint", required_argument, NULL, 's'},
{"verbose", no_argument, NULL, 'v'},
{"progress", no_argument, NULL, 'P'},
{NULL, 0, NULL, 0}
***************
*** 988,994 **** main(int argc, char **argv)
}
}
! while ((c = getopt_long(argc, argv, "D:F:xl:zZ:c:h:p:U:wWvP",
long_options, &option_index)) != -1)
{
switch (c)
--- 1195,1201 ----
}
}
! while ((c = getopt_long(argc, argv, "D:F:x:l:zZ:c:h:p:U:s:wWvP",
long_options, &option_index)) != -1)
{
switch (c)
***************
*** 1010,1015 **** main(int argc, char **argv)
--- 1217,1234 ----
break;
case 'x':
includewal = true;
+ if (strcmp(optarg, "f") == 0 ||
+ strcmp(optarg, "fetch") == 0)
+ streamwal = false;
+ else if (strcmp(optarg, "s") == 0 ||
+ strcmp(optarg, "stream") == 0)
+ streamwal = true;
+ else
+ {
+ fprintf(stderr, _("%s: invalid xlog option \"%s\", must be empty, \"fetch\" or \"stream\"\n"),
+ progname, optarg);
+ exit(1);
+ }
break;
case 'l':
label = xstrdup(optarg);
***************
*** 1057,1062 **** main(int argc, char **argv)
--- 1276,1290 ----
case 'W':
dbgetpassword = 1;
break;
+ case 's':
+ standby_message_timeout = atoi(optarg);
+ if (standby_message_timeout < 0)
+ {
+ fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ break;
case 'v':
verbose++;
break;
***************
*** 1111,1116 **** main(int argc, char **argv)
--- 1339,1354 ----
exit(1);
}
+ if (format != 'p' && streamwal)
+ {
+ fprintf(stderr,
+ _("%s: wal streaming can only be used in plain mode\n"),
+ progname);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
#ifndef HAVE_LIBZ
if (compresslevel != 0)
{
*** /dev/null
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 0 ****
--- 1,440 ----
+ /*-------------------------------------------------------------------------
+ *
+ * pg_receivexlog.c - receive streaming transaction log data and write it
+ * to a local file.
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/pg_receivexlog.c
+ *-------------------------------------------------------------------------
+ */
+
+ /*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need. But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+ #define FRONTEND 1
+ #include "postgres.h"
+ #include "libpq-fe.h"
+ #include "libpq/pqsignal.h"
+ #include "access/xlog_internal.h"
+
+ #include "receivelog.h"
+ #include "streamutil.h"
+
+ #include <dirent.h>
+ #include <sys/stat.h>
+ #include <sys/types.h>
+ #include <unistd.h>
+
+ #include "getopt_long.h"
+
+ /* Global options */
+ char *basedir = NULL;
+ int verbose = 0;
+ int standby_message_timeout = 10; /* 10 sec = default */
+ volatile bool time_to_abort = false;
+
+
+ static void usage(void);
+ static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline);
+ static void StreamLog();
+ static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+
+ static void
+ usage(void)
+ {
+ printf(_("%s receives PostgreSQL streaming transaction logs\n\n"),
+ progname);
+ printf(_("Usage:\n"));
+ printf(_(" %s [OPTION]...\n"), progname);
+ printf(_("\nOptions controlling the output:\n"));
+ printf(_(" -D, --dir=directory receive xlog files into this directory\n"));
+ printf(_("\nGeneral options:\n"));
+ printf(_(" -v, --verbose output verbose messages\n"));
+ printf(_(" -?, --help show this help, then exit\n"));
+ printf(_(" -V, --version output version information, then exit\n"));
+ printf(_("\nConnection options:\n"));
+ printf(_(" -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n"));
+ printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
+ printf(_(" -p, --port=PORT database server port number\n"));
+ printf(_(" -U, --username=NAME connect as specified database user\n"));
+ printf(_(" -w, --no-password never prompt for password\n"));
+ printf(_(" -W, --password force password prompt (should happen automatically)\n"));
+ printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
+ }
+
+ static bool
+ segment_callback(XLogRecPtr segendpos, uint32 timeline)
+ {
+ char fn[MAXPGPATH];
+ struct stat statbuf;
+
+ if (verbose)
+ fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
+ progname, segendpos.xlogid, segendpos.xrecoff, timeline);
+
+ /*
+ * Check if there is a partial file for the name we just finished, and if
+ * there is, remove it under the assumption that we have now got all the
+ * data we need.
+ */
+ PrevLogSeg(segendpos.xlogid, segendpos.xrecoff);
+ snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial",
+ basedir, timeline,
+ segendpos.xlogid,
+ segendpos.xrecoff / XLOG_SEG_SIZE);
+ if (stat(fn, &statbuf) == 0)
+ {
+ /* File existed, get rid of it */
+ if (verbose)
+ fprintf(stderr, _("%s: removing file \"%s\"\n"),
+ progname, fn);
+ unlink(fn);
+ }
+
+ /*
+ * Never abort from this - we handle all aborting in continue_streaming()
+ */
+ return false;
+ }
+
+ static bool
+ continue_streaming()
+ {
+ if (time_to_abort)
+ {
+ fprintf(stderr, _("%s: received interrupt signal, exiting.\n"),
+ progname);
+ return true;
+ }
+ return false;
+ }
+
+ /*
+ * Determine starting location for streaming, based on:
+ * 1. If there are existing xlog segments, start at the end of the last one
+ * 2. If the last one is a partial segment, rename it and start over, since
+ * we don't sync after every write.
+ * 3. If no existing xlog exists, start from the beginning of the current
+ * WAL segment.
+ */
+ static XLogRecPtr
+ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
+ {
+ DIR *dir;
+ struct dirent *dirent;
+ int i;
+ bool b;
+ XLogRecPtr high = {0, 0};
+
+ dir = opendir(basedir);
+ if (dir == NULL)
+ {
+ fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
+ progname, basedir, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ while ((dirent = readdir(dir)) != NULL)
+ {
+ char fullpath[MAXPGPATH];
+ struct stat statbuf;
+ uint32 tli,
+ log,
+ seg;
+
+ if (!strcmp(dirent->d_name, ".") || !strcmp(dirent->d_name, ".."))
+ continue;
+
+ /* xlog files are always 24 characters */
+ if (strlen(dirent->d_name) != 24)
+ continue;
+
+ /* Filenames are always made out of 0-9 and A-F */
+ b = false;
+ for (i = 0; i < 24; i++)
+ {
+ if (!(dirent->d_name[i] >= '0' && dirent->d_name[i] <= '9') &&
+ !(dirent->d_name[i] >= 'A' && dirent->d_name[i] <= 'F'))
+ {
+ b = true;
+ break;
+ }
+ }
+ if (b)
+ continue;
+
+ /*
+ * Looks like an xlog file. Parse it's position.
+ */
+ if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3)
+ {
+ fprintf(stderr, _("%s: could not parse xlog filename \"%s\"\n"),
+ progname, dirent->d_name);
+ disconnect_and_exit(1);
+ }
+ log *= XLOG_SEG_SIZE;
+
+ /* Ignore any files that are for another timeline */
+ if (tli != currenttimeline)
+ continue;
+
+ /* Check if this is a completed segment or not */
+ snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+ if (stat(fullpath, &statbuf) != 0)
+ {
+ fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"),
+ progname, fullpath, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ if (statbuf.st_size == 16 * 1024 * 1024)
+ {
+ /* Completed segment */
+ if (log > high.xlogid ||
+ (log == high.xlogid && seg > high.xrecoff))
+ {
+ high.xlogid = log;
+ high.xrecoff = seg;
+ continue;
+ }
+ }
+ else
+ {
+ /*
+ * This is a partial file. Rename it out of the way.
+ */
+ char newfn[MAXPGPATH];
+
+ fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
+ progname, dirent->d_name, dirent->d_name);
+
+ snprintf(newfn, sizeof(newfn), "%s/%s.partial",
+ basedir, dirent->d_name);
+
+ if (stat(newfn, &statbuf) == 0)
+ {
+ fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"),
+ progname, newfn);
+ disconnect_and_exit(1);
+ }
+ if (rename(fullpath, newfn) != 0)
+ {
+ fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"),
+ progname, fullpath, newfn, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ /* Don't continue looking for more, we assume this is the last */
+ break;
+ }
+ }
+
+ closedir(dir);
+
+ if (high.xlogid > 0 && high.xrecoff > 0)
+ return high;
+
+ return currentpos;
+ }
+
+ /*
+ * Start the log streaming
+ */
+ static void
+ StreamLog(void)
+ {
+ PGresult *res;
+ uint32 timeline;
+ XLogRecPtr startpos;
+
+ /*
+ * Connect in replication mode to the server
+ */
+ conn = GetConnection();
+
+ /*
+ * Run IDENFITY_SYSTEM so we can get the timeline and current xlog
+ * position.
+ */
+ res = PQexec(conn, "IDENTIFY_SYSTEM");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, _("%s: could not identify system: %s\n"),
+ progname, PQerrorMessage(conn));
+ disconnect_and_exit(1);
+ }
+ if (PQntuples(res) != 1)
+ {
+ fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
+ progname, PQntuples(res));
+ disconnect_and_exit(1);
+ }
+ timeline = atoi(PQgetvalue(res, 0, 1));
+ if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &startpos.xlogid, &startpos.xrecoff) != 2)
+ {
+ fprintf(stderr, _("%s: could not parse log start position from value \"%s\"\n"),
+ progname, PQgetvalue(res, 0, 2));
+ disconnect_and_exit(1);
+ }
+ PQclear(res);
+
+ /*
+ * Figure out where to start streaming.
+ */
+ startpos = FindStreamingStart(startpos, timeline);
+
+ /*
+ * Always start streaming at the beginning of a segment
+ */
+ startpos.xrecoff -= startpos.xrecoff % XLOG_SEG_SIZE;
+
+ /*
+ * Start the replication
+ */
+ if (verbose)
+ fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"),
+ progname, startpos.xlogid, startpos.xrecoff, timeline);
+
+ ReceiveXlogStream(conn, startpos, timeline, basedir,
+ segment_callback, continue_streaming,
+ standby_message_timeout);
+ }
+
+ /*
+ * When sigint is called, just tell the system to exit at the next possible
+ * moment.
+ */
+ static void
+ sigint_handler(int signum)
+ {
+ time_to_abort = true;
+ }
+
+ int
+ main(int argc, char **argv)
+ {
+ static struct option long_options[] = {
+ {"help", no_argument, NULL, '?'},
+ {"version", no_argument, NULL, 'V'},
+ {"dir", required_argument, NULL, 'D'},
+ {"host", required_argument, NULL, 'h'},
+ {"port", required_argument, NULL, 'p'},
+ {"username", required_argument, NULL, 'U'},
+ {"no-password", no_argument, NULL, 'w'},
+ {"password", no_argument, NULL, 'W'},
+ {"statusint", required_argument, NULL, 's'},
+ {"verbose", no_argument, NULL, 'v'},
+ {NULL, 0, NULL, 0}
+ };
+ int c;
+
+ int option_index;
+
+ progname = get_progname(argv[0]);
+ set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog"));
+
+ if (argc > 1)
+ {
+ if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+ {
+ usage();
+ exit(0);
+ }
+ else if (strcmp(argv[1], "-V") == 0
+ || strcmp(argv[1], "--version") == 0)
+ {
+ puts("pg_receivexlog (PostgreSQL) " PG_VERSION);
+ exit(0);
+ }
+ }
+
+ while ((c = getopt_long(argc, argv, "D:h:p:U:s:wWv",
+ long_options, &option_index)) != -1)
+ {
+ switch (c)
+ {
+ case 'D':
+ basedir = xstrdup(optarg);
+ break;
+ case 'h':
+ dbhost = xstrdup(optarg);
+ break;
+ case 'p':
+ if (atoi(optarg) <= 0)
+ {
+ fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ dbport = xstrdup(optarg);
+ break;
+ case 'U':
+ dbuser = xstrdup(optarg);
+ break;
+ case 'w':
+ dbgetpassword = -1;
+ break;
+ case 'W':
+ dbgetpassword = 1;
+ break;
+ case 's':
+ standby_message_timeout = atoi(optarg);
+ if (standby_message_timeout < 0)
+ {
+ fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ break;
+ case 'v':
+ verbose++;
+ break;
+ default:
+
+ /*
+ * getopt_long already emitted a complaint
+ */
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+ }
+
+ /*
+ * Any non-option arguments?
+ */
+ if (optind < argc)
+ {
+ fprintf(stderr,
+ _("%s: too many command-line arguments (first is \"%s\")\n"),
+ progname, argv[optind]);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
+ /*
+ * Required arguments
+ */
+ if (basedir == NULL)
+ {
+ fprintf(stderr, _("%s: no target directory specified\n"), progname);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
+ #ifndef WIN32
+ pqsignal(SIGINT, sigint_handler);
+ #endif
+
+ StreamLog();
+
+ exit(0);
+ }
*** /dev/null
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 0 ****
--- 1,353 ----
+ /*-------------------------------------------------------------------------
+ *
+ * receivelog.c - receive transaction log files using the streaming
+ * replication protocol.
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/receivelog.c
+ *-------------------------------------------------------------------------
+ */
+
+ /*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need. But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+ #define FRONTEND 1
+ #include "postgres.h"
+ #include "libpq-fe.h"
+ #include "access/xlog_internal.h"
+ #include "replication/walprotocol.h"
+ #include "utils/datetime.h"
+
+ #include "receivelog.h"
+ #include "streamutil.h"
+
+ #include <sys/time.h>
+ #include <sys/types.h>
+ #include <unistd.h>
+
+
+ /* Size of the streaming replication protocol header */
+ #define STREAMING_HEADER_SIZE (1+8+8+8)
+
+ const XLogRecPtr InvalidXLogRecPtr = {0, 0};
+
+ /*
+ * Open a new WAL file in the specified directory. Store the name
+ * (not including the full directory) in namebuf. Assumes there is
+ * enough room in this buffer...
+ */
+ static int
+ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
+ {
+ int f;
+ char fn[MAXPGPATH];
+
+ XLogFileName(namebuf, timeline, startpoint.xlogid,
+ startpoint.xrecoff / XLOG_SEG_SIZE);
+
+ snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
+ f = open(fn, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, 0666);
+ if (f == -1)
+ fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
+ progname, namebuf, strerror(errno));
+ return f;
+ }
+
+ /*
+ * Local version of GetCurrentTimestamp(), since we are not linked with
+ * backend code.
+ */
+ static TimestampTz
+ localGetCurrentTimestamp(void)
+ {
+ TimestampTz result;
+ struct timeval tp;
+
+ gettimeofday(&tp, NULL);
+
+ result = (TimestampTz) tp.tv_sec -
+ ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
+
+ #ifdef HAVE_INT64_TIMESTAMP
+ result = (result * USECS_PER_SEC) + tp.tv_usec;
+ #else
+ result = result + (tp.tv_usec / 1000000.0);
+ #endif
+
+ return result;
+ }
+
+ /*
+ * Receive a log stream starting at the specified position.
+ *
+ * Note: The log position *must* be at a log segment start!
+ */
+ bool
+ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, segment_finish_callback segment_finish, stream_continue_callback stream_continue, int standby_message_timeout)
+ {
+ char query[128];
+ char current_walfile_name[MAXPGPATH];
+ PGresult *res;
+ char *copybuf = NULL;
+ int walfile = -1;
+ int64 last_status = -1;
+ XLogRecPtr blockpos = InvalidXLogRecPtr;
+
+ /* Initiate the replication stream at specified location */
+ snprintf(query, sizeof(query), "START_REPLICATION %X/%X", startpos.xlogid, startpos.xrecoff);
+ res = PQexec(conn, query);
+ if (PQresultStatus(res) != PGRES_COPY_BOTH)
+ {
+ fprintf(stderr, _("%s: could not start replication: %s\n"),
+ progname, PQresultErrorMessage(res));
+ return false;
+ }
+ PQclear(res);
+
+ /*
+ * Receive the actual xlog data
+ */
+ while (1)
+ {
+ int r;
+ int xlogoff;
+ int bytes_left;
+ int bytes_written;
+ int64 now;
+
+ if (copybuf != NULL)
+ {
+ PQfreemem(copybuf);
+ copybuf = NULL;
+ }
+
+ /*
+ * Check if we should continue streaming, or abort at this point.
+ */
+ if (stream_continue && stream_continue())
+ {
+ if (walfile != -1)
+ {
+ fsync(walfile);
+ close(walfile);
+ }
+ return true;
+ }
+
+ /*
+ * Potentially send a status message to the master
+ */
+ now = localGetCurrentTimestamp();
+ if (standby_message_timeout > 0 &&
+ last_status < now - standby_message_timeout * 1000000)
+ {
+ /* Time to send feedback! */
+ char replybuf[sizeof(StandbyReplyMessage) + 1];
+ StandbyReplyMessage *replymsg = (StandbyReplyMessage *) (replybuf + 1);
+
+ replymsg->write = blockpos;
+ replymsg->flush = InvalidXLogRecPtr;
+ replymsg->apply = InvalidXLogRecPtr;
+ replymsg->sendTime = now;
+ replybuf[0] = 'r';
+
+ if (PQputCopyData(conn, replybuf, sizeof(replybuf)) <= 0 ||
+ PQflush(conn))
+ {
+ fprintf(stderr, _("%s: could not send feedback packet: %s"),
+ progname, PQerrorMessage(conn));
+ return false;
+ }
+
+ last_status = now;
+ }
+
+ r = PQgetCopyData(conn, ©buf, 1);
+ if (r == 0)
+ {
+ /*
+ * In async mode, and no data available. We block on reading but
+ * not more than the specified timeout, so that we can send a
+ * response back to the client.
+ */
+ fd_set input_mask;
+ struct timeval timeout;
+ struct timeval *timeoutptr;
+
+ FD_ZERO(&input_mask);
+ FD_SET(PQsocket(conn), &input_mask);
+ if (standby_message_timeout)
+ {
+ timeout.tv_sec = last_status + standby_message_timeout - now - 1;
+ if (timeout.tv_sec <= 0)
+ timeout.tv_sec = 1; /* Always sleep at least 1 sec */
+ timeout.tv_usec = 0;
+ timeoutptr = &timeout;
+ }
+ else
+ timeoutptr = NULL;
+
+ r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+ if (r == 0 || (r < 0 && errno == EINTR))
+ {
+ /*
+ * Got a timeout or signal. Continue the loop and either
+ * deliver a status packet to the server or just go back into
+ * blocking.
+ */
+ continue;
+ }
+ else if (r < 0)
+ {
+ fprintf(stderr, _("%s: select() failed: %m\n"), progname);
+ return false;
+ }
+ /* Else there is actually data on the socket */
+ if (PQconsumeInput(conn) == 0)
+ {
+ fprintf(stderr, _("%s: could not receive data from WAL stream: %s\n"),
+ progname, PQerrorMessage(conn));
+ return false;
+ }
+ continue;
+ }
+ if (r == -1)
+ /* End of copy stream */
+ break;
+ if (r == -2)
+ {
+ fprintf(stderr, _("%s: could not read copy data: %s\n"),
+ progname, PQerrorMessage(conn));
+ return false;
+ }
+ if (r < STREAMING_HEADER_SIZE + 1)
+ {
+ fprintf(stderr, _("%s: streaming header too small: %i\n"),
+ progname, r);
+ return false;
+ }
+ if (copybuf[0] != 'w')
+ {
+ fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+ progname, copybuf[0]);
+ return false;
+ }
+
+ /* Extract WAL location for this block */
+ memcpy(&blockpos, copybuf + 1, 8);
+ xlogoff = blockpos.xrecoff % XLOG_SEG_SIZE;
+
+ /*
+ * Verify that the initial location in the stream matches where we
+ * think we are.
+ */
+ if (walfile == -1)
+ {
+ /* No file open yet */
+ if (xlogoff != 0)
+ {
+ fprintf(stderr, _("%s: received xlog record for offset %u with no file open\n"),
+ progname, xlogoff);
+ return false;
+ }
+ }
+ else
+ {
+ /* More data in existing segment */
+ /* XXX: store seek value don't reseek all the time */
+ if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+ {
+ fprintf(stderr, _("%s: got WAL data offset %08x, expected %08x\n"),
+ progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+ return false;
+ }
+ }
+
+ bytes_left = r - STREAMING_HEADER_SIZE;
+ bytes_written = 0;
+
+ while (bytes_left)
+ {
+ int bytes_to_write;
+
+ /*
+ * If crossing a WAL boundary, only write up until we reach
+ * XLOG_SEG_SIZE.
+ */
+ if (xlogoff + bytes_left > XLOG_SEG_SIZE)
+ bytes_to_write = XLOG_SEG_SIZE - xlogoff;
+ else
+ bytes_to_write = bytes_left;
+
+ if (walfile == -1)
+ {
+ walfile = open_walfile(blockpos, timeline,
+ basedir, current_walfile_name);
+ if (walfile == -1)
+ /* Error logged by open_walfile */
+ return false;
+ }
+
+ if (write(walfile,
+ copybuf + STREAMING_HEADER_SIZE + bytes_written,
+ bytes_to_write) != bytes_to_write)
+ {
+ fprintf(stderr, _("%s: could not write %u bytes to WAL file %s: %s\n"),
+ progname,
+ bytes_to_write,
+ current_walfile_name,
+ strerror(errno));
+ return false;
+ }
+
+ /* Write was successful, advance our position */
+ bytes_written += bytes_to_write;
+ bytes_left -= bytes_to_write;
+ XLByteAdvance(blockpos, bytes_to_write);
+ xlogoff += bytes_to_write;
+
+ /* Did we reach the end of a WAL segment? */
+ if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
+ {
+ fsync(walfile);
+ close(walfile);
+ walfile = -1;
+ xlogoff = 0;
+
+ if (segment_finish != NULL)
+ {
+ /*
+ * Callback when the segment finished, and return if it
+ * told us to.
+ */
+ if (segment_finish(blockpos, timeline))
+ return true;
+ }
+ }
+ }
+ /* No more data left to write, start receiving next copy packet */
+ }
+
+ /*
+ * The only way to get out of the loop is if the server shut down the
+ * replication stream. If it's a controlled shutdown, the server will send
+ * a shutdown message, and we'll return the latest xlog location that has
+ * been streamed.
+ */
+
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ fprintf(stderr, _("%s: unexpected termination of replication stream: %s\n"),
+ progname, PQresultErrorMessage(res));
+ return false;
+ }
+ PQclear(res);
+ return true;
+ }
*** /dev/null
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 0 ****
--- 1,21 ----
+ #include "access/xlogdefs.h"
+
+ /*
+ * Called whenever a segment is finished, return true to stop
+ * the streaming at this point.
+ */
+ typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline);
+
+ /*
+ * Called before trying to read more data. Return true to stop
+ * the streaming at this point.
+ */
+ typedef bool (*stream_continue_callback)(void);
+
+ bool ReceiveXlogStream(PGconn *conn,
+ XLogRecPtr startpos,
+ uint32 timeline,
+ char *basedir,
+ segment_finish_callback segment_finish,
+ stream_continue_callback stream_continue,
+ int standby_message_timeout);
*** /dev/null
--- b/src/bin/pg_basebackup/streamutil.c
***************
*** 0 ****
--- 1,165 ----
+ /*-------------------------------------------------------------------------
+ *
+ * streamutil.c - utility functions for pg_basebackup and pg_receivelog
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/streamutil.c
+ *-------------------------------------------------------------------------
+ */
+
+ /*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need. But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+ #define FRONTEND 1
+ #include "postgres.h"
+ #include "streamutil.h"
+
+ #include <stdio.h>
+ #include <string.h>
+
+ const char *progname;
+ char *dbhost = NULL;
+ char *dbuser = NULL;
+ char *dbport = NULL;
+ int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
+ static char *dbpassword = NULL;
+ PGconn *conn = NULL;
+
+ /*
+ * strdup() and malloc() replacements that prints an error and exits
+ * if something goes wrong. Can never return NULL.
+ */
+ char *
+ xstrdup(const char *s)
+ {
+ char *result;
+
+ result = strdup(s);
+ if (!result)
+ {
+ fprintf(stderr, _("%s: out of memory\n"), progname);
+ exit(1);
+ }
+ return result;
+ }
+
+ void *
+ xmalloc0(int size)
+ {
+ void *result;
+
+ result = malloc(size);
+ if (!result)
+ {
+ fprintf(stderr, _("%s: out of memory\n"), progname);
+ exit(1);
+ }
+ MemSet(result, 0, size);
+ return result;
+ }
+
+
+ PGconn *
+ GetConnection(void)
+ {
+ PGconn *tmpconn;
+ int argcount = 4; /* dbname, replication, fallback_app_name,
+ * password */
+ int i;
+ const char **keywords;
+ const char **values;
+ char *password = NULL;
+
+ if (dbhost)
+ argcount++;
+ if (dbuser)
+ argcount++;
+ if (dbport)
+ argcount++;
+
+ keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
+ values = xmalloc0((argcount + 1) * sizeof(*values));
+
+ keywords[0] = "dbname";
+ values[0] = "replication";
+ keywords[1] = "replication";
+ values[1] = "true";
+ keywords[2] = "fallback_application_name";
+ values[2] = progname;
+ i = 3;
+ if (dbhost)
+ {
+ keywords[i] = "host";
+ values[i] = dbhost;
+ i++;
+ }
+ if (dbuser)
+ {
+ keywords[i] = "user";
+ values[i] = dbuser;
+ i++;
+ }
+ if (dbport)
+ {
+ keywords[i] = "port";
+ values[i] = dbport;
+ i++;
+ }
+
+ while (true)
+ {
+ if (password)
+ free(password);
+
+ if (dbpassword)
+ {
+ /*
+ * We've saved a password when a previous connection succeeded,
+ * meaning this is the call for a second session to the same
+ * database, so just forcibly reuse that password.
+ */
+ keywords[argcount - 1] = "password";
+ values[argcount - 1] = dbpassword;
+ dbgetpassword = -1; /* Don't try again if this fails */
+ }
+ else if (dbgetpassword == 1)
+ {
+ password = simple_prompt(_("Password: "), 100, false);
+ keywords[argcount - 1] = "password";
+ values[argcount - 1] = password;
+ }
+
+ tmpconn = PQconnectdbParams(keywords, values, true);
+
+ if (PQstatus(tmpconn) == CONNECTION_BAD &&
+ PQconnectionNeedsPassword(tmpconn) &&
+ dbgetpassword != -1)
+ {
+ dbgetpassword = 1; /* ask for password next time */
+ PQfinish(tmpconn);
+ continue;
+ }
+
+ if (PQstatus(tmpconn) != CONNECTION_OK)
+ {
+ fprintf(stderr, _("%s: could not connect to server: %s\n"),
+ progname, PQerrorMessage(tmpconn));
+ exit(1);
+ }
+
+ /* Connection ok! */
+ free(values);
+ free(keywords);
+
+ /* Store the password for next run */
+ if (password)
+ dbpassword = password;
+ return tmpconn;
+ }
+ }
*** /dev/null
--- b/src/bin/pg_basebackup/streamutil.h
***************
*** 0 ****
--- 1,22 ----
+ #include "libpq-fe.h"
+
+ extern const char *progname;
+ extern char *dbhost;
+ extern char *dbuser;
+ extern char *dbport;
+ extern int dbgetpassword;
+
+ /* Connection kept global so we can disconnect easily */
+ extern PGconn *conn;
+
+ #define disconnect_and_exit(code) \
+ { \
+ if (conn != NULL) PQfinish(conn); \
+ exit(code); \
+ }
+
+
+ char *xstrdup(const char *s);
+ void *xmalloc0(int size);
+
+ PGconn *GetConnection(void);
*** a/src/tools/msvc/Mkvcbuild.pm
--- b/src/tools/msvc/Mkvcbuild.pm
***************
*** 304,309 **** sub mkvcbuild
--- 304,316 ----
$initdb->AddLibrary('ws2_32.lib');
my $pgbasebackup = AddSimpleFrontend('pg_basebackup', 1);
+ $pgbasebackup->AddFile('src\bin\pg_basebackup\pg_basebackup.c');
+ $pgbasebackup->AddLibrary('ws2_32.lib');
+
+ my $pgreceivexlog = AddSimpleFrontend('pg_basebackup', 1);
+ $pgreceivexlog->{name} = 'pg_receivexlog';
+ $pgreceivexlog->AddFile('src\bin\pg_basebackup\pg_receivexlog.c');
+ $pgreceivexlog->AddLibrary('ws2_32.lib');
my $pgconfig = AddSimpleFrontend('pg_config');
On Tue, Aug 16, 2011 at 9:32 AM, Magnus Hagander <magnus@hagander.net> wrote:
Here's an updated version of pg_receivexlog, that should now actually
work (it previously failed miserably when a replication record crossed
a WAL file boundary - something which I at the time could not properly
reproduce, but when I restarted my work on it now could easily
reproduce every time :D).It also contains an update to pg_basebackup that allows it to stream
the transaction log in the background while the backup is running,
thus reducing the need for wal_keep_segments (if the client can keep
up, it should eliminate the need completely).
reviewing this...
i found useful pg_receivexlog as an independent utility, but i'm not
so sure about the ability to call it from pg_basebackup via --xlog
option. this is because pg_receivexlog will continue streaming even
after pg_basebackup if it's called independently but not in the other
case so the use case for --xlog seems more narrow and error prone (ie:
you said that it reduces the need for wal_keep_segments *if the client
can keep up*... how can we know that before starting pg_basebackup?)
pg_receivexlog worked good in my tests.
pg_basebackup with --xlog=stream gives me an already recycled wal
segment message (note that the file was in pg_xlog in the standby):
FATAL: could not receive data from WAL stream: FATAL: requested WAL
segment 00000001000000000000005C has already been removed
haven't read all the code in the detail but seems fine to me
in other things:
do we need to include src/bin/pg_basebackup/.gitignore in the patch?
--
Jaime Casanova www.2ndQuadrant.com
Professional PostgreSQL: Soporte 24x7 y capacitación
On Wed, Sep 28, 2011 at 1:38 AM, Jaime Casanova <jaime@2ndquadrant.com> wrote:
On Tue, Aug 16, 2011 at 9:32 AM, Magnus Hagander <magnus@hagander.net> wrote:
Here's an updated version of pg_receivexlog, that should now actually
work (it previously failed miserably when a replication record crossed
a WAL file boundary - something which I at the time could not properly
reproduce, but when I restarted my work on it now could easily
reproduce every time :D).It also contains an update to pg_basebackup that allows it to stream
the transaction log in the background while the backup is running,
thus reducing the need for wal_keep_segments (if the client can keep
up, it should eliminate the need completely).reviewing this...
btw, executing 'make world' with this patch gives me this error (seems
like an entry is missing in doc/src/sgml/ref/allfiles.sgml):
jade:reference.sgml:223:4:E: general entity "pgReceivexlog" not
defined and no default entity
--
Jaime Casanova www.2ndQuadrant.com
Professional PostgreSQL: Soporte 24x7 y capacitación
On Wed, Sep 28, 2011 at 08:38, Jaime Casanova <jaime@2ndquadrant.com> wrote:
On Tue, Aug 16, 2011 at 9:32 AM, Magnus Hagander <magnus@hagander.net> wrote:
Here's an updated version of pg_receivexlog, that should now actually
work (it previously failed miserably when a replication record crossed
a WAL file boundary - something which I at the time could not properly
reproduce, but when I restarted my work on it now could easily
reproduce every time :D).It also contains an update to pg_basebackup that allows it to stream
the transaction log in the background while the backup is running,
thus reducing the need for wal_keep_segments (if the client can keep
up, it should eliminate the need completely).reviewing this...
i found useful pg_receivexlog as an independent utility, but i'm not
so sure about the ability to call it from pg_basebackup via --xlog
option. this is because pg_receivexlog will continue streaming even
after pg_basebackup if it's called independently but not in the other
case so the use case for --xlog seems more narrow and error prone (ie:
you said that it reduces the need for wal_keep_segments *if the client
can keep up*... how can we know that before starting pg_basebackup?)
These two are not intended to be used together.
pg_basebackup --xlog=stream is intended for the same use-case as
"pg_basebackup -x" today, which is take a backup of just the parts
that you actually need to clone the database, but to do so without
having to guestimate the value for wal_keep_segments.
pg_receivexlog worked good in my tests.
pg_basebackup with --xlog=stream gives me an already recycled wal
segment message (note that the file was in pg_xlog in the standby):
FATAL: could not receive data from WAL stream: FATAL: requested WAL
segment 00000001000000000000005C has already been removed
Do you get this reproducibly? Or did you get it just once?
And when you say "in the standby" what are you referring to? There is
no standby server in the case of pg_basebackup --xlog=stream, it's
just backup... But are you saying pg_basebackup had received the file,
yet tried to get it again?
in other things:
do we need to include src/bin/pg_basebackup/.gitignore in the patch?
Not sure what you mean? We need to add pg_receivexlog to this file,
yes - in head it just contains pg_basebackup.
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On Wed, Sep 28, 2011 at 09:30, Jaime Casanova <jaime@2ndquadrant.com> wrote:
On Wed, Sep 28, 2011 at 1:38 AM, Jaime Casanova <jaime@2ndquadrant.com> wrote:
On Tue, Aug 16, 2011 at 9:32 AM, Magnus Hagander <magnus@hagander.net> wrote:
Here's an updated version of pg_receivexlog, that should now actually
work (it previously failed miserably when a replication record crossed
a WAL file boundary - something which I at the time could not properly
reproduce, but when I restarted my work on it now could easily
reproduce every time :D).It also contains an update to pg_basebackup that allows it to stream
the transaction log in the background while the backup is running,
thus reducing the need for wal_keep_segments (if the client can keep
up, it should eliminate the need completely).reviewing this...
btw, executing 'make world' with this patch gives me this error (seems
like an entry is missing in doc/src/sgml/ref/allfiles.sgml):jade:reference.sgml:223:4:E: general entity "pgReceivexlog" not
defined and no default entity
Ugh, how did I miss that. You need this:
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 8a8616b..382d297 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -172,6 +172,7 @@ Complete list of usable sgml source files in this directory.
<!ENTITY pgCtl SYSTEM "pg_ctl-ref.sgml">
<!ENTITY pgDump SYSTEM "pg_dump.sgml">
<!ENTITY pgDumpall SYSTEM "pg_dumpall.sgml">
+<!ENTITY pgReceivexlog SYSTEM "pg_receivexlog.sgml">
<!ENTITY pgResetxlog SYSTEM "pg_resetxlog.sgml">
<!ENTITY pgRestore SYSTEM "pg_restore.sgml">
<!ENTITY postgres SYSTEM "postgres-ref.sgml">
I think I broke it in a merge at some point..
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On Wed, Sep 28, 2011 at 12:50 PM, Magnus Hagander <magnus@hagander.net> wrote:
pg_receivexlog worked good in my tests.
pg_basebackup with --xlog=stream gives me an already recycled wal
segment message (note that the file was in pg_xlog in the standby):
FATAL: could not receive data from WAL stream: FATAL: requested WAL
segment 00000001000000000000005C has already been removedDo you get this reproducibly? Or did you get it just once?
And when you say "in the standby" what are you referring to? There is
no standby server in the case of pg_basebackup --xlog=stream, it's
just backup... But are you saying pg_basebackup had received the file,
yet tried to get it again?
ok, i was trying to setup a standby server cloning with
pg_basebackup... i can't use it that way?
the docs says:
"""
If this option is specified, it is possible to start a postmaster
directly in the extracted directory without the need to consult the
log archive, thus making this a completely standalone backup.
"""
it doesn't say that is not possible to use this for a standby
server... probably that's why i get the error i put a recovery.conf
after pg_basebackup finished... maybe we can say that more loudly?
in other things:
do we need to include src/bin/pg_basebackup/.gitignore in the patch?Not sure what you mean? We need to add pg_receivexlog to this file,
yes - in head it just contains pg_basebackup.
your patch includes a modification in the file
src/bin/pg_basebackup/.gitignore, maybe i'm just being annoying
besides is a simple change... just forget that...
--
Jaime Casanova www.2ndQuadrant.com
Professional PostgreSQL: Soporte 24x7 y capacitación
On Thu, Sep 29, 2011 at 01:55, Jaime Casanova <jaime@2ndquadrant.com> wrote:
On Wed, Sep 28, 2011 at 12:50 PM, Magnus Hagander <magnus@hagander.net> wrote:
pg_receivexlog worked good in my tests.
pg_basebackup with --xlog=stream gives me an already recycled wal
segment message (note that the file was in pg_xlog in the standby):
FATAL: could not receive data from WAL stream: FATAL: requested WAL
segment 00000001000000000000005C has already been removedDo you get this reproducibly? Or did you get it just once?
And when you say "in the standby" what are you referring to? There is
no standby server in the case of pg_basebackup --xlog=stream, it's
just backup... But are you saying pg_basebackup had received the file,
yet tried to get it again?ok, i was trying to setup a standby server cloning with
pg_basebackup... i can't use it that way?the docs says:
"""
If this option is specified, it is possible to start a postmaster
directly in the extracted directory without the need to consult the
log archive, thus making this a completely standalone backup.
"""it doesn't say that is not possible to use this for a standby
server... probably that's why i get the error i put a recovery.conf
after pg_basebackup finished... maybe we can say that more loudly?
The idea is, if you use it with -x (or --xlog), it's for taking a
backup/clone, *not* for replication.
If you use it without -x, then you can use it as the start of a
replica, by adding a recovery.conf.
But you can't do both at once, that will confuse it.
in other things:
do we need to include src/bin/pg_basebackup/.gitignore in the patch?Not sure what you mean? We need to add pg_receivexlog to this file,
yes - in head it just contains pg_basebackup.your patch includes a modification in the file
src/bin/pg_basebackup/.gitignore, maybe i'm just being annoying
besides is a simple change... just forget that...
Well, it needs to be included inthe commit, and if I exclude it inthe
posted patch, I'll just forget it in the end :-)
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
+ /* + * Looks like an xlog file. Parse it's position.
s/it's/its/
+ */ + if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3) + { + fprintf(stderr, _("%s: could not parse xlog filename \"%s\"\n"), + progname, dirent->d_name); + disconnect_and_exit(1); + } + log *= XLOG_SEG_SIZE;
That multiplication by XLOG_SEG_SIZE could overflow, if logid is very
high. It seems completely unnecessary, anyway,
s/IDENFITY_SYSTEM/IDENTIFY_SYSTEM/ (two occurrences)
In pg_basebackup, it would be a good sanity check to check that the
systemid returned by IDENTIFY_SYSTEM in the main connection and the
WAL-streaming connection match. Just to be sure that some connection
pooler didn't hijack one of the connections and point to a different
server. And better check timelineid too while you're at it.
How does this interact with synchronous replication? If a base backup
that streams WAL is in progress, and you have synchronous_standby_names
set to '*', I believe the in-progress backup will count as a standby for
that purpose. That might give a false sense of security.
synchronous_standby_names='*' is prone to such confusion in general, but
it seems that it's particularly surprising if a running pg_basebackup
lets a commit in synchronous replication to proceed. Maybe we just need
a warning in the docs. I think we should advise that
synchronous_standby_names='*' is dangerous in general, and cite this as
one reason for that.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
On Mon, Oct 24, 2011 at 13:46, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
+ /* + * Looks like an xlog file. Parse it's position.s/it's/its/
+ */ + if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3) + { + fprintf(stderr, _("%s: could not parse xlog filename \"%s\"\n"), + progname, dirent->d_name); + disconnect_and_exit(1); + } + log *= XLOG_SEG_SIZE;That multiplication by XLOG_SEG_SIZE could overflow, if logid is very high.
It seems completely unnecessary, anyway,
How do you mean completely unnecessary? We'd have to change the points
that use it to divide by XLOG_SEG_SIZE otherwise, no? That might be a
way to get around the overflow, but I'm not sure that's what you mean?
s/IDENFITY_SYSTEM/IDENTIFY_SYSTEM/ (two occurrences)
Oops.
In pg_basebackup, it would be a good sanity check to check that the systemid
returned by IDENTIFY_SYSTEM in the main connection and the WAL-streaming
connection match. Just to be sure that some connection pooler didn't hijack
one of the connections and point to a different server. And better check
timelineid too while you're at it.
That's a good idea. Will fix.
How does this interact with synchronous replication? If a base backup that
streams WAL is in progress, and you have synchronous_standby_names set to
'*', I believe the in-progress backup will count as a standby for that
purpose. That might give a false sense of security.
Ah yes. Did not think of that. Yes, it will have this problem.
synchronous_standby_names='*' is prone to such confusion in general, but it
seems that it's particularly surprising if a running pg_basebackup lets a
commit in synchronous replication to proceed. Maybe we just need a warning
in the docs. I think we should advise that synchronous_standby_names='*' is
dangerous in general, and cite this as one reason for that.
Hmm. i think this is common enough that we want to make sure we avoid
it in code.
Could we pass a parameter from the client indicating to the master
that it refuses to be a sync slave? An optional keyword to the
START_REPLICATION command, perhaps?
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On Mon, Oct 24, 2011 at 7:40 AM, Magnus Hagander <magnus@hagander.net> wrote:
synchronous_standby_names='*' is prone to such confusion in general, but it
seems that it's particularly surprising if a running pg_basebackup lets a
commit in synchronous replication to proceed. Maybe we just need a warning
in the docs. I think we should advise that synchronous_standby_names='*' is
dangerous in general, and cite this as one reason for that.Hmm. i think this is common enough that we want to make sure we avoid
it in code.Could we pass a parameter from the client indicating to the master
that it refuses to be a sync slave? An optional keyword to the
START_REPLICATION command, perhaps?
can't you execute "set synchronous_commit to off/local" for this connection?
--
Jaime Casanova www.2ndQuadrant.com
Professional PostgreSQL: Soporte 24x7 y capacitación
On Mon, Oct 24, 2011 at 16:12, Jaime Casanova <jaime@2ndquadrant.com> wrote:
On Mon, Oct 24, 2011 at 7:40 AM, Magnus Hagander <magnus@hagander.net> wrote:
synchronous_standby_names='*' is prone to such confusion in general, but it
seems that it's particularly surprising if a running pg_basebackup lets a
commit in synchronous replication to proceed. Maybe we just need a warning
in the docs. I think we should advise that synchronous_standby_names='*' is
dangerous in general, and cite this as one reason for that.Hmm. i think this is common enough that we want to make sure we avoid
it in code.Could we pass a parameter from the client indicating to the master
that it refuses to be a sync slave? An optional keyword to the
START_REPLICATION command, perhaps?can't you execute "set synchronous_commit to off/local" for this connection?
This is a walsender connection, it doesn't take SQL. Plus it's the
receiving end, and SET sync_commit is for the sending end.
that said, we are reasonably safe in current implementations, because
it always sets the flush location to invalidxlogptr, so it will not be
considered for sync slave. Should we ever start accepting "write" as
the point to sync against, the problem will show up, of course.
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On Mon, Oct 24, 2011 at 14:40, Magnus Hagander <magnus@hagander.net> wrote:
On Mon, Oct 24, 2011 at 13:46, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:+ /* + * Looks like an xlog file. Parse it's position.s/it's/its/
+ */ + if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3) + { + fprintf(stderr, _("%s: could not parse xlog filename \"%s\"\n"), + progname, dirent->d_name); + disconnect_and_exit(1); + } + log *= XLOG_SEG_SIZE;That multiplication by XLOG_SEG_SIZE could overflow, if logid is very high.
It seems completely unnecessary, anyway,How do you mean completely unnecessary? We'd have to change the points
that use it to divide by XLOG_SEG_SIZE otherwise, no? That might be a
way to get around the overflow, but I'm not sure that's what you mean?
Talked to Heikki on IM about this one, turns out we were both wrong.
It's needed, but there was a bug hiding under it, due to (once again)
mixing up segments and offsets. Has been fixed now.
In pg_basebackup, it would be a good sanity check to check that the systemid
returned by IDENTIFY_SYSTEM in the main connection and the WAL-streaming
connection match. Just to be sure that some connection pooler didn't hijack
one of the connections and point to a different server. And better check
timelineid too while you're at it.That's a good idea. Will fix.
Added to the new version of the patch.
How does this interact with synchronous replication? If a base backup that
streams WAL is in progress, and you have synchronous_standby_names set to
'*', I believe the in-progress backup will count as a standby for that
purpose. That might give a false sense of security.Ah yes. Did not think of that. Yes, it will have this problem.
Actually, thinking more, per other mail, it won't. Because it will
never report that the data is synced to disk, so it will not be
considered for sync standby.
This is something we might consider in the future (it could be a
reasonable scenario where you had this), but not in the first version.
Updated version of the patch attached.
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
Attachments:
pg_receivexlog2.difftext/x-patch; charset=US-ASCII; name=pg_receivexlog2.diffDownload
*** a/doc/src/sgml/ref/allfiles.sgml
--- b/doc/src/sgml/ref/allfiles.sgml
***************
*** 172,177 **** Complete list of usable sgml source files in this directory.
--- 172,178 ----
<!ENTITY pgCtl SYSTEM "pg_ctl-ref.sgml">
<!ENTITY pgDump SYSTEM "pg_dump.sgml">
<!ENTITY pgDumpall SYSTEM "pg_dumpall.sgml">
+ <!ENTITY pgReceivexlog SYSTEM "pg_receivexlog.sgml">
<!ENTITY pgResetxlog SYSTEM "pg_resetxlog.sgml">
<!ENTITY pgRestore SYSTEM "pg_restore.sgml">
<!ENTITY postgres SYSTEM "postgres-ref.sgml">
*** a/doc/src/sgml/ref/pg_basebackup.sgml
--- b/doc/src/sgml/ref/pg_basebackup.sgml
***************
*** 143,150 **** PostgreSQL documentation
</varlistentry>
<varlistentry>
! <term><option>-x</option></term>
! <term><option>--xlog</option></term>
<listitem>
<para>
Includes the required transaction log files (WAL files) in the
--- 143,150 ----
</varlistentry>
<varlistentry>
! <term><option>-x <replaceable class="parameter">method</replaceable></option></term>
! <term><option>--xlog=<replaceable class="parameter">method</replaceable></option></term>
<listitem>
<para>
Includes the required transaction log files (WAL files) in the
***************
*** 154,169 **** PostgreSQL documentation
to consult the log archive, thus making this a completely standalone
backup.
</para>
! <note>
! <para>
! The transaction log files are collected at the end of the backup.
! Therefore, it is necessary for the
! <xref linkend="guc-wal-keep-segments"> parameter to be set high
! enough that the log is not removed before the end of the backup.
! If the log has been rotated when it's time to transfer it, the
! backup will fail and be unusable.
! </para>
! </note>
</listitem>
</varlistentry>
--- 154,196 ----
to consult the log archive, thus making this a completely standalone
backup.
</para>
! <para>
! The following methods for collecting the transaction logs are
! supported:
!
! <variablelist>
! <varlistentry>
! <term><literal>f</literal></term>
! <term><literal>fetch</literal></term>
! <listitem>
! <para>
! The transaction log files are collected at the end of the backup.
! Therefore, it is necessary for the
! <xref linkend="guc-wal-keep-segments"> parameter to be set high
! enough that the log is not removed before the end of the backup.
! If the log has been rotated when it's time to transfer it, the
! backup will fail and be unusable.
! </para>
! </listitem>
! </varlistentry>
!
! <varlistentry>
! <term><literal>s</literal></term>
! <term><literal>stream</literal></term>
! <listitem>
! <para>
! Stream the transaction log while the backup is created. This will
! open a second connection to the server and start streaming the
! transaction log in parallel while running the backup. Therefore,
! it will use up two slots configured by the
! <xref linkend="guc-max-wal-senders"> parameter. As long as the
! client can keep up with transaction log received, using this mode
! requires no extra transaction logs to be saved on the master.
! </para>
! </listitem>
! </varlistentry>
! </variablelist>
! </para>
</listitem>
</varlistentry>
***************
*** 261,266 **** PostgreSQL documentation
--- 288,307 ----
<variablelist>
<varlistentry>
+ <term><option>-s <replaceable class="parameter">interval</replaceable></option></term>
+ <term><option>--statusint=<replaceable class="parameter">interval</replaceable></option></term>
+ <listitem>
+ <para>
+ Specifies the number of seconds between status packets sent back to the
+ server. This is required when streaming the transaction log (using
+ <literal>--xlog=stream</literal>) if replication timeout is configured
+ on the server, and allows for easier monitoring. The default value is
+ 10 seconds.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><option>-h <replaceable class="parameter">host</replaceable></option></term>
<term><option>--host=<replaceable class="parameter">host</replaceable></option></term>
<listitem>
*** /dev/null
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 0 ****
--- 1,270 ----
+ <!--
+ doc/src/sgml/ref/pg_receivexlog.sgml
+ PostgreSQL documentation
+ -->
+
+ <refentry id="app-pgreceivexlog">
+ <refmeta>
+ <refentrytitle>pg_receivexlog</refentrytitle>
+ <manvolnum>1</manvolnum>
+ <refmiscinfo>Application</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>pg_receivexlog</refname>
+ <refpurpose>streams transaction logs from a <productname>PostgreSQL</productname> cluster</refpurpose>
+ </refnamediv>
+
+ <indexterm zone="app-pgreceivexlog">
+ <primary>pg_receivexlog</primary>
+ </indexterm>
+
+ <refsynopsisdiv>
+ <cmdsynopsis>
+ <command>pg_receivexlog</command>
+ <arg rep="repeat"><replaceable>option</></arg>
+ </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>
+ Description
+ </title>
+ <para>
+ <application>pg_receivexlog</application> is used to stream transaction log
+ from a running <productname>PostgreSQL</productname> cluster. The transaction
+ log is streamed using the streaming replication protocol, and is written
+ to a local directory of files. This directory can be used as the archive
+ location for doing a restore using point-in-time recovery (see
+ <xref linkend="continuous-archiving">).
+ </para>
+
+ <para>
+ <application>pg_receivexlog</application> streams the transaction
+ log in real time as it's being generated on the server, and does not wait
+ for segments to complete like <xref linkend="guc-archive-command"> does.
+ For this reason, it is not necessary to set
+ <xref linkend="guc-archive-timeout"> when using
+ <application>pg_receivexlog</application>.
+ </para>
+
+ <para>
+ The transaction log is streamed over a regular
+ <productname>PostgreSQL</productname> connection, and uses the
+ replication protocol. The connection must be
+ made with a user having <literal>REPLICATION</literal> permissions (see
+ <xref linkend="role-attributes">), and the user must be granted explicit
+ permissions in <filename>pg_hba.conf</filename>. The server must also
+ be configured with <xref linkend="guc-max-wal-senders"> set high enough
+ to leave at least one session available for the stream.
+ </para>
+ </refsect1>
+
+ <refsect1>
+ <title>Options</title>
+
+ <para>
+ The following command-line options control the location and format of the
+ output.
+
+ <variablelist>
+ <varlistentry>
+ <term><option>-D <replaceable class="parameter">directory</replaceable></option></term>
+ <term><option>--dir=<replaceable class="parameter">directory</replaceable></option></term>
+ <listitem>
+ <para>
+ Directory to write the output to.
+ </para>
+ <para>
+ This parameter is required.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ <para>
+ The following command-line options control the running of the program.
+
+ <variablelist>
+ <varlistentry>
+ <term><option>-v</option></term>
+ <term><option>--verbose</option></term>
+ <listitem>
+ <para>
+ Enables verbose mode.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </para>
+
+ <para>
+ The following command-line options control the database connection parameters.
+
+ <variablelist>
+ <varlistentry>
+ <term><option>-s <replaceable class="parameter">interval</replaceable></option></term>
+ <term><option>--statusint=<replaceable class="parameter">interval</replaceable></option></term>
+ <listitem>
+ <para>
+ Specifies the number of seconds between status packets sent back to the
+ server. This is required if replication timeout is configured on the
+ server, and allows for easier monitoring. The default value is
+ 10 seconds.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>-h <replaceable class="parameter">host</replaceable></option></term>
+ <term><option>--host=<replaceable class="parameter">host</replaceable></option></term>
+ <listitem>
+ <para>
+ Specifies the host name of the machine on which the server is
+ running. If the value begins with a slash, it is used as the
+ directory for the Unix domain socket. The default is taken
+ from the <envar>PGHOST</envar> environment variable, if set,
+ else a Unix domain socket connection is attempted.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>-p <replaceable class="parameter">port</replaceable></option></term>
+ <term><option>--port=<replaceable class="parameter">port</replaceable></option></term>
+ <listitem>
+ <para>
+ Specifies the TCP port or local Unix domain socket file
+ extension on which the server is listening for connections.
+ Defaults to the <envar>PGPORT</envar> environment variable, if
+ set, or a compiled-in default.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>-U <replaceable>username</replaceable></option></term>
+ <term><option>--username=<replaceable class="parameter">username</replaceable></option></term>
+ <listitem>
+ <para>
+ User name to connect as.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>-w</></term>
+ <term><option>--no-password</></term>
+ <listitem>
+ <para>
+ Never issue a password prompt. If the server requires
+ password authentication and a password is not available by
+ other means such as a <filename>.pgpass</filename> file, the
+ connection attempt will fail. This option can be useful in
+ batch jobs and scripts where no user is present to enter a
+ password.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>-W</option></term>
+ <term><option>--password</option></term>
+ <listitem>
+ <para>
+ Force <application>pg_basebackup</application> to prompt for a
+ password before connecting to a database.
+ </para>
+
+ <para>
+ This option is never essential, since
+ <application>pg_bsaebackup</application> will automatically prompt
+ for a password if the server demands password authentication.
+ However, <application>pg_basebackup</application> will waste a
+ connection attempt finding out that the server wants a password.
+ In some cases it is worth typing <option>-W</> to avoid the extra
+ connection attempt.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+
+ <para>
+ Other, less commonly used, parameters are also available:
+
+ <variablelist>
+ <varlistentry>
+ <term><option>-V</></term>
+ <term><option>--version</></term>
+ <listitem>
+ <para>
+ Print the <application>pg_receivexlog</application> version and exit.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>-?</></term>
+ <term><option>--help</></term>
+ <listitem>
+ <para>
+ Show help about <application>pg_receivexlog</application> command line
+ arguments, and exit.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </para>
+
+ </refsect1>
+
+ <refsect1>
+ <title>Environment</title>
+
+ <para>
+ This utility, like most other <productname>PostgreSQL</> utilities,
+ uses the environment variables supported by <application>libpq</>
+ (see <xref linkend="libpq-envars">).
+ </para>
+
+ </refsect1>
+
+ <refsect1>
+ <title>Notes</title>
+
+ <para>
+ When using <application>pg_receivexlog</application> instead of
+ <xref linkend="guc-archive-command">, the server will continue to
+ recycle transaction log files even if the backups are not properly
+ archived, since there is no command that fails. This can be worked
+ around by having an <xref linkend="guc-archive-command"> that fails
+ when the file has not been properly archived yet.
+ </para>
+
+ </refsect1>
+
+ <refsect1>
+ <title>Examples</title>
+
+ <para>
+ To stream the transaction log from the server at
+ <literal>mydbserver</literal> and store it in the local directory
+ <filename>/usr/local/pgsql/archive</filename>:
+ <screen>
+ <prompt>$</prompt> <userinput>pg_receivexlog -h mydbserver -D /usr/local/pgsql/archive</userinput>
+ </screen>
+ </para>
+ </refsect1>
+
+ <refsect1>
+ <title>See Also</title>
+
+ <simplelist type="inline">
+ <member><xref linkend="APP-PGBASEBACKUP"></member>
+ </simplelist>
+ </refsect1>
+
+ </refentry>
*** a/doc/src/sgml/reference.sgml
--- b/doc/src/sgml/reference.sgml
***************
*** 220,225 ****
--- 220,226 ----
&pgConfig;
&pgDump;
&pgDumpall;
+ &pgReceivexlog;
&pgRestore;
&psqlRef;
&reindexdb;
*** a/src/bin/pg_basebackup/.gitignore
--- b/src/bin/pg_basebackup/.gitignore
***************
*** 1 ****
--- 1,2 ----
/pg_basebackup
+ /pg_receivexlog
*** a/src/bin/pg_basebackup/Makefile
--- b/src/bin/pg_basebackup/Makefile
***************
*** 18,38 **** include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
! OBJS= pg_basebackup.o $(WIN32RES)
! all: pg_basebackup
! pg_basebackup: $(OBJS) | submake-libpq submake-libpgport
! $(CC) $(CFLAGS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
install: all installdirs
$(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
installdirs:
$(MKDIR_P) '$(DESTDIR)$(bindir)'
uninstall:
rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
clean distclean maintainer-clean:
! rm -f pg_basebackup$(X) $(OBJS)
--- 18,43 ----
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
! OBJS=receivelog.o streamutil.o $(WIN32RES)
! all: pg_basebackup pg_receivexlog
! pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport
! $(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
!
! pg_receivexlog: pg_receivexlog.o $(OBJS) | submake-libpq submake-libpgport
! $(CC) $(CFLAGS) pg_receivexlog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
install: all installdirs
$(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+ $(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
installdirs:
$(MKDIR_P) '$(DESTDIR)$(bindir)'
uninstall:
rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+ rm -f '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
clean distclean maintainer-clean:
! rm -f pg_basebackup$(X) pg_receivexlog$(X) $(OBJS) pg_basebackup.o pg_receivexlog.o
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 11,22 ****
*-------------------------------------------------------------------------
*/
! #include "postgres_fe.h"
#include "libpq-fe.h"
#include <unistd.h>
#include <dirent.h>
#include <sys/stat.h>
#ifdef HAVE_LIBZ
#include <zlib.h>
--- 11,30 ----
*-------------------------------------------------------------------------
*/
! /*
! * We have to use postgres.h not postgres_fe.h here, because there's so much
! * backend-only stuff in the XLOG include files we need. But we need a
! * frontend-ish environment otherwise. Hence this ugly hack.
! */
! #define FRONTEND 1
! #include "postgres.h"
#include "libpq-fe.h"
#include <unistd.h>
#include <dirent.h>
#include <sys/stat.h>
+ #include <sys/types.h>
+ #include <sys/wait.h>
#ifdef HAVE_LIBZ
#include <zlib.h>
***************
*** 24,32 ****
#include "getopt_long.h"
/* Global options */
- static const char *progname;
char *basedir = NULL;
char format = 'p'; /* p(lain)/t(ar) */
char *label = "pg_basebackup base backup";
--- 32,42 ----
#include "getopt_long.h"
+ #include "receivelog.h"
+ #include "streamutil.h"
+
/* Global options */
char *basedir = NULL;
char format = 'p'; /* p(lain)/t(ar) */
char *label = "pg_basebackup base backup";
***************
*** 34,71 **** bool showprogress = false;
int verbose = 0;
int compresslevel = 0;
bool includewal = false;
bool fastcheckpoint = false;
! char *dbhost = NULL;
! char *dbuser = NULL;
! char *dbport = NULL;
! int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
/* Progress counters */
static uint64 totalsize;
static uint64 totaldone;
static int tablespacecount;
! /* Connection kept global so we can disconnect easily */
! static PGconn *conn = NULL;
! #define disconnect_and_exit(code) \
! { \
! if (conn != NULL) PQfinish(conn); \
! exit(code); \
! }
/* Function headers */
- static char *xstrdup(const char *s);
- static void *xmalloc0(int size);
static void usage(void);
static void verify_dir_is_empty_or_create(char *dirname);
static void progress_report(int tablespacenum, const char *filename);
- static PGconn *GetConnection(void);
static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
static void BaseBackup(void);
#ifdef HAVE_LIBZ
static const char *
get_gz_error(gzFile *gzf)
--- 44,81 ----
int verbose = 0;
int compresslevel = 0;
bool includewal = false;
+ bool streamwal = false;
bool fastcheckpoint = false;
! int standby_message_timeout = 10; /* 10 sec = default */
/* Progress counters */
static uint64 totalsize;
static uint64 totaldone;
static int tablespacecount;
! /* Pipe to communicate with background wal receiver process */
! #ifndef WIN32
! static int bgpipe[2] = {-1, -1};
! #endif
! /* Handle to child process */
! static pid_t bgchild = -1;
!
! /* End position for xlog streaming, empty string if unknown yet */
! static XLogRecPtr xlogendptr;
! static int has_xlogendptr = 0;
/* Function headers */
static void usage(void);
static void verify_dir_is_empty_or_create(char *dirname);
static void progress_report(int tablespacenum, const char *filename);
static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
static void BaseBackup(void);
+ static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+
#ifdef HAVE_LIBZ
static const char *
get_gz_error(gzFile *gzf)
***************
*** 81,119 **** get_gz_error(gzFile *gzf)
}
#endif
- /*
- * strdup() and malloc() replacements that prints an error and exits
- * if something goes wrong. Can never return NULL.
- */
- static char *
- xstrdup(const char *s)
- {
- char *result;
-
- result = strdup(s);
- if (!result)
- {
- fprintf(stderr, _("%s: out of memory\n"), progname);
- exit(1);
- }
- return result;
- }
-
- static void *
- xmalloc0(int size)
- {
- void *result;
-
- result = malloc(size);
- if (!result)
- {
- fprintf(stderr, _("%s: out of memory\n"), progname);
- exit(1);
- }
- MemSet(result, 0, size);
- return result;
- }
-
static void
usage(void)
--- 91,96 ----
***************
*** 125,131 **** usage(void)
printf(_("\nOptions controlling the output:\n"));
printf(_(" -D, --pgdata=DIRECTORY receive base backup into directory\n"));
printf(_(" -F, --format=p|t output format (plain, tar)\n"));
! printf(_(" -x, --xlog include required WAL files in backup\n"));
printf(_(" -z, --gzip compress tar output\n"));
printf(_(" -Z, --compress=0-9 compress tar output with given compression level\n"));
printf(_("\nGeneral options:\n"));
--- 102,108 ----
printf(_("\nOptions controlling the output:\n"));
printf(_(" -D, --pgdata=DIRECTORY receive base backup into directory\n"));
printf(_(" -F, --format=p|t output format (plain, tar)\n"));
! printf(_(" -x, --xlog=fetch|stream include required WAL files in backup\n"));
printf(_(" -z, --gzip compress tar output\n"));
printf(_(" -Z, --compress=0-9 compress tar output with given compression level\n"));
printf(_("\nGeneral options:\n"));
***************
*** 137,142 **** usage(void)
--- 114,120 ----
printf(_(" --help show this help, then exit\n"));
printf(_(" --version output version information, then exit\n"));
printf(_("\nConnection options:\n"));
+ printf(_(" -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n"));
printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
printf(_(" -p, --port=PORT database server port number\n"));
printf(_(" -U, --username=NAME connect as specified database user\n"));
***************
*** 147,152 **** usage(void)
--- 125,323 ----
/*
+ * Called in the background process whenever a complete segment of WAL
+ * has been received.
+ * On Unix, we check to see if there is any data on our pipe
+ * (which would mean we have a stop position), and if it is, check if
+ * it is time to stop.
+ * On Windows, we are in a single process, so we can just check if it's
+ * time to stop.
+ */
+ static bool
+ segment_callback(XLogRecPtr segendpos, uint32 timeline)
+ {
+ if (!has_xlogendptr)
+ {
+ #ifndef WIN32
+ fd_set fds;
+ struct timeval tv;
+ int r;
+
+ /*
+ * Don't have the end pointer yet - check our pipe to see if it has
+ * been sent yet.
+ */
+ FD_ZERO(&fds);
+ FD_SET(bgpipe[0], &fds);
+
+ MemSet(&tv, 0, sizeof(tv));
+
+ r = select(bgpipe[0] + 1, &fds, NULL, NULL, &tv);
+ if (r == 1)
+ {
+ char xlogend[64];
+
+ MemSet(xlogend, 0, sizeof(xlogend));
+ r = piperead(bgpipe[0], xlogend, sizeof(xlogend));
+ if (r < 0)
+ {
+ fprintf(stderr, _("%s: could not read from ready pipe: %s\n"),
+ progname, strerror(errno));
+ exit(1);
+ }
+
+ if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2)
+ {
+ fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"),
+ progname, xlogend);
+ exit(1);
+ }
+ has_xlogendptr = 1;
+
+ /*
+ * Fall through to check if we've reached the point further
+ * already.
+ */
+ }
+ else
+ {
+ /*
+ * No data received on the pipe means we don't know the end
+ * position yet - so just say it's not time to stop yet.
+ */
+ return false;
+ }
+ #else
+
+ /*
+ * On win32, has_xlogendptr is set by the main thread, so if it's not
+ * set here, we just go back and wait until it shows up.
+ */
+ return false;
+ #endif
+ }
+
+ /*
+ * At this point we have an end pointer, so compare it to the current
+ * position to figure out if it's time to stop.
+ */
+ if (segendpos.xlogid > xlogendptr.xlogid ||
+ (segendpos.xlogid == xlogendptr.xlogid &&
+ segendpos.xrecoff >= xlogendptr.xrecoff))
+ return true;
+
+ /*
+ * Have end pointer, but haven't reached it yet - so tell the caller to
+ * keep streaming.
+ */
+ return false;
+ }
+
+ typedef struct
+ {
+ PGconn *bgconn;
+ XLogRecPtr startptr;
+ char xlogdir[MAXPGPATH];
+ char *sysidentifier;
+ int timeline;
+ } logstreamer_param;
+
+ static int
+ LogStreamerMain(logstreamer_param * param)
+ {
+ if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
+ param->sysidentifier, param->xlogdir,
+ segment_callback, NULL, standby_message_timeout))
+
+ /*
+ * Any errors will already have been reported in the function process,
+ * but we need to tell the parent that we didn't shutdown in a nice
+ * way.
+ */
+ return 1;
+
+ PQfinish(param->bgconn);
+ return 0;
+ }
+
+ /*
+ * Initiate background process for receiving xlog during the backup.
+ * The background stream will use its own database connection so we can
+ * stream the logfile in parallel with the backups.
+ */
+ static void
+ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
+ {
+ logstreamer_param *param;
+
+ param = xmalloc0(sizeof(logstreamer_param));
+ param->timeline = timeline;
+ param->sysidentifier = sysidentifier;
+
+ /* Convert the starting position */
+ if (sscanf(startpos, "%X/%X", ¶m->startptr.xlogid, ¶m->startptr.xrecoff) != 2)
+ {
+ fprintf(stderr, _("%s: invalid format of xlog location: %s\n"),
+ progname, startpos);
+ disconnect_and_exit(1);
+ }
+ /* Round off to even segment position */
+ param->startptr.xrecoff -= param->startptr.xrecoff % XLOG_SEG_SIZE;
+
+ #ifndef WIN32
+ /* Create our background pipe */
+ if (pgpipe(bgpipe) < 0)
+ {
+ fprintf(stderr, _("%s: could not create pipe for background process: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+ #endif
+
+ /* Get a second connection */
+ param->bgconn = GetConnection();
+
+ /*
+ * Always in plain format, so we can write to basedir/pg_xlog. But the
+ * directory entry in the tar file may arrive later, so make sure it's
+ * created before we start.
+ */
+ snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
+ verify_dir_is_empty_or_create(param->xlogdir);
+
+ /*
+ * Start a child process and tell it to start streaming. On Unix, this is
+ * a fork(). On Windows, we create a thread.
+ */
+ #ifndef WIN32
+ bgchild = fork();
+ if (bgchild == 0)
+ {
+ /* in child process */
+ exit(LogStreamerMain(param));
+ }
+ else if (bgchild < 0)
+ {
+ fprintf(stderr, _("%s: could not create background process: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ /*
+ * Else we are in the parent process and all is well.
+ */
+ #else /* WIN32 */
+ bgchild = _beginthreadex(NULL, 0, (void *) LogStreamerMain, param, 0, NULL);
+ if (bgchild == 0)
+ {
+ fprintf(stderr, _("%s: could not create background thread: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+ #endif
+ }
+
+ /*
* Verify that the given directory exists and is empty. If it does not
* exist, it is created. If it exists but is not empty, an error will
* be give and the process ended.
***************
*** 503,513 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
strcpy(current_path, PQgetvalue(res, rownum, 1));
/*
- * Make sure we're unpacking into an empty directory
- */
- verify_dir_is_empty_or_create(current_path);
-
- /*
* Get the COPY data
*/
res = PQgetResult(conn);
--- 674,679 ----
***************
*** 597,609 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
/*
* Directory
*/
! filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
if (mkdir(filename, S_IRWXU) != 0)
{
! fprintf(stderr,
_("%s: could not create directory \"%s\": %s\n"),
! progname, filename, strerror(errno));
! disconnect_and_exit(1);
}
#ifndef WIN32
if (chmod(filename, (mode_t) filemode))
--- 763,783 ----
/*
* Directory
*/
! filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
if (mkdir(filename, S_IRWXU) != 0)
{
! /*
! * When streaming WAL, pg_xlog will have been created
! * by the wal receiver process, so just ignore failure
! * on that.
! */
! if (!streamwal || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0)
! {
! fprintf(stderr,
_("%s: could not create directory \"%s\": %s\n"),
! progname, filename, strerror(errno));
! disconnect_and_exit(1);
! }
}
#ifndef WIN32
if (chmod(filename, (mode_t) filemode))
***************
*** 616,627 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
/*
* Symbolic link
*/
! filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
if (symlink(©buf[157], filename) != 0)
{
fprintf(stderr,
_("%s: could not create symbolic link from \"%s\" to \"%s\": %s\n"),
! progname, filename, ©buf[157], strerror(errno));
disconnect_and_exit(1);
}
}
--- 790,801 ----
/*
* Symbolic link
*/
! filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
if (symlink(©buf[157], filename) != 0)
{
fprintf(stderr,
_("%s: could not create symbolic link from \"%s\" to \"%s\": %s\n"),
! progname, filename, ©buf[157], strerror(errno));
disconnect_and_exit(1);
}
}
***************
*** 714,807 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
}
- static PGconn *
- GetConnection(void)
- {
- PGconn *tmpconn;
- int argcount = 4; /* dbname, replication, fallback_app_name,
- * password */
- int i;
- const char **keywords;
- const char **values;
- char *password = NULL;
-
- if (dbhost)
- argcount++;
- if (dbuser)
- argcount++;
- if (dbport)
- argcount++;
-
- keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
- values = xmalloc0((argcount + 1) * sizeof(*values));
-
- keywords[0] = "dbname";
- values[0] = "replication";
- keywords[1] = "replication";
- values[1] = "true";
- keywords[2] = "fallback_application_name";
- values[2] = progname;
- i = 3;
- if (dbhost)
- {
- keywords[i] = "host";
- values[i] = dbhost;
- i++;
- }
- if (dbuser)
- {
- keywords[i] = "user";
- values[i] = dbuser;
- i++;
- }
- if (dbport)
- {
- keywords[i] = "port";
- values[i] = dbport;
- i++;
- }
-
- while (true)
- {
- if (dbgetpassword == 1)
- {
- /* Prompt for a password */
- password = simple_prompt(_("Password: "), 100, false);
- keywords[argcount - 1] = "password";
- values[argcount - 1] = password;
- }
-
- tmpconn = PQconnectdbParams(keywords, values, true);
- if (password)
- free(password);
-
- if (PQstatus(tmpconn) == CONNECTION_BAD &&
- PQconnectionNeedsPassword(tmpconn) &&
- dbgetpassword != -1)
- {
- dbgetpassword = 1; /* ask for password next time */
- PQfinish(tmpconn);
- continue;
- }
-
- if (PQstatus(tmpconn) != CONNECTION_OK)
- {
- fprintf(stderr, _("%s: could not connect to server: %s"),
- progname, PQerrorMessage(tmpconn));
- exit(1);
- }
-
- /* Connection ok! */
- free(values);
- free(keywords);
- return tmpconn;
- }
- }
-
static void
BaseBackup(void)
{
PGresult *res;
char current_path[MAXPGPATH];
char escaped_label[MAXPGPATH];
int i;
--- 888,899 ----
}
static void
BaseBackup(void)
{
PGresult *res;
+ char *sysidentifier;
+ uint32 timeline;
char current_path[MAXPGPATH];
char escaped_label[MAXPGPATH];
int i;
***************
*** 814,826 **** BaseBackup(void)
conn = GetConnection();
/*
* Start the actual backup
*/
PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s %s",
escaped_label,
showprogress ? "PROGRESS" : "",
! includewal ? "WAL" : "",
fastcheckpoint ? "FAST" : "",
includewal ? "NOWAIT" : "");
--- 906,938 ----
conn = GetConnection();
/*
+ * Run IDENTIFY_SYSTEM so we can get the timeline
+ */
+ res = PQexec(conn, "IDENTIFY_SYSTEM");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, _("%s: could not identify system: %s\n"),
+ progname, PQerrorMessage(conn));
+ disconnect_and_exit(1);
+ }
+ if (PQntuples(res) != 1)
+ {
+ fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
+ progname, PQntuples(res));
+ disconnect_and_exit(1);
+ }
+ sysidentifier = strdup(PQgetvalue(res, 0, 0));
+ timeline = atoi(PQgetvalue(res, 0, 1));
+ PQclear(res);
+
+ /*
* Start the actual backup
*/
PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s %s",
escaped_label,
showprogress ? "PROGRESS" : "",
! includewal && !streamwal ? "WAL" : "",
fastcheckpoint ? "FAST" : "",
includewal ? "NOWAIT" : "");
***************
*** 899,904 **** BaseBackup(void)
--- 1011,1028 ----
}
/*
+ * If we're streaming WAL, start the streaming session before we start
+ * receiving the actual data chunks.
+ */
+ if (streamwal)
+ {
+ if (verbose)
+ fprintf(stderr, _("%s: starting background WAL receiver\n"),
+ progname);
+ StartLogStreamer(xlogstart, timeline, sysidentifier);
+ }
+
+ /*
* Start receiving chunks
*/
for (i = 0; i < PQntuples(res); i++)
***************
*** 945,950 **** BaseBackup(void)
--- 1069,1160 ----
disconnect_and_exit(1);
}
+ if (bgchild > 0)
+ {
+ int status;
+
+ #ifndef WIN32
+ int r;
+ #endif
+
+ if (verbose)
+ fprintf(stderr, _("%s: waiting for background process to finish streaming...\n"), progname);
+
+ #ifndef WIN32
+ if (pipewrite(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend))
+ {
+ fprintf(stderr, _("%s: could not send command to background pipe: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ /* Just wait for the background process to exit */
+ r = waitpid(bgchild, &status, 0);
+ if (r == -1)
+ {
+ fprintf(stderr, _("%s: could not wait for child process: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+ if (r != bgchild)
+ {
+ fprintf(stderr, _("%s: child %i died, expected %i\n"),
+ progname, r, bgchild);
+ disconnect_and_exit(1);
+ }
+ if (!WIFEXITED(status))
+ {
+ fprintf(stderr, _("%s: child process did not exit normally\n"),
+ progname);
+ disconnect_and_exit(1);
+ }
+ if (WEXITSTATUS(status) != 0)
+ {
+ fprintf(stderr, _("%s: child process exited with error %i\n"),
+ progname, WEXITSTATUS(status));
+ disconnect_and_exit(1);
+ }
+ /* Exited normally, we're happy! */
+ #else /* WIN32 */
+
+ /*
+ * On Windows, since we are in the same process, we can just store the
+ * value directly in the variable, and then set the flag that says
+ * it's there.
+ */
+ if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2)
+ {
+ fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"),
+ progname, xlogend);
+ exit(1);
+ }
+ InterlockedIncrement(&has_xlogendptr);
+
+ /* First wait for the thread to exit */
+ if (WaitForSingleObjectEx((HANDLE) bgchild, INFINITE, FALSE) != WAIT_OBJECT_0)
+ {
+ _dosmaperr(GetLastError());
+ fprintf(stderr, _("%s: could not wait for child thread: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+ if (GetExitCodeThread((HANDLE) bgchild, &status) == 0)
+ {
+ _dosmaperr(GetLastError());
+ fprintf(stderr, _("%s: could not get child thread exit status: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+ if (status != 0)
+ {
+ fprintf(stderr, _("%s: child thread exited with error %u\n"),
+ progname, status);
+ disconnect_and_exit(1);
+ }
+ /* Exited normally, we're happy */
+ #endif
+ }
+
/*
* End of copy data. Final result is already checked inside the loop.
*/
***************
*** 964,970 **** main(int argc, char **argv)
{"pgdata", required_argument, NULL, 'D'},
{"format", required_argument, NULL, 'F'},
{"checkpoint", required_argument, NULL, 'c'},
! {"xlog", no_argument, NULL, 'x'},
{"gzip", no_argument, NULL, 'z'},
{"compress", required_argument, NULL, 'Z'},
{"label", required_argument, NULL, 'l'},
--- 1174,1180 ----
{"pgdata", required_argument, NULL, 'D'},
{"format", required_argument, NULL, 'F'},
{"checkpoint", required_argument, NULL, 'c'},
! {"xlog", required_argument, NULL, 'x'},
{"gzip", no_argument, NULL, 'z'},
{"compress", required_argument, NULL, 'Z'},
{"label", required_argument, NULL, 'l'},
***************
*** 973,978 **** main(int argc, char **argv)
--- 1183,1189 ----
{"username", required_argument, NULL, 'U'},
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
+ {"statusint", required_argument, NULL, 's'},
{"verbose", no_argument, NULL, 'v'},
{"progress", no_argument, NULL, 'P'},
{NULL, 0, NULL, 0}
***************
*** 999,1005 **** main(int argc, char **argv)
}
}
! while ((c = getopt_long(argc, argv, "D:F:xl:zZ:c:h:p:U:wWvP",
long_options, &option_index)) != -1)
{
switch (c)
--- 1210,1216 ----
}
}
! while ((c = getopt_long(argc, argv, "D:F:x:l:zZ:c:h:p:U:s:wWvP",
long_options, &option_index)) != -1)
{
switch (c)
***************
*** 1021,1026 **** main(int argc, char **argv)
--- 1232,1249 ----
break;
case 'x':
includewal = true;
+ if (strcmp(optarg, "f") == 0 ||
+ strcmp(optarg, "fetch") == 0)
+ streamwal = false;
+ else if (strcmp(optarg, "s") == 0 ||
+ strcmp(optarg, "stream") == 0)
+ streamwal = true;
+ else
+ {
+ fprintf(stderr, _("%s: invalid xlog option \"%s\", must be empty, \"fetch\" or \"stream\"\n"),
+ progname, optarg);
+ exit(1);
+ }
break;
case 'l':
label = xstrdup(optarg);
***************
*** 1068,1073 **** main(int argc, char **argv)
--- 1291,1305 ----
case 'W':
dbgetpassword = 1;
break;
+ case 's':
+ standby_message_timeout = atoi(optarg);
+ if (standby_message_timeout < 0)
+ {
+ fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ break;
case 'v':
verbose++;
break;
***************
*** 1122,1127 **** main(int argc, char **argv)
--- 1354,1369 ----
exit(1);
}
+ if (format != 'p' && streamwal)
+ {
+ fprintf(stderr,
+ _("%s: wal streaming can only be used in plain mode\n"),
+ progname);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
#ifndef HAVE_LIBZ
if (compresslevel != 0)
{
*** /dev/null
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 0 ****
--- 1,465 ----
+ /*-------------------------------------------------------------------------
+ *
+ * pg_receivexlog.c - receive streaming transaction log data and write it
+ * to a local file.
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/pg_receivexlog.c
+ *-------------------------------------------------------------------------
+ */
+
+ /*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need. But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+ #define FRONTEND 1
+ #include "postgres.h"
+ #include "libpq-fe.h"
+ #include "libpq/pqsignal.h"
+ #include "access/xlog_internal.h"
+
+ #include "receivelog.h"
+ #include "streamutil.h"
+
+ #include <dirent.h>
+ #include <sys/stat.h>
+ #include <sys/types.h>
+ #include <unistd.h>
+
+ #include "getopt_long.h"
+
+ /* Global options */
+ char *basedir = NULL;
+ int verbose = 0;
+ int standby_message_timeout = 10; /* 10 sec = default */
+ volatile bool time_to_abort = false;
+
+
+ static void usage(void);
+ static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline);
+ static void StreamLog();
+ static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+
+ static void
+ usage(void)
+ {
+ printf(_("%s receives PostgreSQL streaming transaction logs\n\n"),
+ progname);
+ printf(_("Usage:\n"));
+ printf(_(" %s [OPTION]...\n"), progname);
+ printf(_("\nOptions controlling the output:\n"));
+ printf(_(" -D, --dir=directory receive xlog files into this directory\n"));
+ printf(_("\nGeneral options:\n"));
+ printf(_(" -v, --verbose output verbose messages\n"));
+ printf(_(" -?, --help show this help, then exit\n"));
+ printf(_(" -V, --version output version information, then exit\n"));
+ printf(_("\nConnection options:\n"));
+ printf(_(" -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n"));
+ printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
+ printf(_(" -p, --port=PORT database server port number\n"));
+ printf(_(" -U, --username=NAME connect as specified database user\n"));
+ printf(_(" -w, --no-password never prompt for password\n"));
+ printf(_(" -W, --password force password prompt (should happen automatically)\n"));
+ printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
+ }
+
+ static bool
+ segment_callback(XLogRecPtr segendpos, uint32 timeline)
+ {
+ char fn[MAXPGPATH];
+ struct stat statbuf;
+
+ if (verbose)
+ fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
+ progname, segendpos.xlogid, segendpos.xrecoff, timeline);
+
+ /*
+ * Check if there is a partial file for the name we just finished, and if
+ * there is, remove it under the assumption that we have now got all the
+ * data we need.
+ */
+ segendpos.xrecoff /= XLOG_SEG_SIZE;
+ PrevLogSeg(segendpos.xlogid, segendpos.xrecoff);
+ snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial",
+ basedir, timeline,
+ segendpos.xlogid,
+ segendpos.xrecoff);
+ if (stat(fn, &statbuf) == 0)
+ {
+ /* File existed, get rid of it */
+ if (verbose)
+ fprintf(stderr, _("%s: removing file \"%s\"\n"),
+ progname, fn);
+ unlink(fn);
+ }
+
+ /*
+ * Never abort from this - we handle all aborting in continue_streaming()
+ */
+ return false;
+ }
+
+ static bool
+ continue_streaming()
+ {
+ if (time_to_abort)
+ {
+ fprintf(stderr, _("%s: received interrupt signal, exiting.\n"),
+ progname);
+ return true;
+ }
+ return false;
+ }
+
+ /*
+ * Determine starting location for streaming, based on:
+ * 1. If there are existing xlog segments, start at the end of the last one
+ * 2. If the last one is a partial segment, rename it and start over, since
+ * we don't sync after every write.
+ * 3. If no existing xlog exists, start from the beginning of the current
+ * WAL segment.
+ */
+ static XLogRecPtr
+ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
+ {
+ DIR *dir;
+ struct dirent *dirent;
+ int i;
+ bool b;
+ uint32 high_log = 0;
+ uint32 high_seg = 0;
+ bool partial = false;
+
+ dir = opendir(basedir);
+ if (dir == NULL)
+ {
+ fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
+ progname, basedir, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ while ((dirent = readdir(dir)) != NULL)
+ {
+ char fullpath[MAXPGPATH];
+ struct stat statbuf;
+ uint32 tli,
+ log,
+ seg;
+
+ if (!strcmp(dirent->d_name, ".") || !strcmp(dirent->d_name, ".."))
+ continue;
+
+ /* xlog files are always 24 characters */
+ if (strlen(dirent->d_name) != 24)
+ continue;
+
+ /* Filenames are always made out of 0-9 and A-F */
+ b = false;
+ for (i = 0; i < 24; i++)
+ {
+ if (!(dirent->d_name[i] >= '0' && dirent->d_name[i] <= '9') &&
+ !(dirent->d_name[i] >= 'A' && dirent->d_name[i] <= 'F'))
+ {
+ b = true;
+ break;
+ }
+ }
+ if (b)
+ continue;
+
+ /*
+ * Looks like an xlog file. Parse its position.
+ */
+ if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3)
+ {
+ fprintf(stderr, _("%s: could not parse xlog filename \"%s\"\n"),
+ progname, dirent->d_name);
+ disconnect_and_exit(1);
+ }
+
+ /* Ignore any files that are for another timeline */
+ if (tli != currenttimeline)
+ continue;
+
+ /* Check if this is a completed segment or not */
+ snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+ if (stat(fullpath, &statbuf) != 0)
+ {
+ fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"),
+ progname, fullpath, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ if (statbuf.st_size == 16 * 1024 * 1024)
+ {
+ /* Completed segment */
+ if (log > high_log ||
+ (log == high_log && seg > high_seg))
+ {
+ high_log = log;
+ high_seg = seg;
+ continue;
+ }
+ }
+ else
+ {
+ /*
+ * This is a partial file. Rename it out of the way.
+ */
+ char newfn[MAXPGPATH];
+
+ fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
+ progname, dirent->d_name, dirent->d_name);
+
+ snprintf(newfn, sizeof(newfn), "%s/%s.partial",
+ basedir, dirent->d_name);
+
+ if (stat(newfn, &statbuf) == 0)
+ {
+ /*
+ * XXX: perhaps we should only error out if the existing file
+ * is larger?
+ */
+ fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"),
+ progname, newfn);
+ disconnect_and_exit(1);
+ }
+ if (rename(fullpath, newfn) != 0)
+ {
+ fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"),
+ progname, fullpath, newfn, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ /* Don't continue looking for more, we assume this is the last */
+ partial = true;
+ break;
+ }
+ }
+
+ closedir(dir);
+
+ if (high_log > 0 || high_seg > 0)
+ {
+ XLogRecPtr high_ptr;
+
+ if (!partial)
+ {
+ /*
+ * If the segment was partial, the pointer is already at the right
+ * location since we want to re-transmit that segment. If it was
+ * not, we need to move it to the next segment, since we are
+ * tracking the last one that was complete.
+ */
+ NextLogSeg(high_log, high_seg);
+ }
+
+ high_ptr.xlogid = high_log;
+ high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE;
+
+ return high_ptr;
+ }
+ else
+ return currentpos;
+ }
+
+ /*
+ * Start the log streaming
+ */
+ static void
+ StreamLog(void)
+ {
+ PGresult *res;
+ uint32 timeline;
+ XLogRecPtr startpos;
+
+ /*
+ * Connect in replication mode to the server
+ */
+ conn = GetConnection();
+
+ /*
+ * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
+ * position.
+ */
+ res = PQexec(conn, "IDENTIFY_SYSTEM");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, _("%s: could not identify system: %s\n"),
+ progname, PQerrorMessage(conn));
+ disconnect_and_exit(1);
+ }
+ if (PQntuples(res) != 1)
+ {
+ fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
+ progname, PQntuples(res));
+ disconnect_and_exit(1);
+ }
+ timeline = atoi(PQgetvalue(res, 0, 1));
+ if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &startpos.xlogid, &startpos.xrecoff) != 2)
+ {
+ fprintf(stderr, _("%s: could not parse log start position from value \"%s\"\n"),
+ progname, PQgetvalue(res, 0, 2));
+ disconnect_and_exit(1);
+ }
+ PQclear(res);
+
+ /*
+ * Figure out where to start streaming.
+ */
+ startpos = FindStreamingStart(startpos, timeline);
+
+ /*
+ * Always start streaming at the beginning of a segment
+ */
+ startpos.xrecoff -= startpos.xrecoff % XLOG_SEG_SIZE;
+
+ /*
+ * Start the replication
+ */
+ if (verbose)
+ fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"),
+ progname, startpos.xlogid, startpos.xrecoff, timeline);
+
+ ReceiveXlogStream(conn, startpos, timeline, NULL, basedir,
+ segment_callback, continue_streaming,
+ standby_message_timeout);
+ }
+
+ /*
+ * When sigint is called, just tell the system to exit at the next possible
+ * moment.
+ */
+ static void
+ sigint_handler(int signum)
+ {
+ time_to_abort = true;
+ }
+
+ int
+ main(int argc, char **argv)
+ {
+ static struct option long_options[] = {
+ {"help", no_argument, NULL, '?'},
+ {"version", no_argument, NULL, 'V'},
+ {"dir", required_argument, NULL, 'D'},
+ {"host", required_argument, NULL, 'h'},
+ {"port", required_argument, NULL, 'p'},
+ {"username", required_argument, NULL, 'U'},
+ {"no-password", no_argument, NULL, 'w'},
+ {"password", no_argument, NULL, 'W'},
+ {"statusint", required_argument, NULL, 's'},
+ {"verbose", no_argument, NULL, 'v'},
+ {NULL, 0, NULL, 0}
+ };
+ int c;
+
+ int option_index;
+
+ progname = get_progname(argv[0]);
+ set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog"));
+
+ if (argc > 1)
+ {
+ if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+ {
+ usage();
+ exit(0);
+ }
+ else if (strcmp(argv[1], "-V") == 0
+ || strcmp(argv[1], "--version") == 0)
+ {
+ puts("pg_receivexlog (PostgreSQL) " PG_VERSION);
+ exit(0);
+ }
+ }
+
+ while ((c = getopt_long(argc, argv, "D:h:p:U:s:wWv",
+ long_options, &option_index)) != -1)
+ {
+ switch (c)
+ {
+ case 'D':
+ basedir = xstrdup(optarg);
+ break;
+ case 'h':
+ dbhost = xstrdup(optarg);
+ break;
+ case 'p':
+ if (atoi(optarg) <= 0)
+ {
+ fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ dbport = xstrdup(optarg);
+ break;
+ case 'U':
+ dbuser = xstrdup(optarg);
+ break;
+ case 'w':
+ dbgetpassword = -1;
+ break;
+ case 'W':
+ dbgetpassword = 1;
+ break;
+ case 's':
+ standby_message_timeout = atoi(optarg);
+ if (standby_message_timeout < 0)
+ {
+ fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ break;
+ case 'v':
+ verbose++;
+ break;
+ default:
+
+ /*
+ * getopt_long already emitted a complaint
+ */
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+ }
+
+ /*
+ * Any non-option arguments?
+ */
+ if (optind < argc)
+ {
+ fprintf(stderr,
+ _("%s: too many command-line arguments (first is \"%s\")\n"),
+ progname, argv[optind]);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
+ /*
+ * Required arguments
+ */
+ if (basedir == NULL)
+ {
+ fprintf(stderr, _("%s: no target directory specified\n"), progname);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
+ #ifndef WIN32
+ pqsignal(SIGINT, sigint_handler);
+ #endif
+
+ StreamLog();
+
+ exit(0);
+ }
*** /dev/null
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 0 ****
--- 1,383 ----
+ /*-------------------------------------------------------------------------
+ *
+ * receivelog.c - receive transaction log files using the streaming
+ * replication protocol.
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/receivelog.c
+ *-------------------------------------------------------------------------
+ */
+
+ /*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need. But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+ #define FRONTEND 1
+ #include "postgres.h"
+ #include "libpq-fe.h"
+ #include "access/xlog_internal.h"
+ #include "replication/walprotocol.h"
+ #include "utils/datetime.h"
+
+ #include "receivelog.h"
+ #include "streamutil.h"
+
+ #include <sys/time.h>
+ #include <sys/types.h>
+ #include <unistd.h>
+
+
+ /* Size of the streaming replication protocol header */
+ #define STREAMING_HEADER_SIZE (1+8+8+8)
+
+ const XLogRecPtr InvalidXLogRecPtr = {0, 0};
+
+ /*
+ * Open a new WAL file in the specified directory. Store the name
+ * (not including the full directory) in namebuf. Assumes there is
+ * enough room in this buffer...
+ */
+ static int
+ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
+ {
+ int f;
+ char fn[MAXPGPATH];
+
+ XLogFileName(namebuf, timeline, startpoint.xlogid,
+ startpoint.xrecoff / XLOG_SEG_SIZE);
+
+ snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
+ f = open(fn, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, 0666);
+ if (f == -1)
+ fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
+ progname, namebuf, strerror(errno));
+ return f;
+ }
+
+ /*
+ * Local version of GetCurrentTimestamp(), since we are not linked with
+ * backend code.
+ */
+ static TimestampTz
+ localGetCurrentTimestamp(void)
+ {
+ TimestampTz result;
+ struct timeval tp;
+
+ gettimeofday(&tp, NULL);
+
+ result = (TimestampTz) tp.tv_sec -
+ ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
+
+ #ifdef HAVE_INT64_TIMESTAMP
+ result = (result * USECS_PER_SEC) + tp.tv_usec;
+ #else
+ result = result + (tp.tv_usec / 1000000.0);
+ #endif
+
+ return result;
+ }
+
+ /*
+ * Receive a log stream starting at the specified position.
+ *
+ * If sysidentifier is specified, validate that both the system
+ * identifier and the timeline matches the specified ones
+ * (by sending an extra IDENTIFY_SYSTEM command)
+ *
+ * Note: The log position *must* be at a log segment start!
+ */
+ bool
+ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, segment_finish_callback segment_finish, stream_continue_callback stream_continue, int standby_message_timeout)
+ {
+ char query[128];
+ char current_walfile_name[MAXPGPATH];
+ PGresult *res;
+ char *copybuf = NULL;
+ int walfile = -1;
+ int64 last_status = -1;
+ XLogRecPtr blockpos = InvalidXLogRecPtr;
+
+ if (sysidentifier != NULL)
+ {
+ /* Validate system identifier and timeline hasn't changed */
+ res = PQexec(conn, "IDENTIFY_SYSTEM");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, _("%s: could not identify system: %s\n"),
+ progname, PQerrorMessage(conn));
+ PQclear(res);
+ return false;
+ }
+ if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
+ {
+ fprintf(stderr, _("%s: system identifier does not match between base backup and streaming connection\n"), progname);
+ PQclear(res);
+ return false;
+ }
+ if (timeline != atoi(PQgetvalue(res, 0, 1)))
+ {
+ fprintf(stderr, _("%s: timeline does not match between base backup and streaming connection\n"), progname);
+ PQclear(res);
+ return false;
+ }
+ PQclear(res);
+ }
+
+ /* Initiate the replication stream at specified location */
+ snprintf(query, sizeof(query), "START_REPLICATION %X/%X", startpos.xlogid, startpos.xrecoff);
+ res = PQexec(conn, query);
+ if (PQresultStatus(res) != PGRES_COPY_BOTH)
+ {
+ fprintf(stderr, _("%s: could not start replication: %s\n"),
+ progname, PQresultErrorMessage(res));
+ return false;
+ }
+ PQclear(res);
+
+ /*
+ * Receive the actual xlog data
+ */
+ while (1)
+ {
+ int r;
+ int xlogoff;
+ int bytes_left;
+ int bytes_written;
+ int64 now;
+
+ if (copybuf != NULL)
+ {
+ PQfreemem(copybuf);
+ copybuf = NULL;
+ }
+
+ /*
+ * Check if we should continue streaming, or abort at this point.
+ */
+ if (stream_continue && stream_continue())
+ {
+ if (walfile != -1)
+ {
+ fsync(walfile);
+ close(walfile);
+ }
+ return true;
+ }
+
+ /*
+ * Potentially send a status message to the master
+ */
+ now = localGetCurrentTimestamp();
+ if (standby_message_timeout > 0 &&
+ last_status < now - standby_message_timeout * 1000000)
+ {
+ /* Time to send feedback! */
+ char replybuf[sizeof(StandbyReplyMessage) + 1];
+ StandbyReplyMessage *replymsg = (StandbyReplyMessage *) (replybuf + 1);
+
+ replymsg->write = blockpos;
+ replymsg->flush = InvalidXLogRecPtr;
+ replymsg->apply = InvalidXLogRecPtr;
+ replymsg->sendTime = now;
+ replybuf[0] = 'r';
+
+ if (PQputCopyData(conn, replybuf, sizeof(replybuf)) <= 0 ||
+ PQflush(conn))
+ {
+ fprintf(stderr, _("%s: could not send feedback packet: %s"),
+ progname, PQerrorMessage(conn));
+ return false;
+ }
+
+ last_status = now;
+ }
+
+ r = PQgetCopyData(conn, ©buf, 1);
+ if (r == 0)
+ {
+ /*
+ * In async mode, and no data available. We block on reading but
+ * not more than the specified timeout, so that we can send a
+ * response back to the client.
+ */
+ fd_set input_mask;
+ struct timeval timeout;
+ struct timeval *timeoutptr;
+
+ FD_ZERO(&input_mask);
+ FD_SET(PQsocket(conn), &input_mask);
+ if (standby_message_timeout)
+ {
+ timeout.tv_sec = last_status + standby_message_timeout - now - 1;
+ if (timeout.tv_sec <= 0)
+ timeout.tv_sec = 1; /* Always sleep at least 1 sec */
+ timeout.tv_usec = 0;
+ timeoutptr = &timeout;
+ }
+ else
+ timeoutptr = NULL;
+
+ r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+ if (r == 0 || (r < 0 && errno == EINTR))
+ {
+ /*
+ * Got a timeout or signal. Continue the loop and either
+ * deliver a status packet to the server or just go back into
+ * blocking.
+ */
+ continue;
+ }
+ else if (r < 0)
+ {
+ fprintf(stderr, _("%s: select() failed: %m\n"), progname);
+ return false;
+ }
+ /* Else there is actually data on the socket */
+ if (PQconsumeInput(conn) == 0)
+ {
+ fprintf(stderr, _("%s: could not receive data from WAL stream: %s\n"),
+ progname, PQerrorMessage(conn));
+ return false;
+ }
+ continue;
+ }
+ if (r == -1)
+ /* End of copy stream */
+ break;
+ if (r == -2)
+ {
+ fprintf(stderr, _("%s: could not read copy data: %s\n"),
+ progname, PQerrorMessage(conn));
+ return false;
+ }
+ if (r < STREAMING_HEADER_SIZE + 1)
+ {
+ fprintf(stderr, _("%s: streaming header too small: %i\n"),
+ progname, r);
+ return false;
+ }
+ if (copybuf[0] != 'w')
+ {
+ fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+ progname, copybuf[0]);
+ return false;
+ }
+
+ /* Extract WAL location for this block */
+ memcpy(&blockpos, copybuf + 1, 8);
+ xlogoff = blockpos.xrecoff % XLOG_SEG_SIZE;
+
+ /*
+ * Verify that the initial location in the stream matches where we
+ * think we are.
+ */
+ if (walfile == -1)
+ {
+ /* No file open yet */
+ if (xlogoff != 0)
+ {
+ fprintf(stderr, _("%s: received xlog record for offset %u with no file open\n"),
+ progname, xlogoff);
+ return false;
+ }
+ }
+ else
+ {
+ /* More data in existing segment */
+ /* XXX: store seek value don't reseek all the time */
+ if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+ {
+ fprintf(stderr, _("%s: got WAL data offset %08x, expected %08x\n"),
+ progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+ return false;
+ }
+ }
+
+ bytes_left = r - STREAMING_HEADER_SIZE;
+ bytes_written = 0;
+
+ while (bytes_left)
+ {
+ int bytes_to_write;
+
+ /*
+ * If crossing a WAL boundary, only write up until we reach
+ * XLOG_SEG_SIZE.
+ */
+ if (xlogoff + bytes_left > XLOG_SEG_SIZE)
+ bytes_to_write = XLOG_SEG_SIZE - xlogoff;
+ else
+ bytes_to_write = bytes_left;
+
+ if (walfile == -1)
+ {
+ walfile = open_walfile(blockpos, timeline,
+ basedir, current_walfile_name);
+ if (walfile == -1)
+ /* Error logged by open_walfile */
+ return false;
+ }
+
+ if (write(walfile,
+ copybuf + STREAMING_HEADER_SIZE + bytes_written,
+ bytes_to_write) != bytes_to_write)
+ {
+ fprintf(stderr, _("%s: could not write %u bytes to WAL file %s: %s\n"),
+ progname,
+ bytes_to_write,
+ current_walfile_name,
+ strerror(errno));
+ return false;
+ }
+
+ /* Write was successful, advance our position */
+ bytes_written += bytes_to_write;
+ bytes_left -= bytes_to_write;
+ XLByteAdvance(blockpos, bytes_to_write);
+ xlogoff += bytes_to_write;
+
+ /* Did we reach the end of a WAL segment? */
+ if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
+ {
+ fsync(walfile);
+ close(walfile);
+ walfile = -1;
+ xlogoff = 0;
+
+ if (segment_finish != NULL)
+ {
+ /*
+ * Callback when the segment finished, and return if it
+ * told us to.
+ */
+ if (segment_finish(blockpos, timeline))
+ return true;
+ }
+ }
+ }
+ /* No more data left to write, start receiving next copy packet */
+ }
+
+ /*
+ * The only way to get out of the loop is if the server shut down the
+ * replication stream. If it's a controlled shutdown, the server will send
+ * a shutdown message, and we'll return the latest xlog location that has
+ * been streamed.
+ */
+
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ fprintf(stderr, _("%s: unexpected termination of replication stream: %s\n"),
+ progname, PQresultErrorMessage(res));
+ return false;
+ }
+ PQclear(res);
+ return true;
+ }
*** /dev/null
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 0 ****
--- 1,22 ----
+ #include "access/xlogdefs.h"
+
+ /*
+ * Called whenever a segment is finished, return true to stop
+ * the streaming at this point.
+ */
+ typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline);
+
+ /*
+ * Called before trying to read more data. Return true to stop
+ * the streaming at this point.
+ */
+ typedef bool (*stream_continue_callback)(void);
+
+ bool ReceiveXlogStream(PGconn *conn,
+ XLogRecPtr startpos,
+ uint32 timeline,
+ char *sysidentifier,
+ char *basedir,
+ segment_finish_callback segment_finish,
+ stream_continue_callback stream_continue,
+ int standby_message_timeout);
*** /dev/null
--- b/src/bin/pg_basebackup/streamutil.c
***************
*** 0 ****
--- 1,165 ----
+ /*-------------------------------------------------------------------------
+ *
+ * streamutil.c - utility functions for pg_basebackup and pg_receivelog
+ *
+ * Author: Magnus Hagander <magnus@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/streamutil.c
+ *-------------------------------------------------------------------------
+ */
+
+ /*
+ * We have to use postgres.h not postgres_fe.h here, because there's so much
+ * backend-only stuff in the XLOG include files we need. But we need a
+ * frontend-ish environment otherwise. Hence this ugly hack.
+ */
+ #define FRONTEND 1
+ #include "postgres.h"
+ #include "streamutil.h"
+
+ #include <stdio.h>
+ #include <string.h>
+
+ const char *progname;
+ char *dbhost = NULL;
+ char *dbuser = NULL;
+ char *dbport = NULL;
+ int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
+ static char *dbpassword = NULL;
+ PGconn *conn = NULL;
+
+ /*
+ * strdup() and malloc() replacements that prints an error and exits
+ * if something goes wrong. Can never return NULL.
+ */
+ char *
+ xstrdup(const char *s)
+ {
+ char *result;
+
+ result = strdup(s);
+ if (!result)
+ {
+ fprintf(stderr, _("%s: out of memory\n"), progname);
+ exit(1);
+ }
+ return result;
+ }
+
+ void *
+ xmalloc0(int size)
+ {
+ void *result;
+
+ result = malloc(size);
+ if (!result)
+ {
+ fprintf(stderr, _("%s: out of memory\n"), progname);
+ exit(1);
+ }
+ MemSet(result, 0, size);
+ return result;
+ }
+
+
+ PGconn *
+ GetConnection(void)
+ {
+ PGconn *tmpconn;
+ int argcount = 4; /* dbname, replication, fallback_app_name,
+ * password */
+ int i;
+ const char **keywords;
+ const char **values;
+ char *password = NULL;
+
+ if (dbhost)
+ argcount++;
+ if (dbuser)
+ argcount++;
+ if (dbport)
+ argcount++;
+
+ keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
+ values = xmalloc0((argcount + 1) * sizeof(*values));
+
+ keywords[0] = "dbname";
+ values[0] = "replication";
+ keywords[1] = "replication";
+ values[1] = "true";
+ keywords[2] = "fallback_application_name";
+ values[2] = progname;
+ i = 3;
+ if (dbhost)
+ {
+ keywords[i] = "host";
+ values[i] = dbhost;
+ i++;
+ }
+ if (dbuser)
+ {
+ keywords[i] = "user";
+ values[i] = dbuser;
+ i++;
+ }
+ if (dbport)
+ {
+ keywords[i] = "port";
+ values[i] = dbport;
+ i++;
+ }
+
+ while (true)
+ {
+ if (password)
+ free(password);
+
+ if (dbpassword)
+ {
+ /*
+ * We've saved a password when a previous connection succeeded,
+ * meaning this is the call for a second session to the same
+ * database, so just forcibly reuse that password.
+ */
+ keywords[argcount - 1] = "password";
+ values[argcount - 1] = dbpassword;
+ dbgetpassword = -1; /* Don't try again if this fails */
+ }
+ else if (dbgetpassword == 1)
+ {
+ password = simple_prompt(_("Password: "), 100, false);
+ keywords[argcount - 1] = "password";
+ values[argcount - 1] = password;
+ }
+
+ tmpconn = PQconnectdbParams(keywords, values, true);
+
+ if (PQstatus(tmpconn) == CONNECTION_BAD &&
+ PQconnectionNeedsPassword(tmpconn) &&
+ dbgetpassword != -1)
+ {
+ dbgetpassword = 1; /* ask for password next time */
+ PQfinish(tmpconn);
+ continue;
+ }
+
+ if (PQstatus(tmpconn) != CONNECTION_OK)
+ {
+ fprintf(stderr, _("%s: could not connect to server: %s\n"),
+ progname, PQerrorMessage(tmpconn));
+ exit(1);
+ }
+
+ /* Connection ok! */
+ free(values);
+ free(keywords);
+
+ /* Store the password for next run */
+ if (password)
+ dbpassword = password;
+ return tmpconn;
+ }
+ }
*** /dev/null
--- b/src/bin/pg_basebackup/streamutil.h
***************
*** 0 ****
--- 1,22 ----
+ #include "libpq-fe.h"
+
+ extern const char *progname;
+ extern char *dbhost;
+ extern char *dbuser;
+ extern char *dbport;
+ extern int dbgetpassword;
+
+ /* Connection kept global so we can disconnect easily */
+ extern PGconn *conn;
+
+ #define disconnect_and_exit(code) \
+ { \
+ if (conn != NULL) PQfinish(conn); \
+ exit(code); \
+ }
+
+
+ char *xstrdup(const char *s);
+ void *xmalloc0(int size);
+
+ PGconn *GetConnection(void);
*** a/src/tools/msvc/Mkvcbuild.pm
--- b/src/tools/msvc/Mkvcbuild.pm
***************
*** 305,310 **** sub mkvcbuild
--- 305,317 ----
$initdb->AddLibrary('ws2_32.lib');
my $pgbasebackup = AddSimpleFrontend('pg_basebackup', 1);
+ $pgbasebackup->AddFile('src\bin\pg_basebackup\pg_basebackup.c');
+ $pgbasebackup->AddLibrary('ws2_32.lib');
+
+ my $pgreceivexlog = AddSimpleFrontend('pg_basebackup', 1);
+ $pgreceivexlog->{name} = 'pg_receivexlog';
+ $pgreceivexlog->AddFile('src\bin\pg_basebackup\pg_receivexlog.c');
+ $pgreceivexlog->AddLibrary('ws2_32.lib');
my $pgconfig = AddSimpleFrontend('pg_config');
On Tue, Oct 25, 2011 at 12:37, Magnus Hagander <magnus@hagander.net> wrote:
On Mon, Oct 24, 2011 at 14:40, Magnus Hagander <magnus@hagander.net> wrote:
On Mon, Oct 24, 2011 at 13:46, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:+ /* + * Looks like an xlog file. Parse it's position.s/it's/its/
+ */ + if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3) + { + fprintf(stderr, _("%s: could not parse xlog filename \"%s\"\n"), + progname, dirent->d_name); + disconnect_and_exit(1); + } + log *= XLOG_SEG_SIZE;That multiplication by XLOG_SEG_SIZE could overflow, if logid is very high.
It seems completely unnecessary, anyway,How do you mean completely unnecessary? We'd have to change the points
that use it to divide by XLOG_SEG_SIZE otherwise, no? That might be a
way to get around the overflow, but I'm not sure that's what you mean?Talked to Heikki on IM about this one, turns out we were both wrong.
It's needed, but there was a bug hiding under it, due to (once again)
mixing up segments and offsets. Has been fixed now.In pg_basebackup, it would be a good sanity check to check that the systemid
returned by IDENTIFY_SYSTEM in the main connection and the WAL-streaming
connection match. Just to be sure that some connection pooler didn't hijack
one of the connections and point to a different server. And better check
timelineid too while you're at it.That's a good idea. Will fix.
Added to the new version of the patch.
How does this interact with synchronous replication? If a base backup that
streams WAL is in progress, and you have synchronous_standby_names set to
'*', I believe the in-progress backup will count as a standby for that
purpose. That might give a false sense of security.Ah yes. Did not think of that. Yes, it will have this problem.
Actually, thinking more, per other mail, it won't. Because it will
never report that the data is synced to disk, so it will not be
considered for sync standby.This is something we might consider in the future (it could be a
reasonable scenario where you had this), but not in the first version.Updated version of the patch attached.
I've applied this version with a few more minor changes that Heikki found.
His comment about .partial files still applies, and I intend to
address this in a follow-up commit, along with some further
documentation enhancements.
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On Thu, Oct 27, 2011 at 3:29 AM, Magnus Hagander <magnus@hagander.net> wrote:
I've applied this version with a few more minor changes that Heikki found.
Cool!
When I tried pg_receivexlog and checked the contents of streamed WAL file by
xlogdump, I found that recent WAL records that walsender has already sent don't
exist in that WAL file. I expected that pg_receivexlog writes the streamed WAL
records to the disk as soon as possible, but it doesn't. Is this
intentional? Or bug?
Am I missing something?
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Thu, Oct 27, 2011 at 09:29, Fujii Masao <masao.fujii@gmail.com> wrote:
On Thu, Oct 27, 2011 at 3:29 AM, Magnus Hagander <magnus@hagander.net> wrote:
I've applied this version with a few more minor changes that Heikki found.
Cool!
When I tried pg_receivexlog and checked the contents of streamed WAL file by
xlogdump, I found that recent WAL records that walsender has already sent don't
exist in that WAL file. I expected that pg_receivexlog writes the streamed WAL
records to the disk as soon as possible, but it doesn't. Is this
intentional? Or bug?
Am I missing something?
It writes it to disk as soon as possible, but doesn't fsync() until
the end of each segment. Are you by any chance looking at the file
while it's running?
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On Thu, Oct 27, 2011 at 4:40 PM, Magnus Hagander <magnus@hagander.net> wrote:
On Thu, Oct 27, 2011 at 09:29, Fujii Masao <masao.fujii@gmail.com> wrote:
On Thu, Oct 27, 2011 at 3:29 AM, Magnus Hagander <magnus@hagander.net> wrote:
I've applied this version with a few more minor changes that Heikki found.
Cool!
When I tried pg_receivexlog and checked the contents of streamed WAL file by
xlogdump, I found that recent WAL records that walsender has already sent don't
exist in that WAL file. I expected that pg_receivexlog writes the streamed WAL
records to the disk as soon as possible, but it doesn't. Is this
intentional? Or bug?
Am I missing something?It writes it to disk as soon as possible, but doesn't fsync() until
the end of each segment. Are you by any chance looking at the file
while it's running?
No. I looked at that file after shutting down the master server.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Thu, Oct 27, 2011 at 09:46, Fujii Masao <masao.fujii@gmail.com> wrote:
On Thu, Oct 27, 2011 at 4:40 PM, Magnus Hagander <magnus@hagander.net> wrote:
On Thu, Oct 27, 2011 at 09:29, Fujii Masao <masao.fujii@gmail.com> wrote:
On Thu, Oct 27, 2011 at 3:29 AM, Magnus Hagander <magnus@hagander.net> wrote:
I've applied this version with a few more minor changes that Heikki found.
Cool!
When I tried pg_receivexlog and checked the contents of streamed WAL file by
xlogdump, I found that recent WAL records that walsender has already sent don't
exist in that WAL file. I expected that pg_receivexlog writes the streamed WAL
records to the disk as soon as possible, but it doesn't. Is this
intentional? Or bug?
Am I missing something?It writes it to disk as soon as possible, but doesn't fsync() until
the end of each segment. Are you by any chance looking at the file
while it's running?No. I looked at that file after shutting down the master server.
Ugh, in that case something is certainly wrong. There is nothing but
setting up some offset values between PQgetCopyData() and write()...
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On Thu, Oct 27, 2011 at 4:49 PM, Magnus Hagander <magnus@hagander.net> wrote:
On Thu, Oct 27, 2011 at 09:46, Fujii Masao <masao.fujii@gmail.com> wrote:
On Thu, Oct 27, 2011 at 4:40 PM, Magnus Hagander <magnus@hagander.net> wrote:
On Thu, Oct 27, 2011 at 09:29, Fujii Masao <masao.fujii@gmail.com> wrote:
On Thu, Oct 27, 2011 at 3:29 AM, Magnus Hagander <magnus@hagander.net> wrote:
I've applied this version with a few more minor changes that Heikki found.
Cool!
When I tried pg_receivexlog and checked the contents of streamed WAL file by
xlogdump, I found that recent WAL records that walsender has already sent don't
exist in that WAL file. I expected that pg_receivexlog writes the streamed WAL
records to the disk as soon as possible, but it doesn't. Is this
intentional? Or bug?
Am I missing something?It writes it to disk as soon as possible, but doesn't fsync() until
the end of each segment. Are you by any chance looking at the file
while it's running?No. I looked at that file after shutting down the master server.
Ugh, in that case something is certainly wrong. There is nothing but
setting up some offset values between PQgetCopyData() and write()...
When end-of-copy stream is found or an error happens, pg_receivexlog
exits without flushing outstanding WAL records. Which seems to cause
the problem I reported.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Thu, Oct 27, 2011 at 10:12, Fujii Masao <masao.fujii@gmail.com> wrote:
On Thu, Oct 27, 2011 at 4:49 PM, Magnus Hagander <magnus@hagander.net> wrote:
On Thu, Oct 27, 2011 at 09:46, Fujii Masao <masao.fujii@gmail.com> wrote:
On Thu, Oct 27, 2011 at 4:40 PM, Magnus Hagander <magnus@hagander.net> wrote:
On Thu, Oct 27, 2011 at 09:29, Fujii Masao <masao.fujii@gmail.com> wrote:
On Thu, Oct 27, 2011 at 3:29 AM, Magnus Hagander <magnus@hagander.net> wrote:
I've applied this version with a few more minor changes that Heikki found.
Cool!
When I tried pg_receivexlog and checked the contents of streamed WAL file by
xlogdump, I found that recent WAL records that walsender has already sent don't
exist in that WAL file. I expected that pg_receivexlog writes the streamed WAL
records to the disk as soon as possible, but it doesn't. Is this
intentional? Or bug?
Am I missing something?It writes it to disk as soon as possible, but doesn't fsync() until
the end of each segment. Are you by any chance looking at the file
while it's running?No. I looked at that file after shutting down the master server.
Ugh, in that case something is certainly wrong. There is nothing but
setting up some offset values between PQgetCopyData() and write()...When end-of-copy stream is found or an error happens, pg_receivexlog
exits without flushing outstanding WAL records. Which seems to cause
the problem I reported.
Not sure I follow. When we arrive at PQgetCopyData() there should be
nothing buffered, and if the end of stream happens there it returns
-1, and we exit, no? So where is the data that's lost?
I do realize we don't actually fsync() and close() in this case - is
that what you are referring to? But the data should already have been
write()d, so it should still be there, no?
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On Thu, Oct 27, 2011 at 5:18 PM, Magnus Hagander <magnus@hagander.net> wrote:
Not sure I follow. When we arrive at PQgetCopyData() there should be
nothing buffered, and if the end of stream happens there it returns
-1, and we exit, no? So where is the data that's lost?I do realize we don't actually fsync() and close() in this case - is
that what you are referring to? But the data should already have been
write()d, so it should still be there, no?
Oh, right. Hmm.. xlogdump might be the cause.
Though I've not read the code of xlogdump, I wonder if it gives up
outputting the contents of WAL file when it finds a partial WAL page...
This strikes me that recovery code has the same problem. No?
IOW, when a partial WAL page is found during recovery, I'm afraid
that page would not be replayed though it contains valid data.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Thu, Oct 27, 2011 at 6:25 PM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Thu, Oct 27, 2011 at 5:18 PM, Magnus Hagander <magnus@hagander.net> wrote:
Not sure I follow. When we arrive at PQgetCopyData() there should be
nothing buffered, and if the end of stream happens there it returns
-1, and we exit, no? So where is the data that's lost?I do realize we don't actually fsync() and close() in this case - is
that what you are referring to? But the data should already have been
write()d, so it should still be there, no?Oh, right. Hmm.. xlogdump might be the cause.
Though I've not read the code of xlogdump, I wonder if it gives up
outputting the contents of WAL file when it finds a partial WAL page...
This strikes me that recovery code has the same problem. No?
IOW, when a partial WAL page is found during recovery, I'm afraid
that page would not be replayed though it contains valid data.
My concern was right. When I performed a recovery using the streamed
WAL files, the loss of data happened. A partial WAL page was not replayed.
To avoid this problem, I think that we should change pg_receivexlog so
that it writes WAL data *by the block*, or creates, like walreceiver, WAL file
before writing any data. Otherwise, though pg_receivexlog streams WAL
data in realtime, the latest WAL data might not be available for recovery.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Thu, Oct 27, 2011 at 12:29, Fujii Masao <masao.fujii@gmail.com> wrote:
On Thu, Oct 27, 2011 at 6:25 PM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Thu, Oct 27, 2011 at 5:18 PM, Magnus Hagander <magnus@hagander.net> wrote:
Not sure I follow. When we arrive at PQgetCopyData() there should be
nothing buffered, and if the end of stream happens there it returns
-1, and we exit, no? So where is the data that's lost?I do realize we don't actually fsync() and close() in this case - is
that what you are referring to? But the data should already have been
write()d, so it should still be there, no?Oh, right. Hmm.. xlogdump might be the cause.
Though I've not read the code of xlogdump, I wonder if it gives up
outputting the contents of WAL file when it finds a partial WAL page...
This strikes me that recovery code has the same problem. No?
IOW, when a partial WAL page is found during recovery, I'm afraid
that page would not be replayed though it contains valid data.My concern was right. When I performed a recovery using the streamed
WAL files, the loss of data happened. A partial WAL page was not replayed.To avoid this problem, I think that we should change pg_receivexlog so
that it writes WAL data *by the block*, or creates, like walreceiver, WAL file
before writing any data. Otherwise, though pg_receivexlog streams WAL
data in realtime, the latest WAL data might not be available for recovery.
Ah, so you were recovering data from the last, partial, file? Not from
a completed file?
I'm rewriting the handling of partial files per the other thread
started by Heikki. The idea is that there will be an actual .partial
file in there when pg_receivexlog has ended, and you have to deal with
it manually. The typical way would be to pad it with zeroes to the
end. Doing such padding would solve this recovery issue, correct?
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On Thu, Oct 27, 2011 at 7:48 PM, Magnus Hagander <magnus@hagander.net> wrote:
On Thu, Oct 27, 2011 at 12:29, Fujii Masao <masao.fujii@gmail.com> wrote:
On Thu, Oct 27, 2011 at 6:25 PM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Thu, Oct 27, 2011 at 5:18 PM, Magnus Hagander <magnus@hagander.net> wrote:
Not sure I follow. When we arrive at PQgetCopyData() there should be
nothing buffered, and if the end of stream happens there it returns
-1, and we exit, no? So where is the data that's lost?I do realize we don't actually fsync() and close() in this case - is
that what you are referring to? But the data should already have been
write()d, so it should still be there, no?Oh, right. Hmm.. xlogdump might be the cause.
Though I've not read the code of xlogdump, I wonder if it gives up
outputting the contents of WAL file when it finds a partial WAL page...
This strikes me that recovery code has the same problem. No?
IOW, when a partial WAL page is found during recovery, I'm afraid
that page would not be replayed though it contains valid data.My concern was right. When I performed a recovery using the streamed
WAL files, the loss of data happened. A partial WAL page was not replayed.To avoid this problem, I think that we should change pg_receivexlog so
that it writes WAL data *by the block*, or creates, like walreceiver, WAL file
before writing any data. Otherwise, though pg_receivexlog streams WAL
data in realtime, the latest WAL data might not be available for recovery.Ah, so you were recovering data from the last, partial, file? Not from
a completed file?
Yes. I copied all streamed WAL files to pg_xlog directory and started recovery.
I'm rewriting the handling of partial files per the other thread
started by Heikki. The idea is that there will be an actual .partial
file in there when pg_receivexlog has ended, and you have to deal with
it manually. The typical way would be to pad it with zeroes to the
end. Doing such padding would solve this recovery issue, correct?
Yes. But that sounds unuserfriendly. Padding the WAL file manually
is easy-to-do for a user?
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On 27.10.2011 14:09, Fujii Masao wrote:
On Thu, Oct 27, 2011 at 7:48 PM, Magnus Hagander<magnus@hagander.net> wrote:
I'm rewriting the handling of partial files per the other thread
started by Heikki. The idea is that there will be an actual .partial
file in there when pg_receivexlog has ended, and you have to deal with
it manually. The typical way would be to pad it with zeroes to the
end. Doing such padding would solve this recovery issue, correct?Yes. But that sounds unuserfriendly. Padding the WAL file manually
is easy-to-do for a user?
"truncate -s 16M <file>" works at least on my Linux system. Not sure how
you'd do it on Windows.
Perhaps we should add automatic padding in the server, though. It
wouldn't take much code in the server, and would make life easier for
people writing their scripts. Maybe even have the backend check for a
.partial file if it can't find a regularly named XLOG file. Needs some
thought..
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
On Thu, Oct 27, 2011 at 13:19, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
On 27.10.2011 14:09, Fujii Masao wrote:
On Thu, Oct 27, 2011 at 7:48 PM, Magnus Hagander<magnus@hagander.net>
wrote:I'm rewriting the handling of partial files per the other thread
started by Heikki. The idea is that there will be an actual .partial
file in there when pg_receivexlog has ended, and you have to deal with
it manually. The typical way would be to pad it with zeroes to the
end. Doing such padding would solve this recovery issue, correct?Yes. But that sounds unuserfriendly. Padding the WAL file manually
is easy-to-do for a user?"truncate -s 16M <file>" works at least on my Linux system. Not sure how
you'd do it on Windows.
Yeah, taht's easy enough. I don't think there are similar tools on
windows, but we could probably put together a quick script for people
to use if necessary.
Perhaps we should add automatic padding in the server, though. It wouldn't
take much code in the server, and would make life easier for people writing
their scripts. Maybe even have the backend check for a .partial file if it
can't find a regularly named XLOG file. Needs some thought..
I'd definitely want to avoid anything that requires pg_receivexlog to
actually *parse* the WAL. That'll make it way more complex than I'd
like.
Having recovery consider a .partial file might be interesting. It
could consider that only if there are no other complete files
available, or something like that?
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On Thu, Oct 27, 2011 at 7:19 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
On 27.10.2011 14:09, Fujii Masao wrote:
On Thu, Oct 27, 2011 at 7:48 PM, Magnus Hagander<magnus@hagander.net>
wrote:I'm rewriting the handling of partial files per the other thread
started by Heikki. The idea is that there will be an actual .partial
file in there when pg_receivexlog has ended, and you have to deal with
it manually. The typical way would be to pad it with zeroes to the
end. Doing such padding would solve this recovery issue, correct?Yes. But that sounds unuserfriendly. Padding the WAL file manually
is easy-to-do for a user?"truncate -s 16M <file>" works at least on my Linux system. Not sure how
you'd do it on Windows.
One of the common I hear about PostgreSQL is that our replication
system is more difficult to set up than people would like, and it's
too easy to make mistakes that can corrupt your data without realizing
it; I don't think making them need to truncate a file to 16 megabytes
is going to improve things there.
Perhaps we should add automatic padding in the server, though. It wouldn't
take much code in the server, and would make life easier for people writing
their scripts. Maybe even have the backend check for a .partial file if it
can't find a regularly named XLOG file. Needs some thought..
+1 for figuring out something, though I'm not sure exactly what.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
Magnus Hagander <magnus@hagander.net> writes:
On Thu, Oct 27, 2011 at 13:19, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:On 27.10.2011 14:09, Fujii Masao wrote:
Yes. But that sounds unuserfriendly. Padding the WAL file manually
is easy-to-do for a user?
I'd definitely want to avoid anything that requires pg_receivexlog to
actually *parse* the WAL. That'll make it way more complex than I'd
like.
What parsing? Just pad to 16MB with zeroes. In fact, I think the
receiver should just create the file that size to start with, and then
write received data into it, much like normal WAL creation does.
regards, tom lane
On Thu, Oct 27, 2011 at 16:54, Tom Lane <tgl@sss.pgh.pa.us> wrote:
Magnus Hagander <magnus@hagander.net> writes:
On Thu, Oct 27, 2011 at 13:19, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:On 27.10.2011 14:09, Fujii Masao wrote:
Yes. But that sounds unuserfriendly. Padding the WAL file manually
is easy-to-do for a user?I'd definitely want to avoid anything that requires pg_receivexlog to
actually *parse* the WAL. That'll make it way more complex than I'd
like.What parsing? Just pad to 16MB with zeroes. In fact, I think the
I'm just sayihng that *if* parsing is required, it would be bad.
receiver should just create the file that size to start with, and then
write received data into it, much like normal WAL creation does.
So when pg_receivexlog starts up, how would it know if the last file
represents a completed file, or a half-full file, without actually
parsing it? It could be a 16Mb file with 10 bytes of valid data, or a
complete file with 16Mb of valid data.
We could always ask for a retransmit of the whole file, but if that
file is gone on the master, we won't be able to do that, and will
error out in a situation that's not actually an error.
Though I guess if we leave the file as .partial up until this point
(per my other patch just posted), I guess this doesn't actually apply
- if the file is called .partial, we'll overwrite into it. If it's
not, then we assume it's a complete segment.
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
Magnus Hagander <magnus@hagander.net> writes:
On Thu, Oct 27, 2011 at 13:19, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:Perhaps we should add automatic padding in the server, though. It wouldn't
take much code in the server, and would make life easier for people writing
their scripts. Maybe even have the backend check for a .partial file if it
can't find a regularly named XLOG file. Needs some thought..I'd definitely want to avoid anything that requires pg_receivexlog to
actually *parse* the WAL. That'll make it way more complex than I'd
like.
What about creating the WAL file filled up with zeroes at the receiving
end and then overwriting data as we receive it?
Regards,
--
Dimitri Fontaine
http://2ndQuadrant.fr PostgreSQL : Expertise, Formation et Support
On Thu, Oct 27, 2011 at 11:57 PM, Magnus Hagander <magnus@hagander.net> wrote:
On Thu, Oct 27, 2011 at 16:54, Tom Lane <tgl@sss.pgh.pa.us> wrote:
Magnus Hagander <magnus@hagander.net> writes:
On Thu, Oct 27, 2011 at 13:19, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:On 27.10.2011 14:09, Fujii Masao wrote:
Yes. But that sounds unuserfriendly. Padding the WAL file manually
is easy-to-do for a user?I'd definitely want to avoid anything that requires pg_receivexlog to
actually *parse* the WAL. That'll make it way more complex than I'd
like.What parsing? Just pad to 16MB with zeroes. In fact, I think the
I'm just sayihng that *if* parsing is required, it would be bad.
receiver should just create the file that size to start with, and then
write received data into it, much like normal WAL creation does.So when pg_receivexlog starts up, how would it know if the last file
represents a completed file, or a half-full file, without actually
parsing it? It could be a 16Mb file with 10 bytes of valid data, or a
complete file with 16Mb of valid data.We could always ask for a retransmit of the whole file, but if that
file is gone on the master, we won't be able to do that, and will
error out in a situation that's not actually an error.Though I guess if we leave the file as .partial up until this point
(per my other patch just posted), I guess this doesn't actually apply
- if the file is called .partial, we'll overwrite into it. If it's
not, then we assume it's a complete segment.
Yeah, I think that we should commit the patch that you posted in
other thread, and should change pg_receivexlog so that it creates
new WAL file filled up with zero or opens a pre-existing one, like
XLogFileInit() does, before writing any streamed data. If we do
this, a user can easily use a partial WAL file for recovery by
renaming that file.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Thu, Sep 29, 2011 at 11:30 PM, Magnus Hagander <magnus@hagander.net> wrote:
it doesn't say that is not possible to use this for a standby
server... probably that's why i get the error i put a recovery.conf
after pg_basebackup finished... maybe we can say that more loudly?The idea is, if you use it with -x (or --xlog), it's for taking a
backup/clone, *not* for replication.If you use it without -x, then you can use it as the start of a
replica, by adding a recovery.conf.But you can't do both at once, that will confuse it.
I stumbled upon this again today. There's nothing in the docs that
would even hint that using -x shouldn't work to create a replica. Why
does it get confused and can we (easily) make it not get confused? At
the very least it needs a big fat warning in documentation for the -x
option that the resulting backup might not be usable as a standby.
Ants Aasma
--
Cybertec Schönig & Schönig GmbH
Gröhrmühlgasse 26
A-2700 Wiener Neustadt
Web: http://www.postgresql-support.de
On Mon, Jun 4, 2012 at 11:25 PM, Ants Aasma <ants@cybertec.at> wrote:
On Thu, Sep 29, 2011 at 11:30 PM, Magnus Hagander <magnus@hagander.net> wrote:
it doesn't say that is not possible to use this for a standby
server... probably that's why i get the error i put a recovery.conf
after pg_basebackup finished... maybe we can say that more loudly?The idea is, if you use it with -x (or --xlog), it's for taking a
backup/clone, *not* for replication.If you use it without -x, then you can use it as the start of a
replica, by adding a recovery.conf.But you can't do both at once, that will confuse it.
I stumbled upon this again today. There's nothing in the docs that
would even hint that using -x shouldn't work to create a replica. Why
does it get confused and can we (easily) make it not get confused? At
the very least it needs a big fat warning in documentation for the -x
option that the resulting backup might not be usable as a standby.
Unless I'm missing something, you can use pg_basebackup -x for the
standby. If lots of WAL files are generated in the master after
pg_basebackup -x ends and before you start the standby instance,
you may get the following error. In this case, you need to consult with
archived WAL files even though you specified -x option in pg_basebackup.
FATAL: could not receive data from WAL stream: FATAL: requested WAL
segment 00000001000000000000005C has already been removed
Though we have the above problem, pg_basebackup -x is usable for
the standby, I think.
Regards,
--
Fujii Masao
On Mon, Jun 4, 2012 at 6:20 PM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Mon, Jun 4, 2012 at 11:25 PM, Ants Aasma <ants@cybertec.at> wrote:
On Thu, Sep 29, 2011 at 11:30 PM, Magnus Hagander <magnus@hagander.net> wrote:
it doesn't say that is not possible to use this for a standby
server... probably that's why i get the error i put a recovery.conf
after pg_basebackup finished... maybe we can say that more loudly?The idea is, if you use it with -x (or --xlog), it's for taking a
backup/clone, *not* for replication.If you use it without -x, then you can use it as the start of a
replica, by adding a recovery.conf.But you can't do both at once, that will confuse it.
I stumbled upon this again today. There's nothing in the docs that
would even hint that using -x shouldn't work to create a replica. Why
does it get confused and can we (easily) make it not get confused? At
the very least it needs a big fat warning in documentation for the -x
option that the resulting backup might not be usable as a standby.Unless I'm missing something, you can use pg_basebackup -x for the
standby. If lots of WAL files are generated in the master after
pg_basebackup -x ends and before you start the standby instance,
you may get the following error. In this case, you need to consult with
archived WAL files even though you specified -x option in pg_basebackup.FATAL: could not receive data from WAL stream: FATAL: requested WAL
segment 00000001000000000000005C has already been removedThough we have the above problem, pg_basebackup -x is usable for
the standby, I think.
I assumed from Magnus's comment that this is a known problem. I wonder
what went wrong if it should have worked. In the case where this
turned up the missing file was an xlog file with the new timeline ID
but one segment before the timeline switch. I'll have to see if I can
create a reproducible case for this.
Ants Aasma
--
Cybertec Schönig & Schönig GmbH
Gröhrmühlgasse 26
A-2700 Wiener Neustadt
Web: http://www.postgresql-support.de
On Mon, Jun 4, 2012 at 5:48 PM, Ants Aasma <ants@cybertec.at> wrote:
On Mon, Jun 4, 2012 at 6:20 PM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Mon, Jun 4, 2012 at 11:25 PM, Ants Aasma <ants@cybertec.at> wrote:
On Thu, Sep 29, 2011 at 11:30 PM, Magnus Hagander <magnus@hagander.net> wrote:
it doesn't say that is not possible to use this for a standby
server... probably that's why i get the error i put a recovery.conf
after pg_basebackup finished... maybe we can say that more loudly?The idea is, if you use it with -x (or --xlog), it's for taking a
backup/clone, *not* for replication.If you use it without -x, then you can use it as the start of a
replica, by adding a recovery.conf.But you can't do both at once, that will confuse it.
I stumbled upon this again today. There's nothing in the docs that
would even hint that using -x shouldn't work to create a replica. Why
does it get confused and can we (easily) make it not get confused? At
the very least it needs a big fat warning in documentation for the -x
option that the resulting backup might not be usable as a standby.Unless I'm missing something, you can use pg_basebackup -x for the
standby. If lots of WAL files are generated in the master after
pg_basebackup -x ends and before you start the standby instance,
you may get the following error. In this case, you need to consult with
archived WAL files even though you specified -x option in pg_basebackup.FATAL: could not receive data from WAL stream: FATAL: requested WAL
segment 00000001000000000000005C has already been removedThough we have the above problem, pg_basebackup -x is usable for
the standby, I think.I assumed from Magnus's comment that this is a known problem. I wonder
what went wrong if it should have worked. In the case where this
turned up the missing file was an xlog file with the new timeline ID
but one segment before the timeline switch. I'll have to see if I can
create a reproducible case for this.
No, it's more a "there's no reason to do that". I don't think it
should necessarily be an actual problem.
In your case the missing piece of information is why was there a
timeline switch? pg_basebackup shouldn't cause a timeline switch
whether you use it in -x mode or not.
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On Mon, Jun 4, 2012 at 6:53 PM, Magnus Hagander <magnus@hagander.net> wrote:
No, it's more a "there's no reason to do that". I don't think it
should necessarily be an actual problem.
Ok, good to know.
In your case the missing piece of information is why was there a
timeline switch? pg_basebackup shouldn't cause a timeline switch
whether you use it in -x mode or not.
No mystery there. The timeline switch was because I had just promoted
the master for standby mode. There's a chance I might have
accidentally done something horribly wrong somewhere because I can't
immediately reproduce this. I'll let you know if I find out how I
managed to create this error.
Ants Aasma
--
Cybertec Schönig & Schönig GmbH
Gröhrmühlgasse 26
A-2700 Wiener Neustadt
Web: http://www.postgresql-support.de