From f9d5aee070eef54d5898d4ee7862d7900aa59209 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Sat, 11 Feb 2023 15:23:25 +0100
Subject: [PATCH 4/6] Support SK_SEARCHARRAY in BRIN inclusion

---
 src/backend/access/brin/brin_inclusion.c | 177 ++++++++++++++++-------
 1 file changed, 123 insertions(+), 54 deletions(-)

diff --git a/src/backend/access/brin/brin_inclusion.c b/src/backend/access/brin/brin_inclusion.c
index 248116c149..935452d09d 100644
--- a/src/backend/access/brin/brin_inclusion.c
+++ b/src/backend/access/brin/brin_inclusion.c
@@ -30,6 +30,7 @@
 #include "access/skey.h"
 #include "catalog/pg_amop.h"
 #include "catalog/pg_type.h"
+#include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/datum.h"
 #include "utils/lsyscache.h"
@@ -239,43 +240,20 @@ brin_inclusion_add_value(PG_FUNCTION_ARGS)
 }
 
 /*
- * BRIN inclusion consistent function
- *
- * We're no longer dealing with NULL keys in the consistent function, that is
- * now handled by the AM code. That means we should not get any all-NULL ranges
- * either, because those can't be consistent with regular (not [IS] NULL) keys.
+ * Check consistency of a single scalar value with the BRIN range.
  *
- * All of the strategies are optional.
+ * Called for both scalar scankeys and for each value in SK_SEARCHARRAY.
  */
