From a0125596277647b593bc7d1b5df2bbbfe859cbd1 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Sat, 11 Feb 2023 19:43:38 +0100
Subject: [PATCH 3/6] Support SK_SEARCHARRAY in BRIN minmax-multi

Similar approach to minmax, but the issues with deconstructing the array
over and over are even more serious.
---
 src/backend/access/brin/brin_minmax_multi.c | 388 ++++++++++++++++----
 1 file changed, 322 insertions(+), 66 deletions(-)

diff --git a/src/backend/access/brin/brin_minmax_multi.c b/src/backend/access/brin/brin_minmax_multi.c
index 0ace6035be..43e9143f2a 100644
--- a/src/backend/access/brin/brin_minmax_multi.c
+++ b/src/backend/access/brin/brin_minmax_multi.c
@@ -2562,6 +2562,63 @@ brin_minmax_multi_add_value(PG_FUNCTION_ARGS)
 	PG_RETURN_BOOL(modified);
 }
 
+/* Compare two Datums, passed by pointer, using the SortSupport given in arg. */
+static int
+compare_array_values(const void *a, const void *b, void *arg)
+{
+	const Datum *lhs = (const Datum *) a;
+	const Datum *rhs = (const Datum *) b;
+
+	/* both inputs are passed with isnull = false */
+	return ApplySortComparator(*lhs, false, *rhs, false, (SortSupport) arg);
+}
+
+/*
+ * lower_boundary
+ *		Determine lowest index so that (values[index] >= minvalue).
+ *
+ * The array is expected to be sorted (in ssup order). Returns nvalues if no
+ * element is >= minvalue, which includes the case of an empty array. The
+ * element found is not guaranteed to match, as it may exceed maxvalue too.
+ */
+static int
+lower_boundary(Datum *values, int nvalues, Datum minvalue, SortSupport ssup)
+{
+	int		start = 0,
+			end = (nvalues - 1);
+
+	/* empty array: nothing can match (0 == nvalues), don't touch values[] */
+	if (nvalues <= 0)
+		return 0;
+
+	/* every element is >= minvalue, so the first one is the boundary */
+	if (compare_array_values(&minvalue, &values[start], ssup) <= 0)
+		return 0;
+
+	/* even the last (largest) element is < minvalue, nothing could match */
+	if (compare_array_values(&minvalue, &values[end], ssup) > 0)
+		return nvalues;
+
+	/* binary search, keeping (values[end] >= minvalue) true throughout */
+	while (start < end)
+	{
+		int		midpoint = start + (end - start) / 2;
+
+		if (compare_array_values(&minvalue, &values[midpoint], ssup) > 0)
+			start = midpoint + 1;	/* values[midpoint] is too small */
+		else
+			end = midpoint;
+	}
+
+	/* the value should meet the (v >= minvalue) requirement */
+	Assert(compare_array_values(&values[start], &minvalue, ssup) >= 0);
+
+	/* values[0] < minvalue was established above, so start > 0 here */
+	Assert(compare_array_values(&values[start-1], &minvalue, ssup) < 0);
+
+	return start;
+}
+
 /*
  * Given an index tuple corresponding to a certain page range and a scan key,
  * return whether the scan key is consistent with the index tuple's min/max
@@ -2591,6 +2648,15 @@ brin_minmax_multi_consistent(PG_FUNCTION_ARGS)
 	serialized = (SerializedRanges *) PG_DETOAST_DATUM(column->bv_values[0]);
 	ranges = brin_range_deserialize(serialized->maxvalues, serialized);
 
+	/*
+	 * XXX Would it make sense to have a quick initial check on the whole
+	 * summary? We know most page ranges are not expected to match, and we
+	 * know the ranges/values are sorted so we could check global min/max
+	 * (essentially what regular minmax is doing) and bail if no match is
+	 * possible. That should be cheap and might save a lot on inspecting
+	 * the individual ranges/values.
+	 */
+
 	/* inspect the ranges, and for each one evaluate the scan keys */
 	for (rangeno = 0; rangeno < ranges->nranges; rangeno++)
 	{
@@ -2611,67 +2677,205 @@ brin_minmax_multi_consistent(PG_FUNCTION_ARGS)
 			attno = key->sk_attno;
 			subtype = key->sk_subtype;
 			value = key->sk_argument;
-			switch (key->sk_strategy)
+
+			if (likely(!(key->sk_flags & SK_SEARCHARRAY)))
 			{
-				case BTLessStrategyNumber:
-				case BTLessEqualStrategyNumber:
-					finfo = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
-															   key->sk_strategy);
-					/* first value from the array */
-					matches = FunctionCall2Coll(finfo, colloid, minval, value);
-					break;
+				switch (key->sk_strategy)
+				{
+					case BTLessStrategyNumber:
+					case BTLessEqualStrategyNumber:
+						finfo = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
+																   key->sk_strategy);
+						/* first value from the array */
+						matches = FunctionCall2Coll(finfo, colloid, minval, value);
+						break;
 
-				case BTEqualStrategyNumber:
-					{
-						Datum		compar;
-						FmgrInfo   *cmpFn;
+					case BTEqualStrategyNumber:
+						{
+							Datum		compar;
+							FmgrInfo   *cmpFn;
+
+							/* by default this range does not match */
+							matches = false;
+
+							/*
+							 * Otherwise, need to compare the new value with
+							 * boundaries of all the ranges. First check if it's
+							 * less than the absolute minimum, which is the first
+							 * value in the array.
+							 */
+							cmpFn = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
+																	   BTGreaterStrategyNumber);
+							compar = FunctionCall2Coll(cmpFn, colloid, minval, value);
+
+							/* smaller than the smallest value in this range */
+							if (DatumGetBool(compar))
+								break;
+
+							cmpFn = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
+																	   BTLessStrategyNumber);
+							compar = FunctionCall2Coll(cmpFn, colloid, maxval, value);
+
+							/* larger than the largest value in this range */
+							if (DatumGetBool(compar))
+								break;
+
+							/*
+							 * We haven't managed to eliminate this range, so
+							 * consider it matching.
+							 */
+							matches = true;
 
-						/* by default this range does not match */
-						matches = false;
+							break;
+						}
+					case BTGreaterEqualStrategyNumber:
+					case BTGreaterStrategyNumber:
+						finfo = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
+																   key->sk_strategy);
+						/* last value from the array */
+						matches = FunctionCall2Coll(finfo, colloid, maxval, value);
+						break;
 
