From 82e688a1734cda5076ab1d6fdd5997f1e8cbe792 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Wed, 21 May 2025 18:02:36 +0900 Subject: [PATCH v4-PG17] Avoid distributing invalidation messages several times --- .../replication/logical/reorderbuffer.c | 367 ++++++++++++++---- src/backend/replication/logical/snapbuild.c | 5 +- src/include/replication/reorderbuffer.h | 6 + 3 files changed, 307 insertions(+), 71 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 03eb005c39d..34c5dd88335 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -239,11 +239,30 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb); */ static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state); +static void ReorderBufferIterTXNInitForInval(ReorderBuffer *rb, + ReorderBufferTXN *txn, + ReorderBufferIterTXNState *volatile *iter_state); +static void ReorderBufferIterTXNInitInternal(ReorderBuffer *rb, + ReorderBufferTXN *txn, + ReorderBufferIterTXNState *volatile *iter_state, + bool iterate_inval); + static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state); +static ReorderBufferChange *ReorderBufferIterTXNNextForInval(ReorderBuffer *rb, ReorderBufferIterTXNState *state); +static ReorderBufferChange *ReorderBufferIterTXNNextInternal(ReorderBuffer *rb, ReorderBufferIterTXNState *state, + bool iterate_inval); static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state); static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs); +static void ReorderBufferQueueChangeInternal(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, + ReorderBufferChange *change, bool toast_insert, + bool for_inval); +static void ReorderBufferExecuteInvalidationsInQueue(ReorderBuffer *rb, 
ReorderBufferTXN *txn); + +static void ReorderBufferQueueInval(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, ReorderBufferChange *change); + /* * --------------------------------------- * Disk serialization support functions @@ -428,6 +447,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb) dlist_init(&txn->changes); dlist_init(&txn->tuplecids); dlist_init(&txn->subtxns); + dlist_init(&txn->distributed_inval); /* InvalidCommandId is not zero, so set it explicitly */ txn->command_id = InvalidCommandId; @@ -787,6 +807,24 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert) +{ + ReorderBufferQueueChangeInternal(rb, xid, lsn, change, toast_insert, false); +} + +static void +ReorderBufferQueueInval(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, ReorderBufferChange *change) +{ + ReorderBufferQueueChangeInternal(rb, xid, lsn, change, false, true); +} + +/* + * Actual workhorse for ReorderBufferQueueChange and ReorderBufferQueueInval + */ +static void +ReorderBufferQueueChangeInternal(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, ReorderBufferChange *change, + bool toast_insert, bool for_inval) { ReorderBufferTXN *txn; @@ -797,7 +835,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, * transaction is aborted. So there is no point in collecting further * changes for it. */ - if (txn->concurrent_abort) + if (!for_inval && txn->concurrent_abort) { /* * We don't need to update memory accounting for this change as we @@ -825,22 +863,84 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, } change->lsn = lsn; - change->txn = txn; + + /* + * Associate the change with current transaction only if we are not + * distributing invalidation messages. 
The associated one has a shorter + * lifetime than the target transaction, which may cause dangling + * link issues. + */ + change->txn = for_inval ? NULL : txn; Assert(InvalidXLogRecPtr != lsn); - dlist_push_tail(&txn->changes, &change->node); - txn->nentries++; - txn->nentries_mem++; - /* update memory accounting information */ - ReorderBufferChangeMemoryUpdate(rb, change, NULL, true, - ReorderBufferChangeSize(change)); + if (for_inval) + { + dlist_push_tail(&txn->distributed_inval, &change->node); + txn->nentries_distr++; + } + else + { + dlist_push_tail(&txn->changes, &change->node); + txn->nentries++; + txn->nentries_mem++; + + /* + * For normal changes, we must update memory usage and process partial + * changes. This is not needed for distributed ones, because they + * won't be added to the total amount of memory. + */ + + /* update memory accounting information */ + ReorderBufferChangeMemoryUpdate(rb, change, NULL, true, + ReorderBufferChangeSize(change)); + + /* process partial change */ + ReorderBufferProcessPartialChange(rb, txn, change, toast_insert); - /* process partial change */ - ReorderBufferProcessPartialChange(rb, txn, change, toast_insert); + /* check the memory limits and evict something if needed */ + ReorderBufferCheckMemoryLimit(rb); + } +} + +static void +ReorderBufferExecuteInvalidationsInQueue(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + ReorderBufferChange *change; + ReorderBufferIterTXNState *volatile iterstate = NULL; + + ReorderBufferIterTXNInitForInval(rb, txn, &iterstate); + while ((change = ReorderBufferIterTXNNextForInval(rb, iterstate)) != NULL) + { + /* XXX do we have to do something here? 
*/ + + switch (change->action) + { + /* Only interested in invalidation */ + case REORDER_BUFFER_CHANGE_INVALIDATION: + /* Execute the invalidation messages locally */ + ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations, + change->data.inval.invalidations); + break; + + /* Skip other changes because the transaction was aborted */ + case REORDER_BUFFER_CHANGE_INSERT: + case REORDER_BUFFER_CHANGE_UPDATE: + case REORDER_BUFFER_CHANGE_DELETE: + case REORDER_BUFFER_CHANGE_MESSAGE: + case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: + case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: + case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: + case REORDER_BUFFER_CHANGE_TRUNCATE: + break; + } + } - /* check the memory limits and evict something if needed */ - ReorderBufferCheckMemoryLimit(rb); + /* clean up the iterator */ + ReorderBufferIterTXNFinish(rb, iterstate); } /* @@ -1261,36 +1361,61 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg) static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state) +{ + ReorderBufferIterTXNInitInternal(rb, txn, iter_state, false); +} + +static void +ReorderBufferIterTXNInitForInval(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferIterTXNState *volatile *iter_state) +{ + ReorderBufferIterTXNInitInternal(rb, txn, iter_state, true); +} + +/* + * Actual workhorse for ReorderBufferIterTXNInit and ReorderBufferIterTXNInitForInval + */ +static void +ReorderBufferIterTXNInitInternal(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferIterTXNState *volatile *iter_state, + bool iterate_inval) { Size nr_txns = 0; ReorderBufferIterTXNState *state; dlist_iter cur_txn_i; int32 off; + uint64 nentries; *iter_state = NULL; /* Check ordering of changes in the toplevel transaction. 
*/ - AssertChangeLsnOrder(txn); + if (!iterate_inval) + AssertChangeLsnOrder(txn); + + nentries = iterate_inval ? txn->nentries_distr : txn->nentries; /* * Calculate the size of our heap: one element for every transaction that * contains changes. (Besides the transactions already in the reorder * buffer, we count the one we were directly passed.) */ - if (txn->nentries > 0) + if (nentries > 0) nr_txns++; - dlist_foreach(cur_txn_i, &txn->subtxns) + if (!iterate_inval) { - ReorderBufferTXN *cur_txn; + dlist_foreach(cur_txn_i, &txn->subtxns) + { + ReorderBufferTXN *cur_txn; - cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); + cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); - /* Check ordering of changes in this subtransaction. */ - AssertChangeLsnOrder(cur_txn); + /* Check ordering of changes in this subtransaction. */ + AssertChangeLsnOrder(cur_txn); - if (cur_txn->nentries > 0) - nr_txns++; + if (cur_txn->nentries > 0) + nr_txns++; + } } /* allocate iteration state */ @@ -1324,11 +1449,12 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, off = 0; /* add toplevel transaction if it contains changes */ - if (txn->nentries > 0) + if (nentries > 0) { ReorderBufferChange *cur_change; + dlist_head *head; - if (rbtxn_is_serialized(txn)) + if (!iterate_inval && rbtxn_is_serialized(txn)) { /* serialize remaining changes */ ReorderBufferSerializeTXN(rb, txn); @@ -1336,8 +1462,10 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, &state->entries[off].segno); } + head = iterate_inval ? 
&txn->distributed_inval : &txn->changes; + cur_change = dlist_head_element(ReorderBufferChange, node, - &txn->changes); + head); state->entries[off].lsn = cur_change->lsn; state->entries[off].change = cur_change; @@ -1346,34 +1474,42 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); } - /* add subtransactions if they contain changes */ - dlist_foreach(cur_txn_i, &txn->subtxns) + if (!iterate_inval) { - ReorderBufferTXN *cur_txn; - - cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); - - if (cur_txn->nentries > 0) + /* add subtransactions if they contain changes */ + dlist_foreach(cur_txn_i, &txn->subtxns) { - ReorderBufferChange *cur_change; + ReorderBufferTXN *cur_txn; + + cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); - if (rbtxn_is_serialized(cur_txn)) + if (cur_txn->nentries > 0) { - /* serialize remaining changes */ - ReorderBufferSerializeTXN(rb, cur_txn); - ReorderBufferRestoreChanges(rb, cur_txn, - &state->entries[off].file, - &state->entries[off].segno); - } - cur_change = dlist_head_element(ReorderBufferChange, node, - &cur_txn->changes); + ReorderBufferChange *cur_change; + dlist_head *head; + + if (rbtxn_is_serialized(cur_txn)) + { + /* serialize remaining changes */ + ReorderBufferSerializeTXN(rb, cur_txn); + ReorderBufferRestoreChanges(rb, cur_txn, + &state->entries[off].file, + &state->entries[off].segno); + } + + head = iterate_inval ? 
&cur_txn->distributed_inval : &cur_txn->changes; + + cur_change = dlist_head_element(ReorderBufferChange, node, + head); - state->entries[off].lsn = cur_change->lsn; - state->entries[off].change = cur_change; - state->entries[off].txn = cur_txn; + state->entries[off].lsn = cur_change->lsn; + state->entries[off].change = cur_change; + state->entries[off].txn = cur_txn; - binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); + binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); + } } + } /* assemble a valid binary heap */ @@ -1388,10 +1524,29 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, */ static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) +{ + return ReorderBufferIterTXNNextInternal(rb, state, false); +} + +static ReorderBufferChange * +ReorderBufferIterTXNNextForInval(ReorderBuffer *rb, ReorderBufferIterTXNState *state) +{ + return ReorderBufferIterTXNNextInternal(rb, state, true); +} + +/* + * Actual workhorse for ReorderBufferIterTXNNext and ReorderBufferIterTXNNextForInval + */ +static ReorderBufferChange * +ReorderBufferIterTXNNextInternal(ReorderBuffer *rb, ReorderBufferIterTXNState *state, + bool iterate_inval) { ReorderBufferChange *change; ReorderBufferIterTXNEntry *entry; int32 off; + dlist_head *head; + uint64 nentries; + uint64 nentries_mem; /* nothing there anymore */ if (state->heap->bh_size == 0) @@ -1411,15 +1566,33 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) change = entry->change; + if (iterate_inval) + { + head = &entry->txn->distributed_inval; + nentries = entry->txn->nentries_distr; + + /* + * The inval queue won't be serialized, so the number of entries in + * memory is the same as the total number of entries. 
+ */ + nentries_mem = entry->txn->nentries_distr; + } + else + { + head = &entry->txn->changes; + nentries = entry->txn->nentries; + nentries_mem = entry->txn->nentries_mem; + } + /* * update heap with information about which transaction has the next * relevant change in LSN order */ /* there are in-memory changes */ - if (dlist_has_next(&entry->txn->changes, &entry->change->node)) + if (dlist_has_next(head, &entry->change->node)) { - dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node); + dlist_node *next = dlist_next_node(head, &change->node); ReorderBufferChange *next_change = dlist_container(ReorderBufferChange, node, next); @@ -1432,7 +1605,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) } /* try to load changes from disk */ - if (entry->txn->nentries != entry->txn->nentries_mem) + if (nentries != nentries_mem) { /* * Ugly: restoring changes will reuse *Change records, thus delete the @@ -1453,11 +1626,11 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) /* successfully restored changes from disk */ ReorderBufferChange *next_change = dlist_head_element(ReorderBufferChange, node, - &entry->txn->changes); + head); elog(DEBUG2, "restored %u/%u changes from disk", - (uint32) entry->txn->nentries_mem, - (uint32) entry->txn->nentries); + (uint32) nentries_mem, + (uint32) nentries); Assert(entry->txn->nentries_mem); /* txn stays the same */ @@ -1477,6 +1650,8 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) /* * Deallocate the iterator + * + * XXX No need to prepare an internal function because it does not refer to txn. 
*/ static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, @@ -1558,6 +1733,16 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* Update the memory counter */ ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed); + /* cleanup inval queue */ + dlist_foreach_modify(iter, &txn->distributed_inval) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + ReorderBufferReturnChange(rb, change, false); + } + /* * Cleanup the tuplecids we stored for decoding catalog snapshot access. * They are always stored in the toplevel transaction. @@ -2575,6 +2760,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, /* make sure there's no cache pollution */ ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); + ReorderBufferExecuteInvalidationsInQueue(rb, txn); if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); @@ -2623,6 +2809,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); + /* XXX Should this put in the concurrent_abort case? */ + ReorderBufferExecuteInvalidationsInQueue(rb, txn); + if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); @@ -3329,6 +3518,21 @@ void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs) +{ + ReorderBufferAddInvalidationsExtended(rb, xid, lsn, nmsgs, msgs, true); +} + +/* + * An extended workhorse version of ReorderBufferAddInvalidations(). This can + * control whether invalidation messages can be accumulated in reorderbuffer + * transactions, which would be distributed to all concurrent transactions at + * commit. 
+ */ +void +ReorderBufferAddInvalidationsExtended(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, Size nmsgs, + SharedInvalidationMessage *msgs, + bool needs_distribute) { ReorderBufferTXN *txn; MemoryContext oldcontext; @@ -3340,31 +3544,34 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, /* * Collect all the invalidations under the top transaction, if available, - * so that we can execute them all together. See comments atop this - * function. + * so that we can execute them all together. See comments atop + * ReorderBufferAddInvalidations. */ txn = rbtxn_get_toptxn(txn); Assert(nmsgs > 0); - /* Accumulate invalidations. */ - if (txn->ninvalidations == 0) + if (needs_distribute) { - txn->ninvalidations = nmsgs; - txn->invalidations = (SharedInvalidationMessage *) - palloc(sizeof(SharedInvalidationMessage) * nmsgs); - memcpy(txn->invalidations, msgs, - sizeof(SharedInvalidationMessage) * nmsgs); - } - else - { - txn->invalidations = (SharedInvalidationMessage *) - repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) * - (txn->ninvalidations + nmsgs)); + /* Accumulate invalidations. 
*/ + if (txn->ninvalidations == 0) + { + txn->ninvalidations = nmsgs; + txn->invalidations = (SharedInvalidationMessage *) + palloc(sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(txn->invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + } + else + { + txn->invalidations = (SharedInvalidationMessage *) + repalloc(txn->invalidations, + sizeof(SharedInvalidationMessage) * (txn->ninvalidations + nmsgs)); - memcpy(txn->invalidations + txn->ninvalidations, msgs, - nmsgs * sizeof(SharedInvalidationMessage)); - txn->ninvalidations += nmsgs; + memcpy(txn->invalidations + txn->ninvalidations, msgs, + nmsgs * sizeof(SharedInvalidationMessage)); + txn->ninvalidations += nmsgs; + } } change = ReorderBufferGetChange(rb); @@ -3377,6 +3584,28 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, ReorderBufferQueueChange(rb, xid, lsn, change, false); + /* + * If the message is distributed from other transactions, also add to + * another queue. + * + * XXX: IIUC This must be done only to the toptxns, but is it right? 
+ */ + if (!needs_distribute && !TransactionIdIsValid(txn->toplevel_xid)) + { + ReorderBufferChange *inval_change; + + /* Duplicate the inval change to queue it */ + inval_change = ReorderBufferGetChange(rb); + inval_change->action = REORDER_BUFFER_CHANGE_INVALIDATION; + inval_change->data.inval.ninvalidations = nmsgs; + inval_change->data.inval.invalidations = (SharedInvalidationMessage *) + palloc(sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(inval_change->data.inval.invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + + ReorderBufferQueueInval(rb, xid, lsn, inval_change); + } + MemoryContextSwitchTo(oldcontext); } diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 110e0b0a044..a8225c1d905 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -945,8 +945,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact { Assert(msgs != NULL); - ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn, - ninvalidations, msgs); + ReorderBufferAddInvalidationsExtended(builder->reorder, txn->xid, lsn, + ninvalidations, msgs, + false); } } } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 4c56f219fd8..7bfd199c4b7 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -422,6 +422,9 @@ typedef struct ReorderBufferTXN * Private data pointer of the output plugin. 
*/ void *output_plugin_private; + + dlist_head distributed_inval; + uint64 nentries_distr; } ReorderBufferTXN; /* so we can define the callbacks used inside struct ReorderBuffer itself */ @@ -709,6 +712,9 @@ extern void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, CommandId cmin, CommandId cmax, CommandId combocid); extern void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs); +extern void ReorderBufferAddInvalidationsExtended(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, + Size nmsgs, SharedInvalidationMessage *msgs, + bool needs_distribute); extern void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations); extern void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn); -- 2.47.1