From 3e5fabc6e5a30bb16ec11fdc7fcf65880e172d0c Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Tue, 7 Nov 2023 17:04:28 +0100
Subject: [PATCH v3 1/4] parallel CREATE INDEX for BRIN v2

---
 src/backend/access/brin/brin.c             | 781 ++++++++++++++++++++-
 src/backend/access/nbtree/nbtsort.c        |   2 +-
 src/backend/access/table/tableam.c         |  49 +-
 src/backend/access/transam/parallel.c      |   4 +
 src/backend/catalog/index.c                |   3 +-
 src/backend/executor/nodeSeqscan.c         |   3 +-
 src/backend/utils/sort/tuplesort.c         |   3 +
 src/backend/utils/sort/tuplesortvariants.c | 211 ++++++
 src/include/access/brin.h                  |   3 +
 src/include/access/relscan.h               |   1 +
 src/include/access/tableam.h               |   9 +-
 src/include/utils/tuplesort.h              |  11 +
 12 files changed, 1065 insertions(+), 15 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 25338a90e29..b7cd29c5968 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -33,6 +33,7 @@
 #include "postmaster/autovacuum.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
+#include "tcop/tcopprot.h"		/* pgrminclude ignore */
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/datum.h"
@@ -40,7 +41,118 @@
 #include "utils/index_selfuncs.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
+#include "utils/tuplesort.h"
 
+/* Magic numbers for parallel state sharing */
+#define PARALLEL_KEY_BRIN_SHARED		UINT64CONST(0xA000000000000001)
+#define PARALLEL_KEY_TUPLESORT			UINT64CONST(0xA000000000000002)
+#define PARALLEL_KEY_QUERY_TEXT			UINT64CONST(0xA000000000000003)
+#define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xA000000000000004)
+#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xA000000000000005)
+
+/*
+ * Status record for spooling/sorting phase.
+ */
+typedef struct BrinSpool
+{
+	Tuplesortstate *sortstate;	/* state data for tuplesort.c */
+	Relation	heap;
+	Relation	index;
+} BrinSpool;
+
+/*
+ * Status for index builds performed in parallel.  This is allocated in a
+ * dynamic shared memory segment.
+ */
+typedef struct BrinShared
+{
+	/*
+	 * These fields are not modified during the build.  They primarily exist for
+	 * the benefit of worker processes that need to create state corresponding
+	 * to that used by the leader.
+	 */
+	Oid			heaprelid;
+	Oid			indexrelid;
+	bool		isconcurrent;
+	BlockNumber	pagesPerRange;
+	int			scantuplesortstates;
+
+	/*
+	 * workersdonecv is used to monitor the progress of workers.  All parallel
+	 * participants must indicate that they are done before leader can use
+	 * results built by the workers (and before leader can write the data into
+	 * the index).
+	 */
+	ConditionVariable workersdonecv;
+
+	/*
+	 * mutex protects all mutable fields that follow it in this struct.
+	 *
+	 * These fields contain status information of interest to BRIN index
+	 * builds that must work just the same when an index is built in parallel.
+	 */
+	slock_t		mutex;
+
+	/*
+	 * Mutable state that is maintained by workers, and reported back to
+	 * leader at end of the scans.
+	 *
+	 * nparticipantsdone is number of worker processes finished.
+	 *
+	 * reltuples is the total number of input heap tuples.
+	 *
+	 * indtuples is the total number of tuples that made it into the index.
+	 */
+	int			nparticipantsdone;
+	double		reltuples;
+	double		indtuples;
+
+	/*
+	 * ParallelTableScanDescData data follows. Can't directly embed here, as
+	 * implementations of the parallel table scan desc interface might need
+	 * stronger alignment.
+	 */
+} BrinShared;
+
+/*
+ * Return pointer to a BrinShared's parallel table scan.
+ *
+ * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just
+ * MAXALIGN.
+ */
+#define ParallelTableScanFromBrinShared(shared) \
+	(ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(BrinShared)))
+
+/*
+ * Status for leader in parallel index build.
+ */
+typedef struct BrinLeader
+{
+	/* parallel context itself */
+	ParallelContext *pcxt;
+
+	/*
+	 * nparticipanttuplesorts is the exact number of worker processes
+	 * successfully launched, plus one leader process if it participates as a
+	 * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader
+	 * participating as a worker).
+	 */
+	int			nparticipanttuplesorts;
+
+	/*
+	 * Leader process convenience pointers to shared state (leader avoids TOC
+	 * lookups).
+	 *
+	 * brinshared is the shared state for entire build.  sharedsort is the
+	 * shared, tuplesort-managed state passed to each process tuplesort.
+	 * snapshot is the snapshot used by the scan iff an MVCC snapshot is required.
+	 */
+	BrinShared	   *brinshared;
+	Sharedsort *sharedsort;
+	Snapshot	snapshot;
+	WalUsage   *walusage;
+	BufferUsage *bufferusage;
+} BrinLeader;
 
 /*
  * We use a BrinBuildState during initial construction of a BRIN index.
@@ -50,12 +162,23 @@ typedef struct BrinBuildState
 {
 	Relation	bs_irel;
 	int			bs_numtuples;
+	double		bs_reltuples;	/* heap tuples scanned; double like reltuples */
 	Buffer		bs_currentInsertBuf;
 	BlockNumber bs_pagesPerRange;
 	BlockNumber bs_currRangeStart;
 	BrinRevmap *bs_rmAccess;
 	BrinDesc   *bs_bdesc;
 	BrinMemTuple *bs_dtuple;
