From 8f07443e04d2e610414d023e8aa443d8aa785dfd Mon Sep 17 00:00:00 2001
From: Georgios Kokolatos <gkokolatos@pm.me>
Date: Tue, 5 Jul 2022 12:34:08 +0000
Subject: [PATCH v7 3/3] Add LZ4 compression in pg_{dump|restore}

Within compress_lz4.{c,h} the streaming API and a file API compression is
implemented. The first one is aimed at inlined use cases and thus simple
lz4.h calls can be used directly. The second one is generating output, or is
parsing input, which can be read/generated via the lz4 utility.

Wherever the LZ4F api does not implement all the functionality corresponding to
fread(), fwrite(), fgets(), fgetc(), feof(), and fclose(), it has been
implemented locally.

Custom compressed archives need to now store the compression method in their
header. This requires a bump in the version number. The level of compression is
still stored in the dump, though admittedly is of no apparent use.
---
 doc/src/sgml/ref/pg_dump.sgml        |  23 +-
 src/bin/pg_dump/Makefile             |   2 +
 src/bin/pg_dump/compress_io.c        |  41 +-
 src/bin/pg_dump/compress_lz4.c       | 593 +++++++++++++++++++++++++++
 src/bin/pg_dump/compress_lz4.h       |   9 +
 src/bin/pg_dump/pg_backup_archiver.c |  73 ++--
 src/bin/pg_dump/pg_backup_archiver.h |   4 +-
 src/bin/pg_dump/pg_dump.c            |   8 +-
 src/bin/pg_dump/t/001_basic.pl       |   2 +-
 src/bin/pg_dump/t/002_pg_dump.pl     |  69 +++-
 10 files changed, 769 insertions(+), 55 deletions(-)
 create mode 100644 src/bin/pg_dump/compress_lz4.c
 create mode 100644 src/bin/pg_dump/compress_lz4.h

diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index 94885d0812..1097488b83 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -328,9 +328,10 @@ PostgreSQL documentation
            machine-readable format that <application>pg_restore</application>
            can read. A directory format archive can be manipulated with
            standard Unix tools; for example, files in an uncompressed archive
-           can be compressed with the <application>gzip</application> tool.
-           This format is compressed by default and also supports parallel
-           dumps.
+           can be compressed with the <application>gzip</application> or
+           <application>lz4</application> tool.
+           This format is compressed by default using <literal>gzip</literal>
+           and also supports parallel dumps.
           </para>
          </listitem>
         </varlistentry>
@@ -652,12 +653,12 @@ PostgreSQL documentation
        <para>
         Specify the compression method and/or the compression level to use.
         The compression method can be set to <literal>gzip</literal> or
-        <literal>none</literal> for no compression. A compression level can
-        be optionally specified, by appending the level number after a colon
-        (<literal>:</literal>). If no level is specified, the default compression
-        level will be used for the specified method. If only a level is
-        specified without mentioning a method, <literal>gzip</literal> compression
-        will be used.
+        <literal>lz4</literal> or <literal>none</literal> for no compression. A
+        compression level can be optionally specified, by appending the level
+        number after a colon (<literal>:</literal>). If no level is specified,
+        the default compression level will be used for the specified method. If
+        only a level is specified without mentioning a method,
+        <literal>gzip</literal> compression will be used.
        </para>
 
        <para>
@@ -665,8 +666,8 @@ PostgreSQL documentation
         individual table-data segments, and the default is to compress using
         <literal>gzip</literal> at a moderate level. For plain text output,
         setting a nonzero compression level causes the entire output file to be compressed,
-        as though it had been fed through <application>gzip</application>; but the default
-        is not to compress.
+        as though it had been fed through <application>gzip</application> or
+        <application>lz4</application>; but the default is not to compress.
        </para>
        <para>
         The tar archive format currently does not support compression at all.
diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile
index 2d777ec213..1f7fd8d1f0 100644
--- a/src/bin/pg_dump/Makefile
+++ b/src/bin/pg_dump/Makefile
@@ -17,6 +17,7 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 export GZIP_PROGRAM=$(GZIP)
+export LZ4
 
 override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport)
@@ -24,6 +25,7 @@ LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport)
 OBJS = \
 	$(WIN32RES) \
 	compress_gzip.o \
+	compress_lz4.o \
 	compress_io.o \
 	dumputils.o \
 	parallel.o \
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 1948ee3d57..4ef1cb5291 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -38,13 +38,15 @@
  * ----------------------
  *
  *	The compressed stream API is a wrapper around the C standard fopen() and
