WIP patch: add (PRE|POST)PROCESSOR options to COPY

Started by Etsuro Fujita over 13 years ago, 45 messages
#1Etsuro Fujita
fujita.etsuro@lab.ntt.co.jp
1 attachment(s)

I'd like to add the following options to the SQL COPY command and the psql \copy
instruction:

* PREPROCESSOR: Specifies the user-supplied program for COPY IN. The data
from an input file is preprocessed by the program before the data is loaded into
a postgres table.
* POSTPROCESSOR: Specifies the user-supplied program for COPY OUT. The data
from a postgres table is postprocessed by the program before the data is stored
in an output file.

These options can be specified only when an input or output file is specified.

These options make it possible to move data between postgres tables and, e.g.,
compressed files or files on a distributed file system such as Hadoop HDFS.
Examples of the former are shown below:

$ echo '/bin/gunzip -c $1' > decompress.sh
$ chmod +x decompress.sh

postgres=# COPY foo FROM '/home/pgsql/foo.csv.gz' WITH (format 'csv',
preprocessor '/home/pgsql/decompress.sh');

$ echo '/bin/gzip > $1' > compress.sh
$ chmod +x compress.sh

postgres=# COPY bar TO '/home/pgsql/bar.csv.gz' WITH (format 'csv',
postprocessor '/home/pgsql/compress.sh');

Attached is a WIP patch. Comments and questions are welcome.

(By using these options, I think it's also possible to develop a variant of
file_fdw, for example a compressed file wrapper and Hadoop HDFS wrapper.)

Thanks,

Best regards,
Etsuro Fujita

Attachments:

copy_options.patch (application/octet-stream; name=copy_options.patch) Download
diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index 81fc4e2..af1959b 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -272,7 +272,7 @@ file_fdw_validator(PG_FUNCTION_ARGS)
 	/*
 	 * Now apply the core COPY code's validation logic for more checks.
 	 */
-	ProcessCopyOptions(NULL, true, other_options);
+	ProcessCopyOptions(NULL, true, false, other_options);
 
 	/*
 	 * Filename option is required for file_fdw foreign tables.
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 0567ab0..252fc6e 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -100,6 +100,8 @@ typedef struct CopyStateData
 	EolType		eol_type;		/* EOL type of input */
 	int			file_encoding;	/* file or remote side's character encoding */
 	bool		need_transcoding;		/* file encoding diff from server? */
+	StringInfo	cmdbuf;			/* used to hold the whole command string
+								 * if either preproc or postproc is set */
 	bool		encoding_embeds_ascii;	/* ASCII can be non-first byte? */
 
 	/* parameters from the COPY command */
@@ -124,6 +126,8 @@ typedef struct CopyStateData
 	bool	   *force_notnull_flags;	/* per-column CSV FNN flags */
 	bool		convert_selectively;	/* do selective binary conversion? */
 	List	   *convert_select;	/* list of column names (can be NIL) */
+	char 	   *preproc;		/* preprocessor for COPY FROM */
+	char	   *postproc;		/* postprocessor for COPY TO */
 	bool	   *convert_select_flags;	/* per-column CSV/TEXT CS flags */
 
 	/* these are just for error messages, see CopyFromErrorCallback */
@@ -272,8 +276,9 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
 
 /* non-export function prototypes */
-static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
-		  const char *queryString, List *attnamelist, List *options);
+static CopyState BeginCopy(bool is_from, Relation rel,
+						   Node *raw_query, const char *queryString,
+						   bool is_pipe, List *attnamelist, List *options);
 static void EndCopy(CopyState cstate);
 static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
 			const char *filename, List *attnamelist, List *options);
@@ -851,6 +856,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
 void
 ProcessCopyOptions(CopyState cstate,
 				   bool is_from,
+				   bool is_pipe,
 				   List *options)
 {
 	bool		format_specified = false;
@@ -998,6 +1004,22 @@ ProcessCopyOptions(CopyState cstate,
 						 errmsg("argument to option \"%s\" must be a valid encoding name",
 								defel->defname)));
 		}
+		else if (strcmp(defel->defname, "preprocessor") == 0)
+		{
+			if (cstate->preproc)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			cstate->preproc = defGetString(defel);
+		}
+		else if (strcmp(defel->defname, "postprocessor") == 0)
+		{
+			if (cstate->postproc)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			cstate->postproc = defGetString(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1136,6 +1158,26 @@ ProcessCopyOptions(CopyState cstate,
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("CSV quote character must not appear in the NULL specification")));
+
+	/* Check preproc */
+	if (cstate->preproc && !is_from)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("cannot specify PREPROCESSOR in COPY TO")));
+	if (cstate->preproc && is_pipe)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("cannot specify PREPROCESSOR in COPY FROM STDIN")));
+
+	/* Check postproc */
+	if (cstate->postproc && is_from)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("cannot specify POSTPROCESSOR in COPY FROM")));
+	if (cstate->postproc && is_pipe)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("cannot specify POSTPROCESSOR in COPY TO STDOUT")));
 }
 
 /*
@@ -1158,6 +1200,7 @@ BeginCopy(bool is_from,
 		  Relation rel,
 		  Node *raw_query,
 		  const char *queryString,
+		  bool is_pipe,
 		  List *attnamelist,
 		  List *options)
 {
@@ -1182,7 +1225,7 @@ BeginCopy(bool is_from,
 	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
 	/* Extract options from the statement node tree */
-	ProcessCopyOptions(cstate, is_from, options);
+	ProcessCopyOptions(cstate, is_from, is_pipe, options);
 
 	/* Process the source/target relation or query */
 	if (rel)
@@ -1382,11 +1425,21 @@ BeginCopy(bool is_from,
 static void
 EndCopy(CopyState cstate)
 {
-	if (cstate->filename != NULL && FreeFile(cstate->copy_file))
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not close file \"%s\": %m",
-						cstate->filename)));
+	if (cstate->preproc != NULL || cstate->postproc != NULL)
+	{
+		if (ClosePipeStream(cstate->copy_file) == -1)
+			ereport(ERROR,
+					(errmsg("could not execute command \"%s\"",
+							cstate->cmdbuf->data)));
+	}
+	else
+	{
+		if (cstate->filename != NULL && FreeFile(cstate->copy_file))
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not close file \"%s\": %m",
+							cstate->filename)));
+	}
 
 	MemoryContextDelete(cstate->copycontext);
 	pfree(cstate);
@@ -1433,7 +1486,8 @@ BeginCopyTo(Relation rel,
 							RelationGetRelationName(rel))));
 	}
 
-	cstate = BeginCopy(false, rel, query, queryString, attnamelist, options);
+	cstate = BeginCopy(false, rel, query, queryString,
+					   pipe, attnamelist, options);
 	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
 	if (pipe)
@@ -1443,7 +1497,6 @@ BeginCopyTo(Relation rel,
 	}
 	else
 	{
-		mode_t		oumask;		/* Pre-existing umask value */
 		struct stat st;
 
 		/*
@@ -1456,15 +1509,42 @@ BeginCopyTo(Relation rel,
 					 errmsg("relative path not allowed for COPY to file")));
 
 		cstate->filename = pstrdup(filename);
-		oumask = umask(S_IWGRP | S_IWOTH);
-		cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
-		umask(oumask);
 
-		if (cstate->copy_file == NULL)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not open file \"%s\" for writing: %m",
-							cstate->filename)));
+		if (cstate->postproc != NULL)
+		{
+			char		my_exec_path[MAXPGPATH];
+
+			if (find_my_exec(cstate->postproc, my_exec_path) < 0)
+				elog(ERROR, "could not locate postprocessor \"%s\"",
+					 cstate->postproc);
+
+			cstate->cmdbuf = makeStringInfo();
+			appendStringInfoString(cstate->cmdbuf, my_exec_path);
+			appendStringInfoChar(cstate->cmdbuf, ' ');
+			appendStringInfoString(cstate->cmdbuf, cstate->filename);
+
+			cstate->copy_file = OpenPipeStream(cstate->cmdbuf->data,
+											   PG_BINARY_W);
+
+			if (cstate->copy_file == NULL)
+				ereport(ERROR,
+						(errmsg("could not execute command \"%s\": %m",
+								cstate->cmdbuf->data)));
+		}
+		else
+		{
+			mode_t		oumask;		/* Pre-existing umask value */
+
+			oumask = umask(S_IWGRP | S_IWOTH);
+			cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
+			umask(oumask);
+
+			if (cstate->copy_file == NULL)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not open file \"%s\" for writing: %m",
+								cstate->filename)));
+		}
 
 		fstat(fileno(cstate->copy_file), &st);
 		if (S_ISDIR(st.st_mode))
@@ -2288,7 +2368,7 @@ BeginCopyFrom(Relation rel,
 	MemoryContext oldcontext;
 	bool		volatile_defexprs;
 
-	cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
+	cstate = BeginCopy(true, rel, NULL, NULL, pipe, attnamelist, options);
 	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
 	/* Initialize state variables */
@@ -2379,13 +2459,38 @@ BeginCopyFrom(Relation rel,
 		struct stat st;
 
 		cstate->filename = pstrdup(filename);
-		cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
 
-		if (cstate->copy_file == NULL)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not open file \"%s\" for reading: %m",
-							cstate->filename)));
+		if (cstate->preproc != NULL)
+		{
+			char		my_exec_path[MAXPGPATH];
+
+			if (find_my_exec(cstate->preproc, my_exec_path) < 0)
+				elog(ERROR, "could not locate preprocessor \"%s\"",
+					 cstate->preproc);
+
+			cstate->cmdbuf = makeStringInfo();
+			appendStringInfoString(cstate->cmdbuf, my_exec_path);
+			appendStringInfoChar(cstate->cmdbuf, ' ');
+			appendStringInfoString(cstate->cmdbuf, cstate->filename);
+
+			cstate->copy_file = OpenPipeStream(cstate->cmdbuf->data,
+											   PG_BINARY_R);
+
+			if (cstate->copy_file == NULL)
+				ereport(ERROR,
+						(errmsg("could not execute command \"%s\": %m",
+								cstate->cmdbuf->data)));
+		}
+		else
+		{
+			cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+
+			if (cstate->copy_file == NULL)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not open file \"%s\" for reading: %m",
+								cstate->filename)));
+		}
 
 		fstat(fileno(cstate->copy_file), &st);
 		if (S_ISDIR(st.st_mode))
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 5894cb0..09796f4 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -547,8 +547,8 @@ static void processCASbits(int cas_bits, int location, const char *constrType,
 	ORDER OUT_P OUTER_P OVER OVERLAPS OVERLAY OWNED OWNER
 
 	PARSER PARTIAL PARTITION PASSING PASSWORD PLACING PLANS POSITION
-	PRECEDING PRECISION PRESERVE PREPARE PREPARED PRIMARY
-	PRIOR PRIVILEGES PROCEDURAL PROCEDURE
+	POSTPROCESSOR PRECEDING PRECISION PREPROCESSOR PRESERVE PREPARE PREPARED
+	PRIMARY PRIOR PRIVILEGES PROCEDURAL PROCEDURE
 
 	QUOTE
 
@@ -2360,6 +2360,14 @@ copy_opt_item:
 				{
 					$$ = makeDefElem("force_not_null", (Node *)$4);
 				}
+			| PREPROCESSOR Sconst
+				{
+					$$ = makeDefElem("preprocessor", (Node *)makeString($2));
+				}
+			| POSTPROCESSOR Sconst
+				{
+					$$ = makeDefElem("postprocessor", (Node *)makeString($2));
+				}
 			| ENCODING Sconst
 				{
 					$$ = makeDefElem("encoding", (Node *)makeString($2));
@@ -12520,9 +12528,11 @@ unreserved_keyword:
 			| PASSING
 			| PASSWORD
 			| PLANS
+			| POSTPROCESSOR
 			| PRECEDING
 			| PREPARE
 			| PREPARED
+			| PREPROCESSOR
 			| PRESERVE
 			| PRIOR
 			| PRIVILEGES
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index fed25fd..eaa594a 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -182,6 +182,7 @@ static uint64 temporary_files_size = 0;
 typedef enum
 {
 	AllocateDescFile,
+	AllocateDescPipe,
 	AllocateDescDir
 } AllocateDescKind;
 
@@ -1542,6 +1543,56 @@ TryAgain:
 	return NULL;
 }
 
+FILE *
+OpenPipeStream(const char *command, const char *mode)
+{
+	FILE	   *file;
+
+	DO_DB(elog(LOG, "OpenPipeStream: Allocated %d (%s)",
+			   numAllocatedDescs, command));
+
+	/*
+	 * The test against MAX_ALLOCATED_DESCS prevents us from overflowing
+	 * allocatedFiles[]; the test against max_safe_fds prevents AllocateFile
+	 * from hogging every one of the available FDs, which'd lead to infinite
+	 * looping.
+	 */
+	if (numAllocatedDescs >= MAX_ALLOCATED_DESCS ||
+		numAllocatedDescs >= max_safe_fds - 1)
+		elog(ERROR, "exceeded MAX_ALLOCATED_DESCS while trying to execute command \"%s\"",
+			 command);
+
+TryAgain:
+	fflush(stdout);
+	fflush(stderr);
+	errno = 0;
+	if ((file = popen(command, mode)) != NULL)
+	{
+		AllocateDesc *desc = &allocatedDescs[numAllocatedDescs];
+
+		desc->kind = AllocateDescPipe;
+		desc->desc.file = file;
+		desc->create_subid = GetCurrentSubTransactionId();
+		numAllocatedDescs++;
+		return desc->desc.file;
+	}
+
+	if (errno == EMFILE || errno == ENFILE)
+	{
+		int			save_errno = errno;
+
+		ereport(LOG,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("out of file descriptors: %m; release and retry")));
+		errno = 0;
+		if (ReleaseLruFile())
+			goto TryAgain;
+		errno = save_errno;
+	}
+
+	return NULL;
+}
+
 /*
  * Free an AllocateDesc of either type.
  *
@@ -1558,6 +1609,9 @@ FreeDesc(AllocateDesc *desc)
 		case AllocateDescFile:
 			result = fclose(desc->desc.file);
 			break;
+		case AllocateDescPipe:
+			result = pclose_check(desc->desc.file);
+			break;
 		case AllocateDescDir:
 			result = closedir(desc->desc.dir);
 			break;
@@ -1602,6 +1656,28 @@ FreeFile(FILE *file)
 	return fclose(file);
 }
 
+int
+ClosePipeStream(FILE *file)
+{
+	int			i;
+
+	DO_DB(elog(LOG, "ClosePipeStream: Allocated %d", numAllocatedDescs));
+
+	/* Remove file from list of allocated files, if it's present */
+	for (i = numAllocatedDescs; --i >= 0;)
+	{
+		AllocateDesc *desc = &allocatedDescs[i];
+
+		if (desc->kind == AllocateDescPipe && desc->desc.file == file)
+			return FreeDesc(desc);
+	}
+
+	/* Only get here if someone passes us a file not in allocatedDescs */
+	elog(WARNING, "file passed to ClosePipeStream was not obtained from OpenPipeStream");
+
+	return pclose_check(file);
+}
+
 
 /*
  * Routines that want to use <dirent.h> (ie, DIR*) should use AllocateDir
diff --git a/src/bin/psql/copy.c b/src/bin/psql/copy.c
index 22fcc59..38a6c52 100644
--- a/src/bin/psql/copy.c
+++ b/src/bin/psql/copy.c
@@ -54,6 +54,9 @@ struct copy_options
 	char	   *file;			/* NULL = stdin/stdout */
 	bool		psql_inout;		/* true = use psql stdin/stdout */
 	bool		from;			/* true = FROM, false = TO */
+	char	   *proc;			/* preprocessor/postprocessor, if any */
+	char	   *command;		/* used to hold the whole command string
+								 * if preprocessor/postprocessor is set */
 };
 
 
@@ -65,6 +68,8 @@ free_copy_options(struct copy_options * ptr)
 	free(ptr->before_tofrom);
 	free(ptr->after_tofrom);
 	free(ptr->file);
+	free(ptr->proc);
+	free(ptr->command);
 	free(ptr);
 }
 
@@ -88,6 +93,8 @@ parse_slash_copy(const char *args)
 {
 	struct copy_options *result;
 	char	   *token;
+	char	   *prev_token;
+	char	   *rest;
 	const char *whitespace = " \t\n\r";
 	char		nonstd_backslash = standard_strings() ? 0 : '\\';
 
@@ -216,10 +223,76 @@ parse_slash_copy(const char *args)
 	}
 
 	/* Collect the rest of the line (COPY options) */
