From 259a1f79511bb5acb03167e44eadb2d3fd5ab644 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Wed, 21 May 2025 18:02:36 +0900 Subject: [PATCH] Avoid distributing invalidation messages several times --- .../replication/logical/reorderbuffer.c | 71 ++++++++++++------- src/backend/replication/logical/snapbuild.c | 5 +- src/include/replication/reorderbuffer.h | 3 + 3 files changed, 51 insertions(+), 28 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 03eb005c39d..868fcd8a344 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -3330,41 +3330,60 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs) { - ReorderBufferTXN *txn; + ReorderBufferAddInvalidationsExtended(rb, xid, lsn, nmsgs, msgs, true); +} + +/* + * An extended workhorse version of ReorderBufferAddInvalidations(). The + * needs_distribute flag controls whether the invalidation messages are + * accumulated in the top-level reorderbuffer transaction, from where they + * would be distributed to all concurrent transactions at commit. + */ +void +ReorderBufferAddInvalidationsExtended(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, Size nmsgs, + SharedInvalidationMessage *msgs, + bool needs_distribute) +{ MemoryContext oldcontext; ReorderBufferChange *change; - txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); - oldcontext = MemoryContextSwitchTo(rb->context); - /* - * Collect all the invalidations under the top transaction, if available, - * so that we can execute them all together. See comments atop this - * function. - */ - txn = rbtxn_get_toptxn(txn); - Assert(nmsgs > 0); - /* Accumulate invalidations. 
*/ - if (txn->ninvalidations == 0) - { - txn->ninvalidations = nmsgs; - txn->invalidations = (SharedInvalidationMessage *) - palloc(sizeof(SharedInvalidationMessage) * nmsgs); - memcpy(txn->invalidations, msgs, - sizeof(SharedInvalidationMessage) * nmsgs); - } - else + if (needs_distribute) { - txn->invalidations = (SharedInvalidationMessage *) - repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) * - (txn->ninvalidations + nmsgs)); + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + /* + * Collect all the invalidations under the top transaction, if + * available, so that we can execute them all together. See comments + * atop ReorderBufferAddInvalidations. + */ + txn = rbtxn_get_toptxn(txn); - memcpy(txn->invalidations + txn->ninvalidations, msgs, - nmsgs * sizeof(SharedInvalidationMessage)); - txn->ninvalidations += nmsgs; + /* Accumulate invalidations. */ + if (txn->ninvalidations == 0) + { + txn->ninvalidations = nmsgs; + txn->invalidations = (SharedInvalidationMessage *) + palloc(sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(txn->invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + } + else + { + txn->invalidations = (SharedInvalidationMessage *) + repalloc(txn->invalidations, + sizeof(SharedInvalidationMessage) * (txn->ninvalidations + nmsgs)); + + memcpy(txn->invalidations + txn->ninvalidations, msgs, + nmsgs * sizeof(SharedInvalidationMessage)); + txn->ninvalidations += nmsgs; + } } change = ReorderBufferGetChange(rb); diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 110e0b0a044..a8225c1d905 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -945,8 +945,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact { Assert(msgs != NULL); - ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn, - ninvalidations, msgs); + 
ReorderBufferAddInvalidationsExtended(builder->reorder, txn->xid, lsn, + ninvalidations, msgs, + false); } } } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 4c56f219fd8..001d0e716aa 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -709,6 +709,9 @@ extern void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, CommandId cmin, CommandId cmax, CommandId combocid); extern void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs); +extern void ReorderBufferAddInvalidationsExtended(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, + Size nmsgs, SharedInvalidationMessage *msgs, + bool needs_distribute); extern void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations); extern void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn); -- 2.43.5