Refectoring of receivelog.c
I was working on adding the tar streaming functionality we talked about at
the developer meeting to pg_basebackup, and rapidly ran across the issue
that Andres has been complaining about for a while. The code in
receivelog.c just passes an insane number of parameters around. Adding or
changing even a small thing ends up touching a huge number of places.
Here's an attempt to refactor the code to instead pass around a control
structure. I think it's a definite win already now, and we can't just keep
adding new functionality on top of the current one.
I'll proceed to work on the actual functionality I was working on to go on
top of this separately, but would appreciate a review of this part
independently. It's mostly mechanical, but there may definitely be mistakes
- or thinkos in the whole idea...
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
Attachments:
basebackup.patchtext/x-patch; charset=US-ASCII; name=basebackup.patchDownload
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 372,381 **** typedef struct
static int
LogStreamerMain(logstreamer_param *param)
{
! if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
! param->sysidentifier, param->xlogdir,
! reached_end_position, standby_message_timeout,
! NULL, false, true))
/*
* Any errors will already have been reported in the function process,
--- 372,391 ----
static int
LogStreamerMain(logstreamer_param *param)
{
! StreamCtl stream;
!
! MemSet(&stream, sizeof(stream), 0);
! stream.startpos = param->startptr;
! stream.timeline = param->timeline;
! stream.sysidentifier = param->sysidentifier;
! stream.stream_stop = reached_end_position;
! stream.standby_message_timeout = standby_message_timeout;
! stream.synchronous = false;
! stream.mark_done = true;
! stream.basedir = param->xlogdir;
! stream.partial_suffix = NULL;
!
! if (!ReceiveXlogStream(param->bgconn, &stream))
/*
* Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 276,285 **** FindStreamingStart(uint32 *tli)
static void
StreamLog(void)
{
! XLogRecPtr startpos,
! serverpos;
! TimeLineID starttli,
! servertli;
/*
* Connect in replication mode to the server
--- 276,286 ----
static void
StreamLog(void)
{
! XLogRecPtr serverpos;
! TimeLineID servertli;
! StreamCtl stream;
!
! MemSet(&stream, 0, sizeof(stream));
/*
* Connect in replication mode to the server
***************
*** 311,327 **** StreamLog(void)
/*
* Figure out where to start streaming.
*/
! startpos = FindStreamingStart(&starttli);
! if (startpos == InvalidXLogRecPtr)
{
! startpos = serverpos;
! starttli = servertli;
}
/*
* Always start streaming at the beginning of a segment
*/
! startpos -= startpos % XLOG_SEG_SIZE;
/*
* Start the replication
--- 312,328 ----
/*
* Figure out where to start streaming.
*/
! stream.startpos = FindStreamingStart(&stream.timeline);
! if (stream.startpos == InvalidXLogRecPtr)
{
! stream.startpos = serverpos;
! stream.timeline = servertli;
}
/*
* Always start streaming at the beginning of a segment
*/
! stream.startpos -= stream.startpos % XLOG_SEG_SIZE;
/*
* Start the replication
***************
*** 329,340 **** StreamLog(void)
if (verbose)
fprintf(stderr,
_("%s: starting log streaming at %X/%X (timeline %u)\n"),
! progname, (uint32) (startpos >> 32), (uint32) startpos,
! starttli);
! ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! stop_streaming, standby_message_timeout, ".partial",
! synchronous, false);
PQfinish(conn);
conn = NULL;
--- 330,346 ----
if (verbose)
fprintf(stderr,
_("%s: starting log streaming at %X/%X (timeline %u)\n"),
! progname, (uint32) (stream.startpos >> 32), (uint32) stream.startpos,
! stream.timeline);
!
! stream.stream_stop = stop_streaming;
! stream.standby_message_timeout = standby_message_timeout;
! stream.synchronous = synchronous;
! stream.mark_done = false;
! stream.basedir = basedir;
! stream.partial_suffix = ".partial";
! ReceiveXlogStream(conn, &stream);
PQfinish(conn);
conn = NULL;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 33,59 **** static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
static bool still_sending = true; /* feedback still needs to be sent? */
! static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
! uint32 timeline, char *basedir,
! stream_stop_callback stream_stop, int standby_message_timeout,
! char *partial_suffix, XLogRecPtr *stoppos,
! bool synchronous, bool mark_done);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
XLogRecPtr blockpos, int64 *last_status);
! static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
! XLogRecPtr *blockpos, uint32 timeline,
! char *basedir, stream_stop_callback stream_stop,
! char *partial_suffix, bool mark_done);
! static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
! XLogRecPtr blockpos, char *basedir, char *partial_suffix,
! XLogRecPtr *stoppos, bool mark_done);
! static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
! uint32 timeline, char *basedir,
! stream_stop_callback stream_stop,
! char *partial_suffix, XLogRecPtr *stoppos,
! bool mark_done);
static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
int64 last_status);
--- 33,50 ----
static bool still_sending = true; /* feedback still needs to be sent? */
! static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
! XLogRecPtr *stoppos);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
XLogRecPtr blockpos, int64 *last_status);
! static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
! XLogRecPtr *blockpos);
! static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
! XLogRecPtr blockpos, XLogRecPtr *stoppos);
! static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
! XLogRecPtr *stoppos);
static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
int64 last_status);
***************
*** 99,106 **** mark_file_as_archived(const char *basedir, const char *fname)
* partial_suffix) is stored in current_walfile_name.
*/
static bool
! open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
! char *partial_suffix)
{
int f;
char fn[MAXPGPATH];
--- 90,96 ----
* partial_suffix) is stored in current_walfile_name.
*/
static bool
! open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
{
int f;
char fn[MAXPGPATH];
***************
*** 110,119 **** open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
XLogSegNo segno;
XLByteToSeg(startpoint, segno);
! XLogFileName(current_walfile_name, timeline, segno);
! snprintf(fn, sizeof(fn), "%s/%s%s", basedir, current_walfile_name,
! partial_suffix ? partial_suffix : "");
f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
if (f == -1)
{
--- 100,109 ----
XLogSegNo segno;
XLByteToSeg(startpoint, segno);
! XLogFileName(current_walfile_name, stream->timeline, segno);
! snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name,
! stream->partial_suffix ? stream->partial_suffix : "");
f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
if (f == -1)
{
***************
*** 185,191 **** open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
* and returns false, otherwise returns true.
*/
static bool
! close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_done)
{
off_t currpos;
--- 175,181 ----
* and returns false, otherwise returns true.
*/
static bool
! close_walfile(StreamCtl *stream, XLogRecPtr pos)
{
off_t currpos;
***************
*** 220,232 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
/*
* If we finished writing a .partial file, rename it into place.
*/
! if (currpos == XLOG_SEG_SIZE && partial_suffix)
{
char oldfn[MAXPGPATH];
char newfn[MAXPGPATH];
! snprintf(oldfn, sizeof(oldfn), "%s/%s%s", basedir, current_walfile_name, partial_suffix);
! snprintf(newfn, sizeof(newfn), "%s/%s", basedir, current_walfile_name);
if (rename(oldfn, newfn) != 0)
{
fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
--- 210,222 ----
/*
* If we finished writing a .partial file, rename it into place.
*/
! if (currpos == XLOG_SEG_SIZE && stream->partial_suffix)
{
char oldfn[MAXPGPATH];
char newfn[MAXPGPATH];
! snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix);
! snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name);
if (rename(oldfn, newfn) != 0)
{
fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
***************
*** 234,243 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
return false;
}
}
! else if (partial_suffix)
fprintf(stderr,
_("%s: not renaming \"%s%s\", segment is not complete\n"),
! progname, current_walfile_name, partial_suffix);
/*
* Mark file as archived if requested by the caller - pg_basebackup needs
--- 224,233 ----
return false;
}
}
! else if (stream->partial_suffix)
fprintf(stderr,
_("%s: not renaming \"%s%s\", segment is not complete\n"),
! progname, current_walfile_name, stream->partial_suffix);
/*
* Mark file as archived if requested by the caller - pg_basebackup needs
***************
*** 245,254 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
* new node. This is in line with walreceiver.c always doing a
* XLogArchiveForceDone() after a complete segment.
*/
! if (currpos == XLOG_SEG_SIZE && mark_done)
{
/* writes error message if failed */
! if (!mark_file_as_archived(basedir, current_walfile_name))
return false;
}
--- 235,244 ----
* new node. This is in line with walreceiver.c always doing a
* XLogArchiveForceDone() after a complete segment.
*/
! if (currpos == XLOG_SEG_SIZE && stream->mark_done)
{
/* writes error message if failed */
! if (!mark_file_as_archived(stream->basedir, current_walfile_name))
return false;
}
***************
*** 261,267 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
* Check if a timeline history file exists.
*/
static bool
! existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
{
char path[MAXPGPATH];
char histfname[MAXFNAMELEN];
--- 251,257 ----
* Check if a timeline history file exists.
*/
static bool
! existsTimeLineHistoryFile(StreamCtl *stream)
{
char path[MAXPGPATH];
char histfname[MAXFNAMELEN];
***************
*** 271,282 **** existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
* Timeline 1 never has a history file. We treat that as if it existed,
* since we never need to stream it.
*/
! if (tli == 1)
return true;
! TLHistoryFileName(histfname, tli);
! snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
fd = open(path, O_RDONLY | PG_BINARY, 0);
if (fd < 0)
--- 261,272 ----
* Timeline 1 never has a history file. We treat that as if it existed,
* since we never need to stream it.
*/
! if (stream->timeline == 1)
return true;
! TLHistoryFileName(histfname, stream->timeline);
! snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
fd = open(path, O_RDONLY | PG_BINARY, 0);
if (fd < 0)
***************
*** 294,301 **** existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
}
static bool
! writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
! char *content, bool mark_done)
{
int size = strlen(content);
char path[MAXPGPATH];
--- 284,290 ----
}
static bool
! writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
{
int size = strlen(content);
char path[MAXPGPATH];
***************
*** 307,321 **** writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
* Check that the server's idea of how timeline history files should be
* named matches ours.
*/
! TLHistoryFileName(histfname, tli);
if (strcmp(histfname, filename) != 0)
{
fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
! progname, tli, filename);
return false;
}
! snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
/*
* Write into a temp file name.
--- 296,310 ----
* Check that the server's idea of how timeline history files should be
* named matches ours.
*/
! TLHistoryFileName(histfname, stream->timeline);
if (strcmp(histfname, filename) != 0)
{
fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
! progname, stream->timeline, filename);
return false;
}
! snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
/*
* Write into a temp file name.
***************
*** 375,384 **** writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
}
/* Maintain archive_status, check close_walfile() for details. */
! if (mark_done)
{
/* writes error message if failed */
! if (!mark_file_as_archived(basedir, histfname))
return false;
}
--- 364,373 ----
}
/* Maintain archive_status, check close_walfile() for details. */
! if (stream->mark_done)
{
/* writes error message if failed */
! if (!mark_file_as_archived(stream->basedir, histfname))
return false;
}
***************
*** 498,508 **** CheckServerVersionForStreaming(PGconn *conn)
* Note: The log position *must* be at a log segment start!
*/
bool
! ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
! char *sysidentifier, char *basedir,
! stream_stop_callback stream_stop,
! int standby_message_timeout, char *partial_suffix,
! bool synchronous, bool mark_done)
{
char query[128];
char slotcmd[128];
--- 487,493 ----
* Note: The log position *must* be at a log segment start!
*/
bool
! ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
{
char query[128];
char slotcmd[128];
***************
*** 539,545 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
slotcmd[0] = 0;
}
! if (sysidentifier != NULL)
{
/* Validate system identifier hasn't changed */
res = PQexec(conn, "IDENTIFY_SYSTEM");
--- 524,530 ----
slotcmd[0] = 0;
}
! if (stream->sysidentifier != NULL)
{
/* Validate system identifier hasn't changed */
res = PQexec(conn, "IDENTIFY_SYSTEM");
***************
*** 559,565 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
PQclear(res);
return false;
}
! if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
{
fprintf(stderr,
_("%s: system identifier does not match between base backup and streaming connection\n"),
--- 544,550 ----
PQclear(res);
return false;
}
! if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
{
fprintf(stderr,
_("%s: system identifier does not match between base backup and streaming connection\n"),
***************
*** 567,577 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
PQclear(res);
return false;
}
! if (timeline > atoi(PQgetvalue(res, 0, 1)))
{
fprintf(stderr,
_("%s: starting timeline %u is not present in the server\n"),
! progname, timeline);
PQclear(res);
return false;
}
--- 552,562 ----
PQclear(res);
return false;
}
! if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
{
fprintf(stderr,
_("%s: starting timeline %u is not present in the server\n"),
! progname, stream->timeline);
PQclear(res);
return false;
}
***************
*** 582,588 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* initialize flush position to starting point, it's the caller's
* responsibility that that's sane.
*/
! lastFlushPosition = startpos;
while (1)
{
--- 567,573 ----
* initialize flush position to starting point, it's the caller's
* responsibility that that's sane.
*/
! lastFlushPosition = stream->startpos;
while (1)
{
***************
*** 590,598 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Fetch the timeline history file for this timeline, if we don't have
* it already.
*/
! if (!existsTimeLineHistoryFile(basedir, timeline))
{
! snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
--- 575,583 ----
* Fetch the timeline history file for this timeline, if we don't have
* it already.
*/
! if (!existsTimeLineHistoryFile(stream))
{
! snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
***************
*** 615,624 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
}
/* Write the history file to disk */
! writeTimeLineHistoryFile(basedir, timeline,
PQgetvalue(res, 0, 0),
! PQgetvalue(res, 0, 1),
! mark_done);
PQclear(res);
}
--- 600,608 ----
}
/* Write the history file to disk */
! writeTimeLineHistoryFile(stream,
PQgetvalue(res, 0, 0),
! PQgetvalue(res, 0, 1));
PQclear(res);
}
***************
*** 627,640 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Before we start streaming from the requested location, check if the
* callback tells us to stop here.
*/
! if (stream_stop(startpos, timeline, false))
return true;
/* Initiate the replication stream at specified location */
snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
slotcmd,
! (uint32) (startpos >> 32), (uint32) startpos,
! timeline);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
--- 611,624 ----
* Before we start streaming from the requested location, check if the
* callback tells us to stop here.
*/
! if (stream->stream_stop(stream->startpos, stream->timeline, false))
return true;
/* Initiate the replication stream at specified location */
snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
slotcmd,
! (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
! stream->timeline);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
***************
*** 646,654 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
PQclear(res);
/* Stream the WAL */
! res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
! standby_message_timeout, partial_suffix,
! &stoppos, synchronous, mark_done);
if (res == NULL)
goto error;
--- 630,636 ----
PQclear(res);
/* Stream the WAL */
! res = HandleCopyStream(conn, stream, &stoppos);
if (res == NULL)
goto error;
***************
*** 676,701 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
uint32 newtimeline;
bool parsed;
! parsed = ReadEndOfStreamingResult(res, &startpos, &newtimeline);
PQclear(res);
if (!parsed)
goto error;
/* Sanity check the values the server gave us */
! if (newtimeline <= timeline)
{
fprintf(stderr,
_("%s: server reported unexpected next timeline %u, following timeline %u\n"),
! progname, newtimeline, timeline);
goto error;
}
! if (startpos > stoppos)
{
fprintf(stderr,
_("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
progname,
! timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
! newtimeline, (uint32) (startpos >> 32), (uint32) startpos);
goto error;
}
--- 658,683 ----
uint32 newtimeline;
bool parsed;
! parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
PQclear(res);
if (!parsed)
goto error;
/* Sanity check the values the server gave us */
! if (newtimeline <= stream->timeline)
{
fprintf(stderr,
_("%s: server reported unexpected next timeline %u, following timeline %u\n"),
! progname, newtimeline, stream->timeline);
goto error;
}
! if (stream->startpos > stoppos)
{
fprintf(stderr,
_("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
progname,
! stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
! newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
goto error;
}
***************
*** 715,722 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Loop back to start streaming from the new timeline. Always
* start streaming at the beginning of a segment.
*/
! timeline = newtimeline;
! startpos = startpos - (startpos % XLOG_SEG_SIZE);
continue;
}
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
--- 697,704 ----
* Loop back to start streaming from the new timeline. Always
* start streaming at the beginning of a segment.
*/
! stream->timeline = newtimeline;
! stream->startpos = stream->startpos - (stream->startpos % XLOG_SEG_SIZE);
continue;
}
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
***************
*** 729,735 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Check if the callback thinks it's OK to stop here. If not,
* complain.
*/
! if (stream_stop(stoppos, timeline, false))
return true;
else
{
--- 711,717 ----
* Check if the callback thinks it's OK to stop here. If not,
* complain.
*/
! if (stream->stream_stop(stoppos, stream->timeline, false))
return true;
else
{
***************
*** 810,823 **** ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
* On any other sort of error, returns NULL.
*/
static PGresult *
! HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
! char *basedir, stream_stop_callback stream_stop,
! int standby_message_timeout, char *partial_suffix,
! XLogRecPtr *stoppos, bool synchronous, bool mark_done)
{
char *copybuf = NULL;
int64 last_status = -1;
! XLogRecPtr blockpos = startpos;
still_sending = true;
--- 792,803 ----
* On any other sort of error, returns NULL.
*/
static PGresult *
! HandleCopyStream(PGconn *conn, StreamCtl *stream,
! XLogRecPtr *stoppos)
{
char *copybuf = NULL;
int64 last_status = -1;
! XLogRecPtr blockpos = stream->startpos;
still_sending = true;
***************
*** 830,838 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/*
* Check if we should continue streaming, or abort at this point.
*/
! if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
! stream_stop, partial_suffix, stoppos,
! mark_done))
goto error;
now = feGetCurrentTimestamp();
--- 810,816 ----
/*
* Check if we should continue streaming, or abort at this point.
*/
! if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
goto error;
now = feGetCurrentTimestamp();
***************
*** 841,847 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* If synchronous option is true, issue sync command as soon as there
* are WAL data which has not been flushed yet.
*/
! if (synchronous && lastFlushPosition < blockpos && walfile != -1)
{
if (fsync(walfile) != 0)
{
--- 819,825 ----
* If synchronous option is true, issue sync command as soon as there
* are WAL data which has not been flushed yet.
*/
! if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1)
{
if (fsync(walfile) != 0)
{
***************
*** 863,871 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/*
* Potentially send a status message to the master
*/
! if (still_sending && standby_message_timeout > 0 &&
feTimestampDifferenceExceeds(last_status, now,
! standby_message_timeout))
{
/* Time to send feedback! */
if (!sendFeedback(conn, blockpos, now, false))
--- 841,849 ----
/*
* Potentially send a status message to the master
*/
! if (still_sending && stream->standby_message_timeout > 0 &&
feTimestampDifferenceExceeds(last_status, now,
! stream->standby_message_timeout))
{
/* Time to send feedback! */
if (!sendFeedback(conn, blockpos, now, false))
***************
*** 876,882 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/*
* Calculate how long send/receive loops should sleep
*/
! sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
last_status);
r = CopyStreamReceive(conn, sleeptime, ©buf);
--- 854,860 ----
/*
* Calculate how long send/receive loops should sleep
*/
! sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
last_status);
r = CopyStreamReceive(conn, sleeptime, ©buf);
***************
*** 886,894 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
goto error;
if (r == -2)
{
! PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
! basedir, partial_suffix,
! stoppos, mark_done);
if (res == NULL)
goto error;
--- 864,870 ----
goto error;
if (r == -2)
{
! PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
if (res == NULL)
goto error;
***************
*** 905,922 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
}
else if (copybuf[0] == 'w')
{
! if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
! timeline, basedir, stream_stop,
! partial_suffix, mark_done))
goto error;
/*
* Check if we should continue streaming, or abort at this
* point.
*/
! if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
! stream_stop, partial_suffix, stoppos,
! mark_done))
goto error;
}
else
--- 881,894 ----
}
else if (copybuf[0] == 'w')
{
! if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
goto error;
/*
* Check if we should continue streaming, or abort at this
* point.
*/
! if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
goto error;
}
else
***************
*** 1113,1122 **** ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
* Process XLogData message.
*/
static bool
! ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
! XLogRecPtr *blockpos, uint32 timeline,
! char *basedir, stream_stop_callback stream_stop,
! char *partial_suffix, bool mark_done)
{
int xlogoff;
int bytes_left;
--- 1085,1092 ----
* Process XLogData message.
*/
static bool
! ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
! XLogRecPtr *blockpos)
{
int xlogoff;
int bytes_left;
***************
*** 1196,1203 **** ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
if (walfile == -1)
{
! if (!open_walfile(*blockpos, timeline,
! basedir, partial_suffix))
{
/* Error logged by open_walfile */
return false;
--- 1166,1172 ----
if (walfile == -1)
{
! if (!open_walfile(stream, *blockpos))
{
/* Error logged by open_walfile */
return false;
***************
*** 1224,1236 **** ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
/* Did we reach the end of a WAL segment? */
if (*blockpos % XLOG_SEG_SIZE == 0)
{
! if (!close_walfile(basedir, partial_suffix, *blockpos, mark_done))
/* Error message written in close_walfile() */
return false;
xlogoff = 0;
! if (still_sending && stream_stop(*blockpos, timeline, true))
{
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
--- 1193,1205 ----
/* Did we reach the end of a WAL segment? */
if (*blockpos % XLOG_SEG_SIZE == 0)
{
! if (!close_walfile(stream, *blockpos))
/* Error message written in close_walfile() */
return false;
xlogoff = 0;
! if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
{
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
***************
*** 1252,1260 **** ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
* Handle end of the copy stream.
*/
static PGresult *
! HandleEndOfCopyStream(PGconn *conn, char *copybuf,
! XLogRecPtr blockpos, char *basedir, char *partial_suffix,
! XLogRecPtr *stoppos, bool mark_done)
{
PGresult *res = PQgetResult(conn);
--- 1221,1228 ----
* Handle end of the copy stream.
*/
static PGresult *
! HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
! XLogRecPtr blockpos, XLogRecPtr *stoppos)
{
PGresult *res = PQgetResult(conn);
***************
*** 1265,1271 **** HandleEndOfCopyStream(PGconn *conn, char *copybuf,
*/
if (still_sending)
{
! if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
{
/* Error message written in close_walfile() */
PQclear(res);
--- 1233,1239 ----
*/
if (still_sending)
{
! if (!close_walfile(stream, blockpos))
{
/* Error message written in close_walfile() */
PQclear(res);
***************
*** 1295,1307 **** HandleEndOfCopyStream(PGconn *conn, char *copybuf,
* Check if we should continue streaming, or abort at this point.
*/
static bool
! CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
! char *basedir, stream_stop_callback stream_stop,
! char *partial_suffix, XLogRecPtr *stoppos, bool mark_done)
{
! if (still_sending && stream_stop(blockpos, timeline, false))
{
! if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
{
/* Potential error message is written by close_walfile */
return false;
--- 1263,1274 ----
* Check if we should continue streaming, or abort at this point.
*/
static bool
! CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
! XLogRecPtr *stoppos)
{
! if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
{
! if (!close_walfile(stream, blockpos))
{
/* Potential error message is written by close_walfile */
return false;
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 22,37 ****
*/
typedef bool (*stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished);
extern bool CheckServerVersionForStreaming(PGconn *conn);
extern bool ReceiveXlogStream(PGconn *conn,
! XLogRecPtr startpos,
! uint32 timeline,
! char *sysidentifier,
! char *basedir,
! stream_stop_callback stream_stop,
! int standby_message_timeout,
! char *partial_suffix,
! bool synchronous,
! bool mark_done);
#endif /* RECEIVELOG_H */
--- 22,49 ----
*/
typedef bool (*stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished);
+ /*
+ * Global parameters when receiving xlog stream
+ */
+ typedef struct
+ {
+ XLogRecPtr startpos;
+ TimeLineID timeline;
+ char *sysidentifier;
+ int standby_message_timeout;
+ bool synchronous;
+ bool mark_done;
+
+ stream_stop_callback stream_stop;
+
+ char *basedir;
+ char *partial_suffix;
+ } StreamCtl;
+
+
+
extern bool CheckServerVersionForStreaming(PGconn *conn);
extern bool ReceiveXlogStream(PGconn *conn,
! StreamCtl *stream);
#endif /* RECEIVELOG_H */
On 15 February 2016 at 04:48, Magnus Hagander <magnus@hagander.net> wrote:
I was working on adding the tar streaming functionality we talked about at
the developer meeting to pg_basebackup, and rapidly ran across the issue
that Andres has been complaining about for a while. The code in
receivelog.c just passes an insane number of parameters around. Adding or
changing even a small thing ends up touching a huge number of places.
Other than the lack of comments on the fields in StreamCtl to indicate
their functions I think this looks good.
I didn't find any mistakes, but I do admit my eyes started glazing over
after a bit.
I'd rather not have StreamCtl as a typedef of an anonymous struct if it's
exposed in a header though. It should have a name that can be used in
forward declarations etc.
After recently working with the XLogReader I can't emphasise enough how
important *useful* comments on the fields in these sorts of structs are -
the XLogReaderState has ReadRecPtr, readSegNo + readOff + readLen,
currRecPtr AND latestPagePtr. Which actually do have comments, just not
super helpful ones.
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On Mon, Feb 15, 2016 at 7:15 AM, Craig Ringer <craig@2ndquadrant.com> wrote:
On 15 February 2016 at 04:48, Magnus Hagander <magnus@hagander.net> wrote:
I was working on adding the tar streaming functionality we talked about
at the developer meeting to pg_basebackup, and rapidly ran across the issue
that Andres has been complaining about for a while. The code in
receivelog.c just passes an insane number of parameters around. Adding or
changing even a small thing ends up touching a huge number of places.Other than the lack of comments on the fields in StreamCtl to indicate
their functions I think this looks good.
I copied that lack of comments from the previous implementation :P
But yeah, I agree, it's probably a good idea to add them.
I didn't find any mistakes, but I do admit my eyes started glazing over
after a bit.I'd rather not have StreamCtl as a typedef of an anonymous struct if it's
exposed in a header though. It should have a name that can be used in
forward declarations etc.
It's not exactly uncommon with anonymous ones either in our code, but I see
no problem adding that.
Thanks!
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On 15 Feb 2016, at 14:46, Magnus Hagander <magnus@hagander.net> wrote:
On Mon, Feb 15, 2016 at 7:15 AM, Craig Ringer <craig@2ndquadrant.com <mailto:craig@2ndquadrant.com>> wrote:
On 15 February 2016 at 04:48, Magnus Hagander <magnus@hagander.net <mailto:magnus@hagander.net>> wrote:
I was working on adding the tar streaming functionality we talked about at the developer meeting to pg_basebackup, and rapidly ran across the issue that Andres has been complaining about for a while. The code in receivelog.c just passes an insane number of parameters around. Adding or changing even a small thing ends up touching a huge number of places.Other than the lack of comments on the fields in StreamCtl to indicate their functions I think this looks good.
I copied that lack of comments from the previous implementation :P
But yeah, I agree, it's probably a good idea to add them.
I didn't find any mistakes, but I do admit my eyes started glazing over after a bit.
I'd rather not have StreamCtl as a typedef of an anonymous struct if it's exposed in a header though. It should have a name that can be used in forward declarations etc.
It's not exactly uncommon with anonymous ones either in our code, but I see no problem adding that.
Short review of this patch:
The patch applies, and builds, cleanly on top of master as of today. No new
functionality is introduced and thus no new tests or doc patches etc are
applicable.
The main point of the patch is to improve readability and reduce the number of
parameters passed, goals which are reached. However, I agree with Craig that
comments on the struct fields should be added to improve readability even
further. The comment on ReceiveXlogStream() also now reads a bit strange since
it describes fields inside the StreamCtl without referencing StreamCtl at all.
For first time readers of the code it could perhaps be helpful with a brief
note that the discussed parameters are in StreamCtl to avoid potential
confusion.
Overall I think this patch is an improvement on the current code and consider
it ready for committer.
cheers ./daniel
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Fri, Mar 11, 2016 at 9:40 AM, Daniel Gustafsson <daniel@yesql.se> wrote:
On 15 Feb 2016, at 14:46, Magnus Hagander <magnus@hagander.net> wrote:
On Mon, Feb 15, 2016 at 7:15 AM, Craig Ringer <craig@2ndquadrant.com
<mailto:craig@2ndquadrant.com>> wrote:
On 15 February 2016 at 04:48, Magnus Hagander <magnus@hagander.net
<mailto:magnus@hagander.net>> wrote:
I was working on adding the tar streaming functionality we talked about
at the developer meeting to pg_basebackup, and rapidly ran across the issue
that Andres has been complaining about for a while. The code in
receivelog.c just passes an insane number of parameters around. Adding or
changing even a small thing ends up touching a huge number of places.Other than the lack of comments on the fields in StreamCtl to indicate
their functions I think this looks good.
I copied that lack of comments from the previous implementation :P
But yeah, I agree, it's probably a good idea to add them.
I didn't find any mistakes, but I do admit my eyes started glazing over
after a bit.
I'd rather not have StreamCtl as a typedef of an anonymous struct if
it's exposed in a header though. It should have a name that can be used in
forward declarations etc.It's not exactly uncommon with anonymous ones either in our code, but I
see no problem adding that.
Short review of this patch:
The patch applies, and builds, cleanly on top of master as of today. No
new
functionality is introduced and thus no new tests or doc patches etc are
applicable.The main point of the patch is to improve readability and reduce the
number of
parameters passed, goals which are reached. However, I agree with Craig
that
comments on the struct fields should be added to improve readability even
further. The comment on ReceiveXlogStream() also now reads a bit strange
since
it describes fields inside the StreamCtl without referencing StreamCtl at
all.
For first time readers of the code it could perhaps be helpful with a brief
note that the discussed parameters are in StreamCtl to avoid potential
confusion.Overall I think this patch is an improvement on the current code and
consider
it ready for committer.
Pushed with updated comments and a named stsruct.
Thanks!
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On 03/11/2016 11:15 AM, Magnus Hagander wrote:
...
Pushed with updated comments and a named stsruct.
Pretty sure this memset call in pg_basebackup.c is incorrect, as it
passes parameters in the wrong order:
MemSet(&stream, sizeof(stream), 0);
It seems benign, because we're setting all the fields explicitly, but
gcc is nagging about it.
regards
--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Sun, Mar 13, 2016 at 12:15 AM, Tomas Vondra <tomas.vondra@2ndquadrant.com
wrote:
On 03/11/2016 11:15 AM, Magnus Hagander wrote:
...
Pushed with updated comments and a named stsruct.
Pretty sure this memset call in pg_basebackup.c is incorrect, as it passes
parameters in the wrong order:MemSet(&stream, sizeof(stream), 0);
It seems benign, because we're setting all the fields explicitly, but gcc
is nagging about it.
Indeed,that's backwards. Interestingly enough I thought I did a c&p between
that and the one in pg_receivexlog.c, but clearly I did not. And my gcc did
not nag about it.
Fixed, thanks for the pointer!
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/