From 34375eafec828bc96ffbbbd6e2eb432ef5424daa Mon Sep 17 00:00:00 2001
From: Arseniy Mukhin <arseniy.mukhin.dev@gmail.com>
Date: Sun, 6 Jul 2025 21:47:05 +0300
Subject: [PATCH v6 5/5] using withinRange function for heap all indexed check

---
 contrib/amcheck/amcheck--1.5--1.6.sql   |   5 +-
 contrib/amcheck/expected/check_brin.out |   4 +-
 contrib/amcheck/sql/check_brin.sql      |   4 +-
 contrib/amcheck/verify_brin.c           | 292 +++++-------------------
 4 files changed, 57 insertions(+), 248 deletions(-)

diff --git a/contrib/amcheck/amcheck--1.5--1.6.sql b/contrib/amcheck/amcheck--1.5--1.6.sql
index 6337e065bb1..d4f44495bba 100644
--- a/contrib/amcheck/amcheck--1.5--1.6.sql
+++ b/contrib/amcheck/amcheck--1.5--1.6.sql
@@ -9,12 +9,11 @@
 --
 CREATE FUNCTION brin_index_check(index regclass,
                                  regularpagescheck boolean default false,
-                                 heapallindexed boolean default false,
-                                 consistent_operator_names text[] default '{}'
+                                 heapallindexed boolean default false
 )
     RETURNS VOID
 AS 'MODULE_PATHNAME', 'brin_index_check'
 LANGUAGE C STRICT PARALLEL RESTRICTED;
 
 -- We don't want this to be available to public
-REVOKE ALL ON FUNCTION brin_index_check(regclass, boolean, boolean, text[]) FROM PUBLIC;
\ No newline at end of file
+REVOKE ALL ON FUNCTION brin_index_check(regclass, boolean, boolean) FROM PUBLIC;
\ No newline at end of file
diff --git a/contrib/amcheck/expected/check_brin.out b/contrib/amcheck/expected/check_brin.out
index 0aa90dafa20..05067858aa9 100644
--- a/contrib/amcheck/expected/check_brin.out
+++ b/contrib/amcheck/expected/check_brin.out
@@ -113,7 +113,7 @@ SELECT BOX(point(random() * 1000, random() * 1000), point(random() * 1000, rando
 FROM generate_series(1, 10000);
 -- create some empty ranges
 DELETE FROM brintest WHERE id > 2000 AND id < 4000;
-SELECT brin_index_check('brintest_idx'::REGCLASS, true, true, '{"@>"}');
+SELECT brin_index_check('brintest_idx'::REGCLASS, true, true);
  brin_index_check 
 ------------------
  
@@ -122,7 +122,7 @@ SELECT brin_index_check('brintest_idx'::REGCLASS, true, true, '{"@>"}');
 -- rebuild index
 DROP INDEX brintest_idx;
 CREATE INDEX brintest_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2);
-SELECT brin_index_check('brintest_idx'::REGCLASS, true, true, '{"@>"}');
+SELECT brin_index_check('brintest_idx'::REGCLASS, true, true);
  brin_index_check 
 ------------------
  
diff --git a/contrib/amcheck/sql/check_brin.sql b/contrib/amcheck/sql/check_brin.sql
index 0f58567f76f..7993ee0f4d9 100644
--- a/contrib/amcheck/sql/check_brin.sql
+++ b/contrib/amcheck/sql/check_brin.sql
@@ -88,12 +88,12 @@ FROM generate_series(1, 10000);
 -- create some empty ranges
 DELETE FROM brintest WHERE id > 2000 AND id < 4000;
 
-SELECT brin_index_check('brintest_idx'::REGCLASS, true, true, '{"@>"}');
+SELECT brin_index_check('brintest_idx'::REGCLASS, true, true);
 
 -- rebuild index
 DROP INDEX brintest_idx;
 CREATE INDEX brintest_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2);
-SELECT brin_index_check('brintest_idx'::REGCLASS, true, true, '{"@>"}');
+SELECT brin_index_check('brintest_idx'::REGCLASS, true, true);
 -- cleanup
 DROP TABLE brintest;
 