- *	libz's gzopen() APIs. It allows you to use the same functions for
- *	compressed and uncompressed streams. cfopen_read() first tries to open
- *	the file with given name, and if it fails, it tries to open the same
- *	file with the .gz suffix. cfopen_write() opens a file for writing, an
- *	extra argument specifies if the file should be compressed, and adds the
- *	.gz suffix to the filename if so. This allows you to easily handle both
- *	compressed and uncompressed files.
+ *	libz's gzopen() APIs and custom LZ4 calls which provide similar
+ *	functionality. It allows you to use the same functions for compressed and
+ *	uncompressed streams. cfopen_read() first tries to open the file with given
+ *	name, and if it fails, it tries to open the same file with the .gz suffix,
+ *	failing that it tries to open the same file with the .lz4 suffix.
+ *	cfopen_write() opens a file for writing, an extra argument specifies the
+ *	method to use should the file be compressed, and adds the appropriate
+ *	suffix, .gz or .lz4, to the filename if so. This allows you to easily handle
+ *	both compressed and uncompressed files.
  *
  * IDENTIFICATION
  *	   src/bin/pg_dump/compress_io.c
@@ -57,6 +59,7 @@
 
 #include "compress_io.h"
 #include "compress_gzip.h"
+#include "compress_lz4.h"
 #include "pg_backup_utils.h"
 
 /*----------------------
@@ -125,6 +128,9 @@ AllocateCompressor(const pg_compress_specification compress_spec,
 		case PG_COMPRESSION_GZIP:
 			InitCompressorGzip(cs, compress_spec.level);
 			break;
+		case PG_COMPRESSION_LZ4:
+			InitCompressorLZ4(cs, compress_spec.level);
+			break;
 		default:
 			pg_fatal("invalid compression method");
 			break;
@@ -175,6 +181,7 @@ free_keep_errno(void *p)
 /*
  * Compression None implementation
  */
