From 7c9be5a2a71a0eead626c8f9ac248008bc6db3e9 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Tue, 7 Nov 2023 20:02:03 +0100
Subject: [PATCH v3 4/4] remove tableam changes

---
 src/backend/access/brin/brin.c      | 115 +++++++++++++++++++++++++---
 src/backend/access/nbtree/nbtsort.c |   2 +-
 src/backend/access/table/tableam.c  |  49 ++----------
 src/backend/executor/nodeSeqscan.c  |   3 +-
 src/include/access/relscan.h        |   1 -
 src/include/access/tableam.h        |   9 +--
 6 files changed, 115 insertions(+), 64 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index d54d39a535e..daadd051e30 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -2334,7 +2334,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 
 	table_parallelscan_initialize(heap,
 								  ParallelTableScanFromBrinShared(brinshared),
-								  snapshot, brinshared->pagesPerRange);
+								  snapshot);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
@@ -2413,6 +2413,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 {
 	int			i;
 	BrinTuple  *btup;
+	BrinMemTuple *memtuple = NULL;
 	Size		tuplen;
 	BrinShared *brinshared = brinleader->brinshared;
 	BlockNumber	prevblkno = InvalidBlockNumber;
@@ -2432,13 +2433,19 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 
 	tuplesort_performsort(spool->sortstate);
 
+	/*
+	 * Initialize BrinMemTuple we'll use to union summaries from workers
+	 * (in case they happened to produce parts of the same page range).
+	 */
+	memtuple = brin_new_memtuple(state->bs_bdesc);
+
 	/*
 	 * Read the BRIN tuples from the shared tuplesort, sorted by block number.
 	 * That probably gives us an index that is cheaper to scan, thanks to mostly
 	 * getting data from the same index page as before.
 	 *
 	 * FIXME This probably needs some memory management fixes - we're reading
-	 * tuples from the tuplesort, we're allocating an emty tuple, and so on.
+	 * tuples from the tuplesort, we're allocating an empty tuple, and so on.
 	 * Probably better to release this memory.
 	 *
 	 * XXX We can't quite free the BrinTuple, though, because that's a field
@@ -2446,14 +2453,65 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 	 */
 	while ((btup = tuplesort_getbrintuple(spool->sortstate, &tuplen, true)) != NULL)
 	{
+		/* Ranges should be multiples of pages_per_range for the index. */
+		Assert(btup->bt_blkno % brinshared->pagesPerRange == 0);
+
 		/*
-		 * We should not get two summaries for the same range. The workers
-		 * are producing ranges for non-overlapping sections of the table.
+		 * Do we need to union summaries for the same page range?
+		 *
+		 * If this is the first brin tuple we read, then just deform it into
+		 * the memtuple, and continue with the next one from tuplesort. We
+		 * however may need to insert empty summaries into the index.
+		 *
+		 * If it's the same block as the last we saw, we simply union the
+		 * brin tuple into it, and we're done - we don't even need to insert
+		 * empty ranges, because that was done earlier when we saw the first
+		 * brin tuple (for this range).
+		 *
+		 * Finally, if it's not the first brin tuple, and it's not the same
+		 * page range, we need to do the insert and then deform the tuple
+		 * into the memtuple. Then we'll insert empty ranges before the
+		 * new brin tuple, if needed.
 		 */
-		Assert(btup->bt_blkno != prevblkno);
+		if (prevblkno == InvalidBlockNumber)
+		{
+			/* First brin tuple, just deform into memtuple. */
+			memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
 
-		/* Ranges should be multiples of pages_per_range for the index. */
-		Assert(btup->bt_blkno % brinshared->pagesPerRange == 0);
+			/* continue to insert empty pages before this block */
+		}
+		else if (memtuple->bt_blkno == btup->bt_blkno)
+		{
+			/*
+			 * Not the first brin tuple, but same page range as the previous
+			 * one, so we can merge it into the memtuple.
+			 */
+			union_tuples(state->bs_bdesc, memtuple, btup);
+			continue;
+		}
+		else
+		{
+			BrinTuple  *tmp;
+			Size		len;
+
+			/*
+			 * We got brin tuple for a different page range, so form a brin
+			 * tuple from the memtuple, insert it, and re-init the memtuple
+			 * from the new brin tuple.
+			 */
+			tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
+								  memtuple, &len);
+
+			brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+						  &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
+
+			/* free the formed on-disk tuple */
+			pfree(tmp);
+
+			memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
+
+			/* continue to insert empty pages before this block */
+		}
 
 		/* Fill empty ranges for all ranges missing in the tuplesort. */
 		prevblkno = (prevblkno == InvalidBlockNumber) ? 0 : prevblkno;
@@ -2480,14 +2538,51 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 						  emptyTuple->bt_blkno, emptyTuple, emptySize);
 		}
 
