diff --git a/postgresql-ef28e05/src/backend/commands/copy.c b/postgresql-ef28e05/src/backend/commands/copy.c index 0567ab0..30a7f19 100644 --- a/postgresql-ef28e05/src/backend/commands/copy.c +++ b/postgresql-ef28e05/src/backend/commands/copy.c @@ -51,6 +51,8 @@ #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7')) #define OCTVALUE(c) ((c) - '0') +#define WHITESPACE " \t" + /* * Represents the different source/dest cases we need to worry about at * the bottom level @@ -107,6 +109,8 @@ typedef struct CopyStateData QueryDesc *queryDesc; /* executable query to copy from */ List *attnumlist; /* integer list of attnums to copy */ char *filename; /* filename, or NULL for STDIN/STDOUT */ + char *progname; /* program to run through a pipe, or NULL for a plain file */ + StringInfo cmdbuf; /* command line handed to OpenPipeStream; set only when progname != NULL */ bool binary; /* binary format? */ bool oids; /* include OIDs? */ bool csv_mode; /* Comma Separated Value format? */ @@ -1382,11 +1386,21 @@ BeginCopy(bool is_from, static void EndCopy(CopyState cstate) { - if (cstate->filename != NULL && FreeFile(cstate->copy_file)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not close file \"%s\": %m", - cstate->filename))); + if (cstate->progname != NULL) + { + if (ClosePipeStream(cstate->copy_file) == -1) /* NOTE(review): only -1 is treated as failure; confirm how a non-zero exit status of the command is reported */ + ereport(ERROR, + (errmsg("could not execute command \"%s\"", + cstate->cmdbuf->data))); + } + else + { + if (cstate->filename != NULL && FreeFile(cstate->copy_file)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", + cstate->filename))); + } MemoryContextDelete(cstate->copycontext); pfree(cstate); @@ -1443,28 +1457,80 @@ BeginCopyTo(Relation rel, } else { - mode_t oumask; /* Pre-existing umask value */ struct stat st; + char* buffer = NULL; + char* token; + int count = 0; + + buffer = pstrdup(filename); + token = strtok(buffer, WHITESPACE); /* NULL when filename is empty or all whitespace: count stays 0 and is rejected by the ereport below, so no Assert on user input */ + while (token != NULL) { + count++; + token = strtok(NULL, WHITESPACE); + } + + if (count != 1 && count !=2) + ereport(ERROR, + 
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("argument \"%s\" not recognized", filename))); + if (count == 1) + { + cstate->progname = NULL; + cstate->filename = pstrdup(filename); + } + if (count == 2) + { + buffer = pstrdup(filename); + token = strtok(buffer, WHITESPACE); + cstate->progname = pstrdup(token); /* first token: program name */ + token = strtok(NULL, WHITESPACE); + cstate->filename = pstrdup(token); /* second token: the program's single argument */ + } /* * Prevent write to relative path ... too easy to shoot oneself in the * foot by overwriting a database file ... */ - if (!is_absolute_path(filename)) + if (!is_absolute_path(cstate->filename)) /* also applied to the program's argument when progname is set */ ereport(ERROR, (errcode(ERRCODE_INVALID_NAME), errmsg("relative path not allowed for COPY to file"))); - cstate->filename = pstrdup(filename); - oumask = umask(S_IWGRP | S_IWOTH); - cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W); - umask(oumask); + if (cstate->progname != NULL) + { + char my_exec_path[MAXPGPATH]; - if (cstate->copy_file == NULL) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\" for writing: %m", - cstate->filename))); + if (find_my_exec(cstate->progname, my_exec_path) < 0) + elog(ERROR, "could not locate command \"%s\"", + cstate->progname); + + cstate->cmdbuf = makeStringInfo(); + appendStringInfoString(cstate->cmdbuf, my_exec_path); + appendStringInfoChar(cstate->cmdbuf, ' '); + appendStringInfoString(cstate->cmdbuf, cstate->filename); /* NOTE(review): appended without shell quoting; OpenPipeStream passes this string to popen() */ + + cstate->copy_file = OpenPipeStream(cstate->cmdbuf->data, + PG_BINARY_W); + + if (cstate->copy_file == NULL) + ereport(ERROR, + (errmsg("could not execute command \"%s\": %m", + cstate->cmdbuf->data))); + } + else + { + mode_t oumask; /* Pre-existing umask value */ + + oumask = umask(S_IWGRP | S_IWOTH); + cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W); + umask(oumask); + + if (cstate->copy_file == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" for writing: %m", + cstate->filename))); + } fstat(fileno(cstate->copy_file), 
&st); if (S_ISDIR(st.st_mode)) @@ -2377,15 +2443,66 @@ BeginCopyFrom(Relation rel, else { struct stat st; + char* buffer = NULL; + char* token; + int count = 0; - cstate->filename = pstrdup(filename); - cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); + buffer = pstrdup(filename); + token = strtok(buffer, WHITESPACE); /* NULL when filename is empty or all whitespace: count stays 0 and is rejected by the ereport below, so no Assert on user input */ + while (token != NULL) { + count++; + token = strtok(NULL, WHITESPACE); + } - if (cstate->copy_file == NULL) + if (count != 1 && count !=2) ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\" for reading: %m", - cstate->filename))); + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("argument \"%s\" not recognized", filename))); + if (count == 1) + { + cstate->progname = NULL; + cstate->filename = pstrdup(filename); + } + if (count == 2) + { + buffer = pstrdup(filename); + token = strtok(buffer, WHITESPACE); + cstate->progname = pstrdup(token); /* first token: program name */ + token = strtok(NULL, WHITESPACE); + cstate->filename = pstrdup(token); /* second token: the program's single argument */ + } + + if (cstate->progname != NULL) + { + char my_exec_path[MAXPGPATH]; + + if (find_my_exec(cstate->progname, my_exec_path) < 0) + elog(ERROR, "could not locate command \"%s\"", + cstate->progname); + + cstate->cmdbuf = makeStringInfo(); + appendStringInfoString(cstate->cmdbuf, my_exec_path); + appendStringInfoChar(cstate->cmdbuf, ' '); + appendStringInfoString(cstate->cmdbuf, cstate->filename); /* NOTE(review): appended without shell quoting; OpenPipeStream passes this string to popen() */ + + cstate->copy_file = OpenPipeStream(cstate->cmdbuf->data, + PG_BINARY_R); + + if (cstate->copy_file == NULL) + ereport(ERROR, + (errmsg("could not execute command \"%s\": %m", + cstate->cmdbuf->data))); + } + else + { + cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); + + if (cstate->copy_file == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" for reading: %m", + cstate->filename))); + } fstat(fileno(cstate->copy_file), &st); if (S_ISDIR(st.st_mode)) diff --git a/postgresql-ef28e05/src/backend/storage/file/fd.c
b/postgresql-ef28e05/src/backend/storage/file/fd.c index ecb62ba..c679657 100644 --- a/postgresql-ef28e05/src/backend/storage/file/fd.c +++ b/postgresql-ef28e05/src/backend/storage/file/fd.c @@ -183,6 +183,7 @@ static uint64 temporary_files_size = 0; typedef enum { AllocateDescFile, + AllocateDescPipe, AllocateDescDir } AllocateDescKind; @@ -1539,6 +1540,9 @@ FreeDesc(AllocateDesc *desc) case AllocateDescFile: result = fclose(desc->desc.file); break; + case AllocateDescPipe: + result = pclose_check(desc->desc.file); + break; case AllocateDescDir: result = closedir(desc->desc.dir); break; @@ -1583,6 +1587,77 @@ FreeFile(FILE *file) return fclose(file); } +FILE * +OpenPipeStream(const char *command, const char *mode) /* popen() wrapper: on success the stream is recorded in allocatedDescs[] as AllocateDescPipe; returns NULL if popen() fails */ +{ + FILE *file; + + DO_DB(elog(LOG, "OpenPipeStream: Allocated %d (%s)", + numAllocatedDescs, command)); + + /* + * The test against MAX_ALLOCATED_DESCS prevents us from overflowing + * allocatedDescs[]; the test against max_safe_fds prevents OpenPipeStream + * from hogging every one of the available FDs, which'd lead to infinite + * looping. 
+ */ + if (numAllocatedDescs >= MAX_ALLOCATED_DESCS || + numAllocatedDescs >= max_safe_fds - 1) + elog(ERROR, "exceeded MAX_ALLOCATED_DESCS while trying to execute command \"%s\"", + command); + +TryAgain: + fflush(stdout); + fflush(stderr); + errno = 0; + if ((file = popen(command, mode)) != NULL) + { + AllocateDesc *desc = &allocatedDescs[numAllocatedDescs]; + + desc->kind = AllocateDescPipe; + desc->desc.file = file; + desc->create_subid = GetCurrentSubTransactionId(); + numAllocatedDescs++; + return desc->desc.file; + } + + if (errno == EMFILE || errno == ENFILE) + { + int save_errno = errno; /* popen()'s errno, kept across the ereport below */ + + ereport(LOG, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("out of file descriptors: %m; release and retry"))); + errno = 0; + if (ReleaseLruFile()) /* retry popen() only if ReleaseLruFile() reports success */ + goto TryAgain; + errno = save_errno; /* restore popen()'s errno before returning NULL */ + } + + return NULL; +} + +int +ClosePipeStream(FILE *file) /* closes a pipe stream; returns FreeDesc()'s result, or pclose_check()'s for a stream not found in allocatedDescs[] */ +{ + int i; + + DO_DB(elog(LOG, "ClosePipeStream: Allocated %d", numAllocatedDescs)); + + /* Look for the stream among the allocated descriptors, newest first; if found, let FreeDesc() close it */ + for (i = numAllocatedDescs; --i >= 0;) + { + AllocateDesc *desc = &allocatedDescs[i]; + + if (desc->kind == AllocateDescPipe && desc->desc.file == file) + return FreeDesc(desc); + } + + /* Only get here if someone passes us a stream that is not recorded in allocatedDescs[] */ + elog(WARNING, "file passed to ClosePipeStream was not obtained from OpenPipeStream"); + + return pclose_check(file); +} /* * Routines that want to use <dirent.h> (ie, DIR*) should use AllocateDir diff --git a/postgresql-ef28e05/src/bin/psql/copy.c b/postgresql-ef28e05/src/bin/psql/copy.c index 6cc1f46..5ecc30e 100644 --- a/postgresql-ef28e05/src/bin/psql/copy.c +++ b/postgresql-ef28e05/src/bin/psql/copy.c @@ -52,6 +52,7 @@ struct copy_options char *before_tofrom; /* COPY string before TO/FROM */ char *after_tofrom; /* COPY string after TO/FROM filename */ char *file; /* NULL = stdin/stdout */ + char *prog; /* program name for popen(), or NULL to fopen() the file */ bool psql_inout; /* true = use psql stdin/stdout */ bool from; /* true = FROM, 
false = TO */ }; @@ -65,6 +66,7 @@ free_copy_options(struct copy_options * ptr) free(ptr->before_tofrom); free(ptr->after_tofrom); free(ptr->file); + free(ptr->prog); free(ptr); } @@ -89,6 +91,7 @@ parse_slash_copy(const char *args) struct copy_options *result; char *token; const char *whitespace = " \t\n\r"; + char *buffer = NULL; /* copy of the filename token; split into program and argument further down */ char nonstd_backslash = standard_strings() ? 0 : '\\'; if (!args) @@ -211,8 +214,7 @@ parse_slash_copy(const char *args) else { result->psql_inout = false; - result->file = pg_strdup(token); - expand_tilde(&result->file); + buffer = pg_strdup(token); } /* Collect the rest of the line (COPY options) */ @@ -221,6 +223,44 @@ parse_slash_copy(const char *args) if (token) result->after_tofrom = pg_strdup(token); + if (buffer) + { + int count = 0; /* number of space/tab separated tokens found in buffer */ + + token = strtokx(buffer, " \t", NULL, NULL, + 0, false, false, pset.encoding); + if (!token) + psql_error("\\copy: parse error at \"%s\"\n", buffer); + while (token != NULL) { + count++; + token = strtokx(NULL, " \t", NULL, NULL, + 0, false, false, pset.encoding); + } + if (count != 1 && count != 2) + psql_error("\\copy: parse error at \"%s\"\n", buffer); /* NOTE(review): the error is reported but parsing continues and result is returned without result->file being set here; confirm whether this should bail out */ + + if (count == 1) + { + result->prog = NULL; + result->file = pg_strdup(buffer); + } + if (count == 2) + { + token = strtokx(buffer, " \t", NULL, NULL, + 0, false, false, pset.encoding); + if (!token) + psql_error("\\copy: parse error at \"%s\"\n", buffer); + result->prog = pg_strdup(token); /* first token: program name */ + token = strtokx(NULL, " \t", NULL, NULL, + 0, false, false, pset.encoding); + if (!token) + psql_error("\\copy: parse error at \"%s\"\n", buffer); + result->file = pg_strdup(token); /* second token: the program's single argument */ + } + expand_tilde(&result->file); + free(buffer); + } + return result; error: @@ -247,6 +287,7 @@ do_copy(const char *args) FILE *save_file; FILE **override_file; struct copy_options *options; + char *command = NULL; /* "full_path file" string for popen(); stays NULL when no program is given */ bool success; struct stat st; @@ -260,12 +301,43 @@ do_copy(const char *args) if (options->file) canonicalize_path(options->file); + if (options->prog) + { + 
PQExpBufferData cmdbuf; + char full_path[MAXPGPATH]; + int ret; + + ret = find_my_exec(options->prog, full_path); + if (ret) + { + psql_error("could not locate program: %s\n", options->prog); + free_copy_options(options); + return false; + } + initPQExpBuffer(&cmdbuf); + appendPQExpBufferStr(&cmdbuf, full_path); + appendPQExpBuffer(&cmdbuf, " "); + appendPQExpBufferStr(&cmdbuf, options->file); /* NOTE(review): appended without shell quoting before being passed to popen() */ + command = pg_strdup(cmdbuf.data); /* owned by do_copy; released with free(command) */ + termPQExpBuffer(&cmdbuf); + } + if (options->from) { override_file = &pset.cur_cmd_source; if (options->file) - copystream = fopen(options->file, PG_BINARY_R); + { + if (options->prog) + { + fflush(stdout); + fflush(stderr); + errno = 0; + copystream = popen(command, PG_BINARY_R); + } + else + copystream = fopen(options->file, PG_BINARY_R); + } else if (!options->psql_inout) copystream = pset.cur_cmd_source; else @@ -276,7 +348,17 @@ do_copy(const char *args) override_file = &pset.queryFout; if (options->file) - copystream = fopen(options->file, PG_BINARY_W); + { + if (options->prog) + { + fflush(stdout); + fflush(stderr); + errno = 0; + copystream = popen(command, PG_BINARY_W); + } + else + copystream = fopen(options->file, PG_BINARY_W); + } else if (!options->psql_inout) copystream = pset.queryFout; else @@ -285,8 +367,13 @@ do_copy(const char *args) if (!copystream) { - psql_error("%s: %s\n", - options->file, strerror(errno)); + if (options->prog) + psql_error("could not execute command \"%s\": %s\n", + command, strerror(errno)); + else + psql_error("%s: %s\n", + options->file, strerror(errno)); + free(command); /* do not leak the command string on the failure path; free(NULL) is a no-op */ free_copy_options(options); return false; } @@ -322,13 +409,25 @@ do_copy(const char *args) if (options->file != NULL) { - if (fclose(copystream) != 0) + if (options->prog) + { + if (pclose_check(copystream) == -1) /* NOTE(review): only -1 is treated as failure; confirm what pclose_check() returns for a non-zero exit status */ + { + psql_error("could not execute command \"%s\"\n", command); + success = false; + } + } + else { - psql_error("%s: %s\n", options->file, strerror(errno)); - success = false; + if (fclose(copystream) != 0) + { + psql_error("%s: 
%s\n", options->file, strerror(errno)); + success = false; + } } } free_copy_options(options); + free(command); return success; } diff --git a/postgresql-ef28e05/src/include/storage/fd.h b/postgresql-ef28e05/src/include/storage/fd.h index 849bb10..b90bab9 100644 --- a/postgresql-ef28e05/src/include/storage/fd.h +++ b/postgresql-ef28e05/src/include/storage/fd.h @@ -79,6 +79,10 @@ extern char *FilePathName(File file); extern FILE *AllocateFile(const char *name, const char *mode); extern int FreeFile(FILE *file); +/* Operations that allow use of popen-based pipe streams */ +extern FILE *OpenPipeStream(const char *command, const char *mode); +extern int ClosePipeStream(FILE *file); + /* Operations to allow use of the <dirent.h> library routines */ extern DIR *AllocateDir(const char *dirname); extern struct dirent *ReadDir(DIR *dir, const char *dirname);