diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 93c948856e..7a7e572d6c 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -466,6 +466,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 									&change->data.tp.oldtuple->tuple,
 									true);
 			break;
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			appendStringInfoString(ctx->out, " SEQUENCE:");
+			if (change->data.sequence.newtuple == NULL)
+				appendStringInfoString(ctx->out, " (no-tuple-data)");
+			else
+				tuple_to_stringinfo(ctx->out, tupdesc,
+									&change->data.sequence.newtuple->tuple,
+									false);
+			break;
 		default:
 			Assert(false);
 	}
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 6aab73bfd4..941015e4aa 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -49,11 +49,10 @@
 
 
 /*
- * We don't want to log each fetching of a value from a sequence,
- * so we pre-log a few fetches in advance. In the event of
- * crash we can lose (skip over) as many values as we pre-logged.
+ * Every sequence advance is now written to WAL (no values are pre-logged), so
+ * that sequence changes can be decoded and replicated from the WAL stream.
  */
-#define SEQ_LOG_VALS	32
+#define SEQ_LOG_VALS	0
 
 /*
  * The "special area" of a sequence's buffer page looks like this.
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c2e5e3abf8..3dc14ead08 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -42,6 +42,7 @@
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
 #include "storage/standby.h"
+#include "commands/sequence.h"
 
 typedef struct XLogRecordBuffer
 {
@@ -70,9 +71,11 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			 xl_xact_parsed_commit *parsed, TransactionId xid);
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid);
+static void DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
+static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
 
 /*
  * Take every XLogReadRecord()ed record and perform the actions required to
@@ -130,6 +133,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			DecodeLogicalMsgOp(ctx, &buf);
 			break;
 
+		case RM_SEQ_ID:
+			DecodeSequence(ctx, &buf);
+			break;
+
 			/*
 			 * Rmgrs irrelevant for logical decoding; they describe stuff not
 			 * represented in logical decoding. Add new rmgrs in rmgrlist.h's
@@ -145,7 +152,6 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 		case RM_HASH_ID:
 		case RM_GIN_ID:
 		case RM_GIST_ID:
-		case RM_SEQ_ID:
 		case RM_SPGIST_ID:
 		case RM_BRIN_ID:
 		case RM_COMMIT_TS_ID:
@@ -1052,3 +1058,86 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
 	header->t_infomask2 = xlhdr.t_infomask2;
 	header->t_hoff = xlhdr.t_hoff;
 }
+
+/*
+ * DecodeSeqTuple
+ *		Copy the sequence tuple of an XLOG_SEQ_LOG record into a tuple buffer.
+ *
+ * 'data' and 'len' describe the record's main data: an xl_seq_rec followed
+ * by the complete heap tuple, header included.  'tuple' must have room for
+ * the len - sizeof(xl_seq_rec) bytes copied into it.
+ */
+static void
+DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
+{
+	Size		tuplen;
+
+	Assert(len >= sizeof(xl_seq_rec) + SizeofHeapTupleHeader);
+	tuplen = len - sizeof(xl_seq_rec);
+
+	tuple->tuple.t_len = tuplen;
+	ItemPointerSetInvalid(&tuple->tuple.t_self);
+	tuple->tuple.t_tableOid = InvalidOid;
+
+	/* header and payload are contiguous in the record: copy them at once */
+	memcpy((char *) tuple->tuple.t_data, data + sizeof(xl_seq_rec), tuplen);
+}
+
+/*
+ * DecodeSequence
+ *		Queue a REORDER_BUFFER_CHANGE_SEQUENCE change for an XLOG_SEQ_LOG record.
+ *
+ * Other sequence record types, records of other databases, records filtered
+ * by origin and records without a complete sequence tuple are ignored.
+ */
+static void
+DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	ReorderBufferChange *change;
+	RelFileNode target_node;
+	XLogReaderState *r = buf->record;
+	char	   *tupledata;
+	Size		datalen;
+	Size		tuplelen;
+	uint8		info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+
+	/* only decode changes flagged with XLOG_SEQ_LOG */
+	if (info != XLOG_SEQ_LOG)
+		return;
+
+	/* only interested in our database */
+	XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+	if (target_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	/*
+	 * Validate the payload before allocating the change, so that bailing out
+	 * leaks nothing, and so that the length arithmetic below cannot wrap.
+	 */
+	tupledata = XLogRecGetData(r);
+	datalen = XLogRecGetDataLen(r);
+	if (tupledata == NULL ||
+		datalen < sizeof(xl_seq_rec) + SizeofHeapTupleHeader)
+		return;
+
+	/* the tuple buffer is sized by the tuple length without its header */
+	tuplelen = datalen - sizeof(xl_seq_rec) - SizeofHeapTupleHeader;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
+	change->origin_id = XLogRecGetOrigin(r);
+	memcpy(&change->data.sequence.relnode, &target_node, sizeof(RelFileNode));
+
+	change->data.sequence.newtuple =
+		ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+	DecodeSeqTuple(tupledata, datalen, change->data.sequence.newtuple);
+
+	ReorderBufferXidSetCatalogChanges(ctx->reorder, XLogRecGetXid(r),
+									  buf->origptr);
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+							 change);
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 481277a1fd..24f2cdf51d 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -474,6 +474,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			break;
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			if (change->data.sequence.newtuple)
+			{
+				ReorderBufferReturnTupleBuf(rb, change->data.sequence.newtuple);
+				change->data.sequence.newtuple = NULL;
+			}
+			break;
 	}
 
 	pfree(change);
