From 6e196d17e3dc3ae923321c1b49eb46ccd5ac75b0 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 27 Jan 2015 19:52:11 +0100
Subject: [PATCH] Fix various issues around WAL replay and ALTER DATABASE SET
 TABLESPACE.

<Danger of base backups vs. ALTER DATABASE ... SET TABLESPACE>
Discussion: 20150120152819.GC24381@alap3.anarazel.de

Fix GetLockConflicts() to properly terminate the list of conflicting
backends. It's unclear why this hasn't caused more problems.

Discussion: 20150127142713.GD29457@awork2.anarazel.de

Don't acquire blocking locks on the database in dbase_redo(); not
enough state is set up.
Discussion: 20150126212458.GA29457@awork2.anarazel.de

Don't allow access to the template database during the replay of a
CREATE DATABASE.
---
 src/backend/access/transam/xlog.c    |  29 +++++++++
 src/backend/commands/dbcommands.c    | 104 ++++++++++++++++++++++++++-----
 src/backend/replication/basebackup.c |  15 +++++
 src/backend/replication/walsender.c  |  14 +----
 src/backend/storage/ipc/standby.c    | 117 ++++++++++++++++++-----------------
 src/backend/storage/lmgr/lmgr.c      |  31 ++++++++++
 src/backend/storage/lmgr/lock.c      |  13 ++--
 src/backend/utils/init/postinit.c    |   2 +-
 src/include/access/xlog.h            |   1 +
 src/include/storage/lmgr.h           |   3 +
 src/include/storage/standby.h        |   2 +-
 11 files changed, 240 insertions(+), 91 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 629a457..38e7dff 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -53,6 +53,7 @@
 #include "storage/ipc.h"
 #include "storage/large_object.h"
 #include "storage/latch.h"
+#include "storage/lmgr.h"
 #include "storage/pmsignal.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
@@ -9291,6 +9292,12 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 				 errmsg("backup label too long (max %d bytes)",
 						MAXPGPATH)));
+	/*
+	 * Acquire lock on pg_database to prevent concurrent CREATE DATABASE and
+	 * ALTER DATABASE ... SET TABLESPACE from running. In case of an error
+	 * it'll be released by the transaction abort code.
+	 */
+	LockRelationOidForSession(DatabaseRelationId, ShareLock);
 
 	/*
 	 * Mark backup active in shared memory.  We must do full-page WAL writes
@@ -9523,6 +9530,8 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 	}
 	PG_END_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
 
+	UnlockRelationOidForSession(DatabaseRelationId, ShareLock);
+
 	/*
 	 * We're done.  As a convenience, return the starting WAL location.
 	 */
@@ -9937,6 +9946,26 @@ do_pg_abort_backup(void)
 }
 
 /*
+ * Is an (exclusive or nonexclusive) base backup running?
+ *
+ * Note that this does not check whether any standby of this node has a
+ * basebackup running, or whether any upstream master (if this is a standby)
+ * has one in progress.
+ */
+bool
+LocalBaseBackupInProgress(void)
+{
+	bool ret;
+
+	WALInsertLockAcquire();	/* both backup fields are read before the release below */
+	ret = XLogCtl->Insert.exclusiveBackup ||
+		XLogCtl->Insert.nonExclusiveBackups > 0;
+	WALInsertLockRelease();
+
+	return ret;
+}
+
+/*
  * Get latest redo apply position.
  *
  * Exported to allow WALReceiver to read the pointer directly.
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 5e66961..e6a3352 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -1071,6 +1071,9 @@ movedb(const char *dbname, const char *tblspcname)
 	LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
 							   AccessExclusiveLock);
 
+	/* Acquire lock on pg_database against concurrent base backups starting */
+	LockRelationOidForSession(DatabaseRelationId, RowExclusiveLock);
+
 	/*
 	 * Permission checks
 	 */
@@ -1087,6 +1090,30 @@ movedb(const char *dbname, const char *tblspcname)
 				 errmsg("cannot change the tablespace of the currently open database")));
 
 	/*
+	 * Prevent SET TABLESPACE from running concurrently with a base
+	 * backup. Without that check a base backup would potentially copy a
+	 * partially removed source database; which WAL replay then would copy
+	 * over the new database...
+	 *
+	 * Starting a base backup takes a SHARE lock on pg_database. In addition a
+	 * streaming basebackup takes the same lock for the entirety of the copy
+	 * of the data directory.  That, combined with this check, prevents base
+	 * backups from being taken at the same time a SET TABLESPACE is in
+	 * progress.
+	 *
+	 * Note that this check here will not trigger if a standby currently has a
+	 * base backup ongoing; instead WAL replay will have a recovery conflict
+	 * when replaying the DBASE_CREATE record. That is only safe because
+	 * standbys can only do nonexclusive base backups which hold the lock over
+	 * the entire runtime - otherwise no lock would prevent replay after
+	 * pg_start_backup().
+	 */
+	if (LocalBaseBackupInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot change a database's tablespace while a base backup is in progress")));
+
+	/*
 	 * Get tablespace's oid
 	 */
 	dst_tblspcoid = get_tablespace_oid(tblspcname, false);
@@ -1116,6 +1143,7 @@ movedb(const char *dbname, const char *tblspcname)
 		heap_close(pgdbrel, NoLock);
 		UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
 									 AccessExclusiveLock);
+		UnlockRelationOidForSession(DatabaseRelationId, RowExclusiveLock);
 		return;
 	}
 
@@ -1204,6 +1232,12 @@ movedb(const char *dbname, const char *tblspcname)
 	}
 
 	/*
+	 * Force xid assignment, for the benefit of dbase_redo()'s internal
+	 * locking.
+	 */
+	GetTopTransactionId();
+
+	/*
 	 * Use an ENSURE block to make sure we remove the debris if the copy fails
 	 * (eg, due to out-of-disk-space).  This is not a 100% solution, because
 	 * of the possibility of failure during transaction commit, but it should
@@ -1314,6 +1348,12 @@ movedb(const char *dbname, const char *tblspcname)
 	StartTransactionCommand();
 
 	/*
+	 * Force xid assignment, for the benefit of dbase_redo()'s internal
+	 * locking.
+	 */
+	GetTopTransactionId();
+
+	/*
 	 * Remove files from the old tablespace
 	 */
 	if (!rmtree(src_dbpath, true))
@@ -1340,6 +1380,7 @@ movedb(const char *dbname, const char *tblspcname)
 	/* Now it's safe to release the database lock */
 	UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
 								 AccessExclusiveLock);
+	UnlockRelationOidForSession(DatabaseRelationId, RowExclusiveLock);
 }
 
 /* Error cleanup callback for movedb */
@@ -2053,6 +2094,38 @@ dbase_redo(XLogReaderState *record)
 		dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
 
 		/*
+		 * Can only do the locking if the server generating the record was new
+		 * enough to know it has to assign an xid.
+		 */
+		if (InHotStandby && TransactionIdIsValid(XLogRecGetXid(record)))
+		{
+			LOCKTAG locktag;
+
+			SET_LOCKTAG_OBJECT(locktag,
+							   InvalidOid,
+							   DatabaseRelationId,
+							   xlrec->src_db_id,
+							   0);
+
+			/* Lock source database, to avoid concurrent hint bit writes et al. */
+			StandbyAcquireLock(XLogRecGetXid(record), &locktag, AccessExclusiveLock);
+			ResolveRecoveryConflictWithDatabase(xlrec->src_db_id);
+
+			/* Lock target database, it'll be overwritten in a second */
+			SET_LOCKTAG_OBJECT(locktag,
+							   InvalidOid,
+							   DatabaseRelationId,
+							   xlrec->db_id,
+							   0);
+			StandbyAcquireLock(XLogRecGetXid(record), &locktag, AccessExclusiveLock);
+			ResolveRecoveryConflictWithDatabase(xlrec->db_id);	/* target, not source */
+
+			/* Lock pg_database, to conflict with base backups */
+			SET_LOCKTAG_RELATION(locktag, 0, DatabaseRelationId);
+			StandbyAcquireLock(XLogRecGetXid(record), &locktag, RowExclusiveLock);
+		}
+
+		/*
 		 * Our theory for replaying a CREATE is to forcibly drop the target
 		 * subdirectory if present, then re-copy the source data. This may be
 		 * more work than needed, but it is simple to implement.
@@ -2086,16 +2159,31 @@ dbase_redo(XLogReaderState *record)
 
 		dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
 
-		if (InHotStandby)
+		/*
+		 * Can only do the locking if the server generating the record was new
+		 * enough to know it has to assign an xid.
+		 */
+		if (InHotStandby && TransactionIdIsValid(XLogRecGetXid(record)))
 		{
+			LOCKTAG locktag;
+
 			/*
 			 * Lock database while we resolve conflicts to ensure that
 			 * InitPostgres() cannot fully re-execute concurrently. This
 			 * avoids backends re-connecting automatically to same database,
 			 * which can happen in some cases.
 			 */
-			LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
+			SET_LOCKTAG_OBJECT(locktag,
+							   InvalidOid,
+							   DatabaseRelationId,
+							   xlrec->db_id,
+							   0);
+			StandbyAcquireLock(XLogRecGetXid(record), &locktag, AccessExclusiveLock);
 			ResolveRecoveryConflictWithDatabase(xlrec->db_id);
+
+			/* Lock pg_database, to conflict with base backups */
+			SET_LOCKTAG_RELATION(locktag, 0, DatabaseRelationId);
+			StandbyAcquireLock(XLogRecGetXid(record), &locktag, RowExclusiveLock);
 		}
 
 		/* Drop pages for this database that are in the shared buffer cache */
@@ -2112,18 +2200,6 @@ dbase_redo(XLogReaderState *record)
 			ereport(WARNING,
 					(errmsg("some useless files may be left behind in old database directory \"%s\"",
 							dst_path)));
-
-		if (InHotStandby)
-		{
-			/*
-			 * Release locks prior to commit. XXX There is a race condition
-			 * here that may allow backends to reconnect, but the window for
-			 * this is small because the gap between here and commit is mostly
-			 * fairly small and it is unlikely that people will be dropping
-			 * databases that we are trying to connect to anyway.
-			 */
-			UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
-		}
 	}
 	else
 		elog(PANIC, "dbase_redo: unknown op code %u", info);
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 3058ce9..a0b590f 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -17,8 +17,10 @@
 #include <unistd.h>
 #include <time.h>
 
+#include "access/xact.h"
 #include "access/xlog_internal.h"		/* for pg_start/stop_backup */
 #include "catalog/catalog.h"
+#include "catalog/pg_database.h"
 #include "catalog/pg_type.h"
 #include "lib/stringinfo.h"
 #include "libpq/libpq.h"
@@ -32,6 +34,8 @@
 #include "replication/walsender_private.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/lock.h"
 #include "utils/builtins.h"
 #include "utils/elog.h"
 #include "utils/ps_status.h"
@@ -134,6 +138,9 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
 
 	startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
 								  &labelfile);
+
+	LockRelationOidForSession(DatabaseRelationId, ShareLock);
+
 	/*
 	 * Once do_pg_start_backup has been called, ensure that any failure causes
 	 * us to abort the backup so we don't "leak" a backup counter. For this reason,
@@ -304,6 +311,9 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
 	}
 	PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
 
+	/* release lock blocking CREATE DATABASE / SET TABLESPACE - no risk while backing up WAL */
+	UnlockRelationOidForSession(DatabaseRelationId, ShareLock);
+
 	endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli);
 
 	if (opt->includewal)
@@ -675,6 +685,9 @@ SendBaseBackup(BaseBackupCmd *cmd)
 		set_ps_display(activitymsg, false);
 	}
 
+	/* to provide error handling around locks */
+	StartTransactionCommand();
+
 	/* Make sure we can open the directory with tablespaces in it */
 	dir = AllocateDir("pg_tblspc");
 	if (!dir)
@@ -684,6 +697,8 @@ SendBaseBackup(BaseBackupCmd *cmd)
 	perform_base_backup(&opt, dir);
 
 	FreeDir(dir);
+
+	CommitTransactionCommand();
 }
 
 static void
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 05d2339..3f25ccf 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -185,7 +185,6 @@ static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
 
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
-static void WalSndXLogSendHandler(SIGNAL_ARGS);
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
@@ -2566,17 +2565,6 @@ WalSndSigHupHandler(SIGNAL_ARGS)
 	errno = save_errno;
 }
 
-/* SIGUSR1: set flag to send WAL records */
-static void
-WalSndXLogSendHandler(SIGNAL_ARGS)
-{
-	int			save_errno = errno;
-
-	latch_sigusr1_handler();
-
-	errno = save_errno;
-}
-
 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
 static void
 WalSndLastCycleHandler(SIGNAL_ARGS)
@@ -2610,7 +2598,7 @@ WalSndSignals(void)
 	pqsignal(SIGQUIT, quickdie);	/* hard crash time */
 	InitializeTimeouts();		/* establishes SIGALRM handler */
 	pqsignal(SIGPIPE, SIG_IGN);
-	pqsignal(SIGUSR1, WalSndXLogSendHandler);	/* request WAL sending */
+	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
 	pqsignal(SIGUSR2, WalSndLastCycleHandler);	/* request a last cycle and
 												 * shutdown */
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 292bed5..98e89e1 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -38,10 +38,16 @@ int			max_standby_archive_delay = 30 * 1000;
 int			max_standby_streaming_delay = 30 * 1000;
 
 static List *RecoveryLockList;
+typedef struct RecoveryLockListEntry	/* one element of RecoveryLockList */
+{
+	TransactionId xid;		/* xid the lock was recorded against */
+	LOCKMODE lockmode;		/* mode it was acquired in; reused for LockRelease */
+	LOCKTAG locktag;		/* copy of the tag passed to StandbyAcquireLock */
+} RecoveryLockListEntry;
 
 static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
 									   ProcSignalReason reason);
-static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
+static void ResolveRecoveryConflictWithLock(LOCKTAG *tag, LOCKMODE mode);
 static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
 static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
 static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
@@ -320,10 +326,10 @@ ResolveRecoveryConflictWithDatabase(Oid dbid)
 	 * us. This is rare enough that we do this as simply as possible: no wait,
 	 * just force them off immediately.
 	 *
-	 * No locking is required here because we already acquired
-	 * AccessExclusiveLock. Anybody trying to connect while we do this will
-	 * block during InitPostgres() and then disconnect when they see the
-	 * database has been removed.
+	 * No locking is required here because we already acquired an
+	 * AccessExclusiveLock on the database in dbase_redo(). Anybody trying to
+	 * connect while we do this will block during InitPostgres() and then
+	 * disconnect when they see the database has been removed.
 	 */
 	while (CountDBBackends(dbid) > 0)
 	{
@@ -338,14 +344,11 @@ ResolveRecoveryConflictWithDatabase(Oid dbid)
 }
 
 static void
-ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
+ResolveRecoveryConflictWithLock(LOCKTAG *locktag, LOCKMODE mode)
 {
 	VirtualTransactionId *backends;
 	bool		lock_acquired = false;
 	int			num_attempts = 0;
-	LOCKTAG		locktag;
-
-	SET_LOCKTAG_RELATION(locktag, dbOid, relOid);
 
 	/*
 	 * If blowing away everybody with conflicting locks doesn't work, after
@@ -358,7 +361,7 @@ ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
 	while (!lock_acquired)
 	{
 		if (++num_attempts < 3)
-			backends = GetLockConflicts(&locktag, AccessExclusiveLock);
+			backends = GetLockConflicts(locktag, mode);
 		else
 			backends = GetConflictingVirtualXIDs(InvalidTransactionId,
 												 InvalidOid);
@@ -366,7 +369,7 @@ ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
 		ResolveRecoveryConflictWithVirtualXIDs(backends,
 											 PROCSIG_RECOVERY_CONFLICT_LOCK);
 
-		if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
+		if (LockAcquireExtended(locktag, mode, true, true, false)
 			!= LOCKACQUIRE_NOT_AVAIL)
 			lock_acquired = true;
 	}
@@ -544,19 +547,18 @@ StandbyTimeoutHandler(void)
  * this lock, so query access is not allowed at this time". So the Startup
  * process is the proxy by which the original locks are implemented.
  *
- * We only keep track of AccessExclusiveLocks, which are only ever held by
- * one transaction on one relation, and don't worry about lock queuing.
+ * We only keep track of the primary's AccessExclusiveLocks, which are only
+ * ever held by one transaction on one relation, and don't worry about lock
+ * queuing.  The startup process however does acquire other locks occasionally
+ * (cf. dbase_redo()) - but even there no queuing is possible.
  *
  * We keep a single dynamically expandible list of locks in local memory,
  * RelationLockList, so we can keep track of the various entries made by
  * the Startup process's virtual xid in the shared lock table.
  *
  * We record the lock against the top-level xid, rather than individual
- * subtransaction xids. This means AccessExclusiveLocks held by aborted
- * subtransactions are not released as early as possible on standbys.
- *
- * List elements use type xl_rel_lock, since the WAL record type exactly
- * matches the information that we need to keep track of.
+ * subtransaction xids. This means locks held by aborted subtransactions are
+ * not released as early as possible on standbys.
  *
  * We use session locks rather than normal locks so we don't need
  * ResourceOwners.
@@ -564,10 +566,11 @@ StandbyTimeoutHandler(void)
 
 
 void
-StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
+StandbyAcquireLock(TransactionId xid, LOCKTAG *locktag, LOCKMODE mode)
 {
-	xl_standby_lock *newlock;
-	LOCKTAG		locktag;
+	RecoveryLockListEntry *newlock;
+	Oid dbOid = locktag->locktag_field1;
+	Oid relOid = locktag->locktag_field2;
 
 	/* Already processed? */
 	if (!TransactionIdIsValid(xid) ||
@@ -576,25 +579,24 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
 		return;
 
 	elog(trace_recovery(DEBUG4),
-		 "adding recovery lock: db %u rel %u", dbOid, relOid);
+		 "adding recovery lock: db %u rel %u",
+		 dbOid, relOid);
 
 	/* dbOid is InvalidOid when we are locking a shared relation. */
 	Assert(OidIsValid(relOid));
 
-	newlock = palloc(sizeof(xl_standby_lock));
+	newlock = palloc(sizeof(RecoveryLockListEntry));
 	newlock->xid = xid;
-	newlock->dbOid = dbOid;
-	newlock->relOid = relOid;
+	newlock->lockmode = mode;
+	memcpy(&newlock->locktag, locktag, sizeof(LOCKTAG));
 	RecoveryLockList = lappend(RecoveryLockList, newlock);
 
 	/*
 	 * Attempt to acquire the lock as requested, if not resolve conflict
 	 */
-	SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
-
-	if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
+	if (LockAcquireExtended(locktag, mode, true, true, false)
 		== LOCKACQUIRE_NOT_AVAIL)
-		ResolveRecoveryConflictWithLock(newlock->dbOid, newlock->relOid);
+		ResolveRecoveryConflictWithLock(locktag, mode);
 }
 
 static void
@@ -610,22 +612,16 @@ StandbyReleaseLocks(TransactionId xid)
 	prev = NULL;
 	for (cell = list_head(RecoveryLockList); cell; cell = next)
 	{
-		xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+		RecoveryLockListEntry *lock = (RecoveryLockListEntry *) lfirst(cell);
 
 		next = lnext(cell);
 
 		if (!TransactionIdIsValid(xid) || lock->xid == xid)
 		{
-			LOCKTAG		locktag;
-
-			elog(trace_recovery(DEBUG4),
-				 "releasing recovery lock: xid %u db %u rel %u",
-				 lock->xid, lock->dbOid, lock->relOid);
-			SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-			if (!LockRelease(&locktag, AccessExclusiveLock, true))
+			if (!LockRelease(&lock->locktag, lock->lockmode, true))
 				elog(LOG,
-					 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-					 lock->xid, lock->dbOid, lock->relOid);
+					 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u",
+					 lock->xid);
 
 			RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
 			pfree(lock);
@@ -662,25 +658,24 @@ StandbyReleaseAllLocks(void)
 	ListCell   *cell,
 			   *prev,
 			   *next;
-	LOCKTAG		locktag;
 
 	elog(trace_recovery(DEBUG2), "release all standby locks");
 
 	prev = NULL;
 	for (cell = list_head(RecoveryLockList); cell; cell = next)
 	{
-		xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+		RecoveryLockListEntry *lock = (RecoveryLockListEntry *) lfirst(cell);
 
 		next = lnext(cell);
 
 		elog(trace_recovery(DEBUG4),
-			 "releasing recovery lock: xid %u db %u rel %u",
-			 lock->xid, lock->dbOid, lock->relOid);
-		SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-		if (!LockRelease(&locktag, AccessExclusiveLock, true))
+			 "releasing recovery lock for xid %u",
+			 lock->xid);
+
+		if (!LockRelease(&lock->locktag, lock->lockmode, true))
 			elog(LOG,
-				 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-				 lock->xid, lock->dbOid, lock->relOid);
+				 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u",
+				 lock->xid);
 		RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
 		pfree(lock);
 	}
@@ -697,12 +692,11 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
 	ListCell   *cell,
 			   *prev,
 			   *next;
-	LOCKTAG		locktag;
 
 	prev = NULL;
 	for (cell = list_head(RecoveryLockList); cell; cell = next)
 	{
-		xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+		RecoveryLockListEntry *lock = (RecoveryLockListEntry *) lfirst(cell);
 		bool		remove = false;
 
 		next = lnext(cell);
@@ -735,13 +729,13 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
 		if (remove)
 		{
 			elog(trace_recovery(DEBUG4),
-				 "releasing recovery lock: xid %u db %u rel %u",
-				 lock->xid, lock->dbOid, lock->relOid);
-			SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-			if (!LockRelease(&locktag, AccessExclusiveLock, true))
+				 "releasing recovery lock: xid %u",
+				 lock->xid);
+
+			if (!LockRelease(&lock->locktag, lock->lockmode, true))
 				elog(LOG,
-					 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-					 lock->xid, lock->dbOid, lock->relOid);
+					 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u",
+					 lock->xid);
 			RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
 			pfree(lock);
 		}
@@ -776,9 +770,16 @@ standby_redo(XLogReaderState *record)
 		int			i;
 
 		for (i = 0; i < xlrec->nlocks; i++)
-			StandbyAcquireAccessExclusiveLock(xlrec->locks[i].xid,
-											  xlrec->locks[i].dbOid,
-											  xlrec->locks[i].relOid);
+		{
+			LOCKTAG locktag;
+
+			SET_LOCKTAG_RELATION(locktag,
+								 xlrec->locks[i].dbOid,
+								 xlrec->locks[i].relOid);
+			StandbyAcquireLock(xlrec->locks[i].xid,
+							   &locktag,
+							   AccessExclusiveLock);
+		}
 	}
 	else if (info == XLOG_RUNNING_XACTS)
 	{
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index d13a167..c428b38 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -248,6 +248,37 @@ UnlockRelation(Relation relation, LOCKMODE lockmode)
 }
 
 /*
+ *		LockRelationOidForSession
+ *
+ * Lock a relation in session mode, without requiring it to have been opened
+ * in transaction-local mode. That's likely only useful during WAL replay.
+ */
+void
+LockRelationOidForSession(Oid relid, LOCKMODE lockmode)
+{
+	LOCKTAG		tag;
+
+	SetLocktagRelationOid(&tag, relid);
+
+	(void) LockAcquire(&tag, lockmode, true, false);	/* result deliberately ignored */
+}
+
+/*
+ *		UnlockRelationOidForSession
+ *
+ * Release a lock previously acquired by LockRelationOidForSession.
+ */
+void
+UnlockRelationOidForSession(Oid relid, LOCKMODE lockmode)
+{
+	LOCKTAG		tag;
+
+	SetLocktagRelationOid(&tag, relid);	/* same tag construction as the Lock variant */
+
+	LockRelease(&tag, lockmode, true);
+}
+
+/*
  *		LockHasWaitersRelation
  *
  * This is a functiion to check if someone else is waiting on a
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 61c8d21..f051ad6 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -710,13 +710,16 @@ LockAcquireExtended(const LOCKTAG *locktag,
 	if (RecoveryInProgress() && !InRecovery &&
 		(locktag->locktag_type == LOCKTAG_OBJECT ||
 		 locktag->locktag_type == LOCKTAG_RELATION) &&
-		lockmode > RowExclusiveLock)
+		lockmode > RowExclusiveLock &&
+		!sessionLock)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("cannot acquire lock mode %s on database objects while recovery is in progress",
 						lockMethodTable->lockModeNames[lockmode]),
 				 errhint("Only RowExclusiveLock or less can be acquired on database objects during recovery.")));
 
+	Assert(!InRecovery || (sessionLock && dontWait));
+
 #ifdef LOCK_DEBUG
 	if (LOCK_DEBUG_ENABLED(locktag))
 		elog(LOG, "LockAcquire: lock [%u,%u] %s",
@@ -2804,6 +2807,8 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
 		 * on this lockable object.
 		 */
 		LWLockRelease(partitionLock);
+		vxids[count].backendId = InvalidBackendId;
+		vxids[count].localTransactionId = InvalidLocalTransactionId;
 		return vxids;
 	}
 
@@ -2857,6 +2862,8 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
 	if (count > MaxBackends)	/* should never happen */
 		elog(PANIC, "too many conflicting locks found");
 
+	vxids[count].backendId = InvalidBackendId;
+	vxids[count].localTransactionId = InvalidLocalTransactionId;
 	return vxids;
 }
 
@@ -3857,9 +3864,7 @@ lock_twophase_standby_recover(TransactionId xid, uint16 info,
 	if (lockmode == AccessExclusiveLock &&
 		locktag->locktag_type == LOCKTAG_RELATION)
 	{
-		StandbyAcquireAccessExclusiveLock(xid,
-										locktag->locktag_field1 /* dboid */ ,
-									  locktag->locktag_field2 /* reloid */ );
+		StandbyAcquireLock(xid, locktag, AccessExclusiveLock);
 	}
 }
 
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 1f5cf06..dfe2322 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -886,7 +886,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 			ereport(FATAL,
 					(errcode(ERRCODE_UNDEFINED_DATABASE),
 					 errmsg("database \"%s\" does not exist", dbname),
-			   errdetail("It seems to have just been dropped or renamed.")));
+			   errdetail("It seems to have just been dropped, moved or renamed.")));
 	}
 
 	/*
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 138deaf..f3893f3 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -253,6 +253,7 @@ extern XLogRecPtr do_pg_start_backup(const char *backupidstr, bool fast,
 extern XLogRecPtr do_pg_stop_backup(char *labelfile, bool waitforarchive,
 				  TimeLineID *stoptli_p);
 extern void do_pg_abort_backup(void);
+extern bool LocalBaseBackupInProgress(void);
 
 /* File path names (all relative to $PGDATA) */
 #define BACKUP_LABEL_FILE		"backup_label"
diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h
index f5d70e5..2397f11 100644
--- a/src/include/storage/lmgr.h
+++ b/src/include/storage/lmgr.h
@@ -50,6 +50,9 @@ extern bool LockHasWaitersRelation(Relation relation, LOCKMODE lockmode);
 extern void LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);
 extern void UnlockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);
 
+extern void LockRelationOidForSession(Oid relid, LOCKMODE lockmode);
+extern void UnlockRelationOidForSession(Oid relid, LOCKMODE lockmode);
+
 /* Lock a relation for extension */
 extern void LockRelationForExtension(Relation relation, LOCKMODE lockmode);
 extern void UnlockRelationForExtension(Relation relation, LOCKMODE lockmode);
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index c32c963..f711281 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -45,7 +45,7 @@ extern void StandbyTimeoutHandler(void);
  * to make hot standby work. That includes logging AccessExclusiveLocks taken
  * by transactions and running-xacts snapshots.
  */
-extern void StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid);
+extern void StandbyAcquireLock(TransactionId xid, LOCKTAG *tag, LOCKMODE mode);
 extern void StandbyReleaseLockTree(TransactionId xid,
 					   int nsubxids, TransactionId *subxids);
 extern void StandbyReleaseAllLocks(void);
-- 
2.0.0.rc2.4.g1dc51c6.dirty

