From 8a8939a3060c984af289b5fe62754d94f2675248 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryzbyj@telsasoft.com>
Date: Fri, 11 Dec 2020 15:01:25 -0600
Subject: [PATCH 3/7] pg_dump: zstd compression

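TODO: document whether the search for compressed files in cfopen_read()
changed (it now also accepts the .zst suffix besides .gz).
TODO: update the pg_dump documentation for zstd support.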
---
 configure                             | 123 ++++++-
 configure.ac                          |  22 ++
 src/bin/pg_dump/compress_io.c         | 475 ++++++++++++++++++++++++--
 src/bin/pg_dump/compress_io.h         |   5 +-
 src/bin/pg_dump/pg_backup_archiver.h  |   5 +
 src/bin/pg_dump/pg_backup_directory.c |   6 +-
 src/bin/pg_dump/pg_dump.c             |   3 +-
 src/include/pg_config.h.in            |   3 +
 8 files changed, 597 insertions(+), 45 deletions(-)

diff --git a/configure b/configure
index 11a4284e5b..240e536e04 100755
--- a/configure
+++ b/configure
@@ -698,6 +698,7 @@ with_gnu_ld
 LD
 LDFLAGS_SL
 LDFLAGS_EX
+with_zstd
 with_zlib
 with_system_tzdata
 with_libxslt
@@ -798,6 +799,7 @@ infodir
 docdir
 oldincludedir
 includedir
+runstatedir
 localstatedir
 sharedstatedir
 sysconfdir
@@ -866,6 +868,7 @@ with_libxml
 with_libxslt
 with_system_tzdata
 with_zlib
+with_zstd
 with_gnu_ld
 enable_largefile
 '
@@ -935,6 +938,7 @@ datadir='${datarootdir}'
 sysconfdir='${prefix}/etc'
 sharedstatedir='${prefix}/com'
 localstatedir='${prefix}/var'
+runstatedir='${localstatedir}/run'
 includedir='${prefix}/include'
 oldincludedir='/usr/include'
 docdir='${datarootdir}/doc/${PACKAGE_TARNAME}'
@@ -1187,6 +1191,15 @@ do
   | -silent | --silent | --silen | --sile | --sil)
     silent=yes ;;
 
+  -runstatedir | --runstatedir | --runstatedi | --runstated \
+  | --runstate | --runstat | --runsta | --runst | --runs \
+  | --run | --ru | --r)
+    ac_prev=runstatedir ;;
+  -runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \
+  | --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \
+  | --run=* | --ru=* | --r=*)
+    runstatedir=$ac_optarg ;;
+
   -sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb)
     ac_prev=sbindir ;;
   -sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \
@@ -1324,7 +1337,7 @@ fi
 for ac_var in	exec_prefix prefix bindir sbindir libexecdir datarootdir \
 		datadir sysconfdir sharedstatedir localstatedir includedir \
 		oldincludedir docdir infodir htmldir dvidir pdfdir psdir \
-		libdir localedir mandir
+		libdir localedir mandir runstatedir
 do
   eval ac_val=\$$ac_var
   # Remove trailing slashes.
@@ -1477,6 +1490,7 @@ Fine tuning of the installation directories:
   --sysconfdir=DIR        read-only single-machine data [PREFIX/etc]
   --sharedstatedir=DIR    modifiable architecture-independent data [PREFIX/com]
   --localstatedir=DIR     modifiable single-machine data [PREFIX/var]
+  --runstatedir=DIR       modifiable per-process data [LOCALSTATEDIR/run]
   --libdir=DIR            object code libraries [EPREFIX/lib]
   --includedir=DIR        C header files [PREFIX/include]
   --oldincludedir=DIR     C header files for non-gcc [/usr/include]
@@ -1570,6 +1584,7 @@ Optional Packages:
   --with-system-tzdata=DIR
                           use system time zone data in DIR
   --without-zlib          do not use Zlib
+  --without-zstd          do not use Zstd
   --with-gnu-ld           assume the C compiler uses GNU ld [default=no]
 
 Some influential environment variables:
@@ -8601,6 +8616,35 @@ fi
 
 
 
+#
+# Zstd
+#
+
+
+
+# Check whether --with-zstd was given.
+if test "${with_zstd+set}" = set; then :
+  withval=$with_zstd;
+  case $withval in
+    yes)
+      :
+      ;;
+    no)
+      :
+      ;;
+    *)
+      as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5
+      ;;
+  esac
+
+else
+  with_zstd=yes
+
+fi
+
+
+
+
 #
 # Assignments
 #
@@ -12092,6 +12136,59 @@ fi
 
 fi
 
+if test "$with_zstd" = yes; then
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compressStream2 in -lzstd" >&5
+$as_echo_n "checking for ZSTD_compressStream2 in -lzstd... " >&6; }
+if ${ac_cv_lib_zstd_ZSTD_compressStream2+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-lzstd  $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+/* Override any GCC internal prototype to avoid an error.
+   Use char because int might match the return type of a GCC
+   builtin and then its argument prototype would still apply.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ZSTD_compressStream2 ();
+int
+main ()
+{
+return ZSTD_compressStream2 ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_zstd_ZSTD_compressStream2=yes
+else
+  ac_cv_lib_zstd_ZSTD_compressStream2=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compressStream2" >&5
+$as_echo "$ac_cv_lib_zstd_ZSTD_compressStream2" >&6; }
+if test "x$ac_cv_lib_zstd_ZSTD_compressStream2" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBZSTD 1
+_ACEOF
+
+  LIBS="-lzstd $LIBS"
+
+else
+  as_fn_error $? "zstd library not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support." "$LINENO" 5
+fi
+
+fi
+
 if test "$enable_spinlocks" = yes; then
 
 $as_echo "#define HAVE_SPINLOCKS 1" >>confdefs.h
@@ -13295,6 +13392,20 @@ Use --without-zlib to disable zlib support." "$LINENO" 5
 fi
 
 
+fi
+
+if test "$with_zstd" = yes; then
+  ac_fn_c_check_header_mongrel "$LINENO" "zstd.h" "ac_cv_header_zstd_h" "$ac_includes_default"
+if test "x$ac_cv_header_zstd_h" = xyes; then :
+
+else
+  as_fn_error $? "zstd header not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support." "$LINENO" 5
+fi
+
+
 fi
 
 if test "$with_gssapi" = yes ; then
@@ -14689,7 +14800,7 @@ else
     We can't simply define LARGE_OFF_T to be 9223372036854775807,
     since some C++ compilers masquerading as C compilers
     incorrectly reject 9223372036854775807.  */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
   int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
 		       && LARGE_OFF_T % 2147483647 == 1)
 		      ? 1 : -1];
@@ -14735,7 +14846,7 @@ else
     We can't simply define LARGE_OFF_T to be 9223372036854775807,
     since some C++ compilers masquerading as C compilers
     incorrectly reject 9223372036854775807.  */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
   int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
 		       && LARGE_OFF_T % 2147483647 == 1)
 		      ? 1 : -1];
@@ -14759,7 +14870,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext
     We can't simply define LARGE_OFF_T to be 9223372036854775807,
     since some C++ compilers masquerading as C compilers
     incorrectly reject 9223372036854775807.  */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
   int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
 		       && LARGE_OFF_T % 2147483647 == 1)
 		      ? 1 : -1];
@@ -14804,7 +14915,7 @@ else
     We can't simply define LARGE_OFF_T to be 9223372036854775807,
     since some C++ compilers masquerading as C compilers
     incorrectly reject 9223372036854775807.  */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
   int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
 		       && LARGE_OFF_T % 2147483647 == 1)
 		      ? 1 : -1];
@@ -14828,7 +14939,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext
     We can't simply define LARGE_OFF_T to be 9223372036854775807,
     since some C++ compilers masquerading as C compilers
     incorrectly reject 9223372036854775807.  */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
   int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
 		       && LARGE_OFF_T % 2147483647 == 1)
 		      ? 1 : -1];
diff --git a/configure.ac b/configure.ac
index fc523c6aeb..7f7222159a 100644
--- a/configure.ac
+++ b/configure.ac
@@ -999,6 +999,13 @@ PGAC_ARG_BOOL(with, zlib, yes,
               [do not use Zlib])
 AC_SUBST(with_zlib)
 
+#
+# Zstd
+#
+PGAC_ARG_BOOL(with, zstd, yes,
+              [do not use Zstd])
+AC_SUBST(with_zstd)
+
 #
 # Assignments
 #
@@ -1186,6 +1193,14 @@ failure.  It is possible the compiler isn't looking in the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_zstd" = yes; then
+  AC_CHECK_LIB(zstd, ZSTD_compressStream2, [],
+               [AC_MSG_ERROR([zstd library not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support.])])
+fi
+
 if test "$enable_spinlocks" = yes; then
   AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.])
 else
@@ -1400,6 +1415,13 @@ failure.  It is possible the compiler isn't looking in the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_zstd" = yes; then
+  AC_CHECK_HEADER(zstd.h, [], [AC_MSG_ERROR([zstd header not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support.])])
+fi
+
 if test "$with_gssapi" = yes ; then
   AC_CHECK_HEADERS(gssapi/gssapi.h, [],
 	[AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is required for GSSAPI])])])
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 1417401086..b51ba680a2 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -13,7 +13,7 @@
  * friends, providing an interface similar to those, but abstracts away
  * the possible compression. Both APIs use libz for the compression, but
  * the second API uses gzip headers, so the resulting files can be easily
- * manipulated with the gzip utility.
+ * manipulated with the gzip utility. XXX
  *
  * Compressor API
  * --------------
@@ -41,7 +41,7 @@
  *	libz's gzopen() APIs. It allows you to use the same functions for
  *	compressed and uncompressed streams. cfopen_read() first tries to open
  *	the file with given name, and if it fails, it tries to open the same
- *	file with the .gz suffix. cfopen_write() opens a file for writing, an
+ *	file with the .gz suffix. cfopen_write() opens a file for writing, an XXX
  *	extra argument specifies if the file should be compressed, and adds the
  *	.gz suffix to the filename if so. This allows you to easily handle both
  *	compressed and uncompressed files.
@@ -72,6 +72,18 @@ struct CompressorState
 	char	   *zlibOut;
 	size_t		zlibOutSize;
 #endif
+
+#ifdef HAVE_LIBZSTD
+	union {
+		struct {
+			ZSTD_outBuffer output;
+			ZSTD_inBuffer input;
+			// XXX: use one separate ZSTD_CStream per thread: disable on windows ?
+			ZSTD_CStream *cstream;
+		} zstd;
+	} u;
+#endif
+
 };
 
 static void ParseCompressionOption(int compression, CompressionAlgorithm *alg,
@@ -88,6 +100,15 @@ static void WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
 static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs);
 #endif
 
+#ifdef HAVE_LIBZSTD
+static void InitCompressorZstd(CompressorState *cs, int level);
+static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
+static void DeflateCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
+static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
+					   const char *data, size_t dLen);
+static void ReadDataFromArchiveZstd(ArchiveHandle *AH, ReadFunc readF);
+#endif
+
 /* Routines that support uncompressed data I/O */
 static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF);
 static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
@@ -101,15 +122,25 @@ static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
 static void
 ParseCompressionOption(int compression, CompressionAlgorithm *alg, int *level)
 {
-	if (compression == Z_DEFAULT_COMPRESSION ||
-		(compression > 0 && compression <= 9))
-		*alg = COMPR_ALG_LIBZ;
-	else if (compression == 0)
-		*alg = COMPR_ALG_NONE;
-	else
+	switch (compression)
 	{
-		fatal("invalid compression code: %d", compression);
-		*alg = COMPR_ALG_NONE;	/* keep compiler quiet */
+#ifdef HAVE_LIBZSTD
+		case ZSTD_COMPRESSION:
+			*alg = COMPR_ALG_ZSTD;
+			break;
+#endif
+		/* zlib codes are accepted even without libz: ReadDataFromArchive() */
+		/* and WriteDataToArchive() then fail with "not built with zlib support" */
+		case Z_DEFAULT_COMPRESSION:
+		case 1: case 2: case 3: case 4: case 5: case 6: case 7: case 8: case 9:
+			*alg = COMPR_ALG_LIBZ;
+			break;
+		case 0:
+			*alg = COMPR_ALG_NONE;
+			break;
+		default:
+			fatal("invalid compression code: %d", compression);
+			*alg = COMPR_ALG_NONE;	/* keep compiler quiet */
 	}
 
 	/* The level is just the passed-in value. */
@@ -141,10 +172,23 @@ AllocateCompressor(int compression, WriteFunc writeF)
 	/*
 	 * Perform compression algorithm specific initialization.
 	 */
+	switch (alg)
+	{
 #ifdef HAVE_LIBZ
-	if (alg == COMPR_ALG_LIBZ)
+	case COMPR_ALG_LIBZ:
 		InitCompressorZlib(cs, level);
+		break;
+#endif
+#ifdef HAVE_LIBZSTD
+	case COMPR_ALG_ZSTD:
+		InitCompressorZstd(cs, level);
+		break;
 #endif
+	case COMPR_ALG_NONE:
+		/* Do nothing */
+		break;
+	// default:
+	}
 
 	return cs;
 }
@@ -162,12 +206,20 @@ ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF)
 
 	if (alg == COMPR_ALG_NONE)
 		ReadDataFromArchiveNone(AH, readF);
-	if (alg == COMPR_ALG_LIBZ)
+	else if (alg == COMPR_ALG_LIBZ)
 	{
 #ifdef HAVE_LIBZ
 		ReadDataFromArchiveZlib(AH, readF);
 #else
 		fatal("not built with zlib support");
+#endif
+	}
+	else if (alg == COMPR_ALG_ZSTD)
+	{
+#ifdef HAVE_LIBZSTD
+		ReadDataFromArchiveZstd(AH, readF);
+#else
+		fatal("not built with zstd support");
 #endif
 	}
 }
@@ -188,6 +240,15 @@ WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
 			fatal("not built with zlib support");
 #endif
 			break;
+
+		case COMPR_ALG_ZSTD:
+#ifdef HAVE_LIBZSTD
+			WriteDataToArchiveZstd(AH, cs, data, dLen);
+#else
+			fatal("not built with zstd support");
+#endif
+			break;
+
 		case COMPR_ALG_NONE:
 			WriteDataToArchiveNone(AH, cs, data, dLen);
 			break;
@@ -204,12 +265,172 @@ EndCompressor(ArchiveHandle *AH, CompressorState *cs)
 	if (cs->comprAlg == COMPR_ALG_LIBZ)
 		EndCompressorZlib(AH, cs);
 #endif
+#ifdef HAVE_LIBZSTD
+	if (cs->comprAlg == COMPR_ALG_ZSTD)
+		EndCompressorZstd(AH, cs);
+#endif
+
 	free(cs);
 }
 
 /* Private routines, specific to each compression method. */
+// XXX: put in separate files ?
+
+#ifdef HAVE_LIBZSTD
+static void
+InitCompressorZstd(CompressorState *cs, int level)
+{
+	cs->u.zstd.cstream = ZSTD_createCStream();	/* XXX: level is not applied yet */
+	if (cs->u.zstd.cstream == NULL)
+		fatal("could not initialize compression library");
+	cs->u.zstd.input.src = NULL;	/* no input until WriteDataToArchiveZstd() */
+	cs->u.zstd.input.size = cs->u.zstd.input.pos = 0;
+	cs->u.zstd.output.size = ZSTD_CStreamOutSize();
+	cs->u.zstd.output.dst = pg_malloc(cs->u.zstd.output.size);
+	cs->u.zstd.output.pos = 0;
+}
+
+static void
+EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
+{
+	ZSTD_outBuffer	*output = &cs->u.zstd.output;
+
+	/* Flush everything still buffered in the compressor and end the frame */
+	for (;;)
+	{
+		size_t		res;
+
+		res = ZSTD_compressStream2(cs->u.zstd.cstream, output,
+								   &cs->u.zstd.input, ZSTD_e_end);
+		if (ZSTD_isError(res))
+			fatal("could not close compression stream: %s",
+				  ZSTD_getErrorName(res));
+		/* Never write a zero-length chunk: it is the custom-format EOF marker */
+		if (output->pos > 0)
+			cs->writeF(AH, output->dst, output->pos);
+		output->pos = 0;		/* written out; refill from the start */
+		if (res == 0)			/* 0: frame completely flushed */
+			break;
+	}
+
+	ZSTD_freeCStream(cs->u.zstd.cstream);
+	pg_free(output->dst);
+}
+
+static void
+DeflateCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
+{
+	ZSTD_inBuffer	*input = &cs->u.zstd.input;
+	ZSTD_outBuffer	*output = &cs->u.zstd.output;
+
+	while (input->pos != input->size)
+	{
+		size_t		res;
+
+		res = ZSTD_compressStream2(cs->u.zstd.cstream, output,
+								   input, ZSTD_e_continue);
+		if (ZSTD_isError(res))
+			fatal("could not compress data: %s", ZSTD_getErrorName(res));
+		/*
+		 * Write the buffer out when it is full or more input remains;
+		 * otherwise keep it, so the next call (or EndCompressorZstd) appends.
+		 */
+		if (output->pos == output->size || input->pos != input->size)
+		{
+			/*
+			 * Extra paranoia: avoid zero-length chunks, since a zero length
+			 * chunk is the EOF marker in the custom format. This should never
+			 * happen but...
+			 */
+			if (output->pos > 0)
+				cs->writeF(AH, output->dst, output->pos);
+			output->pos = 0;
+		}
+	}
+}
+
+static void
+WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
+					   const char *data, size_t dLen)
+{
+	ZSTD_inBuffer *in = &cs->u.zstd.input;
+	/* Point the stream at the caller's chunk; DeflateCompressorZstd() consumes all of it */
+	*in = (ZSTD_inBuffer) {.src = data, .size = dLen, .pos = 0};
+	DeflateCompressorZstd(AH, cs);
+}
+
+/*
+ * Read data from a compressed zstd archive.
+ *
+ * readF is called repeatedly to obtain chunks of compressed data, until it
+ * returns zero.  Each chunk is fed to a single streaming decompressor, and
+ * every piece of decompressed output is passed to ahwrite().  The output is
+ * NUL-terminated before that, so the output buffer gets one spare byte;
+ * without it a completely filled buffer would be overrun.
+ */
+static void
+ReadDataFromArchiveZstd(ArchiveHandle *AH, ReadFunc readF)
+{
+	ZSTD_DStream *dstream;
+	ZSTD_outBuffer output;
+	ZSTD_inBuffer input;
+	char	   *buf;
+	size_t		buflen;
+	size_t		cnt;
+	size_t		res;
+
+	dstream = ZSTD_createDStream();
+	if (dstream == NULL)
+		fatal("could not initialize compression library");
+
+	/*
+	 * readF may enlarge the buffer (replacing buf and updating buflen), so
+	 * its capacity is tracked separately from input.size, which describes
+	 * only the chunk currently being decompressed.
+	 */
+	buflen = ZSTD_DStreamInSize();
+	buf = pg_malloc(buflen);
+
+	output.size = ZSTD_DStreamOutSize();
+	output.dst = pg_malloc(output.size + 1);	/* +1 for the NUL */
+
+	/* readF returns 0 once all compressed data has been read */
+	while ((cnt = readF(AH, &buf, &buflen)) != 0)
+	{
+		input.src = buf;
+		input.size = cnt;
+		input.pos = 0;
+
+		/*
+		 * Call the decompressor until it has consumed the whole chunk and did
+		 * not fill the output buffer.  A full output buffer means it may still
+		 * hold decompressed data, so it must be called again (possibly with no
+		 * new input) to flush that; this way nothing is left behind after the
+		 * last chunk.
+		 */
+		do
+		{
+			output.pos = 0;
+			res = ZSTD_decompressStream(dstream, &output, &input);
+			if (ZSTD_isError(res))
+				fatal("could not decompress data: %s", ZSTD_getErrorName(res));
+
+			((char *) output.dst)[output.pos] = '\0';
+			ahwrite(output.dst, 1, output.pos, AH);
+		} while (input.pos < input.size || output.pos == output.size);
+	}
+
+	ZSTD_freeDStream(dstream);
+
+	/* readF may have reallocated buf, so free the current pointer */
+	pg_free(buf);
+	pg_free(output.dst);
+}
+
+#endif		/* HAVE_LIBZSTD */
 
 #ifdef HAVE_LIBZ
+
 /*
  * Functions for zlib compressed output.
  */
@@ -422,6 +643,19 @@ struct cfp
 #ifdef HAVE_LIBZ
 	gzFile		compressedfp;
 #endif
+
+#ifdef HAVE_LIBZSTD // XXX: this should be a union with a CompressionAlgorithm alg?
+	/* This is a normal file to which we read/write compressed data */
+	struct {
+		FILE			*fp;
+		// XXX: use one separate ZSTD_CStream per thread: disable on windows ?
+		ZSTD_CStream	*cstream;
+		ZSTD_DStream	*dstream;
+		ZSTD_outBuffer	output;
+		ZSTD_inBuffer	input;
+	} zstd;
+#endif
+
 };
 
 #ifdef HAVE_LIBZ
@@ -449,24 +683,25 @@ free_keep_errno(void *p)
  * On failure, return NULL with an error code in errno.
  */
 cfp *
-cfopen_read(const char *path, const char *mode)
+cfopen_read(const char *path, const char *mode, int compression)
 {
 	cfp		   *fp;
 
 #ifdef HAVE_LIBZ
-	if (hasSuffix(path, ".gz"))
-		fp = cfopen(path, mode, 1);
+	if (hasSuffix(path, ".gz") || hasSuffix(path, ".zst"))
+		fp = cfopen(path, mode, compression);
 	else
 #endif
 	{
-		fp = cfopen(path, mode, 0);
+		fp = cfopen(path, mode, compression);
 #ifdef HAVE_LIBZ
 		if (fp == NULL)
 		{
 			char	   *fname;
+			char	   *suffix = compression == ZSTD_COMPRESSION ? "zst" : "gz";
 
-			fname = psprintf("%s.gz", path);
-			fp = cfopen(fname, mode, 1);
+			fname = psprintf("%s.%s", path, suffix);
+			fp = cfopen(fname, mode, compression);
 			free_keep_errno(fname);
 		}
 #endif
@@ -491,13 +726,14 @@ cfopen_write(const char *path, const char *mode, int compression)
 	cfp		   *fp;
 
 	if (compression == 0)
-		fp = cfopen(path, mode, 0);
+		fp = cfopen(path, mode, compression);
 	else
 	{
-#ifdef HAVE_LIBZ
+#ifdef HAVE_LIBZ // XXX
 		char	   *fname;
+		char	   *suffix = compression == ZSTD_COMPRESSION ? "zst" : "gz";
 
-		fname = psprintf("%s.gz", path);
+		fname = psprintf("%s.%s", path, suffix);
 		fp = cfopen(fname, mode, compression);
 		free_keep_errno(fname);
 #else
@@ -505,11 +741,12 @@ cfopen_write(const char *path, const char *mode, int compression)
 		fp = NULL;				/* keep compiler quiet */
 #endif
 	}
+
 	return fp;
 }
 
 /*
- * Opens file 'path' in 'mode'. If 'compression' is non-zero, the file
+ * Opens file 'path' in 'mode'. If 'alg' is COMPR_ALG_ZLIB, the file
  * is opened with libz gzopen(), otherwise with plain fopen().
  *
  * On failure, return NULL with an error code in errno.
@@ -519,9 +756,19 @@ cfopen(const char *path, const char *mode, int compression)
 {
 	cfp		   *fp = pg_malloc(sizeof(cfp));
 
-	if (compression != 0)
+	fp->uncompressedfp = NULL;
+#ifdef HAVE_LIBZ
+	fp->compressedfp = NULL;
+#endif
+#ifdef HAVE_LIBZSTD
+	fp->zstd.fp = NULL;
+#endif
+
+	switch (compression)
 	{
 #ifdef HAVE_LIBZ
+	case 1: case 2: case 3: case 4: case 5: case 6: case 7: case 8: case 9:
+	case Z_DEFAULT_COMPRESSION:
 		if (compression != Z_DEFAULT_COMPRESSION)
 		{
 			/* user has specified a compression level, so tell zlib to use it */
@@ -537,30 +784,57 @@ cfopen(const char *path, const char *mode, int compression)
 			fp->compressedfp = gzopen(path, mode);
 		}
 
-		fp->uncompressedfp = NULL;
 		if (fp->compressedfp == NULL)
 		{
 			free_keep_errno(fp);
 			fp = NULL;
 		}
-#else
-		fatal("not built with zlib support");
+
+		return fp;
+		break;
 #endif
-	}
-	else
-	{
-#ifdef HAVE_LIBZ
-		fp->compressedfp = NULL;
+
+#ifdef HAVE_LIBZSTD
+	case ZSTD_COMPRESSION:
+		fp->zstd.fp = fopen(path, mode);	/* XXX: compression level not saved */
+		if (fp->zstd.fp == NULL)
+		{
+			free_keep_errno(fp);
+			return NULL;
+		}
+		fp->zstd.cstream = NULL;
+		fp->zstd.dstream = NULL;
+		fp->zstd.input.src = NULL;
+		fp->zstd.input.size = fp->zstd.input.pos = 0;	/* cfclose() relies on these */
+		if (strchr(mode, 'w'))
+		{
+			fp->zstd.output.size = ZSTD_CStreamOutSize();
+			fp->zstd.output.dst = pg_malloc0(fp->zstd.output.size);
+			fp->zstd.cstream = ZSTD_createCStream();
+			if (fp->zstd.cstream == NULL)
+				fatal("could not initialize compression library");
+		}
+		else if (strchr(mode, 'r'))
+		{
+			/* cfread() fills this with up to ZSTD_DStreamInSize() bytes */
+			fp->zstd.input.src = pg_malloc0(ZSTD_DStreamInSize());
+			fp->zstd.dstream = ZSTD_createDStream();
+			if (fp->zstd.dstream == NULL)
+				fatal("could not initialize compression library");
+		}
+		return fp;
 #endif
+
+	default:
 		fp->uncompressedfp = fopen(path, mode);
 		if (fp->uncompressedfp == NULL)
 		{
 			free_keep_errno(fp);
 			fp = NULL;
 		}
-	}
 
-	return fp;
+		return fp;
+	}
 }
 
 
@@ -587,6 +861,44 @@ cfread(void *ptr, int size, cfp *fp)
 	}
 	else
 #endif
+
+#ifdef HAVE_LIBZSTD
+	if (fp->zstd.fp)
+	{
+		ZSTD_outBuffer	*output = &fp->zstd.output;
+		ZSTD_inBuffer	*input = &fp->zstd.input;
+		size_t			input_size = ZSTD_DStreamInSize();
+		size_t			res, cnt;
+
+		output->size = size;
+		output->dst = ptr;
+		output->pos = 0;
+
+		/* read compressed data */
+		while ((cnt = fread(unconstify(void *, input->src), 1, input_size, fp->zstd.fp)))
+		{
+			input->size = cnt;
+			input->pos = 0;
+
+			for ( ; input->pos < input->size; )
+			{
+				/* decompress */
+				res = ZSTD_decompressStream(fp->zstd.dstream, output, input);
+				if (res == 0 || output->pos == output->size)
+					break;
+				if (ZSTD_isError(res))
+					fatal("could not decompress data: %s", ZSTD_getErrorName(res));
+			}
+
+			if (output->pos == output->size)
+				break; /* We read all the data that fits */
+		}
+
+		ret = output->pos;
+	}
+	else
+#endif
+
 	{
 		ret = fread(ptr, 1, size, fp->uncompressedfp);
 		if (ret != size && !feof(fp->uncompressedfp))
@@ -603,6 +915,35 @@ cfwrite(const void *ptr, int size, cfp *fp)
 		return gzwrite(fp->compressedfp, ptr, size);
 	else
 #endif
+#ifdef HAVE_LIBZSTD
+	if (fp->zstd.fp)
+	{
+		size_t      res, cnt;
+		ZSTD_outBuffer	*output = &fp->zstd.output;
+		ZSTD_inBuffer	*input = &fp->zstd.input;
+
+		input->src = ptr;
+		input->size = size;
+		input->pos = 0;
+
+		/* Consume all input, and flush later */
+		while (input->pos != input->size)
+		{
+			output->pos = 0;
+			res = ZSTD_compressStream2(fp->zstd.cstream, output, input, ZSTD_e_continue);
+			if (ZSTD_isError(res))
+				fatal("could not compress data: %s", ZSTD_getErrorName(res));
+
+			cnt = fwrite(output->dst, 1, output->pos, fp->zstd.fp);
+			if (cnt != output->pos)
+				fatal("could not write data: %s", strerror(errno));
+		}
+
+		return size;
+	}
+	else
+#endif
+
 		return fwrite(ptr, 1, size, fp->uncompressedfp);
 }
 
@@ -625,6 +966,20 @@ cfgetc(cfp *fp)
 	}
 	else
 #endif
+#ifdef HAVE_LIBZSTD
+	if (fp->zstd.fp)
+	{
+		unsigned char c;
+
+		/* read into a byte so that ret gets a well-defined value */
+		if (cfread(&c, 1, fp) != 1)
+			fatal("could not read from input file: %s",
+				  feof(fp->zstd.fp) ? "end of file" : strerror(errno));
+		ret = c;
+	}
+	else
+#endif
+
 	{
 		ret = fgetc(fp->uncompressedfp);
 		if (ret == EOF)
@@ -641,6 +996,18 @@ cfgets(cfp *fp, char *buf, int len)
 	if (fp->compressedfp)
 		return gzgets(fp->compressedfp, buf, len);
 	else
+#endif
+#ifdef HAVE_LIBZSTD
+	if (fp->zstd.fp)
+	{
+		int			res = cfread(buf, len - 1, fp);	/* leave room for the NUL */
+		if (res <= 0)
+			return NULL;		/* nothing read: report EOF, as fgets() does */
+		buf[res] = '\0';
+		buf[strcspn(buf, "\n")] = '\0';	/* drop the newline, as before */
+		return buf;
+	}
+	else
 #endif
 		return fgets(buf, len, fp->uncompressedfp);
 }
@@ -662,6 +1029,43 @@ cfclose(cfp *fp)
 		fp->compressedfp = NULL;
 	}
 	else
+#endif
+#ifdef HAVE_LIBZSTD
+	if (fp->zstd.fp)
+	{
+		ZSTD_outBuffer	*output = &fp->zstd.output;
+		ZSTD_inBuffer	*input = &fp->zstd.input;
+		size_t res, cnt;
+
+		if (fp->zstd.cstream)
+		{
+			for (;;)
+			{
+				output->pos = 0;
+				res = ZSTD_compressStream2(fp->zstd.cstream, output, input, ZSTD_e_end);
+				if (ZSTD_isError(res))
+					fatal("could not compress data: %s", ZSTD_getErrorName(res));
+				cnt = fwrite(output->dst, 1, output->pos, fp->zstd.fp);
+				if (cnt != output->pos)
+					fatal("could not write data: %s", strerror(errno));
+				if (res == 0)
+					break;
+			}
+
+			ZSTD_freeCStream(fp->zstd.cstream);
+			pg_free(fp->zstd.output.dst);
+		}
+
+		if (fp->zstd.dstream)
+		{
+			ZSTD_freeDStream(fp->zstd.dstream);
+			pg_free(unconstify(void *, fp->zstd.input.src));
+		}
+
+		result = fclose(fp->zstd.fp);
+		fp->zstd.fp = NULL;
+	}
+	else
 #endif
 	{
 		result = fclose(fp->uncompressedfp);
@@ -679,6 +1083,11 @@ cfeof(cfp *fp)
 	if (fp->compressedfp)
 		return gzeof(fp->compressedfp);
 	else
+#endif
+#ifdef HAVE_LIBZSTD
+	if (fp->zstd.fp)
+		return feof(fp->zstd.fp);
+	else
 #endif
 		return feof(fp->uncompressedfp);
 }
diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index d2e6e1b854..f0ce06f176 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -24,7 +24,8 @@
 typedef enum
 {
 	COMPR_ALG_NONE,
-	COMPR_ALG_LIBZ
+	COMPR_ALG_LIBZ,
+	COMPR_ALG_ZSTD,
 } CompressionAlgorithm;
 
 /* Prototype for callback function to WriteDataToArchive() */
@@ -57,7 +58,7 @@ extern void EndCompressor(ArchiveHandle *AH, CompressorState *cs);
 typedef struct cfp cfp;
 
 extern cfp *cfopen(const char *path, const char *mode, int compression);
-extern cfp *cfopen_read(const char *path, const char *mode);
+extern cfp *cfopen_read(const char *path, const char *mode, int compression);
 extern cfp *cfopen_write(const char *path, const char *mode, int compression);
 extern int	cfread(void *ptr, int size, cfp *fp);
 extern int	cfwrite(const void *ptr, int size, cfp *fp);
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 6a5a22637b..1a229ebedb 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -60,6 +60,11 @@ typedef struct _z_stream
 typedef z_stream *z_streamp;
 #endif
 
+#ifdef HAVE_LIBZSTD
+#include <zstd.h>
+#endif							/* HAVE_LIBZSTD */
+#define ZSTD_COMPRESSION	(-2)	/* -Z value for zstd; defined even without libzstd */
+
 /* Data block types */
 #define BLK_DATA 1
 #define BLK_BLOBS 3
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 650b542fce..6e55cb425e 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -202,7 +202,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
 
 		setFilePath(AH, fname, "toc.dat");
 
-		tocFH = cfopen_read(fname, PG_BINARY_R);
+		tocFH = cfopen_read(fname, PG_BINARY_R, AH->compression);
 		if (tocFH == NULL)
 			fatal("could not open input file \"%s\": %m", fname);
 
@@ -388,7 +388,7 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
 	if (!filename)
 		return;
 
-	cfp = cfopen_read(filename, PG_BINARY_R);
+	cfp = cfopen_read(filename, PG_BINARY_R, AH->compression);
 
 	if (!cfp)
 		fatal("could not open input file \"%s\": %m", filename);
@@ -440,7 +440,7 @@ _LoadBlobs(ArchiveHandle *AH)
 
 	setFilePath(AH, fname, "blobs.toc");
 
-	ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R);
+	ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R, AH->compression);
 
 	if (ctx->blobsTocFH == NULL)
 		fatal("could not open large object TOC file \"%s\" for input: %m",
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 8b1e5cc2b5..f21eb021c7 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -533,7 +533,8 @@ main(int argc, char **argv)
 
 			case 'Z':			/* Compression Level */
 				compressLevel = atoi(optarg);
-				if (compressLevel < 0 || compressLevel > 9)
+				if ((compressLevel < 0 || compressLevel > 9) &&
+					compressLevel != -2)
 				{
 					pg_log_error("compression level must be in range 0..9");
 					exit_nicely(1);
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index de8f838e53..da35415c72 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -346,6 +346,9 @@
 /* Define to 1 if you have the `z' library (-lz). */
 #undef HAVE_LIBZ
 
+/* Define to 1 if you have the `zstd' library (-lzstd). */
+#undef HAVE_LIBZSTD
+
 /* Define to 1 if you have the `link' function. */
 #undef HAVE_LINK
 
-- 
2.17.0

