Reduce useless changes before reassembly during logical replication
Hi hackers,
During logical replication, if there is a large write transaction, spill
files will be written to disk, depending on the setting of
logical_decoding_work_mem.
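For reference, the spilling is visible from SQL as well; a minimal sketch,
assuming PostgreSQL 14 or later, where pg_stat_replication_slots exposes the
spill counters:
```
SHOW logical_decoding_work_mem;
SELECT slot_name, spill_txns, spill_count, spill_bytes
FROM pg_stat_replication_slots;
```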
This behavior effectively avoids OOM, but if the transaction generates many
changes before committing, a large number of spill files may fill the disk;
updating a TB-scale table, for example, will do this. Of course, that is
inevitable.
But I found an inelegant phenomenon: if the large table being updated is not
published, its changes are still written out as a large number of spill files.
Look at the example below:
publisher:
```
create table tbl_pub(id int, val1 text, val2 text,val3 text);
create table tbl_t1(id int, val1 text, val2 text,val3 text);
CREATE PUBLICATION mypub FOR TABLE public.tbl_pub;
```
subscriber:
```
create table tbl_pub(id int, val1 text, val2 text,val3 text);
create table tbl_t1(id int, val1 text, val2 text,val3 text);
CREATE SUBSCRIPTION mysub CONNECTION 'host=127.0.0.1 port=5432
user=postgres dbname=postgres' PUBLICATION mypub;
```
publisher:
```
begin;
insert into tbl_t1 select i,repeat('xyzzy', i),repeat('abcba',
i),repeat('dfds', i) from generate_series(0,999999) i;
```
You will then see a large number of spill files in the
"$PGDATA/pg_replslot/mysub/" directory.
```
$ ll -sh
total 4.5G
4.0K -rw------- 1 postgres postgres 200 Nov 30 09:24 state
17M -rw------- 1 postgres postgres 17M Nov 30 08:22 xid-750-lsn-0-10000000.spill
12M -rw------- 1 postgres postgres 12M Nov 30 08:20 xid-750-lsn-0-1000000.spill
17M -rw------- 1 postgres postgres 17M Nov 30 08:23 xid-750-lsn-0-11000000.spill
......
```
We can see that table tbl_t1 is not published in mypub, so its changes won't
be sent downstream either.
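You can confirm which tables a publication covers with a standard catalog
view (just a sanity check, not part of the patch):
```
SELECT * FROM pg_publication_tables WHERE pubname = 'mypub';
```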
Yet those changes are filtered out only after the transaction is reassembled,
when the pgoutput decoding plugin skips changes to unpublished relations while
sending logical changes; see function pgoutput_change.
Most importantly, if we filter out changes to unpublished relations after
constructing them but before queuing them into the transaction, will that
reduce the workload of logical decoding and avoid disk or memory growth as
much as possible?
Attached is the patch I used to implement this optimization.
Design:
1. Added a callback LogicalDecodeFilterByRelCB for the output plugin.
2. Implemented this callback for the pgoutput plugin as pgoutput_table_filter,
based on the table filter in the pgoutput_change function. It determines
whether the change needs to be published based on the parameters of the
publication, and if not, filters it out.
3. After constructing a change and before queuing it into a transaction,
use RelidByRelfilenumber to obtain the relation associated with the change,
just like obtaining the relation in the ReorderBufferProcessTXN function.
4. The relation may be a TOAST table, and there is no good way to get the
main table's relation from the TOAST relation directly. Here, I get the main
table's OID from the TOAST relation's name, and then open the main table's
relation (see the query after this list for the naming scheme).
5. This filtering covers INSERT/UPDATE/DELETE. Other change types have not
been considered yet and can be added in the future.
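For reference on point 4: TOAST tables are named pg_toast_<OID of the main
table>, which is what the patch relies on. A quick way to see the mapping with
standard catalogs (not part of the patch):
```
SELECT oid AS main_oid, reltoastrelid::regclass AS toast_rel
FROM pg_class
WHERE relname = 'tbl_t1';
```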
Test:
1. Added a test case 034_table_filter.pl
2. Like the case above, create two tables, the published table tbl_pub and
the non-published table tbl_t1
3. Insert 10,000 rows of TOAST data into tbl_t1 on the publisher, and use
pg_ls_replslotdir to record the total size of the slot directory every second.
4. Compare the size of the slot directory at the beginning of the transaction
(size1), the size at the end of the transaction (size2), and the average size
over the entire run (size3).
5. Assert(size1==size2==size3)
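The slot directory size in each step is measured with pg_ls_replslotdir, the
same call the test script uses:
```
SELECT sum(size) FROM pg_ls_replslotdir('mysub');
```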
I sincerely look forward to your feedback.
Regards, lijie
Attachments:
v1-Reduce-useless-changes-before-reassemble-during-logi.patch (application/octet-stream)
From 00975d764175454fcb78e4ec56fbd7069ab3bcbd Mon Sep 17 00:00:00 2001
From: "adger.lj" <adger.lj@alibaba-inc.com>
Date: Thu, 7 Dec 2023 15:51:12 +0800
Subject: [PATCH] Reduce useless changes before reassemble during logical
replication
In order to reduce unnecessary logical replication work, changes to irrelevant
relations can be filtered out before the transaction fragments are reassembled.
This can effectively reduce useless changes and prevent storage space from
being filled up with irrelevant data.
By design, a callback LogicalDecodeFilterByRelCB is added for the output plugin,
and pgoutput implements it as pgoutput_table_filter. RelationSyncCache is reused
to determine whether a change to a given relation should be filtered out.
Following the implementation of pgoutput_change, currently only
insert/update/delete can be filtered; other types of changes are not considered
yet. Perhaps with more detailed analysis, more change types can be filtered.
Most of the code in the FilterByTable function is transplanted from
ReorderBufferProcessTXN, and it can be called before ReorderBufferQueueChange.
It is also the encapsulation of the callback filter_by_table_cb in logical.c.
In general, this patch concentrates the change filtering done in
ReorderBufferProcessTXN and pgoutput_change into the FilterByTable function,
and calls it before ReorderBufferQueueChange.
---
src/backend/replication/logical/decode.c | 153 +++++++++++++++++++-
src/backend/replication/logical/logical.c | 31 ++++
src/backend/replication/pgoutput/pgoutput.c | 89 +++++++-----
src/include/replication/logical.h | 2 +
src/include/replication/output_plugin.h | 7 +
src/test/subscription/t/034_table_filter.pl | 91 ++++++++++++
6 files changed, 335 insertions(+), 38 deletions(-)
create mode 100644 src/test/subscription/t/034_table_filter.pl
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 1237118e84..615b059890 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -35,6 +35,7 @@
#include "access/xlogrecord.h"
#include "access/xlogutils.h"
#include "catalog/pg_control.h"
+#include "common/string.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/message.h"
@@ -42,6 +43,7 @@
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "storage/standby.h"
+#include "utils/relfilenumbermap.h"
/* individual record(group)'s handlers */
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -155,7 +157,7 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
case XLOG_PARAMETER_CHANGE:
{
xl_parameter_change *xlrec =
- (xl_parameter_change *) XLogRecGetData(buf->record);
+ (xl_parameter_change *) XLogRecGetData(buf->record);
/*
* If wal_level on the primary is reduced to less than
@@ -581,6 +583,134 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
return filter_by_origin_cb_wrapper(ctx, origin_id);
}
+static bool
+FilterByTable(LogicalDecodingContext *ctx, ReorderBufferChange *change)
+{
+ ReorderBuffer *rb = ctx->reorder;
+ Relation relation = NULL;
+ Oid reloid;
+ bool result = false;
+ bool using_subtxn;
+
+ if (ctx->callbacks.filter_by_table_cb == NULL)
+ return false;
+
+ switch (change->action)
+ {
+ /* intentionally fall through */
+ case REORDER_BUFFER_CHANGE_INSERT:
+ case REORDER_BUFFER_CHANGE_UPDATE:
+ case REORDER_BUFFER_CHANGE_DELETE:
+ break;
+ default:
+ return false;
+ }
+
+ /*
+ * Decoding needs access to syscaches et al., which in turn use
+ * heavyweight locks and such. Thus we need to have enough state around to
+ * keep track of those. The easiest way is to simply use a transaction
+ * internally. That also allows us to easily enforce that nothing writes
+ * to the database by checking for xid assignments.
+ *
+ * When we're called via the SQL SRF there's already a transaction
+ * started, so start an explicit subtransaction there.
+ */
+ using_subtxn = IsTransactionOrTransactionBlock();
+
+ if (using_subtxn)
+ BeginInternalSubTransaction("filter change by table");
+ else
+ StartTransactionCommand();
+
+ reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
+ change->data.tp.rlocator.relNumber);
+ if (reloid == InvalidOid)
+ {
+ result = true;
+ goto filter_done;
+ }
+
+ relation = RelationIdGetRelation(reloid);
+
+ if (!RelationIsValid(relation))
+ elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
+ reloid,
+ relpathperm(change->data.tp.rlocator,
+ MAIN_FORKNUM));
+
+ if (!RelationIsLogicallyLogged(relation))
+ {
+ result = true;
+ goto filter_done;
+ }
+
+ /*
+ * Ignore temporary heaps created during DDL unless the plugin has asked
+ * for them.
+ */
+ if (relation->rd_rel->relrewrite && !rb->output_rewrites)
+ {
+ result = true;
+ goto filter_done;
+ }
+
+ /*
+ * For now ignore sequence changes entirely. Most of the time they don't
+ * log changes using records we understand, so it doesn't make sense to
+ * handle the few cases we do.
+ */
+ if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
+ {
+ result = true;
+ goto filter_done;
+ }
+
+ if (IsToastRelation(relation))
+ {
+ Oid real_reloid = InvalidOid;
+
+ /* pg_toast_ len is 9 */
+ char *toast_name = RelationGetRelationName(relation);
+ char *start_ch = &toast_name[9];
+
+ real_reloid = strtoint(start_ch, NULL, 10);
+
+ if (real_reloid == InvalidOid)
+ elog(ERROR, "cannot get the real table oid for toast table %s, error: %m", toast_name);
+
+ RelationClose(relation);
+
+ relation = RelationIdGetRelation(real_reloid);
+
+ if (!RelationIsValid(relation))
+ elog(ERROR, "could not open real relation with OID %u (for toast table filenumber \"%s\")",
+ real_reloid,
+ relpathperm(change->data.tp.rlocator,
+ MAIN_FORKNUM));
+ }
+
+ result = filter_by_table_cb_wrapper(ctx, relation, change);
+
+filter_done:
+
+ if (result && RelationIsValid(relation))
+ elog(DEBUG1, "logical filter change by table %s", RelationGetRelationName(relation));
+
+ if (RelationIsValid(relation))
+ RelationClose(relation);
+
+ AbortCurrentTransaction();
+
+ if (using_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+
+ if (result)
+ ReorderBufferReturnChange(rb, change, false);
+
+ return result;
+}
+
/*
* Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
*/
@@ -940,6 +1070,9 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.clear_toast_afterwards = true;
+ if (FilterByTable(ctx, change))
+ return;
+
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
change,
xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
@@ -1009,6 +1142,9 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.clear_toast_afterwards = true;
+ if (FilterByTable(ctx, change))
+ return;
+
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
change, false);
}
@@ -1065,6 +1201,9 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.clear_toast_afterwards = true;
+ if (FilterByTable(ctx, change))
+ return;
+
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
change, false);
}
@@ -1201,11 +1340,14 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
else
change->data.tp.clear_toast_afterwards = false;
- ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
- buf->origptr, change, false);
-
/* move to the next xl_multi_insert_tuple entry */
data += datalen;
+
+ if (FilterByTable(ctx, change))
+ continue;
+
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
+ buf->origptr, change, false);
}
Assert(data == tupledata + tuplelen);
}
@@ -1240,6 +1382,9 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.clear_toast_afterwards = true;
+ if (FilterByTable(ctx, change))
+ return;
+
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
change, false);
}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 8288da5277..f5da5199dc 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1225,6 +1225,37 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
return ret;
}
+bool
+filter_by_table_cb_wrapper(LogicalDecodingContext *ctx, Relation relation, ReorderBufferChange *change)
+{
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+ bool ret;
+
+ Assert(!ctx->fast_forward);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "filter_by_table";
+ state.report_location = InvalidXLogRecPtr;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = false;
+ ctx->end_xact = false;
+
+ /* do the actual work: call callback */
+ ret = ctx->callbacks.filter_by_table_cb(ctx, relation, change);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+
+ return ret;
+}
+
static void
message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index f9ed1083df..a02cab94d3 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -57,6 +57,9 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
Size sz, const char *message);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
+static bool pgoutput_table_filter(struct LogicalDecodingContext *ctx,
+ Relation relation,
+ ReorderBufferChange *change);
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
@@ -259,6 +262,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
+ cb->filter_by_table_cb = pgoutput_table_filter;
cb->shutdown_cb = pgoutput_shutdown;
/* transaction streaming */
@@ -1410,9 +1414,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TupleTableSlot *old_slot = NULL;
TupleTableSlot *new_slot = NULL;
- if (!is_publishable_relation(relation))
- return;
-
/*
* Remember the xid for the change in streaming mode. We need to send xid
* with each change in the streaming mode so that subscriber can make
@@ -1423,37 +1424,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
xid = change->txn->xid;
relentry = get_rel_sync_entry(data, relation);
-
- /* First check the table filter */
- switch (action)
- {
- case REORDER_BUFFER_CHANGE_INSERT:
- if (!relentry->pubactions.pubinsert)
- return;
- break;
- case REORDER_BUFFER_CHANGE_UPDATE:
- if (!relentry->pubactions.pubupdate)
- return;
- break;
- case REORDER_BUFFER_CHANGE_DELETE:
- if (!relentry->pubactions.pubdelete)
- return;
-
- /*
- * This is only possible if deletes are allowed even when replica
- * identity is not defined for a table. Since the DELETE action
- * can't be published, we simply return.
- */
- if (!change->data.tp.oldtuple)
- {
- elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
- return;
- }
- break;
- default:
- Assert(false);
- }
-
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
@@ -1679,6 +1649,57 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
return false;
}
+/*
+ * Return true if the relation has not been published, false otherwise.
+ */
+static bool
+pgoutput_table_filter(struct LogicalDecodingContext *ctx,
+ Relation relation,
+ ReorderBufferChange *change)
+{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ RelationSyncEntry *relentry;
+ ReorderBufferChangeType action = change->action;
+
+ if (!is_publishable_relation(relation))
+ return true;
+
+ relentry = get_rel_sync_entry(data, relation);
+
+ /* First check the table filter */
+ switch (action)
+ {
+ case REORDER_BUFFER_CHANGE_INSERT:
+ if (!relentry->pubactions.pubinsert)
+ return true;
+ break;
+ case REORDER_BUFFER_CHANGE_UPDATE:
+ if (!relentry->pubactions.pubupdate)
+ return true;
+ break;
+ case REORDER_BUFFER_CHANGE_DELETE:
+ if (!relentry->pubactions.pubdelete)
+ return true;
+
+ /*
+ * This is only possible if deletes are allowed even when replica
+ * identity is not defined for a table. Since the DELETE action
+ * can't be published, we simply return.
+ */
+ if (!change->data.tp.oldtuple)
+ {
+ elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
+ return true;
+ }
+ break;
+ default:
+ Assert(false);
+ }
+
+ return false;
+}
+
+
/*
* Shutdown the output plugin.
*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index dffc0d1564..d89467c744 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -145,6 +145,8 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
TransactionId xid, const char *gid);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
+extern bool filter_by_table_cb_wrapper(LogicalDecodingContext *ctx, Relation relation,
+ ReorderBufferChange *change);
extern void ResetLogicalStreamingState(void);
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2ffcf17505..84c3bcb200 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -96,6 +96,12 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
RepOriginId origin_id);
+/*
+ * Filter changes by table.
+ */
+typedef bool (*LogicalDecodeFilterByRelCB) (struct LogicalDecodingContext *ctx,
+ Relation relation, ReorderBufferChange *change);
+
/*
* Called to shutdown an output plugin.
*/
@@ -222,6 +228,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+ LogicalDecodeFilterByRelCB filter_by_table_cb;
LogicalDecodeShutdownCB shutdown_cb;
/* streaming of changes at prepare time */
diff --git a/src/test/subscription/t/034_table_filter.pl b/src/test/subscription/t/034_table_filter.pl
new file mode 100644
index 0000000000..8d4f962adc
--- /dev/null
+++ b/src/test/subscription/t/034_table_filter.pl
@@ -0,0 +1,91 @@
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+# Basic logical replication test
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "create table tbl_pub(id int, val1 text, val2 text,size int);");
+$node_publisher->safe_psql('postgres',
+ "create table tbl_t1(id int, val1 text, val2 text,size int);");
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION mypub FOR TABLE public.tbl_pub;");
+$node_publisher->safe_psql('postgres',
+qq(
+CREATE OR REPLACE FUNCTION check_replication_status() RETURNS VOID AS \$\$
+DECLARE
+ replication_record pg_stat_replication;
+BEGIN
+ LOOP
+ SELECT *
+ INTO replication_record
+ FROM pg_stat_replication
+ WHERE application_name = 'mysub';
+
+ IF replication_record.replay_lsn = replication_record.write_lsn THEN
+ EXIT;
+ END IF;
+
+ PERFORM pg_sleep(1);
+ END LOOP;
+END;
+\$\$ LANGUAGE plpgsql;));
+
+# Create some preexisting content on subscriber
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_subscriber->safe_psql('postgres',
+ "create table tbl_pub(id int, val1 text, val2 text,size text);");
+$node_subscriber->safe_psql('postgres',
+ "create table tbl_t1(id int, val1 text, val2 text,size text);");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub');
+
+# test filter
+$node_publisher->safe_psql('postgres',
+qq(BEGIN;
+insert into tbl_t1 select 1, 'xyzzy', 'abcba', sum(size) from pg_ls_replslotdir('mysub');
+insert into tbl_t1 select i,repeat('xyzzy', i),repeat('abcba',i),(select sum(size) from pg_ls_replslotdir('mysub')) from generate_series(2,9999) i;
+update tbl_t1 set val2 = repeat('xyzzy',id) where id > 1 and id < 10001;
+select check_replication_status();
+insert into tbl_t1 select 10001, 'xyzzy', 'abcba', sum(size) from pg_ls_replslotdir('mysub');
+COMMIT;)
+);
+
+my $minsize = $node_publisher->safe_psql('postgres',
+ "select size from tbl_t1 order by size asc limit 1;");
+my $maxsize = $node_publisher->safe_psql('postgres',
+ "select size from tbl_t1 order by size desc limit 1;");
+is($minsize, $maxsize, 'check decode filter table between maxsize and minsize');
+
+
+my $firstrow = $node_publisher->safe_psql('postgres',
+ "select size from tbl_t1 where id = 1;");
+my $lastrow = $node_publisher->safe_psql('postgres',
+ "select size from tbl_t1 where id = 10001;");
+is($firstrow, $lastrow, 'check decode filter table between firstrow and lastrow');
+
+is($minsize, $lastrow, 'check decode filter table between minsize and lastrow');
+
+print "minsize: " . $minsize . " maxsize: " . $maxsize . " firstrow: " . $firstrow . " lastrow: " . $lastrow . "\n";
+
+done_testing();
\ No newline at end of file
--
2.39.3
Hi Jie,
> Most importantly, if we filter out changes to unpublished relations after
> constructing them but before queuing them into the transaction, will that
> reduce the workload of logical decoding and avoid disk or memory growth as
> much as possible?
Thanks for the report!
Discarding the unused changes as soon as possible looks like a valid
optimization to me, but I'd like more experienced people to double-check it.
> Attached is the patch I used to implement this optimization.
After a quick look at the patch, I found FilterByTable is too expensive
because of the StartTransaction and AbortTransaction. With your setup above,
I ran the test below:
insert into tbl_t1 select i,repeat('xyzzy', i),repeat('abcba',
i),repeat('dfds', i) from generate_series(0,999100) i;
Profiling the wal sender of mypub with perf for 30 seconds, I get:
-   22.04%     1.53%  postgres  postgres  [.] FilterByTable
   - 20.51% FilterByTable
        AbortTransaction
        ResourceOwnerReleaseInternal
        LockReleaseAll
        hash_seq_search
The main part comes from AbortTransaction, and the 20% is not trivial.
From your patch:
+
+ /*
+ * Decoding needs access to syscaches et al., which in turn use
+ * heavyweight locks and such. Thus we need to have enough state around to
+ * keep track of those. The easiest way is to simply use a transaction
+ * internally.
+ ....
+ using_subtxn = IsTransactionOrTransactionBlock();
+
+ if (using_subtxn)
+ BeginInternalSubTransaction("filter change by table");
+ else
+ StartTransactionCommand();
Actually FilterByTable here is simpler than "decoding": we access the
syscache only when get_rel_sync_entry finds an entry whose replicate_valid
is false, and the invalid case should be rare.
What I'm thinking now is that we allow get_rel_sync_entry to build its own
transaction state *only when it finds an invalid entry*. If the caller has
built one already, like the existing cases in master, nothing happens except
a simple transaction-state check. Then in the FilterByTable case we just
leave it to get_rel_sync_entry. See the attachment for the idea.
--
Best Regards
Andy Fan
Attachments:
v1-0001-Make-get_rel_sync_entry-less-depending-on-transac.patch (text/x-diff)
From 90b8df330df049bd1d7e881dc6e9b108c17b0924 Mon Sep 17 00:00:00 2001
From: "yizhi.fzh" <yizhi.fzh@alibaba-inc.com>
Date: Wed, 21 Feb 2024 18:40:03 +0800
Subject: [PATCH v1 1/1] Make get_rel_sync_entry less depending on transaction
state.
get_rel_sync_entry needs a transaction only when a replicate_valid = false
entry is found, which should be a rare case. However, the caller can't know
whether an entry is valid, so it has to prepare the transaction state before
calling this function. Such preparation is expensive.
This patch lets get_rel_sync_entry manage a transaction state itself when
necessary, so the callers don't need to prepare one blindly.
---
src/backend/replication/pgoutput/pgoutput.c | 60 ++++++++++++++++++++-
1 file changed, 59 insertions(+), 1 deletion(-)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 998f92d671..25e55590a2 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -13,6 +13,7 @@
#include "postgres.h"
#include "access/tupconvert.h"
+#include "access/relation.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
@@ -214,6 +215,11 @@ static void init_rel_sync_cache(MemoryContext cachectx);
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
Relation relation);
+static RelationSyncEntry *get_rel_sync_entry_by_relid(PGOutputData *data,
+ Oid relid);
+static RelationSyncEntry *get_rel_sync_entry_internal(PGOutputData *data,
+ Relation relation,
+ Oid relid);
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
uint32 hashvalue);
@@ -1962,11 +1968,29 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
*/
static RelationSyncEntry *
get_rel_sync_entry(PGOutputData *data, Relation relation)
+{
+ return get_rel_sync_entry_internal(data, relation, InvalidOid);
+}
+
+static RelationSyncEntry *
+__attribute__ ((unused))
+get_rel_sync_entry_by_relid(PGOutputData *data, Oid relid)
+{
+ return get_rel_sync_entry_internal(data, NULL, relid);
+}
+
+static RelationSyncEntry *
+get_rel_sync_entry_internal(PGOutputData *data, Relation relation, Oid oid)
{
RelationSyncEntry *entry;
bool found;
MemoryContext oldctx;
- Oid relid = RelationGetRelid(relation);
+ Oid relid = OidIsValid(oid) ? oid: RelationGetRelid(relation);
+ bool started_xact = false, using_subtxn;
+
+ /* either oid or relation is provided. */
+ Assert(OidIsValid(oid) || RelationIsValid(relation));
+ Assert(!(OidIsValid(oid) && RelationIsValid(relation)));
Assert(RelationSyncCache != NULL);
@@ -1993,6 +2017,23 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->attrmap = NULL;
}
+ if (!entry->replicate_valid && !IsTransactionOrTransactionBlock())
+ {
+ /*
+ * Validating the entry needs to access syscache, which must
+ * be in a transaction state, if that's not ready, start one.
+ */
+
+ using_subtxn = IsTransactionOrTransactionBlock();
+
+ if (using_subtxn)
+ BeginInternalSubTransaction(__func__);
+ else
+ StartTransactionCommand();
+
+ started_xact = true;
+ }
+
/* Validate the entry */
if (!entry->replicate_valid)
{
@@ -2198,9 +2239,19 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
entry->pubactions.pubdelete)
{
+ bool rel_opened = false;
+
+ if (!RelationIsValid(relation))
+ {
+ relation = relation_open(oid, AccessShareLock);
+ rel_opened = true;
+ }
/* Initialize the tuple slot and map */
init_tuple_slot(data, relation, entry);
+ if (rel_opened)
+ relation_close(relation, AccessShareLock);
+
/* Initialize the row filter */
pgoutput_row_filter_init(data, rel_publications, entry);
@@ -2215,6 +2266,13 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->replicate_valid = true;
}
+ if (started_xact)
+ {
+ AbortCurrentTransaction();
+ if (using_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+ }
+
return entry;
}
--
2.34.1