From f0eec7e9efcc995a95052af9fd230f7e23706efb Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Wed, 21 May 2025 18:02:36 +0900 Subject: [PATCH v4-master] Avoid distributing invalidation messages several times --- .../replication/logical/reorderbuffer.c | 367 ++++++++++++++---- src/backend/replication/logical/snapbuild.c | 5 +- src/include/replication/reorderbuffer.h | 6 + 3 files changed, 307 insertions(+), 71 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 67655111875..212ff4e0466 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -240,11 +240,30 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb); */ static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state); +static void ReorderBufferIterTXNInitForInval(ReorderBuffer *rb, + ReorderBufferTXN *txn, + ReorderBufferIterTXNState *volatile *iter_state); +static void ReorderBufferIterTXNInitInternal(ReorderBuffer *rb, + ReorderBufferTXN *txn, + ReorderBufferIterTXNState *volatile *iter_state, + bool iterate_inval); + static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state); +static ReorderBufferChange *ReorderBufferIterTXNNextForInval(ReorderBuffer *rb, ReorderBufferIterTXNState *state); +static ReorderBufferChange *ReorderBufferIterTXNNextInternal(ReorderBuffer *rb, ReorderBufferIterTXNState *state, + bool iterate_inval); static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state); static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs); +static void ReorderBufferQueueChangeInternal(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, + ReorderBufferChange *change, bool toast_insert, + bool for_inval); +static void ReorderBufferExecuteInvalidationsInQueue(ReorderBuffer *rb, 
ReorderBufferTXN *txn); + +static void ReorderBufferQueueInval(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, ReorderBufferChange *change); + /* * --------------------------------------- * Disk serialization support functions @@ -431,6 +450,7 @@ ReorderBufferAllocTXN(ReorderBuffer *rb) dlist_init(&txn->changes); dlist_init(&txn->tuplecids); dlist_init(&txn->subtxns); + dlist_init(&txn->distributed_inval); /* InvalidCommandId is not zero, so set it explicitly */ txn->command_id = InvalidCommandId; @@ -790,6 +810,24 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert) +{ + ReorderBufferQueueChangeInternal(rb, xid, lsn, change, toast_insert, false); +} + +static void +ReorderBufferQueueInval(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, ReorderBufferChange *change) +{ + ReorderBufferQueueChangeInternal(rb, xid, lsn, change, false, true); +} + +/* + * Actual workhorse for ReorderBufferQueueChange and ReorderBufferQueueInval + */ +static void +ReorderBufferQueueChangeInternal(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, ReorderBufferChange *change, + bool toast_insert, bool for_inval) { ReorderBufferTXN *txn; @@ -800,7 +838,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, * previous changes or by checking its CLOG, there is no point in * collecting further changes for it. */ - if (rbtxn_is_aborted(txn)) + if (!for_inval && rbtxn_is_aborted(txn)) { /* * We don't need to update memory accounting for this change as we @@ -828,22 +866,84 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, } change->lsn = lsn; - change->txn = txn; + + /* + * Associate the change with current transaction only if we are not + * distributing invalidation messages. 
Associated one has shorter + lifetime than the target transaction, which may cause dangling + link issues. + */ + change->txn = for_inval ? NULL : txn; Assert(InvalidXLogRecPtr != lsn); - dlist_push_tail(&txn->changes, &change->node); - txn->nentries++; - txn->nentries_mem++; - /* update memory accounting information */ - ReorderBufferChangeMemoryUpdate(rb, change, NULL, true, - ReorderBufferChangeSize(change)); + if (for_inval) + { + dlist_push_tail(&txn->distributed_inval, &change->node); + txn->nentries_distr++; + } + else + { + dlist_push_tail(&txn->changes, &change->node); + txn->nentries++; + txn->nentries_mem++; + + /* + * For normal changes, we must update memory usage and process partial + * changes. This is not needed for distributed ones, because they + * won't be added to the total amount of memory. + */ + + /* update memory accounting information */ + ReorderBufferChangeMemoryUpdate(rb, change, NULL, true, + ReorderBufferChangeSize(change)); - /* process partial change */ - ReorderBufferProcessPartialChange(rb, txn, change, toast_insert); + /* process partial change */ + ReorderBufferProcessPartialChange(rb, txn, change, toast_insert); - /* check the memory limits and evict something if needed */ - ReorderBufferCheckMemoryLimit(rb); + /* check the memory limits and evict something if needed */ + ReorderBufferCheckMemoryLimit(rb); + } +} + +static void +ReorderBufferExecuteInvalidationsInQueue(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + ReorderBufferChange *change; + ReorderBufferIterTXNState *volatile iterstate = NULL; + + ReorderBufferIterTXNInitForInval(rb, txn, &iterstate); + while ((change = ReorderBufferIterTXNNextForInval(rb, iterstate)) != NULL) + { + /* XXX do we have to do something here? 
*/ + + switch (change->action) + { + /* Only interested in invalidation */ + case REORDER_BUFFER_CHANGE_INVALIDATION: + /* Execute the invalidation messages locally */ + ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations, + change->data.inval.invalidations); + break; + + /* Skip other changes because the transaction was aborted */ + case REORDER_BUFFER_CHANGE_INSERT: + case REORDER_BUFFER_CHANGE_UPDATE: + case REORDER_BUFFER_CHANGE_DELETE: + case REORDER_BUFFER_CHANGE_MESSAGE: + case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: + case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: + case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: + case REORDER_BUFFER_CHANGE_TRUNCATE: + break; + } + } + + /* clean up the iterator */ + ReorderBufferIterTXNFinish(rb, iterstate); } /* @@ -1264,36 +1364,61 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg) static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state) +{ + ReorderBufferIterTXNInitInternal(rb, txn, iter_state, false); +} + +static void +ReorderBufferIterTXNInitForInval(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferIterTXNState *volatile *iter_state) +{ + ReorderBufferIterTXNInitInternal(rb, txn, iter_state, true); +} + +/* + * Actual workhorse for ReorderBufferIterTXNInit and ReorderBufferIterTXNInitForInval + */ +static void +ReorderBufferIterTXNInitInternal(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferIterTXNState *volatile *iter_state, + bool iterate_inval) { Size nr_txns = 0; ReorderBufferIterTXNState *state; dlist_iter cur_txn_i; int32 off; + uint64 nentries; *iter_state = NULL; /* Check ordering of changes in the toplevel transaction. */ - AssertChangeLsnOrder(txn); + if (!iterate_inval) + AssertChangeLsnOrder(txn); + + nentries = iterate_inval ? 
txn->nentries_distr : txn->nentries; /* * Calculate the size of our heap: one element for every transaction that * contains changes. (Besides the transactions already in the reorder * buffer, we count the one we were directly passed.) */ - if (txn->nentries > 0) + if (nentries > 0) nr_txns++; - dlist_foreach(cur_txn_i, &txn->subtxns) + if (!iterate_inval) { - ReorderBufferTXN *cur_txn; + dlist_foreach(cur_txn_i, &txn->subtxns) + { + ReorderBufferTXN *cur_txn; - cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); + cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); - /* Check ordering of changes in this subtransaction. */ - AssertChangeLsnOrder(cur_txn); + /* Check ordering of changes in this subtransaction. */ + AssertChangeLsnOrder(cur_txn); - if (cur_txn->nentries > 0) - nr_txns++; + if (cur_txn->nentries > 0) + nr_txns++; + } } /* allocate iteration state */ @@ -1327,11 +1452,12 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, off = 0; /* add toplevel transaction if it contains changes */ - if (txn->nentries > 0) + if (nentries > 0) { ReorderBufferChange *cur_change; + dlist_head *head; - if (rbtxn_is_serialized(txn)) + if (!iterate_inval && rbtxn_is_serialized(txn)) { /* serialize remaining changes */ ReorderBufferSerializeTXN(rb, txn); @@ -1339,8 +1465,10 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, &state->entries[off].segno); } + head = iterate_inval ? 
&txn->distributed_inval : &txn->changes; + cur_change = dlist_head_element(ReorderBufferChange, node, - &txn->changes); + head); state->entries[off].lsn = cur_change->lsn; state->entries[off].change = cur_change; @@ -1349,34 +1477,42 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); } - /* add subtransactions if they contain changes */ - dlist_foreach(cur_txn_i, &txn->subtxns) + if (!iterate_inval) { - ReorderBufferTXN *cur_txn; - - cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); - - if (cur_txn->nentries > 0) + /* add subtransactions if they contain changes */ + dlist_foreach(cur_txn_i, &txn->subtxns) { - ReorderBufferChange *cur_change; + ReorderBufferTXN *cur_txn; + + cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); - if (rbtxn_is_serialized(cur_txn)) + if (cur_txn->nentries > 0) { - /* serialize remaining changes */ - ReorderBufferSerializeTXN(rb, cur_txn); - ReorderBufferRestoreChanges(rb, cur_txn, - &state->entries[off].file, - &state->entries[off].segno); - } - cur_change = dlist_head_element(ReorderBufferChange, node, - &cur_txn->changes); + ReorderBufferChange *cur_change; + dlist_head *head; + + if (rbtxn_is_serialized(cur_txn)) + { + /* serialize remaining changes */ + ReorderBufferSerializeTXN(rb, cur_txn); + ReorderBufferRestoreChanges(rb, cur_txn, + &state->entries[off].file, + &state->entries[off].segno); + } + + head = iterate_inval ? 
&cur_txn->distributed_inval : &cur_txn->changes; + + cur_change = dlist_head_element(ReorderBufferChange, node, + head); - state->entries[off].lsn = cur_change->lsn; - state->entries[off].change = cur_change; - state->entries[off].txn = cur_txn; + state->entries[off].lsn = cur_change->lsn; + state->entries[off].change = cur_change; + state->entries[off].txn = cur_txn; - binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); + binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); + } } + } /* assemble a valid binary heap */ @@ -1391,10 +1527,29 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, */ static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) +{ + return ReorderBufferIterTXNNextInternal(rb, state, false); +} + +static ReorderBufferChange * +ReorderBufferIterTXNNextForInval(ReorderBuffer *rb, ReorderBufferIterTXNState *state) +{ + return ReorderBufferIterTXNNextInternal(rb, state, true); +} + +/* + * Actual workhorse for ReorderBufferIterTXNNext and ReorderBufferIterTXNNextForInval + */ +static ReorderBufferChange * +ReorderBufferIterTXNNextInternal(ReorderBuffer *rb, ReorderBufferIterTXNState *state, + bool iterate_inval) { ReorderBufferChange *change; ReorderBufferIterTXNEntry *entry; int32 off; + dlist_head *head; + uint64 nentries; + uint64 nentries_mem; /* nothing there anymore */ if (state->heap->bh_size == 0) @@ -1414,15 +1569,33 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) change = entry->change; + if (iterate_inval) + { + head = &entry->txn->distributed_inval; + nentries = entry->txn->nentries_distr; + + /* + * The inval queue won't be serialized, so the number of entries on the + * memory is same as whole entries. 
+ */ + nentries_mem = entry->txn->nentries_distr; + } + else + { + head = &entry->txn->changes; + nentries = entry->txn->nentries; + nentries_mem = entry->txn->nentries_mem; + } + /* * update heap with information about which transaction has the next * relevant change in LSN order */ /* there are in-memory changes */ - if (dlist_has_next(&entry->txn->changes, &entry->change->node)) + if (dlist_has_next(head, &entry->change->node)) { - dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node); + dlist_node *next = dlist_next_node(head, &change->node); ReorderBufferChange *next_change = dlist_container(ReorderBufferChange, node, next); @@ -1435,7 +1608,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) } /* try to load changes from disk */ - if (entry->txn->nentries != entry->txn->nentries_mem) + if (nentries != nentries_mem) { /* * Ugly: restoring changes will reuse *Change records, thus delete the @@ -1456,11 +1629,11 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) /* successfully restored changes from disk */ ReorderBufferChange *next_change = dlist_head_element(ReorderBufferChange, node, - &entry->txn->changes); + head); elog(DEBUG2, "restored %u/%u changes from disk", - (uint32) entry->txn->nentries_mem, - (uint32) entry->txn->nentries); + (uint32) nentries_mem, + (uint32) nentries); Assert(entry->txn->nentries_mem); /* txn stays the same */ @@ -1480,6 +1653,8 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) /* * Deallocate the iterator + * + * XXX No need to prepare internal function because it does not refer txn. 
*/ static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, @@ -1561,6 +1736,16 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* Update the memory counter */ ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed); + /* cleanup inval queue */ + dlist_foreach_modify(iter, &txn->distributed_inval) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + ReorderBufferFreeChange(rb, change, false); + } + /* * Cleanup the tuplecids we stored for decoding catalog snapshot access. * They are always stored in the toplevel transaction. @@ -2662,6 +2847,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, /* make sure there's no cache pollution */ ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); + ReorderBufferExecuteInvalidationsInQueue(rb, txn); if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); @@ -2713,6 +2899,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); + /* XXX Should this put in the concurrent_abort case? */ + ReorderBufferExecuteInvalidationsInQueue(rb, txn); + if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); @@ -3438,6 +3627,21 @@ void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs) +{ + ReorderBufferAddInvalidationsExtended(rb, xid, lsn, nmsgs, msgs, true); +} + +/* + * An extended workhorse version of ReorderBufferAddInvalidations(). This can + * control whether invalidation messages can be accumulated in reorderbuffer + * transactions, which would be distributed to all concurrent transactions at + * commit. 
+ */ +void +ReorderBufferAddInvalidationsExtended(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, Size nmsgs, + SharedInvalidationMessage *msgs, + bool needs_distribute) { ReorderBufferTXN *txn; MemoryContext oldcontext; @@ -3449,31 +3653,34 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, /* * Collect all the invalidations under the top transaction, if available, - * so that we can execute them all together. See comments atop this - * function. + * so that we can execute them all together. See comments atop + * ReorderBufferAddInvalidations. */ txn = rbtxn_get_toptxn(txn); Assert(nmsgs > 0); - /* Accumulate invalidations. */ - if (txn->ninvalidations == 0) + if (needs_distribute) { - txn->ninvalidations = nmsgs; - txn->invalidations = (SharedInvalidationMessage *) - palloc(sizeof(SharedInvalidationMessage) * nmsgs); - memcpy(txn->invalidations, msgs, - sizeof(SharedInvalidationMessage) * nmsgs); - } - else - { - txn->invalidations = (SharedInvalidationMessage *) - repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) * - (txn->ninvalidations + nmsgs)); + /* Accumulate invalidations. 
*/ + if (txn->ninvalidations == 0) + { + txn->ninvalidations = nmsgs; + txn->invalidations = (SharedInvalidationMessage *) + palloc(sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(txn->invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + } + else + { + txn->invalidations = (SharedInvalidationMessage *) + repalloc(txn->invalidations, + sizeof(SharedInvalidationMessage) * (txn->ninvalidations + nmsgs)); - memcpy(txn->invalidations + txn->ninvalidations, msgs, - nmsgs * sizeof(SharedInvalidationMessage)); - txn->ninvalidations += nmsgs; + memcpy(txn->invalidations + txn->ninvalidations, msgs, + nmsgs * sizeof(SharedInvalidationMessage)); + txn->ninvalidations += nmsgs; + } } change = ReorderBufferAllocChange(rb); @@ -3486,6 +3693,28 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, ReorderBufferQueueChange(rb, xid, lsn, change, false); + /* + * If the message is distributed from other transactions, also add to + * another queue. + * + * XXX: IIUC This must be done only to the toptxns, but is it right? 
+ */ + if (!needs_distribute && !TransactionIdIsValid(txn->toplevel_xid)) + { + ReorderBufferChange *inval_change; + + /* Duplicate the inval change to queue it */ + inval_change = ReorderBufferAllocChange(rb); + inval_change->action = REORDER_BUFFER_CHANGE_INVALIDATION; + inval_change->data.inval.ninvalidations = nmsgs; + inval_change->data.inval.invalidations = (SharedInvalidationMessage *) + palloc(sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(inval_change->data.inval.invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + + ReorderBufferQueueInval(rb, xid, lsn, inval_change); + } + MemoryContextSwitchTo(oldcontext); } diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 0d7bddbe4ed..ceeabc6f420 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -807,8 +807,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact { Assert(msgs != NULL); - ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn, - ninvalidations, msgs); + ReorderBufferAddInvalidationsExtended(builder->reorder, txn->xid, lsn, + ninvalidations, msgs, + false); } } } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 24e88c409ba..51969e41db2 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -452,6 +452,9 @@ typedef struct ReorderBufferTXN * Private data pointer of the output plugin. 
*/ void *output_plugin_private; + + dlist_head distributed_inval; + uint64 nentries_distr; } ReorderBufferTXN; /* so we can define the callbacks used inside struct ReorderBuffer itself */ @@ -738,6 +741,9 @@ extern void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, CommandId cmin, CommandId cmax, CommandId combocid); extern void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs); +extern void ReorderBufferAddInvalidationsExtended(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, + Size nmsgs, SharedInvalidationMessage *msgs, + bool needs_distribute); extern void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations); extern void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn); -- 2.47.1