Make COPY format extendable: Extract COPY TO format implementations
Hi,
I want to work on making COPY format extendable. I attach
the first patch for it. I'll send more patches after this is
merged.
Background:
Currently, COPY TO/FROM supports only "text", "csv" and
"binary" formats. There are some requests to support more
COPY formats. For example:
* 2023-11: JSON and JSON lines [1]
* 2022-04: Apache Arrow [2]
* 2018-02: Apache Avro, Apache Parquet and Apache ORC [3]
(FYI: I want to add support for Apache Arrow.)
There were discussions about how to add support for more formats. [3][4]
In these discussions, we reached a consensus on making COPY
format extendable.
But it seems that nobody is working on this yet. So I want to
work on this. (If anyone wants to work on this together, I'd
welcome it.)
Summary:
The attached patch introduces a CopyToFormatOps struct. It is
similar to TupleTableSlotOps for TupleTableSlot, but
CopyToFormatOps is for COPY TO formats. CopyToFormatOps has
the routines that implement a COPY TO format.
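As a rough illustration of the idea (not code from the patch), here is a minimal standalone sketch of a table of function pointers with the same start / one_row / end shape. Everything prefixed with toy_ or Toy is hypothetical and exists only for this example: ToyState stands in for CopyToState, and toy_send_string() stands in for CopySendString().

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for CopyToState: only an output buffer. */
typedef struct ToyState
{
	char		buf[256];
	size_t		len;
} ToyState;

/* Same three-callback shape as the proposed CopyToFormatOps. */
typedef struct ToyFormatOps
{
	void		(*start) (ToyState *st);
	void		(*one_row) (ToyState *st, const int *values, int nvalues);
	void		(*end) (ToyState *st);
} ToyFormatOps;

/* Append a NUL-terminated string to the buffer (toy CopySendString). */
static void
toy_send_string(ToyState *st, const char *s)
{
	size_t		n = strlen(s);

	assert(st->len + n < sizeof(st->buf));
	memcpy(st->buf + st->len, s, n + 1);
	st->len += n;
}

/* Emit the values of one row joined by a separator. */
static void
toy_send_values(ToyState *st, const int *values, int nvalues, const char *sep)
{
	char		tmp[16];

	for (int i = 0; i < nvalues; i++)
	{
		if (i > 0)
			toy_send_string(st, sep);
		snprintf(tmp, sizeof(tmp), "%d", values[i]);
		toy_send_string(st, tmp);
	}
}

/* "csv"-like toy format: no header, no trailer, one line per row. */
static void
toy_csv_start(ToyState *st)
{
	(void) st;
}

static void
toy_csv_one_row(ToyState *st, const int *values, int nvalues)
{
	toy_send_values(st, values, nvalues, ",");
	toy_send_string(st, "\n");
}

static void
toy_csv_end(ToyState *st)
{
	(void) st;
}

/* "list"-like toy format: needs both a header and a trailer. */
static void
toy_list_start(ToyState *st)
{
	toy_send_string(st, "[");
}

static void
toy_list_one_row(ToyState *st, const int *values, int nvalues)
{
	toy_send_string(st, "(");
	toy_send_values(st, values, nvalues, " ");
	toy_send_string(st, ")");
}

static void
toy_list_end(ToyState *st)
{
	toy_send_string(st, "]");
}

const ToyFormatOps toy_ops_csv = {
	.start = toy_csv_start,
	.one_row = toy_csv_one_row,
	.end = toy_csv_end,
};

const ToyFormatOps toy_ops_list = {
	.start = toy_list_start,
	.one_row = toy_list_one_row,
	.end = toy_list_end,
};

/* Format-agnostic driver: it never branches on the format itself. */
void
toy_copy(const ToyFormatOps *ops, ToyState *st,
		 const int *rows, int nrows, int ncols)
{
	st->len = 0;
	st->buf[0] = '\0';
	ops->start(st);
	for (int r = 0; r < nrows; r++)
		ops->one_row(st, rows + r * ncols, ncols);
	ops->end(st);
}
```

Calling toy_copy() with &toy_ops_csv or &toy_ops_list on the same rows produces different output without any format check in the driver, which is the property the refactoring is after.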
The attached patch doesn't change:
* the current behavior (all existing tests still pass
without changing them)
* the existing "text", "csv" and "binary" format output
implementations including local variable names (the
attached patch just moves them and adjusts indentation)
* performance (no significant loss of performance)
In other words, this is just a refactoring for further
changes to make COPY format extendable. If I use "complete
the task and then request reviews for it" approach, it will
be difficult to review because changes for it will be
large. So I want to work on this step by step. Is it
acceptable?
TODOs that should be done in subsequent patches:
* Add some CopyToState readers such as CopyToStateGetDest(),
CopyToStateGetAttnums() and CopyToStateGetOpts()
(We will need to consider which APIs should be exported.)
(This is for implementing a COPY TO format as an extension.)
* Export CopySend*() in src/backend/commands/copyto.c
(This is for implementing a COPY TO format as an extension.)
* Add API to register a new COPY TO format implementation
* Add "CREATE XXX" to register a new COPY TO format (or COPY
TO/FROM format) implementation
("CREATE COPY HANDLER" was suggested in [5].)
* Same for COPY FROM
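The registration API in the TODO list is not designed yet. Purely as a sketch of one possible shape, assuming a simple name-to-ops table, it could look like the following. All names here (ToyCopyToOps, toy_register_format(), toy_lookup_format(), TOY_MAX_FORMATS) are hypothetical and are not part of the patch or of PostgreSQL.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical placeholder for a format's callback table. */
typedef struct ToyCopyToOps
{
	int			placeholder;
} ToyCopyToOps;

#define TOY_MAX_FORMATS 8

typedef struct ToyFormatEntry
{
	const char *name;
	const ToyCopyToOps *ops;
} ToyFormatEntry;

static ToyFormatEntry toy_formats[TOY_MAX_FORMATS];
static int	toy_nformats = 0;

/* Return the ops registered under "name", or NULL if there are none. */
const ToyCopyToOps *
toy_lookup_format(const char *name)
{
	for (int i = 0; i < toy_nformats; i++)
	{
		if (strcmp(toy_formats[i].name, name) == 0)
			return toy_formats[i].ops;
	}
	return NULL;
}

/*
 * Register a format. Returns false if the table is full or the name is
 * already taken. The caller must keep "name" and "ops" alive.
 */
bool
toy_register_format(const char *name, const ToyCopyToOps *ops)
{
	assert(name != NULL && ops != NULL);
	if (toy_nformats >= TOY_MAX_FORMATS || toy_lookup_format(name) != NULL)
		return false;
	toy_formats[toy_nformats].name = name;
	toy_formats[toy_nformats].ops = ops;
	toy_nformats++;
	return true;
}
```

With something like this, the strcmp() chain on the format name in ProcessCopyOptions() could fall back to a lookup for names that are not built in. Whether the real API is a C function, a "CREATE XXX" catalog entry, or both is still open.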
Performance:
We reached a consensus on making COPY format extendable, but
we should care about performance. [6]
> I think that step 1 ought to be to convert the existing
> formats into plug-ins, and demonstrate that there's no
> significant loss of performance.
So I measured COPY TO time with/without this change. You can
see there is no significant loss of performance.
Data: Random 32 bit integers:
CREATE TABLE data (int32 integer);
INSERT INTO data
SELECT random() * 10000
FROM generate_series(1, ${n_records});
The number of records: 100K, 1M and 10M
100K without this change:
format,elapsed time (ms)
text,22.527
csv,23.822
binary,24.806
100K with this change:
format,elapsed time (ms)
text,22.919
csv,24.643
binary,24.705
1M without this change:
format,elapsed time (ms)
text,223.457
csv,233.583
binary,242.687
1M with this change:
format,elapsed time (ms)
text,224.591
csv,233.964
binary,247.164
10M without this change:
format,elapsed time (ms)
text,2330.383
csv,2411.394
binary,2590.817
10M with this change:
format,elapsed time (ms)
text,2231.307
csv,2408.067
binary,2473.617
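To make the comparison easier to read, the relative change for each pair of measurements can be computed as (with - without) / without. The sketch below does only that arithmetic on the numbers listed above; the struct and function names are made up for this example. All nine differences fall within 5% in either direction, and the run-to-run noise of the measurements is not known.

```c
#include <assert.h>
#include <stdio.h>

/* One measurement pair from the tables above. */
typedef struct BenchRow
{
	const char *records;
	const char *format;
	double		before_ms;		/* without this change */
	double		after_ms;		/* with this change */
} BenchRow;

const BenchRow bench_rows[] = {
	{"100K", "text", 22.527, 22.919},
	{"100K", "csv", 23.822, 24.643},
	{"100K", "binary", 24.806, 24.705},
	{"1M", "text", 223.457, 224.591},
	{"1M", "csv", 233.583, 233.964},
	{"1M", "binary", 242.687, 247.164},
	{"10M", "text", 2330.383, 2231.307},
	{"10M", "csv", 2411.394, 2408.067},
	{"10M", "binary", 2590.817, 2473.617},
};
const int	bench_nrows = sizeof(bench_rows) / sizeof(bench_rows[0]);

/* Positive means slower with the change, negative means faster. */
double
relative_change_percent(double before_ms, double after_ms)
{
	assert(before_ms > 0.0);
	return (after_ms - before_ms) / before_ms * 100.0;
}

/* Largest absolute relative change over all measurement pairs. */
double
max_abs_change_percent(void)
{
	double		max = 0.0;

	for (int i = 0; i < bench_nrows; i++)
	{
		double		c = relative_change_percent(bench_rows[i].before_ms,
												bench_rows[i].after_ms);

		if (c < 0.0)
			c = -c;
		if (c > max)
			max = c;
	}
	return max;
}

/* Print one line per measurement pair. */
void
print_changes(void)
{
	for (int i = 0; i < bench_nrows; i++)
		printf("%-5s %-7s %+6.2f%%\n",
			   bench_rows[i].records, bench_rows[i].format,
			   relative_change_percent(bench_rows[i].before_ms,
									   bench_rows[i].after_ms));
}
```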
[1]: /messages/by-id/24e3ee88-ec1e-421b-89ae-8a47ee0d2df1@joeconway.com
[2]: /messages/by-id/CAGrfaBVyfm0wPzXVqm0=h5uArYh9N_ij+sVpUtDHqkB=VyB3jw@mail.gmail.com
[3]: /messages/by-id/20180210151304.fonjztsynewldfba@gmail.com
[4]: /messages/by-id/3741749.1655952719@sss.pgh.pa.us
[5]: /messages/by-id/20180211211235.5x3jywe5z3lkgcsr@alap3.anarazel.de
[6]: /messages/by-id/3741749.1655952719@sss.pgh.pa.us
Thanks,
--
kou
Attachments:
v1-0001-Extract-COPY-TO-format-implementations.patch (text/x-patch; charset=us-ascii)
From 7f00b2b0fb878ae1c687c151dd751512d02ed83e Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Mon, 4 Dec 2023 12:32:54 +0900
Subject: [PATCH v1] Extract COPY TO format implementations
This is a part of making COPY format extendable. See also these past
discussions:
* New Copy Formats - avro/orc/parquet:
https://www.postgresql.org/message-id/flat/20180210151304.fonjztsynewldfba%40gmail.com
* Make COPY extendable in order to support Parquet and other formats:
https://www.postgresql.org/message-id/flat/CAJ7c6TM6Bz1c3F04Cy6%2BSzuWfKmr0kU8c_3Stnvh_8BR0D6k8Q%40mail.gmail.com
This doesn't change the current behavior. This just introduces
CopyToFormatOps, which just has function pointers of format
implementation like TupleTableSlotOps, and use it for existing "text",
"csv" and "binary" format implementations.
Note that CopyToFormatOps can't be used from extensions yet because
CopySend*() aren't exported yet. Extensions can't send formatted data
to a destination without CopySend*(). They will be exported by
subsequent patches.
Here is a benchmark result with/without this change because there was
a discussion that we should care about performance regression:
https://www.postgresql.org/message-id/3741749.1655952719%40sss.pgh.pa.us
> I think that step 1 ought to be to convert the existing formats into
> plug-ins, and demonstrate that there's no significant loss of
> performance.
You can see that there is no significant loss of performance:
Data: Random 32 bit integers:
CREATE TABLE data (int32 integer);
INSERT INTO data
SELECT random() * 10000
FROM generate_series(1, ${n_records});
The number of records: 100K, 1M and 10M
100K without this change:
format,elapsed time (ms)
text,22.527
csv,23.822
binary,24.806
100K with this change:
format,elapsed time (ms)
text,22.919
csv,24.643
binary,24.705
1M without this change:
format,elapsed time (ms)
text,223.457
csv,233.583
binary,242.687
1M with this change:
format,elapsed time (ms)
text,224.591
csv,233.964
binary,247.164
10M without this change:
format,elapsed time (ms)
text,2330.383
csv,2411.394
binary,2590.817
10M with this change:
format,elapsed time (ms)
text,2231.307
csv,2408.067
binary,2473.617
---
src/backend/commands/copy.c | 8 +
src/backend/commands/copyto.c | 387 +++++++++++++++++++++-------------
src/include/commands/copy.h | 27 ++-
3 files changed, 266 insertions(+), 156 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index cfad47b562..27a1add456 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -427,6 +427,8 @@ ProcessCopyOptions(ParseState *pstate,
opts_out->file_encoding = -1;
+ /* Text is the default format. */
+ opts_out->to_ops = CopyToFormatOpsText;
/* Extract options from the statement node tree */
foreach(option, options)
{
@@ -442,9 +444,15 @@ ProcessCopyOptions(ParseState *pstate,
if (strcmp(fmt, "text") == 0)
/* default format */ ;
else if (strcmp(fmt, "csv") == 0)
+ {
opts_out->csv_mode = true;
+ opts_out->to_ops = CopyToFormatOpsCSV;
+ }
else if (strcmp(fmt, "binary") == 0)
+ {
opts_out->binary = true;
+ opts_out->to_ops = CopyToFormatOpsBinary;
+ }
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index c66a047c4a..295e96dbc5 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -131,6 +131,238 @@ static void CopySendEndOfRow(CopyToState cstate);
static void CopySendInt32(CopyToState cstate, int32 val);
static void CopySendInt16(CopyToState cstate, int16 val);
+/*
+ * CopyToFormatOps implementations.
+ */
+
+/*
+ * CopyToFormatOps implementation for "text" and "csv". CopyToFormatText*()
+ * refer cstate->opts.csv_mode and change their behavior. We can split this
+ * implementation and stop referring cstate->opts.csv_mode later.
+ */
+
+static void
+CopyToFormatTextSendEndOfRow(CopyToState cstate)
+{
+ switch (cstate->copy_dest)
+ {
+ case COPY_FILE:
+ /* Default line termination depends on platform */
+#ifndef WIN32
+ CopySendChar(cstate, '\n');
+#else
+ CopySendString(cstate, "\r\n");
+#endif
+ break;
+ case COPY_FRONTEND:
+ /* The FE/BE protocol uses \n as newline for all platforms */
+ CopySendChar(cstate, '\n');
+ break;
+ default:
+ break;
+ }
+ CopySendEndOfRow(cstate);
+}
+
+static void
+CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc)
+{
+ int num_phys_attrs;
+ ListCell *cur;
+
+ num_phys_attrs = tupDesc->natts;
+ /* Get info about the columns we need to process. */
+ cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Oid out_func_oid;
+ bool isvarlena;
+ Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+ getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+ fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+ }
+
+ /*
+ * For non-binary copy, we need to convert null_print to file
+ * encoding, because it will be sent directly with CopySendString.
+ */
+ if (cstate->need_transcoding)
+ cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+ cstate->opts.null_print_len,
+ cstate->file_encoding);
+
+ /* if a header has been requested send the line */
+ if (cstate->opts.header_line)
+ {
+ bool hdr_delim = false;
+
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ char *colname;
+
+ if (hdr_delim)
+ CopySendChar(cstate, cstate->opts.delim[0]);
+ hdr_delim = true;
+
+ colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
+
+ if (cstate->opts.csv_mode)
+ CopyAttributeOutCSV(cstate, colname, false,
+ list_length(cstate->attnumlist) == 1);
+ else
+ CopyAttributeOutText(cstate, colname);
+ }
+
+ CopyToFormatTextSendEndOfRow(cstate);
+ }
+}
+
+static void
+CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+ bool need_delim = false;
+ FmgrInfo *out_functions = cstate->out_functions;
+ ListCell *cur;
+
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Datum value = slot->tts_values[attnum - 1];
+ bool isnull = slot->tts_isnull[attnum - 1];
+
+ if (need_delim)
+ CopySendChar(cstate, cstate->opts.delim[0]);
+ need_delim = true;
+
+ if (isnull)
+ {
+ CopySendString(cstate, cstate->opts.null_print_client);
+ }
+ else
+ {
+ char *string;
+
+ string = OutputFunctionCall(&out_functions[attnum - 1], value);
+ if (cstate->opts.csv_mode)
+ CopyAttributeOutCSV(cstate, string,
+ cstate->opts.force_quote_flags[attnum - 1],
+ list_length(cstate->attnumlist) == 1);
+ else
+ CopyAttributeOutText(cstate, string);
+ }
+ }
+
+ CopyToFormatTextSendEndOfRow(cstate);
+}
+
+static void
+CopyToFormatTextEnd(CopyToState cstate)
+{
+}
+
+/*
+ * CopyToFormatOps implementation for "binary".
+ */
+
+static void
+CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc)
+{
+ int num_phys_attrs;
+ ListCell *cur;
+
+ num_phys_attrs = tupDesc->natts;
+ /* Get info about the columns we need to process. */
+ cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Oid out_func_oid;
+ bool isvarlena;
+ Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+ getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+ fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+ }
+
+ {
+ /* Generate header for a binary copy */
+ int32 tmp;
+
+ /* Signature */
+ CopySendData(cstate, BinarySignature, 11);
+ /* Flags field */
+ tmp = 0;
+ CopySendInt32(cstate, tmp);
+ /* No header extension */
+ tmp = 0;
+ CopySendInt32(cstate, tmp);
+ }
+}
+
+static void
+CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+ FmgrInfo *out_functions = cstate->out_functions;
+ ListCell *cur;
+
+ /* Binary per-tuple header */
+ CopySendInt16(cstate, list_length(cstate->attnumlist));
+
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Datum value = slot->tts_values[attnum - 1];
+ bool isnull = slot->tts_isnull[attnum - 1];
+
+ if (isnull)
+ {
+ CopySendInt32(cstate, -1);
+ }
+ else
+ {
+ bytea *outputbytes;
+
+ outputbytes = SendFunctionCall(&out_functions[attnum - 1], value);
+ CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+ CopySendData(cstate, VARDATA(outputbytes),
+ VARSIZE(outputbytes) - VARHDRSZ);
+ }
+ }
+
+ CopySendEndOfRow(cstate);
+}
+
+static void
+CopyToFormatBinaryEnd(CopyToState cstate)
+{
+ /* Generate trailer for a binary copy */
+ CopySendInt16(cstate, -1);
+ /* Need to flush out the trailer */
+ CopySendEndOfRow(cstate);
+}
+
+const CopyToFormatOps CopyToFormatOpsText = {
+ .start = CopyToFormatTextStart,
+ .one_row = CopyToFormatTextOneRow,
+ .end = CopyToFormatTextEnd,
+};
+
+/*
+ * We can use the same CopyToFormatOps for both of "text" and "csv" because
+ * CopyToFormatText*() refer cstate->opts.csv_mode and change their
+ * behavior. We can split the implementations and stop referring
+ * cstate->opts.csv_mode later.
+ */
+const CopyToFormatOps CopyToFormatOpsCSV = CopyToFormatOpsText;
+
+const CopyToFormatOps CopyToFormatOpsBinary = {
+ .start = CopyToFormatBinaryStart,
+ .one_row = CopyToFormatBinaryOneRow,
+ .end = CopyToFormatBinaryEnd,
+};
/*
* Send copy start/stop messages for frontend copies. These have changed
@@ -198,16 +430,6 @@ CopySendEndOfRow(CopyToState cstate)
switch (cstate->copy_dest)
{
case COPY_FILE:
- if (!cstate->opts.binary)
- {
- /* Default line termination depends on platform */
-#ifndef WIN32
- CopySendChar(cstate, '\n');
-#else
- CopySendString(cstate, "\r\n");
-#endif
- }
-
if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
cstate->copy_file) != 1 ||
ferror(cstate->copy_file))
@@ -242,10 +464,6 @@ CopySendEndOfRow(CopyToState cstate)
}
break;
case COPY_FRONTEND:
- /* The FE/BE protocol uses \n as newline for all platforms */
- if (!cstate->opts.binary)
- CopySendChar(cstate, '\n');
-
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
break;
@@ -748,8 +966,6 @@ DoCopyTo(CopyToState cstate)
bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
bool fe_copy = (pipe && whereToSendOutput == DestRemote);
TupleDesc tupDesc;
- int num_phys_attrs;
- ListCell *cur;
uint64 processed;
if (fe_copy)
@@ -759,32 +975,11 @@ DoCopyTo(CopyToState cstate)
tupDesc = RelationGetDescr(cstate->rel);
else
tupDesc = cstate->queryDesc->tupDesc;
- num_phys_attrs = tupDesc->natts;
cstate->opts.null_print_client = cstate->opts.null_print; /* default */
/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
cstate->fe_msgbuf = makeStringInfo();
- /* Get info about the columns we need to process. */
- cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- Oid out_func_oid;
- bool isvarlena;
- Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
-
- if (cstate->opts.binary)
- getTypeBinaryOutputInfo(attr->atttypid,
- &out_func_oid,
- &isvarlena);
- else
- getTypeOutputInfo(attr->atttypid,
- &out_func_oid,
- &isvarlena);
- fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
- }
-
/*
* Create a temporary memory context that we can reset once per row to
* recover palloc'd memory. This avoids any problems with leaks inside
@@ -795,57 +990,7 @@ DoCopyTo(CopyToState cstate)
"COPY TO",
ALLOCSET_DEFAULT_SIZES);
- if (cstate->opts.binary)
- {
- /* Generate header for a binary copy */
- int32 tmp;
-
- /* Signature */
- CopySendData(cstate, BinarySignature, 11);
- /* Flags field */
- tmp = 0;
- CopySendInt32(cstate, tmp);
- /* No header extension */
- tmp = 0;
- CopySendInt32(cstate, tmp);
- }
- else
- {
- /*
- * For non-binary copy, we need to convert null_print to file
- * encoding, because it will be sent directly with CopySendString.
- */
- if (cstate->need_transcoding)
- cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
- cstate->opts.null_print_len,
- cstate->file_encoding);
-
- /* if a header has been requested send the line */
- if (cstate->opts.header_line)
- {
- bool hdr_delim = false;
-
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- char *colname;
-
- if (hdr_delim)
- CopySendChar(cstate, cstate->opts.delim[0]);
- hdr_delim = true;
-
- colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
-
- if (cstate->opts.csv_mode)
- CopyAttributeOutCSV(cstate, colname, false,
- list_length(cstate->attnumlist) == 1);
- else
- CopyAttributeOutText(cstate, colname);
- }
-
- CopySendEndOfRow(cstate);
- }
- }
+ cstate->opts.to_ops.start(cstate, tupDesc);
if (cstate->rel)
{
@@ -884,13 +1029,7 @@ DoCopyTo(CopyToState cstate)
processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
}
- if (cstate->opts.binary)
- {
- /* Generate trailer for a binary copy */
- CopySendInt16(cstate, -1);
- /* Need to flush out the trailer */
- CopySendEndOfRow(cstate);
- }
+ cstate->opts.to_ops.end(cstate);
MemoryContextDelete(cstate->rowcontext);
@@ -906,71 +1045,15 @@ DoCopyTo(CopyToState cstate)
static void
CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
{
- bool need_delim = false;
- FmgrInfo *out_functions = cstate->out_functions;
MemoryContext oldcontext;
- ListCell *cur;
- char *string;
MemoryContextReset(cstate->rowcontext);
oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
- if (cstate->opts.binary)
- {
- /* Binary per-tuple header */
- CopySendInt16(cstate, list_length(cstate->attnumlist));
- }
-
/* Make sure the tuple is fully deconstructed */
slot_getallattrs(slot);
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- Datum value = slot->tts_values[attnum - 1];
- bool isnull = slot->tts_isnull[attnum - 1];
-
- if (!cstate->opts.binary)
- {
- if (need_delim)
- CopySendChar(cstate, cstate->opts.delim[0]);
- need_delim = true;
- }
-
- if (isnull)
- {
- if (!cstate->opts.binary)
- CopySendString(cstate, cstate->opts.null_print_client);
- else
- CopySendInt32(cstate, -1);
- }
- else
- {
- if (!cstate->opts.binary)
- {
- string = OutputFunctionCall(&out_functions[attnum - 1],
- value);
- if (cstate->opts.csv_mode)
- CopyAttributeOutCSV(cstate, string,
- cstate->opts.force_quote_flags[attnum - 1],
- list_length(cstate->attnumlist) == 1);
- else
- CopyAttributeOutText(cstate, string);
- }
- else
- {
- bytea *outputbytes;
-
- outputbytes = SendFunctionCall(&out_functions[attnum - 1],
- value);
- CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
- CopySendData(cstate, VARDATA(outputbytes),
- VARSIZE(outputbytes) - VARHDRSZ);
- }
- }
- }
-
- CopySendEndOfRow(cstate);
+ cstate->opts.to_ops.one_row(cstate, slot);
MemoryContextSwitchTo(oldcontext);
}
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index f2cca0b90b..6b5231b2f3 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -30,6 +30,28 @@ typedef enum CopyHeaderChoice
COPY_HEADER_MATCH,
} CopyHeaderChoice;
+/* These are private in commands/copy[from|to].c */
+typedef struct CopyFromStateData *CopyFromState;
+typedef struct CopyToStateData *CopyToState;
+
+/* Routines for a COPY TO format implementation. */
+typedef struct CopyToFormatOps
+{
+ /* Called when COPY TO is started. This will send a header. */
+ void (*start) (CopyToState cstate, TupleDesc tupDesc);
+
+ /* Copy one row for COPY TO. */
+ void (*one_row) (CopyToState cstate, TupleTableSlot *slot);
+
+ /* Called when COPY TO is ended. This will send a trailer. */
+ void (*end) (CopyToState cstate);
+} CopyToFormatOps;
+
+/* Predefined CopyToFormatOps for "text", "csv" and "binary". */
+extern PGDLLIMPORT const CopyToFormatOps CopyToFormatOpsText;
+extern PGDLLIMPORT const CopyToFormatOps CopyToFormatOpsCSV;
+extern PGDLLIMPORT const CopyToFormatOps CopyToFormatOpsBinary;
+
/*
* A struct to hold COPY options, in a parsed form. All of these are related
* to formatting, except for 'freeze', which doesn't really belong here, but
@@ -63,12 +85,9 @@ typedef struct CopyFormatOptions
bool *force_null_flags; /* per-column CSV FN flags */
bool convert_selectively; /* do selective binary conversion? */
List *convert_select; /* list of column names (can be NIL) */
+ CopyToFormatOps to_ops; /* how to format to */
} CopyFormatOptions;
-/* These are private in commands/copy[from|to].c */
-typedef struct CopyFromStateData *CopyFromState;
-typedef struct CopyToStateData *CopyToState;
-
typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
typedef void (*copy_data_dest_cb) (void *data, int len);
--
2.40.1
On Mon, Dec 04, 2023 at 03:35:48PM +0900, Sutou Kouhei wrote:
> I want to work on making COPY format extendable. I attach
> the first patch for it. I'll send more patches after this is
> merged.
Given the current discussion about adding JSON, I think this could be a
nice bit of refactoring that could ultimately open the door to providing
other COPY formats via shared libraries.
> In other words, this is just a refactoring for further
> changes to make COPY format extendable. If I use "complete
> the task and then request reviews for it" approach, it will
> be difficult to review because changes for it will be
> large. So I want to work on this step by step. Is it
> acceptable?
I think it makes sense to do this part independently, but we should be
careful to design this with the follow-up tasks in mind.
> So I measured COPY TO time with/without this change. You can
> see there is no significant loss of performance.
> Data: Random 32 bit integers:
> CREATE TABLE data (int32 integer);
> INSERT INTO data
> SELECT random() * 10000
> FROM generate_series(1, ${n_records});
Seems encouraging. I assume the performance concerns stem from the use of
function pointers. Or was there something else?
--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
Hi,
Thanks for replying to this proposal!
In <20231205182458.GC2757816@nathanxps13>
"Re: Make COPY format extendable: Extract COPY TO format implementations" on Tue, 5 Dec 2023 12:24:58 -0600,
Nathan Bossart <nathandbossart@gmail.com> wrote:
> I think it makes sense to do this part independently, but we should be
> careful to design this with the follow-up tasks in mind.
OK. I'll keep updating the "TODOs" section in the original
e-mail. It also includes design in the follow-up tasks. We
can discuss the design separately from the patches
submitting. (The current submitted patch just focuses on
refactoring but we can discuss the final design.)
> I assume the performance concerns stem from the use of
> function pointers. Or was there something else?
I think so too.
The original e-mail that mentioned the performance concern
[1] didn't say about the reason but the use of function
pointers might be concerned.
If the currently supported formats ("text", "csv" and
"binary") are implemented as an extension, it may have more
concerns but we will keep them as built-in formats for
compatibility. So I think that no more concerns exist for
these formats.
[1]: /messages/by-id/3741749.1655952719@sss.pgh.pa.us
Thanks,
--
kou
On Wed, Dec 6, 2023 at 10:45 AM Sutou Kouhei <kou@clear-code.com> wrote:
> Hi,
> Thanks for replying to this proposal!
> In <20231205182458.GC2757816@nathanxps13>
> "Re: Make COPY format extendable: Extract COPY TO format implementations" on Tue, 5 Dec 2023 12:24:58 -0600,
> Nathan Bossart <nathandbossart@gmail.com> wrote:
> > I think it makes sense to do this part independently, but we should be
> > careful to design this with the follow-up tasks in mind.
> OK. I'll keep updating the "TODOs" section in the original
> e-mail. It also includes design in the follow-up tasks. We
> can discuss the design separately from the patches
> submitting. (The current submitted patch just focuses on
> refactoring but we can discuss the final design.)
> > I assume the performance concerns stem from the use of
> > function pointers. Or was there something else?
> I think so too.
> The original e-mail that mentioned the performance concern
> [1] didn't say about the reason but the use of function
> pointers might be concerned.
> If the currently supported formats ("text", "csv" and
> "binary") are implemented as an extension, it may have more
> concerns but we will keep them as built-in formats for
> compatibility. So I think that no more concerns exist for
> these formats.
For the modern formats(parquet, orc, avro, etc.), will they be
implemented as extensions or in core?
The patch looks good except for a pair of extra curly braces.
> [1]: /messages/by-id/3741749.1655952719@sss.pgh.pa.us
> Thanks,
> --
> kou
--
Regards
Junwang Zhao
Hi,
In <CAEG8a3Jf7kPV3ez5OHu-pFGscKfVyd9KkubMF199etkfz=EPRg@mail.gmail.com>
"Re: Make COPY format extendable: Extract COPY TO format implementations" on Wed, 6 Dec 2023 11:18:35 +0800,
Junwang Zhao <zhjwpku@gmail.com> wrote:
> For the modern formats(parquet, orc, avro, etc.), will they be
> implemented as extensions or in core?
I think that they should be implemented as extensions
because they will depend on external libraries and may not
use C. For example, C++ will be used for Apache Parquet
because the official Apache Parquet C++ implementation
exists but a C implementation doesn't.
(I can implement an extension for Apache Parquet after we
complete this feature. I'll implement an extension for
Apache Arrow with the official Apache Arrow C++
implementation. And it's easy to convert Apache Arrow
data to Apache Parquet with the official Apache Parquet
implementation.)
> The patch looks good except for a pair of extra curly braces.
Thanks for the review! I attach the v2 patch that removes
extra curly braces for "if (isnull)".
Thanks,
--
kou
Attachments:
v2-0001-Extract-COPY-TO-format-implementations.patch (text/x-patch; charset=us-ascii)
From 2cd0d344d68667db71b621a8c94f376ddf1707c3 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Mon, 4 Dec 2023 12:32:54 +0900
Subject: [PATCH v2] Extract COPY TO format implementations
This is a part of making COPY format extendable. See also these past
discussions:
* New Copy Formats - avro/orc/parquet:
https://www.postgresql.org/message-id/flat/20180210151304.fonjztsynewldfba%40gmail.com
* Make COPY extendable in order to support Parquet and other formats:
https://www.postgresql.org/message-id/flat/CAJ7c6TM6Bz1c3F04Cy6%2BSzuWfKmr0kU8c_3Stnvh_8BR0D6k8Q%40mail.gmail.com
This doesn't change the current behavior. This just introduces
CopyToFormatOps, which just has function pointers of format
implementation like TupleTableSlotOps, and use it for existing "text",
"csv" and "binary" format implementations.
Note that CopyToFormatOps can't be used from extensions yet because
CopySend*() aren't exported yet. Extensions can't send formatted data
to a destination without CopySend*(). They will be exported by
subsequent patches.
Here is a benchmark result with/without this change because there was
a discussion that we should care about performance regression:
https://www.postgresql.org/message-id/3741749.1655952719%40sss.pgh.pa.us
> I think that step 1 ought to be to convert the existing formats into
> plug-ins, and demonstrate that there's no significant loss of
> performance.
You can see that there is no significant loss of performance:
Data: Random 32 bit integers:
CREATE TABLE data (int32 integer);
INSERT INTO data
SELECT random() * 10000
FROM generate_series(1, ${n_records});
The number of records: 100K, 1M and 10M
100K without this change:
format,elapsed time (ms)
text,22.527
csv,23.822
binary,24.806
100K with this change:
format,elapsed time (ms)
text,22.919
csv,24.643
binary,24.705
1M without this change:
format,elapsed time (ms)
text,223.457
csv,233.583
binary,242.687
1M with this change:
format,elapsed time (ms)
text,224.591
csv,233.964
binary,247.164
10M without this change:
format,elapsed time (ms)
text,2330.383
csv,2411.394
binary,2590.817
10M with this change:
format,elapsed time (ms)
text,2231.307
csv,2408.067
binary,2473.617
---
src/backend/commands/copy.c | 8 +
src/backend/commands/copyto.c | 383 ++++++++++++++++++++--------------
src/include/commands/copy.h | 27 ++-
3 files changed, 262 insertions(+), 156 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index cfad47b562..27a1add456 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -427,6 +427,8 @@ ProcessCopyOptions(ParseState *pstate,
opts_out->file_encoding = -1;
+ /* Text is the default format. */
+ opts_out->to_ops = CopyToFormatOpsText;
/* Extract options from the statement node tree */
foreach(option, options)
{
@@ -442,9 +444,15 @@ ProcessCopyOptions(ParseState *pstate,
if (strcmp(fmt, "text") == 0)
/* default format */ ;
else if (strcmp(fmt, "csv") == 0)
+ {
opts_out->csv_mode = true;
+ opts_out->to_ops = CopyToFormatOpsCSV;
+ }
else if (strcmp(fmt, "binary") == 0)
+ {
opts_out->binary = true;
+ opts_out->to_ops = CopyToFormatOpsBinary;
+ }
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index c66a047c4a..79806b9a1b 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -131,6 +131,234 @@ static void CopySendEndOfRow(CopyToState cstate);
static void CopySendInt32(CopyToState cstate, int32 val);
static void CopySendInt16(CopyToState cstate, int16 val);
+/*
+ * CopyToFormatOps implementations.
+ */
+
+/*
+ * CopyToFormatOps implementation for "text" and "csv". CopyToFormatText*()
+ * refer cstate->opts.csv_mode and change their behavior. We can split this
+ * implementation and stop referring cstate->opts.csv_mode later.
+ */
+
+static void
+CopyToFormatTextSendEndOfRow(CopyToState cstate)
+{
+ switch (cstate->copy_dest)
+ {
+ case COPY_FILE:
+ /* Default line termination depends on platform */
+#ifndef WIN32
+ CopySendChar(cstate, '\n');
+#else
+ CopySendString(cstate, "\r\n");
+#endif
+ break;
+ case COPY_FRONTEND:
+ /* The FE/BE protocol uses \n as newline for all platforms */
+ CopySendChar(cstate, '\n');
+ break;
+ default:
+ break;
+ }
+ CopySendEndOfRow(cstate);
+}
+
+static void
+CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc)
+{
+ int num_phys_attrs;
+ ListCell *cur;
+
+ num_phys_attrs = tupDesc->natts;
+ /* Get info about the columns we need to process. */
+ cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Oid out_func_oid;
+ bool isvarlena;
+ Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+ getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+ fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+ }
+
+ /*
+ * For non-binary copy, we need to convert null_print to file
+ * encoding, because it will be sent directly with CopySendString.
+ */
+ if (cstate->need_transcoding)
+ cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+ cstate->opts.null_print_len,
+ cstate->file_encoding);
+
+ /* if a header has been requested send the line */
+ if (cstate->opts.header_line)
+ {
+ bool hdr_delim = false;
+
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ char *colname;
+
+ if (hdr_delim)
+ CopySendChar(cstate, cstate->opts.delim[0]);
+ hdr_delim = true;
+
+ colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
+
+ if (cstate->opts.csv_mode)
+ CopyAttributeOutCSV(cstate, colname, false,
+ list_length(cstate->attnumlist) == 1);
+ else
+ CopyAttributeOutText(cstate, colname);
+ }
+
+ CopyToFormatTextSendEndOfRow(cstate);
+ }
+}
+
+static void
+CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+ bool need_delim = false;
+ FmgrInfo *out_functions = cstate->out_functions;
+ ListCell *cur;
+
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Datum value = slot->tts_values[attnum - 1];
+ bool isnull = slot->tts_isnull[attnum - 1];
+
+ if (need_delim)
+ CopySendChar(cstate, cstate->opts.delim[0]);
+ need_delim = true;
+
+ if (isnull)
+ CopySendString(cstate, cstate->opts.null_print_client);
+ else
+ {
+ char *string;
+
+ string = OutputFunctionCall(&out_functions[attnum - 1], value);
+ if (cstate->opts.csv_mode)
+ CopyAttributeOutCSV(cstate, string,
+ cstate->opts.force_quote_flags[attnum - 1],
+ list_length(cstate->attnumlist) == 1);
+ else
+ CopyAttributeOutText(cstate, string);
+ }
+ }
+
+ CopyToFormatTextSendEndOfRow(cstate);
+}
+
+static void
+CopyToFormatTextEnd(CopyToState cstate)
+{
+}
+
+/*
+ * CopyToFormatOps implementation for "binary".
+ */
+
+static void
+CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc)
+{
+ int num_phys_attrs;
+ ListCell *cur;
+
+ num_phys_attrs = tupDesc->natts;
+ /* Get info about the columns we need to process. */
+ cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Oid out_func_oid;
+ bool isvarlena;
+ Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+ getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+ fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+ }
+
+ {
+ /* Generate header for a binary copy */
+ int32 tmp;
+
+ /* Signature */
+ CopySendData(cstate, BinarySignature, 11);
+ /* Flags field */
+ tmp = 0;
+ CopySendInt32(cstate, tmp);
+ /* No header extension */
+ tmp = 0;
+ CopySendInt32(cstate, tmp);
+ }
+}
+
+static void
+CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+ FmgrInfo *out_functions = cstate->out_functions;
+ ListCell *cur;
+
+ /* Binary per-tuple header */
+ CopySendInt16(cstate, list_length(cstate->attnumlist));
+
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Datum value = slot->tts_values[attnum - 1];
+ bool isnull = slot->tts_isnull[attnum - 1];
+
+ if (isnull)
+ CopySendInt32(cstate, -1);
+ else
+ {
+ bytea *outputbytes;
+
+ outputbytes = SendFunctionCall(&out_functions[attnum - 1], value);
+ CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+ CopySendData(cstate, VARDATA(outputbytes),
+ VARSIZE(outputbytes) - VARHDRSZ);
+ }
+ }
+
+ CopySendEndOfRow(cstate);
+}
+
+static void
+CopyToFormatBinaryEnd(CopyToState cstate)
+{
+ /* Generate trailer for a binary copy */
+ CopySendInt16(cstate, -1);
+ /* Need to flush out the trailer */
+ CopySendEndOfRow(cstate);
+}
+
+const CopyToFormatOps CopyToFormatOpsText = {
+ .start = CopyToFormatTextStart,
+ .one_row = CopyToFormatTextOneRow,
+ .end = CopyToFormatTextEnd,
+};
+
+/*
+ * We can use the same CopyToFormatOps for both of "text" and "csv" because
+ * CopyToFormatText*() refer cstate->opts.csv_mode and change their
+ * behavior. We can split the implementations and stop referring
+ * cstate->opts.csv_mode later.
+ */
+const CopyToFormatOps CopyToFormatOpsCSV = CopyToFormatOpsText;
+
+const CopyToFormatOps CopyToFormatOpsBinary = {
+ .start = CopyToFormatBinaryStart,
+ .one_row = CopyToFormatBinaryOneRow,
+ .end = CopyToFormatBinaryEnd,
+};
/*
* Send copy start/stop messages for frontend copies. These have changed
@@ -198,16 +426,6 @@ CopySendEndOfRow(CopyToState cstate)
switch (cstate->copy_dest)
{
case COPY_FILE:
- if (!cstate->opts.binary)
- {
- /* Default line termination depends on platform */
-#ifndef WIN32
- CopySendChar(cstate, '\n');
-#else
- CopySendString(cstate, "\r\n");
-#endif
- }
-
if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
cstate->copy_file) != 1 ||
ferror(cstate->copy_file))
@@ -242,10 +460,6 @@ CopySendEndOfRow(CopyToState cstate)
}
break;
case COPY_FRONTEND:
- /* The FE/BE protocol uses \n as newline for all platforms */
- if (!cstate->opts.binary)
- CopySendChar(cstate, '\n');
-
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
break;
@@ -748,8 +962,6 @@ DoCopyTo(CopyToState cstate)
bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
bool fe_copy = (pipe && whereToSendOutput == DestRemote);
TupleDesc tupDesc;
- int num_phys_attrs;
- ListCell *cur;
uint64 processed;
if (fe_copy)
@@ -759,32 +971,11 @@ DoCopyTo(CopyToState cstate)
tupDesc = RelationGetDescr(cstate->rel);
else
tupDesc = cstate->queryDesc->tupDesc;
- num_phys_attrs = tupDesc->natts;
cstate->opts.null_print_client = cstate->opts.null_print; /* default */
/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
cstate->fe_msgbuf = makeStringInfo();
- /* Get info about the columns we need to process. */
- cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- Oid out_func_oid;
- bool isvarlena;
- Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
-
- if (cstate->opts.binary)
- getTypeBinaryOutputInfo(attr->atttypid,
- &out_func_oid,
- &isvarlena);
- else
- getTypeOutputInfo(attr->atttypid,
- &out_func_oid,
- &isvarlena);
- fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
- }
-
/*
* Create a temporary memory context that we can reset once per row to
* recover palloc'd memory. This avoids any problems with leaks inside
@@ -795,57 +986,7 @@ DoCopyTo(CopyToState cstate)
"COPY TO",
ALLOCSET_DEFAULT_SIZES);
- if (cstate->opts.binary)
- {
- /* Generate header for a binary copy */
- int32 tmp;
-
- /* Signature */
- CopySendData(cstate, BinarySignature, 11);
- /* Flags field */
- tmp = 0;
- CopySendInt32(cstate, tmp);
- /* No header extension */
- tmp = 0;
- CopySendInt32(cstate, tmp);
- }
- else
- {
- /*
- * For non-binary copy, we need to convert null_print to file
- * encoding, because it will be sent directly with CopySendString.
- */
- if (cstate->need_transcoding)
- cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
- cstate->opts.null_print_len,
- cstate->file_encoding);
-
- /* if a header has been requested send the line */
- if (cstate->opts.header_line)
- {
- bool hdr_delim = false;
-
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- char *colname;
-
- if (hdr_delim)
- CopySendChar(cstate, cstate->opts.delim[0]);
- hdr_delim = true;
-
- colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
-
- if (cstate->opts.csv_mode)
- CopyAttributeOutCSV(cstate, colname, false,
- list_length(cstate->attnumlist) == 1);
- else
- CopyAttributeOutText(cstate, colname);
- }
-
- CopySendEndOfRow(cstate);
- }
- }
+ cstate->opts.to_ops.start(cstate, tupDesc);
if (cstate->rel)
{
@@ -884,13 +1025,7 @@ DoCopyTo(CopyToState cstate)
processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
}
- if (cstate->opts.binary)
- {
- /* Generate trailer for a binary copy */
- CopySendInt16(cstate, -1);
- /* Need to flush out the trailer */
- CopySendEndOfRow(cstate);
- }
+ cstate->opts.to_ops.end(cstate);
MemoryContextDelete(cstate->rowcontext);
@@ -906,71 +1041,15 @@ DoCopyTo(CopyToState cstate)
static void
CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
{
- bool need_delim = false;
- FmgrInfo *out_functions = cstate->out_functions;
MemoryContext oldcontext;
- ListCell *cur;
- char *string;
MemoryContextReset(cstate->rowcontext);
oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
- if (cstate->opts.binary)
- {
- /* Binary per-tuple header */
- CopySendInt16(cstate, list_length(cstate->attnumlist));
- }
-
/* Make sure the tuple is fully deconstructed */
slot_getallattrs(slot);
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- Datum value = slot->tts_values[attnum - 1];
- bool isnull = slot->tts_isnull[attnum - 1];
-
- if (!cstate->opts.binary)
- {
- if (need_delim)
- CopySendChar(cstate, cstate->opts.delim[0]);
- need_delim = true;
- }
-
- if (isnull)
- {
- if (!cstate->opts.binary)
- CopySendString(cstate, cstate->opts.null_print_client);
- else
- CopySendInt32(cstate, -1);
- }
- else
- {
- if (!cstate->opts.binary)
- {
- string = OutputFunctionCall(&out_functions[attnum - 1],
- value);
- if (cstate->opts.csv_mode)
- CopyAttributeOutCSV(cstate, string,
- cstate->opts.force_quote_flags[attnum - 1],
- list_length(cstate->attnumlist) == 1);
- else
- CopyAttributeOutText(cstate, string);
- }
- else
- {
- bytea *outputbytes;
-
- outputbytes = SendFunctionCall(&out_functions[attnum - 1],
- value);
- CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
- CopySendData(cstate, VARDATA(outputbytes),
- VARSIZE(outputbytes) - VARHDRSZ);
- }
- }
- }
-
- CopySendEndOfRow(cstate);
+ cstate->opts.to_ops.one_row(cstate, slot);
MemoryContextSwitchTo(oldcontext);
}
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index f2cca0b90b..6b5231b2f3 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -30,6 +30,28 @@ typedef enum CopyHeaderChoice
COPY_HEADER_MATCH,
} CopyHeaderChoice;
+/* These are private in commands/copy[from|to].c */
+typedef struct CopyFromStateData *CopyFromState;
+typedef struct CopyToStateData *CopyToState;
+
+/* Routines for a COPY TO format implementation. */
+typedef struct CopyToFormatOps
+{
+ /* Called when COPY TO is started. This will send a header. */
+ void (*start) (CopyToState cstate, TupleDesc tupDesc);
+
+ /* Copy one row for COPY TO. */
+ void (*one_row) (CopyToState cstate, TupleTableSlot *slot);
+
+ /* Called when COPY TO is ended. This will send a trailer. */
+ void (*end) (CopyToState cstate);
+} CopyToFormatOps;
+
+/* Predefined CopyToFormatOps for "text", "csv" and "binary". */
+extern PGDLLIMPORT const CopyToFormatOps CopyToFormatOpsText;
+extern PGDLLIMPORT const CopyToFormatOps CopyToFormatOpsCSV;
+extern PGDLLIMPORT const CopyToFormatOps CopyToFormatOpsBinary;
+
/*
* A struct to hold COPY options, in a parsed form. All of these are related
* to formatting, except for 'freeze', which doesn't really belong here, but
@@ -63,12 +85,9 @@ typedef struct CopyFormatOptions
bool *force_null_flags; /* per-column CSV FN flags */
bool convert_selectively; /* do selective binary conversion? */
List *convert_select; /* list of column names (can be NIL) */
+ CopyToFormatOps to_ops; /* how to format to */
} CopyFormatOptions;
-/* These are private in commands/copy[from|to].c */
-typedef struct CopyFromStateData *CopyFromState;
-typedef struct CopyToStateData *CopyToState;
-
typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
typedef void (*copy_data_dest_cb) (void *data, int len);
--
2.40.1
On Wed, Dec 6, 2023 at 2:19 PM Sutou Kouhei <kou@clear-code.com> wrote:
Hi,
In <CAEG8a3Jf7kPV3ez5OHu-pFGscKfVyd9KkubMF199etkfz=EPRg@mail.gmail.com>
"Re: Make COPY format extendable: Extract COPY TO format implementations" on Wed, 6 Dec 2023 11:18:35 +0800,
Junwang Zhao <zhjwpku@gmail.com> wrote:
> For the modern formats (parquet, orc, avro, etc.), will they be
> implemented as extensions or in core?
I think that they should be implemented as extensions
because they will depend on external libraries and may not
use C. For example, C++ will be used for Apache Parquet
because the official Apache Parquet C++ implementation
exists but the C implementation doesn't.
(I can implement an extension for Apache Parquet after we
complete this feature. I'll implement an extension for
Apache Arrow with the official Apache Arrow C++
implementation. And it's easy to convert Apache Arrow
data to Apache Parquet with the official Apache Parquet
implementation.)
> The patch looks good except for a pair of extra curly braces.
Thanks for the review! I attach the v2 patch that removes
extra curly braces for "if (isnull)".
For the extra curly braces, I mean the following code block in
CopyToFormatBinaryStart:
+ { <-- I thought this is useless?
+ /* Generate header for a binary copy */
+ int32 tmp;
+
+ /* Signature */
+ CopySendData(cstate, BinarySignature, 11);
+ /* Flags field */
+ tmp = 0;
+ CopySendInt32(cstate, tmp);
+ /* No header extension */
+ tmp = 0;
+ CopySendInt32(cstate, tmp);
+ }
Thanks,
--
kou
--
Regards
Junwang Zhao
Hi,
In <CAEG8a3K9dE2gt3+K+h=DwTqMenR84aeYuYS+cty3SR3LAeDBAQ@mail.gmail.com>
"Re: Make COPY format extendable: Extract COPY TO format implementations" on Wed, 6 Dec 2023 15:11:34 +0800,
Junwang Zhao <zhjwpku@gmail.com> wrote:
> For the extra curly braces, I mean the following code block in
> CopyToFormatBinaryStart:
>
> + { <-- I thought this is useless?
> + /* Generate header for a binary copy */
> + int32 tmp;
> +
> + /* Signature */
> + CopySendData(cstate, BinarySignature, 11);
> + /* Flags field */
> + tmp = 0;
> + CopySendInt32(cstate, tmp);
> + /* No header extension */
> + tmp = 0;
> + CopySendInt32(cstate, tmp);
> + }
Oh, I see. I've removed them and attached the v3 patch. In general,
I don't change variable names and so on in this patch; I just
move code. But I also removed the "tmp" variable in this case
because I think that the name isn't suitable for a larger
scope. (I think that "tmp" is acceptable in a small scope like
the above code.)
New code:
/* Generate header for a binary copy */
/* Signature */
CopySendData(cstate, BinarySignature, 11);
/* Flags field */
CopySendInt32(cstate, 0);
/* No header extension */
CopySendInt32(cstate, 0);
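For readers who don't know the binary format, here is a minimal standalone sketch (not part of the patch) of the bytes these three calls produce. It assumes that CopySendInt32() writes the value in network byte order, as the current copyto.c does. put_int32() and build_binary_header() are hypothetical helpers that exist only for this illustration, and the signature bytes are copied from the documented binary COPY file header.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* The 11-byte binary COPY signature: "PGCOPY\n\377\r\n\0". */
static const unsigned char BinarySignature[11] = {
    'P', 'G', 'C', 'O', 'P', 'Y', '\n', 0xFF, '\r', '\n', 0x00
};

/* Hypothetical helper: append a 32-bit value in network byte order. */
static size_t
put_int32(unsigned char *buf, size_t off, uint32_t val)
{
    buf[off + 0] = (unsigned char) (val >> 24);
    buf[off + 1] = (unsigned char) (val >> 16);
    buf[off + 2] = (unsigned char) (val >> 8);
    buf[off + 3] = (unsigned char) val;
    return off + 4;
}

/*
 * Hypothetical helper: write the file header into buf (at least 19 bytes)
 * and return the number of bytes written.
 */
static size_t
build_binary_header(unsigned char *buf)
{
    size_t      off = 0;

    /* Signature */
    memcpy(buf, BinarySignature, sizeof(BinarySignature));
    off += sizeof(BinarySignature);
    /* Flags field */
    off = put_int32(buf, off, 0);
    /* No header extension */
    off = put_int32(buf, off, 0);
    return off;
}
```

Calling build_binary_header() on a buffer of 19 bytes or more fills in 11 signature bytes followed by two zeroed 4-byte fields, which is what the shortened code sends without the "tmp" variable.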
Thanks,
--
kou
Attachments:
v3-0001-Extract-COPY-TO-format-implementations.patch (text/x-patch; charset=us-ascii)
From 9fe0087d9a6a79a7d1a7d0af63eb16abadbf0d4a Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Mon, 4 Dec 2023 12:32:54 +0900
Subject: [PATCH v3] Extract COPY TO format implementations
This is a part of making COPY format extendable. See also these past
discussions:
* New Copy Formats - avro/orc/parquet:
https://www.postgresql.org/message-id/flat/20180210151304.fonjztsynewldfba%40gmail.com
* Make COPY extendable in order to support Parquet and other formats:
https://www.postgresql.org/message-id/flat/CAJ7c6TM6Bz1c3F04Cy6%2BSzuWfKmr0kU8c_3Stnvh_8BR0D6k8Q%40mail.gmail.com
This doesn't change the current behavior. This just introduces
CopyToFormatOps, which just has function pointers of format
implementation like TupleTableSlotOps, and use it for existing "text",
"csv" and "binary" format implementations.
Note that CopyToFormatOps can't be used from extensions yet because
CopySend*() aren't exported yet. Extensions can't send formatted data
to a destination without CopySend*(). They will be exported by
subsequent patches.
Here is a benchmark result with/without this change because there was
a discussion that we should care about performance regression:
https://www.postgresql.org/message-id/3741749.1655952719%40sss.pgh.pa.us
> I think that step 1 ought to be to convert the existing formats into
> plug-ins, and demonstrate that there's no significant loss of
> performance.
You can see that there is no significant loss of performance:
Data: Random 32 bit integers:
CREATE TABLE data (int32 integer);
INSERT INTO data
SELECT random() * 10000
FROM generate_series(1, ${n_records});
The number of records: 100K, 1M and 10M
100K without this change:
format,elapsed time (ms)
text,22.527
csv,23.822
binary,24.806
100K with this change:
format,elapsed time (ms)
text,22.919
csv,24.643
binary,24.705
1M without this change:
format,elapsed time (ms)
text,223.457
csv,233.583
binary,242.687
1M with this change:
format,elapsed time (ms)
text,224.591
csv,233.964
binary,247.164
10M without this change:
format,elapsed time (ms)
text,2330.383
csv,2411.394
binary,2590.817
10M with this change:
format,elapsed time (ms)
text,2231.307
csv,2408.067
binary,2473.617
---
src/backend/commands/copy.c | 8 +
src/backend/commands/copyto.c | 377 ++++++++++++++++++++--------------
src/include/commands/copy.h | 27 ++-
3 files changed, 256 insertions(+), 156 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index cfad47b562..27a1add456 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -427,6 +427,8 @@ ProcessCopyOptions(ParseState *pstate,
opts_out->file_encoding = -1;
+ /* Text is the default format. */
+ opts_out->to_ops = CopyToFormatOpsText;
/* Extract options from the statement node tree */
foreach(option, options)
{
@@ -442,9 +444,15 @@ ProcessCopyOptions(ParseState *pstate,
if (strcmp(fmt, "text") == 0)
/* default format */ ;
else if (strcmp(fmt, "csv") == 0)
+ {
opts_out->csv_mode = true;
+ opts_out->to_ops = CopyToFormatOpsCSV;
+ }
else if (strcmp(fmt, "binary") == 0)
+ {
opts_out->binary = true;
+ opts_out->to_ops = CopyToFormatOpsBinary;
+ }
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index c66a047c4a..8f51090a03 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -131,6 +131,228 @@ static void CopySendEndOfRow(CopyToState cstate);
static void CopySendInt32(CopyToState cstate, int32 val);
static void CopySendInt16(CopyToState cstate, int16 val);
+/*
+ * CopyToFormatOps implementations.
+ */
+
+/*
+ * CopyToFormatOps implementation for "text" and "csv". CopyToFormatText*()
+ * refer cstate->opts.csv_mode and change their behavior. We can split this
+ * implementation and stop referring cstate->opts.csv_mode later.
+ */
+
+static void
+CopyToFormatTextSendEndOfRow(CopyToState cstate)
+{
+ switch (cstate->copy_dest)
+ {
+ case COPY_FILE:
+ /* Default line termination depends on platform */
+#ifndef WIN32
+ CopySendChar(cstate, '\n');
+#else
+ CopySendString(cstate, "\r\n");
+#endif
+ break;
+ case COPY_FRONTEND:
+ /* The FE/BE protocol uses \n as newline for all platforms */
+ CopySendChar(cstate, '\n');
+ break;
+ default:
+ break;
+ }
+ CopySendEndOfRow(cstate);
+}
+
+static void
+CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc)
+{
+ int num_phys_attrs;
+ ListCell *cur;
+
+ num_phys_attrs = tupDesc->natts;
+ /* Get info about the columns we need to process. */
+ cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Oid out_func_oid;
+ bool isvarlena;
+ Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+ getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+ fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+ }
+
+ /*
+ * For non-binary copy, we need to convert null_print to file
+ * encoding, because it will be sent directly with CopySendString.
+ */
+ if (cstate->need_transcoding)
+ cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+ cstate->opts.null_print_len,
+ cstate->file_encoding);
+
+ /* if a header has been requested send the line */
+ if (cstate->opts.header_line)
+ {
+ bool hdr_delim = false;
+
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ char *colname;
+
+ if (hdr_delim)
+ CopySendChar(cstate, cstate->opts.delim[0]);
+ hdr_delim = true;
+
+ colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
+
+ if (cstate->opts.csv_mode)
+ CopyAttributeOutCSV(cstate, colname, false,
+ list_length(cstate->attnumlist) == 1);
+ else
+ CopyAttributeOutText(cstate, colname);
+ }
+
+ CopyToFormatTextSendEndOfRow(cstate);
+ }
+}
+
+static void
+CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+ bool need_delim = false;
+ FmgrInfo *out_functions = cstate->out_functions;
+ ListCell *cur;
+
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Datum value = slot->tts_values[attnum - 1];
+ bool isnull = slot->tts_isnull[attnum - 1];
+
+ if (need_delim)
+ CopySendChar(cstate, cstate->opts.delim[0]);
+ need_delim = true;
+
+ if (isnull)
+ CopySendString(cstate, cstate->opts.null_print_client);
+ else
+ {
+ char *string;
+
+ string = OutputFunctionCall(&out_functions[attnum - 1], value);
+ if (cstate->opts.csv_mode)
+ CopyAttributeOutCSV(cstate, string,
+ cstate->opts.force_quote_flags[attnum - 1],
+ list_length(cstate->attnumlist) == 1);
+ else
+ CopyAttributeOutText(cstate, string);
+ }
+ }
+
+ CopyToFormatTextSendEndOfRow(cstate);
+}
+
+static void
+CopyToFormatTextEnd(CopyToState cstate)
+{
+}
+
+/*
+ * CopyToFormatOps implementation for "binary".
+ */
+
+static void
+CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc)
+{
+ int num_phys_attrs;
+ ListCell *cur;
+
+ num_phys_attrs = tupDesc->natts;
+ /* Get info about the columns we need to process. */
+ cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Oid out_func_oid;
+ bool isvarlena;
+ Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+ getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+ fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+ }
+
+ /* Generate header for a binary copy */
+ /* Signature */
+ CopySendData(cstate, BinarySignature, 11);
+ /* Flags field */
+ CopySendInt32(cstate, 0);
+ /* No header extension */
+ CopySendInt32(cstate, 0);
+}
+
+static void
+CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+ FmgrInfo *out_functions = cstate->out_functions;
+ ListCell *cur;
+
+ /* Binary per-tuple header */
+ CopySendInt16(cstate, list_length(cstate->attnumlist));
+
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Datum value = slot->tts_values[attnum - 1];
+ bool isnull = slot->tts_isnull[attnum - 1];
+
+ if (isnull)
+ CopySendInt32(cstate, -1);
+ else
+ {
+ bytea *outputbytes;
+
+ outputbytes = SendFunctionCall(&out_functions[attnum - 1], value);
+ CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+ CopySendData(cstate, VARDATA(outputbytes),
+ VARSIZE(outputbytes) - VARHDRSZ);
+ }
+ }
+
+ CopySendEndOfRow(cstate);
+}
+
+static void
+CopyToFormatBinaryEnd(CopyToState cstate)
+{
+ /* Generate trailer for a binary copy */
+ CopySendInt16(cstate, -1);
+ /* Need to flush out the trailer */
+ CopySendEndOfRow(cstate);
+}
+
+const CopyToFormatOps CopyToFormatOpsText = {
+ .start = CopyToFormatTextStart,
+ .one_row = CopyToFormatTextOneRow,
+ .end = CopyToFormatTextEnd,
+};
+
+/*
+ * We can use the same CopyToFormatOps for both of "text" and "csv" because
+ * CopyToFormatText*() refer cstate->opts.csv_mode and change their
+ * behavior. We can split the implementations and stop referring
+ * cstate->opts.csv_mode later.
+ */
+const CopyToFormatOps CopyToFormatOpsCSV = CopyToFormatOpsText;
+
+const CopyToFormatOps CopyToFormatOpsBinary = {
+ .start = CopyToFormatBinaryStart,
+ .one_row = CopyToFormatBinaryOneRow,
+ .end = CopyToFormatBinaryEnd,
+};
/*
* Send copy start/stop messages for frontend copies. These have changed
@@ -198,16 +420,6 @@ CopySendEndOfRow(CopyToState cstate)
switch (cstate->copy_dest)
{
case COPY_FILE:
- if (!cstate->opts.binary)
- {
- /* Default line termination depends on platform */
-#ifndef WIN32
- CopySendChar(cstate, '\n');
-#else
- CopySendString(cstate, "\r\n");
-#endif
- }
-
if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
cstate->copy_file) != 1 ||
ferror(cstate->copy_file))
@@ -242,10 +454,6 @@ CopySendEndOfRow(CopyToState cstate)
}
break;
case COPY_FRONTEND:
- /* The FE/BE protocol uses \n as newline for all platforms */
- if (!cstate->opts.binary)
- CopySendChar(cstate, '\n');
-
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
break;
@@ -748,8 +956,6 @@ DoCopyTo(CopyToState cstate)
bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
bool fe_copy = (pipe && whereToSendOutput == DestRemote);
TupleDesc tupDesc;
- int num_phys_attrs;
- ListCell *cur;
uint64 processed;
if (fe_copy)
@@ -759,32 +965,11 @@ DoCopyTo(CopyToState cstate)
tupDesc = RelationGetDescr(cstate->rel);
else
tupDesc = cstate->queryDesc->tupDesc;
- num_phys_attrs = tupDesc->natts;
cstate->opts.null_print_client = cstate->opts.null_print; /* default */
/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
cstate->fe_msgbuf = makeStringInfo();
- /* Get info about the columns we need to process. */
- cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- Oid out_func_oid;
- bool isvarlena;
- Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
-
- if (cstate->opts.binary)
- getTypeBinaryOutputInfo(attr->atttypid,
- &out_func_oid,
- &isvarlena);
- else
- getTypeOutputInfo(attr->atttypid,
- &out_func_oid,
- &isvarlena);
- fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
- }
-
/*
* Create a temporary memory context that we can reset once per row to
* recover palloc'd memory. This avoids any problems with leaks inside
@@ -795,57 +980,7 @@ DoCopyTo(CopyToState cstate)
"COPY TO",
ALLOCSET_DEFAULT_SIZES);
- if (cstate->opts.binary)
- {
- /* Generate header for a binary copy */
- int32 tmp;
-
- /* Signature */
- CopySendData(cstate, BinarySignature, 11);
- /* Flags field */
- tmp = 0;
- CopySendInt32(cstate, tmp);
- /* No header extension */
- tmp = 0;
- CopySendInt32(cstate, tmp);
- }
- else
- {
- /*
- * For non-binary copy, we need to convert null_print to file
- * encoding, because it will be sent directly with CopySendString.
- */
- if (cstate->need_transcoding)
- cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
- cstate->opts.null_print_len,
- cstate->file_encoding);
-
- /* if a header has been requested send the line */
- if (cstate->opts.header_line)
- {
- bool hdr_delim = false;
-
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- char *colname;
-
- if (hdr_delim)
- CopySendChar(cstate, cstate->opts.delim[0]);
- hdr_delim = true;
-
- colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
-
- if (cstate->opts.csv_mode)
- CopyAttributeOutCSV(cstate, colname, false,
- list_length(cstate->attnumlist) == 1);
- else
- CopyAttributeOutText(cstate, colname);
- }
-
- CopySendEndOfRow(cstate);
- }
- }
+ cstate->opts.to_ops.start(cstate, tupDesc);
if (cstate->rel)
{
@@ -884,13 +1019,7 @@ DoCopyTo(CopyToState cstate)
processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
}
- if (cstate->opts.binary)
- {
- /* Generate trailer for a binary copy */
- CopySendInt16(cstate, -1);
- /* Need to flush out the trailer */
- CopySendEndOfRow(cstate);
- }
+ cstate->opts.to_ops.end(cstate);
MemoryContextDelete(cstate->rowcontext);
@@ -906,71 +1035,15 @@ DoCopyTo(CopyToState cstate)
static void
CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
{
- bool need_delim = false;
- FmgrInfo *out_functions = cstate->out_functions;
MemoryContext oldcontext;
- ListCell *cur;
- char *string;
MemoryContextReset(cstate->rowcontext);
oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
- if (cstate->opts.binary)
- {
- /* Binary per-tuple header */
- CopySendInt16(cstate, list_length(cstate->attnumlist));
- }
-
/* Make sure the tuple is fully deconstructed */
slot_getallattrs(slot);
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- Datum value = slot->tts_values[attnum - 1];
- bool isnull = slot->tts_isnull[attnum - 1];
-
- if (!cstate->opts.binary)
- {
- if (need_delim)
- CopySendChar(cstate, cstate->opts.delim[0]);
- need_delim = true;
- }
-
- if (isnull)
- {
- if (!cstate->opts.binary)
- CopySendString(cstate, cstate->opts.null_print_client);
- else
- CopySendInt32(cstate, -1);
- }
- else
- {
- if (!cstate->opts.binary)
- {
- string = OutputFunctionCall(&out_functions[attnum - 1],
- value);
- if (cstate->opts.csv_mode)
- CopyAttributeOutCSV(cstate, string,
- cstate->opts.force_quote_flags[attnum - 1],
- list_length(cstate->attnumlist) == 1);
- else
- CopyAttributeOutText(cstate, string);
- }
- else
- {
- bytea *outputbytes;
-
- outputbytes = SendFunctionCall(&out_functions[attnum - 1],
- value);
- CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
- CopySendData(cstate, VARDATA(outputbytes),
- VARSIZE(outputbytes) - VARHDRSZ);
- }
- }
- }
-
- CopySendEndOfRow(cstate);
+ cstate->opts.to_ops.one_row(cstate, slot);
MemoryContextSwitchTo(oldcontext);
}
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index f2cca0b90b..6b5231b2f3 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -30,6 +30,28 @@ typedef enum CopyHeaderChoice
COPY_HEADER_MATCH,
} CopyHeaderChoice;
+/* These are private in commands/copy[from|to].c */
+typedef struct CopyFromStateData *CopyFromState;
+typedef struct CopyToStateData *CopyToState;
+
+/* Routines for a COPY TO format implementation. */
+typedef struct CopyToFormatOps
+{
+ /* Called when COPY TO is started. This will send a header. */
+ void (*start) (CopyToState cstate, TupleDesc tupDesc);
+
+ /* Copy one row for COPY TO. */
+ void (*one_row) (CopyToState cstate, TupleTableSlot *slot);
+
+ /* Called when COPY TO is ended. This will send a trailer. */
+ void (*end) (CopyToState cstate);
+} CopyToFormatOps;
+
+/* Predefined CopyToFormatOps for "text", "csv" and "binary". */
+extern PGDLLIMPORT const CopyToFormatOps CopyToFormatOpsText;
+extern PGDLLIMPORT const CopyToFormatOps CopyToFormatOpsCSV;
+extern PGDLLIMPORT const CopyToFormatOps CopyToFormatOpsBinary;
+
/*
* A struct to hold COPY options, in a parsed form. All of these are related
* to formatting, except for 'freeze', which doesn't really belong here, but
@@ -63,12 +85,9 @@ typedef struct CopyFormatOptions
bool *force_null_flags; /* per-column CSV FN flags */
bool convert_selectively; /* do selective binary conversion? */
List *convert_select; /* list of column names (can be NIL) */
+ CopyToFormatOps to_ops; /* how to format to */
} CopyFormatOptions;
-/* These are private in commands/copy[from|to].c */
-typedef struct CopyFromStateData *CopyFromState;
-typedef struct CopyToStateData *CopyToState;
-
typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
typedef void (*copy_data_dest_cb) (void *data, int len);
--
2.40.1
Sutou Kouhei wrote:
> * 2022-04: Apache Arrow [2]
> * 2018-02: Apache Avro, Apache Parquet and Apache ORC [3]
> (FYI: I want to add support for Apache Arrow.)
> There were discussions how to add support for more formats. [3][4]
> In these discussions, we got a consensus about making COPY
> format extendable.
These formats all seem to be column-oriented, whereas COPY is row-oriented
at the protocol level [1].
With regard to the protocol, how would it work to support these formats?
[1]: https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY
Best regards,
--
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
On Wed, Dec 6, 2023 at 3:28 PM Sutou Kouhei <kou@clear-code.com> wrote:
Hi,
In <CAEG8a3K9dE2gt3+K+h=DwTqMenR84aeYuYS+cty3SR3LAeDBAQ@mail.gmail.com>
"Re: Make COPY format extendable: Extract COPY TO format implementations" on Wed, 6 Dec 2023 15:11:34 +0800,
Junwang Zhao <zhjwpku@gmail.com> wrote:
> For the extra curly braces, I mean the following code block in
> CopyToFormatBinaryStart:
>
> + { <-- I thought this is useless?
> + /* Generate header for a binary copy */
> + int32 tmp;
> +
> + /* Signature */
> + CopySendData(cstate, BinarySignature, 11);
> + /* Flags field */
> + tmp = 0;
> + CopySendInt32(cstate, tmp);
> + /* No header extension */
> + tmp = 0;
> + CopySendInt32(cstate, tmp);
> + }
Oh, I see. I've removed them and attached the v3 patch. In general,
I don't change variable names and so on in this patch; I just
move code. But I also removed the "tmp" variable in this case
because I think that the name isn't suitable for a larger
scope. (I think that "tmp" is acceptable in a small scope like
the above code.)
New code:
/* Generate header for a binary copy */
/* Signature */
CopySendData(cstate, BinarySignature, 11);
/* Flags field */
CopySendInt32(cstate, 0);
/* No header extension */
CopySendInt32(cstate, 0);
Thanks,
--
kou
Hi Kou,
I read the thread [1] you posted and I think Andres's suggestion sounds great.
Should we extract both *copy to* and *copy from* in the first step? In that
case, we can add the pg_copy_handler catalog smoothly later.
The attached v4 adds 'extract copy from', and it passes Cirrus CI;
please take a look.
I added a hook *copy_from_end* but this might be removed later if not used.
[1]: /messages/by-id/20180211211235.5x3jywe5z3lkgcsr@alap3.anarazel.de
--
Regards
Junwang Zhao
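
The handler approach described above amounts to a table of function pointers chosen by format name. The following is a minimal, self-contained sketch of that pattern and of one way an optional hook such as copy_from_end could be treated when it is left NULL. Every Toy* and toy_* name is hypothetical and stands in for the real state and callback types; unlike the attached patch, which stores the ops struct by value in the options, this sketch passes a pointer to a constant table.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Toy stand-in for the COPY state; the real structs are private. */
typedef struct ToyCopyState
{
    int         rows_sent;
    int         trailer_sent;
} ToyCopyState;

/* Callback table, loosely modelled on the idea of CopyHandlerOps. */
typedef struct ToyCopyOps
{
    void        (*start) (ToyCopyState *st);
    void        (*one_row) (ToyCopyState *st);
    void        (*end) (ToyCopyState *st);  /* optional, may be NULL */
} ToyCopyOps;

static void
toy_start(ToyCopyState *st)
{
    st->rows_sent = 0;
    st->trailer_sent = 0;
}

static void
toy_one_row(ToyCopyState *st)
{
    st->rows_sent++;
}

static void
toy_binary_end(ToyCopyState *st)
{
    st->trailer_sent = 1;
}

static const ToyCopyOps toy_text_ops = {
    .start = toy_start,
    .one_row = toy_one_row,
    .end = NULL,                /* text needs no trailer */
};

static const ToyCopyOps toy_binary_ops = {
    .start = toy_start,
    .one_row = toy_one_row,
    .end = toy_binary_end,
};

/* Map a format name to its callback table; NULL for unknown formats. */
const ToyCopyOps *
toy_lookup_ops(const char *fmt)
{
    assert(fmt != NULL);

    if (strcmp(fmt, "text") == 0)
        return &toy_text_ops;
    if (strcmp(fmt, "binary") == 0)
        return &toy_binary_ops;
    return NULL;
}

/* Drive a whole copy through the table, skipping an unset end hook. */
void
toy_copy_to(const ToyCopyOps *ops, ToyCopyState *st, int nrows)
{
    ops->start(st);
    for (int i = 0; i < nrows; i++)
        ops->one_row(st);
    if (ops->end != NULL)
        ops->end(st);
}
```

Checking for NULL at the call site is one option; the alternative used for copy_to_end in the patch is to require every format to supply a function, even an empty one.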
Attachments:
v4-0001-Extract-COPY-handlers.patch (application/octet-stream)
From df9e25b8517ab5dc6dd9f73ed7ad91fc20b1f938 Mon Sep 17 00:00:00 2001
From: Zhao Junwang <zhjwpku@gmail.com>
Date: Wed, 6 Dec 2023 19:13:22 +0800
Subject: [PATCH v4] Extract COPY handlers
---
src/backend/commands/copy.c | 44 ++++
src/backend/commands/copyfrom.c | 275 ++++++++++++---------
src/backend/commands/copyfromparse.c | 309 ++++++++++++-----------
src/backend/commands/copyto.c | 354 +++++++++++++++------------
src/include/commands/copy.h | 49 +++-
5 files changed, 619 insertions(+), 412 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index cfad47b562..6ae904c1b8 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -427,6 +427,8 @@ ProcessCopyOptions(ParseState *pstate,
opts_out->file_encoding = -1;
+ /* Text is the default format. */
+ opts_out->handler = CopyHandlerOpsText;
/* Extract options from the statement node tree */
foreach(option, options)
{
@@ -442,9 +444,15 @@ ProcessCopyOptions(ParseState *pstate,
if (strcmp(fmt, "text") == 0)
/* default format */ ;
else if (strcmp(fmt, "csv") == 0)
+ {
opts_out->csv_mode = true;
+ opts_out->handler = CopyHandlerOpsCSV;
+ }
else if (strcmp(fmt, "binary") == 0)
+ {
opts_out->binary = true;
+ opts_out->handler = CopyHandlerOpsBinary;
+ }
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -864,3 +872,39 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
return attnums;
}
+
+const CopyHandlerOps CopyHandlerOpsText = {
+ .copy_to_start = CopyToFormatTextStart,
+ .copy_to_one_row = CopyToFormatTextOneRow,
+ .copy_to_end = CopyToFormatTextEnd,
+ .copy_from_start = CopyFromFormatTextStart,
+ .copy_from_next = CopyFromFormatTextNext,
+ .copy_from_error_callback = CopyFromFormatTextErrorCallback,
+ .copy_from_end = NULL,
+};
+
+/*
+ * We can use the same CopyHandlerOps for both of "text" and "csv" because
+ * CopyToFormatText*() refer cstate->opts.csv_mode and change their
+ * behavior. We can split the implementations and stop referring
+ * cstate->opts.csv_mode later.
+ */
+const CopyHandlerOps CopyHandlerOpsCSV = {
+ .copy_to_start = CopyToFormatTextStart,
+ .copy_to_one_row = CopyToFormatTextOneRow,
+ .copy_to_end = CopyToFormatTextEnd,
+ .copy_from_start = CopyFromFormatTextStart,
+ .copy_from_next = CopyFromFormatTextNext,
+ .copy_from_error_callback = CopyFromFormatTextErrorCallback,
+ .copy_from_end = NULL,
+};
+
+const CopyHandlerOps CopyHandlerOpsBinary = {
+ .copy_to_start = CopyToFormatBinaryStart,
+ .copy_to_one_row = CopyToFormatBinaryOneRow,
+ .copy_to_end = CopyToFormatBinaryEnd,
+ .copy_from_start = CopyFromFormatBinaryStart,
+ .copy_from_next = CopyFromFormatBinaryNext,
+ .copy_from_error_callback = CopyFromFormatBinaryErrorCallback,
+ .copy_from_end = NULL,
+};
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index f4861652a9..01c3c1c84f 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -107,6 +107,71 @@ static char *limit_printout_length(const char *str);
static void ClosePipeFromProgram(CopyFromState cstate);
+void
+CopyFromFormatBinaryErrorCallback(CopyFromState cstate)
+{
+ /* can't usefully display the data */
+ if (cstate->cur_attname)
+ errcontext("COPY %s, line %llu, column %s",
+ cstate->cur_relname,
+ (unsigned long long) cstate->cur_lineno,
+ cstate->cur_attname);
+ else
+ errcontext("COPY %s, line %llu",
+ cstate->cur_relname,
+ (unsigned long long) cstate->cur_lineno);
+}
+
+void
+CopyFromFormatTextErrorCallback(CopyFromState cstate)
+{
+ if (cstate->cur_attname && cstate->cur_attval)
+ {
+ /* error is relevant to a particular column */
+ char *attval;
+
+ attval = limit_printout_length(cstate->cur_attval);
+ errcontext("COPY %s, line %llu, column %s: \"%s\"",
+ cstate->cur_relname,
+ (unsigned long long) cstate->cur_lineno,
+ cstate->cur_attname,
+ attval);
+ pfree(attval);
+ }
+ else if (cstate->cur_attname)
+ {
+ /* error is relevant to a particular column, value is NULL */
+ errcontext("COPY %s, line %llu, column %s: null input",
+ cstate->cur_relname,
+ (unsigned long long) cstate->cur_lineno,
+ cstate->cur_attname);
+ }
+ else
+ {
+ /*
+ * Error is relevant to a particular line.
+ *
+ * If line_buf still contains the correct line, print it.
+ */
+ if (cstate->line_buf_valid)
+ {
+ char *lineval;
+
+ lineval = limit_printout_length(cstate->line_buf.data);
+ errcontext("COPY %s, line %llu: \"%s\"",
+ cstate->cur_relname,
+ (unsigned long long) cstate->cur_lineno, lineval);
+ pfree(lineval);
+ }
+ else
+ {
+ errcontext("COPY %s, line %llu",
+ cstate->cur_relname,
+ (unsigned long long) cstate->cur_lineno);
+ }
+ }
+}
+
/*
* error context callback for COPY FROM
*
@@ -123,67 +188,7 @@ CopyFromErrorCallback(void *arg)
cstate->cur_relname);
return;
}
- if (cstate->opts.binary)
- {
- /* can't usefully display the data */
- if (cstate->cur_attname)
- errcontext("COPY %s, line %llu, column %s",
- cstate->cur_relname,
- (unsigned long long) cstate->cur_lineno,
- cstate->cur_attname);
- else
- errcontext("COPY %s, line %llu",
- cstate->cur_relname,
- (unsigned long long) cstate->cur_lineno);
- }
- else
- {
- if (cstate->cur_attname && cstate->cur_attval)
- {
- /* error is relevant to a particular column */
- char *attval;
-
- attval = limit_printout_length(cstate->cur_attval);
- errcontext("COPY %s, line %llu, column %s: \"%s\"",
- cstate->cur_relname,
- (unsigned long long) cstate->cur_lineno,
- cstate->cur_attname,
- attval);
- pfree(attval);
- }
- else if (cstate->cur_attname)
- {
- /* error is relevant to a particular column, value is NULL */
- errcontext("COPY %s, line %llu, column %s: null input",
- cstate->cur_relname,
- (unsigned long long) cstate->cur_lineno,
- cstate->cur_attname);
- }
- else
- {
- /*
- * Error is relevant to a particular line.
- *
- * If line_buf still contains the correct line, print it.
- */
- if (cstate->line_buf_valid)
- {
- char *lineval;
-
- lineval = limit_printout_length(cstate->line_buf.data);
- errcontext("COPY %s, line %llu: \"%s\"",
- cstate->cur_relname,
- (unsigned long long) cstate->cur_lineno, lineval);
- pfree(lineval);
- }
- else
- {
- errcontext("COPY %s, line %llu",
- cstate->cur_relname,
- (unsigned long long) cstate->cur_lineno);
- }
- }
- }
+ cstate->opts.handler.copy_from_error_callback(cstate);
}
/*
@@ -1320,6 +1325,101 @@ CopyFrom(CopyFromState cstate)
return processed;
}
+void
+CopyFromFormatBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+ FmgrInfo *in_functions;
+ Oid *typioparams;
+ Oid in_func_oid;
+ AttrNumber num_phys_attrs;
+
+ /*
+ * Pick up the required catalog information for each attribute in the
+ * relation, including the input function, the element type (to pass to
+ * the input function), and info about defaults and constraints. (Which
+ * input function we use depends on text/binary format choice.)
+ */
+ num_phys_attrs = tupDesc->natts;
+ in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
+
+ for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
+ {
+ Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
+
+ /* We don't need info for dropped attributes */
+ if (att->attisdropped)
+ continue;
+
+ /* Fetch the input function and typioparam info */
+ getTypeBinaryInputInfo(att->atttypid,
+ &in_func_oid, &typioparams[attnum - 1]);
+
+ fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+ }
+ cstate->in_functions = in_functions;
+ cstate->typioparams = typioparams;
+}
+
+void
+CopyFromFormatTextStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+ FmgrInfo *in_functions;
+ Oid *typioparams;
+ Oid in_func_oid;
+ AttrNumber attr_count,
+ num_phys_attrs;
+
+ num_phys_attrs = tupDesc->natts;
+
+ /*
+ * If encoding conversion is needed, we need another buffer to hold
+ * the converted input data. Otherwise, we can just point input_buf
+ * to the same buffer as raw_buf.
+ */
+ if (cstate->need_transcoding)
+ {
+ cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
+ cstate->input_buf_index = cstate->input_buf_len = 0;
+ }
+ else
+ cstate->input_buf = cstate->raw_buf;
+ cstate->input_reached_eof = false;
+
+ initStringInfo(&cstate->line_buf);
+
+ /* create workspace for CopyReadAttributes results */
+ attr_count = list_length(cstate->attnumlist);
+
+ cstate->max_fields = attr_count;
+ cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
+
+ /*
+ * Pick up the required catalog information for each attribute in the
+ * relation, including the input function, the element type (to pass to
+ * the input function), and info about defaults and constraints. (Which
+ * input function we use depends on text/binary format choice.)
+ */
+ in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
+
+ for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
+ {
+ Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
+
+ /* We don't need info for dropped attributes */
+ if (att->attisdropped)
+ continue;
+
+ /* Fetch the input function and typioparam info */
+ getTypeInputInfo(att->atttypid,
+ &in_func_oid, &typioparams[attnum - 1]);
+ fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+ }
+ cstate->in_functions = in_functions;
+ cstate->typioparams = typioparams;
+}
+
/*
* Setup to read tuples from a file for COPY FROM.
*
@@ -1348,9 +1448,6 @@ BeginCopyFrom(ParseState *pstate,
TupleDesc tupDesc;
AttrNumber num_phys_attrs,
num_defaults;
- FmgrInfo *in_functions;
- Oid *typioparams;
- Oid in_func_oid;
int *defmap;
ExprState **defexprs;
MemoryContext oldcontext;
@@ -1518,25 +1615,6 @@ BeginCopyFrom(ParseState *pstate,
cstate->raw_buf_index = cstate->raw_buf_len = 0;
cstate->raw_reached_eof = false;
- if (!cstate->opts.binary)
- {
- /*
- * If encoding conversion is needed, we need another buffer to hold
- * the converted input data. Otherwise, we can just point input_buf
- * to the same buffer as raw_buf.
- */
- if (cstate->need_transcoding)
- {
- cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
- cstate->input_buf_index = cstate->input_buf_len = 0;
- }
- else
- cstate->input_buf = cstate->raw_buf;
- cstate->input_reached_eof = false;
-
- initStringInfo(&cstate->line_buf);
- }
-
initStringInfo(&cstate->attribute_buf);
/* Assign range table and rteperminfos, we'll need them in CopyFrom. */
@@ -1546,17 +1624,10 @@ BeginCopyFrom(ParseState *pstate,
cstate->rteperminfos = pstate->p_rteperminfos;
}
+ cstate->opts.handler.copy_from_start(cstate, tupDesc);
+
num_defaults = 0;
volatile_defexprs = false;
-
- /*
- * Pick up the required catalog information for each attribute in the
- * relation, including the input function, the element type (to pass to
- * the input function), and info about defaults and constraints. (Which
- * input function we use depends on text/binary format choice.)
- */
- in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
- typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
defmap = (int *) palloc(num_phys_attrs * sizeof(int));
defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
@@ -1568,15 +1639,6 @@ BeginCopyFrom(ParseState *pstate,
if (att->attisdropped)
continue;
- /* Fetch the input function and typioparam info */
- if (cstate->opts.binary)
- getTypeBinaryInputInfo(att->atttypid,
- &in_func_oid, &typioparams[attnum - 1]);
- else
- getTypeInputInfo(att->atttypid,
- &in_func_oid, &typioparams[attnum - 1]);
- fmgr_info(in_func_oid, &in_functions[attnum - 1]);
-
/* Get default info if available */
defexprs[attnum - 1] = NULL;
@@ -1636,8 +1698,6 @@ BeginCopyFrom(ParseState *pstate,
cstate->bytes_processed = 0;
/* We keep those variables in cstate. */
- cstate->in_functions = in_functions;
- cstate->typioparams = typioparams;
cstate->defmap = defmap;
cstate->defexprs = defexprs;
cstate->volatile_defexprs = volatile_defexprs;
@@ -1716,15 +1776,6 @@ BeginCopyFrom(ParseState *pstate,
ReceiveCopyBinaryHeader(cstate);
}
- /* create workspace for CopyReadAttributes results */
- if (!cstate->opts.binary)
- {
- AttrNumber attr_count = list_length(cstate->attnumlist);
-
- cstate->max_fields = attr_count;
- cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
- }
-
MemoryContextSwitchTo(oldcontext);
return cstate;
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index f553734582..e840ebb108 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -839,187 +839,208 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
return true;
}
-/*
- * Read next tuple from file for COPY FROM. Return false if no more tuples.
- *
- * 'econtext' is used to evaluate default expression for each column that is
- * either not read from the file or is using the DEFAULT option of COPY FROM.
- * It can be NULL when no default values are used, i.e. when all columns are
- * read from the file, and DEFAULT option is unset.
- *
- * 'values' and 'nulls' arrays must be the same length as columns of the
- * relation passed to BeginCopyFrom. This function fills the arrays.
- */
bool
-NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
- Datum *values, bool *nulls)
+CopyFromFormatBinaryNext(CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls)
{
TupleDesc tupDesc;
- AttrNumber num_phys_attrs,
- attr_count,
- num_defaults = cstate->num_defaults;
+ AttrNumber attr_count;
+ int16 fld_count;
+ ListCell *cur;
FmgrInfo *in_functions = cstate->in_functions;
Oid *typioparams = cstate->typioparams;
- int i;
- int *defmap = cstate->defmap;
- ExprState **defexprs = cstate->defexprs;
- tupDesc = RelationGetDescr(cstate->rel);
- num_phys_attrs = tupDesc->natts;
attr_count = list_length(cstate->attnumlist);
- /* Initialize all values for row to NULL */
- MemSet(values, 0, num_phys_attrs * sizeof(Datum));
- MemSet(nulls, true, num_phys_attrs * sizeof(bool));
- MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
+ cstate->cur_lineno++;
- if (!cstate->opts.binary)
+ if (!CopyGetInt16(cstate, &fld_count))
{
- char **field_strings;
- ListCell *cur;
- int fldct;
- int fieldno;
- char *string;
+ /* EOF detected (end of file, or protocol-level EOF) */
+ return false;
+ }
- /* read raw fields in the next line */
- if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
- return false;
+ if (fld_count == -1)
+ {
+ /*
+ * Received EOF marker. Wait for the protocol-level EOF, and
+ * complain if it doesn't come immediately. In COPY FROM STDIN,
+ * this ensures that we correctly handle CopyFail, if client
+ * chooses to send that now. When copying from file, we could
+ * ignore the rest of the file like in text mode, but we choose to
+ * be consistent with the COPY FROM STDIN case.
+ */
+ char dummy;
- /* check for overflowing fields */
- if (attr_count > 0 && fldct > attr_count)
+ if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("extra data after last expected column")));
+ errmsg("received copy data after EOF marker")));
+ return false;
+ }
- fieldno = 0;
+ if (fld_count != attr_count)
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("row field count is %d, expected %d",
+ (int) fld_count, attr_count)));
- /* Loop to read the user attributes on the line. */
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- int m = attnum - 1;
- Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+ tupDesc = RelationGetDescr(cstate->rel);
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ int m = attnum - 1;
+ Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+ cstate->cur_attname = NameStr(att->attname);
+ values[m] = CopyReadBinaryAttribute(cstate,
+ &in_functions[m],
+ typioparams[m],
+ att->atttypmod,
+ &nulls[m]);
+ cstate->cur_attname = NULL;
+ }
- if (fieldno >= fldct)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("missing data for column \"%s\"",
- NameStr(att->attname))));
- string = field_strings[fieldno++];
+ return true;
+}
- if (cstate->convert_select_flags &&
- !cstate->convert_select_flags[m])
- {
- /* ignore input field, leaving column as NULL */
- continue;
- }
+bool
+CopyFromFormatTextNext(CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls)
+{
+ TupleDesc tupDesc;
+ AttrNumber attr_count;
+ FmgrInfo *in_functions = cstate->in_functions;
+ Oid *typioparams = cstate->typioparams;
+ ExprState **defexprs = cstate->defexprs;
+ char **field_strings;
+ ListCell *cur;
+ int fldct;
+ int fieldno = 0;
+ char *string;
- if (cstate->opts.csv_mode)
- {
- if (string == NULL &&
- cstate->opts.force_notnull_flags[m])
- {
- /*
- * FORCE_NOT_NULL option is set and column is NULL -
- * convert it to the NULL string.
- */
- string = cstate->opts.null_print;
- }
- else if (string != NULL && cstate->opts.force_null_flags[m]
- && strcmp(string, cstate->opts.null_print) == 0)
- {
- /*
- * FORCE_NULL option is set and column matches the NULL
- * string. It must have been quoted, or otherwise the
- * string would already have been set to NULL. Convert it
- * to NULL as specified.
- */
- string = NULL;
- }
- }
+ attr_count = list_length(cstate->attnumlist);
+
+ /* read raw fields in the next line */
+ if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
+ return false;
+
+ /* check for overflowing fields */
+ if (attr_count > 0 && fldct > attr_count)
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("extra data after last expected column")));
- cstate->cur_attname = NameStr(att->attname);
- cstate->cur_attval = string;
+ tupDesc = RelationGetDescr(cstate->rel);
+ /* Loop to read the user attributes on the line. */
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ int m = attnum - 1;
+ Form_pg_attribute att = TupleDescAttr(tupDesc, m);
- if (string != NULL)
- nulls[m] = false;
+ if (fieldno >= fldct)
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("missing data for column \"%s\"",
+ NameStr(att->attname))));
+ string = field_strings[fieldno++];
- if (cstate->defaults[m])
+ if (cstate->convert_select_flags &&
+ !cstate->convert_select_flags[m])
+ {
+ /* ignore input field, leaving column as NULL */
+ continue;
+ }
+
+ if (cstate->opts.csv_mode)
+ {
+ if (string == NULL &&
+ cstate->opts.force_notnull_flags[m])
{
/*
- * The caller must supply econtext and have switched into the
- * per-tuple memory context in it.
+ * FORCE_NOT_NULL option is set and column is NULL -
+ * convert it to the NULL string.
*/
- Assert(econtext != NULL);
- Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
-
- values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
+ string = cstate->opts.null_print;
+ }
+ else if (string != NULL && cstate->opts.force_null_flags[m]
+ && strcmp(string, cstate->opts.null_print) == 0)
+ {
+ /*
+ * FORCE_NULL option is set and column matches the NULL
+ * string. It must have been quoted, or otherwise the
+ * string would already have been set to NULL. Convert it
+ * to NULL as specified.
+ */
+ string = NULL;
}
- else
- values[m] = InputFunctionCall(&in_functions[m],
- string,
- typioparams[m],
- att->atttypmod);
-
- cstate->cur_attname = NULL;
- cstate->cur_attval = NULL;
}
- Assert(fieldno == attr_count);
- }
- else
- {
- /* binary */
- int16 fld_count;
- ListCell *cur;
+ cstate->cur_attname = NameStr(att->attname);
+ cstate->cur_attval = string;
- cstate->cur_lineno++;
+ if (string != NULL)
+ nulls[m] = false;
- if (!CopyGetInt16(cstate, &fld_count))
- {
- /* EOF detected (end of file, or protocol-level EOF) */
- return false;
- }
-
- if (fld_count == -1)
+ if (cstate->defaults[m])
{
/*
- * Received EOF marker. Wait for the protocol-level EOF, and
- * complain if it doesn't come immediately. In COPY FROM STDIN,
- * this ensures that we correctly handle CopyFail, if client
- * chooses to send that now. When copying from file, we could
- * ignore the rest of the file like in text mode, but we choose to
- * be consistent with the COPY FROM STDIN case.
+ * The caller must supply econtext and have switched into the
+ * per-tuple memory context in it.
*/
- char dummy;
+ Assert(econtext != NULL);
+ Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
- if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("received copy data after EOF marker")));
- return false;
+ values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
}
+ else
+ values[m] = InputFunctionCall(&in_functions[m],
+ string,
+ typioparams[m],
+ att->atttypmod);
- if (fld_count != attr_count)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("row field count is %d, expected %d",
- (int) fld_count, attr_count)));
+ cstate->cur_attname = NULL;
+ cstate->cur_attval = NULL;
+ }
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- int m = attnum - 1;
- Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
- cstate->cur_attname = NameStr(att->attname);
- values[m] = CopyReadBinaryAttribute(cstate,
- &in_functions[m],
- typioparams[m],
- att->atttypmod,
- &nulls[m]);
- cstate->cur_attname = NULL;
- }
+ Assert(fieldno == attr_count);
+ return true;
+}
+
+/*
+ * Read next tuple from file for COPY FROM. Return false if no more tuples.
+ *
+ * 'econtext' is used to evaluate default expression for each column that is
+ * either not read from the file or is using the DEFAULT option of COPY FROM.
+ * It can be NULL when no default values are used, i.e. when all columns are
+ * read from the file, and DEFAULT option is unset.
+ *
+ * 'values' and 'nulls' arrays must be the same length as columns of the
+ * relation passed to BeginCopyFrom. This function fills the arrays.
+ */
+bool
+NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls)
+{
+ TupleDesc tupDesc;
+ AttrNumber num_phys_attrs,
+ num_defaults = cstate->num_defaults;
+ int i;
+ int *defmap = cstate->defmap;
+ ExprState **defexprs = cstate->defexprs;
+
+ tupDesc = RelationGetDescr(cstate->rel);
+ num_phys_attrs = tupDesc->natts;
+
+ /* Initialize all values for row to NULL */
+ MemSet(values, 0, num_phys_attrs * sizeof(Datum));
+ MemSet(nulls, true, num_phys_attrs * sizeof(bool));
+ MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
+
+ if (!cstate->opts.handler.copy_from_next(cstate, econtext, values, nulls))
+ {
+ return false;
}
/*
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index c66a047c4a..4538bc6292 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -131,6 +131,205 @@ static void CopySendEndOfRow(CopyToState cstate);
static void CopySendInt32(CopyToState cstate, int32 val);
static void CopySendInt16(CopyToState cstate, int16 val);
+/*
+ * CopyHandlerOps implementation of COPY TO for "text" and "csv".
+ * CopyToFormatText*() refer cstate->opts.csv_mode and change their behavior.
+ * We can split this implementation and stop referring cstate->opts.csv_mode
+ * later.
+ */
+
+static void
+CopyToFormatTextSendEndOfRow(CopyToState cstate)
+{
+ switch (cstate->copy_dest)
+ {
+ case COPY_FILE:
+ /* Default line termination depends on platform */
+#ifndef WIN32
+ CopySendChar(cstate, '\n');
+#else
+ CopySendString(cstate, "\r\n");
+#endif
+ break;
+ case COPY_FRONTEND:
+ /* The FE/BE protocol uses \n as newline for all platforms */
+ CopySendChar(cstate, '\n');
+ break;
+ default:
+ break;
+ }
+ CopySendEndOfRow(cstate);
+}
+
+void
+CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc)
+{
+ int num_phys_attrs;
+ ListCell *cur;
+
+ num_phys_attrs = tupDesc->natts;
+ /* Get info about the columns we need to process. */
+ cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Oid out_func_oid;
+ bool isvarlena;
+ Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+ getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+ fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+ }
+
+ /*
+ * For non-binary copy, we need to convert null_print to file
+ * encoding, because it will be sent directly with CopySendString.
+ */
+ if (cstate->need_transcoding)
+ cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+ cstate->opts.null_print_len,
+ cstate->file_encoding);
+
+ /* if a header has been requested send the line */
+ if (cstate->opts.header_line)
+ {
+ bool hdr_delim = false;
+
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ char *colname;
+
+ if (hdr_delim)
+ CopySendChar(cstate, cstate->opts.delim[0]);
+ hdr_delim = true;
+
+ colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
+
+ if (cstate->opts.csv_mode)
+ CopyAttributeOutCSV(cstate, colname, false,
+ list_length(cstate->attnumlist) == 1);
+ else
+ CopyAttributeOutText(cstate, colname);
+ }
+
+ CopyToFormatTextSendEndOfRow(cstate);
+ }
+}
+
+void
+CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+ bool need_delim = false;
+ FmgrInfo *out_functions = cstate->out_functions;
+ ListCell *cur;
+
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Datum value = slot->tts_values[attnum - 1];
+ bool isnull = slot->tts_isnull[attnum - 1];
+
+ if (need_delim)
+ CopySendChar(cstate, cstate->opts.delim[0]);
+ need_delim = true;
+
+ if (isnull)
+ CopySendString(cstate, cstate->opts.null_print_client);
+ else
+ {
+ char *string;
+
+ string = OutputFunctionCall(&out_functions[attnum - 1], value);
+ if (cstate->opts.csv_mode)
+ CopyAttributeOutCSV(cstate, string,
+ cstate->opts.force_quote_flags[attnum - 1],
+ list_length(cstate->attnumlist) == 1);
+ else
+ CopyAttributeOutText(cstate, string);
+ }
+ }
+
+ CopyToFormatTextSendEndOfRow(cstate);
+}
+
+void
+CopyToFormatTextEnd(CopyToState cstate)
+{
+}
+
+/*
+ * CopyHandlerOps implementation for "binary" COPY TO.
+ */
+
+void
+CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc)
+{
+ int num_phys_attrs;
+ ListCell *cur;
+
+ num_phys_attrs = tupDesc->natts;
+ /* Get info about the columns we need to process. */
+ cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Oid out_func_oid;
+ bool isvarlena;
+ Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+ getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+ fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+ }
+
+ /* Generate header for a binary copy */
+ /* Signature */
+ CopySendData(cstate, BinarySignature, 11);
+ /* Flags field */
+ CopySendInt32(cstate, 0);
+ /* No header extension */
+ CopySendInt32(cstate, 0);
+}
+
+void
+CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+ FmgrInfo *out_functions = cstate->out_functions;
+ ListCell *cur;
+
+ /* Binary per-tuple header */
+ CopySendInt16(cstate, list_length(cstate->attnumlist));
+
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Datum value = slot->tts_values[attnum - 1];
+ bool isnull = slot->tts_isnull[attnum - 1];
+
+ if (isnull)
+ CopySendInt32(cstate, -1);
+ else
+ {
+ bytea *outputbytes;
+
+ outputbytes = SendFunctionCall(&out_functions[attnum - 1], value);
+ CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+ CopySendData(cstate, VARDATA(outputbytes),
+ VARSIZE(outputbytes) - VARHDRSZ);
+ }
+ }
+
+ CopySendEndOfRow(cstate);
+}
+
+void
+CopyToFormatBinaryEnd(CopyToState cstate)
+{
+ /* Generate trailer for a binary copy */
+ CopySendInt16(cstate, -1);
+ /* Need to flush out the trailer */
+ CopySendEndOfRow(cstate);
+}
/*
* Send copy start/stop messages for frontend copies. These have changed
@@ -198,16 +397,6 @@ CopySendEndOfRow(CopyToState cstate)
switch (cstate->copy_dest)
{
case COPY_FILE:
- if (!cstate->opts.binary)
- {
- /* Default line termination depends on platform */
-#ifndef WIN32
- CopySendChar(cstate, '\n');
-#else
- CopySendString(cstate, "\r\n");
-#endif
- }
-
if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
cstate->copy_file) != 1 ||
ferror(cstate->copy_file))
@@ -242,10 +431,6 @@ CopySendEndOfRow(CopyToState cstate)
}
break;
case COPY_FRONTEND:
- /* The FE/BE protocol uses \n as newline for all platforms */
- if (!cstate->opts.binary)
- CopySendChar(cstate, '\n');
-
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
break;
@@ -748,8 +933,6 @@ DoCopyTo(CopyToState cstate)
bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
bool fe_copy = (pipe && whereToSendOutput == DestRemote);
TupleDesc tupDesc;
- int num_phys_attrs;
- ListCell *cur;
uint64 processed;
if (fe_copy)
@@ -759,32 +942,11 @@ DoCopyTo(CopyToState cstate)
tupDesc = RelationGetDescr(cstate->rel);
else
tupDesc = cstate->queryDesc->tupDesc;
- num_phys_attrs = tupDesc->natts;
cstate->opts.null_print_client = cstate->opts.null_print; /* default */
/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
cstate->fe_msgbuf = makeStringInfo();
- /* Get info about the columns we need to process. */
- cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- Oid out_func_oid;
- bool isvarlena;
- Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
-
- if (cstate->opts.binary)
- getTypeBinaryOutputInfo(attr->atttypid,
- &out_func_oid,
- &isvarlena);
- else
- getTypeOutputInfo(attr->atttypid,
- &out_func_oid,
- &isvarlena);
- fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
- }
-
/*
* Create a temporary memory context that we can reset once per row to
* recover palloc'd memory. This avoids any problems with leaks inside
@@ -795,57 +957,7 @@ DoCopyTo(CopyToState cstate)
"COPY TO",
ALLOCSET_DEFAULT_SIZES);
- if (cstate->opts.binary)
- {
- /* Generate header for a binary copy */
- int32 tmp;
-
- /* Signature */
- CopySendData(cstate, BinarySignature, 11);
- /* Flags field */
- tmp = 0;
- CopySendInt32(cstate, tmp);
- /* No header extension */
- tmp = 0;
- CopySendInt32(cstate, tmp);
- }
- else
- {
- /*
- * For non-binary copy, we need to convert null_print to file
- * encoding, because it will be sent directly with CopySendString.
- */
- if (cstate->need_transcoding)
- cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
- cstate->opts.null_print_len,
- cstate->file_encoding);
-
- /* if a header has been requested send the line */
- if (cstate->opts.header_line)
- {
- bool hdr_delim = false;
-
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- char *colname;
-
- if (hdr_delim)
- CopySendChar(cstate, cstate->opts.delim[0]);
- hdr_delim = true;
-
- colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
-
- if (cstate->opts.csv_mode)
- CopyAttributeOutCSV(cstate, colname, false,
- list_length(cstate->attnumlist) == 1);
- else
- CopyAttributeOutText(cstate, colname);
- }
-
- CopySendEndOfRow(cstate);
- }
- }
+ cstate->opts.handler.copy_to_start(cstate, tupDesc);
if (cstate->rel)
{
@@ -884,13 +996,7 @@ DoCopyTo(CopyToState cstate)
processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
}
- if (cstate->opts.binary)
- {
- /* Generate trailer for a binary copy */
- CopySendInt16(cstate, -1);
- /* Need to flush out the trailer */
- CopySendEndOfRow(cstate);
- }
+ cstate->opts.handler.copy_to_end(cstate);
MemoryContextDelete(cstate->rowcontext);
@@ -906,71 +1012,15 @@ DoCopyTo(CopyToState cstate)
static void
CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
{
- bool need_delim = false;
- FmgrInfo *out_functions = cstate->out_functions;
MemoryContext oldcontext;
- ListCell *cur;
- char *string;
MemoryContextReset(cstate->rowcontext);
oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
- if (cstate->opts.binary)
- {
- /* Binary per-tuple header */
- CopySendInt16(cstate, list_length(cstate->attnumlist));
- }
-
/* Make sure the tuple is fully deconstructed */
slot_getallattrs(slot);
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- Datum value = slot->tts_values[attnum - 1];
- bool isnull = slot->tts_isnull[attnum - 1];
-
- if (!cstate->opts.binary)
- {
- if (need_delim)
- CopySendChar(cstate, cstate->opts.delim[0]);
- need_delim = true;
- }
-
- if (isnull)
- {
- if (!cstate->opts.binary)
- CopySendString(cstate, cstate->opts.null_print_client);
- else
- CopySendInt32(cstate, -1);
- }
- else
- {
- if (!cstate->opts.binary)
- {
- string = OutputFunctionCall(&out_functions[attnum - 1],
- value);
- if (cstate->opts.csv_mode)
- CopyAttributeOutCSV(cstate, string,
- cstate->opts.force_quote_flags[attnum - 1],
- list_length(cstate->attnumlist) == 1);
- else
- CopyAttributeOutText(cstate, string);
- }
- else
- {
- bytea *outputbytes;
-
- outputbytes = SendFunctionCall(&out_functions[attnum - 1],
- value);
- CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
- CopySendData(cstate, VARDATA(outputbytes),
- VARSIZE(outputbytes) - VARHDRSZ);
- }
- }
- }
-
- CopySendEndOfRow(cstate);
+ cstate->opts.handler.copy_to_one_row(cstate, slot);
MemoryContextSwitchTo(oldcontext);
}
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index f2cca0b90b..5b3ffcd190 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -30,6 +30,34 @@ typedef enum CopyHeaderChoice
COPY_HEADER_MATCH,
} CopyHeaderChoice;
+/* These are private in commands/copy[from|to].c */
+typedef struct CopyFromStateData *CopyFromState;
+typedef struct CopyToStateData *CopyToState;
+
+/* Routines for a COPY HANDLER implementation. */
+typedef struct CopyHandlerOps
+{
+ /* Called when COPY TO is started. This will send a header. */
+ void (*copy_to_start) (CopyToState cstate, TupleDesc tupDesc);
+
+ /* Copy one row for COPY TO. */
+ void (*copy_to_one_row) (CopyToState cstate, TupleTableSlot *slot);
+
+ /* Called when COPY TO is ended. This will send a trailer. */
+ void (*copy_to_end) (CopyToState cstate);
+
+ void (*copy_from_start) (CopyFromState cstate, TupleDesc tupDesc);
+ bool (*copy_from_next) (CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls);
+ void (*copy_from_error_callback) (CopyFromState cstate);
+ void (*copy_from_end) (CopyFromState cstate);
+} CopyHandlerOps;
+
+/* Predefined CopyHandlerOps for "text", "csv" and "binary". */
+extern PGDLLIMPORT const CopyHandlerOps CopyHandlerOpsText;
+extern PGDLLIMPORT const CopyHandlerOps CopyHandlerOpsCSV;
+extern PGDLLIMPORT const CopyHandlerOps CopyHandlerOpsBinary;
+
/*
* A struct to hold COPY options, in a parsed form. All of these are related
* to formatting, except for 'freeze', which doesn't really belong here, but
@@ -63,12 +91,9 @@ typedef struct CopyFormatOptions
bool *force_null_flags; /* per-column CSV FN flags */
bool convert_selectively; /* do selective binary conversion? */
List *convert_select; /* list of column names (can be NIL) */
+ CopyHandlerOps handler; /* copy handler operations */
} CopyFormatOptions;
-/* These are private in commands/copy[from|to].c */
-typedef struct CopyFromStateData *CopyFromState;
-typedef struct CopyToStateData *CopyToState;
-
typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
typedef void (*copy_data_dest_cb) (void *data, int len);
@@ -102,4 +127,20 @@ extern uint64 DoCopyTo(CopyToState cstate);
extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
List *attnamelist);
+extern void CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc);
+extern void CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot);
+extern void CopyToFormatTextEnd(CopyToState cstate);
+extern void CopyFromFormatTextStart(CopyFromState cstate, TupleDesc tupDesc);
+extern bool CopyFromFormatTextNext(CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls);
+extern void CopyFromFormatTextErrorCallback(CopyFromState cstate);
+
+extern void CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc);
+extern void CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot);
+extern void CopyToFormatBinaryEnd(CopyToState cstate);
+extern void CopyFromFormatBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
+extern bool CopyFromFormatBinaryNext(CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls);
+extern void CopyFromFormatBinaryErrorCallback(CopyFromState cstate);
+
#endif /* COPY_H */
--
2.41.0
On Wed, Dec 6, 2023 at 8:32 PM Daniel Verite <daniel@manitou-mail.org> wrote:
Sutou Kouhei wrote:
* 2022-04: Apache Arrow [2]
* 2018-02: Apache Avro, Apache Parquet and Apache ORC [3]
(FYI: I want to add support for Apache Arrow.)
There were discussions how to add support for more formats. [3][4]
In these discussions, we got a consensus about making COPY
format extendable.
These formats seem all column-oriented whereas COPY is row-oriented
at the protocol level [1].
With regard to the protocol, how would it work to support these formats?
They have a kind of *RowGroup* concept: a bunch of rows goes into a RowBatch,
and the data of the same column is stored together.
I think they should fit the COPY semantics, and there are some FDWs out there for
these modern formats, like parquet_fdw [1]. If we support COPY for these
formats, it will be easier to interact with them (without creating a
server, user mapping, or foreign table).
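To make the row-versus-column question concrete, here is a minimal sketch of how a per-row callback could buffer rows into a column-major batch and emit the batch when it fills up or when the copy ends. Everything in it (DemoColumnBatch, demo_one_row, demo_end, the batch size of 2) is hypothetical and not taken from any patch in this thread; real formats such as Arrow or Parquet have their own batching rules.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_NCOLS 2			/* hypothetical: two int columns */
#define DEMO_BATCH_ROWS 2		/* hypothetical: rows per batch */

typedef struct DemoColumnBatch
{
	int			columns[DEMO_NCOLS][DEMO_BATCH_ROWS];	/* column-major */
	int			nrows;			/* rows currently buffered */
	int			nflushed;		/* batches emitted so far */
} DemoColumnBatch;

/* Emit the buffered batch, if any. A real handler would serialize here. */
static void
demo_flush(DemoColumnBatch *batch)
{
	if (batch->nrows == 0)
		return;
	batch->nflushed++;
	batch->nrows = 0;
}

/* Per-row callback: scatter one row into the per-column arrays. */
void
demo_one_row(DemoColumnBatch *batch, const int row[DEMO_NCOLS])
{
	assert(batch->nrows < DEMO_BATCH_ROWS);
	for (int col = 0; col < DEMO_NCOLS; col++)
		batch->columns[col][batch->nrows] = row[col];
	if (++batch->nrows == DEMO_BATCH_ROWS)
		demo_flush(batch);
}

/* End callback: emit whatever is left in a partially filled batch. */
void
demo_end(DemoColumnBatch *batch)
{
	demo_flush(batch);
}

/* Drive the callbacks for nrows rows and return the number of batches. */
int
demo_copy_rows(int nrows)
{
	DemoColumnBatch batch = {0};

	for (int i = 0; i < nrows; i++)
	{
		int			row[DEMO_NCOLS] = {i, i * 10};

		demo_one_row(&batch, row);
	}
	demo_end(&batch);
	return batch.nflushed;
}
```

In this sketch the caller stays row-oriented; only the handler knows about batches, which is why an end callback is needed to flush the final partial batch.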
[1]: https://github.com/adjust/parquet_fdw
[1] https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY
Best regards,
--
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
--
Regards
Junwang Zhao
On Wed, Dec 06, 2023 at 10:07:51PM +0800, Junwang Zhao wrote:
I read the thread[1] you posted and I think Andres's suggestion sounds great.
Should we extract both *copy to* and *copy from* for the first step, in that
case we can add the pg_copy_handler catalog smoothly later.
Attached V4 adds 'extract copy from' and it passed the cirrus ci,
please take a look.
I added a hook *copy_from_end* but this might be removed later if not used.
[1]: /messages/by-id/20180211211235.5x3jywe5z3lkgcsr@alap3.anarazel.de
I was looking at the differences between v3 posted by Sutou-san and
v4 from you, seeing that:
+/* Routines for a COPY HANDLER implementation. */
+typedef struct CopyHandlerOps
{
/* Called when COPY TO is started. This will send a header. */
- void (*start) (CopyToState cstate, TupleDesc tupDesc);
+ void (*copy_to_start) (CopyToState cstate, TupleDesc tupDesc);
/* Copy one row for COPY TO. */
- void (*one_row) (CopyToState cstate, TupleTableSlot *slot);
+ void (*copy_to_one_row) (CopyToState cstate, TupleTableSlot *slot);
/* Called when COPY TO is ended. This will send a trailer. */
- void (*end) (CopyToState cstate);
-} CopyToFormatOps;
+ void (*copy_to_end) (CopyToState cstate);
+
+ void (*copy_from_start) (CopyFromState cstate, TupleDesc tupDesc);
+ bool (*copy_from_next) (CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls);
+ void (*copy_from_error_callback) (CopyFromState cstate);
+ void (*copy_from_end) (CopyFromState cstate);
+} CopyHandlerOps;
And we've spent a good deal of time refactoring the copy code so that
the logic behind TO and FROM is split. Having a set of routines that
groups both does not look like a step in the right direction to me,
and v4 is an attempt at solving two problems, while v3 aims to improve
one case. It seems to me that each callback portion should be focused
on staying in its own area of the code, aka copyfrom*.c or copyto*.c.
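As a rough illustration of the split argued for here, the sketch below keeps one callback table per direction, so that a COPY TO driver never sees FROM callbacks and vice versa. All names (CopyToOps, CopyFromOps, the demo_* helpers) and the simplified state structs are hypothetical stand-ins, not the v3 or v4 definitions.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-ins for the real state structs (assumption). */
typedef struct CopyToStateData { int calls; } CopyToStateData;
typedef CopyToStateData *CopyToState;
typedef struct CopyFromStateData { int rows_left; } CopyFromStateData;
typedef CopyFromStateData *CopyFromState;

/* Hypothetical TO-only callbacks; would sit next to copyto.c. */
typedef struct CopyToOps
{
	void		(*start) (CopyToState cstate);
	void		(*one_row) (CopyToState cstate);
	void		(*end) (CopyToState cstate);
} CopyToOps;

/* Hypothetical FROM-only callbacks; would sit next to copyfrom.c. */
typedef struct CopyFromOps
{
	void		(*start) (CopyFromState cstate);
	int			(*next) (CopyFromState cstate);	/* 0 at end of input */
} CopyFromOps;

static void demo_to_step(CopyToState cstate) { cstate->calls++; }
static void demo_from_start(CopyFromState cstate) { (void) cstate; }
static int demo_from_next(CopyFromState cstate) { return cstate->rows_left-- > 0; }

static const CopyToOps DemoToOps = {demo_to_step, demo_to_step, demo_to_step};
static const CopyFromOps DemoFromOps = {demo_from_start, demo_from_next};

/* COPY TO driver: uses only the TO table. Returns callbacks invoked. */
int
demo_copy_to(int nrows)
{
	CopyToStateData state = {0};

	assert(nrows >= 0);
	DemoToOps.start(&state);
	for (int i = 0; i < nrows; i++)
		DemoToOps.one_row(&state);
	DemoToOps.end(&state);
	return state.calls;
}

/* COPY FROM driver: uses only the FROM table. Returns rows read. */
int
demo_copy_from(int nrows)
{
	CopyFromStateData state = {nrows};
	int			processed = 0;

	DemoFromOps.start(&state);
	while (DemoFromOps.next(&state))
		processed++;
	return processed;
}
```

Whether two tables or one combined table is preferable is exactly what the thread is debating; the sketch only shows that the two directions do not need to share a struct.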
--
Michael
Hi,
In <CAEG8a3LSRhK601Bn50u71BgfNWm4q3kv-o-KEq=hrbyLbY_EsA@mail.gmail.com>
"Re: Make COPY format extendable: Extract COPY TO format implementations" on Wed, 6 Dec 2023 22:07:51 +0800,
Junwang Zhao <zhjwpku@gmail.com> wrote:
Should we extract both *copy to* and *copy from* for the first step, in that
case we can add the pg_copy_handler catalog smoothly later.
I don't object to it (mixing TO/FROM changes into one patch), but
it may make review difficult. Is it acceptable?
FYI: I planned to implement the TO part, then the FROM part,
and then unify the TO/FROM parts if needed. [1]
Attached V4 adds 'extract copy from' and it passed the cirrus ci,
please take a look.
Thanks. Here are my comments:
+ /*
+ * Error is relevant to a particular line.
+ *
+ * If line_buf still contains the correct line, print it.
+ */
+ if (cstate->line_buf_valid)
We need to fix the indentation.
+CopyFromFormatBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+ FmgrInfo *in_functions;
+ Oid *typioparams;
+ Oid in_func_oid;
+ AttrNumber num_phys_attrs;
+
+ /*
+ * Pick up the required catalog information for each attribute in the
+ * relation, including the input function, the element type (to pass to
+ * the input function), and info about defaults and constraints. (Which
+ * input function we use depends on text/binary format choice.)
+ */
+ num_phys_attrs = tupDesc->natts;
+ in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
We need to update the comment because defaults and
constraints aren't picked up here.
+CopyFromFormatTextStart(CopyFromState cstate, TupleDesc tupDesc)
...
+ /*
+ * Pick up the required catalog information for each attribute in the
+ * relation, including the input function, the element type (to pass to
+ * the input function), and info about defaults and constraints. (Which
+ * input function we use depends on text/binary format choice.)
+ */
+ in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
ditto.
@@ -1716,15 +1776,6 @@ BeginCopyFrom(ParseState *pstate,
ReceiveCopyBinaryHeader(cstate);
}
I think that this block should be moved to
CopyFromFormatBinaryStart() too. But we need to run it after
we set up inputs such as data_source_cb, pipe and filename...
+/* Routines for a COPY HANDLER implementation. */
+typedef struct CopyHandlerOps
+{
+ /* Called when COPY TO is started. This will send a header. */
+ void (*copy_to_start) (CopyToState cstate, TupleDesc tupDesc);
+
+ /* Copy one row for COPY TO. */
+ void (*copy_to_one_row) (CopyToState cstate, TupleTableSlot *slot);
+
+ /* Called when COPY TO is ended. This will send a trailer. */
+ void (*copy_to_end) (CopyToState cstate);
+
+ void (*copy_from_start) (CopyFromState cstate, TupleDesc tupDesc);
+ bool (*copy_from_next) (CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls);
+ void (*copy_from_error_callback) (CopyFromState cstate);
+ void (*copy_from_end) (CopyFromState cstate);
+} CopyHandlerOps;
It seems that "copy_" prefix is redundant. Should we use
"to_start" instead of "copy_to_start" and so on?
BTW, it seems that "COPY FROM (FORMAT json)" may not be implemented. [2]
We may need to handle the case where the copy_from_* callbacks are NULL.
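One way to deal with an output-only format is to check the FROM callback for NULL before starting, and to refuse the command instead of dereferencing a NULL pointer. The sketch below is only an illustration: DemoCopyOps, the two demo tables and the return codes are hypothetical, and in the server the refusal would presumably be reported with ereport(ERROR, ...) rather than a return value.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical combined callback table; from_next may be NULL. */
typedef struct DemoCopyOps
{
	const char *name;
	int			(*to_one_row) (int value);
	int			(*from_next) (int *value);	/* NULL if FROM is unsupported */
} DemoCopyOps;

enum
{
	DEMO_OK = 0,
	DEMO_FROM_UNSUPPORTED = -1
};

static int demo_to_one_row(int value) { return value; }
static int demo_from_next(int *value) { *value = 42; return 1; }

/* "text" supports both directions; "json" here is output-only. */
static const DemoCopyOps DemoTextOps = {"text", demo_to_one_row, demo_from_next};
static const DemoCopyOps DemoJsonOps = {"json", demo_to_one_row, NULL};

/* Check for a missing FROM callback before any row is processed. */
int
demo_begin_copy_from(const DemoCopyOps *ops)
{
	assert(ops != NULL);
	if (ops->from_next == NULL)
		return DEMO_FROM_UNSUPPORTED;
	return DEMO_OK;
}
```

Doing the check once at the start keeps the per-row path free of NULL tests.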
I added a hook *copy_from_end* but this might be removed later if not used.
It may be useful to clean up resources for COPY FROM, but the
patch doesn't call copy_from_end. How about removing it
for now, since it isn't needed yet? We can add it and call
it from EndCopyFrom() later.
I think that we should focus on refactoring instead of
adding a new feature in this patch.
[1]: /messages/by-id/20231204.153548.2126325458835528809.kou@clear-code.com
[2]: /messages/by-id/CALvfUkBxTYy5uWPFVwpk_7ii2zgT07t3d-yR_cy4sfrrLU=kcg@mail.gmail.com
Thanks,
--
kou
On Thu, Dec 7, 2023 at 8:39 AM Michael Paquier <michael@paquier.xyz> wrote:
On Wed, Dec 06, 2023 at 10:07:51PM +0800, Junwang Zhao wrote:
I read the thread[1] you posted and I think Andres's suggestion sounds great.
Should we extract both *copy to* and *copy from* for the first step, in that
case we can add the pg_copy_handler catalog smoothly later.
Attached V4 adds 'extract copy from' and it passed the cirrus ci,
please take a look.
I added a hook *copy_from_end* but this might be removed later if not used.
[1]: /messages/by-id/20180211211235.5x3jywe5z3lkgcsr@alap3.anarazel.de
I was looking at the differences between v3 posted by Sutou-san and
v4 from you, seeing that:
+/* Routines for a COPY HANDLER implementation. */
+typedef struct CopyHandlerOps
{
/* Called when COPY TO is started. This will send a header. */
- void (*start) (CopyToState cstate, TupleDesc tupDesc);
+ void (*copy_to_start) (CopyToState cstate, TupleDesc tupDesc);
/* Copy one row for COPY TO. */
- void (*one_row) (CopyToState cstate, TupleTableSlot *slot);
+ void (*copy_to_one_row) (CopyToState cstate, TupleTableSlot *slot);
/* Called when COPY TO is ended. This will send a trailer. */
- void (*end) (CopyToState cstate);
-} CopyToFormatOps;
+ void (*copy_to_end) (CopyToState cstate);
+
+ void (*copy_from_start) (CopyFromState cstate, TupleDesc tupDesc);
+ bool (*copy_from_next) (CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls);
+ void (*copy_from_error_callback) (CopyFromState cstate);
+ void (*copy_from_end) (CopyFromState cstate);
+} CopyHandlerOps;
And we've spent a good deal of time refactoring the copy code so that
the logic behind TO and FROM is split. Having a set of routines that
groups both does not look like a step in the right direction to me,
The point of this refactor (from my view) is to make it possible to add new
copy handlers in extensions, just like access methods. Andres suggested
a system catalog like *pg_copy_handler*. If we split TO and FROM into two
sets of routines, does that mean we have to create two catalogs
(pg_copy_from_handler and pg_copy_to_handler)?
and v4 is an attempt at solving two problems, while v3 aims to improve
one case. It seems to me that each callback portion should be focused
on staying in its own area of the code, aka copyfrom*.c or copyto*.c.
--
Michael
--
Regards
Junwang Zhao
On Thu, Dec 7, 2023 at 1:05 PM Sutou Kouhei <kou@clear-code.com> wrote:
Hi,
In <CAEG8a3LSRhK601Bn50u71BgfNWm4q3kv-o-KEq=hrbyLbY_EsA@mail.gmail.com>
"Re: Make COPY format extendable: Extract COPY TO format implementations" on Wed, 6 Dec 2023 22:07:51 +0800,
Junwang Zhao <zhjwpku@gmail.com> wrote:
Should we extract both *copy to* and *copy from* for the first step, in that
case we can add the pg_copy_handler catalog smoothly later.
I don't object to it (mixing TO/FROM changes into one patch), but
it may make review difficult. Is it acceptable?
FYI: I planned to implement the TO part, then the FROM part,
and then unify the TO/FROM parts if needed. [1]
I'm fine with step-by-step refactoring; let's just wait for more
suggestions.
Attached V4 adds 'extract copy from' and it passed the cirrus ci,
please take a look.
Thanks. Here are my comments:
+ /*
+ * Error is relevant to a particular line.
+ *
+ * If line_buf still contains the correct line, print it.
+ */
+ if (cstate->line_buf_valid)
We need to fix the indentation.
+CopyFromFormatBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+ FmgrInfo *in_functions;
+ Oid *typioparams;
+ Oid in_func_oid;
+ AttrNumber num_phys_attrs;
+
+ /*
+ * Pick up the required catalog information for each attribute in the
+ * relation, including the input function, the element type (to pass to
+ * the input function), and info about defaults and constraints. (Which
+ * input function we use depends on text/binary format choice.)
+ */
+ num_phys_attrs = tupDesc->natts;
+ in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
We need to update the comment because defaults and
constraints aren't picked up here.
+CopyFromFormatTextStart(CopyFromState cstate, TupleDesc tupDesc)
...
+ /*
+ * Pick up the required catalog information for each attribute in the
+ * relation, including the input function, the element type (to pass to
+ * the input function), and info about defaults and constraints. (Which
+ * input function we use depends on text/binary format choice.)
+ */
+ in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
ditto.
@@ -1716,15 +1776,6 @@ BeginCopyFrom(ParseState *pstate,
ReceiveCopyBinaryHeader(cstate);
}
I think that this block should be moved to
CopyFromFormatBinaryStart() too. But we need to run it after
we set up inputs such as data_source_cb, pipe and filename...
+/* Routines for a COPY HANDLER implementation. */
+typedef struct CopyHandlerOps
+{
+ /* Called when COPY TO is started. This will send a header. */
+ void (*copy_to_start) (CopyToState cstate, TupleDesc tupDesc);
+
+ /* Copy one row for COPY TO. */
+ void (*copy_to_one_row) (CopyToState cstate, TupleTableSlot *slot);
+
+ /* Called when COPY TO is ended. This will send a trailer. */
+ void (*copy_to_end) (CopyToState cstate);
+
+ void (*copy_from_start) (CopyFromState cstate, TupleDesc tupDesc);
+ bool (*copy_from_next) (CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls);
+ void (*copy_from_error_callback) (CopyFromState cstate);
+ void (*copy_from_end) (CopyFromState cstate);
+} CopyHandlerOps;
It seems that "copy_" prefix is redundant. Should we use
"to_start" instead of "copy_to_start" and so on?
BTW, it seems that "COPY FROM (FORMAT json)" may not be implemented. [2]
We may need to handle the case where the copy_from_* callbacks are NULL.
I added a hook *copy_from_end* but this might be removed later if not used.
It may be useful to clean up resources for COPY FROM, but the
patch doesn't call copy_from_end. How about removing it
for now, since it isn't needed yet? We can add it and call
it from EndCopyFrom() later.
I think that we should focus on refactoring instead of
adding a new feature in this patch.
[1]: /messages/by-id/20231204.153548.2126325458835528809.kou@clear-code.com
[2]: /messages/by-id/CALvfUkBxTYy5uWPFVwpk_7ii2zgT07t3d-yR_cy4sfrrLU=kcg@mail.gmail.com
Thanks,
--
kou
--
Regards
Junwang Zhao
On 2023-12-07 Th 03:37, Junwang Zhao wrote:
The point of this refactor (from my view) is to make it possible to add new
copy handlers in extensions, just like access method. As Andres suggested,
a system catalog like *pg_copy_handler*, if we split TO and FROM into two
sets of routines, does that mean we have to create two catalog(
pg_copy_from_handler and pg_copy_to_handler)?
Surely not. Either have two fields, one for the TO handler and one for
the FROM handler, or a flag on each row indicating if it's a FROM or TO
handler.
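The "two fields" variant can be pictured as one catalog row per format with one handler column per direction. The sketch below is a self-contained model of that idea only: DemoCopyHandlerRow, the in-memory demo_rows table and the OID values are all made up for illustration, and a real catalog lookup would go through the system catalog machinery instead of an array.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef unsigned int Oid;		/* stand-in for the server's Oid type */
#define InvalidOid ((Oid) 0)

typedef enum
{
	DEMO_COPY_TO,
	DEMO_COPY_FROM
} DemoCopyDirection;

/* Hypothetical catalog row: one row per format, one column per direction. */
typedef struct DemoCopyHandlerRow
{
	const char *chname;
	Oid			to_handler;
	Oid			from_handler;	/* InvalidOid if FROM is unsupported */
} DemoCopyHandlerRow;

/* Made-up rows and OIDs, standing in for catalog contents. */
static const DemoCopyHandlerRow demo_rows[] = {
	{"text", 9001, 9002},
	{"json", 9003, InvalidOid},
};

/* Return the handler OID for a format and direction, or InvalidOid. */
Oid
demo_lookup_handler(const char *fmt, DemoCopyDirection dir)
{
	assert(fmt != NULL);
	for (size_t i = 0; i < sizeof(demo_rows) / sizeof(demo_rows[0]); i++)
	{
		if (strcmp(demo_rows[i].chname, fmt) == 0)
			return dir == DEMO_COPY_TO ? demo_rows[i].to_handler
				: demo_rows[i].from_handler;
	}
	return InvalidOid;
}
```

The flag-per-row alternative would instead store two rows for a format that supports both directions, each with a single handler column.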
cheers
andrew
--
Andrew Dunstan
EDB: https://www.enterprisedb.com
On Fri, Dec 8, 2023 at 1:39 AM Andrew Dunstan <andrew@dunslane.net> wrote:
On 2023-12-07 Th 03:37, Junwang Zhao wrote:
The point of this refactor (from my view) is to make it possible to add new
copy handlers in extensions, just like access method. As Andres suggested,
a system catalog like *pg_copy_handler*, if we split TO and FROM into two
sets of routines, does that mean we have to create two catalog(
pg_copy_from_handler and pg_copy_to_handler)?
Surely not. Either have two fields, one for the TO handler and one for
the FROM handler, or a flag on each row indicating if it's a FROM or TO
handler.
True.
But why do we need a system catalog like pg_copy_handler in the first
place? I imagined that an extension can define a handler function
returning a set of callbacks and the parser can lookup the handler
function by name, like FDW and TABLESAMPLE.
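The catalog-free idea can be sketched as follows: the format name is resolved to a handler function, and calling that function yields the callback struct. This is only a model under stated assumptions; DemoCopyRoutine, the demo_* handlers and the demo_functions array (which stands in for function lookup by name) are hypothetical, and the struct carries two plain fields instead of real callbacks to keep the example short.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical result of a handler function. */
typedef struct DemoCopyRoutine
{
	const char *format;
	int			supports_from;
} DemoCopyRoutine;

typedef const DemoCopyRoutine *(*DemoCopyHandler) (void);

static const DemoCopyRoutine demo_text_routine = {"text", 1};
static const DemoCopyRoutine demo_arrow_routine = {"arrow", 0};

/* Handler functions, as an extension might define them. */
static const DemoCopyRoutine *demo_text_handler(void) { return &demo_text_routine; }
static const DemoCopyRoutine *demo_arrow_handler(void) { return &demo_arrow_routine; }

/* Stand-in for looking a function up by name. */
static const struct
{
	const char *funcname;
	DemoCopyHandler fn;
}			demo_functions[] = {
	{"text", demo_text_handler},
	{"arrow", demo_arrow_handler},
};

/* Resolve a format name to its routine, or NULL if no handler exists. */
const DemoCopyRoutine *
demo_get_routine(const char *fmt)
{
	assert(fmt != NULL);
	for (size_t i = 0; i < sizeof(demo_functions) / sizeof(demo_functions[0]); i++)
	{
		if (strcmp(demo_functions[i].funcname, fmt) == 0)
			return demo_functions[i].fn();
	}
	return NULL;
}
```

With this shape, adding a format means defining one more handler function; no dedicated catalog row is involved.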
Regards,
--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
On Fri, Dec 8, 2023 at 3:27 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Fri, Dec 8, 2023 at 1:39 AM Andrew Dunstan <andrew@dunslane.net> wrote:
On 2023-12-07 Th 03:37, Junwang Zhao wrote:
The point of this refactor (from my view) is to make it possible to add new
copy handlers in extensions, just like access method. As Andres suggested,
a system catalog like *pg_copy_handler*, if we split TO and FROM into two
sets of routines, does that mean we have to create two catalog(
pg_copy_from_handler and pg_copy_to_handler)?
Surely not. Either have two fields, one for the TO handler and one for
the FROM handler, or a flag on each row indicating if it's a FROM or TO
handler.
If we wrap the two fields into a single structure, that will still be in
copy.h, which I think is not necessary. A single routing wrapper should
be enough; the actual implementations still stay in the separate
copy_[to/from].c files.
True.
But why do we need a system catalog like pg_copy_handler in the first
place? I imagined that an extension can define a handler function
returning a set of callbacks and the parser can lookup the handler
function by name, like FDW and TABLESAMPLE.
I can see FDW-related utility commands but no TABLESAMPLE-related ones,
and there is a pg_foreign_data_wrapper system catalog which has
a *fdwhandler* field.
If we want extensions to create a new copy handler, I think
something like pg_copy_handler should be necessary.
Regards,
--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
I went one step further and implemented pg_copy_handler; the attached V5 is
the implementation, with some changes suggested by Kou.
You can also review it in this GitHub pull request [1].
[1]: https://github.com/zhjwpku/postgres/pull/1/files
--
Regards
Junwang Zhao
Attachments:
v5-0001-Extract-COPY-handlers.patch (application/octet-stream)
From 30a383c048d948c370e26149820c4bc5ea004d94 Mon Sep 17 00:00:00 2001
From: Zhao Junwang <zhjwpku@gmail.com>
Date: Wed, 6 Dec 2023 19:13:22 +0800
Subject: [PATCH v5] Extract COPY handlers
---
src/backend/catalog/Makefile | 4 +-
src/backend/commands/copy.c | 159 ++++++++++-
src/backend/commands/copyfrom.c | 269 ++++++++++--------
src/backend/commands/copyfromparse.c | 309 +++++++++++----------
src/backend/commands/copyto.c | 354 ++++++++++++++----------
src/backend/nodes/Makefile | 1 +
src/backend/nodes/gen_node_support.pl | 2 +
src/backend/utils/adt/pseudotypes.c | 1 +
src/include/catalog/meson.build | 2 +
src/include/catalog/pg_copy_handler.dat | 25 ++
src/include/catalog/pg_copy_handler.h | 50 ++++
src/include/catalog/pg_proc.dat | 21 ++
src/include/catalog/pg_type.dat | 7 +
src/include/commands/copy.h | 51 +++-
src/test/regress/expected/oidjoins.out | 1 +
15 files changed, 839 insertions(+), 417 deletions(-)
create mode 100644 src/include/catalog/pg_copy_handler.dat
create mode 100644 src/include/catalog/pg_copy_handler.h
diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile
index ec7b6f5362..b0c4cdcdf2 100644
--- a/src/backend/catalog/Makefile
+++ b/src/backend/catalog/Makefile
@@ -118,7 +118,8 @@ CATALOG_HEADERS := \
pg_publication_namespace.h \
pg_publication_rel.h \
pg_subscription.h \
- pg_subscription_rel.h
+ pg_subscription_rel.h \
+ pg_copy_handler.h \
GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h system_fk_info.h
@@ -150,6 +151,7 @@ POSTGRES_BKI_DATA = $(addprefix $(top_srcdir)/src/include/catalog/,\
pg_ts_parser.dat \
pg_ts_template.dat \
pg_type.dat \
+ pg_copy_handler.dat \
)
all: generated-header-symlinks
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index cfad47b562..d5c1cb50a3 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -18,10 +18,12 @@
#include <unistd.h>
#include <sys/stat.h>
+#include "access/genam.h"
#include "access/sysattr.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/pg_authid.h"
+#include "catalog/pg_copy_handler.h"
#include "commands/copy.h"
#include "commands/defrem.h"
#include "executor/executor.h"
@@ -36,6 +38,8 @@
#include "rewrite/rewriteHandler.h"
#include "utils/acl.h"
#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+#include "utils/formatting.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
@@ -427,6 +431,8 @@ ProcessCopyOptions(ParseState *pstate,
opts_out->file_encoding = -1;
+ /* Text is the default format. */
+ opts_out->handler = GetCopyRoutineByName(DEFAULT_COPY_HANDLER);
/* Extract options from the statement node tree */
foreach(option, options)
{
@@ -439,17 +445,11 @@ ProcessCopyOptions(ParseState *pstate,
if (format_specified)
errorConflictingDefElem(defel, pstate);
format_specified = true;
- if (strcmp(fmt, "text") == 0)
- /* default format */ ;
- else if (strcmp(fmt, "csv") == 0)
+ opts_out->handler = GetCopyRoutineByName(fmt);
+ if (strcmp(fmt, "csv") == 0)
opts_out->csv_mode = true;
else if (strcmp(fmt, "binary") == 0)
opts_out->binary = true;
- else
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("COPY format \"%s\" not recognized", fmt),
- parser_errposition(pstate, defel->location)));
}
else if (strcmp(defel->defname, "freeze") == 0)
{
@@ -864,3 +864,146 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
return attnums;
}
+
+static const
+CopyRoutine CopyRoutineText = {
+ .type = T_CopyRoutine,
+ .to_start = CopyToFormatTextStart,
+ .to_one_row = CopyToFormatTextOneRow,
+ .to_end = CopyToFormatTextEnd,
+ .from_start = CopyFromFormatTextStart,
+ .from_next = CopyFromFormatTextNext,
+ .from_error_callback = CopyFromFormatTextErrorCallback,
+};
+
+/*
+ * We can use the same CopyRoutine for both of "text" and "csv" because
+ * CopyToFormatText*() refer cstate->opts.csv_mode and change their
+ * behavior. We can split the implementations and stop referring
+ * cstate->opts.csv_mode later.
+ */
+static const
+CopyRoutine CopyRoutineCSV = {
+ .type = T_CopyRoutine,
+ .to_start = CopyToFormatTextStart,
+ .to_one_row = CopyToFormatTextOneRow,
+ .to_end = CopyToFormatTextEnd,
+ .from_start = CopyFromFormatTextStart,
+ .from_next = CopyFromFormatTextNext,
+ .from_error_callback = CopyFromFormatTextErrorCallback,
+};
+
+static const
+CopyRoutine CopyRoutineBinary = {
+ .type = T_CopyRoutine,
+ .to_start = CopyToFormatBinaryStart,
+ .to_one_row = CopyToFormatBinaryOneRow,
+ .to_end = CopyToFormatBinaryEnd,
+ .from_start = CopyFromFormatBinaryStart,
+ .from_next = CopyFromFormatBinaryNext,
+ .from_error_callback = CopyFromFormatBinaryErrorCallback,
+};
+
+Datum
+text_copy_handler(PG_FUNCTION_ARGS)
+{
+ PG_RETURN_POINTER(&CopyRoutineText);
+}
+
+Datum
+csv_copy_handler(PG_FUNCTION_ARGS)
+{
+ PG_RETURN_POINTER(&CopyRoutineCSV);
+}
+
+Datum
+binary_copy_handler(PG_FUNCTION_ARGS)
+{
+ PG_RETURN_POINTER(&CopyRoutineBinary);
+}
+
+static NameData
+fmt_to_name(char *fmt)
+{
+ char *lcf; /* lower cased fmt */
+ size_t len;
+ NameData fmtname;
+
+ if (strlen(fmt) >= NAMEDATALEN)
+ elog(ERROR, "fmt name \"%s\" exceeds maximum name length "
+ "of %d bytes", fmt, NAMEDATALEN - 1);
+
+ len = strlen(fmt);
+ lcf = asc_tolower(fmt, len);
+ len = strlen(lcf);
+
+ memcpy(&(NameStr(fmtname)), lcf, len);
+ NameStr(fmtname)[len] = '\0';
+ pfree(lcf);
+
+ return fmtname;
+}
+
+CopyRoutine *
+GetCopyRoutine(Oid copyhandler)
+{
+ Datum datum;
+ CopyRoutine *routine;
+
+ datum = OidFunctionCall0(copyhandler);
+ routine = (CopyRoutine *) DatumGetPointer(datum);
+
+ if (routine == NULL || !IsA(routine, CopyRoutine))
+ elog(ERROR, "copy handler function %u did not return an CopyRoutine struct",
+ copyhandler);
+
+ return routine;
+}
+
+CopyRoutine *
+GetCopyRoutineByName(char *fmt)
+{
+ HeapTuple tuple;
+ NameData fmtname;
+ Relation chrel;
+ ScanKeyData scankey;
+ SysScanDesc scan;
+ Form_pg_copy_handler chform;
+ regproc copyhandler;
+
+ fmtname = fmt_to_name(fmt);
+
+ chrel = table_open(CopyHandlerRelationId, AccessShareLock);
+
+ ScanKeyInit(&scankey,
+ Anum_pg_copy_handler_chname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ NameGetDatum(&fmtname));
+
+ scan = systable_beginscan(chrel, CopyHandlerNameIndexId, true,
+ NULL, 1, &scankey);
+ tuple = systable_getnext(scan);
+ if (!HeapTupleIsValid(tuple))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("COPY format \"%s\" not recognized", fmt)));
+
+ chform = (Form_pg_copy_handler)GETSTRUCT(tuple);
+
+ copyhandler = chform->copyhandler;
+
+ /* Complain if handler OID is invalid */
+ if (!RegProcedureIsValid(copyhandler))
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("COPY format \"%s\" does not have a handler",
+ NameStr(chform->chname))));
+ }
+
+ systable_endscan(scan);
+ table_close(chrel, AccessShareLock);
+
+ /* And finally, call the handler function to get the API struct. */
+ return GetCopyRoutine(copyhandler);
+}
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index f4861652a9..c2d92108c1 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -107,83 +107,88 @@ static char *limit_printout_length(const char *str);
static void ClosePipeFromProgram(CopyFromState cstate);
-/*
- * error context callback for COPY FROM
- *
- * The argument for the error context must be CopyFromState.
- */
void
-CopyFromErrorCallback(void *arg)
+CopyFromFormatBinaryErrorCallback(CopyFromState cstate)
{
- CopyFromState cstate = (CopyFromState) arg;
+ /* can't usefully display the data */
+ if (cstate->cur_attname)
+ errcontext("COPY %s, line %llu, column %s",
+ cstate->cur_relname,
+ (unsigned long long) cstate->cur_lineno,
+ cstate->cur_attname);
+ else
+ errcontext("COPY %s, line %llu",
+ cstate->cur_relname,
+ (unsigned long long) cstate->cur_lineno);
+}
- if (cstate->relname_only)
+void
+CopyFromFormatTextErrorCallback(CopyFromState cstate)
+{
+ if (cstate->cur_attname && cstate->cur_attval)
{
- errcontext("COPY %s",
- cstate->cur_relname);
- return;
+ /* error is relevant to a particular column */
+ char *attval;
+
+ attval = limit_printout_length(cstate->cur_attval);
+ errcontext("COPY %s, line %llu, column %s: \"%s\"",
+ cstate->cur_relname,
+ (unsigned long long) cstate->cur_lineno,
+ cstate->cur_attname,
+ attval);
+ pfree(attval);
}
- if (cstate->opts.binary)
+ else if (cstate->cur_attname)
{
- /* can't usefully display the data */
- if (cstate->cur_attname)
- errcontext("COPY %s, line %llu, column %s",
- cstate->cur_relname,
- (unsigned long long) cstate->cur_lineno,
- cstate->cur_attname);
- else
- errcontext("COPY %s, line %llu",
- cstate->cur_relname,
- (unsigned long long) cstate->cur_lineno);
+ /* error is relevant to a particular column, value is NULL */
+ errcontext("COPY %s, line %llu, column %s: null input",
+ cstate->cur_relname,
+ (unsigned long long) cstate->cur_lineno,
+ cstate->cur_attname);
}
else
{
- if (cstate->cur_attname && cstate->cur_attval)
+ /*
+ * Error is relevant to a particular line.
+ *
+ * If line_buf still contains the correct line, print it.
+ */
+ if (cstate->line_buf_valid)
{
- /* error is relevant to a particular column */
- char *attval;
+ char *lineval;
- attval = limit_printout_length(cstate->cur_attval);
- errcontext("COPY %s, line %llu, column %s: \"%s\"",
+ lineval = limit_printout_length(cstate->line_buf.data);
+ errcontext("COPY %s, line %llu: \"%s\"",
cstate->cur_relname,
- (unsigned long long) cstate->cur_lineno,
- cstate->cur_attname,
- attval);
- pfree(attval);
+ (unsigned long long) cstate->cur_lineno, lineval);
+ pfree(lineval);
}
- else if (cstate->cur_attname)
+ else
{
- /* error is relevant to a particular column, value is NULL */
- errcontext("COPY %s, line %llu, column %s: null input",
+ errcontext("COPY %s, line %llu",
cstate->cur_relname,
- (unsigned long long) cstate->cur_lineno,
- cstate->cur_attname);
+ (unsigned long long) cstate->cur_lineno);
}
- else
- {
- /*
- * Error is relevant to a particular line.
- *
- * If line_buf still contains the correct line, print it.
- */
- if (cstate->line_buf_valid)
- {
- char *lineval;
+ }
+}
- lineval = limit_printout_length(cstate->line_buf.data);
- errcontext("COPY %s, line %llu: \"%s\"",
- cstate->cur_relname,
- (unsigned long long) cstate->cur_lineno, lineval);
- pfree(lineval);
- }
- else
- {
- errcontext("COPY %s, line %llu",
- cstate->cur_relname,
- (unsigned long long) cstate->cur_lineno);
- }
- }
+/*
+ * error context callback for COPY FROM
+ *
+ * The argument for the error context must be CopyFromState.
+ */
+void
+CopyFromErrorCallback(void *arg)
+{
+ CopyFromState cstate = (CopyFromState) arg;
+
+ if (cstate->relname_only)
+ {
+ errcontext("COPY %s",
+ cstate->cur_relname);
+ return;
}
+ cstate->opts.handler->from_error_callback(cstate);
}
/*
@@ -1320,6 +1325,99 @@ CopyFrom(CopyFromState cstate)
return processed;
}
+void
+CopyFromFormatBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+ FmgrInfo *in_functions;
+ Oid *typioparams;
+ Oid in_func_oid;
+ AttrNumber num_phys_attrs;
+
+ /*
+ * Pick up the required catalog information for each attribute in the
+ * relation: the input function and the element type (to pass to
+ * the input function).
+ */
+ num_phys_attrs = tupDesc->natts;
+ in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
+
+ for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
+ {
+ Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
+
+ /* We don't need info for dropped attributes */
+ if (att->attisdropped)
+ continue;
+
+ /* Fetch the input function and typioparam info */
+ getTypeBinaryInputInfo(att->atttypid,
+ &in_func_oid, &typioparams[attnum - 1]);
+
+ fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+ }
+ cstate->in_functions = in_functions;
+ cstate->typioparams = typioparams;
+}
+
+void
+CopyFromFormatTextStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+ FmgrInfo *in_functions;
+ Oid *typioparams;
+ Oid in_func_oid;
+ AttrNumber attr_count,
+ num_phys_attrs;
+
+ num_phys_attrs = tupDesc->natts;
+
+ /*
+ * If encoding conversion is needed, we need another buffer to hold
+ * the converted input data. Otherwise, we can just point input_buf
+ * to the same buffer as raw_buf.
+ */
+ if (cstate->need_transcoding)
+ {
+ cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
+ cstate->input_buf_index = cstate->input_buf_len = 0;
+ }
+ else
+ cstate->input_buf = cstate->raw_buf;
+ cstate->input_reached_eof = false;
+
+ initStringInfo(&cstate->line_buf);
+
+ /* create workspace for CopyReadAttributes results */
+ attr_count = list_length(cstate->attnumlist);
+
+ cstate->max_fields = attr_count;
+ cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
+
+ /*
+ * Pick up the required catalog information for each attribute in the
+ * relation: the input function and the element type (to pass to
+ * the input function).
+ */
+ in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
+
+ for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
+ {
+ Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
+
+ /* We don't need info for dropped attributes */
+ if (att->attisdropped)
+ continue;
+
+ /* Fetch the input function and typioparam info */
+ getTypeInputInfo(att->atttypid,
+ &in_func_oid, &typioparams[attnum - 1]);
+ fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+ }
+ cstate->in_functions = in_functions;
+ cstate->typioparams = typioparams;
+}
+
/*
* Setup to read tuples from a file for COPY FROM.
*
@@ -1348,9 +1446,6 @@ BeginCopyFrom(ParseState *pstate,
TupleDesc tupDesc;
AttrNumber num_phys_attrs,
num_defaults;
- FmgrInfo *in_functions;
- Oid *typioparams;
- Oid in_func_oid;
int *defmap;
ExprState **defexprs;
MemoryContext oldcontext;
@@ -1518,25 +1613,6 @@ BeginCopyFrom(ParseState *pstate,
cstate->raw_buf_index = cstate->raw_buf_len = 0;
cstate->raw_reached_eof = false;
- if (!cstate->opts.binary)
- {
- /*
- * If encoding conversion is needed, we need another buffer to hold
- * the converted input data. Otherwise, we can just point input_buf
- * to the same buffer as raw_buf.
- */
- if (cstate->need_transcoding)
- {
- cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
- cstate->input_buf_index = cstate->input_buf_len = 0;
- }
- else
- cstate->input_buf = cstate->raw_buf;
- cstate->input_reached_eof = false;
-
- initStringInfo(&cstate->line_buf);
- }
-
initStringInfo(&cstate->attribute_buf);
/* Assign range table and rteperminfos, we'll need them in CopyFrom. */
@@ -1546,17 +1622,14 @@ BeginCopyFrom(ParseState *pstate,
cstate->rteperminfos = pstate->p_rteperminfos;
}
- num_defaults = 0;
- volatile_defexprs = false;
+ cstate->opts.handler->from_start(cstate, tupDesc);
/*
- * Pick up the required catalog information for each attribute in the
- * relation, including the input function, the element type (to pass to
- * the input function), and info about defaults and constraints. (Which
+ * Pick up info about defaults and constraints. (Which
* input function we use depends on text/binary format choice.)
*/
- in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
- typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
+ num_defaults = 0;
+ volatile_defexprs = false;
defmap = (int *) palloc(num_phys_attrs * sizeof(int));
defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
@@ -1568,15 +1641,6 @@ BeginCopyFrom(ParseState *pstate,
if (att->attisdropped)
continue;
- /* Fetch the input function and typioparam info */
- if (cstate->opts.binary)
- getTypeBinaryInputInfo(att->atttypid,
- &in_func_oid, &typioparams[attnum - 1]);
- else
- getTypeInputInfo(att->atttypid,
- &in_func_oid, &typioparams[attnum - 1]);
- fmgr_info(in_func_oid, &in_functions[attnum - 1]);
-
/* Get default info if available */
defexprs[attnum - 1] = NULL;
@@ -1636,8 +1700,6 @@ BeginCopyFrom(ParseState *pstate,
cstate->bytes_processed = 0;
/* We keep those variables in cstate. */
- cstate->in_functions = in_functions;
- cstate->typioparams = typioparams;
cstate->defmap = defmap;
cstate->defexprs = defexprs;
cstate->volatile_defexprs = volatile_defexprs;
@@ -1716,15 +1778,6 @@ BeginCopyFrom(ParseState *pstate,
ReceiveCopyBinaryHeader(cstate);
}
- /* create workspace for CopyReadAttributes results */
- if (!cstate->opts.binary)
- {
- AttrNumber attr_count = list_length(cstate->attnumlist);
-
- cstate->max_fields = attr_count;
- cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
- }
-
MemoryContextSwitchTo(oldcontext);
return cstate;
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index f553734582..bbe5bd1166 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -839,187 +839,208 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
return true;
}
-/*
- * Read next tuple from file for COPY FROM. Return false if no more tuples.
- *
- * 'econtext' is used to evaluate default expression for each column that is
- * either not read from the file or is using the DEFAULT option of COPY FROM.
- * It can be NULL when no default values are used, i.e. when all columns are
- * read from the file, and DEFAULT option is unset.
- *
- * 'values' and 'nulls' arrays must be the same length as columns of the
- * relation passed to BeginCopyFrom. This function fills the arrays.
- */
bool
-NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
- Datum *values, bool *nulls)
+CopyFromFormatBinaryNext(CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls)
{
TupleDesc tupDesc;
- AttrNumber num_phys_attrs,
- attr_count,
- num_defaults = cstate->num_defaults;
+ AttrNumber attr_count;
+ int16 fld_count;
+ ListCell *cur;
FmgrInfo *in_functions = cstate->in_functions;
Oid *typioparams = cstate->typioparams;
- int i;
- int *defmap = cstate->defmap;
- ExprState **defexprs = cstate->defexprs;
- tupDesc = RelationGetDescr(cstate->rel);
- num_phys_attrs = tupDesc->natts;
attr_count = list_length(cstate->attnumlist);
- /* Initialize all values for row to NULL */
- MemSet(values, 0, num_phys_attrs * sizeof(Datum));
- MemSet(nulls, true, num_phys_attrs * sizeof(bool));
- MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
+ cstate->cur_lineno++;
- if (!cstate->opts.binary)
+ if (!CopyGetInt16(cstate, &fld_count))
{
- char **field_strings;
- ListCell *cur;
- int fldct;
- int fieldno;
- char *string;
+ /* EOF detected (end of file, or protocol-level EOF) */
+ return false;
+ }
- /* read raw fields in the next line */
- if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
- return false;
+ if (fld_count == -1)
+ {
+ /*
+ * Received EOF marker. Wait for the protocol-level EOF, and
+ * complain if it doesn't come immediately. In COPY FROM STDIN,
+ * this ensures that we correctly handle CopyFail, if client
+ * chooses to send that now. When copying from file, we could
+ * ignore the rest of the file like in text mode, but we choose to
+ * be consistent with the COPY FROM STDIN case.
+ */
+ char dummy;
- /* check for overflowing fields */
- if (attr_count > 0 && fldct > attr_count)
+ if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("extra data after last expected column")));
+ errmsg("received copy data after EOF marker")));
+ return false;
+ }
- fieldno = 0;
+ if (fld_count != attr_count)
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("row field count is %d, expected %d",
+ (int) fld_count, attr_count)));
- /* Loop to read the user attributes on the line. */
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- int m = attnum - 1;
- Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+ tupDesc = RelationGetDescr(cstate->rel);
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ int m = attnum - 1;
+ Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+ cstate->cur_attname = NameStr(att->attname);
+ values[m] = CopyReadBinaryAttribute(cstate,
+ &in_functions[m],
+ typioparams[m],
+ att->atttypmod,
+ &nulls[m]);
+ cstate->cur_attname = NULL;
+ }
- if (fieldno >= fldct)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("missing data for column \"%s\"",
- NameStr(att->attname))));
- string = field_strings[fieldno++];
+ return true;
+}
- if (cstate->convert_select_flags &&
- !cstate->convert_select_flags[m])
- {
- /* ignore input field, leaving column as NULL */
- continue;
- }
+bool
+CopyFromFormatTextNext(CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls)
+{
+ TupleDesc tupDesc;
+ AttrNumber attr_count;
+ FmgrInfo *in_functions = cstate->in_functions;
+ Oid *typioparams = cstate->typioparams;
+ ExprState **defexprs = cstate->defexprs;
+ char **field_strings;
+ ListCell *cur;
+ int fldct;
+ int fieldno = 0;
+ char *string;
- if (cstate->opts.csv_mode)
- {
- if (string == NULL &&
- cstate->opts.force_notnull_flags[m])
- {
- /*
- * FORCE_NOT_NULL option is set and column is NULL -
- * convert it to the NULL string.
- */
- string = cstate->opts.null_print;
- }
- else if (string != NULL && cstate->opts.force_null_flags[m]
- && strcmp(string, cstate->opts.null_print) == 0)
- {
- /*
- * FORCE_NULL option is set and column matches the NULL
- * string. It must have been quoted, or otherwise the
- * string would already have been set to NULL. Convert it
- * to NULL as specified.
- */
- string = NULL;
- }
- }
+ attr_count = list_length(cstate->attnumlist);
+
+ /* read raw fields in the next line */
+ if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
+ return false;
+
+ /* check for overflowing fields */
+ if (attr_count > 0 && fldct > attr_count)
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("extra data after last expected column")));
- cstate->cur_attname = NameStr(att->attname);
- cstate->cur_attval = string;
+ tupDesc = RelationGetDescr(cstate->rel);
+ /* Loop to read the user attributes on the line. */
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ int m = attnum - 1;
+ Form_pg_attribute att = TupleDescAttr(tupDesc, m);
- if (string != NULL)
- nulls[m] = false;
+ if (fieldno >= fldct)
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("missing data for column \"%s\"",
+ NameStr(att->attname))));
+ string = field_strings[fieldno++];
- if (cstate->defaults[m])
+ if (cstate->convert_select_flags &&
+ !cstate->convert_select_flags[m])
+ {
+ /* ignore input field, leaving column as NULL */
+ continue;
+ }
+
+ if (cstate->opts.csv_mode)
+ {
+ if (string == NULL &&
+ cstate->opts.force_notnull_flags[m])
{
/*
- * The caller must supply econtext and have switched into the
- * per-tuple memory context in it.
+ * FORCE_NOT_NULL option is set and column is NULL -
+ * convert it to the NULL string.
*/
- Assert(econtext != NULL);
- Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
-
- values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
+ string = cstate->opts.null_print;
+ }
+ else if (string != NULL && cstate->opts.force_null_flags[m]
+ && strcmp(string, cstate->opts.null_print) == 0)
+ {
+ /*
+ * FORCE_NULL option is set and column matches the NULL
+ * string. It must have been quoted, or otherwise the
+ * string would already have been set to NULL. Convert it
+ * to NULL as specified.
+ */
+ string = NULL;
}
- else
- values[m] = InputFunctionCall(&in_functions[m],
- string,
- typioparams[m],
- att->atttypmod);
-
- cstate->cur_attname = NULL;
- cstate->cur_attval = NULL;
}
- Assert(fieldno == attr_count);
- }
- else
- {
- /* binary */
- int16 fld_count;
- ListCell *cur;
+ cstate->cur_attname = NameStr(att->attname);
+ cstate->cur_attval = string;
- cstate->cur_lineno++;
+ if (string != NULL)
+ nulls[m] = false;
- if (!CopyGetInt16(cstate, &fld_count))
- {
- /* EOF detected (end of file, or protocol-level EOF) */
- return false;
- }
-
- if (fld_count == -1)
+ if (cstate->defaults[m])
{
/*
- * Received EOF marker. Wait for the protocol-level EOF, and
- * complain if it doesn't come immediately. In COPY FROM STDIN,
- * this ensures that we correctly handle CopyFail, if client
- * chooses to send that now. When copying from file, we could
- * ignore the rest of the file like in text mode, but we choose to
- * be consistent with the COPY FROM STDIN case.
+ * The caller must supply econtext and have switched into the
+ * per-tuple memory context in it.
*/
- char dummy;
+ Assert(econtext != NULL);
+ Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
- if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("received copy data after EOF marker")));
- return false;
+ values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
}
+ else
+ values[m] = InputFunctionCall(&in_functions[m],
+ string,
+ typioparams[m],
+ att->atttypmod);
- if (fld_count != attr_count)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("row field count is %d, expected %d",
- (int) fld_count, attr_count)));
+ cstate->cur_attname = NULL;
+ cstate->cur_attval = NULL;
+ }
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- int m = attnum - 1;
- Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
- cstate->cur_attname = NameStr(att->attname);
- values[m] = CopyReadBinaryAttribute(cstate,
- &in_functions[m],
- typioparams[m],
- att->atttypmod,
- &nulls[m]);
- cstate->cur_attname = NULL;
- }
+ Assert(fieldno == attr_count);
+ return true;
+}
+
+/*
+ * Read next tuple from file for COPY FROM. Return false if no more tuples.
+ *
+ * 'econtext' is used to evaluate default expression for each column that is
+ * either not read from the file or is using the DEFAULT option of COPY FROM.
+ * It can be NULL when no default values are used, i.e. when all columns are
+ * read from the file, and DEFAULT option is unset.
+ *
+ * 'values' and 'nulls' arrays must be the same length as columns of the
+ * relation passed to BeginCopyFrom. This function fills the arrays.
+ */
+bool
+NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls)
+{
+ TupleDesc tupDesc;
+ AttrNumber num_phys_attrs,
+ num_defaults = cstate->num_defaults;
+ int i;
+ int *defmap = cstate->defmap;
+ ExprState **defexprs = cstate->defexprs;
+
+ tupDesc = RelationGetDescr(cstate->rel);
+ num_phys_attrs = tupDesc->natts;
+
+ /* Initialize all values for row to NULL */
+ MemSet(values, 0, num_phys_attrs * sizeof(Datum));
+ MemSet(nulls, true, num_phys_attrs * sizeof(bool));
+ MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
+
+ if (!cstate->opts.handler->from_next(cstate, econtext, values, nulls))
+ {
+ return false;
}
/*
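For readers following the copyfromparse.c change above: NextCopyFrom() keeps the format-independent work (resetting the values/nulls arrays) and delegates row decoding to the handler's from_next callback. A minimal standalone sketch of that split follows. It is not part of the patch; every Toy*/toy_* name, the int-array "input", and the "negative means NULL" convention are assumptions made up for illustration only.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define TOY_NATTS 3				/* hypothetical fixed column count */

struct ToyCopyFromState;

/* Hypothetical per-format callback table, standing in for the handler. */
typedef struct ToyCopyHandlerOps
{
	bool		(*from_next) (struct ToyCopyFromState *cstate,
							  int *values, bool *nulls);
} ToyCopyHandlerOps;

typedef struct ToyCopyFromState
{
	const ToyCopyHandlerOps *handler;
	const int  *input;			/* toy input: TOY_NATTS ints per row */
	size_t		nrows;
	size_t		cur_row;
} ToyCopyFromState;

/* Format-specific part: decode one row, or return false at end of input. */
bool
toy_from_next(ToyCopyFromState *cstate, int *values, bool *nulls)
{
	if (cstate->cur_row >= cstate->nrows)
		return false;

	for (int i = 0; i < TOY_NATTS; i++)
	{
		int			v = cstate->input[cstate->cur_row * TOY_NATTS + i];

		if (v < 0)
			continue;			/* toy convention: negative means NULL */
		values[i] = v;
		nulls[i] = false;
	}
	cstate->cur_row++;
	return true;
}

const ToyCopyHandlerOps toy_handler = {toy_from_next};

/* Format-independent wrapper: reset the row, then dispatch to the handler. */
bool
toy_next_copy_from(ToyCopyFromState *cstate, int *values, bool *nulls)
{
	assert(cstate->handler != NULL && cstate->handler->from_next != NULL);

	memset(values, 0, TOY_NATTS * sizeof(int));
	for (int i = 0; i < TOY_NATTS; i++)
		nulls[i] = true;

	return cstate->handler->from_next(cstate, values, nulls);
}
```

Because the wrapper initializes every column to NULL before dispatching, a format callback only has to touch the columns it actually reads, which mirrors how the text callback above skips columns with `continue`.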
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index c66a047c4a..f906a6cf7f 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -131,6 +131,205 @@ static void CopySendEndOfRow(CopyToState cstate);
static void CopySendInt32(CopyToState cstate, int32 val);
static void CopySendInt16(CopyToState cstate, int16 val);
+/*
+ * CopyHandlerOps implementation of COPY TO for "text" and "csv".
+ * CopyToFormatText*() check cstate->opts.csv_mode and change their behavior
+ * accordingly. We can split this implementation later and stop referring to
+ * cstate->opts.csv_mode.
+ */
+
+static void
+CopyToFormatTextSendEndOfRow(CopyToState cstate)
+{
+ switch (cstate->copy_dest)
+ {
+ case COPY_FILE:
+ /* Default line termination depends on platform */
+#ifndef WIN32
+ CopySendChar(cstate, '\n');
+#else
+ CopySendString(cstate, "\r\n");
+#endif
+ break;
+ case COPY_FRONTEND:
+ /* The FE/BE protocol uses \n as newline for all platforms */
+ CopySendChar(cstate, '\n');
+ break;
+ default:
+ break;
+ }
+ CopySendEndOfRow(cstate);
+}
+
+void
+CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc)
+{
+ int num_phys_attrs;
+ ListCell *cur;
+
+ num_phys_attrs = tupDesc->natts;
+ /* Get info about the columns we need to process. */
+ cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Oid out_func_oid;
+ bool isvarlena;
+ Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+ getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+ fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+ }
+
+ /*
+ * For non-binary copy, we need to convert null_print to file
+ * encoding, because it will be sent directly with CopySendString.
+ */
+ if (cstate->need_transcoding)
+ cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+ cstate->opts.null_print_len,
+ cstate->file_encoding);
+
+ /* if a header has been requested send the line */
+ if (cstate->opts.header_line)
+ {
+ bool hdr_delim = false;
+
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ char *colname;
+
+ if (hdr_delim)
+ CopySendChar(cstate, cstate->opts.delim[0]);
+ hdr_delim = true;
+
+ colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
+
+ if (cstate->opts.csv_mode)
+ CopyAttributeOutCSV(cstate, colname, false,
+ list_length(cstate->attnumlist) == 1);
+ else
+ CopyAttributeOutText(cstate, colname);
+ }
+
+ CopyToFormatTextSendEndOfRow(cstate);
+ }
+}
+
+void
+CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+ bool need_delim = false;
+ FmgrInfo *out_functions = cstate->out_functions;
+ ListCell *cur;
+
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Datum value = slot->tts_values[attnum - 1];
+ bool isnull = slot->tts_isnull[attnum - 1];
+
+ if (need_delim)
+ CopySendChar(cstate, cstate->opts.delim[0]);
+ need_delim = true;
+
+ if (isnull)
+ CopySendString(cstate, cstate->opts.null_print_client);
+ else
+ {
+ char *string;
+
+ string = OutputFunctionCall(&out_functions[attnum - 1], value);
+ if (cstate->opts.csv_mode)
+ CopyAttributeOutCSV(cstate, string,
+ cstate->opts.force_quote_flags[attnum - 1],
+ list_length(cstate->attnumlist) == 1);
+ else
+ CopyAttributeOutText(cstate, string);
+ }
+ }
+
+ CopyToFormatTextSendEndOfRow(cstate);
+}
+
+void
+CopyToFormatTextEnd(CopyToState cstate)
+{
+}
+
+/*
+ * CopyHandlerOps implementation for "binary" COPY TO.
+ */
+
+void
+CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc)
+{
+ int num_phys_attrs;
+ ListCell *cur;
+
+ num_phys_attrs = tupDesc->natts;
+ /* Get info about the columns we need to process. */
+ cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Oid out_func_oid;
+ bool isvarlena;
+ Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+ getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+ fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+ }
+
+ /* Generate header for a binary copy */
+ /* Signature */
+ CopySendData(cstate, BinarySignature, 11);
+ /* Flags field */
+ CopySendInt32(cstate, 0);
+ /* No header extension */
+ CopySendInt32(cstate, 0);
+}
+
+void
+CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+ FmgrInfo *out_functions = cstate->out_functions;
+ ListCell *cur;
+
+ /* Binary per-tuple header */
+ CopySendInt16(cstate, list_length(cstate->attnumlist));
+
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Datum value = slot->tts_values[attnum - 1];
+ bool isnull = slot->tts_isnull[attnum - 1];
+
+ if (isnull)
+ CopySendInt32(cstate, -1);
+ else
+ {
+ bytea *outputbytes;
+
+ outputbytes = SendFunctionCall(&out_functions[attnum - 1], value);
+ CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+ CopySendData(cstate, VARDATA(outputbytes),
+ VARSIZE(outputbytes) - VARHDRSZ);
+ }
+ }
+
+ CopySendEndOfRow(cstate);
+}
+
+void
+CopyToFormatBinaryEnd(CopyToState cstate)
+{
+ /* Generate trailer for a binary copy */
+ CopySendInt16(cstate, -1);
+ /* Need to flush out the trailer */
+ CopySendEndOfRow(cstate);
+}
/*
* Send copy start/stop messages for frontend copies. These have changed
@@ -198,16 +397,6 @@ CopySendEndOfRow(CopyToState cstate)
switch (cstate->copy_dest)
{
case COPY_FILE:
- if (!cstate->opts.binary)
- {
- /* Default line termination depends on platform */
-#ifndef WIN32
- CopySendChar(cstate, '\n');
-#else
- CopySendString(cstate, "\r\n");
-#endif
- }
-
if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
cstate->copy_file) != 1 ||
ferror(cstate->copy_file))
@@ -242,10 +431,6 @@ CopySendEndOfRow(CopyToState cstate)
}
break;
case COPY_FRONTEND:
- /* The FE/BE protocol uses \n as newline for all platforms */
- if (!cstate->opts.binary)
- CopySendChar(cstate, '\n');
-
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
break;
@@ -748,8 +933,6 @@ DoCopyTo(CopyToState cstate)
bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
bool fe_copy = (pipe && whereToSendOutput == DestRemote);
TupleDesc tupDesc;
- int num_phys_attrs;
- ListCell *cur;
uint64 processed;
if (fe_copy)
@@ -759,32 +942,11 @@ DoCopyTo(CopyToState cstate)
tupDesc = RelationGetDescr(cstate->rel);
else
tupDesc = cstate->queryDesc->tupDesc;
- num_phys_attrs = tupDesc->natts;
cstate->opts.null_print_client = cstate->opts.null_print; /* default */
/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
cstate->fe_msgbuf = makeStringInfo();
- /* Get info about the columns we need to process. */
- cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- Oid out_func_oid;
- bool isvarlena;
- Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
-
- if (cstate->opts.binary)
- getTypeBinaryOutputInfo(attr->atttypid,
- &out_func_oid,
- &isvarlena);
- else
- getTypeOutputInfo(attr->atttypid,
- &out_func_oid,
- &isvarlena);
- fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
- }
-
/*
* Create a temporary memory context that we can reset once per row to
* recover palloc'd memory. This avoids any problems with leaks inside
@@ -795,57 +957,7 @@ DoCopyTo(CopyToState cstate)
"COPY TO",
ALLOCSET_DEFAULT_SIZES);
- if (cstate->opts.binary)
- {
- /* Generate header for a binary copy */
- int32 tmp;
-
- /* Signature */
- CopySendData(cstate, BinarySignature, 11);
- /* Flags field */
- tmp = 0;
- CopySendInt32(cstate, tmp);
- /* No header extension */
- tmp = 0;
- CopySendInt32(cstate, tmp);
- }
- else
- {
- /*
- * For non-binary copy, we need to convert null_print to file
- * encoding, because it will be sent directly with CopySendString.
- */
- if (cstate->need_transcoding)
- cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
- cstate->opts.null_print_len,
- cstate->file_encoding);
-
- /* if a header has been requested send the line */
- if (cstate->opts.header_line)
- {
- bool hdr_delim = false;
-
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- char *colname;
-
- if (hdr_delim)
- CopySendChar(cstate, cstate->opts.delim[0]);
- hdr_delim = true;
-
- colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
-
- if (cstate->opts.csv_mode)
- CopyAttributeOutCSV(cstate, colname, false,
- list_length(cstate->attnumlist) == 1);
- else
- CopyAttributeOutText(cstate, colname);
- }
-
- CopySendEndOfRow(cstate);
- }
- }
+ cstate->opts.handler->to_start(cstate, tupDesc);
if (cstate->rel)
{
@@ -884,13 +996,7 @@ DoCopyTo(CopyToState cstate)
processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
}
- if (cstate->opts.binary)
- {
- /* Generate trailer for a binary copy */
- CopySendInt16(cstate, -1);
- /* Need to flush out the trailer */
- CopySendEndOfRow(cstate);
- }
+ cstate->opts.handler->to_end(cstate);
MemoryContextDelete(cstate->rowcontext);
@@ -906,71 +1012,15 @@ DoCopyTo(CopyToState cstate)
static void
CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
{
- bool need_delim = false;
- FmgrInfo *out_functions = cstate->out_functions;
MemoryContext oldcontext;
- ListCell *cur;
- char *string;
MemoryContextReset(cstate->rowcontext);
oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
- if (cstate->opts.binary)
- {
- /* Binary per-tuple header */
- CopySendInt16(cstate, list_length(cstate->attnumlist));
- }
-
/* Make sure the tuple is fully deconstructed */
slot_getallattrs(slot);
- foreach(cur, cstate->attnumlist)
- {
- int attnum = lfirst_int(cur);
- Datum value = slot->tts_values[attnum - 1];
- bool isnull = slot->tts_isnull[attnum - 1];
-
- if (!cstate->opts.binary)
- {
- if (need_delim)
- CopySendChar(cstate, cstate->opts.delim[0]);
- need_delim = true;
- }
-
- if (isnull)
- {
- if (!cstate->opts.binary)
- CopySendString(cstate, cstate->opts.null_print_client);
- else
- CopySendInt32(cstate, -1);
- }
- else
- {
- if (!cstate->opts.binary)
- {
- string = OutputFunctionCall(&out_functions[attnum - 1],
- value);
- if (cstate->opts.csv_mode)
- CopyAttributeOutCSV(cstate, string,
- cstate->opts.force_quote_flags[attnum - 1],
- list_length(cstate->attnumlist) == 1);
- else
- CopyAttributeOutText(cstate, string);
- }
- else
- {
- bytea *outputbytes;
-
- outputbytes = SendFunctionCall(&out_functions[attnum - 1],
- value);
- CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
- CopySendData(cstate, VARDATA(outputbytes),
- VARSIZE(outputbytes) - VARHDRSZ);
- }
- }
- }
-
- CopySendEndOfRow(cstate);
+ cstate->opts.handler->to_one_row(cstate, slot);
MemoryContextSwitchTo(oldcontext);
}
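To make the byte layout produced by CopyToFormatBinaryOneRow() and CopyToFormatBinaryEnd() above easier to check, here is a minimal standalone sketch of the per-tuple framing: an int16 field count, then for each field an int32 length (-1 for NULL) followed by the payload, and finally an int16 -1 trailer. It is not part of the patch. ToyBuf, ToyField and the toy_* functions are hypothetical stand-ins for fe_msgbuf and the CopySend*() helpers, and the file header (signature, flags, extension length) is left out. Integers are written in network byte order, as CopySendInt16()/CopySendInt32() do.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical fixed-size output buffer; stands in for fe_msgbuf. */
typedef struct ToyBuf
{
	uint8_t		data[256];
	size_t		len;
} ToyBuf;

/* One field to send; bytes == NULL represents SQL NULL. */
typedef struct ToyField
{
	const void *bytes;
	int32_t		len;
} ToyField;

void
toy_send_data(ToyBuf *buf, const void *src, size_t n)
{
	assert(buf->len + n <= sizeof(buf->data));
	memcpy(buf->data + buf->len, src, n);
	buf->len += n;
}

/* Append a 16-bit integer in network (big-endian) byte order. */
void
toy_send_int16(ToyBuf *buf, int16_t val)
{
	uint16_t	u = (uint16_t) val;
	uint8_t		be[2] = {(uint8_t) (u >> 8), (uint8_t) u};

	toy_send_data(buf, be, sizeof(be));
}

/* Append a 32-bit integer in network (big-endian) byte order. */
void
toy_send_int32(ToyBuf *buf, int32_t val)
{
	uint32_t	u = (uint32_t) val;
	uint8_t		be[4] = {(uint8_t) (u >> 24), (uint8_t) (u >> 16),
						 (uint8_t) (u >> 8), (uint8_t) u};

	toy_send_data(buf, be, sizeof(be));
}

/* Per-tuple header, then length-prefixed fields; NULL is length -1. */
void
toy_binary_one_row(ToyBuf *buf, const ToyField *fields, int16_t nfields)
{
	toy_send_int16(buf, nfields);

	for (int16_t i = 0; i < nfields; i++)
	{
		if (fields[i].bytes == NULL)
			toy_send_int32(buf, -1);
		else
		{
			toy_send_int32(buf, fields[i].len);
			toy_send_data(buf, fields[i].bytes, (size_t) fields[i].len);
		}
	}
}

/* Trailer: a field count of -1 marks the end of the data. */
void
toy_binary_end(ToyBuf *buf)
{
	toy_send_int16(buf, -1);
}
```

For a row with the two fields "ab" and NULL, followed by the trailer, the buffer holds 14 bytes: 00 02, 00 00 00 02 61 62, FF FF FF FF, FF FF.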
diff --git a/src/backend/nodes/Makefile b/src/backend/nodes/Makefile
index ebbe9052cb..e64e121c01 100644
--- a/src/backend/nodes/Makefile
+++ b/src/backend/nodes/Makefile
@@ -50,6 +50,7 @@ node_headers = \
access/sdir.h \
access/tableam.h \
access/tsmapi.h \
+ commands/copy.h \
commands/event_trigger.h \
commands/trigger.h \
executor/tuptable.h \
diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl
index 72c7963578..237ac42742 100644
--- a/src/backend/nodes/gen_node_support.pl
+++ b/src/backend/nodes/gen_node_support.pl
@@ -61,6 +61,7 @@ my @all_input_files = qw(
access/sdir.h
access/tableam.h
access/tsmapi.h
+ commands/copy.h
commands/event_trigger.h
commands/trigger.h
executor/tuptable.h
@@ -85,6 +86,7 @@ my @nodetag_only_files = qw(
access/sdir.h
access/tableam.h
access/tsmapi.h
+ commands/copy.h
commands/event_trigger.h
commands/trigger.h
executor/tuptable.h
diff --git a/src/backend/utils/adt/pseudotypes.c b/src/backend/utils/adt/pseudotypes.c
index 3ba8cb192c..8f12927a4d 100644
--- a/src/backend/utils/adt/pseudotypes.c
+++ b/src/backend/utils/adt/pseudotypes.c
@@ -378,3 +378,4 @@ PSEUDOTYPE_DUMMY_IO_FUNCS(anyelement);
PSEUDOTYPE_DUMMY_IO_FUNCS(anynonarray);
PSEUDOTYPE_DUMMY_IO_FUNCS(anycompatible);
PSEUDOTYPE_DUMMY_IO_FUNCS(anycompatiblenonarray);
+PSEUDOTYPE_DUMMY_IO_FUNCS(copy_handler);
diff --git a/src/include/catalog/meson.build b/src/include/catalog/meson.build
index dcb3c5f766..d9aa3f14ba 100644
--- a/src/include/catalog/meson.build
+++ b/src/include/catalog/meson.build
@@ -69,6 +69,7 @@ catalog_headers = [
'pg_publication_rel.h',
'pg_subscription.h',
'pg_subscription_rel.h',
+ 'pg_copy_handler.h',
]
# The .dat files we need can just be listed alphabetically.
@@ -97,6 +98,7 @@ bki_data = [
'pg_ts_parser.dat',
'pg_ts_template.dat',
'pg_type.dat',
+ 'pg_copy_handler.dat',
]
bki_data_f = files(bki_data)
diff --git a/src/include/catalog/pg_copy_handler.dat b/src/include/catalog/pg_copy_handler.dat
new file mode 100644
index 0000000000..052329cb53
--- /dev/null
+++ b/src/include/catalog/pg_copy_handler.dat
@@ -0,0 +1,25 @@
+#----------------------------------------------------------------------
+#
+# pg_copy_handler.dat
+# Initial contents of the pg_copy_handler system catalog.
+#
+# Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+# Portions Copyright (c) 1994, Regents of the University of California
+#
+# src/include/catalog/pg_copy_handler.dat
+#
+#----------------------------------------------------------------------
+
+[
+
+{ oid => '4554', oid_symbol => 'TEXT_COPY_HANDLER_OID',
+ descr => 'text copy handler',
+ chname => 'text', copyhandler => 'text_copy_handler'},
+{ oid => '4555', oid_symbol => 'CSV_COPY_HANDLER_OID',
+ descr => 'csv copy handler',
+ chname => 'csv', copyhandler => 'csv_copy_handler'},
+{ oid => '4556', oid_symbol => 'BINARY_COPY_HANDLER_OID',
+ descr => 'binary copy handler',
+ chname => 'binary', copyhandler => 'binary_copy_handler'},
+
+]
diff --git a/src/include/catalog/pg_copy_handler.h b/src/include/catalog/pg_copy_handler.h
new file mode 100644
index 0000000000..74ad06f4d6
--- /dev/null
+++ b/src/include/catalog/pg_copy_handler.h
@@ -0,0 +1,50 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_copy_handler.h
+ * definition of the "copy handler" system catalog (pg_copy_handler)
+ *
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_copy_handler.h
+ *
+ * NOTES
+ * The Catalog.pm module reads this file and derives schema
+ * information.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_COPY_HANDLER_H
+#define PG_COPY_HANDLER_H
+
+#include "catalog/genbki.h"
+#include "catalog/pg_copy_handler_d.h"
+
+/* ----------------
+ * pg_copy_handler definition. cpp turns this into
+ * typedef struct FormData_pg_copy_handler
+ * ----------------
+ */
+CATALOG(pg_copy_handler,4551,CopyHandlerRelationId)
+{
+ Oid oid; /* oid */
+
+ /* copy handler name */
+ NameData chname;
+
+ /* handler function */
+ regproc copyhandler BKI_LOOKUP(pg_proc);
+} FormData_pg_copy_handler;
+
+/* ----------------
+ * Form_pg_copy_handler corresponds to a pointer to a tuple with
+ * the format of pg_copy_handler relation.
+ * ----------------
+ */
+typedef FormData_pg_copy_handler *Form_pg_copy_handler;
+
+DECLARE_UNIQUE_INDEX(pg_copy_handler_name_index, 4552, CopyHandlerNameIndexId, pg_copy_handler, btree(chname name_ops));
+DECLARE_UNIQUE_INDEX_PKEY(pg_copy_handler_oid_index, 4553, CopyHandlerOidIndexId, pg_copy_handler, btree(oid oid_ops));
+
+#endif /* PG_COPY_HANDLER_H */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index fb58dee3bc..8b9b60c90f 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -948,6 +948,20 @@
prorettype => 'void', proargtypes => 'regclass int8',
prosrc => 'brin_desummarize_range' },
+# Copy handlers
+{ oid => '4557', descr => 'text copy handler',
+ proname => 'text_copy_handler', provolatile => 'v',
+ prorettype => 'copy_handler', proargtypes => 'internal',
+ prosrc => 'text_copy_handler' },
+{ oid => '4558', descr => 'csv copy handler',
+ proname => 'csv_copy_handler', provolatile => 'v',
+ prorettype => 'copy_handler', proargtypes => 'internal',
+ prosrc => 'csv_copy_handler' },
+{ oid => '4559', descr => 'binary copy handler',
+ proname => 'binary_copy_handler', provolatile => 'v',
+ prorettype => 'copy_handler', proargtypes => 'internal',
+ prosrc => 'binary_copy_handler' },
+
{ oid => '338', descr => 'validate an operator class',
proname => 'amvalidate', provolatile => 'v', prorettype => 'bool',
proargtypes => 'oid', prosrc => 'amvalidate' },
@@ -7609,6 +7623,13 @@
{ oid => '268', descr => 'I/O',
proname => 'table_am_handler_out', prorettype => 'cstring',
proargtypes => 'table_am_handler', prosrc => 'table_am_handler_out' },
+{ oid => '388', descr => 'I/O',
+ proname => 'copy_handler_in', proisstrict => 'f',
+ prorettype => 'copy_handler', proargtypes => 'cstring',
+ prosrc => 'copy_handler_in' },
+{ oid => '389', descr => 'I/O',
+ proname => 'copy_handler_out', prorettype => 'cstring',
+ proargtypes => 'copy_handler', prosrc => 'copy_handler_out' },
{ oid => '5086', descr => 'I/O',
proname => 'anycompatible_in', prorettype => 'anycompatible',
proargtypes => 'cstring', prosrc => 'anycompatible_in' },
diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat
index f6110a850d..aa6d731cb5 100644
--- a/src/include/catalog/pg_type.dat
+++ b/src/include/catalog/pg_type.dat
@@ -639,6 +639,13 @@
typcategory => 'P', typinput => 'table_am_handler_in',
typoutput => 'table_am_handler_out', typreceive => '-', typsend => '-',
typalign => 'i' },
+{ oid => '3814',
+ typname => 'copy_handler',
+ descr => 'pseudo-type for the result of a copy handler',
+ typlen => '4', typbyval => 't', typtype => 'p',
+ typcategory => 'P', typinput => 'copy_handler_in',
+ typoutput => 'copy_handler_out', typreceive => '-', typsend => '-',
+ typalign => 'i' },
{ oid => '3831',
descr => 'pseudo-type representing a range over a polymorphic base type',
typname => 'anyrange', typlen => '-1', typbyval => 'f', typtype => 'p',
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index f2cca0b90b..c2af6f2aff 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -30,6 +30,33 @@ typedef enum CopyHeaderChoice
COPY_HEADER_MATCH,
} CopyHeaderChoice;
+#define DEFAULT_COPY_HANDLER "text"
+
+/* These are private in commands/copy[from|to].c */
+typedef struct CopyFromStateData *CopyFromState;
+typedef struct CopyToStateData *CopyToState;
+
+/* Routines for a COPY HANDLER implementation. */
+typedef struct CopyRoutine
+{
+ /* this must be set to T_CopyRoutine */
+ NodeTag type;
+
+ /* Called when COPY TO is started. This will send a header. */
+ void (*to_start) (CopyToState cstate, TupleDesc tupDesc);
+
+ /* Copy one row for COPY TO. */
+ void (*to_one_row) (CopyToState cstate, TupleTableSlot *slot);
+
+ /* Called when COPY TO is ended. This will send a trailer. */
+ void (*to_end) (CopyToState cstate);
+
+ void (*from_start) (CopyFromState cstate, TupleDesc tupDesc);
+ bool (*from_next) (CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls);
+ void (*from_error_callback) (CopyFromState cstate);
+} CopyRoutine;
+
/*
* A struct to hold COPY options, in a parsed form. All of these are related
* to formatting, except for 'freeze', which doesn't really belong here, but
@@ -63,12 +90,9 @@ typedef struct CopyFormatOptions
bool *force_null_flags; /* per-column CSV FN flags */
bool convert_selectively; /* do selective binary conversion? */
List *convert_select; /* list of column names (can be NIL) */
+ CopyRoutine *handler; /* copy handler routines */
} CopyFormatOptions;
-/* These are private in commands/copy[from|to].c */
-typedef struct CopyFromStateData *CopyFromState;
-typedef struct CopyToStateData *CopyToState;
-
typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
typedef void (*copy_data_dest_cb) (void *data, int len);
@@ -91,6 +115,9 @@ extern uint64 CopyFrom(CopyFromState cstate);
extern DestReceiver *CreateCopyDestReceiver(void);
+extern CopyRoutine *GetCopyRoutine(Oid copyhandler);
+extern CopyRoutine *GetCopyRoutineByName(char *fmt);
+
/*
* internal prototypes
*/
@@ -102,4 +129,20 @@ extern uint64 DoCopyTo(CopyToState cstate);
extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
List *attnamelist);
+extern void CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc);
+extern void CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot);
+extern void CopyToFormatTextEnd(CopyToState cstate);
+extern void CopyFromFormatTextStart(CopyFromState cstate, TupleDesc tupDesc);
+extern bool CopyFromFormatTextNext(CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls);
+extern void CopyFromFormatTextErrorCallback(CopyFromState cstate);
+
+extern void CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc);
+extern void CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot);
+extern void CopyToFormatBinaryEnd(CopyToState cstate);
+extern void CopyFromFormatBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
+extern bool CopyFromFormatBinaryNext(CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls);
+extern void CopyFromFormatBinaryErrorCallback(CopyFromState cstate);
+
#endif /* COPY_H */
diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out
index 215eb899be..a1ab73dc37 100644
--- a/src/test/regress/expected/oidjoins.out
+++ b/src/test/regress/expected/oidjoins.out
@@ -266,3 +266,4 @@ NOTICE: checking pg_subscription {subdbid} => pg_database {oid}
NOTICE: checking pg_subscription {subowner} => pg_authid {oid}
NOTICE: checking pg_subscription_rel {srsubid} => pg_subscription {oid}
NOTICE: checking pg_subscription_rel {srrelid} => pg_class {oid}
+NOTICE: checking pg_copy_handler {copyhandler} => pg_proc {oid}
--
2.41.0
On Fri, Dec 08, 2023 at 10:32:27AM +0800, Junwang Zhao wrote:
I can see FDW related utility commands but no TABLESAMPLE related,
and there is a pg_foreign_data_wrapper system catalog which has
a *fdwhandler* field.
+ */
+CATALOG(pg_copy_handler,4551,CopyHandlerRelationId)
Using a catalog is an over-engineered design. Others have provided
hints about that upthread, but it would be enough to have one or two
handler types that are wrapped around one or two SQL *functions*, like
tablesamples. It seems like you've missed it, but feel free to read
about tablesample-method.sgml, that explains how this is achieved for
tablesamples.
If we want extensions to create a new copy handler, I think
something like pg_copy_handler should be necessary.
A catalog is not necessary, that's the point, because it can be
replaced by a scan of pg_proc with the function name defined in a COPY
query (be it through a FORMAT, or different option in a DefElem).
An example of extension with tablesamples is contrib/tsm_system_rows/,
that just uses a function returning a tsm_handler:
CREATE FUNCTION system_rows(internal)
RETURNS tsm_handler
AS 'MODULE_PATHNAME', 'tsm_system_rows_handler'
LANGUAGE C STRICT;
Then SELECT queries rely on the contents of the TABLESAMPLE clause to
find the set of callbacks it should use by calling the function.
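To illustrate the tablesample-style lookup described above, here is a minimal, self-contained sketch. It is not code from the patch: the small array stands in for a scan of pg_proc, and every `Demo`/`demo_` name as well as the header strings are assumptions made up for the example.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical callback table, loosely modeled on the patch's CopyRoutine. */
typedef struct DemoCopyRoutine
{
    /* Returns the header to send when COPY TO starts ("" for none). */
    const char *(*to_header) (void);
} DemoCopyRoutine;

/* A handler is just a function that returns its callback table. */
typedef const DemoCopyRoutine *(*DemoHandlerFn) (void);

static const char *demo_text_header(void) { return ""; }
static const char *demo_csv_header(void) { return "a,b\n"; }

static const DemoCopyRoutine demo_text_routine = { demo_text_header };
static const DemoCopyRoutine demo_csv_routine = { demo_csv_header };

static const DemoCopyRoutine *demo_text_handler(void) { return &demo_text_routine; }
static const DemoCopyRoutine *demo_csv_handler(void) { return &demo_csv_routine; }

/* Stand-in for pg_proc: handler functions keyed by function name. */
static const struct
{
    const char *proname;
    DemoHandlerFn fn;
} demo_procs[] = {
    {"text", demo_text_handler},
    {"csv", demo_csv_handler},
};

/*
 * Resolve the FORMAT string to a callback table by looking up a function
 * with that name and calling it.  Returns NULL for an unknown format.
 */
const DemoCopyRoutine *
demo_lookup_copy_routine(const char *format)
{
    assert(format != NULL);

    for (size_t i = 0; i < sizeof(demo_procs) / sizeof(demo_procs[0]); i++)
    {
        if (strcmp(demo_procs[i].proname, format) == 0)
            return demo_procs[i].fn();
    }
    return NULL;
}
```

The point of the sketch is only that no dedicated catalog is involved: the format name selects a function, and the function hands back the callbacks.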
+/* Routines for a COPY HANDLER implementation. */
+typedef struct CopyRoutine
+{
FWIW, I find the concept of having one handler for both COPY FROM and
COPY TO weird, as each of them has callbacks that are mutually
exclusive with the other's, but I'm OK with it if there is a consensus
on having only one. I'd suggest using *two* NodeTags instead for a
cleaner split, meaning that we'd need two functions for each method.
My point is that a custom COPY handler could define just a COPY TO
handler or just a COPY FROM handler, though it mostly comes down to a
matter of taste regarding how clean the error handling becomes if one
tries to use a set of callbacks with a COPY type (TO or FROM) not
matching it.
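A rough sketch of what the two-NodeTag split could look like follows. It is only an illustration under assumptions: the `Demo` types and tags are hypothetical, the callback signatures are simplified to take no arguments, and the tag check merely imitates the spirit of a node-type test rather than using any real PostgreSQL macro.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical tags, one per direction. */
typedef enum DemoNodeTag
{
    T_DemoCopyToRoutine = 1,
    T_DemoCopyFromRoutine = 2
} DemoNodeTag;

/* Callbacks used only by COPY TO. */
typedef struct DemoCopyToRoutine
{
    DemoNodeTag type;           /* must be T_DemoCopyToRoutine */
    void        (*to_start) (void);
    void        (*to_one_row) (void);
    void        (*to_end) (void);
} DemoCopyToRoutine;

/* Callbacks used only by COPY FROM. */
typedef struct DemoCopyFromRoutine
{
    DemoNodeTag type;           /* must be T_DemoCopyFromRoutine */
    void        (*from_start) (void);
    bool        (*from_next) (void);
} DemoCopyFromRoutine;

/*
 * Check that a routine returned by a handler matches the direction of the
 * COPY command.  Both structs start with the tag, so it can be read
 * through a pointer to the first member.
 */
bool
demo_routine_matches(const void *routine, bool is_from)
{
    DemoNodeTag tag;

    assert(routine != NULL);
    tag = *(const DemoNodeTag *) routine;
    return tag == (is_from ? T_DemoCopyFromRoutine : T_DemoCopyToRoutine);
}
```

With such a split, an extension that only supports COPY TO simply never provides a `DemoCopyFromRoutine`, and a mismatch can be rejected with a single tag comparison.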
--
Michael
On Fri, Dec 8, 2023 at 2:17 PM Michael Paquier <michael@paquier.xyz> wrote:
On Fri, Dec 08, 2023 at 10:32:27AM +0800, Junwang Zhao wrote:
I can see FDW related utility commands but no TABLESAMPLE related,
and there is a pg_foreign_data_wrapper system catalog which has
a *fdwhandler* field.
+ */
+CATALOG(pg_copy_handler,4551,CopyHandlerRelationId)
Using a catalog is an over-engineered design. Others have provided
hints about that upthread, but it would be enough to have one or two
handler types that are wrapped around one or two SQL *functions*, like
tablesamples. It seems like you've missed it, but feel free to read
about tablesample-method.sgml, that explains how this is achieved for
tablesamples.
Agreed. My previous example of FDW was not a good one, I missed something.
If we want extensions to create a new copy handler, I think
something like pg_copy_handler should be necessary.
A catalog is not necessary, that's the point, because it can be
replaced by a scan of pg_proc with the function name defined in a COPY
query (be it through a FORMAT, or different option in a DefElem).
An example of extension with tablesamples is contrib/tsm_system_rows/,
that just uses a function returning a tsm_handler:
CREATE FUNCTION system_rows(internal)
RETURNS tsm_handler
AS 'MODULE_PATHNAME', 'tsm_system_rows_handler'
LANGUAGE C STRICT;
Then SELECT queries rely on the contents of the TABLESAMPLE clause to
find the set of callbacks it should use by calling the function.
+/* Routines for a COPY HANDLER implementation. */
+typedef struct CopyRoutine
+{
FWIW, I find the concept of having one handler for both COPY FROM and
COPY TO weird, as each of them has callbacks that are mutually
exclusive with the other's, but I'm OK with it if there is a consensus
on having only one. I'd suggest using *two* NodeTags instead for a
cleaner split, meaning that we'd need two functions for each method.
My point is that a custom COPY handler could define just a COPY TO
handler or just a COPY FROM handler, though it mostly comes down to a
matter of taste regarding how clean the error handling becomes if one
tries to use a set of callbacks with a COPY type (TO or FROM) not
matching it.
I tend to agree with having two separate functions for each method. But
if we implement it the tablesample way, I think we need to make it
clear how to pick one of the two functions depending on whether the
command is COPY TO or COPY FROM.
IIUC in the tablesample case, we scan pg_proc to find the handler
function, like system_rows(internal), by the method name specified in
the query. On the other hand, in the COPY case, the queries would
look like:
COPY tab TO stdout WITH (format = 'arrow');
and
COPY tab FROM stdin WITH (format = 'arrow');
So a custom COPY extension could not simply define a single SQL function
such as arrow(internal): the TO and FROM handlers would both take one
internal argument, so their signatures would conflict. We might need a
rule that the function returning a copy_in/out_handler must be defined as
<method_name>_to(internal) and <method_name>_from(internal).
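The naming rule suggested above could be sketched as follows. This is only an illustration of deriving the function name from the FORMAT string and the COPY direction; `demo_copy_handler_name` is a hypothetical helper, and the `_to`/`_from` suffixes are the ones floated in this message, not a settled convention.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/*
 * Build the name of the handler function to look up for a given format
 * and direction, e.g. "arrow" + COPY TO -> "arrow_to".
 *
 * Returns false if the format name is empty or the result does not fit
 * into the caller's buffer.
 */
bool
demo_copy_handler_name(const char *format, bool is_from,
                       char *buf, size_t buflen)
{
    int         n;

    assert(format != NULL && buf != NULL && buflen > 0);

    if (strlen(format) == 0)
        return false;

    n = snprintf(buf, buflen, "%s_%s", format, is_from ? "from" : "to");
    return n >= 0 && (size_t) n < buflen;
}
```

The resulting name would then be used for the function lookup, so an extension supporting only one direction just defines one of the two functions.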
Regards,
--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
On Fri, Dec 08, 2023 at 03:42:06PM +0900, Masahiko Sawada wrote:
So a custom COPY extension could not simply define a single SQL function
such as arrow(internal): the TO and FROM handlers would both take one
internal argument, so their signatures would conflict. We might need a
rule that the function returning a copy_in/out_handler must be defined as
<method_name>_to(internal) and <method_name>_from(internal).
Yeah, I was wondering if there was a trick to avoid the conflict on the
internal input argument, but I cannot recall anything elegant off the
top of my head. Anyway, I'd be OK with any approach as long as it plays
nicely with the query integration, that is, with the function lookup
driven by the string value of FORMAT's DefElem.
--
Michael
Dear Junwang, Sutou-san,
Basically I agree with your point: improving extensibility is good.
(I remember that this topic was discussed at the Japan PostgreSQL conference.)
Below are my comments on your patch.
01. General
Just to confirm: is it OK to implement the API only partially, e.g. only COPY TO?
The patch currently does not seem to consider the case where a callback is not implemented.
02. General
It might be trivial, but could you please clarify how users can extend COPY? Is it OK
to follow the steps below?
1. Create a handler function, via CREATE FUNCTION,
2. Register a handler, via new SQL (CREATE COPY HANDLER),
3. Specify the added handler in the COPY ... FORMAT clause.
03. General
Could you please add documentation tasks to your TODO? I imagine something like
fdwhandler.sgml.
04. General - copyright
For newly added files, the copyright notice below seems sufficient. See applyparallelworker.c.
```
* Copyright (c) 2023, PostgreSQL Global Development Group
```
05. src/include/catalog/* files
IIUC, OIDs of 8000 or higher should be used while developing a patch. The src/include/catalog/unused_oids
script suggests candidates you can use.
06. copy.c
I feel that we could create one file per copy format, such as copy_{text|csv|binary}.c,
as is done for indexes.
What do others think?
07. fmt_to_name()
I'm not sure this function is really needed. Can we follow get_foreign_data_wrapper_oid()
and remove the function?
08. GetCopyRoutineByName()
Should we use the syscache to search the catalog?
09. CopyToFormatTextSendEndOfRow(), CopyToFormatBinaryStart()
Comments still refer to CopyHandlerOps, although it was renamed.
10. copy.h
Following foreign.h and fdwapi.h, should we add a new header file and move some of the APIs there?
11. copy.h
```
-/* These are private in commands/copy[from|to].c */
-typedef struct CopyFromStateData *CopyFromState;
-typedef struct CopyToStateData *CopyToState;
```
Are the above changes really needed?
12. CopyFormatOptions
Can we remove `bool binary` in the future?
13. external functions
```
+extern void CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc);
+extern void CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot);
+extern void CopyToFormatTextEnd(CopyToState cstate);
+extern void CopyFromFormatTextStart(CopyFromState cstate, TupleDesc tupDesc);
+extern bool CopyFromFormatTextNext(CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls);
+extern void CopyFromFormatTextErrorCallback(CopyFromState cstate);
+
+extern void CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc);
+extern void CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot);
+extern void CopyToFormatBinaryEnd(CopyToState cstate);
+extern void CopyFromFormatBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
+extern bool CopyFromFormatBinaryNext(CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls);
+extern void CopyFromFormatBinaryErrorCallback(CopyFromState cstate);
```
FYI - If you add files for {text|csv|binary}, these declarations can be removed.
Best Regards,
Hayato Kuroda
FUJITSU LIMITED
On Sat, Dec 9, 2023 at 10:43 AM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
Dear Junwang, Sutou-san,
Basically I agree with your point: improving extensibility is good.
(I remember that this topic was discussed at the Japan PostgreSQL conference.)
Below are my comments on your patch.
01. General
Just to confirm: is it OK to implement the API only partially, e.g. only COPY TO?
The patch currently does not seem to consider the case where a callback is not implemented.
For a partial implementation, we can leave the unsupported callbacks as NULL,
check for NULL in *ProcessCopyOptions*, and report an error if the requested
direction is not supported.
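The NULL check described above might look roughly like the sketch below. It is a standalone illustration, not the patch's code: the `Demo` struct only mirrors the callback names of CopyRoutine with simplified signatures, and the function returns an error message where the server would presumably raise an error through ereport().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Simplified stand-in for the patch's CopyRoutine (signatures shortened). */
typedef struct DemoCopyRoutine
{
    void        (*to_start) (void);
    void        (*to_one_row) (void);
    void        (*to_end) (void);
    void        (*from_start) (void);
    bool        (*from_next) (void);
} DemoCopyRoutine;

static void demo_noop(void) {}

/* Example of a format that implements only COPY TO. */
const DemoCopyRoutine demo_to_only_routine = {
    demo_noop, demo_noop, demo_noop,    /* COPY TO callbacks */
    NULL, NULL                          /* COPY FROM left unimplemented */
};

/*
 * Return NULL if the routine supports the requested direction, otherwise
 * a message describing what is missing.
 */
const char *
demo_check_copy_direction(const DemoCopyRoutine *routine, bool is_from)
{
    assert(routine != NULL);

    if (is_from)
    {
        if (routine->from_start == NULL || routine->from_next == NULL)
            return "COPY FROM is not supported by this format";
    }
    else
    {
        if (routine->to_start == NULL || routine->to_one_row == NULL ||
            routine->to_end == NULL)
            return "COPY TO is not supported by this format";
    }
    return NULL;
}
```

Doing the check while the options are processed means the mismatch is reported before any data is read or written.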
02. General
It might be trivial, but could you please clarify how users can extend COPY? Is it OK
to follow the steps below?
1. Create a handler function, via CREATE FUNCTION,
2. Register a handler, via new SQL (CREATE COPY HANDLER),
3. Specify the added handler in the COPY ... FORMAT clause.
My original thought was option 2, but as Michael pointed out, option 1 is
the right way to go.
03. General
Could you please add documentation tasks to your TODO? I imagine something like
fdwhandler.sgml.
04. General - copyright
For newly added files, the copyright notice below seems sufficient. See applyparallelworker.c.
```
* Copyright (c) 2023, PostgreSQL Global Development Group
```
05. src/include/catalog/* files
IIUC, OIDs of 8000 or higher should be used while developing a patch. The src/include/catalog/unused_oids
script suggests candidates you can use.
Yeah, I will run renumber_oids.pl at the end.
06. copy.c
I feel that we could create one file per copy format, such as copy_{text|csv|binary}.c,
as is done for indexes.
What do others think?
I'm not sure about this; it seems others have put a lot of effort into
splitting TO and FROM.
I'd also like to hear from others.
07. fmt_to_name()
I'm not sure this function is really needed. Can we follow get_foreign_data_wrapper_oid()
and remove the function?
I referenced some code from Greenplum; I will remove this.
Thanks for all the valuable suggestions.
--
Regards
Junwang Zhao
Hi Junwang
Please also see my presentation slides from last year's PostgreSQL
Conference in Berlin (attached).
The main idea is to make not just "format", but also "transport" and
"stream processing" extendable via virtual function tables.
Btw, will any of you here be in Prague next week?
It would be a good opportunity to discuss this in person.
Best Regards
Hannu
Attachments:
Cloud-friendly COPY.pdf (application/pdf)