Unnecessary confirm work on logical replication

Started by Emre Hasegeli almost 3 years ago, 4 messages
#1 Emre Hasegeli
emre@hasegeli.com
1 attachment(s)

I was reading the logical replication code and found a bit of
unnecessary work we are doing.

The confirmed_flush_lsn cannot reasonably be ahead of the
current_lsn, so there is no point in calling
LogicalConfirmReceivedLocation() every time we update the candidate
xmin or restart_lsn.

Patch is attached.

Attachments:

v00-eliminate-unnecessary-confirm-work-on-logical-replication.patch (application/octet-stream)
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index c3ec97a0a6..0d2930c4b5 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1631,28 +1631,27 @@ update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	OutputPluginUpdateProgress(ctx, false);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
 }
 
 /*
  * Set the required catalog xmin horizon for historic snapshots in the current
  * replication slot.
  *
- * Note that in the most cases, we won't be able to immediately use the xmin
+ * Note that we won't be able to immediately use the xmin
  * to increase the xmin horizon: we need to wait till the client has confirmed
  * receiving current_lsn with LogicalConfirmReceivedLocation().
  */
 void
 LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 {
-	bool		updated_xmin = false;
 	ReplicationSlot *slot;
 	bool		got_new_xmin = false;
 
 	slot = MyReplicationSlot;
 
 	Assert(slot != NULL);
 
 	SpinLockAcquire(&slot->mutex);
 
 	/*
@@ -1665,23 +1664,20 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 
 	/*
 	 * If the client has already confirmed up to this lsn, we directly can
 	 * mark this as accepted. This can happen if we restart decoding in a
 	 * slot.
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
-
-		/* our candidate can directly be used */
-		updated_xmin = true;
 	}
 
 	/*
 	 * Only increase if the previous values have been applied, otherwise we
 	 * might never end up updating if the receiver acks too slowly.
 	 */
 	else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
 	{
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
@@ -1690,37 +1686,32 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 		 * Log new xmin at an appropriate log level after releasing the
 		 * spinlock.
 		 */
 		got_new_xmin = true;
 	}
 	SpinLockRelease(&slot->mutex);
 
 	if (got_new_xmin)
 		elog(DEBUG1, "got new catalog xmin %u at %X/%X", xmin,
 			 LSN_FORMAT_ARGS(current_lsn));
-
-	/* candidate already valid with the current flush position, apply */
-	if (updated_xmin)
-		LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
 }
 
 /*
  * Mark the minimal LSN (restart_lsn) we need to read to replay all
  * transactions that have not yet committed at current_lsn.
  *
  * Just like LogicalIncreaseXminForSlot this only takes effect when the
  * client has confirmed to have received current_lsn.
  */
 void
 LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
 {
-	bool		updated_lsn = false;
 	ReplicationSlot *slot;
 
 	slot = MyReplicationSlot;
 
 	Assert(slot != NULL);
 	Assert(restart_lsn != InvalidXLogRecPtr);
 	Assert(current_lsn != InvalidXLogRecPtr);
 
 	SpinLockAcquire(&slot->mutex);
 
@@ -1730,23 +1721,20 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	}
 
 	/*
 	 * We might have already flushed far enough to directly accept this lsn,
 	 * in this case there is no need to check for existing candidate LSNs
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
-
-		/* our candidate can directly be used */
-		updated_lsn = true;
 	}
 
 	/*
 	 * Only increase if the previous values have been applied, otherwise we
 	 * might never end up updating if the receiver acks too slowly. A missed
 	 * value here will just cause some extra effort after reconnecting.
 	 */
 	if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
 		slot->candidate_restart_valid = current_lsn;
@@ -1768,24 +1756,20 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 		confirmed_flush = slot->data.confirmed_flush;
 		SpinLockRelease(&slot->mutex);
 
 		elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
 			 LSN_FORMAT_ARGS(restart_lsn),
 			 LSN_FORMAT_ARGS(current_lsn),
 			 LSN_FORMAT_ARGS(candidate_restart_lsn),
 			 LSN_FORMAT_ARGS(candidate_restart_valid),
 			 LSN_FORMAT_ARGS(confirmed_flush));
 	}
-
-	/* candidates are already valid with the current flush position, apply */
-	if (updated_lsn)
-		LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
 }
 
 /*
  * Handle a consumer's confirmation having received all changes up to lsn.
  */
 void
 LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 {
 	Assert(lsn != InvalidXLogRecPtr);
 
#2 Ashutosh Bapat
ashutosh.bapat.oss@gmail.com
In reply to: Emre Hasegeli (#1)
Re: Unnecessary confirm work on logical replication

On Fri, Apr 7, 2023 at 11:06 PM Emre Hasegeli <emre@hasegeli.com> wrote:

> I was reading the logical replication code and found a bit of
> unnecessary work we are doing.
>
> The confirmed_flush_lsn cannot reasonably be ahead of the
> current_lsn, so there is no point in calling
> LogicalConfirmReceivedLocation() every time we update the candidate
> xmin or restart_lsn.

In fact, the WAL sender always starts reading WAL from restart_lsn,
which in turn is always <= confirmed_flush_lsn. While reading WAL, the
WAL sender may read an XLOG_RUNNING_XACTS WAL record with lsn <=
confirmed_flush_lsn. While processing the XLOG_RUNNING_XACTS record it
may update its restart_lsn and catalog_xmin with current_lsn = lsn of
the XLOG_RUNNING_XACTS record. In this situation current_lsn <=
confirmed_flush_lsn.
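
A minimal sketch of the restart scenario described above, written against a simplified model rather than the real walsender code. The `Slot` struct, the `decode_from_restart()` helper and the LSN values used with it are assumptions made for illustration; only the comparison `current_lsn <= confirmed_flush` comes from logical.c.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-in for PostgreSQL's XLogRecPtr (assumption: a plain 64-bit LSN). */
typedef uint64_t XLogRecPtr;

/* Hypothetical, heavily reduced model of a replication slot. */
typedef struct Slot
{
    XLogRecPtr  restart_lsn;        /* where decoding resumes after a restart */
    XLogRecPtr  confirmed_flush;    /* last position acknowledged by the client */
} Slot;

/*
 * Walk the LSNs of running-xacts records the way a restarted WAL sender
 * would, starting at restart_lsn, and count how many of them satisfy
 * current_lsn <= confirmed_flush, i.e. would take the "directly accept"
 * branch of the two functions touched by the patch.
 */
size_t
decode_from_restart(const Slot *slot, const XLogRecPtr *record_lsns, size_t nrecords)
{
    size_t      direct_hits = 0;

    for (size_t i = 0; i < nrecords; i++)
    {
        XLogRecPtr  current_lsn = record_lsns[i];

        if (current_lsn < slot->restart_lsn)
            continue;           /* before the restart point, never read */

        if (current_lsn <= slot->confirmed_flush)
            direct_hits++;      /* the client had already confirmed this LSN */
    }
    return direct_hits;
}
```

In this model, any record that lies between restart_lsn and confirmed_flush is counted, which is the window the argument above relies on. When restart_lsn has caught up with confirmed_flush the window is empty.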

Does that make sense?

--
Best Wishes,
Ashutosh Bapat

#3 Emre Hasegeli
emre@hasegeli.com
In reply to: Ashutosh Bapat (#2)
Re: Unnecessary confirm work on logical replication

> In fact, the WAL sender always starts reading WAL from restart_lsn,
> which in turn is always <= confirmed_flush_lsn. While reading WAL, the
> WAL sender may read an XLOG_RUNNING_XACTS WAL record with lsn <=
> confirmed_flush_lsn. While processing the XLOG_RUNNING_XACTS record it
> may update its restart_lsn and catalog_xmin with current_lsn = lsn of
> the XLOG_RUNNING_XACTS record. In this situation current_lsn <=
> confirmed_flush_lsn.

This can only happen when the WAL sender is restarted. However, in
this case, the restart_lsn and catalog_xmin should have already been
persisted by the previous run of the WAL sender.

I still doubt these calls are necessary. I think there is a
complicated chicken and egg problem here. Here is my logic:

1) LogicalConfirmReceivedLocation() is called explicitly when
confirmed_flush is sent by the replication client.

2) LogicalConfirmReceivedLocation() is the only place that updates
confirmed_flush.

3) The replication client can only send a confirmed_flush for a
current_lsn it has already received.

4) These two functions have already run for any current_lsn the
replication client has received.

5) These two functions call LogicalConfirmReceivedLocation() only if
current_lsn <= confirmed_flush.
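
The five steps above can be sketched as a small model of one decoding session without a restart. Everything here is an assumption made for illustration: the `Session` struct, both function names, and in particular the rule that an acknowledgement ahead of what was sent is rejected, which encodes step 3 rather than anything the server is known to enforce. The model also assumes current_lsn only moves forward.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

/* Hypothetical model of one decoding session without a restart. */
typedef struct Session
{
    XLogRecPtr  confirmed_flush;    /* only changed by confirm_received() */
    XLogRecPtr  last_sent;          /* highest current_lsn handed to the client */
    unsigned    direct_accepts;     /* times current_lsn <= confirmed_flush held */
} Session;

/* Steps 4 and 5: runs for every current_lsn before the client sees it. */
void
increase_candidate(Session *s, XLogRecPtr current_lsn)
{
    if (current_lsn <= s->confirmed_flush)
        s->direct_accepts++;
    s->last_sent = current_lsn;
}

/*
 * Steps 1 to 3: the client's acknowledgement is the only writer of
 * confirmed_flush, and it is refused when it is ahead of what was sent.
 * Returns false when the acknowledgement violates step 3.
 */
bool
confirm_received(Session *s, XLogRecPtr lsn)
{
    if (lsn > s->last_sent)
        return false;
    if (lsn > s->confirmed_flush)
        s->confirmed_flush = lsn;
    return true;
}
```

Under these premises direct_accepts stays at zero for the whole session, because every current_lsn passes through increase_candidate() before the client can acknowledge it. The model says nothing about what happens across a restart.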

Thank you for your patience.

#4 Tomas Vondra
tomas.vondra@enterprisedb.com
In reply to: Emre Hasegeli (#3)
Re: Unnecessary confirm work on logical replication

On 4/11/23 14:58, Emre Hasegeli wrote:

>> In fact, the WAL sender always starts reading WAL from restart_lsn,
>> which in turn is always <= confirmed_flush_lsn. While reading WAL, the
>> WAL sender may read an XLOG_RUNNING_XACTS WAL record with lsn <=
>> confirmed_flush_lsn. While processing the XLOG_RUNNING_XACTS record it
>> may update its restart_lsn and catalog_xmin with current_lsn = lsn of
>> the XLOG_RUNNING_XACTS record. In this situation current_lsn <=
>> confirmed_flush_lsn.

> This can only happen when the WAL sender is restarted. However, in
> this case, the restart_lsn and catalog_xmin should have already been
> persisted by the previous run of the WAL sender.
>
> I still doubt these calls are necessary. I think there is a
> complicated chicken and egg problem here. Here is my logic:
>
> 1) LogicalConfirmReceivedLocation() is called explicitly when
> confirmed_flush is sent by the replication client.
>
> 2) LogicalConfirmReceivedLocation() is the only place that updates
> confirmed_flush.
>
> 3) The replication client can only send a confirmed_flush for a
> current_lsn it has already received.
>
> 4) These two functions have already run for any current_lsn the
> replication client has received.
>
> 5) These two functions call LogicalConfirmReceivedLocation() only if
> current_lsn <= confirmed_flush.
>
> Thank you for your patience.

Hi Emre,

I was going through my TODO list of messages, and I stumbled on this
thread from a couple months back. Do you still think this is something
we should do?

I see there was some discussion about whether this update is needed, or
whether current_lsn can ever be <= confirmed_flush_lsn. I think you may
be right this can't happen, but I wonder if we could verify that by an
assert in a convenient place (instead of just removing the update).
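
One possible reading of the assert idea, as a self-contained sketch rather than a proposed patch. The reduced `Slot` struct and the function name `record_candidate_checked()` are hypothetical, the standard `assert()` stands in for PostgreSQL's `Assert()`, and the model leaves out the catalog xmin comparison that the real function performs first.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical reduced slot: only the fields this check touches. */
typedef struct Slot
{
    XLogRecPtr  confirmed_flush;
    XLogRecPtr  candidate_xmin_lsn;
} Slot;

/*
 * Variant of the candidate update with the "directly accept" branch replaced
 * by an assertion. Returns 1 if a new candidate was recorded, 0 otherwise.
 */
int
record_candidate_checked(Slot *slot, XLogRecPtr current_lsn)
{
    /* The claim under test: the client cannot have confirmed this LSN yet. */
    assert(current_lsn > slot->confirmed_flush);

    if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
    {
        slot->candidate_xmin_lsn = current_lsn;
        return 1;
    }
    return 0;                   /* an earlier candidate is still pending */
}
```

If the restart case raised earlier in the thread can occur, an assert-enabled build would be expected to trip this check, which would answer the question either way.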

Also, did you try to quantify how expensive this is? The update seems
very cheap, but I guess you just noticed it by eyeballing the code,
which is fine of course. Even if it's cheap/not noticeable, it still may
be worth removing so as not to confuse people improving the code in the
future.

regards

--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company