diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 8bef3fbdaf..0c961e0e8b 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2757,12 +2757,11 @@ XLogSendLogical(void) char *errm; /* - * Don't know whether we've caught up yet. We'll set it to true in + * Don't know whether we've caught up yet. We'll set WalSndCaughtUp true in * WalSndWaitForWal, if we're actually waiting. We also set to true if * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait - * i.e. when we're shutting down. */ - WalSndCaughtUp = false; record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm); logical_startptr = InvalidXLogRecPtr; @@ -2773,6 +2772,8 @@ XLogSendLogical(void) if (record != NULL) { + XLogRecPtr flushPtr = GetFlushRecPtr(); + /* * Note the lack of any call to LagTrackerWrite() which is handled by * WalSndUpdateProgress which is called by output plugin through @@ -2781,6 +2782,13 @@ XLogSendLogical(void) LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader); sentPtr = logical_decoding_ctx->reader->EndRecPtr; + + /* + * If we've sent a record that is at or beyond the flushed point, + * we've caught up. + */ + if (sentPtr >= flushPtr) + WalSndCaughtUp = true; } else {