-	token = strtokx(NULL, "", NULL, NULL,
+	rest = strtokx(NULL, "", NULL, NULL,
+				   0, false, false, pset.encoding);
+	if (!rest ||
+		(!strstr(rest, "preprocessor") && !strstr(rest, "postprocessor")))
+	{
+		if (rest)
+			result->after_tofrom = pg_strdup(rest);
+		return result;
+	}
+	result->after_tofrom = pg_strdup("");
+	token = strtokx(rest, whitespace, ",()", "\"'",
 					0, false, false, pset.encoding);
-	if (token)
-		result->after_tofrom = pg_strdup(token);
+	while (token)
+	{
+		if (pg_strcasecmp(token, "preprocessor") == 0 ||
+			pg_strcasecmp(token, "postprocessor") == 0)
+		{
+			if ( prev_token[0] != ',' && prev_token[0] != '(')
+				goto error;
+
+			if (result->proc != NULL)
+				goto error;
+			if (result->file == NULL)
+				goto error;
+			if (!result->from && pg_strcasecmp(token, "preprocessor") == 0)
+				goto error;
+			if (result->from && pg_strcasecmp(token, "postprocessor") == 0)
+				goto error;
+
+			token = strtokx(NULL, whitespace, NULL, "'",
+							0, false, true, pset.encoding);
+			if (!token)
+				goto error;
+			result->proc = pg_strdup(token);
+
+			token = strtokx(NULL, whitespace, ",()", "\"'",
+							0, false, false, pset.encoding);
+			if (!token)
+				goto error;
+			if (token[0] != ',' && token[0] != ')')
+				goto error;
+			if (token[0] == ')')
+			{
+				int			len;
+
+				token = strtokx(NULL, whitespace, ",()", "\"'",
+								0, false, false, pset.encoding);
+				if (token)
+					goto error;
+				if (prev_token[0] == ',')
+				{
+					len = strlen(result->after_tofrom);
+					result->after_tofrom[len - 1] = ')';
+				}
+				if (prev_token[0] == '(')
+					result->after_tofrom = pg_strdup("");
+				break;
+			}
+
+			token = strtokx(NULL, whitespace, NULL, "\"'",
+							0, false, false, pset.encoding);
+			if (!token)
+				goto error;
+		}
+		xstrcat(&result->after_tofrom, " ");
+		xstrcat(&result->after_tofrom, token);
+		prev_token = token;
+		token = strtokx(NULL, whitespace, ".,()", "\"'",
+						0, false, false, pset.encoding);
+	}
 
 	return result;
 
@@ -260,12 +333,48 @@ do_copy(const char *args)
 	if (options->file)
 		canonicalize_path(options->file);
 
+	if (options->proc)
+	{
+		PQExpBufferData cmdbuf;
+		char		full_path[MAXPGPATH];
+		int			ret;
+
+		ret = find_my_exec(options->proc, full_path);
+		if (ret)
+		{
+			if (options->from)
+				psql_error("could not locate preprocessor: %s\n",
+						   options->proc);
+			else
+				psql_error("could not locate postprocessor: %s\n",
+						   options->proc);
+			free_copy_options(options);
+			return false;
+		}
+		initPQExpBuffer(&cmdbuf);
+		appendPQExpBufferStr(&cmdbuf, full_path);
+		appendPQExpBuffer(&cmdbuf, " ");
+		appendPQExpBufferStr(&cmdbuf, options->file);
+		options->command = pg_strdup(cmdbuf.data);
+		termPQExpBuffer(&cmdbuf);
+	}
+
 	if (options->from)
 	{
 		override_file = &pset.cur_cmd_source;
 
 		if (options->file)
-			copystream = fopen(options->file, PG_BINARY_R);
+		{
+			if (!options->proc)
+				copystream = fopen(options->file, PG_BINARY_R);
+			else
+			{
+				fflush(stdout);
+				fflush(stderr);
+				errno = 0;
+				copystream = popen(options->command, PG_BINARY_R);
+			}
+		}
 		else if (!options->psql_inout)
 			copystream = pset.cur_cmd_source;
 		else
@@ -276,7 +385,17 @@ do_copy(const char *args)
 		override_file = &pset.queryFout;
 
 		if (options->file)
-			copystream = fopen(options->file, PG_BINARY_W);
+		{
+			if (!options->proc)
+				copystream = fopen(options->file, PG_BINARY_W);
+			else
+			{
+				fflush(stdout);
+				fflush(stderr);
+				errno = 0;
+				copystream = popen(options->command, PG_BINARY_W);
+			}
+		}
 		else if (!options->psql_inout)
 			copystream = pset.queryFout;
 		else
@@ -285,8 +404,12 @@ do_copy(const char *args)
 
 	if (!copystream)
 	{
-		psql_error("%s: %s\n",
-				   options->file, strerror(errno));
+		if (!options->proc)
+			psql_error("%s: %s\n",
+					   options->file, strerror(errno));
+		else
+			psql_error("could not execute command \"%s\": %s\n",
+					   options->command, strerror(errno));
 		free_copy_options(options);
 		return false;
 	}
@@ -322,10 +445,23 @@ do_copy(const char *args)
 
 	if (options->file != NULL)
 	{
-		if (fclose(copystream) != 0)
+		if (!options->proc)
 		{
-			psql_error("%s: %s\n", options->file, strerror(errno));
-			success = false;
+			if (fclose(copystream) != 0)
+			{
+				psql_error("%s: %s\n",
+						   options->file, strerror(errno));
+				success = false;
+			}
+		}
+		else
+		{
+			if (pclose_check(copystream) == -1)
+			{
+				psql_error("could not execute command \"%s\"\n",
+						   options->command);
+				success = false;
+			}
 		}
 	}
 	free_copy_options(options);
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 8680ac3..b6ab6a1 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -23,7 +23,8 @@ typedef struct CopyStateData *CopyState;
 
 extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString);
 
-extern void ProcessCopyOptions(CopyState cstate, bool is_from, List *options);
+extern void ProcessCopyOptions(CopyState cstate, bool is_from,
+							   bool is_pipe, List *options);
 extern CopyState BeginCopyFrom(Relation rel, const char *filename,
 			  List *attnamelist, List *options);
 extern void EndCopyFrom(CopyState cstate);
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index af60dac..f0cf0a4 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -282,10 +282,12 @@ PG_KEYWORD("password", PASSWORD, UNRESERVED_KEYWORD)
 PG_KEYWORD("placing", PLACING, RESERVED_KEYWORD)
 PG_KEYWORD("plans", PLANS, UNRESERVED_KEYWORD)
 PG_KEYWORD("position", POSITION, COL_NAME_KEYWORD)
+PG_KEYWORD("postprocessor", POSTPROCESSOR, UNRESERVED_KEYWORD)
 PG_KEYWORD("preceding", PRECEDING, UNRESERVED_KEYWORD)
 PG_KEYWORD("precision", PRECISION, COL_NAME_KEYWORD)
 PG_KEYWORD("prepare", PREPARE, UNRESERVED_KEYWORD)
 PG_KEYWORD("prepared", PREPARED, UNRESERVED_KEYWORD)
+PG_KEYWORD("preprocessor", PREPROCESSOR, UNRESERVED_KEYWORD)
 PG_KEYWORD("preserve", PRESERVE, UNRESERVED_KEYWORD)
 PG_KEYWORD("primary", PRIMARY, RESERVED_KEYWORD)
 PG_KEYWORD("prior", PRIOR, UNRESERVED_KEYWORD)
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index bad9f10..69d5388 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -80,6 +80,10 @@ extern char *FilePathName(File file);
 extern FILE *AllocateFile(const char *name, const char *mode);
 extern int	FreeFile(FILE *file);
 
+/* Operations that allow use of pipe stream */
+extern FILE *OpenPipeStream(const char *command, const char *mode);
+extern int	ClosePipeStream(FILE *file);
+
 /* Operations to allow use of the <dirent.h> library routines */
 extern DIR *AllocateDir(const char *dirname);
 extern struct dirent *ReadDir(DIR *dir, const char *dirname);
#2Alvaro Herrera
alvherre@2ndquadrant.com
In reply to: Etsuro Fujita (#1)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

Excerpts from Etsuro Fujita's message of jue sep 13 06:13:26 -0300 2012:

I'd like to add the following options to the SQL COPY command and the psql \copy
instruction:

* PREPROCESSOR: Specifies the user-supplied program for COPY IN. The data
from an input file is preprocessed by the program before the data is loaded into
a postgres table.
* POSTPROCESSOR: Specifies the user-supplied program for COPY OUT. The data
from a postgres table is postprocessed by the program before the data is stored
in an output file.

External programs? I don't like the sound of that; there are all kinds of
open questions, security concerns, and process management problems.
What if the pre- and postprocessors were functions instead?

--
Álvaro Herrera http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

#3Pavel Stehule
pavel.stehule@gmail.com
In reply to: Alvaro Herrera (#2)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

2012/9/13 Alvaro Herrera <alvherre@2ndquadrant.com>:

Excerpts from Etsuro Fujita's message of jue sep 13 06:13:26 -0300 2012:

I'd like to add the following options to the SQL COPY command and the psql \copy
instruction:

* PREPROCESSOR: Specifies the user-supplied program for COPY IN. The data
from an input file is preprocessed by the program before the data is loaded into
a postgres table.
* POSTPROCESSOR: Specifies the user-supplied program for COPY OUT. The data
from a postgres table is postprocessed by the program before the data is stored
in an output file.

External programs? I don't like the sound of that; there are all kinds of
open questions, security concerns, and process management problems.
What if the pre- and postprocessors were functions instead?

+1

this can be solved via pipe and outer processes

Pavel

Show quoted text

--
Álvaro Herrera http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#4Tom Lane
tgl@sss.pgh.pa.us
In reply to: Etsuro Fujita (#1)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

"Etsuro Fujita" <fujita.etsuro@lab.ntt.co.jp> writes:

I'd like to add the following options to the SQL COPY command and the psql \copy
instruction:

* PREPROCESSOR: Specifies the user-supplied program for COPY IN. The data
from an input file is preprocessed by the program before the data is loaded into
a postgres table.
* POSTPROCESSOR: Specifies the user-supplied program for COPY OUT. The data
from a postgres table is postprocessed by the program before the data is stored
in an output file.

The proposed patch causes the external processor programs to execute
with the privileges of the database server, which seems like a pretty
horrid idea. At the very least this would imply limiting use of the
feature to superusers, which greatly restricts its use-case.

I think it would be a lot better if this were designed so that the
processor programs executed on client side. Which would probably make
it not a COPY patch at all, but something in psql.

regards, tom lane

#5Dimitri Fontaine
dimitri@2ndQuadrant.fr
In reply to: Tom Lane (#4)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

Tom Lane <tgl@sss.pgh.pa.us> writes:

I think it would be a lot better if this were designed so that the
processor programs executed on client side. Which would probably make
it not a COPY patch at all, but something in psql.

And pgloader, which already has a part of that feature with the per
column reformatting facility.

Regards,
--
Dimitri Fontaine
http://2ndQuadrant.fr PostgreSQL : Expertise, Formation et Support

#6Andrew Dunstan
andrew@dunslane.net
In reply to: Dimitri Fontaine (#5)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On 09/13/2012 01:20 PM, Dimitri Fontaine wrote:

Tom Lane <tgl@sss.pgh.pa.us> writes:

I think it would be a lot better if this were designed so that the
processor programs executed on client side. Which would probably make
it not a COPY patch at all, but something in psql.

And pgloader, which already has a part of that feature with the per
column reformatting facility.

Yeah, I'd be inclined to say that pre/post processing of this kind is
really a job for specialized clients.

cheers

andrew

#7Etsuro Fujita
fujita.etsuro@lab.ntt.co.jp
In reply to: Andrew Dunstan (#6)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

OK I will redesign the function.

Thanks everyone for the advice!

Best regards,
Etsuro Fujita

Show quoted text

-----Original Message-----
From: Andrew Dunstan [mailto:andrew@dunslane.net]
Sent: Friday, September 14, 2012 2:27 AM
To: Dimitri Fontaine
Cc: Tom Lane; Etsuro Fujita; pgsql-hackers@postgresql.org
Subject: Re: [HACKERS] WIP patch: add (PRE|POST)PROCESSOR options to COPY

On 09/13/2012 01:20 PM, Dimitri Fontaine wrote:

Tom Lane <tgl@sss.pgh.pa.us> writes:

I think it would be a lot better if this were designed so that the
processor programs executed on client side. Which would probably make
it not a COPY patch at all, but something in psql.

And pgloader, which already has a part of that feature with the per
column reformatting facility.

Yeah, I'd be inclined to say that pre/post processing of this kind is
really a job for specialized clients.

cheers

andrew

#8Craig Ringer
ringerc@ringerc.id.au
In reply to: Tom Lane (#4)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On 09/13/2012 10:25 PM, Tom Lane wrote:

I think it would be a lot better if this were designed so that the
processor programs executed on client side. Which would probably make
it not a COPY patch at all, but something in psql.

Either that, or allow the pre- and post- processors to be programs
written in a (possibly trusted) PL.

I can't say I really see the point though, when it's easy to just filter
the csv through a pipeline.

--
Craig Ringer

#9Etsuro Fujita
fujita.etsuro@lab.ntt.co.jp
In reply to: Craig Ringer (#8)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

From: Craig Ringer [mailto:ringerc@ringerc.id.au]

On 09/13/2012 10:25 PM, Tom Lane wrote:

I think it would be a lot better if this were designed so that the
processor programs executed on client side. Which would probably make
it not a COPY patch at all, but something in psql.

Maybe my explanation was insufficient. Let me add one thing to my earlier
explanation. The submitted patch allows the psql \copy instruction to be
executed like:

$ echo '/bin/gunzip -c $1' > decompress.sh
$ chmod +x decompress.sh

postgres=# \copy foo FROM '/home/pgsql/foo.csv.gz' WITH (format 'csv',
preprocessor '/home/pgsql/decompress.sh')

In this example, command "/home/pgsql/decompress.sh /home/pgsql/foo.csv.gz" is
executed on client side, by using popen(), and command "COPY foo FROM STDIN WITH
(format 'csv')" is sent to backend. I apologize for not providing you with
enough explanation.

Either that, or allow the pre- and post- processors to be programs
written in a (possibly trusted) PL.

I would like to add the hooks not only for the psql \copy instruction, but also
for the SQL COPY command, because I think the hooks for the SQL COPY command
would make it possible to implement variants of contrib/file_fdw such as a
compressed file FDW and a Hadoop HDFS FDW, etc., which I think would be useful
especially for DWH environments.

Thanks,

Best regards,
Etsuro Fujita

#10Tom Lane
tgl@sss.pgh.pa.us
In reply to: Etsuro Fujita (#9)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

"Etsuro Fujita" <fujita.etsuro@lab.ntt.co.jp> writes:

Maybe my explanation was insufficient. Let me add one thing to my earlier
explanation. The submitted patch allows the psql \copy instruction to be
executed like:

$ echo '/bin/gunzip -c $1' > decompress.sh
$ chmod +x decompress.sh

postgres=# \copy foo FROM '/home/pgsql/foo.csv.gz' WITH (format 'csv',
preprocessor '/home/pgsql/decompress.sh')

In this example, command "/home/pgsql/decompress.sh /home/pgsql/foo.csv.gz" is
executed on client side, by using popen(), and command "COPY foo FROM STDIN WITH
(format 'csv')" is sent to backend. I apologize for not providing you with
enough explanation.

Well, in that case, you've got not only an explanation problem but a
syntax problem, because that syntax is utterly misleading. Anybody
looking at it would think that the "format" option is one of the options
being sent to the backend. The code required to pull it out of there
has got to be grossly overcomplicated (and likely bugprone), too.

I think it would be better to present this as something like

\copy foo from '/home/pgsql/decompress.sh /home/pgsql/foo.csv.gz |' with format 'csv'

which would cue any reasonably Unix-savvy person that what's happening
is a popen on the client side. It'd probably be a whole lot less
complicated to implement, too.

regards, tom lane

#11Etsuro Fujita
fujita.etsuro@lab.ntt.co.jp
In reply to: Tom Lane (#10)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

From: Tom Lane [mailto:tgl@sss.pgh.pa.us]

"Etsuro Fujita" <fujita.etsuro@lab.ntt.co.jp> writes:

Maybe my explanation was insufficient. Let me add one thing to my earlier
explanation. The submitted patch allows the psql \copy instruction to be
executed like:

$ echo '/bin/gunzip -c $1' > decompress.sh
$ chmod +x decompress.sh

postgres=# \copy foo FROM '/home/pgsql/foo.csv.gz' WITH (format 'csv',
preprocessor '/home/pgsql/decompress.sh')

Well, in that case, you've got not only an explanation problem but a
syntax problem, because that syntax is utterly misleading. Anybody
looking at it would think that the "format" option is one of the options
being sent to the backend. The code required to pull it out of there
has got to be grossly overcomplicated (and likely bugprone), too.

I think it would be better to present this as something like

\copy foo from '/home/pgsql/decompress.sh /home/pgsql/foo.csv.gz |' with
format 'csv'

which would cue any reasonably Unix-savvy person that what's happening
is a popen on the client side. It'd probably be a whole lot less
complicated to implement, too.

Great!

I have a question. I think it would be also better to extend the syntax for the
SQL COPY command in the same way, ie,

COPY foo from '/home/pgsql/decompress.sh /home/pgsql/foo.csv.gz |' with format
'csv'

Is this okay?

Thanks,

Best regards,
Etsuro Fujita

#12Tom Lane
tgl@sss.pgh.pa.us
In reply to: Etsuro Fujita (#11)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

"Etsuro Fujita" <fujita.etsuro@lab.ntt.co.jp> writes:

From: Tom Lane [mailto:tgl@sss.pgh.pa.us]
I think it would be better to present this as something like
\copy foo from '/home/pgsql/decompress.sh /home/pgsql/foo.csv.gz |' with
format 'csv'

I have a question. I think it would be also better to extend the syntax for the
SQL COPY command in the same way, ie,
COPY foo from '/home/pgsql/decompress.sh /home/pgsql/foo.csv.gz |' with format
'csv'

Yeah, sure --- that case is already superuser-only, so why not give it
the option of being a popen instead of just fopen. My objection was
only that it sounded like you were providing *only* the ability to run
the external processors on the server side. Providing popen on both
sides seems completely sensible.

regards, tom lane

#13Tom Lane
tgl@sss.pgh.pa.us
In reply to: Tom Lane (#12)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

I wrote:

"Etsuro Fujita" <fujita.etsuro@lab.ntt.co.jp> writes:

I have a question. I think it would be also better to extend the syntax for the
SQL COPY command in the same way, ie,
COPY foo from '/home/pgsql/decompress.sh /home/pgsql/foo.csv.gz |' with format
'csv'

Yeah, sure --- that case is already superuser-only, so why not give it
the option of being a popen instead of just fopen.

BTW, one thought that comes to mind is that such an operation is
extremely likely to fail under environments such as SELinux. That's
not necessarily a reason not to do it, but we should be wary of
promising that it will work everywhere. Probably a documentation note
about this would be enough.

regards, tom lane

#14Etsuro Fujita
fujita.etsuro@lab.ntt.co.jp
In reply to: Tom Lane (#13)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

From: Tom Lane [mailto:tgl@sss.pgh.pa.us]

I wrote:

"Etsuro Fujita" <fujita.etsuro@lab.ntt.co.jp> writes:

I have a question. I think it would be also better to extend the syntax
for the SQL COPY command in the same way, ie,
COPY foo from '/home/pgsql/decompress.sh /home/pgsql/foo.csv.gz |' with
format 'csv'

Yeah, sure --- that case is already superuser-only, so why not give it
the option of being a popen instead of just fopen.

BTW, one thought that comes to mind is that such an operation is
extremely likely to fail under environments such as SELinux. That's
not necessarily a reason not to do it, but we should be wary of
promising that it will work everywhere. Probably a documentation note
about this would be enough.

OK I'll revise the patch.

Thank you for your advice!

Best regards,
Etsuro Fujita

#15Etsuro Fujita
fujita.etsuro@lab.ntt.co.jp
In reply to: Etsuro Fujita (#14)
1 attachment(s)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

I wrote:

From: Tom Lane [mailto:tgl@sss.pgh.pa.us]

I wrote:

"Etsuro Fujita" <fujita.etsuro@lab.ntt.co.jp> writes:

I have a question. I think it would be also better to extend the syntax
for the SQL COPY command in the same way, ie,
COPY foo from '/home/pgsql/decompress.sh /home/pgsql/foo.csv.gz |' with
format 'csv'

Yeah, sure --- that case is already superuser-only, so why not give it
the option of being a popen instead of just fopen.

BTW, one thought that comes to mind is that such an operation is
extremely likely to fail under environments such as SELinux. That's
not necessarily a reason not to do it, but we should be wary of
promising that it will work everywhere. Probably a documentation note
about this would be enough.

OK I'll revise the patch.

I've revised the patch. In this version a user can specify hooks for pre- and
post-processor executables for COPY and \copy in the following way:

$ echo '/bin/gunzip -c $1' > decompress.sh
$ chmod +x decompress.sh

In the case of the COPY command,

postgres=# COPY foo FROM '/home/pgsql/decompress.sh /home/pgsql/foo.csv.gz'
WITH (format 'csv');

Also, in the case of the \copy instruction,

postgres=# \copy foo from '/home/pgsql/decompress.sh /home/pgsql/foo.csv.gz'
with (format 'csv')

As shown in the example above, I've assumed that the syntax for this option for
e.g., the COPY command is:

COPY table_name FROM 'progname filename' WITH ...
COPY table_name TO 'progname filename' WITH ...

Here, progname for COPY IN is the user-supplied program that takes filename as
its argument and that writes on standard output. Also, progname for COPY OUT is
the user-supplied program that reads standard input and writes to filename taken
as its argument. This makes simple the identification and verification of
progname and filename.

Todo:
* Documentation including documentation note about the limitation for
environments such as SELinux mentioned by Tom.
* More test

Any comments and suggestions are welcomed.

Thanks,

Best regards,
Etsuro Fujita

Attachments:

copy-popen-20121114.patchapplication/octet-stream; name=copy-popen-20121114.patchDownload
diff --git a/postgresql-ef28e05/src/backend/commands/copy.c b/postgresql-ef28e05/src/backend/commands/copy.c
index 0567ab0..30a7f19 100644
--- a/postgresql-ef28e05/src/backend/commands/copy.c
+++ b/postgresql-ef28e05/src/backend/commands/copy.c
@@ -51,6 +51,8 @@
 #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
 #define OCTVALUE(c) ((c) - '0')
 
+#define WHITESPACE " \t"
+
 /*
  * Represents the different source/dest cases we need to worry about at
  * the bottom level
@@ -107,6 +109,8 @@ typedef struct CopyStateData
 	QueryDesc  *queryDesc;		/* executable query to copy from */
 	List	   *attnumlist;		/* integer list of attnums to copy */
 	char	   *filename;		/* filename, or NULL for STDIN/STDOUT */
+	char	   *progname;		/* used for popen */
+	StringInfo	cmdbuf;			/* used for popen */
 	bool		binary;			/* binary format? */
 	bool		oids;			/* include OIDs? */
 	bool		csv_mode;		/* Comma Separated Value format? */
@@ -1382,11 +1386,21 @@ BeginCopy(bool is_from,
 static void
 EndCopy(CopyState cstate)
 {
-	if (cstate->filename != NULL && FreeFile(cstate->copy_file))
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not close file \"%s\": %m",
-						cstate->filename)));
+	if (cstate->progname != NULL)
+	{
+		if (ClosePipeStream(cstate->copy_file) == -1)
+			ereport(ERROR,
+					(errmsg("could not execute command \"%s\"",
+							cstate->cmdbuf->data)));
+	}
+	else
+	{
+		if (cstate->filename != NULL && FreeFile(cstate->copy_file))
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not close file \"%s\": %m",
+							cstate->filename)));
+	}
 
 	MemoryContextDelete(cstate->copycontext);
 	pfree(cstate);
@@ -1443,28 +1457,81 @@ BeginCopyTo(Relation rel,
 	}
 	else
 	{
-		mode_t		oumask;		/* Pre-existing umask value */
 		struct stat st;
+		char*		buffer = NULL;
+		char*		token;
+		int			count = 0;
+
+		buffer = pstrdup(filename);
+		token = strtok(buffer, WHITESPACE);
+		Assert(token != NULL);
+		while (token != NULL) {
+			count++;
+			token = strtok(NULL, WHITESPACE);
+		}
+
+		if (count != 1 && count !=2)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("argument \"%s\" not recognized", filename)));
+		if (count == 1)
+		{
+			cstate->progname = NULL;
+			cstate->filename = pstrdup(filename);
+		}
+		if (count == 2)
+		{
+			buffer = pstrdup(filename);
+			token = strtok(buffer, WHITESPACE);
+			cstate->progname = pstrdup(token);
+			token = strtok(NULL, WHITESPACE);
+			cstate->filename = pstrdup(token);
+		}
 
 		/*
 		 * Prevent write to relative path ... too easy to shoot oneself in the
 		 * foot by overwriting a database file ...
 		 */
-		if (!is_absolute_path(filename))
+		if (!is_absolute_path(cstate->filename))
 			ereport(ERROR,
 					(errcode(ERRCODE_INVALID_NAME),
 					 errmsg("relative path not allowed for COPY to file")));
 
-		cstate->filename = pstrdup(filename);
-		oumask = umask(S_IWGRP | S_IWOTH);
-		cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
-		umask(oumask);
+		if (cstate->progname != NULL)
+		{
+			char		my_exec_path[MAXPGPATH];
 
-		if (cstate->copy_file == NULL)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not open file \"%s\" for writing: %m",
-							cstate->filename)));
+			if (find_my_exec(cstate->progname, my_exec_path) < 0)
+				elog(ERROR, "could not locate command \"%s\"",
+					 cstate->progname);
+
+			cstate->cmdbuf = makeStringInfo();
+			appendStringInfoString(cstate->cmdbuf, my_exec_path);
+			appendStringInfoChar(cstate->cmdbuf, ' ');
+			appendStringInfoString(cstate->cmdbuf, cstate->filename);
+
+			cstate->copy_file = OpenPipeStream(cstate->cmdbuf->data,
+											   PG_BINARY_W);
+
+			if (cstate->copy_file == NULL)
+				ereport(ERROR,
+						(errmsg("could not execute command \"%s\": %m",
+								cstate->cmdbuf->data)));
+		}
+		else
+		{
+			mode_t		oumask;		/* Pre-existing umask value */
+
+			oumask = umask(S_IWGRP | S_IWOTH);
+			cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
+			umask(oumask);
+
+			if (cstate->copy_file == NULL)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not open file \"%s\" for writing: %m",
+								cstate->filename)));
+		}
 
 		fstat(fileno(cstate->copy_file), &st);
 		if (S_ISDIR(st.st_mode))
@@ -2377,15 +2444,67 @@ BeginCopyFrom(Relation rel,
 	else
 	{
 		struct stat st;
+		char*		buffer = NULL;
+		char*		token;
+		int			count = 0;
 
-		cstate->filename = pstrdup(filename);
-		cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+		buffer = pstrdup(filename);
+		token = strtok(buffer, WHITESPACE);
+		Assert(token != NULL);
+		while (token != NULL) {
+			count++;
+			token = strtok(NULL, WHITESPACE);
+		}
 
-		if (cstate->copy_file == NULL)
+		if (count != 1 && count !=2)
 			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not open file \"%s\" for reading: %m",
-							cstate->filename)));
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("argument \"%s\" not recognized", filename)));
+		if (count == 1)
+		{
+			cstate->progname = NULL;
+			cstate->filename = pstrdup(filename);
+		}
+		if (count == 2)
+		{
+			buffer = pstrdup(filename);
+			token = strtok(buffer, WHITESPACE);
+			cstate->progname = pstrdup(token);
+			token = strtok(NULL, WHITESPACE);
+			cstate->filename = pstrdup(token);
+		}
+
+		if (cstate->progname != NULL)
+		{
+			char		my_exec_path[MAXPGPATH];
+
+			if (find_my_exec(cstate->progname, my_exec_path) < 0)
+				elog(ERROR, "could not locate command \"%s\"",
+					 cstate->progname);
+
+			cstate->cmdbuf = makeStringInfo();
+			appendStringInfoString(cstate->cmdbuf, my_exec_path);
+			appendStringInfoChar(cstate->cmdbuf, ' ');
+			appendStringInfoString(cstate->cmdbuf, cstate->filename);
+
+			cstate->copy_file = OpenPipeStream(cstate->cmdbuf->data,
+											   PG_BINARY_R);
+
+			if (cstate->copy_file == NULL)
+				ereport(ERROR,
+						(errmsg("could not execute command \"%s\": %m",
+								cstate->cmdbuf->data)));
+		}
+		else
+		{
+			cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+
+			if (cstate->copy_file == NULL)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not open file \"%s\" for reading: %m",
+								cstate->filename)));
+		}
 
 		fstat(fileno(cstate->copy_file), &st);
 		if (S_ISDIR(st.st_mode))
