diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 432b0ca..5ba2f1f 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -579,7 +579,6 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
 						 errmsg("could not read from COPY file: %m")));
 			break;
 		case COPY_OLD_FE:
-
 			/*
 			 * We cannot read more than minread bytes (which in practice is 1)
 			 * because old protocol doesn't have any clear way of separating
@@ -4540,3 +4539,266 @@ CreateCopyDestReceiver(void)
 
 	return (DestReceiver *) self;
 }
+
+/*
+ * copy_srf
+ *		Proof-of-concept set-returning counterpart of COPY FROM.
+ *
+ * Reads CSV rows from a server-side file, or from the output of a
+ * server-side program, and returns them as a materialized tuplestore.
+ *
+ * SQL arguments:
+ *		0 (text): name of the file to read, or the command to run
+ *		1 (bool): true if argument 0 is a command rather than a file
+ *
+ * The shape of the result rows is taken from rsinfo->expectedDesc.  Each
+ * input line must contain exactly one field per result column; every field
+ * is converted with the input function of the matching column type.
+ *
+ * Raises an error if the caller cannot accept a materialized set, if the
+ * caller is not a superuser, if the file or program cannot be opened or
+ * closed cleanly, or if a line has the wrong number of fields.
+ */
+
+/* NOTE(review): move this to the include list at the top of copy.c */
+#include "funcapi.h"
+
+PG_FUNCTION_INFO_V1(copy_srf);
+
+Datum
+copy_srf(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext per_row_ctx;
+	MemoryContext oldcontext;
+	FmgrInfo   *in_functions;
+	Oid		   *typioparams;
+	CopyStateData copy_state;
+	Datum	   *values;
+	bool	   *nulls;
+	int			natts;
+	int			col;
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+
+	if (!(rsinfo->allowedModes & SFRM_Materialize) ||
+		rsinfo->expectedDesc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	/*
+	 * Reading server-side files and running server-side programs needs the
+	 * same privilege that COPY requires for them.
+	 */
+	if (!superuser())
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 errmsg("must be superuser to use copy_srf")));
+
+	/*
+	 * The catalog entry marks this function strict, so NULL arguments should
+	 * never get here; fail cleanly rather than crash if that ever changes.
+	 */
+	if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
+		ereport(ERROR,
+				(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+				 errmsg("copy_srf arguments must not be null")));
+
+	/*
+	 * The result descriptor and the tuplestore are handed back to the
+	 * caller, so create them in the per-query context.  Everything else
+	 * stays in the caller's context.
+	 */
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+	tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	MemoryContextSwitchTo(oldcontext);
+
+	natts = tupdesc->natts;
+	values = (Datum *) palloc(natts * sizeof(Datum));
+	nulls = (bool *) palloc(natts * sizeof(bool));
+	in_functions = (FmgrInfo *) palloc(natts * sizeof(FmgrInfo));
+	typioparams = (Oid *) palloc(natts * sizeof(Oid));
+
+	/* look up the input function of every result column */
+	for (col = 0; col < natts; col++)
+	{
+		Oid			in_func_oid;
+
+		getTypeInputInfo(tupdesc->attrs[col]->atttypid,
+						 &in_func_oid, &typioparams[col]);
+		fmgr_info(in_func_oid, &in_functions[col]);
+	}
+
+	/*
+	 * Mock up a copy state.  Zero the whole struct first so that every field
+	 * not assigned below (such as cur_lineno, which is reported in error
+	 * messages) starts from a defined value instead of stack garbage.
+	 */
+	MemSet(&copy_state, 0, sizeof(copy_state));
+	initStringInfo(&copy_state.line_buf);
+	initStringInfo(&copy_state.attribute_buf);
+	copy_state.fe_msgbuf = makeStringInfo();
+
+	/* raw input buffer, which BeginCopyFrom() would normally allocate */
+	copy_state.raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
+	copy_state.raw_buf_index = copy_state.raw_buf_len = 0;
+
+	/* same encoding setup as COPY without an ENCODING option */
+	copy_state.copy_dest = COPY_FILE;
+	copy_state.file_encoding = pg_get_client_encoding();
+	copy_state.need_transcoding =
+		(copy_state.file_encoding != GetDatabaseEncoding() ||
+		 pg_database_encoding_max_length() > 1);
+	copy_state.encoding_embeds_ascii =
+		PG_ENCODING_IS_CLIENT_ONLY(copy_state.file_encoding);
+
+	/* parameters that would normally come from the COPY command */
+	copy_state.rel = NULL;
+	copy_state.queryDesc = NULL;
+	/* text datums are not NUL-terminated; build a proper C string */
+	copy_state.filename = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	copy_state.is_program = PG_GETARG_BOOL(1);
+	copy_state.binary = false;
+	copy_state.oids = false;
+	copy_state.freeze = false;
+	copy_state.csv_mode = true;
+	copy_state.header_line = false;
+	copy_state.null_print = "";		/* unquoted empty field reads as NULL */
+	copy_state.null_print_len = 0;
+	copy_state.delim = ",";
+	copy_state.quote = "\"";
+	copy_state.escape = "\\";
+	copy_state.force_quote_all = false;
+	copy_state.max_fields = natts;
+	copy_state.raw_fields = (char **) palloc(natts * sizeof(char *));
+
+	/* open "file" */
+	if (copy_state.is_program)
+	{
+		copy_state.copy_file = OpenPipeStream(copy_state.filename, PG_BINARY_R);
+		if (copy_state.copy_file == NULL)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not execute command \"%s\": %m",
+							copy_state.filename)));
+	}
+	else
+	{
+		struct stat st;
+
+		copy_state.copy_file = AllocateFile(copy_state.filename, PG_BINARY_R);
+		if (copy_state.copy_file == NULL)
+		{
+			/* copy errno because ereport subfunctions might change it */
+			int			save_errno = errno;
+
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"%s\" for reading: %m",
+							copy_state.filename),
+					 (save_errno == ENOENT || save_errno == EACCES) ?
+					 errhint("copy_srf instructs the PostgreSQL server process to read a file. "
+							 "You may want a client-side facility such as psql's \\copy.") : 0));
+		}
+
+		if (fstat(fileno(copy_state.copy_file), &st))
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m",
+							copy_state.filename)));
+
+		if (S_ISDIR(st.st_mode))
+			ereport(ERROR,
+					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+					 errmsg("\"%s\" is a directory", copy_state.filename)));
+	}
+
+	/*
+	 * Convert each row in a short-lived context that is reset per row, so the
+	 * datums and tuples built here do not pile up for the whole input.
+	 */
+	per_row_ctx = AllocSetContextCreate(CurrentMemoryContext,
+										"copy_srf per-row context",
+										ALLOCSET_DEFAULT_MINSIZE,
+										ALLOCSET_DEFAULT_INITSIZE,
+										ALLOCSET_DEFAULT_MAXSIZE);
+
+	for (;;)
+	{
+		char	  **field_strings;
+		int			field_strings_count;
+		HeapTuple	tuple;
+
+		if (!NextCopyFromRawFields(&copy_state, &field_strings,
+								   &field_strings_count))
+			break;
+
+		if (field_strings_count != natts)
+			ereport(ERROR,
+					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+					 errmsg("found %d fields but expected %d on line %d",
+							field_strings_count, natts,
+							copy_state.cur_lineno)));
+
+		MemoryContextReset(per_row_ctx);
+		oldcontext = MemoryContextSwitchTo(per_row_ctx);
+
+		for (col = 0; col < natts; col++)
+		{
+			/* a NULL field string marks the column as NULL */
+			nulls[col] = (field_strings[col] == NULL);
+			values[col] = InputFunctionCall(&in_functions[col],
+											field_strings[col],
+											typioparams[col],
+											tupdesc->attrs[col]->atttypmod);
+		}
+
+		/* tuplestore_puttuple() keeps its own copy of the tuple */
+		tuple = heap_form_tuple(tupdesc, values, nulls);
+		tuplestore_puttuple(tupstore, tuple);
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	MemoryContextDelete(per_row_ctx);
+
+	/* close "file" */
+	if (copy_state.is_program)
+	{
+		int			pclose_rc = ClosePipeStream(copy_state.copy_file);
+
+		if (pclose_rc == -1)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not close pipe to external command: %m")));
+		else if (pclose_rc != 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
+					 errmsg("program \"%s\" failed",
+							copy_state.filename),
+					 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
+	}
+	else if (FreeFile(copy_state.copy_file))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not close file \"%s\": %m",
+						copy_state.filename)));
+
+	/* let the caller know we're sending back a tuplestore */
+	tuplestore_donestoring(tupstore);
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	return (Datum) 0;
+}
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index e2d08ba..de3973f 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5344,6 +5344,9 @@ DESCR("pg_controldata recovery state information as a function");
 DATA(insert OID = 3444 ( pg_control_init PGNSP PGUID 12 1 0 0 0 f f f f t f v s 0 0 2249 "" "{23,23,23,23,23,23,23,23,23,16,16,16,23}" "{o,o,o,o,o,o,o,o,o,o,o,o,o}" "{max_data_alignment,database_block_size,blocks_per_segment,wal_block_size,bytes_per_wal_segment,max_identifier_length,max_index_columns,max_toast_chunk_size,large_object_chunk_size,bigint_timestamps,float4_pass_by_value,float8_pass_by_value,data_page_checksum_version}" _null_ _null_ pg_control_init _null_ _null_ _null_ ));
 DESCR("pg_controldata init state information as a function");
 
+DATA(insert OID = 3445 ( copy_srf PGNSP PGUID 12 1 1000 0 0 f f f f t t v u 2 0 2249 "25 16" _null_ _null_ _null_ _null_ _null_ copy_srf _null_ _null_ _null_ ));
+DESCR("set-returning COPY proof of concept");
+
 /*
  * Symbolic values for provolatile column: these indicate whether the result
  * of a function is dependent *only* on the values of its explicit arguments,