From 48593363cdca9358efb3784cb8e3b33952764842 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@2ndquadrant.com>
Date: Tue, 14 Feb 2023 20:29:33 +0100
Subject: [PATCH 4/9] Introduce BRIN_PROCNUM_PREPROCESS procedure

Allow BRIN opclasses to define an optional procedure to preprocess scan
keys, and call it from brinrescan(). This allows the opclass to modify
the keys in various ways - sort arrays, calculate hashes, ...

Note: The procedure is optional, so existing opclasses don't need to add
it. But if an opclass adds it while still using the existing
BRIN_PROCNUM_CONSISTENT function, that function will break. If we want
to make this backwards-compatible, we might check from
BRIN_PROCNUM_CONSISTENT whether BRIN_PROCNUM_PREPROCESS exists, and
adjust behavior based on that.
---
 src/backend/access/brin/brin.c     | 72 ++++++++++++++++++++++++++----
 src/include/access/brin_internal.h |  1 +
 2 files changed, 65 insertions(+), 8 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 85ae795949..ef3d64daf6 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -66,6 +66,12 @@ typedef struct BrinOpaque
 	BlockNumber bo_pagesPerRange;
 	BrinRevmap *bo_rmAccess;
 	BrinDesc   *bo_bdesc;
+
+	/* preprocessed scan keys */
+	int			bo_numScanKeys;		/* number of (preprocessed) scan keys */
+	ScanKey	   *bo_scanKeys;		/* modified copy of scan->keyData */
+	MemoryContext bo_scanKeysCxt;	/* scan-lifespan context for key data */
+
 } BrinOpaque;
 
 #define BRIN_ALL_BLOCKRANGES	InvalidBlockNumber
@@ -334,6 +340,11 @@ brinbeginscan(Relation r, int nkeys, int norderbys)
 	opaque->bo_rmAccess = brinRevmapInitialize(r, &opaque->bo_pagesPerRange,
 											   scan->xs_snapshot);
 	opaque->bo_bdesc = brin_build_desc(r);
+
+	opaque->bo_numScanKeys = 0;
+	opaque->bo_scanKeys = NULL;
+	opaque->bo_scanKeysCxt = NULL;
+
 	scan->opaque = opaque;
 
 	return scan;
@@ -456,7 +467,7 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 	/* Preprocess the scan keys - split them into per-attribute arrays. */
 	for (int keyno = 0; keyno < scan->numberOfKeys; keyno++)
 	{
-		ScanKey		key = &scan->keyData[keyno];
+		ScanKey		key = opaque->bo_scanKeys[keyno];
 		AttrNumber	keyattno = key->sk_attno;
 
 		/*
@@ -735,17 +746,62 @@ void
 brinrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 		   ScanKey orderbys, int norderbys)
 {
-	/*
-	 * Other index AMs preprocess the scan keys at this point, or sometime
-	 * early during the scan; this lets them optimize by removing redundant
-	 * keys, or doing early returns when they are impossible to satisfy; see
-	 * _bt_preprocess_keys for an example.  Something like that could be added
-	 * here someday, too.
-	 */
+	BrinOpaque *bo = (BrinOpaque *) scan->opaque;
+	Relation	idxRel = scan->indexRelation;
+	MemoryContext	oldcxt;
 
 	if (scankey && scan->numberOfKeys > 0)
 		memmove(scan->keyData, scankey,
 				scan->numberOfKeys * sizeof(ScanKeyData));
+
+	/*
+	 * Use the BRIN_PROCNUM_PREPROCESS procedure (if defined) to preprocess
+	 * the scan keys. The procedure may do anything, as long as the result
+	 * looks like a ScanKey. If there's no procedure, we keep the original
+	 * scan key.
+	 *
+	 * FIXME Probably need fixes to handle NULLs correctly.
+	 */
+	if (bo->bo_scanKeysCxt == NULL)
+		bo->bo_scanKeysCxt = AllocSetContextCreate(CurrentMemoryContext,
+												   "BRIN scan keys context",
+												   ALLOCSET_SMALL_SIZES);
+	else
+		MemoryContextReset(bo->bo_scanKeysCxt);
+
+	oldcxt = MemoryContextSwitchTo(bo->bo_scanKeysCxt);
+	bo->bo_numScanKeys = nscankeys;	/* keep the count in sync with the array */
+	bo->bo_scanKeys = palloc0(sizeof(ScanKey) * nscankeys);
+
+	for (int i = 0; i < nscankeys; i++)
+	{
+		FmgrInfo   *finfo;
+		ScanKey		key = &scan->keyData[i];
+		Oid			procid;
+		Datum		ret;
+
+		/* fetch key preprocess support procedure if specified */
+		procid = index_getprocid(idxRel, key->sk_attno,
+								 BRIN_PROCNUM_PREPROCESS);
+
+		/* not specified, just point to the original key */
+		if (!OidIsValid(procid))
+		{
+			bo->bo_scanKeys[i] = key;
+			continue;
+		}
+
+		finfo = index_getprocinfo(idxRel, key->sk_attno,
+								  BRIN_PROCNUM_PREPROCESS);
+
+		ret = FunctionCall2(finfo,
+							PointerGetDatum(bo->bo_bdesc),
+							PointerGetDatum(key));
+
+		bo->bo_scanKeys[i] = (ScanKey) DatumGetPointer(ret);
+	}
+
+	MemoryContextSwitchTo(oldcxt);
 }
 
 /*
diff --git a/src/include/access/brin_internal.h b/src/include/access/brin_internal.h
index 97ddc925b2..d6a51f2bc4 100644
--- a/src/include/access/brin_internal.h
+++ b/src/include/access/brin_internal.h
@@ -73,6 +73,7 @@ typedef struct BrinDesc
 #define BRIN_PROCNUM_UNION			4
 #define BRIN_MANDATORY_NPROCS		4
 #define BRIN_PROCNUM_OPTIONS 		5	/* optional */
+#define BRIN_PROCNUM_PREPROCESS		6	/* optional */
 /* procedure numbers up to 10 are reserved for BRIN future expansion */
 #define BRIN_FIRST_OPTIONAL_PROCNUM 11
 #define BRIN_LAST_OPTIONAL_PROCNUM	15
-- 
2.39.1

