diff --git a/doc/src/sgml/ref/drop_subscription.sgml b/doc/src/sgml/ref/drop_subscription.sgml index 9f2fb93..17235ad 100644 --- a/doc/src/sgml/ref/drop_subscription.sgml +++ b/doc/src/sgml/ref/drop_subscription.sgml @@ -38,8 +38,8 @@ DROP SUBSCRIPTION [ IF EXISTS ] name - The replication worker associated with the subscription will not stop until - after the transaction that issued this command has committed. + DROP SUBSCRIPTION cannot be executed inside a + transaction block. diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index ab21e64..c966acc 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -18,6 +18,7 @@ #include "access/heapam.h" #include "access/htup_details.h" +#include "access/xact.h" #include "catalog/indexing.h" #include "catalog/objectaccess.h" @@ -38,6 +39,7 @@ #include "utils/builtins.h" #include "utils/memutils.h" +#include "utils/snapmgr.h" #include "utils/syscache.h" /* @@ -424,7 +426,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) * Drop a subscription */ void -DropSubscription(DropSubscriptionStmt *stmt) +DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) { Relation rel; ObjectAddress myself; @@ -441,6 +443,12 @@ DropSubscription(DropSubscriptionStmt *stmt) WalReceiverConn *wrconn = NULL; StringInfoData cmd; + /* + * We cannot run DROP SUBSCRIPTION inside a user transaction + * block. + */ + PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION"); + rel = heap_open(SubscriptionRelationId, RowExclusiveLock); tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId, @@ -508,6 +516,20 @@ DropSubscription(DropSubscriptionStmt *stmt) /* Clean up dependencies */ deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); + /* Save the origin */ + snprintf(originname, sizeof(originname), "pg_%u", subid); + originid = replorigin_by_name(originname, true); + + heap_close(rel, NoLock); + + /* + * Commit transcation and start new transaction just after + * drop subscription. + */ + PopActiveSnapshot(); + CommitTransactionCommand(); + StartTransactionCommand(); + /* Protect against launcher restarting the worker. */ LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE); @@ -516,18 +538,13 @@ DropSubscription(DropSubscriptionStmt *stmt) LWLockRelease(LogicalRepLauncherLock); - /* Remove the origin tracking if exists. */ - snprintf(originname, sizeof(originname), "pg_%u", subid); - originid = replorigin_by_name(originname, true); + /* Drop the origin trancking if exists */ if (originid != InvalidRepOriginId) replorigin_drop(originid); /* If the user asked to not drop the slot, we are done mow.*/ if (!stmt->drop_slot) - { - heap_close(rel, NoLock); return; - } /* * Otherwise drop the replication slot at the publisher node using @@ -558,8 +575,6 @@ DropSubscription(DropSubscriptionStmt *stmt) walrcv_disconnect(wrconn); pfree(cmd.data); - - heap_close(rel, NoLock); } /* diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 5d3be38..26def04 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -1616,7 +1616,7 @@ ProcessUtilitySlow(ParseState *pstate, break; case T_DropSubscriptionStmt: - DropSubscription((DropSubscriptionStmt *) parsetree); + DropSubscription((DropSubscriptionStmt *) parsetree, isTopLevel); /* no commands stashed for DROP */ commandCollected = true; break; diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index 127696c..e3f83db 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -20,7 +20,7 @@ extern ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt); extern ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt); -extern void DropSubscription(DropSubscriptionStmt *stmt); +extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel); extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId); extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);