pg_receivexlog add synchronous mode

Started by Noname over 11 years ago · 34 messages
#1Noname
furuyao@pm.nttdata.co.jp
1 attachment(s)

Hi,

This patch adds a synchronous mode to pg_receivexlog.
Currently, synchronous operation with synchronous_commit = remote_write is supported.
With synchronous_commit = remote_write, however, if the server crashes the WAL file may not have been flushed to disk, causing data loss.

Synchronous mode (synchronous_commit = on) offers the ability to confirm that WAL has been streamed, in the same way as synchronous replication.
If the output directory is on a different disk from the directory where the transaction log is stored, this prevents loss of data due to disk failure.

Specifying the additional parameter (-m) together with a replication slot enables synchronous mode.
After all received WAL has been written, a flush is executed and the flush position is sent in the reply.
The flush is not performed on every write; it is performed collectively, as in walreceiver.

Regards,

--
Furuya Osamu

Attachments:

pg_receivexlog-add-synchronous-mode.patchapplication/octet-stream; name=pg_receivexlog-add-synchronous-mode.patchDownload
*** a/doc/src/sgml/ref/pg_receivexlog.sgml
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 66,71 **** PostgreSQL documentation
--- 66,78 ----
     as possible. To avoid this behavior, use the <literal>-n</literal>
     parameter.
    </para>
+ 
+   <para>
+    Synchronous mode offers the ability to confirm WAL have been streamed
+    in the same way as synchronous replication. To use synchronous mode, 
+    set up synchronous replication as described in
+    <xref linkend="synchronous-replication-config">, and set parameter(that is, -m and --slot).
+   </para>
   </refsect1>
  
   <refsect1>
***************
*** 106,111 **** PostgreSQL documentation
--- 113,129 ----
       </varlistentry>
  
       <varlistentry>
+       <term><option>-m</option></term>
+       <term><option>--sync-mode</option></term>
+       <listitem>
+        <para>
+         Enables synchronous mode. If replication slot is disabled then 
+         this setting is irrelevant.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
        <term><option>-v</option></term>
        <term><option>--verbose</option></term>
        <listitem>
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 370,376 **** LogStreamerMain(logstreamer_param *param)
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL))
  
  		/*
  		 * Any errors will already have been reported in the function process,
--- 370,376 ----
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL, 0))
  
  		/*
  		 * Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 35,40 ****
--- 35,41 ----
  static char *basedir = NULL;
  static int	verbose = 0;
  static int	noloop = 0;
+ static int	syncmode = 0;
  static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
  static volatile bool time_to_abort = false;
  
***************
*** 62,67 **** usage(void)
--- 63,69 ----
  	printf(_("\nOptions:\n"));
  	printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
  	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
+ 	printf(_("  -m, --sync-mode        synchronous mode\n"));
  	printf(_("  -v, --verbose          output verbose messages\n"));
  	printf(_("  -V, --version          output version information, then exit\n"));
  	printf(_("  -?, --help             show this help, then exit\n"));
***************
*** 330,336 **** StreamLog(void)
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial");
  
  	PQfinish(conn);
  }
--- 332,338 ----
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial", syncmode);
  
  	PQfinish(conn);
  }
***************
*** 360,365 **** main(int argc, char **argv)
--- 362,368 ----
  		{"port", required_argument, NULL, 'p'},
  		{"username", required_argument, NULL, 'U'},
  		{"no-loop", no_argument, NULL, 'n'},
+ 		{"sync-mode", no_argument, NULL, 'm'},
  		{"no-password", no_argument, NULL, 'w'},
  		{"password", no_argument, NULL, 'W'},
  		{"status-interval", required_argument, NULL, 's'},
***************
*** 389,395 **** main(int argc, char **argv)
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
--- 392,398 ----
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWvm",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
***************
*** 436,441 **** main(int argc, char **argv)
--- 439,447 ----
  			case 'n':
  				noloop = 1;
  				break;
+ 			case 'm':
+ 				syncmode = 1;
+ 				break;
  			case 'v':
  				verbose++;
  				break;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 34,40 **** static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
--- 34,40 ----
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos, int syncmode);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
***************
*** 417,429 **** CheckServerVersionForStreaming(PGconn *conn)
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix)
  {
  	char		query[128];
  	char		slotcmd[128];
--- 417,432 ----
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
+  * If 'syncmode' is not zero, synchronous mode. Flush is executed after all
+  * received WAL is written, and reply flush position.
+  *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix, int syncmode)
  {
  	char		query[128];
  	char		slotcmd[128];
***************
*** 568,574 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos);
  		if (res == NULL)
  			goto error;
  
--- 571,577 ----
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos, syncmode);
  		if (res == NULL)
  			goto error;
  
***************
*** 729,735 **** static PGresult *
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
--- 732,738 ----
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos, int syncmode)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
***************
*** 1041,1046 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
--- 1044,1063 ----
  					}
  				}
  			}
+ 			/* No close_walfile() and synchronous mode ? */
+ 			if (walfile != -1 && syncmode)
+ 			{
+ 				if (fsync(walfile) != 0)
+ 				{
+ 					fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
+ 							progname, current_walfile_name, strerror(errno));
+ 					goto error;
+ 				}
+ 				lastFlushPosition = blockpos;
+ 				/* flush position to send feedback! */
+ 				if (!sendFeedback(conn, blockpos, now, false))
+ 					goto error;
+ 			}
  			/* No more data left to write, receive next copy packet */
  		}
  		else
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 16,19 **** extern bool ReceiveXlogStream(PGconn *conn,
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix);
--- 16,20 ----
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix,
! 				  int syncmode);
#2Andres Freund
andres@2ndquadrant.com
In reply to: Noname (#1)
Re: pg_receivexlog add synchronous mode

Hi,

On 2014-06-05 17:09:44 +0900, furuyao@pm.nttdata.co.jp wrote:

Synchronous(synchronous_commit = on) mode offers the ability to confirm WAL have been streamed in the same way as synchronous replication.
If an output is used as a different disk from the directory where the transaction log should be stored.
Prevent the loss of data due to disk failure.

the additional parameter(-m) and replicationslot specify, that its synchronous mode.
All received WAL write after, flush is executed and reply flush
position.

What's the usecase for this? I can see some benefit in easier testing of
syncrep, but that's basically it?

Flush is not performed every time write, it is performed collectively
like walrecever.

I only glanced at this, but afaics you're only flushing at the end every
WAL segment. That will result in absolutely horrible performance, right?
Walreceiver does flush more frequently than that. It basically syncs
every chunk of received WAL...

Greetings,

Andres Freund

--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#3Noname
furuyao@pm.nttdata.co.jp
In reply to: Andres Freund (#2)
Re: pg_receivexlog add synchronous mode

-----Original Message-----
Hi,

On 2014-06-05 17:09:44 +0900, furuyao@pm.nttdata.co.jp wrote:

Synchronous(synchronous_commit = on) mode offers the ability to

confirm WAL have been streamed in the same way as synchronous
replication.

If an output is used as a different disk from the directory where the

transaction log should be stored.

Prevent the loss of data due to disk failure.

the additional parameter(-m) and replicationslot specify, that its

synchronous mode.

All received WAL write after, flush is executed and reply flush
position.

What's the usecase for this? I can see some benefit in easier testing
of syncrep, but that's basically it?

When used together with syncrep, even if the standby server crashes, the multiplexed copy of the WAL can serve as a safeguard.
Data loss can be reduced to nearly zero.

Flush is not performed every time write, it is performed collectively
like walrecever.

I only glanced at this, but afaics you're only flushing at the end every
WAL segment. That will result in absolutely horrible performance, right?
Walreceiver does flush more frequently than that. It basically syncs
every chunk of received WAL...

IMO, completion of the write loop means that all received WAL has been written,
and walreceiver works the same way.

I will check the flush position.

Regards,

--
Furuya Osamu

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#4Noname
furuyao@pm.nttdata.co.jp
In reply to: Noname (#3)
1 attachment(s)
Re: pg_receivexlog add synchronous mode

-----Original Message-----

Flush is not performed every time write, it is performed
collectively like walrecever.

I only glanced at this, but afaics you're only flushing at the end
every WAL segment. That will result in absolutely horrible performance,

right?

Walreceiver does flush more frequently than that. It basically syncs
every chunk of received WAL...

IMO the completion of the write loop was completion of received WAL.
And Walreceiver same.

I confirm it about the flush position.

As you say, walreceiver does flush more frequently than that.
However, it seems difficult to apply the same approach here.
So, I have tried a different approach.

1. Set the select() timeout to 100 msec.
2. Check whether a flush is needed when select() times out.
3. Flush only if write() has been performed since the last flush.

I think this approach may cause problems, but I don't have a good idea for solving them.
Can someone please advise me?

Regards,

--
Furuya Osamu

Attachments:

pg_receivexlog-add-synchronous-mode-v2.patchapplication/octet-stream; name=pg_receivexlog-add-synchronous-mode-v2.patchDownload
*** a/doc/src/sgml/ref/pg_receivexlog.sgml
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 66,71 **** PostgreSQL documentation
--- 66,78 ----
     as possible. To avoid this behavior, use the <literal>-n</literal>
     parameter.
    </para>
+ 
+   <para>
+    Synchronous mode offers the ability to confirm WAL have been streamed
+    in the same way as synchronous replication. To use synchronous mode, 
+    set up synchronous replication as described in
+    <xref linkend="synchronous-replication-config">, and set parameter(that is, -m and --slot).
+   </para>
   </refsect1>
  
   <refsect1>
***************
*** 106,111 **** PostgreSQL documentation
--- 113,129 ----
       </varlistentry>
  
       <varlistentry>
+       <term><option>-m</option></term>
+       <term><option>--sync-mode</option></term>
+       <listitem>
+        <para>
+         Enables synchronous mode. If replication slot is disabled then 
+         this setting is irrelevant.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
        <term><option>-v</option></term>
        <term><option>--verbose</option></term>
        <listitem>
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 370,376 **** LogStreamerMain(logstreamer_param *param)
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL))
  
  		/*
  		 * Any errors will already have been reported in the function process,
--- 370,376 ----
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL, 0))
  
  		/*
  		 * Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 35,40 ****
--- 35,41 ----
  static char *basedir = NULL;
  static int	verbose = 0;
  static int	noloop = 0;
+ static int	syncmode = 0;
  static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
  static volatile bool time_to_abort = false;
  
***************
*** 62,67 **** usage(void)
--- 63,69 ----
  	printf(_("\nOptions:\n"));
  	printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
  	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
+ 	printf(_("  -m, --sync-mode        synchronous mode\n"));
  	printf(_("  -v, --verbose          output verbose messages\n"));
  	printf(_("  -V, --version          output version information, then exit\n"));
  	printf(_("  -?, --help             show this help, then exit\n"));
***************
*** 330,336 **** StreamLog(void)
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial");
  
  	PQfinish(conn);
  }
--- 332,338 ----
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial", syncmode);
  
  	PQfinish(conn);
  }
***************
*** 360,365 **** main(int argc, char **argv)
--- 362,368 ----
  		{"port", required_argument, NULL, 'p'},
  		{"username", required_argument, NULL, 'U'},
  		{"no-loop", no_argument, NULL, 'n'},
+ 		{"sync-mode", no_argument, NULL, 'm'},
  		{"no-password", no_argument, NULL, 'w'},
  		{"password", no_argument, NULL, 'W'},
  		{"status-interval", required_argument, NULL, 's'},
***************
*** 389,395 **** main(int argc, char **argv)
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
--- 392,398 ----
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWvm",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
***************
*** 436,441 **** main(int argc, char **argv)
--- 439,447 ----
  			case 'n':
  				noloop = 1;
  				break;
+ 			case 'm':
+ 				syncmode = 1;
+ 				break;
  			case 'v':
  				verbose++;
  				break;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 34,40 **** static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
--- 34,40 ----
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos, int syncmode);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
***************
*** 417,429 **** CheckServerVersionForStreaming(PGconn *conn)
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix)
  {
  	char		query[128];
  	char		slotcmd[128];
--- 417,432 ----
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
+  * If 'syncmode' is not zero, synchronous mode. Flush is executed after all
+  * received WAL is written, and reply flush position.
+  *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix, int syncmode)
  {
  	char		query[128];
  	char		slotcmd[128];
***************
*** 568,574 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos);
  		if (res == NULL)
  			goto error;
  
--- 571,577 ----
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos, syncmode);
  		if (res == NULL)
  			goto error;
  
***************
*** 729,740 **** static PGresult *
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
  	XLogRecPtr	blockpos = startpos;
  	bool		still_sending = true;
  
  	while (1)
  	{
--- 732,744 ----
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos, int syncmode)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
  	XLogRecPtr	blockpos = startpos;
  	bool		still_sending = true;
+ 	bool		flush_flg = false;
  
  	while (1)
  	{
***************
*** 813,818 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
--- 817,827 ----
  				else
  					timeout.tv_sec = secs;
  				timeout.tv_usec = usecs;
+ 				if (syncmode)
+ 				{
+ 					timeout.tv_sec = 0;
+ 					timeout.tv_usec = 100000;/* sync mode sleep at 100 msec*/
+ 				}
  				timeoutptr = &timeout;
  			}
  			else
***************
*** 826,831 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
--- 835,853 ----
  				 * deliver a status packet to the server or just go back into
  				 * blocking.
  				 */
+ 				if (flush_flg && walfile != -1 && syncmode)
+ 				{
+ 					if (fsync(walfile) != 0)
+ 					{
+ 						fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
+ 								progname, current_walfile_name, strerror(errno));
+ 						goto error;
+ 					}
+ 					lastFlushPosition = blockpos;
+ 					if (!sendFeedback(conn, blockpos, now, false))
+ 						goto error;
+ 					flush_flg = false;
+ 				}
  				continue;
  			}
  			else if (r < 0)
***************
*** 1041,1046 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
--- 1063,1069 ----
  					}
  				}
  			}
+ 			flush_flg = true;
  			/* No more data left to write, receive next copy packet */
  		}
  		else
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 16,19 **** extern bool ReceiveXlogStream(PGconn *conn,
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix);
--- 16,20 ----
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix,
! 				  int syncmode);
#5Fujii Masao
masao.fujii@gmail.com
In reply to: Noname (#4)
Re: pg_receivexlog add synchronous mode

On Fri, Jun 6, 2014 at 8:05 PM, <furuyao@pm.nttdata.co.jp> wrote:

-----Original Message-----

Flush is not performed every time write, it is performed
collectively like walrecever.

I only glanced at this, but afaics you're only flushing at the end
every WAL segment. That will result in absolutely horrible performance,

right?

Walreceiver does flush more frequently than that. It basically syncs
every chunk of received WAL...

IMO the completion of the write loop was completion of received WAL.
And Walreceiver same.

I confirm it about the flush position.

As you say,Walreceiver does flush more frequently than that.

No. IIUC walreceiver does flush *less* frequently than what you implemented
on pg_receivexlog. Your version of pg_receivexlog tries to do flush every time
when it receives one WAL chunk. OTOH, walreceiver does flush only when
there is no extra WAL chunk in receive buffer. IOW, after writing WAL chunk,
if there is another WAL chunk that walreceiver can receive immediately, it
postpones flush later.

However, it seems difficult to apply as same way.

Why? ISTM that's not so difficult.

Regards,

--
Fujii Masao

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#6Noname
furuyao@pm.nttdata.co.jp
In reply to: Fujii Masao (#5)
1 attachment(s)
Re: pg_receivexlog add synchronous mode

No. IIUC walreceiver does flush *less* frequently than what you
implemented on pg_receivexlog. Your version of pg_receivexlog tries to
do flush every time when it receives one WAL chunk. OTOH, walreceiver
does flush only when there is no extra WAL chunk in receive buffer. IOW,
after writing WAL chunk, if there is another WAL chunk that walreceiver
can receive immediately, it postpones flush later.

However, it seems difficult to apply as same way.

Why? ISTM that's not so difficult.

I had not understood the behavior of walreceiver well.
While walreceiver is writing data, it calls PQconsumeInput() without going through select().
It flushes once PQgetCopyData() has returned zero consecutively.
I have changed the patch to do the same thing, using a flag.

Regards,

--
Furuya Osamu

Attachments:

pg_receivexlog-add-synchronous-mode-v3.patchapplication/octet-stream; name=pg_receivexlog-add-synchronous-mode-v3.patchDownload
*** a/doc/src/sgml/ref/pg_receivexlog.sgml
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 66,71 **** PostgreSQL documentation
--- 66,78 ----
     as possible. To avoid this behavior, use the <literal>-n</literal>
     parameter.
    </para>
+ 
+   <para>
+    Synchronous mode offers the ability to confirm WAL have been streamed
+    in the same way as synchronous replication. To use synchronous mode, 
+    set up synchronous replication as described in
+    <xref linkend="synchronous-replication-config">, and set parameter(that is, -m and --slot).
+   </para>
   </refsect1>
  
   <refsect1>
***************
*** 106,111 **** PostgreSQL documentation
--- 113,129 ----
       </varlistentry>
  
       <varlistentry>
+       <term><option>-m</option></term>
+       <term><option>--sync-mode</option></term>
+       <listitem>
+        <para>
+         Enables synchronous mode. If replication slot is disabled then 
+         this setting is irrelevant.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
        <term><option>-v</option></term>
        <term><option>--verbose</option></term>
        <listitem>
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 370,376 **** LogStreamerMain(logstreamer_param *param)
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL))
  
  		/*
  		 * Any errors will already have been reported in the function process,
--- 370,376 ----
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL, 0))
  
  		/*
  		 * Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 35,40 ****
--- 35,41 ----
  static char *basedir = NULL;
  static int	verbose = 0;
  static int	noloop = 0;
+ static int	syncmode = 0;
  static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
  static volatile bool time_to_abort = false;
  
***************
*** 62,67 **** usage(void)
--- 63,69 ----
  	printf(_("\nOptions:\n"));
  	printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
  	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
+ 	printf(_("  -m, --sync-mode        synchronous mode\n"));
  	printf(_("  -v, --verbose          output verbose messages\n"));
  	printf(_("  -V, --version          output version information, then exit\n"));
  	printf(_("  -?, --help             show this help, then exit\n"));
***************
*** 330,336 **** StreamLog(void)
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial");
  
  	PQfinish(conn);
  }
--- 332,338 ----
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial", syncmode);
  
  	PQfinish(conn);
  }
***************
*** 360,365 **** main(int argc, char **argv)
--- 362,368 ----
  		{"port", required_argument, NULL, 'p'},
  		{"username", required_argument, NULL, 'U'},
  		{"no-loop", no_argument, NULL, 'n'},
+ 		{"sync-mode", no_argument, NULL, 'm'},
  		{"no-password", no_argument, NULL, 'w'},
  		{"password", no_argument, NULL, 'W'},
  		{"status-interval", required_argument, NULL, 's'},
***************
*** 389,395 **** main(int argc, char **argv)
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
--- 392,398 ----
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWvm",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
***************
*** 436,441 **** main(int argc, char **argv)
--- 439,447 ----
  			case 'n':
  				noloop = 1;
  				break;
+ 			case 'm':
+ 				syncmode = 1;
+ 				break;
  			case 'v':
  				verbose++;
  				break;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 34,40 **** static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
--- 34,40 ----
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos, int syncmode);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
***************
*** 417,429 **** CheckServerVersionForStreaming(PGconn *conn)
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix)
  {
  	char		query[128];
  	char		slotcmd[128];
--- 417,432 ----
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
+  * If 'syncmode' is not zero, synchronous mode. Flush is executed after all
+  * received WAL is written, and reply flush position.
+  *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix, int syncmode)
  {
  	char		query[128];
  	char		slotcmd[128];
***************
*** 568,574 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos);
  		if (res == NULL)
  			goto error;
  
--- 571,577 ----
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos, syncmode);
  		if (res == NULL)
  			goto error;
  
***************
*** 729,740 **** static PGresult *
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
  	XLogRecPtr	blockpos = startpos;
  	bool		still_sending = true;
  
  	while (1)
  	{
--- 732,744 ----
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos, int syncmode)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
  	XLogRecPtr	blockpos = startpos;
  	bool		still_sending = true;
+ 	int		flush_flg = 0;
  
  	while (1)
  	{
***************
*** 787,839 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		r = PQgetCopyData(conn, &copybuf, 1);
  		if (r == 0)
  		{
! 			/*
! 			 * No data available. Wait for some to appear, but not longer than
! 			 * the specified timeout, so that we can ping the server.
! 			 */
! 			fd_set		input_mask;
! 			struct timeval timeout;
! 			struct timeval *timeoutptr;
! 
! 			FD_ZERO(&input_mask);
! 			FD_SET(PQsocket(conn), &input_mask);
! 			if (standby_message_timeout && still_sending)
  			{
! 				int64		targettime;
! 				long		secs;
! 				int			usecs;
! 
! 				targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
! 				feTimestampDifference(now,
! 									  targettime,
! 									  &secs,
! 									  &usecs);
! 				if (secs <= 0)
! 					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
! 				else
! 					timeout.tv_sec = secs;
! 				timeout.tv_usec = usecs;
! 				timeoutptr = &timeout;
  			}
! 			else
! 				timeoutptr = NULL;
! 
! 			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
! 			if (r == 0 || (r < 0 && errno == EINTR))
  			{
  				/*
! 				 * Got a timeout or signal. Continue the loop and either
! 				 * deliver a status packet to the server or just go back into
! 				 * blocking.
  				 */
! 				continue;
! 			}
! 			else if (r < 0)
! 			{
! 				fprintf(stderr, _("%s: select() failed: %s\n"),
! 						progname, strerror(errno));
! 				goto error;
  			}
  			/* Else there is actually data on the socket */
  			if (PQconsumeInput(conn) == 0)
  			{
--- 791,864 ----
  		r = PQgetCopyData(conn, &copybuf, 1);
  		if (r == 0)
  		{
! 			if (flush_flg == 2)
  			{
! 				if (walfile != -1)
! 				{
! 					if (fsync(walfile) != 0)
! 					{
! 						fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
! 								progname, current_walfile_name, strerror(errno));
! 						goto error;
! 					}
! 					lastFlushPosition = blockpos;
! 					if (!sendFeedback(conn, blockpos, now, false))
! 						goto error;
! 				}
! 				flush_flg = 0;
  			}
! 			if (flush_flg == 0 || !syncmode)
  			{
  				/*
! 				 * No data available. Wait for some to appear, but not longer than
! 				 * the specified timeout, so that we can ping the server.
  				 */
! 				fd_set		input_mask;
! 				struct timeval timeout;
! 				struct timeval *timeoutptr;
! 
! 				FD_ZERO(&input_mask);
! 				FD_SET(PQsocket(conn), &input_mask);
! 				if (standby_message_timeout && still_sending)
! 				{
! 					int64		targettime;
! 					long		secs;
! 					int			usecs;
! 
! 					targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
! 					feTimestampDifference(now,
! 										  targettime,
! 										  &secs,
! 										  &usecs);
! 					if (secs <= 0)
! 						timeout.tv_sec = 1; /* Always sleep at least 1 sec */
! 					else
! 						timeout.tv_sec = secs;
! 					timeout.tv_usec = usecs;
! 					timeoutptr = &timeout;
! 				}
! 				else
! 					timeoutptr = NULL;
! 
! 				r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
! 				if (r == 0 || (r < 0 && errno == EINTR))
! 				{
! 					/*
! 					 * Got a timeout or signal. Continue the loop and either
! 					 * deliver a status packet to the server or just go back into
! 					 * blocking.
! 					 */
! 					continue;
! 				}
! 				else if (r < 0)
! 				{
! 					fprintf(stderr, _("%s: select() failed: %s\n"),
! 							progname, strerror(errno));
! 					goto error;
! 				}
  			}
+ 			else
+ 				flush_flg++;
  			/* Else there is actually data on the socket */
  			if (PQconsumeInput(conn) == 0)
  			{
***************
*** 1041,1046 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
--- 1066,1073 ----
  					}
  				}
  			}
