From df4c150100416e1c6030221081302d7b274309da Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Fri, 26 Jan 2024 11:37:37 -0500
Subject: [PATCH v10 4/4] Invent --transaction-size option for pg_restore.

This patch allows pg_restore to wrap its commands into transaction
blocks, somewhat like --single-transaction, except that we commit
and start a new block after every N objects.  Using this mode
with a size limit of 1000 or so objects greatly reduces the number
of transactions consumed by the restore, while preventing any
one transaction from taking enough locks to overrun the receiving
server's shared lock table.

(A value of 1000 works well with the default lock table size of
around 6400 locks.  Higher --transaction-size values can be used
if one has increased the receiving server's lock table size.)

In this patch I have just hard-wired pg_upgrade to use
--transaction-size 1000.  Perhaps there would be value in adding
another pg_upgrade option to allow user control of that, but I'm
unsure that it's worth the trouble; I think few users would use it,
and any who did would not see much benefit.  However, we
might need to adjust the logic to make the size be 1000 divided
by the number of parallel restore jobs allowed.
---
 doc/src/sgml/ref/pg_restore.sgml     |  24 +++++
 src/bin/pg_dump/pg_backup.h          |   4 +-
 src/bin/pg_dump/pg_backup_archiver.c | 139 +++++++++++++++++++++++++--
 src/bin/pg_dump/pg_backup_archiver.h |   3 +
 src/bin/pg_dump/pg_backup_db.c       |  18 ++++
 src/bin/pg_dump/pg_restore.c         |  15 ++-
 src/bin/pg_upgrade/pg_upgrade.c      |  11 +++
 7 files changed, 206 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/ref/pg_restore.sgml b/doc/src/sgml/ref/pg_restore.sgml
index 1a23874da6..2e3ba80258 100644
--- a/doc/src/sgml/ref/pg_restore.sgml
+++ b/doc/src/sgml/ref/pg_restore.sgml
@@ -786,6 +786,30 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--transaction-size=<replaceable class="parameter">N</replaceable></option></term>
+      <listitem>
+       <para>
+        Execute the restore as a series of transactions, each processing
+        up to <replaceable class="parameter">N</replaceable> database
+        objects.  This option implies <option>--exit-on-error</option>.
+       </para>
+       <para>
+        <option>--transaction-size</option> offers an intermediate choice
+        between the default behavior (one transaction per SQL command)
+        and <option>-1</option>/<option>--single-transaction</option>
+        (one transaction for all restored objects).
+        While <option>--single-transaction</option> has the least
+        overhead, it may be impractical for large databases because the
+        transaction will take a lock on each restored object, possibly
+        exhausting the server's lock table space.
+        Using <option>--transaction-size</option> with a size of a few
+        thousand objects offers nearly the same performance benefits while
+        capping the amount of lock table space needed.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>--use-set-session-authorization</option></term>
       <listitem>
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 9ef2f2017e..fbf5f1c515 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -149,7 +149,9 @@ typedef struct _restoreOptions
 												 * compression */
 	int			suppressDumpWarnings;	/* Suppress output of WARNING entries
 										 * to stderr */
-	bool		single_txn;
+
+	bool		single_txn;		/* restore all TOCs in one transaction */
+	int			txn_size;		/* restore this many TOCs per txn, if > 0 */
 
 	bool	   *idWanted;		/* array showing which dump IDs to emit */
 	int			enable_row_security;
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 73b9972da4..ec74846998 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -502,7 +502,28 @@ RestoreArchive(Archive *AHX)
 			/* Otherwise, drop anything that's selected and has a dropStmt */
 			if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
 			{
+				bool		not_allowed_in_txn = false;
+
 				pg_log_info("dropping %s %s", te->desc, te->tag);
+
+				/*
+				 * In --transaction-size mode, we have to temporarily exit our
+				 * transaction block to drop objects that can't be dropped
+				 * within a transaction.
+				 */
+				if (ropt->txn_size > 0)
+				{
+					if (strcmp(te->desc, "DATABASE") == 0 ||
+						strcmp(te->desc, "DATABASE PROPERTIES") == 0)
+					{
+						not_allowed_in_txn = true;
+						if (AH->connection)
+							CommitTransaction(AHX);
+						else
+							ahprintf(AH, "COMMIT;\n");
+					}
+				}
+
 				/* Select owner and schema as necessary */
 				_becomeOwner(AH, te);
 				_selectOutputSchema(AH, te->namespace);
@@ -628,6 +649,33 @@ RestoreArchive(Archive *AHX)
 						}
 					}
 				}
