From 14f6f4e70742e5355c058ddb219a5ccc590535ce Mon Sep 17 00:00:00 2001 From: Shubham Khanna Date: Mon, 20 May 2024 10:58:31 +0530 Subject: [PATCH v4] Enable support for 'include_generated_columns' option in 'logical replication' This commit enables support for the 'include_generated_columns' option in logical replication, allowing the transmission of generated column information and data alongside regular table changes. This option is particularly useful for scenarios where applications require access to generated column values for downstream processing or synchronization. With this enhancement, users can now include the 'include_generated_columns' option when querying logical replication slots using either the pgoutput plugin or the test_decoding plugin. This option, when set to 'true' or '1', instructs the replication system to include generated column information and data in the replication stream. CREATE SUBSCRIPTION test1 connection 'dbname=postgres host=localhost port=9999 'publication pub1; Usage from test_decoding plugin: SELECT data FROM pg_logical_slot_get_changes('slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include_generated_columns', '1'); Currently copy_data option with include_generated_columns option is not supported. A future patch will remove this limitation. This commit aims to enhance the flexibility and utility of logical replication by allowing users to include generated column information in replication streams, paving the way for more robust data synchronization and processing workflows. --- .../expected/decoding_into_rel.out | 25 +++++++++ .../test_decoding/sql/decoding_into_rel.sql | 10 +++- contrib/test_decoding/test_decoding.c | 26 +++++++-- doc/src/sgml/protocol.sgml | 14 +++++ doc/src/sgml/ref/create_subscription.sgml | 19 +++++++ src/backend/catalog/pg_publication.c | 9 +-- src/backend/catalog/pg_subscription.c | 1 + src/backend/commands/subscriptioncmds.c | 31 +++++++++- .../libpqwalreceiver/libpqwalreceiver.c | 4 ++ src/backend/replication/logical/proto.c | 56 +++++++++++++------ src/backend/replication/logical/relation.c | 2 +- src/backend/replication/logical/worker.c | 1 + src/backend/replication/pgoutput/pgoutput.c | 42 ++++++++++---- src/bin/psql/tab-complete.c | 2 +- src/include/catalog/pg_subscription.h | 3 + src/include/replication/logicalproto.h | 13 +++-- src/include/replication/pgoutput.h | 1 + src/include/replication/walreceiver.h | 1 + src/test/regress/expected/publication.out | 4 +- src/test/regress/sql/publication.sql | 3 +- src/test/subscription/t/011_generated.pl | 33 ++++++++++- src/test/subscription/t/031_column_list.pl | 4 +- 22 files changed, 249 insertions(+), 55 deletions(-) diff --git a/contrib/test_decoding/expected/decoding_into_rel.out b/contrib/test_decoding/expected/decoding_into_rel.out index 8fd3390066..d4116b0fe6 100644 --- a/contrib/test_decoding/expected/decoding_into_rel.out +++ b/contrib/test_decoding/expected/decoding_into_rel.out @@ -103,6 +103,31 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc COMMIT (14 rows) +-- check include-generated-columns option with generated column +CREATE TABLE gencoltable (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED); +INSERT INTO gencoltable (a) VALUES (1), (2), (3); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-generated-columns', '1'); + data +------------------------------------------------------------- + BEGIN + table public.gencoltable: INSERT: a[integer]:1 b[integer]:2 + table public.gencoltable: INSERT: a[integer]:2 b[integer]:4 + table public.gencoltable: INSERT: a[integer]:3 b[integer]:6 + COMMIT +(5 rows) + +INSERT INTO gencoltable (a) VALUES (4), (5), (6); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-generated-columns', '0'); + data +------------------------------------------------ + BEGIN + table public.gencoltable: INSERT: a[integer]:4 + table public.gencoltable: INSERT: a[integer]:5 + table public.gencoltable: INSERT: a[integer]:6 + COMMIT +(5 rows) + +DROP TABLE gencoltable; SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); ?column? ---------- diff --git a/contrib/test_decoding/sql/decoding_into_rel.sql b/contrib/test_decoding/sql/decoding_into_rel.sql index 1068cec588..c40b860f11 100644 --- a/contrib/test_decoding/sql/decoding_into_rel.sql +++ b/contrib/test_decoding/sql/decoding_into_rel.sql @@ -39,4 +39,12 @@ SELECT * FROM slot_changes_wrapper('regression_slot'); SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); -SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); +-- check include-generated-columns option with generated column +CREATE TABLE gencoltable (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED); +INSERT INTO gencoltable (a) VALUES (1), (2), (3); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-generated-columns', '1'); +INSERT INTO gencoltable (a) VALUES (4), (5), (6); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-generated-columns', '0'); +DROP TABLE gencoltable; + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); \ No newline at end of file diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 7c50d13969..10ca369d2a 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -31,6 +31,7 @@ typedef struct bool include_timestamp; bool skip_empty_xacts; bool only_local; + bool include_generated_columns; } TestDecodingData; /* @@ -168,6 +169,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->include_timestamp = false; data->skip_empty_xacts = false; data->only_local = false; + data->include_generated_columns = true; ctx->output_plugin_private = data; @@ -259,6 +261,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "include-generated-columns") == 0) + { + if (elem->arg == NULL) + data->include_generated_columns = true; + else if (!parse_bool(strVal(elem->arg), &data->include_generated_columns)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { ereport(ERROR, @@ -521,7 +533,8 @@ print_literal(StringInfo s, Oid typid, char *outputstr) /* print the tuple 'tuple' into the StringInfo s */ static void -tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls) +tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, + bool skip_nulls, bool include_generated_columns) { int natt; @@ -544,6 +557,9 @@ tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_ if (attr->attisdropped) continue; + if (attr->attgenerated && !include_generated_columns) + continue; + /* * Don't print system columns, oid will already have been printed if * present. @@ -641,7 +657,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, else tuple_to_stringinfo(ctx->out, tupdesc, change->data.tp.newtuple, - false); + false, data->include_generated_columns); break; case REORDER_BUFFER_CHANGE_UPDATE: appendStringInfoString(ctx->out, " UPDATE:"); @@ -650,7 +666,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, appendStringInfoString(ctx->out, " old-key:"); tuple_to_stringinfo(ctx->out, tupdesc, change->data.tp.oldtuple, - true); + true, data->include_generated_columns ); appendStringInfoString(ctx->out, " new-tuple:"); } @@ -659,7 +675,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, else tuple_to_stringinfo(ctx->out, tupdesc, change->data.tp.newtuple, - false); + false, data->include_generated_columns); break; case REORDER_BUFFER_CHANGE_DELETE: appendStringInfoString(ctx->out, " DELETE:"); @@ -671,7 +687,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, else tuple_to_stringinfo(ctx->out, tupdesc, change->data.tp.oldtuple, - true); + true, data->include_generated_columns); break; default: Assert(false); diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 1b27d0a547..e6fee105de 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -3306,6 +3306,20 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + include-generated-columns + + + Boolean option to enable generated columns. + The include-generated-columns option controls whether generated + columns should be included in the string representation of tuples + during logical decoding in PostgreSQL. This allows users to + customize the output format based on whether they want to include + these columns or not. The default is false. + + + + origin diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 740b7d9421..57520b5aef 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -428,6 +428,25 @@ CREATE SUBSCRIPTION subscription_name + + + include_generated_column (boolean) + + + Specifies whether the generated columns present in the tables + associated with the subscription should be replicated. The default is + false. + + + + This parameter can only be set true if copy_data is + set to false. If the subscriber-side column is also a + generated column then this option has no effect; the replicated data will + be ignored and the subscriber column will be filled as normal with the + subscriber-side computed or default data. + + + diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 0602398a54..f611148472 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -506,7 +506,6 @@ publication_translate_columns(Relation targetrel, List *columns, Bitmapset *set = NULL; ListCell *lc; int n = 0; - TupleDesc tupdesc = RelationGetDescr(targetrel); /* Bail out when no column list defined. */ if (!columns) @@ -534,12 +533,6 @@ publication_translate_columns(Relation targetrel, List *columns, errmsg("cannot use system column \"%s\" in publication column list", colname)); - if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated) - ereport(ERROR, - errcode(ERRCODE_INVALID_COLUMN_REFERENCE), - errmsg("cannot use generated column \"%s\" in publication column list", - colname)); - if (bms_is_member(attnum, set)) ereport(ERROR, errcode(ERRCODE_DUPLICATE_OBJECT), @@ -1232,7 +1225,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) { Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->attisdropped || att->attgenerated) + if (att->attisdropped) continue; attnums[nattnums++] = att->attnum; diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 9efc9159f2..a090b36465 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -72,6 +72,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->passwordrequired = subform->subpasswordrequired; sub->runasowner = subform->subrunasowner; sub->failover = subform->subfailover; + sub->includegeneratedcolumn = subform->subincludegeneratedcolumn; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index e407428dbc..8d245722bf 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -72,6 +72,7 @@ #define SUBOPT_FAILOVER 0x00002000 #define SUBOPT_LSN 0x00004000 #define SUBOPT_ORIGIN 0x00008000 +#define SUBOPT_INCLUDE_GENERATED_COLUMN 0x00010000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -99,6 +100,7 @@ typedef struct SubOpts bool failover; char *origin; XLogRecPtr lsn; + bool include_generated_column; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -161,6 +163,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->failover = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); + if (IsSet(supported_opts, SUBOPT_INCLUDE_GENERATED_COLUMN)) + opts->include_generated_column = false; /* Parse options */ foreach(lc, stmt_options) @@ -366,6 +370,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_LSN; opts->lsn = lsn; } + else if (IsSet(supported_opts, SUBOPT_INCLUDE_GENERATED_COLUMN) && + strcmp(defel->defname, "include_generated_column") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_INCLUDE_GENERATED_COLUMN)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_INCLUDE_GENERATED_COLUMN; + opts->include_generated_column = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -446,6 +459,20 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, "slot_name = NONE", "create_slot = false"))); } } + + /* + * Do additional checking for disallowed combination when copy_data and + * include_generated_column are true. COPY of generated columns is not supported + * yet. + */ + if (opts->copy_data && opts->include_generated_column) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + /*- translator: both %s are strings of the form "option = value" */ + errmsg("%s and %s are mutually exclusive options", + "copy_data = true", "include_generated_column = true"))); + } } /* @@ -603,7 +630,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN | + SUBOPT_INCLUDE_GENERATED_COLUMN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -723,6 +751,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, publicationListToArray(publications); values[Anum_pg_subscription_suborigin - 1] = CStringGetTextDatum(opts.origin); + values[Anum_pg_subscription_subincludegeneratedcolumn - 1] = BoolGetDatum(opts.include_generated_column); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 3c2b1bb496..48830b0e10 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -598,6 +598,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn, appendStringInfo(&cmd, ", origin '%s'", options->proto.logical.origin); + if (options->proto.logical.include_generated_column && + PQserverVersion(conn->streamConn) >= 170000) + appendStringInfoString(&cmd, ", include_generated_columns 'on'"); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 95c09c9516..7405eb3deb 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -30,10 +30,12 @@ #define TRUNCATE_RESTART_SEQS (1<<1) static void logicalrep_write_attrs(StringInfo out, Relation rel, - Bitmapset *columns); + Bitmapset *columns, + bool include_generated_columns); static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, - bool binary, Bitmapset *columns); + bool binary, Bitmapset *columns, + bool include_generated_columns); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -412,7 +414,8 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) */ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, - TupleTableSlot *newslot, bool binary, Bitmapset *columns) + TupleTableSlot *newslot, bool binary, Bitmapset *columns, + bool include_generated_columns) { pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); @@ -424,7 +427,8 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newslot, binary, columns); + logicalrep_write_tuple(out, rel, newslot, binary, columns, + include_generated_columns); } /* @@ -457,7 +461,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, - bool binary, Bitmapset *columns) + bool binary, Bitmapset *columns, + bool include_generated_columns) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -478,11 +483,13 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldslot, binary, columns); + logicalrep_write_tuple(out, rel, oldslot, binary, columns, + include_generated_columns); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newslot, binary, columns); + logicalrep_write_tuple(out, rel, newslot, binary, columns, + include_generated_columns); } /* @@ -532,7 +539,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, - Bitmapset *columns) + Bitmapset *columns, bool include_generated_columns) { Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -552,7 +559,8 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldslot, binary, columns); + logicalrep_write_tuple(out, rel, oldslot, binary, columns, + include_generated_columns); } /* @@ -668,7 +676,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, */ void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, - Bitmapset *columns) + Bitmapset *columns, bool include_generated_columns) { char *relname; @@ -690,7 +698,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, rel->rd_rel->relreplident); /* send the attribute info */ - logicalrep_write_attrs(out, rel, columns); + logicalrep_write_attrs(out, rel, columns, include_generated_columns); } /* @@ -767,7 +775,8 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) */ static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, - bool binary, Bitmapset *columns) + bool binary, Bitmapset *columns, + bool include_generated_columns) { TupleDesc desc; Datum *values; @@ -781,7 +790,10 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, { Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->attisdropped || att->attgenerated) + if (att->attisdropped) + continue; + + if (att->attgenerated && !include_generated_columns) continue; if (!column_in_column_list(att->attnum, columns)) @@ -802,7 +814,10 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, Form_pg_type typclass; Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->attisdropped || att->attgenerated) + if (att->attisdropped) + continue; + + if (att->attgenerated && !include_generated_columns) continue; if (!column_in_column_list(att->attnum, columns)) @@ -923,7 +938,8 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) * Write relation attribute metadata to the stream. */ static void -logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) +logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, + bool include_generated_columns) { TupleDesc desc; int i; @@ -938,7 +954,10 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) { Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->attisdropped || att->attgenerated) + if (att->attisdropped) + continue; + + if (att->attgenerated && !include_generated_columns) continue; if (!column_in_column_list(att->attnum, columns)) @@ -959,7 +978,10 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) Form_pg_attribute att = TupleDescAttr(desc, i); uint8 flags = 0; - if (att->attisdropped || att->attgenerated) + if (att->attisdropped) + continue; + + if (att->attgenerated && !include_generated_columns) continue; if (!column_in_column_list(att->attnum, columns)) diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index f139e7b01e..5de1531567 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -421,7 +421,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) int attnum; Form_pg_attribute attr = TupleDescAttr(desc, i); - if (attr->attisdropped || attr->attgenerated) + if (attr->attisdropped) { entry->attrmap->attnums[i] = -1; continue; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b5a80fe3e8..a662a1f8ff 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4379,6 +4379,7 @@ set_stream_options(WalRcvStreamOptions *options, options->proto.logical.twophase = false; options->proto.logical.origin = pstrdup(MySubscription->origin); + options->proto.logical.include_generated_column = MySubscription->includegeneratedcolumn; } /* diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index d2b35cfb96..7f8715fb29 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -86,7 +86,8 @@ static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, - Bitmapset *columns); + Bitmapset *columns, + bool include_generated_columns); static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); @@ -283,11 +284,13 @@ parse_output_parameters(List *options, PGOutputData *data) bool streaming_given = false; bool two_phase_option_given = false; bool origin_option_given = false; + bool include_generated_columns_option_given = false; data->binary = false; data->streaming = LOGICALREP_STREAM_OFF; data->messages = false; data->two_phase = false; + data->include_generated_columns = false; foreach(lc, options) { @@ -396,6 +399,16 @@ parse_output_parameters(List *options, PGOutputData *data) errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("unrecognized origin value: \"%s\"", origin)); } + else if (strcmp(defel->defname, "include_generated_columns") == 0) + { + if (include_generated_columns_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + include_generated_columns_option_given = true; + + data->include_generated_columns = defGetBoolean(defel); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -731,11 +744,13 @@ maybe_send_schema(LogicalDecodingContext *ctx, { Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid); - send_relation_and_attrs(ancestor, xid, ctx, relentry->columns); + send_relation_and_attrs(ancestor, xid, ctx, relentry->columns, + data->include_generated_columns); RelationClose(ancestor); } - send_relation_and_attrs(relation, xid, ctx, relentry->columns); + send_relation_and_attrs(relation, xid, ctx, relentry->columns, + data->include_generated_columns); if (data->in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid); @@ -749,7 +764,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, - Bitmapset *columns) + Bitmapset *columns, bool include_generated_columns) { TupleDesc desc = RelationGetDescr(relation); int i; @@ -766,7 +781,10 @@ send_relation_and_attrs(Relation relation, TransactionId xid, { Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->attisdropped || att->attgenerated) + if (att->attisdropped) + continue; + + if (att->attgenerated && !include_generated_columns) continue; if (att->atttypid < FirstGenbkiObjectId) @@ -782,7 +800,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid, } OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, xid, relation, columns); + logicalrep_write_rel(ctx->out, xid, relation, columns, include_generated_columns); OutputPluginWrite(ctx, false); } @@ -1085,7 +1103,7 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, { Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->attisdropped || att->attgenerated) + if (att->attisdropped) continue; nliveatts++; @@ -1413,7 +1431,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ReorderBufferChangeType action = change->action; TupleTableSlot *old_slot = NULL; TupleTableSlot *new_slot = NULL; - if (!is_publishable_relation(relation)) return; @@ -1531,15 +1548,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { case REORDER_BUFFER_CHANGE_INSERT: logicalrep_write_insert(ctx->out, xid, targetrel, new_slot, - data->binary, relentry->columns); + data->binary, relentry->columns, + data->include_generated_columns); break; case REORDER_BUFFER_CHANGE_UPDATE: logicalrep_write_update(ctx->out, xid, targetrel, old_slot, - new_slot, data->binary, relentry->columns); + new_slot, data->binary, relentry->columns, + data->include_generated_columns); break; case REORDER_BUFFER_CHANGE_DELETE: logicalrep_write_delete(ctx->out, xid, targetrel, old_slot, - data->binary, relentry->columns); + data->binary, relentry->columns, + data->include_generated_columns); break; default: Assert(false); diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index d453e224d9..e8ff752fd9 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -3365,7 +3365,7 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", "disable_on_error", "enabled", "failover", "origin", "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "streaming", "synchronous_commit", "two_phase","include_generated_columns"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0aa14ec4a2..4d1d45e811 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -98,6 +98,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool subincludegeneratedcolumn; /* True if generated columns must be published */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -157,6 +159,7 @@ typedef struct Subscription List *publications; /* List of publication names to subscribe to */ char *origin; /* Only publish data originating from the * specified origin */ + bool includegeneratedcolumn; /* publish generated column data */ } Subscription; /* Disallow streaming in-progress transactions. */ diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index c409638a2e..34ec40b07e 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -225,18 +225,22 @@ extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); extern void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, - bool binary, Bitmapset *columns); + bool binary, Bitmapset *columns, + bool include_generated_columns); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, - TupleTableSlot *newslot, bool binary, Bitmapset *columns); + TupleTableSlot *newslot, bool binary, + Bitmapset *columns, + bool include_generated_columns); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); extern void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, - bool binary, Bitmapset *columns); + bool binary, Bitmapset *columns, + bool include_generated_columns); extern LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup); extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, @@ -247,7 +251,8 @@ extern List *logicalrep_read_truncate(StringInfo in, extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, - Relation rel, Bitmapset *columns); + Relation rel, Bitmapset *columns, + bool include_generated_columns); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); extern void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 89f94e1147..224394cb93 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -33,6 +33,7 @@ typedef struct PGOutputData bool messages; bool two_phase; bool publish_no_origin; + bool include_generated_columns; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 12f71fa99b..fb37720920 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -186,6 +186,7 @@ typedef struct * prepare time */ char *origin; /* Only publish data originating from the * specified origin */ + bool include_generated_column; /* publish generated columns */ } logical; } proto; } WalRcvStreamOptions; diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 30b6371134..aa1450315d 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -687,9 +687,9 @@ UPDATE testpub_tbl5 SET a = 1; ERROR: cannot update table "testpub_tbl5" DETAIL: Column list used by the publication does not cover the replica identity. ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; --- error: generated column "d" can't be in list +-- ok: generated columns can be in the list too ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d); -ERROR: cannot use generated column "d" in publication column list +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; -- error: system attributes "ctid" not allowed in column list ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid); ERROR: cannot use system column "ctid" in publication column list diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 479d4f3264..b1899ddb1a 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -413,8 +413,9 @@ ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x); ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c); UPDATE testpub_tbl5 SET a = 1; ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; --- error: generated column "d" can't be in list +-- ok: generated columns can be in the list too ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d); +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; -- error: system attributes "ctid" not allowed in column list ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid); -- ok diff --git a/src/test/subscription/t/011_generated.pl b/src/test/subscription/t/011_generated.pl index 8b2e5f4708..e7a48a02d3 100644 --- a/src/test/subscription/t/011_generated.pl +++ b/src/test/subscription/t/011_generated.pl @@ -24,21 +24,44 @@ $node_publisher->safe_psql('postgres', "CREATE TABLE tab1 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED)" ); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED)" +); + $node_subscriber->safe_psql('postgres', "CREATE TABLE tab1 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 22) STORED, c int)" ); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, b int)" +); + # data for initial sync $node_publisher->safe_psql('postgres', "INSERT INTO tab1 (a) VALUES (1), (2), (3)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab2 (a) VALUES (1), (2), (3)"); $node_publisher->safe_psql('postgres', - "CREATE PUBLICATION pub1 FOR ALL TABLES"); + "CREATE PUBLICATION pub1 FOR TABLE tab1"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub2 FOR TABLE tab2"); $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1" ); +my ($cmdret, $stdout, $stderr) = $node_subscriber->psql('postgres', qq( + CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2 WITH (include_generated_column = true) +)); +ok( $stderr =~ + qr/copy_data = true and include_generated_column = true are mutually exclusive options/, + 'cannot use both include_generated_column and copy_data as true'); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2 WITH (include_generated_column = true, copy_data = false)" +); + # Wait for initial sync of all subscriptions $node_subscriber->wait_for_subscription_sync; @@ -62,6 +85,14 @@ is( $result, qq(1|22| 4|88| 6|132|), 'generated columns replicated'); +$node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (4), (5)"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab2"); +is( $result, qq(4|8 +5|10), 'generated columns replicated to non-generated column on subscriber'); + # try it with a subscriber-side trigger $node_subscriber->safe_psql( diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl index 9a97fa5020..6e73f892e9 100644 --- a/src/test/subscription/t/031_column_list.pl +++ b/src/test/subscription/t/031_column_list.pl @@ -1202,7 +1202,7 @@ $result = $node_publisher->safe_psql( is( $result, qq(t t), 'check the number of columns in the old tuple'); -# TEST: Generated and dropped columns are not considered for the column list. +# TEST: Dropped columns are not considered for the column list. # So, the publication having a column list except for those columns and a # publication without any column (aka all columns as part of the columns # list) are considered to have the same column list. @@ -1211,7 +1211,7 @@ $node_publisher->safe_psql( CREATE TABLE test_mix_4 (a int PRIMARY KEY, b int, c int, d int GENERATED ALWAYS AS (a + 1) STORED); ALTER TABLE test_mix_4 DROP COLUMN c; - CREATE PUBLICATION pub_mix_7 FOR TABLE test_mix_4 (a, b); + CREATE PUBLICATION pub_mix_7 FOR TABLE test_mix_4 (a, b, d); CREATE PUBLICATION pub_mix_8 FOR TABLE test_mix_4; -- initial data -- 2.34.1