+ 			if(syncmode)
+ 				flush_flg = 1;
  			/* No more data left to write, receive next copy packet */
  		}
  		else
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 16,19 **** extern bool ReceiveXlogStream(PGconn *conn,
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix);
--- 16,20 ----
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix,
! 				  int syncmode);
#7Fujii Masao
masao.fujii@gmail.com
In reply to: Noname (#6)
Re: pg_receivexlog add synchronous mode

On Tue, Jun 10, 2014 at 5:01 PM, <furuyao@pm.nttdata.co.jp> wrote:

No. IIUC walreceiver does flush *less* frequently than what you
implemented on pg_receivexlog. Your version of pg_receivexlog tries to
do flush every time when it receives one WAL chunk. OTOH, walreceiver
does flush only when there is no extra WAL chunk in receive buffer. IOW,
after writing WAL chunk, if there is another WAL chunk that walreceiver
can receive immediately, it postpones flush later.

However, it seems difficult to apply it in the same way.

Why? ISTM that's not so difficult.

I was not able to understand the behavior of walreceiver well.
While walreceiver is writing data, it calls PQconsumeInput() without going through select().
It flushes when PQgetCopyData() has returned zero consecutively.
I changed pg_receivexlog to do the same processing, using a flag.

You introduced the state machine using the flag "flush_flg" into pg_receivexlog.
That's complicated and would reduce the readability of the source code. I think
that the logic should be simpler like walreceiver's one.

Maybe I found one problematic path as follows:

1. WAL is written and flush_flg is set to 1
2. PQgetCopyData() returns 0 and flush_flg is incremented to 2
3. PQconsumeInput() is executed
4. PQgetCopyData() reads keepalive message
5. After processing keepalive message, PQgetCopyData() returns 0
6. Since flush_flg is 2, WAL is flushed and flush_flg is reset to 0

But new message can arrive while processing keepalive message. Before
flushing WAL, such new message should be processed.

+        Enables synchronous mode. If replication slot is disabled then
+        this setting is irrelevant.

Why is that irrelevant in that case?

Even when a replication slot is not used, thanks to this feature, pg_receivexlog
can flush WAL more proactively, which may improve the durability of the WAL
that pg_receivexlog writes.

+ printf(_(" -m, --sync-mode synchronous mode\n"));

I think that calling this feature "synchronous mode" is confusing.

Regards,

--
Fujii Masao

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#8Noname
furuyao@pm.nttdata.co.jp
In reply to: Fujii Masao (#7)
1 attachment(s)
Re: pg_receivexlog add synchronous mode

You introduced the state machine using the flag "flush_flg" into
pg_receivexlog.
That's complicated and would reduce the readability of the source code.
I think that the logic should be simpler like walreceiver's one.

Maybe I found one problematic path as follows:

1. WAL is written and flush_flg is set to 1
2. PQgetCopyData() returns 0 and flush_flg is incremented to 2
3. PQconsumeInput() is executed
4. PQgetCopyData() reads keepalive message
5. After processing keepalive message, PQgetCopyData() returns 0
6. Since flush_flg is 2, WAL is flushed and flush_flg is reset to 0

But new message can arrive while processing keepalive message. Before
flushing WAL, such new message should be processed.

To improve readability as well, I changed the code to use the same processing as walreceiver's loop.

+        Enables synchronous mode. If replication slot is disabled then
+        this setting is irrelevant.

Why is that irrelevant in that case?

Even when a replication slot is not used, thanks to this feature,
pg_receivexlog can flush WAL more proactively, which may improve the
durability of the WAL that pg_receivexlog writes.

I meant whether the flush position is reported or not.
If a slot is not used, the flush position is not reported.
I changed the patch so that it is reported only when a slot is used.

+ printf(_(" -m, --sync-mode synchronous mode\n"));

I think that calling this feature "synchronous mode" is confusing.

Changed the description from "synchronous mode" to "this mode is written some records, flush them to disk.".

Regards,

--
Furuya Osamu

Attachments:

pg_receivexlog-add-synchronous-mode-v4.patchapplication/octet-stream; name=pg_receivexlog-add-synchronous-mode-v4.patchDownload
*** a/doc/src/sgml/ref/pg_receivexlog.sgml
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 66,71 **** PostgreSQL documentation
--- 66,78 ----
     as possible. To avoid this behavior, use the <literal>-n</literal>
     parameter.
    </para>
+ 
+   <para>
+    Synchronous mode offers the ability to confirm WAL have been streamed
+    in the same way as synchronous replication. To use synchronous mode, 
+    set up synchronous replication as described in
+    <xref linkend="synchronous-replication-config">, and set parameter(that is, -m and --slot).
+   </para>
   </refsect1>
  
   <refsect1>
***************
*** 106,111 **** PostgreSQL documentation
--- 113,130 ----
       </varlistentry>
  
       <varlistentry>
+       <term><option>-m</option></term>
+       <term><option>--sync-mode</option></term>
+       <listitem>
+        <para>
+         Enables synchronous mode. Add to flush of the change timing of a WAL file. 
+         If we've written some records, flush them to disk.
+         We only report the flush position,when a slot has explicitly been used.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
        <term><option>-v</option></term>
        <term><option>--verbose</option></term>
        <listitem>
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 370,376 **** LogStreamerMain(logstreamer_param *param)
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL))
  
  		/*
  		 * Any errors will already have been reported in the function process,
--- 370,376 ----
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL, 0))
  
  		/*
  		 * Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 35,40 ****
--- 35,41 ----
  static char *basedir = NULL;
  static int	verbose = 0;
  static int	noloop = 0;
+ static int	syncmode = 0;
  static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
  static volatile bool time_to_abort = false;
  
***************
*** 62,67 **** usage(void)
--- 63,69 ----
  	printf(_("\nOptions:\n"));
  	printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
  	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
+ 	printf(_("  -m, --sync-mode        this mode is written some records, flush them to disk.\n"));
  	printf(_("  -v, --verbose          output verbose messages\n"));
  	printf(_("  -V, --version          output version information, then exit\n"));
  	printf(_("  -?, --help             show this help, then exit\n"));
***************
*** 330,336 **** StreamLog(void)
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial");
  
  	PQfinish(conn);
  }
--- 332,338 ----
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial", syncmode);
  
  	PQfinish(conn);
  }
***************
*** 360,365 **** main(int argc, char **argv)
--- 362,368 ----
  		{"port", required_argument, NULL, 'p'},
  		{"username", required_argument, NULL, 'U'},
  		{"no-loop", no_argument, NULL, 'n'},
+ 		{"sync-mode", no_argument, NULL, 'm'},
  		{"no-password", no_argument, NULL, 'w'},
  		{"password", no_argument, NULL, 'W'},
  		{"status-interval", required_argument, NULL, 's'},
***************
*** 389,395 **** main(int argc, char **argv)
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
--- 392,398 ----
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWvm",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
***************
*** 436,441 **** main(int argc, char **argv)
--- 439,447 ----
  			case 'n':
  				noloop = 1;
  				break;
+ 			case 'm':
+ 				syncmode = 1;
+ 				break;
  			case 'v':
  				verbose++;
  				break;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 34,40 **** static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
--- 34,40 ----
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos, int syncmode);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
***************
*** 417,429 **** CheckServerVersionForStreaming(PGconn *conn)
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix)
  {
  	char		query[128];
  	char		slotcmd[128];
--- 417,433 ----
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
+  * If 'syncmode' is not zero, synchronous mode. Flush is executed after all
+  * received WAL is written.We only report the flush position,when a slot 
+  * has explicitly been used.
+  *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix, int syncmode)
  {
  	char		query[128];
  	char		slotcmd[128];
***************
*** 568,574 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos);
  		if (res == NULL)
  			goto error;
  
--- 572,578 ----
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos, syncmode);
  		if (res == NULL)
  			goto error;
  
***************
*** 717,724 **** ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
  	return true;
  }
  
  /*
!  * The main loop of ReceiveXlogStream. Handles the COPY stream after
   * initiating streaming with the START_STREAMING command.
   *
   * If the COPY ends (not necessarily successfully) due a message from the
--- 721,808 ----
  	return true;
  }
  
+ 
+ static int
+ rcv_receive(bool timeout, char **copybuf, PGconn *conn, int standby_message_timeout, int64 last_status, int64 now)
+ {
+ 	int r;
+ 	
+ 	r = PQgetCopyData(conn, copybuf, 1);
+ 	if (r == 0)
+ 	{
+ 		if (timeout)
+ 		{
+ 			/*
+ 			 * No data available. Wait for some to appear, but not longer than
+ 			 * the specified timeout, so that we can ping the server.
+ 			 */
+ 			fd_set		input_mask;
+ 			struct timeval timeout;
+ 			struct timeval *timeoutptr;
+ 
+ 			FD_ZERO(&input_mask);
+ 			FD_SET(PQsocket(conn), &input_mask);
+ 			if (standby_message_timeout)
+ 			{
+ 				int64		targettime;
+ 				long		secs;
+ 				int			usecs;
+ 
+ 				targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
+ 				feTimestampDifference(now,
+ 									  targettime,
+ 									  &secs,
+ 									  &usecs);
+ 				if (secs <= 0)
+ 					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
+ 				else
+ 					timeout.tv_sec = secs;
+ 				timeout.tv_usec = usecs;
+ 				timeoutptr = &timeout;
+ 			}
+ 			else
+ 				timeoutptr = NULL;
+ 
+ 			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+ 			if (r == 0 || (r < 0 && errno == EINTR))
+ 			{
+ 				/*
+ 				 * Got a timeout or signal. Continue the loop and either
+ 				 * deliver a status packet to the server or just go back into
+ 				 * blocking.
+ 				 */
+ 				return 0;
+ 			}
+ 			else if (r < 0)
+ 			{
+ 				fprintf(stderr, _("%s: select() failed: %s\n"),
+ 						progname, strerror(errno));
+ 				return -2;
+ 			}
+ 		}
+ 		/* Else there is actually data on the socket */
+ 		if (PQconsumeInput(conn) == 0)
+ 		{
+ 			fprintf(stderr,
+ 					_("%s: could not receive data from WAL stream: %s"),
+ 					progname, PQerrorMessage(conn));
+ 			return -2;
+ 		}
+ 		r = PQgetCopyData(conn, copybuf, 1);
+ 	}
+ 	if (r == -2)
+ 	{
+ 		fprintf(stderr, _("%s: could not read COPY data: %s"),
+ 				progname, PQerrorMessage(conn));
+ 	}
+ 
+ 	/* Return received messages to caller */
+ 	return r;
+ 
+ }
+ 
  /*
!  * The main loop of ReceiveXLogStream. Handles the COPY stream after
   * initiating streaming with the START_STREAMING command.
   *
   * If the COPY ends (not necessarily successfully) due a message from the
***************
*** 729,735 **** static PGresult *
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
--- 813,819 ----
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos, int syncmode)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
***************
*** 784,850 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			last_status = now;
  		}
  
! 		r = PQgetCopyData(conn, &copybuf, 1);
! 		if (r == 0)
! 		{
! 			/*
! 			 * No data available. Wait for some to appear, but not longer than
! 			 * the specified timeout, so that we can ping the server.
! 			 */
! 			fd_set		input_mask;
! 			struct timeval timeout;
! 			struct timeval *timeoutptr;
  
! 			FD_ZERO(&input_mask);
! 			FD_SET(PQsocket(conn), &input_mask);
! 			if (standby_message_timeout && still_sending)
  			{
! 				int64		targettime;
! 				long		secs;
! 				int			usecs;
  
- 				targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
- 				feTimestampDifference(now,
- 									  targettime,
- 									  &secs,
- 									  &usecs);
- 				if (secs <= 0)
- 					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
- 				else
- 					timeout.tv_sec = secs;
- 				timeout.tv_usec = usecs;
- 				timeoutptr = &timeout;
- 			}
- 			else
- 				timeoutptr = NULL;
- 
- 			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
- 			if (r == 0 || (r < 0 && errno == EINTR))
- 			{
  				/*
! 				 * Got a timeout or signal. Continue the loop and either
! 				 * deliver a status packet to the server or just go back into
! 				 * blocking.
  				 */
! 				continue;
  			}
! 			else if (r < 0)
  			{
! 				fprintf(stderr, _("%s: select() failed: %s\n"),
! 						progname, strerror(errno));
! 				goto error;
  			}
! 			/* Else there is actually data on the socket */
! 			if (PQconsumeInput(conn) == 0)
  			{
! 				fprintf(stderr,
! 						_("%s: could not receive data from WAL stream: %s"),
! 						progname, PQerrorMessage(conn));
  				goto error;
  			}
! 			continue;
  		}
! 		if (r == -1)
  		{
  			PGresult   *res = PQgetResult(conn);
  
--- 868,1043 ----
  			last_status = now;
  		}
  
! 		r = rcv_receive(true , &copybuf, conn, standby_message_timeout, last_status, now);
  
! 		while(r > 0)
! 		{
! 			/* Check the message type. */
! 			if (copybuf[0] == 'k')
  			{
! 				int			pos;
! 				bool		replyRequested;
  
  				/*
! 				 * Parse the keepalive message, enclosed in the CopyData message.
! 				 * We just check if the server requested a reply, and ignore the
! 				 * rest.
  				 */
! 				pos = 1;			/* skip msgtype 'k' */
! 				pos += 8;			/* skip walEnd */
! 				pos += 8;			/* skip sendTime */
! 
! 				if (r < pos + 1)
! 				{
! 					fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 							progname, r);
! 					goto error;
! 				}
! 				replyRequested = copybuf[pos];
! 
! 				/* If the server requested an immediate reply, send one. */
! 				if (replyRequested && still_sending)
! 				{
! 					now = feGetCurrentTimestamp();
! 					if (!sendFeedback(conn, blockpos, now, false))
! 						goto error;
! 					last_status = now;
! 				}
  			}
! 			else if (copybuf[0] == 'w')
  			{
! 				/*
! 				 * Once we've decided we don't want to receive any more, just
! 				 * ignore any subsequent XLogData messages.
! 				 */
! 				if (!still_sending)
! 					continue;
! 
! 				/*
! 				 * Read the header of the XLogData message, enclosed in the
! 				 * CopyData message. We only need the WAL location field
! 				 * (dataStart), the rest of the header is ignored.
! 				 */
! 				hdr_len = 1;		/* msgtype 'w' */
! 				hdr_len += 8;		/* dataStart */
! 				hdr_len += 8;		/* walEnd */
! 				hdr_len += 8;		/* sendTime */
! 				if (r < hdr_len)
! 				{
! 					fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 							progname, r);
! 					goto error;
! 				}
! 				blockpos = fe_recvint64(&copybuf[1]);
! 
! 				/* Extract WAL location for this block */
! 				xlogoff = blockpos % XLOG_SEG_SIZE;
! 
! 				/*
! 				 * Verify that the initial location in the stream matches where we
! 				 * think we are.
! 				 */
! 				if (walfile == -1)
! 				{
! 					/* No file open yet */
! 					if (xlogoff != 0)
! 					{
! 						fprintf(stderr,
! 								_("%s: received transaction log record for offset %u with no file open\n"),
! 								progname, xlogoff);
! 						goto error;
! 					}
! 				}
! 				else
! 				{
! 					/* More data in existing segment */
! 					/* XXX: store seek value don't reseek all the time */
! 					if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
! 					{
! 						fprintf(stderr,
! 							  _("%s: got WAL data offset %08x, expected %08x\n"),
! 						   progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
! 						goto error;
! 					}
! 				}
! 
! 				bytes_left = r - hdr_len;
! 				bytes_written = 0;
! 
! 				while (bytes_left)
! 				{
! 					int			bytes_to_write;
! 
! 					/*
! 					 * If crossing a WAL boundary, only write up until we reach
! 					 * XLOG_SEG_SIZE.
! 					 */
! 					if (xlogoff + bytes_left > XLOG_SEG_SIZE)
! 						bytes_to_write = XLOG_SEG_SIZE - xlogoff;
! 					else
! 						bytes_to_write = bytes_left;
! 
! 					if (walfile == -1)
! 					{
! 						if (!open_walfile(blockpos, timeline,
! 										  basedir, partial_suffix))
! 						{
! 							/* Error logged by open_walfile */
! 							goto error;
! 						}
! 					}
! 
! 					if (write(walfile,
! 							  copybuf + hdr_len + bytes_written,
! 							  bytes_to_write) != bytes_to_write)
! 					{
! 						fprintf(stderr,
! 								_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! 								progname, bytes_to_write, current_walfile_name,
! 								strerror(errno));
! 						goto error;
! 					}
! 
! 					/* Write was successful, advance our position */
! 					bytes_written += bytes_to_write;
! 					bytes_left -= bytes_to_write;
! 					blockpos += bytes_to_write;
! 					xlogoff += bytes_to_write;
! 
! 					/* Did we reach the end of a WAL segment? */
! 					if (blockpos % XLOG_SEG_SIZE == 0)
! 					{
! 						if (!close_walfile(basedir, partial_suffix, blockpos))
! 							/* Error message written in close_walfile() */
! 							goto error;
! 
! 						xlogoff = 0;
! 
! 						if (still_sending && stream_stop(blockpos, timeline, false))
! 						{
! 							if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! 							{
! 								fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! 										progname, PQerrorMessage(conn));
! 								goto error;
! 							}
! 							still_sending = false;
! 							break;	/* ignore the rest of this XLogData packet */
! 						}
! 					}
! 				}
! 				/* No more data left to write, receive next copy packet */
  			}
! 			else
  			{
! 				fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
! 						progname, copybuf[0]);
  				goto error;
  			}
! 			r = rcv_receive(false , &copybuf, conn, standby_message_timeout, last_status, now);
  		}
! 
! 		if(r == -1)
  		{
  			PGresult   *res = PQgetResult(conn);
  
***************
*** 880,1054 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			*stoppos = blockpos;
  			return res;
  		}
- 		if (r == -2)
- 		{
- 			fprintf(stderr, _("%s: could not read COPY data: %s"),
- 					progname, PQerrorMessage(conn));
- 			goto error;
- 		}
- 
- 		/* Check the message type. */
- 		if (copybuf[0] == 'k')
- 		{
- 			int			pos;
- 			bool		replyRequested;
- 
- 			/*
- 			 * Parse the keepalive message, enclosed in the CopyData message.
- 			 * We just check if the server requested a reply, and ignore the
- 			 * rest.
- 			 */
- 			pos = 1;			/* skip msgtype 'k' */
- 			pos += 8;			/* skip walEnd */
- 			pos += 8;			/* skip sendTime */
  
- 			if (r < pos + 1)
- 			{
- 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
- 						progname, r);
- 				goto error;
- 			}
- 			replyRequested = copybuf[pos];
  
! 			/* If the server requested an immediate reply, send one. */
! 			if (replyRequested && still_sending)
! 			{
! 				now = feGetCurrentTimestamp();
! 				if (!sendFeedback(conn, blockpos, now, false))
! 					goto error;
! 				last_status = now;
! 			}
! 		}
! 		else if (copybuf[0] == 'w')
  		{
! 			/*
! 			 * Once we've decided we don't want to receive any more, just
! 			 * ignore any subsequent XLogData messages.
! 			 */
! 			if (!still_sending)
! 				continue;
! 
! 			/*
! 			 * Read the header of the XLogData message, enclosed in the
! 			 * CopyData message. We only need the WAL location field
! 			 * (dataStart), the rest of the header is ignored.
! 			 */
! 			hdr_len = 1;		/* msgtype 'w' */
! 			hdr_len += 8;		/* dataStart */
! 			hdr_len += 8;		/* walEnd */
! 			hdr_len += 8;		/* sendTime */
! 			if (r < hdr_len)
  			{
! 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 						progname, r);
  				goto error;
  			}
! 			blockpos = fe_recvint64(&copybuf[1]);
! 
! 			/* Extract WAL location for this block */
! 			xlogoff = blockpos % XLOG_SEG_SIZE;
! 
! 			/*
! 			 * Verify that the initial location in the stream matches where we
! 			 * think we are.
! 			 */
! 			if (walfile == -1)
! 			{
! 				/* No file open yet */
! 				if (xlogoff != 0)
! 				{
! 					fprintf(stderr,
! 							_("%s: received transaction log record for offset %u with no file open\n"),
! 							progname, xlogoff);
! 					goto error;
! 				}
! 			}
! 			else
! 			{
! 				/* More data in existing segment */
! 				/* XXX: store seek value don't reseek all the time */
! 				if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
! 				{
! 					fprintf(stderr,
! 						  _("%s: got WAL data offset %08x, expected %08x\n"),
! 					   progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
! 					goto error;
! 				}
! 			}
! 
! 			bytes_left = r - hdr_len;
! 			bytes_written = 0;
! 
! 			while (bytes_left)
! 			{
! 				int			bytes_to_write;
! 
! 				/*
! 				 * If crossing a WAL boundary, only write up until we reach
! 				 * XLOG_SEG_SIZE.
! 				 */
! 				if (xlogoff + bytes_left > XLOG_SEG_SIZE)
! 					bytes_to_write = XLOG_SEG_SIZE - xlogoff;
! 				else
! 					bytes_to_write = bytes_left;
! 
! 				if (walfile == -1)
! 				{
! 					if (!open_walfile(blockpos, timeline,
! 									  basedir, partial_suffix))
! 					{
! 						/* Error logged by open_walfile */
! 						goto error;
! 					}
! 				}
! 
! 				if (write(walfile,
! 						  copybuf + hdr_len + bytes_written,
! 						  bytes_to_write) != bytes_to_write)
! 				{
! 					fprintf(stderr,
! 							_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! 							progname, bytes_to_write, current_walfile_name,
! 							strerror(errno));
! 					goto error;
! 				}
! 
! 				/* Write was successful, advance our position */
! 				bytes_written += bytes_to_write;
! 				bytes_left -= bytes_to_write;
! 				blockpos += bytes_to_write;
! 				xlogoff += bytes_to_write;
! 
! 				/* Did we reach the end of a WAL segment? */
! 				if (blockpos % XLOG_SEG_SIZE == 0)
! 				{
! 					if (!close_walfile(basedir, partial_suffix, blockpos))
! 						/* Error message written in close_walfile() */
! 						goto error;
! 
! 					xlogoff = 0;
! 
! 					if (still_sending && stream_stop(blockpos, timeline, false))
! 					{
! 						if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! 						{
! 							fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! 									progname, PQerrorMessage(conn));
! 							goto error;
! 						}
! 						still_sending = false;
! 						break;	/* ignore the rest of this XLogData packet */
! 					}
! 				}
! 			}
! 			/* No more data left to write, receive next copy packet */
! 		}
! 		else
! 		{
! 			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
! 					progname, copybuf[0]);
! 			goto error;
  		}
  	}
  
  error:
--- 1073,1093 ----
  			*stoppos = blockpos;
  			return res;
  		}
  
  
! 		if (walfile != -1 && lastFlushPosition < blockpos && syncmode)
  		{
! 			if (fsync(walfile) != 0)
  			{
! 				fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
! 						progname, current_walfile_name, strerror(errno));
  				goto error;
  			}
! 			lastFlushPosition = blockpos;
! 			if (!sendFeedback(conn, blockpos, now, false))
! 				goto error;
  		}
+ 
  	}
  
  error:
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 16,19 **** extern bool ReceiveXlogStream(PGconn *conn,
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix);
--- 16,20 ----
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix,
! 				  int syncmode);
#9Fujii Masao
masao.fujii@gmail.com
In reply to: Noname (#8)
Re: pg_receivexlog add synchronous mode

On Mon, Jun 16, 2014 at 7:03 PM, <furuyao@pm.nttdata.co.jp> wrote:

You introduced the state machine using the flag "flush_flg" into
pg_receivexlog.
That's complicated and would reduce the readability of the source code.
I think that the logic should be simpler like walreceiver's one.

Maybe I found one problematic path as follows:

1. WAL is written and flush_flg is set to 1.
2. PQgetCopyData() returns 0 and flush_flg is incremented to 2.
3. PQconsumeInput() is executed.
4. PQgetCopyData() reads a keepalive message.
5. After processing the keepalive message, PQgetCopyData() returns 0.
6. Since flush_flg is 2, WAL is flushed and flush_flg is reset to 0.

But a new message can arrive while the keepalive message is being
processed. Such a new message should be processed before WAL is flushed.

To address this and to improve readability, I changed the processing to follow the same loop structure as walreceiver.

+        Enables synchronous mode. If replication slot is disabled then
+        this setting is irrelevant.

Why is that irrelevant in that case?

Even when replication slot is not used, thanks to this feature,
pg_receivexlog can flush WAL more proactively and which may improve the
durability of WAL which pg_receivexlog writes.

I meant whether the flush position is reported or not.
If a slot is not used, the flush position is not reported.
I fixed it so that the flush position is reported only when a slot is used.

+ printf(_(" -m, --sync-mode synchronous mode\n"));

I think that calling this feature "synchronous mode" is confusing.

I changed the description from "synchronous mode" to "this mode is written some records, flush them to disk.".

I found that this patch breaks --status-interval option of pg_receivexlog
when -m option which the patch introduced is supplied. When -m is set,
pg_receivexlog tries to send the feedback message as soon as it flushes
WAL file even if status interval timeout has not been passed yet. If you
want to send the feedback as soon as WAL is written or flushed, like
walreceiver does, you need to extend --status-interval option, for example,
so that it accepts the value "-1" which means enabling that behavior.

Including this change in your original patch would make it more difficult
to review. I think that you should implement this as separate patch.
Thought?

Regards,

--
Fujii Masao

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#10Noname
furuyao@pm.nttdata.co.jp
In reply to: Fujii Masao (#9)
1 attachment(s)
Re: pg_receivexlog add synchronous mode

I found that this patch breaks --status-interval option of
pg_receivexlog when -m option which the patch introduced is supplied.
When -m is set, pg_receivexlog tries to send the feedback message as soon
as it flushes WAL file even if status interval timeout has not been passed
yet. If you want to send the feedback as soon as WAL is written or flushed,
like walreceiver does, you need to extend --status-interval option, for
example, so that it accepts the value "-1" which means enabling that
behavior.

Including this change in your original patch would make it more difficult
to review. I think that you should implement this as separate patch.
Thought?

As you pointed out, the current implementation ignores --status-interval.
An immediate response is necessary in order to synchronize.

The specification of --status-interval needs further thought.
So I revised the patch into a flushmode patch, which performs the flush at the same timing as walreceiver.

The changed part removes the feedback message that was sent right after the flush; the feedback message is now transmitted according to the status interval.
I also changed the mode name from syncmode to flushmode, and fixed the documentation.

Regards,

--
Furuya Osamu

Attachments:

pg_receivexlog-add-flush-mode-v1.patchapplication/octet-stream; name=pg_receivexlog-add-flush-mode-v1.patchDownload
*** a/doc/src/sgml/ref/pg_receivexlog.sgml
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 66,71 **** PostgreSQL documentation
--- 66,76 ----
     as possible. To avoid this behavior, use the <literal>-n</literal>
     parameter.
    </para>
+ 
+   <para>
+    Flush mode offers the ability to we've written some records, 
+    flush them to disk. To use flush mode, set parameter(that is, -m).
+   </para>
   </refsect1>
  
   <refsect1>
***************
*** 106,111 **** PostgreSQL documentation
--- 111,127 ----
       </varlistentry>
  
       <varlistentry>
+       <term><option>-m</option></term>
+       <term><option>--flush-mode</option></term>
+       <listitem>
+        <para>
+         Enables flush mode. Add to flush of the change timing of a WAL file. 
+         If we've written some records, flush them to disk.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
        <term><option>-v</option></term>
        <term><option>--verbose</option></term>
        <listitem>
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 370,376 **** LogStreamerMain(logstreamer_param *param)
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL))
  
  		/*
  		 * Any errors will already have been reported in the function process,
--- 370,376 ----
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL, 0))
  
  		/*
  		 * Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 35,40 ****
--- 35,41 ----
  static char *basedir = NULL;
  static int	verbose = 0;
  static int	noloop = 0;
+ static int	flushmode = 0;
  static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
  static volatile bool time_to_abort = false;
  
***************
*** 62,67 **** usage(void)
--- 63,69 ----
  	printf(_("\nOptions:\n"));
  	printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
  	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
+ 	printf(_("  -m, --flush-mode       this mode is written some records, flush them to disk.\n"));
  	printf(_("  -v, --verbose          output verbose messages\n"));
  	printf(_("  -V, --version          output version information, then exit\n"));
  	printf(_("  -?, --help             show this help, then exit\n"));
***************
*** 330,336 **** StreamLog(void)
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial");
  
  	PQfinish(conn);
  }
--- 332,338 ----
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial", flushmode);
  
  	PQfinish(conn);
  }
***************
*** 360,365 **** main(int argc, char **argv)
--- 362,368 ----
  		{"port", required_argument, NULL, 'p'},
  		{"username", required_argument, NULL, 'U'},
  		{"no-loop", no_argument, NULL, 'n'},
+ 		{"flush-mode", no_argument, NULL, 'm'},
  		{"no-password", no_argument, NULL, 'w'},
  		{"password", no_argument, NULL, 'W'},
  		{"status-interval", required_argument, NULL, 's'},
***************
*** 389,395 **** main(int argc, char **argv)
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
--- 392,398 ----
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWvm",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
***************
*** 436,441 **** main(int argc, char **argv)
--- 439,447 ----
  			case 'n':
  				noloop = 1;
  				break;
+ 			case 'm':
+ 				flushmode = 1;
+ 				break;
  			case 'v':
  				verbose++;
  				break;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 34,40 **** static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
--- 34,40 ----
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos, int flushmode);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
***************
*** 417,429 **** CheckServerVersionForStreaming(PGconn *conn)
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix)
  {
  	char		query[128];
  	char		slotcmd[128];
--- 417,432 ----
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
+  * If 'flushmode' is not zero, flush mode. Flush is executed after all
+  * received WAL is written.
+  *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix, int flushmode)
  {
  	char		query[128];
  	char		slotcmd[128];
***************
*** 568,574 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos);
  		if (res == NULL)
  			goto error;
  
--- 571,577 ----
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos, flushmode);
  		if (res == NULL)
  			goto error;
  
***************
*** 717,724 **** ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
  	return true;
  }
  
  /*
!  * The main loop of ReceiveXlogStream. Handles the COPY stream after
   * initiating streaming with the START_STREAMING command.
   *
   * If the COPY ends (not necessarily successfully) due a message from the
--- 720,823 ----
  	return true;
  }
  
+ 
  /*
!  * Receive a message available from XLOG stream, blocking for
!  * maximum of 'timeout' ms.
!  *
!  * Returns:
!  *
!  *	 If data was received, returns the length of the data. **copybuf is set to
!  *	 point to a buffer holding the received message. 
!  *
!  *	 0 if no data was available within timeout, or wait was interrupted
!  *	 by signal.
!  *
!  *	 -1 if the server ended the COPY.
!  *
!  *	 -2 if on error.
!  */
! static int
! rcv_receive(bool timeout, char **copybuf, PGconn *conn, int standby_message_timeout, int64 last_status, int64 now)
! {
! 	int r;
! 	
! 	r = PQgetCopyData(conn, copybuf, 1);
! 	if (r == 0)
! 	{
! 		if (timeout)
! 		{
! 			/*
! 			 * No data available. Wait for some to appear, but not longer than
! 			 * the specified timeout, so that we can ping the server.
! 			 */
! 			fd_set		input_mask;
! 			struct timeval timeout;
! 			struct timeval *timeoutptr;
! 
! 			FD_ZERO(&input_mask);
! 			FD_SET(PQsocket(conn), &input_mask);
! 			if (standby_message_timeout)
! 			{
! 				int64		targettime;
! 				long		secs;
! 				int			usecs;
! 
! 				targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
! 				feTimestampDifference(now,
! 									  targettime,
! 									  &secs,
! 									  &usecs);
! 				if (secs <= 0)
! 					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
! 				else
! 					timeout.tv_sec = secs;
! 				timeout.tv_usec = usecs;
! 				timeoutptr = &timeout;
! 			}
! 			else
! 				timeoutptr = NULL;
! 
! 			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
! 			if (r == 0 || (r < 0 && errno == EINTR))
! 			{
! 				/*
! 				 * Got a timeout or signal. Continue the loop and either
! 				 * deliver a status packet to the server or just go back into
! 				 * blocking.
! 				 */
! 				return 0;
! 			}
! 			else if (r < 0)
! 			{
! 				fprintf(stderr, _("%s: select() failed: %s\n"),
! 						progname, strerror(errno));
! 				return -2;
! 			}
! 		}
! 		/* Else there is actually data on the socket */
! 		if (PQconsumeInput(conn) == 0)
! 		{
! 			fprintf(stderr,
! 					_("%s: could not receive data from WAL stream: %s"),
! 					progname, PQerrorMessage(conn));
! 			return -2;
! 		}
! 		r = PQgetCopyData(conn, copybuf, 1);
! 	}
! 	if (r == -2)
! 	{
! 		fprintf(stderr, _("%s: could not read COPY data: %s"),
! 				progname, PQerrorMessage(conn));
! 	}
! 
! 	/* Return received messages to caller */
! 	return r;
! 
! }
! 
! /*
!  * The main loop of ReceiveXLogStream. Handles the COPY stream after
   * initiating streaming with the START_STREAMING command.
   *
   * If the COPY ends (not necessarily successfully) due a message from the
***************
*** 729,735 **** static PGresult *
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
--- 828,834 ----
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos, int flushmode)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
***************
*** 784,850 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			last_status = now;
  		}
  
! 		r = PQgetCopyData(conn, &copybuf, 1);
! 		if (r == 0)
! 		{
! 			/*
! 			 * No data available. Wait for some to appear, but not longer than
! 			 * the specified timeout, so that we can ping the server.
! 			 */
! 			fd_set		input_mask;
! 			struct timeval timeout;
! 			struct timeval *timeoutptr;
  
