From 0d10ca479a335fd6d6aee63a77a003b6fee76f14 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Sun, 1 Sep 2024 12:03:57 +0800 Subject: [PATCH v21 3/6] Send the slot flush feedback message via apply worker This patch allows the apply worker to send feedback that reports the WAL position of the publisher that has been flushed locally and confirms that all local changes occurring before this WAL position have been flushed to the remote node corresponding to the feedback slots. In this context, the remote node refers to the publisher, as the feedback is currently used only in a bidirectional cluster to preserve old row versions for conflict detection purposes. If feedback_slots is NULL and a status might have already been sent to update the xmin value of the slot, an InvalidXLogRecPtr is sent. This indicates that the apply worker no longer tracks or sends feedback about the confirmed flush position. In this case, the publisher should reset the slot's xmin to InvalidTransactionId to allow dead tuples in user tables to be removed. The status message is sent at most once per wal_receiver_status_interval, unless the feedback_slots option has just been changed. TODO: doc. 
--- src/backend/replication/logical/worker.c | 262 ++++++++++++++++++++++- 1 file changed, 260 insertions(+), 2 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 0fb577d328..f4a31ef534 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -173,6 +173,7 @@ #include "replication/logicalrelation.h" #include "replication/logicalworker.h" #include "replication/origin.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "rewrite/rewriteHandler.h" @@ -378,6 +379,8 @@ static void stream_open_and_write_change(TransactionId xid, char action, StringI static void stream_close_file(void); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); +static void send_flush_status_feedback(XLogRecPtr recvpos, + bool feedback_slots_changed); static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, @@ -3499,8 +3502,18 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, if (pos->local_end <= local_flush) { *flush = pos->remote_end; - dlist_delete(iter.cur); - pfree(pos); + + /* + * If feedback slots are specified, do not clean up entries here. + * These entries are needed to compute positions that are + * confirmed to be flushed to the remote node. The entries are + * cleaned up in get_slot_flush_position() instead. + */ + if (!MySubscription->feedback_slots) + { + dlist_delete(iter.cur); + pfree(pos); + } } else { @@ -3520,6 +3533,114 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, *have_pending_txes = !dlist_is_empty(&lsn_mapping); } + +/* + * Get the oldest confirmed flush LSN across all slots specified in + * feedback_slots; InvalidXLogRecPtr if none is set or any slot is rejected. 
+ */ +static XLogRecPtr +get_slot_confirmed_flush(void) +{ + int checked_slot_num = 0; + XLogRecPtr min_confirmed_flush = InvalidXLogRecPtr; + + /* Quickly exit if there are no feedback slots configured */ + if (!MySubscription->feedback_slots) + return InvalidXLogRecPtr; + + /* + * To prevent concurrent slot dropping and creation while filtering the + * slots, take the ReplicationSlotControlLock outside of the loop. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + foreach_ptr(String, name, MySubscription->feedback_slots) + { + XLogRecPtr confirmed_flush; + ReplicationSlot *slot; + + slot = ValidateAndGetFeedbackSlot(strVal(name)); + + if (!slot) + break; + + SpinLockAcquire(&slot->mutex); + confirmed_flush = slot->data.confirmed_flush; + SpinLockRelease(&slot->mutex); + + if (XLogRecPtrIsInvalid(min_confirmed_flush) || + min_confirmed_flush > confirmed_flush) + min_confirmed_flush = confirmed_flush; + + checked_slot_num++; + } + + LWLockRelease(ReplicationSlotControlLock); + + if (checked_slot_num != list_length(MySubscription->feedback_slots)) + return InvalidXLogRecPtr; + + return min_confirmed_flush; +} + +/* + * Figure out the WAL positions confirmed to be flushed to the remote node + * corresponding to the feedback slots, to report to the walsender process. + * + * When reporting the position to the sender, this function iterates through + * the list and checks which entries are confirmed to be flushed to the remote + * node. These entries are then reported as having been flushed. + * + * The have_pending_txes variable is set to true if there are outstanding + * transactions that need to be flushed. + */ +static void +get_slot_flush_position(XLogRecPtr *slot_flush, bool *have_pending_txes) +{ + dlist_mutable_iter iter; + XLogRecPtr slot_confirmed_flush; + + *slot_flush = InvalidXLogRecPtr; + + slot_confirmed_flush = get_slot_confirmed_flush(); + + /* + * There is no need to iterate the list if we cannot get the correct slot + * flush position. 
+ */ + if (XLogRecPtrIsInvalid(slot_confirmed_flush)) + { + *have_pending_txes = !dlist_is_empty(&lsn_mapping); + return; + } + + dlist_foreach_modify(iter, &lsn_mapping) + { + FlushPosition *pos = + dlist_container(FlushPosition, node, iter.cur); + + if (pos->local_end <= slot_confirmed_flush) + { + Assert(pos->local_end <= GetFlushRecPtr(NULL)); + + *slot_flush = pos->remote_end; + dlist_delete(iter.cur); + pfree(pos); + } + else + { + /* + * Don't want to uselessly iterate over the rest of the list which + * could potentially be long. + */ + *have_pending_txes = true; + return; + } + } + + *have_pending_txes = !dlist_is_empty(&lsn_mapping); +} + /* * Store current remote/local lsn pair in the tracking list. */ @@ -3692,6 +3813,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) send_feedback(last_received, reply_requested, false); UpdateWorkerStats(last_received, timestamp, true); + + send_flush_status_feedback(last_received, false); } /* other message types are purposefully ignored */ @@ -3705,6 +3828,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* confirm all writes so far */ send_feedback(last_received, false, false); + send_flush_status_feedback(last_received, false); + if (!in_remote_transaction && !in_streamed_transaction) { /* @@ -3915,6 +4040,122 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) last_flushpos = flushpos; } +/* + * Send a Flush Status Update message to the server. + * + * This function sends the WAL position of the publisher that has been flushed + * locally and confirms that all local changes occurring before this WAL + * position have been flushed to the remote node corresponding to the feedback + * slots. In this context, the remote node indicates the publisher, as the + * feedback is currently used only in a bidirectional cluster to preserve old + * row versions for conflict detection purposes (see the comments for + * ReplicationSlot in slot.h for details). 
+ * + * If feedback_slots is NULL and a status might have already been sent to + * update the xmin value of the slot, an InvalidXLogRecPtr is sent. This + * indicates that the apply worker no longer tracks or sends feedback about the + * confirmed flush position. In this case, the publisher should reset the + * slot's xmin to InvalidTransactionId to allow dead tuples in user tables to + * be removed. + * + * The status message is sent at most once per wal_receiver_status_interval, + * unless 'feedback_slots_changed' is set to true. In that case, the last-send + * time is reset so that the interval check does not suppress the message. + */ +static void +send_flush_status_feedback(XLogRecPtr recvpos, bool feedback_slots_changed) +{ + static StringInfo reply_message = NULL; + static TimestampTz send_time = 0; + + /* + * Initialize the flag to force sending a final message upon the first + * connection if feedback_slots is not specified. This is necessary + * because a previous connection might have set xmin on a replication slot + * on the publisher. + */ + static bool publisher_has_slot_xmin = true; + + XLogRecPtr slot_flushpos; + bool have_pending_txes; + TimestampTz now; + + /* Reset the last-send time when the feedback_slots option was changed */ + if (feedback_slots_changed) + send_time = 0; + + /* + * Return if status reporting is disabled unless a status has already been + * sent. In such a case, one more message is needed to inform the + * walsender to forget about the xmin. + */ + if ((wal_receiver_status_interval <= 0 || !MySubscription->feedback_slots) && + !publisher_has_slot_xmin) + return; + + get_slot_flush_position(&slot_flushpos, &have_pending_txes); + + /* + * If there are no outstanding transactions to flush and feedback has been + * sent previously, we can report the latest received position (recvpos). + * + * Note that we cannot report the latest position if no feedback has ever + * been sent. 
This is because all the entries in lsn_mapping could have + * been cleaned up before specifying feedback slots. As a result, we + * cannot determine the local position corresponding to previously + * flushed local transactions, and thus cannot confirm whether the WAL has + * been flushed to the remote node. + * + * When a feedback message has been sent, it means we have confirmed that + * all changes applied after specifying feedback slots have been flushed + * on the remote node. + */ + if (MySubscription->feedback_slots && send_time && !have_pending_txes) + slot_flushpos = recvpos; + + /* + * Return if feedback slots are specified but the slot's flush position is + * invalid. This indicates that some specified slots are not valid + * feedback slots (see ValidateAndGetFeedbackSlot for details), or the + * worker has not yet applied any changes since the feedback slots + * were specified. + */ + if (MySubscription->feedback_slots && XLogRecPtrIsInvalid(slot_flushpos)) + return; + + Assert(!XLogRecPtrIsInvalid(slot_flushpos) || publisher_has_slot_xmin); + + /* Get current timestamp */ + now = GetCurrentTimestamp(); + + /* Send feedback at most once per wal_receiver_status_interval */ + if (!TimestampDifferenceExceeds(send_time, now, + wal_receiver_status_interval * 1000)) + return; + + if (!reply_message) + { + MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext); + + reply_message = makeStringInfo(); + MemoryContextSwitchTo(oldctx); + } + else + resetStringInfo(reply_message); + + pq_sendbyte(reply_message, 'x'); + pq_sendint64(reply_message, slot_flushpos); + + elog(DEBUG2, "sending flush status feedback %X/%X", + LSN_FORMAT_ARGS(slot_flushpos)); + + walrcv_send(LogRepWorkerWalRcvConn, + reply_message->data, reply_message->len); + + publisher_has_slot_xmin = !XLogRecPtrIsInvalid(slot_flushpos); + send_time = XLogRecPtrIsInvalid(slot_flushpos) ? 0 : now; +} + /* * Exit routine for apply workers due to subscription parameter changes. 
*/ @@ -3955,6 +4196,7 @@ maybe_reread_subscription(void) MemoryContext oldctx; Subscription *newsub; bool started_tx = false; + bool send_feedback = false; /* When cache state is valid there is nothing to do here. */ if (MySubscriptionValid) @@ -4005,6 +4247,19 @@ maybe_reread_subscription(void) /* two-phase cannot be altered while the worker is running */ Assert(newsub->twophasestate == MySubscription->twophasestate); + /* + * Force sending a message when the feedback_slots parameter changes. If + * feedback_slots is changed to NULL, send a final message to reset the + * xmin of the slot on the publisher. Otherwise, try to send a message to + * update the slot's xmin on the publisher. + * + * The message will be sent after switching to the new subscription info + * (at the end of the function) so that the updated feedback slots info + * can be used when sending the message. + */ + if (!equal(MySubscription->feedback_slots, newsub->feedback_slots)) + send_feedback = true; + /* * Exit if any parameter that affects the remote connection was changed. * The launcher will start a new worker but note that the parallel apply @@ -4072,6 +4327,9 @@ maybe_reread_subscription(void) if (started_tx) CommitTransactionCommand(); + if (send_feedback) + send_flush_status_feedback(MyLogicalRepWorker->last_lsn, true); + MySubscriptionValid = true; } -- 2.30.0.windows.2