From 0d37a829e768772ef3e9c080f96333e24cdd43b7 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Sun, 26 Mar 2023 00:44:01 +0100
Subject: [PATCH 1/2] parallel CREATE INDEX for BRIN

---
 src/backend/access/brin/brin.c        | 714 +++++++++++++++++++++++++-
 src/backend/access/transam/parallel.c |   4 +
 src/backend/catalog/index.c           |   3 +-
 src/include/access/brin.h             |   3 +
 4 files changed, 719 insertions(+), 5 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 3c6a956eaa3..13d94931efc 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -31,8 +31,10 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
+#include "storage/buffile.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
+#include "tcop/tcopprot.h"		/* pgrminclude ignore */
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/datum.h"
@@ -41,6 +43,98 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
+/* Magic numbers for parallel state sharing */
+#define PARALLEL_KEY_BRIN_SHARED		UINT64CONST(0xA000000000000001)
+#define PARALLEL_KEY_QUERY_TEXT			UINT64CONST(0xA000000000000002)
+#define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xA000000000000003)
+#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xA000000000000004)
+
+
+/*
+ * Status for index builds performed in parallel.  This is allocated in a
+ * dynamic shared memory segment.
+ */
+typedef struct BrinShared
+{
+	/*
+	 * These fields are not modified during the build.  They primarily exist for
+	 * the benefit of worker processes that need to create state corresponding
+	 * to that used by the leader.
+	 */
+	Oid			heaprelid;
+	Oid			indexrelid;
+	bool		isconcurrent;
+	BlockNumber	pagesPerRange;
+
+	/*
+	 * workersdonecv is used to monitor the progress of workers.  All parallel
+	 * participants must indicate that they are done before leader can use
+	 * results built by the workers (and before leader can write the data into
+	 * the index).
+	 */
+	ConditionVariable workersdonecv;
+
+	/* Used to pass built BRIN tuples from workers to leader (for insert). */
+	SharedFileSet	fileset;
+
+	/*
+	 * mutex protects all fields before heapdesc.
+	 *
+	 * These fields contain status information of interest to BRIN index
+	 * builds that must work just the same when an index is built in parallel.
+	 */
+	slock_t		mutex;
+
+	/*
+	 * 
+	 */
+	int			last_worker_id;
+	BlockNumber	next_range;
+	BlockNumber	last_range;
+
+	/*
+	 * Mutable state that is maintained by workers, and reported back to
+	 * leader at end of the scans.
+	 *
+	 * nparticipantsdone is number of worker processes finished.
+	 *
+	 * reltuples is the total number of input heap tuples.
+	 *
+	 * indtuples is the total number of tuples that made it into the index.
+	 */
+	int			nparticipantsdone;
+	double		reltuples;
+	double		indtuples;
+} BrinShared;
+
+/*
+ * Status for leader in parallel index build.
+ */
+typedef struct BrinLeader
+{
+	/* parallel context itself */
+	ParallelContext *pcxt;
+
+	/*
+	 * nparticipantworkers is the exact number of worker processes successfully
+	 * launched, plus one leader process if it participates as a worker (only
+	 * DISABLE_LEADER_PARTICIPATION builds avoid leader participating as a
+	 * worker).
+	 */
+	int			nparticipantworkers;
+
+	/*
+	 * Leader process convenience pointers to shared state (leader avoids TOC
+	 * lookups).
+	 *
+	 * brinshared is the shared state for entire build.  snapshot is the snapshot
+	 * used by the scan iff an MVCC snapshot is required.
+	 */
+	BrinShared	   *brinshared;
+	Snapshot	snapshot;
+	WalUsage   *walusage;
+	BufferUsage *bufferusage;
+} BrinLeader;
 
 /*
  * We use a BrinBuildState during initial construction of a BRIN index.
@@ -50,12 +144,22 @@ typedef struct BrinBuildState
 {
 	Relation	bs_irel;
 	int			bs_numtuples;
+	int			bs_reltuples;
 	Buffer		bs_currentInsertBuf;
 	BlockNumber bs_pagesPerRange;
 	BlockNumber bs_currRangeStart;
 	BrinRevmap *bs_rmAccess;
 	BrinDesc   *bs_bdesc;
 	BrinMemTuple *bs_dtuple;
+
+	/*
+	 * bs_leader is only present when a parallel index build is performed, and
+	 * only in the leader process. (Actually, only the leader has a
+	 * BrinBuildState.)
+	 */
+	BrinLeader *bs_leader;
+	int			bs_worker_id;
+	BufFile	   *bs_file;
 } BrinBuildState;
 
 /*
@@ -76,6 +180,7 @@ static void terminate_brin_buildstate(BrinBuildState *state);
 static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
 						  bool include_partial, double *numSummarized, double *numExisting);
 static void form_and_insert_tuple(BrinBuildState *state);
+static void form_and_spill_tuple(BrinBuildState *state);
 static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
 						 BrinTuple *b);
 static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
@@ -83,6 +188,18 @@ static bool add_values_to_range(Relation idxRel, BrinDesc *bdesc,
 								BrinMemTuple *dtup, Datum *values, bool *nulls);
 static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);
 
+/* parallel index builds */
+static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
+								 bool isconcurrent, int request);
+static void _brin_end_parallel(BrinLeader *btleader, BrinBuildState *state);
+static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
+static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
+											   Relation heap, Relation index);
+static void _brin_parallel_scan_and_build(BrinBuildState *buildstate,
+										  BrinShared *brinshared,
+										  Relation heap, Relation index,
+										  int sortmem, bool progress);
+
 /*
  * BRIN handler function: return IndexAmRoutine with access method parameters
  * and callbacks.
@@ -822,6 +939,54 @@ brinbuildCallback(Relation index,
 							   values, isnull);
 }
 
+/*
+ * A version of the callback, used by parallel index builds. The main difference
+ * is that instead of writing the BRIN tuples into the index, we write them into
+ * a temporary file, and leave the insertion up to the leader (which may reorder
+ * them a bit etc.).
+ */
+static void
+brinbuildCallbackParallel(Relation index,
+						  ItemPointer tid,
+						  Datum *values,
+						  bool *isnull,
+						  bool tupleIsAlive,
+						  void *brstate)
+{
+	BrinBuildState *state = (BrinBuildState *) brstate;
+	BlockNumber thisblock;
+
+	thisblock = ItemPointerGetBlockNumber(tid);
+
+	/*
+	 * If we're in a block that belongs to a future range, summarize what
+	 * we've got and start afresh.  Note the scan might have skipped many
+	 * pages, if they were devoid of live tuples; make sure to insert index
+	 * tuples for those too.
+	 */
+	while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
+	{
+
+		BRIN_elog((DEBUG2,
+				   "brinbuildCallback: completed a range: %u--%u",
+				   state->bs_currRangeStart,
+				   state->bs_currRangeStart + state->bs_pagesPerRange));
+
+		/* create the index tuple and write it into the temporary file */
+		form_and_spill_tuple(state);
+
+		/* set state to correspond to the next range */
+		state->bs_currRangeStart += state->bs_pagesPerRange;
+
+		/* re-initialize state for it */
+		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
+	}
+
+	/* Accumulate the current tuple into the running state */
+	(void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
+							   values, isnull);
+}
+
 /*
  * brinbuild() -- build a new BRIN index.
  */
