diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index ab5f371..695b70a 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -230,10 +230,12 @@ textarray_to_stringlist(ArrayType *textarray) * The insert-or-update logic in this function is not concurrency safe so it * might raise an error in rare circumstances. But if we took a stronger lock * such as ShareRowExclusiveLock, we would risk more deadlocks. + * If insert_ok is true and no record exists for the given table, a new record + * is inserted; with insert_ok false only an existing record is updated. */ Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn) + XLogRecPtr sublsn, bool insert_ok) { Relation rel; HeapTuple tup; @@ -249,10 +251,10 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state, ObjectIdGetDatum(subid)); /* - * If the record for given table does not exist yet create new record, - * otherwise update the existing one. + * If no record exists for the given table and insertion is requested, create + * a new record; if a record exists, update it; otherwise do neither. */ - if (!HeapTupleIsValid(tup)) + if (!HeapTupleIsValid(tup) && insert_ok) { /* Form the tuple. 
*/ memset(values, 0, sizeof(values)); @@ -272,7 +274,7 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state, heap_freetuple(tup); } - else + else if (HeapTupleIsValid(tup)) { bool replaces[Natts_pg_subscription_rel]; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 86eb31d..f6d8e83 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -435,8 +435,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) CheckSubscriptionRelkind(get_rel_relkind(relid), rv->schemaname, rv->relname); + /* Insert new entry if entry doesn't exist, otherwise update it */ SetSubscriptionRelState(subid, relid, table_state, - InvalidXLogRecPtr); + InvalidXLogRecPtr, true); } ereport(NOTICE, @@ -557,9 +558,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) if (!bsearch(&relid, subrel_local_oids, list_length(subrel_states), sizeof(Oid), oid_cmp)) { + /* Insert entry if the entry doesn't exist, otherwise update it */ SetSubscriptionRelState(sub->oid, relid, copy_data ? 
SUBREL_STATE_INIT : SUBREL_STATE_READY, - InvalidXLogRecPtr); + InvalidXLogRecPtr, true); ereport(NOTICE, (errmsg("added subscription for table %s.%s", quote_identifier(rv->schemaname), diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index fe45fb8..6280c95 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -228,10 +228,12 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) SpinLockRelease(&MyLogicalRepWorker->relmutex); + /* Just update the subscription relation state */ SetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + false); walrcv_endstreaming(wrconn, &tli); finish_sync_worker(); @@ -360,9 +362,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) StartTransactionCommand(); started_tx = true; } + /* Just update the subscription relation state */ SetSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, - rstate->lsn); + rstate->lsn, false); } } else @@ -763,7 +766,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) StartTransactionCommand(); relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, - &relstate_lsn, false); + &relstate_lsn, true); CommitTransactionCommand(); SpinLockAcquire(&MyLogicalRepWorker->relmutex); @@ -808,7 +811,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) SetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + true); CommitTransactionCommand(); pgstat_report_stat(false); @@ -882,11 +886,12 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MyLogicalRepWorker->relstate); if (MyLogicalRepWorker->relstate != SUBREL_STATE_CATCHUP) { - /* Update the new state. 
*/ + /* Only update the existing entry's state; do not insert a new one */ SetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + false); finish_sync_worker(); } break; @@ -896,6 +901,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) /* Nothing to do here but finish. */ finish_sync_worker(); break; + case SUBREL_STATE_UNKNOWN: + ereport(LOG, + (errmsg("relation status of subscription table %u in subscription %u " + "might have been removed before starting", + MyLogicalRepWorker->relid, + MyLogicalRepWorker->subid))); + finish_sync_worker(); + break; default: elog(ERROR, "unknown relation state \"%c\"", MyLogicalRepWorker->relstate); diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 391f96b..dbc2d16 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -71,7 +71,7 @@ typedef struct SubscriptionRelState } SubscriptionRelState; extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn); + XLogRecPtr sublsn, bool insert_ok); extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, bool missing_ok); extern void RemoveSubscriptionRel(Oid subid, Oid relid);