-		brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
-					  &state->bs_currentInsertBuf, btup->bt_blkno, btup, tuplen);
-
 		prevblkno = btup->bt_blkno;
 	}
 
 	tuplesort_end(spool->sortstate);
 
+	/* Fill empty ranges for all ranges missing in the tuplesort. */
+	prevblkno = (prevblkno == InvalidBlockNumber) ? 0 : prevblkno;
+	while (prevblkno + state->bs_pagesPerRange < memtuple->bt_blkno)
+	{
+		/* the missing range */
+		prevblkno += state->bs_pagesPerRange;
+
+		/* Did we already build the empty range? If not, do it now. */
+		if (emptyTuple == NULL)
+		{
+			BrinMemTuple *dtuple = brin_new_memtuple(state->bs_bdesc);
+
+			emptyTuple = brin_form_tuple(state->bs_bdesc, prevblkno, dtuple, &emptySize);
+		}
+		else
+		{
+			/* we already have an "empty range" tuple, just set the block */
+			emptyTuple->bt_blkno = prevblkno;
+		}
+
+		brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+					  &state->bs_currentInsertBuf,
+					  emptyTuple->bt_blkno, emptyTuple, emptySize);
+	}
+
+	/* Insert the last (not yet written) summary tuple, if there is one. */
+	if (prevblkno != InvalidBlockNumber)
+	{
+		BrinTuple  *tmp;
+		Size		len;
+
+		tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
+							  memtuple, &len);
+
+		brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+					  &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
+
+		pfree(tmp);
+	}
+
 	/*
 	 * Next, accumulate WAL usage.  (This must wait for the workers to finish,
 	 * or we might get incomplete data.)
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 6241baeea86..c2665fce411 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -1575,7 +1575,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	btshared->brokenhotchain = false;
 	table_parallelscan_initialize(btspool->heap,
 								  ParallelTableScanFromBTShared(btshared),
-								  snapshot, InvalidBlockNumber);
+								  snapshot);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 4af0d433e9d..c6bdb7e1c68 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -153,9 +153,9 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot)
 
 void
 table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan,
-							  Snapshot snapshot, BlockNumber chunk_factor)
+							  Snapshot snapshot)
 {
-	Size		snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan, chunk_factor);
+	Size		snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan);
 
 	pscan->phs_snapshot_off = snapshot_off;
 
@@ -395,21 +395,16 @@ table_block_parallelscan_estimate(Relation rel)
 }
 
 Size
-table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan, BlockNumber chunk_factor)
+table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
 {
 	ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;
 
 	bpscan->base.phs_relid = RelationGetRelid(rel);
 	bpscan->phs_nblocks = RelationGetNumberOfBlocks(rel);
-	bpscan->phs_chunk_factor = chunk_factor;
-	/* compare phs_syncscan initialization to similar logic in initscan
-	 *
-	 * Disable sync scans if the chunk factor is set (valid block number).
-	 */
+	/* compare phs_syncscan initialization to similar logic in initscan */
 	bpscan->base.phs_syncscan = synchronize_seqscans &&
 		!RelationUsesLocalBuffers(rel) &&
-		(bpscan->phs_nblocks > NBuffers / 4) &&
-		!BlockNumberIsValid(bpscan->phs_chunk_factor);
+		bpscan->phs_nblocks > NBuffers / 4;
 	SpinLockInit(&bpscan->phs_mutex);
 	bpscan->phs_startblock = InvalidBlockNumber;
 	pg_atomic_init_u64(&bpscan->phs_nallocated, 0);
@@ -464,25 +459,6 @@ table_block_parallelscan_startblock_init(Relation rel,
 	pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size,
 									  PARALLEL_SEQSCAN_MAX_CHUNK_SIZE);
 