! 			FD_ZERO(&input_mask);
! 			FD_SET(PQsocket(conn), &input_mask);
! 			if (standby_message_timeout && still_sending)
  			{
! 				int64		targettime;
! 				long		secs;
! 				int			usecs;
! 
! 				targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
! 				feTimestampDifference(now,
! 									  targettime,
! 									  &secs,
! 									  &usecs);
! 				if (secs <= 0)
! 					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
! 				else
! 					timeout.tv_sec = secs;
! 				timeout.tv_usec = usecs;
! 				timeoutptr = &timeout;
! 			}
! 			else
! 				timeoutptr = NULL;
  
- 			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
- 			if (r == 0 || (r < 0 && errno == EINTR))
- 			{
  				/*
! 				 * Got a timeout or signal. Continue the loop and either
! 				 * deliver a status packet to the server or just go back into
! 				 * blocking.
  				 */
! 				continue;
  			}
! 			else if (r < 0)
  			{
! 				fprintf(stderr, _("%s: select() failed: %s\n"),
! 						progname, strerror(errno));
! 				goto error;
  			}
! 			/* Else there is actually data on the socket */
! 			if (PQconsumeInput(conn) == 0)
  			{
! 				fprintf(stderr,
! 						_("%s: could not receive data from WAL stream: %s"),
! 						progname, PQerrorMessage(conn));
  				goto error;
  			}
! 			continue;
  		}
! 		if (r == -1)
  		{
  			PGresult   *res = PQgetResult(conn);
  
--- 883,1058 ----
  			last_status = now;
  		}
  
! 		r = rcv_receive(true , &copybuf, conn, standby_message_timeout, last_status, now);
  
! 		while(r > 0)
! 		{
! 			/* Check the message type. */
! 			if (copybuf[0] == 'k')
  			{
! 				int			pos;
! 				bool		replyRequested;
  
  				/*
! 				 * Parse the keepalive message, enclosed in the CopyData message.
! 				 * We just check if the server requested a reply, and ignore the
! 				 * rest.
  				 */
! 				pos = 1;			/* skip msgtype 'k' */
! 				pos += 8;			/* skip walEnd */
! 				pos += 8;			/* skip sendTime */
! 
! 				if (r < pos + 1)
! 				{
! 					fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 							progname, r);
! 					goto error;
! 				}
! 				replyRequested = copybuf[pos];
! 
! 				/* If the server requested an immediate reply, send one. */
! 				if (replyRequested && still_sending)
! 				{
! 					now = feGetCurrentTimestamp();
! 					if (!sendFeedback(conn, blockpos, now, false))
! 						goto error;
! 					last_status = now;
! 				}
  			}
! 			else if (copybuf[0] == 'w')
  			{
! 				/*
! 				 * Once we've decided we don't want to receive any more, just
! 				 * ignore any subsequent XLogData messages.
! 				 */
! 				if (!still_sending)
! 					continue;
! 
! 				/*
! 				 * Read the header of the XLogData message, enclosed in the
! 				 * CopyData message. We only need the WAL location field
! 				 * (dataStart), the rest of the header is ignored.
! 				 */
! 				hdr_len = 1;		/* msgtype 'w' */
! 				hdr_len += 8;		/* dataStart */
! 				hdr_len += 8;		/* walEnd */
! 				hdr_len += 8;		/* sendTime */
! 				if (r < hdr_len)
! 				{
! 					fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 							progname, r);
! 					goto error;
! 				}
! 				blockpos = fe_recvint64(&copybuf[1]);
! 
! 				/* Extract WAL location for this block */
! 				xlogoff = blockpos % XLOG_SEG_SIZE;
! 
! 				/*
! 				 * Verify that the initial location in the stream matches where we
! 				 * think we are.
! 				 */
! 				if (walfile == -1)
! 				{
! 					/* No file open yet */
! 					if (xlogoff != 0)
! 					{
! 						fprintf(stderr,
! 								_("%s: received transaction log record for offset %u with no file open\n"),
! 								progname, xlogoff);
! 						goto error;
! 					}
! 				}
! 				else
! 				{
! 					/* More data in existing segment */
! 					/* XXX: store seek value don't reseek all the time */
! 					if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
! 					{
! 						fprintf(stderr,
! 							  _("%s: got WAL data offset %08x, expected %08x\n"),
! 						   progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
! 						goto error;
! 					}
! 				}
! 
! 				bytes_left = r - hdr_len;
! 				bytes_written = 0;
! 
! 				while (bytes_left)
! 				{
! 					int			bytes_to_write;
! 
! 					/*
! 					 * If crossing a WAL boundary, only write up until we reach
! 					 * XLOG_SEG_SIZE.
! 					 */
! 					if (xlogoff + bytes_left > XLOG_SEG_SIZE)
! 						bytes_to_write = XLOG_SEG_SIZE - xlogoff;
! 					else
! 						bytes_to_write = bytes_left;
! 
! 					if (walfile == -1)
! 					{
! 						if (!open_walfile(blockpos, timeline,
! 										  basedir, partial_suffix))
! 						{
! 							/* Error logged by open_walfile */
! 							goto error;
! 						}
! 					}
! 
! 					if (write(walfile,
! 							  copybuf + hdr_len + bytes_written,
! 							  bytes_to_write) != bytes_to_write)
! 					{
! 						fprintf(stderr,
! 								_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! 								progname, bytes_to_write, current_walfile_name,
! 								strerror(errno));
! 						goto error;
! 					}
! 
! 					/* Write was successful, advance our position */
! 					bytes_written += bytes_to_write;
! 					bytes_left -= bytes_to_write;
! 					blockpos += bytes_to_write;
! 					xlogoff += bytes_to_write;
! 
! 					/* Did we reach the end of a WAL segment? */
! 					if (blockpos % XLOG_SEG_SIZE == 0)
! 					{
! 						if (!close_walfile(basedir, partial_suffix, blockpos))
! 							/* Error message written in close_walfile() */
! 							goto error;
! 
! 						xlogoff = 0;
! 
! 						if (still_sending && stream_stop(blockpos, timeline, false))
! 						{
! 							if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! 							{
! 								fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! 										progname, PQerrorMessage(conn));
! 								goto error;
! 							}
! 							still_sending = false;
! 							break;	/* ignore the rest of this XLogData packet */
! 						}
! 					}
! 				}
! 				/* No more data left to write, receive next copy packet */
  			}
! 			else
  			{
! 				fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
! 						progname, copybuf[0]);
  				goto error;
  			}
! 			r = rcv_receive(false , &copybuf, conn, standby_message_timeout, last_status, now);
  		}
! 
! 		if(r == -1)
  		{
  			PGresult   *res = PQgetResult(conn);
  
***************
*** 880,1053 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			*stoppos = blockpos;
  			return res;
  		}
- 		if (r == -2)
- 		{
- 			fprintf(stderr, _("%s: could not read COPY data: %s"),
- 					progname, PQerrorMessage(conn));
- 			goto error;
- 		}
  
! 		/* Check the message type. */
! 		if (copybuf[0] == 'k')
  		{
! 			int			pos;
! 			bool		replyRequested;
! 
! 			/*
! 			 * Parse the keepalive message, enclosed in the CopyData message.
! 			 * We just check if the server requested a reply, and ignore the
! 			 * rest.
! 			 */
! 			pos = 1;			/* skip msgtype 'k' */
! 			pos += 8;			/* skip walEnd */
! 			pos += 8;			/* skip sendTime */
! 
! 			if (r < pos + 1)
  			{
! 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 						progname, r);
  				goto error;
  			}
! 			replyRequested = copybuf[pos];
! 
! 			/* If the server requested an immediate reply, send one. */
! 			if (replyRequested && still_sending)
! 			{
! 				now = feGetCurrentTimestamp();
! 				if (!sendFeedback(conn, blockpos, now, false))
! 					goto error;
! 				last_status = now;
! 			}
! 		}
! 		else if (copybuf[0] == 'w')
! 		{
! 			/*
! 			 * Once we've decided we don't want to receive any more, just
! 			 * ignore any subsequent XLogData messages.
! 			 */
! 			if (!still_sending)
! 				continue;
! 
! 			/*
! 			 * Read the header of the XLogData message, enclosed in the
! 			 * CopyData message. We only need the WAL location field
! 			 * (dataStart), the rest of the header is ignored.
! 			 */
! 			hdr_len = 1;		/* msgtype 'w' */
! 			hdr_len += 8;		/* dataStart */
! 			hdr_len += 8;		/* walEnd */
! 			hdr_len += 8;		/* sendTime */
! 			if (r < hdr_len)
! 			{
! 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 						progname, r);
! 				goto error;
! 			}
! 			blockpos = fe_recvint64(&copybuf[1]);
! 
! 			/* Extract WAL location for this block */
! 			xlogoff = blockpos % XLOG_SEG_SIZE;
! 
! 			/*
! 			 * Verify that the initial location in the stream matches where we
! 			 * think we are.
! 			 */
! 			if (walfile == -1)
! 			{
! 				/* No file open yet */
! 				if (xlogoff != 0)
! 				{
! 					fprintf(stderr,
! 							_("%s: received transaction log record for offset %u with no file open\n"),
! 							progname, xlogoff);
! 					goto error;
! 				}
! 			}
! 			else
! 			{
! 				/* More data in existing segment */
! 				/* XXX: store seek value don't reseek all the time */
! 				if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
! 				{
! 					fprintf(stderr,
! 						  _("%s: got WAL data offset %08x, expected %08x\n"),
! 					   progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
! 					goto error;
! 				}
! 			}
! 
! 			bytes_left = r - hdr_len;
! 			bytes_written = 0;
! 
! 			while (bytes_left)
! 			{
! 				int			bytes_to_write;
! 
! 				/*
! 				 * If crossing a WAL boundary, only write up until we reach
! 				 * XLOG_SEG_SIZE.
! 				 */
! 				if (xlogoff + bytes_left > XLOG_SEG_SIZE)
! 					bytes_to_write = XLOG_SEG_SIZE - xlogoff;
! 				else
! 					bytes_to_write = bytes_left;
! 
! 				if (walfile == -1)
! 				{
! 					if (!open_walfile(blockpos, timeline,
! 									  basedir, partial_suffix))
! 					{
! 						/* Error logged by open_walfile */
! 						goto error;
! 					}
! 				}
! 
! 				if (write(walfile,
! 						  copybuf + hdr_len + bytes_written,
! 						  bytes_to_write) != bytes_to_write)
! 				{
! 					fprintf(stderr,
! 							_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! 							progname, bytes_to_write, current_walfile_name,
! 							strerror(errno));
! 					goto error;
! 				}
! 
! 				/* Write was successful, advance our position */
! 				bytes_written += bytes_to_write;
! 				bytes_left -= bytes_to_write;
! 				blockpos += bytes_to_write;
! 				xlogoff += bytes_to_write;
! 
! 				/* Did we reach the end of a WAL segment? */
! 				if (blockpos % XLOG_SEG_SIZE == 0)
! 				{
! 					if (!close_walfile(basedir, partial_suffix, blockpos))
! 						/* Error message written in close_walfile() */
! 						goto error;
! 
! 					xlogoff = 0;
! 
! 					if (still_sending && stream_stop(blockpos, timeline, false))
! 					{
! 						if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! 						{
! 							fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! 									progname, PQerrorMessage(conn));
! 							goto error;
! 						}
! 						still_sending = false;
! 						break;	/* ignore the rest of this XLogData packet */
! 					}
! 				}
! 			}
! 			/* No more data left to write, receive next copy packet */
! 		}
! 		else
! 		{
! 			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
! 					progname, copybuf[0]);
! 			goto error;
  		}
  	}
  
--- 1088,1103 ----
  			*stoppos = blockpos;
  			return res;
  		}
  
! 		if (walfile != -1 && lastFlushPosition < blockpos && flushmode)
  		{
! 			if (fsync(walfile) != 0)
  			{
! 				fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
! 						progname, current_walfile_name, strerror(errno));
  				goto error;
  			}
! 			lastFlushPosition = blockpos;
  		}
  	}
  
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 16,19 **** extern bool ReceiveXlogStream(PGconn *conn,
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix);
--- 16,20 ----
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix,
! 				  int flushmode);
#11Fujii Masao
masao.fujii@gmail.com
In reply to: Noname (#10)
Re: pg_receivexlog add synchronous mode

On Tue, Jun 24, 2014 at 3:18 PM, <furuyao@pm.nttdata.co.jp> wrote:

I found that this patch breaks --status-interval option of
pg_receivexlog when -m option which the patch introduced is supplied.
When -m is set, pg_receivexlog tries to send the feedback message as soon
as it flushes WAL file even if status interval timeout has not been passed
yet. If you want to send the feedback as soon as WAL is written or flushed,
like walreceiver does, you need to extend --status-interval option, for
example, so that it accepts the value "-1" which means enabling that
behavior.

Including this change in your original patch would make it more difficult
to review. I think that you should implement this as separate patch.
Thought?

As your comments, the current specification to ignore the --status-interval.
It is necessary to respond immediately to synchronize.

It is necessary to think about specifications the --status-interval.
So I revised it to a patch of flushmode which performed flush by a timing same as walreceiver.

I'm not sure if it's a good idea to call the feature which you'd like to
add 'flush mode'. ISTM that 'flush mode' is vague and confusing for users.
Instead, what about adding something like --fsync-interval, which
pg_recvlogical supports?

A changed part deletes the feedback message after flush, and transmitted the feedback message according to the status interval.
Change to flushmode from syncmode the mode name, and fixed the document.

+ * Receive a message available from XLOG stream, blocking for
+ * maximum of 'timeout' ms.

The above comment seems incorrect because 'timeout' is boolean argument.

+            FD_ZERO(&input_mask);
+            FD_SET(PQsocket(conn), &input_mask);
+            if (standby_message_timeout)

Why did you get rid of the check of 'still_sending' flag here? Originally the
flag was checked but not in the patch.

+ r = rcv_receive(true , &copybuf, conn,
standby_message_timeout, last_status, now);

When the return value is -2 (i.e., an error happened), we should go to
the 'error' label.

ISTM that stream_stop() should be called every time a message is
processed. But the patch changes pg_receivexlog so that it keeps
processing the received data without calling stream_stop(). This seems
incorrect.

'copybuf' needs to be freed every time a new message is received. But you
seem to have forgotten to do that when rcv_receive() with no timeout is called.

Regards,

--
Fujii Masao

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#12Fujii Masao
masao.fujii@gmail.com
In reply to: Fujii Masao (#11)
1 attachment(s)
Re: pg_receivexlog add synchronous mode

On Wed, Jun 25, 2014 at 3:50 AM, Fujii Masao <masao.fujii@gmail.com> wrote:

On Tue, Jun 24, 2014 at 3:18 PM, <furuyao@pm.nttdata.co.jp> wrote:

I found that this patch breaks --status-interval option of
pg_receivexlog when -m option which the patch introduced is supplied.
When -m is set, pg_receivexlog tries to send the feedback message as soon
as it flushes WAL file even if status interval timeout has not been passed
yet. If you want to send the feedback as soon as WAL is written or flushed,
like walreceiver does, you need to extend --status-interval option, for
example, so that it accepts the value "-1" which means enabling that
behavior.

Including this change in your original patch would make it more difficult
to review. I think that you should implement this as separate patch.
Thought?

As your comments, the current specification to ignore the --status-interval.
It is necessary to respond immediately to synchronize.

It is necessary to think about specifications the --status-interval.
So I revised it to a patch of flushmode which performed flush by a timing same as walreceiver.

I'm not sure if it's a good idea to call the feature which you'd like to
add 'flush mode'. ISTM that 'flush mode' is vague and confusing for users.
Instead, what about adding something like --fsync-interval, which
pg_recvlogical supports?

A changed part deletes the feedback message after flush, and transmitted the feedback message according to the status interval.
Change to flushmode from syncmode the mode name, and fixed the document.

+ * Receive a message available from XLOG stream, blocking for
+ * maximum of 'timeout' ms.

The above comment seems incorrect because 'timeout' is boolean argument.

+            FD_ZERO(&input_mask);
+            FD_SET(PQsocket(conn), &input_mask);
+            if (standby_message_timeout)

Why did you get rid of the check of 'still_sending' flag here? Originally the
flag was checked but not in the patch.

+ r = rcv_receive(true , &copybuf, conn,
standby_message_timeout, last_status, now);

When the return value is -2 (i.e., an error happened), we should go to
the 'error' label.

ISTM that stream_stop() should be called every time a message is
processed. But the
patch changes pg_receivexlog so that it keeps processing the received
data without
calling stream_stop(). This seems incorrect.

'copybuf' needs to be free'd every time new message is received. But you seem to
have forgotten to do that when rcv_receive() with no timeout is called.

The patch looks somewhat complicated, and bugs can easily be introduced,
because it tries not only to add a new feature but also to reorganize
the main loop in HandleCopyStream at the same time. To keep the patch
simple, I'm thinking of first applying the attached patch, which just
refactors the main loop. Then we can apply the main patch, i.e., the one
that adds the new feature. Thoughts?

Regards,

--
Fujii Masao

Attachments:

refactor_receivelog_v1.patchtext/x-patch; charset=US-ASCII; name=refactor_receivelog_v1.patchDownload
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index d76e605..1182dc7 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -35,6 +35,8 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
 				 uint32 timeline, char *basedir,
 			   stream_stop_callback stream_stop, int standby_message_timeout,
 				 char *partial_suffix, XLogRecPtr *stoppos);
+static int CopyStreamPoll(PGconn *conn, long timeout_ms);
+static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
 
 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
 						 uint32 *timeline);
@@ -744,12 +746,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 		int			bytes_written;
 		int64		now;
 		int			hdr_len;
-
-		if (copybuf != NULL)
-		{
-			PQfreemem(copybuf);
-			copybuf = NULL;
-		}
+		long		sleeptime;
 
 		/*
 		 * Check if we should continue streaming, or abort at this point.
@@ -784,67 +781,34 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			last_status = now;
 		}
 
-		r = PQgetCopyData(conn, &copybuf, 1);
-		if (r == 0)
+		/*
+		 * Compute how long send/receive loops should sleep
+		 */
+		if (standby_message_timeout && still_sending)
 		{
-			/*
-			 * No data available. Wait for some to appear, but not longer than
-			 * the specified timeout, so that we can ping the server.
-			 */
-			fd_set		input_mask;
-			struct timeval timeout;
-			struct timeval *timeoutptr;
-
-			FD_ZERO(&input_mask);
-			FD_SET(PQsocket(conn), &input_mask);
-			if (standby_message_timeout && still_sending)
-			{
-				int64		targettime;
-				long		secs;
-				int			usecs;
-
-				targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
-				feTimestampDifference(now,
-									  targettime,
-									  &secs,
-									  &usecs);
-				if (secs <= 0)
-					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
-				else
-					timeout.tv_sec = secs;
-				timeout.tv_usec = usecs;
-				timeoutptr = &timeout;
-			}
-			else
-				timeoutptr = NULL;
+			int64		targettime;
+			long		secs;
+			int			usecs;
+
+			targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
+			feTimestampDifference(now,
+								  targettime,
+								  &secs,
+								  &usecs);
+			if (secs <= 0)
+				secs = 1;	/* Always sleep at least 1 sec */
+
+			sleeptime = secs * 1000 + usecs / 1000;
+		}
+		else
+			sleeptime = -1;
 
-			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
-			if (r == 0 || (r < 0 && errno == EINTR))
-			{
-				/*
-				 * Got a timeout or signal. Continue the loop and either
-				 * deliver a status packet to the server or just go back into
-				 * blocking.
-				 */
-				continue;
-			}
-			else if (r < 0)
-			{
-				fprintf(stderr, _("%s: select() failed: %s\n"),
-						progname, strerror(errno));
-				goto error;
-			}
-			/* Else there is actually data on the socket */
-			if (PQconsumeInput(conn) == 0)
-			{
-				fprintf(stderr,
-						_("%s: could not receive data from WAL stream: %s"),
-						progname, PQerrorMessage(conn));
-				goto error;
-			}
+		r = CopyStreamReceive(conn, sleeptime, &copybuf);
+		if (r == 0)
 			continue;
-		}
 		if (r == -1)
+			goto error;
+		if (r == -2)
 		{
 			PGresult   *res = PQgetResult(conn);
 
@@ -877,15 +841,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			}
 			if (copybuf != NULL)
 				PQfreemem(copybuf);
+			copybuf = NULL;
 			*stoppos = blockpos;
 			return res;
 		}
-		if (r == -2)
-		{
-			fprintf(stderr, _("%s: could not read COPY data: %s"),
-					progname, PQerrorMessage(conn));
-			goto error;
-		}
 
 		/* Check the message type. */
 		if (copybuf[0] == 'k')
@@ -1056,3 +1015,115 @@ error:
 		PQfreemem(copybuf);
 	return NULL;
 }
+
+/*
+ * Wait until we can read CopyData message, or timeout.
+ *
+ * Returns 1 if data has become available for reading, 0 if timed out
+ * or interrupted by signal, and -1 on an error.
+ */
+static int
+CopyStreamPoll(PGconn *conn, long timeout_ms)
+{
+	int			ret;
+	fd_set		input_mask;
+	struct timeval timeout;
+	struct timeval *timeoutptr;
+
+	if (PQsocket(conn) < 0)
+	{
+		fprintf(stderr, _("%s: socket not open"), progname);
+		return -1;
+	}
+
+	FD_ZERO(&input_mask);
+	FD_SET(PQsocket(conn), &input_mask);
+
+	if (timeout_ms < 0)
+		timeoutptr = NULL;
+	else
+	{
+		timeout.tv_sec = timeout_ms / 1000L;
+		timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
+		timeoutptr = &timeout;
+	}
+
+	ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+	if (ret == 0 || (ret < 0 && errno == EINTR))
+		return 0;		/* Got a timeout or signal */
+	else if (ret < 0)
+	{
+		fprintf(stderr, _("%s: select() failed: %s\n"),
+				progname, strerror(errno));
+		return -1;
+	}
+
+	return 1;
+}
+
+/*
+ * Receive CopyData message available from XLOG stream, blocking for
+ * maximum of 'timeout' ms.
+ *
+ * If data was received, returns the length of the data. *buffer is set to
+ * point to a buffer holding the received message. The buffer is only valid
+ * until the next CopyStreamReceive call.
+ *
+ * 0 if no data was available within timeout, or wait was interrupted
+ * by signal. -1 on error. -2 if the server ended the COPY.
+ */
+static int
+CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
+{
+	static char	   *copybuf = NULL;
+	int			rawlen;
+
+	if (copybuf != NULL)
+		PQfreemem(copybuf);
+	copybuf = NULL;
+	*buffer = NULL;
+
+	/* Try to receive a CopyData message */
+	rawlen = PQgetCopyData(conn, &copybuf, 1);
+	if (rawlen == 0)
+	{
+		/*
+		 * No data available. Wait for some to appear, but not longer than
+		 * the specified timeout, so that we can ping the server.
+		 */
+		if (timeout > 0)
+		{
+			int		ret;
+
+			ret = CopyStreamPoll(conn, timeout);
+			if (ret <= 0)
+				return ret;
+		}
+
+		/* Else there is actually data on the socket */
+		if (PQconsumeInput(conn) == 0)
+		{
+			fprintf(stderr,
+					_("%s: could not receive data from WAL stream: %s"),
+					progname, PQerrorMessage(conn));
+			return -1;
+		}
+
+		/* Now that we've consumed some input, try again */
+		rawlen = PQgetCopyData(conn, &copybuf, 1);
+		if (rawlen == 0)
+			return 0;
+	}
+	if (rawlen == -1)			/* end-of-streaming or error */
+		return -2;
+	if (rawlen == -2)
+	{
+		fprintf(stderr, _("%s: could not read COPY data: %s"),
+				progname, PQerrorMessage(conn));
+		return -1;
+	}
+
+	/* Return received messages to caller */
+	*buffer = copybuf;
+	return rawlen;
+}
#13Noname
furuyao@pm.nttdata.co.jp
In reply to: Fujii Masao (#12)
Re: pg_receivexlog add synchronous mode

The patch looks somewhat complicated and bugs can be easily introduced
because it tries to not only add new feature but also reorganize the main
loop in HandleCopyStream at the same time. To keep the patch simple, I'm
thinking to firstly apply the attached patch which just refactors the
main loop. Then we can apply the main patch, i.e., add new feature.
Thought?

Thank you for the refactoring patch.
I did a review of the patch.
As a result, I found that the calculation of sleeptime is incorrect when --status-interval is set to 1.

--status-interval 1 --> sleeptime 1.9999 !?
--status-interval 2 --> sleeptime 1.9999 OK
--status-interval 3 --> sleeptime 2.9999 OK

Regards,

--
Furuya Osamu

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#14Fujii Masao
masao.fujii@gmail.com
In reply to: Noname (#13)
1 attachment(s)
Re: pg_receivexlog add synchronous mode

On Thu, Jun 26, 2014 at 7:01 PM, <furuyao@pm.nttdata.co.jp> wrote:

The patch looks somewhat complicated and bugs can be easily introduced
because it tries to not only add new feature but also reorganize the main
loop in HandleCopyStream at the same time. To keep the patch simple, I'm
thinking to firstly apply the attached patch which just refactors the
main loop. Then we can apply the main patch, i.e., add new feature.
Thought?

Thank you for the refactoring patch.
I did a review of the patch.
As a result, I found that the calculation of sleeptime is incorrect when --status-interval is set to 1.

--status-interval 1 --> sleeptime 1.9999 !?
--status-interval 2 --> sleeptime 1.9999 OK
--status-interval 3 --> sleeptime 2.9999 OK

Thanks for the review!

+            if (secs <= 0)
+                secs = 1;    /* Always sleep at least 1 sec */
+
+            sleeptime = secs * 1000 + usecs / 1000;

The above is the code which caused that problem. 'usecs' should have been
reset to zero when 'secs' is rounded up to 1 second, but it was not. Attached is
the updated version of the patch.

Regards,

--
Fujii Masao

Attachments:

refactor_receivelog_v2.patchtext/x-diff; charset=US-ASCII; name=refactor_receivelog_v2.patchDownload
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index d76e605..4aa35da 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -35,6 +35,8 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
 				 uint32 timeline, char *basedir,
 			   stream_stop_callback stream_stop, int standby_message_timeout,
 				 char *partial_suffix, XLogRecPtr *stoppos);
+static int CopyStreamPoll(PGconn *conn, long timeout_ms);
+static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
 
 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
 						 uint32 *timeline);