-						/*
-						 * Otherwise, need to compare the new value with
-						 * boundaries of all the ranges. First check if it's
-						 * less than the absolute minimum, which is the first
-						 * value in the array.
-						 */
-						cmpFn = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
-																   BTGreaterStrategyNumber);
-						compar = FunctionCall2Coll(cmpFn, colloid, minval, value);
+					default:
+						/* shouldn't happen */
+						elog(ERROR, "invalid strategy number %d", key->sk_strategy);
+						matches = 0;
+						break;
+				}
+			}
+			else
+			{
+				/*
+				 * FIXME This is really wrong, because it deserializes the
+				 * array over and over for each value in the minmax-multi
+				 * summary range.
+				 *
+				 * FIXME In fact, this is even worse than for brin_minmax.c
+				 * because we deconstruct it for every range in the summary
+				 * (so if there are 32 values, that's 16 ranges, and we'll
+				 * deconstruct it again for each of those).
+				 *
+				 * XXX We could deconstruct it once, when we need it for the
+				 * first time. But even better we should do it only once while
+				 * preprocessing the scan keys.
+				 */
+				ArrayType  *arrayval;
+				int16		elmlen;
+				bool		elmbyval;
+				char		elmalign;
+				int			num_elems;
+				Datum	   *elem_values;
+				bool	   *elem_nulls;
 
-						/* smaller than the smallest value in this range */
-						if (DatumGetBool(compar))
-							break;
+				arrayval = DatumGetArrayTypeP(key->sk_argument);
 
-						cmpFn = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
-																   BTLessStrategyNumber);
-						compar = FunctionCall2Coll(cmpFn, colloid, maxval, value);
+				get_typlenbyvalalign(ARR_ELEMTYPE(arrayval),
+									 &elmlen, &elmbyval, &elmalign);
 