diff --git a/postgresql-ef28e05/src/backend/storage/file/fd.c b/postgresql-ef28e05/src/backend/storage/file/fd.c
index ecb62ba..c679657 100644
--- a/postgresql-ef28e05/src/backend/storage/file/fd.c
+++ b/postgresql-ef28e05/src/backend/storage/file/fd.c
@@ -183,6 +183,7 @@ static uint64 temporary_files_size = 0;
 typedef enum
 {
 	AllocateDescFile,
+	AllocateDescPipe,
 	AllocateDescDir
 } AllocateDescKind;
 
@@ -1539,6 +1540,9 @@ FreeDesc(AllocateDesc *desc)
 		case AllocateDescFile:
 			result = fclose(desc->desc.file);
 			break;
+		case AllocateDescPipe:
+			result = pclose_check(desc->desc.file);
+			break;
 		case AllocateDescDir:
 			result = closedir(desc->desc.dir);
 			break;
@@ -1583,6 +1587,77 @@ FreeFile(FILE *file)
 	return fclose(file);
 }
 
+FILE *
+OpenPipeStream(const char *command, const char *mode)
+{
+	FILE	   *file;
+
+	DO_DB(elog(LOG, "OpenPipeStream: Allocated %d (%s)",
+			   numAllocatedDescs, command));
+
+	/*
+	 * The test against MAX_ALLOCATED_DESCS prevents us from overflowing
+	 * allocatedFiles[]; the test against max_safe_fds prevents AllocateFile
+	 * from hogging every one of the available FDs, which'd lead to infinite
+	 * looping.
+	 */
+	if (numAllocatedDescs >= MAX_ALLOCATED_DESCS ||
+		numAllocatedDescs >= max_safe_fds - 1)
+		elog(ERROR, "exceeded MAX_ALLOCATED_DESCS while trying to execute command \"%s\"",
+			 command);
+
+TryAgain:
+	fflush(stdout);
+	fflush(stderr);
+	errno = 0;
+	if ((file = popen(command, mode)) != NULL)
+	{
+		AllocateDesc *desc = &allocatedDescs[numAllocatedDescs];
+
+		desc->kind = AllocateDescPipe;
+		desc->desc.file = file;
+		desc->create_subid = GetCurrentSubTransactionId();
+		numAllocatedDescs++;
+		return desc->desc.file;
+	}
+
+	if (errno == EMFILE || errno == ENFILE)
+	{
+		int			save_errno = errno;
+
+		ereport(LOG,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("out of file descriptors: %m; release and retry")));
+		errno = 0;
+		if (ReleaseLruFile())
+			goto TryAgain;
+		errno = save_errno;
+	}
+
+	return NULL;
+}
+
+int
+ClosePipeStream(FILE *file)
+{
+	int			i;
+
+	DO_DB(elog(LOG, "ClosePipeStream: Allocated %d", numAllocatedDescs));
+
+	/* Remove file from list of allocated files, if it's present */
+	for (i = numAllocatedDescs; --i >= 0;)
+	{
+		AllocateDesc *desc = &allocatedDescs[i];
+
+		if (desc->kind == AllocateDescPipe && desc->desc.file == file)
+			return FreeDesc(desc);
+	}
+
+	/* Only get here if someone passes us a file not in allocatedDescs */
+	elog(WARNING, "file passed to ClosePipeStream was not obtained from OpenPipeStream");
+
+	return pclose_check(file);
+}
 
 /*
  * Routines that want to use <dirent.h> (ie, DIR*) should use AllocateDir
diff --git a/postgresql-ef28e05/src/bin/psql/copy.c b/postgresql-ef28e05/src/bin/psql/copy.c
index 6cc1f46..5ecc30e 100644
--- a/postgresql-ef28e05/src/bin/psql/copy.c
+++ b/postgresql-ef28e05/src/bin/psql/copy.c
@@ -52,6 +52,7 @@ struct copy_options
 	char	   *before_tofrom;	/* COPY string before TO/FROM */
 	char	   *after_tofrom;	/* COPY string after TO/FROM filename */
 	char	   *file;			/* NULL = stdin/stdout */
+	char	   *prog;			/* NULL = fopen, otherwise = popen */
 	bool		psql_inout;		/* true = use psql stdin/stdout */
 	bool		from;			/* true = FROM, false = TO */
 };
@@ -65,6 +66,7 @@ free_copy_options(struct copy_options * ptr)
 	free(ptr->before_tofrom);
 	free(ptr->after_tofrom);
 	free(ptr->file);
+	free(ptr->prog);
 	free(ptr);
 }
 
@@ -89,6 +91,7 @@ parse_slash_copy(const char *args)
 	struct copy_options *result;
 	char	   *token;
 	const char *whitespace = " \t\n\r";
+	char	   *buffer = NULL;
 	char		nonstd_backslash = standard_strings() ? 0 : '\\';
 
 	if (!args)
@@ -211,8 +214,7 @@ parse_slash_copy(const char *args)
 	else
 	{
 		result->psql_inout = false;
-		result->file = pg_strdup(token);
-		expand_tilde(&result->file);
+		buffer = pg_strdup(token);
 	}
 
 	/* Collect the rest of the line (COPY options) */
@@ -221,6 +223,44 @@ parse_slash_copy(const char *args)
 	if (token)
 		result->after_tofrom = pg_strdup(token);
 
+	if (buffer)
+	{
+		int			count = 0;
+
+		token = strtokx(buffer, " \t", NULL, NULL,
+						0, false, false, pset.encoding);
+		if (!token)
+			psql_error("\\copy: parse error at \"%s\"\n", buffer);
+		while (token != NULL) {
+			count++;
+			token = strtokx(NULL, " \t", NULL, NULL,
+							0, false, false, pset.encoding);
+		}
+		if (count != 1 && count != 2)
+			psql_error("\\copy: parse error at \"%s\"\n", buffer);
+
+		if (count == 1)
+		{
+			result->prog = NULL;
+			result->file = pg_strdup(buffer);
+		}
+		if (count == 2)
+		{
+			token = strtokx(buffer, " \t", NULL, NULL,
+							0, false, false, pset.encoding);
+			if (!token)
+				psql_error("\\copy: parse error at \"%s\"\n", buffer);
+			result->prog = pg_strdup(token);
+			token = strtokx(NULL, " \t", NULL, NULL,
+							0, false, false, pset.encoding);
+			if (!token)
+				psql_error("\\copy: parse error at \"%s\"\n", buffer);
+			result->file = pg_strdup(token);
+		}
+		expand_tilde(&result->file);
+		free(buffer);
+	}
+
 	return result;
 
 error:
@@ -247,6 +287,7 @@ do_copy(const char *args)
 	FILE	   *save_file;
 	FILE	  **override_file;
 	struct copy_options *options;
+	char	   *command = NULL;
 	bool		success;
 	struct stat st;
 
@@ -260,12 +301,43 @@ do_copy(const char *args)
 	if (options->file)
 		canonicalize_path(options->file);
 
+	if (options->prog)
+	{
+		PQExpBufferData cmdbuf;
+		char		full_path[MAXPGPATH];
+		int			ret;
+
+		ret = find_my_exec(options->prog, full_path);
+		if (ret)
+		{
+			psql_error("could not locate program: %s\n", options->prog);
+			free_copy_options(options);
+			return false;
+		}
+		initPQExpBuffer(&cmdbuf);
+		appendPQExpBufferStr(&cmdbuf, full_path);
+		appendPQExpBuffer(&cmdbuf, " ");
+		appendPQExpBufferStr(&cmdbuf, options->file);
+		command = pg_strdup(cmdbuf.data);
+		termPQExpBuffer(&cmdbuf);
+	}
+
 	if (options->from)
 	{
 		override_file = &pset.cur_cmd_source;
 
 		if (options->file)
-			copystream = fopen(options->file, PG_BINARY_R);
+		{
+			if (options->prog)
+			{
+				fflush(stdout);
+				fflush(stderr);
+				errno = 0;
+				copystream = popen(command, PG_BINARY_R);
+			}
+			else
+				copystream = fopen(options->file, PG_BINARY_R);
+		}
 		else if (!options->psql_inout)
 			copystream = pset.cur_cmd_source;
 		else
@@ -276,7 +348,17 @@ do_copy(const char *args)
 		override_file = &pset.queryFout;
 
 		if (options->file)
-			copystream = fopen(options->file, PG_BINARY_W);
+		{
+			if (options->prog)
+			{
+				fflush(stdout);
+				fflush(stderr);
+				errno = 0;
+				copystream = popen(command, PG_BINARY_W);
+			}
+			else
+				copystream = fopen(options->file, PG_BINARY_W);
+		}
 		else if (!options->psql_inout)
 			copystream = pset.queryFout;
 		else
@@ -285,8 +367,12 @@ do_copy(const char *args)
 
 	if (!copystream)
 	{
-		psql_error("%s: %s\n",
-				   options->file, strerror(errno));
+		if (options->prog)
+			psql_error("could not execute command \"%s\": %s\n",
+					   command, strerror(errno));
+		else
+			psql_error("%s: %s\n",
+					   options->file, strerror(errno));
 		free_copy_options(options);
 		return false;
 	}
@@ -322,13 +408,25 @@ do_copy(const char *args)
 
 	if (options->file != NULL)
 	{
-		if (fclose(copystream) != 0)
+		if (options->prog)
+		{
+			if (pclose_check(copystream) == -1)
+			{
+				psql_error("could not execute command \"%s\"\n", command);
+				success = false;
+			}
+		}
+		else
 		{
-			psql_error("%s: %s\n", options->file, strerror(errno));
-			success = false;
+			if (fclose(copystream) != 0)
+			{
+				psql_error("%s: %s\n", options->file, strerror(errno));
+				success = false;
+			}
 		}
 	}
 	free_copy_options(options);
+	free(command);
 	return success;
 }
 
diff --git a/postgresql-ef28e05/src/include/storage/fd.h b/postgresql-ef28e05/src/include/storage/fd.h
index 849bb10..b90bab9 100644
--- a/postgresql-ef28e05/src/include/storage/fd.h
+++ b/postgresql-ef28e05/src/include/storage/fd.h
@@ -79,6 +79,10 @@ extern char *FilePathName(File file);
 extern FILE *AllocateFile(const char *name, const char *mode);
 extern int	FreeFile(FILE *file);
 
+/* Operations that allow use of pipe stream */
+extern FILE *OpenPipeStream(const char *command, const char *mode);
+extern int	ClosePipeStream(FILE *file);
+
 /* Operations to allow use of the <dirent.h> library routines */
 extern DIR *AllocateDir(const char *dirname);
 extern struct dirent *ReadDir(DIR *dir, const char *dirname);
