Unnecessary confirm work on logical replication
I was reading the logical replication code and found some
unnecessary work we are doing.
The confirmed_flush_lsn cannot reasonably be ahead of the
current_lsn, so there is no point in calling
LogicalConfirmReceivedLocation() every time we update the candidate
xmin or restart_lsn.
Patch is attached.
Attachments:
v00-eliminate-unnecessary-confirm-work-on-logical-replication.patch (application/octet-stream)
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index c3ec97a0a6..0d2930c4b5 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1631,28 +1631,27 @@ update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
OutputPluginUpdateProgress(ctx, false);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
}
/*
* Set the required catalog xmin horizon for historic snapshots in the current
* replication slot.
*
- * Note that in the most cases, we won't be able to immediately use the xmin
+ * Note that we won't be able to immediately use the xmin
* to increase the xmin horizon: we need to wait till the client has confirmed
* receiving current_lsn with LogicalConfirmReceivedLocation().
*/
void
LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
{
- bool updated_xmin = false;
ReplicationSlot *slot;
bool got_new_xmin = false;
slot = MyReplicationSlot;
Assert(slot != NULL);
SpinLockAcquire(&slot->mutex);
/*
@@ -1665,23 +1664,20 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
/*
* If the client has already confirmed up to this lsn, we directly can
* mark this as accepted. This can happen if we restart decoding in a
* slot.
*/
else if (current_lsn <= slot->data.confirmed_flush)
{
slot->candidate_catalog_xmin = xmin;
slot->candidate_xmin_lsn = current_lsn;
-
- /* our candidate can directly be used */
- updated_xmin = true;
}
/*
* Only increase if the previous values have been applied, otherwise we
* might never end up updating if the receiver acks too slowly.
*/
else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
{
slot->candidate_catalog_xmin = xmin;
slot->candidate_xmin_lsn = current_lsn;
@@ -1690,37 +1686,32 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
* Log new xmin at an appropriate log level after releasing the
* spinlock.
*/
got_new_xmin = true;
}
SpinLockRelease(&slot->mutex);
if (got_new_xmin)
elog(DEBUG1, "got new catalog xmin %u at %X/%X", xmin,
LSN_FORMAT_ARGS(current_lsn));
-
- /* candidate already valid with the current flush position, apply */
- if (updated_xmin)
- LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
}
/*
* Mark the minimal LSN (restart_lsn) we need to read to replay all
* transactions that have not yet committed at current_lsn.
*
* Just like LogicalIncreaseXminForSlot this only takes effect when the
* client has confirmed to have received current_lsn.
*/
void
LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
{
- bool updated_lsn = false;
ReplicationSlot *slot;
slot = MyReplicationSlot;
Assert(slot != NULL);
Assert(restart_lsn != InvalidXLogRecPtr);
Assert(current_lsn != InvalidXLogRecPtr);
SpinLockAcquire(&slot->mutex);
@@ -1730,23 +1721,20 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
}
/*
* We might have already flushed far enough to directly accept this lsn,
* in this case there is no need to check for existing candidate LSNs
*/
else if (current_lsn <= slot->data.confirmed_flush)
{
slot->candidate_restart_valid = current_lsn;
slot->candidate_restart_lsn = restart_lsn;
-
- /* our candidate can directly be used */
- updated_lsn = true;
}
/*
* Only increase if the previous values have been applied, otherwise we
* might never end up updating if the receiver acks too slowly. A missed
* value here will just cause some extra effort after reconnecting.
*/
if (slot->candidate_restart_valid == InvalidXLogRecPtr)
{
slot->candidate_restart_valid = current_lsn;
@@ -1768,24 +1756,20 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
confirmed_flush = slot->data.confirmed_flush;
SpinLockRelease(&slot->mutex);
elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
LSN_FORMAT_ARGS(restart_lsn),
LSN_FORMAT_ARGS(current_lsn),
LSN_FORMAT_ARGS(candidate_restart_lsn),
LSN_FORMAT_ARGS(candidate_restart_valid),
LSN_FORMAT_ARGS(confirmed_flush));
}
-
- /* candidates are already valid with the current flush position, apply */
- if (updated_lsn)
- LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
}
/*
* Handle a consumer's confirmation having received all changes up to lsn.
*/
void
LogicalConfirmReceivedLocation(XLogRecPtr lsn)
{
Assert(lsn != InvalidXLogRecPtr);
On Fri, Apr 7, 2023 at 11:06 PM Emre Hasegeli <emre@hasegeli.com> wrote:
> I was reading the logical replication code and found some
> unnecessary work we are doing.
>
> The confirmed_flush_lsn cannot reasonably be ahead of the
> current_lsn, so there is no point in calling
> LogicalConfirmReceivedLocation() every time we update the candidate
> xmin or restart_lsn.
In fact, the WAL sender always starts reading WAL from restart_lsn,
which in turn is always <= confirmed_flush_lsn. While reading WAL, the
WAL sender may read an XLOG_RUNNING_XACTS record with lsn <=
confirmed_flush_lsn. While processing that record, it may update its
restart_lsn and catalog_xmin with current_lsn set to the LSN of the
XLOG_RUNNING_XACTS record. In this situation, current_lsn <=
confirmed_flush_lsn.
Does that make sense?
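The restart scenario described above can be sketched with a toy model. This is only an illustration under stated assumptions: ToySlot, Lsn, and count_already_confirmed() are hypothetical names, not PostgreSQL code, and the model keeps just the two slot fields the argument relies on. It walks a list of record LSNs the way a restarted sender would, starting at restart_lsn, and counts the records for which current_lsn <= confirmed_flush holds.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t Lsn;          /* hypothetical stand-in for XLogRecPtr */

/* Hypothetical, simplified slot: only the fields this scenario needs. */
typedef struct ToySlot
{
    Lsn restart_lsn;           /* where a restarted sender resumes reading */
    Lsn confirmed_flush;       /* last position acknowledged by the client */
} ToySlot;

/*
 * Count the records a restarted sender would re-read whose LSN is at or
 * below confirmed_flush. record_lsns is assumed to be in ascending order.
 */
static int
count_already_confirmed(const ToySlot *slot, const Lsn *record_lsns,
                        int nrecords)
{
    int hits = 0;

    /* The scenario assumes restart_lsn <= confirmed_flush. */
    assert(slot->restart_lsn <= slot->confirmed_flush);

    for (int i = 0; i < nrecords; i++)
    {
        Lsn current_lsn = record_lsns[i];

        if (current_lsn < slot->restart_lsn)
            continue;          /* before the restart point: not re-read */
        if (current_lsn <= slot->confirmed_flush)
            hits++;            /* the "already confirmed" branch */
    }
    return hits;
}
```

With restart_lsn = 100 and confirmed_flush = 300, records at 150 and 250 fall into the "already confirmed" window, while records at 50 and 350 do not. Whether the candidate values still need applying in that window is the open question in this thread; the sketch only shows that the window can be non-empty after a restart.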
--
Best Wishes,
Ashutosh Bapat
> In fact, the WAL sender always starts reading WAL from restart_lsn,
> which in turn is always <= confirmed_flush_lsn. While reading WAL, WAL
> sender may read XLOG_RUNNING_XACTS WAL record with lsn <=
> confirmed_flush_lsn. While processing XLOG_RUNNING_XACTS record it may
> update its restart_lsn and catalog_xmin with current_lsn = lsn for the
> XLOG_RUNNING_XACTS record. In this situation current_lsn <=
> confirmed_flush_lsn.
This can only happen when the WAL sender is restarted. However, in
this case, the restart_lsn and catalog_xmin should have already been
persisted by the previous run of the WAL sender.
I still doubt these calls are necessary. I think there is a
complicated chicken-and-egg problem here. Here is my logic:
1) LogicalConfirmReceivedLocation() is called explicitly when
confirmed_flush is sent by the replication client.
2) LogicalConfirmReceivedLocation() is the only place that updates
confirmed_flush.
3) The replication client can only send a confirmed_flush for a
current_lsn it has already received.
4) These two functions have already run for any current_lsn the
replication client has received.
5) These two functions call LogicalConfirmReceivedLocation() only if
current_lsn <= confirmed_flush.
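The five steps above can be restated as a small toy model. It is a sketch under assumptions, not PostgreSQL code: ToySession, toy_decode_and_send(), toy_client_confirm(), and toy_run_session() are hypothetical names, and the client is assumed to acknowledge every record right after receiving it. The model counts how often the current_lsn <= confirmed_flush branch would be reached.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;          /* hypothetical stand-in for XLogRecPtr */

/* Hypothetical model of one walsender session. */
typedef struct ToySession
{
    Lsn confirmed_flush;       /* advanced only by toy_client_confirm() */
    Lsn last_sent;             /* highest LSN decoded and sent so far */
    int direct_accepts;        /* times current_lsn <= confirmed_flush held */
} ToySession;

/* Step 4: the update functions run before the client sees current_lsn. */
static void
toy_decode_and_send(ToySession *s, Lsn current_lsn)
{
    if (current_lsn <= s->confirmed_flush)
        s->direct_accepts++;   /* step 5: the branch under discussion */
    s->last_sent = current_lsn;
}

/* Steps 1-3: the client may confirm only what it has already received. */
static bool
toy_client_confirm(ToySession *s, Lsn lsn)
{
    if (lsn > s->last_sent)
        return false;          /* never received, cannot be confirmed */
    if (lsn > s->confirmed_flush)
        s->confirmed_flush = lsn;
    return true;
}

/* Run one session in which the client acknowledges every record. */
static int
toy_run_session(Lsn start_confirmed_flush, const Lsn *lsns, int n)
{
    ToySession s = { start_confirmed_flush, 0, 0 };

    for (int i = 0; i < n; i++)
    {
        toy_decode_and_send(&s, lsns[i]);
        (void) toy_client_confirm(&s, lsns[i]);
    }
    return s.direct_accepts;
}
```

In this model, a session that starts with confirmed_flush below the first record never reaches the branch, which matches the reasoning in the list. A session that starts with an older, persisted confirmed_flush (the restart case) does reach it, so the model cannot say by itself whether the candidate values were already persisted by the previous run.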
Thank you for your patience.
On 4/11/23 14:58, Emre Hasegeli wrote:
> > In fact, the WAL sender always starts reading WAL from restart_lsn,
> > which in turn is always <= confirmed_flush_lsn. While reading WAL, WAL
> > sender may read XLOG_RUNNING_XACTS WAL record with lsn <=
> > confirmed_flush_lsn. While processing XLOG_RUNNING_XACTS record it may
> > update its restart_lsn and catalog_xmin with current_lsn = lsn for the
> > XLOG_RUNNING_XACTS record. In this situation current_lsn <=
> > confirmed_flush_lsn.
>
> This can only happen when the WAL sender is restarted. However in
> this case, the restart_lsn and catalog_xmin should have already been
> persisted by the previous run of the WAL sender.
>
> I still doubt these calls are necessary. I think there is a
> complicated chicken and egg problem here. Here is my logic:
>
> 1) LogicalConfirmReceivedLocation() is called explicitly when
> confirmed_flush is sent by the replication client.
>
> 2) LogicalConfirmReceivedLocation() is the only place that updates
> confirmed_flush.
>
> 3) The replication client can only send a confirmed_flush for a
> current_lsn it has already received.
>
> 4) These two functions have already run for any current_lsn the
> replication client has received.
>
> 5) These two functions call LogicalConfirmReceivedLocation() only if
> current_lsn <= confirmed_flush.
>
> Thank you for your patience.
Hi Emre,
I was going through my TODO list of messages, and I stumbled on this
thread from a couple of months back. Do you still think this is
something we should do?
I see there was some discussion about whether this update is needed, or
whether current_lsn can ever be <= confirmed_flush_lsn. I think you may
be right that this can't happen, but I wonder if we could verify that
with an assert in a convenient place (instead of just removing the
update).
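To make the assert idea concrete, here is a minimal sketch in a toy model. Everything in it is an assumption for illustration: ToyCandidateSlot, TOY_INVALID_LSN, and toy_increase_xmin() are hypothetical, the other branches of the real function are omitted, and the standard assert() stands in for the Assert() macro that logical.c uses. It is not a proposed patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;                  /* hypothetical stand-in for XLogRecPtr */
#define TOY_INVALID_LSN ((Lsn) 0)      /* hypothetical "no candidate" marker */

/* Hypothetical, simplified slot with only the fields used here. */
typedef struct ToyCandidateSlot
{
    Lsn      confirmed_flush;
    Lsn      candidate_xmin_lsn;
    uint32_t candidate_catalog_xmin;
} ToyCandidateSlot;

/*
 * Toy version of the xmin update with the "already confirmed" branch
 * replaced by an assertion. Returns true if a new candidate was stored.
 */
static bool
toy_increase_xmin(ToyCandidateSlot *slot, Lsn current_lsn, uint32_t xmin)
{
    /* The claim under discussion: this LSN is never already confirmed. */
    assert(current_lsn > slot->confirmed_flush);

    /* Only store a candidate if the previous one has been applied. */
    if (slot->candidate_xmin_lsn == TOY_INVALID_LSN)
    {
        slot->candidate_catalog_xmin = xmin;
        slot->candidate_xmin_lsn = current_lsn;
        return true;
    }
    return false;
}
```

If the restart scenario raised earlier in the thread does occur, an assertion like this would fire in assert-enabled builds, which is the kind of verification being asked about.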
Also, did you try to quantify how expensive this is? The update seems
very cheap, but I guess you just noticed it by eyeballing the code,
which is fine of course. Even if it's cheap or not noticeable, it may
still be worth removing so as not to confuse people improving the code
in the future.
regards
--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company