From 42e325e2d833fb2dfebade4cbcca4827ab967976 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Mon, 2 Sep 2024 17:40:56 +0800 Subject: [PATCH v21 4/6] Support the conflict detection for update_deleted This patch supports detecting update_deleted conflicts during update operations. If the target row cannot be found when applying update operations, we perform an additional scan of the table using snapshotAny. This scan aims to locate the most recently deleted row that matches the old column values from the remote update operation and has not yet been removed by VACUUM. If any such tuples are found, we report the update_deleted conflict along with the origin and transaction information that deleted the tuple. --- src/backend/catalog/system_views.sql | 1 + src/backend/executor/execReplication.c | 136 ++++++++++++++++++++- src/backend/replication/logical/conflict.c | 22 ++++ src/backend/replication/logical/worker.c | 73 ++++++++--- src/backend/utils/adt/pgstatfuncs.c | 10 +- src/include/catalog/pg_proc.dat | 6 +- src/include/executor/executor.h | 6 +- src/include/replication/conflict.h | 3 + src/test/regress/expected/rules.out | 3 +- 9 files changed, 227 insertions(+), 33 deletions(-) diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 762f397cc7..3fa2b2eb65 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1369,6 +1369,7 @@ CREATE VIEW pg_stat_subscription_stats AS ss.confl_update_origin_differs, ss.confl_update_exists, ss.confl_update_missing, + ss.confl_update_deleted, ss.confl_delete_origin_differs, ss.confl_delete_missing, ss.stats_reset diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 1086cbc962..d19266b305 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -14,11 +14,13 @@ #include "postgres.h" +#include "access/commit_ts.h" #include "access/genam.h" #include "access/relscan.h" #include "access/tableam.h" #include "access/transam.h" #include "access/xact.h" +#include "access/heapam.h" #include "catalog/pg_am_d.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -26,6 +28,7 @@ #include "replication/conflict.h" #include "replication/logicalrelation.h" #include "storage/lmgr.h" +#include "storage/procarray.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/rel.h" @@ -35,7 +38,7 @@ static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, - TypeCacheEntry **eq); + TypeCacheEntry **eq, Bitmapset *columns); /* * Returns the fixed strategy number, if any, of the equality operator for the @@ -264,7 +267,7 @@ retry: if (eq == NULL) eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts); - if (!tuples_equal(outslot, searchslot, eq)) + if (!tuples_equal(outslot, searchslot, eq, NULL)) continue; } @@ -323,7 +326,7 @@ retry: */ static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, - TypeCacheEntry **eq) + TypeCacheEntry **eq, Bitmapset *columns) { int attrnum; @@ -348,6 +351,9 @@ tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, if (att->attisdropped || att->attgenerated) continue; + if (columns && !bms_is_member(att->attnum, columns)) + continue; + /* * If one value is NULL and other is not, then they are certainly not * equal @@ -423,7 +429,7 @@ retry: /* Try to find the tuple */ while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot)) { - if (!tuples_equal(scanslot, searchslot, eq)) + if (!tuples_equal(scanslot, searchslot, eq, NULL)) continue; found = true; @@ -474,6 +480,128 @@ retry: return found; } +/* + * Build a bitmap that includes the column numbers for the provided index. + */ +static Bitmapset * +build_index_column_bitmap(Oid indexoid) +{ + IndexInfo *indexinfo; + Bitmapset *index_bitmap = NULL; + Relation idxrel; + + Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true)); + + /* Open the index. */ + idxrel = index_open(indexoid, NoLock); + + indexinfo = BuildIndexInfo(idxrel); + + for (int i = 0; i < indexinfo->ii_NumIndexAttrs; i++) + { + int keycol = indexinfo->ii_IndexAttrNumbers[i]; + + index_bitmap = bms_add_member(index_bitmap, keycol); + } + + index_close(idxrel, NoLock); + + return index_bitmap; +} + +/* + * Search the relation 'rel' for the most recently deleted tuple that matches + * the values in 'searchslot' and is not yet removable by VACUUM. + * + * The commit timestamp of the transaction that deleted the tuple is used to + * determine whether the tuple is the most recently deleted one. + * + * This operation can be quite slow on tables with a large number of rows. + * However, it is primarily used in rare conflict cases where the target row + * for an update cannot be found. + */ +bool +FindMostRecentlyDeletedTupleInfo(Relation rel, TupleTableSlot *searchslot, + Oid localindexoid, + TransactionId *delete_xid, + TimestampTz *delete_time, + RepOriginId *delete_origin) +{ + TupleTableSlot *scanslot; + TableScanDesc scan; + TypeCacheEntry **eq; + bool found; + TransactionId oldestXmin; + BufferHeapTupleTableSlot *hslot; + HeapTuple tuple; + Buffer buf; + Bitmapset *indexbitmap = NULL; + TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel); + + Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor)); + + *delete_xid = InvalidTransactionId; + *delete_time = 0; + *delete_origin = InvalidRepOriginId; + + oldestXmin = GetOldestNonRemovableTransactionId(rel); + + if (OidIsValid(localindexoid)) + indexbitmap = build_index_column_bitmap(localindexoid); + + eq = palloc0(sizeof(*eq) * searchslot->tts_tupleDescriptor->natts); + + /* Start a heap scan. */ + scan = table_beginscan(rel, SnapshotAny, 0, NULL); + scanslot = table_slot_create(rel, NULL); + hslot = (BufferHeapTupleTableSlot *) scanslot; + + table_rescan(scan, NULL); + + /* Try to find the tuple */ + while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot)) + { + bool dead = false; + TransactionId xmax; + TimestampTz localts; + RepOriginId localorigin; + + if (!tuples_equal(scanslot, searchslot, eq, indexbitmap)) + continue; + + tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL); + buf = hslot->buffer; + + LockBuffer(buf, BUFFER_LOCK_SHARE); + + if (HeapTupleSatisfiesVacuum(tuple, oldestXmin, buf) == HEAPTUPLE_RECENTLY_DEAD) + dead = true; + + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + + if (!dead) + continue; + + xmax = HeapTupleHeaderGetRawXmax(tuple->t_data); + TransactionIdGetCommitTsData(xmax, &localts, &localorigin); + + /* Select the dead tuple with the most recent commit timestamp */ + if (*delete_time == 0 || + TimestampDifferenceExceeds(*delete_time, localts, 0)) + { + *delete_xid = xmax; + *delete_time = localts; + *delete_origin = localorigin; + found = true; + } + } + + table_endscan(scan); + ExecDropSingleTupleTableSlot(scanslot); + + return found; +} + /* * Find the tuple that violates the passed unique index (conflictindex). * diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 5d9ff626bd..cd39727932 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -28,6 +28,7 @@ static const char *const ConflictTypeNames[] = { [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs", [CT_UPDATE_EXISTS] = "update_exists", [CT_UPDATE_MISSING] = "update_missing", + [CT_UPDATE_DELETED] = "update_deleted", [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs", [CT_DELETE_MISSING] = "delete_missing" }; @@ -172,6 +173,7 @@ errcode_apply_conflict(ConflictType type) return errcode(ERRCODE_UNIQUE_VIOLATION); case CT_UPDATE_ORIGIN_DIFFERS: case CT_UPDATE_MISSING: + case CT_UPDATE_DELETED: case CT_DELETE_ORIGIN_DIFFERS: case CT_DELETE_MISSING: return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE); @@ -259,6 +261,26 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, appendStringInfo(&err_detail, _("Could not find the row to be updated.")); break; + case CT_UPDATE_DELETED: + if (localts) + { + if (localorigin == InvalidRepOriginId) + appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s."), + localxmin, timestamptz_to_str(localts)); + else if (replorigin_by_oid(localorigin, true, &origin_name)) + appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s."), + origin_name, localxmin, timestamptz_to_str(localts)); + + /* The origin that modified this row has been removed. */ + else + appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s."), + localxmin, timestamptz_to_str(localts)); + } + else + appendStringInfo(&err_detail, _("The row to be updated was deleted.")); + + break; + case CT_DELETE_ORIGIN_DIFFERS: if (localorigin == InvalidRepOriginId) appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s."), diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index f4a31ef534..286f5ec257 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -139,6 +139,24 @@ * failover = true when creating the subscription. Enabling failover allows us * to smoothly transition to the promoted standby, ensuring that we can * subscribe to the new primary without losing any data. + * + * CONFLICT DETECTION (update_deleted) + * ---------------------- + * When applying an update operation, if the target row cannot be found, we + * scan the table again to locate the most recently deleted row that matches + * the old column values of the remote update operation and is not yet + * removable by VACUUM. + * + * To detect the update_deleted conflict stably and correctly in a + * bidirectional cluster, we must ensure that dead tuples cannot be cleaned by + * VACUUM until the DELETE operations that occurred on the local node have been + * replayed on the subscriber. Additionally, any changes on remote nodes that + * occurred before the replay of these DELETE operations must also be replayed + * locally. + * + * Therefore, it is necessary to add the replication slot name corresponding to + * the publisher (e.g., the slot acquired by the walsender that sends changes + * back to the publisher) in the feedback_slots options. *------------------------------------------------------------------------- */ @@ -2672,6 +2690,9 @@ apply_handle_update_internal(ApplyExecutionData *edata, TupleTableSlot *localslot; bool found; MemoryContext oldctx; + RepOriginId localorigin; + TransactionId localxid; + TimestampTz localts; EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); ExecOpenIndices(relinfo, true); @@ -2688,15 +2709,11 @@ apply_handle_update_internal(ApplyExecutionData *edata, */ if (found) { - RepOriginId localorigin; - TransactionId localxmin; - TimestampTz localts; - /* * Report the conflict if the tuple was modified by a different * origin. */ - if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && + if (GetTupleTransactionInfo(localslot, &localxid, &localorigin, &localts) && localorigin != replorigin_session_origin) { TupleTableSlot *newslot; @@ -2707,7 +2724,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, remoteslot, localslot, newslot, - InvalidOid, localxmin, localorigin, localts); + InvalidOid, localxid, localorigin, localts); } /* Process and store remote tuple in the slot */ @@ -2726,19 +2743,27 @@ apply_handle_update_internal(ApplyExecutionData *edata, } else { + ConflictType type; TupleTableSlot *newslot = localslot; + Oid replica_index = GetRelationIdentityOrPK(localrel); + + if (FindMostRecentlyDeletedTupleInfo(localrel, remoteslot, + replica_index, &localxid, + &localts, &localorigin)) + type = CT_UPDATE_DELETED; + else + type = CT_UPDATE_MISSING; /* Store the new tuple for conflict reporting */ slot_store_data(newslot, relmapentry, newtup); /* - * The tuple to be updated could not be found. Do nothing except for - * emitting a log message. + * The tuple to be updated could not be found or was deleted. Do + * nothing except for emitting a log message. */ - ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING, + ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, NULL, newslot, - InvalidOid, InvalidTransactionId, - InvalidRepOriginId, 0); + InvalidOid, localxid, localorigin, localts); } /* Cleanup. */ @@ -3059,7 +3084,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, bool found; EPQState epqstate; RepOriginId localorigin; - TransactionId localxmin; + TransactionId localxid; TimestampTz localts; /* Get the matching local tuple from the partition. */ @@ -3069,17 +3094,25 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, remoteslot_part, &localslot); if (!found) { + ConflictType type; TupleTableSlot *newslot = localslot; + Oid replica_index = GetRelationIdentityOrPK(partrel); + + if (FindMostRecentlyDeletedTupleInfo(partrel, remoteslot_part, + replica_index, &localxid, + &localts, &localorigin)) + type = CT_UPDATE_DELETED; + else + type = CT_UPDATE_MISSING; /* Store the new tuple for conflict reporting */ slot_store_data(newslot, part_entry, newtup); /* - * The tuple to be updated could not be found. Do nothing - * except for emitting a log message. + * The tuple to be updated could not be found or was + * deleted. Do nothing except for emitting a log message. */ - ReportApplyConflict(estate, partrelinfo, - LOG, CT_UPDATE_MISSING, + ReportApplyConflict(estate, partrelinfo, LOG, type, remoteslot_part, NULL, newslot, InvalidOid, InvalidTransactionId, InvalidRepOriginId, 0); @@ -3091,7 +3124,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, * Report the conflict if the tuple was modified by a * different origin. */ - if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && + if (GetTupleTransactionInfo(localslot, &localxid, &localorigin, &localts) && localorigin != replorigin_session_origin) { TupleTableSlot *newslot; @@ -3102,7 +3135,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, remoteslot_part, localslot, newslot, - InvalidOid, localxmin, localorigin, + InvalidOid, localxid, localorigin, localts); } @@ -4048,8 +4081,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) * position have been flushed to the remote node corresponding to the feedback * slots. In this context, the remote node indicates the publisher, as the * feedback is currently used only in a bidirectional cluster to preserve old - * row versions for conflict detection purposes (see the comments for - * ReplicationSlot in slot.h for details). + * row versions for conflict detection purposes (see the comments atop worker.c + * for details). * * If feedback_slots is NULL and a status might have already been sent to * update the xmin value of the slot, an InvalidXLogRecPtr is sent. This diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 97dc09ac0d..aa4a5fb755 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -1966,7 +1966,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) Datum pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 10 +#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 11 Oid subid = PG_GETARG_OID(0); TupleDesc tupdesc; Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; @@ -1994,11 +1994,13 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_deleted", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index ff5436acac..a5a7ee499d 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5538,9 +5538,9 @@ { oid => '6231', descr => 'statistics: information about subscription stats', proname => 'pg_stat_get_subscription_stats', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}', + proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_update_deleted,confl_delete_origin_differs,confl_delete_missing,stats_reset}', prosrc => 'pg_stat_get_subscription_stats' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 69c3ebff00..bb04ef86c2 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -665,7 +665,11 @@ extern bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, TupleTableSlot *outslot); extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot); - +extern bool FindMostRecentlyDeletedTupleInfo(Relation rel, TupleTableSlot *searchslot, + Oid localindexoid, + TransactionId *delete_xid, + TimestampTz *delete_time, + RepOriginId *delete_origin); extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot); extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index c759677ff5..d2d3e2f064 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -35,6 +35,9 @@ typedef enum /* The row to be updated is missing */ CT_UPDATE_MISSING, + /* The row to be updated is deleted */ + CT_UPDATE_DELETED, + /* The row to be deleted was modified by a different origin */ CT_DELETE_ORIGIN_DIFFERS, diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index a1626f3fae..569be4ed04 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2143,11 +2143,12 @@ pg_stat_subscription_stats| SELECT ss.subid, ss.confl_update_origin_differs, ss.confl_update_exists, ss.confl_update_missing, + ss.confl_update_deleted, ss.confl_delete_origin_differs, ss.confl_delete_missing, ss.stats_reset FROM pg_subscription s, - LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset); + LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_update_deleted, confl_delete_origin_differs, confl_delete_missing, stats_reset); pg_stat_sys_indexes| SELECT relid, indexrelid, schemaname, -- 2.30.0.windows.2