From 2486417b7c3586e150e806a1fbc3b873c2a4a0f9 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryzbyj@telsasoft.com>
Date: Sat, 7 Jan 2023 15:45:06 -0600
Subject: [PATCH 1/3] WIP: pg_dump: zstd compression

Previously proposed at: 20201221194924.GI30237@telsasoft.com

note-to-self: see also: private commit 36ab001fb
---
 src/bin/pg_dump/Makefile              |   1 +
 src/bin/pg_dump/compress_io.c         |  54 +--
 src/bin/pg_dump/compress_zstd.c       | 520 ++++++++++++++++++++++++++
 src/bin/pg_dump/compress_zstd.h       |   9 +
 src/bin/pg_dump/meson.build           |   1 +
 src/bin/pg_dump/pg_backup_archiver.c  |   9 +-
 src/bin/pg_dump/pg_backup_directory.c |   2 +
 src/bin/pg_dump/pg_dump.c             |  13 -
 src/bin/pg_dump/t/002_pg_dump.pl      |  71 ++++
 9 files changed, 640 insertions(+), 40 deletions(-)
 create mode 100644 src/bin/pg_dump/compress_zstd.c
 create mode 100644 src/bin/pg_dump/compress_zstd.h

diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile
index eb8f59459a1..76574298faf 100644
--- a/src/bin/pg_dump/Makefile
+++ b/src/bin/pg_dump/Makefile
@@ -29,6 +29,7 @@ OBJS = \
 	compress_io.o \
 	compress_lz4.o \
 	compress_none.o \
+	compress_zstd.o \
 	dumputils.o \
 	parallel.o \
 	pg_backup_archiver.o \
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index ce06f1eac9c..061e3d9ce1c 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -52,8 +52,8 @@
  *
  *	InitDiscoverCompressFileHandle tries to infer the compression by the
  *	filename suffix. If the suffix is not yet known then it tries to simply
- *	open the file and if it fails, it tries to open the same file with the .gz
- *	suffix, and then again with the .lz4 suffix.
+ *	open the file and if it fails, it tries to open the same file with
+ *	compressed suffixes.
  *
  * IDENTIFICATION
  *	   src/bin/pg_dump/compress_io.c
@@ -69,6 +69,7 @@
 #include "compress_io.h"
 #include "compress_lz4.h"
 #include "compress_none.h"
+#include "compress_zstd.h"
 #include "pg_backup_utils.h"
 
 /*----------------------
@@ -98,6 +99,10 @@ supports_compression(const pg_compress_specification compression_spec)
 	if (algorithm == PG_COMPRESSION_LZ4)
 		supported = true;
 #endif
+#ifdef USE_ZSTD
+	if (algorithm == PG_COMPRESSION_ZSTD)
+		supported = true;
+#endif
 
 	if (!supported)
 		return psprintf("this build does not support compression with %s",
@@ -130,6 +135,8 @@ AllocateCompressor(const pg_compress_specification compression_spec,
 		InitCompressorGzip(cs, compression_spec);
 	else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
 		InitCompressorLZ4(cs, compression_spec);
+	else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+		InitCompressorZstd(cs, compression_spec);
 
 	return cs;
 }
@@ -196,25 +203,36 @@ InitCompressFileHandle(const pg_compress_specification compression_spec)
 		InitCompressFileHandleGzip(CFH, compression_spec);
 	else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
 		InitCompressFileHandleLZ4(CFH, compression_spec);
+	else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+		InitCompressFileHandleZstd(CFH, compression_spec);
 
 	return CFH;
 }
 
+static bool
+check_compressed_file(const char *path, char **fname, const char *ext)
+{
+	free_keep_errno(*fname);	/* discard the previously tried name */
+	*fname = psprintf("%s.%s", path, ext);	/* candidate is "path.ext" */
+	return (access(*fname, F_OK) == 0); /* true if the candidate exists */
+}
+
 /*
  * Open a file for reading. 'path' is the file to open, and 'mode' should
  * be either "r" or "rb".
  *
  * If the file at 'path' contains the suffix of a supported compression method,
- * currently this includes ".gz" and ".lz4", then this compression will be used
+ * currently this includes ".gz", ".lz4" and ".zst", then this compression will be used
  * throughout. Otherwise the compression will be inferred by iteratively trying
  * to open the file at 'path', first as is, then by appending known compression
  * suffixes. So if you pass "foo" as 'path', this will open either "foo" or
- * "foo.gz" or "foo.lz4", trying in that order.
+ * "foo.{gz,lz4,zst}", trying in that order.
  *
  * On failure, return NULL with an error code in errno.
  */
 CompressFileHandle *
 InitDiscoverCompressFileHandle(const char *path, const char *mode)