+
+	/*
+	 * bs_leader is only present when a parallel index build is performed, and
+	 * only in the leader process.  Workers build their own BrinBuildState,
+	 * with bs_leader left NULL.
+	 */
+	BrinLeader *bs_leader;
+	int			bs_worker_id;
+	BrinSpool  *bs_spool;		/* this participant's tuplesort spool */
+	BrinSpool  *bs_spool_out;	/* leader's spool used to read merged output */
 } BrinBuildState;
 
 /*
@@ -76,6 +199,7 @@ static void terminate_brin_buildstate(BrinBuildState *state);
 static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
 						  bool include_partial, double *numSummarized, double *numExisting);
 static void form_and_insert_tuple(BrinBuildState *state);
+static void form_and_spill_tuple(BrinBuildState *state);
 static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
 						 BrinTuple *b);
 static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
@@ -83,6 +207,20 @@ static bool add_values_to_range(Relation idxRel, BrinDesc *bdesc,
 								BrinMemTuple *dtup, const Datum *values, const bool *nulls);
 static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);
 
+/* parallel index builds (parameter names match the definitions below) */
+static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
+								 bool isconcurrent, int request);
+static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
+static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
+static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
+											   Relation heap, Relation index);
+static void _brin_parallel_scan_and_build(BrinBuildState *buildstate,
+										  BrinSpool *brinspool,
+										  BrinShared *brinshared,
+										  Sharedsort *sharedsort,
+										  Relation heap, Relation index,
+										  int sortmem, bool progress);
+
 /*
  * BRIN handler function: return IndexAmRoutine with access method parameters
  * and callbacks.
@@ -820,6 +958,67 @@ brinbuildCallback(Relation index,
 							   values, isnull);
 }
 
+/*
+ * A version of the callback, used by parallel index builds. The main difference
+ * is that instead of writing the BRIN tuples into the index, we write them into
+ * a shared tuplesort, and leave the insertion up to the leader (which may
+ * reorder them a bit etc.). The callback also does not generate empty ranges,
+ * those may be added by the leader when merging results from workers.
+ */
+static void
+brinbuildCallbackParallel(Relation index,
+						  ItemPointer tid,
+						  Datum *values,
+						  bool *isnull,
+						  bool tupleIsAlive,
+						  void *brstate)
+{
+	BrinBuildState *state = (BrinBuildState *) brstate;
+	BlockNumber thisblock;
+
+	thisblock = ItemPointerGetBlockNumber(tid);
+
+	/*
+	 * If we're in a block that belongs to a future range, summarize what
+	 * we've got and start afresh.  Note the scan might have skipped many
+	 * pages, if they were devoid of live tuples; no tuples are produced for
+	 * the skipped ranges here (see the XXX note below).
+	 */
+	while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
+	{
+
+		BRIN_elog((DEBUG2,
+				   "brinbuildCallback: completed a range: %u--%u",
+				   state->bs_currRangeStart,
+				   state->bs_currRangeStart + state->bs_pagesPerRange));
+
+		/* create the index tuple and write it into the tuplesort */
+		form_and_spill_tuple(state);
+
+		/*
+		 * set state to correspond to the next range
+		 *
+		 * XXX This has the issue that it skips ranges summarized by other
+		 * workers, but it also skips empty ranges that should have been
+		 * summarized. We'd need to either make the workers aware which
+		 * chunk they are actually processing (which is currently known
+		 * only in the ParallelBlockTableScan bit). Or we could ignore it
+		 * here, and then decide it while "merging" results from workers
+		 * (if there's no entry for the range, it had to be empty so we
+		 * just add an empty one).
+		 */
+		while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
+			state->bs_currRangeStart += state->bs_pagesPerRange;
+
+		/* re-initialize state for it */
+		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
+	}
+
+	/* Accumulate the current tuple into the running state */
+	(void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
+							   values, isnull);
+}
+
 /*
  * brinbuild() -- build a new BRIN index.
  */
@@ -881,18 +1080,93 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	revmap = brinRevmapInitialize(index, &pagesPerRange);
 	state = initialize_brin_buildstate(index, revmap, pagesPerRange);
 
+	state->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
+	state->bs_spool->heap = heap;
+	state->bs_spool->index = index;
+
+	/*
+	 * Attempt to launch parallel worker scan when required
+	 *
+	 * XXX plan_create_index_workers makes the number of workers dependent on
+	 * maintenance_work_mem, requiring 32MB for each worker. That makes sense
+	 * for btree, but not for BRIN, which can do away with much less memory.
+	 * So maybe make that somehow less strict, optionally?
+	 */
+	if (indexInfo->ii_ParallelWorkers > 0)
+		_brin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent,
+							 indexInfo->ii_ParallelWorkers);
+
 	/*
 	 * Now scan the relation.  No syncscan allowed here because we want the
 	 * heap blocks in physical order.
 	 */
-	reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
-									   brinbuildCallback, (void *) state, NULL);
 
-	/* process the final batch */
-	form_and_insert_tuple(state);
+	/*
+	 * If parallel build requested and at least one worker process was
+	 * successfully launched, set up coordination state
+	 */
+	if (state->bs_leader)
+	{
+		SortCoordinate coordinate;
+
+		coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
+		coordinate->isWorker = false;
+		coordinate->nParticipants =
+			state->bs_leader->nparticipanttuplesorts;
+		coordinate->sharedsort = state->bs_leader->sharedsort;
+
+
+		/*
+		 * Begin serial/leader tuplesort.
+		 *
+		 * In cases where parallelism is involved, the leader receives the same
+		 * share of maintenance_work_mem as a serial sort (it is generally treated
+		 * in the same way as a serial sort once we return).  Parallel worker
+		 * Tuplesortstates will have received only a fraction of
+		 * maintenance_work_mem, though.
+		 *
+		 * We rely on the lifetime of the Leader Tuplesortstate almost not
+		 * overlapping with any worker Tuplesortstate's lifetime.  There may be
+		 * some small overlap, but that's okay because we rely on leader
+		 * Tuplesortstate only allocating a small, fixed amount of memory here.
+		 * When its tuplesort_performsort() is called (by our caller), and
+		 * significant amounts of memory are likely to be used, all workers must
+		 * have already freed almost all memory held by their Tuplesortstates
+		 * (they are about to go away completely, too).  The overall effect is
+		 * that maintenance_work_mem always represents an absolute high watermark
+		 * on the amount of memory used by a CREATE INDEX operation, regardless of
+		 * the use of parallelism or any other factor.
+		 */
+		state->bs_spool_out = (BrinSpool *) palloc0(sizeof(BrinSpool));
+		state->bs_spool_out->heap = heap;
+		state->bs_spool_out->index = index;
+
+		state->bs_spool_out->sortstate =
+			tuplesort_begin_index_brin(heap, index,
+									   maintenance_work_mem, coordinate,
+									   TUPLESORT_NONE);
+
+		/*
+		 * In parallel mode, wait for workers to complete, and then read all
+		 * tuples from the shared tuplesort and insert them into the index.
+		 */
+		_brin_end_parallel(state->bs_leader, state);
+	}
+	else	/* no parallel index build, just do the usual thing */
+	{
+		reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
+										   brinbuildCallback, (void *) state, NULL);
+
+		/* process the final batch */
+		form_and_insert_tuple(state);
+
+		/* track the number of relation tuples */
+		state->bs_reltuples = reltuples;
+	}
 
 	/* release resources */
 	idxtuples = state->bs_numtuples;
+	reltuples = state->bs_reltuples;
 	brinRevmapTerminate(state->bs_rmAccess);
 	terminate_brin_buildstate(state);
 
@@ -1312,12 +1586,16 @@ initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
 
 	state->bs_irel = idxRel;
 	state->bs_numtuples = 0;
