From 21b404d8a6f400bb6da1bbfca7509a41f2b9f002 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 11 Jun 2024 14:32:47 +1200
Subject: [PATCH v2] Use streaming I/O for HNSW blocks.

If data is not already in PostgreSQL's buffer cache, it will be accessed
using the new ReadStream API in PostgreSQL 17.  We know in advance which
HNSW blocks we will access next (the unvisited neighbors of the current
candidate, up to 'm' of them), so the ReadStream can read them into the
kernel's page cache asynchronously, up to the limit of the
effective_io_concurrency setting.  While that currently defaults to only
1, even 1 provides some speedup for cold caches, and higher numbers help
more.

XXX This is a proof-of-concept

Author: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://www.postgresql.org/message-id/flat/CA%2BhUKGJ_7NKd46nx1wbyXWriuZSNzsTfm%2BrhEuvU6nxZi3-KVw%40mail.gmail.com
---
 src/hnsw.h      |   1 +
 src/hnswutils.c | 113 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 114 insertions(+)

diff --git a/src/hnsw.h b/src/hnsw.h
index 480ad9f..e738241 100644
--- a/src/hnsw.h
+++ b/src/hnsw.h
@@ -136,6 +136,7 @@ struct HnswElementData
 	uint8		deleted;
 	uint32		hash;
 	HnswNeighborsPtr neighbors;
+	Buffer		buffer;			/* PG17+: pin from the read stream, or InvalidBuffer */
 	BlockNumber blkno;
 	OffsetNumber offno;
 	OffsetNumber neighborOffno;
diff --git a/src/hnswutils.c b/src/hnswutils.c
index 96c5026..46e76be 100644
--- a/src/hnswutils.c
+++ b/src/hnswutils.c
@@ -14,6 +14,10 @@
 #include "utils/memdebug.h"
 #include "utils/rel.h"
 
+#if PG_VERSION_NUM >= 170000
+#include "storage/read_stream.h"
+#endif
+
 #if PG_VERSION_NUM >= 130000
 #include "common/hashfn.h"
 #else
@@ -278,6 +282,9 @@ HnswInitElementFromBlock(BlockNumber blkno, OffsetNumber offno)
 	HnswElement element = palloc(sizeof(HnswElementData));
 	char	   *base = NULL;
 
+#if PG_VERSION_NUM >= 170000
+	element->buffer = InvalidBuffer;
+#endif
 	element->blkno = blkno;
 	element->offno = offno;
 	HnswPtrStore(base, element->neighbors, (HnswNeighborArrayPtr *) NULL);
@@ -555,7 +562,20 @@ HnswLoadElement(HnswElement element, float *distance, Datum *q, Relation index,
 	HnswElementTuple etup;
 
 	/* Read vector */
+#if PG_VERSION_NUM >= 170000
+	/* Use the buffer already pinned by the read stream, if any. */
+	buf = element->buffer;
+	if (buf != InvalidBuffer)
+	{
+		/* Consume the pin so a later load can't reuse a since-released buffer. */
+		element->buffer = InvalidBuffer;
+		Assert(BufferGetBlockNumber(buf) == element->blkno);
+	}
+	else
+		buf = ReadBuffer(index, element->blkno);
+#else
 	buf = ReadBuffer(index, element->blkno);
+#endif
 	LockBuffer(buf, BUFFER_LOCK_SHARE);
 	page = BufferGetPage(buf);
 
@@ -717,6 +737,34 @@ CountElement(char *base, HnswElement skipElement, HnswCandidate * hc)
 	return e->heaptidsLength != 0;
 }
 
+#if PG_VERSION_NUM >= 170000
+typedef struct HnswSearchLayerNextBlockData
+{
+	char	   *base;			/* base pointer for HnswPtrAccess */
+	HnswCandidate **items;		/* unvisited neighbors, in visit order */
+	int			nitems;			/* number of valid entries in items */
+	int			i;				/* index of the next entry to report */
+} HnswSearchLayerNextBlockData;
+
+/*
+ * ReadStream callback: report the block holding the next queued neighbor,
+ * or InvalidBlockNumber once all nitems entries have been handed out.
+ */
+static BlockNumber
+HnswSearchLayerNextBlock(ReadStream *stream,
+						 void *callback_data,
+						 void *per_buffer_data)
+{
+	HnswSearchLayerNextBlockData *state = callback_data;
+	HnswCandidate *next;
+
+	if (state->i == state->nitems)
+		return InvalidBlockNumber;
+	next = state->items[state->i++];
+	return ((HnswElement) HnswPtrAccess(state->base, next->element))->blkno;
+}
+#endif
+
 /*
  * Algorithm 2 from paper
  */