@@ -744,12 +746,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 		int			bytes_written;
 		int64		now;
 		int			hdr_len;
-
-		if (copybuf != NULL)
-		{
-			PQfreemem(copybuf);
-			copybuf = NULL;
-		}
+		long		sleeptime;
 
 		/*
 		 * Check if we should continue streaming, or abort at this point.
@@ -784,67 +781,38 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			last_status = now;
 		}
 
-		r = PQgetCopyData(conn, &copybuf, 1);
-		if (r == 0)
+		/*
+		 * Compute how long send/receive loops should sleep
+		 */
+		if (standby_message_timeout && still_sending)
 		{
-			/*
-			 * No data available. Wait for some to appear, but not longer than
-			 * the specified timeout, so that we can ping the server.
-			 */
-			fd_set		input_mask;
-			struct timeval timeout;
-			struct timeval *timeoutptr;
-
-			FD_ZERO(&input_mask);
-			FD_SET(PQsocket(conn), &input_mask);
-			if (standby_message_timeout && still_sending)
+			int64		targettime;
+			long		secs;
+			int			usecs;
+
+			targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
+			feTimestampDifference(now,
+								  targettime,
+								  &secs,
+								  &usecs);
+			/* Always sleep at least 1 sec */
+			if (secs <= 0)
 			{
-				int64		targettime;
-				long		secs;
-				int			usecs;
-
-				targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
-				feTimestampDifference(now,
-									  targettime,
-									  &secs,
-									  &usecs);
-				if (secs <= 0)
-					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
-				else
-					timeout.tv_sec = secs;
-				timeout.tv_usec = usecs;
-				timeoutptr = &timeout;
+				secs = 1;
+				usecs = 0;
 			}
-			else
-				timeoutptr = NULL;
 
-			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
-			if (r == 0 || (r < 0 && errno == EINTR))
-			{
-				/*
-				 * Got a timeout or signal. Continue the loop and either
-				 * deliver a status packet to the server or just go back into
-				 * blocking.
-				 */
-				continue;
-			}
-			else if (r < 0)
-			{
-				fprintf(stderr, _("%s: select() failed: %s\n"),
-						progname, strerror(errno));
-				goto error;
-			}
-			/* Else there is actually data on the socket */
-			if (PQconsumeInput(conn) == 0)
-			{
-				fprintf(stderr,
-						_("%s: could not receive data from WAL stream: %s"),
-						progname, PQerrorMessage(conn));
-				goto error;
-			}
-			continue;
+			sleeptime = secs * 1000 + usecs / 1000;
 		}
+		else
+			sleeptime = -1;
+
+		r = CopyStreamReceive(conn, sleeptime, &copybuf);
+		if (r == 0)
+			continue;
 		if (r == -1)
+			goto error;
+		if (r == -2)
 		{
 			PGresult   *res = PQgetResult(conn);
 
@@ -877,15 +845,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			}
 			if (copybuf != NULL)
 				PQfreemem(copybuf);
+			copybuf = NULL;
 			*stoppos = blockpos;
 			return res;
 		}
-		if (r == -2)
-		{
-			fprintf(stderr, _("%s: could not read COPY data: %s"),
-					progname, PQerrorMessage(conn));
-			goto error;
-		}
 
 		/* Check the message type. */
 		if (copybuf[0] == 'k')
@@ -1056,3 +1019,115 @@ error:
 		PQfreemem(copybuf);
 	return NULL;
 }
+
+/*
+ * Wait until we can read CopyData message, or timeout.
+ *
+ * Returns 1 if data has become available for reading, 0 if timed out
+ * or interrupted by signal, and -1 on an error.
+ */
+static int
+CopyStreamPoll(PGconn *conn, long timeout_ms)
+{
+	int			ret;
+	fd_set		input_mask;
+	struct timeval timeout;
+	struct timeval *timeoutptr;
+
+	if (PQsocket(conn) < 0)
+	{
+		fprintf(stderr, _("%s: socket not open"), progname);
+		return -1;
+	}
+
+	FD_ZERO(&input_mask);
+	FD_SET(PQsocket(conn), &input_mask);
+
+	if (timeout_ms < 0)
+		timeoutptr = NULL;
+	else
+	{
+		timeout.tv_sec = timeout_ms / 1000L;
+		timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
+		timeoutptr = &timeout;
+	}
+
+	ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+	if (ret == 0 || (ret < 0 && errno == EINTR))
+		return 0;		/* Got a timeout or signal */
+	else if (ret < 0)
+	{
+		fprintf(stderr, _("%s: select() failed: %s\n"),
+				progname, strerror(errno));
+		return -1;
+	}
+
+	return 1;
+}
+
+/*
+ * Receive CopyData message available from XLOG stream, blocking for
+ * maximum of 'timeout' ms.
+ *
+ * If data was received, returns the length of the data. *buffer is set to
+ * point to a buffer holding the received message. The buffer is only valid
+ * until the next CopyStreamReceive call.
+ *
+ * 0 if no data was available within timeout, or wait was interrupted
+ * by signal. -1 on error. -2 if the server ended the COPY.
+ */
+static int
+CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
+{
+	static char	   *copybuf = NULL;
+	int			rawlen;
+
+	if (copybuf != NULL)
+		PQfreemem(copybuf);
+	copybuf = NULL;
+	*buffer = NULL;
+
+	/* Try to receive a CopyData message */
+	rawlen = PQgetCopyData(conn, &copybuf, 1);
+	if (rawlen == 0)
+	{
+		/*
+		 * No data available. Wait for some to appear, but not longer than
+		 * the specified timeout, so that we can ping the server.
+		 */
+		if (timeout > 0)
+		{
+			int		ret;
+
+			ret = CopyStreamPoll(conn, timeout);
+			if (ret <= 0)
+				return ret;
+		}
+
+		/* Else there is actually data on the socket */
+		if (PQconsumeInput(conn) == 0)
+		{
+			fprintf(stderr,
+					_("%s: could not receive data from WAL stream: %s"),
+					progname, PQerrorMessage(conn));
+			return -1;
+		}
+
+		/* Now that we've consumed some input, try again */
+		rawlen = PQgetCopyData(conn, &copybuf, 1);
+		if (rawlen == 0)
+			return 0;
+	}
+	if (rawlen == -1)			/* end-of-streaming or error */
+		return -2;
+	if (rawlen == -2)
+	{
+		fprintf(stderr, _("%s: could not read COPY data: %s"),
+				progname, PQerrorMessage(conn));
+		return -1;
+	}
+
+	/* Return received messages to caller */
+	*buffer = copybuf;
+	return rawlen;
+}
#15Andres Freund
andres@2ndquadrant.com
In reply to: Noname (#3)
Re: pg_receivexlog add synchronous mode

On 2014-06-05 19:13:34 +0900, furuyao@pm.nttdata.co.jp wrote:

-----Original Message-----
Hi,

On 2014-06-05 17:09:44 +0900, furuyao@pm.nttdata.co.jp wrote:

Synchronous(synchronous_commit = on) mode offers the ability to

confirm WAL have been streamed in the same way as synchronous
replication.

If an output is used as a different disk from the directory where the

transaction log should be stored.

Prevent the loss of data due to disk failure.

the additional parameter(-m) and replicationslot specify, that its

synchronous mode.

All received WAL write after, flush is executed and reply flush
position.

What's the usecase for this? I can see some benefit in easier testing
of syncrep, but that's basically it?

When used together with syncrep, the multiplexed WAL can serve as a safeguard if the standby server crashes.
Data loss can be reduced to nearly zero.

I don't see how this gets pg_receivexlog much closer to a solution for
multiplexing WAL. Since there's no support for streaming data, removing
old WAL and such it seems to me you'd need to have something entirely
different anyway?
I'm not really averse to adding this feature (on the basis of easier
syncrep testing alone), but I don't like the arguments for it so far...

Greetings,

Andres Freund

--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#16Noname
furuyao@pm.nttdata.co.jp
In reply to: Fujii Masao (#14)
Re: pg_receivexlog add synchronous mode

Thanks for the review!

+            if (secs <= 0)
+                secs = 1;    /* Always sleep at least 1 sec */
+
+            sleeptime = secs * 1000 + usecs / 1000;

The above is the code which caused that problem. 'usecs' should have been
reset to zero when 'secs' are rounded up to 1 second. But not. Attached
is the updated version of the patch.

Thank you for the refactoring v2 patch.
I did a review of the patch.

1. The patch applied cleanly and compiled without warnings or errors.
2. All regression tests passed.
3. sleeptime is OK when --status-interval is set to 1.

Regards,

--
Furuya Osamu

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#17Noname
furuyao@pm.nttdata.co.jp
In reply to: Andres Freund (#15)
Re: pg_receivexlog add synchronous mode

Synchronous(synchronous_commit = on) mode offers the ability to

confirm WAL have been streamed in the same way as synchronous
replication.

If an output is used as a different disk from the directory where
the

transaction log should be stored.

Prevent the loss of data due to disk failure.

the additional parameter(-m) and replicationslot specify, that its

synchronous mode.

All received WAL write after, flush is executed and reply flush
position.

What's the usecase for this? I can see some benefit in easier
testing of syncrep, but that's basically it?

When used with syncrep, standby server crashes, multiplexing of WAL

can be collateral.

Data loss can be to nearly zero.

I don't see how this gets pg_receivexlog much closer to a solution for
multiplexing WAL. Since there's no support for streaming data, removing
old WAL and such it seems to me you'd need to have something entirely
different anyway?
I'm not really averse to adding this feature (on the basis of easier
syncrep testing alone), but I don't like the arguments for it so far...

The problems of multiplexing WAL that I recognize are as follows.

1. Flushing multiple consecutively received records together. (implemented in pg_receivexlog)
2. Sending a feedback message that reports the flush position for every flush. (implemented in pg_receivexlog)
3. Establishing recovery steps that use pg_receivexlog.
4. Removing old WAL. (Remove the recycled or archived WAL)

First, it is not considered multiple WAL.
I will post a patch that flushes multiple consecutively received records together.

By increasing the frequency of flushes,
this patch reduces the data lost when the pg_receivexlog machine crashes.

I will also consider the other problems in turn.

Regards,

--
Furuya Osamu

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#18Fujii Masao
masao.fujii@gmail.com
In reply to: Noname (#16)
Re: pg_receivexlog add synchronous mode

On Mon, Jun 30, 2014 at 7:09 PM, <furuyao@pm.nttdata.co.jp> wrote:

Thanks for the review!

+            if (secs <= 0)
+                secs = 1;    /* Always sleep at least 1 sec */
+
+            sleeptime = secs * 1000 + usecs / 1000;

The above is the code which caused that problem. 'usecs' should have been
reset to zero when 'secs' are rounded up to 1 second. But not. Attached
is the updated version of the patch.

Thank you for the refactoring v2 patch.
I did a review of the patch.

1. The patch applied cleanly and compiled without warnings or errors.
2. All regression tests passed.
3. sleeptime is OK when --status-interval is set to 1.

Thanks for reviewing the patch!

I think that this refactoring patch is useful for improving source code
readability and making the future patches simpler, whether we adopt
your patch or not. So, barring any objections, I'm thinking to commit
this refactoring patch.

Regards,

--
Fujii Masao

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#19Fujii Masao
masao.fujii@gmail.com
In reply to: Fujii Masao (#18)
Re: pg_receivexlog add synchronous mode

On Tue, Jul 1, 2014 at 10:11 PM, Fujii Masao <masao.fujii@gmail.com> wrote:

On Mon, Jun 30, 2014 at 7:09 PM, <furuyao@pm.nttdata.co.jp> wrote:

Thanks for the review!

+            if (secs <= 0)
+                secs = 1;    /* Always sleep at least 1 sec */
+
+            sleeptime = secs * 1000 + usecs / 1000;

The above is the code which caused that problem. 'usecs' should have been
reset to zero when 'secs' are rounded up to 1 second. But not. Attached
is the updated version of the patch.

Thank you for the refactoring v2 patch.
I did a review of the patch.

1. The patch applied cleanly and compiled without warnings or errors.
2. All regression tests passed.
3. sleeptime is OK when --status-interval is set to 1.

Thanks for reviewing the patch!

I think that this refactoring patch is useful for improving source code
readability and making the future patches simpler, whether we adopt
your patch or not. So, barring any objections, I'm thinking to commit
this refactoring patch.

Committed!

Regards,

--
Fujii Masao

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#20Noname
furuyao@pm.nttdata.co.jp
In reply to: Fujii Masao (#19)
1 attachment(s)
Re: pg_receivexlog add synchronous mode

Thanks for reviewing the patch!

I think that this refactoring patch is useful for improving source
code readability and making the future patches simpler, whether we
adopt your patch or not. So, barring any objections, I'm thinking to
commit this refactoring patch.

Committed!

Attached is a patch that adds the --fsync-interval option.
The interface of the --fsync-interval option is modeled on "pg_recvlogical.c".

Whether to flush is not decided on a per-message basis.
It is decided at the point where incoming messages stop arriving.

If you specify --fsync-interval as zero, the flush is performed at the same timing as in walreceiver.
If you do not specify --fsync-interval, the flush is performed only when switching WAL files, as before.

Regards,

--
Furuya Osamu

Attachments:

pg_receivexlog-add-fsync-interval-v1.patchapplication/octet-stream; name=pg_receivexlog-add-fsync-interval-v1.patchDownload
*** a/doc/src/sgml/ref/pg_receivexlog.sgml
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 106,111 **** PostgreSQL documentation
--- 106,127 ----
       </varlistentry>
  
       <varlistentry>
+       <term><option>-F <replaceable>interval_seconds</replaceable></option></term>
+       <term><option>--fsync-interval=<replaceable>interval_seconds</replaceable></option></term>
+       <listitem>
+        <para>
+         How often should <application>pg_receivexlog</application> issue sync
+         commands to ensure the received WAL file is safely
+         flushed to disk without being asked by the server to do so. Specifying
+         an interval of <literal>0</literal> together the consecutive data.
+         Not specifying an interval disables issuing fsyncs altogether,
+         while still reporting progress the server.  In this case, data may be
+         lost in the event of a crash.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
        <term><option>-v</option></term>
        <term><option>--verbose</option></term>
        <listitem>
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 370,376 **** LogStreamerMain(logstreamer_param *param)
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL))
  
  		/*
  		 * Any errors will already have been reported in the function process,
--- 370,376 ----
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL, -1))
  
  		/*
  		 * Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 36,41 **** static char *basedir = NULL;
--- 36,42 ----
  static int	verbose = 0;
  static int	noloop = 0;
  static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
+ static int	fsync_interval = -1; /* Invalid = default */
  static volatile bool time_to_abort = false;
  
  
***************
*** 62,67 **** usage(void)
--- 63,70 ----
  	printf(_("\nOptions:\n"));
  	printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
  	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
+ 	printf(_("  -F  --fsync-interval=SECS\n"
+ 			 "                         frequency of syncs to the output file (default: file close only)\n"));
  	printf(_("  -v, --verbose          output verbose messages\n"));
  	printf(_("  -V, --version          output version information, then exit\n"));
  	printf(_("  -?, --help             show this help, then exit\n"));
***************
*** 330,336 **** StreamLog(void)
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial");
  
  	PQfinish(conn);
  }
--- 333,339 ----
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial", fsync_interval);
  
  	PQfinish(conn);
  }
***************
*** 360,365 **** main(int argc, char **argv)
--- 363,369 ----
  		{"port", required_argument, NULL, 'p'},
  		{"username", required_argument, NULL, 'U'},
  		{"no-loop", no_argument, NULL, 'n'},
+ 		{"fsync-interval", required_argument, NULL, 'F'},
  		{"no-password", no_argument, NULL, 'w'},
  		{"password", no_argument, NULL, 'W'},
  		{"status-interval", required_argument, NULL, 's'},
***************
*** 389,395 **** main(int argc, char **argv)
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
--- 393,399 ----
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nF:wWv",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
***************
*** 436,441 **** main(int argc, char **argv)
--- 440,454 ----
  			case 'n':
  				noloop = 1;
  				break;
+ 			case 'F':
+ 				fsync_interval = atoi(optarg) * 1000;
+ 				if (fsync_interval < 0)
+ 				{
+ 					fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
+ 							progname, optarg);
+ 					exit(1);
+ 				}
+ 				break;
  			case 'v':
  				verbose++;
  				break;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 30,46 **** static int	walfile = -1;
  static char current_walfile_name[MAXPGPATH] = "";
  static bool reportFlushPosition = false;
  static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
  
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos);
  static int CopyStreamPoll(PGconn *conn, long timeout_ms);
  static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
- 
  /*
   * Open a new WAL file in the specified directory.
   *
--- 30,46 ----
  static char current_walfile_name[MAXPGPATH] = "";
  static bool reportFlushPosition = false;
  static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
+ static int64 output_last_fsync = -1;
  
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos,int fsync_interval);
  static int CopyStreamPoll(PGconn *conn, long timeout_ms);
  static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
  /*
   * Open a new WAL file in the specified directory.
   *
***************
*** 419,431 **** CheckServerVersionForStreaming(PGconn *conn)
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix)
  {
  	char		query[128];
  	char		slotcmd[128];
--- 419,434 ----
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
+  * fsync_interval controls how often we flush to the received
+  * WAL file, in seconds.
+  *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix, int fsync_interval)
  {
  	char		query[128];
  	char		slotcmd[128];
***************
*** 570,576 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos);
  		if (res == NULL)
  			goto error;
  
--- 573,579 ----
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos, fsync_interval);
  		if (res == NULL)
  			goto error;
  
***************
*** 731,737 **** static PGresult *
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
--- 734,740 ----
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos, int fsync_interval)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
***************
*** 747,752 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
--- 750,757 ----
  		int64		now;
  		int			hdr_len;
  		long		sleeptime;
+ 		int64		message_target = 0;
+ 		int64		fsync_target = 0;
  
  		/*
  		 * Check if we should continue streaming, or abort at this point.
***************
*** 780,796 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				goto error;
  			last_status = now;
  		}
! 
! 		/*
! 		 * Compute how long send/receive loops should sleep
! 		 */
! 		if (standby_message_timeout && still_sending)
  		{
  			int64		targettime;
  			long		secs;
  			int			usecs;
  
! 			targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
  			feTimestampDifference(now,
  								  targettime,
  								  &secs,
--- 785,813 ----
  				goto error;
  			last_status = now;
  		}
! 		
! 		/* Compute when we need to wakeup to send a keepalive message. */
! 		if (standby_message_timeout)
! 			message_target = last_status + (standby_message_timeout - 1) *
! 				((int64) 1000);
! 
! 		/* Compute when we need to wakeup to fsync the output file. */
! 		if (fsync_interval > 0 && lastFlushPosition < blockpos)
! 			fsync_target = output_last_fsync + (fsync_interval - 1) *
! 				((int64) 1000);
! 
! 		/* Now compute when to wakeup. Compute how long send/receive loops should sleep*/
! 		if (still_sending && (message_target > 0 || fsync_target > 0))
  		{
  			int64		targettime;
  			long		secs;
  			int			usecs;
  
! 			targettime = message_target;
! 
! 			if (fsync_target > 0 && fsync_target < targettime)
! 				targettime = fsync_target;
! 
  			feTimestampDifference(now,
  								  targettime,
  								  &secs,
***************
*** 808,1016 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			sleeptime = -1;
  
  		r = CopyStreamReceive(conn, sleeptime, &copybuf);
! 		if (r == 0)
! 			continue;
! 		if (r == -1)
! 			goto error;
! 		if (r == -2)
  		{
! 			PGresult   *res = PQgetResult(conn);
! 
! 			/*
! 			 * The server closed its end of the copy stream.  If we haven't
! 			 * closed ours already, we need to do so now, unless the server
! 			 * threw an error, in which case we don't.
! 			 */
! 			if (still_sending)
  			{
! 				if (!close_walfile(basedir, partial_suffix, blockpos))
  				{
! 					/* Error message written in close_walfile() */
! 					PQclear(res);
  					goto error;
  				}
! 				if (PQresultStatus(res) == PGRES_COPY_IN)
  				{
! 					if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! 					{
! 						fprintf(stderr,
! 								_("%s: could not send copy-end packet: %s"),
! 								progname, PQerrorMessage(conn));
! 						PQclear(res);
  						goto error;
! 					}
! 					res = PQgetResult(conn);
  				}
- 				still_sending = false;
- 			}
- 			if (copybuf != NULL)
- 				PQfreemem(copybuf);
- 			copybuf = NULL;
- 			*stoppos = blockpos;
- 			return res;
- 		}
- 
- 		/* Check the message type. */
- 		if (copybuf[0] == 'k')
- 		{
- 			int			pos;
- 			bool		replyRequested;
- 
- 			/*
- 			 * Parse the keepalive message, enclosed in the CopyData message.
- 			 * We just check if the server requested a reply, and ignore the
- 			 * rest.
- 			 */
- 			pos = 1;			/* skip msgtype 'k' */
- 			pos += 8;			/* skip walEnd */
- 			pos += 8;			/* skip sendTime */
- 
- 			if (r < pos + 1)
- 			{
- 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
- 						progname, r);
- 				goto error;
  			}
! 			replyRequested = copybuf[pos];
! 
! 			/* If the server requested an immediate reply, send one. */
! 			if (replyRequested && still_sending)
  			{
! 				now = feGetCurrentTimestamp();
! 				if (!sendFeedback(conn, blockpos, now, false))
! 					goto error;
! 				last_status = now;
! 			}
! 		}
! 		else if (copybuf[0] == 'w')
! 		{
! 			/*
! 			 * Once we've decided we don't want to receive any more, just
! 			 * ignore any subsequent XLogData messages.
! 			 */
! 			if (!still_sending)
! 				continue;
! 
! 			/*
! 			 * Read the header of the XLogData message, enclosed in the
! 			 * CopyData message. We only need the WAL location field
! 			 * (dataStart), the rest of the header is ignored.
! 			 */
! 			hdr_len = 1;		/* msgtype 'w' */
! 			hdr_len += 8;		/* dataStart */
! 			hdr_len += 8;		/* walEnd */
! 			hdr_len += 8;		/* sendTime */
! 			if (r < hdr_len)
! 			{
! 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 						progname, r);
! 				goto error;
! 			}
! 			blockpos = fe_recvint64(&copybuf[1]);
! 
! 			/* Extract WAL location for this block */
! 			xlogoff = blockpos % XLOG_SEG_SIZE;
  
! 			/*
! 			 * Verify that the initial location in the stream matches where we
! 			 * think we are.
! 			 */
! 			if (walfile == -1)
! 			{
! 				/* No file open yet */
! 				if (xlogoff != 0)
! 				{
! 					fprintf(stderr,
! 							_("%s: received transaction log record for offset %u with no file open\n"),
! 							progname, xlogoff);
! 					goto error;
! 				}
! 			}
! 			else
! 			{
! 				/* More data in existing segment */
! 				/* XXX: store seek value don't reseek all the time */
! 				if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
  				{
! 					fprintf(stderr,
! 						  _("%s: got WAL data offset %08x, expected %08x\n"),
! 					   progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
  					goto error;
  				}
! 			}
! 
! 			bytes_left = r - hdr_len;
! 			bytes_written = 0;
  
! 			while (bytes_left)
! 			{
! 				int			bytes_to_write;
  
  				/*
! 				 * If crossing a WAL boundary, only write up until we reach
! 				 * XLOG_SEG_SIZE.
  				 */
- 				if (xlogoff + bytes_left > XLOG_SEG_SIZE)
- 					bytes_to_write = XLOG_SEG_SIZE - xlogoff;
- 				else
- 					bytes_to_write = bytes_left;
- 
  				if (walfile == -1)
  				{
! 					if (!open_walfile(blockpos, timeline,
! 									  basedir, partial_suffix))
  					{
! 						/* Error logged by open_walfile */
  						goto error;
  					}
  				}
! 
! 				if (write(walfile,
! 						  copybuf + hdr_len + bytes_written,
! 						  bytes_to_write) != bytes_to_write)
  				{
! 					fprintf(stderr,
! 							_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! 							progname, bytes_to_write, current_walfile_name,
! 							strerror(errno));
! 					goto error;
  				}
  
! 				/* Write was successful, advance our position */
! 				bytes_written += bytes_to_write;
! 				bytes_left -= bytes_to_write;
! 				blockpos += bytes_to_write;
! 				xlogoff += bytes_to_write;
  
! 				/* Did we reach the end of a WAL segment? */
! 				if (blockpos % XLOG_SEG_SIZE == 0)
  				{
! 					if (!close_walfile(basedir, partial_suffix, blockpos))
! 						/* Error message written in close_walfile() */
  						goto error;
  
! 					xlogoff = 0;
  
! 					if (still_sending && stream_stop(blockpos, timeline, false))
  					{
! 						if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! 						{
! 							fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! 									progname, PQerrorMessage(conn));
  							goto error;
  						}
- 						still_sending = false;
- 						break;	/* ignore the rest of this XLogData packet */
  					}
  				}
  			}
! 			/* No more data left to write, receive next copy packet */
  		}
! 		else
  		{
! 			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
! 					progname, copybuf[0]);
  			goto error;
  		}
  	}
  
--- 825,1061 ----
  			sleeptime = -1;
  
  		r = CopyStreamReceive(conn, sleeptime, &copybuf);
! 		while(r > 0)
  		{
! 			/* Check the message type. */
! 			if (copybuf[0] == 'k')
  			{
! 				int			pos;
! 				bool		replyRequested;
! 
! 				/*
! 				 * Parse the keepalive message, enclosed in the CopyData message.
! 				 * We just check if the server requested a reply, and ignore the
! 				 * rest.
! 				 */
! 				pos = 1;			/* skip msgtype 'k' */
! 				pos += 8;			/* skip walEnd */
! 				pos += 8;			/* skip sendTime */
! 
! 				if (r < pos + 1)
  				{
! 					fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 							progname, r);
  					goto error;
  				}
! 				replyRequested = copybuf[pos];
! 
! 				/* If the server requested an immediate reply, send one. */
! 				if (replyRequested && still_sending)
  				{
! 					now = feGetCurrentTimestamp();
! 					if (!sendFeedback(conn, blockpos, now, false))
  						goto error;
! 					last_status = now;
  				}
  			}
