From 277a3ae7a8ed9f08cace293d9b6a06377e3e6ed1 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Wed, 21 May 2025 18:02:36 +0900 Subject: [PATCH v2-PG17] Avoid distributing invalidation messages several times --- .../replication/logical/reorderbuffer.c | 120 ++++++++++++++---- src/backend/replication/logical/snapbuild.c | 5 +- src/include/replication/reorderbuffer.h | 3 + 3 files changed, 100 insertions(+), 28 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 03eb005c39d..d92247ca41b 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2093,6 +2093,53 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Assert(txn->size == 0); } +/* + * Look up changes in the transaction and apply only invalidation messages + */ +static void +ReorderBufferProcessInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + ReorderBufferChange *change; + ReorderBufferIterTXNState *volatile iterstate = NULL; + + ReorderBufferIterTXNInit(rb, txn, &iterstate); + while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) + { + CHECK_FOR_INTERRUPTS(); + + switch (change->action) + { + /* Only interested in invalidation messages */ + case REORDER_BUFFER_CHANGE_INVALIDATION: + /* + * Execute the invalidation messages locally. + * + * XXX: what if ereport(ERROR) happens here? 
+ */ + ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations, + change->data.inval.invalidations); + break; + + /* Skip other changes because the transaction was aborted */ + case REORDER_BUFFER_CHANGE_INSERT: + case REORDER_BUFFER_CHANGE_UPDATE: + case REORDER_BUFFER_CHANGE_DELETE: + case REORDER_BUFFER_CHANGE_MESSAGE: + case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: + case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: + case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: + case REORDER_BUFFER_CHANGE_TRUNCATE: + break; + } + } + + /* clean up the iterator */ + ReorderBufferIterTXNFinish(rb, iterstate); +} + /* * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN. * @@ -2650,6 +2697,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, errdata = NULL; curtxn->concurrent_abort = true; + ReorderBufferProcessInvalidations(rb, txn); + /* Reset the TXN so that it is allowed to stream remaining data. */ ReorderBufferResetTXN(rb, txn, snapshot_now, command_id, prev_lsn, @@ -3330,41 +3379,60 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs) { - ReorderBufferTXN *txn; + ReorderBufferAddInvalidationsExtended(rb, xid, lsn, nmsgs, msgs, true); +} + +/* + * An extended workhorse version of ReorderBufferAddInvalidations(). The + * needs_distribute flag controls whether the invalidation messages are also + * accumulated in the top-level reorderbuffer transaction, from which they + * would be distributed to all concurrent transactions at commit. 
+ */ +void +ReorderBufferAddInvalidationsExtended(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, Size nmsgs, + SharedInvalidationMessage *msgs, + bool needs_distribute) +{ MemoryContext oldcontext; ReorderBufferChange *change; - txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); - oldcontext = MemoryContextSwitchTo(rb->context); - /* - * Collect all the invalidations under the top transaction, if available, - * so that we can execute them all together. See comments atop this - * function. - */ - txn = rbtxn_get_toptxn(txn); - Assert(nmsgs > 0); - /* Accumulate invalidations. */ - if (txn->ninvalidations == 0) - { - txn->ninvalidations = nmsgs; - txn->invalidations = (SharedInvalidationMessage *) - palloc(sizeof(SharedInvalidationMessage) * nmsgs); - memcpy(txn->invalidations, msgs, - sizeof(SharedInvalidationMessage) * nmsgs); - } - else + if (needs_distribute) { - txn->invalidations = (SharedInvalidationMessage *) - repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) * - (txn->ninvalidations + nmsgs)); + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + /* + * Collect all the invalidations under the top transaction, if + * available, so that we can execute them all together. See comments + * atop ReorderBufferAddInvalidations. + */ + txn = rbtxn_get_toptxn(txn); + + /* Accumulate invalidations. 
*/ + if (txn->ninvalidations == 0) + { + txn->ninvalidations = nmsgs; + txn->invalidations = (SharedInvalidationMessage *) + palloc(sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(txn->invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + } + else + { + txn->invalidations = (SharedInvalidationMessage *) + repalloc(txn->invalidations, + sizeof(SharedInvalidationMessage) * (txn->ninvalidations + nmsgs)); - memcpy(txn->invalidations + txn->ninvalidations, msgs, - nmsgs * sizeof(SharedInvalidationMessage)); - txn->ninvalidations += nmsgs; + memcpy(txn->invalidations + txn->ninvalidations, msgs, + nmsgs * sizeof(SharedInvalidationMessage)); + txn->ninvalidations += nmsgs; + } } change = ReorderBufferGetChange(rb); diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 110e0b0a044..a8225c1d905 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -945,8 +945,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact { Assert(msgs != NULL); - ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn, - ninvalidations, msgs); + ReorderBufferAddInvalidationsExtended(builder->reorder, txn->xid, lsn, + ninvalidations, msgs, + false); } } } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 4c56f219fd8..001d0e716aa 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -709,6 +709,9 @@ extern void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, CommandId cmin, CommandId cmax, CommandId combocid); extern void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs); +extern void ReorderBufferAddInvalidationsExtended(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, + Size nmsgs, SharedInvalidationMessage *msgs, + bool 
needs_distribute); extern void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations); extern void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn); -- 2.43.5