From 660ee4b9f7ba6c08cc8bc00b18bdbe6c83eb581b Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 6 May 2023 17:13:31 +1200
Subject: [PATCH] Unlink PHJ temporary files proactively when repartitioning.

XXX: Draft — work in progress; commit message and cleanup (see XXX in
ExecParallelHashDeleteOldPartitions) still to be finished before commit.

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 5fd1c5553b..1baf2be815 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -73,6 +73,7 @@ static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch
 static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable);
 static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable);
 static void ExecParallelHashRepartitionRest(HashJoinTable hashtable);
+static void ExecParallelHashDeleteOldPartitions(HashJoinTable hashtable);
 static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable hashtable,
 													 dsa_pointer *shared);
 static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
@@ -1217,6 +1218,9 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
 			/* Wait for the above to be finished. */
 			BarrierArriveAndWait(&pstate->grow_batches_barrier,
 								 WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION);
+			/* It's now safe to free the previous partitions on disk. */
+			ExecParallelHashDeleteOldPartitions(hashtable);
+
 			/* Fall through. */
 
 		case PHJ_GROW_BATCHES_DECIDE:
@@ -1438,6 +1442,34 @@ ExecParallelHashMergeCounters(HashJoinTable hashtable)
 	LWLockRelease(&pstate->lock);
 }
 
+/*
+ * After all attached participants have finished repartitioning (i.e. after
+ * the GROW_BATCHES_REPARTITION barrier), it is safe to unlink the files
+ * holding the previous generation of batches.  Each backend unlinks only
+ * the old-batch files that it created itself; see sts_dispose().
+ */
+static void
+ExecParallelHashDeleteOldPartitions(HashJoinTable hashtable)
+{
+	ParallelHashJoinState *pstate = hashtable->parallel_state;
+	int			old_nbatch = pstate->old_nbatch;
+	ParallelHashJoinBatch *old_batches;
+
+	/* Locate the previous generation's batch array in DSA memory. */
+	old_batches = (ParallelHashJoinBatch *)
+		dsa_get_address(hashtable->area, pstate->old_batches);
+	for (int i = 1; i < old_nbatch; ++i)
+	{
+		ParallelHashJoinBatch *shared =
+		NthParallelHashJoinBatch(old_batches, i);
+		SharedTuplestoreAccessor *accessor;
+
+		accessor = sts_attach(ParallelHashJoinBatchInner(shared),
+							  ParallelWorkerNumber + 1,
+							  &pstate->fileset);
+		sts_dispose(accessor);
+		/* XXX accessor's local memory is leaked here; needs a free API */
+	}
+
+}
+
 /*
  * ExecHashIncreaseNumBuckets
  *		increase the original number of buckets in order to reduce
diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c
index 0831249159..ceff3b3313 100644
--- a/src/backend/utils/sort/sharedtuplestore.c
+++ b/src/backend/utils/sort/sharedtuplestore.c
@@ -584,6 +584,23 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
 	return NULL;
 }
 
+/*
+ * Unlink the files this participant created, freeing their disk space.
+ * After this, data should not be read from this SharedTuplestore by any
+ * process, so the caller must know that all readers have finished and will
+ * not try to read again.  We unlink only files created by this backend
+ * because the temporary-file size limit mechanism is per-process, and its
+ * counters could go negative if one process freed more than it had
+ * consumed.
+ */
+void
+sts_dispose(SharedTuplestoreAccessor *accessor)
+{
+	char		name[MAXPGPATH];
+
+	sts_filename(name, accessor, accessor->participant);
+	BufFileDeleteFileSet(&accessor->fileset->fs, name, true);
+}
+
 /*
  * Create the name used for the BufFile that a given participant will write.
  */
diff --git a/src/include/utils/sharedtuplestore.h b/src/include/utils/sharedtuplestore.h
index c7075ad055..34f7d27198 100644
--- a/src/include/utils/sharedtuplestore.h
+++ b/src/include/utils/sharedtuplestore.h
@@ -58,4 +58,6 @@ extern void sts_puttuple(SharedTuplestoreAccessor *accessor,
 extern MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor,
 										   void *meta_data);
 
+extern void sts_dispose(SharedTuplestoreAccessor *accessor);
+
 #endif							/* SHAREDTUPLESTORE_H */
-- 
2.40.1

