Re: Rework LogicalOutputPluginWriterUpdateProgress

Started by Andres Freundabout 3 years ago36 messageshackers
Jump to latest
#1Andres Freund
andres@anarazel.de

Hi,

This is a reply to:
/messages/by-id/CAA4eK1+DB66cYRRVyGcaMm7+tQ_u=q=+HWGjpu9X0pqMFWbsZQ@mail.gmail.com
split off, so patches to address some of my concerns don't confuse cfbot.

On 2023-02-09 11:21:41 +0530, Amit Kapila wrote:

On Thu, Feb 9, 2023 at 1:33 AM Andres Freund <andres@anarazel.de> wrote:

Attached is a current, quite rough, prototype. It addresses some of the points
raised, but far from all. There's also several XXXs/FIXMEs in it. I changed
the file-ending to .txt to avoid hijacking the CF entry.

I have started a separate thread to avoid such confusion. I hope that
is fine with you.

In abstract, yes - unfortunately just changing the subject isn't going to
suffice, I'm afraid. The In-Reply-To header was still referencing the old
thread. The mail archive did see the threads as one, and I think that's what
cfbot uses as the source.

On 2023-02-09 11:21:41 +0530, Amit Kapila wrote:

On Thu, Feb 9, 2023 at 1:33 AM Andres Freund <andres@anarazel.de> wrote:

Hacking on a rough prototype how I think this should rather look, I had a few
questions / remarks:

- We probably need to call UpdateProgress from a bunch of places in decode.c
as well? Indicating that we're lagging by a lot, just because all
transactions were in another database seems decidedly suboptimal.

We can do that but I think in all those cases we will reach quickly
enough back to walsender logic (WalSndLoop - that will send keepalive
if required) that we don't need to worry. After processing each
record, the logic will return back to the main loop that will send
keepalive if required.

For keepalive processing yes, for syncrep and accurate lag tracking, I don't
think that suffices? We could do that in WalSndLoop() instead I guess, but
we'd have more information about when that's useful in decode.c.

Also, while reading WAL if we need to block, it will call WalSndWaitForWal()
which will send keepalive if required.

The fast-path prevents WalSndWaitForWal() from doing that in a lot of cases.

/*
* Fast path to avoid acquiring the spinlock in case we already know we
* have enough WAL available. This is particularly interesting if we're
* far behind.
*/
if (RecentFlushPtr != InvalidXLogRecPtr &&
loc <= RecentFlushPtr)
return RecentFlushPtr;

The patch calls update_progress in change_cb_wrapper and other
wrappers which will miss the case of DDLs that generates a lot of data
that is not processed by the plugin. I think for that we either need
to call update_progress from reorderbuffer.c similar to what the patch
has removed or we need some other way to address it. Do you have any
better idea?

I don't mind calling something like update_progress() in the specific cases
that's needed, but I think those are just the
if (!RelationIsLogicallyLogged(relation))
if (relation->rd_rel->relrewrite && !rb->output_rewrites))

To me it makes a lot more sense to call update_progress() for those, rather
than generally.

I think, independent of the update_progress calls, it'd be worth investing a
bit of time into optimizing those cases, so that we don't put the changes into
the reorderbuffer in the first place. I think we find space for two flag bits
to identify the cases in the WAL, rather than needing to access the catalog to
figure it out. If we don't find space, we could add an annotation the WAL
record (making it bigger) for the two cases, because they're not the path most
important to optimize.

- Why should lag tracking only be updated at commit like points? That seems
like it adds odd discontinuinities?

We have previously experimented to call it from non-commit locations
but that turned out to give inaccurate information about Lag. See
email [1].

That seems like an issue with WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, not with
reporting something more frequently. ISTM that
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS just isn't a good proxy for when to
update lag reporting for records that don't strictly need it. I think that
decision should be made based on the LSN, and be deterministic.

- Aren't the wal_sender_timeout / 2 checks in WalSndUpdateProgress(),
WalSndWriteData() missing wal_sender_timeout <= 0 checks?

It seems we are checking that via
ProcessPendingWrites()->WalSndKeepaliveIfNecessary(). Do you think we
need to check it before as well?

Either we don't need the precheck at all, or we should do it reliably. Right
now we'll have a higher overhead / some behavioural changes, if
wal_sender_timeout is disabled. That doesn't make sense.

- I don't really understand why f95d53edged55 added !end_xact to the if
condition for ProcessPendingWrites(). Is the theory that we'll end up in an
outer loop soon?

Yes. For non-empty xacts, we will anyway send a commit message. For
empty (skipped) xacts, we will send for synchronous replication case
to avoid any delay.

That seems way too dependent on the behaviour of a specific output plugin,
there's plenty use cases where you'd not need a separate message emitted at
commit time. With what I proposed we would know whether we just wrote
something, or not.

I don't think the syncrep logic in WalSndUpdateProgress really works as-is -
consider what happens if e.g. the origin filter filters out entire
transactions. We'll afaics never get to WalSndUpdateProgress(). In some cases
we'll be lucky because we'll return quickly to XLogSendLogical(), but not
reliably.

Which case are you worried about? As mentioned in one of the previous
points I thought the timeout/keepalive handling in the callers should
be enough.

Well, you added syncrep specific logic to WalSndUpdateProgress(). The same
logic isn't present in the higher level loops. If we do need that logic, we
also need to trigger it if the origin filter filters out the entire
transaction. If we don't need it, then we shouldn't have it in
WalSndUpdateProgress() either.

How about renaming ProcessPendingWrites to WaitToSendPendingWrites or
WalSndWaitToSendPendingWrites?

I don't like those much:

We're not really waiting for the data to be sent or such, we just want to give
it to the kernel to be sent out. Contrast that to WalSndWaitForWal, where we
actually are waiting for something to complete.

I don't think 'write' is a great description either, although our existing
terminology is somewhat muddled. We're waiting calling pq_flush() until
!pq_is_send_pending().

WalSndSendPending() or WalSndFlushPending()?

Greetings,

Andres Freund

