Adding Support for Copy callback functionality on COPY TO api
Hi hackers,
Currently, the COPY TO api does not support callback functions, while the COPY FROM api does. The COPY TO code does, however, include placeholders for supporting callbacks in the future.
Rounding out the support of callback functions to both could be very beneficial for extension development. In particular, supporting callbacks for COPY TO will allow developers to utilize the preexisting command in order to create tools that give users more support for moving data for storage, backup, analytics, etc.
We are aiming to get the support in core PostgreSQL and add COPY TO callback support in the next commitfest. The attached patch contains a change to COPY TO api to support callbacks.
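As a rough illustration of what the patch enables, here is a sketch that models the destination dispatch outside the server. CopyToSketch, send_end_of_row() and collect_cb() are hypothetical names used only for this example. Only the callback branch mirrors the COPY_CALLBACK case that the patch adds to CopySendEndOfRow(); the file and frontend branches are simplified.

```c
#include <stdio.h>
#include <string.h>

/* Same callback signature as the patch; everything else is a hypothetical stand-in. */
typedef void (*copy_data_dest_cb) (void *data, int len);

typedef enum CopyDest
{
	COPY_FILE,					/* to file (or a piped program) */
	COPY_FRONTEND,				/* to frontend */
	COPY_CALLBACK				/* to callback function */
} CopyDest;

typedef struct CopyToSketch
{
	CopyDest	copy_dest;
	FILE	   *copy_file;		/* used only for COPY_FILE */
	copy_data_dest_cb data_dest_cb;	/* used only for COPY_CALLBACK */
} CopyToSketch;

/*
 * Loosely modeled on CopySendEndOfRow(): hand one finished row to its
 * destination.  The file branch appends a newline; the callback branch,
 * like the patch, passes the row through unchanged.
 */
static void
send_end_of_row(CopyToSketch *cstate, char *row, int len)
{
	switch (cstate->copy_dest)
	{
		case COPY_FILE:
			fwrite(row, 1, (size_t) len, cstate->copy_file);
			fputc('\n', cstate->copy_file);
			break;
		case COPY_FRONTEND:
			/* the server would send one CopyData ('d') message here */
			break;
		case COPY_CALLBACK:
			cstate->data_dest_cb(row, len);
			break;
	}
}

/* Hypothetical extension callback: append each row it receives to a buffer. */
static char collected[256];
static int	ncalls = 0;

static void
collect_cb(void *data, int len)
{
	strncat(collected, (const char *) data, (size_t) len);
	strcat(collected, "\n");
	ncalls++;
}
```

An extension would pass a function like collect_cb() as the new callback argument of BeginCopyTo() and then call DoCopyTo() and EndCopyTo(); in the test module posted later in this thread, each invocation receives one text-format row without a trailing newline.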
Best,
Bilva
Attachments:
0001-Support-COPY-TO-callback-functions.patch (application/octet-stream)
From 5c1ec351edb97c42b1161ce9717819281024fddf Mon Sep 17 00:00:00 2001
From: Bilva Sanaba <bilvas@amazon.com>
Date: Tue, 30 Jun 2020 16:08:07 -0700
Subject: [PATCH] Support COPY TO callback functions
---
src/backend/commands/copy.c | 31 +++++++++++++++++--------------
src/include/commands/copy.h | 7 +++++++
2 files changed, 24 insertions(+), 14 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3e199bdfd0..2322b6d558 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -133,6 +133,7 @@ typedef struct CopyStateData
char *filename; /* filename, or NULL for STDIN/STDOUT */
bool is_program; /* is 'filename' a program to popen? */
copy_data_source_cb data_source_cb; /* function for reading data */
+ copy_data_destination_cb data_destination_cb; /* function to write data */
bool binary; /* binary format? */
bool freeze; /* freeze rows on loading? */
bool csv_mode; /* Comma Separated Value format? */
@@ -356,11 +357,6 @@ static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
List *options);
static void EndCopy(CopyState cstate);
static void ClosePipeToProgram(CopyState cstate);
-static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
- Oid queryRelId, const char *filename, bool is_program,
- List *attnamelist, List *options);
-static void EndCopyTo(CopyState cstate);
-static uint64 DoCopyTo(CopyState cstate);
static uint64 CopyTo(CopyState cstate);
static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
static bool CopyReadLine(CopyState cstate);
@@ -586,7 +582,7 @@ CopySendEndOfRow(CopyState cstate)
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
case COPY_CALLBACK:
- Assert(false); /* Not yet supported. */
+ cstate->data_destination_cb(fe_msgbuf->data, fe_msgbuf->len);
break;
}
@@ -1075,7 +1071,8 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
{
cstate = BeginCopyTo(pstate, rel, query, relid,
stmt->filename, stmt->is_program,
- stmt->attlist, stmt->options);
+ stmt->attlist, stmt->options,
+ NULL);
*processed = DoCopyTo(cstate); /* copy from database to file */
EndCopyTo(cstate);
}
@@ -1817,7 +1814,7 @@ EndCopy(CopyState cstate)
/*
* Setup CopyState to read tuples from a table or a query for COPY TO.
*/
-static CopyState
+CopyState
BeginCopyTo(ParseState *pstate,
Relation rel,
RawStmt *query,
@@ -1825,10 +1822,11 @@ BeginCopyTo(ParseState *pstate,
const char *filename,
bool is_program,
List *attnamelist,
- List *options)
+ List *options,
+ copy_data_destination_cb data_destination_cb)
{
CopyState cstate;
- bool pipe = (filename == NULL);
+ bool pipe = (filename == NULL) && (data_destination_cb == NULL);
MemoryContext oldcontext;
if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
@@ -1873,7 +1871,12 @@ BeginCopyTo(ParseState *pstate,
options);
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
- if (pipe)
+ if (data_destination_cb)
+ {
+ cstate->copy_dest = COPY_CALLBACK;
+ cstate->data_destination_cb = data_destination_cb;
+ }
+ else if (pipe)
{
Assert(!is_program); /* the grammar does not allow this */
if (whereToSendOutput != DestRemote)
@@ -1953,10 +1956,10 @@ BeginCopyTo(ParseState *pstate,
* This intermediate routine exists mainly to localize the effects of setjmp
* so we don't need to plaster a lot of variables with "volatile".
*/
-static uint64
+uint64
DoCopyTo(CopyState cstate)
{
- bool pipe = (cstate->filename == NULL);
+ bool pipe = (cstate->filename == NULL) && (cstate->data_destination_cb == NULL);
bool fe_copy = (pipe && whereToSendOutput == DestRemote);
uint64 processed;
@@ -1988,7 +1991,7 @@ DoCopyTo(CopyState cstate)
/*
* Clean up storage and release resources for COPY TO.
*/
-static void
+void
EndCopyTo(CopyState cstate)
{
if (cstate->queryDesc != NULL)
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index c639833565..6617bacd87 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -22,6 +22,7 @@
/* CopyStateData is private in commands/copy.c */
typedef struct CopyStateData *CopyState;
typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+typedef void (*copy_data_destination_cb) (void *data, int len);
extern void DoCopy(ParseState *state, const CopyStmt *stmt,
int stmt_location, int stmt_len,
@@ -41,4 +42,10 @@ extern uint64 CopyFrom(CopyState cstate);
extern DestReceiver *CreateCopyDestReceiver(void);
+extern CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
+ Oid queryRelId, const char *filename, bool is_program,
+ List *attnamelist, List *options, copy_data_destination_cb data_destination_cb);
+extern uint64 DoCopyTo(CopyState cstate);
+extern void EndCopyTo(CopyState cstate);
+
#endif /* COPY_H */
--
2.22.0
Hi Bilva,
Thank you for registering this patch!
I had a few suggestions:
1. Please run pgindent [1] on your code, and make sure you add
copy_data_destination_cb to src/tools/pgindent/typedefs.list. Run
pgindent only on the files you changed (it accepts file names as
command-line arguments).
2. For features such as this, it is often helpful to find a use case
within backend, utility, or extension code that demonstrates the
callback, and to include code exercising it with the patch. See how
copy_read_data() is used as a copy_data_source_cb to copy the query
results received from the WAL receiver (see copy_table()). Finding
a similar use case in the source tree would make a stronger case
for this patch.
3. Shouldn't copy_data_destination_cb return the number of bytes
written, similar to copy_data_source_cb? We should also think about
how to represent failure: looking at CopySendEndOfRow(), we should
check the return value of the callback invocation and error out, as
we do for the other copy_dest types.
4.
bool pipe = (cstate->filename == NULL) && (cstate->data_destination_cb == NULL);
I think a similar change should also be applied to BeginCopyFrom() and
CopyFrom(). Or even better, such code could be refactored to have a
separate destination type COPY_PIPE. This of course, will be a separate
patch. I think the above line is okay for this patch.
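Points 3 and 4 could fit together roughly as in the sketch below, written under stated assumptions: the int-returning callback type, the COPY_PIPE value, choose_copy_dest() and send_row_to_callback() are all hypothetical, and the sketch returns -1 where the server would raise an error with ereport(ERROR). As in the patch, a callback takes precedence over a file name.

```c
#include <stdio.h>

/*
 * Hypothetical variant of the callback type from point 3: return the number
 * of bytes consumed, in the spirit of copy_data_source_cb.
 */
typedef int (*copy_data_dest_cb) (void *data, int len);

/* Hypothetical destination enum with the separate COPY_PIPE from point 4. */
typedef enum CopyDest
{
	COPY_FILE,					/* to a named file (or piped program) */
	COPY_PIPE,					/* to STDOUT, the old "pipe" case */
	COPY_CALLBACK				/* to a callback function */
} CopyDest;

/*
 * Pick the destination once, instead of recomputing
 * "filename == NULL && data_dest_cb == NULL" in several places.
 */
static CopyDest
choose_copy_dest(const char *filename, copy_data_dest_cb cb)
{
	if (cb != NULL)
		return COPY_CALLBACK;
	if (filename == NULL)
		return COPY_PIPE;
	return COPY_FILE;
}

/*
 * Hand one row to the callback and check the result, as point 3 suggests.
 * Returns 0 on success and -1 on a short or failed write.
 */
static int
send_row_to_callback(copy_data_dest_cb cb, void *data, int len)
{
	int			written = cb(data, len);

	if (written != len)
	{
		fprintf(stderr, "could not write to COPY callback: %d of %d bytes\n",
				written, len);
		return -1;
	}
	return 0;
}

/* Hypothetical callbacks for trying the sketch out. */
static int
accept_all_cb(void *data, int len)
{
	(void) data;
	return len;					/* pretend everything was written */
}

static int
short_write_cb(void *data, int len)
{
	(void) data;
	return len > 0 ? len - 1 : 0;	/* simulate a short write */
}
```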
Regards,
Soumyadeep
On Mon, Sep 14, 2020 at 04:28:12PM -0700, Soumyadeep Chakraborty wrote:
> I think a similar change should also be applied to BeginCopyFrom() and
> CopyFrom(). Or even better, such code could be refactored to have a
> separate destination type COPY_PIPE. This of course, will be a separate
> patch. I think the above line is okay for this patch.
This feedback has not been answered after two weeks, so I have marked
the patch as returned with feedback.
--
Michael
On 7/2/20 2:41 AM, Sanaba, Bilva wrote:
> Hi hackers,
> Currently, the COPY TO api does not support callback functions, while
> the COPY FROM api does. The COPY TO code does, however, include
> placeholders for supporting callbacks in the future.
> Rounding out the support of callback functions to both could be very
> beneficial for extension development. In particular, supporting
> callbacks for COPY TO will allow developers to utilize the preexisting
> command in order to create tools that give users more support for moving
> data for storage, backup, analytics, etc.
> We are aiming to get the support in core PostgreSQL and add COPY TO
> callback support in the next commitfest. The attached patch contains a
> change to COPY TO api to support callbacks.
Your code is almost exactly the same as what was proposed in [1] as part
of the 'Fast COPY FROM' command, but it seems there are some differences.
[1]: /messages/by-id/3d0909dc-3691-a576-208a-90986e55489f@postgrespro.ru
--
regards,
Andrey Lepikhov
Postgres Professional
On Wed, Sep 30, 2020 at 04:41:51PM +0900, Michael Paquier wrote:
> This feedback has not been answered after two weeks, so I have marked
> the patch as returned with feedback.
I've rebased this patch and will register it in the next commitfest
shortly.
--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
Attachments:
v2-0001-Support-COPY-TO-callback-functions.patch (text/x-diff; charset=us-ascii)
From a1c0704a094f849a587c7332a547c89581f4f7e6 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandbossart@gmail.com>
Date: Tue, 2 Aug 2022 16:15:01 -0700
Subject: [PATCH v2 1/1] Support COPY TO callback functions.
---
src/backend/commands/copy.c | 2 +-
src/backend/commands/copyto.c | 18 +++++++++++++++---
src/include/commands/copy.h | 3 ++-
src/tools/pgindent/typedefs.list | 1 +
4 files changed, 19 insertions(+), 5 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3ac731803b..f714c5e22e 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -304,7 +304,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
cstate = BeginCopyTo(pstate, rel, query, relid,
stmt->filename, stmt->is_program,
- stmt->attlist, stmt->options);
+ NULL, stmt->attlist, stmt->options);
*processed = DoCopyTo(cstate); /* copy from database to file */
EndCopyTo(cstate);
}
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index fca29a9a10..a7b8ec030d 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -51,6 +51,7 @@ typedef enum CopyDest
{
COPY_FILE, /* to file (or a piped program) */
COPY_FRONTEND, /* to frontend */
+ COPY_CALLBACK, /* to callback function */
} CopyDest;
/*
@@ -85,6 +86,7 @@ typedef struct CopyToStateData
List *attnumlist; /* integer list of attnums to copy */
char *filename; /* filename, or NULL for STDOUT */
bool is_program; /* is 'filename' a program to popen? */
+ copy_data_dest_cb data_dest_cb; /* function for writing data */
CopyFormatOptions opts;
Node *whereClause; /* WHERE condition (or NULL) */
@@ -247,6 +249,9 @@ CopySendEndOfRow(CopyToState cstate)
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
+ case COPY_CALLBACK:
+ cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
+ break;
}
/* Update the progress */
@@ -344,11 +349,12 @@ BeginCopyTo(ParseState *pstate,
Oid queryRelId,
const char *filename,
bool is_program,
+ copy_data_dest_cb data_dest_cb,
List *attnamelist,
List *options)
{
CopyToState cstate;
- bool pipe = (filename == NULL);
+ bool pipe = (filename == NULL && data_dest_cb == NULL);
TupleDesc tupDesc;
int num_phys_attrs;
MemoryContext oldcontext;
@@ -656,7 +662,13 @@ BeginCopyTo(ParseState *pstate,
cstate->copy_dest = COPY_FILE; /* default */
- if (pipe)
+ if (data_dest_cb)
+ {
+ progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
+ cstate->copy_dest = COPY_CALLBACK;
+ cstate->data_dest_cb = data_dest_cb;
+ }
+ else if (pipe)
{
progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
@@ -769,7 +781,7 @@ EndCopyTo(CopyToState cstate)
uint64
DoCopyTo(CopyToState cstate)
{
- bool pipe = (cstate->filename == NULL);
+ bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
bool fe_copy = (pipe && whereToSendOutput == DestRemote);
TupleDesc tupDesc;
int num_phys_attrs;
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index cb0096aeb6..76b9d5c23f 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -66,6 +66,7 @@ typedef struct CopyFromStateData *CopyFromState;
typedef struct CopyToStateData *CopyToState;
typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+typedef void (*copy_data_dest_cb) (void *data, int len);
extern void DoCopy(ParseState *state, const CopyStmt *stmt,
int stmt_location, int stmt_len,
@@ -91,7 +92,7 @@ extern DestReceiver *CreateCopyDestReceiver(void);
*/
extern CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
Oid queryRelId, const char *filename, bool is_program,
- List *attnamelist, List *options);
+ copy_data_dest_cb data_dest_cb, List *attnamelist, List *options);
extern void EndCopyTo(CopyToState cstate);
extern uint64 DoCopyTo(CopyToState cstate);
extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 35c9f1efce..83e5270034 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3189,6 +3189,7 @@ compare_context
config_var_value
contain_aggs_of_level_context
convert_testexpr_context
+copy_data_dest_cb
copy_data_source_cb
core_YYSTYPE
core_yy_extra_type
--
2.25.1
On Tue, Aug 02, 2022 at 04:49:19PM -0700, Nathan Bossart wrote:
> I've rebased this patch and will register it in the next commitfest
> shortly.
Perhaps there should be a module in src/test/modules/ to provide a
short, still useful, example of what this could achieve?
--
Michael
On Fri, Oct 07, 2022 at 03:49:31PM +0900, Michael Paquier wrote:
> Perhaps there should be a module in src/test/modules/ to provide a
> short, still useful, example of what this could achieve?
Here is an attempt at adding such a test module.
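The lengths reported in the module's expected output (5, 8 and 11) follow from COPY's default text format, assuming the gaps shown in the NOTICE lines are the default tab delimiter: columns are joined by single tabs, and the callback branch adds no newline. The hypothetical helper format_text_row() below reproduces that layout for integer columns only; it is a sketch of the arithmetic, not how copyto.c actually builds rows.

```c
#include <stdio.h>

/*
 * Format one row of integers as COPY's text format would: columns separated
 * by a tab, no trailing newline.  Returns the row length, or -1 if the
 * buffer is too small.
 */
static int
format_text_row(const int *vals, int ncols, char *buf, size_t bufsize)
{
	size_t		off = 0;

	buf[0] = '\0';
	for (int i = 0; i < ncols; i++)
	{
		int			n = snprintf(buf + off, bufsize - off, "%s%d",
								 i > 0 ? "\t" : "", vals[i]);

		if (n < 0 || (size_t) n >= bufsize - off)
			return -1;			/* output would have been truncated */
		off += (size_t) n;
	}
	return (int) off;
}
```

For the rows inserted by the test, "1\t2\t3" is 5 bytes, "12\t34\t56" is 8 and "123\t456\t789" is 11, matching the lengths in the NOTICE lines.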
--
Nathan Bossart
Attachments:
v3-0001-Support-COPY-TO-callback-functions.patch (text/x-diff; charset=us-ascii)
From 4e60fce09b031258ace825bd540847ebffd7959d Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandbossart@gmail.com>
Date: Tue, 2 Aug 2022 16:15:01 -0700
Subject: [PATCH v3 1/1] Support COPY TO callback functions.
---
src/backend/commands/copy.c | 2 +-
src/backend/commands/copyto.c | 18 +++++--
src/include/commands/copy.h | 3 +-
src/test/modules/Makefile | 1 +
src/test/modules/meson.build | 1 +
.../modules/test_copy_callbacks/.gitignore | 4 ++
src/test/modules/test_copy_callbacks/Makefile | 23 +++++++++
.../expected/test_copy_callbacks.out | 12 +++++
.../modules/test_copy_callbacks/meson.build | 34 +++++++++++++
.../sql/test_copy_callbacks.sql | 4 ++
.../test_copy_callbacks--1.0.sql | 8 +++
.../test_copy_callbacks/test_copy_callbacks.c | 50 +++++++++++++++++++
.../test_copy_callbacks.control | 4 ++
src/tools/pgindent/typedefs.list | 1 +
14 files changed, 160 insertions(+), 5 deletions(-)
create mode 100644 src/test/modules/test_copy_callbacks/.gitignore
create mode 100644 src/test/modules/test_copy_callbacks/Makefile
create mode 100644 src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out
create mode 100644 src/test/modules/test_copy_callbacks/meson.build
create mode 100644 src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql
create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql
create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks.c
create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks.control
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 49924e476a..db4c9dbc23 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -310,7 +310,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
cstate = BeginCopyTo(pstate, rel, query, relid,
stmt->filename, stmt->is_program,
- stmt->attlist, stmt->options);
+ NULL, stmt->attlist, stmt->options);
*processed = DoCopyTo(cstate); /* copy from database to file */
EndCopyTo(cstate);
}
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index fca29a9a10..a7b8ec030d 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -51,6 +51,7 @@ typedef enum CopyDest
{
COPY_FILE, /* to file (or a piped program) */
COPY_FRONTEND, /* to frontend */
+ COPY_CALLBACK, /* to callback function */
} CopyDest;
/*
@@ -85,6 +86,7 @@ typedef struct CopyToStateData
List *attnumlist; /* integer list of attnums to copy */
char *filename; /* filename, or NULL for STDOUT */
bool is_program; /* is 'filename' a program to popen? */
+ copy_data_dest_cb data_dest_cb; /* function for writing data */
CopyFormatOptions opts;
Node *whereClause; /* WHERE condition (or NULL) */
@@ -247,6 +249,9 @@ CopySendEndOfRow(CopyToState cstate)
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
+ case COPY_CALLBACK:
+ cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
+ break;
}
/* Update the progress */
@@ -344,11 +349,12 @@ BeginCopyTo(ParseState *pstate,
Oid queryRelId,
const char *filename,
bool is_program,
+ copy_data_dest_cb data_dest_cb,
List *attnamelist,
List *options)
{
CopyToState cstate;
- bool pipe = (filename == NULL);
+ bool pipe = (filename == NULL && data_dest_cb == NULL);
TupleDesc tupDesc;
int num_phys_attrs;
MemoryContext oldcontext;
@@ -656,7 +662,13 @@ BeginCopyTo(ParseState *pstate,
cstate->copy_dest = COPY_FILE; /* default */
- if (pipe)
+ if (data_dest_cb)
+ {
+ progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
+ cstate->copy_dest = COPY_CALLBACK;
+ cstate->data_dest_cb = data_dest_cb;
+ }
+ else if (pipe)
{
progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
@@ -769,7 +781,7 @@ EndCopyTo(CopyToState cstate)
uint64
DoCopyTo(CopyToState cstate)
{
- bool pipe = (cstate->filename == NULL);
+ bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
bool fe_copy = (pipe && whereToSendOutput == DestRemote);
TupleDesc tupDesc;
int num_phys_attrs;
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 3f6677b132..b77b935005 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -66,6 +66,7 @@ typedef struct CopyFromStateData *CopyFromState;
typedef struct CopyToStateData *CopyToState;
typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+typedef void (*copy_data_dest_cb) (void *data, int len);
extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
int stmt_location, int stmt_len,
@@ -91,7 +92,7 @@ extern DestReceiver *CreateCopyDestReceiver(void);
*/
extern CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *raw_query,
Oid queryRelId, const char *filename, bool is_program,
- List *attnamelist, List *options);
+ copy_data_dest_cb data_dest_cb, List *attnamelist, List *options);
extern void EndCopyTo(CopyToState cstate);
extern uint64 DoCopyTo(CopyToState cstate);
extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 6c31c8707c..7b3f292965 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -15,6 +15,7 @@ SUBDIRS = \
snapshot_too_old \
spgist_name_ops \
test_bloomfilter \
+ test_copy_callbacks \
test_ddl_deparse \
test_extensions \
test_ginpostinglist \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index a80e6e2ce2..c2e5f5ffd5 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -9,6 +9,7 @@ subdir('snapshot_too_old')
subdir('spgist_name_ops')
subdir('ssl_passphrase_callback')
subdir('test_bloomfilter')
+subdir('test_copy_callbacks')
subdir('test_ddl_deparse')
subdir('test_extensions')
subdir('test_ginpostinglist')
diff --git a/src/test/modules/test_copy_callbacks/.gitignore b/src/test/modules/test_copy_callbacks/.gitignore
new file mode 100644
index 0000000000..5dcb3ff972
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_copy_callbacks/Makefile b/src/test/modules/test_copy_callbacks/Makefile
new file mode 100644
index 0000000000..6b0a0efc37
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/test_copy_callbacks/Makefile
+
+MODULE_big = test_copy_callbacks
+OBJS = \
+ $(WIN32RES) \
+ test_copy_callbacks.o
+PGFILEDESC = "test_copy_callbacks - example use of COPY callbacks"
+
+EXTENSION = test_copy_callbacks
+DATA = test_copy_callbacks--1.0.sql
+
+REGRESS = test_copy_callbacks
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_copy_callbacks
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out b/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out
new file mode 100644
index 0000000000..b98e974e17
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out
@@ -0,0 +1,12 @@
+CREATE EXTENSION test_copy_callbacks;
+CREATE TABLE public.test (a INT, b INT, c INT);
+INSERT INTO TEST VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+SELECT test_copy_to_callback();
+NOTICE: COPY TO callback called with data "1 2 3" and length 5
+NOTICE: COPY TO callback called with data "12 34 56" and length 8
+NOTICE: COPY TO callback called with data "123 456 789" and length 11
+ test_copy_to_callback
+-----------------------
+
+(1 row)
+
diff --git a/src/test/modules/test_copy_callbacks/meson.build b/src/test/modules/test_copy_callbacks/meson.build
new file mode 100644
index 0000000000..0f1ec47951
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/meson.build
@@ -0,0 +1,34 @@
+# FIXME: prevent install during main install, but not during test :/
+
+test_copy_callbacks_sources = files(
+ 'test_copy_callbacks.c',
+)
+
+if host_system == 'windows'
+ test_copy_callbacks_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+ '--NAME', 'test_copy_callbacks',
+ '--FILEDESC', 'test_copy_callbacks - example use of COPY callbacks',])
+endif
+
+test_copy_callbacks = shared_module('test_copy_callbacks',
+ test_copy_callbacks_sources,
+ kwargs: pg_mod_args,
+)
+testprep_targets += test_copy_callbacks
+
+install_data(
+ 'test_copy_callbacks.control',
+ 'test_copy_callbacks--1.0.sql',
+ kwargs: contrib_data_args,
+)
+
+tests += {
+ 'name': 'test_copy_callbacks',
+ 'sd': meson.current_source_dir(),
+ 'bd': meson.current_build_dir(),
+ 'regress': {
+ 'sql': [
+ 'test_copy_callbacks',
+ ],
+ },
+}
diff --git a/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql b/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql
new file mode 100644
index 0000000000..8314f59ec2
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql
@@ -0,0 +1,4 @@
+CREATE EXTENSION test_copy_callbacks;
+CREATE TABLE public.test (a INT, b INT, c INT);
+INSERT INTO TEST VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+SELECT test_copy_to_callback();
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql b/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql
new file mode 100644
index 0000000000..c589b19920
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql
@@ -0,0 +1,8 @@
+/* src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_copy_callbacks" to load this file. \quit
+
+CREATE FUNCTION test_copy_to_callback()
+ RETURNS pg_catalog.void
+ AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks.c b/src/test/modules/test_copy_callbacks/test_copy_callbacks.c
new file mode 100644
index 0000000000..b704c96224
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks.c
@@ -0,0 +1,50 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_copy_callbacks.c
+ * Code for testing COPY callbacks.
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/test/modules/test_copy_callbacks/test_copy_callbacks.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/table.h"
+#include "commands/copy.h"
+#include "fmgr.h"
+#include "nodes/makefuncs.h"
+#include "utils/rel.h"
+
+PG_MODULE_MAGIC;
+
+static void
+to_cb(void *data, int len)
+{
+ ereport(NOTICE,
+ (errmsg("COPY TO callback called with data \"%s\" and length %d",
+ (char *) data, len)));
+}
+
+PG_FUNCTION_INFO_V1(test_copy_to_callback);
+Datum
+test_copy_to_callback(PG_FUNCTION_ARGS)
+{
+ Relation rel = table_openrv(makeRangeVar("public", "test", -1),
+ AccessShareLock);
+ List *attlist = list_make3(makeString("a"), makeString("b"),
+ makeString("c"));
+ CopyToState cstate;
+
+ cstate = BeginCopyTo(NULL, rel, NULL, RelationGetRelid(rel), NULL, false,
+ to_cb, attlist, NIL);
+ (void) DoCopyTo(cstate);
+ EndCopyTo(cstate);
+
+ table_close(rel, AccessShareLock);
+
+ PG_RETURN_VOID();
+}
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks.control b/src/test/modules/test_copy_callbacks/test_copy_callbacks.control
new file mode 100644
index 0000000000..b7ce3f12ff
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks.control
@@ -0,0 +1,4 @@
+comment = 'Test code for COPY callbacks'
+default_version = '1.0'
+module_pathname = '$libdir/test_copy_callbacks'
+relocatable = true
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 97c9bc1861..d9b839c979 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3177,6 +3177,7 @@ compare_context
config_var_value
contain_aggs_of_level_context
convert_testexpr_context
+copy_data_dest_cb
copy_data_source_cb
core_YYSTYPE
core_yy_extra_type
--
2.25.1
On Fri, Oct 07, 2022 at 02:48:24PM -0700, Nathan Bossart wrote:
> Here is an attempt at adding such a test module.
Using an ereport(NOTICE) to show the data reported in the callback is
fine by me. How about making the module a bit more modular by
passing a regclass as an argument and building the argument list from
it? You may want to hold the AccessShareLock on the relation until
the end of the transaction in this example.
--
Michael
On Sat, Oct 08, 2022 at 02:11:38PM +0900, Michael Paquier wrote:
> Using an ereport(NOTICE) to show the data reported in the callback is
> fine by me. How about making the module a bit more modular by
> passing a regclass as an argument and building the argument list from
> it? You may want to hold the AccessShareLock on the relation until
> the end of the transaction in this example.
Yeah, that makes more sense. It actually simplifies things a bit, too.
--
Nathan Bossart
Attachments:
v4-0001-Support-COPY-TO-callback-functions.patch (text/x-diff; charset=us-ascii)
From 0b45607988e28b405c7795de0c7f82f51d4662e5 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandbossart@gmail.com>
Date: Tue, 2 Aug 2022 16:15:01 -0700
Subject: [PATCH v4 1/1] Support COPY TO callback functions.
---
src/backend/commands/copy.c | 2 +-
src/backend/commands/copyto.c | 18 +++++--
src/include/commands/copy.h | 3 +-
src/test/modules/Makefile | 1 +
src/test/modules/meson.build | 1 +
.../modules/test_copy_callbacks/.gitignore | 4 ++
src/test/modules/test_copy_callbacks/Makefile | 23 +++++++++
.../expected/test_copy_callbacks.out | 12 +++++
.../modules/test_copy_callbacks/meson.build | 34 ++++++++++++++
.../sql/test_copy_callbacks.sql | 4 ++
.../test_copy_callbacks--1.0.sql | 8 ++++
.../test_copy_callbacks/test_copy_callbacks.c | 47 +++++++++++++++++++
.../test_copy_callbacks.control | 4 ++
src/tools/pgindent/typedefs.list | 1 +
14 files changed, 157 insertions(+), 5 deletions(-)
create mode 100644 src/test/modules/test_copy_callbacks/.gitignore
create mode 100644 src/test/modules/test_copy_callbacks/Makefile
create mode 100644 src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out
create mode 100644 src/test/modules/test_copy_callbacks/meson.build
create mode 100644 src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql
create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql
create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks.c
create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks.control
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 49924e476a..db4c9dbc23 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -310,7 +310,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
cstate = BeginCopyTo(pstate, rel, query, relid,
stmt->filename, stmt->is_program,
- stmt->attlist, stmt->options);
+ NULL, stmt->attlist, stmt->options);
*processed = DoCopyTo(cstate); /* copy from database to file */
EndCopyTo(cstate);
}
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index fca29a9a10..a7b8ec030d 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -51,6 +51,7 @@ typedef enum CopyDest
{
COPY_FILE, /* to file (or a piped program) */
COPY_FRONTEND, /* to frontend */
+ COPY_CALLBACK, /* to callback function */
} CopyDest;
/*
@@ -85,6 +86,7 @@ typedef struct CopyToStateData
List *attnumlist; /* integer list of attnums to copy */
char *filename; /* filename, or NULL for STDOUT */
bool is_program; /* is 'filename' a program to popen? */
+ copy_data_dest_cb data_dest_cb; /* function for writing data */
CopyFormatOptions opts;
Node *whereClause; /* WHERE condition (or NULL) */
@@ -247,6 +249,9 @@ CopySendEndOfRow(CopyToState cstate)
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
+ case COPY_CALLBACK:
+ cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
+ break;
}
/* Update the progress */
@@ -344,11 +349,12 @@ BeginCopyTo(ParseState *pstate,
Oid queryRelId,
const char *filename,
bool is_program,
+ copy_data_dest_cb data_dest_cb,
List *attnamelist,
List *options)
{
CopyToState cstate;
- bool pipe = (filename == NULL);
+ bool pipe = (filename == NULL && data_dest_cb == NULL);
TupleDesc tupDesc;
int num_phys_attrs;
MemoryContext oldcontext;
@@ -656,7 +662,13 @@ BeginCopyTo(ParseState *pstate,
cstate->copy_dest = COPY_FILE; /* default */
- if (pipe)
+ if (data_dest_cb)
+ {
+ progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
+ cstate->copy_dest = COPY_CALLBACK;
+ cstate->data_dest_cb = data_dest_cb;
+ }
+ else if (pipe)
{
progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
@@ -769,7 +781,7 @@ EndCopyTo(CopyToState cstate)
uint64
DoCopyTo(CopyToState cstate)
{
- bool pipe = (cstate->filename == NULL);
+ bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
bool fe_copy = (pipe && whereToSendOutput == DestRemote);
TupleDesc tupDesc;
int num_phys_attrs;
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 3f6677b132..b77b935005 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -66,6 +66,7 @@ typedef struct CopyFromStateData *CopyFromState;
typedef struct CopyToStateData *CopyToState;
typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+typedef void (*copy_data_dest_cb) (void *data, int len);
extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
int stmt_location, int stmt_len,
@@ -91,7 +92,7 @@ extern DestReceiver *CreateCopyDestReceiver(void);
*/
extern CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *raw_query,
Oid queryRelId, const char *filename, bool is_program,
- List *attnamelist, List *options);
+ copy_data_dest_cb data_dest_cb, List *attnamelist, List *options);
extern void EndCopyTo(CopyToState cstate);
extern uint64 DoCopyTo(CopyToState cstate);
extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 6c31c8707c..7b3f292965 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -15,6 +15,7 @@ SUBDIRS = \
snapshot_too_old \
spgist_name_ops \
test_bloomfilter \
+ test_copy_callbacks \
test_ddl_deparse \
test_extensions \
test_ginpostinglist \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index a80e6e2ce2..c2e5f5ffd5 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -9,6 +9,7 @@ subdir('snapshot_too_old')
subdir('spgist_name_ops')
subdir('ssl_passphrase_callback')
subdir('test_bloomfilter')
+subdir('test_copy_callbacks')
subdir('test_ddl_deparse')
subdir('test_extensions')
subdir('test_ginpostinglist')
diff --git a/src/test/modules/test_copy_callbacks/.gitignore b/src/test/modules/test_copy_callbacks/.gitignore
new file mode 100644
index 0000000000..5dcb3ff972
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_copy_callbacks/Makefile b/src/test/modules/test_copy_callbacks/Makefile
new file mode 100644
index 0000000000..6b0a0efc37
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/test_copy_callbacks/Makefile
+
+MODULE_big = test_copy_callbacks
+OBJS = \
+ $(WIN32RES) \
+ test_copy_callbacks.o
+PGFILEDESC = "test_copy_callbacks - example use of COPY callbacks"
+
+EXTENSION = test_copy_callbacks
+DATA = test_copy_callbacks--1.0.sql
+
+REGRESS = test_copy_callbacks
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_copy_callbacks
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out b/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out
new file mode 100644
index 0000000000..3c4c504ef8
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out
@@ -0,0 +1,12 @@
+CREATE EXTENSION test_copy_callbacks;
+CREATE TABLE public.test (a INT, b INT, c INT);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+SELECT test_copy_to_callback('public.test'::pg_catalog.regclass);
+NOTICE: COPY TO callback called with data "1 2 3" and length 5
+NOTICE: COPY TO callback called with data "12 34 56" and length 8
+NOTICE: COPY TO callback called with data "123 456 789" and length 11
+ test_copy_to_callback
+-----------------------
+
+(1 row)
+
diff --git a/src/test/modules/test_copy_callbacks/meson.build b/src/test/modules/test_copy_callbacks/meson.build
new file mode 100644
index 0000000000..0f1ec47951
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/meson.build
@@ -0,0 +1,34 @@
+# FIXME: prevent install during main install, but not during test :/
+
+test_copy_callbacks_sources = files(
+ 'test_copy_callbacks.c',
+)
+
+if host_system == 'windows'
+ test_copy_callbacks_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+ '--NAME', 'test_copy_callbacks',
+ '--FILEDESC', 'test_copy_callbacks - example use of COPY callbacks',])
+endif
+
+test_copy_callbacks = shared_module('test_copy_callbacks',
+ test_copy_callbacks_sources,
+ kwargs: pg_mod_args,
+)
+testprep_targets += test_copy_callbacks
+
+install_data(
+ 'test_copy_callbacks.control',
+ 'test_copy_callbacks--1.0.sql',
+ kwargs: contrib_data_args,
+)
+
+tests += {
+ 'name': 'test_copy_callbacks',
+ 'sd': meson.current_source_dir(),
+ 'bd': meson.current_build_dir(),
+ 'regress': {
+ 'sql': [
+ 'test_copy_callbacks',
+ ],
+ },
+}
diff --git a/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql b/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql
new file mode 100644
index 0000000000..2deffba635
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql
@@ -0,0 +1,4 @@
+CREATE EXTENSION test_copy_callbacks;
+CREATE TABLE public.test (a INT, b INT, c INT);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+SELECT test_copy_to_callback('public.test'::pg_catalog.regclass);
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql b/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql
new file mode 100644
index 0000000000..215cf3fad6
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql
@@ -0,0 +1,8 @@
+/* src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_copy_callbacks" to load this file. \quit
+
+CREATE FUNCTION test_copy_to_callback(pg_catalog.regclass)
+ RETURNS pg_catalog.void
+ AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks.c b/src/test/modules/test_copy_callbacks/test_copy_callbacks.c
new file mode 100644
index 0000000000..a18f20a5be
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks.c
@@ -0,0 +1,47 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_copy_callbacks.c
+ * Code for testing COPY callbacks.
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/test/modules/test_copy_callbacks/test_copy_callbacks.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/table.h"
+#include "commands/copy.h"
+#include "fmgr.h"
+#include "nodes/makefuncs.h"
+#include "utils/rel.h"
+
+PG_MODULE_MAGIC;
+
+static void
+to_cb(void *data, int len)
+{
+ ereport(NOTICE,
+ (errmsg("COPY TO callback called with data \"%s\" and length %d",
+ (char *) data, len)));
+}
+
+PG_FUNCTION_INFO_V1(test_copy_to_callback);
+Datum
+test_copy_to_callback(PG_FUNCTION_ARGS)
+{
+ Relation rel = table_open(PG_GETARG_OID(0), AccessShareLock);
+ CopyToState cstate;
+
+ cstate = BeginCopyTo(NULL, rel, NULL, RelationGetRelid(rel), NULL, false,
+ to_cb, NIL, NIL);
+ (void) DoCopyTo(cstate);
+ EndCopyTo(cstate);
+
+ table_close(rel, AccessShareLock);
+
+ PG_RETURN_VOID();
+}
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks.control b/src/test/modules/test_copy_callbacks/test_copy_callbacks.control
new file mode 100644
index 0000000000..b7ce3f12ff
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks.control
@@ -0,0 +1,4 @@
+comment = 'Test code for COPY callbacks'
+default_version = '1.0'
+module_pathname = '$libdir/test_copy_callbacks'
+relocatable = true
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 97c9bc1861..d9b839c979 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3177,6 +3177,7 @@ compare_context
config_var_value
contain_aggs_of_level_context
convert_testexpr_context
+copy_data_dest_cb
copy_data_source_cb
core_YYSTYPE
core_yy_extra_type
--
2.25.1
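For readers skimming the diff, the behavioral core is small. BeginCopyTo() now checks for a callback before the pipe and file destinations, `pipe` is only true when neither a filename nor a callback is given, and CopySendEndOfRow() hands each finished row to the callback. The sketch below is a simplified standalone model of that dispatch, not PostgreSQL code. `ModelCopyState`, `model_begin()`, `model_append()`, `model_send_end_of_row()` and `counting_cb()` are hypothetical names, and the pipe case is collapsed into a single frontend destination (in copyto.c a pipe can also end up writing to stdout when the client is not remote).

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Same shape as the patch's copy_data_dest_cb typedef. */
typedef void (*model_dest_cb) (void *data, int len);

/* Simplified stand-in for CopyDest in copyto.c. */
typedef enum ModelCopyDest
{
	MODEL_COPY_FILE,			/* to file (or a piped program) */
	MODEL_COPY_FRONTEND,		/* to frontend (simplified pipe case) */
	MODEL_COPY_CALLBACK			/* to callback function */
} ModelCopyDest;

typedef struct ModelCopyState
{
	ModelCopyDest dest;
	model_dest_cb data_dest_cb;
	char		row[256];		/* plays the role of fe_msgbuf */
	int			len;
} ModelCopyState;

/* Destination choice in the same order as the patched BeginCopyTo(). */
static void
model_begin(ModelCopyState *st, const char *filename, model_dest_cb cb)
{
	int			pipe = (filename == NULL && cb == NULL);

	memset(st, 0, sizeof(*st));
	if (cb)
	{
		st->dest = MODEL_COPY_CALLBACK;
		st->data_dest_cb = cb;
	}
	else if (pipe)
		st->dest = MODEL_COPY_FRONTEND;
	else
		st->dest = MODEL_COPY_FILE;
}

/* Append already-formatted row data, roughly like CopySendString(). */
static void
model_append(ModelCopyState *st, const char *s)
{
	size_t		n = strlen(s);

	assert((size_t) st->len + n < sizeof(st->row));
	memcpy(st->row + st->len, s, n + 1);	/* keeps the buffer NUL-terminated */
	st->len += (int) n;
}

/* Row is complete: hand it to the callback, then reuse the buffer. */
static void
model_send_end_of_row(ModelCopyState *st)
{
	if (st->dest == MODEL_COPY_CALLBACK)
		st->data_dest_cb(st->row, st->len);	/* no line terminator added */
	/* file and frontend writes are omitted from this model */
	st->len = 0;
	st->row[0] = '\0';
}

static int	cb_calls;
static int	cb_last_len;

/* Example callback: just records what it was given. */
static void
counting_cb(void *data, int len)
{
	(void) data;
	cb_calls++;
	cb_last_len = len;
}
```

Checking the callback first means an extension that passes a NULL filename gets the callback destination instead of being treated as COPY TO STDOUT, which is why the `pipe` test in both BeginCopyTo() and DoCopyTo() had to change as well.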
On Sat, Oct 08, 2022 at 10:37:41AM -0700, Nathan Bossart wrote:
Yeah, that makes more sense. It actually simplifies things a bit, too.
Sorry for the noise. There was an extra #include in v4 that I've removed
in v5.
--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
Attachments:
v5-0001-Support-COPY-TO-callback-functions.patch (text/x-diff; charset=us-ascii)
From 6e80d41135b8b21f9b06e09a7e85069acc8e57a8 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandbossart@gmail.com>
Date: Tue, 2 Aug 2022 16:15:01 -0700
Subject: [PATCH v5 1/1] Support COPY TO callback functions.
---
src/backend/commands/copy.c | 2 +-
src/backend/commands/copyto.c | 18 ++++++--
src/include/commands/copy.h | 3 +-
src/test/modules/Makefile | 1 +
src/test/modules/meson.build | 1 +
.../modules/test_copy_callbacks/.gitignore | 4 ++
src/test/modules/test_copy_callbacks/Makefile | 23 ++++++++++
.../expected/test_copy_callbacks.out | 12 +++++
.../modules/test_copy_callbacks/meson.build | 34 ++++++++++++++
.../sql/test_copy_callbacks.sql | 4 ++
.../test_copy_callbacks--1.0.sql | 8 ++++
.../test_copy_callbacks/test_copy_callbacks.c | 46 +++++++++++++++++++
.../test_copy_callbacks.control | 4 ++
src/tools/pgindent/typedefs.list | 1 +
14 files changed, 156 insertions(+), 5 deletions(-)
create mode 100644 src/test/modules/test_copy_callbacks/.gitignore
create mode 100644 src/test/modules/test_copy_callbacks/Makefile
create mode 100644 src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out
create mode 100644 src/test/modules/test_copy_callbacks/meson.build
create mode 100644 src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql
create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql
create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks.c
create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks.control
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 49924e476a..db4c9dbc23 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -310,7 +310,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
cstate = BeginCopyTo(pstate, rel, query, relid,
stmt->filename, stmt->is_program,
- stmt->attlist, stmt->options);
+ NULL, stmt->attlist, stmt->options);
*processed = DoCopyTo(cstate); /* copy from database to file */
EndCopyTo(cstate);
}
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index fca29a9a10..a7b8ec030d 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -51,6 +51,7 @@ typedef enum CopyDest
{
COPY_FILE, /* to file (or a piped program) */
COPY_FRONTEND, /* to frontend */
+ COPY_CALLBACK, /* to callback function */
} CopyDest;
/*
@@ -85,6 +86,7 @@ typedef struct CopyToStateData
List *attnumlist; /* integer list of attnums to copy */
char *filename; /* filename, or NULL for STDOUT */
bool is_program; /* is 'filename' a program to popen? */
+ copy_data_dest_cb data_dest_cb; /* function for writing data */
CopyFormatOptions opts;
Node *whereClause; /* WHERE condition (or NULL) */
@@ -247,6 +249,9 @@ CopySendEndOfRow(CopyToState cstate)
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
+ case COPY_CALLBACK:
+ cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
+ break;
}
/* Update the progress */
@@ -344,11 +349,12 @@ BeginCopyTo(ParseState *pstate,
Oid queryRelId,
const char *filename,
bool is_program,
+ copy_data_dest_cb data_dest_cb,
List *attnamelist,
List *options)
{
CopyToState cstate;
- bool pipe = (filename == NULL);
+ bool pipe = (filename == NULL && data_dest_cb == NULL);
TupleDesc tupDesc;
int num_phys_attrs;
MemoryContext oldcontext;
@@ -656,7 +662,13 @@ BeginCopyTo(ParseState *pstate,
cstate->copy_dest = COPY_FILE; /* default */
- if (pipe)
+ if (data_dest_cb)
+ {
+ progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
+ cstate->copy_dest = COPY_CALLBACK;
+ cstate->data_dest_cb = data_dest_cb;
+ }
+ else if (pipe)
{
progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
@@ -769,7 +781,7 @@ EndCopyTo(CopyToState cstate)
uint64
DoCopyTo(CopyToState cstate)
{
- bool pipe = (cstate->filename == NULL);
+ bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
bool fe_copy = (pipe && whereToSendOutput == DestRemote);
TupleDesc tupDesc;
int num_phys_attrs;
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 3f6677b132..b77b935005 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -66,6 +66,7 @@ typedef struct CopyFromStateData *CopyFromState;
typedef struct CopyToStateData *CopyToState;
typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+typedef void (*copy_data_dest_cb) (void *data, int len);
extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
int stmt_location, int stmt_len,
@@ -91,7 +92,7 @@ extern DestReceiver *CreateCopyDestReceiver(void);
*/
extern CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *raw_query,
Oid queryRelId, const char *filename, bool is_program,
- List *attnamelist, List *options);
+ copy_data_dest_cb data_dest_cb, List *attnamelist, List *options);
extern void EndCopyTo(CopyToState cstate);
extern uint64 DoCopyTo(CopyToState cstate);
extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 6c31c8707c..7b3f292965 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -15,6 +15,7 @@ SUBDIRS = \
snapshot_too_old \
spgist_name_ops \
test_bloomfilter \
+ test_copy_callbacks \
test_ddl_deparse \
test_extensions \
test_ginpostinglist \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index a80e6e2ce2..c2e5f5ffd5 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -9,6 +9,7 @@ subdir('snapshot_too_old')
subdir('spgist_name_ops')
subdir('ssl_passphrase_callback')
subdir('test_bloomfilter')
+subdir('test_copy_callbacks')
subdir('test_ddl_deparse')
subdir('test_extensions')
subdir('test_ginpostinglist')
diff --git a/src/test/modules/test_copy_callbacks/.gitignore b/src/test/modules/test_copy_callbacks/.gitignore
new file mode 100644
index 0000000000..5dcb3ff972
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_copy_callbacks/Makefile b/src/test/modules/test_copy_callbacks/Makefile
new file mode 100644
index 0000000000..6b0a0efc37
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/test_copy_callbacks/Makefile
+
+MODULE_big = test_copy_callbacks
+OBJS = \
+ $(WIN32RES) \
+ test_copy_callbacks.o
+PGFILEDESC = "test_copy_callbacks - example use of COPY callbacks"
+
+EXTENSION = test_copy_callbacks
+DATA = test_copy_callbacks--1.0.sql
+
+REGRESS = test_copy_callbacks
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_copy_callbacks
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out b/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out
new file mode 100644
index 0000000000..3c4c504ef8
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out
@@ -0,0 +1,12 @@
+CREATE EXTENSION test_copy_callbacks;
+CREATE TABLE public.test (a INT, b INT, c INT);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+SELECT test_copy_to_callback('public.test'::pg_catalog.regclass);
+NOTICE: COPY TO callback called with data "1 2 3" and length 5
+NOTICE: COPY TO callback called with data "12 34 56" and length 8
+NOTICE: COPY TO callback called with data "123 456 789" and length 11
+ test_copy_to_callback
+-----------------------
+
+(1 row)
+
diff --git a/src/test/modules/test_copy_callbacks/meson.build b/src/test/modules/test_copy_callbacks/meson.build
new file mode 100644
index 0000000000..0f1ec47951
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/meson.build
@@ -0,0 +1,34 @@
+# FIXME: prevent install during main install, but not during test :/
+
+test_copy_callbacks_sources = files(
+ 'test_copy_callbacks.c',
+)
+
+if host_system == 'windows'
+ test_copy_callbacks_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+ '--NAME', 'test_copy_callbacks',
+ '--FILEDESC', 'test_copy_callbacks - example use of COPY callbacks',])
+endif
+
+test_copy_callbacks = shared_module('test_copy_callbacks',
+ test_copy_callbacks_sources,
+ kwargs: pg_mod_args,
+)
+testprep_targets += test_copy_callbacks
+
+install_data(
+ 'test_copy_callbacks.control',
+ 'test_copy_callbacks--1.0.sql',
+ kwargs: contrib_data_args,
+)
+
+tests += {
+ 'name': 'test_copy_callbacks',
+ 'sd': meson.current_source_dir(),
+ 'bd': meson.current_build_dir(),
+ 'regress': {
+ 'sql': [
+ 'test_copy_callbacks',
+ ],
+ },
+}
diff --git a/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql b/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql
new file mode 100644
index 0000000000..2deffba635
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql
@@ -0,0 +1,4 @@
+CREATE EXTENSION test_copy_callbacks;
+CREATE TABLE public.test (a INT, b INT, c INT);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+SELECT test_copy_to_callback('public.test'::pg_catalog.regclass);
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql b/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql
new file mode 100644
index 0000000000..215cf3fad6
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql
@@ -0,0 +1,8 @@
+/* src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_copy_callbacks" to load this file. \quit
+
+CREATE FUNCTION test_copy_to_callback(pg_catalog.regclass)
+ RETURNS pg_catalog.void
+ AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks.c b/src/test/modules/test_copy_callbacks/test_copy_callbacks.c
new file mode 100644
index 0000000000..54de3fc5ab
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks.c
@@ -0,0 +1,46 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_copy_callbacks.c
+ * Code for testing COPY callbacks.
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/test/modules/test_copy_callbacks/test_copy_callbacks.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/table.h"
+#include "commands/copy.h"
+#include "fmgr.h"
+#include "utils/rel.h"
+
+PG_MODULE_MAGIC;
+
+static void
+to_cb(void *data, int len)
+{
+ ereport(NOTICE,
+ (errmsg("COPY TO callback called with data \"%s\" and length %d",
+ (char *) data, len)));
+}
+
+PG_FUNCTION_INFO_V1(test_copy_to_callback);
+Datum
+test_copy_to_callback(PG_FUNCTION_ARGS)
+{
+ Relation rel = table_open(PG_GETARG_OID(0), AccessShareLock);
+ CopyToState cstate;
+
+ cstate = BeginCopyTo(NULL, rel, NULL, RelationGetRelid(rel), NULL, false,
+ to_cb, NIL, NIL);
+ (void) DoCopyTo(cstate);
+ EndCopyTo(cstate);
+
+ table_close(rel, AccessShareLock);
+
+ PG_RETURN_VOID();
+}
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks.control b/src/test/modules/test_copy_callbacks/test_copy_callbacks.control
new file mode 100644
index 0000000000..b7ce3f12ff
--- /dev/null
+++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks.control
@@ -0,0 +1,4 @@
+comment = 'Test code for COPY callbacks'
+default_version = '1.0'
+module_pathname = '$libdir/test_copy_callbacks'
+relocatable = true
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 97c9bc1861..d9b839c979 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3177,6 +3177,7 @@ compare_context
config_var_value
contain_aggs_of_level_context
convert_testexpr_context
+copy_data_dest_cb
copy_data_source_cb
core_YYSTYPE
core_yy_extra_type
--
2.25.1
On Sun, Oct 9, 2022 at 2:44 AM Nathan Bossart <nathandbossart@gmail.com> wrote:
Sorry for the noise. There was an extra #include in v4 that I've removed
in v5.
IIUC, the COPY TO callback helps move a table's data out of the postgres
server. Just wondering, how is it different from existing solutions
like COPY TO ... PROGRAM/FILE, logical replication, pg_dump etc. that
can move a table's data out? I understand that the COPY FROM callback
was needed for logical replication (7c4f52409). Mentioning a concrete
use-case would help here.
I'm not quite sure we need a separate module just to show how to
use this new callback. I strongly feel that it's not necessary. It
creates unnecessary extra code (the actual code is 25 LOC with the v1
patch but 150 LOC with the v5 patch) and can cause a maintenance burden.
These callback APIs are simple enough to understand for those who know
BeginCopyTo() or BeginCopyFrom(), and especially for those who know how
to write extensions. These are not APIs that an end-user uses. It would
be best to document both the COPY FROM and COPY TO callbacks,
perhaps with pseudocode specifying just the essence [1], and their
possible usages somewhere here:
https://www.postgresql.org/docs/devel/sql-copy.html.
The order of the NOTICE messages below isn't guaranteed and can change
depending on the platform. Previously, we've had to suppress such
messages in the test output (6adc5376d).
+SELECT test_copy_to_callback('public.test'::pg_catalog.regclass);
+NOTICE: COPY TO callback called with data "1 2 3" and length 5
+NOTICE: COPY TO callback called with data "12 34 56" and length 8
+NOTICE: COPY TO callback called with data "123 456 789" and length 11
+ test_copy_to_callback
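If the ordering concern is real, one option is for the callback to stash the rows and report them in sorted order, so the expected output no longer depends on scan order. The standalone sketch below (plain C, no backend headers; `collect_row_cb()`, `sort_saved_rows()`, `free_saved_rows()` and the fixed `MAX_SAVED_ROWS` limit are illustrative assumptions) also shows two properties of the callback contract that seem worth documenting: copy_data_dest_cb has no context argument, so accumulated state has to live at file scope, and the data should be copied because, as far as I can tell, CopySendEndOfRow() resets and reuses its buffer after each call.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define MAX_SAVED_ROWS 16		/* illustrative limit for the sketch */

/*
 * File-scope state: copy_data_dest_cb has no context argument, so anything
 * the callback accumulates has to live outside it.
 */
static char *saved_rows[MAX_SAVED_ROWS];
static int	nsaved;

/* Same shape as the patch's typedef: void (*)(void *data, int len). */
static void
collect_row_cb(void *data, int len)
{
	char	   *copy;

	assert(nsaved < MAX_SAVED_ROWS);
	/* The row buffer is reused after we return, so keep our own copy. */
	copy = malloc((size_t) len + 1);
	assert(copy != NULL);
	memcpy(copy, data, (size_t) len);
	copy[len] = '\0';
	saved_rows[nsaved++] = copy;
}

static int
cmp_rows(const void *a, const void *b)
{
	return strcmp(*(char *const *) a, *(char *const *) b);
}

/* Sort the collected rows so reported output does not depend on scan order. */
static void
sort_saved_rows(void)
{
	qsort(saved_rows, (size_t) nsaved, sizeof(char *), cmp_rows);
}

static void
free_saved_rows(void)
{
	for (int i = 0; i < nsaved; i++)
		free(saved_rows[i]);
	nsaved = 0;
}
```

In a test module, the rows would then be emitted (for example via NOTICE) from the sorted array once DoCopyTo() has returned, rather than from inside the callback.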
[1]
+ Relation rel = table_open(PG_GETARG_OID(0), AccessShareLock);
+ CopyToState cstate;
+
+ cstate = BeginCopyTo(NULL, rel, NULL, RelationGetRelid(rel), NULL, NULL,
+ to_cb, NIL, NIL);
+ (void) DoCopyTo(cstate);
+ EndCopyTo(cstate);
+
+ table_close(rel, AccessShareLock);
--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
On Mon, Oct 10, 2022 at 12:41:40PM +0530, Bharath Rupireddy wrote:
IIUC, COPY TO callback helps move a table's data out of postgres
server. Just wondering, how is it different from existing solutions
like COPY TO ... PROGRAM/FILE, logical replication, pg_dump etc. that
can move a table's data out? I understand that the COPY FROM callback
was needed for logical replication 7c4f52409. Mentioning a concrete
use-case helps here.
This new callback allows the use of COPY TO's machinery in extensions. A
couple of generic use-cases are listed upthread [0], and one concrete
use-case is the aws_s3 extension [1].
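As a rough illustration of that kind of export use-case, an extension could batch the per-row callbacks into larger chunks before shipping them elsewhere. This is a hypothetical sketch, not the aws_s3 implementation: `chunking_cb()`, `flush_chunk()` and the deliberately tiny `CHUNK_SIZE` are made up for the example, and `flush_chunk()` only counts bytes where a real exporter would upload. Since the expected test output shows each call carrying a single row without a line terminator (length 5 for the first row), the callback adds its own newline.

```c
#include <assert.h>
#include <string.h>

#define CHUNK_SIZE 16			/* tiny on purpose; a real exporter would use far more */

static char chunk[CHUNK_SIZE];
static int	chunk_len;
static int	chunks_flushed;
static int	bytes_flushed;

/* Hypothetical sink: a real extension might upload the chunk somewhere. */
static void
flush_chunk(void)
{
	if (chunk_len == 0)
		return;
	chunks_flushed++;
	bytes_flushed += chunk_len;
	chunk_len = 0;
}

/* Same shape as copy_data_dest_cb: void (*)(void *data, int len). */
static void
chunking_cb(void *data, int len)
{
	/* Each call carries one row without a line terminator, so add one. */
	int			needed = len + 1;

	assert(needed <= CHUNK_SIZE);	/* this sketch does not split rows */
	if (chunk_len + needed > CHUNK_SIZE)
		flush_chunk();
	memcpy(chunk + chunk_len, data, (size_t) len);
	chunk[chunk_len + len] = '\n';
	chunk_len += needed;
}
```

The caller would also need a final `flush_chunk()` once DoCopyTo() returns, since the callback itself never learns that the last row has been sent.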
I'm not quite sure if we need a separate module to just tell how to
use this new callback. I strongly feel that it's not necessary. It
unnecessarily creates extra code (actual code is 25 LOC with v1 patch
but 150 LOC with v5 patch) and can cause maintenance burden. These
callback APIs are simple enough to understand for those who know
BeginCopyTo() or BeginCopyFrom() and especially for those who know how
to write extensions. These are not APIs that an end-user uses. The
best would be to document both COPY FROM and COPY TO callbacks,
perhaps with a pseudo code specifying just the essence [1], and their
possible usages somewhere here
https://www.postgresql.org/docs/devel/sql-copy.html.
The order of below NOTICE messages isn't guaranteed and it can change
depending on platforms. Previously, we've had to suppress such
messages in the test output 6adc5376d.
I really doubt that this small test case is going to cause anything
approaching undue maintenance burden. I think it's important to ensure
this functionality continues to work as expected long into the future.
[0]: /messages/by-id/253C21D1-FCEB-41D9-A2AF-E6517015B7D7@amazon.com
[1]: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/postgresql-s3-export.html#aws_s3.export_query_to_s3
--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
On Mon, Oct 10, 2022 at 09:38:59AM -0700, Nathan Bossart wrote:
This new callback allows the use of COPY TO's machinery in extensions. A
couple of generic use-cases are listed upthread [0], and one concrete
use-case is the aws_s3 extension [1].
FWIW, I understand that the proposal is to give easier control over
how, what and where the data is processed. COPY TO PROGRAM
provides that with exactly the same kind of interface (data input, its
length) once you have a program able to process the data piped out the
same way. However, it takes the shape of an external process that
receives the data through a pipe, hence it provides a much wider attack
surface, which is something that all cloud providers care about. The
thing is that this allows extension developers to avoid running arbitrary
commands on the backend as the OS user running the Postgres instance,
while still being able to process the data the way they want
(auditing, analytics, whatever) within the strict context of the
process running the extension code. I'd say that this is a very cheap
change to allow people to have more fun with the backend engine
(similar to the recent changes with archive libraries for
archive_command, but much less complex):
src/backend/commands/copy.c | 2 +-
src/backend/commands/copyto.c | 18 +++++++++++++++---
2 files changed, 16 insertions(+), 4 deletions(-)
(Not to mention that we've had our share of CVEs regarding COPY
PROGRAM even if it is superuser-only).
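To make the in-process point concrete, here is a hypothetical auditing callback that keeps a row count, a byte count and a digest of everything COPY TO produces, without spawning any external program. It is a standalone sketch: `audit_cb()`, `audit_reset()` and `AuditState` are invented names, and 32-bit FNV-1a is just a convenient non-cryptographic hash chosen for brevity, not anything the patch uses.

```c
#include <assert.h>
#include <stdint.h>

typedef struct AuditState
{
	uint64_t	rows;
	uint64_t	bytes;
	uint32_t	digest;			/* FNV-1a over all row bytes */
} AuditState;

/* File-scope, because copy_data_dest_cb carries no context pointer. */
static AuditState audit;

static void
audit_reset(void)
{
	audit.rows = 0;
	audit.bytes = 0;
	audit.digest = 2166136261u;	/* FNV-1a 32-bit offset basis */
}

/* Same shape as copy_data_dest_cb: void (*)(void *data, int len). */
static void
audit_cb(void *data, int len)
{
	const unsigned char *p = data;

	assert(len >= 0);
	audit.rows++;
	audit.bytes += (uint64_t) len;
	for (int i = 0; i < len; i++)
	{
		audit.digest ^= p[i];
		audit.digest *= 16777619u;	/* FNV-1a 32-bit prime */
	}
}
```

Because the digest covers the raw row bytes without separators, rows "ab","c" and "a","bc" would hash the same; mixing each row's length into the digest would avoid that.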
I really doubt that this small test case is going to cause anything
approaching undue maintenance burden. I think it's important to ensure
this functionality continues to work as expected long into the future.
I like these toy modules, as they provide test coverage while acting as a
template for new developers. I am wondering whether it should have
something for the copy from callback, actually, as it is named
"test_copy_callbacks" but I see no need to extend the module more than
necessary in the context of this thread (logical decoding uses it,
anyway).
--
Michael
On Tue, Oct 11, 2022 at 09:01:41AM +0900, Michael Paquier wrote:
I like these toy modules, they provide test coverage while acting as a
template for new developers. I am wondering whether it should have
something for the copy from callback, actually, as it is named
"test_copy_callbacks" but I see no need to extend the module more than
necessary in the context of this thread (logical decoding uses it,
anyway).
Yeah, I named it that way because I figured we might want a test for the
COPY FROM callback someday.
--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
On Wed, Sep 30, 2020 at 01:48:12PM +0500, Andrey V. Lepikhov wrote:
Your code almost exactly the same as proposed in [1] as part of 'Fast COPY
FROM' command. But it seems there are differences.[1] /messages/by-id/3d0909dc-3691-a576-208a-90986e55489f@postgrespro.ru
I have been looking at what you have here while reviewing the contents
of this thread, and it seems to me that you should basically be able
to achieve the row-level control that your patch is doing with the
callback to do the per-row processing posted here. The main
difference, though, is that you want to have more control at the
beginning and the end of the COPY TO processing which explains the
split of DoCopyTo(). To be honest, I am a bit surprised to see this much
footprint in the backend code once there are two FDW callbacks to control
the beginning and the end of the COPY TO, sacrificing a lot of the
existing symmetry between the COPY TO and COPY FROM code paths, where
there is currently strict control over the pre-row and post-row
processing, like the per-row memory context.
--
Michael
On Mon, Oct 10, 2022 at 05:06:39PM -0700, Nathan Bossart wrote:
Yeah, I named it that way because I figured we might want a test for the
COPY FROM callback someday.
Okay. So, I have reviewed the whole thing, added a description of all
the fields of BeginCopyTo() in its top comment, tweaked a few things
and added in the module an extra NOTICE with the number of processed
rows. The result seemed fine, so applied.
--
Michael
On Tue, Oct 11, 2022 at 11:52:03AM +0900, Michael Paquier wrote:
Okay. So, I have reviewed the whole thing, added a description of all
the fields of BeginCopyTo() in its top comment, tweaked a few things
and added in the module an extra NOTICE with the number of processed
rows. The result seemed fine, so applied.
Thanks!
--
Nathan Bossart
Amazon Web Services: https://aws.amazon.com