@@ -883,18 +1048,46 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
 	state = initialize_brin_buildstate(index, revmap, pagesPerRange);
 
+	/*
+	 * Attempt to launch parallel worker scan when required
+	 *
+	 * XXX plan_create_index_workers makes the number of workers dependent on
+	 * maintenance_work_mem, requiring 32MB for each worker. That makes sense
+	 * for btree, but not for BRIN, which can do away with much less memory.
+	 * So maybe make that somehow less strict, optionally?
+	 */
+	if (indexInfo->ii_ParallelWorkers > 0)
+		_brin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent,
+							 indexInfo->ii_ParallelWorkers);
+
 	/*
 	 * Now scan the relation.  No syncscan allowed here because we want the
 	 * heap blocks in physical order.
 	 */
-	reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
-									   brinbuildCallback, (void *) state, NULL);
 
-	/* process the final batch */
-	form_and_insert_tuple(state);
+	/* no parallel index build, just do the usual thing */
+	if (state->bs_leader == NULL)
+	{
+		reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
+										   brinbuildCallback, (void *) state, NULL);
+
+		/* process the final batch */
+		form_and_insert_tuple(state);
+
+		/* track the number of relation tuples */
+		state->bs_reltuples = reltuples;
+	}
+
+	/*
+	 * In parallel mode, wait for workers to complete, and then read
+	 * the tuples from the temporary files and insert them into the index.
+	 */
+	if (state->bs_leader)
+		_brin_end_parallel(state->bs_leader, state);
 
 	/* release resources */
 	idxtuples = state->bs_numtuples;
