diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index f439c582a5..7b03919be4 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin" REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ - spill slot truncate + spill slot truncate sequence ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top diff --git a/contrib/test_decoding/expected/sequence.out b/contrib/test_decoding/expected/sequence.out new file mode 100644 index 0000000000..58bca1074e --- /dev/null +++ b/contrib/test_decoding/expected/sequence.out @@ -0,0 +1,336 @@ +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +ERROR: replication slot "regression_slot" already exists +CREATE SEQUENCE test_sequence; +-- test the sequence changes by several nextval() calls +SELECT nextval('test_sequence'); + nextval +--------- + 1 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 2 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 4 +(1 row) + +-- test the sequence changes by several ALTER commands +ALTER SEQUENCE test_sequence INCREMENT BY 10; +SELECT nextval('test_sequence'); + nextval +--------- + 14 +(1 row) + +ALTER SEQUENCE test_sequence START WITH 3000; +ALTER SEQUENCE test_sequence MAXVALUE 10000; +ALTER SEQUENCE test_sequence RESTART WITH 4000; +SELECT nextval('test_sequence'); + nextval +--------- + 4000 +(1 row) + +-- test the sequence changes by several setval() calls +SELECT setval('test_sequence', 3500); + setval +-------- + 3500 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3510 +(1 row) + +SELECT setval('test_sequence', 3500, true); + setval +-------- + 3500 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3510 +(1 row) + +SELECT setval('test_sequence', 3500, false); + setval +-------- + 3500 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3500 +(1 row) + +-- show results and drop sequence +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequence', '0'); + data +---------------------------------------------------------------------------------------------------------- + BEGIN + table public.test_sequence: SEQUENCE: last_value[bigint]:1 log_cnt[bigint]:0 is_called[boolean]:false + COMMIT + BEGIN + table public.test_sequence: SEQUENCE: last_value[bigint]:33 log_cnt[bigint]:0 is_called[boolean]:true + COMMIT + BEGIN + table public.test_sequence: SEQUENCE: last_value[bigint]:4 log_cnt[bigint]:0 is_called[boolean]:true + COMMIT + BEGIN + table public.test_sequence: SEQUENCE: last_value[bigint]:334 log_cnt[bigint]:0 is_called[boolean]:true + COMMIT + BEGIN + table public.test_sequence: SEQUENCE: last_value[bigint]:14 log_cnt[bigint]:32 is_called[boolean]:true + COMMIT + BEGIN + table public.test_sequence: SEQUENCE: last_value[bigint]:14 log_cnt[bigint]:0 is_called[boolean]:true + COMMIT + BEGIN + table public.test_sequence: SEQUENCE: last_value[bigint]:4000 log_cnt[bigint]:0 is_called[boolean]:false + COMMIT + BEGIN + table public.test_sequence: SEQUENCE: last_value[bigint]:4320 log_cnt[bigint]:0 is_called[boolean]:true + COMMIT + BEGIN + table public.test_sequence: SEQUENCE: last_value[bigint]:3500 log_cnt[bigint]:0 is_called[boolean]:true + COMMIT + BEGIN + table public.test_sequence: SEQUENCE: last_value[bigint]:3830 log_cnt[bigint]:0 is_called[boolean]:true + COMMIT + BEGIN + table public.test_sequence: SEQUENCE: last_value[bigint]:3500 log_cnt[bigint]:0 is_called[boolean]:true + COMMIT + BEGIN + table public.test_sequence: SEQUENCE: last_value[bigint]:3830 log_cnt[bigint]:0 is_called[boolean]:true + COMMIT + BEGIN + table public.test_sequence: SEQUENCE: last_value[bigint]:3500 log_cnt[bigint]:0 is_called[boolean]:false + COMMIT + BEGIN + table public.test_sequence: SEQUENCE: last_value[bigint]:3820 log_cnt[bigint]:0 is_called[boolean]:true + COMMIT +(42 rows) + +DROP SEQUENCE test_sequence; +-- rollback on sequence creation and update +BEGIN; +CREATE SEQUENCE test_sequence; +CREATE TABLE test_table (a INT); +SELECT nextval('test_sequence'); + nextval +--------- + 1 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 2 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3 +(1 row) + +SELECT setval('test_sequence', 3000); + setval +-------- + 3000 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3001 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3002 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3003 +(1 row) + +ALTER SEQUENCE test_sequence RESTART WITH 6000; +INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) ); +SELECT nextval('test_sequence'); + nextval +--------- + 6001 +(1 row) + +ROLLBACK; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequence', '0'); + data +------ +(0 rows) + +-- rollback on table creation with serial column +BEGIN; +CREATE TABLE test_table (a SERIAL, b INT); +INSERT INTO test_table (b) VALUES (100); +INSERT INTO test_table (b) VALUES (200); +INSERT INTO test_table (b) VALUES (300); +SELECT setval('test_table_a_seq', 3000); + setval +-------- + 3000 +(1 row) + +INSERT INTO test_table (b) VALUES (400); +INSERT INTO test_table (b) VALUES (500); +INSERT INTO test_table (b) VALUES (600); +ALTER SEQUENCE test_table_a_seq RESTART WITH 6000; +INSERT INTO test_table (b) VALUES (700); +INSERT INTO test_table (b) VALUES (800); +INSERT INTO test_table (b) VALUES (900); +ROLLBACK; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequence', '0'); + data +------ +(0 rows) + +-- rollback on table with serial column +CREATE TABLE test_table (a SERIAL, b INT); +BEGIN; +INSERT INTO test_table (b) VALUES (100); +INSERT INTO test_table (b) VALUES (200); +INSERT INTO test_table (b) VALUES (300); +SELECT setval('test_table_a_seq', 3000); + setval +-------- + 3000 +(1 row) + +INSERT INTO test_table (b) VALUES (400); +INSERT INTO test_table (b) VALUES (500); +INSERT INTO test_table (b) VALUES (600); +ALTER SEQUENCE test_table_a_seq RESTART WITH 6000; +INSERT INTO test_table (b) VALUES (700); +INSERT INTO test_table (b) VALUES (800); +INSERT INTO test_table (b) VALUES (900); +ROLLBACK; +-- check table and sequence values after rollback +SELECT * from test_table_a_seq; + last_value | log_cnt | is_called +------------+---------+----------- + 3003 | 30 | t +(1 row) + +SELECT nextval('test_table_a_seq'); + nextval +--------- + 3004 +(1 row) + +DROP TABLE test_table; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequence', '0'); + data +---------------------------------------------------------------------------------------------------------- + BEGIN + table public.test_table_a_seq: SEQUENCE: last_value[bigint]:1 log_cnt[bigint]:0 is_called[boolean]:false + COMMIT +(3 rows) + +-- savepoint test on table with serial column +BEGIN; +CREATE TABLE test_table (a SERIAL, b INT); +INSERT INTO test_table (b) VALUES (100); +INSERT INTO test_table (b) VALUES (200); +SAVEPOINT a; +INSERT INTO test_table (b) VALUES (300); +ROLLBACK TO SAVEPOINT a; +DROP TABLE test_table; +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequence', '0'); + data +---------------------------------------------------------------------------------------------------------- + BEGIN + table public.test_table_a_seq: SEQUENCE: last_value[bigint]:1 log_cnt[bigint]:0 is_called[boolean]:false + table public.test_table_a_seq: SEQUENCE: last_value[bigint]:33 log_cnt[bigint]:0 is_called[boolean]:true + table public.test_table: INSERT: a[integer]:1 b[integer]:100 + table public.test_table: INSERT: a[integer]:2 b[integer]:200 + COMMIT +(6 rows) + +-- savepoint test on table with serial column +BEGIN; +CREATE SEQUENCE test_sequence; +SELECT nextval('test_sequence'); + nextval +--------- + 1 +(1 row) + +SELECT setval('test_sequence', 3000); + setval +-------- + 3000 +(1 row) + +SELECT nextval('test_sequence'); + nextval +--------- + 3001 +(1 row) + +SAVEPOINT a; +ALTER SEQUENCE test_sequence START WITH 7000; +SELECT setval('test_sequence', 5000); + setval +-------- + 5000 +(1 row) + +ROLLBACK TO SAVEPOINT a; +SELECT * FROM test_sequence; + last_value | log_cnt | is_called +------------+---------+----------- + 3001 | 32 | t +(1 row) + +DROP SEQUENCE test_sequence; +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequence', '0'); + data +--------------------------------------------------------------------------------------------------------- + BEGIN + table public.test_sequence: SEQUENCE: last_value[bigint]:1 log_cnt[bigint]:0 is_called[boolean]:false + table public.test_sequence: SEQUENCE: last_value[bigint]:33 log_cnt[bigint]:0 is_called[boolean]:true + table public.test_sequence: SEQUENCE: last_value[bigint]:3000 log_cnt[bigint]:0 is_called[boolean]:true + table public.test_sequence: SEQUENCE: last_value[bigint]:3033 log_cnt[bigint]:0 is_called[boolean]:true + COMMIT +(6 rows) + +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/sequence.sql b/contrib/test_decoding/sql/sequence.sql new file mode 100644 index 0000000000..02fad3432c --- /dev/null +++ b/contrib/test_decoding/sql/sequence.sql @@ -0,0 +1,119 @@ +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +CREATE SEQUENCE test_sequence; + +-- test the sequence changes by several nextval() calls +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); + +-- test the sequence changes by several ALTER commands +ALTER SEQUENCE test_sequence INCREMENT BY 10; +SELECT nextval('test_sequence'); + +ALTER SEQUENCE test_sequence START WITH 3000; +ALTER SEQUENCE test_sequence MAXVALUE 10000; +ALTER SEQUENCE test_sequence RESTART WITH 4000; +SELECT nextval('test_sequence'); + +-- test the sequence changes by several setval() calls +SELECT setval('test_sequence', 3500); +SELECT nextval('test_sequence'); +SELECT setval('test_sequence', 3500, true); +SELECT nextval('test_sequence'); +SELECT setval('test_sequence', 3500, false); +SELECT nextval('test_sequence'); + +-- show results and drop sequence +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequence', '0'); +DROP SEQUENCE test_sequence; + +-- rollback on sequence creation and update +BEGIN; +CREATE SEQUENCE test_sequence; +CREATE TABLE test_table (a INT); +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); +SELECT setval('test_sequence', 3000); +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); +SELECT nextval('test_sequence'); +ALTER SEQUENCE test_sequence RESTART WITH 6000; +INSERT INTO test_table VALUES( (SELECT nextval('test_sequence')) ); +SELECT nextval('test_sequence'); +ROLLBACK; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequence', '0'); + +-- rollback on table creation with serial column +BEGIN; +CREATE TABLE test_table (a SERIAL, b INT); +INSERT INTO test_table (b) VALUES (100); +INSERT INTO test_table (b) VALUES (200); +INSERT INTO test_table (b) VALUES (300); +SELECT setval('test_table_a_seq', 3000); +INSERT INTO test_table (b) VALUES (400); +INSERT INTO test_table (b) VALUES (500); +INSERT INTO test_table (b) VALUES (600); +ALTER SEQUENCE test_table_a_seq RESTART WITH 6000; +INSERT INTO test_table (b) VALUES (700); +INSERT INTO test_table (b) VALUES (800); +INSERT INTO test_table (b) VALUES (900); +ROLLBACK; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequence', '0'); + +-- rollback on table with serial column +CREATE TABLE test_table (a SERIAL, b INT); + +BEGIN; +INSERT INTO test_table (b) VALUES (100); +INSERT INTO test_table (b) VALUES (200); +INSERT INTO test_table (b) VALUES (300); +SELECT setval('test_table_a_seq', 3000); +INSERT INTO test_table (b) VALUES (400); +INSERT INTO test_table (b) VALUES (500); +INSERT INTO test_table (b) VALUES (600); +ALTER SEQUENCE test_table_a_seq RESTART WITH 6000; +INSERT INTO test_table (b) VALUES (700); +INSERT INTO test_table (b) VALUES (800); +INSERT INTO test_table (b) VALUES (900); +ROLLBACK; + +-- check table and sequence values after rollback +SELECT * from test_table_a_seq; +SELECT nextval('test_table_a_seq'); + +DROP TABLE test_table; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequence', '0'); + +-- savepoint test on table with serial column +BEGIN; +CREATE TABLE test_table (a SERIAL, b INT); +INSERT INTO test_table (b) VALUES (100); +INSERT INTO test_table (b) VALUES (200); +SAVEPOINT a; +INSERT INTO test_table (b) VALUES (300); +ROLLBACK TO SAVEPOINT a; +DROP TABLE test_table; +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequence', '0'); + +-- savepoint test on table with serial column +BEGIN; +CREATE SEQUENCE test_sequence; +SELECT nextval('test_sequence'); +SELECT setval('test_sequence', 3000); +SELECT nextval('test_sequence'); +SAVEPOINT a; +ALTER SEQUENCE test_sequence START WITH 7000; +SELECT setval('test_sequence', 5000); +ROLLBACK TO SAVEPOINT a; +SELECT * FROM test_sequence; +DROP SEQUENCE test_sequence; +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequence', '0'); + +SELECT pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 93c948856e..6ae7fc00fa 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -36,6 +36,7 @@ typedef struct bool skip_empty_xacts; bool xact_wrote_changes; bool only_local; + bool skip_sequence; } TestDecodingData; static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, @@ -102,6 +103,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->include_timestamp = false; data->skip_empty_xacts = false; data->only_local = false; + data->skip_sequence = true; ctx->output_plugin_private = data; @@ -183,6 +185,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "skip-sequence") == 0) + { + + if (elem->arg == NULL) + data->only_local = true; + else if (!parse_bool(strVal(elem->arg), &data->skip_sequence)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { ereport(ERROR, @@ -402,6 +415,10 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, data = ctx->output_plugin_private; + /* return if incoming relation is a sequence and skip_sequence is true */ + if (change->action == REORDER_BUFFER_CHANGE_SEQUENCE && data->skip_sequence) + return; + /* output BEGIN if we haven't yet */ if (data->skip_empty_xacts && !data->xact_wrote_changes) { @@ -466,6 +483,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, &change->data.tp.oldtuple->tuple, true); break; + case REORDER_BUFFER_CHANGE_SEQUENCE: + appendStringInfoString(ctx->out, " SEQUENCE:"); + if (change->data.sequence.newtuple == NULL) + appendStringInfoString(ctx->out, " (no-tuple-data)"); + else + tuple_to_stringinfo(ctx->out, tupdesc, + &change->data.sequence.newtuple->tuple, + false); + break; default: Assert(false); } diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c2e5e3abf8..3dc14ead08 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -42,6 +42,7 @@ #include "replication/reorderbuffer.h" #include "replication/snapbuild.h" #include "storage/standby.h" +#include "commands/sequence.h" typedef struct XLogRecordBuffer { @@ -70,9 +71,11 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid); static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_abort *parsed, TransactionId xid); +static void DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); +static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple); /* * Take every XLogReadRecord()ed record and perform the actions required to @@ -130,6 +133,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor DecodeLogicalMsgOp(ctx, &buf); break; + case RM_SEQ_ID: + DecodeSequence(ctx, &buf); + break; + /* * Rmgrs irrelevant for logical decoding; they describe stuff not * represented in logical decoding. Add new rmgrs in rmgrlist.h's @@ -145,7 +152,6 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor case RM_HASH_ID: case RM_GIN_ID: case RM_GIST_ID: - case RM_SEQ_ID: case RM_SPGIST_ID: case RM_BRIN_ID: case RM_COMMIT_TS_ID: @@ -1052,3 +1058,80 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) header->t_infomask2 = xlhdr.t_infomask2; header->t_hoff = xlhdr.t_hoff; } + +/* + * Decode Sequence Tuple + */ +static void +DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) +{ + int datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader; + + Assert(datalen >= 0); + + tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;; + + ItemPointerSetInvalid(&tuple->tuple.t_self); + + tuple->tuple.t_tableOid = InvalidOid; + + memcpy(((char *) tuple->tuple.t_data), + data + sizeof(xl_seq_rec), + SizeofHeapTupleHeader); + + memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader, + data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader, + datalen); +} + +/* + * Handle sequence decode + */ +static void +DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + ReorderBufferChange *change; + RelFileNode target_node; + XLogReaderState *r = buf->record; + char *tupledata = NULL; + Size tuplelen; + Size datalen = 0; + uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; + + /* only decode changes flagged with XLOG_SEQ_LOG */ + if (info != XLOG_SEQ_LOG) + return; + + /* only interested in our database */ + XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); + if (target_node.dbNode != ctx->slot->data.database) + return; + + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + + change = ReorderBufferGetChange(ctx->reorder); + change->action = REORDER_BUFFER_CHANGE_SEQUENCE; + change->origin_id = XLogRecGetOrigin(r); + + memcpy(&change->data.sequence.relnode, &target_node, sizeof(RelFileNode)); + + tupledata = XLogRecGetData(r); + datalen = XLogRecGetDataLen(r); + + if(!datalen || !tupledata) + return; + + tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec); + + change->data.sequence.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); + + DecodeSeqTuple(tupledata, datalen, change->data.sequence.newtuple); + + ReorderBufferXidSetCatalogChanges(ctx->reorder, XLogRecGetXid(buf->record), buf->origptr); + + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); + +} diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 4594cf9509..c0fb7a2a0c 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -474,6 +474,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; + case REORDER_BUFFER_CHANGE_SEQUENCE: + if (change->data.sequence.newtuple) + { + ReorderBufferReturnTupleBuf(rb, change->data.sequence.newtuple); + change->data.sequence.newtuple = NULL; + } + break; } pfree(change); @@ -1828,6 +1835,28 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: elog(ERROR, "tuplecid value in changequeue"); break; + case REORDER_BUFFER_CHANGE_SEQUENCE: + Assert(snapshot_now); + + reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode, + change->data.sequence.relnode.relNode); + + if (reloid == InvalidOid) + elog(ERROR, "could not map filenode \"%s\" to relation OID", + relpathperm(change->data.tp.relnode, + MAIN_FORKNUM)); + + relation = RelationIdGetRelation(reloid); + + if (!RelationIsValid(relation)) + elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")", + reloid, + relpathperm(change->data.sequence.relnode, + MAIN_FORKNUM)); + + if (RelationIsLogicallyLogged(relation)) + rb->apply_change(rb, txn, relation, change); + break; } } @@ -2511,6 +2540,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_UPDATE: case REORDER_BUFFER_CHANGE_DELETE: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: + case REORDER_BUFFER_CHANGE_SEQUENCE: { char *data; ReorderBufferTupleBuf *oldtup, @@ -2702,6 +2732,7 @@ ReorderBufferChangeSize(ReorderBufferChange *change) case REORDER_BUFFER_CHANGE_UPDATE: case REORDER_BUFFER_CHANGE_DELETE: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: + case REORDER_BUFFER_CHANGE_SEQUENCE: { ReorderBufferTupleBuf *oldtup, *newtup; @@ -3043,6 +3074,31 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; + case REORDER_BUFFER_CHANGE_SEQUENCE: + if (change->data.sequence.newtuple) + { + uint32 tuplelen; + + memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len), + sizeof(uint32)); + + change->data.sequence.newtuple = + ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); + + /* restore ->tuple */ + memcpy(&change->data.sequence.newtuple->tuple, data, + sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + /* reset t_data pointer into the new tuplebuf */ + change->data.sequence.newtuple->tuple.t_data = + ReorderBufferTupleBufData(change->data.sequence.newtuple); + + /* restore tuple data itself */ + memcpy(change->data.sequence.newtuple->tuple.t_data, data, tuplelen); + data += tuplelen; + } + break; } dlist_push_tail(&txn->changes, &change->node); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 626ecf4dc9..17c3fb427d 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -62,7 +62,8 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, - REORDER_BUFFER_CHANGE_TRUNCATE + REORDER_BUFFER_CHANGE_TRUNCATE, + REORDER_BUFFER_CHANGE_SEQUENCE, }; /* forward declaration */ @@ -149,6 +150,14 @@ typedef struct ReorderBufferChange CommandId cmax; CommandId combocid; } tuplecid; + /* + * Context data for Sequence changes + */ + struct + { + RelFileNode relnode; + ReorderBufferTupleBuf *newtuple; + } sequence; } data; /*