From 3ab3558a236e6ad17fe48087aac3cabb4b02aa3e Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Wed, 20 Dec 2023 17:42:39 -0500
Subject: [PATCH v9 4/4] Invent --transaction-size option for pg_restore.

This patch allows pg_restore to wrap its commands into transaction
blocks, somewhat like --single-transaction, except that we commit
and start a new block after every N objects.  Using this mode
with a size limit of 1000 or so objects greatly reduces the number
of transactions consumed by the restore, while preventing any
one transaction from taking enough locks to overrun the receiving
server's shared lock table.

(A value of 1000 works well with the default lock table size of
around 6400 locks.  Higher --transaction-size values can be used
if one has increased the receiving server's lock table size.)

In this patch I have just hard-wired pg_upgrade to use
--transaction-size 1000.  Perhaps there would be value in adding
another pg_upgrade option to allow user control of that, but I'm
unsure that it's worth the trouble; I think few users would use it,
and any who did would not see much benefit.  However, we
might need to adjust the logic to make the size be 1000 divided
by the number of parallel restore jobs allowed.
---
 doc/src/sgml/ref/pg_restore.sgml     |  24 +++++
 src/bin/pg_dump/pg_backup.h          |   4 +-
 src/bin/pg_dump/pg_backup_archiver.c | 139 +++++++++++++++++++++++++--
 src/bin/pg_dump/pg_backup_archiver.h |   3 +
 src/bin/pg_dump/pg_backup_db.c       |  18 ++++
 src/bin/pg_dump/pg_restore.c         |  15 ++-
 src/bin/pg_upgrade/pg_upgrade.c      |   2 +
 7 files changed, 197 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/ref/pg_restore.sgml b/doc/src/sgml/ref/pg_restore.sgml
index 1a23874da6..2e3ba80258 100644
--- a/doc/src/sgml/ref/pg_restore.sgml
+++ b/doc/src/sgml/ref/pg_restore.sgml
@@ -786,6 +786,30 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--transaction-size=<replaceable class="parameter">N</replaceable></option></term>
+      <listitem>
+       <para>
+        Execute the restore as a series of transactions, each processing
+        up to <replaceable class="parameter">N</replaceable> database
+        objects.  This option implies <option>--exit-on-error</option>.
+       </para>
+       <para>
+        <option>--transaction-size</option> offers an intermediate choice
+        between the default behavior (one transaction per SQL command)
+        and <option>-1</option>/<option>--single-transaction</option>
+        (one transaction for all restored objects).
+        While <option>--single-transaction</option> has the least
+        overhead, it may be impractical for large databases because the
+        transaction will take a lock on each restored object, possibly
+        exhausting the server's lock table space.
+        Using <option>--transaction-size</option> with a size of a few
+        thousand objects offers nearly the same performance benefits while
+        capping the amount of lock table space needed.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>--use-set-session-authorization</option></term>
       <listitem>
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 9ef2f2017e..fbf5f1c515 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -149,7 +149,9 @@ typedef struct _restoreOptions
 												 * compression */
 	int			suppressDumpWarnings;	/* Suppress output of WARNING entries
 										 * to stderr */
-	bool		single_txn;
+
+	bool		single_txn;		/* restore all TOCs in one transaction */
+	int			txn_size;		/* restore this many TOCs per txn, if > 0 */
 
 	bool	   *idWanted;		/* array showing which dump IDs to emit */
 	int			enable_row_security;
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 73b9972da4..ec74846998 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -502,7 +502,28 @@ RestoreArchive(Archive *AHX)
 			/* Otherwise, drop anything that's selected and has a dropStmt */
 			if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
 			{
+				bool		not_allowed_in_txn = false;
+
 				pg_log_info("dropping %s %s", te->desc, te->tag);
+
+				/*
+				 * In --transaction-size mode, we have to temporarily exit our
+				 * transaction block to drop objects that can't be dropped
+				 * within a transaction.
+				 */
+				if (ropt->txn_size > 0)
+				{
+					if (strcmp(te->desc, "DATABASE") == 0 ||
+						strcmp(te->desc, "DATABASE PROPERTIES") == 0)
+					{
+						not_allowed_in_txn = true;
+						if (AH->connection)
+							CommitTransaction(AHX);
+						else
+							ahprintf(AH, "COMMIT;\n");
+					}
+				}
+
 				/* Select owner and schema as necessary */
 				_becomeOwner(AH, te);
 				_selectOutputSchema(AH, te->namespace);
@@ -628,6 +649,33 @@ RestoreArchive(Archive *AHX)
 						}
 					}
 				}
+
+				/*
+				 * In --transaction-size mode, re-establish the transaction
+				 * block if needed; otherwise, commit after every N drops.
+				 */
+				if (ropt->txn_size > 0)
+				{
+					if (not_allowed_in_txn)
+					{
+						if (AH->connection)
+							StartTransaction(AHX);
+						else
+							ahprintf(AH, "BEGIN;\n");
+						AH->txnCount = 0;
+					}
+					else if (++AH->txnCount >= ropt->txn_size)
+					{
+						if (AH->connection)
+						{
+							CommitTransaction(AHX);
+							StartTransaction(AHX);
+						}
+						else
+							ahprintf(AH, "COMMIT;\nBEGIN;\n");
+						AH->txnCount = 0;
+					}
+				}
 			}
 		}
 