+	reltuples = state->bs_reltuples;
 	brinRevmapTerminate(state->bs_rmAccess);
 	terminate_brin_buildstate(state);
 
@@ -1299,12 +1492,15 @@ initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
 
 	state->bs_irel = idxRel;
 	state->bs_numtuples = 0;
+	state->bs_reltuples = 0;
 	state->bs_currentInsertBuf = InvalidBuffer;
 	state->bs_pagesPerRange = pagesPerRange;
 	state->bs_currRangeStart = 0;
 	state->bs_rmAccess = revmap;
 	state->bs_bdesc = brin_build_desc(idxRel);
 	state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
+	state->bs_leader = NULL;
+	state->bs_worker_id = 0;
 
 	return state;
 }
@@ -1597,6 +1793,31 @@ form_and_insert_tuple(BrinBuildState *state)
 	pfree(tup);
 }
 
+/*
+ * Given a deformed tuple in the build state, convert it into the on-disk
+ * format and write it to a temporary file (leader will insert it into the
+ * index later).
+ */
+static void
+form_and_spill_tuple(BrinBuildState *state)
+{
+	BrinTuple  *tup;
+	Size		size;
+
+	Assert(state->bs_file);
+
+	tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
+						  state->bs_dtuple, &size);
+
+	BufFileWrite(state->bs_file, &size, sizeof(Size));
+	BufFileWrite(state->bs_file, &state->bs_currRangeStart, sizeof(BlockNumber));
+	BufFileWrite(state->bs_file, tup, size);
+
+	state->bs_numtuples++;
+
+	pfree(tup);
+}
+
 /*
  * Given two deformed tuples, adjust the first one so that it's consistent
  * with the summary values in both.
@@ -1916,3 +2137,488 @@ check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
 
 	return true;
 }
+
+static void
+_brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
+					 bool isconcurrent, int request)
+{
+	ParallelContext *pcxt;
+	Snapshot	snapshot;
+	Size		estbrinshared;
+	BrinShared   *brinshared;
+	BrinLeader   *brinleader = (BrinLeader *) palloc0(sizeof(BrinLeader));
+	WalUsage   *walusage;
+	BufferUsage *bufferusage;
+	bool		leaderparticipates = true;
+	int			querylen;
+
+#ifdef DISABLE_LEADER_PARTICIPATION
+	leaderparticipates = false;
+#endif
+
+	/*
+	 * Enter parallel mode, and create context for parallel build of brin
+	 * index
+	 */
+	EnterParallelMode();
+	Assert(request > 0);
+	pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main",
+								 request);
+
+	/*
+	 * Prepare for scan of the base relation.  In a normal index build, we use
+	 * SnapshotAny because we must retrieve all tuples and do our own time
+	 * qual checks (because we have to index RECENTLY_DEAD tuples).  In a
+	 * concurrent build, we take a regular MVCC snapshot and index whatever's
+	 * live according to that.
+	 */
+	if (!isconcurrent)
+		snapshot = SnapshotAny;
+	else
+		snapshot = RegisterSnapshot(GetTransactionSnapshot());
+
+	/*
+	 * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
+	 */
+	estbrinshared = _brin_parallel_estimate_shared(heap, snapshot);
+	shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared);
+
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/*
+	 * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
+	 * and PARALLEL_KEY_BUFFER_USAGE.
+	 *
+	 * If there are no extensions loaded that care, we could skip this.  We
+	 * have no way of knowing whether anyone's looking at pgWalUsage or
+	 * pgBufferUsage, so do it unconditionally.
+	 */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
+	if (debug_query_string)
+	{
+		querylen = strlen(debug_query_string);
+		shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+	else
+		querylen = 0;			/* keep compiler quiet */
+
+	/* Everyone's had a chance to ask for space, so now create the DSM */
+	InitializeParallelDSM(pcxt);
+
+	/* If no DSM segment was available, back out (do serial build) */
+	if (pcxt->seg == NULL)
+	{
+		if (IsMVCCSnapshot(snapshot))
+			UnregisterSnapshot(snapshot);
+		DestroyParallelContext(pcxt);
+		ExitParallelMode();
+		return;
+	}
+
+	/* Store shared build state, for which we reserved space */
+	brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared);
+	/* Initialize immutable state */
+	brinshared->heaprelid = RelationGetRelid(heap);
+	brinshared->indexrelid = RelationGetRelid(index);
+	brinshared->isconcurrent = isconcurrent;
+	brinshared->pagesPerRange = buildstate->bs_pagesPerRange;
+	ConditionVariableInit(&brinshared->workersdonecv);
+	SpinLockInit(&brinshared->mutex);
+
+	/* */
+	SharedFileSetInit(&brinshared->fileset, pcxt->seg);
+
+	/* Initialize mutable state */
+	brinshared->nparticipantsdone = 0;
+	brinshared->reltuples = 0.0;
+	brinshared->indtuples = 0.0;
+
+	/* Track work assigned to workers etc. */
+	brinshared->last_worker_id = 0;
+	brinshared->next_range = 0;
+	brinshared->last_range = RelationGetNumberOfBlocks(heap);
+
+	/*
+	 * Store shared tuplesort-private state, for which we reserved space.
+	 * Then, initialize opaque state using tuplesort routine.
+	 */
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BRIN_SHARED, brinshared);
+
+	/* Store query string for workers */
+	if (debug_query_string)
+	{
+		char	   *sharedquery;
+
+		sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
+		memcpy(sharedquery, debug_query_string, querylen + 1);
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
+	}
+
+	/*
+	 * Allocate space for each worker's WalUsage and BufferUsage; no need to
+	 * initialize.
+	 */
+	walusage = shm_toc_allocate(pcxt->toc,
+								mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
+	bufferusage = shm_toc_allocate(pcxt->toc,
+								   mul_size(sizeof(BufferUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
+
+	/* Launch workers, saving status for leader/caller */
+	LaunchParallelWorkers(pcxt);
+	brinleader->pcxt = pcxt;
+	brinleader->nparticipantworkers = pcxt->nworkers_launched;
+	if (leaderparticipates)
+		brinleader->nparticipantworkers++;
+	brinleader->brinshared = brinshared;
+	brinleader->snapshot = snapshot;
+	brinleader->walusage = walusage;
+	brinleader->bufferusage = bufferusage;
+
+	/* If no workers were successfully launched, back out (do serial build) */
+	if (pcxt->nworkers_launched == 0)
+	{
+		_brin_end_parallel(brinleader, NULL);
+		return;
+	}
+
+	/* Save leader state now that it's clear build will be parallel */
+	buildstate->bs_leader = brinleader;
+
+	/* Join heap scan ourselves */
+	if (leaderparticipates)
+		_brin_leader_participate_as_worker(buildstate, heap, index);
+
+	/*
+	 * Caller needs to wait for all launched workers when we return.  Make
+	 * sure that the failure-to-start case will not hang forever.
+	 */
+	WaitForParallelWorkersToAttach(pcxt);
+}
+
+/*
+ * Shut down workers, destroy parallel context, and end parallel mode.
+ */
+static void
+_brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
+{
+	int			i;
+	BrinShared *brinshared = brinleader->brinshared;
+
+	/* Shutdown worker processes */
+	WaitForParallelWorkersToFinish(brinleader->pcxt);
+
+	if (!state)
+		return;
+
+	/* copy the data into leader state (we have to wait for the workers ) */
+	state->bs_reltuples = brinshared->reltuples;
+	state->bs_numtuples = brinshared->indtuples;
+
+	/*
+	 * XXX maybe we should sort the ranges by rangeStart? That'd give us index
+	 * that is cheaper to walk sequentially, because we'd not have any page
+	 * misses (mostly getting data from the same page as before). Although the
+	 * index should be pretty small in general, and thus cached. OTOH each
+	 * worker should produce tuples in the right order, so we could just merge
+	 * sort them.
+	 *
+	 * XXX Alternatively, we could arrange the build so that the workers read
+	 * a continuous chunk of the table. For example, with K workers we might
+	 * leave the first 1/K to the first worker, then 1/K to the second etc.
+	 * The we would not need to reorder anything, we would just read the
+	 * results for all workers.
+	 *
+	 * XXX That's also mean we don't do many small TID scans, as now.
+	 *
+	 * The problem is we don't know how many workers will be started while
+	 * determining this, but maybe we could postpone that decision somehow?
+	 * We'd have to wait for all the launched workers to attach, I guess.
+	 */
+	for (i = 1; i <= brinshared->last_worker_id; i++)
+	{
+		BufFile	   *f;
+		char		name[MAXPGPATH];
+		int64		fsize;
+		int64		fpos = 0;
+
+		snprintf(name, MAXPGPATH, "tuples.%d", i);
+
+		f = BufFileOpenFileSet(&brinshared->fileset.fs, name, O_RDONLY, false);
+		fsize = BufFileSize(f);
+
+		while (fpos < fsize)
+		{
+			Size		size;
+			BlockNumber	rangeStart;
+			BrinTuple  *tup;
+
+			if (BufFileRead(f, &size, sizeof(Size)) != sizeof(Size))
+				elog(ERROR, "failed read");
+
+			if (BufFileRead(f, &rangeStart, sizeof(BlockNumber)) != sizeof(BlockNumber))
+				elog(ERROR, "failed read");
+
+			tup = (BrinTuple *) palloc(size);
+
+			if (BufFileRead(f, tup, size) != size)
+				elog(ERROR, "failed read");
+
+			brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+						  &state->bs_currentInsertBuf, rangeStart, tup, size);
+
+			fpos += sizeof(Size) + sizeof(BlockNumber) + size;
+		}
+
+		BufFileClose(f);
+	}
+
+	/*
+	 * Next, accumulate WAL usage.  (This must wait for the workers to finish,
+	 * or we might get incomplete data.)
+	 */
+	for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
+		InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
+
+	/* Free last reference to MVCC snapshot, if one was used */
+	if (IsMVCCSnapshot(brinleader->snapshot))
+		UnregisterSnapshot(brinleader->snapshot);
+	DestroyParallelContext(brinleader->pcxt);
+	ExitParallelMode();
+}
+
+/*
+ * Returns size of shared memory required to store state for a parallel
+ * brin index build based on the snapshot its parallel scan will use.
+ */
+static Size
+_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
+{
+	/* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
+	return add_size(BUFFERALIGN(sizeof(BrinShared)),
+					table_parallelscan_estimate(heap, snapshot));
+}
+
+/*
+ * Within leader, participate as a parallel worker.
+ */
+static void
+_brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Relation index)
+{
+	BrinLeader *brinleader = buildstate->bs_leader;
+	int			sortmem;
+
+	/*
+	 * Might as well use reliable figure when doling out maintenance_work_mem
+	 * (when requested number of workers were not launched, this will be
+	 * somewhat higher than it is for other workers).
+	 */
+	sortmem = maintenance_work_mem / brinleader->nparticipantworkers;
+
+	/* Perform work common to all participants */
+	_brin_parallel_scan_and_build(buildstate, brinleader->brinshared,
+								  heap, index, sortmem, true);
+
+	/* insert the last range */
+	form_and_spill_tuple(buildstate);
+
+	BufFileClose(buildstate->bs_file);
+	buildstate->bs_file = NULL;
+}
+
+/*
+ * Perform a worker's portion of a parallel sort.
+ *
+ * This generates a tuplesort for passed btspool, and a second tuplesort
+ * state if a second btspool is need (i.e. for unique index builds).  All
+ * other spool fields should already be set when this is called.
+ *
+ * sortmem is the amount of working memory to use within each worker,
+ * expressed in KBs.
+ *
+ * When this returns, workers are done, and need only release resources.
+ */
+static void
+_brin_parallel_scan_and_build(BrinBuildState *state, BrinShared *brinshared,
+							  Relation heap, Relation index, int sortmem,
+							  bool progress)
+{
+	double		reltuples;
+	IndexInfo  *indexInfo;
+	char		name[MAXPGPATH];
+
+	/* Join parallel scan */
+	indexInfo = BuildIndexInfo(index);
+	indexInfo->ii_Concurrent = brinshared->isconcurrent;
+
+	SpinLockAcquire(&brinshared->mutex);
+	state->bs_worker_id = (++brinshared->last_worker_id);
+	SpinLockRelease(&brinshared->mutex);
+
+	snprintf(name, MAXPGPATH, "tuples.%d", state->bs_worker_id);
+
+	state->bs_file = BufFileCreateFileSet(&brinshared->fileset.fs, name);
+
+	/* Get chunks of the table, do TID Scans and build the ranges */
+	while (true)
+	{
+		TableScanDesc	scan;
+		BlockNumber		startBlock,
+						lastBlock,
+						chunkBlocks;
+		ItemPointerData	mintid,
+						maxtid;
+
+		/*
+		 * Acquire larger chunks of data - this matters especially for low
+		 * pages_per_range settings (e.g. set to 1). Otherwise there would
+		 * be a lot of trashing and overhead with multiple workers.
+		 *
+		 * Not sure where's the sweet spot. Maybe tie this to the prefetching
+		 * too (maintenance_effective_io_concucrrency)?
+		 *
+		 * FIXME The chunkBlocks needs to be a multiple of bs_pagesPerRange.
+		 */
+		chunkBlocks = Max(128, state->bs_pagesPerRange);
+
+		SpinLockAcquire(&brinshared->mutex);
+		startBlock = brinshared->next_range;
+		lastBlock = brinshared->last_range;
+		brinshared->next_range += chunkBlocks;
+		SpinLockRelease(&brinshared->mutex);
+
+		state->bs_currRangeStart = startBlock;
+
+		/* did we reach the end of the heap relation? */
+		if (startBlock > lastBlock)
+			break;
+
+		/* re-initialize state for it */
+		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
+
+		ItemPointerSet(&mintid, startBlock, 0);
+		ItemPointerSet(&maxtid, startBlock + (chunkBlocks - 1),
+					   MaxHeapTuplesPerPage);
+
+		/* start tidscan to read the relevant part of the table */
+		scan = table_beginscan_tidrange(heap, SnapshotAny,	// FIXME which snapshot to use?
+										&mintid, &maxtid);
+
+#ifdef USE_PREFETCH
+		/* do prefetching (this prefetches the whole range. not sure that's good) */
+		for (BlockNumber blkno = startBlock; blkno < startBlock + chunkBlocks; blkno++)
+			PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, blkno);
+#endif
+
+		reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
+										   brinbuildCallbackParallel, state, scan);
+
+		/* spill the last tuple */
+		form_and_spill_tuple(state);
+
+		state->bs_reltuples += reltuples;
+
+		/* set state to invalid range */
+		state->bs_currRangeStart = InvalidBlockNumber;
+	}
+
+	/*
+	 * Done.  Record ambuild statistics.
+	 */
+	SpinLockAcquire(&brinshared->mutex);
+	brinshared->nparticipantsdone++;
+	brinshared->reltuples += state->bs_reltuples;
+	brinshared->indtuples += state->bs_numtuples;
+	SpinLockRelease(&brinshared->mutex);
+
+	/* Notify leader */
+	ConditionVariableSignal(&brinshared->workersdonecv);
+}
+
+/*
+ * Perform work within a launched parallel process.
+ */
+void
+_brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
+{
+	char	   *sharedquery;
+	BrinShared *brinshared;
+	BrinBuildState *buildstate;
+	Relation	heapRel;
+	Relation	indexRel;
+	LOCKMODE	heapLockmode;
+	LOCKMODE	indexLockmode;
+	WalUsage   *walusage;
+	BufferUsage *bufferusage;
+	int			sortmem;
+
+	/*
+	 * The only possible status flag that can be set to the parallel worker is
+	 * PROC_IN_SAFE_IC.
+	 */
+	Assert((MyProc->statusFlags == 0) ||
+		   (MyProc->statusFlags == PROC_IN_SAFE_IC));
+
+	/* Set debug_query_string for individual workers first */
+	sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
+	debug_query_string = sharedquery;
+
+	/* Report the query string from leader */
+	pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
+	/* Look up brin shared state */
+	brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false);
+
+	/* Open relations using lock modes known to be obtained by index.c */
+	if (!brinshared->isconcurrent)
+	{
+		heapLockmode = ShareLock;
+		indexLockmode = AccessExclusiveLock;
+	}
+	else
+	{
+		heapLockmode = ShareUpdateExclusiveLock;
+		indexLockmode = RowExclusiveLock;
+	}
+
+	/* Open relations within worker */
+	heapRel = table_open(brinshared->heaprelid, heapLockmode);
+	indexRel = index_open(brinshared->indexrelid, indexLockmode);
+
+	buildstate = initialize_brin_buildstate(indexRel, NULL, brinshared->pagesPerRange);
+
+	/* Attach to the shared fileset. */
+	SharedFileSetAttach(&brinshared->fileset, seg);
+
+	/* Prepare to track buffer usage during parallel execution */
+	InstrStartParallelQuery();
+
+	/* FIXME tie this to number of participants, somehow */
+	sortmem = maintenance_work_mem / 2;
+	_brin_parallel_scan_and_build(buildstate, brinshared,
+								  heapRel, indexRel, sortmem, false);
+
+	/* insert the last range */
+	form_and_spill_tuple(buildstate);
+
+	BufFileClose(buildstate->bs_file);
+	buildstate->bs_file = NULL;
+
+	/* Report WAL/buffer usage during parallel execution */
+	bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
+	walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+	InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
+						  &walusage[ParallelWorkerNumber]);
+
+	index_close(indexRel, indexLockmode);
+	table_close(heapRel, heapLockmode);
+}
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 2b8bc2f58dd..72086212f47 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -14,6 +14,7 @@
 
 #include "postgres.h"
 
