From e59d9c1cdcf3f109267c12d4a28525f121c69720 Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Wed, 23 Mar 2022 11:00:33 -0400
Subject: [PATCH 2/5] Allow parallel zstd compression when taking a base
 backup.

libzstd allows transparent parallel compression just by setting
an option when creating the compression context, so permit that
for both client and server-side backup compression. To use this,
specify something like pg_basebackup --compress WHERE-zstd:workers=N,
where WHERE is "client" or "server" and N is an integer.

When compression is performed on the server side, this will spawn
threads inside the PostgreSQL backend. While there is almost no
PostgreSQL server code which is thread-safe, the threads here are used
internally by libzstd and touch only data structures controlled by
libzstd.

Patch by me, based in part on earlier work by Dipesh Pandit
and Jeevan Ladhe.
---
 doc/src/sgml/protocol.sgml                    | 12 +++++--
 doc/src/sgml/ref/pg_basebackup.sgml           |  4 +--
 src/backend/replication/basebackup_zstd.c     | 18 ++++++++++
 src/bin/pg_basebackup/bbstreamer_zstd.c       | 15 +++++++++
 src/bin/pg_basebackup/t/010_pg_basebackup.pl  |  5 +++
 src/bin/pg_verifybackup/t/009_extract.pl      | 29 ++++++++++++++--
 src/bin/pg_verifybackup/t/010_client_untar.pl | 33 +++++++++++++++++--
 src/common/backup_compression.c               | 16 +++++++++
 src/include/common/backup_compression.h       |  2 ++
 src/test/perl/PostgreSQL/Test/Cluster.pm      |  3 +-
 10 files changed, 125 insertions(+), 12 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 2fa3cedfe9e..98f0bc3cc34 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2739,17 +2739,23 @@ The commands accepted in replication mode are:
           option.  If the value is an integer, it specifies the compression
           level.  Otherwise, it should be a comma-separated list of items,
           each of the form <literal>keyword</literal> or
-          <literal>keyword=value</literal>. Currently, the only supported
-          keyword is <literal>level</literal>, which sets the compression
-          level.
+          <literal>keyword=value</literal>. Currently, the supported keywords
+          are <literal>level</literal> and <literal>workers</literal>.
         </para>
 
         <para>
+          The <literal>level</literal> keyword sets the compression level.
           For <literal>gzip</literal> the compression level should be an
           integer between 1 and 9, for <literal>lz4</literal> an integer
           between 1 and 12, and for <literal>zstd</literal> an integer
           between 1 and 22.
          </para>
+
+        <para>
+          The <literal>workers</literal> keyword sets the number of threads
+          that should be used for parallel compression. Parallel compression
+          is supported only for <literal>zstd</literal>.
+         </para>
         </listitem>
        </varlistentry>
 
diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index d9233beb8e1..82f5f606250 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -424,8 +424,8 @@ PostgreSQL documentation
         integer, it specifies the compression level.  Otherwise, it should be
         a comma-separated list of items, each of the form
         <literal>keyword</literal> or <literal>keyword=value</literal>.
-        Currently, the only supported keyword is <literal>level</literal>,
-        which sets the compression level.
+        Currently, the supported keywords are <literal>level</literal>
+        and <literal>workers</literal>.
        </para>
        <para>
         If no compression level is specified, the default compression level
diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c
index 5496eaa72b7..d6eb0617d8a 100644
--- a/src/backend/replication/basebackup_zstd.c
+++ b/src/backend/replication/basebackup_zstd.c
@@ -28,6 +28,9 @@ typedef struct bbsink_zstd
 	/* Compression level */
 	int			compresslevel;
 
+	/* Number of parallel workers. */
+	int			workers;
+
 	ZSTD_CCtx  *cctx;
 	ZSTD_outBuffer zstd_outBuf;
 } bbsink_zstd;
@@ -83,6 +86,7 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress)
 	*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
 	sink->base.bbs_next = next;
 	sink->compresslevel = compresslevel;
+	sink->workers = compress->workers;
 
 	return &sink->base;
 #endif
@@ -110,6 +114,20 @@ bbsink_zstd_begin_backup(bbsink *sink)
 		elog(ERROR, "could not set zstd compression level to %d: %s",
 			 mysink->compresslevel, ZSTD_getErrorName(ret));
 
