diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 12edc5772a..70068f6961 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -370,6 +370,8 @@ ReorderBufferAllocate(void)
 	dlist_init(&buffer->toplevel_by_lsn);
 	dlist_init(&buffer->txns_by_base_snapshot_lsn);
 	dclist_init(&buffer->catchange_txns);
+	dlist_init(&buffer->large_txns);
+	dlist_init(&buffer->mem_txns);
 
 	/*
 	 * Ensure there's no stale data from prior uses of this slot, in case some
@@ -1580,6 +1582,10 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	dlist_delete(&txn->node);
 	if (rbtxn_has_catalog_changes(txn))
 		dclist_delete_from(&rb->catchange_txns, &txn->catchange_node);
+	if (!dlist_node_is_detached(&txn->large_node))
+		dlist_delete_thoroughly(&txn->large_node);
+	if (!dlist_node_is_detached(&txn->mem_node))
+		dlist_delete_thoroughly(&txn->mem_node);
 
 	/* now remove reference from buffer */
 	hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
@@ -1710,6 +1716,11 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
 		txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR;
 	}
 
+	if (!dlist_node_is_detached(&txn->large_node))
+		dlist_delete_thoroughly(&txn->large_node);
+	if (!dlist_node_is_detached(&txn->mem_node))
+		dlist_delete_thoroughly(&txn->mem_node);
+
 	/* also reset the number of entries in the transaction */
 	txn->nentries_mem = 0;
 	txn->nentries = 0;
@@ -3202,11 +3213,26 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 
 	if (addition)
 	{
+		/* push the transaction to the in-memory transaction list */
+		if (txn->size == 0)
+		{
+			Assert(dlist_node_is_detached(&txn->mem_node));
+			dlist_push_tail(&rb->mem_txns, &txn->mem_node);
+		}
+
 		txn->size += sz;
 		rb->size += sz;
 
 		/* Update the total size in the top transaction. */
 		toptxn->total_size += sz;
+
+		/*
+		 * Push this transaction to the large-txn list if its size is at least
+		 * 10% of the logical_decoding_work_mem limit.
+		 */
+		if (dlist_node_is_detached(&txn->large_node) &&
+			txn->size >= (logical_decoding_work_mem * 1024L * 0.1))
+			dlist_push_tail(&rb->large_txns, &txn->large_node);
 	}
 	else
 	{
@@ -3214,6 +3240,21 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 		txn->size -= sz;
 		rb->size -= sz;
 
+		/* remove the transaction from the in-memory transaction list */
+		if (txn->size == 0)
+		{
+			Assert(!dlist_node_is_detached(&txn->mem_node));
+			dlist_delete_thoroughly(&txn->mem_node);
+		}
+
+		/*
+		 * Likewise drop it from the large-txn list once it falls below the
+		 * threshold, so that an emptied transaction is never picked to evict.
+		 */
+		if (!dlist_node_is_detached(&txn->large_node) &&
+			txn->size < (logical_decoding_work_mem * 1024L * 0.1))
+			dlist_delete_thoroughly(&txn->large_node);
+
 		/* Update the total size in the top transaction. */
 		toptxn->total_size -= sz;
 	}
@@ -3472,38 +3513,43 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
 }
 
 /*
- * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
+ * Find the transaction to evict (spill to disk).
  *
- * XXX With many subtransactions this might be quite slow, because we'll have
- * to walk through all of them. There are some options how we could improve
- * that: (a) maintain some secondary structure with transactions sorted by
- * amount of changes, (b) not looking for the entirely largest transaction,
- * but e.g. for transaction using at least some fraction of the memory limit,
- * and (c) evicting multiple transactions at once, e.g. to free a given portion
- * of the memory limit (e.g. 50%).
+ * We use three strategies to pick the transaction to evict: (1) pick a
+ * (top-level or sub) transaction from the large_txns list. If there is no
+ * such large transaction, (2) pick the oldest top-level transaction that has
+ * changes in memory. Since changes are recorded in a generational context
+ * (changes usually represent ~99% of the memory used during decoding),
+ * evicting transactions from oldest to newest actually helps return memory
+ * to the operating system. Only top-level transactions are considered here,
+ * because we walk the toplevel_by_lsn list. If neither yields a candidate,
+ * (3) pick any transaction that has at least one change in memory.
  */
 static ReorderBufferTXN *