-						/* larger than the largest value in this range */
-						if (DatumGetBool(compar))
-							break;
+				deconstruct_array(arrayval,
+								  ARR_ELEMTYPE(arrayval),
+								  elmlen, elmbyval, elmalign,
+								  &elem_values, &elem_nulls, &num_elems);
+
+				switch (key->sk_strategy)
+				{
+					case BTLessStrategyNumber:
+					case BTLessEqualStrategyNumber:
+						finfo = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
+																   key->sk_strategy);
+						/* compare minval with the last (largest) array element */
+						matches = FunctionCall2Coll(finfo, colloid, minval,
+													elem_values[num_elems-1]);
+						break;
+
+					case BTEqualStrategyNumber:
 
 						/*
-						 * We haven't managed to eliminate this range, so
-						 * consider it matching.
+						 * See brin_minmax.c for description of what this is doing.
 						 */
-						matches = true;
-
+						{
+							Datum val;
+							SortSupportData ssup;
+							int			lower;
+							TypeCacheEntry *type;
+
+							/* Is the first (smallest) value after the BRIN range? */
+							val = elem_values[0];
+
+							finfo = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
+																	   BTLessEqualStrategyNumber);
+							matches = FunctionCall2Coll(finfo, colloid, val, maxval);
+
+							/* even the smallest array element is greater than maxval */
+							if (!DatumGetBool(matches))
+								break;
+
+							/* Is the last (largest) value before the BRIN range? */
+							val = elem_values[num_elems-1];
+
+							finfo = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
+																	   BTGreaterEqualStrategyNumber);
+							matches = FunctionCall2Coll(finfo, colloid, val, minval);
+
+							/* even the largest array element is less than minval */
+							if (!DatumGetBool(matches))
+								break;
+
+							/*
+							 * OK, there might be some values matching the range. We have
+							 * to search them one by one, or perhaps try binsearch.
+							 */
+							type = lookup_type_cache(ARR_ELEMTYPE(arrayval), TYPECACHE_LT_OPR);
+
+							memset(&ssup, 0, sizeof(SortSupportData));
+							PrepareSortSupportFromOrderingOp(type->lt_opr, &ssup);
+
+							lower = lower_boundary(elem_values, num_elems, minval, &ssup);
+
+							/* no elements can possibly match */
+							if (lower == num_elems)
+							{
+								matches = BoolGetDatum(false);
+								break;
+							}
+
+							/*
+							 * OK, the first element must match the upper boundary too
+							 * (if it does not, no following elements can).
+							 */
+							val = elem_values[lower];
+
+							/*
+							 * In the equality case (WHERE col = someval), we want to return
+							 * the current page range if the minimum value in the range <=
+							 * scan key, and the maximum value >= scan key.
+							 */
+							finfo = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
+																	   BTLessEqualStrategyNumber);
+							matches = FunctionCall2Coll(finfo, colloid, minval, val);
+							if (!DatumGetBool(matches))
+								break;
+							/* max() >= scankey */
+							finfo = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
+																	   BTGreaterEqualStrategyNumber);
+							matches = FunctionCall2Coll(finfo, colloid, maxval, val);
+							break;
+						}
+					case BTGreaterEqualStrategyNumber:
+					case BTGreaterStrategyNumber:
+						finfo = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
+																   key->sk_strategy);
+						/* compare maxval with the first (smallest) array element */
+						matches = FunctionCall2Coll(finfo, colloid, maxval,
+													elem_values[0]);
 						break;