+	/*
+	 * We check for failure here because (1) older versions of the library
+	 * do not support ZSTD_c_nbWorkers and (2) the library might want to
+	 * reject unreasonable values (though in practice it does not seem to do
+	 * so).
+	 */
+	ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
+								 mysink->workers);
+	if (ZSTD_isError(ret))
+		ereport(ERROR,
+				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("could not set compression worker count to %d: %s",
+					   mysink->workers, ZSTD_getErrorName(ret)));
+
 	/*
 	 * We need our own buffer, because we're going to pass different data to
 	 * the next sink than what gets passed to us.
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c
index 7946b6350b6..20393da595b 100644
--- a/src/bin/pg_basebackup/bbstreamer_zstd.c
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -102,6 +102,21 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
 		exit(1);
 	}
 
+	/*
+	 * We check for failure here because (1) older versions of the library
+	 * do not support ZSTD_c_nbWorkers and (2) the library might want to
+	 * reject unreasonable values (though in practice it does not seem to do
+	 * so).
+	 */
+	ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
+								 compress->workers);
+	if (ZSTD_isError(ret))
+	{
+		pg_log_error("could not set compression worker count to %d: %s",
+					 compress->workers, ZSTD_getErrorName(ret));
+		exit(1);
+	}
+
 	/* Initialize the ZSTD output buffer. */
 	streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
 	streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
diff --git a/src/bin/pg_basebackup/t/010_pg_basebackup.pl b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
index 47f3d00ac45..5ba84c22509 100644
--- a/src/bin/pg_basebackup/t/010_pg_basebackup.pl
+++ b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
@@ -130,6 +130,11 @@ my @compression_failure_tests = (
 		'invalid compression specification: found empty string where a compression option was expected',
 		'failure on extra, empty compression option'
 	],
+	[
+		'gzip:workers=3',
+		'invalid compression specification: compression algorithm "gzip" does not accept a worker count',
+		'failure on worker count for gzip'
+	],
 );
 for my $cft (@compression_failure_tests)
 {
diff --git a/src/bin/pg_verifybackup/t/009_extract.pl b/src/bin/pg_verifybackup/t/009_extract.pl
index 41a5b370cc5..d6f11b95535 100644
--- a/src/bin/pg_verifybackup/t/009_extract.pl
+++ b/src/bin/pg_verifybackup/t/009_extract.pl
@@ -34,6 +34,12 @@ my @test_configuration = (
 		'compression_method' => 'zstd',
 		'backup_flags' => ['--compress', 'server-zstd:5'],
 		'enabled' => check_pg_config("#define USE_ZSTD 1")
+	},
+	{
+		'compression_method' => 'parallel zstd',
+		'backup_flags' => ['--compress', 'server-zstd:workers=3'],
+		'enabled' => check_pg_config("#define USE_ZSTD 1"),
+		'possibly_unsupported' => qr/could not set compression worker count to 3: Unsupported parameter/
 	}
 );
 
@@ -55,8 +61,27 @@ for my $tc (@test_configuration)
 		my @verify = ('pg_verifybackup', '-e', $backup_path);
 
 		# A backup with a valid compression method should work.
-		$primary->command_ok(\@backup,
-							 "backup done, compression method \"$method\"");
+		my $backup_stdout = '';
+		my $backup_stderr = '';
+		my $backup_result = $primary->run_log(\@backup, '>', \$backup_stdout,
+											  '2>', \$backup_stderr);
+		if ($backup_stdout ne '')
+		{
+			print "# standard output was:\n$backup_stdout";
+		}
+		if ($backup_stderr ne '')
+		{
+			print "# standard error was:\n$backup_stderr";
+		}
+		if (! $backup_result && $tc->{'possibly_unsupported'} &&
+			$backup_stderr =~ /$tc->{'possibly_unsupported'}/)
+		{
+			skip "compression with $method not supported by this build", 2;
+		}
+		else
+		{
+			ok($backup_result, "backup done, compression $method");
+		}
 
 		# Make sure that it verifies OK.
 		$primary->command_ok(\@verify,
diff --git a/src/bin/pg_verifybackup/t/010_client_untar.pl b/src/bin/pg_verifybackup/t/010_client_untar.pl
index 488a6d1edee..c1cd12cb065 100644
--- a/src/bin/pg_verifybackup/t/010_client_untar.pl
+++ b/src/bin/pg_verifybackup/t/010_client_untar.pl
@@ -49,6 +49,15 @@ my @test_configuration = (
 		'decompress_program' => $ENV{'ZSTD'},
 		'decompress_flags' => [ '-d' ],
 		'enabled' => check_pg_config("#define USE_ZSTD 1")
+	},
+	{
+		'compression_method' => 'parallel zstd',
+		'backup_flags' => ['--compress', 'client-zstd:workers=3'],
+		'backup_archive' => 'base.tar.zst',
+		'decompress_program' => $ENV{'ZSTD'},
+		'decompress_flags' => [ '-d' ],
+		'enabled' => check_pg_config("#define USE_ZSTD 1"),
+		'possibly_unsupported' => qr/could not set compression worker count to 3: Unsupported parameter/
 	}
 );
 