diff --git a/contrib/amcheck/verify_brin.c b/contrib/amcheck/verify_brin.c
index 01a69b616cc..a60499978b3 100644
--- a/contrib/amcheck/verify_brin.c
+++ b/contrib/amcheck/verify_brin.c
@@ -40,7 +40,6 @@ typedef struct BrinCheckState
 
 	bool		regularpagescheck;
 	bool		heapallindexed;
-	ArrayType  *consistent_oper_names;
 
 	/* BRIN check common fields */
 
@@ -71,14 +70,9 @@ typedef struct BrinCheckState
 
 	/* Heap all indexed check fields */
 
-	String	  **operatorNames;
 	BrinRevmap *revmap;
 	Buffer		buf;
-	FmgrInfo   *consistentFn;
-	/* Scan keys for regular values */
-	ScanKey    *nonnull_sk;
-	/* Scan keys for null values */
-	ScanKey    *isnull_sk;
+	FmgrInfo   *withinRangeFn;
 	double		range_cnt;
 	/* first block of the next range */
 	BlockNumber nextrangeBlk;
@@ -115,8 +109,6 @@ static ItemId PageGetItemIdCareful(BrinCheckState * state);
 
 static void check_heap_all_indexed(BrinCheckState * state);
 
-static void check_and_prepare_operator_names(BrinCheckState * state);
-
 static void brin_check_callback(Relation index,
 								ItemPointer tid,
 								Datum *values,
@@ -126,10 +118,6 @@ static void brin_check_callback(Relation index,
 
 static void check_heap_tuple(BrinCheckState * state, const Datum *values, const bool *nulls, ItemPointer tid);
 
-static ScanKey prepare_nonnull_scan_key(const BrinCheckState * state, AttrNumber attno);
-
-static ScanKey prepare_isnull_scan_key(AttrNumber attno);
-
 static void brin_check_ereport(BrinCheckState * state, const char *fmt);
 
 static void revmap_item_ereport(BrinCheckState * state, const char *fmt);
@@ -148,7 +136,6 @@ brin_index_check(PG_FUNCTION_ARGS)
 
 	state->regularpagescheck = PG_GETARG_BOOL(1);
 	state->heapallindexed = PG_GETARG_BOOL(2);
-	state->consistent_oper_names = PG_GETARG_ARRAYTYPE_P(3);
 
 	amcheck_lock_relation_and_check(indrelid,
 									BRIN_AM_OID,
@@ -173,15 +160,6 @@ brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonl
 	state->bdesc = brin_build_desc(idxrel);
 	state->natts = state->bdesc->bd_tupdesc->natts;
 
-	/*
-	 * We know how many attributes index has, so let's process operator names
-	 * array
-	 */
-	if (state->heapallindexed)
-	{
-		check_and_prepare_operator_names(state);
-	}
-
 	check_brin_index_structure(state);
 
 	if (state->heapallindexed)
@@ -848,8 +826,8 @@ PageGetItemIdCareful(BrinCheckState * state)
 /*
  * Check that every heap tuple are consistent with the index.
  *
- * Here we generate ScanKey for every heap tuple and test it against
- * appropriate range using consistentFn (for ScanKey generation logic look 'prepare_nonnull_scan_key')
+ * We use withinRange function for every nonnull value (in case
+ * of io_regular_nulls = false we use withinRange function for null values too).
  *
  * Also, we check that fields 'empty_range', 'all_nulls' and 'has_nulls'
  * are not too "narrow" for each range, which means:
@@ -875,12 +853,10 @@ check_heap_all_indexed(BrinCheckState * state)
 	state->revmap = brinRevmapInitialize(idxrel, &state->pagesPerRange);
 	state->dtup = brin_new_memtuple(state->bdesc);
 	state->checkable_range = false;
-	state->consistentFn = palloc0_array(FmgrInfo, state->natts);
+	state->withinRangeFn = palloc0_array(FmgrInfo, state->natts);
 	state->range_cnt = 0;
 	/* next range is the first range in the beginning */
 	state->nextrangeBlk = 0;
-	state->nonnull_sk = palloc0_array(ScanKey, state->natts);
-	state->isnull_sk = palloc0_array(ScanKey, state->natts);
 	state->rangeCtx = AllocSetContextCreate(CurrentMemoryContext,
 											"brin check range context",
 											ALLOCSET_DEFAULT_SIZES);
@@ -888,19 +864,32 @@ check_heap_all_indexed(BrinCheckState * state)
 												"brin check tuple context",
 												ALLOCSET_DEFAULT_SIZES);
 
-	/*
-	 * Prepare "non-null" and "is_null" scan keys and consistent fn for each
-	 * attribute
-	 */
+	/* Prepare withinRange function for each attribute */
 	for (AttrNumber attno = 1; attno <= state->natts; attno++)
 	{
-		FmgrInfo   *tmp;
+		if (RegProcedureIsValid(index_getprocid(state->idxrel, attno, BRIN_PROCNUM_WITHINRANGE)))
+		{
+			FmgrInfo   *fn = index_getprocinfo(idxrel, attno, BRIN_PROCNUM_WITHINRANGE);
 
-		tmp = index_getprocinfo(idxrel, attno, BRIN_PROCNUM_CONSISTENT);
-		fmgr_info_copy(&state->consistentFn[attno - 1], tmp, CurrentMemoryContext);
+			fmgr_info_copy(&state->withinRangeFn[attno - 1], fn, CurrentMemoryContext);
+		}
+		else
+		{
+			/*
+			 * If there is no withinRange function for the attribute, notice
+			 * user about it. We continue heap all indexed check even for
+			 * indexes with just one attribute as even without support
+			 * function some errors could be detected.
+			 */
+			ereport(NOTICE,
+					errmsg("Missing support function %d for attribute %d of index \"%s\". "
+						   "Skip heap all indexed check for this attribute.",
+						   BRIN_PROCNUM_WITHINRANGE,
+						   attno,
+						   RelationGetRelationName(state->idxrel)
+						   ));
+		}
 
-		state->nonnull_sk[attno - 1] = prepare_nonnull_scan_key(state, attno);
-		state->isnull_sk[attno - 1] = prepare_isnull_scan_key(attno);
 	}
 
 	indexInfo = BuildIndexInfo(idxrel);
@@ -926,152 +915,6 @@ check_heap_all_indexed(BrinCheckState * state)
 	MemoryContextDelete(state->heaptupleCtx);
 }
 
-/*
- * Check operator names array input parameter and convert it to array of strings
- * Empty input array means we use "=" operator for every attribute
- */
-static void
-check_and_prepare_operator_names(BrinCheckState * state)
-{
-	Oid			element_type = ARR_ELEMTYPE(state->consistent_oper_names);
-	int16		typlen;
-	bool		typbyval;
-	char		typalign;
-	Datum	   *values;
-	bool	   *elem_nulls;
-	int			num_elems;
-
-	state->operatorNames = palloc(sizeof(String) * state->natts);
-
-	get_typlenbyvalalign(element_type, &typlen, &typbyval, &typalign);
-	deconstruct_array(state->consistent_oper_names, element_type, typlen, typbyval, typalign,
-					  &values, &elem_nulls, &num_elems);
-
-	/* If we have some input check it and convert to String** */
-	if (num_elems != 0)
-	{
-		if (num_elems != state->natts)
-		{
-			ereport(ERROR,
-					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					 errmsg("Operator names array length %u, but index has %u attributes",
-							num_elems, state->natts)));
-		}
-
-		for (int i = 0; i < num_elems; i++)
-		{
-			if (elem_nulls[i])
-			{
-				ereport(ERROR,
-						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						 errmsg("Operator names array contains NULL")));
-			}
-			state->operatorNames[i] = makeString(TextDatumGetCString(values[i]));
-		}
-	}
-	else
-	{
-		/* If there is no input just use "=" operator for all attributes */
-		for (int i = 0; i < state->natts; i++)
-		{
-			state->operatorNames[i] = makeString("=");
-		}
-	}
-}
-
-/*
- * Prepare ScanKey for index attribute.
- *
- * ConsistentFn requires ScanKey, so we need to generate ScanKey for every
- * attribute somehow. We want ScanKey that would result in TRUE for every heap
- * tuple within the range when we use its indexed value as sk_argument.
- * To generate such a ScanKey we need to define the right operand type and the strategy number.
- * Right operand type is a type of data that index is built on, so it's 'opcintype'.
- * There is no strategy number that we can always use,
- * because every opclass defines its own set of operators it supports and strategy number
- * for the same operator can differ from opclass to opclass.
- * So to get strategy number we look up an operator that gives us desired behavior
- * and which both operand types are 'opcintype' and then retrieve the strategy number for it.
- * Most of the time we can use '='. We let user define operator name in case opclass doesn't
- * support '=' operator. Also, if such operator doesn't exist, we can't proceed with the check.
- *
- * Generated once, and will be reused for all heap tuples.
- * Argument field will be filled for every heap tuple before
- * consistent function invocation, so leave it NULL for a while.
- *
- */
-static ScanKey
-prepare_nonnull_scan_key(const BrinCheckState * state, AttrNumber attno)
-{
-	ScanKey		scanKey;
-	Oid			opOid;
-	Oid			opFamilyOid;
-	bool		defined;
-	StrategyNumber strategy;
-	RegProcedure opRegProc;
-	List	   *operNameList;
-	int			attindex = attno - 1;
-	Form_pg_attribute attr = TupleDescAttr(state->bdesc->bd_tupdesc, attindex);
-	Oid			type = state->idxrel->rd_opcintype[attindex];
-	String	   *opname = state->operatorNames[attno - 1];
-
-	opFamilyOid = state->idxrel->rd_opfamily[attindex];
-	operNameList = list_make1(opname);
-	opOid = OperatorLookup(operNameList, type, type, &defined);
-
-	if (opOid == InvalidOid)
-	{
-		ereport(ERROR,
-				(errcode(ERRCODE_UNDEFINED_FUNCTION),
-				 errmsg("There is no operator %s for type %u",
-						opname->sval, type)));
-	}
-
-	strategy = get_op_opfamily_strategy(opOid, opFamilyOid);
-
-	if (strategy == 0)
-	{
-		ereport(ERROR,
-				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-				 errmsg("operator %s is not a member of operator family \"%s\"",
-						opname->sval,
-						get_opfamily_name(opFamilyOid, false))));
-	}
-
-	opRegProc = get_opcode(opOid);
-	scanKey = palloc0(sizeof(ScanKeyData));
-	ScanKeyEntryInitialize(
-						   scanKey,
-						   0,
-						   attno,
-						   strategy,
-						   type,
-						   attr->attcollation,
-						   opRegProc,
-						   (Datum) NULL
-		);
-	pfree(operNameList);
-
-	return scanKey;
-}
-
-static ScanKey
-prepare_isnull_scan_key(AttrNumber attno)
-{
-	ScanKey		scanKey;
-
-	scanKey = palloc0(sizeof(ScanKeyData));
-	ScanKeyEntryInitialize(scanKey,
-						   SK_ISNULL | SK_SEARCHNULL,
-						   attno,
-						   InvalidStrategy,
-						   InvalidOid,
-						   InvalidOid,
-						   InvalidOid,
-						   (Datum) 0);
-	return scanKey;
-}
-
 /*
  * We walk from the first range (blkno = 0) to the last as the scan proceed.
  * For every heap tuple we check if we are done with the current range, and we need to move further
@@ -1155,10 +998,8 @@ brin_check_callback(Relation index, ItemPointer tid, Datum *values, bool *isnull
 
 /*
  * We check hasnulls flags for null values and oi_regular_nulls = true,
- * check allnulls is false for all nonnull values not matter oi_regular_nulls is set or not,
- * For all other cases we call consistentFn with appropriate scanKey:
- * - for oi_regular_nulls = false and null values we use 'isNull' scanKey,
- * - for nonnull values we use 'nonnull' scanKey
+ * check allnulls is false for all nonnull values not matter oi_regular_nulls is true or not,
+ * We use withinRangeFn for all nonnull values and null values if io_regular_nulls = false.
  */
 static void
 check_heap_tuple(BrinCheckState * state, const Datum *values, const bool *nulls, ItemPointer tid)