@@ -724,7 +772,11 @@ RestoreArchive(Archive *AHX)
 		}
 	}
 
-	if (ropt->single_txn)
+	/*
+	 * Close out any persistent transaction we may have.  While these two
+	 * cases are started in different places, we can end both cases here.
+	 */
+	if (ropt->single_txn || ropt->txn_size > 0)
 	{
 		if (AH->connection)
 			CommitTransaction(AHX);
@@ -785,6 +837,25 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
 	 */
 	if ((reqs & REQ_SCHEMA) != 0)
 	{
+		bool		object_is_db = false;
+
+		/*
+		 * In --transaction-size mode, must exit our transaction block to
+		 * create a database or set its properties.
+		 */
+		if (strcmp(te->desc, "DATABASE") == 0 ||
+			strcmp(te->desc, "DATABASE PROPERTIES") == 0)
+		{
+			object_is_db = true;
+			if (ropt->txn_size > 0)
+			{
+				if (AH->connection)
+					CommitTransaction(&AH->public);
+				else
+					ahprintf(AH, "COMMIT;\n\n");
+			}
+		}
+
 		/* Show namespace in log message if available */
 		if (te->namespace)
 			pg_log_info("creating %s \"%s.%s\"",
@@ -835,10 +906,10 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
 		/*
 		 * If we created a DB, connect to it.  Also, if we changed DB
 		 * properties, reconnect to ensure that relevant GUC settings are
-		 * applied to our session.
+		 * applied to our session.  (That also restarts the transaction block
+		 * in --transaction-size mode.)
 		 */
-		if (strcmp(te->desc, "DATABASE") == 0 ||
-			strcmp(te->desc, "DATABASE PROPERTIES") == 0)
+		if (object_is_db)
 		{
 			pg_log_info("connecting to new database \"%s\"", te->tag);
 			_reconnectToDB(AH, te->tag);
@@ -964,6 +1035,25 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
 		}
 	}
 
+	/*
+	 * If we emitted anything for this TOC entry, that counts as one action
+	 * against the transaction-size limit.  Commit if it's time to.
+	 */
+	if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0 && ropt->txn_size > 0)
+	{
+		if (++AH->txnCount >= ropt->txn_size)
+		{
+			if (AH->connection)
+			{
+				CommitTransaction(&AH->public);
+				StartTransaction(&AH->public);
+			}
+			else
+				ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
+			AH->txnCount = 0;
+		}
+	}
+
 	if (AH->public.n_errors > 0 && status == WORKER_OK)
 		status = WORKER_IGNORED_ERRORS;
 
@@ -1310,7 +1400,12 @@ StartRestoreLOs(ArchiveHandle *AH)
 {
 	RestoreOptions *ropt = AH->public.ropt;
 
-	if (!ropt->single_txn)
+	/*
+	 * LOs must be restored within a transaction block, since we need the LO
+	 * handle to stay open while we write it.  Establish a transaction unless
+	 * there's one being used globally.
+	 */
+	if (!(ropt->single_txn || ropt->txn_size > 0))
 	{
 		if (AH->connection)
 			StartTransaction(&AH->public);
@@ -1329,7 +1424,7 @@ EndRestoreLOs(ArchiveHandle *AH)
 {
 	RestoreOptions *ropt = AH->public.ropt;
 
-	if (!ropt->single_txn)
+	if (!(ropt->single_txn || ropt->txn_size > 0))
 	{
 		if (AH->connection)
 			CommitTransaction(&AH->public);
@@ -3170,6 +3265,19 @@ _doSetFixedOutputState(ArchiveHandle *AH)
 	else
 		ahprintf(AH, "SET row_security = off;\n");
 
+	/*
+	 * In --transaction-size mode, we should always be in a transaction when
+	 * we begin to restore objects.
+	 */
+	if (ropt && ropt->txn_size > 0)
+	{
+		if (AH->connection)
+			StartTransaction(&AH->public);
+		else
+			ahprintf(AH, "\nBEGIN;\n");
+		AH->txnCount = 0;
+	}
+
 	ahprintf(AH, "\n");
 }
 
@@ -4033,6 +4141,14 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
 		}
 	}
 
+	/*
+	 * In --transaction-size mode, we must commit the open transaction before
+	 * dropping the database connection.  This also ensures that child workers
+	 * can see the objects we've created so far.
+	 */
+	if (AH->public.ropt->txn_size > 0)
+		CommitTransaction(&AH->public);
+
 	/*
 	 * Now close parent connection in prep for parallel steps.  We do this
 	 * mainly to ensure that we don't exceed the specified number of parallel
@@ -4772,6 +4888,10 @@ CloneArchive(ArchiveHandle *AH)
 	clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
 	memcpy(clone, AH, sizeof(ArchiveHandle));
 
+	/* Likewise flat-copy the RestoreOptions, so we can alter them locally */
+	clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions));
+	memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));
+
 	/* Handle format-independent fields */
 	memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
 
