Question about initial logical decoding snapshot
Hi hackers.
I'm studying the source code that creates the initial logical decoding snapshot. What confuses me is why we must process 3 xl_running_xacts records before we reach the consistent state. I think we only need 2.
I think we can reach the consistent state when we see the 2nd xl_running_xacts record with its oldestRunningXid > the 1st xl_running_xacts record's nextXid. This means that the transactions active in the 1st xl_running_xacts record have all committed, and we have all the logs of the transactions that will commit afterwards, so the state is consistent at this point and we can export a snapshot.
I have read the discussion in [0] and the comment of commit '955a684', but I haven't found a detailed explanation of why we need 4 stages, rather than 3, during creation of the initial logical decoding snapshot.
My recent work is related to logical decoding, so I want to figure this out. I would be very grateful for an answer. Thanks.
[0]: /messages/by-id/f37e975c-908f-858e-707f-058d3b1eb214@2ndquadrant.com
--
Best regards
Chong Wang
Greenplum DataFlow team
On Fri, Dec 30, 2022 at 11:57 PM Chong Wang <chongwa@vmware.com> wrote:
I'm studying the source code that creates the initial logical decoding snapshot. What confuses me is why we must process 3 xl_running_xacts records before we reach the consistent state. I think we only need 2.
I think we can reach the consistent state when we see the 2nd xl_running_xacts record with its oldestRunningXid > the 1st xl_running_xacts record's nextXid. This means that the transactions active in the 1st xl_running_xacts record have all committed, and we have all the logs of the transactions that will commit afterwards, so the state is consistent at this point and we can export a snapshot.
Yeah, we will have logs for all transactions in such a case, but I
think we won't have a valid snapshot by that time. Consider a case
where there are two transactions, 723 and 724, in the 2nd
xl_running_xacts record that we have waited for to finish, and we then
treat that point as consistent and export the snapshot. It is quite
possible that by that time the commit record of one or more of those
xacts (say 724) has not yet been encountered by the decoding process,
which means it won't be recorded in the xip list of the snapshot (we
do that in DecodeCommit->SnapBuildCommitTxn). So, during export in
SnapBuildInitialSnapshot(), we will consider 723 as committed and 724
as running. This could lead to inconsistent data on the client side
that imports such a snapshot and uses it for the copy and for further
replication of the other xacts.
OTOH, currently, before marking snapshot state as consistent we wait
for these xacts to finish and for another xl_running_xact where
oldestRunningXid >= builder->next_phase_at to appear which means the
commit for both 723 and 724 would have appeared in the snapshot.
Does that make sense to you, or am I missing something here?
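To make the 723/724 scenario concrete, here is a minimal toy model in C. It is only a sketch and not PostgreSQL code: ToyXid, ToyBuilder, toy_commit_seen() and toy_exported_as_running() are hypothetical names, xid wraparound is ignored, and the export rule is a simplification of what SnapBuildInitialSnapshot() does. It assumes that an xid inside the snapshot's xmin/xmax range counts as running unless its commit record has already been decoded.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Toy stand-in for TransactionId; xid wraparound is ignored here. */
typedef uint32_t ToyXid;

#define TOY_MAX_COMMITTED 16

/*
 * Hypothetical, simplified builder: it only knows about commits whose
 * commit record the decoder has actually read.
 */
typedef struct ToyBuilder
{
    ToyXid xmin;        /* xids below xmin are never listed as running */
    ToyXid xmax;        /* xids at or above xmax are outside the snapshot */
    ToyXid committed[TOY_MAX_COMMITTED];
    size_t ncommitted;
} ToyBuilder;

/* Record a decoded commit; stands in for DecodeCommit->SnapBuildCommitTxn. */
void
toy_commit_seen(ToyBuilder *b, ToyXid xid)
{
    assert(b->ncommitted < TOY_MAX_COMMITTED);
    b->committed[b->ncommitted++] = xid;
}

/*
 * At export time, an xid in [xmin, xmax) without a decoded commit
 * record is treated as still running by the exported snapshot.
 */
bool
toy_exported_as_running(const ToyBuilder *b, ToyXid xid)
{
    if (xid < b->xmin || xid >= b->xmax)
        return false;
    for (size_t i = 0; i < b->ncommitted; i++)
    {
        if (b->committed[i] == xid)
            return false;
    }
    return true;
}
```

With xmin = 723 and xmax = 725, calling toy_commit_seen() only for 723 leaves 724 reported as running, even though 724 has really finished on the server. Once the commit record of 724 is decoded too, it is no longer reported as running.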
--
With Regards,
Amit Kapila.
On Tue, Jan 3, 2023 at 4:44 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
You can also refer to the discussion in the thread [1], which is
related to your question.
[1]: /messages/by-id/c94be044-818f-15e3-1ad3-7a7ae2dfed0a@iki.fi
--
With Regards,
Amit Kapila.
Hello,
I was curious as to why we need the 3rd running_xacts record and wanted
to learn more about it, so I have made a few changes and come up with a
patch that builds the snapshot in 2 running_xacts records. The motive
is to run the tests and see what fails with this approach, in order to
understand why reading a 3rd running_xacts record is needed to build a
consistent snapshot. With this patch, I got one test failure:
test_decoding/twophase_snapshot.
Approach:
When we start building a snapshot, on the first running_xacts record we
move the state from START to BUILDING and wait for all in-progress
transactions to finish. On the second running_xacts record where we
find oldestRunningXid >= the 1st xl_running_xacts record's nextXid, we
move to the CONSISTENT state. This means that all the transactions
started before the BUILDING state have now finished, and all the
transactions currently in progress started after the BUILDING state
and thus have enough info to be decoded.
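The difference between the current transitions and the approach above can be sketched as a small state machine. This is a toy model under stated assumptions, not the code in SnapBuildFindSnapshot(): the Toy* names and the two_stage flag are hypothetical, xid wraparound is ignored, and the shortcut that jumps straight to CONSISTENT when no transactions are running is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Toy stand-in for TransactionId; xid wraparound is ignored here. */
typedef uint32_t ToyXid;

typedef enum ToyState
{
    TOY_START,
    TOY_BUILDING,
    TOY_FULL,
    TOY_CONSISTENT
} ToyState;

/* Only the two fields of a running_xacts record that the model needs. */
typedef struct ToyRunningXacts
{
    ToyXid oldestRunningXid;
    ToyXid nextXid;
} ToyRunningXacts;

typedef struct ToySnapBuild
{
    ToyState state;
    ToyXid   next_phase_at;
} ToySnapBuild;

/*
 * Advance the toy builder by one running_xacts record.
 * two_stage = false models BUILDING -> FULL -> CONSISTENT,
 * two_stage = true models the patch: BUILDING -> CONSISTENT.
 */
void
toy_process_running_xacts(ToySnapBuild *b, const ToyRunningXacts *r,
                          bool two_stage)
{
    assert(r->oldestRunningXid <= r->nextXid);

    if (b->state == TOY_START)
    {
        b->state = TOY_BUILDING;
        b->next_phase_at = r->nextXid;
    }
    else if (b->state == TOY_BUILDING &&
             b->next_phase_at <= r->oldestRunningXid)
    {
        if (two_stage)
        {
            b->state = TOY_CONSISTENT;
            b->next_phase_at = 0;
        }
        else
        {
            b->state = TOY_FULL;
            b->next_phase_at = r->nextXid;
        }
    }
    else if (b->state == TOY_FULL &&
             b->next_phase_at <= r->oldestRunningXid)
    {
        b->state = TOY_CONSISTENT;
        b->next_phase_at = 0;
    }
}
```

Feeding the records (oldestRunningXid, nextXid) = (720, 725), (725, 730), (730, 735) reaches TOY_CONSISTENT after the third record with two_stage = false, and after the second record with two_stage = true.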
Failure analysis for twophase_snapshot test:
After applying the patch, the test case fails because the slot is
created sooner and 'PREPARE TRANSACTION test1' is already returned by
the first 'pg_logical_slot_get_changes' call. The intent of this test
case is to see how a two-phase txn is handled when the snapshot build
completes in 3 stages (BUILDING-->FULL-->CONSISTENT). Originally, the
PREPARED txn is started between the FULL and CONSISTENT stages, and
thus, as per the current code logic, 'DecodePrepare' will skip it.
Please see the code in DecodePrepare:
    /* We can't start streaming unless a consistent state is reached. */
    if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
    {
        ReorderBufferSkipPrepare(ctx->reorder, xid);
        return;
    }
So the first 'pg_logical_slot_get_changes' call will not show these
changes. Once we do 'commit prepared' after the CONSISTENT state is
reached, they will be available for the next
'pg_logical_slot_get_changes' call to consume.
On the other hand, with the current patch we reach the consistent
state sooner, so with the same test case the PREPARED transaction now
ends up starting after the CONSISTENT state and is thus available to
be consumed by the first 'pg_logical_slot_get_changes' call itself.
This makes the test case fail.
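The change in test output can be reduced to a few lines of C. This is a hedged sketch only: PrepState, prep_state_after() and prep_is_skipped() are hypothetical names, and the model assumes that every running_xacts record seen satisfies the condition for moving to the next state.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum PrepState
{
    PREP_BUILDING,
    PREP_FULL,
    PREP_CONSISTENT
} PrepState;

/*
 * State reached after records_seen qualifying running_xacts records
 * (at least one), with or without the two-stage patch.
 */
PrepState
prep_state_after(int records_seen, bool two_stage)
{
    assert(records_seen >= 1);

    if (records_seen == 1)
        return PREP_BUILDING;
    if (two_stage || records_seen >= 3)
        return PREP_CONSISTENT;
    return PREP_FULL;
}

/* Same shape as the DecodePrepare check: skip unless CONSISTENT. */
bool
prep_is_skipped(PrepState state_at_prepare)
{
    return state_at_prepare < PREP_CONSISTENT;
}
```

A PREPARE decoded after two records is skipped without the patch (the state is FULL) but not with it (the state is already CONSISTENT), which matches the difference seen in twophase_snapshot.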
Please note that in the patch, I have kept the 'WAIT for all running
transactions to end' even after reaching the CONSISTENT state. I have
also tried running the tests after removing that WAIT after
CONSISTENT; with that, we get one more test failure, which is
test_decoding/ondisk_startup. The reason for the failure is the same
as in the previous case: since we reach the CONSISTENT state earlier,
slot creation finishes faster, and thus we see a slight change in the
result for this test ('step s1init completed' is seen earlier in the
log file).
Both failing tests are written in such a way that they align with the
3-phase snapshot build process. Otherwise, I do not see any logical
issues yet with this approach based on the test cases available so
far.
So, I still have not gotten clarity on why we need the 3rd
running_xacts record here. In the code, I see a comment in
SnapBuildFindSnapshot() which says "c) ...But for older running
transactions no viable snapshot exists yet, so CONSISTENT will only be
reached once all of those have finished." This comment refers to txns
started between the BUILDING and FULL states. I do not understand it
fully. I am not sure what tests I need to run on the patch to
reproduce this issue, where we do not have a viable snapshot when we
go by two running_xacts records only.
Any thoughts/comments are most welcome. The patch is attached for review.
Thanks
Shveta
Attachments:
v1-0001-SnapBuild-in-2-stages.patch (application/octet-stream)
From 67085b4e79a21beb941b232390efe9d0903f3cfa Mon Sep 17 00:00:00 2001
From: "shveta.malik" <shveta.malik@gmail.com>
Date: Wed, 18 Jan 2023 16:29:13 +0530
Subject: [PATCH v1] SnapBuild in 2 stages
---
src/backend/replication/logical/decode.c | 96 +++++++++++++++++----
src/backend/replication/logical/snapbuild.c | 79 ++++++-----------
src/include/replication/snapbuild.h | 1 +
3 files changed, 105 insertions(+), 71 deletions(-)
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index a53e23c679..edc3b2516d 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -179,13 +179,6 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
XLogReaderState *r = buf->record;
uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
- /*
- * If the snapshot isn't yet fully built, we cannot decode anything, so
- * bail out.
- */
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
- return;
-
switch (info)
{
case XLOG_XACT_COMMIT:
@@ -204,6 +197,16 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
else
xid = parsed.twophase_xid;
+ /*
+ * If the snapshot building is not yet started or we have a
+ * txn for which we do not have enough info, there is no point
+ * in decoding changes yet, so bail out.
+ */
+ if (SnapBuildCurrentState(builder) == SNAPBUILD_START ||
+ (SnapBuildCurrentState(builder) == SNAPBUILD_BUILDING_SNAPSHOT &&
+ TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder))))
+ return;
+
/*
* We would like to process the transaction in a two-phase
* manner iff output plugin supports two-phase commits and
@@ -232,6 +235,16 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
else
xid = parsed.twophase_xid;
+ /*
+ * If the snapshot building is not yet started or we have a
+ * txn for which we do not have enough info, there is no point
+ * in decoding changes yet, so bail out.
+ */
+ if (SnapBuildCurrentState(builder) == SNAPBUILD_START ||
+ (SnapBuildCurrentState(builder) == SNAPBUILD_BUILDING_SNAPSHOT &&
+ TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder))))
+ return;
+
/*
* We would like to process the transaction in a two-phase
* manner iff output plugin supports two-phase commits and
@@ -260,6 +273,16 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
xid = XLogRecGetXid(r);
invals = (xl_xact_invals *) XLogRecGetData(r);
+ /*
+ * If the snapshot building is not yet started or we have a
+ * txn for which we do not have enough info, there is no point
+ * in decoding changes yet, so bail out.
+ */
+ if (SnapBuildCurrentState(builder) == SNAPBUILD_START ||
+ (SnapBuildCurrentState(builder) == SNAPBUILD_BUILDING_SNAPSHOT &&
+ TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder))))
+ return;
+
/*
* Execute the invalidations for xid-less transactions,
* otherwise, accumulate them so that they can be processed at
@@ -291,6 +314,16 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ParsePrepareRecord(XLogRecGetInfo(buf->record),
xlrec, &parsed);
+ /*
+ * If the snapshot building is not yet started or we have a
+ * txn for which we do not have enough info, there is no point
+ * in decoding changes yet, so bail out.
+ */
+ if (SnapBuildCurrentState(builder) == SNAPBUILD_START ||
+ (SnapBuildCurrentState(builder) == SNAPBUILD_BUILDING_SNAPSHOT &&
+ TransactionIdPrecedes(parsed.twophase_xid, SnapBuildNextPhaseAt(builder))))
+ return;
+
/*
* We would like to process the transaction in a two-phase
* manner iff output plugin supports two-phase commits and
@@ -384,11 +417,19 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
/*
- * If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding changes.
+ * if we are just fast-forwarding, there is no point in decoding changes.
*/
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
- ctx->fast_forward)
+ if (ctx->fast_forward)
+ return;
+
+ /*
+ * If the snapshot building is not yet started or we have a txn for which
+ * we do not have enough info, there is no point in decoding changes yet,
+ * so bail out.
+ */
+ if (SnapBuildCurrentState(builder) == SNAPBUILD_START ||
+ (SnapBuildCurrentState(builder) == SNAPBUILD_BUILDING_SNAPSHOT &&
+ TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder))))
return;
switch (info)
@@ -444,11 +485,20 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
/*
- * If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding data changes.
+ * If we are just fast-forwarding, there is no point in decoding data
+ * changes.
+ */
+ if (ctx->fast_forward)
+ return;
+
+ /*
+ * If the snapshot building is not yet started or we have a txn for which
+ * we do not have enough info, there is no point in decoding changes yet,
+ * so bail out.
*/
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
- ctx->fast_forward)
+ if (SnapBuildCurrentState(builder) == SNAPBUILD_START ||
+ (SnapBuildCurrentState(builder) == SNAPBUILD_BUILDING_SNAPSHOT &&
+ TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder))))
return;
switch (info)
@@ -573,11 +623,19 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
/*
- * If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding messages.
+ * If we are just fast-forwarding, there is no point in decoding messages.
+ */
+ if (ctx->fast_forward)
+ return;
+
+ /*
+ * If the snapshot building is not yet started or we have a txn for which
+ * we do not have enough info, there is no point in decoding changes yet,
+ * so bail out.
*/
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
- ctx->fast_forward)
+ if (SnapBuildCurrentState(builder) == SNAPBUILD_START ||
+ (SnapBuildCurrentState(builder) == SNAPBUILD_BUILDING_SNAPSHOT &&
+ TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder))))
return;
message = (xl_logical_message *) XLogRecGetData(r);
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 829c568112..d16d6a522c 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -404,6 +404,15 @@ SnapBuildCurrentState(SnapBuild *builder)
return builder->state;
}
+/*
+ * Txn Id at which next phase of snapshot building will happen
+ */
+TransactionId
+SnapBuildNextPhaseAt(SnapBuild *builder)
+{
+ return builder->next_phase_at;
+}
+
/*
* Return the LSN at which the two-phase decoding was first enabled.
*/
@@ -487,8 +496,6 @@ SnapBuildBuildSnapshot(SnapBuild *builder)
Snapshot snapshot;
Size ssize;
- Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
-
ssize = sizeof(SnapshotData)
+ sizeof(TransactionId) * builder->committed.xcnt
+ sizeof(TransactionId) * 1 /* toplevel xid */ ;
@@ -763,20 +770,15 @@ SnapBuildResetExportedSnapshotState(void)
bool
SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
{
- /*
- * We can't handle data in transactions if we haven't built a snapshot
- * yet, so don't store them.
- */
- if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
- return false;
/*
- * No point in keeping track of changes in transactions that we don't have
- * enough information about to decode. This means that they started before
- * we got into the SNAPBUILD_FULL_SNAPSHOT state.
+ * If the snapshot building is not yet started or we have a txn for which
+ * we do not have enough info, there is no point in decoding changes yet,
+ * so bail out.
*/
- if (builder->state < SNAPBUILD_CONSISTENT &&
- TransactionIdPrecedes(xid, builder->next_phase_at))
+ if (builder->state == SNAPBUILD_START ||
+ (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
+ TransactionIdPrecedes(xid, builder->next_phase_at)))
return false;
/*
@@ -1142,13 +1144,6 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
/* if there's any reason to build a historic snapshot, do so now */
if (needs_snapshot)
{
- /*
- * If we haven't built a complete snapshot yet there's no need to hand
- * it out, it wouldn't (and couldn't) be used anyway.
- */
- if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
- return;
-
/*
* Decrease the snapshot builder's refcount of the old snapshot, note
* that it still will be used if it has been handed out to the
@@ -1331,15 +1326,16 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
* c) First incrementally build a snapshot for catalog tuples
* (BUILDING_SNAPSHOT), that requires all, already in-progress,
* transactions to finish. Every transaction starting after that
- * (FULL_SNAPSHOT state), has enough information to be decoded. But
+ * (BUILDING_SNAPSHOT), has enough information to be decoded. But
* for older running transactions no viable snapshot exists yet, so
- * CONSISTENT will only be reached once all of those have finished.
+ * CONSISTENT will only be reached once all of those (started before
+ * BUILDING_SNAPSHOT) have finished.
* ---
*/
/*
- * xl_running_xacts record is older than what we can use, we might not have
- * all necessary catalog rows anymore.
+ * xl_running_xacts record is older than what we can use, we might not
+ * have all necessary catalog rows anymore.
*/
if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
NormalTransactionIdPrecedes(running->oldestRunningXid,
@@ -1438,22 +1434,22 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
}
/*
- * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
+ * c) transition from BUILDING_SNAPSHOT to SNAPBUILD_CONSISTENT.
*
* In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
- * is >= than nextXid from when we switched to BUILDING_SNAPSHOT. This
- * means all transactions starting afterwards have enough information to
- * be decoded. Switch to FULL_SNAPSHOT.
+ * is >= than nextXid from when we switched to BUILDING_SNAPSHOT. This
+ * means all transactions that are currently in progress have enough
+ * information to be decoded. Switch to SNAPBUILD_CONSISTENT.
*/
else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
TransactionIdPrecedesOrEquals(builder->next_phase_at,
running->oldestRunningXid))
{
- builder->state = SNAPBUILD_FULL_SNAPSHOT;
- builder->next_phase_at = running->nextXid;
+ builder->state = SNAPBUILD_CONSISTENT;
+ builder->next_phase_at = InvalidTransactionId;
ereport(LOG,
- (errmsg("logical decoding found initial consistent point at %X/%X",
+ (errmsg("logical decoding found consistent point at %X/%X",
LSN_FORMAT_ARGS(lsn)),
errdetail("Waiting for transactions (approximately %d) older than %u to end.",
running->xcnt, running->nextXid)));
@@ -1461,27 +1457,6 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
SnapBuildWaitSnapshot(running, running->nextXid);
}
- /*
- * c) transition from FULL_SNAPSHOT to CONSISTENT.
- *
- * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
- * >= than nextXid from when we switched to FULL_SNAPSHOT. This means all
- * transactions that are currently in progress have a catalog snapshot,
- * and all their changes have been collected. Switch to CONSISTENT.
- */
- else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
- TransactionIdPrecedesOrEquals(builder->next_phase_at,
- running->oldestRunningXid))
- {
- builder->state = SNAPBUILD_CONSISTENT;
- builder->next_phase_at = InvalidTransactionId;
-
- ereport(LOG,
- (errmsg("logical decoding found consistent point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("There are no old transactions anymore.")));
- }
-
/*
* We already started to track running xacts and need to wait for all
* in-progress ones to finish. We fall through to the normal processing of
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index f49b941b53..071fca6d3b 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -73,6 +73,7 @@ extern void SnapBuildClearExportedSnapshot(void);
extern void SnapBuildResetExportedSnapshotState(void);
extern SnapBuildState SnapBuildCurrentState(SnapBuild *builder);
+extern TransactionId SnapBuildNextPhaseAt(SnapBuild *builder);
extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder);
extern bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr);
--
2.34.1