From 5e19887efa57f77b17df45a65a101b84721f665c Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Sat, 8 Apr 2023 16:39:51 +0200
Subject: [PATCH 2/2] switch CREATE INDEX for BRIN to parallel scan

---
 src/backend/access/brin/brin.c      | 110 +++++++++++-----------------
 src/backend/access/nbtree/nbtsort.c |   2 +-
 src/backend/access/table/tableam.c  |  26 +++++--
 src/backend/executor/nodeSeqscan.c  |   3 +-
 src/include/access/relscan.h        |   1 +
 src/include/access/tableam.h        |   9 ++-
 6 files changed, 73 insertions(+), 78 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 13d94931efc..796c0ecf06e 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -85,12 +85,8 @@ typedef struct BrinShared
 	 */
 	slock_t		mutex;
 
-	/*
-	 * 
-	 */
+	/* XXX Identifies the worker; probably not needed. */
 	int			last_worker_id;
-	BlockNumber	next_range;
-	BlockNumber	last_range;
 
 	/*
 	 * Mutable state that is maintained by workers, and reported back to
@@ -105,8 +101,23 @@ typedef struct BrinShared
 	int			nparticipantsdone;
 	double		reltuples;
 	double		indtuples;
+
+	/*
+	 * ParallelTableScanDescData data follows. Can't directly embed here, as
+	 * implementations of the parallel table scan desc interface might need
+	 * stronger alignment.
+	 */
 } BrinShared;
 
+/*
+ * Return pointer to a BrinShared's parallel table scan.
+ *
+ * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just
+ * MAXALIGN.
+ */
+#define ParallelTableScanFromBrinShared(shared) \
+	(ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(BrinShared)))
+
 /*
  * Status for leader in parallel index build.
  */
@@ -975,8 +986,20 @@ brinbuildCallbackParallel(Relation index,
 		/* create the index tuple and write it into the temporary file */
 		form_and_spill_tuple(state);
 
-		/* set state to correspond to the next range */
-		state->bs_currRangeStart += state->bs_pagesPerRange;
+		/*
+		 * Set the state to correspond to the next range.
+		 *
+		 * XXX This has the issue that it skips ranges summarized by other
+		 * workers, but it also skips empty ranges that should have been
+		 * summarized. We'd either need to make the workers aware of which
+		 * chunk they are actually processing (currently known only in the
+		 * ParallelBlockTableScan code), or we could ignore it here and
+		 * decide it while "merging" results from workers (if there's no
+		 * entry for a range, it must have been empty, so we just add an
+		 * empty one).
+		 */
+		while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
+			state->bs_currRangeStart += state->bs_pagesPerRange;
 
 		/* re-initialize state for it */
 		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
@@ -2243,8 +2266,10 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 
 	/* Track work assigned to workers etc. */
 	brinshared->last_worker_id = 0;
-	brinshared->next_range = 0;
-	brinshared->last_range = RelationGetNumberOfBlocks(heap);
+
+	table_parallelscan_initialize(heap,
+								  ParallelTableScanFromBrinShared(brinshared),
+								  snapshot, brinshared->pagesPerRange);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
@@ -2452,6 +2477,7 @@ _brin_parallel_scan_and_build(BrinBuildState *state, BrinShared *brinshared,
 							  Relation heap, Relation index, int sortmem,
 							  bool progress)
 {
+	TableScanDesc	scan;
 	double		reltuples;
 	IndexInfo  *indexInfo;
 	char		name[MAXPGPATH];
@@ -2468,68 +2494,16 @@ _brin_parallel_scan_and_build(BrinBuildState *state, BrinShared *brinshared,
 
 	state->bs_file = BufFileCreateFileSet(&brinshared->fileset.fs, name);
 
-	/* Get chunks of the table, do TID Scans and build the ranges */
-	while (true)
-	{
-		TableScanDesc	scan;
-		BlockNumber		startBlock,
-						lastBlock,
-						chunkBlocks;
-		ItemPointerData	mintid,
-						maxtid;
+	scan = table_beginscan_parallel(heap,
+									ParallelTableScanFromBrinShared(brinshared));
 
-		/*
-		 * Acquire larger chunks of data - this matters especially for low
-		 * pages_per_range settings (e.g. set to 1). Otherwise there would
-		 * be a lot of trashing and overhead with multiple workers.
-		 *
-		 * Not sure where's the sweet spot. Maybe tie this to the prefetching
-		 * too (maintenance_effective_io_concucrrency)?
-		 *
-		 * FIXME The chunkBlocks needs to be a multiple of bs_pagesPerRange.
-		 */
-		chunkBlocks = Max(128, state->bs_pagesPerRange);
-
-		SpinLockAcquire(&brinshared->mutex);
-		startBlock = brinshared->next_range;
-		lastBlock = brinshared->last_range;
-		brinshared->next_range += chunkBlocks;
-		SpinLockRelease(&brinshared->mutex);
-
-		state->bs_currRangeStart = startBlock;
-
-		/* did we reach the end of the heap relation? */
-		if (startBlock > lastBlock)
-			break;
-
-		/* re-initialize state for it */
-		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
+	reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
+									   brinbuildCallbackParallel, state, scan);
 
-		ItemPointerSet(&mintid, startBlock, 0);
-		ItemPointerSet(&maxtid, startBlock + (chunkBlocks - 1),
-					   MaxHeapTuplesPerPage);
+	/* spill the last tuple */
+	form_and_spill_tuple(state);
 
-		/* start tidscan to read the relevant part of the table */
-		scan = table_beginscan_tidrange(heap, SnapshotAny,	// FIXME which snapshot to use?
-										&mintid, &maxtid);
-
-#ifdef USE_PREFETCH
-		/* do prefetching (this prefetches the whole range. not sure that's good) */
-		for (BlockNumber blkno = startBlock; blkno < startBlock + chunkBlocks; blkno++)
-			PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, blkno);
-#endif
-
-		reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
-										   brinbuildCallbackParallel, state, scan);
-
-		/* spill the last tuple */
-		form_and_spill_tuple(state);
-
-		state->bs_reltuples += reltuples;
-
-		/* set state to invalid range */
-		state->bs_currRangeStart = InvalidBlockNumber;
-	}
+	state->bs_reltuples += reltuples;
 
 	/*
 	 * Done.  Record ambuild statistics.
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 6ad3f3c54d5..69ec9d5fe9c 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -1575,7 +1575,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	btshared->brokenhotchain = false;
 	table_parallelscan_initialize(btspool->heap,
 								  ParallelTableScanFromBTShared(btshared),
-								  snapshot);
+								  snapshot, InvalidBlockNumber);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 771438c8cec..834cc9cd7c6 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -153,9 +153,9 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot)
 
 void
 table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan,
-							  Snapshot snapshot)
+							  Snapshot snapshot, BlockNumber chunk_factor)
 {
-	Size		snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan);
+	Size		snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan, chunk_factor);
 
 	pscan->phs_snapshot_off = snapshot_off;
 
@@ -395,16 +395,21 @@ table_block_parallelscan_estimate(Relation rel)
 }
 
 Size
-table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
+table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan, BlockNumber chunk_factor)
 {
 	ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;
 
 	bpscan->base.phs_relid = RelationGetRelid(rel);
 	bpscan->phs_nblocks = RelationGetNumberOfBlocks(rel);
-	/* compare phs_syncscan initialization to similar logic in initscan */
+	bpscan->phs_chunk_factor = chunk_factor;
+	/*
+	 * Compare phs_syncscan initialization to similar logic in initscan.
+	 * Disable sync scans if the chunk factor is set (valid block number).
+	 */
 	bpscan->base.phs_syncscan = synchronize_seqscans &&
 		!RelationUsesLocalBuffers(rel) &&
