commit 16f0387dbeba3570836b506241bf0a1820de3390
Author: anastasia <a.lubennikova@postgrespro.ru>
Date:   Mon Aug 10 20:07:22 2020 +0300

    brin_summarize_fix_REL_12_v2.patch

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 267a6ee25a..c651d8b04e 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -1159,6 +1159,7 @@ heapam_index_build_range_scan(Relation heapRelation,
 	BlockNumber previous_blkno = InvalidBlockNumber;
 	BlockNumber root_blkno = InvalidBlockNumber;
 	OffsetNumber root_offsets[MaxHeapTuplesPerPage];
+	OffsetNumber root_offsets_size = 0;
 
 	/*
 	 * sanity checks
@@ -1324,6 +1325,11 @@ heapam_index_build_range_scan(Relation heapRelation,
 		 * buffer continuously while visiting the page, so no pruning
 		 * operation can occur either.
 		 *
+		 * It is essential, though, to check the root_offsets_size bound
+		 * before accessing the array, because concurrently inserted HOT tuples
+		 * don't have a valid cached root offset and we need to build the map
+		 * once again for them.
+		 *
 		 * Also, although our opinions about tuple liveness could change while
 		 * we scan the page (due to concurrent transaction commits/aborts),
 		 * the chain root locations won't, so this info doesn't need to be
@@ -1338,7 +1344,7 @@ heapam_index_build_range_scan(Relation heapRelation,
 			Page		page = BufferGetPage(hscan->rs_cbuf);
 
 			LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_SHARE);
-			heap_get_root_tuples(page, root_offsets);
+			root_offsets_size = heap_get_root_tuples(page, root_offsets);
 			LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_UNLOCK);
 
 			root_blkno = hscan->rs_cblock;
@@ -1625,6 +1631,25 @@ heapam_index_build_range_scan(Relation heapRelation,
 
 			offnum = ItemPointerGetOffsetNumber(&heapTuple->t_self);
 
+			/*
+			 * As we do not hold the buffer lock here, a concurrent
+			 * insertion may have added tuples that are missing from our
+			 * cached map.  If so, rebuild the map to find the root offset
+			 * for the new tuple.  (MaxOffsetNumber+1) is the special value
+			 * marking entries that were skipped when the map was built.
+			 */
+			if (root_offsets_size < offnum ||
+				root_offsets[offnum - 1] == (MaxOffsetNumber+1))
+			{
+				Page	page = BufferGetPage(hscan->rs_cbuf);
+
+				LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_SHARE);
+				root_offsets_size = heap_get_root_tuples(page, root_offsets);
+				LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_UNLOCK);
+			}
+
+			Assert(root_offsets_size >= offnum);
+
 			if (!OffsetNumberIsValid(root_offsets[offnum - 1]))
 				ereport(ERROR,
 						(errcode(ERRCODE_DATA_CORRUPTED),
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 256df4de10..3810481df7 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -732,7 +732,9 @@ heap_page_prune_execute(Buffer buffer,
  * root_offsets[k - 1] = j.
  *
  * The passed-in root_offsets array must have MaxHeapTuplesPerPage entries.
- * We zero out all unused entries.
+ * The return value is the highest offset number for which root_offsets
+ * holds a valid entry; the caller must never access entries beyond this
+ * bound.  Unused entries are set to the special value (MaxOffsetNumber+1).
  *
  * The function must be called with at least share lock on the buffer, to
  * prevent concurrent prune operations.
@@ -740,12 +742,13 @@ heap_page_prune_execute(Buffer buffer,
  * Note: The information collected here is valid only as long as the caller
  * holds a pin on the buffer. Once pin is released, a tuple might be pruned
  * and reused by a completely unrelated tuple.
+ *
  */
-void
+OffsetNumber
 heap_get_root_tuples(Page page, OffsetNumber *root_offsets)
 {
-	OffsetNumber offnum,
-				maxoff;
+	OffsetNumber offnum, maxoff;
+	OffsetNumber last_valid_offnum = 0;
 
 	MemSet(root_offsets, 0, MaxHeapTuplesPerPage * sizeof(OffsetNumber));
 
@@ -759,7 +762,14 @@ heap_get_root_tuples(Page page, OffsetNumber *root_offsets)
 
 		/* skip unused and dead items */
 		if (!ItemIdIsUsed(lp) || ItemIdIsDead(lp))
+		{
+			/*
+			 * Mark skipped (unused or dead) items with the special value
+			 * (MaxOffsetNumber+1) so readers can tell them apart from
+			 * valid root-offset entries.
+			 */
+			root_offsets[offnum - 1] = (MaxOffsetNumber+1);
 			continue;
+		}
 
 		if (ItemIdIsNormal(lp))
 		{
@@ -778,6 +788,8 @@ heap_get_root_tuples(Page page, OffsetNumber *root_offsets)
 			 * Remember it in the mapping.
 			 */
 			root_offsets[offnum - 1] = offnum;
+			if (offnum > last_valid_offnum)
+				last_valid_offnum = offnum;
 
 			/* If it's not the start of a HOT-chain, we're done with it */
 			if (!HeapTupleHeaderIsHotUpdated(htup))
@@ -820,6 +832,8 @@ heap_get_root_tuples(Page page, OffsetNumber *root_offsets)
 
 			/* Remember the root line pointer for this item */
 			root_offsets[nextoffnum - 1] = offnum;
+			if (nextoffnum > last_valid_offnum)
+				last_valid_offnum = nextoffnum;
 
 			/* Advance to next chain member, if any */
 			if (!HeapTupleHeaderIsHotUpdated(htup))
@@ -832,4 +846,6 @@ heap_get_root_tuples(Page page, OffsetNumber *root_offsets)
 			priorXmax = HeapTupleHeaderGetUpdateXid(htup);
 		}
 	}
+
+	return last_valid_offnum;
 }
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index b31de38910..b7f570887e 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -180,7 +180,7 @@ extern void heap_page_prune_execute(Buffer buffer,
 									OffsetNumber *redirected, int nredirected,
 									OffsetNumber *nowdead, int ndead,
 									OffsetNumber *nowunused, int nunused);
-extern void heap_get_root_tuples(Page page, OffsetNumber *root_offsets);
+extern OffsetNumber heap_get_root_tuples(Page page, OffsetNumber *root_offsets);
 
 /* in heap/vacuumlazy.c */
 struct VacuumParams;
