logical replication: patch to ensure timely cleanup of aborted transactions in ReorderBuffer

Started by Vaijayanti Bharadwaj about 1 year ago, 2 messages
#1Vaijayanti Bharadwaj
vaijayanti.bharadwaj@enterprisedb.com
2 attachment(s)

Hello,

This patch is a proposed fix for an issue that can occur in logical
replication under the following conditions:
1. Streaming of in-progress transactions is disabled.
2. A transaction is PREPAREd and is not resolved for some time. It holds
back the oldestRunningXid.
3. Large transactions with xids higher than that of the prepared transaction
spill to disk as they are decoded in the ReorderBuffer.
4. An unclean shutdown aborts the large transactions, but no explicit abort
record is written for them.
5. On startup, the ReorderBuffer continues to decode and hold the aborted
transactions, and their spill files continue to accumulate.
6. Once the prepared transaction is resolved, these transactions are
cleaned up, but if resolution is delayed, the aborted transactions can take
up considerable disk space.
(A customer had run into this issue at EDB)

0001 is a TAP test that reproduces the issue. It fails without the fix in
0002 and passes with it.
0002 is the patch that fixes the issue.

Thanks,
Vaijayanti

Attachments:

0001-TAP-test-that-passes-with-the-ReorderBuffer-cleanup-.patch (application/octet-stream)
From 671c13cc6689654c261fd1d36b64a6c757e8f212 Mon Sep 17 00:00:00 2001
From: Vaijayanti Bharadwaj <vaijayanti.bharadwaj@enterprisedb.com>
Date: Tue, 3 Dec 2024 18:25:25 +0530
Subject: [PATCH 1/2] TAP test that passes with the ReorderBuffer cleanup fix,
 and fails without it

---
 .../subscription/t/050_cleanup_after_crash.pl | 118 ++++++++++++++++++
 1 file changed, 118 insertions(+)
 create mode 100644 src/test/subscription/t/050_cleanup_after_crash.pl

diff --git a/src/test/subscription/t/050_cleanup_after_crash.pl b/src/test/subscription/t/050_cleanup_after_crash.pl
new file mode 100644
index 0000000000..0246258aa7
--- /dev/null
+++ b/src/test/subscription/t/050_cleanup_after_crash.pl
@@ -0,0 +1,118 @@
+
+# Copyright (c) 2021-2024, PostgreSQL Global Development Group
+
+# Tests spill file cleanup when an unresolved prepared transaction
+# holds back the oldest running xid and large incomplete
+# transactions are aborted by an unclean shutdown. Without cleanup,
+# their spill files keep taking up disk space.
+#
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	"max_prepared_transactions = 10
+	 logical_decoding_work_mem = 64kB");
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	"max_prepared_transactions = 10");
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rep (a int primary key, value text)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rep SELECT x, md5(x::text) FROM generate_series(1,10) x");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rep (a int primary key, value text)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR ALL TABLES");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (streaming = off)"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
+is($result, qq(10), 'initial data synced');
+
+# Create a prepared transaction that holds back the min xid.
+$node_publisher->safe_psql('postgres',
+	qq[
+		BEGIN;
+		INSERT INTO tab_rep SELECT x, md5(x::text) FROM generate_series(11, 100) x;
+		PREPARE TRANSACTION 'TEST1';
+	]
+);
+
+my $psql_session = $node_publisher->background_psql('postgres');
+$psql_session->query_safe(
+	qq[
+		BEGIN;
+		INSERT INTO tab_rep SELECT x, md5(x::text) FROM generate_series(101, 1000000) x;
+	]
+);
+my $running_xid=$psql_session->query_safe("SELECT txid_current();");
+
+sleep(10);
+
+$node_publisher->stop('immediate');
+$node_publisher->start;
+
+sleep(5);
+$result=$node_publisher->safe_psql('postgres',
+	qq[
+		SELECT count(*) = 0 FROM
+		(SELECT pg_ls_dir('pg_replslot/tap_sub')) AS s
+		WHERE s.pg_ls_dir LIKE '%xid-] . $running_xid . q[%']
+	);
+is($result, 't', 'no spill files left over from incomplete transaction after restart');
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rep values(generate_series(101, 200));");
+
+$result=$node_publisher->safe_psql('postgres', q[select count(*) = 0 from
+	(select pg_ls_dir('pg_replslot/tap_sub')) as s
+	where s.pg_ls_dir like '%xid-] . $running_xid . q[%']);
+is($result, 't', 'no spill files left over from incomplete transaction after new inserts');
+
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'TEST1'");
+
+$node_publisher->poll_query_until('postgres', q[select count(*) = 0 from
+    (select pg_ls_dir('pg_replslot/tap_sub')) as s
+    where s.pg_ls_dir like '%xid-] . $running_xid . q[%'])
+  or die "Timed out while waiting for spill files to disappear";
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+# check that the subscription is removed
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'subscription is dropped');
+
+# remove the conflicting data
+$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;");
+$node_publisher->safe_psql('postgres', "DROP TABLE tab_rep");
+$node_subscriber->safe_psql('postgres', "DROP TABLE tab_rep");
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
+
+done_testing();
-- 
2.46.0

0002-cleanup-incomplete-transactions-on-startup-in-Reorde.patch (application/octet-stream)
From 159a9f338412a6ecae675597f55b564b74afb134 Mon Sep 17 00:00:00 2001
From: Vaijayanti Bharadwaj <vaijayanti.bharadwaj@enterprisedb.com>
Date: Tue, 3 Dec 2024 18:36:23 +0530
Subject: [PATCH 2/2] cleanup incomplete transactions on startup in
 ReorderBuffer

---
 src/backend/replication/logical/decode.c      | 18 +++++-
 .../replication/logical/reorderbuffer.c       | 61 +++++++++++++++++++
 src/include/replication/reorderbuffer.h       |  1 +
 3 files changed, 79 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index e73576ad12..eb95ced41d 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -136,10 +136,26 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	switch (info)
 	{
-			/* this is also used in END_OF_RECOVERY checkpoints */
+			/*
+			 * This is also used in END_OF_RECOVERY checkpoints.
+			 * At this point, any transactions in the ReorderBuffer that
+			 * are incomplete (they have no commit, abort or prepare
+			 * record) must be aborted.
+			 * They exist due to a prior unclean termination, which did not allow
+			 * logging an abort.
+			 * If not aborted, these transactions may be held by the ReorderBuffer
+			 * for long periods of time, especially if an earlier prepared
+			 * transaction is not committed or aborted and holds back the
+			 * oldestRunningXid.
+			 */
 		case XLOG_CHECKPOINT_SHUTDOWN:
 		case XLOG_END_OF_RECOVERY:
 			SnapBuildSerializationPoint(builder, buf->origptr);
+			elog(DEBUG2,
+				"decoding: recovery/shutdown checkpoint at lsn=%X/%X",
+				(uint32)(buf->origptr >> 32),
+				(uint32)(buf->origptr));
+			ReorderBufferAbortOlderThanLSN(ctx->reorder, buf->origptr);
 
 			break;
 		case XLOG_CHECKPOINT_ONLINE:
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index e3a5c7b660..ed123f4169 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2965,6 +2965,67 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 	ReorderBufferCleanupTXN(rb, txn);
 }
 