@@ -732,6 +780,27 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
 	HnswNeighborArray *neighborhoodData = NULL;
 	Size		neighborhoodSize = 0;
 
+#if PG_VERSION_NUM >= 170000
+	HnswSearchLayerNextBlockData stream_callback_data = {0};
+	ReadStream *stream;
+
+	/*
+	 * If we're searching an index, create a stream so that we can generate
+	 * some I/O asynchronicity when the index is cold, if
+	 * effective_io_concurrency is configured.
+	 */
+	if (index)
+		stream = read_stream_begin_relation(READ_STREAM_FULL,
+											NULL,
+											index,
+											MAIN_FORKNUM,
+											HnswSearchLayerNextBlock,
+											&stream_callback_data,
+											0);
+	else
+		stream = NULL;
+#endif
+
 	InitVisited(base, &v, index, ef, m);
 
 	/* Create local memory for neighborhood if needed */
@@ -767,6 +836,8 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
 		HnswCandidate *c = ((HnswPairingHeapNode *) pairingheap_remove_first(C))->inner;
 		HnswCandidate *f = ((HnswPairingHeapNode *) pairingheap_first(W))->inner;
 		HnswElement cElement;
+		HnswCandidate *items[HNSW_MAX_SIZE];	/* unvisited entries of neighborhood; NOTE(review): confirm HNSW_MAX_SIZE bounds neighborhood->length */
+		int nitems;						/* number of entries used in items */
 
 		if (c->distance > f->distance)
 			break;
@@ -788,6 +859,8 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
 			neighborhood = neighborhoodData;
 		}
 
+		/* Build a list of indexes of neighbors to visit. */
+		nitems = 0;
 		for (int i = 0; i < neighborhood->length; i++)
 		{
 			HnswCandidate *e = &neighborhood->items[i];
@@ -796,6 +869,35 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
 			AddToVisited(base, &v, e, index, &visited);
 
 			if (!visited)
+				items[nitems++] = e;
+		}
+
+#if PG_VERSION_NUM >= 170000
+		if (stream)
+		{
+			/*
+			 * Point the callback at this batch of unvisited neighbors; it
+			 * reports their blocks in the same order they are visited below.
+			 */
+			stream_callback_data.base = base;
+			stream_callback_data.items = items;
+			stream_callback_data.nitems = nitems;
+			stream_callback_data.i = 0;
+
+			/*
+			 * Reset the stream.  This is necessary because each time the
+			 * callback runs out of data, the stream needs to be restarted
+			 * before it tries to look ahead again.
+			 */
+			read_stream_reset(stream);
+		}
+#endif
+
+		/* Visit them. */
+		for (int i = 0; i < nitems; i++)
+		{
+			HnswCandidate *e = items[i];
+
 			{
 				float		eDistance;
 				HnswElement eElement = HnswPtrAccess(base, e->element);
@@ -806,7 +908,13 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
 				if (index == NULL)
 					eDistance = GetCandidateDistance(base, e, q, procinfo, collation);
 				else
+				{
+#if PG_VERSION_NUM >= 170000
+					if (stream)
+						eElement->buffer = read_stream_next_buffer(stream, NULL);
+#endif
 					HnswLoadElement(eElement, &eDistance, &q, index, procinfo, collation, inserting, alwaysAdd ? NULL : &f->distance);
+				}
 
 				if (eDistance < f->distance || alwaysAdd)
 				{
@@ -844,6 +952,11 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
 		}
 	}
 
+#if PG_VERSION_NUM >= 170000
+	if (stream)
+		read_stream_end(stream);
+#endif
+
 	/* Add each element of W to w */
 	while (!pairingheap_is_empty(W))
 	{
-- 
2.46.0