@@ -1833,6 +1840,42 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
+				case REORDER_BUFFER_CHANGE_SEQUENCE:
+					Assert(snapshot_now);
+
+					reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode,
+												change->data.sequence.relnode.relNode);
+
+					if (reloid == InvalidOid &&
+						change->data.sequence.newtuple == NULL)
+						goto change_done;
+					else if (reloid == InvalidOid)
+						elog(ERROR, "could not map filenode \"%s\" to relation OID",
+							 relpathperm(change->data.sequence.relnode,
+										 MAIN_FORKNUM));
+
+					relation = RelationIdGetRelation(reloid);
+
+					if (!RelationIsValid(relation))
+						elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
+							 reloid,
+							 relpathperm(change->data.sequence.relnode,
+										 MAIN_FORKNUM));
+
+					if (!RelationIsLogicallyLogged(relation))
+						goto change_done;
+
+					/*
+					 * Sequence changes are stored in data.sequence, so the
+					 * data.tp-based ReorderBufferToastReplace() must not be
+					 * run on them.
+					 */
+					if (!IsToastRelation(relation))
+						rb->apply_change(rb, txn, relation, change);
+
+					/* release the relcache reference taken above */
+					RelationClose(relation);
+					break;
 			}
 		}
 
@@ -2516,15 +2559,23 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_UPDATE:
 		case REORDER_BUFFER_CHANGE_DELETE:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
 			{
 				char	   *data;
 				ReorderBufferTupleBuf *oldtup,
 						   *newtup;
 				Size		oldlen = 0;
 				Size		newlen = 0;
-
-				oldtup = change->data.tp.oldtuple;
-				newtup = change->data.tp.newtuple;
+				if (change->action == REORDER_BUFFER_CHANGE_SEQUENCE)
+				{
+					oldtup = NULL;
+					newtup = change->data.sequence.newtuple;
+				}
+				else
+				{
+					oldtup = change->data.tp.oldtuple;
+					newtup = change->data.tp.newtuple;
+				}
 
 				if (oldtup)
 				{
@@ -2707,14 +2758,23 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 		case REORDER_BUFFER_CHANGE_UPDATE:
 		case REORDER_BUFFER_CHANGE_DELETE:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
 			{
 				ReorderBufferTupleBuf *oldtup,
 						   *newtup;
 				Size		oldlen = 0;
 				Size		newlen = 0;
 
-				oldtup = change->data.tp.oldtuple;
-				newtup = change->data.tp.newtuple;
+				if (change->action == REORDER_BUFFER_CHANGE_SEQUENCE)
+				{
+					oldtup = NULL;
+					newtup = change->data.sequence.newtuple;
+				}
+				else
+				{
+					oldtup = change->data.tp.oldtuple;
+					newtup = change->data.tp.newtuple;
+				}
 
 				if (oldtup)
 				{
@@ -3048,6 +3108,32 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			break;
+		case REORDER_BUFFER_CHANGE_SEQUENCE:
+			if (change->data.sequence.newtuple)
+			{
+				/* here, data might not be suitably aligned! */
+				uint32		tuplelen;
+
+				memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
+					   sizeof(uint32));
+
+				change->data.sequence.newtuple =
+					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
+
+				/* restore ->tuple */
+				memcpy(&change->data.sequence.newtuple->tuple, data,
+					   sizeof(HeapTupleData));
+				data += sizeof(HeapTupleData);
+
+				/* reset t_data pointer into the new tuplebuf */
+				change->data.sequence.newtuple->tuple.t_data =
+					ReorderBufferTupleBufData(change->data.sequence.newtuple);
+
+				/* restore tuple data itself */
+				memcpy(change->data.sequence.newtuple->tuple.t_data, data, tuplelen);
+				data += tuplelen;
+			}
+			break;
 	}
 
 	dlist_push_tail(&txn->changes, &change->node);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 626ecf4dc9..cf3fd45c5f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -62,7 +62,8 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
-	REORDER_BUFFER_CHANGE_TRUNCATE
+	REORDER_BUFFER_CHANGE_TRUNCATE,
+	REORDER_BUFFER_CHANGE_SEQUENCE
 };
 
 /* forward declaration */
@@ -149,6 +150,15 @@ typedef struct ReorderBufferChange
 			CommandId	cmax;
 			CommandId	combocid;
 		}			tuplecid;
+		/*
+		 * Sequence data for REORDER_BUFFER_CHANGE_SEQUENCE: the sequence's
+		 * relfilenode and its new tuple (NULL if no tuple was decoded).
+		 */
+		struct
+		{
+			RelFileNode relnode;
+			ReorderBufferTupleBuf *newtuple;
+		}			sequence;
 	}			data;
 
 	/*