From c44f723c033573fa5da72721595bc135a00e3cbf Mon Sep 17 00:00:00 2001
From: Thomas Krennwallner <teakay@aiven.io>
Date: Fri, 20 Sep 2024 17:33:05 -0400
Subject: [PATCH] pg_upgrade: Add check for invalid databases.

Currently, pg_upgrade fails to connect to the first invalid database
it encounters in get_loadable_libraries() and then aborts.  While we
print a fatal message with a hint about what should be done, the output
is terse and does not report any further invalid databases present in
the installation.

Instead of just exiting on the first error, we collect all
pg_database.datconnlimit values in get_db_infos(), skip invalid
databases in process_slot(), and then run check_for_invalid_dbs(),
which lists all invalid databases in the report file
invalid_databases.txt.
---
 src/bin/pg_upgrade/check.c             | 62 ++++++++++++++++++++++++++
 src/bin/pg_upgrade/info.c              |  5 ++-
 src/bin/pg_upgrade/pg_upgrade.h        |  2 +
 src/bin/pg_upgrade/t/002_pg_upgrade.pl |  2 +-
 src/bin/pg_upgrade/task.c              | 32 +++++++++++--
 5 files changed, 98 insertions(+), 5 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 12735a4268..b5b0f7cd6f 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -12,6 +12,7 @@
 #include "catalog/pg_authid_d.h"
 #include "catalog/pg_class_d.h"
 #include "catalog/pg_collation.h"
+#include "catalog/pg_database.h"
 #include "fe_utils/string_utils.h"
 #include "mb/pg_wchar.h"
 #include "pg_upgrade.h"
@@ -19,6 +20,7 @@
 static void check_new_cluster_is_empty(void);
 static void check_is_install_user(ClusterInfo *cluster);
 static void check_proper_datallowconn(ClusterInfo *cluster);
+static void check_for_invalid_dbs(ClusterInfo *cluster);
 static void check_for_prepared_transactions(ClusterInfo *cluster);
 static void check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster);
 static void check_for_user_defined_postfix_ops(ClusterInfo *cluster);
@@ -598,6 +600,13 @@ check_and_dump_old_cluster(void)
 	 */
 	get_db_rel_and_slot_infos(&old_cluster);
 
+	/*
+	 * Verify that the old cluster does not reference invalid databases.
+	 * Connections to such databases are not allowed and break the following
+	 * checks.
+	 */
+	check_for_invalid_dbs(&old_cluster);
+
 	init_tablespaces();
 
 	get_loadable_libraries();
@@ -691,6 +700,13 @@ check_new_cluster(void)
 {
 	get_db_rel_and_slot_infos(&new_cluster);
 
+	/*
+	 * Verify that the new cluster does not reference invalid databases.
+	 * Connections to such databases are not allowed and break the following
+	 * checks.
+	 */
+	check_for_invalid_dbs(&new_cluster);
+
 	check_new_cluster_is_empty();
 
 	check_loadable_libraries();
@@ -1087,6 +1103,52 @@ check_is_install_user(ClusterInfo *cluster)
 	check_ok();
 }
 
+/*
+ *	check_for_invalid_dbs
+ *
+ *	Ensure that no database in the cluster is invalid, as connections to such
+ *	databases are not allowed; list any found in a file, then pg_fatal().
+ */
+static void
+check_for_invalid_dbs(ClusterInfo *cluster)
+{
+	int			dbnum;
+	FILE	   *script = NULL;	/* report file, opened on first hit */
+	char		output_path[MAXPGPATH];
+
+	prep_status("Checking for invalid databases");
+
+	snprintf(output_path, sizeof(output_path), "%s/%s",
+			 log_opts.basedir,
+			 "invalid_databases.txt");
+
+	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+	{
+		DbInfo	   *active_db = &cluster->dbarr.dbs[dbnum];
+
+		if (active_db->db_connlimit == DATCONNLIMIT_INVALID_DB)
+		{
+			if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
+				pg_fatal("could not open file \"%s\": %m", output_path);
+			fprintf(script, "%s\n", active_db->db_name);	/* one name per line */
+		}
+	}
+
+	if (script)					/* set only if an invalid database was found */
+	{
+		fclose(script);
+		pg_log(PG_REPORT, "fatal");
+		pg_fatal("Your installation contains invalid databases as a consequence of\n"
+				 "interrupted DROP DATABASE.  They are now marked as corrupted databases\n"
+				 "that cannot be connected to anymore.  Consider removing them using\n"
+				 "    DROP DATABASE ...;\n"
+				 "A list of invalid databases is in the file:\n"
+				 "    %s", output_path);
+	}
+	else
+		check_ok();
+}
+
 
 /*
  *	check_proper_datallowconn
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index f83ded89cb..0e2bd40d1a 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -409,12 +409,13 @@ get_db_infos(ClusterInfo *cluster)
 	int			tupnum;
 	DbInfo	   *dbinfos;
 	int			i_datname,
+				i_datconnlimit,
 				i_oid,
 				i_spclocation;
 	char		query[QUERY_ALLOC];
 
 	snprintf(query, sizeof(query),
-			 "SELECT d.oid, d.datname, d.encoding, d.datcollate, d.datctype, ");
+			 "SELECT d.oid, d.datname, d.encoding, d.datcollate, d.datctype, d.datconnlimit, ");
 	if (GET_MAJOR_VERSION(cluster->major_version) >= 1700)
 		snprintf(query + strlen(query), sizeof(query) - strlen(query),
 				 "datlocprovider, datlocale, ");
@@ -436,6 +437,7 @@ get_db_infos(ClusterInfo *cluster)
 
 	i_oid = PQfnumber(res, "oid");
 	i_datname = PQfnumber(res, "datname");
+	i_datconnlimit = PQfnumber(res, "datconnlimit");
 	i_spclocation = PQfnumber(res, "spclocation");
 
 	ntups = PQntuples(res);
@@ -445,6 +447,7 @@ get_db_infos(ClusterInfo *cluster)
 	{
 		dbinfos[tupnum].db_oid = atooid(PQgetvalue(res, tupnum, i_oid));
 		dbinfos[tupnum].db_name = pg_strdup(PQgetvalue(res, tupnum, i_datname));
+		dbinfos[tupnum].db_connlimit = atoi(PQgetvalue(res, tupnum, i_datconnlimit));
 		snprintf(dbinfos[tupnum].db_tablespace, sizeof(dbinfos[tupnum].db_tablespace), "%s",
 				 PQgetvalue(res, tupnum, i_spclocation));
 	}
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 53f693c2d4..b1c77ca6de 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -194,6 +194,8 @@ typedef struct
 	char	   *db_name;		/* database name */
 	char		db_tablespace[MAXPGPATH];	/* database default tablespace
 											 * path */