-					}
-				case BTGreaterEqualStrategyNumber:
-				case BTGreaterStrategyNumber:
-					finfo = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
-															   key->sk_strategy);
-					/* last value from the array */
-					matches = FunctionCall2Coll(finfo, colloid, maxval, value);
-					break;
 
-				default:
-					/* shouldn't happen */
-					elog(ERROR, "invalid strategy number %d", key->sk_strategy);
-					matches = 0;
-					break;
+					default:
+						/* shouldn't happen */
+						elog(ERROR, "invalid strategy number %d", key->sk_strategy);
+						matches = 0;
+						break;
+				}
 			}
 
 			/* the range has to match all the scan keys */
@@ -2713,28 +2917,80 @@ brin_minmax_multi_consistent(PG_FUNCTION_ARGS)
 			attno = key->sk_attno;
 			subtype = key->sk_subtype;
 			value = key->sk_argument;
-			switch (key->sk_strategy)
+			if (likely(!(key->sk_flags & SK_SEARCHARRAY)))
 			{
-				case BTLessStrategyNumber:
-				case BTLessEqualStrategyNumber:
-				case BTEqualStrategyNumber:
-				case BTGreaterEqualStrategyNumber:
-				case BTGreaterStrategyNumber:
-
-					finfo = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
-															   key->sk_strategy);
-					matches = FunctionCall2Coll(finfo, colloid, val, value);
-					break;
+				switch (key->sk_strategy)
+				{
+					case BTLessStrategyNumber:
+					case BTLessEqualStrategyNumber:
+					case BTEqualStrategyNumber:
+					case BTGreaterEqualStrategyNumber:
+					case BTGreaterStrategyNumber:
+
+						finfo = minmax_multi_get_strategy_procinfo(bdesc, attno, subtype,
+																   key->sk_strategy);
+						matches = FunctionCall2Coll(finfo, colloid, val, value);
+						break;
 
-				default:
-					/* shouldn't happen */
-					elog(ERROR, "invalid strategy number %d", key->sk_strategy);
-					matches = 0;
-					break;
+					default:
+						/* shouldn't happen */
+						elog(ERROR, "invalid strategy number %d", key->sk_strategy);
+						matches = 0;
+						break;
+				}
+			}
+			else
+			{
+				/*
+				 * FIXME This is really wrong, because it deserializes the
+				 * array over and over for each value in the minmax-multi
+				 * summary.
+				 */
+				ArrayType  *arrayval;
+				int16		elmlen;
+				bool		elmbyval;
+				char		elmalign;
+				int			num_elems;
+				Datum	   *elem_values;
+				bool	   *elem_nulls;
+
+				SortSupportData ssup;
+				int			lower;
+				TypeCacheEntry *type;
+
+				arrayval = DatumGetArrayTypeP(key->sk_argument);
+
+				get_typlenbyvalalign(ARR_ELEMTYPE(arrayval),
+									 &elmlen, &elmbyval, &elmalign);
+
+				deconstruct_array(arrayval,
+								  ARR_ELEMTYPE(arrayval),
+								  elmlen, elmbyval, elmalign,
+								  &elem_values, &elem_nulls, &num_elems);
+
+				/* assume no match */
+				matches = false;
+
+				/*
+				 * OK, there might be some values matching the range. We have
+				 * to search them one by one, or perhaps try binsearch.
+				 */
+				type = lookup_type_cache(ARR_ELEMTYPE(arrayval), TYPECACHE_LT_OPR);
+
+				memset(&ssup, 0, sizeof(SortSupportData));
+				PrepareSortSupportFromOrderingOp(type->lt_opr, &ssup);
+
+				lower = lower_boundary(elem_values, num_elems, value, &ssup);
+
+				if ((lower < num_elems) &&
+					(compare_array_values(&elem_values[lower], &value, &ssup) == 0))
+				{
+					matches = true;
+				}
 			}
 
 			/* the range has to match all the scan keys */
-			matching &= DatumGetBool(matches);
+			matching &= matches;
 
 			/* once we find a non-matching key, we're done */
 			if (!matching)
-- 
2.39.1

