*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 78,84 **** static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
  static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
  static void BaseBackup(void);
  
! static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
  
  #ifdef HAVE_LIBZ
  static const char *
--- 78,84 ----
  static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
  static void BaseBackup(void);
  
! static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline);
  
  #ifdef HAVE_LIBZ
  static const char *
***************
*** 129,136 **** usage(void)
  
  
  /*
!  * Called in the background process whenever a complete segment of WAL
!  * has been received.
   * On Unix, we check to see if there is any data on our pipe
   * (which would mean we have a stop position), and if it is, check if
   * it is time to stop.
--- 129,137 ----
  
  
  /*
!  * Called in the background process every time data is received.
!  * Also called when the streaming stops to check whether
!  * the current log segment can be treated as a complete one.
   * On Unix, we check to see if there is any data on our pipe
   * (which would mean we have a stop position), and if it is, check if
   * it is time to stop.
***************
*** 138,144 **** usage(void)
   * time to stop.
   */
  static bool
! segment_callback(XLogRecPtr segendpos, uint32 timeline)
  {
  	if (!has_xlogendptr)
  	{
--- 139,145 ----
   * time to stop.
   */
  static bool
! reached_end_position(XLogRecPtr segendpos, uint32 timeline)
  {
  	if (!has_xlogendptr)
  	{
***************
*** 231,237 **** LogStreamerMain(logstreamer_param * param)
  {
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
! 						   segment_callback, NULL, standby_message_timeout))
  
  		/*
  		 * Any errors will already have been reported in the function process,
--- 232,238 ----
  {
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
! 						   reached_end_position, reached_end_position, standby_message_timeout))
  
  		/*
  		 * Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 71,77 **** usage(void)
  static bool
  segment_callback(XLogRecPtr segendpos, uint32 timeline)
  {
! 	if (verbose)
  		fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
  				progname, segendpos.xlogid, segendpos.xrecoff, timeline);
  
--- 71,77 ----
  static bool
  segment_callback(XLogRecPtr segendpos, uint32 timeline)
  {
! 	if (verbose && segendpos.xrecoff % XLOG_SEG_SIZE == 0)
  		fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
  				progname, segendpos.xlogid, segendpos.xrecoff, timeline);
  
***************
*** 82,88 **** segment_callback(XLogRecPtr segendpos, uint32 timeline)
  }
  
  static bool
! continue_streaming(void)
  {
  	if (time_to_abort)
  	{
--- 82,88 ----
  }
  
  static bool
! continue_streaming(XLogRecPtr segendpos, uint32 timeline)
  {
  	if (time_to_abort)
  	{
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 113,120 **** open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu
  	return f;
  }
  
  static bool
! close_walfile(int walfile, char *basedir, char *walname)
  {
  	off_t		currpos = lseek(walfile, 0, SEEK_CUR);
  
--- 113,126 ----
  	return f;
  }
  
+ /*
+  * Close the current WAL file, and rename it to the correct filename if it's complete.
+  *
+  * If segment_complete is true, rename the current WAL file even if we've not
+  * completed writing the whole segment.
+  */
  static bool
! close_walfile(int walfile, char *basedir, char *walname, bool segment_complete)
  {
  	off_t		currpos = lseek(walfile, 0, SEEK_CUR);
  
***************
*** 141,149 **** close_walfile(int walfile, char *basedir, char *walname)
  
  	/*
  	 * Rename the .partial file only if we've completed writing the
! 	 * whole segment.
  	 */
! 	if (currpos == XLOG_SEG_SIZE)
  	{
  		char		oldfn[MAXPGPATH];
  		char		newfn[MAXPGPATH];
--- 147,155 ----
  
  	/*
  	 * Rename the .partial file only if we've completed writing the
! 	 * whole segment or segment_complete is true.
  	 */
! 	if (currpos == XLOG_SEG_SIZE || segment_complete)
  	{
  		char		oldfn[MAXPGPATH];
  		char		newfn[MAXPGPATH];
***************
*** 206,211 **** localGetCurrentTimestamp(void)
--- 212,221 ----
   * return. As long as they return false, streaming will continue
   * indefinitely.
   *
+  * The segment_finish callback will also be called when the
+  * streaming will stop to check whether the current log segment
+  * can be treated as a complete one.
+  *
   * standby_message_timeout controls how often we send a message
   * back to the master letting it know our progress, in seconds.
   * This message will only contain the write location, and never
***************
*** 288,298 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
  		/*
  		 * Check if we should continue streaming, or abort at this point.
  		 */
! 		if (stream_continue && stream_continue())
  		{
  			if (walfile != -1)
  				/* Potential error message is written by close_walfile */
! 				return close_walfile(walfile, basedir, current_walfile_name);
  			return true;
  		}
  
--- 298,310 ----
  		/*
  		 * Check if we should continue streaming, or abort at this point.
  		 */
! 		if (stream_continue && stream_continue(blockpos, timeline))
  		{
  			if (walfile != -1)
  				/* Potential error message is written by close_walfile */
! 				return close_walfile(walfile, basedir, current_walfile_name,
! 									 segment_finish != NULL ?
! 									 segment_finish(blockpos, timeline) : false);
  			return true;
  		}
  
***************
*** 486,492 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
  			/* Did we reach the end of a WAL segment? */
  			if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
  			{
! 				if (!close_walfile(walfile, basedir, current_walfile_name))
  					/* Error message written in close_walfile() */
  					return false;
  
--- 498,504 ----
  			/* Did we reach the end of a WAL segment? */
  			if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
  			{
! 				if (!close_walfile(walfile, basedir, current_walfile_name, false))
  					/* Error message written in close_walfile() */
  					return false;
  
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 2,8 ****
  
  /*
   * Called whenever a segment is finished, return true to stop
!  * the streaming at this point.
   */
  typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline);
  
--- 2,10 ----
  
  /*
   * Called whenever a segment is finished, return true to stop
!  * the streaming at this point. Also called when the streaming
!  * stops to check whether the current log segment can be
!  * treated as a complete one.
   */
  typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline);
  
***************
*** 10,16 **** typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline);
   * Called before trying to read more data. Return true to stop
   * the streaming at this point.
   */
! typedef bool (*stream_continue_callback)(void);
  
  extern bool ReceiveXlogStream(PGconn *conn,
  							  XLogRecPtr startpos,
--- 12,18 ----
   * Called before trying to read more data. Return true to stop
   * the streaming at this point.
   */
! typedef bool (*stream_continue_callback)(XLogRecPtr segendpos, uint32 timeline);
  
  extern bool ReceiveXlogStream(PGconn *conn,
  							  XLogRecPtr startpos,