+	state->bs_reltuples = 0;
 	state->bs_currentInsertBuf = InvalidBuffer;
 	state->bs_pagesPerRange = pagesPerRange;
 	state->bs_currRangeStart = 0;
 	state->bs_rmAccess = revmap;
 	state->bs_bdesc = brin_build_desc(idxRel);
 	state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
+	state->bs_leader = NULL;
+	state->bs_worker_id = 0;
+	state->bs_spool = NULL;
 
 	return state;
 }
@@ -1609,6 +1887,32 @@ form_and_insert_tuple(BrinBuildState *state)
 	pfree(tup);
 }
 
+/*
+ * Given a deformed tuple in the build state, convert it into the on-disk
+ * format and write it to a (shared) tuplesort (the leader will insert it
+ * into the index later).
+ */
+static void
+form_and_spill_tuple(BrinBuildState *state)
+{
+	BrinTuple  *tup;
+	Size		size;
+
+	/* don't insert empty tuples in parallel build */
+	if (state->bs_dtuple->bt_empty_range)
+		return;
+
+	tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
+						  state->bs_dtuple, &size);
+
+	/* write the BRIN tuple to the tuplesort */
+	tuplesort_putbrintuple(state->bs_spool->sortstate, tup, size);
+
+	state->bs_numtuples++;
+
+	pfree(tup);
+}
+
 /*
  * Given two deformed tuples, adjust the first one so that it's consistent
  * with the summary values in both.
@@ -1928,3 +2232,472 @@ check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
 
 	return true;
 }
+
+static void
+_brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
+					 bool isconcurrent, int request)
+{
+	ParallelContext *pcxt;
+	int			scantuplesortstates;
+	Snapshot	snapshot;
+	Size		estbrinshared;
+	Size		estsort;
+	BrinShared   *brinshared;
+	Sharedsort   *sharedsort;
+	BrinLeader   *brinleader = (BrinLeader *) palloc0(sizeof(BrinLeader));
+	WalUsage   *walusage;
+	BufferUsage *bufferusage;
+	bool		leaderparticipates = true;
+	int			querylen;
+
+#ifdef DISABLE_LEADER_PARTICIPATION
+	leaderparticipates = false;
+#endif
+
+	/*
+	 * Enter parallel mode, and create context for parallel build of brin
+	 * index
+	 */
+	EnterParallelMode();
+	Assert(request > 0);
+	pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main",
+								 request);
+
+	scantuplesortstates = leaderparticipates ? request + 1 : request;
+
+	/*
+	 * Prepare for scan of the base relation.  In a normal index build, we use
+	 * SnapshotAny because we must retrieve all tuples and do our own time
+	 * qual checks (because we have to index RECENTLY_DEAD tuples).  In a
+	 * concurrent build, we take a regular MVCC snapshot and index whatever's
+	 * live according to that.
+	 */
+	if (!isconcurrent)
+		snapshot = SnapshotAny;
+	else
+		snapshot = RegisterSnapshot(GetTransactionSnapshot());
+
+	/*
+	 * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
+	 */
+	estbrinshared = _brin_parallel_estimate_shared(heap, snapshot);
+	shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared);
+	estsort = tuplesort_estimate_shared(scantuplesortstates);
+	shm_toc_estimate_chunk(&pcxt->estimator, estsort);
+
+	shm_toc_estimate_keys(&pcxt->estimator, 2);
+
+	/*
+	 * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
+	 * and PARALLEL_KEY_BUFFER_USAGE.
+	 *
+	 * If there are no extensions loaded that care, we could skip this.  We
+	 * have no way of knowing whether anyone's looking at pgWalUsage or
+	 * pgBufferUsage, so do it unconditionally.
+	 */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
+	if (debug_query_string)
+	{
+		querylen = strlen(debug_query_string);
+		shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+	else
+		querylen = 0;			/* keep compiler quiet */
+
+	/* Everyone's had a chance to ask for space, so now create the DSM */
+	InitializeParallelDSM(pcxt);
+
+	/* If no DSM segment was available, back out (do serial build) */
+	if (pcxt->seg == NULL)
+	{
+		if (IsMVCCSnapshot(snapshot))
+			UnregisterSnapshot(snapshot);
+		DestroyParallelContext(pcxt);
+		ExitParallelMode();
+		return;
+	}
+
+	/* Store shared build state, for which we reserved space */
+	brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared);
+	/* Initialize immutable state */
+	brinshared->heaprelid = RelationGetRelid(heap);
+	brinshared->indexrelid = RelationGetRelid(index);
+	brinshared->isconcurrent = isconcurrent;
+	brinshared->scantuplesortstates = scantuplesortstates;
+	brinshared->pagesPerRange = buildstate->bs_pagesPerRange;
+	ConditionVariableInit(&brinshared->workersdonecv);
+	SpinLockInit(&brinshared->mutex);
+
+	/* Initialize mutable state */
+	brinshared->nparticipantsdone = 0;
+	brinshared->reltuples = 0.0;
+	brinshared->indtuples = 0.0;
+
+	table_parallelscan_initialize(heap,
+								  ParallelTableScanFromBrinShared(brinshared),
+								  snapshot, brinshared->pagesPerRange);
+
+	/*
+	 * Store shared tuplesort-private state, for which we reserved space.
+	 * Then, initialize opaque state using tuplesort routine.
+	 */
+	sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
+	tuplesort_initialize_shared(sharedsort, scantuplesortstates,
+								pcxt->seg);
+
+	/*
+	 * Insert the shared build state and the shared tuplesort state into the
+	 * TOC, so that workers can look them up by key.
+	 */
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BRIN_SHARED, brinshared);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);
+
+	/* Store query string for workers */
+	if (debug_query_string)
+	{
+		char	   *sharedquery;
+
+		sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
+		memcpy(sharedquery, debug_query_string, querylen + 1);
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
+	}
+
+	/*
+	 * Allocate space for each worker's WalUsage and BufferUsage; no need to
+	 * initialize.
+	 */
+	walusage = shm_toc_allocate(pcxt->toc,
+								mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
+	bufferusage = shm_toc_allocate(pcxt->toc,
+								   mul_size(sizeof(BufferUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
+
+	/* Launch workers, saving status for leader/caller */
+	LaunchParallelWorkers(pcxt);
+	brinleader->pcxt = pcxt;
+	brinleader->nparticipanttuplesorts = pcxt->nworkers_launched;
+	if (leaderparticipates)
+		brinleader->nparticipanttuplesorts++;
+	brinleader->brinshared = brinshared;
+	brinleader->sharedsort = sharedsort;
+	brinleader->snapshot = snapshot;
+	brinleader->walusage = walusage;
+	brinleader->bufferusage = bufferusage;
+
+	/* If no workers were successfully launched, back out (do serial build) */
+	if (pcxt->nworkers_launched == 0)
+	{
+		_brin_end_parallel(brinleader, NULL);
+		return;
+	}
+
+	/* Save leader state now that it's clear build will be parallel */
+	buildstate->bs_leader = brinleader;
+
+	/* Join heap scan ourselves */
+	if (leaderparticipates)
+		_brin_leader_participate_as_worker(buildstate, heap, index);
+
+	/*
+	 * Caller needs to wait for all launched workers when we return.  Make
+	 * sure that the failure-to-start case will not hang forever.
+	 */
+	WaitForParallelWorkersToAttach(pcxt);
+}
+
+/*
+ * Wait for workers, merge their results into the index (unless state is NULL),
+ * then destroy the parallel context and end parallel mode.
+ */
+static void
+_brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
+{
+	int			i;
+	BrinTuple  *btup;
+	Size		tuplen;
+	BrinShared *brinshared = brinleader->brinshared;
+	BlockNumber	prevblkno = InvalidBlockNumber;
+	BrinTuple  *emptyTuple = NULL;
+	Size		emptySize;
+
+	/* Shutdown worker processes */
+	WaitForParallelWorkersToFinish(brinleader->pcxt);
+
+	/* No build state (no workers launched): nothing to merge, just clean up */
+	if (!state)
+		goto cleanup;
+
+	/* copy the data into leader state (we have to wait for the workers) */
+	state->bs_reltuples = brinshared->reltuples;
+	state->bs_numtuples = brinshared->indtuples;
+
+	tuplesort_performsort(state->bs_spool_out->sortstate);
+
+	/*
+	 * Read the BRIN tuples from the shared tuplesort, sorted by block number.
+	 * That probably gives us an index that is cheaper to scan, thanks to mostly
+	 * getting data from the same index page as before.
+	 *
+	 * FIXME This probably needs some memory management fixes - we're reading
+	 * tuples from the tuplesort, we're allocating an empty tuple, and so on.
+	 * Probably better to release this memory.
+	 *
+	 * XXX We can't quite free the BrinTuple, though, because that's a field
+	 * in BrinSortTuple.
+	 */
+	while ((btup = tuplesort_getbrintuple(state->bs_spool_out->sortstate, &tuplen, true)) != NULL)
+	{
+		/*
+		 * We should not get two summaries for the same range. The workers
+		 * are producing ranges for non-overlapping sections of the table.
+		 */
+		Assert(btup->bt_blkno != prevblkno);
+
+		/* Ranges should be multiples of pages_per_range for the index. */
+		Assert(btup->bt_blkno % brinshared->pagesPerRange == 0);
+
+		/* Fill empty ranges for all ranges missing in the tuplesort (incl. leading). */
+		prevblkno = (prevblkno == InvalidBlockNumber) ? 0 : prevblkno + state->bs_pagesPerRange;
+		while (prevblkno < btup->bt_blkno)
+		{
+			/* Did we already build the empty range? If not, do it now. */
+			if (emptyTuple == NULL)
+			{
+				BrinMemTuple *dtuple = brin_new_memtuple(state->bs_bdesc);
+
+				emptyTuple = brin_form_tuple(state->bs_bdesc, prevblkno, dtuple, &emptySize);
+			}
+			else
+			{
+				/* we already have an "empty range" tuple, just set the block */
+				emptyTuple->bt_blkno = prevblkno;
+			}
+
+			brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+						  &state->bs_currentInsertBuf,
+						  emptyTuple->bt_blkno, emptyTuple, emptySize);
+			prevblkno += state->bs_pagesPerRange;	/* next missing range */
+		}
+
+		brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+					  &state->bs_currentInsertBuf, btup->bt_blkno, btup, tuplen);
+
+		prevblkno = btup->bt_blkno;
+	}
+
+	tuplesort_end(state->bs_spool_out->sortstate);
+cleanup:
+	/*
+	 * Next, accumulate WAL usage.  (This must wait for the workers to finish,
+	 * or we might get incomplete data.)
+	 */
+	for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
+		InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
+
+	/* Free last reference to MVCC snapshot, if one was used */
+	if (IsMVCCSnapshot(brinleader->snapshot))
+		UnregisterSnapshot(brinleader->snapshot);
+	DestroyParallelContext(brinleader->pcxt);
+	ExitParallelMode();
+}
+
+/*
+ * Returns size of shared memory required to store state for a parallel
+ * brin index build based on the snapshot its parallel scan will use.
+ */
+static Size
+_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
+{
+	/* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
+	return add_size(BUFFERALIGN(sizeof(BrinShared)),
+					table_parallelscan_estimate(heap, snapshot));
+}
+
+/*
+ * Within leader, participate as a parallel worker.
+ */
+static void
+_brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Relation index)
+{
+	BrinLeader *brinleader = buildstate->bs_leader;
+	int			sortmem;
+
+	/* Allocate a zeroed private spool and point it at the build's relations */
+	buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
+	buildstate->bs_spool->heap = heap;
+	buildstate->bs_spool->index = index;
+
+	/*
+	 * Might as well use reliable figure when doling out maintenance_work_mem
+	 * (when requested number of workers were not launched, this will be
+	 * somewhat higher than it is for other workers).
+	 */
+	sortmem = maintenance_work_mem / brinleader->nparticipanttuplesorts;
+
+	/* Perform work common to all participants */
+	_brin_parallel_scan_and_build(buildstate, buildstate->bs_spool, brinleader->brinshared,
+								  brinleader->sharedsort, heap, index, sortmem, true);
+}
+
+/*
+ * Perform a worker's portion of a parallel sort.
+ *
+ * This generates a tuplesort for the passed brinspool, scans this
+ * participant's share of the heap, and spills the resulting BRIN tuples into
+ * the tuplesort.  All spool fields should already be set when this is called.
+ *
+ * sortmem is the amount of working memory to use within each worker,
+ * expressed in KBs.
+ *
+ * When this returns, workers are done, and need only release resources.
+ */
+static void
+_brin_parallel_scan_and_build(BrinBuildState *state, BrinSpool *brinspool,
+							  BrinShared *brinshared, Sharedsort *sharedsort,
+							  Relation heap, Relation index, int sortmem,
+							  bool progress)
+{
+	SortCoordinate coordinate;
+	TableScanDesc	scan;
+	double		reltuples;
+	IndexInfo  *indexInfo;
+
+	/* Initialize local tuplesort coordination state */
+	coordinate = palloc0(sizeof(SortCoordinateData));
+	coordinate->isWorker = true;
+	coordinate->nParticipants = -1;
+	coordinate->sharedsort = sharedsort;
+
+	/* Begin "partial" tuplesort */
+	brinspool->sortstate = tuplesort_begin_index_brin(brinspool->heap,
+													  brinspool->index,
+													  sortmem, coordinate,
+													  TUPLESORT_NONE);
+
+	/* Join parallel scan */
+	indexInfo = BuildIndexInfo(index);
+	indexInfo->ii_Concurrent = brinshared->isconcurrent;
+
+	scan = table_beginscan_parallel(heap,
+									ParallelTableScanFromBrinShared(brinshared));
+
+	reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
+									   brinbuildCallbackParallel, state, scan);
+
+	/* insert the last item */
+	form_and_spill_tuple(state);
+
+	/* sort the BRIN ranges built by this worker */
+	tuplesort_performsort(brinspool->sortstate);
+
+	state->bs_reltuples += reltuples;
+
+	/*
+	 * Done.  Record ambuild statistics.
+	 */
+	SpinLockAcquire(&brinshared->mutex);
+	brinshared->nparticipantsdone++;
+	brinshared->reltuples += state->bs_reltuples;
+	brinshared->indtuples += state->bs_numtuples;
+	SpinLockRelease(&brinshared->mutex);
+
+	/* Notify leader */
+	ConditionVariableSignal(&brinshared->workersdonecv);
+
+	tuplesort_end(brinspool->sortstate);
+}
+
+/*
+ * Perform work within a launched parallel process.
+ */
+void
+_brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
+{
+	char	   *sharedquery;
+	BrinShared *brinshared;
+	Sharedsort *sharedsort;
+	BrinBuildState *buildstate;
+	Relation	heapRel;
+	Relation	indexRel;
+	LOCKMODE	heapLockmode;
+	LOCKMODE	indexLockmode;
+	WalUsage   *walusage;
+	BufferUsage *bufferusage;
+	int			sortmem;
+
+	/*
+	 * The only possible status flag that can be set to the parallel worker is
+	 * PROC_IN_SAFE_IC.
+	 */
+	Assert((MyProc->statusFlags == 0) ||
+		   (MyProc->statusFlags == PROC_IN_SAFE_IC));
+
+	/* Set debug_query_string for individual workers first */
+	sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
+	debug_query_string = sharedquery;
+
+	/* Report the query string from leader */
+	pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
+	/* Look up brin shared state */
+	brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false);
+
+	/* Open relations using lock modes known to be obtained by index.c */
+	if (!brinshared->isconcurrent)
+	{
+		heapLockmode = ShareLock;
+		indexLockmode = AccessExclusiveLock;
+	}
+	else
+	{
+		heapLockmode = ShareUpdateExclusiveLock;
+		indexLockmode = RowExclusiveLock;
+	}
+
+	/* Open relations within worker */
+	heapRel = table_open(brinshared->heaprelid, heapLockmode);
+	indexRel = index_open(brinshared->indexrelid, indexLockmode);
+
+	buildstate = initialize_brin_buildstate(indexRel, NULL, brinshared->pagesPerRange);
+
+	/* Initialize worker's own spool */
+	buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
+	buildstate->bs_spool->heap = heapRel;
+	buildstate->bs_spool->index = indexRel;
+
+	/* Look up shared state private to tuplesort.c */
+	sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
+	tuplesort_attach_shared(sharedsort, seg);
+
+	/* Prepare to track buffer usage during parallel execution */
+	InstrStartParallelQuery();
+
+	/*
+	 * Might as well use reliable figure when doling out maintenance_work_mem
+	 * (when requested number of workers were not launched, this will be
+	 * somewhat higher than it is for other workers).
+	 */
+	sortmem = maintenance_work_mem / brinshared->scantuplesortstates;
+
+	_brin_parallel_scan_and_build(buildstate, buildstate->bs_spool,
+								  brinshared, sharedsort,
+								  heapRel, indexRel, sortmem, false);
+
+	/* Report WAL/buffer usage during parallel execution */
+	bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
+	walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+	InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
+						  &walusage[ParallelWorkerNumber]);
+
+	index_close(indexRel, indexLockmode);
+	table_close(heapRel, heapLockmode);
+}
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index c2665fce411..6241baeea86 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -1575,7 +1575,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	btshared->brokenhotchain = false;
 	table_parallelscan_initialize(btspool->heap,
 								  ParallelTableScanFromBTShared(btshared),
-								  snapshot);
+								  snapshot, InvalidBlockNumber);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index c6bdb7e1c68..4af0d433e9d 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -153,9 +153,9 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot)
 
 void
 table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan,
