From 86fd522d40eefc6320ee4f963404454855d5dcfc Mon Sep 17 00:00:00 2001 From: Alex K Date: Fri, 9 Jun 2017 23:41:51 +0300 Subject: [PATCH 1/2] Allow ignoring some errors during COPY FROM --- contrib/file_fdw/file_fdw.c | 4 +- src/backend/commands/copy.c | 294 ++++++++++++++++++++++++++------------------ src/include/commands/copy.h | 2 +- 3 files changed, 178 insertions(+), 122 deletions(-) diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c index 277639f6e9..1d855ca6e7 100644 --- a/contrib/file_fdw/file_fdw.c +++ b/contrib/file_fdw/file_fdw.c @@ -689,7 +689,7 @@ fileIterateForeignScan(ForeignScanState *node) { FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state; TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; - bool found; + int found; ErrorContextCallback errcallback; /* Set up callback to identify error line number. */ @@ -1080,7 +1080,7 @@ file_acquire_sample_rows(Relation onerel, int elevel, TupleDesc tupDesc; Datum *values; bool *nulls; - bool found; + int found; char *filename; bool is_program; List *options; diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 0a33c40c17..d40d753c47 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -54,6 +54,16 @@ #define OCTVALUE(c) ((c) - '0') /* + NextCopyFrom states: + 0 – Error or stop + 1 – Successfully read data + 2 – Data with errors, skip +*/ +#define NCF_STOP 0 +#define NCF_SUCCESS 1 +#define NCF_SKIP 2 + +/* * Represents the different source/dest cases we need to worry about at * the bottom level */ @@ -139,6 +149,7 @@ typedef struct CopyStateData int cur_lineno; /* line number for error messages */ const char *cur_attname; /* current att for error messages */ const char *cur_attval; /* current att value for error messages */ + bool ignore_errors; /* ignore errors during COPY FROM */ /* * Working state for COPY TO/FROM @@ -2310,6 +2321,7 @@ CopyFrom(CopyState cstate) bool useHeapMultiInsert; int nBufferedTuples = 0; int 
prev_leaf_part_index = -1; + int next_cf_state; /* NextCopyFrom return state */ #define MAX_BUFFERED_TUPLES 1000 HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */ @@ -2519,116 +2531,125 @@ CopyFrom(CopyState cstate) /* Switch into its memory context */ MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid)) - break; - - /* And now we can form the input tuple. */ - tuple = heap_form_tuple(tupDesc, values, nulls); - - if (loaded_oid != InvalidOid) - HeapTupleSetOid(tuple, loaded_oid); - - /* - * Constraints might reference the tableoid column, so initialize - * t_tableOid before evaluating them. - */ - tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); - - /* Triggers and stuff need to be invoked in query context. */ - MemoryContextSwitchTo(oldcontext); - - /* Place tuple in tuple slot --- but slot shouldn't free it */ - slot = myslot; - ExecStoreTuple(tuple, slot, InvalidBuffer, false); - - /* Determine the partition to heap_insert the tuple into */ - if (cstate->partition_dispatch_info) - { - int leaf_part_index; - TupleConversionMap *map; - - /* - * Away we go ... If we end up not finding a partition after all, - * ExecFindPartition() does not return and errors out instead. - * Otherwise, the returned value is to be used as an index into - * arrays mt_partitions[] and mt_partition_tupconv_maps[] that - * will get us the ResultRelInfo and TupleConversionMap for the - * partition, respectively. - */ - leaf_part_index = ExecFindPartition(resultRelInfo, - cstate->partition_dispatch_info, - slot, - estate); - Assert(leaf_part_index >= 0 && - leaf_part_index < cstate->num_partitions); - - /* - * If this tuple is mapped to a partition that is not same as the - * previous one, we'd better make the bulk insert mechanism gets a - * new buffer. 
- */ - if (prev_leaf_part_index != leaf_part_index) - { - ReleaseBulkInsertStatePin(bistate); - prev_leaf_part_index = leaf_part_index; - } - - /* - * Save the old ResultRelInfo and switch to the one corresponding - * to the selected partition. - */ - saved_resultRelInfo = resultRelInfo; - resultRelInfo = cstate->partitions + leaf_part_index; - - /* We do not yet have a way to insert into a foreign partition */ - if (resultRelInfo->ri_FdwRoutine) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot route inserted tuples to a foreign table"))); - - /* - * For ExecInsertIndexTuples() to work on the partition's indexes - */ - estate->es_result_relation_info = resultRelInfo; - - /* - * We might need to convert from the parent rowtype to the - * partition rowtype. - */ - map = cstate->partition_tupconv_maps[leaf_part_index]; - if (map) - { - Relation partrel = resultRelInfo->ri_RelationDesc; - - tuple = do_convert_tuple(tuple, map); - - /* - * We must use the partition's tuple descriptor from this - * point on. Use a dedicated slot from this point on until - * we're finished dealing with the partition. - */ - slot = cstate->partition_tuple_slot; - Assert(slot != NULL); - ExecSetSlotDescriptor(slot, RelationGetDescr(partrel)); - ExecStoreTuple(tuple, slot, InvalidBuffer, true); - } + next_cf_state = NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid); - tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); - } - - skip_tuple = false; - - /* BEFORE ROW INSERT Triggers */ - if (resultRelInfo->ri_TrigDesc && - resultRelInfo->ri_TrigDesc->trig_insert_before_row) - { - slot = ExecBRInsertTriggers(estate, resultRelInfo, slot); - - if (slot == NULL) /* "do nothing" */ - skip_tuple = true; - else /* trigger might have changed tuple */ - tuple = ExecMaterializeSlot(slot); + if (!next_cf_state) { + break; } + else if (next_cf_state == NCF_SUCCESS) + { + /* And now we can form the input tuple. 
*/ + tuple = heap_form_tuple(tupDesc, values, nulls); + + if (loaded_oid != InvalidOid) + HeapTupleSetOid(tuple, loaded_oid); + + /* + * Constraints might reference the tableoid column, so initialize + * t_tableOid before evaluating them. + */ + tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + + /* Triggers and stuff need to be invoked in query context. */ + MemoryContextSwitchTo(oldcontext); + + /* Place tuple in tuple slot --- but slot shouldn't free it */ + slot = myslot; + ExecStoreTuple(tuple, slot, InvalidBuffer, false); + + /* Determine the partition to heap_insert the tuple into */ + if (cstate->partition_dispatch_info) + { + int leaf_part_index; + TupleConversionMap *map; + + /* + * Away we go ... If we end up not finding a partition after all, + * ExecFindPartition() does not return and errors out instead. + * Otherwise, the returned value is to be used as an index into + * arrays mt_partitions[] and mt_partition_tupconv_maps[] that + * will get us the ResultRelInfo and TupleConversionMap for the + * partition, respectively. + */ + leaf_part_index = ExecFindPartition(resultRelInfo, + cstate->partition_dispatch_info, + slot, + estate); + Assert(leaf_part_index >= 0 && + leaf_part_index < cstate->num_partitions); + + /* + * If this tuple is mapped to a partition that is not same as the + * previous one, we'd better make the bulk insert mechanism gets a + * new buffer. + */ + if (prev_leaf_part_index != leaf_part_index) + { + ReleaseBulkInsertStatePin(bistate); + prev_leaf_part_index = leaf_part_index; + } + + /* + * Save the old ResultRelInfo and switch to the one corresponding + * to the selected partition. 
+ */ + saved_resultRelInfo = resultRelInfo; + resultRelInfo = cstate->partitions + leaf_part_index; + + /* We do not yet have a way to insert into a foreign partition */ + if (resultRelInfo->ri_FdwRoutine) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot route inserted tuples to a foreign table"))); + + /* + * For ExecInsertIndexTuples() to work on the partition's indexes + */ + estate->es_result_relation_info = resultRelInfo; + + /* + * We might need to convert from the parent rowtype to the + * partition rowtype. + */ + map = cstate->partition_tupconv_maps[leaf_part_index]; + if (map) + { + Relation partrel = resultRelInfo->ri_RelationDesc; + + tuple = do_convert_tuple(tuple, map); + + /* + * We must use the partition's tuple descriptor from this + * point on. Use a dedicated slot from this point on until + * we're finished dealing with the partition. + */ + slot = cstate->partition_tuple_slot; + Assert(slot != NULL); + ExecSetSlotDescriptor(slot, RelationGetDescr(partrel)); + ExecStoreTuple(tuple, slot, InvalidBuffer, true); + } + + tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + } + + skip_tuple = false; + + /* BEFORE ROW INSERT Triggers */ + if (resultRelInfo->ri_TrigDesc && + resultRelInfo->ri_TrigDesc->trig_insert_before_row) + { + slot = ExecBRInsertTriggers(estate, resultRelInfo, slot); + + if (slot == NULL) /* "do nothing" */ + skip_tuple = true; + else /* trigger might have changed tuple */ + tuple = ExecMaterializeSlot(slot); + } + } + else + { + skip_tuple = true; + } if (!skip_tuple) { @@ -2925,6 +2946,7 @@ BeginCopyFrom(ParseState *pstate, cstate->cur_lineno = 0; cstate->cur_attname = NULL; cstate->cur_attval = NULL; + cstate->ignore_errors = true; /* Set up variables to avoid per-attribute overhead. */ initStringInfo(&cstate->attribute_buf); @@ -3202,7 +3224,7 @@ NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields) * relation passed to BeginCopyFrom. This function fills the arrays. 
* Oid of the tuple is returned with 'tupleOid' separately. */ -bool +int NextCopyFrom(CopyState cstate, ExprContext *econtext, Datum *values, bool *nulls, Oid *tupleOid) { @@ -3220,11 +3242,20 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, int *defmap = cstate->defmap; ExprState **defexprs = cstate->defexprs; + int error_level = ERROR; /* Error level for COPY FROM input data errors */ + int exec_state = NCF_SUCCESS; /* Return code */ + tupDesc = RelationGetDescr(cstate->rel); attr = tupDesc->attrs; num_phys_attrs = tupDesc->natts; attr_count = list_length(cstate->attnumlist); nfields = file_has_oids ? (attr_count + 1) : attr_count; + + /* Set error level to WARNING, if error handling is turned on */ + if (cstate->ignore_errors) + { + error_level = WARNING; + } /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); @@ -3240,13 +3271,17 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, /* read raw fields in the next line */ if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) - return false; + return NCF_STOP; /* check for overflowing fields */ if (nfields > 0 && fldct > nfields) - ereport(ERROR, + { + exec_state = NCF_SKIP; + ereport(error_level, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("extra data after last expected column"))); + } + fieldno = 0; @@ -3254,15 +3289,22 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, if (file_has_oids) { if (fieldno >= fldct) - ereport(ERROR, + { + exec_state = NCF_SKIP; + ereport(error_level, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("missing data for OID column"))); + } + string = field_strings[fieldno++]; if (string == NULL) - ereport(ERROR, + { + exec_state = NCF_SKIP; + ereport(error_level, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("null OID in COPY data"))); + } else if (cstate->oids && tupleOid != NULL) { cstate->cur_attname = "oid"; @@ -3270,9 +3312,13 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, *tupleOid = 
DatumGetObjectId(DirectFunctionCall1(oidin, CStringGetDatum(string))); if (*tupleOid == InvalidOid) - ereport(ERROR, + { + exec_state = NCF_SKIP; + ereport(error_level, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid OID in COPY data"))); + } + cstate->cur_attname = NULL; cstate->cur_attval = NULL; } @@ -3285,10 +3331,14 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, int m = attnum - 1; if (fieldno >= fldct) - ereport(ERROR, + { + exec_state = NCF_SKIP; + ereport(error_level, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("missing data for column \"%s\"", NameStr(attr[m]->attname)))); + } + string = field_strings[fieldno++]; if (cstate->convert_select_flags && @@ -3371,14 +3421,17 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("received copy data after EOF marker"))); - return false; + return NCF_STOP; } if (fld_count != attr_count) - ereport(ERROR, + { + exec_state = NCF_SKIP; + ereport(error_level, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("row field count is %d, expected %d", (int) fld_count, attr_count))); + } if (file_has_oids) { @@ -3393,9 +3446,12 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, -1, &isnull)); if (isnull || loaded_oid == InvalidOid) - ereport(ERROR, + { + exec_state = NCF_SKIP; + ereport(error_level, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid OID in COPY data"))); + } cstate->cur_attname = NULL; if (cstate->oids && tupleOid != NULL) *tupleOid = loaded_oid; @@ -3437,7 +3493,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, &nulls[defmap[i]]); } - return true; + return exec_state; } /* diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index f081f2219f..c4d38dbea7 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -31,7 +31,7 @@ extern void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_fro extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, 
const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options); extern void EndCopyFrom(CopyState cstate); -extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext, +extern int NextCopyFrom(CopyState cstate, ExprContext *econtext, Datum *values, bool *nulls, Oid *tupleOid); extern bool NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields); -- 2.11.0 From 7acc7b951c75f7e307f176d5ad991abe21592eb4 Mon Sep 17 00:00:00 2001 From: Alex K Date: Sun, 11 Jun 2017 16:48:37 +0300 Subject: [PATCH 2/2] Report line number on each ERROR/WARNING --- src/backend/commands/copy.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index d40d753c47..ed2cc87c8a 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -3279,7 +3279,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, exec_state = NCF_SKIP; ereport(error_level, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); + errmsg("extra data after last expected column at line %d", cstate->cur_lineno))); } @@ -3293,7 +3293,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, exec_state = NCF_SKIP; ereport(error_level, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for OID column"))); + errmsg("missing data for OID column at line %d", cstate->cur_lineno))); } string = field_strings[fieldno++]; @@ -3303,7 +3303,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, exec_state = NCF_SKIP; ereport(error_level, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("null OID in COPY data"))); + errmsg("null OID in COPY data at line %d", cstate->cur_lineno))); } else if (cstate->oids && tupleOid != NULL) { @@ -3316,7 +3316,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, exec_state = NCF_SKIP; ereport(error_level, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid OID in COPY 
data"))); + errmsg("invalid OID in COPY data at line %d", cstate->cur_lineno))); } cstate->cur_attname = NULL; @@ -3335,8 +3335,8 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, exec_state = NCF_SKIP; ereport(error_level, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(attr[m]->attname)))); + errmsg("missing data for column \"%s\" at line %d", + NameStr(attr[m]->attname), cstate->cur_lineno))); } string = field_strings[fieldno++]; -- 2.11.0