+// pg_compress_algorithm alg
 {
 	CompressFileHandle *CFH = NULL;
 	struct stat st;
@@ -237,28 +255,12 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
 		/* avoid unused warning if it is not built with compression */
 		if (exists)
 			compression_spec.algorithm = PG_COMPRESSION_NONE;
-#ifdef HAVE_LIBZ
-		if (!exists)
-		{
-			free_keep_errno(fname);
-			fname = psprintf("%s.gz", path);
-			exists = (stat(fname, &st) == 0);
-
-			if (exists)
-				compression_spec.algorithm = PG_COMPRESSION_GZIP;
-		}
-#endif
-#ifdef USE_LZ4
-		if (!exists)
-		{
-			free_keep_errno(fname);
-			fname = psprintf("%s.lz4", path);
-			exists = (stat(fname, &st) == 0);
-
-			if (exists)
-				compression_spec.algorithm = PG_COMPRESSION_LZ4;
-		}
-#endif
+		else if (check_compressed_file(path, &fname, "gz")) // alg == PG_COMPRESSION_GZIP &&
+			compression_spec.algorithm = PG_COMPRESSION_GZIP;
+		else if (check_compressed_file(path, &fname, "lz4")) // alg == PG_COMPRESSION_LZ4 &&
+			compression_spec.algorithm = PG_COMPRESSION_LZ4;
+		else if (check_compressed_file(path, &fname, "zst")) // alg == PG_COMPRESSION_ZSTD &&
+			compression_spec.algorithm = PG_COMPRESSION_ZSTD;
 	}
 
 	CFH = InitCompressFileHandle(compression_spec);
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
new file mode 100644
index 00000000000..659d08533ae
--- /dev/null
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -0,0 +1,520 @@
+#include "postgres_fe.h"
+
+#include "pg_backup_utils.h"
+#include "compress_zstd.h"
+
+#ifndef USE_ZSTD				/* stubs for builds without zstd support */
+
+void
+InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
+{
+	pg_fatal("this build does not support compression with %s", "ZSTD");
+}
+
+void
+InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
+{
+	pg_fatal("this build does not support compression with %s", "ZSTD");
+}
+
+#else							/* USE_ZSTD */
+
+#include <zstd.h>
+
+typedef struct ZstdCompressorState
+{
+	/* This is a normal file to which we read/write compressed data */
+	FILE	   *fp;
+	/* XXX: use one separate ZSTD_CStream per thread: disable on windows ? */
+	ZSTD_CStream *cstream;		/* compression stream, used when writing */
+	ZSTD_DStream *dstream;		/* decompression stream, made by Zstd_open for reading */
+	ZSTD_outBuffer output;		/* destination buffer handed to the zstd calls */
+	ZSTD_inBuffer input;		/* source buffer handed to the zstd calls */
+}			ZstdCompressorState;
+
+static ZSTD_CStream *ZstdCStreamParams(pg_compress_specification compress);
+static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
+static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
+								   const void *data, size_t dLen);
+static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs);
+
+/* Set one parameter on cstream, reporting any error through pg_fatal */
+static void
+ZSTD_CCtx_setParam_or_die(ZSTD_CStream * cstream,
+						  ZSTD_cParameter param, int value)
+{
+	size_t		rc = ZSTD_CCtx_setParameter(cstream, param, value);
+
+	if (ZSTD_isError(rc))
+		pg_fatal("could not set compression parameter: %s",
+				 ZSTD_getErrorName(rc));
+}
+
+/* Create a compression stream configured from the fields of "compress" */
+static ZSTD_CStream *
+ZstdCStreamParams(pg_compress_specification compress)
+{
+	ZSTD_CStream *cstream;
+
+	cstream = ZSTD_createCStream();
+	if (cstream == NULL)
+		pg_fatal("could not initialize compression library");
+	/* The level is always set; the worker count only if it was requested */
+	ZSTD_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
+							  compress.level);
+
+	if (compress.options & PG_COMPRESSION_OPTION_WORKERS)
+		ZSTD_CCtx_setParam_or_die(cstream, ZSTD_c_nbWorkers,
+								  compress.workers);
+
+#if 0							/* compiled out: these options are not applied */
+	if (compress.options & PG_COMPRESSION_OPTION_CHECKSUM)
+		ZSTD_CCtx_setParam_or_die(cstream, ZSTD_c_checksumFlag,
+								  compress.checksum);
+
+	/* Still marked as experimental */
+	if (compress.options & PG_COMPRESSION_OPTION_RSYNCABLE)
+		ZSTD_CCtx_setParam_or_die(cstream, ZSTD_c_rsyncable, 1);
+#endif
+
+	return cstream;
+}
+
+/* Finish the frame when writing; always free what InitCompressorZstd made */
+static void
+EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
+	ZSTD_outBuffer *output = &zstdcs->output;
+
+	/*
+	 * Bytes left in output by WriteDataToArchiveZstd must not be discarded.
+	 */
+	while (cs->writeF != NULL)
+	{
+		size_t		res;
+
+		res = ZSTD_compressStream2(zstdcs->cstream, output,
+								   &zstdcs->input, ZSTD_e_end);
+		if (ZSTD_isError(res))
+			pg_fatal("could not close compression stream: %s",
+					 ZSTD_getErrorName(res));
+
+		if (output->pos > 0)
+			cs->writeF(AH, output->dst, output->pos);
+		/* TODO check that we wrote "pos" bytes */
+		output->pos = 0;		/* buffer has been handed off; reuse it */
+
+		if (res == 0)
+			break;
+	}
+
+	/* XXX: retval */
+	ZSTD_freeCStream(zstdcs->cstream);
+	pg_free(zstdcs->output.dst);
+	pg_free(zstdcs);
+}
+
+/* Compress dLen bytes at "data", handing filled output buffers to cs->writeF */
+static void
+WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
+					   const void *data, size_t dLen)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
+	ZSTD_inBuffer *input = &zstdcs->input;
+	ZSTD_outBuffer *output = &zstdcs->output;
+
+	input->src = data;
+	input->size = dLen;
+	input->pos = 0;
+
+#if 0
+	ZSTD_CCtx_reset(zstdcs->cstream, ZSTD_reset_session_only); // XXX */
+	res = ZSTD_CCtx_setPledgedSrcSize(cs->zstd.cstream, dLen);
+	if (ZSTD_isError(res))
+	pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
+#endif
+
+	while (input->pos != input->size)
+	{
+		size_t		res;
+
+		res = ZSTD_compressStream2(zstdcs->cstream, output,
+								   input, ZSTD_e_continue);
+		/* Check for failure before trusting what was put in the buffer */
+		if (ZSTD_isError(res))
+			pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
+
+		if (output->pos == output->size ||
+			input->pos != input->size)
+		{
+			/*
+			 * Extra paranoia: avoid zero-length chunks, since a zero length
+			 * chunk is the EOF marker in the custom format. This should never
+			 * happen but...
+			 */
+			if (output->pos > 0)
+				cs->writeF(AH, output->dst, output->pos);
+			output->pos = 0;
+		}
+	}
+}
+
+/* Decompress every chunk supplied by cs->readF and pass the output to ahwrite */
+static void
+ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs)
+{
+	ZSTD_DStream *dstream;
+	ZSTD_outBuffer output;
+	ZSTD_inBuffer input;
+	size_t		res;
+	size_t		input_size;
+
+	dstream = ZSTD_createDStream();
+	if (dstream == NULL)
+		pg_fatal("could not initialize compression library");
+
+	input_size = ZSTD_DStreamInSize();
+	input.src = pg_malloc(input_size);
+
+	output.size = ZSTD_DStreamOutSize();
+	output.dst = pg_malloc(output.size + 1);	/* +1: room for the NUL below */
+
+	/* read compressed data */
+	for (;;)
+	{
+		size_t		cnt;
+
+		/*
+		 * readF may replace the buffer with a larger one; handing it
+		 * input_size itself keeps the allocated size current across calls.
+		 */
+		cnt = cs->readF(AH, (char **) unconstify(void **, &input.src), &input_size);
+
+		/* only the first cnt bytes of the buffer are valid input */
+		input.pos = 0;
+		input.size = cnt;
+
+		if (cnt == 0)
+			break;
+
+		while (input.pos < input.size)
+		{
+			/* decompress */
+			output.pos = 0;
+			res = ZSTD_decompressStream(dstream, &output, &input);
+
+			if (ZSTD_isError(res))
+				pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
+
+			/* write to output handle; pos may equal size, hence the spare byte */
+			((char *) output.dst)[output.pos] = '\0';
+			ahwrite(output.dst, 1, output.pos, AH);
+		}
+	}
+
+	ZSTD_freeDStream(dstream);
+	pg_free(unconstify(void *, input.src));
+	pg_free(output.dst);
+}
+
+/* Public routines that support Zstd compressed data I/O */
+void
+InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
+{
+	ZstdCompressorState *state = pg_malloc0(sizeof(ZstdCompressorState));
+
+	/* XXX: initialize safely like the corresponding zlib "paranoia" */
+	/* The output buffer is sized by ZSTD_CStreamOutSize() and starts empty */
+	state->output.pos = 0;
+	state->output.size = ZSTD_CStreamOutSize();
+	state->output.dst = pg_malloc(state->output.size);
+	state->cstream = ZstdCStreamParams(compression_spec);
+
+	/* Hook the zstd callbacks and the private state into the compressor */
+	cs->compression_spec = compression_spec;
+	cs->private_data = state;
+	cs->end = EndCompressorZstd;
+	cs->writeData = WriteDataToArchiveZstd;
+	cs->readData = ReadDataFromArchiveZstd;
+}
+
+/*----------------------
+ * Compress File API
+ *----------------------
+ */
+
+/*
+ * Read and decompress data from the file behind CFH.
+ *
+ * Up to "size" bytes are stored at ptr, refilling the input buffer from the
+ * file as needed.  Returns the number of bytes produced, which is less than
+ * "size" only if fread returned no more data.  Decompression errors are fatal.
+ */
+static size_t
+Zstd_read(void *ptr, size_t size, CompressFileHandle *CFH)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+	ZSTD_inBuffer *input = &zstdcs->input;
+	ZSTD_outBuffer *output = &zstdcs->output;
+	/* input_size is the allocated size */
+	size_t		input_size = ZSTD_DStreamInSize();
+	size_t		res;
+
+	output->size = size;
+	output->dst = ptr;
+	output->pos = 0;
+
+	for (;;)
+	{
+		Assert(input->pos <= input->size);
+		Assert(input->size <= input_size);
+
+		/*
+		 * If the input is completely consumed, start back at the beginning
+		 * and read more compressed data.  input->size is the size produced
+		 * by "fread"; input->pos is the position consumed by decompress.
+		 */
+		if (input->pos == input->size)
+		{
+			input->pos = 0;
+			input->size = fread(unconstify(void *, input->src), 1,
+								input_size, zstdcs->fp);
+
+			/* If we have no input to consume, we're done */
+			if (input->size == 0)
+				break;
+		}
+
+		Assert(input->size <= input_size);
+
+		/* Now consume as much as possible */
+		for (; input->pos < input->size;)
+		{
+			/* decompress */
+			res = ZSTD_decompressStream(zstdcs->dstream, output, input);
+			if (ZSTD_isError(res))
+				pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
+			if (output->pos == output->size)
+				break;			/* No more room for output */
+			if (res == 0)
+				break;			/* End of frame */
+		}
+
+		if (output->pos == output->size)
+			break;				/* We read all the data that fits */
+	}
+
+	return output->pos;
+}
+
+/* Compress "size" bytes at ptr and write the result to the file; returns size */
+static size_t
+Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
+{
+	ZstdCompressorState *state = (ZstdCompressorState *) CFH->private_data;
+	ZSTD_outBuffer *out = &state->output;
+	ZSTD_inBuffer *in = &state->input;
+
+	in->src = ptr;
+	in->size = size;
+	in->pos = 0;
+
+#if 0
+	ZSTD_CCtx_reset(fp->zstd.cstream, ZSTD_reset_session_only);
+	res = ZSTD_CCtx_setPledgedSrcSize(fp->zstd.cstream, size);
+	if (ZSTD_isError(res))
+	pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
+#endif
+
+	/* Consume all input, and flush later */
+	while (in->pos != in->size)
+	{
+		size_t		rc;
+
+		out->pos = 0;
+		rc = ZSTD_compressStream2(state->cstream, out, in, ZSTD_e_continue);
+		if (ZSTD_isError(rc))
+			pg_fatal("could not compress data: %s", ZSTD_getErrorName(rc));
+
+		if (fwrite(out->dst, 1, out->pos, state->fp) != out->pos)
+			pg_fatal("could not write data: %m");
+	}
+
+	return size;
+}
+
+static int
+Zstd_getc(CompressFileHandle *CFH)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+	unsigned char c;			/* read_func stores exactly one byte here */
+
+	if (CFH->read_func(&c, 1, CFH) != 1)
+	{
+		if (feof(zstdcs->fp))
+			pg_fatal("could not read from input file: end of file");
+		else
+			pg_fatal("could not read from input file: %m");
+	}
+	return c;					/* the byte read, as a non-negative int */
+}
+
+static char *
+Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
+{
+	int			nread = 0;
+
+	/*
+	 * Fetch a byte at a time, stopping after a newline, at EOF, or when only
+	 * room for the terminator is left.  This is only used to read the list
+	 * of blobs, and the I/O is buffered anyway.
+	 */
+	while (nread < len - 1)
+	{
+		if (CFH->read_func(&buf[nread], 1, CFH) != 1)
+			break;
+		/* the newline itself is kept in the result */
+		if (buf[nread++] == '\n')
+			break;
+	}
+	buf[nread] = '\0';
+
+	if (nread == 0)
+		return NULL;
+	return buf;
+}
+
+/* Finish any pending frame, release zstd resources, and close the file */
+static int
+Zstd_close(CompressFileHandle *CFH)
+{
+	ZstdCompressorState *state = (ZstdCompressorState *) CFH->private_data;
+	int			rc;
+
+	/* Zstd_open creates a compression stream only for write/append modes */
+	if (state->cstream != NULL)
+	{
+		ZSTD_outBuffer *out = &state->output;
+		size_t		remaining;
+
+		/* End the frame; repeat until ZSTD_compressStream2 returns zero */
+		do
+		{
+			out->pos = 0;
+			remaining = ZSTD_compressStream2(state->cstream, out,
+											 &state->input, ZSTD_e_end);
+			if (ZSTD_isError(remaining))
+				pg_fatal("could not compress data: %s",
+						 ZSTD_getErrorName(remaining));
+
+			if (fwrite(out->dst, 1, out->pos, state->fp) != out->pos)
+				pg_fatal("could not write data: %m");
+		} while (remaining != 0);
+
+		ZSTD_freeCStream(state->cstream);
+		pg_free(out->dst);
+	}
+
+	/* Likewise, a decompression stream is created only for read modes */
+	if (state->dstream != NULL)
+	{
+		ZSTD_freeDStream(state->dstream);
+		pg_free(unconstify(void *, state->input.src));
+	}
+
+	rc = fclose(state->fp);
+	pg_free(state);
+	return rc;
+}
+
+static int
+Zstd_eof(CompressFileHandle *CFH)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+	/* NOTE(review): unconsumed bytes in zstdcs->input are not considered */
+	return feof(zstdcs->fp);	/* EOF indicator of the underlying file */
+}
+
+static int
+Zstd_open(const char *path, int fd, const char *mode,
+		  CompressFileHandle *CFH)
+{
+	FILE	   *fp;
+	ZstdCompressorState *zstdcs;
+	/* Use the caller's descriptor when fd >= 0, otherwise open "path" */
+	if (fd >= 0)
+		fp = fdopen(fd, mode);
+	else
+		fp = fopen(path, mode);
+
+	if (fp == NULL)
+	{
+		/* XXX zstdcs->errcode = errno; */
+		return 1;				/* failure; success returns 0 below */
+	}
+
+	CFH->private_data = pg_malloc0(sizeof(ZstdCompressorState));
+	zstdcs = (ZstdCompressorState *) CFH->private_data;
+	zstdcs->fp = fp;
+	/* Write/append modes get a compression stream, read modes a decompression one */
+	if (mode[0] == 'w' || mode[0] == 'a')
+	{
+		zstdcs->output.size = ZSTD_CStreamOutSize();
+		zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
+		zstdcs->cstream = ZstdCStreamParams(CFH->compression_spec);
+	}
+	else if (strchr(mode, 'r'))
+	{
+		zstdcs->input.src = pg_malloc0(ZSTD_DStreamInSize());
+		zstdcs->dstream = ZSTD_createDStream();
+		if (zstdcs->dstream == NULL)
+			pg_fatal("could not initialize compression library");
+	}
+	/* XXX else: bad mode */
+
+	return 0;
+}
+
+static int
+Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
+{
+	char		fname[MAXPGPATH];	/* "path" with ".zst" appended */
+	if (snprintf(fname, sizeof(fname), "%s.zst", path) >= (int) sizeof(fname))
+		pg_fatal("file name too long: \"%s\"", path);	/* would not fit */
+	return CFH->open_func(fname, -1, mode, CFH);
+}
+
+static const char *
+Zstd_get_error(CompressFileHandle *CFH)
+{
+#if 0							/* not compiled: no zstd result code is stored */
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+
+	if (ZSTD_isError(res))
+		return ZSTD_getErrorName(res)
+	else
+#endif
+	/* Only the message for the current errno is reported */
+	return strerror(errno);
+}
+
+/* Install the zstd implementations of the CompressFileHandle callbacks */
+void
+InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
+{
+	CFH->private_data = NULL;	/* allocated later by Zstd_open */
+	CFH->compression_spec = compression_spec;
+
+	CFH->get_error_func = Zstd_get_error;
+	CFH->eof_func = Zstd_eof;
+	CFH->close_func = Zstd_close;
+	CFH->getc_func = Zstd_getc;
+	CFH->gets_func = Zstd_gets;
+	CFH->write_func = Zstd_write;
+	CFH->read_func = Zstd_read;
+	CFH->open_write_func = Zstd_open_write;
+	CFH->open_func = Zstd_open;
+}
+
+#endif							/* USE_ZSTD */
diff --git a/src/bin/pg_dump/compress_zstd.h b/src/bin/pg_dump/compress_zstd.h
new file mode 100644
index 00000000000..f36698b4c26
--- /dev/null
+++ b/src/bin/pg_dump/compress_zstd.h
@@ -0,0 +1,9 @@
+#ifndef COMPRESS_ZSTD_H
+#define COMPRESS_ZSTD_H
+
+#include "compress_io.h"
+
+extern void InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec);
+extern void InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec);
+
+#endif /* COMPRESS_ZSTD_H */
diff --git a/src/bin/pg_dump/meson.build b/src/bin/pg_dump/meson.build
index 0da476a4c34..334d449091d 100644
--- a/src/bin/pg_dump/meson.build
+++ b/src/bin/pg_dump/meson.build
@@ -5,6 +5,7 @@ pg_dump_common_sources = files(
   'compress_io.c',
   'compress_lz4.c',
   'compress_none.c',
+  'compress_zstd.c',
   'dumputils.c',
   'parallel.c',
   'pg_backup_archiver.c',
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 61ebb8fe85d..6e97d1f5894 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -2075,7 +2075,7 @@ _discoverArchiveFormat(ArchiveHandle *AH)
 
 		/*
 		 * Check if the specified archive is a directory. If so, check if
-		 * there's a "toc.dat" (or "toc.dat.{gz,lz4}") file in it.
+		 * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
 		 */
 		if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
 		{
@@ -2086,10 +2086,17 @@ _discoverArchiveFormat(ArchiveHandle *AH)
 			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
 				return AH->format;
 #endif
+
 #ifdef USE_LZ4
 			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
 				return AH->format;
 #endif
+
+#ifdef USE_ZSTD
+			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
+				return AH->format;
+#endif
+
 			pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
 					 AH->fSpec);
 			fh = NULL;			/* keep compiler quiet */
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 41c2b733e3e..29845340859 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -785,6 +785,8 @@ _PrepParallelRestore(ArchiveHandle *AH)
 				strlcat(fname, ".gz", sizeof(fname));
 			else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4)
 				strlcat(fname, ".lz4", sizeof(fname));
