diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 7b8bee8..57f6ec5 100644 *** a/src/backend/commands/copy.c --- b/src/backend/commands/copy.c *************** typedef struct CopyStateData *** 99,105 **** int client_encoding; /* remote side's character encoding */ bool need_transcoding; /* client encoding diff from server? */ bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ - uint64 processed; /* # of tuples processed */ /* parameters from the COPY command */ Relation rel; /* relation to copy to or from */ --- 99,104 ---- *************** typedef struct CopyStateData *** 119,125 **** bool *force_quote_flags; /* per-column CSV FQ flags */ bool *force_notnull_flags; /* per-column CSV FNN flags */ ! /* these are just for error messages, see copy_in_error_callback */ const char *cur_relname; /* table name for error messages */ int cur_lineno; /* line number for error messages */ const char *cur_attname; /* current att for error messages */ --- 118,124 ---- bool *force_quote_flags; /* per-column CSV FQ flags */ bool *force_notnull_flags; /* per-column CSV FNN flags */ ! /* these are just for error messages, see CopyFromErrorCallback */ const char *cur_relname; /* table name for error messages */ int cur_lineno; /* line number for error messages */ const char *cur_attname; /* current att for error messages */ *************** typedef struct CopyStateData *** 142,150 **** StringInfoData attribute_buf; /* field raw data pointers found by COPY FROM */ ! ! int max_fields; ! char ** raw_fields; /* * Similarly, line_buf holds the whole input line being processed. The --- 141,148 ---- StringInfoData attribute_buf; /* field raw data pointers found by COPY FROM */ ! int max_fields; ! char **raw_fields; /* * Similarly, line_buf holds the whole input line being processed. The *************** typedef struct CopyStateData *** 167,181 **** char *raw_buf; int raw_buf_index; /* next byte to process */ int raw_buf_len; /* total # of bytes stored */ - } CopyStateData; ! typedef CopyStateData *CopyState; /* DestReceiver for COPY (SELECT) TO */ typedef struct { DestReceiver pub; /* publicly-known function pointers */ CopyState cstate; /* CopyStateData for the command */ } DR_copy; --- 165,192 ---- char *raw_buf; int raw_buf_index; /* next byte to process */ int raw_buf_len; /* total # of bytes stored */ ! /* ! * The definition of input functions and default expressions are stored ! * in these variables. ! */ ! EState *estate; ! AttrNumber num_defaults; ! bool file_has_oids; ! FmgrInfo oid_in_function; ! Oid oid_typioparam; ! FmgrInfo *in_functions; ! Oid *typioparams; ! int *defmap; ! ExprState **defexprs; /* array of default att expressions */ ! } CopyStateData; /* DestReceiver for COPY (SELECT) TO */ typedef struct { DestReceiver pub; /* publicly-known function pointers */ CopyState cstate; /* CopyStateData for the command */ + uint64 processed; /* # of tuples processed */ } DR_copy; *************** static const char BinarySignature[11] = *** 248,258 **** /* non-export function prototypes */ ! static void DoCopyTo(CopyState cstate); ! static void CopyTo(CopyState cstate); static void CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls); ! static void CopyFrom(CopyState cstate); static bool CopyReadLine(CopyState cstate); static bool CopyReadLineText(CopyState cstate); static int CopyReadAttributesText(CopyState cstate); --- 259,275 ---- /* non-export function prototypes */ ! static CopyState BeginCopy(bool is_from, ! Relation rel, Node *raw_query, const char *queryString, ! const char *filename, List *attnamelist, List *options); ! static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString, ! const char *filename, List *attnamelist, List *options); ! static void EndCopyTo(CopyState cstate); ! static uint64 DoCopyTo(CopyState cstate); ! static uint64 CopyTo(CopyState cstate); static void CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls); ! static uint64 CopyFrom(CopyState cstate, Relation rel); static bool CopyReadLine(CopyState cstate); static bool CopyReadLineText(CopyState cstate); static int CopyReadAttributesText(CopyState cstate); *************** CopyLoadRawBuf(CopyState cstate) *** 718,730 **** * Do not allow the copy if user doesn't have proper permission to access * the table or the specifically requested columns. */ ! uint64 ! DoCopy(const CopyStmt *stmt, const char *queryString) { CopyState cstate; - bool is_from = stmt->is_from; - bool pipe = (stmt->filename == NULL); - List *attnamelist = stmt->attlist; List *force_quote = NIL; List *force_notnull = NIL; bool force_quote_all = false; --- 735,746 ---- * Do not allow the copy if user doesn't have proper permission to access * the table or the specifically requested columns. */ ! static CopyState ! BeginCopy(bool is_from, ! Relation rel, Node *raw_query, const char *queryString, ! const char *filename, List *attnamelist, List *options) { CopyState cstate; List *force_quote = NIL; List *force_notnull = NIL; bool force_quote_all = false; *************** DoCopy(const CopyStmt *stmt, const char *** 733,745 **** ListCell *option; TupleDesc tupDesc; int num_phys_attrs; - uint64 processed; /* Allocate workspace and zero all fields */ cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); /* Extract options from the statement node tree */ ! foreach(option, stmt->options) { DefElem *defel = (DefElem *) lfirst(option); --- 749,760 ---- ListCell *option; TupleDesc tupDesc; int num_phys_attrs; /* Allocate workspace and zero all fields */ cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); /* Extract options from the statement node tree */ ! foreach(option, options) { DefElem *defel = (DefElem *) lfirst(option); *************** DoCopy(const CopyStmt *stmt, const char *** 980,1005 **** (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("CSV quote character must not appear in the NULL specification"))); ! /* Disallow file COPY except to superusers. */ ! if (!pipe && !superuser()) ! ereport(ERROR, ! (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), ! errmsg("must be superuser to COPY to or from a file"), ! errhint("Anyone can COPY to stdout or from stdin. " ! "psql's \\copy command also works for anyone."))); ! ! if (stmt->relation) { RangeTblEntry *rte; List *attnums; ListCell *cur; ! Assert(!stmt->query); ! cstate->queryDesc = NULL; ! /* Open and lock the relation, using the appropriate lock type. */ ! cstate->rel = heap_openrv(stmt->relation, ! (is_from ? RowExclusiveLock : AccessShareLock)); tupDesc = RelationGetDescr(cstate->rel); --- 995,1009 ---- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("CSV quote character must not appear in the NULL specification"))); ! if (rel) { RangeTblEntry *rte; List *attnums; ListCell *cur; ! Assert(!raw_query); ! cstate->rel = rel; tupDesc = RelationGetDescr(cstate->rel); *************** DoCopy(const CopyStmt *stmt, const char *** 1058,1064 **** * function and is executed repeatedly. (See also the same hack in * DECLARE CURSOR and PREPARE.) XXX FIXME someday. */ ! rewritten = pg_analyze_and_rewrite((Node *) copyObject(stmt->query), queryString, NULL, 0); /* We don't expect more or less than one result query */ --- 1062,1068 ---- * function and is executed repeatedly. (See also the same hack in * DECLARE CURSOR and PREPARE.) XXX FIXME someday. */ ! rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query), queryString, NULL, 0); /* We don't expect more or less than one result query */ *************** DoCopy(const CopyStmt *stmt, const char *** 1160,1173 **** } } - /* Set up variables to avoid per-attribute overhead. */ - initStringInfo(&cstate->attribute_buf); - initStringInfo(&cstate->line_buf); - cstate->line_buf_converted = false; - cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); - cstate->raw_buf_index = cstate->raw_buf_len = 0; - cstate->processed = 0; - /* * Set up encoding conversion info. Even if the client and server * encodings are the same, we must apply pg_client_to_server() to validate --- 1164,1169 ---- *************** DoCopy(const CopyStmt *stmt, const char *** 1181,1229 **** cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding); cstate->copy_dest = COPY_FILE; /* default */ ! cstate->filename = stmt->filename; ! if (is_from) ! CopyFrom(cstate); /* copy from file to database */ else ! DoCopyTo(cstate); /* copy from database to file */ /* ! * Close the relation or query. If reading, we can release the ! * AccessShareLock we got; if writing, we should hold the lock until end ! * of transaction to ensure that updates will be committed before lock is ! * released. */ ! if (cstate->rel) ! heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock)); else { ! /* Close down the query and free resources. */ ! ExecutorEnd(cstate->queryDesc); ! FreeQueryDesc(cstate->queryDesc); ! PopActiveSnapshot(); } ! /* Clean up storage (probably not really necessary) */ ! processed = cstate->processed; pfree(cstate->attribute_buf.data); pfree(cstate->line_buf.data); pfree(cstate->raw_buf); pfree(cstate); - - return processed; } /* * This intermediate routine exists mainly to localize the effects of setjmp * so we don't need to plaster a lot of variables with "volatile". */ ! static void DoCopyTo(CopyState cstate) { bool pipe = (cstate->filename == NULL); if (cstate->rel) { --- 1177,1722 ---- cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding); cstate->copy_dest = COPY_FILE; /* default */ ! cstate->filename = (filename ? pstrdup(filename) : NULL); ! return cstate; ! } ! ! CopyState ! BeginCopyFrom(Relation rel, ! const char *filename, ! List *attnamelist, ! List *options) ! { ! CopyState cstate; ! bool pipe = (filename == NULL); ! TupleDesc tupDesc; ! Form_pg_attribute *attr; ! AttrNumber num_phys_attrs, ! attr_count, ! num_defaults; ! FmgrInfo *in_functions; ! Oid *typioparams; ! int attnum; ! Oid in_func_oid; ! EState *estate = CreateExecutorState(); /* for ExecPrepareExpr() */ ! int *defmap; ! ExprState **defexprs; ! ! cstate = BeginCopy(true, rel, NULL, NULL, filename, attnamelist, options); ! ! /* Initialize state variables */ ! cstate->fe_eof = false; ! cstate->eol_type = EOL_UNKNOWN; ! cstate->cur_relname = RelationGetRelationName(cstate->rel); ! cstate->cur_lineno = 0; ! cstate->cur_attname = NULL; ! cstate->cur_attval = NULL; ! ! /* Set up variables to avoid per-attribute overhead. */ ! initStringInfo(&cstate->attribute_buf); ! initStringInfo(&cstate->line_buf); ! cstate->line_buf_converted = false; ! cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); ! cstate->raw_buf_index = cstate->raw_buf_len = 0; ! ! tupDesc = RelationGetDescr(cstate->rel); ! attr = tupDesc->attrs; ! num_phys_attrs = tupDesc->natts; ! attr_count = list_length(cstate->attnumlist); ! num_defaults = 0; ! ! /* ! * Pick up the required catalog information for each attribute in the ! * relation, including the input function, the element type (to pass to ! * the input function), and info about defaults and constraints. (Which ! * input function we use depends on text/binary format choice.) ! */ ! in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); ! typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); ! defmap = (int *) palloc(num_phys_attrs * sizeof(int)); ! defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); ! ! for (attnum = 1; attnum <= num_phys_attrs; attnum++) ! { ! /* We don't need info for dropped attributes */ ! if (attr[attnum - 1]->attisdropped) ! continue; ! ! /* Fetch the input function and typioparam info */ ! if (cstate->binary) ! getTypeBinaryInputInfo(attr[attnum - 1]->atttypid, ! &in_func_oid, &typioparams[attnum - 1]); ! else ! getTypeInputInfo(attr[attnum - 1]->atttypid, ! &in_func_oid, &typioparams[attnum - 1]); ! fmgr_info(in_func_oid, &in_functions[attnum - 1]); ! ! /* Get default info if needed */ ! if (!list_member_int(cstate->attnumlist, attnum)) ! { ! /* attribute is NOT to be copied from input */ ! /* use default value if one exists */ ! Node *defexpr = build_column_default(cstate->rel, attnum); ! ! if (defexpr != NULL) ! { ! defexprs[num_defaults] = ExecPrepareExpr((Expr *) defexpr, ! estate); ! defmap[num_defaults] = attnum - 1; ! num_defaults++; ! } ! } ! } ! ! /* We keep those variables in cstate. */ ! cstate->estate = estate; ! cstate->in_functions = in_functions; ! cstate->typioparams = typioparams; ! cstate->defmap = defmap; ! cstate->defexprs = defexprs; ! cstate->num_defaults = num_defaults; ! ! if (pipe) ! { ! if (whereToSendOutput == DestRemote) ! ReceiveCopyBegin(cstate); ! else ! cstate->copy_file = stdin; ! } else ! { ! struct stat st; ! ! cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); ! ! if (cstate->copy_file == NULL) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not open file \"%s\" for reading: %m", ! cstate->filename))); ! ! fstat(fileno(cstate->copy_file), &st); ! if (S_ISDIR(st.st_mode)) ! ereport(ERROR, ! (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("\"%s\" is a directory", cstate->filename))); ! } ! ! if (!cstate->binary) ! { ! /* must rely on user to tell us... */ ! cstate->file_has_oids = cstate->oids; ! } ! else ! { ! /* Read and verify binary header */ ! char readSig[11]; ! int32 tmp; ! ! /* Signature */ ! if (CopyGetData(cstate, readSig, 11, 11) != 11 || ! memcmp(readSig, BinarySignature, 11) != 0) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("COPY file signature not recognized"))); ! /* Flags field */ ! if (!CopyGetInt32(cstate, &tmp)) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("invalid COPY file header (missing flags)"))); ! cstate->file_has_oids = (tmp & (1 << 16)) != 0; ! tmp &= ~(1 << 16); ! if ((tmp >> 16) != 0) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("unrecognized critical flags in COPY file header"))); ! /* Header extension length */ ! if (!CopyGetInt32(cstate, &tmp) || ! tmp < 0) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("invalid COPY file header (missing length)"))); ! /* Skip extension header, if present */ ! while (tmp-- > 0) ! { ! if (CopyGetData(cstate, readSig, 1, 1) != 1) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("invalid COPY file header (wrong length)"))); ! } ! } ! ! if (cstate->file_has_oids && cstate->binary) ! { ! getTypeBinaryInputInfo(OIDOID, ! &in_func_oid, &cstate->oid_typioparam); ! fmgr_info(in_func_oid, &cstate->oid_in_function); ! } ! ! /* create workspace for CopyReadAttributes results */ ! if (!cstate->binary) ! { ! int nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count; ! ! cstate->max_fields = nfields; ! cstate->raw_fields = (char **) palloc(nfields * sizeof(char *)); ! } ! ! return cstate; ! } ! ! static CopyState ! BeginCopyTo(Relation rel, ! Node *query, ! const char *queryString, ! const char *filename, ! List *attnamelist, ! List *options) ! { ! return BeginCopy(false, rel, query, queryString, ! filename, attnamelist, options); ! } ! ! /* return false if no more tuples */ ! bool ! NextCopyFrom(CopyState cstate, Datum *values, bool *nulls, Oid *tupleOid) ! { ! TupleDesc tupDesc; ! Form_pg_attribute *attr; ! AttrNumber num_phys_attrs, ! attr_count, ! num_defaults = cstate->num_defaults; ! FmgrInfo *in_functions = cstate->in_functions; ! Oid *typioparams = cstate->typioparams; ! int i; ! int nfields; ! char **field_strings; ! bool isnull; ! int *defmap = cstate->defmap; ! ExprState **defexprs = cstate->defexprs; ! ExprContext *econtext; /* used for ExecEvalExpr for default atts */ ! ! /* on input just throw the header line away */ ! if (cstate->cur_lineno == 0 && cstate->header_line) ! { ! cstate->cur_lineno++; ! if (CopyReadLine(cstate)) ! return false; /* done */ ! } ! ! cstate->cur_lineno++; ! ! tupDesc = RelationGetDescr(cstate->rel); ! attr = tupDesc->attrs; ! num_phys_attrs = tupDesc->natts; ! attr_count = list_length(cstate->attnumlist); ! nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count; ! ! /* Initialize all values for row to NULL */ ! MemSet(values, 0, num_phys_attrs * sizeof(Datum)); ! MemSet(nulls, true, num_phys_attrs * sizeof(bool)); ! ! if (!cstate->binary) ! { ! ListCell *cur; ! int fldct; ! int fieldno; ! char *string; ! ! /* ! * Actually read the line into memory here. ! * EOF at start of line means we're done. If we see EOF after ! * some characters, we act as though it was newline followed by ! * EOF, ie, process the line and then exit loop on next iteration. ! */ ! if (CopyReadLine(cstate) && cstate->line_buf.len == 0) ! return false; ! ! /* Parse the line into de-escaped field values */ ! if (cstate->csv_mode) ! fldct = CopyReadAttributesCSV(cstate); ! else ! fldct = CopyReadAttributesText(cstate); ! ! /* check for overflowing fields */ ! if (nfields > 0 && fldct > nfields) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("extra data after last expected column"))); ! ! fieldno = 0; ! field_strings = cstate->raw_fields; ! ! /* Read the OID field if present */ ! if (cstate->file_has_oids) ! { ! if (fieldno >= fldct) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("missing data for OID column"))); ! string = field_strings[fieldno++]; ! ! if (string == NULL) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("null OID in COPY data"))); ! else if (cstate->oids && tupleOid != NULL) ! { ! cstate->cur_attname = "oid"; ! cstate->cur_attval = string; ! *tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin, ! CStringGetDatum(string))); ! if (*tupleOid == InvalidOid) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("invalid OID in COPY data"))); ! cstate->cur_attname = NULL; ! cstate->cur_attval = NULL; ! } ! } ! ! /* Loop to read the user attributes on the line. */ ! foreach(cur, cstate->attnumlist) ! { ! int attnum = lfirst_int(cur); ! int m = attnum - 1; ! ! if (fieldno >= fldct) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("missing data for column \"%s\"", ! NameStr(attr[m]->attname)))); ! string = field_strings[fieldno++]; ! ! if (cstate->csv_mode && string == NULL && ! cstate->force_notnull_flags[m]) ! { ! /* Go ahead and read the NULL string */ ! string = cstate->null_print; ! } ! ! cstate->cur_attname = NameStr(attr[m]->attname); ! cstate->cur_attval = string; ! values[m] = InputFunctionCall(&in_functions[m], ! string, ! typioparams[m], ! attr[m]->atttypmod); ! if (string != NULL) ! nulls[m] = false; ! cstate->cur_attname = NULL; ! cstate->cur_attval = NULL; ! } ! ! Assert(fieldno == nfields); ! } ! else ! { ! /* binary */ ! int16 fld_count; ! ListCell *cur; ! ! if (!CopyGetInt16(cstate, &fld_count)) ! { ! /* EOF detected (end of file, or protocol-level EOF) */ ! return false; ! } ! ! if (fld_count == -1) ! { ! /* ! * Received EOF marker. In a V3-protocol copy, wait for ! * the protocol-level EOF, and complain if it doesn't come ! * immediately. This ensures that we correctly handle ! * CopyFail, if client chooses to send that now. ! * ! * Note that we MUST NOT try to read more data in an ! * old-protocol copy, since there is no protocol-level EOF ! * marker then. We could go either way for copy from file, ! * but choose to throw error if there's data after the EOF ! * marker, for consistency with the new-protocol case. ! */ ! char dummy; ! ! if (cstate->copy_dest != COPY_OLD_FE && ! CopyGetData(cstate, &dummy, 1, 1) > 0) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("received copy data after EOF marker"))); ! return false; ! } ! ! if (fld_count != attr_count) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("row field count is %d, expected %d", ! (int) fld_count, attr_count))); ! ! if (cstate->file_has_oids) ! { ! Oid oid; ! ! cstate->cur_attname = "oid"; ! oid = DatumGetObjectId(CopyReadBinaryAttribute(cstate, ! 0, ! &cstate->oid_in_function, ! cstate->oid_typioparam, ! -1, ! &isnull)); ! if (isnull || oid == InvalidOid) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("invalid OID in COPY data"))); ! cstate->cur_attname = NULL; ! if (cstate->oids && tupleOid != NULL) ! *tupleOid = oid; ! } ! ! i = 0; ! foreach(cur, cstate->attnumlist) ! { ! int attnum = lfirst_int(cur); ! int m = attnum - 1; ! ! cstate->cur_attname = NameStr(attr[m]->attname); ! i++; ! values[m] = CopyReadBinaryAttribute(cstate, ! i, ! &in_functions[m], ! typioparams[m], ! attr[m]->atttypmod, ! &nulls[m]); ! cstate->cur_attname = NULL; ! } ! } /* ! * Now compute and insert any defaults available for the columns not ! * provided by the input data. Anything not processed here or above ! * will remain NULL. */ ! econtext = GetPerTupleExprContext(cstate->estate); ! for (i = 0; i < num_defaults; i++) ! { ! values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext, ! &nulls[defmap[i]], NULL); ! } ! ! return true; ! } ! ! uint64 ! DoCopy(const CopyStmt *stmt, const char *queryString) ! { ! CopyState cstate; ! bool is_from = stmt->is_from; ! bool pipe = (stmt->filename == NULL); ! Relation rel; ! uint64 processed; ! ! /* Disallow file COPY except to superusers. */ ! if (!pipe && !superuser()) ! ereport(ERROR, ! (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), ! errmsg("must be superuser to COPY to or from a file"), ! errhint("Anyone can COPY to stdout or from stdin. " ! "psql's \\copy command also works for anyone."))); ! ! if (stmt->relation) ! { ! Assert(!stmt->query); ! ! /* Open and lock the relation, using the appropriate lock type. */ ! rel = heap_openrv(stmt->relation, ! (is_from ? RowExclusiveLock : AccessShareLock)); ! } else { ! Assert(stmt->query); ! ! rel = NULL; } ! if (is_from) ! { ! cstate = BeginCopyFrom(rel, stmt->filename, stmt->attlist, stmt->options); ! processed = CopyFrom(cstate, rel); /* copy from file to database */ ! EndCopyFrom(cstate); ! } ! else ! { ! cstate = BeginCopyTo(rel, stmt->query, queryString, stmt->filename, stmt->attlist, stmt->options); ! processed = DoCopyTo(cstate); /* copy from database to file */ ! EndCopyTo(cstate); ! } + /* + * Close the relation. If reading, we can release the AccessShareLock we got; + * if writing, we should hold the lock until end of transaction to ensure that + * updates will be committed before lock is released. + */ + if (rel != NULL) + heap_close(rel, (is_from ? NoLock : AccessShareLock)); + + return processed; + } + + void + EndCopyFrom(CopyState cstate) + { + FreeExecutorState(cstate->estate); + + /* Clean up storage */ + if (cstate->filename) + { + if (FreeFile(cstate->copy_file)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from file \"%s\": %m", + cstate->filename))); + pfree(cstate->filename); + } + if (!cstate->binary) + pfree(cstate->raw_fields); pfree(cstate->attribute_buf.data); pfree(cstate->line_buf.data); pfree(cstate->raw_buf); + pfree(cstate->in_functions); + pfree(cstate->typioparams); + pfree(cstate->defmap); + pfree(cstate->defexprs); pfree(cstate); } + static void + EndCopyTo(CopyState cstate) + { + /* + * Close the relation or query. We can release the AccessShareLock we got. + */ + if (cstate->rel == NULL) + { + /* Close down the query and free resources. */ + ExecutorEnd(cstate->queryDesc); + FreeQueryDesc(cstate->queryDesc); + PopActiveSnapshot(); + } + + /* Clean up storage */ + if (cstate->filename) + pfree(cstate->filename); + pfree(cstate); + } /* * This intermediate routine exists mainly to localize the effects of setjmp * so we don't need to plaster a lot of variables with "volatile". */ ! static uint64 DoCopyTo(CopyState cstate) { bool pipe = (cstate->filename == NULL); + uint64 processed; if (cstate->rel) { *************** DoCopyTo(CopyState cstate) *** 1291,1297 **** if (cstate->fe_copy) SendCopyBegin(cstate); ! CopyTo(cstate); if (cstate->fe_copy) SendCopyEnd(cstate); --- 1784,1790 ---- if (cstate->fe_copy) SendCopyBegin(cstate); ! processed = CopyTo(cstate); if (cstate->fe_copy) SendCopyEnd(cstate); *************** DoCopyTo(CopyState cstate) *** 1316,1333 **** errmsg("could not write to file \"%s\": %m", cstate->filename))); } } /* * Copy from relation or query TO file. */ ! static void CopyTo(CopyState cstate) { TupleDesc tupDesc; int num_phys_attrs; Form_pg_attribute *attr; ListCell *cur; if (cstate->rel) tupDesc = RelationGetDescr(cstate->rel); --- 1809,1829 ---- errmsg("could not write to file \"%s\": %m", cstate->filename))); } + + return processed; } /* * Copy from relation or query TO file. */ ! static uint64 CopyTo(CopyState cstate) { TupleDesc tupDesc; int num_phys_attrs; Form_pg_attribute *attr; ListCell *cur; + uint64 processed; if (cstate->rel) tupDesc = RelationGetDescr(cstate->rel); *************** CopyTo(CopyState cstate) *** 1433,1438 **** --- 1929,1935 ---- scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL); + processed = 0; while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL) { CHECK_FOR_INTERRUPTS(); *************** CopyTo(CopyState cstate) *** 1442,1447 **** --- 1939,1945 ---- /* Format and send the data */ CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls); + processed++; } heap_endscan(scandesc); *************** CopyTo(CopyState cstate) *** 1450,1455 **** --- 1948,1954 ---- { /* run the plan --- the dest receiver will send tuples */ ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L); + processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } if (cstate->binary) *************** CopyTo(CopyState cstate) *** 1461,1466 **** --- 1960,1967 ---- } MemoryContextDelete(cstate->rowcontext); + + return processed; } /* *************** CopyOneRowTo(CopyState cstate, Oid tuple *** 1552,1567 **** CopySendEndOfRow(cstate); MemoryContextSwitchTo(oldcontext); - - cstate->processed++; } /* * error context callback for COPY FROM */ ! static void ! copy_in_error_callback(void *arg) { CopyState cstate = (CopyState) arg; --- 2053,2066 ---- CopySendEndOfRow(cstate); MemoryContextSwitchTo(oldcontext); } /* * error context callback for COPY FROM */ ! void ! CopyFromErrorCallback(void *arg) { CopyState cstate = (CopyState) arg; *************** limit_printout_length(const char *str) *** 1663,1725 **** /* * Copy FROM file to relation. */ ! static void ! CopyFrom(CopyState cstate) { - bool pipe = (cstate->filename == NULL); HeapTuple tuple; TupleDesc tupDesc; - Form_pg_attribute *attr; - AttrNumber num_phys_attrs, - attr_count, - num_defaults; - FmgrInfo *in_functions; - FmgrInfo oid_in_function; - Oid *typioparams; - Oid oid_typioparam; - int attnum; - int i; - Oid in_func_oid; Datum *values; bool *nulls; - int nfields; - char **field_strings; bool done = false; - bool isnull; ResultRelInfo *resultRelInfo; ! EState *estate = CreateExecutorState(); /* for ExecConstraints() */ TupleTableSlot *slot; - bool file_has_oids; - int *defmap; - ExprState **defexprs; /* array of default att expressions */ - ExprContext *econtext; /* used for ExecEvalExpr for default atts */ MemoryContext oldcontext = CurrentMemoryContext; ErrorContextCallback errcontext; CommandId mycid = GetCurrentCommandId(true); int hi_options = 0; /* start with default heap_insert options */ BulkInsertState bistate; ! Assert(cstate->rel); ! if (cstate->rel->rd_rel->relkind != RELKIND_RELATION) { ! if (cstate->rel->rd_rel->relkind == RELKIND_VIEW) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy to view \"%s\"", ! RelationGetRelationName(cstate->rel)))); ! else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy to sequence \"%s\"", ! RelationGetRelationName(cstate->rel)))); else ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy to non-table relation \"%s\"", ! RelationGetRelationName(cstate->rel)))); } /*---------- * Check to see if we can avoid writing WAL * --- 2162,2209 ---- /* * Copy FROM file to relation. */ ! static uint64 ! CopyFrom(CopyState cstate, Relation rel) { HeapTuple tuple; TupleDesc tupDesc; Datum *values; bool *nulls; bool done = false; ResultRelInfo *resultRelInfo; ! EState *estate; /* for ExecConstraints() */ TupleTableSlot *slot; MemoryContext oldcontext = CurrentMemoryContext; ErrorContextCallback errcontext; CommandId mycid = GetCurrentCommandId(true); int hi_options = 0; /* start with default heap_insert options */ BulkInsertState bistate; + uint64 processed = 0; ! Assert(rel != NULL); ! if (rel->rd_rel->relkind != RELKIND_RELATION) { ! if (rel->rd_rel->relkind == RELKIND_VIEW) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy to view \"%s\"", ! RelationGetRelationName(rel)))); ! else if (rel->rd_rel->relkind == RELKIND_SEQUENCE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy to sequence \"%s\"", ! RelationGetRelationName(rel)))); else ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot copy to non-table relation \"%s\"", ! RelationGetRelationName(rel)))); } + estate = GetCopyExecutorState(cstate); + tupDesc = RelationGetDescr(rel); + /*---------- * Check to see if we can avoid writing WAL * *************** CopyFrom(CopyState cstate) *** 1747,1792 **** * no additional work to enforce that. *---------- */ ! if (cstate->rel->rd_createSubid != InvalidSubTransactionId || ! cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId) { hi_options |= HEAP_INSERT_SKIP_FSM; if (!XLogIsNeeded()) hi_options |= HEAP_INSERT_SKIP_WAL; } - if (pipe) - { - if (whereToSendOutput == DestRemote) - ReceiveCopyBegin(cstate); - else - cstate->copy_file = stdin; - } - else - { - struct stat st; - - cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); - - if (cstate->copy_file == NULL) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\" for reading: %m", - cstate->filename))); - - fstat(fileno(cstate->copy_file), &st); - if (S_ISDIR(st.st_mode)) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is a directory", cstate->filename))); - } - - tupDesc = RelationGetDescr(cstate->rel); - attr = tupDesc->attrs; - num_phys_attrs = tupDesc->natts; - attr_count = list_length(cstate->attnumlist); - num_defaults = 0; - /* * We need a ResultRelInfo so we can use the regular executor's * index-entry-making machinery. (There used to be a huge amount of code --- 2231,2244 ---- * no additional work to enforce that. *---------- */ ! if (rel->rd_createSubid != InvalidSubTransactionId || ! rel->rd_newRelfilenodeSubid != InvalidSubTransactionId) { hi_options |= HEAP_INSERT_SKIP_FSM; if (!XLogIsNeeded()) hi_options |= HEAP_INSERT_SKIP_WAL; } /* * We need a ResultRelInfo so we can use the regular executor's * index-entry-making machinery. (There used to be a huge amount of code *************** CopyFrom(CopyState cstate) *** 1794,1801 **** */ resultRelInfo = makeNode(ResultRelInfo); resultRelInfo->ri_RangeTableIndex = 1; /* dummy */ ! resultRelInfo->ri_RelationDesc = cstate->rel; ! resultRelInfo->ri_TrigDesc = CopyTriggerDesc(cstate->rel->trigdesc); if (resultRelInfo->ri_TrigDesc) { resultRelInfo->ri_TrigFunctions = (FmgrInfo *) --- 2246,2253 ---- */ resultRelInfo = makeNode(ResultRelInfo); resultRelInfo->ri_RangeTableIndex = 1; /* dummy */ ! resultRelInfo->ri_RelationDesc = rel; ! resultRelInfo->ri_TrigDesc = CopyTriggerDesc(rel->trigdesc); if (resultRelInfo->ri_TrigDesc) { resultRelInfo->ri_TrigFunctions = (FmgrInfo *) *************** CopyFrom(CopyState cstate) *** 1815,1865 **** slot = ExecInitExtraTupleSlot(estate); ExecSetSlotDescriptor(slot, tupDesc); - econtext = GetPerTupleExprContext(estate); - - /* - * Pick up the required catalog information for each attribute in the - * relation, including the input function, the element type (to pass to - * the input function), and info about defaults and constraints. (Which - * input function we use depends on text/binary format choice.) - */ - in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); - typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); - defmap = (int *) palloc(num_phys_attrs * sizeof(int)); - defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); - - for (attnum = 1; attnum <= num_phys_attrs; attnum++) - { - /* We don't need info for dropped attributes */ - if (attr[attnum - 1]->attisdropped) - continue; - - /* Fetch the input function and typioparam info */ - if (cstate->binary) - getTypeBinaryInputInfo(attr[attnum - 1]->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - else - getTypeInputInfo(attr[attnum - 1]->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); - - /* Get default info if needed */ - if (!list_member_int(cstate->attnumlist, attnum)) - { - /* attribute is NOT to be copied from input */ - /* use default value if one exists */ - Node *defexpr = build_column_default(cstate->rel, attnum); - - if (defexpr != NULL) - { - defexprs[num_defaults] = ExecPrepareExpr((Expr *) defexpr, - estate); - defmap[num_defaults] = attnum - 1; - num_defaults++; - } - } - } - /* Prepare to catch AFTER triggers. */ AfterTriggerBeginQuery(); --- 2267,2272 ---- *************** CopyFrom(CopyState cstate) *** 1871,1958 **** */ ExecBSInsertTriggers(estate, resultRelInfo); ! if (!cstate->binary) ! file_has_oids = cstate->oids; /* must rely on user to tell us... */ ! else ! { ! /* Read and verify binary header */ ! char readSig[11]; ! int32 tmp; ! ! /* Signature */ ! if (CopyGetData(cstate, readSig, 11, 11) != 11 || ! memcmp(readSig, BinarySignature, 11) != 0) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("COPY file signature not recognized"))); ! /* Flags field */ ! if (!CopyGetInt32(cstate, &tmp)) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("invalid COPY file header (missing flags)"))); ! file_has_oids = (tmp & (1 << 16)) != 0; ! tmp &= ~(1 << 16); ! if ((tmp >> 16) != 0) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("unrecognized critical flags in COPY file header"))); ! /* Header extension length */ ! if (!CopyGetInt32(cstate, &tmp) || ! tmp < 0) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("invalid COPY file header (missing length)"))); ! /* Skip extension header, if present */ ! while (tmp-- > 0) ! { ! if (CopyGetData(cstate, readSig, 1, 1) != 1) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("invalid COPY file header (wrong length)"))); ! } ! } ! ! if (file_has_oids && cstate->binary) ! { ! getTypeBinaryInputInfo(OIDOID, ! &in_func_oid, &oid_typioparam); ! fmgr_info(in_func_oid, &oid_in_function); ! } ! ! values = (Datum *) palloc(num_phys_attrs * sizeof(Datum)); ! nulls = (bool *) palloc(num_phys_attrs * sizeof(bool)); ! ! /* create workspace for CopyReadAttributes results */ ! nfields = file_has_oids ? (attr_count + 1) : attr_count; ! if (! cstate->binary) ! { ! cstate->max_fields = nfields; ! cstate->raw_fields = (char **) palloc(nfields * sizeof(char *)); ! } ! ! /* Initialize state variables */ ! cstate->fe_eof = false; ! cstate->eol_type = EOL_UNKNOWN; ! cstate->cur_relname = RelationGetRelationName(cstate->rel); ! cstate->cur_lineno = 0; ! cstate->cur_attname = NULL; ! cstate->cur_attval = NULL; bistate = GetBulkInsertState(); /* Set up callback to identify error line number */ ! errcontext.callback = copy_in_error_callback; errcontext.arg = (void *) cstate; errcontext.previous = error_context_stack; error_context_stack = &errcontext; - /* on input just throw the header line away */ - if (cstate->header_line) - { - cstate->cur_lineno++; - done = CopyReadLine(cstate); - } - while (!done) { bool skip_tuple; --- 2278,2294 ---- */ ExecBSInsertTriggers(estate, resultRelInfo); ! values = (Datum *) palloc(tupDesc->natts * sizeof(Datum)); ! nulls = (bool *) palloc(tupDesc->natts * sizeof(bool)); bistate = GetBulkInsertState(); /* Set up callback to identify error line number */ ! errcontext.callback = CopyFromErrorCallback; errcontext.arg = (void *) cstate; errcontext.previous = error_context_stack; error_context_stack = &errcontext; while (!done) { bool skip_tuple; *************** CopyFrom(CopyState cstate) *** 1960,2166 **** CHECK_FOR_INTERRUPTS(); - cstate->cur_lineno++; - /* Reset the per-tuple exprcontext */ ResetPerTupleExprContext(estate); /* Switch into its memory context */ MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); ! /* Initialize all values for row to NULL */ ! MemSet(values, 0, num_phys_attrs * sizeof(Datum)); ! MemSet(nulls, true, num_phys_attrs * sizeof(bool)); ! ! if (!cstate->binary) ! { ! ListCell *cur; ! int fldct; ! int fieldno; ! char *string; ! ! /* Actually read the line into memory here */ ! done = CopyReadLine(cstate); ! ! /* ! * EOF at start of line means we're done. If we see EOF after ! * some characters, we act as though it was newline followed by ! * EOF, ie, process the line and then exit loop on next iteration. ! */ ! if (done && cstate->line_buf.len == 0) ! break; ! ! /* Parse the line into de-escaped field values */ ! if (cstate->csv_mode) ! fldct = CopyReadAttributesCSV(cstate); ! else ! fldct = CopyReadAttributesText(cstate); ! ! /* check for overflowing fields */ ! if (nfields > 0 && fldct > nfields) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("extra data after last expected column"))); ! ! fieldno = 0; ! field_strings = cstate->raw_fields; ! ! /* Read the OID field if present */ ! if (file_has_oids) ! { ! if (fieldno >= fldct) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("missing data for OID column"))); ! string = field_strings[fieldno++]; ! ! if (string == NULL) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("null OID in COPY data"))); ! else ! { ! cstate->cur_attname = "oid"; ! cstate->cur_attval = string; ! loaded_oid = DatumGetObjectId(DirectFunctionCall1(oidin, ! CStringGetDatum(string))); ! if (loaded_oid == InvalidOid) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("invalid OID in COPY data"))); ! cstate->cur_attname = NULL; ! cstate->cur_attval = NULL; ! } ! } ! ! /* Loop to read the user attributes on the line. */ ! foreach(cur, cstate->attnumlist) ! { ! int attnum = lfirst_int(cur); ! int m = attnum - 1; ! ! if (fieldno >= fldct) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("missing data for column \"%s\"", ! NameStr(attr[m]->attname)))); ! string = field_strings[fieldno++]; ! ! if (cstate->csv_mode && string == NULL && ! cstate->force_notnull_flags[m]) ! { ! /* Go ahead and read the NULL string */ ! string = cstate->null_print; ! } ! ! cstate->cur_attname = NameStr(attr[m]->attname); ! cstate->cur_attval = string; ! values[m] = InputFunctionCall(&in_functions[m], ! string, ! typioparams[m], ! attr[m]->atttypmod); ! if (string != NULL) ! nulls[m] = false; ! cstate->cur_attname = NULL; ! cstate->cur_attval = NULL; ! } ! ! Assert(fieldno == nfields); ! } ! else ! { ! /* binary */ ! int16 fld_count; ! ListCell *cur; ! ! if (!CopyGetInt16(cstate, &fld_count)) ! { ! /* EOF detected (end of file, or protocol-level EOF) */ ! done = true; ! break; ! } ! ! if (fld_count == -1) ! { ! /* ! * Received EOF marker. In a V3-protocol copy, wait for ! * the protocol-level EOF, and complain if it doesn't come ! * immediately. This ensures that we correctly handle ! * CopyFail, if client chooses to send that now. ! * ! * Note that we MUST NOT try to read more data in an ! * old-protocol copy, since there is no protocol-level EOF ! * marker then. We could go either way for copy from file, ! * but choose to throw error if there's data after the EOF ! * marker, for consistency with the new-protocol case. ! */ ! char dummy; ! ! if (cstate->copy_dest != COPY_OLD_FE && ! CopyGetData(cstate, &dummy, 1, 1) > 0) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("received copy data after EOF marker"))); ! done = true; ! break; ! } ! ! if (fld_count != attr_count) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("row field count is %d, expected %d", ! (int) fld_count, attr_count))); ! ! if (file_has_oids) ! { ! cstate->cur_attname = "oid"; ! loaded_oid = ! DatumGetObjectId(CopyReadBinaryAttribute(cstate, ! 0, ! &oid_in_function, ! oid_typioparam, ! -1, ! &isnull)); ! if (isnull || loaded_oid == InvalidOid) ! ereport(ERROR, ! (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), ! errmsg("invalid OID in COPY data"))); ! cstate->cur_attname = NULL; ! } ! ! i = 0; ! foreach(cur, cstate->attnumlist) ! { ! int attnum = lfirst_int(cur); ! int m = attnum - 1; ! ! cstate->cur_attname = NameStr(attr[m]->attname); ! i++; ! values[m] = CopyReadBinaryAttribute(cstate, ! i, ! &in_functions[m], ! typioparams[m], ! attr[m]->atttypmod, ! &nulls[m]); ! cstate->cur_attname = NULL; ! } ! } ! ! /* ! * Now compute and insert any defaults available for the columns not ! * provided by the input data. Anything not processed here or above ! * will remain NULL. ! */ ! for (i = 0; i < num_defaults; i++) ! { ! values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext, ! &nulls[defmap[i]], NULL); ! } /* And now we can form the input tuple. */ tuple = heap_form_tuple(tupDesc, values, nulls); ! if (cstate->oids && file_has_oids) HeapTupleSetOid(tuple, loaded_oid); /* Triggers and stuff need to be invoked in query context. */ --- 2296,2315 ---- CHECK_FOR_INTERRUPTS(); /* Reset the per-tuple exprcontext */ ResetPerTupleExprContext(estate); /* Switch into its memory context */ MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); ! done = !NextCopyFrom(cstate, values, nulls, &loaded_oid); ! if (done) ! break; /* And now we can form the input tuple. */ tuple = heap_form_tuple(tupDesc, values, nulls); ! if (loaded_oid != InvalidOid) HeapTupleSetOid(tuple, loaded_oid); /* Triggers and stuff need to be invoked in query context. */ *************** CopyFrom(CopyState cstate) *** 2193,2203 **** ExecStoreTuple(tuple, slot, InvalidBuffer, false); /* Check the constraints of the tuple */ ! if (cstate->rel->rd_att->constr) ExecConstraints(resultRelInfo, slot, estate); /* OK, store the tuple and create index entries for it */ ! heap_insert(cstate->rel, tuple, mycid, hi_options, bistate); if (resultRelInfo->ri_NumIndices > 0) recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self), --- 2342,2352 ---- ExecStoreTuple(tuple, slot, InvalidBuffer, false); /* Check the constraints of the tuple */ ! if (rel->rd_att->constr) ExecConstraints(resultRelInfo, slot, estate); /* OK, store the tuple and create index entries for it */ ! heap_insert(rel, tuple, mycid, hi_options, bistate); if (resultRelInfo->ri_NumIndices > 0) recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self), *************** CopyFrom(CopyState cstate) *** 2214,2220 **** * this is the same definition used by execMain.c for counting * tuples inserted by an INSERT command. */ ! cstate->processed++; } } --- 2363,2369 ---- * this is the same definition used by execMain.c for counting * tuples inserted by an INSERT command. */ ! processed++; } } *************** CopyFrom(CopyState cstate) *** 2233,2267 **** pfree(values); pfree(nulls); - if (! cstate->binary) - pfree(cstate->raw_fields); - - pfree(in_functions); - pfree(typioparams); - pfree(defmap); - pfree(defexprs); ExecResetTupleTable(estate->es_tupleTable, false); ExecCloseIndices(resultRelInfo); - FreeExecutorState(estate); - - if (!pipe) - { - if (FreeFile(cstate->copy_file)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from file \"%s\": %m", - cstate->filename))); - } - /* * If we skipped writing WAL, then we need to sync the heap (but not * indexes since those use WAL anyway) */ if (hi_options & HEAP_INSERT_SKIP_WAL) ! heap_sync(cstate->rel); } --- 2382,2400 ---- pfree(values); pfree(nulls); ExecResetTupleTable(estate->es_tupleTable, false); ExecCloseIndices(resultRelInfo); /* * If we skipped writing WAL, then we need to sync the heap (but not * indexes since those use WAL anyway) */ if (hi_options & HEAP_INSERT_SKIP_WAL) ! heap_sync(rel); ! ! return processed; } *************** CopyGetAttnums(TupleDesc tupDesc, Relati *** 3502,3507 **** --- 3635,3647 ---- return attnums; } + /* retrieve internal EState in CopyState */ + EState * + GetCopyExecutorState(CopyState cstate) + { + return cstate->estate; + } + /* * copy_dest_startup --- executor startup *************** copy_dest_receive(TupleTableSlot *slot, *** 3526,3531 **** --- 3666,3672 ---- /* And send the data */ CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull); + myState->processed++; } /* diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 6d409e8..8c69703 100644 *** a/src/include/commands/copy.h --- b/src/include/commands/copy.h *************** *** 14,25 **** --- 14,36 ---- #ifndef COPY_H #define COPY_H + #include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "tcop/dest.h" + typedef struct CopyStateData *CopyState; + extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString); + extern CopyState BeginCopyFrom(Relation rel, const char *filename, + List *attnamelist, List *options); + extern void EndCopyFrom(CopyState cstate); + extern bool NextCopyFrom(CopyState cstate, + Datum *values, bool *nulls, Oid *tupleOid); + extern EState *GetCopyExecutorState(CopyState cstate); + extern void CopyFromErrorCallback(void *arg); + extern DestReceiver *CreateCopyDestReceiver(void); #endif /* COPY_H */