! 			else if (copybuf[0] == 'w')
  			{
! 				/*
! 				 * Once we've decided we don't want to receive any more, just
! 				 * ignore any subsequent XLogData messages.
! 				 */
! 				if (!still_sending)
! 					break;
  
! 				/*
! 				 * Read the header of the XLogData message, enclosed in the
! 				 * CopyData message. We only need the WAL location field
! 				 * (dataStart), the rest of the header is ignored.
! 				 */
! 				hdr_len = 1;		/* msgtype 'w' */
! 				hdr_len += 8;		/* dataStart */
! 				hdr_len += 8;		/* walEnd */
! 				hdr_len += 8;		/* sendTime */
! 				if (r < hdr_len)
  				{
! 					fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 							progname, r);
  					goto error;
  				}
! 				blockpos = fe_recvint64(&copybuf[1]);
  
! 				/* Extract WAL location for this block */
! 				xlogoff = blockpos % XLOG_SEG_SIZE;
  
  				/*
! 				 * Verify that the initial location in the stream matches where we
! 				 * think we are.
  				 */
  				if (walfile == -1)
  				{
! 					/* No file open yet */
! 					if (xlogoff != 0)
  					{
! 						fprintf(stderr,
! 								_("%s: received transaction log record for offset %u with no file open\n"),
! 								progname, xlogoff);
  						goto error;
  					}
  				}
! 				else
  				{
! 					/* More data in existing segment */
! 					/* XXX: store seek value don't reseek all the time */
! 					if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
! 					{
! 						fprintf(stderr,
! 							  _("%s: got WAL data offset %08x, expected %08x\n"),
! 						   progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
! 						goto error;
! 					}
  				}
  
! 				bytes_left = r - hdr_len;
! 				bytes_written = 0;
  
! 				while (bytes_left)
  				{
! 					int			bytes_to_write;
! 
! 					/*
! 					 * If crossing a WAL boundary, only write up until we reach
! 					 * XLOG_SEG_SIZE.
! 					 */
! 					if (xlogoff + bytes_left > XLOG_SEG_SIZE)
! 						bytes_to_write = XLOG_SEG_SIZE - xlogoff;
! 					else
! 						bytes_to_write = bytes_left;
! 
! 					if (walfile == -1)
! 					{
! 						if (!open_walfile(blockpos, timeline,
! 										  basedir, partial_suffix))
! 						{
! 							/* Error logged by open_walfile */
! 							goto error;
! 						}
! 					}
! 
! 					if (write(walfile,
! 							  copybuf + hdr_len + bytes_written,
! 							  bytes_to_write) != bytes_to_write)
! 					{
! 						fprintf(stderr,
! 								_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! 								progname, bytes_to_write, current_walfile_name,
! 								strerror(errno));
  						goto error;
+ 					}
  
! 					/* Write was successful, advance our position */
! 					bytes_written += bytes_to_write;
! 					bytes_left -= bytes_to_write;
! 					blockpos += bytes_to_write;
! 					xlogoff += bytes_to_write;
  
! 					/* Did we reach the end of a WAL segment? */
! 					if (blockpos % XLOG_SEG_SIZE == 0)
  					{
! 						if (!close_walfile(basedir, partial_suffix, blockpos))
! 							/* Error message written in close_walfile() */
  							goto error;
+ 
+ 						xlogoff = 0;
+ 
+ 						if (still_sending && stream_stop(blockpos, timeline, false))
+ 						{
+ 							if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+ 							{
+ 								fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+ 										progname, PQerrorMessage(conn));
+ 								goto error;
+ 							}
+ 							still_sending = false;
+ 							break;	/* ignore the rest of this XLogData packet */
  						}
  					}
  				}
+ 				/* No more data left to write, receive next copy packet */
+ 			}
+ 			else
+ 			{
+ 				fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+ 								progname, copybuf[0]);
+ 				goto error;
  			}
! 			r = CopyStreamReceive(conn, -1, &copybuf);
  		}
! 		if (r == 0)
  		{
! 			/* --fsync-interval argument has been specified */
! 			if (fsync_interval >= 0)
! 			{
! 				 /* interval has been specified */
! 				if (fsync_interval > 0)
! 				{
! 					now = feGetCurrentTimestamp();
! 					if (!feTimestampDifferenceExceeds(output_last_fsync, now, fsync_interval))
! 						continue;
! 					output_last_fsync = now;
! 				}
! 				/* check the need for flush */
! 				if (walfile != -1 && lastFlushPosition < blockpos)
! 				{
! 					if (fsync(walfile) != 0)
! 					{
! 						fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
! 										progname, current_walfile_name, strerror(errno));
! 						goto error;
! 					}
! 					lastFlushPosition = blockpos;
! 				}
! 			}
! 			continue;
! 		}
! 		if (r == -1)
  			goto error;
+ 		if (r == -2)
+ 		{
+ 			PGresult   *res = PQgetResult(conn);
+ 
+ 			/*
+ 			 * The server closed its end of the copy stream.  If we haven't
+ 			 * closed ours already, we need to do so now, unless the server
+ 			 * threw an error, in which case we don't.
+ 			 */
+ 			if (still_sending)
+ 			{
+ 				if (!close_walfile(basedir, partial_suffix, blockpos))
+ 				{
+ 					/* Error message written in close_walfile() */
+ 					PQclear(res);
+ 					goto error;
+ 				}
+ 				if (PQresultStatus(res) == PGRES_COPY_IN)
+ 				{
+ 					if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+ 					{
+ 						fprintf(stderr,
+ 							_("%s: could not send copy-end packet: %s"),
+ 							progname, PQerrorMessage(conn));
+ 						PQclear(res);
+ 						goto error;
+ 					}
+ 					res = PQgetResult(conn);
+ 				}
+ 				still_sending = false;
+ 			}
+ 			if (copybuf != NULL)
+ 				PQfreemem(copybuf);
+ 			copybuf = NULL;
+ 			*stoppos = blockpos;
+ 			return res;
  		}
  	}
  
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 16,19 **** extern bool ReceiveXlogStream(PGconn *conn,
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix);
--- 16,20 ----
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix,
! 				  int fsync_interval);
#21Fujii Masao
masao.fujii@gmail.com
In reply to: Noname (#20)
Re: pg_receivexlog add synchronous mode

On Fri, Jul 4, 2014 at 7:45 PM, <furuyao@pm.nttdata.co.jp> wrote:

Thanks for reviewing the patch!

I think that this refactoring patch is useful for improving source
code readability and making the future patches simpler, whether we
adopt your patch or not. So, barring any objections, I'm thinking to
commit this refactoring patch.

Committed!

This patch adds the --fsync-interval option.
The interface of the --fsync-interval option is modeled on the one in "pg_recvlogical.c".

Whether to flush is not decided on a per-message basis.
It is decided at the point where the receipt of messages stops.

If you specify --fsync-interval as zero, the flush is performed at the same timing as in the walreceiver.
If you do not specify --fsync-interval, WAL files are flushed only at WAL file switch, as in the past.

Thanks for revising the patch!

Could you update the status of this patch from "Waiting on Author" to
"Needs Review" when you post the revised version of the patch?

+        How often should <application>pg_receivexlog</application> issue sync
+        commands to ensure the received WAL file is safely
+        flushed to disk without being asked by the server to do so.

"without being asked by the server to do so" implies that the server asks
pg_receivexlog to flush WAL files periodically?

Specifying
+ an interval of <literal>0</literal> together the consecutive data.

This text looks corrupted. What does this mean?

+        Not specifying an interval disables issuing fsyncs altogether,
+        while still reporting progress the server.

No. Even when the option is not specified, WAL files should be flushed at
WAL file switch, like current pg_receivexlog does. If you want to disable
the flush completely, you can change the option so that it accepts -1 which
means to disable the flush, for example.

+    printf(_("  -F  --fsync-interval=SECS\n"
+             "                         frequency of syncs to the
output file (default: file close only)\n"));

It's better to use "transaction log files" rather than "output file" here.

SECS should be INTERVAL for the sake of consistency with --stat-interval's
help message?

+ * fsync_interval controls how often we flush to the received
+ * WAL file, in seconds.

"seconds" should be "milliseconds"?

The patch changes pg_receivexlog so that it keeps processing
the continuous messages without calling stream_stop(). But
as I commented before, stream_stop should be called every time
the message is received? Otherwise pg_basebackup background
WAL receiver might not be able to stop streaming at proper point.

The flush interval is checked and WAL files are flushed only when
CopyStreamReceive() returns 0, i.e., there is no message available
and the timeout occurs. Why did you do that? I'm afraid that
CopyStreamReceive() always waits for at least one second before
WAL files are flushed even when --fsync-interval is set to 0.

Why don't you update output_last_status when WAL file is flushed
and closed?

Regards,

--
Fujii Masao

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#22Noname
furuyao@pm.nttdata.co.jp
In reply to: Fujii Masao (#21)
1 attachment(s)
Re: pg_receivexlog add synchronous mode

This patch has been revised according to the review comments.
Each comment is addressed as follows.

Could you update the status of this patch from "Waiting on Author" to
"Needs Review" when you post the revised version of the patch?

Thank you for pointing out.
From now on, I will update status when I post the patch.

+        How often should <application>pg_receivexlog</application>
issue sync
+        commands to ensure the received WAL file is safely
+        flushed to disk without being asked by the server to do so.

"without being asked by the server to do so" implies that the server asks
pg_receivexlog to flush WAL files periodically?

I think that the sentence means the following.
Without waiting for a feedback request from the server, select() times out and the need for a flush is checked.

Because it causes misunderstanding, I have deleted the sentence.

Specifying
+ an interval of <literal>0</literal> together the consecutive
data.

This text looks corrupted. What does this mean?

+        Not specifying an interval disables issuing fsyncs altogether,
+        while still reporting progress the server.

No. Even when the option is not specified, WAL files should be flushed
at WAL file switch, like current pg_receivexlog does. If you want to
disable the flush completely, you can change the option so that it accepts
-1 which means to disable the flush, for example.

Fixed the description to say "issuing fsyncs at only WAL file close".

+    printf(_("  -F  --fsync-interval=SECS\n"
+             "                         frequency of syncs to the
output file (default: file close only)\n"));

It's better to use "transaction log files" rather than "output file"
here.

Fixed as you pointed out.

SECS should be INTERVAL for the sake of consistency with
--stat-interval's help message?

Fixed as you pointed out.

+ * fsync_interval controls how often we flush to the received
+ * WAL file, in seconds.

"seconds" should be "milliseconds"?

Fixed as you pointed out.

The patch changes pg_receivexlog so that it keeps processing the
continuous messages without calling stream_stop(). But as I commented
before, stream_stop should be called every time the message is received?
Otherwise pg_basebackup background WAL receiver might not be able to stop
streaming at proper point.

Fixed so that stream_stop() is called each time the processing of one message is complete.

The flush interval is checked and WAL files are flushed only when
CopyStreamReceive() returns 0, i.e., there is no message available and
the timeout occurs. Why did you do that? I'm afraid that
CopyStreamReceive() always waits for at least one second before WAL files
are flushed even when --fsync-interval is set to 0.

As in pg_recvlogical, CopyStreamReceive() waits for the shorter of the
--fsync-interval and --status-interval intervals.
Regarding specifying an interval of zero:
a flush is issued after each run of consecutive messages, so waiting one second is not a problem.

Why don't you update output_last_status when WAL file is flushed and
closed?

Regarding the close, I added the update step.
Regarding the flush, following pg_recvlogical, the update is performed after the interval check and before the flush.
Therefore I did not modify it.

Regards,

--
Furuya Osamu

Attachments:

pg_receivexlog-add-fsync-interval-v2.patchapplication/octet-stream; name=pg_receivexlog-add-fsync-interval-v2.patchDownload
*** a/doc/src/sgml/ref/pg_receivexlog.sgml
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 106,111 **** PostgreSQL documentation
--- 106,126 ----
       </varlistentry>
  
       <varlistentry>
+       <term><option>-F <replaceable>interval_seconds</replaceable></option></term>
+       <term><option>--fsync-interval=<replaceable>interval_seconds</replaceable></option></term>
+       <listitem>
+        <para>
+         How often should <application>pg_receivexlog</application> issue sync
+         commands to ensure the received WAL file is safely flushed to disk. 
+         Specifying an interval of <literal>0</literal> issuing fsyncs at 
+         every consecutive data. Not specifying an interval issuing fsyncs 
+         at only  WAL file close, while still reporting progress the server. 
+         In this case, data may be lost in the event of a crash.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
        <term><option>-v</option></term>
        <term><option>--verbose</option></term>
        <listitem>
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 370,376 **** LogStreamerMain(logstreamer_param *param)
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL))
  
  		/*
  		 * Any errors will already have been reported in the function process,
--- 370,376 ----
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL, -1))
  
  		/*
  		 * Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 36,41 **** static char *basedir = NULL;
--- 36,42 ----
  static int	verbose = 0;
  static int	noloop = 0;
  static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
+ static int	fsync_interval = -1; /* Invalid = default */
  static volatile bool time_to_abort = false;
  
  
***************
*** 62,67 **** usage(void)
--- 63,71 ----
  	printf(_("\nOptions:\n"));
  	printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
  	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
+ 	printf(_("  -F  --fsync-interval=INTERVAL\n"
+ 			 "                         frequency of syncs to the transaction log files (in seconds)\n"
+ 			 "                         (default: file close only)\n"));
  	printf(_("  -v, --verbose          output verbose messages\n"));
  	printf(_("  -V, --version          output version information, then exit\n"));
  	printf(_("  -?, --help             show this help, then exit\n"));
***************
*** 330,336 **** StreamLog(void)
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial");
  
  	PQfinish(conn);
  }
--- 334,340 ----
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial", fsync_interval);
  
  	PQfinish(conn);
  }
***************
*** 360,365 **** main(int argc, char **argv)
--- 364,370 ----
  		{"port", required_argument, NULL, 'p'},
  		{"username", required_argument, NULL, 'U'},
  		{"no-loop", no_argument, NULL, 'n'},
+ 		{"fsync-interval", required_argument, NULL, 'F'},
  		{"no-password", no_argument, NULL, 'w'},
  		{"password", no_argument, NULL, 'W'},
  		{"status-interval", required_argument, NULL, 's'},
***************
*** 389,395 **** main(int argc, char **argv)
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
--- 394,400 ----
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nF:wWv",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
***************
*** 436,441 **** main(int argc, char **argv)
--- 441,455 ----
  			case 'n':
  				noloop = 1;
  				break;
+ 			case 'F':
+ 				fsync_interval = atoi(optarg) * 1000;
+ 				if (fsync_interval < 0)
+ 				{
+ 					fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
+ 							progname, optarg);
+ 					exit(1);
+ 				}
+ 				break;
  			case 'v':
  				verbose++;
  				break;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 30,40 **** static int	walfile = -1;
  static char current_walfile_name[MAXPGPATH] = "";
  static bool reportFlushPosition = false;
  static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
  
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos);
  static int CopyStreamPoll(PGconn *conn, long timeout_ms);
  static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
  
--- 30,41 ----
  static char current_walfile_name[MAXPGPATH] = "";
  static bool reportFlushPosition = false;
  static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
+ static int64 output_last_fsync = -1;
  
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos,int fsync_interval);
  static int CopyStreamPoll(PGconn *conn, long timeout_ms);
  static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
  
***************
*** 187,193 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
  		fprintf(stderr,
  				_("%s: not renaming \"%s%s\", segment is not complete\n"),
  				progname, current_walfile_name, partial_suffix);
! 
  	lastFlushPosition = pos;
  	return true;
  }
--- 188,194 ----
  		fprintf(stderr,
  				_("%s: not renaming \"%s%s\", segment is not complete\n"),
  				progname, current_walfile_name, partial_suffix);
! 	output_last_fsync = feGetCurrentTimestamp();
  	lastFlushPosition = pos;
  	return true;
  }
***************
*** 419,431 **** CheckServerVersionForStreaming(PGconn *conn)
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix)
  {
  	char		query[128];
  	char		slotcmd[128];
--- 420,435 ----
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
+  * fsync_interval controls how often we flush to the received
+  * WAL file, in milliseconds.
+  *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix, int fsync_interval)
  {
  	char		query[128];
  	char		slotcmd[128];
***************
*** 570,576 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos);
  		if (res == NULL)
  			goto error;
  
--- 574,580 ----
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos, fsync_interval);
  		if (res == NULL)
  			goto error;
  
***************
*** 731,737 **** static PGresult *
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
--- 735,741 ----
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos, int fsync_interval)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
***************
*** 747,752 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
--- 751,758 ----
  		int64		now;
  		int			hdr_len;
  		long		sleeptime;
+ 		int64		message_target = 0;
+ 		int64		fsync_target = 0;
  
  		/*
  		 * Check if we should continue streaming, or abort at this point.
***************
*** 780,796 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				goto error;
  			last_status = now;
  		}
! 
! 		/*
! 		 * Compute how long send/receive loops should sleep
! 		 */
! 		if (standby_message_timeout && still_sending)
  		{
  			int64		targettime;
  			long		secs;
  			int			usecs;
  
! 			targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
  			feTimestampDifference(now,
  								  targettime,
  								  &secs,
--- 786,814 ----
  				goto error;
  			last_status = now;
  		}
! 		
! 		/* Compute when we need to wakeup to send a keepalive message. */
! 		if (standby_message_timeout)
! 			message_target = last_status + (standby_message_timeout - 1) *
! 				((int64) 1000);
! 
! 		/* Compute when we need to wakeup to fsync the output file. */
! 		if (fsync_interval > 0 && lastFlushPosition < blockpos)
! 			fsync_target = output_last_fsync + (fsync_interval - 1) *
! 				((int64) 1000);
! 
! 		/* Now compute when to wakeup. Compute how long send/receive loops should sleep*/
! 		if (still_sending && (message_target > 0 || fsync_target > 0))
  		{
  			int64		targettime;
  			long		secs;
  			int			usecs;
  
! 			targettime = message_target;
! 
! 			if (fsync_target > 0 && fsync_target < targettime)
! 				targettime = fsync_target;
! 
  			feTimestampDifference(now,
  								  targettime,
  								  &secs,
***************
*** 808,1016 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			sleeptime = -1;
  
  		r = CopyStreamReceive(conn, sleeptime, &copybuf);
! 		if (r == 0)
! 			continue;
! 		if (r == -1)
! 			goto error;
! 		if (r == -2)
  		{
! 			PGresult   *res = PQgetResult(conn);
  
! 			/*
! 			 * The server closed its end of the copy stream.  If we haven't
! 			 * closed ours already, we need to do so now, unless the server
! 			 * threw an error, in which case we don't.
! 			 */
! 			if (still_sending)
  			{
! 				if (!close_walfile(basedir, partial_suffix, blockpos))
  				{
! 					/* Error message written in close_walfile() */
! 					PQclear(res);
  					goto error;
  				}
! 				if (PQresultStatus(res) == PGRES_COPY_IN)
  				{
! 					if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
  					{
  						fprintf(stderr,
! 								_("%s: could not send copy-end packet: %s"),
! 								progname, PQerrorMessage(conn));
! 						PQclear(res);
  						goto error;
  					}
- 					res = PQgetResult(conn);
  				}
- 				still_sending = false;
- 			}
- 			if (copybuf != NULL)
- 				PQfreemem(copybuf);
- 			copybuf = NULL;
- 			*stoppos = blockpos;
- 			return res;
- 		}
  
! 		/* Check the message type. */
! 		if (copybuf[0] == 'k')
! 		{
! 			int			pos;
! 			bool		replyRequested;
  
! 			/*
! 			 * Parse the keepalive message, enclosed in the CopyData message.
! 			 * We just check if the server requested a reply, and ignore the
! 			 * rest.
! 			 */
! 			pos = 1;			/* skip msgtype 'k' */
! 			pos += 8;			/* skip walEnd */
! 			pos += 8;			/* skip sendTime */
  
! 			if (r < pos + 1)
! 			{
! 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 						progname, r);
! 				goto error;
! 			}
! 			replyRequested = copybuf[pos];
  
! 			/* If the server requested an immediate reply, send one. */
! 			if (replyRequested && still_sending)
! 			{
! 				now = feGetCurrentTimestamp();
! 				if (!sendFeedback(conn, blockpos, now, false))
! 					goto error;
! 				last_status = now;
! 			}
! 		}
! 		else if (copybuf[0] == 'w')
! 		{
! 			/*
! 			 * Once we've decided we don't want to receive any more, just
! 			 * ignore any subsequent XLogData messages.
! 			 */
! 			if (!still_sending)
! 				continue;
  
! 			/*
! 			 * Read the header of the XLogData message, enclosed in the
! 			 * CopyData message. We only need the WAL location field
! 			 * (dataStart), the rest of the header is ignored.
! 			 */
! 			hdr_len = 1;		/* msgtype 'w' */
! 			hdr_len += 8;		/* dataStart */
! 			hdr_len += 8;		/* walEnd */
! 			hdr_len += 8;		/* sendTime */
! 			if (r < hdr_len)
! 			{
! 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 						progname, r);
! 				goto error;
! 			}
! 			blockpos = fe_recvint64(&copybuf[1]);
  
! 			/* Extract WAL location for this block */
! 			xlogoff = blockpos % XLOG_SEG_SIZE;
  
! 			/*
! 			 * Verify that the initial location in the stream matches where we
! 			 * think we are.
! 			 */
! 			if (walfile == -1)
! 			{
! 				/* No file open yet */
! 				if (xlogoff != 0)
! 				{
! 					fprintf(stderr,
! 							_("%s: received transaction log record for offset %u with no file open\n"),
! 							progname, xlogoff);
! 					goto error;
  				}
  			}
  			else
  			{
! 				/* More data in existing segment */
! 				/* XXX: store seek value don't reseek all the time */
! 				if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
  				{
! 					fprintf(stderr,
! 						  _("%s: got WAL data offset %08x, expected %08x\n"),
! 					   progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
  					goto error;
  				}
  			}
! 
! 			bytes_left = r - hdr_len;
! 			bytes_written = 0;
! 
! 			while (bytes_left)
  			{
! 				int			bytes_to_write;
! 
! 				/*
! 				 * If crossing a WAL boundary, only write up until we reach
! 				 * XLOG_SEG_SIZE.
! 				 */
! 				if (xlogoff + bytes_left > XLOG_SEG_SIZE)
! 					bytes_to_write = XLOG_SEG_SIZE - xlogoff;
! 				else
! 					bytes_to_write = bytes_left;
! 
! 				if (walfile == -1)
  				{
! 					if (!open_walfile(blockpos, timeline,
! 									  basedir, partial_suffix))
  					{
! 						/* Error logged by open_walfile */
  						goto error;
  					}
  				}
  
! 				if (write(walfile,
! 						  copybuf + hdr_len + bytes_written,
! 						  bytes_to_write) != bytes_to_write)
  				{
! 					fprintf(stderr,
! 							_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! 							progname, bytes_to_write, current_walfile_name,
! 							strerror(errno));
  					goto error;
  				}
! 
! 				/* Write was successful, advance our position */
! 				bytes_written += bytes_to_write;
! 				bytes_left -= bytes_to_write;
! 				blockpos += bytes_to_write;
! 				xlogoff += bytes_to_write;
! 
! 				/* Did we reach the end of a WAL segment? */
! 				if (blockpos % XLOG_SEG_SIZE == 0)
  				{
! 					if (!close_walfile(basedir, partial_suffix, blockpos))
! 						/* Error message written in close_walfile() */
! 						goto error;
! 
! 					xlogoff = 0;
! 
! 					if (still_sending && stream_stop(blockpos, timeline, false))
  					{
! 						if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! 						{
! 							fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! 									progname, PQerrorMessage(conn));
! 							goto error;
! 						}
! 						still_sending = false;
! 						break;	/* ignore the rest of this XLogData packet */
  					}
  				}
  			}
! 			/* No more data left to write, receive next copy packet */
! 		}
! 		else
! 		{
! 			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
! 					progname, copybuf[0]);
! 			goto error;
  		}
  	}
  
--- 826,1073 ----
  			sleeptime = -1;
  
  		r = CopyStreamReceive(conn, sleeptime, &copybuf);
! 		while(r > 0)
  		{
! 			/* Check the message type. */
! 			if (copybuf[0] == 'k')
! 			{
! 				int			pos;
! 				bool		replyRequested;
  
! 				/*
! 				 * Parse the keepalive message, enclosed in the CopyData message.
! 				 * We just check if the server requested a reply, and ignore the
! 				 * rest.
! 				 */
! 				pos = 1;			/* skip msgtype 'k' */
! 				pos += 8;			/* skip walEnd */
! 				pos += 8;			/* skip sendTime */
! 
! 				if (r < pos + 1)
! 				{
! 					fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 							progname, r);
! 					goto error;
! 				}
! 				replyRequested = copybuf[pos];
! 
! 				/* If the server requested an immediate reply, send one. */
! 				if (replyRequested && still_sending)
! 				{
! 					now = feGetCurrentTimestamp();
! 					if (!sendFeedback(conn, blockpos, now, false))
! 						goto error;
! 					last_status = now;
! 				}
! 			}
! 			else if (copybuf[0] == 'w')
  			{
! 				/*
! 				 * Once we've decided we don't want to receive any more, just
! 				 * ignore any subsequent XLogData messages.
! 				 */
! 				if (!still_sending)
! 					break;
! 
! 				/*
! 				 * Read the header of the XLogData message, enclosed in the
! 				 * CopyData message. We only need the WAL location field
! 				 * (dataStart), the rest of the header is ignored.
! 				 */
! 				hdr_len = 1;		/* msgtype 'w' */
! 				hdr_len += 8;		/* dataStart */
! 				hdr_len += 8;		/* walEnd */
! 				hdr_len += 8;		/* sendTime */
! 				if (r < hdr_len)
  				{
! 					fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 							progname, r);
  					goto error;
  				}
! 				blockpos = fe_recvint64(&copybuf[1]);
! 
! 				/* Extract WAL location for this block */
! 				xlogoff = blockpos % XLOG_SEG_SIZE;
! 
! 				/*
! 				 * Verify that the initial location in the stream matches where we
! 				 * think we are.
! 				 */
! 				if (walfile == -1)
  				{
! 					/* No file open yet */
! 					if (xlogoff != 0)
  					{
  						fprintf(stderr,
! 								_("%s: received transaction log record for offset %u with no file open\n"),
! 								progname, xlogoff);
! 						goto error;
! 					}
! 				}
! 				else
! 				{
! 					/* More data in existing segment */
! 					/* XXX: store seek value don't reseek all the time */
! 					if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
! 					{
! 						fprintf(stderr,
! 							  _("%s: got WAL data offset %08x, expected %08x\n"),
! 						   progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
  						goto error;
  					}
  				}
  
! 				bytes_left = r - hdr_len;
! 				bytes_written = 0;
  
! 				while (bytes_left)
! 				{
! 					int			bytes_to_write;
! 
! 					/*
! 					 * If crossing a WAL boundary, only write up until we reach
! 					 * XLOG_SEG_SIZE.
! 					 */
! 					if (xlogoff + bytes_left > XLOG_SEG_SIZE)
! 						bytes_to_write = XLOG_SEG_SIZE - xlogoff;
! 					else
! 						bytes_to_write = bytes_left;
! 
! 					if (walfile == -1)
! 					{
! 						if (!open_walfile(blockpos, timeline,
! 										  basedir, partial_suffix))
! 						{
! 							/* Error logged by open_walfile */
! 							goto error;
! 						}
! 					}
  
! 					if (write(walfile,
! 							  copybuf + hdr_len + bytes_written,
! 							  bytes_to_write) != bytes_to_write)
! 					{
! 						fprintf(stderr,
! 								_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! 								progname, bytes_to_write, current_walfile_name,
! 								strerror(errno));
! 						goto error;
! 					}
  
! 					/* Write was successful, advance our position */
! 					bytes_written += bytes_to_write;
! 					bytes_left -= bytes_to_write;
! 					blockpos += bytes_to_write;
! 					xlogoff += bytes_to_write;
  
! 					/* Did we reach the end of a WAL segment? */
! 					if (blockpos % XLOG_SEG_SIZE == 0)
! 					{
! 						if (!close_walfile(basedir, partial_suffix, blockpos))
! 							/* Error message written in close_walfile() */
! 							goto error;
  
! 						xlogoff = 0;
  
! 						if (still_sending && stream_stop(blockpos, timeline, false))
! 						{
! 							if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! 							{
! 								fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! 										progname, PQerrorMessage(conn));
! 								goto error;
! 							}
! 							still_sending = false;
! 							break;	/* ignore the rest of this XLogData packet */
! 						}
! 					}
  				}
