diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index 81fc4e2..af1959b 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -272,7 +272,7 @@ file_fdw_validator(PG_FUNCTION_ARGS)
 	/*
 	 * Now apply the core COPY code's validation logic for more checks.
 	 */
-	ProcessCopyOptions(NULL, true, other_options);
+	ProcessCopyOptions(NULL, true, false, other_options);
 
 	/*
 	 * Filename option is required for file_fdw foreign tables.
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 0567ab0..252fc6e 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -100,6 +100,8 @@ typedef struct CopyStateData
 	EolType		eol_type;		/* EOL type of input */
 	int			file_encoding;	/* file or remote side's character encoding */
 	bool		need_transcoding;		/* file encoding diff from server? */
+	StringInfo	cmdbuf;			/* used to hold the whole command string
+								 * if either preproc or postproc is set */
 	bool		encoding_embeds_ascii;	/* ASCII can be non-first byte? */
 
 	/* parameters from the COPY command */
@@ -124,6 +126,8 @@ typedef struct CopyStateData
 	bool	   *force_notnull_flags;	/* per-column CSV FNN flags */
 	bool		convert_selectively;	/* do selective binary conversion? */
 	List	   *convert_select;	/* list of column names (can be NIL) */
+	char	   *preproc;		/* preprocessor for COPY FROM */
+	char	   *postproc;		/* postprocessor for COPY TO */
 	bool	   *convert_select_flags;	/* per-column CSV/TEXT CS flags */
 
 	/* these are just for error messages, see CopyFromErrorCallback */
@@ -272,8 +276,9 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
 
 /* non-export function prototypes */
-static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
-		  const char *queryString, List *attnamelist, List *options);
+static CopyState BeginCopy(bool is_from, Relation rel,
+		  Node *raw_query, const char *queryString,
+		  bool is_pipe, List *attnamelist, List *options);
 static void EndCopy(CopyState cstate);
 static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
 			const char *filename, List *attnamelist, List *options);
@@ -851,6 +856,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
 void
 ProcessCopyOptions(CopyState cstate,
 				   bool is_from,
+				   bool is_pipe,
 				   List *options)
 {
 	bool		format_specified = false;
@@ -998,6 +1004,22 @@ ProcessCopyOptions(CopyState cstate,
 					 errmsg("argument to option \"%s\" must be a valid encoding name",
 							defel->defname)));
 		}
+		else if (strcmp(defel->defname, "preprocessor") == 0)
+		{
+			if (cstate->preproc)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			cstate->preproc = defGetString(defel);
+		}
+		else if (strcmp(defel->defname, "postprocessor") == 0)
+		{
+			if (cstate->postproc)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			cstate->postproc = defGetString(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1136,6 +1158,26 @@ ProcessCopyOptions(CopyState cstate,
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("CSV quote character must not appear in the NULL specification")));
+
+	/* Check preproc */
+	if (cstate->preproc && !is_from)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("cannot specify PREPROCESSOR in COPY TO")));
+	if (cstate->preproc && is_pipe)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("cannot specify PREPROCESSOR in COPY FROM STDIN")));
+
+	/* Check postproc */
+	if (cstate->postproc && is_from)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("cannot specify POSTPROCESSOR in COPY FROM")));
+	if (cstate->postproc && is_pipe)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("cannot specify POSTPROCESSOR in COPY TO STDOUT")));
 }
 
 /*
@@ -1158,6 +1200,7 @@ BeginCopy(bool is_from,
 		  Relation rel,
 		  Node *raw_query,
 		  const char *queryString,
+		  bool is_pipe,
 		  List *attnamelist,
 		  List *options)
 {
@@ -1182,7 +1225,7 @@ BeginCopy(bool is_from,
 	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
 	/* Extract options from the statement node tree */
-	ProcessCopyOptions(cstate, is_from, options);
+	ProcessCopyOptions(cstate, is_from, is_pipe, options);
 
 	/* Process the source/target relation or query */
 	if (rel)
@@ -1382,11 +1425,21 @@ BeginCopy(bool is_from,
 static void
 EndCopy(CopyState cstate)
 {
-	if (cstate->filename != NULL && FreeFile(cstate->copy_file))
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not close file \"%s\": %m",
-						cstate->filename)));
+	if (cstate->preproc != NULL || cstate->postproc != NULL)
+	{
+		if (ClosePipeStream(cstate->copy_file) == -1)
+			ereport(ERROR,
+					(errmsg("could not execute command \"%s\"",
+							cstate->cmdbuf->data)));
+	}
+	else
+	{
+		if (cstate->filename != NULL && FreeFile(cstate->copy_file))
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not close file \"%s\": %m",
+							cstate->filename)));
+	}
 
 	MemoryContextDelete(cstate->copycontext);
 	pfree(cstate);
@@ -1433,7 +1486,8 @@ BeginCopyTo(Relation rel,
 						RelationGetRelationName(rel))));
 	}
 
-	cstate = BeginCopy(false, rel, query, queryString, attnamelist, options);
+	cstate = BeginCopy(false, rel, query, queryString,
+					   pipe, attnamelist, options);
 	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
 	if (pipe)
@@ -1443,7 +1497,6 @@ BeginCopyTo(Relation rel,
 	}
 	else
 	{
-		mode_t		oumask;		/* Pre-existing umask value */
 		struct stat st;
 
 		/*
@@ -1456,15 +1509,42 @@ BeginCopyTo(Relation rel,
 					 errmsg("relative path not allowed for COPY to file")));
 
 		cstate->filename = pstrdup(filename);
-		oumask = umask(S_IWGRP | S_IWOTH);
-		cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
-		umask(oumask);
 
-		if (cstate->copy_file == NULL)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not open file \"%s\" for writing: %m",
-							cstate->filename)));
+		if (cstate->postproc != NULL)
+		{
+			char		my_exec_path[MAXPGPATH];
+
+			if (find_my_exec(cstate->postproc, my_exec_path) < 0)
+				elog(ERROR, "could not locate postprocessor \"%s\"",
+					 cstate->postproc);
+
+			cstate->cmdbuf = makeStringInfo();
+			appendStringInfoString(cstate->cmdbuf, my_exec_path);
+			appendStringInfoChar(cstate->cmdbuf, ' ');
+			appendStringInfoString(cstate->cmdbuf, cstate->filename);
+
+			cstate->copy_file = OpenPipeStream(cstate->cmdbuf->data,
+											   PG_BINARY_W);
+
+			if (cstate->copy_file == NULL)
+				ereport(ERROR,
+						(errmsg("could not execute command \"%s\": %m",
+								cstate->cmdbuf->data)));
+		}
+		else
+		{
+			mode_t		oumask;		/* Pre-existing umask value */
+
+			oumask = umask(S_IWGRP | S_IWOTH);
+			cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
+			umask(oumask);
+
+			if (cstate->copy_file == NULL)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not open file \"%s\" for writing: %m",
+								cstate->filename)));
+		}
 
 		fstat(fileno(cstate->copy_file), &st);
 		if (S_ISDIR(st.st_mode))
