From 37537e279e5e84b03ed4a00999f8b757e84adf8e Mon Sep 17 00:00:00 2001
From: Hou Zhijie
Date: Sun, 1 Sep 2024 12:02:42 +0800
Subject: [PATCH v21 1/6] Maintain and Advance slot.xmin in logical walsender

Conflict detection for update_deleted in logical replication

This set of patches aims to support the detection of an update_deleted
conflict, which occurs when the apply worker cannot find the target tuple
to be updated (e.g., the tuple has been removed by a different origin).

To detect this conflict consistently and correctly, we must ensure that
tuples deleted by other origins are not removed by VACUUM before conflict
detection. If this happens, a different conflict might be generated and
resolved incorrectly, causing data inconsistency between nodes.

Assume a two-node (A and B) bidirectional cluster that uses
last-update-wins resolution for conflicts:

Node A:
  T1: INSERT INTO t (id, value) VALUES (1,1);
  T2: DELETE FROM t WHERE id = 1;

Node B:
  T3: UPDATE t SET value = 2 WHERE id = 1;

To retain the deleted tuples, the initial idea was that once transaction
T2 had been applied to both nodes, there was no longer a need to preserve
the dead tuple on Node A. However, a scenario arises where transactions T3
and T2 occur concurrently, with T3 committing slightly earlier than T2. In
this case, if Node B applies T2 and Node A removes the dead tuple (1,1) via
VACUUM, and then Node A applies T3 after the VACUUM operation, Node A can
only report an update_missing conflict. Given that the default resolution
for update_missing conflicts is apply_or_skip (i.e., convert the update to
an insert if possible and apply the insert), Node A will eventually hold a
row (1,2) while the table on Node B is empty, causing data inconsistency.

Therefore, the strategy needs to be expanded as follows: Node A cannot
remove the dead tuple until:

(a) The DELETE operation is replayed on all remote nodes, *AND*
(b) The transactions on logical standbys occurring before the replay of
    Node A's DELETE are replayed on Node A as well.

To achieve the above, we allow the logical walsender to maintain and
advance the slot.xmin to protect the data in the user table and introduce
a new logical standby feedback message. This message reports a WAL
position such that the logical standby has replayed up to that position
*AND* the changes made on the logical standby before that replay have also
been replayed on the walsender's node (the node where the walsender is
running). After receiving the new feedback message, the walsender advances
the slot.xmin based on the flush info, similar to the advancement of
catalog_xmin.

We have introduced a new subscription option (feedback_slots='slot1,...'),
where these slots are used to check condition (b): the transactions on
logical standbys occurring before the replay of Node A's DELETE are
replayed on Node A as well. Therefore, on Node B, users should specify the
slots corresponding to Node A in this option. The apply worker gets the
oldest confirmed flush LSN among the specified slots and sends that LSN as
a feedback message to the walsender.

The new feedback message is sent only if feedback_slots is not NULL. If the
slots in feedback_slots are removed, a final message containing
InvalidXLogRecPtr is sent to tell the walsender to forget about the
slot.xmin.

To detect update_deleted conflicts during update operations, if the target
row cannot be found, we perform an additional scan of the table using
SnapshotAny. This scan aims to locate the most recently deleted row that
matches the old column values from the remote update operation and has not
yet been removed by VACUUM.
If any such tuples are found, we report the update_deleted conflict along
with the origin and the transaction that deleted the tuple.
---
 src/backend/replication/logical/logical.c | 26 ++++++++
 src/backend/replication/walsender.c       | 76 +++++++++++++++++++++++
 src/include/replication/slot.h            | 34 +++++++---
 3 files changed, 129 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3fe1774a1e..c42ae2b54e 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1695,6 +1695,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	bool		updated_xmin = false;
 	ReplicationSlot *slot;
 	bool		got_new_xmin = false;
+	bool		got_new_data_xmin = false;
 
 	slot = MyReplicationSlot;
 
@@ -1739,6 +1740,27 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 		 */
 		got_new_xmin = true;
 	}
+
+	/*
+	 * Don't overwrite if we already have a newer xmin. This can happen if we
+	 * restart decoding in a slot.
+	 */
+	if (TransactionIdPrecedesOrEquals(xmin, slot->effective_xmin))
+	{
+	}
+
+	/*
+	 * Only increase if the previous values have been applied, otherwise we
+	 * might never end up updating if the receiver acks too slowly.
+	 */
+	else if (slot->candidate_data_xmin_lsn == InvalidXLogRecPtr)
+	{
+		MyReplicationSlot->candidate_xmin = xmin;
+		MyReplicationSlot->candidate_data_xmin_lsn = current_lsn;
+
+		got_new_data_xmin = true;
+	}
+
 	SpinLockRelease(&slot->mutex);
 
 	if (got_new_xmin)
@@ -1748,6 +1770,10 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	/* candidate already valid with the current flush position, apply */
 	if (updated_xmin)
 		LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
+
+	if (got_new_data_xmin)
+		elog(DEBUG1, "got new data xmin %u at %X/%X", xmin,
+			 LSN_FORMAT_ARGS(current_lsn));
 }
 
 /*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c5f1009f37..9ee88b12f8 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2319,6 +2319,78 @@ ProcessRepliesIfAny(void)
 	}
 }
 
+/*
+ * Handle a consumer's confirmation that all changes up to the given LSN have
+ * been received, and that all changes occurring on the consumer side before
+ * the replay of these changes have been confirmed as flushed to the current
+ * node.
+ */
+static void
+ProcessFSFeedbackMessage(void)
+{
+	XLogRecPtr	flushPtr;
+	ReplicationSlot *slot = MyReplicationSlot;
+	TransactionId new_xmin;
+
+	Assert(slot != NULL);
+
+	flushPtr = pq_getmsgint64(&reply_message);
+
+	if (XLogRecPtrIsInvalid(flushPtr))
+	{
+		/*
+		 * An invalid flush position indicates the end of the feedback,
+		 * meaning we don't need to protect user data from being removed.
+		 * Therefore, set new_xmin to InvalidTransactionId.
+		 */
+		new_xmin = InvalidTransactionId;
+	}
+	else if (!XLogRecPtrIsInvalid(slot->candidate_data_xmin_lsn) &&
+			 slot->candidate_data_xmin_lsn <= flushPtr &&
+			 TransactionIdIsValid(slot->candidate_xmin) &&
+			 slot->data.xmin != slot->candidate_xmin)
+	{
+		new_xmin = slot->candidate_xmin;
+	}
+	else
+	{
+		/* No candidate xmin can be used or the xmin is unchanged, so exit. */
+		return;
+	}
+
+	/*
+	 * We have to write the changed xmin to disk *before* we change the
+	 * in-memory value, otherwise after a crash we wouldn't know that some
+	 * dead tuples might have been removed already.
+	 *
+	 * Ensure that by first writing to ->xmin and only update ->effective_xmin
+	 * once the new state is synced to disk. After a crash ->effective_xmin is
+	 * set to ->xmin.
+	 */
+	SpinLockAcquire(&slot->mutex);
+	slot->data.xmin = new_xmin;
+	slot->candidate_data_xmin_lsn = InvalidXLogRecPtr;
+	slot->candidate_xmin = InvalidTransactionId;
+	SpinLockRelease(&slot->mutex);
+
+	/* first write new xmin to disk, so we know what's up after a crash */
+	ReplicationSlotMarkDirty();
+	ReplicationSlotSave();
+	elog(DEBUG1, "updated xmin: %u", new_xmin);
+
+	/*
+	 * Now the new xmin is safely on disk, we can let the global value
+	 * advance. We do not take ProcArrayLock or similar since we only advance
+	 * xmin here and there's not much harm done by a concurrent computation
+	 * missing that.
+	 */
+	SpinLockAcquire(&MyReplicationSlot->mutex);
+	slot->effective_xmin = new_xmin;
+	SpinLockRelease(&MyReplicationSlot->mutex);
+
+	ReplicationSlotsComputeRequiredXmin(false);
+}
+
 /*
  * Process a status update message received from standby.
  */
@@ -2342,6 +2414,10 @@ ProcessStandbyMessage(void)
 			ProcessStandbyHSFeedbackMessage();
 			break;
 
+		case 'x':
+			ProcessFSFeedbackMessage();
+			break;
+
 		default:
 			ereport(COMMERROR,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 45582cf9d8..f1d8875d40 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -166,13 +166,17 @@ typedef struct ReplicationSlot
 	/*
 	 * For logical decoding, it's extremely important that we never remove any
 	 * data that's still needed for decoding purposes, even after a crash;
-	 * otherwise, decoding will produce wrong answers. Ordinary streaming
-	 * replication also needs to prevent old row versions from being removed
-	 * too soon, but the worst consequence we might encounter there is
-	 * unwanted query cancellations on the standby. Thus, for logical
-	 * decoding, this value represents the latest xmin that has actually been
-	 * written to disk, whereas for streaming replication, it's just the same
-	 * as the persistent value (data.xmin).
+	 * otherwise, decoding will produce wrong answers. Both logical
+	 * replication conflict detection and ordinary streaming replication need
+	 * to prevent old row versions from being removed too soon. The worst
+	 * consequence in ordinary streaming replication would be unwanted query
+	 * cancellations on the standby. However, for logical conflict detection,
+	 * it is essential to identify the origin and timestamp of old row
+	 * versions to correctly detect and resolve conflicts. Otherwise, it could
+	 * cause data inconsistency between nodes. For logical decoding and
+	 * replication, this value represents the latest xmin that has actually
+	 * been written to disk, whereas for streaming replication, it's just the
+	 * same as the persistent value (data.xmin).
 	 */
 	TransactionId effective_xmin;
 	TransactionId effective_catalog_xmin;
@@ -198,6 +202,22 @@ typedef struct ReplicationSlot
 	XLogRecPtr	candidate_restart_valid;
 	XLogRecPtr	candidate_restart_lsn;
 
+	/*
+	 * If the client has sent feedback confirming that the WAL position
+	 * flushed to a remote node (corresponding to the feedback slot on the
+	 * client) is greater than or equal to the candidate_data_xmin_lsn, we can
+	 * advance the xmin.
+	 *
+	 * This mechanism is used for conflict detection in a bidirectional
+	 * logical replication cluster. It ensures that dead tuples cannot be
+	 * cleaned by VACUUM until the DELETE operations that occurred on the
+	 * local node have been replayed on the subscriber, and any changes on
+	 * remote nodes that occurred before the replay of these DELETE operations
+	 * are also replayed locally.
+	 */
+	TransactionId candidate_xmin;
+	XLogRecPtr	candidate_data_xmin_lsn;
+
 	/*
 	 * This value tracks the last confirmed_flush LSN flushed which is used
 	 * during a shutdown checkpoint to decide if logical's slot data should be
-- 
2.30.0.windows.2
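
Reviewer note: the two-step protocol in this patch (record a candidate xmin
while decoding, then advance the slot's xmin once feedback confirms the
candidate's LSN) can be restated as a small standalone sketch. This is only
an illustration under simplifying assumptions, not the patch's code: the
names SlotSketch, Lsn, Xid, record_candidate and feedback_new_xmin are
hypothetical, a single xmin field stands in for data.xmin and
effective_xmin, XIDs are compared with plain integer comparison (no
wraparound handling), and locking and persistence are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for XLogRecPtr and TransactionId. */
typedef uint64_t Lsn;
typedef uint32_t Xid;

#define INVALID_LSN ((Lsn) 0)
#define INVALID_XID ((Xid) 0)

/* Hypothetical, trimmed-down view of the slot fields the patch touches. */
typedef struct SlotSketch
{
	Xid		xmin;						/* current data xmin of the slot */
	Xid		candidate_xmin;				/* pending xmin, if any */
	Lsn		candidate_data_xmin_lsn;	/* LSN at which the candidate applies */
} SlotSketch;

/*
 * Decoding side: remember a candidate only if it is newer than the current
 * xmin and no earlier candidate is still waiting for feedback.
 * Simplification: plain integer comparison, XID wraparound is ignored.
 */
bool
record_candidate(SlotSketch *slot, Lsn current_lsn, Xid xmin)
{
	if (xmin <= slot->xmin)
		return false;
	if (slot->candidate_data_xmin_lsn != INVALID_LSN)
		return false;

	slot->candidate_xmin = xmin;
	slot->candidate_data_xmin_lsn = current_lsn;
	return true;
}

/*
 * Feedback side: decide whether a message carrying flush_ptr changes the
 * xmin. Returns true and stores the value in *new_xmin if it does;
 * otherwise leaves *new_xmin untouched.
 */
bool
feedback_new_xmin(const SlotSketch *slot, Lsn flush_ptr, Xid *new_xmin)
{
	if (flush_ptr == INVALID_LSN)
	{
		/* End of feedback: stop protecting user data. */
		*new_xmin = INVALID_XID;
		return true;
	}

	if (slot->candidate_data_xmin_lsn != INVALID_LSN &&
		slot->candidate_data_xmin_lsn <= flush_ptr &&
		slot->candidate_xmin != INVALID_XID &&
		slot->xmin != slot->candidate_xmin)
	{
		*new_xmin = slot->candidate_xmin;
		return true;
	}

	/* No usable candidate, or the xmin would not change. */
	return false;
}
```

In this sketch a candidate recorded at LSN 5000 is ignored by feedback for
LSN 4999 and accepted by feedback for LSN 5000 or later, while a feedback
message carrying the invalid LSN always resets the xmin.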