contrib/file_fdw/file_fdw.c |  139 ++++++++++++++++++++++++++++++++++++++++++-
src/backend/commands/copy.c |   76 ++++++++++++++++++++-----
2 files changed, 198 insertions(+), 17 deletions(-)

diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index e3b9223..fd9412f 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -16,6 +16,7 @@
 #include <unistd.h>
 
 #include "access/reloptions.h"
+#include "access/sysattr.h"
 #include "catalog/pg_foreign_table.h"
 #include "commands/copy.h"
 #include "commands/defrem.h"
@@ -29,6 +30,7 @@
 #include "optimizer/pathnode.h"
 #include "optimizer/planmain.h"
 #include "optimizer/restrictinfo.h"
+#include "optimizer/var.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
@@ -136,6 +138,9 @@ static bool is_valid_option(const char *option, Oid context);
 static void fileGetOptions(Oid foreigntableid,
 			   char **filename, List **other_options);
 static List *get_file_fdw_attribute_options(Oid relid);
+static bool check_selective_binary_conversion(RelOptInfo *baserel,
+								  Oid foreigntableid,
+								  List **columns);
 static void estimate_size(PlannerInfo *root, RelOptInfo *baserel,
 			  FileFdwPlanState *fdw_private);
 static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
@@ -457,6 +462,16 @@ fileGetForeignPaths(PlannerInfo *root,
 	FileFdwPlanState *fdw_private = (FileFdwPlanState *) baserel->fdw_private;
 	Cost		startup_cost;
 	Cost		total_cost;
+	bool		result;
+	List	   *columns;
+	List	   *coption = NIL;
+
+	/* Decide whether to selectively perform binary conversion */
+	result = check_selective_binary_conversion(baserel,
+											   foreigntableid,
+											   &columns);
+	if (result)
+		coption = list_make1(makeDefElem("convert_binary", (Node *) columns));
 
 	/* Estimate costs */
 	estimate_costs(root, baserel, fdw_private,
@@ -470,7 +485,7 @@ fileGetForeignPaths(PlannerInfo *root,
 									 total_cost,
 									 NIL,		/* no pathkeys */
 									 NULL,		/* no outer rel either */
-									 NIL));		/* no fdw_private data */
+									 coption));
 
 	/*
 	 * If data file was sorted, and we knew it somehow, we could insert
@@ -507,7 +522,7 @@ fileGetForeignPlan(PlannerInfo *root,
 							scan_clauses,
 							scan_relid,
 							NIL,	/* no expressions to evaluate */
-							NIL);		/* no private state either */
+							best_path->fdw_private);
 }
 
 /*
@@ -544,6 +559,7 @@ fileExplainForeignScan(ForeignScanState *node, ExplainState *es)
 static void
 fileBeginForeignScan(ForeignScanState *node, int eflags)
 {
+	ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
 	char	   *filename;
 	List	   *options;
 	CopyState	cstate;
@@ -559,6 +575,10 @@ fileBeginForeignScan(ForeignScanState *node, int eflags)
 	fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
 				   &filename, &options);
 
+	/* Add an option for selective binary conversion */
+	if (plan->fdw_private != NIL)
+		options = list_concat(options, plan->fdw_private);
+
 	/*
 	 * Create CopyState from FDW options.  We always acquire all columns, so
 	 * as to match the expected ScanTupleSlot signature.
@@ -695,6 +715,121 @@ fileAnalyzeForeignTable(Relation relation,
 }
 
 /*
+ * check_selective_binary_conversion
+ *
+ * Decide whether the scan needs only some of the foreign table's user
+ * columns, so that input conversion can be skipped for the others.
+ *
+ * Returns true with *columns set to a list of String nodes naming the
+ * columns referenced by the target list or the restriction clauses; the
+ * list is NIL when no user column is referenced at all.  Returns false
+ * with *columns set to NIL when the file is in binary format, when there
+ * is a whole-row reference, or when every non-dropped user column is
+ * needed.
+ */
+static bool
+check_selective_binary_conversion(RelOptInfo *baserel,
+								  Oid foreigntableid,
+								  List **columns)
+{
+	ForeignTable *table;
+	ListCell   *lc;
+	Relation	rel;
+	TupleDesc	tupleDesc;
+	AttrNumber	attnum;
+	Bitmapset  *attrs_used = NULL;
+	bool		has_wholerow = false;
+	int			numattrs;
+	int			i;
+
+	*columns = NIL;				/* default result */
+
+	/*
+	 * Examine format of the file.  If binary format, we don't need to convert
+	 * at all.
+	 */
+	table = GetForeignTable(foreigntableid);
+	foreach(lc, table->options)
+	{
+		DefElem    *def = (DefElem *) lfirst(lc);
+
+		if (strcmp(def->defname, "format") == 0)
+		{
+			char	   *format = defGetString(def);
+
+			if (strcmp(format, "binary") == 0)
+				return false;
+			break;
+		}
+	}
+
+	/* Add all the attributes needed for joins or final output. */
+	pull_varattnos((Node *) baserel->reltargetlist, baserel->relid, &attrs_used);
+
+	/* Add all the attributes used by restriction clauses. */
+	foreach(lc, baserel->baserestrictinfo)
+	{
+		RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
+
+		pull_varattnos((Node *) rinfo->clause, baserel->relid, &attrs_used);
+	}
+
+	/* Convert attribute numbers to column names. */
+	rel = heap_open(foreigntableid, AccessShareLock);
+	tupleDesc = RelationGetDescr(rel);
+
+	/* bms_first_member() empties the set as it goes; it isn't needed later. */
+	while ((attnum = bms_first_member(attrs_used)) >= 0)
+	{
+		Form_pg_attribute attr;
+
+		/* Adjust for system attributes. */
+		attnum += FirstLowInvalidHeapAttributeNumber;
+
+		/*
+		 * A whole-row reference needs every column.  Only note it here, and
+		 * give up after the relation has been closed below.
+		 */
+		if (attnum == 0)
+		{
+			has_wholerow = true;
+			break;
+		}
+
+		/* Ignore system attributes. */
+		if (attnum < 0)
+			continue;
+
+		/* Get user attributes, skipping dropped ones. */
+		attr = tupleDesc->attrs[attnum - 1];
+		if (attr->attisdropped)
+			continue;
+		*columns = lappend(*columns,
+						   makeString(pstrdup(NameStr(attr->attname))));
+	}
+	bms_free(attrs_used);
+
+	/* Count non-dropped user attributes before releasing the descriptor. */
+	numattrs = 0;
+	for (i = 0; i < tupleDesc->natts; i++)
+	{
+		if (!tupleDesc->attrs[i]->attisdropped)
+			numattrs++;
+	}
+
+	heap_close(rel, AccessShareLock);
+
+	/* If whole-row reference or all the user attributes needed, give up. */
+	if (has_wholerow || numattrs == list_length(*columns))
+	{
+		*columns = NIL;
+		return false;
+	}
+
+	return true;
+}
+
+/*
  * Estimate size of a foreign table.
  *
  * The main result is returned in baserel->rows.  We also set
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 98bcb2f..c74c7df 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -122,6 +122,11 @@ typedef struct CopyStateData
 	List	   *force_notnull;	/* list of column names */
 	bool	   *force_notnull_flags;	/* per-column CSV FNN flags */
 