@@ -2288,7 +2368,7 @@ BeginCopyFrom(Relation rel,
 	MemoryContext oldcontext;
 	bool		volatile_defexprs;
 
-	cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
+	cstate = BeginCopy(true, rel, NULL, NULL, pipe, attnamelist, options);
 	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
 	/* Initialize state variables */
@@ -2379,13 +2459,38 @@ BeginCopyFrom(Relation rel,
 		struct stat st;
 
 		cstate->filename = pstrdup(filename);
-		cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
 
-		if (cstate->copy_file == NULL)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not open file \"%s\" for reading: %m",
-							cstate->filename)));
+		if (cstate->preproc != NULL)
+		{
+			char		my_exec_path[MAXPGPATH];
+
+			if (find_my_exec(cstate->preproc, my_exec_path) < 0)
+				elog(ERROR, "could not locate preprocessor \"%s\"",
+					 cstate->preproc);
+
+			cstate->cmdbuf = makeStringInfo();
+			appendStringInfoString(cstate->cmdbuf, my_exec_path);
+			appendStringInfoChar(cstate->cmdbuf, ' ');
+			appendStringInfoString(cstate->cmdbuf, cstate->filename);
+
+			cstate->copy_file = OpenPipeStream(cstate->cmdbuf->data,
+											   PG_BINARY_R);
+
+			if (cstate->copy_file == NULL)
+				ereport(ERROR,
+						(errmsg("could not execute command \"%s\": %m",
+								cstate->cmdbuf->data)));
+		}
+		else
+		{
+			cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+
+			if (cstate->copy_file == NULL)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not open file \"%s\" for reading: %m",
+								cstate->filename)));
+		}
 
 		fstat(fileno(cstate->copy_file), &st);
 		if (S_ISDIR(st.st_mode))
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 5894cb0..09796f4 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -547,8 +547,8 @@ static void processCASbits(int cas_bits, int location, const char *constrType,
 	ORDER OUT_P OUTER_P OVER OVERLAPS OVERLAY OWNED OWNER
 
 	PARSER PARTIAL PARTITION PASSING PASSWORD PLACING PLANS POSITION
-	PRECEDING PRECISION PRESERVE PREPARE PREPARED PRIMARY
-	PRIOR PRIVILEGES PROCEDURAL PROCEDURE
+	POSTPROCESSOR PRECEDING PRECISION PREPROCESSOR PRESERVE PREPARE PREPARED
+	PRIMARY PRIOR PRIVILEGES PROCEDURAL PROCEDURE
 
 	QUOTE
 
@@ -2360,6 +2360,14 @@ copy_opt_item:
 				{
 					$$ = makeDefElem("force_not_null", (Node *)$4);
 				}
+			| PREPROCESSOR Sconst
+				{
+					$$ = makeDefElem("preprocessor", (Node *)makeString($2));
+				}
+			| POSTPROCESSOR Sconst
+				{
+					$$ = makeDefElem("postprocessor", (Node *)makeString($2));
+				}
 			| ENCODING Sconst
 				{
 					$$ = makeDefElem("encoding", (Node *)makeString($2));
@@ -12520,9 +12528,11 @@ unreserved_keyword:
 			| PASSING
 			| PASSWORD
 			| PLANS
+			| POSTPROCESSOR
 			| PRECEDING
 			| PREPARE
 			| PREPARED
+			| PREPROCESSOR
 			| PRESERVE
 			| PRIOR
 			| PRIVILEGES
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index fed25fd..eaa594a 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -182,6 +182,7 @@ static uint64 temporary_files_size = 0;
 typedef enum
 {
 	AllocateDescFile,
+	AllocateDescPipe,
 	AllocateDescDir
 } AllocateDescKind;
 
@@ -1542,6 +1543,61 @@ TryAgain:
 	return NULL;
 }
 
+/*
+ * Run a shell command via popen() and return a stream connected to it,
+ * tracked in allocatedDescs[] just like AllocateFile's files.  Returns
+ * NULL on failure.  The stream must be closed with ClosePipeStream.
+ */
+FILE *
+OpenPipeStream(const char *command, const char *mode)
+{
+	FILE	   *file;
+
+	DO_DB(elog(LOG, "OpenPipeStream: Allocated %d (%s)",
+			   numAllocatedDescs, command));
+
+	/*
+	 * The test against MAX_ALLOCATED_DESCS prevents us from overflowing
+	 * allocatedDescs[]; the test against max_safe_fds prevents OpenPipeStream
+	 * from hogging every one of the available FDs, which'd lead to infinite
+	 * looping.
+	 */
+	if (numAllocatedDescs >= MAX_ALLOCATED_DESCS ||
+		numAllocatedDescs >= max_safe_fds - 1)
+		elog(ERROR, "exceeded MAX_ALLOCATED_DESCS while trying to execute command \"%s\"",
+			 command);
+
+TryAgain:
+	fflush(stdout);
+	fflush(stderr);
+	errno = 0;
+	if ((file = popen(command, mode)) != NULL)
+	{
+		AllocateDesc *desc = &allocatedDescs[numAllocatedDescs];
+
+		desc->kind = AllocateDescPipe;
+		desc->desc.file = file;
+		desc->create_subid = GetCurrentSubTransactionId();
+		numAllocatedDescs++;
+		return desc->desc.file;
+	}
+
+	if (errno == EMFILE || errno == ENFILE)
+	{
+		int			save_errno = errno;
+
+		ereport(LOG,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("out of file descriptors: %m; release and retry")));
+		errno = 0;
+		if (ReleaseLruFile())
+			goto TryAgain;
+		errno = save_errno;
+	}
+
+	return NULL;
+}
+
 /*
  * Free an AllocateDesc of either type.
  *
@@ -1558,6 +1614,9 @@ FreeDesc(AllocateDesc *desc)
 		case AllocateDescFile:
 			result = fclose(desc->desc.file);
 			break;
+		case AllocateDescPipe:
+			result = pclose_check(desc->desc.file);
+			break;
 		case AllocateDescDir:
 			result = closedir(desc->desc.dir);
 			break;
@@ -1602,6 +1661,32 @@ FreeFile(FILE *file)
 	return fclose(file);
 }
 
+/*
+ * Close a stream obtained from OpenPipeStream.  Returns the result of
+ * pclose_check(): 0 on success, -1 if the command did not exit cleanly.
+ */
+int
+ClosePipeStream(FILE *file)
+{
+	int			i;
+
+	DO_DB(elog(LOG, "ClosePipeStream: Allocated %d", numAllocatedDescs));
+
+	/* Remove file from list of allocated files, if it's present */
+	for (i = numAllocatedDescs; --i >= 0;)
+	{
+		AllocateDesc *desc = &allocatedDescs[i];
+
+		if (desc->kind == AllocateDescPipe && desc->desc.file == file)
+			return FreeDesc(desc);
+	}
+
+	/* Only get here if someone passes us a file not in allocatedDescs */
+	elog(WARNING, "file passed to ClosePipeStream was not obtained from OpenPipeStream");
+
+	return pclose_check(file);
+}
+
 
 /*
  * Routines that want to use <dirent.h> (ie, DIR*) should use AllocateDir
diff --git a/src/bin/psql/copy.c b/src/bin/psql/copy.c
index 22fcc59..38a6c52 100644
--- a/src/bin/psql/copy.c
+++ b/src/bin/psql/copy.c
@@ -54,6 +54,9 @@ struct copy_options
 	char	   *file;			/* NULL = stdin/stdout */
 	bool		psql_inout;		/* true = use psql stdin/stdout */
 	bool		from;			/* true = FROM, false = TO */