-ReorderBufferLargestTXN(ReorderBuffer *rb)
+ReorderBufferPickTXNToEvict(ReorderBuffer *rb)
 {
-	HASH_SEQ_STATUS hash_seq;
-	ReorderBufferTXNByIdEnt *ent;
-	ReorderBufferTXN *largest = NULL;
+	ReorderBufferTXN *txn = NULL;
+	dlist_iter	iter;
 
-	hash_seq_init(&hash_seq, rb->by_txn);
-	while ((ent = hash_seq_search(&hash_seq)) != NULL)
+	/* Pick a large (top-level or sub) transaction */
+	if (!dlist_is_empty(&rb->large_txns))
+		return dlist_head_element(ReorderBufferTXN, large_node, &rb->large_txns);
+
+	/* Pick the oldest top-level transaction with changes in memory */
+	dlist_foreach(iter, &rb->toplevel_by_lsn)
 	{
-		ReorderBufferTXN *txn = ent->txn;
+		txn = dlist_container(ReorderBufferTXN, node, iter.cur);
 
-		/* if the current transaction is larger, remember it */
-		if ((!largest) || (txn->size > largest->size))
-			largest = txn;
+		if (txn->size > 0)
+			return txn;
 	}
 
-	Assert(largest);
-	Assert(largest->size > 0);
-	Assert(largest->size <= rb->size);
+	/* Pick a transaction that has at least one change in memory */
+	txn = dlist_head_element(ReorderBufferTXN, mem_node, &rb->mem_txns);
 
-	return largest;
+	Assert(txn);
+	Assert(txn->size > 0);
+	return txn;
 }
 
 /*
@@ -3618,10 +3664,10 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 		else
 		{
 			/*
-			 * Pick the largest transaction (or subtransaction) and evict it
-			 * from memory by serializing it to disk.
+			 * Pick a transaction (or subtransaction) and evict it from memory
+			 * by serializing it to disk.
 			 */
-			txn = ReorderBufferLargestTXN(rb);
+			txn = ReorderBufferPickTXNToEvict(rb);
 
 			/* we know there has to be one, because the size is not zero */
 			Assert(txn);
@@ -3637,6 +3683,7 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 		 */
 		Assert(txn->size == 0);
 		Assert(txn->nentries_mem == 0);
+		Assert(dlist_node_is_detached(&txn->large_node));
 	}
 
 	/* We must be under the memory limit now. */
@@ -3726,6 +3773,15 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
 	}
 
+	Assert(txn->size == 0);
+
+	if (!dlist_node_is_detached(&txn->large_node))
+		dlist_delete_thoroughly(&txn->large_node);
+
+	/* transaction does not have any changes in memory */
+	if (!dlist_node_is_detached(&txn->mem_node))
+		dlist_delete_thoroughly(&txn->mem_node);
+
 	Assert(spilled == txn->nentries_mem);
 	Assert(dlist_is_empty(&txn->changes));
 	txn->nentries_mem = 0;
@@ -4097,6 +4153,11 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	/* update the decoding stats */
 	UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
 
+	if (!dlist_node_is_detached(&txn->large_node))
+		dlist_delete_thoroughly(&txn->large_node);
+	if (!dlist_node_is_detached(&txn->mem_node))
+		dlist_delete_thoroughly(&txn->mem_node);
+
 	Assert(dlist_is_empty(&txn->changes));
 	Assert(txn->nentries == 0);
 	Assert(txn->nentries_mem == 0);
@@ -4322,6 +4383,10 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		restored++;
 	}
 
+	/* if changes were restored to memory, make sure txn is in mem_txns */
+	if (txn->nentries_mem > 0 && dlist_node_is_detached(&txn->mem_node))
+		dlist_push_tail(&rb->mem_txns, &txn->mem_node);
+
 	return restored;
 }
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index f986101e50..6ab8a398fd 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -420,6 +420,16 @@ typedef struct ReorderBufferTXN
 	 */
 	dlist_node	catchange_node;
 
+	/*
+	 * A node in the list of large transactions
+	 */
+	dlist_node	large_node;
+
+	/*
+	 * A node in the list of in-memory transactions
+	 */
+	dlist_node	mem_node;
+
 	/*
 	 * Size of this transaction (changes currently in memory, in bytes).
 	 */
@@ -577,6 +587,17 @@ struct ReorderBuffer
 	 */
 	dclist_head catchange_txns;
 
+	/*
+	 * Transactions whose in-memory size is at least 10% of the
+	 * logical_decoding_work_mem limit (*not* ordered by size).
+	 */
+	dlist_head	large_txns;
+
+	/*
+	 * Transactions having decoded changes in memory.
+	 */
+	dlist_head	mem_txns;
+
 	/*
 	 * one-entry sized cache for by_txn. Very frequently the same txn gets
 	 * looked up over and over again.