+	int			db_connlimit;	/* database invalid if set to
+								 * DATCONNLIMIT_INVALID_DB */
 	RelInfoArr	rel_arr;		/* array of all user relinfos */
 	LogicalSlotInfoArr slot_arr;	/* array of all LogicalSlotInfo */
 } DbInfo;
diff --git a/src/bin/pg_upgrade/t/002_pg_upgrade.pl b/src/bin/pg_upgrade/t/002_pg_upgrade.pl
index 17af2ce61e..c7da6d27b2 100644
--- a/src/bin/pg_upgrade/t/002_pg_upgrade.pl
+++ b/src/bin/pg_upgrade/t/002_pg_upgrade.pl
@@ -424,7 +424,7 @@ SKIP:
 			$mode, '--check',
 		],
 		1,
-		[qr/invalid/],    # pg_upgrade prints errors on stdout :(
+		[qr/invalid_databases\.txt/],    # pg_upgrade prints errors on stdout :(
 		[qr/^$/],
 		'invalid database causes failure');
 	rmtree($newnode->data_dir . "/pg_upgrade_output.d");
diff --git a/src/bin/pg_upgrade/task.c b/src/bin/pg_upgrade/task.c
index ba1726c25e..90de2070a1 100644
--- a/src/bin/pg_upgrade/task.c
+++ b/src/bin/pg_upgrade/task.c
@@ -44,6 +44,7 @@
 
 #include "postgres_fe.h"
 
+#include "catalog/pg_database.h"
 #include "common/connect.h"
 #include "fe_utils/string_utils.h"
 #include "pg_upgrade.h"
@@ -228,6 +229,18 @@ process_query_result(const ClusterInfo *cluster, UpgradeTaskSlot *slot,
 		PQclear(res);
 }
 
+/*
+ * Closes the slot's connection, if it has one, and resets the slot for reuse.
+ */
+static void
+free_slot(UpgradeTaskSlot *slot)
+{
+	if (slot->conn)				/* no connection if the database was skipped */
+		PQfinish(slot->conn);
+	memset(slot, 0, sizeof(UpgradeTaskSlot));	/* clears every field */
+	slot->ready = true;
+}
+
 /*
  * Advances the state machine for a given slot as necessary.
  */
@@ -255,6 +268,21 @@ process_slot(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTas
 			 * new connection.
 			 */
 			slot->db_idx = dbs_processing++;
+
+			/*
+			 * Do not try to connect to invalid databases here,
+			 * check_for_invalid_dbs() will report them later. Skip this
+			 * database instead, free the slot and start initiating the next
+			 * connection.
+			 */
+			if (cluster->dbarr.dbs[slot->db_idx].db_connlimit == DATCONNLIMIT_INVALID_DB)
+			{
+				dbs_complete++;
+				free_slot(slot);
+				process_slot(cluster, slot, task);
+				return;
+			}
+
 			slot->state = CONNECTING;
 			start_conn(cluster, slot);
 
@@ -314,9 +342,7 @@ process_slot(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTas
 			 * through the slots.
 			 */
 			dbs_complete++;
-			PQfinish(slot->conn);
-			memset(slot, 0, sizeof(UpgradeTaskSlot));
-			slot->ready = true;
+			free_slot(slot);
 
 			process_slot(cluster, slot, task);
 
-- 
2.46.1