+
+				/*
+				 * In --transaction-size mode, re-establish the transaction
+				 * block if needed; otherwise, commit after every N drops.
+				 */
+				if (ropt->txn_size > 0)
+				{
+					if (not_allowed_in_txn)
+					{
+						if (AH->connection)
+							StartTransaction(AHX);
+						else
+							ahprintf(AH, "BEGIN;\n");
+						AH->txnCount = 0;
+					}
+					else if (++AH->txnCount >= ropt->txn_size)
+					{
+						if (AH->connection)
+						{
+							CommitTransaction(AHX);
+							StartTransaction(AHX);
+						}
+						else
+							ahprintf(AH, "COMMIT;\nBEGIN;\n");
+						AH->txnCount = 0;
+					}
+				}
 			}
 		}
 
@@ -724,7 +772,11 @@ RestoreArchive(Archive *AHX)
 		}
 	}
 
-	if (ropt->single_txn)
+	/*
+	 * Close out any persistent transaction we may have.  While these two
+	 * cases are started in different places, we can end both cases here.
+	 */
+	if (ropt->single_txn || ropt->txn_size > 0)
 	{
 		if (AH->connection)
 			CommitTransaction(AHX);
@@ -785,6 +837,25 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
 	 */
 	if ((reqs & REQ_SCHEMA) != 0)
 	{
+		bool		object_is_db = false;
+
+		/*
+		 * In --transaction-size mode, must exit our transaction block to
+		 * create a database or set its properties.
+		 */
+		if (strcmp(te->desc, "DATABASE") == 0 ||
+			strcmp(te->desc, "DATABASE PROPERTIES") == 0)
+		{
+			object_is_db = true;
+			if (ropt->txn_size > 0)
+			{
+				if (AH->connection)
+					CommitTransaction(&AH->public);
+				else
+					ahprintf(AH, "COMMIT;\n\n");
+			}
+		}
+
 		/* Show namespace in log message if available */
 		if (te->namespace)
 			pg_log_info("creating %s \"%s.%s\"",
@@ -835,10 +906,10 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
 		/*
 		 * If we created a DB, connect to it.  Also, if we changed DB
 		 * properties, reconnect to ensure that relevant GUC settings are
-		 * applied to our session.
+		 * applied to our session.  (That also restarts the transaction block
+		 * in --transaction-size mode.)
 		 */
-		if (strcmp(te->desc, "DATABASE") == 0 ||
-			strcmp(te->desc, "DATABASE PROPERTIES") == 0)
+		if (object_is_db)
 		{
 			pg_log_info("connecting to new database \"%s\"", te->tag);
 			_reconnectToDB(AH, te->tag);
@@ -964,6 +1035,25 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
 		}
 	}
 
+	/*
+	 * If we emitted anything for this TOC entry, that counts as one action
+	 * against the transaction-size limit.  Commit if it's time to.
+	 */
+	if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0 && ropt->txn_size > 0)
+	{
+		if (++AH->txnCount >= ropt->txn_size)
+		{
+			if (AH->connection)
+			{
+				CommitTransaction(&AH->public);
+				StartTransaction(&AH->public);
+			}
+			else
+				ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
+			AH->txnCount = 0;
+		}
+	}
+
 	if (AH->public.n_errors > 0 && status == WORKER_OK)
 		status = WORKER_IGNORED_ERRORS;
 
@@ -1310,7 +1400,12 @@ StartRestoreLOs(ArchiveHandle *AH)
 {
 	RestoreOptions *ropt = AH->public.ropt;
 
-	if (!ropt->single_txn)
+	/*
+	 * LOs must be restored within a transaction block, since we need the LO
+	 * handle to stay open while we write it.  Establish a transaction unless
+	 * there's one being used globally.
+	 */
+	if (!(ropt->single_txn || ropt->txn_size > 0))
 	{
 		if (AH->connection)
 			StartTransaction(&AH->public);
@@ -1329,7 +1424,7 @@ EndRestoreLOs(ArchiveHandle *AH)
 {
 	RestoreOptions *ropt = AH->public.ropt;
 
-	if (!ropt->single_txn)
+	if (!(ropt->single_txn || ropt->txn_size > 0))
 	{
 		if (AH->connection)
 			CommitTransaction(&AH->public);
@@ -3170,6 +3265,19 @@ _doSetFixedOutputState(ArchiveHandle *AH)
 	else
 		ahprintf(AH, "SET row_security = off;\n");
 
+	/*
+	 * In --transaction-size mode, we should always be in a transaction when
+	 * we begin to restore objects.
+	 */
+	if (ropt && ropt->txn_size > 0)
+	{
+		if (AH->connection)
+			StartTransaction(&AH->public);
+		else
+			ahprintf(AH, "\nBEGIN;\n");
+		AH->txnCount = 0;
+	}
+
 	ahprintf(AH, "\n");
 }
 
@@ -4033,6 +4141,14 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
 		}
 	}
 
+	/*
+	 * In --transaction-size mode, we must commit the open transaction before
+	 * dropping the database connection.  This also ensures that child workers
+	 * can see the objects we've created so far.
+	 */
+	if (AH->public.ropt->txn_size > 0)
+		CommitTransaction(&AH->public);
+
 	/*
 	 * Now close parent connection in prep for parallel steps.  We do this
 	 * mainly to ensure that we don't exceed the specified number of parallel
@@ -4772,6 +4888,10 @@ CloneArchive(ArchiveHandle *AH)
 	clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
 	memcpy(clone, AH, sizeof(ArchiveHandle));
 
+	/* Likewise flat-copy the RestoreOptions, so we can alter them locally */
+	clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions));
+	memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));
+
 	/* Handle format-independent fields */
 	memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
 