@@ -4793,6 +4913,13 @@ CloneArchive(ArchiveHandle *AH)
 	/* clones should not share lo_buf */
 	clone->lo_buf = NULL;
 
+	/*
+	 * Clone connections disregard --transaction-size; they must commit after
+	 * each command so that the results are immediately visible to other
+	 * workers.
+	 */
+	clone->public.ropt->txn_size = 0;
+
 	/*
 	 * Connect our new clone object to the database, using the same connection
 	 * parameters used for the original connection.
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index e4dd395582..1b9f142dea 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -324,6 +324,9 @@ struct _archiveHandle
 	char	   *currTablespace; /* current tablespace, or NULL */
 	char	   *currTableAm;	/* current table access method, or NULL */
 
+	/* in --transaction-size mode, this counts objects emitted in cur xact */
+	int			txnCount;
+
 	void	   *lo_buf;
 	size_t		lo_buf_used;
 	size_t		lo_buf_size;
diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c
index c14d813b21..6b3bf174f2 100644
--- a/src/bin/pg_dump/pg_backup_db.c
+++ b/src/bin/pg_dump/pg_backup_db.c
@@ -554,6 +554,7 @@ IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te,
 {
 	/* Make a writable copy of the command string */
 	char	   *buf = pg_strdup(te->defn);
+	RestoreOptions *ropt = AH->public.ropt;
 	char	   *st;
 	char	   *en;
 
@@ -562,6 +563,23 @@ IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te,
 	{
 		*en++ = '\0';
 		ahprintf(AH, "%s%s%s;\n", cmdBegin, st, cmdEnd);
+
+		/* In --transaction-size mode, count each command as an action */
+		if (ropt && ropt->txn_size > 0)
+		{
+			if (++AH->txnCount >= ropt->txn_size)
+			{
+				if (AH->connection)
+				{
+					CommitTransaction(&AH->public);
+					StartTransaction(&AH->public);
+				}
+				else
+					ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
+				AH->txnCount = 0;
+			}
+		}
+
 		st = en;
 	}
 	ahprintf(AH, "\n");
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index c3beacdec1..5ea78cf7cc 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -120,6 +120,7 @@ main(int argc, char **argv)
 		{"role", required_argument, NULL, 2},
 		{"section", required_argument, NULL, 3},
 		{"strict-names", no_argument, &strict_names, 1},
+		{"transaction-size", required_argument, NULL, 5},
 		{"use-set-session-authorization", no_argument, &use_setsessauth, 1},
 		{"no-comments", no_argument, &no_comments, 1},
 		{"no-publications", no_argument, &no_publications, 1},
@@ -289,10 +290,18 @@ main(int argc, char **argv)
 				set_dump_section(optarg, &(opts->dumpSections));
 				break;
 
-			case 4:
+			case 4:				/* filter */
 				read_restore_filters(optarg, opts);
 				break;
 
+			case 5:				/* transaction-size */
+				if (!option_parse_int(optarg, "--transaction-size",
+									  1, INT_MAX,
+									  &opts->txn_size))
+					exit(1);
+				opts->exit_on_error = true;
+				break;
+
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -337,6 +346,9 @@ main(int argc, char **argv)
 	if (opts->dataOnly && opts->dropSchema)
 		pg_fatal("options -c/--clean and -a/--data-only cannot be used together");
 
+	if (opts->single_txn && opts->txn_size > 0)
+		pg_fatal("options -1/--single-transaction and --transaction-size cannot be used together");
+
 	/*
 	 * -C is not compatible with -1, because we can't create a database inside
 	 * a transaction block.
@@ -484,6 +496,7 @@ usage(const char *progname)
 	printf(_("  --section=SECTION            restore named section (pre-data, data, or post-data)\n"));
 	printf(_("  --strict-names               require table and/or schema include patterns to\n"
 			 "                               match at least one entity each\n"));
+	printf(_("  --transaction-size=N         commit after every N objects\n"));
 	printf(_("  --use-set-session-authorization\n"
 			 "                               use SET SESSION AUTHORIZATION commands instead of\n"
 			 "                               ALTER OWNER commands to set ownership\n"));
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 3960af4036..5cfd2282e1 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -548,6 +548,7 @@ create_new_objects(void)
 				  true,
 				  true,
 				  "\"%s/pg_restore\" %s %s --exit-on-error --verbose "
+				  "--transaction-size=1000 "
 				  "--dbname postgres \"%s/%s\"",
 				  new_cluster.bindir,
 				  cluster_conn_opts(&new_cluster),
@@ -586,6 +587,7 @@ create_new_objects(void)
 		parallel_exec_prog(log_file_name,
 						   NULL,
 						   "\"%s/pg_restore\" %s %s --exit-on-error --verbose "
+						   "--transaction-size=1000 "
 						   "--dbname template1 \"%s/%s\"",
 						   new_cluster.bindir,
 						   cluster_conn_opts(&new_cluster),
-- 
2.39.3