#16Simon Riggs
simon@2ndQuadrant.com
In reply to: Etsuro Fujita (#1)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On 13 September 2012 10:13, Etsuro Fujita <fujita.etsuro@lab.ntt.co.jp> wrote:

I'd like to add the following options to the SQL COPY command and the psql \copy
instruction:

* PREPROCESSOR: Specifies the user-supplied program for COPY IN. The data
from an input file is preprocessed by the program before the data is loaded into
a postgres table.
* POSTPROCESSOR: Specifies the user-supplied program for COPY OUT. The data
from a postgres table is postprocessed by the program before the data is stored
in an output file.

These options can be specified only when an input or output file is specified.

These options allow to move data between postgres tables and e.g., compressed
files or files on a distributed file system such as Hadoop HDFS.

These options look pretty strange to me and I'm not sure they are a good idea.

If we want to read other/complex data, we have Foreign Data Wrappers.

What I think we need is COPY FROM (SELECT....). COPY (query) TO
already exists, so this is just the same thing in the other direction.
Once we have a SELECT statement in both directions we can add any user
defined transforms we wish implemented as database functions.

At present we only support INSERT SELECT ... FROM FDW
which means all the optimisations we've put into COPY are useless with
FDWs. So we need a way to speed up loads from other data sources.

--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

#17Fujii Masao
masao.fujii@gmail.com
In reply to: Etsuro Fujita (#15)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On Wed, Nov 14, 2012 at 8:30 PM, Etsuro Fujita
<fujita.etsuro@lab.ntt.co.jp> wrote:

I wrote:

From: Tom Lane [mailto:tgl@sss.pgh.pa.us]

I wrote:

"Etsuro Fujita" <fujita.etsuro@lab.ntt.co.jp> writes:

I have a question. I think it would be also better to extend the syntax
for the SQL COPY command in the same way, ie,
COPY foo from '/home/pgsql/decompress.sh /home/pgsql/foo.csv.gz |' with
format 'csv'

Yeah, sure --- that case is already superuser-only, so why not give it
the option of being a popen instead of just fopen.

BTW, one thought that comes to mind is that such an operation is
extremely likely to fail under environments such as SELinux. That's
not necessarily a reason not to do it, but we should be wary of
promising that it will work everywhere. Probably a documentation note
about this would be enough.

OK I'll revise the patch.

I've revised the patch. In this version a user can specify hooks for pre- and
post-processor executables for COPY and \copy in the following way:

$ echo '/bin/gunzip -c $1' > decompress.sh
$ chmod +x decompress.sh

In the case of the COPY command,

postgres=# COPY foo FROM '/home/pgsql/decompress.sh /home/pgsql/foo.csv.gz'
WITH (format 'csv');

Also, in the case of the \copy instruction,

postgres=# \copy foo from '/home/pgsql/decompress.sh /home/pgsql/foo.csv.gz'
with (format 'csv')

As shown in the example above, I've assumed that the syntax for this option for
e.g., the COPY command is:

COPY table_name FROM 'progname filename' WITH ...
COPY table_name TO 'progname filename' WITH ...

Here, progname for COPY IN is the user-supplied program that takes filename as
its argument and that writes on standard output.

What about further extending the COPY IN syntax to the following?

COPY table_name FROM 'progname [ option, ... ]' WITH ...

I'd just like to execute

COPY vmstat_table FROM 'vmstat' WITH ...

Also, progname for COPY OUT is
the user-supplied program that reads standard input and writes to filename taken
as its argument. This makes simple the identification and verification of
progname and filename.

Todo:
* Documentation including documentation note about the limitation for
environments such as SELinux mentioned by Tom.
* More test

Any comments and suggestions are welcomed.

Isn't it dangerous to allow a user to execute external program in
server side via SQL?

Regards,

--
Fujii Masao

#18Simon Riggs
simon@2ndQuadrant.com
In reply to: Fujii Masao (#17)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On 14 November 2012 15:09, Fujii Masao <masao.fujii@gmail.com> wrote:

Here, progname for COPY IN is the user-supplied program that takes filename as
its argument and that writes on standard output.

What about further extending the COPY IN syntax to the following?

COPY table_name FROM 'progname [ option, ... ]' WITH ...

I'd just like to execute

COPY vmstat_table FROM 'vmstat' WITH ...

I think we should be using FDWs/SRFs here, not inventing new
syntax/architectures for executing external code, so -1 from me.

We can already do
INSERT table SELECT * FROM fdw;
with any logic for generating data lives inside an FDW or SRF.

If we want it in COPY we can have syntax like this...
COPY table FROM (SELECT * FROM fdw)

--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

#19Fujii Masao
masao.fujii@gmail.com
In reply to: Simon Riggs (#18)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On Thu, Nov 15, 2012 at 12:31 AM, Simon Riggs <simon@2ndquadrant.com> wrote:

On 14 November 2012 15:09, Fujii Masao <masao.fujii@gmail.com> wrote:

Here, progname for COPY IN is the user-supplied program that takes filename as
its argument and that writes on standard output.

What about further extending the COPY IN syntax to the following?

COPY table_name FROM 'progname [ option, ... ]' WITH ...

I'd just like to execute

COPY vmstat_table FROM 'vmstat' WITH ...

I think we should be using FDWs/SRFs here, not inventing new
syntax/architectures for executing external code, so -1 from me.

We can already do
INSERT table SELECT * FROM fdw;
with any logic for generating data lives inside an FDW or SRF.

If we want it in COPY we can have syntax like this...
COPY table FROM (SELECT * FROM fdw)

New syntax looks attractive to me because it's easy to use that.
It's not easy to implement the FDW for the external program which
a user wants to execute.

Of course if someone implements something like any_external_program_fdw,
I would change my mind..

Regards,

--
Fujii Masao

#20Alvaro Herrera
alvherre@2ndquadrant.com
In reply to: Simon Riggs (#18)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

Simon Riggs escribió:

On 14 November 2012 15:09, Fujii Masao <masao.fujii@gmail.com> wrote:

Here, progname for COPY IN is the user-supplied program that takes filename as
its argument and that writes on standard output.

What about further extending the COPY IN syntax to the following?

COPY table_name FROM 'progname [ option, ... ]' WITH ...

I'd just like to execute

COPY vmstat_table FROM 'vmstat' WITH ...

I think we should be using FDWs/SRFs here, not inventing new
syntax/architectures for executing external code, so -1 from me.

Hmm, but then you are forced to write C code, whereas the "external
program" proposal could have you writing only a shell script instead.
So there is some merit to this idea ... though we could have a
"pipe_fdw" that could let you specify an arbitrary program to run.

--
Álvaro Herrera http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

#21Tom Lane
tgl@sss.pgh.pa.us
In reply to: Alvaro Herrera (#20)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

Alvaro Herrera <alvherre@2ndquadrant.com> writes:

Simon Riggs escribió:

On 14 November 2012 15:09, Fujii Masao <masao.fujii@gmail.com> wrote:

Here, progname for COPY IN is the user-supplied program that takes filename as
its argument and that writes on standard output.

I think we should be using FDWs/SRFs here, not inventing new
syntax/architectures for executing external code, so -1 from me.

Hmm, but then you are forced to write C code, whereas the "external
program" proposal could have you writing only a shell script instead.

I disagree with Simon's objection also, because neither reading from
nor writing to an external program is likely to fit the model of
reading/updating a table very well. For instance, there's no good
reason to suppose that reading twice will give the same results. So
force-fitting this usage into the FDW model is not going to work well.

Nor do I really see the argument why a "pipe_fdw" module is cleaner
than a "COPY TO/FROM pipe" feature.

regards, tom lane

#22Andrew Dunstan
andrew@dunslane.net
In reply to: Tom Lane (#21)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On 11/14/2012 11:20 AM, Tom Lane wrote:

I disagree with Simon's objection also, because neither reading from
nor writing to an external program is likely to fit the model of
reading/updating a table very well. For instance, there's no good
reason to suppose that reading twice will give the same results. So
force-fitting this usage into the FDW model is not going to work well.

Nor do I really see the argument why a "pipe_fdw" module is cleaner
than a "COPY TO/FROM pipe" feature.

Yeah, I agree, although the syntax looks a bit unclean.

Maybe something like

COPY foo FROM wherever WITH (FILTER '/path/to/program')

might work better. You'd hook up the source to the filter as its stdin
and read its stdout. Not sure what we'd do for \copy though.

cheers

andrew

#23Simon Riggs
simon@2ndQuadrant.com
In reply to: Tom Lane (#21)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On 14 November 2012 16:20, Tom Lane <tgl@sss.pgh.pa.us> wrote:

Alvaro Herrera <alvherre@2ndquadrant.com> writes:

Simon Riggs escribió:

On 14 November 2012 15:09, Fujii Masao <masao.fujii@gmail.com> wrote:

Here, progname for COPY IN is the user-supplied program that takes filename as
its argument and that writes on standard output.

I think we should be using FDWs/SRFs here, not inventing new
syntax/architectures for executing external code, so -1 from me.

Hmm, but then you are forced to write C code, whereas the "external
program" proposal could have you writing only a shell script instead.

I disagree with Simon's objection also, because neither reading from
nor writing to an external program is likely to fit the model of
reading/updating a table very well. For instance, there's no good
reason to suppose that reading twice will give the same results. So
force-fitting this usage into the FDW model is not going to work well.

Nor do I really see the argument why a "pipe_fdw" module is cleaner
than a "COPY TO/FROM pipe" feature.

Perhaps not cleaner, but we do need

COPY table FROM (SELECT * FROM foo)

So we will then have both ways.

--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

#24Simon Riggs
simon@2ndQuadrant.com
In reply to: Fujii Masao (#17)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On 14 November 2012 15:09, Fujii Masao <masao.fujii@gmail.com> wrote:

What about further extending the COPY IN syntax to the following?

COPY table_name FROM 'progname [ option, ... ]' WITH ...

I'd just like to execute

COPY vmstat_table FROM 'vmstat' WITH ...

If we go ahead with this, I think it needs an additional keyword to
indicate that we will execute the file rather than read from it. I
don't think we should rely on whether the file is executable or not to
determine how we should treat it.

--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

#25Tom Lane
tgl@sss.pgh.pa.us
In reply to: Andrew Dunstan (#22)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

Andrew Dunstan <andrew@dunslane.net> writes:

Yeah, I agree, although the syntax looks a bit unclean.

Oh, I had not looked at the syntax closely. I agree, that basically
sucks: it's overcomplicated and under-featured, because you can't
control the actual program command line very conveniently. Nor do I see
a reason to force this into the model of "program filtering a specific
file". What happened to the previous proposal of treating the COPY
target as a pipe specification, ie

COPY table FROM 'some command line |';
COPY table TO '| some command line';

Not sure what we'd do for \copy though.

Adding a pipe symbol to the target works exactly the same for \copy.

regards, tom lane

#26Tom Lane
tgl@sss.pgh.pa.us
In reply to: Simon Riggs (#24)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

Simon Riggs <simon@2ndQuadrant.com> writes:

On 14 November 2012 15:09, Fujii Masao <masao.fujii@gmail.com> wrote:

I'd just like to execute
COPY vmstat_table FROM 'vmstat' WITH ...

If we go ahead with this, I think it needs an additional keyword to
indicate that we will execute the file rather than read from it. I
don't think we should rely on whether the file is executable or not to
determine how we should treat it.

Agreed, and there's also the question of passing switches etc to the
program, so the string can't be a bare file name anyway. I proposed
pipe symbols (|) in the string previously, but if you find that too
Unix-centric I suppose we could do

COPY TABLE FROM PROGRAM 'command line';
COPY TABLE TO PROGRAM 'command line';

regards, tom lane

#27Andrew Dunstan
andrew@dunslane.net
In reply to: Tom Lane (#25)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On 11/14/2012 11:39 AM, Tom Lane wrote:

Andrew Dunstan <andrew@dunslane.net> writes:

Yeah, I agree, although the syntax looks a bit unclean.

Oh, I had not looked at the syntax closely. I agree, that basically
sucks: it's overcomplicated and under-featured, because you can't
control the actual program command line very conveniently. Nor do I see
a reason to force this into the model of "program filtering a specific
file". What happened to the previous proposal of treating the COPY
target as a pipe specification, ie

COPY table FROM 'some command line |';
COPY table TO '| some command line';

I'd like to be able to filter STDIN if possible - with this syntax how
is COPY going to know to hook up STDIN to the program?

cheers

andrew

#28Tom Lane
tgl@sss.pgh.pa.us
In reply to: Andrew Dunstan (#27)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

Andrew Dunstan <andrew@dunslane.net> writes:

On 11/14/2012 11:39 AM, Tom Lane wrote:

What happened to the previous proposal of treating the COPY
target as a pipe specification, ie

I'd like to be able to filter STDIN if possible - with this syntax how
is COPY going to know to hook up STDIN to the program?

Huh? That's fairly nonsensical for the backend-side case; there's no
way that stdin (or stdout) of a backend is going to connect anywhere
useful for this purpose. As for doing it on the psql side (\copy),
I think it would be more or less automatic. If you do say

foo | psql -c "\copy tab from 'bar |'" dbname

then bar is going to inherit psql's stdin, which is coming from foo.

regards, tom lane

#29Andrew Dunstan
andrew@dunslane.net
In reply to: Tom Lane (#28)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On 11/14/2012 11:56 AM, Tom Lane wrote:

Andrew Dunstan <andrew@dunslane.net> writes:

On 11/14/2012 11:39 AM, Tom Lane wrote:

What happened to the previous proposal of treating the COPY
target as a pipe specification, ie

I'd like to be able to filter STDIN if possible - with this syntax how
is COPY going to know to hook up STDIN to the program?

Huh? That's fairly nonsensical for the backend-side case; there's no
way that stdin (or stdout) of a backend is going to connect anywhere
useful for this purpose. As for doing it on the psql side (\copy),
I think it would be more or less automatic. If you do say

foo | psql -c "\copy tab from 'bar |'" dbname

then bar is going to inherit psql's stdin, which is coming from foo.

Why does it make less sense on the backend than COPY foo FROM STDIN ?
Why shouldn't I want to be able to filter or transform the input? I have
a client with a pretty complex backend-driven ETL tool. One of the
annoying things about it is that we have to transfer the file to the
backend before we can process it. I can imagine this leading to a
similar annoyance.

cheers

andrew

#30Peter Eisentraut
peter_e@gmx.net
In reply to: Andrew Dunstan (#27)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On 11/14/12 11:50 AM, Andrew Dunstan wrote:

COPY table FROM 'some command line |';
COPY table TO '| some command line';

I'd like to be able to filter STDIN if possible - with this syntax how
is COPY going to know to hook up STDIN to the program?

Why don't you filter the data before it gets to stdin? Some program is
feeding the data to "stdin" on the client side. Why doesn't that do the
filtering? I don't see a large advantage in having the data be sent
unfiltered to the server and having the server do the filtering.

#31Andrew Dunstan
andrew@dunslane.net
In reply to: Peter Eisentraut (#30)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On 11/14/2012 02:05 PM, Peter Eisentraut wrote:

On 11/14/12 11:50 AM, Andrew Dunstan wrote:

COPY table FROM 'some command line |';
COPY table TO '| some command line';

I'd like to be able to filter STDIN if possible - with this syntax how
is COPY going to know to hook up STDIN to the program?

Why don't you filter the data before it gets to stdin? Some program is
feeding the data to "stdin" on the client side. Why doesn't that do the
filtering? I don't see a large advantage in having the data be sent
unfiltered to the server and having the server do the filtering.

Centralization of processing would be one obvious reason. I don't really
see why the same reasoning doesn't apply on the backend. You could just
preprocess the input before calling COPY (via a plperlu function for
example). If we're going to have filtering functionality then it should
be as general as possible, ISTM. But I seem to be alone in this, so I
won't push it.

cheers

andrew

#32Tom Lane
tgl@sss.pgh.pa.us
In reply to: Andrew Dunstan (#31)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

Andrew Dunstan <andrew@dunslane.net> writes:

On 11/14/2012 02:05 PM, Peter Eisentraut wrote:

Why don't you filter the data before it gets to stdin? Some program is
feeding the data to "stdin" on the client side. Why doesn't that do the
filtering? I don't see a large advantage in having the data be sent
unfiltered to the server and having the server do the filtering.

Centralization of processing would be one obvious reason.

If I understand correctly, what you're imagining is that the client
sources data to a COPY FROM STDIN type of command, then the backend
pipes that out to stdin of some filtering program, which it then reads
the stdout of to get the data it processes and stores.

We could in principle make that work, but there are some pretty serious
implementation problems: popen doesn't do this so we'd have to cons up
our own fork and pipe setup code, and we would have to write a bunch of
asynchronous processing logic to account for the possibility that the
filter program doesn't return data in similar-size chunks to what it
reads. (IOW, it will never be clear when to try to read data from the
filter and when to try to write data to it.)

I think it's way too complicated for the amount of functionality you'd
get. As Peter says, there's no strong reason not to do such processing
on the client side. In fact there are pretty strong reasons to prefer
to do it there, like not needing database superuser privilege to invoke
the filter program.

What I'm imagining is a very very simple addition to COPY that just
allows it to execute popen() instead of fopen() to read or write the
data source/sink. What you suggest would require hundreds of lines and
create many opportunities for new bugs.

regards, tom lane

#33Andrew Dunstan
andrew@dunslane.net
In reply to: Tom Lane (#32)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On 11/14/2012 02:37 PM, Tom Lane wrote:

What I'm imagining is a very very simple addition to COPY that just
allows it to execute popen() instead of fopen() to read or write the
data source/sink. What you suggest would require hundreds of lines and
create many opportunities for new bugs.

That's certainly a better answer than any I've had. I accept the reasoning.

cheers

andrew

#34Craig Ringer
craig@2ndQuadrant.com
In reply to: Tom Lane (#26)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On 11/15/2012 12:46 AM, Tom Lane wrote:

Agreed, and there's also the question of passing switches etc to the
program, so the string can't be a bare file name anyway. I proposed
pipe symbols (|) in the string previously, but if you find that too
Unix-centric I suppose we could do

COPY TABLE FROM PROGRAM 'command line';
COPY TABLE TO PROGRAM 'command line';

I'd strongly prefer that from a security standpoint. I intensely dislike
the idea that COPY will change from a command that can at worst expose
data to a command that can execute programs. It means existing security
decisions in applications that use it must be re-evaluated ... and most
won't be. Also, it isn't too hard to check the command string for pipe
chars, but experience has taught over and over with SQL injection, shell
metacharacter exploits, XSS, etc that magic characters that you must
check for are a bad idea, and it's much better to have separate syntax
(like parameterized statements) rather than magic strings.

Additionally, the pipe design appears to presume the presence of a shell
and the desirability of using it. I don't think either assumption is
sensible.

Windows has a shell of sorts (cmd.exe) but its behaviour is different
with regards to quoting and it can be tricky to produce commands that
work under both a UNIX shell and cmd.exe .

More importantly, the shell provides fun opportunities for unexpected
side-effects via metacharacters, leading to undesired behaviour or even
exploits. It's IMO strongly preferable to use an argument vector and
direct execution, so the shell never gets involved.

How about:

COPY ... FROM PROGRAM '/bin/my_program', '$notavariable', '$(rm -rf
$HOME)';

or:

COPY ... FROM (PROGRAM '/bin/my_program', ARGUMENTS
('$notavariable', '$(rm -rf $HOME)') );

?

Something extensible would be good, as somebody is *inevitably* going to
ask "so how do I set environment variables before I call the command"
and "how do I control which return values are considered success".

--
Craig Ringer

regards, tom lane

--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

#35Tom Lane
tgl@sss.pgh.pa.us
In reply to: Craig Ringer (#34)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

Craig Ringer <craig@2ndQuadrant.com> writes:

On 11/15/2012 12:46 AM, Tom Lane wrote:

Agreed, and there's also the question of passing switches etc to the
program, so the string can't be a bare file name anyway. I proposed
pipe symbols (|) in the string previously, but if you find that too
Unix-centric I suppose we could do

COPY TABLE FROM PROGRAM 'command line';
COPY TABLE TO PROGRAM 'command line';

I'd strongly prefer that from a security standpoint.

That's a reasonable concern.

Additionally, the pipe design appears to presume the presence of a shell
and the desirability of using it. I don't think either assumption is
sensible.

I disagree very very strongly with that. If we prevent use of shell
syntax, we will lose a lot of functionality, for instance

copy ... from program 'foo <somefile'
copy ... from program 'foo | bar'

unless you're imagining that we will reimplement a whole lot of that
same shell syntax for ourselves. (And no, I don't care whether the
Windows syntax is exactly the same or not. The program name/path is
already likely to vary across systems, so it's pointless to suppose that
use of the feature would be 100% portable if only we lobotomized it.)

More importantly, the shell provides fun opportunities for unexpected
side-effects via metacharacters, leading to undesired behaviour or even
exploits.

So? You're already handing the keys to the kingdom to anybody who can
control the contents of that command line, even if it's only to point at
the wrong program. And one man's "unexpected side-effect" is another
man's "essential feature", as in my examples above.

regards, tom lane

#36Craig Ringer
craig@2ndQuadrant.com
In reply to: Tom Lane (#35)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On 11/15/2012 10:19 AM, Tom Lane wrote:

I disagree very very strongly with that. If we prevent use of shell
syntax, we will lose a lot of functionality, for instance

copy ... from program 'foo <somefile'
copy ... from program 'foo | bar'

unless you're imagining that we will reimplement a whole lot of that
same shell syntax for ourselves. (And no, I don't care whether the
Windows syntax is exactly the same or not. The program name/path is
already likely to vary across systems, so it's pointless to suppose that
use of the feature would be 100% portable if only we lobotomized it.)

That's reasonable - and it isn't worth making people jump through hoops
with ('bash','-c','/some/command < infile') .

So? You're already handing the keys to the kingdom to anybody who can
control the contents of that command line, even if it's only to point at
the wrong program. And one man's "unexpected side-effect" is another
man's "essential feature", as in my examples above.

That's true if they're controlling the whole command, not so much if
they just provide a file name. I'm just worried that people will use it
without thinking deeply about the consequences, just like they do with
untrusted client input in SQL injection attacks.

I take your point about wanting more than just the execve()-style
invocation. I'd still like to see a way to invoke the command without
having the shell involved, though; APIs to invoke external programs seem
to start out with a version that launches via the shell then quickly
grow more controlled argument-vector versions.

There's certainly room for a quick'n'easy COPY ... FROM PROGRAM ('cmd1 |
cmd2 | tee /tmp/log') . At this point all I think is really vital is to
make copy-with-exec *syntactically different* to plain COPY, and to
leave room for extending the syntax for environment, separate args
vector, etc when they're called for. Like VACUUM, where VACUUM VERBOSE
ANALYZE became VACUUM (VERBOSE, ANALYZE) to make room for (BUFFERS), etc.

--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

#37Robert Haas
robertmhaas@gmail.com
In reply to: Craig Ringer (#36)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On Wed, Nov 14, 2012 at 10:14 PM, Craig Ringer <craig@2ndquadrant.com> wrote:

So? You're already handing the keys to the kingdom to anybody who can
control the contents of that command line, even if it's only to point at
the wrong program. And one man's "unexpected side-effect" is another
man's "essential feature", as in my examples above.

That's true if they're controlling the whole command, not so much if
they just provide a file name. I'm just worried that people will use it
without thinking deeply about the consequences, just like they do with
untrusted client input in SQL injection attacks.

Yeah. If we're going to do this at all, and I'm not convinced it's
worth the work, I think it's definitely good to support a variant
where we specify exactly the things that will be passed to exec().
There's just too many ways to accidentally shoot yourself in the foot
otherwise. If we want to have an option that lets people shoot
themselves in the foot, that's fine. But I think we'd be smart not to
make that the only option.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

#38Tom Lane
tgl@sss.pgh.pa.us
In reply to: Robert Haas (#37)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

Robert Haas <robertmhaas@gmail.com> writes:

Yeah. If we're going to do this at all, and I'm not convinced it's
worth the work, I think it's definitely good to support a variant
where we specify exactly the things that will be passed to exec().
There's just too many ways to accidentally shoot yourself in the foot
otherwise. If we want to have an option that lets people shoot
themselves in the foot, that's fine. But I think we'd be smart not to
make that the only option.

[ shrug... ] Once again, that will turn this from a ten-line patch
into hundreds of lines (and some more, different, hundreds of lines
for Windows I bet), with a corresponding growth in the opportunities
for bugs, for a benefit that's at best debatable.

The biggest problem this patch has had from the very beginning is
overdesign, and this is more of the same. Let's please just define the
feature as "popen, not fopen, the given string" and have done. You can
put all the warning verbiage you want in the documentation. (But note
that the server-side version would be superuser-only in any flavor of
the feature.)

regards, tom lane

#39Craig Ringer
craig@2ndQuadrant.com
In reply to: Tom Lane (#38)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On 11/16/2012 03:35 AM, Tom Lane wrote:

The biggest problem this patch has had from the very beginning is
overdesign, and this is more of the same. Let's please just define the
feature as "popen, not fopen, the given string" and have done. You can
put all the warning verbiage you want in the documentation. (But note
that the server-side version would be superuser-only in any flavor of
the feature.)

I concede that as server-side COPY is superuser-only already it doesn't
offer the same potential for attack that it otherwise would. If
applications take unchecked file system paths from users and feed them
into a superuser command they already have security problems.

I'd still be much happier to have COPY ... FROM PROGRAM - or something -
to clearly make the two different, for clarity as much as security.

--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

#40Tom Lane
tgl@sss.pgh.pa.us
In reply to: Craig Ringer (#39)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

Craig Ringer <craig@2ndQuadrant.com> writes:

I'd still be much happier to have COPY ... FROM PROGRAM - or something -
to clearly make the two different, for clarity as much as security.

I don't object to using a PROGRAM keyword rather than something inside
the string to select this behavior.

regards, tom lane

#41Etsuro Fujita
fujita.etsuro@lab.ntt.co.jp
In reply to: Tom Lane (#38)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

From: Tom Lane [mailto:tgl@sss.pgh.pa.us]

Robert Haas <robertmhaas@gmail.com> writes:

Yeah. If we're going to do this at all, and I'm not convinced it's
worth the work, I think it's definitely good to support a variant
where we specify exactly the things that will be passed to exec().
There's just too many ways to accidentally shoot yourself in the foot
otherwise. If we want to have an option that lets people shoot
themselves in the foot, that's fine. But I think we'd be smart not to
make that the only option.

[ shrug... ] Once again, that will turn this from a ten-line patch
into hundreds of lines (and some more, different, hundreds of lines
for Windows I bet), with a corresponding growth in the opportunities
for bugs, for a benefit that's at best debatable.

The biggest problem this patch has had from the very beginning is
overdesign, and this is more of the same. Let's please just define the
feature as "popen, not fopen, the given string" and have done. You can
put all the warning verbiage you want in the documentation. (But note
that the server-side version would be superuser-only in any flavor of
the feature.)

Agreed. I'll reimplement the feature using the PROGRAM keyword:

COPY TABLE FROM PROGRAM 'command line';
COPY TABLE TO PROGRAM 'command line';

Sorry for the late response.

Best regards,
Etsuro Fujita

#42Robert Haas
robertmhaas@gmail.com
In reply to: Tom Lane (#38)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

On Thu, Nov 15, 2012 at 2:35 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

Robert Haas <robertmhaas@gmail.com> writes:

Yeah. If we're going to do this at all, and I'm not convinced it's
worth the work, I think it's definitely good to support a variant
where we specify exactly the things that will be passed to exec().
There's just too many ways to accidentally shoot yourself in the foot
otherwise. If we want to have an option that lets people shoot
themselves in the foot, that's fine. But I think we'd be smart not to
make that the only option.

[ shrug... ] Once again, that will turn this from a ten-line patch
into hundreds of lines (and some more, different, hundreds of lines
for Windows I bet), with a corresponding growth in the opportunities
for bugs, for a benefit that's at best debatable.

The biggest problem this patch has had from the very beginning is
overdesign, and this is more of the same. Let's please just define the
feature as "popen, not fopen, the given string" and have done.

I just don't agree with that. popen() is to security holes as cars
are to alcohol-related fatalities. In each case, the first one
doesn't directly cause the second one; but it's a pretty darn powerful
enabler. Your proposed solution won't force people to write insecure
applications; it'll just make it much more likely that they will do so
... after which, presumably, you'll tell them it's their own darn
fault for using the attractive nuisance. The list of security
vulnerabilities that are the result of insufficiently careful
validation of strings passed to popen() is extremely long. If we give
people a feature that can only be leveraged via popen(), the chances
that someone will thereby open a security hole are indistinguishable
from 1.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

#43Tom Lane
tgl@sss.pgh.pa.us
In reply to: Robert Haas (#42)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

Robert Haas <robertmhaas@gmail.com> writes:

On Thu, Nov 15, 2012 at 2:35 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

The biggest problem this patch has had from the very beginning is
overdesign, and this is more of the same. Let's please just define the
feature as "popen, not fopen, the given string" and have done.

... If we give
people a feature that can only be leveraged via popen(), the chances
that someone will thereby open a security hole are indistinguishable
from 1.

You are absolutely right that this feature is a security risk, but it
will be one whether it exposes popen() or only exec(). I do not believe
that the incremental gain in security from disallowing shell notation
is worth either the loss of functionality or the amount of added effort
(and added bugs, some of which will be security issues in themselves)
we'd need to write it that way.

The correct response to the security risks is to (a) make it
superuser-only and (b) document that it's a seriously bad idea to allow
the argument string to come from any untrusted sources. Please note
that we'd have to do these same things with an exec-based patch.

regards, tom lane

#44Etsuro Fujita
fujita.etsuro@lab.ntt.co.jp
In reply to: Etsuro Fujita (#41)
1 attachment(s)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

I wrote:

>> The biggest problem this patch has had from the very beginning is
>> overdesign, and this is more of the same. Let's please just define the
>> feature as "popen, not fopen, the given string" and have done. You can
>> put all the warning verbiage you want in the documentation. (But note
>> that the server-side version would be superuser-only in any flavor of
>> the feature.)

> Agreed. I'll reimplement the feature using the PROGRAM keyword:

> COPY TABLE FROM PROGRAM 'command line';
> COPY TABLE TO PROGRAM 'command line';

I've reimplemented the feature. Attached is an updated version of the patch.

Todo:
* More documentation
* More tests

Any comments are welcome.

Thanks,

Best regards,
Etsuro Fujita

Attachments:

copy-popen-20121122.patchapplication/octet-stream; name=copy-popen-20121122.patchDownload
*** a/contrib/file_fdw/file_fdw.c
--- b/contrib/file_fdw/file_fdw.c
***************
*** 588,593 **** fileBeginForeignScan(ForeignScanState *node, int eflags)
--- 588,594 ----
  	 */
  	cstate = BeginCopyFrom(node->ss.ss_currentRelation,
  						   filename,
+ 						   false,
  						   NIL,
  						   options);
  
***************
*** 660,665 **** fileReScanForeignScan(ForeignScanState *node)
--- 661,667 ----
  
  	festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation,
  									festate->filename,
+ 									false,
  									NIL,
  									festate->options);
  }
***************
*** 993,999 **** file_acquire_sample_rows(Relation onerel, int elevel,
  	/*
  	 * Create CopyState from FDW options.
  	 */
! 	cstate = BeginCopyFrom(onerel, filename, NIL, options);
  
  	/*
  	 * Use per-tuple memory context to prevent leak of memory used to read
--- 995,1001 ----
  	/*
  	 * Create CopyState from FDW options.
  	 */
! 	cstate = BeginCopyFrom(onerel, filename, false, NIL, options);
  
  	/*
  	 * Use per-tuple memory context to prevent leak of memory used to read
*** a/doc/src/sgml/keywords.sgml
--- b/doc/src/sgml/keywords.sgml
***************
*** 3514,3519 ****
--- 3514,3526 ----
      <entry>reserved</entry>
     </row>
     <row>
+     <entry><token>PROGRAM</token></entry>
+     <entry>non-reserved</entry>
+     <entry></entry>
+     <entry></entry>
+     <entry></entry>
+    </row>
+    <row>
      <entry><token>PUBLIC</token></entry>
      <entry></entry>
      <entry>non-reserved</entry>
*** a/doc/src/sgml/ref/copy.sgml
--- b/doc/src/sgml/ref/copy.sgml
***************
*** 23,33 **** PostgreSQL documentation
   <refsynopsisdiv>
  <synopsis>
  COPY <replaceable class="parameter">table_name</replaceable> [ ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ]
!     FROM { '<replaceable class="parameter">filename</replaceable>' | STDIN }
      [ [ WITH ] ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
  
  COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ] | ( <replaceable class="parameter">query</replaceable> ) }
!     TO { '<replaceable class="parameter">filename</replaceable>' | STDOUT }
      [ [ WITH ] ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
  
  <phrase>where <replaceable class="parameter">option</replaceable> can be one of:</phrase>
--- 23,33 ----
   <refsynopsisdiv>
  <synopsis>
  COPY <replaceable class="parameter">table_name</replaceable> [ ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ]
!     FROM { '<replaceable class="parameter">filename</replaceable>' | PROGRAM '<replaceable class="parameter">command</replaceable>' | STDIN }
      [ [ WITH ] ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
  
  COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ] | ( <replaceable class="parameter">query</replaceable> ) }
!     TO { '<replaceable class="parameter">filename</replaceable>' | PROGRAM '<replaceable class="parameter">command</replaceable>' | STDOUT }
      [ [ WITH ] ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
  
  <phrase>where <replaceable class="parameter">option</replaceable> can be one of:</phrase>
***************
*** 70,77 **** COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     <command>COPY</command> with a file name instructs the
     <productname>PostgreSQL</productname> server to directly read from
     or write to a file. The file must be accessible to the server and
!    the name must be specified from the viewpoint of the server. When
!    <literal>STDIN</literal> or <literal>STDOUT</literal> is
     specified, data is transmitted via the connection between the
     client and the server.
    </para>
--- 70,82 ----
     <command>COPY</command> with a file name instructs the
     <productname>PostgreSQL</productname> server to directly read from
     or write to a file. The file must be accessible to the server and
!    the name must be specified from the viewpoint of the server.
!    <command>COPY</command> with a command instructs the
!    <productname>PostgreSQL</productname> server to directly execute
!    a command from which input is read or to which output is written.
!    The command must be executable by the server and specified from
!    the viewpoint of the server.
!    When <literal>STDIN</literal> or <literal>STDOUT</literal> is
     specified, data is transmitted via the connection between the
     client and the server.
    </para>
***************
*** 125,130 **** COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
--- 130,147 ----
     </varlistentry>
  
     <varlistentry>
+     <term><replaceable class="parameter">command</replaceable></term>
+     <listitem>
+      <para>
+       The command from which input is read or to which output is written.
+       A command used with COPY FROM must write the data to be loaded
+       to its standard output, while a command used with COPY TO must
+       read the copied data from its standard input.
+      </para>
+     </listitem>
+    </varlistentry>
+ 
+    <varlistentry>
      <term><literal>STDIN</literal></term>
      <listitem>
       <para>
***************
*** 373,378 **** COPY <replaceable class="parameter">count</replaceable>
--- 390,413 ----
     </para>
  
     <para>
+     Likewise, commands specified in a <command>COPY</command> command 
+     are executed directly by the server. Therefore, they must be
+     executable at the database server machine by the <productname>
+     PostgreSQL</productname> user. <command>COPY</command> specifying
+     a command is only allowed to database superusers.
+     It is also recommended that the command specified in
+     <command>COPY</command> always be set using an absolute path.
+     However, a relative path can be specified, which will also be
+     interpreted relative to the working directory of the server process.
+    </para>
+ 
+    <para>
+     Commands specified in a <command>COPY</command> command might fail to
+     execute on operating systems that enforce additional access controls
+     on their resources, such as SELinux.
+    </para>
+ 
+    <para>
      <command>COPY FROM</command> will invoke any triggers and check
      constraints on the destination table. However, it will not invoke rules.
     </para>
*** a/doc/src/sgml/ref/psql-ref.sgml
--- b/doc/src/sgml/ref/psql-ref.sgml
***************
*** 829,835 **** testdb=&gt;
        <varlistentry id="APP-PSQL-meta-commands-copy">
          <term><literal>\copy { <replaceable class="parameter">table</replaceable> [ ( <replaceable class="parameter">column_list</replaceable> ) ] | ( <replaceable class="parameter">query</replaceable> ) }
          { <literal>from</literal> | <literal>to</literal> }
!         { <replaceable class="parameter">filename</replaceable> | stdin | stdout | pstdin | pstdout }
          [ [ with ] ( <replaceable class="parameter">option</replaceable> [, ...] ) ]</literal></term>
  
          <listitem>
--- 829,835 ----
        <varlistentry id="APP-PSQL-meta-commands-copy">
          <term><literal>\copy { <replaceable class="parameter">table</replaceable> [ ( <replaceable class="parameter">column_list</replaceable> ) ] | ( <replaceable class="parameter">query</replaceable> ) }
          { <literal>from</literal> | <literal>to</literal> }
!         { <replaceable class="parameter">filename</replaceable> | program <replaceable class="parameter">command</replaceable> | stdin | stdout | pstdin | pstdout }
          [ [ with ] ( <replaceable class="parameter">option</replaceable> [, ...] ) ]</literal></term>
  
          <listitem>
*** a/src/backend/commands/copy.c
--- b/src/backend/commands/copy.c
***************
*** 107,112 **** typedef struct CopyStateData
--- 107,113 ----
  	QueryDesc  *queryDesc;		/* executable query to copy from */
  	List	   *attnumlist;		/* integer list of attnums to copy */
  	char	   *filename;		/* filename, or NULL for STDIN/STDOUT */
+ 	bool		is_program;		/* do I execute popen/pclose? */
  	bool		binary;			/* binary format? */
  	bool		oids;			/* include OIDs? */
  	bool		csv_mode;		/* Comma Separated Value format? */
***************
*** 276,282 **** static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
  		  const char *queryString, List *attnamelist, List *options);
  static void EndCopy(CopyState cstate);
  static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
! 			const char *filename, List *attnamelist, List *options);
  static void EndCopyTo(CopyState cstate);
  static uint64 DoCopyTo(CopyState cstate);
  static uint64 CopyTo(CopyState cstate);
--- 277,284 ----
  		  const char *queryString, List *attnamelist, List *options);
  static void EndCopy(CopyState cstate);
  static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
! 							 const char *filename, bool is_program,
! 							 List *attnamelist, List *options);
  static void EndCopyTo(CopyState cstate);
  static uint64 DoCopyTo(CopyState cstate);
  static uint64 CopyTo(CopyState cstate);
***************
*** 807,820 **** DoCopy(const CopyStmt *stmt, const char *queryString)
  		if (XactReadOnly && rel->rd_backend != MyBackendId)
  			PreventCommandIfReadOnly("COPY FROM");
  
! 		cstate = BeginCopyFrom(rel, stmt->filename,
  							   stmt->attlist, stmt->options);
  		processed = CopyFrom(cstate);	/* copy from file to database */
  		EndCopyFrom(cstate);
  	}
  	else
  	{
! 		cstate = BeginCopyTo(rel, stmt->query, queryString, stmt->filename,
  							 stmt->attlist, stmt->options);
  		processed = DoCopyTo(cstate);	/* copy from database to file */
  		EndCopyTo(cstate);
--- 809,823 ----
  		if (XactReadOnly && rel->rd_backend != MyBackendId)
  			PreventCommandIfReadOnly("COPY FROM");
  
! 		cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program,
  							   stmt->attlist, stmt->options);
  		processed = CopyFrom(cstate);	/* copy from file to database */
  		EndCopyFrom(cstate);
  	}
  	else
  	{
! 		cstate = BeginCopyTo(rel, stmt->query, queryString,
! 							 stmt->filename, stmt->is_program,
  							 stmt->attlist, stmt->options);
  		processed = DoCopyTo(cstate);	/* copy from database to file */
  		EndCopyTo(cstate);
***************
*** 1382,1392 **** BeginCopy(bool is_from,
  static void
  EndCopy(CopyState cstate)
  {
! 	if (cstate->filename != NULL && FreeFile(cstate->copy_file))
! 		ereport(ERROR,
! 				(errcode_for_file_access(),
! 				 errmsg("could not close file \"%s\": %m",
! 						cstate->filename)));
  
  	MemoryContextDelete(cstate->copycontext);
  	pfree(cstate);
--- 1385,1405 ----
  static void
  EndCopy(CopyState cstate)
  {
! 	if (cstate->is_program)
! 	{
! 		if (ClosePipeStream(cstate->copy_file) == -1)
! 			ereport(ERROR,
! 					(errmsg("could not execute command \"%s\"",
! 							cstate->filename)));
! 	}
! 	else
! 	{
! 		if (cstate->filename != NULL && FreeFile(cstate->copy_file))
! 			ereport(ERROR,
! 					(errcode_for_file_access(),
! 					 errmsg("could not close file \"%s\": %m",
! 							cstate->filename)));
! 	}
  
  	MemoryContextDelete(cstate->copycontext);
  	pfree(cstate);
***************
*** 1400,1405 **** BeginCopyTo(Relation rel,
--- 1413,1419 ----
  			Node *query,
  			const char *queryString,
  			const char *filename,
+ 			bool  is_program,
  			List *attnamelist,
  			List *options)
  {
***************
*** 1436,1441 **** BeginCopyTo(Relation rel,
--- 1450,1461 ----
  	cstate = BeginCopy(false, rel, query, queryString, attnamelist, options);
  	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
  
+ 	cstate->is_program = is_program;
+ 	if (cstate->is_program && pipe)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ 				 errmsg("PROGRAM is not supported to stdout or from stdin")));
+ 
  	if (pipe)
  	{
  		if (whereToSendOutput != DestRemote)
***************
*** 1443,1470 **** BeginCopyTo(Relation rel,
  	}
  	else
  	{
- 		mode_t		oumask;		/* Pre-existing umask value */
  		struct stat st;
  
- 		/*
- 		 * Prevent write to relative path ... too easy to shoot oneself in the
- 		 * foot by overwriting a database file ...
- 		 */
- 		if (!is_absolute_path(filename))
- 			ereport(ERROR,
- 					(errcode(ERRCODE_INVALID_NAME),
- 					 errmsg("relative path not allowed for COPY to file")));
- 
  		cstate->filename = pstrdup(filename);
- 		oumask = umask(S_IWGRP | S_IWOTH);
- 		cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
- 		umask(oumask);
  
! 		if (cstate->copy_file == NULL)
! 			ereport(ERROR,
! 					(errcode_for_file_access(),
! 					 errmsg("could not open file \"%s\" for writing: %m",
! 							cstate->filename)));
  
  		fstat(fileno(cstate->copy_file), &st);
  		if (S_ISDIR(st.st_mode))
--- 1463,1502 ----
  	}
  	else
  	{
  		struct stat st;
  
  		cstate->filename = pstrdup(filename);
  
! 		if (cstate->is_program)
! 		{
! 			cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
! 			if (cstate->copy_file == NULL)
! 				ereport(ERROR,
! 						(errmsg("could not execute command \"%s\": %m",
! 								cstate->filename)));
! 		}
! 		else
! 		{
! 			mode_t		oumask;		/* Pre-existing umask value */
! 
! 			/*
! 			 * Prevent write to relative path ... too easy to shoot oneself in
! 			 * the foot by overwriting a database file ...
! 			 */
! 			if (!is_absolute_path(filename))
! 				ereport(ERROR,
! 						(errcode(ERRCODE_INVALID_NAME),
! 						 errmsg("relative path not allowed for COPY to file")));
! 
! 			oumask = umask(S_IWGRP | S_IWOTH);
! 			cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
! 			umask(oumask);
! 			if (cstate->copy_file == NULL)
! 				ereport(ERROR,
! 						(errcode_for_file_access(),
! 						 errmsg("could not open file \"%s\" for writing: %m",
! 								cstate->filename)));
! 		}
  
  		fstat(fileno(cstate->copy_file), &st);
  		if (S_ISDIR(st.st_mode))
***************
*** 2270,2275 **** CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
--- 2302,2308 ----
  CopyState
  BeginCopyFrom(Relation rel,
  			  const char *filename,
+ 			  bool	is_program,
  			  List *attnamelist,
  			  List *options)
  {
***************
*** 2367,2372 **** BeginCopyFrom(Relation rel,
--- 2400,2411 ----
  	cstate->volatile_defexprs = volatile_defexprs;
  	cstate->num_defaults = num_defaults;
  
+ 	cstate->is_program = is_program;
+ 	if (cstate->is_program && pipe)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ 				 errmsg("PROGRAM is not supported to stdout or from stdin")));
+ 
  	if (pipe)
  	{
  		if (whereToSendOutput == DestRemote)
***************
*** 2379,2391 **** BeginCopyFrom(Relation rel,
  		struct stat st;
  
  		cstate->filename = pstrdup(filename);
- 		cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
  
! 		if (cstate->copy_file == NULL)
! 			ereport(ERROR,
! 					(errcode_for_file_access(),
! 					 errmsg("could not open file \"%s\" for reading: %m",
! 							cstate->filename)));
  
  		fstat(fileno(cstate->copy_file), &st);
  		if (S_ISDIR(st.st_mode))
--- 2418,2441 ----
  		struct stat st;
  
  		cstate->filename = pstrdup(filename);
  
! 		if (cstate->is_program)
! 		{
! 			cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
! 			if (cstate->copy_file == NULL)
! 				ereport(ERROR,
! 						(errmsg("could not execute command \"%s\": %m",
! 								cstate->filename)));
! 		}
! 		else
! 		{
! 			cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
! 			if (cstate->copy_file == NULL)
! 				ereport(ERROR,
! 						(errcode_for_file_access(),
! 						 errmsg("could not open file \"%s\" for reading: %m",
! 								cstate->filename)));
! 		}
  
  		fstat(fileno(cstate->copy_file), &st);
  		if (S_ISDIR(st.st_mode))
*** a/src/backend/nodes/copyfuncs.c
--- b/src/backend/nodes/copyfuncs.c
***************
*** 2703,2708 **** _copyCopyStmt(const CopyStmt *from)
--- 2703,2709 ----
  	COPY_NODE_FIELD(attlist);
  	COPY_SCALAR_FIELD(is_from);
  	COPY_STRING_FIELD(filename);
+ 	COPY_SCALAR_FIELD(is_program);
  	COPY_NODE_FIELD(options);
  
  	return newnode;
*** a/src/backend/nodes/equalfuncs.c
--- b/src/backend/nodes/equalfuncs.c
***************
*** 1090,1095 **** _equalCopyStmt(const CopyStmt *a, const CopyStmt *b)
--- 1090,1096 ----
  	COMPARE_NODE_FIELD(attlist);
  	COMPARE_SCALAR_FIELD(is_from);
  	COMPARE_STRING_FIELD(filename);
+ 	COMPARE_SCALAR_FIELD(is_program);
  	COMPARE_NODE_FIELD(options);
  
  	return true;
*** a/src/backend/parser/gram.y
--- b/src/backend/parser/gram.y
***************
*** 379,385 **** static void processCASbits(int cas_bits, int location, const char *constrType,
  %type <boolean> opt_freeze opt_default opt_recheck
  %type <defelt>	opt_binary opt_oids copy_delimiter
  
! %type <boolean> copy_from
  
  %type <ival>	opt_column event cursor_options opt_hold opt_set_data
  %type <objtype>	reindex_type drop_type comment_type security_label_type
--- 379,385 ----
  %type <boolean> opt_freeze opt_default opt_recheck
  %type <defelt>	opt_binary opt_oids copy_delimiter
  
! %type <boolean> copy_from copy_program
  
  %type <ival>	opt_column event cursor_options opt_hold opt_set_data
  %type <objtype>	reindex_type drop_type comment_type security_label_type
***************
*** 566,572 **** static void processCASbits(int cas_bits, int location, const char *constrType,
  
  	PARSER PARTIAL PARTITION PASSING PASSWORD PLACING PLANS POSITION
  	PRECEDING PRECISION PRESERVE PREPARE PREPARED PRIMARY
! 	PRIOR PRIVILEGES PROCEDURAL PROCEDURE
  
  	QUOTE
  
--- 566,572 ----
  
  	PARSER PARTIAL PARTITION PASSING PASSWORD PLACING PLANS POSITION
  	PRECEDING PRECISION PRESERVE PREPARE PREPARED PRIMARY
! 	PRIOR PRIVILEGES PROCEDURAL PROCEDURE PROGRAM
  
  	QUOTE
  
***************
*** 2314,2327 **** ClosePortalStmt:
   *****************************************************************************/
  
  CopyStmt:	COPY opt_binary qualified_name opt_column_list opt_oids
! 			copy_from copy_file_name copy_delimiter opt_with copy_options
  				{
  					CopyStmt *n = makeNode(CopyStmt);
  					n->relation = $3;
  					n->query = NULL;
  					n->attlist = $4;
  					n->is_from = $6;
! 					n->filename = $7;
  
  					n->options = NIL;
  					/* Concatenate user-supplied flags */
--- 2314,2328 ----
   *****************************************************************************/
  
  CopyStmt:	COPY opt_binary qualified_name opt_column_list opt_oids
! 			copy_from copy_program copy_file_name copy_delimiter opt_with copy_options
  				{
  					CopyStmt *n = makeNode(CopyStmt);
  					n->relation = $3;
  					n->query = NULL;
  					n->attlist = $4;
  					n->is_from = $6;
! 					n->is_program = $7;
! 					n->filename = $8;
  
  					n->options = NIL;
  					/* Concatenate user-supplied flags */
***************
*** 2329,2349 **** CopyStmt:	COPY opt_binary qualified_name opt_column_list opt_oids
  						n->options = lappend(n->options, $2);
  					if ($5)
  						n->options = lappend(n->options, $5);
! 					if ($8)
! 						n->options = lappend(n->options, $8);
! 					if ($10)
! 						n->options = list_concat(n->options, $10);
  					$$ = (Node *)n;
  				}
! 			| COPY select_with_parens TO copy_file_name opt_with copy_options
  				{
  					CopyStmt *n = makeNode(CopyStmt);
  					n->relation = NULL;
  					n->query = $2;
  					n->attlist = NIL;
  					n->is_from = false;
! 					n->filename = $4;
! 					n->options = $6;
  					$$ = (Node *)n;
  				}
  		;
--- 2330,2351 ----
  						n->options = lappend(n->options, $2);
  					if ($5)
  						n->options = lappend(n->options, $5);
! 					if ($9)
! 						n->options = lappend(n->options, $9);
! 					if ($11)
! 						n->options = list_concat(n->options, $11);
  					$$ = (Node *)n;
  				}
! 			| COPY select_with_parens TO copy_program copy_file_name opt_with copy_options
  				{
  					CopyStmt *n = makeNode(CopyStmt);
  					n->relation = NULL;
  					n->query = $2;
  					n->attlist = NIL;
  					n->is_from = false;
! 					n->is_program = $4;
! 					n->filename = $5;
! 					n->options = $7;
  					$$ = (Node *)n;
  				}
  		;
***************
*** 2353,2358 **** copy_from:
--- 2355,2365 ----
  			| TO									{ $$ = FALSE; }
  		;
  
+ copy_program:
+ 			PROGRAM									{ $$ = TRUE; }
+ 			| /* EMPTY */							{ $$ = FALSE; }
+ 		;
+ 
  /*
   * copy_file_name NULL indicates stdio is used. Whether stdin or stdout is
   * used depends on the direction. (It really doesn't make sense to copy from
***************
*** 12595,12600 **** unreserved_keyword:
--- 12602,12608 ----
  			| PRIVILEGES
  			| PROCEDURAL
  			| PROCEDURE
+ 			| PROGRAM
  			| QUOTE
  			| RANGE
  			| READ
*** a/src/backend/storage/file/fd.c
--- b/src/backend/storage/file/fd.c
***************
*** 183,188 **** static uint64 temporary_files_size = 0;
--- 183,189 ----
  typedef enum
  {
  	AllocateDescFile,
+ 	AllocateDescPipe,
  	AllocateDescDir
  } AllocateDescKind;
  
***************
*** 1539,1544 **** FreeDesc(AllocateDesc *desc)
--- 1540,1548 ----
  		case AllocateDescFile:
  			result = fclose(desc->desc.file);
  			break;
+ 		case AllocateDescPipe:
+ 			result = pclose_check(desc->desc.file);
+ 			break;
  		case AllocateDescDir:
  			result = closedir(desc->desc.dir);
  			break;
***************
*** 1583,1588 **** FreeFile(FILE *file)
--- 1587,1663 ----
  	return fclose(file);
  }
  
+ FILE *
+ OpenPipeStream(const char *command, const char *mode)
+ {
+ 	FILE	   *file;
+ 
+ 	DO_DB(elog(LOG, "OpenPipeStream: Allocated %d (%s)",
+ 			   numAllocatedDescs, command));
+ 
+ 	/*
+ 	 * The test against MAX_ALLOCATED_DESCS prevents us from overflowing
+ 	 * allocatedFiles[]; the test against max_safe_fds prevents AllocateFile
+ 	 * from hogging every one of the available FDs, which'd lead to infinite
+ 	 * looping.
+ 	 */
+ 	if (numAllocatedDescs >= MAX_ALLOCATED_DESCS ||
+ 		numAllocatedDescs >= max_safe_fds - 1)
+ 		elog(ERROR, "exceeded MAX_ALLOCATED_DESCS while trying to execute command \"%s\"",
+ 			 command);
+ 
+ TryAgain:
+ 	fflush(stdout);
+ 	fflush(stderr);
+ 	errno = 0;
+ 	if ((file = popen(command, mode)) != NULL)
+ 	{
+ 		AllocateDesc *desc = &allocatedDescs[numAllocatedDescs];
+ 
+ 		desc->kind = AllocateDescPipe;
+ 		desc->desc.file = file;
+ 		desc->create_subid = GetCurrentSubTransactionId();
+ 		numAllocatedDescs++;
+ 		return desc->desc.file;
+ 	}
+ 
+ 	if (errno == EMFILE || errno == ENFILE)
+ 	{
+ 		int			save_errno = errno;
+ 
+ 		ereport(LOG,
+ 				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+ 				 errmsg("out of file descriptors: %m; release and retry")));
+ 		errno = 0;
+ 		if (ReleaseLruFile())
+ 			goto TryAgain;
+ 		errno = save_errno;
+ 	}
+ 
+ 	return NULL;
+ }
+ 
+ int
+ ClosePipeStream(FILE *file)
+ {
+ 	int			i;
+ 
+ 	DO_DB(elog(LOG, "ClosePipeStream: Allocated %d", numAllocatedDescs));
+ 
+ 	/* Remove file from list of allocated files, if it's present */
+ 	for (i = numAllocatedDescs; --i >= 0;)
+ 	{
+ 		AllocateDesc *desc = &allocatedDescs[i];
+ 
+ 		if (desc->kind == AllocateDescPipe && desc->desc.file == file)
+ 			return FreeDesc(desc);
+ 	}
+ 
+ 	/* Only get here if someone passes us a file not in allocatedDescs */
+ 	elog(WARNING, "file passed to ClosePipeStream was not obtained from OpenPipeStream");
+ 
+ 	return pclose_check(file);
+ }
  
  /*
   * Routines that want to use <dirent.h> (ie, DIR*) should use AllocateDir
*** a/src/bin/psql/copy.c
--- b/src/bin/psql/copy.c
***************
*** 52,57 **** struct copy_options
--- 52,58 ----
  	char	   *before_tofrom;	/* COPY string before TO/FROM */
  	char	   *after_tofrom;	/* COPY string after TO/FROM filename */
  	char	   *file;			/* NULL = stdin/stdout */
+ 	bool		program;			/* do I execute popen/pclose? */
  	bool		psql_inout;		/* true = use psql stdin/stdout */
  	bool		from;			/* true = FROM, false = TO */
  };
***************
*** 191,201 **** parse_slash_copy(const char *args)
  	else
  		goto error;
  
! 	token = strtokx(NULL, whitespace, NULL, "'",
! 					0, false, true, pset.encoding);
  	if (!token)
  		goto error;
  
  	if (pg_strcasecmp(token, "stdin") == 0 ||
  		pg_strcasecmp(token, "stdout") == 0)
  	{
--- 192,218 ----
  	else
  		goto error;
  
! 	token = strtokx(NULL, whitespace, NULL, NULL,
! 					0, false, false, pset.encoding);
  	if (!token)
  		goto error;
  
+ 	if (pg_strcasecmp(token, "program") == 0)
+ 	{
+ 		result->program = true;
+ 		token = strtokx(NULL, whitespace, NULL, "'",
+ 						0, false, true, pset.encoding);
+ 		if (!token)
+ 			goto error;
+ 	}
+ 	else
+ 	{
+ 		result->program = false;
+ 		strip_quotes(token, '\'', 0, pset.encoding);
+ 		if (!token)
+ 			goto error;
+ 	}
+ 
  	if (pg_strcasecmp(token, "stdin") == 0 ||
  		pg_strcasecmp(token, "stdout") == 0)
  	{
***************
*** 212,218 **** parse_slash_copy(const char *args)
  	{
  		result->psql_inout = false;
  		result->file = pg_strdup(token);
! 		expand_tilde(&result->file);
  	}
  
  	/* Collect the rest of the line (COPY options) */
--- 229,236 ----
  	{
  		result->psql_inout = false;
  		result->file = pg_strdup(token);
! 		if (!result->program)
! 			expand_tilde(&result->file);
  	}
  
  	/* Collect the rest of the line (COPY options) */
***************
*** 256,263 **** do_copy(const char *args)
  	if (!options)
  		return false;
  
  	/* prepare to read or write the target file */
! 	if (options->file)
  		canonicalize_path(options->file);
  
  	if (options->from)
--- 274,288 ----
  	if (!options)
  		return false;
  
+ 	/* program is not supported to stdout or from stdin */
+ 	if (options->program && options->file == NULL)
+ 	{
+ 		psql_error("program is not supported to stdout/pstdout or from stdin/pstdin\n");
+ 		return false;
+ 	}
+ 
  	/* prepare to read or write the target file */
! 	if (!options->program && options->file)
  		canonicalize_path(options->file);
  
  	if (options->from)
***************
*** 265,271 **** do_copy(const char *args)
  		override_file = &pset.cur_cmd_source;
  
  		if (options->file)
! 			copystream = fopen(options->file, PG_BINARY_R);
  		else if (!options->psql_inout)
  			copystream = pset.cur_cmd_source;
  		else
--- 290,306 ----
  		override_file = &pset.cur_cmd_source;
  
  		if (options->file)
! 		{
! 			if (options->program)
! 			{
! 				fflush(stdout);
! 				fflush(stderr);
! 				errno = 0;
! 				copystream = popen(options->file, PG_BINARY_R);
! 			}
! 			else
! 				copystream = fopen(options->file, PG_BINARY_R);
! 		}
  		else if (!options->psql_inout)
  			copystream = pset.cur_cmd_source;
  		else
***************
*** 276,282 **** do_copy(const char *args)
  		override_file = &pset.queryFout;
  
  		if (options->file)
! 			copystream = fopen(options->file, PG_BINARY_W);
  		else if (!options->psql_inout)
  			copystream = pset.queryFout;
  		else
--- 311,327 ----
  		override_file = &pset.queryFout;
  
  		if (options->file)
! 		{
! 			if (options->program)
! 			{
! 				fflush(stdout);
! 				fflush(stderr);
! 				errno = 0;
! 				copystream = popen(options->file, PG_BINARY_W);
! 			}
! 			else
! 				copystream = fopen(options->file, PG_BINARY_W);
! 		}
  		else if (!options->psql_inout)
  			copystream = pset.queryFout;
  		else
***************
*** 285,292 **** do_copy(const char *args)
  
  	if (!copystream)
  	{
! 		psql_error("%s: %s\n",
! 				   options->file, strerror(errno));
  		free_copy_options(options);
  		return false;
  	}
--- 330,341 ----
  
  	if (!copystream)
  	{
! 		if (options->program)
! 			psql_error("could not execute command \"%s\": %s\n",
! 					   options->file, strerror(errno));
! 		else
! 			psql_error("%s: %s\n",
! 					   options->file, strerror(errno));
  		free_copy_options(options);
  		return false;
  	}
***************
*** 322,331 **** do_copy(const char *args)
  
  	if (options->file != NULL)
  	{
! 		if (fclose(copystream) != 0)
  		{
! 			psql_error("%s: %s\n", options->file, strerror(errno));
! 			success = false;
  		}
  	}
  	free_copy_options(options);
--- 371,392 ----
  
  	if (options->file != NULL)
  	{
! 		if (options->program)
! 		{
! 			if (pclose_check(copystream) == -1)
! 			{
! 				psql_error("could not execute command \"%s\": %s\n",
! 						   options->file);
! 				success = false;
! 			}
! 		}
! 		else
  		{
! 			if (fclose(copystream) != 0)
! 			{
! 				psql_error("%s: %s\n", options->file, strerror(errno));
! 				success = false;
! 			}
  		}
  	}
  	free_copy_options(options);
*** a/src/bin/psql/stringutils.c
--- b/src/bin/psql/stringutils.c
***************
*** 13,21 ****
  #include "stringutils.h"
  
  
- static void strip_quotes(char *source, char quote, char escape, int encoding);
- 
- 
  /*
   * Replacement for strtok() (a.k.a. poor man's flex)
   *
--- 13,18 ----
***************
*** 239,245 **** strtokx(const char *s,
   *
   * Note that the source string is overwritten in-place.
   */
! static void
  strip_quotes(char *source, char quote, char escape, int encoding)
  {
  	char	   *src;
--- 236,242 ----
   *
   * Note that the source string is overwritten in-place.
   */
! void
  strip_quotes(char *source, char quote, char escape, int encoding)
  {
  	char	   *src;
*** a/src/bin/psql/stringutils.h
--- b/src/bin/psql/stringutils.h
***************
*** 19,24 **** extern char *strtokx(const char *s,
--- 19,26 ----
  		bool del_quotes,
  		int encoding);
  
+ extern void strip_quotes(char *source, char quote, char escape, int encoding);
+ 
  extern char *quote_if_needed(const char *source, const char *entails_quote,
  				char quote, char escape, int encoding);
  
*** a/src/include/commands/copy.h
--- b/src/include/commands/copy.h
***************
*** 25,31 **** extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString);
  
  extern void ProcessCopyOptions(CopyState cstate, bool is_from, List *options);
  extern CopyState BeginCopyFrom(Relation rel, const char *filename,
! 			  List *attnamelist, List *options);
  extern void EndCopyFrom(CopyState cstate);
  extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
  			 Datum *values, bool *nulls, Oid *tupleOid);
--- 25,31 ----
  
  extern void ProcessCopyOptions(CopyState cstate, bool is_from, List *options);
  extern CopyState BeginCopyFrom(Relation rel, const char *filename,
! 			  bool is_program, List *attnamelist, List *options);
  extern void EndCopyFrom(CopyState cstate);
  extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
  			 Datum *values, bool *nulls, Oid *tupleOid);
*** a/src/include/nodes/parsenodes.h
--- b/src/include/nodes/parsenodes.h
***************
*** 1397,1402 **** typedef struct CopyStmt
--- 1397,1403 ----
  								 * for all columns */
  	bool		is_from;		/* TO or FROM */
  	char	   *filename;		/* filename, or NULL for STDIN/STDOUT */
+ 	bool		is_program;		/* do I execute popen/pclose? */
  	List	   *options;		/* List of DefElem nodes */
  } CopyStmt;
  
*** a/src/include/parser/kwlist.h
--- b/src/include/parser/kwlist.h
***************
*** 292,297 **** PG_KEYWORD("prior", PRIOR, UNRESERVED_KEYWORD)
--- 292,298 ----
  PG_KEYWORD("privileges", PRIVILEGES, UNRESERVED_KEYWORD)
  PG_KEYWORD("procedural", PROCEDURAL, UNRESERVED_KEYWORD)
  PG_KEYWORD("procedure", PROCEDURE, UNRESERVED_KEYWORD)
+ PG_KEYWORD("program", PROGRAM, UNRESERVED_KEYWORD)
  PG_KEYWORD("quote", QUOTE, UNRESERVED_KEYWORD)
  PG_KEYWORD("range", RANGE, UNRESERVED_KEYWORD)
  PG_KEYWORD("read", READ, UNRESERVED_KEYWORD)
*** a/src/include/storage/fd.h
--- b/src/include/storage/fd.h
***************
*** 79,84 **** extern char *FilePathName(File file);
--- 79,88 ----
  extern FILE *AllocateFile(const char *name, const char *mode);
  extern int	FreeFile(FILE *file);
  
+ /* Operations that allow use of pipe stream */
+ extern FILE *OpenPipeStream(const char *command, const char *mode);
+ extern int	ClosePipeStream(FILE *file);
+ 
  /* Operations to allow use of the <dirent.h> library routines */
  extern DIR *AllocateDir(const char *dirname);
  extern struct dirent *ReadDir(DIR *dir, const char *dirname);
*** a/src/interfaces/ecpg/preproc/ecpg.addons
--- b/src/interfaces/ecpg/preproc/ecpg.addons
***************
*** 192,205 **** ECPG: where_or_current_clauseWHERECURRENT_POFcursor_name block
  		char *cursor_marker = $4[0] == ':' ? mm_strdup("$0") : $4;
  		$$ = cat_str(2,mm_strdup("where current of"), cursor_marker);
  	}
! ECPG: CopyStmtCOPYopt_binaryqualified_nameopt_column_listopt_oidscopy_fromcopy_file_namecopy_delimiteropt_withcopy_options addon
  			if (strcmp($6, "to") == 0 && strcmp($7, "stdin") == 0)
  				mmerror(PARSE_ERROR, ET_ERROR, "COPY TO STDIN is not possible");
  			else if (strcmp($6, "from") == 0 && strcmp($7, "stdout") == 0)
  				mmerror(PARSE_ERROR, ET_ERROR, "COPY FROM STDOUT is not possible");
  			else if (strcmp($6, "from") == 0 && strcmp($7, "stdin") == 0)
  				mmerror(PARSE_ERROR, ET_WARNING, "COPY FROM STDIN is not implemented");
! ECPG: CopyStmtCOPYselect_with_parensTOcopy_file_nameopt_withcopy_options addon
  			if (strcmp($4, "stdin") == 0)
  				mmerror(PARSE_ERROR, ET_ERROR, "COPY TO STDIN is not possible");
  ECPG: var_valueNumericOnly addon
--- 192,205 ----
  		char *cursor_marker = $4[0] == ':' ? mm_strdup("$0") : $4;
  		$$ = cat_str(2,mm_strdup("where current of"), cursor_marker);
  	}
! ECPG: CopyStmtCOPYopt_binaryqualified_nameopt_column_listopt_oidscopy_fromcopy_programcopy_file_namecopy_delimiteropt_withcopy_options addon
  			if (strcmp($6, "to") == 0 && strcmp($7, "stdin") == 0)
  				mmerror(PARSE_ERROR, ET_ERROR, "COPY TO STDIN is not possible");
  			else if (strcmp($6, "from") == 0 && strcmp($7, "stdout") == 0)
  				mmerror(PARSE_ERROR, ET_ERROR, "COPY FROM STDOUT is not possible");
  			else if (strcmp($6, "from") == 0 && strcmp($7, "stdin") == 0)
  				mmerror(PARSE_ERROR, ET_WARNING, "COPY FROM STDIN is not implemented");
! ECPG: CopyStmtCOPYselect_with_parensTOcopy_programcopy_file_nameopt_withcopy_options addon
  			if (strcmp($4, "stdin") == 0)
  				mmerror(PARSE_ERROR, ET_ERROR, "COPY TO STDIN is not possible");
  ECPG: var_valueNumericOnly addon
#45Etsuro Fujita
fujita.etsuro@lab.ntt.co.jp
In reply to: Etsuro Fujita (#44)
1 attachment(s)
Re: WIP patch: add (PRE|POST)PROCESSOR options to COPY

I wrote:

I'll reimplement the feature using the PROGRAM keyword:

COPY TABLE FROM PROGRAM 'command line';
COPY TABLE TO PROGRAM 'command line';

I've reimplemented the feature. Attached is an updated version of the patch.

I fixed bugs in the previous version of the patch. Please find attached an
updated version of the patch.

Thanks,

Best regards,
Etsuro Fujita

Attachments:

copy-popen-20121126.patchapplication/octet-stream; name=copy-popen-20121126.patchDownload
*** a/contrib/file_fdw/file_fdw.c
--- b/contrib/file_fdw/file_fdw.c
***************
*** 588,593 **** fileBeginForeignScan(ForeignScanState *node, int eflags)
--- 588,594 ----
  	 */
  	cstate = BeginCopyFrom(node->ss.ss_currentRelation,
  						   filename,
+ 						   false,
  						   NIL,
  						   options);
  
***************
*** 660,665 **** fileReScanForeignScan(ForeignScanState *node)
--- 661,667 ----
  
  	festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation,
  									festate->filename,
+ 									false,
  									NIL,
  									festate->options);
  }
***************
*** 993,999 **** file_acquire_sample_rows(Relation onerel, int elevel,
  	/*
  	 * Create CopyState from FDW options.
  	 */
! 	cstate = BeginCopyFrom(onerel, filename, NIL, options);
  
  	/*
  	 * Use per-tuple memory context to prevent leak of memory used to read
--- 995,1001 ----
  	/*
  	 * Create CopyState from FDW options.
  	 */
! 	cstate = BeginCopyFrom(onerel, filename, false, NIL, options);
  
  	/*
  	 * Use per-tuple memory context to prevent leak of memory used to read
*** a/doc/src/sgml/keywords.sgml
--- b/doc/src/sgml/keywords.sgml
***************
*** 3514,3519 ****
--- 3514,3526 ----
      <entry>reserved</entry>
     </row>
     <row>
+     <entry><token>PROGRAM</token></entry>
+     <entry>non-reserved</entry>
+     <entry></entry>
+     <entry></entry>
+     <entry></entry>
+    </row>
+    <row>
      <entry><token>PUBLIC</token></entry>
      <entry></entry>
      <entry>non-reserved</entry>
*** a/doc/src/sgml/ref/copy.sgml
--- b/doc/src/sgml/ref/copy.sgml
***************
*** 23,33 **** PostgreSQL documentation
   <refsynopsisdiv>
  <synopsis>
  COPY <replaceable class="parameter">table_name</replaceable> [ ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ]
!     FROM { '<replaceable class="parameter">filename</replaceable>' | STDIN }
      [ [ WITH ] ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
  
  COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ] | ( <replaceable class="parameter">query</replaceable> ) }
!     TO { '<replaceable class="parameter">filename</replaceable>' | STDOUT }
      [ [ WITH ] ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
  
  <phrase>where <replaceable class="parameter">option</replaceable> can be one of:</phrase>
--- 23,33 ----
   <refsynopsisdiv>
  <synopsis>
  COPY <replaceable class="parameter">table_name</replaceable> [ ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ]
!     FROM { '<replaceable class="parameter">filename</replaceable>' | PROGRAM '<replaceable class="parameter">command</replaceable>' | STDIN }
      [ [ WITH ] ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
  
  COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ] | ( <replaceable class="parameter">query</replaceable> ) }
!     TO { '<replaceable class="parameter">filename</replaceable>' | PROGRAM '<replaceable class="parameter">command</replaceable>' | STDOUT }
      [ [ WITH ] ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
  
  <phrase>where <replaceable class="parameter">option</replaceable> can be one of:</phrase>
***************
*** 70,77 **** COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     <command>COPY</command> with a file name instructs the
     <productname>PostgreSQL</productname> server to directly read from
     or write to a file. The file must be accessible to the server and
!    the name must be specified from the viewpoint of the server. When
!    <literal>STDIN</literal> or <literal>STDOUT</literal> is
     specified, data is transmitted via the connection between the
     client and the server.
    </para>
--- 70,82 ----
     <command>COPY</command> with a file name instructs the
     <productname>PostgreSQL</productname> server to directly read from
     or write to a file. The file must be accessible to the server and
!    the name must be specified from the viewpoint of the server.
!    <command>COPY</command> with a command instructs the
!    <productname>PostgreSQL</productname> server to directly execute
!    a command that supplies the input data or receives the output data.
!    The command must be executable by the server and specified from
!    the viewpoint of the server.
!    When <literal>STDIN</literal> or <literal>STDOUT</literal> is
     specified, data is transmitted via the connection between the
     client and the server.
    </para>
***************
*** 125,130 **** COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
--- 130,147 ----
     </varlistentry>
  
     <varlistentry>
+     <term><replaceable class="parameter">command</replaceable></term>
+     <listitem>
+      <para>
+       The command that supplies the input data or receives the output data.
+       A command used with COPY FROM must write the data to be loaded to its
+       standard output, while a command used with COPY TO must read the
+       copied data from its standard input.
+      </para>
+     </listitem>
+    </varlistentry>
+ 
+    <varlistentry>
      <term><literal>STDIN</literal></term>
      <listitem>
       <para>
***************
*** 373,378 **** COPY <replaceable class="parameter">count</replaceable>
--- 390,414 ----
     </para>
  
     <para>
+     Commands specified in a <command>COPY</command> command are executed
+     directly by the server, not by the client application. Therefore,
+     they must be executable at the database server machine, not the client.
+     They must be executable by the <productname>PostgreSQL</productname>
+     user, not the client. <command>COPY</command> specifying a command
+     is only allowed to database superusers.
+     It is recommended that the command specified in
+     <command>COPY</command> always be set using an absolute path.
+     However, a relative path can be specified, which will be interpreted
+     relative to the working directory of the server process.
+    </para>
+ 
+    <para>
+     Commands specified in a <command>COPY</command> command might not be
+     executed on operating systems that implement access control for
+     their resources, such as systems running SELinux.
+    </para>
+ 
+    <para>
      <command>COPY FROM</command> will invoke any triggers and check
      constraints on the destination table. However, it will not invoke rules.
     </para>
*** a/doc/src/sgml/ref/psql-ref.sgml
--- b/doc/src/sgml/ref/psql-ref.sgml
***************
*** 829,835 **** testdb=&gt;
        <varlistentry id="APP-PSQL-meta-commands-copy">
          <term><literal>\copy { <replaceable class="parameter">table</replaceable> [ ( <replaceable class="parameter">column_list</replaceable> ) ] | ( <replaceable class="parameter">query</replaceable> ) }
          { <literal>from</literal> | <literal>to</literal> }
!         { <replaceable class="parameter">filename</replaceable> | stdin | stdout | pstdin | pstdout }
          [ [ with ] ( <replaceable class="parameter">option</replaceable> [, ...] ) ]</literal></term>
  
          <listitem>
--- 829,835 ----
        <varlistentry id="APP-PSQL-meta-commands-copy">
          <term><literal>\copy { <replaceable class="parameter">table</replaceable> [ ( <replaceable class="parameter">column_list</replaceable> ) ] | ( <replaceable class="parameter">query</replaceable> ) }
          { <literal>from</literal> | <literal>to</literal> }
!         { <replaceable class="parameter">filename</replaceable> | program <replaceable class="parameter">command</replaceable> | stdin | stdout | pstdin | pstdout }
          [ [ with ] ( <replaceable class="parameter">option</replaceable> [, ...] ) ]</literal></term>
  
          <listitem>
***************
*** 846,851 **** testdb=&gt;
--- 846,863 ----
          </para>
  
          <para>
+         When <literal>program</> is specified,
+         <replaceable class="parameter">command</replaceable> is
+         directly executed by <application>psql</application>,
+         not the server, and the data from or to 
+         <replaceable class="parameter">command</replaceable> is
+         routed between the server and the client.
+         This means that the execution privileges are those of
+         the local user, not the server, and no SQL superuser
+         privileges are required.
+         </para>
+ 
+         <para>
          The syntax of the command is similar to that of the
          <acronym>SQL</acronym> <xref linkend="sql-copy">
          command, and
*** a/src/backend/commands/copy.c
--- b/src/backend/commands/copy.c
***************
*** 107,112 **** typedef struct CopyStateData
--- 107,113 ----
  	QueryDesc  *queryDesc;		/* executable query to copy from */
  	List	   *attnumlist;		/* integer list of attnums to copy */
  	char	   *filename;		/* filename, or NULL for STDIN/STDOUT */
+ 	bool		is_program;		/* do I execute popen/pclose? */
  	bool		binary;			/* binary format? */
  	bool		oids;			/* include OIDs? */
  	bool		csv_mode;		/* Comma Separated Value format? */
***************
*** 276,282 **** static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
  		  const char *queryString, List *attnamelist, List *options);
  static void EndCopy(CopyState cstate);
  static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
! 			const char *filename, List *attnamelist, List *options);
  static void EndCopyTo(CopyState cstate);
  static uint64 DoCopyTo(CopyState cstate);
  static uint64 CopyTo(CopyState cstate);
--- 277,284 ----
  		  const char *queryString, List *attnamelist, List *options);
  static void EndCopy(CopyState cstate);
  static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
! 							 const char *filename, bool is_program,
! 							 List *attnamelist, List *options);
  static void EndCopyTo(CopyState cstate);
  static uint64 DoCopyTo(CopyState cstate);
  static uint64 CopyTo(CopyState cstate);
***************
*** 807,820 **** DoCopy(const CopyStmt *stmt, const char *queryString)
  		if (XactReadOnly && rel->rd_backend != MyBackendId)
  			PreventCommandIfReadOnly("COPY FROM");
  
! 		cstate = BeginCopyFrom(rel, stmt->filename,
  							   stmt->attlist, stmt->options);
  		processed = CopyFrom(cstate);	/* copy from file to database */
  		EndCopyFrom(cstate);
  	}
  	else
  	{
! 		cstate = BeginCopyTo(rel, stmt->query, queryString, stmt->filename,
  							 stmt->attlist, stmt->options);
  		processed = DoCopyTo(cstate);	/* copy from database to file */
  		EndCopyTo(cstate);
--- 809,823 ----
  		if (XactReadOnly && rel->rd_backend != MyBackendId)
  			PreventCommandIfReadOnly("COPY FROM");
  
! 		cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program,
  							   stmt->attlist, stmt->options);
  		processed = CopyFrom(cstate);	/* copy from file to database */
  		EndCopyFrom(cstate);
  	}
  	else
  	{
! 		cstate = BeginCopyTo(rel, stmt->query, queryString,
! 							 stmt->filename, stmt->is_program,
  							 stmt->attlist, stmt->options);
  		processed = DoCopyTo(cstate);	/* copy from database to file */
  		EndCopyTo(cstate);
***************
*** 1382,1392 **** BeginCopy(bool is_from,
  static void
  EndCopy(CopyState cstate)
  {
! 	if (cstate->filename != NULL && FreeFile(cstate->copy_file))
! 		ereport(ERROR,
! 				(errcode_for_file_access(),
! 				 errmsg("could not close file \"%s\": %m",
! 						cstate->filename)));
  
  	MemoryContextDelete(cstate->copycontext);
  	pfree(cstate);
--- 1385,1405 ----
  static void
  EndCopy(CopyState cstate)
  {
! 	if (cstate->is_program)
! 	{
! 		if (ClosePipeStream(cstate->copy_file) == -1)
! 			ereport(ERROR,
! 					(errmsg("could not execute command \"%s\"",
! 							cstate->filename)));
! 	}
! 	else
! 	{
! 		if (cstate->filename != NULL && FreeFile(cstate->copy_file))
! 			ereport(ERROR,
! 					(errcode_for_file_access(),
! 					 errmsg("could not close file \"%s\": %m",
! 							cstate->filename)));
! 	}
  
  	MemoryContextDelete(cstate->copycontext);
  	pfree(cstate);
***************
*** 1400,1405 **** BeginCopyTo(Relation rel,
--- 1413,1419 ----
  			Node *query,
  			const char *queryString,
  			const char *filename,
+ 			bool  is_program,
  			List *attnamelist,
  			List *options)
  {
***************
*** 1436,1441 **** BeginCopyTo(Relation rel,
--- 1450,1461 ----
  	cstate = BeginCopy(false, rel, query, queryString, attnamelist, options);
  	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
  
+ 	cstate->is_program = is_program;
+ 	if (pipe && cstate->is_program)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ 				 errmsg("PROGRAM is not supported to stdout or from stdin")));
+ 
  	if (pipe)
  	{
  		if (whereToSendOutput != DestRemote)
***************
*** 1443,1476 **** BeginCopyTo(Relation rel,
  	}
  	else
  	{
! 		mode_t		oumask;		/* Pre-existing umask value */
! 		struct stat st;
  
! 		/*
! 		 * Prevent write to relative path ... too easy to shoot oneself in the
! 		 * foot by overwriting a database file ...
! 		 */
! 		if (!is_absolute_path(filename))
! 			ereport(ERROR,
! 					(errcode(ERRCODE_INVALID_NAME),
! 					 errmsg("relative path not allowed for COPY to file")));
  
! 		cstate->filename = pstrdup(filename);
! 		oumask = umask(S_IWGRP | S_IWOTH);
! 		cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
! 		umask(oumask);
  
! 		if (cstate->copy_file == NULL)
! 			ereport(ERROR,
! 					(errcode_for_file_access(),
! 					 errmsg("could not open file \"%s\" for writing: %m",
! 							cstate->filename)));
  
! 		fstat(fileno(cstate->copy_file), &st);
! 		if (S_ISDIR(st.st_mode))
! 			ereport(ERROR,
! 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 					 errmsg("\"%s\" is a directory", cstate->filename)));
  	}
  
  	MemoryContextSwitchTo(oldcontext);
--- 1463,1507 ----
  	}
  	else
  	{
! 		cstate->filename = pstrdup(filename);
  
! 		if (cstate->is_program)
! 		{
! 			cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
! 			if (cstate->copy_file == NULL)
! 				ereport(ERROR,
! 						(errmsg("could not execute command \"%s\": %m",
! 								cstate->filename)));
! 		}
! 		else
! 		{
! 			mode_t		oumask;		/* Pre-existing umask value */
! 			struct stat st;
  
! 			/*
! 			 * Prevent write to relative path ... too easy to shoot oneself in
! 			 * the foot by overwriting a database file ...
! 			 */
! 			if (!is_absolute_path(filename))
! 				ereport(ERROR,
! 						(errcode(ERRCODE_INVALID_NAME),
! 						 errmsg("relative path not allowed for COPY to file")));
  
! 			oumask = umask(S_IWGRP | S_IWOTH);
! 			cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
! 			umask(oumask);
! 			if (cstate->copy_file == NULL)
! 				ereport(ERROR,
! 						(errcode_for_file_access(),
! 						 errmsg("could not open file \"%s\" for writing: %m",
! 								cstate->filename)));
  
! 			fstat(fileno(cstate->copy_file), &st);
! 			if (S_ISDIR(st.st_mode))
! 				ereport(ERROR,
! 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 						 errmsg("\"%s\" is a directory", cstate->filename)));
! 		}
  	}
  
  	MemoryContextSwitchTo(oldcontext);
***************
*** 2270,2275 **** CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
--- 2301,2307 ----
  CopyState
  BeginCopyFrom(Relation rel,
  			  const char *filename,
+ 			  bool	is_program,
  			  List *attnamelist,
  			  List *options)
  {
***************
*** 2367,2372 **** BeginCopyFrom(Relation rel,
--- 2399,2410 ----
  	cstate->volatile_defexprs = volatile_defexprs;
  	cstate->num_defaults = num_defaults;
  
+ 	cstate->is_program = is_program;
+ 	if (pipe && cstate->is_program)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ 				 errmsg("PROGRAM is not supported to stdout or from stdin")));
+ 
  	if (pipe)
  	{
  		if (whereToSendOutput == DestRemote)
***************
*** 2376,2397 **** BeginCopyFrom(Relation rel,
  	}
  	else
  	{
- 		struct stat st;
- 
  		cstate->filename = pstrdup(filename);
- 		cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
  
! 		if (cstate->copy_file == NULL)
! 			ereport(ERROR,
! 					(errcode_for_file_access(),
! 					 errmsg("could not open file \"%s\" for reading: %m",
! 							cstate->filename)));
  
! 		fstat(fileno(cstate->copy_file), &st);
! 		if (S_ISDIR(st.st_mode))
! 			ereport(ERROR,
! 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 					 errmsg("\"%s\" is a directory", cstate->filename)));
  	}
  
  	if (!cstate->binary)
--- 2414,2446 ----
  	}
  	else
  	{
  		cstate->filename = pstrdup(filename);
  
! 		if (cstate->is_program)
! 		{
! 			cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
! 			if (cstate->copy_file == NULL)
! 				ereport(ERROR,
! 						(errmsg("could not execute command \"%s\": %m",
! 								cstate->filename)));
! 		}
! 		else
! 		{
! 			struct stat st;
  
! 			cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
! 			if (cstate->copy_file == NULL)
! 				ereport(ERROR,
! 						(errcode_for_file_access(),
! 						 errmsg("could not open file \"%s\" for reading: %m",
! 								cstate->filename)));
! 
! 			fstat(fileno(cstate->copy_file), &st);
! 			if (S_ISDIR(st.st_mode))
! 				ereport(ERROR,
! 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 						 errmsg("\"%s\" is a directory", cstate->filename)));
! 		}
  	}
  
  	if (!cstate->binary)
*** a/src/backend/nodes/copyfuncs.c
--- b/src/backend/nodes/copyfuncs.c
***************
*** 2702,2707 **** _copyCopyStmt(const CopyStmt *from)
--- 2702,2708 ----
  	COPY_NODE_FIELD(query);
  	COPY_NODE_FIELD(attlist);
  	COPY_SCALAR_FIELD(is_from);
+ 	COPY_SCALAR_FIELD(is_program);
  	COPY_STRING_FIELD(filename);
  	COPY_NODE_FIELD(options);
  
*** a/src/backend/nodes/equalfuncs.c
--- b/src/backend/nodes/equalfuncs.c
***************
*** 1089,1094 **** _equalCopyStmt(const CopyStmt *a, const CopyStmt *b)
--- 1089,1095 ----
  	COMPARE_NODE_FIELD(query);
  	COMPARE_NODE_FIELD(attlist);
  	COMPARE_SCALAR_FIELD(is_from);
+ 	COMPARE_SCALAR_FIELD(is_program);
  	COMPARE_STRING_FIELD(filename);
  	COMPARE_NODE_FIELD(options);
  
*** a/src/backend/parser/gram.y
--- b/src/backend/parser/gram.y
***************
*** 379,385 **** static void processCASbits(int cas_bits, int location, const char *constrType,
  %type <boolean> opt_freeze opt_default opt_recheck
  %type <defelt>	opt_binary opt_oids copy_delimiter
  
! %type <boolean> copy_from
  
  %type <ival>	opt_column event cursor_options opt_hold opt_set_data
  %type <objtype>	reindex_type drop_type comment_type security_label_type
--- 379,385 ----
  %type <boolean> opt_freeze opt_default opt_recheck
  %type <defelt>	opt_binary opt_oids copy_delimiter
  
! %type <boolean> copy_from copy_program
  
  %type <ival>	opt_column event cursor_options opt_hold opt_set_data
  %type <objtype>	reindex_type drop_type comment_type security_label_type
***************
*** 566,572 **** static void processCASbits(int cas_bits, int location, const char *constrType,
  
  	PARSER PARTIAL PARTITION PASSING PASSWORD PLACING PLANS POSITION
  	PRECEDING PRECISION PRESERVE PREPARE PREPARED PRIMARY
! 	PRIOR PRIVILEGES PROCEDURAL PROCEDURE
  
  	QUOTE
  
--- 566,572 ----
  
  	PARSER PARTIAL PARTITION PASSING PASSWORD PLACING PLANS POSITION
  	PRECEDING PRECISION PRESERVE PREPARE PREPARED PRIMARY
! 	PRIOR PRIVILEGES PROCEDURAL PROCEDURE PROGRAM
  
  	QUOTE
  
***************
*** 2299,2305 **** ClosePortalStmt:
--- 2299,2307 ----
   *
   *		QUERY :
   *				COPY relname [(columnList)] FROM/TO file [WITH] [(options)]
+  *				COPY relname [(columnList)] FROM/TO PROGRAM file [WITH] [(options)]
   *				COPY ( SELECT ... ) TO file [WITH] [(options)]
+  *				COPY ( SELECT ... ) TO PROGRAM file [WITH] [(options)]
   *
   *				In the preferred syntax the options are comma-separated
   *				and use generic identifiers instead of keywords.  The pre-9.0
***************
*** 2314,2327 **** ClosePortalStmt:
   *****************************************************************************/
  
  CopyStmt:	COPY opt_binary qualified_name opt_column_list opt_oids
! 			copy_from copy_file_name copy_delimiter opt_with copy_options
  				{
  					CopyStmt *n = makeNode(CopyStmt);
  					n->relation = $3;
  					n->query = NULL;
  					n->attlist = $4;
  					n->is_from = $6;
! 					n->filename = $7;
  
  					n->options = NIL;
  					/* Concatenate user-supplied flags */
--- 2316,2330 ----
   *****************************************************************************/
  
  CopyStmt:	COPY opt_binary qualified_name opt_column_list opt_oids
! 			copy_from copy_program copy_file_name copy_delimiter opt_with copy_options
  				{
  					CopyStmt *n = makeNode(CopyStmt);
  					n->relation = $3;
  					n->query = NULL;
  					n->attlist = $4;
  					n->is_from = $6;
! 					n->is_program = $7;
! 					n->filename = $8;
  
  					n->options = NIL;
  					/* Concatenate user-supplied flags */
***************
*** 2329,2349 **** CopyStmt:	COPY opt_binary qualified_name opt_column_list opt_oids
  						n->options = lappend(n->options, $2);
  					if ($5)
  						n->options = lappend(n->options, $5);
! 					if ($8)
! 						n->options = lappend(n->options, $8);
! 					if ($10)
! 						n->options = list_concat(n->options, $10);
  					$$ = (Node *)n;
  				}
! 			| COPY select_with_parens TO copy_file_name opt_with copy_options
  				{
  					CopyStmt *n = makeNode(CopyStmt);
  					n->relation = NULL;
  					n->query = $2;
  					n->attlist = NIL;
  					n->is_from = false;
! 					n->filename = $4;
! 					n->options = $6;
  					$$ = (Node *)n;
  				}
  		;
--- 2332,2353 ----
  						n->options = lappend(n->options, $2);
  					if ($5)
  						n->options = lappend(n->options, $5);
! 					if ($9)
! 						n->options = lappend(n->options, $9);
! 					if ($11)
! 						n->options = list_concat(n->options, $11);
  					$$ = (Node *)n;
  				}
! 			| COPY select_with_parens TO copy_program copy_file_name opt_with copy_options
  				{
  					CopyStmt *n = makeNode(CopyStmt);
  					n->relation = NULL;
  					n->query = $2;
  					n->attlist = NIL;
  					n->is_from = false;
! 					n->is_program = $4;
! 					n->filename = $5;
! 					n->options = $7;
  					$$ = (Node *)n;
  				}
  		;
***************
*** 2353,2358 **** copy_from:
--- 2357,2367 ----
  			| TO									{ $$ = FALSE; }
  		;
  
+ copy_program:
+ 			PROGRAM									{ $$ = TRUE; }
+ 			| /* EMPTY */							{ $$ = FALSE; }
+ 		;
+ 
  /*
   * copy_file_name NULL indicates stdio is used. Whether stdin or stdout is
   * used depends on the direction. (It really doesn't make sense to copy from
***************
*** 12595,12600 **** unreserved_keyword:
--- 12604,12610 ----
  			| PRIVILEGES
  			| PROCEDURAL
  			| PROCEDURE
+ 			| PROGRAM
  			| QUOTE
  			| RANGE
  			| READ
*** a/src/backend/storage/file/fd.c
--- b/src/backend/storage/file/fd.c
***************
*** 183,188 **** static uint64 temporary_files_size = 0;
--- 183,189 ----
  typedef enum
  {
  	AllocateDescFile,
+ 	AllocateDescPipe,
  	AllocateDescDir
  } AllocateDescKind;
  
***************
*** 1539,1544 **** FreeDesc(AllocateDesc *desc)
--- 1540,1548 ----
  		case AllocateDescFile:
  			result = fclose(desc->desc.file);
  			break;
+ 		case AllocateDescPipe:
+ 			result = pclose_check(desc->desc.file);
+ 			break;
  		case AllocateDescDir:
  			result = closedir(desc->desc.dir);
  			break;
***************
*** 1583,1588 **** FreeFile(FILE *file)
--- 1587,1663 ----
  	return fclose(file);
  }
  
+ FILE *
+ OpenPipeStream(const char *command, const char *mode)
+ {
+ 	FILE	   *file;
+ 
+ 	DO_DB(elog(LOG, "OpenPipeStream: Allocated %d (%s)",
+ 			   numAllocatedDescs, command));
+ 
+ 	/*
+ 	 * The test against MAX_ALLOCATED_DESCS prevents us from overflowing
+ 	 * allocatedDescs[]; the test against max_safe_fds prevents OpenPipeStream
+ 	 * from hogging every one of the available FDs, which'd lead to infinite
+ 	 * looping.
+ 	 */
+ 	if (numAllocatedDescs >= MAX_ALLOCATED_DESCS ||
+ 		numAllocatedDescs >= max_safe_fds - 1)
+ 		elog(ERROR, "exceeded MAX_ALLOCATED_DESCS while trying to execute command \"%s\"",
+ 			 command);
+ 
+ TryAgain:
+ 	fflush(stdout);
+ 	fflush(stderr);
+ 	errno = 0;
+ 	if ((file = popen(command, mode)) != NULL)
+ 	{
+ 		AllocateDesc *desc = &allocatedDescs[numAllocatedDescs];
+ 
+ 		desc->kind = AllocateDescPipe;
+ 		desc->desc.file = file;
+ 		desc->create_subid = GetCurrentSubTransactionId();
+ 		numAllocatedDescs++;
+ 		return desc->desc.file;
+ 	}
+ 
+ 	if (errno == EMFILE || errno == ENFILE)
+ 	{
+ 		int			save_errno = errno;
+ 
+ 		ereport(LOG,
+ 				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+ 				 errmsg("out of file descriptors: %m; release and retry")));
+ 		errno = 0;
+ 		if (ReleaseLruFile())
+ 			goto TryAgain;
+ 		errno = save_errno;
+ 	}
+ 
+ 	return NULL;
+ }
+ 
+ int
+ ClosePipeStream(FILE *file)
+ {
+ 	int			i;
+ 
+ 	DO_DB(elog(LOG, "ClosePipeStream: Allocated %d", numAllocatedDescs));
+ 
+ 	/* Remove file from list of allocated files, if it's present */
+ 	for (i = numAllocatedDescs; --i >= 0;)
+ 	{
+ 		AllocateDesc *desc = &allocatedDescs[i];
+ 
+ 		if (desc->kind == AllocateDescPipe && desc->desc.file == file)
+ 			return FreeDesc(desc);
+ 	}
+ 
+ 	/* Only get here if someone passes us a file not in allocatedDescs */
+ 	elog(WARNING, "file passed to ClosePipeStream was not obtained from OpenPipeStream");
+ 
+ 	return pclose_check(file);
+ }
  
  /*
   * Routines that want to use <dirent.h> (ie, DIR*) should use AllocateDir
*** a/src/bin/psql/copy.c
--- b/src/bin/psql/copy.c
***************
*** 52,57 **** struct copy_options
--- 52,58 ----
  	char	   *before_tofrom;	/* COPY string before TO/FROM */
  	char	   *after_tofrom;	/* COPY string after TO/FROM filename */
  	char	   *file;			/* NULL = stdin/stdout */
+ 	bool		program;		/* do I execute popen/pclose? */
  	bool		psql_inout;		/* true = use psql stdin/stdout */
  	bool		from;			/* true = FROM, false = TO */
  };
***************
*** 191,201 **** parse_slash_copy(const char *args)
  	else
  		goto error;
  
! 	token = strtokx(NULL, whitespace, NULL, "'",
! 					0, false, true, pset.encoding);
  	if (!token)
  		goto error;
  
  	if (pg_strcasecmp(token, "stdin") == 0 ||
  		pg_strcasecmp(token, "stdout") == 0)
  	{
--- 192,216 ----
  	else
  		goto error;
  
! 	token = strtokx(NULL, whitespace, NULL, NULL,
! 					0, false, false, pset.encoding);
  	if (!token)
  		goto error;
  
+ 	if (pg_strcasecmp(token, "program") == 0)
+ 	{
+ 		result->program = true;
+ 		token = strtokx(NULL, whitespace, NULL, "'",
+ 						0, false, true, pset.encoding);
+ 		if (!token)
+ 			goto error;
+ 	}
+ 	else
+ 	{
+ 		result->program = false;
+ 		strip_quotes(token, '\'', 0, pset.encoding);
+ 	}
+ 
  	if (pg_strcasecmp(token, "stdin") == 0 ||
  		pg_strcasecmp(token, "stdout") == 0)
  	{
***************
*** 212,218 **** parse_slash_copy(const char *args)
  	{
  		result->psql_inout = false;
  		result->file = pg_strdup(token);
! 		expand_tilde(&result->file);
  	}
  
  	/* Collect the rest of the line (COPY options) */
--- 227,234 ----
  	{
  		result->psql_inout = false;
  		result->file = pg_strdup(token);
! 		if (!result->program)
! 			expand_tilde(&result->file);
  	}
  
  	/* Collect the rest of the line (COPY options) */
***************
*** 256,263 **** do_copy(const char *args)
  	if (!options)
  		return false;
  
! 	/* prepare to read or write the target file */
! 	if (options->file)
  		canonicalize_path(options->file);
  
  	if (options->from)
--- 272,285 ----
  	if (!options)
  		return false;
  
! 	if (options->file == NULL && options->program)
! 	{
! 		psql_error("program is not supported to stdout/pstdout or from stdin/pstdin\n");
! 		return false;
! 	}
! 
! 	/* prepare to read or write the target file or command */
! 	if (options->file && !options->program)
  		canonicalize_path(options->file);
  
  	if (options->from)
***************
*** 265,271 **** do_copy(const char *args)
  		override_file = &pset.cur_cmd_source;
  
  		if (options->file)
! 			copystream = fopen(options->file, PG_BINARY_R);
  		else if (!options->psql_inout)
  			copystream = pset.cur_cmd_source;
  		else
--- 287,303 ----
  		override_file = &pset.cur_cmd_source;
  
  		if (options->file)
! 		{
! 			if (options->program)
! 			{
! 				fflush(stdout);
! 				fflush(stderr);
! 				errno = 0;
! 				copystream = popen(options->file, PG_BINARY_R);
! 			}
! 			else
! 				copystream = fopen(options->file, PG_BINARY_R);
! 		}
  		else if (!options->psql_inout)
  			copystream = pset.cur_cmd_source;
  		else
***************
*** 276,282 **** do_copy(const char *args)
  		override_file = &pset.queryFout;
  
  		if (options->file)
! 			copystream = fopen(options->file, PG_BINARY_W);
  		else if (!options->psql_inout)
  			copystream = pset.queryFout;
  		else
--- 308,324 ----
  		override_file = &pset.queryFout;
  
  		if (options->file)
! 		{
! 			if (options->program)
! 			{
! 				fflush(stdout);
! 				fflush(stderr);
! 				errno = 0;
! 				copystream = popen(options->file, PG_BINARY_W);
! 			}
! 			else
! 				copystream = fopen(options->file, PG_BINARY_W);
! 		}
  		else if (!options->psql_inout)
  			copystream = pset.queryFout;
  		else
***************
*** 285,305 **** do_copy(const char *args)
  
  	if (!copystream)
  	{
! 		psql_error("%s: %s\n",
! 				   options->file, strerror(errno));
  		free_copy_options(options);
  		return false;
  	}
  
! 	/* make sure the specified file is not a directory */
! 	fstat(fileno(copystream), &st);
! 	if (S_ISDIR(st.st_mode))
  	{
! 		fclose(copystream);
! 		psql_error("%s: cannot copy from/to a directory\n",
! 				   options->file);
! 		free_copy_options(options);
! 		return false;
  	}
  
  	/* build the command we will send to the backend */
--- 327,354 ----
  
  	if (!copystream)
  	{
! 		if (options->program)
! 			psql_error("could not execute command \"%s\": %s\n",
! 					   options->file, strerror(errno));
! 		else
! 			psql_error("%s: %s\n",
! 					   options->file, strerror(errno));
  		free_copy_options(options);
  		return false;
  	}
  
! 	if (!options->program)
  	{
! 		/* make sure the specified file is not a directory */
! 		fstat(fileno(copystream), &st);
! 		if (S_ISDIR(st.st_mode))
! 		{
! 			fclose(copystream);
! 			psql_error("%s: cannot copy from/to a directory\n",
! 					   options->file);
! 			free_copy_options(options);
! 			return false;
! 		}
  	}
  
  	/* build the command we will send to the backend */
***************
*** 322,331 **** do_copy(const char *args)
  
  	if (options->file != NULL)
  	{
! 		if (fclose(copystream) != 0)
  		{
! 			psql_error("%s: %s\n", options->file, strerror(errno));
! 			success = false;
  		}
  	}
  	free_copy_options(options);
--- 371,392 ----
  
  	if (options->file != NULL)
  	{
! 		if (options->program)
! 		{
! 			if (pclose_check(copystream) == -1)
! 			{
! 				psql_error("could not execute command \"%s\"\n",
! 						   options->file);
! 				success = false;
! 			}
! 		}
! 		else
  		{
! 			if (fclose(copystream) != 0)
! 			{
! 				psql_error("%s: %s\n", options->file, strerror(errno));
! 				success = false;
! 			}
  		}
  	}
  	free_copy_options(options);
*** a/src/bin/psql/stringutils.c
--- b/src/bin/psql/stringutils.c
***************
*** 13,21 ****
  #include "stringutils.h"
  
  
- static void strip_quotes(char *source, char quote, char escape, int encoding);
- 
- 
  /*
   * Replacement for strtok() (a.k.a. poor man's flex)
   *
--- 13,18 ----
***************
*** 239,245 **** strtokx(const char *s,
   *
   * Note that the source string is overwritten in-place.
   */
! static void
  strip_quotes(char *source, char quote, char escape, int encoding)
  {
  	char	   *src;
--- 236,242 ----
   *
   * Note that the source string is overwritten in-place.
   */
! void
  strip_quotes(char *source, char quote, char escape, int encoding)
  {
  	char	   *src;
*** a/src/bin/psql/stringutils.h
--- b/src/bin/psql/stringutils.h
***************
*** 19,24 **** extern char *strtokx(const char *s,
--- 19,26 ----
  		bool del_quotes,
  		int encoding);
  
+ extern void strip_quotes(char *source, char quote, char escape, int encoding);
+ 
  extern char *quote_if_needed(const char *source, const char *entails_quote,
  				char quote, char escape, int encoding);
  
*** a/src/include/commands/copy.h
--- b/src/include/commands/copy.h
***************
*** 25,31 **** extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString);
  
  extern void ProcessCopyOptions(CopyState cstate, bool is_from, List *options);
  extern CopyState BeginCopyFrom(Relation rel, const char *filename,
! 			  List *attnamelist, List *options);
  extern void EndCopyFrom(CopyState cstate);
  extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
  			 Datum *values, bool *nulls, Oid *tupleOid);
--- 25,31 ----
  
  extern void ProcessCopyOptions(CopyState cstate, bool is_from, List *options);
  extern CopyState BeginCopyFrom(Relation rel, const char *filename,
! 			  bool is_program, List *attnamelist, List *options);
  extern void EndCopyFrom(CopyState cstate);
  extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
  			 Datum *values, bool *nulls, Oid *tupleOid);
*** a/src/include/nodes/parsenodes.h
--- b/src/include/nodes/parsenodes.h
***************
*** 1397,1402 **** typedef struct CopyStmt
--- 1397,1403 ----
  	List	   *attlist;		/* List of column names (as Strings), or NIL
  								 * for all columns */
  	bool		is_from;		/* TO or FROM */
+ 	bool		is_program;		/* is 'filename' a program to popen? */
  	char	   *filename;		/* filename, or NULL for STDIN/STDOUT */
  	List	   *options;		/* List of DefElem nodes */
  } CopyStmt;
*** a/src/include/parser/kwlist.h
--- b/src/include/parser/kwlist.h
***************
*** 292,297 **** PG_KEYWORD("prior", PRIOR, UNRESERVED_KEYWORD)
--- 292,298 ----
  PG_KEYWORD("privileges", PRIVILEGES, UNRESERVED_KEYWORD)
  PG_KEYWORD("procedural", PROCEDURAL, UNRESERVED_KEYWORD)
  PG_KEYWORD("procedure", PROCEDURE, UNRESERVED_KEYWORD)
+ PG_KEYWORD("program", PROGRAM, UNRESERVED_KEYWORD)
  PG_KEYWORD("quote", QUOTE, UNRESERVED_KEYWORD)
  PG_KEYWORD("range", RANGE, UNRESERVED_KEYWORD)
  PG_KEYWORD("read", READ, UNRESERVED_KEYWORD)
*** a/src/include/storage/fd.h
--- b/src/include/storage/fd.h
***************
*** 79,84 **** extern char *FilePathName(File file);
--- 79,88 ----
  extern FILE *AllocateFile(const char *name, const char *mode);
  extern int	FreeFile(FILE *file);
  
+ /* Operations that allow use of pipe streams (popen/pclose) */
+ extern FILE *OpenPipeStream(const char *command, const char *mode);
+ extern int	ClosePipeStream(FILE *file);
+ 
  /* Operations to allow use of the <dirent.h> library routines */
  extern DIR *AllocateDir(const char *dirname);
  extern struct dirent *ReadDir(DIR *dir, const char *dirname);
*** a/src/interfaces/ecpg/preproc/ecpg.addons
--- b/src/interfaces/ecpg/preproc/ecpg.addons
***************
*** 192,206 **** ECPG: where_or_current_clauseWHERECURRENT_POFcursor_name block
  		char *cursor_marker = $4[0] == ':' ? mm_strdup("$0") : $4;
  		$$ = cat_str(2,mm_strdup("where current of"), cursor_marker);
  	}
! ECPG: CopyStmtCOPYopt_binaryqualified_nameopt_column_listopt_oidscopy_fromcopy_file_namecopy_delimiteropt_withcopy_options addon
! 			if (strcmp($6, "to") == 0 && strcmp($7, "stdin") == 0)
  				mmerror(PARSE_ERROR, ET_ERROR, "COPY TO STDIN is not possible");
! 			else if (strcmp($6, "from") == 0 && strcmp($7, "stdout") == 0)
  				mmerror(PARSE_ERROR, ET_ERROR, "COPY FROM STDOUT is not possible");
! 			else if (strcmp($6, "from") == 0 && strcmp($7, "stdin") == 0)
  				mmerror(PARSE_ERROR, ET_WARNING, "COPY FROM STDIN is not implemented");
! ECPG: CopyStmtCOPYselect_with_parensTOcopy_file_nameopt_withcopy_options addon
! 			if (strcmp($4, "stdin") == 0)
  				mmerror(PARSE_ERROR, ET_ERROR, "COPY TO STDIN is not possible");
  ECPG: var_valueNumericOnly addon
  		if ($1[0] == '$')
--- 192,214 ----
  		char *cursor_marker = $4[0] == ':' ? mm_strdup("$0") : $4;
  		$$ = cat_str(2,mm_strdup("where current of"), cursor_marker);
  	}
! ECPG: CopyStmtCOPYopt_binaryqualified_nameopt_column_listopt_oidscopy_fromcopy_programcopy_file_namecopy_delimiteropt_withcopy_options addon
! 			if (strcmp($7, "program") == 0 && strcmp($8, "stdin") == 0)
! 				mmerror(PARSE_ERROR, ET_ERROR, "PROGRAM STDIN is not possible");
! 			else if (strcmp($7, "program") == 0 && strcmp($8, "stdout") == 0)
! 				mmerror(PARSE_ERROR, ET_ERROR, "PROGRAM STDOUT is not possible");
! 			else if (strcmp($6, "to") == 0 && strcmp($8, "stdin") == 0)
  				mmerror(PARSE_ERROR, ET_ERROR, "COPY TO STDIN is not possible");
! 			else if (strcmp($6, "from") == 0 && strcmp($8, "stdout") == 0)
  				mmerror(PARSE_ERROR, ET_ERROR, "COPY FROM STDOUT is not possible");
! 			else if (strcmp($6, "from") == 0 && strcmp($8, "stdin") == 0)
  				mmerror(PARSE_ERROR, ET_WARNING, "COPY FROM STDIN is not implemented");
! ECPG: CopyStmtCOPYselect_with_parensTOcopy_programcopy_file_nameopt_withcopy_options addon
! 			if (strcmp($4, "program") == 0 && strcmp($5, "stdin") == 0)
! 				mmerror(PARSE_ERROR, ET_ERROR, "PROGRAM STDIN is not possible");
! 			else if (strcmp($4, "program") == 0 && strcmp($5, "stdout") == 0)
! 				mmerror(PARSE_ERROR, ET_ERROR, "PROGRAM STDOUT is not possible");
! 			else if (strcmp($5, "stdin") == 0)
  				mmerror(PARSE_ERROR, ET_ERROR, "COPY TO STDIN is not possible");
  ECPG: var_valueNumericOnly addon
  		if ($1[0] == '$')