-		bpscan->phs_nblocks > NBuffers / 4;
+		(bpscan->phs_nblocks > NBuffers / 4) &&
+		!BlockNumberIsValid(bpscan->phs_chunk_factor);
 	SpinLockInit(&bpscan->phs_mutex);
 	bpscan->phs_startblock = InvalidBlockNumber;
 	pg_atomic_init_u64(&bpscan->phs_nallocated, 0);
@@ -459,6 +464,17 @@ table_block_parallelscan_startblock_init(Relation rel,
 	pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size,
 									  PARALLEL_SEQSCAN_MAX_CHUNK_SIZE);
 
+	/*
+	 * If the chunk size factor is set, we need to make sure the chunk size is
+	 * a multiple of that value.
+	 */
+	if (pbscan->phs_chunk_factor != InvalidBlockNumber)
+	{
+		int		nchunks = (pbscanwork->phsw_chunk_size / pbscan->phs_chunk_factor);
+
+		pbscanwork->phsw_chunk_size = Max(1, nchunks) * pbscan->phs_chunk_factor;
+	}
+
 retry:
 	/* Grab the spinlock. */
 	SpinLockAcquire(&pbscan->phs_mutex);
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 4da0f28f7ba..e017f05f780 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -274,7 +274,8 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
 	pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
 	table_parallelscan_initialize(node->ss.ss_currentRelation,
 								  pscan,
-								  estate->es_snapshot);
+								  estate->es_snapshot,
+								  InvalidBlockNumber);
 	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
 	node->ss.ss_currentScanDesc =
 		table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index d03360eac04..72a20d882f5 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -79,6 +79,7 @@ typedef struct ParallelBlockTableScanDescData
 	BlockNumber phs_nblocks;	/* # blocks in relation at start of scan */
 	slock_t		phs_mutex;		/* mutual exclusion for setting startblock */
 	BlockNumber phs_startblock; /* starting block number */
+	BlockNumber phs_chunk_factor; /* chunks need to be a multiple of this */
 	pg_atomic_uint64 phs_nallocated;	/* number of blocks allocated to
 										 * workers so far. */
 }			ParallelBlockTableScanDescData;
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 230bc39cc0e..82297546e27 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -390,7 +390,8 @@ typedef struct TableAmRoutine
 	 * relation.
 	 */
 	Size		(*parallelscan_initialize) (Relation rel,
-											ParallelTableScanDesc pscan);
+											ParallelTableScanDesc pscan,
+											BlockNumber chunk_factor);
 
 	/*
 	 * Reinitialize `pscan` for a new scan. `rel` will be the same relation as
@@ -1148,7 +1149,8 @@ extern Size table_parallelscan_estimate(Relation rel, Snapshot snapshot);
  */
 extern void table_parallelscan_initialize(Relation rel,
 										  ParallelTableScanDesc pscan,
-										  Snapshot snapshot);
+										  Snapshot snapshot,
+										  BlockNumber chunk_factor);
 
 /*
  * Begin a parallel scan. `pscan` needs to have been initialized with
@@ -2064,7 +2066,8 @@ extern void simple_table_tuple_update(Relation rel, ItemPointer otid,
 
 extern Size table_block_parallelscan_estimate(Relation rel);
 extern Size table_block_parallelscan_initialize(Relation rel,
-												ParallelTableScanDesc pscan);
+												ParallelTableScanDesc pscan,
+												BlockNumber chunk_factor);
 extern void table_block_parallelscan_reinitialize(Relation rel,
 												  ParallelTableScanDesc pscan);
 extern BlockNumber table_block_parallelscan_nextpage(Relation rel,
-- 
2.40.1

