Found issues related with logical replication and 2PC

Started by Hayato Kuroda (Fujitsu) over 1 year ago, 14 messages
#1 Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
3 attachment(s)

Hi hackers,

While creating a patch which allows ALTER SUBSCRIPTION SET (two_phase) [1],
we found some issues related to logical replication and two_phase. I think these
can happen not only on HEAD but also on PG14+, but for now I have shared patches for HEAD only.

Issue #1

When handling a PREPARE message, the subscriber used the wrong LSN position
(the end position of the last commit) as the end position of the current prepare.
This can be fixed by adding a new global variable to record the end position of
the last prepare. The 0001 patch fixes the issue.

Issue #2

When the subscriber enables two-phase commit but doesn't set max_prepared_transactions > 0
and a transaction is prepared on the publisher, the apply worker reports an ERROR
on the subscriber. After that, the prepared transaction is not replayed, which
means it's lost forever. The attached script can reproduce the situation.

--
ERROR: prepared transactions are disabled
HINT: Set "max_prepared_transactions" to a nonzero value.
--

The reason is that we advanced the origin progress when aborting the
transaction as well (RecordTransactionAbort->replorigin_session_advance). So,
after setting replorigin_session_origin_lsn, if any ERROR happens when preparing
the transaction, the transaction aborts which incorrectly advances the origin lsn.

The easiest fix is to reset the session replication origin before calling
RecordTransactionAbort(). I think this can happen when 1) LogicalRepApplyLoop()
raises an ERROR or 2) the apply worker exits. The 0002 patch fixes the issue.
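
(Editorial sketch, not part of the original mail.) The failure mode and the proposed reset can be modelled with a small standalone C program. Everything here is a toy stand-in: `ToyOrigin`, `toy_record_abort()`, `toy_reset_origin()` and `toy_failed_prepare()` are hypothetical names that only mimic the roles of replorigin_session_origin_lsn, RecordTransactionAbort() and the reset described above; none of this is PostgreSQL code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t Lsn;            /* stand-in for XLogRecPtr */
#define INVALID_LSN ((Lsn) 0)    /* stand-in for InvalidXLogRecPtr */

/* Toy model of the session origin state and the stored origin progress. */
typedef struct
{
    Lsn session_origin_lsn;      /* set by the apply worker before PREPARE */
    Lsn origin_progress;         /* where replication would resume from */
} ToyOrigin;

/* Models the abort path: progress advances whenever a session LSN is set. */
static void
toy_record_abort(ToyOrigin *o)
{
    assert(o != NULL);
    if (o->session_origin_lsn != INVALID_LSN &&
        o->session_origin_lsn > o->origin_progress)
        o->origin_progress = o->session_origin_lsn;
}

/* Models the proposed fix: forget the session LSN before aborting. */
static void
toy_reset_origin(ToyOrigin *o)
{
    assert(o != NULL);
    o->session_origin_lsn = INVALID_LSN;
}

/* Apply a PREPARE that fails and return the resulting origin progress. */
static Lsn
toy_failed_prepare(Lsn start_progress, Lsn prepare_end_lsn, bool reset_first)
{
    ToyOrigin o = {INVALID_LSN, start_progress};

    o.session_origin_lsn = prepare_end_lsn;   /* set before preparing */
    /* ... ERROR: prepared transactions are disabled ... */
    if (reset_first)
        toy_reset_origin(&o);
    toy_record_abort(&o);
    return o.origin_progress;
}
```

Without the reset, `toy_failed_prepare(100, 200, false)` moves the progress to 200, so the failed transaction would be skipped on retry; with the reset it stays at 100.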

What do you think?

[1]: /messages/by-id/8fab8-65d74c80-1-2f28e880@39088166

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

test_2pc.sh (application/octet-stream)
0001-Add-XactLastPrepareEnd-to-indicate-the-last-PREPARE-.patch (application/octet-stream)
From 3f2e86bd1efbe0707d02705bb641c5629716c598 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 23 Jul 2024 07:38:35 +0000
Subject: [PATCH 1/2] Add XactLastPrepareEnd to indicate the last PREPARE
 record

We are using XactLastCommitEnd to track transaction activities. This approach
works well for regular transactions, but it has been wrongly re-used for prepared
transactions. This commit adds a new global variable, XactLastPrepareEnd, to
track more appropriately.

Originally reported by Wang Wei
---
 src/backend/access/transam/twophase.c     |  3 +++
 src/backend/access/transam/xlog.c         |  1 +
 src/backend/replication/logical/worker.c  | 10 +++++-----
 src/include/access/xlog.h                 |  1 +
 src/include/replication/worker_internal.h |  4 ++--
 5 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 9a8257fcaf..2d7e15c2c3 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1269,6 +1269,9 @@ EndPrepare(GlobalTransaction gxact)
 	 */
 	SyncRepWaitForLSN(gxact->prepare_end_lsn, false);
 
+	/* remember end of last prepare record */
+	XactLastPrepareEnd = gxact->prepare_end_lsn;
+
 	records.tail = records.head = NULL;
 	records.num_chunks = 0;
 }
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 473a9c5c2f..3dfae5e7d9 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -251,6 +251,7 @@ static int	LocalXLogInsertAllowed = -1;
 XLogRecPtr	ProcLastRecPtr = InvalidXLogRecPtr;
 XLogRecPtr	XactLastRecEnd = InvalidXLogRecPtr;
 XLogRecPtr	XactLastCommitEnd = InvalidXLogRecPtr;
+XLogRecPtr	XactLastPrepareEnd = InvalidXLogRecPtr;
 
 /*
  * RedoRecPtr is this backend's local copy of the REDO record pointer
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c0bda6269b..46f7a5c3a5 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1136,7 +1136,7 @@ apply_handle_prepare(StringInfo s)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
-	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
+	store_flush_position(prepare_data.end_lsn, XactLastPrepareEnd);
 
 	in_remote_transaction = false;
 
@@ -1193,7 +1193,7 @@ apply_handle_commit_prepared(StringInfo s)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
-	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
+	store_flush_position(prepare_data.end_lsn, XactLastPrepareEnd);
 	in_remote_transaction = false;
 
 	/* Process any tables that are being synchronized in parallel. */
@@ -1309,7 +1309,7 @@ apply_handle_stream_prepare(StringInfo s)
 
 			CommitTransactionCommand();
 
-			store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
+			store_flush_position(prepare_data.end_lsn, XactLastPrepareEnd);
 
 			in_remote_transaction = false;
 
@@ -1367,7 +1367,7 @@ apply_handle_stream_prepare(StringInfo s)
 
 			CommitTransactionCommand();
 
-			MyParallelShared->last_commit_end = XactLastCommitEnd;
+			MyParallelShared->last_commit_end = XactLastPrepareEnd;
 
 			pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
 			pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
@@ -3447,7 +3447,7 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 	/* Need to do this in permanent context */
 	MemoryContextSwitchTo(ApplyContext);
 
-	/* Track commit lsn  */
+	/* Track record lsn  */
 	flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
 	flushpos->local_end = local_lsn;
 	flushpos->remote_end = remote_lsn;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index c40fd56b29..9530338209 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -32,6 +32,7 @@ extern PGDLLIMPORT int wal_sync_method;
 extern PGDLLIMPORT XLogRecPtr ProcLastRecPtr;
 extern PGDLLIMPORT XLogRecPtr XactLastRecEnd;
 extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd;
+extern PGDLLIMPORT XLogRecPtr XactLastPrepareEnd;
 
 /* these variables are GUC parameters related to XLOG */
 extern PGDLLIMPORT int wal_segment_size;
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 515aefd519..bdc73d2374 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -163,8 +163,8 @@ typedef struct ParallelApplyWorkerShared
 	pg_atomic_uint32 pending_stream_count;
 
 	/*
-	 * XactLastCommitEnd from the parallel apply worker. This is required by
-	 * the leader worker so it can update the lsn_mappings.
+	 * XactLastCommitEnd or XactLastPrepareEnd from the parallel apply worker.
+	 * This is required by the leader worker so it can update the lsn_mappings.
 	 */
 	XLogRecPtr	last_commit_end;
 
-- 
2.43.0

0002-Prevent-origin-progress-advancement-if-failed-to-app.patch (application/octet-stream)
From 2a9649307e8537835b507e53e5a791811020d6e0 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 23 Jul 2024 08:40:29 +0000
Subject: [PATCH 2/2] Prevent origin progress advancement if failed to apply a
 transaction

The origin progress is advanced when aborting a transaction, intended to
signify the successful streaming and application of the ROLLBACK from the
publisher to the subscriber in streaming parallel mode. But when an error
occurred during the commit or prepare after setting
replorigin_session_origin_lsn, the origin progress was advanced as well which
is unexpected. This led to skipped transactions that were not replicated again.

Fix it by resetting replorigin_session_origin_lsn in case of error.

Originally reported by Hou Zhijie
---
 src/backend/replication/logical/worker.c  | 28 +++++++++++++++++++++++
 src/include/replication/worker_internal.h |  3 ++-
 src/test/subscription/t/021_twophase.pl   | 14 +++++++++++-
 3 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 46f7a5c3a5..9a1dbead60 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4430,6 +4430,14 @@ start_apply(XLogRecPtr origin_startpos)
 	}
 	PG_CATCH();
 	{
+		/*
+		 * Reset the origin data to prevent the advancement of origin progress
+		 * if the transaction failed to apply.
+		 */
+		replorigin_session_origin = InvalidRepOriginId;
+		replorigin_session_origin_lsn = InvalidXLogRecPtr;
+		replorigin_session_origin_timestamp = 0;
+
 		if (MySubscription->disableonerr)
 			DisableSubscriptionAndExit();
 		else
@@ -4640,6 +4648,19 @@ InitializeLogRepWorker(void)
 	CommitTransactionCommand();
 }
 
+/*
+ * Reset the origin state. This is needed to prevent the advancement of origin
+ * progress if the transaction failed to apply.
+ */
+static void
+replorigin_reset(int code, Datum arg)
+{
+	replorigin_session_origin = InvalidRepOriginId;
+	replorigin_session_origin_lsn = InvalidXLogRecPtr;
+	replorigin_session_origin_timestamp = 0;
+}
+
+
 /* Common function to setup the leader apply or tablesync worker. */
 void
 SetupApplyOrSyncWorker(int worker_slot)
@@ -4668,6 +4689,13 @@ SetupApplyOrSyncWorker(int worker_slot)
 
 	InitializeLogRepWorker();
 
+	/*
+	 * Register a callback to reset the origin state before aborting the
+	 * transaction in ShutdownPostgres(). This is to prevent the advancement
+	 * of origin progress if the transaction failed to apply.
+	 */
+	before_shmem_exit(replorigin_reset, (Datum) 0);
+
 	/* Connect to the origin and start the replication. */
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index bdc73d2374..0bf6f737f4 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -164,7 +164,8 @@ typedef struct ParallelApplyWorkerShared
 
 	/*
 	 * XactLastCommitEnd or XactLastPrepareEnd from the parallel apply worker.
-	 * This is required by the leader worker so it can update the lsn_mappings.
+	 * This is required by the leader worker so it can update the
+	 * lsn_mappings.
 	 */
 	XLogRecPtr	last_commit_end;
 
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index 9437cd4c3b..e635be74c6 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -23,7 +23,7 @@ $node_publisher->start;
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
 $node_subscriber->append_conf('postgresql.conf',
-	qq(max_prepared_transactions = 10));
+	qq(max_prepared_transactions = 0));
 $node_subscriber->start;
 
 # Create some pre-existing content on publisher
@@ -67,12 +67,24 @@ $node_subscriber->poll_query_until('postgres', $twophase_query)
 # then COMMIT PREPARED
 ###############################
 
+# Save the log location, to see the failure of the application
+my $log_location = -s $node_subscriber->logfile;
+
 $node_publisher->safe_psql(
 	'postgres', "
 	BEGIN;
 	INSERT INTO tab_full VALUES (11);
 	PREPARE TRANSACTION 'test_prepared_tab_full';");
 
+# Confirm the ERROR is reported because max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
 $node_publisher->wait_for_catchup($appname);
 
 # check that transaction is in prepared state on subscriber
-- 
2.43.0

#2 shveta malik
shveta.malik@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#1)
Re: Found issues related with logical replication and 2PC

On Wed, Jul 24, 2024 at 12:25 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

Hi hackers,

While creating a patch which allows ALTER SUBSCRIPTION SET (two_phase) [1],
we found some issues related with logical replication and two_phase. I think this
can happen not only HEAD but PG14+, but for now I shared patches for HEAD.

Issue #1

When handling a PREPARE message, the subscriber mistook the wrong lsn position
(the end position of the last commit) as the end position of the current prepare.
This can be fixed by adding a new global variable to record the end position of
the last prepare. 0001 patch fixes the issue.

Thanks for the patches. I have started reviewing this. I reviewed and
tested patch001 alone.

I have a question: shouldn't the local-lsn stored in
apply_handle_commit_prepared() be the end position of
'COMMIT_PREPARED' instead of 'PREPARE'? I put additional logging on
the subscriber and got this:

LOG: apply_handle_prepare - prepare_data.end_lsn: 0/15892E0 ,
XactLastPrepareEnd: 0/1537FD8.
LOG: apply_handle_commit_prepared - prepare_data.end_lsn: 0/1589318
, XactLastPrepareEnd: 0/1537FD8.

In apply_handle_prepare(), remote-lsn ('0/15892E0') is the end position of
'PREPARE' and in apply_handle_commit_prepared(), remote-lsn
('0/1589318') is the end position of 'COMMIT_PREPARED', while local-lsn in
both cases is the end-lsn of 'PREPARE'. Details at [1].

Shouldn't we use 'XactLastCommitEnd' in apply_handle_commit_prepared()
which is the end position of last COMMIT_PREPARED? It is assigned in
the below flow:
apply_handle_commit_prepared-->CommitTransactionCommand...->RecordTransactionCommit?

Please let me know if I have misunderstood.

[1]:

Pub:
------
SELECT * FROM pg_get_wal_record_info('0/15892E0');
start_lsn | end_lsn | prev_lsn | record_type
---------+-----------+-----------+-----------------
0/15892E0 | 0/1589318 | 0/15891E8 | COMMIT_PREPARED

--see prev_lsn
SELECT * FROM pg_get_wal_record_info('0/15891E8');
start_lsn | end_lsn | prev_lsn | record_type
---------+-----------+-----------+-------------
0/15891E8 | 0/15892E0 | 0/15891A8 | PREPARE

SELECT * FROM pg_get_wal_record_info('0/1589318');
start_lsn | end_lsn | prev_lsn | record_type
---------+-----------+-----------+---------------
0/1589318 | 0/1589350 | 0/15892E0 | RUNNING_XACTS

--see prev_lsn
SELECT * FROM pg_get_wal_record_info('0/15892E0');
start_lsn | end_lsn | prev_lsn | record_type
---------+-----------+-----------+-----------------
0/15892E0 | 0/1589318 | 0/15891E8 | COMMIT_PREPARED

Sub:
------
SELECT * FROM pg_get_wal_record_info('0/1537FD8');
start_lsn | end_lsn | prev_lsn | record_type
---------+-----------+-----------+------------------
0/1537FD8 | 0/1538030 | 0/1537ED0 | COMMIT_PREPARED

--see prev_lsn:
SELECT * FROM pg_get_wal_record_info('0/1537ED0');
start_lsn | end_lsn | prev_lsn |record_type
---------+-----------+-----------+------------
0/1537ED0 | 0/1537FD8 | 0/1537E90 |PREPARE
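
(Editorial sketch, not part of the original mail.) The LSNs quoted above can be compared numerically once the textual `hi/lo` form is converted to a 64-bit value. The helper below is a hypothetical standalone function, `parse_lsn()`, written only for illustration; it is not a PostgreSQL API.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Parse the textual "hi/lo" LSN form (two hexadecimal halves) into one
 * 64-bit value. Returns 0 when the text does not match that shape.
 */
static uint64_t
parse_lsn(const char *text)
{
    unsigned int hi;
    unsigned int lo;

    assert(text != NULL);
    if (sscanf(text, "%X/%X", &hi, &lo) != 2)
        return 0;
    return ((uint64_t) hi << 32) | lo;
}

/* True when lsn lies strictly before the given record end position. */
static int
lsn_before(const char *lsn, const char *record_end)
{
    return parse_lsn(lsn) < parse_lsn(record_end);
}
```

With the subscriber values above, `lsn_before("0/1537FD8", "0/1538030")` holds: the logged XactLastPrepareEnd (0/1537FD8) is where the subscriber's PREPARE record ends and its COMMIT_PREPARED record starts, while the COMMIT_PREPARED record itself ends later, at 0/1538030.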

thanks
Shveta

#3 Amit Kapila
amit.kapila16@gmail.com
In reply to: shveta malik (#2)
Re: Found issues related with logical replication and 2PC

On Wed, Aug 7, 2024 at 12:38 PM shveta malik <shveta.malik@gmail.com> wrote:

On Wed, Jul 24, 2024 at 12:25 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

Issue #1

When handling a PREPARE message, the subscriber mistook the wrong lsn position
(the end position of the last commit) as the end position of the current prepare.
This can be fixed by adding a new global variable to record the end position of
the last prepare. 0001 patch fixes the issue.

Thanks for the patches. I have started reviewing this. I reviewed and
tested patch001 alone.

It makes sense. As both are different bugs we should discuss them separately.

I have a query, shouldn't the local-lsn stored in
apply_handle_commit_prepared() be the end position of
'COMMIT_PREPARED' instead of 'PREPARE'? I put additional logging on
sub and got this:

LOG: apply_handle_prepare - prepare_data.end_lsn: 0/15892E0 ,
XactLastPrepareEnd: 0/1537FD8.
LOG: apply_handle_commit_prepared - prepare_data.end_lsn: 0/1589318
, XactLastPrepareEnd: 0/1537FD8.

In apply_handle_prepare(), remote-lsn ('0/15892E0') is end position of
'PREPARE' and in apply_handle_commit_prepared(), remote-lsn
('0/1589318') is end position of 'COMMIT_PREPARED', while local-lsn in
both cases is end-lsn of 'PREPARE'. Details at [1].

Shouldn't we use 'XactLastCommitEnd' in apply_handle_commit_prepared()
which is the end position of last COMMIT_PREPARED? It is assigned in
the below flow:
apply_handle_commit_prepared-->CommitTransactionCommand...->RecordTransactionCommit?

I also think so. Additionally, I feel a test case (or some description
of the bug that can arise) should be provided for issue-1.

--
With Regards,
Amit Kapila.

#4 Amit Kapila
amit.kapila16@gmail.com
In reply to: Amit Kapila (#3)
Re: Found issues related with logical replication and 2PC

On Wed, Aug 7, 2024 at 3:32 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

I also think so. Additionally, I feel a test case (or some description
of the bug that can arise) should be provided for issue-1.

IIUC, the problem could be that we would end up updating the wrong
local_end LSN in lsn_mappings via store_flush_position(). Then
get_flush_position() could end up computing the wrong flush position
and send the confirmation of flush to the publisher even when it is
not flushed. This ideally could lead to a situation where the prepared
transaction is not flushed to disk on the subscriber and because
publisher would have gotten the confirmation earlier than required, it
won't send the prepared transaction again. I think this theory is not
true for prepare transactions because we always flush WAL of prepare
even for asynchronous commit mode. See EndPrepare(). So, if my
analysis is correct, this shouldn't be a bug and ideally, we should
update local_end LSN as InvalidXLogRecPtr and add appropriate
comments.
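
(Editorial sketch, not part of the original mail.) The reasoning above can be illustrated with a simplified model of the flush-position computation. `ToyFlushPos`, `toy_flush_position()` and `toy_after_prepare()` are hypothetical names; the real get_flush_position() works on a list and also reports write positions, which this sketch ignores.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t Lsn;            /* stand-in for XLogRecPtr */
#define INVALID_LSN ((Lsn) 0)    /* stand-in for InvalidXLogRecPtr */

/* One local_end/remote_end pair, as recorded by store_flush_position(). */
typedef struct
{
    Lsn local_end;
    Lsn remote_end;
} ToyFlushPos;

/*
 * Return the newest remote_end that may be confirmed as flushed: walk the
 * mappings in apply order and stop at the first entry whose local_end is
 * beyond what has been flushed locally.
 */
static Lsn
toy_flush_position(const ToyFlushPos *map, size_t n, Lsn local_flush)
{
    Lsn flushed = INVALID_LSN;

    assert(map != NULL || n == 0);
    for (size_t i = 0; i < n; i++)
    {
        if (map[i].local_end > local_flush)
            break;
        flushed = map[i].remote_end;
    }
    return flushed;
}

/* An earlier commit, then a PREPARE stored with the given local_end. */
static Lsn
toy_after_prepare(Lsn prepare_local_end, Lsn local_flush)
{
    const ToyFlushPos map[] = {
        {100, 1000},                /* earlier commit */
        {prepare_local_end, 2000},  /* PREPARE, remote end 2000 */
    };

    return toy_flush_position(map, 2, local_flush);
}
```

In this model, storing either the stale commit end (100) or the invalid value (0) as the PREPARE's local_end lets remote position 2000 be confirmed as soon as the earlier commit is flushed. That is only safe because, as noted above, the prepare record is always flushed in EndPrepare().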

--
With Regards,
Amit Kapila.

#5 shveta malik
shveta.malik@gmail.com
In reply to: Amit Kapila (#4)
Re: Found issues related with logical replication and 2PC

On Wed, Aug 7, 2024 at 5:43 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Aug 7, 2024 at 3:32 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

I also think so. Additionally, I feel a test case (or some description
of the bug that can arise) should be provided for issue-1.

IIUC, the problem could be that we would end up updating the wrong
local_end LSN in lsn_mappings via store_flush_position(). Then
get_flush_position() could end up computing the wrong flush position
and send the confirmation of flush to the publisher even when it is
not flushed. This ideally could lead to a situation where the prepared
transaction is not flushed to disk on the subscriber and because
publisher would have gotten the confirmation earlier than required, it
won't send the prepared transaction again.

Yes, that is what my understanding was.

I think this theory is not
true for prepare transactions because we always flush WAL of prepare
even for asynchronous commit mode. See EndPrepare().

Okay, I was not aware of this. Thanks for explaining.

So, if my
analysis is correct, this shouldn't be a bug and ideally, we should
update local_end LSN as InvalidXLogRecPtr and add appropriate
comments.

Okay, we can do that. Then get_flush_position() can also be changed to
*explicitly* deal with the case where local_end is InvalidXLogRecPtr.
Having said that, even though it is not a bug, shouldn't we still have
the correct mapping updated in lsn_mapping? When remote_end is PREPARE
Or COMMIT_PREPARED, local_end should also point to the same?

thanks
Shveta

#6 Amit Kapila
amit.kapila16@gmail.com
In reply to: shveta malik (#5)
Re: Found issues related with logical replication and 2PC

On Thu, Aug 8, 2024 at 8:54 AM shveta malik <shveta.malik@gmail.com> wrote:

On Wed, Aug 7, 2024 at 5:43 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

So, if my
analysis is correct, this shouldn't be a bug and ideally, we should
update local_end LSN as InvalidXLogRecPtr and add appropriate
comments.

Okay, we can do that. Then get_flush_position() can also be changed to
*explicitly* deal with the case where local_end is InvalidXLogRecPtr.

AFAICS, it should be handled without any change as the value of
InvalidXLogRecPtr is 0. So, it should be less than or equal to the
local_flush position.

Having said that, even though it is not a bug, shouldn't we still have
the correct mapping updated in lsn_mapping? When remote_end is PREPARE
Or COMMIT_PREPARED, local_end should also point to the same?

Ideally yes, but introducing a new global variable just for this
purpose doesn't sound advisable. We can add in comments that in the
future, if adding such a variable serves some purpose then we can
surely extend the functionality.

--
With Regards,
Amit Kapila.

#7 Amit Kapila
amit.kapila16@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#1)
Re: Found issues related with logical replication and 2PC

On Wed, Jul 24, 2024 at 12:25 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

While creating a patch which allows ALTER SUBSCRIPTION SET (two_phase) [1],
we found some issues related with logical replication and two_phase. I think this
can happen not only HEAD but PG14+, but for now I shared patches for HEAD.

Issue #1

When handling a PREPARE message, the subscriber mistook the wrong lsn position
(the end position of the last commit) as the end position of the current prepare.
This can be fixed by adding a new global variable to record the end position of
the last prepare. 0001 patch fixes the issue.

Issue #2

When the subscriber enables two-phase commit but doesn't set max_prepared_transactions > 0
and a transaction is prepared on the publisher, the apply worker reports an ERROR
on the subscriber. After that, the prepared transaction is not replayed, which
means it's lost forever. Attached script can emulate the situation.

--
ERROR: prepared transactions are disabled
HINT: Set "max_prepared_transactions" to a nonzero value.
--

The reason is that we advanced the origin progress when aborting the
transaction as well (RecordTransactionAbort->replorigin_session_advance). So,
after setting replorigin_session_origin_lsn, if any ERROR happens when preparing
the transaction, the transaction aborts which incorrectly advances the origin lsn.

An easiest fix is to reset session replication origin before calling the
RecordTransactionAbort(). I think this can happen when 1) LogicalRepApplyLoop()
raises an ERROR or 2) apply worker exits. 0002 patch fixes the issue.

Can we start a separate thread to issue 2? I understand that this one
is also related to two_phase but since both are different issues it is
better to discuss in separate threads. This will also help us refer to
the discussion in future if required.

BTW, why did the 0002 patch change the below code:
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -164,7 +164,8 @@ typedef struct ParallelApplyWorkerShared
  /*
  * XactLastCommitEnd or XactLastPrepareEnd from the parallel apply worker.
- * This is required by the leader worker so it can update the lsn_mappings.
+ * This is required by the leader worker so it can update the
+ * lsn_mappings.
  */
  XLogRecPtr last_commit_end;

--
With Regards,
Amit Kapila.

#8 Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Amit Kapila (#7)
RE: Found issues related with logical replication and 2PC

Dear Amit,

Can we start a separate thread to issue 2? I understand that this one
is also related to two_phase but since both are different issues it is
better to discuss in separate threads. This will also help us refer to
the discussion in future if required.

You are right, we should discuss one topic per thread. Forked: [1].

BTW, why did the 0002 patch change the below code:
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -164,7 +164,8 @@ typedef struct ParallelApplyWorkerShared
/*
* XactLastCommitEnd or XactLastPrepareEnd from the parallel apply worker.
- * This is required by the leader worker so it can update the lsn_mappings.
+ * This is required by the leader worker so it can update the
+ * lsn_mappings.
*/
XLogRecPtr last_commit_end;

Oops. The fixed version is posted in [1].

[1]: /messages/by-id/TYAPR01MB5692FAC23BE40C69DA8ED4AFF5B92@TYAPR01MB5692.jpnprd01.prod.outlook.com

Best regards,
Hayato Kuroda
FUJITSU LIMITED

#9 shveta malik
shveta.malik@gmail.com
In reply to: Amit Kapila (#6)
Re: Found issues related with logical replication and 2PC

On Thu, Aug 8, 2024 at 9:53 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Aug 8, 2024 at 8:54 AM shveta malik <shveta.malik@gmail.com> wrote:

On Wed, Aug 7, 2024 at 5:43 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

So, if my
analysis is correct, this shouldn't be a bug and ideally, we should
update local_end LSN as InvalidXLogRecPtr and add appropriate
comments.

Okay, we can do that. Then get_flush_position() can also be changed to
*explicitly* deal with the case where local_end is InvalidXLogRecPtr.

AFAICS, it should be handled without any change as the value of
InvalidXLogRecPtr is 0. So, it should be less than equal to the
local_flush position.

Yes, the existing code will work, no doubt about that. But generally we
explicitly use XLogRecPtrIsInvalid if we need to include or exclude
lsn=0 in some logic. We do not consider a 0 lsn for comparisons like
the one we currently have in get_flush_position. That is why I suggested an
explicit check. But, yes, the current code will work.
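
(Editorial sketch, not part of the original mail.) The two styles being discussed can be put side by side. The names below (`LSN_IS_INVALID`, `flushed_implicit()`, `flushed_explicit()`) are hypothetical stand-ins; only the idea that the invalid LSN is 0 comes from the thread.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;                       /* stand-in for XLogRecPtr */
#define INVALID_LSN ((Lsn) 0)               /* stand-in for InvalidXLogRecPtr */
#define LSN_IS_INVALID(l) ((l) == INVALID_LSN)

/* Relies on the invalid LSN being 0, so it compares <= any flush position. */
static bool
flushed_implicit(Lsn local_end, Lsn local_flush)
{
    return local_end <= local_flush;
}

/* Spells out the "no local position recorded" case before comparing. */
static bool
flushed_explicit(Lsn local_end, Lsn local_flush)
{
    if (LSN_IS_INVALID(local_end))
        return true;
    return local_end <= local_flush;
}

/* Both forms give the same answer; the explicit one only documents intent. */
static void
check_forms_agree(Lsn local_end, Lsn local_flush)
{
    assert(flushed_implicit(local_end, local_flush) ==
           flushed_explicit(local_end, local_flush));
}
```

Since both functions return the same result for every input, the choice between them is about readability rather than behaviour.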

Having said that, even though it is not a bug, shouldn't we still have
the correct mapping updated in lsn_mapping? When remote_end is PREPARE
Or COMMIT_PREPARED, local_end should also point to the same?

Ideally yes, but introducing a new global variable just for this
purpose doesn't sound advisable. We can add in comments that in the
future, if adding such a variable serves some purpose then we can
surely extend the functionality.

Okay. Sounds reasonable.

thanks
Shveta

#10 Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Amit Kapila (#6)
1 attachment(s)
RE: Found issues related with logical replication and 2PC

Dear Amit, Shveta,

Thanks for discussing!

I reported the issue because 1) I feared the risk of data loss and 2) simply
because the coding looked incorrect. However, per discussion, I understood that
it wouldn't lead to loss, and adding a global variable was unacceptable in this
case. I modified the patch completely.

The attached patch avoids using XactLastCommitEnd as the local_lsn while applying
PREPARE. get_flush_position() was not changed. Also, it contains changes that
have not been discussed yet:

- Set last_commit_end to InvalidXLogRecPtr in the PREPARE case.
This gives the same result as when the stream option is not "parallel."
- XactLastCommitEnd was replaced even in the ROLLBACK PREPARED case.
Since the ROLLBACK PREPARED record is flushed in RecordTransactionAbortPrepared(),
there is no need to ensure the WAL must be sent.

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

v2-0001-Not-to-store-the-flush-position-of-the-PREPARE-re.patch (application/octet-stream)
From 4933bd730d40a4e9ca5c4a427526d91d12e70fa5 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Thu, 8 Aug 2024 08:06:30 +0000
Subject: [PATCH v2] Not to store the flush position of the PREPARE record by
 using the last commit position

Previously, the subscriber mistook the wrong lsn position
(the end position of the last commit) as the end position of the current prepare.
This does not lead to data loss, but use a special value (InvalidXLogRecPtr) instead.
---
 src/backend/replication/logical/worker.c | 26 ++++++++++++++++++++----
 1 file changed, 22 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 6dc54c7283..58710e4c20 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1133,7 +1133,16 @@ apply_handle_prepare(StringInfo s)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
-	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
+	/*
+	 * Here, we set InvalidXLogRecPtr as the last flushed WAL location because
+	 * we do not have a way to detect the actual value. Even when there are no
+	 * pending transactions while sending feedback so that all the received
+	 * changes are regarded as flushed, there is no risk that prepared
+	 * transactions are lost. Because prepared transactions have already been
+	 * flushed at the end of PREPARE applications. When a variable tracks the
+	 * last PREPARE record, we can use it as local_lsn.
+	 */
+	store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
 
 	in_remote_transaction = false;
 
@@ -1251,7 +1260,7 @@ apply_handle_rollback_prepared(StringInfo s)
 
 	pgstat_report_stat(false);
 
-	store_flush_position(rollback_data.rollback_end_lsn, XactLastCommitEnd);
+	store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
 	in_remote_transaction = false;
 
 	/* Process any tables that are being synchronized in parallel. */
@@ -1306,7 +1315,12 @@ apply_handle_stream_prepare(StringInfo s)
 
 			CommitTransactionCommand();
 
-			store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
+			/*
+			 * We set InvalidXLogRecPtr as the last flushed WAL location
+			 * because we do not have a way to detect the actual value. See
+			 * comments in apply_handle_prepare().
+			 */
+			store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
 
 			in_remote_transaction = false;
 
@@ -1364,7 +1378,11 @@ apply_handle_stream_prepare(StringInfo s)
 
 			CommitTransactionCommand();
 
-			MyParallelShared->last_commit_end = XactLastCommitEnd;
+			/*
+			 * InvalidXLogRecPtr as the last flushed WAL location. See comments
+			 * in apply_handle_prepare().
+			 */
+			MyParallelShared->last_commit_end = InvalidXLogRecPtr;
 
 			pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
 			pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
-- 
2.43.0

#11 Amit Kapila
amit.kapila16@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#10)
1 attachment(s)
Re: Found issues related with logical replication and 2PC

On Thu, Aug 8, 2024 at 2:37 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

Thanks for discussing!

I reported the issue because 1) I feared the risk of data loss and 2) simply
because the coding looked incorrect. However, per discussion, I understood that
it wouldn't lead to loss, and adding a global variable was unacceptable in this
case. I modified the patch completely.

The attached patch avoids using the LastCommitLSN as the local_lsn while applying
PREPARE. get_flush_position() was not changed. Also, it contains changes that
have not been discussed yet:

- Set last_commit_end to InvalidXLogRecPtr in the PREPARE case.
This gives the same result as when the stream option is not "parallel."
- XactLastCommitEnd was replaced even in the ROLLBACK PREPARED case.
Since the ROLLBACK PREPARED record is flushed in RecordTransactionAbortPrepared(),
there is no need to ensure the WAL must be sent.

The code changes look mostly good to me. I have changed/added a few
comments in the attached modified version.

--
With Regards,
Amit Kapila.

Attachments:

v3-0001-Change-the-misleading-local-end_lsn-for-prepared-.patch (application/octet-stream)
From 0d03cc3380d4904b0a29c10fc740b9ea4888bc48 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Thu, 8 Aug 2024 08:06:30 +0000
Subject: [PATCH v4] Change the misleading local end_lsn for prepared
 transactions.

The apply worker was using XactLastCommitEnd as local end_lsn for applying
prepare and rollback_prepare. The XactLastCommitEnd value is the end lsn
of the last commit applied before the prepare transaction which makes no
sense. This LSN is used to decide whether we can send the acknowledgment
of the corresponding remote LSN to the server.

It is okay not to set the local_end LSN with the actual WAL position for
the prepare because we always flush the prepare record. So, we can send
the acknowledgment of the remote_end LSN as soon as prepare is finished.

The current code is misleading but as such doesn't create any problem, so
decided not to backpatch.

Author: Hayato Kuroda
Reviewed-by: Shveta Malik, Amit Kapila
Discussion: https://postgr.es/m/TYAPR01MB5692FA4926754B91E9D7B5F0F5AA2@TYAPR01MB5692.jpnprd01.prod.outlook.com
---
 src/backend/replication/logical/worker.c | 31 +++++++++++++++++++++---
 1 file changed, 27 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 6dc54c7283..da4e1e21fa 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1133,7 +1133,17 @@ apply_handle_prepare(StringInfo s)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
-	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
+	/*
+	 * It is okay not to set the local_end LSN for the prepare because we always
+	 * flush the prepare record. So, we can send the acknowledgment of the
+	 * remote_end LSN as soon as prepare is finished.
+	 *
+	 * XXX For the sake of consistency with commit, we could have set it with
+	 * the LSN of prepare but as of now we don't track that value similar to
+	 * XactLastCommitEnd, and adding it for this purpose doesn't seem worth
+	 * it.
+	 */
+	store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
 
 	in_remote_transaction = false;
 
@@ -1251,7 +1261,12 @@ apply_handle_rollback_prepared(StringInfo s)
 
 	pgstat_report_stat(false);
 
-	store_flush_position(rollback_data.rollback_end_lsn, XactLastCommitEnd);
+	/*
+	 * It is okay not to set the local_end LSN for the rollback of prepared
+	 * transaction because we always flush the WAL record for it. See
+	 * apply_handle_prepare.
+	 */
+	store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
 	in_remote_transaction = false;
 
 	/* Process any tables that are being synchronized in parallel. */
@@ -1306,7 +1321,11 @@ apply_handle_stream_prepare(StringInfo s)
 
 			CommitTransactionCommand();
 
-			store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
+			 /*
+			  * It is okay not to set the local_end LSN for the prepare because we
+			  * always flush the prepare record. See apply_handle_prepare.
+			  */
+			store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
 
 			in_remote_transaction = false;
 
@@ -1364,7 +1383,11 @@ apply_handle_stream_prepare(StringInfo s)
 
 			CommitTransactionCommand();
 
-			MyParallelShared->last_commit_end = XactLastCommitEnd;
+			/*
+			 * It is okay not to set the local_end LSN for the prepare because we
+			 * always flush the prepare record. See apply_handle_prepare.
+			 */
+			MyParallelShared->last_commit_end = InvalidXLogRecPtr;
 
 			pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
 			pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
-- 
2.28.0.windows.1

#12shveta malik
shveta.malik@gmail.com
In reply to: Amit Kapila (#11)
Re: Found issues related with logical replication and 2PC

On Thu, Aug 8, 2024 at 5:53 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Aug 8, 2024 at 2:37 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

Thanks for discussing!

I reported the issue because 1) I feared the risk of data loss and 2) simply
because the coding looked incorrect. However, per discussion, I understood that
it wouldn't lead to loss, and adding a global variable was unacceptable in this
case. I modified the patch completely.

The attached patch avoids using XactLastCommitEnd as the local_lsn while applying
PREPARE. get_flush_position() was not changed. Also, it contains changes that
have not been discussed yet:

- Set last_commit_end to InvalidXLogRecPtr in the PREPARE case.
This causes the same result as when the stream option is not "parallel."
- XactLastCommitEnd was also replaced in the ROLLBACK PREPARED case.
Since the ROLLBACK PREPARED record is flushed in RecordTransactionAbortPrepared(),
there is no need to separately ensure that the WAL has been flushed.

The code changes look mostly good to me. I have changed/added a few
comments in the attached modified version.

Code changes with Amit's correction patch look good to me.

thanks
Shveta

#13Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Amit Kapila (#11)
1 attachment(s)
RE: Found issues related with logical replication and 2PC

Dear Amit,

The code changes look mostly good to me. I have changed/added a few
comments in the attached modified version.

Thanks for updating the patch! It LGTM. I've tested your patch and confirmed
that it does not cause data loss. I used a source tree with v3 applied, plus an
additional fix to log the replication command [1].

Method
======

1. Construct a logical replication system with two_phase = true and
synchronous_commit = false.
2. Attach to the walwriter of the subscriber to stop the process.
3. Start a transaction and prepare it on the publisher.
4. Wait until the worker replies to the publisher.
5. Stop the subscriber.
6. Restart the subscriber.
7. Do COMMIT PREPARED.

The attached script can construct the same situation.

Result
======

After step 5, I ran pg_waldump and confirmed that the PREPARE record existed on
the subscriber.

```
$ pg_waldump data_sub/pg_wal/000000010000000000000001
...
rmgr: Transaction len..., desc: PREPARE gid pg_gid_16389_741: ...
rmgr: XLOG len..., desc: CHECKPOINT_SHUTDOWN ...
```

Also, after step 7, I confirmed that only the COMMIT PREPARED record
was sent, because the log contained the line below. "75" is the ASCII code of the
character 'K', which indicates that the replication message corresponds to
COMMIT PREPARED.
```
LOG: XXX got message 75
```

Additionally, I did another test, which is basically the same as above, except
that 1) XLogFlush() in EndPrepare() was commented out and 2) kill -9 was used at
step 5 to emulate a crash. Since the PREPAREd transaction cannot survive on the
subscriber in this case, the COMMIT PREPARED command on the publisher causes an
ERROR on the subscriber.
```
ERROR: prepared transaction with identifier "pg_gid_16389_741" does not exist
CONTEXT: processing remote data for replication origin "pg_16389" during message
type "COMMIT PREPARED" in transaction 741, finished at 0/15463C0
```
I think this shows that the backend process can ensure the WAL is persisted so data loss
won't occur.

[1]:
```
@@ -3297,6 +3297,8 @@ apply_dispatch(StringInfo s)
saved_command = apply_error_callback_arg.command;
apply_error_callback_arg.command = action;

+ elog(LOG, "XXX got message %d", action);
```

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

test_0809.sh (application/octet-stream)

#14Amit Kapila
amit.kapila16@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#13)
Re: Found issues related with logical replication and 2PC

On Fri, Aug 9, 2024 at 10:34 AM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

The code changes look mostly good to me. I have changed/added a few
comments in the attached modified version.

Thanks for updating the patch! It LGTM. I've tested your patch and confirmed
that it does not cause data loss.

Thanks for the additional testing. I have pushed this patch.

--
With Regards,
Amit Kapila.