[bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

Started by Hayato Kuroda (Fujitsu) over 1 year ago, 24 messages
#1Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
2 attachment(s)

Dear hackers,

This thread forks from [1]. It can be used to discuss the second item.
The part below repeats the statements written in [1]; I copied and pasted them
just in case. The attached patch is almost the same, but slightly modified based on the comment
from Amit [2]: an unrelated change has been removed.

Found issue
=====
When the subscriber enables two-phase commit but doesn't set max_prepared_transactions > 0
and a transaction is prepared on the publisher, the apply worker reports an ERROR
on the subscriber. After that, the prepared transaction is not replayed, which
means it's lost forever. The attached script can emulate the situation.

--
ERROR: prepared transactions are disabled
HINT: Set "max_prepared_transactions" to a nonzero value.
--

The reason is that we advance the origin progress when aborting the
transaction as well (RecordTransactionAbort->replorigin_session_advance). So,
after replorigin_session_origin_lsn is set, if any ERROR happens while preparing
the transaction, the transaction aborts, which incorrectly advances the origin LSN.

The easiest fix is to reset the session replication origin before calling
RecordTransactionAbort(). I think this can happen when 1) LogicalRepApplyLoop()
raises an ERROR or 2) the apply worker exits. The attached patch fixes the issue on HEAD.
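
To make the failure mode concrete, here is a minimal, self-contained toy model in C. It is not PostgreSQL code: every `toy_*` name is hypothetical, and the abort path is reduced to "record whatever session LSN is currently set". It only illustrates why resetting the session origin before the abort keeps the progress from moving past a transaction that was never applied.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Toy stand-ins (hypothetical) for XLogRecPtr and the session origin state. */
typedef uint64_t ToyLsn;
#define TOY_INVALID_LSN ((ToyLsn) 0)

/* Plays the role of replorigin_session_origin_lsn in this sketch. */
static ToyLsn toy_session_origin_lsn = TOY_INVALID_LSN;
/* Plays the role of the recorded origin progress. */
static ToyLsn toy_origin_progress = TOY_INVALID_LSN;

/* Simplified abort path: progress advances whenever a session LSN is set. */
static void
toy_record_abort(void)
{
    if (toy_session_origin_lsn != TOY_INVALID_LSN)
        toy_origin_progress = toy_session_origin_lsn;
}

/* The proposed fix in miniature: forget the session LSN before aborting. */
static void
toy_origin_reset(void)
{
    toy_session_origin_lsn = TOY_INVALID_LSN;
}

/*
 * Simulate a PREPARE that fails after the session LSN was set.
 * Returns the origin progress recorded once the abort has run.
 */
static ToyLsn
toy_failed_prepare(ToyLsn prepare_end_lsn, bool reset_before_abort)
{
    assert(prepare_end_lsn != TOY_INVALID_LSN);

    toy_origin_progress = TOY_INVALID_LSN;
    toy_session_origin_lsn = prepare_end_lsn;   /* set before preparing */

    /* ... the prepare step raises an ERROR here ... */

    if (reset_before_abort)
        toy_origin_reset();
    toy_record_abort();

    return toy_origin_progress;
}
```

In this model, `toy_failed_prepare(100, false)` leaves the progress at 100 even though nothing was applied, so the transaction would not be requested again, while `toy_failed_prepare(100, true)` leaves the progress untouched.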

[1]: /messages/by-id/TYAPR01MB5692FA4926754B91E9D7B5F0F5AA2@TYAPR01MB5692.jpnprd01.prod.outlook.com
[2]: /messages/by-id/CAA4eK1L-r8OKGdBwC6AeXSibrjr9xKsg8LjGpX_PDR5Go-A9TA@mail.gmail.com

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

v2-0001-Prevent-origin-progress-advancement-if-failed-to-.patch (application/octet-stream)
From eb4e9737040beb08e4a2567643dee2dfe02c4e8b Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 23 Jul 2024 08:40:29 +0000
Subject: [PATCH v2] Prevent origin progress advancement if failed to apply a
 transaction

The origin progress is advanced when aborting a transaction, intended to
signify the successful streaming and application of the ROLLBACK from the
publisher to the subscriber in streaming parallel mode. But when an error
occurred during the commit or prepare after setting
replorigin_session_origin_lsn, the origin progress was advanced as well which
is unexpected. This led to skipped transactions that were not replicated again.

Fix it by resetting replorigin_session_origin_lsn in case of error.

Originally reported by Hou Zhijie
---
 src/backend/replication/logical/worker.c | 28 ++++++++++++++++++++++++
 src/test/subscription/t/021_twophase.pl  | 14 +++++++++++-
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 6dc54c7283..70e4113644 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4409,6 +4409,14 @@ start_apply(XLogRecPtr origin_startpos)
 	}
 	PG_CATCH();
 	{
+		/*
+		 * Reset the origin data to prevent the advancement of origin progress
+		 * if the transaction failed to apply.
+		 */
+		replorigin_session_origin = InvalidRepOriginId;
+		replorigin_session_origin_lsn = InvalidXLogRecPtr;
+		replorigin_session_origin_timestamp = 0;
+
 		if (MySubscription->disableonerr)
 			DisableSubscriptionAndExit();
 		else
@@ -4619,6 +4627,19 @@ InitializeLogRepWorker(void)
 	CommitTransactionCommand();
 }
 
+/*
+ * Reset the origin state. This is needed to prevent the advancement of origin
+ * progress if the transaction failed to apply.
+ */
+static void
+replorigin_reset(int code, Datum arg)
+{
+	replorigin_session_origin = InvalidRepOriginId;
+	replorigin_session_origin_lsn = InvalidXLogRecPtr;
+	replorigin_session_origin_timestamp = 0;
+}
+
+
 /* Common function to setup the leader apply or tablesync worker. */
 void
 SetupApplyOrSyncWorker(int worker_slot)
@@ -4647,6 +4668,13 @@ SetupApplyOrSyncWorker(int worker_slot)
 
 	InitializeLogRepWorker();
 
+	/*
+	 * Register a callback to reset the origin state before aborting the
+	 * transaction in ShutdownPostgres(). This is to prevent the advancement
+	 * of origin progress if the transaction failed to apply.
+	 */
+	before_shmem_exit(replorigin_reset, (Datum) 0);
+
 	/* Connect to the origin and start the replication. */
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index 5e50f1af33..19147f31e2 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -23,7 +23,7 @@ $node_publisher->start;
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
 $node_subscriber->append_conf('postgresql.conf',
-	qq(max_prepared_transactions = 10));
+	qq(max_prepared_transactions = 0));
 $node_subscriber->start;
 
 # Create some pre-existing content on publisher
@@ -67,12 +67,24 @@ $node_subscriber->poll_query_until('postgres', $twophase_query)
 # then COMMIT PREPARED
 ###############################
 
+# Save the log location, to see the failure of the application
+my $log_location = -s $node_subscriber->logfile;
+
 $node_publisher->safe_psql(
 	'postgres', "
 	BEGIN;
 	INSERT INTO tab_full VALUES (11);
 	PREPARE TRANSACTION 'test_prepared_tab_full';");
 
+# Confirm the ERROR is reported because max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
 $node_publisher->wait_for_catchup($appname);
 
 # check that transaction is in prepared state on subscriber
-- 
2.43.0

test_2pc.sh (application/octet-stream)
#2Amit Kapila
amit.kapila16@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#1)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Thu, Aug 8, 2024 at 10:37 AM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

...

An easiest fix is to reset session replication origin before calling the
RecordTransactionAbort(). I think this can happen when 1) LogicalRepApplyLoop()
raises an ERROR or 2) apply worker exits. Attached patch can fix the issue on HEAD.

Few comments:
=============
*
@@ -4409,6 +4409,14 @@ start_apply(XLogRecPtr origin_startpos)
  }
  PG_CATCH();
  {
+ /*
+ * Reset the origin data to prevent the advancement of origin progress
+ * if the transaction failed to apply.
+ */
+ replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin_lsn = InvalidXLogRecPtr;
+ replorigin_session_origin_timestamp = 0;

Can't we call replorigin_reset() instead here?

*
+ /*
+ * Register a callback to reset the origin state before aborting the
+ * transaction in ShutdownPostgres(). This is to prevent the advancement
+ * of origin progress if the transaction failed to apply.
+ */
+ before_shmem_exit(replorigin_reset, (Datum) 0);

I think we need this despite resetting the origin-related variables in
PG_CATCH block to handle FATAL error cases, right? If so, can we use
PG_ENSURE_ERROR_CLEANUP() instead of PG_CATCH()?

--
With Regards,
Amit Kapila.

#3shveta malik
shveta.malik@gmail.com
In reply to: Amit Kapila (#2)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Thu, Aug 8, 2024 at 12:03 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Aug 8, 2024 at 10:37 AM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

...

An easiest fix is to reset session replication origin before calling the
RecordTransactionAbort(). I think this can happen when 1) LogicalRepApplyLoop()
raises an ERROR or 2) apply worker exits. Attached patch can fix the issue on HEAD.

Few comments:
=============
*
@@ -4409,6 +4409,14 @@ start_apply(XLogRecPtr origin_startpos)
}
PG_CATCH();
{
+ /*
+ * Reset the origin data to prevent the advancement of origin progress
+ * if the transaction failed to apply.
+ */
+ replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin_lsn = InvalidXLogRecPtr;
+ replorigin_session_origin_timestamp = 0;

Can't we call replorigin_reset() instead here?

*
+ /*
+ * Register a callback to reset the origin state before aborting the
+ * transaction in ShutdownPostgres(). This is to prevent the advancement
+ * of origin progress if the transaction failed to apply.
+ */
+ before_shmem_exit(replorigin_reset, (Datum) 0);

I think we need this despite resetting the origin-related variables in
PG_CATCH block to handle FATAL error cases, right? If so, can we use
PG_ENSURE_ERROR_CLEANUP() instead of PG_CATCH()?

+1

Basic tests work fine on this patch. Just thinking out loud,
SetupApplyOrSyncWorker() is called for table-sync worker as well and
IIUC tablesync worker does not deal with 2PC txns. So do we even need
to register replorigin_reset() for tablesync worker as well? If we may
hit such an issue in general, then perhaps we need it in table-sync
worker otherwise not. It needs some investigation. Thoughts?

thanks
Shveta

#4Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
In reply to: shveta malik (#3)
RE: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Thursday, August 8, 2024 6:01 PM shveta malik <shveta.malik@gmail.com> wrote:

On Thu, Aug 8, 2024 at 12:03 PM Amit Kapila <amit.kapila16@gmail.com>
wrote:

On Thu, Aug 8, 2024 at 10:37 AM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

...

An easiest fix is to reset session replication origin before calling
the RecordTransactionAbort(). I think this can happen when 1)
LogicalRepApplyLoop() raises an ERROR or 2) apply worker exits.

Attached patch can fix the issue on HEAD.

Few comments:
=============
*
@@ -4409,6 +4409,14 @@ start_apply(XLogRecPtr origin_startpos)
}
PG_CATCH();
{
+ /*
+ * Reset the origin data to prevent the advancement of origin
+ progress
+ * if the transaction failed to apply.
+ */
+ replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin_lsn = InvalidXLogRecPtr;
+ replorigin_session_origin_timestamp = 0;

Can't we call replorigin_reset() instead here?

*
+ /*
+ * Register a callback to reset the origin state before aborting the
+ * transaction in ShutdownPostgres(). This is to prevent the
+ advancement
+ * of origin progress if the transaction failed to apply.
+ */
+ before_shmem_exit(replorigin_reset, (Datum) 0);

I think we need this despite resetting the origin-related variables in
PG_CATCH block to handle FATAL error cases, right? If so, can we use
PG_ENSURE_ERROR_CLEANUP() instead of PG_CATCH()?

+1

Basic tests work fine on this patch. Just thinking out loud,
SetupApplyOrSyncWorker() is called for table-sync worker as well and IIUC
tablesync worker does not deal with 2PC txns. So do we even need to register
replorigin_reset() for tablesync worker as well? If we may hit such an issue in
general, then perhaps we need it in table-sync worker otherwise not. It
needs some investigation. Thoughts?

I think this is a general issue that can occur not only due to 2PC. IIUC, this
problem should arise if any ERROR arises after setting the
replorigin_session_origin_lsn but before the CommitTransactionCommand is
completed. If so, I think we should register it for tablesync worker as well.

Best Regards,
Hou zj

#5Amit Kapila
amit.kapila16@gmail.com
In reply to: Zhijie Hou (Fujitsu) (#4)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Thu, Aug 8, 2024 at 3:41 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:

On Thursday, August 8, 2024 6:01 PM shveta malik <shveta.malik@gmail.com> wrote:

On Thu, Aug 8, 2024 at 12:03 PM Amit Kapila <amit.kapila16@gmail.com>
wrote:

On Thu, Aug 8, 2024 at 10:37 AM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

...

An easiest fix is to reset session replication origin before calling
the RecordTransactionAbort(). I think this can happen when 1)
LogicalRepApplyLoop() raises an ERROR or 2) apply worker exits.

Attached patch can fix the issue on HEAD.

Few comments:
=============
*
@@ -4409,6 +4409,14 @@ start_apply(XLogRecPtr origin_startpos)
}
PG_CATCH();
{
+ /*
+ * Reset the origin data to prevent the advancement of origin
+ progress
+ * if the transaction failed to apply.
+ */
+ replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin_lsn = InvalidXLogRecPtr;
+ replorigin_session_origin_timestamp = 0;

Can't we call replorigin_reset() instead here?

*
+ /*
+ * Register a callback to reset the origin state before aborting the
+ * transaction in ShutdownPostgres(). This is to prevent the
+ advancement
+ * of origin progress if the transaction failed to apply.
+ */
+ before_shmem_exit(replorigin_reset, (Datum) 0);

I think we need this despite resetting the origin-related variables in
PG_CATCH block to handle FATAL error cases, right? If so, can we use
PG_ENSURE_ERROR_CLEANUP() instead of PG_CATCH()?

+1

Basic tests work fine on this patch. Just thinking out loud,
SetupApplyOrSyncWorker() is called for table-sync worker as well and IIUC
tablesync worker does not deal with 2PC txns. So do we even need to register
replorigin_reset() for tablesync worker as well? If we may hit such an issue in
general, then perhaps we need it in table-sync worker otherwise not. It
needs some investigation. Thoughts?

I think this is a general issue that can occur not only due to 2PC. IIUC, this
problem should arise if any ERROR arises after setting the
replorigin_session_origin_lsn but before the CommitTransactionCommand is
completed. If so, I think we should register it for tablesync worker as well.

As pointed out earlier, won't using PG_ENSURE_ERROR_CLEANUP() instead
of PG_CATCH() be enough?

--
With Regards,
Amit Kapila.

#6shveta malik
shveta.malik@gmail.com
In reply to: Amit Kapila (#5)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Thu, Aug 8, 2024 at 6:08 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Aug 8, 2024 at 3:41 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:

On Thursday, August 8, 2024 6:01 PM shveta malik <shveta.malik@gmail.com> wrote:

On Thu, Aug 8, 2024 at 12:03 PM Amit Kapila <amit.kapila16@gmail.com>
wrote:

On Thu, Aug 8, 2024 at 10:37 AM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

...

An easiest fix is to reset session replication origin before calling
the RecordTransactionAbort(). I think this can happen when 1)
LogicalRepApplyLoop() raises an ERROR or 2) apply worker exits.

Attached patch can fix the issue on HEAD.

Few comments:
=============
*
@@ -4409,6 +4409,14 @@ start_apply(XLogRecPtr origin_startpos)
}
PG_CATCH();
{
+ /*
+ * Reset the origin data to prevent the advancement of origin
+ progress
+ * if the transaction failed to apply.
+ */
+ replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin_lsn = InvalidXLogRecPtr;
+ replorigin_session_origin_timestamp = 0;

Can't we call replorigin_reset() instead here?

*
+ /*
+ * Register a callback to reset the origin state before aborting the
+ * transaction in ShutdownPostgres(). This is to prevent the
+ advancement
+ * of origin progress if the transaction failed to apply.
+ */
+ before_shmem_exit(replorigin_reset, (Datum) 0);

I think we need this despite resetting the origin-related variables in
PG_CATCH block to handle FATAL error cases, right? If so, can we use
PG_ENSURE_ERROR_CLEANUP() instead of PG_CATCH()?

+1

Basic tests work fine on this patch. Just thinking out loud,
SetupApplyOrSyncWorker() is called for table-sync worker as well and IIUC
tablesync worker does not deal with 2PC txns. So do we even need to register
replorigin_reset() for tablesync worker as well? If we may hit such an issue in
general, then perhaps we need it in table-sync worker otherwise not. It
needs some investigation. Thoughts?

I think this is a general issue that can occur not only due to 2PC. IIUC, this
problem should arise if any ERROR arises after setting the
replorigin_session_origin_lsn but before the CommitTransactionCommand is
completed. If so, I think we should register it for tablesync worker as well.

As pointed out earlier, won't using PG_ENSURE_ERROR_CLEANUP() instead
of PG_CATCH() be enough?

Yes, I think it should suffice. IIUC, we are going to change
'replorigin_session_origin_lsn' only in start_apply() and not before
that, and thus ensuring its reset during any ERROR or FATAL in
start_apply() is good enough. And I guess we don't want this
origin-reset to be called during regular shutdown, isn't it? But
registering it through before_shmem_exit() will make the
reset-function to be called during normal shutdown as well.
And to answer my previous question (as Hou-San also pointed out), we
do need it in table-sync worker as well. So a change in start_apply
will make sure the fix is valid for both apply and tablesync worker.

thanks
Shveta

#7Amit Kapila
amit.kapila16@gmail.com
In reply to: shveta malik (#6)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Fri, Aug 9, 2024 at 9:35 AM shveta malik <shveta.malik@gmail.com> wrote:

On Thu, Aug 8, 2024 at 6:08 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

I think this is a general issue that can occur not only due to 2PC. IIUC, this
problem should arise if any ERROR arises after setting the
replorigin_session_origin_lsn but before the CommitTransactionCommand is
completed. If so, I think we should register it for tablesync worker as well.

As pointed out earlier, won't using PG_ENSURE_ERROR_CLEANUP() instead
of PG_CATCH() be enough?

Yes, I think it should suffice. IIUC, we are going to change
'replorigin_session_origin_lsn' only in start_apply() and not before
that, and thus ensuring its reset during any ERROR or FATAL in
start_apply() is good enough.

Right, I also think so.

And I guess we don't want this
origin-reset to be called during regular shutdown, isn't it?

Agreed. OTOH, there was no harm even if such a reset function is invoked.

But
registering it through before_shmem_exit() will make the
reset-function to be called during normal shutdown as well.

True and unless I am missing something we won't need it. I would like
to hear the opinions of Hou-San and Kuroda-San on the same.

And to answer my previous question (as Hou-San also pointed out), we
do need it in table-sync worker as well. So a change in start_apply
will make sure the fix is valid for both apply and tablesync worker.

As table-sync workers can also apply transactions after the initial
copy, we need it for table-sync during its apply phase.

--
With Regards,
Amit Kapila.

#8Hayato Kuroda (Fujitsu)
kuroda.hayato@fujitsu.com
In reply to: Amit Kapila (#2)
1 attachment(s)
RE: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

Dear Amit, Shveta, Hou,

Thanks for giving many comments! I've updated the patch.

@@ -4409,6 +4409,14 @@ start_apply(XLogRecPtr origin_startpos)
}
PG_CATCH();
{
+ /*
+ * Reset the origin data to prevent the advancement of origin progress
+ * if the transaction failed to apply.
+ */
+ replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin_lsn = InvalidXLogRecPtr;
+ replorigin_session_origin_timestamp = 0;

Can't we call replorigin_reset() instead here?

I didn't use the function because the arguments at the call site looked strange,
but it can be used there. Fixed.

+ /*
+ * Register a callback to reset the origin state before aborting the
+ * transaction in ShutdownPostgres(). This is to prevent the advancement
+ * of origin progress if the transaction failed to apply.
+ */
+ before_shmem_exit(replorigin_reset, (Datum) 0);

I think we need this despite resetting the origin-related variables in
PG_CATCH block to handle FATAL error cases, right? If so, can we use
PG_ENSURE_ERROR_CLEANUP() instead of PG_CATCH()?

There are two reasons to add a shmem-exit callback. One is to handle a FATAL error;
the other is to handle the case where the user requests a shutdown while
changes are being applied. In that case, I think ShutdownPostgres() can be called, so
the session origin may advance.

However, I think we cannot use the PG_ENSURE_ERROR_CLEANUP()/PG_END_ENSURE_ERROR_CLEANUP
macros here. According to the code, they assume that no before-shmem-exit callbacks are
registered within the block, because the cleanup function is registered and canceled
within the macros. LogicalRepApplyLoop() can register such a callback when
it handles a COMMIT PREPARED message, so it breaks the rule.

Best regards,
Hayato Kuroda
FUJITSU LIMITED

Attachments:

v3-0001-Prevent-origin-progress-advancement-if-failed-to-.patch (application/octet-stream)
From ec22f424ede4cc828d79363502be38c04d43ac58 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 23 Jul 2024 08:40:29 +0000
Subject: [PATCH v3] Prevent origin progress advancement if failed to apply a
 transaction

The origin progress is advanced when aborting a transaction, intended to
signify the successful streaming and application of the ROLLBACK from the
publisher to the subscriber in streaming parallel mode. But when an error
occurred during the commit or prepare after setting
replorigin_session_origin_lsn, the origin progress was advanced as well which
is unexpected. This led to skipped transactions that were not replicated again.

Fix it by resetting replorigin_session_origin_lsn in case of error.

Originally reported by Hou Zhijie
---
 src/backend/replication/logical/worker.c | 27 ++++++++++++++++++++++++
 src/test/subscription/t/021_twophase.pl  | 14 +++++++++++-
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 245e9be6f2..c2849c8bfa 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -413,6 +413,8 @@ static inline void reset_apply_error_context_info(void);
 static TransApplyAction get_transaction_apply_action(TransactionId xid,
 													 ParallelApplyWorkerInfo **winfo);
 
+static void replorigin_reset(int code, Datum arg);
+
 /*
  * Form the origin name for the subscription.
  *
@@ -4432,6 +4434,12 @@ start_apply(XLogRecPtr origin_startpos)
 	}
 	PG_CATCH();
 	{
+		/*
+		 * Reset the origin data to prevent the advancement of origin progress
+		 * if the transaction failed to apply.
+		 */
+		replorigin_reset(0, (Datum) 0);
+
 		if (MySubscription->disableonerr)
 			DisableSubscriptionAndExit();
 		else
@@ -4642,6 +4650,18 @@ InitializeLogRepWorker(void)
 	CommitTransactionCommand();
 }
 
+/*
+ * Reset the origin state. This is needed to prevent the advancement of origin
+ * progress if the transaction failed to apply.
+ */
+static void
+replorigin_reset(int code, Datum arg)
+{
+	replorigin_session_origin = InvalidRepOriginId;
+	replorigin_session_origin_lsn = InvalidXLogRecPtr;
+	replorigin_session_origin_timestamp = 0;
+}
+
 /* Common function to setup the leader apply or tablesync worker. */
 void
 SetupApplyOrSyncWorker(int worker_slot)
@@ -4670,6 +4690,13 @@ SetupApplyOrSyncWorker(int worker_slot)
 
 	InitializeLogRepWorker();
 
+	/*
+	 * Register a callback to reset the origin state before aborting the
+	 * transaction in ShutdownPostgres(). This is to prevent the advancement
+	 * of origin progress if the transaction failed to apply.
+	 */
+	before_shmem_exit(replorigin_reset, (Datum) 0);
+
 	/* Connect to the origin and start the replication. */
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index 5e50f1af33..19147f31e2 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -23,7 +23,7 @@ $node_publisher->start;
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
 $node_subscriber->append_conf('postgresql.conf',
-	qq(max_prepared_transactions = 10));
+	qq(max_prepared_transactions = 0));
 $node_subscriber->start;
 
 # Create some pre-existing content on publisher
@@ -67,12 +67,24 @@ $node_subscriber->poll_query_until('postgres', $twophase_query)
 # then COMMIT PREPARED
 ###############################
 
+# Save the log location, to see the failure of the application
+my $log_location = -s $node_subscriber->logfile;
+
 $node_publisher->safe_psql(
 	'postgres', "
 	BEGIN;
 	INSERT INTO tab_full VALUES (11);
 	PREPARE TRANSACTION 'test_prepared_tab_full';");
 
+# Confirm the ERROR is reported because max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
 $node_publisher->wait_for_catchup($appname);
 
 # check that transaction is in prepared state on subscriber
-- 
2.43.0

#9shveta malik
shveta.malik@gmail.com
In reply to: Hayato Kuroda (Fujitsu) (#8)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Fri, Aug 9, 2024 at 2:39 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

Dear Amit, Shveta, Hou,

Thanks for giving many comments! I've updated the patch.

@@ -4409,6 +4409,14 @@ start_apply(XLogRecPtr origin_startpos)
}
PG_CATCH();
{
+ /*
+ * Reset the origin data to prevent the advancement of origin progress
+ * if the transaction failed to apply.
+ */
+ replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin_lsn = InvalidXLogRecPtr;
+ replorigin_session_origin_timestamp = 0;

Can't we call replorigin_reset() instead here?

I didn't use the function because arguments of calling function looked strange,
but ideally I can. Fixed.

+ /*
+ * Register a callback to reset the origin state before aborting the
+ * transaction in ShutdownPostgres(). This is to prevent the advancement
+ * of origin progress if the transaction failed to apply.
+ */
+ before_shmem_exit(replorigin_reset, (Datum) 0);

I think we need this despite resetting the origin-related variables in
PG_CATCH block to handle FATAL error cases, right? If so, can we use
PG_ENSURE_ERROR_CLEANUP() instead of PG_CATCH()?

There are two reasons to add a shmem-exit callback. One is to support a FATAL,
another one is to support the case that user does the shutdown request while
applying changes. In this case, I think ShutdownPostgres() can be called so that
the session origin may advance.

Agree that we need the 'reset' during shutdown flow as well. Details at [1].

However, I think we cannot use PG_ENSURE_ERROR_CLEANUP()/PG_END_ENSURE_ERROR_CLEANUP
macros here. According to codes, it assumes that any before-shmem callbacks are
not registered within the block because the cleanup function is registered and canceled
within the macro. LogicalRepApplyLoop() can register the function when
it handles COMMIT PREPARED message so it breaks the rule.

Yes, on reanalyzing, we can not use PG_ENSURE_ERROR_CLEANUP in this
flow due to the limitation of cancel_before_shmem_exit() that it can
cancel only the last registered callback, while in our flow we have
other callbacks also registered after we register our reset one.
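
The last-registered-only limitation can be pictured with a small toy model in C. This is a sketch under stated assumptions, not the PostgreSQL implementation: the `toy_*` functions are hypothetical stand-ins for before_shmem_exit() and cancel_before_shmem_exit(), and the cancel simply reports failure when the callback is not on top of the stack.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_MAX_CALLBACKS 8

typedef void (*toy_callback) (void);

/* A LIFO stack of exit callbacks (hypothetical stand-in for the real list). */
static toy_callback toy_stack[TOY_MAX_CALLBACKS];
static size_t toy_depth = 0;

/* Register a callback on top of the stack. */
static void
toy_before_exit(toy_callback fn)
{
    assert(toy_depth < TOY_MAX_CALLBACKS);
    toy_stack[toy_depth++] = fn;
}

/* Cancel succeeds only when fn is the most recently registered entry. */
static bool
toy_cancel_before_exit(toy_callback fn)
{
    if (toy_depth > 0 && toy_stack[toy_depth - 1] == fn)
    {
        toy_depth--;
        return true;
    }
    return false;
}

/* Placeholder callbacks; their bodies do not matter for the sketch. */
static void toy_reset_origin(void) {}
static void toy_other_cleanup(void) {}

/*
 * Mimic an "ensure cleanup" block: register the reset callback on entry and
 * cancel it on exit. Returns whether the cancel at the end succeeds.
 */
static bool
toy_ensure_cleanup_block(bool body_registers_callback)
{
    toy_depth = 0;
    toy_before_exit(toy_reset_origin);          /* on entering the block */

    if (body_registers_callback)
        toy_before_exit(toy_other_cleanup);     /* registered by the body */

    return toy_cancel_before_exit(toy_reset_origin);    /* on leaving */
}
```

With an empty body the cancel finds the reset callback on top and succeeds. Once the body registers its own callback, the reset callback is no longer the latest entry, which is the situation described for the apply loop.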

[1] Shutdown analysis:

I did a test where we make the apply worker wait for say 0sec right after
it updates 'replorigin_session_origin_lsn' in
apply_handle_prepare_internal() (say it at code-point1). During this
wait, we triggered a subscriber shutdown. Under normal circumstances,
everything works fine: after the wait, the apply worker processes the
SIGTERM (via LogicalRepApplyLoop-->ProcessInterrupts()) only after the
prepare phase is complete, meaning the PREPARE LSN is flushed, and the
origin LSN is correctly advanced in EndPrepare() before the worker
shuts down. But, if we insert a LOG statement between code-point1 and
EndPrepare(), the apply worker processes the SIGTERM during the LOG
operation, as errfinish() triggers CHECK_FOR_INTERRUPTS at the end,
which causes the origin LSN to be incorrectly advanced during
shutdown. And thus the subsequent COMMIT PREPARED on the publisher
results in ERROR on subscriber; as the 'PREPARE' is lost on the
subscriber and is not resent by the publisher.

ERROR: prepared transaction with identifier "pg_gid_16403_757" does not exist

A similar problem can also occur without introducing any additional
LOG statements, but by simply setting log_min_messages=debug5. This
causes the apply worker to output a few DEBUG messages upon receiving
a shutdown signal (after code-point1) before it reaches EndPrepare().
As a result, it ends up processing the SIGTERM (during logging) and
invoking AbortOutOfAnyTransaction(), which incorrectly advances the
origin LSN.
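
The three outcomes in this analysis can be summarized with a toy model in C. It is only a sketch: the `toy_*` names are hypothetical, the interrupt check is reduced to a flag tested when a log call happens, and the exit path just records whatever session LSN is still set.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t ToyLsn;
#define TOY_INVALID_LSN ((ToyLsn) 0)

typedef struct ToyOutcome
{
    bool   prepared;    /* did the PREPARE reach the "EndPrepare" step? */
    ToyLsn progress;    /* origin progress recorded when the worker stops */
} ToyOutcome;

/*
 * Simulate a shutdown request that arrives right after the session LSN is
 * set ("code-point1"). If anything is logged before "EndPrepare", the
 * pending interrupt is processed there and the worker exits early.
 */
static ToyOutcome
toy_prepare_during_shutdown(ToyLsn prepare_lsn,
                            bool logs_before_end_prepare,
                            bool reset_on_exit)
{
    ToyOutcome out = {false, TOY_INVALID_LSN};
    ToyLsn     session_lsn;
    bool       shutdown_pending;

    assert(prepare_lsn != TOY_INVALID_LSN);

    session_lsn = prepare_lsn;      /* code-point1 */
    shutdown_pending = true;        /* SIGTERM arrives during the wait */

    if (logs_before_end_prepare && shutdown_pending)
    {
        /* Interrupt check at the end of logging: the worker exits here. */
        if (reset_on_exit)
            session_lsn = TOY_INVALID_LSN;
        out.progress = session_lsn; /* abort records whatever is still set */
        return out;
    }

    /* "EndPrepare": the prepare completes and progress advances correctly. */
    out.prepared = true;
    out.progress = prepare_lsn;
    return out;
}
```

Without logging the prepare completes and the progress is correct. With logging and no reset the progress advances although nothing was prepared, so the PREPARE would not be resent. With a reset on exit the progress stays put and the PREPARE can be sent again.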

thanks
Shveta

#10shveta malik
shveta.malik@gmail.com
In reply to: shveta malik (#9)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Mon, Aug 12, 2024 at 3:36 PM shveta malik <shveta.malik@gmail.com> wrote:

On Fri, Aug 9, 2024 at 2:39 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

Dear Amit, Shveta, Hou,

Thanks for giving many comments! I've updated the patch.

@@ -4409,6 +4409,14 @@ start_apply(XLogRecPtr origin_startpos)
}
PG_CATCH();
{
+ /*
+ * Reset the origin data to prevent the advancement of origin progress
+ * if the transaction failed to apply.
+ */
+ replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin_lsn = InvalidXLogRecPtr;
+ replorigin_session_origin_timestamp = 0;

Can't we call replorigin_reset() instead here?

I didn't use the function because arguments of calling function looked strange,
but ideally I can. Fixed.

+ /*
+ * Register a callback to reset the origin state before aborting the
+ * transaction in ShutdownPostgres(). This is to prevent the advancement
+ * of origin progress if the transaction failed to apply.
+ */
+ before_shmem_exit(replorigin_reset, (Datum) 0);

I think we need this despite resetting the origin-related variables in
PG_CATCH block to handle FATAL error cases, right? If so, can we use
PG_ENSURE_ERROR_CLEANUP() instead of PG_CATCH()?

There are two reasons to add a shmem-exit callback. One is to support a FATAL,
another one is to support the case that user does the shutdown request while
applying changes. In this case, I think ShutdownPostgres() can be called so that
the session origin may advance.

Agree that we need the 'reset' during shutdown flow as well. Details at [1]

However, I think we cannot use PG_ENSURE_ERROR_CLEANUP()/PG_END_ENSURE_ERROR_CLEANUP
macros here. According to codes, it assumes that any before-shmem callbacks are
not registered within the block because the cleanup function is registered and canceled
within the macro. LogicalRepApplyLoop() can register the function when
it handles COMMIT PREPARED message so it breaks the rule.

Yes, on reanalyzing, we cannot use PG_ENSURE_ERROR_CLEANUP in this
flow, because cancel_before_shmem_exit() can cancel only the last
registered callback, whereas in our flow other callbacks are
registered after we register our reset one.
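
The limitation mentioned above can be illustrated with a small standalone sketch. This is not PostgreSQL's ipc.c code: toy_register(), toy_cancel() and the two callbacks are hypothetical stand-ins, and the sketch only assumes what the message states, namely that a cancel request can remove only the most recently registered callback. What the real cancel_before_shmem_exit() does on a mismatch is not modeled; the toy version simply reports failure.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a before-shmem-exit callback signature. */
typedef void (*toy_callback) (int code, void *arg);

#define TOY_MAX_CALLBACKS 8

/* Registered callbacks, kept as a stack (last registered is on top). */
static struct
{
	toy_callback func;
	void	   *arg;
}			toy_callbacks[TOY_MAX_CALLBACKS];
static int	toy_n_callbacks = 0;

/* Push a callback; returns false when the stack is full. */
bool
toy_register(toy_callback func, void *arg)
{
	if (toy_n_callbacks >= TOY_MAX_CALLBACKS)
		return false;
	toy_callbacks[toy_n_callbacks].func = func;
	toy_callbacks[toy_n_callbacks].arg = arg;
	toy_n_callbacks++;
	return true;
}

/* Cancel succeeds only if the request matches the top of the stack. */
bool
toy_cancel(toy_callback func, void *arg)
{
	if (toy_n_callbacks > 0 &&
		toy_callbacks[toy_n_callbacks - 1].func == func &&
		toy_callbacks[toy_n_callbacks - 1].arg == arg)
	{
		toy_n_callbacks--;
		return true;
	}
	return false;
}

/* Two do-nothing callbacks: "ours" and one registered later by other code. */
void
toy_reset_origin_cb(int code, void *arg)
{
	(void) code;
	(void) arg;
}

void
toy_other_cb(int code, void *arg)
{
	(void) code;
	(void) arg;
}
```

If toy_reset_origin_cb is registered first and toy_other_cb afterwards, a later toy_cancel(toy_reset_origin_cb, NULL) fails because the reset callback is no longer on top. That is the situation the apply loop would create inside a PG_ENSURE_ERROR_CLEANUP block.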

[1]
Shutdown analysis:

I did a test where we make apply worker wait for say 0sec right after

Correction here: 0sec -->10sec

it updates 'replorigin_session_origin_lsn' in
apply_handle_prepare_internal() (say it at code-point1). During this
wait, we triggered a subscriber shutdown. Under normal circumstances,
everything works fine: after the wait, the apply worker processes the
SIGTERM (via LogicalRepApplyLoop-->ProcessInterrupts()) only after the
prepare phase is complete, meaning the PREPARE LSN is flushed, and the
origin LSN is correctly advanced in EndPrepare() before the worker
shuts down. But, if we insert a LOG statement between code-point1 and
EndPrepare(), the apply worker processes the SIGTERM during the LOG
operation, as errfinish() triggers CHECK_FOR_INTERRUPTS at the end,
which causes the origin LSN to be incorrectly advanced during
shutdown. As a result, the subsequent COMMIT PREPARED on the publisher
results in an ERROR on the subscriber, because the PREPARE is lost on the
subscriber and is not resent by the publisher:

ERROR: prepared transaction with identifier "pg_gid_16403_757" does not exist

A similar problem can also occur without introducing any additional
LOG statements, but by simply setting log_min_messages=debug5. This
causes the apply worker to output a few DEBUG messages upon receiving
a shutdown signal (after code-point1) before it reaches EndPrepare().
As a result, it ends up processing the SIGTERM (during logging) and
invoking AbortOutOfAnyTransaction(), which incorrectly advances the
origin LSN.
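
The failure mode analyzed above can be sketched in a few lines of standalone C. This is a toy model, not worker.c or xact.c: all toy_* names are hypothetical, and the only behavior assumed is what the thread describes, namely that aborting a transaction advances the origin progress to whatever session origin LSN is currently set, unless that state has been reset first.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t toy_lsn;

#define TOY_INVALID_LSN ((toy_lsn) 0)

/* Plays the role of replorigin_session_origin_lsn in this toy model. */
static toy_lsn toy_session_origin_lsn = TOY_INVALID_LSN;

/* Plays the role of the origin progress the worker restarts from. */
static toy_lsn toy_origin_progress = TOY_INVALID_LSN;

/* "code-point1": the worker records the remote LSN before preparing. */
void
toy_set_origin_lsn(toy_lsn remote_lsn)
{
	toy_session_origin_lsn = remote_lsn;
}

/* What the proposed reset callback does, reduced to the LSN field. */
void
toy_reset_origin(void)
{
	toy_session_origin_lsn = TOY_INVALID_LSN;
}

/* Abort path: advances progress if a session origin LSN is still set. */
void
toy_abort_transaction(void)
{
	if (toy_session_origin_lsn != TOY_INVALID_LSN &&
		toy_session_origin_lsn > toy_origin_progress)
		toy_origin_progress = toy_session_origin_lsn;
}

/*
 * Shutdown arriving between code-point1 and the end of PREPARE.  Returns
 * the origin progress the worker would resume from after a restart.
 */
toy_lsn
toy_shutdown(int reset_first)
{
	if (reset_first)
		toy_reset_origin();
	toy_abort_transaction();
	return toy_origin_progress;
}
```

With progress at 100 and a pending PREPARE ending at 200, toy_shutdown(0) leaves the progress at 200, so the PREPARE would be skipped after restart. toy_shutdown(1) leaves it at 100, so the publisher would send the PREPARE again.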

thanks
Shveta

#11Amit Kapila
amit.kapila16@gmail.com
In reply to: shveta malik (#9)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Mon, Aug 12, 2024 at 3:37 PM shveta malik <shveta.malik@gmail.com> wrote:

On Fri, Aug 9, 2024 at 2:39 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

+ /*
+ * Register a callback to reset the origin state before aborting the
+ * transaction in ShutdownPostgres(). This is to prevent the advancement
+ * of origin progress if the transaction failed to apply.
+ */
+ before_shmem_exit(replorigin_reset, (Datum) 0);

I think we need this despite resetting the origin-related variables in
PG_CATCH block to handle FATAL error cases, right? If so, can we use
PG_ENSURE_ERROR_CLEANUP() instead of PG_CATCH()?

There are two reasons to add a shmem-exit callback. One is to handle a FATAL
error; the other is to handle the case where the user requests a shutdown while
changes are being applied. In that case, I think ShutdownPostgres() can be called,
so the session origin may advance.

Agree that we need the 'reset' during shutdown flow as well. Details at [1]

Thanks for the detailed analysis. I agree with your analysis that we
need to reset the origin information for the shutdown path to avoid it
being advanced incorrectly. However, the patch doesn't have sufficient
comments to explain why we need to reset it for both the ERROR and
Shutdown paths. Can we improve the comments in the patch?

Also, for the ERROR path, can we reset the origin information in
apply_error_callback()?
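
A rough sketch of what resetting in an error callback involves, under the assumption that such a callback may also run for messages below ERROR (LOG, DEBUG and so on) and therefore has to check the level before touching the origin state. Everything here is hypothetical: the toy_* names and the numeric level values are illustrative, not elog.h definitions.

```c
#include <assert.h>
#include <stdint.h>

/* Illustrative severity levels; only their ordering matters here. */
enum toy_level
{
	TOY_DEBUG = 10,
	TOY_LOG = 15,
	TOY_WARNING = 19,
	TOY_ERROR = 21,
	TOY_FATAL = 22
};

/* Pending origin LSN of the transaction being applied (0 = not set). */
static uint64_t toy_pending_origin_lsn = 0;

/* Error-context callback: reset the origin state only on real failures. */
void
toy_error_callback(int elevel)
{
	if (elevel >= TOY_ERROR)
		toy_pending_origin_lsn = 0;
}

/*
 * Simulate reporting a message at the given level while an origin LSN is
 * pending; returns the origin LSN that remains set afterwards.
 */
uint64_t
toy_report(int elevel, uint64_t pending_lsn)
{
	toy_pending_origin_lsn = pending_lsn;
	toy_error_callback(elevel);
	return toy_pending_origin_lsn;
}
```

In this model a LOG message leaves the pending LSN intact, so a transaction that goes on to commit still advances the origin, while an ERROR or FATAL clears it before any abort processing can use it.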

BTW, this needs to be backpatched to 16, where the problem was introduced
by the parallel apply feature as part of commit 216a7848. So, can we
test this patch in back-branches as well?

--
With Regards,
Amit Kapila.

#12shveta malik
shveta.malik@gmail.com
In reply to: Amit Kapila (#11)
1 attachment(s)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Tue, Aug 13, 2024 at 9:48 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Aug 12, 2024 at 3:37 PM shveta malik <shveta.malik@gmail.com> wrote:

On Fri, Aug 9, 2024 at 2:39 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:

+ /*
+ * Register a callback to reset the origin state before aborting the
+ * transaction in ShutdownPostgres(). This is to prevent the advancement
+ * of origin progress if the transaction failed to apply.
+ */
+ before_shmem_exit(replorigin_reset, (Datum) 0);

I think we need this despite resetting the origin-related variables in
PG_CATCH block to handle FATAL error cases, right? If so, can we use
PG_ENSURE_ERROR_CLEANUP() instead of PG_CATCH()?

There are two reasons to add a shmem-exit callback. One is to handle a FATAL
error; the other is to handle the case where the user requests a shutdown while
changes are being applied. In that case, I think ShutdownPostgres() can be called,
so the session origin may advance.

Agree that we need the 'reset' during shutdown flow as well. Details at [1]

Thanks for the detailed analysis. I agree with your analysis that we
need to reset the origin information for the shutdown path to avoid it
being advanced incorrectly. However, the patch doesn't have sufficient
comments to explain why we need to reset it for both the ERROR and
Shutdown paths. Can we improve the comments in the patch?

Also, for the ERROR path, can we reset the origin information in
apply_error_callback()?

Please find v4 attached. Addressed comments in that.

Manual testing done on v4:
1) Error and Fatal case
2) Shutdown after replorigin_session_origin_lsn was set in
apply_handle_prepare() and before EndPrepare was called.
2a) with log_min_messages=debug5. This will result in processing
of shutdown signal by errfinish() before PREPARE is over.
2b) with default log_min_messages. This will result in processing
of shutdown signal by LogicalRepApplyLoop() after ongoing PREPARE is
over.

BTW, this needs to be backpatched to 16, where the problem was introduced
by the parallel apply feature as part of commit 216a7848. So, can we
test this patch in back-branches as well?

Sure, will do next.

thanks
Shveta

Attachments:

v4-0001-Prevent-origin-progress-advancement-if-failed-to-.patch (application/octet-stream)
From 7ec18bfbee7c841fb237d49ff823045cd755e51c Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 23 Jul 2024 08:40:29 +0000
Subject: [PATCH v4] Prevent origin progress advancement if failed to apply a
 transaction

The origin progress is advanced when aborting a transaction, intended to
signify the successful streaming and application of the ROLLBACK from the
publisher to the subscriber in streaming parallel mode. But when an error
occurred during the commit or prepare after setting
replorigin_session_origin_lsn, the origin progress was advanced as well which
is unexpected. This led to skipped transactions that were not replicated again.

Fix it by resetting replorigin_session_origin_lsn in case of error.

Originally reported by Hou Zhijie
---
 src/backend/replication/logical/worker.c | 46 ++++++++++++++++++++++++
 src/backend/utils/error/elog.c           | 17 +++++++++
 src/include/utils/elog.h                 |  1 +
 src/test/subscription/t/021_twophase.pl  | 14 +++++++-
 4 files changed, 77 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 6dc54c7283..837e63ea45 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4619,6 +4619,19 @@ InitializeLogRepWorker(void)
 	CommitTransactionCommand();
 }
 
+/*
+ * Reset the origin state. This is needed to prevent the advancement of origin
+ * progress if the transaction failed to apply or a shutdown was triggered
+ * before worker could commit the transaction.
+ */
+static void
+replorigin_reset(int code, Datum arg)
+{
+	replorigin_session_origin = InvalidRepOriginId;
+	replorigin_session_origin_lsn = InvalidXLogRecPtr;
+	replorigin_session_origin_timestamp = 0;
+}
+
 /* Common function to setup the leader apply or tablesync worker. */
 void
 SetupApplyOrSyncWorker(int worker_slot)
@@ -4647,6 +4660,25 @@ SetupApplyOrSyncWorker(int worker_slot)
 
 	InitializeLogRepWorker();
 
+	/*
+	 * Register a callback to reset the origin state before aborting the
+	 * transaction in ShutdownPostgres(). This is necessary to handle
+	 * situations where a user shuts down a subscriber while the worker is
+	 * applying changes. In such cases, the worker may shut down before
+	 * committing the transaction. If the worker does not reset the origin
+	 * state information before calling AbortOutOfAnyTransaction() during
+	 * shutdown, the origin LSN will advance incorrectly, resulting in the
+	 * loss of that transaction, as it will not be replayed when the
+	 * subscriber is restarted.
+	 *
+	 * Since errfinish includes CHECK_FOR_INTERRUPTS(), any LOG or DEBUG
+	 * statements placed in the code after the origin state is set may process
+	 * a shutdown signal before committing the current apply operation, which
+	 * could lead to above explained situation.
+	 *
+	 */
+	before_shmem_exit(replorigin_reset, (Datum) 0);
+
 	/* Connect to the origin and start the replication. */
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
@@ -4873,12 +4905,26 @@ void
 apply_error_callback(void *arg)
 {
 	ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
+	int			elevel;
 
 	if (apply_error_callback_arg.command == 0)
 		return;
 
 	Assert(errarg->origin_name);
 
+	elevel = geterrlevel();
+
+	/*
+	 * Reset the origin state to prevent the advancement of origin progress if
+	 * the transaction fails to apply. It is crucial to reset this information
+	 * before calling AbortOutOfAnyTransaction(). Otherwise, the abort will
+	 * incorrectly advance the origin based on the current origin state for
+	 * which the worker could not perform the commit. This will result in
+	 * transaction loss, as that transaction will never be replayed.
+	 */
+	if (elevel >= ERROR)
+		replorigin_reset(0, (Datum) 0);
+
 	if (errarg->rel == NULL)
 	{
 		if (!TransactionIdIsValid(errarg->remote_xid))
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 943d8588f3..e26b9f3a66 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -1568,6 +1568,23 @@ geterrcode(void)
 	return edata->sqlerrcode;
 }
 
+/*
+ * geterrlevel --- return the currently set SQLSTATE error level
+ *
+ * This is only intended for use in error callback subroutines, since there
+ * is no other place outside elog.c where the concept is meaningful.
+ */
+int
+geterrlevel(void)
+{
+	ErrorData  *edata = &errordata[errordata_stack_depth];
+
+	/* we don't bother incrementing recursion_depth */
+	CHECK_STACK_DEPTH();
+
+	return edata->elevel;
+}
+
 /*
  * geterrposition --- return the currently set error position (0 if none)
  *
diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h
index 054dd2bf62..e54eca5b48 100644
--- a/src/include/utils/elog.h
+++ b/src/include/utils/elog.h
@@ -226,6 +226,7 @@ extern int	internalerrquery(const char *query);
 extern int	err_generic_string(int field, const char *str);
 
 extern int	geterrcode(void);
+extern int	geterrlevel(void);
 extern int	geterrposition(void);
 extern int	getinternalerrposition(void);
 
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index 5e50f1af33..19147f31e2 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -23,7 +23,7 @@ $node_publisher->start;
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
 $node_subscriber->append_conf('postgresql.conf',
-	qq(max_prepared_transactions = 10));
+	qq(max_prepared_transactions = 0));
 $node_subscriber->start;
 
 # Create some pre-existing content on publisher
@@ -67,12 +67,24 @@ $node_subscriber->poll_query_until('postgres', $twophase_query)
 # then COMMIT PREPARED
 ###############################
 
+# Save the log location, to see the failure of the application
+my $log_location = -s $node_subscriber->logfile;
+
 $node_publisher->safe_psql(
 	'postgres', "
 	BEGIN;
 	INSERT INTO tab_full VALUES (11);
 	PREPARE TRANSACTION 'test_prepared_tab_full';");
 
+# Confirm the ERROR is reported because max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
 $node_publisher->wait_for_catchup($appname);
 
 # check that transaction is in prepared state on subscriber
-- 
2.34.1

#13shveta malik
shveta.malik@gmail.com
In reply to: Amit Kapila (#11)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Tue, Aug 13, 2024 at 9:48 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

BTW, this needs to be backpatched to 16, where the problem was introduced
by the parallel apply feature as part of commit 216a7848. So, can we
test this patch in back-branches as well?

I was able to reproduce the problem on REL_16_STABLE and REL_17_STABLE
through both the flows (shutdown and apply-error). The patch v4 fixes
the issues on both.

thanks
Shveta

#14Amit Kapila
amit.kapila16@gmail.com
In reply to: shveta malik (#12)
1 attachment(s)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Wed, Aug 14, 2024 at 10:26 AM shveta malik <shveta.malik@gmail.com> wrote:

Thanks for the detailed analysis. I agree with your analysis that we
need to reset the origin information for the shutdown path to avoid it
being advanced incorrectly. However, the patch doesn't have sufficient
comments to explain why we need to reset it for both the ERROR and
Shutdown paths. Can we improve the comments in the patch?

Also, for the ERROR path, can we reset the origin information in
apply_error_callback()?

Please find v4 attached. Addressed comments in that.

The patch looks mostly good to me. I have slightly changed a few of
the comments in the attached. What do you think of the attached?

--
With Regards,
Amit Kapila.

Attachments:

v5-0001-Don-t-advance-origin-during-apply-failure.patch (application/octet-stream)
From 295bcde464c3dda541da3e36ed440389a380d769 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 23 Jul 2024 08:40:29 +0000
Subject: [PATCH v5] Don't advance origin during apply failure.

We advance origin progress during abort on successful streaming and
application of ROLLBACK in parallel streaming mode. But the origin
shouldn't be advanced during an error or unsuccessful apply due to
shutdown. Otherwise, it will result in a transaction loss as such a
transaction won't be sent again by the server.

Reported-by: Hou Zhijie
Author: Hayato Kuroda and Shveta Malik
Reviewed-by: Amit Kapila
Backpatch-through: 16
Discussion: https://postgr.es/m/TYAPR01MB5692FAC23BE40C69DA8ED4AFF5B92@TYAPR01MB5692.jpnprd01.prod.outlook.com
---
 src/backend/replication/logical/worker.c | 35 ++++++++++++++++++++++++
 src/backend/utils/error/elog.c           | 17 ++++++++++++
 src/include/utils/elog.h                 |  1 +
 src/test/subscription/t/021_twophase.pl  | 14 +++++++++-
 4 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index cdea6295d8..bdb8fc6482 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4713,6 +4713,17 @@ InitializeLogRepWorker(void)
 	CommitTransactionCommand();
 }
 
+/*
+ * Reset the origin state.
+ */
+static void
+replorigin_reset(int code, Datum arg)
+{
+	replorigin_session_origin = InvalidRepOriginId;
+	replorigin_session_origin_lsn = InvalidXLogRecPtr;
+	replorigin_session_origin_timestamp = 0;
+}
+
 /* Common function to setup the leader apply or tablesync worker. */
 void
 SetupApplyOrSyncWorker(int worker_slot)
@@ -4741,6 +4752,19 @@ SetupApplyOrSyncWorker(int worker_slot)
 
 	InitializeLogRepWorker();
 
+	/*
+	 * Register a callback to reset the origin state before aborting any
+	 * pending transaction during shutdown (see ShutdownPostgres()). This will
+	 * avoid origin advancement for an in-complete transaction which could
+	 * otherwise lead to its loss as such a transaction won't be sent by the
+	 * server again.
+	 *
+	 * Note that even a LOG or DEBUG statement placed after setting the origin
+	 * state may process a shutdown signal before committing the current apply
+	 * operation. So, it is important to register such a callback here.
+	 */
+	before_shmem_exit(replorigin_reset, (Datum) 0);
+
 	/* Connect to the origin and start the replication. */
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
@@ -4967,12 +4991,23 @@ void
 apply_error_callback(void *arg)
 {
 	ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
+	int			elevel;
 
 	if (apply_error_callback_arg.command == 0)
 		return;
 
 	Assert(errarg->origin_name);
 
+	elevel = geterrlevel();
+
+	/*
+	 * Reset the origin state to prevent the advancement of origin progress if
+	 * we fail to apply. Otherwise, this will result in transaction loss as
+	 * that transaction won't be sent again by the server.
+	 */
+	if (elevel >= ERROR)
+		replorigin_reset(0, (Datum) 0);
+
 	if (errarg->rel == NULL)
 	{
 		if (!TransactionIdIsValid(errarg->remote_xid))
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 943d8588f3..5cbb5b5416 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -1568,6 +1568,23 @@ geterrcode(void)
 	return edata->sqlerrcode;
 }
 
+/*
+ * geterrlevel --- return the currently set error level
+ *
+ * This is only intended for use in error callback subroutines, since there
+ * is no other place outside elog.c where the concept is meaningful.
+ */
+int
+geterrlevel(void)
+{
+	ErrorData  *edata = &errordata[errordata_stack_depth];
+
+	/* we don't bother incrementing recursion_depth */
+	CHECK_STACK_DEPTH();
+
+	return edata->elevel;
+}
+
 /*
  * geterrposition --- return the currently set error position (0 if none)
  *
diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h
index 054dd2bf62..e54eca5b48 100644
--- a/src/include/utils/elog.h
+++ b/src/include/utils/elog.h
@@ -226,6 +226,7 @@ extern int	internalerrquery(const char *query);
 extern int	err_generic_string(int field, const char *str);
 
 extern int	geterrcode(void);
+extern int	geterrlevel(void);
 extern int	geterrposition(void);
 extern int	getinternalerrposition(void);
 
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index 5e50f1af33..19147f31e2 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -23,7 +23,7 @@ $node_publisher->start;
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
 $node_subscriber->append_conf('postgresql.conf',
-	qq(max_prepared_transactions = 10));
+	qq(max_prepared_transactions = 0));
 $node_subscriber->start;
 
 # Create some pre-existing content on publisher
@@ -67,12 +67,24 @@ $node_subscriber->poll_query_until('postgres', $twophase_query)
 # then COMMIT PREPARED
 ###############################
 
+# Save the log location, to see the failure of the application
+my $log_location = -s $node_subscriber->logfile;
+
 $node_publisher->safe_psql(
 	'postgres', "
 	BEGIN;
 	INSERT INTO tab_full VALUES (11);
 	PREPARE TRANSACTION 'test_prepared_tab_full';");
 
+# Confirm the ERROR is reported because max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
 $node_publisher->wait_for_catchup($appname);
 
 # check that transaction is in prepared state on subscriber
-- 
2.28.0.windows.1

#15shveta malik
shveta.malik@gmail.com
In reply to: Amit Kapila (#14)
2 attachment(s)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Tue, Aug 20, 2024 at 11:36 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Aug 14, 2024 at 10:26 AM shveta malik <shveta.malik@gmail.com> wrote:

Thanks for the detailed analysis. I agree with your analysis that we
need to reset the origin information for the shutdown path to avoid it
being advanced incorrectly. However, the patch doesn't have sufficient
comments to explain why we need to reset it for both the ERROR and
Shutdown paths. Can we improve the comments in the patch?

Also, for the ERROR path, can we reset the origin information in
apply_error_callback()?

Please find v4 attached. Addressed comments in that.

The patch looks mostly good to me. I have slightly changed a few of
the comments in the attached. What do you think of the attached?

Looks good to me. Please find backported patches attached.

thanks
Shveta

Attachments:

v1-0001-Don-t-advance-origin-during-apply-failure_PG17.patch (application/octet-stream)
From 089abd1ddd9c9e7ccf2520fd9d9638f99715b6cc Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 23 Jul 2024 08:40:29 +0000
Subject: [PATCH v1] Don't advance origin during apply failure.

We advance origin progress during abort on successful streaming and
application of ROLLBACK in parallel streaming mode. But the origin
shouldn't be advanced during an error or unsuccessful apply due to
shutdown. Otherwise, it will result in a transaction loss as such a
transaction won't be sent again by the server.

Reported-by: Hou Zhijie
Author: Hayato Kuroda and Shveta Malik
Reviewed-by: Amit Kapila
Backpatch-through: 16
Discussion: https://postgr.es/m/TYAPR01MB5692FAC23BE40C69DA8ED4AFF5B92@TYAPR01MB5692.jpnprd01.prod.outlook.com
---
 src/backend/replication/logical/worker.c | 35 ++++++++++++++++++++++++
 src/backend/utils/error/elog.c           | 17 ++++++++++++
 src/include/utils/elog.h                 |  1 +
 src/test/subscription/t/021_twophase.pl  | 14 +++++++++-
 4 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b5a80fe3e8..d091a1dd27 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4639,6 +4639,17 @@ InitializeLogRepWorker(void)
 	CommitTransactionCommand();
 }
 
+/*
+ * Reset the origin state.
+ */
+static void
+replorigin_reset(int code, Datum arg)
+{
+	replorigin_session_origin = InvalidRepOriginId;
+	replorigin_session_origin_lsn = InvalidXLogRecPtr;
+	replorigin_session_origin_timestamp = 0;
+}
+
 /* Common function to setup the leader apply or tablesync worker. */
 void
 SetupApplyOrSyncWorker(int worker_slot)
@@ -4667,6 +4678,19 @@ SetupApplyOrSyncWorker(int worker_slot)
 
 	InitializeLogRepWorker();
 
+	/*
+	 * Register a callback to reset the origin state before aborting any
+	 * pending transaction during shutdown (see ShutdownPostgres()). This will
+	 * avoid origin advancement for an in-complete transaction which could
+	 * otherwise lead to its loss as such a transaction won't be sent by the
+	 * server again.
+	 *
+	 * Note that even a LOG or DEBUG statement placed after setting the origin
+	 * state may process a shutdown signal before committing the current apply
+	 * operation. So, it is important to register such a callback here.
+	 */
+	before_shmem_exit(replorigin_reset, (Datum) 0);
+
 	/* Connect to the origin and start the replication. */
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
@@ -4893,12 +4917,23 @@ void
 apply_error_callback(void *arg)
 {
 	ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
+	int			elevel;
 
 	if (apply_error_callback_arg.command == 0)
 		return;
 
 	Assert(errarg->origin_name);
 
+	elevel = geterrlevel();
+
+	/*
+	 * Reset the origin state to prevent the advancement of origin progress if
+	 * we fail to apply. Otherwise, this will result in transaction loss as
+	 * that transaction won't be sent again by the server.
+	 */
+	if (elevel >= ERROR)
+		replorigin_reset(0, (Datum) 0);
+
 	if (errarg->rel == NULL)
 	{
 		if (!TransactionIdIsValid(errarg->remote_xid))
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index d1d1632bdd..b924b524d0 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -1568,6 +1568,23 @@ geterrcode(void)
 	return edata->sqlerrcode;
 }
 
+/*
+ * geterrlevel --- return the currently set error level
+ *
+ * This is only intended for use in error callback subroutines, since there
+ * is no other place outside elog.c where the concept is meaningful.
+ */
+int
+geterrlevel(void)
+{
+	ErrorData  *edata = &errordata[errordata_stack_depth];
+
+	/* we don't bother incrementing recursion_depth */
+	CHECK_STACK_DEPTH();
+
+	return edata->elevel;
+}
+
 /*
  * geterrposition --- return the currently set error position (0 if none)
  *
diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h
index 054dd2bf62..e54eca5b48 100644
--- a/src/include/utils/elog.h
+++ b/src/include/utils/elog.h
@@ -226,6 +226,7 @@ extern int	internalerrquery(const char *query);
 extern int	err_generic_string(int field, const char *str);
 
 extern int	geterrcode(void);
+extern int	geterrlevel(void);
 extern int	geterrposition(void);
 extern int	getinternalerrposition(void);
 
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index 9437cd4c3b..e635be74c6 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -23,7 +23,7 @@ $node_publisher->start;
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
 $node_subscriber->append_conf('postgresql.conf',
-	qq(max_prepared_transactions = 10));
+	qq(max_prepared_transactions = 0));
 $node_subscriber->start;
 
 # Create some pre-existing content on publisher
@@ -67,12 +67,24 @@ $node_subscriber->poll_query_until('postgres', $twophase_query)
 # then COMMIT PREPARED
 ###############################
 
+# Save the log location, to see the failure of the application
+my $log_location = -s $node_subscriber->logfile;
+
 $node_publisher->safe_psql(
 	'postgres', "
 	BEGIN;
 	INSERT INTO tab_full VALUES (11);
 	PREPARE TRANSACTION 'test_prepared_tab_full';");
 
+# Confirm the ERROR is reported because max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
 $node_publisher->wait_for_catchup($appname);
 
 # check that transaction is in prepared state on subscriber
-- 
2.34.1

v1-0001-Don-t-advance-origin-during-apply-failure_PG16.patch (application/octet-stream)
From 0a76f73a39776315bbea653c0a533edd06edcc13 Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Wed, 14 Aug 2024 13:02:31 +0530
Subject: [PATCH v1] Don't advance origin during apply failure.

We advance origin progress during abort on successful streaming and
application of ROLLBACK in parallel streaming mode. But the origin
shouldn't be advanced during an error or unsuccessful apply due to
shutdown. Otherwise, it will result in a transaction loss as such a
transaction won't be sent again by the server.

Reported-by: Hou Zhijie
Author: Hayato Kuroda and Shveta Malik
Reviewed-by: Amit Kapila
Backpatch-through: 16
Discussion: https://postgr.es/m/TYAPR01MB5692FAC23BE40C69DA8ED4AFF5B92@TYAPR01MB5692.jpnprd01.prod.outlook.com
---
 src/backend/replication/logical/worker.c | 35 ++++++++++++++++++++++++
 src/backend/utils/error/elog.c           | 17 ++++++++++++
 src/include/utils/elog.h                 |  1 +
 src/test/subscription/t/021_twophase.pl  | 14 +++++++++-
 4 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 832b1cf764..18f86c73bd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4404,6 +4404,17 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
 	pfree(syncslotname);
 }
 
+/*
+ * Reset the origin state.
+ */
+static void
+replorigin_reset(int code, Datum arg)
+{
+	replorigin_session_origin = InvalidRepOriginId;
+	replorigin_session_origin_lsn = InvalidXLogRecPtr;
+	replorigin_session_origin_timestamp = 0;
+}
+
 /*
  * Run the apply loop with error handling. Disable the subscription,
  * if necessary.
@@ -4553,6 +4564,19 @@ ApplyWorkerMain(Datum main_arg)
 
 	InitializeApplyWorker();
 
+	/*
+	 * Register a callback to reset the origin state before aborting any
+	 * pending transaction during shutdown (see ShutdownPostgres()). This will
+	 * avoid origin advancement for an in-complete transaction which could
+	 * otherwise lead to its loss as such a transaction won't be sent by the
+	 * server again.
+	 *
+	 * Note that even a LOG or DEBUG statement placed after setting the origin
+	 * state may process a shutdown signal before committing the current apply
+	 * operation. So, it is important to register such a callback here.
+	 */
+	before_shmem_exit(replorigin_reset, (Datum) 0);
+
 	InitializingApplyWorker = false;
 
 	/* Connect to the origin and start the replication. */
@@ -4916,12 +4940,23 @@ void
 apply_error_callback(void *arg)
 {
 	ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
+	int			elevel;
 
 	if (apply_error_callback_arg.command == 0)
 		return;
 
 	Assert(errarg->origin_name);
 
+	elevel = geterrlevel();
+
+	/*
+	 * Reset the origin state to prevent the advancement of origin progress if
+	 * we fail to apply. Otherwise, this will result in transaction loss as
+	 * that transaction won't be sent again by the server.
+	 */
+	if (elevel >= ERROR)
+		replorigin_reset(0, (Datum) 0);
+
 	if (errarg->rel == NULL)
 	{
 		if (!TransactionIdIsValid(errarg->remote_xid))
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 7112fb0006..3347f24d4a 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -1571,6 +1571,23 @@ geterrcode(void)
 	return edata->sqlerrcode;
 }
 
+/*
+ * geterrlevel --- return the currently set error level
+ *
+ * This is only intended for use in error callback subroutines, since there
+ * is no other place outside elog.c where the concept is meaningful.
+ */
+int
+geterrlevel(void)
+{
+	ErrorData  *edata = &errordata[errordata_stack_depth];
+
+	/* we don't bother incrementing recursion_depth */
+	CHECK_STACK_DEPTH();
+
+	return edata->elevel;
+}
+
 /*
  * geterrposition --- return the currently set error position (0 if none)
  *
diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h
index 0292e88b4f..8bb55e5b3c 100644
--- a/src/include/utils/elog.h
+++ b/src/include/utils/elog.h
@@ -226,6 +226,7 @@ extern int	internalerrquery(const char *query);
 extern int	err_generic_string(int field, const char *str);
 
 extern int	geterrcode(void);
+extern int	geterrlevel(void);
 extern int	geterrposition(void);
 extern int	getinternalerrposition(void);
 
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index 8ce4cfc983..822932c1db 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -23,7 +23,7 @@ $node_publisher->start;
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init(allows_streaming => 'logical');
 $node_subscriber->append_conf('postgresql.conf',
-	qq(max_prepared_transactions = 10));
+	qq(max_prepared_transactions = 0));
 $node_subscriber->start;
 
 # Create some pre-existing content on publisher
@@ -67,12 +67,24 @@ $node_subscriber->poll_query_until('postgres', $twophase_query)
 # then COMMIT PREPARED
 ###############################
 
+# Save the log location, to see the failure of the application
+my $log_location = -s $node_subscriber->logfile;
+
 $node_publisher->safe_psql(
 	'postgres', "
 	BEGIN;
 	INSERT INTO tab_full VALUES (11);
 	PREPARE TRANSACTION 'test_prepared_tab_full';");
 
+# Confirm the ERROR is reported because max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
 $node_publisher->wait_for_catchup($appname);
 
 # check that transaction is in prepared state on subscriber
-- 
2.34.1

#16Amit Kapila
amit.kapila16@gmail.com
In reply to: shveta malik (#15)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Tue, Aug 20, 2024 at 2:01 PM shveta malik <shveta.malik@gmail.com> wrote:

On Tue, Aug 20, 2024 at 11:36 AM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Wed, Aug 14, 2024 at 10:26 AM shveta malik <shveta.malik@gmail.com> wrote:

Thanks for the detailed analysis. I agree with your analysis that we
need to reset the origin information for the shutdown path to avoid it
being advanced incorrectly. However, the patch doesn't have sufficient
comments to explain why we need to reset it for both the ERROR and
Shutdown paths. Can we improve the comments in the patch?

Also, for the ERROR path, can we reset the origin information in
apply_error_callback()?

Please find v4 attached. Addressed comments in that.

The patch looks mostly good to me. I have slightly changed a few of
the comments in the attached. What do you think of the attached?

Looks good to me. Please find backported patches attached.

Pushed.

--
With Regards,
Amit Kapila.

#17Tom Lane
tgl@sss.pgh.pa.us
In reply to: Amit Kapila (#16)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

Amit Kapila <amit.kapila16@gmail.com> writes:

On Tue, Aug 20, 2024 at 2:01 PM shveta malik <shveta.malik@gmail.com> wrote:

Looks good to me. Please find backported patches attached.

Pushed.

I came across this commit while preparing release notes, and I'm
worried about whether it doesn't create more problems than it solves.
The intent stated in the thread subject is to prevent an apply worker
from advancing past a prepared transaction if the subscriber side
doesn't permit prepared transactions. However, it appears to me that
the committed patch doesn't permit an apply worker to advance past
any failing transaction whatsoever. Was any thought given to how
a DBA would get out of such a situation and get replication flowing
again? In the prepared-xact case, it's at least clear that you
could increase max_prepared_transactions and restart the subscriber
installation. In the general case, it's not very obvious that you'd
even know what the problem is let alone have an easy way to fix it.

In other words: I thought the original design here was to
intentionally ignore apply errors and keep going, on the theory that
that was better than blocking replication altogether. This commit
has reversed that decision, on the strength of little or no
discussion AFAICS. Are we really ready to push this into minor
releases of stable branches? Is it a good idea even on HEAD?

regards, tom lane

#18Amit Kapila
amit.kapila16@gmail.com
In reply to: Tom Lane (#17)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Fri, Nov 8, 2024 at 12:53 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:

I came across this commit while preparing release notes, and I'm
worried about whether it doesn't create more problems than it solves.
The intent stated in the thread subject is to prevent an apply worker
from advancing past a prepared transaction if the subscriber side
doesn't permit prepared transactions. However, it appears to me that
the committed patch doesn't permit an apply worker to advance past
any failing transaction whatsoever. Was any thought given to how
a DBA would get out of such a situation and get replication flowing
again? In the prepared-xact case, it's at least clear that you
could increase max_prepared_transactions and restart the subscriber
installation. In the general case, it's not very obvious that you'd
even know what the problem is let alone have an easy way to fix it.

This is by design, so we don't let the apply worker proceed in case of
any ERROR. For example, the apply worker keeps retrying to apply the
transaction when there is a unique key violation error while applying
(which could be due to the subscriber side having a unique key defined
but the publisher doesn't or the subscriber already has a row with the
same value). We need manual intervention to continue the replication.
To do that, she can create a subscription with the option
'disable_on_error'. Then apply worker will stop on ERROR instead of
retrying. Then, she can either manually remove a conflicting row or
use ALTER SUBSCRIPTION ... SKIP ... to get past the conflicting/error
transaction. Alternatively, the transaction can also be skipped by
calling the pg_replication_origin_advance() function. To use SKIP or
origin_advance function, in the ERROR log we print the LSN (CONTEXT:
processing remote data for replication origin "pg_16395" during
"INSERT" for replication target relation "public.test" in transaction
725 finished at 0/14C0378). She needs to use LSN value 0/14C0378 to
skip the error transaction. We have explained this in the docs [1].
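
As a rough illustration of pulling that LSN out of the CONTEXT line, here is a minimal sketch in C. The helper names extract_finish_lsn() and format_lsn() are hypothetical and are not PostgreSQL APIs; the sketch only assumes the "hi/lo" hexadecimal text form shown in the log message above.

```c
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper (not a PostgreSQL API): find the text after
 * "finished at " in a CONTEXT line and parse the "hi/lo" hexadecimal LSN
 * into a 64-bit value. Returns 1 on success, 0 if no LSN was found.
 */
static int
extract_finish_lsn(const char *context_line, uint64_t *lsn)
{
	const char *marker = "finished at ";
	const char *p = strstr(context_line, marker);
	unsigned int hi;
	unsigned int lo;

	if (p == NULL)
		return 0;
	p += strlen(marker);

	/* Each half of the LSN is printed as an unpadded hexadecimal number. */
	if (sscanf(p, "%X/%X", &hi, &lo) != 2)
		return 0;

	*lsn = ((uint64_t) hi << 32) | lo;
	return 1;
}

/* Format a 64-bit LSN back into the "hi/lo" text form used in the logs. */
static void
format_lsn(uint64_t lsn, char *buf, size_t buflen)
{
	snprintf(buf, buflen, "%X/%X",
			 (unsigned int) (lsn >> 32), (unsigned int) lsn);
}
```

The text form produced by format_lsn() is the value the DBA would then supply when skipping the transaction, as described above.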

In other words: I thought the original design here was to
intentionally ignore apply errors and keep going, on the theory that
that was better than blocking replication altogether.

No, that was not the intention because otherwise, we will silently
create inconsistency on the subscriber side.

This commit has reversed that decision, on the strength of little or no
discussion AFAICS. Are we really ready to push this into minor
releases of stable branches? Is it a good idea even on HEAD?

I hope the explanation above addresses your concern.

[1]: https://www.postgresql.org/docs/devel/logical-replication-conflicts.html

--
With Regards,
Amit Kapila.

#19Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
In reply to: Zhijie Hou (Fujitsu) (#4)
1 attachment(s)
RE: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

Hi,

When reviewing some parallel apply related code, I noticed a bug in the
parallel apply worker, similar to the issue discussed in this thread.

The issue is that the logical replication parallel apply worker may erroneously
advance the origin progress during an error or unsuccessful apply. This can lead
to transaction loss, as these transactions will not be resent by the server.
Commit 3f28b2fc addressed a similar issue in both the apply worker and table
sync worker.

The original fix involved registering a before_shmem_exit callback to reset the
origin information, preventing the worker from advancing it during transaction
abortion on shutdown. The attached patch registers the same callback for the
parallel apply worker, ensuring consistent behavior across all workers.
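
To make the mechanism concrete, here is a toy model in C of why the reset has to run before the abort. None of these names are real PostgreSQL structures or functions; ToyWorker and the toy_* helpers are stand-ins invented for illustration, and the only behavior assumed is what this thread describes: aborting a transaction records the session origin LSN as progress if it is still set.

```c
#include <stddef.h>
#include <stdint.h>

/* Toy stand-in for a worker's origin state; not a PostgreSQL structure. */
typedef struct ToyWorker
{
	uint64_t	session_origin_lsn;	/* 0 means no origin progress pending */
	uint64_t	recorded_progress;	/* where the server would resume from */
	void		(*exit_callback) (struct ToyWorker *);	/* may be NULL */
} ToyWorker;

/* Models the reset callback: forget the pending origin LSN. */
static void
toy_origin_reset(ToyWorker *w)
{
	w->session_origin_lsn = 0;
}

/* Models abort: a still-set origin LSN gets recorded as progress. */
static void
toy_abort_transaction(ToyWorker *w)
{
	if (w->session_origin_lsn != 0)
		w->recorded_progress = w->session_origin_lsn;
}

/* Models shutdown: run the exit callback (if any), then abort. */
static void
toy_shutdown(ToyWorker *w)
{
	if (w->exit_callback != NULL)
		w->exit_callback(w);
	toy_abort_transaction(w);
}
```

In this model, a worker with no callback ends up with recorded_progress moved past the failed transaction, while a worker that registered toy_origin_reset keeps its old progress and would receive the transaction again.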

Best Regards,
Hou zj

Attachments:

v1-0001-Fix-unexpected-origin-advancement-during-parallel.patch
From 0d626e462fbf25f4973de5b4771835f57ed5e56d Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Mon, 22 Dec 2025 14:01:08 +0800
Subject: [PATCH v1] Fix unexpected origin advancement during parallel apply
 failure

The logical replication parallel apply worker may erroneously advance the origin
progress during an error or unsuccessful apply. This can lead to transaction
loss, as these transactions will not be resent by the server.

Commit 3f28b2fc addressed a similar issue in both the apply worker and table
sync worker, by registering a before_shmem_exit callback to reset the origin
information, preventing the worker from advancing it during transaction abortion
on shutdown. This commit registers the same callback for the parallel apply
worker, ensuring consistent behavior across all workers.
---
 src/backend/replication/logical/worker.c      | 30 +++++++++-------
 .../subscription/t/023_twophase_stream.pl     | 34 +++++++++++++++++++
 2 files changed, 51 insertions(+), 13 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fc64476a9ef..e009c0c9d48 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -5849,6 +5849,23 @@ InitializeLogRepWorker(void)
 					   MySubscription->name));
 
 	CommitTransactionCommand();
+
+	/*
+	 * Register a callback to reset the origin state before aborting any
+	 * pending transaction during shutdown (see ShutdownPostgres()). This will
+	 * avoid origin advancement for an in-complete transaction which could
+	 * otherwise lead to its loss as such a transaction won't be sent by the
+	 * server again.
+	 *
+	 * Note that even a LOG or DEBUG statement placed after setting the origin
+	 * state may process a shutdown signal before committing the current apply
+	 * operation. So, it is important to register such a callback here.
+	 *
+	 * Register this callback here to ensure that all types of logical
+	 * replication workers that set up origins and apply remote transactions
+	 * are protected.
+	 */
+	before_shmem_exit(replorigin_reset, (Datum) 0);
 }
 
 /*
@@ -5892,19 +5909,6 @@ SetupApplyOrSyncWorker(int worker_slot)
 
 	InitializeLogRepWorker();
 
-	/*
-	 * Register a callback to reset the origin state before aborting any
-	 * pending transaction during shutdown (see ShutdownPostgres()). This will
-	 * avoid origin advancement for an in-complete transaction which could
-	 * otherwise lead to its loss as such a transaction won't be sent by the
-	 * server again.
-	 *
-	 * Note that even a LOG or DEBUG statement placed after setting the origin
-	 * state may process a shutdown signal before committing the current apply
-	 * operation. So, it is important to register such a callback here.
-	 */
-	before_shmem_exit(replorigin_reset, (Datum) 0);
-
 	/* Connect to the origin and start the replication. */
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl
index e01347ca699..9b9f189308e 100644
--- a/src/test/subscription/t/023_twophase_stream.pl
+++ b/src/test/subscription/t/023_twophase_stream.pl
@@ -429,6 +429,40 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
 is($result, qq(1), 'transaction is committed on subscriber');
 
+# Test the ability to re-apply a transaction when a parallel apply worker fails
+# to prepare the transaction due to insufficient max_prepared_transactions
+# setting.
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 0));
+$node_subscriber->restart;
+
+$node_publisher->safe_psql(
+	'postgres', q{
+	BEGIN;
+	INSERT INTO test_tab_2 values(2);
+	PREPARE TRANSACTION 'xact';
+	COMMIT PREPARED 'xact';
+	});
+
+$offset = -s $node_subscriber->logfile;
+
+# Confirm the ERROR is reported because max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/,
+	$offset);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is committed on subscriber
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(2), 'transaction is committed on subscriber after retrying');
+
 ###############################
 # check all the cleanup
 ###############################
-- 
2.51.1.windows.1

#20Chao Li
li.evan.chao@gmail.com
In reply to: Zhijie Hou (Fujitsu) (#19)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Dec 22, 2025, at 17:01, Zhijie Hou (Fujitsu) <houzj.fnst@fujitsu.com> wrote:

Hi,

When reviewing some parallel apply related code, I noticed a bug in the
parallel apply worker, similar to the issue discussed in this thread.

The issue is that the logical replication parallel apply worker may erroneously
advance the origin progress during an error or unsuccessful apply. This can lead
to transaction loss, as these transactions will not be resent by the server.
Commit 3f28b2fc addressed a similar issue in both the apply worker and table
sync worker.

The original fix involved registering a before_shmem_exit callback to reset the
origin information, preventing the worker from advancing it during transaction
abortion on shutdown. The attached patch registers the same callback for the
parallel apply worker, ensuring consistent behavior across all workers.

Best Regards,
Hou zj
<v1-0001-Fix-unexpected-origin-advancement-during-parallel.patch>

So, ParallelApplyWorkerMain() calls only InitializeLogRepWorker() and not SetupApplyOrSyncWorker(); by moving before_shmem_exit() into InitializeLogRepWorker(), ParallelApplyWorkerMain() gets the exit callback too.
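
A minimal sketch of that call structure, with toy functions standing in for the real entry points (the toy_* names and the ToyProc type are hypothetical, and the "patched" flag only models where the registration lives):

```c
#include <stdbool.h>

/* Toy stand-in for per-process state; not a PostgreSQL structure. */
typedef struct ToyProc
{
	bool		reset_callback_registered;
} ToyProc;

/* Shared initialization; with the patch, the callback is registered here. */
static void
toy_initialize_worker(ToyProc *p, bool patched)
{
	if (patched)
		p->reset_callback_registered = true;
}

/* Entry path modeled on the leader apply and table sync workers. */
static void
toy_setup_apply_or_sync(ToyProc *p, bool patched)
{
	toy_initialize_worker(p, patched);

	/* Without the patch, this is the only place that registers it. */
	if (!patched)
		p->reset_callback_registered = true;
}

/* Entry path modeled on the parallel apply worker: shared init only. */
static void
toy_parallel_apply_main(ToyProc *p, bool patched)
{
	toy_initialize_worker(p, patched);
}
```

With patched set to false, only the apply/sync path ends up with the callback; with patched set to true, both paths do.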

LGTM.

Best regards,
--
Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/

#21Chao Li
li.evan.chao@gmail.com
In reply to: Chao Li (#20)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Dec 22, 2025, at 17:28, Chao Li <li.evan.chao@gmail.com> wrote:

On Dec 22, 2025, at 17:01, Zhijie Hou (Fujitsu) <houzj.fnst@fujitsu.com> wrote:

Hi,

When reviewing some parallel apply related code, I noticed a bug in the
parallel apply worker, similar to the issue discussed in this thread.

The issue is that the logical replication parallel apply worker may erroneously
advance the origin progress during an error or unsuccessful apply. This can lead
to transaction loss, as these transactions will not be resent by the server.
Commit 3f28b2fc addressed a similar issue in both the apply worker and table
sync worker.

The original fix involved registering a before_shmem_exit callback to reset the
origin information, preventing the worker from advancing it during transaction
abortion on shutdown. The attached patch registers the same callback for the
parallel apply worker, ensuring consistent behavior across all workers.

Best Regards,
Hou zj
<v1-0001-Fix-unexpected-origin-advancement-during-parallel.patch>

So, ParallelApplyWorkerMain() calls only InitializeLogRepWorker() and not SetupApplyOrSyncWorker(); by moving before_shmem_exit() into InitializeLogRepWorker(), ParallelApplyWorkerMain() gets the exit callback too.

LGTM.

I just noticed a nitpick while reviewing your other patch:
```
+ * avoid origin advancement for an in-complete transaction which could
```

“In-complete” should be just “incomplete”.

Best regards,
--
Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/

#22Amit Kapila
amit.kapila16@gmail.com
In reply to: Zhijie Hou (Fujitsu) (#19)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Mon, Dec 22, 2025 at 2:31 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:

When reviewing some parallel apply related code, I noticed a bug in the
parallel apply worker, similar to the issue discussed in this thread.

The issue is that the logical replication parallel apply worker may erroneously
advance the origin progress during an error or unsuccessful apply. This can lead
to transaction loss, as these transactions will not be resent by the server.
Commit 3f28b2fc addressed a similar issue in both the apply worker and table
sync worker.

The original fix involved registering a before_shmem_exit callback to reset the
origin information, preventing the worker from advancing it during transaction
abortion on shutdown. The attached patch registers the same callback for the
parallel apply worker, ensuring consistent behavior across all workers.

Thanks for reporting the issue and patch.

+# Test the ability to re-apply a transaction when a parallel apply worker fails
+# to prepare the transaction due to insufficient max_prepared_transactions
+# setting.
+$node_subscriber->append_conf('postgresql.conf',

How does the test ensure that error is raised by parallel apply
worker? I see that in the previous test, we set
'debug_logical_replication_streaming = immediate', so that should help
to invoke parallel apply worker. But is there a more direct way to
ensure the same? Can we test for LOG like: "ERROR: logical
replication parallel apply worker exited due to error"?

--
With Regards,
Amit Kapila.

#23Zhijie Hou (Fujitsu)
houzj.fnst@fujitsu.com
In reply to: Amit Kapila (#22)
3 attachment(s)
RE: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Tuesday, December 23, 2025 12:21 PM Amit Kapila <amit.kapila16@gmail.com> wrote:

On Mon, Dec 22, 2025 at 2:31 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:

When reviewing some parallel apply related code, I noticed a bug in
the parallel apply worker, similar to the issue discussed in this thread.

The issue is that the logical replication parallel apply worker may
erroneously advance the origin progress during an error or
unsuccessful apply. This can lead to transaction loss, as these transactions
will not be resent by the server.
Commit 3f28b2fc addressed a similar issue in both the apply worker and
table sync worker.

The original fix involved registering a before_shmem_exit callback to
reset the origin information, preventing the worker from advancing it
during transaction abortion on shutdown. The attached patch registers
the same callback for the parallel apply worker, ensuring consistent
behavior across all workers.

Thanks for reporting the issue and patch.

+# Test the ability to re-apply a transaction when a parallel apply worker fails
+# to prepare the transaction due to insufficient max_prepared_transactions
+# setting.
+$node_subscriber->append_conf('postgresql.conf',

How does the test ensure that error is raised by parallel apply worker? I see
that in the previous test, we set 'debug_logical_replication_streaming =
immediate', so that should help to invoke parallel apply worker. But is there a
more direct way to ensure the same? Can we test for LOG like: "ERROR:
logical replication parallel apply worker exited due to error"?

OK, I have added a general log test for "ERROR .. logical replication parallel
apply worker ..." to ensure that it's the parallel apply worker that failed to
apply the transaction.
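
For readers who want to see what the two log patterns accept, here is a small sketch using POSIX extended regular expressions in C. The helper log_line_matches() is hypothetical, the patterns are the ones from the test translated without change, and the sample log lines in the note below are made up for illustration, modeled on the messages quoted in this thread.

```c
#include <regex.h>
#include <stdbool.h>
#include <stddef.h>

/* Pattern for the failure itself; the SQLSTATE field is optional. */
static const char *const PREPARE_DISABLED_RE =
	"ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled";

/* Pattern keyed on the worker type rather than the exact message text. */
static const char *const PARALLEL_WORKER_RE =
	"ERROR: .*logical replication parallel apply worker";

/*
 * Hypothetical helper: return true if 'line' matches the POSIX extended
 * regular expression 'pattern' anywhere in the line.
 */
static bool
log_line_matches(const char *pattern, const char *line)
{
	regex_t		re;
	bool		matched;

	if (regcomp(&re, pattern, REG_EXTENDED | REG_NOSUB) != 0)
		return false;
	matched = (regexec(&re, line, 0, NULL, 0) == 0);
	regfree(&re);
	return matched;
}
```

The first pattern accepts both "ERROR:  prepared transactions are disabled" and a line that carries an SQLSTATE such as "ERROR:  55000: prepared transactions are disabled". The second accepts any ERROR line that mentions the parallel apply worker, for example "ERROR:  logical replication parallel apply worker exited due to error".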

And I also addressed the comments by Li Chao.

Here are the updated patches for all branches.

Best Regards,
Hou zj

Attachments:

v2_PG16-0001-Fix-unexpected-origin-advancement-during-par.patch
From 3cf0f657f4506f6710fe032c4f43553be262e89d Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Tue, 23 Dec 2025 13:38:37 +0800
Subject: [PATCH v2_PG16] Fix unexpected origin advancement during parallel
 apply failure

The logical replication parallel apply worker may erroneously advance the origin
progress during an error or unsuccessful apply. This can lead to transaction
loss, as these transactions will not be resent by the server.

Commit 3f28b2fc addressed a similar issue in both the apply worker and table
sync worker, by registering a before_shmem_exit callback to reset the origin
information, preventing the worker from advancing it during transaction abortion
on shutdown. This commit registers the same callback for the parallel apply
worker, ensuring consistent behavior across all workers.
---
 src/backend/replication/logical/worker.c      | 30 +++++++------
 .../subscription/t/023_twophase_stream.pl     | 45 +++++++++++++++++++
 2 files changed, 62 insertions(+), 13 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 9b5c641941f..d6bbffd7c8d 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4544,6 +4544,23 @@ InitializeApplyWorker(void)
 						MySubscription->name)));
 
 	CommitTransactionCommand();
+
+	/*
+	 * Register a callback to reset the origin state before aborting any
+	 * pending transaction during shutdown (see ShutdownPostgres()). This will
+	 * avoid origin advancement for an incomplete transaction which could
+	 * otherwise lead to its loss as such a transaction won't be sent by the
+	 * server again.
+	 *
+	 * Note that even a LOG or DEBUG statement placed after setting the origin
+	 * state may process a shutdown signal before committing the current apply
+	 * operation. So, it is important to register such a callback here.
+	 *
+	 * Register this callback here to ensure that all types of logical
+	 * replication workers that set up origins and apply remote transactions
+	 * are protected.
+	 */
+	before_shmem_exit(replorigin_reset, (Datum) 0);
 }
 
 /* Logical Replication Apply worker entry point */
@@ -4581,19 +4598,6 @@ ApplyWorkerMain(Datum main_arg)
 
 	InitializeApplyWorker();
 
-	/*
-	 * Register a callback to reset the origin state before aborting any
-	 * pending transaction during shutdown (see ShutdownPostgres()). This will
-	 * avoid origin advancement for an in-complete transaction which could
-	 * otherwise lead to its loss as such a transaction won't be sent by the
-	 * server again.
-	 *
-	 * Note that even a LOG or DEBUG statement placed after setting the origin
-	 * state may process a shutdown signal before committing the current apply
-	 * operation. So, it is important to register such a callback here.
-	 */
-	before_shmem_exit(replorigin_reset, (Datum) 0);
-
 	InitializingApplyWorker = false;
 
 	/* Connect to the origin and start the replication. */
diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl
index 0303807846e..39ad688a7bd 100644
--- a/src/test/subscription/t/023_twophase_stream.pl
+++ b/src/test/subscription/t/023_twophase_stream.pl
@@ -429,6 +429,51 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
 is($result, qq(1), 'transaction is committed on subscriber');
 
+# Test the ability to re-apply a transaction when a parallel apply worker fails
+# to prepare the transaction due to insufficient max_prepared_transactions
+# setting.
+$node_subscriber->append_conf(
+	'postgresql.conf', qq(
+max_prepared_transactions = 0
+debug_logical_replication_streaming = buffered
+));
+$node_subscriber->restart;
+
+$node_publisher->safe_psql(
+	'postgres', q{
+	BEGIN;
+	INSERT INTO test_tab_2 values(2);
+	PREPARE TRANSACTION 'xact';
+	COMMIT PREPARED 'xact';
+	});
+
+$offset = -s $node_subscriber->logfile;
+
+# Confirm the ERROR is reported because max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/,
+	$offset);
+
+# Confirm that the parallel apply worker has encountered an error. The check
+# focuses on the worker type as a keyword, since the error message content may
+# differ based on whether the leader initially detected the parallel apply
+# worker's failure or received a signal from it.
+$node_subscriber->wait_for_log(
+	qr/ERROR: .*logical replication parallel apply worker.*/,
+	$offset);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is committed on subscriber
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(2), 'transaction is committed on subscriber after retrying');
+
 ###############################
 # check all the cleanup
 ###############################
-- 
2.51.1.windows.1

v2_PG17_PG18-0001-Fix-unexpected-origin-advancement-durin.patch
From 823f182da91667002da0848e3622b03e056b8120 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Tue, 23 Dec 2025 13:45:16 +0800
Subject: [PATCH v2_PG17_PG18] Fix unexpected origin advancement during
 parallel apply failure

The logical replication parallel apply worker may erroneously advance the origin
progress during an error or unsuccessful apply. This can lead to transaction
loss, as these transactions will not be resent by the server.

Commit 3f28b2fc addressed a similar issue in both the apply worker and table
sync worker, by registering a before_shmem_exit callback to reset the origin
information, preventing the worker from advancing it during transaction abortion
on shutdown. This commit registers the same callback for the parallel apply
worker, ensuring consistent behavior across all workers.
---
 src/backend/replication/logical/worker.c      | 30 +++++++------
 .../subscription/t/023_twophase_stream.pl     | 45 +++++++++++++++++++
 2 files changed, 62 insertions(+), 13 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f77adfffc83..313d3a82660 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4673,6 +4673,23 @@ InitializeLogRepWorker(void)
 						MySubscription->name)));
 
 	CommitTransactionCommand();
+
+	/*
+	 * Register a callback to reset the origin state before aborting any
+	 * pending transaction during shutdown (see ShutdownPostgres()). This will
+	 * avoid origin advancement for an incomplete transaction which could
+	 * otherwise lead to its loss as such a transaction won't be sent by the
+	 * server again.
+	 *
+	 * Note that even a LOG or DEBUG statement placed after setting the origin
+	 * state may process a shutdown signal before committing the current apply
+	 * operation. So, it is important to register such a callback here.
+	 *
+	 * Register this callback here to ensure that all types of logical
+	 * replication workers that set up origins and apply remote transactions
+	 * are protected.
+	 */
+	before_shmem_exit(replorigin_reset, (Datum) 0);
 }
 
 /*
@@ -4714,19 +4731,6 @@ SetupApplyOrSyncWorker(int worker_slot)
 
 	InitializeLogRepWorker();
 
-	/*
-	 * Register a callback to reset the origin state before aborting any
-	 * pending transaction during shutdown (see ShutdownPostgres()). This will
-	 * avoid origin advancement for an in-complete transaction which could
-	 * otherwise lead to its loss as such a transaction won't be sent by the
-	 * server again.
-	 *
-	 * Note that even a LOG or DEBUG statement placed after setting the origin
-	 * state may process a shutdown signal before committing the current apply
-	 * operation. So, it is important to register such a callback here.
-	 */
-	before_shmem_exit(replorigin_reset, (Datum) 0);
-
 	/* Connect to the origin and start the replication. */
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl
index 23c762e8adf..5a5516ba787 100644
--- a/src/test/subscription/t/023_twophase_stream.pl
+++ b/src/test/subscription/t/023_twophase_stream.pl
@@ -429,6 +429,51 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
 is($result, qq(1), 'transaction is committed on subscriber');
 
+# Test the ability to re-apply a transaction when a parallel apply worker fails
+# to prepare the transaction due to insufficient max_prepared_transactions
+# setting.
+$node_subscriber->append_conf(
+	'postgresql.conf', qq(
+max_prepared_transactions = 0
+debug_logical_replication_streaming = buffered
+));
+$node_subscriber->restart;
+
+$node_publisher->safe_psql(
+	'postgres', q{
+	BEGIN;
+	INSERT INTO test_tab_2 values(2);
+	PREPARE TRANSACTION 'xact';
+	COMMIT PREPARED 'xact';
+	});
+
+$offset = -s $node_subscriber->logfile;
+
+# Confirm the ERROR is reported because max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/,
+	$offset);
+
+# Confirm that the parallel apply worker has encountered an error. The check
+# focuses on the worker type as a keyword, since the error message content may
+# differ based on whether the leader initially detected the parallel apply
+# worker's failure or received a signal from it.
+$node_subscriber->wait_for_log(
+	qr/ERROR: .*logical replication parallel apply worker.*/,
+	$offset);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is committed on subscriber
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(2), 'transaction is committed on subscriber after retrying');
+
 ###############################
 # check all the cleanup
 ###############################
-- 
2.51.1.windows.1

v2_HEAD-0001-Fix-unexpected-origin-advancement-during-par.patch
From a6cb317116e4710c7989306848dca2610761d944 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Mon, 22 Dec 2025 14:01:08 +0800
Subject: [PATCH v2_HEAD] Fix unexpected origin advancement during parallel
 apply failure

The logical replication parallel apply worker may erroneously advance the origin
progress during an error or unsuccessful apply. This can lead to transaction
loss, as these transactions will not be resent by the server.

Commit 3f28b2fc addressed a similar issue in both the apply worker and table
sync worker, by registering a before_shmem_exit callback to reset the origin
information, preventing the worker from advancing it during transaction abortion
on shutdown. This commit registers the same callback for the parallel apply
worker, ensuring consistent behavior across all workers.
---
 src/backend/replication/logical/worker.c      | 30 +++++++------
 .../subscription/t/023_twophase_stream.pl     | 45 +++++++++++++++++++
 2 files changed, 62 insertions(+), 13 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d1ee0261c64..718408bb599 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -5849,6 +5849,23 @@ InitializeLogRepWorker(void)
 					   MySubscription->name));
 
 	CommitTransactionCommand();
+
+	/*
+	 * Register a callback to reset the origin state before aborting any
+	 * pending transaction during shutdown (see ShutdownPostgres()). This will
+	 * avoid origin advancement for an incomplete transaction which could
+	 * otherwise lead to its loss as such a transaction won't be sent by the
+	 * server again.
+	 *
+	 * Note that even a LOG or DEBUG statement placed after setting the origin
+	 * state may process a shutdown signal before committing the current apply
+	 * operation. So, it is important to register such a callback here.
+	 *
+	 * Register this callback here to ensure that all types of logical
+	 * replication workers that set up origins and apply remote transactions
+	 * are protected.
+	 */
+	before_shmem_exit(replorigin_reset, (Datum) 0);
 }
 
 /*
@@ -5892,19 +5909,6 @@ SetupApplyOrSyncWorker(int worker_slot)
 
 	InitializeLogRepWorker();
 
-	/*
-	 * Register a callback to reset the origin state before aborting any
-	 * pending transaction during shutdown (see ShutdownPostgres()). This will
-	 * avoid origin advancement for an in-complete transaction which could
-	 * otherwise lead to its loss as such a transaction won't be sent by the
-	 * server again.
-	 *
-	 * Note that even a LOG or DEBUG statement placed after setting the origin
-	 * state may process a shutdown signal before committing the current apply
-	 * operation. So, it is important to register such a callback here.
-	 */
-	before_shmem_exit(replorigin_reset, (Datum) 0);
-
 	/* Connect to the origin and start the replication. */
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl
index e01347ca699..dc629425daa 100644
--- a/src/test/subscription/t/023_twophase_stream.pl
+++ b/src/test/subscription/t/023_twophase_stream.pl
@@ -429,6 +429,51 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
 is($result, qq(1), 'transaction is committed on subscriber');
 
+# Test the ability to re-apply a transaction when a parallel apply worker fails
+# to prepare the transaction due to insufficient max_prepared_transactions
+# setting.
+$node_subscriber->append_conf(
+	'postgresql.conf', qq(
+max_prepared_transactions = 0
+debug_logical_replication_streaming = buffered
+));
+$node_subscriber->restart;
+
+$node_publisher->safe_psql(
+	'postgres', q{
+	BEGIN;
+	INSERT INTO test_tab_2 values(2);
+	PREPARE TRANSACTION 'xact';
+	COMMIT PREPARED 'xact';
+	});
+
+$offset = -s $node_subscriber->logfile;
+
+# Confirm the ERROR is reported because max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/,
+	$offset);
+
+# Confirm that the parallel apply worker has encountered an error. The check
+# focuses on the worker type as a keyword, since the error message content may
+# differ based on whether the leader initially detected the parallel apply
+# worker's failure or received a signal from it.
+$node_subscriber->wait_for_log(
+	qr/ERROR: .*logical replication parallel apply worker.*/,
+	$offset);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is committed on subscriber
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(2), 'transaction is committed on subscriber after retrying');
+
 ###############################
 # check all the cleanup
 ###############################
-- 
2.51.1.windows.1
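
The callback ordering that the commit message above relies on can be illustrated with a small toy model. This is a minimal sketch, not PostgreSQL code: every identifier prefixed with toy_, the function simulate_failed_prepare, and the LSN values 100 and 200 are hypothetical and chosen only for illustration. It assumes two things: exit callbacks run in reverse registration order (as before_shmem_exit callbacks do), and aborting a transaction while the session origin LSN is still set advances the origin progress, as described at the start of this thread.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Toy model only: none of these names are PostgreSQL APIs. */
typedef uint64_t toy_lsn;
typedef void (*toy_exit_cb) (void);

#define TOY_INVALID_LSN   ((toy_lsn) 0)
#define TOY_MAX_CALLBACKS 8

static toy_lsn durable_origin_lsn;  /* progress that survives a restart */
static toy_lsn session_origin_lsn;  /* end LSN of the xact being applied */
static toy_exit_cb exit_callbacks[TOY_MAX_CALLBACKS];
static int n_exit_callbacks;

/* Register an exit callback; callbacks later run in LIFO order. */
static void
toy_before_exit(toy_exit_cb cb)
{
    assert(n_exit_callbacks < TOY_MAX_CALLBACKS);
    exit_callbacks[n_exit_callbacks++] = cb;
}

/*
 * Stand-in for the transaction abort done at shutdown: it advances the
 * origin whenever the session origin LSN is still set.
 */
static void
toy_abort_pending_transaction(void)
{
    if (session_origin_lsn != TOY_INVALID_LSN)
        durable_origin_lsn = session_origin_lsn;
}

/* Stand-in for the reset callback registered by the patch. */
static void
toy_reset_origin(void)
{
    session_origin_lsn = TOY_INVALID_LSN;
}

/* Run callbacks in reverse registration order. */
static void
toy_run_exit_callbacks(void)
{
    while (n_exit_callbacks > 0)
        exit_callbacks[--n_exit_callbacks] ();
}

/*
 * Simulate a worker that fails while preparing a remote transaction and
 * then exits. Returns the origin progress recorded after the exit.
 */
toy_lsn
simulate_failed_prepare(bool register_reset)
{
    durable_origin_lsn = 100;   /* last successfully applied position */
    session_origin_lsn = TOY_INVALID_LSN;
    n_exit_callbacks = 0;

    /* Registered first, so it runs last. */
    toy_before_exit(toy_abort_pending_transaction);

    /* Registered later, so it runs before the abort. */
    if (register_reset)
        toy_before_exit(toy_reset_origin);

    /* The worker starts applying a transaction that ends at LSN 200. */
    session_origin_lsn = 200;

    /* PREPARE fails and the worker exits without committing. */
    toy_run_exit_callbacks();

    return durable_origin_lsn;
}
```

In this model, simulate_failed_prepare(false) returns 200: the origin has moved past a transaction that was never applied, so it would not be requested again. simulate_failed_prepare(true) returns 100, so the transaction is still pending and can be resent after the worker restarts.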

#24Amit Kapila
amit.kapila16@gmail.com
In reply to: Zhijie Hou (Fujitsu) (#23)
Re: [bug fix] prepared transaction might be lost when max_prepared_transactions is zero on the subscriber

On Tue, Dec 23, 2025 at 12:12 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:

> Here are the updated patches for all branches.

Pushed.

--
With Regards,
Amit Kapila.