Adding pg_dump flag for parallel export to pipes
Hi Hackers,
We are proposing the ability to specify a pipe command to pg_dump by a
flag. And attaching the patch set.
Why : Currently it is quite simple to pipe the output of pg_dump for
text format to a pipe at command line and do any manipulations
necessary. Following is an example :
pg_dump <flags> <dbname> | lz4 | pv -L 10k | ssh remote.host
"cat - > remote.dump.lz4"
Here we first compress the stream using lz4 and then send it over ssh
to a remote host to be saved as a file while rate-limiting the network
usage to 10KB/s.
Something like this is not possible for format=directory (-Fd) since
all you can provide is the directory name to store the individual
files. Note it is not possible to do this irrespective of the usage of
the parallel dump option ('--jobs' flag).
While the directory format supports compression using a flag, the rest
of the operations in the above example are not possible. And a pipe
command provides more flexibility in what compression algorithm one
wants to use.
This patch set provides pg_dump the ability to pipe the data in the
directory mode by using a new flag '--pipe-command' (in both parallel
and non-parallel mode).
We also add a similar option to pg_restore.
The following can be the major use cases of these changes :
1. Stream pg_dump output to a cloud storage
2. SSH the data to a remote host (with or without throttling)
3. Custom compression options
Usage Examples : Here is an example of how the pipe-command will look like.
pg_dump -Fd mydb --pipe-command="cat > dumpdir/%f" (dumpdir
should exist beforehand.)
This is equivalent to
pg_dump -Fd mydb --file=dumpdir
(Please note that the flags '--file' or '--pipe-command' can't be used
together.)
For the more complex scenario as mentioned above, the command will be
(with the parallelism of 5) :
pg_dump -Fd mydb -j 5 --pipe-command="lz4 | pv -L 10k | ssh
remote.host "cat > dumpdir/%f""
Please note the use of %f in the above examples. As a user would
almost always want to write the post-processing output to a file (or
perhaps a cloud location), we provide a format specifier %f in the
command. The implementation of pipe-command replaces these format
specifiers with the corresponding file names. These file names are the
same as they would be in the current usage of directory format with
'--file' flag (<dump_id>.dat, toc.dat, blob_NNN.toc,
blob_<blob_id>.dat).
The usage of this flag with pg_restore will also be similar. Here is
an example of restoring from a gzip compressed dump directory.
pg_restore -C -Fd -d postgres --pipe-commnad="cat
dumpdir/%f.gz | gunzip"
The new flag in pg_restore also works with '-l' and '-L' options
pg_restore -C -Fd -d postgres --pipe-commnad="cat dumpdir/%f" -L db.list
Implementation Details : Here are the major changes :
1. We reuse the same variables which store the file name to store
the pipe command. And add a new bool fSpecIsPipe in _archiveHandle
(similar bools in pg_dump.c and pg_restore.c) to specify if it's a
pipe command.
2. In the cases when the above bool is set to true, we use popen
and pclose instead of fopen and fclose.
3. To enable the format specifier %f in the pipe-command, we make
changes to the file name creation logic in a few places. Currently the
file name (corresponding to a table or large object) is appended to
the directory name provided by '--file' command. In case of
'--pipe-command', we use 'replace_percent_placeholders' to replace %f
with the corresponding file name. This change is made for both table
files and LO TOC files.
With these core changes, the rest of the code continues working as-is.
We are attaching 4 patches for this change :
001-pg_dump_pipe has the pg_dump pipe support code.
002-pg_restore_pipe has the pg_restore pipe support.
003-pg_dump_basic_tests has a few basic validation tests for
correctmflag combinations. We need to write more automated tests in
002_pg_dump.pl but have been running into some issues with environment
setup due to which certain pipe commands result in the shell process
becoming defunct. These same commands are working fine in manual
testing. We are still looking into this.
004-pg_dump_documentation has the proposed documentation changes.
We are working on the above test issues and cleanup of the patches.
Open Questions : There are a couple of open questions in the implementation :
1. Currently the LO TOC file (blob_NNN.toc) is opened in the
append mode. This is not possible with popen for the pipe command.
From reading the code, it seems to us that this file doesn't need to
be opened in the append mode. As '_StartLOs' is called once per
archive entry in WriteDataChunksForToCEntry followed by the dumper
function and then '_EndLOs', it should be okay to change this to 'w'
mode. But this code has been there since the start so we haven't made
that change yet. In the patch, we have changed it to 'w' pipe-command
only and added the ideas for potential solutions in the comments.
2. We are also not sure yet on how to handle the environment
issues when trying to add new tests to 002_pg_dump.pl.
Please let us know what you think.
Thanks & Regards,
Nitin Motiani
Google
Attachments:
002-pg_restore_pipe_v4.patchapplication/octet-stream; name=002-pg_restore_pipe_v4.patchDownload
commit 79c2fee3856e9c2263658a59b6a27caca95011e0
Author: Nitin Motiani <nitinmotiani@google.com>
Date: Sat Feb 15 08:05:25 2025 +0000
Add pipe-command support in pg_restore
* This is same as the pg_dump change. We add support
for --pipe-command in directory archive format. This can be used
to read from multiple streams and do pre-processing (decompression
with a custom algorithm, filtering etc) before restore.
Currently that is not possible because the pg_dump output of
directory format can't just be piped.
* Like pg_dump, here also either filename or --pipe-command can be
set. If neither are set, the standard input is used as before.
* This is only supported with compression none and archive format
directory.
* We reuse the inputFileSpec field for the pipe-command. And add
a bool to specify if it is a pipe.
* The changes made for pg_dump to handle the pipe case with popen
and pclose also work here.
* The logic of %f format specifier to read from the pg_dump output
is the same too. Most of the code from the pg_dump commit works.
We add similar logic to the function to read large objects.
* The --pipe command works -l and -L option.
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 334b3208783..b58708cf8b0 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -259,26 +259,32 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode,
Assert(strcmp(mode, PG_BINARY_R) == 0);
fname = pg_strdup(path);
-
- if (hasSuffix(fname, ".gz"))
- compression_spec.algorithm = PG_COMPRESSION_GZIP;
- else if (hasSuffix(fname, ".lz4"))
- compression_spec.algorithm = PG_COMPRESSION_LZ4;
- else if (hasSuffix(fname, ".zst"))
- compression_spec.algorithm = PG_COMPRESSION_ZSTD;
- else
+ /*
+ If the path is a pipe command, the compression algorithm
+ is none.
+ */
+ if (!path_is_pipe_command)
{
- if (stat(path, &st) == 0)
- compression_spec.algorithm = PG_COMPRESSION_NONE;
- else if (check_compressed_file(path, &fname, "gz"))
+ if (hasSuffix(fname, ".gz"))
compression_spec.algorithm = PG_COMPRESSION_GZIP;
- else if (check_compressed_file(path, &fname, "lz4"))
+ else if (hasSuffix(fname, ".lz4"))
compression_spec.algorithm = PG_COMPRESSION_LZ4;
- else if (check_compressed_file(path, &fname, "zst"))
+ else if (hasSuffix(fname, ".zst"))
compression_spec.algorithm = PG_COMPRESSION_ZSTD;
+ else
+ {
+ if (stat(path, &st) == 0)
+ compression_spec.algorithm = PG_COMPRESSION_NONE;
+ else if (check_compressed_file(path, &fname, "gz"))
+ compression_spec.algorithm = PG_COMPRESSION_GZIP;
+ else if (check_compressed_file(path, &fname, "lz4"))
+ compression_spec.algorithm = PG_COMPRESSION_LZ4;
+ else if (check_compressed_file(path, &fname, "zst"))
+ compression_spec.algorithm = PG_COMPRESSION_ZSTD;
+ }
}
- CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command); /* TODO: try to make it work also with pipes */
+ CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command);
if (!CFH->open_func(fname, -1, mode, CFH))
{
free_keep_errno(CFH);
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 193e9825a16..936b4a194a5 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -471,7 +471,18 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
tocfname, line);
StartRestoreLO(AH, oid, AH->public.ropt->dropSchema);
- snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname);
+ /* TODO: This logic for naming blob files is common betwen _LoadLOs an _StartLO.
+ * Refactor in a helper function.
+ */
+ if (AH->fSpecIsPipe)
+ {
+ pipe = replace_percent_placeholders(ctx->directory, "pipe-command", "f", lofname);
+ strcpy(path, pipe);
+ }
+ else
+ {
+ snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname);
+ }
_PrintFileData(AH, path);
EndRestoreLO(AH, oid);
}
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index 9b757dac568..f9fe72bde6a 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -59,7 +59,7 @@ static void usage(const char *progname);
static void read_restore_filters(const char *filename, RestoreOptions *opts);
static bool file_exists_in_directory(const char *dir, const char *filename);
static int restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
- int numWorkers, bool append_data, int num);
+ int numWorkers, bool append_data, int num, bool filespec_is_pipe);
static int read_one_statement(StringInfo inBuf, FILE *pfile);
static int restore_all_databases(PGconn *conn, const char *dumpdirpath,
SimpleStringList db_exclude_patterns, RestoreOptions *opts, int numWorkers);
@@ -84,6 +84,7 @@ main(int argc, char **argv)
int n_errors = 0;
bool globals_only = false;
SimpleStringList db_exclude_patterns = {NULL, NULL};
+ bool filespec_is_pipe = false;
static int disable_triggers = 0;
static int enable_row_security = 0;
static int if_exists = 0;
@@ -165,6 +166,7 @@ main(int argc, char **argv)
{"statistics-only", no_argument, &statistics_only, 1},
{"filter", required_argument, NULL, 4},
{"exclude-database", required_argument, NULL, 6},
+ {"pipe-command", required_argument, NULL, 7},
{NULL, 0, NULL, 0}
};
@@ -346,6 +348,11 @@ main(int argc, char **argv)
simple_string_list_append(&db_exclude_patterns, optarg);
break;
+ case 7: /* pipe-command */
+ inputFileSpec = pg_strdup(optarg);
+ filespec_is_pipe = true;
+ break;
+
default:
/* getopt_long already emitted a complaint */
pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -353,11 +360,23 @@ main(int argc, char **argv)
}
}
- /* Get file name from command line */
+ /* Get file name from command line. Note that filename argument and pipe-command can't both be set. */
if (optind < argc)
+ {
+ if (filespec_is_pipe)
+ {
+ pg_log_error_hint("Only one of [filespec, --pipe-command] allowed");
+ exit_nicely(1);
+ }
inputFileSpec = argv[optind++];
- else
+ }
+ /* Even if the file argument is not provided, if the pipe-command is specified, we need to use that
+ * as the file arg and not fallback to stdio.
+ */
+ else if (!filespec_is_pipe)
+ {
inputFileSpec = NULL;
+ }
/* Complain if any arguments remain */
if (optind < argc)
@@ -567,7 +586,7 @@ main(int argc, char **argv)
if (globals_only)
pg_fatal("option -g/--globals-only can be used only when restoring an archive created by pg_dumpall");
- n_errors = restore_one_database(inputFileSpec, opts, numWorkers, false, 0);
+ n_errors = restore_one_database(inputFileSpec, opts, numWorkers, false, 0, filespec_is_pipe);
}
/* Done, print a summary of ignored errors during restore. */
@@ -589,12 +608,18 @@ main(int argc, char **argv)
*/
static int
restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
- int numWorkers, bool append_data, int num)
+ int numWorkers, bool append_data, int num, bool filespec_is_pipe)
{
Archive *AH;
int n_errors;
- AH = OpenArchive(inputFileSpec, opts->format, false); /*TODO: support pipes in restore */
+ if (filespec_is_pipe && opts->format != archDirectory)
+ {
+ pg_log_error_hint("Option --pipe-command is only supported with directory format.");
+ exit_nicely(1);
+ }
+
+ AH = OpenArchive(inputFileSpec, opts->format, filespec_is_pipe);
SetArchiveOptions(AH, NULL, opts);
@@ -1241,7 +1266,7 @@ restore_all_databases(PGconn *conn, const char *dumpdirpath,
opts->dumpStatistics = dumpStatistics;
/* Restore the single database. */
- n_errors = restore_one_database(subdirpath, opts, numWorkers, true, count);
+ n_errors = restore_one_database(subdirpath, opts, numWorkers, true, count, false);
/* Print a summary of ignored errors during single database restore. */
if (n_errors)
001-pg_dump_pipe_v4.patchapplication/octet-stream; name=001-pg_dump_pipe_v4.patchDownload
commit 6c64cd2f3259a2f04f9deff61b7ddfebe91ba1df
Author: Nitin Motiani <nitinmotiani@google.com>
Date: Tue Feb 11 08:31:02 2025 +0000
Add pipe-command support for directory mode of pg_dump
* We add a new flag --pipe-command which can be used in directory
mode. This allows us to support multiple streams and we can
do post processing like compression, filtering etc. This is
currently not possible with directory-archive format.
* Currently this flag is only supported with compression none
and archive format directory.
* This flag can't be used with the flag --file. Only one of the
two flags can be used at a time.
* We reuse the filename field for the --pipe-command also. And add a
bool to specify that the field will be used as a pipe command.
* Most of the code remains as it is. The core change is that
in case of --pipe-command, instead of fopen we do popen.
* The user would need a way to store the post-processing output
in files. For that we support the same format as the directory
mode currently does with the flag --file. We allow the user
to add a format specifier %f to the --pipe-command. And for each
stream, the format specifier is replaced with the corresponding
file name. This file name is the same as it would have been if
the flag --file had been used.
* To enable the above, there are a few places in the code where
we change the file name creation logic. Currently the file name
is appended to the directory name which is provided with --file flag.
In case of --pipe-command, we instead replace %f with the file name.
This change is made for the common use case and separately for
blob files.
* There is an open question on what mode to use in case of large objects
TOC file. Currently the code uses "ab" but that won't work for popen.
We have proposed a few options in the comments regarding this. For the
time being we are using mode PG_BINARY_W for the pipe use case.
diff --git a/src/bin/pg_dump/compress_gzip.c b/src/bin/pg_dump/compress_gzip.c
index 23f617209e6..92127e87889 100644
--- a/src/bin/pg_dump/compress_gzip.c
+++ b/src/bin/pg_dump/compress_gzip.c
@@ -389,8 +389,12 @@ Gzip_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
void
InitCompressFileHandleGzip(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec)
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
+ if(path_is_pipe_command)
+ pg_fatal("cPipe command not supported for Gzip");
+
CFH->open_func = Gzip_open;
CFH->open_write_func = Gzip_open_write;
CFH->read_func = Gzip_read;
@@ -415,7 +419,8 @@ InitCompressorGzip(CompressorState *cs,
void
InitCompressFileHandleGzip(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec)
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
pg_fatal("this build does not support compression with %s", "gzip");
}
diff --git a/src/bin/pg_dump/compress_gzip.h b/src/bin/pg_dump/compress_gzip.h
index 3bef0d5b1b8..ccf2479cf3b 100644
--- a/src/bin/pg_dump/compress_gzip.h
+++ b/src/bin/pg_dump/compress_gzip.h
@@ -19,6 +19,7 @@
extern void InitCompressorGzip(CompressorState *cs,
const pg_compress_specification compression_spec);
extern void InitCompressFileHandleGzip(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec);
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command);
#endif /* _COMPRESS_GZIP_H_ */
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 8c3d9c911c4..334b3208783 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -191,20 +191,29 @@ free_keep_errno(void *p)
* Initialize a compress file handle for the specified compression algorithm.
*/
CompressFileHandle *
-InitCompressFileHandle(const pg_compress_specification compression_spec)
+InitCompressFileHandle(const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
CompressFileHandle *CFH;
+
CFH = pg_malloc0(sizeof(CompressFileHandle));
- if (compression_spec.algorithm == PG_COMPRESSION_NONE)
- InitCompressFileHandleNone(CFH, compression_spec);
+ /* TODO: Currently always set to non-compressed when path_is_pipe_command
+ * assuming that external compressor as part of pipe is nore efficient
+ * should review after POC
+ */
+ if (path_is_pipe_command)
+ InitCompressFileHandleNone(CFH, compression_spec, path_is_pipe_command);
+
+ else if (compression_spec.algorithm == PG_COMPRESSION_NONE)
+ InitCompressFileHandleNone(CFH, compression_spec, path_is_pipe_command);
else if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
- InitCompressFileHandleGzip(CFH, compression_spec);
+ InitCompressFileHandleGzip(CFH, compression_spec, path_is_pipe_command);
else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
- InitCompressFileHandleLZ4(CFH, compression_spec);
+ InitCompressFileHandleLZ4(CFH, compression_spec, path_is_pipe_command);
else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
- InitCompressFileHandleZstd(CFH, compression_spec);
+ InitCompressFileHandleZstd(CFH, compression_spec, path_is_pipe_command);
return CFH;
}
@@ -237,7 +246,8 @@ check_compressed_file(const char *path, char **fname, char *ext)
* On failure, return NULL with an error code in errno.
*/
CompressFileHandle *
-InitDiscoverCompressFileHandle(const char *path, const char *mode)
+InitDiscoverCompressFileHandle(const char *path, const char *mode,
+ bool path_is_pipe_command)
{
CompressFileHandle *CFH = NULL;
struct stat st;
@@ -268,7 +278,7 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
compression_spec.algorithm = PG_COMPRESSION_ZSTD;
}
- CFH = InitCompressFileHandle(compression_spec);
+ CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command); /* TODO: try to make it work also with pipes */
if (!CFH->open_func(fname, -1, mode, CFH))
{
free_keep_errno(CFH);
diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index db9b38744c8..d3b795d7e94 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -185,6 +185,11 @@ struct CompressFileHandle
*/
pg_compress_specification compression_spec;
+ /*
+ * Compression specification for this file handle.
+ */
+ bool path_is_pipe_command;
+
/*
* Private data to be used by the compressor.
*/
@@ -194,7 +199,8 @@ struct CompressFileHandle
/*
* Initialize a compress file handle with the requested compression.
*/
-extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specification compression_spec);
+extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specification compression_spec,
+ bool path_is_pipe_command);
/*
* Initialize a compress file stream. Infer the compression algorithm
@@ -202,6 +208,7 @@ extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specificatio
* suffixes in 'path'.
*/
extern CompressFileHandle *InitDiscoverCompressFileHandle(const char *path,
- const char *mode);
+ const char *mode,
+ bool path_is_pipe_command);
extern bool EndCompressFileHandle(CompressFileHandle *CFH);
#endif
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index e99f0cad71f..7e7713936bf 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -751,10 +751,14 @@ LZ4Stream_open_write(const char *path, const char *mode, CompressFileHandle *CFH
*/
void
InitCompressFileHandleLZ4(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec)
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
LZ4State *state;
+ if(path_is_pipe_command)
+ pg_fatal("cPipe command not supported for LZ4");
+
CFH->open_func = LZ4Stream_open;
CFH->open_write_func = LZ4Stream_open_write;
CFH->read_func = LZ4Stream_read;
@@ -770,6 +774,8 @@ InitCompressFileHandleLZ4(CompressFileHandle *CFH,
if (CFH->compression_spec.level >= 0)
state->prefs.compressionLevel = CFH->compression_spec.level;
+ CFH->path_is_pipe_command = path_is_pipe_command;
+
CFH->private_data = state;
}
#else /* USE_LZ4 */
@@ -782,7 +788,8 @@ InitCompressorLZ4(CompressorState *cs,
void
InitCompressFileHandleLZ4(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec)
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
pg_fatal("this build does not support compression with %s", "LZ4");
}
diff --git a/src/bin/pg_dump/compress_lz4.h b/src/bin/pg_dump/compress_lz4.h
index 7f7216cc648..d52e6071519 100644
--- a/src/bin/pg_dump/compress_lz4.h
+++ b/src/bin/pg_dump/compress_lz4.h
@@ -19,6 +19,7 @@
extern void InitCompressorLZ4(CompressorState *cs,
const pg_compress_specification compression_spec);
extern void InitCompressFileHandleLZ4(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec);
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command);
#endif /* _COMPRESS_LZ4_H_ */
diff --git a/src/bin/pg_dump/compress_none.c b/src/bin/pg_dump/compress_none.c
index 3fc89c99854..6d69e580ed2 100644
--- a/src/bin/pg_dump/compress_none.c
+++ b/src/bin/pg_dump/compress_none.c
@@ -153,7 +153,12 @@ close_none(CompressFileHandle *CFH)
CFH->private_data = NULL;
if (fp)
- ret = fclose(fp);
+ {
+ if(CFH->path_is_pipe_command)
+ ret = pclose(fp);
+ else
+ ret = fclose(fp);
+ }
return ret == 0;
}
@@ -172,7 +177,12 @@ open_none(const char *path, int fd, const char *mode, CompressFileHandle *CFH)
if (fd >= 0)
CFH->private_data = fdopen(dup(fd), mode);
else
- CFH->private_data = fopen(path, mode);
+ {
+ if (CFH->path_is_pipe_command)
+ CFH->private_data = popen(path, mode);
+ else
+ CFH->private_data = fopen(path, mode);
+ }
if (CFH->private_data == NULL)
return false;
@@ -185,7 +195,14 @@ open_write_none(const char *path, const char *mode, CompressFileHandle *CFH)
{
Assert(CFH->private_data == NULL);
- CFH->private_data = fopen(path, mode);
+ pg_log_debug("Opening %s, pipe is %s",
+ path, CFH->path_is_pipe_command ? "true" : "false");
+
+ if (CFH->path_is_pipe_command)
+ CFH->private_data = popen(path, mode);
+ else
+ CFH->private_data = fopen(path, mode);
+
if (CFH->private_data == NULL)
return false;
@@ -198,7 +215,8 @@ open_write_none(const char *path, const char *mode, CompressFileHandle *CFH)
void
InitCompressFileHandleNone(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec)
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
CFH->open_func = open_none;
CFH->open_write_func = open_write_none;
@@ -210,5 +228,7 @@ InitCompressFileHandleNone(CompressFileHandle *CFH,
CFH->eof_func = eof_none;
CFH->get_error_func = get_error_none;
+ CFH->path_is_pipe_command = path_is_pipe_command;
+
CFH->private_data = NULL;
}
diff --git a/src/bin/pg_dump/compress_none.h b/src/bin/pg_dump/compress_none.h
index f927f196c36..1399c8bde3b 100644
--- a/src/bin/pg_dump/compress_none.h
+++ b/src/bin/pg_dump/compress_none.h
@@ -19,6 +19,7 @@
extern void InitCompressorNone(CompressorState *cs,
const pg_compress_specification compression_spec);
extern void InitCompressFileHandleNone(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec);
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command);
#endif /* _COMPRESS_NONE_H_ */
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index 1f7b4942706..f38c085d37a 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -26,7 +26,7 @@ InitCompressorZstd(CompressorState *cs, const pg_compress_specification compress
}
void
-InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
+InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec, bool path_is_pipe_command)
{
pg_fatal("this build does not support compression with %s", "ZSTD");
}
@@ -523,8 +523,12 @@ Zstd_get_error(CompressFileHandle *CFH)
void
InitCompressFileHandleZstd(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec)
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
+ if(path_is_pipe_command)
+ pg_fatal("cPipe command not supported for Zstd");
+
CFH->open_func = Zstd_open;
CFH->open_write_func = Zstd_open_write;
CFH->read_func = Zstd_read;
@@ -536,6 +540,7 @@ InitCompressFileHandleZstd(CompressFileHandle *CFH,
CFH->get_error_func = Zstd_get_error;
CFH->compression_spec = compression_spec;
+ CFH->path_is_pipe_command = path_is_pipe_command;
CFH->private_data = NULL;
}
diff --git a/src/bin/pg_dump/compress_zstd.h b/src/bin/pg_dump/compress_zstd.h
index af21db48ded..1644b6d6eba 100644
--- a/src/bin/pg_dump/compress_zstd.h
+++ b/src/bin/pg_dump/compress_zstd.h
@@ -20,6 +20,7 @@
extern void InitCompressorZstd(CompressorState *cs,
const pg_compress_specification compression_spec);
extern void InitCompressFileHandleZstd(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec);
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command);
#endif /* COMPRESS_ZSTD_H */
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index af0007fb6d2..18be674ab7f 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -311,14 +311,15 @@ extern void ProcessArchiveRestoreOptions(Archive *AHX);
extern void RestoreArchive(Archive *AHX, bool append_data);
/* Open an existing archive */
-extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
+extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt, bool FileSpecIsPipe);
/* Create a new archive */
extern Archive *CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
const pg_compress_specification compression_spec,
bool dosync, ArchiveMode mode,
SetupWorkerPtrType setupDumpWorker,
- DataDirSyncMethod sync_method);
+ DataDirSyncMethod sync_method,
+ bool FileSpecIsPipe); /* ^^ what are the casing rules here ?? */
/* The --list option */
extern void PrintTOCSummary(Archive *AHX);
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index f961162f365..83e66873a5d 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -54,7 +54,7 @@ static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
const pg_compress_specification compression_spec,
bool dosync, ArchiveMode mode,
SetupWorkerPtrType setupWorkerPtr,
- DataDirSyncMethod sync_method);
+ DataDirSyncMethod sync_method, bool FileSpecIsPipe);
static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx);
static char *sanitize_line(const char *str, bool want_hyphen);
@@ -230,11 +230,12 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
const pg_compress_specification compression_spec,
bool dosync, ArchiveMode mode,
SetupWorkerPtrType setupDumpWorker,
- DataDirSyncMethod sync_method)
+ DataDirSyncMethod sync_method,
+ bool FileSpecIsPipe)
{
ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
- dosync, mode, setupDumpWorker, sync_method);
+ dosync, mode, setupDumpWorker, sync_method, FileSpecIsPipe);
return (Archive *) AH;
}
@@ -242,7 +243,7 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
/* Open an existing archive */
/* Public */
Archive *
-OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
+OpenArchive(const char *FileSpec, const ArchiveFormat fmt, bool FileSpecIsPipe)
{
ArchiveHandle *AH;
pg_compress_specification compression_spec = {0};
@@ -250,7 +251,7 @@ OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
compression_spec.algorithm = PG_COMPRESSION_NONE;
AH = _allocAH(FileSpec, fmt, compression_spec, true,
archModeRead, setupRestoreWorker,
- DATA_DIR_SYNC_METHOD_FSYNC);
+ DATA_DIR_SYNC_METHOD_FSYNC, FileSpecIsPipe);
return (Archive *) AH;
}
@@ -1705,7 +1706,7 @@ SetOutput(ArchiveHandle *AH, const char *filename,
else
mode = PG_BINARY_W;
- CFH = InitCompressFileHandle(compression_spec);
+ CFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
if (!CFH->open_func(filename, fn, mode, CFH))
{
@@ -2362,7 +2363,8 @@ static ArchiveHandle *
_allocAH(const char *FileSpec, const ArchiveFormat fmt,
const pg_compress_specification compression_spec,
bool dosync, ArchiveMode mode,
- SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
+ SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method,
+ bool FileSpecIsPipe)
{
ArchiveHandle *AH;
CompressFileHandle *CFH;
@@ -2403,6 +2405,8 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
else
AH->fSpec = NULL;
+ AH->fSpecIsPipe = FileSpecIsPipe;
+
AH->currUser = NULL; /* unknown */
AH->currSchema = NULL; /* ditto */
AH->currTablespace = NULL; /* ditto */
@@ -2415,14 +2419,14 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
AH->mode = mode;
AH->compression_spec = compression_spec;
- AH->dosync = dosync;
+ AH->dosync = FileSpecIsPipe ? false : dosync;
AH->sync_method = sync_method;
memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
/* Open stdout with no compression for AH output handle */
out_compress_spec.algorithm = PG_COMPRESSION_NONE;
- CFH = InitCompressFileHandle(out_compress_spec);
+ CFH = InitCompressFileHandle(out_compress_spec, AH->fSpecIsPipe);
if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
pg_fatal("could not open stdout for appending: %m");
AH->OF = CFH;
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 365073b3eae..d7fa3086184 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -301,6 +301,7 @@ struct _archiveHandle
int loCount; /* # of LOs restored */
char *fSpec; /* Archive File Spec */
+ bool fSpecIsPipe; /* fSpec is a pipe command template requiring replacing %f with file name */
FILE *FH; /* General purpose file handle */
void *OF; /* Output file */
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index b2a841bb0ff..193e9825a16 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -39,7 +39,8 @@
#include <dirent.h>
#include <sys/stat.h>
-#include "common/file_utils.h"
+//#include "common/file_utils.h"
+#include "common/percentrepl.h"
#include "compress_io.h"
#include "parallel.h"
#include "pg_backup_utils.h"
@@ -158,39 +159,41 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
{
struct stat st;
bool is_empty = false;
-
- /* we accept an empty existing directory */
- if (stat(ctx->directory, &st) == 0 && S_ISDIR(st.st_mode))
+
+ if(!AH->fSpecIsPipe) /* no checks for pipe */
{
- DIR *dir = opendir(ctx->directory);
-
- if (dir)
+ /* we accept an empty existing directory */
+ if (stat(ctx->directory, &st) == 0 && S_ISDIR(st.st_mode))
{
- struct dirent *d;
+ DIR *dir = opendir(ctx->directory);
- is_empty = true;
- while (errno = 0, (d = readdir(dir)))
+ if (dir)
{
- if (strcmp(d->d_name, ".") != 0 && strcmp(d->d_name, "..") != 0)
+ struct dirent *d;
+
+ is_empty = true;
+ while (errno = 0, (d = readdir(dir)))
{
- is_empty = false;
- break;
+ if (strcmp(d->d_name, ".") != 0 && strcmp(d->d_name, "..") != 0)
+ {
+ is_empty = false;
+ break;
+ }
}
- }
- if (errno)
- pg_fatal("could not read directory \"%s\": %m",
- ctx->directory);
+ if (errno)
+ pg_fatal("could not read directory \"%s\": %m",
+ ctx->directory);
- if (closedir(dir))
- pg_fatal("could not close directory \"%s\": %m",
- ctx->directory);
+ if (closedir(dir))
+ pg_fatal("could not close directory \"%s\": %m",
+ ctx->directory);
+ }
}
+ if (!is_empty && mkdir(ctx->directory, 0700) < 0)
+ pg_fatal("could not create directory \"%s\": %m",
+ ctx->directory);
}
-
- if (!is_empty && mkdir(ctx->directory, 0700) < 0)
- pg_fatal("could not create directory \"%s\": %m",
- ctx->directory);
}
else
{ /* Read Mode */
@@ -199,7 +202,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
setFilePath(AH, fname, "toc.dat");
- tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R);
+ tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R, AH->fSpecIsPipe);
if (tocFH == NULL)
pg_fatal("could not open input file \"%s\": %m", fname);
@@ -327,7 +330,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
setFilePath(AH, fname, tctx->filename);
- ctx->dataFH = InitCompressFileHandle(AH->compression_spec);
+ ctx->dataFH = InitCompressFileHandle(AH->compression_spec, AH->fSpecIsPipe);
if (!ctx->dataFH->open_write_func(fname, PG_BINARY_W, ctx->dataFH))
pg_fatal("could not open output file \"%s\": %m", fname);
@@ -391,7 +394,7 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
if (!filename)
return;
- CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R);
+ CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R, AH->fSpecIsPipe);
if (!CFH)
pg_fatal("could not open input file \"%s\": %m", filename);
@@ -449,7 +452,7 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
*/
setFilePath(AH, tocfname, tctx->filename);
- CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R);
+ CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R, AH->fSpecIsPipe);
if (ctx->LOsTocFH == NULL)
pg_fatal("could not open large object TOC file \"%s\" for input: %m",
@@ -460,6 +463,7 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
{
char lofname[MAXPGPATH + 1];
char path[MAXPGPATH];
+ char* pipe;
/* Can't overflow because line and lofname are the same length */
if (sscanf(line, "%u %" CppAsString2(MAXPGPATH) "s\n", &oid, lofname) != 2)
@@ -595,7 +599,7 @@ _CloseArchive(ArchiveHandle *AH)
/* The TOC is always created uncompressed */
compression_spec.algorithm = PG_COMPRESSION_NONE;
- tocFH = InitCompressFileHandle(compression_spec);
+ tocFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
if (!tocFH->open_write_func(fname, PG_BINARY_W, tocFH))
pg_fatal("could not open output file \"%s\": %m", fname);
ctx->dataFH = tocFH;
@@ -656,13 +660,42 @@ _StartLOs(ArchiveHandle *AH, TocEntry *te)
lclTocEntry *tctx = (lclTocEntry *) te->formatData;
pg_compress_specification compression_spec = {0};
char fname[MAXPGPATH];
+ const char *mode;
setFilePath(AH, fname, tctx->filename);
/* The LO TOC file is never compressed */
compression_spec.algorithm = PG_COMPRESSION_NONE;
- ctx->LOsTocFH = InitCompressFileHandle(compression_spec);
- if (!ctx->LOsTocFH->open_write_func(fname, "ab", ctx->LOsTocFH))
+ ctx->LOsTocFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
+ /* TODO: Finalize the correct approach for the mode.
+ * The mode for the LOs TOC file has been "ab" from the start. That
+ * is something we can't do for pipe-command as popen only supports
+ * read and write. Just changing it to 'w' was not expected to be enough
+ * and one possible solution considered is to open it in 'w' mode and
+ * keep it open till all the LOs in the dump group are done.
+ *
+ * The analysis of the current code shows that there is one ToCEntry
+ * per blob group. And it is written by @WriteDataChunksForToCEntry.
+ * This function calls _StartLOs once before the dumper function and
+ * and _EndLOs once after the dumper. And the dumper dumps all the
+ * LOs in the group. So a blob_NNN.toc is only opened once and closed
+ * after all the entries are written. Therefore the mode can be made 'w'
+ * for all the cases. We tested changing the mode to PG_BINARY_W and
+ * the tests passed. But in case there are some missing scenarios, we
+ * have not made that change here. Instead for now only doing it for the
+ * pipe command.
+ * In short there are 3 solutions :
+ * 1. Change the mode for everything (preferred)
+ * 2. Change it only for pipe-command (done for time-being)
+ * 3. Change it for pipe-command and then cache those handles and
+ * close them in the end (based on the code review, we might
+ * pick this).
+ */
+ if (AH->fSpecIsPipe)
+ mode = PG_BINARY_W;
+ else
+ mode = "ab";
+ if (!ctx->LOsTocFH->open_write_func(fname, mode, ctx->LOsTocFH))
pg_fatal("could not open output file \"%s\": %m", fname);
}
@@ -676,10 +709,22 @@ _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
{
lclContext *ctx = (lclContext *) AH->formatData;
char fname[MAXPGPATH];
+ char* pipe;
+ char blob_name[MAXPGPATH];
- snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
+ if(AH->fSpecIsPipe)
+ {
+ snprintf(blob_name, MAXPGPATH, "blob_%u.dat", oid);
+ pipe = replace_percent_placeholders(ctx->directory, "pipe-command", "f", blob_name);
+ strcpy(fname, pipe);
+ /* TODO:figure out how to free the allocated string when replace_percent_placeholders isused in frontend*/
+ }
+ else
+ {
+ snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
+ }
- ctx->dataFH = InitCompressFileHandle(AH->compression_spec);
+ ctx->dataFH = InitCompressFileHandle(AH->compression_spec, AH->fSpecIsPipe);
if (!ctx->dataFH->open_write_func(fname, PG_BINARY_W, ctx->dataFH))
pg_fatal("could not open output file \"%s\": %m", fname);
}
@@ -740,15 +785,26 @@ setFilePath(ArchiveHandle *AH, char *buf, const char *relativeFilename)
{
lclContext *ctx = (lclContext *) AH->formatData;
char *dname;
+ char *pipe;
dname = ctx->directory;
- if (strlen(dname) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH)
- pg_fatal("file name too long: \"%s\"", dname);
- strcpy(buf, dname);
- strcat(buf, "/");
- strcat(buf, relativeFilename);
+ if(AH->fSpecIsPipe)
+ {
+ pipe = replace_percent_placeholders(dname, "pipe-command", "f", relativeFilename);
+ strcpy(buf, pipe);
+ /* TODO:figure out how to free the allocated string when replace_percent_placeholders isused in frontend*/
+ }
+ else /* replace all ocurrences of %f in dname with relativeFilename */
+ {
+ if (strlen(dname) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH)
+ pg_fatal("file name too long: \"%s\"", dname);
+
+ strcpy(buf, dname);
+ strcat(buf, "/");
+ strcat(buf, relativeFilename);
+ }
}
/*
@@ -790,17 +846,24 @@ _PrepParallelRestore(ArchiveHandle *AH)
* only need an approximate indicator of that.
*/
setFilePath(AH, fname, tctx->filename);
+ pg_log_error("filename: %s", fname);
if (stat(fname, &st) == 0)
te->dataLength = st.st_size;
else if (AH->compression_spec.algorithm != PG_COMPRESSION_NONE)
{
+ if(AH->fSpecIsPipe)
+ pg_log_error("pipe and compressed");
+
if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP)
strlcat(fname, ".gz", sizeof(fname));
else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4)
strlcat(fname, ".lz4", sizeof(fname));
- else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+ else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD){
+ pg_log_error("filename: %s", fname);
strlcat(fname, ".zst", sizeof(fname));
+ pg_log_error("filename: %s", fname);
+ }
if (stat(fname, &st) == 0)
te->dataLength = st.st_size;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 8e6364d32d7..ce2e5eeccd8 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -409,6 +409,7 @@ main(int argc, char **argv)
{
int c;
const char *filename = NULL;
+ bool filename_is_pipe = false;
const char *format = "p";
TableInfo *tblinfo;
int numTables;
@@ -528,6 +529,7 @@ main(int argc, char **argv)
{"filter", required_argument, NULL, 16},
{"exclude-extension", required_argument, NULL, 17},
{"sequence-data", no_argument, &dopt.sequence_data, 1},
+ {"pipe-command", required_argument, NULL, 25},
{NULL, 0, NULL, 0}
};
@@ -599,7 +601,12 @@ main(int argc, char **argv)
break;
case 'f':
+ if(filename != NULL){
+ pg_log_error_hint("Only one of [--file, --pipe-command] allowed");
+ exit_nicely(1);
+ }
filename = pg_strdup(optarg);
+ filename_is_pipe = false; /* it already is, setting again here just for clarity */
break;
case 'F':
@@ -796,6 +803,15 @@ main(int argc, char **argv)
with_statistics = true;
break;
+ case 25: /* pipe command */
+ if(filename != NULL){
+ pg_log_error_hint("Only one of [--file, --pipe-command] allowed");
+ exit_nicely(1);
+ }
+ filename = pg_strdup(optarg);
+ filename_is_pipe = true;
+ break;
+
default:
/* getopt_long already emitted a complaint */
pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -888,13 +904,26 @@ main(int argc, char **argv)
if (archiveFormat == archNull)
plainText = 1;
+ if (filename_is_pipe && archiveFormat != archDirectory)
+ {
+ pg_log_error_hint("Option --pipe-command is only supported with directory format.");
+ exit_nicely(1);
+ }
+
+ if (filename_is_pipe && strcmp(compression_algorithm_str, "none") != 0)
+ {
+ pg_log_error_hint("Option --pipe-command is not supported with any compression type.");
+ exit_nicely(1);
+ }
+
/*
* Custom and directory formats are compressed by default with gzip when
* available, not the others. If gzip is not available, no compression is
- * done by default.
+ * done by default. If directory format is being used with pipe-command,
+ * no compression is done.
*/
if ((archiveFormat == archCustom || archiveFormat == archDirectory) &&
- !user_compression_defined)
+ !filename_is_pipe && !user_compression_defined)
{
#ifdef HAVE_LIBZ
compression_algorithm_str = "gzip";
@@ -944,7 +973,7 @@ main(int argc, char **argv)
/* Open the output file */
fout = CreateArchive(filename, archiveFormat, compression_spec,
- dosync, archiveMode, setupDumpWorker, sync_method);
+ dosync, archiveMode, setupDumpWorker, sync_method, filename_is_pipe);
/* Make dump options accessible right away */
SetArchiveOptions(fout, &dopt, NULL);
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index fe33b283a01..9b757dac568 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -594,7 +594,7 @@ restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
Archive *AH;
int n_errors;
- AH = OpenArchive(inputFileSpec, opts->format);
+ AH = OpenArchive(inputFileSpec, opts->format, false); /*TODO: support pipes in restore */
SetArchiveOptions(AH, NULL, opts);
004-pg_dump_documentation_v4.patchapplication/octet-stream; name=004-pg_dump_documentation_v4.patchDownload
commit 8152fdc26428b55d34f4ec9158d0112aa2379aab
Author: Nitin Motiani <nitinmotiani@google.com>
Date: Fri Apr 4 14:34:48 2025 +0000
Add documentation for pipe-command in pg_dump and pg_restore
* Add the descriptions of the new flags and constraints
regarding which mode and other flags they can't be used with.
* Explain the purpose of the flags.
* Add a few examples of the usage of the flags.
diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index bfc1e7b3524..24f443c8cf4 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -287,6 +287,8 @@ PostgreSQL documentation
specifies the target directory instead of a file. In this case the
directory is created by <command>pg_dump</command> and must not exist
before.
+ This option and <option>--pipe-command</option> can't be used
+ together.
</para>
</listitem>
</varlistentry>
@@ -1272,6 +1274,32 @@ PostgreSQL documentation
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><option>--pipe-command</option></term>
+ <listitem>
+ <para>
+ This option is only supported with the directory output
+ format. It can be used to write to multiple streams which
+ otherwise would not be possible with the directory mode.
+ For each stream, it starts a process which runs the
+ specified command and pipes the pg_dump output to this
+ process.
+ This option is not valid if <option>--file</option>
+ is also specified.
+ </para>
+ <para>
+ The pipe-command can be used to perform operations like compress
+ using a custom algorithm, filter, or write the output to a cloud
+ storage etc. The user would need a way to pipe the final output of
+ each stream to a file. To handle that, the pipe command supports a format
+ specifier %f. And all the instances of %f in the command string
+ will be replaced with the corresponding file name which
+ would have been used in the directory mode with <option>--file</option>.
+ See <xref linkend="pg-dump-examples"/> below.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><option>--quote-all-identifiers</option></term>
<listitem>
@@ -1784,6 +1812,35 @@ CREATE DATABASE foo WITH TEMPLATE template0;
</screen>
</para>
+ <para>
+ To use pipe-command to dump a database into a directory-format archive
+ (the directory <literal>dumpdir</literal> needs to exist before running the command).
+
+<screen>
+<prompt>$</prompt> <userinput>pg_dump -Fd mydb --pipe-command="cat > dumpdir/%f"</userinput>
+</screen>
+ </para>
+
+ <para>
+ To use pipe-command to dump a database into a directory-format archive
+ in parallel with 5 worker jobs (the directory <literal>dumpdir</literal> needs to exist
+ before running the command).
+
+<screen>
+<prompt>$</prompt> <userinput>pg_dump -Fd mydb -j 5 --pipe-command="cat > dumpdir/%f"</userinput>
+</screen>
+ </para>
+
+ <para>
+ To use pipe-command to compress and dump a database into a
+ directory-format archive (the directory <literal>dumpdir</literal> needs to
+ exist before running the command).
+
+<screen>
+<prompt>$</prompt> <userinput>pg_dump -Fd mydb --pipe-command="gzip > dumpdir/%f.gz"</userinput>
+</screen>
+ </para>
+
<para>
To reload an archive file into a (freshly created) database named
<literal>newdb</literal>:
diff --git a/doc/src/sgml/ref/pg_restore.sgml b/doc/src/sgml/ref/pg_restore.sgml
index f14e5866f6c..c6da280bd3e 100644
--- a/doc/src/sgml/ref/pg_restore.sgml
+++ b/doc/src/sgml/ref/pg_restore.sgml
@@ -93,7 +93,10 @@ PostgreSQL documentation
<para>
Specifies the location of the archive file (or directory, for a
directory-format archive) to be restored.
- If not specified, the standard input is used.
+ This option and <option>--pipe-command</option> can't be set
+ at the same time.
+ If neither this option nor <option>--pipe-command</option> is specified,
+ the standard input is used.
</para>
</listitem>
</varlistentry>
@@ -851,6 +854,34 @@ PostgreSQL documentation
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><option>--pipe-command</option></term>
+ <listitem>
+ <para>
+ This option is only supported with the directory output
+ format. It can be used to read from multiple streams which
+ otherwise would not be possible with the directory mode.
+ For each stream, it starts a process which runs the
+ specified command and pipes its output to the pg_restore process.
+ This option is not valid if <option>filename</option>
+ is also specified.
+ </para>
+ <para>
+ The pipe-command can be used to perform operations like
+ decompress using a custom algorithm, filter, or read from
+ a cloud storage. When reading from the pg_dump output,
+ the user would need a way to read the correct file in each
+ stream. To handle that, the pipe command supports a format
+ specifier %f. And all the instances of %f in the command string
+ will be replaced with the corresponding file name which
+ would have been used in the directory mode with
+ <option>filename</option>.
+ This is same as the <option>--pipe-command</option> of pg-dump.
+ See <xref linkend="app-pgrestore-examples"/> below.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><option>--with-data</option></term>
<listitem>
@@ -1263,6 +1294,43 @@ CREATE DATABASE foo WITH TEMPLATE template0;
<prompt>$</prompt> <userinput>pg_restore -L db.list db.dump</userinput>
</screen></para>
+ <para>
+ To use pg_restore with pipe-command to recreate from a dump in
+ directory-archive format. The database should not exist beforehand.
+ Assume in this example that the dump in directory-archive format is
+ stored in <literal>dumpdir</literal>.
+
+<screen>
+<prompt>$</prompt> <userinput>pg_restore -C -Fd -d postgres --pipe-commnad="cat dumpdir/%f"</userinput>
+</screen>
+ </para>
+
+ <para>
+ To use pg_restore with pipe-command to first decompress and then
+ recreate from a dump in directory-archive format. The database
+ should not exist beforehand.
+ Assume in this example that the dump in directory-archive format is
+ stored in <literal>dumpdir</literal>. And all files are
+ <literal>gzip</literal> compressed.
+
+<screen>
+<prompt>$</prompt> <userinput>pg_restore -C -Fd -d postgres --pipe-commnad="cat dumpdir/%f.gz | gunzip"</userinput>
+</screen>
+ </para>
+
+ <para>
+ To use pipe-command along with <option>-L</option> to recreate only
+ selectd items from a dump in the directory-archive format.
+ The database should not exist beforehand.
+ Assume in this example that the dump in directory-archive format is
+ stored in dumpdir.
+ The <literal>db.list</literal> file is the same as one used in the previous example with <option>-L</option>
+
+<screen>
+<prompt>$</prompt> <userinput>pg_restore -C -Fd -d postgres --pipe-commnad="cat dumpdir/%f" -L db.list</userinput>
+</screen>
+ </para>
+
</refsect1>
<refsect1>
003-pg_dump_basic_tests_v4.patchapplication/octet-stream; name=003-pg_dump_basic_tests_v4.patchDownload
commit 209b41aad5fa2c49ea437ec1528028b741eea487
Author: Nitin Motiani <nitinmotiani@google.com>
Date: Sat Feb 15 04:29:17 2025 +0000
Add basic tests for pipe-command
* This currently only adds a few basic tests for pg_dump with --pipe-command.
* These tests include the invalid usages of --pipe-command with other flags.
* We are still working on adding other tests in pg_dump.pl. But
we ran into some issues which might be related to setup.
diff --git a/src/bin/pg_dump/t/001_basic.pl b/src/bin/pg_dump/t/001_basic.pl
index 84ca25e17d6..3badd335f8c 100644
--- a/src/bin/pg_dump/t/001_basic.pl
+++ b/src/bin/pg_dump/t/001_basic.pl
@@ -74,6 +74,42 @@ command_fails_like(
'pg_dump: options --statistics-only and --no-statistics cannot be used together'
);
+command_fails_like(
+ [ 'pg_dump', '-Fd', '--pipe-command="cat"', '-f', 'testdir', 'test'],
+ qr/\Qpg_dump: hint: Only one of [--file, --pipe-command] allowed\E/,
+ 'pg_dump: hint: Only one of [--file, --pipe-command] allowed'
+);
+
+command_fails_like(
+ [ 'pg_dump', '-Fd', '--pipe-command="cat"', '-Z', 'gzip', 'test'],
+ qr/\Qpg_dump: hint: Option --pipe-command is not supported with any compression type\E/,
+ 'pg_dump: hint: Option --pipe-command is not supported with any compression type'
+);
+
+command_fails_like(
+ [ 'pg_dump', '-Fd', '--pipe-command="cat"', '--compress=lz4', 'test'],
+ qr/\Qpg_dump: hint: Option --pipe-command is not supported with any compression type\E/,
+ 'pg_dump: hint: Option --pipe-command is not supported with any compression type'
+);
+
+command_fails_like(
+ [ 'pg_dump', '-Fd', '--pipe-command="cat"', '-Z', '1', 'test'],
+ qr/\Qpg_dump: hint: Option --pipe-command is not supported with any compression type\E/,
+ 'pg_dump: hint: Option --pipe-command is not supported with any compression type'
+);
+
+command_fails_like(
+ [ 'pg_dump', '-Fc', '--pipe-command="cat"', 'test'],
+ qr/\Qpg_dump: hint: Option --pipe-command is only supported with directory format.\E/,
+ 'pg_dump: hint: Option --pipe-command is only supported with directory format.'
+);
+
+command_fails_like(
+ [ 'pg_dump', '--format=tar', '--pipe-command="cat"', 'test'],
+ qr/\Qpg_dump: hint: Option --pipe-command is only supported with directory format.\E/,
+ 'pg_dump: hint: Option --pipe-command is only supported with directory format.'
+);
+
command_fails_like(
[ 'pg_dump', '-j2', '--include-foreign-data=xxx' ],
qr/\Qpg_dump: error: option --include-foreign-data is not supported with parallel backup\E/,
Just to bring this out separately : Does anybody have any idea why pipe
commands close inside tests ?
Re: 003-pg_dump_basic_tests has a few basic validation tests for
correctmflag combinations. We need to write more automated tests in
002_pg_dump.pl but have been running into some issues with environment
setup due to which certain pipe commands result in the shell process
becoming defunct. These same commands are working fine in manual
testing. We are still looking into this.
----
Hannu
On Mon, Apr 7, 2025 at 7:17 PM Nitin Motiani <nitinmotiani@google.com>
wrote:
Show quoted text
Hi Hackers,
We are proposing the ability to specify a pipe command to pg_dump by a
flag. And attaching the patch set.Why : Currently it is quite simple to pipe the output of pg_dump for
text format to a pipe at command line and do any manipulations
necessary. Following is an example :pg_dump <flags> <dbname> | lz4 | pv -L 10k | ssh remote.host
"cat - > remote.dump.lz4"Here we first compress the stream using lz4 and then send it over ssh
to a remote host to be saved as a file while rate-limiting the network
usage to 10KB/s.Something like this is not possible for format=directory (-Fd) since
all you can provide is the directory name to store the individual
files. Note it is not possible to do this irrespective of the usage of
the parallel dump option ('--jobs' flag).While the directory format supports compression using a flag, the rest
of the operations in the above example are not possible. And a pipe
command provides more flexibility in what compression algorithm one
wants to use.This patch set provides pg_dump the ability to pipe the data in the
directory mode by using a new flag '--pipe-command' (in both parallel
and non-parallel mode).We also add a similar option to pg_restore.
The following can be the major use cases of these changes :
1. Stream pg_dump output to a cloud storage
2. SSH the data to a remote host (with or without throttling)
3. Custom compression optionsUsage Examples : Here is an example of how the pipe-command will look like.
pg_dump -Fd mydb --pipe-command="cat > dumpdir/%f" (dumpdir
should exist beforehand.)This is equivalent to
pg_dump -Fd mydb --file=dumpdir
(Please note that the flags '--file' or '--pipe-command' can't be used
together.)For the more complex scenario as mentioned above, the command will be
(with the parallelism of 5) :pg_dump -Fd mydb -j 5 --pipe-command="lz4 | pv -L 10k | ssh
remote.host "cat > dumpdir/%f""Please note the use of %f in the above examples. As a user would
almost always want to write the post-processing output to a file (or
perhaps a cloud location), we provide a format specifier %f in the
command. The implementation of pipe-command replaces these format
specifiers with the corresponding file names. These file names are the
same as they would be in the current usage of directory format with
'--file' flag (<dump_id>.dat, toc.dat, blob_NNN.toc,
blob_<blob_id>.dat).The usage of this flag with pg_restore will also be similar. Here is
an example of restoring from a gzip compressed dump directory.pg_restore -C -Fd -d postgres --pipe-commnad="cat
dumpdir/%f.gz | gunzip"The new flag in pg_restore also works with '-l' and '-L' options
pg_restore -C -Fd -d postgres --pipe-commnad="cat dumpdir/%f" -L
db.listImplementation Details : Here are the major changes :
1. We reuse the same variables which store the file name to store
the pipe command. And add a new bool fSpecIsPipe in _archiveHandle
(similar bools in pg_dump.c and pg_restore.c) to specify if it's a
pipe command.
2. In the cases when the above bool is set to true, we use popen
and pclose instead of fopen and fclose.
3. To enable the format specifier %f in the pipe-command, we make
changes to the file name creation logic in a few places. Currently the
file name (corresponding to a table or large object) is appended to
the directory name provided by '--file' command. In case of
'--pipe-command', we use 'replace_percent_placeholders' to replace %f
with the corresponding file name. This change is made for both table
files and LO TOC files.With these core changes, the rest of the code continues working as-is.
We are attaching 4 patches for this change :
001-pg_dump_pipe has the pg_dump pipe support code.
002-pg_restore_pipe has the pg_restore pipe support.
003-pg_dump_basic_tests has a few basic validation tests for
correctmflag combinations. We need to write more automated tests in
002_pg_dump.pl but have been running into some issues with environment
setup due to which certain pipe commands result in the shell process
becoming defunct. These same commands are working fine in manual
testing. We are still looking into this.
004-pg_dump_documentation has the proposed documentation changes.We are working on the above test issues and cleanup of the patches.
Open Questions : There are a couple of open questions in the
implementation :1. Currently the LO TOC file (blob_NNN.toc) is opened in the
append mode. This is not possible with popen for the pipe command.
From reading the code, it seems to us that this file doesn't need to
be opened in the append mode. As '_StartLOs' is called once per
archive entry in WriteDataChunksForToCEntry followed by the dumper
function and then '_EndLOs', it should be okay to change this to 'w'
mode. But this code has been there since the start so we haven't made
that change yet. In the patch, we have changed it to 'w' pipe-command
only and added the ideas for potential solutions in the comments.
2. We are also not sure yet on how to handle the environment
issues when trying to add new tests to 002_pg_dump.pl.Please let us know what you think.
Thanks & Regards,
Nitin Motiani
If there are no objections we will add this to the commitfest
Show quoted text
On Mon, Apr 7, 2025 at 9:48 PM Hannu Krosing <hannuk@google.com> wrote:
Just to bring this out separately : Does anybody have any idea why pipe commands close inside tests ?
Re: 003-pg_dump_basic_tests has a few basic validation tests for
correctmflag combinations. We need to write more automated tests in
002_pg_dump.pl but have been running into some issues with environment
setup due to which certain pipe commands result in the shell process
becoming defunct. These same commands are working fine in manual
testing. We are still looking into this.----
HannuOn Mon, Apr 7, 2025 at 7:17 PM Nitin Motiani <nitinmotiani@google.com> wrote:
Hi Hackers,
We are proposing the ability to specify a pipe command to pg_dump by a
flag. And attaching the patch set.Why : Currently it is quite simple to pipe the output of pg_dump for
text format to a pipe at command line and do any manipulations
necessary. Following is an example :pg_dump <flags> <dbname> | lz4 | pv -L 10k | ssh remote.host
"cat - > remote.dump.lz4"Here we first compress the stream using lz4 and then send it over ssh
to a remote host to be saved as a file while rate-limiting the network
usage to 10KB/s.Something like this is not possible for format=directory (-Fd) since
all you can provide is the directory name to store the individual
files. Note it is not possible to do this irrespective of the usage of
the parallel dump option ('--jobs' flag).While the directory format supports compression using a flag, the rest
of the operations in the above example are not possible. And a pipe
command provides more flexibility in what compression algorithm one
wants to use.This patch set provides pg_dump the ability to pipe the data in the
directory mode by using a new flag '--pipe-command' (in both parallel
and non-parallel mode).We also add a similar option to pg_restore.
The following can be the major use cases of these changes :
1. Stream pg_dump output to a cloud storage
2. SSH the data to a remote host (with or without throttling)
3. Custom compression optionsUsage Examples : Here is an example of how the pipe-command will look like.
pg_dump -Fd mydb --pipe-command="cat > dumpdir/%f" (dumpdir
should exist beforehand.)This is equivalent to
pg_dump -Fd mydb --file=dumpdir
(Please note that the flags '--file' or '--pipe-command' can't be used
together.)For the more complex scenario as mentioned above, the command will be
(with the parallelism of 5) :pg_dump -Fd mydb -j 5 --pipe-command="lz4 | pv -L 10k | ssh
remote.host "cat > dumpdir/%f""Please note the use of %f in the above examples. As a user would
almost always want to write the post-processing output to a file (or
perhaps a cloud location), we provide a format specifier %f in the
command. The implementation of pipe-command replaces these format
specifiers with the corresponding file names. These file names are the
same as they would be in the current usage of directory format with
'--file' flag (<dump_id>.dat, toc.dat, blob_NNN.toc,
blob_<blob_id>.dat).The usage of this flag with pg_restore will also be similar. Here is
an example of restoring from a gzip compressed dump directory.pg_restore -C -Fd -d postgres --pipe-commnad="cat
dumpdir/%f.gz | gunzip"The new flag in pg_restore also works with '-l' and '-L' options
pg_restore -C -Fd -d postgres --pipe-commnad="cat dumpdir/%f" -L db.list
Implementation Details : Here are the major changes :
1. We reuse the same variables which store the file name to store
the pipe command. And add a new bool fSpecIsPipe in _archiveHandle
(similar bools in pg_dump.c and pg_restore.c) to specify if it's a
pipe command.
2. In the cases when the above bool is set to true, we use popen
and pclose instead of fopen and fclose.
3. To enable the format specifier %f in the pipe-command, we make
changes to the file name creation logic in a few places. Currently the
file name (corresponding to a table or large object) is appended to
the directory name provided by '--file' command. In case of
'--pipe-command', we use 'replace_percent_placeholders' to replace %f
with the corresponding file name. This change is made for both table
files and LO TOC files.With these core changes, the rest of the code continues working as-is.
We are attaching 4 patches for this change :
001-pg_dump_pipe has the pg_dump pipe support code.
002-pg_restore_pipe has the pg_restore pipe support.
003-pg_dump_basic_tests has a few basic validation tests for
correctmflag combinations. We need to write more automated tests in
002_pg_dump.pl but have been running into some issues with environment
setup due to which certain pipe commands result in the shell process
becoming defunct. These same commands are working fine in manual
testing. We are still looking into this.
004-pg_dump_documentation has the proposed documentation changes.We are working on the above test issues and cleanup of the patches.
Open Questions : There are a couple of open questions in the implementation :
1. Currently the LO TOC file (blob_NNN.toc) is opened in the
append mode. This is not possible with popen for the pipe command.
From reading the code, it seems to us that this file doesn't need to
be opened in the append mode. As '_StartLOs' is called once per
archive entry in WriteDataChunksForToCEntry followed by the dumper
function and then '_EndLOs', it should be okay to change this to 'w'
mode. But this code has been there since the start so we haven't made
that change yet. In the patch, we have changed it to 'w' pipe-command
only and added the ideas for potential solutions in the comments.
2. We are also not sure yet on how to handle the environment
issues when trying to add new tests to 002_pg_dump.pl.Please let us know what you think.
Thanks & Regards,
Nitin Motiani
On Tue, Apr 8, 2025 at 7:48 AM Hannu Krosing <hannuk@google.com> wrote:
Just to bring this out separately : Does anybody have any idea why pipe commands close inside tests ?
Re: 003-pg_dump_basic_tests has a few basic validation tests for
correctmflag combinations. We need to write more automated tests in
002_pg_dump.pl but have been running into some issues with environment
setup due to which certain pipe commands result in the shell process
becoming defunct. These same commands are working fine in manual
testing. We are still looking into this.
No comment on the wider project except that it looks generally useful,
and I can see that it's not possible to use the conventional POSIX
filename "-" to represent stdout, because you need to write to
multiple files so you need to come up with *something* along the lines
you're proposing here. But I was interested in seeing if I could help
with that technical problem you mentioned above, and I don't see that
happening with the current patches. Do I understand correctly that
the problem you encountered is in some other tests that you haven't
attached yet? Could you post what you have so that others can see the
problem and perhaps have a chance of helping? I also recommend using
git format-patch when you post patches so that you have a place to
write a commit message including a note about which bits are WIP and
known not to work correctly yet.
Thanks for the feedback, Thomas.
No comment on the wider project except that it looks generally useful,
and I can see that it's not possible to use the conventional POSIX
filename "-" to represent stdout, because you need to write to
multiple files so you need to come up with *something* along the lines
you're proposing here. But I was interested in seeing if I could help
with that technical problem you mentioned above, and I don't see that
happening with the current patches. Do I understand correctly that
the problem you encountered is in some other tests that you haven't
attached yet? Could you post what you have so that others can see the
problem and perhaps have a chance of helping?
Yes, we didn't add the failed tests to the patch. We'll add those and
send new patches.
I also recommend using
git format-patch when you post patches so that you have a place to
write a commit message including a note about which bits are WIP and
known not to work correctly yet.
Will follow these recommendations when sending the next set of patches.
Regards,
Nitin Motiani
Google
Hi,
Apologies for the delay on this thread.
On Mon, Apr 28, 2025 at 1:52 PM Nitin Motiani <nitinmotiani@google.com> wrote:
Thanks for the feedback, Thomas.
Do I understand correctly that
the problem you encountered is in some other tests that you haven't
attached yet? Could you post what you have so that others can see the
problem and perhaps have a chance of helping?Yes, we didn't add the failed tests to the patch. We'll add those and
send new patches.
I'm attaching the patch files generated using git format-patch.
0001 has the pg_dump pipe support code.
0002 has the pg_restore pipe support.
0003 has a few basic validation tests for correct flag combinations.
0004 has the proposed documentation changes.
The above 4 are the same as before.
The 0005 patch is the new WIP patch file. This includes the tests
which we have been trying to add but which are failing (although the
same commands run fine manually).
The tests in this patch are added to src/bin/pg_dump/t/002_pg_dump.pl.
The original attempt was to have a test case with dump and restore
commands using the new flag and run it in multiple scenarios. But
since that was failing, for the ease of debugging I added a few
standalone tests which just run a pg_dump with the pipe-command flag.
In these tests, if the pipe-command is a simple command like 'cat' or
'gzip', the test passes. But if the pipe-command itself uses a pipe
(either to a file or another command), the test fails.
In the following test
['pg_dump', '-Fd', '-B', 'postgres', "--pipe-command=\"cat > $tempdir/%f\"",],]
I get the below error.
# 'sh: line 1: cat >
/usr/local/google/home/nitinmotiani/postgresql/src/bin/pg_dump/tmp_check/tmp_test_XpFO/toc.dat:
No such file or directory
I can see that the temp directory tmp_test_XpFO exists. Even when I
changed the test to use an absolute path to an existing directory, I
got the same error. When I do manual testing with the same
pipe-command, it works fine. That is why we think there is some issue
with our environment setup for the tap test where it is not able to
parse the command.
I also ran the following loop (started just before starting the test
run) to print the output of ps commands around 'cat >' to see what
happens.
for i in $(seq 1 10000); do ps --forest -ef | grep "cat >" -A 5 >>
~/ps_output.txt; done
The printed results showed that the child process with the pipe
command became defunct.
nitinmo+ 3180211 3180160 5 17:05 pts/1 00:00:00 | |
\_ /usr/local/google/home/nitinmotiani/postgresql/tmp_install/usr/local/pgsql/bin/pg_dump
-Fd -B p ostgres --pipe-command="cat >
/usr/local/google/home/nitinmotiani/postgresql/src/bin/pg_dump/definite_dumpdir/%f"
nitinmo+ 3180215 3180211 0 17:05 pts/1 00:00:00 | |
\_ [sh] <defunct>
We are not sure how to handle this issue. Please let us know your thoughts.
Thanks & Regards,
Nitin Motiani
Google
Attachments:
v5-0003-Add-basic-tests-for-pipe-command.patchapplication/octet-stream; name=v5-0003-Add-basic-tests-for-pipe-command.patchDownload
From 87dec759a665729971e162e17600207b3e2f5a22 Mon Sep 17 00:00:00 2001
From: Nitin Motiani <nitinmotiani@google.com>
Date: Sat, 15 Feb 2025 04:29:17 +0000
Subject: [PATCH v5 3/5] Add basic tests for pipe-command
* This currently only adds a few basic tests for pg_dump with --pipe-command.
* These tests include the invalid usages of --pipe-command with other flags.
* We are still working on adding other tests in pg_dump.pl. But
we ran into some issues which might be related to setup.
---
src/bin/pg_dump/t/001_basic.pl | 36 ++++++++++++++++++++++++++++++++++
1 file changed, 36 insertions(+)
diff --git a/src/bin/pg_dump/t/001_basic.pl b/src/bin/pg_dump/t/001_basic.pl
index 84ca25e17d6..3badd335f8c 100644
--- a/src/bin/pg_dump/t/001_basic.pl
+++ b/src/bin/pg_dump/t/001_basic.pl
@@ -74,6 +74,42 @@ command_fails_like(
'pg_dump: options --statistics-only and --no-statistics cannot be used together'
);
+command_fails_like(
+ [ 'pg_dump', '-Fd', '--pipe-command="cat"', '-f', 'testdir', 'test'],
+ qr/\Qpg_dump: hint: Only one of [--file, --pipe-command] allowed\E/,
+ 'pg_dump: hint: Only one of [--file, --pipe-command] allowed'
+);
+
+command_fails_like(
+ [ 'pg_dump', '-Fd', '--pipe-command="cat"', '-Z', 'gzip', 'test'],
+ qr/\Qpg_dump: hint: Option --pipe-command is not supported with any compression type\E/,
+ 'pg_dump: hint: Option --pipe-command is not supported with any compression type'
+);
+
+command_fails_like(
+ [ 'pg_dump', '-Fd', '--pipe-command="cat"', '--compress=lz4', 'test'],
+ qr/\Qpg_dump: hint: Option --pipe-command is not supported with any compression type\E/,
+ 'pg_dump: hint: Option --pipe-command is not supported with any compression type'
+);
+
+command_fails_like(
+ [ 'pg_dump', '-Fd', '--pipe-command="cat"', '-Z', '1', 'test'],
+ qr/\Qpg_dump: hint: Option --pipe-command is not supported with any compression type\E/,
+ 'pg_dump: hint: Option --pipe-command is not supported with any compression type'
+);
+
+command_fails_like(
+ [ 'pg_dump', '-Fc', '--pipe-command="cat"', 'test'],
+ qr/\Qpg_dump: hint: Option --pipe-command is only supported with directory format.\E/,
+ 'pg_dump: hint: Option --pipe-command is only supported with directory format.'
+);
+
+command_fails_like(
+ [ 'pg_dump', '--format=tar', '--pipe-command="cat"', 'test'],
+ qr/\Qpg_dump: hint: Option --pipe-command is only supported with directory format.\E/,
+ 'pg_dump: hint: Option --pipe-command is only supported with directory format.'
+);
+
command_fails_like(
[ 'pg_dump', '-j2', '--include-foreign-data=xxx' ],
qr/\Qpg_dump: error: option --include-foreign-data is not supported with parallel backup\E/,
--
2.49.0.1266.g31b7d2e469-goog
v5-0002-Add-pipe-command-support-in-pg_restore.patchapplication/octet-stream; name=v5-0002-Add-pipe-command-support-in-pg_restore.patchDownload
From c543f4b20d79cce3bdc77f328a1fb0560c2cdfdb Mon Sep 17 00:00:00 2001
From: Nitin Motiani <nitinmotiani@google.com>
Date: Sat, 15 Feb 2025 08:05:25 +0000
Subject: [PATCH v5 2/5] Add pipe-command support in pg_restore
* This is same as the pg_dump change. We add support
for --pipe-command in directory archive format. This can be used
to read from multiple streams and do pre-processing (decompression
with a custom algorithm, filtering etc) before restore.
Currently that is not possible because the pg_dump output of
directory format can't just be piped.
* Like pg_dump, here also either filename or --pipe-command can be
set. If neither are set, the standard input is used as before.
* This is only supported with compression none and archive format
directory.
* We reuse the inputFileSpec field for the pipe-command. And add
a bool to specify if it is a pipe.
* The changes made for pg_dump to handle the pipe case with popen
and pclose also work here.
* The logic of %f format specifier to read from the pg_dump output
is the same too. Most of the code from the pg_dump commit works.
We add similar logic to the function to read large objects.
* The --pipe command works -l and -L option.
---
src/bin/pg_dump/compress_io.c | 34 +++++++++++++----------
src/bin/pg_dump/pg_backup_directory.c | 13 ++++++++-
src/bin/pg_dump/pg_restore.c | 39 ++++++++++++++++++++++-----
3 files changed, 64 insertions(+), 22 deletions(-)
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index b3e1a133af8..7b151265165 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -259,26 +259,32 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode,
Assert(strcmp(mode, PG_BINARY_R) == 0);
fname = pg_strdup(path);
-
- if (hasSuffix(fname, ".gz"))
- compression_spec.algorithm = PG_COMPRESSION_GZIP;
- else if (hasSuffix(fname, ".lz4"))
- compression_spec.algorithm = PG_COMPRESSION_LZ4;
- else if (hasSuffix(fname, ".zst"))
- compression_spec.algorithm = PG_COMPRESSION_ZSTD;
- else
+ /*
+ If the path is a pipe command, the compression algorithm
+ is none.
+ */
+ if (!path_is_pipe_command)
{
- if (stat(path, &st) == 0)
- compression_spec.algorithm = PG_COMPRESSION_NONE;
- else if (check_compressed_file(path, &fname, "gz"))
+ if (hasSuffix(fname, ".gz"))
compression_spec.algorithm = PG_COMPRESSION_GZIP;
- else if (check_compressed_file(path, &fname, "lz4"))
+ else if (hasSuffix(fname, ".lz4"))
compression_spec.algorithm = PG_COMPRESSION_LZ4;
- else if (check_compressed_file(path, &fname, "zst"))
+ else if (hasSuffix(fname, ".zst"))
compression_spec.algorithm = PG_COMPRESSION_ZSTD;
+ else
+ {
+ if (stat(path, &st) == 0)
+ compression_spec.algorithm = PG_COMPRESSION_NONE;
+ else if (check_compressed_file(path, &fname, "gz"))
+ compression_spec.algorithm = PG_COMPRESSION_GZIP;
+ else if (check_compressed_file(path, &fname, "lz4"))
+ compression_spec.algorithm = PG_COMPRESSION_LZ4;
+ else if (check_compressed_file(path, &fname, "zst"))
+ compression_spec.algorithm = PG_COMPRESSION_ZSTD;
+ }
}
- CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command); /* TODO: try to make it work also with pipes */
+ CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command);
if (!CFH->open_func(fname, -1, mode, CFH))
{
free_keep_errno(CFH);
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index db4b627a0b8..35b7d5ab6e7 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -440,7 +440,18 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
tocfname, line);
StartRestoreLO(AH, oid, AH->public.ropt->dropSchema);
- snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname);
+ /* TODO: This logic for naming blob files is common betwen _LoadLOs an _StartLO.
+ * Refactor in a helper function.
+ */
+ if (AH->fSpecIsPipe)
+ {
+ pipe = replace_percent_placeholders(ctx->directory, "pipe-command", "f", lofname);
+ strcpy(path, pipe);
+ }
+ else
+ {
+ snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname);
+ }
_PrintFileData(AH, path);
EndRestoreLO(AH, oid);
}
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index 46d90aec946..f552eabe0d8 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -59,7 +59,7 @@ static void usage(const char *progname);
static void read_restore_filters(const char *filename, RestoreOptions *opts);
static bool file_exists_in_directory(const char *dir, const char *filename);
static int restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
- int numWorkers, bool append_data, int num);
+ int numWorkers, bool append_data, int num, bool filespec_is_pipe);
static int read_one_statement(StringInfo inBuf, FILE *pfile);
static int restore_all_databases(PGconn *conn, const char *dumpdirpath,
SimpleStringList db_exclude_patterns, RestoreOptions *opts, int numWorkers);
@@ -94,6 +94,7 @@ main(int argc, char **argv)
int n_errors = 0;
bool globals_only = false;
SimpleStringList db_exclude_patterns = {NULL, NULL};
+ bool filespec_is_pipe = false;
static int disable_triggers = 0;
static int enable_row_security = 0;
static int if_exists = 0;
@@ -175,6 +176,7 @@ main(int argc, char **argv)
{"statistics-only", no_argument, &statistics_only, 1},
{"filter", required_argument, NULL, 4},
{"exclude-database", required_argument, NULL, 6},
+ {"pipe-command", required_argument, NULL, 7},
{NULL, 0, NULL, 0}
};
@@ -356,6 +358,11 @@ main(int argc, char **argv)
simple_string_list_append(&db_exclude_patterns, optarg);
break;
+ case 7: /* pipe-command */
+ inputFileSpec = pg_strdup(optarg);
+ filespec_is_pipe = true;
+ break;
+
default:
/* getopt_long already emitted a complaint */
pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -363,11 +370,23 @@ main(int argc, char **argv)
}
}
- /* Get file name from command line */
+ /* Get file name from command line. Note that filename argument and pipe-command can't both be set. */
if (optind < argc)
+ {
+ if (filespec_is_pipe)
+ {
+ pg_log_error_hint("Only one of [filespec, --pipe-command] allowed");
+ exit_nicely(1);
+ }
inputFileSpec = argv[optind++];
- else
+ }
+ /* Even if the file argument is not provided, if the pipe-command is specified, we need to use that
+ * as the file arg and not fallback to stdio.
+ */
+ else if (!filespec_is_pipe)
+ {
inputFileSpec = NULL;
+ }
/* Complain if any arguments remain */
if (optind < argc)
@@ -577,7 +596,7 @@ main(int argc, char **argv)
if (globals_only)
pg_fatal("option -g/--globals-only can be used only when restoring an archive created by pg_dumpall");
- n_errors = restore_one_database(inputFileSpec, opts, numWorkers, false, 0);
+ n_errors = restore_one_database(inputFileSpec, opts, numWorkers, false, 0, filespec_is_pipe);
}
/* Done, print a summary of ignored errors during restore. */
@@ -599,12 +618,18 @@ main(int argc, char **argv)
*/
static int
restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
- int numWorkers, bool append_data, int num)
+ int numWorkers, bool append_data, int num, bool filespec_is_pipe)
{
Archive *AH;
int n_errors;
- AH = OpenArchive(inputFileSpec, opts->format, false); /*TODO: support pipes in restore */
+ if (filespec_is_pipe && opts->format != archDirectory)
+ {
+ pg_log_error_hint("Option --pipe-command is only supported with directory format.");
+ exit_nicely(1);
+ }
+
+ AH = OpenArchive(inputFileSpec, opts->format, filespec_is_pipe);
SetArchiveOptions(AH, NULL, opts);
@@ -1267,7 +1292,7 @@ restore_all_databases(PGconn *conn, const char *dumpdirpath,
opts->dumpStatistics = dumpStatistics;
/* Restore the single database. */
- n_errors = restore_one_database(subdirpath, opts, numWorkers, true, count);
+ n_errors = restore_one_database(subdirpath, opts, numWorkers, true, count, false);
/* Print a summary of ignored errors during single database restore. */
if (n_errors)
--
2.49.0.1266.g31b7d2e469-goog
v5-0005-WIP-This-is-WIP-patch-for-adding-tests-to-pg_dump.patchapplication/octet-stream; name=v5-0005-WIP-This-is-WIP-patch-for-adding-tests-to-pg_dump.patchDownload
From ec26ad40b5bb023727045ed54c16b046c7da065a Mon Sep 17 00:00:00 2001
From: Nitin Motiani <nitinmotiani@google.com>
Date: Thu, 22 May 2025 10:20:15 +0000
Subject: [PATCH v5 5/5] [WIP] This is WIP patch for adding tests to pg_dump.pl
* The first test in the patch was the test we were originally
trying to add but it kept failing.
* The 3 tests at the bottom were part of the debugging process and
the logs showed that those tests would fail whenever a pipe
was used inside the pipe command i.e. 'cat | gzip' or
'cat > <file>'. But simple 'cat' or 'gzip' without pipe worked.
* The tests need to be cleaned up and rerun to get the exact error messages.
---
src/bin/pg_dump/t/002_pg_dump.pl | 38 ++++++++++++++++++++++++++++++++
1 file changed, 38 insertions(+)
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 386e21e0c59..22222b2f2eb 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -476,6 +476,15 @@ my %pgdump_runs = (
],
},
+ # This test kept failing.
+ defaults_dir_format_pipe => {
+ test_key => 'defaults',
+ dump_cmd => [
+ 'pg_dump', '-Fd', 'postgres',
+ "--pipe-command=\"cat > $tempdir/defaults_dir_format/%f\"",
+ ],
+ },
+
# Do not use --no-sync to give test coverage for data sync.
defaults_parallel => {
test_key => 'defaults',
@@ -5103,6 +5112,35 @@ foreach my $db (sort keys %create_sql)
$node->safe_psql($db, $create_sql{$db});
}
+# These four tests were added temporarily to more easily look through the output.
+# They failed as soon as we used a pipe of any kind whether to file or just to another
+# command. Those commands worked fine manually. So the second & fourth test where
+# we only have cat and gzip without any pipe, they work fine. But the rest don't.
+$node->command_fails_like(
+ [
+ 'pg_dump', '-Fd', '-B', 'postgres',
+ "--pipe-command=\"cat > $tempdir/%f\"",],
+ qr/pg_dump: error: Failure in cat to file/,
+ 'pg_dump: Failure in cat to file using pipe-command');
+
+
+$node->command_ok(
+ [
+ 'pg_dump', '-Fd', '-B', 'postgres',
+ "--pipe-command=\"cat\"",], "Testing just with cat");
+
+$node->command_ok(
+ [
+ 'pg_dump', '-Fd', '-B', 'postgres',
+ "--pipe-command=\"cat | gzip\"",], "Testing with cat piped to gzip");
+
+
+$node->command_ok(
+ [
+ 'pg_dump', '-Fd', '-B', '-x', 'postgres',
+ "--pipe-command=\"gzip\"",], "Testing just with gzip");
+
+
#########################################
# Test connecting to a non-existent database
--
2.49.0.1266.g31b7d2e469-goog
v5-0001-Add-pipe-command-support-for-directory-mode-of-pg.patchapplication/octet-stream; name=v5-0001-Add-pipe-command-support-for-directory-mode-of-pg.patchDownload
From a82dd3abe0f1265125f226621afe3b4565a5f2c7 Mon Sep 17 00:00:00 2001
From: Nitin Motiani <nitinmotiani@google.com>
Date: Tue, 11 Feb 2025 08:31:02 +0000
Subject: [PATCH v5 1/5] Add pipe-command support for directory mode of pg_dump
* We add a new flag --pipe-command which can be used in directory
mode. This allows us to support multiple streams and we can
do post processing like compression, filtering etc. This is
currently not possible with directory-archive format.
* Currently this flag is only supported with compression none
and archive format directory.
* This flag can't be used with the flag --file. Only one of the
two flags can be used at a time.
* We reuse the filename field for the --pipe-command also. And add a
bool to specify that the field will be used as a pipe command.
* Most of the code remains as it is. The core change is that
in case of --pipe-command, instead of fopen we do popen.
* The user would need a way to store the post-processing output
in files. For that we support the same format as the directory
mode currently does with the flag --file. We allow the user
to add a format specifier %f to the --pipe-command. And for each
stream, the format specifier is replaced with the corresponding
file name. This file name is the same as it would have been if
the flag --file had been used.
* To enable the above, there are a few places in the code where
we change the file name creation logic. Currently the file name
is appended to the directory name which is provided with --file flag.
In case of --pipe-command, we instead replace %f with the file name.
This change is made for the common use case and separately for
blob files.
* There is an open question on what mode to use in case of large objects
TOC file. Currently the code uses "ab" but that won't work for popen.
We have proposed a few options in the comments regarding this. For the
time being we are using mode PG_BINARY_W for the pipe use case.
---
src/bin/pg_dump/compress_gzip.c | 9 ++-
src/bin/pg_dump/compress_gzip.h | 3 +-
src/bin/pg_dump/compress_io.c | 26 ++++---
src/bin/pg_dump/compress_io.h | 11 ++-
src/bin/pg_dump/compress_lz4.c | 11 ++-
src/bin/pg_dump/compress_lz4.h | 3 +-
src/bin/pg_dump/compress_none.c | 28 ++++++--
src/bin/pg_dump/compress_none.h | 3 +-
src/bin/pg_dump/compress_zstd.c | 10 ++-
src/bin/pg_dump/compress_zstd.h | 3 +-
src/bin/pg_dump/pg_backup.h | 5 +-
src/bin/pg_dump/pg_backup_archiver.c | 22 +++---
src/bin/pg_dump/pg_backup_archiver.h | 1 +
src/bin/pg_dump/pg_backup_directory.c | 99 ++++++++++++++++++++++-----
src/bin/pg_dump/pg_dump.c | 37 +++++++++-
src/bin/pg_dump/pg_restore.c | 2 +-
16 files changed, 216 insertions(+), 57 deletions(-)
diff --git a/src/bin/pg_dump/compress_gzip.c b/src/bin/pg_dump/compress_gzip.c
index 5a30ebf9bf5..312885d7136 100644
--- a/src/bin/pg_dump/compress_gzip.c
+++ b/src/bin/pg_dump/compress_gzip.c
@@ -389,8 +389,12 @@ Gzip_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
void
InitCompressFileHandleGzip(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec)
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
+ if(path_is_pipe_command)
+ pg_fatal("cPipe command not supported for Gzip");
+
CFH->open_func = Gzip_open;
CFH->open_write_func = Gzip_open_write;
CFH->read_func = Gzip_read;
@@ -415,7 +419,8 @@ InitCompressorGzip(CompressorState *cs,
void
InitCompressFileHandleGzip(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec)
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
pg_fatal("this build does not support compression with %s", "gzip");
}
diff --git a/src/bin/pg_dump/compress_gzip.h b/src/bin/pg_dump/compress_gzip.h
index 3bef0d5b1b8..ccf2479cf3b 100644
--- a/src/bin/pg_dump/compress_gzip.h
+++ b/src/bin/pg_dump/compress_gzip.h
@@ -19,6 +19,7 @@
extern void InitCompressorGzip(CompressorState *cs,
const pg_compress_specification compression_spec);
extern void InitCompressFileHandleGzip(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec);
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command);
#endif /* _COMPRESS_GZIP_H_ */
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 8c3d9c911c4..b3e1a133af8 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -191,20 +191,29 @@ free_keep_errno(void *p)
* Initialize a compress file handle for the specified compression algorithm.
*/
CompressFileHandle *
-InitCompressFileHandle(const pg_compress_specification compression_spec)
+InitCompressFileHandle(const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
CompressFileHandle *CFH;
+
CFH = pg_malloc0(sizeof(CompressFileHandle));
- if (compression_spec.algorithm == PG_COMPRESSION_NONE)
- InitCompressFileHandleNone(CFH, compression_spec);
+ /* TODO: Currently always set to non-compressed when path_is_pipe_command
+ * assuming that external compressor as part of pipe is nore efficient
+ * should review after POC
+ */
+ if (path_is_pipe_command)
+ InitCompressFileHandleNone(CFH, compression_spec, path_is_pipe_command);
+
+ else if (compression_spec.algorithm == PG_COMPRESSION_NONE)
+ InitCompressFileHandleNone(CFH, compression_spec, path_is_pipe_command);
else if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
- InitCompressFileHandleGzip(CFH, compression_spec);
+ InitCompressFileHandleGzip(CFH, compression_spec, path_is_pipe_command);
else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
- InitCompressFileHandleLZ4(CFH, compression_spec);
+ InitCompressFileHandleLZ4(CFH, compression_spec, path_is_pipe_command);
else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
- InitCompressFileHandleZstd(CFH, compression_spec);
+ InitCompressFileHandleZstd(CFH, compression_spec, path_is_pipe_command);
return CFH;
}
@@ -237,7 +246,8 @@ check_compressed_file(const char *path, char **fname, char *ext)
* On failure, return NULL with an error code in errno.
*/
CompressFileHandle *
-InitDiscoverCompressFileHandle(const char *path, const char *mode)
+InitDiscoverCompressFileHandle(const char *path, const char *mode,
+ bool path_is_pipe_command)
{
CompressFileHandle *CFH = NULL;
struct stat st;
@@ -268,7 +278,7 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
compression_spec.algorithm = PG_COMPRESSION_ZSTD;
}
- CFH = InitCompressFileHandle(compression_spec);
+ CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command); /* TODO: try to make it work also with pipes */
if (!CFH->open_func(fname, -1, mode, CFH))
{
free_keep_errno(CFH);
diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index db9b38744c8..affc287c66e 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -185,6 +185,11 @@ struct CompressFileHandle
*/
pg_compress_specification compression_spec;
+ /*
+ * Compression specification for this file handle.
+ */
+ bool path_is_pipe_command;
+
/*
* Private data to be used by the compressor.
*/
@@ -194,7 +199,8 @@ struct CompressFileHandle
/*
* Initialize a compress file handle with the requested compression.
*/
-extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specification compression_spec);
+extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specification compression_spec,
+ bool path_is_pipe_command);
/*
* Initialize a compress file stream. Infer the compression algorithm
@@ -202,6 +208,7 @@ extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specificatio
* suffixes in 'path'.
*/
extern CompressFileHandle *InitDiscoverCompressFileHandle(const char *path,
- const char *mode);
+ const char *mode,
+ bool path_is_pipe_command);
extern bool EndCompressFileHandle(CompressFileHandle *CFH);
#endif
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index e99f0cad71f..7e7713936bf 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -751,10 +751,14 @@ LZ4Stream_open_write(const char *path, const char *mode, CompressFileHandle *CFH
*/
void
InitCompressFileHandleLZ4(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec)
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
LZ4State *state;
+ if(path_is_pipe_command)
+ pg_fatal("cPipe command not supported for LZ4");
+
CFH->open_func = LZ4Stream_open;
CFH->open_write_func = LZ4Stream_open_write;
CFH->read_func = LZ4Stream_read;
@@ -770,6 +774,8 @@ InitCompressFileHandleLZ4(CompressFileHandle *CFH,
if (CFH->compression_spec.level >= 0)
state->prefs.compressionLevel = CFH->compression_spec.level;
+ CFH->path_is_pipe_command = path_is_pipe_command;
+
CFH->private_data = state;
}
#else /* USE_LZ4 */
@@ -782,7 +788,8 @@ InitCompressorLZ4(CompressorState *cs,
void
InitCompressFileHandleLZ4(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec)
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
pg_fatal("this build does not support compression with %s", "LZ4");
}
diff --git a/src/bin/pg_dump/compress_lz4.h b/src/bin/pg_dump/compress_lz4.h
index 7f7216cc648..d52e6071519 100644
--- a/src/bin/pg_dump/compress_lz4.h
+++ b/src/bin/pg_dump/compress_lz4.h
@@ -19,6 +19,7 @@
extern void InitCompressorLZ4(CompressorState *cs,
const pg_compress_specification compression_spec);
extern void InitCompressFileHandleLZ4(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec);
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command);
#endif /* _COMPRESS_LZ4_H_ */
diff --git a/src/bin/pg_dump/compress_none.c b/src/bin/pg_dump/compress_none.c
index 3fc89c99854..b7392824b68 100644
--- a/src/bin/pg_dump/compress_none.c
+++ b/src/bin/pg_dump/compress_none.c
@@ -153,7 +153,12 @@ close_none(CompressFileHandle *CFH)
CFH->private_data = NULL;
if (fp)
- ret = fclose(fp);
+ {
+ if(CFH->path_is_pipe_command)
+ ret = pclose(fp);
+ else
+ ret = fclose(fp);
+ }
return ret == 0;
}
@@ -172,7 +177,12 @@ open_none(const char *path, int fd, const char *mode, CompressFileHandle *CFH)
if (fd >= 0)
CFH->private_data = fdopen(dup(fd), mode);
else
- CFH->private_data = fopen(path, mode);
+ {
+ if (CFH->path_is_pipe_command)
+ CFH->private_data = popen(path, mode);
+ else
+ CFH->private_data = fopen(path, mode);
+ }
if (CFH->private_data == NULL)
return false;
@@ -185,7 +195,14 @@ open_write_none(const char *path, const char *mode, CompressFileHandle *CFH)
{
Assert(CFH->private_data == NULL);
- CFH->private_data = fopen(path, mode);
+ pg_log_debug("Opening %s, pipe is %s",
+ path, CFH->path_is_pipe_command ? "true" : "false");
+
+ if (CFH->path_is_pipe_command)
+ CFH->private_data = popen(path, mode);
+ else
+ CFH->private_data = fopen(path, mode);
+
if (CFH->private_data == NULL)
return false;
@@ -198,7 +215,8 @@ open_write_none(const char *path, const char *mode, CompressFileHandle *CFH)
void
InitCompressFileHandleNone(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec)
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
CFH->open_func = open_none;
CFH->open_write_func = open_write_none;
@@ -210,5 +228,7 @@ InitCompressFileHandleNone(CompressFileHandle *CFH,
CFH->eof_func = eof_none;
CFH->get_error_func = get_error_none;
+ CFH->path_is_pipe_command = path_is_pipe_command;
+
CFH->private_data = NULL;
}
diff --git a/src/bin/pg_dump/compress_none.h b/src/bin/pg_dump/compress_none.h
index f927f196c36..1399c8bde3b 100644
--- a/src/bin/pg_dump/compress_none.h
+++ b/src/bin/pg_dump/compress_none.h
@@ -19,6 +19,7 @@
extern void InitCompressorNone(CompressorState *cs,
const pg_compress_specification compression_spec);
extern void InitCompressFileHandleNone(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec);
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command);
#endif /* _COMPRESS_NONE_H_ */
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index cb595b10c2d..a7a37643acd 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -26,7 +26,8 @@ InitCompressorZstd(CompressorState *cs, const pg_compress_specification compress
}
void
-InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
+InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
pg_fatal("this build does not support compression with %s", "ZSTD");
}
@@ -524,8 +525,12 @@ Zstd_get_error(CompressFileHandle *CFH)
void
InitCompressFileHandleZstd(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec)
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command)
{
+ if(path_is_pipe_command)
+ pg_fatal("cPipe command not supported for Zstd");
+
CFH->open_func = Zstd_open;
CFH->open_write_func = Zstd_open_write;
CFH->read_func = Zstd_read;
@@ -537,6 +542,7 @@ InitCompressFileHandleZstd(CompressFileHandle *CFH,
CFH->get_error_func = Zstd_get_error;
CFH->compression_spec = compression_spec;
+ CFH->path_is_pipe_command = path_is_pipe_command;
CFH->private_data = NULL;
}
diff --git a/src/bin/pg_dump/compress_zstd.h b/src/bin/pg_dump/compress_zstd.h
index af21db48ded..1644b6d6eba 100644
--- a/src/bin/pg_dump/compress_zstd.h
+++ b/src/bin/pg_dump/compress_zstd.h
@@ -20,6 +20,7 @@
extern void InitCompressorZstd(CompressorState *cs,
const pg_compress_specification compression_spec);
extern void InitCompressFileHandleZstd(CompressFileHandle *CFH,
- const pg_compress_specification compression_spec);
+ const pg_compress_specification compression_spec,
+ bool path_is_pipe_command);
#endif /* COMPRESS_ZSTD_H */
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index af0007fb6d2..18be674ab7f 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -311,14 +311,15 @@ extern void ProcessArchiveRestoreOptions(Archive *AHX);
extern void RestoreArchive(Archive *AHX, bool append_data);
/* Open an existing archive */
-extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
+extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt, bool FileSpecIsPipe);
/* Create a new archive */
extern Archive *CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
const pg_compress_specification compression_spec,
bool dosync, ArchiveMode mode,
SetupWorkerPtrType setupDumpWorker,
- DataDirSyncMethod sync_method);
+ DataDirSyncMethod sync_method,
+ bool FileSpecIsPipe); /* ^^ what are the casing rules here ?? */
/* The --list option */
extern void PrintTOCSummary(Archive *AHX);
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 197c1295d93..af34af6b380 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -54,7 +54,7 @@ static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
const pg_compress_specification compression_spec,
bool dosync, ArchiveMode mode,
SetupWorkerPtrType setupWorkerPtr,
- DataDirSyncMethod sync_method);
+ DataDirSyncMethod sync_method, bool FileSpecIsPipe);
static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx);
static char *sanitize_line(const char *str, bool want_hyphen);
@@ -230,11 +230,12 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
const pg_compress_specification compression_spec,
bool dosync, ArchiveMode mode,
SetupWorkerPtrType setupDumpWorker,
- DataDirSyncMethod sync_method)
+ DataDirSyncMethod sync_method,
+ bool FileSpecIsPipe)
{
ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
- dosync, mode, setupDumpWorker, sync_method);
+ dosync, mode, setupDumpWorker, sync_method, FileSpecIsPipe);
return (Archive *) AH;
}
@@ -242,7 +243,7 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
/* Open an existing archive */
/* Public */
Archive *
-OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
+OpenArchive(const char *FileSpec, const ArchiveFormat fmt, bool FileSpecIsPipe)
{
ArchiveHandle *AH;
pg_compress_specification compression_spec = {0};
@@ -250,7 +251,7 @@ OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
compression_spec.algorithm = PG_COMPRESSION_NONE;
AH = _allocAH(FileSpec, fmt, compression_spec, true,
archModeRead, setupRestoreWorker,
- DATA_DIR_SYNC_METHOD_FSYNC);
+ DATA_DIR_SYNC_METHOD_FSYNC, FileSpecIsPipe);
return (Archive *) AH;
}
@@ -1705,7 +1706,7 @@ SetOutput(ArchiveHandle *AH, const char *filename,
else
mode = PG_BINARY_W;
- CFH = InitCompressFileHandle(compression_spec);
+ CFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
if (!CFH->open_func(filename, fn, mode, CFH))
{
@@ -2362,7 +2363,8 @@ static ArchiveHandle *
_allocAH(const char *FileSpec, const ArchiveFormat fmt,
const pg_compress_specification compression_spec,
bool dosync, ArchiveMode mode,
- SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
+ SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method,
+ bool FileSpecIsPipe)
{
ArchiveHandle *AH;
CompressFileHandle *CFH;
@@ -2403,6 +2405,8 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
else
AH->fSpec = NULL;
+ AH->fSpecIsPipe = FileSpecIsPipe;
+
AH->currUser = NULL; /* unknown */
AH->currSchema = NULL; /* ditto */
AH->currTablespace = NULL; /* ditto */
@@ -2415,14 +2419,14 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
AH->mode = mode;
AH->compression_spec = compression_spec;
- AH->dosync = dosync;
+ AH->dosync = FileSpecIsPipe ? false : dosync;
AH->sync_method = sync_method;
memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
/* Open stdout with no compression for AH output handle */
out_compress_spec.algorithm = PG_COMPRESSION_NONE;
- CFH = InitCompressFileHandle(out_compress_spec);
+ CFH = InitCompressFileHandle(out_compress_spec, AH->fSpecIsPipe);
if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
pg_fatal("could not open stdout for appending: %m");
AH->OF = CFH;
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 365073b3eae..d7fa3086184 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -301,6 +301,7 @@ struct _archiveHandle
int loCount; /* # of LOs restored */
char *fSpec; /* Archive File Spec */
+ bool fSpecIsPipe; /* fSpec is a pipe command template requiring replacing %f with file name */
FILE *FH; /* General purpose file handle */
void *OF; /* Output file */
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 21b00792a8a..db4b627a0b8 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -39,7 +39,8 @@
#include <dirent.h>
#include <sys/stat.h>
-#include "common/file_utils.h"
+//#include "common/file_utils.h"
+#include "common/percentrepl.h"
#include "compress_io.h"
#include "dumputils.h"
#include "parallel.h"
@@ -157,8 +158,11 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
if (AH->mode == archModeWrite)
{
- /* we accept an empty existing directory */
- create_or_open_dir(ctx->directory);
+ if(!AH->fSpecIsPipe) /* no checks for pipe */
+ {
+ /* we accept an empty existing directory */
+ create_or_open_dir(ctx->directory);
+ }
}
else
{ /* Read Mode */
@@ -167,7 +171,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
setFilePath(AH, fname, "toc.dat");
- tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R);
+ tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R, AH->fSpecIsPipe);
if (tocFH == NULL)
pg_fatal("could not open input file \"%s\": %m", fname);
@@ -295,7 +299,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
setFilePath(AH, fname, tctx->filename);
- ctx->dataFH = InitCompressFileHandle(AH->compression_spec);
+ ctx->dataFH = InitCompressFileHandle(AH->compression_spec, AH->fSpecIsPipe);
if (!ctx->dataFH->open_write_func(fname, PG_BINARY_W, ctx->dataFH))
pg_fatal("could not open output file \"%s\": %m", fname);
@@ -359,7 +363,7 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
if (!filename)
return;
- CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R);
+ CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R, AH->fSpecIsPipe);
if (!CFH)
pg_fatal("could not open input file \"%s\": %m", filename);
@@ -417,7 +421,7 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
*/
setFilePath(AH, tocfname, tctx->filename);
- CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R);
+ CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R, AH->fSpecIsPipe);
if (ctx->LOsTocFH == NULL)
pg_fatal("could not open large object TOC file \"%s\" for input: %m",
@@ -428,6 +432,7 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
{
char lofname[MAXPGPATH + 1];
char path[MAXPGPATH];
+ char* pipe;
/* Can't overflow because line and lofname are the same length */
if (sscanf(line, "%u %" CppAsString2(MAXPGPATH) "s\n", &oid, lofname) != 2)
@@ -563,7 +568,7 @@ _CloseArchive(ArchiveHandle *AH)
/* The TOC is always created uncompressed */
compression_spec.algorithm = PG_COMPRESSION_NONE;
- tocFH = InitCompressFileHandle(compression_spec);
+ tocFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
if (!tocFH->open_write_func(fname, PG_BINARY_W, tocFH))
pg_fatal("could not open output file \"%s\": %m", fname);
ctx->dataFH = tocFH;
@@ -624,13 +629,42 @@ _StartLOs(ArchiveHandle *AH, TocEntry *te)
lclTocEntry *tctx = (lclTocEntry *) te->formatData;
pg_compress_specification compression_spec = {0};
char fname[MAXPGPATH];
+ const char *mode;
setFilePath(AH, fname, tctx->filename);
/* The LO TOC file is never compressed */
compression_spec.algorithm = PG_COMPRESSION_NONE;
- ctx->LOsTocFH = InitCompressFileHandle(compression_spec);
- if (!ctx->LOsTocFH->open_write_func(fname, "ab", ctx->LOsTocFH))
+ ctx->LOsTocFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
+ /* TODO: Finalize the correct approach for the mode.
+ * The mode for the LOs TOC file has been "ab" from the start. That
+ * is something we can't do for pipe-command as popen only supports
+ * read and write. Just changing it to 'w' was not expected to be enough
+ * and one possible solution considered is to open it in 'w' mode and
+ * keep it open till all the LOs in the dump group are done.
+ *
+ * The analysis of the current code shows that there is one ToCEntry
+ * per blob group. And it is written by @WriteDataChunksForToCEntry.
+ * This function calls _StartLOs once before the dumper function and
+ * and _EndLOs once after the dumper. And the dumper dumps all the
+ * LOs in the group. So a blob_NNN.toc is only opened once and closed
+ * after all the entries are written. Therefore the mode can be made 'w'
+ * for all the cases. We tested changing the mode to PG_BINARY_W and
+ * the tests passed. But in case there are some missing scenarios, we
+ * have not made that change here. Instead for now only doing
+ * it for the pipe command.
+ * In short there are 3 solutions :
+ * 1. Change the mode for everything (preferred)
+ * 2. Change it only for pipe-command (done for time-being)
+ * 3. Change it for pipe-command and then cache those handles
+ * and close them in the end (based on the code review, we might
+ * pick this).
+ */
+ if (AH->fSpecIsPipe)
+ mode = PG_BINARY_W;
+ else
+ mode = "ab";
+ if (!ctx->LOsTocFH->open_write_func(fname, mode, ctx->LOsTocFH))
pg_fatal("could not open output file \"%s\": %m", fname);
}
@@ -644,10 +678,22 @@ _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
{
lclContext *ctx = (lclContext *) AH->formatData;
char fname[MAXPGPATH];
+ char* pipe;
+ char blob_name[MAXPGPATH];
- snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
+ if(AH->fSpecIsPipe)
+ {
+ snprintf(blob_name, MAXPGPATH, "blob_%u.dat", oid);
+ pipe = replace_percent_placeholders(ctx->directory, "pipe-command", "f", blob_name);
+ strcpy(fname, pipe);
+ /* TODO:figure out how to free the allocated string when replace_percent_placeholders isused in frontend*/
+ }
+ else
+ {
+ snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
+ }
- ctx->dataFH = InitCompressFileHandle(AH->compression_spec);
+ ctx->dataFH = InitCompressFileHandle(AH->compression_spec, AH->fSpecIsPipe);
if (!ctx->dataFH->open_write_func(fname, PG_BINARY_W, ctx->dataFH))
pg_fatal("could not open output file \"%s\": %m", fname);
}
@@ -708,15 +754,26 @@ setFilePath(ArchiveHandle *AH, char *buf, const char *relativeFilename)
{
lclContext *ctx = (lclContext *) AH->formatData;
char *dname;
+ char *pipe;
dname = ctx->directory;
- if (strlen(dname) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH)
- pg_fatal("file name too long: \"%s\"", dname);
- strcpy(buf, dname);
- strcat(buf, "/");
- strcat(buf, relativeFilename);
+ if(AH->fSpecIsPipe)
+ {
+ pipe = replace_percent_placeholders(dname, "pipe-command", "f", relativeFilename);
+ strcpy(buf, pipe);
+ /* TODO:figure out how to free the allocated string when replace_percent_placeholders isused in frontend*/
+ }
+ else /* replace all ocurrences of %f in dname with relativeFilename */
+ {
+ if (strlen(dname) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH)
+ pg_fatal("file name too long: \"%s\"", dname);
+
+ strcpy(buf, dname);
+ strcat(buf, "/");
+ strcat(buf, relativeFilename);
+ }
}
/*
@@ -758,17 +815,23 @@ _PrepParallelRestore(ArchiveHandle *AH)
* only need an approximate indicator of that.
*/
setFilePath(AH, fname, tctx->filename);
+ pg_log_error("filename: %s", fname);
if (stat(fname, &st) == 0)
te->dataLength = st.st_size;
else if (AH->compression_spec.algorithm != PG_COMPRESSION_NONE)
{
+ if(AH->fSpecIsPipe)
+ pg_log_error("pipe and compressed");
if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP)
strlcat(fname, ".gz", sizeof(fname));
else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4)
strlcat(fname, ".lz4", sizeof(fname));
- else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+ else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD){
+ pg_log_error("filename: %s", fname);
strlcat(fname, ".zst", sizeof(fname));
+ pg_log_error("filename: %s", fname);
+ }
if (stat(fname, &st) == 0)
te->dataLength = st.st_size;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 37432e66efd..689b1859067 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -411,6 +411,7 @@ main(int argc, char **argv)
{
int c;
const char *filename = NULL;
+ bool filename_is_pipe = false;
const char *format = "p";
TableInfo *tblinfo;
int numTables;
@@ -530,6 +531,7 @@ main(int argc, char **argv)
{"filter", required_argument, NULL, 16},
{"exclude-extension", required_argument, NULL, 17},
{"sequence-data", no_argument, &dopt.sequence_data, 1},
+ {"pipe-command", required_argument, NULL, 25},
{NULL, 0, NULL, 0}
};
@@ -601,7 +603,13 @@ main(int argc, char **argv)
break;
case 'f':
+ if (filename != NULL)
+ {
+ pg_log_error_hint("Only one of [--file, --pipe-command] allowed");
+ exit_nicely(1);
+ }
filename = pg_strdup(optarg);
+ filename_is_pipe = false; /* it already is, setting again here just for clarity */
break;
case 'F':
@@ -798,6 +806,16 @@ main(int argc, char **argv)
with_statistics = true;
break;
+ case 25: /* pipe command */
+ if (filename != NULL)
+ {
+ pg_log_error_hint("Only one of [--file, --pipe-command] allowed");
+ exit_nicely(1);
+ }
+ filename = pg_strdup(optarg);
+ filename_is_pipe = true;
+ break;
+
default:
/* getopt_long already emitted a complaint */
pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -890,13 +908,26 @@ main(int argc, char **argv)
if (archiveFormat == archNull)
plainText = 1;
+ if (filename_is_pipe && archiveFormat != archDirectory)
+ {
+ pg_log_error_hint("Option --pipe-command is only supported with directory format.");
+ exit_nicely(1);
+ }
+
+ if (filename_is_pipe && strcmp(compression_algorithm_str, "none") != 0)
+ {
+ pg_log_error_hint("Option --pipe-command is not supported with any compression type.");
+ exit_nicely(1);
+ }
+
/*
* Custom and directory formats are compressed by default with gzip when
* available, not the others. If gzip is not available, no compression is
- * done by default.
+ * done by default. If directory format is being used with pipe-command,
+ * no compression is done.
*/
if ((archiveFormat == archCustom || archiveFormat == archDirectory) &&
- !user_compression_defined)
+ !filename_is_pipe && !user_compression_defined)
{
#ifdef HAVE_LIBZ
compression_algorithm_str = "gzip";
@@ -946,7 +977,7 @@ main(int argc, char **argv)
/* Open the output file */
fout = CreateArchive(filename, archiveFormat, compression_spec,
- dosync, archiveMode, setupDumpWorker, sync_method);
+ dosync, archiveMode, setupDumpWorker, sync_method, filename_is_pipe);
/* Make dump options accessible right away */
SetArchiveOptions(fout, &dopt, NULL);
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index f2182e91825..46d90aec946 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -604,7 +604,7 @@ restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
Archive *AH;
int n_errors;
- AH = OpenArchive(inputFileSpec, opts->format);
+ AH = OpenArchive(inputFileSpec, opts->format, false); /*TODO: support pipes in restore */
SetArchiveOptions(AH, NULL, opts);
--
2.49.0.1266.g31b7d2e469-goog
v5-0004-Add-documentation-for-pipe-command-in-pg_dump-and.patchapplication/octet-stream; name=v5-0004-Add-documentation-for-pipe-command-in-pg_dump-and.patchDownload
From 7df17cbe5e32226c72a63fb6c97b308908dcf849 Mon Sep 17 00:00:00 2001
From: Nitin Motiani <nitinmotiani@google.com>
Date: Fri, 4 Apr 2025 14:34:48 +0000
Subject: [PATCH v5 4/5] Add documentation for pipe-command in pg_dump and
pg_restore
* Add the descriptions of the new flags and constraints
regarding which mode and other flags they can't be used with.
* Explain the purpose of the flags.
* Add a few examples of the usage of the flags.
---
doc/src/sgml/ref/pg_dump.sgml | 56 ++++++++++++++++++++++++++
doc/src/sgml/ref/pg_restore.sgml | 68 +++++++++++++++++++++++++++++++-
2 files changed, 123 insertions(+), 1 deletion(-)
diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index d7595a7e546..18bc67d61fc 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -287,6 +287,7 @@ PostgreSQL documentation
specifies the target directory instead of a file. In this case the
directory is created by <command>pg_dump</command> and must not exist
before.
+ This option and <option>--pipe-command</option> can't be used together.
</para>
</listitem>
</varlistentry>
@@ -1234,6 +1235,32 @@ PostgreSQL documentation
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><option>--pipe-command</option></term>
+ <listitem>
+ <para>
+ This option is only supported with the directory output
+ format. It can be used to write to multiple streams which
+ otherwise would not be possible with the directory mode.
+ For each stream, it starts a process which runs the
+ specified command and pipes the pg_dump output to this
+ process.
+ This option is not valid if <option>--file</option>
+ is also specified.
+ </para>
+ <para>
+ The pipe-command can be used to perform operations like compress
+ using a custom algorithm, filter, or write the output to a cloud
+ storage etc. The user would need a way to pipe the final output of
+ each stream to a file. To handle that, the pipe command supports a format
+ specifier %f. And all the instances of %f in the command string
+ will be replaced with the corresponding file name which
+ would have been used in the directory mode with <option>--file</option>.
+ See <xref linkend="pg-dump-examples"/> below.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><option>--quote-all-identifiers</option></term>
<listitem>
@@ -1783,6 +1810,35 @@ CREATE DATABASE foo WITH TEMPLATE template0;
</screen>
</para>
+ <para>
+ To use pipe-command to dump a database into a directory-format archive
+ (the directory <literal>dumpdir</literal> needs to exist before running the command).
+
+<screen>
+<prompt>$</prompt> <userinput>pg_dump -Fd mydb --pipe-command="cat > dumpdir/%f"</userinput>
+</screen>
+ </para>
+
+ <para>
+ To use pipe-command to dump a database into a directory-format archive
+ in parallel with 5 worker jobs (the directory <literal>dumpdir</literal> needs to exist
+ before running the command).
+
+<screen>
+<prompt>$</prompt> <userinput>pg_dump -Fd mydb -j 5 --pipe-command="cat > dumpdir/%f"</userinput>
+</screen>
+ </para>
+
+ <para>
+ To use pipe-command to compress and dump a database into a
+ directory-format archive (the directory <literal>dumpdir</literal> needs to
+ exist before running the command).
+
+<screen>
+<prompt>$</prompt> <userinput>pg_dump -Fd mydb --pipe-command="gzip > dumpdir/%f.gz"</userinput>
+</screen>
+ </para>
+
<para>
To reload an archive file into a (freshly created) database named
<literal>newdb</literal>:
diff --git a/doc/src/sgml/ref/pg_restore.sgml b/doc/src/sgml/ref/pg_restore.sgml
index 2295df62d03..5a929162702 100644
--- a/doc/src/sgml/ref/pg_restore.sgml
+++ b/doc/src/sgml/ref/pg_restore.sgml
@@ -93,7 +93,10 @@ PostgreSQL documentation
<para>
Specifies the location of the archive file (or directory, for a
directory-format archive) to be restored.
- If not specified, the standard input is used.
+ This option and <option>--pipe-command</option> can't be set
+ at the same time.
+ If neither this option nor <option>--pipe-command</option> is specified,
+ the standard input is used.
</para>
</listitem>
</varlistentry>
@@ -842,6 +845,32 @@ PostgreSQL documentation
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><option>--pipe-command</option></term>
+ <listitem>
+ <para>
+ This option is only supported with the directory output
+ format. It can be used to read from multiple streams which
+ otherwise would not be possible with the directory mode.
+ For each stream, it starts a process which runs the
+ specified command and pipes its output to the pg_restore process.
+ This option is not valid if <option>filename</option> is also specified.
+ </para>
+ <para>
+ The pipe-command can be used to perform operations like
+ decompress using a custom algorithm, filter, or read from
+ a cloud storage. When reading from the pg_dump output,
+ the user would need a way to read the correct file in each
+ stream. To handle that, the pipe command supports a format
+ specifier %f. And all the instances of %f in the command string
+ will be replaced with the corresponding file name which
+ would have been used in the directory mode with <option>filename</option>.
+ This is same as the <option>--pipe-command</option> of pg-dump.
+ See <xref linkend="app-pgrestore-examples"/> below.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><option>--section=<replaceable class="parameter">sectionname</replaceable></option></term>
<listitem>
@@ -1263,6 +1292,43 @@ CREATE DATABASE foo WITH TEMPLATE template0;
<prompt>$</prompt> <userinput>pg_restore -L db.list db.dump</userinput>
</screen></para>
+ <para>
+ To use pg_restore with pipe-command to recreate from a dump in
+ directory-archive format. The database should not exist beforehand.
+ Assume in this example that the dump in directory-archive format is
+ stored in <literal>dumpdir</literal>.
+
+<screen>
+<prompt>$</prompt> <userinput>pg_restore -C -Fd -d postgres --pipe-commnad="cat dumpdir/%f"</userinput>
+</screen>
+ </para>
+
+ <para>
+ To use pg_restore with pipe-command to first decompress and then
+ recreate from a dump in directory-archive format. The database
+ should not exist beforehand.
+ Assume in this example that the dump in directory-archive format is
+ stored in <literal>dumpdir</literal>. And all files are
+ <literal>gzip</literal> compressed.
+
+<screen>
+<prompt>$</prompt> <userinput>pg_restore -C -Fd -d postgres --pipe-commnad="cat dumpdir/%f.gz | gunzip"</userinput>
+</screen>
+ </para>
+
+ <para>
+ To use pipe-command along with <option>-L</option> to recreate only
+ selectd items from a dump in the directory-archive format.
+ The database should not exist beforehand.
+ Assume in this example that the dump in directory-archive format is
+ stored in dumpdir.
+ The <literal>db.list</literal> file is the same as one used in the previous example with <option>-L</option>
+
+<screen>
+<prompt>$</prompt> <userinput>pg_restore -C -Fd -d postgres --pipe-commnad="cat dumpdir/%f" -L db.list</userinput>
+</screen>
+ </para>
+
</refsect1>
<refsect1>
--
2.49.0.1266.g31b7d2e469-goog
I have added this to the commitfest
We would be grateful for any reviews and feedback on this.
When adding to commitfest I tried to put Nitin as "first author" as he
has done the bulk of the work (I did just a quick pg_dump-only PoC)
but it looks like Commitfest just orders all provided authors
alphabetically .
Hi,
Very interesting patch. One question: is it possible with this patch to pipe pg_dump directory output directly into pg_restore with this patch? Looking at the code I don't believe that is the case but figured I would ask.
Thanks,
Andrew Jackson
Hi,
Went ahead and experimented with your patch a bit. To answer my previous question this patch can be used to pipe pg_dump directly into pg_restore. This should absolutely be added as another use case to your list above as it is a well known limitation that you can use pg_dump/psql to do buffered copy but only with a single process, while using pg_dump/pg_restore is capable of multiprocessed copy but it must be saved to disk in its entirety before the restore can begin. This is extremely frustrating when dealing with large databases where you don't want multiple copies saved on disk and because it's not as fast as it can be. With this patch you can get the best of both worlds.
Example dump
```bash
pg_dump --jobs=4 -Fd "${connection_str}" --pipe-command="mkfifo dumpdir/%f; cat >> dumpdir/%f"
```
Example restore run in different process
```bash
pg_restore --jobs=4 -Fd --dbname="${another_connection_str}" ./dumpdir
```
Thanks,
Andrew Jackson
On Thu, Jun 5, 2025 at 6:09 PM Nitin Motiani <nitinmotiani@google.com> wrote:
Hi,
Apologies for the delay on this thread.
On Mon, Apr 28, 2025 at 1:52 PM Nitin Motiani <nitinmotiani@google.com> wrote:
Thanks for the feedback, Thomas.
Do I understand correctly that
the problem you encountered is in some other tests that you haven't
attached yet? Could you post what you have so that others can see the
problem and perhaps have a chance of helping?Yes, we didn't add the failed tests to the patch. We'll add those and
send new patches.I'm attaching the patch files generated using git format-patch.
0001 has the pg_dump pipe support code.
0002 has the pg_restore pipe support.
0003 has a few basic validation tests for correct flag combinations.
0004 has the proposed documentation changes.The above 4 are the same as before.
The 0005 patch is the new WIP patch file. This includes the tests
which we have been trying to add but which are failing (although the
same commands run fine manually).The tests in this patch are added to src/bin/pg_dump/t/002_pg_dump.pl.
The original attempt was to have a test case with dump and restore
commands using the new flag and run it in multiple scenarios. But
since that was failing, for the ease of debugging I added a few
standalone tests which just run a pg_dump with the pipe-command flag.
In these tests, if the pipe-command is a simple command like 'cat' or
'gzip', the test passes. But if the pipe-command itself uses a pipe
(either to a file or another command), the test fails.In the following test
['pg_dump', '-Fd', '-B', 'postgres', "--pipe-command=\"cat > $tempdir/%f\"",],]
I get the below error.
# 'sh: line 1: cat >
/usr/local/google/home/nitinmotiani/postgresql/src/bin/pg_dump/tmp_check/tmp_test_XpFO/toc.dat:
No such file or directoryI can see that the temp directory tmp_test_XpFO exists. Even when I
changed the test to use an absolute path to an existing directory, I
got the same error. When I do manual testing with the same
pipe-command, it works fine. That is why we think there is some issue
with our environment setup for the tap test where it is not able to
parse the command.I also ran the following loop (started just before starting the test
run) to print the output of ps commands around 'cat >' to see what
happens.for i in $(seq 1 10000); do ps --forest -ef | grep "cat >" -A 5 >>
~/ps_output.txt; doneThe printed results showed that the child process with the pipe
command became defunct.nitinmo+ 3180211 3180160 5 17:05 pts/1 00:00:00 | |
\_ /usr/local/google/home/nitinmotiani/postgresql/tmp_install/usr/local/pgsql/bin/pg_dump
-Fd -B p ostgres --pipe-command="cat >
/usr/local/google/home/nitinmotiani/postgresql/src/bin/pg_dump/definite_dumpdir/%f"
nitinmo+ 3180215 3180211 0 17:05 pts/1 00:00:00 | |
\_ [sh] <defunct>We are not sure how to handle this issue. Please let us know your thoughts.
The latest patch set is not applying on HEAD can you rebase the patch
set. And also there are many TODOs in the patch, if those TODOs are
just good to do and you are planning for future development better to
get rid of those. OTOH if some of those TODOs are mandatory to do
before we can commit the patch then are you planning to work on those
soon? I am planning to review this patch so are you planning to send
the rebased version with implementing the TODO which are required for
the first version.
--
Regards,
Dilip Kumar
Google
On Tue, Sep 9, 2025 at 12:07 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
The latest patch set is not applying on HEAD can you rebase the patch
set. And also there are many TODOs in the patch, if those TODOs are
just good to do and you are planning for future development better to
get rid of those. OTOH if some of those TODOs are mandatory to do
before we can commit the patch then are you planning to work on those
soon? I am planning to review this patch so are you planning to send
the rebased version with implementing the TODO which are required for
the first version.
Thanks for the feedback, Dilip. We will rebase the patch soon and send
it. Regarding the TODOs, some of those are plans for future
development (i.e. refactor). There are also TODOs in the first patch
file 0001 which are actually removed in file 0002. We can clean those
up or combine the two files. Other than that, some are about the open
questions. We will remove those from the code and will discuss those
issues on the thread.
Thanks,
Nitin Motiani
Google