>From 8531ca4d200d24ee45265774f7ead613563adca4 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 30 Jan 2015 09:28:17 +0100
Subject: [PATCH 1/4] Allow recovery lock infrastructure to not only hold
 relation locks.

This is a preparatory commit to fix several bugs, separated out for
easier review.

The startup process could so far only properly acquire access exclusive
locks on transactions. As it does not set up enough state to do lock
queuing - primarily to be able to issue recovery conflict interrupts,
and to release them when the primary releases locks - it has its own
infrastructure to manage locks. That infrastructure so far assumed
that all locks are access exclusive locks on relations.

Unfortunately some code in the startup process has to acquire other
locks than what's supported by the aforementioned infrastructure in
standby.c. Namely dbase_redo() has to acquire locks on the database
objects.  Also further such locks will be added soon, to fix another
bug.

So this patch changes the infrastructure to be able to acquire locks
of different modes and locktags.

Additionally allow acquiring more heavyweight relation locks on the
standby than RowExclusive when acquired in session mode.

Discussion: 20150120152819.GC24381@alap3.anarazel.de

Backpatch all the way.
---
 src/backend/storage/ipc/standby.c | 120 ++++++++++++++++++--------------------
 src/backend/storage/lmgr/lock.c   |   7 +--
 src/include/storage/standby.h     |   2 +-
 3 files changed, 61 insertions(+), 68 deletions(-)

diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 292bed5..0502aab 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -38,10 +38,16 @@ int			max_standby_archive_delay = 30 * 1000;
 int			max_standby_streaming_delay = 30 * 1000;
 
 static List *RecoveryLockList;
+typedef struct RecoveryLockListEntry
+{
+	TransactionId	xid;
+	LOCKMODE		lockmode;
+	LOCKTAG			locktag;
+} RecoveryLockListEntry;
 
 static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
 									   ProcSignalReason reason);
-static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
+static void ResolveRecoveryConflictWithLock(LOCKTAG *tag, LOCKMODE mode);
 static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
 static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
 static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
@@ -320,10 +326,10 @@ ResolveRecoveryConflictWithDatabase(Oid dbid)
 	 * us. This is rare enough that we do this as simply as possible: no wait,
 	 * just force them off immediately.
 	 *
-	 * No locking is required here because we already acquired
-	 * AccessExclusiveLock. Anybody trying to connect while we do this will
-	 * block during InitPostgres() and then disconnect when they see the
-	 * database has been removed.
+	 * No locking is required here because we already acquired an
+	 * AccessExclusiveLock on the database in dbase_redo(). Anybody trying to
+	 * connect while we do this will block during InitPostgres() and then
+	 * disconnect when they see the database has been removed.
 	 */
 	while (CountDBBackends(dbid) > 0)
 	{
@@ -338,14 +344,11 @@ ResolveRecoveryConflictWithDatabase(Oid dbid)
 }
 
 static void
-ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
+ResolveRecoveryConflictWithLock(LOCKTAG *locktag, LOCKMODE mode)
 {
 	VirtualTransactionId *backends;
 	bool		lock_acquired = false;
 	int			num_attempts = 0;
-	LOCKTAG		locktag;
-
-	SET_LOCKTAG_RELATION(locktag, dbOid, relOid);
 
 	/*
 	 * If blowing away everybody with conflicting locks doesn't work, after
@@ -358,7 +361,7 @@ ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
 	while (!lock_acquired)
 	{
 		if (++num_attempts < 3)
-			backends = GetLockConflicts(&locktag, AccessExclusiveLock);
+			backends = GetLockConflicts(locktag, mode);
 		else
 			backends = GetConflictingVirtualXIDs(InvalidTransactionId,
 												 InvalidOid);
@@ -366,7 +369,7 @@ ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
 		ResolveRecoveryConflictWithVirtualXIDs(backends,
 											 PROCSIG_RECOVERY_CONFLICT_LOCK);
 
-		if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
+		if (LockAcquireExtended(locktag, mode, true, true, false)
 			!= LOCKACQUIRE_NOT_AVAIL)
 			lock_acquired = true;
 	}
@@ -544,19 +547,18 @@ StandbyTimeoutHandler(void)
  * this lock, so query access is not allowed at this time". So the Startup
  * process is the proxy by which the original locks are implemented.
  *
- * We only keep track of AccessExclusiveLocks, which are only ever held by
- * one transaction on one relation, and don't worry about lock queuing.
+ * We only keep track of the primary's AccessExclusiveLocks, which are only
+ * ever held by one transaction on one relation, and don't worry about lock
+ * queuing.  The startup process however does acquire other locks occasionally
+ * (c.f. dbase_redo()) - but even there no queuing is possible.
  *
  * We keep a single dynamically expandible list of locks in local memory,
  * RelationLockList, so we can keep track of the various entries made by
  * the Startup process's virtual xid in the shared lock table.
  *
  * We record the lock against the top-level xid, rather than individual
- * subtransaction xids. This means AccessExclusiveLocks held by aborted
- * subtransactions are not released as early as possible on standbys.
- *
- * List elements use type xl_rel_lock, since the WAL record type exactly
- * matches the information that we need to keep track of.
+ * subtransaction xids. This means locks held by aborted subtransactions are
+ * not released as early as possible on standbys.
  *
  * We use session locks rather than normal locks so we don't need
  * ResourceOwners.
@@ -564,10 +566,11 @@ StandbyTimeoutHandler(void)
 
 
 void
-StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
+StandbyAcquireLock(TransactionId xid, LOCKTAG *locktag, LOCKMODE mode)
 {
-	xl_standby_lock *newlock;
-	LOCKTAG		locktag;
+	RecoveryLockListEntry *newlock;
+
+	Assert(locktag->locktag_lockmethodid == DEFAULT_LOCKMETHOD);
 
 	/* Already processed? */
 	if (!TransactionIdIsValid(xid) ||
@@ -575,26 +578,18 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
 		TransactionIdDidAbort(xid))
 		return;
 
-	elog(trace_recovery(DEBUG4),
-		 "adding recovery lock: db %u rel %u", dbOid, relOid);
-
-	/* dbOid is InvalidOid when we are locking a shared relation. */
-	Assert(OidIsValid(relOid));
-
-	newlock = palloc(sizeof(xl_standby_lock));
+	newlock = palloc(sizeof(RecoveryLockListEntry));
 	newlock->xid = xid;
-	newlock->dbOid = dbOid;
-	newlock->relOid = relOid;
+	newlock->lockmode = mode;
+	memcpy(&newlock->locktag, locktag, sizeof(LOCKTAG));
 	RecoveryLockList = lappend(RecoveryLockList, newlock);
 
 	/*
 	 * Attempt to acquire the lock as requested, if not resolve conflict
 	 */
-	SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
-
-	if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
+	if (LockAcquireExtended(locktag, mode, true, true, false)
 		== LOCKACQUIRE_NOT_AVAIL)
-		ResolveRecoveryConflictWithLock(newlock->dbOid, newlock->relOid);
+		ResolveRecoveryConflictWithLock(locktag, mode);
 }
 
 static void
@@ -610,22 +605,16 @@ StandbyReleaseLocks(TransactionId xid)
 	prev = NULL;
 	for (cell = list_head(RecoveryLockList); cell; cell = next)
 	{
-		xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+		RecoveryLockListEntry *lock = (RecoveryLockListEntry *) lfirst(cell);
 
 		next = lnext(cell);
 
 		if (!TransactionIdIsValid(xid) || lock->xid == xid)
 		{
-			LOCKTAG		locktag;
-
-			elog(trace_recovery(DEBUG4),
-				 "releasing recovery lock: xid %u db %u rel %u",
-				 lock->xid, lock->dbOid, lock->relOid);
-			SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-			if (!LockRelease(&locktag, AccessExclusiveLock, true))
+			if (!LockRelease(&lock->locktag, lock->lockmode, true))
 				elog(LOG,
-					 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-					 lock->xid, lock->dbOid, lock->relOid);
+					 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u",
+					 lock->xid);
 
 			RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
 			pfree(lock);
@@ -662,25 +651,24 @@ StandbyReleaseAllLocks(void)
 	ListCell   *cell,
 			   *prev,
 			   *next;
-	LOCKTAG		locktag;
 
 	elog(trace_recovery(DEBUG2), "release all standby locks");
 
 	prev = NULL;
 	for (cell = list_head(RecoveryLockList); cell; cell = next)
 	{
-		xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+		RecoveryLockListEntry *lock = (RecoveryLockListEntry *) lfirst(cell);
 
 		next = lnext(cell);
 
 		elog(trace_recovery(DEBUG4),
-			 "releasing recovery lock: xid %u db %u rel %u",
-			 lock->xid, lock->dbOid, lock->relOid);
-		SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-		if (!LockRelease(&locktag, AccessExclusiveLock, true))
+			 "releasing recovery lock for xid %u",
+			 lock->xid);
+
+		if (!LockRelease(&lock->locktag, lock->lockmode, true))
 			elog(LOG,
-				 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-				 lock->xid, lock->dbOid, lock->relOid);
+				 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u",
+				 lock->xid);
 		RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
 		pfree(lock);
 	}
@@ -697,12 +685,11 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
 	ListCell   *cell,
 			   *prev,
 			   *next;
-	LOCKTAG		locktag;
 
 	prev = NULL;
 	for (cell = list_head(RecoveryLockList); cell; cell = next)
 	{
-		xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+		RecoveryLockListEntry *lock = (RecoveryLockListEntry *) lfirst(cell);
 		bool		remove = false;
 
 		next = lnext(cell);
@@ -735,13 +722,13 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
 		if (remove)
 		{
 			elog(trace_recovery(DEBUG4),
-				 "releasing recovery lock: xid %u db %u rel %u",
-				 lock->xid, lock->dbOid, lock->relOid);
-			SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-			if (!LockRelease(&locktag, AccessExclusiveLock, true))
+				 "releasing recovery lock: xid %u",
+				 lock->xid);
+
+			if (!LockRelease(&lock->locktag, lock->lockmode, true))
 				elog(LOG,
-					 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-					 lock->xid, lock->dbOid, lock->relOid);
+					 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u",
+					 lock->xid);
 			RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
 			pfree(lock);
 		}
@@ -776,9 +763,16 @@ standby_redo(XLogReaderState *record)
 		int			i;
 
 		for (i = 0; i < xlrec->nlocks; i++)
-			StandbyAcquireAccessExclusiveLock(xlrec->locks[i].xid,
-											  xlrec->locks[i].dbOid,
-											  xlrec->locks[i].relOid);
+		{
+			LOCKTAG locktag;
+
+			SET_LOCKTAG_RELATION(locktag,
+								 xlrec->locks[i].dbOid,
+								 xlrec->locks[i].relOid);
+			StandbyAcquireLock(xlrec->locks[i].xid,
+							   &locktag,
+							   AccessExclusiveLock);
+		}
 	}
 	else if (info == XLOG_RUNNING_XACTS)
 	{
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 1eb2d4b..02ecf3d 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -710,7 +710,8 @@ LockAcquireExtended(const LOCKTAG *locktag,
 	if (RecoveryInProgress() && !InRecovery &&
 		(locktag->locktag_type == LOCKTAG_OBJECT ||
 		 locktag->locktag_type == LOCKTAG_RELATION) &&
-		lockmode > RowExclusiveLock)
+		lockmode > RowExclusiveLock &&
+		!sessionLock)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("cannot acquire lock mode %s on database objects while recovery is in progress",
@@ -3861,9 +3862,7 @@ lock_twophase_standby_recover(TransactionId xid, uint16 info,
 	if (lockmode == AccessExclusiveLock &&
 		locktag->locktag_type == LOCKTAG_RELATION)
 	{
-		StandbyAcquireAccessExclusiveLock(xid,
-										locktag->locktag_field1 /* dboid */ ,
-									  locktag->locktag_field2 /* reloid */ );
+		StandbyAcquireLock(xid, locktag, AccessExclusiveLock);
 	}
 }
 
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index c32c963..f711281 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -45,7 +45,7 @@ extern void StandbyTimeoutHandler(void);
  * to make hot standby work. That includes logging AccessExclusiveLocks taken
  * by transactions and running-xacts snapshots.
  */
-extern void StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid);
+extern void StandbyAcquireLock(TransactionId xid, LOCKTAG *tag, LOCKMODE mode);
 extern void StandbyReleaseLockTree(TransactionId xid,
 					   int nsubxids, TransactionId *subxids);
 extern void StandbyReleaseAllLocks(void);
-- 
2.2.1.212.gc5b9256

