[PATCH 0/4] COPY to a UDF: "COPY ... TO FUNCTION ..."

Started by Daniel Farina over 16 years ago (94 messages, hackers)
#1 Daniel Farina
dfarina@truviso.com

I have extended COPY to support using a UDF as a target instead of the
normal 'file' or STDOUT targets. This dovetails nicely with a couple
of extensions I have also written for dblink to enable direct
cross-node bulk loading and replication. Please peruse the patches
that are In-Reply-To this email for more details; the patches that do
not contain tests also carry human-readable summaries and
explanations.

You can also get these patches from a git repo. They apply on top of
the history tracked by git.postgresql.org:

git fetch http://fdr.lolrus.org/postgresql.git \
copy-to-function:copy-to-function
git checkout copy-to-function

While the functionality exposed in these patches has appeared robust
and efficient to us at Truviso, the code was written with ease of
upstream merging as a central design goal, so I have shied away from
adding more invasive functionality that would make the interface less
byzantine and more usable. This was intended to be the most surgical
cut possible until it seemed likely that the PostgreSQL project would
be interested.

At least one additional datapoint of someone else wanting such
functionality is seen in this thread:

http://archives.postgresql.org/pgsql-hackers/2009-08/msg00428.php

Open Issues:

* setup/copy/teardown and error handling: as-is it is unhealthily
tempting to use a global variable (as seen in the dblink patches)
to track state between setup/copy/cleanup steps. I'm not sure what
the right aesthetic is to make this a little more controlled than
calling specific functions in exactly the right order.

* 'transition state': similar to an aggregate, it may make sense for
the target of TO FUNCTION to have a context in which it can stash
state, or at least have access to a few constant parameters as it
accepts records. If such functionality existed one might be able
to conveniently rewrite the current COPY ... TO (STDOUT|'file')
behavior to be syntactic sugar for TO FUNCTION behavior, which is
somewhat aesthetically pleasing to me.

* It might be interesting to increase the symmetry of this operation
by allowing COPY to bulk load using UDFs as sources. With that in
mind, the design of the interfaces may change...or not.
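The "transition state" question above can be explored outside the backend. The hypothetical, self-contained C sketch below is not part of these patches; `CopySink`, `copy_rows`, `MemBuf`, `file_sink_write`, and `mem_sink_write` are invented names. It pairs each per-row callback with an opaque state pointer, the way an aggregate carries its transition value, so no global variable is needed between setup, copy, and teardown, and writing to STDOUT becomes just one more sink.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical sink: a per-row callback plus private state, analogous to an
 * aggregate's transition value.  Not an existing PostgreSQL interface.
 */
typedef struct CopySink
{
	void	   *state;
	void		(*write_row) (void *state, const char *data, size_t len);
} CopySink;

/* "TO STDOUT" (or a file) expressed as a sink whose state is a FILE *. */
static void
file_sink_write(void *state, const char *data, size_t len)
{
	fwrite(data, 1, len, (FILE *) state);
}

/* Growable, NUL-terminated byte buffer used as another sink's state. */
typedef struct MemBuf
{
	char	   *data;
	size_t		len;
	size_t		cap;
} MemBuf;

/* Sink that appends each row to a MemBuf, like the regression-test UDF. */
static void
mem_sink_write(void *state, const char *data, size_t len)
{
	MemBuf	   *buf = (MemBuf *) state;
	size_t		need = buf->len + len + 1;	/* +1 for the terminator */

	if (need > buf->cap)
	{
		size_t		newcap = buf->cap ? buf->cap : 64;
		char	   *p;

		while (newcap < need)
			newcap *= 2;
		p = realloc(buf->data, newcap);
		if (p == NULL)
			abort();
		buf->data = p;
		buf->cap = newcap;
	}
	memcpy(buf->data + buf->len, data, len);
	buf->len += len;
	buf->data[buf->len] = '\0';
}

/* The COPY driver sees only the sink; all per-COPY state lives in it. */
static void
copy_rows(const CopySink *sink, const char *const *rows, size_t nrows)
{
	assert(sink != NULL && sink->write_row != NULL);
	for (size_t i = 0; i < nrows; i++)
		sink->write_row(sink->state, rows[i], strlen(rows[i]));
}
```

Pointing `copy_rows` at a sink whose state is `stdout` reproduces TO STDOUT, while a `MemBuf` sink collects rows in memory. How such state would actually reach a UDF through fmgr is left open here, as it is in the question above.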

This work is released under the BSD license as used by the
PostgreSQL project. The copyright owner is Truviso, Inc. (2009).

Daniel Farina (4):
Add "COPY ... TO FUNCTION ..." support
Add tests for "COPY ... TO FUNCTION ..."
Add dblink functions for use with COPY ... TO FUNCTION ...
Add tests to dblink covering use of COPY TO FUNCTION

contrib/dblink/dblink.c | 190 ++++++++++++++++++++++++
contrib/dblink/dblink.h | 5 +
contrib/dblink/dblink.sql.in | 20 +++
contrib/dblink/expected/dblink.out | 272 +++++++++++++++++++++++++++++++++++
contrib/dblink/sql/dblink.sql | 112 ++++++++++++++
contrib/dblink/uninstall_dblink.sql | 8 +
src/backend/catalog/namespace.c | 21 +++
src/backend/commands/copy.c | 190 +++++++++++++++++++++----
src/backend/executor/spi.c | 2 +-
src/backend/nodes/copyfuncs.c | 2 +-
src/backend/nodes/equalfuncs.c | 2 +-
src/backend/parser/gram.y | 30 +++--
src/include/catalog/namespace.h | 1 +
src/include/nodes/parsenodes.h | 3 +-
src/test/regress/input/copy.source | 38 +++++
src/test/regress/output/copy.source | 69 +++++++++
src/test/regress/regress.c | 56 +++++++
17 files changed, 982 insertions(+), 39 deletions(-)

#2 Daniel Farina
dfarina@truviso.com
In reply to: Daniel Farina (#1)
[PATCH 1/4] Add "COPY ... TO FUNCTION ..." support

This relatively small change enables all sorts of new and shiny evil by
allowing COPY to be given a function that accepts each produced row's
content in turn. The function must accept a single INTERNAL-type
argument, which is actually of type StringInfo.
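To make that calling contract concrete outside the backend, here is a hypothetical, self-contained model; `MockStringInfo`, `copy_to_function`, and `append_sink` are invented stand-ins for `StringInfoData`, the COPY_FN branch of `CopySendEndOfRow`, and a user-supplied function. Each row is formatted into one reused buffer, passed to the function, and then reset (the patch calls `resetStringInfo`), so a target that wants to keep the data has to copy it rather than hold on to the pointer.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Stand-in for PostgreSQL's StringInfoData: data, len, maxlen. */
typedef struct MockStringInfo
{
	char	   *data;
	int			len;
	int			maxlen;
} MockStringInfo;

/* Append n bytes, growing the buffer and keeping it NUL-terminated. */
static void
mock_append(MockStringInfo *buf, const char *s, int n)
{
	if (buf->len + n + 1 > buf->maxlen)
	{
		int			newmax = buf->maxlen > 0 ? buf->maxlen : 64;
		char	   *p;

		while (newmax < buf->len + n + 1)
			newmax *= 2;
		p = realloc(buf->data, (size_t) newmax);
		if (p == NULL)
			abort();
		buf->data = p;
		buf->maxlen = newmax;
	}
	memcpy(buf->data + buf->len, s, (size_t) n);
	buf->len += n;
	buf->data[buf->len] = '\0';
}

/* Shape of a COPY target: it is handed the whole row, terminator included. */
typedef void (*copy_target_fn) (MockStringInfo *row);

/*
 * Model of the COPY_FN path: format a row into the shared buffer, call the
 * target, then reset the buffer for the next row.  '\n' is assumed as the
 * terminator; the patch uses the platform's file line ending here.
 */
static void
copy_to_function(const char *const *rows, size_t nrows,
				 copy_target_fn target, MockStringInfo *buf)
{
	for (size_t i = 0; i < nrows; i++)
	{
		assert(buf->len == 0);	/* every row starts from an empty buffer */
		mock_append(buf, rows[i], (int) strlen(rows[i]));
		mock_append(buf, "\n", 1);
		target(buf);
		buf->len = 0;			/* like resetStringInfo() */
		buf->data[0] = '\0';
	}
}

/* A target must copy the bytes out: the buffer is reused for every row. */
static MockStringInfo collected;

static void
append_sink(MockStringInfo *row)
{
	mock_append(&collected, row->data, row->len);
}
```

This mirrors what `copyto_function` in the regression-test patch does when it appends each row to its own buffer.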

Patch highlights:

- Grammar production changes to enable "TO FUNCTION <qualified name>"

- A new enumeration value in CopyDest to indicate this new mode
called COPY_FN.

- CopyStateData's "filename" field has been renamed "destination"
and is now a Node type. Before it was either a string or NULL;
now it may be a RangeVar, a string, or NULL. Some code now has to
go through some minor strVal() unboxing for the regular TO '/file'
behavior.

- Due to the relatively restricted way this function can be called
it was possible to reduce per-row overhead by computing the
FunctionCallInfo once and then reusing it, as opposed to simply
using one of the convenience functions in the fmgr.

- Add and expose the makeNameListFromRangeVar procedure in
src/backend/catalog/namespace.c, the inverse of makeRangeVarFromNameList.
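The per-row saving described in the fourth item can be illustrated without the backend. In this hypothetical sketch, `MockFCInfo`, `MockInvoke`, `RowSlot`, and `direct_call1` only imitate the shape of `FunctionCallInfoData`, `FunctionCallInvoke`, `fe_msgbuf`, and `DirectFunctionCall1`; they are not the fmgr API. The descriptor and its one argument are filled in once, and since the argument points at an object whose address never changes (as `fe_msgbuf` is never rebound), each row costs a single indirect call instead of a fresh descriptor set-up.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef uintptr_t MockDatum;

/* Loosely shaped like FunctionCallInfoData; not the real fmgr struct. */
typedef struct MockFCInfo
{
	MockDatum	(*fn) (struct MockFCInfo *fcinfo);
	short		nargs;
	MockDatum	arg[1];
	bool		argnull[1];
	bool		isnull;
} MockFCInfo;

#define MockInvoke(fcinfo) ((fcinfo)->fn(fcinfo))

/* The object passed by address; only its contents change per row. */
typedef struct RowSlot
{
	const char *row;
} RowSlot;

/* Reused-descriptor style: bind the argument once, then just invoke. */
static void
emit_rows_reused(MockFCInfo *fcinfo, RowSlot *slot,
				 const char *const *rows, size_t nrows)
{
	fcinfo->nargs = 1;
	fcinfo->arg[0] = (MockDatum) slot;
	fcinfo->argnull[0] = false;

	for (size_t i = 0; i < nrows; i++)
	{
		slot->row = rows[i];
		fcinfo->isnull = false;
		(void) MockInvoke(fcinfo);
		/* Same invariants the patch Asserts after each call */
		assert(fcinfo->arg[0] == (MockDatum) slot);
		assert(!fcinfo->argnull[0]);
	}
}

/* Convenience style, like DirectFunctionCall1: rebuild the descriptor. */
static MockDatum
direct_call1(MockDatum (*fn) (MockFCInfo *), MockDatum arg)
{
	MockFCInfo	fcinfo;

	memset(&fcinfo, 0, sizeof fcinfo);
	fcinfo.fn = fn;
	fcinfo.nargs = 1;
	fcinfo.arg[0] = arg;
	return fn(&fcinfo);
}

/* Example target: counts the bytes of every row it receives. */
static size_t bytes_seen = 0;

static MockDatum
count_bytes(MockFCInfo *fcinfo)
{
	const RowSlot *slot = (const RowSlot *) fcinfo->arg[0];

	bytes_seen += strlen(slot->row);
	return (MockDatum) 0;
}
```

Both styles reach the same target; the reused descriptor simply skips the per-call memset and field assignments.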

Signed-off-by: Daniel Farina <dfarina@truviso.com>
---
src/backend/catalog/namespace.c | 21 +++++
src/backend/commands/copy.c | 190 +++++++++++++++++++++++++++++++++-----
src/backend/executor/spi.c | 2 +-
src/backend/nodes/copyfuncs.c | 2 +-
src/backend/nodes/equalfuncs.c | 2 +-
src/backend/parser/gram.y | 30 ++++--
src/include/catalog/namespace.h | 1 +
src/include/nodes/parsenodes.h | 3 +-
8 files changed, 212 insertions(+), 39 deletions(-)

diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index 99c9140..8911e29 100644
--- a/src/backend/catalog/namespace.c
+++ b/src/backend/catalog/namespace.c
@@ -2467,6 +2467,27 @@ QualifiedNameGetCreationNamespace(List *names, char **objname_p)
 }
 /*
+ * makeNameListFromRangeVar
+ *		Utility routine to convert a RangeVar into qualified-name list form.
+ */
+List *
+makeNameListFromRangeVar(RangeVar *rangevar)
+{
+	List *names = NIL;
+
+	Assert(rangevar->relname != NULL);
+	names = lcons(makeString(rangevar->relname), names);
+
+	if (rangevar->schemaname != NULL)
+		names = lcons(makeString(rangevar->schemaname), names);
+
+	if (rangevar->catalogname != NULL)
+		names = lcons(makeString(rangevar->catalogname), names);
+
+	return names;
+}
+
+/*
  * makeRangeVarFromNameList
  *		Utility routine to convert a qualified-name list into RangeVar form.
  */
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 5e95fd7..985505a 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -33,6 +33,7 @@
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "optimizer/planner.h"
+#include "parser/parse_func.h"
 #include "parser/parse_relation.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
@@ -55,7 +56,8 @@ typedef enum CopyDest
 {
 	COPY_FILE,					/* to/from file */
 	COPY_OLD_FE,				/* to/from frontend (2.0 protocol) */
-	COPY_NEW_FE					/* to/from frontend (3.0 protocol) */
+	COPY_NEW_FE,				/* to/from frontend (3.0 protocol) */
+	COPY_FN						/* to function */
 } CopyDest;
 /*
@@ -104,7 +106,8 @@ typedef struct CopyStateData
 	Relation	rel;			/* relation to copy to or from */
 	QueryDesc  *queryDesc;		/* executable query to copy from */
 	List	   *attnumlist;		/* integer list of attnums to copy */
-	char	   *filename;		/* filename, or NULL for STDIN/STDOUT */
+	Node	   *destination;	/* filename, or NULL for STDIN/STDOUT, or a
+								 * function */
 	bool		binary;			/* binary format? */
 	bool		oids;			/* include OIDs? */
 	bool		csv_mode;		/* Comma Separated Value format? */
@@ -131,6 +134,13 @@ typedef struct CopyStateData
 	MemoryContext rowcontext;	/* per-row evaluation context */
 	/*
+	 * For writing rows out to a function. Used if copy_dest == COPY_FN
+	 *
+	 * Avoids repeated use of DirectFunctionCall for efficiency.
+	 */
+	FunctionCallInfoData	output_fcinfo;
+
+	/*
 	 * These variables are used to reduce overhead in textual COPY FROM.
 	 *
 	 * attribute_buf holds the separated, de-escaped text for each field of
@@ -425,9 +435,11 @@ CopySendEndOfRow(CopyState cstate)
 {
 	StringInfo	fe_msgbuf = cstate->fe_msgbuf;
+	/* First, append the row delimiter appropriate to the destination */
 	switch (cstate->copy_dest)
 	{
 		case COPY_FILE:
+		case COPY_FN:
 			if (!cstate->binary)
 			{
 				/* Default line termination depends on platform */
@@ -437,6 +449,18 @@ CopySendEndOfRow(CopyState cstate)
 				CopySendString(cstate, "\r\n");
 #endif
 			}
+			break;
+		case COPY_NEW_FE:
+		case COPY_OLD_FE:
+			/* The FE/BE protocol uses \n as newline for all platforms */
+			if (!cstate->binary)
+				CopySendChar(cstate, '\n');
+			break;
+	}
+
+	switch (cstate->copy_dest)
+	{
+		case COPY_FILE:
 			(void) fwrite(fe_msgbuf->data, fe_msgbuf->len,
 						  1, cstate->copy_file);
@@ -446,10 +470,6 @@ CopySendEndOfRow(CopyState cstate)
 						 errmsg("could not write to COPY file: %m")));
 			break;
 		case COPY_OLD_FE:
-			/* The FE/BE protocol uses \n as newline for all platforms */
-			if (!cstate->binary)
-				CopySendChar(cstate, '\n');
-
 			if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
 			{
 				/* no hope of recovering connection sync, so FATAL */
@@ -459,13 +479,19 @@ CopySendEndOfRow(CopyState cstate)
 			}
 			break;
 		case COPY_NEW_FE:
-			/* The FE/BE protocol uses \n as newline for all platforms */
-			if (!cstate->binary)
-				CopySendChar(cstate, '\n');
-
 			/* Dump the accumulated row as one CopyData message */
 			(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
 			break;
+		case COPY_FN:
+			FunctionCallInvoke(&cstate->output_fcinfo);
+
+			/*
+			 * These are set earlier and are not supposed to change row to row.
+			 */
+			Assert(cstate->output_fcinfo.arg[0] ==
+				   PointerGetDatum(cstate->fe_msgbuf));
+			Assert(!cstate->output_fcinfo.argnull[0]);
+			break;
 	}
 	resetStringInfo(fe_msgbuf);
@@ -577,6 +603,12 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
 				bytesread += avail;
 			}
 			break;
+		case COPY_FN:
+			/*
+			 * Should be disallowed by some prior step
+			 */
+			Assert(false);
+			break;
 	}
 	return bytesread;
@@ -719,7 +751,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
 {
 	CopyState	cstate;
 	bool		is_from = stmt->is_from;
-	bool		pipe = (stmt->filename == NULL);
+	bool		pipe = (stmt->destination == NULL);
 	List	   *attnamelist = stmt->attlist;
 	List	   *force_quote = NIL;
 	List	   *force_notnull = NIL;
@@ -986,6 +1018,14 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
 				 errhint("Anyone can COPY to stdout or from stdin. "
 						 "psql's \\copy command also works for anyone.")));
+	/* Disallow COPY ... FROM FUNCTION (only TO FUNCTION supported) */
+	if (is_from && stmt->destination != NULL &&
+		IsA(stmt->destination, RangeVar))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("COPY FROM does not support functions as sources")));
+
+
 	if (stmt->relation)
 	{
 		Assert(!stmt->query);
@@ -1183,7 +1223,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
 	cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding);
 	cstate->copy_dest = COPY_FILE;		/* default */
-	cstate->filename = stmt->filename;
+	cstate->destination = stmt->destination;
 	if (is_from)
 		CopyFrom(cstate);		/* copy from file to database */
@@ -1225,7 +1265,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
 static void
 DoCopyTo(CopyState cstate)
 {
-	bool		pipe = (cstate->filename == NULL);
+	bool		pipe = (cstate->destination == NULL);
 	if (cstate->rel)
 	{
@@ -1257,37 +1297,128 @@ DoCopyTo(CopyState cstate)
 		else
 			cstate->copy_file = stdout;
 	}
-	else
+	else if (IsA(cstate->destination, String))
 	{
 		mode_t		oumask;		/* Pre-existing umask value */
 		struct stat st;
+		char	   *dest_filename = strVal(cstate->destination);
 		/*
 		 * Prevent write to relative path ... too easy to shoot oneself in the
 		 * foot by overwriting a database file ...
 		 */
-		if (!is_absolute_path(cstate->filename))
+		if (!is_absolute_path(dest_filename))
 			ereport(ERROR,
 					(errcode(ERRCODE_INVALID_NAME),
 					 errmsg("relative path not allowed for COPY to file")));
 		oumask = umask((mode_t) 022);
-		cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
+		cstate->copy_file = AllocateFile(dest_filename, PG_BINARY_W);
 		umask(oumask);

 		if (cstate->copy_file == NULL)
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not open file \"%s\" for writing: %m",
-							cstate->filename)));
+							dest_filename)));

 		fstat(fileno(cstate->copy_file), &st);
 		if (S_ISDIR(st.st_mode))
 			ereport(ERROR,
 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-					 errmsg("\"%s\" is a directory", cstate->filename)));
+					 errmsg("\"%s\" is a directory", dest_filename)));
 	}
+	/* Branch taken in the "COPY ... TO FUNCTION funcname" situation */
+	else if (IsA(cstate->destination, RangeVar))
+	{
+		List			*names;
+		FmgrInfo		*flinfo;
+		FuncDetailCode	 fdresult;
+		Oid				 funcid;
+		Oid				 rettype;
+		bool			 retset;
+		int				 nvargs;
+		Oid				*true_typeids;
+		const int		 nargs		= 1;
+		Oid				 argtypes[]	= { INTERNALOID };
+
+		/* Flip copy-action dispatch flag */
+		cstate->copy_dest = COPY_FN;
+
+		/* Make an fcinfo that can be reused and is stored on the cstate. */
+		names = makeNameListFromRangeVar((RangeVar *) cstate->destination);
+		flinfo  = palloc0(sizeof *flinfo);
+
+
+		fdresult = func_get_detail(names, NIL, NIL, nargs, argtypes, false,
+								   false,
+
+								   /* Begin out-arguments */
+								   &funcid, &rettype, &retset, &nvargs,
+								   &true_typeids, NULL);
+
+		/*
+		 * Check to ensure that this is a "normal" function when looked up,
+		 * otherwise error.
+		 */
+		switch (fdresult)
+		{
+			/* Normal function found; do nothing */
+			case FUNCDETAIL_NORMAL:
+				break;
+
+			case FUNCDETAIL_NOTFOUND:
+				ereport(ERROR,
+						(errcode(ERRCODE_UNDEFINED_FUNCTION),
+						 errmsg("function %s does not exist",
+								func_signature_string(names, nargs, NIL,
+													  argtypes))));
+				break;
+
+			case FUNCDETAIL_AGGREGATE:
+				ereport(ERROR,
+						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+						 errmsg("function %s must not be an aggregate",
+								func_signature_string(names, nargs, NIL,
+													  argtypes))));
+				break;
+
+			case FUNCDETAIL_WINDOWFUNC:
+				ereport(ERROR,
+						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+						 errmsg("function %s must not be a window function",
+								func_signature_string(names, nargs, NIL,
+													  argtypes))));
+				break;
+
+			case FUNCDETAIL_COERCION:
+				/*
+				 * Should never be yielded from func_get_detail if it is passed
+				 * fargs == NIL, as it is above.
+				 */
+				Assert(false);
+				break;
+
+			case FUNCDETAIL_MULTIPLE:
+				/*
+				 * Only support one signature, thus overloading of a name with
+				 * different types should never occur.
+				 */
+				Assert(false);
+				break;
+
+		}
+
+		fmgr_info(funcid, flinfo);
+		InitFunctionCallInfoData(cstate->output_fcinfo, flinfo,
+								 1, NULL, NULL);
+	}
+	else
+		/* Unexpected type was found for cstate->destination. */
+		Assert(false);
+
+
 	PG_TRY();
 	{
 		if (cstate->fe_copy)
@@ -1310,13 +1441,13 @@ DoCopyTo(CopyState cstate)
 	}
 	PG_END_TRY();
-	if (!pipe)
+	if (!pipe && cstate->copy_dest != COPY_FN)
 	{
 		if (FreeFile(cstate->copy_file))
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not write to file \"%s\": %m",
-							cstate->filename)));
+							strVal(cstate->destination))));
 	}
 }

@@ -1342,6 +1473,13 @@ CopyTo(CopyState cstate)
 	/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
 	cstate->fe_msgbuf = makeStringInfo();

+	/*
+	 * fe_msgbuf is never rebound, so there is only a need to set up the
+	 * output_fcinfo once.
+	 */
+	cstate->output_fcinfo.arg[0] = PointerGetDatum(cstate->fe_msgbuf);
+	cstate->output_fcinfo.argnull[0] = false;
+
 	/* Get info about the columns we need to process. */
 	cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
 	foreach(cur, cstate->attnumlist)
@@ -1668,7 +1806,7 @@ limit_printout_length(const char *str)
 static void
 CopyFrom(CopyState cstate)
 {
-	bool		pipe = (cstate->filename == NULL);
+	bool		pipe = (cstate->destination == NULL);
 	HeapTuple	tuple;
 	TupleDesc	tupDesc;
 	Form_pg_attribute *attr;
@@ -1768,19 +1906,21 @@ CopyFrom(CopyState cstate)
 	{
 		struct stat st;
-		cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+		cstate->copy_file = AllocateFile(strVal(cstate->destination),
+										 PG_BINARY_R);

 		if (cstate->copy_file == NULL)
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not open file \"%s\" for reading: %m",
-							cstate->filename)));
+							strVal(cstate->destination))));

 		fstat(fileno(cstate->copy_file), &st);
 		if (S_ISDIR(st.st_mode))
 			ereport(ERROR,
 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-					 errmsg("\"%s\" is a directory", cstate->filename)));
+					 errmsg("\"%s\" is a directory",
+							strVal(cstate->destination))));
 	}

 	tupDesc = RelationGetDescr(cstate->rel);
@@ -2215,7 +2355,7 @@ CopyFrom(CopyState cstate)
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not read from file \"%s\": %m",
-							cstate->filename)));
+							strVal(cstate->destination))));
 	}

 	/*
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index f045f9c..0914dc9 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -1829,7 +1829,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				{
 					CopyStmt   *cstmt = (CopyStmt *) stmt;
-					if (cstmt->filename == NULL)
+					if (cstmt->destination == NULL)
 					{
 						my_res = SPI_ERROR_COPY;
 						goto fail;
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 8bc72d1..9b39abe 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -2485,7 +2485,7 @@ _copyCopyStmt(CopyStmt *from)
 	COPY_NODE_FIELD(query);
 	COPY_NODE_FIELD(attlist);
 	COPY_SCALAR_FIELD(is_from);
-	COPY_STRING_FIELD(filename);
+	COPY_NODE_FIELD(destination);
 	COPY_NODE_FIELD(options);
 	return newnode;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 3d65d8b..6ddf226 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1085,7 +1085,7 @@ _equalCopyStmt(CopyStmt *a, CopyStmt *b)
 	COMPARE_NODE_FIELD(query);
 	COMPARE_NODE_FIELD(attlist);
 	COMPARE_SCALAR_FIELD(is_from);
-	COMPARE_STRING_FIELD(filename);
+	COMPARE_NODE_FIELD(destination);
 	COMPARE_NODE_FIELD(options);
 	return true;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 130e6f4..23331ee 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -251,8 +251,7 @@ static TypeName *TableFuncTypeName(List *columns);
 %type <value>	TriggerFuncArg
 %type <node>	TriggerWhen
-%type <str>		copy_file_name
-				database_name access_method_clause access_method attr_name
+%type <str>		database_name access_method_clause access_method attr_name
 				index_name name cursor_name file_name cluster_index_specification

 %type <list>	func_name handler_name qual_Op qual_all_Op subquery_Op
@@ -433,6 +432,8 @@ static TypeName *TableFuncTypeName(List *columns);
 %type <ival>	opt_frame_clause frame_extent frame_bound

+%type <node> copy_file_or_function_name
+
 /*
  * Non-keyword token types. These are hard-wired into the "flex" lexer.
  * They must be listed first so that their numeric codes do not depend on
@@ -1977,14 +1978,15 @@ ClosePortalStmt:
  *****************************************************************************/

 CopyStmt:	COPY opt_binary qualified_name opt_column_list opt_oids
-			copy_from copy_file_name copy_delimiter opt_with copy_options
+			copy_from copy_file_or_function_name copy_delimiter opt_with
+			copy_options
 				{
 					CopyStmt *n = makeNode(CopyStmt);
 					n->relation = $3;
 					n->query = NULL;
 					n->attlist = $4;
 					n->is_from = $6;
-					n->filename = $7;
+					n->destination = $7;
 					n->options = NIL;
 					/* Concatenate user-supplied flags */
@@ -1998,14 +2000,15 @@ CopyStmt:	COPY opt_binary qualified_name opt_column_list opt_oids
 						n->options = list_concat(n->options, $10);
 					$$ = (Node *)n;
 				}
-			| COPY select_with_parens TO copy_file_name opt_with copy_options
+			| COPY select_with_parens TO copy_file_or_function_name
+			  opt_with copy_options
 				{
 					CopyStmt *n = makeNode(CopyStmt);
 					n->relation = NULL;
 					n->query = $2;
 					n->attlist = NIL;
 					n->is_from = false;
-					n->filename = $4;
+					n->destination = $4;
 					n->options = $6;
 					$$ = (Node *)n;
 				}
@@ -2021,10 +2024,17 @@ copy_from:
  * used depends on the direction. (It really doesn't make sense to copy from
  * stdout. We silently correct the "typo".)		 - AY 9/94
  */
-copy_file_name:
-			Sconst									{ $$ = $1; }
-			| STDIN									{ $$ = NULL; }
-			| STDOUT								{ $$ = NULL; }
+copy_file_or_function_name:
+			Sconst							{ $$ = (Node *) makeString($1); }
+
+			/*
+			 * Note that func_name is not used here because there is no need to
+			 * accept the "funcname(TYPES)" construction, as there is only one
+			 * valid signature.
+			 */
+			| FUNCTION qualified_name		{ $$ = (Node *) $2; }
+			| STDIN							{ $$ = NULL; }
+			| STDOUT						{ $$ = NULL; }
 		;
 copy_options: copy_opt_list							{ $$ = $1; }
diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h
index d356635..1d801cd 100644
--- a/src/include/catalog/namespace.h
+++ b/src/include/catalog/namespace.h
@@ -94,6 +94,7 @@ extern Oid	LookupExplicitNamespace(const char *nspname);
 extern Oid	LookupCreationNamespace(const char *nspname);
 extern Oid	QualifiedNameGetCreationNamespace(List *names, char **objname_p);
+extern List *makeNameListFromRangeVar(RangeVar *rangevar);
 extern RangeVar *makeRangeVarFromNameList(List *names);
 extern char *NameListToString(List *names);
 extern char *NameListToQuotedString(List *names);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index b34300f..203088c 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1293,7 +1293,8 @@ typedef struct CopyStmt
 	List	   *attlist;		/* List of column names (as Strings), or NIL
 								 * for all columns */
 	bool		is_from;		/* TO or FROM */
-	char	   *filename;		/* filename, or NULL for STDIN/STDOUT */
+	Node	   *destination;	/* filename, or NULL for STDIN/STDOUT, or a
+								 * function */
 	List	   *options;		/* List of DefElem nodes */
 } CopyStmt;

--
1.6.5.3

#3 Daniel Farina
dfarina@truviso.com
In reply to: Daniel Farina (#1)
[PATCH 2/4] Add tests for "COPY ... TO FUNCTION ..."

Signed-off-by: Daniel Farina <dfarina@truviso.com>
---
src/test/regress/input/copy.source | 38 +++++++++++++++++++
src/test/regress/output/copy.source | 69 +++++++++++++++++++++++++++++++++++
src/test/regress/regress.c | 56 ++++++++++++++++++++++++++++
3 files changed, 163 insertions(+), 0 deletions(-)

diff --git a/src/test/regress/input/copy.source b/src/test/regress/input/copy.source
index 376329d..e5dcd62 100644
--- a/src/test/regress/input/copy.source
+++ b/src/test/regress/input/copy.source
@@ -107,3 +107,41 @@ this is just a line full of junk that would error out if parsed

 copy copytest3 to stdout csv header;

+
+-- test copy to function
+
+CREATE FUNCTION copyto_setup_state ()
+        RETURNS void
+        AS '@libdir@/regress@DLSUFFIX@'
+        LANGUAGE C;
+
+CREATE FUNCTION copyto_function (internal)
+        RETURNS void
+        AS '@libdir@/regress@DLSUFFIX@'
+        LANGUAGE C;
+
+CREATE FUNCTION copyto_yield_len ()
+        RETURNS int4
+        AS '@libdir@/regress@DLSUFFIX@'
+        LANGUAGE C;
+
+CREATE FUNCTION copyto_yield_text ()
+        RETURNS text
+        AS '@libdir@/regress@DLSUFFIX@'
+        LANGUAGE C;
+
+CREATE FUNCTION copyto_free ()
+        RETURNS void
+        AS '@libdir@/regress@DLSUFFIX@'
+        LANGUAGE C;
+
+select copyto_setup_state();
+copy copytest to function copyto_function;
+select copyto_yield_len();
+select copyto_yield_text();
+select copyto_free();
+
+select copyto_setup_state();
+copy binary copytest to function copyto_function;
+select copyto_yield_len();
+select copyto_free();
diff --git a/src/test/regress/output/copy.source b/src/test/regress/output/copy.source
index 5a88d6e..74ea935 100644
--- a/src/test/regress/output/copy.source
+++ b/src/test/regress/output/copy.source
@@ -71,3 +71,72 @@ copy copytest3 to stdout csv header;
 c1,"col with , comma","col with "" quote"
 1,a,1
 2,b,2
+-- test copy to function
+CREATE FUNCTION copyto_setup_state ()
+        RETURNS void
+        AS '@libdir@/regress@DLSUFFIX@'
+        LANGUAGE C;
+CREATE FUNCTION copyto_function (internal)
+        RETURNS void
+        AS '@libdir@/regress@DLSUFFIX@'
+        LANGUAGE C;
+CREATE FUNCTION copyto_yield_len ()
+        RETURNS int4
+        AS '@libdir@/regress@DLSUFFIX@'
+        LANGUAGE C;
+CREATE FUNCTION copyto_yield_text ()
+        RETURNS text
+        AS '@libdir@/regress@DLSUFFIX@'
+        LANGUAGE C;
+CREATE FUNCTION copyto_free ()
+        RETURNS void
+        AS '@libdir@/regress@DLSUFFIX@'
+        LANGUAGE C;
+select copyto_setup_state();
+ copyto_setup_state 
+--------------------
+ 
+(1 row)
+
+copy copytest to function copyto_function;
+select copyto_yield_len();
+ copyto_yield_len 
+------------------
+               76
+(1 row)
+
+select copyto_yield_text();
+             copyto_yield_text             
+-------------------------------------------
+ DOS     abc\r\ndef      1                +
+ Unix    abc\ndef        2                +
+ Mac     abc\rdef        3                +
+ esc\\ape        a\\r\\\r\\\n\\nb        4+
+ 
+(1 row)
+
+select copyto_free();
+ copyto_free 
+-------------
+ 
+(1 row)
+
+select copyto_setup_state();
+ copyto_setup_state 
+--------------------
+ 
+(1 row)
+
+copy binary copytest to function copyto_function;
+select copyto_yield_len();
+ copyto_yield_len 
+------------------
+              142
+(1 row)
+
+select copyto_free();
+ copyto_free 
+-------------
+ 
+(1 row)
+
diff --git a/src/test/regress/regress.c b/src/test/regress/regress.c
index 0e94e68..a96a085 100644
--- a/src/test/regress/regress.c
+++ b/src/test/regress/regress.c
@@ -16,6 +16,7 @@
 #include "executor/spi.h"
 #include "utils/builtins.h"
 #include "utils/geo_decls.h"
+#include "utils/memutils.h"
 #define P_MAXDIG 12
@@ -34,6 +35,11 @@ extern char *reverse_name(char *string);
 extern int	oldstyle_length(int n, text *t);
 extern Datum int44in(PG_FUNCTION_ARGS);
 extern Datum int44out(PG_FUNCTION_ARGS);
+extern Datum copyto_free(PG_FUNCTION_ARGS);
+extern Datum copyto_function(PG_FUNCTION_ARGS);
+extern Datum copyto_setup_state(PG_FUNCTION_ARGS);
+extern Datum copyto_yield_len(PG_FUNCTION_ARGS);
+extern Datum copyto_yield_text(PG_FUNCTION_ARGS);
 #ifdef PG_MODULE_MAGIC
 PG_MODULE_MAGIC;
@@ -737,3 +743,53 @@ int44out(PG_FUNCTION_ARGS)
 	*--walk = '\0';
 	PG_RETURN_CSTRING(result);
 }
+
+/*
+ * copyto testing
+ */
+static StringInfo global_buf;
+
+PG_FUNCTION_INFO_V1(copyto_setup_state);
+
+Datum
+copyto_setup_state(PG_FUNCTION_ARGS)
+{
+	MemoryContext oldcxt = MemoryContextSwitchTo(TopMemoryContext);
+	global_buf = makeStringInfo();
+	MemoryContextSwitchTo(oldcxt);
+	PG_RETURN_VOID();
+}
+
+PG_FUNCTION_INFO_V1(copyto_function);
+
+Datum
+copyto_function(PG_FUNCTION_ARGS)
+{
+	StringInfo copybuf = (void *) PG_GETARG_POINTER(0);
+	appendBinaryStringInfo(global_buf, copybuf->data, copybuf->len);
+	PG_RETURN_VOID();
+}
+
+PG_FUNCTION_INFO_V1(copyto_yield_len);
+
+Datum
+copyto_yield_len(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_INT32(global_buf->len);
+}
+
+PG_FUNCTION_INFO_V1(copyto_yield_text);
+
+Datum
+copyto_yield_text(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_DATUM(DirectFunctionCall1(textin,
+										CStringGetDatum(global_buf->data)));
+}
+
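+PG_FUNCTION_INFO_V1(copyto_free);
+
+Datum
+copyto_free(PG_FUNCTION_ARGS)
+{
+	/* Free the buffer contents as well as the StringInfoData itself */
+	pfree(global_buf->data);
+	pfree(global_buf);
+	global_buf = NULL;
+	PG_RETURN_VOID();
+}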
-- 
1.6.5.3
#4 Daniel Farina
dfarina@truviso.com
In reply to: Daniel Farina (#1)
[PATCH 3/4] Add dblink functions for use with COPY ... TO FUNCTION ...

This patch enables dblink to be used for the purpose of efficient
bulk-loading via COPY and libpq in combination with the COPY TO
FUNCTION patch.

The following functions were added to accomplish this:

dblink_connection_reset: useful when handling errors and one just
wants to restore a connection to a known state, rolling back as many
transactions as necessary.

dblink_copy_end: completes the COPY

dblink_copy_open: puts a connection into the COPY state. Accepts
connection name, relation name, and binary mode flag.

dblink_copy_write: writes a row to the last connection put in the COPY
state by dblink_copy_open.

Generally speaking, code that uses this will look like the following
(presuming a named connection has already been made):

try:
    -- binary flag is false: the local COPYs below produce text format
    SELECT dblink_copy_open('myconn', 'relation_name', false);
    COPY bar TO FUNCTION dblink_copy_write;

    -- since the dblink connection is still in the COPY state, one
    -- can even copy some more data in multiple steps...
    COPY bar_2 TO FUNCTION dblink_copy_write;

    SELECT dblink_copy_end();
finally:
    SELECT dblink_connection_reset('myconn');
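The required call order (open, any number of writes, end, with a reset on the error path) amounts to a small state machine. The hypothetical sketch below models it without libpq; `CopyLink`, `link_open`, `link_write`, `link_end`, and `link_reset` are invented stand-ins for dblink_copy_open, dblink_copy_write, dblink_copy_end, and dblink_connection_reset, and the `current` pointer plays the role of the patch's global `dblink_copy_current`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum LinkState
{
	LINK_IDLE,
	LINK_COPY_IN
} LinkState;

typedef struct CopyLink
{
	LinkState	state;
	size_t		rows_sent;
} CopyLink;

/* Plays the role of dblink_copy_current: the link currently in COPY IN. */
static CopyLink *current = NULL;

/* Like dblink_copy_open: only an idle link may enter COPY IN. */
static bool
link_open(CopyLink *link)
{
	if (link->state != LINK_IDLE)
		return false;
	link->state = LINK_COPY_IN;
	current = link;
	return true;
}

/* Like dblink_copy_write: rows go to whichever link was opened last. */
static bool
link_write(const char *row)
{
	assert(row != NULL);
	if (current == NULL || current->state != LINK_COPY_IN)
		return false;
	current->rows_sent++;
	return true;
}

/* Like dblink_copy_end: finish the COPY and forget the link. */
static bool
link_end(void)
{
	if (current == NULL || current->state != LINK_COPY_IN)
		return false;
	current->state = LINK_IDLE;
	current = NULL;
	return true;
}

/* Like dblink_connection_reset: force the link back to idle. */
static void
link_reset(CopyLink *link)
{
	link->state = LINK_IDLE;
	if (current == link)
		current = NULL;
}
```

Because `current` is shared, a write issued before any open, or after a reset, has nowhere to go; that is the ordering hazard the cover letter lists as an open issue.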

Signed-off-by: Daniel Farina <dfarina@truviso.com>
---
contrib/dblink/dblink.c | 190 +++++++++++++++++++++++++++++++++++
contrib/dblink/dblink.h | 5 +
contrib/dblink/dblink.sql.in | 20 ++++
contrib/dblink/uninstall_dblink.sql | 8 ++
4 files changed, 223 insertions(+), 0 deletions(-)

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 72fdf56..d32aeec 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -1722,6 +1722,196 @@ dblink_get_notify(PG_FUNCTION_ARGS)
  * internal functions
  */
+/*
+ * Attempts to take the connection into a known state by rolling back
+ * transactions.  If unable to restore the connection to a known idle state,
+ * raises an error.
+ */
+PG_FUNCTION_INFO_V1(dblink_connection_reset);
+Datum
+dblink_connection_reset(PG_FUNCTION_ARGS)
+{
+	PGresult	*res	   = NULL;
+	PGconn		*conn	   = NULL;
+	char		*conname   = NULL;
+	remoteConn	*rconn	   = NULL;
+
+	bool		 triedonce = false;
+
+	DBLINK_INIT;
+
+	/* must be text */
+	Assert(PG_NARGS() == 1);
+	DBLINK_GET_NAMED_CONN;
+
+	if (!conn)
+		DBLINK_CONN_NOT_AVAIL;
+
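+	/*
+	 * Loop so that the state is re-checked after the ROLLBACK; if one
+	 * ROLLBACK did not return the connection to idle, give up.
+	 */
+	for (;;)
+	{
+		switch (PQtransactionStatus(conn))
+		{
+			case PQTRANS_IDLE:
+				/* Everything is okay */
+				goto finish;
+			case PQTRANS_ACTIVE:
+			case PQTRANS_INTRANS:
+			case PQTRANS_INERROR:
+				if (triedonce)
+					ereport(ERROR,
+							(errcode(ERRCODE_CONNECTION_FAILURE),
+							 errmsg("%s: could not restore connection to an idle state",
+									PG_FUNCNAME_MACRO)));
+
+				res = PQexec(conn, "ROLLBACK;");
+
+				if (PQresultStatus(res) != PGRES_COMMAND_OK)
+				{
+					PQclear(res);
+					ereport(ERROR,
+							(errcode(ERRCODE_CONNECTION_FAILURE),
+							 errmsg("%s: could not issue ROLLBACK command",
+									PG_FUNCNAME_MACRO)));
+				}
+
+				PQclear(res);
+				triedonce = true;
+				break;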
+			case PQTRANS_UNKNOWN:
+				elog(ERROR, "%s: connection in unknown transaction state",
+					 PG_FUNCNAME_MACRO);
+		}
+	}
+
+finish:
+	PG_RETURN_VOID();
+}
+
+/*
+ * dblink COPY support, procedures and variables
+ */
+static PGconn *dblink_copy_current = NULL;
+
+/*
+ * dblink_copy_open
+ *
+ * Start a COPY into a given relation on the named remote connection.
+ */
+PG_FUNCTION_INFO_V1(dblink_copy_open);
+Datum
+dblink_copy_open(PG_FUNCTION_ARGS)
+{
+	PGresult   *res = NULL;
+	PGconn	   *conn = NULL;
+	char	   *conname = NULL;
+	remoteConn *rconn = NULL;
+
+	const char	*quoted_remote_relname;
+	bool		 isbinary;
+	StringInfoData buf;
+
+	DBLINK_INIT;
+
+	/* must be text,text,bool */
+	Assert(PG_NARGS() == 3);
+	DBLINK_GET_NAMED_CONN;
+
+	if (!conn)
+		DBLINK_CONN_NOT_AVAIL;
+
+	/* Read the procedure arguments into primitive values */
+	quoted_remote_relname = NameListToQuotedString(
+		textToQualifiedNameList(PG_GETARG_TEXT_P(1)));
+	isbinary = PG_GETARG_BOOL(2);
+
+	/*
+	 * Query parameterization only handles values, and identifiers are not
+	 * values, so the query string has to be built by hand.  That is why the
+	 * relation name must be quoted, as done above.  A StringInfo is used
+	 * because a schema-qualified, quoted name can exceed any small
+	 * fixed-size buffer.
+	 */
+	initStringInfo(&buf);
+	appendStringInfo(&buf, "COPY %s FROM STDIN%s;",
+					 quoted_remote_relname,
+					 isbinary ? " WITH BINARY" : "");
+
+	/*
+	 * Run the created query, and check to ensure that PGRES_COPY_IN state has
+	 * been achieved.
+	 */
+	res = PQexec(conn, buf.data);
+	if (!res || PQresultStatus(res) != PGRES_COPY_IN)
+	{
+		PQclear(res);
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not start COPY FROM on remote node")));
+	}
+	PQclear(res);
+	pfree(buf.data);
+
+	/*
+	 * Everything went well; finally bind the global dblink_copy_current to the
+	 * connection ready to accept copy data.
+	 */
+	dblink_copy_current = conn;
+	PG_RETURN_TEXT_P(cstring_to_text("OK"));
+}
+
+/*
+ * dblink_copy_write
+ *
+ * Write the provided StringInfo to the currently open COPY.
+ */
+PG_FUNCTION_INFO_V1(dblink_copy_write);
+Datum
+dblink_copy_write(PG_FUNCTION_ARGS)
+{
+	StringInfo copybuf = (void *) PG_GETARG_POINTER(0);
+
+	if (PQputCopyData(dblink_copy_current, copybuf->data, copybuf->len) != 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_EXCEPTION),
+				 errmsg("could not send row to remote node")));
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * dblink_copy_end
+ *
+ * Finish the currently open COPY.
+ */
+PG_FUNCTION_INFO_V1(dblink_copy_end);
+Datum
+dblink_copy_end(PG_FUNCTION_ARGS)
+{
+	PGresult   *res;
+
+	/* Check to ensure that termination data was sent successfully */
+	if (PQputCopyEnd(dblink_copy_current, NULL) != 1)
+		elog(ERROR, "COPY end failed");
+
+	do
+	{
+		res = PQgetResult(dblink_copy_current);
+		if (res == NULL)
+			break;
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			elog(ERROR, "COPY failed: %s",
+				 PQerrorMessage(dblink_copy_current));
+		PQclear(res);
+	} while (true);
+
+	dblink_copy_current = NULL;
+	PG_RETURN_TEXT_P(cstring_to_text("OK"));
+}
 /*
  * get_pkey_attnames
diff --git a/contrib/dblink/dblink.h b/contrib/dblink/dblink.h
index 255f5d0..8a2faee 100644
--- a/contrib/dblink/dblink.h
+++ b/contrib/dblink/dblink.h
@@ -59,4 +59,9 @@ extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
 extern Datum dblink_current_query(PG_FUNCTION_ARGS);
 extern Datum dblink_get_notify(PG_FUNCTION_ARGS);
+extern Datum dblink_connection_reset(PG_FUNCTION_ARGS);
+
+extern Datum dblink_copy_open(PG_FUNCTION_ARGS);
+extern Datum dblink_copy_write(PG_FUNCTION_ARGS);
+extern Datum dblink_copy_end(PG_FUNCTION_ARGS);
 #endif   /* DBLINK_H */
diff --git a/contrib/dblink/dblink.sql.in b/contrib/dblink/dblink.sql.in
index da5dd65..aedca34 100644
--- a/contrib/dblink/dblink.sql.in
+++ b/contrib/dblink/dblink.sql.in
@@ -221,3 +221,23 @@ CREATE OR REPLACE FUNCTION dblink_get_notify(
 RETURNS setof record
 AS 'MODULE_PATHNAME', 'dblink_get_notify'
 LANGUAGE C STRICT;
+
+CREATE OR REPLACE FUNCTION dblink_connection_reset (text)
+RETURNS void
+AS 'MODULE_PATHNAME','dblink_connection_reset'
+LANGUAGE C STRICT;
+
+CREATE OR REPLACE FUNCTION dblink_copy_open (text, text, boolean)
+RETURNS text
+AS 'MODULE_PATHNAME','dblink_copy_open'
+LANGUAGE C STRICT;
+
+CREATE OR REPLACE FUNCTION dblink_copy_write (internal)
+RETURNS void
+AS 'MODULE_PATHNAME','dblink_copy_write'
+LANGUAGE C STRICT;
+
+CREATE OR REPLACE FUNCTION dblink_copy_end ()
+RETURNS text
+AS 'MODULE_PATHNAME','dblink_copy_end'
+LANGUAGE C STRICT;
diff --git a/contrib/dblink/uninstall_dblink.sql b/contrib/dblink/uninstall_dblink.sql
index 45cf13c..465beb7 100644
--- a/contrib/dblink/uninstall_dblink.sql
+++ b/contrib/dblink/uninstall_dblink.sql
@@ -11,6 +11,14 @@ DROP FUNCTION dblink_build_sql_delete (text, int2vector, int4, _text);

 DROP FUNCTION dblink_build_sql_insert (text, int2vector, int4, _text, _text);

+DROP FUNCTION dblink_copy_end ();
+
+DROP FUNCTION dblink_copy_open (text, text, boolean);
+
+DROP FUNCTION dblink_copy_write (internal);
+
+DROP FUNCTION dblink_connection_reset (text);
+
 DROP FUNCTION dblink_get_pkey (text);

 DROP TYPE dblink_pkey_results;
--
1.6.5.3
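The comment on buf in dblink_copy_open sizes the buffer as the static text of the COPY statement plus a bounded identifier length. The sketch below is a small worked check of that arithmetic. It assumes the default NAMEDATALEN of 64, the binary variant of the statement, and that identifiers are truncated to NAMEDATALEN - 1 bytes before quote_identifier() runs. The helper names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Assumption: the default build-time value; it is configurable. */
#define NAMEDATALEN 64

/* Identifiers are truncated to NAMEDATALEN - 1 bytes before quoting. */
#define MAX_IDENT_BYTES (NAMEDATALEN - 1)

/* Size of buf in dblink_copy_open: char buf[64 + NAMEDATALEN]. */
#define COPY_BUF_BYTES (64 + NAMEDATALEN)

/* Fixed text of "COPY %s FROM STDIN%s;" when the second %s is " WITH BINARY". */
static size_t copy_stmt_static_bytes(void)
{
    return strlen("COPY ") + strlen(" FROM STDIN") +
           strlen(" WITH BINARY") + strlen(";");
}

/* Bytes snprintf() needs, counting the terminating NUL, for a quoted
 * relation name that is name_bytes long. */
static size_t copy_stmt_needed(size_t name_bytes)
{
    return copy_stmt_static_bytes() + name_bytes + 1;
}

/* Nonzero if a quoted relation name of name_bytes fits in buf. */
static int copy_stmt_fits(size_t name_bytes)
{
    return copy_stmt_needed(name_bytes) <= COPY_BUF_BYTES;
}
```

The regression test uses a 63-character all-digit name. quote_identifier() must quote that name, which adds two bytes and gives 65 bytes of name and 95 bytes in total, well within 128.

A schema-qualified name with two unquoted 63-byte parts needs 157 bytes. In that case the existing snprintf() length check in dblink_copy_open raises "could not fit formatted dblink COPY query into buffer" instead of overflowing.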

#5 Daniel Farina
dfarina@truviso.com
In reply to: Daniel Farina (#1)
[PATCH 4/4] Add tests to dblink covering use of COPY TO FUNCTION

Signed-off-by: Daniel Farina <dfarina@truviso.com>
---
contrib/dblink/expected/dblink.out | 272 ++++++++++++++++++++++++++++++++++++
contrib/dblink/sql/dblink.sql | 112 +++++++++++++++
2 files changed, 384 insertions(+), 0 deletions(-)

diff --git a/contrib/dblink/expected/dblink.out b/contrib/dblink/expected/dblink.out
index d39aa45..788b2a3 100644
--- a/contrib/dblink/expected/dblink.out
+++ b/contrib/dblink/expected/dblink.out
@@ -872,6 +872,278 @@ SELECT * from dblink_get_notify();
 -------------+--------+-------
 (0 rows)
+-- test COPY ... TO FUNCTION support
+CREATE SCHEMA dblink_copy_to_function;
+SET search_path = dblink_copy_to_function, public;
+CREATE TABLE xyzzy(f1 int, f2 text, f3 text[], primary key (f1,f2));
+NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "xyzzy_pkey" for table "xyzzy"
+INSERT INTO xyzzy VALUES (0,'a','{"a0","b0","c0"}');
+INSERT INTO xyzzy VALUES (1,'b','{"a1","b1","c1"}');
+INSERT INTO xyzzy VALUES (2,'c','{"a2","b2","c2"}');
+INSERT INTO xyzzy VALUES (3,'d','{"a3","b3","c3"}');
+INSERT INTO xyzzy VALUES (4,'e','{"a4","b4","c4"}');
+INSERT INTO xyzzy VALUES (5,'f','{"a5","b5","c5"}');
+INSERT INTO xyzzy VALUES (6,'g','{"a6","b6","c6"}');
+INSERT INTO xyzzy VALUES (7,'h','{"a7","b7","c7"}');
+INSERT INTO xyzzy VALUES (8,'i','{"a8","b8","c8"}');
+INSERT INTO xyzzy VALUES (9,'j','{"a9","b9","c9"}');
+CREATE TABLE bar(f1 int, f2 text, f3 text[], primary key (f1,f2));
+NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "bar_pkey" for table "bar"
+INSERT INTO bar VALUES (100,'w','{"a100","b100","c100"}');
+INSERT INTO bar VALUES (101,'x','{"a101","b101","c101"}');
+CREATE TABLE baz(f1 int, f2 text, f3 text[], primary key (f1,f2));
+NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "baz_pkey" for table "baz"
+INSERT INTO baz VALUES (102,'y','{"a102","b102","c102"}');
+INSERT INTO baz VALUES (103,'z','{"a103","b103","c103"}');
+CREATE TABLE plugh(f1 int, f2 text, f3 text[], primary key (f1,f2));
+NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "plugh_pkey" for table "plugh"
+INSERT INTO plugh VALUES (104,'u','{"a102","b102","c102"}');
+INSERT INTO plugh VALUES (105,'v','{"a103","b103","c103"}');
+SELECT dblink_connect('copytofunction','dbname=contrib_regression');
+ dblink_connect 
+----------------
+ OK
+(1 row)
+
+SELECT dblink_exec('copytofunction',
+       'SET search_path = dblink_copy_to_function, public;');
+ dblink_exec 
+-------------
+ SET
+(1 row)
+
+-- ensure that original base data is present
+SELECT *
+FROM dblink('copytofunction', 'SELECT * FROM xyzzy') AS t(a int, b text, c text[]);
+ a | b |     c      
+---+---+------------
+ 0 | a | {a0,b0,c0}
+ 1 | b | {a1,b1,c1}
+ 2 | c | {a2,b2,c2}
+ 3 | d | {a3,b3,c3}
+ 4 | e | {a4,b4,c4}
+ 5 | f | {a5,b5,c5}
+ 6 | g | {a6,b6,c6}
+ 7 | h | {a7,b7,c7}
+ 8 | i | {a8,b8,c8}
+ 9 | j | {a9,b9,c9}
+(10 rows)
+
+-- try doing a few consecutive copies with one open connection
+SELECT dblink_copy_open('copytofunction', 'xyzzy', false);
+ dblink_copy_open 
+------------------
+ OK
+(1 row)
+
+COPY bar TO FUNCTION dblink_copy_write;
+COPY baz TO FUNCTION dblink_copy_write;
+SELECT dblink_copy_end();
+ dblink_copy_end 
+-----------------
+ OK
+(1 row)
+
+-- confirm that data has arrived
+SELECT *
+FROM dblink('copytofunction', 'SELECT * FROM xyzzy') AS t(a int, b text, c text[]);
+  a  | b |        c         
+-----+---+------------------
+   0 | a | {a0,b0,c0}
+   1 | b | {a1,b1,c1}
+   2 | c | {a2,b2,c2}
+   3 | d | {a3,b3,c3}
+   4 | e | {a4,b4,c4}
+   5 | f | {a5,b5,c5}
+   6 | g | {a6,b6,c6}
+   7 | h | {a7,b7,c7}
+   8 | i | {a8,b8,c8}
+   9 | j | {a9,b9,c9}
+ 100 | w | {a100,b100,c100}
+ 101 | x | {a101,b101,c101}
+ 102 | y | {a102,b102,c102}
+ 103 | z | {a103,b103,c103}
+(14 rows)
+
+-- try doing a binary COPY
+SELECT dblink_copy_open('copytofunction', 'xyzzy', true);
+ dblink_copy_open 
+------------------
+ OK
+(1 row)
+
+COPY plugh TO FUNCTION dblink_copy_write BINARY;
+SELECT dblink_copy_end();
+ dblink_copy_end 
+-----------------
+ OK
+(1 row)
+
+-- confirm that data has arrived
+SELECT *
+FROM dblink('copytofunction', 'SELECT * FROM xyzzy') AS t(a int, b text, c text[]);
+  a  | b |        c         
+-----+---+------------------
+   0 | a | {a0,b0,c0}
+   1 | b | {a1,b1,c1}
+   2 | c | {a2,b2,c2}
+   3 | d | {a3,b3,c3}
+   4 | e | {a4,b4,c4}
+   5 | f | {a5,b5,c5}
+   6 | g | {a6,b6,c6}
+   7 | h | {a7,b7,c7}
+   8 | i | {a8,b8,c8}
+   9 | j | {a9,b9,c9}
+ 100 | w | {a100,b100,c100}
+ 101 | x | {a101,b101,c101}
+ 102 | y | {a102,b102,c102}
+ 103 | z | {a103,b103,c103}
+ 104 | u | {a102,b102,c102}
+ 105 | v | {a103,b103,c103}
+(16 rows)
+
+-- try using reset to abort out of a copy state
+SELECT dblink_copy_open('copytofunction', 'xyzzy', true);
+ dblink_copy_open 
+------------------
+ OK
+(1 row)
+
+COPY plugh TO FUNCTION dblink_copy_write BINARY;
+SELECT dblink_connection_reset('copytofunction');
+ dblink_connection_reset 
+-------------------------
+ 
+(1 row)
+
+-- should fail, as COPY should have been aborted
+SELECT dblink_copy_end();
+ERROR:  COPY end failed
+-- no new data should have appeared
+SELECT *
+FROM dblink('copytofunction', 'SELECT * FROM xyzzy') AS t(a int, b text, c text[]);
+  a  | b |        c         
+-----+---+------------------
+   0 | a | {a0,b0,c0}
+   1 | b | {a1,b1,c1}
+   2 | c | {a2,b2,c2}
+   3 | d | {a3,b3,c3}
+   4 | e | {a4,b4,c4}
+   5 | f | {a5,b5,c5}
+   6 | g | {a6,b6,c6}
+   7 | h | {a7,b7,c7}
+   8 | i | {a8,b8,c8}
+   9 | j | {a9,b9,c9}
+ 100 | w | {a100,b100,c100}
+ 101 | x | {a101,b101,c101}
+ 102 | y | {a102,b102,c102}
+ 103 | z | {a103,b103,c103}
+ 104 | u | {a102,b102,c102}
+ 105 | v | {a103,b103,c103}
+(16 rows)
+
+-- should be a no-op, since no transaction should be active at this
+-- point
+SELECT dblink_connection_reset('copytofunction');
+ dblink_connection_reset 
+-------------------------
+ 
+(1 row)
+
+-- generate an error in the remote transaction
+SELECT dblink_exec('copytofunction','BEGIN');
+ dblink_exec 
+-------------
+ BEGIN
+(1 row)
+
+SELECT * FROM dblink('copytofunction', 'SELECT 1 / 0') AS t (a int);
+ERROR:  division by zero
+CONTEXT:  Error occurred on dblink connection named "unnamed": could not execute query.
+-- rollback the errored transaction
+SELECT dblink_connection_reset('copytofunction');
+ dblink_connection_reset 
+-------------------------
+ 
+(1 row)
+
+-- should just work, if reset didn't actually reset the transaction
+-- state an error would result.
+SELECT * FROM dblink('copytofunction', 'SELECT 1;') AS t (a int);
+ a 
+---
+ 1
+(1 row)
+
+-- try a really long identifier to test string handlig in
+-- dblink_copy_open.  This should neatly hit NAMEDATALEN on most
+-- systems, or 64 - 1
+create table
+"012345678901234567890123456789012345678901234567890123456789012" (a int);
+-- should put the connection into the COPY state without complaint...
+SELECT dblink_copy_open('copytofunction',
+       '012345678901234567890123456789012345678901234567890123456789012',
+       true);
+ dblink_copy_open 
+------------------
+ OK
+(1 row)
+
+COPY (SELECT generate_series(1, 5)) TO FUNCTION dblink_copy_write BINARY;
+SELECT dblink_copy_end();
+ dblink_copy_end 
+-----------------
+ OK
+(1 row)
+
+-- check to see if data made it
+SELECT * FROM
+  "012345678901234567890123456789012345678901234567890123456789012";
+ a 
+---
+ 1
+ 2
+ 3
+ 4
+ 5
+(5 rows)
+
+-- postgres truncates long identifiers and advertises with a NOTICE,
+-- and as of right now dblink does no remote-machine NOTICE handling.
+-- The result is silent truncation to the remote machine's
+-- NAMEDATALEN.
+SELECT dblink_copy_open('copytofunction',
+       '012345678901234567890123456789012345678901234567890123456789012345678',
+       true);
+ dblink_copy_open 
+------------------
+ OK
+(1 row)
+
+COPY (SELECT generate_series(6, 10)) TO FUNCTION dblink_copy_write BINARY;
+SELECT dblink_copy_end();
+ dblink_copy_end 
+-----------------
+ OK
+(1 row)
+
+-- check to see if data made it
+SELECT * FROM
+  "012345678901234567890123456789012345678901234567890123456789012";
+ a  
+----
+  1
+  2
+  3
+  4
+  5
+  6
+  7
+  8
+  9
+ 10
+(10 rows)
+
 SELECT dblink_disconnect();
  dblink_disconnect 
 -------------------
diff --git a/contrib/dblink/sql/dblink.sql b/contrib/dblink/sql/dblink.sql
index d0ad876..919fd78 100644
--- a/contrib/dblink/sql/dblink.sql
+++ b/contrib/dblink/sql/dblink.sql
@@ -405,4 +405,116 @@ SELECT notify_name, be_pid = (select t.be_pid from dblink('select pg_backend_pid

 SELECT * from dblink_get_notify();

+-- test COPY ... TO FUNCTION support
+CREATE SCHEMA dblink_copy_to_function;
+SET search_path = dblink_copy_to_function, public;
+CREATE TABLE xyzzy(f1 int, f2 text, f3 text[], primary key (f1,f2));
+INSERT INTO xyzzy VALUES (0,'a','{"a0","b0","c0"}');
+INSERT INTO xyzzy VALUES (1,'b','{"a1","b1","c1"}');
+INSERT INTO xyzzy VALUES (2,'c','{"a2","b2","c2"}');
+INSERT INTO xyzzy VALUES (3,'d','{"a3","b3","c3"}');
+INSERT INTO xyzzy VALUES (4,'e','{"a4","b4","c4"}');
+INSERT INTO xyzzy VALUES (5,'f','{"a5","b5","c5"}');
+INSERT INTO xyzzy VALUES (6,'g','{"a6","b6","c6"}');
+INSERT INTO xyzzy VALUES (7,'h','{"a7","b7","c7"}');
+INSERT INTO xyzzy VALUES (8,'i','{"a8","b8","c8"}');
+INSERT INTO xyzzy VALUES (9,'j','{"a9","b9","c9"}');
+
+CREATE TABLE bar(f1 int, f2 text, f3 text[], primary key (f1,f2));
+INSERT INTO bar VALUES (100,'w','{"a100","b100","c100"}');
+INSERT INTO bar VALUES (101,'x','{"a101","b101","c101"}');
+
+CREATE TABLE baz(f1 int, f2 text, f3 text[], primary key (f1,f2));
+INSERT INTO baz VALUES (102,'y','{"a102","b102","c102"}');
+INSERT INTO baz VALUES (103,'z','{"a103","b103","c103"}');
+
+CREATE TABLE plugh(f1 int, f2 text, f3 text[], primary key (f1,f2));
+INSERT INTO plugh VALUES (104,'u','{"a102","b102","c102"}');
+INSERT INTO plugh VALUES (105,'v','{"a103","b103","c103"}');
+
+SELECT dblink_connect('copytofunction','dbname=contrib_regression');
+SELECT dblink_exec('copytofunction',
+       'SET search_path = dblink_copy_to_function, public;');
+
+-- ensure that original base data is present
+SELECT *
+FROM dblink('copytofunction', 'SELECT * FROM xyzzy') AS t(a int, b text, c text[]);
+
+-- try doing a few consecutive copies with one open connection
+SELECT dblink_copy_open('copytofunction', 'xyzzy', false);
+COPY bar TO FUNCTION dblink_copy_write;
+COPY baz TO FUNCTION dblink_copy_write;
+SELECT dblink_copy_end();
+
+-- confirm that data has arrived
+SELECT *
+FROM dblink('copytofunction', 'SELECT * FROM xyzzy') AS t(a int, b text, c text[]);
+
+-- try doing a binary COPY
+SELECT dblink_copy_open('copytofunction', 'xyzzy', true);
+COPY plugh TO FUNCTION dblink_copy_write BINARY;
+SELECT dblink_copy_end();
+
+-- confirm that data has arrived
+SELECT *
+FROM dblink('copytofunction', 'SELECT * FROM xyzzy') AS t(a int, b text, c text[]);
+
+-- try using reset to abort out of a copy state
+SELECT dblink_copy_open('copytofunction', 'xyzzy', true);
+COPY plugh TO FUNCTION dblink_copy_write BINARY;
+SELECT dblink_connection_reset('copytofunction');
+
+-- should fail, as COPY should have been aborted
+SELECT dblink_copy_end();
+
+-- no new data should have appeared
+SELECT *
+FROM dblink('copytofunction', 'SELECT * FROM xyzzy') AS t(a int, b text, c text[]);
+
+-- should be a no-op, since no transaction should be active at this
+-- point
+SELECT dblink_connection_reset('copytofunction');
+
+-- generate an error in the remote transaction
+SELECT dblink_exec('copytofunction','BEGIN');
+SELECT * FROM dblink('copytofunction', 'SELECT 1 / 0') AS t (a int);
+
+-- rollback the errored transaction
+SELECT dblink_connection_reset('copytofunction');
+
+-- should just work, if reset didn't actually reset the transaction
+-- state an error would result.
+SELECT * FROM dblink('copytofunction', 'SELECT 1;') AS t (a int);
+
+-- try a really long identifier to test string handlig in
+-- dblink_copy_open.  This should neatly hit NAMEDATALEN on most
+-- systems, or 64 - 1
+create table
+"012345678901234567890123456789012345678901234567890123456789012" (a int);
+
+-- should put the connection into the COPY state without complaint...
+SELECT dblink_copy_open('copytofunction',
+       '012345678901234567890123456789012345678901234567890123456789012',
+       true);
+COPY (SELECT generate_series(1, 5)) TO FUNCTION dblink_copy_write BINARY;
+SELECT dblink_copy_end();
+
+-- check to see if data made it
+SELECT * FROM
+  "012345678901234567890123456789012345678901234567890123456789012";
+
+-- postgres truncates long identifiers and advertises with a NOTICE,
+-- and as of right now dblink does no remote-machine NOTICE handling.
+-- The result is silent truncation to the remote machine's
+-- NAMEDATALEN.
+SELECT dblink_copy_open('copytofunction',
+       '012345678901234567890123456789012345678901234567890123456789012345678',
+       true);
+COPY (SELECT generate_series(6, 10)) TO FUNCTION dblink_copy_write BINARY;
+SELECT dblink_copy_end();
+
+-- check to see if data made it
+SELECT * FROM
+  "012345678901234567890123456789012345678901234567890123456789012";
+
 SELECT dblink_disconnect();
-- 
1.6.5.3
#6 Robert Haas
robertmhaas@gmail.com
In reply to: Daniel Farina (#5)
Re: [PATCH 4/4] Add tests to dblink covering use of COPY TO FUNCTION

On Mon, Nov 23, 2009 at 4:34 PM, Daniel Farina <dfarina@truviso.com> wrote:

Signed-off-by: Daniel Farina <dfarina@truviso.com>

Thanks for the patch. You may want to take a look at this:

http://wiki.postgresql.org/wiki/Submitting_a_Patch

I'm fuzzy on what problem this is attempting to solve... as mentioned
in the above guidelines, it's usually good to start with some design
discussions before writing/submitting code. Also, we prefer that
patches be submitted as context diffs and that they not be split up
over multiple emails.

Thanks,

...Robert

#7 Daniel Farina
dfarina@truviso.com
In reply to: Robert Haas (#6)
Re: [PATCH 4/4] Add tests to dblink covering use of COPY TO FUNCTION

On Mon, Nov 23, 2009 at 2:16 PM, Robert Haas <robertmhaas@gmail.com> wrote:

On Mon, Nov 23, 2009 at 4:34 PM, Daniel Farina <dfarina@truviso.com> wrote:

Signed-off-by: Daniel Farina <dfarina@truviso.com>

Thanks for the patch.  You may want to take a look at this:

http://wiki.postgresql.org/wiki/Submitting_a_Patch

I'm fuzzy on what problem this is attempting to solve...

It seems somewhat strange that the only things COPY can do with its
output stream of bytes are the two modes baked into core Postgres.
This patch allows carefully written UDFs to do whatever they will with
the stream of bytes, such as sending it into a waiting libpq
connection.
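The sketch below is a stripped-down, self-contained model of the open/write/end sequence that the dblink patch implements. A byte buffer stands in for the waiting libpq connection.

FakeConn, copy_open, copy_write and copy_end are hypothetical names used only for illustration. The single static pointer plays the role of dblink_copy_current. Nothing here touches libpq or the real fmgr interface.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Stand-in for a remote connection in COPY-in mode (a PGconn in the patch). */
typedef struct FakeConn
{
    char   data[1024];
    size_t len;
    bool   in_copy;     /* true between copy_open() and copy_end() */
} FakeConn;

/* Mirrors the patch's single global "current COPY target". */
static FakeConn *copy_current = NULL;

/* Analogue of dblink_copy_open: enter COPY mode and bind the global. */
static bool copy_open(FakeConn *conn)
{
    if (conn == NULL || conn->in_copy)
        return false;
    conn->in_copy = true;
    copy_current = conn;
    return true;
}

/* Analogue of dblink_copy_write: called once per row by COPY ... TO FUNCTION. */
static bool copy_write(const char *row, size_t len)
{
    if (copy_current == NULL || !copy_current->in_copy)
        return false;
    if (copy_current->len + len > sizeof copy_current->data)
        return false;
    memcpy(copy_current->data + copy_current->len, row, len);
    copy_current->len += len;
    return true;
}

/* Analogue of dblink_copy_end: leave COPY mode and unbind the global. */
static bool copy_end(void)
{
    if (copy_current == NULL || !copy_current->in_copy)
        return false;
    copy_current->in_copy = false;
    copy_current = NULL;
    return true;
}
```

Calling copy_write() or copy_end() without a preceding copy_open() fails. This loosely parallels the "COPY end failed" case in the regression test.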

as mentioned in the above guidelines, it's usually good to start with some design
discussions before writing/submitting code.

The patch is derived from functionality in Truviso's Postgres-derived
database product, where it is non-optional; this patch extracts it
from that product.

Also, we prefer that patches be submitted as context diffs

I actually remembered this right after I sent it...sorry about that.

And that they not be split up over multiple emails.

With the possible exception of squashing together the test cases into
their implementing patches, I would say this is at least two patches.
One is to a contrib, the other to core PostgreSQL. It so happens the
core addition makes the contrib changes much more obviously useful.

fdr

#8 Greg Smith
gsmith@gregsmith.com
In reply to: Robert Haas (#6)
Re: [PATCH 4/4] Add tests to dblink covering use of COPY TO FUNCTION

Robert Haas wrote:

I'm fuzzy on what problem this is attempting to solve... as mentioned
in the above guidelines, it's usually good to start with some design
discussions before writing/submitting code.

This has been through some heavy design discussions with a few PG
hackers you know and some you don't; they just couldn't release the
result until now. As for what it's good for, if you look at what you
can do now with dblink, you can easily move rows between nodes using
dblink_build_sql_insert. This is perfectly fine for small bits of work,
but the performance isn't nearly good enough to do serious replication
with it. The upper level patch here allows using COPY as the mechanism
to move things between them, which is much faster for some use cases
(including most of the really useful ones). It dramatically
increases the scale of what you can move around using dblink as the
replication transport.
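Part of the difference Greg describes is that COPY sends rows as one data stream rather than as one INSERT statement per row. The sketch below shows how a single row is laid out in COPY's default text format:

- fields are separated by tabs;
- the row ends with a newline;
- SQL NULL is written as \N;
- special characters are backslash-escaped.

encode_copy_row and put_byte are hypothetical helpers. Only backslash, tab, newline and carriage return are escaped here, whereas the server's own output routine also escapes a few other control characters.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Append one byte, keeping room for a final NUL; returns 0 if full. */
static int put_byte(char *out, size_t *len, size_t cap, char c)
{
    if (*len + 1 >= cap)
        return 0;
    out[(*len)++] = c;
    return 1;
}

/* Encode one row (a NULL pointer means SQL NULL) as a NUL-terminated
 * COPY text line in out.  Returns 1 on success, 0 if out is too small. */
static int encode_copy_row(const char *const *fields, size_t nfields,
                           char *out, size_t cap)
{
    size_t len = 0;
    size_t i;

    for (i = 0; i < nfields; i++)
    {
        const char *p = fields[i];

        if (i > 0 && !put_byte(out, &len, cap, '\t'))
            return 0;
        if (p == NULL)
        {
            if (!put_byte(out, &len, cap, '\\') || !put_byte(out, &len, cap, 'N'))
                return 0;
            continue;
        }
        for (; *p != '\0'; p++)
        {
            char esc = 0;

            switch (*p)
            {
                case '\\': esc = '\\'; break;
                case '\t': esc = 't'; break;
                case '\n': esc = 'n'; break;
                case '\r': esc = 'r'; break;
                default: break;
            }
            if (esc)
            {
                if (!put_byte(out, &len, cap, '\\') || !put_byte(out, &len, cap, esc))
                    return 0;
            }
            else if (!put_byte(out, &len, cap, *p))
                return 0;
        }
    }
    if (!put_byte(out, &len, cap, '\n'))
        return 0;
    out[len] = '\0';
    return 1;
}
```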

The lower level patch is needed to build that layer, which is an
immediate proof of its utility. In addition, adding a user-defined
function as a COPY target opens up all sorts of possibilities for things
like efficient ETL implementation. And if this approach is accepted as
a reasonable one, as Dan suggested a next step might even be to
similarly allow passing COPY FROM through a UDF, which has the potential
to provide a new efficient implementation path for some of the custom
input filter requests that pop up here periodically.
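Passing COPY FROM through a UDF can be pictured as the mirror image of the output side. Each raw input line goes through a caller-supplied filter before the row parser sees it.

The sketch below assumes a simple line-oriented text input. copy_in_filter_fn, drop_comments and filter_lines are hypothetical. A real COPY FROM hook would also have to handle CSV quoting, where a newline inside a quoted field does not end the row.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical ingress hook: sees one raw line (without its LF) and
 * returns nonzero to pass it on to the row parser, zero to drop it. */
typedef int (*copy_in_filter_fn)(const char *line, size_t len);

/* Example filter: drop blank lines and lines starting with '#'. */
static int drop_comments(const char *line, size_t len)
{
    return len > 0 && line[0] != '#';
}

/* Run every LF-separated line of in through filter, writing the surviving
 * lines (each LF-terminated) to out.  Returns the number of lines kept,
 * or -1 if out (cap bytes) is too small. */
static int filter_lines(copy_in_filter_fn filter, const char *in,
                        char *out, size_t cap)
{
    size_t used = 0;
    int kept = 0;

    if (cap == 0)
        return -1;
    while (*in != '\0')
    {
        const char *nl = strchr(in, '\n');
        size_t len = nl ? (size_t) (nl - in) : strlen(in);

        if (filter(in, len))
        {
            if (used + len + 2 > cap)   /* room for line, LF and NUL */
                return -1;
            memcpy(out + used, in, len);
            used += len;
            out[used++] = '\n';
            kept++;
        }
        in += nl ? len + 1 : len;
    }
    out[used] = '\0';
    return kept;
}
```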

--
Greg Smith 2ndQuadrant Baltimore, MD
PostgreSQL Training, Services and Support
greg@2ndQuadrant.com www.2ndQuadrant.com

#9 Andrew Dunstan
andrew@dunslane.net
In reply to: Greg Smith (#8)
Re: [PATCH 4/4] Add tests to dblink covering use of COPY TO FUNCTION

Greg Smith wrote:

Robert Haas wrote:

I'm fuzzy on what problem this is attempting to solve... as mentioned
in the above guidelines, it's usually good to start with some design
discussions before writing/submitting code.

This has been through some heavy design discussions with a few PG
hackers you know and some you don't, they just couldn't release the
result until now. As for what it's good for, if you look at what you
can do now with dblink, you can easily move rows between nodes using
dblink_build_sql_insert. This is perfectly fine for small bits of
work, but the performance isn't nearly good enough to do serious
replication with it. The upper level patch here allows using COPY as
the mechanism to move things between them, which is much faster for
some use cases (which includes most of the really useful ones). It
dramatically increases the scale of what you can move around using
dblink as the replication transport.

I recently found myself trying to push data through dblink() and ended
up writing code that calls a function on the target, which in turn
calls back to the source to select the data and insert it. The speedup
was massive, so I'll be interested to dig into the details here.

The lower level patch is needed to build that layer, which is an
immediate proof of its utility. In addition, adding a user-defined
function as a COPY target opens up all sorts of possibilities for
things like efficient ETL implementation. And if this approach is
accepted as a reasonable one, as Dan suggested a next step might even
be to similarly allow passing COPY FROM through a UDF, which has the
potential to provide a new efficient implementation path for some of
the custom input filter requests that pop up here periodically.

I'm also interested in COPY returning rows as text[], which was
discussed recently. Does this help move us towards that?

cheers

andrew

#10 Daniel Farina
dfarina@truviso.com
In reply to: Andrew Dunstan (#9)
Re: [PATCH 4/4] Add tests to dblink covering use of COPY TO FUNCTION

On Mon, Nov 23, 2009 at 3:46 PM, Andrew Dunstan <andrew@dunslane.net> wrote:

I recently found myself trying to push data through dblink() and ended up
writing code to make a call to the target to call a function which called
back to the source to select the data and insert it. The speedup was
massive, so I'll be interested to dig into the details here.

The way the indirection is accomplished here should be very cheap:
the overhead should be comparable to the fwrite() call that is used
for copying to a file. This is why I mentioned that it would be
interesting to make this good enough to be the underlying mechanism of
TO STDOUT/TO 'file', to reduce the overall number of mechanisms used to
perform COPY TO.
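The sketch below illustrates the idea that TO 'file' could be re-expressed as a function target. A file destination becomes one sink among others, and its callback is a thin wrapper around fwrite().

copy_sink_fn, file_sink and copy_rows are hypothetical names. The opaque ctx pointer is an assumption about how a target might carry its own state.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* A COPY destination reduced to one callback plus an opaque context. */
typedef int (*copy_sink_fn)(void *ctx, const char *data, size_t len);

/* "TO 'file'" expressed as a sink: ctx is the FILE * to write to. */
static int file_sink(void *ctx, const char *data, size_t len)
{
    return fwrite(data, 1, len, (FILE *) ctx) == len;
}

/* Drive a sink with pre-formatted rows, one call per row, as COPY would.
 * Returns 1 if every row was accepted, 0 on the first failure. */
static int copy_rows(copy_sink_fn sink, void *ctx,
                     const char *const *rows, size_t nrows)
{
    size_t i;

    for (i = 0; i < nrows; i++)
        if (!sink(ctx, rows[i], strlen(rows[i])))
            return 0;
    return 1;
}
```

Swapping file_sink for any other function with the same signature changes the destination without touching copy_rows. That decoupling is what would let the built-in targets and UDF targets share one code path.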

I'm also interested in COPY returning rows as text[], which was discussed
recently. Does this help move us towards that?

Yes. Take a look at the tests introduced to core PostgreSQL (see patch
2), where instead of returning a text[] I return just a single text of
the verbatim output of the copy. You could imagine making that an SRF
instead. It would have to understand COPY row delimiters in whatever
mode you were operating in, though.

fdr

#11 Daniel Farina
dfarina@truviso.com
In reply to: Daniel Farina (#10)
Re: [PATCH 4/4] Add tests to dblink covering use of COPY TO FUNCTION

On Mon, Nov 23, 2009 at 4:03 PM, Daniel Farina <dfarina@truviso.com> wrote:

Yes.  Take a look at the tests introduced to core PostgreSQL (see patch
2), where instead of returning a text[] I return just a single text of
the verbatim output of the copy.  You could imagine making that an SRF
instead.  It would have to understand COPY row delimiters in whatever
mode you were operating in, though.

Actually, sorry, I lie, but not in a bad way. Since COPY operates a
row at a time (rather than on a stream of bytes with arbitrary
boundaries), you can rely on being passed each record one at a time,
so you don't have to understand the row delimiter. You could even make
a bytea[][] that contains the binary output, the first dimension being
the row number and the second being the bytes themselves. The header
would pose an interesting problem, though.
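Each call receives exactly one record. Turning a text-format record into a text[]-style array therefore only requires the field delimiter and the escape rules.

split_copy_record is hypothetical and makes these assumptions:

- the input is the default text format with a tab delimiter;
- the trailing newline has already been removed;
- only the \\, \t, \n, \r and \N escapes are handled.

It ignores CSV mode, binary mode, and the octal and hex escapes that COPY FROM also accepts.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_FIELDS 16

/*
 * Split one text-format COPY record into fields, in place.  Undoes the
 * \\, \t, \n and \r escapes; a field that is exactly \N becomes a NULL
 * pointer.  Returns the field count, or -1 if there are more than
 * max_fields fields.
 */
static int split_copy_record(char *rec, char **fields, int max_fields)
{
    char *src = rec;      /* next byte to read */
    char *dst = rec;      /* next byte to write; never ahead of src */
    char *start = rec;    /* beginning of the current field */
    int   null_field = 0;
    int   n = 0;

    for (;;)
    {
        char c = *src++;

        if (c == '\t' || c == '\0')
        {
            /* End of a field: terminate it and record it (or NULL). */
            if (n == max_fields)
                return -1;
            *dst = '\0';
            fields[n++] = null_field ? NULL : start;
            if (c == '\0')
                return n;
            start = ++dst;
            null_field = 0;
            continue;
        }
        if (c == '\\')
        {
            char next = *src;

            /* \N standing alone as a whole field marks SQL NULL. */
            if (next == 'N' && dst == start &&
                (src[1] == '\t' || src[1] == '\0'))
            {
                src++;
                null_field = 1;
                continue;
            }
            switch (next)
            {
                case '\\': c = '\\'; src++; break;
                case 't':  c = '\t'; src++; break;
                case 'n':  c = '\n'; src++; break;
                case 'r':  c = '\r'; src++; break;
                default:   break;   /* keep any other backslash as-is */
            }
        }
        *dst++ = c;
    }
}
```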

fdr

#12 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Greg Smith (#8)
Re: [PATCH 4/4] Add tests to dblink covering use of COPY TO FUNCTION

Greg Smith <greg@2ndquadrant.com> writes:

Robert Haas wrote:

I'm fuzzy on what problem this is attempting to solve... as mentioned
in the above guidelines, it's usually good to start with some design
discussions before writing/submitting code.

This has been through some heavy design discussions with a few PG
hackers you know and some you don't, they just couldn't release the
result until now.

Those discussions don't have a lot of credibility if they didn't take
place on the public mailing lists.

pgsql-hackers had some preliminary discussions a couple months back
on refactoring COPY to allow things like this --- see the thread
starting here:
http://archives.postgresql.org/pgsql-hackers/2009-09/msg00486.php
While I don't think we arrived at any final decisions, I would like
to know how this design fits in with what was discussed then.

regards, tom lane

#13 Daniel Farina
dfarina@truviso.com
In reply to: Tom Lane (#12)
Re: [PATCH 4/4] Add tests to dblink covering use of COPY TO FUNCTION

On Mon, Nov 23, 2009 at 4:20 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

pgsql-hackers had some preliminary discussions a couple months back
on refactoring COPY to allow things like this --- see the thread
starting here:
http://archives.postgresql.org/pgsql-hackers/2009-09/msg00486.php
While I don't think we arrived at any final decisions, I would like
to know how this design fits in with what was discussed then.

That thread seems to be about importing (ingress), whereas this patch
is about exporting (egress). Still, should we try to make these kinds
of operations a bit more symmetrical, it is an interesting question how
much parsing to do on the ingress side before handing a row to a
function.

I did consider refactoring COPY. But since it's never clear when we
start a feature whether it will turn out to be a good upstream
candidate, we default to keeping future merges with Postgres
tractable, so I did not take on such a large and artistic task.

fdr

#14 Greg Smith
gsmith@gregsmith.com
In reply to: Tom Lane (#12)
Re: [PATCH 4/4] Add tests to dblink covering use of COPY TO FUNCTION

Tom Lane wrote:

Those discussions don't have a lot of credibility if they didn't take
place on the public mailing lists.

You know how people complain about how new contributors are treated
here? Throwing out comments like this, that come off as belittling to
other people's work, doesn't help. All I was suggesting was that Dan
wasn't developing this in complete isolation from the hackers community
as Robert had feared, as will be obvious when we get to:

pgsql-hackers had some preliminary discussions a couple months back
on refactoring COPY to allow things like this --- see the thread
starting here:
http://archives.postgresql.org/pgsql-hackers/2009-09/msg00486.php
While I don't think we arrived at any final decisions, I would like
to know how this design fits in with what was discussed then.

The patch provided here is a first step toward what you suggested in the
related "Copy enhancements" thread a few days later:
http://archives.postgresql.org/pgsql-hackers/2009-09/msg00616.php
It's one way to implement a better decoupled "data transformation" layer on
top of COPY. When Dan showed me an earlier implementation of the basic
idea embodied in this patch (actually developed independently, and
earlier), I gave it a thumbs-up as seeming to match the general
direction the community discussion suggested, and encouraged him to work
on getting the code to where it could be released. He started with
output rather than input, mainly because the dblink feature had a
compelling use-case that justified spending time on development for the
company. Rather than continuing on to input transformation, this
development checkpoint made a good place to pause and solicit public
feedback, particularly since the input side has additional hurdles to
clear before it can work.

As for other past discussion here that might be relevant, this patch
includes a direct change to gram.y to support the new syntax. You've
suggested before that it might be time to update COPY the same
way EXPLAIN and now VACUUM have been overhauled to provide a more
flexible options interface:
http://archives.postgresql.org/pgsql-hackers/2009-09/msg00616.php
This patch might be more fuel for that idea.

Emmanuel has given up on the more error-tolerant COPY patch that thread
was associated with, and I haven't heard anything from Andrew about
ragged CSV import either. I think that ultimately those features are
useful, but they just exceed what the existing code could be hacked to
handle cleanly. If it's possible to lower the complexity bar to
implementing them by abstracting the transformation into a set of
functions, and to have more flexible syntax for the built-in ones the
database ships with, that may be useful groundwork for returning to
those features without bloating COPY's internals (and therefore
hurting performance) intolerably. Dan even suggested in his first note
that it might be possible to write the current STDOUT|file behavior in
terms of the new function interface, which has the potential to make
the COPY implementation cleaner rather than more cluttered (as long as
the performance doesn't suffer).

--
Greg Smith 2ndQuadrant Baltimore, MD
PostgreSQL Training, Services and Support
greg@2ndQuadrant.com www.2ndQuadrant.com

#15 Andrew Dunstan
andrew@dunslane.net
In reply to: Greg Smith (#14)
Re: [PATCH 4/4] Add tests to dblink covering use of COPY TO FUNCTION

Greg Smith wrote:

I haven't heard anything from Andrew about ragged CSV import either.
I think that ultimately those features are useful, but just exceed
what the existing code could be hacked to handle cleanly.

The patch is attached for your edification/amusement. I have backpatched
it to 8.4 for the client that needed it, and it's working just fine. I
didn't pursue it when it was clear that it was not going to be accepted.
COPY returning text[] would allow us to achieve the same thing, a bit
more verbosely, but it would be a lot more work to develop.

cheers

andrew

Attachments:

raggedcopy.patch (text/x-patch, +53 -22)
#16Hannu Krosing
hannu@tm.ee
In reply to: Greg Smith (#8)
Re: [PATCH 4/4] Add tests to dblink covering use of COPY TO FUNCTION

On Mon, 2009-11-23 at 18:31 -0500, Greg Smith wrote:

Robert Haas wrote:

I'm fuzzy on what problem this is attempting to solve... as mentioned
in the above guidelines, it's usually good to start with some design
discussions before writing/submitting code.

This has been through some heavy design discussions with a few PG
hackers you know and some you don't; they just couldn't release the
result until now. As for what it's good for, if you look at what you
can do now with dblink, you can easily move rows between nodes using
dblink_build_sql_insert. This is perfectly fine for small bits of work,
but the performance isn't nearly good enough to do serious replication
with it. The upper-level patch here allows using COPY as the mechanism
to move things between them, which is much faster for some use cases
(including most of the really useful ones). It dramatically
increases the scale of what you can move around using dblink as the
replication transport.

The lower-level patch is needed to build that layer, which is an
immediate proof of its utility. In addition, adding a user-defined
function as a COPY target opens up all sorts of possibilities for things
like efficient ETL implementation. And if this approach is accepted as
a reasonable one, then, as Dan suggested, a next step might even be to
similarly allow passing COPY FROM through a UDF, which has the potential
to provide a new efficient implementation path for some of the custom
input filter requests that pop up here periodically.

Can this easily be extended to do things like

COPY stdin TO udf();
or
COPY udf() FROM stdin;

so that I could write a simple partitioning function, either local for
partitioned tables or using pl/proxy for partitioned databases?
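Here is a minimal sketch of the per-row routing such a partitioning function would perform. It is written in Python and assumes range partitions with exclusive integer upper bounds. `make_range_router` is a hypothetical name. Under the proposal, the real target would be a UDF (possibly a pl/proxy function) called once per row.

```python
import bisect
from collections import defaultdict
from typing import Callable, DefaultDict, List, Sequence, Tuple

Row = Tuple[object, ...]


def make_range_router(
    key_index: int, upper_bounds: Sequence[int]
) -> Tuple[Callable[[Row], None], DefaultDict[int, List[Row]]]:
    """Return (sink, partitions). Partition i holds keys below
    upper_bounds[i] and at or above upper_bounds[i - 1].
    upper_bounds must be sorted ascending."""
    partitions: DefaultDict[int, List[Row]] = defaultdict(list)

    def sink(row: Row) -> None:
        key = row[key_index]
        # bisect_right counts the bounds <= key, which is the partition index.
        i = bisect.bisect_right(upper_bounds, key)
        if i == len(upper_bounds):
            raise ValueError(f"no partition accepts key {key!r}")
        partitions[i].append(row)

    return sink, partitions
```

Feeding each row of a COPY stream to `sink` fills `partitions`. In a cross-database setup, the append would instead forward the row to whichever node owns that partition.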


#17Hannu Krosing
hannu@tm.ee
In reply to: Daniel Farina (#13)
Re: [PATCH 4/4] Add tests to dblink covering use of COPY TO FUNCTION

On Mon, 2009-11-23 at 16:25 -0800, Daniel Farina wrote:

On Mon, Nov 23, 2009 at 4:20 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:

pgsql-hackers had some preliminary discussions a couple months back
on refactoring COPY to allow things like this --- see the thread
starting here:
http://archives.postgresql.org/pgsql-hackers/2009-09/msg00486.php
While I don't think we arrived at any final decisions, I would like
to know how this design fits in with what was discussed then.

This seems to be about importing/ingress, whereas this patch is about
exporting/egress... it is an interesting question how much parsing
to do on the ingress side before handing a row to a function,
though,

My suggestion for

COPY func(rowtype) FROM stdin;

would be to pass the function a fully processed row of that type, with
all fields resolved and converted to the right types.

It may also be useful to have forms like

COPY func(text) FROM stdin;

and

COPY func(bytea[]) FROM stdin;

for getting less processed input.
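The three levels can be sketched in Python as successive stages applied to one line of COPY text-format input. This is an illustration only, not the patch or PostgreSQL's parser. The stages map onto the proposed signatures as follows:

- Passing the raw line corresponds to `func(text)`.
- `split_fields` corresponds roughly to `func(bytea[])`. It is a hypothetical helper that returns strings rather than `bytea` values.
- `parse_row` with per-column converters corresponds to `func(rowtype)`.

Only the common backslash escapes are handled; octal and hex escapes are omitted.

```python
from typing import Callable, List, Optional, Sequence

_ESCAPES = {"b": "\b", "f": "\f", "n": "\n", "r": "\r", "t": "\t", "v": "\v"}


def _finish(raw: List[str], out: List[str]) -> Optional[str]:
    # Only an unescaped, complete \N field means SQL NULL.
    return None if "".join(raw) == "\\N" else "".join(out)


def split_fields(line: str, delim: str = "\t") -> List[Optional[str]]:
    """Middle level: split one text-format line into de-escaped fields."""
    if line.endswith("\n"):
        line = line[:-1]
    fields: List[Optional[str]] = []
    raw: List[str] = []  # field text as written, used for the \N test
    out: List[str] = []  # field text after escape processing
    i = 0
    while i < len(line):
        ch = line[i]
        if ch == "\\" and i + 1 < len(line):
            nxt = line[i + 1]
            raw.append(ch + nxt)
            # Unknown escapes (including an escaped delimiter) stand for themselves.
            out.append(_ESCAPES.get(nxt, nxt))
            i += 2
            continue
        if ch == delim:
            fields.append(_finish(raw, out))
            raw, out = [], []
        else:
            raw.append(ch)
            out.append(ch)
        i += 1
    fields.append(_finish(raw, out))
    return fields


def parse_row(line: str, converters: Sequence[Callable[[str], object]]) -> tuple:
    """Top level: every field resolved and converted to its column type."""
    fields = split_fields(line)
    if len(fields) != len(converters):
        raise ValueError(f"expected {len(converters)} fields, got {len(fields)}")
    return tuple(None if f is None else conv(f) for f, conv in zip(fields, converters))
```

Handing the function an earlier stage leaves more room for data cleaning, since malformed fields reach the function instead of failing conversion. Handing it the last stage spares the function any reparsing.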

Should we try to make these kinds of operations a bit more
symmetrical?

--
Hannu Krosing http://www.2ndQuadrant.com
PostgreSQL Scalability and Availability
Services, Consulting and Training

#18Daniel Farina
drfarina@acm.org
In reply to: Hannu Krosing (#16)
Re: [PATCH 4/4] Add tests to dblink covering use of COPY TO FUNCTION

On Tue, Nov 24, 2009 at 12:29 AM, Hannu Krosing <hannu@krosing.net> wrote:

COPY stdin TO udf();

If stdin becomes (is?) a legitimate source of records, then this patch
will Just Work.

The patch is already quite useful in the COPY (SELECT ...) TO FUNCTION
... scenario.

COPY udf() FROM stdin;

This is unaddressed, but I think it would be a good idea to consider
enabling this kind of thing prior to application.

fdr

#19Daniel Farina
drfarina@gmail.com
In reply to: Hannu Krosing (#17)
Re: [PATCH 4/4] Add tests to dblink covering use of COPY TO FUNCTION

On Tue, Nov 24, 2009 at 12:38 AM, Hannu Krosing <hannu@2ndquadrant.com> wrote:

COPY func(rowtype) FROM stdin;

I didn't consider rowtype... I did consider a type list, such as:

COPY func(typea, typeb, typec) FROM ...

This would then operate just like a table, but it would be useless for
the data-cleaning case and would not allow type overloading to be the
mechanism for selecting behavior.

It was just an idea... I don't really know the use cases well enough;
my anticipated use case was updating eagerly materialized quantities
with the UDF.

fdr

#20Pavel Stehule
pavel.stehule@gmail.com
In reply to: Hannu Krosing (#16)
Re: [PATCH 4/4] Add tests to dblink covering use of COPY TO FUNCTION

Hello

I think this patch is maybe a good idea, but I am missing a better
function specification. Specification by name isn't enough, because we
can have overloaded functions. This syntax doesn't allow an explicit
cast, and from my personal view it is ugly; with a type specification
we don't need the FUNCTION keyword.
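The overloading problem can be illustrated with a small Python sketch. The `Registry` class is hypothetical and ignores PostgreSQL's real resolution rules (implicit casts, search_path). It only shows two things. A bare name can match several functions. A name plus an argument type list, as Pavel proposes, identifies at most one.

```python
from typing import Callable, Dict, Tuple

Signature = Tuple[str, Tuple[str, ...]]


class Registry:
    """Hypothetical function catalog keyed by name plus argument types."""

    def __init__(self) -> None:
        self._funcs: Dict[Signature, Callable] = {}

    def register(self, name: str, argtypes: Tuple[str, ...], fn: Callable) -> None:
        self._funcs[(name, argtypes)] = fn

    def by_name(self, name: str) -> Callable:
        """Name-only lookup: fails unless exactly one function has this name."""
        matches = [fn for (n, _), fn in self._funcs.items() if n == name]
        if len(matches) != 1:
            raise LookupError(f"{len(matches)} candidate functions named {name!r}")
        return matches[0]

    def by_signature(self, name: str, argtypes: Tuple[str, ...]) -> Callable:
        """Name plus argument types: at most one match (KeyError if none)."""
        return self._funcs[(name, argtypes)]
```

Under this model, `COPY table TO foo(varchar, int)` would map to `by_signature("foo", ("varchar", "int"))`.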

What about:

Variant a) by type specification

COPY table TO foo(varchar, int)

Variant b) by value expression, which allows reordering, casting, and constants

COPY table TO foo($3, $1::date, $2::text, CURRENT_DATE, true);
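The value-expression form amounts to evaluating a list of argument expressions against each source row. Here is a minimal Python sketch using the hypothetical names `Col`, `Const`, and `bind_args`:

- `Col(n, cast)` plays the role of `$n` or `$n::type`, numbered from 1 as in SQL.
- `Const` covers literals such as `true`. It also covers a value like CURRENT_DATE, which the sketch simply computes once before the rows are processed (a simplifying assumption).

```python
import datetime
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, Sequence, Union


@dataclass(frozen=True)
class Col:
    """Reference to source column n (1-based), like $n or $n::type."""
    n: int
    cast: Optional[Callable[[Any], Any]] = None


@dataclass(frozen=True)
class Const:
    """A literal such as true, or a value computed once, like CURRENT_DATE."""
    value: Any


def bind_args(row: Sequence[Any], specs: Sequence[Union[Col, Const]]) -> List[Any]:
    """Build the argument list for one call of the target function."""
    args: List[Any] = []
    for spec in specs:
        if isinstance(spec, Col):
            value = row[spec.n - 1]
            # NULL stays NULL; otherwise apply the optional cast.
            args.append(spec.cast(value) if spec.cast and value is not None else value)
        else:
            args.append(spec.value)
    return args


# Roughly: COPY table TO foo($3, $1::date, $2::text, CURRENT_DATE, true)
specs = [Col(3), Col(1, datetime.date.fromisoformat), Col(2, str),
         Const(datetime.date.today()), Const(True)]
```

Each source row would then produce one call such as `foo(*bind_args(row, specs))`.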

One question:
We have a fast COPY statement, OK, and we have a fast function, OK, but
inside the function we have to call a "slow" SQL query. So what is the
advantage?

We need pipes like

COPY table TO foo(..) TO table

where foo() should be a transformation function, or a real pipe function.

Regards
Pavel Stehule

#21Daniel Farina
drfarina@gmail.com
In reply to: Pavel Stehule (#20)
#22Hannu Krosing
hannu@tm.ee
In reply to: Daniel Farina (#21)
#23Daniel Farina
drfarina@gmail.com
In reply to: Hannu Krosing (#22)
#24Daniel Farina
drfarina@gmail.com
In reply to: Daniel Farina (#23)
#25Hannu Krosing
hannu@tm.ee
In reply to: Daniel Farina (#24)
#26Daniel Farina
drfarina@gmail.com
In reply to: Hannu Krosing (#25)
#27Pavel Stehule
pavel.stehule@gmail.com
In reply to: Daniel Farina (#21)
#28Daniel Farina
drfarina@gmail.com
In reply to: Pavel Stehule (#27)
#29Hannu Krosing
hannu@tm.ee
In reply to: Daniel Farina (#26)
#30Pavel Stehule
pavel.stehule@gmail.com
In reply to: Daniel Farina (#28)
#31Hannu Krosing
hannu@tm.ee
In reply to: Daniel Farina (#28)
#32Robert Haas
robertmhaas@gmail.com
In reply to: Greg Smith (#14)
#33Pavel Stehule
pavel.stehule@gmail.com
In reply to: Hannu Krosing (#31)
#34Robert Haas
robertmhaas@gmail.com
In reply to: Andrew Dunstan (#15)
#35Jeff Davis
pgsql@j-davis.com
In reply to: Daniel Farina (#18)
#36Jeff Davis
pgsql@j-davis.com
In reply to: Pavel Stehule (#30)
#37Tom Lane
tgl@sss.pgh.pa.us
In reply to: Jeff Davis (#36)
#38Pavel Stehule
pavel.stehule@gmail.com
In reply to: Jeff Davis (#36)
#39Daniel Farina
drfarina@gmail.com
In reply to: Pavel Stehule (#38)
#40Jeff Davis
pgsql@j-davis.com
In reply to: Tom Lane (#37)
#41Daniel Farina
drfarina@gmail.com
In reply to: Jeff Davis (#40)
#42Pavel Stehule
pavel.stehule@gmail.com
In reply to: Daniel Farina (#39)
#43Daniel Farina
drfarina@gmail.com
In reply to: Pavel Stehule (#42)
#44Pavel Stehule
pavel.stehule@gmail.com
In reply to: Daniel Farina (#43)
#45Jeff Davis
pgsql@j-davis.com
In reply to: Daniel Farina (#43)
#46Jeff Davis
pgsql@j-davis.com
In reply to: Pavel Stehule (#42)
#47Pavel Stehule
pavel.stehule@gmail.com
In reply to: Jeff Davis (#45)
#48Daniel Farina
drfarina@gmail.com
In reply to: Jeff Davis (#46)
#49Pavel Stehule
pavel.stehule@gmail.com
In reply to: Jeff Davis (#46)
#50Pavel Stehule
pavel.stehule@gmail.com
In reply to: Daniel Farina (#48)
#51Pavel Stehule
pavel.stehule@gmail.com
In reply to: Pavel Stehule (#50)
#52Jeff Davis
pgsql@j-davis.com
In reply to: Pavel Stehule (#47)
#53Jeff Davis
pgsql@j-davis.com
In reply to: Pavel Stehule (#49)
#54Pavel Stehule
pavel.stehule@gmail.com
In reply to: Jeff Davis (#53)
#55Jeff Davis
pgsql@j-davis.com
In reply to: Pavel Stehule (#54)
#56Hannu Krosing
hannu@tm.ee
In reply to: Jeff Davis (#40)
#57Pavel Stehule
pavel.stehule@gmail.com
In reply to: Jeff Davis (#55)
#58Daniel Farina
drfarina@gmail.com
In reply to: Pavel Stehule (#50)
#59Tom Lane
tgl@sss.pgh.pa.us
In reply to: Jeff Davis (#55)
#60Jeff Davis
pgsql@j-davis.com
In reply to: Pavel Stehule (#57)
#61Pavel Stehule
pavel.stehule@gmail.com
In reply to: Jeff Davis (#60)
#62Andrew Dunstan
andrew@dunslane.net
In reply to: Jeff Davis (#60)
#63Daniel Farina
drfarina@gmail.com
In reply to: Andrew Dunstan (#62)
#64Jeff Davis
pgsql@j-davis.com
In reply to: Pavel Stehule (#61)
#65Jeff Davis
pgsql@j-davis.com
In reply to: Andrew Dunstan (#62)
#66Andrew Dunstan
andrew@dunslane.net
In reply to: Jeff Davis (#65)
#67Andrew Dunstan
andrew@dunslane.net
In reply to: Jeff Davis (#65)
#68Pavel Stehule
pavel.stehule@gmail.com
In reply to: Jeff Davis (#64)
#69David Fetter
david@fetter.org
In reply to: Andrew Dunstan (#67)
#70Daniel Farina
drfarina@gmail.com
In reply to: David Fetter (#69)
#71Simon Riggs
simon@2ndQuadrant.com
In reply to: Jeff Davis (#45)
#72Jeff Davis
pgsql@j-davis.com
In reply to: Simon Riggs (#71)
#73Greg Smith
gsmith@gregsmith.com
In reply to: Jeff Davis (#65)
#74Jeff Davis
pgsql@j-davis.com
In reply to: Greg Smith (#73)
#75Jeff Davis
pgsql@j-davis.com
In reply to: Daniel Farina (#70)
#76Daniel Farina
drfarina@gmail.com
In reply to: Jeff Davis (#75)
#77Jeff Davis
pgsql@j-davis.com
In reply to: Daniel Farina (#76)
#78Greg Smith
gsmith@gregsmith.com
In reply to: Jeff Davis (#74)
#79Daniel Farina
drfarina@acm.org
In reply to: Greg Smith (#78)
#80Robert Haas
robertmhaas@gmail.com
In reply to: Daniel Farina (#79)
#81Daniel Farina
drfarina@acm.org
In reply to: Robert Haas (#80)
#82Robert Haas
robertmhaas@gmail.com
In reply to: Daniel Farina (#81)
#83Daniel Farina
drfarina@acm.org
In reply to: Robert Haas (#82)
#84Jeff Davis
pgsql@j-davis.com
In reply to: Robert Haas (#82)
#85Robert Haas
robertmhaas@gmail.com
In reply to: Daniel Farina (#83)
#86Jeff Davis
pgsql@j-davis.com
In reply to: Robert Haas (#85)
#87Daniel Farina
drfarina@acm.org
In reply to: Robert Haas (#85)
#88Robert Haas
robertmhaas@gmail.com
In reply to: Jeff Davis (#86)
#89Robert Haas
robertmhaas@gmail.com
In reply to: Daniel Farina (#87)
#90Daniel Farina
drfarina@acm.org
In reply to: Robert Haas (#89)
#91Robert Haas
robertmhaas@gmail.com
In reply to: Daniel Farina (#90)
#92Daniel Farina
drfarina@acm.org
In reply to: Robert Haas (#91)
#93Robert Haas
robertmhaas@gmail.com
In reply to: Daniel Farina (#92)
#94Jeff Davis
pgsql@j-davis.com
In reply to: Robert Haas (#88)