From b567a8a34ea9b384cd2115700de56ea20b80510f Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Fri, 10 Feb 2023 22:35:43 +0100
Subject: [PATCH 2/6] Sort the array in brinrescan() and do binsearch

This allows optimizing the check in brin_minmax_consistent.
---
 src/backend/access/brin/brin.c        |  69 ++++++++++
 src/backend/access/brin/brin_minmax.c | 175 +++++++++++++++++++++-----
 2 files changed, 210 insertions(+), 34 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index a600f37c15..40a631d376 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -38,6 +38,7 @@
 #include "utils/datum.h"
 #include "utils/guc.h"
 #include "utils/index_selfuncs.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
@@ -721,6 +722,16 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 	return totalpages * 10;
 }
 
+static int
+compare_array_values(const void *a, const void *b, void *arg)
+{
+	Datum	da = * (Datum *) a;
+	Datum	db = * (Datum *) b;
+	SortSupport	ssup = (SortSupport) arg;
+
+	return ApplySortComparator(da, false, db, false, ssup);
+}
+
 /*
  * Re-initialize state for a BRIN index scan
  */
@@ -736,6 +747,64 @@ brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 	 * here someday, too.
 	 */
 
+	/*
+	 * Presort the array for SK_SEARCHARRAY scan keys.
+	 *
+	 * We simply stash the value back into the ScanKey, because that way it's
+	 * transparent for the opclass. But there's a couple issues with this:
+	 *
+	 * 1) On rescans, we'll preprocess the array again, unnecessarily. And the
+	 * memory is likely retained too, so this may be a memory leak.
+	 *
+	 * 2) It assumes we want to preprocess the keys into the same data type.
+	 * That works for minmax (where we just sort the array), but may not work
+	 * for other opclasses - e.g. for "bloom" we might want pre-compute the
+	 * h1/h2 hashes (two uint64 values) and "inclusion" might try building
+	 * a union of the values, or something like that. And none of this fits
+	 * into an array of the same data type (which may be swapped into ScanKey).
+	 *
+	 * FIXME Needs to handle NULLs correctly.
+	 */
+	for (int i = 0; i < nscankeys; i++)
+	{
+		ScanKey		key = &scankey[i];
+		ArrayType  *arrayval;
+		int16		elmlen;
+		bool		elmbyval;
+		char		elmalign;
+		int			num_elems;
+		Datum	   *elem_values;
+		bool	   *elem_nulls;
+		TypeCacheEntry *type;
+		SortSupportData ssup;
+
+		if (!(key->sk_flags & SK_SEARCHNULL))
+			continue;
+
+		arrayval = DatumGetArrayTypeP(key->sk_argument);
+
+		get_typlenbyvalalign(ARR_ELEMTYPE(arrayval),
+							 &elmlen, &elmbyval, &elmalign);
+
+		deconstruct_array(arrayval,
+						  ARR_ELEMTYPE(arrayval),
+						  elmlen, elmbyval, elmalign,
+						  &elem_values, &elem_nulls, &num_elems);
+
+		type = lookup_type_cache(ARR_ELEMTYPE(arrayval), TYPECACHE_LT_OPR);
+
+		memset(&ssup, 0, sizeof(SortSupportData));
+		PrepareSortSupportFromOrderingOp(type->lt_opr, &ssup);
+
+		qsort_interruptible(elem_values, num_elems, sizeof(Datum),
+							compare_array_values, &ssup);
+
+		arrayval = construct_array_builtin(elem_values, num_elems,
+										   ARR_ELEMTYPE(arrayval));
+
+		key->sk_argument = PointerGetDatum(arrayval);
+	}
+
 	if (scankey && scan->numberOfKeys > 0)
 		memmove(scan->keyData, scankey,
 				scan->numberOfKeys * sizeof(ScanKeyData));