+#include "access/brin.h"
 #include "access/nbtree.h"
 #include "access/parallel.h"
 #include "access/session.h"
@@ -144,6 +145,9 @@ static const struct
 	{
 		"_bt_parallel_build_main", _bt_parallel_build_main
 	},
+	{
+		"_brin_parallel_build_main", _brin_parallel_build_main
+	},
 	{
 		"parallel_vacuum_main", parallel_vacuum_main
 	}
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 352e43d0e61..5bdd025bb25 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2980,7 +2980,8 @@ index_build(Relation heapRelation,
 	 * Note that planner considers parallel safety for us.
 	 */
 	if (parallel && IsNormalProcessingMode() &&
-		indexRelation->rd_rel->relam == BTREE_AM_OID)
+		(indexRelation->rd_rel->relam == BTREE_AM_OID ||
+		 indexRelation->rd_rel->relam == BRIN_AM_OID))
 		indexInfo->ii_ParallelWorkers =
 			plan_create_index_workers(RelationGetRelid(heapRelation),
 									  RelationGetRelid(indexRelation));
diff --git a/src/include/access/brin.h b/src/include/access/brin.h
index ed66f1b3d51..3451ecb211f 100644
--- a/src/include/access/brin.h
+++ b/src/include/access/brin.h
@@ -11,6 +11,7 @@
 #define BRIN_H
 
 #include "nodes/execnodes.h"
+#include "storage/shm_toc.h"
 #include "utils/relcache.h"
 
 
@@ -52,4 +53,6 @@ typedef struct BrinStatsData
 
 extern void brinGetStats(Relation index, BrinStatsData *stats);
 
+extern void _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc);
+
 #endif							/* BRIN_H */
-- 
2.40.1

