diff --git a/contrib/test_decoding/expected/prepared.out b/contrib/test_decoding/expected/prepared.out index 46e915d..af81c47 100644 --- a/contrib/test_decoding/expected/prepared.out +++ b/contrib/test_decoding/expected/prepared.out @@ -25,6 +25,7 @@ BEGIN; INSERT INTO test_prepared1 VALUES (5); ALTER TABLE test_prepared1 ADD COLUMN data text; INSERT INTO test_prepared1 VALUES (6, 'frakbar'); +LOCK test_prepared1; PREPARE TRANSACTION 'test_prepared#3'; -- test that we decode correctly while an uncommitted prepared xact -- with ddl exists. @@ -44,27 +45,33 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc ------------------------------------------------------------------------- BEGIN table public.test_prepared1: INSERT: id[integer]:1 - COMMIT + PREPARE + COMMIT PREPARED BEGIN table public.test_prepared1: INSERT: id[integer]:2 COMMIT BEGIN - table public.test_prepared1: INSERT: id[integer]:4 - COMMIT + table public.test_prepared1: INSERT: id[integer]:3 + PREPARE + ABORT PREPARED BEGIN - table public.test_prepared2: INSERT: id[integer]:7 + table public.test_prepared1: INSERT: id[integer]:4 COMMIT BEGIN table public.test_prepared1: INSERT: id[integer]:5 table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar' + PREPARE + BEGIN + table public.test_prepared2: INSERT: id[integer]:7 COMMIT + COMMIT PREPARED BEGIN table public.test_prepared1: INSERT: id[integer]:8 data[text]:null COMMIT BEGIN table public.test_prepared2: INSERT: id[integer]:9 COMMIT -(22 rows) +(28 rows) SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot diff --git a/contrib/test_decoding/sql/prepared.sql b/contrib/test_decoding/sql/prepared.sql index e726397..ac76b8c 100644 --- a/contrib/test_decoding/sql/prepared.sql +++ b/contrib/test_decoding/sql/prepared.sql @@ -25,6 +25,7 @@ BEGIN; INSERT INTO test_prepared1 VALUES (5); ALTER TABLE test_prepared1 ADD COLUMN data text; INSERT INTO test_prepared1 VALUES (6, 'frakbar'); +LOCK test_prepared1; PREPARE TRANSACTION 'test_prepared#3'; -- test that we decode correctly while an uncommitted prepared xact diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 949e9a7..53ced57 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -232,10 +232,25 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, return; OutputPluginPrepareWrite(ctx, true); + + switch(txn->xact_action) + { + case XLOG_XACT_COMMIT: + appendStringInfoString(ctx->out, "COMMIT"); + break; + case XLOG_XACT_PREPARE: + appendStringInfoString(ctx->out, "PREPARE"); + break; + case XLOG_XACT_COMMIT_PREPARED: + appendStringInfoString(ctx->out, "COMMIT PREPARED"); + break; + case XLOG_XACT_ABORT_PREPARED: + appendStringInfoString(ctx->out, "ABORT PREPARED"); + break; + } + if (data->include_xids) - appendStringInfo(ctx->out, "COMMIT %u", txn->xid); - else - appendStringInfoString(ctx->out, "COMMIT"); + appendStringInfo(ctx->out, " %u", txn->xid); if (data->include_timestamp) appendStringInfo(ctx->out, " (at %s)", diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index 91d27d0..679f457 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -98,10 +98,13 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE) { xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data; + uint8 gidlen = xl_twophase->gidlen; parsed->twophase_xid = xl_twophase->xid; + data += MinSizeOfXactTwophase; - data += sizeof(xl_xact_twophase); + strcpy(parsed->twophase_gid, data); + data += gidlen; } if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) @@ -139,6 +142,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed) data += sizeof(xl_xact_xinfo); } + if (parsed->xinfo & XACT_XINFO_HAS_DBINFO) + { + xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data; + + parsed->dbId = xl_dbinfo->dbId; + parsed->tsId = xl_dbinfo->tsId; + + data += sizeof(xl_xact_dbinfo); + } + if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS) { xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data; @@ -164,10 +177,13 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed) if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE) { xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data; + uint8 gidlen = xl_twophase->gidlen; parsed->twophase_xid = xl_twophase->xid; + data += MinSizeOfXactTwophase; - data += sizeof(xl_xact_twophase); + strcpy(parsed->twophase_gid, data); + data += gidlen; } } diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 5415604..964bcaf 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -130,7 +130,6 @@ int max_prepared_xacts = 0; * Note that the max value of GIDSIZE must fit in the uint16 gidlen, * specified in TwoPhaseFileHeader. */ -#define GIDSIZE 200 typedef struct GlobalTransactionData { @@ -188,12 +187,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid, RelFileNode *rels, int ninvalmsgs, SharedInvalidationMessage *invalmsgs, - bool initfileinval); + bool initfileinval, + const char *gid); static void RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, - RelFileNode *rels); + RelFileNode *rels, + const char *gid); static void ProcessRecords(char *bufptr, TransactionId xid, const TwoPhaseCallback callbacks[]); static void RemoveGXact(GlobalTransaction gxact); @@ -1236,6 +1237,41 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings) return buf; } +/* + * ParsePrepareRecord + */ +void +ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed) +{ + TwoPhaseFileHeader *hdr; + char *bufptr; + + hdr = (TwoPhaseFileHeader *) xlrec; + bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader)); + + parsed->twophase_xid = hdr->xid; + parsed->dbId = hdr->database; + parsed->nsubxacts = hdr->nsubxacts; + parsed->ncommitrels = hdr->ncommitrels; + parsed->nabortrels = hdr->nabortrels; + parsed->nmsgs = hdr->ninvalmsgs; + + strncpy(parsed->twophase_gid, bufptr, hdr->gidlen); + bufptr += MAXALIGN(hdr->gidlen); + + parsed->subxacts = (TransactionId *) bufptr; + bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); + + parsed->commitrels = (RelFileNode *) bufptr; + bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); + + parsed->abortrels = (RelFileNode *) bufptr; + bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); + + parsed->msgs = (SharedInvalidationMessage *) bufptr; + bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); +} + /* * Reads 2PC data from xlog. During checkpoint this data will be moved to @@ -1389,11 +1425,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit) hdr->nsubxacts, children, hdr->ncommitrels, commitrels, hdr->ninvalmsgs, invalmsgs, - hdr->initfileinval); + hdr->initfileinval, gid); else RecordTransactionAbortPrepared(xid, hdr->nsubxacts, children, - hdr->nabortrels, abortrels); + hdr->nabortrels, abortrels, + gid); ProcArrayRemove(proc, latestXid); @@ -2038,7 +2075,8 @@ RecordTransactionCommitPrepared(TransactionId xid, RelFileNode *rels, int ninvalmsgs, SharedInvalidationMessage *invalmsgs, - bool initfileinval) + bool initfileinval, + const char *gid) { XLogRecPtr recptr; TimestampTz committs = GetCurrentTimestamp(); @@ -2061,7 +2099,7 @@ RecordTransactionCommitPrepared(TransactionId xid, nchildren, children, nrels, rels, ninvalmsgs, invalmsgs, initfileinval, false, - xid); + xid, gid); if (replorigin) @@ -2123,7 +2161,8 @@ RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, - RelFileNode *rels) + RelFileNode *rels, + const char *gid) { XLogRecPtr recptr; @@ -2141,7 +2180,7 @@ RecordTransactionAbortPrepared(TransactionId xid, recptr = XactLogAbortRecord(GetCurrentTimestamp(), nchildren, children, nrels, rels, - xid); + xid, gid); /* Always flush, since we're about to remove the 2PC state file */ XLogFlush(recptr); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index e47fd44..1081f8c 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1230,7 +1230,7 @@ RecordTransactionCommit(void) nchildren, children, nrels, rels, nmsgs, invalMessages, RelcacheInitFileInval, forceSyncCommit, - InvalidTransactionId /* plain commit */ ); + InvalidTransactionId, NULL /* plain commit */ ); if (replorigin) /* Move LSNs forward for this replication origin */ @@ -1582,7 +1582,7 @@ RecordTransactionAbort(bool isSubXact) XactLogAbortRecord(xact_time, nchildren, children, nrels, rels, - InvalidTransactionId); + InvalidTransactionId, NULL); /* * Report the latest async abort LSN, so that the WAL writer knows to @@ -3467,7 +3467,7 @@ BeginTransactionBlock(void) * resource owner, etc while executing inside a Portal. */ bool -PrepareTransactionBlock(char *gid) +PrepareTransactionBlock(const char *gid) { TransactionState s; bool result; @@ -5106,7 +5106,7 @@ XactLogCommitRecord(TimestampTz commit_time, int nrels, RelFileNode *rels, int nmsgs, SharedInvalidationMessage *msgs, bool relcacheInval, bool forceSync, - TransactionId twophase_xid) + TransactionId twophase_xid, const char *twophase_gid) { xl_xact_commit xlrec; xl_xact_xinfo xl_xinfo; @@ -5178,6 +5178,7 @@ XactLogCommitRecord(TimestampTz commit_time, { xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; xl_twophase.xid = twophase_xid; + xl_twophase.gidlen = strlen(twophase_gid) + 1; /* Include '\0' */ } /* dump transaction origin information */ @@ -5228,7 +5229,10 @@ XactLogCommitRecord(TimestampTz commit_time, } if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) - XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + { + XLogRegisterData((char *) (&xl_twophase), MinSizeOfXactTwophase); + XLogRegisterData((char *) twophase_gid, xl_twophase.gidlen); + } if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin)); @@ -5249,13 +5253,14 @@ XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, - TransactionId twophase_xid) + TransactionId twophase_xid, const char *twophase_gid) { xl_xact_abort xlrec; xl_xact_xinfo xl_xinfo; xl_xact_subxacts xl_subxacts; xl_xact_relfilenodes xl_relfilenodes; xl_xact_twophase xl_twophase; + xl_xact_dbinfo xl_dbinfo; uint8 info; @@ -5290,6 +5295,14 @@ XactLogAbortRecord(TimestampTz abort_time, { xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; xl_twophase.xid = twophase_xid; + xl_twophase.gidlen = strlen(twophase_gid) + 1; /* Include '\0' */ + } + + if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO; + xl_dbinfo.dbId = MyDatabaseId; + xl_dbinfo.tsId = MyDatabaseTableSpace; } if (xl_xinfo.xinfo != 0) @@ -5304,6 +5317,9 @@ XactLogAbortRecord(TimestampTz abort_time, if (xl_xinfo.xinfo != 0) XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO) + XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS) { XLogRegisterData((char *) (&xl_subxacts), @@ -5321,7 +5337,13 @@ XactLogAbortRecord(TimestampTz abort_time, } if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) - XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + { + XLogRegisterData((char *) (&xl_twophase), MinSizeOfXactTwophase); + XLogRegisterData((char *) twophase_gid, xl_twophase.gidlen); + } + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO) + XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo)); return XLogInsert(RM_XACT_ID, info); } diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 46cd5ba..c15c2ed 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -34,6 +34,7 @@ #include "access/xlogutils.h" #include "access/xlogreader.h" #include "access/xlogrecord.h" +#include "access/twophase.h" #include "catalog/pg_control.h" @@ -71,7 +72,9 @@ static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid); static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - xl_xact_parsed_abort *parsed, TransactionId xid); + xl_xact_parsed_abort *parsed, TransactionId xid); +static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare *parsed); /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); @@ -221,6 +224,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; + reorder->xact_action = info; + switch (info) { case XLOG_XACT_COMMIT: @@ -277,17 +282,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; } case XLOG_XACT_PREPARE: - - /* - * Currently decoding ignores PREPARE TRANSACTION and will just - * decode the transaction when the COMMIT PREPARED is sent or - * throw away the transaction's contents when a ROLLBACK PREPARED - * is received. In the future we could add code to expose prepared - * transactions in the changestream allowing for a kind of - * distributed 2PC. - */ - ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); - break; + { + xl_xact_parsed_prepare parsed; + ParsePrepareRecord(XLogRecGetInfo(buf->record), + XLogRecGetData(buf->record), &parsed); + DecodePrepare(ctx, buf, &parsed); + break; + } default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); } @@ -607,6 +608,67 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, buf->origptr, buf->endptr); } + if (TransactionIdIsValid(parsed->twophase_xid)) { + /* + * We are processing COMMIT PREPARED and know that reorder buffer is + * empty. So we can skip use shortcut for coomiting bare xact. + */ + strcpy(ctx->reorder->gid, parsed->twophase_gid); + ReorderBufferCommitBareXact(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn); + } else { + /* replay actions of all transaction + subtransactions in order */ + ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn); + } +} + +static void +DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare *parsed) +{ + XLogRecPtr origin_lsn = InvalidXLogRecPtr; + TimestampTz commit_time = 0; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + int i; + TransactionId xid = parsed->twophase_xid; + strcpy(ctx->reorder->gid, parsed->twophase_gid); + + /* + * Process invalidation messages, even if we're not interested in the + * transaction's contents, since the various caches need to always be + * consistent. + */ + if (parsed->nmsgs > 0) + { + ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, + parsed->nmsgs, parsed->msgs); + ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); + } + + SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid, + parsed->nsubxacts, parsed->subxacts); + + if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || + (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || + FilterByOrigin(ctx, origin_id)) + { + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr); + } + ReorderBufferForget(ctx->reorder, xid, buf->origptr); + + return; + } + + /* tell the reorderbuffer about the surviving subtransactions */ + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i], + buf->origptr, buf->endptr); + } + /* replay actions of all transaction + subtransactions in order */ ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, commit_time, origin_id, origin_lsn); @@ -621,6 +683,22 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_abort *parsed, TransactionId xid) { int i; + XLogRecPtr origin_lsn = InvalidXLogRecPtr; + XLogRecPtr commit_time = InvalidXLogRecPtr; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + + /* + * If that is ROLLBACK PREPARED than send that to callbacks. + */ + if (TransactionIdIsValid(parsed->twophase_xid) + && (parsed->dbId == ctx->slot->data.database)) { + + strcpy(ctx->reorder->gid, parsed->twophase_gid); + + ReorderBufferCommitBareXact(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn); + return; + } SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid, parsed->nsubxacts, parsed->subxacts); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index fa84bd8..23176c6 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1373,6 +1373,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, txn->commit_time = commit_time; txn->origin_id = origin_id; txn->origin_lsn = origin_lsn; + txn->xact_action = rb->xact_action; + memcpy(txn->gid, rb->gid, GIDSIZE); /* * If this transaction didn't have any real changes in our database, it's @@ -1708,6 +1710,32 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, PG_END_TRY(); } + +/* + * Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED. + */ +void +ReorderBufferCommitBareXact(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn, + true); + + txn->final_lsn = commit_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = commit_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + txn->xact_action = rb->xact_action; + strcpy(txn->gid, rb->gid); + + rb->commit(rb, txn, commit_lsn); +} + /* * Abort a transaction that possibly has previous changes. Needs to be first * called for subtransactions and then for the toplevel xid. diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index b7ce0c6..1b8e7a0 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -15,6 +15,7 @@ #define TWOPHASE_H #include "access/xlogdefs.h" +#include "access/xact.h" #include "datatype/timestamp.h" #include "storage/lock.h" @@ -46,6 +47,8 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid); extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p); +extern void ParsePrepareRecord(uint8 info, char *xlrec, + xl_xact_parsed_prepare *parsed); extern void StandbyRecoverPreparedTransactions(bool overwriteOK); extern void RecoverPreparedTransactions(void); diff --git a/src/include/access/xact.h b/src/include/access/xact.h index a123d2a..eb052f9 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -21,6 +21,10 @@ #include "storage/sinval.h" #include "utils/datetime.h" +/* + * Maximum size of Global Transaction ID. + */ +#define GIDSIZE 200 /* * Xact isolation levels @@ -224,7 +228,10 @@ typedef struct xl_xact_invals typedef struct xl_xact_twophase { TransactionId xid; + uint8 gidlen; + char gid[GIDSIZE]; } xl_xact_twophase; +#define MinSizeOfXactTwophase offsetof(xl_xact_twophase, gid) typedef struct xl_xact_origin { @@ -283,13 +290,37 @@ typedef struct xl_xact_parsed_commit SharedInvalidationMessage *msgs; TransactionId twophase_xid; /* only for 2PC */ + char twophase_gid[GIDSIZE]; XLogRecPtr origin_lsn; TimestampTz origin_timestamp; } xl_xact_parsed_commit; +typedef struct xl_xact_parsed_prepare +{ + Oid dbId; /* MyDatabaseId */ + + int nsubxacts; + TransactionId *subxacts; + + int ncommitrels; + RelFileNode *commitrels; + + int nabortrels; + RelFileNode *abortrels; + + int nmsgs; + SharedInvalidationMessage *msgs; + + TransactionId twophase_xid; + char twophase_gid[GIDSIZE]; +} xl_xact_parsed_prepare; + typedef struct xl_xact_parsed_abort { + Oid dbId; + Oid tsId; + TimestampTz xact_time; uint32 xinfo; @@ -300,6 +331,7 @@ typedef struct xl_xact_parsed_abort RelFileNode *xnodes; TransactionId twophase_xid; /* only for 2PC */ + char twophase_gid[GIDSIZE]; } xl_xact_parsed_abort; @@ -331,7 +363,7 @@ extern void CommitTransactionCommand(void); extern void AbortCurrentTransaction(void); extern void BeginTransactionBlock(void); extern bool EndTransactionBlock(void); -extern bool PrepareTransactionBlock(char *gid); +extern bool PrepareTransactionBlock(const char *gid); extern void UserAbortTransactionBlock(void); extern void ReleaseSavepoint(List *options); extern void DefineSavepoint(char *name); @@ -364,12 +396,12 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time, int nrels, RelFileNode *rels, int nmsgs, SharedInvalidationMessage *msgs, bool relcacheInval, bool forceSync, - TransactionId twophase_xid); + TransactionId twophase_xid, const char *twophase_gid); extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, - TransactionId twophase_xid); + TransactionId twophase_xid, const char *twophase_gid); extern void xact_redo(XLogReaderState *record); /* xactdesc.c */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 9e209ae..13a2195 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -10,6 +10,7 @@ #define REORDERBUFFER_H #include "access/htup_details.h" +#include "access/twophase.h" #include "lib/ilist.h" #include "storage/sinval.h" #include "utils/hsearch.h" @@ -144,6 +145,14 @@ typedef struct ReorderBufferTXN */ TransactionId xid; + /* + * Commit callback is used for COMMIT/PREPARE/COMMMIT PREPARED, + * as well as abort for ROLLBACK and ROLLBACK PREPARED. Here + * stored actual xact action allowing decoding plugin to distinguish them. + */ + uint8 xact_action; + char gid[GIDSIZE]; + /* did the TX have catalog changes */ bool has_catalog_changes; @@ -299,6 +308,10 @@ struct ReorderBuffer */ HTAB *by_txn; + /* For twophase tx support we need to pass XACT action to ReorderBufferTXN */ + uint8 xact_action; + char gid[GIDSIZE]; + /* * Transactions that could be a toplevel xact, ordered by LSN of the first * record bearing that xid. @@ -375,6 +388,10 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); +void ReorderBufferCommitBareXact(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn); void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn);