memory leak in logical WAL sender with pgoutput's cachectx
Hi all,
We recently ran into a memory leak in a production logical-replication WAL-sender
process. A simplified reproduction script is attached.
If you run the script and then call MemoryContextStats(TopMemoryContext). you
will see something like:
"logical replication cache context: 562044928 total in 77 blocks;"
meaning “cachectx” has grown to ~500 MB, and it keeps growing as the number of
tables increases.
The workload can be summarised as follows:
1. CREATE PUBLICATION FOR ALL TABLES
2. CREATE SUBSCRIPTION
3. Repeatedly CREATE TABLE and DROP TABLE
cachectx is used mainly for entry->old_slot, entry->new_slot and entry->attrmap
allocations. When a DROP TABLE causes an invalidation we only set
entry->replicate_valid = false; we do not free those allocations immediately.
They are freed only if the same entry is used again. In some workloads an entry
may never be reused, or it may be reused briefly and then become unreachable
forever (The WAL sender may still need to decode WAL records for tables that
have already been dropped while it is processing the invalidation.)
Given the current design I don’t see a simple fix. Perhaps RelationSyncCache
needs some kind of eviction/cleanup policy to prevent this memory growth in
such scenarios.
Does anyone have ideas or suggestions?
Attachments:
Dear Zhao,
Thanks for raising the issue.
If you run the script and then call MemoryContextStats(TopMemoryContext). you
will see something like:
"logical replication cache context: 562044928 total in 77 blocks;"
meaning “cachectx” has grown to ~500 MB, and it keeps growing as the number
I also ran your script with count=1000, and confirmed that cachectx was grown
to around 50MB:
```
logical replication cache context: 58728448 total in 17 blocks; 3239568 free (62 chunks); 55488880 used
```
cachectx is used mainly for entry->old_slot, entry->new_slot and entry->attrmap
allocations. When a DROP TABLE causes an invalidation we only set
entry->replicate_valid = false; we do not free those allocations immediately.
They are freed only if the same entry is used again. In some workloads an entry
may never be reused, or it may be reused briefly and then become unreachable
forever (The WAL sender may still need to decode WAL records for tables that
have already been dropped while it is processing the invalidation.)
So, your suggestion is that we should sometimes free allocated memory for them,
right? Valid point.
Given the current design I don’t see a simple fix. Perhaps RelationSyncCache
needs some kind of eviction/cleanup policy to prevent this memory growth in
such scenarios.Does anyone have ideas or suggestions?
Naively considered, relsync cahe can be cleaned up if entries were invalidated
many times. Attached patch implemented idea. It could reduce the used memory on
my env:
```
logical replication cache context: 1056768 total in 8 blocks; 556856 free (51 chunks); 499912 used
```
Can you verify that?
Best regards,
Hayato Kuroda
FUJITSU LIMITED
Attachments:
0001-pgoutput-allow-cleaning-up-Relsync-cache-entries.patchapplication/octet-stream; name=0001-pgoutput-allow-cleaning-up-Relsync-cache-entries.patchDownload
From 99ee555a88edff6e6ca356f8f4d7d24ab1569c51 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Thu, 14 Aug 2025 19:16:10 +0900
Subject: [PATCH] pgoutput: allow cleaning up Relsync cache entries
---
src/backend/replication/pgoutput/pgoutput.c | 208 +++++++++++++-------
1 file changed, 139 insertions(+), 69 deletions(-)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 80540c017bd..2134bb3e2bc 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -219,10 +219,15 @@ typedef struct PGOutputTxnData
/* Map used to remember which relation schemas we sent. */
static HTAB *RelationSyncCache = NULL;
+/* How many times were cache entries invalidated? */
+static int RelationSyncCacheInvalidateCounter = 0;
+
static void init_rel_sync_cache(MemoryContext cachectx);
-static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
+static void cleanup_streamed_txn(TransactionId xid, bool is_commit);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
Relation relation);
+static void cleanup_rel_sync_entry(RelationSyncEntry *entry);
+static void maybe_cleanup_rel_sync_cache(void);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx,
RelationSyncEntry *relentry);
@@ -1764,6 +1769,7 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
{
hash_destroy(RelationSyncCache);
RelationSyncCache = NULL;
+ RelationSyncCacheInvalidateCounter = 0;
}
}
@@ -1892,7 +1898,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
- cleanup_rel_sync_cache(toptxn->xid, false);
+ cleanup_streamed_txn(toptxn->xid, false);
}
/*
@@ -1919,7 +1925,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
OutputPluginWrite(ctx, true);
- cleanup_rel_sync_cache(txn->xid, true);
+ cleanup_streamed_txn(txn->xid, true);
}
/*
@@ -2091,71 +2097,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
publications_valid = true;
}
- /*
- * Reset schema_sent status as the relation definition may have
- * changed. Also reset pubactions to empty in case rel was dropped
- * from a publication. Also free any objects that depended on the
- * earlier definition.
- */
- entry->schema_sent = false;
- entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
- list_free(entry->streamed_txns);
- entry->streamed_txns = NIL;
- bms_free(entry->columns);
- entry->columns = NULL;
- entry->pubactions.pubinsert = false;
- entry->pubactions.pubupdate = false;
- entry->pubactions.pubdelete = false;
- entry->pubactions.pubtruncate = false;
-
- /*
- * Tuple slots cleanups. (Will be rebuilt later if needed).
- */
- if (entry->old_slot)
- {
- TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
-
- Assert(desc->tdrefcount == -1);
-
- ExecDropSingleTupleTableSlot(entry->old_slot);
-
- /*
- * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
- * do it now to avoid any leaks.
- */
- FreeTupleDesc(desc);
- }
- if (entry->new_slot)
- {
- TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
-
- Assert(desc->tdrefcount == -1);
-
- ExecDropSingleTupleTableSlot(entry->new_slot);
-
- /*
- * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
- * do it now to avoid any leaks.
- */
- FreeTupleDesc(desc);
- }
-
- entry->old_slot = NULL;
- entry->new_slot = NULL;
-
- if (entry->attrmap)
- free_attrmap(entry->attrmap);
- entry->attrmap = NULL;
-
- /*
- * Row filter cache cleanups.
- */
- if (entry->entry_cxt)
- MemoryContextDelete(entry->entry_cxt);
-
- entry->entry_cxt = NULL;
- entry->estate = NULL;
- memset(entry->exprstate, 0, sizeof(entry->exprstate));
+ /* Cleanup existing data */
+ cleanup_rel_sync_entry(entry);
/*
* Build publication cache. We can't use one provided by relcache as
@@ -2311,6 +2254,124 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
return entry;
}
+/*
+ * Cleanup attributes in the given entry to reuse it.
+ */
+static void
+cleanup_rel_sync_entry(RelationSyncEntry *entry)
+{
+ /*
+ * Reset schema_sent status as the relation definition may have changed.
+ * Also reset pubactions to empty in case rel was dropped from a
+ * publication. Also free any objects that depended on the earlier
+ * definition.
+ */
+ entry->schema_sent = false;
+ entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
+ list_free(entry->streamed_txns);
+ entry->streamed_txns = NIL;
+ bms_free(entry->columns);
+ entry->columns = NULL;
+ entry->pubactions.pubinsert = false;
+ entry->pubactions.pubupdate = false;
+ entry->pubactions.pubdelete = false;
+ entry->pubactions.pubtruncate = false;
+
+ /*
+ * Tuple slots cleanups. (Will be rebuilt later if needed).
+ */
+ if (entry->old_slot)
+ {
+ TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
+
+ Assert(desc->tdrefcount == -1);
+
+ ExecDropSingleTupleTableSlot(entry->old_slot);
+
+ /*
+ * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do
+ * it now to avoid any leaks.
+ */
+ FreeTupleDesc(desc);
+ }
+ if (entry->new_slot)
+ {
+ TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
+
+ Assert(desc->tdrefcount == -1);
+
+ ExecDropSingleTupleTableSlot(entry->new_slot);
+
+ /*
+ * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do
+ * it now to avoid any leaks.
+ */
+ FreeTupleDesc(desc);
+ }
+
+ entry->old_slot = NULL;
+ entry->new_slot = NULL;
+
+ if (entry->attrmap)
+ free_attrmap(entry->attrmap);
+ entry->attrmap = NULL;
+
+ /*
+ * Row filter cache cleanups.
+ */
+ if (entry->entry_cxt)
+ MemoryContextDelete(entry->entry_cxt);
+
+ entry->entry_cxt = NULL;
+ entry->estate = NULL;
+ memset(entry->exprstate, 0, sizeof(entry->exprstate));
+}
+
+/*
+ * Cleanup invalidated entries in the cache.
+ *
+ * Whole entries must be checked to clean up the relsync cache. Since this
+ * might be the quite expensive task, the cleanup would be done when the
+ * invalidation happened many times.
+ */
+static void
+maybe_cleanup_rel_sync_cache(void)
+{
+#define NINVALIDATION_THRESHOLD 100
+
+ Assert(RelationSyncCache);
+
+ /*
+ * Cleanup can be done if the when the invalidation happened many times.
+ */
+ if (RelationSyncCacheInvalidateCounter > NINVALIDATION_THRESHOLD)
+ {
+ RelationSyncEntry *entry;
+ HASH_SEQ_STATUS status;
+
+ hash_seq_init(&status, RelationSyncCache);
+ while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+ {
+ /* Skip the valid entry */
+ if (entry->replicate_valid)
+ continue;
+
+ /* Cleanup the entry */
+ cleanup_rel_sync_entry(entry);
+
+ /* Remove the etnry from the cache */
+ if (hash_search(RelationSyncCache,
+ &entry->relid,
+ HASH_REMOVE,
+ NULL) == NULL)
+ elog(ERROR, "hash table corrupted");
+ }
+
+ /* Reset the counter */
+ RelationSyncCacheInvalidateCounter = 0;
+ }
+}
+
/*
* Cleanup list of streamed transactions and update the schema_sent flag.
*
@@ -2323,7 +2384,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
* cache - so tweak the schema_sent flag accordingly.
*/
static void
-cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
+cleanup_streamed_txn(TransactionId xid, bool is_commit)
{
HASH_SEQ_STATUS hash_seq;
RelationSyncEntry *entry;
@@ -2387,7 +2448,10 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
HASH_FIND, NULL);
if (entry != NULL)
+ {
entry->replicate_valid = false;
+ RelationSyncCacheInvalidateCounter++;
+ }
}
else
{
@@ -2398,8 +2462,11 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
{
entry->replicate_valid = false;
+ RelationSyncCacheInvalidateCounter++;
}
}
+
+ maybe_cleanup_rel_sync_cache();
}
/*
@@ -2429,7 +2496,10 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
{
entry->replicate_valid = false;
+ RelationSyncCacheInvalidateCounter++;
}
+
+ maybe_cleanup_rel_sync_cache();
}
/* Send Replication origin */
--
2.47.1
Hi,
Thanks for the patch.
On Thu, Aug 14, 2025 at 6:39 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
Dear Zhao,
Thanks for raising the issue.
If you run the script and then call MemoryContextStats(TopMemoryContext). you
will see something like:
"logical replication cache context: 562044928 total in 77 blocks;"
meaning “cachectx” has grown to ~500 MB, and it keeps growing as the numberI also ran your script with count=1000, and confirmed that cachectx was grown
to around 50MB:```
logical replication cache context: 58728448 total in 17 blocks; 3239568 free (62 chunks); 55488880 used
```cachectx is used mainly for entry->old_slot, entry->new_slot and entry->attrmap
allocations. When a DROP TABLE causes an invalidation we only set
entry->replicate_valid = false; we do not free those allocations immediately.
They are freed only if the same entry is used again. In some workloads an entry
may never be reused, or it may be reused briefly and then become unreachable
forever (The WAL sender may still need to decode WAL records for tables that
have already been dropped while it is processing the invalidation.)So, your suggestion is that we should sometimes free allocated memory for them,
right? Valid point.Given the current design I don’t see a simple fix. Perhaps RelationSyncCache
needs some kind of eviction/cleanup policy to prevent this memory growth in
such scenarios.Does anyone have ideas or suggestions?
Naively considered, relsync cahe can be cleaned up if entries were invalidated
many times. Attached patch implemented idea. It could reduce the used memory on
my env:```
logical replication cache context: 1056768 total in 8 blocks; 556856 free (51 chunks); 499912 used
```Can you verify that?
I had a quick look at it, and have some questions.
Is it safe to free the substructure from within rel_sync_cache_relation_cb()?
I’ also interested in the reasoning behind setting
NINVALIDATION_THRESHOLD to 100.
Best,
Xuneng
Dear Xuneng,
Is it safe to free the substructure from within rel_sync_cache_relation_cb()?
You referred the comment in rel_sync_cache_relation_cb() right? I understood like
that we must not access to any *system caches*, from the comment. Here we do not
re-build caches so that we do not access to the syscaches - it is permitted.
I'm happy if you also confirm the point.
I’ also interested in the reasoning behind setting
NINVALIDATION_THRESHOLD to 100.
This is the debatable point of this implementation. I set to 100 because it was
sufficient with the provided workload, but this may cause the replication lag.
We may have to consider a benchmark workload, measure data, and consider the
appropriate value. 100 is just an initial point.
Best regards,
Hayato Kuroda
FUJITSU LIMITED
On Thursday, August 14, 2025 8:49 PM Hayato Kuroda (Fujitsu) <kuroda.hayato@fujitsu.com> wrote:
Dear Xuneng,
Is it safe to free the substructure from within rel_sync_cache_relation_cb()?
You referred the comment in rel_sync_cache_relation_cb() right? I understood
like that we must not access to any *system caches*, from the comment. Here
we do not re-build caches so that we do not access to the syscaches - it is
permitted.
We should also avoid freeing the cache within the callback function, as
entries might still be accessed after the callback is invoked. This callback can
be triggered during the execution of pgoutput_change, which poses a risk of
concurrent access issues. For reference, see commit 7f481b8, which addresses a
similar issue. Ideally, cache release should occur within the normal code
execution path, such as within pgoutput_change() or get_rel_sync_entry().
I’ also interested in the reasoning behind setting
NINVALIDATION_THRESHOLD to 100.This is the debatable point of this implementation. I set to 100 because it was
sufficient with the provided workload, but this may cause the replication lag.
We may have to consider a benchmark workload, measure data, and consider
the appropriate value. 100 is just an initial point.
I think it's worthwhile to test the performance impact of this approach. If the
table is altered but not dropped, removing the entries could introduce
additional overhead. Although this overhead might be negligible if within an
acceptable threshold, it still warrants consideration. Additionally, if the
number of invalidations is significant, the cleanup process may take a
considerable amount of time, potentially resulting in performance degradation.
In such cases, it might be beneficial to consider destroying the hash table and
creating a new one. Therefore, analyzing these scenarios is essential.
Best Regards,
Hou zj
Hi Zhijie and Hayato-san,
Thanks for your clarification!
On Fri, Aug 15, 2025 at 10:10 AM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:
On Thursday, August 14, 2025 8:49 PM Hayato Kuroda (Fujitsu) <kuroda.hayato@fujitsu.com> wrote:
Dear Xuneng,
Is it safe to free the substructure from within rel_sync_cache_relation_cb()?
You referred the comment in rel_sync_cache_relation_cb() right? I understood
like that we must not access to any *system caches*, from the comment. Here
we do not re-build caches so that we do not access to the syscaches - it is
permitted.We should also avoid freeing the cache within the callback function, as
entries might still be accessed after the callback is invoked. This callback can
be triggered during the execution of pgoutput_change, which poses a risk of
concurrent access issues. For reference, see commit 7f481b8, which addresses a
similar issue. Ideally, cache release should occur within the normal code
execution path, such as within pgoutput_change() or get_rel_sync_entry().
I am still getting familiar with this module, so I cannot make a sound
confirmation for this yet. I'll take a look at the commit mentioned by
Zhijie and trace the callback paths.
I’ also interested in the reasoning behind setting
NINVALIDATION_THRESHOLD to 100.This is the debatable point of this implementation. I set to 100 because it was
sufficient with the provided workload, but this may cause the replication lag.
We may have to consider a benchmark workload, measure data, and consider
the appropriate value. 100 is just an initial point.I think it's worthwhile to test the performance impact of this approach. If the
table is altered but not dropped, removing the entries could introduce
additional overhead. Although this overhead might be negligible if within an
acceptable threshold, it still warrants consideration. Additionally, if the
number of invalidations is significant, the cleanup process may take a
considerable amount of time, potentially resulting in performance degradation.
In such cases, it might be beneficial to consider destroying the hash table and
creating a new one. Therefore, analyzing these scenarios is essential.
Intuitively, a heuristic threshold is straightforward but also can be
hard to get it "right" since the workload varies.
Designing and writing tests to reflect the real-world use cases well
may not be that easy as well. I'm wondering whether it's beneficial to
brainstorm other potential alternatives before settling on the current
approach and start benchmarking and performance tuning.
Sequential cleaning could be costful in certain cases. I don't know
much about RelationSyncCache's life cycle, but it seems that we should
be cautious on shortening it without causing subtle issues.
Best,
Xuneng
On Friday, August 15, 2025 10:59 AM Xuneng Zhou <xunengzhou@gmail.com> wrote:
Thanks for your clarification!
On Fri, Aug 15, 2025 at 10:10 AM Zhijie Hou (Fujitsu) <houzj.fnst@fujitsu.com>
wrote:On Thursday, August 14, 2025 8:49 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
Dear Xuneng,
Is it safe to free the substructure from within
rel_sync_cache_relation_cb()?
You referred the comment in rel_sync_cache_relation_cb() right? I
understood like that we must not access to any *system caches*, from
the comment. Here we do not re-build caches so that we do not access
to the syscaches - it is permitted.We should also avoid freeing the cache within the callback function,
as entries might still be accessed after the callback is invoked. This
callback can be triggered during the execution of pgoutput_change,
which poses a risk of concurrent access issues. For reference, see
commit 7f481b8, which addresses a similar issue. Ideally, cache
release should occur within the normal code execution path, such as withinpgoutput_change() or get_rel_sync_entry().
I am still getting familiar with this module, so I cannot make a sound
confirmation for this yet. I'll take a look at the commit mentioned by Zhijie and
trace the callback paths.I’ also interested in the reasoning behind setting
NINVALIDATION_THRESHOLD to 100.This is the debatable point of this implementation. I set to 100
because it was sufficient with the provided workload, but this may causethe replication lag.
We may have to consider a benchmark workload, measure data, and
consider the appropriate value. 100 is just an initial point.I think it's worthwhile to test the performance impact of this
approach. If the table is altered but not dropped, removing the
entries could introduce additional overhead. Although this overhead
might be negligible if within an acceptable threshold, it still
warrants consideration. Additionally, if the number of invalidations
is significant, the cleanup process may take a considerable amount of time,potentially resulting in performance degradation.
In such cases, it might be beneficial to consider destroying the hash
table and creating a new one. Therefore, analyzing these scenarios isessential.
Intuitively, a heuristic threshold is straightforward but also can be hard to get it
"right" since the workload varies.
Designing and writing tests to reflect the real-world use cases well may not be
that easy as well. I'm wondering whether it's beneficial to brainstorm other
potential alternatives before settling on the current approach and start
benchmarking and performance tuning.Sequential cleaning could be costful in certain cases. I don't know much about
RelationSyncCache's life cycle, but it seems that we should be cautious on
shortening it without causing subtle issues.
I don't have a great idea that can avoid introducing overhead in all cases.
An alternative approach I considered before is deleting the hash entry upon
invalidation if the entry is not in use. This would require adding an in_use
flag in RelationSyncEntry, setting it to true when get_rel_sync_entry is called,
and marking it as false once the entry is no longer in use. This approach is
inspired from the relcache invalidation. I am slightly not sure if it's worth
the effort, but just share this idea with a POC patch for reference, which has
been modified based on Kuroda-San's version.
Best Regards,
Hou zj
Attachments:
vApproach2-0001-delete-the-unused-hash-entry-on-invalidat.patchapplication/octet-stream; name=vApproach2-0001-delete-the-unused-hash-entry-on-invalidat.patchDownload
From 4a488d4bb9a7d0b97e14edde7854fdf0246de249 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Fri, 15 Aug 2025 12:47:52 +0800
Subject: [PATCH vApproach2] delete the unused hash entry on invalidation
---
src/backend/replication/pgoutput/pgoutput.c | 209 +++++++++++++-------
1 file changed, 139 insertions(+), 70 deletions(-)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 80540c017bd..feb07529bf1 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -131,6 +131,8 @@ typedef struct RelationSyncEntry
bool schema_sent;
+ int in_use;
+
/*
* This will be PUBLISH_GENCOLS_STORED if the relation contains generated
* columns and the 'publish_generated_columns' parameter is set to
@@ -223,6 +225,9 @@ static void init_rel_sync_cache(MemoryContext cachectx);
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
Relation relation);
+static void close_rel_sync_entry(RelationSyncEntry *entry);
+static void cleanup_rel_sync_entry(RelationSyncEntry *entry);
+static void delete_rel_sync_entry(RelationSyncEntry *entry);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx,
RelationSyncEntry *relentry);
@@ -1487,15 +1492,24 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
case REORDER_BUFFER_CHANGE_INSERT:
if (!relentry->pubactions.pubinsert)
+ {
+ close_rel_sync_entry(relentry);
return;
+ }
break;
case REORDER_BUFFER_CHANGE_UPDATE:
if (!relentry->pubactions.pubupdate)
+ {
+ close_rel_sync_entry(relentry);
return;
+ }
break;
case REORDER_BUFFER_CHANGE_DELETE:
if (!relentry->pubactions.pubdelete)
+ {
+ close_rel_sync_entry(relentry);
return;
+ }
/*
* This is only possible if deletes are allowed even when replica
@@ -1621,6 +1635,8 @@ cleanup:
ExecDropSingleTupleTableSlot(new_slot);
}
+ close_rel_sync_entry(relentry);
+
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
}
@@ -1658,7 +1674,10 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
relentry = get_rel_sync_entry(data, relation);
if (!relentry->pubactions.pubtruncate)
+ {
+ close_rel_sync_entry(relentry);
continue;
+ }
/*
* Don't send partitions if the publication wants to send only the
@@ -1666,7 +1685,10 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
*/
if (relation->rd_rel->relispartition &&
relentry->publish_as_relid != relid)
+ {
+ close_rel_sync_entry(relentry);
continue;
+ }
relids[nrelids++] = relid;
@@ -1675,6 +1697,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
pgoutput_send_begin(ctx, txn);
maybe_send_schema(ctx, change, relation, relentry);
+
+ close_rel_sync_entry(relentry);
}
if (nrelids > 0)
@@ -2048,6 +2072,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
{
entry->replicate_valid = false;
entry->schema_sent = false;
+ entry->in_use = false;
entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
entry->streamed_txns = NIL;
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
@@ -2061,6 +2086,10 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->attrmap = NULL;
}
+ Assert(!entry->in_use);
+
+ entry->in_use = true;
+
/* Validate the entry */
if (!entry->replicate_valid)
{
@@ -2091,71 +2120,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
publications_valid = true;
}
- /*
- * Reset schema_sent status as the relation definition may have
- * changed. Also reset pubactions to empty in case rel was dropped
- * from a publication. Also free any objects that depended on the
- * earlier definition.
- */
- entry->schema_sent = false;
- entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
- list_free(entry->streamed_txns);
- entry->streamed_txns = NIL;
- bms_free(entry->columns);
- entry->columns = NULL;
- entry->pubactions.pubinsert = false;
- entry->pubactions.pubupdate = false;
- entry->pubactions.pubdelete = false;
- entry->pubactions.pubtruncate = false;
-
- /*
- * Tuple slots cleanups. (Will be rebuilt later if needed).
- */
- if (entry->old_slot)
- {
- TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
-
- Assert(desc->tdrefcount == -1);
-
- ExecDropSingleTupleTableSlot(entry->old_slot);
-
- /*
- * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
- * do it now to avoid any leaks.
- */
- FreeTupleDesc(desc);
- }
- if (entry->new_slot)
- {
- TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
-
- Assert(desc->tdrefcount == -1);
-
- ExecDropSingleTupleTableSlot(entry->new_slot);
-
- /*
- * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
- * do it now to avoid any leaks.
- */
- FreeTupleDesc(desc);
- }
-
- entry->old_slot = NULL;
- entry->new_slot = NULL;
-
- if (entry->attrmap)
- free_attrmap(entry->attrmap);
- entry->attrmap = NULL;
-
- /*
- * Row filter cache cleanups.
- */
- if (entry->entry_cxt)
- MemoryContextDelete(entry->entry_cxt);
-
- entry->entry_cxt = NULL;
- entry->estate = NULL;
- memset(entry->exprstate, 0, sizeof(entry->exprstate));
+ /* Cleanup existing data */
+ cleanup_rel_sync_entry(entry);
/*
* Build publication cache. We can't use one provided by relcache as
@@ -2311,6 +2277,101 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
return entry;
}
+/*
+ * Mark the given entry as unused.
+ */
+static void
+close_rel_sync_entry(RelationSyncEntry *relentry)
+{
+ Assert(relentry->in_use);
+ relentry->in_use = false;
+}
+
+/*
+ * Cleanup attributes in the given entry.
+ */
+static void
+cleanup_rel_sync_entry(RelationSyncEntry *entry)
+{
+ /*
+ * Reset schema_sent status as the relation definition may have changed.
+ * Also reset pubactions to empty in case rel was dropped from a
+ * publication. Also free any objects that depended on the earlier
+ * definition.
+ */
+ entry->schema_sent = false;
+ entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
+ list_free(entry->streamed_txns);
+ entry->streamed_txns = NIL;
+ bms_free(entry->columns);
+ entry->columns = NULL;
+ entry->pubactions.pubinsert = false;
+ entry->pubactions.pubupdate = false;
+ entry->pubactions.pubdelete = false;
+ entry->pubactions.pubtruncate = false;
+
+ /*
+ * Tuple slots cleanups. (Will be rebuilt later if needed).
+ */
+ if (entry->old_slot)
+ {
+ TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
+
+ Assert(desc->tdrefcount == -1);
+
+ ExecDropSingleTupleTableSlot(entry->old_slot);
+
+ /*
+ * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do
+ * it now to avoid any leaks.
+ */
+ FreeTupleDesc(desc);
+ }
+ if (entry->new_slot)
+ {
+ TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
+
+ Assert(desc->tdrefcount == -1);
+
+ ExecDropSingleTupleTableSlot(entry->new_slot);
+
+ /*
+ * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do
+ * it now to avoid any leaks.
+ */
+ FreeTupleDesc(desc);
+ }
+
+ entry->old_slot = NULL;
+ entry->new_slot = NULL;
+
+ if (entry->attrmap)
+ free_attrmap(entry->attrmap);
+ entry->attrmap = NULL;
+
+ /*
+ * Row filter cache cleanups.
+ */
+ if (entry->entry_cxt)
+ MemoryContextDelete(entry->entry_cxt);
+
+ entry->entry_cxt = NULL;
+ entry->estate = NULL;
+ memset(entry->exprstate, 0, sizeof(entry->exprstate));
+}
+
+static void
+delete_rel_sync_entry(RelationSyncEntry *entry)
+{
+ Assert(!entry->in_use);
+
+ cleanup_rel_sync_entry(entry);
+
+ /* Remove the etnry from the cache */
+ if (hash_search(RelationSyncCache, &entry->relid, HASH_REMOVE, NULL) == NULL)
+ elog(ERROR, "hash table corrupted");
+}
+
/*
* Cleanup list of streamed transactions and update the schema_sent flag.
*
@@ -2374,9 +2435,9 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
* Nobody keeps pointers to entries in this hash table around outside
* logical decoding callback calls - but invalidation events can come in
* *during* a callback if we do any syscache access in the callback.
- * Because of that we must mark the cache entry as invalid but not damage
- * any of its substructure here. The next get_rel_sync_entry() call will
- * rebuild it all.
+ * Because of that, if the hash entry is being used, we must mark the cache
+ * entry as invalid but not damage any of its substructure here. The next
+ * get_rel_sync_entry() call will rebuild it all.
*/
if (OidIsValid(relid))
{
@@ -2387,7 +2448,12 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
HASH_FIND, NULL);
if (entry != NULL)
- entry->replicate_valid = false;
+ {
+ if (entry->in_use)
+ entry->replicate_valid = false;
+ else
+ delete_rel_sync_entry(entry);
+ }
}
else
{
@@ -2397,7 +2463,10 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
hash_seq_init(&status, RelationSyncCache);
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
{
- entry->replicate_valid = false;
+ if (entry->in_use)
+ entry->replicate_valid = false;
+ else
+ delete_rel_sync_entry(entry);
}
}
}
--
2.50.1.windows.1
On Thu, Aug 14, 2025 at 10:26 PM Zhijie Hou (Fujitsu)
<houzj.fnst@fujitsu.com> wrote:
On Friday, August 15, 2025 10:59 AM Xuneng Zhou <xunengzhou@gmail.com> wrote:
Thanks for your clarification!
On Fri, Aug 15, 2025 at 10:10 AM Zhijie Hou (Fujitsu) <houzj.fnst@fujitsu.com>
wrote:On Thursday, August 14, 2025 8:49 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
Dear Xuneng,
Is it safe to free the substructure from within
rel_sync_cache_relation_cb()?
You referred the comment in rel_sync_cache_relation_cb() right? I
understood like that we must not access to any *system caches*, from
the comment. Here we do not re-build caches so that we do not access
to the syscaches - it is permitted.We should also avoid freeing the cache within the callback function,
as entries might still be accessed after the callback is invoked. This
callback can be triggered during the execution of pgoutput_change,
which poses a risk of concurrent access issues. For reference, see
commit 7f481b8, which addresses a similar issue. Ideally, cache
release should occur within the normal code execution path, such as withinpgoutput_change() or get_rel_sync_entry().
I am still getting familiar with this module, so I cannot make a sound
confirmation for this yet. I'll take a look at the commit mentioned by Zhijie and
trace the callback paths.I’ also interested in the reasoning behind setting
NINVALIDATION_THRESHOLD to 100.This is the debatable point of this implementation. I set to 100
because it was sufficient with the provided workload, but this may causethe replication lag.
We may have to consider a benchmark workload, measure data, and
consider the appropriate value. 100 is just an initial point.I think it's worthwhile to test the performance impact of this
approach. If the table is altered but not dropped, removing the
entries could introduce additional overhead. Although this overhead
might be negligible if within an acceptable threshold, it still
warrants consideration. Additionally, if the number of invalidations
is significant, the cleanup process may take a considerable amount of time,potentially resulting in performance degradation.
In such cases, it might be beneficial to consider destroying the hash
table and creating a new one. Therefore, analyzing these scenarios isessential.
Intuitively, a heuristic threshold is straightforward but also can be hard to get it
"right" since the workload varies.
Designing and writing tests to reflect the real-world use cases well may not be
that easy as well. I'm wondering whether it's beneficial to brainstorm other
potential alternatives before settling on the current approach and start
benchmarking and performance tuning.Sequential cleaning could be costful in certain cases. I don't know much about
RelationSyncCache's life cycle, but it seems that we should be cautious on
shortening it without causing subtle issues.I don't have a great idea that can avoid introducing overhead in all cases.
An alternative approach I considered before is deleting the hash entry upon
invalidation if the entry is not in use. This would require adding an in_use
flag in RelationSyncEntry, setting it to true when get_rel_sync_entry is called,
and marking it as false once the entry is no longer in use. This approach is
inspired from the relcache invalidation. I am slightly not sure if it's worth
the effort, but just share this idea with a POC patch for reference, which has
been modified based on Kuroda-San's version.
IIUC the patch adds logic to close entries and sets in_use to false
after processing each change. For example, in pgoutput_change(), we
close the entries even after we send its change:
@@ -1621,6 +1635,8 @@ cleanup:
ExecDropSingleTupleTableSlot(new_slot);
}
+ close_rel_sync_entry(relentry);
+
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
Given that cache invalidation is executed upon replaying
REORDER_BUFFER_CHANGE_INVALIDATION and the end of a transaction
replay, in which case do we keep the relcache (i.e. just setting
replicate_valid=false) because of in_use=true?
Regards,
--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
Dear Sawada-san,
Given that cache invalidation is executed upon replaying
REORDER_BUFFER_CHANGE_INVALIDATION and the end of a transaction
replay, in which case do we keep the relcache (i.e. just setting
replicate_valid=false) because of in_use=true?
Per old discussion [1]/messages/by-id/OS0PR01MB57168F454E33D53551B8D20294259@OS0PR01MB5716.jpnprd01.prod.outlook.com, logicalrep_write_tuple ->SearchSysCache1 can cause the
cache invalidation (I did not verify though).
I felt in this case in_use can be true and validation can be skipped. Thought?
[1]: /messages/by-id/OS0PR01MB57168F454E33D53551B8D20294259@OS0PR01MB5716.jpnprd01.prod.outlook.com
Best regards,
Hayato Kuroda
FUJITSU LIMITED
On Fri, Aug 15, 2025 at 5:07 AM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
Dear Sawada-san,
Given that cache invalidation is executed upon replaying
REORDER_BUFFER_CHANGE_INVALIDATION and the end of a transaction
replay, in which case do we keep the relcache (i.e. just setting
replicate_valid=false) because of in_use=true?Per old discussion [1], logicalrep_write_tuple ->SearchSysCache1 can cause the
cache invalidation (I did not verify though).
I felt in this case in_use can be true and validation can be skipped. Thought?
I've not verified, but even if that's true, IIUC only one relation's
cache entry can set in_use to true at a time. If my understanding is
correct, when the walsender accepts invalidation messages in
logicalrep_write_tuple() as you mentioned, it doesn't release the
cache of the relation whose change is being sent but does for other
relations. It seems to me too aggressive to release caches compared to
the current behavior.
Regards,
--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
On Wed, Aug 13, 2025 at 11:43 PM 赵宇鹏(宇彭) <zhaoyupeng.zyp@alibaba-inc.com> wrote:
Hi all,
We recently ran into a memory leak in a production logical-replication WAL-sender
process. A simplified reproduction script is attached.If you run the script and then call MemoryContextStats(TopMemoryContext). you
will see something like:
"logical replication cache context: 562044928 total in 77 blocks;"
meaning “cachectx” has grown to ~500 MB, and it keeps growing as the number of
tables increases.The workload can be summarised as follows:
1. CREATE PUBLICATION FOR ALL TABLES
2. CREATE SUBSCRIPTION
3. Repeatedly CREATE TABLE and DROP TABLE
I'd like to know more about your use cases so that we can better
understand this problem.
In the workload you summarized, many tables are created and dropped.
Do the changes of these tables need to be replicated to the
subscriber? It seems to me that if tables are present in short periods
of time, we can use temp tables instead unless these changes need to
be replicated to the subscribers.
Regards,
--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
Dear Sawada-san,
I've not verified, but even if that's true, IIUC only one relation's
cache entry can set in_use to true at a time.
I also think so.
If my understanding is
correct, when the walsender accepts invalidation messages in
logicalrep_write_tuple() as you mentioned, it doesn't release the
cache of the relation whose change is being sent but does for other
relations. It seems to me too aggressive to release caches compared to
the current behavior.
Valid point. In some cases, publications can be altered then same relsync entries
would be added again.
So... first approach may be better from the perspective. Attached patch counts
the number of invalidated entries. There is a chance to cleanup them only at
COMMIT/ABORT/PREPARE time - hence in_use flag is not needed. Thought?
Best regards,
Hayato Kuroda
FUJITSU LIMITED
Attachments:
v2-0001-pgoutput-allow-cleaning-up-Relsync-cache-entries.patchapplication/octet-stream; name=v2-0001-pgoutput-allow-cleaning-up-Relsync-cache-entries.patchDownload
From dab41bb18d776d0e514d30441935c9ad5472293d Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Thu, 14 Aug 2025 19:16:10 +0900
Subject: [PATCH v2] pgoutput: allow cleaning up Relsync cache entries
---
src/backend/replication/pgoutput/pgoutput.c | 224 ++++++++++++++------
1 file changed, 155 insertions(+), 69 deletions(-)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 80540c017bd..d78de94f914 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -219,10 +219,15 @@ typedef struct PGOutputTxnData
/* Map used to remember which relation schemas we sent. */
static HTAB *RelationSyncCache = NULL;
+/* How many relsync cache entries have been invalidated? */
+static int RelationSyncCacheInvalidateCounter = 0;
+
static void init_rel_sync_cache(MemoryContext cachectx);
-static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
+static void cleanup_streamed_txn(TransactionId xid, bool is_commit);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
Relation relation);
+static void cleanup_rel_sync_entry(RelationSyncEntry *entry);
+static void maybe_cleanup_rel_sync_cache(void);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx,
RelationSyncEntry *relentry);
@@ -628,6 +633,9 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit(ctx->out, txn, commit_lsn);
OutputPluginWrite(ctx, true);
+
+ /* Cleanup RelSyncCache if needed */
+ maybe_cleanup_rel_sync_cache();
}
/*
@@ -659,6 +667,9 @@ pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
OutputPluginWrite(ctx, true);
+
+ /* Cleanup RelSyncCache if needed */
+ maybe_cleanup_rel_sync_cache();
}
/*
@@ -1764,6 +1775,7 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
{
hash_destroy(RelationSyncCache);
RelationSyncCache = NULL;
+ RelationSyncCacheInvalidateCounter = 0;
}
}
@@ -1892,7 +1904,10 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
- cleanup_rel_sync_cache(toptxn->xid, false);
+ cleanup_streamed_txn(toptxn->xid, false);
+
+ /* Cleanup RelSyncCache if needed */
+ maybe_cleanup_rel_sync_cache();
}
/*
@@ -1919,7 +1934,10 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
OutputPluginWrite(ctx, true);
- cleanup_rel_sync_cache(txn->xid, true);
+ cleanup_streamed_txn(txn->xid, true);
+
+ /* Cleanup RelSyncCache if needed */
+ maybe_cleanup_rel_sync_cache();
}
/*
@@ -1938,6 +1956,9 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
OutputPluginWrite(ctx, true);
+
+ /* Cleanup RelSyncCache if needed */
+ maybe_cleanup_rel_sync_cache();
}
/*
@@ -2060,6 +2081,11 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->columns = NULL;
entry->attrmap = NULL;
}
+ else
+ {
+ /* The entry would be re-used, decrement the counter */
+ RelationSyncCacheInvalidateCounter--;
+ }
/* Validate the entry */
if (!entry->replicate_valid)
@@ -2091,71 +2117,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
publications_valid = true;
}
- /*
- * Reset schema_sent status as the relation definition may have
- * changed. Also reset pubactions to empty in case rel was dropped
- * from a publication. Also free any objects that depended on the
- * earlier definition.
- */
- entry->schema_sent = false;
- entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
- list_free(entry->streamed_txns);
- entry->streamed_txns = NIL;
- bms_free(entry->columns);
- entry->columns = NULL;
- entry->pubactions.pubinsert = false;
- entry->pubactions.pubupdate = false;
- entry->pubactions.pubdelete = false;
- entry->pubactions.pubtruncate = false;
-
- /*
- * Tuple slots cleanups. (Will be rebuilt later if needed).
- */
- if (entry->old_slot)
- {
- TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
-
- Assert(desc->tdrefcount == -1);
-
- ExecDropSingleTupleTableSlot(entry->old_slot);
-
- /*
- * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
- * do it now to avoid any leaks.
- */
- FreeTupleDesc(desc);
- }
- if (entry->new_slot)
- {
- TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
-
- Assert(desc->tdrefcount == -1);
-
- ExecDropSingleTupleTableSlot(entry->new_slot);
-
- /*
- * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
- * do it now to avoid any leaks.
- */
- FreeTupleDesc(desc);
- }
-
- entry->old_slot = NULL;
- entry->new_slot = NULL;
-
- if (entry->attrmap)
- free_attrmap(entry->attrmap);
- entry->attrmap = NULL;
-
- /*
- * Row filter cache cleanups.
- */
- if (entry->entry_cxt)
- MemoryContextDelete(entry->entry_cxt);
-
- entry->entry_cxt = NULL;
- entry->estate = NULL;
- memset(entry->exprstate, 0, sizeof(entry->exprstate));
+ /* Cleanup existing data */
+ cleanup_rel_sync_entry(entry);
/*
* Build publication cache. We can't use one provided by relcache as
@@ -2311,6 +2274,124 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
return entry;
}
+/*
+ * Cleanup attributes in the given entry to reuse it.
+ */
+static void
+cleanup_rel_sync_entry(RelationSyncEntry *entry)
+{
+ /*
+ * Reset schema_sent status as the relation definition may have changed.
+ * Also reset pubactions to empty in case rel was dropped from a
+ * publication. Also free any objects that depended on the earlier
+ * definition.
+ */
+ entry->schema_sent = false;
+ entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
+ list_free(entry->streamed_txns);
+ entry->streamed_txns = NIL;
+ bms_free(entry->columns);
+ entry->columns = NULL;
+ entry->pubactions.pubinsert = false;
+ entry->pubactions.pubupdate = false;
+ entry->pubactions.pubdelete = false;
+ entry->pubactions.pubtruncate = false;
+
+ /*
+ * Tuple slots cleanups. (Will be rebuilt later if needed).
+ */
+ if (entry->old_slot)
+ {
+ TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
+
+ Assert(desc->tdrefcount == -1);
+
+ ExecDropSingleTupleTableSlot(entry->old_slot);
+
+ /*
+ * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do
+ * it now to avoid any leaks.
+ */
+ FreeTupleDesc(desc);
+ }
+ if (entry->new_slot)
+ {
+ TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
+
+ Assert(desc->tdrefcount == -1);
+
+ ExecDropSingleTupleTableSlot(entry->new_slot);
+
+ /*
+ * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do
+ * it now to avoid any leaks.
+ */
+ FreeTupleDesc(desc);
+ }
+
+ entry->old_slot = NULL;
+ entry->new_slot = NULL;
+
+ if (entry->attrmap)
+ free_attrmap(entry->attrmap);
+ entry->attrmap = NULL;
+
+ /*
+ * Row filter cache cleanups.
+ */
+ if (entry->entry_cxt)
+ MemoryContextDelete(entry->entry_cxt);
+
+ entry->entry_cxt = NULL;
+ entry->estate = NULL;
+ memset(entry->exprstate, 0, sizeof(entry->exprstate));
+}
+
+/*
+ * Cleanup invalidated entries in the cache.
+ *
+ * Whole entries must be checked to clean up the relsync cache. Since this
+ * might be the quite expensive task, the cleanup would be done when the
+ * invalidation happened many times.
+ */
+static void
+maybe_cleanup_rel_sync_cache(void)
+{
+#define NINVALIDATION_THRESHOLD 100
+
+ Assert(RelationSyncCache);
+
+ /*
+ * Cleanup can be done if the when the invalidation happened many times.
+ */
+ if (RelationSyncCacheInvalidateCounter > NINVALIDATION_THRESHOLD)
+ {
+ RelationSyncEntry *entry;
+ HASH_SEQ_STATUS status;
+
+ hash_seq_init(&status, RelationSyncCache);
+ while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+ {
+ /* Skip the valid entry */
+ if (entry->replicate_valid)
+ continue;
+
+ /* Cleanup the entry */
+ cleanup_rel_sync_entry(entry);
+
+ /* Remove the etnry from the cache */
+ if (hash_search(RelationSyncCache,
+ &entry->relid,
+ HASH_REMOVE,
+ NULL) == NULL)
+ elog(ERROR, "hash table corrupted");
+ }
+
+ /* Reset the counter */
+ RelationSyncCacheInvalidateCounter = 0;
+ }
+}
+
/*
* Cleanup list of streamed transactions and update the schema_sent flag.
*
@@ -2323,7 +2404,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
* cache - so tweak the schema_sent flag accordingly.
*/
static void
-cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
+cleanup_streamed_txn(TransactionId xid, bool is_commit)
{
HASH_SEQ_STATUS hash_seq;
RelationSyncEntry *entry;
@@ -2387,7 +2468,10 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
HASH_FIND, NULL);
if (entry != NULL)
+ {
entry->replicate_valid = false;
+ RelationSyncCacheInvalidateCounter++;
+ }
}
else
{
@@ -2398,6 +2482,7 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
{
entry->replicate_valid = false;
+ RelationSyncCacheInvalidateCounter++;
}
}
}
@@ -2429,6 +2514,7 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
{
entry->replicate_valid = false;
+ RelationSyncCacheInvalidateCounter++;
}
}
--
2.47.1
On Sun, Aug 17, 2025 at 11:30 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
Dear Sawada-san,
I've not verified, but even if that's true, IIUC only one relation's
cache entry can set in_use to true at a time.I also think so.
If my understanding is
correct, when the walsender accepts invalidation messages in
logicalrep_write_tuple() as you mentioned, it doesn't release the
cache of the relation whose change is being sent but does for other
relations. It seems to me too aggressive to release caches compared to
the current behavior.Valid point. In some cases, publications can be altered then same relsync entries
would be added again.So... first approach may be better from the perspective. Attached patch counts
the number of invalidated entries. There is a chance to cleanup them only at
COMMIT/ABORT/PREPARE time - hence in_use flag is not needed. Thought?
In the proposed patch, we have the following change:
@@ -2060,6 +2081,11 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->columns = NULL;
entry->attrmap = NULL;
}
+ else
+ {
+ /* The entry would be re-used, decrement the counter */
+ RelationSyncCacheInvalidateCounter--;
+ }
It decrements the counter whenever we successfully find the entry from
the cache but I'm not sure this is the right approach. What if no
cache invalidation happens at all but we retrieve entries from the
cache many times?
I have concerns about the performance implications of iterating
through all entries in the caches within
maybe_cleanup_rel_sync_cache(). If the cache contains numerous
entries, this iteration could potentially cause the walsender to
stall. If we use a larger number NINVALIDATION_THRESHOLD, we can
reduce the number of times we need sequential scans on the hash table
but it would in turn need to free more entries (probably we can have a
cap of the number of entries we can free in one cycle?).
An alternative approach would be to implement a dedicated list (such
as dclist) specifically for tracking invalidated entries. Entries
would be removed from this list when they are reused. We could then
implement a threshold-based cleanup mechanism where invalidated
entries are freed once the list exceeds a predetermined size. While
this approach would minimize the overhead of freeing invalidated
entries, it would incur some additional cost for maintaining the list.
Regards,
--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
Hi,
On Wed, Aug 20, 2025 at 8:44 AM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
On Sun, Aug 17, 2025 at 11:30 PM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:Dear Sawada-san,
I've not verified, but even if that's true, IIUC only one relation's
cache entry can set in_use to true at a time.I also think so.
If my understanding is
correct, when the walsender accepts invalidation messages in
logicalrep_write_tuple() as you mentioned, it doesn't release the
cache of the relation whose change is being sent but does for other
relations. It seems to me too aggressive to release caches compared to
the current behavior.Valid point. In some cases, publications can be altered then same relsync entries
would be added again.So... first approach may be better from the perspective. Attached patch counts
the number of invalidated entries. There is a chance to cleanup them only at
COMMIT/ABORT/PREPARE time - hence in_use flag is not needed. Thought?In the proposed patch, we have the following change:
@@ -2060,6 +2081,11 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->columns = NULL; entry->attrmap = NULL; } + else + { + /* The entry would be re-used, decrement the counter */ + RelationSyncCacheInvalidateCounter--; + }It decrements the counter whenever we successfully find the entry from
the cache but I'm not sure this is the right approach. What if no
cache invalidation happens at all but we retrieve entries from the
cache many times?
This may not be ideal. It decrements on every lookup of an existing
entry, not just when consuming an invalidation, which could make the
counter go
negative. Do we need decrementing logic? Not perfect 1:1 tracking
seems ok in here; though it might make the clean-up a bit more
aggressive.
Best,
Xuneng
Dear Sawada-san,
It decrements the counter whenever we successfully find the entry from
the cache but I'm not sure this is the right approach. What if no
cache invalidation happens at all but we retrieve entries from the
cache many times?
Oh, right. I tried to handle the case that invalidated entries are re-validated
again, the approach was not correct.
I have concerns about the performance implications of iterating
through all entries in the caches within
maybe_cleanup_rel_sync_cache(). If the cache contains numerous
entries, this iteration could potentially cause the walsender to
stall. If we use a larger number NINVALIDATION_THRESHOLD, we can
reduce the number of times we need sequential scans on the hash table
but it would in turn need to free more entries (probably we can have a
cap of the number of entries we can free in one cycle?).
Exactly.
An alternative approach would be to implement a dedicated list (such
as dclist) specifically for tracking invalidated entries. Entries
would be removed from this list when they are reused. We could then
implement a threshold-based cleanup mechanism where invalidated
entries are freed once the list exceeds a predetermined size. While
this approach would minimize the overhead of freeing invalidated
entries, it would incur some additional cost for maintaining the list.
Firstly I also considered but did not choose because of the code complexity.
After considering more, it is not so difficult, PSA new file.
It introduces a global dclist invalidated_caches and a its node in RelationSyncEntry.
RelSyncEntry can be pushed to the list when it is invalidated, and the length is
checked at the end of txn. Listed entries can be released if the length exceeds
the threshold.
A flag "already_pushed" is also needed to avoid queuing the same entry twice.
There is a possibility that same entry can be invalidated twice before cleaning
up it. I spent much time to recognize it :-(.
I'm planning to do a benchmark with this patch.
Best regards,
Hayato Kuroda
FUJITSU LIMITED
Attachments:
v3-0001-pgoutput-allow-cleaning-up-Relsync-cache-entries.patchapplication/octet-stream; name=v3-0001-pgoutput-allow-cleaning-up-Relsync-cache-entries.patchDownload
From 4898cb800b699c439c284ceff611311cf4e43c9d Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Thu, 14 Aug 2025 19:16:10 +0900
Subject: [PATCH v3] pgoutput: allow cleaning up Relsync cache entries
A list invalidated_caches is introduced to track invalidated entries. The number
of invalid entries is checked, and they are cleaned up when it exceeds the
threshold. The sweep can happen after a replicated COMMIT/ABORT/PREPARE message.
---
src/backend/replication/pgoutput/pgoutput.c | 244 ++++++++++++++------
src/test/recovery/t/100_cachectx_oom.pl | 188 +++++++++++++++
2 files changed, 363 insertions(+), 69 deletions(-)
create mode 100644 src/test/recovery/t/100_cachectx_oom.pl
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 80540c017bd..b5ea100b599 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -185,6 +185,16 @@ typedef struct RelationSyncEntry
* row filter expressions, column list, etc.
*/
MemoryContext entry_cxt;
+
+ /*
+ * A node in the list of invalidated entries.
+ */
+ dlist_node invalidated_node;
+
+ /*
+ * Have this already been pushed to the invalidated list?
+ */
+ bool already_pushed;
} RelationSyncEntry;
/*
@@ -219,10 +229,15 @@ typedef struct PGOutputTxnData
/* Map used to remember which relation schemas we sent. */
static HTAB *RelationSyncCache = NULL;
+/* RelSyncCache entries which have been invalidated once */
+static dclist_head invalidated_caches = DCLIST_STATIC_INIT(invalidated_caches);
+
static void init_rel_sync_cache(MemoryContext cachectx);
-static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
+static void cleanup_streamed_txn(TransactionId xid, bool is_commit);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
Relation relation);
+static void cleanup_rel_sync_entry(RelationSyncEntry *entry);
+static void maybe_cleanup_rel_sync_cache(void);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx,
RelationSyncEntry *relentry);
@@ -628,6 +643,9 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit(ctx->out, txn, commit_lsn);
OutputPluginWrite(ctx, true);
+
+ /* Cleanup RelSyncCache if needed */
+ maybe_cleanup_rel_sync_cache();
}
/*
@@ -659,6 +677,9 @@ pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
OutputPluginWrite(ctx, true);
+
+ /* Cleanup RelSyncCache if needed */
+ maybe_cleanup_rel_sync_cache();
}
/*
@@ -1764,6 +1785,7 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
{
hash_destroy(RelationSyncCache);
RelationSyncCache = NULL;
+ dclist_init(&invalidated_caches);
}
}
@@ -1892,7 +1914,10 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
- cleanup_rel_sync_cache(toptxn->xid, false);
+ cleanup_streamed_txn(toptxn->xid, false);
+
+ /* Cleanup RelSyncCache if needed */
+ maybe_cleanup_rel_sync_cache();
}
/*
@@ -1919,7 +1944,10 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
OutputPluginWrite(ctx, true);
- cleanup_rel_sync_cache(txn->xid, true);
+ cleanup_streamed_txn(txn->xid, true);
+
+ /* Cleanup RelSyncCache if needed */
+ maybe_cleanup_rel_sync_cache();
}
/*
@@ -1938,6 +1966,9 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
OutputPluginWrite(ctx, true);
+
+ /* Cleanup RelSyncCache if needed */
+ maybe_cleanup_rel_sync_cache();
}
/*
@@ -1968,6 +1999,8 @@ init_rel_sync_cache(MemoryContext cachectx)
Assert(RelationSyncCache != NULL);
+ dclist_init(&invalidated_caches);
+
/* No more to do if we already registered callbacks */
if (relation_callbacks_registered)
return;
@@ -2059,6 +2092,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->publish_as_relid = InvalidOid;
entry->columns = NULL;
entry->attrmap = NULL;
+ entry->already_pushed = false;
}
/* Validate the entry */
@@ -2091,71 +2125,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
publications_valid = true;
}
- /*
- * Reset schema_sent status as the relation definition may have
- * changed. Also reset pubactions to empty in case rel was dropped
- * from a publication. Also free any objects that depended on the
- * earlier definition.
- */
- entry->schema_sent = false;
- entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
- list_free(entry->streamed_txns);
- entry->streamed_txns = NIL;
- bms_free(entry->columns);
- entry->columns = NULL;
- entry->pubactions.pubinsert = false;
- entry->pubactions.pubupdate = false;
- entry->pubactions.pubdelete = false;
- entry->pubactions.pubtruncate = false;
-
- /*
- * Tuple slots cleanups. (Will be rebuilt later if needed).
- */
- if (entry->old_slot)
- {
- TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
-
- Assert(desc->tdrefcount == -1);
-
- ExecDropSingleTupleTableSlot(entry->old_slot);
-
- /*
- * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
- * do it now to avoid any leaks.
- */
- FreeTupleDesc(desc);
- }
- if (entry->new_slot)
- {
- TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
-
- Assert(desc->tdrefcount == -1);
-
- ExecDropSingleTupleTableSlot(entry->new_slot);
-
- /*
- * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
- * do it now to avoid any leaks.
- */
- FreeTupleDesc(desc);
- }
-
- entry->old_slot = NULL;
- entry->new_slot = NULL;
-
- if (entry->attrmap)
- free_attrmap(entry->attrmap);
- entry->attrmap = NULL;
-
- /*
- * Row filter cache cleanups.
- */
- if (entry->entry_cxt)
- MemoryContextDelete(entry->entry_cxt);
-
- entry->entry_cxt = NULL;
- entry->estate = NULL;
- memset(entry->exprstate, 0, sizeof(entry->exprstate));
+ /* Cleanup existing data */
+ cleanup_rel_sync_entry(entry);
/*
* Build publication cache. We can't use one provided by relcache as
@@ -2311,6 +2282,118 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
return entry;
}
+/*
+ * Cleanup attributes in the given entry to reuse it.
+ */
+static void
+cleanup_rel_sync_entry(RelationSyncEntry *entry)
+{
+ entry->schema_sent = false;
+ entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
+ list_free(entry->streamed_txns);
+ entry->streamed_txns = NIL;
+ bms_free(entry->columns);
+ entry->columns = NULL;
+ entry->pubactions.pubinsert = false;
+ entry->pubactions.pubupdate = false;
+ entry->pubactions.pubdelete = false;
+ entry->pubactions.pubtruncate = false;
+
+ /*
+ * Tuple slots cleanups. (Will be rebuilt later if needed).
+ */
+ if (entry->old_slot)
+ {
+ TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
+
+ Assert(desc->tdrefcount == -1);
+
+ ExecDropSingleTupleTableSlot(entry->old_slot);
+
+ /*
+ * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do
+ * it now to avoid any leaks.
+ */
+ FreeTupleDesc(desc);
+ }
+ if (entry->new_slot)
+ {
+ TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
+
+ Assert(desc->tdrefcount == -1);
+
+ ExecDropSingleTupleTableSlot(entry->new_slot);
+
+ /*
+ * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do
+ * it now to avoid any leaks.
+ */
+ FreeTupleDesc(desc);
+ }
+
+ entry->old_slot = NULL;
+ entry->new_slot = NULL;
+
+ if (entry->attrmap)
+ free_attrmap(entry->attrmap);
+ entry->attrmap = NULL;
+
+ /*
+ * Row filter cache cleanups.
+ */
+ if (entry->entry_cxt)
+ MemoryContextDelete(entry->entry_cxt);
+
+ entry->entry_cxt = NULL;
+ entry->estate = NULL;
+ memset(entry->exprstate, 0, sizeof(entry->exprstate));
+}
+
+/*
+ * Cleanup invalidated entries in the cache.
+ */
+static void
+maybe_cleanup_rel_sync_cache(void)
+{
+#define NINVALIDATION_THRESHOLD 100
+
+ Assert(RelationSyncCache);
+
+ /*
+ * Cleanup would be done if the list has enough entries.
+ */
+ if (dclist_count(&invalidated_caches) > NINVALIDATION_THRESHOLD)
+ {
+ while (dclist_count(&invalidated_caches))
+ {
+ dlist_node *node;
+ RelationSyncEntry *entry;
+
+ node = dclist_pop_head_node(&invalidated_caches);
+ entry = dclist_container(RelationSyncEntry, invalidated_node,
+ node);
+
+ /*
+ * Skip if the entry is valid. This meant that the relsync cache
+ * was invalidated once but used again.
+ */
+ if (entry->replicate_valid)
+ continue;
+
+ /* Remove the entry from the cache */
+ if (hash_search(RelationSyncCache,
+ &entry->relid,
+ HASH_REMOVE, NULL) == NULL)
+ elog(ERROR, "hash table corrupted");
+
+ /* Cleanup the entry */
+ cleanup_rel_sync_entry(entry);
+ }
+
+ Assert(dclist_is_empty(&invalidated_caches));
+ }
+}
+
/*
* Cleanup list of streamed transactions and update the schema_sent flag.
*
@@ -2323,7 +2406,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
* cache - so tweak the schema_sent flag accordingly.
*/
static void
-cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
+cleanup_streamed_txn(TransactionId xid, bool is_commit)
{
HASH_SEQ_STATUS hash_seq;
RelationSyncEntry *entry;
@@ -2387,7 +2470,16 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
HASH_FIND, NULL);
if (entry != NULL)
+ {
+ /* Push to the invalidated list if not yet */
+ if (!entry->already_pushed)
+ {
+ dclist_push_tail(&invalidated_caches, &entry->invalidated_node);
+ entry->already_pushed = true;
+ }
+
entry->replicate_valid = false;
+ }
}
else
{
@@ -2397,6 +2489,13 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
hash_seq_init(&status, RelationSyncCache);
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
{
+ /* Push to the invalidated list if not yet */
+ if (!entry->already_pushed)
+ {
+ dclist_push_tail(&invalidated_caches, &entry->invalidated_node);
+ entry->already_pushed = true;
+ }
+
entry->replicate_valid = false;
}
}
@@ -2428,6 +2527,13 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
hash_seq_init(&status, RelationSyncCache);
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
{
+ /* Push to the invalidated list if not yet */
+ if (!entry->already_pushed)
+ {
+ dclist_push_tail(&invalidated_caches, &entry->invalidated_node);
+ entry->already_pushed = true;
+ }
+
entry->replicate_valid = false;
}
}
diff --git a/src/test/recovery/t/100_cachectx_oom.pl b/src/test/recovery/t/100_cachectx_oom.pl
new file mode 100644
index 00000000000..cb6dd10df76
--- /dev/null
+++ b/src/test/recovery/t/100_cachectx_oom.pl
@@ -0,0 +1,188 @@
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+use Time::HiRes qw(usleep nanosleep);
+
+my $node_primary;
+my $node_subscriber;
+my $pid;
+my $kid;
+my $publisher_connstr;
+my $count = 1000;
+my $process = 3;
+
+$node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 'logical');
+$node_primary->start;
+
+$node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', 'wal_receiver_timeout = 0');
+$node_subscriber->start;
+$node_primary->safe_psql('postgres',
+ "CREATE PUBLICATION my_pub FOR ALL TABLES;");
+
+$publisher_connstr = $node_primary->connstr . ' dbname=postgres';
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION my_sub CONNECTION '$publisher_connstr' PUBLICATION my_pub;"
+);
+
+usleep(1_000_000); #1s
+
+$node_primary->safe_psql('postgres', qq(
+ CREATE TABLE my_table(
+ col01 int8,
+ col02 int8,
+ col03 int8,
+ col04 int8,
+ col05 int8,
+ col06 int8,
+ col07 int8,
+ col08 int8,
+ col09 int8,
+ col10 int8,
+ col11 char(256),
+ col12 char(256),
+ col13 char(256),
+ col14 char(256),
+ col15 char(256),
+ col16 char(256),
+ col17 char(256),
+ col18 char(256),
+ col19 char(256),
+ col20 char(256),
+ col21 jsonb,
+ col22 jsonb,
+ col23 jsonb,
+ col24 jsonb,
+ col25 jsonb,
+ col26 jsonb,
+ col27 jsonb,
+ col28 jsonb,
+ col29 jsonb,
+ col30 jsonb,
+ col31 boolean,
+ col32 boolean,
+ col33 boolean,
+ col34 boolean,
+ col35 boolean,
+ col36 boolean,
+ col37 boolean,
+ col38 boolean,
+ col39 boolean,
+ col40 boolean,
+ col41 timestamp,
+ col42 timestamp,
+ col43 timestamp,
+ col44 timestamp,
+ col45 timestamp,
+ col46 timestamp,
+ col47 timestamp,
+ col48 timestamp,
+ col49 timestamp,
+ col50 timestamp);));
+
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE TABLE my_table(
+ col01 int8,
+ col02 int8,
+ col03 int8,
+ col04 int8,
+ col05 int8,
+ col06 int8,
+ col07 int8,
+ col08 int8,
+ col09 int8,
+ col10 int8,
+ col11 char(256),
+ col12 char(256),
+ col13 char(256),
+ col14 char(256),
+ col15 char(256),
+ col16 char(256),
+ col17 char(256),
+ col18 char(256),
+ col19 char(256),
+ col20 char(256),
+ col21 jsonb,
+ col22 jsonb,
+ col23 jsonb,
+ col24 jsonb,
+ col25 jsonb,
+ col26 jsonb,
+ col27 jsonb,
+ col28 jsonb,
+ col29 jsonb,
+ col30 jsonb,
+ col31 boolean,
+ col32 boolean,
+ col33 boolean,
+ col34 boolean,
+ col35 boolean,
+ col36 boolean,
+ col37 boolean,
+ col38 boolean,
+ col39 boolean,
+ col40 boolean,
+ col41 timestamp,
+ col42 timestamp,
+ col43 timestamp,
+ col44 timestamp,
+ col45 timestamp,
+ col46 timestamp,
+ col47 timestamp,
+ col48 timestamp,
+ col49 timestamp,
+ col50 timestamp);));
+
+# subscriber
+for (my $i = 0; $i < $process; $i += 1) {
+ $pid = fork();
+ if ($pid == 0)
+ {
+ for (my $j = 0; $j < $count; $j += 1) {
+ $node_subscriber->safe_psql('postgres', qq(
+ CREATE TABLE my_table_$i\_$j AS TABLE my_table WITH NO DATA;));
+ if ( $j % 100 == 0 ) {
+ print "\nsubscriber:$j";
+ }
+ }
+ exec "echo '\nchild:$i bye'";
+ }
+}
+while (($kid = wait()) != -1 ) {
+ print "\nReaped child $kid\n";
+}
+
+# primary
+for (my $i = 0; $i < $process; $i += 1) {
+ $pid = fork();
+ if ($pid == 0)
+ {
+ for (my $j = 0; $j < $count; $j += 1) {
+ $node_primary->safe_psql('postgres', qq(
+ CREATE TABLE my_table_$i\_$j AS TABLE my_table WITH NO DATA;
+ INSERT INTO my_table_$i\_$j (col01) VALUES($j);
+ INSERT INTO my_table_$i\_$j (col01) VALUES($j);
+ DROP TABLE my_table_$i\_$j;
+ ));
+ if ( $j % 100 == 0 ) {
+ print "\nnode_primary:$j";
+ }
+ }
+ exec "echo '\nchild:$i bye'";
+ }
+}
+while (($kid = wait()) != -1 ) {
+ print "\nReaped child $kid\n";
+}
+
+usleep(3600_000_000); #3600s
+# usleep(4_000_000);
+
+is(1, 1, "test end");
+$node_primary->stop;
+done_testing();
--
2.47.1
Dear Xuneng,
This may not be ideal. It decrements on every lookup of an existing
entry, not just when consuming an invalidation, which could make the
counter go
negative. Do we need decrementing logic? Not perfect 1:1 tracking
seems ok in here; though it might make the clean-up a bit more
aggressive.
Yeah it was not needed. I posted new approach [1]/messages/by-id/OSCPR01MB14966E0225BC4AA1BC956E1E3F532A@OSCPR01MB14966.jpnprd01.prod.outlook.com and even this has a possibility
that some of listed entries are revived. At that time they won't be removed
immediately, they can be skipped while cleaning up.
[1]: /messages/by-id/OSCPR01MB14966E0225BC4AA1BC956E1E3F532A@OSCPR01MB14966.jpnprd01.prod.outlook.com
Best regards,
Hayato Kuroda
FUJITSU LIMITED
Firstly I also considered but did not choose because of the code complexity.
After considering more, it is not so difficult, PSA new file.
v3 contained 100_cachectm_oom.pl, which won't succeed. Here is a patch
which removed the test file.
Best regards,
Hayato Kuroda
FUJITSU LIMITED
Attachments:
v4-0001-pgoutput-allow-cleaning-up-Relsync-cache-entries.patchapplication/octet-stream; name=v4-0001-pgoutput-allow-cleaning-up-Relsync-cache-entries.patchDownload
From 0bc3dbff0b1f3ed16b287d7127081658a25cc639 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Thu, 21 Aug 2025 14:32:37 +0900
Subject: [PATCH v4] pgoutput: allow cleaning up Relsync cache entries
A list invalidated_caches is introduced to track invalidated entries. The number
of invalid entries is checked, and they are cleaned up when it exceeds the
threshold. The sweep can happen after a replicated COMMIT/ABORT/PREPARE message.
---
src/backend/replication/pgoutput/pgoutput.c | 244 ++++++++++++++------
1 file changed, 175 insertions(+), 69 deletions(-)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 80540c017bd..b5ea100b599 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -185,6 +185,16 @@ typedef struct RelationSyncEntry
* row filter expressions, column list, etc.
*/
MemoryContext entry_cxt;
+
+ /*
+ * A node in the list of invalidated entries.
+ */
+ dlist_node invalidated_node;
+
+ /*
+ * Have this already been pushed to the invalidated list?
+ */
+ bool already_pushed;
} RelationSyncEntry;
/*
@@ -219,10 +229,15 @@ typedef struct PGOutputTxnData
/* Map used to remember which relation schemas we sent. */
static HTAB *RelationSyncCache = NULL;
+/* RelSyncCache entries which have been invalidated once */
+static dclist_head invalidated_caches = DCLIST_STATIC_INIT(invalidated_caches);
+
static void init_rel_sync_cache(MemoryContext cachectx);
-static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
+static void cleanup_streamed_txn(TransactionId xid, bool is_commit);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
Relation relation);
+static void cleanup_rel_sync_entry(RelationSyncEntry *entry);
+static void maybe_cleanup_rel_sync_cache(void);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx,
RelationSyncEntry *relentry);
@@ -628,6 +643,9 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit(ctx->out, txn, commit_lsn);
OutputPluginWrite(ctx, true);
+
+ /* Cleanup RelSyncCache if needed */
+ maybe_cleanup_rel_sync_cache();
}
/*
@@ -659,6 +677,9 @@ pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
OutputPluginWrite(ctx, true);
+
+ /* Cleanup RelSyncCache if needed */
+ maybe_cleanup_rel_sync_cache();
}
/*
@@ -1764,6 +1785,7 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
{
hash_destroy(RelationSyncCache);
RelationSyncCache = NULL;
+ dclist_init(&invalidated_caches);
}
}
@@ -1892,7 +1914,10 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
- cleanup_rel_sync_cache(toptxn->xid, false);
+ cleanup_streamed_txn(toptxn->xid, false);
+
+ /* Cleanup RelSyncCache if needed */
+ maybe_cleanup_rel_sync_cache();
}
/*
@@ -1919,7 +1944,10 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
OutputPluginWrite(ctx, true);
- cleanup_rel_sync_cache(txn->xid, true);
+ cleanup_streamed_txn(txn->xid, true);
+
+ /* Cleanup RelSyncCache if needed */
+ maybe_cleanup_rel_sync_cache();
}
/*
@@ -1938,6 +1966,9 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
OutputPluginWrite(ctx, true);
+
+ /* Cleanup RelSyncCache if needed */
+ maybe_cleanup_rel_sync_cache();
}
/*
@@ -1968,6 +1999,8 @@ init_rel_sync_cache(MemoryContext cachectx)
Assert(RelationSyncCache != NULL);
+ dclist_init(&invalidated_caches);
+
/* No more to do if we already registered callbacks */
if (relation_callbacks_registered)
return;
@@ -2059,6 +2092,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->publish_as_relid = InvalidOid;
entry->columns = NULL;
entry->attrmap = NULL;
+ entry->already_pushed = false;
}
/* Validate the entry */
@@ -2091,71 +2125,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
publications_valid = true;
}
- /*
- * Reset schema_sent status as the relation definition may have
- * changed. Also reset pubactions to empty in case rel was dropped
- * from a publication. Also free any objects that depended on the
- * earlier definition.
- */
- entry->schema_sent = false;
- entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
- list_free(entry->streamed_txns);
- entry->streamed_txns = NIL;
- bms_free(entry->columns);
- entry->columns = NULL;
- entry->pubactions.pubinsert = false;
- entry->pubactions.pubupdate = false;
- entry->pubactions.pubdelete = false;
- entry->pubactions.pubtruncate = false;
-
- /*
- * Tuple slots cleanups. (Will be rebuilt later if needed).
- */
- if (entry->old_slot)
- {
- TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
-
- Assert(desc->tdrefcount == -1);
-
- ExecDropSingleTupleTableSlot(entry->old_slot);
-
- /*
- * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
- * do it now to avoid any leaks.
- */
- FreeTupleDesc(desc);
- }
- if (entry->new_slot)
- {
- TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
-
- Assert(desc->tdrefcount == -1);
-
- ExecDropSingleTupleTableSlot(entry->new_slot);
-
- /*
- * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
- * do it now to avoid any leaks.
- */
- FreeTupleDesc(desc);
- }
-
- entry->old_slot = NULL;
- entry->new_slot = NULL;
-
- if (entry->attrmap)
- free_attrmap(entry->attrmap);
- entry->attrmap = NULL;
-
- /*
- * Row filter cache cleanups.
- */
- if (entry->entry_cxt)
- MemoryContextDelete(entry->entry_cxt);
-
- entry->entry_cxt = NULL;
- entry->estate = NULL;
- memset(entry->exprstate, 0, sizeof(entry->exprstate));
+ /* Cleanup existing data */
+ cleanup_rel_sync_entry(entry);
/*
* Build publication cache. We can't use one provided by relcache as
@@ -2311,6 +2282,118 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
return entry;
}
+/*
+ * Cleanup attributes in the given entry to reuse it.
+ */
+static void
+cleanup_rel_sync_entry(RelationSyncEntry *entry)
+{
+ entry->schema_sent = false;
+ entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
+ list_free(entry->streamed_txns);
+ entry->streamed_txns = NIL;
+ bms_free(entry->columns);
+ entry->columns = NULL;
+ entry->pubactions.pubinsert = false;
+ entry->pubactions.pubupdate = false;
+ entry->pubactions.pubdelete = false;
+ entry->pubactions.pubtruncate = false;
+
+ /*
+ * Tuple slots cleanups. (Will be rebuilt later if needed).
+ */
+ if (entry->old_slot)
+ {
+ TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
+
+ Assert(desc->tdrefcount == -1);
+
+ ExecDropSingleTupleTableSlot(entry->old_slot);
+
+ /*
+ * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do
+ * it now to avoid any leaks.
+ */
+ FreeTupleDesc(desc);
+ }
+ if (entry->new_slot)
+ {
+ TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
+
+ Assert(desc->tdrefcount == -1);
+
+ ExecDropSingleTupleTableSlot(entry->new_slot);
+
+ /*
+ * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do
+ * it now to avoid any leaks.
+ */
+ FreeTupleDesc(desc);
+ }
+
+ entry->old_slot = NULL;
+ entry->new_slot = NULL;
+
+ if (entry->attrmap)
+ free_attrmap(entry->attrmap);
+ entry->attrmap = NULL;
+
+ /*
+ * Row filter cache cleanups.
+ */
+ if (entry->entry_cxt)
+ MemoryContextDelete(entry->entry_cxt);
+
+ entry->entry_cxt = NULL;
+ entry->estate = NULL;
+ memset(entry->exprstate, 0, sizeof(entry->exprstate));
+}
+
+/*
+ * Cleanup invalidated entries in the cache.
+ */
+static void
+maybe_cleanup_rel_sync_cache(void)
+{
+#define NINVALIDATION_THRESHOLD 100
+
+ Assert(RelationSyncCache);
+
+ /*
+ * Cleanup would be done if the list has enough entries.
+ */
+ if (dclist_count(&invalidated_caches) > NINVALIDATION_THRESHOLD)
+ {
+ while (dclist_count(&invalidated_caches))
+ {
+ dlist_node *node;
+ RelationSyncEntry *entry;
+
+ node = dclist_pop_head_node(&invalidated_caches);
+ entry = dclist_container(RelationSyncEntry, invalidated_node,
+ node);
+
+ /*
+ * Skip if the entry is valid. This meant that the relsync cache
+ * was invalidated once but used again.
+ */
+ if (entry->replicate_valid)
+ continue;
+
+ /* Remove the entry from the cache */
+ if (hash_search(RelationSyncCache,
+ &entry->relid,
+ HASH_REMOVE, NULL) == NULL)
+ elog(ERROR, "hash table corrupted");
+
+ /* Cleanup the entry */
+ cleanup_rel_sync_entry(entry);
+ }
+
+ Assert(dclist_is_empty(&invalidated_caches));
+ }
+}
+
/*
* Cleanup list of streamed transactions and update the schema_sent flag.
*
@@ -2323,7 +2406,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
* cache - so tweak the schema_sent flag accordingly.
*/
static void
-cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
+cleanup_streamed_txn(TransactionId xid, bool is_commit)
{
HASH_SEQ_STATUS hash_seq;
RelationSyncEntry *entry;
@@ -2387,7 +2470,16 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
HASH_FIND, NULL);
if (entry != NULL)
+ {
+ /* Push to the invalidated list if not yet */
+ if (!entry->already_pushed)
+ {
+ dclist_push_tail(&invalidated_caches, &entry->invalidated_node);
+ entry->already_pushed = true;
+ }
+
entry->replicate_valid = false;
+ }
}
else
{
@@ -2397,6 +2489,13 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
hash_seq_init(&status, RelationSyncCache);
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
{
+ /* Push to the invalidated list if not yet */
+ if (!entry->already_pushed)
+ {
+ dclist_push_tail(&invalidated_caches, &entry->invalidated_node);
+ entry->already_pushed = true;
+ }
+
entry->replicate_valid = false;
}
}
@@ -2428,6 +2527,13 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
hash_seq_init(&status, RelationSyncCache);
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
{
+ /* Push to the invalidated list if not yet */
+ if (!entry->already_pushed)
+ {
+ dclist_push_tail(&invalidated_caches, &entry->invalidated_node);
+ entry->already_pushed = true;
+ }
+
entry->replicate_valid = false;
}
}
--
2.47.1
Hi,
From what we see in our users’ production environments, the situation is exactly
as previously described. Creating a “publication for all tables” is very common,
because manually choosing individual tables to publish can be cumbersome.
Regular CREATE/DROP TABLE activity is also normal, and the tables are not
necessarily short-lived. Since walsender is intended to be a long-lived process,
its memory footprint keeps accumulating over time.
Even if we ignore DROP TABLE entirely and only consider a large number of tables
that must be published, RelationSyncEntry alone can consume substantial memory.
Many users run multiple walsenders on the same instance, which further increases
memory pressure.
In normal backend processes, many cache structures are never evicted. That
already causes issues, but it is at least somewhat tolerable because a backend
is considered short-lived and a periodic reconnect can release the memory.
A walsender, however, is expected to stay alive much longer, nobody wants
replication sessions to be dropped regularly, so I am genuinely curious why
structures like RelationSyncEntry were not given an LRU-style eviction mechanism
from the start.
Adding an LRU mechanism to RelationSyncEntry has another benefit: it puts an
upper bound on the workload of callbacks such as invalidation_cb, preventing
walsender from stalling when there are a large number of tables. I have
therefore implemented a prototype of this idea (borrowing some code from
Hayato Kuroda). It should keep memory usage under control in more scenarios
while introducing only minimal overhead in theory. I will run additional
performance tests to confirm this.
What do you think of this approach?
Best regards,
Attachments:
v1-0001-add-lru-for-rel_sync_cache.patchapplication/octet-streamDownload
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 80540c017bd..2dc9c6bd91b 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -185,6 +185,8 @@ typedef struct RelationSyncEntry
* row filter expressions, column list, etc.
*/
MemoryContext entry_cxt;
+
+ dlist_node node; /* linked list pointers */
} RelationSyncEntry;
/*
@@ -218,11 +220,17 @@ typedef struct PGOutputTxnData
/* Map used to remember which relation schemas we sent. */
static HTAB *RelationSyncCache = NULL;
+/* least recently used entry list for RelationSyncCache. */
+static dclist_head rel_sync_cache_lru_list = DCLIST_STATIC_INIT(rel_sync_cache_lru_list);
+/* The maximum number of entries in the RelationSyncCache. */
+int max_rel_sync_cache_size = 1024;
static void init_rel_sync_cache(MemoryContext cachectx);
-static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
+static void cleanup_streamed_txn(TransactionId xid, bool is_commit);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
Relation relation);
+static void cleanup_rel_sync_entry(RelationSyncEntry *entry);
+static void maybe_cleanup_rel_sync_cache(void);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx,
RelationSyncEntry *relentry);
@@ -1892,7 +1900,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
- cleanup_rel_sync_cache(toptxn->xid, false);
+ cleanup_streamed_txn(toptxn->xid, false);
}
/*
@@ -1919,7 +1927,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
OutputPluginWrite(ctx, true);
- cleanup_rel_sync_cache(txn->xid, true);
+ cleanup_streamed_txn(txn->xid, true);
}
/*
@@ -1968,6 +1976,8 @@ init_rel_sync_cache(MemoryContext cachectx)
Assert(RelationSyncCache != NULL);
+ dclist_init(&rel_sync_cache_lru_list);
+
/* No more to do if we already registered callbacks */
if (relation_callbacks_registered)
return;
@@ -2059,6 +2069,22 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->publish_as_relid = InvalidOid;
entry->columns = NULL;
entry->attrmap = NULL;
+
+ /*
+ * Since this is the most recently used entry, push this entry onto the
+ * end of the LRU list and check if there are any entries that need to
+ * be eliminated.
+ */
+ dclist_push_tail(&rel_sync_cache_lru_list, &entry->node);
+ maybe_cleanup_rel_sync_cache();
+ }
+ else
+ {
+ /*
+ * Move existing entry to the tail of the LRU list to mark it as the
+ * most recently used item.
+ */
+ dclist_move_tail(&rel_sync_cache_lru_list, &entry->node);
}
/* Validate the entry */
@@ -2091,71 +2117,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
publications_valid = true;
}
- /*
- * Reset schema_sent status as the relation definition may have
- * changed. Also reset pubactions to empty in case rel was dropped
- * from a publication. Also free any objects that depended on the
- * earlier definition.
- */
- entry->schema_sent = false;
- entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
- list_free(entry->streamed_txns);
- entry->streamed_txns = NIL;
- bms_free(entry->columns);
- entry->columns = NULL;
- entry->pubactions.pubinsert = false;
- entry->pubactions.pubupdate = false;
- entry->pubactions.pubdelete = false;
- entry->pubactions.pubtruncate = false;
-
- /*
- * Tuple slots cleanups. (Will be rebuilt later if needed).
- */
- if (entry->old_slot)
- {
- TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
-
- Assert(desc->tdrefcount == -1);
-
- ExecDropSingleTupleTableSlot(entry->old_slot);
-
- /*
- * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
- * do it now to avoid any leaks.
- */
- FreeTupleDesc(desc);
- }
- if (entry->new_slot)
- {
- TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
-
- Assert(desc->tdrefcount == -1);
-
- ExecDropSingleTupleTableSlot(entry->new_slot);
-
- /*
- * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
- * do it now to avoid any leaks.
- */
- FreeTupleDesc(desc);
- }
-
- entry->old_slot = NULL;
- entry->new_slot = NULL;
-
- if (entry->attrmap)
- free_attrmap(entry->attrmap);
- entry->attrmap = NULL;
-
- /*
- * Row filter cache cleanups.
- */
- if (entry->entry_cxt)
- MemoryContextDelete(entry->entry_cxt);
-
- entry->entry_cxt = NULL;
- entry->estate = NULL;
- memset(entry->exprstate, 0, sizeof(entry->exprstate));
+ /* Cleanup existing data */
+ cleanup_rel_sync_entry(entry);
/*
* Build publication cache. We can't use one provided by relcache as
@@ -2311,6 +2274,106 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
return entry;
}
+/*
+ * Cleanup attributes in the given entry to reuse it.
+ */
+static void
+cleanup_rel_sync_entry(RelationSyncEntry *entry)
+{
+ /*
+ * Reset schema_sent status as the relation definition may have changed.
+ * Also reset pubactions to empty in case rel was dropped from a
+ * publication. Also free any objects that depended on the earlier
+ * definition.
+ */
+ entry->schema_sent = false;
+ entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
+ list_free(entry->streamed_txns);
+ entry->streamed_txns = NIL;
+ bms_free(entry->columns);
+ entry->columns = NULL;
+ entry->pubactions.pubinsert = false;
+ entry->pubactions.pubupdate = false;
+ entry->pubactions.pubdelete = false;
+ entry->pubactions.pubtruncate = false;
+
+ /*
+ * Tuple slots cleanups. (Will be rebuilt later if needed).
+ */
+ if (entry->old_slot)
+ {
+ TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
+
+ Assert(desc->tdrefcount == -1);
+
+ ExecDropSingleTupleTableSlot(entry->old_slot);
+
+ /*
+ * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do
+ * it now to avoid any leaks.
+ */
+ FreeTupleDesc(desc);
+ }
+ if (entry->new_slot)
+ {
+ TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
+
+ Assert(desc->tdrefcount == -1);
+
+ ExecDropSingleTupleTableSlot(entry->new_slot);
+
+ /*
+ * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so do
+ * it now to avoid any leaks.
+ */
+ FreeTupleDesc(desc);
+ }
+
+ entry->old_slot = NULL;
+ entry->new_slot = NULL;
+
+ if (entry->attrmap)
+ free_attrmap(entry->attrmap);
+ entry->attrmap = NULL;
+
+ /*
+ * Row filter cache cleanups.
+ */
+ if (entry->entry_cxt)
+ MemoryContextDelete(entry->entry_cxt);
+
+ entry->entry_cxt = NULL;
+ entry->estate = NULL;
+ memset(entry->exprstate, 0, sizeof(entry->exprstate));
+}
+
+/*
+ * Try to cleanup the Least Recently Used entry in the RelationSyncCache.
+ */
+static void
+maybe_cleanup_rel_sync_cache(void)
+{
+ Assert(RelationSyncCache);
+
+ if (dclist_count(&rel_sync_cache_lru_list) > max_rel_sync_cache_size)
+ {
+ RelationSyncEntry *entry;
+
+ entry = dclist_head_element(RelationSyncEntry, node,
+ &rel_sync_cache_lru_list);
+
+ dclist_delete_from(&rel_sync_cache_lru_list, &entry->node);
+ cleanup_rel_sync_entry(entry);
+
+ /* Remove the etnry from the cache */
+ if (hash_search(RelationSyncCache,
+ &entry->relid,
+ HASH_REMOVE,
+ NULL) == NULL)
+ elog(ERROR, "hash table corrupted");
+ }
+}
+
/*
* Cleanup list of streamed transactions and update the schema_sent flag.
*
@@ -2323,7 +2386,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
* cache - so tweak the schema_sent flag accordingly.
*/
static void
-cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
+cleanup_streamed_txn(TransactionId xid, bool is_commit)
{
HASH_SEQ_STATUS hash_seq;
RelationSyncEntry *entry;
On Thu, Aug 21, 2025 at 2:03 PM 赵宇鹏(宇彭) <zhaoyupeng.zyp@alibaba-inc.com> wrote:
From what we see in our users’ production environments, the situation is exactly
as previously described. Creating a “publication for all tables” is very common,
because manually choosing individual tables to publish can be cumbersome.
I understand the difficulty in choosing individual tables, but OTOH
all tables may lead to replicating tables that are not even required.
This consumes CPU, network, and disk space without the real need. This
sounds more harmful than once carefully describing publications. We
have another way to combine tables, which is to use TABLES IN SCHEMA.
I don't know if that can help, but anyway, we can't do much if the
user wants to replicate all tables.
Regular CREATE/DROP TABLE activity is also normal, and the tables are not
necessarily short-lived. Since walsender is intended to be a long-lived process,
its memory footprint keeps accumulating over time.Even if we ignore DROP TABLE entirely and only consider a large number of tables
that must be published, RelationSyncEntry alone can consume substantial memory.
Many users run multiple walsenders on the same instance, which further increases
memory pressure.
Agreed, there is a possibility of consuming a large amount of memory,
but still, we should do some experiments to see the real results. We
haven't seen complaints of walsenders consuming large amounts of
memory due to RelationSyncEntry's, so it is also possible that this is
just a theoretical case especially after we have some handling for
dropped tables.
In normal backend processes, many cache structures are never evicted. That
already causes issues, but it is at least somewhat tolerable because a backend
is considered short-lived and a periodic reconnect can release the memory.
A walsender, however, is expected to stay alive much longer, nobody wants
replication sessions to be dropped regularly, so I am genuinely curious why
structures like RelationSyncEntry were not given an LRU-style eviction mechanism
from the start.Adding an LRU mechanism to RelationSyncEntry has another benefit: it puts an
upper bound on the workload of callbacks such as invalidation_cb, preventing
walsender from stalling when there are a large number of tables. I have
therefore implemented a prototype of this idea (borrowing some code from
Hayato Kuroda). It should keep memory usage under control in more scenarios
while introducing only minimal overhead in theory.
Yeah, such an idea is worth considering after we have some tests to
show the high memory usage. BTW, I think it would be better to run
some algorithm to evict entries when we reach the threshold limit as
we do for shared buffers, see StrategyGetBuffer. Otherwise, we may be
paying the cost to maintain such a list when in practice it may be
required only very few times.
--
With Regards,
Amit Kapila.
On Thu, Aug 21, 2025 at 10:53 AM Hayato Kuroda (Fujitsu)
<kuroda.hayato@fujitsu.com> wrote:
I have concerns about the performance implications of iterating
through all entries in the caches within
maybe_cleanup_rel_sync_cache(). If the cache contains numerous
entries, this iteration could potentially cause the walsender to
stall. If we use a larger number NINVALIDATION_THRESHOLD, we can
reduce the number of times we need sequential scans on the hash table
but it would in turn need to free more entries (probably we can have a
cap of the number of entries we can free in one cycle?).Exactly.
So, at least we can try some tests before completely giving up on this idea.
An alternative approach would be to implement a dedicated list (such
as dclist) specifically for tracking invalidated entries. Entries
would be removed from this list when they are reused. We could then
implement a threshold-based cleanup mechanism where invalidated
entries are freed once the list exceeds a predetermined size. While
this approach would minimize the overhead of freeing invalidated
entries, it would incur some additional cost for maintaining the list.Firstly I also considered but did not choose because of the code complexity.
After considering more, it is not so difficult, PSA new file.
The other idea I was thinking of is if somehow we can decode the DROP
TABLE WAL record, say delete of relid from pg_class then we can use
that to remove the corresponding entry from RelationSyncCache.
--
With Regards,
Amit Kapila.
On Thu, Aug 21, 2025 at 2:55 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Thu, Aug 21, 2025 at 2:03 PM 赵宇鹏(宇彭) <zhaoyupeng.zyp@alibaba-inc.com> wrote:
From what we see in our users’ production environments, the situation is exactly
as previously described. Creating a “publication for all tables” is very common,
because manually choosing individual tables to publish can be cumbersome.I understand the difficulty in choosing individual tables, but OTOH
all tables may lead to replicating tables that are not even required.
This consumes CPU, network, and disk space without the real need. This
sounds more harmful than once carefully describing publications. We
have another way to combine tables, which is to use TABLES IN SCHEMA.
I don't know if that can help, but anyway, we can't do much if the
user wants to replicate all tables.Regular CREATE/DROP TABLE activity is also normal, and the tables are not
necessarily short-lived. Since walsender is intended to be a long-lived process,
its memory footprint keeps accumulating over time.Even if we ignore DROP TABLE entirely and only consider a large number of tables
that must be published, RelationSyncEntry alone can consume substantial memory.
Many users run multiple walsenders on the same instance, which further increases
memory pressure.Agreed, there is a possibility of consuming a large amount of memory,
but still, we should do some experiments to see the real results. We
haven't seen complaints of walsenders consuming large amounts of
memory due to RelationSyncEntry's, so it is also possible that this is
just a theoretical case especially after we have some handling for
dropped tables.In normal backend processes, many cache structures are never evicted. That
already causes issues, but it is at least somewhat tolerable because a backend
is considered short-lived and a periodic reconnect can release the memory.
A walsender, however, is expected to stay alive much longer, nobody wants
replication sessions to be dropped regularly, so I am genuinely curious why
structures like RelationSyncEntry were not given an LRU-style eviction mechanism
from the start.
FYI as far as I know we've discussed that a similar problem exists
also in normal backend processes especially for users who are using a
connection pool[1]/messages/by-id/20161219.201505.11562604.horiguchi.kyotaro@lab.ntt.co.jp (it was about syscache bloat due to negative
caches).
With your proposed patch, it maintains cache entries with a hard limit
of 1024 entries using LRU eviction. This could become a performance
bottleneck when replicating more than 1,024 tables. Even for databases
having less than 1024 tables it would lead to a negative performance
impact due to the cost of maintaining LRU order. Also, while having a
maximum capacity with an eviction mechanism makes sense, we should
make this limit configurable (probably by size?). If we have the limit
in size it might be worth noting that cache entries consume variable
amounts of memory, and their underlying memory blocks might persist
even after entry eviction.
Adding an LRU mechanism to RelationSyncEntry has another benefit: it puts an
upper bound on the workload of callbacks such as invalidation_cb, preventing
walsender from stalling when there are a large number of tables. I have
therefore implemented a prototype of this idea (borrowing some code from
Hayato Kuroda). It should keep memory usage under control in more scenarios
while introducing only minimal overhead in theory.Yeah, such an idea is worth considering after we have some tests to
show the high memory usage. BTW, I think it would be better to run
some algorithm to evict entries when we reach the threshold limit as
we do for shared buffers, see StrategyGetBuffer. Otherwise, we may be
paying the cost to maintain such a list when in practice it may be
required only very few times.
Yeah, something like clock-sweep could work too. I think we need to
carefully select the cache management algorithm. While it's a well
known fact that maintaining LRU ordering is expensive, I'm really not
sure the clock-sweep is a suitable algorithm for logical replication's
cache use cases. Also, we might want to avoid iterating all caches to
find caches to evict.
Regards,
[1]: /messages/by-id/20161219.201505.11562604.horiguchi.kyotaro@lab.ntt.co.jp
--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com