Stream consistent snapshot via a logical decoding plugin as a series of INSERTs
Hello,
I'd like to propose generic functions (probably in an extension, or in core
if not possible otherwise) to facilitate streaming existing data from the
database *in the same format* that one would get if those rows were the
changes decoded by a logical decoding plugin.
The idea is to use the snapshot returned by the CREATE_REPLICATION_SLOT
command of the replication protocol to get a consistent view of the database,
then start listening for new changes on the slot.
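For context, a minimal sketch of the intended client workflow (the connection
string and snapshot name below are illustrative only):

```sql
-- On a replication connection, e.g. psql "dbname=test replication=database":
CREATE_REPLICATION_SLOT "slot1" LOGICAL "test_decoding";
-- The result set includes a snapshot_name column, e.g. '00000457-1'.

-- On a normal connection, export the existing data as of that snapshot:
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT '00000457-1';
-- ... read the existing table contents here ...
COMMIT;

-- Afterwards, consume changes from slot1 to pick up everything newer.
```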
One of the implementations of this approach is "Bottled Water":
https://github.com/confluentinc/bottledwater-pg
There, the initial export phase is implemented by a SQL-callable set-returning
function that uses SPI to run "SELECT * FROM mytable" behind the scenes and
feeds the resulting tuples through the INSERT callback of the logical decoding
plugin, which lives in the same loadable module as the SQL function.
The Bottled Water logical decoding plugin uses a binary protocol based on the
Avro data serialization library. As an experiment I added JSON output support
to it, and for that I had to re-implement the aforementioned initial-export
SRF to convert tuples to JSON instead.
Now I'd like to compare the performance impact of using JSON vs. Avro vs. the
binary format of pglogical_output, and the missing piece for that is something
that would stream the existing data in pglogical's format. Instead of
writing one more implementation of the export function, this time for
pglogical_output, I'd rather use a generic function that accepts a relation
name, a logical decoding plugin name and a set of startup options for the
plugin, then pretends that we're decoding a stream of INSERTs on a slot (no
actual slot is needed for that, but setting the transaction snapshot
beforehand is something to be done by the client).
In SQL and C pseudo-code:
CREATE FUNCTION /*pg_catalog.?*/ pg_logical_stream_relation(
relnamespace text,
relname text,
plugin_name text,
nochildren boolean DEFAULT FALSE,
VARIADIC options text[] DEFAULT '{}'::text[]
) RETURNS SETOF text
AS '...', 'pg_logical_stream_relation' LANGUAGE C VOLATILE;
CREATE FUNCTION /*pg_catalog.?*/ pg_logical_stream_relation_binary(
relnamespace text,
relname text,
plugin_name text,
nochildren boolean DEFAULT FALSE,
VARIADIC options text[] DEFAULT '{}'::text[]
) RETURNS SETOF bytea
AS '...', 'pg_logical_stream_relation_binary' LANGUAGE C VOLATILE;
-- usage:
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT 'XXXXXXXX-N';
SELECT *
FROM pg_logical_stream_relation('myschema', 'mytable', 'test_decoding',
nochildren := FALSE, ...)
Datum
pg_logical_stream_relation(PG_FUNCTION_ARGS)
{
if (SRF_IS_FIRSTCALL())
{
/* create decoding context */ /* starts the plugin up */
/* emit BEGIN */
}
/*
seq scan
=> emit series of INSERTs
*/
/* emit COMMIT */
/* free decoding context */ /* shuts the plugin down */
}
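The variadic options array would follow the same convention as the existing
pg_logical_slot_get_changes() family: a flat list of alternating option names
and values. A small sketch of that pairing rule in Python (a hypothetical
helper, mirroring the checks the server-side code would perform):

```python
def pair_plugin_options(options):
    """Turn a flat [name, value, name, value, ...] list into pairs,
    enforcing the same checks done on the server side."""
    if any(o is None for o in options):
        raise ValueError("array must not contain nulls")
    if len(options) % 2 != 0:
        raise ValueError("array must have even number of elements")
    return [(options[i], options[i + 1]) for i in range(0, len(options), 2)]

# e.g. for test_decoding:
# pair_plugin_options(['include-xids', '0', 'skip-empty-xacts', '1'])
# -> [('include-xids', '0'), ('skip-empty-xacts', '1')]
```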
What do you say?
--
Alex
On 15 January 2016 at 08:30, Shulgin, Oleksandr <
oleksandr.shulgin@zalando.de> wrote:
> I'd like to propose generic functions (probably in an extension, or in
> core if not possible otherwise) to facilitate streaming existing data from
> the database *in the same format* that one would get if these would be the
> changes decoded by a logical decoding plugin.
>
> The idea is to use a snapshot returned from CREATE_REPLICATION_SLOT
> command of the replication protocol to get a consistent snapshot of the
> database, then start listening to new changes on the slot.
It sounds like this is already possible.
--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Fri, Jan 15, 2016 at 11:08 AM, Simon Riggs <simon@2ndquadrant.com> wrote:
> On 15 January 2016 at 08:30, Shulgin, Oleksandr <
> oleksandr.shulgin@zalando.de> wrote:
>
>> I'd like to propose generic functions (probably in an extension, or in
>> core if not possible otherwise) to facilitate streaming existing data from
>> the database *in the same format* that one would get if these would be the
>> changes decoded by a logical decoding plugin.
>>
>> The idea is to use a snapshot returned from CREATE_REPLICATION_SLOT
>> command of the replication protocol to get a consistent snapshot of the
>> database, then start listening to new changes on the slot.
>
> It sounds like this is already possible.
Totally, that's how it was supposed to be used anyway. What is missing IMO
is retrieving the initial snapshot in the same format in which the later
changes will arrive.
--
Alex
On Fri, Jan 15, 2016 at 12:09 PM, Shulgin, Oleksandr <
oleksandr.shulgin@zalando.de> wrote:
> On Fri, Jan 15, 2016 at 11:08 AM, Simon Riggs <simon@2ndquadrant.com>
> wrote:
>
>> On 15 January 2016 at 08:30, Shulgin, Oleksandr <
>> oleksandr.shulgin@zalando.de> wrote:
>>
>>> I'd like to propose generic functions (probably in an extension, or in
>>> core if not possible otherwise) to facilitate streaming existing data from
>>> the database *in the same format* that one would get if these would be the
>>> changes decoded by a logical decoding plugin.
>>>
>>> The idea is to use a snapshot returned from CREATE_REPLICATION_SLOT
>>> command of the replication protocol to get a consistent snapshot of the
>>> database, then start listening to new changes on the slot.
>>
>> It sounds like this is already possible.
>
> Totally, that's how it was supposed to be used anyway. What is missing
> IMO is retrieving the initial snapshot in the same format in which the later
> changes will arrive.
POC patch attached. Findings:
1) Needs an actual slot for all the decode machinery to work (code depends
on MyReplicationSlot being set).
2) Requires a core patch.
3) Currently only supports textual output; adding binary is trivial.
Acquiring a slot means this cannot be run in parallel from multiple
backends. Any ideas on how to overcome this (except for opening multiple
slots with the same LSN)?
To obtain a consistent snapshot, the client still needs to take care of
preserving and setting the transaction snapshot properly.
--
Alex
Attachments:
0001-POC-pg_logical_slot_stream_relation.patch (text/x-patch, charset US-ASCII)
From f800b8c387eb17f4eb005b38b78585f1f165b0d3 Mon Sep 17 00:00:00 2001
From: Oleksandr Shulgin <oleksandr.shulgin@zalando.de>
Date: Fri, 15 Jan 2016 17:30:04 +0100
Subject: [PATCH] POC: pg_logical_slot_stream_relation
---
src/backend/catalog/system_views.sql | 9 +
src/backend/replication/logical/logicalfuncs.c | 337 +++++++++++++++++++++---
src/backend/replication/logical/reorderbuffer.c | 6 +-
src/include/catalog/pg_proc.h | 2 +
src/include/replication/logicalfuncs.h | 1 +
src/include/replication/reorderbuffer.h | 3 +
6 files changed, 315 insertions(+), 43 deletions(-)
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 923fe58..5431b61 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -941,6 +941,15 @@ LANGUAGE INTERNAL
VOLATILE ROWS 1000 COST 1000
AS 'pg_logical_slot_peek_binary_changes';
+CREATE OR REPLACE FUNCTION pg_logical_slot_stream_relation(
+ IN slot_name name, IN relnamespace name, IN relname name, IN nochildren bool DEFAULT FALSE,
+ VARIADIC options text[] DEFAULT '{}',
+ OUT data text)
+RETURNS SETOF TEXT
+LANGUAGE INTERNAL
+VOLATILE ROWS 1000 COST 1000
+AS 'pg_logical_slot_stream_relation';
+
CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
IN slot_name name, IN immediately_reserve boolean DEFAULT false,
OUT slot_name name, OUT xlog_position pg_lsn)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 56e47e4..bc62784 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -21,12 +21,18 @@
#include "funcapi.h"
#include "miscadmin.h"
+#include "access/htup_details.h"
#include "access/xlog_internal.h"
+#include "executor/spi.h"
+
+#include "catalog/namespace.h"
#include "catalog/pg_type.h"
#include "nodes/makefuncs.h"
+#include "lib/stringinfo.h"
+
#include "mb/pg_wchar.h"
#include "utils/array.h"
@@ -40,6 +46,7 @@
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
+#include "replication/reorderbuffer.h"
#include "storage/fd.h"
@@ -50,6 +57,11 @@ typedef struct DecodingOutputState
TupleDesc tupdesc;
bool binary_output;
int64 returned_rows;
+
+ /* for pg_logical_stream_relation */
+ Relation rel;
+ Portal cursor;
+ TupleTableSlot *tupslot;
} DecodingOutputState;
/*
@@ -270,6 +282,53 @@ logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
return count;
}
+static List *
+deconstruct_options_array(ArrayType *arr)
+{
+ Size ndim;
+ List *options = NIL;
+
+ ndim = ARR_NDIM(arr);
+ if (ndim > 1)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("array must be one-dimensional")));
+ }
+ else if (array_contains_nulls(arr))
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("array must not contain nulls")));
+ }
+ else if (ndim == 1)
+ {
+ int nelems;
+ Datum *datum_opts;
+ int i;
+
+ Assert(ARR_ELEMTYPE(arr) == TEXTOID);
+
+ deconstruct_array(arr, TEXTOID, -1, false, 'i',
+ &datum_opts, NULL, &nelems);
+
+ if (nelems % 2 != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("array must have even number of elements")));
+
+ for (i = 0; i < nelems; i += 2)
+ {
+ char *name = TextDatumGetCString(datum_opts[i]);
+ char *opt = TextDatumGetCString(datum_opts[i + 1]);
+
+ options = lappend(options, makeDefElem(name, (Node *) makeString(opt)));
+ }
+ }
+
+ return options;
+}
+
/*
* Helper function for the various SQL callable logical decoding functions.
*/
@@ -287,7 +346,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
ArrayType *arr;
- Size ndim;
List *options = NIL;
DecodingOutputState *p;
@@ -339,44 +397,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
- /* Deconstruct options array */
- ndim = ARR_NDIM(arr);
- if (ndim > 1)
- {
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("array must be one-dimensional")));
- }
- else if (array_contains_nulls(arr))
- {
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("array must not contain nulls")));
- }
- else if (ndim == 1)
- {
- int nelems;
- Datum *datum_opts;
- int i;
-
- Assert(ARR_ELEMTYPE(arr) == TEXTOID);
-
- deconstruct_array(arr, TEXTOID, -1, false, 'i',
- &datum_opts, NULL, &nelems);
-
- if (nelems % 2 != 0)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("array must have even number of elements")));
-
- for (i = 0; i < nelems; i += 2)
- {
- char *name = TextDatumGetCString(datum_opts[i]);
- char *opt = TextDatumGetCString(datum_opts[i + 1]);
-
- options = lappend(options, makeDefElem(name, (Node *) makeString(opt)));
- }
- }
+ options = deconstruct_options_array(arr);
p->tupstore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->returnMode = SFRM_Materialize;
@@ -515,3 +536,241 @@ pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
{
return pg_logical_slot_get_changes_guts(fcinfo, false, true);
}
+
+Datum
+pg_logical_slot_stream_relation(PG_FUNCTION_ARGS)
+{
+ Name name;
+ Name relnamespace;
+ Name relname;
+ bool nochildren;
+ ArrayType *arr;
+ List *options = NIL;
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+
+ LogicalDecodingContext *ctx;
+ DecodingOutputState *p;
+ ReorderBufferTXN *txn;
+ ReorderBufferChange *change;
+
+ FuncCallContext *funcctx;
+ MemoryContext oldcontext;
+ const char *relident;
+ int ret;
+ SPIPlanPtr plan;
+ StringInfoData query;
+ Oid nspoid;
+
+ HeapTuple tuple;
+ bool isnull;
+ Datum result;
+
+ oldcontext = CurrentMemoryContext;
+
+ if (SRF_IS_FIRSTCALL())
+ {
+ check_permissions();
+
+ CheckLogicalDecodingRequirements();
+
+ if (PG_ARGISNULL(0))
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("slot name must not be null")));
+ name = PG_GETARG_NAME(0);
+
+ if (PG_ARGISNULL(1))
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("relnamespace cannot be null")));
+ relnamespace = PG_GETARG_NAME(1);
+
+ if (PG_ARGISNULL(2))
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("relname cannot be null")));
+ relname = PG_GETARG_NAME(2);
+
+ if (PG_ARGISNULL(3))
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("nochildren must not be null")));
+ nochildren = PG_GETARG_BOOL(3);
+
+ if (PG_ARGISNULL(4))
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("options array must not be null")));
+ arr = PG_GETARG_ARRAYTYPE_P(4);
+
+ /* check to see if caller supports us returning a tuplestore */
+ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("set-valued function called in context that cannot accept a set")));
+ /*
+ if (!(rsinfo->allowedModes & SFRM_Materialize))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("materialize mode required, but it is not allowed in this context")));
+ */
+
+ MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
+
+ options = deconstruct_options_array(arr);
+
+ funcctx = SRF_FIRSTCALL_INIT();
+
+ /* Things allocated in this memory context will live until SRF_RETURN_DONE(). */
+ MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ ReplicationSlotAcquire(NameStr(*name));
+
+ ctx = CreateDecodingContext(InvalidXLogRecPtr,
+ options,
+ NULL /*logical_read_local_xlog_page*/,
+ LogicalOutputPrepareWrite,
+ LogicalOutputWrite);
+ funcctx->user_fctx = ctx;
+
+ /*
+ * Check whether the output plugin writes textual output if that's
+ * what we need.
+ */
+ if (/*!binary &&*/ ctx->options.output_type != OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
+ NameStr(MyReplicationSlot->data.plugin),
+ format_procedure(fcinfo->flinfo->fn_oid))));
+
+ p = palloc0(sizeof(DecodingOutputState));
+ p->binary_output = false /*binary*/;
+
+ /* Build a tuple descriptor for our result type */
+ if (get_func_result_type(3784 /* pg_logical_slot_peek_changes */,
+ NULL, &p->tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ p->tupstore = tuplestore_begin_heap(true, false, work_mem);
+
+ /* and we need a slot to fetch the tuples back */
+ p->tupslot = MakeSingleTupleTableSlot(p->tupdesc);
+
+ ctx->output_writer_private = p;
+
+ /* Build a simple "SELECT ... FROM [ONLY] schema.table" query. */
+ initStringInfo(&query);
+ appendStringInfoString(&query, "SELECT * FROM ");
+
+ /* Exclude data from children tables? */
+ if (nochildren)
+ appendStringInfoString(&query, " ONLY");
+
+ relident = quote_qualified_identifier(NameStr(*relnamespace), NameStr(*relname));
+ appendStringInfoString(&query, relident);
+
+ nspoid = get_namespace_oid(NameStr(*relnamespace), false);
+ p->rel = RelationIdGetRelation(get_relname_relid(NameStr(*relname), nspoid));
+
+ /* Initialize SPI (this switches to its own memory context). */
+ if ((ret = SPI_connect()) < 0)
+ elog(ERROR, "SPI_connect returned %d", ret);
+
+ plan = SPI_prepare_cursor(query.data, 0, NULL, CURSOR_OPT_NO_SCROLL);
+ if (!plan)
+ elog(ERROR, "SPI_prepare_cursor failed with error %d", SPI_result);
+
+ p->cursor = SPI_cursor_open(NULL, plan, NULL, NULL, true);
+
+ /* XXX: emit BEGIN? */
+ }
+
+ funcctx = SRF_PERCALL_SETUP();
+
+ MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
+
+ ctx = (LogicalDecodingContext *) funcctx->user_fctx;
+ p = (DecodingOutputState *) ctx->output_writer_private;
+
+ /*
+ * A single call to logical decoding plugin callback might emit a number
+ * of writes, so if we have some stuff in tupstore from previous call we
+ * give it away now.
+ */
+ if (tuplestore_gettupleslot(p->tupstore, /* forward = */ true, /* copy = */ false,
+ p->tupslot))
+ {
+ tuple = ExecCopySlotTuple(p->tupslot);
+ result = fastgetattr(tuple, 3, p->tupdesc, &isnull);
+
+ /* XXX: Assert(!isnull) ? */
+ if (isnull)
+ SRF_RETURN_NEXT_NULL(funcctx);
+ else
+ SRF_RETURN_NEXT(funcctx, result);
+ }
+ tuplestore_clear(p->tupstore);
+
+ SPI_cursor_fetch(p->cursor, true, 1);
+ if (SPI_processed == 0)
+ {
+ /* We're done, release the slot and other resources. */
+ /* XXX: emit COMMIT? */
+ RelationClose(p->rel);
+ ExecDropSingleTupleTableSlot(p->tupslot);
+
+ SPI_cursor_close(p->cursor);
+ SPI_freetuptable(SPI_tuptable);
+ SPI_finish();
+
+ FreeDecodingContext(ctx);
+ ReplicationSlotRelease();
+
+ MemoryContextSwitchTo(oldcontext);
+
+ SRF_RETURN_DONE(funcctx);
+ }
+ if (SPI_processed != 1)
+ elog(ERROR, "expected exactly 1 row from cursor, but got %d rows", SPI_processed);
+
+ /* SPI_cursor_fetch() leaves us in the SPI memory context, switch back: */
+ /* XXX: do we need our own context here? */
+ MemoryContextSwitchTo(ctx->context);
+
+ /* emit INSERT */
+ txn = ReorderBufferGetTXN(ctx->reorder);
+
+ change = ReorderBufferGetChange(ctx->reorder);
+ change->action = REORDER_BUFFER_CHANGE_INSERT;
+
+ change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ memset(change->data.tp.newtuple, 0, sizeof(ReorderBufferTupleBuf));
+ memcpy(&change->data.tp.newtuple->tuple, SPI_tuptable->vals[0], sizeof(HeapTupleData));
+
+ ctx->reorder->apply_change(ctx->reorder, txn, p->rel, change);
+
+ ReorderBufferReturnChange(ctx->reorder, change);
+ ReorderBufferReturnTXN(ctx->reorder, txn);
+
+ MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
+
+ /* fetch a tuple from the store */
+ if (tuplestore_gettupleslot(p->tupstore, /* forward = */ true, /* copy = */ false,
+ p->tupslot))
+ {
+ tuple = ExecCopySlotTuple(p->tupslot);
+ result = fastgetattr(tuple, 3, p->tupdesc, &isnull);
+ }
+
+ /* don't forget to clear the SPI temp context */
+ SPI_freetuptable(SPI_tuptable);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ /* XXX: Assert(!isnull) ? */
+ if (isnull)
+ SRF_RETURN_NEXT_NULL(funcctx);
+ else
+ SRF_RETURN_NEXT(funcctx, result);
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 78acced..df1fa12 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -165,8 +165,6 @@ static const Size max_cached_transactions = 512;
* primary reorderbuffer support routines
* ---------------------------------------
*/
-static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
-static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
TransactionId xid, bool create, bool *is_new,
XLogRecPtr lsn, bool create_as_top);
@@ -288,7 +286,7 @@ ReorderBufferFree(ReorderBuffer *rb)
/*
* Get an unused, possibly preallocated, ReorderBufferTXN.
*/
-static ReorderBufferTXN *
+ReorderBufferTXN *
ReorderBufferGetTXN(ReorderBuffer *rb)
{
ReorderBufferTXN *txn;
@@ -322,7 +320,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
* Deallocation might be delayed for efficiency purposes, for details check
* the comments above max_cached_changes's definition.
*/
-static void
+void
ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
/* clean the lookup cache if we were cached (quite likely) */
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index f58672e..a861153 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5221,6 +5221,8 @@ DATA(insert OID = 3784 ( pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000
DESCR("peek at changes from replication slot");
DATA(insert OID = 3785 ( pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ ));
DESCR("peek at binary changes from replication slot");
+DATA(insert OID = 3997 ( pg_logical_slot_stream_relation PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 5 0 25 "19 19 19 16 1009" "{19,19,19,16,1009,25}" "{i,i,i,i,v,o}" "{slot_name,relnamespace,relname,nochildren,options,data}" _null_ _null_ pg_logical_slot_stream_relation _null_ _null_ _null_ ));
+DESCR("stream relation as a series of changes using the replication slot plugin");
/* event triggers */
DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s s 0 0 2249 "" "{26,26,23,16,16,16,25,25,25,25,1009,1009}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args}" _null_ _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ ));
diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h
index c87a1df..df60bfe 100644
--- a/src/include/replication/logicalfuncs.h
+++ b/src/include/replication/logicalfuncs.h
@@ -20,5 +20,6 @@ extern Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS);
extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS);
extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS);
extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS);
+extern Datum pg_logical_slot_stream_relation(PG_FUNCTION_ARGS);
#endif
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 2abee0a..e321e43 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -341,6 +341,9 @@ struct ReorderBuffer
ReorderBuffer *ReorderBufferAllocate(void);
void ReorderBufferFree(ReorderBuffer *);
+ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
+void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
+
ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *);
void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
--
2.5.0
On Fri, Jan 15, 2016 at 5:31 PM, Shulgin, Oleksandr <
oleksandr.shulgin@zalando.de> wrote:
> POC patch attached. Findings:
>
> 1) Needs an actual slot for all the decode machinery to work (code depends
> on MyReplicationSlot being set).
> 2) Requires a core patch.
> 3) Currently only supports textual output, adding binary is trivial.
>
> Acquiring a slot means this cannot be run in parallel from multiple
> backends. Any ideas on how to overcome this (except for opening multiple
> slots with the same LSN)?
>
> To obtain a consistent snapshot, the client still needs to take care of
> preserving and setting transaction snapshot properly.
Testing revealed a number of problems with memory handling in this code; a
corrected v2 is attached.
A separate problem is proper handling of the SPI stack and releasing the
replication slot. The latter I'd like to avoid dealing with entirely: at
the moment it's not possible to stream a number of relations in parallel
using this POC function, so I'd rather move in the direction of not acquiring
the replication slot at all.
The SPI problem manifests itself if I place a LIMIT on top of the query:
# SELECT pg_logical_slot_stream_relation('slot1', 'pg_catalog', 'pg_class')
LIMIT 5;
WARNING: relcache reference leak: relation "pg_class" not closed
WARNING: transaction left non-empty SPI stack
HINT: Check for missing "SPI_finish" calls.
I wonder if there is a way to install some sort of cleanup handler that
will be called by the executor?
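One possibility, sketched below under the assumption of the 9.5-era
MemoryContextRegisterResetCallback() API, would be to hang a reset callback
off the SRF's multi_call_memory_ctx, since that context is destroyed at the
end of the query even when a LIMIT stops the scan early. The callback body
and teardown order here are illustrative only, not a tested implementation:

```c
/* Sketch only: assumes MemoryContextRegisterResetCallback() (9.5+);
 * the exact teardown order is illustrative, not tested. */
static void
stream_relation_cleanup(void *arg)
{
	DecodingOutputState *p = (DecodingOutputState *) arg;

	if (p->rel != NULL)
		RelationClose(p->rel);
	if (p->cursor != NULL)
		SPI_cursor_close(p->cursor);
	SPI_finish();
	if (MyReplicationSlot != NULL)
		ReplicationSlotRelease();
}

/* ... in the SRF_IS_FIRSTCALL() branch, after allocating p: */
{
	MemoryContextCallback *cb;

	cb = MemoryContextAlloc(funcctx->multi_call_memory_ctx,
							sizeof(MemoryContextCallback));
	cb->func = stream_relation_cleanup;
	cb->arg = p;
	MemoryContextRegisterResetCallback(funcctx->multi_call_memory_ctx, cb);
}
```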
--
Alex
Attachments:
pg_logical_slot_stream_relation-v2.patch (text/x-patch, charset US-ASCII)
From 83c2c754066f43111d0f21ff088cc5503e910aab Mon Sep 17 00:00:00 2001
From: Oleksandr Shulgin <oleksandr.shulgin@zalando.de>
Date: Fri, 15 Jan 2016 17:30:04 +0100
Subject: [PATCH] POC: pg_logical_slot_stream_relation
---
src/backend/catalog/system_views.sql | 9 +
src/backend/replication/logical/logicalfuncs.c | 355 +++++++++++++++++++++---
src/backend/replication/logical/reorderbuffer.c | 6 +-
src/include/catalog/pg_proc.h | 2 +
src/include/replication/logicalfuncs.h | 1 +
src/include/replication/reorderbuffer.h | 3 +
6 files changed, 333 insertions(+), 43 deletions(-)
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 923fe58..5431b61 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -941,6 +941,15 @@ LANGUAGE INTERNAL
VOLATILE ROWS 1000 COST 1000
AS 'pg_logical_slot_peek_binary_changes';
+CREATE OR REPLACE FUNCTION pg_logical_slot_stream_relation(
+ IN slot_name name, IN relnamespace name, IN relname name, IN nochildren bool DEFAULT FALSE,
+ VARIADIC options text[] DEFAULT '{}',
+ OUT data text)
+RETURNS SETOF TEXT
+LANGUAGE INTERNAL
+VOLATILE ROWS 1000 COST 1000
+AS 'pg_logical_slot_stream_relation';
+
CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
IN slot_name name, IN immediately_reserve boolean DEFAULT false,
OUT slot_name name, OUT xlog_position pg_lsn)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 562c8f6..c1605de 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -21,12 +21,18 @@
#include "funcapi.h"
#include "miscadmin.h"
+#include "access/htup_details.h"
#include "access/xlog_internal.h"
+#include "executor/spi.h"
+
+#include "catalog/namespace.h"
#include "catalog/pg_type.h"
#include "nodes/makefuncs.h"
+#include "lib/stringinfo.h"
+
#include "mb/pg_wchar.h"
#include "utils/array.h"
@@ -40,6 +46,7 @@
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
+#include "replication/reorderbuffer.h"
#include "storage/fd.h"
@@ -50,6 +57,12 @@ typedef struct DecodingOutputState
TupleDesc tupdesc;
bool binary_output;
int64 returned_rows;
+
+ /* for pg_logical_stream_relation */
+ MemoryContext context;
+ Relation rel;
+ Portal cursor;
+ TupleTableSlot *tupslot;
} DecodingOutputState;
/*
@@ -270,6 +283,53 @@ logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
return count;
}
+static List *
+deconstruct_options_array(ArrayType *arr)
+{
+ Size ndim;
+ List *options = NIL;
+
+ ndim = ARR_NDIM(arr);
+ if (ndim > 1)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("array must be one-dimensional")));
+ }
+ else if (array_contains_nulls(arr))
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("array must not contain nulls")));
+ }
+ else if (ndim == 1)
+ {
+ int nelems;
+ Datum *datum_opts;
+ int i;
+
+ Assert(ARR_ELEMTYPE(arr) == TEXTOID);
+
+ deconstruct_array(arr, TEXTOID, -1, false, 'i',
+ &datum_opts, NULL, &nelems);
+
+ if (nelems % 2 != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("array must have even number of elements")));
+
+ for (i = 0; i < nelems; i += 2)
+ {
+ char *name = TextDatumGetCString(datum_opts[i]);
+ char *opt = TextDatumGetCString(datum_opts[i + 1]);
+
+ options = lappend(options, makeDefElem(name, (Node *) makeString(opt)));
+ }
+ }
+
+ return options;
+}
+
/*
* Helper function for the various SQL callable logical decoding functions.
*/
@@ -287,7 +347,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
ArrayType *arr;
- Size ndim;
List *options = NIL;
DecodingOutputState *p;
@@ -339,44 +398,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
- /* Deconstruct options array */
- ndim = ARR_NDIM(arr);
- if (ndim > 1)
- {
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("array must be one-dimensional")));
- }
- else if (array_contains_nulls(arr))
- {
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("array must not contain nulls")));
- }
- else if (ndim == 1)
- {
- int nelems;
- Datum *datum_opts;
- int i;
-
- Assert(ARR_ELEMTYPE(arr) == TEXTOID);
-
- deconstruct_array(arr, TEXTOID, -1, false, 'i',
- &datum_opts, NULL, &nelems);
-
- if (nelems % 2 != 0)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("array must have even number of elements")));
-
- for (i = 0; i < nelems; i += 2)
- {
- char *name = TextDatumGetCString(datum_opts[i]);
- char *opt = TextDatumGetCString(datum_opts[i + 1]);
-
- options = lappend(options, makeDefElem(name, (Node *) makeString(opt)));
- }
- }
+ options = deconstruct_options_array(arr);
p->tupstore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->returnMode = SFRM_Materialize;
@@ -515,3 +537,258 @@ pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
{
return pg_logical_slot_get_changes_guts(fcinfo, false, true);
}
+
+Datum
+pg_logical_slot_stream_relation(PG_FUNCTION_ARGS)
+{
+ Name name;
+ Name relnamespace;
+ Name relname;
+ bool nochildren;
+ ArrayType *arr;
+ List *options = NIL;
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+
+ LogicalDecodingContext *ctx;
+ DecodingOutputState *p;
+ ReorderBufferTXN *txn;
+ ReorderBufferChange *change;
+
+ FuncCallContext *funcctx;
+ MemoryContext oldcontext;
+ const char *relident;
+ int ret;
+ SPIPlanPtr plan;
+ StringInfoData query;
+ Oid nspoid;
+
+ HeapTuple tuple;
+ bool isnull;
+ Datum result;
+
+ oldcontext = CurrentMemoryContext;
+
+ if (SRF_IS_FIRSTCALL())
+ {
+ check_permissions();
+
+ CheckLogicalDecodingRequirements();
+
+ if (PG_ARGISNULL(0))
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("slot name must not be null")));
+ name = PG_GETARG_NAME(0);
+
+ if (PG_ARGISNULL(1))
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("relnamespace cannot be null")));
+ relnamespace = PG_GETARG_NAME(1);
+
+ if (PG_ARGISNULL(2))
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("relname cannot be null")));
+ relname = PG_GETARG_NAME(2);
+
+ if (PG_ARGISNULL(3))
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("nochildren must not be null")));
+ nochildren = PG_GETARG_BOOL(3);
+
+ if (PG_ARGISNULL(4))
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("options array must not be null")));
+ arr = PG_GETARG_ARRAYTYPE_P(4);
+
+ /* check to see if caller supports us returning a tuplestore */
+ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("set-valued function called in context that cannot accept a set")));
+ /*
+ if (!(rsinfo->allowedModes & SFRM_Materialize))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("materialize mode required, but it is not allowed in this context")));
+ */
+
+ MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
+
+ options = deconstruct_options_array(arr);
+
+ funcctx = SRF_FIRSTCALL_INIT();
+
+ /* Things allocated in this memory context will live until SRF_RETURN_DONE(). */
+ MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ ReplicationSlotAcquire(NameStr(*name));
+
+ ctx = CreateDecodingContext(InvalidXLogRecPtr,
+ options,
+ NULL /*logical_read_local_xlog_page*/,
+ LogicalOutputPrepareWrite,
+ LogicalOutputWrite);
+ funcctx->user_fctx = ctx;
+
+ /*
+ * Check whether the output plugin writes textual output if that's
+ * what we need.
+ */
+ if (/*!binary &&*/ ctx->options.output_type != OUTPUT_PLUGIN_TEXTUAL_OUTPUT)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
+ NameStr(MyReplicationSlot->data.plugin),
+ format_procedure(fcinfo->flinfo->fn_oid))));
+
+ p = palloc0(sizeof(DecodingOutputState));
+ ctx->output_writer_private = p;
+
+ p->binary_output = false /*binary*/;
+
+ /* Build a simple "SELECT ... FROM [ONLY] schema.table" query. */
+ initStringInfo(&query);
+ appendStringInfoString(&query, "SELECT * FROM ");
+
+ /* Exclude data from child tables? */
+ if (nochildren)
+ appendStringInfoString(&query, "ONLY ");
+
+ relident = quote_qualified_identifier(NameStr(*relnamespace), NameStr(*relname));
+ appendStringInfoString(&query, relident);
+
+ nspoid = get_namespace_oid(NameStr(*relnamespace), false);
+ p->rel = RelationIdGetRelation(get_relname_relid(NameStr(*relname), nspoid));
+
+ /* Build a tuple descriptor for our result type. */
+ if (get_func_result_type(3784 /* XXX: piggyback on pg_logical_slot_peek_changes */,
+ NULL, &p->tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ p->context = AllocSetContextCreate(CurrentMemoryContext,
+ "pg_logical_slot_stream_relation context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ MemoryContextSwitchTo(p->context);
+
+ /* Make the tuple store. */
+ p->tupstore = tuplestore_begin_heap(true, false, work_mem);
+
+ /* And we need a slot to fetch the tuples back. */
+ p->tupslot = MakeSingleTupleTableSlot(p->tupdesc);
+
+ /* Initialize SPI (this switches to its own memory context). */
+ if ((ret = SPI_connect()) < 0)
+ elog(ERROR, "SPI_connect returned %d", ret);
+
+ plan = SPI_prepare_cursor(query.data, 0, NULL, CURSOR_OPT_NO_SCROLL);
+ if (!plan)
+ elog(ERROR, "SPI_prepare_cursor failed with error %d", SPI_result);
+
+ p->cursor = SPI_cursor_open(NULL, plan, NULL, NULL, true);
+
+ /* XXX: emit BEGIN? */
+ }
+
+ funcctx = SRF_PERCALL_SETUP();
+
+ ctx = (LogicalDecodingContext *) funcctx->user_fctx;
+ p = (DecodingOutputState *) ctx->output_writer_private;
+
+ MemoryContextSwitchTo(p->context);
+
+ /*
+ * A single call to the logical decoding plugin callback might emit a
+ * number of writes, so if we still have rows in the tupstore from the
+ * previous call, hand out the next one now.
+ */
+ if (tuplestore_gettupleslot(p->tupstore, /* forward = */ true, /* copy = */ false,
+ p->tupslot))
+ {
+ tuple = ExecCopySlotTuple(p->tupslot);
+ result = fastgetattr(tuple, 3, p->tupdesc, &isnull);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ /* XXX: Assert(!isnull) ? */
+ if (isnull)
+ SRF_RETURN_NEXT_NULL(funcctx);
+ else
+ SRF_RETURN_NEXT(funcctx, result);
+ }
+
+ /* Fetch next tuple from the seq scan. */
+ SPI_cursor_fetch(p->cursor, true, 1);
+ if (SPI_processed == 0)
+ {
+ /* We're done, release the slot and other resources. */
+ /* XXX: emit COMMIT? */
+ RelationClose(p->rel);
+ ExecDropSingleTupleTableSlot(p->tupslot);
+
+ SPI_cursor_close(p->cursor);
+ SPI_freetuptable(SPI_tuptable);
+ SPI_finish();
+
+ FreeDecodingContext(ctx);
+ ReplicationSlotRelease();
+
+ MemoryContextSwitchTo(oldcontext);
+
+ SRF_RETURN_DONE(funcctx);
+ }
+ if (SPI_processed != 1)
+ elog(ERROR, "expected exactly 1 row from cursor, but got %u rows", SPI_processed);
+
+ /* SPI_cursor_fetch() leaves us in the SPI memory context, switch back. */
+ MemoryContextSwitchTo(p->context);
+
+ /* Done with the previous call's results: reset the context, tuplestore and slot. */
+ MemoryContextReset(p->context);
+ p->tupstore = tuplestore_begin_heap(true, false, work_mem);
+ p->tupslot = MakeSingleTupleTableSlot(p->tupdesc);
+
+ /* emit INSERT */
+ txn = ReorderBufferGetTXN(ctx->reorder);
+
+ change = ReorderBufferGetChange(ctx->reorder);
+ change->action = REORDER_BUFFER_CHANGE_INSERT;
+
+ change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ memset(change->data.tp.newtuple, 0, sizeof(ReorderBufferTupleBuf));
+ /*
+ * Shallow copy: t_data still points into SPI_tuptable, which remains
+ * valid until apply_change() below has returned.
+ */
+ memcpy(&change->data.tp.newtuple->tuple, SPI_tuptable->vals[0], sizeof(HeapTupleData));
+
+ ctx->reorder->apply_change(ctx->reorder, txn, p->rel, change);
+
+ ReorderBufferReturnChange(ctx->reorder, change);
+ ReorderBufferReturnTXN(ctx->reorder, txn);
+
+ /* fetch a tuple from the store */
+ if (tuplestore_gettupleslot(p->tupstore, /* forward = */ true, /* copy = */ false,
+ p->tupslot))
+ {
+ tuple = ExecCopySlotTuple(p->tupslot);
+ result = fastgetattr(tuple, 3, p->tupdesc, &isnull);
+ }
+ else
+ {
+ isnull = true;
+ result = (Datum) 0;
+ }
+
+ /* don't forget to clear the SPI temp context */
+ SPI_freetuptable(SPI_tuptable);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ /* XXX: Assert(!isnull) ? */
+ if (isnull)
+ SRF_RETURN_NEXT_NULL(funcctx);
+ else
+ SRF_RETURN_NEXT(funcctx, result);
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 7402f20..2a9c2e0 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -165,8 +165,6 @@ static const Size max_cached_transactions = 512;
* primary reorderbuffer support routines
* ---------------------------------------
*/
-static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
-static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
TransactionId xid, bool create, bool *is_new,
XLogRecPtr lsn, bool create_as_top);
@@ -288,7 +286,7 @@ ReorderBufferFree(ReorderBuffer *rb)
/*
* Get an unused, possibly preallocated, ReorderBufferTXN.
*/
-static ReorderBufferTXN *
+ReorderBufferTXN *
ReorderBufferGetTXN(ReorderBuffer *rb)
{
ReorderBufferTXN *txn;
@@ -322,7 +320,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
* Deallocation might be delayed for efficiency purposes, for details check
* the comments above max_cached_changes's definition.
*/
-static void
+void
ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
/* clean the lookup cache if we were cached (quite likely) */
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 3df5ac5..23c0b5e 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5064,6 +5064,8 @@ DATA(insert OID = 3784 ( pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000
DESCR("peek at changes from replication slot");
DATA(insert OID = 3785 ( pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ ));
DESCR("peek at binary changes from replication slot");
+DATA(insert OID = 3997 ( pg_logical_slot_stream_relation PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 5 0 25 "19 19 19 16 1009" "{19,19,19,16,1009,25}" "{i,i,i,i,v,o}" "{slot_name,relnamespace,relname,nochildren,options,data}" _null_ _null_ pg_logical_slot_stream_relation _null_ _null_ _null_ ));
+DESCR("stream relation as a series of changes using the replication slot plugin");
/* event triggers */
DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s s 0 0 2249 "" "{26,26,23,16,16,16,25,25,25,25,1009,1009}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args}" _null_ _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ ));
diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h
index c87a1df..df60bfe 100644
--- a/src/include/replication/logicalfuncs.h
+++ b/src/include/replication/logicalfuncs.h
@@ -20,5 +20,6 @@ extern Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS);
extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS);
extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS);
extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS);
+extern Datum pg_logical_slot_stream_relation(PG_FUNCTION_ARGS);
#endif
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 2abee0a..e321e43 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -341,6 +341,9 @@ struct ReorderBuffer
ReorderBuffer *ReorderBufferAllocate(void);
void ReorderBufferFree(ReorderBuffer *);
+ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
+void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
+
ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *);
void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
--
2.5.0
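With the patch applied, the new function would be exercised roughly like this, in the same SQL pseudo-code spirit as the original proposal (slot name, plugin and snapshot identifier are placeholders; the slot must already exist and have been created with the desired output plugin):

```
-- on a replication connection:
CREATE_REPLICATION_SLOT "myslot" LOGICAL "pglogical_output";
-- note the exported snapshot name in the response

-- on a regular connection:
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT '00000003-00000002-1';
SELECT data
  FROM pg_logical_slot_stream_relation('myslot', 'public', 'mytable', false);
COMMIT;
```

Passing true for nochildren would restrict the scan to ONLY the named relation; plugin startup options can be appended as variadic text arguments.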
On 15 January 2016 at 16:30, Shulgin, Oleksandr <oleksandr.shulgin@zalando.de> wrote:
I'd like to propose generic functions (probably in an extension, or in
core if not possible otherwise) to facilitate streaming existing data from
the database *in the same format* that one would get if these would be the
changes decoded by a logical decoding plugin.
So effectively produce synthetic logical decoding callbacks to run a bunch
of fake INSERTs, presumably with a fake xid etc?
The idea is to use a snapshot returned from CREATE_REPLICATION_SLOT
command of the replication protocol to get a consistent snapshot of the
database, then start listening to new changes on the slot.
My impression is that you want to avoid the current step of "synchronize
database initial contents" when using logical decoding for replication. But
I guess you're looking to then populate that empty schema in-band via
logical decoding, rather than having to do a --data-only dump or use COPY.
Right?
That won't help you for schema; presumably you'd still do a pg_dump
--schema-only | pg_restore for that.
Just like when restoring a --data-only dump or using COPY you'd have to
disable FKs during sync, but that's pretty much unavoidable.
The way this initial export phase is implemented there is by providing a
SQL-callable set returning function which is using SPI to run "SELECT *
FROM mytable" behind the scenes and runs the resulting tuples through the
INSERT callback of the logical decoding plugin, which lives in the same
loadable module as this SQL function.
o_O
What about the reorder buffer, the logical decoding memory context, etc?
Bottled Water logical decoding plugin uses binary protocol based on Avro
data serialization library. As an experiment I was adding support for JSON
output format to it, and for that I had to re-implement the aforementioned
SRF to export initial data to convert tuples to JSON instead.
Have you taken a look at what's been done with pglogical and
pglogical_output?
We've got extensible protocol support there, and if Avro offers compelling
benefits over the current binary serialization I'm certainly interested in
hearing about it.
What do you say?
Interesting idea. As outlined I think it sounds pretty fragile though; I
really, really don't like the idea of lying to the insert callback by
passing it a fake insert with (presumably) fake reorder buffer txn, etc.
What we've done in pglogical is take a --schema-only dump then, on the
downstream, attach to the exported snapshot and use COPY ... TO STDOUT over
a libpq connection to the upstream, feeding that to COPY ... FROM STDIN on
another libpq connection to "ourselves", i.e. the downstream. Unless Petr
changed it to use COPY innards directly on the downstream; I know he talked
about it but haven't checked if he did. Anyway, either way it's not pretty
and requires a sideband non-replication connection to sync initial state.
The upside is that it can be relatively easily parallelized for faster sync
using multiple connections.
To what extent are you setting up a true logical decoding context here?
Where does the xact info come from? The commit record? etc. You're
presumably not forming a reorder buffer then decoding it since it could
create a massive tempfile on disk, so are you just dummying this info up?
Or hoping the plugin won't look at it?
The functionality is good and I think that for the SQL level you'd have to
use SET TRANSACTION SNAPSHOT as you show. But I think it should really be
usable from the replication protocol too - and should try to keep the state
as close to that of a normal decoding session as possible. We'd at least
need a new walsender protocol command equivalent that took the snapshot
identifier, relation info and the other decoding params instead of a slot
name. Or, ideally, a variant on START_REPLICATION ... LOGICAL ... that
omits SLOT and instead takes TABLES as an argument, with a list of
relation(s) to sync. Unlike normal START_REPLICATION ... LOGICAL ... it'd
return to walsender protocol mode on completion, like the phys rep protocol
does when it's time for a timeline switch.
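A sketch of what such a walsender variant might look like, next to the existing command (the TABLES/SNAPSHOT syntax is entirely hypothetical; nothing like it exists in the protocol today):

```
-- existing: requires a slot
START_REPLICATION SLOT "myslot" LOGICAL 0/0 (option 'value', ...)

-- hypothetical: no slot; attach to an exported snapshot, stream the
-- listed tables as synthetic changes, then return to command mode
START_REPLICATION LOGICAL TABLES (public.mytable, public.othertable)
    SNAPSHOT '00000003-00000002-1' (option 'value', ...)
```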
Rather than lie to the insert callback I'd really rather define a new
logical decoding callback for copying initial records. It doesn't get any
xact info (since it's not useful/relevant) or a full reorder buffer. No
ReorderBufferChange is passed; instead we pass something like a
ReorderBufferSync that contains the new tuple ReorderBufferTupleBuf, origin
id, origin lsn and commit timestamp (if known) and the RelFileNode
affected. The LogicalDecodingContext that's set up for the callback gets
ctx->reorder = NULL. There's no ReorderBufferTxn argument and none is
defined.
Since it's a new callback the plugin knows the rules, knows it's getting
initial state data to sync over, etc. It doesn't have to try to guess if
it's seeing a real insert and act differently with respect to xact identity
etc.
Obviously that's 9.6 material at the soonest, and only 9.6 if it could be
done ... well, right about now. So that won't meet your immediate needs,
but I think the same is true of the interface you propose above.
What I suggest doing in the mean time is specifying a new callback function
interface for tuple copies as described above, to be implemented by logical
decoding modules that support this extension. In each decoding plugin we
then define a SQL-callable function with 'internal' return type that
returns a pointer to the callback so you can obtain the hook function
address via a fmgr call via pg_proc. The callback would expect a state much
like I describe above and we'd use a SQL-callable function like what you
outlined to set up a fake logical decoding state for it, complete with
decoding context etc. Probably copying & pasting a moderately painful
amount of the logical decoding guts into an ext in the process :( since I
don't think you can easily set up much of the decoding state using the
decoding backend code without having a slot to use. Still, that'd let us
prototype this and prove the idea for inclusion in 9.7 (?) in-core while
retaining the capability via an extension for earlier versions.
You'd have to do much of the same hoop jumping to call an arbitrary output
plugin's insert callback directly, if not more.
Alternately, you could just use COPY ;)
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On Wed, Jan 20, 2016 at 7:57 AM, Craig Ringer <craig@2ndquadrant.com> wrote:
On 15 January 2016 at 16:30, Shulgin, Oleksandr <oleksandr.shulgin@zalando.de> wrote:

I'd like to propose generic functions (probably in an extension, or in
core if not possible otherwise) to facilitate streaming existing data from
the database *in the same format* that one would get if these would be the
changes decoded by a logical decoding plugin.

So effectively produce synthetic logical decoding callbacks to run a bunch
of fake INSERTs, presumably with a fake xid etc?
Exactly.
The idea is to use a snapshot returned from CREATE_REPLICATION_SLOT
command of the replication protocol to get a consistent snapshot of the
database, then start listening to new changes on the slot.

My impression is that you want to avoid the current step of "synchronize
database initial contents" when using logical decoding for replication.
Yes, but...
But I guess you're looking to then populate that empty schema in-band via
logical decoding, rather than having to do a --data-only dump or use COPY.
Right?

That won't help you for schema; presumably you'd still do a pg_dump
--schema-only | pg_restore for that.

Just like when restoring a --data-only dump or using COPY you'd have to
disable FKs during sync, but that's pretty much unavoidable.
All of this implies another *postgres* database on the receiving side,
which is not necessarily the case for my research.
The way this initial export phase is implemented there is by providing a
SQL-callable set returning function which is using SPI to run "SELECT *
FROM mytable" behind the scenes and runs the resulting tuples through the
INSERT callback of the logical decoding plugin, which lives in the same
loadable module as this SQL function.

o_O
What about the reorder buffer, the logical decoding memory context, etc?
As shown by the POC patch, it is rather straightforward to achieve.
Bottled Water logical decoding plugin uses binary protocol based on Avro
data serialization library. As an experiment I was adding support for JSON
output format to it, and for that I had to re-implement the aforementioned
SRF to export initial data to convert tuples to JSON instead.

Have you taken a look at what's been done with pglogical and
pglogical_output?

We've got extensible protocol support there, and if Avro offers compelling
benefits over the current binary serialization I'm certainly interested in
hearing about it.
This is what I'm going to benchmark. With the generic function I can just
create two slots: one for pglogical and another one for BottledWater/Avro
and see which one performs better when forced to stream some TB worth of
INSERTs through the change callback.
What do you say?
Interesting idea. As outlined I think it sounds pretty fragile though; I
really, really don't like the idea of lying to the insert callback by
passing it a fake insert with (presumably) fake reorder buffer txn, etc.
Fair enough. However for performance testing it could be not that bad,
even if nothing of that lands in the actual API.
What we've done in pglogical is take a --schema-only dump then, on the
downstream, attach to the exported snapshot and use COPY ... TO STDOUT over
a libpq connection to the upstream, feeding that to COPY ... FROM STDIN on
another libpq connection to "ourselves", i.e. the downstream. Unless Petr
changed it to use COPY innards directly on the downstream; I know he talked
about it but haven't checked if he did. Anyway, either way it's not pretty
and requires a sideband non-replication connection to sync initial state.
The upside is that it can be relatively easily parallelized for faster sync
using multiple connections.
I've also measured that to have a baseline for comparing it to decoding
performance.
To what extent are you setting up a true logical decoding context here?
It is done in exactly the same way pg_logical_slot_get/peek_changes() do it.
Where does the xact info come from? The commit record? etc.
palloc0()
You're presumably not forming a reorder buffer then decoding it since it
could create a massive tempfile on disk, so are you just dummying this info
up?
In my experience, it doesn't. We know it's going to be a "committed xact",
so we don't really need to queue the changes up before we see a "commit"
record.
Or hoping the plugin won't look at it?
Pretty much. :-)
The functionality is good and I think that for the SQL level you'd have to
use SET TRANSACTION SNAPSHOT as you show. But I think it should really be
usable from the replication protocol too - and should try to keep the state
as close to that of a normal decoding session as possible. We'd at least
need a new walsender protocol command equivalent that took the snapshot
identifier, relation info and the other decoding params instead of a slot
name. Or, ideally, a variant on START_REPLICATION ... LOGICAL ... that
omits SLOT and instead takes TABLES as an argument, with a list of
relation(s) to sync. Unlike normal START_REPLICATION ... LOGICAL ... it'd
return to walsender protocol mode on completion, like the phys rep protocol
does when it's time for a timeline switch.
I've had similar thoughts.
Another consideration is that we might introduce modes for acquiring the
slot: Exclusive and Shared access (can be implemented with LWLocks?), so
that peek_changes() and stream_relation() could acquire the slot in Shared
access mode, thus allowing parallel queries, while START_REPLICATION and
get_changes() would require Exclusive access.
Rather than lie to the insert callback I'd really rather define a new
logical decoding callback for copying initial records. It doesn't get any
xact info (since it's not useful/relevant) or a full reorder buffer. No
ReorderBufferChange is passed; instead we pass something like a
ReorderBufferSync that contains the new tuple ReorderBufferTupleBuf, origin
id, origin lsn and commit timestamp (if known) and the RelFileNode
affected. The LogicalDecodingContext that's set up for the callback gets
ctx->reorder = NULL. There's no ReorderBufferTxn argument and none is
defined.

Since it's a new callback the plugin knows the rules, knows it's getting
initial state data to sync over, etc. It doesn't have to try to guess if
it's seeing a real insert and act differently with respect to xact identity
etc.

Obviously that's 9.6 material at the soonest, and only 9.6 if it could be
done ... well, right about now. So that won't meet your immediate needs,
but I think the same is true of the interface you propose above.
That can be a good approach going forward, yes.
What I suggest doing in the mean time is specifying a new callback function
interface for tuple copies as described above, to be implemented by logical
decoding modules that support this extension. In each decoding plugin we
then define a SQL-callable function with 'internal' return type that
returns a pointer to the callback so you can obtain the hook function
address via a fmgr call via pg_proc. The callback would expect a state much
like I describe above and we'd use a SQL-callable function like what you
outlined to set up a fake logical decoding state for it, complete with
decoding context etc. Probably copying & pasting a moderately painful
amount of the logical decoding guts into an ext in the process :( since I
don't think you can easily set up much of the decoding state using the
decoding backend code without having a slot to use. Still, that'd let us
prototype this and prove the idea for inclusion in 9.7 (?) in-core while
retaining the capability via an extension for earlier versions.

You'd have to do much of the same hoop jumping to call an arbitrary output
plugin's insert callback directly, if not more.

Alternately, you could just use COPY ;)
Thanks for the thoughtful reply! I'm going to experiment with my toy code
a bit more, while keeping in mind what could a more workable approach look
like.
--
Alex
On 20 January 2016 at 15:50, Shulgin, Oleksandr <oleksandr.shulgin@zalando.de> wrote:
All of this implies another *postgres* database on the receiving side,
which is not necessarily the case for my research.
Good point. It might not be a DB at all, either, i.e. it might not
understand INSERTs and you may want data in some arbitrary format. Like
json.
The same is true for other intended uses of pglogical_output. It's a pain
to have to query the desired initial state out of the database via normal
libpq, so I like your idea to let the output plugin deal with that.
This is what I'm going to benchmark. With the generic function I can just
create two slots: one for pglogical and another one for BottledWater/Avro
and see which one performs better when forced to stream some TB worth of
INSERTs through the change callback.
Makes sense.
What do you say?
Interesting idea. As outlined I think it sounds pretty fragile though; I
really, really don't like the idea of lying to the insert callback by
passing it a fake insert with (presumably) fake reorder buffer txn, etc.

Fair enough. However for performance testing it could be not that bad,
even if nothing of that lands in the actual API.
I agree. It's fine for performance testing.
It should be relatively simple to add to pglogical_output, though you might
have to hack out a few things if the faked-up state doesn't fully stand up
to scrutiny.
You're presumably not forming a reorder buffer then decoding it since it
could create a massive tempfile on disk, so are you just dummying this info
up?

In my experience, it doesn't. We know it's going to be a "committed
xact", so we don't really need to queue the changes up before we see a
"commit" record.
OK.
That's probably going to confuse pglogical_output a bit because it looks at
the tx start/end records. But it might not look closely enough to care, to
be honest, and may cope OK with the bogus data. It can probably be hacked
around for testing purposes.
Another consideration is that we might introduce modes for acquiring the
slot: Exclusive and Shared access (can be implemented with LWLocks?), so
that peek_changes() and stream_relation() could acquire the slot in Shared
access mode, thus allowing parallel queries, while START_REPLICATION and
get_changes() would require Exclusive access.
That'd be nice, but probably not totally necessary for streaming relations.
It doesn't really need the slot at all. Or shouldn't, I think. Though it
might be easiest to allow it to acquire the slot just for convenience and
shared code.
Thanks for the thoughtful reply! I'm going to experiment with my toy code
a bit more, while keeping in mind what could a more workable approach look
like.
Great.
I'll be really interested in your results.
If you have trouble making pglogical_output cooperate with your tests and
measurements feel free to mail me directly and I'll see if I can help find
what's going wrong in the test harness or in pglogical_output .
--
Craig Ringer http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
On Wed, Jan 20, 2016 at 9:26 AM, Craig Ringer <craig@2ndquadrant.com> wrote:
On 20 January 2016 at 15:50, Shulgin, Oleksandr <oleksandr.shulgin@zalando.de> wrote:

That'd be nice, but probably not totally necessary for streaming
relations. It doesn't really need the slot at all. Or shouldn't, I think.
Though it might be easiest to allow it to acquire the slot just for
convenience and shared code.
Yes, before looking at the code I thought I could do without a slot, but
dependency on MyReplicationSlot being set in a number of places has forced
me to actually acquire it, in order to keep the changes more contained.
--
Alex