-Datum
-brin_inclusion_consistent(PG_FUNCTION_ARGS)
+static bool
+brin_inclusion_consistent_value(BrinDesc *bdesc, BrinValues *column,
+								AttrNumber attno,
+								StrategyNumber strategy, Oid subtype,
+								Oid colloid, Datum unionval, Datum query)
 {
-	BrinDesc   *bdesc = (BrinDesc *) PG_GETARG_POINTER(0);
-	BrinValues *column = (BrinValues *) PG_GETARG_POINTER(1);
-	ScanKey		key = (ScanKey) PG_GETARG_POINTER(2);
-	Oid			colloid = PG_GET_COLLATION(),
-				subtype;
-	Datum		unionval;
-	AttrNumber	attno;
-	Datum		query;
 	FmgrInfo   *finfo;
 	Datum		result;
 
-	/* This opclass uses the old signature with only three arguments. */
-	Assert(PG_NARGS() == 3);
-
-	/* Should not be dealing with all-NULL ranges. */
-	Assert(!column->bv_allnulls);
-
-	/* It has to be checked, if it contains elements that are not mergeable. */
-	if (DatumGetBool(column->bv_values[INCLUSION_UNMERGEABLE]))
-		PG_RETURN_BOOL(true);
-
-	attno = key->sk_attno;
-	subtype = key->sk_subtype;
-	query = key->sk_argument;
-	unionval = column->bv_values[INCLUSION_UNION];
-	switch (key->sk_strategy)
+	switch (strategy)
 	{
 			/*
 			 * Placement strategies
@@ -294,49 +272,49 @@ brin_inclusion_consistent(PG_FUNCTION_ARGS)
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTOverRightStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_BOOL(!DatumGetBool(result));
+			return (!DatumGetBool(result));
 
 		case RTOverLeftStrategyNumber:
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTRightStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_BOOL(!DatumGetBool(result));
+			return (!DatumGetBool(result));
 
 		case RTOverRightStrategyNumber:
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTLeftStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_BOOL(!DatumGetBool(result));
+			return (!DatumGetBool(result));
 
 		case RTRightStrategyNumber:
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTOverLeftStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_BOOL(!DatumGetBool(result));
+			return (!DatumGetBool(result));
 
 		case RTBelowStrategyNumber:
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTOverAboveStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_BOOL(!DatumGetBool(result));
+			return (!DatumGetBool(result));
 
 		case RTOverBelowStrategyNumber:
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTAboveStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_BOOL(!DatumGetBool(result));
+			return (!DatumGetBool(result));
 
 		case RTOverAboveStrategyNumber:
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTBelowStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_BOOL(!DatumGetBool(result));
+			return (!DatumGetBool(result));
 
 		case RTAboveStrategyNumber:
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTOverBelowStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_BOOL(!DatumGetBool(result));
+			return (!DatumGetBool(result));
 
 			/*
 			 * Overlap and contains strategies
@@ -352,9 +330,9 @@ brin_inclusion_consistent(PG_FUNCTION_ARGS)
 		case RTSubStrategyNumber:
 		case RTSubEqualStrategyNumber:
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
-													key->sk_strategy);
+													strategy);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_DATUM(result);
+			return (DatumGetBool(result));
 
 			/*
 			 * Contained by strategies
@@ -374,9 +352,9 @@ brin_inclusion_consistent(PG_FUNCTION_ARGS)
 													RTOverlapStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
 			if (DatumGetBool(result))
-				PG_RETURN_BOOL(true);
+				return (true);
 
-			PG_RETURN_DATUM(column->bv_values[INCLUSION_CONTAINS_EMPTY]);
+			return (column->bv_values[INCLUSION_CONTAINS_EMPTY]);
 
 			/*
 			 * Adjacent strategy
@@ -393,12 +371,12 @@ brin_inclusion_consistent(PG_FUNCTION_ARGS)
 													RTOverlapStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
 			if (DatumGetBool(result))
-				PG_RETURN_BOOL(true);
+				return (true);
 
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTAdjacentStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_DATUM(result);
+			return (DatumGetBool(result));
 
 			/*
 			 * Basic comparison strategies
@@ -428,9 +406,9 @@ brin_inclusion_consistent(PG_FUNCTION_ARGS)
 													RTRightStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
 			if (!DatumGetBool(result))
-				PG_RETURN_BOOL(true);
+				return (true);
 
-			PG_RETURN_DATUM(column->bv_values[INCLUSION_CONTAINS_EMPTY]);
+			return (column->bv_values[INCLUSION_CONTAINS_EMPTY]);
 
 		case RTSameStrategyNumber:
 		case RTEqualStrategyNumber:
@@ -438,30 +416,121 @@ brin_inclusion_consistent(PG_FUNCTION_ARGS)
 													RTContainsStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
 			if (DatumGetBool(result))
-				PG_RETURN_BOOL(true);
+				return (true);
 
-			PG_RETURN_DATUM(column->bv_values[INCLUSION_CONTAINS_EMPTY]);
+			return (column->bv_values[INCLUSION_CONTAINS_EMPTY]);
 
 		case RTGreaterEqualStrategyNumber:
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTLeftStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
 			if (!DatumGetBool(result))
-				PG_RETURN_BOOL(true);
+				return (true);
 
-			PG_RETURN_DATUM(column->bv_values[INCLUSION_CONTAINS_EMPTY]);
+			return (column->bv_values[INCLUSION_CONTAINS_EMPTY]);
 
 		case RTGreaterStrategyNumber:
 			/* no need to check for empty elements */
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTLeftStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_BOOL(!DatumGetBool(result));
+			return (!DatumGetBool(result));
 
 		default:
 			/* shouldn't happen */
-			elog(ERROR, "invalid strategy number %d", key->sk_strategy);
-			PG_RETURN_BOOL(false);
+			elog(ERROR, "invalid strategy number %d", strategy);
+			return (false);
+	}
+}
+
+/*
+ * BRIN inclusion consistent function
+ *
+ * We're no longer dealing with NULL keys in the consistent function, that is
+ * now handled by the AM code. That means we should not get any all-NULL ranges
+ * either, because those can't be consistent with regular (not [IS] NULL) keys.
+ *
+ * All of the strategies are optional.
+ */
+Datum
+brin_inclusion_consistent(PG_FUNCTION_ARGS)
+{
+	BrinDesc   *bdesc = (BrinDesc *) PG_GETARG_POINTER(0);
+	BrinValues *column = (BrinValues *) PG_GETARG_POINTER(1);
+	ScanKey		key = (ScanKey) PG_GETARG_POINTER(2);
+	Oid			colloid = PG_GET_COLLATION(),
+				subtype;
+	Datum		unionval;
+	AttrNumber	attno;
+	Datum		query;
+
+	/* This opclass uses the old signature with only three arguments. */
+	Assert(PG_NARGS() == 3);
+
+	/* Should not be dealing with all-NULL ranges. */
+	Assert(!column->bv_allnulls);
+
+	/* It has to be checked, if it contains elements that are not mergeable. */
+	if (DatumGetBool(column->bv_values[INCLUSION_UNMERGEABLE]))
+		PG_RETURN_BOOL(true);
+
+	attno = key->sk_attno;
+	subtype = key->sk_subtype;
+	query = key->sk_argument;
+	unionval = column->bv_values[INCLUSION_UNION];
+
+	/*
+	 * For regular (scalar) scan keys, we simply compare the value to the
+	 * range min/max values, and we're done. For SK_SEARCHARRAY keys we
+	 * need to deparse the array and loop through the values.
+	 */
+	if (likely(!(key->sk_flags & SK_SEARCHARRAY)))
+	{
+		bool tmp;
+
+		tmp = brin_inclusion_consistent_value(bdesc, column, attno,
+											  key->sk_strategy,
+											  subtype, colloid,
+											  unionval, query);
+		PG_RETURN_BOOL(tmp);
+	}
+	else
+	{
+		ArrayType  *arrayval;
+		int16		elmlen;
+		bool		elmbyval;
+		char		elmalign;
+		int			num_elems;
+		Datum	   *elem_values;
+		bool	   *elem_nulls;
+		bool		matches = false;
+
+		arrayval = DatumGetArrayTypeP(key->sk_argument);
+
+		get_typlenbyvalalign(ARR_ELEMTYPE(arrayval),
+							 &elmlen, &elmbyval, &elmalign);
+
+		deconstruct_array(arrayval,
+						  ARR_ELEMTYPE(arrayval),
+						  elmlen, elmbyval, elmalign,
+						  &elem_values, &elem_nulls, &num_elems);
+
+		/* have to loop through all elements, having them sorted does not help */
+		for (int i = 0; i < num_elems; i++)
+		{
+			Datum 	query_element = elem_values[i];
+
+			matches = brin_inclusion_consistent_value(bdesc, column, attno,
+													  key->sk_strategy,
+													  subtype, colloid,
+													  unionval, query_element);
+
+			if (matches)
+				break;
+		}
+
+		/* we could get here for empty array, e.g. with "@> '{}'::point[]" */
+		PG_RETURN_BOOL(matches);
 	}
 }
 
-- 
2.39.1