#2Andres Freund
andres@anarazel.de
In reply to: Andres Freund (#1)

Hi,

Replying on the new thread. Original message at
/messages/by-id/CAA4eK1+H2m95HhzfpRkwv2-GtFwtbcVp7837X49+vs0RXX3dBA@mail.gmail.com

On 2023-02-09 15:54:19 +0530, Amit Kapila wrote:

One thing to note about the changes we are discussing here is that
some of the plugins like wal2json already call
OutputPluginUpdateProgress in their commit callback. They may need to
update it accordingly.

It was a fundamental mistake to add OutputPluginUpdateProgress(). I don't like
causing unnecessary breakage, but this seems necessary.

One difference I see with the patch is that I think we will end up
sending keepalive for empty prepared transactions even though we don't
skip sending begin/prepare messages for those.

With the proposed approach we reliably know whether a callback wrote
something, so we can tune the behaviour here fairly easily.

Likely WalSndUpdateProgress() should not do anything if
did_write && !finished_xact.

The reason why we don't skip sending prepare for empty 2PC xacts is that if
the WALSender restarts after the PREPARE of a transaction and before the
COMMIT PREPARED of the same transaction then we won't be able to figure out
if we have skipped sending BEGIN/PREPARE of a transaction.

It's probably not a good idea to skip sending 2PC state changes anyway, at
least when used for replication, rather than CDC type use cases.

But I again think that that's not something the core system can assume.

I'm sad that we went so far down a pretty obviously bad rabbit hole. Adding
incrementally more of the progress calls to pgoutput, and knowing that
wal2json also added some, should have run some pretty large alarm bells.

To skip sending prepare for empty xacts, we previously thought of some ideas
like (a) At commit-prepare time have a check on the subscriber-side to know
whether there is a corresponding prepare for it before actually doing
commit-prepare but that sounded costly. (b) somehow persist the information
whether the PREPARE for a xact is already sent and then use that information
for commit prepared but again that also didn't sound like a good idea.

I don't think it's worth optimizing this. However, the explanation for why
we're not skipping empty prepared xacts needs to be added to
pgoutput_prepare_txn() etc.

Greetings,

Andres Freund

#3Amit Kapila
amit.kapila16@gmail.com
In reply to: Andres Freund (#2)

On Sat, Feb 11, 2023 at 3:04 AM Andres Freund <andres@anarazel.de> wrote:

One difference I see with the patch is that I think we will end up
sending keepalive for empty prepared transactions even though we don't
skip sending begin/prepare messages for those.

With the proposed approach we reliably know whether a callback wrote
something, so we can tune the behaviour here fairly easily.

I would like to clarify a few things about the proposed approach. In
commit_cb_wrapper()/prepare_cb_wrapper(), the patch first did
ctx->did_write = false;, then call the commit/prepare callback (which
will call pgoutput_commit_txn()/pgoutput_prepare_txn()) and then call
update_progress() which will make decisions based on ctx->did_write
flag. Now, for this to work pgoutput_commit_txn/pgoutput_prepare_txn
should know that the transaction has performed some writes before that
call which is currently working because pgoutput is tracking the same
via sent_begin_txn. Is the intention here that we still track whether
BEGIN () has been sent via pgoutput?

--
With Regards,
Amit Kapila.

#4Amit Kapila
amit.kapila16@gmail.com
In reply to: Andres Freund (#1)

On Sat, Feb 11, 2023 at 2:34 AM Andres Freund <andres@anarazel.de> wrote:

On 2023-02-09 11:21:41 +0530, Amit Kapila wrote:

On Thu, Feb 9, 2023 at 1:33 AM Andres Freund <andres@anarazel.de> wrote:

Hacking on a rough prototype how I think this should rather look, I had a few
questions / remarks:

- We probably need to call UpdateProgress from a bunch of places in decode.c
as well? Indicating that we're lagging by a lot, just because all
transactions were in another database seems decidedly suboptimal.

We can do that but I think in all those cases we will reach quickly
enough back to walsender logic (WalSndLoop - that will send keepalive
if required) that we don't need to worry. After processing each
record, the logic will return back to the main loop that will send
keepalive if required.

For keepalive processing yes, for syncrep and accurate lag tracking, I don't
think that suffices? We could do that in WalSndLoop() instead I guess, but
we'd have more information about when that's useful in decode.c.

Yeah, I think one possibility to address that is to call
update_progress() in DecodeCommit() and friends when we need to skip
the xact. We decide that in DecodeTXNNeedSkip. In the checks in that
function, I am not sure whether we need to call it for the case where
we skip the xact because we decide that it was previously decoded.

The patch calls update_progress in change_cb_wrapper and other
wrappers which will miss the case of DDLs that generates a lot of data
that is not processed by the plugin. I think for that we either need
to call update_progress from reorderbuffer.c similar to what the patch
has removed or we need some other way to address it. Do you have any
better idea?

I don't mind calling something like update_progress() in the specific cases
that's needed, but I think those are just the
if (!RelationIsLogicallyLogged(relation))
if (relation->rd_rel->relrewrite && !rb->output_rewrites))

To me it makes a lot more sense to call update_progress() for those, rather
than generally.

Won't it be better to call it wherever we don't invoke any wrapper
function like for cases REORDER_BUFFER_CHANGE_INVALIDATION, sequence
changes, etc.? I was thinking that wherever we don't call the wrapper
function which means we don't have a chance to invoke
update_progress(), the timeout can happen if there are a lot of such
messages.

I think, independent of the update_progress calls, it'd be worth investing a
bit of time into optimizing those cases, so that we don't put the changes into
the reorderbuffer in the first place. I think we find space for two flag bits
to identify the cases in the WAL, rather than needing to access the catalog to
figure it out. If we don't find space, we could add an annotation the WAL
record (making it bigger) for the two cases, because they're not the path most
important to optimize.

- Why should lag tracking only be updated at commit like points? That seems
like it adds odd discontinuinities?

We have previously experimented to call it from non-commit locations
but that turned out to give inaccurate information about Lag. See
email [1].

That seems like an issue with WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, not with
reporting something more frequently. ISTM that
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS just isn't a good proxy for when to
update lag reporting for records that don't strictly need it. I think that
decision should be made based on the LSN, and be deterministic.

- Aren't the wal_sender_timeout / 2 checks in WalSndUpdateProgress(),
WalSndWriteData() missing wal_sender_timeout <= 0 checks?

It seems we are checking that via
ProcessPendingWrites()->WalSndKeepaliveIfNecessary(). Do you think we
need to check it before as well?

Either we don't need the precheck at all, or we should do it reliably. Right
now we'll have a higher overhead / some behavioural changes, if
wal_sender_timeout is disabled. That doesn't make sense.

Fair enough, we can probably do it earlier.

How about renaming ProcessPendingWrites to WaitToSendPendingWrites or
WalSndWaitToSendPendingWrites?

I don't like those much:

We're not really waiting for the data to be sent or such, we just want to give
it to the kernel to be sent out. Contrast that to WalSndWaitForWal, where we
actually are waiting for something to complete.

I don't think 'write' is a great description either, although our existing
terminology is somewhat muddled. We're waiting calling pq_flush() until
!pq_is_send_pending().

WalSndSendPending() or WalSndFlushPending()?

Either of those sounds fine.

--
With Regards,
Amit Kapila.

#5Andres Freund
andres@anarazel.de
In reply to: Amit Kapila (#3)

Hi,

On 2023-02-13 08:22:34 +0530, Amit Kapila wrote:

On Sat, Feb 11, 2023 at 3:04 AM Andres Freund <andres@anarazel.de> wrote:

One difference I see with the patch is that I think we will end up
sending keepalive for empty prepared transactions even though we don't
skip sending begin/prepare messages for those.

With the proposed approach we reliably know whether a callback wrote
something, so we can tune the behaviour here fairly easily.

I would like to clarify a few things about the proposed approach. In
commit_cb_wrapper()/prepare_cb_wrapper(), the patch first did
ctx->did_write = false;, then call the commit/prepare callback (which
will call pgoutput_commit_txn()/pgoutput_prepare_txn()) and then call
update_progress() which will make decisions based on ctx->did_write
flag. Now, for this to work pgoutput_commit_txn/pgoutput_prepare_txn
should know that the transaction has performed some writes before that
call which is currently working because pgoutput is tracking the same
via sent_begin_txn.

I don't really see these as being related. What pgoutput does internally to
optimize for some usecases shouldn't matter to the larger infrastructure.

Is the intention here that we still track whether BEGIN () has been sent via
pgoutput?

Yes. If somebody later wants to propose tracking this alongside a txn and
passing that to the output plugin callbacks, we can do that. But that's
independent of fixing the broken architecture of the progress infrastructure.

Greetings,

Andres Freund

#6Andres Freund
andres@anarazel.de
In reply to: Amit Kapila (#4)

Hi,

On 2023-02-13 14:06:57 +0530, Amit Kapila wrote:

The patch calls update_progress in change_cb_wrapper and other
wrappers which will miss the case of DDLs that generates a lot of data
that is not processed by the plugin. I think for that we either need
to call update_progress from reorderbuffer.c similar to what the patch
has removed or we need some other way to address it. Do you have any
better idea?

I don't mind calling something like update_progress() in the specific cases
that's needed, but I think those are just the
if (!RelationIsLogicallyLogged(relation))
if (relation->rd_rel->relrewrite && !rb->output_rewrites))

To me it makes a lot more sense to call update_progress() for those, rather
than generally.

Won't it be better to call it wherever we don't invoke any wrapper
function like for cases REORDER_BUFFER_CHANGE_INVALIDATION, sequence
changes, etc.? I was thinking that wherever we don't call the wrapper
function which means we don't have a chance to invoke
update_progress(), the timeout can happen if there are a lot of such
messages.

ISTM that the likelihood of causing harm due to increased overhead is higher
than the gain.

Greetings,

Andres Freund

#7wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Andres Freund (#6)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Thur, Feb 14, 2023 at 2:03 AM Andres Freund <andres@anarazel.de> wrote:

On 2023-02-13 14:06:57 +0530, Amit Kapila wrote:

The patch calls update_progress in change_cb_wrapper and other
wrappers which will miss the case of DDLs that generates a lot of data
that is not processed by the plugin. I think for that we either need
to call update_progress from reorderbuffer.c similar to what the patch
has removed or we need some other way to address it. Do you have any
better idea?

I don't mind calling something like update_progress() in the specific cases
that's needed, but I think those are just the
if (!RelationIsLogicallyLogged(relation))
if (relation->rd_rel->relrewrite && !rb->output_rewrites))

To me it makes a lot more sense to call update_progress() for those, rather
than generally.

Won't it be better to call it wherever we don't invoke any wrapper
function like for cases REORDER_BUFFER_CHANGE_INVALIDATION, sequence
changes, etc.? I was thinking that wherever we don't call the wrapper
function which means we don't have a chance to invoke
update_progress(), the timeout can happen if there are a lot of such
messages.

ISTM that the likelihood of causing harm due to increased overhead is higher
than the gain.

I would like to do something for this thread. So, I am planning to update the
patch as per discussion in the email chain unless someone is already working on
it.

Regards,
Wang wei

#8wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: wangw.fnst@fujitsu.com (#7)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Sun, Feb 19, 2023 at 21:06 PM Wang, Wei/王 威 <wangw.fnst@fujitsu.com> wrote:

On Thur, Feb 14, 2023 at 2:03 AM Andres Freund <andres@anarazel.de> wrote:

On 2023-02-13 14:06:57 +0530, Amit Kapila wrote:

The patch calls update_progress in change_cb_wrapper and other
wrappers which will miss the case of DDLs that generates a lot of data
that is not processed by the plugin. I think for that we either need
to call update_progress from reorderbuffer.c similar to what the patch
has removed or we need some other way to address it. Do you have any
better idea?

I don't mind calling something like update_progress() in the specific cases
that's needed, but I think those are just the
if (!RelationIsLogicallyLogged(relation))
if (relation->rd_rel->relrewrite && !rb->output_rewrites))

To me it makes a lot more sense to call update_progress() for those, rather
than generally.

Won't it be better to call it wherever we don't invoke any wrapper
function like for cases REORDER_BUFFER_CHANGE_INVALIDATION, sequence
changes, etc.? I was thinking that wherever we don't call the wrapper
function which means we don't have a chance to invoke
update_progress(), the timeout can happen if there are a lot of such
messages.

ISTM that the likelihood of causing harm due to increased overhead is higher
than the gain.

I would like to do something for this thread. So, I am planning to update the
patch as per discussion in the email chain unless someone is already working on
it.

Thanks to Andres and Amit for the discussion.

Based on the discussion and Andres' WIP(in [1]/messages/by-id/20230208200235.esfoggsmuvf4pugt@awork3.anarazel.de), I made the following
modifications:
1. Some function renaming stuffs.
2. Added the threshold-related logic in the function
update_progress_and_keepalive.
3. Added the timeout-related processing of temporary data and
unlogged/foreign/system tables in the function ReorderBufferProcessTXN.
4. Improved error messages in the function OutputPluginPrepareWrite.
5. Invoked function update_progress_and_keepalive to fix sync-related problems
caused by filters such as origin in functions DecodeCommit(), DecodePrepare()
and ReorderBufferAbort();
6. Removed the invocation of function update_progress_and_keepalive in the
function begin_prepare_cb_wrapper().
7. Invoked the function update_progress_and_keepalive() in the function
stream_truncate_cb_wrapper(), just like we do in the function
truncate_cb_wrapper().
8. Removed the check of SyncRepRequested() in the syncrep logic in the function
WalSndUpdateProgressAndKeepAlive();
9. Added the check for wal_sender_timeout before using it in functions
WalSndUpdateProgressAndKeepAlive() and WalSndWriteData();

Attach the new patch.

[1]: /messages/by-id/20230208200235.esfoggsmuvf4pugt@awork3.anarazel.de

Regards,
Wang wei

Attachments:

v2-0001-Rework-LogicalOutputPluginWriterUpdateProgress.patchapplication/octet-stream; name=v2-0001-Rework-LogicalOutputPluginWriterUpdateProgress.patchDownload+195-165
#9Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: wangw.fnst@fujitsu.com (#8)
RE: Rework LogicalOutputPluginWriterUpdateProgress

Dear Wang,

Thank you for making the patch. IIUC your patch basically can achieve that output plugin
does not have to call UpdateProgress.

I think the basic approach is as follows, is it right?

1. In *_cb_wrapper, set ctx->did_write to false
2. In OutputPluginWrite() set ctx->did_write to true.
This means that changes are really written, not skipped.
3. At the end of the transaction, call update_progress_and_keepalive().
Even if we are not at the end, check skipped count and call the function if needed.
The counter will be reset if ctx->did_write is true or we exceed the threshold.

Followings are my comments. I apologize if I missed some previous discussions.

01. logical.c

```
+static void update_progress_and_keepalive(struct LogicalDecodingContext *ctx,
+                                                                                 bool finished_xact);
+
+static bool is_skip_threshold_change(struct LogicalDecodingContext *ctx);
```

"struct" may be not needed.

02. UpdateDecodingProgressAndKeepalive

I think the name should be UpdateDecodingProgressAndSendKeepalive(), keepalive is not verb.
(But it's ok to ignore if you prefer the shorter name)
Same thing can be said for the name of datatype and callback.

03. UpdateDecodingProgressAndKeepalive

```
+       /* set output state */
+       ctx->accept_writes = false;
+       ctx->write_xid = xid;
+       ctx->write_location = lsn;
+       ctx->did_write = false;
```

Do we have to modify accept_writes, write_xid, and write_location here?
These value is not used in WalSndUpdateProgressAndKeepalive().

04. stream_abort_cb_wrapper

```
+ update_progress_and_keepalive(ctx, true)
```

I'm not sure, but is it correct that call update_progress_and_keepalive() with
finished_xact = true? Isn't there a possibility that streamed sub-transaciton is aborted?

05. is_skip_threshold_change

At the end of the transaction, update_progress_and_keepalive() is called directly.
Don't we have to reset change_count here?

06. ReorderBufferAbort

Assuming that the top transaction is aborted. At that time update_progress_and_keepalive()
is called in stream_abort_cb_wrapper(), an then WalSndUpdateProgressAndKeepalive()
is called at the end of ReorderBufferAbort(). Do we have to do in both?
I think stream_abort_cb_wrapper() may be not needed.

07. WalSndUpdateProgress

You renamed ProcessPendingWrites() to WalSndSendPending(), but it may be still strange
because it will be called even if there are no pending writes.

Isn't it sufficient to call ProcessRepliesIfAny(), WalSndCheckTimeOut() and
(at least) WalSndKeepaliveIfNecessary()in the case? Or better name may be needed.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

#10wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Hayato Kuroda (Fujitsu) (#9)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Thur, Feb 23, 2023 at 18:41 PM Kuroda, Hayato/黒田 隼人 <kuroda.hayato@fujitsu.com> wrote:

Dear Wang,

Thank you for making the patch. IIUC your patch basically can achieve that
output plugin
does not have to call UpdateProgress.

Thanks for your review and comments.

I think the basic approach is as follows, is it right?

1. In *_cb_wrapper, set ctx->did_write to false
2. In OutputPluginWrite() set ctx->did_write to true.
This means that changes are really written, not skipped.
3. At the end of the transaction, call update_progress_and_keepalive().
Even if we are not at the end, check skipped count and call the function if
needed.
The counter will be reset if ctx->did_write is true or we exceed the threshold.

Yes, you are right.
For the reset of the counter, please also refer to the reply to #05.

Followings are my comments. I apologize if I missed some previous discussions.

01. logical.c

```
+static void update_progress_and_keepalive(struct LogicalDecodingContext *ctx,
+                                                                                 bool finished_xact);
+
+static bool is_skip_threshold_change(struct LogicalDecodingContext *ctx);
```

"struct" may be not needed.

Removed.

02. UpdateDecodingProgressAndKeepalive

I think the name should be UpdateDecodingProgressAndSendKeepalive(),
keepalive is not verb.
(But it's ok to ignore if you prefer the shorter name)
Same thing can be said for the name of datatype and callback.

Yes, I prefer the shorter one. Otherwise, I think some names would be longer.

03. UpdateDecodingProgressAndKeepalive

```
+       /* set output state */
+       ctx->accept_writes = false;
+       ctx->write_xid = xid;
+       ctx->write_location = lsn;
+       ctx->did_write = false;
```

Do we have to modify accept_writes, write_xid, and write_location here?
These value is not used in WalSndUpdateProgressAndKeepalive().

I think it might be better to set these three flags.
Since LogicalOutputPluginWriterUpdateProgressAndKeepalive is an open callback, I
think setting write_xid and write_location is not just for the function
WalSndUpdateProgressAndKeepalive. And I think setting accept_writes could
prevent some wrong usage.

04. stream_abort_cb_wrapper

```
+ update_progress_and_keepalive(ctx, true)
```

I'm not sure, but is it correct that call update_progress_and_keepalive() with
finished_xact = true? Isn't there a possibility that streamed sub-transaciton is
aborted?

Fixed.

05. is_skip_threshold_change

At the end of the transaction, update_progress_and_keepalive() is called directly.
Don't we have to reset change_count here?

I think this might complicate the function is_skip_threshold_change, so I didn't
reset the counter in this case.
I think the worst case of not resetting the counter is to delay sending the
keepalive message for the next transaction. But since the threshold we're using
is safe enough, it seems fine to me not to reset the counter in this case.
Added these related comments in the function is_skip_threshold_change.

06. ReorderBufferAbort

Assuming that the top transaction is aborted. At that time
update_progress_and_keepalive()
is called in stream_abort_cb_wrapper(), an then
WalSndUpdateProgressAndKeepalive()
is called at the end of ReorderBufferAbort(). Do we have to do in both?
I think stream_abort_cb_wrapper() may be not needed.

Yes, I think we only need one call for this case.
To make the behavior in *_cb_wrapper look consistent, I avoided the second call
for this case in the function ReorderBufferAbort.

07. WalSndUpdateProgress

You renamed ProcessPendingWrites() to WalSndSendPending(), but it may be
still strange
because it will be called even if there are no pending writes.

Isn't it sufficient to call ProcessRepliesIfAny(), WalSndCheckTimeOut() and
(at least) WalSndKeepaliveIfNecessary()in the case? Or better name may be
needed.

I think after sending the keepalive message (in WalSndKeepaliveIfNecessary), we
need to make sure the pending data is flushed through the loop.

Attach the new patch.

Regards,
Wang wei

Attachments:

v3-0001-Rework-LogicalOutputPluginWriterUpdateProgress.patchapplication/octet-stream; name=v3-0001-Rework-LogicalOutputPluginWriterUpdateProgress.patchDownload+203-165
#11Peter Smith
smithpb2250@gmail.com
In reply to: wangw.fnst@fujitsu.com (#8)

Here are some comments for the v2-0001 patch.

(I haven't looked at the v3 that was posted overnight; maybe some of
my comments have already been addressed.)

======
General

1. (Info from the commit message)
Since we can know whether the change is an end of transaction change in the
common code, we removed the LogicalDecodingContext->end_xact introduced in
commit f95d53e.

~

TBH, it was not clear to me that this change was an improvement. IIUC,
it removes the "unnecessary" member, but only does that by replacing
it everywhere with a boolean parameter passed to
update_progress_and_keepalive(). So the end result seems no less code,
but it is less readable code now because you need to know what the
true/false parameter means. I wonder if it would have been better just
to leave this how it was.

======
src/backend/replication/logical/logical.c

2. General - blank lines

There are multiple places in this file where the patch removed some
statements but left blank lines. The result is 2 blank lines remaining
instead of one.

see change_cb_wrapper.
see truncate_cb_wrapper.
see stream_start_cb_wrapper.
see stream_stop_cb_wrapper.
see stream_change_cb_wrapper.

e.g.

BEFORE
ctx->write_location = last_lsn;

ctx->end_xact = false;

/* in streaming mode, stream_stop_cb is required */

AFTER (now there are 2 blank lines)
ctx->write_location = last_lsn;

/* in streaming mode, stream_stop_cb is required */

~~~

3. General - calls to is_skip_threshold_change()

+ if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);

There are multiple calls like this, which are guarding the
update_progress_and_keepalive() with the is_skip_threshold_change()
- See truncate_cb_wrapper
- See message_cb_wrapper
- See stream_change_cb_wrapper
- See stream_message_cb_wrapper
- See stream_truncate_cb_wrapper
- See UpdateDecodingProgressAndKeepalive

IIUC, then I was thinking all those conditions maybe can be pushed
down *into* the wrapper, thereby making every calling code simpler.

e.g. make the wrapper function code look similar to the current
UpdateDecodingProgressAndKeepalive:

BEFORE (update_progress_and_keepalive)
{
if (!ctx->update_progress_and_keepalive)
return;

ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}
AFTER
{
if (!ctx->update_progress_and_keepalive)
return;

if (finished_xact || is_skip_threshold_change(ctx))
{
ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}
}

~~~

4. StartupDecodingContext

@@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
    XLogReaderRoutine *xl_routine,
    LogicalOutputPluginWriterPrepareWrite prepare_write,
    LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.

~~~

5. @@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
    XLogReaderRoutine *xl_routine,
    LogicalOutputPluginWriterPrepareWrite prepare_write,
    LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

(Ditto previous comment #4)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.

~~~

6. CreateDecodingContext

@@ -493,7 +488,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
    XLogReaderRoutine *xl_routine,
    LogicalOutputPluginWriterPrepareWrite prepare_write,
    LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

(Ditto previous comment #4)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.

~~~

7. OutputPluginPrepareWrite

@@ -662,7 +657,7 @@ void
 OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
 {
  if (!ctx->accept_writes)
- elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
+ elog(ERROR, "writes are only accepted in callbacks in the
OutputPluginCallbacks structure (except startup, shutdown,
filter_by_origin and filter_prepare callbacks)");

It seems a confusing error message. Can it be worded better? Also, I
noticed this flag is never used except in this one place where it
throws an error, so would an "Assert" would be more appropriate here?

~~~

8. rollback_prepared_cb_wrapper

  /*
  * If the plugin support two-phase commits then rollback prepared callback
  * is mandatory
+ *
+ * FIXME: This should have been caught much earlier.
  */
  if (ctx->callbacks.rollback_prepared_cb == NULL)
~
Is this FIXME related to the current patch, or should this be an
entirely different topic?

~~~

9. is_skip_threshold_change

The current usage for this function is like:

if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);

~

IMO a better name for this function might be like
'is_change_threshold_exceeded()' (or
'is_keepalive_threshold_exceeded()' etc) because seems more readable
to say

if (is_change_threshold_exceeded())
do_something();

~~~

10. is_skip_threshold_change

static bool
is_skip_threshold_change(struct LogicalDecodingContext *ctx)
{
static int changes_count = 0; /* used to accumulate the number of
* changes */

/* If the change was published, reset the counter and return false */
if (ctx->did_write)
{
changes_count = 0;
return false;
}

/*
* It is possible that the data is not sent to downstream for a long time
* either because the output plugin filtered it or there is a DDL that
* generates a lot of data that is not processed by the plugin. So, in
* such cases, the downstream can timeout. To avoid that we try to send a
* keepalive message if required. Trying to send a keepalive message
* after every change has some overhead, but testing showed there is no
* noticeable overhead if we do it after every ~100 changes.
*/
#define CHANGES_THRESHOLD 100
if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)
{
changes_count = 0;
return true;
}

return false;
}

~

That 2nd condition checking if (!ctx->did_write && ++changes_count >=
CHANGES_THRESHOLD) does not seem right. There is no need to check the
ctx->did_write; it must be false because it was checked earlier in the
function:

BEFORE
if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)

SUGGESTION1
Assert(!ctx->did_write);
if (++changes_count >= CHANGES_THRESHOLD)

SUGGESTION2
if (++changes_count >= CHANGES_THRESHOLD)

~~~

11. update_progress_and_keepalive

/*
* Update progress tracking and send keep alive (if required).
*/
static void
update_progress_and_keepalive(struct LogicalDecodingContext *ctx,
bool finished_xact)
{
if (!ctx->update_progress_and_keepalive)
return;

ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}

~

Maybe it's simpler to code this without the return.

e.g.

if (ctx->update_progress_and_keepalive)
{
ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}

(it is just generic suggested code for example -- I made some other
review comments overlapping this)

======
.../replication/logical/reorderbuffer.c

12. ReorderBufferAbort

+ UpdateDecodingProgressAndKeepalive((LogicalDecodingContext *)rb->private_data,
+    xid, lsn, !TransactionIdIsValid(txn->toplevel_xid));
+

I didn't really recognise how the
"!TransactionIdIsValid(txn->toplevel_xid)" maps to the boolean
'finished_xact' param. Can this call have an explanatory comment about
how it works?

======
src/backend/replication/walsender.c

~~~

13. WalSndUpdateProgressAndKeepalive

- if (pending_writes || (!end_xact &&
+ if (pending_writes || (!finished_xact && wal_sender_timeout > 0 &&
     now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
    wal_sender_timeout / 2)))
- ProcessPendingWrites();
+ WalSndSendPending();

Is this new function name OK to be WalSndSendPending? From this code,
we can see it can also be called in other scenarios even when there is
nothing "pending" at all.

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#12osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
In reply to: wangw.fnst@fujitsu.com (#10)
RE: Rework LogicalOutputPluginWriterUpdateProgress

Hi,

On Monday, February 27, 2023 6:30 PM wangw.fnst@fujitsu.com <wangw.fnst@fujitsu.com> wrote:

Attach the new patch.

Thanks for sharing v3. Minor review comments and question.

(1) UpdateDecodingProgressAndKeepalive header comment

The comment should be updated to explain maybe why we reset some other flags as discussed in [1]/messages/by-id/OS3PR01MB6275374EBE7C8CABBE6730099EAF9@OS3PR01MB6275.jpnprd01.prod.outlook.com and the functionality to update and keepalive of the function simply.

(2) OutputPluginPrepareWrite

Probably the changed error string is too wide.

@@ -662,7 +657,7 @@ void
 OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
 {
        if (!ctx->accept_writes)
-               elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
+               elog(ERROR, "writes are only accepted in callbacks in the OutputPluginCallbacks structure (except startup, shutdown, filter_by_origin and filter_prepare callbacks)");

I thought you can break the error message into two string lines. Or, you can rephrase it to different expression.

(3) Minor question

The patch introduced the goto statements into the cb_wrapper functions. Is the purpose to call the update_progress_and_keepalive after pop the error stack, even if the corresponding callback is missing ? I thought we can just have "else" clause for the check of the existence of callback, but did you choose the current goto style for readability ?

(4) Name of is_skip_threshold_change

I also feel the name of is_skip_threshold_change can be changed to "exceeded_keepalive_threshold" or something. Other candidates are proposed by Peter-san in [2]/messages/by-id/CAHut+Pt3ZEMo-KTF=5KJSU+HdWJD19GPGGCKOmBeM47484Ychw@mail.gmail.com.

[1]: /messages/by-id/OS3PR01MB6275374EBE7C8CABBE6730099EAF9@OS3PR01MB6275.jpnprd01.prod.outlook.com
[2]: /messages/by-id/CAHut+Pt3ZEMo-KTF=5KJSU+HdWJD19GPGGCKOmBeM47484Ychw@mail.gmail.com

Best Regards,
Takamichi Osumi

#13wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Peter Smith (#11)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Tues, Feb 28, 2023 at 9:12 AM Peter Smith <smithpb2250@gmail.com> wrote:

Here are some comments for the v2-0001 patch.

(I haven't looked at the v3 that was posted overnight; maybe some of
my comments have already been addressed.)

Thanks for your comments.

======
General

1. (Info from the commit message)
Since we can know whether the change is an end of transaction change in the
common code, we removed the LogicalDecodingContext->end_xact introduced
in
commit f95d53e.

~

TBH, it was not clear to me that this change was an improvement. IIUC,
it removes the "unnecessary" member, but only does that by replacing
it everywhere with a boolean parameter passed to
update_progress_and_keepalive(). So the end result seems no less code,
but it is less readable code now because you need to know what the
true/false parameter means. I wonder if it would have been better just
to leave this how it was.

Since I think we can know the meaning of the input based on the parameter name
of the function, I think both approaches are fine. But the approach in the
current patch can reduce a member of the structure, so I think this modification
looks good to me.

======
src/backend/replication/logical/logical.c

2. General - blank lines

There are multiple places in this file where the patch removed some
statements but left blank lines. The result is 2 blank lines remaining
instead of one.

see change_cb_wrapper.
see truncate_cb_wrapper.
see stream_start_cb_wrapper.
see stream_stop_cb_wrapper.
see stream_change_cb_wrapper.

e.g.

BEFORE
ctx->write_location = last_lsn;

ctx->end_xact = false;

/* in streaming mode, stream_stop_cb is required */

AFTER (now there are 2 blank lines)
ctx->write_location = last_lsn;

/* in streaming mode, stream_stop_cb is required */

Removed.

~~~

3. General - calls to is_skip_threshold_change()

+ if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);

There are multiple calls like this, which are guarding the
update_progress_and_keepalive() with the is_skip_threshold_change()
- See truncate_cb_wrapper
- See message_cb_wrapper
- See stream_change_cb_wrapper
- See stream_message_cb_wrapper
- See stream_truncate_cb_wrapper
- See UpdateDecodingProgressAndKeepalive

IIUC, then I was thinking all those conditions maybe can be pushed
down *into* the wrapper, thereby making every calling code simpler.

e.g. make the wrapper function code look similar to the current
UpdateDecodingProgressAndKeepalive:

BEFORE (update_progress_and_keepalive)
{
if (!ctx->update_progress_and_keepalive)
return;

ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}
AFTER
{
if (!ctx->update_progress_and_keepalive)
return;

if (finished_xact || is_skip_threshold_change(ctx))
{
ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}
}

Since I want to keep the function update_progress_and_keepalive simple, I didn't
change it.

~~~

4. StartupDecodingContext

@@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.

~~~
5. @@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

(Ditto previous comment #4)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.
~~~

6. CreateDecodingContext

@@ -493,7 +488,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

(Ditto previous comment #4)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.

I'm not sure if 'do_update_keepalive' is accurate. So, to distinguish this
function from the parameter, I renamed the function to
'UpdateProgressAndKeepalive'.

~~~

7. OutputPluginPrepareWrite

@@ -662,7 +657,7 @@ void
OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
{
if (!ctx->accept_writes)
- elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
+ elog(ERROR, "writes are only accepted in callbacks in the
OutputPluginCallbacks structure (except startup, shutdown,
filter_by_origin and filter_prepare callbacks)");

It seems a confusing error message. Can it be worded better?

I tried to improve this message in the new patch. Do you have any suggestions to
improve it?

Also, I
noticed this flag is never used except in this one place where it
throws an error, so would an "Assert" would be more appropriate here?

I'm not sure if we should change errors to assertions here.

~~~

8. rollback_prepared_cb_wrapper

/*
* If the plugin support two-phase commits then rollback prepared callback
* is mandatory
+ *
+ * FIXME: This should have been caught much earlier.
*/
if (ctx->callbacks.rollback_prepared_cb == NULL)
~
Is this FIXME related to the current patch, or should this be an
entirely different topic?

I think this FIXME seems to be another topic and I will delete this FIXME later.

~~~

9. is_skip_threshold_change

The current usage for this function is like:

if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);

~

IMO a better name for this function might be like
'is_change_threshold_exceeded()' (or
'is_keepalive_threshold_exceeded()' etc) because seems more readable
to say

if (is_change_threshold_exceeded())
do_something();

Renamed this function to is_keepalive_threshold_exceeded.

~~~

10. is_skip_threshold_change

static bool
is_skip_threshold_change(struct LogicalDecodingContext *ctx)
{
static int changes_count = 0; /* used to accumulate the number of
* changes */

/* If the change was published, reset the counter and return false */
if (ctx->did_write)
{
changes_count = 0;
return false;
}

/*
* It is possible that the data is not sent to downstream for a long time
* either because the output plugin filtered it or there is a DDL that
* generates a lot of data that is not processed by the plugin. So, in
* such cases, the downstream can timeout. To avoid that we try to send a
* keepalive message if required. Trying to send a keepalive message
* after every change has some overhead, but testing showed there is no
* noticeable overhead if we do it after every ~100 changes.
*/
#define CHANGES_THRESHOLD 100
if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)
{
changes_count = 0;
return true;
}

return false;
}

~

That 2nd condition checking if (!ctx->did_write && ++changes_count >=
CHANGES_THRESHOLD) does not seem right. There is no need to check the
ctx->did_write; it must be false because it was checked earlier in the
function:

BEFORE
if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)

SUGGESTION1
Assert(!ctx->did_write);
if (++changes_count >= CHANGES_THRESHOLD)

SUGGESTION2
if (++changes_count >= CHANGES_THRESHOLD)

Fixed.
I think the second suggestion looks better to me.

~~~

11. update_progress_and_keepalive

/*
* Update progress tracking and send keep alive (if required).
*/
static void
update_progress_and_keepalive(struct LogicalDecodingContext *ctx,
bool finished_xact)
{
if (!ctx->update_progress_and_keepalive)
return;

ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}

~

Maybe it's simpler to code this without the return.

e.g.

if (ctx->update_progress_and_keepalive)
{
ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}

(it is just generic suggested code for example -- I made some other
review comments overlapping this)

I think these two approaches are fine. But because I think the approach in the
current patch is consistent with the style of other functions, I didn't change
it.

======
.../replication/logical/reorderbuffer.c

12. ReorderBufferAbort

+ UpdateDecodingProgressAndKeepalive((LogicalDecodingContext *)rb-

private_data,

+    xid, lsn, !TransactionIdIsValid(txn->toplevel_xid));
+

I didn't really recognise how the
"!TransactionIdIsValid(txn->toplevel_xid)" maps to the boolean
'finished_xact' param. Can this call have an explanatory comment about
how it works?

It seems confusing to use txn->toplevel_xid to check whether it is top
transaction. Because the comment of txn->toptxn shows the meaning of value, I
updated the patch to use txn->toptxn to check this.

======
src/backend/replication/walsender.c
~~~

13. WalSndUpdateProgressAndKeepalive

- if (pending_writes || (!end_xact &&
+ if (pending_writes || (!finished_xact && wal_sender_timeout > 0 &&
now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
wal_sender_timeout / 2)))
- ProcessPendingWrites();
+ WalSndSendPending();

Is this new function name OK to be WalSndSendPending? From this code,
we can see it can also be called in other scenarios even when there is
nothing "pending" at all.

I think this function is used to flush pending data or send keepalive message.
But I'm not sure if we should add keepalive related string to the function
name, which seems to make this function name too long.

Attach the new patch.

Regards,
Wang wei

Attachments:

v4-0001-Rework-LogicalOutputPluginWriterUpdateProgress.patchapplication/octet-stream; name=v4-0001-Rework-LogicalOutputPluginWriterUpdateProgress.patchDownload+207-170
#14wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: osumi.takamichi@fujitsu.com (#12)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Tues, Feb 28, 2023 at 11:31 AM Osumi, Takamichi/大墨 昂道 <osumi.takamichi@fujitsu.com> wrote:

Hi,

On Monday, February 27, 2023 6:30 PM wangw.fnst@fujitsu.com
<wangw.fnst@fujitsu.com> wrote:

Attach the new patch.

Thanks for sharing v3. Minor review comments and question.

Thanks for your comments.

(1) UpdateDecodingProgressAndKeepalive header comment

The comment should be updated to explain maybe why we reset some other
flags as discussed in [1] and the functionality to update and keepalive of the
function simply.

Added the comments atop the function UpdateDecodingProgressAndKeepalive about
when to call this function.

(2) OutputPluginPrepareWrite

Probably the changed error string is too wide.

@@ -662,7 +657,7 @@ void
OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
{
if (!ctx->accept_writes)
-               elog(ERROR, "writes are only accepted in commit, begin and change
callbacks");
+               elog(ERROR, "writes are only accepted in callbacks in the
OutputPluginCallbacks structure (except startup, shutdown, filter_by_origin and
filter_prepare callbacks)");

I thought you can break the error message into two string lines. Or, you can
rephrase it to different expression.

I tried to improve this message and broke it into two lines in the new patch.

(3) Minor question

The patch introduced the goto statements into the cb_wrapper functions. Is the
purpose to call the update_progress_and_keepalive after pop the error stack,
even if the corresponding callback is missing ? I thought we can just have "else"
clause for the check of the existence of callback, but did you choose the current
goto style for readability ?

I think both styles look fine to me.
I haven't modified this for this version. I'll reconsider if anyone else has
similar thoughts later.

(4) Name of is_skip_threshold_change

I also feel the name of is_skip_threshold_change can be changed to
"exceeded_keepalive_threshold" or something. Other candidates are proposed
by Peter-san in [2].

Renamed this function to is_keepalive_threshold_exceeded.

Please see the new patch in [1]/messages/by-id/OS3PR01MB6275C6CA72222C0C23730A319EAD9@OS3PR01MB6275.jpnprd01.prod.outlook.com.

[1]: /messages/by-id/OS3PR01MB6275C6CA72222C0C23730A319EAD9@OS3PR01MB6275.jpnprd01.prod.outlook.com

Regards,
Wang wei

#15Peter Smith
smithpb2250@gmail.com
In reply to: wangw.fnst@fujitsu.com (#13)

On Wed, Mar 1, 2023 at 9:16 PM wangw.fnst@fujitsu.com
<wangw.fnst@fujitsu.com> wrote:

On Tues, Feb 28, 2023 at 9:12 AM Peter Smith <smithpb2250@gmail.com> wrote:

Here are some comments for the v2-0001 patch.

(I haven't looked at the v3 that was posted overnight; maybe some of
my comments have already been addressed.)

Thanks for your comments.

======
General

1. (Info from the commit message)
Since we can know whether the change is an end of transaction change in the
common code, we removed the LogicalDecodingContext->end_xact introduced
in
commit f95d53e.

~

TBH, it was not clear to me that this change was an improvement. IIUC,
it removes the "unnecessary" member, but only does that by replacing
it everywhere with a boolean parameter passed to
update_progress_and_keepalive(). So the end result seems no less code,
but it is less readable code now because you need to know what the
true/false parameter means. I wonder if it would have been better just
to leave this how it was.

Since I think we can know the meaning of the input based on the parameter name
of the function, I think both approaches are fine. But the approach in the
current patch can reduce a member of the structure, so I think this modification
looks good to me.

Hmm, I am not so sure:

- Why is reducing members of LogicalDecodingContext even a goal? I
thought the LogicalDecodingContext is intended to be the one-stop
place to hold *all* things related to the "Context" (including that
member that was deleted).

- How is reducing one member better than introducing one new parameter
in multiple calls?

Anyway, I think this exposes another problem. If you still want the
patch to pass the 'finshed_xact' parameter separately then AFAICT the
first parameter (ctx) now becomes unused/redundant in the
WalSndUpdateProgressAndKeepalive function, so it ought to be removed.

======
src/backend/replication/logical/logical.c

3. General - calls to is_skip_threshold_change()

+ if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);

There are multiple calls like this, which are guarding the
update_progress_and_keepalive() with the is_skip_threshold_change()
- See truncate_cb_wrapper
- See message_cb_wrapper
- See stream_change_cb_wrapper
- See stream_message_cb_wrapper
- See stream_truncate_cb_wrapper
- See UpdateDecodingProgressAndKeepalive

IIUC, then I was thinking all those conditions maybe can be pushed
down *into* the wrapper, thereby making every calling code simpler.

e.g. make the wrapper function code look similar to the current
UpdateDecodingProgressAndKeepalive:

BEFORE (update_progress_and_keepalive)
{
if (!ctx->update_progress_and_keepalive)
return;

ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}
AFTER
{
if (!ctx->update_progress_and_keepalive)
return;

if (finished_xact || is_skip_threshold_change(ctx))
{
ctx->update_progress_and_keepalive(ctx, ctx->write_location,
ctx->write_xid, ctx->did_write,
finished_xact);
}
}

Since I want to keep the function update_progress_and_keepalive simple, I didn't
change it.

Hmm, the reason given seems like a false economy to me. You are able
to keep this 1 function simpler only by adding more complexity to the
calls in 6 other places. Let's see if other people have opinions about
this.

~~~

1.
+
+static void UpdateProgressAndKeepalive(LogicalDecodingContext *ctx,
+    bool finished_xact);
+
+static bool is_keepalive_threshold_exceeded(LogicalDecodingContext *ctx);

1a.
There is an unnecessary extra blank line above the UpdateProgressAndKeepalive.

~

1b.
I did not recognize a reason for the different naming conventions.
Here are two new functions but one is CamelCase and one is snake_case.
What are the rules to decide the naming?

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#16Andres Freund
andres@anarazel.de
In reply to: Peter Smith (#15)

Hi,

On 2023-03-03 11:18:04 +1100, Peter Smith wrote:

- Why is reducing members of LogicalDecodingContext even a goal? I
thought the LogicalDecodingContext is intended to be the one-stop
place to hold *all* things related to the "Context" (including that
member that was deleted).

There's not really a reason to keep it in LogicalDecodingContext after
this change. It was only needed there because of the broken
architectural model of calling UpdateProgress from within output
plugins. Why set a field in each wrapper that we don't need?

- How is reducing one member better than introducing one new parameter
in multiple calls?

Reducing the member isn't important, needing to set it before each
callback however makes sense.

Greetings,

Andres Freund

#17Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
In reply to: Peter Smith (#15)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Friday, March 3, 2023 8:18 AM Peter Smith <smithpb2250@gmail.com> wrote:

On Wed, Mar 1, 2023 at 9:16 PM wangw.fnst@fujitsu.com
<wangw.fnst@fujitsu.com> wrote:

On Tues, Feb 28, 2023 at 9:12 AM Peter Smith <smithpb2250@gmail.com>

wrote:

Here are some comments for the v2-0001 patch.

(I haven't looked at the v3 that was posted overnight; maybe some of
my comments have already been addressed.)

Thanks for your comments.

======
General

1. (Info from the commit message)
Since we can know whether the change is an end of transaction change
in the common code, we removed the

LogicalDecodingContext->end_xact

introduced in commit f95d53e.

~

TBH, it was not clear to me that this change was an improvement.
IIUC, it removes the "unnecessary" member, but only does that by
replacing it everywhere with a boolean parameter passed to
update_progress_and_keepalive(). So the end result seems no less
code, but it is less readable code now because you need to know what
the true/false parameter means. I wonder if it would have been
better just to leave this how it was.

Since I think we can know the meaning of the input based on the
parameter name of the function, I think both approaches are fine. But
the approach in the current patch can reduce a member of the
structure, so I think this modification looks good to me.

...

Anyway, I think this exposes another problem. If you still want the patch to pass
the 'finshed_xact' parameter separately then AFAICT the first parameter (ctx)
now becomes unused/redundant in the WalSndUpdateProgressAndKeepalive
function, so it ought to be removed.

I am not sure about this. The first parameter (ctx) has been introduced since
the Lag tracking feature. I think this is to make it consistent with other
LogicalOutputPluginWriter callbacks. In addition, this is a public callback
function and user can implement their own logic in this callbacks based on
interface, removing this existing parameter doesn't look great to me. Although
this patch also removes the existing skipped_xact, but it's because we decide
to use another parameter did_write which can play a similar role.

Best Regards,
Hou zj

#18Peter Smith
smithpb2250@gmail.com
In reply to: Zhijie Hou (Fujitsu) (#17)

On Fri, Mar 3, 2023 at 1:27 PM houzj.fnst@fujitsu.com
<houzj.fnst@fujitsu.com> wrote:

On Friday, March 3, 2023 8:18 AM Peter Smith <smithpb2250@gmail.com> wrote:

...

Anyway, I think this exposes another problem. If you still want the patch to pass
the 'finshed_xact' parameter separately then AFAICT the first parameter (ctx)
now becomes unused/redundant in the WalSndUpdateProgressAndKeepalive
function, so it ought to be removed.

I am not sure about this. The first parameter (ctx) has been introduced since
the Lag tracking feature. I think this is to make it consistent with other
LogicalOutputPluginWriter callbacks. In addition, this is a public callback
function and user can implement their own logic in this callbacks based on
interface, removing this existing parameter doesn't look great to me. Although
this patch also removes the existing skipped_xact, but it's because we decide
to use another parameter did_write which can play a similar role.

Oh right, that makes sense. Thanks.

Perhaps it just wants some comment to mention that although the
built-in implementation does not use the 'ctx' users might implement
their own logic which does use it.

------
Kind Regards,
Peter Smith.
Fujitsu Australia

#19wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Peter Smith (#18)
RE: Rework LogicalOutputPluginWriterUpdateProgress

On Fri, Mar 3, 2023 8:18 AM Peter Smith <smithpb2250@gmail.com> wrote:

Thanks for your comments.

1.
+
+static void UpdateProgressAndKeepalive(LogicalDecodingContext *ctx,
+    bool finished_xact);
+
+static bool is_keepalive_threshold_exceeded(LogicalDecodingContext *ctx);

1a.
There is an unnecessary extra blank line above the UpdateProgressAndKeepalive.

Removed.

~

1b.
I did not recognize a reason for the different naming conventions.
Here are two new functions but one is CamelCase and one is snake_case.
What are the rules to decide the naming?

I used the snake_case style for the function UpdateProgressAndKeepalive in the
previous version, but it was confusing because it shared the same parameter name
with the functions StartupDecodingContext, CreateInitDecodingContext and
CreateDecodingContext. To avoid this confusion, and since both naming styles
exist in this file, I changed it to CamelCase style.

Attach the new patch.

Regards,
Wang wei

Attachments:

v5-0001-Rework-LogicalOutputPluginWriterUpdateProgress.patchapplication/octet-stream; name=v5-0001-Rework-LogicalOutputPluginWriterUpdateProgress.patchDownload+206-170
#20Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: wangw.fnst@fujitsu.com (#19)
RE: Rework LogicalOutputPluginWriterUpdateProgress

Dear Wang,

Thank you for updating the patch! Followings are my comments.

---
01. missing comments
You might miss the comment from Peter[1]/messages/by-id/CAHut+PsksiQHuv4A54R4w79TAvCu__PcuffKYY0V96e2z_sEvA@mail.gmail.com. Or could you pin the related one?

---
02. LogicalDecodingProcessRecord()

Don't we have to call UpdateDecodingProgressAndKeepalive() when there is no
decoding function? Assuming that the timeout parameter does not have enough time
period and there are so many sequential operations in the transaction. At that time
there may be a possibility that timeout is occurred while calling ReorderBufferProcessXid()
several times. It may be a bad example, but I meant to say that we may have to
consider the case that decoding function has not implemented yet.

---
03. stream_*_cb_wrapper

Only stream_*_cb_wrapper have comments "don't call update progress, we didn't really make any", but
there are more functions that does not send updates. Do you have any reasons why only they have?

[1]: /messages/by-id/CAHut+PsksiQHuv4A54R4w79TAvCu__PcuffKYY0V96e2z_sEvA@mail.gmail.com

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

#21wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Hayato Kuroda (Fujitsu) (#20)
#22Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: wangw.fnst@fujitsu.com (#21)
#23osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
In reply to: wangw.fnst@fujitsu.com (#21)
#24Peter Smith
smithpb2250@gmail.com
In reply to: wangw.fnst@fujitsu.com (#21)
#25Amit Kapila
amit.kapila16@gmail.com
In reply to: wangw.fnst@fujitsu.com (#21)
#26Amit Kapila
amit.kapila16@gmail.com
In reply to: Peter Smith (#24)
#27Peter Smith
smithpb2250@gmail.com
In reply to: Amit Kapila (#26)
#28Amit Kapila
amit.kapila16@gmail.com
In reply to: Peter Smith (#27)
#29wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Amit Kapila (#25)
#30wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Hayato Kuroda (Fujitsu) (#22)
#31wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: osumi.takamichi@fujitsu.com (#23)
#32wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Peter Smith (#24)
#33wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: Amit Kapila (#28)
#34osumi.takamichi@fujitsu.com
osumi.takamichi@fujitsu.com
In reply to: wangw.fnst@fujitsu.com (#29)
#35wangw.fnst@fujitsu.com
wangw.fnst@fujitsu.com
In reply to: osumi.takamichi@fujitsu.com (#34)
#36vignesh C
vignesh21@gmail.com
In reply to: wangw.fnst@fujitsu.com (#35)