diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 744c5e8..5a0e467 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3474,6 +3474,21 @@ ANY num_sync ( + table_sync_retry_interval (integer) + + table_sync_retry_interval configuration parameter + + + + + Specify how long the subscriber should wait before retrying to copy the initial + data after a failed attempt. The default is 5 seconds. Units + are milliseconds if not specified. + + + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index f5ba9f6..573e23f 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -480,6 +480,7 @@ GetSubscriptionNotReadyRelations(Oid subid) relstate->relid = subrel->srrelid; relstate->state = subrel->srsubstate; relstate->lsn = subrel->srsublsn; + relstate->last_start_time = 0; res = lappend(res, relstate); } diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 7ba239c..1c260d9 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -58,6 +58,7 @@ int max_logical_replication_workers = 4; int max_sync_workers_per_subscription = 2; +int table_sync_retry_interval = 5000; LogicalRepWorker *MyLogicalRepWorker = NULL; diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index d1f2734..e7019fb 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -237,7 +237,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) * * If there are tables that need synchronizing and are not being synchronized * yet, start sync workers for them (if there are free slots for sync - * workers). + * workers). To prevent starting the sync worker for the same relation at a + * high frequency after fails, we store its last start time to each sync + * state info. 
We start the sync worker for the same relation at intervals + * of table_sync_retry_interval. * * For tables that are being synchronized already, check if sync workers * either need action from the apply worker or have finished. @@ -265,11 +268,12 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) { MemoryContext oldctx; List *rstates; + List *old_table_states; ListCell *lc; SubscriptionRelState *rstate; - /* Clean the old list. */ - list_free_deep(table_states); + /* Save old sync state info list, clean the old list later */ + old_table_states = table_states; table_states = NIL; StartTransactionCommand(); @@ -281,14 +285,43 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) oldctx = MemoryContextSwitchTo(CacheMemoryContext); foreach(lc, rstates) { + ListCell *old_lc; + ListCell *old_prev = NULL; + ListCell *old_next = NULL; + rstate = palloc(sizeof(SubscriptionRelState)); memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); + + /* + * Iterate old sync state info to take over last_start_time + * to new entries. 
+ */ + for (old_lc = list_head(old_table_states); old_lc != NULL; old_lc = old_next) + { + SubscriptionRelState *s = (SubscriptionRelState *) lfirst(old_lc); + + old_next = lnext(old_lc); + + /* Found the sync state info, take over last_start time */ + if (rstate->relid == s->relid) + { + rstate->last_start_time = s->last_start_time; + old_table_states = list_delete_cell(old_table_states, old_lc, old_prev); + break; + } + + old_prev = old_lc; + } + table_states = lappend(table_states, rstate); } MemoryContextSwitchTo(oldctx); CommitTransactionCommand(); + /* Clean old list */ + list_free_deep(old_table_states); + table_states_valid = true; } @@ -395,11 +428,20 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription) { - logicalrep_worker_launch(MyLogicalRepWorker->dbid, - MySubscription->oid, - MySubscription->name, - MyLogicalRepWorker->userid, - rstate->relid); + TimestampTz now; + + now = GetCurrentTimestamp(); + + if (TimestampDifferenceExceeds(rstate->last_start_time, now, + table_sync_retry_interval)) + { + logicalrep_worker_launch(MyLogicalRepWorker->dbid, + MySubscription->oid, + MySubscription->name, + MyLogicalRepWorker->userid, + rstate->relid); + rstate->last_start_time = now; + } } } } diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 9ad8361..8bca324 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2623,6 +2623,18 @@ static struct config_int ConfigureNamesInt[] = }, { + {"table_sync_retry_interval", PGC_SIGHUP, REPLICATION_STANDBY, + gettext_noop("Sets the time to wait before retrying to launch table " + "synchronization worker after a failed attempt."), + NULL, + GUC_UNIT_MS + }, + &table_sync_retry_interval, + 5000, 1, INT_MAX, + NULL, NULL, NULL + }, + + { {"wal_retrieve_retry_interval", PGC_SIGHUP, REPLICATION_STANDBY, gettext_noop("Sets the time to wait before retrying to retrieve WAL " "after a failed 
attempt."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 1435d92..632aa08 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -271,6 +271,8 @@ # in milliseconds; 0 disables #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt +#table_sync_retry_interval = 5s # time to wait before retrying to + # synchronize table after a failed attempt # - Subscribers - diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 9f4f152..d8264f8 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -66,6 +66,7 @@ typedef struct SubscriptionRelState Oid relid; XLogRecPtr lsn; char state; + TimestampTz last_start_time; } SubscriptionRelState; extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index 060946a..e9ea7da 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -14,6 +14,7 @@ extern int max_logical_replication_workers; extern int max_sync_workers_per_subscription; +extern int table_sync_retry_interval; extern void ApplyLauncherRegister(void); extern void ApplyLauncherMain(Datum main_arg);