From 83aa8b6a0c7b4de42c1bf7aa6cfd863830a86c76 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Fri, 27 Jan 2023 15:17:09 +0530
Subject: [PATCH] Lock the replication origin record instead of locking the
 pg_replication_origin relation.

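Previously, replorigin_drop_by_name() held ExclusiveLock on the whole
pg_replication_origin relation until transaction commit to interlock
against concurrent drops.  Open the relation with RowExclusiveLock instead,
take AccessExclusiveLock on the specific origin with LockSharedObject(),
and re-check that the origin still exists before dropping it.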
---
 src/backend/replication/logical/origin.c | 37 ++++++++++++++++--------
 1 file changed, 25 insertions(+), 12 deletions(-)

diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index b754c43840..049a2547d3 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -417,6 +417,21 @@ restart:
 	CommandCounterIncrement();
 }
 
+/* Report whether roident has an entry in the REPLORIGIDENT syscache. */
+static bool
+replorigin_exists(RepOriginId roident)
+{
+	HeapTuple	tup;
+
+	tup = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum((Oid) roident));
+	if (!HeapTupleIsValid(tup))
+		return false;
+
+	/* Only existence matters, so drop the cache reference right away. */
+	ReleaseSysCache(tup);
+	return true;
+}
+
 /*
  * Drop replication origin (by name).
  *
@@ -430,23 +445,21 @@ replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
 
 	Assert(IsTransactionState());
 
-	/*
-	 * To interlock against concurrent drops, we hold ExclusiveLock on
-	 * pg_replication_origin till xact commit.
-	 *
-	 * XXX We can optimize this by acquiring the lock on a specific origin by
-	 * using LockSharedObject if required. However, for that, we first to
-	 * acquire a lock on ReplicationOriginRelationId, get the origin_id, lock
-	 * the specific origin and then re-check if the origin still exists.
-	 */
-	rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
+	rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
 
 	roident = replorigin_by_name(name, missing_ok);
 
-	if (OidIsValid(roident))
+	/*
+	 * Lock the origin, if one was found, to prevent concurrent drops; the
+	 * lock is kept until end of transaction.  Then re-check that it exists.
+	 */
+	if (OidIsValid(roident))
+		LockSharedObject(ReplicationOriginRelationId, roident, 0,
+						 AccessExclusiveLock);
+
+	if (OidIsValid(roident) && replorigin_exists(roident))
 		replorigin_drop_guts(rel, roident, nowait);
 
-	/* We keep the lock on pg_replication_origin until commit */
 	table_close(rel, NoLock);
 }
 
-- 
2.34.1

