2pc leaks fds
Hi,
Using 2PC with master very quickly leads to:
2020-04-05 19:42:18.368 PDT [2298126][5/2009:0] LOG: out of file descriptors: Too many open files; release and retry
2020-04-05 19:42:18.368 PDT [2298126][5/2009:0] STATEMENT: COMMIT PREPARED 'ptx_2';
This started with:
commit 0dc8ead46363fec6f621a12c7e1f889ba73b55a9 (HEAD -> master)
Author: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: 2019-11-25 15:04:54 -0300
Refactor WAL file-reading code into WALRead()
I found this while trying to benchmark the effect of my snapshot changes
on 2pc. I just used the attached pgbench file.
andres@awork3:~/build/postgres/dev-assert/vpath$ pgbench -n -s 500 -c 4 -j 4 -T 100000 -P1 -f ~/tmp/pgbench-write-2pc.sql
progress: 1.0 s, 3723.8 tps, lat 1.068 ms stddev 0.305
client 2 script 0 aborted in command 8 query 0: ERROR: could not seek to end of file "base/14036/16396": Too many open files
client 1 script 0 aborted in command 8 query 0: ERROR: could not seek to end of file "base/14036/16396": Too many open files
client 3 script 0 aborted in command 8 query 0: ERROR: could not seek to end of file "base/14036/16396": Too many open files
client 0 script 0 aborted in command 8 query 0: ERROR: could not seek to end of file "base/14036/16396": Too many open files
transaction type: /home/andres/tmp/pgbench-write-2pc.sql
I've not yet reviewed the change sufficiently to pinpoint the issue.
It's a bit sad that nobody has hit this in the last few months :(.
Greetings,
Andres Freund
On Sun, Apr 05, 2020 at 07:56:51PM -0700, Andres Freund wrote:
I found this while trying to benchmark the effect of my snapshot changes
on 2pc. I just used the attached pgbench file.
I've not yet reviewed the change sufficiently to pinpoint the issue.
Indeed. It takes seconds to show up.
It's a bit sad that nobody has hit this in the last few months :(.
2PC shines with the code of xlogreader.c in this case because it keeps
opening and closing XLogReaderState for a short amount of time. So it
is not surprising to me to see this error only months after the fact
because recovery or pg_waldump just use one XLogReaderState. From
what I can see, the error is that the code only bothers closing
WALOpenSegment->seg when switching to a new segment, but we need also
to close it when finishing the business in XLogReaderFree().
I am adding an open item, and attached is a patch to take care of the
problem. Thoughts?
--
Michael
Attachments:
xlogreader-leak.patch (text/x-diff; charset=us-ascii)
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index f3fea5132f..7e25e2050a 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -144,6 +144,9 @@ XLogReaderFree(XLogReaderState *state)
if (state->main_data)
pfree(state->main_data);
+ if (state->seg.ws_file >= 0)
+ close(state->seg.ws_file);
+
pfree(state->errormsg_buf);
if (state->readRecordBuf)
pfree(state->readRecordBuf);
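To make the failure mode concrete, here is a minimal sketch of the pattern described above. It is not PostgreSQL code: `DemoReader` and the `demo_reader_*` functions are hypothetical stand-ins for XLogReaderState and its allocate/free routines, and the "segment" is any readable file. The point is only that a reader which closes its descriptor on segment switch must also close it when it is freed, otherwise each short-lived reader leaks one fd.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>

/* Hypothetical stand-in for XLogReaderState: it only tracks the segment fd. */
typedef struct DemoReader
{
    int         seg_fd;         /* -1 when no segment file is open */
} DemoReader;

DemoReader *
demo_reader_alloc(void)
{
    DemoReader *reader = malloc(sizeof(DemoReader));

    if (reader != NULL)
        reader->seg_fd = -1;
    return reader;
}

/* Open a "segment", closing the previous one first (the segment-switch path). */
int
demo_reader_open(DemoReader *reader, const char *path)
{
    if (reader->seg_fd >= 0)
        close(reader->seg_fd);
    reader->seg_fd = open(path, O_RDONLY);
    return reader->seg_fd;
}

/* Free the reader.  Without the close() here, every reader leaks one fd. */
void
demo_reader_free(DemoReader *reader)
{
    if (reader->seg_fd >= 0)
        close(reader->seg_fd);
    free(reader);
}

/* One allocate/open/free cycle of a short-lived reader; returns the fd used. */
int
demo_reader_cycle(const char *path)
{
    DemoReader *reader = demo_reader_alloc();
    int         fd;

    assert(reader != NULL);
    fd = demo_reader_open(reader, path);
    demo_reader_free(reader);
    return fd;
}
```

Calling `demo_reader_cycle()` in a loop should keep returning the same descriptor number, since open() hands out the lowest free one. Dropping the close() from `demo_reader_free()` would instead make the number climb until open() fails with "Too many open files".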
Hi,
On 2020-04-06 14:26:48 +0900, Michael Paquier wrote:
2PC shines with the code of xlogreader.c in this case because it keeps
opening and closing XLogReaderState for a short amount of time. So it
is not surprising to me to see this error only months after the fact
because recovery or pg_waldump just use one XLogReaderState.
Well, it doesn't exactly signal that people (including me, up to just
now) are testing their changes all that carefully...
From what I can see, the error is that the code only bothers closing
WALOpenSegment->seg when switching to a new segment, but we need also
to close it when finishing the business in XLogReaderFree().
Yea, I came to the same conclusion and locally fixed it the same way
(except having the close a bit earlier in XLogReaderFree()).
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index f3fea5132f..7e25e2050a 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -144,6 +144,9 @@ XLogReaderFree(XLogReaderState *state)
if (state->main_data)
pfree(state->main_data);
+ if (state->seg.ws_file >= 0)
+ close(state->seg.ws_file);
+
pfree(state->errormsg_buf);
if (state->readRecordBuf)
pfree(state->readRecordBuf);
But I'm not sure it's quite the right idea. I'm not sure I fully
understand the design of 0dc8ead46, but it looks to me like it's
intended to allow users of the interface to have different ways of
opening files. If we just close() the fd that'd be a bit more limited.
OTOH, I think all but one (XLogPageRead()) of the current users of
XLogReader use WALRead(), which also close()s the fd (before calling the
WALSegmentOpen callback).
The XLogReader code flow has gotten quite complicated
:(. XLogReadRecord() -> state->read_page() ->
logical_read_xlog_page etc -> WALRead() -> wal_segment_open callback etc.
There's been a fair bit of change, making the interface more generic /
powerful / reducing duplication, but not a lot of added / adapted
comments in the header...
Greetings,
Andres Freund
Andres Freund <andres@anarazel.de> wrote:
From what I can see, the error is that the code only bothers closing
WALOpenSegment->seg when switching to a new segment, but we need also
to close it when finishing the business in XLogReaderFree().
Yea, I came to the same conclusion and locally fixed it the same way
(except having the close a bit earlier in XLogReaderFree()).
It's still not quite clear to me why the problem starts to appear after
0dc8ead46. This patch does not remove any close() call from XLogReaderFree().
But I'm not sure it's quite the right idea. I'm not sure I fully
understand the design of 0dc8ead46, but it looks to me like it's
intended to allow users of the interface to have different ways of
opening files. If we just close() the fd that'd be a bit more limited.
It should have allowed users to have different ways to *locate the segment*
file. The WALSegmentOpen callback could actually return file path instead of
the file descriptor and let WALRead() perform the opening/closing, but then
the WALRead function would need to be aware whether it is executing in backend
or in frontend (so it can use the correct function to open/close the file).
I was aware of the problem that the correct function should be used to open
the file and that's why this comment was added (although "mandatory" would be
more suitable than "preferred"):
* BasicOpenFile() is the preferred way to open the segment file in backend
* code, whereas open(2) should be used in frontend.
*/
typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
TimeLineID *tli_p);
--
Antonin Houska
Web: https://www.cybertec-postgresql.com
Hi,
On 2020-04-06 09:12:32 +0200, Antonin Houska wrote:
Andres Freund <andres@anarazel.de> wrote:
From what I can see, the error is that the code only bothers closing
WALOpenSegment->seg when switching to a new segment, but we need also
to close it when finishing the business in XLogReaderFree().
Yea, I came to the same conclusion and locally fixed it the same way
(except having the close a bit earlier in XLogReaderFree()).
It's still not quite clear to me why the problem starts to appear after
0dc8ead46. This patch does not remove any close() call from XLogReaderFree().
Before that change the file was also kind of leaked, but would use the
same static variable to store the fd and thus close it.
Greetings,
Andres Freund
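The difference described in this reply can be sketched as follows. This is an illustration only, assuming a hypothetical file-scope variable `demo_static_fd` in place of whatever the pre-0dc8ead46 code actually used: because every reader went through the same static slot, the next open closed whatever the previous reader had left behind, so at most one descriptor was ever outstanding.

```c
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical stand-in for the old file-scope variable holding the fd. */
static int  demo_static_fd = -1;

/*
 * Open a segment through the shared static slot.  Whatever an earlier
 * reader left open is closed first, so the leak never grows past one fd.
 */
int
demo_static_open(const char *path)
{
    assert(path != NULL);
    if (demo_static_fd >= 0)
        close(demo_static_fd);
    demo_static_fd = open(path, O_RDONLY);
    return demo_static_fd;
}
```

Once the descriptor lives in per-reader state instead, nothing closes it on behalf of a reader that has already been freed, which is consistent with the leak only becoming visible after the refactoring.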
Hi,
I pushed a fix. While it might not be the best medium/long term fix, it
unbreaks 2PC. Perhaps we should add an open item to track whether we
want to fix this differently?
On 2020-04-06 09:12:32 +0200, Antonin Houska wrote:
Andres Freund <andres@anarazel.de> wrote:
It should have allowed users to have different ways to *locate the segment*
file. The WALSegmentOpen callback could actually return file path instead of
the file descriptor and let WALRead() perform the opening/closing, but then
the WALRead function would need to be aware whether it is executing in backend
or in frontend (so it can use the correct function to open/close the file).
I was aware of the problem that the correct function should be used to open
the file and that's why this comment was added (although "mandatory" would be
more suitable than "preferred"):
* BasicOpenFile() is the preferred way to open the segment file in backend
* code, whereas open(2) should be used in frontend.
*/
typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
TimeLineID *tli_p);
I don't think that BasicOpenFile() really solves anything here? If
anything it *exacerbates* the problem, because it will trigger all of
the "virtual file descriptors" for already opened Files to close() the
underlying OS FDs. So not even a fully cached table can be seqscanned,
because that tries to check the file size...
Greetings,
Andres Freund
On Tue, Apr 07, 2020 at 05:12:49PM -0700, Andres Freund wrote:
I pushed a fix. While it might not be the best medium/long term fix, it
unbreaks 2PC. Perhaps we should add an open item to track whether we
want to fix this differently?
Sounds fine to me. I have updated the open item that we have now by
adding a comment that the leak has been fixed by 91c4054, but that
we should revisit the refactoring.
--
Michael
Antonin Houska <ah@cybertec.at> wrote:
Andres Freund <andres@anarazel.de> wrote:
But I'm not sure it's quite the right idea. I'm not sure I fully
understand the design of 0dc8ead46, but it looks to me like it's
intended to allow users of the interface to have different ways of
opening files. If we just close() the fd that'd be a bit more limited.
It should have allowed users to have different ways to *locate the segment*
file. The WALSegmentOpen callback could actually return file path instead of
the file descriptor and let WALRead() perform the opening/closing, but then
the WALRead function would need to be aware whether it is executing in backend
or in frontend (so it can use the correct function to open/close the file).
Well, #ifdef FRONTEND can be used to distinguish the caller of
WALRead(). However now that I tried to adjust the API, I see that
pg_waldump.c:WALDumpOpenSegment uses specific logic to open the file. So if
the callback only returned the file name, there would be no suitable place for
the things that WALDumpOpenSegment does.
--
Antonin Houska
Web: https://www.cybertec-postgresql.com
Andres Freund <andres@anarazel.de> wrote:
On 2020-04-06 09:12:32 +0200, Antonin Houska wrote:
Andres Freund <andres@anarazel.de> wrote:
It should have allowed users to have different ways to *locate the segment*
file. The WALSegmentOpen callback could actually return file path instead of
the file descriptor and let WALRead() perform the opening/closing, but then
the WALRead function would need to be aware whether it is executing in backend
or in frontend (so it can use the correct function to open/close the file).
I was aware of the problem that the correct function should be used to open
the file and that's why this comment was added (although "mandatory" would be
more suitable than "preferred"):
* BasicOpenFile() is the preferred way to open the segment file in backend
* code, whereas open(2) should be used in frontend.
*/
typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
TimeLineID *tli_p);
I don't think that BasicOpenFile() really solves anything here? If
anything it *exacerbates* the problem, because it will trigger all of
the "virtual file descriptors" for already opened Files to close() the
underlying OS FDs. So not even a fully cached table can be seqscanned,
because that tries to check the file size...
Specifically for 2PC, isn't it better to close some OS-level FD of an
unrelated table scan and then succeed than to ERROR immediately? Anyway,
0dc8ead46 hasn't changed this.
I at least admit that the comment should not recommend particular function,
and that WALRead() should call the appropriate function to close the file,
rather than always calling close().
Can we just pass the existing FD to the callback as an additional argument,
and let the callback close it?
--
Antonin Houska
Web: https://www.cybertec-postgresql.com
I have tested with and without the commit from Andres using the pgbench
script (below) provided in the initial email.
pgbench -n -s 500 -c 4 -j 4 -T 100000 -P1 -f pgbench-write-2pc.sql
I am not getting the leak anymore, it seems to be holding up pretty well.
On Wed, Apr 8, 2020 at 12:59 PM Antonin Houska <ah@cybertec.at> wrote:
Andres Freund <andres@anarazel.de> wrote:
On 2020-04-06 09:12:32 +0200, Antonin Houska wrote:
Andres Freund <andres@anarazel.de> wrote:
It should have allowed users to have different ways to *locate the segment*
file. The WALSegmentOpen callback could actually return file path instead of
the file descriptor and let WALRead() perform the opening/closing, but then
the WALRead function would need to be aware whether it is executing in backend
or in frontend (so it can use the correct function to open/close the file).
I was aware of the problem that the correct function should be used to open
the file and that's why this comment was added (although "mandatory" would be
more suitable than "preferred"):
* BasicOpenFile() is the preferred way to open the segment file in backend
* code, whereas open(2) should be used in frontend.
*/
typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
TimeLineID *tli_p);
I don't think that BasicOpenFile() really solves anything here? If
anything it *exacerbates* the problem, because it will trigger all of
the "virtual file descriptors" for already opened Files to close() the
underlying OS FDs. So not even a fully cached table can be seqscanned,
because that tries to check the file size...
Specifically for 2PC, isn't it better to close some OS-level FD of an
unrelated table scan and then succeed than to ERROR immediately? Anyway,
0dc8ead46 hasn't changed this.
I at least admit that the comment should not recommend particular function,
and that WALRead() should call the appropriate function to close the file,
rather than always calling close().
Can we just pass the existing FD to the callback as an additional argument,
and let the callback close it?
--
Antonin Houska
Web: https://www.cybertec-postgresql.com
--
Highgo Software (Canada/China/Pakistan)
URL : http://www.highgo.ca
ADDR: 10318 WHALLEY BLVD, Surrey, BC
EMAIL: mailto: ahsan.hadi@highgo.ca
On 2020-Apr-08, Antonin Houska wrote:
Specifically for 2PC, isn't it better to close some OS-level FD of an
unrelated table scan and then succeed than to ERROR immediately? Anyway,
0dc8ead46 hasn't changed this.
I think for full generality of the interface, we'd pass a "close" callback
in addition to the "open" callback. But if we were to pass it only for
WALRead, then there would be no way to call it during XLogReaderFree.
I think the fix Andres applied is okay as far as it goes, but for the
long term we may want to change the interface even further -- maybe by
having these functions be part of the XLogReader state struct. I have
this code paged out of my head ATM, but maybe tomorrow I can give it a
little think.
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Concretely, I propose to have a new struct like
typedef struct xlogReaderFuncs
{
    XLogPageReadCB read_page;
    XLogSegmentOpenCB open_segment;
    XLogSegmentCloseCB close_segment;
} xlogReaderFuncs;
#define XLOGREADER_FUNCS(...) &(xlogReaderFuncs){__VA_ARGS__}
and then invoke it something like
xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
    XLOGREADER_FUNCS(.read_page = &read_local_xlog_page,
                     .open_segment = &wal_segment_open,
                     .close_segment = &wal_segment_close),
    NULL);
(with suitable definitions for XLogSegmentOpenCB etc) so that the
support functions are all available at the xlogreader level, instead of
"open" being buried at the read-page level. Any additional support
functions can be added easily.
This would give xlogreader a simpler interface.
If people like this, I could make this change for pg13 and avoid
changing the API again in pg14.
Thoughts?
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
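The callback-table idea proposed in this message can be tried out in isolation. The following is a hedged, self-contained sketch rather than the eventual PostgreSQL API: `DemoReader`, `DemoReaderFuncs`, `DEMO_READER_FUNCS` and the sample callbacks are hypothetical names, and the "descriptor" is a made-up integer so that no files are touched. It shows a C99 compound literal with designated initializers being passed at allocation time and copied into the reader, which lets the free routine call the close callback itself.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

typedef struct DemoReader DemoReader;

/* Hypothetical callback table, modeled on the proposed xlogReaderFuncs. */
typedef struct DemoReaderFuncs
{
    int         (*read_page) (DemoReader *reader);
    int         (*open_segment) (DemoReader *reader, int segno);
    void        (*close_segment) (DemoReader *reader);
} DemoReaderFuncs;

/* Pointer to a temporary table; members not named are set to NULL. */
#define DEMO_READER_FUNCS(...) (&(DemoReaderFuncs){__VA_ARGS__})

struct DemoReader
{
    DemoReaderFuncs funcs;      /* copied, so the temporary may go away */
    int         seg_fd;         /* -1 when nothing is open */
};

DemoReader *
demo_reader_alloc(const DemoReaderFuncs *funcs)
{
    DemoReader *reader;

    assert(funcs != NULL);
    reader = malloc(sizeof(DemoReader));
    if (reader == NULL)
        return NULL;
    reader->funcs = *funcs;
    reader->seg_fd = -1;
    return reader;
}

void
demo_reader_free(DemoReader *reader)
{
    /* The reader knows the close callback, so teardown can release the fd. */
    if (reader->seg_fd >= 0 && reader->funcs.close_segment != NULL)
        reader->funcs.close_segment(reader);
    free(reader);
}

/* Sample callbacks that only track state. */
int         demo_close_calls = 0;

int
demo_open_segment(DemoReader *reader, int segno)
{
    reader->seg_fd = 100 + segno;   /* pretend descriptor */
    return reader->seg_fd;
}

void
demo_close_segment(DemoReader *reader)
{
    reader->seg_fd = -1;
    demo_close_calls++;
}
```

A caller would write something like `demo_reader_alloc(DEMO_READER_FUNCS(.open_segment = demo_open_segment, .close_segment = demo_close_segment))`. Copying the struct in the allocator matters because the compound literal only lives as long as the enclosing block.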
On 2020-04-22 13:57:54 -0400, Alvaro Herrera wrote:
Concretely, I propose to have a new struct like
typedef struct xlogReaderFuncs
{
    XLogPageReadCB read_page;
    XLogSegmentOpenCB open_segment;
    XLogSegmentCloseCB close_segment;
} xlogReaderFuncs;
#define XLOGREADER_FUNCS(...) &(xlogReaderFuncs){__VA_ARGS__}
Not sure I quite see the point of that helper macro...
and then invoke it something like
xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
    XLOGREADER_FUNCS(.read_page = &read_local_xlog_page,
                     .open_segment = &wal_segment_open,
                     .close_segment = &wal_segment_close),
    NULL);
(with suitable definitions for XLogSegmentOpenCB etc) so that the
support functions are all available at the xlogreader level, instead of
"open" being buried at the read-page level. Any additional support
functions can be added easily.
This would give xlogreader a simpler interface.
My first reaction was that this looks like it'd make it harder to read
WAL from memory. But that's not really a problem, since
opensegment/closesegment don't have to do anything.
I think reducing the levels of indirection around xlogreader would be a
good idea. The control flow currently is *really* complicated: With the
page read callback at the xlogreader level, as well as separate
callbacks set from within the page read callback and passed to
WALRead(). And even though the WALOpenSegment, WALSegmentContext are
really private to WALRead, not XLogReader as a whole, they are members
of XLogReaderState. I think the PG13 changes made it considerably
harder to understand xlogreader / xlogreader using code.
Note that the WALSegmentOpen callback currently does not have access to
XLogReaderState->private_data, which I think is a pretty significant new
restriction. Afaict it's not nicely possible anymore to have two
xlogreaders inside the same process that read from different data
directories or other cases where opening the segment requires context
information.
If people like this, I could make this change for pg13 and avoid
changing the API again in pg14.
I'm in favor of doing so. Not necessarily primarily to avoid repeated
API changes, but because I don't think the v13 changes went in quite the
right direction.
ISTM that we should:
- have the three callbacks you mention above
- change WALSegmentOpen to also get the XLogReaderState
- add private state to WALOpenSegment, so it can be used even when not
accessing data in files / when one needs more information to close the
file.
- disambiguate between WALOpenSegment (struct describing an open
segment) and WALSegmentOpen (callback to open a segment) (note that
the read page callback uses a *CB naming, why not follow?)
Greetings,
Andres Freund
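The point about private_data can be illustrated with a small sketch. It assumes hypothetical types `DemoReader` and `DemoWalSource` and a simplified file-name format (real WAL segment names are longer), and it only builds a path instead of opening anything. Because the callback receives the reader itself, two readers in one process can resolve the same segment number against different directories.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical reader; private_data carries caller-supplied context. */
typedef struct DemoReader
{
    void       *private_data;
} DemoReader;

/* Hypothetical per-reader context: where this reader's segments live. */
typedef struct DemoWalSource
{
    const char *waldir;
} DemoWalSource;

/*
 * Segment-locating callback that is handed the reader.  Returns the number
 * of characters written, or -1 if the path does not fit in the buffer.
 */
int
demo_segment_path(DemoReader *reader, unsigned int segno,
                  char *path, size_t pathlen)
{
    DemoWalSource *source = reader->private_data;
    int         n;

    assert(source != NULL && source->waldir != NULL);
    n = snprintf(path, pathlen, "%s/%08X", source->waldir, segno);
    if (n < 0 || (size_t) n >= pathlen)
        return -1;
    return n;
}
```

With a callback that only receives the segment number and a shared segment context, the directory would have to come from global state, which is the restriction this message objects to.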
On 2020-Apr-22, Andres Freund wrote:
On 2020-04-22 13:57:54 -0400, Alvaro Herrera wrote:
Concretely, I propose to have a new struct like
typedef struct xlogReaderFuncs
{
    XLogPageReadCB read_page;
    XLogSegmentOpenCB open_segment;
    XLogSegmentCloseCB close_segment;
} xlogReaderFuncs;
#define XLOGREADER_FUNCS(...) &(xlogReaderFuncs){__VA_ARGS__}
Not sure I quite see the point of that helper macro...
Avoid the ugly cast -- same discussion we had for ARCHIVE_OPTS in
pg_dump code in commit f831d4accda0.
ISTM that we should:
- have the three callbacks you mention above
- change WALSegmentOpen to also get the XLogReaderState
- add private state to WALOpenSegment, so it can be used even when not
accessing data in files / when one needs more information to close the
file.
- disambiguate between WALOpenSegment (struct describing an open
segment) and WALSegmentOpen (callback to open a segment) (note that
the read page callback uses a *CB naming, why not follow?)
Sounds good.
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On 2020-Apr-22, Andres Freund wrote:
I'm in favor of doing so. Not necessarily primarily to avoid repeated
API changes, but because I don't think the v13 changes went in quite the
right direction.
ISTM that we should:
- have the three callbacks you mention above
- change WALSegmentOpen to also get the XLogReaderState
- add private state to WALOpenSegment, so it can be used even when not
accessing data in files / when one needs more information to close the
file.
- disambiguate between WALOpenSegment (struct describing an open
segment) and WALSegmentOpen (callback to open a segment) (note that
the read page callback uses a *CB naming, why not follow?)
Here's a first attempt at that. The segment_open/close callbacks are
now given at XLogReaderAllocate time, and are passed the XLogReaderState
pointer. I wrote a comment to explain that the page_read callback can
use WALRead() if it wishes to do so; but if it does, then segment_open
has to be provided. segment_close is mandatory (since we call it at
XLogReaderFree).
Of the half a dozen cases that exist, three are slightly weird:
* Physical walsender does not use a xlogreader at all. I think we could
beat that code up so that it does. But for the moment I just cons up
a fake xlogreader, which only has the segment_open pointer set up, so
that it can call WALRead.
* main xlog.c uses an xlogreader with XLogPageRead(), which does not use
WALRead. Therefore it does not pass open_segment. It does not use
xlogreader->seg.ws_file either. Eventually we may want to beat this
one up also.
* pg_rewind has its own page read callback, SimpleXLogPageRead, which
does all the required opening and closing. I don't think it'd be an
improvement to force this to use segment_open. Oddly enough, it calls
itself "simple" but is unique in having the ability to read files from
the wal archive.
All tests are passing for me.
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Attachments:
0001-Rework-XLogReader-callback-system.patch (text/x-diff; charset=us-ascii)
From 3b779f1ea5e6cd4861037cf37a8d9375eb08fcf1 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Thu, 23 Apr 2020 18:02:20 -0400
Subject: [PATCH] Rework XLogReader callback system
segment_open and segment_close are passed in at XLogReaderAllocate time,
together with page_read; add comment to explain the role of WALRead.
---
src/backend/access/transam/twophase.c | 5 +-
src/backend/access/transam/xlog.c | 10 +-
src/backend/access/transam/xlogreader.c | 49 ++++----
src/backend/access/transam/xlogutils.c | 24 ++--
src/backend/replication/logical/logical.c | 20 +--
.../replication/logical/logicalfuncs.c | 4 +-
src/backend/replication/slotfuncs.c | 10 +-
src/backend/replication/walsender.c | 37 ++++--
src/bin/pg_rewind/parsexlog.c | 9 +-
src/bin/pg_waldump/pg_waldump.c | 30 +++--
src/include/access/xlogreader.h | 114 +++++++++++-------
src/include/access/xlogutils.h | 5 +
src/include/replication/logical.h | 4 +-
13 files changed, 208 insertions(+), 113 deletions(-)
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 2f7d4ed59a..e1904877fa 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1331,7 +1331,10 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
char *errormsg;
xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
- &read_local_xlog_page, NULL);
+ XL_ROUTINE(.page_read = &read_local_xlog_page,
+ .segment_open = &wal_segment_open,
+ .segment_close = &wal_segment_close),
+ NULL);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 11e32733c4..797b9c490e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1222,7 +1222,7 @@ XLogInsertRecord(XLogRecData *rdata,
if (!debug_reader)
debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
- NULL, NULL);
+ XL_ROUTINE(), NULL);
if (!debug_reader)
{
@@ -6467,8 +6467,12 @@ StartupXLOG(void)
/* Set up XLOG reader facility */
MemSet(&private, 0, sizeof(XLogPageReadPrivate));
- xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
- &XLogPageRead, &private);
+ xlogreader =
+ XLogReaderAllocate(wal_segment_size, NULL,
+ XL_ROUTINE(.page_read = &XLogPageRead,
+ .segment_open = NULL,
+ .segment_close = wal_segment_close),
+ &private);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 79ff976474..a307faea8b 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -71,7 +71,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
*/
XLogReaderState *
XLogReaderAllocate(int wal_segment_size, const char *waldir,
- XLogPageReadCB pagereadfunc, void *private_data)
+ XLogReaderRoutine *routine, void *private_data)
{
XLogReaderState *state;
@@ -81,6 +81,9 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
if (!state)
return NULL;
+ /* initialize caller-provided support functions */
+ state->routine = *routine;
+
state->max_block_id = -1;
/*
@@ -102,7 +105,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
waldir);
- state->read_page = pagereadfunc;
/* system_identifier initialized to zeroes above */
state->private_data = private_data;
/* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
@@ -137,7 +139,7 @@ XLogReaderFree(XLogReaderState *state)
int block_id;
if (state->seg.ws_file != -1)
- close(state->seg.ws_file);
+ state->routine.segment_close(state);
for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
{
@@ -250,7 +252,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
* XLogBeginRead() or XLogFindNextRecord() must be called before the first call
* to XLogReadRecord().
*
- * If the read_page callback fails to read the requested data, NULL is
+ * If the page_read callback fails to read the requested data, NULL is
* returned. The callback is expected to have reported the error; errormsg
* is set to NULL.
*
@@ -559,10 +561,10 @@ err:
/*
* Read a single xlog page including at least [pageptr, reqLen] of valid data
- * via the read_page() callback.
+ * via the page_read() callback.
*
* Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the read_page callback).
+ * is set in that case (unless the error occurs in the page_read callback).
*
* We fetch the page from a reader-local cache if we know we have the required
* data and if there hasn't been any error since caching the data.
@@ -589,7 +591,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
* Data is not in our buffer.
*
* Every time we actually read the segment, even if we looked at parts of
- * it before, we need to do verification as the read_page callback might
+ * it before, we need to do verification as the page_read callback might
* now be rereading data from a different source.
*
* Whenever switching to a new WAL segment, we read the first page of the
@@ -601,9 +603,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
{
XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
- readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
- state->currRecPtr,
- state->readBuf);
+ readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
+ state->currRecPtr,
+ state->readBuf);
if (readLen < 0)
goto err;
@@ -619,9 +621,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
* First, read the requested data length, but at least a short page header
* so that we can validate it.
*/
- readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
- state->currRecPtr,
- state->readBuf);
+ readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
+ state->currRecPtr,
+ state->readBuf);
if (readLen < 0)
goto err;
@@ -638,9 +640,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
/* still not enough */
if (readLen < XLogPageHeaderSize(hdr))
{
- readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
- state->currRecPtr,
- state->readBuf);
+ readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
+ state->currRecPtr,
+ state->readBuf);
if (readLen < 0)
goto err;
}
@@ -1041,11 +1043,12 @@ err:
#endif /* FRONTEND */
/*
+ * Helper function to ease writing of XLogRoutine->page_read callbacks.
+ *
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
* fetched from timeline 'tli'.
*
- * 'seg/segcxt' identify the last segment used. 'openSegment' is a callback
- * to open the next segment, if necessary.
+ * 'seg/segcxt' identify the last segment used.
*
* Returns true if succeeded, false if an error occurs, in which case
* 'errinfo' receives error details.
@@ -1054,9 +1057,10 @@ err:
* WAL buffers when possible.
*/
bool
-WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
+WALRead(XLogReaderState *state,
+ char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
WALOpenSegment *seg, WALSegmentContext *segcxt,
- WALSegmentOpen openSegment, WALReadError *errinfo)
+ WALReadError *errinfo)
{
char *p;
XLogRecPtr recptr;
@@ -1086,10 +1090,11 @@ WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
XLogSegNo nextSegNo;
if (seg->ws_file >= 0)
- close(seg->ws_file);
+ state->routine.segment_close(state);
XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
- seg->ws_file = openSegment(nextSegNo, segcxt, &tli);
+ seg->ws_file = state->routine.segment_open(state, nextSegNo,
+ segcxt, &tli);
/* Update the current segment info. */
seg->ws_tli = tli;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 6cb143e161..bbd801513a 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -783,10 +783,10 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
}
}
-/* openSegment callback for WALRead */
-static int
-wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt,
- TimeLineID *tli_p)
+/* XLogReaderRoutine->segment_open callback for local pg_wal files */
+int
+wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
+ WALSegmentContext *segcxt, TimeLineID *tli_p)
{
TimeLineID tli = *tli_p;
char path[MAXPGPATH];
@@ -811,8 +811,17 @@ wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt,
return -1; /* keep compiler quiet */
}
+/* stock XLogReaderRoutine->segment_close callback */
+void
+wal_segment_close(XLogReaderState *state)
+{
+ close(state->seg.ws_file);
+ /* need to check errno? */
+ state->seg.ws_file = -1;
+}
+
/*
- * read_page callback for reading local xlog files
+ * XLogReaderRoutine->page_read callback for reading local xlog files
*
* Public because it would likely be very helpful for someone writing another
* output method outside walsender, e.g. in a bgworker.
@@ -937,8 +946,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
* as 'count', read the whole page anyway. It's guaranteed to be
* zero-padded up to the page boundary if it's incomplete.
*/
- if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg,
- &state->segcxt, wal_segment_open, &errinfo))
+ if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
+ &state->seg, &state->segcxt,
+ &errinfo))
WALReadRaiseError(&errinfo);
/* number of valid bytes in the buffer */
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5adf253583..dc69e5ce5f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -120,7 +120,7 @@ StartupDecodingContext(List *output_plugin_options,
TransactionId xmin_horizon,
bool need_full_snapshot,
bool fast_forward,
- XLogPageReadCB read_page,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -169,7 +169,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->slot = slot;
- ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx);
+ ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
if (!ctx->reader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -215,7 +215,8 @@ StartupDecodingContext(List *output_plugin_options,
* Otherwise, we set for decoding to start from the given LSN without
* marking WAL reserved beforehand. In that scenario, it's up to the
* caller to guarantee that WAL remains available.
- * read_page, prepare_write, do_write, update_progress --
+ * xl_routine -- XLogReaderRoutine for underlying XLogReader
+ * prepare_write, do_write, update_progress --
* callbacks that perform the use-case dependent, actual, work.
*
* Needs to be called while in a memory context that's at least as long lived
@@ -230,7 +231,7 @@ CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
- XLogPageReadCB read_page,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -327,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
need_full_snapshot, false,
- read_page, prepare_write, do_write,
+ xl_routine, prepare_write, do_write,
update_progress);
/* call output plugin initialization callback */
@@ -357,7 +358,10 @@ CreateInitDecodingContext(char *plugin,
* fast_forward
* bypass the generation of logical changes.
*
- * read_page, prepare_write, do_write, update_progress
+ * xl_routine
+ * XLogReaderRoutine used by underlying xlogreader
+ *
+ * prepare_write, do_write, update_progress
* callbacks that have to be filled to perform the use-case dependent,
* actual work.
*
@@ -372,7 +376,7 @@ LogicalDecodingContext *
CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
bool fast_forward,
- XLogPageReadCB read_page,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -425,7 +429,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false,
- fast_forward, read_page, prepare_write,
+ fast_forward, xl_routine, prepare_write,
do_write, update_progress);
/* call output plugin initialization callback */
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index f5384f1df8..cfef7f3b6c 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -233,7 +233,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
ctx = CreateDecodingContext(InvalidXLogRecPtr,
options,
false,
- read_local_xlog_page,
+ XL_ROUTINE(.page_read = read_local_xlog_page,
+ .segment_open = wal_segment_open,
+ .segment_close = wal_segment_close),
LogicalOutputPrepareWrite,
LogicalOutputWrite, NULL);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index f776de3df7..6a2c0cb6bc 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -152,8 +152,10 @@ create_logical_replication_slot(char *name, char *plugin,
ctx = CreateInitDecodingContext(plugin, NIL,
false, /* just catalogs is OK */
restart_lsn,
- read_local_xlog_page, NULL, NULL,
- NULL);
+ XL_ROUTINE(.page_read = read_local_xlog_page,
+ .segment_open = wal_segment_open,
+ .segment_close = wal_segment_close),
+ NULL, NULL, NULL);
/*
* If caller needs us to determine the decoding start point, do so now.
@@ -464,7 +466,9 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
ctx = CreateDecodingContext(InvalidXLogRecPtr,
NIL,
true, /* fast_forward */
- read_local_xlog_page,
+ XL_ROUTINE(.page_read = read_local_xlog_page,
+ .segment_open = wal_segment_open,
+ .segment_close = wal_segment_close),
NULL, NULL, NULL);
/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 0e933228fc..55afed683f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -54,8 +54,8 @@
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
#include "access/xlogutils.h"
-
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
@@ -248,8 +248,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
-static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
- TimeLineID *tli_p);
+static int WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
+ WALSegmentContext *segcxt, TimeLineID *tli_p);
static void UpdateSpillStats(LogicalDecodingContext *ctx);
@@ -792,7 +792,8 @@ StartReplication(StartReplicationCmd *cmd)
}
/*
- * read_page callback for logical decoding contexts, as a walsender process.
+ * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
+ * walsender process.
*
* Inside the walsender we can do better than read_local_xlog_page,
* which has to do a plain sleep/busy loop, because the walsender's latch gets
@@ -826,7 +827,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
count = flushptr - targetPagePtr; /* part of the page available */
/* now actually read the data, we know it's there */
- if (!WALRead(cur_page,
+ if (!WALRead(state,
+ cur_page,
targetPagePtr,
XLOG_BLCKSZ,
sendSeg->ws_tli, /* Pass the current TLI because only
@@ -834,7 +836,6 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
* TLI is needed. */
sendSeg,
sendCxt,
- WalSndSegmentOpen,
&errinfo))
WALReadRaiseError(&errinfo);
@@ -999,7 +1000,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
InvalidXLogRecPtr,
- logical_read_xlog_page,
+ XL_ROUTINE(.page_read = logical_read_xlog_page,
+ .segment_open = WalSndSegmentOpen,
+ .segment_close = wal_segment_close),
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
@@ -1155,7 +1158,9 @@ StartLogicalReplication(StartReplicationCmd *cmd)
*/
logical_decoding_ctx =
CreateDecodingContext(cmd->startpoint, cmd->options, false,
- logical_read_xlog_page,
+ XL_ROUTINE(.page_read = logical_read_xlog_page,
+ .segment_open = WalSndSegmentOpen,
+ .segment_close = wal_segment_close),
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
@@ -2422,9 +2427,10 @@ WalSndKill(int code, Datum arg)
SpinLockRelease(&walsnd->mutex);
}
-/* walsender's openSegment callback for WALRead */
+/* XLogReaderRoutine->segment_open callback */
static int
-WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+WalSndSegmentOpen(XLogReaderState *state,
+ XLogSegNo nextSegNo, WALSegmentContext *segcxt,
TimeLineID *tli_p)
{
char path[MAXPGPATH];
@@ -2512,6 +2518,12 @@ XLogSendPhysical(void)
Size nbytes;
XLogSegNo segno;
WALReadError errinfo;
+ static XLogReaderState fake_xlogreader =
+ {
+ /* XXX no page_read routine used by physical walsender */
+ .routine.segment_open = WalSndSegmentOpen,
+ .routine.segment_close = wal_segment_close
+ };
/* If requested switch the WAL sender to the stopping state. */
if (got_STOPPING)
@@ -2729,7 +2741,9 @@ XLogSendPhysical(void)
enlargeStringInfo(&output_message, nbytes);
retry:
- if (!WALRead(&output_message.data[output_message.len],
+ /* XXX for xlogreader use, we'd call XLogBeginRead+XLogReadRecord here */
+ if (!WALRead(&fake_xlogreader,
+ &output_message.data[output_message.len],
startptr,
nbytes,
sendSeg->ws_tli, /* Pass the current TLI because only
@@ -2737,7 +2751,6 @@ retry:
* TLI is needed. */
sendSeg,
sendCxt,
- WalSndSegmentOpen,
&errinfo))
WALReadRaiseError(&errinfo);
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 14a5db5433..5a10faa614 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -66,7 +66,8 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
private.tliIndex = tliIndex;
private.restoreCommand = restoreCommand;
- xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
+ xlogreader = XLogReaderAllocate(WalSegSz, datadir,
+ XL_ROUTINE(.page_read = &SimpleXLogPageRead),
&private);
if (xlogreader == NULL)
pg_fatal("out of memory");
@@ -115,7 +116,8 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex)
XLogRecPtr endptr;
private.tliIndex = tliIndex;
- xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
+ xlogreader = XLogReaderAllocate(WalSegSz, datadir,
+ XL_ROUTINE(.page_read = &SimpleXLogPageRead),
&private);
if (xlogreader == NULL)
pg_fatal("out of memory");
@@ -174,7 +176,8 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
private.tliIndex = tliIndex;
private.restoreCommand = restoreCommand;
- xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
+ xlogreader = XLogReaderAllocate(WalSegSz, datadir,
+ XL_ROUTINE(.page_read = &SimpleXLogPageRead),
&private);
if (xlogreader == NULL)
pg_fatal("out of memory");
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index d7bd9ccac2..e29f65500f 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -279,9 +279,10 @@ identify_target_directory(char *directory, char *fname)
return NULL; /* not reached */
}
-/* pg_waldump's openSegment callback for WALRead */
+/* pg_waldump's XLogReaderRoutine->segment_open callback */
static int
-WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+WALDumpOpenSegment(XLogReaderState *state,
+ XLogSegNo nextSegNo, WALSegmentContext *segcxt,
TimeLineID *tli_p)
{
TimeLineID tli = *tli_p;
@@ -321,8 +322,18 @@ WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
}
/*
- * XLogReader read_page callback
+ * pg_waldump's XLogReaderRoutine->segment_close callback. Same as
+ * wal_segment_close
*/
+static void
+WALDumpCloseSegment(XLogReaderState *state)
+{
+ close(state->seg.ws_file);
+ /* need to check errno? */
+ state->seg.ws_file = -1;
+}
+
+/* pg_waldump's XLogReaderRoutine->page_read callback */
static int
WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetPtr, char *readBuff)
@@ -344,8 +355,9 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
}
}
- if (!WALRead(readBuff, targetPagePtr, count, private->timeline,
- &state->seg, &state->segcxt, WALDumpOpenSegment, &errinfo))
+ if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline,
+ &state->seg, &state->segcxt,
+ &errinfo))
{
WALOpenSegment *seg = &errinfo.wre_seg;
char fname[MAXPGPATH];
@@ -1031,8 +1043,12 @@ main(int argc, char **argv)
/* done with argument parsing, do the actual work */
/* we have everything we need, start reading */
- xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, WALDumpReadPage,
- &private);
+ xlogreader_state =
+ XLogReaderAllocate(WalSegSz, waldir,
+ XL_ROUTINE(.page_read = WALDumpReadPage,
+ .segment_open = WALDumpOpenSegment,
+ .segment_close = WALDumpCloseSegment),
+ &private);
if (!xlogreader_state)
fatal_error("out of memory");
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 4582196e18..6b9f7db646 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -17,6 +17,13 @@
* XLogBeginRead() or XLogFindNextRecord(), and call XLogReadRecord()
* until it returns NULL.
*
+ * Callers supply a page_read callback if they want to call
+ * XLogReadRecord or XLogFindNextRecord; it can be passed in as NULL
+ * otherwise. The WALRead function can be used as a helper to write
+ * page_read callbacks, but it is not mandatory; callers that use it,
+ * must supply open_segment callbacks. The close_segment callback
+ * must always be supplied.
+ *
* After reading a record with XLogReadRecord(), it's decomposed into
* the per-block and main data parts, and the parts can be accessed
* with the XLogRec* macros and functions. You can also decode a
@@ -50,12 +57,64 @@ typedef struct WALSegmentContext
typedef struct XLogReaderState XLogReaderState;
-/* Function type definition for the read_page callback */
+/* Function type definitions for various xlogreader interactions */
typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
XLogRecPtr targetPagePtr,
int reqLen,
XLogRecPtr targetRecPtr,
char *readBuf);
+typedef int (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
+ XLogSegNo nextSegNo,
+ WALSegmentContext *segcxt,
+ TimeLineID *tli_p);
+typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader);
+
+typedef struct XLogReaderRoutine
+{
+ /*
+ * Data input callback
+ *
+ * This callback shall read at least reqLen valid bytes of the xlog page
+ * starting at targetPagePtr, and store them in readBuf. The callback
+ * shall return the number of bytes read (never more than XLOG_BLCKSZ), or
+ * -1 on failure. The callback shall sleep, if necessary, to wait for the
+ * requested bytes to become available. The callback will not be invoked
+ * again for the same page unless more than the returned number of bytes
+ * are needed.
+ *
+ * targetRecPtr is the position of the WAL record we're reading. Usually
+ * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
+ * to read and verify the page or segment header, before it reads the
+ * actual WAL record it's interested in. In that case, targetRecPtr can
+ * be used to determine which timeline to read the page from.
+ *
+ * The callback shall set ->seg.ws_tli to the TLI of the file the page was
+ * read from.
+ */
+ XLogPageReadCB page_read;
+
+ /*
+ * Callback to open the specified WAL segment for reading. Returns a
+ * valid file descriptor when the file was opened successfully.
+ *
+ * "nextSegNo" is the number of the segment to be opened.
+ *
+ * "segcxt" is additional information about the segment.
+ *
+ * "tli_p" is an input/output argument. XLogRead() uses it to pass the
+ * timeline in which the new segment should be found, but the callback can
+ * use it to return the TLI that it actually opened.
+ *
+ * BasicOpenFile() is the preferred way to open the segment file in
+ * backend code, whereas open(2) should be used in frontend.
+ */
+ WALSegmentOpenCB segment_open;
+
+ /* WAL segment close callback */
+ WALSegmentCloseCB segment_close;
+} XLogReaderRoutine;
+
+#define XL_ROUTINE(...) &(XLogReaderRoutine){__VA_ARGS__}
typedef struct
{
@@ -88,33 +147,16 @@ typedef struct
struct XLogReaderState
{
+ /*
+ * Operational callbacks
+ */
+ XLogReaderRoutine routine;
+
/* ----------------------------------------
* Public parameters
* ----------------------------------------
*/
- /*
- * Data input callback (mandatory).
- *
- * This callback shall read at least reqLen valid bytes of the xlog page
- * starting at targetPagePtr, and store them in readBuf. The callback
- * shall return the number of bytes read (never more than XLOG_BLCKSZ), or
- * -1 on failure. The callback shall sleep, if necessary, to wait for the
- * requested bytes to become available. The callback will not be invoked
- * again for the same page unless more than the returned number of bytes
- * are needed.
- *
- * targetRecPtr is the position of the WAL record we're reading. Usually
- * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
- * to read and verify the page or segment header, before it reads the
- * actual WAL record it's interested in. In that case, targetRecPtr can
- * be used to determine which timeline to read the page from.
- *
- * The callback shall set ->seg.ws_tli to the TLI of the file the page was
- * read from.
- */
- XLogPageReadCB read_page;
-
/*
* System identifier of the xlog files we're about to read. Set to zero
* (the default value) if unknown or unimportant.
@@ -214,30 +256,13 @@ struct XLogReaderState
/* Get a new XLogReader */
extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
const char *waldir,
- XLogPageReadCB pagereadfunc,
+ XLogReaderRoutine *routine,
void *private_data);
+extern XLogReaderRoutine *LocalXLogReaderRoutine(void);
/* Free an XLogReader */
extern void XLogReaderFree(XLogReaderState *state);
-/*
- * Callback to open the specified WAL segment for reading. Returns a valid
- * file descriptor when the file was opened successfully.
- *
- * "nextSegNo" is the number of the segment to be opened.
- *
- * "segcxt" is additional information about the segment.
- *
- * "tli_p" is an input/output argument. XLogRead() uses it to pass the
- * timeline in which the new segment should be found, but the callback can use
- * it to return the TLI that it actually opened.
- *
- * BasicOpenFile() is the preferred way to open the segment file in backend
- * code, whereas open(2) should be used in frontend.
- */
-typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
- TimeLineID *tli_p);
-
/* Initialize supporting structures */
extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
int segsize, const char *waldir);
@@ -269,9 +294,10 @@ typedef struct WALReadError
WALOpenSegment wre_seg; /* Segment we tried to read from. */
} WALReadError;
-extern bool WALRead(char *buf, XLogRecPtr startptr, Size count,
+extern bool WALRead(XLogReaderState *state,
+ char *buf, XLogRecPtr startptr, Size count,
TimeLineID tli, WALOpenSegment *seg,
- WALSegmentContext *segcxt, WALSegmentOpen openSegment,
+ WALSegmentContext *segcxt,
WALReadError *errinfo);
/* Functions for decoding an XLogRecord */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 5181a077d9..68ce815476 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -50,6 +50,11 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
extern int read_local_xlog_page(XLogReaderState *state,
XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetRecPtr, char *cur_page);
+extern int wal_segment_open(XLogReaderState *state,
+ XLogSegNo nextSegNo,
+ WALSegmentContext *segcxt,
+ TimeLineID *tli_p);
+extern void wal_segment_close(XLogReaderState *state);
extern void XLogReadDetermineTimeline(XLogReaderState *state,
XLogRecPtr wantPage, uint32 wantLength);
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 3b7ca7f1da..c2f2475e5d 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -95,14 +95,14 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
- XLogPageReadCB read_page,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress);
extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
bool fast_forward,
- XLogPageReadCB read_page,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress);
--
2.20.1
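For readers skimming the patch: the XL_ROUTINE macro it adds to xlogreader.h is the address of a C99 compound literal, so any callback a call site does not name is left NULL. The following is only a stand-alone sketch of that idiom. ReaderState, ReaderRoutine, ROUTINE and reader_init are simplified stand-ins invented for illustration, not the real PostgreSQL definitions, and the by-value copy in reader_init is an assumption based on XLogReaderState embedding the routine struct rather than a pointer to it.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-ins for the PostgreSQL types; not the real headers. */
typedef struct ReaderState ReaderState;
typedef int (*PageReadCB) (ReaderState *state, char *buf);
typedef int (*SegmentOpenCB) (ReaderState *state, unsigned long segno);
typedef void (*SegmentCloseCB) (ReaderState *state);

typedef struct ReaderRoutine
{
	PageReadCB	page_read;
	SegmentOpenCB segment_open;
	SegmentCloseCB segment_close;
} ReaderRoutine;

/* Same shape as the patch's XL_ROUTINE: address of a compound literal. */
#define ROUTINE(...) &(ReaderRoutine){__VA_ARGS__}

struct ReaderState
{
	ReaderRoutine routine;		/* held by value, as in the patch */
};

/* Trivial page_read stand-in. */
static int
my_page_read(ReaderState *state, char *buf)
{
	(void) state;
	buf[0] = 'x';
	return 1;
}

/* Hypothetical allocator step: copy the routine out of the literal. */
static void
reader_init(ReaderState *state, const ReaderRoutine *routine)
{
	state->routine = *routine;
}

/* Returns 1 if only page_read ends up set, as at the pg_rewind call sites. */
static int
only_page_read_is_set(void)
{
	ReaderState state;

	reader_init(&state, ROUTINE(.page_read = my_page_read));
	return state.routine.page_read == my_page_read &&
		state.routine.segment_open == NULL &&
		state.routine.segment_close == NULL;
}
```

Because the members not named in the initializer are zero-initialized, a caller such as pg_rewind can pass only .page_read and nothing else needs to change at that call site.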
At Thu, 23 Apr 2020 19:16:03 -0400, Alvaro Herrera <alvherre@2ndquadrant.com> wrote in
On 2020-Apr-22, Andres Freund wrote:
I'm in favor of doing so. Not necessarily primarily to avoid repeated
API changes, but because I don't think the v13 changes went in the quite
right direction.
ISTM that we should:
- have the three callbacks you mention above
- change WALSegmentOpen to also get the XLogReaderState
- add private state to WALOpenSegment, so it can be used even when not
accessing data in files / when one needs more information to close the
file.
- disambiguate between WALOpenSegment (struct describing an open
segment) and WALSegmentOpen (callback to open a segment) (note that
the read page callback uses a *CB naming, why not follow?)
Here's a first attempt at that. The segment_open/close callbacks are
now given at XLogReaderAllocate time, and are passed the XLogReaderState
pointer. I wrote a comment to explain that the page_read callback can
use WALRead() if it wishes to do so; but if it does, then segment_open
has to be provided. segment_close is mandatory (since we call it at
XLogReaderFree).
Of the half a dozen cases that exist, three are slightly weird:
* Physical walsender does not use a xlogreader at all. I think we could
beat that code up so that it does. But for the moment I just cons up
a fake xlogreader, which only has the segment_open pointer set up, so
that it can call WALRead.
* main xlog.c uses an xlogreader with XLogPageRead(), which does not use
WALRead. Therefore it does not pass open_segment. It does not use
xlogreader->seg.ws_file either. Eventually we may want to beat this
one up also.
* pg_rewind has its own page read callback, SimpleXLogPageRead, which
does all the required opening and closing. I don't think it'd be an
improvement to force this to use segment_open. Oddly enough, it calls
itself "simple" but is unique in having the ability to read files from
the wal archive.
All tests are passing for me.
I modestly object to so many call-back functions. FWIW I'm writing
this with [1] in my mind.
An open-callback is bound to a read-callback. A close-callback is
bound to the way the read-callback opens a segment (or the
open-callback). I'm afraid that only adding "cleanup" callback might
be sufficient.
[1]: /messages/by-id/20200422.101246.331162888498679491.horikyota.ntt@gmail.com
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
On 2020-Apr-24, Kyotaro Horiguchi wrote:
At Thu, 23 Apr 2020 19:16:03 -0400, Alvaro Herrera <alvherre@2ndquadrant.com> wrote in
Here's a first attempt at that. The segment_open/close callbacks are
now given at XLogReaderAllocate time, and are passed the XLogReaderState
pointer. I wrote a comment to explain that the page_read callback can
use WALRead() if it wishes to do so; but if it does, then segment_open
has to be provided. segment_close is mandatory (since we call it at
XLogReaderFree).
I modestly object to so many call-back functions. FWIW I'm writing
this with [1] in my mind.
[1] /messages/by-id/20200422.101246.331162888498679491.horikyota.ntt@gmail.com
Hmm. Looking at your 0001, I think there's nothing in that patch that's
not compatible with my proposed API change.
0002 is a completely different story of course; but that patch is a
radical change of spirit for xlogreader, in the sense that it's no
longer a "reader", but rather just an interpreter of bytes from a WAL
byte sequence into WAL records; and shifts the responsibility of the
actual reading to the caller. That's why xlogreader no longer receives
the page_read callback (and why it doesn't need the segment_open,
segment_close callbacks).
I have to admit that until today I hadn't realized that that's what your
patch series was doing. I'm not familiar with how you intend to
implement WAL encryption on top of this, but on first blush I'm not
liking this proposed design too much.
An open-callback is bound to a read-callback. A close-callback is
bound to the way the read-callback opens a segment (or the
open-callback). I'm afraid that only adding "cleanup" callback might
be sufficient.
Well, the complaint is that the current layering is weird, in that there
are two levels at which we pass callbacks: one is XLogReaderAllocate,
where you specify the page_read callback; and the other layer is inside
the page_read callback, if that layer uses the WALRead auxiliary
function. The thing that my patch is doing is pass all three callbacks
at the XLogReaderAllocate level. So when xlogreader drills down to
read_page, xlogreader already has the segment_open callback handy if it
needs it. Conceptually, this seems more sensible.
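The layering point above can be sketched in isolation. In the toy code below the read helper takes no callback argument. It finds segment_open and segment_close in the reader state, which is the one-level registration described in this message. Everything here is a simplified assumption made for illustration: ReaderState, toy_wal_read, the 16-byte SEG_SIZE and the fake descriptor numbers are not PostgreSQL code, and whether the real helper closes the previous segment through the callback is not shown in this part of the thread.

```c
#include <assert.h>
#include <stddef.h>

#define SEG_SIZE 16				/* toy segment size in bytes (assumption) */

typedef struct ReaderState ReaderState;
typedef int (*SegmentOpenCB) (ReaderState *state, unsigned long segno);
typedef void (*SegmentCloseCB) (ReaderState *state);

struct ReaderState
{
	SegmentOpenCB segment_open;
	SegmentCloseCB segment_close;
	int			fd;				/* -1 when no segment is open */
	unsigned long segno;		/* segment that fd refers to */
	int			opens;			/* bookkeeping for the demo only */
	int			closes;
};

static int
toy_open(ReaderState *state, unsigned long segno)
{
	state->opens++;
	return (int) segno + 100;	/* fake descriptor number */
}

static void
toy_close(ReaderState *state)
{
	state->closes++;
	state->fd = -1;
}

/*
 * Toy counterpart of a WALRead-style helper: it receives only 'state' and
 * takes the callbacks from there.  Returns the number of bytes "read".
 */
static size_t
toy_wal_read(ReaderState *state, size_t startptr, size_t count)
{
	size_t		done = 0;

	while (done < count)
	{
		size_t		pos = startptr + done;
		unsigned long segno = pos / SEG_SIZE;
		size_t		left_in_seg = SEG_SIZE - pos % SEG_SIZE;
		size_t		chunk = count - done < left_in_seg ? count - done : left_in_seg;

		/* Switch segments through the callbacks stored in the state. */
		if (state->fd < 0 || state->segno != segno)
		{
			if (state->fd >= 0)
				state->segment_close(state);
			state->fd = state->segment_open(state, segno);
			state->segno = segno;
		}
		done += chunk;
	}
	return done;
}

/* Read bytes 10..29, which span toy segments 0 and 1. */
static ReaderState
demo_cross_segment_read(void)
{
	ReaderState state = {.segment_open = toy_open,.segment_close = toy_close,.fd = -1};

	toy_wal_read(&state, 10, 20);
	return state;
}
```

In this sketch a read that crosses one segment boundary opens two segments and closes one, and the caller of the helper never has to pass an open callback down by hand.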
I think a "cleanup" callback might also be sensible in general terms,
but we have a problem with the specifics -- namely that the "state" that
we need to clean up (the file descriptor of the open segment) is part of
xlogreader's state. And we obviously cannot remove the FD from
XLogReaderState, because we need the FD to do things with it to
obtain data from the file.
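Since the descriptor lives in the reader state, the reader's destructor is the natural place to release it, which is what matters for short-lived readers such as the 2PC ones that started this thread. The sketch below simulates that with a counter instead of real files. All names (ReaderState, reader_allocate, reader_free, open_fds) are hypothetical, and calling segment_close only when a descriptor is actually open is an assumption of this sketch, not something the quoted patch text spells out.

```c
#include <assert.h>
#include <stdlib.h>

typedef struct ReaderState ReaderState;
typedef void (*SegmentCloseCB) (ReaderState *state);

struct ReaderState
{
	SegmentCloseCB segment_close;
	int			fd;				/* stand-in for seg.ws_file; -1 if closed */
};

static int	open_fds = 0;		/* simulated per-process descriptor count */

static void
toy_segment_close(ReaderState *state)
{
	open_fds--;
	state->fd = -1;
}

static ReaderState *
reader_allocate(SegmentCloseCB segment_close)
{
	ReaderState *state = malloc(sizeof(ReaderState));

	if (state == NULL)
		return NULL;
	state->segment_close = segment_close;
	state->fd = -1;
	return state;
}

/* Pretend a page_read callback opened a segment and left it open. */
static void
reader_read_something(ReaderState *state)
{
	if (state->fd < 0)
	{
		state->fd = 3;			/* fake descriptor number */
		open_fds++;
	}
}

static void
reader_free(ReaderState *state)
{
	/* Without this call, every allocate/read/free cycle leaks one fd. */
	if (state->fd >= 0)
		state->segment_close(state);
	free(state);
}

/* Run 'cycles' short-lived readers; return the descriptors still open. */
static int
leaked_after(int cycles)
{
	for (int i = 0; i < cycles; i++)
	{
		ReaderState *state = reader_allocate(toy_segment_close);

		if (state == NULL)
			break;
		reader_read_something(state);
		reader_free(state);
	}
	return open_fds;
}
```

Removing the segment_close call from reader_free makes leaked_after(n) return n, which is the toy equivalent of the "Too many open files" failure reported at the top of the thread.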
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
At Fri, 24 Apr 2020 11:48:46 -0400, Alvaro Herrera <alvherre@2ndquadrant.com> wrote in
On 2020-Apr-24, Kyotaro Horiguchi wrote:
At Thu, 23 Apr 2020 19:16:03 -0400, Alvaro Herrera <alvherre@2ndquadrant.com> wrote in
Here's a first attempt at that. The segment_open/close callbacks are
now given at XLogReaderAllocate time, and are passed the XLogReaderState
pointer. I wrote a comment to explain that the page_read callback can
use WALRead() if it wishes to do so; but if it does, then segment_open
has to be provided. segment_close is mandatory (since we call it at
XLogReaderFree).
I modestly object to so many call-back functions. FWIW I'm writing
this with [1] in my mind.
[1] /messages/by-id/20200422.101246.331162888498679491.horikyota.ntt@gmail.com
Hmm. Looking at your 0001, I think there's nothing in that patch that's
not compatible with my proposed API change.
0002 is a completely different story of course; but that patch is a
radical change of spirit for xlogreader, in the sense that it's no
longer a "reader", but rather just an interpreter of bytes from a WAL
byte sequence into WAL records; and shifts the responsibility of the
actual reading to the caller. That's why xlogreader no longer receives
the page_read callback (and why it doesn't need the segment_open,
segment_close callbacks).
Sorry for the ambiguity, I didn't mean that I minded this conflicting
with my patch, or that I don't want this to be committed. It is easily
rebased on this patch. What I was anxious about is that the new
callback struct might be more flexible than required. So I "mildly"
objected, and I won't be disappointed if this patch is committed.
I have to admit that until today I hadn't realized that that's what your
patch series was doing. I'm not familiar with how you intend to
implement WAL encryption on top of this, but on first blush I'm not
liking this proposed design too much.
Right. I might be too much in detail, but it simplifies the call
tree. Anyway that is another discussion, though:)
An open-callback is bound to a read-callback. A close-callback is
bound to the way the read-callback opens a segment (or the
open-callback). I'm afraid that only adding "cleanup" callback might
be sufficient.
Well, the complaint is that the current layering is weird, in that there
are two levels at which we pass callbacks: one is XLogReaderAllocate,
where you specify the page_read callback; and the other layer is inside
the page_read callback, if that layer uses the WALRead auxiliary
function. The thing that my patch is doing is pass all three callbacks
at the XLogReaderAllocate level. So when xlogreader drills down to
read_page, xlogreader already has the segment_open callback handy if it
needs it. Conceptually, this seems more sensible.
It looks as if the open/read/close-callbacks are generic and on
the same interface layer, but actually the open-callback is dedicated to
WALRead and is useless when the read-callback doesn't use
WALRead. What I was anxious about is that the open-callback is
needlessly exposing the secret of the read-callback.
I think a "cleanup" callback might also be sensible in general terms,
but we have a problem with the specifics -- namely that the "state" that
we need to clean up (the file descriptor of the open segment) is part of
xlogreader's state. And we obviously cannot remove the FD from
XLogReaderState, because we need the FD to do things with it to
obtain data from the file.
I meant concretely that we only have read- and cleanup- callbacks in
xlogreader state. The caller of XLogReaderAllocate specifies the
cleanup-callback that is to be used to clean up what the
reader-callback left behind, in the same manner as this patch does.
The only reason it is not named close-callback is that it is used only
when the xlogreader-state is destroyed. So I'm fine with
read/close-callbacks.
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
On 2020-Apr-27, Kyotaro Horiguchi wrote:
At Fri, 24 Apr 2020 11:48:46 -0400, Alvaro Herrera <alvherre@2ndquadrant.com> wrote in
Sorry for the ambiguity, I didn't mean that I minded this conflicting
with my patch, or that I don't want this to be committed. It is easily
rebased on this patch. What I was anxious about is that the new
callback struct might be more flexible than required. So I "mildly"
objected, and I won't be disappointed if this patch is committed.
... well, yeah, maybe it is too flexible. And perhaps we could further
tweak this interface so that the file descriptor is not part of
XLogReader at all -- with such a change, it would make more sense to
worry about the "close" callback not being "close" but something like
"cleanup", as you suggest. But right now, and thinking from the point
of view of going into postgres 13 beta shortly, it seems to me that
XLogReader is just a very leaky abstraction since both itself and its
users are aware of the fact that there is a file descriptor.
Maybe with your rework for encryption you'll want to remove the FD from
XLogReader at all, and move it elsewhere. Or maybe not. But it seems
to me that my suggested approach is sensible, and better than the
current situation. (Let's keep in mind that the primary concern here is
that the callstack is way too complicated -- you ask XlogReader for
data, it calls your Read callback, that one calls WALRead passing your
openSegment callback and stuffs the FD in XLogReaderState ... a sieve it
is, the way it leaks, not an abstraction.)
I have to admit that until today I hadn't realized that that's what your
patch series was doing. I'm not familiar with how you intend to
implement WAL encryption on top of this, but on first blush I'm not
liking this proposed design too much.
Right. I might be too much in detail, but it simplifies the call
tree. Anyway that is another discussion, though:)
Okay. We can discuss further changes later, of course.
It looks as if the open/read/close-callbacks are generic and on
the same interface layer, but actually the open-callback is dedicated to
WALRead and is useless when the read-callback doesn't use
WALRead. What I was anxious about is that the open-callback is
needlessly exposing the secret of the read-callback.
Well, I don't think we care about that. WALRead can be thought of as
just a helper function that you may use to write your read callback.
The comments I added explain this.
I meant concretely that we only have read- and cleanup- callbacks in
xlogreader state. The caller of XLogReaderAllocate specifies the
cleanup-callback that is to be used to clean up what the
reader-callback left behind, in the same manner as this patch does.
The only reason it is not named close-callback is that it is used only
when the xlogreader-state is destroyed. So I'm fine with
read/close-callbacks.
We can revisit the current design in the future. For example for
encryption we might decide to remove the current-open-segment FD from
XLogReaderState and then things might be different. (I think the
current design is based a lot on historical code, rather than being
optimal.)
Since your objection isn't strong, I propose to commit the same patch as
before, only rebased because it conflicted with cd123234404e, and with this
comment added as the prologue of WALRead:
/*
* Helper function to ease writing of XLogRoutine->page_read callbacks.
* If this function is used, caller must supply an open_segment callback in
* 'state', as that is used here.
[... rest is same as before ...]
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Attachments:
v2-0001-Rework-XLogReader-callback-system.patch (text/x-diff; charset=us-ascii)
From fbe94ae201bc3963c71be65cb30f820da9591aeb Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Thu, 23 Apr 2020 18:02:20 -0400
Subject: [PATCH v2] Rework XLogReader callback system
segment_open and segment_close are passed in at XLogReaderAllocate time,
together with page_read; add comment to explain the role of WALRead.
---
src/backend/access/transam/twophase.c | 5 +-
src/backend/access/transam/xlog.c | 10 +-
src/backend/access/transam/xlogreader.c | 51 ++++----
src/backend/access/transam/xlogutils.c | 24 ++--
src/backend/replication/logical/logical.c | 20 +--
.../replication/logical/logicalfuncs.c | 4 +-
src/backend/replication/slotfuncs.c | 10 +-
src/backend/replication/walsender.c | 37 ++++--
src/bin/pg_rewind/parsexlog.c | 9 +-
src/bin/pg_waldump/pg_waldump.c | 30 +++--
src/include/access/xlogreader.h | 114 +++++++++++-------
src/include/access/xlogutils.h | 5 +
src/include/replication/logical.h | 4 +-
13 files changed, 210 insertions(+), 113 deletions(-)
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 2f7d4ed59a..e1904877fa 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1331,7 +1331,10 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
char *errormsg;
xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
- &read_local_xlog_page, NULL);
+ XL_ROUTINE(.page_read = &read_local_xlog_page,
+ .segment_open = &wal_segment_open,
+ .segment_close = &wal_segment_close),
+ NULL);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0d3d670928..a53e6d9633 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1223,7 +1223,7 @@ XLogInsertRecord(XLogRecData *rdata,
if (!debug_reader)
debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
- NULL, NULL);
+ XL_ROUTINE(), NULL);
if (!debug_reader)
{
@@ -6478,8 +6478,12 @@ StartupXLOG(void)
/* Set up XLOG reader facility */
MemSet(&private, 0, sizeof(XLogPageReadPrivate));
- xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
- &XLogPageRead, &private);
+ xlogreader =
+ XLogReaderAllocate(wal_segment_size, NULL,
+ XL_ROUTINE(.page_read = &XLogPageRead,
+ .segment_open = NULL,
+ .segment_close = wal_segment_close),
+ &private);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 79ff976474..7cee8b92c9 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -71,7 +71,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
*/
XLogReaderState *
XLogReaderAllocate(int wal_segment_size, const char *waldir,
- XLogPageReadCB pagereadfunc, void *private_data)
+ XLogReaderRoutine *routine, void *private_data)
{
XLogReaderState *state;
@@ -81,6 +81,9 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
if (!state)
return NULL;
+ /* initialize caller-provided support functions */
+ state->routine = *routine;
+
state->max_block_id = -1;
/*
@@ -102,7 +105,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
waldir);
- state->read_page = pagereadfunc;
/* system_identifier initialized to zeroes above */
state->private_data = private_data;
/* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
@@ -137,7 +139,7 @@ XLogReaderFree(XLogReaderState *state)
int block_id;
if (state->seg.ws_file != -1)
- close(state->seg.ws_file);
+ state->routine.segment_close(state);
for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
{
@@ -250,7 +252,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
* XLogBeginRead() or XLogFindNextRecord() must be called before the first call
* to XLogReadRecord().
*
- * If the read_page callback fails to read the requested data, NULL is
+ * If the page_read callback fails to read the requested data, NULL is
* returned. The callback is expected to have reported the error; errormsg
* is set to NULL.
*
@@ -559,10 +561,10 @@ err:
/*
* Read a single xlog page including at least [pageptr, reqLen] of valid data
- * via the read_page() callback.
+ * via the page_read() callback.
*
* Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the read_page callback).
+ * is set in that case (unless the error occurs in the page_read callback).
*
* We fetch the page from a reader-local cache if we know we have the required
* data and if there hasn't been any error since caching the data.
@@ -589,7 +591,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
* Data is not in our buffer.
*
* Every time we actually read the segment, even if we looked at parts of
- * it before, we need to do verification as the read_page callback might
+ * it before, we need to do verification as the page_read callback might
* now be rereading data from a different source.
*
* Whenever switching to a new WAL segment, we read the first page of the
@@ -601,9 +603,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
{
XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
- readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
- state->currRecPtr,
- state->readBuf);
+ readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
+ state->currRecPtr,
+ state->readBuf);
if (readLen < 0)
goto err;
@@ -619,9 +621,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
* First, read the requested data length, but at least a short page header
* so that we can validate it.
*/
- readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
- state->currRecPtr,
- state->readBuf);
+ readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
+ state->currRecPtr,
+ state->readBuf);
if (readLen < 0)
goto err;
@@ -638,9 +640,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
/* still not enough */
if (readLen < XLogPageHeaderSize(hdr))
{
- readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
- state->currRecPtr,
- state->readBuf);
+ readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
+ state->currRecPtr,
+ state->readBuf);
if (readLen < 0)
goto err;
}
@@ -1041,11 +1043,14 @@ err:
#endif /* FRONTEND */
/*
+ * Helper function to ease writing of XLogRoutine->page_read callbacks.
+ * If this function is used, caller must supply an open_segment callback in
+ * 'state', as that is used here.
+ *
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
* fetched from timeline 'tli'.
*
- * 'seg/segcxt' identify the last segment used. 'openSegment' is a callback
- * to open the next segment, if necessary.
+ * 'seg/segcxt' identify the last segment used.
*
* Returns true if succeeded, false if an error occurs, in which case
* 'errinfo' receives error details.
@@ -1054,9 +1059,10 @@ err:
* WAL buffers when possible.
*/
bool
-WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
+WALRead(XLogReaderState *state,
+ char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
WALOpenSegment *seg, WALSegmentContext *segcxt,
- WALSegmentOpen openSegment, WALReadError *errinfo)
+ WALReadError *errinfo)
{
char *p;
XLogRecPtr recptr;
@@ -1086,10 +1092,11 @@ WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
XLogSegNo nextSegNo;
if (seg->ws_file >= 0)
- close(seg->ws_file);
+ state->routine.segment_close(state);
XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
- seg->ws_file = openSegment(nextSegNo, segcxt, &tli);
+ seg->ws_file = state->routine.segment_open(state, nextSegNo,
+ segcxt, &tli);
/* Update the current segment info. */
seg->ws_tli = tli;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 6cb143e161..bbd801513a 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -783,10 +783,10 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
}
}
-/* openSegment callback for WALRead */
-static int
-wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt,
- TimeLineID *tli_p)
+/* XLogReaderRoutine->segment_open callback for local pg_wal files */
+int
+wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
+ WALSegmentContext *segcxt, TimeLineID *tli_p)
{
TimeLineID tli = *tli_p;
char path[MAXPGPATH];
@@ -811,8 +811,17 @@ wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt,
return -1; /* keep compiler quiet */
}
+/* stock XLogReaderRoutine->segment_close callback */
+void
+wal_segment_close(XLogReaderState *state)
+{
+ close(state->seg.ws_file);
+ /* need to check errno? */
+ state->seg.ws_file = -1;
+}
+
/*
- * read_page callback for reading local xlog files
+ * XLogReaderRoutine->page_read callback for reading local xlog files
*
* Public because it would likely be very helpful for someone writing another
* output method outside walsender, e.g. in a bgworker.
@@ -937,8 +946,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
* as 'count', read the whole page anyway. It's guaranteed to be
* zero-padded up to the page boundary if it's incomplete.
*/
- if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg,
- &state->segcxt, wal_segment_open, &errinfo))
+ if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
+ &state->seg, &state->segcxt,
+ &errinfo))
WALReadRaiseError(&errinfo);
/* number of valid bytes in the buffer */
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5adf253583..dc69e5ce5f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -120,7 +120,7 @@ StartupDecodingContext(List *output_plugin_options,
TransactionId xmin_horizon,
bool need_full_snapshot,
bool fast_forward,
- XLogPageReadCB read_page,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -169,7 +169,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->slot = slot;
- ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx);
+ ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
if (!ctx->reader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -215,7 +215,8 @@ StartupDecodingContext(List *output_plugin_options,
* Otherwise, we set for decoding to start from the given LSN without
* marking WAL reserved beforehand. In that scenario, it's up to the
* caller to guarantee that WAL remains available.
- * read_page, prepare_write, do_write, update_progress --
+ * xl_routine -- XLogReaderRoutine for underlying XLogReader
+ * prepare_write, do_write, update_progress --
* callbacks that perform the use-case dependent, actual, work.
*
* Needs to be called while in a memory context that's at least as long lived
@@ -230,7 +231,7 @@ CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
- XLogPageReadCB read_page,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -327,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
need_full_snapshot, false,
- read_page, prepare_write, do_write,
+ xl_routine, prepare_write, do_write,
update_progress);
/* call output plugin initialization callback */
@@ -357,7 +358,10 @@ CreateInitDecodingContext(char *plugin,
* fast_forward
* bypass the generation of logical changes.
*
- * read_page, prepare_write, do_write, update_progress
+ * xl_routine
+ * XLogReaderRoutine used by underlying xlogreader
+ *
+ * prepare_write, do_write, update_progress
* callbacks that have to be filled to perform the use-case dependent,
* actual work.
*
@@ -372,7 +376,7 @@ LogicalDecodingContext *
CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
bool fast_forward,
- XLogPageReadCB read_page,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -425,7 +429,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false,
- fast_forward, read_page, prepare_write,
+ fast_forward, xl_routine, prepare_write,
do_write, update_progress);
/* call output plugin initialization callback */
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index fded8e8290..b99c94e848 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -233,7 +233,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
ctx = CreateDecodingContext(InvalidXLogRecPtr,
options,
false,
- read_local_xlog_page,
+ XL_ROUTINE(.page_read = read_local_xlog_page,
+ .segment_open = wal_segment_open,
+ .segment_close = wal_segment_close),
LogicalOutputPrepareWrite,
LogicalOutputWrite, NULL);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ae751e94e7..26890dffb4 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -152,8 +152,10 @@ create_logical_replication_slot(char *name, char *plugin,
ctx = CreateInitDecodingContext(plugin, NIL,
false, /* just catalogs is OK */
restart_lsn,
- read_local_xlog_page, NULL, NULL,
- NULL);
+ XL_ROUTINE(.page_read = read_local_xlog_page,
+ .segment_open = wal_segment_open,
+ .segment_close = wal_segment_close),
+ NULL, NULL, NULL);
/*
* If caller needs us to determine the decoding start point, do so now.
@@ -464,7 +466,9 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
ctx = CreateDecodingContext(InvalidXLogRecPtr,
NIL,
true, /* fast_forward */
- read_local_xlog_page,
+ XL_ROUTINE(.page_read = read_local_xlog_page,
+ .segment_open = wal_segment_open,
+ .segment_close = wal_segment_close),
NULL, NULL, NULL);
/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 8b55bbfcb2..e2204d80ee 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -54,8 +54,8 @@
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
#include "access/xlogutils.h"
-
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
@@ -248,8 +248,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
-static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
- TimeLineID *tli_p);
+static int WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
+ WALSegmentContext *segcxt, TimeLineID *tli_p);
static void UpdateSpillStats(LogicalDecodingContext *ctx);
@@ -798,7 +798,8 @@ StartReplication(StartReplicationCmd *cmd)
}
/*
- * read_page callback for logical decoding contexts, as a walsender process.
+ * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
+ * walsender process.
*
* Inside the walsender we can do better than read_local_xlog_page,
* which has to do a plain sleep/busy loop, because the walsender's latch gets
@@ -832,7 +833,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
count = flushptr - targetPagePtr; /* part of the page available */
/* now actually read the data, we know it's there */
- if (!WALRead(cur_page,
+ if (!WALRead(state,
+ cur_page,
targetPagePtr,
XLOG_BLCKSZ,
sendSeg->ws_tli, /* Pass the current TLI because only
@@ -840,7 +842,6 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
* TLI is needed. */
sendSeg,
sendCxt,
- WalSndSegmentOpen,
&errinfo))
WALReadRaiseError(&errinfo);
@@ -1005,7 +1006,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
InvalidXLogRecPtr,
- logical_read_xlog_page,
+ XL_ROUTINE(.page_read = logical_read_xlog_page,
+ .segment_open = WalSndSegmentOpen,
+ .segment_close = wal_segment_close),
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
@@ -1168,7 +1171,9 @@ StartLogicalReplication(StartReplicationCmd *cmd)
*/
logical_decoding_ctx =
CreateDecodingContext(cmd->startpoint, cmd->options, false,
- logical_read_xlog_page,
+ XL_ROUTINE(.page_read = logical_read_xlog_page,
+ .segment_open = WalSndSegmentOpen,
+ .segment_close = wal_segment_close),
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
@@ -2441,9 +2446,10 @@ WalSndKill(int code, Datum arg)
SpinLockRelease(&walsnd->mutex);
}
-/* walsender's openSegment callback for WALRead */
+/* XLogReaderRoutine->segment_open callback */
static int
-WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+WalSndSegmentOpen(XLogReaderState *state,
+ XLogSegNo nextSegNo, WALSegmentContext *segcxt,
TimeLineID *tli_p)
{
char path[MAXPGPATH];
@@ -2531,6 +2537,12 @@ XLogSendPhysical(void)
Size nbytes;
XLogSegNo segno;
WALReadError errinfo;
+ static XLogReaderState fake_xlogreader =
+ {
+ /* XXX no page_read routine used by physical walsender */
+ .routine.segment_open = WalSndSegmentOpen,
+ .routine.segment_close = wal_segment_close
+ };
/* If requested switch the WAL sender to the stopping state. */
if (got_STOPPING)
@@ -2748,7 +2760,9 @@ XLogSendPhysical(void)
enlargeStringInfo(&output_message, nbytes);
retry:
- if (!WALRead(&output_message.data[output_message.len],
+ /* XXX for xlogreader use, we'd call XLogBeginRead+XLogReadRecord here */
+ if (!WALRead(&fake_xlogreader,
+ &output_message.data[output_message.len],
startptr,
nbytes,
sendSeg->ws_tli, /* Pass the current TLI because only
@@ -2756,7 +2770,6 @@ retry:
* TLI is needed. */
sendSeg,
sendCxt,
- WalSndSegmentOpen,
&errinfo))
WALReadRaiseError(&errinfo);
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index c51b5db315..d637f5eb77 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -66,7 +66,8 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
private.tliIndex = tliIndex;
private.restoreCommand = restoreCommand;
- xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
+ xlogreader = XLogReaderAllocate(WalSegSz, datadir,
+ XL_ROUTINE(.page_read = &SimpleXLogPageRead),
&private);
if (xlogreader == NULL)
pg_fatal("out of memory");
@@ -117,7 +118,8 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex,
private.tliIndex = tliIndex;
private.restoreCommand = restoreCommand;
- xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
+ xlogreader = XLogReaderAllocate(WalSegSz, datadir,
+ XL_ROUTINE(.page_read = &SimpleXLogPageRead),
&private);
if (xlogreader == NULL)
pg_fatal("out of memory");
@@ -176,7 +178,8 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
private.tliIndex = tliIndex;
private.restoreCommand = restoreCommand;
- xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
+ xlogreader = XLogReaderAllocate(WalSegSz, datadir,
+ XL_ROUTINE(.page_read = &SimpleXLogPageRead),
&private);
if (xlogreader == NULL)
pg_fatal("out of memory");
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index d7bd9ccac2..e29f65500f 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -279,9 +279,10 @@ identify_target_directory(char *directory, char *fname)
return NULL; /* not reached */
}
-/* pg_waldump's openSegment callback for WALRead */
+/* pg_waldump's XLogReaderRoutine->segment_open callback */
static int
-WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+WALDumpOpenSegment(XLogReaderState *state,
+ XLogSegNo nextSegNo, WALSegmentContext *segcxt,
TimeLineID *tli_p)
{
TimeLineID tli = *tli_p;
@@ -321,8 +322,18 @@ WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
}
/*
- * XLogReader read_page callback
+ * pg_waldump's XLogReaderRoutine->segment_close callback. Same as
+ * wal_segment_close
*/
+static void
+WALDumpCloseSegment(XLogReaderState *state)
+{
+ close(state->seg.ws_file);
+ /* need to check errno? */
+ state->seg.ws_file = -1;
+}
+
+/* pg_waldump's XLogReaderRoutine->page_read callback */
static int
WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetPtr, char *readBuff)
@@ -344,8 +355,9 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
}
}
- if (!WALRead(readBuff, targetPagePtr, count, private->timeline,
- &state->seg, &state->segcxt, WALDumpOpenSegment, &errinfo))
+ if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline,
+ &state->seg, &state->segcxt,
+ &errinfo))
{
WALOpenSegment *seg = &errinfo.wre_seg;
char fname[MAXPGPATH];
@@ -1031,8 +1043,12 @@ main(int argc, char **argv)
/* done with argument parsing, do the actual work */
/* we have everything we need, start reading */
- xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, WALDumpReadPage,
- &private);
+ xlogreader_state =
+ XLogReaderAllocate(WalSegSz, waldir,
+ XL_ROUTINE(.page_read = WALDumpReadPage,
+ .segment_open = WALDumpOpenSegment,
+ .segment_close = WALDumpCloseSegment),
+ &private);
if (!xlogreader_state)
fatal_error("out of memory");
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 4582196e18..6b9f7db646 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -17,6 +17,13 @@
* XLogBeginRead() or XLogFindNextRecord(), and call XLogReadRecord()
* until it returns NULL.
*
+ * Callers supply a page_read callback if they want to call
+ * XLogReadRecord or XLogFindNextRecord; it can be passed in as NULL
+ * otherwise. The WALRead function can be used as a helper to write
+ * page_read callbacks, but it is not mandatory; callers that use it,
+ * must supply open_segment callbacks. The close_segment callback
+ * must always be supplied.
+ *
* After reading a record with XLogReadRecord(), it's decomposed into
* the per-block and main data parts, and the parts can be accessed
* with the XLogRec* macros and functions. You can also decode a
@@ -50,12 +57,64 @@ typedef struct WALSegmentContext
typedef struct XLogReaderState XLogReaderState;
-/* Function type definition for the read_page callback */
+/* Function type definitions for various xlogreader interactions */
typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
XLogRecPtr targetPagePtr,
int reqLen,
XLogRecPtr targetRecPtr,
char *readBuf);
+typedef int (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
+ XLogSegNo nextSegNo,
+ WALSegmentContext *segcxt,
+ TimeLineID *tli_p);
+typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader);
+
+typedef struct XLogReaderRoutine
+{
+ /*
+ * Data input callback
+ *
+ * This callback shall read at least reqLen valid bytes of the xlog page
+ * starting at targetPagePtr, and store them in readBuf. The callback
+ * shall return the number of bytes read (never more than XLOG_BLCKSZ), or
+ * -1 on failure. The callback shall sleep, if necessary, to wait for the
+ * requested bytes to become available. The callback will not be invoked
+ * again for the same page unless more than the returned number of bytes
+ * are needed.
+ *
+ * targetRecPtr is the position of the WAL record we're reading. Usually
+ * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
+ * to read and verify the page or segment header, before it reads the
+ * actual WAL record it's interested in. In that case, targetRecPtr can
+ * be used to determine which timeline to read the page from.
+ *
+ * The callback shall set ->seg.ws_tli to the TLI of the file the page was
+ * read from.
+ */
+ XLogPageReadCB page_read;
+
+ /*
+ * Callback to open the specified WAL segment for reading. Returns a
+ * valid file descriptor when the file was opened successfully.
+ *
+ * "nextSegNo" is the number of the segment to be opened.
+ *
+ * "segcxt" is additional information about the segment.
+ *
+ * "tli_p" is an input/output argument. XLogRead() uses it to pass the
+ * timeline in which the new segment should be found, but the callback can
+ * use it to return the TLI that it actually opened.
+ *
+ * BasicOpenFile() is the preferred way to open the segment file in
+ * backend code, whereas open(2) should be used in frontend.
+ */
+ WALSegmentOpenCB segment_open;
+
+ /* WAL segment close callback */
+ WALSegmentCloseCB segment_close;
+} XLogReaderRoutine;
+
+#define XL_ROUTINE(...) &(XLogReaderRoutine){__VA_ARGS__}
typedef struct
{
@@ -88,33 +147,16 @@ typedef struct
struct XLogReaderState
{
+ /*
+ * Operational callbacks
+ */
+ XLogReaderRoutine routine;
+
/* ----------------------------------------
* Public parameters
* ----------------------------------------
*/
- /*
- * Data input callback (mandatory).
- *
- * This callback shall read at least reqLen valid bytes of the xlog page
- * starting at targetPagePtr, and store them in readBuf. The callback
- * shall return the number of bytes read (never more than XLOG_BLCKSZ), or
- * -1 on failure. The callback shall sleep, if necessary, to wait for the
- * requested bytes to become available. The callback will not be invoked
- * again for the same page unless more than the returned number of bytes
- * are needed.
- *
- * targetRecPtr is the position of the WAL record we're reading. Usually
- * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
- * to read and verify the page or segment header, before it reads the
- * actual WAL record it's interested in. In that case, targetRecPtr can
- * be used to determine which timeline to read the page from.
- *
- * The callback shall set ->seg.ws_tli to the TLI of the file the page was
- * read from.
- */
- XLogPageReadCB read_page;
-
/*
* System identifier of the xlog files we're about to read. Set to zero
* (the default value) if unknown or unimportant.
@@ -214,30 +256,13 @@ struct XLogReaderState
/* Get a new XLogReader */
extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
const char *waldir,
- XLogPageReadCB pagereadfunc,
+ XLogReaderRoutine *routine,
void *private_data);
+extern XLogReaderRoutine *LocalXLogReaderRoutine(void);
/* Free an XLogReader */
extern void XLogReaderFree(XLogReaderState *state);
-/*
- * Callback to open the specified WAL segment for reading. Returns a valid
- * file descriptor when the file was opened successfully.
- *
- * "nextSegNo" is the number of the segment to be opened.
- *
- * "segcxt" is additional information about the segment.
- *
- * "tli_p" is an input/output argument. XLogRead() uses it to pass the
- * timeline in which the new segment should be found, but the callback can use
- * it to return the TLI that it actually opened.
- *
- * BasicOpenFile() is the preferred way to open the segment file in backend
- * code, whereas open(2) should be used in frontend.
- */
-typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
- TimeLineID *tli_p);
-
/* Initialize supporting structures */
extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
int segsize, const char *waldir);
@@ -269,9 +294,10 @@ typedef struct WALReadError
WALOpenSegment wre_seg; /* Segment we tried to read from. */
} WALReadError;
-extern bool WALRead(char *buf, XLogRecPtr startptr, Size count,
+extern bool WALRead(XLogReaderState *state,
+ char *buf, XLogRecPtr startptr, Size count,
TimeLineID tli, WALOpenSegment *seg,
- WALSegmentContext *segcxt, WALSegmentOpen openSegment,
+ WALSegmentContext *segcxt,
WALReadError *errinfo);
/* Functions for decoding an XLogRecord */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 5181a077d9..68ce815476 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -50,6 +50,11 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
extern int read_local_xlog_page(XLogReaderState *state,
XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetRecPtr, char *cur_page);
+extern int wal_segment_open(XLogReaderState *state,
+ XLogSegNo nextSegNo,
+ WALSegmentContext *segcxt,
+ TimeLineID *tli_p);
+extern void wal_segment_close(XLogReaderState *state);
extern void XLogReadDetermineTimeline(XLogReaderState *state,
XLogRecPtr wantPage, uint32 wantLength);
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 3b7ca7f1da..c2f2475e5d 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -95,14 +95,14 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
- XLogPageReadCB read_page,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress);
extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
bool fast_forward,
- XLogPageReadCB read_page,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress);
--
2.20.1
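A note on the XL_ROUTINE macro used throughout the patch above: it takes the address of a C99 compound literal built with designated initializers. The following self-contained sketch uses hypothetical names (Routine, ROUTINE, Reader, reader_init) rather than the PostgreSQL ones. It shows two properties the pattern relies on: members that are not named are zero-initialized (so omitted callbacks are NULL), and the literal only lives as long as the caller's enclosing block, which is presumably why the patch copies the struct by value (state->routine = *routine) instead of keeping the pointer.

```c
#include <assert.h>
#include <stddef.h>

typedef struct Routine
{
	int		(*page_read) (int pageno);
	int		(*segment_open) (int segno);
	void	(*segment_close) (void);
} Routine;

/* Same shape as XL_ROUTINE: the address of a compound literal. */
#define ROUTINE(...) &(Routine){__VA_ARGS__}

typedef struct Reader
{
	Routine		routine;		/* a copy, not a pointer to the literal */
} Reader;

static int
sample_page_read(int pageno)
{
	return pageno + 1;
}

/*
 * Copy the table by value: the compound literal passed in by the caller
 * ceases to exist when the caller's enclosing block ends.
 */
static void
reader_init(Reader *r, const Routine *routine)
{
	assert(routine != NULL);
	r->routine = *routine;
}

int
example(void)
{
	Reader		r;

	reader_init(&r, ROUTINE(.page_read = sample_page_read));

	/* Members not named in the initializer are zero-initialized (NULL). */
	if (r.routine.segment_open != NULL || r.routine.segment_close != NULL)
		return -1;

	return r.routine.page_read(41);
}
```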
At Thu, 7 May 2020 19:28:55 -0400, Alvaro Herrera <alvherre@2ndquadrant.com> wrote in
On 2020-Apr-27, Kyotaro Horiguchi wrote:
At Fri, 24 Apr 2020 11:48:46 -0400, Alvaro Herrera <alvherre@2ndquadrant.com> wrote in
Sorry for the ambiguity, I didn't mean that I minded that this conflicts
with my patch or that I don't want this to be committed. It is easily
rebased on this patch. What I was anxious about is that the new
callback struct might be more flexible than required. So I "mildly"
objected, and I won't be disappointed if this patch is committed.
... well, yeah, maybe it is too flexible. And perhaps we could further
tweak this interface so that the file descriptor is not part of
XLogReader at all -- with such a change, it would make more sense to
worry about the "close" callback not being "close" but something like
"cleanup", as you suggest. But right now, and thinking from the point
of view of going into postgres 13 beta shortly, it seems to me that
XLogReader is just a very leaky abstraction since both itself and its
users are aware of the fact that there is a file descriptor.
Agreed.
Maybe with your rework for encryption you'll want to remove the FD from
XLogReader at all, and move it elsewhere. Or maybe not. But it seems
to me that my suggested approach is sensible, and better than the
current situation. (Let's keep in mind that the primary concern here is
that the callstack is way too complicated -- you ask XlogReader for
data, it calls your Read callback, that one calls WALRead passing your
openSegment callback and stuffs the FD in XLogReaderState ... a sieve it
is, the way it leaks, not an abstraction.)
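The call stack described above can be sketched with toy stand-ins. This is only an illustration under stated assumptions: `ChainReader` and every `chain_*` function are hypothetical names, not PostgreSQL APIs, and the "descriptor" is a made-up integer rather than a real file descriptor. The point is only the order of the layers and where the descriptor ends up.

```c
#include <assert.h>
#include <string.h>

#define CHAIN_SEG_SIZE 16       /* toy segment size in bytes (assumption) */

typedef struct ChainReader ChainReader;

struct ChainReader
{
    /* callbacks supplied by whoever uses the reader */
    int         (*page_read) (ChainReader *r, int pos, char *buf, int len);
    int         (*segment_open) (ChainReader *r, int segno);

    int         ws_file;        /* "descriptor" of the open segment, -1 if none */
    int         ws_segno;       /* segment that ws_file refers to */
    char        trace[64];      /* order in which the layers were entered */
};

static void
chain_note(ChainReader *r, const char *step)
{
    strncat(r->trace, step, sizeof(r->trace) - strlen(r->trace) - 1);
}

/* Layer 4: the user's open callback hands back a (pretend) descriptor. */
int
chain_segment_open(ChainReader *r, int segno)
{
    chain_note(r, "open;");
    return 100 + segno;
}

/*
 * Layer 3: the helper playing WALRead's role.  It calls the open callback
 * and stores the descriptor in the reader state, so both the reader and
 * its users end up knowing about the descriptor.
 */
static int
chain_wal_read(ChainReader *r, int pos, char *buf, int len)
{
    int         segno = pos / CHAIN_SEG_SIZE;

    chain_note(r, "helper;");
    if (r->ws_file < 0 || r->ws_segno != segno)
    {
        r->ws_file = r->segment_open(r, segno);
        r->ws_segno = segno;
    }
    memset(buf, 'x', (size_t) len);     /* stand-in for the real read */
    return len;
}

/* Layer 2: the user's read callback simply delegates to the helper. */
int
chain_page_read(ChainReader *r, int pos, char *buf, int len)
{
    chain_note(r, "page_read;");
    return chain_wal_read(r, pos, buf, len);
}

/* Layer 1: the reader's entry point calls back into user code. */
int
chain_reader_read(ChainReader *r, int pos, char *buf, int len)
{
    chain_note(r, "reader;");
    return r->page_read(r, pos, buf, len);
}
```

After one call to `chain_reader_read()`, the `trace` field lists the four layers in the order they were entered, and `ws_file` holds the value produced by the innermost callback.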
I agree that the new callback functions are the most sensible choice for
getting into 13, of course.
I have to admit that until today I hadn't realized that that's what your
patch series was doing. I'm not familiar with how you intend to
implement WAL encryption on top of this, but on first blush I'm not
liking this proposed design too much.
Right. I might be too much in detail, but it simplifies the call
tree. Anyway that is another discussion, though:)
Okay. We can discuss further changes later, of course.
It looks as if the open/read/close-callbacks are generic and on
the same interface layer, but actually the open-callback is dedicated to
WALRead and it is useless when the read-callback doesn't use
WALRead. What I was anxious about is that the open-callback is
needlessly exposing the secret of the read-callback.
Well, I don't think we care about that. WALRead can be thought of as
just a helper function that you may use to write your read callback.
The comments I added explain this.
Thanks.
I meant concretely that we only have read- and cleanup- callbacks in
xlogreader state. The caller of XLogReaderAllocate specifies the
cleanup-callback that is to be used to clean up what the
reader-callback left behind, in the same manner as this patch does.
The only reason it is not named close-callback is that it is used only
when the xlogreader-state is destroyed. So I'm fine with
read/close-callbacks.
We can revisit the current design in the future. For example for
encryption we might decide to remove the current-open-segment FD from
XLogReaderState and then things might be different. (I think the
current design is based a lot on historical code, rather than being
optimal.)
Since your objection isn't strong, I propose to commit the same patch as
before, only rebased since it conflicted with cd123234404e, and with this
comment prologuing WALRead:
/*
* Helper function to ease writing of XLogRoutine->page_read callbacks.
* If this function is used, caller must supply an open_segment callback in
* 'state', as that is used here.
[... rest is same as before ...]
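To make the read/close arrangement discussed above concrete, here is a minimal sketch of why a close callback run at reader destruction matters for short-lived readers such as the ones 2PC creates. All names (`MiniReader`, `MiniRoutine`, `mini_*`) are hypothetical, and a plain counter stands in for the process's descriptor table; this is not the committed code.

```c
#include <assert.h>
#include <stdlib.h>

typedef struct MiniReader MiniReader;

typedef struct MiniRoutine
{
    int         (*segment_open) (MiniReader *r, int segno);
    void        (*segment_close) (MiniReader *r);
} MiniRoutine;

struct MiniReader
{
    MiniRoutine routine;
    int         ws_file;        /* -1 when no segment is open */
};

/* Pretend descriptors currently open; stands in for the OS table. */
int         mini_open_fds = 0;

int
mini_segment_open(MiniReader *r, int segno)
{
    (void) r;
    mini_open_fds++;
    return 100 + segno;
}

void
mini_segment_close(MiniReader *r)
{
    mini_open_fds--;
    r->ws_file = -1;
}

MiniReader *
mini_reader_allocate(const MiniRoutine *routine)
{
    MiniReader *r = malloc(sizeof(MiniReader));

    if (r == NULL)
        return NULL;
    r->routine = *routine;
    r->ws_file = -1;
    return r;
}

/* Helper in WALRead's role: it cannot work without a segment_open callback. */
void
mini_wal_read(MiniReader *r, int segno)
{
    assert(r->routine.segment_open != NULL);
    if (r->ws_file < 0)
        r->ws_file = r->routine.segment_open(r, segno);
}

/* Destroying the reader closes whatever the read path left open. */
void
mini_reader_free(MiniReader *r)
{
    if (r->ws_file >= 0 && r->routine.segment_close != NULL)
        r->routine.segment_close(r);
    free(r);
}

/* Allocate, read once, free: the pattern of short-lived readers. */
int
mini_churn(const MiniRoutine *routine, int rounds)
{
    int         i;

    for (i = 0; i < rounds; i++)
    {
        MiniReader *r = mini_reader_allocate(routine);

        if (r == NULL)
            break;
        mini_wal_read(r, i);
        mini_reader_free(r);
    }
    return mini_open_fds;       /* descriptors still open afterwards */
}
```

Calling `mini_churn()` with a routine whose `segment_close` is set leaves the counter at zero; with `segment_close` left NULL, every round leaves one pretend descriptor behind.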
I agree to the direction of this patch. Thanks for the explanation.
The patch looks good to me except the two points below.
+ /* XXX for xlogreader use, we'd call XLogBeginRead+XLogReadRecord here */
+ if (!WALRead(&fake_xlogreader,
+ &output_message.data[output_message.len],
I'm not sure of the point of the XXX comment, but I think WALRead here is
the right thing and we aren't going to use
XLogBeginRead+XLogReadRecord here. So it seems to me the comment is
misleading and instead we need such a comment for fake_xlogreader like
this.
+ static XLogReaderState fake_xlogreader =
+ {
+ /* fake reader state only to let WALRead use the callbacks */
wal_segment_close(XLogReaderState *state) is setting
state->seg.ws_file to -1. On the other hand wal_segment_open(state,..)
doesn't update ws_file and the caller sets the returned value to
(eventually) the same field.
+ seg->ws_file = state->routine.segment_open(state, nextSegNo,
+ segcxt, &tli);
If you are willing to do so, I think it is better to make the callback
functions responsible for updating seg.ws_file, so that the callers
don't need to care.
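The asymmetry pointed out here can be shown side by side. This is a sketch with hypothetical names (`DemoReader`, `demo_*`) and a made-up descriptor value; it only contrasts an open callback that returns the descriptor with one that installs it in the reader state, as the close callback already does.

```c
#include <assert.h>

typedef struct DemoSeg
{
    int         ws_file;        /* -1 when closed */
    int         ws_segno;
} DemoSeg;

typedef struct DemoReader
{
    DemoSeg     seg;
} DemoReader;

/* Style A: open returns the descriptor; each caller must store it. */
int
demo_open_returning(DemoReader *r, int segno)
{
    (void) r;
    return 100 + segno;
}

/* Style B: open installs the descriptor itself, mirroring close below. */
void
demo_open_installing(DemoReader *r, int segno)
{
    r->seg.ws_file = 100 + segno;
}

void
demo_close(DemoReader *r)
{
    r->seg.ws_file = -1;
}

/* With style B the caller only keeps track of the segment number. */
void
demo_switch_segment(DemoReader *r, int segno)
{
    if (r->seg.ws_file >= 0)
        demo_close(r);
    demo_open_installing(r, segno);
    assert(r->seg.ws_file >= 0);
    r->seg.ws_segno = segno;
}
```

With style B, open and close both own the `ws_file` field, so no call site has to remember the assignment.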
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
On 2020-May-08, Kyotaro Horiguchi wrote:
I agree to the direction of this patch. Thanks for the explanation.
The patch looks good to me except the two points below.
Thanks! I pushed the patch. I fixed the walsender commentary as you
suggested, but I'm still of the opinion that we might want to use the
XLogReader abstraction in physical walsender rather than work without it; if
nothing else, that would simplify WALRead's API.
I didn't change this one though:
wal_segment_close(XLogReaderState *state) is setting
state->seg.ws_file to -1. On the other hand wal_segment_open(state,..)
doesn't update ws_file and the caller sets the returned value to
(eventually) the same field.
+ seg->ws_file = state->routine.segment_open(state, nextSegNo,
+ segcxt, &tli);
If you are willing to do so, I think it is better to make the callback
functions responsible for updating seg.ws_file, so that the callers
don't need to care.
I agree that this would be a good idea, but it's more than just a
handful of lines of changes so I think we should consider it separately.
Attached as 0002. I also realized while doing this that we can further
simplify WALRead()'s API if we're willing to bend walsender a little bit
more into the fake xlogreader thing; that's 0001.
I marked the open item closed nonetheless.
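As a rough illustration of what 0001 is after, the sketch below shows a read helper that takes the current segment and the segment size from the reader state instead of from separate arguments, and splits a request at segment boundaries. Everything here is a toy under stated assumptions: `SketchState`, `sketch_wal_read()` and the in-memory `sketch_wal` array are hypothetical, plain division and modulo stand in for the segment macros used in the patch, and `memcpy` replaces the real file read.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SKETCH_SEGSIZE 8        /* toy segment size in bytes */
#define SKETCH_NSEGS   4        /* number of toy segments */

typedef struct SketchState
{
    struct
    {
        int         ws_file;    /* index of the open segment, -1 if none */
        uint64_t    ws_segno;
    }           seg;
    struct
    {
        uint32_t    ws_segsize; /* must equal SKETCH_SEGSIZE in this sketch */
    }           segcxt;
    int         opens;          /* how many segment switches happened */
} SketchState;

/* In-memory stand-in for WAL segment files. */
static const char sketch_wal[SKETCH_NSEGS][SKETCH_SEGSIZE + 1] = {
    "AAAAAAAA", "BBBBBBBB", "CCCCCCCC", "DDDDDDDD"
};

/* Read 'count' bytes starting at 'startptr'; no seg/segcxt arguments. */
bool
sketch_wal_read(SketchState *state, char *buf, uint64_t startptr, size_t count)
{
    char       *p = buf;
    uint64_t    recptr = startptr;
    size_t      nbytes = count;

    assert(state->segcxt.ws_segsize == SKETCH_SEGSIZE);

    while (nbytes > 0)
    {
        uint32_t    segsize = state->segcxt.ws_segsize;
        uint32_t    startoff = (uint32_t) (recptr % segsize);
        uint64_t    segno = recptr / segsize;
        size_t      segbytes;

        if (segno >= SKETCH_NSEGS)
            return false;       /* past the end of the toy WAL */

        /* Switch segments when the wanted byte is not in the open one. */
        if (state->seg.ws_file < 0 || state->seg.ws_segno != segno)
        {
            state->seg.ws_file = (int) segno;
            state->seg.ws_segno = segno;
            state->opens++;
        }

        /* How many bytes are within this segment? */
        if (nbytes > segsize - startoff)
            segbytes = segsize - startoff;
        else
            segbytes = nbytes;

        memcpy(p, &sketch_wal[state->seg.ws_file][startoff], segbytes);
        recptr += segbytes;
        nbytes -= segbytes;
        p += segbytes;
    }
    return true;
}
```

A read that starts near the end of one segment is served in two pieces, and a follow-up read inside the already-open segment does not trigger another switch.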
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
Attachments:
0001-fix-WALRead-API-to-take-seg-segcxt-from-XLogReaderSt.patch (text/x-diff; charset=us-ascii)
From 600e28124ffb2741482bb096d5615b521ae83850 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Fri, 8 May 2020 16:40:24 -0400
Subject: [PATCH 1/2] fix WALRead API to take seg/segcxt from XLogReaderState
---
src/backend/access/transam/xlogreader.c | 31 +++++++++++--------------
src/backend/access/transam/xlogutils.c | 1 -
src/backend/replication/walsender.c | 6 ++---
src/bin/pg_waldump/pg_waldump.c | 1 -
src/include/access/xlogreader.h | 4 +---
5 files changed, 17 insertions(+), 26 deletions(-)
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 7cee8b92c9..f42dee2640 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1050,8 +1050,6 @@ err:
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
* fetched from timeline 'tli'.
*
- * 'seg/segcxt' identify the last segment used.
- *
* Returns true if succeeded, false if an error occurs, in which case
* 'errinfo' receives error details.
*
@@ -1061,7 +1059,6 @@ err:
bool
WALRead(XLogReaderState *state,
char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
- WALOpenSegment *seg, WALSegmentContext *segcxt,
WALReadError *errinfo)
{
char *p;
@@ -1078,34 +1075,34 @@ WALRead(XLogReaderState *state,
int segbytes;
int readbytes;
- startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
+ startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
/*
* If the data we want is not in a segment we have open, close what we
* have (if anything) and open the next one, using the caller's
* provided openSegment callback.
*/
- if (seg->ws_file < 0 ||
- !XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) ||
- tli != seg->ws_tli)
+ if (state->seg.ws_file < 0 ||
+ !XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
+ tli != state->seg.ws_tli)
{
XLogSegNo nextSegNo;
- if (seg->ws_file >= 0)
+ if (state->seg.ws_file >= 0)
state->routine.segment_close(state);
- XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
- seg->ws_file = state->routine.segment_open(state, nextSegNo,
- segcxt, &tli);
+ XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
+ state->seg.ws_file = state->routine.segment_open(state, nextSegNo,
+ &state->segcxt, &tli);
/* Update the current segment info. */
- seg->ws_tli = tli;
- seg->ws_segno = nextSegNo;
+ state->seg.ws_tli = tli;
+ state->seg.ws_segno = nextSegNo;
}
/* How many bytes are within this segment? */
- if (nbytes > (segcxt->ws_segsize - startoff))
- segbytes = segcxt->ws_segsize - startoff;
+ if (nbytes > (state->segcxt.ws_segsize - startoff))
+ segbytes = state->segcxt.ws_segsize - startoff;
else
segbytes = nbytes;
@@ -1115,7 +1112,7 @@ WALRead(XLogReaderState *state,
/* Reset errno first; eases reporting non-errno-affecting errors */
errno = 0;
- readbytes = pg_pread(seg->ws_file, p, segbytes, (off_t) startoff);
+ readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
#ifndef FRONTEND
pgstat_report_wait_end();
@@ -1127,7 +1124,7 @@ WALRead(XLogReaderState *state,
errinfo->wre_req = segbytes;
errinfo->wre_read = readbytes;
errinfo->wre_off = startoff;
- errinfo->wre_seg = *seg;
+ errinfo->wre_seg = state->seg;
return false;
}
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index bbd801513a..fc0bb7d059 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -947,7 +947,6 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
* zero-padded up to the page boundary if it's incomplete.
*/
if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
- &state->seg, &state->segcxt,
&errinfo))
WALReadRaiseError(&errinfo);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d18475b854..ed8c08cb6a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -840,8 +840,6 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
sendSeg->ws_tli, /* Pass the current TLI because only
* WalSndSegmentOpen controls whether new
* TLI is needed. */
- sendSeg,
- sendCxt,
&errinfo))
WALReadRaiseError(&errinfo);
@@ -2760,6 +2758,8 @@ XLogSendPhysical(void)
enlargeStringInfo(&output_message, nbytes);
retry:
+ fake_xlogreader.seg = *sendSeg;
+ fake_xlogreader.segcxt = *sendCxt;
if (!WALRead(&fake_xlogreader,
&output_message.data[output_message.len],
startptr,
@@ -2767,8 +2767,6 @@ retry:
sendSeg->ws_tli, /* Pass the current TLI because only
* WalSndSegmentOpen controls whether new
* TLI is needed. */
- sendSeg,
- sendCxt,
&errinfo))
WALReadRaiseError(&errinfo);
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index e29f65500f..46734914b7 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -356,7 +356,6 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
}
if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline,
- &state->seg, &state->segcxt,
&errinfo))
{
WALOpenSegment *seg = &errinfo.wre_seg;
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 81af200f5e..e77f478d68 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -301,9 +301,7 @@ typedef struct WALReadError
extern bool WALRead(XLogReaderState *state,
char *buf, XLogRecPtr startptr, Size count,
- TimeLineID tli, WALOpenSegment *seg,
- WALSegmentContext *segcxt,
- WALReadError *errinfo);
+ TimeLineID tli, WALReadError *errinfo);
/* Functions for decoding an XLogRecord */
--
2.20.1
0002-Simplify-XLogReader-s-open_segment-API.patch (text/x-diff; charset=us-ascii)
From 721cadab3f1ab88c7b2f12ed2ede5dd3c9455856 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Fri, 8 May 2020 17:03:09 -0400
Subject: [PATCH 2/2] Simplify XLogReader's open_segment API
Instead of returning the file descriptor, install it directly in
XLogReaderState->seg.ws_file.
---
src/backend/access/transam/xlogreader.c | 4 ++--
src/backend/access/transam/xlogutils.c | 32 ++++++++++++-------------
src/backend/replication/walsender.c | 12 ++++------
src/bin/pg_waldump/pg_waldump.c | 9 ++++---
src/include/access/xlogreader.h | 12 +++++-----
src/include/access/xlogutils.h | 2 +-
6 files changed, 33 insertions(+), 38 deletions(-)
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index f42dee2640..a533241370 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1092,8 +1092,8 @@ WALRead(XLogReaderState *state,
state->routine.segment_close(state);
XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
- state->seg.ws_file = state->routine.segment_open(state, nextSegNo,
- &state->segcxt, &tli);
+ state->routine.segment_open(state, nextSegNo, &state->segcxt, &tli);
+ Assert(state->seg.ws_file >= 0); /* shouldn't happen */
/* Update the current segment info. */
state->seg.ws_tli = tli;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index fc0bb7d059..1cc2c624a4 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -784,7 +784,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
}
/* XLogReaderRoutine->segment_open callback for local pg_wal files */
-int
+void
wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
WALSegmentContext *segcxt, TimeLineID *tli_p)
{
@@ -793,22 +793,20 @@ wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
int fd;
XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize);
- fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
- if (fd >= 0)
- return fd;
-
- if (errno == ENOENT)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("requested WAL segment %s has already been removed",
- path)));
- else
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\": %m",
- path)));
-
- return -1; /* keep compiler quiet */
+ state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+ if (state->seg.ws_file < 0)
+ {
+ if (errno == ENOENT)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("requested WAL segment %s has already been removed",
+ path)));
+ else
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\": %m",
+ path)));
+ }
}
/* stock XLogReaderRoutine->segment_close callback */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ed8c08cb6a..b9f029d44f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -248,7 +248,7 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
-static int WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
+static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
WALSegmentContext *segcxt, TimeLineID *tli_p);
static void UpdateSpillStats(LogicalDecodingContext *ctx);
@@ -2445,13 +2445,12 @@ WalSndKill(int code, Datum arg)
}
/* XLogReaderRoutine->segment_open callback */
-static int
+static void
WalSndSegmentOpen(XLogReaderState *state,
XLogSegNo nextSegNo, WALSegmentContext *segcxt,
TimeLineID *tli_p)
{
char path[MAXPGPATH];
- int fd;
/*-------
* When reading from a historic timeline, and there is a timeline switch
@@ -2488,9 +2487,9 @@ WalSndSegmentOpen(XLogReaderState *state,
}
XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize);
- fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
- if (fd >= 0)
- return fd;
+ state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+ if (state->seg.ws_file >= 0)
+ return;
/*
* If the file is not found, assume it's because the standby asked for a
@@ -2513,7 +2512,6 @@ WalSndSegmentOpen(XLogReaderState *state,
(errcode_for_file_access(),
errmsg("could not open file \"%s\": %m",
path)));
- return -1; /* keep compiler quiet */
}
/*
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 46734914b7..1a5c5a157c 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -280,7 +280,7 @@ identify_target_directory(char *directory, char *fname)
}
/* pg_waldump's XLogReaderRoutine->segment_open callback */
-static int
+static void
WALDumpOpenSegment(XLogReaderState *state,
XLogSegNo nextSegNo, WALSegmentContext *segcxt,
TimeLineID *tli_p)
@@ -300,9 +300,9 @@ WALDumpOpenSegment(XLogReaderState *state,
*/
for (tries = 0; tries < 10; tries++)
{
- fd = open_file_in_directory(segcxt->ws_dir, fname);
- if (fd >= 0)
- return fd;
+ state->seg.ws_file = open_file_in_directory(segcxt->ws_dir, fname);
+ if (state->seg.ws_file >= 0)
+ return;
if (errno == ENOENT)
{
int save_errno = errno;
@@ -318,7 +318,6 @@ WALDumpOpenSegment(XLogReaderState *state,
}
fatal_error("could not find file \"%s\": %m", fname);
- return -1; /* keep compiler quiet */
}
/*
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index e77f478d68..b73df02218 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -63,10 +63,10 @@ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
int reqLen,
XLogRecPtr targetRecPtr,
char *readBuf);
-typedef int (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
- XLogSegNo nextSegNo,
- WALSegmentContext *segcxt,
- TimeLineID *tli_p);
+typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
+ XLogSegNo nextSegNo,
+ WALSegmentContext *segcxt,
+ TimeLineID *tli_p);
typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader);
typedef struct XLogReaderRoutine
@@ -94,8 +94,8 @@ typedef struct XLogReaderRoutine
XLogPageReadCB page_read;
/*
- * Callback to open the specified WAL segment for reading. The file
- * descriptor of the opened segment shall be returned. In case of
+ * Callback to open the specified WAL segment for reading. ->seg.ws_file
+ * shall be set to the file descriptor of the opened segment. In case of
* failure, an error shall be raised by the callback and it shall not
* return.
*
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 68ce815476..b7bdc5db34 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -50,7 +50,7 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
extern int read_local_xlog_page(XLogReaderState *state,
XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetRecPtr, char *cur_page);
-extern int wal_segment_open(XLogReaderState *state,
+extern void wal_segment_open(XLogReaderState *state,
XLogSegNo nextSegNo,
WALSegmentContext *segcxt,
TimeLineID *tli_p);
--
2.20.1
Alvaro Herrera <alvherre@2ndquadrant.com> wrote:
On 2020-May-08, Kyotaro Horiguchi wrote:
I agree to the direction of this patch. Thanks for the explanation.
The patch looks good to me except the two points below.
Thanks! I pushed the patch.
I tried to follow the discussion but haven't come up with a better idea. This looks
better than the previous version. Thanks!
While looking at the changes, I've noticed a small comment issue in the
XLogReaderRoutine structure definition:
* "tli_p" is an input/output argument. XLogRead() uses it to pass the
The XLogRead() function has been renamed to WALRead() in 0dc8ead4. (This
incorrect comment was actually introduced by that commit.)
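For readers unfamiliar with the input/output convention mentioned in that comment, here is one possible reading of it as a toy. The names (`SketchTLI`, `sketch_segment_open`, `sketch_switch`) and the timeline-switch rule are hypothetical and chosen only for illustration; the caller-side bookkeeping follows the "Update the current segment info" step visible in the patches above.

```c
#include <assert.h>

typedef unsigned int SketchTLI;

#define SKETCH_SWITCH_SEGNO 5   /* hypothetical segment where timeline 1 ends */

typedef struct SketchSeg
{
    int          ws_file;
    unsigned int ws_segno;
    SketchTLI    ws_tli;
} SketchSeg;

/*
 * On entry *tli_p is the timeline the caller would like to read from; on
 * return it is the timeline of the file that was actually opened.
 */
int
sketch_segment_open(unsigned int segno, SketchTLI *tli_p)
{
    /* Hypothetical rule: from the switch point on, use timeline 2. */
    if (*tli_p == 1 && segno >= SKETCH_SWITCH_SEGNO)
        *tli_p = 2;
    return 100 + (int) segno;   /* pretend descriptor */
}

/* Caller side: pass the wanted TLI in, record what the callback decided. */
void
sketch_switch(SketchSeg *seg, unsigned int segno, SketchTLI wanted_tli)
{
    SketchTLI   tli = wanted_tli;

    seg->ws_file = sketch_segment_open(segno, &tli);
    seg->ws_tli = tli;
    seg->ws_segno = segno;
}
```

Asking for timeline 1 before the switch point keeps timeline 1; asking for it at or after the switch point records timeline 2 in the segment info.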
--
Antonin Houska
Web: https://www.cybertec-postgresql.com
Hi Antonin, thanks for the review.
On 2020-May-11, Antonin Houska wrote:
While looking at the changes, I've noticed a small comment issue in the
XLogReaderRoutine structure definition:
* "tli_p" is an input/output argument. XLogRead() uses it to pass the
The XLogRead() function has been renamed to WALRead() in 0dc8ead4. (This
incorrect comment was actually introduced by that commit.)
Ah. I'll fix this, thanks for pointing it out.
(It might be that the TLI situation can be improved with some callback,
too.)
--
Álvaro Herrera https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services