+/*
+ * Abort all transactions that aren't actually running anymore because the
+ * server restarted.
+ * This is needed because a prepared transaction may be holding back the
+ * oldestRunningXid. It is a version of ReorderBufferAbortOld that aborts
+ * all older transactions that are not in the prepared state.
+ */
+void
+ReorderBufferAbortOlderThanLSN(ReorderBuffer *rb, XLogRecPtr cutoff)
+{
+	dlist_mutable_iter it;
+
+	/*
+	 * Iterate through all toplevel TXNs, ordered by first_lsn, and clean up
+	 * every one that is not prepared.  The cutoff is the LSN of a shutdown
+	 * checkpoint or end-of-recovery record, so any unprepared transaction
+	 * that began before it cannot still be running.  Unlike
+	 * ReorderBufferAbortOld, the loop does not stop at the first survivor.
+	 */
+	dlist_foreach_modify(it, &rb->toplevel_by_lsn)
+	{
+		ReorderBufferTXN *txn;
+
+		txn = dlist_container(ReorderBufferTXN, node, it.cur);
+
+		if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
+		{
+			elog(DEBUG2, "txn: %u is prepared and cannot be cleaned up", txn->xid);
+		}
+		else if (txn->first_lsn < cutoff)
+		{
+			/*
+			 * We set final_lsn on a transaction when we decode its commit or
+			 * abort record, but we never see those records for crashed
+			 * transactions.  To ensure cleanup of these transactions, set
+			 * final_lsn to that of their last change; this causes
+			 * ReorderBufferRestoreCleanup to do the right thing.
+			 */
+			if (rbtxn_is_serialized(txn) && txn->final_lsn == InvalidXLogRecPtr)
+			{
+				ReorderBufferChange *last =
+				dlist_tail_element(ReorderBufferChange, node, &txn->changes);
+
+				txn->final_lsn = last->lsn;
+			}
+
+			elog(DEBUG2, "aborting an incomplete transaction %u", txn->xid);
+
+			/* remove potential on-disk data, and deallocate this tx */
+			ReorderBufferCleanupTXN(rb, txn);
+		}
+		else
+		{
+			/* Should not reach here */
+			elog(FATAL,
+				"ReorderBufferAbortOlderThanLSN: found transaction in the future: %u",
+				txn->xid);
+		}
+	}
+}
+
 /*
  * Abort all transactions that aren't actually running anymore because the
  * server restarted.
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 3bc365a7b0..4762ef14f7 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -698,6 +698,7 @@ extern void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
 extern void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 							   TimestampTz abort_time);
 extern void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid);
+extern void ReorderBufferAbortOlderThanLSN(ReorderBuffer *rb, XLogRecPtr cutoff);
 extern void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
 extern void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
 
-- 
2.46.0

#2Ashutosh Bapat
ashutosh.bapat.oss@gmail.com
In reply to: Vaijayanti Bharadwaj (#1)
Re: logical replication: patch to ensure timely cleanup of aborted transactions in ReorderBuffer

Hi Vaijayanti,

On Fri, Dec 6, 2024 at 2:50 PM Vaijayanti Bharadwaj
<vaijayanti.bharadwaj@enterprisedb.com> wrote:

> Hello,
>
> This patch is a proposed fix for an issue that can occur in logical replication where:
> 1. streaming of transactions is disabled.
> 2. First, say there is a transaction that's PREPAREd. This prepared transaction is not resolved for some time. It holds back the oldestRunningXid.
> 3. Large transactions with xid higher than that of the prepared transaction spill to disk as they are decoded in the ReorderBuffer.
> 4. There is an unclean shutdown due to which the large transactions get aborted. But there is no explicit abort record.
> 5. On startup, the ReorderBuffer continues to decode and hold the aborted transactions and they continue to accumulate as spill files.
> 6. Once the prepared transaction is resolved, these transactions will be cleaned up, but if there is a delay, the aborted transactions could hold up considerable space.
> (A customer had run into this issue at EDB)
>
> 0001: is a TAP test that reproduces this and tests for this. It fails without this and passes with it.
> 0002: is the patch that fixes this issue.

A similar problem was discussed at [1]. That thread seems to have
halted without a commit. Can you please check if the solution there
works for you? If so, please review the patch and help move the thread ahead.

[1]: /messages/by-id/CAD21AoDht9Pz_DFv_R2LqBTBbO4eGrpa9Vojmt5z5sEx3XwD7A@mail.gmail.com

--
Best Wishes,
Ashutosh Bapat