Re: Add LZ4 compression in pg_dump
Hi,
Will you be able to send a rebased patch for the next CF ?
If you update for the review comments I sent in March, I'll plan to do another
round of review.
On Sat, Mar 26, 2022 at 11:21:56AM -0500, Justin Pryzby wrote:
LZ4F_HEADER_SIZE_MAX isn't defined in old LZ4.
I ran into that on an ubuntu LTS, so I don't think it's so old that it
shouldn't be handled more gracefully. LZ4 should either have an explicit
version check, or else shouldn't depend on that feature (or should define a
safe fallback version if the library header doesn't define it).https://packages.ubuntu.com/liblz4-1
0003: typo: of legacy => or legacy
There are a large number of ifdefs being added here - it'd be nice to minimize
that. basebackup was organized to use separate files, which is one way.$ git grep -c 'ifdef .*LZ4' src/bin/pg_dump/compress_io.c
src/bin/pg_dump/compress_io.c:19In last year's CF entry, I had made a union within CompressorState. LZ4
doesn't need z_streamp (and ztsd will need ZSTD_outBuffer, ZSTD_inBuffer,
ZSTD_CStream).0002: I wonder if you're able to re-use any of the basebackup parsing stuff
from commit ffd53659c. You're passing both the compression method *and* level.
I think there should be a structure which includes both. In the future, that
can also handle additional options. I hope to re-use these same things for
wal_compression=method:level.You renamed this:
|- COMPR_ALG_LIBZ
|-} CompressionAlgorithm;
|+ COMPRESSION_GZIP,
|+} CompressionMethod;..But I don't think that's an improvement. If you were to change it, it should
say something like PGDUMP_COMPRESS_ZLIB, since there are other compression
structs and typedefs. zlib is not idential to gzip, which uses a different
header, so in WriteDataToArchive(), LIBZ is correct, and GZIP is incorrect.The cf* changes in pg_backup_archiver could be split out into a separate
commit. It's strictly a code simplification - not just preparation for more
compression algorithms. The commit message should "See also:
bf9aa490db24b2334b3595ee33653bf2fe39208c".The changes in 0002 for cfopen_write seem insufficient:
|+ if (compressionMethod == COMPRESSION_NONE)
|+ fp = cfopen(path, mode, compressionMethod, 0);
| else
| {
| #ifdef HAVE_LIBZ
| char *fname;
|
| fname = psprintf("%s.gz", path);
|- fp = cfopen(fname, mode, compression);
|+ fp = cfopen(fname, mode, compressionMethod, compressionLevel);
| free_keep_errno(fname);
| #elseThe only difference between the LIBZ and uncompressed case is the file
extension, and it'll be the only difference with LZ4 too. So I suggest to
first handle the file extension, and the rest of the code path is not
conditional on the compression method. I don't think cfopen_write even needs
HAVE_LIBZ - can't you handle that in cfopen_internal() ?This patch rejects -Z0, which ought to be accepted:
./src/bin/pg_dump/pg_dump -h /tmp regression -Fc -Z0 |wc
pg_dump: error: can only specify -Z/--compress [LEVEL] when method is setYour 0003 patch shouldn't reference LZ4: +#ifndef HAVE_LIBLZ4 + if (*compressionMethod == COMPRESSION_LZ4) + supports_compression = false; +#endifThe 0004 patch renames zlibOutSize to outsize - I think the patch series should
be constructed such as to minimize the size of the method-specific patches. I
say this anticipating also adding support for zstd. The preliminary patches
should have all the boring stuff. It would help for reviewing to keep the
patches split up, or to enumerate all the boring things that are being renamed
(like change OutputContext to cfp, rename zlibOutSize, ...).0004: The include should use <lz4.h> and not "lz4.h"
freebsd/cfbot is failing.
I suggested off-list to add an 0099 patch to change LZ4 to the default, to
exercise it more on CI.
On Sat, Mar 26, 2022 at 01:33:36PM -0500, Justin Pryzby wrote:
On Sat, Mar 26, 2022 at 11:21:56AM -0500, Justin Pryzby wrote:
You're passing both the compression method *and* level. I think there should
be a structure which includes both. In the future, that can also handle
additional options.I'm not sure if there's anything worth saving, but I did that last year with
0003-Support-multiple-compression-algs-levels-opts.patch
I sent a rebased copy off-list.
/messages/by-id/20210104025321.GA9712@telsasoft.com| fatal("not built with LZ4 support");
| fatal("not built with lz4 support");Please use consistent capitalization of "lz4" - then the compiler can optimize
away duplicate strings.0004: The include should use <lz4.h> and not "lz4.h"
Also, use USE_LZ4 rather than HAVE_LIBLZ4, per 75eae0908.
Import Notes
Reply to msg id not found: 20220326183336.GJ28503@telsasoft.com20220326162156.GI28503@telsasoft.com
------- Original Message -------
On Sunday, June 26th, 2022 at 5:55 PM, Justin Pryzby <pryzby@telsasoft.com> wrote:
Hi,
Will you be able to send a rebased patch for the next CF ?
Thank you for taking an interest in the PR. The plan is indeed to sent
a new version.
If you update for the review comments I sent in March, I'll plan to do another
round of review.
Thank you.
Show quoted text
On Sat, Mar 26, 2022 at 11:21:56AM -0500, Justin Pryzby wrote:
LZ4F_HEADER_SIZE_MAX isn't defined in old LZ4.
I ran into that on an ubuntu LTS, so I don't think it's so old that it
shouldn't be handled more gracefully. LZ4 should either have an explicit
version check, or else shouldn't depend on that feature (or should define a
safe fallback version if the library header doesn't define it).https://packages.ubuntu.com/liblz4-1
0003: typo: of legacy => or legacy
There are a large number of ifdefs being added here - it'd be nice to minimize
that. basebackup was organized to use separate files, which is one way.$ git grep -c 'ifdef .*LZ4' src/bin/pg_dump/compress_io.c
src/bin/pg_dump/compress_io.c:19In last year's CF entry, I had made a union within CompressorState. LZ4
doesn't need z_streamp (and ztsd will need ZSTD_outBuffer, ZSTD_inBuffer,
ZSTD_CStream).0002: I wonder if you're able to re-use any of the basebackup parsing stuff
from commit ffd53659c. You're passing both the compression method and level.
I think there should be a structure which includes both. In the future, that
can also handle additional options. I hope to re-use these same things for
wal_compression=method:level.You renamed this:
|- COMPR_ALG_LIBZ
|-} CompressionAlgorithm;
|+ COMPRESSION_GZIP,
|+} CompressionMethod;..But I don't think that's an improvement. If you were to change it, it should
say something like PGDUMP_COMPRESS_ZLIB, since there are other compression
structs and typedefs. zlib is not idential to gzip, which uses a different
header, so in WriteDataToArchive(), LIBZ is correct, and GZIP is incorrect.The cf* changes in pg_backup_archiver could be split out into a separate
commit. It's strictly a code simplification - not just preparation for more
compression algorithms. The commit message should "See also:
bf9aa490db24b2334b3595ee33653bf2fe39208c".The changes in 0002 for cfopen_write seem insufficient:
|+ if (compressionMethod == COMPRESSION_NONE)
|+ fp = cfopen(path, mode, compressionMethod, 0);
| else
| {
| #ifdef HAVE_LIBZ
| char *fname;
|
| fname = psprintf("%s.gz", path);
|- fp = cfopen(fname, mode, compression);
|+ fp = cfopen(fname, mode, compressionMethod, compressionLevel);
| free_keep_errno(fname);
| #elseThe only difference between the LIBZ and uncompressed case is the file
extension, and it'll be the only difference with LZ4 too. So I suggest to
first handle the file extension, and the rest of the code path is not
conditional on the compression method. I don't think cfopen_write even needs
HAVE_LIBZ - can't you handle that in cfopen_internal() ?This patch rejects -Z0, which ought to be accepted:
./src/bin/pg_dump/pg_dump -h /tmp regression -Fc -Z0 |wc
pg_dump: error: can only specify -Z/--compress [LEVEL] when method is setYour 0003 patch shouldn't reference LZ4: +#ifndef HAVE_LIBLZ4 + if (*compressionMethod == COMPRESSION_LZ4) + supports_compression = false; +#endifThe 0004 patch renames zlibOutSize to outsize - I think the patch series should
be constructed such as to minimize the size of the method-specific patches. I
say this anticipating also adding support for zstd. The preliminary patches
should have all the boring stuff. It would help for reviewing to keep the
patches split up, or to enumerate all the boring things that are being renamed
(like change OutputContext to cfp, rename zlibOutSize, ...).0004: The include should use <lz4.h> and not "lz4.h"
freebsd/cfbot is failing.
I suggested off-list to add an 0099 patch to change LZ4 to the default, to
exercise it more on CI.On Sat, Mar 26, 2022 at 01:33:36PM -0500, Justin Pryzby wrote:
On Sat, Mar 26, 2022 at 11:21:56AM -0500, Justin Pryzby wrote:
You're passing both the compression method and level. I think there should
be a structure which includes both. In the future, that can also handle
additional options.I'm not sure if there's anything worth saving, but I did that last year with
0003-Support-multiple-compression-algs-levels-opts.patch
I sent a rebased copy off-list.
/messages/by-id/20210104025321.GA9712@telsasoft.com| fatal("not built with LZ4 support");
| fatal("not built with lz4 support");Please use consistent capitalization of "lz4" - then the compiler can optimize
away duplicate strings.0004: The include should use <lz4.h> and not "lz4.h"
Also, use USE_LZ4 rather than HAVE_LIBLZ4, per 75eae0908.
------- Original Message -------
On Sunday, June 26th, 2022 at 5:55 PM, Justin Pryzby <pryzby@telsasoft.com> wrote:
Hi,
Will you be able to send a rebased patch for the next CF ?
Please find a rebased and heavily refactored patchset. Since parts of this
patchset were already committed, I restarted numbering. I am not certain if
this is the preferred way. This makes alignment with previous comments a bit
harder
If you update for the review comments I sent in March, I'll plan to do another
round of review.
I have updated for "some" of the comments. This is not an unwillingness to
incorporate those specific comments. Simply this patchset had started to divert
heavily already based on comments from Mr. Paquier who had already requested for
the APIs to be refactored to use function pointers. This is happening in 0002 of
the patchset. 0001 of the patchset is using the new compression.h under common.
This patchset should be considered a late draft, as commentary, documentation,
and some finer details are not yet finalized; because I am expecting the proposed
refactor to receive a wealth of comments. It would be helpful to understand if
the proposed direction is something worth to be worked upon, before moving to the
finer details.
For what is worth, I am the sole author of the current patchset.
Cheers,
//Georgios
Show quoted text
On Sat, Mar 26, 2022 at 11:21:56AM -0500, Justin Pryzby wrote:
LZ4F_HEADER_SIZE_MAX isn't defined in old LZ4.
I ran into that on an ubuntu LTS, so I don't think it's so old that it
shouldn't be handled more gracefully. LZ4 should either have an explicit
version check, or else shouldn't depend on that feature (or should define a
safe fallback version if the library header doesn't define it).https://packages.ubuntu.com/liblz4-1
0003: typo: of legacy => or legacy
There are a large number of ifdefs being added here - it'd be nice to minimize
that. basebackup was organized to use separate files, which is one way.$ git grep -c 'ifdef .*LZ4' src/bin/pg_dump/compress_io.c
src/bin/pg_dump/compress_io.c:19In last year's CF entry, I had made a union within CompressorState. LZ4
doesn't need z_streamp (and ztsd will need ZSTD_outBuffer, ZSTD_inBuffer,
ZSTD_CStream).0002: I wonder if you're able to re-use any of the basebackup parsing stuff
from commit ffd53659c. You're passing both the compression method and level.
I think there should be a structure which includes both. In the future, that
can also handle additional options. I hope to re-use these same things for
wal_compression=method:level.You renamed this:
|- COMPR_ALG_LIBZ
|-} CompressionAlgorithm;
|+ COMPRESSION_GZIP,
|+} CompressionMethod;..But I don't think that's an improvement. If you were to change it, it should
say something like PGDUMP_COMPRESS_ZLIB, since there are other compression
structs and typedefs. zlib is not idential to gzip, which uses a different
header, so in WriteDataToArchive(), LIBZ is correct, and GZIP is incorrect.The cf* changes in pg_backup_archiver could be split out into a separate
commit. It's strictly a code simplification - not just preparation for more
compression algorithms. The commit message should "See also:
bf9aa490db24b2334b3595ee33653bf2fe39208c".The changes in 0002 for cfopen_write seem insufficient:
|+ if (compressionMethod == COMPRESSION_NONE)
|+ fp = cfopen(path, mode, compressionMethod, 0);
| else
| {
| #ifdef HAVE_LIBZ
| char *fname;
|
| fname = psprintf("%s.gz", path);
|- fp = cfopen(fname, mode, compression);
|+ fp = cfopen(fname, mode, compressionMethod, compressionLevel);
| free_keep_errno(fname);
| #elseThe only difference between the LIBZ and uncompressed case is the file
extension, and it'll be the only difference with LZ4 too. So I suggest to
first handle the file extension, and the rest of the code path is not
conditional on the compression method. I don't think cfopen_write even needs
HAVE_LIBZ - can't you handle that in cfopen_internal() ?This patch rejects -Z0, which ought to be accepted:
./src/bin/pg_dump/pg_dump -h /tmp regression -Fc -Z0 |wc
pg_dump: error: can only specify -Z/--compress [LEVEL] when method is setYour 0003 patch shouldn't reference LZ4: +#ifndef HAVE_LIBLZ4 + if (*compressionMethod == COMPRESSION_LZ4) + supports_compression = false; +#endifThe 0004 patch renames zlibOutSize to outsize - I think the patch series should
be constructed such as to minimize the size of the method-specific patches. I
say this anticipating also adding support for zstd. The preliminary patches
should have all the boring stuff. It would help for reviewing to keep the
patches split up, or to enumerate all the boring things that are being renamed
(like change OutputContext to cfp, rename zlibOutSize, ...).0004: The include should use <lz4.h> and not "lz4.h"
freebsd/cfbot is failing.
I suggested off-list to add an 0099 patch to change LZ4 to the default, to
exercise it more on CI.On Sat, Mar 26, 2022 at 01:33:36PM -0500, Justin Pryzby wrote:
On Sat, Mar 26, 2022 at 11:21:56AM -0500, Justin Pryzby wrote:
You're passing both the compression method and level. I think there should
be a structure which includes both. In the future, that can also handle
additional options.I'm not sure if there's anything worth saving, but I did that last year with
0003-Support-multiple-compression-algs-levels-opts.patch
I sent a rebased copy off-list.
/messages/by-id/20210104025321.GA9712@telsasoft.com| fatal("not built with LZ4 support");
| fatal("not built with lz4 support");Please use consistent capitalization of "lz4" - then the compiler can optimize
away duplicate strings.0004: The include should use <lz4.h> and not "lz4.h"
Also, use USE_LZ4 rather than HAVE_LIBLZ4, per 75eae0908.
Attachments:
v7-0002-Introduce-Compressor-API-in-pg_dump.patchtext/x-patch; name=v7-0002-Introduce-Compressor-API-in-pg_dump.patchDownload
From b8e1b7fe02a898326c8adab9dcc67d9e4c281621 Mon Sep 17 00:00:00 2001
From: Georgios Kokolatos <gkokolatos@pm.me>
Date: Tue, 5 Jul 2022 12:33:52 +0000
Subject: [PATCH v7 2/3] Introduce Compressor API in pg_dump
The purpose of this API is to allow for easier addition of new compression
methods. CompressFileHandle is substituting the cfp* family of functions under a
struct of function pointers for opening, writing, etc. The implementor of a new
compression method is now able to "simply" just add those definitions.
---
src/bin/pg_dump/Makefile | 1 +
src/bin/pg_dump/compress_gzip.c | 390 ++++++++++++
src/bin/pg_dump/compress_gzip.h | 9 +
src/bin/pg_dump/compress_io.c | 817 ++++++--------------------
src/bin/pg_dump/compress_io.h | 69 ++-
src/bin/pg_dump/pg_backup_archiver.c | 40 +-
src/bin/pg_dump/pg_backup_custom.c | 23 +-
src/bin/pg_dump/pg_backup_directory.c | 85 +--
8 files changed, 730 insertions(+), 704 deletions(-)
create mode 100644 src/bin/pg_dump/compress_gzip.c
create mode 100644 src/bin/pg_dump/compress_gzip.h
diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile
index 2f524b09bf..2d777ec213 100644
--- a/src/bin/pg_dump/Makefile
+++ b/src/bin/pg_dump/Makefile
@@ -23,6 +23,7 @@ LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport)
OBJS = \
$(WIN32RES) \
+ compress_gzip.o \
compress_io.o \
dumputils.o \
parallel.o \
diff --git a/src/bin/pg_dump/compress_gzip.c b/src/bin/pg_dump/compress_gzip.c
new file mode 100644
index 0000000000..bc6d1abc77
--- /dev/null
+++ b/src/bin/pg_dump/compress_gzip.c
@@ -0,0 +1,390 @@
+#include "postgres_fe.h"
+#include "pg_backup_utils.h"
+
+#include "compress_gzip.h"
+
+#ifdef HAVE_LIBZ
+#include "zlib.h"
+/*----------------------
+ * Compressor API
+ *----------------------
+ */
+typedef struct GzipCompressorState
+{
+ int compressionLevel;
+ z_streamp zp;
+
+ void *outbuf;
+ size_t outsize;
+} GzipCompressorState;
+
+/* Private routines that support gzip compressed data I/O */
+static void
+DeflateCompressorGzip(ArchiveHandle *AH, CompressorState *cs, bool flush)
+{
+ GzipCompressorState *gzipcs = (GzipCompressorState *) cs->private;
+ z_streamp zp = gzipcs->zp;
+ void *out = gzipcs->outbuf;
+ int res = Z_OK;
+
+ while (gzipcs->zp->avail_in != 0 || flush)
+ {
+ res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH);
+ if (res == Z_STREAM_ERROR)
+ pg_fatal("could not compress data: %s", zp->msg);
+ if ((flush && (zp->avail_out < gzipcs->outsize))
+ || (zp->avail_out == 0)
+ || (zp->avail_in != 0)
+ )
+ {
+ /*
+ * Extra paranoia: avoid zero-length chunks, since a zero length
+ * chunk is the EOF marker in the custom format. This should never
+ * happen but...
+ */
+ if (zp->avail_out < gzipcs->outsize)
+ {
+ /*
+ * Any write function should do its own error checking but to
+ * make sure we do a check here as well...
+ */
+ size_t len = gzipcs->outsize - zp->avail_out;
+
+ cs->writeF(AH, (char *) out, len);
+ }
+ zp->next_out = out;
+ zp->avail_out = gzipcs->outsize;
+ }
+
+ if (res == Z_STREAM_END)
+ break;
+ }
+}
+
+static void
+EndCompressorGzip(ArchiveHandle *AH, CompressorState *cs)
+{
+ GzipCompressorState *gzipcs = (GzipCompressorState *) cs->private;
+ z_streamp zp;
+
+ if (gzipcs->zp)
+ {
+ zp = gzipcs->zp;
+ zp->next_in = NULL;
+ zp->avail_in = 0;
+
+ /* Flush any remaining data from zlib buffer */
+ DeflateCompressorGzip(AH, cs, true);
+
+ if (deflateEnd(zp) != Z_OK)
+ pg_fatal("could not close compression stream: %s", zp->msg);
+
+ pg_free(gzipcs->outbuf);
+ pg_free(gzipcs->zp);
+ }
+
+ pg_free(gzipcs);
+ cs->private = NULL;
+}
+
+static void
+WriteDataToArchiveGzip(ArchiveHandle *AH, CompressorState *cs,
+ const void *data, size_t dLen)
+{
+ GzipCompressorState *gzipcs = (GzipCompressorState *) cs->private;
+ z_streamp zp;
+
+ if (!gzipcs->zp)
+ {
+ zp = gzipcs->zp = (z_streamp) pg_malloc(sizeof(z_stream));
+ zp->zalloc = Z_NULL;
+ zp->zfree = Z_NULL;
+ zp->opaque = Z_NULL;
+
+ /*
+ * outsize is the buffer size we tell zlib it can output to. We
+ * actually allocate one extra byte because some routines want to
+ * append a trailing zero byte to the zlib output.
+ */
+ gzipcs->outbuf = pg_malloc(ZLIB_OUT_SIZE + 1);
+ gzipcs->outsize = ZLIB_OUT_SIZE;
+
+ if (deflateInit(zp, gzipcs->compressionLevel) != Z_OK)
+ pg_fatal("could not initialize compression library: %s", zp->msg);
+
+ /* Just be paranoid - maybe End is called after Start, with no Write */
+ zp->next_out = gzipcs->outbuf;
+ zp->avail_out = gzipcs->outsize;
+ }
+
+ gzipcs->zp->next_in = (void *) unconstify(void *, data);
+ gzipcs->zp->avail_in = dLen;
+ DeflateCompressorGzip(AH, cs, false);
+}
+
+static void
+ReadDataFromArchiveGzip(ArchiveHandle *AH, CompressorState *cs)
+{
+ z_streamp zp;
+ char *out;
+ int res = Z_OK;
+ size_t cnt;
+ char *buf;
+ size_t buflen;
+
+ zp = (z_streamp) pg_malloc(sizeof(z_stream));
+ zp->zalloc = Z_NULL;
+ zp->zfree = Z_NULL;
+ zp->opaque = Z_NULL;
+
+ buf = pg_malloc(ZLIB_IN_SIZE);
+ buflen = ZLIB_IN_SIZE;
+
+ out = pg_malloc(ZLIB_OUT_SIZE + 1);
+
+ if (inflateInit(zp) != Z_OK)
+ pg_fatal("could not initialize compression library: %s",
+ zp->msg);
+
+ /* no minimal chunk size for zlib */
+ while ((cnt = cs->readF(AH, &buf, &buflen)))
+ {
+ zp->next_in = (void *) buf;
+ zp->avail_in = cnt;
+
+ while (zp->avail_in > 0)
+ {
+ zp->next_out = (void *) out;
+ zp->avail_out = ZLIB_OUT_SIZE;
+
+ res = inflate(zp, 0);
+ if (res != Z_OK && res != Z_STREAM_END)
+ pg_fatal("could not uncompress data: %s", zp->msg);
+
+ out[ZLIB_OUT_SIZE - zp->avail_out] = '\0';
+ ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH);
+ }
+ }
+
+ zp->next_in = NULL;
+ zp->avail_in = 0;
+ while (res != Z_STREAM_END)
+ {
+ zp->next_out = (void *) out;
+ zp->avail_out = ZLIB_OUT_SIZE;
+ res = inflate(zp, 0);
+ if (res != Z_OK && res != Z_STREAM_END)
+ pg_fatal("could not uncompress data: %s", zp->msg);
+
+ out[ZLIB_OUT_SIZE - zp->avail_out] = '\0';
+ ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH);
+ }
+
+ if (inflateEnd(zp) != Z_OK)
+ pg_fatal("could not close compression library: %s", zp->msg);
+
+ free(buf);
+ free(out);
+ free(zp);
+}
+
+/* Public routines that support gzip compressed data I/O */
+void
+InitCompressorGzip(CompressorState *cs, int compressionLevel)
+{
+ GzipCompressorState *gzipcs;
+
+ cs->readData = ReadDataFromArchiveGzip;
+ cs->writeData = WriteDataToArchiveGzip;
+ cs->end = EndCompressorGzip;
+
+ gzipcs = (GzipCompressorState *) pg_malloc0(sizeof(GzipCompressorState));
+ gzipcs->compressionLevel = compressionLevel;
+
+ cs->private = gzipcs;
+}
+
+
+/*----------------------
+ * Compress File API
+ *----------------------
+ */
+
+typedef struct GzipData
+{
+ gzFile fp;
+ int compressionLevel;
+} GzipData;
+
+static size_t
+Gzip_read(void *ptr, size_t size, CompressFileHandle * CFH)
+{
+ GzipData *gd = (GzipData *) CFH->private;
+ size_t ret;
+
+ ret = gzread(gd->fp, ptr, size);
+ if (ret != size && !gzeof(gd->fp))
+ {
+ int errnum;
+ const char *errmsg = gzerror(gd->fp, &errnum);
+
+ pg_fatal("could not read from input file: %s",
+ errnum == Z_ERRNO ? strerror(errno) : errmsg);
+ }
+
+ return ret;
+}
+
+static size_t
+Gzip_write(const void *ptr, size_t size, CompressFileHandle * CFH)
+{
+ GzipData *gd = (GzipData *) CFH->private;
+
+ return gzwrite(gd->fp, ptr, size);
+}
+
+static int
+Gzip_getc(CompressFileHandle * CFH)
+{
+ GzipData *gd = (GzipData *) CFH->private;
+ int ret;
+
+ errno = 0;
+ ret = gzgetc(gd->fp);
+ if (ret == EOF)
+ {
+ if (!gzeof(gd->fp))
+ pg_fatal("could not read from input file: %s", strerror(errno));
+ else
+ pg_fatal("could not read from input file: end of file");
+ }
+
+ return ret;
+}
+
+static char *
+Gzip_gets(char *ptr, int size, CompressFileHandle * CFH)
+{
+ GzipData *gd = (GzipData *) CFH->private;
+
+ return gzgets(gd->fp, ptr, size);
+}
+
+static int
+Gzip_close(CompressFileHandle * CFH)
+{
+ GzipData *gd = (GzipData *) CFH->private;
+ int save_errno;
+ int ret;
+
+ CFH->private = NULL;
+
+ ret = gzclose(gd->fp);
+
+ save_errno = errno;
+ free(gd);
+ errno = save_errno;
+
+ return ret;
+}
+
+static int
+Gzip_eof(CompressFileHandle * CFH)
+{
+ GzipData *gd = (GzipData *) CFH->private;
+
+ return gzeof(gd->fp);
+}
+
+static const char *
+Gzip_get_error(CompressFileHandle * CFH)
+{
+ GzipData *gd = (GzipData *) CFH->private;
+ const char *errmsg;
+ int errnum;
+
+ errmsg = gzerror(gd->fp, &errnum);
+ if (errnum == Z_ERRNO)
+ errmsg = strerror(errno);
+
+ return errmsg;
+}
+
+static int
+Gzip_open(const char *path, int fd, const char *mode, CompressFileHandle * CFH)
+{
+ GzipData *gd = (GzipData *) CFH->private;
+ char mode_compression[32];
+
+ if (gd->compressionLevel != Z_DEFAULT_COMPRESSION)
+ {
+ /*
+ * user has specified a compression level, so tell zlib to use it
+ */
+ snprintf(mode_compression, sizeof(mode_compression), "%s%d",
+ mode, gd->compressionLevel);
+ }
+ else
+ strcpy(mode_compression, mode);
+
+ if (fd >= 0)
+ gd->fp = gzdopen(dup(fd), mode_compression);
+ else
+ gd->fp = gzopen(path, mode_compression);
+
+ if (gd->fp == NULL)
+ return 1;
+
+ return 0;
+}
+
+static int
+Gzip_open_write(const char *path, const char *mode, CompressFileHandle * CFH)
+{
+ char *fname;
+ int ret;
+ int save_errno;
+
+ fname = psprintf("%s.gz", path);
+ ret = CFH->open(fname, -1, mode, CFH);
+
+ save_errno = errno;
+ pg_free(fname);
+ errno = save_errno;
+
+ return ret;
+}
+
+void
+InitCompressGzip(CompressFileHandle * CFH, int compressionLevel)
+{
+ GzipData *gd;
+
+ CFH->open = Gzip_open;
+ CFH->open_write = Gzip_open_write;
+ CFH->read = Gzip_read;
+ CFH->write = Gzip_write;
+ CFH->gets = Gzip_gets;
+ CFH->getc = Gzip_getc;
+ CFH->close = Gzip_close;
+ CFH->eof = Gzip_eof;
+ CFH->get_error = Gzip_get_error;
+
+ gd = pg_malloc0(sizeof(GzipData));
+ gd->compressionLevel = compressionLevel;
+
+ CFH->private = gd;
+}
+#else /* HAVE_LIBZ */
+void
+InitCompressorGzip(CompressorState *cs, int compressionLevel)
+{
+ pg_fatal("not built with zlib support");
+}
+
+void
+InitCompressGzip(CompressFileHandle * CFH, int compressionLevel)
+{
+ pg_fatal("not built with zlib support");
+}
+#endif /* HAVE_LIBZ */
diff --git a/src/bin/pg_dump/compress_gzip.h b/src/bin/pg_dump/compress_gzip.h
new file mode 100644
index 0000000000..ab0362c1f3
--- /dev/null
+++ b/src/bin/pg_dump/compress_gzip.h
@@ -0,0 +1,9 @@
+#ifndef _COMPRESS_GZIP_H_
+#define _COMPRESS_GZIP_H_
+
+#include "compress_io.h"
+
+extern void InitCompressorGzip(CompressorState *cs, int compressionLevel);
+extern void InitCompressGzip(CompressFileHandle * CFH, int compressionLevel);
+
+#endif /* _COMPRESS_GZIP_H_ */
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 146396172b..1948ee3d57 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -51,9 +51,12 @@
*
*-------------------------------------------------------------------------
*/
+#include <sys/stat.h>
+#include <unistd.h>
#include "postgres_fe.h"
#include "compress_io.h"
+#include "compress_gzip.h"
#include "pg_backup_utils.h"
/*----------------------
@@ -61,113 +64,73 @@
*----------------------
*/
-/* typedef appears in compress_io.h */
-struct CompressorState
+/* Private routines that support uncompressed data I/O */
+static void
+ReadDataFromArchiveNone(ArchiveHandle *AH, CompressorState *cs)
{
- pg_compress_algorithm compress_algorithm;
- WriteFunc writeF;
+ size_t cnt;
+ char *buf;
+ size_t buflen;
-#ifdef HAVE_LIBZ
- z_streamp zp;
- char *zlibOut;
- size_t zlibOutSize;
-#endif
-};
+ buf = pg_malloc(ZLIB_OUT_SIZE);
+ buflen = ZLIB_OUT_SIZE;
-/* Routines that support zlib compressed data I/O */
-#ifdef HAVE_LIBZ
-static void InitCompressorZlib(CompressorState *cs, int level);
-static void DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs,
- bool flush);
-static void ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF);
-static void WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
- const char *data, size_t dLen);
-static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs);
-#endif
+ while ((cnt = cs->readF(AH, &buf, &buflen)))
+ {
+ ahwrite(buf, 1, cnt, AH);
+ }
-/* Routines that support uncompressed data I/O */
-static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF);
-static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
- const char *data, size_t dLen);
+ free(buf);
+}
+
+static void
+WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
+ const void *data, size_t dLen)
+{
+ cs->writeF(AH, data, dLen);
+}
+
+static void
+EndCompressorNone(ArchiveHandle *AH, CompressorState *cs)
+{
+ /* no op */
+}
+
+static void
+InitCompressorNone(CompressorState *cs)
+{
+ cs->readData = ReadDataFromArchiveNone;
+ cs->writeData = WriteDataToArchiveNone;
+ cs->end = EndCompressorNone;
+}
/* Public interface routines */
/* Allocate a new compressor */
CompressorState *
AllocateCompressor(const pg_compress_specification compress_spec,
- WriteFunc writeF)
+ ReadFunc readF, WriteFunc writeF)
{
CompressorState *cs;
-#ifndef HAVE_LIBZ
- if (compress_spec.algorithm == PG_COMPRESSION_GZIP)
- pg_fatal("not built with zlib support");
-#endif
-
cs = (CompressorState *) pg_malloc0(sizeof(CompressorState));
+ cs->readF = readF;
cs->writeF = writeF;
- cs->compress_algorithm = compress_spec.algorithm;
-
- /*
- * Perform compression algorithm specific initialization.
- */
-#ifdef HAVE_LIBZ
- if (cs->compress_algorithm == PG_COMPRESSION_GZIP)
- InitCompressorZlib(cs, compress_spec.level);
-#endif
- return cs;
-}
-
-/*
- * Read all compressed data from the input stream (via readF) and print it
- * out with ahwrite().
- */
-void
-ReadDataFromArchive(ArchiveHandle *AH, pg_compress_specification compress_spec,
- ReadFunc readF)
-{
switch (compress_spec.algorithm)
{
case PG_COMPRESSION_NONE:
- ReadDataFromArchiveNone(AH, readF);
+ InitCompressorNone(cs);
break;
case PG_COMPRESSION_GZIP:
-#ifdef HAVE_LIBZ
- ReadDataFromArchiveZlib(AH, readF);
-#else
- pg_fatal("not built with zlib support");
-#endif
+ InitCompressorGzip(cs, compress_spec.level);
break;
default:
pg_fatal("invalid compression method");
break;
}
-}
-/*
- * Compress and write data to the output stream (via writeF).
- */
-void
-WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
- const void *data, size_t dLen)
-{
- switch (cs->compress_algorithm)
- {
- case PG_COMPRESSION_GZIP:
-#ifdef HAVE_LIBZ
- WriteDataToArchiveZlib(AH, cs, data, dLen);
-#else
- pg_fatal("not built with zlib support");
-#endif
- break;
- case PG_COMPRESSION_NONE:
- WriteDataToArchiveNone(AH, cs, data, dLen);
- break;
- default:
- pg_fatal("invalid compression method");
- break;
- }
+ return cs;
}
/*
@@ -176,243 +139,28 @@ WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
void
EndCompressor(ArchiveHandle *AH, CompressorState *cs)
{
- switch (cs->compress_algorithm)
- {
- case PG_COMPRESSION_GZIP:
-#ifdef HAVE_LIBZ
- EndCompressorZlib(AH, cs);
-#else
- pg_fatal("not built with zlib support");
-#endif
- break;
- case PG_COMPRESSION_NONE:
- free(cs);
- break;
-
- default:
- pg_fatal("invalid compression method");
- break;
- }
-}
-
-/* Private routines, specific to each compression method. */
-
-#ifdef HAVE_LIBZ
-/*
- * Functions for zlib compressed output.
- */
-
-static void
-InitCompressorZlib(CompressorState *cs, int level)
-{
- z_streamp zp;
-
- zp = cs->zp = (z_streamp) pg_malloc(sizeof(z_stream));
- zp->zalloc = Z_NULL;
- zp->zfree = Z_NULL;
- zp->opaque = Z_NULL;
-
- /*
- * zlibOutSize is the buffer size we tell zlib it can output to. We
- * actually allocate one extra byte because some routines want to append a
- * trailing zero byte to the zlib output.
- */
- cs->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
- cs->zlibOutSize = ZLIB_OUT_SIZE;
-
- if (deflateInit(zp, level) != Z_OK)
- pg_fatal("could not initialize compression library: %s",
- zp->msg);
-
- /* Just be paranoid - maybe End is called after Start, with no Write */
- zp->next_out = (void *) cs->zlibOut;
- zp->avail_out = cs->zlibOutSize;
-}
-
-static void
-EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs)
-{
- z_streamp zp = cs->zp;
-
- zp->next_in = NULL;
- zp->avail_in = 0;
-
- /* Flush any remaining data from zlib buffer */
- DeflateCompressorZlib(AH, cs, true);
-
- if (deflateEnd(zp) != Z_OK)
- pg_fatal("could not close compression stream: %s", zp->msg);
-
- free(cs->zlibOut);
- free(cs->zp);
+ cs->end(AH, cs);
+ pg_free(cs);
}
-static void
-DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush)
-{
- z_streamp zp = cs->zp;
- char *out = cs->zlibOut;
- int res = Z_OK;
-
- while (cs->zp->avail_in != 0 || flush)
- {
- res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH);
- if (res == Z_STREAM_ERROR)
- pg_fatal("could not compress data: %s", zp->msg);
- if ((flush && (zp->avail_out < cs->zlibOutSize))
- || (zp->avail_out == 0)
- || (zp->avail_in != 0)
- )
- {
- /*
- * Extra paranoia: avoid zero-length chunks, since a zero length
- * chunk is the EOF marker in the custom format. This should never
- * happen but...
- */
- if (zp->avail_out < cs->zlibOutSize)
- {
- /*
- * Any write function should do its own error checking but to
- * make sure we do a check here as well...
- */
- size_t len = cs->zlibOutSize - zp->avail_out;
-
- cs->writeF(AH, out, len);
- }
- zp->next_out = (void *) out;
- zp->avail_out = cs->zlibOutSize;
- }
-
- if (res == Z_STREAM_END)
- break;
- }
-}
-
-static void
-WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
- const char *data, size_t dLen)
-{
- cs->zp->next_in = (void *) unconstify(char *, data);
- cs->zp->avail_in = dLen;
- DeflateCompressorZlib(AH, cs, false);
-}
-
-static void
-ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF)
-{
- z_streamp zp;
- char *out;
- int res = Z_OK;
- size_t cnt;
- char *buf;
- size_t buflen;
-
- zp = (z_streamp) pg_malloc(sizeof(z_stream));
- zp->zalloc = Z_NULL;
- zp->zfree = Z_NULL;
- zp->opaque = Z_NULL;
-
- buf = pg_malloc(ZLIB_IN_SIZE);
- buflen = ZLIB_IN_SIZE;
-
- out = pg_malloc(ZLIB_OUT_SIZE + 1);
-
- if (inflateInit(zp) != Z_OK)
- pg_fatal("could not initialize compression library: %s",
- zp->msg);
-
- /* no minimal chunk size for zlib */
- while ((cnt = readF(AH, &buf, &buflen)))
- {
- zp->next_in = (void *) buf;
- zp->avail_in = cnt;
-
- while (zp->avail_in > 0)
- {
- zp->next_out = (void *) out;
- zp->avail_out = ZLIB_OUT_SIZE;
-
- res = inflate(zp, 0);
- if (res != Z_OK && res != Z_STREAM_END)
- pg_fatal("could not uncompress data: %s", zp->msg);
-
- out[ZLIB_OUT_SIZE - zp->avail_out] = '\0';
- ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH);
- }
- }
-
- zp->next_in = NULL;
- zp->avail_in = 0;
- while (res != Z_STREAM_END)
- {
- zp->next_out = (void *) out;
- zp->avail_out = ZLIB_OUT_SIZE;
- res = inflate(zp, 0);
- if (res != Z_OK && res != Z_STREAM_END)
- pg_fatal("could not uncompress data: %s", zp->msg);
-
- out[ZLIB_OUT_SIZE - zp->avail_out] = '\0';
- ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH);
- }
-
- if (inflateEnd(zp) != Z_OK)
- pg_fatal("could not close compression library: %s", zp->msg);
-
- free(buf);
- free(out);
- free(zp);
-}
-#endif /* HAVE_LIBZ */
-
-
-/*
- * Functions for uncompressed output.
- */
-
-static void
-ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF)
-{
- size_t cnt;
- char *buf;
- size_t buflen;
-
- buf = pg_malloc(ZLIB_OUT_SIZE);
- buflen = ZLIB_OUT_SIZE;
-
- while ((cnt = readF(AH, &buf, &buflen)))
- {
- ahwrite(buf, 1, cnt, AH);
- }
-
- free(buf);
-}
-
-static void
-WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
- const char *data, size_t dLen)
-{
- cs->writeF(AH, data, dLen);
-}
-
-
/*----------------------
* Compressed stream API
*----------------------
*/
-/*
- * cfp represents an open stream, wrapping the underlying FILE or gzFile
- * pointer. This is opaque to the callers.
- */
-struct cfp
+static int
+hasSuffix(const char *filename, const char *suffix)
{
- pg_compress_algorithm compress_algorithm;
- void *fp;
-};
+ int filenamelen = strlen(filename);
+ int suffixlen = strlen(suffix);
-#ifdef HAVE_LIBZ
-static int hasSuffix(const char *filename, const char *suffix);
-#endif
+ if (filenamelen < suffixlen)
+ return 0;
+
+ return memcmp(&filename[filenamelen - suffixlen],
+ suffix,
+ suffixlen) == 0;
+}
/* free() without changing errno; useful in several places below */
static void
@@ -425,392 +173,219 @@ free_keep_errno(void *p)
}
/*
- * Open a file for reading. 'path' is the file to open, and 'mode' should
- * be either "r" or "rb".
- *
- * If the file at 'path' does not exist, we append the ".gz" suffix (if 'path'
- * doesn't already have it) and try again. So if you pass "foo" as 'path',
- * this will open either "foo" or "foo.gz".
- *
- * On failure, return NULL with an error code in errno.
+ * Compression None implementation
*/
-cfp *
-cfopen_read(const char *path, const char *mode)
+static size_t
+_read(void *ptr, size_t size, CompressFileHandle * CFH)
{
- cfp *fp;
- pg_compress_specification compress_spec = {0};
+ FILE *fp = (FILE *) CFH->private;
+ size_t ret;
- compress_spec.algorithm = PG_COMPRESSION_GZIP;
-#ifdef HAVE_LIBZ
- if (hasSuffix(path, ".gz"))
- fp = cfopen(path, mode, compress_spec);
- else
-#endif
- {
- compress_spec.algorithm = PG_COMPRESSION_NONE;
- fp = cfopen(path, mode, compress_spec);
-#ifdef HAVE_LIBZ
- if (fp == NULL)
- {
- char *fname;
-
- compress_spec.algorithm = PG_COMPRESSION_GZIP;
- fname = psprintf("%s.gz", path);
- fp = cfopen(fname, mode, compress_spec);
- free_keep_errno(fname);
- }
-#endif
- }
- return fp;
-}
+ if (size == 0)
+ return 0;
-/*
- * Open a file for writing. 'path' indicates the path name, and 'mode' must
- * be a filemode as accepted by fopen() and gzopen() that indicates writing
- * ("w", "wb", "a", or "ab").
- *
- * If 'compress_spec.algorithm' is GZIP, a gzip compressed stream is opened,
- * and 'compress_spec.level' used. The ".gz" suffix is automatically added to
- * 'path' in that case.
- *
- * On failure, return NULL with an error code in errno.
- */
-cfp *
-cfopen_write(const char *path, const char *mode,
- const pg_compress_specification compress_spec)
-{
- cfp *fp;
+ ret = fread(ptr, 1, size, fp);
+ if (ret != size && !feof(fp))
+ pg_fatal("could not read from input file: %s",
+ strerror(errno));
- if (compress_spec.algorithm == PG_COMPRESSION_NONE)
- fp = cfopen(path, mode, compress_spec);
- else
- {
-#ifdef HAVE_LIBZ
- char *fname;
-
- fname = psprintf("%s.gz", path);
- fp = cfopen(fname, mode, compress_spec);
- free_keep_errno(fname);
-#else
- pg_fatal("not built with zlib support");
- fp = NULL; /* keep compiler quiet */
-#endif
- }
- return fp;
+ return ret;
}
-/*
- * This is the workhorse for cfopen() or cfdopen(). It opens file 'path' or
- * associates a stream 'fd', if 'fd' is a valid descriptor, in 'mode'. The
- * descriptor is not dup'ed and it is the caller's responsibility to do so.
- * The caller must verify that the 'compress_algorithm' is supported by the
- * current build.
- *
- * On failure, return NULL with an error code in errno.
- */
-static cfp *
-cfopen_internal(const char *path, int fd, const char *mode,
- pg_compress_algorithm compress_algorithm, int compressionLevel)
+static size_t
+_write(const void *ptr, size_t size, CompressFileHandle * CFH)
{
- cfp *fp = pg_malloc(sizeof(cfp));
-
- fp->compress_algorithm = compress_algorithm;
-
- switch (compress_algorithm)
- {
- case PG_COMPRESSION_NONE:
- if (fd >= 0)
- fp->fp = fdopen(fd, mode);
- else
- fp->fp = fopen(path, mode);
- if (fp->fp == NULL)
- {
- free_keep_errno(fp);
- fp = NULL;
- }
-
- break;
- case PG_COMPRESSION_GZIP:
-#ifdef HAVE_LIBZ
- if (compressionLevel != Z_DEFAULT_COMPRESSION)
- {
- /*
- * user has specified a compression level, so tell zlib to use
- * it
- */
- char mode_compression[32];
-
- snprintf(mode_compression, sizeof(mode_compression), "%s%d",
- mode, compressionLevel);
- if (fd >= 0)
- fp->fp = gzdopen(fd, mode_compression);
- else
- fp->fp = gzopen(path, mode_compression);
- }
- else
- {
- /* don't specify a level, just use the zlib default */
- if (fd >= 0)
- fp->fp = gzdopen(fd, mode);
- else
- fp->fp = gzopen(path, mode);
- }
-
- if (fp->fp == NULL)
- {
- free_keep_errno(fp);
- fp = NULL;
- }
-#else
- pg_fatal("not built with zlib support");
-#endif
- break;
- default:
- pg_fatal("invalid compression method");
- break;
- }
-
- return fp;
+ return fwrite(ptr, 1, size, (FILE *) CFH->private);
}
-cfp *
-cfopen(const char *path, const char *mode,
- const pg_compress_specification compress_spec)
+static const char *
+_get_error(CompressFileHandle * CFH)
{
- return cfopen_internal(path, -1, mode,
- compress_spec.algorithm,
- compress_spec.level);
+ return strerror(errno);
}
-cfp *
-cfdopen(int fd, const char *mode,
- const pg_compress_specification compress_spec)
+static char *
+_gets(char *ptr, int size, CompressFileHandle * CFH)
{
- return cfopen_internal(NULL, fd, mode,
- compress_spec.algorithm,
- compress_spec.level);
+ return fgets(ptr, size, (FILE *) CFH->private);
}
-int
-cfread(void *ptr, int size, cfp *fp)
+static int
+_getc(CompressFileHandle * CFH)
{
+ FILE *fp = (FILE *) CFH->private;
int ret;
- if (size == 0)
- return 0;
-
- switch (fp->compress_algorithm)
+ ret = fgetc(fp);
+ if (ret == EOF)
{
- case PG_COMPRESSION_NONE:
- ret = fread(ptr, 1, size, fp->fp);
- if (ret != size && !feof(fp->fp))
- READ_ERROR_EXIT(fp->fp);
-
- break;
- case PG_COMPRESSION_GZIP:
-#ifdef HAVE_LIBZ
- ret = gzread(fp->fp, ptr, size);
- if (ret != size && !gzeof(fp->fp))
- {
- int errnum;
- const char *errmsg = gzerror(fp->fp, &errnum);
-
- pg_fatal("could not read from input file: %s",
- errnum == Z_ERRNO ? strerror(errno) : errmsg);
- }
-#else
- pg_fatal("not built with zlib support");
-#endif
- break;
-
- default:
- pg_fatal("invalid compression method");
- break;
+ if (!feof(fp))
+ pg_fatal("could not read from input file: %s", strerror(errno));
+ else
+ pg_fatal("could not read from input file: end of file");
}
return ret;
}
-int
-cfwrite(const void *ptr, int size, cfp *fp)
+static int
+_close(CompressFileHandle * CFH)
{
+ FILE *fp = (FILE *) CFH->private;
int ret = 0;
- switch (fp->compress_algorithm)
- {
- case PG_COMPRESSION_NONE:
- ret = fwrite(ptr, 1, size, fp->fp);
- break;
- case PG_COMPRESSION_GZIP:
-#ifdef HAVE_LIBZ
- ret = gzwrite(fp->fp, ptr, size);
-#else
- pg_fatal("not built with zlib support");
-#endif
- break;
- default:
- pg_fatal("invalid compression method");
- break;
- }
+ CFH->private = NULL;
+
+ if (fp)
+ ret = fclose(fp);
return ret;
}
-int
-cfgetc(cfp *fp)
+static int
+_eof(CompressFileHandle * CFH)
{
- int ret;
+ return feof((FILE *) CFH->private);
+}
- switch (fp->compress_algorithm)
- {
- case PG_COMPRESSION_NONE:
- ret = fgetc(fp->fp);
- if (ret == EOF)
- READ_ERROR_EXIT(fp->fp);
+static int
+_open(const char *path, int fd, const char *mode, CompressFileHandle * CFH)
+{
+ Assert(CFH->private == NULL);
- break;
- case PG_COMPRESSION_GZIP:
-#ifdef HAVE_LIBZ
- ret = gzgetc((gzFile) fp->fp);
- if (ret == EOF)
- {
- if (!gzeof(fp->fp))
- pg_fatal("could not read from input file: %s", strerror(errno));
- else
- pg_fatal("could not read from input file: end of file");
- }
-#else
- pg_fatal("not built with zlib support");
-#endif
- break;
- default:
- pg_fatal("invalid compression method");
- break;
- }
+ if (fd >= 0)
+ CFH->private = fdopen(dup(fd), mode);
+ else
+ CFH->private = fopen(path, mode);
- return ret;
+ if (CFH->private == NULL)
+ return 1;
+
+ return 0;
}
-char *
-cfgets(cfp *fp, char *buf, int len)
+static int
+_open_write(const char *path, const char *mode, CompressFileHandle * CFH)
{
- char *ret;
+ Assert(CFH->private == NULL);
- switch (fp->compress_algorithm)
- {
- case PG_COMPRESSION_NONE:
- ret = fgets(buf, len, fp->fp);
+ CFH->private = fopen(path, mode);
+ if (CFH->private == NULL)
+ return 1;
- break;
- case PG_COMPRESSION_GZIP:
-#ifdef HAVE_LIBZ
- ret = gzgets(fp->fp, buf, len);
-#else
- pg_fatal("not built with zlib support");
-#endif
- break;
- default:
- pg_fatal("invalid compression method");
- break;
- }
+ return 0;
+}
- return ret;
+static void
+InitCompressNone(CompressFileHandle * CFH)
+{
+ CFH->open = _open;
+ CFH->open_write = _open_write;
+ CFH->read = _read;
+ CFH->write = _write;
+ CFH->gets = _gets;
+ CFH->getc = _getc;
+ CFH->close = _close;
+ CFH->eof = _eof;
+ CFH->get_error = _get_error;
+
+ CFH->private = NULL;
}
-int
-cfclose(cfp *fp)
+/*
+ * Public interface
+ */
+CompressFileHandle *
+InitCompressFileHandle(const pg_compress_specification compress_spec)
{
- int ret;
+ CompressFileHandle *CFH;
- if (fp == NULL)
- {
- errno = EBADF;
- return EOF;
- }
+ CFH = pg_malloc0(sizeof(CompressFileHandle));
- switch (fp->compress_algorithm)
+ switch (compress_spec.algorithm)
{
case PG_COMPRESSION_NONE:
- ret = fclose(fp->fp);
- fp->fp = NULL;
-
+ InitCompressNone(CFH);
break;
case PG_COMPRESSION_GZIP:
-#ifdef HAVE_LIBZ
- ret = gzclose(fp->fp);
- fp->fp = NULL;
-#else
- pg_fatal("not built with zlib support");
-#endif
+ InitCompressGzip(CFH, compress_spec.level);
break;
default:
pg_fatal("invalid compression method");
break;
}
- free_keep_errno(fp);
-
- return ret;
+ return CFH;
}
-int
-cfeof(cfp *fp)
+/*
+ * Open a file for reading. 'path' is the file to open, and 'mode' should
+ * be either "r" or "rb".
+ *
+ * If the file at 'path' does not exist, we append the ".gz" suffix (if
+ * 'path' doesn't already have it) and try again. So if you pass "foo" as
+ * 'path', this will open either "foo" or "foo.gz", trying in that order.
+ *
+ * On failure, return NULL with an error code in errno.
+ *
+ */
+CompressFileHandle *
+InitDiscoverCompressFileHandle(const char *path, const char *mode)
{
- int ret;
+ CompressFileHandle *CFH = NULL;
+ struct stat st;
+ char *fname;
+ pg_compress_specification compress_spec = {0};
- switch (fp->compress_algorithm)
- {
- case PG_COMPRESSION_NONE:
- ret = feof(fp->fp);
+ compress_spec.algorithm = PG_COMPRESSION_NONE;
- break;
- case PG_COMPRESSION_GZIP:
-#ifdef HAVE_LIBZ
- ret = gzeof(fp->fp);
-#else
- pg_fatal("not built with zlib support");
-#endif
- break;
- default:
- pg_fatal("invalid compression method");
- break;
- }
+ Assert(strcmp(mode, "r") == 0 || strcmp(mode, "rb") == 0);
- return ret;
-}
+ fname = strdup(path);
-const char *
-get_cfp_error(cfp *fp)
-{
- if (fp->compress_algorithm == PG_COMPRESSION_GZIP)
+ if (hasSuffix(fname, ".gz"))
+ compress_spec.algorithm = PG_COMPRESSION_GZIP;
+ else
{
+ bool exists;
+
+ exists = (stat(path, &st) == 0);
+ /* avoid unused warning if it is not build with compression */
+ if (exists)
+ compress_spec.algorithm = PG_COMPRESSION_NONE;
#ifdef HAVE_LIBZ
- int errnum;
- const char *errmsg = gzerror(fp->fp, &errnum);
+ if (!exists)
+ {
+ free_keep_errno(fname);
+ fname = psprintf("%s.gz", path);
+ exists = (stat(fname, &st) == 0);
- if (errnum != Z_ERRNO)
- return errmsg;
-#else
- pg_fatal("not built with zlib support");
+ if (exists)
+ compress_spec.algorithm = PG_COMPRESSION_GZIP;
+ }
#endif
}
- return strerror(errno);
+ CFH = InitCompressFileHandle(compress_spec);
+ if (CFH->open(fname, -1, mode, CFH))
+ {
+ free_keep_errno(CFH);
+ CFH = NULL;
+ }
+ free_keep_errno(fname);
+
+ return CFH;
}
-#ifdef HAVE_LIBZ
-static int
-hasSuffix(const char *filename, const char *suffix)
+int
+DestroyCompressFileHandle(CompressFileHandle * CFH)
{
- int filenamelen = strlen(filename);
- int suffixlen = strlen(suffix);
+ int ret = 0;
- if (filenamelen < suffixlen)
- return 0;
+ if (CFH->private)
+ ret = CFH->close(CFH);
- return memcmp(&filename[filenamelen - suffixlen],
- suffix,
- suffixlen) == 0;
-}
+ free_keep_errno(CFH);
-#endif
+ return ret;
+}
diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index 2b42f030a8..ec13374a52 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -52,34 +52,61 @@ typedef void (*WriteFunc) (ArchiveHandle *AH, const char *buf, size_t len);
*/
typedef size_t (*ReadFunc) (ArchiveHandle *AH, char **buf, size_t *buflen);
-/* struct definition appears in compress_io.c */
typedef struct CompressorState CompressorState;
+struct CompressorState
+{
+ /*
+ * Read all compressed data from the input stream (via readF) and print it
+ * out with ahwrite().
+ */
+ void (*readData) (ArchiveHandle *AH, CompressorState *cs);
+
+ /*
+ * Compress and write data to the output stream (via writeF).
+ */
+ void (*writeData) (ArchiveHandle *AH, CompressorState *cs,
+ const void *data, size_t dLen);
+ void (*end) (ArchiveHandle *AH, CompressorState *cs);
+
+ ReadFunc readF;
+ WriteFunc writeF;
+
+ void *private;
+};
extern CompressorState *AllocateCompressor(const pg_compress_specification compress_spec,
+ ReadFunc readF,
WriteFunc writeF);
-extern void ReadDataFromArchive(ArchiveHandle *AH,
- const pg_compress_specification compress_spec,
- ReadFunc readF);
-extern void WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
- const void *data, size_t dLen);
extern void EndCompressor(ArchiveHandle *AH, CompressorState *cs);
+/*
+ * Compress File Handle
+ */
+typedef struct CompressFileHandle CompressFileHandle;
+
+struct CompressFileHandle
+{
+ int (*open) (const char *path, int fd, const char *mode,
+ CompressFileHandle * CFH);
+ int (*open_write) (const char *path, const char *mode,
+ CompressFileHandle * cxt);
+ size_t (*read) (void *ptr, size_t size, CompressFileHandle * CFH);
+ size_t (*write) (const void *ptr, size_t size,
+ struct CompressFileHandle *CFH);
+ char *(*gets) (char *s, int size, CompressFileHandle * CFH);
+ int (*getc) (CompressFileHandle * CFH);
+ int (*eof) (CompressFileHandle * CFH);
+ int (*close) (CompressFileHandle * CFH);
+ const char *(*get_error) (CompressFileHandle * CFH);
+
+ void *private;
+};
+
-typedef struct cfp cfp;
+extern CompressFileHandle * InitCompressFileHandle(const pg_compress_specification compress_spec);
-extern cfp *cfopen(const char *path, const char *mode,
- const pg_compress_specification compress_spec);
-extern cfp *cfdopen(int fd, const char *mode,
- pg_compress_specification compress_spec);
-extern cfp *cfopen_read(const char *path, const char *mode);
-extern cfp *cfopen_write(const char *path, const char *mode,
- const pg_compress_specification compress_spec);
-extern int cfread(void *ptr, int size, cfp *fp);
-extern int cfwrite(const void *ptr, int size, cfp *fp);
-extern int cfgetc(cfp *fp);
-extern char *cfgets(cfp *fp, char *buf, int len);
-extern int cfclose(cfp *fp);
-extern int cfeof(cfp *fp);
-extern const char *get_cfp_error(cfp *fp);
+extern CompressFileHandle * InitDiscoverCompressFileHandle(const char *path,
+ const char *mode);
+extern int DestroyCompressFileHandle(CompressFileHandle * CFH);
#endif
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index fa84c31ecb..186510c235 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -95,8 +95,8 @@ static void dump_lo_buf(ArchiveHandle *AH);
static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
static void SetOutput(ArchiveHandle *AH, const char *filename,
const pg_compress_specification compress_spec);
-static cfp *SaveOutput(ArchiveHandle *AH);
-static void RestoreOutput(ArchiveHandle *AH, cfp *savedOutput);
+static CompressFileHandle * SaveOutput(ArchiveHandle *AH);
+static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle * savedOutput);
static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
static void restore_toc_entries_prefork(ArchiveHandle *AH,
@@ -272,7 +272,7 @@ CloseArchive(Archive *AHX)
/* Close the output */
errno = 0;
- res = cfclose(AH->OF);
+ res = DestroyCompressFileHandle(AH->OF);
if (res != 0)
pg_fatal("could not close output file: %m");
@@ -354,7 +354,7 @@ RestoreArchive(Archive *AHX)
RestoreOptions *ropt = AH->public.ropt;
bool parallel_mode;
TocEntry *te;
- cfp *sav;
+ CompressFileHandle *sav;
AH->stage = STAGE_INITIALIZING;
@@ -1119,7 +1119,7 @@ PrintTOCSummary(Archive *AHX)
TocEntry *te;
pg_compress_specification out_compress_spec;
teSection curSection;
- cfp *sav;
+ CompressFileHandle *sav;
const char *fmtName;
char stamp_str[64];
@@ -1495,6 +1495,7 @@ static void
SetOutput(ArchiveHandle *AH, const char *filename,
const pg_compress_specification compress_spec)
{
+ CompressFileHandle *CFH;
const char *mode;
int fn = -1;
@@ -1517,33 +1518,32 @@ SetOutput(ArchiveHandle *AH, const char *filename,
else
mode = PG_BINARY_W;
- if (fn >= 0)
- AH->OF = cfdopen(dup(fn), mode, compress_spec);
- else
- AH->OF = cfopen(filename, mode, compress_spec);
+ CFH = InitCompressFileHandle(compress_spec);
- if (!AH->OF)
+ if (CFH->open(filename, fn, mode, CFH))
{
if (filename)
pg_fatal("could not open output file \"%s\": %m", filename);
else
pg_fatal("could not open output file: %m");
}
+
+ AH->OF = CFH;
}
-static cfp *
+static CompressFileHandle *
SaveOutput(ArchiveHandle *AH)
{
- return (cfp *) AH->OF;
+ return (CompressFileHandle *) AH->OF;
}
static void
-RestoreOutput(ArchiveHandle *AH, cfp *savedOutput)
+RestoreOutput(ArchiveHandle *AH, CompressFileHandle * savedOutput)
{
int res;
errno = 0;
- res = cfclose(AH->OF);
+ res = DestroyCompressFileHandle(AH->OF);
if (res != 0)
pg_fatal("could not close output file: %m");
@@ -1682,7 +1682,11 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
else if (RestoringToDB(AH))
bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
else
- bytes_written = cfwrite(ptr, size * nmemb, AH->OF);
+ {
+ CompressFileHandle *CFH = (CompressFileHandle *) AH->OF;
+
+ bytes_written = CFH->write(ptr, size * nmemb, CFH);
+ }
if (bytes_written != size * nmemb)
WRITE_ERROR_EXIT;
@@ -2171,6 +2175,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
SetupWorkerPtrType setupWorkerPtr)
{
ArchiveHandle *AH;
+ CompressFileHandle *CFH;
pg_compress_specification out_compress_spec = {0};
pg_log_debug("allocating AH for %s, format %d",
@@ -2226,7 +2231,10 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
/* Open stdout with no compression for AH output handle */
out_compress_spec.algorithm = PG_COMPRESSION_NONE;
- AH->OF = cfdopen(dup(fileno(stdout)), PG_BINARY_A, out_compress_spec);
+ CFH = InitCompressFileHandle(out_compress_spec);
+ if (CFH->open(NULL, fileno(stdout), PG_BINARY_A, CFH))
+ pg_fatal("could not open stdout for appending: %m");
+ AH->OF = CFH;
/*
* On Windows, we need to use binary mode to read/write non-text files,
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index 43ccbe1339..e8f8c4c0d6 100644
--- a/src/bin/pg_dump/pg_backup_custom.c
+++ b/src/bin/pg_dump/pg_backup_custom.c
@@ -298,7 +298,9 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
_WriteByte(AH, BLK_DATA); /* Block type */
WriteInt(AH, te->dumpId); /* For sanity check */
- ctx->cs = AllocateCompressor(AH->compress_spec, _CustomWriteFunc);
+ ctx->cs = AllocateCompressor(AH->compress_spec,
+ NULL,
+ _CustomWriteFunc);
}
/*
@@ -317,15 +319,15 @@ _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
CompressorState *cs = ctx->cs;
if (dLen > 0)
- /* WriteDataToArchive() internally throws write errors */
- WriteDataToArchive(AH, cs, data, dLen);
+ /* writeData() internally throws write errors */
+ cs->writeData(AH, cs, data, dLen);
}
/*
* Called by the archiver when a dumper's 'DataDumper' routine has
* finished.
*
- * Optional.
+ * Mandatory.
*/
static void
_EndData(ArchiveHandle *AH, TocEntry *te)
@@ -333,6 +335,8 @@ _EndData(ArchiveHandle *AH, TocEntry *te)
lclContext *ctx = (lclContext *) AH->formatData;
EndCompressor(AH, ctx->cs);
+ ctx->cs = NULL;
+
/* Send the end marker */
WriteInt(AH, 0);
}
@@ -377,7 +381,9 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
WriteInt(AH, oid);
- ctx->cs = AllocateCompressor(AH->compress_spec, _CustomWriteFunc);
+ ctx->cs = AllocateCompressor(AH->compress_spec,
+ NULL,
+ _CustomWriteFunc);
}
/*
@@ -566,7 +572,12 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te)
static void
_PrintData(ArchiveHandle *AH)
{
- ReadDataFromArchive(AH, AH->compress_spec, _CustomReadFunc);
+ CompressorState *cs;
+
+ cs = AllocateCompressor(AH->compress_spec,
+ _CustomReadFunc, NULL);
+ cs->readData(AH, cs);
+ EndCompressor(AH, cs);
}
static void
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 17c7130c75..3f35bc9815 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -50,9 +50,9 @@ typedef struct
*/
char *directory;
- cfp *dataFH; /* currently open data file */
+ CompressFileHandle *dataFH; /* currently open data file */
- cfp *blobsTocFH; /* file handle for blobs.toc */
+ CompressFileHandle *blobsTocFH; /* file handle for blobs.toc */
ParallelState *pstate; /* for parallel backup / restore */
} lclContext;
@@ -198,11 +198,11 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
else
{ /* Read Mode */
char fname[MAXPGPATH];
- cfp *tocFH;
+ CompressFileHandle *tocFH;
setFilePath(AH, fname, "toc.dat");
- tocFH = cfopen_read(fname, PG_BINARY_R);
+ tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R);
if (tocFH == NULL)
pg_fatal("could not open input file \"%s\": %m", fname);
@@ -218,7 +218,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
ReadToc(AH);
/* Nothing else in the file, so close it again... */
- if (cfclose(tocFH) != 0)
+ if (DestroyCompressFileHandle(tocFH) != 0)
pg_fatal("could not close TOC file: %m");
ctx->dataFH = NULL;
}
@@ -327,9 +327,9 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
setFilePath(AH, fname, tctx->filename);
- ctx->dataFH = cfopen_write(fname, PG_BINARY_W,
- AH->compress_spec);
- if (ctx->dataFH == NULL)
+ ctx->dataFH = InitCompressFileHandle(AH->compress_spec);
+
+ if (ctx->dataFH->open_write(fname, PG_BINARY_W, ctx->dataFH))
pg_fatal("could not open output file \"%s\": %m", fname);
}
@@ -346,15 +346,16 @@ static void
_WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
{
lclContext *ctx = (lclContext *) AH->formatData;
+ CompressFileHandle *CFH = ctx->dataFH;
errno = 0;
- if (dLen > 0 && cfwrite(data, dLen, ctx->dataFH) != dLen)
+ if (dLen > 0 && CFH->write(data, dLen, CFH) != dLen)
{
/* if write didn't set errno, assume problem is no disk space */
if (errno == 0)
errno = ENOSPC;
pg_fatal("could not write to output file: %s",
- get_cfp_error(ctx->dataFH));
+ CFH->get_error(CFH));
}
}
@@ -370,7 +371,7 @@ _EndData(ArchiveHandle *AH, TocEntry *te)
lclContext *ctx = (lclContext *) AH->formatData;
/* Close the file */
- if (cfclose(ctx->dataFH) != 0)
+ if (DestroyCompressFileHandle(ctx->dataFH) != 0)
pg_fatal("could not close data file: %m");
ctx->dataFH = NULL;
@@ -385,26 +386,25 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
size_t cnt;
char *buf;
size_t buflen;
- cfp *cfp;
+ CompressFileHandle *CFH;
if (!filename)
return;
- cfp = cfopen_read(filename, PG_BINARY_R);
-
- if (!cfp)
+ CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R);
+ if (!CFH)
pg_fatal("could not open input file \"%s\": %m", filename);
buf = pg_malloc(ZLIB_OUT_SIZE);
buflen = ZLIB_OUT_SIZE;
- while ((cnt = cfread(buf, buflen, cfp)))
+ while ((cnt = CFH->read(buf, buflen, CFH)))
{
ahwrite(buf, 1, cnt, AH);
}
free(buf);
- if (cfclose(cfp) != 0)
+ if (DestroyCompressFileHandle(CFH) != 0)
pg_fatal("could not close data file \"%s\": %m", filename);
}
@@ -435,6 +435,7 @@ _LoadBlobs(ArchiveHandle *AH)
{
Oid oid;
lclContext *ctx = (lclContext *) AH->formatData;
+ CompressFileHandle *CFH;
char tocfname[MAXPGPATH];
char line[MAXPGPATH];
@@ -442,14 +443,14 @@ _LoadBlobs(ArchiveHandle *AH)
setFilePath(AH, tocfname, "blobs.toc");
- ctx->blobsTocFH = cfopen_read(tocfname, PG_BINARY_R);
+ CFH = ctx->blobsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R);
if (ctx->blobsTocFH == NULL)
pg_fatal("could not open large object TOC file \"%s\" for input: %m",
tocfname);
/* Read the blobs TOC file line-by-line, and process each blob */
- while ((cfgets(ctx->blobsTocFH, line, MAXPGPATH)) != NULL)
+ while ((CFH->gets(line, MAXPGPATH, CFH)) != NULL)
{
char blobfname[MAXPGPATH + 1];
char path[MAXPGPATH];
@@ -464,11 +465,11 @@ _LoadBlobs(ArchiveHandle *AH)
_PrintFileData(AH, path);
EndRestoreBlob(AH, oid);
}
- if (!cfeof(ctx->blobsTocFH))
+ if (!CFH->eof(CFH))
pg_fatal("error reading large object TOC file \"%s\"",
tocfname);
- if (cfclose(ctx->blobsTocFH) != 0)
+ if (DestroyCompressFileHandle(ctx->blobsTocFH) != 0)
pg_fatal("could not close large object TOC file \"%s\": %m",
tocfname);
@@ -488,15 +489,16 @@ _WriteByte(ArchiveHandle *AH, const int i)
{
unsigned char c = (unsigned char) i;
lclContext *ctx = (lclContext *) AH->formatData;
+ CompressFileHandle *CFH = ctx->dataFH;
errno = 0;
- if (cfwrite(&c, 1, ctx->dataFH) != 1)
+ if (CFH->write(&c, 1, CFH) != 1)
{
/* if write didn't set errno, assume problem is no disk space */
if (errno == 0)
errno = ENOSPC;
pg_fatal("could not write to output file: %s",
- get_cfp_error(ctx->dataFH));
+ CFH->get_error(CFH));
}
return 1;
@@ -512,8 +514,9 @@ static int
_ReadByte(ArchiveHandle *AH)
{
lclContext *ctx = (lclContext *) AH->formatData;
+ CompressFileHandle *CFH = ctx->dataFH;
- return cfgetc(ctx->dataFH);
+ return CFH->getc(CFH);
}
/*
@@ -524,15 +527,16 @@ static void
_WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
{
lclContext *ctx = (lclContext *) AH->formatData;
+ CompressFileHandle *CFH = ctx->dataFH;
errno = 0;
- if (cfwrite(buf, len, ctx->dataFH) != len)
+ if (CFH->write(buf, len, CFH) != len)
{
/* if write didn't set errno, assume problem is no disk space */
if (errno == 0)
errno = ENOSPC;
pg_fatal("could not write to output file: %s",
- get_cfp_error(ctx->dataFH));
+ CFH->get_error(CFH));
}
}
@@ -545,12 +549,13 @@ static void
_ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
{
lclContext *ctx = (lclContext *) AH->formatData;
+ CompressFileHandle *CFH = ctx->dataFH;
/*
- * If there was an I/O error, we already exited in cfread(), so here we
+ * If there was an I/O error, we already exited in readF(), so here we
* exit on short reads.
*/
- if (cfread(buf, len, ctx->dataFH) != len)
+ if (CFH->read(buf, len, CFH) != len)
pg_fatal("could not read from input file: end of file");
}
@@ -573,7 +578,7 @@ _CloseArchive(ArchiveHandle *AH)
if (AH->mode == archModeWrite)
{
- cfp *tocFH;
+ CompressFileHandle *tocFH;
pg_compress_specification compress_spec;
char fname[MAXPGPATH];
@@ -584,8 +589,8 @@ _CloseArchive(ArchiveHandle *AH)
/* The TOC is always created uncompressed */
compress_spec.algorithm = PG_COMPRESSION_NONE;
- tocFH = cfopen_write(fname, PG_BINARY_W, compress_spec);
- if (tocFH == NULL)
+ tocFH = InitCompressFileHandle(compress_spec);
+ if (tocFH->open_write(fname, PG_BINARY_W, tocFH))
pg_fatal("could not open output file \"%s\": %m", fname);
ctx->dataFH = tocFH;
@@ -598,7 +603,7 @@ _CloseArchive(ArchiveHandle *AH)
WriteHead(AH);
AH->format = archDirectory;
WriteToc(AH);
- if (cfclose(tocFH) != 0)
+ if (DestroyCompressFileHandle(tocFH) != 0)
pg_fatal("could not close TOC file: %m");
WriteDataChunks(AH, ctx->pstate);
@@ -649,8 +654,8 @@ _StartBlobs(ArchiveHandle *AH, TocEntry *te)
/* The blob TOC file is never compressed */
compress_spec.algorithm = PG_COMPRESSION_NONE;
- ctx->blobsTocFH = cfopen_write(fname, "ab", compress_spec);
- if (ctx->blobsTocFH == NULL)
+ ctx->blobsTocFH = InitCompressFileHandle(compress_spec);
+ if (ctx->blobsTocFH->open_write(fname, "ab", ctx->blobsTocFH))
pg_fatal("could not open output file \"%s\": %m", fname);
}
@@ -667,9 +672,8 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
- ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compress_spec);
-
- if (ctx->dataFH == NULL)
+ ctx->dataFH = InitCompressFileHandle(AH->compress_spec);
+ if (ctx->dataFH->open_write(fname, PG_BINARY_W, ctx->dataFH))
pg_fatal("could not open output file \"%s\": %m", fname);
}
@@ -682,17 +686,18 @@ static void
_EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
{
lclContext *ctx = (lclContext *) AH->formatData;
+ CompressFileHandle *CFH = ctx->blobsTocFH;
char buf[50];
int len;
/* Close the BLOB data file itself */
- if (cfclose(ctx->dataFH) != 0)
+ if (DestroyCompressFileHandle(ctx->dataFH) != 0)
pg_fatal("could not close blob data file: %m");
ctx->dataFH = NULL;
/* register the blob in blobs.toc */
len = snprintf(buf, sizeof(buf), "%u blob_%u.dat\n", oid, oid);
- if (cfwrite(buf, len, ctx->blobsTocFH) != len)
+ if (CFH->write(buf, len, CFH) != len)
pg_fatal("could not write to blobs TOC file");
}
@@ -706,7 +711,7 @@ _EndBlobs(ArchiveHandle *AH, TocEntry *te)
{
lclContext *ctx = (lclContext *) AH->formatData;
- if (cfclose(ctx->blobsTocFH) != 0)
+ if (DestroyCompressFileHandle(ctx->blobsTocFH) != 0)
pg_fatal("could not close blobs TOC file: %m");
ctx->blobsTocFH = NULL;
}
--
2.34.1
v7-0001-Prepare-pg_dump-for-additional-compression-method.patchtext/x-patch; name=v7-0001-Prepare-pg_dump-for-additional-compression-method.patchDownload
From bf5deb8e3a770e26da876b2268832ab1a07bbb84 Mon Sep 17 00:00:00 2001
From: Georgios Kokolatos <gkokolatos@pm.me>
Date: Tue, 5 Jul 2022 12:33:28 +0000
Subject: [PATCH v7 1/3] Prepare pg_dump for additional compression methods
This commmit does some of the heavy lifting required for additional compression
methods.
First it is teaching pg_dump.c about the definitions and interfaces found in
common/compression.h. Then it is propagating those throughout the code.
Commit bf9aa490db introduced cfp in compress_io.{c,h} with the intent of
unifying compression related code and allow for the introduction of
additional archive formats. However, pg_backup_archiver.c was not using that
API. This commit teaches pg_backup_archiver.c about cfp and is using it through
out.
---
doc/src/sgml/ref/pg_dump.sgml | 30 +-
src/bin/pg_dump/compress_io.c | 427 ++++++++++++++++----------
src/bin/pg_dump/compress_io.h | 35 ++-
src/bin/pg_dump/pg_backup.h | 7 +-
src/bin/pg_dump/pg_backup_archiver.c | 178 +++++------
src/bin/pg_dump/pg_backup_archiver.h | 37 +--
src/bin/pg_dump/pg_backup_custom.c | 6 +-
src/bin/pg_dump/pg_backup_directory.c | 13 +-
src/bin/pg_dump/pg_backup_tar.c | 12 +-
src/bin/pg_dump/pg_dump.c | 151 +++++++--
src/bin/pg_dump/t/001_basic.pl | 10 +
src/bin/pg_dump/t/002_pg_dump.pl | 2 +-
12 files changed, 552 insertions(+), 356 deletions(-)
diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index 5efb442b44..94885d0812 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -644,17 +644,31 @@ PostgreSQL documentation
</varlistentry>
<varlistentry>
- <term><option>-Z <replaceable class="parameter">0..9</replaceable></option></term>
- <term><option>--compress=<replaceable class="parameter">0..9</replaceable></option></term>
+ <term><option>-Z <replaceable class="parameter">level</replaceable></option></term>
+ <term><option>-Z <replaceable class="parameter">method</replaceable></option>[:<replaceable>level</replaceable>]</term>
+ <term><option>--compress=<replaceable class="parameter">level</replaceable></option></term>
+ <term><option>--compress=<replaceable class="parameter">method</replaceable></option>[:<replaceable>level</replaceable>]</term>
<listitem>
<para>
- Specify the compression level to use. Zero means no compression.
+ Specify the compression method and/or the compression level to use.
+ The compression method can be set to <literal>gzip</literal> or
+ <literal>none</literal> for no compression. A compression level can
+ be optionally specified, by appending the level number after a colon
+ (<literal>:</literal>). If no level is specified, the default compression
+ level will be used for the specified method. If only a level is
+ specified without mentioning a method, <literal>gzip</literal> compression
+ will be used.
+ </para>
+
+ <para>
For the custom and directory archive formats, this specifies compression of
- individual table-data segments, and the default is to compress
- at a moderate level.
- For plain text output, setting a nonzero compression level causes
- the entire output file to be compressed, as though it had been
- fed through <application>gzip</application>; but the default is not to compress.
+ individual table-data segments, and the default is to compress using
+ <literal>gzip</literal> at a moderate level. For plain text output,
+ setting a nonzero compression level causes the entire output file to be compressed,
+ as though it had been fed through <application>gzip</application>; but the default
+ is not to compress.
+ </para>
+ <para>
The tar archive format currently does not support compression at all.
</para>
</listitem>
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 62f940ff7a..146396172b 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -64,7 +64,7 @@
/* typedef appears in compress_io.h */
struct CompressorState
{
- CompressionAlgorithm comprAlg;
+ pg_compress_algorithm compress_algorithm;
WriteFunc writeF;
#ifdef HAVE_LIBZ
@@ -74,9 +74,6 @@ struct CompressorState
#endif
};
-static void ParseCompressionOption(int compression, CompressionAlgorithm *alg,
- int *level);
-
/* Routines that support zlib compressed data I/O */
#ifdef HAVE_LIBZ
static void InitCompressorZlib(CompressorState *cs, int level);
@@ -93,57 +90,30 @@ static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF);
static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
const char *data, size_t dLen);
-/*
- * Interprets a numeric 'compression' value. The algorithm implied by the
- * value (zlib or none at the moment), is returned in *alg, and the
- * zlib compression level in *level.
- */
-static void
-ParseCompressionOption(int compression, CompressionAlgorithm *alg, int *level)
-{
- if (compression == Z_DEFAULT_COMPRESSION ||
- (compression > 0 && compression <= 9))
- *alg = COMPR_ALG_LIBZ;
- else if (compression == 0)
- *alg = COMPR_ALG_NONE;
- else
- {
- pg_fatal("invalid compression code: %d", compression);
- *alg = COMPR_ALG_NONE; /* keep compiler quiet */
- }
-
- /* The level is just the passed-in value. */
- if (level)
- *level = compression;
-}
-
/* Public interface routines */
/* Allocate a new compressor */
CompressorState *
-AllocateCompressor(int compression, WriteFunc writeF)
+AllocateCompressor(const pg_compress_specification compress_spec,
+ WriteFunc writeF)
{
CompressorState *cs;
- CompressionAlgorithm alg;
- int level;
-
- ParseCompressionOption(compression, &alg, &level);
#ifndef HAVE_LIBZ
- if (alg == COMPR_ALG_LIBZ)
+ if (compress_spec.algorithm == PG_COMPRESSION_GZIP)
pg_fatal("not built with zlib support");
#endif
cs = (CompressorState *) pg_malloc0(sizeof(CompressorState));
cs->writeF = writeF;
- cs->comprAlg = alg;
+ cs->compress_algorithm = compress_spec.algorithm;
/*
* Perform compression algorithm specific initialization.
*/
#ifdef HAVE_LIBZ
- if (alg == COMPR_ALG_LIBZ)
- InitCompressorZlib(cs, level);
+ if (cs->compress_algorithm == PG_COMPRESSION_GZIP)
+ InitCompressorZlib(cs, compress_spec.level);
#endif
return cs;
@@ -154,21 +124,24 @@ AllocateCompressor(int compression, WriteFunc writeF)
* out with ahwrite().
*/
void
-ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF)
+ReadDataFromArchive(ArchiveHandle *AH, pg_compress_specification compress_spec,
+ ReadFunc readF)
{
- CompressionAlgorithm alg;
-
- ParseCompressionOption(compression, &alg, NULL);
-
- if (alg == COMPR_ALG_NONE)
- ReadDataFromArchiveNone(AH, readF);
- if (alg == COMPR_ALG_LIBZ)
+ switch (compress_spec.algorithm)
{
+ case PG_COMPRESSION_NONE:
+ ReadDataFromArchiveNone(AH, readF);
+ break;
+ case PG_COMPRESSION_GZIP:
#ifdef HAVE_LIBZ
- ReadDataFromArchiveZlib(AH, readF);
+ ReadDataFromArchiveZlib(AH, readF);
#else
- pg_fatal("not built with zlib support");
+ pg_fatal("not built with zlib support");
#endif
+ break;
+ default:
+ pg_fatal("invalid compression method");
+ break;
}
}
@@ -179,18 +152,21 @@ void
WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
const void *data, size_t dLen)
{
- switch (cs->comprAlg)
+ switch (cs->compress_algorithm)
{
- case COMPR_ALG_LIBZ:
+ case PG_COMPRESSION_GZIP:
#ifdef HAVE_LIBZ
WriteDataToArchiveZlib(AH, cs, data, dLen);
#else
pg_fatal("not built with zlib support");
#endif
break;
- case COMPR_ALG_NONE:
+ case PG_COMPRESSION_NONE:
WriteDataToArchiveNone(AH, cs, data, dLen);
break;
+ default:
+ pg_fatal("invalid compression method");
+ break;
}
}
@@ -200,11 +176,23 @@ WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
void
EndCompressor(ArchiveHandle *AH, CompressorState *cs)
{
+ switch (cs->compress_algorithm)
+ {
+ case PG_COMPRESSION_GZIP:
#ifdef HAVE_LIBZ
- if (cs->comprAlg == COMPR_ALG_LIBZ)
- EndCompressorZlib(AH, cs);
+ EndCompressorZlib(AH, cs);
+#else
+ pg_fatal("not built with zlib support");
#endif
- free(cs);
+ break;
+ case PG_COMPRESSION_NONE:
+ free(cs);
+ break;
+
+ default:
+ pg_fatal("invalid compression method");
+ break;
+ }
}
/* Private routines, specific to each compression method. */
@@ -418,10 +406,8 @@ WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
*/
struct cfp
{
- FILE *uncompressedfp;
-#ifdef HAVE_LIBZ
- gzFile compressedfp;
-#endif
+ pg_compress_algorithm compress_algorithm;
+ void *fp;
};
#ifdef HAVE_LIBZ
@@ -452,21 +438,25 @@ cfp *
cfopen_read(const char *path, const char *mode)
{
cfp *fp;
+ pg_compress_specification compress_spec = {0};
+ compress_spec.algorithm = PG_COMPRESSION_GZIP;
#ifdef HAVE_LIBZ
if (hasSuffix(path, ".gz"))
- fp = cfopen(path, mode, 1);
+ fp = cfopen(path, mode, compress_spec);
else
#endif
{
- fp = cfopen(path, mode, 0);
+ compress_spec.algorithm = PG_COMPRESSION_NONE;
+ fp = cfopen(path, mode, compress_spec);
#ifdef HAVE_LIBZ
if (fp == NULL)
{
char *fname;
+ compress_spec.algorithm = PG_COMPRESSION_GZIP;
fname = psprintf("%s.gz", path);
- fp = cfopen(fname, mode, 1);
+ fp = cfopen(fname, mode, compress_spec);
free_keep_errno(fname);
}
#endif
@@ -479,26 +469,27 @@ cfopen_read(const char *path, const char *mode)
* be a filemode as accepted by fopen() and gzopen() that indicates writing
* ("w", "wb", "a", or "ab").
*
- * If 'compression' is non-zero, a gzip compressed stream is opened, and
- * 'compression' indicates the compression level used. The ".gz" suffix
- * is automatically added to 'path' in that case.
+ * If 'compress_spec.algorithm' is GZIP, a gzip compressed stream is opened,
+ * and 'compress_spec.level' used. The ".gz" suffix is automatically added to
+ * 'path' in that case.
*
* On failure, return NULL with an error code in errno.
*/
cfp *
-cfopen_write(const char *path, const char *mode, int compression)
+cfopen_write(const char *path, const char *mode,
+ const pg_compress_specification compress_spec)
{
cfp *fp;
- if (compression == 0)
- fp = cfopen(path, mode, 0);
+ if (compress_spec.algorithm == PG_COMPRESSION_NONE)
+ fp = cfopen(path, mode, compress_spec);
else
{
#ifdef HAVE_LIBZ
char *fname;
fname = psprintf("%s.gz", path);
- fp = cfopen(fname, mode, compression);
+ fp = cfopen(fname, mode, compress_spec);
free_keep_errno(fname);
#else
pg_fatal("not built with zlib support");
@@ -509,60 +500,96 @@ cfopen_write(const char *path, const char *mode, int compression)
}
/*
- * Opens file 'path' in 'mode'. If 'compression' is non-zero, the file
- * is opened with libz gzopen(), otherwise with plain fopen().
+ * This is the workhorse for cfopen() or cfdopen(). It opens file 'path' or
+ * associates a stream 'fd', if 'fd' is a valid descriptor, in 'mode'. The
+ * descriptor is not dup'ed and it is the caller's responsibility to do so.
+ * The caller must verify that the 'compress_algorithm' is supported by the
+ * current build.
*
* On failure, return NULL with an error code in errno.
*/
-cfp *
-cfopen(const char *path, const char *mode, int compression)
+static cfp *
+cfopen_internal(const char *path, int fd, const char *mode,
+ pg_compress_algorithm compress_algorithm, int compressionLevel)
{
cfp *fp = pg_malloc(sizeof(cfp));
- if (compression != 0)
+ fp->compress_algorithm = compress_algorithm;
+
+ switch (compress_algorithm)
{
-#ifdef HAVE_LIBZ
- if (compression != Z_DEFAULT_COMPRESSION)
- {
- /* user has specified a compression level, so tell zlib to use it */
- char mode_compression[32];
+ case PG_COMPRESSION_NONE:
+ if (fd >= 0)
+ fp->fp = fdopen(fd, mode);
+ else
+ fp->fp = fopen(path, mode);
+ if (fp->fp == NULL)
+ {
+ free_keep_errno(fp);
+ fp = NULL;
+ }
- snprintf(mode_compression, sizeof(mode_compression), "%s%d",
- mode, compression);
- fp->compressedfp = gzopen(path, mode_compression);
- }
- else
- {
- /* don't specify a level, just use the zlib default */
- fp->compressedfp = gzopen(path, mode);
- }
+ break;
+ case PG_COMPRESSION_GZIP:
+#ifdef HAVE_LIBZ
+ if (compressionLevel != Z_DEFAULT_COMPRESSION)
+ {
+ /*
+ * user has specified a compression level, so tell zlib to use
+ * it
+ */
+ char mode_compression[32];
+
+ snprintf(mode_compression, sizeof(mode_compression), "%s%d",
+ mode, compressionLevel);
+ if (fd >= 0)
+ fp->fp = gzdopen(fd, mode_compression);
+ else
+ fp->fp = gzopen(path, mode_compression);
+ }
+ else
+ {
+ /* don't specify a level, just use the zlib default */
+ if (fd >= 0)
+ fp->fp = gzdopen(fd, mode);
+ else
+ fp->fp = gzopen(path, mode);
+ }
- fp->uncompressedfp = NULL;
- if (fp->compressedfp == NULL)
- {
- free_keep_errno(fp);
- fp = NULL;
- }
+ if (fp->fp == NULL)
+ {
+ free_keep_errno(fp);
+ fp = NULL;
+ }
#else
- pg_fatal("not built with zlib support");
-#endif
- }
- else
- {
-#ifdef HAVE_LIBZ
- fp->compressedfp = NULL;
+ pg_fatal("not built with zlib support");
#endif
- fp->uncompressedfp = fopen(path, mode);
- if (fp->uncompressedfp == NULL)
- {
- free_keep_errno(fp);
- fp = NULL;
- }
+ break;
+ default:
+ pg_fatal("invalid compression method");
+ break;
}
return fp;
}
+cfp *
+cfopen(const char *path, const char *mode,
+ const pg_compress_specification compress_spec)
+{
+ return cfopen_internal(path, -1, mode,
+ compress_spec.algorithm,
+ compress_spec.level);
+}
+
+cfp *
+cfdopen(int fd, const char *mode,
+ const pg_compress_specification compress_spec)
+{
+ return cfopen_internal(NULL, fd, mode,
+ compress_spec.algorithm,
+ compress_spec.level);
+}
int
cfread(void *ptr, int size, cfp *fp)
@@ -572,38 +599,61 @@ cfread(void *ptr, int size, cfp *fp)
if (size == 0)
return 0;
-#ifdef HAVE_LIBZ
- if (fp->compressedfp)
+ switch (fp->compress_algorithm)
{
- ret = gzread(fp->compressedfp, ptr, size);
- if (ret != size && !gzeof(fp->compressedfp))
- {
- int errnum;
- const char *errmsg = gzerror(fp->compressedfp, &errnum);
+ case PG_COMPRESSION_NONE:
+ ret = fread(ptr, 1, size, fp->fp);
+ if (ret != size && !feof(fp->fp))
+ READ_ERROR_EXIT(fp->fp);
- pg_fatal("could not read from input file: %s",
- errnum == Z_ERRNO ? strerror(errno) : errmsg);
- }
- }
- else
+ break;
+ case PG_COMPRESSION_GZIP:
+#ifdef HAVE_LIBZ
+ ret = gzread(fp->fp, ptr, size);
+ if (ret != size && !gzeof(fp->fp))
+ {
+ int errnum;
+ const char *errmsg = gzerror(fp->fp, &errnum);
+
+ pg_fatal("could not read from input file: %s",
+ errnum == Z_ERRNO ? strerror(errno) : errmsg);
+ }
+#else
+ pg_fatal("not built with zlib support");
#endif
- {
- ret = fread(ptr, 1, size, fp->uncompressedfp);
- if (ret != size && !feof(fp->uncompressedfp))
- READ_ERROR_EXIT(fp->uncompressedfp);
+ break;
+
+ default:
+ pg_fatal("invalid compression method");
+ break;
}
+
return ret;
}
int
cfwrite(const void *ptr, int size, cfp *fp)
{
+ int ret = 0;
+
+ switch (fp->compress_algorithm)
+ {
+ case PG_COMPRESSION_NONE:
+ ret = fwrite(ptr, 1, size, fp->fp);
+ break;
+ case PG_COMPRESSION_GZIP:
#ifdef HAVE_LIBZ
- if (fp->compressedfp)
- return gzwrite(fp->compressedfp, ptr, size);
- else
+ ret = gzwrite(fp->fp, ptr, size);
+#else
+ pg_fatal("not built with zlib support");
#endif
- return fwrite(ptr, 1, size, fp->uncompressedfp);
+ break;
+ default:
+ pg_fatal("invalid compression method");
+ break;
+ }
+
+ return ret;
}
int
@@ -611,24 +661,31 @@ cfgetc(cfp *fp)
{
int ret;
-#ifdef HAVE_LIBZ
- if (fp->compressedfp)
+ switch (fp->compress_algorithm)
{
- ret = gzgetc(fp->compressedfp);
- if (ret == EOF)
- {
- if (!gzeof(fp->compressedfp))
- pg_fatal("could not read from input file: %s", strerror(errno));
- else
- pg_fatal("could not read from input file: end of file");
- }
- }
- else
+ case PG_COMPRESSION_NONE:
+ ret = fgetc(fp->fp);
+ if (ret == EOF)
+ READ_ERROR_EXIT(fp->fp);
+
+ break;
+ case PG_COMPRESSION_GZIP:
+#ifdef HAVE_LIBZ
+ ret = gzgetc((gzFile) fp->fp);
+ if (ret == EOF)
+ {
+ if (!gzeof(fp->fp))
+ pg_fatal("could not read from input file: %s", strerror(errno));
+ else
+ pg_fatal("could not read from input file: end of file");
+ }
+#else
+ pg_fatal("not built with zlib support");
#endif
- {
- ret = fgetc(fp->uncompressedfp);
- if (ret == EOF)
- READ_ERROR_EXIT(fp->uncompressedfp);
+ break;
+ default:
+ pg_fatal("invalid compression method");
+ break;
}
return ret;
@@ -637,65 +694,107 @@ cfgetc(cfp *fp)
char *
cfgets(cfp *fp, char *buf, int len)
{
+ char *ret;
+
+ switch (fp->compress_algorithm)
+ {
+ case PG_COMPRESSION_NONE:
+ ret = fgets(buf, len, fp->fp);
+
+ break;
+ case PG_COMPRESSION_GZIP:
#ifdef HAVE_LIBZ
- if (fp->compressedfp)
- return gzgets(fp->compressedfp, buf, len);
- else
+ ret = gzgets(fp->fp, buf, len);
+#else
+ pg_fatal("not built with zlib support");
#endif
- return fgets(buf, len, fp->uncompressedfp);
+ break;
+ default:
+ pg_fatal("invalid compression method");
+ break;
+ }
+
+ return ret;
}
int
cfclose(cfp *fp)
{
- int result;
+ int ret;
if (fp == NULL)
{
errno = EBADF;
return EOF;
}
-#ifdef HAVE_LIBZ
- if (fp->compressedfp)
+
+ switch (fp->compress_algorithm)
{
- result = gzclose(fp->compressedfp);
- fp->compressedfp = NULL;
- }
- else
+ case PG_COMPRESSION_NONE:
+ ret = fclose(fp->fp);
+ fp->fp = NULL;
+
+ break;
+ case PG_COMPRESSION_GZIP:
+#ifdef HAVE_LIBZ
+ ret = gzclose(fp->fp);
+ fp->fp = NULL;
+#else
+ pg_fatal("not built with zlib support");
#endif
- {
- result = fclose(fp->uncompressedfp);
- fp->uncompressedfp = NULL;
+ break;
+ default:
+ pg_fatal("invalid compression method");
+ break;
}
+
free_keep_errno(fp);
- return result;
+ return ret;
}
int
cfeof(cfp *fp)
{
+ int ret;
+
+ switch (fp->compress_algorithm)
+ {
+ case PG_COMPRESSION_NONE:
+ ret = feof(fp->fp);
+
+ break;
+ case PG_COMPRESSION_GZIP:
#ifdef HAVE_LIBZ
- if (fp->compressedfp)
- return gzeof(fp->compressedfp);
- else
+ ret = gzeof(fp->fp);
+#else
+ pg_fatal("not built with zlib support");
#endif
- return feof(fp->uncompressedfp);
+ break;
+ default:
+ pg_fatal("invalid compression method");
+ break;
+ }
+
+ return ret;
}
const char *
get_cfp_error(cfp *fp)
{
-#ifdef HAVE_LIBZ
- if (fp->compressedfp)
+ if (fp->compress_algorithm == PG_COMPRESSION_GZIP)
{
+#ifdef HAVE_LIBZ
int errnum;
- const char *errmsg = gzerror(fp->compressedfp, &errnum);
+ const char *errmsg = gzerror(fp->fp, &errnum);
if (errnum != Z_ERRNO)
return errmsg;
- }
+#else
+ pg_fatal("not built with zlib support");
#endif
+ }
+
return strerror(errno);
}
diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index f635787692..2b42f030a8 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -17,16 +17,25 @@
#include "pg_backup_archiver.h"
+#ifdef HAVE_LIBZ
+#include <zlib.h>
+#define GZCLOSE(fh) gzclose(fh)
+#define GZWRITE(p, s, n, fh) gzwrite(fh, p, (n) * (s))
+#define GZREAD(p, s, n, fh) gzread(fh, p, (n) * (s))
+#define GZEOF(fh) gzeof(fh)
+#else
+#define GZCLOSE(fh) fclose(fh)
+#define GZWRITE(p, s, n, fh) (fwrite(p, s, n, fh) * (s))
+#define GZREAD(p, s, n, fh) fread(p, s, n, fh)
+#define GZEOF(fh) feof(fh)
+/* this is just the redefinition of a libz constant */
+#define Z_DEFAULT_COMPRESSION (-1)
+#endif
+
/* Initial buffer sizes used in zlib compression. */
#define ZLIB_OUT_SIZE 4096
#define ZLIB_IN_SIZE 4096
-typedef enum
-{
- COMPR_ALG_NONE,
- COMPR_ALG_LIBZ
-} CompressionAlgorithm;
-
/* Prototype for callback function to WriteDataToArchive() */
typedef void (*WriteFunc) (ArchiveHandle *AH, const char *buf, size_t len);
@@ -46,8 +55,10 @@ typedef size_t (*ReadFunc) (ArchiveHandle *AH, char **buf, size_t *buflen);
/* struct definition appears in compress_io.c */
typedef struct CompressorState CompressorState;
-extern CompressorState *AllocateCompressor(int compression, WriteFunc writeF);
-extern void ReadDataFromArchive(ArchiveHandle *AH, int compression,
+extern CompressorState *AllocateCompressor(const pg_compress_specification compress_spec,
+ WriteFunc writeF);
+extern void ReadDataFromArchive(ArchiveHandle *AH,
+ const pg_compress_specification compress_spec,
ReadFunc readF);
extern void WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
const void *data, size_t dLen);
@@ -56,9 +67,13 @@ extern void EndCompressor(ArchiveHandle *AH, CompressorState *cs);
typedef struct cfp cfp;
-extern cfp *cfopen(const char *path, const char *mode, int compression);
+extern cfp *cfopen(const char *path, const char *mode,
+ const pg_compress_specification compress_spec);
+extern cfp *cfdopen(int fd, const char *mode,
+ pg_compress_specification compress_spec);
extern cfp *cfopen_read(const char *path, const char *mode);
-extern cfp *cfopen_write(const char *path, const char *mode, int compression);
+extern cfp *cfopen_write(const char *path, const char *mode,
+ const pg_compress_specification compress_spec);
extern int cfread(void *ptr, int size, cfp *fp);
extern int cfwrite(const void *ptr, int size, cfp *fp);
extern int cfgetc(cfp *fp);
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index fcc5f6bd05..cf9fad4706 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -23,6 +23,7 @@
#ifndef PG_BACKUP_H
#define PG_BACKUP_H
+#include "common/compression.h"
#include "fe_utils/simple_list.h"
#include "libpq-fe.h"
@@ -143,7 +144,8 @@ typedef struct _restoreOptions
int noDataForFailedTables;
int exit_on_error;
- int compression;
+ pg_compress_specification compress_spec; /* Specification for
+ * compression */
int suppressDumpWarnings; /* Suppress output of WARNING entries
* to stderr */
bool single_txn;
@@ -303,7 +305,8 @@ extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
/* Create a new archive */
extern Archive *CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
- const int compression, bool dosync, ArchiveMode mode,
+ const pg_compress_specification compress_spec,
+ bool dosync, ArchiveMode mode,
SetupWorkerPtrType setupDumpWorker);
/* The --list option */
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 233198afc0..fa84c31ecb 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -31,6 +31,7 @@
#endif
#include "common/string.h"
+#include "compress_io.h"
#include "dumputils.h"
#include "fe_utils/string_utils.h"
#include "lib/stringinfo.h"
@@ -43,13 +44,6 @@
#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
-/* state needed to save/restore an archive's output target */
-typedef struct _outputContext
-{
- void *OF;
- int gzOut;
-} OutputContext;
-
/*
* State for tracking TocEntrys that are ready to process during a parallel
* restore. (This used to be a list, and we still call it that, though now
@@ -70,7 +64,8 @@ typedef struct _parallelReadyList
static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
- const int compression, bool dosync, ArchiveMode mode,
+ const pg_compress_specification compress_spec,
+ bool dosync, ArchiveMode mode,
SetupWorkerPtrType setupWorkerPtr);
static void _getObjectDescription(PQExpBuffer buf, TocEntry *te);
static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
@@ -98,9 +93,10 @@ static int _discoverArchiveFormat(ArchiveHandle *AH);
static int RestoringToDB(ArchiveHandle *AH);
static void dump_lo_buf(ArchiveHandle *AH);
static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
-static void SetOutput(ArchiveHandle *AH, const char *filename, int compression);
-static OutputContext SaveOutput(ArchiveHandle *AH);
-static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
+static void SetOutput(ArchiveHandle *AH, const char *filename,
+ const pg_compress_specification compress_spec);
+static cfp *SaveOutput(ArchiveHandle *AH);
+static void RestoreOutput(ArchiveHandle *AH, cfp *savedOutput);
static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
static void restore_toc_entries_prefork(ArchiveHandle *AH,
@@ -239,12 +235,13 @@ setupRestoreWorker(Archive *AHX)
/* Public */
Archive *
CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
- const int compression, bool dosync, ArchiveMode mode,
+ const pg_compress_specification compress_spec,
+ bool dosync, ArchiveMode mode,
SetupWorkerPtrType setupDumpWorker)
{
- ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, dosync,
- mode, setupDumpWorker);
+ ArchiveHandle *AH = _allocAH(FileSpec, fmt, compress_spec,
+ dosync, mode, setupDumpWorker);
return (Archive *) AH;
}
@@ -254,7 +251,12 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
Archive *
OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
{
- ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, true, archModeRead, setupRestoreWorker);
+ ArchiveHandle *AH;
+ pg_compress_specification compress_spec;
+
+ compress_spec.algorithm = PG_COMPRESSION_NONE;
+ AH = _allocAH(FileSpec, fmt, compress_spec, true,
+ archModeRead, setupRestoreWorker);
return (Archive *) AH;
}
@@ -269,11 +271,8 @@ CloseArchive(Archive *AHX)
AH->ClosePtr(AH);
/* Close the output */
- errno = 0; /* in case gzclose() doesn't set it */
- if (AH->gzOut)
- res = GZCLOSE(AH->OF);
- else if (AH->OF != stdout)
- res = fclose(AH->OF);
+ errno = 0;
+ res = cfclose(AH->OF);
if (res != 0)
pg_fatal("could not close output file: %m");
@@ -355,7 +354,7 @@ RestoreArchive(Archive *AHX)
RestoreOptions *ropt = AH->public.ropt;
bool parallel_mode;
TocEntry *te;
- OutputContext sav;
+ cfp *sav;
AH->stage = STAGE_INITIALIZING;
@@ -384,7 +383,8 @@ RestoreArchive(Archive *AHX)
* Make sure we won't need (de)compression we haven't got
*/
#ifndef HAVE_LIBZ
- if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+ if (AH->compress_spec.algorithm == PG_COMPRESSION_GZIP &&
+ AH->PrintTocDataPtr != NULL)
{
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
@@ -459,8 +459,8 @@ RestoreArchive(Archive *AHX)
* Setup the output file if necessary.
*/
sav = SaveOutput(AH);
- if (ropt->filename || ropt->compression)
- SetOutput(AH, ropt->filename, ropt->compression);
+ if (ropt->filename || ropt->compress_spec.algorithm != PG_COMPRESSION_NONE)
+ SetOutput(AH, ropt->filename, ropt->compress_spec);
ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
@@ -739,7 +739,7 @@ RestoreArchive(Archive *AHX)
*/
AH->stage = STAGE_FINALIZING;
- if (ropt->filename || ropt->compression)
+ if (ropt->filename || ropt->compress_spec.algorithm != PG_COMPRESSION_NONE)
RestoreOutput(AH, sav);
if (ropt->useDB)
@@ -969,6 +969,8 @@ NewRestoreOptions(void)
opts->format = archUnknown;
opts->cparams.promptPassword = TRI_DEFAULT;
opts->dumpSections = DUMP_UNSECTIONED;
+ opts->compress_spec.algorithm = PG_COMPRESSION_NONE;
+ opts->compress_spec.level = INT_MIN;
return opts;
}
@@ -1115,23 +1117,28 @@ PrintTOCSummary(Archive *AHX)
ArchiveHandle *AH = (ArchiveHandle *) AHX;
RestoreOptions *ropt = AH->public.ropt;
TocEntry *te;
+ pg_compress_specification out_compress_spec;
teSection curSection;
- OutputContext sav;
+ cfp *sav;
const char *fmtName;
char stamp_str[64];
+ /* TOC is always uncompressed */
+ out_compress_spec.algorithm = PG_COMPRESSION_NONE;
+
sav = SaveOutput(AH);
if (ropt->filename)
- SetOutput(AH, ropt->filename, 0 /* no compression */ );
+ SetOutput(AH, ropt->filename, out_compress_spec);
if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
localtime(&AH->createDate)) == 0)
strcpy(stamp_str, "[unknown]");
ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
- ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %d\n",
+ ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n",
sanitize_line(AH->archdbname, false),
- AH->tocCount, AH->compression);
+ AH->tocCount,
+ get_compress_algorithm_name(AH->compress_spec.algorithm));
switch (AH->format)
{
@@ -1485,60 +1492,35 @@ archprintf(Archive *AH, const char *fmt,...)
*******************************/
static void
-SetOutput(ArchiveHandle *AH, const char *filename, int compression)
+SetOutput(ArchiveHandle *AH, const char *filename,
+ const pg_compress_specification compress_spec)
{
- int fn;
+ const char *mode;
+ int fn = -1;
if (filename)
{
if (strcmp(filename, "-") == 0)
fn = fileno(stdout);
- else
- fn = -1;
}
else if (AH->FH)
fn = fileno(AH->FH);
else if (AH->fSpec)
{
- fn = -1;
filename = AH->fSpec;
}
else
fn = fileno(stdout);
- /* If compression explicitly requested, use gzopen */
-#ifdef HAVE_LIBZ
- if (compression != 0)
- {
- char fmode[14];
+ if (AH->mode == archModeAppend)
+ mode = PG_BINARY_A;
+ else
+ mode = PG_BINARY_W;
- /* Don't use PG_BINARY_x since this is zlib */
- sprintf(fmode, "wb%d", compression);
- if (fn >= 0)
- AH->OF = gzdopen(dup(fn), fmode);
- else
- AH->OF = gzopen(filename, fmode);
- AH->gzOut = 1;
- }
+ if (fn >= 0)
+ AH->OF = cfdopen(dup(fn), mode, compress_spec);
else
-#endif
- { /* Use fopen */
- if (AH->mode == archModeAppend)
- {
- if (fn >= 0)
- AH->OF = fdopen(dup(fn), PG_BINARY_A);
- else
- AH->OF = fopen(filename, PG_BINARY_A);
- }
- else
- {
- if (fn >= 0)
- AH->OF = fdopen(dup(fn), PG_BINARY_W);
- else
- AH->OF = fopen(filename, PG_BINARY_W);
- }
- AH->gzOut = 0;
- }
+ AH->OF = cfopen(filename, mode, compress_spec);
if (!AH->OF)
{
@@ -1549,33 +1531,24 @@ SetOutput(ArchiveHandle *AH, const char *filename, int compression)
}
}
-static OutputContext
+static cfp *
SaveOutput(ArchiveHandle *AH)
{
- OutputContext sav;
-
- sav.OF = AH->OF;
- sav.gzOut = AH->gzOut;
-
- return sav;
+ return (cfp *) AH->OF;
}
static void
-RestoreOutput(ArchiveHandle *AH, OutputContext savedContext)
+RestoreOutput(ArchiveHandle *AH, cfp *savedOutput)
{
int res;
- errno = 0; /* in case gzclose() doesn't set it */
- if (AH->gzOut)
- res = GZCLOSE(AH->OF);
- else
- res = fclose(AH->OF);
+ errno = 0;
+ res = cfclose(AH->OF);
if (res != 0)
pg_fatal("could not close output file: %m");
- AH->gzOut = savedContext.gzOut;
- AH->OF = savedContext.OF;
+ AH->OF = savedOutput;
}
@@ -1699,22 +1672,17 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
bytes_written = size * nmemb;
}
- else if (AH->gzOut)
- bytes_written = GZWRITE(ptr, size, nmemb, AH->OF);
else if (AH->CustomOutPtr)
bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
+ /*
+ * If we're doing a restore, and it's direct to DB, and we're connected
+ * then send it to the DB.
+ */
+ else if (RestoringToDB(AH))
+ bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
else
- {
- /*
- * If we're doing a restore, and it's direct to DB, and we're
- * connected then send it to the DB.
- */
- if (RestoringToDB(AH))
- bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
- else
- bytes_written = fwrite(ptr, size, nmemb, AH->OF) * size;
- }
+ bytes_written = cfwrite(ptr, size * nmemb, AH->OF);
if (bytes_written != size * nmemb)
WRITE_ERROR_EXIT;
@@ -2198,10 +2166,12 @@ _discoverArchiveFormat(ArchiveHandle *AH)
*/
static ArchiveHandle *
_allocAH(const char *FileSpec, const ArchiveFormat fmt,
- const int compression, bool dosync, ArchiveMode mode,
+ const pg_compress_specification compress_spec,
+ bool dosync, ArchiveMode mode,
SetupWorkerPtrType setupWorkerPtr)
{
ArchiveHandle *AH;
+ pg_compress_specification out_compress_spec = {0};
pg_log_debug("allocating AH for %s, format %d",
FileSpec ? FileSpec : "(stdio)", fmt);
@@ -2249,14 +2219,14 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
AH->toc->prev = AH->toc;
AH->mode = mode;
- AH->compression = compression;
+ AH->compress_spec = compress_spec;
AH->dosync = dosync;
memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
/* Open stdout with no compression for AH output handle */
- AH->gzOut = 0;
- AH->OF = stdout;
+ out_compress_spec.algorithm = PG_COMPRESSION_NONE;
+ AH->OF = cfdopen(dup(fileno(stdout)), PG_BINARY_A, out_compress_spec);
/*
* On Windows, we need to use binary mode to read/write non-text files,
@@ -2264,7 +2234,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
* Force stdin/stdout into binary mode if that is what we are using.
*/
#ifdef WIN32
- if ((fmt != archNull || compression != 0) &&
+ if ((fmt != archNull || compress_spec.algorithm != PG_COMPRESSION_NONE) &&
(AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
{
if (mode == archModeWrite)
@@ -3705,7 +3675,7 @@ WriteHead(ArchiveHandle *AH)
AH->WriteBytePtr(AH, AH->intSize);
AH->WriteBytePtr(AH, AH->offSize);
AH->WriteBytePtr(AH, AH->format);
- WriteInt(AH, AH->compression);
+ WriteInt(AH, AH->compress_spec.level);
crtm = *localtime(&AH->createDate);
WriteInt(AH, crtm.tm_sec);
WriteInt(AH, crtm.tm_min);
@@ -3776,21 +3746,25 @@ ReadHead(ArchiveHandle *AH)
pg_fatal("expected format (%d) differs from format found in file (%d)",
AH->format, fmt);
+ AH->compress_spec.algorithm = PG_COMPRESSION_NONE;
if (AH->version >= K_VERS_1_2)
{
if (AH->version < K_VERS_1_4)
- AH->compression = AH->ReadBytePtr(AH);
+ AH->compress_spec.level = AH->ReadBytePtr(AH);
else
- AH->compression = ReadInt(AH);
+ AH->compress_spec.level = ReadInt(AH);
}
else
- AH->compression = Z_DEFAULT_COMPRESSION;
+ AH->compress_spec.level = Z_DEFAULT_COMPRESSION;
+ if (AH->compress_spec.level != INT_MIN)
#ifndef HAVE_LIBZ
- if (AH->compression != 0)
pg_log_warning("archive is compressed, but this installation does not support compression -- no data will be available");
+#else
+ AH->compress_spec.algorithm = PG_COMPRESSION_GZIP;
#endif
+
if (AH->version >= K_VERS_1_4)
{
struct tm crtm;
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 084cd87e8d..341d406515 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -32,30 +32,6 @@
#define LOBBUFSIZE 16384
-#ifdef HAVE_LIBZ
-#include <zlib.h>
-#define GZCLOSE(fh) gzclose(fh)
-#define GZWRITE(p, s, n, fh) gzwrite(fh, p, (n) * (s))
-#define GZREAD(p, s, n, fh) gzread(fh, p, (n) * (s))
-#define GZEOF(fh) gzeof(fh)
-#else
-#define GZCLOSE(fh) fclose(fh)
-#define GZWRITE(p, s, n, fh) (fwrite(p, s, n, fh) * (s))
-#define GZREAD(p, s, n, fh) fread(p, s, n, fh)
-#define GZEOF(fh) feof(fh)
-/* this is just the redefinition of a libz constant */
-#define Z_DEFAULT_COMPRESSION (-1)
-
-typedef struct _z_stream
-{
- void *next_in;
- void *next_out;
- size_t avail_in;
- size_t avail_out;
-} z_stream;
-typedef z_stream *z_streamp;
-#endif
-
/* Data block types */
#define BLK_DATA 1
#define BLK_BLOBS 3
@@ -319,8 +295,7 @@ struct _archiveHandle
char *fSpec; /* Archive File Spec */
FILE *FH; /* General purpose file handle */
- void *OF;
- int gzOut; /* Output file */
+ void *OF; /* Output file */
struct _tocEntry *toc; /* Header of circular list of TOC entries */
int tocCount; /* Number of TOC entries */
@@ -331,14 +306,8 @@ struct _archiveHandle
DumpId *tableDataId; /* TABLE DATA ids, indexed by table dumpId */
struct _tocEntry *currToc; /* Used when dumping data */
- int compression; /*---------
- * Compression requested on open().
- * Possible values for compression:
- * -1 Z_DEFAULT_COMPRESSION
- * 0 COMPRESSION_NONE
- * 1-9 levels for gzip compression
- *---------
- */
+ pg_compress_specification compress_spec; /* Requested specification for
+ * compression */
bool dosync; /* data requested to be synced on sight */
ArchiveMode mode; /* File mode - r or w */
void *formatData; /* Header data specific to file format */
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index 1023fea01b..43ccbe1339 100644
--- a/src/bin/pg_dump/pg_backup_custom.c
+++ b/src/bin/pg_dump/pg_backup_custom.c
@@ -298,7 +298,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
_WriteByte(AH, BLK_DATA); /* Block type */
WriteInt(AH, te->dumpId); /* For sanity check */
- ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
+ ctx->cs = AllocateCompressor(AH->compress_spec, _CustomWriteFunc);
}
/*
@@ -377,7 +377,7 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
WriteInt(AH, oid);
- ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
+ ctx->cs = AllocateCompressor(AH->compress_spec, _CustomWriteFunc);
}
/*
@@ -566,7 +566,7 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te)
static void
_PrintData(ArchiveHandle *AH)
{
- ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);
+ ReadDataFromArchive(AH, AH->compress_spec, _CustomReadFunc);
}
static void
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 3f46f7988a..17c7130c75 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -327,7 +327,8 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
setFilePath(AH, fname, tctx->filename);
- ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
+ ctx->dataFH = cfopen_write(fname, PG_BINARY_W,
+ AH->compress_spec);
if (ctx->dataFH == NULL)
pg_fatal("could not open output file \"%s\": %m", fname);
}
@@ -573,6 +574,7 @@ _CloseArchive(ArchiveHandle *AH)
if (AH->mode == archModeWrite)
{
cfp *tocFH;
+ pg_compress_specification compress_spec;
char fname[MAXPGPATH];
setFilePath(AH, fname, "toc.dat");
@@ -581,7 +583,8 @@ _CloseArchive(ArchiveHandle *AH)
ctx->pstate = ParallelBackupStart(AH);
/* The TOC is always created uncompressed */
- tocFH = cfopen_write(fname, PG_BINARY_W, 0);
+ compress_spec.algorithm = PG_COMPRESSION_NONE;
+ tocFH = cfopen_write(fname, PG_BINARY_W, compress_spec);
if (tocFH == NULL)
pg_fatal("could not open output file \"%s\": %m", fname);
ctx->dataFH = tocFH;
@@ -639,12 +642,14 @@ static void
_StartBlobs(ArchiveHandle *AH, TocEntry *te)
{
lclContext *ctx = (lclContext *) AH->formatData;
+ pg_compress_specification compress_spec;
char fname[MAXPGPATH];
setFilePath(AH, fname, "blobs.toc");
/* The blob TOC file is never compressed */
- ctx->blobsTocFH = cfopen_write(fname, "ab", 0);
+ compress_spec.algorithm = PG_COMPRESSION_NONE;
+ ctx->blobsTocFH = cfopen_write(fname, "ab", compress_spec);
if (ctx->blobsTocFH == NULL)
pg_fatal("could not open output file \"%s\": %m", fname);
}
@@ -662,7 +667,7 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
- ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
+ ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compress_spec);
if (ctx->dataFH == NULL)
pg_fatal("could not open output file \"%s\": %m", fname);
diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c
index bfc49b66d2..085f5c7f20 100644
--- a/src/bin/pg_dump/pg_backup_tar.c
+++ b/src/bin/pg_dump/pg_backup_tar.c
@@ -35,6 +35,7 @@
#include <unistd.h>
#include "common/file_utils.h"
+#include "compress_io.h"
#include "fe_utils/string_utils.h"
#include "pg_backup_archiver.h"
#include "pg_backup_tar.h"
@@ -194,7 +195,7 @@ InitArchiveFmt_Tar(ArchiveHandle *AH)
* possible since gzdopen uses buffered IO which totally screws file
* positioning.
*/
- if (AH->compression != 0)
+ if (AH->compress_spec.algorithm != PG_COMPRESSION_NONE)
pg_fatal("compression is not supported by tar archive format");
}
else
@@ -328,7 +329,7 @@ tarOpen(ArchiveHandle *AH, const char *filename, char mode)
}
}
- if (AH->compression == 0)
+ if (AH->compress_spec.algorithm == PG_COMPRESSION_NONE)
tm->nFH = ctx->tarFH;
else
pg_fatal("compression is not supported by tar archive format");
@@ -383,7 +384,7 @@ tarOpen(ArchiveHandle *AH, const char *filename, char mode)
umask(old_umask);
- if (AH->compression == 0)
+ if (AH->compress_spec.algorithm == PG_COMPRESSION_NONE)
tm->nFH = tm->tmpFH;
else
pg_fatal("compression is not supported by tar archive format");
@@ -401,7 +402,7 @@ tarOpen(ArchiveHandle *AH, const char *filename, char mode)
static void
tarClose(ArchiveHandle *AH, TAR_MEMBER *th)
{
- if (AH->compression != 0)
+ if (AH->compress_spec.algorithm != PG_COMPRESSION_NONE)
pg_fatal("compression is not supported by tar archive format");
if (th->mode == 'w')
@@ -800,7 +801,6 @@ _CloseArchive(ArchiveHandle *AH)
memcpy(ropt, AH->public.ropt, sizeof(RestoreOptions));
ropt->filename = NULL;
ropt->dropSchema = 1;
- ropt->compression = 0;
ropt->superuser = NULL;
ropt->suppressDumpWarnings = true;
@@ -888,7 +888,7 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
if (oid == 0)
pg_fatal("invalid OID for large object (%u)", oid);
- if (AH->compression != 0)
+ if (AH->compress_spec.algorithm != PG_COMPRESSION_NONE)
pg_fatal("compression is not supported by tar archive format");
sprintf(fname, "blob_%u.dat", oid);
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index c871cb727d..dd214e7670 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -54,7 +54,9 @@
#include "catalog/pg_subscription.h"
#include "catalog/pg_trigger_d.h"
#include "catalog/pg_type_d.h"
+#include "common/compression.h"
#include "common/connect.h"
+#include "compress_io.h"
#include "dumputils.h"
#include "fe_utils/option_utils.h"
#include "fe_utils/string_utils.h"
@@ -163,6 +165,8 @@ static void setup_connection(Archive *AH,
const char *dumpencoding, const char *dumpsnapshot,
char *use_role);
static ArchiveFormat parseArchiveFormat(const char *format, ArchiveMode *mode);
+static bool parse_compression_option(const char *opt,
+ pg_compress_specification *compress_spec);
static void expand_schema_name_patterns(Archive *fout,
SimpleStringList *patterns,
SimpleOidList *oids,
@@ -339,8 +343,9 @@ main(int argc, char **argv)
const char *dumpsnapshot = NULL;
char *use_role = NULL;
int numWorkers = 1;
- int compressLevel = -1;
int plainText = 0;
+ pg_compress_specification compress_spec = {0};
+ bool user_compression_defined = false;
ArchiveFormat archiveFormat = archUnknown;
ArchiveMode archiveMode;
@@ -560,10 +565,10 @@ main(int argc, char **argv)
dopt.aclsSkip = true;
break;
- case 'Z': /* Compression Level */
- if (!option_parse_int(optarg, "-Z/--compress", 0, 9,
- &compressLevel))
+ case 'Z': /* Compression */
+ if (!parse_compression_option(optarg, &compress_spec))
exit_nicely(1);
+ user_compression_defined = true;
break;
case 0:
@@ -686,23 +691,23 @@ main(int argc, char **argv)
if (archiveFormat == archNull)
plainText = 1;
- /* Custom and directory formats are compressed by default, others not */
- if (compressLevel == -1)
+ /*
+ * Custom and directory formats are compressed by default (zlib), others
+ * not
+ */
+ if (user_compression_defined == false)
{
+ compress_spec.algorithm = PG_COMPRESSION_NONE;
+ compress_spec.level = INT_MIN;
#ifdef HAVE_LIBZ
if (archiveFormat == archCustom || archiveFormat == archDirectory)
- compressLevel = Z_DEFAULT_COMPRESSION;
- else
+ {
+ compress_spec.algorithm = PG_COMPRESSION_GZIP;
+ compress_spec.level = Z_DEFAULT_COMPRESSION;
+ }
#endif
- compressLevel = 0;
}
-#ifndef HAVE_LIBZ
- if (compressLevel != 0)
- pg_log_warning("requested compression not available in this installation -- archive will be uncompressed");
- compressLevel = 0;
-#endif
-
/*
* If emitting an archive format, we always want to emit a DATABASE item,
* in case --create is specified at pg_restore time.
@@ -715,8 +720,8 @@ main(int argc, char **argv)
pg_fatal("parallel backup only supported by the directory format");
/* Open the output file */
- fout = CreateArchive(filename, archiveFormat, compressLevel, dosync,
- archiveMode, setupDumpWorker);
+ fout = CreateArchive(filename, archiveFormat, compress_spec,
+ dosync, archiveMode, setupDumpWorker);
/* Make dump options accessible right away */
SetArchiveOptions(fout, &dopt, NULL);
@@ -947,10 +952,7 @@ main(int argc, char **argv)
ropt->sequence_data = dopt.sequence_data;
ropt->binary_upgrade = dopt.binary_upgrade;
- if (compressLevel == -1)
- ropt->compression = 0;
- else
- ropt->compression = compressLevel;
+ ropt->compress_spec = compress_spec;
ropt->suppressDumpWarnings = true; /* We've already shown them */
@@ -997,7 +999,8 @@ help(const char *progname)
printf(_(" -j, --jobs=NUM use this many parallel jobs to dump\n"));
printf(_(" -v, --verbose verbose mode\n"));
printf(_(" -V, --version output version information, then exit\n"));
- printf(_(" -Z, --compress=0-9 compression level for compressed formats\n"));
+ printf(_(" -Z, --compress=METHOD[:LEVEL]\n"
+ " compress as specified\n"));
printf(_(" --lock-wait-timeout=TIMEOUT fail after waiting TIMEOUT for a table lock\n"));
printf(_(" --no-sync do not wait for changes to be written safely to disk\n"));
printf(_(" -?, --help show this help, then exit\n"));
@@ -1257,6 +1260,110 @@ get_synchronized_snapshot(Archive *fout)
return result;
}
+/*
+ * Interprets a compression option of the format 'method[:LEVEL]' of legacy just
+ * '[LEVEL]'. In the later format, gzip is implied. The parsed method and level
+ * are returned in pg_compress_specification. In case of error, the function
+ * returns false.
+ */
+static bool
+parse_compression_option(const char *opt,
+ pg_compress_specification *compress_spec)
+{
+ char *method;
+ const char *sep;
+ int methodlen;
+ bool supports_compression = true;
+ bool res = true;
+
+ /* set defaults */
+ compress_spec->algorithm = PG_COMPRESSION_NONE;
+ compress_spec->level = INT_MIN;
+
+ /* find the separator if exists */
+ sep = strchr(opt, ':');
+
+ /*
+ * If there is no separator, then it is either a legacy format, or only
+ * the method has been passed.
+ */
+ if (!sep)
+ {
+ if (strspn(opt, "-0123456789") == strlen(opt))
+ {
+ res = option_parse_int(opt, "-Z/--compress", 0, 9,
+ &(compress_spec->level));
+ compress_spec->algorithm = (compress_spec->level > 0) ?
+ PG_COMPRESSION_GZIP :
+ PG_COMPRESSION_NONE;
+ }
+ else
+ {
+ method = pg_strdup(opt);
+ res = parse_compress_algorithm(method,
+ &compress_spec->algorithm);
+ if (!res)
+ pg_log_error("invalid compression method \"%s\" (gzip, none)",
+ method);
+
+ pg_free(method);
+ }
+ }
+ else
+ {
+ /* otherwise, it should be method:LEVEL */
+ methodlen = sep - opt + 1;
+ method = pg_malloc0(methodlen);
+ snprintf(method, methodlen, "%.*s", methodlen - 1, opt);
+
+ res = parse_compress_algorithm(method, &compress_spec->algorithm);
+ if (res)
+ {
+ char *error_detail = NULL;
+ char *detail;
+
+ sep++;
+ detail = pg_strdup(sep);
+ parse_compress_specification(compress_spec->algorithm,
+ detail,
+ compress_spec);
+ error_detail = validate_compress_specification(compress_spec);
+ if (error_detail != NULL)
+ {
+ pg_log_error("invalid compression specification: %s", error_detail);
+ res = false;
+ }
+
+ pg_free(detail);
+ }
+
+ pg_free(method);
+ }
+
+ /* there is no need to check further when an error is already detected */
+ if (!res)
+ return false;
+
+ /* verify that the requested compression is supported */
+
+ if (compress_spec->algorithm != PG_COMPRESSION_NONE &&
+ compress_spec->algorithm != PG_COMPRESSION_GZIP)
+ supports_compression = false;
+
+#ifndef HAVE_LIBZ
+ if (compress_spec->algorithm == PG_COMPRESSION_GZIP)
+ supports_compression = false;
+#endif
+
+ if (!supports_compression)
+ {
+ pg_log_warning("requested compression not available in this installation -- archive will be uncompressed");
+ compress_spec->algorithm = PG_COMPRESSION_NONE;
+ }
+
+ return true;
+}
+
static ArchiveFormat
parseArchiveFormat(const char *format, ArchiveMode *mode)
{
diff --git a/src/bin/pg_dump/t/001_basic.pl b/src/bin/pg_dump/t/001_basic.pl
index a583c8a6d2..e1f1a8801c 100644
--- a/src/bin/pg_dump/t/001_basic.pl
+++ b/src/bin/pg_dump/t/001_basic.pl
@@ -120,6 +120,16 @@ command_fails_like(
qr/\Qpg_restore: error: cannot specify both --single-transaction and multiple jobs\E/,
'pg_restore: cannot specify both --single-transaction and multiple jobs');
+command_fails_like(
+ [ 'pg_dump', '--compress', 'garbage' ],
+ qr/\Qpg_dump: error: invalid compression method "garbage" (gzip, none)\E/,
+ 'pg_dump: invalid --compress');
+
+command_fails_like(
+ [ 'pg_dump', '--compress', 'none:1' ],
+ qr/\Qpg_dump: error: invalid compression specification: compression algorithm "none" does not accept a compression level\E/,
+ 'pg_dump: invalid compression specification: compression algorithm "none" does not accept a compression level');
+
command_fails_like(
[ 'pg_dump', '-Z', '-1' ],
qr/\Qpg_dump: error: -Z\/--compress must be in range 0..9\E/,
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 1f08716f69..a02b578750 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -87,7 +87,7 @@ my %pgdump_runs = (
compile_option => 'gzip',
dump_cmd => [
'pg_dump', '--jobs=2',
- '--format=directory', '--compress=1',
+ '--format=directory', '--compress=gzip:1',
"--file=$tempdir/compression_gzip_dir", 'postgres',
],
# Give coverage for manually compressed blob.toc files during
--
2.34.1
v7-0003-Add-LZ4-compression-in-pg_-dump-restore.patchtext/x-patch; name=v7-0003-Add-LZ4-compression-in-pg_-dump-restore.patchDownload
From 8f07443e04d2e610414d023e8aa443d8aa785dfd Mon Sep 17 00:00:00 2001
From: Georgios Kokolatos <gkokolatos@pm.me>
Date: Tue, 5 Jul 2022 12:34:08 +0000
Subject: [PATCH v7 3/3] Add LZ4 compression in pg_{dump|restore}
Within compress_lz4.{c,h} the streaming API and a file API compression is
implemented.. The first one, is aimed at inlined use cases and thus simple
lz4.h calls can be used directly. The second one is generating output, or is
parsing input, which can be read/generated via the lz4 utility.
Wherever the LZ4F api does not implement all the functionality corresponding to
fread(), fwrite(), fgets(), fgetc(), feof(), and fclose(), it has been
implemented localy.
Custom compressed archives need to now store the compression method in their
header. This requires a bump in the version number. The level of compression is
still stored in the dump, though admittedly is of no apparent use.
---
doc/src/sgml/ref/pg_dump.sgml | 23 +-
src/bin/pg_dump/Makefile | 2 +
src/bin/pg_dump/compress_io.c | 41 +-
src/bin/pg_dump/compress_lz4.c | 593 +++++++++++++++++++++++++++
src/bin/pg_dump/compress_lz4.h | 9 +
src/bin/pg_dump/pg_backup_archiver.c | 73 ++--
src/bin/pg_dump/pg_backup_archiver.h | 4 +-
src/bin/pg_dump/pg_dump.c | 8 +-
src/bin/pg_dump/t/001_basic.pl | 2 +-
src/bin/pg_dump/t/002_pg_dump.pl | 69 +++-
10 files changed, 769 insertions(+), 55 deletions(-)
create mode 100644 src/bin/pg_dump/compress_lz4.c
create mode 100644 src/bin/pg_dump/compress_lz4.h
diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index 94885d0812..1097488b83 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -328,9 +328,10 @@ PostgreSQL documentation
machine-readable format that <application>pg_restore</application>
can read. A directory format archive can be manipulated with
standard Unix tools; for example, files in an uncompressed archive
- can be compressed with the <application>gzip</application> tool.
- This format is compressed by default and also supports parallel
- dumps.
+ can be compressed with the <application>gzip</application> or
+ <application>lz4</application>tool.
+ This format is compressed by default using <literal>gzip</literal>
+ and also supports parallel dumps.
</para>
</listitem>
</varlistentry>
@@ -652,12 +653,12 @@ PostgreSQL documentation
<para>
Specify the compression method and/or the compression level to use.
The compression method can be set to <literal>gzip</literal> or
- <literal>none</literal> for no compression. A compression level can
- be optionally specified, by appending the level number after a colon
- (<literal>:</literal>). If no level is specified, the default compression
- level will be used for the specified method. If only a level is
- specified without mentioning a method, <literal>gzip</literal> compression
- will be used.
+ <literal>lz4</literal> or <literal>none</literal> for no compression. A
+ compression level can be optionally specified, by appending the level
+ number after a colon (<literal>:</literal>). If no level is specified,
+ the default compression level will be used for the specified method. If
+ only a level is specified without mentioning a method,
+ <literal>gzip</literal> compression willbe used.
</para>
<para>
@@ -665,8 +666,8 @@ PostgreSQL documentation
individual table-data segments, and the default is to compress using
<literal>gzip</literal> at a moderate level. For plain text output,
setting a nonzero compression level causes the entire output file to be compressed,
- as though it had been fed through <application>gzip</application>; but the default
- is not to compress.
+ as though it had been fed through <application>gzip</application> or
+ <application>lz4</application>; but the default is not to compress.
</para>
<para>
The tar archive format currently does not support compression at all.
diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile
index 2d777ec213..1f7fd8d1f0 100644
--- a/src/bin/pg_dump/Makefile
+++ b/src/bin/pg_dump/Makefile
@@ -17,6 +17,7 @@ top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
export GZIP_PROGRAM=$(GZIP)
+export LZ4
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport)
@@ -24,6 +25,7 @@ LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport)
OBJS = \
$(WIN32RES) \
compress_gzip.o \
+ compress_lz4.o \
compress_io.o \
dumputils.o \
parallel.o \
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 1948ee3d57..4ef1cb5291 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -38,13 +38,15 @@
* ----------------------
*
* The compressed stream API is a wrapper around the C standard fopen() and
- * libz's gzopen() APIs. It allows you to use the same functions for
- * compressed and uncompressed streams. cfopen_read() first tries to open
- * the file with given name, and if it fails, it tries to open the same
- * file with the .gz suffix. cfopen_write() opens a file for writing, an
- * extra argument specifies if the file should be compressed, and adds the
- * .gz suffix to the filename if so. This allows you to easily handle both
- * compressed and uncompressed files.
+ * libz's gzopen() APIs and custom LZ4 calls which provide similar
+ * functionality. It allows you to use the same functions for compressed and
+ * uncompressed streams. cfopen_read() first tries to open the file with given
+ * name, and if it fails, it tries to open the same file with the .gz suffix,
+ * failing that it tries to open the same file with the .lz4 suffix.
+ * cfopen_write() opens a file for writing, an extra argument specifies the
+ * method to use should the file be compressed, and adds the appropriate
+ * suffix, .gz or .lz4, to the filename if so. This allows you to easily handle
+ * both compressed and uncompressed files.
*
* IDENTIFICATION
* src/bin/pg_dump/compress_io.c
@@ -57,6 +59,7 @@
#include "compress_io.h"
#include "compress_gzip.h"
+#include "compress_lz4.h"
#include "pg_backup_utils.h"
/*----------------------
@@ -125,6 +128,9 @@ AllocateCompressor(const pg_compress_specification compress_spec,
case PG_COMPRESSION_GZIP:
InitCompressorGzip(cs, compress_spec.level);
break;
+ case PG_COMPRESSION_LZ4:
+ InitCompressorLZ4(cs, compress_spec.level);
+ break;
default:
pg_fatal("invalid compression method");
break;
@@ -175,6 +181,7 @@ free_keep_errno(void *p)
/*
* Compression None implementation
*/
+
static size_t
_read(void *ptr, size_t size, CompressFileHandle * CFH)
{
@@ -310,6 +317,9 @@ InitCompressFileHandle(const pg_compress_specification compress_spec)
case PG_COMPRESSION_GZIP:
InitCompressGzip(CFH, compress_spec.level);
break;
+ case PG_COMPRESSION_LZ4:
+ InitCompressLZ4(CFH, compress_spec.level);
+ break;
default:
pg_fatal("invalid compression method");
break;
@@ -322,12 +332,12 @@ InitCompressFileHandle(const pg_compress_specification compress_spec)
* Open a file for reading. 'path' is the file to open, and 'mode' should
* be either "r" or "rb".
*
- * If the file at 'path' does not exist, we append the ".gz" suffix (if
+ * If the file at 'path' does not exist, we append the "{.gz,.lz4}" suffix (i
* 'path' doesn't already have it) and try again. So if you pass "foo" as
- * 'path', this will open either "foo" or "foo.gz", trying in that order.
+ * 'path', this will open either "foo" or "foo.gz" or "foo.lz4", trying in that
+ * order.
*
* On failure, return NULL with an error code in errno.
- *
*/
CompressFileHandle *
InitDiscoverCompressFileHandle(const char *path, const char *mode)
@@ -363,6 +373,17 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
if (exists)
compress_spec.algorithm = PG_COMPRESSION_GZIP;
}
+#endif
+#ifdef USE_LZ4
+ if (!exists)
+ {
+ free_keep_errno(fname);
+ fname = psprintf("%s.lz4", path);
+ exists = (stat(fname, &st) == 0);
+
+ if (exists)
+ compress_spec.algorithm = PG_COMPRESSION_LZ4;
+ }
#endif
}
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
new file mode 100644
index 0000000000..6f4680c344
--- /dev/null
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -0,0 +1,593 @@
+#include "postgres_fe.h"
+#include "pg_backup_utils.h"
+
+#include "compress_lz4.h"
+
+#ifdef USE_LZ4
+#include <lz4.h>
+#include <lz4frame.h>
+
+#define LZ4_OUT_SIZE (4 * 1024)
+#define LZ4_IN_SIZE (16 * 1024)
+
+/*----------------------
+ * Compressor API
+ *----------------------
+ */
+
+typedef struct LZ4CompressorState
+{
+ char *outbuf;
+ size_t outsize;
+} LZ4CompressorState;
+
+/* Private routines that support LZ4 compressed data I/O */
+static void ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs);
+static void WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
+ const void *data, size_t dLen);
+static void EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs);
+
+static void
+ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
+{
+ LZ4_streamDecode_t lz4StreamDecode;
+ char *buf;
+ char *decbuf;
+ size_t buflen;
+ size_t cnt;
+
+ buflen = LZ4_IN_SIZE;
+ buf = pg_malloc(buflen);
+ decbuf = pg_malloc(buflen);
+
+ LZ4_setStreamDecode(&lz4StreamDecode, NULL, 0);
+
+ while ((cnt = cs->readF(AH, &buf, &buflen)))
+ {
+ int decBytes = LZ4_decompress_safe_continue(&lz4StreamDecode,
+ buf, decbuf,
+ cnt, buflen);
+
+ ahwrite(decbuf, 1, decBytes, AH);
+ }
+
+ pg_free(buf);
+ pg_free(decbuf);
+}
+
+static void
+WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
+ const void *data, size_t dLen)
+{
+ LZ4CompressorState *LZ4cs = (LZ4CompressorState *) cs->private;
+ size_t compressed;
+ size_t requiredsize = LZ4_compressBound(dLen);
+
+ if (requiredsize > LZ4cs->outsize)
+ {
+ LZ4cs->outbuf = pg_realloc(LZ4cs->outbuf, requiredsize);
+ LZ4cs->outsize = requiredsize;
+ }
+
+ compressed = LZ4_compress_default(data, LZ4cs->outbuf,
+ dLen, LZ4cs->outsize);
+
+ if (compressed <= 0)
+ pg_fatal("failed to LZ4 compress data");
+
+ cs->writeF(AH, LZ4cs->outbuf, compressed);
+}
+
+static void
+EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
+{
+ LZ4CompressorState *LZ4cs;
+
+ LZ4cs = (LZ4CompressorState *) cs->private;
+ if (LZ4cs)
+ {
+ pg_free(LZ4cs->outbuf);
+ pg_free(LZ4cs);
+ cs->private = NULL;
+ }
+}
+
+
+/* Public routines that support LZ4 compressed data I/O */
+void
+InitCompressorLZ4(CompressorState *cs, int compressionLevel)
+{
+ cs->readData = ReadDataFromArchiveLZ4;
+ cs->writeData = WriteDataToArchiveLZ4;
+ cs->end = EndCompressorLZ4;
+
+ /* Will be lazy init'd */
+ cs->private = pg_malloc0(sizeof(LZ4CompressorState));
+}
+
+/*----------------------
+ * Compress File API
+ *----------------------
+ */
+
+/*
+ * State needed for LZ4 (de)compression using the CompressFileHandle API.
+ */
+typedef struct LZ4File
+{
+ FILE *fp;
+
+ LZ4F_preferences_t prefs;
+
+ LZ4F_compressionContext_t ctx;
+ LZ4F_decompressionContext_t dtx;
+
+ bool inited;
+ bool compressing;
+
+ size_t buflen;
+ char *buffer;
+
+ size_t overflowalloclen;
+ size_t overflowlen;
+ char *overflowbuf;
+
+ size_t errcode;
+} LZ4File;
+
+/*
+ * LZ4 equivalent to feof() or gzeof(). The end of file
+ * is reached if there is no decompressed output in the
+ * overflow buffer and the end of the file is reached.
+ */
+static int
+LZ4File_eof(CompressFileHandle * CFH)
+{
+ LZ4File *fs = (LZ4File *) CFH->private;
+
+ return fs->overflowlen == 0 && feof(fs->fp);
+}
+
+static const char *
+LZ4File_get_error(CompressFileHandle * CFH)
+{
+ LZ4File *fs = (LZ4File *) CFH->private;
+ const char *errmsg;
+
+ if (LZ4F_isError(fs->errcode))
+ errmsg = LZ4F_getErrorName(fs->errcode);
+ else
+ errmsg = strerror(errno);
+
+ return errmsg;
+}
+
+/*
+ * Prepare an already alloc'ed LZ4File struct for subsequent calls.
+ *
+ * It creates the nessary contexts for the operations. When compressing,
+ * it additionally writes the LZ4 header in the output stream.
+ */
+static int
+LZ4File_init(LZ4File * fs, int size, bool compressing)
+{
+ size_t status;
+
+ if (fs->inited)
+ return 0;
+
+ fs->compressing = compressing;
+ fs->inited = true;
+
+ if (fs->compressing)
+ {
+ fs->buflen = LZ4F_compressBound(LZ4_IN_SIZE, &fs->prefs);
+ if (fs->buflen < LZ4F_HEADER_SIZE_MAX)
+ fs->buflen = LZ4F_HEADER_SIZE_MAX;
+
+ status = LZ4F_createCompressionContext(&fs->ctx, LZ4F_VERSION);
+ if (LZ4F_isError(status))
+ {
+ fs->errcode = status;
+ return 1;
+ }
+
+ fs->buffer = pg_malloc(fs->buflen);
+ status = LZ4F_compressBegin(fs->ctx, fs->buffer, fs->buflen,
+ &fs->prefs);
+
+ if (LZ4F_isError(status))
+ {
+ fs->errcode = status;
+ return 1;
+ }
+
+ if (fwrite(fs->buffer, 1, status, fs->fp) != status)
+ {
+ errno = errno ? : ENOSPC;
+ return 1;
+ }
+ }
+ else
+ {
+ status = LZ4F_createDecompressionContext(&fs->dtx, LZ4F_VERSION);
+ if (LZ4F_isError(status))
+ {
+ fs->errcode = status;
+ return 1;
+ }
+
+ fs->buflen = size > LZ4_OUT_SIZE ? size : LZ4_OUT_SIZE;
+ fs->buffer = pg_malloc(fs->buflen);
+
+ fs->overflowalloclen = fs->buflen;
+ fs->overflowbuf = pg_malloc(fs->overflowalloclen);
+ fs->overflowlen = 0;
+ }
+
+ return 0;
+}
+
+/*
+ * Read already decompressed content from the overflow buffer into 'ptr' up to
+ * 'size' bytes, if available. If the eol_flag is set, then stop at the first
+ * occurance of the new line char prior to 'size' bytes.
+ *
+ * Any unread content in the overflow buffer, is moved to the beginning.
+ */
+static int
+LZ4File_read_overflow(LZ4File * fs, void *ptr, int size, bool eol_flag)
+{
+ char *p;
+ int readlen = 0;
+
+ if (fs->overflowlen == 0)
+ return 0;
+
+ if (fs->overflowlen >= size)
+ readlen = size;
+ else
+ readlen = fs->overflowlen;
+
+ if (eol_flag && (p = memchr(fs->overflowbuf, '\n', readlen)))
+ /* Include the line terminating char */
+ readlen = p - fs->overflowbuf + 1;
+
+ memcpy(ptr, fs->overflowbuf, readlen);
+ fs->overflowlen -= readlen;
+
+ if (fs->overflowlen > 0)
+ memmove(fs->overflowbuf, fs->overflowbuf + readlen, fs->overflowlen);
+
+ return readlen;
+}
+
+/*
+ * The workhorse for reading decompressed content out of an LZ4 compressed
+ * stream.
+ *
+ * It will read up to 'ptrsize' decompressed content, or up to the new line char
+ * if found first when the eol_flag is set. It is possible that the decompressed
+ * output generated by reading any compressed input via the LZ4F API, exceeds
+ * 'ptrsize'. Any exceeding decompressed content is stored at an overflow
+ * buffer within LZ4File. Of course, when the function is called, it will first
+ * try to consume any decompressed content already present in the overflow
+ * buffer, before decompressing new content.
+ */
+static int
+LZ4File_read_internal(LZ4File * fs, void *ptr, int ptrsize, bool eol_flag)
+{
+ size_t dsize = 0;
+ size_t rsize;
+ size_t size = ptrsize;
+ bool eol_found = false;
+
+ void *readbuf;
+
+ /* Lazy init */
+ if (!fs->inited && LZ4File_init(fs, size, false /* decompressing */ ))
+ return -1;
+
+ /* Verfiy that there is enough space in the outbuf */
+ if (size > fs->buflen)
+ {
+ fs->buflen = size;
+ fs->buffer = pg_realloc(fs->buffer, size);
+ }
+
+ /* use already decompressed content if available */
+ dsize = LZ4File_read_overflow(fs, ptr, size, eol_flag);
+ if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize)))
+ return dsize;
+
+ readbuf = pg_malloc(size);
+
+ do
+ {
+ char *rp;
+ char *rend;
+
+ rsize = fread(readbuf, 1, size, fs->fp);
+ if (rsize < size && !feof(fs->fp))
+ return -1;
+
+ rp = (char *) readbuf;
+ rend = (char *) readbuf + rsize;
+
+ while (rp < rend)
+ {
+ size_t status;
+ size_t outlen = fs->buflen;
+ size_t read_remain = rend - rp;
+
+ memset(fs->buffer, 0, outlen);
+ status = LZ4F_decompress(fs->dtx, fs->buffer, &outlen,
+ rp, &read_remain, NULL);
+ if (LZ4F_isError(status))
+ {
+ fs->errcode = status;
+ return -1;
+ }
+
+ rp += read_remain;
+
+ /*
+ * fill in what space is available in ptr if the eol flag is set,
+ * either skip if one already found or fill up to EOL if present
+ * in the outbuf
+ */
+ if (outlen > 0 && dsize < size && eol_found == false)
+ {
+ char *p;
+ size_t lib = (eol_flag == 0) ? size - dsize : size - 1 - dsize;
+ size_t len = outlen < lib ? outlen : lib;
+
+ if (eol_flag == true &&
+ (p = memchr(fs->buffer, '\n', outlen)) &&
+ (size_t) (p - fs->buffer + 1) <= len)
+ {
+ len = p - fs->buffer + 1;
+ eol_found = true;
+ }
+
+ memcpy((char *) ptr + dsize, fs->buffer, len);
+ dsize += len;
+
+ /* move what did not fit, if any, at the begining of the buf */
+ if (len < outlen)
+ memmove(fs->buffer, fs->buffer + len, outlen - len);
+ outlen -= len;
+ }
+
+ /* if there is available output, save it */
+ if (outlen > 0)
+ {
+ while (fs->overflowlen + outlen > fs->overflowalloclen)
+ {
+ fs->overflowalloclen *= 2;
+ fs->overflowbuf = pg_realloc(fs->overflowbuf,
+ fs->overflowalloclen);
+ }
+
+ memcpy(fs->overflowbuf + fs->overflowlen, fs->buffer, outlen);
+ fs->overflowlen += outlen;
+ }
+ }
+ } while (rsize == size && dsize < size && eol_found == 0);
+
+ pg_free(readbuf);
+
+ return (int) dsize;
+}
+
+/*
+ * Compress size bytes from ptr and write them to the stream.
+ */
+static size_t
+LZ4File_write(const void *ptr, size_t size, CompressFileHandle * CFH)
+{
+ LZ4File *fs = (LZ4File *) CFH->private;
+ size_t status;
+ int remaining = size;
+
+ if (!fs->inited && LZ4File_init(fs, size, true))
+ return -1;
+
+ while (remaining > 0)
+ {
+ int chunk = remaining < LZ4_IN_SIZE ? remaining : LZ4_IN_SIZE;
+
+ remaining -= chunk;
+
+ status = LZ4F_compressUpdate(fs->ctx, fs->buffer, fs->buflen,
+ ptr, chunk, NULL);
+ if (LZ4F_isError(status))
+ {
+ fs->errcode = status;
+ return -1;
+ }
+
+ if (fwrite(fs->buffer, 1, status, fs->fp) != status)
+ {
+ errno = errno ? : ENOSPC;
+ return 1;
+ }
+ }
+
+ return size;
+}
+
+/*
+ * fread() equivalent implementation for LZ4 compressed files.
+ */
+static size_t
+LZ4File_read(void *ptr, size_t size, CompressFileHandle * CFH)
+{
+ LZ4File *fs = (LZ4File *) CFH->private;
+ int ret;
+
+ ret = LZ4File_read_internal(fs, ptr, size, false);
+ if (ret != size && !LZ4File_eof(CFH))
+ pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
+
+ return ret;
+}
+
+/*
+ * fgetc() equivalent implementation for LZ4 compressed files.
+ */
+static int
+LZ4File_getc(CompressFileHandle * CFH)
+{
+ LZ4File *fs = (LZ4File *) CFH->private;
+ unsigned char c;
+
+ if (LZ4File_read_internal(fs, &c, 1, false) != 1)
+ {
+ if (!LZ4File_eof(CFH))
+ pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
+ else
+ pg_fatal("could not read from input file: end of file");
+ }
+
+ return c;
+}
+
+/*
+ * fgets() equivalent implementation for LZ4 compressed files.
+ */
+static char *
+LZ4File_gets(char *ptr, int size, CompressFileHandle * CFH)
+{
+ LZ4File *fs = (LZ4File *) CFH->private;
+ size_t dsize;
+
+ dsize = LZ4File_read_internal(fs, ptr, size, true);
+ if (dsize < 0)
+ pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
+
+ /* Done reading */
+ if (dsize == 0)
+ return NULL;
+
+ return ptr;
+}
+
+/*
+ * Finalize (de)compression of a stream. When compressing it will write any
+ * remaining content and/or generated footer from the LZ4 API.
+ */
+static int
+LZ4File_close(CompressFileHandle * CFH)
+{
+ FILE *fp;
+ LZ4File *fs = (LZ4File *) CFH->private;
+ size_t status;
+ int ret;
+
+ fp = fs->fp;
+ if (fs->inited)
+ {
+ if (fs->compressing)
+ {
+ status = LZ4F_compressEnd(fs->ctx, fs->buffer, fs->buflen, NULL);
+ if (LZ4F_isError(status))
+ pg_fatal("failed to end compression: %s",
+ LZ4F_getErrorName(status));
+ else if ((ret = fwrite(fs->buffer, 1, status, fs->fp)) != status)
+ {
+ errno = errno ? : ENOSPC;
+ WRITE_ERROR_EXIT;
+ }
+
+ status = LZ4F_freeCompressionContext(fs->ctx);
+ if (LZ4F_isError(status))
+ pg_fatal("failed to end compression: %s",
+ LZ4F_getErrorName(status));
+ }
+ else
+ {
+ status = LZ4F_freeDecompressionContext(fs->dtx);
+ if (LZ4F_isError(status))
+ pg_fatal("failed to end decompression: %s",
+ LZ4F_getErrorName(status));
+ pg_free(fs->overflowbuf);
+ }
+
+ pg_free(fs->buffer);
+ }
+
+ pg_free(fs);
+
+ return fclose(fp);
+}
+
+static int
+LZ4File_open(const char *path, int fd, const char *mode,
+ CompressFileHandle * CFH)
+{
+ FILE *fp;
+ LZ4File *lz4fp = (LZ4File *) CFH->private;
+
+ if (fd >= 0)
+ fp = fdopen(fd, mode);
+ else
+ fp = fopen(path, mode);
+ if (fp == NULL)
+ {
+ lz4fp->errcode = errno;
+ return 1;
+ }
+
+ lz4fp->fp = fp;
+
+ return 0;
+}
+
+static int
+LZ4File_open_write(const char *path, const char *mode, CompressFileHandle * CFH)
+{
+ char *fname;
+ int ret;
+
+ fname = psprintf("%s.lz4", path);
+ ret = CFH->open(fname, -1, mode, CFH);
+ pg_free(fname);
+
+ return ret;
+}
+
+void
+InitCompressLZ4(CompressFileHandle * CFH, int compressionLevel)
+{
+ LZ4File *lz4fp;
+
+ CFH->open = LZ4File_open;
+ CFH->open_write = LZ4File_open_write;
+ CFH->read = LZ4File_read;
+ CFH->write = LZ4File_write;
+ CFH->gets = LZ4File_gets;
+ CFH->getc = LZ4File_getc;
+ CFH->eof = LZ4File_eof;
+ CFH->close = LZ4File_close;
+ CFH->get_error = LZ4File_get_error;
+
+ lz4fp = pg_malloc0(sizeof(*lz4fp));
+ if (compressionLevel >= 0)
+ lz4fp->prefs.compressionLevel = compressionLevel;
+
+ CFH->private = lz4fp;
+}
+#else /* USE_LZ4 */
+void
+InitCompressorLZ4(CompressorState *cs, int compressionLevel)
+{
+ pg_fatal("not built with LZ4 support");
+}
+
+void
+InitCompressLZ4(CompressFileHandle * CFH, int compressionLevel)
+{
+ pg_fatal("not built with LZ4 support");
+}
+#endif /* USE_LZ4 */
diff --git a/src/bin/pg_dump/compress_lz4.h b/src/bin/pg_dump/compress_lz4.h
new file mode 100644
index 0000000000..fbec9a508d
--- /dev/null
+++ b/src/bin/pg_dump/compress_lz4.h
@@ -0,0 +1,9 @@
+#ifndef _COMPRESS_LZ4_H_
+#define _COMPRESS_LZ4_H_
+
+#include "compress_io.h"
+
+extern void InitCompressorLZ4(CompressorState *cs, int compressionLevel);
+extern void InitCompressLZ4(CompressFileHandle * CFH, int compressionLevel);
+
+#endif /* _COMPRESS_LZ4_H_ */
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 186510c235..225c226a6e 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -353,6 +353,7 @@ RestoreArchive(Archive *AHX)
ArchiveHandle *AH = (ArchiveHandle *) AHX;
RestoreOptions *ropt = AH->public.ropt;
bool parallel_mode;
+ bool supports_compression;
TocEntry *te;
CompressFileHandle *sav;
@@ -382,17 +383,28 @@ RestoreArchive(Archive *AHX)
/*
* Make sure we won't need (de)compression we haven't got
*/
-#ifndef HAVE_LIBZ
- if (AH->compress_spec.algorithm == PG_COMPRESSION_GZIP &&
+ supports_compression = true;
+ if ((AH->compress_spec.algorithm == PG_COMPRESSION_GZIP ||
+ AH->compress_spec.algorithm == PG_COMPRESSION_LZ4) &&
AH->PrintTocDataPtr != NULL)
{
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
- pg_fatal("cannot restore from compressed archive (compression not supported in this installation)");
+ {
+#ifndef HAVE_LIBZ
+ if (AH->compress_algorithm == PG_COMPRESSION_GZIP)
+ supports_compression = false;
+#endif
+#ifndef USE_LZ4
+ if (AH->compress_algorithm == PG_COMPRESSION_LZ4)
+ supports_compression = false;
+#endif
+ if (supports_compression == false)
+ pg_fatal("cannot restore from compressed archive (compression not supported in this installation)");
+ }
}
}
-#endif
/*
* Prepare index arrays, so we can assume we have them throughout restore.
@@ -2028,6 +2040,18 @@ ReadStr(ArchiveHandle *AH)
return buf;
}
+static bool
+_fileExistsInDirectory(const char *dir, const char *filename)
+{
+ struct stat st;
+ char buf[MAXPGPATH];
+
+ if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
+ pg_fatal("directory name too long: \"%s\"", dir);
+
+ return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
+}
+
static int
_discoverArchiveFormat(ArchiveHandle *AH)
{
@@ -2054,30 +2078,20 @@ _discoverArchiveFormat(ArchiveHandle *AH)
/*
* Check if the specified archive is a directory. If so, check if
- * there's a "toc.dat" (or "toc.dat.gz") file in it.
+ * there's a "toc.dat" (or "toc.dat.{gz,lz4}") file in it.
*/
if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
{
- char buf[MAXPGPATH];
-
- if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH)
- pg_fatal("directory name too long: \"%s\"",
- AH->fSpec);
- if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
- {
- AH->format = archDirectory;
+ AH->format = archDirectory;
+ if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
return AH->format;
- }
-
#ifdef HAVE_LIBZ
- if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH)
- pg_fatal("directory name too long: \"%s\"",
- AH->fSpec);
- if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
- {
- AH->format = archDirectory;
+ if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
+ return AH->format;
+#endif
+#ifdef USE_LZ4
+ if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
return AH->format;
- }
#endif
pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
AH->fSpec);
@@ -3684,6 +3698,7 @@ WriteHead(ArchiveHandle *AH)
AH->WriteBytePtr(AH, AH->offSize);
AH->WriteBytePtr(AH, AH->format);
WriteInt(AH, AH->compress_spec.level);
+ AH->WriteBytePtr(AH, AH->compress_spec.algorithm);
crtm = *localtime(&AH->createDate);
WriteInt(AH, crtm.tm_sec);
WriteInt(AH, crtm.tm_min);
@@ -3765,14 +3780,20 @@ ReadHead(ArchiveHandle *AH)
else
AH->compress_spec.level = Z_DEFAULT_COMPRESSION;
- if (AH->compress_spec.level != INT_MIN)
+ if (AH->version >= K_VERS_1_15)
+ AH->compress_spec.algorithm = AH->ReadBytePtr(AH);
+ else if (AH->compress_spec.level != INT_MIN)
+ AH->compress_spec.algorithm = PG_COMPRESSION_GZIP;
+
#ifndef HAVE_LIBZ
+ if (AH->compress_spec.algorithm == PG_COMPRESSION_GZIP)
+ {
pg_log_warning("archive is compressed, but this installation does not support compression -- no data will be available");
-#else
- AH->compress_spec.algorithm = PG_COMPRESSION_GZIP;
+ AH->compress_spec.algorithm = PG_COMPRESSION_NONE;
+ AH->compress_spec.level = 0;
+ }
#endif
-
if (AH->version >= K_VERS_1_4)
{
struct tm crtm;
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 341d406515..f577e8fc61 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -65,10 +65,12 @@
#define K_VERS_1_13 MAKE_ARCHIVE_VERSION(1, 13, 0) /* change search_path
* behavior */
#define K_VERS_1_14 MAKE_ARCHIVE_VERSION(1, 14, 0) /* add tableam */
+#define K_VERS_1_15 MAKE_ARCHIVE_VERSION(1, 15, 0) /* add compressionMethod
+ * in header */
/* Current archive version number (the format we can output) */
#define K_VERS_MAJOR 1
-#define K_VERS_MINOR 14
+#define K_VERS_MINOR 15
#define K_VERS_REV 0
#define K_VERS_SELF MAKE_ARCHIVE_VERSION(K_VERS_MAJOR, K_VERS_MINOR, K_VERS_REV)
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index dd214e7670..e1253f7291 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -1303,7 +1303,7 @@ parse_compression_option(const char *opt,
res = parse_compress_algorithm(method,
&compress_spec->algorithm);
if (!res)
- pg_log_error("invalid compression method \"%s\" (gzip, none)",
+ pg_log_error("invalid compression method \"%s\" (gzip, lz4, none)",
method);
pg_free(method);
@@ -1345,8 +1345,8 @@ parse_compression_option(const char *opt,
return false;
/* verify that the requested compression is supported */
-
if (compress_spec->algorithm != PG_COMPRESSION_NONE &&
+ compress_spec->algorithm != PG_COMPRESSION_LZ4 &&
compress_spec->algorithm != PG_COMPRESSION_GZIP)
supports_compression = false;
@@ -1354,6 +1354,10 @@ parse_compression_option(const char *opt,
if (compress_spec->algorithm == PG_COMPRESSION_GZIP)
supports_compression = false;
#endif
+#ifndef USE_LZ4
+ if (compression_spec->algorithm == PG_COMPRESSION_LZ4)
+ supports_compression = false;
+#endif
if (!supports_compression)
{
diff --git a/src/bin/pg_dump/t/001_basic.pl b/src/bin/pg_dump/t/001_basic.pl
index e1f1a8801c..3803370350 100644
--- a/src/bin/pg_dump/t/001_basic.pl
+++ b/src/bin/pg_dump/t/001_basic.pl
@@ -122,7 +122,7 @@ command_fails_like(
command_fails_like(
[ 'pg_dump', '--compress', 'garbage' ],
- qr/\Qpg_dump: error: invalid compression method "garbage" (gzip, none)\E/,
+ qr/\Qpg_dump: error: invalid compression method "garbage" (gzip, lz4, none)\E/,
'pg_dump: invalid --compress');
command_fails_like(
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index a02b578750..9109d72f8a 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -116,6 +116,67 @@ my %pgdump_runs = (
args => [ '-d', "$tempdir/compression_gzip_plain.sql.gz", ],
},
},
+
+ # Do not use --no-sync to give test coverage for data sync.
+ compression_lz4_custom => {
+ test_key => 'compression',
+ compile_option => 'lz4',
+ dump_cmd => [
+ 'pg_dump', '--format=custom',
+ '--compress=1', "--file=$tempdir/compression_lz4_custom.dump",
+ 'postgres',
+ ],
+ restore_cmd => [
+ 'pg_restore',
+ "--file=$tempdir/compression_lz4_custom.sql",
+ "$tempdir/compression_lz4_custom.dump",
+ ],
+ },
+
+ # Do not use --no-sync to give test coverage for data sync.
+ compression_lz4_dir => {
+ test_key => 'compression',
+ compile_option => 'lz4',
+ dump_cmd => [
+ 'pg_dump', '--jobs=2',
+ '--format=directory', '--compress=lz4:1',
+ "--file=$tempdir/compression_lz4_dir", 'postgres',
+ ],
+ # Give coverage for manually compressed blob.toc files during
+ # restore.
+ compress_cmd => {
+ program => $ENV{'LZ4'},
+ args => [
+ '-z', '-f', '--rm',
+ "$tempdir/compression_lz4_dir/blobs.toc",
+ "$tempdir/compression_lz4_dir/blobs.toc.lz4",
+ ],
+ },
+ restore_cmd => [
+ 'pg_restore', '--jobs=2',
+ "--file=$tempdir/compression_lz4_dir.sql",
+ "$tempdir/compression_lz4_dir",
+ ],
+ },
+
+ compression_lz4_plain => {
+ test_key => 'compression',
+ compile_option => 'lz4',
+ dump_cmd => [
+ 'pg_dump', '--format=plain', '--compress=lz4',
+ "--file=$tempdir/compression_lz4_plain.sql.lz4", 'postgres',
+ ],
+ # Decompress the generated file to run through the tests.
+ compress_cmd => {
+ program => $ENV{'LZ4'},
+ args => [
+ '-d', '-f',
+ "$tempdir/compression_lz4_plain.sql.lz4",
+ "$tempdir/compression_lz4_plain.sql",
+ ],
+ },
+ },
+
clean => {
dump_cmd => [
'pg_dump',
@@ -4057,11 +4118,11 @@ foreach my $run (sort keys %pgdump_runs)
my $run_db = 'postgres';
# Skip command-level tests for gzip if there is no support for it.
- if ( defined($pgdump_runs{$run}->{compile_option})
- && $pgdump_runs{$run}->{compile_option} eq 'gzip'
- && !$supports_gzip)
+ if ($pgdump_runs{$run}->{compile_option} &&
+ ($pgdump_runs{$run}->{compile_option} eq 'gzip' && !$supports_gzip) ||
+ ($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4))
{
- note "$run: skipped due to no gzip support";
+ note "$run: skipped due to no $pgdump_runs{$run}->{compile_option} support";
next;
}
--
2.34.1
This is a review of 0001.
On Tue, Jul 05, 2022 at 01:22:47PM +0000, gkokolatos@pm.me wrote:
Simply this patchset had started to divert
heavily already based on comments from Mr. Paquier who had already requested for
the APIs to be refactored to use function pointers. This is happening in 0002 of
the patchset.
I said something about reducing ifdefs, but I'm having trouble finding what
Michael said about this ?
On Sat, Mar 26, 2022 at 11:21:56AM -0500, Justin Pryzby wrote:
LZ4F_HEADER_SIZE_MAX isn't defined in old LZ4.
I ran into that on an ubuntu LTS, so I don't think it's so old that it
shouldn't be handled more gracefully. LZ4 should either have an explicit
version check, or else shouldn't depend on that feature (or should define a
safe fallback version if the library header doesn't define it).
https://packages.ubuntu.com/liblz4-1
The constant still seems to be used without defining a fallback or a minimum version.
0003: typo: of legacy => or legacy
This is still there
You renamed this:
|- COMPR_ALG_LIBZ
|-} CompressionAlgorithm;
|+ COMPRESSION_GZIP,
|+} CompressionMethod;..But I don't think that's an improvement. If you were to change it, it should
say something like PGDUMP_COMPRESS_ZLIB, since there are other compression
structs and typedefs. zlib is not idential to gzip, which uses a different
header, so in WriteDataToArchive(), LIBZ is correct, and GZIP is incorrect.
This comment still applies - zlib's gz* functions are "gzip" but the others are
"zlib". https://zlib.net/manual.html
That affects both the 0001 and 0002 patches.
Actually, I think that "gzip" should not be the name of the user-facing option,
since (except for "plain" format) it isn't using gzip.
+Robert, since this suggests amending parse_compress_algorithm(). Maybe "zlib"
should be parsed the same way as "gzip" - I don't think we ever expose both to
a user, but in some cases (basebackup and pg_dump -Fp -Z1) the output is "gzip"
and in some cases NO it's zlib (pg_dump -Fc -Z1).
The cf* changes in pg_backup_archiver could be split out into a separate
commit. It's strictly a code simplification - not just preparation for more
compression algorithms. The commit message should "See also:
bf9aa490db24b2334b3595ee33653bf2fe39208c".
I still think this could be an early, 0000 patch.
freebsd/cfbot is failing.
This is still failing for bsd, windows and compiler warnings.
Windows also has compiler warnings.
http://cfbot.cputube.org/georgios-kokolatos.html
Please see: src/tools/ci/README, which you can use to run check-world on 4 OS
by pushing a branch to github.
I suggested off-list to add an 0099 patch to change LZ4 to the default, to
exercise it more on CI.
What about this ? I think the patch needs to pass CI on all 4 OS with
default=zlib and default=lz4.
On Sat, Mar 26, 2022 at 01:33:36PM -0500, Justin Pryzby wrote:
@@ -254,7 +251,12 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt, Archive * OpenArchive(const char *FileSpec, const ArchiveFormat fmt) { - ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, true, archModeRead, setupRestoreWorker); + ArchiveHandle *AH; + pg_compress_specification compress_spec;
Should this be initialized to {0} ?
@@ -969,6 +969,8 @@ NewRestoreOptions(void) opts->format = archUnknown; opts->cparams.promptPassword = TRI_DEFAULT; opts->dumpSections = DUMP_UNSECTIONED; + opts->compress_spec.algorithm = PG_COMPRESSION_NONE; + opts->compress_spec.level = INT_MIN;
Why INT_MIN ?
@@ -1115,23 +1117,28 @@ PrintTOCSummary(Archive *AHX) ArchiveHandle *AH = (ArchiveHandle *) AHX; RestoreOptions *ropt = AH->public.ropt; TocEntry *te; + pg_compress_specification out_compress_spec;
Should have {0} ?
I suggest to write it like my 2020 patch for this, which says:
no_compression = {0};
/* Open stdout with no compression for AH output handle */ - AH->gzOut = 0; - AH->OF = stdout; + out_compress_spec.algorithm = PG_COMPRESSION_NONE; + AH->OF = cfdopen(dup(fileno(stdout)), PG_BINARY_A, out_compress_spec);
Ideally this should check the success of dup().
@@ -3776,21 +3746,25 @@ ReadHead(ArchiveHandle *AH) + if (AH->compress_spec.level != INT_MIN)
Why is it testing the level and not the algorithm ?
--- a/src/bin/pg_dump/pg_backup_custom.c +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -298,7 +298,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te) _WriteByte(AH, BLK_DATA); /* Block type */ WriteInt(AH, te->dumpId); /* For sanity check */- ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc); + ctx->cs = AllocateCompressor(AH->compress_spec, _CustomWriteFunc);
Is it necessary to rename the data structure ?
If not, this file can remain unchanged.
--- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -573,6 +574,7 @@ _CloseArchive(ArchiveHandle *AH) if (AH->mode == archModeWrite) { cfp *tocFH; + pg_compress_specification compress_spec;
Should use {0} ?
@@ -639,12 +642,14 @@ static void _StartBlobs(ArchiveHandle *AH, TocEntry *te) { lclContext *ctx = (lclContext *) AH->formatData; + pg_compress_specification compress_spec;
Same
+ /* + * Custom and directory formats are compressed by default (zlib), others + * not + */ + if (user_compression_defined == false)
Should be: !user_compression_defined
Your 0001+0002 patches (without 0003) fail to compile:
pg_backup_directory.c: In function ‘_ReadByte’:
pg_backup_directory.c:519:12: error: ‘CompressFileHandle’ {aka ‘struct CompressFileHandle’} has no member named ‘_IO_getc’
519 | return CFH->getc(CFH);
| ^~
pg_backup_directory.c:520:1: warning: control reaches end of non-void function [-Wreturn-type]
520 | }
--
Justin
On Tue, Jul 05, 2022 at 01:22:47PM +0000, gkokolatos@pm.me wrote:
I have updated for "some" of the comments. This is not an unwillingness to
incorporate those specific comments. Simply this patchset had started to divert
heavily already based on comments from Mr. Paquier who had already requested for
the APIs to be refactored to use function pointers. This is happening in 0002 of
the patchset. 0001 of the patchset is using the new compression.h under common.This patchset should be considered a late draft, as commentary, documentation,
and some finer details are not yet finalized; because I am expecting the proposed
refactor to receive a wealth of comments. It would be helpful to understand if
the proposed direction is something worth to be worked upon, before moving to the
finer details.
I have read through the patch set, and I like a lot the separation you
are doing here with CompressFileHandle where a compression method has
to specify a full set of callbacks depending on the actions that need
to be taken. One advantage, as you patch shows, is that you reduce
the dependency of each code path depending on the compression method,
with #ifdefs and such located mostly into their own file structure, so
as adding a new compression method becomes really easier. These
callbacks are going to require much more documentation to describe
what anybody using them should expect from them, and perhaps they
could be renamed in a more generic way as the currect names come from
POSIX (say read_char(), read_string()?), even if this patch has just
inherited the names coming from pg_dump itself, but this can be tuned
over and over.
The split into three parts as of 0001 to plug into pg_dump the new
compression option set, 0002 to introduce the callbacks and 0003 to
add LZ4, building on the two first parts, makes sense to me. 0001 and
0002 could be done in a reversed order as they are mostly independent,
this order is fine as-is.
In short, I am fine with the proposed approach.
+#define K_VERS_1_15 MAKE_ARCHIVE_VERSION(1, 15, 0) /* add compressionMethod
+ * in header */
Indeed, the dump format needs a version bump for this information.
+static bool
+parse_compression_option(const char *opt,
+ pg_compress_specification *compress_spec)
This parsing logic in pg_dump.c looks a lot like what pg_receivewal.c
does with its parse_compress_options() where, for compatibility:
- If only a number is given:
-- Assume no compression if level is 0.
-- Assume gzip with given compression if level > 0.
- If a string is found, assume a full spec, with optionally a level.
So some consolidation could be done between both.
By the way, I can see that GZCLOSE(), etc. are still defined in
compress_io.h but they are not used.
--
Michael
This entry has been waiting on author input for a while (our current
threshold is roughly two weeks), so I've marked it Returned with
Feedback.
Once you think the patchset is ready for review again, you (or any
interested party) can resurrect the patch entry by visiting
https://commitfest.postgresql.org/38/3571/
and changing the status to "Needs Review", and then changing the
status again to "Move to next CF". (Don't forget the second step;
hopefully we will have streamlined this in the near future!)
Thanks,
--Jacob
Thank you for your work during commitfest.
The patch is still in development. Given vacation status, expect the next patches to be ready for the November commitfest.
For now it has moved to the September one. Further action will be taken then as needed.
Enjoy the rest of the summer!