diff --git a/doc/src/sgml/ref/drop_subscription.sgml b/doc/src/sgml/ref/drop_subscription.sgml
index 9f2fb93..17235ad 100644
--- a/doc/src/sgml/ref/drop_subscription.sgml
+++ b/doc/src/sgml/ref/drop_subscription.sgml
@@ -38,8 +38,8 @@ DROP SUBSCRIPTION [ IF EXISTS ] name
- The replication worker associated with the subscription will not stop until
- after the transaction that issued this command has committed.
+ DROP SUBSCRIPTION cannot be executed inside a
+ transaction block.
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index ab21e64..c966acc 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -18,6 +18,7 @@
#include "access/heapam.h"
#include "access/htup_details.h"
+#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/objectaccess.h"
@@ -38,6 +39,7 @@
#include "utils/builtins.h"
#include "utils/memutils.h"
+#include "utils/snapmgr.h"
#include "utils/syscache.h"
/*
@@ -424,7 +426,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
* Drop a subscription
*/
void
-DropSubscription(DropSubscriptionStmt *stmt)
+DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
{
Relation rel;
ObjectAddress myself;
@@ -441,6 +443,12 @@ DropSubscription(DropSubscriptionStmt *stmt)
WalReceiverConn *wrconn = NULL;
StringInfoData cmd;
+ /*
+ * DROP SUBSCRIPTION commits its own transaction partway through, so
+ * it cannot be run inside a user transaction block.
+ */
+ PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
+
rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
@@ -508,6 +516,20 @@ DropSubscription(DropSubscriptionStmt *stmt)
/* Clean up dependencies */
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
+ /* Save the origin */
+ snprintf(originname, sizeof(originname), "pg_%u", subid);
+ originid = replorigin_by_name(originname, true);
+
+ heap_close(rel, NoLock);
+
+ /*
+ * Commit the transaction that dropped the subscription and start
+ * a new transaction for the remaining cleanup.
+ */
+ PopActiveSnapshot();
+ CommitTransactionCommand();
+ StartTransactionCommand();
+
/* Protect against launcher restarting the worker. */
LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE);
@@ -516,18 +538,13 @@ DropSubscription(DropSubscriptionStmt *stmt)
LWLockRelease(LogicalRepLauncherLock);
- /* Remove the origin tracking if exists. */
- snprintf(originname, sizeof(originname), "pg_%u", subid);
- originid = replorigin_by_name(originname, true);
+ /* Drop the origin tracking if it exists. */
if (originid != InvalidRepOriginId)
replorigin_drop(originid);
/* If the user asked to not drop the slot, we are done mow.*/
if (!stmt->drop_slot)
- {
- heap_close(rel, NoLock);
return;
- }
/*
* Otherwise drop the replication slot at the publisher node using
@@ -558,8 +575,6 @@ DropSubscription(DropSubscriptionStmt *stmt)
walrcv_disconnect(wrconn);
pfree(cmd.data);
-
- heap_close(rel, NoLock);
}
/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 5d3be38..26def04 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1616,7 +1616,7 @@ ProcessUtilitySlow(ParseState *pstate,
break;
case T_DropSubscriptionStmt:
- DropSubscription((DropSubscriptionStmt *) parsetree);
+ DropSubscription((DropSubscriptionStmt *) parsetree, isTopLevel);
/* no commands stashed for DROP */
commandCollected = true;
break;
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 127696c..e3f83db 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -20,7 +20,7 @@
extern ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt);
extern ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt);
-extern void DropSubscription(DropSubscriptionStmt *stmt);
+extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);