Re: Your review of pg_receivexlog/pg_basebackup

Started by Heikki Linnakangas, about 14 years ago, 6 messages
#1Heikki Linnakangas
heikki.linnakangas@enterprisedb.com

(CC'ing pgsql-hackers, this started as an IM discussion yesterday but
really belongs in the archives)

On 25.10.2011 23:52, Magnus Hagander wrote:

There's a tiny chance to get incomplete xlog files with pg_receivexlog if you crash:
1. pg_receivexlog finishes write()ing a file but system crashes before fsync() finishes.
2. When pg_receivexlog restarts after crash, the last WAL file was not fully flushed to disk, with
holes in the middle, but it has the right length. pg_receivexlog will continue streaming from the next file.
not sure if we care about such a narrow window, but maybe we do

So how would we go about fixing that? Always unlinking the last file in
the directory and retrying from there would seem dangerous too - what if
it's not available on the master anymore? Then we might have given up
on data...

Start streaming from the beginning of the last segment, but don't unlink
it first. Just overwrite it as you receive the data.

Or, always create new xlog file as "0000000100000001000000D3.partial",
and only when it's fully written, fsync it, and then rename it to
"0000000100000001000000D3". Then you know that if a file doesn't have
the .partial suffix, it's complete. The fact that the last partial file
always has the .partial suffix needs some extra pushups in the
restore_command, though.

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
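
A minimal sketch of the ".partial" scheme proposed in this message, assuming a whole segment is already in memory and plain POSIX calls are available. The helper name write_segment_atomically() and its parameters are hypothetical and do not come from pg_receivexlog; the real tool streams data incrementally instead of writing one buffer.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Hypothetical helper, not part of pg_receivexlog: write "len" bytes to
 * "<dir>/<name>.partial", flush them, and only then rename the file to
 * "<dir>/<name>". Returns 0 on success and -1 on any failure.
 */
static int
write_segment_atomically(const char *dir, const char *name,
						 const char *data, size_t len)
{
	char		partial[1024];
	char		complete[1024];
	size_t		done = 0;
	int			n;
	int			fd;

	assert(dir != NULL && name != NULL && data != NULL);

	n = snprintf(partial, sizeof(partial), "%s/%s.partial", dir, name);
	if (n < 0 || (size_t) n >= sizeof(partial))
		return -1;				/* path would be truncated */
	/* Always shorter than "partial", so it fits in a same-sized buffer */
	snprintf(complete, sizeof(complete), "%s/%s", dir, name);

	fd = open(partial, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR);
	if (fd == -1)
		return -1;

	/* write() may write fewer bytes than requested, so loop */
	while (done < len)
	{
		ssize_t		written = write(fd, data + done, len - done);

		if (written <= 0)
		{
			close(fd);
			return -1;
		}
		done += (size_t) written;
	}

	/* The data must be on disk before the file gets its final name */
	if (fsync(fd) != 0)
	{
		close(fd);
		return -1;
	}
	if (close(fd) != 0)
		return -1;

	return (rename(partial, complete) == 0) ? 0 : -1;
}
```

With this ordering, a file without the .partial suffix has always been fsync'ed in full before it became visible under its final name. A stricter variant would probably also fsync the containing directory after the rename so that the new name itself survives a crash; that step is left out here.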

#2Magnus Hagander
magnus@hagander.net
In reply to: Heikki Linnakangas (#1)
1 attachment(s)

On Wed, Oct 26, 2011 at 09:52, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:

(CC'ing pgsql-hackers, this started as an IM discussion yesterday but really
belongs in the archives)

On 25.10.2011 23:52, Magnus Hagander wrote:

There's a tiny chance to get incomplete xlog files with pg_receivexlog if
you crash:
1. pg_receivexlog finishes write()ing a file but system crashes before
fsync() finishes.
2. When pg_receivexlog restarts after crash, the last WAL file was not
fully flushed to disk, with
holes in the middle, but it has the right length. pg_receivexlog will
continue streaming from the next file.
not sure if we care about such a narrow window, but maybe we do

So how would we go about fixing that? Always unlinking the last file in
the directory and retrying from there would seem dangerous too - what if
it's not available on the master anymore? Then we might have given up
on data...

Start streaming from the beginning of the last segment, but don't unlink it
first. Just overwrite it as you receive the data.

Or, always create new xlog file as "0000000100000001000000D3.partial", and
only when it's fully written, fsync it, and then rename it to
"0000000100000001000000D3". Then you know that if a file doesn't have the
.partial suffix, it's complete. The fact that the last partial file always
has the .partial suffix needs some extra pushups in the restore_command,
though.

Here's a version that does this. Turns out this requires a lot less
code than what was previously in there, which is always nice.

We still need to solve the other part which is how to deal with the
partial files on restore. But this is definitely a cleaner way from a
pure pg_receivexlog perspective.

Comments/reviews?

--
 Magnus Hagander
 Me: http://www.hagander.net/
 Work: http://www.redpill-linpro.com/
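
The thread leaves open how a restore should treat the trailing .partial file. Purely as an illustration of the "extra pushups" a restore_command would need, here is a sketch of a lookup that prefers the completed segment and falls back to the .partial one. The function find_segment_for_restore() and its return codes are assumptions made for this example, not something the patch or PostgreSQL provides.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>

/*
 * Hypothetical lookup for a restore script: fill "out" with the path of
 * the file to restore for segment "name" in directory "dir".
 *
 * Returns 0 if the completed segment exists, 1 if only "<name>.partial"
 * exists (the caller must decide whether replaying it is acceptable),
 * and -1 if neither exists or the path does not fit in "out".
 */
static int
find_segment_for_restore(const char *dir, const char *name,
						 char *out, size_t outlen)
{
	struct stat statbuf;
	int			n;

	assert(dir != NULL && name != NULL && out != NULL);

	if (outlen == 0 || strlen(name) == 0)
		return -1;

	/* A file without the suffix is known to be complete: prefer it */
	n = snprintf(out, outlen, "%s/%s", dir, name);
	if (n < 0 || (size_t) n >= outlen)
		return -1;
	if (stat(out, &statbuf) == 0)
		return 0;

	/* Otherwise fall back to the segment that was still being received */
	n = snprintf(out, outlen, "%s/%s.partial", dir, name);
	if (n < 0 || (size_t) n >= outlen)
		return -1;
	if (stat(out, &statbuf) == 0)
		return 1;

	out[0] = '\0';
	return -1;
}
```

Returning a distinct code for the .partial case keeps the policy decision (use the incomplete segment or stop recovery there) with the caller.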

Attachments:

pg_receivexlog_partial.patch (text/x-patch; charset=US-ASCII)
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 71,104 **** usage(void)
  static bool
  segment_callback(XLogRecPtr segendpos, uint32 timeline)
  {
- 	char		fn[MAXPGPATH];
- 	struct stat statbuf;
- 
  	if (verbose)
  		fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
  				progname, segendpos.xlogid, segendpos.xrecoff, timeline);
  
  	/*
- 	 * Check if there is a partial file for the name we just finished, and if
- 	 * there is, remove it under the assumption that we have now got all the
- 	 * data we need.
- 	 */
- 	segendpos.xrecoff /= XLOG_SEG_SIZE;
- 	PrevLogSeg(segendpos.xlogid, segendpos.xrecoff);
- 	snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial",
- 			 basedir, timeline,
- 			 segendpos.xlogid,
- 			 segendpos.xrecoff);
- 	if (stat(fn, &statbuf) == 0)
- 	{
- 		/* File existed, get rid of it */
- 		if (verbose)
- 			fprintf(stderr, _("%s: removing file \"%s\"\n"),
- 					progname, fn);
- 		unlink(fn);
- 	}
- 
- 	/*
  	 * Never abort from this - we handle all aborting in continue_streaming()
  	 */
  	return false;
--- 71,81 ----
***************
*** 133,139 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
  	bool		b;
  	uint32		high_log = 0;
  	uint32		high_seg = 0;
- 	bool		partial = false;
  
  	dir = opendir(basedir);
  	if (dir == NULL)
--- 110,115 ----
***************
*** 195,201 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
  			disconnect_and_exit(1);
  		}
  
! 		if (statbuf.st_size == 16 * 1024 * 1024)
  		{
  			/* Completed segment */
  			if (log > high_log ||
--- 171,177 ----
  			disconnect_and_exit(1);
  		}
  
! 		if (statbuf.st_size == XLOG_SEG_SIZE)
  		{
  			/* Completed segment */
  			if (log > high_log ||
***************
*** 208,244 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
  		}
  		else
  		{
! 			/*
! 			 * This is a partial file. Rename it out of the way.
! 			 */
! 			char		newfn[MAXPGPATH];
! 
! 			fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
! 					progname, dirent->d_name, dirent->d_name);
! 
! 			snprintf(newfn, sizeof(newfn), "%s/%s.partial",
! 					 basedir, dirent->d_name);
! 
! 			if (stat(newfn, &statbuf) == 0)
! 			{
! 				/*
! 				 * XXX: perhaps we should only error out if the existing file
! 				 * is larger?
! 				 */
! 				fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"),
! 						progname, newfn);
! 				disconnect_and_exit(1);
! 			}
! 			if (rename(fullpath, newfn) != 0)
! 			{
! 				fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"),
! 						progname, fullpath, newfn, strerror(errno));
! 				disconnect_and_exit(1);
! 			}
! 
! 			/* Don't continue looking for more, we assume this is the last */
! 			partial = true;
! 			break;
  		}
  	}
  
--- 184,192 ----
  		}
  		else
  		{
! 			fprintf(stderr, _("%s: segment file '%s' is incorrect size %d, skipping\n"),
! 					progname, dirent->d_name, (int) statbuf.st_size);
! 			continue;
  		}
  	}
  
***************
*** 247,263 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
  	if (high_log > 0 || high_seg > 0)
  	{
  		XLogRecPtr	high_ptr;
! 
! 		if (!partial)
! 		{
! 			/*
! 			 * If the segment was partial, the pointer is already at the right
! 			 * location since we want to re-transmit that segment. If it was
! 			 * not, we need to move it to the next segment, since we are
! 			 * tracking the last one that was complete.
! 			 */
! 			NextLogSeg(high_log, high_seg);
! 		}
  
  		high_ptr.xlogid = high_log;
  		high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE;
--- 195,205 ----
  	if (high_log > 0 || high_seg > 0)
  	{
  		XLogRecPtr	high_ptr;
! 		/*
! 		 * Move the starting pointer to the start of the next segment,
! 		 * since the highest one we've seen was completed.
! 		 */
! 		NextLogSeg(high_log, high_seg);
  
  		high_ptr.xlogid = high_log;
  		high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 51,64 **** open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu
  	XLogFileName(namebuf, timeline, startpoint.xlogid,
  				 startpoint.xrecoff / XLOG_SEG_SIZE);
  
! 	snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
! 	f = open(fn, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, 0666);
  	if (f == -1)
  		fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
! 				progname, namebuf, strerror(errno));
  	return f;
  }
  
  /*
   * Local version of GetCurrentTimestamp(), since we are not linked with
   * backend code.
--- 51,114 ----
  	XLogFileName(namebuf, timeline, startpoint.xlogid,
  				 startpoint.xrecoff / XLOG_SEG_SIZE);
  
! 	snprintf(fn, sizeof(fn), "%s/%s.partial", basedir, namebuf);
! 	f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, 0666);
  	if (f == -1)
  		fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
! 				progname, fn, strerror(errno));
! 
  	return f;
  }
  
+ static bool
+ close_walfile(int walfile, char *basedir, char *walname)
+ {
+ 	off_t		size = lseek(walfile, 0, SEEK_END);
+ 
+ 	if (size == -1)
+ 	{
+ 		fprintf(stderr, _("%s: could not get size of written file %s: %s\n"),
+ 				progname, walname, strerror(errno));
+ 		return false;
+ 	}
+ 
+ 	if (fsync(walfile) != 0)
+ 	{
+ 		fprintf(stderr, _("%s: could not fsync file %s: %s\n"),
+ 				progname, walname, strerror(errno));
+ 		return false;
+ 	}
+ 
+ 	if (close(walfile) != 0)
+ 	{
+ 		fprintf(stderr, _("%s: could not close file %s: %s\n"),
+ 				progname, walname, strerror(errno));
+ 		return false;
+ 	}
+ 
+ 	/* Rename the .partial file only if it's 16Mb */
+ 	if (size == XLOG_SEG_SIZE)
+ 	{
+ 		char		oldfn[MAXPGPATH];
+ 		char		newfn[MAXPGPATH];
+ 
+ 		snprintf(oldfn, sizeof(oldfn), "%s/%s.partial", basedir, walname);
+ 		snprintf(newfn, sizeof(newfn), "%s/%s", basedir, walname);
+ 		if (rename(oldfn, newfn) != 0)
+ 		{
+ 			fprintf(stderr, _("%s: could not rename file %s: %s\n"),
+ 					progname, walname, strerror(errno));
+ 			return false;
+ 		}
+ 	}
+ 	else
+ 		fprintf(stderr, _("%s: not renaming %s, segment is not complete.\n"),
+ 				progname, walname);
+ 
+ 	return true;
+ }
+ 
+ 
  /*
   * Local version of GetCurrentTimestamp(), since we are not linked with
   * backend code.
***************
*** 178,187 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
  		if (stream_continue && stream_continue())
  		{
  			if (walfile != -1)
! 			{
! 				fsync(walfile);
! 				close(walfile);
! 			}
  			return true;
  		}
  
--- 228,235 ----
  		if (stream_continue && stream_continue())
  		{
  			if (walfile != -1)
! 				/* Potential error message is written by close_walfile */
! 				return close_walfile(walfile, basedir, current_walfile_name);
  			return true;
  		}
  
***************
*** 360,367 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
  			/* Did we reach the end of a WAL segment? */
  			if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
  			{
! 				fsync(walfile);
! 				close(walfile);
  				walfile = -1;
  				xlogoff = 0;
  
--- 408,417 ----
  			/* Did we reach the end of a WAL segment? */
  			if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
  			{
! 				if (!close_walfile(walfile, basedir, current_walfile_name))
! 					/* Error message written in close_walfile() */
! 					return false;
! 
  				walfile = -1;
  				xlogoff = 0;
  
#3Fujii Masao
masao.fujii@gmail.com
In reply to: Magnus Hagander (#2)

On Thu, Oct 27, 2011 at 11:14 PM, Magnus Hagander <magnus@hagander.net> wrote:

Here's a version that does this. Turns out this requires a lot less
code than what was previously in there, which is always nice.

We still need to solve the other part which is how to deal with the
partial files on restore. But this is definitely a cleaner way from a
pure pg_receivexlog perspective.

Comments/reviews?

Looks good.

Minor comment:
the source code comment of FindStreamingStart() seems to need to be updated.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
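
For readers following the FindStreamingStart() discussion, the selection rule after the patch can be sketched as a pure function over a directory listing: ignore anything that is not a completed segment, take the highest one, and start at the segment after it. The SegFile type, the function name, and the SEGS_PER_LOGID value are assumptions for this sketch; the real function reads the directory itself and also looks at the timeline, which is ignored here.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Assumption: with 16MB segments there are 255 segments per log id */
#define SEGS_PER_LOGID 255

/* Hypothetical stand-in for one directory entry and its stat() size */
typedef struct
{
	const char *name;
	long		size;
} SegFile;

/*
 * Returns 1 and sets *start_log / *start_seg to the segment following the
 * highest complete one, or returns 0 if no complete segment was found (the
 * caller would then start from the server's current position).
 */
static int
find_streaming_start(const SegFile *files, size_t nfiles, long seg_size,
					 uint32_t *start_log, uint32_t *start_seg)
{
	uint32_t	high_log = 0;
	uint32_t	high_seg = 0;
	int			found = 0;
	size_t		i;

	assert(start_log != NULL && start_seg != NULL);

	for (i = 0; i < nfiles; i++)
	{
		const char *name = files[i].name;
		unsigned int tli, log, seg;

		/* Only plain 24-hex-digit names count; this skips "*.partial" */
		if (strlen(name) != 24 || strspn(name, "0123456789ABCDEF") != 24)
			continue;
		if (sscanf(name, "%8X%8X%8X", &tli, &log, &seg) != 3)
			continue;

		/* Wrong size: not a completed segment, skip it */
		if (files[i].size != seg_size)
			continue;

		if (!found || log > high_log || (log == high_log && seg > high_seg))
		{
			high_log = log;
			high_seg = seg;
			found = 1;
		}
	}

	if (!found)
		return 0;

	/* Step to the next segment, rolling over into the next log id */
	if (high_seg >= SEGS_PER_LOGID - 1)
	{
		high_log++;
		high_seg = 0;
	}
	else
		high_seg++;

	*start_log = high_log;
	*start_seg = high_seg;
	return 1;
}
```

Because .partial files never match the 24-character name test, an interrupted segment is simply streamed again from its beginning.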

#4Magnus Hagander
magnus@hagander.net
In reply to: Fujii Masao (#3)
1 attachment(s)

On Fri, Oct 28, 2011 at 08:46, Fujii Masao <masao.fujii@gmail.com> wrote:

On Thu, Oct 27, 2011 at 11:14 PM, Magnus Hagander <magnus@hagander.net> wrote:

Here's a version that does this. Turns out this requires a lot less
code than what was previously in there, which is always nice.

We still need to solve the other part which is how to deal with the
partial files on restore. But this is definitely a cleaner way from a
pure pg_receivexlog perspective.

Comments/reviews?

Looks good.

Minor comment:
the source code comment of FindStreamingStart() seems to need to be updated.

Here's an updated patch that includes both this update to the comment
and the functionality to pre-pad files to 16MB. This also seems
to have simplified the code, which is a nice bonus.

--
 Magnus Hagander
 Me: http://www.hagander.net/
 Work: http://www.redpill-linpro.com/

Attachments:

pg_receivexlog.patch (text/x-patch; charset=US-ASCII)
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 71,104 **** usage(void)
  static bool
  segment_callback(XLogRecPtr segendpos, uint32 timeline)
  {
- 	char		fn[MAXPGPATH];
- 	struct stat statbuf;
- 
  	if (verbose)
  		fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
  				progname, segendpos.xlogid, segendpos.xrecoff, timeline);
  
  	/*
- 	 * Check if there is a partial file for the name we just finished, and if
- 	 * there is, remove it under the assumption that we have now got all the
- 	 * data we need.
- 	 */
- 	segendpos.xrecoff /= XLOG_SEG_SIZE;
- 	PrevLogSeg(segendpos.xlogid, segendpos.xrecoff);
- 	snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial",
- 			 basedir, timeline,
- 			 segendpos.xlogid,
- 			 segendpos.xrecoff);
- 	if (stat(fn, &statbuf) == 0)
- 	{
- 		/* File existed, get rid of it */
- 		if (verbose)
- 			fprintf(stderr, _("%s: removing file \"%s\"\n"),
- 					progname, fn);
- 		unlink(fn);
- 	}
- 
- 	/*
  	 * Never abort from this - we handle all aborting in continue_streaming()
  	 */
  	return false;
--- 71,81 ----
***************
*** 119,127 **** continue_streaming(void)
  /*
   * Determine starting location for streaming, based on:
   * 1. If there are existing xlog segments, start at the end of the last one
!  * 2. If the last one is a partial segment, rename it and start over, since
!  *	  we don't sync after every write.
!  * 3. If no existing xlog exists, start from the beginning of the current
   *	  WAL segment.
   */
  static XLogRecPtr
--- 96,103 ----
  /*
   * Determine starting location for streaming, based on:
   * 1. If there are existing xlog segments, start at the end of the last one
!  *    that is complete (size matches XLogSegSize)
!  * 2. If no valid xlog exists, start from the beginning of the current
   *	  WAL segment.
   */
  static XLogRecPtr
***************
*** 133,139 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
  	bool		b;
  	uint32		high_log = 0;
  	uint32		high_seg = 0;
- 	bool		partial = false;
  
  	dir = opendir(basedir);
  	if (dir == NULL)
--- 109,114 ----
***************
*** 195,201 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
  			disconnect_and_exit(1);
  		}
  
! 		if (statbuf.st_size == 16 * 1024 * 1024)
  		{
  			/* Completed segment */
  			if (log > high_log ||
--- 170,176 ----
  			disconnect_and_exit(1);
  		}
  
! 		if (statbuf.st_size == XLOG_SEG_SIZE)
  		{
  			/* Completed segment */
  			if (log > high_log ||
***************
*** 208,244 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
  		}
  		else
  		{
! 			/*
! 			 * This is a partial file. Rename it out of the way.
! 			 */
! 			char		newfn[MAXPGPATH];
! 
! 			fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
! 					progname, dirent->d_name, dirent->d_name);
! 
! 			snprintf(newfn, sizeof(newfn), "%s/%s.partial",
! 					 basedir, dirent->d_name);
! 
! 			if (stat(newfn, &statbuf) == 0)
! 			{
! 				/*
! 				 * XXX: perhaps we should only error out if the existing file
! 				 * is larger?
! 				 */
! 				fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"),
! 						progname, newfn);
! 				disconnect_and_exit(1);
! 			}
! 			if (rename(fullpath, newfn) != 0)
! 			{
! 				fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"),
! 						progname, fullpath, newfn, strerror(errno));
! 				disconnect_and_exit(1);
! 			}
! 
! 			/* Don't continue looking for more, we assume this is the last */
! 			partial = true;
! 			break;
  		}
  	}
  
--- 183,191 ----
  		}
  		else
  		{
! 			fprintf(stderr, _("%s: segment file '%s' is incorrect size %d, skipping\n"),
! 					progname, dirent->d_name, (int) statbuf.st_size);
! 			continue;
  		}
  	}
  
***************
*** 247,263 **** FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
  	if (high_log > 0 || high_seg > 0)
  	{
  		XLogRecPtr	high_ptr;
! 
! 		if (!partial)
! 		{
! 			/*
! 			 * If the segment was partial, the pointer is already at the right
! 			 * location since we want to re-transmit that segment. If it was
! 			 * not, we need to move it to the next segment, since we are
! 			 * tracking the last one that was complete.
! 			 */
! 			NextLogSeg(high_log, high_seg);
! 		}
  
  		high_ptr.xlogid = high_log;
  		high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE;
--- 194,204 ----
  	if (high_log > 0 || high_seg > 0)
  	{
  		XLogRecPtr	high_ptr;
! 		/*
! 		 * Move the starting pointer to the start of the next segment,
! 		 * since the highest one we've seen was completed.
! 		 */
! 		NextLogSeg(high_log, high_seg);
  
  		high_ptr.xlogid = high_log;
  		high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 27,32 ****
--- 27,33 ----
  #include "receivelog.h"
  #include "streamutil.h"
  
+ #include <sys/stat.h>
  #include <sys/time.h>
  #include <sys/types.h>
  #include <unistd.h>
***************
*** 41,64 **** const XLogRecPtr InvalidXLogRecPtr = {0, 0};
   * Open a new WAL file in the specified directory. Store the name
   * (not including the full directory) in namebuf. Assumes there is
   * enough room in this buffer...
   */
  static int
  open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
  {
  	int			f;
  	char		fn[MAXPGPATH];
  
  	XLogFileName(namebuf, timeline, startpoint.xlogid,
  				 startpoint.xrecoff / XLOG_SEG_SIZE);
  
! 	snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
! 	f = open(fn, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, 0666);
  	if (f == -1)
  		fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
! 				progname, namebuf, strerror(errno));
  	return f;
  }
  
  /*
   * Local version of GetCurrentTimestamp(), since we are not linked with
   * backend code.
--- 42,163 ----
   * Open a new WAL file in the specified directory. Store the name
   * (not including the full directory) in namebuf. Assumes there is
   * enough room in this buffer...
+  *
+  * The file will be padded to 16Mb with zeroes.
   */
  static int
  open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
  {
  	int			f;
  	char		fn[MAXPGPATH];
+ 	struct stat	statbuf;
+ 	char	   *zerobuf;
+ 	int			bytes;
  
  	XLogFileName(namebuf, timeline, startpoint.xlogid,
  				 startpoint.xrecoff / XLOG_SEG_SIZE);
  
! 	snprintf(fn, sizeof(fn), "%s/%s.partial", basedir, namebuf);
! 	f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, 0666);
  	if (f == -1)
  		fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
! 				progname, fn, strerror(errno));
! 
! 	/*
! 	 * Verify that the file is either empty (just created), or a complete
! 	 * XLogSegSize segment. Anything in between indicates a corrupt file.
! 	 */
! 	if (fstat(f, &statbuf) != 0)
! 	{
! 		fprintf(stderr, _("%s: could not stat WAL segment %s: %s\n"),
! 				progname, fn, strerror(errno));
! 		close(f);
! 		return -1;
! 	}
! 	if (statbuf.st_size == XLogSegSize)
! 		return f; /* File is open and ready to use */
! 	if (statbuf.st_size != 0)
! 	{
! 		fprintf(stderr, _("%s: WAL segment %s is %d bytes, should be 0 or %d\n"),
! 				progname, fn, (int) statbuf.st_size, XLogSegSize);
! 		close(f);
! 		return -1;
! 	}
! 
! 	/* New, empty, file. So pad it to 16Mb with zeroes */
! 	zerobuf = xmalloc0(XLOG_BLCKSZ);
! 	for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ)
! 	{
! 		if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
! 		{
! 			fprintf(stderr, _("%s: could not pad WAL segment %s: %s\n"),
! 					progname, fn, strerror(errno));
! 			close(f);
! 			return -1;
! 		}
! 	}
! 	if (lseek(f, SEEK_SET, 0) != 0)
! 	{
! 		fprintf(stderr, _("%s: could not seek back to beginning of WAL segment %s: %s\n"),
! 				progname, fn, strerror(errno));
! 		close(f);
! 		return -1;
! 	}
  	return f;
  }
  
+ static bool
+ close_walfile(int walfile, char *basedir, char *walname)
+ {
+ 	off_t		currpos = lseek(walfile, 0, SEEK_CUR);
+ 
+ 	if (currpos == -1)
+ 	{
+ 		fprintf(stderr, _("%s: could not get current position in file %s: %s\n"),
+ 				progname, walname, strerror(errno));
+ 		return false;
+ 	}
+ 
+ 	if (fsync(walfile) != 0)
+ 	{
+ 		fprintf(stderr, _("%s: could not fsync file %s: %s\n"),
+ 				progname, walname, strerror(errno));
+ 		return false;
+ 	}
+ 
+ 	if (close(walfile) != 0)
+ 	{
+ 		fprintf(stderr, _("%s: could not close file %s: %s\n"),
+ 				progname, walname, strerror(errno));
+ 		return false;
+ 	}
+ 
+ 	/*
+ 	 * Rename the .partial file only if we've completed writing the
+ 	 * whole segment.
+ 	 */
+ 	if (currpos == XLOG_SEG_SIZE)
+ 	{
+ 		char		oldfn[MAXPGPATH];
+ 		char		newfn[MAXPGPATH];
+ 
+ 		snprintf(oldfn, sizeof(oldfn), "%s/%s.partial", basedir, walname);
+ 		snprintf(newfn, sizeof(newfn), "%s/%s", basedir, walname);
+ 		if (rename(oldfn, newfn) != 0)
+ 		{
+ 			fprintf(stderr, _("%s: could not rename file %s: %s\n"),
+ 					progname, walname, strerror(errno));
+ 			return false;
+ 		}
+ 	}
+ 	else
+ 		fprintf(stderr, _("%s: not renaming %s, segment is not complete.\n"),
+ 				progname, walname);
+ 
+ 	return true;
+ }
+ 
+ 
  /*
   * Local version of GetCurrentTimestamp(), since we are not linked with
   * backend code.
***************
*** 178,187 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
  		if (stream_continue && stream_continue())
  		{
  			if (walfile != -1)
! 			{
! 				fsync(walfile);
! 				close(walfile);
! 			}
  			return true;
  		}
  
--- 277,284 ----
  		if (stream_continue && stream_continue())
  		{
  			if (walfile != -1)
! 				/* Potential error message is written by close_walfile */
! 				return close_walfile(walfile, basedir, current_walfile_name);
  			return true;
  		}
  
***************
*** 360,367 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
  			/* Did we reach the end of a WAL segment? */
  			if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
  			{
! 				fsync(walfile);
! 				close(walfile);
  				walfile = -1;
  				xlogoff = 0;
  
--- 457,466 ----
  			/* Did we reach the end of a WAL segment? */
  			if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
  			{
! 				if (!close_walfile(walfile, basedir, current_walfile_name))
! 					/* Error message written in close_walfile() */
! 					return false;
! 
  				walfile = -1;
  				xlogoff = 0;
  
#5Fujii Masao
masao.fujii@gmail.com
In reply to: Magnus Hagander (#4)

On Tue, Nov 1, 2011 at 3:08 AM, Magnus Hagander <magnus@hagander.net> wrote:

On Fri, Oct 28, 2011 at 08:46, Fujii Masao <masao.fujii@gmail.com> wrote:

On Thu, Oct 27, 2011 at 11:14 PM, Magnus Hagander <magnus@hagander.net> wrote:

Here's a version that does this. Turns out this requires a lot less
code than what was previously in there, which is always nice.

We still need to solve the other part which is how to deal with the
partial files on restore. But this is definitely a cleaner way from a
pure pg_receivexlog perspective.

Comments/reviews?

Looks good.

Minor comment:
the source code comment of FindStreamingStart() seems to need to be updated.

Here's an updated patch that includes both this update to the comment
and the functionality to pre-pad files to 16MB. This also seems
to have simplified the code, which is a nice bonus.

Here are the comments:

In open_walfile(), "zerobuf" needs to be freed after use.

+ f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, 0666);

Should we use "S_IRUSR | S_IWUSR" instead of "0666" as the file access mode?

+		if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+		{
+			fprintf(stderr, _("%s: could not pad WAL segment %s: %s\n"),
+					progname, fn, strerror(errno));
+			close(f);
+			return -1;
+		}

When write() fails, should we delete the partial WAL file, like
XLogFileInit() does?
If not, every subsequent pg_receivexlog run fails until a user deletes
it manually, because open_walfile() always fails when it finds an
existing partial WAL file.

When open_walfile() fails, pg_receivexlog exits without closing the connection.
I don't think this is good error handling. But this issue itself is not what
we're trying to address now, so I think you can commit a fix for it separately
from the current patch.

Regards,

--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center

#6Magnus Hagander
magnus@hagander.net
In reply to: Fujii Masao (#5)

On Tue, Nov 1, 2011 at 05:53, Fujii Masao <masao.fujii@gmail.com> wrote:

On Tue, Nov 1, 2011 at 3:08 AM, Magnus Hagander <magnus@hagander.net> wrote:

On Fri, Oct 28, 2011 at 08:46, Fujii Masao <masao.fujii@gmail.com> wrote:

On Thu, Oct 27, 2011 at 11:14 PM, Magnus Hagander <magnus@hagander.net> wrote:

Here's a version that does this. Turns out this requires a lot less
code than what was previously in there, which is always nice.

We still need to solve the other part which is how to deal with the
partial files on restore. But this is definitely a cleaner way from a
pure pg_receivexlog perspective.

Comments/reviews?

Looks good.

Minor comment:
the source code comment of FindStreamingStart() seems to need to be updated.

Here's an updated patch that includes both this update to the comment
and the functionality to pre-pad files to 16MB. This also seems
to have simplified the code, which is a nice bonus.

Here are the comments:

In open_walfile(), "zerobuf" needs to be freed after use.

Ooops, yes.

+       f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, 0666);

Should we use "S_IRUSR | S_IWUSR" instead of "0666" as the file access mode?

Agreed, changed.

+               if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+               {
+                       fprintf(stderr, _("%s: could not pad WAL segment %s: %s\n"),
+                                       progname, fn, strerror(errno));
+                       close(f);
+                       return -1;
+               }

When write() fails, should we delete the partial WAL file, like
XLogFileInit() does?

Yes, that's probably a good idea. Added a simple unlink() call
directly after the close().

If not, every subsequent pg_receivexlog run fails until a user deletes
it manually, because open_walfile() always fails when it finds an
existing partial WAL file.

When open_walfile() fails, pg_receivexlog exits without closing the connection.
I don't think this is good error handling. But this issue itself is not what
we're trying to address now, so I think you can commit a fix for it separately
from the current patch.

Wow, when looking into that, there was a nice bug in open_walfile -
when the file failed to open, it would write that error message but
not return - then proceed to write a second error message from fstat.
Oops.

Anyway - yes, the return value of ReceiveXlogStream isn't checked at
all. That's certainly not very nice. I'll go fix that too.

I'll apply the patch with the fixes you've mentioned above. Please
check master again in a few minutes. Thanks!

--
 Magnus Hagander
 Me: http://www.hagander.net/
 Work: http://www.redpill-linpro.com/
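
The review points raised in this thread (return immediately when open() fails, free the zero buffer, unlink the file when padding fails, use S_IRUSR | S_IWUSR) can be sketched together as follows. This is an illustration under stated assumptions, not the committed code: open_padded_partial() is a hypothetical name, it takes the path and sizes as parameters instead of using XLogSegSize and XLOG_BLCKSZ, and it reports errors only through its return value.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Hypothetical variant of open_walfile(): open "path", accept it if it is
 * already exactly seg_size bytes, zero-pad it if it is new and empty, and
 * reject anything in between. Returns an fd positioned at offset 0, or -1.
 */
static int
open_padded_partial(const char *path, off_t seg_size, size_t block_size)
{
	struct stat statbuf;
	char	   *zerobuf;
	off_t		bytes;
	int			fd;

	assert(block_size > 0 && seg_size > 0 &&
		   seg_size % (off_t) block_size == 0);

	fd = open(path, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
	if (fd == -1)
		return -1;				/* return at once, do not fall through to fstat() */

	if (fstat(fd, &statbuf) != 0)
	{
		close(fd);
		return -1;
	}
	if (statbuf.st_size == seg_size)
		return fd;				/* already padded, ready to be overwritten */
	if (statbuf.st_size != 0)
	{
		close(fd);				/* neither empty nor full: treat as corrupt */
		return -1;
	}

	/* New, empty file: pad it with zeroes one block at a time */
	zerobuf = calloc(1, block_size);
	if (zerobuf == NULL)
	{
		close(fd);
		unlink(path);
		return -1;
	}
	for (bytes = 0; bytes < seg_size; bytes += (off_t) block_size)
	{
		if (write(fd, zerobuf, block_size) != (ssize_t) block_size)
		{
			free(zerobuf);
			close(fd);
			unlink(path);		/* avoid leaving a half-padded file behind */
			return -1;
		}
	}
	free(zerobuf);

	/* Offset first, then whence */
	if (lseek(fd, 0, SEEK_SET) != 0)
	{
		close(fd);
		return -1;
	}
	return fd;
}
```

Unlinking on a failed pad means the next run sees no file at all and can start over, instead of tripping over a file whose size is neither 0 nor the full segment size.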