-							  Snapshot snapshot)
+							  Snapshot snapshot, BlockNumber chunk_factor)
 {
-	Size		snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan);
+	Size		snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan, chunk_factor);
 
 	pscan->phs_snapshot_off = snapshot_off;
 
@@ -395,16 +395,21 @@ table_block_parallelscan_estimate(Relation rel)
 }
 
 Size
-table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
+table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan, BlockNumber chunk_factor)
 {
 	ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;
 
 	bpscan->base.phs_relid = RelationGetRelid(rel);
 	bpscan->phs_nblocks = RelationGetNumberOfBlocks(rel);
-	/* compare phs_syncscan initialization to similar logic in initscan */
+	bpscan->phs_chunk_factor = chunk_factor;
+	/*
+	 * Compare phs_syncscan initialization to similar logic in initscan.  Sync
+	 * scans are disabled if the chunk factor is set (valid block number).
+	 */
 	bpscan->base.phs_syncscan = synchronize_seqscans &&
 		!RelationUsesLocalBuffers(rel) &&
-		bpscan->phs_nblocks > NBuffers / 4;
+		(bpscan->phs_nblocks > NBuffers / 4) &&
+		!BlockNumberIsValid(bpscan->phs_chunk_factor);
 	SpinLockInit(&bpscan->phs_mutex);
 	bpscan->phs_startblock = InvalidBlockNumber;
 	pg_atomic_init_u64(&bpscan->phs_nallocated, 0);
@@ -459,6 +464,25 @@ table_block_parallelscan_startblock_init(Relation rel,
 	pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size,
 									  PARALLEL_SEQSCAN_MAX_CHUNK_SIZE);
 
+	/*
+	 * If the chunk size factor is set, we need to make sure the chunk size is
+	 * a multiple of that value. We round the chunk size down to a multiple of
+	 * the chunk factor, but never below one chunk_factor.
+	 *
+	 * XXX Note this may override PARALLEL_SEQSCAN_MAX_CHUNK_SIZE, in case the
+	 * chunk factor (e.g. BRIN pages_per_range) is larger.
+	 */
+	if (pbscan->phs_chunk_factor != InvalidBlockNumber)
+	{
+		/* nearest (smaller) multiple of chunk_factor */
+		pbscanwork->phsw_chunk_size
+			= pbscan->phs_chunk_factor * (pbscanwork->phsw_chunk_size / pbscan->phs_chunk_factor);
+
+		/* but at least one chunk_factor */
+		pbscanwork->phsw_chunk_size = Max(pbscanwork->phsw_chunk_size,
+										  pbscan->phs_chunk_factor);
+	}
+
 retry:
 	/* Grab the spinlock. */
 	SpinLockAcquire(&pbscan->phs_mutex);
@@ -575,6 +599,21 @@ table_block_parallelscan_nextpage(Relation rel,
 			(pbscanwork->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS))
 			pbscanwork->phsw_chunk_size >>= 1;
 
+		/*
+		 * We need to make sure the new chunk_size is still a suitable multiple
+		 * of chunk_factor.
+		 */
+		if (pbscan->phs_chunk_factor != InvalidBlockNumber)
+		{
+			/* nearest (smaller) multiple of chunk_factor */
+			pbscanwork->phsw_chunk_size
+				= pbscan->phs_chunk_factor * (pbscanwork->phsw_chunk_size / pbscan->phs_chunk_factor);
+
+			/* but at least one chunk_factor */
+			pbscanwork->phsw_chunk_size = Max(pbscanwork->phsw_chunk_size,
+											  pbscan->phs_chunk_factor);
+		}
+
 		nallocated = pbscanwork->phsw_nallocated =
 			pg_atomic_fetch_add_u64(&pbscan->phs_nallocated,
 									pbscanwork->phsw_chunk_size);
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 194a1207be6..d78314062e0 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -14,6 +14,7 @@
 
 #include "postgres.h"
 
+#include "access/brin.h"
 #include "access/nbtree.h"
 #include "access/parallel.h"
 #include "access/session.h"
@@ -145,6 +146,9 @@ static const struct
 	{
 		"_bt_parallel_build_main", _bt_parallel_build_main
 	},
+	{
+		"_brin_parallel_build_main", _brin_parallel_build_main
+	},
 	{
 		"parallel_vacuum_main", parallel_vacuum_main
 	}
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 143fae01ebd..37e4305d50a 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2982,7 +2982,8 @@ index_build(Relation heapRelation,
 	 * Note that planner considers parallel safety for us.
 	 */
 	if (parallel && IsNormalProcessingMode() &&
-		indexRelation->rd_rel->relam == BTREE_AM_OID)
+		(indexRelation->rd_rel->relam == BTREE_AM_OID ||
+		 indexRelation->rd_rel->relam == BRIN_AM_OID))
 		indexInfo->ii_ParallelWorkers =
 			plan_create_index_workers(RelationGetRelid(heapRelation),
 									  RelationGetRelid(indexRelation));
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 49a5933aff6..529a7ed3284 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -262,7 +262,8 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
 	pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
 	table_parallelscan_initialize(node->ss.ss_currentRelation,
 								  pscan,
-								  estate->es_snapshot);
+								  estate->es_snapshot,
+								  InvalidBlockNumber);
 	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
 	node->ss.ss_currentScanDesc =
 		table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index ab6353bdcd1..4c6b396a8a8 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -1328,6 +1328,7 @@ tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple, bool useAbbre
 			break;
 
 		default:
+			Assert(false);
 			elog(ERROR, "invalid tuplesort state");
 			break;
 	}
@@ -1462,6 +1463,7 @@ tuplesort_performsort(Tuplesortstate *state)
 			break;
 
 		default:
+			Assert(false);
 			elog(ERROR, "invalid tuplesort state");
 			break;
 	}
@@ -1718,6 +1720,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 			return false;
 
 		default:
+			Assert(false);
 			elog(ERROR, "invalid tuplesort state");
 			return false;		/* keep compiler quiet */
 	}
diff --git a/src/backend/utils/sort/tuplesortvariants.c b/src/backend/utils/sort/tuplesortvariants.c
index 2cd508e5130..343ed4bbc54 100644
--- a/src/backend/utils/sort/tuplesortvariants.c
+++ b/src/backend/utils/sort/tuplesortvariants.c
@@ -19,6 +19,7 @@
 
 #include "postgres.h"
 
+#include "access/brin_tuple.h"
 #include "access/hash.h"
 #include "access/htup_details.h"
 #include "access/nbtree.h"
@@ -43,6 +44,8 @@ static void removeabbrev_cluster(Tuplesortstate *state, SortTuple *stups,
 								 int count);
 static void removeabbrev_index(Tuplesortstate *state, SortTuple *stups,
 							   int count);
+static void removeabbrev_index_brin(Tuplesortstate *state, SortTuple *stups,
+									int count);
 static void removeabbrev_datum(Tuplesortstate *state, SortTuple *stups,
 							   int count);
 static int	comparetup_heap(const SortTuple *a, const SortTuple *b,
@@ -69,10 +72,16 @@ static int	comparetup_index_hash(const SortTuple *a, const SortTuple *b,
 								  Tuplesortstate *state);
 static int	comparetup_index_hash_tiebreak(const SortTuple *a, const SortTuple *b,
 										   Tuplesortstate *state);
+static int	comparetup_index_brin(const SortTuple *a, const SortTuple *b,
+								  Tuplesortstate *state);
 static void writetup_index(Tuplesortstate *state, LogicalTape *tape,
 						   SortTuple *stup);
 static void readtup_index(Tuplesortstate *state, SortTuple *stup,
 						  LogicalTape *tape, unsigned int len);
+static void writetup_index_brin(Tuplesortstate *state, LogicalTape *tape,
+								SortTuple *stup);
+static void readtup_index_brin(Tuplesortstate *state, SortTuple *stup,
+							   LogicalTape *tape, unsigned int len);
 static int	comparetup_datum(const SortTuple *a, const SortTuple *b,
 							 Tuplesortstate *state);
 static int	comparetup_datum_tiebreak(const SortTuple *a, const SortTuple *b,
@@ -128,6 +137,16 @@ typedef struct
 	uint32		max_buckets;
 } TuplesortIndexHashArg;
 
+/*
+ * Data structure pointed by "TuplesortPublic.arg" for the index_brin subcase.
+ */
+typedef struct
+{
+	TuplesortIndexArg index;	/* heap and index relations */
+
+	/* XXX do we need something here? */
+} TuplesortIndexBrinArg;
+
 /*
  * Data struture pointed by "TuplesortPublic.arg" for the Datum case.
  * Set by tuplesort_begin_datum and used only by the DatumTuple routines.
@@ -140,6 +159,22 @@ typedef struct
 	int			datumTypeLen;
 } TuplesortDatumArg;
 
+/*
+ * Computing BrinTuple size with only the tuple is difficult, so we want to
+ * track the length of the tuple referenced by SortTuple. That's what
+ * BrinSortTuple is meant to do - it's essentially a BrinTuple prefixed by
+ * length. We only write the BrinTuple to the logtapes, though.
+ */
+typedef struct BrinSortTuple
+{
+	Size		tuplen;			/* length of the BrinTuple that follows */
+	BrinTuple	tuple;			/* the BRIN tuple itself (tuplen bytes) */
+} BrinSortTuple;
+
+/* Size of the BrinSortTuple, given length of the BrinTuple. */
+#define BRINSORTTUPLE_SIZE(len)		(offsetof(BrinSortTuple, tuple) + (len))
+
+
 Tuplesortstate *
 tuplesort_begin_heap(TupleDesc tupDesc,
 					 int nkeys, AttrNumber *attNums,
@@ -527,6 +562,47 @@ tuplesort_begin_index_gist(Relation heapRel,
 	return state;
 }
 
+/* Begin a sort of BRIN tuples, which are ordered by their block number. */
+Tuplesortstate *
+tuplesort_begin_index_brin(Relation heapRel,
+						   Relation indexRel,
+						   int workMem,
+						   SortCoordinate coordinate,
+						   int sortopt)
+{
+	Tuplesortstate *state;
+	TuplesortPublic *base;
+	TuplesortIndexBrinArg *brinarg;
+	MemoryContext prevcontext;
+
+	state = tuplesort_begin_common(workMem, coordinate, sortopt);
+	base = TuplesortstateGetPublic(state);
+	prevcontext = MemoryContextSwitchTo(base->maincontext);
+	brinarg = (TuplesortIndexBrinArg *) palloc(sizeof(TuplesortIndexBrinArg));
+	brinarg->index.heapRel = heapRel;
+	brinarg->index.indexRel = indexRel;
+
+#ifdef TRACE_SORT
+	if (trace_sort)
+		elog(LOG,
+			 "begin index sort: workMem = %d, randomAccess = %c",
+			 workMem,
+			 sortopt & TUPLESORT_RANDOMACCESS ? 't' : 'f');
+#endif
+
+	/* Only one sort column, the block number */
+	base->nKeys = 1;
+	base->haveDatum1 = true;
+	base->arg = brinarg;
+	base->removeabbrev = removeabbrev_index_brin;
+	base->comparetup = comparetup_index_brin;
+	base->writetup = writetup_index_brin;
+	base->readtup = readtup_index_brin;
+
+	MemoryContextSwitchTo(prevcontext);
+	return state;
+}
+
 Tuplesortstate *
 tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
 					  bool nullsFirstFlag, int workMem,
@@ -707,6 +783,35 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
 							  !stup.isnull1);
 }
 