+ 				/* No more data left to write, receive next copy packet */
  			}
  			else
  			{
! 				fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
! 								progname, copybuf[0]);
! 				goto error;
! 			}
! 			if (still_sending && stream_stop(blockpos, timeline, false))
! 			{
! 				if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
  				{
! 					fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! 							progname, PQerrorMessage(conn));
  					goto error;
  				}
+ 				still_sending = false;
+ 				break;  /* ignore the rest of this XLogData packet */
  			}
! 			r = CopyStreamReceive(conn, -1, &copybuf);
! 		}
! 		if (r == 0)
! 		{
! 			/* --fsync-interval argument has been specified */
! 			if (fsync_interval >= 0)
  			{
! 				 /* interval has been specified */
! 				if (fsync_interval > 0)
! 				{
! 					now = feGetCurrentTimestamp();
! 					if (!feTimestampDifferenceExceeds(output_last_fsync, now, fsync_interval))
! 						continue;
! 					output_last_fsync = now;
! 				}
! 				/* check the need for flush */
! 				if (walfile != -1 && lastFlushPosition < blockpos)
  				{
! 					if (fsync(walfile) != 0)
  					{
! 						fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
! 										progname, current_walfile_name, strerror(errno));
  						goto error;
  					}
+ 					lastFlushPosition = blockpos;
  				}
+ 			}
+ 			continue;
+ 		}
+ 		if (r == -1)
+ 			goto error;
+ 		if (r == -2)
+ 		{
+ 			PGresult   *res = PQgetResult(conn);
  
! 			/*
! 			 * The server closed its end of the copy stream.  If we haven't
! 			 * closed ours already, we need to do so now, unless the server
! 			 * threw an error, in which case we don't.
! 			 */
! 			if (still_sending)
! 			{
! 				if (!close_walfile(basedir, partial_suffix, blockpos))
  				{
! 					/* Error message written in close_walfile() */
! 					PQclear(res);
  					goto error;
  				}
! 				if (PQresultStatus(res) == PGRES_COPY_IN)
  				{
! 					if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
  					{
! 						fprintf(stderr,
! 							_("%s: could not send copy-end packet: %s"),
! 							progname, PQerrorMessage(conn));
! 						PQclear(res);
! 						goto error;
  					}
+ 					res = PQgetResult(conn);
  				}
+ 				still_sending = false;
  			}
! 			if (copybuf != NULL)
! 				PQfreemem(copybuf);
! 			copybuf = NULL;
! 			*stoppos = blockpos;
! 			return res;
  		}
  	}
  
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 16,19 **** extern bool ReceiveXlogStream(PGconn *conn,
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix);
--- 16,20 ----
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix,
! 				  int fsync_interval);
#23Noname
furuyao@pm.nttdata.co.jp
In reply to: Noname (#22)
1 attachment(s)
Re: pg_receivexlog add synchronous mode

This patch came about through the following steps.

1. I posted a patch that added a synchronous mode to pg_receivexlog.

2. In response to a comment about the flush frequency, I revised the patch to flush after each run of consecutive messages, following what walreceiver does.

3. The synchronous mode needed to send a reply with the flush position right after each flush, but it was pointed out that --status-interval cannot be set to less than 1 second.
Therefore I changed the patch to add a mode that flushes at the same timing as walreceiver, and I dropped the synchronous mode.

4. A refactoring patch was offered, and it was noted that "pg_recvlogical" already has a "--fsync-interval" option that specifies a flush interval.
I changed the patch so that it flushes at the interval specified by "--fsync-interval", in the same way as "pg_recvlogical".

5. I posted a patch reflecting the review comments, which were mainly about the documentation.

6. A patch was offered that corrects the behavior when zero is specified for "--status-interval".
In response to it, this patch modifies the behavior when -1 or zero is specified for "--fsync-interval".

Regards,

--
Furuya Osamu

Attachments:

pg_receivexlog-add-fsync-interval-v3.patchapplication/octet-stream; name=pg_receivexlog-add-fsync-interval-v3.patchDownload
*** a/doc/src/sgml/ref/pg_receivexlog.sgml
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 106,111 **** PostgreSQL documentation
--- 106,127 ----
       </varlistentry>
  
       <varlistentry>
+       <term><option>-F <replaceable>interval_seconds</replaceable></option></term>
+       <term><option>--fsync-interval=<replaceable>interval_seconds</replaceable></option></term>
+       <listitem>
+        <para>
+         Specifies how often <application>pg_receivexlog</application> should issue
+         sync commands to ensure the received WAL file is safely flushed to disk.
+         An interval of <literal>-1</literal> issues an fsync after each run of
+         consecutive data. The value zero issues an fsync only when a WAL file
+         is closed; not specifying an interval has the same effect.
+         In that case, data may be lost in the event of a crash.
+         The default value is zero.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
        <term><option>-v</option></term>
        <term><option>--verbose</option></term>
        <listitem>
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 371,377 **** LogStreamerMain(logstreamer_param *param)
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL))
  
  		/*
  		 * Any errors will already have been reported in the function process,
--- 371,377 ----
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL, 0))
  
  		/*
  		 * Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 36,41 **** static char *basedir = NULL;
--- 36,42 ----
  static int	verbose = 0;
  static int	noloop = 0;
  static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
+ static int	fsync_interval = 0; /* 0 = default */
  static volatile bool time_to_abort = false;
  
  
***************
*** 62,67 **** usage(void)
--- 63,72 ----
  	printf(_("\nOptions:\n"));
  	printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
  	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
+ 	printf(_("  -F  --fsync-interval=INTERVAL\n"
+ 			 "                         frequency of syncs to the transaction log files (in seconds)\n"
+ 			 "                         The value -1 issuing fsyncs at every consecutive data\n"
+ 			 "                         (default: file close only)\n"));
  	printf(_("  -v, --verbose          output verbose messages\n"));
  	printf(_("  -V, --version          output version information, then exit\n"));
  	printf(_("  -?, --help             show this help, then exit\n"));
***************
*** 330,336 **** StreamLog(void)
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial");
  
  	PQfinish(conn);
  }
--- 335,341 ----
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial", fsync_interval);
  
  	PQfinish(conn);
  }
***************
*** 360,365 **** main(int argc, char **argv)
--- 365,371 ----
  		{"port", required_argument, NULL, 'p'},
  		{"username", required_argument, NULL, 'U'},
  		{"no-loop", no_argument, NULL, 'n'},
+ 		{"fsync-interval", required_argument, NULL, 'F'},
  		{"no-password", no_argument, NULL, 'w'},
  		{"password", no_argument, NULL, 'W'},
  		{"status-interval", required_argument, NULL, 's'},
***************
*** 389,395 **** main(int argc, char **argv)
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
--- 395,401 ----
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nF:wWv",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
***************
*** 436,441 **** main(int argc, char **argv)
--- 442,456 ----
  			case 'n':
  				noloop = 1;
  				break;
+ 			case 'F':
+ 				fsync_interval = atoi(optarg) * 1000;
+ 				if (fsync_interval < -1000)
+ 				{
+ 					fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
+ 							progname, optarg);
+ 					exit(1);
+ 				}
+ 				break;
  			case 'v':
  				verbose++;
  				break;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 30,46 **** static int	walfile = -1;
  static char current_walfile_name[MAXPGPATH] = "";
  static bool reportFlushPosition = false;
  static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
  
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos);
  static int CopyStreamPoll(PGconn *conn, long timeout_ms);
  static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
- 
  /*
   * Open a new WAL file in the specified directory.
   *
--- 30,46 ----
  static char current_walfile_name[MAXPGPATH] = "";
  static bool reportFlushPosition = false;
  static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
+ static int64 output_last_fsync = -1;
  
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos,int fsync_interval);
  static int CopyStreamPoll(PGconn *conn, long timeout_ms);
  static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
  /*
   * Open a new WAL file in the specified directory.
   *
***************
*** 187,193 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
  		fprintf(stderr,
  				_("%s: not renaming \"%s%s\", segment is not complete\n"),
  				progname, current_walfile_name, partial_suffix);
! 
  	lastFlushPosition = pos;
  	return true;
  }
--- 187,193 ----
  		fprintf(stderr,
  				_("%s: not renaming \"%s%s\", segment is not complete\n"),
  				progname, current_walfile_name, partial_suffix);
! 	output_last_fsync = feGetCurrentTimestamp();
  	lastFlushPosition = pos;
  	return true;
  }
***************
*** 419,431 **** CheckServerVersionForStreaming(PGconn *conn)
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix)
  {
  	char		query[128];
  	char		slotcmd[128];
--- 419,434 ----
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
+  * fsync_interval controls how often we flush to the received
+  * WAL file, in milliseconds.
+  *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix, int fsync_interval)
  {
  	char		query[128];
  	char		slotcmd[128];
***************
*** 570,576 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos);
  		if (res == NULL)
  			goto error;
  
--- 573,579 ----
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos, fsync_interval);
  		if (res == NULL)
  			goto error;
  
***************
*** 731,737 **** static PGresult *
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
--- 734,740 ----
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos, int fsync_interval)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
***************
*** 747,752 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
--- 750,757 ----
  		int64		now;
  		int			hdr_len;
  		long		sleeptime;
+ 		int64		message_target = 0;
+ 		int64		fsync_target = 0;
  
  		/*
  		 * Check if we should continue streaming, or abort at this point.
***************
*** 780,796 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				goto error;
  			last_status = now;
  		}
! 
! 		/*
! 		 * Compute how long send/receive loops should sleep
! 		 */
! 		if (standby_message_timeout && still_sending)
  		{
  			int64		targettime;
  			long		secs;
  			int			usecs;
  
! 			targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
  			feTimestampDifference(now,
  								  targettime,
  								  &secs,
--- 785,813 ----
  				goto error;
  			last_status = now;
  		}
! 		
! 		/* Compute when we need to wakeup to send a keepalive message. */
! 		if (standby_message_timeout)
! 			message_target = last_status + (standby_message_timeout - 1) *
! 				((int64) 1000);
! 
! 		/* Compute when we need to wakeup to fsync the output file. */
! 		if (fsync_interval > 0 && lastFlushPosition < blockpos)
! 			fsync_target = output_last_fsync + (fsync_interval - 1) *
! 				((int64) 1000);
! 
! 		/* Now compute when to wakeup. Compute how long send/receive loops should sleep*/
! 		if (still_sending && (message_target > 0 || fsync_target > 0))
  		{
  			int64		targettime;
  			long		secs;
  			int			usecs;
  
! 			targettime = message_target;
! 
! 			if (targettime == 0 || (fsync_target > 0 && fsync_target < targettime))
! 				targettime = fsync_target;
! 
  			feTimestampDifference(now,
  								  targettime,
  								  &secs,
***************
*** 808,1016 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			sleeptime = -1;
  
  		r = CopyStreamReceive(conn, sleeptime, &copybuf);
! 		if (r == 0)
! 			continue;
! 		if (r == -1)
! 			goto error;
! 		if (r == -2)
  		{
! 			PGresult   *res = PQgetResult(conn);
  
! 			/*
! 			 * The server closed its end of the copy stream.  If we haven't
! 			 * closed ours already, we need to do so now, unless the server
! 			 * threw an error, in which case we don't.
! 			 */
! 			if (still_sending)
  			{
! 				if (!close_walfile(basedir, partial_suffix, blockpos))
  				{
! 					/* Error message written in close_walfile() */
! 					PQclear(res);
  					goto error;
  				}
! 				if (PQresultStatus(res) == PGRES_COPY_IN)
  				{
! 					if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
  					{
  						fprintf(stderr,
! 								_("%s: could not send copy-end packet: %s"),
! 								progname, PQerrorMessage(conn));
! 						PQclear(res);
  						goto error;
  					}
- 					res = PQgetResult(conn);
  				}
- 				still_sending = false;
- 			}
- 			if (copybuf != NULL)
- 				PQfreemem(copybuf);
- 			copybuf = NULL;
- 			*stoppos = blockpos;
- 			return res;
- 		}
  
! 		/* Check the message type. */
! 		if (copybuf[0] == 'k')
! 		{
! 			int			pos;
! 			bool		replyRequested;
  
! 			/*
! 			 * Parse the keepalive message, enclosed in the CopyData message.
! 			 * We just check if the server requested a reply, and ignore the
! 			 * rest.
! 			 */
! 			pos = 1;			/* skip msgtype 'k' */
! 			pos += 8;			/* skip walEnd */
! 			pos += 8;			/* skip sendTime */
  
! 			if (r < pos + 1)
! 			{
! 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 						progname, r);
! 				goto error;
! 			}
! 			replyRequested = copybuf[pos];
  
! 			/* If the server requested an immediate reply, send one. */
! 			if (replyRequested && still_sending)
! 			{
! 				now = feGetCurrentTimestamp();
! 				if (!sendFeedback(conn, blockpos, now, false))
! 					goto error;
! 				last_status = now;
! 			}
! 		}
! 		else if (copybuf[0] == 'w')
! 		{
! 			/*
! 			 * Once we've decided we don't want to receive any more, just
! 			 * ignore any subsequent XLogData messages.
! 			 */
! 			if (!still_sending)
! 				continue;
  
! 			/*
! 			 * Read the header of the XLogData message, enclosed in the
! 			 * CopyData message. We only need the WAL location field
! 			 * (dataStart), the rest of the header is ignored.
! 			 */
! 			hdr_len = 1;		/* msgtype 'w' */
! 			hdr_len += 8;		/* dataStart */
! 			hdr_len += 8;		/* walEnd */
! 			hdr_len += 8;		/* sendTime */
! 			if (r < hdr_len)
! 			{
! 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 						progname, r);
! 				goto error;
! 			}
! 			blockpos = fe_recvint64(&copybuf[1]);
  
! 			/* Extract WAL location for this block */
! 			xlogoff = blockpos % XLOG_SEG_SIZE;
  
! 			/*
! 			 * Verify that the initial location in the stream matches where we
! 			 * think we are.
! 			 */
! 			if (walfile == -1)
! 			{
! 				/* No file open yet */
! 				if (xlogoff != 0)
! 				{
! 					fprintf(stderr,
! 							_("%s: received transaction log record for offset %u with no file open\n"),
! 							progname, xlogoff);
! 					goto error;
  				}
  			}
  			else
  			{
! 				/* More data in existing segment */
! 				/* XXX: store seek value don't reseek all the time */
! 				if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
  				{
! 					fprintf(stderr,
! 						  _("%s: got WAL data offset %08x, expected %08x\n"),
! 					   progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
  					goto error;
  				}
  			}
! 
! 			bytes_left = r - hdr_len;
! 			bytes_written = 0;
! 
! 			while (bytes_left)
  			{
! 				int			bytes_to_write;
! 
! 				/*
! 				 * If crossing a WAL boundary, only write up until we reach
! 				 * XLOG_SEG_SIZE.
! 				 */
! 				if (xlogoff + bytes_left > XLOG_SEG_SIZE)
! 					bytes_to_write = XLOG_SEG_SIZE - xlogoff;
! 				else
! 					bytes_to_write = bytes_left;
! 
! 				if (walfile == -1)
  				{
! 					if (!open_walfile(blockpos, timeline,
! 									  basedir, partial_suffix))
  					{
! 						/* Error logged by open_walfile */
  						goto error;
  					}
  				}
  
! 				if (write(walfile,
! 						  copybuf + hdr_len + bytes_written,
! 						  bytes_to_write) != bytes_to_write)
  				{
! 					fprintf(stderr,
! 							_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! 							progname, bytes_to_write, current_walfile_name,
! 							strerror(errno));
  					goto error;
  				}
! 
! 				/* Write was successful, advance our position */
! 				bytes_written += bytes_to_write;
! 				bytes_left -= bytes_to_write;
! 				blockpos += bytes_to_write;
! 				xlogoff += bytes_to_write;
! 
! 				/* Did we reach the end of a WAL segment? */
! 				if (blockpos % XLOG_SEG_SIZE == 0)
  				{
! 					if (!close_walfile(basedir, partial_suffix, blockpos))
! 						/* Error message written in close_walfile() */
! 						goto error;
! 
! 					xlogoff = 0;
! 
! 					if (still_sending && stream_stop(blockpos, timeline, false))
  					{
! 						if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! 						{
! 							fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! 									progname, PQerrorMessage(conn));
! 							goto error;
! 						}
! 						still_sending = false;
! 						break;	/* ignore the rest of this XLogData packet */
  					}
  				}
  			}
! 			/* No more data left to write, receive next copy packet */
! 		}
! 		else
! 		{
! 			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
! 					progname, copybuf[0]);
! 			goto error;
  		}
  	}
  
--- 825,1072 ----
  			sleeptime = -1;
  
  		r = CopyStreamReceive(conn, sleeptime, &copybuf);
! 		while(r > 0)
  		{
! 			/* Check the message type. */
! 			if (copybuf[0] == 'k')
! 			{
! 				int			pos;
! 				bool		replyRequested;
  
! 				/*
! 				 * Parse the keepalive message, enclosed in the CopyData message.
! 				 * We just check if the server requested a reply, and ignore the
! 				 * rest.
! 				 */
! 				pos = 1;			/* skip msgtype 'k' */
! 				pos += 8;			/* skip walEnd */
! 				pos += 8;			/* skip sendTime */
! 
! 				if (r < pos + 1)
! 				{
! 					fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 							progname, r);
! 					goto error;
! 				}
! 				replyRequested = copybuf[pos];
! 
! 				/* If the server requested an immediate reply, send one. */
! 				if (replyRequested && still_sending)
! 				{
! 					now = feGetCurrentTimestamp();
! 					if (!sendFeedback(conn, blockpos, now, false))
! 						goto error;
! 					last_status = now;
! 				}
! 			}
! 			else if (copybuf[0] == 'w')
  			{
! 				/*
! 				 * Once we've decided we don't want to receive any more, just
! 				 * ignore any subsequent XLogData messages.
! 				 */
! 				if (!still_sending)
! 					break;
! 
! 				/*
! 				 * Read the header of the XLogData message, enclosed in the
! 				 * CopyData message. We only need the WAL location field
! 				 * (dataStart), the rest of the header is ignored.
! 				 */
! 				hdr_len = 1;		/* msgtype 'w' */
! 				hdr_len += 8;		/* dataStart */
! 				hdr_len += 8;		/* walEnd */
! 				hdr_len += 8;		/* sendTime */
! 				if (r < hdr_len)
  				{
! 					fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 							progname, r);
  					goto error;
  				}
! 				blockpos = fe_recvint64(&copybuf[1]);
! 
! 				/* Extract WAL location for this block */
! 				xlogoff = blockpos % XLOG_SEG_SIZE;
! 
! 				/*
! 				 * Verify that the initial location in the stream matches where we
! 				 * think we are.
! 				 */
! 				if (walfile == -1)
  				{
! 					/* No file open yet */
! 					if (xlogoff != 0)
  					{
  						fprintf(stderr,
! 								_("%s: received transaction log record for offset %u with no file open\n"),
! 								progname, xlogoff);
! 						goto error;
! 					}
! 				}
! 				else
! 				{
! 					/* More data in existing segment */
! 					/* XXX: store seek value don't reseek all the time */
! 					if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
! 					{
! 						fprintf(stderr,
! 							  _("%s: got WAL data offset %08x, expected %08x\n"),
! 						   progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
  						goto error;
  					}
  				}
  
! 				bytes_left = r - hdr_len;
! 				bytes_written = 0;
  
! 				while (bytes_left)
! 				{
! 					int			bytes_to_write;
! 
! 					/*
! 					 * If crossing a WAL boundary, only write up until we reach
! 					 * XLOG_SEG_SIZE.
! 					 */
! 					if (xlogoff + bytes_left > XLOG_SEG_SIZE)
! 						bytes_to_write = XLOG_SEG_SIZE - xlogoff;
! 					else
! 						bytes_to_write = bytes_left;
! 
! 					if (walfile == -1)
! 					{
! 						if (!open_walfile(blockpos, timeline,
! 										  basedir, partial_suffix))
! 						{
! 							/* Error logged by open_walfile */
! 							goto error;
! 						}
! 					}
  
! 					if (write(walfile,
! 							  copybuf + hdr_len + bytes_written,
! 							  bytes_to_write) != bytes_to_write)
! 					{
! 						fprintf(stderr,
! 								_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! 								progname, bytes_to_write, current_walfile_name,
! 								strerror(errno));
! 						goto error;
! 					}
  
! 					/* Write was successful, advance our position */
! 					bytes_written += bytes_to_write;
! 					bytes_left -= bytes_to_write;
! 					blockpos += bytes_to_write;
! 					xlogoff += bytes_to_write;
  
! 					/* Did we reach the end of a WAL segment? */
! 					if (blockpos % XLOG_SEG_SIZE == 0)
! 					{
! 						if (!close_walfile(basedir, partial_suffix, blockpos))
! 							/* Error message written in close_walfile() */
! 							goto error;
  
! 						xlogoff = 0;
  
! 						if (still_sending && stream_stop(blockpos, timeline, false))
! 						{
! 							if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! 							{
! 								fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! 										progname, PQerrorMessage(conn));
! 								goto error;
! 							}
! 							still_sending = false;
! 							break;	/* ignore the rest of this XLogData packet */
! 						}
! 					}
  				}
+ 				/* No more data left to write, receive next copy packet */
  			}
  			else
  			{
! 				fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
! 								progname, copybuf[0]);
! 				goto error;
! 			}
! 			if (still_sending && stream_stop(blockpos, timeline, false))
! 			{
! 				if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
  				{
! 					fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! 							progname, PQerrorMessage(conn));
  					goto error;
  				}
+ 				still_sending = false;
+ 				break;  /* ignore the rest of this XLogData packet */
  			}
! 			r = CopyStreamReceive(conn, 0, &copybuf);
! 		}
! 		if (r == 0)
! 		{
! 			/* --fsync-interval argument has been specified */
! 			if (fsync_interval != 0)
  			{
! 				 /* interval has been specified */
! 				if (fsync_interval > 0)
! 				{
! 					now = feGetCurrentTimestamp();
! 					if (!feTimestampDifferenceExceeds(output_last_fsync, now, fsync_interval))
! 						continue;
! 					output_last_fsync = now;
! 				}
! 				/* check the need for flush */
! 				if (walfile != -1 && lastFlushPosition < blockpos)
  				{
! 					if (fsync(walfile) != 0)
  					{
! 						fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
! 										progname, current_walfile_name, strerror(errno));
  						goto error;
  					}
+ 					lastFlushPosition = blockpos;
  				}
+ 			}
+ 			continue;
+ 		}
+ 		if (r == -1)
+ 			goto error;
+ 		if (r == -2)
+ 		{
+ 			PGresult   *res = PQgetResult(conn);
  
! 			/*
! 			 * The server closed its end of the copy stream.  If we haven't
! 			 * closed ours already, we need to do so now, unless the server
! 			 * threw an error, in which case we don't.
! 			 */
! 			if (still_sending)
! 			{
! 				if (!close_walfile(basedir, partial_suffix, blockpos))
  				{
! 					/* Error message written in close_walfile() */
! 					PQclear(res);
  					goto error;
  				}
! 				if (PQresultStatus(res) == PGRES_COPY_IN)
  				{
! 					if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
  					{
! 						fprintf(stderr,
! 							_("%s: could not send copy-end packet: %s"),
! 							progname, PQerrorMessage(conn));
! 						PQclear(res);
! 						goto error;
  					}
+ 					res = PQgetResult(conn);
  				}
+ 				still_sending = false;
  			}
! 			if (copybuf != NULL)
! 				PQfreemem(copybuf);
! 			copybuf = NULL;
! 			*stoppos = blockpos;
! 			return res;
  		}
  	}
  
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 16,19 **** extern bool ReceiveXlogStream(PGconn *conn,
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix);
--- 16,20 ----
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix,
! 				  int fsync_interval);
#24Noname
furuyao@pm.nttdata.co.jp
In reply to: Noname (#23)
1 attachment(s)
Re: pg_receivexlog add synchronous mode

I have improved the patch by making following changes:

1. Since the stream_stop() call made when closing a WAL file was redundant, it has been removed.

2. Changed the timing of the flush check to improve the readability of the source code.
The flush check has been moved from after the consecutive messages are received to
just before the feedback message decision, which runs after the continue statement is executed.

Regards,

--
Furuya Osamu

Attachments:

pg_receivexlog-add-fsync-interval-v4.patchapplication/octet-stream; name=pg_receivexlog-add-fsync-interval-v4.patchDownload
*** a/doc/src/sgml/ref/pg_receivexlog.sgml
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 106,111 **** PostgreSQL documentation
--- 106,127 ----
       </varlistentry>
  
       <varlistentry>
+       <term><option>-F <replaceable>interval_seconds</replaceable></option></term>
+       <term><option>--fsync-interval=<replaceable>interval_seconds</replaceable></option></term>
+       <listitem>
+        <para>
+         Specifies how often <application>pg_receivexlog</application> should
+         issue sync commands to ensure the received WAL file is safely flushed
+         to disk. An interval of <literal>-1</literal> issues an fsync after
+         each batch of consecutively received data. A value of zero, or not
+         specifying an interval at all, issues an fsync only at WAL file close;
+         in this case, data may be lost in the event of a crash.
+         The default value is zero.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
        <term><option>-v</option></term>
        <term><option>--verbose</option></term>
        <listitem>
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 371,377 **** LogStreamerMain(logstreamer_param *param)
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL))
  
  		/*
  		 * Any errors will already have been reported in the function process,
--- 371,377 ----
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL, 0))
  
  		/*
  		 * Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 36,41 **** static char *basedir = NULL;
--- 36,42 ----
  static int	verbose = 0;
  static int	noloop = 0;
  static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
+ static int	fsync_interval = 0; /* 0 = default */
  static volatile bool time_to_abort = false;
  
  
***************
*** 62,67 **** usage(void)
--- 63,72 ----
  	printf(_("\nOptions:\n"));
  	printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
  	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
+ 	printf(_("  -F  --fsync-interval=INTERVAL\n"
+ 			 "                         frequency of syncs to the transaction log files (in seconds)\n"
+ 			 "                         The value -1 issuing fsyncs at every consecutive data\n"
+ 			 "                         (default: file close only)\n"));
  	printf(_("  -v, --verbose          output verbose messages\n"));
  	printf(_("  -V, --version          output version information, then exit\n"));
  	printf(_("  -?, --help             show this help, then exit\n"));
***************
*** 330,336 **** StreamLog(void)
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial");
  
  	PQfinish(conn);
  }
--- 335,341 ----
  				starttli);
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! 					  stop_streaming, standby_message_timeout, ".partial", fsync_interval);
  
  	PQfinish(conn);
  }
***************
*** 360,365 **** main(int argc, char **argv)
--- 365,371 ----
  		{"port", required_argument, NULL, 'p'},
  		{"username", required_argument, NULL, 'U'},
  		{"no-loop", no_argument, NULL, 'n'},
