From 625b98a4030281eae7a1e23be973849200fe5c91 Mon Sep 17 00:00:00 2001 From: ChangAo Chen Date: Thu, 8 Aug 2024 12:21:07 +0800 Subject: [PATCH v2 1/2] Track transactions committed in BUILDING_SNAPSHOT. Transactions committed in BUILDING_SNAPSHOT are useful for build historic snapshot, but we didn't track them previously. This results in a transaction taking an incorrect snapshot and logical decoding being interrupted. So we must track these transactions. --- src/backend/replication/logical/decode.c | 30 +++++++++++++-------- src/backend/replication/logical/snapbuild.c | 7 +++++ 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index d687ceee33..267e886ca3 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -206,10 +206,11 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK; /* - * If the snapshot isn't yet fully built, we cannot decode anything, so - * bail out. + * Before BUILDING_SNAPSHOT, the xlog is useless, so bail out. Note + * that the xlog in BUILDING_SNAPSHOT is only useful for build the + * snapshot and will not be decoded. */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) + if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT) return; switch (info) @@ -282,9 +283,11 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { TransactionId xid; xl_xact_invals *invals; + bool has_snapshot; xid = XLogRecGetXid(r); invals = (xl_xact_invals *) XLogRecGetData(r); + has_snapshot = SnapBuildCurrentState(builder) >= SNAPBUILD_FULL_SNAPSHOT; /* * Execute the invalidations for xid-less transactions, @@ -293,7 +296,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ if (TransactionIdIsValid(xid)) { - if (!ctx->fast_forward) + if (!ctx->fast_forward && has_snapshot) ReorderBufferAddInvalidations(reorder, xid, buf->origptr, invals->nmsgs, @@ -301,7 +304,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); } - else if (!ctx->fast_forward) + else if (!ctx->fast_forward && has_snapshot) ReorderBufferImmediateInvalidation(ctx->reorder, invals->nmsgs, invals->msgs); @@ -411,10 +414,12 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); /* - * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding changes. + * Before BUILDING_SNAPSHOT or we are just fast-forwarding, there is no + * point in decoding changes. Note that we only handle XLOG_HEAP2_NEW_CID + * which mark a transaction as catalog modifying in BUILDING_SNAPSHOT, it's + * useful for build the snapshot. */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT || ctx->fast_forward) return; @@ -470,10 +475,12 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); /* - * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding data changes. + * Before BUILDING_SNAPSHOT or we are just fast-forwarding, there is no + * point in decoding changes. Note that we only handle XLOG_HEAP_INPLACE + * which mark a transaction as catalog modifying in BUILDING_SNAPSHOT, it's + * useful for build the snapshot. */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT || ctx->fast_forward) return; @@ -1301,6 +1308,7 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, Oid txn_dbid, RepOriginId origin_id) { if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || + SnapBuildCurrentState(ctx->snapshot_builder) < SNAPBUILD_CONSISTENT || (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) || FilterByOrigin(ctx, origin_id)) return true; diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index ae676145e6..5443b6dce8 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -836,6 +836,13 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, */ ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn); + /* + * If we don't have snapshot, the transaction won't be + * replayed, just return here. + */ + if (builder->state < SNAPBUILD_FULL_SNAPSHOT) + return; + ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn, xlrec->target_locator, xlrec->target_tid, xlrec->cmin, xlrec->cmax, -- 2.34.1