Support for pg_receivexlog --format=plain|tar

Started by Michael Paquier about 9 years ago, 20 messages
#1 Michael Paquier
michael.paquier@gmail.com

Hi all,

Since 56c7d8d4, pg_basebackup supports tar format when streaming WAL
records. This has been done by introducing a new transparent routine
layer, walmethods.c, to control the method used to fetch WAL: plain or
tar.

pg_receivexlog does not make use of that yet, but I think that it
could, to allow retention of more WAL history within the same amount of
disk space. OK, disk space is cheap, but for some users things like
that matter when defining a retention policy, especially when
things are automated around Postgres. I really think that
pg_receivexlog should be able to support an option like
--format=plain|tar. "plain" is the default, and matches the current
behavior. This option is of course designed to match pg_basebackup's
one.

So, here is in detail what would happen with --format=tar:
- When streaming begins, write changes to a tar stream, named
segnum.tar.partial as long as the segment is not completed.
- Once the segment completes, rename it to segnum.tar.
- each individual segment has its own tarball.
- if pg_receivexlog fails to receive changes in the middle of a
segment, it begins streaming back at the beginning of a segment,
considering that the current .partial segment is corrupted. So if
server comes back online, empty the current .partial file and begin
writing on it again. (I have found a bug on HEAD in this area
actually).
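
To illustrate the naming scheme in the list above, here is a minimal C
sketch, not taken from any patch. The helper segment_tar_name() is
hypothetical; it only shows how the .tar.partial and .tar names of one
segment relate to each other.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper (not from PostgreSQL): build the on-disk name of a
 * per-segment tarball. While the segment is still being streamed the name
 * ends in ".tar.partial"; once the segment is complete it ends in ".tar".
 * Returns 0 on success, -1 if the output buffer is too small.
 */
int
segment_tar_name(char *out, size_t outlen, const char *segname, bool complete)
{
	int			n;

	assert(out != NULL && segname != NULL);

	n = snprintf(out, outlen, "%s.tar%s", segname, complete ? "" : ".partial");
	return (n < 0 || (size_t) n >= outlen) ? -1 : 0;
}
```

Once a segment completes, the streaming code would rename the first name
to the second, for example with rename().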

Magnus, you have mentioned to me as well that you had a couple of ideas
on the matter, feel free to jump in and let's mix our thoughts!

There are a couple of things that I have been considering as well for
pg_receivexlog. Though they are not directly tied to this thread,
here they are so that I don't forget about them:
- Removal of oldest WAL segments on a partition. When writing WAL
segments to a dedicated partition, we could have an option that
automatically removes the oldest WAL segment if the partition is full.
This triggers once a segment is completed.
- Compression of fully-written segments. When a segment is finished
being written, pg_receivexlog could compress it further with gz for
example. With --format=t this leads to segnum.tar.gz being generated.
The advantage of doing those two things in pg_receivexlog is
monitoring. One process to handle them all, and there is no need of
cron jobs to handle any cleanup or compression.

Thanks,
--
Michael

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#2 Magnus Hagander
magnus@hagander.net
In reply to: Michael Paquier (#1)
Re: Support for pg_receivexlog --format=plain|tar

On Tue, Dec 27, 2016 at 2:23 AM, Michael Paquier <michael.paquier@gmail.com>
wrote:

Hi all,

Since 56c7d8d4, pg_basebackup supports tar format when streaming WAL
records. This has been done by introducing a new transparent routine
layer to control the method used to fetch WAL walmethods.c: plain or
tar.

pg_receivexlog does not make use of that yet, but I think that it
could to allow retention of more WAL history within the same amount of
disk space. OK, disk space is cheap but for some users things like
that matters to define a duration retention policy. Especially when
things are automated around Postgres. I really think that
pg_receivexlog should be able to support an option like
--format=plain|tar. "plain" is the default, and matches the current
behavior. This option is of course designed to match pg_basebackup's
one.

So, here is in details what would happen if --format=tar is done:
- When streaming begins, write changes to a tar stream, named
segnum.tar.partial as long as the segment is not completed.
- Once the segment completes, rename it to segnum.tar.
- each individual segment has its own tarball.
- if pg_receivexlog fails to receive changes in the middle of a
segment, it begins streaming back at the beginning of a segment,
considering that the current .partial segment is corrupted. So if
server comes back online, empty the current .partial file and begin
writing on it again. (I have found a bug on HEAD in this area
actually).

Magnus, you have mentioned me as well that you had a couple of ideas
on the matter, feel free to jump in and let's mix our thoughts!

Yeah, I've been wondering what the actual usecase is here :)

Though I was considering the case where all segments are streamed into the
same tarfile (and then some sort of configurable limit where we'd switch
tarfile after <n> segments, which rapidly started to feel too complicated).

What's the actual advantage of having it wrapped inside a single tarfile?

There are a couple of things that I have been considering as well for
pg_receivexlog. Though they are not directly stick to this thread,
here they are as I don't forget about them:
- Removal of oldest WAL segments on a partition. When writing WAL
segments to a dedicated partition, we could have an option that
automatically removes the oldest WAL segment if the partition is full.
This triggers once a segment is completed.
- Compression of fully-written segments. When a segment is finished
being written, pg_receivexlog could compress them further with gz for
example. With --format=t this leads to segnum.tar.gz being generated.
The advantage of doing those two things in pg_receivexlog is
monitoring. One process to handle them all, and there is no need of
cron jobs to handle any cleanup or compression.

I was at one point thinking that would be a good idea as well, but recently
I've more been thinking that what we should do is implement a
"--post-segment-command", which would act similar to archive_command but
started by pg_receivexlog. This could handle things like compression, and
also integration with external backup tools like backrest or barman in a
cleaner way. We could also spawn this without waiting for it to finish
immediately, which would allow parallelization of the process. When doing
the compression inline that rapidly becomes the bottleneck. Unlike a
basebackup you're only dealing with the need to buffer 16MB on disk before
compressing it, so it should be fairly cheap.
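
A minimal C sketch of the "spawn without waiting" idea, assuming a POSIX
system. Nothing here exists in pg_receivexlog: the function name
spawn_post_segment_command() and the convention of passing the segment
path as $1 are hypothetical choices made for illustration.

```c
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Hypothetical sketch: start "command" through the shell with the finished
 * segment's path available as $1, and return at once with the child's pid
 * (or -1 if fork() failed). The caller reaps the child later with
 * waitpid(), so several commands can run in parallel.
 */
pid_t
spawn_post_segment_command(const char *command, const char *segment_path)
{
	pid_t		pid;

	assert(command != NULL && segment_path != NULL);

	pid = fork();
	if (pid == 0)
	{
		/* Child: "sh -c command sh path" exposes path to command as $1. */
		execl("/bin/sh", "sh", "-c", command, "sh", segment_path, (char *) NULL);
		_exit(127);				/* only reached if exec failed */
	}
	return pid;					/* parent: child's pid, or -1 on failure */
}
```

A caller could pass a command such as gzip "$1" and collect exit statuses
with waitpid() between segments. Deciding what to do when such a command
hangs or fails is the hard part, and the sketch does not address it.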

Another thing I've been considering in the same area would be to add the
ability to write the segments to a pipe instead of a directory. Then you
could just pipe it into gzip without the need to buffer on disk. This would
kill the ability to know at which point we'd sync()ed to disk, but in most
cases so will doing direct gzip. Just means we couldn't support this in
sync mode.

I can see the point of being able to compress the individual segments
directly in pg_receivexlog in smaller systems though, without the need to
rely on an external compression program as well. But in that case, is there
any reason we need to wrap it in a tarfile, and can't just write it to
<segment>.gz natively?

--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/

#3 Michael Paquier
michael.paquier@gmail.com
In reply to: Magnus Hagander (#2)
Re: Support for pg_receivexlog --format=plain|tar

On Tue, Dec 27, 2016 at 6:34 PM, Magnus Hagander <magnus@hagander.net> wrote:

On Tue, Dec 27, 2016 at 2:23 AM, Michael Paquier <michael.paquier@gmail.com>
wrote:

Magnus, you have mentioned me as well that you had a couple of ideas
on the matter, feel free to jump in and let's mix our thoughts!

Yeah, I've been wondering what the actual usecase is here :)

There is value in compressing segments that finish with trailing zeros,
even if they are not the most common kind of segment in
the WAL archive.

Though I was considering the case where all segments are streamed into the
same tarfile (and then some sort of configurable limit where we'd switch
tarfile after <n> segments, which rapidly started to feel too complicated).

What's the actual advantage of having it wrapped inside a single tarfile?

I am advocating for one tar file per segment to be honest. Grouping
them makes the failure handling more complicated when connection to
the server is killed, or the replication stream is cut. Well, not
really complicated actually, because I think that you would need to
drop in the segment folder a status file with enough information to
let pg_receivexlog know from where in the tar file it needs to
continue writing. If a new tarball is created for each segment,
deciding from where to stream after a connection failure is just a
matter of doing what is done today: having a look at the completed
segments and begin streaming from the incomplete/absent one.
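
The restart rule described above can be sketched in C as follows. This is
a simplified illustration, not the code pg_receivexlog uses: both function
names are hypothetical, and it assumes segment names are 24 uppercase hex
digits followed by a fixed suffix such as ".tar".

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>
#include <string.h>

#define SEGNAME_LEN 24			/* timeline, log and segment: 8 hex digits each */

/* True if name is a 24-hex-digit segment name followed by exactly suffix. */
int
is_segment_with_suffix(const char *name, const char *suffix)
{
	size_t		i;

	if (strlen(name) != SEGNAME_LEN + strlen(suffix))
		return 0;
	for (i = 0; i < SEGNAME_LEN; i++)
		if (!isxdigit((unsigned char) name[i]))
			return 0;
	return strcmp(name + SEGNAME_LEN, suffix) == 0;
}

/*
 * Return the highest completed segment among names[0..n-1], or NULL if
 * there is none. Files with any other suffix (for example ".tar.partial")
 * are ignored, so streaming would restart at the beginning of the segment
 * that follows the returned one.
 */
const char *
last_completed_segment(const char *const *names, size_t n, const char *suffix)
{
	const char *best = NULL;
	size_t		i;

	for (i = 0; i < n; i++)
	{
		if (!is_segment_with_suffix(names[i], suffix))
			continue;
		/* Fixed-width hex names sort correctly under strncmp(). */
		if (best == NULL || strncmp(names[i], best, SEGNAME_LEN) > 0)
			best = names[i];
	}
	return best;
}
```

With one tarball per segment no extra status file is needed: the directory
listing alone is enough to pick the restart point.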

There are a couple of things that I have been considering as well for
pg_receivexlog. Though they are not directly stick to this thread,
here they are as I don't forget about them:
- Removal of oldest WAL segments on a partition. When writing WAL
segments to a dedicated partition, we could have an option that
automatically removes the oldest WAL segment if the partition is full.
This triggers once a segment is completed.
- Compression of fully-written segments. When a segment is finished
being written, pg_receivexlog could compress them further with gz for
example. With --format=t this leads to segnum.tar.gz being generated.
The advantage of doing those two things in pg_receivexlog is
monitoring. One process to handle them all, and there is no need of
cron jobs to handle any cleanup or compression.

I was at one point thinking that would be a good idea as well, but recently
I've more been thinking that what we should do is implement a
"--post-segment-command", which would act similar to archive_command but
started by pg_receivexlog. This could handle things like compression, and
also integration with external backup tools like backrest or barman in a
cleaner way. We could also spawn this without waiting for it to finish
immediately, which would allow parallelization of the process. When doing
the compression inline that rapidly becomes the bottleneck. Unlike a
basebackup you're only dealing with the need to buffer 16MB on disk before
compressing it, so it should be fairly cheap.

I did not consider the case of barman and backrest to be honest,
having the view of 2ndQ folks and David would be nice here. Still, the
main idea behind having those done by the pg_receivexlog process itself
would be to not spawn a new process. I have a class of users that care about
things that could hang, they play a lot with network-mounted disks...
And VMs of course.

Another thing I've been considering in the same area would be to add the
ability to write the segments to a pipe instead of a directory. Then you
could just pipe it into gzip without the need to buffer on disk. This would
kill the ability to know at which point we'd sync()ed to disk, but in most
cases so will doing direct gzip. Just means we couldn't support this in sync
mode.

Users piping their data don't care about reliability anyway. So that
is not a problem.

I can see the point of being able to compress the individual segments
directly in pg_receivexlog in smaller systems though, without the need to
rely on an external compression program as well. But in that case, is there
any reason we need to wrap it in a tarfile, and can't just write it to
<segment>.gz natively?

You mean having a --compress=0-9 option that creates individual gz
files for each segment? Definitely we could just do that. It would be
a shame though to not use the WAL methods you have introduced in
src/bin/pg_basebackup, which already cover the whole set of tar and tar.gz. A
quick hack in pg_receivexlog has shown me that segments are saved in
a single tarball, which is not cool. My feeling is that using the
existing infrastructure, but making it pluggable for individual files
(in short I think that what is needed here is a way to tell the WAL
method to switch to a new file when a segment completes) would really
be the simplest approach in terms of code lines and maintenance.
--
Michael


#4 Magnus Hagander
magnus@hagander.net
In reply to: Michael Paquier (#3)
Re: Support for pg_receivexlog --format=plain|tar

On Tue, Dec 27, 2016 at 1:16 PM, Michael Paquier <michael.paquier@gmail.com>
wrote:

On Tue, Dec 27, 2016 at 6:34 PM, Magnus Hagander <magnus@hagander.net>
wrote:

On Tue, Dec 27, 2016 at 2:23 AM, Michael Paquier <michael.paquier@gmail.com>
wrote:

Magnus, you have mentioned me as well that you had a couple of ideas
on the matter, feel free to jump in and let's mix our thoughts!

Yeah, I've been wondering what the actual usecase is here :)

There is value to compress segments finishing with trailing zeros,
even if they are not the species with the highest representation in
the WAL archive.

Agreed on that part -- that's the value in compression though, and not
necessarily the TAR format itself.

Is there any value of the TAR format *without* compression in your scenario?

Though I was considering the case where all segments are streamed into the
same tarfile (and then some sort of configurable limit where we'd switch
tarfile after <n> segments, which rapidly started to feel too complicated).

What's the actual advantage of having it wrapped inside a single tarfile?

I am advocating for one tar file per segment to be honest. Grouping
them makes the failure handling more complicated when connection to
the server is killed, or the replication stream is cut. Well, not
really complicated actually, because I think that you would need to
drop in the segment folder a status file with enough information to
let pg_receivexlog know from where in the tar file it needs to
continue writing. If a new tarball is created for each segment,
deciding from where to stream after a connection failure is just a
matter of doing what is done today: having a look at the completed
segments and begin streaming from the incomplete/absent one.

This pretty much matches up with the conclusion I got to myself as well. We
could create a new tarfile for each restart of pg_receivexlog, but then it
becomes unpredictable.

There are a couple of things that I have been considering as well for
pg_receivexlog. Though they are not directly stick to this thread,
here they are as I don't forget about them:
- Removal of oldest WAL segments on a partition. When writing WAL
segments to a dedicated partition, we could have an option that
automatically removes the oldest WAL segment if the partition is full.
This triggers once a segment is completed.
- Compression of fully-written segments. When a segment is finished
being written, pg_receivexlog could compress them further with gz for
example. With --format=t this leads to segnum.tar.gz being generated.
The advantage of doing those two things in pg_receivexlog is
monitoring. One process to handle them all, and there is no need of
cron jobs to handle any cleanup or compression.

I was at one point thinking that would be a good idea as well, but recently
I've more been thinking that what we should do is implement a
"--post-segment-command", which would act similar to archive_command but
started by pg_receivexlog. This could handle things like compression, and
also integration with external backup tools like backrest or barman in a
cleaner way. We could also spawn this without waiting for it to finish
immediately, which would allow parallelization of the process. When doing
the compression inline that rapidly becomes the bottleneck. Unlike a
basebackup you're only dealing with the need to buffer 16MB on disk before
compressing it, so it should be fairly cheap.

I did not consider the case of barman and backrest to be honest,
having the view of 2ndQ folks and David would be nice here. Still, the
main idea behind those done by pg_receivexlog's process would be to
not spawn a new process. I have a class of users that care about
things that could hang, they play a lot with network-mounted disks...
And VMs of course.

I have been talking to David about it a couple of times, and he agreed that
it'd be useful to have a post-segment command. We haven't discussed it in
much detail though. I'll add him to direct-cc here to see if he has any
further input :)

It could be that the best idea is to just notify some other process of
what's happening. But making it an external command would give that a lot
of flexibility. Of course, we need to be careful not to put ourselves back
in the position we are in with archive_command, in that it's very difficult
to write a good one.

I'm sure everybody cares about things that could hang. But everything can
hang...

Another thing I've been considering in the same area would be to add the
ability to write the segments to a pipe instead of a directory. Then you
could just pipe it into gzip without the need to buffer on disk. This would
kill the ability to know at which point we'd sync()ed to disk, but in most
cases so will doing direct gzip. Just means we couldn't support this in sync
mode.

Users piping their data don't care about reliability anyway. So that
is not a problem.

Good point. Same would be true about people who gzip it, wouldn't it?

I can see the point of being able to compress the individual segments
directly in pg_receivexlog in smaller systems though, without the need to
rely on an external compression program as well. But in that case, is there
any reason we need to wrap it in a tarfile, and can't just write it to
<segment>.gz natively?

You mean having a --compress=0|9 option that creates individual gz
files for each segment? Definitely we could just do that. It would be

Yes, that's what I meant.

a shame though to not use the WAL methods you have introduced in
src/bin/pg_basebackup, with having the whole set tar and tar.gz. A
quick hack in pg_receivexlog has shown me that segments are saved in
a single tarball, which is not cool. My feeling is that using the
existing infrastructure, but making it pluggable for individual files
(in short I think that what is needed here is a way to tell the WAL
method to switch to a new file when a segment completes) would really
be the most simple one in terms of code lines and maintenance.

Much as I'd like to reuse that, I don't think that reusing that in itself
should be the driver for how this should be decided. It should be the end
product.

To me it seems silly to create a directory full of tarfiles with a single
file in each. I don't particularly care about the fact that we added 512
bytes of wasted space to each, but we just created something that's
unnecessarily complicated for people to handle, didn't we? A plain
directory of .gz files is a lot easier to work with.

//Magnus

#5 Michael Paquier
michael.paquier@gmail.com
In reply to: Magnus Hagander (#4)
1 attachment(s)
Re: Support for pg_receivexlog --format=plain|tar

On Wed, Dec 28, 2016 at 3:12 AM, Magnus Hagander <magnus@hagander.net> wrote:

On Tue, Dec 27, 2016 at 1:16 PM, Michael Paquier <michael.paquier@gmail.com>
wrote:

On Tue, Dec 27, 2016 at 6:34 PM, Magnus Hagander <magnus@hagander.net>
wrote:

On Tue, Dec 27, 2016 at 2:23 AM, Michael Paquier
<michael.paquier@gmail.com>
wrote:

Magnus, you have mentioned me as well that you had a couple of ideas
on the matter, feel free to jump in and let's mix our thoughts!

Yeah, I've been wondering what the actual usecase is here :)

There is value to compress segments finishing with trailing zeros,
even if they are not the species with the highest representation in
the WAL archive.

Agreed on that part -- that's the value in compression though, and not
necessarily the TAR format itself.

Is there any value of the TAR format *without* compression in your scenario?

Hm. I cannot think of one.

I can see the point of being able to compress the individual segments
directly in pg_receivexlog in smaller systems though, without the need to
rely on an external compression program as well. But in that case, is there
any reason we need to wrap it in a tarfile, and can't just write it to
<segment>.gz natively?

You mean having a --compress=0|9 option that creates individual gz
files for each segment? Definitely we could just do that. It would be

Yes, that's what I meant.

OK, I have bitten the bullet and attached is a patch to add just
--compress=0-9. So there is one .gz file generated per segment, and
things are rather straightforward. Adding tests is unfortunately not
in scope as not all builds are compiled with libz.

A couple of things to be aware of though:
- gzopen, gzwrite and gzclose are used to handle the gz files. That's
inconsistent with the tar method that is based mainly on deflate() and
more low-level routines.
- I have switched the directory method to use a file pointer instead
of a file descriptor as gzwrite returns int as the number of
uncompressed bytes written.
- history and timeline files are gzip'd as well. Perhaps we don't want
to do that.
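
As a side note on the option handling, here is a small C sketch of strict
level parsing and of the mode string handed to gzopen(). It is not part of
the attached patch: parse_compress_level() and gz_write_mode() are
hypothetical names. The patch itself parses the level with atoi(), which
maps non-numeric input to 0, and builds the mode from PG_BINARY_W where
this sketch hard-codes "wb".

```c
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Parse a compression level: returns 0..9, or -1 if arg is not valid. */
int
parse_compress_level(const char *arg)
{
	char	   *end;
	long		val;

	assert(arg != NULL);

	errno = 0;
	val = strtol(arg, &end, 10);
	if (errno != 0 || end == arg || *end != '\0' || val < 0 || val > 9)
		return -1;
	return (int) val;
}

/*
 * Build the mode string for zlib's gzopen(): "wb" followed by the level
 * digit, for example "wb9". Level 0 is rejected here because the patch
 * writes plain files in that case. Returns 0 on success, -1 on bad input.
 */
int
gz_write_mode(char *out, size_t outlen, int level)
{
	int			n;

	if (level < 1 || level > 9)
		return -1;
	n = snprintf(out, outlen, "wb%d", level);
	return (n < 0 || (size_t) n >= outlen) ? -1 : 0;
}
```

With strtol() and an end-pointer check, input such as "abc" or "5x" is
reported as an error instead of silently meaning "no compression".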

What do you think about this approach? I'll add that to the next CF.
--
Michael

Attachments:

receivexlog-gzip-v1.patch (invalid/octet-stream)
diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml
index bfa055b58b..8c1ea9a2e2 100644
--- a/doc/src/sgml/ref/pg_receivexlog.sgml
+++ b/doc/src/sgml/ref/pg_receivexlog.sgml
@@ -180,6 +180,19 @@ PostgreSQL documentation
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry>
+      <term><option>-Z <replaceable class="parameter">level</replaceable></option></term>
+      <term><option>--compress=<replaceable class="parameter">level</replaceable></option></term>
+      <listitem>
+       <para>
+        Enables gzip compression of transaction logs, and specifies the
+        compression level (0 through 9, 0 being no compression and 9 being best
+        compression).  The suffix <filename>.gz</filename> will
+        automatically be added to all filenames.
+       </para>
+      </listitem>
+     </varlistentry>
     </variablelist>
 
    <para>
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 79b899a343..a2030ce040 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -473,7 +473,7 @@ LogStreamerMain(logstreamer_param *param)
 	stream.partial_suffix = NULL;
 
 	if (format == 'p')
-		stream.walmethod = CreateWalDirectoryMethod(param->xlog, do_sync);
+		stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
 	else
 		stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
 
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index 99445e6584..3b6b600771 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -34,6 +34,7 @@
 /* Global options */
 static char *basedir = NULL;
 static int	verbose = 0;
+static int	compresslevel = 0;
 static int	noloop = 0;
 static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
 static volatile bool time_to_abort = false;
@@ -75,6 +76,7 @@ usage(void)
 	printf(_("      --synchronous      flush transaction log immediately after writing\n"));
 	printf(_("  -v, --verbose          output verbose messages\n"));
 	printf(_("  -V, --version          output version information, then exit\n"));
+	printf(_("  -Z, --compress=0-9     compress logs with given compression level\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=CONNSTR   connection string\n"));
@@ -338,7 +340,8 @@ StreamLog(void)
 	stream.synchronous = synchronous;
 	stream.do_sync = true;
 	stream.mark_done = false;
-	stream.walmethod = CreateWalDirectoryMethod(basedir, stream.do_sync);
+	stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel,
+												stream.do_sync);
 	stream.partial_suffix = ".partial";
 
 	ReceiveXlogStream(conn, &stream);
@@ -389,6 +392,7 @@ main(int argc, char **argv)
 		{"status-interval", required_argument, NULL, 's'},
 		{"slot", required_argument, NULL, 'S'},
 		{"verbose", no_argument, NULL, 'v'},
+		{"compress", required_argument, NULL, 'Z'},
 /* action */
 		{"create-slot", no_argument, NULL, 1},
 		{"drop-slot", no_argument, NULL, 2},
@@ -419,7 +423,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWv",
+	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWvZ:",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -469,6 +473,15 @@ main(int argc, char **argv)
 			case 'v':
 				verbose++;
 				break;
+			case 'Z':
+				compresslevel = atoi(optarg);
+				if (compresslevel < 0 || compresslevel > 9)
+				{
+					fprintf(stderr, _("%s: invalid compression level \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				break;
 /* action */
 			case 1:
 				do_create_slot = true;
@@ -535,6 +548,16 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+#ifndef HAVE_LIBZ
+	if (compresslevel != 0)
+	{
+		fprintf(stderr,
+				_("%s: this build does not support compression\n"),
+				progname);
+		exit(1);
+	}
+#endif
+
 	/*
 	 * Check existence of destination folder.
 	 */
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 4382e5d76a..28daf6cb25 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -284,7 +284,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
 		return false;
 	}
 
-	if ((int) stream->walmethod->write(f, content, size) != size)
+	if (stream->walmethod->write(f, content, size) != size)
 	{
 		fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
 				progname, histfname, stream->walmethod->getlasterror());
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
index 40c8a5c697..495c0ad6ed 100644
--- a/src/bin/pg_basebackup/walmethods.c
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -27,7 +27,7 @@
 #include "receivelog.h"
 #include "streamutil.h"
 
-/* Size of zlib buffer for .tar.gz */
+/* Size of zlib buffer for .tar.gz and .gz */
 #define ZLIB_OUT_SIZE 4096
 
 /*-------------------------------------------------------------------------
@@ -41,6 +41,7 @@
 typedef struct DirectoryMethodData
 {
 	char	   *basedir;
+	int			compression;
 	bool		sync;
 }	DirectoryMethodData;
 static DirectoryMethodData *dir_data = NULL;
@@ -50,11 +51,14 @@ static DirectoryMethodData *dir_data = NULL;
  */
 typedef struct DirectoryMethodFile
 {
-	int			fd;
+	FILE	   *fp;
 	off_t		currpos;
 	char	   *pathname;
 	char	   *fullpath;
 	char	   *temp_suffix;
+#ifdef HAVE_LIBZ
+	gzFile		gzfp;
+#endif
 }	DirectoryMethodFile;
 
 static char *
@@ -68,17 +72,36 @@ static Walfile
 dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
 {
 	static char tmppath[MAXPGPATH];
-	int			fd;
+	FILE	   *fp;
 	DirectoryMethodFile *f;
+#ifdef HAVE_LIBZ
+	gzFile		gzfp;
+#endif
 
-	snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-			 dir_data->basedir, pathname, temp_suffix ? temp_suffix : "");
+	snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+			 dir_data->basedir, pathname,
+			 dir_data->compression > 0 ? ".gz" : "",
+			 temp_suffix ? temp_suffix : "");
 
-	fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
-	if (fd < 0)
-		return NULL;
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+	{
+		char        mode_compression[32];
+		snprintf(mode_compression, sizeof(mode_compression), "%s%d",
+				 PG_BINARY_W, dir_data->compression);
+		gzfp = gzopen(tmppath, mode_compression);
+		if (gzfp == NULL)
+			return NULL;
+	}
+	else
+#endif
+	{
+		fp = fopen(tmppath, PG_BINARY_W);
+		if (fp == NULL)
+			return NULL;
+	}
 
-	if (pad_to_size)
+	if (pad_to_size && dir_data->compression == 0)
 	{
 		/* Always pre-pad on regular files */
 		char	   *zerobuf;
@@ -87,23 +110,23 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 		zerobuf = pg_malloc0(XLOG_BLCKSZ);
 		for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
 		{
-			if (write(fd, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+			if (write(fileno(fp), zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
 			{
 				int			save_errno = errno;
 
 				pg_free(zerobuf);
-				close(fd);
+				fclose(fp);
 				errno = save_errno;
 				return NULL;
 			}
 		}
 		pg_free(zerobuf);
 
-		if (lseek(fd, 0, SEEK_SET) != 0)
+		if (fseek(fp, 0, SEEK_SET) != 0)
 		{
 			int			save_errno = errno;
 
-			close(fd);
+			fclose(fp);
 			errno = save_errno;
 			return NULL;
 		}
@@ -120,13 +143,23 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 		if (fsync_fname(tmppath, false, progname) != 0 ||
 			fsync_parent_path(tmppath, progname) != 0)
 		{
-			close(fd);
+#ifdef HAVE_LIBZ
+			if (dir_data->compression > 0)
+				gzclose(gzfp);
+			else
+#endif
+				fclose(fp);
 			return NULL;
 		}
 	}
 
 	f = pg_malloc0(sizeof(DirectoryMethodFile));
-	f->fd = fd;
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		f->gzfp = gzfp;
+	else
+#endif
+		f->fp = fp;
 	f->currpos = 0;
 	f->pathname = pg_strdup(pathname);
 	f->fullpath = pg_strdup(tmppath);
@@ -136,7 +169,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 	return f;
 }
 
-static ssize_t
+static int
 dir_write(Walfile f, const void *buf, size_t count)
 {
 	ssize_t		r;
@@ -144,7 +177,12 @@ dir_write(Walfile f, const void *buf, size_t count)
 
 	Assert(f != NULL);
 
-	r = write(df->fd, buf, count);
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		r = gzwrite(df->gzfp, buf, count);
+	else
+#endif
+		r = write(fileno(df->fp), buf, count);
 	if (r > 0)
 		df->currpos += r;
 	return r;
@@ -169,7 +207,12 @@ dir_close(Walfile f, WalCloseMethod method)
 
 	Assert(f != NULL);
 
-	r = close(df->fd);
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		r = gzclose(df->gzfp);
+	else
+#endif
+		r = fclose(df->fp);
 
 	if (r == 0)
 	{
@@ -180,17 +223,22 @@ dir_close(Walfile f, WalCloseMethod method)
 			 * If we have a temp prefix, normal operation is to rename the
 			 * file.
 			 */
-			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-					 dir_data->basedir, df->pathname, df->temp_suffix);
-			snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
-					 dir_data->basedir, df->pathname);
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "",
+					 df->temp_suffix);
+			snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "");
 			r = durable_rename(tmppath, tmppath2, progname);
 		}
 		else if (method == CLOSE_UNLINK)
 		{
 			/* Unlink the file once it's closed */
-			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-					 dir_data->basedir, df->pathname, df->temp_suffix ? df->temp_suffix : "");
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "",
+					 df->temp_suffix ? df->temp_suffix : "");
 			r = unlink(tmppath);
 		}
 		else
@@ -226,7 +274,7 @@ dir_sync(Walfile f)
 	if (!dir_data->sync)
 		return 0;
 
-	return fsync(((DirectoryMethodFile *) f)->fd);
+	return fsync(fileno(((DirectoryMethodFile *) f)->fp));
 }
 
 static ssize_t
@@ -277,7 +325,7 @@ dir_finish(void)
 
 
 WalWriteMethod *
-CreateWalDirectoryMethod(const char *basedir, bool sync)
+CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
 {
 	WalWriteMethod *method;
 
@@ -293,6 +341,7 @@ CreateWalDirectoryMethod(const char *basedir, bool sync)
 	method->getlasterror = dir_getlasterror;
 
 	dir_data = pg_malloc0(sizeof(DirectoryMethodData));
+	dir_data->compression = compression;
 	dir_data->basedir = pg_strdup(basedir);
 	dir_data->sync = sync;
 
@@ -398,10 +447,10 @@ tar_write_compressed_data(void *buf, size_t count, bool flush)
 }
 #endif
 
-static ssize_t
+static int
 tar_write(Walfile f, const void *buf, size_t count)
 {
-	ssize_t		r;
+	int			r;
 
 	Assert(f != NULL);
 	tar_clear_error();
diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h
index 8cea8ff4c0..8cdccee614 100644
--- a/src/bin/pg_basebackup/walmethods.h
+++ b/src/bin/pg_basebackup/walmethods.h
@@ -27,7 +27,7 @@ struct WalWriteMethod
 	bool		(*existsfile) (const char *pathname);
 	ssize_t		(*get_file_size) (const char *pathname);
 
-	ssize_t		(*write) (Walfile f, const void *buf, size_t count);
+	int			(*write) (Walfile f, const void *buf, size_t count);
 	off_t		(*get_current_pos) (Walfile f);
 	int			(*sync) (Walfile f);
 	bool		(*finish) (void);
@@ -41,7 +41,8 @@ struct WalWriteMethod
  *						   (only implements the methods required for pg_basebackup,
  *						   not all those required for pg_receivexlog)
  */
-WalWriteMethod *CreateWalDirectoryMethod(const char *basedir, bool sync);
+WalWriteMethod *CreateWalDirectoryMethod(const char *basedir,
+										 int compression, bool sync);
 WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync);
 
 /* Cleanup routines for previously-created methods */
#6Magnus Hagander
magnus@hagander.net
In reply to: Michael Paquier (#5)
Re: Support for pg_receivexlog --format=plain|tar

On Wed, Dec 28, 2016 at 6:58 AM, Michael Paquier <michael.paquier@gmail.com>
wrote:

On Wed, Dec 28, 2016 at 3:12 AM, Magnus Hagander <magnus@hagander.net>
wrote:

On Tue, Dec 27, 2016 at 1:16 PM, Michael Paquier <michael.paquier@gmail.com>
wrote:

On Tue, Dec 27, 2016 at 6:34 PM, Magnus Hagander <magnus@hagander.net>
wrote:

On Tue, Dec 27, 2016 at 2:23 AM, Michael Paquier
<michael.paquier@gmail.com>
wrote:

Magnus, you have mentioned me as well that you had a couple of ideas
on the matter, feel free to jump in and let's mix our thoughts!

Yeah, I've been wondering what the actual usecase is here :)

There is value to compress segments finishing with trailing zeros,
even if they are not the species with the highest representation in
the WAL archive.

Agreed on that part -- that's the value in compression though, and not
necessarily the TAR format itself.

Is there any value of the TAR format *without* compression in your
scenario?

Hm. I cannot think of one.

I can see the point of being able to compress the individual segments
directly in pg_receivexlog in smaller systems though, without the need to
rely on an external compression program as well. But in that case, is there
any reason we need to wrap it in a tarfile, and can't just write it to
<segment>.gz natively?

You mean having a --compress=0|9 option that creates individual gz
files for each segment? Definitely we could just do that. It would be

Yes, that's what I meant.

OK, I have bitten the bullet and attached is a patch to add just
--compress=0|9. So there is one .gz file generated per segment, and
things are rather straight-forward. Adding tests is unfortunately not
in scope as not all builds are compiled with libz.

Conditional tests? It probably wouldn't hurt to have them, but that would
be something more generic (like we'd need something to actually validate it
-- but it would make sense to have a test that, with compression enabled,
would verify that the uncompressed file turns out to be exactly 16MB, for
example).

A couple of things to be aware of though:
- gzopen, gzwrite and gzclose are used to handle the gz files. That's
inconsistent with the tar method, which is based mainly on deflate() and
more low level routines.

But chosen for simplicity, I assume?

- I have switched the directory method to use a file pointer instead
of a file descriptor as gzwrite returns int as the number of
uncompressed bytes written.

I don't really follow that reasoning :) Why does the directory method have
to change to use a file pointer because of that?

- history and timeline files are gzip'd as well. Perhaps we don't want
to do that.

I think it makes sense to compress everything.

What do you think about this approach? I'll add that to the next CF.

I haven't reviewed the code in detail but yes, I think this approach is the
right one.

//Magnus

#7Michael Paquier
michael.paquier@gmail.com
In reply to: Magnus Hagander (#6)
Re: Support for pg_receivexlog --format=plain|tar

On Wed, Dec 28, 2016 at 9:31 PM, Magnus Hagander <magnus@hagander.net> wrote:

Conditional tests? It probably wouldn't hurt to have them, but that would be
something more generic (like we'd need something to actually validate it --
but it would make sense to have a test that, with compression enabled, would
verify if the uncompressed file turns out to be exactly 16MB for example).

Checking whether the build is compiled with libz, via SQL or using
pg_config, is the way to go here. A minimal test is doable.

A couple of things to be aware of though:
- gzopen, gzwrite and gzclose are used to handle the gz files. That's
inconsistent with the tar method, which is based mainly on deflate() and
more low level routines.

But chosen for simplicity, I assume?

Yep. That's quite in-line with the current code.

- I have switched the directory method to use a file pointer instead
of a file descriptor as gzwrite returns int as the number of
uncompressed bytes written.

I don't really follow that reasoning :) Why does the directory method have
to change to use a filepointer because of that?

The only reason is that write() returns size_t and fwrite returns int,
while gzwrite() returns int. It seems more consistent to use fwrite()
in this case. Or we don't bother about my nitpicking and just cast
stuff.

What do you think about this approach? I'll add that to the next CF.

I haven't reviewed the code in detail but yes, I think this approach is the
right one.

OK, thanks. I'll think about those conditional tests, correct one or
two things in the patch and submit again soon.
--
Michael

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#8Magnus Hagander
magnus@hagander.net
In reply to: Michael Paquier (#7)
Re: Support for pg_receivexlog --format=plain|tar

On Thu, Dec 29, 2016 at 12:35 AM, Michael Paquier <michael.paquier@gmail.com>
wrote:

On Wed, Dec 28, 2016 at 9:31 PM, Magnus Hagander <magnus@hagander.net>
wrote:

Conditional tests? It probably wouldn't hurt to have them, but that would be
something more generic (like we'd need something to actually validate it --
but it would make sense to have a test that, with compression enabled, would
verify if the uncompressed file turns out to be exactly 16MB for example).

Looking at if the build is compiled with libz via SQL or using
pg_config is the way to go here. A minimum is doable.

A couple of things to be aware of though:
- gzopen, gzwrite and gzclose are used to handle the gz files. That's
inconsistent with the tar method, which is based mainly on deflate() and
more low level routines.

But chosen for simplicity, I assume?

Yep. That's quite in-line with the current code.

Agreed.

- I have switched the directory method to use a file pointer instead
of a file descriptor as gzwrite returns int as the number of
uncompressed bytes written.

I don't really follow that reasoning :) Why does the directory method have
to change to use a filepointer because of that?

The only reason is that write() returns size_t and fwrite returns int,
while gzwrite() returns int. It seems more consistent to use fwrite()
in this case. Or we don't bother about my nitpicking and just cast
stuff.

I can at least partially see that argument, but your patch doesn't actually
use fwrite(), it uses write() with fileno()...

But also, on my platform (debian jessie), fwrite() returns size_t, and
write() returns ssize_t. So those are apparently both different from what
your platform does - which platform did you get that from?

(gzwrite does return int)

--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/

#9Michael Paquier
michael.paquier@gmail.com
In reply to: Magnus Hagander (#8)
1 attachment(s)
Re: Support for pg_receivexlog --format=plain|tar

On Thu, Dec 29, 2016 at 6:13 PM, Magnus Hagander <magnus@hagander.net> wrote:

On Thu, Dec 29, 2016 at 12:35 AM, Michael Paquier
<michael.paquier@gmail.com> wrote:

On Wed, Dec 28, 2016 at 9:31 PM, Magnus Hagander <magnus@hagander.net>
wrote:

- I have switched the directory method to use a file pointer instead
of a file descriptor as gzwrite returns int as the number of
uncompressed bytes written.

I don't really follow that reasoning :) Why does the directory method
have
to change to use a filepointer because of that?

The only reason is that write() returns size_t and fwrite returns int,
while gzwrite() returns int. It seems more consistent to use fwrite()
in this case. Or we don't bother about my nitpicking and just cast
stuff.

I can at least partially see that argument, but your patch doesn't actually
use fwrite(), it uses write() with fileno()...

That was part of the one/two things I wanted to change before sending
a fresh patch.

But also, on my platform (debian jessie), fwrite() returns size_t, and
write() returns ssize_t. So those are apparently both different from what
your platform does - which one did you get that one?

It looks like I misread the macOS man pages previously. They actually
list ssize_t. I find the way gzwrite is designed a bit surprising: it
takes an unsigned integer as input and returns a signed integer to the
caller, so it will never work with uncompressed buffers larger than
2GB. There's little point in worrying about that in pg_receivexlog
though, so let's just cast to ssize_t.
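
To make the type mismatch concrete, here is a minimal sketch in C. It
assumes only the documented zlib prototype, int gzwrite(gzFile file,
voidpc buf, unsigned len). The helper name gz_chunk_len is hypothetical
and not part of the patch; it caps a request at INT_MAX so that a signed
int result could always represent the number of bytes written.
pg_receivexlog writes far smaller buffers, which is why a plain cast is
enough there.

```c
#include <limits.h>
#include <stddef.h>

/*
 * Hypothetical helper, not part of the patch: return how many bytes of a
 * "remaining"-byte buffer can be handed to a single gzwrite()-style call
 * whose result is reported as a signed int.
 */
static size_t
gz_chunk_len(size_t remaining)
{
	if (remaining > (size_t) INT_MAX)
		return (size_t) INT_MAX;
	return remaining;
}
```

A caller with a very large buffer would loop, passing gz_chunk_len()
bytes per call and advancing by the returned count.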

Attached is a simplified new version; I have kept the file descriptor
as originally done. Note that tests are actually difficult to work
out, as there is no way to run pg_receivexlog in batch mode.
--
Michael

Attachments:

receivexlog-gzip-v2.patch (application/stream)
diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml
index bfa055b58b..8c1ea9a2e2 100644
--- a/doc/src/sgml/ref/pg_receivexlog.sgml
+++ b/doc/src/sgml/ref/pg_receivexlog.sgml
@@ -180,6 +180,19 @@ PostgreSQL documentation
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry>
+      <term><option>-Z <replaceable class="parameter">level</replaceable></option></term>
+      <term><option>--compress=<replaceable class="parameter">level</replaceable></option></term>
+      <listitem>
+       <para>
+        Enables gzip compression of transaction logs, and specifies the
+        compression level (0 through 9, 0 being no compression and 9 being best
+        compression).  The suffix <filename>.gz</filename> will
+        automatically be added to all filenames.
+       </para>
+      </listitem>
+     </varlistentry>
     </variablelist>
 
    <para>
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 79b899a343..a2030ce040 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -473,7 +473,7 @@ LogStreamerMain(logstreamer_param *param)
 	stream.partial_suffix = NULL;
 
 	if (format == 'p')
-		stream.walmethod = CreateWalDirectoryMethod(param->xlog, do_sync);
+		stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
 	else
 		stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
 
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index 99445e6584..3b6b600771 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -34,6 +34,7 @@
 /* Global options */
 static char *basedir = NULL;
 static int	verbose = 0;
+static int	compresslevel = 0;
 static int	noloop = 0;
 static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
 static volatile bool time_to_abort = false;
@@ -75,6 +76,7 @@ usage(void)
 	printf(_("      --synchronous      flush transaction log immediately after writing\n"));
 	printf(_("  -v, --verbose          output verbose messages\n"));
 	printf(_("  -V, --version          output version information, then exit\n"));
+	printf(_("  -Z, --compress=0-9     compress logs with given compression level\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=CONNSTR   connection string\n"));
@@ -338,7 +340,8 @@ StreamLog(void)
 	stream.synchronous = synchronous;
 	stream.do_sync = true;
 	stream.mark_done = false;
-	stream.walmethod = CreateWalDirectoryMethod(basedir, stream.do_sync);
+	stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel,
+												stream.do_sync);
 	stream.partial_suffix = ".partial";
 
 	ReceiveXlogStream(conn, &stream);
@@ -389,6 +392,7 @@ main(int argc, char **argv)
 		{"status-interval", required_argument, NULL, 's'},
 		{"slot", required_argument, NULL, 'S'},
 		{"verbose", no_argument, NULL, 'v'},
+		{"compress", required_argument, NULL, 'Z'},
 /* action */
 		{"create-slot", no_argument, NULL, 1},
 		{"drop-slot", no_argument, NULL, 2},
@@ -419,7 +423,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWv",
+	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWvZ:",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -469,6 +473,15 @@ main(int argc, char **argv)
 			case 'v':
 				verbose++;
 				break;
+			case 'Z':
+				compresslevel = atoi(optarg);
+				if (compresslevel < 0 || compresslevel > 9)
+				{
+					fprintf(stderr, _("%s: invalid compression level \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				break;
 /* action */
 			case 1:
 				do_create_slot = true;
@@ -535,6 +548,16 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+#ifndef HAVE_LIBZ
+	if (compresslevel != 0)
+	{
+		fprintf(stderr,
+				_("%s: this build does not support compression\n"),
+				progname);
+		exit(1);
+	}
+#endif
+
 	/*
 	 * Check existence of destination folder.
 	 */
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
index 40c8a5c697..7282d7429e 100644
--- a/src/bin/pg_basebackup/walmethods.c
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -41,6 +41,7 @@
 typedef struct DirectoryMethodData
 {
 	char	   *basedir;
+	int			compression;
 	bool		sync;
 }	DirectoryMethodData;
 static DirectoryMethodData *dir_data = NULL;
@@ -55,6 +56,9 @@ typedef struct DirectoryMethodFile
 	char	   *pathname;
 	char	   *fullpath;
 	char	   *temp_suffix;
+#ifdef HAVE_LIBZ
+	gzFile		gzfp;
+#endif
 }	DirectoryMethodFile;
 
 static char *
@@ -70,15 +74,34 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 	static char tmppath[MAXPGPATH];
 	int			fd;
 	DirectoryMethodFile *f;
+#ifdef HAVE_LIBZ
+	gzFile		gzfp;
+#endif
 
-	snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-			 dir_data->basedir, pathname, temp_suffix ? temp_suffix : "");
+	snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+			 dir_data->basedir, pathname,
+			 dir_data->compression > 0 ? ".gz" : "",
+			 temp_suffix ? temp_suffix : "");
 
-	fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
-	if (fd < 0)
-		return NULL;
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+	{
+		char        mode_compression[32];
+		snprintf(mode_compression, sizeof(mode_compression), "%s%d",
+				 PG_BINARY_W, dir_data->compression);
+		gzfp = gzopen(tmppath, mode_compression);
+		if (gzfp == NULL)
+			return NULL;
+	}
+	else
+#endif
+	{
+		fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+		if (fd < 0)
+			return NULL;
+	}
 
-	if (pad_to_size)
+	if (pad_to_size && dir_data->compression == 0)
 	{
 		/* Always pre-pad on regular files */
 		char	   *zerobuf;
@@ -120,13 +143,23 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 		if (fsync_fname(tmppath, false, progname) != 0 ||
 			fsync_parent_path(tmppath, progname) != 0)
 		{
-			close(fd);
+#ifdef HAVE_LIBZ
+			if (dir_data->compression > 0)
+				gzclose(gzfp);
+			else
+#endif
+				close(fd);
 			return NULL;
 		}
 	}
 
 	f = pg_malloc0(sizeof(DirectoryMethodFile));
-	f->fd = fd;
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		f->gzfp = gzfp;
+	else
+#endif
+		f->fd = fd;
 	f->currpos = 0;
 	f->pathname = pg_strdup(pathname);
 	f->fullpath = pg_strdup(tmppath);
@@ -144,7 +177,12 @@ dir_write(Walfile f, const void *buf, size_t count)
 
 	Assert(f != NULL);
 
-	r = write(df->fd, buf, count);
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		r = (ssize_t) gzwrite(df->gzfp, buf, count);
+	else
+#endif
+		r = write(df->fd, buf, count);
 	if (r > 0)
 		df->currpos += r;
 	return r;
@@ -169,7 +207,12 @@ dir_close(Walfile f, WalCloseMethod method)
 
 	Assert(f != NULL);
 
-	r = close(df->fd);
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		r = gzclose(df->gzfp);
+	else
+#endif
+		r = close(df->fd);
 
 	if (r == 0)
 	{
@@ -180,17 +223,22 @@ dir_close(Walfile f, WalCloseMethod method)
 			 * If we have a temp prefix, normal operation is to rename the
 			 * file.
 			 */
-			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-					 dir_data->basedir, df->pathname, df->temp_suffix);
-			snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
-					 dir_data->basedir, df->pathname);
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "",
+					 df->temp_suffix);
+			snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "");
 			r = durable_rename(tmppath, tmppath2, progname);
 		}
 		else if (method == CLOSE_UNLINK)
 		{
 			/* Unlink the file once it's closed */
-			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-					 dir_data->basedir, df->pathname, df->temp_suffix ? df->temp_suffix : "");
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "",
+					 df->temp_suffix ? df->temp_suffix : "");
 			r = unlink(tmppath);
 		}
 		else
@@ -277,7 +325,7 @@ dir_finish(void)
 
 
 WalWriteMethod *
-CreateWalDirectoryMethod(const char *basedir, bool sync)
+CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
 {
 	WalWriteMethod *method;
 
@@ -293,6 +341,7 @@ CreateWalDirectoryMethod(const char *basedir, bool sync)
 	method->getlasterror = dir_getlasterror;
 
 	dir_data = pg_malloc0(sizeof(DirectoryMethodData));
+	dir_data->compression = compression;
 	dir_data->basedir = pg_strdup(basedir);
 	dir_data->sync = sync;
 
diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h
index 8cea8ff4c0..5f549254e9 100644
--- a/src/bin/pg_basebackup/walmethods.h
+++ b/src/bin/pg_basebackup/walmethods.h
@@ -41,7 +41,8 @@ struct WalWriteMethod
  *						   (only implements the methods required for pg_basebackup,
  *						   not all those required for pg_receivexlog)
  */
-WalWriteMethod *CreateWalDirectoryMethod(const char *basedir, bool sync);
+WalWriteMethod *CreateWalDirectoryMethod(const char *basedir,
+										 int compression, bool sync);
 WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync);
 
 /* Cleanup routines for previously-created methods */
#10Magnus Hagander
magnus@hagander.net
In reply to: Michael Paquier (#9)
Re: Support for pg_receivexlog --format=plain|tar

On Fri, Dec 30, 2016 at 6:41 AM, Michael Paquier <michael.paquier@gmail.com>
wrote:

On Thu, Dec 29, 2016 at 6:13 PM, Magnus Hagander <magnus@hagander.net>
wrote:

On Thu, Dec 29, 2016 at 12:35 AM, Michael Paquier
<michael.paquier@gmail.com> wrote:

On Wed, Dec 28, 2016 at 9:31 PM, Magnus Hagander <magnus@hagander.net>
wrote:

- I have switched the directory method to use a file pointer instead
of a file descriptor as gzwrite returns int as the number of
uncompressed bytes written.

I don't really follow that reasoning :) Why does the directory method
have
to change to use a filepointer because of that?

The only reason is that write() returns size_t and fwrite returns int,
while gzwrite() returns int. It seems more consistent to use fwrite()
in this case. Or we don't bother about my nitpicking and just cast
stuff.

I can at least partially see that argument, but your patch doesn't actually
use fwrite(), it uses write() with fileno()...

That was part of the one/two things I wanted to change before sending
a fresh patch.

OK.

But also, on my platform (debian jessie), fwrite() returns size_t, and
write() returns ssize_t. So those are apparently both different from what
your platform does - which one did you get that one?

It looks like I misread the macos man pages previously. They actually
list ssize_t. I find a bit surprising the way gzwrite is designed. It
uses an input an unsigned integer and returns to caller a signed
integer, so this will never work with uncompressed buffers of sizes
higher than 2GB. There's little point to worry about that in
pg_receivexlog though, so let's just cast to ssize_t.

Attached is a simplified new version, I have kept the file descriptor
as originally done. Note that tests are actually difficult to work
out, there is no way to run in batch pg_receivexlog..

A few further notes:

You are using the filemode to gzopen and the mode_compression variable to
set the compression level. The pre-existing code in pg_basebackup uses
gzsetparams(). Is there a particular reason you didn't do it the same way?

Small comment:
-   if (pad_to_size)
+   if (pad_to_size && dir_data->compression == 0)
    {
        /* Always pre-pad on regular files */

That "always" is not true anymore. That can be cleaned up at commit time.

The rest of this looks good to me, but please comment on the gzopen part
before we proceed to commit :)
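
For readers comparing the two approaches: the v2 patch encodes the level
in the mode string passed to gzopen(), which zlib documents as accepting
an optional compression-level digit (for example "wb9"). The sketch
below only builds that string. build_gz_mode is a hypothetical name, and
a literal "wb" is assumed here in place of the PG_BINARY_W macro used by
the patch. The alternative raised above is to open with a plain mode and
then call gzsetparams() with the level.

```c
#include <stddef.h>
#include <stdio.h>

/*
 * Hypothetical helper, not part of the patch: build a gzopen() mode string
 * such as "wb5" from a compression level in the range 1..9.
 * Returns 0 on success, -1 if the level is out of range or the buffer is
 * too small to hold the result.
 */
static int
build_gz_mode(char *dst, size_t dstlen, int level)
{
	int			n;

	if (level < 1 || level > 9)
		return -1;

	n = snprintf(dst, dstlen, "wb%d", level);
	if (n < 0 || (size_t) n >= dstlen)
		return -1;
	return 0;
}
```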

--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/

#11David Steele
david@pgmasters.net
In reply to: Magnus Hagander (#10)
Re: Support for pg_receivexlog --format=plain|tar

On 1/6/17 9:07 AM, Magnus Hagander wrote:

On Fri, Dec 30, 2016 at 6:41 AM, Michael Paquier
<michael.paquier@gmail.com> wrote:
Attached is a simplified new version, I have kept the file descriptor
as originally done. Note that tests are actually difficult to work
out, there is no way to run in batch pg_receivexlog..

A few further notes:

You are using the filemode to gzopen and the mode_compression variable
to set the compression level. The pre-existing code in pg_basebackup
uses gzsetparams(). Is there a particular reason you didn't do it the
same way?

Small comment:
-   if (pad_to_size)
+   if (pad_to_size && dir_data->compression == 0)
{
/* Always pre-pad on regular files */

That "always" is not true anymore. Commit-time cleanup can be done of that.

The rest of this looks good to me, but please comment on the gzopen part
before we proceed to commit :)

I had planned to review this patch but have removed my name since it
seems to be well in hand and likely to be committed very soon, and I won't
have time to look at it until next week.

I will say that I'm happy to have this feature *and* eventually the
post_command.

--
-David
david@pgmasters.net


#12Michael Paquier
michael.paquier@gmail.com
In reply to: Magnus Hagander (#10)
1 attachment(s)
Re: Support for pg_receivexlog --format=plain|tar

On Fri, Jan 6, 2017 at 11:07 PM, Magnus Hagander <magnus@hagander.net> wrote:

A few further notes:

Thanks for the review.

You are using the filemode to gzopen and the mode_compression variable to
set the compression level. The pre-existing code in pg_basebackup uses
gzsetparams(). Is there a particular reason you didn't do it the same way?

Small comment:
-   if (pad_to_size)
+   if (pad_to_size && dir_data->compression == 0)
{
/* Always pre-pad on regular files */

That "always" is not true anymore. Commit-time cleanup can be done of that.

The rest of this looks good to me, but please comment on the gzopen part
before we proceed to commit :)

Yes, using gzsetparams() looks cleaner. I actually thought about using
the same logic as pg_dump. Attached is an updated patch.

There is something I forgot. With this patch,
FindStreamingStart()@pg_receivexlog.c is actually broken. In short, it
fails to consider files that were compressed during the last run of
pg_receivexlog and will try to stream changes from the beginning. I
can see that gzip -l reports the uncompressed size of a file... But I
have yet to find something in zlib that allows a cheap lookup, and
startup of streaming should be fast. Looking at how gzip -l does it may
be faster than looking at the docs.
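
One cheap lookup that does not need zlib at all: per RFC 1952, a gzip
member ends with a 4-byte little-endian ISIZE field holding the
uncompressed size modulo 2^32, which is the value gzip -l reports. The
sketch below reads it with plain stdio. gz_uncompressed_size is a
hypothetical name, and the code assumes a single-member gzip file that
was closed cleanly, so it would not be valid for a truncated
.gz.partial file.

```c
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: read the ISIZE trailer (last 4 bytes, little-endian)
 * of a gzip file.  Returns 0 on success and stores the uncompressed size
 * modulo 2^32 in *size; returns -1 on any I/O error or if the file is
 * shorter than the 18 bytes of fixed gzip header and trailer.
 */
static int
gz_uncompressed_size(const char *path, uint32_t *size)
{
	FILE	   *fp;
	unsigned char buf[4];

	fp = fopen(path, "rb");
	if (fp == NULL)
		return -1;

	if (fseek(fp, 0L, SEEK_END) != 0 ||
		ftell(fp) < 18L ||
		fseek(fp, -4L, SEEK_END) != 0 ||
		fread(buf, 1, sizeof(buf), fp) != sizeof(buf))
	{
		fclose(fp);
		return -1;
	}
	fclose(fp);

	/* Assemble the little-endian 32-bit value byte by byte. */
	*size = (uint32_t) buf[0] |
		((uint32_t) buf[1] << 8) |
		((uint32_t) buf[2] << 16) |
		((uint32_t) buf[3] << 24);
	return 0;
}
```

With the default segment size, a completed compressed segment would be
expected to report 16MB (16777216 bytes).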
--
Michael

Attachments:

receivexlog-gzip-v3.patch (text/x-diff; charset=US-ASCII)
diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml
index bfa055b58b..8c1ea9a2e2 100644
--- a/doc/src/sgml/ref/pg_receivexlog.sgml
+++ b/doc/src/sgml/ref/pg_receivexlog.sgml
@@ -180,6 +180,19 @@ PostgreSQL documentation
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry>
+      <term><option>-Z <replaceable class="parameter">level</replaceable></option></term>
+      <term><option>--compress=<replaceable class="parameter">level</replaceable></option></term>
+      <listitem>
+       <para>
+        Enables gzip compression of transaction logs, and specifies the
+        compression level (0 through 9, 0 being no compression and 9 being best
+        compression).  The suffix <filename>.gz</filename> will
+        automatically be added to all filenames.
+       </para>
+      </listitem>
+     </varlistentry>
     </variablelist>
 
    <para>
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 3f83d87e50..87ab1bb92f 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -472,7 +472,7 @@ LogStreamerMain(logstreamer_param *param)
 	stream.partial_suffix = NULL;
 
 	if (format == 'p')
-		stream.walmethod = CreateWalDirectoryMethod(param->xlog, do_sync);
+		stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
 	else
 		stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
 
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index b6f57a878c..9d18f22c59 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -34,6 +34,7 @@
 /* Global options */
 static char *basedir = NULL;
 static int	verbose = 0;
+static int	compresslevel = 0;
 static int	noloop = 0;
 static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
 static volatile bool time_to_abort = false;
@@ -75,6 +76,7 @@ usage(void)
 	printf(_("      --synchronous      flush transaction log immediately after writing\n"));
 	printf(_("  -v, --verbose          output verbose messages\n"));
 	printf(_("  -V, --version          output version information, then exit\n"));
+	printf(_("  -Z, --compress=0-9     compress logs with given compression level\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=CONNSTR   connection string\n"));
@@ -338,7 +340,8 @@ StreamLog(void)
 	stream.synchronous = synchronous;
 	stream.do_sync = true;
 	stream.mark_done = false;
-	stream.walmethod = CreateWalDirectoryMethod(basedir, stream.do_sync);
+	stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel,
+												stream.do_sync);
 	stream.partial_suffix = ".partial";
 
 	ReceiveXlogStream(conn, &stream);
@@ -389,6 +392,7 @@ main(int argc, char **argv)
 		{"status-interval", required_argument, NULL, 's'},
 		{"slot", required_argument, NULL, 'S'},
 		{"verbose", no_argument, NULL, 'v'},
+		{"compress", required_argument, NULL, 'Z'},
 /* action */
 		{"create-slot", no_argument, NULL, 1},
 		{"drop-slot", no_argument, NULL, 2},
@@ -419,7 +423,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWv",
+	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWvZ:",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -469,6 +473,15 @@ main(int argc, char **argv)
 			case 'v':
 				verbose++;
 				break;
+			case 'Z':
+				compresslevel = atoi(optarg);
+				if (compresslevel < 0 || compresslevel > 9)
+				{
+					fprintf(stderr, _("%s: invalid compression level \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				break;
 /* action */
 			case 1:
 				do_create_slot = true;
@@ -535,6 +548,16 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+#ifndef HAVE_LIBZ
+	if (compresslevel != 0)
+	{
+		fprintf(stderr,
+				_("%s: this build does not support compression\n"),
+				progname);
+		exit(1);
+	}
+#endif
+
 	/*
 	 * Check existence of destination folder.
 	 */
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
index 88ee603b8b..f0726f845b 100644
--- a/src/bin/pg_basebackup/walmethods.c
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -41,6 +41,7 @@
 typedef struct DirectoryMethodData
 {
 	char	   *basedir;
+	int			compression;
 	bool		sync;
 }	DirectoryMethodData;
 static DirectoryMethodData *dir_data = NULL;
@@ -55,6 +56,9 @@ typedef struct DirectoryMethodFile
 	char	   *pathname;
 	char	   *fullpath;
 	char	   *temp_suffix;
+#ifdef HAVE_LIBZ
+	gzFile		gzfp;
+#endif
 }	DirectoryMethodFile;
 
 static char *
@@ -70,17 +74,40 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 	static char tmppath[MAXPGPATH];
 	int			fd;
 	DirectoryMethodFile *f;
+#ifdef HAVE_LIBZ
+	gzFile		gzfp;
+#endif
 
-	snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-			 dir_data->basedir, pathname, temp_suffix ? temp_suffix : "");
+	snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+			 dir_data->basedir, pathname,
+			 dir_data->compression > 0 ? ".gz" : "",
+			 temp_suffix ? temp_suffix : "");
 
-	fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
-	if (fd < 0)
-		return NULL;
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+	{
+		gzfp = gzopen(tmppath, "wb");
+		if (gzfp == NULL)
+			return NULL;
 
-	if (pad_to_size)
+		if (gzsetparams(gzfp, dir_data->compression,
+						Z_DEFAULT_STRATEGY) != Z_OK)
+		{
+			gzclose(gzfp);
+			return NULL;
+		}
+	}
+	else
+#endif
+	{
+		fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+		if (fd < 0)
+			return NULL;
+	}
+
+	/* Do pre-padding on non-compressed files */
+	if (pad_to_size && dir_data->compression == 0)
 	{
-		/* Always pre-pad on regular files */
 		char	   *zerobuf;
 		int			bytes;
 
@@ -120,13 +147,23 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 		if (fsync_fname(tmppath, false, progname) != 0 ||
 			fsync_parent_path(tmppath, progname) != 0)
 		{
-			close(fd);
+#ifdef HAVE_LIBZ
+			if (dir_data->compression > 0)
+				gzclose(gzfp);
+			else
+#endif
+				close(fd);
 			return NULL;
 		}
 	}
 
 	f = pg_malloc0(sizeof(DirectoryMethodFile));
-	f->fd = fd;
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		f->gzfp = gzfp;
+	else
+#endif
+		f->fd = fd;
 	f->currpos = 0;
 	f->pathname = pg_strdup(pathname);
 	f->fullpath = pg_strdup(tmppath);
@@ -144,7 +181,12 @@ dir_write(Walfile f, const void *buf, size_t count)
 
 	Assert(f != NULL);
 
-	r = write(df->fd, buf, count);
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		r = (ssize_t) gzwrite(df->gzfp, buf, count);
+	else
+#endif
+		r = write(df->fd, buf, count);
 	if (r > 0)
 		df->currpos += r;
 	return r;
@@ -169,7 +211,12 @@ dir_close(Walfile f, WalCloseMethod method)
 
 	Assert(f != NULL);
 
-	r = close(df->fd);
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		r = gzclose(df->gzfp);
+	else
+#endif
+		r = close(df->fd);
 
 	if (r == 0)
 	{
@@ -180,17 +227,22 @@ dir_close(Walfile f, WalCloseMethod method)
 			 * If we have a temp prefix, normal operation is to rename the
 			 * file.
 			 */
-			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-					 dir_data->basedir, df->pathname, df->temp_suffix);
-			snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
-					 dir_data->basedir, df->pathname);
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "",
+					 df->temp_suffix);
+			snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "");
 			r = durable_rename(tmppath, tmppath2, progname);
 		}
 		else if (method == CLOSE_UNLINK)
 		{
 			/* Unlink the file once it's closed */
-			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-					 dir_data->basedir, df->pathname, df->temp_suffix ? df->temp_suffix : "");
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "",
+					 df->temp_suffix ? df->temp_suffix : "");
 			r = unlink(tmppath);
 		}
 		else
@@ -277,7 +329,7 @@ dir_finish(void)
 
 
 WalWriteMethod *
-CreateWalDirectoryMethod(const char *basedir, bool sync)
+CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
 {
 	WalWriteMethod *method;
 
@@ -293,6 +345,7 @@ CreateWalDirectoryMethod(const char *basedir, bool sync)
 	method->getlasterror = dir_getlasterror;
 
 	dir_data = pg_malloc0(sizeof(DirectoryMethodData));
+	dir_data->compression = compression;
 	dir_data->basedir = pg_strdup(basedir);
 	dir_data->sync = sync;
 
diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h
index c1723d53b5..2cd8b6d755 100644
--- a/src/bin/pg_basebackup/walmethods.h
+++ b/src/bin/pg_basebackup/walmethods.h
@@ -41,7 +41,8 @@ struct WalWriteMethod
  *						   (only implements the methods required for pg_basebackup,
  *						   not all those required for pg_receivexlog)
  */
-WalWriteMethod *CreateWalDirectoryMethod(const char *basedir, bool sync);
+WalWriteMethod *CreateWalDirectoryMethod(const char *basedir,
+										 int compression, bool sync);
 WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync);
 
 /* Cleanup routines for previously-created methods */
#13Magnus Hagander
magnus@hagander.net
In reply to: Michael Paquier (#12)
Re: Support for pg_receivexlog --format=plain|tar

On Sat, Jan 7, 2017 at 12:31 AM, Michael Paquier <michael.paquier@gmail.com>
wrote:

On Fri, Jan 6, 2017 at 11:07 PM, Magnus Hagander <magnus@hagander.net>
wrote:

A few further notes:

Thanks for the review.

You are using the filemode to gzopen and the mode_compression variable to
set the compression level. The pre-existing code in pg_basebackup uses
gzsetparams(). Is there a particular reason you didn't do it the same
way?

Small comment:
-   if (pad_to_size)
+   if (pad_to_size && dir_data->compression == 0)
{
/* Always pre-pad on regular files */

That "always" is not true anymore. It can be cleaned up at commit time.

The rest of this looks good to me, but please comment on the gzopen part
before we proceed to commit :)

Yes using gzsetparams() looks cleaner. I actually thought about using
the same logic as pg_dump. Attached is an updated patch.

There is something I forgot. With this patch,
FindStreamingStart()@pg_receivexlog.c is actually broken. In short it
forgets to consider files that have been compressed at the last run of
pg_receivexlog and will try to stream changes from the beginning. I
can see that gzip -l provides this information... But I have yet to
find something in zlib that allows a cheap lookup as startup of
streaming should be fast. Looking at how gzip -l does it may be faster
than looking at the docs.

Do we really care though? As in, does startup of streaming have to be *that*
fast? Even gunzipping 16MB (worst case) doesn't exactly take a long time. If
your pg_receivexlog is restarting so often that it becomes a problem, I
think you already have another and much bigger problem on your hands.

I found another problem with it -- it is completely broken in sync mode.
You need to either forbid sync mode together with compression, or teach
dir_sync() about it. The latter would sound better, but I wonder how much
that's going to kill compression due to the small blocks? Is it a
reasonable use-case?

--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/

#14Michael Paquier
michael.paquier@gmail.com
In reply to: Magnus Hagander (#13)
1 attachment(s)
Re: Support for pg_receivexlog --format=plain|tar

On Sat, Jan 7, 2017 at 8:19 PM, Magnus Hagander <magnus@hagander.net> wrote:

On Sat, Jan 7, 2017 at 12:31 AM, Michael Paquier <michael.paquier@gmail.com>
wrote:

There is something I forgot. With this patch,
FindStreamingStart()@pg_receivexlog.c is actually broken. In short it
forgets to consider files that have been compressed at the last run of
pg_receivexlog and will try to stream changes from the beginning. I
can see that gzip -l provides this information... But I have yet to
find something in zlib that allows a cheap lookup as startup of
streaming should be fast. Looking at how gzip -l does it may be faster
than looking at the docs.

Do we really care though? As in, does startup of streaming have to be *that*
fast? Even gunzipping 16MB (worst case) doesn't exactly take a long time. If
your pg_receivexlog is restarting so often that it becomes a problem, I
think you already have another and much bigger problem on your hands.

Based on some analysis, it is enough to look at the last 4 bytes of
the compressed file to get the size of the uncompressed data, with a
single call to lseek() followed by a read(). As there is a simple way
to do this, and as it is far cheaper than decompressing perhaps
hundreds of segments, I'd rather do it this way. Attached is the
implementation. This code uses 2 booleans for the 4 states of the file
names: full non-compressed, partial non-compressed, full compressed and
partial compressed. This keeps the last check of FindStreamingStart()
simpler, though it is quite fashionable lately to have an enum for such
things :D
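
For comparison, here is a rough sketch of what the enum variant of the file name check could look like. It is not what the attached patch does (the patch keeps the two booleans and the macros); the enum and the function name are made up for illustration, and XLOG_FNAME_LEN is hardcoded to 24 only so that the snippet stands alone.

```c
#include <string.h>

#define XLOG_FNAME_LEN 24		/* length of a WAL segment file name */

/* Hypothetical enum; the patch itself uses two booleans instead. */
typedef enum
{
	SEG_NOT_WAL,				/* not a WAL segment name at all */
	SEG_FULL,					/* 24 hex digits, no suffix */
	SEG_PARTIAL,				/* 24 hex digits + ".partial" */
	SEG_FULL_GZ,				/* 24 hex digits + ".gz" */
	SEG_PARTIAL_GZ				/* 24 hex digits + ".gz.partial" */
} SegmentKind;

/* Classify a directory entry name by its hex prefix and suffix. */
SegmentKind
classify_segment_name(const char *fname)
{
	const char *suffix;

	/* Exactly the first 24 characters must be uppercase hex digits. */
	if (strspn(fname, "0123456789ABCDEF") != XLOG_FNAME_LEN)
		return SEG_NOT_WAL;

	suffix = fname + XLOG_FNAME_LEN;
	if (suffix[0] == '\0')
		return SEG_FULL;
	if (strcmp(suffix, ".partial") == 0)
		return SEG_PARTIAL;
	if (strcmp(suffix, ".gz") == 0)
		return SEG_FULL_GZ;
	if (strcmp(suffix, ".gz.partial") == 0)
		return SEG_PARTIAL_GZ;
	return SEG_NOT_WAL;
}
```

With something like this the size check would only have to run for SEG_FULL and SEG_FULL_GZ, which is the same condition the two booleans express as !ispartial.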

I found another problem with it -- it is completely broken in sync mode. You
need to either forbid sync mode together with compression, or teach
dir_sync() about it. The latter would sound better, but I wonder how much
that's going to kill compression due to the small blocks? Is it a reasonable
use-case?

Hm. Looking at the docs I think that --compress defined with
--synchronous maps to the use of Z_SYNC_FLUSH with gzflush(). FWIW I
don't have a direct use case for it, but it is not a major effort to
add it, so done. There is no actual reason to forbid this combination
of options either.
--
Michael

Attachments:

receivexlog-gzip-v4.patch (text/x-patch; charset=US-ASCII)
diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml
index bfa055b58b..8c1ea9a2e2 100644
--- a/doc/src/sgml/ref/pg_receivexlog.sgml
+++ b/doc/src/sgml/ref/pg_receivexlog.sgml
@@ -180,6 +180,19 @@ PostgreSQL documentation
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry>
+      <term><option>-Z <replaceable class="parameter">level</replaceable></option></term>
+      <term><option>--compress=<replaceable class="parameter">level</replaceable></option></term>
+      <listitem>
+       <para>
+        Enables gzip compression of transaction logs, and specifies the
+        compression level (0 through 9, 0 being no compression and 9 being best
+        compression).  The suffix <filename>.gz</filename> will
+        automatically be added to all filenames.
+       </para>
+      </listitem>
+     </varlistentry>
     </variablelist>
 
    <para>
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 8ebf24e771..b9c0bb5fff 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -481,7 +481,7 @@ LogStreamerMain(logstreamer_param *param)
 	stream.partial_suffix = NULL;
 
 	if (format == 'p')
-		stream.walmethod = CreateWalDirectoryMethod(param->xlog, do_sync);
+		stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
 	else
 		stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
 
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index b6f57a878c..74fa5c68c0 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -34,6 +34,7 @@
 /* Global options */
 static char *basedir = NULL;
 static int	verbose = 0;
+static int	compresslevel = 0;
 static int	noloop = 0;
 static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
 static volatile bool time_to_abort = false;
@@ -57,6 +58,15 @@ static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
 	exit(code);									\
 	}
 
+/* Routines to evaluate segment file format */
+#define IsCompressXLogFileName(fname)    \
+	(strlen(fname) == XLOG_FNAME_LEN + strlen(".gz") &&	\
+	 strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&		\
+	 strcmp((fname) + XLOG_FNAME_LEN, ".gz") == 0)
+#define IsPartialCompressXLogFileName(fname)    \
+	(strlen(fname) == XLOG_FNAME_LEN + strlen(".gz.partial") &&	\
+	 strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&		\
+	 strcmp((fname) + XLOG_FNAME_LEN, ".gz.partial") == 0)
 
 static void
 usage(void)
@@ -75,6 +85,7 @@ usage(void)
 	printf(_("      --synchronous      flush transaction log immediately after writing\n"));
 	printf(_("  -v, --verbose          output verbose messages\n"));
 	printf(_("  -V, --version          output version information, then exit\n"));
+	printf(_("  -Z, --compress=0-9     compress logs with given compression level\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=CONNSTR   connection string\n"));
@@ -187,14 +198,31 @@ FindStreamingStart(uint32 *tli)
 		uint32		tli;
 		XLogSegNo	segno;
 		bool		ispartial;
+		bool		iscompress;
 
 		/*
 		 * Check if the filename looks like an xlog file, or a .partial file.
 		 */
 		if (IsXLogFileName(dirent->d_name))
+		{
 			ispartial = false;
+			iscompress = false;
+		}
 		else if (IsPartialXLogFileName(dirent->d_name))
+		{
+			ispartial = true;
+			iscompress = false;
+		}
+		else if (IsCompressXLogFileName(dirent->d_name))
+		{
+			ispartial = false;
+			iscompress = true;
+		}
+		else if (IsPartialCompressXLogFileName(dirent->d_name))
+		{
 			ispartial = true;
+			iscompress = true;
+		}
 		else
 			continue;
 
@@ -205,9 +233,11 @@ FindStreamingStart(uint32 *tli)
 
 		/*
 		 * Check that the segment has the right size, if it's supposed to be
-		 * completed.
+		 * completed.  For non-compressed segments just check the on-disk size.
+		 * For compressed segments, look at the last 4 bytes of the compressed
+		 * file and check the size of the uncompressed data.
 		 */
-		if (!ispartial)
+		if (!ispartial && !iscompress)
 		{
 			struct stat statbuf;
 			char		fullpath[MAXPGPATH];
@@ -228,6 +258,47 @@ FindStreamingStart(uint32 *tli)
 				continue;
 			}
 		}
+		else if (!ispartial && iscompress)
+		{
+			int		fd;
+			char	buf[4];
+			int		bytes_out;
+			char	fullpath[MAXPGPATH];
+
+			snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+
+			fd = open(fullpath, O_RDONLY | PG_BINARY);
+			if (fd < 0)
+			{
+				fprintf(stderr, _("%s: could not open file \"%s\": %s\n"),
+						progname, fullpath, strerror(errno));
+				disconnect_and_exit(1);
+			}
+			if (lseek(fd, (off_t)(-4), SEEK_END) < 0)
+			{
+				fprintf(stderr, _("%s: could not seek file \"%s\": %s\n"),
+						progname, fullpath, strerror(errno));
+				disconnect_and_exit(1);
+			}
+			if (read(fd, (char *) buf, sizeof(buf)) != sizeof(buf))
+			{
+				fprintf(stderr, _("%s: could not read file \"%s\": %s\n"),
+						progname, fullpath, strerror(errno));
+				disconnect_and_exit(1);
+			}
+
+			close(fd);
+			bytes_out = (buf[3] << 24) | (buf[2] << 16) |
+						(buf[1] << 8) | buf[0];
+
+			if (bytes_out != XLOG_SEG_SIZE)
+			{
+				fprintf(stderr,
+						_("%s: segment file \"%s\" has incorrect size %d, skipping\n"),
+						progname, dirent->d_name, bytes_out);
+				continue;
+			}
+		}
 
 		/* Looks like a valid segment. Remember that we saw it. */
 		if ((segno > high_segno) ||
@@ -338,7 +409,8 @@ StreamLog(void)
 	stream.synchronous = synchronous;
 	stream.do_sync = true;
 	stream.mark_done = false;
-	stream.walmethod = CreateWalDirectoryMethod(basedir, stream.do_sync);
+	stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel,
+												stream.do_sync);
 	stream.partial_suffix = ".partial";
 
 	ReceiveXlogStream(conn, &stream);
@@ -389,6 +461,7 @@ main(int argc, char **argv)
 		{"status-interval", required_argument, NULL, 's'},
 		{"slot", required_argument, NULL, 'S'},
 		{"verbose", no_argument, NULL, 'v'},
+		{"compress", required_argument, NULL, 'Z'},
 /* action */
 		{"create-slot", no_argument, NULL, 1},
 		{"drop-slot", no_argument, NULL, 2},
@@ -419,7 +492,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWv",
+	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWvZ:",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -469,6 +542,15 @@ main(int argc, char **argv)
 			case 'v':
 				verbose++;
 				break;
+			case 'Z':
+				compresslevel = atoi(optarg);
+				if (compresslevel < 0 || compresslevel > 9)
+				{
+					fprintf(stderr, _("%s: invalid compression level \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				break;
 /* action */
 			case 1:
 				do_create_slot = true;
@@ -535,6 +617,16 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+#ifndef HAVE_LIBZ
+	if (compresslevel != 0)
+	{
+		fprintf(stderr,
+				_("%s: this build does not support compression\n"),
+				progname);
+		exit(1);
+	}
+#endif
+
 	/*
 	 * Check existence of destination folder.
 	 */
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
index 88ee603b8b..b7d075f15e 100644
--- a/src/bin/pg_basebackup/walmethods.c
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -41,6 +41,7 @@
 typedef struct DirectoryMethodData
 {
 	char	   *basedir;
+	int			compression;
 	bool		sync;
 }	DirectoryMethodData;
 static DirectoryMethodData *dir_data = NULL;
@@ -55,6 +56,9 @@ typedef struct DirectoryMethodFile
 	char	   *pathname;
 	char	   *fullpath;
 	char	   *temp_suffix;
+#ifdef HAVE_LIBZ
+	gzFile		gzfp;
+#endif
 }	DirectoryMethodFile;
 
 static char *
@@ -70,17 +74,40 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 	static char tmppath[MAXPGPATH];
 	int			fd;
 	DirectoryMethodFile *f;
+#ifdef HAVE_LIBZ
+	gzFile		gzfp;
+#endif
 
-	snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-			 dir_data->basedir, pathname, temp_suffix ? temp_suffix : "");
+	snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+			 dir_data->basedir, pathname,
+			 dir_data->compression > 0 ? ".gz" : "",
+			 temp_suffix ? temp_suffix : "");
 
-	fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
-	if (fd < 0)
-		return NULL;
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+	{
+		gzfp = gzopen(tmppath, "wb");
+		if (gzfp == NULL)
+			return NULL;
 
-	if (pad_to_size)
+		if (gzsetparams(gzfp, dir_data->compression,
+						Z_DEFAULT_STRATEGY) != Z_OK)
+		{
+			gzclose(gzfp);
+			return NULL;
+		}
+	}
+	else
+#endif
+	{
+		fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+		if (fd < 0)
+			return NULL;
+	}
+
+	/* Do pre-padding on non-compressed files */
+	if (pad_to_size && dir_data->compression == 0)
 	{
-		/* Always pre-pad on regular files */
 		char	   *zerobuf;
 		int			bytes;
 
@@ -120,13 +147,23 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 		if (fsync_fname(tmppath, false, progname) != 0 ||
 			fsync_parent_path(tmppath, progname) != 0)
 		{
-			close(fd);
+#ifdef HAVE_LIBZ
+			if (dir_data->compression > 0)
+				gzclose(gzfp);
+			else
+#endif
+				close(fd);
 			return NULL;
 		}
 	}
 
 	f = pg_malloc0(sizeof(DirectoryMethodFile));
-	f->fd = fd;
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		f->gzfp = gzfp;
+	else
+#endif
+		f->fd = fd;
 	f->currpos = 0;
 	f->pathname = pg_strdup(pathname);
 	f->fullpath = pg_strdup(tmppath);
@@ -144,7 +181,12 @@ dir_write(Walfile f, const void *buf, size_t count)
 
 	Assert(f != NULL);
 
-	r = write(df->fd, buf, count);
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		r = (ssize_t) gzwrite(df->gzfp, buf, count);
+	else
+#endif
+		r = write(df->fd, buf, count);
 	if (r > 0)
 		df->currpos += r;
 	return r;
@@ -169,7 +211,12 @@ dir_close(Walfile f, WalCloseMethod method)
 
 	Assert(f != NULL);
 
-	r = close(df->fd);
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		r = gzclose(df->gzfp);
+	else
+#endif
+		r = close(df->fd);
 
 	if (r == 0)
 	{
@@ -180,17 +227,22 @@ dir_close(Walfile f, WalCloseMethod method)
 			 * If we have a temp prefix, normal operation is to rename the
 			 * file.
 			 */
-			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-					 dir_data->basedir, df->pathname, df->temp_suffix);
-			snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
-					 dir_data->basedir, df->pathname);
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "",
+					 df->temp_suffix);
+			snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "");
 			r = durable_rename(tmppath, tmppath2, progname);
 		}
 		else if (method == CLOSE_UNLINK)
 		{
 			/* Unlink the file once it's closed */
-			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-					 dir_data->basedir, df->pathname, df->temp_suffix ? df->temp_suffix : "");
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "",
+					 df->temp_suffix ? df->temp_suffix : "");
 			r = unlink(tmppath);
 		}
 		else
@@ -226,6 +278,15 @@ dir_sync(Walfile f)
 	if (!dir_data->sync)
 		return 0;
 
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+	{
+		if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
+			return -1;
+		return 0;
+	}
+#endif
+
 	return fsync(((DirectoryMethodFile *) f)->fd);
 }
 
@@ -277,7 +338,7 @@ dir_finish(void)
 
 
 WalWriteMethod *
-CreateWalDirectoryMethod(const char *basedir, bool sync)
+CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
 {
 	WalWriteMethod *method;
 
@@ -293,6 +354,7 @@ CreateWalDirectoryMethod(const char *basedir, bool sync)
 	method->getlasterror = dir_getlasterror;
 
 	dir_data = pg_malloc0(sizeof(DirectoryMethodData));
+	dir_data->compression = compression;
 	dir_data->basedir = pg_strdup(basedir);
 	dir_data->sync = sync;
 
diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h
index c1723d53b5..2cd8b6d755 100644
--- a/src/bin/pg_basebackup/walmethods.h
+++ b/src/bin/pg_basebackup/walmethods.h
@@ -41,7 +41,8 @@ struct WalWriteMethod
  *						   (only implements the methods required for pg_basebackup,
  *						   not all those required for pg_receivexlog)
  */
-WalWriteMethod *CreateWalDirectoryMethod(const char *basedir, bool sync);
+WalWriteMethod *CreateWalDirectoryMethod(const char *basedir,
+										 int compression, bool sync);
 WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync);
 
 /* Cleanup routines for previously-created methods */
#15Magnus Hagander
magnus@hagander.net
In reply to: Michael Paquier (#14)
Re: Support for pg_receivexlog --format=plain|tar

On Tue, Jan 10, 2017 at 3:01 AM, Michael Paquier <michael.paquier@gmail.com>
wrote:

On Sat, Jan 7, 2017 at 8:19 PM, Magnus Hagander <magnus@hagander.net>
wrote:

On Sat, Jan 7, 2017 at 12:31 AM, Michael Paquier <michael.paquier@gmail.com>
wrote:

There is something I forgot. With this patch,
FindStreamingStart()@pg_receivexlog.c is actually broken. In short it
forgets to consider files that have been compressed at the last run of
pg_receivexlog and will try to stream changes from the beginning. I
can see that gzip -l provides this information... But I have yet to
find something in zlib that allows a cheap lookup as startup of
streaming should be fast. Looking at how gzip -l does it may be faster
than looking at the docs.

Do we really care though? As in, does startup of streaming have to be *that*
fast? Even gunzipping 16MB (worst case) doesn't exactly take a long time. If
your pg_receivexlog is restarting so often that it becomes a problem, I
think you already have another and much bigger problem on your hands.

Based on some analysis, it is enough to look at the last 4 bytes of
the compressed file to get the size output data with a single call to
lseek() and then read(). So as there is a simple way to do things and
that's far cheaper than decompressing perhaps hundred of segments I'd
rather do it this way. Attached is the implementation. This code is
using 2 booleans for 4 states of the file names: full non-compressed,
partial non-compressed, full compressed and partial compressed. This
keeps the last check of FindStreamingStart() more simple, but that's
quite fancy lately to have an enum for such things :D

I think you need to document that analysis at least in a code comment. I
assume this is in reference to the ISIZE member of the gzip format?

I was going to say what happens if the file is corrupt and we get random
junk data there, but as we only compare it to XlogSegSize that should be
safe. But we might want to put a note in about that too?
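
To make the ISIZE point concrete: per RFC 1952 the last 4 bytes of a gzip member hold the uncompressed size modulo 2^32, stored little-endian. Below is a minimal standalone sketch of reading it. This is not the code from the patch; the helper name is made up, and it uses an unsigned buffer so that a byte of 0x80 or above cannot be sign-extended when shifted.

```c
#include <fcntl.h>
#include <stdint.h>
#include <unistd.h>

/*
 * Read the ISIZE trailer of a gzip file: the last 4 bytes, little-endian,
 * holding the uncompressed size modulo 2^32 (RFC 1952).  Returns 0 on
 * success and -1 on any I/O error.  The helper name is hypothetical.
 */
int
read_gzip_isize(const char *path, uint32_t *isize)
{
	unsigned char buf[4];		/* unsigned: no sign extension on shift */
	int			fd;
	int			ok;

	fd = open(path, O_RDONLY);
	if (fd < 0)
		return -1;

	/* Seek to 4 bytes before the end; fails if the file is shorter. */
	ok = lseek(fd, (off_t) -4, SEEK_END) >= 0 &&
		read(fd, buf, sizeof(buf)) == (ssize_t) sizeof(buf);
	close(fd);
	if (!ok)
		return -1;

	*isize = (uint32_t) buf[0] |
		((uint32_t) buf[1] << 8) |
		((uint32_t) buf[2] << 16) |
		((uint32_t) buf[3] << 24);
	return 0;
}
```

A corrupt or truncated file simply yields some arbitrary 32-bit value here, and since the caller only compares the result against the segment size, such a file ends up skipped rather than trusted.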

Finally, I think we should make the error message clearly say "compressed
segment file" - just to make things extra clear.

I found another problem with it -- it is completely broken in sync mode.
You need to either forbid sync mode together with compression, or teach
dir_sync() about it. The latter would sound better, but I wonder how much
that's going to kill compression due to the small blocks? Is it a
reasonable use-case?

Hm. Looking at the docs I think that --compress defined with
--synchronous maps to the use of Z_SYNC_FLUSH with gzflush(). FWIW I
don't have a direct use case for it, but it is not a major effort to
add it, so done. There is no actual reason to forbid this combinations
of options either.

Is it enough to just gzflush()? Don't we also need to still fsync()? I
can't see any documentation that gzflush() does that, and a quick look at
the code of zlib indicates it doesn't (but I didn't check in detail, so if
you did please point me to it).

--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/

#16Michael Paquier
michael.paquier@gmail.com
In reply to: Magnus Hagander (#15)
1 attachment(s)
Re: Support for pg_receivexlog --format=plain|tar

On Sun, Jan 15, 2017 at 1:31 AM, Magnus Hagander <magnus@hagander.net> wrote:

On Tue, Jan 10, 2017 at 3:01 AM, Michael Paquier <michael.paquier@gmail.com>
wrote:

Based on some analysis, it is enough to look at the last 4 bytes of
the compressed file to get the size output data with a single call to
lseek() and then read(). So as there is a simple way to do things and
that's far cheaper than decompressing perhaps hundred of segments I'd
rather do it this way. Attached is the implementation. This code is
using 2 booleans for 4 states of the file names: full non-compressed,
partial non-compressed, full compressed and partial compressed. This
keeps the last check of FindStreamingStart() more simple, but that's
quite fancy lately to have an enum for such things :D

I think you need to document that analysis at least in a code comment. I
assume this is in reference to the ISIZE member of the gzip format?

Good question. I am not sure on this one.

I was going to say what happens if the file is corrupt and we get random
junk data there, but as we only compare it to XlogSegSize that should be
safe. But we might want to put a note in about that too?

Perhaps. I have made a better try in FindStreamingStart.

Finally, I think we should make the error message clearly say "compressed
segment file" - just to make things extra clear.

Sure.

I found another problem with it -- it is completely broken in sync mode.
You need to either forbid sync mode together with compression, or teach
dir_sync() about it. The latter would sound better, but I wonder how much
that's going to kill compression due to the small blocks? Is it a
reasonable use-case?

Hm. Looking at the docs I think that --compress defined with
--synchronous maps to the use of Z_SYNC_FLUSH with gzflush(). FWIW I
don't have a direct use case for it, but it is not a major effort to
add it, so done. There is no actual reason to forbid this combinations
of options either.

Is it enough to just gzflush()? Don't we also need to still fsync()? I can't
see any documentation that gzflush() does that, and a quick look at the code
of zlib indicates it doesn't (but I didn't check in detail, so if you did
please point me to it).

Hm. It looks like you are right. zlib goes down to _tr_flush_bits() to
flush some output, but this finishes only with put_byte(). As the fd
is opaque in gzFile, it would be better to open() the file first,
and then use gzdopen() to get the gzFile. Let's also use the existing
fd field to save it. Per the zlib documentation, gzclose() also closes
the underlying fd.
--
Michael

Attachments:

receivexlog-gzip-v5.patch (application/stream)
diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml
index bfa055b58b..8c1ea9a2e2 100644
--- a/doc/src/sgml/ref/pg_receivexlog.sgml
+++ b/doc/src/sgml/ref/pg_receivexlog.sgml
@@ -180,6 +180,19 @@ PostgreSQL documentation
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry>
+      <term><option>-Z <replaceable class="parameter">level</replaceable></option></term>
+      <term><option>--compress=<replaceable class="parameter">level</replaceable></option></term>
+      <listitem>
+       <para>
+        Enables gzip compression of transaction logs, and specifies the
+        compression level (0 through 9, 0 being no compression and 9 being best
+        compression).  The suffix <filename>.gz</filename> will
+        automatically be added to all filenames.
+       </para>
+      </listitem>
+     </varlistentry>
     </variablelist>
 
    <para>
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 8ebf24e771..b9c0bb5fff 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -481,7 +481,7 @@ LogStreamerMain(logstreamer_param *param)
 	stream.partial_suffix = NULL;
 
 	if (format == 'p')
-		stream.walmethod = CreateWalDirectoryMethod(param->xlog, do_sync);
+		stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
 	else
 		stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
 
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index b6f57a878c..6709206dda 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -34,6 +34,7 @@
 /* Global options */
 static char *basedir = NULL;
 static int	verbose = 0;
+static int	compresslevel = 0;
 static int	noloop = 0;
 static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
 static volatile bool time_to_abort = false;
@@ -57,6 +58,15 @@ static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
 	exit(code);									\
 	}
 
+/* Routines to evaluate segment file format */
+#define IsCompressXLogFileName(fname)    \
+	(strlen(fname) == XLOG_FNAME_LEN + strlen(".gz") &&	\
+	 strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&		\
+	 strcmp((fname) + XLOG_FNAME_LEN, ".gz") == 0)
+#define IsPartialCompressXLogFileName(fname)    \
+	(strlen(fname) == XLOG_FNAME_LEN + strlen(".gz.partial") &&	\
+	 strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&		\
+	 strcmp((fname) + XLOG_FNAME_LEN, ".gz.partial") == 0)
 
 static void
 usage(void)
@@ -75,6 +85,7 @@ usage(void)
 	printf(_("      --synchronous      flush transaction log immediately after writing\n"));
 	printf(_("  -v, --verbose          output verbose messages\n"));
 	printf(_("  -V, --version          output version information, then exit\n"));
+	printf(_("  -Z, --compress=0-9     compress logs with given compression level\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=CONNSTR   connection string\n"));
@@ -187,14 +198,31 @@ FindStreamingStart(uint32 *tli)
 		uint32		tli;
 		XLogSegNo	segno;
 		bool		ispartial;
+		bool		iscompress;
 
 		/*
 		 * Check if the filename looks like an xlog file, or a .partial file.
 		 */
 		if (IsXLogFileName(dirent->d_name))
+		{
 			ispartial = false;
+			iscompress = false;
+		}
 		else if (IsPartialXLogFileName(dirent->d_name))
+		{
+			ispartial = true;
+			iscompress = false;
+		}
+		else if (IsCompressXLogFileName(dirent->d_name))
+		{
+			ispartial = false;
+			iscompress = true;
+		}
+		else if (IsPartialCompressXLogFileName(dirent->d_name))
+		{
 			ispartial = true;
+			iscompress = true;
+		}
 		else
 			continue;
 
@@ -205,9 +233,14 @@ FindStreamingStart(uint32 *tli)
 
 		/*
 		 * Check that the segment has the right size, if it's supposed to be
-		 * completed.
+		 * completed.  For non-compressed segments just check the on-disk size
+		 * and see if it matches a completed segment.
+		 * For compressed segments, look at the last 4 bytes of the compressed
+		 * file, which is where the uncompressed size is located for gz files
+		 * with a size lower than 4GB, and then compare it to the size of a
+		 * completed segment.
 		 */
-		if (!ispartial)
+		if (!ispartial && !iscompress)
 		{
 			struct stat statbuf;
 			char		fullpath[MAXPGPATH];
@@ -228,6 +261,48 @@ FindStreamingStart(uint32 *tli)
 				continue;
 			}
 		}
+		else if (!ispartial && iscompress)
+		{
+			int		fd;
+			char	buf[4];
+			int		bytes_out;
+			char	fullpath[MAXPGPATH];
+
+			snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+
+			fd = open(fullpath, O_RDONLY | PG_BINARY);
+			if (fd < 0)
+			{
+				fprintf(stderr, _("%s: could not open compressed file \"%s\": %s\n"),
+						progname, fullpath, strerror(errno));
+				disconnect_and_exit(1);
+			}
+			if (lseek(fd, (off_t)(-4), SEEK_END) < 0)
+			{
+				fprintf(stderr, _("%s: could not seek compressed file \"%s\": %s\n"),
+						progname, fullpath, strerror(errno));
+				disconnect_and_exit(1);
+			}
+			if (read(fd, (char *) buf, sizeof(buf)) != sizeof(buf))
+			{
+				fprintf(stderr, _("%s: could not read compressed file \"%s\": %s\n"),
+						progname, fullpath, strerror(errno));
+				disconnect_and_exit(1);
+			}
+
+			close(fd);
+			bytes_out = (buf[3] << 24) | (buf[2] << 16) |
+						(buf[1] << 8) | buf[0];
+
+			if (bytes_out != XLOG_SEG_SIZE)
+			{
+				fprintf(stderr,
+						_("%s: %s segment file \"%s\" has incorrect size %d, skipping\n"),
+						progname, iscompress ? "compressed" : "",
+						dirent->d_name, bytes_out);
+				continue;
+			}
+		}
 
 		/* Looks like a valid segment. Remember that we saw it. */
 		if ((segno > high_segno) ||
@@ -338,7 +413,8 @@ StreamLog(void)
 	stream.synchronous = synchronous;
 	stream.do_sync = true;
 	stream.mark_done = false;
-	stream.walmethod = CreateWalDirectoryMethod(basedir, stream.do_sync);
+	stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel,
+												stream.do_sync);
 	stream.partial_suffix = ".partial";
 
 	ReceiveXlogStream(conn, &stream);
@@ -389,6 +465,7 @@ main(int argc, char **argv)
 		{"status-interval", required_argument, NULL, 's'},
 		{"slot", required_argument, NULL, 'S'},
 		{"verbose", no_argument, NULL, 'v'},
+		{"compress", required_argument, NULL, 'Z'},
 /* action */
 		{"create-slot", no_argument, NULL, 1},
 		{"drop-slot", no_argument, NULL, 2},
@@ -419,7 +496,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWv",
+	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWvZ:",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -469,6 +546,15 @@ main(int argc, char **argv)
 			case 'v':
 				verbose++;
 				break;
+			case 'Z':
+				compresslevel = atoi(optarg);
+				if (compresslevel < 0 || compresslevel > 9)
+				{
+					fprintf(stderr, _("%s: invalid compression level \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				break;
 /* action */
 			case 1:
 				do_create_slot = true;
@@ -535,6 +621,16 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+#ifndef HAVE_LIBZ
+	if (compresslevel != 0)
+	{
+		fprintf(stderr,
+				_("%s: this build does not support compression\n"),
+				progname);
+		exit(1);
+	}
+#endif
+
 	/*
 	 * Check existence of destination folder.
 	 */
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
index 88ee603b8b..ebf8a72cf4 100644
--- a/src/bin/pg_basebackup/walmethods.c
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -41,6 +41,7 @@
 typedef struct DirectoryMethodData
 {
 	char	   *basedir;
+	int			compression;
 	bool		sync;
 }	DirectoryMethodData;
 static DirectoryMethodData *dir_data = NULL;
@@ -55,6 +56,9 @@ typedef struct DirectoryMethodFile
 	char	   *pathname;
 	char	   *fullpath;
 	char	   *temp_suffix;
+#ifdef HAVE_LIBZ
+	gzFile		gzfp;
+#endif
 }	DirectoryMethodFile;
 
 static char *
@@ -70,17 +74,47 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 	static char tmppath[MAXPGPATH];
 	int			fd;
 	DirectoryMethodFile *f;
+#ifdef HAVE_LIBZ
+	gzFile		gzfp;
+#endif
 
-	snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-			 dir_data->basedir, pathname, temp_suffix ? temp_suffix : "");
+	snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+			 dir_data->basedir, pathname,
+			 dir_data->compression > 0 ? ".gz" : "",
+			 temp_suffix ? temp_suffix : "");
 
+	/*
+	 * Open a file for non-compressed as well as compressed files. Tracking
+	 * the file descriptor is important for dir_sync() method as gzflush()
+	 * does not do any system calls to fsync() to make changes permanent on
+	 * disk.
+	 */
 	fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
 	if (fd < 0)
 		return NULL;
 
-	if (pad_to_size)
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+	{
+		gzfp = gzdopen(fd, "wb");
+		if (gzfp == NULL)
+		{
+			close(fd);
+			return NULL;
+		}
+
+		if (gzsetparams(gzfp, dir_data->compression,
+						Z_DEFAULT_STRATEGY) != Z_OK)
+		{
+			gzclose(gzfp);
+			return NULL;
+		}
+	}
+#endif
+
+	/* Do pre-padding on non-compressed files */
+	if (pad_to_size && dir_data->compression == 0)
 	{
-		/* Always pre-pad on regular files */
 		char	   *zerobuf;
 		int			bytes;
 
@@ -120,12 +154,21 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 		if (fsync_fname(tmppath, false, progname) != 0 ||
 			fsync_parent_path(tmppath, progname) != 0)
 		{
-			close(fd);
+#ifdef HAVE_LIBZ
+			if (dir_data->compression > 0)
+				gzclose(gzfp);
+			else
+#endif
+				close(fd);
 			return NULL;
 		}
 	}
 
 	f = pg_malloc0(sizeof(DirectoryMethodFile));
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		f->gzfp = gzfp;
+#endif
 	f->fd = fd;
 	f->currpos = 0;
 	f->pathname = pg_strdup(pathname);
@@ -144,7 +187,12 @@ dir_write(Walfile f, const void *buf, size_t count)
 
 	Assert(f != NULL);
 
-	r = write(df->fd, buf, count);
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		r = (ssize_t) gzwrite(df->gzfp, buf, count);
+	else
+#endif
+		r = write(df->fd, buf, count);
 	if (r > 0)
 		df->currpos += r;
 	return r;
@@ -169,7 +217,12 @@ dir_close(Walfile f, WalCloseMethod method)
 
 	Assert(f != NULL);
 
-	r = close(df->fd);
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		r = gzclose(df->gzfp);
+	else
+#endif
+		r = close(df->fd);
 
 	if (r == 0)
 	{
@@ -180,17 +233,22 @@ dir_close(Walfile f, WalCloseMethod method)
 			 * If we have a temp prefix, normal operation is to rename the
 			 * file.
 			 */
-			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-					 dir_data->basedir, df->pathname, df->temp_suffix);
-			snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
-					 dir_data->basedir, df->pathname);
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "",
+					 df->temp_suffix);
+			snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "");
 			r = durable_rename(tmppath, tmppath2, progname);
 		}
 		else if (method == CLOSE_UNLINK)
 		{
 			/* Unlink the file once it's closed */
-			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-					 dir_data->basedir, df->pathname, df->temp_suffix ? df->temp_suffix : "");
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "",
+					 df->temp_suffix ? df->temp_suffix : "");
 			r = unlink(tmppath);
 		}
 		else
@@ -226,6 +284,14 @@ dir_sync(Walfile f)
 	if (!dir_data->sync)
 		return 0;
 
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+	{
+		if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
+			return -1;
+	}
+#endif
+
 	return fsync(((DirectoryMethodFile *) f)->fd);
 }
 
@@ -277,7 +343,7 @@ dir_finish(void)
 
 
 WalWriteMethod *
-CreateWalDirectoryMethod(const char *basedir, bool sync)
+CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
 {
 	WalWriteMethod *method;
 
@@ -293,6 +359,7 @@ CreateWalDirectoryMethod(const char *basedir, bool sync)
 	method->getlasterror = dir_getlasterror;
 
 	dir_data = pg_malloc0(sizeof(DirectoryMethodData));
+	dir_data->compression = compression;
 	dir_data->basedir = pg_strdup(basedir);
 	dir_data->sync = sync;
 
diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h
index c1723d53b5..2cd8b6d755 100644
--- a/src/bin/pg_basebackup/walmethods.h
+++ b/src/bin/pg_basebackup/walmethods.h
@@ -41,7 +41,8 @@ struct WalWriteMethod
  *						   (only implements the methods required for pg_basebackup,
  *						   not all those required for pg_receivexlog)
  */
-WalWriteMethod *CreateWalDirectoryMethod(const char *basedir, bool sync);
+WalWriteMethod *CreateWalDirectoryMethod(const char *basedir,
+										 int compression, bool sync);
 WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync);
 
 /* Cleanup routines for previously-created methods */
#17Magnus Hagander
magnus@hagander.net
In reply to: Michael Paquier (#16)
Re: Support for pg_receivexlog --format=plain|tar

On Sun, Jan 15, 2017 at 6:44 AM, Michael Paquier <michael.paquier@gmail.com>
wrote:

On Sun, Jan 15, 2017 at 1:31 AM, Magnus Hagander <magnus@hagander.net>
wrote:

On Tue, Jan 10, 2017 at 3:01 AM, Michael Paquier <michael.paquier@gmail.com> wrote:

Based on some analysis, it is enough to look at the last 4 bytes of
the compressed file to get the size of the output data with a single call
to lseek() and then read(). As there is a simple way to do things that
is far cheaper than decompressing perhaps hundreds of segments, I'd
rather do it this way. Attached is the implementation. This code is
using 2 booleans for 4 states of the file names: full non-compressed,
partial non-compressed, full compressed and partial compressed. This
keeps the last check of FindStreamingStart() simpler, but it's
quite fancy lately to have an enum for such things :D
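
For illustration, the four file-name states mentioned above could be expressed as an enum along these lines. This is a minimal sketch, not the code from the patch: `SegFileKind`, `has_suffix` and `classify_segment_name` are hypothetical names, and the suffix order (`.gz` before `.partial`) is taken from the snprintf() calls in the walmethods.c hunk. It only looks at suffixes and does not validate the segment name itself.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical enum covering the four states of a segment file name. */
typedef enum
{
	SEG_FULL_PLAIN,			/* e.g. 000000010000000000000001 */
	SEG_PARTIAL_PLAIN,		/* ...  .partial */
	SEG_FULL_GZ,			/* ...  .gz */
	SEG_PARTIAL_GZ			/* ...  .gz.partial */
} SegFileKind;

/* Return 1 if name ends with suffix, 0 otherwise. */
static int
has_suffix(const char *name, const char *suffix)
{
	size_t		nlen = strlen(name);
	size_t		slen = strlen(suffix);

	return nlen >= slen && strcmp(name + nlen - slen, suffix) == 0;
}

/* Classify by suffix only; the longest suffix is tested first. */
static SegFileKind
classify_segment_name(const char *name)
{
	assert(name != NULL);

	if (has_suffix(name, ".gz.partial"))
		return SEG_PARTIAL_GZ;
	if (has_suffix(name, ".gz"))
		return SEG_FULL_GZ;
	if (has_suffix(name, ".partial"))
		return SEG_PARTIAL_PLAIN;
	return SEG_FULL_PLAIN;
}
```

With something like this, the final check in FindStreamingStart() could switch on a single value instead of combining two booleans.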

I think you need to document that analysis at least in a code comment. I
assume this is in reference to the ISIZE member of the gzip format?

Good question. I am not sure on this one.

Where did your research point to then? :) I just read the gzip rfc (
http://www.zlib.org/rfc-gzip.html) which seems to call it that at least?
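
For reference, the gzip RFC describes ISIZE as the size of the original input modulo 2^32, stored little-endian in the last 4 bytes of the file. A minimal standalone sketch of reading it follows; it is not the code from the patch, and `gzip_isize_decode` and `gzip_read_isize` are hypothetical names. It uses stdio rather than open()/lseek()/read(), and it holds the bytes in `unsigned char` so that values of 0x80 and above are not sign-extended when shifted, which can happen with plain `char` on platforms where `char` is signed.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Decode the little-endian ISIZE field from the 4 trailing bytes. */
static uint32_t
gzip_isize_decode(const unsigned char tail[4])
{
	assert(tail != NULL);

	return ((uint32_t) tail[3] << 24) | ((uint32_t) tail[2] << 16) |
		((uint32_t) tail[1] << 8) | (uint32_t) tail[0];
}

/*
 * Read ISIZE from the end of the file at "path".
 * Returns 0 on success and stores the value in *isize, -1 on failure.
 */
static int
gzip_read_isize(const char *path, uint32_t *isize)
{
	unsigned char tail[4];
	FILE	   *fp;
	int			ok;

	fp = fopen(path, "rb");
	if (fp == NULL)
		return -1;

	/* Fails if the file is shorter than 4 bytes. */
	ok = fseek(fp, -4L, SEEK_END) == 0 &&
		fread(tail, 1, sizeof(tail), fp) == sizeof(tail);
	fclose(fp);

	if (!ok)
		return -1;

	*isize = gzip_isize_decode(tail);
	return 0;
}
```

A caller would compare the result against the expected segment size and skip the file on mismatch, so random trailing bytes in a corrupt file are rejected unless they happen to encode exactly that size.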

I was going to say what happens if the file is corrupt and we get random
junk data there, but as we only compare it to XlogSegSize that should be
safe. But we might want to put a note in about that too?

Perhaps. I have made a better try in FindStreamingStart.

Looks much better now. I think that one is fine as it is now.

Finally, I think we should make the error message clearly say "compressed
segment file" - just to make things extra clear.

Sure.

AFAICT the
+ iscompress ? "compressed" : "",
part of the error handling is unnecessary, because iscompress will always
be true in that block. All the other error messages in that codepath have
compressed hardcoded in them, as should this one.

I found another problem with it -- it is completely broken in sync mode.
You need to either forbid sync mode together with compression, or teach
dir_sync() about it. The latter would sound better, but I wonder how much
that's going to kill compression due to the small blocks? Is it a
reasonable use-case?

Hm. Looking at the docs I think that --compress defined with
--synchronous maps to the use of Z_SYNC_FLUSH with gzflush(). FWIW I
don't have a direct use case for it, but it is not a major effort to
add it, so done. There is no actual reason to forbid this combination
of options either.

Is it enough to just gzflush()? Don't we also need to still fsync()? I
can't see any documentation that gzflush() does that, and a quick look at
the code of zlib indicates it doesn't (but I didn't check in detail, so if
you did please point me to it).

Hm. It looks that you are right. zlib goes down to _tr_flush_bits() to
flush some output, but this finishes only with put_byte(). As the fd
is opaque in gzFile, it would be just better to open() the file first,
and then use gzdopen to get the gzFile. Let's use as well the existing
fd field to save it. gzclose() closes as well the parent fd per the
documentation of zlib.

This version throws a warning:
walmethods.c: In function ‘dir_open_for_write’:
walmethods.c:170:11: warning: ‘gzfp’ may be used uninitialized in this
function [-Wmaybe-uninitialized]
f->gzfp = gzfp;

I can't see that there is any code path where this can actually happen
though, so we should probably just initialize it to NULL at variable
declaration. Or do you see a path where this could actually be incorrect?
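
As an illustration of why a compiler might emit this warning: the variable is assigned under one check of a value read from memory and used under a second, separate check of the same value, and the compiler may not be able to prove that both checks agree. The sketch below reproduces that shape with the proposed fix applied. `Settings`, `Handle` and `open_handle` are hypothetical stand-ins, not names from the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for dir_data and the per-file struct. */
typedef struct
{
	int			compression;
} Settings;

typedef struct
{
	void	   *gz;
	int			fd;
} Handle;

static Settings settings = {0};

static void
open_handle(Handle *h, int fd, void *gz_backend)
{
	void	   *gz = NULL;		/* initializing here is the proposed fix */

	assert(h != NULL);

	/* First check: gz is assigned only when compression is enabled. */
	if (settings.compression > 0)
		gz = gz_backend;

	h->gz = NULL;
	h->fd = fd;

	/* Second check of the same condition: gz is read only here. */
	if (settings.compression > 0)
		h->gz = gz;
}
```

Initializing at declaration costs nothing at runtime in practice and keeps the code warning-free across compiler versions.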

If you agree with those two comments, I will go ahead and push with those
minor fixes.

--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/

#18Michael Paquier
michael.paquier@gmail.com
In reply to: Magnus Hagander (#17)
Re: Support for pg_receivexlog --format=plain|tar

On Mon, Jan 16, 2017 at 9:12 PM, Magnus Hagander <magnus@hagander.net> wrote:

Where did your research point to then? :) I just read the gzip rfc
(http://www.zlib.org/rfc-gzip.html) which seems to call it that at least?

Well, OK. I was not aware of this RFC. I guessed it by looking at the
code of gzip, which uses the CRC as well. I also found a reference
in a blog post.

Finally, I think we should make the error message clearly say
"compressed
segment file" - just to make things extra clear.

Sure.

AFAICT the
+ iscompress ? "compressed" : "",
part of the error handling is unnecessary, because iscompress will always
be true in that block. All the other error messages in that codepath have
compressed hardcoded in them, as should this one.

Fat-fingered here..

Hm. It looks that you are right. zlib goes down to _tr_flush_bits() to
flush some output, but this finishes only with put_byte(). As the fd
is opaque in gzFile, it would be just better to open() the file first,
and then use gzdopen to get the gzFile. Let's use as well the existing
fd field to save it. gzclose() closes as well the parent fd per the
documentation of zlib.

This version throws a warning:
walmethods.c: In function ‘dir_open_for_write’:
walmethods.c:170:11: warning: ‘gzfp’ may be used uninitialized in this
function [-Wmaybe-uninitialized]
f->gzfp = gzfp;

gcc and clang did not complain here, what did you use?

I can't see that there is any code path where this can actually happen
though, so we should probably just initialize it to NULL at variable
declaration. Or do you see a path where this could actually be incorrect?

Not that I see. All the code paths using gzfp are under
dir_data->compression > 0.

If you agree with those two comments, I will go ahead and push with those
minor fixes.

No problem for me, thanks for the review!
--
Michael

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#19Magnus Hagander
magnus@hagander.net
In reply to: Michael Paquier (#18)
Re: Support for pg_receivexlog --format=plain|tar

On Mon, Jan 16, 2017 at 1:46 PM, Michael Paquier <michael.paquier@gmail.com>
wrote:

On Mon, Jan 16, 2017 at 9:12 PM, Magnus Hagander <magnus@hagander.net>
wrote:

Where did your research point to then? :) I just read the gzip rfc
(http://www.zlib.org/rfc-gzip.html) which seems to call it that at least?

Well, OK. I was not aware of this RFC. I guessed it by looking at the
code of gzip, which uses the CRC as well. I also found a reference
in a blog post.

Haha, ok. That was my first google hit, but I guess I luckily hit a better
search keyword.

I'll add a reference to the comment about it before commit.

Finally, I think we should make the error message clearly say
"compressed
segment file" - just to make things extra clear.

Sure.

AFAICT the
+ iscompress ? "compressed" : "",
part of the error handling is unnecessary, because iscompress will always
be true in that block. All the other error messages in that codepath have
compressed hardcoded in them, as should this one.

Fat-fingered here..

Hm. It looks that you are right. zlib goes down to _tr_flush_bits() to
flush some output, but this finishes only with put_byte(). As the fd
is opaque in gzFile, it would be just better to open() the file first,
and then use gzdopen to get the gzFile. Let's use as well the existing
fd field to save it. gzclose() closes as well the parent fd per the
documentation of zlib.

This version throws a warning:
walmethods.c: In function ‘dir_open_for_write’:
walmethods.c:170:11: warning: ‘gzfp’ may be used uninitialized in this
function [-Wmaybe-uninitialized]
f->gzfp = gzfp;

gcc and clang did not complain here, what did you use?

gcc (Debian 4.9.2-10) 4.9.2

I can't see that there is any code path where this can actually happen
though, so we should probably just initialize it to NULL at variable
declaration. Or do you see a path where this could actually be incorrect?

Not that I see. All the code paths using gzfp are under
dir_data->compression > 0.

If you agree with those two comments, I will go ahead and push with those
minor fixes.

No problem for me, thanks for the review!

Committed.

--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/

#20Michael Paquier
michael.paquier@gmail.com
In reply to: Magnus Hagander (#19)
Re: Support for pg_receivexlog --format=plain|tar

On Tue, Jan 17, 2017 at 8:11 PM, Magnus Hagander <magnus@hagander.net> wrote:

Committed.

Yeah, thanks!
--
Michael