+ 		{"fsync-interval", required_argument, NULL, 'F'},
  		{"no-password", no_argument, NULL, 'w'},
  		{"password", no_argument, NULL, 'W'},
  		{"status-interval", required_argument, NULL, 's'},
***************
*** 389,395 **** main(int argc, char **argv)
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
--- 395,401 ----
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nF:wWv",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
***************
*** 436,441 **** main(int argc, char **argv)
--- 442,456 ----
  			case 'n':
  				noloop = 1;
  				break;
+ 			case 'F':
+ 				fsync_interval = atoi(optarg) * 1000;
+ 				if (fsync_interval < -1000)
+ 				{
+ 					fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
+ 							progname, optarg);
+ 					exit(1);
+ 				}
+ 				break;
  			case 'v':
  				verbose++;
  				break;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 30,46 **** static int	walfile = -1;
  static char current_walfile_name[MAXPGPATH] = "";
  static bool reportFlushPosition = false;
  static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
  
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos);
  static int CopyStreamPoll(PGconn *conn, long timeout_ms);
  static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
- 
  /*
   * Open a new WAL file in the specified directory.
   *
--- 30,46 ----
  static char current_walfile_name[MAXPGPATH] = "";
  static bool reportFlushPosition = false;
  static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
+ static int64 output_last_fsync = -1;
  
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
! 				 char *partial_suffix, XLogRecPtr *stoppos,int fsync_interval);
  static int CopyStreamPoll(PGconn *conn, long timeout_ms);
  static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
  /*
   * Open a new WAL file in the specified directory.
   *
***************
*** 187,193 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
  		fprintf(stderr,
  				_("%s: not renaming \"%s%s\", segment is not complete\n"),
  				progname, current_walfile_name, partial_suffix);
! 
  	lastFlushPosition = pos;
  	return true;
  }
--- 187,193 ----
  		fprintf(stderr,
  				_("%s: not renaming \"%s%s\", segment is not complete\n"),
  				progname, current_walfile_name, partial_suffix);
! 	output_last_fsync = feGetCurrentTimestamp();
  	lastFlushPosition = pos;
  	return true;
  }
***************
*** 419,431 **** CheckServerVersionForStreaming(PGconn *conn)
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix)
  {
  	char		query[128];
  	char		slotcmd[128];
--- 419,434 ----
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
+  * fsync_interval controls how often we flush to the received
+  * WAL file, in milliseconds.
+  *
   * Note: The log position *must* be at a log segment start!
   */
  bool
  ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
! 				  int standby_message_timeout, char *partial_suffix, int fsync_interval)
  {
  	char		query[128];
  	char		slotcmd[128];
***************
*** 570,576 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos);
  		if (res == NULL)
  			goto error;
  
--- 573,579 ----
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos, fsync_interval);
  		if (res == NULL)
  			goto error;
  
***************
*** 731,737 **** static PGresult *
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
--- 734,740 ----
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos, int fsync_interval)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
***************
*** 747,752 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
--- 750,757 ----
  		int64		now;
  		int			hdr_len;
  		long		sleeptime;
+ 		int64		message_target = 0;
+ 		int64		fsync_target = 0;
  
  		/*
  		 * Check if we should continue streaming, or abort at this point.
***************
*** 767,776 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			still_sending = false;
  		}
  
  		/*
  		 * Potentially send a status message to the master
  		 */
- 		now = feGetCurrentTimestamp();
  		if (still_sending && standby_message_timeout > 0 &&
  			feTimestampDifferenceExceeds(last_status, now,
  										 standby_message_timeout))
--- 772,805 ----
  			still_sending = false;
  		}
  
+ 		now = feGetCurrentTimestamp();
+ 
+ 		/* --fsync-interval argument has been specified */
+ 		if (fsync_interval != 0)
+ 		{
+ 			 /* interval has been specified */
+ 			if (fsync_interval > 0)
+ 			{
+ 				if (!feTimestampDifferenceExceeds(output_last_fsync, now, fsync_interval))
+ 					continue;
+ 				output_last_fsync = now;
+ 			}
+ 			/* check the need for flush */
+ 			if (walfile != -1 && lastFlushPosition < blockpos)
+ 			{
+ 				if (fsync(walfile) != 0)
+ 				{
+ 					fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
+ 									progname, current_walfile_name, strerror(errno));
+ 					goto error;
+ 				}
+ 				lastFlushPosition = blockpos;
+ 			}
+ 		}
+ 
  		/*
  		 * Potentially send a status message to the master
  		 */
  		if (still_sending && standby_message_timeout > 0 &&
  			feTimestampDifferenceExceeds(last_status, now,
  										 standby_message_timeout))
***************
*** 780,796 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				goto error;
  			last_status = now;
  		}
! 
! 		/*
! 		 * Compute how long send/receive loops should sleep
! 		 */
! 		if (standby_message_timeout && still_sending)
  		{
  			int64		targettime;
  			long		secs;
  			int			usecs;
  
! 			targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
  			feTimestampDifference(now,
  								  targettime,
  								  &secs,
--- 809,837 ----
  				goto error;
  			last_status = now;
  		}
! 		
! 		/* Compute when we need to wakeup to send a keepalive message. */
! 		if (standby_message_timeout)
! 			message_target = last_status + (standby_message_timeout - 1) *
! 				((int64) 1000);
! 
! 		/* Compute when we need to wakeup to fsync the output file. */
! 		if (fsync_interval > 0 && lastFlushPosition < blockpos)
! 			fsync_target = output_last_fsync + (fsync_interval - 1) *
! 				((int64) 1000);
! 
! 		/* Now compute when to wakeup. Compute how long send/receive loops should sleep*/
! 		if (still_sending && (message_target > 0 || fsync_target > 0))
  		{
  			int64		targettime;
  			long		secs;
  			int			usecs;
  
! 			targettime = message_target;
! 
! 			if (targettime == 0 || (fsync_target > 0 && fsync_target < targettime))
! 				targettime = fsync_target;
! 
  			feTimestampDifference(now,
  								  targettime,
  								  &secs,
***************
*** 808,1016 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			sleeptime = -1;
  
  		r = CopyStreamReceive(conn, sleeptime, &copybuf);
! 		if (r == 0)
! 			continue;
! 		if (r == -1)
! 			goto error;
! 		if (r == -2)
  		{
! 			PGresult   *res = PQgetResult(conn);
! 
! 			/*
! 			 * The server closed its end of the copy stream.  If we haven't
! 			 * closed ours already, we need to do so now, unless the server
! 			 * threw an error, in which case we don't.
! 			 */
! 			if (still_sending)
  			{
! 				if (!close_walfile(basedir, partial_suffix, blockpos))
  				{
! 					/* Error message written in close_walfile() */
! 					PQclear(res);
  					goto error;
  				}
! 				if (PQresultStatus(res) == PGRES_COPY_IN)
  				{
! 					if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! 					{
! 						fprintf(stderr,
! 								_("%s: could not send copy-end packet: %s"),
! 								progname, PQerrorMessage(conn));
! 						PQclear(res);
  						goto error;
! 					}
! 					res = PQgetResult(conn);
  				}
- 				still_sending = false;
  			}
! 			if (copybuf != NULL)
! 				PQfreemem(copybuf);
! 			copybuf = NULL;
! 			*stoppos = blockpos;
! 			return res;
! 		}
! 
! 		/* Check the message type. */
! 		if (copybuf[0] == 'k')
! 		{
! 			int			pos;
! 			bool		replyRequested;
! 
! 			/*
! 			 * Parse the keepalive message, enclosed in the CopyData message.
! 			 * We just check if the server requested a reply, and ignore the
! 			 * rest.
! 			 */
! 			pos = 1;			/* skip msgtype 'k' */
! 			pos += 8;			/* skip walEnd */
! 			pos += 8;			/* skip sendTime */
! 
! 			if (r < pos + 1)
  			{
! 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 						progname, r);
! 				goto error;
! 			}
! 			replyRequested = copybuf[pos];
! 
! 			/* If the server requested an immediate reply, send one. */
! 			if (replyRequested && still_sending)
! 			{
! 				now = feGetCurrentTimestamp();
! 				if (!sendFeedback(conn, blockpos, now, false))
! 					goto error;
! 				last_status = now;
! 			}
! 		}
! 		else if (copybuf[0] == 'w')
! 		{
! 			/*
! 			 * Once we've decided we don't want to receive any more, just
! 			 * ignore any subsequent XLogData messages.
! 			 */
! 			if (!still_sending)
! 				continue;
! 
! 			/*
! 			 * Read the header of the XLogData message, enclosed in the
! 			 * CopyData message. We only need the WAL location field
! 			 * (dataStart), the rest of the header is ignored.
! 			 */
! 			hdr_len = 1;		/* msgtype 'w' */
! 			hdr_len += 8;		/* dataStart */
! 			hdr_len += 8;		/* walEnd */
! 			hdr_len += 8;		/* sendTime */
! 			if (r < hdr_len)
! 			{
! 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 						progname, r);
! 				goto error;
! 			}
! 			blockpos = fe_recvint64(&copybuf[1]);
! 
! 			/* Extract WAL location for this block */
! 			xlogoff = blockpos % XLOG_SEG_SIZE;
  
! 			/*
! 			 * Verify that the initial location in the stream matches where we
! 			 * think we are.
! 			 */
! 			if (walfile == -1)
! 			{
! 				/* No file open yet */
! 				if (xlogoff != 0)
! 				{
! 					fprintf(stderr,
! 							_("%s: received transaction log record for offset %u with no file open\n"),
! 							progname, xlogoff);
! 					goto error;
! 				}
! 			}
! 			else
! 			{
! 				/* More data in existing segment */
! 				/* XXX: store seek value don't reseek all the time */
! 				if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
  				{
! 					fprintf(stderr,
! 						  _("%s: got WAL data offset %08x, expected %08x\n"),
! 					   progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
  					goto error;
  				}
! 			}
  
! 			bytes_left = r - hdr_len;
! 			bytes_written = 0;
! 
! 			while (bytes_left)
! 			{
! 				int			bytes_to_write;
  
  				/*
! 				 * If crossing a WAL boundary, only write up until we reach
! 				 * XLOG_SEG_SIZE.
  				 */
- 				if (xlogoff + bytes_left > XLOG_SEG_SIZE)
- 					bytes_to_write = XLOG_SEG_SIZE - xlogoff;
- 				else
- 					bytes_to_write = bytes_left;
- 
  				if (walfile == -1)
  				{
! 					if (!open_walfile(blockpos, timeline,
! 									  basedir, partial_suffix))
  					{
! 						/* Error logged by open_walfile */
  						goto error;
  					}
  				}
! 
! 				if (write(walfile,
! 						  copybuf + hdr_len + bytes_written,
! 						  bytes_to_write) != bytes_to_write)
  				{
! 					fprintf(stderr,
! 							_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! 							progname, bytes_to_write, current_walfile_name,
! 							strerror(errno));
! 					goto error;
  				}
  
! 				/* Write was successful, advance our position */
! 				bytes_written += bytes_to_write;
! 				bytes_left -= bytes_to_write;
! 				blockpos += bytes_to_write;
! 				xlogoff += bytes_to_write;
  
! 				/* Did we reach the end of a WAL segment? */
! 				if (blockpos % XLOG_SEG_SIZE == 0)
  				{
! 					if (!close_walfile(basedir, partial_suffix, blockpos))
! 						/* Error message written in close_walfile() */
  						goto error;
  
! 					xlogoff = 0;
  
! 					if (still_sending && stream_stop(blockpos, timeline, false))
  					{
! 						if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! 						{
! 							fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! 									progname, PQerrorMessage(conn));
  							goto error;
! 						}
! 						still_sending = false;
! 						break;	/* ignore the rest of this XLogData packet */
  					}
  				}
  			}
! 			/* No more data left to write, receive next copy packet */
  		}
! 		else
  		{
! 			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
! 					progname, copybuf[0]);
  			goto error;
  		}
  	}
  
--- 849,1061 ----
  			sleeptime = -1;
  
  		r = CopyStreamReceive(conn, sleeptime, &copybuf);
! 		while(r > 0)
  		{
! 			/* Check the message type. */
! 			if (copybuf[0] == 'k')
  			{
! 				int			pos;
! 				bool		replyRequested;
! 
! 				/*
! 				 * Parse the keepalive message, enclosed in the CopyData message.
! 				 * We just check if the server requested a reply, and ignore the
! 				 * rest.
! 				 */
! 				pos = 1;			/* skip msgtype 'k' */
! 				pos += 8;			/* skip walEnd */
! 				pos += 8;			/* skip sendTime */
! 
! 				if (r < pos + 1)
  				{
! 					fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 							progname, r);
  					goto error;
  				}
! 				replyRequested = copybuf[pos];
! 
! 				/* If the server requested an immediate reply, send one. */
! 				if (replyRequested && still_sending)
  				{
! 					now = feGetCurrentTimestamp();
! 					if (!sendFeedback(conn, blockpos, now, false))
  						goto error;
! 					last_status = now;
  				}
  			}
! 			else if (copybuf[0] == 'w')
  			{
! 				/*
! 				 * Once we've decided we don't want to receive any more, just
! 				 * ignore any subsequent XLogData messages.
! 				 */
! 				if (!still_sending)
! 					break;
  
! 				/*
! 				 * Read the header of the XLogData message, enclosed in the
! 				 * CopyData message. We only need the WAL location field
! 				 * (dataStart), the rest of the header is ignored.
! 				 */
! 				hdr_len = 1;		/* msgtype 'w' */
! 				hdr_len += 8;		/* dataStart */
! 				hdr_len += 8;		/* walEnd */
! 				hdr_len += 8;		/* sendTime */
! 				if (r < hdr_len)
  				{
! 					fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 							progname, r);
  					goto error;
  				}
! 				blockpos = fe_recvint64(&copybuf[1]);
  
! 				/* Extract WAL location for this block */
! 				xlogoff = blockpos % XLOG_SEG_SIZE;
  
  				/*
! 				 * Verify that the initial location in the stream matches where we
! 				 * think we are.
  				 */
  				if (walfile == -1)
  				{
! 					/* No file open yet */
! 					if (xlogoff != 0)
  					{
! 						fprintf(stderr,
! 								_("%s: received transaction log record for offset %u with no file open\n"),
! 								progname, xlogoff);
  						goto error;
  					}
  				}
! 				else
  				{
! 					/* More data in existing segment */
! 					/* XXX: store seek value don't reseek all the time */
! 					if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
! 					{
! 						fprintf(stderr,
! 							  _("%s: got WAL data offset %08x, expected %08x\n"),
! 						   progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
! 						goto error;
! 					}
  				}
  
! 				bytes_left = r - hdr_len;
! 				bytes_written = 0;
  
! 				while (bytes_left)
  				{
! 					int			bytes_to_write;
! 
! 					/*
! 					 * If crossing a WAL boundary, only write up until we reach
! 					 * XLOG_SEG_SIZE.
! 					 */
! 					if (xlogoff + bytes_left > XLOG_SEG_SIZE)
! 						bytes_to_write = XLOG_SEG_SIZE - xlogoff;
! 					else
! 						bytes_to_write = bytes_left;
! 
! 					if (walfile == -1)
! 					{
! 						if (!open_walfile(blockpos, timeline,
! 										  basedir, partial_suffix))
! 						{
! 							/* Error logged by open_walfile */
! 							goto error;
! 						}
! 					}
! 
! 					if (write(walfile,
! 							  copybuf + hdr_len + bytes_written,
! 							  bytes_to_write) != bytes_to_write)
! 					{
! 						fprintf(stderr,
! 								_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! 								progname, bytes_to_write, current_walfile_name,
! 								strerror(errno));
  						goto error;
+ 					}
  
! 					/* Write was successful, advance our position */
! 					bytes_written += bytes_to_write;
! 					bytes_left -= bytes_to_write;
! 					blockpos += bytes_to_write;
! 					xlogoff += bytes_to_write;
  
! 					/* Did we reach the end of a WAL segment? */
! 					if (blockpos % XLOG_SEG_SIZE == 0)
  					{
! 						if (!close_walfile(basedir, partial_suffix, blockpos))
! 							/* Error message written in close_walfile() */
  							goto error;
! 
! 						xlogoff = 0;
  					}
  				}
+ 				/* No more data left to write, receive next copy packet */
+ 			}
+ 			else
+ 			{
+ 				fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+ 								progname, copybuf[0]);
+ 				goto error;
  			}
! 			if (still_sending && stream_stop(blockpos, timeline, false))
! 			{
! 				if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! 				{
! 					fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! 							progname, PQerrorMessage(conn));
! 					goto error;
! 				}
! 				still_sending = false;
! 				break;  /* ignore the rest of this XLogData packet */
! 			}
! 			r = CopyStreamReceive(conn, 0, &copybuf);
  		}
! 		if (r == 0)
  		{
! 			continue;
! 		}
! 		if (r == -1)
  			goto error;
+ 		if (r == -2)
+ 		{
+ 			PGresult   *res = PQgetResult(conn);
+ 
+ 			/*
+ 			 * The server closed its end of the copy stream.  If we haven't
+ 			 * closed ours already, we need to do so now, unless the server
+ 			 * threw an error, in which case we don't.
+ 			 */
+ 			if (still_sending)
+ 			{
+ 				if (!close_walfile(basedir, partial_suffix, blockpos))
+ 				{
+ 					/* Error message written in close_walfile() */
+ 					PQclear(res);
+ 					goto error;
+ 				}
+ 				if (PQresultStatus(res) == PGRES_COPY_IN)
+ 				{
+ 					if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+ 					{
+ 						fprintf(stderr,
+ 							_("%s: could not send copy-end packet: %s"),
+ 							progname, PQerrorMessage(conn));
+ 						PQclear(res);
+ 						goto error;
+ 					}
+ 					res = PQgetResult(conn);
+ 				}
+ 				still_sending = false;
+ 			}
+ 			if (copybuf != NULL)
+ 				PQfreemem(copybuf);
+ 			copybuf = NULL;
+ 			*stoppos = blockpos;
+ 			return res;
  		}
  	}
  
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 16,19 **** extern bool ReceiveXlogStream(PGconn *conn,
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix);
--- 16,20 ----
  				  char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
! 				  char *partial_suffix,
! 				  int fsync_interval);
#25Fujii Masao
masao.fujii@gmail.com
In reply to: Noname (#24)
Re: pg_receivexlog add synchronous mode

On Tue, Jul 29, 2014 at 7:07 PM, <furuyao@pm.nttdata.co.jp> wrote:

I have improved the patch by making following changes:

1. Since the stream_stop() call made when closing a WAL file was redundant, it has been removed.

2. Changed the timing of the flush check to improve the readability of the source code.
The flush check has been moved from after the consecutive messages are received to
just before the feedback message decision, which runs after the continue statement is executed.

Thanks for the updated version of the patch!

While reviewing the patch, I found that HandleCopyStream() is still
long, which decreases the readability of the source code.
So I feel inclined to refactor HandleCopyStream() further for better
readability. What about the attached refactoring patch?

Regards,

--
Fujii Masao

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#26Fujii Masao
masao.fujii@gmail.com
In reply to: Fujii Masao (#25)
1 attachment(s)
Re: pg_receivexlog add synchronous mode

On Tue, Aug 5, 2014 at 9:04 PM, Fujii Masao <masao.fujii@gmail.com> wrote:

On Tue, Jul 29, 2014 at 7:07 PM, <furuyao@pm.nttdata.co.jp> wrote:

I have improved the patch by making following changes:

1. Since it was redundant, the stream_stop() call made at the time of WAL file closing was deleted.

2. Changed the timing of the flush check, for readability of the source code.
I have moved the flush check from after the continuous message receiving to
before the feedback message decision, which is reached after the continue statement is executed.

Thanks for the updated version of the patch!

While reviewing the patch, I found that HandleCopyStream() is still
long, which decreases the readability of the source code.
So I feel inclined to refactor HandleCopyStream() further for better
readability. What about the attached refactoring patch?

Sorry, I forgot to attach the patch to my previous email, so it is attached here.

Regards,

--
Fujii Masao

Attachments:

refactor-receivexlog.patchtext/x-patch; charset=US-ASCII; name=refactor-receivexlog.patchDownload
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 31,42 **** static char current_walfile_name[MAXPGPATH] = "";
--- 31,53 ----
  static bool reportFlushPosition = false;
  static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
  
+ static bool still_sending = true;		/* feedback still needs to be sent? */
+ 
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
  				 char *partial_suffix, XLogRecPtr *stoppos);
  static int CopyStreamPoll(PGconn *conn, long timeout_ms);
  static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
+ static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
+ 								XLogRecPtr blockpos, int64 *last_status);
+ static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
+ 							   XLogRecPtr *blockpos, uint32 timeline,
+ 							   char *basedir, stream_stop_callback stream_stop,
+ 							   char *partial_suffix);
+ static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
+ 									   XLogRecPtr blockpos, char *basedir, char *partial_suffix,
+ 									   XLogRecPtr *stoppos);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
***************
*** 740,755 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
  	XLogRecPtr	blockpos = startpos;
! 	bool		still_sending = true;
  
  	while (1)
  	{
  		int			r;
- 		int			xlogoff;
- 		int			bytes_left;
- 		int			bytes_written;
  		int64		now;
- 		int			hdr_len;
  		long		sleeptime;
  
  		/*
--- 751,763 ----
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
  	XLogRecPtr	blockpos = startpos;
! 
! 	still_sending = true;
  
  	while (1)
  	{
  		int			r;
  		int64		now;
  		long		sleeptime;
  
  		/*
***************
*** 818,1015 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  			goto error;
  		if (r == -2)
  		{
! 			PGresult   *res = PQgetResult(conn);
! 
! 			/*
! 			 * The server closed its end of the copy stream.  If we haven't
! 			 * closed ours already, we need to do so now, unless the server
! 			 * threw an error, in which case we don't.
! 			 */
! 			if (still_sending)
! 			{
! 				if (!close_walfile(basedir, partial_suffix, blockpos))
! 				{
! 					/* Error message written in close_walfile() */
! 					PQclear(res);
! 					goto error;
! 				}
! 				if (PQresultStatus(res) == PGRES_COPY_IN)
! 				{
! 					if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! 					{
! 						fprintf(stderr,
! 								_("%s: could not send copy-end packet: %s"),
! 								progname, PQerrorMessage(conn));
! 						PQclear(res);
! 						goto error;
! 					}
! 					PQclear(res);
! 					res = PQgetResult(conn);
! 				}
! 				still_sending = false;
! 			}
! 			if (copybuf != NULL)
! 				PQfreemem(copybuf);
! 			copybuf = NULL;
! 			*stoppos = blockpos;
! 			return res;
  		}
  
  		/* Check the message type. */
  		if (copybuf[0] == 'k')
  		{
! 			int			pos;
! 			bool		replyRequested;
! 
! 			/*
! 			 * Parse the keepalive message, enclosed in the CopyData message.
! 			 * We just check if the server requested a reply, and ignore the
! 			 * rest.
! 			 */
! 			pos = 1;			/* skip msgtype 'k' */
! 			pos += 8;			/* skip walEnd */
! 			pos += 8;			/* skip sendTime */
! 
! 			if (r < pos + 1)
! 			{
! 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 						progname, r);
  				goto error;
- 			}
- 			replyRequested = copybuf[pos];
- 
- 			/* If the server requested an immediate reply, send one. */
- 			if (replyRequested && still_sending)
- 			{
- 				now = feGetCurrentTimestamp();
- 				if (!sendFeedback(conn, blockpos, now, false))
- 					goto error;
- 				last_status = now;
- 			}
  		}
  		else if (copybuf[0] == 'w')
  		{
! 			/*
! 			 * Once we've decided we don't want to receive any more, just
! 			 * ignore any subsequent XLogData messages.
! 			 */
! 			if (!still_sending)
! 				continue;
! 
! 			/*
! 			 * Read the header of the XLogData message, enclosed in the
! 			 * CopyData message. We only need the WAL location field
! 			 * (dataStart), the rest of the header is ignored.
! 			 */
! 			hdr_len = 1;		/* msgtype 'w' */
! 			hdr_len += 8;		/* dataStart */
! 			hdr_len += 8;		/* walEnd */
! 			hdr_len += 8;		/* sendTime */
! 			if (r < hdr_len)
! 			{
! 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
! 						progname, r);
  				goto error;
- 			}
- 			blockpos = fe_recvint64(&copybuf[1]);
- 
- 			/* Extract WAL location for this block */
- 			xlogoff = blockpos % XLOG_SEG_SIZE;
- 
- 			/*
- 			 * Verify that the initial location in the stream matches where we
- 			 * think we are.
- 			 */
- 			if (walfile == -1)
- 			{
- 				/* No file open yet */
- 				if (xlogoff != 0)
- 				{
- 					fprintf(stderr,
- 							_("%s: received transaction log record for offset %u with no file open\n"),
- 							progname, xlogoff);
- 					goto error;
- 				}
- 			}
- 			else
- 			{
- 				/* More data in existing segment */
- 				/* XXX: store seek value don't reseek all the time */
- 				if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
- 				{
- 					fprintf(stderr,
- 						  _("%s: got WAL data offset %08x, expected %08x\n"),
- 					   progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
- 					goto error;
- 				}
- 			}
- 
- 			bytes_left = r - hdr_len;
- 			bytes_written = 0;
- 
- 			while (bytes_left)
- 			{
- 				int			bytes_to_write;
- 
- 				/*
- 				 * If crossing a WAL boundary, only write up until we reach
- 				 * XLOG_SEG_SIZE.
- 				 */
- 				if (xlogoff + bytes_left > XLOG_SEG_SIZE)
- 					bytes_to_write = XLOG_SEG_SIZE - xlogoff;
- 				else
- 					bytes_to_write = bytes_left;
- 
- 				if (walfile == -1)
- 				{
- 					if (!open_walfile(blockpos, timeline,
- 									  basedir, partial_suffix))
- 					{
- 						/* Error logged by open_walfile */
- 						goto error;
- 					}
- 				}
- 
- 				if (write(walfile,
- 						  copybuf + hdr_len + bytes_written,
- 						  bytes_to_write) != bytes_to_write)
- 				{
- 					fprintf(stderr,
- 							_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
- 							progname, bytes_to_write, current_walfile_name,
- 							strerror(errno));
- 					goto error;
- 				}
- 
- 				/* Write was successful, advance our position */
- 				bytes_written += bytes_to_write;
- 				bytes_left -= bytes_to_write;
- 				blockpos += bytes_to_write;
- 				xlogoff += bytes_to_write;
- 
- 				/* Did we reach the end of a WAL segment? */
- 				if (blockpos % XLOG_SEG_SIZE == 0)
- 				{
- 					if (!close_walfile(basedir, partial_suffix, blockpos))
- 						/* Error message written in close_walfile() */
- 						goto error;
- 
- 					xlogoff = 0;
- 
- 					if (still_sending && stream_stop(blockpos, timeline, true))
- 					{
- 						if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
- 						{
- 							fprintf(stderr, _("%s: could not send copy-end packet: %s"),
- 									progname, PQerrorMessage(conn));
- 							goto error;
- 						}
- 						still_sending = false;
- 						break;	/* ignore the rest of this XLogData packet */
- 					}
- 				}
- 			}
- 			/* No more data left to write, receive next copy packet */
  		}
  		else
  		{
--- 826,851 ----
  			goto error;
  		if (r == -2)
  		{
! 			PGresult	*res = HandleEndOfCopyStream(conn, copybuf, blockpos,
! 													 basedir, partial_suffix, stoppos);
! 			if (res == NULL)
! 				goto error;
! 			else
! 				return res;
  		}
  
  		/* Check the message type. */
  		if (copybuf[0] == 'k')
  		{
! 			if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
! 									 &last_status))
  				goto error;
  		}
  		else if (copybuf[0] == 'w')
  		{
! 			if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
! 									timeline, basedir, stream_stop, partial_suffix))
  				goto error;
  		}
  		else
  		{
***************
*** 1135,1137 **** CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
--- 971,1195 ----
  	*buffer = copybuf;
  	return rawlen;
  }