+
 static size_t
 _read(void *ptr, size_t size, CompressFileHandle * CFH)
 {
@@ -310,6 +317,9 @@ InitCompressFileHandle(const pg_compress_specification compress_spec)
 		case PG_COMPRESSION_GZIP:
 			InitCompressGzip(CFH, compress_spec.level);
 			break;
+		case PG_COMPRESSION_LZ4:
+			InitCompressLZ4(CFH, compress_spec.level);
+			break;
 		default:
 			pg_fatal("invalid compression method");
 			break;
@@ -322,12 +332,12 @@ InitCompressFileHandle(const pg_compress_specification compress_spec)
  * Open a file for reading. 'path' is the file to open, and 'mode' should
  * be either "r" or "rb".
  *
- * If the file at 'path' does not exist, we append the ".gz" suffix (if
- * If the file at 'path' does not exist, we append the "{.gz,.lz4}" suffix (if
  * 'path' doesn't already have it) and try again. So if you pass "foo" as
- * 'path', this will open either "foo" or "foo.gz", trying in that order.
+ * 'path', this will open either "foo" or "foo.gz" or "foo.lz4", trying in that
+ * order.
  *
  * On failure, return NULL with an error code in errno.
- *
  */
 CompressFileHandle *
 InitDiscoverCompressFileHandle(const char *path, const char *mode)
@@ -363,6 +373,17 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
 			if (exists)
 				compress_spec.algorithm = PG_COMPRESSION_GZIP;
 		}
+#endif
+#ifdef USE_LZ4
+		if (!exists)
+		{
+			free_keep_errno(fname);
+			fname = psprintf("%s.lz4", path);
+			exists = (stat(fname, &st) == 0);
+
+			if (exists)
+				compress_spec.algorithm = PG_COMPRESSION_LZ4;
+		}
 #endif
 	}
 
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
new file mode 100644
index 0000000000..6f4680c344
--- /dev/null
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -0,0 +1,593 @@
+#include "postgres_fe.h"
+#include "pg_backup_utils.h"
+
+#include "compress_lz4.h"
+
+#ifdef USE_LZ4
+#include <lz4.h>
+#include <lz4frame.h>
+
+#define LZ4_OUT_SIZE	(4 * 1024)
+#define LZ4_IN_SIZE		(16 * 1024)
+
+/*----------------------
+ * Compressor API
+ *----------------------
+ */
+
+typedef struct LZ4CompressorState
+{
+	char	   *outbuf;
+	size_t		outsize;
+}			LZ4CompressorState;
+
+/* Private routines that support LZ4 compressed data I/O */
+static void ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs);
+static void WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
+								  const void *data, size_t dLen);
+static void EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs);
+
+static void
+ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
+{
+	LZ4_streamDecode_t lz4StreamDecode;
+	char	   *buf;
+	char	   *decbuf;
+	size_t		buflen;
+	size_t		cnt;
+
+	buflen = LZ4_IN_SIZE;
+	buf = pg_malloc(buflen);
+	decbuf = pg_malloc(buflen);
+
+	LZ4_setStreamDecode(&lz4StreamDecode, NULL, 0);
+
+	while ((cnt = cs->readF(AH, &buf, &buflen)))
+	{
+		int			decBytes = LZ4_decompress_safe_continue(&lz4StreamDecode,
+															buf, decbuf,
+															cnt, buflen);
+
+		ahwrite(decbuf, 1, decBytes, AH);
+	}
+
+	pg_free(buf);
+	pg_free(decbuf);
+}
+
+static void
+WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
+					  const void *data, size_t dLen)
+{
+	LZ4CompressorState *LZ4cs = (LZ4CompressorState *) cs->private;
+	int			compressed;
+	size_t		requiredsize = LZ4_compressBound(dLen);
+
+	if (requiredsize > LZ4cs->outsize)
+	{
+		LZ4cs->outbuf = pg_realloc(LZ4cs->outbuf, requiredsize);
+		LZ4cs->outsize = requiredsize;
+	}
+
+	compressed = LZ4_compress_default(data, LZ4cs->outbuf,
+									  dLen, LZ4cs->outsize);
+
+	if (compressed <= 0)
+		pg_fatal("failed to LZ4 compress data");
+
+	cs->writeF(AH, LZ4cs->outbuf, compressed);
+}
+
+static void
+EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
+{
+	LZ4CompressorState *LZ4cs;
+
+	LZ4cs = (LZ4CompressorState *) cs->private;
+	if (LZ4cs)
+	{
+		pg_free(LZ4cs->outbuf);
+		pg_free(LZ4cs);
+		cs->private = NULL;
+	}
+}
+
+
+/* Public routines that support LZ4 compressed data I/O */
+void
+InitCompressorLZ4(CompressorState *cs, int compressionLevel)
+{
+	cs->readData = ReadDataFromArchiveLZ4;
+	cs->writeData = WriteDataToArchiveLZ4;
+	cs->end = EndCompressorLZ4;
+
+	/* Will be lazy init'd */
+	cs->private = pg_malloc0(sizeof(LZ4CompressorState));
+}
+
+/*----------------------
+ * Compress File API
+ *----------------------
+ */
+
+/*
+ * State needed for LZ4 (de)compression using the CompressFileHandle API.
+ */
+typedef struct LZ4File
+{
+	FILE	   *fp;
+
+	LZ4F_preferences_t prefs;
+
+	LZ4F_compressionContext_t ctx;
+	LZ4F_decompressionContext_t dtx;
+
+	bool		inited;
+	bool		compressing;
+
+	size_t		buflen;
+	char	   *buffer;
+
+	size_t		overflowalloclen;
+	size_t		overflowlen;
+	char	   *overflowbuf;
+
+	size_t		errcode;
+}			LZ4File;
+
+/*
+ * LZ4 equivalent to feof() or gzeof(). The end of file
+ * is reached if there is no decompressed output in the
+ * overflow buffer and the end of the file is reached.
+ */
+static int
+LZ4File_eof(CompressFileHandle * CFH)
+{
+	LZ4File    *fs = (LZ4File *) CFH->private;
+
+	return fs->overflowlen == 0 && feof(fs->fp);
+}
+
+static const char *
+LZ4File_get_error(CompressFileHandle * CFH)
+{
+	LZ4File    *fs = (LZ4File *) CFH->private;
+	const char *errmsg;
+
+	if (LZ4F_isError(fs->errcode))
+		errmsg = LZ4F_getErrorName(fs->errcode);
+	else
+		errmsg = strerror(errno);
+
+	return errmsg;
+}
+
+/*
+ * Prepare an already alloc'ed LZ4File struct for subsequent calls.
+ *
+ * It creates the necessary contexts for the operations. When compressing,
+ * it additionally writes the LZ4 header in the output stream.
+ */
+static int
+LZ4File_init(LZ4File * fs, int size, bool compressing)
+{
+	size_t		status;
+
+	if (fs->inited)
+		return 0;
+
+	fs->compressing = compressing;
+	fs->inited = true;
+
+	if (fs->compressing)
+	{
+		fs->buflen = LZ4F_compressBound(LZ4_IN_SIZE, &fs->prefs);
+		if (fs->buflen < LZ4F_HEADER_SIZE_MAX)
+			fs->buflen = LZ4F_HEADER_SIZE_MAX;
+
+		status = LZ4F_createCompressionContext(&fs->ctx, LZ4F_VERSION);
+		if (LZ4F_isError(status))
+		{
+			fs->errcode = status;
+			return 1;
+		}
+
+		fs->buffer = pg_malloc(fs->buflen);
+		status = LZ4F_compressBegin(fs->ctx, fs->buffer, fs->buflen,
+									&fs->prefs);
+
+		if (LZ4F_isError(status))
+		{
+			fs->errcode = status;
+			return 1;
+		}
+
+		if (fwrite(fs->buffer, 1, status, fs->fp) != status)
+		{
+			errno = errno ? : ENOSPC;
+			return 1;
+		}
+	}
+	else
+	{
+		status = LZ4F_createDecompressionContext(&fs->dtx, LZ4F_VERSION);
+		if (LZ4F_isError(status))
+		{
+			fs->errcode = status;
+			return 1;
+		}
+
+		fs->buflen = size > LZ4_OUT_SIZE ? size : LZ4_OUT_SIZE;
+		fs->buffer = pg_malloc(fs->buflen);
+
+		fs->overflowalloclen = fs->buflen;
+		fs->overflowbuf = pg_malloc(fs->overflowalloclen);
+		fs->overflowlen = 0;
+	}
+
+	return 0;
+}
+
+/*
+ * Read already decompressed content from the overflow buffer into 'ptr' up to
+ * 'size' bytes, if available. If the eol_flag is set, then stop at the first
+ * occurrence of the new line char prior to 'size' bytes.
+ *
+ * Any unread content in the overflow buffer, is moved to the beginning.
+ */
+static int
+LZ4File_read_overflow(LZ4File * fs, void *ptr, int size, bool eol_flag)
+{
+	char	   *p;
+	int			readlen = 0;
+
+	if (fs->overflowlen == 0)
+		return 0;
+
+	if (fs->overflowlen >= size)
+		readlen = size;
+	else
+		readlen = fs->overflowlen;
+
+	if (eol_flag && (p = memchr(fs->overflowbuf, '\n', readlen)))
+		/* Include the line terminating char */
+		readlen = p - fs->overflowbuf + 1;
+
+	memcpy(ptr, fs->overflowbuf, readlen);
+	fs->overflowlen -= readlen;
+
+	if (fs->overflowlen > 0)
+		memmove(fs->overflowbuf, fs->overflowbuf + readlen, fs->overflowlen);
+
+	return readlen;
+}
+
+/*
+ * The workhorse for reading decompressed content out of an LZ4 compressed
+ * stream.
+ *
+ * It will read up to 'ptrsize' decompressed content, or up to the new line char
+ * if found first when the eol_flag is set. It is possible that the decompressed
+ * output generated by reading any compressed input via the LZ4F API, exceeds
+ * 'ptrsize'. Any exceeding decompressed content is stored at an overflow
+ * buffer within LZ4File. Of course, when the function is called, it will first
+ * try to consume any decompressed content already present in the overflow
+ * buffer, before decompressing new content.
+ */
+static int
+LZ4File_read_internal(LZ4File * fs, void *ptr, int ptrsize, bool eol_flag)
+{
+	size_t		dsize = 0;
+	size_t		rsize;
+	size_t		size = ptrsize;
+	bool		eol_found = false;
+
+	void	   *readbuf;
+
+	/* Lazy init */
+	if (!fs->inited && LZ4File_init(fs, size, false /* decompressing */ ))
+		return -1;
+
+	/* Verify that there is enough space in the outbuf */
+	if (size > fs->buflen)
+	{
+		fs->buflen = size;
+		fs->buffer = pg_realloc(fs->buffer, size);
+	}
+
+	/* use already decompressed content if available */
+	dsize = LZ4File_read_overflow(fs, ptr, size, eol_flag);
+	if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize)))
+		return dsize;
+
+	readbuf = pg_malloc(size);
+
+	do
+	{
+		char	   *rp;
+		char	   *rend;
+
+		rsize = fread(readbuf, 1, size, fs->fp);
+		if (rsize < size && !feof(fs->fp))
+			return -1;
+
+		rp = (char *) readbuf;
+		rend = (char *) readbuf + rsize;
+
+		while (rp < rend)
+		{
+			size_t		status;
+			size_t		outlen = fs->buflen;
+			size_t		read_remain = rend - rp;
+
+			memset(fs->buffer, 0, outlen);
+			status = LZ4F_decompress(fs->dtx, fs->buffer, &outlen,
+									 rp, &read_remain, NULL);
+			if (LZ4F_isError(status))
+			{
+				fs->errcode = status;
+				return -1;
+			}
+
+			rp += read_remain;
+
+			/*
+			 * fill in what space is available in ptr if the eol flag is set,
+			 * either skip if one already found or fill up to EOL if present
+			 * in the outbuf
+			 */
+			if (outlen > 0 && dsize < size && eol_found == false)
+			{
+				char	   *p;
+				size_t		lib = (eol_flag == 0) ? size - dsize : size - 1 - dsize;
+				size_t		len = outlen < lib ? outlen : lib;
+
+				if (eol_flag == true &&
+					(p = memchr(fs->buffer, '\n', outlen)) &&
+					(size_t) (p - fs->buffer + 1) <= len)
+				{
+					len = p - fs->buffer + 1;
+					eol_found = true;
+				}
+
+				memcpy((char *) ptr + dsize, fs->buffer, len);
+				dsize += len;
+
+				/* move what did not fit, if any, to the beginning of the buf */
+				if (len < outlen)
+					memmove(fs->buffer, fs->buffer + len, outlen - len);
+				outlen -= len;
+			}
+
+			/* if there is available output, save it */
+			if (outlen > 0)
+			{
+				while (fs->overflowlen + outlen > fs->overflowalloclen)
+				{
+					fs->overflowalloclen *= 2;
+					fs->overflowbuf = pg_realloc(fs->overflowbuf,
+												 fs->overflowalloclen);
+				}
+
+				memcpy(fs->overflowbuf + fs->overflowlen, fs->buffer, outlen);
+				fs->overflowlen += outlen;
+			}
+		}
+	} while (rsize == size && dsize < size && eol_found == 0);
+
+	pg_free(readbuf);
+
+	return (int) dsize;
+}
+
+/*
+ * Compress size bytes from ptr and write them to the stream.
+ */
+static size_t
+LZ4File_write(const void *ptr, size_t size, CompressFileHandle * CFH)
+{
+	LZ4File    *fs = (LZ4File *) CFH->private;
+	size_t		status;
+	int			remaining = size;
+
+	if (!fs->inited && LZ4File_init(fs, size, true))
+		return -1;
+
+	while (remaining > 0)
+	{
+		int			chunk = remaining < LZ4_IN_SIZE ? remaining : LZ4_IN_SIZE;
+		const char *in = (const char *) ptr + (size - remaining);
+
+		remaining -= chunk;
+		status = LZ4F_compressUpdate(fs->ctx, fs->buffer, fs->buflen,
+									 in, chunk, NULL);
+		if (LZ4F_isError(status))
+		{
+			fs->errcode = status;
+			return -1;
+		}
+
+		if (fwrite(fs->buffer, 1, status, fs->fp) != status)
+		{
+			errno = errno ? : ENOSPC;
+			return -1;
+		}
+	}
+
+	return size;
+}
+
+/*
+ * fread() equivalent implementation for LZ4 compressed files.
+ */
+static size_t
+LZ4File_read(void *ptr, size_t size, CompressFileHandle * CFH)
+{
+	LZ4File    *fs = (LZ4File *) CFH->private;
+	int			ret;
+
+	ret = LZ4File_read_internal(fs, ptr, size, false);
+	if (ret != size && !LZ4File_eof(CFH))
+		pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
+
+	return ret;
+}
+
+/*
+ * fgetc() equivalent implementation for LZ4 compressed files.
+ */
+static int
+LZ4File_getc(CompressFileHandle * CFH)
+{
+	LZ4File    *fs = (LZ4File *) CFH->private;
+	unsigned char c;
+
+	if (LZ4File_read_internal(fs, &c, 1, false) != 1)
+	{
+		if (!LZ4File_eof(CFH))
+			pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
+		else
+			pg_fatal("could not read from input file: end of file");
+	}
+
+	return c;
+}
+
+/*
+ * fgets() equivalent implementation for LZ4 compressed files.
+ */
+static char *
+LZ4File_gets(char *ptr, int size, CompressFileHandle * CFH)
+{
+	LZ4File    *fs = (LZ4File *) CFH->private;
+	int			dsize;
+
+	dsize = LZ4File_read_internal(fs, ptr, size, true);
+	if (dsize < 0)
+		pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
+
+	/* Done reading */
+	if (dsize == 0)
+		return NULL;
+
+	return ptr;
+}
+
+/*
+ * Finalize (de)compression of a stream. When compressing it will write any
+ * remaining content and/or generated footer from the LZ4 API.
+ */
+static int
+LZ4File_close(CompressFileHandle * CFH)
+{
+	FILE	   *fp;
+	LZ4File    *fs = (LZ4File *) CFH->private;
+	size_t		status;
+	int			ret;
+
+	fp = fs->fp;
+	if (fs->inited)
+	{
+		if (fs->compressing)
+		{
+			status = LZ4F_compressEnd(fs->ctx, fs->buffer, fs->buflen, NULL);
+			if (LZ4F_isError(status))
+				pg_fatal("failed to end compression: %s",
+						 LZ4F_getErrorName(status));
+			else if ((ret = fwrite(fs->buffer, 1, status, fs->fp)) != status)
+			{
+				errno = errno ? : ENOSPC;
+				WRITE_ERROR_EXIT;
+			}
+
+			status = LZ4F_freeCompressionContext(fs->ctx);
+			if (LZ4F_isError(status))
+				pg_fatal("failed to end compression: %s",
+						 LZ4F_getErrorName(status));
+		}
+		else
+		{
+			status = LZ4F_freeDecompressionContext(fs->dtx);
+			if (LZ4F_isError(status))
+				pg_fatal("failed to end decompression: %s",
+						 LZ4F_getErrorName(status));
+			pg_free(fs->overflowbuf);
+		}
+
+		pg_free(fs->buffer);
+	}
+
+	pg_free(fs);
+
+	return fclose(fp);
+}
+
+static int
+LZ4File_open(const char *path, int fd, const char *mode,
+			 CompressFileHandle * CFH)
+{
+	FILE	   *fp;
+	LZ4File    *lz4fp = (LZ4File *) CFH->private;
+
+	if (fd >= 0)
+		fp = fdopen(fd, mode);
+	else
+		fp = fopen(path, mode);
+	if (fp == NULL)
+	{
+		lz4fp->errcode = errno;
+		return 1;
+	}
+
+	lz4fp->fp = fp;
+
+	return 0;
+}
+
+static int
+LZ4File_open_write(const char *path, const char *mode, CompressFileHandle * CFH)
+{
+	char	   *fname;
+	int			ret;
+
+	fname = psprintf("%s.lz4", path);
+	ret = CFH->open(fname, -1, mode, CFH);
+	pg_free(fname);
+
+	return ret;
+}
+
+void
+InitCompressLZ4(CompressFileHandle * CFH, int compressionLevel)
+{
+	LZ4File    *lz4fp;
+
+	CFH->open = LZ4File_open;
+	CFH->open_write = LZ4File_open_write;
+	CFH->read = LZ4File_read;
+	CFH->write = LZ4File_write;
+	CFH->gets = LZ4File_gets;
+	CFH->getc = LZ4File_getc;
+	CFH->eof = LZ4File_eof;
+	CFH->close = LZ4File_close;
+	CFH->get_error = LZ4File_get_error;
+
+	lz4fp = pg_malloc0(sizeof(*lz4fp));
+	if (compressionLevel >= 0)
+		lz4fp->prefs.compressionLevel = compressionLevel;
+
+	CFH->private = lz4fp;
+}
+#else							/* USE_LZ4 */
+void
+InitCompressorLZ4(CompressorState *cs, int compressionLevel)
+{
+	pg_fatal("not built with LZ4 support");
+}
+
+void
+InitCompressLZ4(CompressFileHandle * CFH, int compressionLevel)
+{
+	pg_fatal("not built with LZ4 support");
+}
+#endif							/* USE_LZ4 */
diff --git a/src/bin/pg_dump/compress_lz4.h b/src/bin/pg_dump/compress_lz4.h
new file mode 100644
index 0000000000..fbec9a508d
--- /dev/null
+++ b/src/bin/pg_dump/compress_lz4.h
@@ -0,0 +1,9 @@
+#ifndef _COMPRESS_LZ4_H_
+#define _COMPRESS_LZ4_H_
+
+#include "compress_io.h"
+
+extern void InitCompressorLZ4(CompressorState *cs, int compressionLevel);
+extern void InitCompressLZ4(CompressFileHandle * CFH, int compressionLevel);
+
+#endif							/* _COMPRESS_LZ4_H_ */
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 186510c235..225c226a6e 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -353,6 +353,7 @@ RestoreArchive(Archive *AHX)
 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
 	RestoreOptions *ropt = AH->public.ropt;
 	bool		parallel_mode;
