From 5c1ec351edb97c42b1161ce9717819281024fddf Mon Sep 17 00:00:00 2001
From: Bilva Sanaba
Date: Tue, 30 Jun 2020 16:08:07 -0700
Subject: [PATCH] Support COPY TO callback functions

---
 src/backend/commands/copy.c | 31 +++++++++++++++++--------------
 src/include/commands/copy.h |  7 +++++++
 2 files changed, 24 insertions(+), 14 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3e199bdfd0..2322b6d558 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -133,6 +133,7 @@ typedef struct CopyStateData
 	char	   *filename;		/* filename, or NULL for STDIN/STDOUT */
 	bool		is_program;		/* is 'filename' a program to popen? */
 	copy_data_source_cb data_source_cb; /* function for reading data */
+	copy_data_destination_cb data_destination_cb; /* function to write data */
 	bool		binary;			/* binary format? */
 	bool		freeze;			/* freeze rows on loading? */
 	bool		csv_mode;		/* Comma Separated Value format? */
@@ -356,11 +357,6 @@ static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
 						   List *options);
 static void EndCopy(CopyState cstate);
 static void ClosePipeToProgram(CopyState cstate);
-static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
-							 Oid queryRelId, const char *filename, bool is_program,
-							 List *attnamelist, List *options);
-static void EndCopyTo(CopyState cstate);
-static uint64 DoCopyTo(CopyState cstate);
 static uint64 CopyTo(CopyState cstate);
 static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
 static bool CopyReadLine(CopyState cstate);
@@ -586,7 +582,7 @@ CopySendEndOfRow(CopyState cstate)
 			(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
 			break;
 		case COPY_CALLBACK:
-			Assert(false);		/* Not yet supported. */
+			cstate->data_destination_cb(fe_msgbuf->data, fe_msgbuf->len);
 			break;
 	}
 
@@ -1075,7 +1071,8 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
 	{
 		cstate = BeginCopyTo(pstate, rel, query, relid,
 							 stmt->filename, stmt->is_program,
-							 stmt->attlist, stmt->options);
+							 stmt->attlist, stmt->options,
+							 NULL);
 		*processed = DoCopyTo(cstate);	/* copy from database to file */
 		EndCopyTo(cstate);
 	}
@@ -1817,7 +1814,7 @@ EndCopy(CopyState cstate)
 /*
  * Setup CopyState to read tuples from a table or a query for COPY TO.
  */
-static CopyState
+CopyState
 BeginCopyTo(ParseState *pstate,
 			Relation rel,
 			RawStmt *query,
@@ -1825,10 +1822,11 @@ BeginCopyTo(ParseState *pstate,
 			const char *filename,
 			bool is_program,
 			List *attnamelist,
-			List *options)
+			List *options,
+			copy_data_destination_cb data_destination_cb)
 {
 	CopyState	cstate;
-	bool		pipe = (filename == NULL);
+	bool		pipe = (filename == NULL) && (data_destination_cb == NULL);
 	MemoryContext oldcontext;
 
 	if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
@@ -1873,7 +1871,12 @@ BeginCopyTo(ParseState *pstate,
 					   options);
 	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
 
-	if (pipe)
+	if (data_destination_cb)
+	{
+		cstate->copy_dest = COPY_CALLBACK;
+		cstate->data_destination_cb = data_destination_cb;
+	}
+	else if (pipe)
 	{
 		Assert(!is_program);	/* the grammar does not allow this */
 		if (whereToSendOutput != DestRemote)
@@ -1953,10 +1956,10 @@ BeginCopyTo(ParseState *pstate,
  * This intermediate routine exists mainly to localize the effects of setjmp
  * so we don't need to plaster a lot of variables with "volatile".
  */
-static uint64
+uint64
 DoCopyTo(CopyState cstate)
 {
-	bool		pipe = (cstate->filename == NULL);
+	bool		pipe = (cstate->filename == NULL) && (cstate->data_destination_cb == NULL);
 	bool		fe_copy = (pipe && whereToSendOutput == DestRemote);
 	uint64		processed;
 
@@ -1988,7 +1991,7 @@ DoCopyTo(CopyState cstate)
 /*
  * Clean up storage and release resources for COPY TO.
  */
-static void
+void
 EndCopyTo(CopyState cstate)
 {
 	if (cstate->queryDesc != NULL)
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index c639833565..6617bacd87 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -22,6 +22,7 @@
 /* CopyStateData is private in commands/copy.c */
 typedef struct CopyStateData *CopyState;
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+typedef void (*copy_data_destination_cb) (void *data, int len);
 
 extern void DoCopy(ParseState *state, const CopyStmt *stmt,
 				   int stmt_location, int stmt_len,
@@ -41,4 +42,10 @@ extern uint64 CopyFrom(CopyState cstate);
 
 extern DestReceiver *CreateCopyDestReceiver(void);
 
+extern CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
+							 Oid queryRelId, const char *filename, bool is_program,
+							 List *attnamelist, List *options, copy_data_destination_cb data_destination_cb);
+extern uint64 DoCopyTo(CopyState cstate);
+extern void EndCopyTo(CopyState cstate);
+
 #endif							/* COPY_H */
-- 
2.22.0