+ 
+ /*
+  * Process the keepalive message.
+  */
+ static bool
+ ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
+ 					XLogRecPtr blockpos, int64 *last_status)
+ {
+ 	int			pos;
+ 	bool		replyRequested;
+ 	int64		now;
+ 
+ 	/*
+ 	 * Parse the keepalive message, enclosed in the CopyData message.
+ 	 * We just check if the server requested a reply, and ignore the
+ 	 * rest.
+ 	 */
+ 	pos = 1;			/* skip msgtype 'k' */
+ 	pos += 8;			/* skip walEnd */
+ 	pos += 8;			/* skip sendTime */
+ 
+ 	if (len < pos + 1)
+ 	{
+ 		fprintf(stderr, _("%s: streaming header too small: %d\n"),
+ 				progname, len);
+ 		return false;
+ 	}
+ 	replyRequested = copybuf[pos];
+ 
+ 	/* If the server requested an immediate reply, send one. */
+ 	if (replyRequested && still_sending)
+ 	{
+ 		now = feGetCurrentTimestamp();
+ 		if (!sendFeedback(conn, blockpos, now, false))
+ 			return false;
+ 		*last_status = now;
+ 	}
+ 
+ 	return true;
+ }
+ 
+ /*
+  * Process XLogData message.
+  */
+ static bool
+ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
+ 				   XLogRecPtr *blockpos, uint32 timeline,
+ 				   char *basedir, stream_stop_callback stream_stop,
+ 				   char *partial_suffix)
+ {
+ 	int			xlogoff;
+ 	int			bytes_left;
+ 	int			bytes_written;
+ 	int			hdr_len;
+ 
+ 	/*
+ 	 * Once we've decided we don't want to receive any more, just
+ 	 * ignore any subsequent XLogData messages.
+ 	 */
+ 	if (!(still_sending))
+ 		return true;
+ 
+ 	/*
+ 	 * Read the header of the XLogData message, enclosed in the
+ 	 * CopyData message. We only need the WAL location field
+ 	 * (dataStart), the rest of the header is ignored.
+ 	 */
+ 	hdr_len = 1;		/* msgtype 'w' */
+ 	hdr_len += 8;		/* dataStart */
+ 	hdr_len += 8;		/* walEnd */
+ 	hdr_len += 8;		/* sendTime */
+ 	if (len < hdr_len)
+ 	{
+ 		fprintf(stderr, _("%s: streaming header too small: %d\n"),
+ 				progname, len);
+ 		return false;
+ 	}
+ 	*blockpos = fe_recvint64(&copybuf[1]);
+ 
+ 	/* Extract WAL location for this block */
+ 	xlogoff = *blockpos % XLOG_SEG_SIZE;
+ 
+ 	/*
+ 	 * Verify that the initial location in the stream matches where we
+ 	 * think we are.
+ 	 */
+ 	if (walfile == -1)
+ 	{
+ 		/* No file open yet */
+ 		if (xlogoff != 0)
+ 		{
+ 			fprintf(stderr,
+ 					_("%s: received transaction log record for offset %u with no file open\n"),
+ 					progname, xlogoff);
+ 			return false;
+ 		}
+ 	}
+ 	else
+ 	{
+ 		/* More data in existing segment */
+ 		/* XXX: store seek value don't reseek all the time */
+ 		if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+ 		{
+ 			fprintf(stderr,
+ 					_("%s: got WAL data offset %08x, expected %08x\n"),
+ 					progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+ 			return false;
+ 		}
+ 	}
+ 
+ 	bytes_left = len - hdr_len;
+ 	bytes_written = 0;
+ 
+ 	while (bytes_left)
+ 	{
+ 		int			bytes_to_write;
+ 
+ 		/*
+ 		 * If crossing a WAL boundary, only write up until we reach
+ 		 * XLOG_SEG_SIZE.
+ 		 */
+ 		if (xlogoff + bytes_left > XLOG_SEG_SIZE)
+ 			bytes_to_write = XLOG_SEG_SIZE - xlogoff;
+ 		else
+ 			bytes_to_write = bytes_left;
+ 
+ 		if (walfile == -1)
+ 		{
+ 			if (!open_walfile(*blockpos, timeline,
+ 							  basedir, partial_suffix))
+ 			{
+ 				/* Error logged by open_walfile */
+ 				return false;
+ 			}
+ 		}
+ 
+ 		if (write(walfile,
+ 				  copybuf + hdr_len + bytes_written,
+ 				  bytes_to_write) != bytes_to_write)
+ 		{
+ 			fprintf(stderr,
+ 					_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
+ 					progname, bytes_to_write, current_walfile_name,
+ 					strerror(errno));
+ 			return false;
+ 		}
+ 
+ 		/* Write was successful, advance our position */
+ 		bytes_written += bytes_to_write;
+ 		bytes_left -= bytes_to_write;
+ 		*blockpos += bytes_to_write;
+ 		xlogoff += bytes_to_write;
+ 
+ 		/* Did we reach the end of a WAL segment? */
+ 		if (*blockpos % XLOG_SEG_SIZE == 0)
+ 		{
+ 			if (!close_walfile(basedir, partial_suffix, *blockpos))
+ 				/* Error message written in close_walfile() */
+ 				return false;
+ 
+ 			xlogoff = 0;
+ 
+ 			if (still_sending && stream_stop(*blockpos, timeline, true))
+ 			{
+ 				if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+ 				{
+ 					fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+ 							progname, PQerrorMessage(conn));
+ 					return false;
+ 				}
+ 				still_sending = false;
+ 				return true;	/* ignore the rest of this XLogData packet */
+ 			}
+ 		}
+ 	}
+ 	/* No more data left to write, receive next copy packet */
+ 
+ 	return true;
+ }
+ 
+ /*
+  * Handle end of the copy stream.
+  */
+ static PGresult *
+ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
+ 					  XLogRecPtr blockpos, char *basedir, char *partial_suffix,
+ 					  XLogRecPtr *stoppos)
+ {
+ 	PGresult   *res = PQgetResult(conn);
+ 
+ 	/*
+ 	 * The server closed its end of the copy stream.  If we haven't
+ 	 * closed ours already, we need to do so now, unless the server
+ 	 * threw an error, in which case we don't.
+ 	 */
+ 	if (still_sending)
+ 	{
+ 		if (!close_walfile(basedir, partial_suffix, blockpos))
+ 		{
+ 			/* Error message written in close_walfile() */
+ 			PQclear(res);
+ 			return NULL;
+ 		}
+ 		if (PQresultStatus(res) == PGRES_COPY_IN)
+ 		{
+ 			if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+ 			{
+ 				fprintf(stderr,
+ 						_("%s: could not send copy-end packet: %s"),
+ 						progname, PQerrorMessage(conn));
+ 				PQclear(res);
+ 				return NULL;
+ 			}
+ 			res = PQgetResult(conn);
+ 		}
+ 		still_sending = false;
+ 	}
+ 	if (copybuf != NULL)
+ 		PQfreemem(copybuf);
+ 	*stoppos = blockpos;
+ 	return res;
+ }
#27Noname
furuyao@pm.nttdata.co.jp
In reply to: Fujii Masao (#26)
Re: pg_receivexlog add synchronous mode

I have improved the patch by making following changes:

1. Since it was redundant, the stream_stop() call made at the time of

WAL file closing was deleted.

2. Changed the timing of the flush check, for readability of the source code.
I have moved the flush check from after the continuous

message receiving to

before the feedback message decision, which is reached after the continue statement is

executed.

Thanks for the updated version of the patch!

While reviewing the patch, I found that HandleCopyStream() is still
long, which decreases the readability of the source code.
So I feel inclined to refactor HandleCopyStream() further for better
readability. What about the attached refactoring patch?

Sorry, I forgot to attach the patch to my previous email, so it is attached here.

Thank you for the refactoring patch.
I did a review of the patch.

- break; /* ignore the rest of this XLogData packet */

+ return true; /* ignore the rest of this XLogData packet */

The break statement executed when the WAL file is closed has become "return true".
It may now behave like a continue statement. Is that acceptable?

The walreceiver splits this processing into XLogWalRcvProcessMsg and XLogWalRcvWrite; isn't the same division necessary here?

Regards,

--
Furuya Osamu

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#28Fujii Masao
masao.fujii@gmail.com
In reply to: Noname (#27)
Re: pg_receivexlog add synchronous mode

On Wed, Aug 6, 2014 at 2:34 PM, <furuyao@pm.nttdata.co.jp> wrote:

I have improved the patch by making following changes:

1. Since it was redundant, the stream_stop() call made at the time of

WAL file closing was deleted.

2. Changed the timing of the flush check, for readability of the source code.
I have moved the flush check from after the continuous

message receiving to

before the feedback message decision, which is reached after the continue statement is

executed.

Thanks for the updated version of the patch!

While reviewing the patch, I found that HandleCopyStream() is still
long, which decreases the readability of the source code.
So I feel inclined to refactor HandleCopyStream() further for better
readability. What about the attached refactoring patch?

Sorry, I forgot to attach the patch to my previous email, so it is attached here.

Thank you for the refactoring patch.
I did a review of the patch.

- break; /* ignore the rest of this XLogData packet */

+ return true; /* ignore the rest of this XLogData packet */

The break statement executed when the WAL file is closed has become "return true".
It may now behave like a continue statement. Is that acceptable?

Sorry I failed to see your point.

In the original code, when we reach the end of a WAL file and it's the streaming
stopping point, we break out of the inner loop in the code block for processing
the XLogData packet. And then we go back to the top of the outer loop in
HandleCopyStream. ISTM that the refactored code also works the same way.
Anyway, could you elaborate on the problem?

The walreceiver splits this processing into XLogWalRcvProcessMsg and XLogWalRcvWrite; isn't the same division necessary here?

Not necessary, but I have no objection to the idea.

Regards,

--
Fujii Masao

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#29Noname
furuyao@pm.nttdata.co.jp
In reply to: Fujii Masao (#28)
Re: pg_receivexlog add synchronous mode

- break; /* ignore

the rest of this XLogData packet */

+ return true; /* ignore the rest of

this XLogData packet */

The break statement executed when the WAL file is closed has become "return true".
It may now behave like a continue statement. Is that acceptable?

Sorry I failed to see your point.

In the original code, when we reach the end of a WAL file and it's the streaming
stopping point, we break out of the inner loop in the code block for
processing the XLogData packet. And then we go back to the top of the outer loop
in HandleCopyStream. ISTM that the refactored code also works the same
way.
Anyway, could you elaborate on the problem?

I'm sorry. I confused this with the patch that I am proposing.
In my patch it is necessary to exit the loop there, since it adds a loop that continuously receives messages.
There is no problem with the refactoring patch as presented.

Regards,

--
Furuya Osamu

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#30Fujii Masao
masao.fujii@gmail.com
In reply to: Noname (#29)
1 attachment(s)
Re: pg_receivexlog add synchronous mode

On Wed, Aug 6, 2014 at 5:10 PM, <furuyao@pm.nttdata.co.jp> wrote:

- break; /* ignore

the rest of this XLogData packet */

+ return true; /* ignore the rest of

this XLogData packet */

The break statement executed when the WAL file is closed has become "return true".
It may now behave like a continue statement. Is that acceptable?

Sorry I failed to see your point.

In the original code, when we reach the end of a WAL file and it's the streaming
stopping point, we break out of the inner loop in the code block for
processing the XLogData packet. And then we go back to the top of the outer loop
in HandleCopyStream. ISTM that the refactored code also works the same
way.
Anyway, could you elaborate on the problem?

I'm sorry. I confused this with the patch that I am proposing.
In my patch it is necessary to exit the loop there, since it adds a loop that continuously receives messages.
There is no problem with the refactoring patch as presented.

Okay, applied the patch.

I heavily modified your patch, basing it on the master branch to which the refactoring
patch has been applied. Attached is that modified version. Could you
review it?

Regards,

--
Fujii Masao

Attachments:

pg_receivexlog_add_fsync_interval_v5_fujii.patchtext/x-patch; charset=US-ASCII; name=pg_receivexlog_add_fsync_interval_v5_fujii.patchDownload
diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml
index 7c50b01..c15776f 100644
--- a/doc/src/sgml/ref/pg_receivexlog.sgml
+++ b/doc/src/sgml/ref/pg_receivexlog.sgml
@@ -106,6 +106,21 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+       <term><option>-F <replaceable class="parameter">interval</replaceable></option></term>
+       <term><option>--fsync-interval=<replaceable class="parameter">interval</replaceable></option></term>
+       <listitem>
+        <para>
+        Specifies the maximum time to issue sync commands to ensure the
+        received WAL file is safely flushed to disk, in seconds. The default
+        value is zero, which disables issuing fsyncs except when WAL file is
+        closed. If <literal>-1</literal> is specified, WAL file is flushed as
+        soon as possible, that is, as soon as there are WAL data which has
+        not been flushed yet.
+        </para>
+       </listitem>
+      </varlistentry>
+
+     <varlistentry>
       <term><option>-v</option></term>
       <term><option>--verbose</option></term>
       <listitem>
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 5df2eb8..0b02c4c 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -371,7 +371,7 @@ LogStreamerMain(logstreamer_param *param)
 	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
 						   param->sysidentifier, param->xlogdir,
 						   reached_end_position, standby_message_timeout,
-						   NULL))
+						   NULL, 0))
 
 		/*
 		 * Any errors will already have been reported in the function process,
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index 9640838..0b7af54 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -36,6 +36,7 @@ static char *basedir = NULL;
 static int	verbose = 0;
 static int	noloop = 0;
 static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
+static int	fsync_interval = 0; /* 0 = default */
 static volatile bool time_to_abort = false;
 
 
@@ -62,6 +63,8 @@ usage(void)
 	printf(_("\nOptions:\n"));
 	printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
 	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
+	printf(_("  -F  --fsync-interval=INTERVAL\n"
+			 "                         frequency of syncs to transaction log files (in seconds)\n"));
 	printf(_("  -v, --verbose          output verbose messages\n"));
 	printf(_("  -V, --version          output version information, then exit\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
@@ -330,7 +333,8 @@ StreamLog(void)
 				starttli);
 
 	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
-					  stop_streaming, standby_message_timeout, ".partial");
+					  stop_streaming, standby_message_timeout, ".partial",
+					  fsync_interval);
 
 	PQfinish(conn);
 }
@@ -360,6 +364,7 @@ main(int argc, char **argv)
 		{"port", required_argument, NULL, 'p'},
 		{"username", required_argument, NULL, 'U'},
 		{"no-loop", no_argument, NULL, 'n'},
+		{"fsync-interval", required_argument, NULL, 'F'},
 		{"no-password", no_argument, NULL, 'w'},
 		{"password", no_argument, NULL, 'W'},
 		{"status-interval", required_argument, NULL, 's'},
@@ -389,7 +394,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
+	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nF:wWv",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -436,6 +441,15 @@ main(int argc, char **argv)
 			case 'n':
 				noloop = 1;
 				break;
+		case 'F':
+			fsync_interval = atoi(optarg) * 1000;
+			if (fsync_interval < -1000)
+			{
+				fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
+						progname, optarg);
+				exit(1);
+			}
+			break;
 			case 'v':
 				verbose++;
 				break;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index d28e13b..89b22f2 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -31,12 +31,14 @@ static char current_walfile_name[MAXPGPATH] = "";
 static bool reportFlushPosition = false;
 static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
 
+static int64 last_fsync = -1;		/* timestamp of last WAL file flush */
 static bool still_sending = true;		/* feedback still needs to be sent? */
 
 static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
 				 uint32 timeline, char *basedir,
 			   stream_stop_callback stream_stop, int standby_message_timeout,
-				 char *partial_suffix, XLogRecPtr *stoppos);
+				  char *partial_suffix, XLogRecPtr *stoppos,
+				  int fsync_interval);
 static int CopyStreamPoll(PGconn *conn, long timeout_ms);
 static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
 static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
@@ -48,6 +50,13 @@ static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
 static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
 									   XLogRecPtr blockpos, char *basedir, char *partial_suffix,
 									   XLogRecPtr *stoppos);
+static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
+								uint32 timeline, char *basedir,
+								stream_stop_callback stream_stop,
+								char *partial_suffix, XLogRecPtr *stoppos);
+static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
+										 int64 last_status, int fsync_interval,
+										 XLogRecPtr blockpos);
 
 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
 						 uint32 *timeline);
@@ -200,6 +209,7 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
 				progname, current_walfile_name, partial_suffix);
 
 	lastFlushPosition = pos;
+	last_fsync = feGetCurrentTimestamp();
 	return true;
 }
 
@@ -430,13 +440,17 @@ CheckServerVersionForStreaming(PGconn *conn)
  * allows you to tell the difference between partial and completed files,
  * so that you can continue later where you left.
  *
+ * fsync_interval controls how often we flush to the received WAL file,
+ * in milliseconds.
+ *
  * Note: The log position *must* be at a log segment start!
  */
 bool
 ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 				  char *sysidentifier, char *basedir,
 				  stream_stop_callback stream_stop,
-				  int standby_message_timeout, char *partial_suffix)
+				  int standby_message_timeout, char *partial_suffix,
+				  int fsync_interval)
 {
 	char		query[128];
 	char		slotcmd[128];
@@ -581,7 +595,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 		/* Stream the WAL */
 		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
 							   standby_message_timeout, partial_suffix,
-							   &stoppos);
+							   &stoppos, fsync_interval);
 		if (res == NULL)
 			goto error;
 
@@ -746,7 +760,7 @@ static PGresult *
 HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 				 char *basedir, stream_stop_callback stream_stop,
 				 int standby_message_timeout, char *partial_suffix,
-				 XLogRecPtr *stoppos)
+				 XLogRecPtr *stoppos, int fsync_interval)
 {
 	char	   *copybuf = NULL;
 	int64		last_status = -1;
@@ -763,26 +777,36 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 		/*
 		 * Check if we should continue streaming, or abort at this point.
 		 */
-		if (still_sending && stream_stop(blockpos, timeline, false))
+		if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
+								stream_stop, partial_suffix, stoppos))
+			goto error;
+
+		now = feGetCurrentTimestamp();
+
+		/*
+		 * If fsync_interval has elapsed since last WAL flush and we've written
+		 * some WAL data, flush them to disk.
+		 */
+		if (lastFlushPosition < blockpos &&
+			walfile != -1 &&
+			((fsync_interval > 0 &&
+			  feTimestampDifferenceExceeds(last_fsync, now, fsync_interval)) ||
+			 fsync_interval < 0))
 		{
-			if (!close_walfile(basedir, partial_suffix, blockpos))
-			{
-				/* Potential error message is written by close_walfile */
-				goto error;
-			}
-			if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+			if (fsync(walfile) != 0)
 			{
-				fprintf(stderr, _("%s: could not send copy-end packet: %s"),
-						progname, PQerrorMessage(conn));
+				fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
+						progname, current_walfile_name, strerror(errno));
 				goto error;
 			}
-			still_sending = false;
+
+			lastFlushPosition = blockpos;
+			last_fsync = now;
 		}
 
 		/*
 		 * Potentially send a status message to the master
 		 */
-		now = feGetCurrentTimestamp();
 		if (still_sending && standby_message_timeout > 0 &&
 			feTimestampDifferenceExceeds(last_status, now,
 										 standby_message_timeout))
@@ -794,64 +818,58 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 		}
 
 		/*
-		 * Compute how long send/receive loops should sleep
+		 * Calculate how long send/receive loops should sleep
 		 */
-		if (standby_message_timeout && still_sending)
-		{
-			int64		targettime;
-			long		secs;
-			int			usecs;
-
-			targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
-			feTimestampDifference(now,
-								  targettime,
-								  &secs,
-								  &usecs);
-			/* Always sleep at least 1 sec */
-			if (secs <= 0)
-			{
-				secs = 1;
-				usecs = 0;
-			}
-
-			sleeptime = secs * 1000 + usecs / 1000;
-		}
-		else
-			sleeptime = -1;
+		sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
+												 last_status, fsync_interval, blockpos);
 
 		r = CopyStreamReceive(conn, sleeptime, &copybuf);
-		if (r == 0)
-			continue;
-		if (r == -1)
-			goto error;
-		if (r == -2)
+		while (r != 0)
 		{
-			PGresult	*res = HandleEndOfCopyStream(conn, copybuf, blockpos,
-													 basedir, partial_suffix, stoppos);
-			if (res == NULL)
+			if (r == -1)
 				goto error;
-			else
-				return res;
-		}
+			if (r == -2)
+			{
+				PGresult	*res = HandleEndOfCopyStream(conn, copybuf, blockpos,
+														 basedir, partial_suffix, stoppos);
+				if (res == NULL)
+					goto error;
+				else
+					return res;
+			}
 
-		/* Check the message type. */
-		if (copybuf[0] == 'k')
-		{
-			if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
-									 &last_status))
-				goto error;
-		}
-		else if (copybuf[0] == 'w')
-		{
-			if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
-									timeline, basedir, stream_stop, partial_suffix))
+			/* Check the message type. */
+			if (copybuf[0] == 'k')
+			{
+				if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
+										 &last_status))
+					goto error;
+			}
+			else if (copybuf[0] == 'w')
+			{
+				if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
+										timeline, basedir, stream_stop, partial_suffix))
+					goto error;
+
+				/*
+				 * Check if we should continue streaming, or abort at this point.
+				 */
+				if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
+										 stream_stop, partial_suffix, stoppos))
+					goto error;
+			}
+			else
+			{
+				fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+						progname, copybuf[0]);
 				goto error;
-		}
-		else
-		{
-			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
-					progname, copybuf[0]);
-			goto error;
+			}
+
+			/*
+			 * Process the received data, and any subsequent data we
+			 * can read without blocking.
+			 */
+			r = CopyStreamReceive(conn, 0, &copybuf);
 		}
 	}
 
@@ -1193,3 +1211,80 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
 	*stoppos = blockpos;
 	return res;
 }
+
+/*
+ * Check if we should continue streaming, or abort at this point.
+ */
+static bool
+CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
+					char *basedir, stream_stop_callback stream_stop,
+					char *partial_suffix, XLogRecPtr *stoppos)
+{
+	if (still_sending && stream_stop(blockpos, timeline, false))
+	{
+		if (!close_walfile(basedir, partial_suffix, blockpos))
+		{
+			/* Potential error message is written by close_walfile */
+			return false;
+		}
+		if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+		{
+			fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+					progname, PQerrorMessage(conn));
+			return false;
+		}
+		still_sending = false;
+	}
+
+	return true;
+}
+
+/*
+ * Calculate how long send/receive loops should sleep
+ */
+static long
+CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
+							 int64 last_status, int fsync_interval, XLogRecPtr blockpos)
+{
+	int64		targettime = 0;
+	int64		status_targettime = 0;
+	int64		fsync_targettime = 0;
+	long		sleeptime;
+
+	if (standby_message_timeout && still_sending)
+		status_targettime = last_status +
+			(standby_message_timeout - 1) * ((int64) 1000);
+
+	if (fsync_interval > 0 && lastFlushPosition < blockpos)
+		fsync_targettime = last_fsync +
+			(fsync_interval - 1) * ((int64) 1000);
+
+	if ((status_targettime < fsync_targettime && status_targettime > 0) ||
+		fsync_targettime == 0)
+		targettime = status_targettime;
+	else
+		targettime = fsync_targettime;
+
+	if (targettime > 0)
+	{
+		long		secs;
+		int			usecs;
+
+		feTimestampDifference(now,
+							  targettime,
+							  &secs,
+							  &usecs);
+		/* Always sleep at least 1 sec */
+		if (secs <= 0)
+		{
+			secs = 1;
+			usecs = 0;
+		}
+
+		sleeptime = secs * 1000 + usecs / 1000;
+	}
+	else
+		sleeptime = -1;
+
+	return sleeptime;
+}
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index f4789a5..72f8245 100644
--- a/src/bin/pg_basebackup/receivelog.h
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -16,4 +16,5 @@ extern bool ReceiveXlogStream(PGconn *conn,
 				  char *basedir,
 				  stream_stop_callback stream_stop,
 				  int standby_message_timeout,
-				  char *partial_suffix);
+				  char *partial_suffix,
+				  int fsync_interval);
#31Noname
furuyao@pm.nttdata.co.jp
In reply to: Fujii Masao (#30)
Re: pg_receivexlog add synchronous mode

Okay, applied the patch.

I heavily modified your patch, rebasing it on master now that the
refactoring patch has been applied. Attached is that modified version.
Could you review it?

Thank you for the patch.
I did a review of the patch.

No problem in the patch.

The patch changes the behavior after ProcessXLogDataMsg returns true.
Previously control went back to the top of the while(1) loop; now the while(r != 0) loop may continue instead.
However, when still_sending is false the processing is skipped, so the result of the operation does not change.

Regards,

--
Furuya Osamu

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#32Fujii Masao
masao.fujii@gmail.com
In reply to: Noname (#31)
Re: pg_receivexlog add synchronous mode

On Thu, Aug 7, 2014 at 5:24 PM, <furuyao@pm.nttdata.co.jp> wrote:

Okay, applied the patch.

I heavily modified your patch, rebasing it on master now that the
refactoring patch has been applied. Attached is that modified version.
Could you review it?

Thank you for the patch.
I did a review of the patch.

No problem in the patch.

Thanks for the review! Applied the patch.

Regards,

--
Fujii Masao

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#33Michael Paquier
michael.paquier@gmail.com
In reply to: Fujii Masao (#32)
1 attachment(s)
Re: pg_receivexlog add synchronous mode

On Fri, Aug 8, 2014 at 5:10 PM, Fujii Masao <masao.fujii@gmail.com> wrote:

Thanks for the review! Applied the patch.

I noticed that the tab padding for the new option -F in the getopt
switch is incorrect. The attached patch corrects that. pgindent would have
caught that anyway, but it doesn't hurt to be correct now.
Thanks,
--
Michael

Attachments:

20140812_pg_receivexlog_tabpadding.patchtext/x-patch; charset=US-ASCII; name=20140812_pg_receivexlog_tabpadding.patchDownload
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index 0b7af54..4483c87 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -441,15 +441,15 @@ main(int argc, char **argv)
 			case 'n':
 				noloop = 1;
 				break;
-		case 'F':
-			fsync_interval = atoi(optarg) * 1000;
-			if (fsync_interval < -1000)
-			{
-				fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
-						progname, optarg);
-				exit(1);
-			}
-			break;
+			case 'F':
+				fsync_interval = atoi(optarg) * 1000;
+				if (fsync_interval < -1000)
+				{
+					fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				break;
 			case 'v':
 				verbose++;
 				break;
#34Fujii Masao
masao.fujii@gmail.com
In reply to: Michael Paquier (#33)
Re: pg_receivexlog add synchronous mode

On Tue, Aug 12, 2014 at 4:34 PM, Michael Paquier
<michael.paquier@gmail.com> wrote:

On Fri, Aug 8, 2014 at 5:10 PM, Fujii Masao <masao.fujii@gmail.com> wrote:

Thanks for the review! Applied the patch.

I noticed that the tab padding for the new option -F in the getopt
switch is incorrect. The attached patch corrects that. pgindent would have
caught that anyway, but it doesn't hurt to be correct now.

Yeah, that's a problem. But, as you clearly said, upcoming pgindent
would fix this kind of problem. So I'm not sure if it's really worth fixing
only this case independently.

Regards,

--
Fujii Masao

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers