>From e5d46c73955e7647912c2625e31027a6fef7c880 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Mon, 26 Jan 2015 21:03:35 +0100
Subject: [PATCH] Prevent ALTER DATABASE SET TABLESPACE from running
 concurrently with a base backup.

The combination can cause a corrupt base backup.
---
 src/backend/access/transam/xlog.c    | 33 +++++++++++++++++++++++++++++
 src/backend/commands/dbcommands.c    | 41 ++++++++++++++++++++++++++++++++++++
 src/backend/replication/basebackup.c | 11 ++++++++++
 src/include/access/xlog.h            |  1 +
 4 files changed, 86 insertions(+)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 629a457..1daa10d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -53,6 +53,7 @@
 #include "storage/ipc.h"
 #include "storage/large_object.h"
 #include "storage/latch.h"
+#include "storage/lmgr.h"
 #include "storage/pmsignal.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
@@ -9329,6 +9330,14 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 	else
 		XLogCtl->Insert.nonExclusiveBackups++;
 	XLogCtl->Insert.forcePageWrites = true;
+
+	/*
+	 * Acquire lock on pg datababase just before releasing the WAL insert lock
+	 * and entering the ENSURE_ERROR_CLEANUP block. That way it's sufficient
+	 * to release it in the error cleanup callback.
+	 */
+	LockRelationOid(DatabaseRelationId, ShareLock);
+
 	WALInsertLockRelease();
 
 	/* Ensure we release forcePageWrites if fail below */
@@ -9523,6 +9532,8 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 	}
 	PG_END_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
 
+	UnlockRelationOid(DatabaseRelationId, ShareLock);
+
 	/*
 	 * We're done.  As a convenience, return the starting WAL location.
 	 */
@@ -9537,6 +9548,8 @@ pg_start_backup_callback(int code, Datum arg)
 {
 	bool		exclusive = DatumGetBool(arg);
 
+	UnlockRelationOid(DatabaseRelationId, ShareLock);
+
 	/* Update backup counters and forcePageWrites on failure */
 	WALInsertLockAcquireExclusive();
 	if (exclusive)
@@ -9937,6 +9950,26 @@ do_pg_abort_backup(void)
 }
 
 /*
+ * Is a (exclusive or nonexclusive) base backup running?
+ *
+ * Note that this does not check whether any standby of this node has a
+ * basebackup running, or whether any upstream master (if this is a standby)
+ * has one in progress
+ */
+bool
+LocalBaseBackupInProgress(void)
+{
+	bool ret;
+
+	WALInsertLockAcquire();
+	ret = XLogCtl->Insert.exclusiveBackup ||
+		XLogCtl->Insert.nonExclusiveBackups > 0;
+	WALInsertLockRelease();
+
+	return ret;
+}
+
+/*
  * Get latest redo apply position.
  *
  * Exported to allow WALReceiver to read the pointer directly.
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 5e66961..6184c83 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -1087,6 +1087,30 @@ movedb(const char *dbname, const char *tblspcname)
 				 errmsg("cannot change the tablespace of the currently open database")));
 
 	/*
+	 * Prevent SET TABLESPACE from running concurrently with a base
+	 * backup. Without that check a base backup would potentially copy a
+	 * partially removed source database; which WAL replay then would copy
+	 * over the new database...
+	 *
+	 * Starting a base backup takes a SHARE lock on pg_database. In addition a
+	 * streaming basebackup takes the same lock for the entirety of the copy
+	 * of the data directory.  That, combined with this check, prevents base
+	 * backups from being taken at the same time a SET TABLESPACE is in
+	 * progress.
+	 *
+	 * Note that this check here will not trigger if a standby currently has a
+	 * base backup ongoing; instead WAL replay will have a recovery conflict
+	 * when replaying the DBASE_CREATE record. That is only safe because
+	 * standbys can only do nonexclusive base backups which hold the lock over
+	 * the entire runtime - otherwise no lock would prevent replay after
+	 * pg_start_backup().
+	 */
+	if (LocalBaseBackupInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot change a database's tablespace while a base backup is in progress")));
+
+	/*
 	 * Get tablespace's oid
 	 */
 	dst_tblspcoid = get_tablespace_oid(tblspcname, false);
@@ -2052,6 +2076,17 @@ dbase_redo(XLogReaderState *record)
 		src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
 		dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
 
+		if (InHotStandby)
+		{
+			/* Lock source database, do avoid concurrent hint bit writes et al. */
+			LockSharedObjectForSession(DatabaseRelationId, xlrec->src_db_id, 0, AccessExclusiveLock);
+			ResolveRecoveryConflictWithDatabase(xlrec->src_db_id);
+
+			/* Lock target database, it'll be overwritten in a second */
+			LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
+			ResolveRecoveryConflictWithDatabase(xlrec->db_id);
+		}
+
 		/*
 		 * Our theory for replaying a CREATE is to forcibly drop the target
 		 * subdirectory if present, then re-copy the source data. This may be
@@ -2078,6 +2113,12 @@ dbase_redo(XLogReaderState *record)
 		 * We don't need to copy subdirectories
 		 */
 		copydir(src_path, dst_path, false);
+
+		if (InHotStandby)
+		{
+			UnlockSharedObjectForSession(DatabaseRelationId, xlrec->src_db_id, 0, AccessExclusiveLock);
+			UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
+		}
 	}
 	else if (info == XLOG_DBASE_DROP)
 	{
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 3058ce9..0e76497 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -19,6 +19,7 @@
 
 #include "access/xlog_internal.h"		/* for pg_start/stop_backup */
 #include "catalog/catalog.h"
+#include "catalog/pg_database.h"
 #include "catalog/pg_type.h"
 #include "lib/stringinfo.h"
 #include "libpq/libpq.h"
@@ -32,6 +33,8 @@
 #include "replication/walsender_private.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/lock.h"
 #include "utils/builtins.h"
 #include "utils/elog.h"
 #include "utils/ps_status.h"
@@ -110,6 +113,8 @@ static void
 base_backup_cleanup(int code, Datum arg)
 {
 	do_pg_abort_backup();
+
+	UnlockRelationOid(DatabaseRelationId, ShareLock);
 }
 
 /*
@@ -134,6 +139,9 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
 
 	startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
 								  &labelfile);
+
+	LockRelationOid(DatabaseRelationId, ShareLock);
+
 	/*
 	 * Once do_pg_start_backup has been called, ensure that any failure causes
 	 * us to abort the backup so we don't "leak" a backup counter. For this reason,
@@ -304,6 +312,9 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
 	}
 	PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
 
+	/* release lock again, before stop_backup, as that can error out */
+	UnlockRelationOid(DatabaseRelationId, ShareLock);
+
 	endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli);
 
 	if (opt->includewal)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 138deaf..f3893f3 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -253,6 +253,7 @@ extern XLogRecPtr do_pg_start_backup(const char *backupidstr, bool fast,
 extern XLogRecPtr do_pg_stop_backup(char *labelfile, bool waitforarchive,
 				  TimeLineID *stoptli_p);
 extern void do_pg_abort_backup(void);
+extern bool LocalBaseBackupInProgress(void);
 
 /* File path names (all relative to $PGDATA) */
 #define BACKUP_LABEL_FILE		"backup_label"
-- 
2.0.0.rc2.4.g1dc51c6.dirty

