From 624f31ab769e137999a739f400b36af939319c4f Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Fri, 3 Feb 2023 10:55:55 +0800 Subject: [PATCH] Avoid dropping origins from both apply and tablesync worker Currently, both the apply worker and the tablesync worker try to drop the origin. This can cause synchronization to take longer due to lock contention. Previously, we allowed the apply worker to drop the origin to cover the case where the tablesync worker fails to drop the origin (e.g. due to a crash). In that case we don't restart the tablesync worker, so the apply worker has to clean up the origin. To improve this, we introduce a new relstate, SUBREL_STATE_PRE_SYNCDONE, which is set after synchronization has finished in front of apply (sublsn set), but before dropping the origin and performing the other final cleanups. The apply worker will restart the tablesync worker if the relstate is SUBREL_STATE_PRE_SYNCDONE. This way, even if the tablesync worker errors out in the transaction that tries to drop the origin, the apply worker will restart the tablesync worker, which redoes the cleanup (dropping the origin and the tablesync slot) and then exits directly. 
--- doc/src/sgml/catalogs.sgml | 7 +- src/backend/commands/subscriptioncmds.c | 25 ++-- src/backend/replication/logical/tablesync.c | 193 +++++++++++++++------------- src/backend/replication/logical/worker.c | 3 +- src/include/catalog/pg_subscription_rel.h | 6 +- 5 files changed, 125 insertions(+), 109 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index c1e4048..3d9491a 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8071,7 +8071,8 @@ SCRAM-SHA-256$<iteration count>:&l i = initialize, d = data is being copied, f = finished table copy, - s = synchronized, + p = synchronized but not yet cleaned up, + s = synchronization done, r = ready (normal replication) @@ -8082,8 +8083,8 @@ SCRAM-SHA-256$<iteration count>:&l Remote LSN of the state change used for synchronization coordination - when in s or r states, - otherwise null + when in p, s or + r states, otherwise null diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 464db6d..a53f9d1 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -929,10 +929,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, logicalrep_worker_stop(sub->oid, relid); /* - * For READY state, we would have already dropped the - * tablesync origin. + * For READY state and SYNCDONE state, we would have already + * dropped the tablesync origin. */ - if (state != SUBREL_STATE_READY) + if (state != SUBREL_STATE_READY && state != SUBREL_STATE_SYNCDONE) { char originname[NAMEDATALEN]; @@ -940,11 +940,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * Drop the tablesync's origin tracking if exists. * * It is possible that the origin is not yet created for - * tablesync worker, this can happen for the states before - * SUBREL_STATE_FINISHEDCOPY. 
The tablesync worker or - * apply worker can also concurrently try to drop the - * origin and by this time the origin might be already - * removed. For these reasons, passing missing_ok = true. + * tablesync worker so passing missing_ok = true. This can + * happen for the states before SUBREL_STATE_FINISHEDCOPY. */ ReplicationOriginNameForLogicalRep(sub->oid, relid, originname, sizeof(originname)); @@ -1536,13 +1533,19 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) /* * Drop the tablesync's origin tracking if exists. * + * For SYNCDONE/READY states, the tablesync origin tracking is known + * to have already been dropped by the tablesync worker. + * * It is possible that the origin is not yet created for tablesync * worker so passing missing_ok = true. This can happen for the states * before SUBREL_STATE_FINISHEDCOPY. */ - ReplicationOriginNameForLogicalRep(subid, relid, originname, - sizeof(originname)); - replorigin_drop_by_name(originname, true, false); + if (rstate->state != SUBREL_STATE_SYNCDONE) + { + ReplicationOriginNameForLogicalRep(subid, relid, originname, + sizeof(originname)); + replorigin_drop_by_name(originname, true, false); + } } /* Clean up dependencies */ diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 07eea50..9762ff5 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -274,6 +274,82 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) } /* + * Update the state of the table to SUBREL_STATE_SYNCDONE and cleanup the + * tablesync slot and drop the tablesync's origin tracking. 
+ */ +static void +finish_synchronization(bool restart_after_crash) +{ + char syncslotname[NAMEDATALEN] = {0}; + char originname[NAMEDATALEN] = {0}; + + StartTransactionCommand(); + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_PRE_SYNCDONE); + MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relstate, + MyLogicalRepWorker->relstate_lsn); + + ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); + + /* + * Resetting the origin session removes the ownership of the slot. This is + * needed to allow the origin to be dropped. But note that if the tablesync + * worker restarts after crash. we don't need to reset the origin because + * it has not been setup yet. + */ + if (!restart_after_crash) + { + replorigin_session_reset(); + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; + } + + /* + * We expect that origin must be present. The concurrent operations + * that remove origin like a refresh for the subscription take an + * access exclusive lock on pg_subscription which prevent the previous + * operation to update the rel state to SUBREL_STATE_SYNCDONE to + * succeed. + */ + replorigin_drop_by_name(originname, false, false); + + /* + * Cleanup the tablesync slot. + * + * This has to be done after updating the state because otherwise if + * there is an error while doing the database operations we won't be + * able to rollback dropped slot. 
+ */ + ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + syncslotname, + sizeof(syncslotname)); + + /* + * Normally, It is important to give an error if we are unable to drop the + * slot, otherwise, it won't be dropped till the corresponding subscription + * is dropped. So passing missing_ok = false. But if the tablesync worker + * restarts after crash, the slot may have been dropped, so we allow + * missing_ok = true for the drop. + */ + ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, + restart_after_crash); + + finish_sync_worker(); +} + + +/* * Handle table synchronization cooperation from the synchronization * worker. * @@ -284,18 +360,15 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) static void process_syncing_tables_for_sync(XLogRecPtr current_lsn) { + TimeLineID tli; + SpinLockAcquire(&MyLogicalRepWorker->relmutex); if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP && current_lsn >= MyLogicalRepWorker->relstate_lsn) { - TimeLineID tli; - char syncslotname[NAMEDATALEN] = {0}; - char originname[NAMEDATALEN] = {0}; - - MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; + MyLogicalRepWorker->relstate = SUBREL_STATE_PRE_SYNCDONE; MyLogicalRepWorker->relstate_lsn = current_lsn; - SpinLockRelease(&MyLogicalRepWorker->relmutex); /* @@ -304,80 +377,26 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) if (!IsTransactionState()) StartTransactionCommand(); + /* + * Set the state to PRE_SYNCDONE so that if the an error occurs before + * setting the state to SYNCDONE the restarted tablesync worker can + * exit via the fast path without starting streaming again. + */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); - /* - * End streaming so that LogRepWorkerWalRcvConn can be used to drop - * the slot. 
- */ - walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); - - /* - * Cleanup the tablesync slot. - * - * This has to be done after updating the state because otherwise if - * there is an error while doing the database operations we won't be - * able to rollback dropped slot. - */ - ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - syncslotname, - sizeof(syncslotname)); - - /* - * It is important to give an error if we are unable to drop the slot, - * otherwise, it won't be dropped till the corresponding subscription - * is dropped. So passing missing_ok = false. - */ - ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false); - CommitTransactionCommand(); pgstat_report_stat(false); /* - * Start a new transaction to clean up the tablesync origin tracking. - * This transaction will be ended within the finish_sync_worker(). - * Now, even, if we fail to remove this here, the apply worker will - * ensure to clean it up afterward. - * - * We need to do this after the table state is set to SYNCDONE. - * Otherwise, if an error occurs while performing the database - * operation, the worker will be restarted and the in-memory state of - * replication progress (remote_lsn) won't be rolled-back which would - * have been cleared before restart. So, the restarted worker will use - * invalid replication progress state resulting in replay of - * transactions that have already been applied. - */ - StartTransactionCommand(); - - ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); - - /* - * Resetting the origin session removes the ownership of the slot. - * This is needed to allow the origin to be dropped. - */ - replorigin_session_reset(); - replorigin_session_origin = InvalidRepOriginId; - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; - - /* - * Drop the tablesync's origin tracking if exists. 
- * - * There is a chance that the user is concurrently performing refresh - * for the subscription where we remove the table state and its origin - * or the apply worker would have removed this origin. So passing - * missing_ok = true. + * End streaming so that LogRepWorkerWalRcvConn can be used to drop + * the slot. */ - replorigin_drop_by_name(originname, true, false); + walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); - finish_sync_worker(); + finish_synchronization(false); } else SpinLockRelease(&MyLogicalRepWorker->relmutex); @@ -463,8 +482,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ if (current_lsn >= rstate->lsn) { - char originname[NAMEDATALEN]; - rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; if (!started_tx) @@ -473,26 +490,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) started_tx = true; } - /* - * Remove the tablesync origin tracking if exists. - * - * There is a chance that the user is concurrently performing - * refresh for the subscription where we remove the table - * state and its origin or the tablesync worker would have - * already removed this origin. We can't rely on tablesync - * worker to remove the origin tracking as if there is any - * error while dropping we won't restart it to drop the - * origin. So passing missing_ok = true. - */ - ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, - rstate->relid, - originname, - sizeof(originname)); - replorigin_drop_by_name(originname, true, false); - - /* - * Update the state to READY only after the origin cleanup. - */ + /* Update the state to READY. 
*/ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, rstate->lsn); @@ -1283,7 +1281,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT || MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC || - MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY); + MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY || + MyLogicalRepWorker->relstate == SUBREL_STATE_PRE_SYNCDONE); /* Assign the origin tracking record name. */ ReplicationOriginNameForLogicalRep(MySubscription->oid, @@ -1327,6 +1326,16 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) goto copy_table_done; } + else if (MyLogicalRepWorker->relstate == SUBREL_STATE_PRE_SYNCDONE) + { + /* + * The table synchronization has finished in front of apply (sublsn + * set), but the tablesync worker then crashed before setting the state + * to SYNCDONE. So, we only need to perform the final cleanup, set the + * state to SYNCDONE, then exit. 
+ */ + finish_synchronization(true); + } SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index cfb2ab6..9629372 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -510,7 +510,8 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) } else return (rel->state == SUBREL_STATE_READY || - (rel->state == SUBREL_STATE_SYNCDONE && + ((rel->state == SUBREL_STATE_PRE_SYNCDONE || + rel->state == SUBREL_STATE_SYNCDONE) && rel->statelsn <= remote_final_lsn)); } diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 60a2bcc..99a4e09 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -62,8 +62,10 @@ DECLARE_UNIQUE_INDEX_PKEY(pg_subscription_rel_srrelid_srsubid_index, 6117, Subsc * NULL) */ #define SUBREL_STATE_FINISHEDCOPY 'f' /* tablesync copy phase is completed * (sublsn NULL) */ -#define SUBREL_STATE_SYNCDONE 's' /* synchronization finished in front of - * apply (sublsn set) */ +#define SUBREL_STATE_PRE_SYNCDONE 'p' /* synchronization finished in front of + * apply (sublsn set), but the final + * cleanup has not yet been performed */ +#define SUBREL_STATE_SYNCDONE 's' /* synchronization complete */ #define SUBREL_STATE_READY 'r' /* ready (sublsn set) */ /* These are never stored in the catalog, we only use them for IPC. */ -- 2.7.2.windows.1