-	/*
-	 * If the chunk size factor is set, we need to make sure the chunk size is
-	 * a multiple of that value. We round the chunk size to the nearest chunk
-	 * factor multiple, at least one chunk_factor.
-	 *
-	 * XXX Note this may override PARALLEL_SEQSCAN_MAX_CHUNK_SIZE, in case the
-	 * chunk factor (e.g. BRIN pages_per_range) is larger.
-	 */
-	if (pbscan->phs_chunk_factor != InvalidBlockNumber)
-	{
-		/* nearest (smaller) multiple of chunk_factor */
-		pbscanwork->phsw_chunk_size
-			= pbscan->phs_chunk_factor * (pbscanwork->phsw_chunk_size / pbscan->phs_chunk_factor);
-
-		/* but at least one chunk_factor */
-		pbscanwork->phsw_chunk_size = Max(pbscanwork->phsw_chunk_size,
-										  pbscan->phs_chunk_factor);
-	}
-
 retry:
 	/* Grab the spinlock. */
 	SpinLockAcquire(&pbscan->phs_mutex);
@@ -599,21 +575,6 @@ table_block_parallelscan_nextpage(Relation rel,
 			(pbscanwork->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS))
 			pbscanwork->phsw_chunk_size >>= 1;
 
-		/*
-		 * We need to make sure the new chunk_size is still a suitable multiple
-		 * of chunk_factor.
-		 */
-		if (pbscan->phs_chunk_factor != InvalidBlockNumber)
-		{
-			/* nearest (smaller) multiple of chunk_factor */
-			pbscanwork->phsw_chunk_size
-				= pbscan->phs_chunk_factor * (pbscanwork->phsw_chunk_size / pbscan->phs_chunk_factor);
-
-			/* but at least one chunk_factor */
-			pbscanwork->phsw_chunk_size = Max(pbscanwork->phsw_chunk_size,
-											  pbscan->phs_chunk_factor);
-		}
-
 		nallocated = pbscanwork->phsw_nallocated =
 			pg_atomic_fetch_add_u64(&pbscan->phs_nallocated,
 									pbscanwork->phsw_chunk_size);
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 529a7ed3284..49a5933aff6 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -262,8 +262,7 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
 	pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
 	table_parallelscan_initialize(node->ss.ss_currentRelation,
 								  pscan,
-								  estate->es_snapshot,
-								  InvalidBlockNumber);
+								  estate->es_snapshot);
 	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
 	node->ss.ss_currentScanDesc =
 		table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 72a20d882f5..d03360eac04 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -79,7 +79,6 @@ typedef struct ParallelBlockTableScanDescData
 	BlockNumber phs_nblocks;	/* # blocks in relation at start of scan */
 	slock_t		phs_mutex;		/* mutual exclusion for setting startblock */
 	BlockNumber phs_startblock; /* starting block number */
-	BlockNumber phs_chunk_factor; /* chunks need to be a multiple of this */
 	pg_atomic_uint64 phs_nallocated;	/* number of blocks allocated to
 										 * workers so far. */
 }			ParallelBlockTableScanDescData;
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index d94e4d32aa1..dbb709b56ce 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -390,8 +390,7 @@ typedef struct TableAmRoutine
 	 * relation.
 	 */
 	Size		(*parallelscan_initialize) (Relation rel,
-											ParallelTableScanDesc pscan,
-											BlockNumber chunk_factor);
+											ParallelTableScanDesc pscan);
 
 	/*
 	 * Reinitialize `pscan` for a new scan. `rel` will be the same relation as
@@ -1149,8 +1148,7 @@ extern Size table_parallelscan_estimate(Relation rel, Snapshot snapshot);
  */
 extern void table_parallelscan_initialize(Relation rel,
 										  ParallelTableScanDesc pscan,
-										  Snapshot snapshot,
-										  BlockNumber chunk_factor);
+										  Snapshot snapshot);
 
 /*
  * Begin a parallel scan. `pscan` needs to have been initialized with
@@ -2066,8 +2064,7 @@ extern void simple_table_tuple_update(Relation rel, ItemPointer otid,
 
 extern Size table_block_parallelscan_estimate(Relation rel);
 extern Size table_block_parallelscan_initialize(Relation rel,
-												ParallelTableScanDesc pscan,
-												BlockNumber chunk_factor);
+												ParallelTableScanDesc pscan);
 extern void table_block_parallelscan_reinitialize(Relation rel,
 												  ParallelTableScanDesc pscan);
 extern BlockNumber table_block_parallelscan_nextpage(Relation rel,
-- 
2.41.0

