From cd43ed5f81d929ed21b8d3b015b3625abdfdeeba Mon Sep 17 00:00:00 2001 From: Lukas Fittl Date: Tue, 9 Sep 2025 02:24:49 -0700 Subject: [PATCH v4 5/7] Use Instrumentation stack for parallel query aggregation in workers --- src/backend/access/brin/brin.c | 6 ++++-- src/backend/access/gin/gininsert.c | 6 ++++-- src/backend/access/nbtree/nbtsort.c | 6 ++++-- src/backend/commands/vacuumparallel.c | 6 ++++-- src/backend/executor/execParallel.c | 6 ++++-- src/backend/executor/instrument.c | 19 +++++++++---------- src/include/executor/instrument.h | 4 ++-- 7 files changed, 31 insertions(+), 22 deletions(-) diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index 2f7d1437919..a36606eed0e 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -2870,6 +2870,7 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) Relation indexRel; LOCKMODE heapLockmode; LOCKMODE indexLockmode; + Instrumentation *instr; WalUsage *walusage; BufferUsage *bufferusage; int sortmem; @@ -2919,7 +2920,7 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) tuplesort_attach_shared(sharedsort, seg); /* Prepare to track buffer usage during parallel execution */ - InstrStartParallelQuery(); + instr = InstrStartParallelQuery(); /* * Might as well use reliable figure when doling out maintenance_work_mem @@ -2934,7 +2935,8 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) /* Report WAL/buffer usage during parallel execution */ bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], + InstrEndParallelQuery(instr, + &bufferusage[ParallelWorkerNumber], &walusage[ParallelWorkerNumber]); index_close(indexRel, indexLockmode); diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index 3d71b442aa9..b454934c109 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -2083,6 +2083,7 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) Relation indexRel; LOCKMODE heapLockmode; LOCKMODE indexLockmode; + Instrumentation *instr; WalUsage *walusage; BufferUsage *bufferusage; int sortmem; @@ -2151,7 +2152,7 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) tuplesort_attach_shared(sharedsort, seg); /* Prepare to track buffer usage during parallel execution */ - InstrStartParallelQuery(); + instr = InstrStartParallelQuery(); /* * Might as well use reliable figure when doling out maintenance_work_mem @@ -2166,7 +2167,8 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) /* Report WAL/buffer usage during parallel execution */ bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], + InstrEndParallelQuery(instr, + &bufferusage[ParallelWorkerNumber], &walusage[ParallelWorkerNumber]); index_close(indexRel, indexLockmode); diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index e90964080ca..717d6c1a11f 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -1750,6 +1750,7 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) Relation indexRel; LOCKMODE heapLockmode; LOCKMODE indexLockmode; + Instrumentation *instr; WalUsage *walusage; BufferUsage *bufferusage; int sortmem; @@ -1825,7 +1826,7 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) } /* Prepare to track buffer usage during parallel execution */ - InstrStartParallelQuery(); + instr = InstrStartParallelQuery(); /* Perform sorting of spool, and possibly a spool2 */ sortmem = maintenance_work_mem / btshared->scantuplesortstates; @@ -1835,7 +1836,8 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) /* Report WAL/buffer usage during parallel execution */ bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], + InstrEndParallelQuery(instr, + &bufferusage[ParallelWorkerNumber], &walusage[ParallelWorkerNumber]); #ifdef BTREE_BUILD_STATS diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index 0feea1d30ec..c5309a015e6 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -994,6 +994,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) PVIndStats *indstats; PVShared *shared; TidStore *dead_items; + Instrumentation *instr; BufferUsage *buffer_usage; WalUsage *wal_usage; int nindexes; @@ -1083,7 +1084,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) error_context_stack = &errcallback; /* Prepare to track buffer usage during parallel execution */ - InstrStartParallelQuery(); + instr = InstrStartParallelQuery(); /* Process indexes to perform vacuum/cleanup */ parallel_vacuum_process_safe_indexes(&pvs); @@ -1091,7 +1092,8 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) /* Report buffer/WAL usage during parallel execution */ buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false); wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], + InstrEndParallelQuery(instr, + &buffer_usage[ParallelWorkerNumber], &wal_usage[ParallelWorkerNumber]); /* Report any remaining cost-based vacuum delay time */ diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index e87810d292e..061c6a4aa69 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -1434,6 +1434,7 @@ void ParallelQueryMain(dsm_segment *seg, shm_toc *toc) { FixedParallelExecutorState *fpes; + Instrumentation *instr; BufferUsage *buffer_usage; WalUsage *wal_usage; DestReceiver *receiver; @@ -1494,7 +1495,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) * leader, which also doesn't count buffer accesses and WAL activity that * occur during executor startup. */ - InstrStartParallelQuery(); + instr = InstrStartParallelQuery(); /* * Run the plan. If we specified a tuple bound, be careful not to demand @@ -1510,7 +1511,8 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Report buffer/WAL usage during parallel execution. */ buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], + InstrEndParallelQuery(instr, + &buffer_usage[ParallelWorkerNumber], &wal_usage[ParallelWorkerNumber]); /* Report instrumentation data if any instrumentation options are set. */ diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c index 37055d01f61..02c33b7dead 100644 --- a/src/backend/executor/instrument.c +++ b/src/backend/executor/instrument.c @@ -20,10 +20,8 @@ #include "utils/resowner.h" BufferUsage pgBufferUsage; -static BufferUsage save_pgBufferUsage; WalUsage pgWalUsage; InstrStack *pgInstrStack = NULL; -static WalUsage save_pgWalUsage; static void BufferUsageAdd(BufferUsage *dst, const BufferUsage *add); static void WalUsageAdd(WalUsage *dst, WalUsage *add); @@ -400,21 +398,22 @@ InstrAggNode(NodeInstrumentation * dst, NodeInstrumentation * add) } /* start instrumentation during parallel executor startup */ -void +Instrumentation * InstrStartParallelQuery(void) { - save_pgBufferUsage = pgBufferUsage; - save_pgWalUsage = pgWalUsage; + Instrumentation *instr = InstrAlloc(1, INSTRUMENT_BUFFERS | INSTRUMENT_WAL); + + InstrStart(instr); + return instr; } /* report usage after parallel executor shutdown */ void -InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage) +InstrEndParallelQuery(Instrumentation *instr, BufferUsage *bufusage, WalUsage *walusage) { - memset(bufusage, 0, sizeof(BufferUsage)); - BufferUsageAccumDiff(bufusage, &pgBufferUsage, &save_pgBufferUsage); - memset(walusage, 0, sizeof(WalUsage)); - WalUsageAccumDiff(walusage, &pgWalUsage, &save_pgWalUsage); + InstrStop(instr, true); + memcpy(bufusage, &instr->stack->bufusage, sizeof(BufferUsage)); + memcpy(walusage, &instr->stack->walusage, sizeof(WalUsage)); } /* accumulate work done by workers in leader's stats */ diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h index 30d81fceaaa..adcbc75a757 100644 --- a/src/include/executor/instrument.h +++ b/src/include/executor/instrument.h @@ -186,8 +186,8 @@ extern void InstrUpdateTupleCount(NodeInstrumentation * instr, double nTuples); extern void InstrEndLoop(NodeInstrumentation * instr); extern void InstrAggNode(NodeInstrumentation * dst, NodeInstrumentation * add); -extern void InstrStartParallelQuery(void); -extern void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage); +extern Instrumentation *InstrStartParallelQuery(void); +extern void InstrEndParallelQuery(Instrumentation *instr, BufferUsage *bufusage, WalUsage *walusage); extern void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage); extern void BufferUsageAccumDiff(BufferUsage *dst, const BufferUsage *add, const BufferUsage *sub); -- 2.47.1