diff --git a/src/backend/access/brin/brin_minmax.c b/src/backend/access/brin/brin_minmax.c
index c054d8ff42..facc01901a 100644
--- a/src/backend/access/brin/brin_minmax.c
+++ b/src/backend/access/brin/brin_minmax.c
@@ -22,6 +22,7 @@
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
+#include "utils/sortsupport.h"
 
 typedef struct MinmaxOpaque
 {
@@ -127,6 +128,63 @@ brin_minmax_add_value(PG_FUNCTION_ARGS)
 	PG_RETURN_BOOL(updated);
 }
 
+
+static int
+compare_array_values(const void *a, const void *b, void *arg)
+{
+	Datum	da = * (Datum *) a;
+	Datum	db = * (Datum *) b;
+	SortSupport	ssup = (SortSupport) arg;
+
+	return ApplySortComparator(da, false, db, false, ssup);
+}
+
+/*
+ * lower_boundary
+ *		Determine lowest index so that (values[index] >= minvalue).
+ *
+ * The array of values is expected to be sorted, so this is the first value
+ * that may fall into the [minvalue, maxvalue] range, as it exceeds minval.
+ * It's not guaranteed, though, as it might exceed maxvalue too.
+ */
+static int
+lower_boundary(Datum *values, int nvalues, Datum minvalue, SortSupport ssup)
+{
+	int		start = 0,
+			end = (nvalues - 1);
+
+	/* everything exceeds minval and might match */
+	if (compare_array_values(&minvalue, &values[start], ssup) <= 0)
+		return 0;
+
+	/* nothing could match */
+	if (compare_array_values(&minvalue, &values[end], ssup) > 0)
+		return nvalues;
+
+	while ((end - start) > 0)
+	{
+		int midpoint;
+		int r;
+
+		midpoint = start + (end - start) / 2;
+
+		r = compare_array_values(&minvalue, &values[midpoint], ssup);
+
+		if (r > 0)
+			start = Max(midpoint, start + 1);
+		else
+			end = midpoint;
+	}
+
+	/* the value should meet the (v >=minvalue) requirement */
+	Assert(compare_array_values(&values[start], &minvalue, ssup) >= 0);
+
+	/* we know start can't be 0, so it's legal to subtract 1 */
+	Assert(compare_array_values(&values[start-1], &minvalue, ssup) < 0);
+
+	return start;
+}
+
 /*
  * Given an index tuple corresponding to a certain page range and a scan key,
  * return whether the scan key is consistent with the index tuple's min/max
@@ -223,6 +281,10 @@ brin_minmax_consistent(PG_FUNCTION_ARGS)
 		get_typlenbyvalalign(ARR_ELEMTYPE(arrayval),
 							 &elmlen, &elmbyval, &elmalign);
 
+		/*
+		 * FIXME We shouldn't deconstruct the array over and over for each page
+		 * range. We should preprocess it once, and then just use it.
+		 */
 		deconstruct_array(arrayval,
 						  ARR_ELEMTYPE(arrayval),
 						  elmlen, elmbyval, elmalign,
@@ -232,19 +294,14 @@ brin_minmax_consistent(PG_FUNCTION_ARGS)
 		{
 			case BTLessStrategyNumber:
 			case BTLessEqualStrategyNumber:
-
-				for (int j = 0; j < num_elems; j++)
-				{
-					Datum val = elem_values[j];
-
-					finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype,
-														 key->sk_strategy);
-					matches = FunctionCall2Coll(finfo, colloid, column->bv_values[0],
-												val);
-
-					if (DatumGetBool(matches))
-						break;
-				}
+				/*
+				 * Check the last (largest) value in the array - at least this
+				 * value has to exceed the range minval.
+				 */
+				finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype,
+													 key->sk_strategy);
+				matches = FunctionCall2Coll(finfo, colloid, column->bv_values[0],
+											elem_values[num_elems-1]);
 				break;
 			case BTEqualStrategyNumber:
 
@@ -253,44 +310,94 @@ brin_minmax_consistent(PG_FUNCTION_ARGS)
 				 * the current page range if the minimum value in the range <=
 				 * scan key, and the maximum value >= scan key.
 				 *
