From 63ca62c13fa852c12d52ba0c53d801b7992ecb4b Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Sun, 9 Oct 2022 11:33:37 +0200
Subject: [PATCH 3/4] Allow BRIN indexes to produce sorted output

Some BRIN indexes can be used to produce sorted output, by using the
range information to sort tuples incrementally. This is particularly
interesting for LIMIT queries, which only need to scan the first few
rows, and alternative plans (e.g. Seq Scan + Sort) have a very high
startup cost.

Of course, if there are e.g. BTREE indexes on the same column, this is
going to be slower than using those, but people are unlikely to have
both index types on the same column.

This is disabled by default; use the enable_brinsort GUC to enable it.
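
For example, with enable_brinsort = on, a query of the form

    SELECT * FROM t ORDER BY a LIMIT 10;

(an illustrative query, with a BRIN index on column "a") may use the
new BRIN Sort node and start returning rows after sorting only a few
page ranges, instead of sorting the whole table first.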
---
 src/backend/access/brin/brin_minmax.c   |  386 ++++++
 src/backend/commands/explain.c          |   44 +
 src/backend/executor/Makefile           |    1 +
 src/backend/executor/execProcnode.c     |   10 +
 src/backend/executor/nodeBrinSort.c     | 1550 +++++++++++++++++++++++
 src/backend/optimizer/path/costsize.c   |  254 ++++
 src/backend/optimizer/path/indxpath.c   |  186 +++
 src/backend/optimizer/path/pathkeys.c   |   50 +
 src/backend/optimizer/plan/createplan.c |  188 +++
 src/backend/optimizer/plan/setrefs.c    |   19 +
 src/backend/optimizer/util/pathnode.c   |   57 +
 src/backend/utils/misc/guc_tables.c     |   10 +
 src/include/access/brin.h               |   35 -
 src/include/access/brin_internal.h      |    1 +
 src/include/catalog/pg_amproc.dat       |   64 +
 src/include/catalog/pg_proc.dat         |    3 +
 src/include/executor/nodeBrinSort.h     |   47 +
 src/include/nodes/execnodes.h           |  103 ++
 src/include/nodes/pathnodes.h           |   11 +
 src/include/nodes/plannodes.h           |   26 +
 src/include/optimizer/cost.h            |    3 +
 src/include/optimizer/pathnode.h        |    9 +
 src/include/optimizer/paths.h           |    3 +
 23 files changed, 3025 insertions(+), 35 deletions(-)
 create mode 100644 src/backend/executor/nodeBrinSort.c
 create mode 100644 src/include/executor/nodeBrinSort.h

diff --git a/src/backend/access/brin/brin_minmax.c b/src/backend/access/brin/brin_minmax.c
index be1d9b47d5b..9064cd43852 100644
--- a/src/backend/access/brin/brin_minmax.c
+++ b/src/backend/access/brin/brin_minmax.c
@@ -16,6 +16,10 @@
 #include "access/brin_tuple.h"
 #include "access/genam.h"
 #include "access/stratnum.h"
+#include "access/table.h"
+#include "access/tableam.h"
+#include "catalog/index.h"
+#include "catalog/pg_am.h"
 #include "catalog/pg_amop.h"
 #include "catalog/pg_type.h"
 #include "miscadmin.h"
@@ -42,6 +46,9 @@ static FmgrInfo *minmax_get_strategy_procinfo(BrinDesc *bdesc, uint16 attno,
 /* calculate the stats in different ways for cross-checking */
 #define STATS_CROSS_CHECK
 
+/* print info about ranges */
+#define BRINSORT_DEBUG
+
 Datum
 brin_minmax_opcinfo(PG_FUNCTION_ARGS)
 {
@@ -1587,6 +1594,385 @@ cleanup:
 	PG_RETURN_POINTER(stats);
 }
 
+/*
+ * brin_minmax_range_tupdesc
+ *		Create a tuple descriptor to store BrinRange data.
+ */
+static TupleDesc
+brin_minmax_range_tupdesc(BrinDesc *brdesc, AttrNumber attnum)
+{
+	TupleDesc	tupdesc;
+	AttrNumber	attno = 1;
+
+	/* expect minimum and maximum */
+	Assert(brdesc->bd_info[attnum - 1]->oi_nstored == 2);
+
+	tupdesc = CreateTemplateTupleDesc(7);
+
+	/* blkno_start */
+	TupleDescInitEntry(tupdesc, attno++, NULL, INT8OID, -1, 0);
+
+	/* blkno_end (could be calculated as blkno_start + pages_per_range) */
+	TupleDescInitEntry(tupdesc, attno++, NULL, INT8OID, -1, 0);
+
+	/* has_nulls */
+	TupleDescInitEntry(tupdesc, attno++, NULL, BOOLOID, -1, 0);
+
+	/* all_nulls */
+	TupleDescInitEntry(tupdesc, attno++, NULL, BOOLOID, -1, 0);
+
+	/* not_summarized */
+	TupleDescInitEntry(tupdesc, attno++, NULL, BOOLOID, -1, 0);
+
+	/* min_value */
+	TupleDescInitEntry(tupdesc, attno++, NULL,
+					   brdesc->bd_info[attnum - 1]->oi_typcache[0]->type_id,
+								   -1, 0);
+
+	/* max_value */
+	TupleDescInitEntry(tupdesc, attno++, NULL,
+					   brdesc->bd_info[attnum - 1]->oi_typcache[0]->type_id,
+								   -1, 0);
+
+	return tupdesc;
+}
+
+/*
+ * brin_minmax_range_tuple
+ *		Form a minimal tuple representing range info.
+ */
+static MinimalTuple
+brin_minmax_range_tuple(TupleDesc tupdesc,
+						BlockNumber block_start, BlockNumber block_end,
+						bool has_nulls, bool all_nulls, bool not_summarized,
+						Datum min_value, Datum max_value)
+{
+	Datum	values[7];
+	bool	nulls[7];
+
+	memset(nulls, 0, sizeof(nulls));
+
+	values[0] = Int64GetDatum(block_start);
+	values[1] = Int64GetDatum(block_end);
+	values[2] = BoolGetDatum(has_nulls);
+	values[3] = BoolGetDatum(all_nulls);
+	values[4] = BoolGetDatum(not_summarized);
+	values[5] = min_value;
+	values[6] = max_value;
+
+	if (all_nulls || not_summarized)
+	{
+		nulls[5] = true;
+		nulls[6] = true;
+	}
+
+	return heap_form_minimal_tuple(tupdesc, values, nulls);
+}
+
+/*
+ * brin_minmax_scan_init
+ *		Prepare the BrinRangeScanDesc including the sorting info etc.
+ *
+ * We want to have the ranges in roughly this order
+ *
+ * - not-summarized
+ * - summarized, non-null values
+ * - summarized, all-nulls
+ *
+ * We do it this way because the not-summarized ranges always need to
+ * be scanned (both to produce NULL and non-NULL values), and
+ * we need to read all of them into the tuplesort before producing
+ * anything. So placing them at the beginning is reasonable.
+ *
+ * The all-nulls ranges are placed last, because when processing
+ * NULLs we need to scan everything anyway (some of the ranges might
+ * have has_nulls=true). But for non-NULL values we can abort once
+ * we hit the first all-nulls range.
+ *
+ * The regular ranges are sorted by blkno_start, to make it maybe
+ * a bit more sequential (but this only helps if there are ranges
+ * with the same minval).
+ */
+static BrinRangeScanDesc *
+brin_minmax_scan_init(BrinDesc *bdesc, AttrNumber attnum, bool asc)
+{
+	BrinRangeScanDesc  *scan;
+
+	/* sort by (not_summarized, all_nulls, minval, blkno_start) */
+	AttrNumber			keys[4];
+	Oid					collations[4];
+	bool				nullsFirst[4];
+	Oid					operators[4];
+	Oid					typid;
+	TypeCacheEntry	   *typcache;
+
+	/* we expect to have min/max value for each range, same type for both */
+	Assert(bdesc->bd_info[attnum - 1]->oi_nstored == 2);
+	Assert(bdesc->bd_info[attnum - 1]->oi_typcache[0]->type_id ==
+		   bdesc->bd_info[attnum - 1]->oi_typcache[1]->type_id);
+
+	scan = (BrinRangeScanDesc *) palloc0(sizeof(BrinRangeScanDesc));
+
+	/* build tuple descriptor for range data */
+	scan->tdesc = brin_minmax_range_tupdesc(bdesc, attnum);
+
+	/* initialize ordering info */
+	keys[0] = 5;				/* not_summarized */
+	keys[1] = 4;				/* all_nulls */
+	keys[2] = (asc) ? 6 : 7;	/* min_value (asc) or max_value (desc) */
+	keys[3] = 1;				/* blkno_start */
+
+	collations[0] = InvalidOid;	/* FIXME */
+	collations[1] = InvalidOid;	/* FIXME */
+	collations[2] = InvalidOid;	/* FIXME */
+	collations[3] = InvalidOid;	/* FIXME */
+
+	/* unrelated to the ordering desired by the user */
+	nullsFirst[0] = false;
+	nullsFirst[1] = false;
+	nullsFirst[2] = false;
+	nullsFirst[3] = false;
+
+	/* lookup sort operator for the boolean type (used for not_summarized) */
+	typcache = lookup_type_cache(BOOLOID, TYPECACHE_GT_OPR);
+	operators[0] = typcache->gt_opr;
+
+	/* lookup sort operator for the boolean type (used for all_nulls) */
+	typcache = lookup_type_cache(BOOLOID, TYPECACHE_LT_OPR);
+	operators[1] = typcache->lt_opr;
+
+	/* lookup sort operator for the min/max type */
+	typid = bdesc->bd_info[attnum - 1]->oi_typcache[0]->type_id;
+	typcache = lookup_type_cache(typid, TYPECACHE_LT_OPR | TYPECACHE_GT_OPR);
+	operators[2] = (asc) ? typcache->lt_opr : typcache->gt_opr;
+
+	/* lookup sort operator for the bigint type (used for blkno_start) */
+	typcache = lookup_type_cache(INT8OID, TYPECACHE_LT_OPR);
+	operators[3] = typcache->lt_opr;
+
+	scan->ranges = tuplesort_begin_heap(scan->tdesc,
+										4, /* nkeys */
+										keys,
+										operators,
+										collations,
+										nullsFirst,
+										work_mem,
+										NULL,
+										TUPLESORT_RANDOMACCESS);
+
+	scan->slot = MakeSingleTupleTableSlot(scan->tdesc,
+										  &TTSOpsMinimalTuple);
+
+	return scan;
+}
+
+/*
+ * brin_minmax_scan_add_tuple
+ *		Form a tuple representing the BRIN range and store it in the tuplesort.
+ */
+static void
+brin_minmax_scan_add_tuple(BrinRangeScanDesc *scan,
+						  BlockNumber block_start, BlockNumber block_end,
+						  bool has_nulls, bool all_nulls, bool not_summarized,
+						  Datum min_value, Datum max_value)
+{
+	MinimalTuple tup;
+
+	tup = brin_minmax_range_tuple(scan->tdesc, block_start, block_end,
+								  has_nulls, all_nulls, not_summarized,
+								  min_value, max_value);
+
+	ExecStoreMinimalTuple(tup, scan->slot, false);
+
+	tuplesort_puttupleslot(scan->ranges, scan->slot);
+}
+
+#ifdef BRINSORT_DEBUG
+/*
+ * brin_minmax_scan_next
+ *		Return the next BRIN range information from the tuplesort.
+ *
+ * Returns NULL when there are no more ranges.
+ */
+static BrinRange *
+brin_minmax_scan_next(BrinRangeScanDesc *scan)
+{
+	if (tuplesort_gettupleslot(scan->ranges, true, false, scan->slot, NULL))
+	{
+		bool		isnull;
+		BrinRange  *range = (BrinRange *) palloc(sizeof(BrinRange));
+
+		range->blkno_start = slot_getattr(scan->slot, 1, &isnull);
+		range->blkno_end = slot_getattr(scan->slot, 2, &isnull);
+		range->has_nulls = slot_getattr(scan->slot, 3, &isnull);
+		range->all_nulls = slot_getattr(scan->slot, 4, &isnull);
+		range->not_summarized = slot_getattr(scan->slot, 5, &isnull);
+		range->min_value = slot_getattr(scan->slot, 6, &isnull);
+		range->max_value = slot_getattr(scan->slot, 7, &isnull);
+
+		return range;
+	}
+
+	return NULL;
+}
+
+/*
+ * brin_minmax_scan_dump
+ *		Print info about all page ranges stored in the tuplesort.
+ */
+static void
+brin_minmax_scan_dump(BrinRangeScanDesc *scan)
+{
+	BrinRange *range;
+
+	elog(WARNING, "===== dumping =====");
+	while ((range = brin_minmax_scan_next(scan)) != NULL)
+	{
+		elog(WARNING, "[%u %u] has_nulls %d all_nulls %d not_summarized %d values [%f %f]",
+			 range->blkno_start, range->blkno_end,
+			 range->has_nulls, range->all_nulls, range->not_summarized,
+			 DatumGetFloat8(range->min_value), DatumGetFloat8(range->max_value));
+
+		pfree(range);
+	}
+
+	/* reset the tuplesort, so that we can start scanning again */
+	tuplesort_rescan(scan->ranges);
+}
+#endif
+
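+/*
+ * brin_minmax_scan_finalize
+ *		Finish building the list of ranges by sorting the tuplesort.
+ */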
+static void
+brin_minmax_scan_finalize(BrinRangeScanDesc *scan)
+{
+	tuplesort_performsort(scan->ranges);
+}
+
+/*
+ * brin_minmax_ranges
+ *		Load the BRIN ranges and sort them.
+ */
+Datum
+brin_minmax_ranges(PG_FUNCTION_ARGS)
+{
+	IndexScanDesc	scan = (IndexScanDesc) PG_GETARG_POINTER(0);
+	AttrNumber		attnum = PG_GETARG_INT16(1);
+	bool			asc = PG_GETARG_BOOL(2);
+	BrinOpaque *opaque;
+	Relation	indexRel;
+	Relation	heapRel;
+	BlockNumber nblocks;
+	BlockNumber	heapBlk;
+	Oid			heapOid;
+	BrinMemTuple *dtup;
+	BrinTuple  *btup = NULL;
+	Size		btupsz = 0;
+	Buffer		buf = InvalidBuffer;
+	BlockNumber	pagesPerRange;
+	BrinDesc	   *bdesc;
+	BrinRangeScanDesc *brscan;
+
+	/*
+	 * Determine how many BRIN ranges there could be, allocate space and read
+	 * all the min/max values.
+	 */
+	opaque = (BrinOpaque *) scan->opaque;
+	bdesc = opaque->bo_bdesc;
+	pagesPerRange = opaque->bo_pagesPerRange;
+
+	indexRel = bdesc->bd_index;
+
+	/* make sure the provided attnum is valid */
+	Assert((attnum > 0) && (attnum <= bdesc->bd_tupdesc->natts));
+
+	/*
+	 * We need to know the size of the table so that we know how long to iterate
+	 * on the revmap (and to pre-allocate the arrays).
+	 */
+	heapOid = IndexGetRelation(RelationGetRelid(indexRel), false);
+	heapRel = table_open(heapOid, AccessShareLock);
+	nblocks = RelationGetNumberOfBlocks(heapRel);
+	table_close(heapRel, AccessShareLock);
+
+	/* allocate an initial in-memory tuple, out of the per-range memcxt */
+	dtup = brin_new_memtuple(bdesc);
+
+	/* initialize the scan describing scan of ranges sorted by minval */
+	brscan = brin_minmax_scan_init(bdesc, attnum, asc);
+
+	/*
+	 * Now scan the revmap.  We start by querying for heap page 0,
+	 * incrementing by the number of pages per range; this gives us a full
+	 * view of the table.
+	 */
+	for (heapBlk = 0; heapBlk < nblocks; heapBlk += pagesPerRange)
+	{
+		bool		gottuple = false;
+		BrinTuple  *tup;
+		OffsetNumber off;
+		Size		size;
+
+		CHECK_FOR_INTERRUPTS();
+
+		tup = brinGetTupleForHeapBlock(opaque->bo_rmAccess, heapBlk, &buf,
+									   &off, &size, BUFFER_LOCK_SHARE,
+									   scan->xs_snapshot);
+		if (tup)
+		{
+			gottuple = true;
+			btup = brin_copy_tuple(tup, size, btup, &btupsz);
+			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+		}
+
+		/*
+		 * Ranges with no indexed tuple may contain anything.
+		 */
+		if (!gottuple)
+		{
+			brin_minmax_scan_add_tuple(brscan,
+									   heapBlk, heapBlk + (pagesPerRange - 1),
+									   false, false, true, 0, 0);
+		}
+		else
+		{
+			dtup = brin_deform_tuple(bdesc, btup, dtup);
+			if (dtup->bt_placeholder)
+			{
+				/*
+				 * Placeholder tuples are treated as if not summarized.
+				 *
+				 * XXX Is this correct?
+				 */
+				brin_minmax_scan_add_tuple(brscan,
+										   heapBlk, heapBlk + (pagesPerRange - 1),
+										   false, false, true, 0, 0);
+			}
+			else
+			{
+				BrinValues *bval;
+
+				bval = &dtup->bt_columns[attnum - 1];
+
+				brin_minmax_scan_add_tuple(brscan,
+										   heapBlk, heapBlk + (pagesPerRange - 1),
+										   bval->bv_hasnulls, bval->bv_allnulls, false,
+										   bval->bv_values[0], bval->bv_values[1]);
+			}
+		}
+	}
+
+	if (buf != InvalidBuffer)
+		ReleaseBuffer(buf);
+
+	/* do the sort and any necessary post-processing */
+	brin_minmax_scan_finalize(brscan);
+
+#ifdef BRINSORT_DEBUG
+	brin_minmax_scan_dump(brscan);
+#endif
+
+	PG_RETURN_POINTER(brscan);
+}
+
 /*
  * Cache and return the procedure for the given strategy.
  *
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index f86983c6601..e15b29246b1 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -85,6 +85,8 @@ static void show_sort_keys(SortState *sortstate, List *ancestors,
 						   ExplainState *es);
 static void show_incremental_sort_keys(IncrementalSortState *incrsortstate,
 									   List *ancestors, ExplainState *es);
+static void show_brinsort_keys(BrinSortState *sortstate, List *ancestors,
+							   ExplainState *es);
 static void show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
 								   ExplainState *es);
 static void show_agg_keys(AggState *astate, List *ancestors,
@@ -1100,6 +1102,7 @@ ExplainPreScanNode(PlanState *planstate, Bitmapset **rels_used)
 		case T_IndexScan:
 		case T_IndexOnlyScan:
 		case T_BitmapHeapScan:
+		case T_BrinSort:
 		case T_TidScan:
 		case T_TidRangeScan:
 		case T_SubqueryScan:
@@ -1262,6 +1265,9 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		case T_IndexOnlyScan:
 			pname = sname = "Index Only Scan";
 			break;
+		case T_BrinSort:
+			pname = sname = "BRIN Sort";
+			break;
 		case T_BitmapIndexScan:
 			pname = sname = "Bitmap Index Scan";
 			break;
@@ -1508,6 +1514,16 @@ ExplainNode(PlanState *planstate, List *ancestors,
 				ExplainScanTarget((Scan *) indexonlyscan, es);
 			}
 			break;
+		case T_BrinSort:
+			{
+				BrinSort  *brinsort = (BrinSort *) plan;
+
+				ExplainIndexScanDetails(brinsort->indexid,
+										brinsort->indexorderdir,
+										es);
+				ExplainScanTarget((Scan *) brinsort, es);
+			}
+			break;
 		case T_BitmapIndexScan:
 			{
 				BitmapIndexScan *bitmapindexscan = (BitmapIndexScan *) plan;
@@ -1790,6 +1806,18 @@ ExplainNode(PlanState *planstate, List *ancestors,
 				ExplainPropertyFloat("Heap Fetches", NULL,
 									 planstate->instrument->ntuples2, 0, es);
 			break;
+		case T_BrinSort:
+			show_scan_qual(((BrinSort *) plan)->indexqualorig,
+						   "Index Cond", planstate, ancestors, es);
+			if (((BrinSort *) plan)->indexqualorig)
+				show_instrumentation_count("Rows Removed by Index Recheck", 2,
+										   planstate, es);
+			show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
+			show_brinsort_keys(castNode(BrinSortState, planstate), ancestors, es);
+			if (plan->qual)
+				show_instrumentation_count("Rows Removed by Filter", 1,
+										   planstate, es);
+			break;
 		case T_BitmapIndexScan:
 			show_scan_qual(((BitmapIndexScan *) plan)->indexqualorig,
 						   "Index Cond", planstate, ancestors, es);
@@ -2389,6 +2417,21 @@ show_incremental_sort_keys(IncrementalSortState *incrsortstate,
 						 ancestors, es);
 }
 
+/*
+ * Show the sort keys for a BRIN Sort node.
+ */
+static void
+show_brinsort_keys(BrinSortState *sortstate, List *ancestors, ExplainState *es)
+{
+	BrinSort	   *plan = (BrinSort *) sortstate->ss.ps.plan;
+
+	show_sort_group_keys((PlanState *) sortstate, "Sort Key",
+						 plan->numCols, 0, plan->sortColIdx,
+						 plan->sortOperators, plan->collations,
+						 plan->nullsFirst,
+						 ancestors, es);
+}
+
 /*
  * Likewise, for a MergeAppend node.
  */
@@ -3812,6 +3855,7 @@ ExplainTargetRel(Plan *plan, Index rti, ExplainState *es)
 		case T_ForeignScan:
 		case T_CustomScan:
 		case T_ModifyTable:
+		case T_BrinSort:
 			/* Assert it's on a real relation */
 			Assert(rte->rtekind == RTE_RELATION);
 			objectname = get_rel_name(rte->relid);
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 11118d0ce02..bcaa2ce8e21 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -38,6 +38,7 @@ OBJS = \
 	nodeBitmapHeapscan.o \
 	nodeBitmapIndexscan.o \
 	nodeBitmapOr.o \
+	nodeBrinSort.o \
 	nodeCtescan.o \
 	nodeCustom.o \
 	nodeForeignscan.o \
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 36406c3af57..4a6dc3f263c 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -79,6 +79,7 @@
 #include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeBitmapIndexscan.h"
 #include "executor/nodeBitmapOr.h"
+#include "executor/nodeBrinSort.h"
 #include "executor/nodeCtescan.h"
 #include "executor/nodeCustom.h"
 #include "executor/nodeForeignscan.h"
@@ -226,6 +227,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
 														 estate, eflags);
 			break;
 
+		case T_BrinSort:
+			result = (PlanState *) ExecInitBrinSort((BrinSort *) node,
+													estate, eflags);
+			break;
+
 		case T_BitmapIndexScan:
 			result = (PlanState *) ExecInitBitmapIndexScan((BitmapIndexScan *) node,
 														   estate, eflags);
@@ -639,6 +645,10 @@ ExecEndNode(PlanState *node)
 			ExecEndIndexOnlyScan((IndexOnlyScanState *) node);
 			break;
 
+		case T_BrinSortState:
+			ExecEndBrinSort((BrinSortState *) node);
+			break;
+
 		case T_BitmapIndexScanState:
 			ExecEndBitmapIndexScan((BitmapIndexScanState *) node);
 			break;
diff --git a/src/backend/executor/nodeBrinSort.c b/src/backend/executor/nodeBrinSort.c
new file mode 100644
index 00000000000..ca72c1ed22d
--- /dev/null
+++ b/src/backend/executor/nodeBrinSort.c
@@ -0,0 +1,1550 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeBrinSort.c
+ *	  Routines to support sorted scan of relations using a BRIN index
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * The overall algorithm is roughly this:
+ *
+ * 0) initialize a tuplestore and a tuplesort
+ *
+ * 1) fetch list of page ranges from the BRIN index, sorted by minval
+ *    (with the not-summarized ranges first, and all-null ranges last)
+ *
+ * 2) for NULLS FIRST ordering, walk all ranges that may contain NULL
+ *    values and output them (and return to the beginning of the list)
+ *
+ * 3) while there are ranges in the list, do this:
+ *
+ *   a) get next (distinct) minval from the list, call it watermark
+ *
+ *   b) if there are any tuples in the tuplestore, move them to tuplesort
+ *
+ *   c) process all ranges with (minval < watermark) - read tuples and feed
+ *      them either into the tuplesort (when value <= watermark) or the
+ *      tuplestore (otherwise)
+ *
+ *   d) sort the tuplesort, output all the tuples
+ *
+ * 4) if some tuples remain in the tuplestore, sort and output them
+ *
+ * 5) for NULLS LAST ordering, walk all ranges that may contain NULL
+ *    values and output them (and return to the beginning of the list)
+ *
+ *
+ * For DESC orderings the process is almost the same, except that we look
+ * at maxval and use '>' operator (but that's transparent).
+ *
+ * There's a couple possible things that might be done in different ways:
+ *
+ * 1) Not using tuplestore, and feeding tuples only to a tuplesort. Then
+ * while producing the tuples, we'd only output tuples up to the current
+ * watermark, and then we'd keep the remaining tuples for the next round.
+ * Either we'd need to transfer them into a second tuplesort, or allow
+ * "reopening" the tuplesort and adding more tuples. And then only the
+ * part since the watermark would get sorted (possibly using a merge-sort
+ * with the already sorted part).
+ *
+ *
+ * 2) The other question is what to do with NULL values - at the moment we
+ * just read the ranges, output the NULL tuples and that's it - we're not
+ * retaining any non-NULL tuples, so we'll read the ranges again in
+ * the second pass. The logic here is that either there are very few
+ * such ranges, so it won't cost much to just re-read them. Or maybe
+ * there are very many such ranges, and we'd do a lot of spilling to the
+ * tuplestore, and it's not much more expensive to just re-read the source
+ * data. There are counter-examples, though - e.g., there might be many
+ * has_nulls ranges, but with very few non-NULL tuples. In this case it
+ * might be better to actually spill the tuples instead of re-reading all
+ * the ranges. Maybe this is something we can do at run-time, or maybe we
+ * could estimate this at planning time. We do know the null_frac for the
+ * column, so we know the number of NULL rows. And we also know the number
+ * of all_nulls and has_nulls ranges. We can estimate the number of rows
+ * per range, and we can estimate how many non-NULL rows are in the
+ * has_nulls ranges (we don't need to re-read all-nulls ranges). There's
+ * also the filter, which may reduce the amount of rows to store.
+ *
+ * So we'd need to compare two metrics calculated roughly like this:
+ *
+ *   cost(re-reading has-nulls ranges)
+ *      = cost(random_page_cost * n_has_nulls + seq_page_cost * pages_per_range)
+ *
+ *   cost(spilling non-NULL rows from has-nulls ranges)
+ *      = cost(numrows * width / BLCKSZ * seq_page_cost * 2)
+ *
+ * where numrows is the number of non-NULL rows in has_null ranges, which
+ * can be calculated like this:
+ *
+ *   // estimated number of rows in has-null ranges
+ *   rows_in_has_nulls = (reltuples / relpages) * pages_per_range * n_has_nulls
+ *
+ *   // number of NULL rows in the has-nulls ranges
+ *   nulls_in_ranges = reltuples * null_frac
+ *                       - n_all_nulls * (reltuples / relpages) * pages_per_range
+ *
+ *   // numrows is the difference, multiplied by selectivity of the index
+ *   // filter condition (value between 0.0 and 1.0)
+ *   numrows = (rows_in_has_nulls - nulls_in_ranges) * selectivity
+ *
+ * This ignores non-summarized ranges, but there should be only very few of
+ * those, so it should not make a huge difference. Otherwise we can divide
+ * them between regular, has-nulls and all-nulls pages to keep the ratio.
+ *
+ *
+ * 3) How large step to make when updating the watermark?
+ *
+ * When updating the watermark, one option is to simply proceed to the next
+ * distinct minval value, which is the smallest possible step we can make.
+ * This may be both fine and very inefficient, depending on how many rows
+ * end up in the tuplesort and how many rows we end up spilling (possibly
+ * repeatedly to the tuplestore).
+ *
+ * When having to sort a large number of rows, it's inefficient to run many
+ * tiny sorts, even if it produces correct result. For example when sorting
+ * 1M rows, we may split this as either (a) 100000x sorts of 10 rows, or
+ * (b) 1000 sorts of 1000 rows. The (b) option is almost certainly more
+ * efficient. Maybe sorts of 10k rows would be even better, if it fits
+ * into work_mem.
+ *
+ * This gets back to how large the page ranges are, and if/how much they
+ * overlap. With tiny ranges (e.g. single-page ranges), a single range
+ * can only add as many rows as we can fit on a single page. So we need
+ * more ranges by default - how many watermark steps that is depends on
+ * how many distinct minval values are there ...
+ *
+ * Then there's overlaps - if ranges do not overlap, we're done and we'll
+ * add the whole range because the next watermark is above maxval. But
+ * when the ranges overlap, we'll only add the first part (assuming the
+ * minval of the next range is the watermark). Assume 10 overlapping
+ * ranges - imagine for example ranges shifted by 10%, so something like
+ *
+ *   [0,100] [10,110], [20,120], [30, 130], ..., [90, 190]
+ *
+ * In the first step we use watermark=10 and load the first range, with
+ * maybe 1000 rows in total. But assuming uniform distribution, only about
+ * 100 rows will go into the tuplesort, and the remaining 900 rows will
+ * go into the tuplestore. Then in the second step
+ * we sort another 100 rows and the remaining 800 rows will be moved into
+ * a new tuplestore. And so on and so on.
+ *
+ * This means that incrementing the watermarks by single steps may be
+ * quite inefficient, and we need to reflect both the range size and
+ * how much the ranges overlap.
+ *
+ * In fact, maybe we should not determine the step as the number of minval
+ * values to skip, but as the number of ranges that would mean reading. Because
+ * if we have a minval with many duplicates, that may load many rows.
+ * Or even better, we could look at how many rows that would mean loading
+ * into the tuplestore - if we track P(x<minval) for each range (e.g. by
+ * calculating average value during ANALYZE, or perhaps by estimating
+ * it from per-column stats), then we know the increment is going to be
+ * about
+ *
+ *     P(x < minval[i]) - P(x < minval[i-1])
+ *
+ * and we can stop once we'd exceed work_mem (with some slack). See comment
+ * for brin_minmax_stats() for more thoughts.
+ *
+ *
+ * 4) LIMIT/OFFSET vs. full sort
+ *
+ * There's one case where very small sorts may be actually optimal, and
+ * that's queries that need to process only very few rows - say, LIMIT
+ * queries with very small bound.
+ *
+ *
+ * FIXME Projection does not work (fails on projection slot expecting
+ * buffer ops, but we're sending it minimal tuple slot).
+ *
+ * FIXME The tlists are not wired quite correctly - the sortColIdx is an
+ * index to the tlist, but we need attnum from the heap table, so that we
+ * can fetch the attribute etc. Or maybe fetching the value from the raw
+ * tuple (before projection) is wrong and needs to be done differently.
+ *
+ * FIXME Indexes on expressions don't work (possibly related to the tlist
+ * being done incorrectly).
+ *
+ * FIXME handling of other brin opclasses (minmax-multi)
+ *
+ * FIXME improve costing
+ *
+ *
+ * Improvement ideas:
+ *
+ * 1) multiple tuplestores for overlapping ranges
+ *
+ * When there are many overlapping ranges (so that maxval > current.maxval),
+ * we're loading all the "future" tuples into a new tuplestore. However, if
+ * there are multiple such ranges (imagine ranges "shifting" by 10%, which
+ * gives us 9 more ranges), we know in the next round we'll only need rows
+ * until the next maxval. We'll not sort these rows, but we'll still shuffle
+ * them around until we get to the proper range (so about 10x each row).
+ * Maybe we should pre-allocate the tuplestores (or maybe even tuplesorts)
+ * for future ranges, and route the tuples to the correct one? Maybe we
+ * could be a bit smarter and discard tuples once we have enough rows for
+ * the preceding ranges (say, with LIMIT queries). We'd also need to worry
+ * about work_mem, though - we can't just use many tuplestores, each with
+ * whole work_mem. So we'd probably use e.g. work_mem/2 for the next one,
+ * and then /4, /8 etc. for the following ones. That's work_mem in total.
+ * And there'd need to be some limit on number of tuplestores, I guess.
+ *
+ * 2) handling NULL values
+ *
+ * We need to handle NULLS FIRST / NULLS LAST cases. The question is how
+ * to do that - the easiest way is to simply do a separate scan of ranges
+ * that might contain NULL values, processing just rows with NULLs, and
+ * discarding other rows. And then process non-NULL values as currently.
+ * The NULL scan would happen before/after this regular phase.
+ *
+ * But maybe we could be smarter, and not do separate scans. When reading
+ * a page, we might stash the tuple in a tuplestore, so that we can read
+ * it the next round. Obviously, this might be expensive if we need to
+ * keep too many rows, so the tuplestore would grow too large - in that
+ * case it might be better to just do the two scans.
+ *
+ * 3) parallelism
+ *
+ * Presumably we could do a parallel version of this. The leader or first
+ * worker would prepare the range information, and the workers would then
+ * grab ranges (in a kinda round robin manner), sort them independently,
+ * and then the results would be merged by Gather Merge.
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/nodeBrinSort.c
+ *
+ *-------------------------------------------------------------------------
+ */
+/*
+ * INTERFACE ROUTINES
+ *		ExecBrinSort			scans a relation using an index
+ *		IndexNext				retrieve next tuple using index
+ *		ExecInitBrinSort		creates and initializes state info.
+ *		ExecReScanBrinSort		rescans the indexed relation.
+ *		ExecEndBrinSort			releases all storage.
+ *		ExecBrinSortMarkPos		marks scan position.
+ *		ExecBrinSortRestrPos	restores scan position.
+ *		ExecBrinSortEstimate	estimates DSM space needed for parallel index scan
+ *		ExecBrinSortInitializeDSM initialize DSM for parallel BrinSort
+ *		ExecBrinSortReInitializeDSM reinitialize DSM for fresh scan
+ *		ExecBrinSortInitializeWorker attach to DSM info in parallel worker
+ */
+#include "postgres.h"
+
+#include "access/brin.h"
+#include "access/brin_internal.h"
+#include "access/nbtree.h"
+#include "access/relscan.h"
+#include "access/table.h"
+#include "access/tableam.h"
+#include "catalog/index.h"
+#include "catalog/pg_am.h"
+#include "executor/execdebug.h"
+#include "executor/nodeBrinSort.h"
+#include "lib/pairingheap.h"
+#include "miscadmin.h"
+#include "nodes/nodeFuncs.h"
+#include "utils/array.h"
+#include "utils/datum.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+
+
+static TupleTableSlot *IndexNext(BrinSortState *node);
+static bool IndexRecheck(BrinSortState *node, TupleTableSlot *slot);
+static void ExecInitBrinSortRanges(BrinSort *node, BrinSortState *planstate);
+
+#define BRINSORT_DEBUG
+
+/* do various consistency checks */
+static void
+AssertCheckRanges(BrinSortState *node)
+{
+#ifdef USE_ASSERT_CHECKING
+
+#endif
+}
+
+/*
+ * brinsort_start_tidscan
+ *		Start scanning tuples from a given page range.
+ *
+ * We open a TID range scan for the given range, and initialize the tuplesort.
+ * Optionally, we update the watermark (with either high/low value). We only
+ * need to do this for the main page range, not for the intersecting ranges.
+ *
+ * XXX Maybe we should initialize the tidscan only once, and then do rescan
+ * for the following ranges? And similarly for the tuplesort?
+ */
+static void
+brinsort_start_tidscan(BrinSortState *node)
+{
+	BrinSort   *plan = (BrinSort *) node->ss.ps.plan;
+	EState	   *estate = node->ss.ps.state;
+	BrinRange  *range = node->bs_range;
+
+	/* There must not be any TID scan in progress yet. */
+	Assert(node->ss.ss_currentScanDesc == NULL);
+
+	/* Initialize the TID range scan, for the provided block range. */
+	if (node->ss.ss_currentScanDesc == NULL)
+	{
+		TableScanDesc		tscandesc;
+		ItemPointerData		mintid,
+							maxtid;
+
+		ItemPointerSetBlockNumber(&mintid, range->blkno_start);
+		ItemPointerSetOffsetNumber(&mintid, 0);
+
+		ItemPointerSetBlockNumber(&maxtid, range->blkno_end);
+		ItemPointerSetOffsetNumber(&maxtid, MaxHeapTuplesPerPage);
+
+		elog(DEBUG1, "loading range blocks [%u, %u]",
+			 range->blkno_start, range->blkno_end);
+
+		tscandesc = table_beginscan_tidrange(node->ss.ss_currentRelation,
+											 estate->es_snapshot,
+											 &mintid, &maxtid);
+		node->ss.ss_currentScanDesc = tscandesc;
+	}
+
+	if (node->bs_tuplesortstate == NULL)
+	{
+		TupleDesc	tupDesc = RelationGetDescr(node->ss.ss_currentRelation);
+
+		node->bs_tuplesortstate = tuplesort_begin_heap(tupDesc,
+													plan->numCols,
+													plan->sortColIdx,
+													plan->sortOperators,
+													plan->collations,
+													plan->nullsFirst,
+													work_mem,
+													NULL,
+													TUPLESORT_NONE);
+	}
+
+	if (node->bs_tuplestore == NULL)
+	{
+		node->bs_tuplestore = tuplestore_begin_heap(false, false, work_mem);
+	}
+}
+
+/*
+ * brinsort_end_tidscan
+ *		Finish the TID range scan.
+ */
+static void
+brinsort_end_tidscan(BrinSortState *node)
+{
+	/* if a TID range scan is in progress, end it */
+	if (node->ss.ss_currentScanDesc != NULL)
+	{
+		table_endscan(node->ss.ss_currentScanDesc);
+		node->ss.ss_currentScanDesc = NULL;
+	}
+}
+
+/*
+ * brinsort_update_watermark
+ *		Advance the watermark to the next minval (or maxval for DESC).
+ *
+ * We could actually advance the watermark by multiple steps (not to
+ * the immediately following minval, but a couple more), to accumulate more
+ * rows in the tuplesort. The number of steps we make correlates with the
+ * amount of data we sort in a given step, but we don't know in advance
+ * how many rows (or bytes) that will actually be. We could do some simple
+ * heuristics (measure past sorts and extrapolate).
+ */
+static void
+brinsort_update_watermark(BrinSortState *node, bool asc)
+{
+	int		cmp;
+	bool	found = false;
+
+	tuplesort_markpos(node->bs_scan->ranges);
+
+	while (tuplesort_gettupleslot(node->bs_scan->ranges, true, false, node->bs_scan->slot, NULL))
+	{
+		bool	isnull;
+		Datum	value;
+		bool	all_nulls;
+		bool	not_summarized;
+
+		all_nulls = DatumGetBool(slot_getattr(node->bs_scan->slot, 4, &isnull));
+		Assert(!isnull);
+
+		not_summarized = DatumGetBool(slot_getattr(node->bs_scan->slot, 5, &isnull));
+		Assert(!isnull);
+
+		/* we ignore ranges that are either all_nulls or not summarized */
+		if (all_nulls || not_summarized)
+			continue;
+
+		/* use either minval or maxval, depending on the ASC / DESC */
+		if (asc)
+			value = slot_getattr(node->bs_scan->slot, 6, &isnull);
+		else
+			value = slot_getattr(node->bs_scan->slot, 7, &isnull);
+
+		if (!node->bs_watermark_set)
+		{
+			node->bs_watermark_set = true;
+			node->bs_watermark = value;
+			continue;
+		}
+
+		cmp = ApplySortComparator(node->bs_watermark, false, value, false,
+								  &node->bs_sortsupport);
+
+		if (cmp < 0)
+		{
+			node->bs_watermark_set = true;
+			node->bs_watermark = value;
+			found = true;
+			break;
+		}
+	}
+
+	tuplesort_restorepos(node->bs_scan->ranges);
+
+	node->bs_watermark_set = found;
+}
+
+/*
+ * brinsort_load_tuples
+ *		Load tuples from the TID range scan, add them to tuplesort/store.
+ *
+ * When called for the "current" range, we don't need to check the watermark,
+ * we know the tuple goes into the tuplesort. So with check_watermark we
+ * skip the comparator call to save CPU cost.
+ */
+static void
+brinsort_load_tuples(BrinSortState *node, bool check_watermark, bool null_processing)
+{
+	BrinSort	   *plan = (BrinSort *) node->ss.ps.plan;
+	TableScanDesc	scan;
+	EState		   *estate;
+	ScanDirection	direction;
+	TupleTableSlot *slot;
+	BrinRange	   *range = node->bs_range;
+
+	estate = node->ss.ps.state;
+	direction = estate->es_direction;
+
+	slot = node->ss.ss_ScanTupleSlot;
+
+	Assert(node->bs_range != NULL);
+
+	/*
+	 * If we're not processing NULLs, and this is an all-nulls range, we can
+	 * just skip it - we won't find any non-NULL tuples in it.
+	 *
+	 * XXX Shouldn't happen, thanks to logic in brinsort_next_range().
+	 */
+	if (!null_processing && range->all_nulls)
+		return;
+
+	/*
+	 * Similarly, if we're processing NULLs and this range does not have
+	 * has_nulls flag, we can skip it.
+	 *
+	 * XXX Shouldn't happen, thanks to logic in brinsort_next_range().
+	 */
+	if (null_processing && !(range->has_nulls || range->not_summarized || range->all_nulls))
+		return;
+
+	brinsort_start_tidscan(node);
+
+	scan = node->ss.ss_currentScanDesc;
+
+	/*
+	 * Read tuples, evaluate the filter (so that we don't keep tuples only to
+	 * discard them later), and decide if it goes into the current range
+	 * (tuplesort) or overflow (tuplestore).
+	 */
+	while (table_scan_getnextslot_tidrange(scan, direction, slot))
+	{
+		ExprContext *econtext;
+		ExprState  *qual;
+
+		/*
+		 * Fetch data from node
+		 */
+		qual = node->bs_qual;
+		econtext = node->ss.ps.ps_ExprContext;
+
+		/*
+		 * place the current tuple into the expr context
+		 */
+		econtext->ecxt_scantuple = slot;
+
+		/*
+		 * check that the current tuple satisfies the qual-clause
+		 *
+		 * check for non-null qual here to avoid a function call to ExecQual()
+		 * when the qual is null ... saves only a few cycles, but they add up
+		 * ...
+		 *
+		 * XXX Done here, because in ExecScan we'll get different slot type
+		 * (minimal tuple vs. buffered tuple). Scan expects slot while reading
+		 * from the table (like here), but we're stashing it into a tuplesort.
+		 *
+		 * XXX Maybe we could eliminate many tuples by leveraging the BRIN
+		 * range, by executing the consistent function. But we don't have
+		 * the qual in appropriate format at the moment, so we'd preprocess
+		 * the keys similarly to bringetbitmap(). In which case we should
+		 * probably evaluate the stuff while building the ranges? Although,
+		 * if the "consistent" function is expensive, it might be cheaper
+		 * to do that incrementally, as we need the ranges. Would be a win
+		 * for LIMIT queries, for example.
+		 *
+		 * XXX However, maybe we could also leverage other bitmap indexes,
+		 * particularly for BRIN indexes because that makes it simpler to
+		 * eliminate the ranges incrementally - we know which ranges to
+		 * load from the index, while for other indexes (e.g. btree) we
+		 * have to read the whole index and build a bitmap in order to have
+		 * a bitmap for any range. Although, if the condition is very
+		 * selective, we may need to read only a small fraction of the
+		 * index, so maybe that's OK.
+		 */
+		if (qual == NULL || ExecQual(qual, econtext))
+		{
+			int		cmp = 0;	/* matters for check_watermark=false */
+			Datum	value;
+			bool	isnull;
+
+			value = slot_getattr(slot, plan->sortColIdx[0], &isnull);
+
+			/*
+			 * FIXME Not handling NULLS for now, we need to stash them into
+			 * a separate tuplestore (so that we can output them first or
+			 * last), and then skip them in the regular processing?
+			 */
+			if (null_processing)
+			{
+				/*
+				 * Stash it into the tuplestore (when NULL), or ignore
+				 * it (when not NULL).
+				 */
+				if (isnull)
+					tuplestore_puttupleslot(node->bs_tuplestore, slot);
+
+				/* NULL or not, we're done */
+				continue;
+			}
+
+			/* we're not processing NULL values, so ignore NULLs */
+			if (isnull)
+				continue;
+
+			/*
+			 * Otherwise compare to watermark, and stash it either to the
+			 * tuplesort or tuplestore.
+			 */
+			if (check_watermark && node->bs_watermark_set)
+				cmp = ApplySortComparator(value, false,
+										  node->bs_watermark, false,
+										  &node->bs_sortsupport);
+
+			if (cmp <= 0)
+				tuplesort_puttupleslot(node->bs_tuplesortstate, slot);
+			else
+				tuplestore_puttupleslot(node->bs_tuplestore, slot);
+		}
+
+		ExecClearTuple(slot);
+	}
+
+	ExecClearTuple(slot);
+
+	brinsort_end_tidscan(node);
+}
+
+/*
+ * brinsort_load_spill_tuples
+ *		Load tuples from the spill tuplestore, and either stash them into
+ *		a tuplesort or a new tuplestore.
+ *
+ * After processing the last range, we want to process all remaining ranges,
+ * so with check_watermark=false we skip the check.
+ */
+static void
+brinsort_load_spill_tuples(BrinSortState *node, bool check_watermark)
+{
+	BrinSort   *plan = (BrinSort *) node->ss.ps.plan;
+	Tuplestorestate *tupstore;
+	TupleTableSlot *slot;
+
+	if (node->bs_tuplestore == NULL)
+		return;
+
+	/* start scanning the existing tuplestore (XXX needed?) */
+	tuplestore_rescan(node->bs_tuplestore);
+
+	/*
+	 * Create a new tuplestore, for tuples that exceed the watermark and so
+	 * should not be included in the current sort.
+	 */
+	tupstore = tuplestore_begin_heap(false, false, work_mem);
+
+	/*
+	 * We need a slot for minimal tuples. The scan slot uses buffered tuples,
+	 * so it'd trigger an error in the loop.
+	 */
+	slot = MakeSingleTupleTableSlot(RelationGetDescr(node->ss.ss_currentRelation),
+									&TTSOpsMinimalTuple);
+
+	while (tuplestore_gettupleslot(node->bs_tuplestore, true, true, slot))
+	{
+		int		cmp = 0;	/* matters for check_watermark=false */
+		bool	isnull;
+		Datum	value;
+
+		value = slot_getattr(slot, plan->sortColIdx[0], &isnull);
+
+		/* We shouldn't have NULL values in the spill, at least not now. */
+		Assert(!isnull);
+
+		if (check_watermark && node->bs_watermark_set)
+			cmp = ApplySortComparator(value, false,
+									  node->bs_watermark, false,
+									  &node->bs_sortsupport);
+
+		if (cmp <= 0)
+			tuplesort_puttupleslot(node->bs_tuplesortstate, slot);
+		else
+			tuplestore_puttupleslot(tupstore, slot);
+	}
+
+	/*
+	 * Discard the existing tuplestore (that we just processed), use the new
+	 * one instead.
+	 */
+	tuplestore_end(node->bs_tuplestore);
+	node->bs_tuplestore = tupstore;
+
+	ExecDropSingleTupleTableSlot(slot);
+}
+
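+/*
+ * brinsort_next_range
+ *		Load the next range to process into node->bs_range.
+ *
+ * Returns true if a matching range was found, false once no later range
+ * can match the current watermark (or, when the watermark is not set,
+ * once no later range may contain NULL values).
+ */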
+static bool
+brinsort_next_range(BrinSortState *node, bool asc)
+{
+	/* FIXME free the current bs_range, if any */
+	node->bs_range = NULL;
+
+	/*
+	 * Mark the position, so that we can restore it in case we reach the
+	 * current watermark.
+	 */
+	tuplesort_markpos(node->bs_scan->ranges);
+
+	/*
+	 * Get the next range and return it, unless we can prove it's the last
+	 * range that can possibly match the current condition (thanks to how we
+	 * order the ranges).
+	 *
+	 * Also skip ranges that can't possibly match (e.g. because we are in
+	 * NULL processing, and the range has no NULLs).
+	 */
+	while (tuplesort_gettupleslot(node->bs_scan->ranges, true, false, node->bs_scan->slot, NULL))
+	{
+		bool		isnull;
+		Datum		value;
+
+		BrinRange  *range = (BrinRange *) palloc(sizeof(BrinRange));
+
+		range->blkno_start = slot_getattr(node->bs_scan->slot, 1, &isnull);
+		range->blkno_end = slot_getattr(node->bs_scan->slot, 2, &isnull);
+		range->has_nulls = slot_getattr(node->bs_scan->slot, 3, &isnull);
+		range->all_nulls = slot_getattr(node->bs_scan->slot, 4, &isnull);
+		range->not_summarized = slot_getattr(node->bs_scan->slot, 5, &isnull);
+		range->min_value = slot_getattr(node->bs_scan->slot, 6, &isnull);
+		range->max_value = slot_getattr(node->bs_scan->slot, 7, &isnull);
+
+		/*
+		 * Not-summarized ranges match irrespective of the watermark (if
+		 * it's set at all).
+		 */
+		if (range->not_summarized)
+		{
+			node->bs_range = range;
+			return true;
+		}
+
+		/*
+		 * The range is summarized, but maybe the watermark is not? That
+		 * would mean we're processing NULL values, so we skip ranges that
+		 * can't possibly match (i.e. with all_nulls=has_nulls=false).
+		 */
+		if (!node->bs_watermark_set)
+		{
+			if (range->all_nulls || range->has_nulls)
+			{
+				node->bs_range = range;
+				return true;
+			}
+
+			/* update the position and try the next range */
+			tuplesort_markpos(node->bs_scan->ranges);
+			pfree(range);
+
+			continue;
+		}
+
+		/*
+		 * So now we have a summarized range, and we know the watermark
+		 * is set too (so we're not processing NULLs). We place the ranges
+		 * with only nulls last, so once we hit one we're done.
+		 */
+		if (range->all_nulls)
+		{
+			pfree(range);
+			return false;	/* no more matching ranges */
+		}
+
+		/*
+		 * Compare the range to the watermark, using either the minval or
+		 * maxval, depending on ASC/DESC ordering. If the range precedes the
+		 * watermark, return it. Otherwise abort, all the future ranges are
+		 * either not matching the watermark (thanks to ordering) or contain
+		 * only NULL values.
+		 */
+
+		/* use minval or maxval, depending on ASC / DESC */
+		value = (asc) ? range->min_value : range->max_value;
+
+		/*
+		 * compare it to the current watermark (if set)
+		 *
+		 * XXX We don't use (... <= 0) here, because then we'd load ranges
+		 * with that minval (and there might be multiple), but most of the
+		 * rows would go into the tuplestore, because only rows matching the
+		 * minval exactly would be loaded into tuplesort.
+		 */
+		if (ApplySortComparator(value, false,
+								 node->bs_watermark, false,
+								 &node->bs_sortsupport) < 0)
+		{
+			node->bs_range = range;
+			return true;
+		}
+
+		pfree(range);
+		break;
+	}
+
+	/* not a matching range, we're done */
+	tuplesort_restorepos(node->bs_scan->ranges);
+
+	return false;
+}
+
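+/*
+ * brinsort_range_with_nulls
+ *		Check whether the current range may contain NULL values.
+ *
+ * Not-summarized ranges may contain anything, so they count too.
+ */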
+static bool
+brinsort_range_with_nulls(BrinSortState *node)
+{
+	BrinRange *range = node->bs_range;
+
+	if (range->all_nulls || range->has_nulls || range->not_summarized)
+		return true;
+
+	return false;
+}
+
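+/*
+ * brinsort_rescan
+ *		Restart the scan of the sorted ranges from the beginning.
+ */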
+static void
+brinsort_rescan(BrinSortState *node)
+{
+	tuplesort_rescan(node->bs_scan->ranges);
+}
+
+/* ----------------------------------------------------------------
+ *		IndexNext
+ *
+ *		Retrieve a tuple from the BrinSort node's currentRelation
+ *		using the index specified in the BrinSortState information.
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot *
+IndexNext(BrinSortState *node)
+{
+	BrinSort   *plan = (BrinSort *) node->ss.ps.plan;
+	EState	   *estate;
+	ScanDirection direction;
+	IndexScanDesc scandesc;
+	TupleTableSlot *slot;
+	bool		nullsFirst;
+	bool		asc;
+
+	/*
+	 * extract necessary information from index scan node
+	 */
+	estate = node->ss.ps.state;
+	direction = estate->es_direction;
+
+	/* flip direction if this is an overall backward scan */
+	/* XXX For BRIN indexes this is always forward direction */
+	// if (ScanDirectionIsBackward(((BrinSort *) node->ss.ps.plan)->indexorderdir))
+	if (false)
+	{
+		if (ScanDirectionIsForward(direction))
+			direction = BackwardScanDirection;
+		else if (ScanDirectionIsBackward(direction))
+			direction = ForwardScanDirection;
+	}
+	scandesc = node->iss_ScanDesc;
+	slot = node->ss.ss_ScanTupleSlot;
+
+	nullsFirst = plan->nullsFirst[0];
+	asc = ScanDirectionIsForward(plan->indexorderdir);
+
+	if (scandesc == NULL)
+	{
+		/*
+		 * We reach here if the index scan is not parallel, or if we're
+		 * serially executing an index scan that was planned to be parallel.
+		 */
+		scandesc = index_beginscan(node->ss.ss_currentRelation,
+								   node->iss_RelationDesc,
+								   estate->es_snapshot,
+								   node->iss_NumScanKeys,
+								   node->iss_NumOrderByKeys);
+
+		node->iss_ScanDesc = scandesc;
+
+		/*
+		 * If no run-time keys to calculate or they are ready, go ahead and
+		 * pass the scankeys to the index AM.
+		 */
+		if (node->iss_NumRuntimeKeys == 0 || node->iss_RuntimeKeysReady)
+			index_rescan(scandesc,
+						 node->iss_ScanKeys, node->iss_NumScanKeys,
+						 node->iss_OrderByKeys, node->iss_NumOrderByKeys);
+
+		/*
+		 * Load info about BRIN ranges, sort them to match the desired ordering.
+		 */
+		ExecInitBrinSortRanges(plan, node);
+		node->bs_phase = BRINSORT_START;
+	}
+
+	/*
+	 * ok, now that we have what we need, fetch the next tuple.
+	 */
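+	/*
+	 * The scan is driven by a small state machine: LOAD_RANGE feeds the
+	 * next batch of page ranges into the tuplesort, PROCESS_RANGE drains
+	 * it, and LOAD_NULLS / PROCESS_NULLS handle ranges that may contain
+	 * NULL values (before or after the regular ranges, depending on
+	 * nullsFirst).
+	 */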
+	while (node->bs_phase != BRINSORT_FINISHED)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		elog(DEBUG1, "phase = %d", node->bs_phase);
+
+		AssertCheckRanges(node);
+
+		switch (node->bs_phase)
+		{
+			case BRINSORT_START:
+
+				elog(DEBUG1, "phase = START");
+
+				/*
+				 * If we have NULLS FIRST, move to that stage. Otherwise
+				 * start scanning regular ranges.
+				 */
+				if (nullsFirst)
+					node->bs_phase = BRINSORT_LOAD_NULLS;
+				else
+				{
+					node->bs_phase = BRINSORT_LOAD_RANGE;
+
+					/* set the first watermark */
+					brinsort_update_watermark(node, asc);
+				}
+
+				break;
+
+			case BRINSORT_LOAD_RANGE:
+				{
+					elog(DEBUG1, "phase = LOAD_RANGE");
+
+					/*
+					 * Load tuples matching the new watermark from the existing
+					 * spill tuplestore. We do this before loading tuples from
+					 * the next chunk of ranges, because those will add tuples
+					 * to the spill, and we'd end up processing those twice.
+					 */
+					brinsort_load_spill_tuples(node, true);
+
+					/*
+					 * Load tuples from ranges, until we find a range that has
+					 * min_value >= watermark.
+					 *
+					 * XXX In fact, we are guaranteed to find an exact match
+					 * for the watermark, because of how we pick the watermark.
+					 */
+					while (brinsort_next_range(node, asc))
+						brinsort_load_tuples(node, true, false);
+
+					/*
+					 * If we have loaded any tuples into the tuplesort, try
+					 * sorting it and move to producing the tuples.
+					 *
+					 * XXX The range might have no rows matching the current
+					 * watermark, in which case the tuplesort is empty.
+					 */
+					if (node->bs_tuplesortstate)
+					{
+						tuplesort_performsort(node->bs_tuplesortstate);
+#ifdef BRINSORT_DEBUG
+						{
+							TuplesortInstrumentation stats;
+
+							tuplesort_get_stats(node->bs_tuplesortstate, &stats);
+
+							elog(DEBUG1, "method: %s  space: %ld kB (%s)",
+								 tuplesort_method_name(stats.sortMethod),
+								 stats.spaceUsed,
+								 tuplesort_space_type_name(stats.spaceType));
+						}
+#endif
+					}
+
+					node->bs_phase = BRINSORT_PROCESS_RANGE;
+					break;
+				}
+
+			case BRINSORT_PROCESS_RANGE:
+
+				elog(DEBUG1, "phase = PROCESS_RANGE");
+
+				slot = node->ss.ps.ps_ResultTupleSlot;
+
+				/* read tuples from the tuplesort, and output them */
+				if (node->bs_tuplesortstate != NULL)
+				{
+					if (tuplesort_gettupleslot(node->bs_tuplesortstate,
+										ScanDirectionIsForward(direction),
+										false, slot, NULL))
+						return slot;
+
+					/* once we're done with the tuplesort, reset it */
+					tuplesort_reset(node->bs_tuplesortstate);
+				}
+
+				/*
+				 * Now that we processed tuples from the last range batch,
+				 * see if we reached the end or if we should try updating
+				 * the watermark once again. If the watermark is not set,
+				 * we've already processed the last range.
+				 */
+				if (!node->bs_watermark_set)
+				{
+					if (nullsFirst)
+						node->bs_phase = BRINSORT_FINISHED;
+					else
+					{
+						brinsort_rescan(node);
+						node->bs_phase = BRINSORT_LOAD_NULLS;
+					}
+				}
+				else
+				{
+					/* update the watermark and try reading more ranges */
+					node->bs_phase = BRINSORT_LOAD_RANGE;
+					brinsort_update_watermark(node, asc);
+				}
+
+				break;
+
+			case BRINSORT_LOAD_NULLS:
+				{
+					elog(DEBUG1, "phase = LOAD_NULLS");
+
+					/*
+					 * Try loading another range. If there are no more ranges,
+					 * we either move to loading the regular ranges or we're
+					 * done (depending on the nullsFirst flag). Otherwise check
+					 * if this range may contain NULL values.
+					 */
+					while (true)
+					{
+						/* no more ranges - terminate or load regular ranges */
+						if (!brinsort_next_range(node, asc))
+						{
+							if (nullsFirst)
+							{
+								brinsort_rescan(node);
+								node->bs_phase = BRINSORT_LOAD_RANGE;
+								brinsort_update_watermark(node, asc);
+							}
+							else
+								node->bs_phase = BRINSORT_FINISHED;
+
+							break;
+						}
+
+						/* If this range may have NULLs, process them */
+						if (brinsort_range_with_nulls(node))
+							break;
+					}
+
+					if (node->bs_range == NULL)
+						break;
+
+					/*
+					 * There should be nothing left in the tuplestore, because
+					 * we flush that at the end of processing regular tuples,
+					 * and we don't retain tuples between NULL ranges.
+					 */
+					// Assert(node->bs_tuplestore == NULL);
+
+					/*
+					 * Load the next unprocessed / NULL range. We don't need to
+					 * check watermark while processing NULLS.
+					 */
+					brinsort_load_tuples(node, false, true);
+
+					node->bs_phase = BRINSORT_PROCESS_NULLS;
+					break;
+				}
+
+				break;
+
+			case BRINSORT_PROCESS_NULLS:
+
+				elog(DEBUG1, "phase = PROCESS_NULLS");
+
+				slot = node->ss.ps.ps_ResultTupleSlot;
+
+				Assert(node->bs_tuplestore != NULL);
+
+				/* read tuples from the tuplestore, and output them */
+				if (node->bs_tuplestore != NULL)
+				{
+
+					while (tuplestore_gettupleslot(node->bs_tuplestore, true, true, slot))
+						return slot;
+
+					tuplestore_end(node->bs_tuplestore);
+					node->bs_tuplestore = NULL;
+
+					node->bs_phase = BRINSORT_LOAD_NULLS;	/* load next range */
+				}
+
+				break;
+
+			case BRINSORT_FINISHED:
+				elog(ERROR, "unexpected BrinSort phase: FINISHED");
+				break;
+		}
+	}
+
+	/*
+	 * if we get here it means the index scan failed so we are at the end of
+	 * the scan..
+	 */
+	node->iss_ReachedEnd = true;
+	return ExecClearTuple(slot);
+}
+
+/*
+ * IndexRecheck -- access method routine to recheck a tuple in EvalPlanQual
+ */
+static bool
+IndexRecheck(BrinSortState *node, TupleTableSlot *slot)
+{
+	ExprContext *econtext;
+
+	/*
+	 * extract necessary information from index scan node
+	 */
+	econtext = node->ss.ps.ps_ExprContext;
+
+	/* Does the tuple meet the indexqual condition? */
+	econtext->ecxt_scantuple = slot;
+	return ExecQualAndReset(node->indexqualorig, econtext);
+}
+
+
+/* ----------------------------------------------------------------
+ *		ExecBrinSort(node)
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot *
+ExecBrinSort(PlanState *pstate)
+{
+	BrinSortState *node = castNode(BrinSortState, pstate);
+
+	/*
+	 * If we have runtime keys and they've not already been set up, do it now.
+	 */
+	if (node->iss_NumRuntimeKeys != 0 && !node->iss_RuntimeKeysReady)
+		ExecReScan((PlanState *) node);
+
+	return ExecScan(&node->ss,
+					(ExecScanAccessMtd) IndexNext,
+					(ExecScanRecheckMtd) IndexRecheck);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecReScanBrinSort(node)
+ *
+ *		Recalculates the values of any scan keys whose value depends on
+ *		information known at runtime, then rescans the indexed relation.
+ *
+ * ----------------------------------------------------------------
+ */
+void
+ExecReScanBrinSort(BrinSortState *node)
+{
+	/*
+	 * If we are doing runtime key calculations (ie, any of the index key
+	 * values weren't simple Consts), compute the new key values.  But first,
+	 * reset the context so we don't leak memory as each outer tuple is
+	 * scanned.  Note this assumes that we will recalculate *all* runtime keys
+	 * on each call.
+	 */
+	if (node->iss_NumRuntimeKeys != 0)
+	{
+		ExprContext *econtext = node->iss_RuntimeContext;
+
+		ResetExprContext(econtext);
+		ExecIndexEvalRuntimeKeys(econtext,
+								 node->iss_RuntimeKeys,
+								 node->iss_NumRuntimeKeys);
+	}
+	node->iss_RuntimeKeysReady = true;
+
+	/* reset index scan */
+	if (node->iss_ScanDesc)
+		index_rescan(node->iss_ScanDesc,
+					 node->iss_ScanKeys, node->iss_NumScanKeys,
+					 node->iss_OrderByKeys, node->iss_NumOrderByKeys);
+	node->iss_ReachedEnd = false;
+
+	ExecScanReScan(&node->ss);
+}
+
+
+/* ----------------------------------------------------------------
+ *		ExecEndBrinSort
+ * ----------------------------------------------------------------
+ */
+void
+ExecEndBrinSort(BrinSortState *node)
+{
+	Relation	indexRelationDesc;
+	IndexScanDesc indexScanDesc;
+
+	/*
+	 * extract information from the node
+	 */
+	indexRelationDesc = node->iss_RelationDesc;
+	indexScanDesc = node->iss_ScanDesc;
+
+	/*
+	 * clear out tuple table slots
+	 */
+	if (node->ss.ps.ps_ResultTupleSlot)
+		ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+	ExecClearTuple(node->ss.ss_ScanTupleSlot);
+
+	/*
+	 * close the index relation (no-op if we didn't open it)
+	 */
+	if (indexScanDesc)
+		index_endscan(indexScanDesc);
+	if (indexRelationDesc)
+		index_close(indexRelationDesc, NoLock);
+
+	if (node->ss.ss_currentScanDesc != NULL)
+		table_endscan(node->ss.ss_currentScanDesc);
+
+	if (node->bs_tuplestore != NULL)
+		tuplestore_end(node->bs_tuplestore);
+	node->bs_tuplestore = NULL;
+
+	if (node->bs_tuplesortstate != NULL)
+		tuplesort_end(node->bs_tuplesortstate);
+	node->bs_tuplesortstate = NULL;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecBrinSortMarkPos
+ *
+ * Note: we assume that no caller attempts to set a mark before having read
+ * at least one tuple.  Otherwise, iss_ScanDesc might still be NULL.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBrinSortMarkPos(BrinSortState *node)
+{
+	EState	   *estate = node->ss.ps.state;
+	EPQState   *epqstate = estate->es_epq_active;
+
+	if (epqstate != NULL)
+	{
+		/*
+		 * We are inside an EvalPlanQual recheck.  If a test tuple exists for
+		 * this relation, then we shouldn't access the index at all.  We would
+		 * instead need to save, and later restore, the state of the
+		 * relsubs_done flag, so that re-fetching the test tuple is possible.
+		 * However, given the assumption that no caller sets a mark at the
+		 * start of the scan, we can only get here with relsubs_done[i]
+		 * already set, and so no state need be saved.
+		 */
+		Index		scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
+
+		Assert(scanrelid > 0);
+		if (epqstate->relsubs_slot[scanrelid - 1] != NULL ||
+			epqstate->relsubs_rowmark[scanrelid - 1] != NULL)
+		{
+			/* Verify the claim above */
+			if (!epqstate->relsubs_done[scanrelid - 1])
+				elog(ERROR, "unexpected ExecBrinSortMarkPos call in EPQ recheck");
+			return;
+		}
+	}
+
+	index_markpos(node->iss_ScanDesc);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecBrinSortRestrPos
+ * ----------------------------------------------------------------
+ */
+void
+ExecBrinSortRestrPos(BrinSortState *node)
+{
+	EState	   *estate = node->ss.ps.state;
+	EPQState   *epqstate = estate->es_epq_active;
+
+	if (epqstate != NULL)
+	{
+		/* See comments in ExecBrinSortMarkPos */
+		Index		scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
+
+		Assert(scanrelid > 0);
+		if (epqstate->relsubs_slot[scanrelid - 1] != NULL ||
+			epqstate->relsubs_rowmark[scanrelid - 1] != NULL)
+		{
+			/* Verify the claim above */
+			if (!epqstate->relsubs_done[scanrelid - 1])
+				elog(ERROR, "unexpected ExecBrinSortRestrPos call in EPQ recheck");
+			return;
+		}
+	}
+
+	index_restrpos(node->iss_ScanDesc);
+}
+
+/*
+ * A somewhat crippled version of bringetbitmap.
+ *
+ * XXX We don't call consistent function (or any other function), so unlike
+ * bringetbitmap we don't set a separate memory context. If we end up filtering
+ * the ranges somehow (e.g. by WHERE conditions), this might be necessary.
+ *
+ * XXX Should be part of the opclass, i.e. moved somewhere into brin_minmax.c etc.
+ */
+static void
+ExecInitBrinSortRanges(BrinSort *node, BrinSortState *planstate)
+{
+	IndexScanDesc	scan = planstate->iss_ScanDesc;
+	Relation	indexRel = planstate->iss_RelationDesc;
+	int			attno;
+	FmgrInfo   *rangeproc;
+	BrinRangeScanDesc *brscan;
+	bool		asc;
+
+	/* BRIN Sort only allows ORDER BY using a single column */
+	Assert(node->numCols == 1);
+
+	/*
+	 * Determine the index attnum we're interested in. sortColIdx contains
+	 * attnums from the table, but we need the index attnum so that we can
+	 * fetch the right range summary.
+	 *
+	 * XXX Maybe we could/should arrange the tlists differently, so that this
+	 * is not necessary?
+	 *
+	 * FIXME This is broken, node->sortColIdx[0] is an index into the target
+	 * list, not table attnum.
+	 *
+	 * FIXME Also the projection is broken.
+	 */
+	attno = 0;
+	for (int i = 0; i < indexRel->rd_index->indnatts; i++)
+	{
+		if (indexRel->rd_index->indkey.values[i] == node->sortColIdx[0])
+		{
+			attno = (i + 1);
+			break;
+		}
+	}
+
+	/* make sure we matched the argument */
+	Assert(attno > 0);
+
+	/* get procedure to generate sort ranges */
+	rangeproc = index_getprocinfo(indexRel, attno, BRIN_PROCNUM_RANGES);
+
+	/*
+	 * Should not get here without a proc, thanks to the check before
+	 * building the BrinSort path.
+	 */
+	Assert(rangeproc != NULL);
+
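+	/*
+	 * Initialize sort support from the ORDER BY operator, so that values of
+	 * the sort column can be compared while producing the sorted output.
+	 */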
+	memset(&planstate->bs_sortsupport, 0, sizeof(SortSupportData));
+	PrepareSortSupportFromOrderingOp(node->sortOperators[0], &planstate->bs_sortsupport);
+
+	/*
+	 * Determine if this is an ASC or DESC sort, so that we can request the
+	 * ranges in the appropriate order (ordered either by minval for
+	 * ASC, or by maxval for DESC).
+	 */
+	asc = ScanDirectionIsForward(node->indexorderdir);
+
+	/*
+	 * Ask the opclass to produce ranges in appropriate ordering.
+	 *
+	 * XXX Pass info about ASC/DESC, NULLS FIRST/LAST.
+	 */
+	brscan = (BrinRangeScanDesc *) DatumGetPointer(FunctionCall3Coll(rangeproc,
+											InvalidOid,	/* FIXME use proper collation*/
+											PointerGetDatum(scan),
+											Int16GetDatum(attno),
+											BoolGetDatum(asc)));
+
+	/* remember the scan descriptor with ranges in the requested ordering */
+	planstate->bs_scan = brscan;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecInitBrinSort
+ *
+ *		Initializes the BrinSort node's state information, creates
+ *		scan keys, and opens the base and index relations.
+ *
+ *		Note: index scans have 2 sets of state information because
+ *			  we have to keep track of the base relation and the
+ *			  index relation.
+ * ----------------------------------------------------------------
+ */
+BrinSortState *
+ExecInitBrinSort(BrinSort *node, EState *estate, int eflags)
+{
+	BrinSortState *indexstate;
+	Relation	currentRelation;
+	LOCKMODE	lockmode;
+
+	/*
+	 * create state structure
+	 */
+	indexstate = makeNode(BrinSortState);
+	indexstate->ss.ps.plan = (Plan *) node;
+	indexstate->ss.ps.state = estate;
+	indexstate->ss.ps.ExecProcNode = ExecBrinSort;
+
+	/*
+	 * Miscellaneous initialization
+	 *
+	 * create expression context for node
+	 */
+	ExecAssignExprContext(estate, &indexstate->ss.ps);
+
+	/*
+	 * open the scan relation
+	 */
+	currentRelation = ExecOpenScanRelation(estate, node->scan.scanrelid, eflags);
+
+	indexstate->ss.ss_currentRelation = currentRelation;
+	indexstate->ss.ss_currentScanDesc = NULL;	/* no heap scan here */
+
+	/*
+	 * get the scan type from the relation descriptor.
+	 */
+	ExecInitScanTupleSlot(estate, &indexstate->ss,
+						  RelationGetDescr(currentRelation),
+						  table_slot_callbacks(currentRelation));
+
+	/*
+	 * Initialize result type and projection.
+	 */
+	ExecInitResultTypeTL(&indexstate->ss.ps);
+	ExecAssignScanProjectionInfo(&indexstate->ss);
+
+	/*
+	 * initialize child expressions
+	 *
+	 * Note: we don't initialize all of the indexqual expression, only the
+	 * sub-parts corresponding to runtime keys (see below).  Likewise for
+	 * indexorderby, if any.  But the indexqualorig expression is always
+	 * initialized even though it will only be used in some uncommon cases ---
+	 * would be nice to improve that.  (Problem is that any SubPlans present
+	 * in the expression must be found now...)
+	 */
+	indexstate->ss.ps.qual =
+		ExecInitQual(node->scan.plan.qual, (PlanState *) indexstate);
+	indexstate->indexqualorig =
+		ExecInitQual(node->indexqualorig, (PlanState *) indexstate);
+
+	/*
+	 * If we are just doing EXPLAIN (ie, aren't going to run the plan), stop
+	 * here.  This allows an index-advisor plugin to EXPLAIN a plan containing
+	 * references to nonexistent indexes.
+	 */
+	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
+		return indexstate;
+
+	/* Open the index relation. */
+	lockmode = exec_rt_fetch(node->scan.scanrelid, estate)->rellockmode;
+	indexstate->iss_RelationDesc = index_open(node->indexid, lockmode);
+
+	/*
+	 * Initialize index-specific scan state
+	 */
+	indexstate->iss_RuntimeKeysReady = false;
+	indexstate->iss_RuntimeKeys = NULL;
+	indexstate->iss_NumRuntimeKeys = 0;
+
+	/*
+	 * build the index scan keys from the index qualification
+	 */
+	ExecIndexBuildScanKeys((PlanState *) indexstate,
+						   indexstate->iss_RelationDesc,
+						   node->indexqual,
+						   false,
+						   &indexstate->iss_ScanKeys,
+						   &indexstate->iss_NumScanKeys,
+						   &indexstate->iss_RuntimeKeys,
+						   &indexstate->iss_NumRuntimeKeys,
+						   NULL,	/* no ArrayKeys */
+						   NULL);
+
+	/*
+	 * If we have runtime keys, we need an ExprContext to evaluate them. The
+	 * node's standard context won't do because we want to reset that context
+	 * for every tuple.  So, build another context just like the other one...
+	 * -tgl 7/11/00
+	 */
+	if (indexstate->iss_NumRuntimeKeys != 0)
+	{
+		ExprContext *stdecontext = indexstate->ss.ps.ps_ExprContext;
+
+		ExecAssignExprContext(estate, &indexstate->ss.ps);
+		indexstate->iss_RuntimeContext = indexstate->ss.ps.ps_ExprContext;
+		indexstate->ss.ps.ps_ExprContext = stdecontext;
+	}
+	else
+	{
+		indexstate->iss_RuntimeContext = NULL;
+	}
+
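+	/*
+	 * BrinSort-specific initialization. The tuplesort is created later,
+	 * while the ranges are being processed. The scan qual is moved aside to
+	 * bs_qual, so that it is applied by the BrinSort code rather than by the
+	 * generic ExecScan logic.
+	 */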
+	indexstate->bs_tuplesortstate = NULL;
+	indexstate->bs_qual = indexstate->ss.ps.qual;
+	indexstate->ss.ps.qual = NULL;
+	ExecInitResultTupleSlotTL(&indexstate->ss.ps, &TTSOpsMinimalTuple);
+
+	/*
+	 * all done.
+	 */
+	return indexstate;
+}
+
+/* ----------------------------------------------------------------
+ *						Parallel Scan Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *		ExecBrinSortEstimate
+ *
+ *		Compute the amount of space we'll need in the parallel
+ *		query DSM, and inform pcxt->estimator about our needs.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBrinSortEstimate(BrinSortState *node,
+					  ParallelContext *pcxt)
+{
+	EState	   *estate = node->ss.ps.state;
+
+	node->iss_PscanLen = index_parallelscan_estimate(node->iss_RelationDesc,
+													 estate->es_snapshot);
+	shm_toc_estimate_chunk(&pcxt->estimator, node->iss_PscanLen);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecBrinSortInitializeDSM
+ *
+ *		Set up a parallel index scan descriptor.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBrinSortInitializeDSM(BrinSortState *node,
+						   ParallelContext *pcxt)
+{
+	EState	   *estate = node->ss.ps.state;
+	ParallelIndexScanDesc piscan;
+
+	piscan = shm_toc_allocate(pcxt->toc, node->iss_PscanLen);
+	index_parallelscan_initialize(node->ss.ss_currentRelation,
+								  node->iss_RelationDesc,
+								  estate->es_snapshot,
+								  piscan);
+	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, piscan);
+	node->iss_ScanDesc =
+		index_beginscan_parallel(node->ss.ss_currentRelation,
+								 node->iss_RelationDesc,
+								 node->iss_NumScanKeys,
+								 node->iss_NumOrderByKeys,
+								 piscan);
+
+	/*
+	 * If no run-time keys to calculate or they are ready, go ahead and pass
+	 * the scankeys to the index AM.
+	 */
+	if (node->iss_NumRuntimeKeys == 0 || node->iss_RuntimeKeysReady)
+		index_rescan(node->iss_ScanDesc,
+					 node->iss_ScanKeys, node->iss_NumScanKeys,
+					 node->iss_OrderByKeys, node->iss_NumOrderByKeys);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecBrinSortReInitializeDSM
+ *
+ *		Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBrinSortReInitializeDSM(BrinSortState *node,
+							 ParallelContext *pcxt)
+{
+	index_parallelrescan(node->iss_ScanDesc);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecBrinSortInitializeWorker
+ *
+ *		Copy relevant information from TOC into planstate.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBrinSortInitializeWorker(BrinSortState *node,
+							  ParallelWorkerContext *pwcxt)
+{
+	ParallelIndexScanDesc piscan;
+
+	piscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
+	node->iss_ScanDesc =
+		index_beginscan_parallel(node->ss.ss_currentRelation,
+								 node->iss_RelationDesc,
+								 node->iss_NumScanKeys,
+								 node->iss_NumOrderByKeys,
+								 piscan);
+
+	/*
+	 * If no run-time keys to calculate or they are ready, go ahead and pass
+	 * the scankeys to the index AM.
+	 */
+	if (node->iss_NumRuntimeKeys == 0 || node->iss_RuntimeKeysReady)
+		index_rescan(node->iss_ScanDesc,
+					 node->iss_ScanKeys, node->iss_NumScanKeys,
+					 node->iss_OrderByKeys, node->iss_NumOrderByKeys);
+}
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 4c6b1d1f55b..64d103b19e9 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -790,6 +790,260 @@ cost_index(IndexPath *path, PlannerInfo *root, double loop_count,
 	path->path.total_cost = startup_cost + run_cost;
 }
 
+void
+cost_brinsort(BrinSortPath *path, PlannerInfo *root, double loop_count,
+		   bool partial_path)
+{
+	IndexOptInfo *index = path->ipath.indexinfo;
+	RelOptInfo *baserel = index->rel;
+	amcostestimate_function amcostestimate;
+	List	   *qpquals;
+	Cost		startup_cost = 0;
+	Cost		run_cost = 0;
+	Cost		cpu_run_cost = 0;
+	Cost		indexStartupCost;
+	Cost		indexTotalCost;
+	Selectivity indexSelectivity;
+	double		indexCorrelation,
+				csquared;
+	double		spc_seq_page_cost,
+				spc_random_page_cost;
+	Cost		min_IO_cost,
+				max_IO_cost;
+	QualCost	qpqual_cost;
+	Cost		cpu_per_tuple;
+	double		tuples_fetched;
+	double		pages_fetched;
+	double		rand_heap_pages;
+	double		index_pages;
+
+	/* Should only be applied to base relations */
+	Assert(IsA(baserel, RelOptInfo) &&
+		   IsA(index, IndexOptInfo));
+	Assert(baserel->relid > 0);
+	Assert(baserel->rtekind == RTE_RELATION);
+
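+	/*
+	 * XXX Most of this is a copy of cost_index(). Note that we do not charge
+	 * anything extra for the incremental sort performed by the BrinSort node
+	 * itself, so the estimate is likely somewhat optimistic.
+	 */
+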
+	/*
+	 * Mark the path with the correct row estimate, and identify which quals
+	 * will need to be enforced as qpquals.  We need not check any quals that
+	 * are implied by the index's predicate, so we can use indrestrictinfo not
+	 * baserestrictinfo as the list of relevant restriction clauses for the
+	 * rel.
+	 */
+	if (path->ipath.path.param_info)
+	{
+		path->ipath.path.rows = path->ipath.path.param_info->ppi_rows;
+		/* qpquals come from the rel's restriction clauses and ppi_clauses */
+		qpquals = list_concat(extract_nonindex_conditions(path->ipath.indexinfo->indrestrictinfo,
+														  path->ipath.indexclauses),
+							  extract_nonindex_conditions(path->ipath.path.param_info->ppi_clauses,
+														  path->ipath.indexclauses));
+	}
+	else
+	{
+		path->ipath.path.rows = baserel->rows;
+		/* qpquals come from just the rel's restriction clauses */
+		qpquals = extract_nonindex_conditions(path->ipath.indexinfo->indrestrictinfo,
+											  path->ipath.indexclauses);
+	}
+
+	if (!enable_indexscan)
+		startup_cost += disable_cost;
+	/* we don't need to check enable_indexonlyscan; indxpath.c does that */
+
+	/*
+	 * Call index-access-method-specific code to estimate the processing cost
+	 * for scanning the index, as well as the selectivity of the index (ie,
+	 * the fraction of main-table tuples we will have to retrieve) and its
+	 * correlation to the main-table tuple order.  We need a cast here because
+	 * pathnodes.h uses a weak function type to avoid including amapi.h.
+	 */
+	amcostestimate = (amcostestimate_function) index->amcostestimate;
+	amcostestimate(root, &path->ipath, loop_count,
+				   &indexStartupCost, &indexTotalCost,
+				   &indexSelectivity, &indexCorrelation,
+				   &index_pages);
+
+	/*
+	 * Save amcostestimate's results for possible use in bitmap scan planning.
+	 * We don't bother to save indexStartupCost or indexCorrelation, because a
+	 * bitmap scan doesn't care about either.
+	 */
+	path->ipath.indextotalcost = indexTotalCost;
+	path->ipath.indexselectivity = indexSelectivity;
+
+	/* all costs for touching index itself included here */
+	startup_cost += indexStartupCost;
+	run_cost += indexTotalCost - indexStartupCost;
+
+	/* estimate number of main-table tuples fetched */
+	tuples_fetched = clamp_row_est(indexSelectivity * baserel->tuples);
+
+	/* fetch estimated page costs for tablespace containing table */
+	get_tablespace_page_costs(baserel->reltablespace,
+							  &spc_random_page_cost,
+							  &spc_seq_page_cost);
+
+	/*----------
+	 * Estimate number of main-table pages fetched, and compute I/O cost.
+	 *
+	 * When the index ordering is uncorrelated with the table ordering,
+	 * we use an approximation proposed by Mackert and Lohman (see
+	 * index_pages_fetched() for details) to compute the number of pages
+	 * fetched, and then charge spc_random_page_cost per page fetched.
+	 *
+	 * When the index ordering is exactly correlated with the table ordering
+	 * (just after a CLUSTER, for example), the number of pages fetched should
+	 * be exactly selectivity * table_size.  What's more, all but the first
+	 * will be sequential fetches, not the random fetches that occur in the
+	 * uncorrelated case.  So if the number of pages is more than 1, we
+	 * ought to charge
+	 *		spc_random_page_cost + (pages_fetched - 1) * spc_seq_page_cost
+	 * For partially-correlated indexes, we ought to charge somewhere between
+	 * these two estimates.  We currently interpolate linearly between the
+	 * estimates based on the correlation squared (XXX is that appropriate?).
+	 *
+	 * If it's an index-only scan, then we will not need to fetch any heap
+	 * pages for which the visibility map shows all tuples are visible.
+	 * Hence, reduce the estimated number of heap fetches accordingly.
+	 * We use the measured fraction of the entire heap that is all-visible,
+	 * which might not be particularly relevant to the subset of the heap
+	 * that this query will fetch; but it's not clear how to do better.
+	 *----------
+	 */
+	if (loop_count > 1)
+	{
+		/*
+		 * For repeated indexscans, the appropriate estimate for the
+		 * uncorrelated case is to scale up the number of tuples fetched in
+		 * the Mackert and Lohman formula by the number of scans, so that we
+		 * estimate the number of pages fetched by all the scans; then
+		 * pro-rate the costs for one scan.  In this case we assume all the
+		 * fetches are random accesses.
+		 */
+		pages_fetched = index_pages_fetched(tuples_fetched * loop_count,
+											baserel->pages,
+											(double) index->pages,
+											root);
+
+		rand_heap_pages = pages_fetched;
+
+		max_IO_cost = (pages_fetched * spc_random_page_cost) / loop_count;
+
+		/*
+		 * In the perfectly correlated case, the number of pages touched by
+		 * each scan is selectivity * table_size, and we can use the Mackert
+		 * and Lohman formula at the page level to estimate how much work is
+		 * saved by caching across scans.  We still assume all the fetches are
+		 * random, though, which is an overestimate that's hard to correct for
+		 * without double-counting the cache effects.  (But in most cases
+		 * where such a plan is actually interesting, only one page would get
+		 * fetched per scan anyway, so it shouldn't matter much.)
+		 */
+		pages_fetched = ceil(indexSelectivity * (double) baserel->pages);
+
+		pages_fetched = index_pages_fetched(pages_fetched * loop_count,
+											baserel->pages,
+											(double) index->pages,
+											root);
+
+		min_IO_cost = (pages_fetched * spc_random_page_cost) / loop_count;
+	}
+	else
+	{
+		/*
+		 * Normal case: apply the Mackert and Lohman formula, and then
+		 * interpolate between that and the correlation-derived result.
+		 */
+		pages_fetched = index_pages_fetched(tuples_fetched,
+											baserel->pages,
+											(double) index->pages,
+											root);
+
+		rand_heap_pages = pages_fetched;
+
+		/* max_IO_cost is for the perfectly uncorrelated case (csquared=0) */
+		max_IO_cost = pages_fetched * spc_random_page_cost;
+
+		/* min_IO_cost is for the perfectly correlated case (csquared=1) */
+		pages_fetched = ceil(indexSelectivity * (double) baserel->pages);
+
+		if (pages_fetched > 0)
+		{
+			min_IO_cost = spc_random_page_cost;
+			if (pages_fetched > 1)
+				min_IO_cost += (pages_fetched - 1) * spc_seq_page_cost;
+		}
+		else
+			min_IO_cost = 0;
+	}
+
+	if (partial_path)
+	{
+		/*
+		 * Estimate the number of parallel workers required to scan index. Use
+		 * the number of heap pages computed considering heap fetches won't be
+		 * sequential as for parallel scans the pages are accessed in random
+		 * order.
+		 */
+		path->ipath.path.parallel_workers = compute_parallel_worker(baserel,
+															  rand_heap_pages,
+															  index_pages,
+															  max_parallel_workers_per_gather);
+
+		/*
+		 * Fall out if workers can't be assigned for parallel scan, because in
+		 * such a case this path will be rejected.  So there is no benefit in
+		 * doing extra computation.
+		 */
+		if (path->ipath.path.parallel_workers <= 0)
+			return;
+
+		path->ipath.path.parallel_aware = true;
+	}
+
+	/*
+	 * Now interpolate based on estimated index order correlation to get total
+	 * disk I/O cost for main table accesses.
+	 */
+	csquared = indexCorrelation * indexCorrelation;
+
+	run_cost += max_IO_cost + csquared * (min_IO_cost - max_IO_cost);
+
+	/*
+	 * Estimate CPU costs per tuple.
+	 *
+	 * What we want here is cpu_tuple_cost plus the evaluation costs of any
+	 * qual clauses that we have to evaluate as qpquals.
+	 */
+	cost_qual_eval(&qpqual_cost, qpquals, root);
+
+	startup_cost += qpqual_cost.startup;
+	cpu_per_tuple = cpu_tuple_cost + qpqual_cost.per_tuple;
+
+	cpu_run_cost += cpu_per_tuple * tuples_fetched;
+
+	/* tlist eval costs are paid per output row, not per tuple scanned */
+	startup_cost += path->ipath.path.pathtarget->cost.startup;
+	cpu_run_cost += path->ipath.path.pathtarget->cost.per_tuple * path->ipath.path.rows;
+
+	/* Adjust costing for parallelism, if used. */
+	if (path->ipath.path.parallel_workers > 0)
+	{
+		double		parallel_divisor = get_parallel_divisor(&path->ipath.path);
+
+		path->ipath.path.rows = clamp_row_est(path->ipath.path.rows / parallel_divisor);
+
+		/* The CPU cost is divided among all the workers. */
+		cpu_run_cost /= parallel_divisor;
+	}
+
+	run_cost += cpu_run_cost;
+
+	path->ipath.path.startup_cost = startup_cost;
+	path->ipath.path.total_cost = startup_cost + run_cost;
+}
+
 /*
  * extract_nonindex_conditions
  *
diff --git a/src/backend/optimizer/path/indxpath.c b/src/backend/optimizer/path/indxpath.c
index c31fcc917df..18b625460eb 100644
--- a/src/backend/optimizer/path/indxpath.c
+++ b/src/backend/optimizer/path/indxpath.c
@@ -17,12 +17,16 @@
 
 #include <math.h>
 
+#include "access/brin_internal.h"
+#include "access/relation.h"
 #include "access/stratnum.h"
 #include "access/sysattr.h"
 #include "catalog/pg_am.h"
 #include "catalog/pg_operator.h"
+#include "catalog/pg_opclass.h"
 #include "catalog/pg_opfamily.h"
 #include "catalog/pg_type.h"
+#include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
 #include "nodes/supportnodes.h"
@@ -32,10 +36,13 @@
 #include "optimizer/paths.h"
 #include "optimizer/prep.h"
 #include "optimizer/restrictinfo.h"
+#include "utils/rel.h"
 #include "utils/lsyscache.h"
 #include "utils/selfuncs.h"
 
 
+bool		enable_brinsort = false;
+
 /* XXX see PartCollMatchesExprColl */
 #define IndexCollMatchesExprColl(idxcollation, exprcollation) \
 	((idxcollation) == InvalidOid || (idxcollation) == (exprcollation))
@@ -1127,6 +1134,185 @@ build_index_paths(PlannerInfo *root, RelOptInfo *rel,
 		}
 	}
 
+	/*
+	 * If this is a BRIN index with a suitable opclass (minmax or such), we
+	 * may try doing a BRIN sort. BRIN indexes are not ordered and have
+	 * amcanorderbyop set to false, so we will probably need some new opclass
+	 * flag to mark indexes that support this.
+	 */
+	if (enable_brinsort && pathkeys_possibly_useful)
+	{
+		ListCell *lc;
+		Relation rel2 = relation_open(index->indexoid, NoLock);
+		int		 idx;
+
+		/*
+		 * Try generating sorted paths for each key with the right opclass.
+		 */
+		idx = -1;
+		foreach(lc, index->indextlist)
+		{
+			TargetEntry	   *indextle = (TargetEntry *) lfirst(lc);
+			BrinSortPath   *bpath;
+			Oid				rangeproc;
+			AttrNumber		attnum;
+
+			idx++;
+			attnum = (idx + 1);
+
+			/* skip expressions for now */
+			if (!AttributeNumberIsValid(index->indexkeys[idx]))
+				continue;
+
+			/* XXX ignore non-BRIN indexes */
+			if (rel2->rd_rel->relam != BRIN_AM_OID)
+				continue;
+
+			/*
+			 * XXX Ignore keys not using an opclass with the "ranges" proc.
+			 * For now we only do this for some minmax opclasses, but adding
+			 * it to all minmax is simple, and adding it to minmax-multi
+			 * should not be very hard.
+			 */
+			rangeproc = index_getprocid(rel2, attnum, BRIN_PROCNUM_RANGES);
+			if (!OidIsValid(rangeproc))
+				continue;
+
+			/*
+			 * XXX stuff extracted from build_index_pathkeys, except that we
+			 * only deal with a single index key (producing a single pathkey),
+			 * so we only sort on a single column. I guess we could use more
+			 * index keys and sort on more expressions? Would that mean these
+			 * keys need to be rather well correlated? In any case, it seems
+			 * rather complex to implement, so I leave it as a possible
+			 * future improvement.
+			 *
+			 * XXX This could also use the other BRIN keys (even from other
+			 * indexes) in a different way - we might use the other ranges
+			 * to quickly eliminate some of the chunks, essentially like a
+			 * bitmap, but maybe without using the bitmap. Or we might use
+			 * other indexes through bitmaps.
+			 *
+			 * XXX This fakes a number of parameters, because we don't store
+			 * the btree opclass in the index; instead we use the default one
+			 * for the key data type. And BRIN does not allow specifying
+			 * ASC/DESC or NULLS FIRST/LAST options either.
+			 *
+			 * XXX We don't add the path to result, because this function is
+			 * supposed to generate IndexPaths. Instead, we just add the path
+			 * using add_path(). We should be building this in a different
+			 * place, perhaps in create_index_paths() or so.
+			 *
+			 * XXX By building it elsewhere, we could also leverage the index
+			 * paths we've built here, particularly the bitmap index paths,
+			 * which we could use to eliminate many of the ranges.
+			 *
+			 * XXX We don't have any explicit ordering associated with the
+			 * BRIN index, e.g. we don't have ASC/DESC and NULLS FIRST/LAST.
+			 * So this is not encoded in the index, and we can satisfy all
+			 * these cases - but we need to add paths for each combination.
+			 * I wonder if there's a better way to do this.
+			 */
+
+			/* ASC NULLS LAST */
+			index_pathkeys = build_index_pathkeys_brin(root, index, indextle,
+													   idx,
+													   false,	/* reverse_sort */
+													   false);	/* nulls_first */
+
+			useful_pathkeys = truncate_useless_pathkeys(root, rel,
+														index_pathkeys);
+
+			if (useful_pathkeys != NIL)
+			{
+				bpath = create_brinsort_path(root, index,
+											 index_clauses,
+											 useful_pathkeys,
+											 ForwardScanDirection,
+											 index_only_scan,
+											 outer_relids,
+											 loop_count,
+											 false);
+
+				/* cheat and add it anyway */
+				add_path(rel, (Path *) bpath);
+			}
+
+			/* DESC NULLS LAST */
+			index_pathkeys = build_index_pathkeys_brin(root, index, indextle,
+													   idx,
+													   true,	/* reverse_sort */
+													   false);	/* nulls_first */
+
+			useful_pathkeys = truncate_useless_pathkeys(root, rel,
+														index_pathkeys);
+
+			if (useful_pathkeys != NIL)
+			{
+				bpath = create_brinsort_path(root, index,
+											 index_clauses,
+											 useful_pathkeys,
+											 BackwardScanDirection,
+											 index_only_scan,
+											 outer_relids,
+											 loop_count,
+											 false);
+
+				/* cheat and add it anyway */
+				add_path(rel, (Path *) bpath);
+			}
+
+			/* ASC NULLS FIRST */
+			index_pathkeys = build_index_pathkeys_brin(root, index, indextle,
+													   idx,
+													   false,	/* reverse_sort */
+													   true);	/* nulls_first */
+
+			useful_pathkeys = truncate_useless_pathkeys(root, rel,
+														index_pathkeys);
+
+			if (useful_pathkeys != NIL)
+			{
+				bpath = create_brinsort_path(root, index,
+											 index_clauses,
+											 useful_pathkeys,
+											 ForwardScanDirection,
+											 index_only_scan,
+											 outer_relids,
+											 loop_count,
+											 false);
+
+				/* cheat and add it anyway */
+				add_path(rel, (Path *) bpath);
+			}
+
+			/* DESC NULLS FIRST */
+			index_pathkeys = build_index_pathkeys_brin(root, index, indextle,
+													   idx,
+													   true,	/* reverse_sort */
+													   true);	/* nulls_first */
+
+			useful_pathkeys = truncate_useless_pathkeys(root, rel,
+														index_pathkeys);
+
+			if (useful_pathkeys != NIL)
+			{
+				bpath = create_brinsort_path(root, index,
+											 index_clauses,
+											 useful_pathkeys,
+											 BackwardScanDirection,
+											 index_only_scan,
+											 outer_relids,
+											 loop_count,
+											 false);
+
+				/* cheat and add it anyway */
+				add_path(rel, (Path *) bpath);
+			}
+		}
+
+		relation_close(rel2, NoLock);
+	}
+
 	return result;
 }
 
diff --git a/src/backend/optimizer/path/pathkeys.c b/src/backend/optimizer/path/pathkeys.c
index a9943cd6e01..83dde6f22eb 100644
--- a/src/backend/optimizer/path/pathkeys.c
+++ b/src/backend/optimizer/path/pathkeys.c
@@ -27,6 +27,7 @@
 #include "optimizer/paths.h"
 #include "partitioning/partbounds.h"
 #include "utils/lsyscache.h"
+#include "utils/typcache.h"
 
 
 static bool pathkey_is_redundant(PathKey *new_pathkey, List *pathkeys);
@@ -630,6 +631,55 @@ build_index_pathkeys(PlannerInfo *root,
 	return retval;
 }
 
+
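+/*
+ * build_index_pathkeys_brin
+ *	  Build a one-element pathkey list describing the ordering produced by
+ *	  sorting on a single BRIN index column, using the default btree opfamily
+ *	  for the column's data type.
+ */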
+List *
+build_index_pathkeys_brin(PlannerInfo *root,
+						  IndexOptInfo *index,
+						  TargetEntry  *tle,
+						  int idx,
+						  bool reverse_sort,
+						  bool nulls_first)
+{
+	TypeCacheEntry *typcache;
+	PathKey		   *cpathkey;
+	Oid				sortopfamily;
+
+	/*
+	 * Get default btree opfamily for the type, extracted from the
+	 * entry in index targetlist.
+	 *
+	 * XXX Is there a better / more correct way to do this?
+	 */
+	typcache = lookup_type_cache(exprType((Node *) tle->expr),
+								 TYPECACHE_BTREE_OPFAMILY);
+	sortopfamily = typcache->btree_opf;
+
+	/*
+	 * OK, try to make a canonical pathkey for this sort key.  Note we're
+	 * underneath any outer joins, so nullable_relids should be NULL.
+	 */
+	cpathkey = make_pathkey_from_sortinfo(root,
+										  tle->expr,
+										  NULL,
+										  sortopfamily,
+										  index->opcintype[idx],
+										  index->indexcollations[idx],
+										  reverse_sort,
+										  nulls_first,
+										  0,
+										  index->rel->relids,
+										  false);
+
+	/*
+	 * There may be no pathkey if we haven't matched any sortkey, in which
+	 * case ignore it.
+	 */
+	if (!cpathkey)
+		return NIL;
+
+	return list_make1(cpathkey);
+}
+
 /*
  * partkey_is_bool_constant_for_query
  *
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index ac86ce90033..395c632f430 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -124,6 +124,8 @@ static SampleScan *create_samplescan_plan(PlannerInfo *root, Path *best_path,
 										  List *tlist, List *scan_clauses);
 static Scan *create_indexscan_plan(PlannerInfo *root, IndexPath *best_path,
 								   List *tlist, List *scan_clauses, bool indexonly);
+static BrinSort *create_brinsort_plan(PlannerInfo *root, BrinSortPath *best_path,
+									  List *tlist, List *scan_clauses);
 static BitmapHeapScan *create_bitmap_scan_plan(PlannerInfo *root,
 											   BitmapHeapPath *best_path,
 											   List *tlist, List *scan_clauses);
@@ -191,6 +193,9 @@ static IndexOnlyScan *make_indexonlyscan(List *qptlist, List *qpqual,
 										 List *indexorderby,
 										 List *indextlist,
 										 ScanDirection indexscandir);
+static BrinSort *make_brinsort(List *qptlist, List *qpqual, Index scanrelid,
+							   Oid indexid, List *indexqual, List *indexqualorig,
+							   ScanDirection indexscandir);
 static BitmapIndexScan *make_bitmap_indexscan(Index scanrelid, Oid indexid,
 											  List *indexqual,
 											  List *indexqualorig);
@@ -410,6 +415,9 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags)
 		case T_CustomScan:
 			plan = create_scan_plan(root, best_path, flags);
 			break;
+		case T_BrinSort:
+			plan = create_scan_plan(root, best_path, flags);
+			break;
 		case T_HashJoin:
 		case T_MergeJoin:
 		case T_NestLoop:
@@ -776,6 +784,13 @@ create_scan_plan(PlannerInfo *root, Path *best_path, int flags)
 												   scan_clauses);
 			break;
 
+		case T_BrinSort:
+			plan = (Plan *) create_brinsort_plan(root,
+												 (BrinSortPath *) best_path,
+												 tlist,
+												 scan_clauses);
+			break;
+
 		default:
 			elog(ERROR, "unrecognized node type: %d",
 				 (int) best_path->pathtype);
@@ -3180,6 +3195,154 @@ create_indexscan_plan(PlannerInfo *root,
 	return scan_plan;
 }
 
+/*
+ * create_brinsort_plan
+ *	  Returns a brinsort plan for the base relation scanned by 'best_path'
+ *	  with restriction clauses 'scan_clauses' and targetlist 'tlist'.
+ *
+ * This is mostly a slightly simplified version of create_indexscan_plan, with
+ * the unnecessary parts removed (we don't support index-only scans, reordering,
+ * and similar stuff).
+ */
+static BrinSort *
+create_brinsort_plan(PlannerInfo *root,
+					 BrinSortPath *best_path,
+					 List *tlist,
+					 List *scan_clauses)
+{
+	BrinSort   *brinsort_plan;
+	List	   *indexclauses = best_path->ipath.indexclauses;
+	Index		baserelid = best_path->ipath.path.parent->relid;
+	IndexOptInfo *indexinfo = best_path->ipath.indexinfo;
+	Oid			indexoid = indexinfo->indexoid;
+	List	   *qpqual;
+	List	   *stripped_indexquals;
+	List	   *fixed_indexquals;
+	ListCell   *l;
+
+	List	   *pathkeys = best_path->ipath.path.pathkeys;
+
+	/* it should be a base rel... */
+	Assert(baserelid > 0);
+	Assert(best_path->ipath.path.parent->rtekind == RTE_RELATION);
+
+	/*
+	 * Extract the index qual expressions (stripped of RestrictInfos) from the
+	 * IndexClauses list, and prepare a copy with index Vars substituted for
+	 * table Vars.  (This step also does replace_nestloop_params on the
+	 * fixed_indexquals.)
+	 */
+	fix_indexqual_references(root, &best_path->ipath,
+							 &stripped_indexquals,
+							 &fixed_indexquals);
+
+	/*
+	 * The qpqual list must contain all restrictions not automatically handled
+	 * by the index, other than pseudoconstant clauses which will be handled
+	 * by a separate gating plan node.  All the predicates in the indexquals
+	 * will be checked (either by the index itself, or by nodeIndexscan.c),
+	 * but if there are any "special" operators involved then they must be
+	 * included in qpqual.  The upshot is that qpqual must contain
+	 * scan_clauses minus whatever appears in indexquals.
+	 *
+	 * is_redundant_with_indexclauses() detects cases where a scan clause is
+	 * present in the indexclauses list or is generated from the same
+	 * EquivalenceClass as some indexclause, and is therefore redundant with
+	 * it, though not equal.  (The latter happens when indxpath.c prefers a
+	 * different derived equality than what generate_join_implied_equalities
+	 * picked for a parameterized scan's ppi_clauses.)  Note that it will not
+	 * match to lossy index clauses, which is critical because we have to
+	 * include the original clause in qpqual in that case.
+	 *
+	 * In some situations (particularly with OR'd index conditions) we may
+	 * have scan_clauses that are not equal to, but are logically implied by,
+	 * the index quals; so we also try a predicate_implied_by() check to see
+	 * if we can discard quals that way.  (predicate_implied_by assumes its
+	 * first input contains only immutable functions, so we have to check
+	 * that.)
+	 *
+	 * Note: if you change this bit of code you should also look at
+	 * extract_nonindex_conditions() in costsize.c.
+	 */
+	qpqual = NIL;
+	foreach(l, scan_clauses)
+	{
+		RestrictInfo *rinfo = lfirst_node(RestrictInfo, l);
+
+		if (rinfo->pseudoconstant)
+			continue;			/* we may drop pseudoconstants here */
+		if (is_redundant_with_indexclauses(rinfo, indexclauses))
+			continue;			/* dup or derived from same EquivalenceClass */
+		if (!contain_mutable_functions((Node *) rinfo->clause) &&
+			predicate_implied_by(list_make1(rinfo->clause), stripped_indexquals,
+								 false))
+			continue;			/* provably implied by indexquals */
+		qpqual = lappend(qpqual, rinfo);
+	}
+
+	/* Sort clauses into best execution order */
+	qpqual = order_qual_clauses(root, qpqual);
+
+	/* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */
+	qpqual = extract_actual_clauses(qpqual, false);
+
+	/*
+	 * We have to replace any outer-relation variables with nestloop params in
+	 * the indexqualorig, qpqual, and indexorderbyorig expressions.  A bit
+	 * annoying to have to do this separately from the processing in
+	 * fix_indexqual_references --- rethink this when generalizing the inner
+	 * indexscan support.  But note we can't really do this earlier because
+	 * it'd break the comparisons to predicates above ... (or would it?  Those
+	 * wouldn't have outer refs)
+	 */
+	if (best_path->ipath.path.param_info)
+	{
+		stripped_indexquals = (List *)
+			replace_nestloop_params(root, (Node *) stripped_indexquals);
+		qpqual = (List *)
+			replace_nestloop_params(root, (Node *) qpqual);
+	}
+
+	/* Finally ready to build the plan node */
+	brinsort_plan = make_brinsort(tlist,
+								  qpqual,
+								  baserelid,
+								  indexoid,
+								  fixed_indexquals,
+								  stripped_indexquals,
+								  best_path->ipath.indexscandir);
+
+	if (pathkeys != NIL)
+	{
+		/*
+		 * Compute sort column info, and adjust the BrinSort node's tlist as
+		 * needed.  Because we pass adjust_tlist_in_place = true, we may
+		 * ignore the function result; it must be the same plan node.
+		 * XXX We should perhaps detect whether any tlist entries were added,
+		 * but we don't do that yet.
+		 */
+		(void) prepare_sort_from_pathkeys((Plan *) brinsort_plan, pathkeys,
+										  best_path->ipath.path.parent->relids,
+										  NULL,
+										  true,
+										  &brinsort_plan->numCols,
+										  &brinsort_plan->sortColIdx,
+										  &brinsort_plan->sortOperators,
+										  &brinsort_plan->collations,
+										  &brinsort_plan->nullsFirst);
+		/* XXX Debugging output: print the computed sort column info. */
+		for (int i = 0; i < brinsort_plan->numCols; i++)
+			elog(DEBUG1, "%d => %d %d %d %d", i,
+				 brinsort_plan->sortColIdx[i],
+				 brinsort_plan->sortOperators[i],
+				 brinsort_plan->collations[i],
+				 brinsort_plan->nullsFirst[i]);
+	}
+
+	copy_generic_path_info(&brinsort_plan->scan.plan, &best_path->ipath.path);
+
+	return brinsort_plan;
+}
+
 /*
  * create_bitmap_scan_plan
  *	  Returns a bitmap scan plan for the base relation scanned by 'best_path'
@@ -5523,6 +5686,31 @@ make_indexscan(List *qptlist,
 	return node;
 }
 
+static BrinSort *
+make_brinsort(List *qptlist,
+			   List *qpqual,
+			   Index scanrelid,
+			   Oid indexid,
+			   List *indexqual,
+			   List *indexqualorig,
+			   ScanDirection indexscandir)
+{
+	BrinSort  *node = makeNode(BrinSort);
+	Plan	   *plan = &node->scan.plan;
+
+	plan->targetlist = qptlist;
+	plan->qual = qpqual;
+	plan->lefttree = NULL;
+	plan->righttree = NULL;
+	node->scan.scanrelid = scanrelid;
+	node->indexid = indexid;
+	node->indexqual = indexqual;
+	node->indexqualorig = indexqualorig;
+	node->indexorderdir = indexscandir;
+
+	return node;
+}
+
 static IndexOnlyScan *
 make_indexonlyscan(List *qptlist,
 				   List *qpqual,
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index 1cb0abdbc1f..2584a1f032d 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -609,6 +609,25 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
 				return set_indexonlyscan_references(root, splan, rtoffset);
 			}
 			break;
+		case T_BrinSort:
+			{
+				BrinSort  *splan = (BrinSort *) plan;
+
+				splan->scan.scanrelid += rtoffset;
+				splan->scan.plan.targetlist =
+					fix_scan_list(root, splan->scan.plan.targetlist,
+								  rtoffset, NUM_EXEC_TLIST(plan));
+				splan->scan.plan.qual =
+					fix_scan_list(root, splan->scan.plan.qual,
+								  rtoffset, NUM_EXEC_QUAL(plan));
+				splan->indexqual =
+					fix_scan_list(root, splan->indexqual,
+								  rtoffset, 1);
+				splan->indexqualorig =
+					fix_scan_list(root, splan->indexqualorig,
+								  rtoffset, NUM_EXEC_QUAL(plan));
+			}
+			break;
 		case T_BitmapIndexScan:
 			{
 				BitmapIndexScan *splan = (BitmapIndexScan *) plan;
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 70f61ae7b1c..6471bbb5de8 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -1030,6 +1030,63 @@ create_index_path(PlannerInfo *root,
 	return pathnode;
 }
 
+
+/*
+ * create_brinsort_path
+ *	  Creates a path node for a BRIN Sort scan, i.e. a scan of a BRIN index
+ *	  that produces sorted output.
+ *
+ * 'index' is a usable index.
+ * 'indexclauses' is a list of IndexClause nodes representing clauses
+ *			to be enforced as qual conditions in the scan.
+ * 'pathkeys' describes the ordering of the path.
+ * 'indexscandir' is ForwardScanDirection for ASC output, or
+ *			BackwardScanDirection for DESC output.
+ * 'indexonly' is true if an index-only scan is wanted.
+ * 'required_outer' is the set of outer relids for a parameterized path.
+ * 'loop_count' is the number of repetitions of the indexscan to factor into
+ *		estimates of caching behavior.
+ * 'partial_path' is true if constructing a parallel index scan path.
+ *
+ * Returns the new path node.
+ */
+BrinSortPath *
+create_brinsort_path(PlannerInfo *root,
+					 IndexOptInfo *index,
+					 List *indexclauses,
+					 List *pathkeys,
+					 ScanDirection indexscandir,
+					 bool indexonly,
+					 Relids required_outer,
+					 double loop_count,
+					 bool partial_path)
+{
+	BrinSortPath  *pathnode = makeNode(BrinSortPath);
+	RelOptInfo *rel = index->rel;
+
+	pathnode->ipath.path.pathtype = T_BrinSort;
+	pathnode->ipath.path.parent = rel;
+	pathnode->ipath.path.pathtarget = rel->reltarget;
+	pathnode->ipath.path.param_info = get_baserel_parampathinfo(root, rel,
+														  required_outer);
+	pathnode->ipath.path.parallel_aware = false;
+	pathnode->ipath.path.parallel_safe = rel->consider_parallel;
+	pathnode->ipath.path.parallel_workers = 0;
+	pathnode->ipath.path.pathkeys = pathkeys;
+
+	pathnode->ipath.indexinfo = index;
+	pathnode->ipath.indexclauses = indexclauses;
+	pathnode->ipath.indexscandir = indexscandir;
+
+	cost_brinsort(pathnode, root, loop_count, partial_path);
+
+	return pathnode;
+}
+
 /*
  * create_bitmap_heap_path
  *	  Creates a path node for a bitmap scan.
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 06dfeb6cd8b..a5ca3bd0cc4 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -977,6 +977,16 @@ struct config_bool ConfigureNamesBool[] =
 		false,
 		NULL, NULL, NULL
 	},
+	{
+		{"enable_brinsort", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables the planner's use of BRIN sort plans."),
+			NULL,
+			GUC_EXPLAIN
+		},
+		&enable_brinsort,
+		false,
+		NULL, NULL, NULL
+	},
 	{
 		{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
 			gettext_noop("Enables genetic query optimization."),
diff --git a/src/include/access/brin.h b/src/include/access/brin.h
index a7cccac9c90..be05586ec57 100644
--- a/src/include/access/brin.h
+++ b/src/include/access/brin.h
@@ -34,41 +34,6 @@ typedef struct BrinStatsData
 	BlockNumber revmapNumPages;
 } BrinStatsData;
 
-/*
- * Info about ranges for BRIN Sort.
- */
-typedef struct BrinRange
-{
-	BlockNumber blkno_start;
-	BlockNumber blkno_end;
-
-	Datum	min_value;
-	Datum	max_value;
-	bool	has_nulls;
-	bool	all_nulls;
-	bool	not_summarized;
-
-	/*
-	 * Index of the range when ordered by min_value (if there are multiple
-	 * ranges with the same min_value, it's the lowest one).
-	 */
-	uint32	min_index;
-
-	/*
-	 * Minimum min_index from all ranges with higher max_value (i.e. when
-	 * sorted by max_value). If there are multiple ranges with the same
-	 * max_value, it depends on the ordering (i.e. the ranges may get
-	 * different min_index_lowest, depending on the exact ordering).
-	 */
-	uint32	min_index_lowest;
-} BrinRange;
-
-typedef struct BrinRanges
-{
-	int			nranges;
-	BrinRange	ranges[FLEXIBLE_ARRAY_MEMBER];
-} BrinRanges;
-
 typedef struct BrinMinmaxStats
 {
 	int32		vl_len_;		/* varlena header (do not touch directly!) */
diff --git a/src/include/access/brin_internal.h b/src/include/access/brin_internal.h
index f4be357c176..06a36f769c5 100644
--- a/src/include/access/brin_internal.h
+++ b/src/include/access/brin_internal.h
@@ -73,6 +73,7 @@ typedef struct BrinDesc
 #define BRIN_PROCNUM_UNION			4
 #define BRIN_MANDATORY_NPROCS		4
 #define BRIN_PROCNUM_OPTIONS 		5	/* optional */
+#define BRIN_PROCNUM_RANGES 		7	/* optional */
 /* procedure numbers up to 10 are reserved for BRIN future expansion */
 #define BRIN_FIRST_OPTIONAL_PROCNUM 11
 #define BRIN_PROCNUM_STATISTICS		11	/* optional */
diff --git a/src/include/catalog/pg_amproc.dat b/src/include/catalog/pg_amproc.dat
index 558df53206d..7a22eaef33c 100644
--- a/src/include/catalog/pg_amproc.dat
+++ b/src/include/catalog/pg_amproc.dat
@@ -806,6 +806,8 @@
   amprocrighttype => 'bytea', amprocnum => '4', amproc => 'brin_minmax_union' },
 { amprocfamily => 'brin/bytea_minmax_ops', amproclefttype => 'bytea',
   amprocrighttype => 'bytea', amprocnum => '11', amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/bytea_minmax_ops', amproclefttype => 'bytea',
+  amprocrighttype => 'bytea', amprocnum => '7', amproc => 'brin_minmax_ranges' },
 
 # bloom bytea
 { amprocfamily => 'brin/bytea_bloom_ops', amproclefttype => 'bytea',
@@ -839,6 +841,8 @@
   amprocrighttype => 'char', amprocnum => '4', amproc => 'brin_minmax_union' },
 { amprocfamily => 'brin/char_minmax_ops', amproclefttype => 'char',
   amprocrighttype => 'char', amprocnum => '11', amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/char_minmax_ops', amproclefttype => 'char',
+  amprocrighttype => 'char', amprocnum => '7', amproc => 'brin_minmax_ranges' },
 
 # bloom "char"
 { amprocfamily => 'brin/char_bloom_ops', amproclefttype => 'char',
@@ -870,6 +874,8 @@
   amprocrighttype => 'name', amprocnum => '4', amproc => 'brin_minmax_union' },
 { amprocfamily => 'brin/name_minmax_ops', amproclefttype => 'name',
   amprocrighttype => 'name', amprocnum => '11', amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/name_minmax_ops', amproclefttype => 'name',
+  amprocrighttype => 'name', amprocnum => '7', amproc => 'brin_minmax_ranges' },
 
 # bloom name
 { amprocfamily => 'brin/name_bloom_ops', amproclefttype => 'name',
@@ -901,6 +907,8 @@
   amprocrighttype => 'int8', amprocnum => '4', amproc => 'brin_minmax_union' },
 { amprocfamily => 'brin/integer_minmax_ops', amproclefttype => 'int8',
   amprocrighttype => 'int8', amprocnum => '11', amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/integer_minmax_ops', amproclefttype => 'int8',
+  amprocrighttype => 'int8', amprocnum => '7', amproc => 'brin_minmax_ranges' },
 
 { amprocfamily => 'brin/integer_minmax_ops', amproclefttype => 'int2',
   amprocrighttype => 'int2', amprocnum => '1',
@@ -915,6 +923,8 @@
   amprocrighttype => 'int2', amprocnum => '4', amproc => 'brin_minmax_union' },
 { amprocfamily => 'brin/integer_minmax_ops', amproclefttype => 'int2',
   amprocrighttype => 'int2', amprocnum => '11', amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/integer_minmax_ops', amproclefttype => 'int2',
+  amprocrighttype => 'int2', amprocnum => '7', amproc => 'brin_minmax_ranges' },
 
 { amprocfamily => 'brin/integer_minmax_ops', amproclefttype => 'int4',
   amprocrighttype => 'int4', amprocnum => '1',
@@ -929,6 +939,8 @@
   amprocrighttype => 'int4', amprocnum => '4', amproc => 'brin_minmax_union' },
 { amprocfamily => 'brin/integer_minmax_ops', amproclefttype => 'int4',
   amprocrighttype => 'int4', amprocnum => '11', amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/integer_minmax_ops', amproclefttype => 'int4',
+  amprocrighttype => 'int4', amprocnum => '7', amproc => 'brin_minmax_ranges' },
 
 # minmax multi integer: int2, int4, int8
 { amprocfamily => 'brin/integer_minmax_multi_ops', amproclefttype => 'int2',
@@ -1048,6 +1060,8 @@
   amprocrighttype => 'text', amprocnum => '4', amproc => 'brin_minmax_union' },
 { amprocfamily => 'brin/text_minmax_ops', amproclefttype => 'text',
   amprocrighttype => 'text', amprocnum => '11', amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/text_minmax_ops', amproclefttype => 'text',
+  amprocrighttype => 'text', amprocnum => '7', amproc => 'brin_minmax_ranges' },
 
 # bloom text
 { amprocfamily => 'brin/text_bloom_ops', amproclefttype => 'text',
@@ -1078,6 +1092,8 @@
   amprocrighttype => 'oid', amprocnum => '4', amproc => 'brin_minmax_union' },
 { amprocfamily => 'brin/oid_minmax_ops', amproclefttype => 'oid',
   amprocrighttype => 'oid', amprocnum => '11', amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/oid_minmax_ops', amproclefttype => 'oid',
+  amprocrighttype => 'oid', amprocnum => '7', amproc => 'brin_minmax_ranges' },
 
 # minmax multi oid
 { amprocfamily => 'brin/oid_minmax_multi_ops', amproclefttype => 'oid',
@@ -1128,6 +1144,8 @@
   amprocrighttype => 'tid', amprocnum => '4', amproc => 'brin_minmax_union' },
 { amprocfamily => 'brin/tid_minmax_ops', amproclefttype => 'tid',
   amprocrighttype => 'tid', amprocnum => '11', amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/tid_minmax_ops', amproclefttype => 'tid',
+  amprocrighttype => 'tid', amprocnum => '7', amproc => 'brin_minmax_ranges' },
 
 # bloom tid
 { amprocfamily => 'brin/tid_bloom_ops', amproclefttype => 'tid',
@@ -1181,6 +1199,9 @@
 { amprocfamily => 'brin/float_minmax_ops', amproclefttype => 'float4',
   amprocrighttype => 'float4', amprocnum => '11',
   amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/float_minmax_ops', amproclefttype => 'float4',
+  amprocrighttype => 'float4', amprocnum => '7',
+  amproc => 'brin_minmax_ranges' },
 
 { amprocfamily => 'brin/float_minmax_ops', amproclefttype => 'float8',
   amprocrighttype => 'float8', amprocnum => '1',
@@ -1197,6 +1218,9 @@
 { amprocfamily => 'brin/float_minmax_ops', amproclefttype => 'float8',
   amprocrighttype => 'float8', amprocnum => '11',
   amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/float_minmax_ops', amproclefttype => 'float8',
+  amprocrighttype => 'float8', amprocnum => '7',
+  amproc => 'brin_minmax_ranges' },
 
 # minmax multi float
 { amprocfamily => 'brin/float_minmax_multi_ops', amproclefttype => 'float4',
@@ -1288,6 +1312,9 @@
 { amprocfamily => 'brin/macaddr_minmax_ops', amproclefttype => 'macaddr',
   amprocrighttype => 'macaddr', amprocnum => '11',
   amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/macaddr_minmax_ops', amproclefttype => 'macaddr',
+  amprocrighttype => 'macaddr', amprocnum => '7',
+  amproc => 'brin_minmax_ranges' },
 
 # minmax multi macaddr
 { amprocfamily => 'brin/macaddr_minmax_multi_ops', amproclefttype => 'macaddr',
@@ -1344,6 +1371,9 @@
 { amprocfamily => 'brin/macaddr8_minmax_ops', amproclefttype => 'macaddr8',
   amprocrighttype => 'macaddr8', amprocnum => '11',
   amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/macaddr8_minmax_ops', amproclefttype => 'macaddr8',
+  amprocrighttype => 'macaddr8', amprocnum => '7',
+  amproc => 'brin_minmax_ranges' },
 
 # minmax multi macaddr8
 { amprocfamily => 'brin/macaddr8_minmax_multi_ops',
@@ -1398,6 +1428,8 @@
   amprocrighttype => 'inet', amprocnum => '4', amproc => 'brin_minmax_union' },
 { amprocfamily => 'brin/network_minmax_ops', amproclefttype => 'inet',
   amprocrighttype => 'inet', amprocnum => '11', amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/network_minmax_ops', amproclefttype => 'inet',
+  amprocrighttype => 'inet', amprocnum => '7', amproc => 'brin_minmax_ranges' },
 
 # minmax multi inet
 { amprocfamily => 'brin/network_minmax_multi_ops', amproclefttype => 'inet',
@@ -1471,6 +1503,9 @@
 { amprocfamily => 'brin/bpchar_minmax_ops', amproclefttype => 'bpchar',
   amprocrighttype => 'bpchar', amprocnum => '11',
   amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/bpchar_minmax_ops', amproclefttype => 'bpchar',
+  amprocrighttype => 'bpchar', amprocnum => '7',
+  amproc => 'brin_minmax_ranges' },
 
 # bloom character
 { amprocfamily => 'brin/bpchar_bloom_ops', amproclefttype => 'bpchar',
@@ -1504,6 +1539,8 @@
   amprocrighttype => 'time', amprocnum => '4', amproc => 'brin_minmax_union' },
 { amprocfamily => 'brin/time_minmax_ops', amproclefttype => 'time',
   amprocrighttype => 'time', amprocnum => '11', amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/time_minmax_ops', amproclefttype => 'time',
+  amprocrighttype => 'time', amprocnum => '7', amproc => 'brin_minmax_ranges' },
 
 # minmax multi time without time zone
 { amprocfamily => 'brin/time_minmax_multi_ops', amproclefttype => 'time',
@@ -1557,6 +1594,9 @@
 { amprocfamily => 'brin/datetime_minmax_ops', amproclefttype => 'timestamp',
   amprocrighttype => 'timestamp', amprocnum => '11',
   amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/datetime_minmax_ops', amproclefttype => 'timestamp',
+  amprocrighttype => 'timestamp', amprocnum => '7',
+  amproc => 'brin_minmax_ranges' },
 
 { amprocfamily => 'brin/datetime_minmax_ops', amproclefttype => 'timestamptz',
   amprocrighttype => 'timestamptz', amprocnum => '1',
@@ -1573,6 +1613,9 @@
 { amprocfamily => 'brin/datetime_minmax_ops', amproclefttype => 'timestamptz',
   amprocrighttype => 'timestamptz', amprocnum => '11',
   amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/datetime_minmax_ops', amproclefttype => 'timestamptz',
+  amprocrighttype => 'timestamptz', amprocnum => '7',
+  amproc => 'brin_minmax_ranges' },
 
 { amprocfamily => 'brin/datetime_minmax_ops', amproclefttype => 'date',
   amprocrighttype => 'date', amprocnum => '1',
@@ -1587,6 +1630,8 @@
   amprocrighttype => 'date', amprocnum => '4', amproc => 'brin_minmax_union' },
 { amprocfamily => 'brin/datetime_minmax_ops', amproclefttype => 'date',
   amprocrighttype => 'date', amprocnum => '11', amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/datetime_minmax_ops', amproclefttype => 'date',
+  amprocrighttype => 'date', amprocnum => '7', amproc => 'brin_minmax_ranges' },
 
 # minmax multi datetime (date, timestamp, timestamptz)
 { amprocfamily => 'brin/datetime_minmax_multi_ops',
@@ -1716,6 +1761,9 @@
 { amprocfamily => 'brin/interval_minmax_ops', amproclefttype => 'interval',
   amprocrighttype => 'interval', amprocnum => '11',
   amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/interval_minmax_ops', amproclefttype => 'interval',
+  amprocrighttype => 'interval', amprocnum => '7',
+  amproc => 'brin_minmax_ranges' },
 
 # minmax multi interval
 { amprocfamily => 'brin/interval_minmax_multi_ops',
@@ -1772,6 +1820,9 @@
 { amprocfamily => 'brin/timetz_minmax_ops', amproclefttype => 'timetz',
   amprocrighttype => 'timetz', amprocnum => '11',
   amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/timetz_minmax_ops', amproclefttype => 'timetz',
+  amprocrighttype => 'timetz', amprocnum => '7',
+  amproc => 'brin_minmax_ranges' },
 
 # minmax multi time with time zone
 { amprocfamily => 'brin/timetz_minmax_multi_ops', amproclefttype => 'timetz',
@@ -1824,6 +1875,8 @@
   amprocrighttype => 'bit', amprocnum => '4', amproc => 'brin_minmax_union' },
 { amprocfamily => 'brin/bit_minmax_ops', amproclefttype => 'bit',
   amprocrighttype => 'bit', amprocnum => '11', amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/bit_minmax_ops', amproclefttype => 'bit',
+  amprocrighttype => 'bit', amprocnum => '7', amproc => 'brin_minmax_ranges' },
 
 # minmax bit varying
 { amprocfamily => 'brin/varbit_minmax_ops', amproclefttype => 'varbit',
@@ -1841,6 +1894,9 @@
 { amprocfamily => 'brin/varbit_minmax_ops', amproclefttype => 'varbit',
   amprocrighttype => 'varbit', amprocnum => '11',
   amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/varbit_minmax_ops', amproclefttype => 'varbit',
+  amprocrighttype => 'varbit', amprocnum => '7',
+  amproc => 'brin_minmax_ranges' },
 
 # minmax numeric
 { amprocfamily => 'brin/numeric_minmax_ops', amproclefttype => 'numeric',
@@ -1858,6 +1914,9 @@
 { amprocfamily => 'brin/numeric_minmax_ops', amproclefttype => 'numeric',
   amprocrighttype => 'numeric', amprocnum => '11',
   amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/numeric_minmax_ops', amproclefttype => 'numeric',
+  amprocrighttype => 'numeric', amprocnum => '7',
+  amproc => 'brin_minmax_ranges' },
 
 # minmax multi numeric
 { amprocfamily => 'brin/numeric_minmax_multi_ops', amproclefttype => 'numeric',
@@ -1912,6 +1971,8 @@
   amprocrighttype => 'uuid', amprocnum => '4', amproc => 'brin_minmax_union' },
 { amprocfamily => 'brin/uuid_minmax_ops', amproclefttype => 'uuid',
   amprocrighttype => 'uuid', amprocnum => '11', amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/uuid_minmax_ops', amproclefttype => 'uuid',
+  amprocrighttype => 'uuid', amprocnum => '7', amproc => 'brin_minmax_ranges' },
 
 # minmax multi uuid
 { amprocfamily => 'brin/uuid_minmax_multi_ops', amproclefttype => 'uuid',
@@ -1988,6 +2049,9 @@
 { amprocfamily => 'brin/pg_lsn_minmax_ops', amproclefttype => 'pg_lsn',
   amprocrighttype => 'pg_lsn', amprocnum => '11',
   amproc => 'brin_minmax_stats' },
+{ amprocfamily => 'brin/pg_lsn_minmax_ops', amproclefttype => 'pg_lsn',
+  amprocrighttype => 'pg_lsn', amprocnum => '7',
+  amproc => 'brin_minmax_ranges' },
 
 # minmax multi pg_lsn
 { amprocfamily => 'brin/pg_lsn_minmax_multi_ops', amproclefttype => 'pg_lsn',
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 1dd9177b01c..18e0824a08e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -8411,6 +8411,9 @@
   proname => 'brin_minmax_stats', prorettype => 'bool',
   proargtypes => 'internal internal int2 int2 internal int4',
   prosrc => 'brin_minmax_stats' },
+{ oid => '9980', descr => 'BRIN minmax support',
+  proname => 'brin_minmax_ranges', prorettype => 'bool',
+  proargtypes => 'internal int2 bool', prosrc => 'brin_minmax_ranges' },
 
 # BRIN minmax multi
 { oid => '4616', descr => 'BRIN multi minmax support',
diff --git a/src/include/executor/nodeBrinSort.h b/src/include/executor/nodeBrinSort.h
new file mode 100644
index 00000000000..2c860d926ea
--- /dev/null
+++ b/src/include/executor/nodeBrinSort.h
@@ -0,0 +1,47 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeBrinSort.h
+ *
+ *
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/executor/nodeBrinSort.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef NODEBRINSORT_H
+#define NODEBRINSORT_H
+
+#include "access/genam.h"
+#include "access/parallel.h"
+#include "nodes/execnodes.h"
+
+extern BrinSortState *ExecInitBrinSort(BrinSort *node, EState *estate, int eflags);
+extern void ExecEndBrinSort(BrinSortState *node);
+extern void ExecBrinSortMarkPos(BrinSortState *node);
+extern void ExecBrinSortRestrPos(BrinSortState *node);
+extern void ExecReScanBrinSort(BrinSortState *node);
+extern void ExecBrinSortEstimate(BrinSortState *node, ParallelContext *pcxt);
+extern void ExecBrinSortInitializeDSM(BrinSortState *node, ParallelContext *pcxt);
+extern void ExecBrinSortReInitializeDSM(BrinSortState *node, ParallelContext *pcxt);
+extern void ExecBrinSortInitializeWorker(BrinSortState *node,
+										  ParallelWorkerContext *pwcxt);
+
+/*
+ * These routines are implemented in nodeIndexscan.c; their prototypes are
+ * repeated here so the BRIN Sort code can reuse the scan key machinery.
+ */
+extern void ExecIndexBuildScanKeys(PlanState *planstate, Relation index,
+								   List *quals, bool isorderby,
+								   ScanKey *scanKeys, int *numScanKeys,
+								   IndexRuntimeKeyInfo **runtimeKeys, int *numRuntimeKeys,
+								   IndexArrayKeyInfo **arrayKeys, int *numArrayKeys);
+extern void ExecIndexEvalRuntimeKeys(ExprContext *econtext,
+									 IndexRuntimeKeyInfo *runtimeKeys, int numRuntimeKeys);
+extern bool ExecIndexEvalArrayKeys(ExprContext *econtext,
+								   IndexArrayKeyInfo *arrayKeys, int numArrayKeys);
+extern bool ExecIndexAdvanceArrayKeys(IndexArrayKeyInfo *arrayKeys, int numArrayKeys);
+
+#endif							/* NODEBRINSORT_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 01b1727fc09..381c2fcd3d6 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1549,6 +1549,109 @@ typedef struct IndexScanState
 	Size		iss_PscanLen;
 } IndexScanState;
 
+/*
+ * BrinSortPhase
+ *		Phases of the BRIN Sort executor state machine (see nodeBrinSort.c).
+ */
+typedef enum
+{
+	BRINSORT_START,
+	BRINSORT_LOAD_RANGE,
+	BRINSORT_PROCESS_RANGE,
+	BRINSORT_LOAD_NULLS,
+	BRINSORT_PROCESS_NULLS,
+	BRINSORT_FINISHED
+} BrinSortPhase;
+
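+/*
+ * BrinRangeScanDesc
+ *		State for scanning the BRIN range information used by BRIN Sort:
+ *		the ranges sorted by minval and the distinct minval boundaries.
+ */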
+typedef struct BrinRangeScanDesc
+{
+	/* range info tuple descriptor */
+	TupleDesc		tdesc;
+
+	/* ranges, sorted by minval, blkno_start */
+	Tuplesortstate *ranges;
+
+	/* distinct minval (sorted) */
+	Tuplestorestate *minvals;
+
+	/* slot for accessing the tuplesort/tuplestore */
+	TupleTableSlot  *slot;
+
+} BrinRangeScanDesc;
+
+/*
+ * Info about ranges for BRIN Sort.
+ */
+typedef struct BrinRange
+{
+	BlockNumber blkno_start;
+	BlockNumber blkno_end;
+
+	Datum	min_value;
+	Datum	max_value;
+	bool	has_nulls;
+	bool	all_nulls;
+	bool	not_summarized;
+
+	/*
+	 * Index of the range when ordered by min_value (if there are multiple
+	 * ranges with the same min_value, it's the lowest one).
+	 */
+	uint32	min_index;
+
+	/*
+	 * Minimum min_index from all ranges with a higher max_value (i.e. ranges
+	 * following this one when sorted by max_value). If multiple ranges have
+	 * the same max_value, the result depends on how those ties are ordered,
+	 * i.e. such ranges may get different min_index_lowest values.
+	 */
+	uint32	min_index_lowest;
+} BrinRange;
+
+typedef struct BrinRanges
+{
+	int			nranges;
+	BrinRange	ranges[FLEXIBLE_ARRAY_MEMBER];
+} BrinRanges;
+
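+/* ----------------
+ *	 BrinSortState information
+ *
+ *		Fields prefixed with iss_ mirror IndexScanState; the bs_ fields
+ *		hold the BRIN Sort specific state (range scan, current range,
+ *		sort watermark, phase, and the tuplesort/tuplestore).
+ * ----------------
+ */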
+typedef struct BrinSortState
+{
+	ScanState	ss;				/* its first field is NodeTag */
+	ExprState  *indexqualorig;
+	List	   *indexorderbyorig;
+	struct ScanKeyData *iss_ScanKeys;
+	int			iss_NumScanKeys;
+	struct ScanKeyData *iss_OrderByKeys;
+	int			iss_NumOrderByKeys;
+	IndexRuntimeKeyInfo *iss_RuntimeKeys;
+	int			iss_NumRuntimeKeys;
+	bool		iss_RuntimeKeysReady;
+	ExprContext *iss_RuntimeContext;
+	Relation	iss_RelationDesc;
+	struct IndexScanDescData *iss_ScanDesc;
+
+	/* These are needed for re-checking ORDER BY expr ordering */
+	pairingheap *iss_ReorderQueue;
+	bool		iss_ReachedEnd;
+	Datum	   *iss_OrderByValues;
+	bool	   *iss_OrderByNulls;
+	SortSupport iss_SortSupport;
+	bool	   *iss_OrderByTypByVals;
+	int16	   *iss_OrderByTypLens;
+	Size		iss_PscanLen;
+
+	/* BRIN Sort specific state */
+	BrinRangeScanDesc *bs_scan;
+	BrinRange	   *bs_range;
+	ExprState	   *bs_qual;
+	Datum			bs_watermark;
+	bool			bs_watermark_set;
+	BrinSortPhase	bs_phase;
+	SortSupportData	bs_sortsupport;
+
+	/*
+	 * Tuples from the current range are sorted in the tuplesort; spill-over
+	 * tuples from the overlapping ranges are kept in the tuplestore.
+	 */
+	void		   *bs_tuplesortstate;
+	Tuplestorestate *bs_tuplestore;
+} BrinSortState;
+
 /* ----------------
  *	 IndexOnlyScanState information
  *
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index 6bda383bead..e79c904a8fc 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -1596,6 +1596,17 @@ typedef struct IndexPath
 	Selectivity indexselectivity;
 } IndexPath;
 
+/*
+ * BrinSortPath represents reading sorted data from a BRIN index.
+ *
+ * We embed IndexPath, because that is what amcostestimate expects, but we
+ * typedef it as a separate struct to keep this path type distinct.
+ */
+typedef struct BrinSortPath
+{
+	IndexPath	ipath;
+} BrinSortPath;
+
 /*
  * Each IndexClause references a RestrictInfo node from the query's WHERE
  * or JOIN conditions, and shows how that restriction can be applied to
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 21e642a64c4..c4ef5362acc 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -495,6 +495,32 @@ typedef struct IndexOnlyScan
 	ScanDirection indexorderdir;	/* forward or backward or don't care */
 } IndexOnlyScan;
 
+
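+/* ----------------
+ *		brin sort node
+ *
+ *		Scan of a BRIN index used to produce output sorted by the given
+ *		sort keys; the sort-key fields have the same meaning as in Sort.
+ * ----------------
+ */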
+typedef struct BrinSort
+{
+	Scan		scan;
+	Oid			indexid;		/* OID of index to scan */
+	List	   *indexqual;		/* list of index quals (usually OpExprs) */
+	List	   *indexqualorig;	/* the same in original form */
+	ScanDirection indexorderdir;	/* forward or backward or don't care */
+
+	/* number of sort-key columns */
+	int			numCols;
+
+	/* their indexes in the target list */
+	AttrNumber *sortColIdx pg_node_attr(array_size(numCols));
+
+	/* OIDs of operators to sort them by */
+	Oid		   *sortOperators pg_node_attr(array_size(numCols));
+
+	/* OIDs of collations */
+	Oid		   *collations pg_node_attr(array_size(numCols));
+
+	/* NULLS FIRST/LAST directions */
+	bool	   *nullsFirst pg_node_attr(array_size(numCols));
+
+} BrinSort;
+
 /* ----------------
  *		bitmap index scan node
  *
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 204e94b6d10..b77440728d1 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -69,6 +69,7 @@ extern PGDLLIMPORT bool enable_parallel_append;
 extern PGDLLIMPORT bool enable_parallel_hash;
 extern PGDLLIMPORT bool enable_partition_pruning;
 extern PGDLLIMPORT bool enable_async_append;
+extern PGDLLIMPORT bool enable_brinsort;
 extern PGDLLIMPORT int constraint_exclusion;
 
 extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
@@ -79,6 +80,8 @@ extern void cost_samplescan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
 							ParamPathInfo *param_info);
 extern void cost_index(IndexPath *path, PlannerInfo *root,
 					   double loop_count, bool partial_path);
+extern void cost_brinsort(BrinSortPath *path, PlannerInfo *root,
+						  double loop_count, bool partial_path);
 extern void cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
 								  ParamPathInfo *param_info,
 								  Path *bitmapqual, double loop_count);
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index 050f00e79a4..11caad3ec51 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -49,6 +49,15 @@ extern IndexPath *create_index_path(PlannerInfo *root,
 									Relids required_outer,
 									double loop_count,
 									bool partial_path);
+extern BrinSortPath *create_brinsort_path(PlannerInfo *root,
+									IndexOptInfo *index,
+									List *indexclauses,
+									List *pathkeys,
+									ScanDirection indexscandir,
+									bool indexonly,
+									Relids required_outer,
+									double loop_count,
+									bool partial_path);
 extern BitmapHeapPath *create_bitmap_heap_path(PlannerInfo *root,
 											   RelOptInfo *rel,
 											   Path *bitmapqual,
diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h
index 41f765d3422..6aa50257730 100644
--- a/src/include/optimizer/paths.h
+++ b/src/include/optimizer/paths.h
@@ -213,6 +213,9 @@ extern Path *get_cheapest_fractional_path_for_pathkeys(List *paths,
 extern Path *get_cheapest_parallel_safe_total_inner(List *paths);
 extern List *build_index_pathkeys(PlannerInfo *root, IndexOptInfo *index,
 								  ScanDirection scandir);
+extern List *build_index_pathkeys_brin(PlannerInfo *root, IndexOptInfo *index,
+								  TargetEntry *tle, int idx,
+								  bool reverse_sort, bool nulls_first);
 extern List *build_partition_pathkeys(PlannerInfo *root, RelOptInfo *partrel,
 									  ScanDirection scandir, bool *partialkeys);
 extern List *build_expression_pathkey(PlannerInfo *root, Expr *expr,
-- 
2.25.1

