From d1c9dcb8ecc9e3138a995b7ada468e0a6d07ceba Mon Sep 17 00:00:00 2001 From: Shubham Khanna Date: Wed, 8 May 2024 10:53:52 +0530 Subject: [PATCH v1] Support capturing generated column data using the pgoutput and test_decoding plugins. When the include_generated_columns option is specified, the generated column information and generated column data will also be sent. Usage from the pgoutput plugin: SELECT * FROM pg_logical_slot_peek_binary_changes('slot1', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub1', 'include_generated_columns', 'true'); Usage from the test_decoding plugin: SELECT data FROM pg_logical_slot_get_changes('slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include_generated_columns', '1'); --- contrib/test_decoding/expected/ddl.out | 14 +++++ contrib/test_decoding/sql/ddl.sql | 6 +++ contrib/test_decoding/test_decoding.c | 25 +++++++-- src/backend/replication/logical/proto.c | 60 ++++++++++++++------- src/backend/replication/pgoutput/pgoutput.c | 39 ++++++++++---- src/include/replication/logicalproto.h | 13 +++-- src/include/replication/pgoutput.h | 1 + 7 files changed, 121 insertions(+), 37 deletions(-) diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out index bcd1f74b2b..07bac1f677 100644 --- a/contrib/test_decoding/expected/ddl.out +++ b/contrib/test_decoding/expected/ddl.out @@ -843,6 +843,20 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc data (0 rows) \pset format aligned +-- check include_generated_columns option with generated column +CREATE TABLE gencoltable (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED); +INSERT INTO gencoltable (a) VALUES (1), (2), (3); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include_generated_columns', '1'); + data +------------------------------------------------------------- + BEGIN + table public.gencoltable: INSERT: a[integer]:1 
b[integer]:2 + table public.gencoltable: INSERT: a[integer]:2 b[integer]:4 + table public.gencoltable: INSERT: a[integer]:3 b[integer]:6 + COMMIT +(5 rows) + +DROP TABLE gencoltable; SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot -------------------------- diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql index 2f8e4e7f2c..d688775580 100644 --- a/contrib/test_decoding/sql/ddl.sql +++ b/contrib/test_decoding/sql/ddl.sql @@ -437,6 +437,12 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); \pset format aligned +-- check include_generated_columns option with generated column +CREATE TABLE gencoltable (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED); +INSERT INTO gencoltable (a) VALUES (1), (2), (3); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include_generated_columns', '1'); +DROP TABLE gencoltable; + SELECT pg_drop_replication_slot('regression_slot'); /* check that the slot is gone */ diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 7c50d13969..f15ff93ac3 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -31,6 +31,7 @@ typedef struct bool include_timestamp; bool skip_empty_xacts; bool only_local; + bool include_generated_columns; } TestDecodingData; /* @@ -259,6 +260,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "include_generated_columns") == 0) + { + if (elem->arg == NULL) + continue; + else if (!parse_bool(strVal(elem->arg), &data->include_generated_columns)) + ereport(ERROR, + 
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { ereport(ERROR, @@ -521,7 +532,8 @@ print_literal(StringInfo s, Oid typid, char *outputstr) /* print the tuple 'tuple' into the StringInfo s */ static void -tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls) +tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, + bool skip_nulls, bool include_generated_columns) { int natt; @@ -544,6 +556,9 @@ tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_ if (attr->attisdropped) continue; + if (attr->attgenerated && !include_generated_columns) + continue; + /* * Don't print system columns, oid will already have been printed if * present. @@ -641,7 +656,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, else tuple_to_stringinfo(ctx->out, tupdesc, change->data.tp.newtuple, - false); + false, data->include_generated_columns); break; case REORDER_BUFFER_CHANGE_UPDATE: appendStringInfoString(ctx->out, " UPDATE:"); @@ -650,7 +665,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, appendStringInfoString(ctx->out, " old-key:"); tuple_to_stringinfo(ctx->out, tupdesc, change->data.tp.oldtuple, - true); + true, data->include_generated_columns ); appendStringInfoString(ctx->out, " new-tuple:"); } @@ -659,7 +674,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, else tuple_to_stringinfo(ctx->out, tupdesc, change->data.tp.newtuple, - false); + false, data->include_generated_columns); break; case REORDER_BUFFER_CHANGE_DELETE: appendStringInfoString(ctx->out, " DELETE:"); @@ -671,7 +686,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, else tuple_to_stringinfo(ctx->out, tupdesc, change->data.tp.oldtuple, - true); + true, data->include_generated_columns); break; default: Assert(false); diff --git 
a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 95c09c9516..69a44a7122 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -30,10 +30,12 @@ #define TRUNCATE_RESTART_SEQS (1<<1) static void logicalrep_write_attrs(StringInfo out, Relation rel, - Bitmapset *columns); + Bitmapset *columns, + bool publish_generated_column); static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, - bool binary, Bitmapset *columns); + bool binary, Bitmapset *columns, + bool publish_generated_column); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -412,7 +414,8 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) */ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, - TupleTableSlot *newslot, bool binary, Bitmapset *columns) + TupleTableSlot *newslot, bool binary, Bitmapset *columns, + bool publish_generated_column) { pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); @@ -424,7 +427,8 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newslot, binary, columns); + logicalrep_write_tuple(out, rel, newslot, binary, columns, + publish_generated_column); } /* @@ -457,7 +461,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, - bool binary, Bitmapset *columns) + bool binary, Bitmapset *columns, + bool publish_generated_column) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -478,11 +483,13 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key 
follows */ - logicalrep_write_tuple(out, rel, oldslot, binary, columns); + logicalrep_write_tuple(out, rel, oldslot, binary, columns, + publish_generated_column); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newslot, binary, columns); + logicalrep_write_tuple(out, rel, newslot, binary, columns, + publish_generated_column); } /* @@ -532,7 +539,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, - Bitmapset *columns) + Bitmapset *columns, bool publish_generated_column) { Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -552,7 +559,8 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldslot, binary, columns); + logicalrep_write_tuple(out, rel, oldslot, binary, columns, + publish_generated_column); } /* @@ -668,7 +676,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, */ void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, - Bitmapset *columns) + Bitmapset *columns, bool publish_generated_column) { char *relname; @@ -690,7 +698,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, rel->rd_rel->relreplident); /* send the attribute info */ - logicalrep_write_attrs(out, rel, columns); + logicalrep_write_attrs(out, rel, columns, publish_generated_column); } /* @@ -767,7 +775,8 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) */ static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, - bool binary, Bitmapset *columns) + bool binary, Bitmapset *columns, + bool publish_generated_column) { TupleDesc desc; Datum *values; @@ -781,10 +790,13 @@ logicalrep_write_tuple(StringInfo out, Relation rel, 
TupleTableSlot *slot, { Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->attisdropped || att->attgenerated) + if (att->attisdropped) continue; - if (!column_in_column_list(att->attnum, columns)) + if (!column_in_column_list(att->attnum, columns) && !att->attgenerated) + continue; + + if (att->attgenerated && !publish_generated_column) continue; nliveatts++; @@ -802,10 +814,13 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, Form_pg_type typclass; Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->attisdropped || att->attgenerated) + if (att->attisdropped) continue; - if (!column_in_column_list(att->attnum, columns)) + if (!column_in_column_list(att->attnum, columns) && !att->attgenerated) + continue; + + if (att->attgenerated && !publish_generated_column) continue; if (isnull[i]) @@ -923,7 +938,8 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) * Write relation attribute metadata to the stream. */ static void -logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) +logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, + bool publish_generated_column) { TupleDesc desc; int i; @@ -938,7 +954,10 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) { Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->attisdropped || att->attgenerated) + if (att->attisdropped) + continue; + + if (att->attgenerated && !publish_generated_column) continue; if (!column_in_column_list(att->attnum, columns)) @@ -959,7 +978,10 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) Form_pg_attribute att = TupleDescAttr(desc, i); uint8 flags = 0; - if (att->attisdropped || att->attgenerated) + if (att->attisdropped) + continue; + + if (att->attgenerated && !publish_generated_column) continue; if (!column_in_column_list(att->attnum, columns)) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c 
index d2b35cfb96..44d629a624 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -86,7 +86,8 @@ static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, - Bitmapset *columns); + Bitmapset *columns, + bool publish_generated_column); static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); @@ -283,11 +284,13 @@ parse_output_parameters(List *options, PGOutputData *data) bool streaming_given = false; bool two_phase_option_given = false; bool origin_option_given = false; + bool generate_column_option_given = false; data->binary = false; data->streaming = LOGICALREP_STREAM_OFF; data->messages = false; data->two_phase = false; + data->publish_generated_column = false; foreach(lc, options) { @@ -396,6 +399,16 @@ parse_output_parameters(List *options, PGOutputData *data) errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("unrecognized origin value: \"%s\"", origin)); } + else if (strcmp(defel->defname, "include_generated_columns") == 0) + { + if (generate_column_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + generate_column_option_given = true; + + data->publish_generated_column = defGetBoolean(defel); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -731,11 +744,13 @@ maybe_send_schema(LogicalDecodingContext *ctx, { Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid); - send_relation_and_attrs(ancestor, xid, ctx, relentry->columns); + send_relation_and_attrs(ancestor, xid, ctx, relentry->columns, + data->publish_generated_column); RelationClose(ancestor); } - send_relation_and_attrs(relation, xid, ctx, relentry->columns); + send_relation_and_attrs(relation, xid, ctx, relentry->columns, + 
data->publish_generated_column); if (data->in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid); @@ -749,7 +764,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, - Bitmapset *columns) + Bitmapset *columns, bool publish_generated_column) { TupleDesc desc = RelationGetDescr(relation); int i; @@ -766,7 +781,10 @@ send_relation_and_attrs(Relation relation, TransactionId xid, { Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->attisdropped || att->attgenerated) + if (att->attisdropped) + continue; + + if (att->attgenerated && !publish_generated_column) continue; if (att->atttypid < FirstGenbkiObjectId) @@ -782,7 +800,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid, } OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, xid, relation, columns); + logicalrep_write_rel(ctx->out, xid, relation, columns, publish_generated_column); OutputPluginWrite(ctx, false); } @@ -1531,15 +1549,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { case REORDER_BUFFER_CHANGE_INSERT: logicalrep_write_insert(ctx->out, xid, targetrel, new_slot, - data->binary, relentry->columns); + data->binary, relentry->columns, + data->publish_generated_column); break; case REORDER_BUFFER_CHANGE_UPDATE: logicalrep_write_update(ctx->out, xid, targetrel, old_slot, - new_slot, data->binary, relentry->columns); + new_slot, data->binary, relentry->columns, + data->publish_generated_column); break; case REORDER_BUFFER_CHANGE_DELETE: logicalrep_write_delete(ctx->out, xid, targetrel, old_slot, - data->binary, relentry->columns); + data->binary, relentry->columns, + data->publish_generated_column); break; default: Assert(false); diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index c409638a2e..2676acefce 100644 --- a/src/include/replication/logicalproto.h +++ 
b/src/include/replication/logicalproto.h @@ -225,18 +225,22 @@ extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); extern void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, - bool binary, Bitmapset *columns); + bool binary, Bitmapset *columns, + bool publish_generated_column); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, - TupleTableSlot *newslot, bool binary, Bitmapset *columns); + TupleTableSlot *newslot, bool binary, + Bitmapset *columns, + bool publish_generated_column); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); extern void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, - bool binary, Bitmapset *columns); + bool binary, Bitmapset *columns, + bool publish_generated_column); extern LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup); extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, @@ -247,7 +251,8 @@ extern List *logicalrep_read_truncate(StringInfo in, extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, - Relation rel, Bitmapset *columns); + Relation rel, Bitmapset *columns, + bool publish_generated_column); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); extern void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 89f94e1147..c4773f60a3 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -33,6 +33,7 
@@ typedef struct PGOutputData bool messages; bool two_phase; bool publish_no_origin; + bool publish_generated_column; } PGOutputData; #endif /* PGOUTPUT_H */ -- 2.34.1