+	bool		supports_compression;
 	TocEntry   *te;
 	CompressFileHandle *sav;
 
@@ -382,17 +383,28 @@ RestoreArchive(Archive *AHX)
 	/*
 	 * Make sure we won't need (de)compression we haven't got
 	 */
-#ifndef HAVE_LIBZ
-	if (AH->compress_spec.algorithm == PG_COMPRESSION_GZIP &&
+	supports_compression = true;
+	if ((AH->compress_spec.algorithm == PG_COMPRESSION_GZIP ||
+		 AH->compress_spec.algorithm == PG_COMPRESSION_LZ4) &&
 		AH->PrintTocDataPtr != NULL)
 	{
 		for (te = AH->toc->next; te != AH->toc; te = te->next)
 		{
 			if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
-				pg_fatal("cannot restore from compressed archive (compression not supported in this installation)");
+			{
+#ifndef HAVE_LIBZ
+				if (AH->compress_spec.algorithm == PG_COMPRESSION_GZIP)
+					supports_compression = false;
+#endif
+#ifndef USE_LZ4
+				if (AH->compress_spec.algorithm == PG_COMPRESSION_LZ4)
+					supports_compression = false;
+#endif
+				if (supports_compression == false)
+					pg_fatal("cannot restore from compressed archive (compression not supported in this installation)");
+			}
 		}
 	}
-#endif
 
 	/*
 	 * Prepare index arrays, so we can assume we have them throughout restore.
@@ -2028,6 +2040,18 @@ ReadStr(ArchiveHandle *AH)
 	return buf;
 }
 
+static bool
+_fileExistsInDirectory(const char *dir, const char *filename)
+{
+	struct stat st;
+	char		buf[MAXPGPATH];
+
+	if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
+		pg_fatal("directory name too long: \"%s\"", dir);
+
+	return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
+}
+
 static int
 _discoverArchiveFormat(ArchiveHandle *AH)
 {
@@ -2054,30 +2078,20 @@ _discoverArchiveFormat(ArchiveHandle *AH)
 
 		/*
 		 * Check if the specified archive is a directory. If so, check if
-		 * there's a "toc.dat" (or "toc.dat.gz") file in it.
+		 * there's a "toc.dat" (or "toc.dat.{gz,lz4}") file in it.
 		 */
 		if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
 		{
-			char		buf[MAXPGPATH];
-
-			if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH)
-				pg_fatal("directory name too long: \"%s\"",
-						 AH->fSpec);
-			if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
-			{
-				AH->format = archDirectory;
+			AH->format = archDirectory;
+			if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
 				return AH->format;
-			}
-
 #ifdef HAVE_LIBZ
-			if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH)
-				pg_fatal("directory name too long: \"%s\"",
-						 AH->fSpec);
-			if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
-			{
-				AH->format = archDirectory;
+			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
+				return AH->format;
+#endif
+#ifdef USE_LZ4
+			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
 				return AH->format;
-			}
 #endif
 			pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
 					 AH->fSpec);
