Re: Rework LogicalOutputPluginWriterUpdateProgress

Started by Andres Freund almost 3 years ago (36 messages)
#1 Andres Freund
andres@anarazel.de

Hi,

This is a reply to:
/messages/by-id/CAA4eK1+DB66cYRRVyGcaMm7+tQ_u=q=+HWGjpu9X0pqMFWbsZQ@mail.gmail.com
split off, so patches to address some of my concerns don't confuse cfbot.

On 2023-02-09 11:21:41 +0530, Amit Kapila wrote:

On Thu, Feb 9, 2023 at 1:33 AM Andres Freund <andres@anarazel.de> wrote:

Attached is a current, quite rough, prototype. It addresses some of the points
raised, but far from all. There's also several XXXs/FIXMEs in it. I changed
the file-ending to .txt to avoid hijacking the CF entry.

I have started a separate thread to avoid such confusion. I hope that
is fine with you.

In abstract, yes - unfortunately just changing the subject isn't going to
suffice, I'm afraid. The In-Reply-To header was still referencing the old
thread. The mail archive did see the threads as one, and I think that's what
cfbot uses as the source.

On 2023-02-09 11:21:41 +0530, Amit Kapila wrote:

On Thu, Feb 9, 2023 at 1:33 AM Andres Freund <andres@anarazel.de> wrote:

Hacking on a rough prototype how I think this should rather look, I had a few
questions / remarks:

- We probably need to call UpdateProgress from a bunch of places in decode.c
as well? Indicating that we're lagging by a lot, just because all
transactions were in another database seems decidedly suboptimal.

We can do that but I think in all those cases we will reach quickly
enough back to walsender logic (WalSndLoop - that will send keepalive
if required) that we don't need to worry. After processing each
record, the logic will return back to the main loop that will send
keepalive if required.

For keepalive processing yes, for syncrep and accurate lag tracking, I don't
think that suffices? We could do that in WalSndLoop() instead I guess, but
we'd have more information about when that's useful in decode.c.

Also, while reading WAL if we need to block, it will call WalSndWaitForWal()
which will send keepalive if required.

The fast-path prevents WalSndWaitForWal() from doing that in a lot of cases.

/*
* Fast path to avoid acquiring the spinlock in case we already know we
* have enough WAL available. This is particularly interesting if we're
* far behind.
*/
if (RecentFlushPtr != InvalidXLogRecPtr &&
loc <= RecentFlushPtr)
return RecentFlushPtr;
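
To make the fast-path concern concrete, here is a minimal, self-contained sketch. ToySender and toy_wait_for_wal() are hypothetical stand-ins, not the walsender code, and the "slow path" is reduced to a counter. It only shows that once enough WAL is known to be flushed, a caller that is far behind keeps returning early and never reaches the place where a keepalive would be considered.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;            /* stand-in for PostgreSQL's XLogRecPtr */
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical toy state; not the real walsender structures. */
typedef struct ToySender
{
    XLogRecPtr  recent_flush_ptr;   /* last flush position we learned about */
    int         keepalive_checks;   /* times the slow path got a chance to run */
} ToySender;

/*
 * Return the flush position once WAL up to 'loc' is available.
 * 'actual_flush_ptr' plays the role of the shared flush position that
 * the real code would read under a spinlock.
 */
XLogRecPtr
toy_wait_for_wal(ToySender *s, XLogRecPtr loc, XLogRecPtr actual_flush_ptr)
{
    assert(s != NULL);

    /* Fast path: enough WAL is already known to be flushed. */
    if (s->recent_flush_ptr != InvalidXLogRecPtr &&
        loc <= s->recent_flush_ptr)
        return s->recent_flush_ptr;

    /* Slow path: refresh the position; keepalive handling would live here. */
    s->recent_flush_ptr = actual_flush_ptr;
    s->keepalive_checks++;
    return s->recent_flush_ptr;
}
```

Reading ten records that all lie below the cached flush position triggers the slow path once, no matter how long decoding those records takes.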

The patch calls update_progress in change_cb_wrapper and other
wrappers which will miss the case of DDLs that generate a lot of data
that is not processed by the plugin. I think for that we either need
to call update_progress from reorderbuffer.c similar to what the patch
has removed or we need some other way to address it. Do you have any
better idea?

I don't mind calling something like update_progress() in the specific cases
that's needed, but I think those are just the
if (!RelationIsLogicallyLogged(relation))
if (relation->rd_rel->relrewrite && !rb->output_rewrites)

To me it makes a lot more sense to call update_progress() for those, rather
than generally.

I think, independent of the update_progress calls, it'd be worth investing a
bit of time into optimizing those cases, so that we don't put the changes into
the reorderbuffer in the first place. I think we can find space for two flag bits
to identify the cases in the WAL, rather than needing to access the catalog to
figure it out. If we don't find space, we could add an annotation to the WAL
record (making it bigger) for the two cases, because they're not the path most
important to optimize.
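
A rough sketch of what the two flag bits could look like follows. The flag names, their values and toy_should_queue_change() are purely hypothetical; no such bits exist in the WAL format discussed here. The point is only that the decision becomes a bit test at decode time instead of a catalog lookup after the change has already been queued.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical per-record flag bits; not part of any real WAL record. */
#define TOY_CHANGE_NOT_LOGICALLY_LOGGED  (1u << 0)  /* relation is not logically logged */
#define TOY_CHANGE_REWRITE_HEAP          (1u << 1)  /* change targets a DDL rewrite heap */

/*
 * Decide at decode time whether a change is worth queueing at all.
 * 'output_rewrites' mirrors rb->output_rewrites from the quoted checks.
 */
bool
toy_should_queue_change(uint8_t wal_flags, bool output_rewrites)
{
    if (wal_flags & TOY_CHANGE_NOT_LOGICALLY_LOGGED)
        return false;

    if ((wal_flags & TOY_CHANGE_REWRITE_HEAP) && !output_rewrites)
        return false;

    return true;
}
```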

- Why should lag tracking only be updated at commit like points? That seems
like it adds odd discontinuities?

We have previously experimented to call it from non-commit locations
but that turned out to give inaccurate information about Lag. See
email [1].

That seems like an issue with WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, not with
reporting something more frequently. ISTM that
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS just isn't a good proxy for when to
update lag reporting for records that don't strictly need it. I think that
decision should be made based on the LSN, and be deterministic.
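
One way to picture an LSN-based, deterministic rule is sketched below. TOY_LAG_REPORT_BYTES and toy_should_report_lag() are made-up names, and the 1MB spacing is an arbitrary assumption. The property being illustrated is that the same WAL stream always yields the same reporting points, independent of wall-clock time.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* stand-in for PostgreSQL's XLogRecPtr */

/* Hypothetical spacing between lag reports, in bytes of WAL. */
#define TOY_LAG_REPORT_BYTES ((XLogRecPtr) 1024 * 1024)

/*
 * Report at every finished transaction, and otherwise only once the LSN
 * has advanced by a fixed amount since the last report.
 */
bool
toy_should_report_lag(XLogRecPtr *last_reported, XLogRecPtr lsn,
                      bool finished_xact)
{
    assert(last_reported != NULL);

    if (finished_xact ||
        (lsn >= *last_reported &&
         lsn - *last_reported >= TOY_LAG_REPORT_BYTES))
    {
        *last_reported = lsn;
        return true;
    }

    return false;
}
```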

- Aren't the wal_sender_timeout / 2 checks in WalSndUpdateProgress(),
WalSndWriteData() missing wal_sender_timeout <= 0 checks?

It seems we are checking that via
ProcessPendingWrites()->WalSndKeepaliveIfNecessary(). Do you think we
need to check it before as well?

Either we don't need the precheck at all, or we should do it reliably. Right
now we'll have a higher overhead / some behavioural changes, if
wal_sender_timeout is disabled. That doesn't make sense.
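
The problem with an unguarded precheck can be shown with plain integers. In this sketch toy_keepalive_due() is a hypothetical helper and times are simple millisecond counters rather than TimestampTz values. Without the first test, a disabled timeout (0) makes "now >= last_reply + 0 / 2" true on every call, which is the extra overhead described above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Decide whether half of wal_sender_timeout has elapsed since the last
 * reply. A timeout <= 0 means "disabled" and must never report due.
 */
bool
toy_keepalive_due(int wal_sender_timeout_ms, int64_t now_ms,
                  int64_t last_reply_ms)
{
    if (wal_sender_timeout_ms <= 0)
        return false;

    return now_ms >= last_reply_ms + wal_sender_timeout_ms / 2;
}
```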

- I don't really understand why f95d53eded55 added !end_xact to the if
condition for ProcessPendingWrites(). Is the theory that we'll end up in an
outer loop soon?

Yes. For non-empty xacts, we will anyway send a commit message. For
empty (skipped) xacts, we will send for synchronous replication case
to avoid any delay.

That seems way too dependent on the behaviour of a specific output plugin,
there are plenty of use cases where you'd not need a separate message emitted at
commit time. With what I proposed we would know whether we just wrote
something, or not.

I don't think the syncrep logic in WalSndUpdateProgress really works as-is -
consider what happens if e.g. the origin filter filters out entire
transactions. We'll afaics never get to WalSndUpdateProgress(). In some cases
we'll be lucky because we'll return quickly to XLogSendLogical(), but not
reliably.

Which case are you worried about? As mentioned in one of the previous
points I thought the timeout/keepalive handling in the callers should
be enough.

Well, you added syncrep specific logic to WalSndUpdateProgress(). The same
logic isn't present in the higher level loops. If we do need that logic, we
also need to trigger it if the origin filter filters out the entire
transaction. If we don't need it, then we shouldn't have it in
WalSndUpdateProgress() either.

How about renaming ProcessPendingWrites to WaitToSendPendingWrites or
WalSndWaitToSendPendingWrites?

I don't like those much:

We're not really waiting for the data to be sent or such, we just want to give
it to the kernel to be sent out. Contrast that to WalSndWaitForWal, where we
actually are waiting for something to complete.

I don't think 'write' is a great description either, although our existing
terminology is somewhat muddled. We're just calling pq_flush() until
!pq_is_send_pending().

WalSndSendPending() or WalSndFlushPending()?
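
The shape of that loop, stripped of everything walsender-specific, is roughly as follows. ToyConn, toy_flush() and toy_is_send_pending() are hypothetical stand-ins for the connection state, pq_flush() and pq_is_send_pending(). The real loop also has to process replies, check timeouts and wait on the socket between attempts, all of which is omitted here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical connection: bytes still buffered, and how many the kernel takes per flush. */
typedef struct ToyConn
{
    size_t  pending;
    size_t  kernel_accepts;
    int     flush_calls;
} ToyConn;

bool
toy_is_send_pending(const ToyConn *c)
{
    return c->pending > 0;
}

/* Hand as much as the kernel accepts to it; this does not wait for delivery. */
void
toy_flush(ToyConn *c)
{
    size_t  n = c->pending < c->kernel_accepts ? c->pending : c->kernel_accepts;

    c->pending -= n;
    c->flush_calls++;
}

/* Keep flushing until nothing is left in our own buffer. */
void
toy_flush_pending(ToyConn *c)
{
    assert(c != NULL && c->kernel_accepts > 0);

    while (toy_is_send_pending(c))
        toy_flush(c);
}
```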

Greetings,

Andres Freund

#2 Andres Freund
andres@anarazel.de
In reply to: Andres Freund (#1)

Hi,

Replying on the new thread. Original message at
/messages/by-id/CAA4eK1+H2m95HhzfpRkwv2-GtFwtbcVp7837X49+vs0RXX3dBA@mail.gmail.com

On 2023-02-09 15:54:19 +0530, Amit Kapila wrote:

One thing to note about the changes we are discussing here is that
some of the plugins like wal2json already call
OutputPluginUpdateProgress in their commit callback. They may need to
update it accordingly.

It was a fundamental mistake to add OutputPluginUpdateProgress(). I don't like
causing unnecessary breakage, but this seems necessary.

One difference I see with the patch is that I think we will end up
sending keepalive for empty prepared transactions even though we don't
skip sending begin/prepare messages for those.

With the proposed approach we reliably know whether a callback wrote
something, so we can tune the behaviour here fairly easily.

Likely WalSndUpdateProgress() should not do anything if
did_write && !finished_xact.

The reason why we don't skip sending prepare for empty 2PC xacts is that if
the WALSender restarts after the PREPARE of a transaction and before the
COMMIT PREPARED of the same transaction then we won't be able to figure out
if we have skipped sending BEGIN/PREPARE of a transaction.

It's probably not a good idea to skip sending 2PC state changes anyway, at
least when used for replication, rather than CDC type use cases.

But I again think that that's not something the core system can assume.

I'm sad that we went so far down a pretty obviously bad rabbit hole. Adding
incrementally more of the progress calls to pgoutput, and knowing that
wal2json also added some, should have rung some pretty large alarm bells.

To skip sending prepare for empty xacts, we previously thought of some ideas
like (a) At commit-prepare time have a check on the subscriber-side to know
whether there is a corresponding prepare for it before actually doing
commit-prepare but that sounded costly. (b) somehow persist the information
whether the PREPARE for a xact is already sent and then use that information
for commit prepared but again that also didn't sound like a good idea.

I don't think it's worth optimizing this. However, the explanation for why
we're not skipping empty prepared xacts needs to be added to
pgoutput_prepare_txn() etc.

Greetings,

Andres Freund

#3 Amit Kapila
amit.kapila16@gmail.com
In reply to: Andres Freund (#2)

On Sat, Feb 11, 2023 at 3:04 AM Andres Freund <andres@anarazel.de> wrote:

One difference I see with the patch is that I think we will end up
sending keepalive for empty prepared transactions even though we don't
skip sending begin/prepare messages for those.

With the proposed approach we reliably know whether a callback wrote
something, so we can tune the behaviour here fairly easily.

I would like to clarify a few things about the proposed approach. In
commit_cb_wrapper()/prepare_cb_wrapper(), the patch first did
ctx->did_write = false;, then call the commit/prepare callback (which
will call pgoutput_commit_txn()/pgoutput_prepare_txn()) and then call
update_progress() which will make decisions based on ctx->did_write
flag. Now, for this to work pgoutput_commit_txn/pgoutput_prepare_txn
should know that the transaction has performed some writes before that
call which is currently working because pgoutput is tracking the same
via sent_begin_txn. Is the intention here that we still track whether
BEGIN () has been sent via pgoutput?

--
With Regards,
Amit Kapila.

#4 Amit Kapila
amit.kapila16@gmail.com
In reply to: Andres Freund (#1)

On Sat, Feb 11, 2023 at 2:34 AM Andres Freund <andres@anarazel.de> wrote:

On 2023-02-09 11:21:41 +0530, Amit Kapila wrote:

On Thu, Feb 9, 2023 at 1:33 AM Andres Freund <andres@anarazel.de> wrote:

Hacking on a rough prototype how I think this should rather look, I had a few
questions / remarks:

- We probably need to call UpdateProgress from a bunch of places in decode.c
as well? Indicating that we're lagging by a lot, just because all
transactions were in another database seems decidedly suboptimal.

We can do that but I think in all those cases we will reach quickly
enough back to walsender logic (WalSndLoop - that will send keepalive
if required) that we don't need to worry. After processing each
record, the logic will return back to the main loop that will send
keepalive if required.

For keepalive processing yes, for syncrep and accurate lag tracking, I don't
think that suffices? We could do that in WalSndLoop() instead I guess, but
we'd have more information about when that's useful in decode.c.

Yeah, I think one possibility to address that is to call
update_progress() in DecodeCommit() and friends when we need to skip
the xact. We decide that in DecodeTXNNeedSkip. In the checks in that
function, I am not sure whether we need to call it for the case where
we skip the xact because we decide that it was previously decoded.
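
A toy version of that idea is sketched below. ToyDecodeCtx, toy_txn_need_skip() and toy_decode_commit() are hypothetical, and the skip test covers only "other database" and "origin filtered", leaving out the already-decoded case that is in question above. It shows the reported position advancing even when every transaction is skipped.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* stand-in for PostgreSQL's XLogRecPtr */

/* Hypothetical decoding state; not the real LogicalDecodingContext. */
typedef struct ToyDecodeCtx
{
    int         replayed;           /* transactions handed to the plugin */
    int         progress_updates;   /* times progress was reported */
    XLogRecPtr  reported_lsn;       /* last position reported */
} ToyDecodeCtx;

/* Simplified skip test: wrong database, or filtered out by origin. */
bool
toy_txn_need_skip(uint32_t txn_db, uint32_t slot_db, bool origin_filtered)
{
    return txn_db != slot_db || origin_filtered;
}

void
toy_decode_commit(ToyDecodeCtx *ctx, XLogRecPtr commit_lsn,
                  uint32_t txn_db, uint32_t slot_db, bool origin_filtered)
{
    assert(ctx != NULL);

    /* Only non-skipped transactions reach the output plugin. */
    if (!toy_txn_need_skip(txn_db, slot_db, origin_filtered))
        ctx->replayed++;

    /* Report progress in both cases, so skipped xacts do not look like lag. */
    ctx->reported_lsn = commit_lsn;
    ctx->progress_updates++;
}
```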

The patch calls update_progress in change_cb_wrapper and other
wrappers which will miss the case of DDLs that generate a lot of data
that is not processed by the plugin. I think for that we either need
to call update_progress from reorderbuffer.c similar to what the patch
has removed or we need some other way to address it. Do you have any
better idea?

I don't mind calling something like update_progress() in the specific cases
that's needed, but I think those are just the
if (!RelationIsLogicallyLogged(relation))
if (relation->rd_rel->relrewrite && !rb->output_rewrites)

To me it makes a lot more sense to call update_progress() for those, rather
than generally.

Won't it be better to call it wherever we don't invoke any wrapper
function like for cases REORDER_BUFFER_CHANGE_INVALIDATION, sequence
changes, etc.? I was thinking that wherever we don't call the wrapper
function which means we don't have a chance to invoke
update_progress(), the timeout can happen if there are a lot of such
messages.

I think, independent of the update_progress calls, it'd be worth investing a
bit of time into optimizing those cases, so that we don't put the changes into
the reorderbuffer in the first place. I think we can find space for two flag bits
to identify the cases in the WAL, rather than needing to access the catalog to
figure it out. If we don't find space, we could add an annotation to the WAL
record (making it bigger) for the two cases, because they're not the path most
important to optimize.

- Why should lag tracking only be updated at commit like points? That seems
like it adds odd discontinuities?

We have previously experimented to call it from non-commit locations
but that turned out to give inaccurate information about Lag. See
email [1].

That seems like an issue with WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, not with
reporting something more frequently. ISTM that
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS just isn't a good proxy for when to
update lag reporting for records that don't strictly need it. I think that
decision should be made based on the LSN, and be deterministic.

- Aren't the wal_sender_timeout / 2 checks in WalSndUpdateProgress(),
WalSndWriteData() missing wal_sender_timeout <= 0 checks?

It seems we are checking that via
ProcessPendingWrites()->WalSndKeepaliveIfNecessary(). Do you think we
need to check it before as well?

Either we don't need the precheck at all, or we should do it reliably. Right
now we'll have a higher overhead / some behavioural changes, if
wal_sender_timeout is disabled. That doesn't make sense.

Fair enough, we can probably do it earlier.

How about renaming ProcessPendingWrites to WaitToSendPendingWrites or
WalSndWaitToSendPendingWrites?

I don't like those much:

We're not really waiting for the data to be sent or such, we just want to give
it to the kernel to be sent out. Contrast that to WalSndWaitForWal, where we
actually are waiting for something to complete.

I don't think 'write' is a great description either, although our existing
terminology is somewhat muddled. We're just calling pq_flush() until
!pq_is_send_pending().

WalSndSendPending() or WalSndFlushPending()?

Either of those sounds fine.

--
With Regards,
Amit Kapila.

#5 Andres Freund
andres@anarazel.de
In reply to: Amit Kapila (#3)

Hi,

On 2023-02-13 08:22:34 +0530, Amit Kapila wrote:

On Sat, Feb 11, 2023 at 3:04 AM Andres Freund <andres@anarazel.de> wrote:

One difference I see with the patch is that I think we will end up
sending keepalive for empty prepared transactions even though we don't
skip sending begin/prepare messages for those.

With the proposed approach we reliably know whether a callback wrote
something, so we can tune the behaviour here fairly easily.

I would like to clarify a few things about the proposed approach. In
commit_cb_wrapper()/prepare_cb_wrapper(), the patch first did
ctx->did_write = false;, then call the commit/prepare callback (which
will call pgoutput_commit_txn()/pgoutput_prepare_txn()) and then call
update_progress() which will make decisions based on ctx->did_write
flag. Now, for this to work pgoutput_commit_txn/pgoutput_prepare_txn
should know that the transaction has performed some writes before that
call which is currently working because pgoutput is tracking the same
via sent_begin_txn.

I don't really see these as being related. What pgoutput does internally to
optimize for some usecases shouldn't matter to the larger infrastructure.

Is the intention here that we still track whether BEGIN () has been sent via
pgoutput?

Yes. If somebody later wants to propose tracking this alongside a txn and
passing that to the output plugin callbacks, we can do that. But that's
independent of fixing the broken architecture of the progress infrastructure.

Greetings,

Andres Freund

#6 Andres Freund
andres@anarazel.de
In reply to: Amit Kapila (#4)

Hi,

On 2023-02-13 14:06:57 +0530, Amit Kapila wrote:

The patch calls update_progress in change_cb_wrapper and other
wrappers which will miss the case of DDLs that generate a lot of data
that is not processed by the plugin. I think for that we either need
to call update_progress from reorderbuffer.c similar to what the patch
has removed or we need some other way to address it. Do you have any
better idea?

I don't mind calling something like update_progress() in the specific cases
that's needed, but I think those are just the
if (!RelationIsLogicallyLogged(relation))
if (relation->rd_rel->relrewrite && !rb->output_rewrites)

To me it makes a lot more sense to call update_progress() for those, rather
than generally.

Won't it be better to call it wherever we don't invoke any wrapper
function like for cases REORDER_BUFFER_CHANGE_INVALIDATION, sequence
changes, etc.? I was thinking that wherever we don't call the wrapper
function which means we don't have a chance to invoke
update_progress(), the timeout can happen if there are a lot of such
messages.

ISTM that the likelihood of causing harm due to increased overhead is higher
than the gain.

Greetings,

Andres Freund

#7 wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Andres Freund (#6)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Thur, Feb 14, 2023 at 2:03 AM Andres Freund <andres@anarazel.de> wrote:

On 2023-02-13 14:06:57 +0530, Amit Kapila wrote:

The patch calls update_progress in change_cb_wrapper and other
wrappers which will miss the case of DDLs that generate a lot of data
that is not processed by the plugin. I think for that we either need
to call update_progress from reorderbuffer.c similar to what the patch
has removed or we need some other way to address it. Do you have any
better idea?

I don't mind calling something like update_progress() in the specific cases
that's needed, but I think those are just the
if (!RelationIsLogicallyLogged(relation))
if (relation->rd_rel->relrewrite && !rb->output_rewrites)

To me it makes a lot more sense to call update_progress() for those, rather
than generally.

Won't it be better to call it wherever we don't invoke any wrapper
function like for cases REORDER_BUFFER_CHANGE_INVALIDATION, sequence
changes, etc.? I was thinking that wherever we don't call the wrapper
function which means we don't have a chance to invoke
update_progress(), the timeout can happen if there are a lot of such
messages.

ISTM that the likelihood of causing harm due to increased overhead is higher
than the gain.

I would like to do something for this thread. So, I am planning to update the
patch as per discussion in the email chain unless someone is already working on
it.

Regards,
Wang wei

#8 wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: wangw.fnst@fujitsu.com (#7)
1 attachment(s)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Sun, Feb 19, 2023 at 21:06 PM Wang, Wei/王 威 <wangw.fnst@fujitsu.com> wrote:

On Thur, Feb 14, 2023 at 2:03 AM Andres Freund <andres@anarazel.de> wrote:

On 2023-02-13 14:06:57 +0530, Amit Kapila wrote:

The patch calls update_progress in change_cb_wrapper and other
wrappers which will miss the case of DDLs that generate a lot of data
that is not processed by the plugin. I think for that we either need
to call update_progress from reorderbuffer.c similar to what the patch
has removed or we need some other way to address it. Do you have any
better idea?

I don't mind calling something like update_progress() in the specific cases
that's needed, but I think those are just the
if (!RelationIsLogicallyLogged(relation))
if (relation->rd_rel->relrewrite && !rb->output_rewrites)

To me it makes a lot more sense to call update_progress() for those, rather
than generally.

Won't it be better to call it wherever we don't invoke any wrapper
function like for cases REORDER_BUFFER_CHANGE_INVALIDATION, sequence
changes, etc.? I was thinking that wherever we don't call the wrapper
function which means we don't have a chance to invoke
update_progress(), the timeout can happen if there are a lot of such
messages.

ISTM that the likelihood of causing harm due to increased overhead is higher
than the gain.

I would like to do something for this thread. So, I am planning to update the
patch as per discussion in the email chain unless someone is already working on
it.

Thanks to Andres and Amit for the discussion.

Based on the discussion and Andres' WIP (in [1]), I made the following
modifications:
1. Renamed some functions.
2. Added the threshold-related logic in the function
update_progress_and_keepalive.
3. Added the timeout-related processing of temporary data and
unlogged/foreign/system tables in the function ReorderBufferProcessTXN.
4. Improved error messages in the function OutputPluginPrepareWrite.
5. Invoked function update_progress_and_keepalive to fix sync-related problems
caused by filters such as origin in functions DecodeCommit(), DecodePrepare()
and ReorderBufferAbort().
6. Removed the invocation of function update_progress_and_keepalive in the
function begin_prepare_cb_wrapper().
7. Invoked the function update_progress_and_keepalive() in the function
stream_truncate_cb_wrapper(), just like we do in the function
truncate_cb_wrapper().
8. Removed the check of SyncRepRequested() in the syncrep logic in the function
WalSndUpdateProgressAndKeepAlive().
9. Added the check for wal_sender_timeout before using it in functions
WalSndUpdateProgressAndKeepAlive() and WalSndWriteData().

Attached is the new patch.

[1]: /messages/by-id/20230208200235.esfoggsmuvf4pugt@awork3.anarazel.de

Regards,
Wang wei

Attachments:

v2-0001-Rework-LogicalOutputPluginWriterUpdateProgress.patch (application/octet-stream)

#9 Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: wangw.fnst@fujitsu.com (#8)
RE: Rework LogicalOutputPluginWriterUpdateProgress

Dear Wang,

Thank you for making the patch. IIUC, your patch basically means that the output
plugin no longer has to call UpdateProgress.

I think the basic approach is as follows, is it right?

1. In *_cb_wrapper, set ctx->did_write to false
2. In OutputPluginWrite() set ctx->did_write to true.
This means that changes are really written, not skipped.
3. At the end of the transaction, call update_progress_and_keepalive().
Even if we are not at the end, check skipped count and call the function if needed.
The counter will be reset if ctx->did_write is true or we exceed the threshold.
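
The three steps can be sketched in a few lines of standalone C. Everything here is a toy model: ToyCtx, the toy_* functions and the threshold of 100 are assumptions for illustration, not the patch's code. The commit wrapper deliberately leaves the skip counter alone; whether it should be reset there is a separate question.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_SKIP_THRESHOLD 100      /* hypothetical threshold */

/* Hypothetical decoding context holding only what the sketch needs. */
typedef struct ToyCtx
{
    bool    did_write;          /* set when the plugin really wrote something */
    int     skipped_changes;    /* consecutive changes with no write */
    int     progress_calls;     /* times progress/keepalive handling ran */
} ToyCtx;

typedef void (*toy_change_cb) (ToyCtx *ctx, int change);

/* Step 2: writing through the common write function marks the context. */
void
toy_output_plugin_write(ToyCtx *ctx)
{
    ctx->did_write = true;
}

void
toy_update_progress_and_keepalive(ToyCtx *ctx, bool finished_xact)
{
    (void) finished_xact;       /* a real implementation would branch on this */
    ctx->progress_calls++;
}

/* Steps 1 and 3 for a change: reset the flag, run the plugin, count skips. */
void
toy_change_cb_wrapper(ToyCtx *ctx, toy_change_cb cb, int change)
{
    ctx->did_write = false;
    cb(ctx, change);

    if (ctx->did_write)
        ctx->skipped_changes = 0;
    else if (++ctx->skipped_changes >= TOY_SKIP_THRESHOLD)
    {
        ctx->skipped_changes = 0;
        toy_update_progress_and_keepalive(ctx, false);
    }
}

/* Step 3 at end of transaction: always report. */
void
toy_commit_cb_wrapper(ToyCtx *ctx)
{
    ctx->did_write = false;
    /* the plugin's commit callback would run here */
    toy_update_progress_and_keepalive(ctx, true);
}

/* Example plugins: one publishes nothing, one publishes even-numbered changes. */
void
toy_silent_plugin(ToyCtx *ctx, int change)
{
    (void) ctx;
    (void) change;
}

void
toy_filtering_plugin(ToyCtx *ctx, int change)
{
    if (change % 2 == 0)
        toy_output_plugin_write(ctx);
}
```

With the silent plugin, 250 changes trigger progress handling twice (at the 100th and 200th change) and the commit triggers it once more, without the plugin ever calling an update-progress function itself.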

The following are my comments. I apologize if I missed some previous discussions.

01. logical.c

```
+static void update_progress_and_keepalive(struct LogicalDecodingContext *ctx,
+                                                                                 bool finished_xact);
+
+static bool is_skip_threshold_change(struct LogicalDecodingContext *ctx);
```

"struct" may not be needed.

02. UpdateDecodingProgressAndKeepalive

I think the name should be UpdateDecodingProgressAndSendKeepalive(), since keepalive is not a verb.
(But it's ok to ignore this if you prefer the shorter name.)
The same can be said for the names of the datatype and the callback.

03. UpdateDecodingProgressAndKeepalive

```
+       /* set output state */
+       ctx->accept_writes = false;
+       ctx->write_xid = xid;
+       ctx->write_location = lsn;
+       ctx->did_write = false;
```

Do we have to modify accept_writes, write_xid, and write_location here?
These values are not used in WalSndUpdateProgressAndKeepalive().

04. stream_abort_cb_wrapper

```
+ update_progress_and_keepalive(ctx, true)
```

I'm not sure, but is it correct to call update_progress_and_keepalive() with
finished_xact = true? Isn't it possible that a streamed sub-transaction is aborted?

05. is_skip_threshold_change

At the end of the transaction, update_progress_and_keepalive() is called directly.
Don't we have to reset change_count here?

06. ReorderBufferAbort

Assume that the top transaction is aborted. At that time, update_progress_and_keepalive()
is called in stream_abort_cb_wrapper(), and then WalSndUpdateProgressAndKeepalive()
is called at the end of ReorderBufferAbort(). Do we have to do it in both?
I think the call in stream_abort_cb_wrapper() may not be needed.

07. WalSndUpdateProgress

You renamed ProcessPendingWrites() to WalSndSendPending(), but it may still be strange
because it will be called even if there are no pending writes.

Isn't it sufficient to call ProcessRepliesIfAny(), WalSndCheckTimeOut() and
(at least) WalSndKeepaliveIfNecessary() in that case? Or a better name may be needed.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

#10 wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Hayato Kuroda (Fujitsu) (#9)
1 attachment(s)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Thur, Feb 23, 2023 at 18:41 PM Kuroda, Hayato/黒田 隼人 <kuroda.hayato@fujitsu.com> wrote:

Dear Wang,

Thank you for making the patch. IIUC, your patch basically means that the output
plugin no longer has to call UpdateProgress.

Thanks for your review and comments.

I think the basic approach is as follows, is it right?

1. In *_cb_wrapper, set ctx->did_write to false
2. In OutputPluginWrite() set ctx->did_write to true.
This means that changes are really written, not skipped.
3. At the end of the transaction, call update_progress_and_keepalive().
Even if we are not at the end, check skipped count and call the function if
needed.
The counter will be reset if ctx->did_write is true or we exceed the threshold.

Yes, you are right.
For the reset of the counter, please also refer to the reply to #05.

The following are my comments. I apologize if I missed some previous discussions.

01. logical.c

```
+static void update_progress_and_keepalive(struct LogicalDecodingContext *ctx,
+                                                                                 bool finished_xact);
+
+static bool is_skip_threshold_change(struct LogicalDecodingContext *ctx);
```

"struct" may not be needed.

Removed.

02. UpdateDecodingProgressAndKeepalive

I think the name should be UpdateDecodingProgressAndSendKeepalive(), since
keepalive is not a verb.
(But it's ok to ignore this if you prefer the shorter name.)
The same can be said for the names of the datatype and the callback.

Yes, I prefer the shorter one. Otherwise, I think some names would be longer.

03. UpdateDecodingProgressAndKeepalive

```
+       /* set output state */
+       ctx->accept_writes = false;
+       ctx->write_xid = xid;
+       ctx->write_location = lsn;
+       ctx->did_write = false;
```

Do we have to modify accept_writes, write_xid, and write_location here?
These values are not used in WalSndUpdateProgressAndKeepalive().

I think it might be better to set these three flags.
Since LogicalOutputPluginWriterUpdateProgressAndKeepalive is an open callback, I
think setting write_xid and write_location is not just for the function
WalSndUpdateProgressAndKeepalive. And I think setting accept_writes could
prevent some wrong usage.

04. stream_abort_cb_wrapper

```
+ update_progress_and_keepalive(ctx, true)
```

I'm not sure, but is it correct to call update_progress_and_keepalive() with
finished_xact = true? Isn't it possible that a streamed sub-transaction is
aborted?

Fixed.

05. is_skip_threshold_change

At the end of the transaction, update_progress_and_keepalive() is called directly.
Don't we have to reset change_count here?

I think this might complicate the function is_skip_threshold_change, so I didn't
reset the counter in this case.
I think the worst case of not resetting the counter is to delay sending the
keepalive message for the next transaction. But since the threshold we're using
is safe enough, it seems fine to me not to reset the counter in this case.
Added these related comments in the function is_skip_threshold_change.

06. ReorderBufferAbort

Assume that the top transaction is aborted. At that time,
update_progress_and_keepalive()
is called in stream_abort_cb_wrapper(), and then
WalSndUpdateProgressAndKeepalive()
is called at the end of ReorderBufferAbort(). Do we have to do it in both?
I think the call in stream_abort_cb_wrapper() may not be needed.

Yes, I think we only need one call for this case.
To make the behavior in *_cb_wrapper look consistent, I avoided the second call
for this case in the function ReorderBufferAbort.

07. WalSndUpdateProgress

You renamed ProcessPendingWrites() to WalSndSendPending(), but it may
still be strange
because it will be called even if there are no pending writes.

Isn't it sufficient to call ProcessRepliesIfAny(), WalSndCheckTimeOut() and
(at least) WalSndKeepaliveIfNecessary() in that case? Or a better name may be
needed.

I think after sending the keepalive message (in WalSndKeepaliveIfNecessary), we
need to make sure the pending data is flushed through the loop.

Attached is the new patch.

Regards,
Wang wei

Attachments:

v3-0001-Rework-LogicalOutputPluginWriterUpdateProgress.patch (application/octet-stream)

#11 Peter Smith
smithpb2250@gmail.com
In reply to: wangw.fnst@fujitsu.com (#8)

Here are some comments for the v2-0001 patch.

(I haven't looked at the v3 that was posted overnight; maybe some of
my comments have already been addressed.)

======
General

1. (Info from the commit message)
Since we can know whether the change is an end of transaction change in the
common code, we removed the LogicalDecodingContext->end_xact introduced in
commit f95d53e.

~

TBH, it was not clear to me that this change was an improvement. IIUC,
it removes the "unnecessary" member, but only does that by replacing
it everywhere with a boolean parameter passed to
update_progress_and_keepalive(). So the end result seems no less code,
but it is less readable code now because you need to know what the
true/false parameter means. I wonder if it would have been better just
to leave this how it was.

======
src/backend/replication/logical/logical.c

2. General - blank lines

There are multiple places in this file where the patch removed some
statements but left blank lines. The result is 2 blank lines remaining
instead of one.

see change_cb_wrapper.
see truncate_cb_wrapper.
see stream_start_cb_wrapper.
see stream_stop_cb_wrapper.
see stream_change_cb_wrapper.

e.g.

BEFORE
ctx->write_location = last_lsn;

ctx->end_xact = false;

/* in streaming mode, stream_stop_cb is required */

AFTER (now there are 2 blank lines)
ctx->write_location = last_lsn;


/* in streaming mode, stream_stop_cb is required */

~~~

3. General - calls to is_skip_threshold_change()

+ if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);

There are multiple calls like this, which are guarding the
update_progress_and_keepalive() with the is_skip_threshold_change()
- See truncate_cb_wrapper
- See message_cb_wrapper
- See stream_change_cb_wrapper
- See stream_message_cb_wrapper
- See stream_truncate_cb_wrapper
- See UpdateDecodingProgressAndKeepalive

IIUC, all those conditions could be pushed down *into* the wrapper,
thereby making every calling code simpler.

e.g. make the wrapper function code look similar to the current
UpdateDecodingProgressAndKeepalive:

BEFORE (update_progress_and_keepalive)
{
if (!ctx->update_progress_and_keepalive)
return;

ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}
AFTER
{
if (!ctx->update_progress_and_keepalive)
return;

if (finished_xact || is_skip_threshold_change(ctx))
{
ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}
}
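
To make the suggestion above concrete, here is a minimal, self-contained sketch of the "pushed down" guard. It is not the patch's code: DemoContext, demo_callback and the calls_made counter are hypothetical stand-ins, and the change counter lives in the context rather than in a static local so that the sketch is easy to exercise.

```c
#include <stdbool.h>
#include <stddef.h>

#define CHANGES_THRESHOLD 100

/* Hypothetical stand-in for LogicalDecodingContext; only the fields used here. */
typedef struct DemoContext
{
    bool    did_write;      /* did the last callback write data? */
    int     changes_count;  /* unwritten changes since the last reset */
    int     calls_made;     /* how often the callback was invoked */
    void    (*update_progress_and_keepalive) (struct DemoContext *ctx,
                                              bool finished_xact);
} DemoContext;

/* Counts invocations so the effect of the guard is observable. */
static void
demo_callback(DemoContext *ctx, bool finished_xact)
{
    (void) finished_xact;
    ctx->calls_made++;
}

/* Returns true once per CHANGES_THRESHOLD consecutive unwritten changes. */
static bool
is_skip_threshold_change(DemoContext *ctx)
{
    if (ctx->did_write)
    {
        ctx->changes_count = 0;
        return false;
    }

    if (++ctx->changes_count >= CHANGES_THRESHOLD)
    {
        ctx->changes_count = 0;
        return true;
    }

    return false;
}

/* The guard lives here, so callers invoke the wrapper unconditionally. */
static void
update_progress_and_keepalive(DemoContext *ctx, bool finished_xact)
{
    if (!ctx->update_progress_and_keepalive)
        return;

    if (finished_xact || is_skip_threshold_change(ctx))
        ctx->update_progress_and_keepalive(ctx, finished_xact);
}
```

With this shape, a caller such as truncate_cb_wrapper would just call update_progress_and_keepalive(ctx, false) and rely on the wrapper to decide whether anything needs to be sent.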

~~~

4. StartupDecodingContext

@@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
    XLogReaderRoutine *xl_routine,
    LogicalOutputPluginWriterPrepareWrite prepare_write,
    LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.
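
A small sketch of why the identical names can be confusing, assuming nothing about the real signatures: progress_fn, other_callback and the two create_context_* functions are hypothetical. In C, a parameter hides a file-scope function of the same name for the whole function body.

```c
/* Hypothetical callback type; the real typedef takes different arguments. */
typedef int (*progress_fn) (int value);

/* File-scope function, analogous to the static wrapper in logical.c. */
static int
update_progress_and_keepalive(int value)
{
    return value + 1;
}

/* Some other callback a caller might pass in. */
static int
other_callback(int value)
{
    return value * 10;
}

/*
 * The parameter has the same name as the file-scope function, so inside
 * this body the name refers to the parameter, not to the function above.
 */
static int
create_context_shadowing(progress_fn update_progress_and_keepalive, int value)
{
    return update_progress_and_keepalive(value);    /* calls the parameter */
}

/* With a distinct parameter name, both remain reachable and greppable. */
static int
create_context_distinct(progress_fn do_update_keepalive, int value)
{
    return do_update_keepalive(value) + update_progress_and_keepalive(value);
}
```

The code compiles either way (a compiler would at most warn with -Wshadow), so this is a readability concern rather than a correctness one.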

~~~

5. @@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
    XLogReaderRoutine *xl_routine,
    LogicalOutputPluginWriterPrepareWrite prepare_write,
    LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

(Ditto previous comment #4)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.

~~~

6. CreateDecodingContext

@@ -493,7 +488,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
    XLogReaderRoutine *xl_routine,
    LogicalOutputPluginWriterPrepareWrite prepare_write,
    LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

(Ditto previous comment #4)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.

~~~

7. OutputPluginPrepareWrite

@@ -662,7 +657,7 @@ void
 OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
 {
  if (!ctx->accept_writes)
- elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
+ elog(ERROR, "writes are only accepted in callbacks in the
OutputPluginCallbacks structure (except startup, shutdown,
filter_by_origin and filter_prepare callbacks)");

It seems a confusing error message. Can it be worded better? Also, I
noticed this flag is never used except in this one place where it
throws an error, so would an "Assert" be more appropriate here?

~~~

8. rollback_prepared_cb_wrapper

  /*
  * If the plugin support two-phase commits then rollback prepared callback
  * is mandatory
+ *
+ * FIXME: This should have been caught much earlier.
  */
  if (ctx->callbacks.rollback_prepared_cb == NULL)
~
Is this FIXME related to the current patch, or should this be an
entirely different topic?

~~~

9. is_skip_threshold_change

The current usage for this function is like:

if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);

~

IMO a better name for this function might be something like
'is_change_threshold_exceeded()' (or
'is_keepalive_threshold_exceeded()' etc.) because it seems more readable
to say

if (is_change_threshold_exceeded())
do_something();

~~~

10. is_skip_threshold_change

static bool
is_skip_threshold_change(struct LogicalDecodingContext *ctx)
{
static int changes_count = 0; /* used to accumulate the number of
* changes */

/* If the change was published, reset the counter and return false */
if (ctx->did_write)
{
changes_count = 0;
return false;
}

/*
* It is possible that the data is not sent to downstream for a long time
* either because the output plugin filtered it or there is a DDL that
* generates a lot of data that is not processed by the plugin. So, in
* such cases, the downstream can timeout. To avoid that we try to send a
* keepalive message if required. Trying to send a keepalive message
* after every change has some overhead, but testing showed there is no
* noticeable overhead if we do it after every ~100 changes.
*/
#define CHANGES_THRESHOLD 100
if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)
{
changes_count = 0;
return true;
}

return false;
}

~

That 2nd condition checking if (!ctx->did_write && ++changes_count >=
CHANGES_THRESHOLD) does not seem right. There is no need to check the
ctx->did_write; it must be false because it was checked earlier in the
function:

BEFORE
if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)

SUGGESTION1
Assert(!ctx->did_write);
if (++changes_count >= CHANGES_THRESHOLD)

SUGGESTION2
if (++changes_count >= CHANGES_THRESHOLD)
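
For illustration, a self-contained sketch of the function with the redundant test removed and the invariant stated as an assertion (SUGGESTION1). It takes a plain bool instead of the decoding context, and uses the standard assert() in place of PostgreSQL's Assert(); both are simplifications for this sketch.

```c
#include <assert.h>
#include <stdbool.h>

#define CHANGES_THRESHOLD 100

/*
 * Returns true once CHANGES_THRESHOLD consecutive changes were processed
 * without anything being written; a write resets the counter.
 */
static bool
is_change_threshold_exceeded(bool did_write)
{
    static int changes_count = 0;   /* unwritten changes seen so far */

    if (did_write)
    {
        changes_count = 0;
        return false;
    }

    /* Past the early return above, did_write can only be false. */
    assert(!did_write);

    if (++changes_count >= CHANGES_THRESHOLD)
    {
        changes_count = 0;
        return true;
    }

    return false;
}
```

Dropping the assert() line gives SUGGESTION2; the behavior is the same.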

~~~

11. update_progress_and_keepalive

/*
* Update progress tracking and send keep alive (if required).
*/
static void
update_progress_and_keepalive(struct LogicalDecodingContext *ctx,
bool finished_xact)
{
if (!ctx->update_progress_and_keepalive)
return;

ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}

~

Maybe it's simpler to code this without the return.

e.g.

if (ctx->update_progress_and_keepalive)
{
ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}

(it is just generic suggested code for example -- I made some other
review comments overlapping this)

======
.../replication/logical/reorderbuffer.c

12. ReorderBufferAbort

+ UpdateDecodingProgressAndKeepalive((LogicalDecodingContext *)rb->private_data,
+    xid, lsn, !TransactionIdIsValid(txn->toplevel_xid));
+

I didn't really recognise how the
"!TransactionIdIsValid(txn->toplevel_xid)" maps to the boolean
'finished_xact' param. Can this call have an explanatory comment about
how it works?

======
src/backend/replication/walsender.c

~~~

13. WalSndUpdateProgressAndKeepalive

- if (pending_writes || (!end_xact &&
+ if (pending_writes || (!finished_xact && wal_sender_timeout > 0 &&
     now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
    wal_sender_timeout / 2)))
- ProcessPendingWrites();
+ WalSndSendPending();

Is this new function name OK to be WalSndSendPending? From this code,
we can see it can also be called in other scenarios even when there is
nothing "pending" at all.
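
The condition in that hunk can be read as a small predicate. The sketch below is only a restatement for discussion: should_send_pending is a hypothetical name, and timestamps and the timeout are plain milliseconds here rather than TimestampTz values.

```c
#include <stdbool.h>
#include <stdint.h>

/*
 * Mirrors: pending_writes || (!finished_xact && wal_sender_timeout > 0 &&
 *          now >= last_reply_timestamp + wal_sender_timeout / 2)
 */
static bool
should_send_pending(bool pending_writes, bool finished_xact,
                    int64_t now_ms, int64_t last_reply_ms,
                    int wal_sender_timeout_ms)
{
    /* Unsent data always needs to be flushed. */
    if (pending_writes)
        return true;

    /* At the end of a transaction no keepalive is forced here. */
    if (finished_xact)
        return false;

    /* A timeout of zero (or less) disables the timeout check. */
    if (wal_sender_timeout_ms <= 0)
        return false;

    /* Otherwise act once half of the timeout has elapsed. */
    return now_ms >= last_reply_ms + wal_sender_timeout_ms / 2;
}
```

The last branch is the case raised above: the function is entered with nothing pending, purely because half of the timeout has passed since the last reply.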

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#12Takamichi Osumi (Fujitsu)
Takamichi Osumi (Fujitsu)
osumi.takamichi@fujitsu.com
In reply to: wangw.fnst@fujitsu.com (#10)
RE: Rework LogicalOutputPluginWriterUpdateProgress

Hi,

On Monday, February 27, 2023 6:30 PM wangw.fnst@fujitsu.com <wangw.fnst@fujitsu.com> wrote:

Attach the new patch.

Thanks for sharing v3. Minor review comments and question.

(1) UpdateDecodingProgressAndKeepalive header comment

The comment should be updated to explain why we reset some other flags, as discussed in [1], and to briefly describe the update-progress and keepalive functionality of the function.

(2) OutputPluginPrepareWrite

Probably the changed error string is too wide.

@@ -662,7 +657,7 @@ void
 OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
 {
        if (!ctx->accept_writes)
-               elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
+               elog(ERROR, "writes are only accepted in callbacks in the OutputPluginCallbacks structure (except startup, shutdown, filter_by_origin and filter_prepare callbacks)");

I think you could break the error message into two string literals on separate lines. Alternatively, you could rephrase it.
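
As a sketch of the first option: adjacent string literals are concatenated by the C compiler, so the message text stays identical while the source lines get shorter. The variable names and messages_match() are hypothetical and exist only to show the equivalence.

```c
#include <stdbool.h>
#include <string.h>

/* The message as a single, very wide literal. */
static const char *const one_line_msg =
    "writes are only accepted in callbacks in the OutputPluginCallbacks structure (except startup, shutdown, filter_by_origin and filter_prepare callbacks)";

/* The same message split across adjacent literals. */
static const char *const split_msg =
    "writes are only accepted in callbacks in the OutputPluginCallbacks "
    "structure (except startup, shutdown, filter_by_origin and "
    "filter_prepare callbacks)";

/* True when both spellings produce the same string. */
static bool
messages_match(void)
{
    return strcmp(one_line_msg, split_msg) == 0;
}
```

The trailing space before each closing quote matters; without it the words on either side of a split would run together.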

(3) Minor question

The patch introduced goto statements into the cb_wrapper functions. Is the purpose to call update_progress_and_keepalive after popping the error stack, even if the corresponding callback is missing? I thought we could just add an "else" clause to the check for the existence of the callback. Did you choose the current goto style for readability?
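
The two styles in question can be compared with a toy model. This is a guess at the general shape, not the patch's code: DemoState, the integer "error stack" and all function names are hypothetical.

```c
#include <stdbool.h>
#include <stddef.h>

typedef struct DemoState
{
    int     error_stack_depth;  /* stand-in for the error context stack */
    int     callback_calls;     /* times the plugin callback ran */
    int     progress_calls;     /* times progress was updated */
    int     depth_at_progress;  /* stack depth seen by the progress update */
} DemoState;

typedef void (*demo_cb) (DemoState *state);

static void
demo_plugin_cb(DemoState *state)
{
    state->callback_calls++;
}

static void
demo_update_progress(DemoState *state)
{
    state->progress_calls++;
    state->depth_at_progress = state->error_stack_depth;
}

/* goto style: a missing callback jumps straight to the common exit path. */
static void
wrapper_goto_style(DemoState *state, demo_cb cb)
{
    state->error_stack_depth++;     /* push error context */

    if (cb == NULL)
        goto out;

    cb(state);

out:
    state->error_stack_depth--;     /* pop error context */
    demo_update_progress(state);
}

/* Conditional style: the callback is simply skipped when it is missing. */
static void
wrapper_else_style(DemoState *state, demo_cb cb)
{
    state->error_stack_depth++;     /* push error context */

    if (cb != NULL)
        cb(state);

    state->error_stack_depth--;     /* pop error context */
    demo_update_progress(state);
}
```

In this model both wrappers pop the error stack before updating progress whether or not the callback exists, so the choice between them comes down to readability.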

(4) Name of is_skip_threshold_change

I also feel that is_skip_threshold_change could be renamed to "exceeded_keepalive_threshold" or something similar. Other candidates are proposed by Peter-san in [2].

[1]: /messages/by-id/OS3PR01MB6275374EBE7C8CABBE6730099EAF9@OS3PR01MB6275.jpnprd01.prod.outlook.com
[2]: /messages/by-id/CAHut+Pt3ZEMo-KTF=5KJSU+HdWJD19GPGGCKOmBeM47484Ychw@mail.gmail.com

Best Regards,
Takamichi Osumi

#13wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Peter Smith (#11)
1 attachment(s)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Tues, Feb 28, 2023 at 9:12 AM Peter Smith <smithpb2250@gmail.com> wrote:

Here are some comments for the v2-0001 patch.

(I haven't looked at the v3 that was posted overnight; maybe some of
my comments have already been addressed.)

Thanks for your comments.

======
General

1. (Info from the commit message)
Since we can know whether the change is an end of transaction change in the
common code, we removed the LogicalDecodingContext->end_xact introduced in
commit f95d53e.

~

TBH, it was not clear to me that this change was an improvement. IIUC,
it removes the "unnecessary" member, but only does that by replacing
it everywhere with a boolean parameter passed to
update_progress_and_keepalive(). So the end result seems no less code,
but it is less readable code now because you need to know what the
true/false parameter means. I wonder if it would have been better just
to leave this how it was.

Since I think we can know the meaning of the input based on the parameter name
of the function, I think both approaches are fine. But the approach in the
current patch can reduce a member of the structure, so I think this modification
looks good to me.

======
src/backend/replication/logical/logical.c

2. General - blank lines

There are multiple places in this file where the patch removed some
statements but left blank lines. The result is 2 blank lines remaining
instead of one.

see change_cb_wrapper.
see truncate_cb_wrapper.
see stream_start_cb_wrapper.
see stream_stop_cb_wrapper.
see stream_change_cb_wrapper.

e.g.

BEFORE
ctx->write_location = last_lsn;

ctx->end_xact = false;

/* in streaming mode, stream_stop_cb is required */

AFTER (now there are 2 blank lines)
ctx->write_location = last_lsn;

/* in streaming mode, stream_stop_cb is required */

Removed.

~~~

3. General - calls to is_skip_threshold_change()

+ if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);

There are multiple calls like this, which are guarding the
update_progress_and_keepalive() with the is_skip_threshold_change()
- See truncate_cb_wrapper
- See message_cb_wrapper
- See stream_change_cb_wrapper
- See stream_message_cb_wrapper
- See stream_truncate_cb_wrapper
- See UpdateDecodingProgressAndKeepalive

IIUC, then I was thinking all those conditions maybe can be pushed
down *into* the wrapper, thereby making every calling code simpler.

e.g. make the wrapper function code look similar to the current
UpdateDecodingProgressAndKeepalive:

BEFORE (update_progress_and_keepalive)
{
if (!ctx->update_progress_and_keepalive)
return;

ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}
AFTER
{
if (!ctx->update_progress_and_keepalive)
return;

if (finished_xact || is_skip_threshold_change(ctx))
{
ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}
}

Since I want to keep the function update_progress_and_keepalive simple, I didn't
change it.

~~~

4. StartupDecodingContext

@@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.

~~~
5. @@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

(Ditto previous comment #4)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.
~~~

6. CreateDecodingContext

@@ -493,7 +488,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

(Ditto previous comment #4)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.

I'm not sure if 'do_update_keepalive' is accurate. So, to distinguish this
function from the parameter, I renamed the function to
'UpdateProgressAndKeepalive'.

~~~

7. OutputPluginPrepareWrite

@@ -662,7 +657,7 @@ void
OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
{
if (!ctx->accept_writes)
- elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
+ elog(ERROR, "writes are only accepted in callbacks in the
OutputPluginCallbacks structure (except startup, shutdown,
filter_by_origin and filter_prepare callbacks)");

It seems a confusing error message. Can it be worded better?

I tried to improve this message in the new patch. Do you have any suggestions to
improve it?

Also, I
noticed this flag is never used except in this one place where it
throws an error, so would an "Assert" be more appropriate here?

I'm not sure if we should change errors to assertions here.

~~~

8. rollback_prepared_cb_wrapper

/*
* If the plugin support two-phase commits then rollback prepared callback
* is mandatory
+ *
+ * FIXME: This should have been caught much earlier.
*/
if (ctx->callbacks.rollback_prepared_cb == NULL)
~
Is this FIXME related to the current patch, or should this be an
entirely different topic?

I think this FIXME seems to be another topic and I will delete this FIXME later.

~~~

9. is_skip_threshold_change

The current usage for this function is like:

if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);

~

IMO a better name for this function might be something like
'is_change_threshold_exceeded()' (or
'is_keepalive_threshold_exceeded()' etc.) because it seems more readable
to say

if (is_change_threshold_exceeded())
do_something();

Renamed this function to is_keepalive_threshold_exceeded.

~~~

10. is_skip_threshold_change

static bool
is_skip_threshold_change(struct LogicalDecodingContext *ctx)
{
static int changes_count = 0; /* used to accumulate the number of
* changes */

/* If the change was published, reset the counter and return false */
if (ctx->did_write)
{
changes_count = 0;
return false;
}

/*
* It is possible that the data is not sent to downstream for a long time
* either because the output plugin filtered it or there is a DDL that
* generates a lot of data that is not processed by the plugin. So, in
* such cases, the downstream can timeout. To avoid that we try to send a
* keepalive message if required. Trying to send a keepalive message
* after every change has some overhead, but testing showed there is no
* noticeable overhead if we do it after every ~100 changes.
*/
#define CHANGES_THRESHOLD 100
if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)
{
changes_count = 0;
return true;
}

return false;
}

~

That 2nd condition checking if (!ctx->did_write && ++changes_count >=
CHANGES_THRESHOLD) does not seem right. There is no need to check the
ctx->did_write; it must be false because it was checked earlier in the
function:

BEFORE
if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)

SUGGESTION1
Assert(!ctx->did_write);
if (++changes_count >= CHANGES_THRESHOLD)

SUGGESTION2
if (++changes_count >= CHANGES_THRESHOLD)

Fixed.
I think the second suggestion looks better to me.

~~~

11. update_progress_and_keepalive

/*
* Update progress tracking and send keep alive (if required).
*/
static void
update_progress_and_keepalive(struct LogicalDecodingContext *ctx,
bool finished_xact)
{
if (!ctx->update_progress_and_keepalive)
return;

ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}

~

Maybe it's simpler to code this without the return.

e.g.

if (ctx->update_progress_and_keepalive)
{
ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}

(it is just generic suggested code for example -- I made some other
review comments overlapping this)

I think these two approaches are fine. But because I think the approach in the
current patch is consistent with the style of other functions, I didn't change
it.

======
.../replication/logical/reorderbuffer.c

12. ReorderBufferAbort

+ UpdateDecodingProgressAndKeepalive((LogicalDecodingContext *)rb->private_data,
+    xid, lsn, !TransactionIdIsValid(txn->toplevel_xid));
+

I didn't really recognise how the
"!TransactionIdIsValid(txn->toplevel_xid)" maps to the boolean
'finished_xact' param. Can this call have an explanatory comment about
how it works?

It seems confusing to use txn->toplevel_xid to check whether this is the
top-level transaction. Because the comment on txn->toptxn documents the meaning
of its value, I updated the patch to use txn->toptxn for this check.
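
A toy sketch of the check being described, under the assumption that toptxn is NULL for a top-level transaction and points at the parent for a subtransaction. DemoTXN and is_top_level are hypothetical; the real ReorderBufferTXN has many more fields.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for ReorderBufferTXN. */
typedef struct DemoTXN
{
    unsigned int    xid;
    struct DemoTXN *toptxn;     /* assumed NULL for a top-level transaction */
} DemoTXN;

/*
 * An abort finishes the whole transaction only when the aborted txn is
 * itself top-level; aborting a subtransaction leaves the parent running.
 */
static bool
is_top_level(const DemoTXN *txn)
{
    return txn->toptxn == NULL;
}
```

Under that assumption, the value passed as 'finished_xact' in ReorderBufferAbort would correspond to is_top_level(txn).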

======
src/backend/replication/walsender.c
~~~

13. WalSndUpdateProgressAndKeepalive

- if (pending_writes || (!end_xact &&
+ if (pending_writes || (!finished_xact && wal_sender_timeout > 0 &&
now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
wal_sender_timeout / 2)))
- ProcessPendingWrites();
+ WalSndSendPending();

Is this new function name OK to be WalSndSendPending? From this code,
we can see it can also be called in other scenarios even when there is
nothing "pending" at all.

I think this function is used to flush pending data or send keepalive message.
But I'm not sure if we should add keepalive related string to the function
name, which seems to make this function name too long.

Attach the new patch.

Regards,
Wang wei

Attachments:

v4-0001-Rework-LogicalOutputPluginWriterUpdateProgress.patch (application/octet-stream)

#14wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Takamichi Osumi (Fujitsu) (#12)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Tues, Feb 28, 2023 at 11:31 AM Osumi, Takamichi/大墨 昂道 <osumi.takamichi@fujitsu.com> wrote:

Hi,

On Monday, February 27, 2023 6:30 PM wangw.fnst@fujitsu.com
<wangw.fnst@fujitsu.com> wrote:

Attach the new patch.

Thanks for sharing v3. Minor review comments and question.

Thanks for your comments.

(1) UpdateDecodingProgressAndKeepalive header comment

The comment should be updated to explain why we reset some other flags, as
discussed in [1], and to briefly describe the update-progress and keepalive
functionality of the function.

Added the comments atop the function UpdateDecodingProgressAndKeepalive about
when to call this function.

(2) OutputPluginPrepareWrite

Probably the changed error string is too wide.

@@ -662,7 +657,7 @@ void
OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
{
if (!ctx->accept_writes)
-               elog(ERROR, "writes are only accepted in commit, begin and change
callbacks");
+               elog(ERROR, "writes are only accepted in callbacks in the
OutputPluginCallbacks structure (except startup, shutdown, filter_by_origin and
filter_prepare callbacks)");

I think you could break the error message into two string literals on separate
lines. Alternatively, you could rephrase it.

I tried to improve this message and broke it into two lines in the new patch.

(3) Minor question

The patch introduced goto statements into the cb_wrapper functions. Is the
purpose to call update_progress_and_keepalive after popping the error stack,
even if the corresponding callback is missing? I thought we could just add an
"else" clause to the check for the existence of the callback. Did you choose
the current goto style for readability?

I think both styles look fine to me.
I haven't modified this for this version. I'll reconsider if anyone else has
similar thoughts later.

(4) Name of is_skip_threshold_change

I also feel that is_skip_threshold_change could be renamed to
"exceeded_keepalive_threshold" or something similar. Other candidates are
proposed by Peter-san in [2].

Renamed this function to is_keepalive_threshold_exceeded.

Please see the new patch in [1].

[1]: /messages/by-id/OS3PR01MB6275C6CA72222C0C23730A319EAD9@OS3PR01MB6275.jpnprd01.prod.outlook.com

Regards,
Wang wei

#15Peter Smith
Peter Smith
smithpb2250@gmail.com
In reply to: wangw.fnst@fujitsu.com (#13)

On Wed, Mar 1, 2023 at 9:16 PM wangw.fnst@fujitsu.com
<wangw.fnst@fujitsu.com> wrote:

On Tues, Feb 28, 2023 at 9:12 AM Peter Smith <smithpb2250@gmail.com> wrote:

Here are some comments for the v2-0001 patch.

(I haven't looked at the v3 that was posted overnight; maybe some of
my comments have already been addressed.)

Thanks for your comments.

======
General

1. (Info from the commit message)
Since we can know whether the change is an end of transaction change in the
common code, we removed the LogicalDecodingContext->end_xact introduced in
commit f95d53e.

~

TBH, it was not clear to me that this change was an improvement. IIUC,
it removes the "unnecessary" member, but only does that by replacing
it everywhere with a boolean parameter passed to
update_progress_and_keepalive(). So the end result seems no less code,
but it is less readable code now because you need to know what the
true/false parameter means. I wonder if it would have been better just
to leave this how it was.

Since I think we can know the meaning of the input based on the parameter name
of the function, I think both approaches are fine. But the approach in the
current patch can reduce a member of the structure, so I think this modification
looks good to me.

Hmm, I am not so sure:

- Why is reducing members of LogicalDecodingContext even a goal? I
thought the LogicalDecodingContext is intended to be the one-stop
place to hold *all* things related to the "Context" (including that
member that was deleted).

- How is reducing one member better than introducing one new parameter
in multiple calls?

Anyway, I think this exposes another problem. If you still want the
patch to pass the 'finished_xact' parameter separately then AFAICT the
first parameter (ctx) now becomes unused/redundant in the
WalSndUpdateProgressAndKeepalive function, so it ought to be removed.

======
src/backend/replication/logical/logical.c

3. General - calls to is_skip_threshold_change()

+ if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);

There are multiple calls like this, which are guarding the
update_progress_and_keepalive() with the is_skip_threshold_change()
- See truncate_cb_wrapper
- See message_cb_wrapper
- See stream_change_cb_wrapper
- See stream_message_cb_wrapper
- See stream_truncate_cb_wrapper
- See UpdateDecodingProgressAndKeepalive

IIUC, then I was thinking all those conditions maybe can be pushed
down *into* the wrapper, thereby making every calling code simpler.

e.g. make the wrapper function code look similar to the current
UpdateDecodingProgressAndKeepalive:

BEFORE (update_progress_and_keepalive)
{
if (!ctx->update_progress_and_keepalive)
return;

ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}
AFTER
{
if (!ctx->update_progress_and_keepalive)
return;

if (finished_xact || is_skip_threshold_change(ctx))
{
ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}
}

Since I want to keep the function update_progress_and_keepalive simple, I didn't
change it.

Hmm, the reason given seems like a false economy to me. You are able
to keep this 1 function simpler only by adding more complexity to the
calls in 6 other places. Let's see if other people have opinions about
this.

~~~

1.
+
+static void UpdateProgressAndKeepalive(LogicalDecodingContext *ctx,
+    bool finished_xact);
+
+static bool is_keepalive_threshold_exceeded(LogicalDecodingContext *ctx);

1a.
There is an unnecessary extra blank line above the UpdateProgressAndKeepalive.

~

1b.
I did not recognize a reason for the different naming conventions.
Here are two new functions but one is CamelCase and one is snake_case.
What are the rules to decide the naming?

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#16Andres Freund
Andres Freund
andres@anarazel.de
In reply to: Peter Smith (#15)

Hi,

On 2023-03-03 11:18:04 +1100, Peter Smith wrote:

- Why is reducing members of LogicalDecodingContext even a goal? I
thought the LogicalDecodingContext is intended to be the one-stop
place to hold *all* things related to the "Context" (including that
member that was deleted).

There's not really a reason to keep it in LogicalDecodingContext after
this change. It was only needed there because of the broken
architectural model of calling UpdateProgress from within output
plugins. Why set a field in each wrapper that we don't need?

- How is reducing one member better than introducing one new parameter
in multiple calls?

Reducing the member isn't important, needing to set it before each
callback however makes sense.
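
One way to picture the trade-off discussed here is the toy comparison below. It is only an illustration of the general argument, not code from the patch; DemoContext and every function name are hypothetical.

```c
#include <stdbool.h>

typedef struct DemoContext
{
    bool    end_xact;           /* field style: must be set before every call */
    bool    last_seen_finished; /* what the progress update observed */
} DemoContext;

/* Field style: the update reads state that some wrapper set earlier. */
static void
update_progress_field(DemoContext *ctx)
{
    ctx->last_seen_finished = ctx->end_xact;
}

/* Parameter style: the caller states the fact at the call site. */
static void
update_progress_param(DemoContext *ctx, bool finished_xact)
{
    ctx->last_seen_finished = finished_xact;
}

/* A commit-like wrapper that sets the field correctly. */
static void
commit_wrapper_field(DemoContext *ctx)
{
    ctx->end_xact = true;
    update_progress_field(ctx);
}

/* A change-like wrapper that forgets to reset the field. */
static void
change_wrapper_field_forgetful(DemoContext *ctx)
{
    update_progress_field(ctx);     /* sees the stale end_xact value */
}

/* The same wrapper in parameter style cannot observe stale state. */
static void
change_wrapper_param(DemoContext *ctx)
{
    update_progress_param(ctx, false);
}
```

With the field, every wrapper has to remember to assign it before the call; with the parameter, the value cannot be left over from an earlier callback.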

Greetings,

Andres Freund

#17houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
houzj.fnst@fujitsu.com
In reply to: Peter Smith (#15)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Friday, March 3, 2023 8:18 AM Peter Smith <smithpb2250@gmail.com> wrote:

On Wed, Mar 1, 2023 at 9:16 PM wangw.fnst@fujitsu.com
<wangw.fnst@fujitsu.com> wrote:

On Tues, Feb 28, 2023 at 9:12 AM Peter Smith <smithpb2250@gmail.com> wrote:

Here are some comments for the v2-0001 patch.

(I haven't looked at the v3 that was posted overnight; maybe some of
my comments have already been addressed.)

Thanks for your comments.

======
General

1. (Info from the commit message)
Since we can know whether the change is an end of transaction change
in the common code, we removed the LogicalDecodingContext->end_xact
introduced in commit f95d53e.

~

TBH, it was not clear to me that this change was an improvement.
IIUC, it removes the "unnecessary" member, but only does that by
replacing it everywhere with a boolean parameter passed to
update_progress_and_keepalive(). So the end result seems no less
code, but it is less readable code now because you need to know what
the true/false parameter means. I wonder if it would have been
better just to leave this how it was.

Since I think we can know the meaning of the input based on the
parameter name of the function, I think both approaches are fine. But
the approach in the current patch can reduce a member of the
structure, so I think this modification looks good to me.

...

Anyway, I think this exposes another problem. If you still want the patch to pass
the 'finished_xact' parameter separately then AFAICT the first parameter (ctx)
now becomes unused/redundant in the WalSndUpdateProgressAndKeepalive
function, so it ought to be removed.

I am not sure about this. The first parameter (ctx) has been there since the
lag tracking feature was introduced. I think it is there for consistency with
the other LogicalOutputPluginWriter callbacks. In addition, this is a public
callback, and users can implement their own logic in it based on this interface,
so removing the existing parameter doesn't look great to me. This patch does
remove the existing skipped_xact parameter, but only because we decided to use
another parameter, did_write, which can play a similar role.

Best Regards,
Hou zj

#18Peter Smith
Peter Smith
smithpb2250@gmail.com
In reply to: houzj.fnst@fujitsu.com (#17)

On Fri, Mar 3, 2023 at 1:27 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

On Friday, March 3, 2023 8:18 AM Peter Smith <smithpb2250@gmail.com> wrote:

...

Anyway, I think this exposes another problem. If you still want the patch to pass
the 'finished_xact' parameter separately then AFAICT the first parameter (ctx)
now becomes unused/redundant in the WalSndUpdateProgressAndKeepalive
function, so it ought to be removed.

I am not sure about this. The first parameter (ctx) has been there since the
lag tracking feature was introduced. I think it is there for consistency with
the other LogicalOutputPluginWriter callbacks. In addition, this is a public
callback, and users can implement their own logic in it based on this interface,
so removing the existing parameter doesn't look great to me. This patch does
remove the existing skipped_xact parameter, but only because we decided to use
another parameter, did_write, which can play a similar role.

Oh right, that makes sense. Thanks.

Perhaps it just needs a comment mentioning that, although the
built-in implementation does not use 'ctx', users might implement
their own logic which does use it.
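
A minimal sketch of that situation, with a deliberately simplified callback signature: DemoContext, DemoUpdateProgressCB and both implementations are hypothetical and only mirror the parameter list discussed in this thread.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical context carrying bookkeeping a custom writer might keep. */
typedef struct DemoContext
{
    uint64_t    last_reported_lsn;
} DemoContext;

/* Callback type that keeps ctx as its first parameter. */
typedef void (*DemoUpdateProgressCB) (DemoContext *ctx, uint64_t lsn,
                                      uint32_t xid, bool did_write,
                                      bool finished_xact);

static uint64_t builtin_last_lsn = 0;

/*
 * Built-in style implementation: ctx is unused, but it stays in the
 * signature because other implementations of the callback may need it.
 */
static void
builtin_update_progress(DemoContext *ctx, uint64_t lsn, uint32_t xid,
                        bool did_write, bool finished_xact)
{
    (void) ctx;
    (void) xid;
    (void) did_write;
    (void) finished_xact;
    builtin_last_lsn = lsn;
}

/* A user-supplied implementation that does rely on ctx. */
static void
custom_update_progress(DemoContext *ctx, uint64_t lsn, uint32_t xid,
                       bool did_write, bool finished_xact)
{
    (void) xid;
    (void) did_write;
    (void) finished_xact;
    ctx->last_reported_lsn = lsn;
}

/* Caller side: identical for both implementations. */
static void
report_progress(DemoUpdateProgressCB cb, DemoContext *ctx, uint64_t lsn)
{
    cb(ctx, lsn, 0, false, true);
}
```

The comment on builtin_update_progress is the kind of note suggested above.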

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#19wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Peter Smith (#18)
1 attachment(s)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Fri, Mar 3, 2023 8:18 AM Peter Smith <smithpb2250@gmail.com> wrote:

Thanks for your comments.

1.
+
+static void UpdateProgressAndKeepalive(LogicalDecodingContext *ctx,
+    bool finished_xact);
+
+static bool is_keepalive_threshold_exceeded(LogicalDecodingContext *ctx);

1a.
There is an unnecessary extra blank line above the UpdateProgressAndKeepalive.

Removed.

~

1b.
I did not recognize a reason for the different naming conventions.
Here are two new functions but one is CamelCase and one is snake_case.
What are the rules to decide the naming?

I used the snake_case style for the function UpdateProgressAndKeepalive in the
previous version, but that was confusing because the function shared its name
with a parameter of the functions StartupDecodingContext,
CreateInitDecodingContext and CreateDecodingContext. To avoid this confusion,
and since both naming styles exist in this file, I changed it to CamelCase style.

Attach the new patch.

Regards,
Wang wei

Attachments:

v5-0001-Rework-LogicalOutputPluginWriterUpdateProgress.patch (application/octet-stream)
#20Hayato Kuroda (Fujitsu)
Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: wangw.fnst@fujitsu.com (#19)
RE: Rework LogicalOutputPluginWriterUpdateProgress

Dear Wang,

Thank you for updating the patch! The following are my comments.

---
01. missing comments
You might have missed the comment from Peter [1]. Or could you point to the related one?

---
02. LogicalDecodingProcessRecord()

Don't we have to call UpdateDecodingProgressAndKeepalive() when there is no
decoding function? Suppose the timeout parameter is set to a short period and
the transaction contains many sequential operations. In that case, a timeout
might occur while ReorderBufferProcessXid() is called several times. It may be
a bad example, but my point is that we may have to consider the case where the
decoding function has not been implemented yet.

---
03. stream_*_cb_wrapper

Only the stream_*_cb_wrapper functions have the comment "don't call update progress, we didn't really make any", but
there are more functions that do not send updates. Is there a reason why only they have it?

[1]: /messages/by-id/CAHut+PsksiQHuv4A54R4w79TAvCu__PcuffKYY0V96e2z_sEvA@mail.gmail.com

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

#21wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Hayato Kuroda (Fujitsu) (#20)
1 attachment(s)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Tue, Mar 7, 2023 15:55 PM Kuroda, Hayato/黒田 隼人 <kuroda.hayato@fujitsu.com> wrote:

Dear Wang,

Thank you for updating the patch! Followings are my comments.

Thanks for your comments.

---
01. missing comments
You might miss the comment from Peter[1]. Or could you pin the related one?

Since I think the functions WalSndPrepareWrite and WalSndWriteData have similar
parameters and the HEAD has no related comments, I'm not sure whether we should
add them in this patch, or in a separate patch to comment atop these callback
functions or where they are called.

---
02. LogicalDecodingProcessRecord()

Don't we have to call UpdateDecodingProgressAndKeepalive() when there is no
decoding function? Suppose the timeout parameter is set to a short period and
the transaction contains many sequential operations. In that case, a timeout
might occur while ReorderBufferProcessXid() is called several times. It may be
a bad example, but my point is that we may have to consider the case where the
decoding function has not been implemented yet.

I think it's ok in this function. If the decoding function has not been
implemented for a record, I think we quickly return to the loop in the function
WalSndLoop, where it will try to send the keepalive message.

BTW, in the previous discussion [1], we decided to ignore some paths, because
the gain from modifying them may not be so great.

---
03. stream_*_cb_wrapper

Only the stream_*_cb_wrapper functions have the comment "don't call update progress, we didn't really make any", but
there are more functions that do not send updates. Is there a reason why only they have it?

Added this comment to more functions.
I think the following six functions don't call the function
UpdateProgressAndKeepalive in v5 patch:
- begin_cb_wrapper
- begin_prepare_cb_wrapper
- startup_cb_wrapper
- shutdown_cb_wrapper
- filter_prepare_cb_wrapper
- filter_by_origin_cb_wrapper

I think the comment you mentioned means that no new progress needs to be updated
in this *_cb_wrapper. Also, I think we don't need to update the progress at the
beginning of a transaction, just like in HEAD. So, I added the same comment only
in the 4 functions below:
- startup_cb_wrapper
- shutdown_cb_wrapper
- filter_prepare_cb_wrapper
- filter_by_origin_cb_wrapper

Attached is the new patch.

[1]: /messages/by-id/20230213180302.u5sqosteflr3zkiz@awork3.anarazel.de

Regards,
Wang wei

Attachments:

v6-0001-Rework-LogicalOutputPluginWriterUpdateProgress.patch (application/octet-stream)
#22Hayato Kuroda (Fujitsu)
Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: wangw.fnst@fujitsu.com (#21)
1 attachment(s)
RE: Rework LogicalOutputPluginWriterUpdateProgress

Dear Wang,

Thank you for updating the patch! I have briefly tested your patch
and it worked well in the following cases.

* WalSndUpdateProgressAndKeepalive is called when many inserts have arrived
but the publisher does not publish them. PSA the script for this.
* WalSndUpdateProgressAndKeepalive is called when the commit record is not
related to the specified database.
* WalSndUpdateProgressAndKeepalive is called when many inserts into unlogged
tables are done.

---
01. missing comments
You might miss the comment from Peter[1]. Or could you pin the related one?

Since I think the functions WalSndPrepareWrite and WalSndWriteData have similar
parameters and the HEAD has no related comments, I'm not sure whether we should
add them in this patch, or in a separate patch to comment atop these callback
functions or where they are called.

Makes sense, OK.

---
02. LogicalDecodingProcessRecord()

Don't we have to call UpdateDecodingProgressAndKeepalive() when there is no
decoding function? Suppose the timeout parameter is set to a short period and
the transaction contains many sequential operations. In that case, a timeout
might occur while ReorderBufferProcessXid() is called several times. It may be
a bad example, but my point is that we may have to consider the case where the
decoding function has not been implemented yet.

I think it's ok in this function. If the decoding function has not been
implemented for a record, I think we quickly return to the loop in the function
WalSndLoop, where it will try to send the keepalive message.

I confirmed that and yes, we will go back to WalSndLoop().

BTW, in the previous discussion [1], we decided to ignore some paths, because
the gain from modifying them may not be so great.

I missed the discussion, thanks. Based on that, the code seems right.

The following are my comments.

---
```
+/*
+ * Update progress tracking and send keep alive (if required).
+ */
+static void
+UpdateProgressAndKeepalive(LogicalDecodingContext *ctx, bool finished_xact)
```

Can we add comments atop UpdateProgressAndKeepalive()? Currently, developers who
create output plugins must call OutputPluginUpdateProgress(), but from now on the
function is not only renamed but also no longer needs to be called from the plugin
(of course, we do not forbid calling it). I think this must be clarified for them.

---
ReorderBufferUpdateProgressTxnCB must be removed from typedefs.list.

---
Do we have to document the breakage somewhere? I think we do not have
to add an appendix-obsolete-* file because we did not have any links to it, but
we can add a warning in the "Functions for Producing Output" subsection if needed.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

test.sh (application/octet-stream)
#23Takamichi Osumi (Fujitsu)
Takamichi Osumi (Fujitsu)
osumi.takamichi@fujitsu.com
In reply to: wangw.fnst@fujitsu.com (#21)
RE: Rework LogicalOutputPluginWriterUpdateProgress

Hi,

On Wednesday, March 8, 2023 11:54 AM From: wangw.fnst@fujitsu.com <wangw.fnst@fujitsu.com> wrote:

Attach the new patch.

Thanks for sharing v6! A few minor comments on it.

(1) commit message

The commit message still refers to the old function name 'is_skip_threshold_change'. I think we need to update it to 'is_keepalive_threshold_exceeded'.

(2) OutputPluginPrepareWrite

@@ -662,7 +656,8 @@ void
 OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
 {
        if (!ctx->accept_writes)
-               elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
+               elog(ERROR, "writes are only accepted in output plugin callbacks, "
+                        "except startup, shutdown, filter_by_origin, and filter_prepare.");

We can remove the period at the end of the error string.

(3) is_keepalive_threshold_exceeded's comments

+/*
+ * Helper function to check whether a large number of changes have been skipped
+ * continuously.
+ */
+static bool
+is_keepalive_threshold_exceeded(LogicalDecodingContext *ctx)

I suggest updating the comment slightly, something like below.
From:
...whether a large number of changes have been skipped continuously
To:
...whether a large number of changes have been skipped without being sent to the output plugin continuously

(4) term for 'keepalive'

+/*
+ * Update progress tracking and send keep alive (if required).
+ */

It might be better to replace 'keep alive' with 'keepalive', which looks like the most common form elsewhere in the source code. The current patch expresses it in 3 different ways (the third is 'keep-alive'), and it would be better to unify the term, at least within the same patch.
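
For readers who have not opened the patch, the helper discussed in (3) can be pictured roughly as below. This is only a sketch under assumptions: the threshold value, the SkipTracker struct and the function name skip_threshold_exceeded are hypothetical stand-ins, and whether the real is_keepalive_threshold_exceeded resets its counter in the same place is not confirmed by this thread.

```c
#include <stdbool.h>

/* Assumed value: how many skipped changes are tolerated before acting. */
#define CHANGES_THRESHOLD 100

/* Hypothetical stand-in for the per-context state the patch would keep. */
typedef struct SkipTracker
{
    int changes_count;      /* changes skipped since the last reset */
} SkipTracker;

/*
 * Count one skipped change. Returns true, and resets the counter, once
 * CHANGES_THRESHOLD changes have been skipped in a row.
 */
static bool
skip_threshold_exceeded(SkipTracker *tracker)
{
    if (++tracker->changes_count < CHANGES_THRESHOLD)
        return false;

    tracker->changes_count = 0;
    return true;
}
```

A caller would invoke this once per skipped change and only try to update progress or send a keepalive when it returns true, so the extra work is amortized over many changes.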

Best Regards,
Takamichi Osumi

#24Peter Smith
Peter Smith
smithpb2250@gmail.com
In reply to: wangw.fnst@fujitsu.com (#21)

Here are some review comments for v6-0001

======
General.

1.
There are lots of new comments saying:
/* don't call update progress, we didn't really make any */

but is the wording "call update progress" meaningful?

Should that be written something more like:
/* No progress has been made so there is no need to call
UpdateProgressAndKeepalive. */

======

2. rollback_prepared_cb_wrapper

  /*
  * If the plugin support two-phase commits then rollback prepared callback
  * is mandatory
+ *
+ * FIXME: This should have been caught much earlier.
  */
  if (ctx->callbacks.rollback_prepared_cb == NULL)
  ereport(ERROR,

~

Why is this seemingly unrelated FIXME still in the patch? I thought it
was posted a while ago (See [1] comment #8) that this would be
deleted.

~~~

4.

@@ -1370,6 +1377,8 @@ stream_abort_cb_wrapper(ReorderBuffer *cache,
ReorderBufferTXN *txn,

  /* Pop the error context stack */
  error_context_stack = errcallback.previous;
+
+ UpdateProgressAndKeepalive(ctx, (txn->toptxn == NULL));
 }

~

Are the double parentheses necessary?

~~~

5. UpdateProgressAndKeepalive

I had previously suggested (See [2] comment #3) that the code might be
simplified if the "is_keepalive_threshold_exceeded(ctx)" check was
pushed down into this function, but it seems like nobody else gave any
opinion for/against that idea yet... so the question still stands.

======
src/backend/replication/walsender.c

6. WalSndUpdateProgressAndKeepalive

Since the 'ctx' is unused here, it might be nicer to annotate that to
make it clear it is deliberate and suppress any possible warnings
about unused params.

e.g. something like:

WalSndUpdateProgressAndKeepalive(
pg_attribute_unused() LogicalDecodingContext *ctx,
XLogRecPtr lsn,
TransactionId xid,
bool did_write,
bool finished_xact)
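
As a standalone illustration of the annotation suggested in comment 6, here is a minimal sketch. The macro attribute_unused() is a local stand-in written for this example (PostgreSQL's c.h provides pg_attribute_unused() for the same purpose), and DecodingContext, update_progress and last_reported_lsn are hypothetical names, not the walsender code.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Local stand-in for pg_attribute_unused(); expands to nothing elsewhere. */
#if defined(__GNUC__) || defined(__clang__)
#define attribute_unused() __attribute__((unused))
#else
#define attribute_unused()
#endif

/* Opaque, hypothetical context type; only pointers to it are used. */
typedef struct DecodingContext DecodingContext;

static uint64_t last_reported_lsn = 0;

/*
 * The callback signature requires 'ctx', but this implementation does not
 * need it. The annotation documents that this is deliberate and keeps
 * -Wunused-parameter quiet.
 */
static void
update_progress(attribute_unused() DecodingContext *ctx,
                uint64_t lsn, bool did_write)
{
    if (did_write)
        last_reported_lsn = lsn;
}
```

The parameter stays in the signature, so other implementations of the same callback can still use it.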

------
[1]: /messages/by-id/OS3PR01MB6275C6CA72222C0C23730A319EAD9@OS3PR01MB6275.jpnprd01.prod.outlook.com
[2]: /messages/by-id/CAHut+Pt3ZEMo-KTF=5KJSU+HdWJD19GPGGCKOmBeM47484Ychw@mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia.

#25Amit Kapila
Amit Kapila
amit.kapila16@gmail.com
In reply to: wangw.fnst@fujitsu.com (#21)

On Wed, Mar 8, 2023 at 8:24 AM wangw.fnst@fujitsu.com
<wangw.fnst@fujitsu.com> wrote:

Attach the new patch.

I think this combines multiple improvements in one patch. We can
consider all of them together or maybe it would be better to split
some of those. Do we think it makes sense to split some of the
improvements? I could think of below:

1. Remove SyncRepRequested() check from WalSndUpdateProgress().
2. Add check of wal_sender_timeout > 0 in WalSndUpdateProgress() and
any other similar place.
3. Change the name of ProcessPendingWrites() to WalSndSendPending().
4. Change WalSndUpdateProgress() to WalSndUpdateProgressAndKeepalive().
5. The remaining patch.
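
To make item 2 concrete, the guard could look something like the sketch below. It is a simplified, self-contained model: keepalive_due and its parameters are hypothetical names, times are plain milliseconds rather than TimestampTz, and the half-timeout rule is how I understand the existing walsender keepalive logic to behave, not something this patch defines.

```c
#include <stdbool.h>
#include <stdint.h>

/*
 * Decide whether a keepalive is due. All times are in milliseconds.
 * timeout_ms <= 0 means the timeout is disabled, so nothing is ever due.
 */
static bool
keepalive_due(int64_t now_ms, int64_t last_reply_ms, int timeout_ms)
{
    /* The point of item 2: skip all timeout work when it is not in effect. */
    if (timeout_ms <= 0 || last_reply_ms <= 0)
        return false;

    /* Ping once half of the timeout has elapsed without a reply. */
    return now_ms >= last_reply_ms + timeout_ms / 2;
}
```

With a 60 s timeout and the last reply at t = 500 ms, the first keepalive would be due at t = 30500 ms; with the timeout set to 0 it is never due.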

Now, for (1), we can consider backpatching but I am not sure if it is
worth it because in the worst case, we will miss sending a keepalive.
For (4), it is not clear to me that we have a complete agreement on
the new name. Andres, do you have an opinion on the new name used in
the patch?

If we agree that we don't need to backpatch for (1) and the new name
for (4) is reasonable then we can commit 1-4 as one patch and then
look at the remaining patch.

Thoughts?

--
With Regards,
Amit Kapila.

#26Amit Kapila
Amit Kapila
amit.kapila16@gmail.com
In reply to: Peter Smith (#24)

On Thu, Mar 9, 2023 at 10:56 AM Peter Smith <smithpb2250@gmail.com> wrote:

2. rollback_prepared_cb_wrapper

/*
* If the plugin support two-phase commits then rollback prepared callback
* is mandatory
+ *
+ * FIXME: This should have been caught much earlier.
*/
if (ctx->callbacks.rollback_prepared_cb == NULL)
ereport(ERROR,

~

Why is this seemingly unrelated FIXME still in the patch?

After reading this Fixme comment and the error message ("logical
replication at prepare time requires a %s callback
rollback_prepared_cb"), I think we can move this and a similar check
in function commit_prepared_cb_wrapper() to prepare_cb_wrapper()
function. This is because there is no use of letting prepare pass when
we can't do a rollback or commit prepared. What do you think?

4.

@@ -1370,6 +1377,8 @@ stream_abort_cb_wrapper(ReorderBuffer *cache,
ReorderBufferTXN *txn,

/* Pop the error context stack */
error_context_stack = errcallback.previous;
+
+ UpdateProgressAndKeepalive(ctx, (txn->toptxn == NULL));
}

~

Are the double parentheses necessary?

Personally, I find this style easier to follow.

--
With Regards,
Amit Kapila.

#27Peter Smith
Peter Smith
smithpb2250@gmail.com
In reply to: Amit Kapila (#26)

On Fri, Mar 10, 2023 at 3:32 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Mar 9, 2023 at 10:56 AM Peter Smith <smithpb2250@gmail.com> wrote:

2. rollback_prepared_cb_wrapper

/*
* If the plugin support two-phase commits then rollback prepared callback
* is mandatory
+ *
+ * FIXME: This should have been caught much earlier.
*/
if (ctx->callbacks.rollback_prepared_cb == NULL)
ereport(ERROR,

~

Why is this seemingly unrelated FIXME still in the patch?

After reading this Fixme comment and the error message ("logical
replication at prepare time requires a %s callback
rollback_prepared_cb"), I think we can move this and a similar check
in function commit_prepared_cb_wrapper() to prepare_cb_wrapper()
function. This is because there is no use of letting prepare pass when
we can't do a rollback or commit prepared. What do you think?

My first impression was it sounds like a good idea to catch the
missing callbacks early as you said.

But if you decide to check for missing commit/rollback callbacks early
in prepare_cb_wrapper(), then won't you also want to have equivalent
checking done earlier for stream_prepare_cb_wrapper()?

And then it quickly becomes a slippery slope to question many other things:
- Why allow startup_cb if shutdown_cb is missing?
- Why allow change_cb if commit_cb or rollback_cb is missing?
- Why allow filter_prepare_cb if prepare_cb is missing?
- etc.

~

So I am wondering if the HEAD code lazy-check of the callback only at
the point where it is needed was actually a deliberate design choice
just to be simpler - e.g. we don't need to be so concerned about any
other callback dependencies.

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#28Amit Kapila
Amit Kapila
amit.kapila16@gmail.com
In reply to: Peter Smith (#27)

On Fri, Mar 10, 2023 at 11:17 AM Peter Smith <smithpb2250@gmail.com> wrote:

On Fri, Mar 10, 2023 at 3:32 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Mar 9, 2023 at 10:56 AM Peter Smith <smithpb2250@gmail.com> wrote:

2. rollback_prepared_cb_wrapper

/*
* If the plugin support two-phase commits then rollback prepared callback
* is mandatory
+ *
+ * FIXME: This should have been caught much earlier.
*/
if (ctx->callbacks.rollback_prepared_cb == NULL)
ereport(ERROR,

~

Why is this seemingly unrelated FIXME still in the patch?

After reading this Fixme comment and the error message ("logical
replication at prepare time requires a %s callback
rollback_prepared_cb"), I think we can move this and a similar check
in function commit_prepared_cb_wrapper() to prepare_cb_wrapper()
function. This is because there is no use of letting prepare pass when
we can't do a rollback or commit prepared. What do you think?

My first impression was it sounds like a good idea to catch the
missing callbacks early as you said.

But if you decide to check for missing commit/rollback callbacks early
in prepare_cb_wrapper(), then won't you also want to have equivalent
checking done earlier for stream_prepare_cb_wrapper()?

Yeah, probably or we can leave the lazy checking as it is. In the
ideal case, we could check for the presence of all the callbacks in
StartupDecodingContext() but we delay it to find the missing methods
later. One possibility is that we check for any missing method in
StartupDecodingContext() if any one of prepare/streaming calls are
present but not sure if that is any better than the current
arrangement.
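
The up-front check described above could be sketched as follows. This is an illustration only: PluginCallbacks is a trimmed-down, hypothetical table rather than the real OutputPluginCallbacks, and missing_twophase_callback and noop_cb are invented names.

```c
#include <stddef.h>

typedef void (*plugin_cb) (void);

/* Hypothetical, trimmed-down callback table. */
typedef struct PluginCallbacks
{
    plugin_cb prepare_cb;
    plugin_cb commit_prepared_cb;
    plugin_cb rollback_prepared_cb;
} PluginCallbacks;

/* Placeholder callback used only to fill in the table for illustration. */
static void
noop_cb(void)
{
}

/*
 * Return the name of the first two-phase callback that is missing even
 * though prepare_cb is present, or NULL if the set is consistent.
 */
static const char *
missing_twophase_callback(const PluginCallbacks *cb)
{
    /* Without prepare_cb the plugin simply does not decode two-phase. */
    if (cb->prepare_cb == NULL)
        return NULL;
    if (cb->commit_prepared_cb == NULL)
        return "commit_prepared_cb";
    if (cb->rollback_prepared_cb == NULL)
        return "rollback_prepared_cb";
    return NULL;
}
```

A startup-time caller would raise an error when the result is non-NULL, instead of waiting until a rollback or commit prepared is actually decoded. The same pattern would have to be repeated for the streaming callbacks, which is where the slippery-slope concern comes from.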

And then it quickly becomes a slippery slope to question many other things:
- Why allow startup_cb if shutdown_cb is missing?

I am not sure if there is a hard dependency between these two but
their callers do check for Null before invoking those.

- Why allow change_cb if commit_cb or rollback_cb is missing?

We have a check for change_cb and commit_cb in LoadOutputPlugin. Do we
have rollback_cb() defined at all?

- Why allow filter_prepare_cb if prepare_cb is missing?

I am not so sure about this but If prepare gets filtered, we don't
need to invoke prepare_cb.

- etc.

~

So I am wondering if the HEAD code lazy-check of the callback only at
the point where it is needed was actually a deliberate design choice
just to be simpler - e.g. we don't need to be so concerned about any
other callback dependencies.

Yeah, changing that probably needs some more thought. I have mentioned
one of the possibilities above.

--
With Regards,
Amit Kapila.

#29wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Amit Kapila (#25)
6 attachment(s)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Fri, Mar 10, 2023 11:56 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Mar 8, 2023 at 8:24 AM wangw.fnst@fujitsu.com
<wangw.fnst@fujitsu.com> wrote:

Attach the new patch.

I think this combines multiple improvements in one patch. We can
consider all of them together or maybe it would be better to split
some of those. Do we think it makes sense to split some of the
improvements? I could think of below:

1. Remove SyncRepRequested() check from WalSndUpdateProgress().
2. Add check of wal_sender_timeout > 0 in WalSndUpdateProgress() and
any other similar place.
3. Change the name of ProcessPendingWrites() to WalSndSendPending().
4. Change WalSndUpdateProgress() to WalSndUpdateProgressAndKeepalive().
5. The remaining patch.

I think it would help to review different improvements separately, so I split
the patch as suggested.

I also addressed the comments by Kuroda-san, Osumi-san and Peter.
Attached is the new patch set.

Regards,
Wang wei

Attachments:

v7-0001-Remove-SyncRepRequested-check-from-WalSndUpdatePr.patch (application/octet-stream)
v7-0002-Check-wal_sender_timeout-is-in-effect-before-usin.patch (application/octet-stream)
v7-0003-Rename-the-function-ProcessPendingWrites-to-WalSn.patch (application/octet-stream)
v7-0004-Rename-the-function-WalSndUpdateProgress-to-WalSn.patch (application/octet-stream)
v7-0005-Rework-LogicalOutputPluginWriterUpdateProgressAnd.patch (application/octet-stream)
v7-0006-Catch-the-absence-of-commit-rollback_prepared_cb_.patch (application/octet-stream)
#30wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Hayato Kuroda (Fujitsu) (#22)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Wed, Mar 8, 2023 19:06 PM Kuroda, Hayato/黒田 隼人 <kuroda.hayato@fujitsu.com> wrote:

Dear Wang,

Thanks for your testing and comments.

---
```
+/*
+ * Update progress tracking and send keep alive (if required).
+ */
+static void
+UpdateProgressAndKeepalive(LogicalDecodingContext *ctx, bool finished_xact)
```

Can we add comments atop UpdateProgressAndKeepalive()? Currently, developers who
create output plugins must call OutputPluginUpdateProgress(), but from now on the
function is not only renamed but also no longer needs to be called from the plugin
(of course, we do not forbid calling it). I think this must be clarified for them.

Makes sense.
Added some comments atop this function.

---
ReorderBufferUpdateProgressTxnCB must be removed from typedefs.list.

Removed.

---
Do we have to document the breakage somewhere? I think we do not have
to add an appendix-obsolete-* file because we did not have any links to it, but
we can add a warning in the "Functions for Producing Output" subsection if needed.

Since we've moved the feature (update progress and send keepalive) from the
output plugin into the infrastructure, the output plugin is no longer
responsible for maintaining it. Also, I think output plugin developers only
need to remove the call to the old function OutputPluginUpdateProgress if they
get compile errors related to this modification. So, it seems to me that we
don't need to make related changes in pg-doc.

Regards,
Wang wei

#31wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Takamichi Osumi (Fujitsu) (#23)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Wed, Mar 8, 2023 23:55 PM Osumi, Takamichi/大墨 昂道 <osumi.takamichi@fujitsu.com> wrote:

Hi,

On Wednesday, March 8, 2023 11:54 AM From: wangw.fnst@fujitsu.com
<wangw.fnst@fujitsu.com> wrote:

Attach the new patch.

Thanks for sharing v6 ! Few minor comments for the same.

Thanks for your comments.

(1) commit message

The old function name 'is_skip_threshold_change' is referred currently. We need
to update it to 'is_keepalive_threshold_exceeded' I think.

Fixed.

(2) OutputPluginPrepareWrite

@@ -662,7 +656,8 @@ void
OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
{
if (!ctx->accept_writes)
-               elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
+               elog(ERROR, "writes are only accepted in output plugin callbacks, "
+                        "except startup, shutdown, filter_by_origin, and filter_prepare.");

We can remove the period at the end of error string.

Removed.

(3) is_keepalive_threshold_exceeded's comments

+/*
+ * Helper function to check whether a large number of changes have been skipped
+ * continuously.
+ */
+static bool
+is_keepalive_threshold_exceeded(LogicalDecodingContext *ctx)

I suggest to update the comment slightly something like below.
From:
...whether a large number of changes have been skipped continuously
To:
...whether a large number of changes have been skipped without being sent to
the output plugin continuously

Makes sense.
Also, I slightly corrected the original function comment with a grammar check
tool. So, the modified comment looks like this:
```
Helper function to check for continuous skipping of many changes without sending
them to the output plugin.
```

(4) term for 'keepalive'

+/*
+ * Update progress tracking and send keep alive (if required).
+ */

The 'keep alive' might be better to be replaced with 'keepalive', which looks
commonest in other source codes. In the current patch, there are 3 different
ways to express it (the other one is 'keep-alive') and it would be better to unify
the term, at least within the same patch ?

Yes, agree.
Unified the comment you mentioned here ('keep alive') and the comment in the
commit message ('keep-alive') as 'keepalive'.

Regards,
Wang wei

#32wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Peter Smith (#24)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Thur, Mar 9, 2023 13:26 PM Peter Smith <smithpb2250@gmail.com> wrote:

Here are some review comments for v6-0001

Thanks for your comments.

======
General.

1.
There are lots of new comments saying:
/* don't call update progress, we didn't really make any */

but is the wording "call update progress" meaningful?

Should that be written something more like:
/* No progress has been made so there is no need to call
UpdateProgressAndKeepalive. */

Changed.
Shortened your suggested comment using a grammar tool. So, the modified comment
looks like this:
```
No progress has been made, so don't call UpdateProgressAndKeepalive
```

~~~

4.

@@ -1370,6 +1377,8 @@ stream_abort_cb_wrapper(ReorderBuffer *cache,
ReorderBufferTXN *txn,

/* Pop the error context stack */
error_context_stack = errcallback.previous;
+
+ UpdateProgressAndKeepalive(ctx, (txn->toptxn == NULL));
}

~

Are the double parentheses necessary?

I think the code looks clearer this way.

======
src/backend/replication/walsender.c

6. WalSndUpdateProgressAndKeepalive

Since the 'ctx' is unused here, it might be nicer to annotate that to
make it clear it is deliberate and suppress any possible warnings
about unused params.

e.g. something like:

WalSndUpdateProgressAndKeepalive(
pg_attribute_unused() LogicalDecodingContext *ctx,
XLogRecPtr lsn,
TransactionId xid,
bool did_write,
bool finished_xact)

Because many functions don't use this approach, I’m not sure what the rules are
for using it in PG. And I think that we should discuss this on a separate thread
to check which similar functions need this kind of modification in PG source
code.

Regards,
Wang wei

#33wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Amit Kapila (#28)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Fri, Mar 10, 2023 14:35 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Fri, Mar 10, 2023 at 11:17 AM Peter Smith <smithpb2250@gmail.com> wrote:

On Fri, Mar 10, 2023 at 3:32 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Mar 9, 2023 at 10:56 AM Peter Smith <smithpb2250@gmail.com> wrote:

2. rollback_prepared_cb_wrapper

/*
* If the plugin support two-phase commits then rollback prepared callback
* is mandatory
+ *
+ * FIXME: This should have been caught much earlier.
*/
if (ctx->callbacks.rollback_prepared_cb == NULL)
ereport(ERROR,

~

Why is this seemingly unrelated FIXME still in the patch?

After reading this Fixme comment and the error message ("logical
replication at prepare time requires a %s callback
rollback_prepared_cb"), I think we can move this and a similar check
in function commit_prepared_cb_wrapper() to prepare_cb_wrapper()
function. This is because there is no use of letting prepare pass when
we can't do a rollback or commit prepared. What do you think?

My first impression was it sounds like a good idea to catch the
missing callbacks early as you said.

But if you decide to check for missing commit/rollback callbacks early
in prepare_cb_wrapper(), then won't you also want to have equivalent
checking done earlier for stream_prepare_cb_wrapper()?

Yeah, probably or we can leave the lazy checking as it is. In the
ideal case, we could check for the presence of all the callbacks in
StartupDecodingContext() but we delay it to find the missing methods
later. One possibility is that we check for any missing method in
StartupDecodingContext() if any one of prepare/streaming calls are
present but not sure if that is any better than the current
arrangement.

And then it quickly becomes a slippery slope to question many other things:
- Why allow startup_cb if shutdown_cb is missing?

I am not sure if there is a hard dependency between these two but
their callers do check for Null before invoking those.

- Why allow change_cb if commit_cb or rollback_cb is missing?

We have a check for change_cb and commit_cb in LoadOutputPlugin. Do we
have rollback_cb() defined at all?

- Why allow filter_prepare_cb if prepare_cb is missing?

I am not so sure about this but If prepare gets filtered, we don't
need to invoke prepare_cb.

- etc.

~

So I am wondering if the HEAD code lazy-check of the callback only at
the point where it is needed was actually a deliberate design choice
just to be simpler - e.g. we don't need to be so concerned about any
other callback dependencies.

Yeah, changing that probably needs some more thought. I have mentioned
one of the possibilities above.

I think this approach looks fine to me. So, I wrote a separate patch (0006) for
discussing and reviewing this approach.

Regards,
Wang wei

#34Takamichi Osumi (Fujitsu)
Takamichi Osumi (Fujitsu)
osumi.takamichi@fujitsu.com
In reply to: wangw.fnst@fujitsu.com (#29)
RE: Rework LogicalOutputPluginWriterUpdateProgress

Hi,

On Friday, March 10, 2023 6:32 PM Wang, Wei/王 威 <wangw.fnst@fujitsu.com> wrote:

Attach the new patch set.

Thanks for updating the patch ! One review comment on v7-0005.

Unlike other write wrapper functions, stream_start_cb_wrapper and stream_stop_cb_wrapper don't call the pair of threshold check and UpdateProgressAndKeepalive, as shown below. But both of them write some data to the output plugin and set the did_write flag, which updates the subscriber's last_recv_timestamp used for the timeout check in LogicalRepApplyLoop. So, wouldn't adding the pair to both functions be more accurate, in order to reset the changes_count counter on the publisher?

@@ -1280,6 +1282,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,

        /* Pop the error context stack */
        error_context_stack = errcallback.previous;
+
+       /* No progress has been made, so don't call UpdateProgressAndKeepalive */
 }

Best Regards,
Takamichi Osumi

#35wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Takamichi Osumi (Fujitsu) (#34)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Fri, Mar 10, 2023 20:17 PM Osumi, Takamichi/大墨 昂道 <osumi.takamichi@fujitsu.com> wrote:

Hi,

On Friday, March 10, 2023 6:32 PM Wang, Wei/王 威 <wangw.fnst@fujitsu.com>
wrote:

Attach the new patch set.

Thanks for updating the patch ! One review comment on v7-0005.

Thanks for your comment.

stream_start_cb_wrapper and stream_stop_cb_wrapper don't call the pair of
threshold check and UpdateProgressAndKeepalive unlike other write wrapper
functions like below. But, both of them write some data to the output plugin, set
the flag of did_write and thus it updates the subscriber's last_recv_timestamp
used for timeout check in LogicalRepApplyLoop. So, it looks adding the pair to
both functions can be more accurate, in order to reset the counter in
changes_count on the publisher ?

@@ -1280,6 +1282,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache,
ReorderBufferTXN *txn,

/* Pop the error context stack */
error_context_stack = errcallback.previous;
+
+       /* No progress has been made, so don't call UpdateProgressAndKeepalive */
}

I think stream_start/stop_cb are different from change_cb: they don't
represent records in WAL, so the LSNs corresponding to these two messages are
the LSNs of other records. So, we don't call the function
UpdateProgressAndKeepalive here. Also, for the reasons described in [1].#05, I
didn't reset the counter here.

[1]: /messages/by-id/OS3PR01MB6275374EBE7C8CABBE6730099EAF9@OS3PR01MB6275.jpnprd01.prod.outlook.com

Regards,
Wang wei

#36vignesh C
vignesh C
vignesh21@gmail.com
In reply to: wangw.fnst@fujitsu.com (#35)

On Mon, 13 Mar 2023 at 08:17, wangw.fnst@fujitsu.com
<wangw.fnst@fujitsu.com> wrote:

On Fri, Mar 10, 2023 20:17 PM Osumi, Takamichi/大墨 昂道 <osumi.takamichi@fujitsu.com> wrote:

Hi,

On Friday, March 10, 2023 6:32 PM Wang, Wei/王 威 <wangw.fnst@fujitsu.com>
wrote:

Attach the new patch set.

Thanks for updating the patch ! One review comment on v7-0005.

Thanks for your comment.

stream_start_cb_wrapper and stream_stop_cb_wrapper don't call the pair of
threshold check and UpdateProgressAndKeepalive unlike other write wrapper
functions like below. But, both of them write some data to the output plugin, set
the flag of did_write and thus it updates the subscriber's last_recv_timestamp
used for timeout check in LogicalRepApplyLoop. So, it looks adding the pair to
both functions can be more accurate, in order to reset the counter in
changes_count on the publisher ?

@@ -1280,6 +1282,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache,
ReorderBufferTXN *txn,

/* Pop the error context stack */
error_context_stack = errcallback.previous;
+
+       /* No progress has been made, so don't call UpdateProgressAndKeepalive */
}

I think stream_start/stop_cb are different from change_cb: they don't
represent records in WAL, so the LSNs corresponding to these two messages are
the LSNs of other records. So, we don't call the function
UpdateProgressAndKeepalive here. Also, for the reasons described in [1].#05, I
didn't reset the counter here.

As there has been no activity in this thread, and there seems to have been
little interest in this for the last 9 months, I have changed the
status of the patch to "Returned with Feedback".

Regards,
Vignesh