@@ -69,9 +78,27 @@ for my $tc (@test_configuration)
 			'pg_basebackup', '-D', $backup_path,
 			'-Xfetch', '--no-sync', '-cfast', '-Ft');
 		push @backup, @{$tc->{'backup_flags'}};
-		$primary->command_ok(\@backup,
-							 "client side backup, compression $method");
-
+		my $backup_stdout = '';
+		my $backup_stderr = '';
+		my $backup_result = $primary->run_log(\@backup, '>', \$backup_stdout,
+											  '2>', \$backup_stderr);
+		if ($backup_stdout ne '')
+		{
+			print "# standard output was:\n$backup_stdout";
+		}
+		if ($backup_stderr ne '')
+		{
+			print "# standard error was:\n$backup_stderr";
+		}
+		if (! $backup_result && $tc->{'possibly_unsupported'} &&
+			$backup_stderr =~ /$tc->{'possibly_unsupported'}/)
+		{
+			skip "compression with $method not supported by this build", 3;
+		}
+		else
+		{
+			ok($backup_result, "client side backup, compression $method");
+		}
 
 		# Verify that the we got the files we expected.
 		my $backup_files = join(',',
diff --git a/src/common/backup_compression.c b/src/common/backup_compression.c
index 0650f975c44..969e08cca20 100644
--- a/src/common/backup_compression.c
+++ b/src/common/backup_compression.c
@@ -177,6 +177,11 @@ parse_bc_specification(bc_algorithm algorithm, char *specification,
 			result->level = expect_integer_value(keyword, value, result);
 			result->options |= BACKUP_COMPRESSION_OPTION_LEVEL;
 		}
+		else if (strcmp(keyword, "workers") == 0)
+		{
+			result->workers = expect_integer_value(keyword, value, result);
+			result->options |= BACKUP_COMPRESSION_OPTION_WORKERS;
+		}
 		else
 			result->parse_error =
 				psprintf(_("unknown compression option \"%s\""), keyword);
@@ -266,5 +271,16 @@ validate_bc_specification(bc_specification *spec)
 							min_level, max_level);
 	}
 
+	/*
+	 * Of the compression algorithms that we currently support, only zstd
+	 * allows parallel workers.
+	 */
+	if ((spec->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0 &&
+		(spec->algorithm != BACKUP_COMPRESSION_ZSTD))
+	{
+		return psprintf(_("compression algorithm \"%s\" does not accept a worker count"),
+						get_bc_algorithm_name(spec->algorithm));
+	}
+
 	return NULL;
 }
diff --git a/src/include/common/backup_compression.h b/src/include/common/backup_compression.h
index 0565cbc657d..6a0ecaa99c9 100644
--- a/src/include/common/backup_compression.h
+++ b/src/include/common/backup_compression.h
@@ -23,12 +23,14 @@ typedef enum bc_algorithm
 } bc_algorithm;
 
 #define	BACKUP_COMPRESSION_OPTION_LEVEL			(1 << 0)
+#define BACKUP_COMPRESSION_OPTION_WORKERS		(1 << 1)
 
 typedef struct bc_specification
 {
 	bc_algorithm algorithm;
 	unsigned	options;		/* OR of BACKUP_COMPRESSION_OPTION constants */
 	int			level;
+	int			workers;
 	char	   *parse_error;	/* NULL if parsing was OK, else message */
 } bc_specification;
 
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index bee6aacf47c..b6e33516110 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -2502,8 +2502,7 @@ sub run_log
 
 	local %ENV = $self->_get_env();
 
-	PostgreSQL::Test::Utils::run_log(@_);
-	return;
+	return PostgreSQL::Test::Utils::run_log(@_);
 }
 
 =pod
-- 
2.17.1