+	char	   *proc;			/* preprocessor/postprocessor, if any */
+	char	   *command;		/* used to hold the whole command string
+								 * if preprocessor/postprocessor is set */
 };
 
 
@@ -65,6 +68,8 @@ free_copy_options(struct copy_options * ptr)
 	free(ptr->before_tofrom);
 	free(ptr->after_tofrom);
 	free(ptr->file);
+	free(ptr->proc);
+	free(ptr->command);
 	free(ptr);
 }
 
@@ -88,6 +93,8 @@ parse_slash_copy(const char *args)
 {
 	struct copy_options *result;
 	char	   *token;
+	char	   *prev_token = NULL;
+	char	   *rest;
 	const char *whitespace = " \t\n\r";
 	char		nonstd_backslash = standard_strings() ? 0 : '\\';
 
@@ -216,10 +223,80 @@ parse_slash_copy(const char *args)
 	}
 
 	/* Collect the rest of the line (COPY options) */
-	token = strtokx(NULL, "", NULL, NULL,
+	rest = strtokx(NULL, "", NULL, NULL,
+				   0, false, false, pset.encoding);
+	if (!rest ||
+		(!strstr(rest, "preprocessor") && !strstr(rest, "postprocessor")))
+	{
+		if (rest)
+			result->after_tofrom = pg_strdup(rest);
+		return result;
+	}
+	result->after_tofrom = pg_strdup("");
+	/* restarting strtokx frees its buffer, which rest points into */
+	rest = pg_strdup(rest);
+	token = strtokx(rest, whitespace, ",()", "\"'",
 					0, false, false, pset.encoding);
-	if (token)
-		result->after_tofrom = pg_strdup(token);
+	free(rest);
+	while (token)
+	{
+		if (pg_strcasecmp(token, "preprocessor") == 0 ||
+			pg_strcasecmp(token, "postprocessor") == 0)
+		{
+			/* must directly follow "(" or "," inside the option list */
+			if (!prev_token || (prev_token[0] != ',' && prev_token[0] != '('))
+				goto error;
+
+			if (result->proc != NULL)
+				goto error;
+			if (result->file == NULL)
+				goto error;
+			if (!result->from && pg_strcasecmp(token, "preprocessor") == 0)
+				goto error;
+			if (result->from && pg_strcasecmp(token, "postprocessor") == 0)
+				goto error;
+
+			token = strtokx(NULL, whitespace, NULL, "'",
+							0, false, true, pset.encoding);
+			if (!token)
+				goto error;
+			result->proc = pg_strdup(token);
+
+			token = strtokx(NULL, whitespace, ",()", "\"'",
+							0, false, false, pset.encoding);
+			if (!token)
+				goto error;
+			if (token[0] != ',' && token[0] != ')')
+				goto error;
+			if (token[0] == ')')
+			{
+				int			len;
+
+				token = strtokx(NULL, whitespace, ",()", "\"'",
+								0, false, false, pset.encoding);
+				if (token)
+					goto error;
+				if (prev_token[0] == ',')
+				{
+					len = strlen(result->after_tofrom);
+					result->after_tofrom[len - 1] = ')';
+				}
+				if (prev_token[0] == '(')
+					result->after_tofrom[0] = '\0';
+				break;
+			}
+
+			token = strtokx(NULL, whitespace, NULL, "\"'",
+							0, false, false, pset.encoding);
+			if (!token)
+				goto error;
+		}
+		xstrcat(&result->after_tofrom, " ");
+		xstrcat(&result->after_tofrom, token);
+		prev_token = token;
+		token = strtokx(NULL, whitespace, ",()", "\"'",
+						0, false, false, pset.encoding);
+	}
 
 	return result;
 
@@ -260,12 +337,48 @@ do_copy(const char *args)
 	if (options->file)
 		canonicalize_path(options->file);
 
+	if (options->proc)
+	{
+		PQExpBufferData cmdbuf;
+		char		full_path[MAXPGPATH];
+		int			ret;
+
+		ret = find_my_exec(options->proc, full_path);
+		if (ret)
+		{
+			if (options->from)
+				psql_error("could not locate preprocessor: %s\n",
+						   options->proc);
+			else
+				psql_error("could not locate postprocessor: %s\n",
+						   options->proc);
+			free_copy_options(options);
+			return false;
+		}
+		initPQExpBuffer(&cmdbuf);
+		appendPQExpBufferStr(&cmdbuf, full_path);
+		appendPQExpBufferChar(&cmdbuf, ' ');
+		appendPQExpBufferStr(&cmdbuf, options->file);
+		options->command = pg_strdup(cmdbuf.data);
+		termPQExpBuffer(&cmdbuf);
+	}
+
 	if (options->from)
 	{
 		override_file = &pset.cur_cmd_source;
 
 		if (options->file)
-			copystream = fopen(options->file, PG_BINARY_R);
+		{
+			if (!options->proc)
+				copystream = fopen(options->file, PG_BINARY_R);
+			else
+			{
+				fflush(stdout);
+				fflush(stderr);
+				errno = 0;
+				copystream = popen(options->command, PG_BINARY_R);
+			}
+		}
 		else if (!options->psql_inout)
 			copystream = pset.cur_cmd_source;
 		else
@@ -276,7 +389,17 @@ do_copy(const char *args)
 		override_file = &pset.queryFout;
 
 		if (options->file)
-			copystream = fopen(options->file, PG_BINARY_W);
+		{
+			if (!options->proc)
+				copystream = fopen(options->file, PG_BINARY_W);
+			else
+			{
+				fflush(stdout);
+				fflush(stderr);
+				errno = 0;
+				copystream = popen(options->command, PG_BINARY_W);
+			}
+		}
 		else if (!options->psql_inout)
 			copystream = pset.queryFout;
 		else
@@ -285,8 +408,12 @@ do_copy(const char *args)
 
 	if (!copystream)
 	{
-		psql_error("%s: %s\n",
-				   options->file, strerror(errno));
+		if (!options->proc)
+			psql_error("%s: %s\n",
+					   options->file, strerror(errno));
+		else
+			psql_error("could not execute command \"%s\": %s\n",
+					   options->command, strerror(errno));
 		free_copy_options(options);
 		return false;
 	}
@@ -322,10 +449,23 @@ do_copy(const char *args)
 
 	if (options->file != NULL)
 	{
-		if (fclose(copystream) != 0)
+		if (!options->proc)
 		{
-			psql_error("%s: %s\n", options->file, strerror(errno));
-			success = false;
+			if (fclose(copystream) != 0)
+			{
+				psql_error("%s: %s\n",
+						   options->file, strerror(errno));
+				success = false;
+			}
+		}
+		else
+		{
+			if (pclose_check(copystream) == -1)
+			{
+				psql_error("could not execute command \"%s\"\n",
+						   options->command);
+				success = false;
+			}
 		}
 	}
 	free_copy_options(options);
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 8680ac3..b6ab6a1 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -23,7 +23,8 @@ typedef struct CopyStateData *CopyState;
 
 extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString);
 
-extern void ProcessCopyOptions(CopyState cstate, bool is_from, List *options);
+extern void ProcessCopyOptions(CopyState cstate, bool is_from,
+				   bool is_pipe, List *options);
 extern CopyState BeginCopyFrom(Relation rel, const char *filename,
 			  List *attnamelist, List *options);
 extern void EndCopyFrom(CopyState cstate);
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index af60dac..f0cf0a4 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -282,10 +282,12 @@ PG_KEYWORD("password", PASSWORD, UNRESERVED_KEYWORD)
 PG_KEYWORD("placing", PLACING, RESERVED_KEYWORD)
 PG_KEYWORD("plans", PLANS, UNRESERVED_KEYWORD)
 PG_KEYWORD("position", POSITION, COL_NAME_KEYWORD)
+PG_KEYWORD("postprocessor", POSTPROCESSOR, UNRESERVED_KEYWORD)
 PG_KEYWORD("preceding", PRECEDING, UNRESERVED_KEYWORD)
 PG_KEYWORD("precision", PRECISION, COL_NAME_KEYWORD)
 PG_KEYWORD("prepare", PREPARE, UNRESERVED_KEYWORD)
 PG_KEYWORD("prepared", PREPARED, UNRESERVED_KEYWORD)
+PG_KEYWORD("preprocessor", PREPROCESSOR, UNRESERVED_KEYWORD)
 PG_KEYWORD("preserve", PRESERVE, UNRESERVED_KEYWORD)
 PG_KEYWORD("primary", PRIMARY, RESERVED_KEYWORD)
 PG_KEYWORD("prior", PRIOR, UNRESERVED_KEYWORD)
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index bad9f10..69d5388 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -80,6 +80,10 @@ extern char *FilePathName(File file);
 extern FILE *AllocateFile(const char *name, const char *mode);
 extern int	FreeFile(FILE *file);
 
+/* Operations that allow use of pipe stream */
+extern FILE *OpenPipeStream(const char *command, const char *mode);
+extern int	ClosePipeStream(FILE *file);
+
 /* Operations to allow use of the library routines */
 extern DIR *AllocateDir(const char *dirname);
 extern struct dirent *ReadDir(DIR *dir, const char *dirname);