+			else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+				strlcat(fname, ".zst", sizeof(fname));
 
 			if (stat(fname, &st) == 0)
 				te->dataLength = st.st_size;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 24ba936332d..72f6126a1fb 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -710,19 +710,6 @@ main(int argc, char **argv)
 		pg_fatal("invalid compression specification: %s",
 				 error_detail);
 
-	switch (compression_algorithm)
-	{
-		case PG_COMPRESSION_NONE:
-			/* fallthrough */
-		case PG_COMPRESSION_GZIP:
-			/* fallthrough */
-		case PG_COMPRESSION_LZ4:
-			break;
-		case PG_COMPRESSION_ZSTD:
-			pg_fatal("compression with %s is not yet supported", "ZSTD");
-			break;
-	}
-
 	/*
 	 * Custom and directory formats are compressed by default with gzip when
 	 * available, not the others.
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 72b19ee6cde..160cd1124a2 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -213,6 +213,77 @@ my %pgdump_runs = (
 		},
 	},
 
+	compression_zstd_custom => {
+		test_key       => 'compression',
+		compile_option => 'zstd',
+		dump_cmd       => [
+			'pg_dump',      '--format=custom',
+			'--compress=zstd', "--file=$tempdir/compression_zstd_custom.dump",
+			'postgres',
+		],
+		restore_cmd => [
+			'pg_restore',
+			"--file=$tempdir/compression_zstd_custom.sql",
+			"$tempdir/compression_zstd_custom.dump",
+		],
+		command_like => {
+			command => [
+				'pg_restore',
+				'-l', "$tempdir/compression_zstd_custom.dump",
+			],
+			expected => qr/Compression: zstd/,
+			name => 'data content is zstd compressed'
+		},
+	},
+
+	compression_zstd_dir => {
+		test_key       => 'compression',
+		compile_option => 'zstd',
+		dump_cmd       => [
+			'pg_dump',                              '--jobs=2',
+			'--format=directory',                   '--compress=zstd:1',
+			"--file=$tempdir/compression_zstd_dir", 'postgres',
+		],
+		# Give coverage for manually compressed blobs.toc files during
+		# restore.  zstd needs -o to name the output file.
+		compress_cmd => {
+			program => $ENV{'ZSTD'},
+			args    => [
+				'-z', '-f', '--rm',
+				"$tempdir/compression_zstd_dir/blobs.toc",
+				'-o', "$tempdir/compression_zstd_dir/blobs.toc.zst",
+			],
+		},
+		# Verify that data files were compressed
+		glob_patterns => [
+			"$tempdir/compression_zstd_dir/toc.dat",
+			"$tempdir/compression_zstd_dir/*.dat.zst",
+		],
+		restore_cmd => [
+			'pg_restore', '--jobs=2',
+			"--file=$tempdir/compression_zstd_dir.sql",
+			"$tempdir/compression_zstd_dir",
+		],
+	},
+
+	compression_zstd_plain => {
+		test_key       => 'compression',
+		compile_option => 'zstd',
+		dump_cmd       => [
+			'pg_dump', '--format=plain', '--compress=zstd',
+			"--file=$tempdir/compression_zstd_plain.sql.zst", 'postgres',
+		],
+		# Decompress the generated file (named with -o) to run through the tests.
+		compress_cmd => {
+			program => $ENV{'ZSTD'},
+			args    => [
+				'-d', '-f',
+				"$tempdir/compression_zstd_plain.sql.zst",
+				'-o', "$tempdir/compression_zstd_plain.sql",
+			],
+		},
+	},
+
 	clean => {
 		dump_cmd => [
 			'pg_dump',
-- 
2.34.1