@@ -1177,78 +1018,47 @@ check_heap_tuple(BrinCheckState * state, const Datum *values, const bool *nulls,
 	for (attindex = 0; attindex < state->natts; attindex++)
 	{
 		BrinValues *bval;
-		Datum		consistentFnResult;
-		bool		consistent;
-		ScanKey		scanKey;
+		Datum		withinRangeFnResult;
+		bool		withinRange;
 		bool		oi_regular_nulls = bdesc->bd_info[attindex]->oi_regular_nulls;
 
 		bval = &dtup->bt_columns[attindex];
 
-		if (nulls[attindex])
+		/*
+		 * Use hasnulls flag for oi_regular_nulls is true. Otherwise, delegate
+		 * check to withinRangeFn
+		 */
+		if (nulls[attindex] && oi_regular_nulls)
 		{
-			/*
-			 * Use hasnulls flag for oi_regular_nulls is true. Otherwise,
-			 * delegate check to consistentFn
-			 */
-			if (oi_regular_nulls)
+			/* We have null value, so hasnulls or allnulls must be true */
+			if (!(bval->bv_hasnulls || bval->bv_allnulls))
 			{
-				/* We have null value, so hasnulls or allnulls must be true */
-				if (!(bval->bv_hasnulls || bval->bv_allnulls))
-				{
-					all_consist_ereport(state, tid, "range hasnulls and allnulls are false, but contains a null value");
-				}
-				continue;
+				all_consist_ereport(state, tid, "range hasnulls and allnulls are false, but contains a null value");
 			}
-
-			/*
-			 * In case of null and oi_regular_nulls = false we use isNull
-			 * scanKey for invocation of consistentFn
-			 */
-			scanKey = state->isnull_sk[attindex];
+			continue;
 		}
-		else
-		{
-			/* We have a nonnull value, so allnulls should be false */
-			if (bval->bv_allnulls)
-			{
-				all_consist_ereport(state, tid, "range allnulls is true, but contains nonnull value");
-			}
 
-			/* use "attr = value" scan key for nonnull values */
-			scanKey = state->nonnull_sk[attindex];
-			scanKey->sk_argument = values[attindex];
+		/* If we have a nonnull value allnulls should be false */
+		if (!nulls[attindex] && bval->bv_allnulls)
+		{
+			all_consist_ereport(state, tid, "range allnulls is true, but contains nonnull value");
 		}
 
 		/* If oi_regular_nulls = true we should never get there with null */
 		Assert(!oi_regular_nulls || !nulls[attindex]);
 
-		if (state->consistentFn[attindex].fn_nargs >= 4)
-		{
-			consistentFnResult = FunctionCall4Coll(&state->consistentFn[attindex],
-												   state->idxrel->rd_indcollation[attindex],
-												   PointerGetDatum(state->bdesc),
-												   PointerGetDatum(bval),
-												   PointerGetDatum(&scanKey),
-												   Int32GetDatum(1)
-				);
-		}
-		else
-		{
-			consistentFnResult = FunctionCall3Coll(&state->consistentFn[attindex],
-												   state->idxrel->rd_indcollation[attindex],
-												   PointerGetDatum(state->bdesc),
-												   PointerGetDatum(bval),
-												   PointerGetDatum(scanKey)
-				);
-		}
-
-		consistent = DatumGetBool(consistentFnResult);
+		withinRangeFnResult = FunctionCall4Coll(&state->withinRangeFn[attindex],
+												state->idxrel->rd_indcollation[attindex],
+												PointerGetDatum(bdesc),
+												PointerGetDatum(bval),
+												values[attindex],
+												nulls[attindex]);
 
-		if (!consistent)
+		withinRange = DatumGetBool(withinRangeFnResult);
+		if (!withinRange)
 		{
 			all_consist_ereport(state, tid, "heap tuple inconsistent with index");
 		}
-
 	}
 
 	MemoryContextSwitchTo(oldCtx);
-- 
2.43.0