-				 * XXX For any of the array values.
+				 * We do this in two phases. We check the array min/max values to see
+				 * if there even can be a matching value, and if yes we do a binary
+				 * search to find the first value that exceeds range minval. And then
+				 * we check if it actually matches the range.
+				 *
+				 * XXX The first phase is probably unnecessary, because lower_bound()
+				 * does pretty much exactly that too.
 				 */
-				for (int j = 0; j < num_elems; j++)
 				{
-					Datum val = elem_values[j];
+					Datum val;
+					SortSupportData ssup;
+					int			lower;
+					TypeCacheEntry *type;
+
+					/* Is the first (smallest) value after the BRIN range? */
+					val = elem_values[0];
 
 					finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype,
 														 BTLessEqualStrategyNumber);
-					matches = FunctionCall2Coll(finfo, colloid, column->bv_values[0],
-												val);
+					matches = FunctionCall2Coll(finfo, colloid, val, column->bv_values[1]);
+
+					/* minval > max(range values) */
 					if (!DatumGetBool(matches))
-						continue;
+						break;
+
+					/* Is the last (largest) value before the BRIN range? */
+					val = elem_values[num_elems-1];
 
-					/* max() >= scankey */
 					finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype,
 														 BTGreaterEqualStrategyNumber);
-					matches = FunctionCall2Coll(finfo, colloid, column->bv_values[1],
-												val);
+					matches = FunctionCall2Coll(finfo, colloid, val, column->bv_values[0]);
 
-					if (DatumGetBool(matches))
+					/* maxval < min(range values) */
+					if (!DatumGetBool(matches))
 						break;
-				}
-				break;
-			case BTGreaterEqualStrategyNumber:
-			case BTGreaterStrategyNumber:
 
-				for (int j = 0; j < num_elems; j++)
-				{
-					Datum val = elem_values[j];
+					/*
+					 * OK, there might be some values matching the range. We have
+					 * to search them one by one, or perhaps try binsearch.
+					 */
+					type = lookup_type_cache(ARR_ELEMTYPE(arrayval), TYPECACHE_LT_OPR);
+
+					memset(&ssup, 0, sizeof(SortSupportData));
+					PrepareSortSupportFromOrderingOp(type->lt_opr, &ssup);
 
+					lower = lower_boundary(elem_values, num_elems, column->bv_values[0], &ssup);
+
+					/* no elements can possibly match */
+					if (lower == num_elems)
+					{
+						matches = BoolGetDatum(false);
+						break;
+					}
+
+					/*
+					 * OK, the first element must match the upper boundary too
+					 * (if it does not, no following elements can).
+					 */
+					val = elem_values[lower];
+
+					/*
+					 * In the equality case (WHERE col = someval), we want to return
+					 * the current page range if the minimum value in the range <=
+					 * scan key, and the maximum value >= scan key.
+					 */
 					finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype,
-														 key->sk_strategy);
-					matches = FunctionCall2Coll(finfo, colloid, column->bv_values[1],
+														 BTLessEqualStrategyNumber);
+					matches = FunctionCall2Coll(finfo, colloid, column->bv_values[0],
 												val);
-
-					if (DatumGetBool(matches))
+					if (!DatumGetBool(matches))
 						break;
+					/* max() >= scankey */
+					finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype,
+														 BTGreaterEqualStrategyNumber);
+					matches = FunctionCall2Coll(finfo, colloid, column->bv_values[1],
+												val);
+					break;
 				}
+			case BTGreaterEqualStrategyNumber:
+			case BTGreaterStrategyNumber:
+				/*
+				 * Check the first (smallest) value in the array - at least this
+				 * value has to be smaller than the range maxval.
+				 */
+				finfo = minmax_get_strategy_procinfo(bdesc, attno, subtype,
+													 key->sk_strategy);
+				matches = FunctionCall2Coll(finfo, colloid, column->bv_values[1],
+											elem_values[0]);
 				break;
 			default:
 				/* shouldn't happen */
-- 
2.39.1