+/*
+ * Add a copy of one BRIN tuple ("size" bytes long) to the sort's input.
+ */
+void
+tuplesort_putbrintuple(Tuplesortstate *state, BrinTuple *tuple, Size size)
+{
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	MemoryContext prevcontext = MemoryContextSwitchTo(base->tuplecontext);
+	BrinSortTuple *copy;
+	SortTuple	entry;
+	bool		abbreviate;
+
+	/* length-prefixed copy of the BRIN tuple */
+	copy = palloc(BRINSORTTUPLE_SIZE(size));
+	copy->tuplen = size;
+	memcpy(&copy->tuple, tuple, size);
+
+	/* the sort key (block number) is kept in datum1, and is never NULL */
+	entry.tuple = copy;
+	entry.datum1 = tuple->bt_blkno;
+	entry.isnull1 = false;
+
+	abbreviate = base->sortKeys && base->sortKeys->abbrev_converter &&
+		!entry.isnull1;
+	tuplesort_puttuple_common(state, &entry, abbreviate);
+
+	MemoryContextSwitchTo(prevcontext);
+}
+
 /*
  * Accept one Datum while collecting input data for sort.
  *
@@ -850,6 +955,35 @@ tuplesort_getindextuple(Tuplesortstate *state, bool forward)
 	return (IndexTuple) stup.tuple;
 }
 
+/*
+ * Fetch the next BRIN tuple in either forward or back direction, storing its
+ * length in *len.  Returns NULL (leaving *len untouched) if no more tuples.
+ * The returned tuple belongs to tuplesort memory context, and must not be
+ * freed by caller; it may become invalid on any further tuplesort call.
+ */
+BrinTuple *
+tuplesort_getbrintuple(Tuplesortstate *state, Size *len, bool forward)
+{
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	MemoryContext prevcontext;
+	SortTuple	entry;
+	BrinSortTuple *result;
+	bool		found;
+
+	prevcontext = MemoryContextSwitchTo(base->sortcontext);
+	found = tuplesort_gettuple_common(state, forward, &entry);
+	MemoryContextSwitchTo(prevcontext);
+
+	/* end of the sorted stream (or an entry without a tuple attached) */
+	if (!found || entry.tuple == NULL)
+		return NULL;
+
+	result = (BrinSortTuple *) entry.tuple;
+	*len = result->tuplen;
+
+	return &result->tuple;
+}
+
 /*
  * Fetch the next Datum in either forward or back direction.
  * Returns false if no more datums.
@@ -1564,6 +1698,83 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
 								 &stup->isnull1);
 }
 
+/*
+ * Routines specialized for BrinTuple case
+ */
+
+/* Set datum1 of each of the "count" sort tuples to its tuple's block number. */
+static void
+removeabbrev_index_brin(Tuplesortstate *state, SortTuple *stups, int count)
+{
+	SortTuple  *entry = stups;
+	int			remaining;
+
+	/* the key is reloaded from the BRIN tuple each entry points at */
+	for (remaining = count; remaining > 0; remaining--, entry++)
+	{
+		entry->datum1 = ((BrinSortTuple *) entry->tuple)->tuple.bt_blkno;
+	}
+}
+
+/* Compare two BRIN sort tuples by the block number of their page range. */
+static int
+comparetup_index_brin(const SortTuple *a, const SortTuple *b,
+					  Tuplesortstate *state)
+{
+	/*
+	 * a and b are SortTuples, not BrinSortTuples: the BRIN tuple has to be
+	 * reached through their tuple pointer, as removeabbrev_index_brin does.
+	 */
+	BlockNumber blkno1 = ((BrinSortTuple *) a->tuple)->tuple.bt_blkno;
+	BlockNumber blkno2 = ((BrinSortTuple *) b->tuple)->tuple.bt_blkno;
+
+	if (blkno1 > blkno2)
+		return 1;
+	if (blkno1 < blkno2)
+		return -1;
+	return 0;					/* equal block numbers */
+}
+
+/* Write one BRIN tuple to tape, framed by a length word that counts itself. */
+static void
+writetup_index_brin(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
+{
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	BrinSortTuple *brintup = (BrinSortTuple *) stup->tuple;
+	unsigned int framelen = brintup->tuplen + sizeof(framelen);
+
+	LogicalTapeWrite(tape, &framelen, sizeof(framelen));
+	LogicalTapeWrite(tape, &brintup->tuple, brintup->tuplen);
+	if (base->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length word? */
+		LogicalTapeWrite(tape, &framelen, sizeof(framelen));
+}
+
+/* Read one BRIN tuple of on-tape length "len" from tape into stup. */
+static void
+readtup_index_brin(Tuplesortstate *state, SortTuple *stup,
+				   LogicalTape *tape, unsigned int len)
+{
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	unsigned int payloadlen;
+	unsigned int trailer;
+	BrinSortTuple *brintup;
+
+	/* "len" counts one length word on top of the tuple itself */
+	payloadlen = len - sizeof(unsigned int);
+
+	/* in memory the BrinTuple is prefixed by an extra length field */
+	brintup = (BrinSortTuple *) tuplesort_readtup_alloc(state,
+														BRINSORTTUPLE_SIZE(payloadlen));
+	brintup->tuplen = payloadlen;
+	LogicalTapeReadExact(tape, &brintup->tuple, payloadlen);
+	if (base->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length word? */
+		LogicalTapeReadExact(tape, &trailer, sizeof(trailer));
+
+	/* set up first-column key value, which is block number */
+	stup->tuple = (void *) brintup;
+	stup->datum1 = brintup->tuple.bt_blkno;
+}
+
 /*
  * Routines specialized for DatumTuple case
  */
diff --git a/src/include/access/brin.h b/src/include/access/brin.h
index ed66f1b3d51..3451ecb211f 100644
--- a/src/include/access/brin.h
+++ b/src/include/access/brin.h
@@ -11,6 +11,7 @@
 #define BRIN_H
 
 #include "nodes/execnodes.h"
+#include "storage/shm_toc.h"
 #include "utils/relcache.h"
 
 
@@ -52,4 +53,6 @@ typedef struct BrinStatsData
 
 extern void brinGetStats(Relation index, BrinStatsData *stats);
 
+extern void _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc);
+
 #endif							/* BRIN_H */
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index d03360eac04..72a20d882f5 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -79,6 +79,7 @@ typedef struct ParallelBlockTableScanDescData
 	BlockNumber phs_nblocks;	/* # blocks in relation at start of scan */
 	slock_t		phs_mutex;		/* mutual exclusion for setting startblock */
 	BlockNumber phs_startblock; /* starting block number */
+	BlockNumber phs_chunk_factor; /* chunks need to be a multiple of this */
 	pg_atomic_uint64 phs_nallocated;	/* number of blocks allocated to
 										 * workers so far. */
 }			ParallelBlockTableScanDescData;
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index dbb709b56ce..d94e4d32aa1 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -390,7 +390,8 @@ typedef struct TableAmRoutine
 	 * relation.
 	 */
 	Size		(*parallelscan_initialize) (Relation rel,
-											ParallelTableScanDesc pscan);
+											ParallelTableScanDesc pscan,
+											BlockNumber chunk_factor);
 
 	/*
 	 * Reinitialize `pscan` for a new scan. `rel` will be the same relation as
@@ -1148,7 +1149,8 @@ extern Size table_parallelscan_estimate(Relation rel, Snapshot snapshot);
  */
 extern void table_parallelscan_initialize(Relation rel,
 										  ParallelTableScanDesc pscan,
-										  Snapshot snapshot);
+										  Snapshot snapshot,
+										  BlockNumber chunk_factor);
 
 /*
  * Begin a parallel scan. `pscan` needs to have been initialized with
@@ -2064,7 +2066,8 @@ extern void simple_table_tuple_update(Relation rel, ItemPointer otid,
 
 extern Size table_block_parallelscan_estimate(Relation rel);
 extern Size table_block_parallelscan_initialize(Relation rel,
-												ParallelTableScanDesc pscan);
+												ParallelTableScanDesc pscan,
+												BlockNumber chunk_factor);
 extern void table_block_parallelscan_reinitialize(Relation rel,
 												  ParallelTableScanDesc pscan);
 extern BlockNumber table_block_parallelscan_nextpage(Relation rel,
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index 9ed2de76cd6..357eb35311d 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -21,6 +21,7 @@
 #ifndef TUPLESORT_H
 #define TUPLESORT_H
 
+#include "access/brin_tuple.h"
 #include "access/itup.h"
 #include "executor/tuptable.h"
 #include "storage/dsm.h"
@@ -282,6 +283,9 @@ typedef struct
  * The "index_hash" API is similar to index_btree, but the tuples are
  * actually sorted by their hash codes not the raw data.
  *
+ * The "index_brin" API is similar to index_btree, but the tuples are
+ * BrinTuple and are sorted by their block number not the raw data.
+ *
  * Parallel sort callers are required to coordinate multiple tuplesort states
  * in a leader process and one or more worker processes.  The leader process
  * must launch workers, and have each perform an independent "partial"
@@ -426,6 +430,10 @@ extern Tuplesortstate *tuplesort_begin_index_gist(Relation heapRel,
 												  Relation indexRel,
 												  int workMem, SortCoordinate coordinate,
 												  int sortopt);
+extern Tuplesortstate *tuplesort_begin_index_brin(Relation heapRel,
+												  Relation indexRel,
+												  int workMem, SortCoordinate coordinate,
+												  int sortopt);
 extern Tuplesortstate *tuplesort_begin_datum(Oid datumType,
 											 Oid sortOperator, Oid sortCollation,
 											 bool nullsFirstFlag,
@@ -438,6 +446,7 @@ extern void tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup);
 extern void tuplesort_putindextuplevalues(Tuplesortstate *state,
 										  Relation rel, ItemPointer self,
 										  const Datum *values, const bool *isnull);
+extern void tuplesort_putbrintuple(Tuplesortstate *state, BrinTuple *tup, Size len);
 extern void tuplesort_putdatum(Tuplesortstate *state, Datum val,
 							   bool isNull);
 
@@ -445,6 +454,8 @@ extern bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
 								   bool copy, TupleTableSlot *slot, Datum *abbrev);
 extern HeapTuple tuplesort_getheaptuple(Tuplesortstate *state, bool forward);
 extern IndexTuple tuplesort_getindextuple(Tuplesortstate *state, bool forward);
+extern BrinTuple *tuplesort_getbrintuple(Tuplesortstate *state, Size *len,
+										 bool forward);
 extern bool tuplesort_getdatum(Tuplesortstate *state, bool forward, bool copy,
 							   Datum *val, bool *isNull, Datum *abbrev);
 
-- 
2.41.0

