diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 744c5e8..5a0e467 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3474,6 +3474,21 @@ ANY num_sync (
+ table_sync_retry_interval (integer)
+
+ table_sync_retry_interval configuration parameter
+
+
+
+
+ Specifies how long the subscriber should wait before retrying the initial
+ data copy of a table after a failed attempt. The default is 5 seconds. Units
+ are milliseconds if not specified.
+
+
+
+
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index f5ba9f6..573e23f 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -480,6 +480,7 @@ GetSubscriptionNotReadyRelations(Oid subid)
relstate->relid = subrel->srrelid;
relstate->state = subrel->srsubstate;
relstate->lsn = subrel->srsublsn;
+ relstate->last_start_time = 0;
res = lappend(res, relstate);
}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7ba239c..1c260d9 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -58,6 +58,7 @@
int max_logical_replication_workers = 4;
int max_sync_workers_per_subscription = 2;
+int table_sync_retry_interval = 5000;
LogicalRepWorker *MyLogicalRepWorker = NULL;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index d1f2734..e7019fb 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -237,7 +237,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
*
* If there are tables that need synchronizing and are not being synchronized
* yet, start sync workers for them (if there are free slots for sync
- * workers).
+ * workers). To avoid relaunching the sync worker for the same relation at
+ * a high frequency after failures, we record its last start time in each
+ * sync state entry and start a sync worker for a given relation at most
+ * once per table_sync_retry_interval.
*
* For tables that are being synchronized already, check if sync workers
* either need action from the apply worker or have finished.
@@ -265,11 +268,12 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
{
MemoryContext oldctx;
List *rstates;
+ List *old_table_states;
ListCell *lc;
SubscriptionRelState *rstate;
- /* Clean the old list. */
- list_free_deep(table_states);
+ /* Save old sync state info list, clean the old list later */
+ old_table_states = table_states;
table_states = NIL;
StartTransactionCommand();
@@ -281,14 +285,43 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
foreach(lc, rstates)
{
+ ListCell *old_lc;
+ ListCell *old_prev = NULL;
+ ListCell *old_next = NULL;
+
rstate = palloc(sizeof(SubscriptionRelState));
memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
+
+ /*
+ * Look up this relation in the old sync state list so that its
+ * last_start_time carries over to the new entry.
+ */
+ for (old_lc = list_head(old_table_states); old_lc != NULL; old_lc = old_next)
+ {
+ SubscriptionRelState *s = (SubscriptionRelState *) lfirst(old_lc);
+
+ old_next = lnext(old_lc);
+
+ /* Found the matching old entry; inherit its last_start_time */
+ if (rstate->relid == s->relid)
+ {
+ rstate->last_start_time = s->last_start_time;
+ old_table_states = list_delete_cell(old_table_states, old_lc, old_prev);
+ break;
+ }
+
+ old_prev = old_lc;
+ }
+
table_states = lappend(table_states, rstate);
}
MemoryContextSwitchTo(oldctx);
CommitTransactionCommand();
+ /* Clean old list */
+ list_free_deep(old_table_states);
+
table_states_valid = true;
}
@@ -395,11 +428,20 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
*/
else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription)
{
- logicalrep_worker_launch(MyLogicalRepWorker->dbid,
- MySubscription->oid,
- MySubscription->name,
- MyLogicalRepWorker->userid,
- rstate->relid);
+ TimestampTz now;
+
+ now = GetCurrentTimestamp();
+
+ if (TimestampDifferenceExceeds(rstate->last_start_time, now,
+ table_sync_retry_interval))
+ {
+ logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+ MySubscription->oid,
+ MySubscription->name,
+ MyLogicalRepWorker->userid,
+ rstate->relid);
+ rstate->last_start_time = now;
+ }
}
}
}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 9ad8361..8bca324 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2623,6 +2623,18 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"table_sync_retry_interval", PGC_SIGHUP, REPLICATION_STANDBY,
+ gettext_noop("Sets the time to wait before retrying to launch table "
+ "synchronization worker after a failed attempt."),
+ NULL,
+ GUC_UNIT_MS
+ },
+ &table_sync_retry_interval,
+ 5000, 1, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
{"wal_retrieve_retry_interval", PGC_SIGHUP, REPLICATION_STANDBY,
gettext_noop("Sets the time to wait before retrying to retrieve WAL "
"after a failed attempt."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 1435d92..632aa08 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -271,6 +271,8 @@
# in milliseconds; 0 disables
#wal_retrieve_retry_interval = 5s # time to wait before retrying to
# retrieve WAL after a failed attempt
+#table_sync_retry_interval = 5s # time to wait before retrying to
+ # synchronize a table after a failed attempt
# - Subscribers -
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 9f4f152..d8264f8 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -66,6 +66,7 @@ typedef struct SubscriptionRelState
Oid relid;
XLogRecPtr lsn;
char state;
+ TimestampTz last_start_time;
} SubscriptionRelState;
extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 060946a..e9ea7da 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -14,6 +14,7 @@
extern int max_logical_replication_workers;
extern int max_sync_workers_per_subscription;
+extern int table_sync_retry_interval;
extern void ApplyLauncherRegister(void);
extern void ApplyLauncherMain(Datum main_arg);