@@ -4793,6 +4913,13 @@ CloneArchive(ArchiveHandle *AH)
 	/* clones should not share lo_buf */
 	clone->lo_buf = NULL;
 
+	/*
+	 * Clone connections disregard --transaction-size; they must commit after
+	 * each command so that the results are immediately visible to other
+	 * workers.
+	 */
+	clone->public.ropt->txn_size = 0;
+
 	/*
 	 * Connect our new clone object to the database, using the same connection
 	 * parameters used for the original connection.
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index e4dd395582..1b9f142dea 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -324,6 +324,9 @@ struct _archiveHandle
 	char	   *currTablespace; /* current tablespace, or NULL */
 	char	   *currTableAm;	/* current table access method, or NULL */
 
+	/* in --transaction-size mode, this counts objects emitted in cur xact */
+	int			txnCount;
+
 	void	   *lo_buf;
 	size_t		lo_buf_used;
 	size_t		lo_buf_size;
diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c
index c14d813b21..6b3bf174f2 100644
--- a/src/bin/pg_dump/pg_backup_db.c
+++ b/src/bin/pg_dump/pg_backup_db.c
@@ -554,6 +554,7 @@ IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te,
 {
 	/* Make a writable copy of the command string */
 	char	   *buf = pg_strdup(te->defn);
+	RestoreOptions *ropt = AH->public.ropt;
 	char	   *st;
 	char	   *en;
 
@@ -562,6 +563,23 @@ IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te,
 	{
 		*en++ = '\0';
 		ahprintf(AH, "%s%s%s;\n", cmdBegin, st, cmdEnd);
+
+		/* In --transaction-size mode, count each command as an action */
+		if (ropt && ropt->txn_size > 0)
+		{
+			if (++AH->txnCount >= ropt->txn_size)
+			{
+				if (AH->connection)
+				{
+					CommitTransaction(&AH->public);
+					StartTransaction(&AH->public);
+				}
+				else
+					ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
+				AH->txnCount = 0;
+			}
+		}
+
 		st = en;
 	}
 	ahprintf(AH, "\n");
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index c3beacdec1..5ea78cf7cc 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -120,6 +120,7 @@ main(int argc, char **argv)
 		{"role", required_argument, NULL, 2},
 		{"section", required_argument, NULL, 3},
 		{"strict-names", no_argument, &strict_names, 1},
+		{"transaction-size", required_argument, NULL, 5},
 		{"use-set-session-authorization", no_argument, &use_setsessauth, 1},
 		{"no-comments", no_argument, &no_comments, 1},
 		{"no-publications", no_argument, &no_publications, 1},
@@ -289,10 +290,18 @@ main(int argc, char **argv)
 				set_dump_section(optarg, &(opts->dumpSections));
 				break;
 
-			case 4:
+			case 4:				/* filter */
 				read_restore_filters(optarg, opts);
 				break;
 
+			case 5:				/* transaction-size */
+				if (!option_parse_int(optarg, "--transaction-size",
+									  1, INT_MAX,
+									  &opts->txn_size))
+					exit(1);
+				opts->exit_on_error = true;
+				break;
+
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -337,6 +346,9 @@ main(int argc, char **argv)
 	if (opts->dataOnly && opts->dropSchema)
 		pg_fatal("options -c/--clean and -a/--data-only cannot be used together");
 
+	if (opts->single_txn && opts->txn_size > 0)
+		pg_fatal("options -1/--single-transaction and --transaction-size cannot be used together");
+
 	/*
 	 * -C is not compatible with -1, because we can't create a database inside
 	 * a transaction block.
@@ -484,6 +496,7 @@ usage(const char *progname)
 	printf(_("  --section=SECTION            restore named section (pre-data, data, or post-data)\n"));
 	printf(_("  --strict-names               require table and/or schema include patterns to\n"
 			 "                               match at least one entity each\n"));
+	printf(_("  --transaction-size=N         commit after every N objects\n"));
 	printf(_("  --use-set-session-authorization\n"
 			 "                               use SET SESSION AUTHORIZATION commands instead of\n"
 			 "                               ALTER OWNER commands to set ownership\n"));
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 10c94a6c1f..6a698e0c87 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -51,6 +51,13 @@
 #include "fe_utils/string_utils.h"
 #include "pg_upgrade.h"
 
+/*
+ * Maximum number of pg_restore actions (TOC entries) to process within one
+ * transaction.  At some point we might want to make this user-controllable,
+ * but for now a hard-wired setting will suffice.
+ */
+#define RESTORE_TRANSACTION_SIZE 1000
+
 static void set_locale_and_encoding(void);
 static void prepare_new_cluster(void);
 static void prepare_new_globals(void);
@@ -548,10 +555,12 @@ create_new_objects(void)
 				  true,
 				  true,
 				  "\"%s/pg_restore\" %s %s --exit-on-error --verbose "
+				  "--transaction-size=%d "
 				  "--dbname postgres \"%s/%s\"",
 				  new_cluster.bindir,
 				  cluster_conn_opts(&new_cluster),
 				  create_opts,
+				  RESTORE_TRANSACTION_SIZE,
 				  log_opts.dumpdir,
 				  sql_file_name);
 
@@ -586,10 +595,12 @@ create_new_objects(void)
 		parallel_exec_prog(log_file_name,
 						   NULL,
 						   "\"%s/pg_restore\" %s %s --exit-on-error --verbose "
+						   "--transaction-size=%d "
 						   "--dbname template1 \"%s/%s\"",
 						   new_cluster.bindir,
 						   cluster_conn_opts(&new_cluster),
 						   create_opts,
+						   RESTORE_TRANSACTION_SIZE,
 						   log_opts.dumpdir,
 						   sql_file_name);
 	}
-- 
2.39.3

