From dc2e9acc6a55772896117cf3b88e4189f994a82d Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Fri, 27 Jan 2023 15:17:09 +0530
Subject: [PATCH v2] Lock the replication origin record instead of locking the
 pg_replication_origin relation.

Lock the replication origin record instead of locking the
pg_replication_origin relation.
---
 src/backend/replication/logical/origin.c | 56 ++++++++++++------------
 1 file changed, 27 insertions(+), 29 deletions(-)

diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index b754c43840..3e360cf41e 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -338,16 +338,14 @@ replorigin_create(const char *roname)
  * Helper function to drop a replication origin.
  */
 static void
-replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait)
+replorigin_state_clear(RepOriginId roident, bool nowait)
 {
-	HeapTuple	tuple;
 	int			i;
 
 	/*
-	 * First, clean up the slot state info, if there is any matching slot.
+	 * Clean up the slot state info, if there is any matching slot.
 	 */
 restart:
-	tuple = NULL;
 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
 	for (i = 0; i < max_replication_slots; i++)
@@ -402,19 +400,6 @@ restart:
 	}
 	LWLockRelease(ReplicationOriginLock);
 	ConditionVariableCancelSleep();
-
-	/*
-	 * Now, we can delete the catalog entry.
-	 */
-	tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
-	if (!HeapTupleIsValid(tuple))
-		elog(ERROR, "cache lookup failed for replication origin with ID %d",
-			 roident);
-
-	CatalogTupleDelete(rel, &tuple->t_self);
-	ReleaseSysCache(tuple);
-
-	CommandCounterIncrement();
 }
 
 /*
@@ -427,24 +412,37 @@ replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
 {
 	RepOriginId roident;
 	Relation	rel;
+	HeapTuple	tuple;
 
 	Assert(IsTransactionState());
 
-	/*
-	 * To interlock against concurrent drops, we hold ExclusiveLock on
-	 * pg_replication_origin till xact commit.
-	 *
-	 * XXX We can optimize this by acquiring the lock on a specific origin by
-	 * using LockSharedObject if required. However, for that, we first to
-	 * acquire a lock on ReplicationOriginRelationId, get the origin_id, lock
-	 * the specific origin and then re-check if the origin still exists.
-	 */
-	rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
+	rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
 
 	roident = replorigin_by_name(name, missing_ok);
 
-	if (OidIsValid(roident))
-		replorigin_drop_guts(rel, roident, nowait);
+	/* Lock the origin to prevent concurrent drops */
+	LockSharedObject(ReplicationOriginRelationId, roident, 0,
+					 AccessExclusiveLock);
+
+	tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
+	if (!HeapTupleIsValid(tuple))
+	{
+		if (!missing_ok)
+			elog(ERROR, "cache lookup failed for replication origin with ID %d",
+				 roident);
+
+		return;
+	}
+
+	replorigin_state_clear(roident, nowait);
+
+	/*
+	 * Now, we can delete the catalog entry.
+	 */
+	CatalogTupleDelete(rel, &tuple->t_self);
+	ReleaseSysCache(tuple);
+
+	CommandCounterIncrement();
 
 	/* We keep the lock on pg_replication_origin until commit */
 	table_close(rel, NoLock);
-- 
2.34.1

