Reduce useless changes before reassembly during logical replication
Hi hackers,
During logical replication, if there is a large write transaction, some
spill files will be written to disk, depending on the setting of
logical_decoding_work_mem.
This behavior effectively avoids OOM, but if the transaction
generates a lot of changes before commit, a large number of spill files may
fill the disk. Updating a TB-level table is one example.
However, I noticed something inelegant: even if the modified large table is not
published, its changes are still written out as a large number of spill files.
Consider the example below:
publisher:
```
create table tbl_pub(id int, val1 text, val2 text,val3 text);
create table tbl_t1(id int, val1 text, val2 text,val3 text);
CREATE PUBLICATION mypub FOR TABLE public.tbl_pub;
```
subscriber:
```
create table tbl_pub(id int, val1 text, val2 text,val3 text);
create table tbl_t1(id int, val1 text, val2 text,val3 text);
CREATE SUBSCRIPTION mysub CONNECTION 'host=127.0.0.1 port=5432 user=postgres dbname=postgres' PUBLICATION mypub;
```
publisher:
```
begin;
insert into tbl_t1 select i,repeat('xyzzy', i),repeat('abcba',
i),repeat('dfds', i) from generate_series(0,999999) i;
```
Later you will see a large number of spill files in the
"/$PGDATA/pg_replslot/mysub/" directory.
```
$ ll -sh
total 4.5G
4.0K -rw------- 1 postgres postgres 200 Nov 30 09:24 state
17M -rw------- 1 postgres postgres 17M Nov 30 08:22 xid-750-lsn-0-10000000.spill
12M -rw------- 1 postgres postgres 12M Nov 30 08:20 xid-750-lsn-0-1000000.spill
17M -rw------- 1 postgres postgres 17M Nov 30 08:23 xid-750-lsn-0-11000000.spill
......
```
We can see that table tbl_t1 is not published in mypub. Its changes also won't be sent
downstream because the table is not subscribed.
Only after the transaction has been reassembled does the pgoutput decoding plugin filter out
changes to these unpublished relations, at the point where it sends logical changes.
See function pgoutput_change.
So the key question is: if we filter out changes to unpublished relations
after constructing each change but before queuing it
into a transaction, will that reduce the workload of logical decoding
and avoid disk or memory growth as much as possible?
The patch in the attachment is a prototype, which can effectively reduce the
memory and disk space usage during logical replication.
Design:
1. Added a callback LogicalDecodeFilterByRelCB for the output plugin.
2. Added an implementation of this callback, pgoutput_table_filter, for the pgoutput plugin.
Its implementation is based on the table filter in the
pgoutput_change function.
Its job is to determine, based on the parameters of the publication, whether
the change needs to be published, and to filter it out if not.
3. After constructing a change and before queuing it into a transaction,
use RelidByRelfilenumber to obtain the relation associated with the change,
just like obtaining the relation in the ReorderBufferProcessTXN function.
4. The relation may be a toast table, and there is no good way to get its real
table relation from the toast relation. Here, I get the real table oid
from the toast relname, and then open the real table relation.
5. This filtering covers INSERT/UPDATE/DELETE. Other
changes have not been considered yet and can be added in the future.
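To illustrate step 4, here is a minimal standalone sketch of recovering the owning table's OID from a toast relation name. It assumes the naming convention the patch relies on (the prefix pg_toast_ followed by the owning table's OID). The helper name toast_owner_oid_from_name is hypothetical and not part of the patch, which calls strtoint on the name suffix directly. Unlike that call, the sketch also rejects trailing characters and out-of-range values.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#define TOAST_PREFIX "pg_toast_"

/*
 * Hypothetical helper (not in the patch): parse the owning table's OID out
 * of a toast relation name of the assumed form "pg_toast_<oid>".
 * Returns true and stores the OID on success, false otherwise.
 */
static bool
toast_owner_oid_from_name(const char *relname, unsigned int *owner_oid)
{
    const char *digits;
    char       *end;
    unsigned long val;
    size_t      prefix_len = strlen(TOAST_PREFIX);

    assert(relname != NULL && owner_oid != NULL);

    /* The name must start with the toast prefix. */
    if (strncmp(relname, TOAST_PREFIX, prefix_len) != 0)
        return false;

    /* Require a digit right after the prefix (no sign, space or empty suffix). */
    digits = relname + prefix_len;
    if (*digits < '0' || *digits > '9')
        return false;

    errno = 0;
    val = strtoul(digits, &end, 10);

    /* Reject overflow, trailing characters, zero (InvalidOid) and >32-bit values. */
    if (errno != 0 || *end != '\0' || val == 0 || val > 0xFFFFFFFFUL)
        return false;

    *owner_oid = (unsigned int) val;
    return true;
}
```

With this helper, "pg_toast_16384" yields 16384, while "pg_toast_16384_index" and names without the prefix are rejected.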
Test:
1. Added a test case 034_table_filter.pl
2. Like the case above, create two tables, the published table tbl_pub and
the non-published table tbl_t1
3. Insert 10,000 rows of toast data into tbl_t1 on the publisher, and use
pg_ls_replslotdir to record the total size of the slot directory
every second.
4. Compare the size of the slot directory at the beginning of the
transaction (size1), the size at the end of the transaction (size2), and the average
size over the entire process (size3).
5. Assert that size1, size2 and size3 are all equal.
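The comparison in step 5 amounts to checking that every recorded size sample is the same. Below is a minimal sketch in C, where all_sizes_equal is a hypothetical helper and not part of the TAP test. A chained size1==size2==size3 in C would compare the result of the first comparison with size3, so the sketch compares each sample against the first one instead.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical helper: return true if all slot-directory size samples are
 * identical, i.e. the directory never grew while the transaction ran.
 * An empty or single-element sample set counts as "all equal".
 */
static bool
all_sizes_equal(const long long *sizes, size_t n)
{
    assert(sizes != NULL || n == 0);

    for (size_t i = 1; i < n; i++)
    {
        if (sizes[i] != sizes[0])
            return false;
    }
    return true;
}
```

If all samples are equal, then the first, last, minimum, maximum and average sizes coincide as well, which is what the test asserts through its min/max queries.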
I sincerely look forward to your feedback.
Regards, lijie
Attachments:
v1-Reduce-useless-changes-before-reassembly-during-logi.patch (application/octet-stream)
From 60ecc56984f0c2105a9eee24f2f73d05ee6ff1f3 Mon Sep 17 00:00:00 2001
From: "adger.lj" <adger.lj@alibaba-inc.com>
Date: Thu, 7 Dec 2023 15:51:12 +0800
Subject: [PATCH] Reduce useless changes before reassembly during logical
replication
In order to reduce unnecessary logical replication work, changes to irrelevant
relations can be filtered out before transaction fragments are reassembled.
This can effectively reduce useless changes and prevent storage space from
being filled up with irrelevant data.
By design, a callback LogicalDecodeFilterByRelCB is added for the output plugin.
We implement a function pgoutput_table_filter for pgoutput, and RelationSyncCache
is reused to determine whether a change to a relation should be filtered out.
Following the implementation of the function pgoutput_change, currently only
insert/update/delete changes can be filtered; other types of changes are not considered.
Perhaps a more detailed analysis would allow more kinds of changes to be filtered.
Most of the code in the FilterByTable function is transplanted from the ReorderBufferProcessTXN
function, so that it can be called before the ReorderBufferQueueChange function. It
also wraps the callback function filter_by_table_cb in logical.c.
In general, this patch concentrates the filtering of changes in the ReorderBufferProcessTXN
function and the pgoutput_change function into the FilterByTable function, and calls
it before the ReorderBufferQueueChange function.
---
src/backend/replication/logical/decode.c | 153 +++++++++++++++++++-
src/backend/replication/logical/logical.c | 31 ++++
src/backend/replication/pgoutput/pgoutput.c | 89 +++++++-----
src/include/replication/logical.h | 2 +
src/include/replication/output_plugin.h | 7 +
src/test/subscription/t/034_table_filter.pl | 91 ++++++++++++
6 files changed, 335 insertions(+), 38 deletions(-)
create mode 100644 src/test/subscription/t/034_table_filter.pl
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index b3f8f908d1..47cda524bd 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -35,6 +35,7 @@
#include "access/xlogrecord.h"
#include "access/xlogutils.h"
#include "catalog/pg_control.h"
+#include "common/string.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/message.h"
@@ -42,6 +43,7 @@
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "storage/standby.h"
+#include "utils/relfilenumbermap.h"
/* individual record(group)'s handlers */
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -155,7 +157,7 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
case XLOG_PARAMETER_CHANGE:
{
xl_parameter_change *xlrec =
- (xl_parameter_change *) XLogRecGetData(buf->record);
+ (xl_parameter_change *) XLogRecGetData(buf->record);
/*
* If wal_level on the primary is reduced to less than
@@ -581,6 +583,134 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
return filter_by_origin_cb_wrapper(ctx, origin_id);
}
+static bool
+FilterByTable(LogicalDecodingContext *ctx, ReorderBufferChange *change)
+{
+ ReorderBuffer *rb = ctx->reorder;
+ Relation relation = NULL;
+ Oid reloid;
+ bool result = false;
+ bool using_subtxn;
+
+ if (ctx->callbacks.filter_by_table_cb == NULL)
+ return false;
+
+ switch (change->action)
+ {
+ /* intentionally fall through */
+ case REORDER_BUFFER_CHANGE_INSERT:
+ case REORDER_BUFFER_CHANGE_UPDATE:
+ case REORDER_BUFFER_CHANGE_DELETE:
+ break;
+ default:
+ return false;
+ }
+
+ /*
+ * Decoding needs access to syscaches et al., which in turn use
+ * heavyweight locks and such. Thus we need to have enough state around to
+ * keep track of those. The easiest way is to simply use a transaction
+ * internally. That also allows us to easily enforce that nothing writes
+ * to the database by checking for xid assignments.
+ *
+ * When we're called via the SQL SRF there's already a transaction
+ * started, so start an explicit subtransaction there.
+ */
+ using_subtxn = IsTransactionOrTransactionBlock();
+
+ if (using_subtxn)
+ BeginInternalSubTransaction("filter change by table");
+ else
+ StartTransactionCommand();
+
+ reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
+ change->data.tp.rlocator.relNumber);
+ if (reloid == InvalidOid)
+ {
+ result = true;
+ goto filter_done;
+ }
+
+ relation = RelationIdGetRelation(reloid);
+
+ if (!RelationIsValid(relation))
+ elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
+ reloid,
+ relpathperm(change->data.tp.rlocator,
+ MAIN_FORKNUM));
+
+ if (!RelationIsLogicallyLogged(relation))
+ {
+ result = true;
+ goto filter_done;
+ }
+
+ /*
+ * Ignore temporary heaps created during DDL unless the plugin has asked
+ * for them.
+ */
+ if (relation->rd_rel->relrewrite && !rb->output_rewrites)
+ {
+ result = true;
+ goto filter_done;
+ }
+
+ /*
+ * For now ignore sequence changes entirely. Most of the time they don't
+ * log changes using records we understand, so it doesn't make sense to
+ * handle the few cases we do.
+ */
+ if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
+ {
+ result = true;
+ goto filter_done;
+ }
+
+ if (IsToastRelation(relation))
+ {
+ Oid real_reloid = InvalidOid;
+
+ /* pg_toast_ len is 9 */
+ char *toast_name = RelationGetRelationName(relation);
+ char *start_ch = &toast_name[9];
+
+ real_reloid = strtoint(start_ch, NULL, 10);
+
+ if (real_reloid == InvalidOid)
+ elog(ERROR, "cannot get the real table oid for toast table %s, error: %m", toast_name);
+
+ RelationClose(relation);
+
+ relation = RelationIdGetRelation(real_reloid);
+
+ if (!RelationIsValid(relation))
+ elog(ERROR, "could not open real relation with OID %u (for toast table filenumber \"%s\")",
+ real_reloid,
+ relpathperm(change->data.tp.rlocator,
+ MAIN_FORKNUM));
+ }
+
+ result = filter_by_table_cb_wrapper(ctx, relation, change);
+
+filter_done:
+
+ if (result && RelationIsValid(relation))
+ elog(DEBUG1, "logical filter change by table %s", RelationGetRelationName(relation));
+
+ if (RelationIsValid(relation))
+ RelationClose(relation);
+
+ AbortCurrentTransaction();
+
+ if (using_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+
+ if (result)
+ ReorderBufferReturnChange(rb, change, false);
+
+ return result;
+}
+
/*
* Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
*/
@@ -940,6 +1070,9 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.clear_toast_afterwards = true;
+ if (FilterByTable(ctx, change))
+ return;
+
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
change,
xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
@@ -1009,6 +1142,9 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.clear_toast_afterwards = true;
+ if (FilterByTable(ctx, change))
+ return;
+
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
change, false);
}
@@ -1065,6 +1201,9 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.clear_toast_afterwards = true;
+ if (FilterByTable(ctx, change))
+ return;
+
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
change, false);
}
@@ -1201,11 +1340,14 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
else
change->data.tp.clear_toast_afterwards = false;
- ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
- buf->origptr, change, false);
-
/* move to the next xl_multi_insert_tuple entry */
data += datalen;
+
+ if (FilterByTable(ctx, change))
+ continue;
+
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
+ buf->origptr, change, false);
}
Assert(data == tupledata + tuplelen);
}
@@ -1240,6 +1382,9 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.clear_toast_afterwards = true;
+ if (FilterByTable(ctx, change))
+ return;
+
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
change, false);
}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index ca09c683f1..8bd0cefb22 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1225,6 +1225,37 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
return ret;
}
+bool
+filter_by_table_cb_wrapper(LogicalDecodingContext *ctx, Relation relation, ReorderBufferChange *change)
+{
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+ bool ret;
+
+ Assert(!ctx->fast_forward);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "filter_by_table";
+ state.report_location = InvalidXLogRecPtr;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = false;
+ ctx->end_xact = false;
+
+ /* do the actual work: call callback */
+ ret = ctx->callbacks.filter_by_table_cb(ctx, relation, change);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+
+ return ret;
+}
+
static void
message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 425238187f..2966258ea3 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -57,6 +57,9 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
Size sz, const char *message);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
+static bool pgoutput_table_filter(struct LogicalDecodingContext *ctx,
+ Relation relation,
+ ReorderBufferChange *change);
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
@@ -259,6 +262,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
+ cb->filter_by_table_cb = pgoutput_table_filter;
cb->shutdown_cb = pgoutput_shutdown;
/* transaction streaming */
@@ -1415,9 +1419,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TupleTableSlot *old_slot = NULL;
TupleTableSlot *new_slot = NULL;
- if (!is_publishable_relation(relation))
- return;
-
/*
* Remember the xid for the change in streaming mode. We need to send xid
* with each change in the streaming mode so that subscriber can make
@@ -1428,37 +1429,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
xid = change->txn->xid;
relentry = get_rel_sync_entry(data, relation);
-
- /* First check the table filter */
- switch (action)
- {
- case REORDER_BUFFER_CHANGE_INSERT:
- if (!relentry->pubactions.pubinsert)
- return;
- break;
- case REORDER_BUFFER_CHANGE_UPDATE:
- if (!relentry->pubactions.pubupdate)
- return;
- break;
- case REORDER_BUFFER_CHANGE_DELETE:
- if (!relentry->pubactions.pubdelete)
- return;
-
- /*
- * This is only possible if deletes are allowed even when replica
- * identity is not defined for a table. Since the DELETE action
- * can't be published, we simply return.
- */
- if (!change->data.tp.oldtuple)
- {
- elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
- return;
- }
- break;
- default:
- Assert(false);
- }
-
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
@@ -1684,6 +1654,57 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
return false;
}
+/*
+ * Return true if the relation has not been published, false otherwise.
+ */
+static bool
+pgoutput_table_filter(struct LogicalDecodingContext *ctx,
+ Relation relation,
+ ReorderBufferChange *change)
+{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ RelationSyncEntry *relentry;
+ ReorderBufferChangeType action = change->action;
+
+ if (!is_publishable_relation(relation))
+ return true;
+
+ relentry = get_rel_sync_entry(data, relation);
+
+ /* First check the table filter */
+ switch (action)
+ {
+ case REORDER_BUFFER_CHANGE_INSERT:
+ if (!relentry->pubactions.pubinsert)
+ return true;
+ break;
+ case REORDER_BUFFER_CHANGE_UPDATE:
+ if (!relentry->pubactions.pubupdate)
+ return true;
+ break;
+ case REORDER_BUFFER_CHANGE_DELETE:
+ if (!relentry->pubactions.pubdelete)
+ return true;
+
+ /*
+ * This is only possible if deletes are allowed even when replica
+ * identity is not defined for a table. Since the DELETE action
+ * can't be published, we simply return.
+ */
+ if (!change->data.tp.oldtuple)
+ {
+ elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
+ return true;
+ }
+ break;
+ default:
+ Assert(false);
+ }
+
+ return false;
+}
+
+
/*
* Shutdown the output plugin.
*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index dc2df4ce92..30a16cd784 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -145,6 +145,8 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
TransactionId xid, const char *gid);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
+extern bool filter_by_table_cb_wrapper(LogicalDecodingContext *ctx, Relation relation,
+ ReorderBufferChange *change);
extern void ResetLogicalStreamingState(void);
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 44988ebdd8..030eb5afb7 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -96,6 +96,12 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
RepOriginId origin_id);
+/*
+ * Filter changes by table.
+ */
+typedef bool (*LogicalDecodeFilterByRelCB) (struct LogicalDecodingContext *ctx,
+ Relation relation, ReorderBufferChange *change);
+
/*
* Called to shutdown an output plugin.
*/
@@ -222,6 +228,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+ LogicalDecodeFilterByRelCB filter_by_table_cb;
LogicalDecodeShutdownCB shutdown_cb;
/* streaming of changes at prepare time */
diff --git a/src/test/subscription/t/034_table_filter.pl b/src/test/subscription/t/034_table_filter.pl
new file mode 100644
index 0000000000..8d4f962adc
--- /dev/null
+++ b/src/test/subscription/t/034_table_filter.pl
@@ -0,0 +1,91 @@
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+# Basic logical replication test
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "create table tbl_pub(id int, val1 text, val2 text,size int);");
+$node_publisher->safe_psql('postgres',
+ "create table tbl_t1(id int, val1 text, val2 text,size int);");
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION mypub FOR TABLE public.tbl_pub;");
+$node_publisher->safe_psql('postgres',
+qq(
+CREATE OR REPLACE FUNCTION check_replication_status() RETURNS VOID AS \$\$
+DECLARE
+ replication_record pg_stat_replication;
+BEGIN
+ LOOP
+ SELECT *
+ INTO replication_record
+ FROM pg_stat_replication
+ WHERE application_name = 'mysub';
+
+ IF replication_record.replay_lsn = replication_record.write_lsn THEN
+ EXIT;
+ END IF;
+
+ PERFORM pg_sleep(1);
+ END LOOP;
+END;
+\$\$ LANGUAGE plpgsql;));
+
+# Create some preexisting content on subscriber
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_subscriber->safe_psql('postgres',
+ "create table tbl_pub(id int, val1 text, val2 text,size text);");
+$node_subscriber->safe_psql('postgres',
+ "create table tbl_t1(id int, val1 text, val2 text,size text);");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub');
+
+# test filter
+$node_publisher->safe_psql('postgres',
+qq(BEGIN;
+insert into tbl_t1 select 1, 'xyzzy', 'abcba', sum(size) from pg_ls_replslotdir('mysub');
+insert into tbl_t1 select i,repeat('xyzzy', i),repeat('abcba',i),(select sum(size) from pg_ls_replslotdir('mysub')) from generate_series(2,9999) i;
+update tbl_t1 set val2 = repeat('xyzzy',id) where id > 1 and id < 10001;
+select check_replication_status();
+insert into tbl_t1 select 10001, 'xyzzy', 'abcba', sum(size) from pg_ls_replslotdir('mysub');
+COMMIT;)
+);
+
+my $minsize = $node_publisher->safe_psql('postgres',
+ "select size from tbl_t1 order by size asc limit 1;");
+my $maxsize = $node_publisher->safe_psql('postgres',
+ "select size from tbl_t1 order by size desc limit 1;");
+is($minsize, $maxsize, 'check decode filter table between maxsize and minsize');
+
+
+my $firstrow = $node_publisher->safe_psql('postgres',
+ "select size from tbl_t1 where id = 1;");
+my $lastrow = $node_publisher->safe_psql('postgres',
+ "select size from tbl_t1 where id = 10001;");
+is($firstrow, $lastrow, 'check decode filter table between firstrow and lastrow');
+
+is($minsize, $lastrow, 'check decode filter table between minsize and lastrow');
+
+note "minsize: $minsize maxsize: $maxsize firstrow: $firstrow lastrow: $lastrow";
+
+done_testing();
\ No newline at end of file
--
2.39.3
On Wed, Jan 17, 2024 at 11:45 AM li jie <ggysxcq@gmail.com> wrote:
Hi hackers,
During logical replication, if there is a large write transaction, some
spill files will be written to disk, depending on the setting of
logical_decoding_work_mem.
This behavior can effectively avoid OOM, but if the transaction
generates a lot of change before commit, a large number of files may
fill the disk. For example, you can update a TB-level table.
However, I found an inelegant phenomenon. If the modified large table is not
published, its changes will also be written with a large number of spill files.
Look at an example below:
Thanks. I agree that decoding and queuing the changes of unpublished
tables' data into reorder buffer is an unnecessary task for walsender.
It takes processing efforts (CPU overhead), consumes disk space and
uses memory configured via logical_decoding_work_mem for a replication
connection inefficiently.
Later you will see a large number of spill files in the
We can see that table tbl_t1 is not published in mypub. It also won't be sent
downstream because it's not subscribed.
After the transaction is reorganized, the pgoutput decoding plugin filters out
changes to these unpublished relationships when sending logical changes.
See function pgoutput_change.
Right. Here's my testing [1].
Most importantly, if we filter out unpublished relationship-related
changes after constructing the changes but before queuing the changes
into a transaction, will it reduce the workload of logical decoding
and avoid disk
or memory growth as much as possible?
Right. It can.
The patch in the attachment is a prototype, which can effectively reduce the
memory and disk space usage during logical replication.
Design:
1. Added a callback LogicalDecodeFilterByRelCB for the output plugin.
2. Added this callback function pgoutput_table_filter for the pgoutput plugin.
Its main implementation is based on the table filter in the
pgoutput_change function.
Its main function is to determine whether the change needs to be published based
on the parameters of the publication, and if not, filter it.
3. After constructing a change and before Queue a change into a transaction,
use RelidByRelfilenumber to obtain the relation associated with the change,
just like obtaining the relation in the ReorderBufferProcessTXN function.
4. Relation may be a toast, and there is no good way to get its real
table relation based on toast relation. Here, I get the real table oid
through toast relname, and then get the real table relation.
5. This filtering takes into account INSERT/UPDATE/INSERT. Other
changes have not been considered yet and can be expanded in the future.
Design of this patch is based on the principle of logical decoding
filtering things out early on and looks very similar to
filter_prepare_cb_wrapper/pg_decode_filter_prepare and
filter_by_origin_cb/pgoutput_origin_filter. Per my understanding this
design looks okay unless I'm missing anything.
Test:
1. Added a test case 034_table_filter.pl
2. Like the case above, create two tables, the published table tbl_pub and
the non-published table tbl_t1
3. Insert 10,000 rows of toast data into tbl_t1 on the publisher, and use
pg_ls_replslotdir to record the total size of the slot directory
every second.
4. Compare the size of the slot directory at the beginning of the
transaction(size1),
the size at the end of the transaction (size2), and the average
size of the entire process(size3).
5. Assert(size1==size2==size3)
I bet that the above test with 10K rows is going to take a noticeable
time on some buildfarm members (it took 6 seconds on my dev system,
which is an AWS EC2 instance). The test can also be flaky.
Therefore, IMO, a more reliable way of testing this feature is to look
in the server logs for the following message using
PostgreSQL::Test::Cluster log_contains().
+filter_done:
+
+ if (result && RelationIsValid(relation))
+ elog(DEBUG1, "logical filter change by table %s",
RelationGetRelationName(relation));
+
Here are some comments on the v1 patch:
1.
@@ -1415,9 +1419,6 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
TupleTableSlot *old_slot = NULL;
TupleTableSlot *new_slot = NULL;
- if (!is_publishable_relation(relation))
- return;
-
Instead of removing is_publishable_relation from pgoutput_change, I
think it can just be turned into an assertion
Assert(is_publishable_relation(relation));, no?
2.
+ switch (change->action)
+ {
+ /* intentionally fall through */
Perhaps, it must use /* FALLTHROUGH */ just like elsewhere in the
code, otherwise a warning is thrown.
3. From commit message:
Most of the code in the FilterByTable function is transplanted from
the ReorderBufferProcessTXN
function, which can be called before the ReorderBufferQueueChange function.It is
I think the above note can just be above the FilterByTable function
for better understanding.
+static bool
+FilterByTable(LogicalDecodingContext *ctx, ReorderBufferChange *change)
+{
4. Why is FilterByTable(ctx, change) call placed after DecodeXLogTuple
in DecodeInsert, DecodeUpdate and DecodeDelete? Is there a use for
decoded tuples done by DecodeXLogTuple in the new callback
filter_by_table_cb? If not, can we move FilterByTable call before
DecodeXLogTuple to avoid some more extra processing?
5. Why is ReorderBufferChange needed as a parameter to FilterByTable
and filter_by_table_cb? Can't just the LogicalDecodingContext and
relation name, the change action be enough to decide if the table is
publishable or not? If done this way, it can avoid some more
processing, no?
6. Please run pgindent and pgperltidy on the new source code and new
TAP test file respectively.
[1]:
HEAD:
postgres=# BEGIN;
BEGIN
Time: 0.110 ms
postgres=*# insert into tbl_t1 select i,repeat('xyzzy',
i),repeat('abcba', i),repeat('dfds', i) from generate_series(0,99999)
i;
INSERT 0 100000
Time: 379488.265 ms (06:19.488)
postgres=*#
ubuntu:~/postgres/pg17/bin$ du -sh
/home/ubuntu/postgres/pg17/bin/db17/pg_replslot/mysub
837M /home/ubuntu/postgres/pg17/bin/db17/pg_replslot/mysub
ubuntu:~/postgres/pg17/bin$ du -sh /home/ubuntu/postgres/pg17/bin/db17
2.6G /home/ubuntu/postgres/pg17/bin/db17
PATCHED:
postgres=# BEGIN;
BEGIN
Time: 0.105 ms
postgres=*# insert into tbl_t1 select i,repeat('xyzzy',
i),repeat('abcba', i),repeat('dfds', i) from generate_series(0,99999)
i;
INSERT 0 100000
Time: 380044.554 ms (06:20.045)
ubuntu:~/postgres$ du -sh /home/ubuntu/postgres/pg17/bin/db17/pg_replslot/mysub
8.0K /home/ubuntu/postgres/pg17/bin/db17/pg_replslot/mysub
ubuntu:~/postgres$ du -sh /home/ubuntu/postgres/pg17/bin/db17
1.8G /home/ubuntu/postgres/pg17/bin/db17
--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
On Thu, Jan 18, 2024 at 12:12 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
On Wed, Jan 17, 2024 at 11:45 AM li jie <ggysxcq@gmail.com> wrote:
Hi hackers,
During logical replication, if there is a large write transaction, some
spill files will be written to disk, depending on the setting of
logical_decoding_work_mem.
This behavior can effectively avoid OOM, but if the transaction
generates a lot of change before commit, a large number of files may
fill the disk. For example, you can update a TB-level table.
However, I found an inelegant phenomenon. If the modified large table is not
published, its changes will also be written with a large number of spill files.
Look at an example below:
Thanks. I agree that decoding and queuing the changes of unpublished
tables' data into reorder buffer is an unnecessary task for walsender.
It takes processing efforts (CPU overhead), consumes disk space and
uses memory configured via logical_decoding_work_mem for a replication
connection inefficiently.
This is all true, but note that in successful cases (where the table is
published) all the work done by FilterByTable (accessing caches,
transaction-related stuff) can add noticeable overhead, as we do
that later in pgoutput_change() anyway. I think I gave the same comment
earlier as well but didn't see any satisfactory answer or performance
data for successful cases to back this proposal. Note that users can
configure streaming of in-progress transactions, in which case they
shouldn't see such a big problem. However, I agree that if we can find
some solution where there is no noticeable overhead then that would be
worth considering.
--
With Regards,
Amit Kapila.
On Thu, Jan 18, 2024 at 2:47 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Thu, Jan 18, 2024 at 12:12 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
On Wed, Jan 17, 2024 at 11:45 AM li jie <ggysxcq@gmail.com> wrote:
Hi hackers,
During logical replication, if there is a large write transaction, some
spill files will be written to disk, depending on the setting of
logical_decoding_work_mem.
This behavior can effectively avoid OOM, but if the transaction
generates a lot of change before commit, a large number of files may
fill the disk. For example, you can update a TB-level table.
However, I found an inelegant phenomenon. If the modified large table is not
published, its changes will also be written with a large number of spill files.
Look at an example below:
Thanks. I agree that decoding and queuing the changes of unpublished
tables' data into reorder buffer is an unnecessary task for walsender.
It takes processing efforts (CPU overhead), consumes disk space and
uses memory configured via logical_decoding_work_mem for a replication
connection inefficiently.
This is all true but note that in successful cases (where the table is
published) all the work done by FilterByTable(accessing caches,
transaction-related stuff) can add noticeable overhead as anyway we do
that later in pgoutput_change().
Right. Overhead for published tables needs to be studied. A possible
way is to mark the checks performed in
FilterByTable/filter_by_table_cb and skip the same checks in
pgoutput_change. I'm not sure if this works without any issues though.
--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
Hi,
This is all true but note that in successful cases (where the table is
published) all the work done by FilterByTable(accessing caches,
transaction-related stuff) can add noticeable overhead as anyway we do
that later in pgoutput_change(). I think I gave the same comment
earlier as well but didn't see any satisfactory answer or performance
data for successful cases to back this proposal.
I did some benchmarking yesterday at [1] and found that it adds 20% CPU time.
That led to a basic idea which I think deserves sharing. The "transaction
related stuff" comes from the syscache/systable access, apart from the
historic snapshot, and the syscache is required in the following
situations:
1. relfilenode (from WAL) -> relid.
2. relid -> namespaceid (to check if the relid is a toast relation).
3. if toast, get its original relid.
4. access the data from pg_publication_tables.
5. see if the relid is a partition; if yes, we may get its root
relation.
Actually we already have a RelationSyncCache for #4, and it *only* needs
to access the syscache when replicate_valid is false. I think this case
should be rare, but the caller doesn't know that, so the caller must
prepare the transaction stuff in advance even though in most cases it is
not used. So I think we can get an optimization here, and the attached
patch is the result.
Author: yizhi.fzh <yizhi.fzh@alibaba-inc.com>
Date: Wed Feb 21 18:40:03 2024 +0800
Make get_rel_sync_entry less depending on transaction state.
get_rel_sync_entry needs a transaction only when an entry with
replicate_valid = false is found, which should be a rare case. However, the
caller can't know whether an entry is valid, so it has to prepare the
transaction state before calling this function. Such preparation is expensive.
This patch lets get_rel_sync_entry manage the transaction state itself,
only if necessary, so the callers don't need to prepare it blindly.
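To make the idea concrete, here is a minimal standalone sketch of the "start a transaction only on a cache miss" pattern described above. It is not the patch itself: start_xact, end_xact, validate_entry and SyncEntry are hypothetical stand-ins for the backend's transaction machinery and RelationSyncEntry, and the catalog answer is hard-coded.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the backend's transaction machinery. */
static bool in_transaction = false;
static int	xacts_started = 0;	/* counts how often we paid the cost */

static void
start_xact(void)
{
	in_transaction = true;
	xacts_started++;
}

static void
end_xact(void)
{
	in_transaction = false;
}

/* Simplified cache entry; "valid" plays the role of replicate_valid. */
typedef struct SyncEntry
{
	bool		valid;
	bool		publish;
} SyncEntry;

/* Expensive validation that needs a transaction (catalog access). */
static void
validate_entry(SyncEntry *entry)
{
	assert(in_transaction);
	entry->publish = true;		/* assumed catalog answer, illustration only */
	entry->valid = true;
}

/* Callers no longer prepare a transaction; it is started only on a miss. */
static SyncEntry *
get_entry(SyncEntry *entry)
{
	bool		started_here = false;

	if (!entry->valid)
	{
		if (!in_transaction)
		{
			start_xact();
			started_here = true;
		}
		validate_entry(entry);
		if (started_here)
			end_xact();
	}
	return entry;
}
```

With this shape, the first get_entry() call on an invalid entry starts one transaction, and later calls on the same entry start none.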
Then we come to #1. Actually we have RelfilenumberMapHash as a cache; when
the cache is hit (suppose this is the usual case), no transaction stuff
is involved. I have two ideas then:
1. Optimize the cache-hit situation, like what we just did for
get_rel_sync_entry, for all 5 kinds of data, and only pay the
effort in the cache-miss case. For the data for #2, #3, #5, all the keys
are relid, so I think a single HTAB should be OK.
2. Add the content for #1, #2, #3, #5 to WAL when wal_level is set to
logical.
In either case, the changes to get_rel_sync_entry would be needed.
Note, users can
configure to stream_in_progress transactions in which case they
shouldn't see such a big problem.
People would see that the changes are spilled to disk, but the CPU cost of
reordering would still be paid, I think.
[1]: /messages/by-id/87o7cadqj3.fsf@163.com
--
Best Regards
Andy Fan
Attachments:
v1-0001-Make-get_rel_sync_entry-less-depending-on-transac.patch (text/x-diff)
From 90b8df330df049bd1d7e881dc6e9b108c17b0924 Mon Sep 17 00:00:00 2001
From: "yizhi.fzh" <yizhi.fzh@alibaba-inc.com>
Date: Wed, 21 Feb 2024 18:40:03 +0800
Subject: [PATCH v1 1/1] Make get_rel_sync_entry less depending on transaction
state.
get_rel_sync_entry needs a transaction only when an entry with
replicate_valid = false is found, which should be a rare case. However, the
caller can't know whether an entry is valid, so it has to prepare the
transaction state before calling this function. Such preparation is expensive.
This patch lets get_rel_sync_entry manage the transaction state itself,
only if necessary, so the callers don't need to prepare it blindly.
---
src/backend/replication/pgoutput/pgoutput.c | 60 ++++++++++++++++++++-
1 file changed, 59 insertions(+), 1 deletion(-)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 998f92d671..25e55590a2 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -13,6 +13,7 @@
#include "postgres.h"
#include "access/tupconvert.h"
+#include "access/relation.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
@@ -214,6 +215,11 @@ static void init_rel_sync_cache(MemoryContext cachectx);
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
Relation relation);
+static RelationSyncEntry *get_rel_sync_entry_by_relid(PGOutputData *data,
+ Oid relid);
+static RelationSyncEntry *get_rel_sync_entry_internal(PGOutputData *data,
+ Relation relation,
+ Oid relid);
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
uint32 hashvalue);
@@ -1962,11 +1968,29 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
*/
static RelationSyncEntry *
get_rel_sync_entry(PGOutputData *data, Relation relation)
+{
+ return get_rel_sync_entry_internal(data, relation, InvalidOid);
+}
+
+static RelationSyncEntry *
+__attribute__ ((unused))
+get_rel_sync_entry_by_relid(PGOutputData *data, Oid relid)
+{
+ return get_rel_sync_entry_internal(data, NULL, relid);
+}
+
+static RelationSyncEntry *
+get_rel_sync_entry_internal(PGOutputData *data, Relation relation, Oid oid)
{
RelationSyncEntry *entry;
bool found;
MemoryContext oldctx;
- Oid relid = RelationGetRelid(relation);
+ Oid relid = OidIsValid(oid) ? oid: RelationGetRelid(relation);
+ bool started_xact = false, using_subtxn;
+
+ /* either oid or relation is provided. */
+ Assert(OidIsValid(oid) || RelationIsValid(relation));
+ Assert(!(OidIsValid(oid) && RelationIsValid(relation)));
Assert(RelationSyncCache != NULL);
@@ -1993,6 +2017,23 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->attrmap = NULL;
}
+ if (!entry->replicate_valid && !IsTransactionOrTransactionBlock())
+ {
+ /*
+ * Validating the entry needs to access syscache, which must
+ * be in a transaction state, if that's not ready, start one.
+ */
+
+ using_subtxn = IsTransactionOrTransactionBlock();
+
+ if (using_subtxn)
+ BeginInternalSubTransaction(__func__);
+ else
+ StartTransactionCommand();
+
+ started_xact = true;
+ }
+
/* Validate the entry */
if (!entry->replicate_valid)
{
@@ -2198,9 +2239,19 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
entry->pubactions.pubdelete)
{
+ bool rel_opened = false;
+
+ if (!RelationIsValid(relation))
+ {
+ relation = relation_open(oid, AccessShareLock);
+ rel_opened = true;
+ }
/* Initialize the tuple slot and map */
init_tuple_slot(data, relation, entry);
+ if (rel_opened)
+ relation_close(relation, AccessShareLock);
+
/* Initialize the row filter */
pgoutput_row_filter_init(data, rel_publications, entry);
@@ -2215,6 +2266,13 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->replicate_valid = true;
}
+ if (started_xact)
+ {
+ AbortCurrentTransaction();
+ if (using_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+ }
+
return entry;
}
--
2.34.1
Hi,
Sorry I replied too late.
This is all true but note that in successful cases (where the table is
published) all the work done by FilterByTable(accessing caches,
transaction-related stuff) can add noticeable overhead as anyway we do
that later in pgoutput_change().
You are correct. Frequently opening transactions and accessing caches
causes a lot of overhead, which Andy has tested and confirmed.
The root cause is that every DML WAL record needs to do this, which is really
wasteful. I use a hash table, LocatorFilterCache, to solve this problem.
After getting a RelFileLocator, I look up its PublicationActions in the hash
table and filter on them to determine whether the relation has been published.
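As a rough illustration of that lookup path, the following standalone sketch caches the publication decision per locator so the slow path runs once per relation file. Locator, PubActions, slow_resolve and the fixed-size open-addressing table are simplified assumptions made for this example; the patch uses the backend's HTAB, RelFileLocator and PublicationActions, and the "odd numbers are published" rule is only a placeholder for the catalog lookup.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-ins for RelFileLocator and PublicationActions. */
typedef struct Locator { uint32_t spc, db, rel; } Locator;
typedef struct PubActions { bool ins, upd, del; } PubActions;
typedef enum { CHANGE_INSERT, CHANGE_UPDATE, CHANGE_DELETE } ChangeType;

#define CACHE_SLOTS 64			/* power of two, never resized in this sketch */

typedef struct CacheSlot
{
	bool		used;
	Locator		key;
	PubActions	actions;
} CacheSlot;

static CacheSlot cache[CACHE_SLOTS];
static int	slow_lookups = 0;	/* counts cache misses, for illustration */

/*
 * Hypothetical slow path. In the patch this step needs a transaction and a
 * historic snapshot; here odd relation numbers simply count as published.
 */
static PubActions
slow_resolve(const Locator *loc)
{
	PubActions	a = {false, false, false};

	slow_lookups++;
	if (loc->rel % 2 == 1)
		a.ins = a.upd = a.del = true;
	return a;
}

/* Find the slot for a locator, filling it via the slow path on a miss. */
static CacheSlot *
cache_lookup(const Locator *loc)
{
	uint32_t	h = (loc->spc * 31u + loc->db) * 31u + loc->rel;

	for (uint32_t i = 0; i < CACHE_SLOTS; i++)
	{
		CacheSlot  *s = &cache[(h + i) & (CACHE_SLOTS - 1)];

		if (!s->used)
		{
			s->used = true;
			s->key = *loc;
			s->actions = slow_resolve(loc);
			return s;
		}
		if (s->key.spc == loc->spc && s->key.db == loc->db &&
			s->key.rel == loc->rel)
			return s;
	}
	return NULL;				/* table full: caller does not filter */
}

/* Returns true when the change can be dropped before it is queued. */
static bool
filter_by_locator(const Locator *loc, ChangeType action)
{
	CacheSlot  *s = cache_lookup(loc);

	if (s == NULL)
		return false;
	switch (action)
	{
		case CHANGE_INSERT:
			return !s->actions.ins;
		case CHANGE_UPDATE:
			return !s->actions.upd;
		case CHANGE_DELETE:
			return !s->actions.del;
	}
	return false;
}
```

Repeated changes for the same locator hit the cached slot, so the expensive resolution is not repeated for every DML record.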
The effect of my test is very obvious: (perf record)
v1:
Children Self Command Shared O Symbol
+ 22.04% 1.53% postgres postgres [.] FilterByTable
v2:
Children Self Command Shared O Symbol
+ 0.58% 0.00% postgres postgres [.] ReorderBufferFilterByLocator
v1 patch introduces 20% overhead, while v2 only has 0.58%.
Note, users can
configure to stream_in_progress transactions in which case they
shouldn't see such a big problem.
Yes, stream mode can prevent these irrelevant changes from being written to
disk or sent downstream.
However, CPU and memory are still consumed when processing
these useless changes. Here is my simple test [1]:
Based on master:
CPU stat: perf stat -p pid -e cycles -I 1000
# time counts unit events
76.007070936 9,691,035 cycles
77.007163484 5,977,694 cycles
78.007252533 5,924,703 cycles
79.007346862 5,861,934 cycles
80.007438070 5,858,264 cycles
81.007527122 6,408,759 cycles
82.007615711 6,397,988 cycles
83.007705685 5,520,407 cycles
84.007794387 5,359,162 cycles
85.007884879 5,194,079 cycles
86.007979797 5,391,270 cycles
87.008069606 5,474,536 cycles
88.008162827 5,594,190 cycles
89.008256327 5,610,023 cycles
90.008349583 5,627,350 cycles
91.008437785 6,273,510 cycles
92.008527938 580,934,205 cycles
93.008620136 4,404,672 cycles
94.008711818 4,599,074 cycles
95.008805591 4,374,958 cycles
96.008894543 4,300,180 cycles
97.008987582 4,157,892 cycles
98.009077445 4,072,178 cycles
99.009163475 4,043,875 cycles
100.009254888 5,382,667 cycles
memory stat: pidstat -p pid -r 1 10
07:57:18 AM UID PID minflt/s majflt/s VSZ RSS %MEM Command
07:57:19 AM 1000 11848 233.00 0.00 386872 81276 0.01 postgres
07:57:20 AM 1000 11848 235.00 0.00 387008 82068 0.01 postgres
07:57:21 AM 1000 11848 236.00 0.00 387144 83124 0.01 postgres
07:57:22 AM 1000 11848 236.00 0.00 387144 83916 0.01 postgres
07:57:23 AM 1000 11848 236.00 0.00 387280 84972 0.01 postgres
07:57:24 AM 1000 11848 334.00 0.00 337000 36928 0.00 postgres
07:57:25 AM 1000 11848 3.00 0.00 337000 36928 0.00 postgres
07:57:26 AM 1000 11848 0.00 0.00 337000 36928 0.00 postgres
07:57:27 AM 1000 11848 0.00 0.00 337000 36928 0.00 postgres
07:57:28 AM 1000 11848 0.00 0.00 337000 36928 0.00 postgres
Average: 1000 11848 151.30 0.00 362045 60000 0.01 postgres
After patching:
# time counts unit events
76.007623310 4,237,505 cycles
77.007717436 3,989,618 cycles
78.007813848 3,965,857 cycles
79.007906412 3,601,715 cycles
80.007998111 3,670,835 cycles
81.008092670 3,495,844 cycles
82.008187456 3,822,695 cycles
83.008281335 5,034,146 cycles
84.008374998 3,867,683 cycles
85.008470245 3,996,927 cycles
86.008563783 3,823,893 cycles
87.008658628 3,825,472 cycles
88.008755246 3,823,079 cycles
89.008849719 3,966,083 cycles
90.008945774 4,012,704 cycles
91.009044492 4,026,860 cycles
92.009139621 3,860,912 cycles
93.009242485 3,961,533 cycles
94.009346304 3,799,897 cycles
95.009440164 3,959,602 cycles
96.009534251 3,960,405 cycles
97.009625904 3,762,581 cycles
98.009716518 4,859,490 cycles
99.009807720 3,940,845 cycles
100.009901399 3,888,095 cycles
08:01:47 AM UID PID minflt/s majflt/s VSZ RSS %MEM Command
08:01:48 AM 1000 19466 0.00 0.00 324424 15140 0.00 postgres
08:01:49 AM 1000 19466 0.00 0.00 324424 15140 0.00 postgres
08:01:50 AM 1000 19466 0.00 0.00 324424 15140 0.00 postgres
08:01:51 AM 1000 19466 0.00 0.00 324424 15140 0.00 postgres
08:01:52 AM 1000 19466 0.00 0.00 324424 15140 0.00 postgres
08:01:53 AM 1000 19466 0.00 0.00 324424 15140 0.00 postgres
08:01:54 AM 1000 19466 0.00 0.00 324424 15140 0.00 postgres
08:01:55 AM 1000 19466 0.00 0.00 324424 15140 0.00 postgres
08:01:56 AM 1000 19466 0.00 0.00 324424 15140 0.00 postgres
08:01:57 AM 1000 19466 0.00 0.00 324424 15140 0.00 postgres
Average: 1000 19466 0.00 0.00 324424 15140 0.00 postgres
The comparison shows that the patch is also beneficial in stream mode.
Of course, LocatorFilterCache also needs to deal with invalidation, such as
invalidation of the corresponding relation or pg_publication changes, just like
RelationSyncCache and RelfilenumberMapHash.
But DDL is rare after all, and its cost is insignificant compared to a
large amount of DML.
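The invalidation handling can be pictured with a small standalone sketch. It assumes a fixed-size array instead of the backend's HTAB, and FilterEntry, cache_add, invalidate_by_relid and invalidate_all are hypothetical names for this example. The rules it follows are the ones stated in the patch comments: a relcache invalidation removes the matching relation's entries (or everything on a full reset) and always removes negative entries, while a publication-related syscache invalidation drops everything.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define INVALID_OID 0u
#define MAX_ENTRIES 8

typedef struct FilterEntry
{
	bool		used;
	uint32_t	relfilenumber;	/* cache key */
	uint32_t	relid;			/* owning relation, INVALID_OID if unknown */
	bool		published;
} FilterEntry;

static FilterEntry entries[MAX_ENTRIES];

/* Store a decision; returns false if the sketch's fixed table is full. */
static bool
cache_add(uint32_t relfilenumber, uint32_t relid, bool published)
{
	for (int i = 0; i < MAX_ENTRIES; i++)
	{
		if (!entries[i].used)
		{
			entries[i] = (FilterEntry) {true, relfilenumber, relid, published};
			return true;
		}
	}
	return false;
}

static int
cache_count(void)
{
	int			n = 0;

	for (int i = 0; i < MAX_ENTRIES; i++)
		if (entries[i].used)
			n++;
	return n;
}

/*
 * Relcache-style callback: INVALID_OID means "reset everything"; otherwise
 * drop the named relation's entries plus all negative entries.
 */
static void
invalidate_by_relid(uint32_t relid)
{
	for (int i = 0; i < MAX_ENTRIES; i++)
	{
		if (entries[i].used &&
			(relid == INVALID_OID ||
			 entries[i].relid == relid ||
			 entries[i].relid == INVALID_OID))
			entries[i].used = false;
	}
}

/* Syscache-style callback: we cannot tell what changed, so drop all. */
static void
invalidate_all(void)
{
	invalidate_by_relid(INVALID_OID);
}
```

Dropped entries are simply rebuilt on the next lookup, which is why a small amount of DDL costs little compared to the DML volume.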
Another problem is that the LocatorFilterCache looks redundant compared
to RelationSyncCache and RelfilenumberMapHash, like this:
1. RelfilenumberMapHash: relfilenode -> relation oid
2. RelationSyncCache: relation oid -> PublicationActions
3. LocatorFilterCache: RelFileLocator -> PublicationActions
The reason is that you cannot simply chain the two caches to go from
relfilenode to PublicationActions: in between, you must use a historic
snapshot and access transactions and the relcache. So there is no good
solution for this for the time being; it is ugly but effective.
Therefore, IMO, the concrete way of testing this feature is by looking
at the server logs for the following message using
PostgreSQL::Test::Cluster log_contains().
Thanks, done.
Instead of removing is_publishable_relation from pgoutput_change, I
think it can just be turned into an assertion
Assert(is_publishable_relation(relation));, no?
yes, done.
Perhaps, it must use /* FALLTHROUGH */ just like elsewhere in the
code, otherwise a warning is thrown.
/* intentionally fall through */ can also avoid warnings.
Can't just the LogicalDecodingContext and
relation name, the change action be enough to decide if the table is
publishable or not? If done this way, it can avoid some more
processing, no?
yes, RelFileLocator filtering is used directly in v2, and change is
no longer required.
Please run pgindent and pgperltidy on the new source code and new
TAP test file respectively.
ok.
[1]: /messages/by-id/CAGfChW62f5NTNbLsqO-6_CrmKPqBEQtWPcPDafu8pCwZznk=xw@mail.gmail.com
Attachments:
v2-Reduce-useless-changes-before-reassembly-during-logi.patch (application/octet-stream)
From 1fdc49f2b8b16b4c39759cc5f8cd3e5a0d6bcd55 Mon Sep 17 00:00:00 2001
From: "adger.lj" <adger.lj@alibaba-inc.com>
Date: Wed, 6 Mar 2024 15:21:17 +0800
Subject: [PATCH] Reduce useless changes before reassembly during logical
replication
In order to reduce unnecessary logical replication, irrelevant relationship
changes can be filtered out before reorganizing transaction fragments.
This can effectively reduce useless changes and prevent storage space from
being filled up with irrelevant data. In addition, even though the stream mode
is used, this patch can effectively reduce cpu and memory consumption.
By design, a callback LogicalDecodeFilterByRelCB is added for the output plugin.
We implemented a function pgoutput_table_filter for pgoutput, and RelationSyncCache
is reused to determine whether a relationship-related change should be filtered out.
Following the implementation of the function pgoutput_change, currently only
insert/update/delete can be filtered, and other types of changes are not considered.
Perhaps more detailed analysis can be done and more changes can be filtered.
In decoding, a function ReorderBufferFilterByLocator is added, which can decide
whether to add to the change list of transactions according to the RelFileLocator
in the wal record. In order to implement this filtering, we need to find its relation
according to the RelFileLocator, and then judge whether its dml has been published.
This process requires opening transactions and using snapshots and accessing caches.
In order to avoid overhead, a new cache has to be introduced to cache RelFileLocator
to PublicationActions. In this way, additional overhead will only be incurred when
obtaining PublicationActions for a new RelFileLocator, and in most cases, it can be
taken directly from the LocatorCache.
---
src/backend/replication/logical/decode.c | 17 +-
src/backend/replication/logical/logical.c | 28 ++
.../replication/logical/reorderbuffer.c | 366 +++++++++++++++++-
src/backend/replication/pgoutput/pgoutput.c | 28 +-
src/include/replication/logical.h | 3 +
src/include/replication/output_plugin.h | 9 +
src/include/replication/reorderbuffer.h | 1 +
src/test/subscription/t/034_table_filter.pl | 89 +++++
src/tools/pgindent/typedefs.list | 1 +
9 files changed, 532 insertions(+), 10 deletions(-)
create mode 100644 src/test/subscription/t/034_table_filter.pl
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index e5ab7b78b7..7e59356dc6 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -152,7 +152,7 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
case XLOG_PARAMETER_CHANGE:
{
xl_parameter_change *xlrec =
- (xl_parameter_change *) XLogRecGetData(buf->record);
+ (xl_parameter_change *) XLogRecGetData(buf->record);
/*
* If wal_level on the primary is reduced to less than
@@ -914,6 +914,11 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (target_locator.dbOid != ctx->slot->data.database)
return;
+ /* only interested in our published tables. */
+ if (ReorderBufferFilterByLocator(ctx->reorder, XLogRecGetXid(r), &target_locator,
+ REORDER_BUFFER_CHANGE_INSERT, buf->origptr))
+ return;
+
/* output plugin doesn't look for this origin, no need to queue */
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
return;
@@ -964,6 +969,11 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (target_locator.dbOid != ctx->slot->data.database)
return;
+ /* only interested in our published tables. */
+ if (ReorderBufferFilterByLocator(ctx->reorder, XLogRecGetXid(r), &target_locator,
+ REORDER_BUFFER_CHANGE_UPDATE, buf->origptr))
+ return;
+
/* output plugin doesn't look for this origin, no need to queue */
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
return;
@@ -1030,6 +1040,11 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (target_locator.dbOid != ctx->slot->data.database)
return;
+ /* only interested in our published tables. */
+ if (ReorderBufferFilterByLocator(ctx->reorder, XLogRecGetXid(r), &target_locator,
+ REORDER_BUFFER_CHANGE_DELETE, buf->origptr))
+ return;
+
/* output plugin doesn't look for this origin, no need to queue */
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
return;
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 51ffb623c0..9be22862e4 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1235,6 +1235,34 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
return ret;
}
+void
+filter_by_table_cb_wrapper(LogicalDecodingContext *ctx, Relation relation, PublicationActions *pubactions)
+{
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "filter_by_table";
+ state.report_location = InvalidXLogRecPtr;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = false;
+ ctx->end_xact = false;
+
+ /* do the actual work: call callback */
+ ctx->callbacks.filter_by_table_cb(ctx, relation, pubactions);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
static void
message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 001f901ee6..ab2b43d271 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -92,6 +92,7 @@
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "common/int.h"
+#include "common/string.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -104,8 +105,10 @@
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
+#include "utils/inval.h"
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
+#include "utils/syscache.h"
/* entry for a hash table we use to map from xid to our transaction state */
@@ -211,6 +214,14 @@ static const Size max_changes_in_memory = 4096; /* XXX for restore only */
/* GUC variable */
int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED;
+static HTAB *LocatorFilterCache = NULL;
+typedef struct LocatorFilterEntry
+{
+ RelFileLocator relfileocator;
+ Oid relid;
+ PublicationActions pubactions;
+} LocatorFilterEntry;
+
/* ---------------------------------------
* primary reorderbuffer support routines
* ---------------------------------------
@@ -273,6 +284,7 @@ static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static Snapshot ReorderBufferStreamTXNSnapShot(ReorderBuffer *rb, ReorderBufferTXN *txn);
/* ---------------------------------------
* toast reassembly support
@@ -295,6 +307,10 @@ static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
ReorderBufferChange *change,
bool addition, Size sz);
+static void init_locator_filter_cache(MemoryContext cachectx);
+static void locator_filter_invalidate_syscache_cb(Datum arg, int cacheid, uint32 hashvalue);
+static void locator_filter_invalidate_relcache_cb(Datum arg, Oid relid);
+
/*
* Allocate a new ReorderBuffer and clean out any old serialized state from
* prior ReorderBuffer instances for the same slot.
@@ -1404,7 +1420,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
{
dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
ReorderBufferChange *next_change =
- dlist_container(ReorderBufferChange, node, next);
+ dlist_container(ReorderBufferChange, node, next);
/* txn stays the same */
state->entries[off].lsn = next_change->lsn;
@@ -1435,8 +1451,8 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
{
/* successfully restored changes from disk */
ReorderBufferChange *next_change =
- dlist_head_element(ReorderBufferChange, node,
- &entry->txn->changes);
+ dlist_head_element(ReorderBufferChange, node,
+ &entry->txn->changes);
elog(DEBUG2, "restored %u/%u changes from disk",
(uint32) entry->txn->nentries_mem,
@@ -3837,7 +3853,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
{
char *data;
Size inval_size = sizeof(SharedInvalidationMessage) *
- change->data.inval.ninvalidations;
+ change->data.inval.ninvalidations;
sz += inval_size;
@@ -3980,6 +3996,38 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb)
return false;
}
+static Snapshot
+ReorderBufferStreamTXNSnapShot(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+ Snapshot snapshot_now;
+ dlist_iter subxact_i;
+
+ Assert(rbtxn_is_toptxn(txn));
+
+ /*
+ * If this transaction has no snapshot, it didn't make any changes to the
+ * database till now, so there's nothing to decode.
+ */
+ if (txn->base_snapshot == NULL)
+ {
+ Assert(txn->ninvalidations == 0);
+ return NULL;
+ }
+
+ dlist_foreach(subxact_i, &txn->subtxns)
+ {
+ ReorderBufferTXN *subtxn;
+
+ subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
+ ReorderBufferTransferSnapToParent(txn, subtxn);
+ }
+
+ snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
+ txn, txn->command_id);
+
+ return snapshot_now;
+}
+
/*
* Send data of a large transaction (and its subtransactions) to the
* output plugin, but using the stream API.
@@ -4202,7 +4250,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
dlist_foreach_modify(cleanup_iter, &txn->changes)
{
ReorderBufferChange *cleanup =
- dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
+ dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
dlist_delete(&cleanup->node);
ReorderBufferReturnChange(rb, cleanup, true);
@@ -4427,7 +4475,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_INVALIDATION:
{
Size inval_size = sizeof(SharedInvalidationMessage) *
- change->data.inval.ninvalidations;
+ change->data.inval.ninvalidations;
change->data.inval.invalidations =
MemoryContextAlloc(rb->context, inval_size);
@@ -4932,7 +4980,7 @@ ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
dlist_foreach_modify(it, &ent->chunks)
{
ReorderBufferChange *change =
- dlist_container(ReorderBufferChange, node, it.cur);
+ dlist_container(ReorderBufferChange, node, it.cur);
dlist_delete(&change->node);
ReorderBufferReturnChange(rb, change, true);
@@ -5270,3 +5318,307 @@ restart:
*cmax = ent->cmax;
return true;
}
+
+/*
+ * Determine whether the record needs to be added to the transaction
+ * change list based on the RelFileLocator.
+ * In order to avoid excessive overhead, we use caching, that is, we need
+ * to construct information about whether the RelFileLocator is published
+ * during the first access or after the cache invalidated.
+ * A Historical Snapshot is required here. Although the current transaction
+ * may not be completely reorganized, the base snapshot can still be used for
+ * gradual iteration, just like the snapshot is processed in the ReorderBufferStreamTXN
+ * function, but we're just read-only here.
+ */
+bool
+ReorderBufferFilterByLocator(ReorderBuffer *rb, TransactionId xid, RelFileLocator *relfileocator, ReorderBufferChangeType action, XLogRecPtr lsn)
+{
+ LogicalDecodingContext *ctx = rb->private_data;
+ LocatorFilterEntry *entry;
+ bool found;
+ Relation relation = NULL;
+ Oid reloid = InvalidOid;
+ bool using_subtxn;
+ bool filter = false;
+ Snapshot snapshot_now = NULL;
+ ReorderBufferTXN *txn,
+ *toptxn;
+
+
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+ toptxn = rbtxn_get_toptxn(txn);
+
+ if (ctx->callbacks.filter_by_table_cb == NULL)
+ return false;
+
+ /*
+ * If you don't filter it before reaching the restart lsn, let the
+ * subsequent processing it.
+ */
+ if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, lsn) || ctx->fast_forward)
+ return false;
+
+ if (LocatorFilterCache == NULL)
+ init_locator_filter_cache(CacheMemoryContext);
+
+ /* Find cached relation info, creating if not found */
+ entry = (LocatorFilterEntry *) hash_search(LocatorFilterCache,
+ relfileocator,
+ HASH_ENTER, &found);
+ Assert(entry != NULL);
+ if (found)
+ goto filter_done;
+
+ entry->pubactions.pubdelete = entry->pubactions.pubinsert
+ = entry->pubactions.pubtruncate = entry->pubactions.pubupdate = false;
+
+ /* Constructs a temporary historical snapshot. */
+ snapshot_now = ReorderBufferStreamTXNSnapShot(rb, toptxn);
+
+ if (snapshot_now == NULL)
+ return false;
+
+ /* build data to be able to lookup the CommandIds of catalog tuples */
+ ReorderBufferBuildTupleCidHash(rb, toptxn);
+
+ /* setup the initial snapshot */
+ SetupHistoricSnapshot(snapshot_now, toptxn->tuplecid_hash);
+
+ entry->relid = InvalidOid;
+ using_subtxn = IsTransactionOrTransactionBlock();
+
+ if (using_subtxn)
+ BeginInternalSubTransaction("filter change by table");
+ else
+ StartTransactionCommand();
+
+ reloid = RelidByRelfilenumber(relfileocator->spcOid, relfileocator->relNumber);
+ if (reloid == InvalidOid)
+ goto init_cache_done;
+
+ relation = RelationIdGetRelation(reloid);
+
+ if (!RelationIsValid(relation))
+ elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
+ reloid,
+ relpathperm(*relfileocator,
+ MAIN_FORKNUM));
+
+ if (!RelationIsLogicallyLogged(relation))
+ goto init_cache_done;
+
+ /*
+ * Ignore temporary heaps created during DDL unless the plugin has asked
+ * for them.
+ */
+ if (relation->rd_rel->relrewrite && !rb->output_rewrites)
+ goto init_cache_done;
+
+ /*
+ * For now ignore sequence changes entirely. Most of the time they don't
+ * log changes using records we understand, so it doesn't make sense to
+ * handle the few cases we do.
+ */
+ if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
+ goto init_cache_done;
+
+ if (IsToastRelation(relation))
+ {
+ Oid real_reloid = InvalidOid;
+
+ /* pg_toast_ len is 9 */
+ char *toast_name = RelationGetRelationName(relation);
+ char *start_ch = &toast_name[9];
+
+ real_reloid = strtoint(start_ch, NULL, 10);
+
+ if (real_reloid == InvalidOid)
+ elog(ERROR, "cannot get the real table oid for toast table %s, error: %m", toast_name);
+
+ RelationClose(relation);
+
+ relation = RelationIdGetRelation(real_reloid);
+
+ if (!RelationIsValid(relation))
+ elog(ERROR, "could not open real relation with OID %u (for toast table filenumber \"%s\")",
+ reloid,
+ relpathperm(*relfileocator,
+ MAIN_FORKNUM));
+ }
+
+ filter_by_table_cb_wrapper(ctx, relation, &entry->pubactions);
+ entry->relid = reloid;
+
+init_cache_done:
+
+ if (RelationIsValid(relation))
+ RelationClose(relation);
+
+ AbortCurrentTransaction();
+
+ if (using_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+
+ TeardownHistoricSnapshot(false);
+ pfree(snapshot_now);
+
+filter_done:
+ /* check the table filter */
+ switch (action)
+ {
+ case REORDER_BUFFER_CHANGE_INSERT:
+ if (!entry->pubactions.pubinsert)
+ filter = true;
+ break;
+ case REORDER_BUFFER_CHANGE_UPDATE:
+ if (!entry->pubactions.pubupdate)
+ filter = true;
+ break;
+ case REORDER_BUFFER_CHANGE_DELETE:
+ if (!entry->pubactions.pubdelete)
+ filter = true;
+ break;
+ default:
+ filter = true;
+ }
+
+ if (filter)
+ elog(DEBUG1, "logical filter change by table %u", entry->relid);
+
+ return filter;
+}
+
+
+/*
+ * Initialize the locator filter cache for a decoding session.
+ *
+ * The hash table is destroyed at the end of a decoding session.
+ * After the relcache invalidated, the corresponding LocatorFilterEntry also needs to be recalculated.
+ * After these three syscache(NAMESPACEOID, PUBLICATIONRELMAP, PUBLICATIONNAMESPACEMAP) invalidated,
+ * all locator cache need to be invalidated. Just like RelationSyncCache.
+ */
+static void
+init_locator_filter_cache(MemoryContext cachectx)
+{
+ HASHCTL ctl;
+ static bool relfile_callbacks_registered = false;
+
+ /* Nothing to do if hash table already exists */
+ if (LocatorFilterCache != NULL)
+ return;
+
+ /* Make a new hash table for the cache */
+ ctl.keysize = sizeof(RelFileLocator);
+ ctl.entrysize = sizeof(LocatorFilterEntry);
+ ctl.hcxt = cachectx;
+
+ LocatorFilterCache = hash_create("logical replication output RelFile cache",
+ 128, &ctl,
+ HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
+
+ Assert(LocatorFilterCache != NULL);
+
+ /* No more to do if we already registered callbacks */
+ if (relfile_callbacks_registered)
+ return;
+
+ /* We must update the cache entry for a relation after a relcache flush */
+ CacheRegisterRelcacheCallback(locator_filter_invalidate_relcache_cb, (Datum) 0);
+
+ /*
+ * Flush all cache entries after a pg_namespace change, in case it was a
+ * schema rename affecting a relation being replicated.
+ */
+ CacheRegisterSyscacheCallback(NAMESPACEOID,
+ locator_filter_invalidate_syscache_cb,
+ (Datum) 0);
+
+ /*
+ * Flush all cache entries after any publication changes. (We need no
+ * callback entry for pg_publication, because publication_invalidation_cb
+ * will take care of it.)
+ */
+ CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
+ locator_filter_invalidate_syscache_cb,
+ (Datum) 0);
+ CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
+ locator_filter_invalidate_syscache_cb,
+ (Datum) 0);
+
+ relfile_callbacks_registered = true;
+}
+
+/*
+ * Relation locator map syscache invalidation callback
+ *
+ * Called for invalidations on pg_publication_rel, pg_publication_namespace,
+ * and pg_namespace.
+ */
+static void
+locator_filter_invalidate_syscache_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+ HASH_SEQ_STATUS status;
+ LocatorFilterEntry *entry;
+
+ /*
+ * We can get here if the plugin was used via the SQL interface, because the
+ * LocatorFilterCache is destroyed when the decoding finishes, but there
+ * is no way to unregister the invalidation callbacks.
+ */
+ if (LocatorFilterCache == NULL)
+ return;
+
+ /*
+ * We have no easy way to identify which cache entries this invalidation
+ * event might have affected, so just mark them all invalid.
+ */
+ hash_seq_init(&status, LocatorFilterCache);
+ while ((entry = (LocatorFilterEntry *) hash_seq_search(&status)) != NULL)
+ {
+ if (hash_search(LocatorFilterCache,
+ &entry->relfileocator,
+ HASH_REMOVE,
+ NULL) == NULL)
+ elog(ERROR, "hash table corrupted");
+ }
+}
+
+
+/*
+ * Flush mapping entries when pg_class is updated in a relevant fashion.
+ */
+static void
+locator_filter_invalidate_relcache_cb(Datum arg, Oid relid)
+{
+ LocatorFilterEntry *entry;
+
+ HASH_SEQ_STATUS status;
+
+ /*
+ * We can get here if the plugin was used via the SQL interface, because the
+ * LocatorFilterCache is destroyed when the decoding finishes, but there
+ * is no way to unregister the invalidation callbacks.
+ */
+ if (LocatorFilterCache == NULL)
+ return;
+
+ /*
+ * If relid is InvalidOid, signaling a complete reset, we must remove all
+ * entries, otherwise just remove the specific relation's entry. Always
+ * remove negative cache entries.
+ */
+ hash_seq_init(&status, LocatorFilterCache);
+ while ((entry = (LocatorFilterEntry *) hash_seq_search(&status)) != NULL)
+ {
+ if (relid == InvalidOid || /* complete reset */
+ entry->relid == InvalidOid || /* negative cache entry */
+ entry->relid == relid) /* individual flushed relation */
+ {
+ if (hash_search(LocatorFilterCache,
+ &entry->relfileocator,
+ HASH_REMOVE,
+ NULL) == NULL)
+ elog(ERROR, "hash table corrupted");
+ }
+ }
+}
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index d2b35cfb96..8948fc56d7 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -56,6 +56,9 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
Size sz, const char *message);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
+static void pgoutput_table_filter(LogicalDecodingContext *ctx,
+ Relation relation,
+ PublicationActions *pubactions);
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
@@ -258,6 +261,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
+ cb->filter_by_table_cb = pgoutput_table_filter;
cb->shutdown_cb = pgoutput_shutdown;
/* transaction streaming */
@@ -1414,8 +1418,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TupleTableSlot *old_slot = NULL;
TupleTableSlot *new_slot = NULL;
- if (!is_publishable_relation(relation))
- return;
+ Assert(is_publishable_relation(relation));
/*
* Remember the xid for the change in streaming mode. We need to send xid
@@ -1683,6 +1686,27 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
return false;
}
+/*
+ * Copy the relation's publication actions into *pubactions; leave it untouched if the relation is not publishable.
+ */
+static void
+pgoutput_table_filter(LogicalDecodingContext *ctx,
+ Relation relation,
+ PublicationActions *pubactions)
+{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ RelationSyncEntry *relentry;
+
+ if (!is_publishable_relation(relation))
+ return;
+
+ relentry = get_rel_sync_entry(data, relation);
+
+ if (pubactions)
+ *pubactions = relentry->pubactions;
+}
+
+
/*
* Shutdown the output plugin.
*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index dc2df4ce92..f6beecfe49 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -145,6 +145,9 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
TransactionId xid, const char *gid);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
+extern void filter_by_table_cb_wrapper(LogicalDecodingContext *ctx,
+ Relation relation,
+ PublicationActions *pubactions);
extern void ResetLogicalStreamingState(void);
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 44988ebdd8..7b051b47eb 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -10,6 +10,7 @@
#define OUTPUT_PLUGIN_H
#include "replication/reorderbuffer.h"
+#include "catalog/pg_publication.h"
struct LogicalDecodingContext;
struct OutputPluginCallbacks;
@@ -96,6 +97,13 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
RepOriginId origin_id);
+/*
+ * Filter changes by table.
+ */
+typedef void (*LogicalDecodeFilterByRelCB) (struct LogicalDecodingContext *ctx,
+ Relation relation,
+ PublicationActions *pubactions);
+
/*
* Called to shutdown an output plugin.
*/
@@ -222,6 +230,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+ LogicalDecodeFilterByRelCB filter_by_table_cb;
LogicalDecodeShutdownCB shutdown_cb;
/* streaming of changes at prepare time */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0b2c95f7aa..f1dd145f3a 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -730,5 +730,6 @@ extern TransactionId *ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb);
extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr);
extern void StartupReorderBuffer(void);
+extern bool ReorderBufferFilterByLocator(ReorderBuffer *rb, TransactionId xid, RelFileLocator *relfileocator, ReorderBufferChangeType action, XLogRecPtr lsn);
#endif
diff --git a/src/test/subscription/t/034_table_filter.pl b/src/test/subscription/t/034_table_filter.pl
new file mode 100644
index 0000000000..0b329645c2
--- /dev/null
+++ b/src/test/subscription/t/034_table_filter.pl
@@ -0,0 +1,89 @@
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+# Test that changes to unpublished tables are filtered out during logical decoding
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->append_conf('postgresql.conf', 'log_min_messages = DEBUG1');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "create table tbl_pub(id int, val1 text, val2 text,size text);");
+$node_publisher->safe_psql('postgres',
+ "create table tbl_t1(id int, val1 text, val2 text,size text);");
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION mypub FOR TABLE public.tbl_pub;");
+$node_publisher->safe_psql('postgres',
+qq(
+CREATE OR REPLACE FUNCTION check_replication_status() RETURNS VOID AS \$\$
+DECLARE
+ replication_record pg_stat_replication;
+BEGIN
+ LOOP
+ SELECT *
+ INTO replication_record
+ FROM pg_stat_replication
+ WHERE application_name = 'mysub';
+
+ IF replication_record.replay_lsn = replication_record.write_lsn THEN
+ EXIT;
+ END IF;
+
+ PERFORM pg_sleep(1);
+ END LOOP;
+END;
+\$\$ LANGUAGE plpgsql;));
+
+# Create some preexisting content on subscriber
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_subscriber->safe_psql('postgres',
+ "create table tbl_pub(id int, val1 text, val2 text,size text);");
+$node_subscriber->safe_psql('postgres',
+ "create table tbl_t1(id int, val1 text, val2 text,size text);");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub');
+
+# Test the historic snapshot
+$node_publisher->safe_psql('postgres',
+ "insert into tbl_pub select i, 'xyzzy', 'abcba', 'truncated' from generate_series(1,9) i;truncate tbl_pub;");
+$node_publisher->safe_psql('postgres',
+ "insert into tbl_pub select i, 'xyzzy', 'abcba', 'truncated' from generate_series(1,9) i;");
+my $pub_count = $node_publisher->safe_psql('postgres',
+ "select count(*) from tbl_pub;");
+is($pub_count, 9, 'check that the historical snapshot is correct.');
+
+# Test the table change filter
+my $logstart = -s $node_publisher->logfile;
+$node_publisher->safe_psql('postgres',
+qq(BEGIN;
+insert into tbl_t1 select 1, 'xyzzy', 'abcba', sum(size) from pg_ls_replslotdir('mysub');
+insert into tbl_t1 select i,repeat('xyzzy', i),repeat('abcba',i),(select sum(size) from pg_ls_replslotdir('mysub')) from generate_series(2,99) i;
+update tbl_t1 set val2 = repeat('xyzzy',id) where id > 1 and id < 10001;
+select check_replication_status();
+insert into tbl_t1 select 10001, 'xyzzy', 'abcba', sum(size) from pg_ls_replslotdir('mysub');
+COMMIT;)
+);
+
+my $filter_table_oid = $node_publisher->safe_psql('postgres', "select oid from pg_class where relname='tbl_t1';");
+ok($node_publisher->log_contains("logical filter change by table " . $filter_table_oid, $logstart),
+ "the change of the tbl_t1 table is filtered.");
+
+done_testing();
\ No newline at end of file
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 95ae7845d8..54cfb7e45e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1394,6 +1394,7 @@ LOCALLOCK
LOCALLOCKOWNER
LOCALLOCKTAG
LOCALPREDICATELOCK
+LocatorFilterEntry
LOCK
LOCKMASK
LOCKMETHODID
--
2.39.3