@@ -3684,6 +3698,7 @@ WriteHead(ArchiveHandle *AH)
 	AH->WriteBytePtr(AH, AH->offSize);
 	AH->WriteBytePtr(AH, AH->format);
 	WriteInt(AH, AH->compress_spec.level);
+	AH->WriteBytePtr(AH, AH->compress_spec.algorithm);
 	crtm = *localtime(&AH->createDate);
 	WriteInt(AH, crtm.tm_sec);
 	WriteInt(AH, crtm.tm_min);
@@ -3765,14 +3780,20 @@ ReadHead(ArchiveHandle *AH)
 	else
 		AH->compress_spec.level = Z_DEFAULT_COMPRESSION;
 
-	if (AH->compress_spec.level != INT_MIN)
+	if (AH->version >= K_VERS_1_15)
+		AH->compress_spec.algorithm = AH->ReadBytePtr(AH);
+	else if (AH->compress_spec.level != INT_MIN)
+		AH->compress_spec.algorithm = PG_COMPRESSION_GZIP;
+
 #ifndef HAVE_LIBZ
+	if (AH->compress_spec.algorithm == PG_COMPRESSION_GZIP)
+	{
 		pg_log_warning("archive is compressed, but this installation does not support compression -- no data will be available");
-#else
-		AH->compress_spec.algorithm = PG_COMPRESSION_GZIP;
+		AH->compress_spec.algorithm = PG_COMPRESSION_NONE;
+		AH->compress_spec.level = 0;
+	}
 #endif
 