+	/* parameters from contrib/file_fdw */
+	List	   *convert_binary;	/* list of column names */
+	bool	   *convert_binary_flags;	/* per-column CSV/TEXT CB flags */
+	bool		convert_selectively;	/* selective binary conversion? */
+
 	/* these are just for error messages, see CopyFromErrorCallback */
 	const char *cur_relname;	/* table name for error messages */
 	int			cur_lineno;		/* line number for error messages */
@@ -961,6 +966,21 @@ ProcessCopyOptions(CopyState cstate,
 						 errmsg("argument to option \"%s\" must be a list of column names",
 								defel->defname)));
 		}
+		else if (strcmp(defel->defname, "convert_binary") == 0)
+		{
+			if (cstate->convert_selectively)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			cstate->convert_selectively = true;
+			if (defel->arg == NULL || IsA(defel->arg, List))
+				cstate->convert_binary = (List *) defel->arg;
+			else
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("argument to option \"%s\" must be a list of column names",
+								defel->defname)));
+		}
 		else if (strcmp(defel->defname, "encoding") == 0)
 		{
 			if (cstate->file_encoding >= 0)
@@ -1307,6 +1327,28 @@ BeginCopy(bool is_from,
 		}
 	}
 
+	/* Convert convert binary name list to per-column flags, check validity */
+	cstate->convert_binary_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
+	if (cstate->convert_binary)
+	{
+		List	   *attnums;
+		ListCell   *cur;
+
+		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_binary);
+
+		foreach(cur, attnums)
+		{
+			int			attnum = lfirst_int(cur);
+
+			if (!list_member_int(cstate->attnumlist, attnum))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+						 errmsg("selected column \"%s\" not referenced by COPY",
+								NameStr(tupDesc->attrs[attnum - 1]->attname))));
+			cstate->convert_binary_flags[attnum - 1] = true;
+		}
+	}
+
 	/* Use client encoding when ENCODING option is not specified. */
 	if (cstate->file_encoding < 0)
 		cstate->file_encoding = pg_get_client_encoding();
@@ -2565,23 +2607,27 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 								 NameStr(attr[m]->attname))));
 				string = field_strings[fieldno++];
 
-				if (cstate->csv_mode && string == NULL &&
-					cstate->force_notnull_flags[m])
+				if (!cstate->convert_selectively ||
+					cstate->convert_binary_flags[m])
 				{
-					/* Go ahead and read the NULL string */
-					string = cstate->null_print;
-				}
+					if (cstate->csv_mode && string == NULL &&
+						cstate->force_notnull_flags[m])
+					{
+						/* Go ahead and read the NULL string */
+						string = cstate->null_print;
+					}
 
-				cstate->cur_attname = NameStr(attr[m]->attname);
-				cstate->cur_attval = string;
-				values[m] = InputFunctionCall(&in_functions[m],
-											  string,
-											  typioparams[m],
-											  attr[m]->atttypmod);
-				if (string != NULL)
-					nulls[m] = false;
-				cstate->cur_attname = NULL;
-				cstate->cur_attval = NULL;
+					cstate->cur_attname = NameStr(attr[m]->attname);
+					cstate->cur_attval = string;
+					values[m] = InputFunctionCall(&in_functions[m],
+												  string,
+												  typioparams[m],
+												  attr[m]->atttypmod);
+					if (string != NULL)
+						nulls[m] = false;
+					cstate->cur_attname = NULL;
+					cstate->cur_attval = NULL;
+				}
 			}
 
 			Assert(fieldno == nfields);