From 7699a9d94ca111669e331c24e8aeccf67a6ec4ba Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Sat, 6 May 2023 17:13:31 +1200 Subject: [PATCH v2 1/2] Unlink PHJ temporary files proactively when repartitioning. XXX Draft --- src/backend/executor/nodeHash.c | 32 +++++++++++++++++++++++ src/backend/utils/sort/sharedtuplestore.c | 17 ++++++++++++ src/include/utils/sharedtuplestore.h | 2 ++ 3 files changed, 51 insertions(+) diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index f5d3edb90e2..65668c7d9ad 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -71,6 +71,7 @@ static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable); static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable); static void ExecParallelHashRepartitionRest(HashJoinTable hashtable); +static void ExecParallelHashDeleteOldPartitions(HashJoinTable hashtable); static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared); static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, @@ -1349,6 +1350,9 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) /* Wait for the above to be finished. */ BarrierArriveAndWait(&pstate->grow_batches_barrier, WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION); + /* It's now safe to free the previous partitions on disk. */ + ExecParallelHashDeleteOldPartitions(hashtable); + /* Fall through. */ case PHJ_GROW_BATCHES_DECIDE: @@ -1577,6 +1581,34 @@ ExecParallelHashMergeCounters(HashJoinTable hashtable) LWLockRelease(&pstate->lock); } +/* + * After all attached participants have finished repartitioning, it is safe + * to unlink the files holding the previous generation of batches. 
+ */
+static void
+ExecParallelHashDeleteOldPartitions(HashJoinTable hashtable)
+{
+	ParallelHashJoinState *pstate = hashtable->parallel_state;
+	int			old_nbatch = pstate->old_nbatch;
+	ParallelHashJoinBatch *old_batches;
+
+	old_batches = (ParallelHashJoinBatch *)
+		dsa_get_address(hashtable->area, pstate->old_batches);
+	for (int i = 1; i < old_nbatch; ++i)
+	{
+		ParallelHashJoinBatch *shared =
+			NthParallelHashJoinBatch(old_batches, i);
+		SharedTuplestoreAccessor *accessor;
+
+		accessor = sts_attach(ParallelHashJoinBatchInner(shared),
+							  ParallelWorkerNumber + 1,
+							  &pstate->fileset);
+		sts_dispose(accessor);
+		/* The accessor was needed only to name our file; don't leak it. */
+		pfree(accessor);
+	}
+}
+
 /*
  * ExecHashIncreaseNumBuckets
  *		increase the original number of buckets in order to reduce
diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c
index 8f35a255263..894139752c6 100644
--- a/src/backend/utils/sort/sharedtuplestore.c
+++ b/src/backend/utils/sort/sharedtuplestore.c
@@ -590,6 +590,23 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
 	return NULL;
 }
 
+/*
+ * Unlink the file written by this participant, freeing its disk space.  After
+ * this, no process may read from this SharedTuplestore, so the caller must
+ * know that all readers have finished and will not try to read again.  We
+ * unlink only this backend's own file because the file size limit mechanism
+ * is per-process, and its counters could go negative if one process freed
+ * more than it had consumed.  A file that was never created is not an error.
+ */
+void
+sts_dispose(SharedTuplestoreAccessor *accessor)
+{
+	char		name[MAXPGPATH];
+
+	sts_filename(name, accessor, accessor->participant);
+	BufFileDeleteFileSet(&accessor->fileset->fs, name, true);
+}
+
 /*
  * Create the name used for the BufFile that a given participant will write.
*/ diff --git a/src/include/utils/sharedtuplestore.h b/src/include/utils/sharedtuplestore.h index 48bfea4d583..4ab84e7aa70 100644 --- a/src/include/utils/sharedtuplestore.h +++ b/src/include/utils/sharedtuplestore.h @@ -58,4 +58,6 @@ extern void sts_puttuple(SharedTuplestoreAccessor *accessor, extern MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data); +extern void sts_dispose(SharedTuplestoreAccessor *accessor); + #endif /* SHAREDTUPLESTORE_H */ -- 2.51.2