-
 	if (AH->version >= K_VERS_1_4)
 	{
 		struct tm	crtm;
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 341d406515..f577e8fc61 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -65,10 +65,12 @@
 #define K_VERS_1_13 MAKE_ARCHIVE_VERSION(1, 13, 0)	/* change search_path
 													 * behavior */
 #define K_VERS_1_14 MAKE_ARCHIVE_VERSION(1, 14, 0)	/* add tableam */
+#define K_VERS_1_15 MAKE_ARCHIVE_VERSION(1, 15, 0)	/* add compressionMethod
+													 * in header */
 
 /* Current archive version number (the format we can output) */
 #define K_VERS_MAJOR 1
-#define K_VERS_MINOR 14
+#define K_VERS_MINOR 15
 #define K_VERS_REV 0
 #define K_VERS_SELF MAKE_ARCHIVE_VERSION(K_VERS_MAJOR, K_VERS_MINOR, K_VERS_REV)
 
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index dd214e7670..e1253f7291 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -1303,7 +1303,7 @@ parse_compression_option(const char *opt,
 			res = parse_compress_algorithm(method,
 										   &compress_spec->algorithm);
 			if (!res)
-				pg_log_error("invalid compression method \"%s\" (gzip, none)",
+				pg_log_error("invalid compression method \"%s\" (gzip, lz4, none)",
 							 method);
 
 			pg_free(method);
@@ -1345,8 +1345,8 @@ parse_compression_option(const char *opt,
 		return false;
 
 	/* verify that the requested compression is supported */
-
 	if (compress_spec->algorithm != PG_COMPRESSION_NONE &&
+		compress_spec->algorithm != PG_COMPRESSION_LZ4 &&
 		compress_spec->algorithm != PG_COMPRESSION_GZIP)
 		supports_compression = false;
 
@@ -1354,6 +1354,10 @@ parse_compression_option(const char *opt,
 	if (compress_spec->algorithm == PG_COMPRESSION_GZIP)
 		supports_compression = false;
 #endif
+#ifndef USE_LZ4
+	if (compress_spec->algorithm == PG_COMPRESSION_LZ4)
+		supports_compression = false;
+#endif
 
 	if (!supports_compression)
 	{
diff --git a/src/bin/pg_dump/t/001_basic.pl b/src/bin/pg_dump/t/001_basic.pl
index e1f1a8801c..3803370350 100644
--- a/src/bin/pg_dump/t/001_basic.pl
+++ b/src/bin/pg_dump/t/001_basic.pl
@@ -122,7 +122,7 @@ command_fails_like(
 
 command_fails_like(
 	[ 'pg_dump', '--compress', 'garbage' ],
-	qr/\Qpg_dump: error: invalid compression method "garbage" (gzip, none)\E/,
+	qr/\Qpg_dump: error: invalid compression method "garbage" (gzip, lz4, none)\E/,
 	'pg_dump: invalid --compress');
 
 command_fails_like(
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index a02b578750..9109d72f8a 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -116,6 +116,67 @@ my %pgdump_runs = (
 			args    => [ '-d', "$tempdir/compression_gzip_plain.sql.gz", ],
 		},
 	},
+
+	# Do not use --no-sync to give test coverage for data sync.
+	compression_lz4_custom => {
+		test_key       => 'compression',
+		compile_option => 'lz4',
+		dump_cmd       => [
+			'pg_dump',      '--format=custom',
+			'--compress=1', "--file=$tempdir/compression_lz4_custom.dump",
+			'postgres',
+		],
+		restore_cmd => [
+			'pg_restore',
+			"--file=$tempdir/compression_lz4_custom.sql",
+			"$tempdir/compression_lz4_custom.dump",
+		],
+	},
+
+	# Do not use --no-sync to give test coverage for data sync.
+	compression_lz4_dir => {
+		test_key       => 'compression',
+		compile_option => 'lz4',
+		dump_cmd       => [
+			'pg_dump',                              '--jobs=2',
+			'--format=directory',                   '--compress=lz4:1',
+			"--file=$tempdir/compression_lz4_dir", 'postgres',
+		],
+		# Give coverage for manually compressed blob.toc files during
+		# restore.
+		compress_cmd => {
+			program => $ENV{'LZ4'},
+			args    => [
+				'-z', '-f', '--rm',
+				"$tempdir/compression_lz4_dir/blobs.toc",
+				"$tempdir/compression_lz4_dir/blobs.toc.lz4",
+			],
+		},
+		restore_cmd => [
+			'pg_restore', '--jobs=2',
+			"--file=$tempdir/compression_lz4_dir.sql",
+			"$tempdir/compression_lz4_dir",
+		],
+	},
+
+	compression_lz4_plain => {
+		test_key       => 'compression',
+		compile_option => 'lz4',
+		dump_cmd       => [
+			'pg_dump', '--format=plain', '--compress=lz4',
+			"--file=$tempdir/compression_lz4_plain.sql.lz4", 'postgres',
+		],
+		# Decompress the generated file to run through the tests.
+		compress_cmd => {
+			program => $ENV{'LZ4'},
+			args    => [
+				'-d', '-f',
+				"$tempdir/compression_lz4_plain.sql.lz4",
+				"$tempdir/compression_lz4_plain.sql",
+			],
+		},
+	},
+
 	clean => {
 		dump_cmd => [
 			'pg_dump',
@@ -4057,11 +4118,11 @@ foreach my $run (sort keys %pgdump_runs)
 	my $run_db   = 'postgres';
 
 	# Skip command-level tests for gzip if there is no support for it.
-	if (   defined($pgdump_runs{$run}->{compile_option})
-		&& $pgdump_runs{$run}->{compile_option} eq 'gzip'
-		&& !$supports_gzip)
+	if (defined($pgdump_runs{$run}->{compile_option})
+		&& (($pgdump_runs{$run}->{compile_option} eq 'gzip' && !$supports_gzip)
+			|| ($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4)))
 	{
-		note "$run: skipped due to no gzip support";
+		note "$run: skipped due to no $pgdump_runs{$run}->{compile_option} support";
 		next;
 	}
 
-- 
2.34.1

