Bug report and fix about building historic snapshot
Hello, I find a bug in building historic snapshot and the steps to reproduce are as follows:
Prepare:
(pub)create table t1 (id int primary key);
(pub)insert into t1 values (1);
(pub)create publication pub for table t1;
(sub)create table t1 (id int primary key);
Reproduce:
(pub)begin; insert into t1 values (2); (txn1 in session1)
(sub)create subscription sub connection 'hostaddr=127.0.0.1 port=5432 user=xxx dbname=postgres' publication pub; (pub will switch to BUILDING_SNAPSHOT state soon)
(pub)begin; insert into t1 values (3); (txn2 in session2)
(pub)create table t2 (id int primary key); (session3)
(pub)commit; (commit txn1, and pub will switch to FULL_SNAPSHOT state soon)
(pub)begin; insert into t2 values (1); (txn3 in session3)
(pub)commit; (commit txn2, and pub will switch to CONSISTENT state soon)
(pub)commit; (commit txn3, and replay txn3 will failed because its snapshot cannot see t2)
Reasons:
We currently don't track the transaction that begin after BUILDING_SNAPSHOT
and commit before FULL_SNAPSHOT when building historic snapshot in logical
decoding. This can cause a transaction which begin after FULL_SNAPSHOT to take
an incorrect historic snapshot because transactions committed in BUILDING_SNAPSHOT
state will not be processed by SnapBuildCommitTxn().
To fix it, we can track the transaction that begin after BUILDING_SNAPSHOT and
commit before FULL_SNAPSHOT forcely by using SnapBuildCommitTxn().
--
Regards,
ChangAo Chen
Attachments:
0001-Track-transactions-committed-in-BUILDING_SNAPSHOT.patchapplication/octet-stream; charset=ISO-8859-1; name=0001-Track-transactions-committed-in-BUILDING_SNAPSHOT.patchDownload
From a17061fb2fb64897e8aad13dc7f3ce34f2b2d815 Mon Sep 17 00:00:00 2001
From: ChangAo Chen <cca5507@qq.com>
Date: Sun, 21 Jan 2024 16:50:44 +0800
Subject: [PATCH] Track transactions committed in BUILDING_SNAPSHOT.
We previously didn't track the transaction that begin after BUILDING_SNAPSHOT
and commit before FULL_SNAPSHOT when building historic snapshot in logical
decoding. This can cause a transaction which begin after FULL_SNAPSHOT to take
an incorrect historic snapshot because transactions committed in BUILDING_SNAPSHOT
state will not be processed by SnapBuildCommitTxn().
To fix it, we track the transaction that begin after BUILDING_SNAPSHOT and
commit before FULL_SNAPSHOT forcely by using SnapBuildCommitTxn(). Note that
there is no need to track transactions that begin before BUILDING_SNAPSHOT
because they are all finished when we reach FULL_SNAPSHOT state.
---
src/backend/replication/logical/decode.c | 38 ++++++++++++++++++++-
src/backend/replication/logical/snapbuild.c | 8 +++--
2 files changed, 42 insertions(+), 4 deletions(-)
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 6a0f22b209..89a3f23ec5 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -54,6 +54,8 @@ static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_commit *parsed, TransactionId xid,
bool two_phase);
+static void DecodeCommitWhenBuildingSnapshot(LogicalDecodingContext *ctx,
+ XLogRecordBuffer *buf);
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_abort *parsed, TransactionId xid,
bool two_phase);
@@ -210,10 +212,17 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* If the snapshot isn't yet fully built, we cannot decode anything, so
- * bail out.
+ * bail out. However, we need track some transactions committed in
+ * BUILDING_SNAPSHOT or we may have an incorrect historic snapshot
+ * when reaching FULL_SNAPSHOT state.
*/
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+ {
+ if (SnapBuildCurrentState(builder) == SNAPBUILD_BUILDING_SNAPSHOT &&
+ (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED))
+ DecodeCommitWhenBuildingSnapshot(ctx, buf);
return;
+ }
switch (info)
{
@@ -746,6 +755,33 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
UpdateDecodingStats(ctx);
}
+/*
+ * Track the transaction that begin after BUILDING_SNAPSHOT and commit before
+ * FULL_SNAPSHOT. Should only be called when we are in BUILDING_SNAPSHOT state.
+ */
+static void
+DecodeCommitWhenBuildingSnapshot(LogicalDecodingContext *ctx,
+ XLogRecordBuffer *buf)
+{
+ TransactionId xid;
+ xl_xact_commit *xlrec;
+ xl_xact_parsed_commit parsed;
+ XLogReaderState *r = buf->record;
+
+ /* The record must be XLOG_XACT_COMMIT or XLOG_XACT_COMMIT_PREPARED */
+ xlrec = (xl_xact_commit *) XLogRecGetData(r);
+ ParseCommitRecord(XLogRecGetInfo(r), xlrec, &parsed);
+
+ if (!TransactionIdIsValid(parsed.twophase_xid))
+ xid = XLogRecGetXid(r);
+ else
+ xid = parsed.twophase_xid;
+
+ SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
+ parsed.nsubxacts, parsed.subxacts,
+ parsed.xinfo);
+}
+
/*
* Decode PREPARE record. Similar logic as in DecodeCommit.
*
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index a0b7947d2f..e48557045f 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1054,10 +1054,12 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
builder->start_decoding_at = lsn + 1;
/*
- * If building an exportable snapshot, force xid to be tracked, even
- * if the transaction didn't modify the catalog.
+ * If building an exportable snapshot or we are in BUILDING_SNAPSHOT
+ * state, force xid to be tracked, even if the transaction didn't
+ * modify the catalog.
*/
- if (builder->building_full_snapshot)
+ if (builder->building_full_snapshot ||
+ builder->state == SNAPBUILD_BUILDING_SNAPSHOT)
{
needs_timetravel = true;
}
--
2.34.1
This patch may be better, which only track catalog modified transactions.
--
Regards,
ChangAo Chen
------------------ Original ------------------
From: "cca5507" <cca5507@qq.com>;
Date: Sun, Jan 21, 2024 05:25 PM
To: "pgsql-hackers"<pgsql-hackers@lists.postgresql.org>;
Subject: Bug report and fix about building historic snapshot
Hello, I find a bug in building historic snapshot and the steps to reproduce are as follows:
Prepare:
(pub)create table t1 (id int primary key);
(pub)insert into t1 values (1);
(pub)create publication pub for table t1;
(sub)create table t1 (id int primary key);
Reproduce:
(pub)begin; insert into t1 values (2); (txn1 in session1)
(sub)create subscription sub connection 'hostaddr=127.0.0.1 port=5432 user=xxx dbname=postgres' publication pub; (pub will switch to BUILDING_SNAPSHOT state soon)
(pub)begin; insert into t1 values (3); (txn2 in session2)
(pub)create table t2 (id int primary key); (session3)
(pub)commit; (commit txn1, and pub will switch to FULL_SNAPSHOT state soon)
(pub)begin; insert into t2 values (1); (txn3 in session3)
(pub)commit; (commit txn2, and pub will switch to CONSISTENT state soon)
(pub)commit; (commit txn3, and replay txn3 will failed because its snapshot cannot see t2)
Reasons:
We currently don't track the transaction that begin after BUILDING_SNAPSHOT
and commit before FULL_SNAPSHOT when building historic snapshot in logical
decoding. This can cause a transaction which begin after FULL_SNAPSHOT to take
an incorrect historic snapshot because transactions committed in BUILDING_SNAPSHOT
state will not be processed by SnapBuildCommitTxn().
To fix it, we can track the transaction that begin after BUILDING_SNAPSHOT and
commit before FULL_SNAPSHOT forcely by using SnapBuildCommitTxn().
--
Regards,
ChangAo Chen
Attachments:
0001-Track-transactions-committed-in-BUILDING_SNAPSHOT.patchapplication/octet-stream; charset=ISO-8859-1; name=0001-Track-transactions-committed-in-BUILDING_SNAPSHOT.patchDownload
From 1a912000d9725cd23b5cdf7918bb6b7b3dac9a48 Mon Sep 17 00:00:00 2001
From: ChangAo Chen <cca5507@qq.com>
Date: Tue, 23 Jan 2024 21:10:26 +0800
Subject: [PATCH] Track transactions committed in BUILDING_SNAPSHOT.
We previously didn't track transactions that begin after BUILDING_SNAPSHOT
and commit before FULL_SNAPSHOT when building historic snapshot in logical
decoding. This can cause transactions that begin after FULL_SNAPSHOT to take
an incorrect snapshot because transactions committed in BUILDING_SNAPSHOT
are not in the snapshot.
To fix it, we track transactions that committed in BUILDING_SNAPSHOT and
add them to historic snapshot if they have catalog changes. Note that
there is no need to track transactions that begin before BUILDING_SNAPSHOT
because they are all finished when we reach FULL_SNAPSHOT.
---
src/backend/replication/logical/decode.c | 88 +++++++++++++++++++--
src/backend/replication/logical/snapbuild.c | 4 +
2 files changed, 85 insertions(+), 7 deletions(-)
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 6a0f22b209..ffb2d55983 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -51,6 +51,8 @@ static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeXactWhenBuildingSnapshot(LogicalDecodingContext *ctx,
+ XLogRecordBuffer *buf);
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_commit *parsed, TransactionId xid,
bool two_phase);
@@ -210,10 +212,16 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* If the snapshot isn't yet fully built, we cannot decode anything, so
- * bail out.
+ * bail out. However, we need track some transactions committed in
+ * BUILDING_SNAPSHOT or we may have an incorrect historic snapshot
+ * when reaching FULL_SNAPSHOT.
*/
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+ {
+ if (SnapBuildCurrentState(builder) == SNAPBUILD_BUILDING_SNAPSHOT)
+ DecodeXactWhenBuildingSnapshot(ctx, buf);
return;
+ }
switch (info)
{
@@ -413,10 +421,11 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
/*
- * If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding changes.
+ * Even though we don't have snapshot in BUILDING_SNAPSHOT, we need to
+ * handle XLOG_HEAP2_NEW_CID which mark a transaction has catalog changes.
+ * If we are just fast-forwarding, no need to handle anything.
*/
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT ||
ctx->fast_forward)
return;
@@ -472,10 +481,11 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
/*
- * If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding data changes.
+ * Even though we don't have snapshot in BUILDING_SNAPSHOT, we need to
+ * handle XLOG_HEAP_INPLACE which mark a transaction has catalog changes.
+ * If we are just fast-forwarding, no need to handle anything.
*/
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT ||
ctx->fast_forward)
return;
@@ -657,6 +667,70 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
message->message + message->prefix_size);
}
+/*
+ * Handle rmgr XACT_ID records when we are in BUILDING_SNAPSHOT.
+ * Currently, we only handle XLOG_XACT_COMMIT(_PREPARED) and
+ * XLOG_XACT_INVALIDATIONS.
+ *
+ * XLOG_XACT_COMMIT(_PREPARED):
+ * we should add the transaction to historic snapshot if it
+ * has catalog changes and begins after BUILDING_SNAPSHOT.
+ * XLOG_XACT_INVALIDATIONS:
+ * we only track catalog modified transaction in historic
+ * snapshot, so we handle it which mark a transaction has
+ * catalog changes.
+ */
+static void
+DecodeXactWhenBuildingSnapshot(LogicalDecodingContext *ctx,
+ XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
+
+ switch (info)
+ {
+ case XLOG_XACT_COMMIT:
+ case XLOG_XACT_COMMIT_PREPARED:
+ {
+ xl_xact_commit *xlrec;
+ xl_xact_parsed_commit parsed;
+ TransactionId xid;
+
+ xlrec = (xl_xact_commit *) XLogRecGetData(r);
+ ParseCommitRecord(XLogRecGetInfo(r), xlrec, &parsed);
+
+ if (!TransactionIdIsValid(parsed.twophase_xid))
+ xid = XLogRecGetXid(r);
+ else
+ xid = parsed.twophase_xid;
+
+ DecodeCommit(ctx, buf, &parsed, xid, false);
+ }
+ break;
+ case XLOG_XACT_ABORT:
+ case XLOG_XACT_ABORT_PREPARED:
+ case XLOG_XACT_ASSIGNMENT:
+ break;
+ case XLOG_XACT_INVALIDATIONS:
+ {
+ TransactionId xid;
+
+ xid = XLogRecGetXid(r);
+
+ if (TransactionIdIsValid(xid))
+ {
+ ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
+ buf->origptr);
+ }
+ }
+ break;
+ case XLOG_XACT_PREPARE:
+ break;
+ default:
+ elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
+ }
+}
+
/*
* Consolidated commit record handling between the different form of commit
* records.
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index a0b7947d2f..f47bddcecf 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -827,6 +827,10 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
*/
ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
+ /* we just call ReorderBufferXidSetCatalogChanges() in BUILDING_SNAPSHOT */
+ if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
+ return;
+
ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
xlrec->target_locator, xlrec->target_tid,
xlrec->cmin, xlrec->cmax,
--
2.34.1