From 484cfc9659b7ec69a41279db26eb61e6f7ae9b35 Mon Sep 17 00:00:00 2001
From: Thomas Munro
Date: Fri, 28 Jul 2023 22:35:32 +1200
Subject: [PATCH v2 7/8] Provide basic streaming read API.

"Streaming reads" can be used as a more efficient replacement for
ReadBuffer() calls.  The client code supplies a callback that can say
which block to read next, and then consumes individual buffers one at a
time.  This division allows the PgStreamingRead object to build larger
calls to CompleteReadBuffers(), which in turn produce large preadv()
calls instead of traditional single-block pread() calls.  Unless the
read is purely sequential, posix_fadvise() calls are also issued.

This API is intended as a stepping stone, allowing for a truly
asynchronous implementation in later work.  Code that adapts to the
streaming read API would automatically benefit.

This work is based on lots of discussions with Andres Freund on how to
prepare that pathway.

Author: Thomas Munro
Discussion: https://postgr.es/m/CA+hUKGJkOiOCa+mag4BF+zHo7qo=o9CFheB8=g6uT5TUm2gkvA@mail.gmail.com
---
 src/backend/storage/Makefile             |   2 +-
 src/backend/storage/aio/Makefile         |  14 +
 src/backend/storage/aio/meson.build      |   5 +
 src/backend/storage/aio/streaming_read.c | 593 +++++++++++++++++++++++
 src/backend/storage/buffer/bufmgr.c      |   2 +-
 src/backend/storage/meson.build          |   1 +
 src/include/storage/bufmgr.h             |   2 +
 src/include/storage/streaming_read.h     |  53 ++
 src/tools/pgindent/typedefs.list         |   2 +
 9 files changed, 672 insertions(+), 2 deletions(-)
 create mode 100644 src/backend/storage/aio/Makefile
 create mode 100644 src/backend/storage/aio/meson.build
 create mode 100644 src/backend/storage/aio/streaming_read.c
 create mode 100644 src/include/storage/streaming_read.h

diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile
index 8376cdfca2..eec03f6f2b 100644
--- a/src/backend/storage/Makefile
+++ b/src/backend/storage/Makefile
@@ -8,6 +8,6 @@ subdir = src/backend/storage
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS     = buffer file freespace ipc large_object lmgr page smgr sync
+SUBDIRS     = aio buffer file freespace ipc large_object lmgr page smgr sync
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile
new file mode 100644
index 0000000000..bcab44c802
--- /dev/null
+++ b/src/backend/storage/aio/Makefile
@@ -0,0 +1,14 @@
+#
+# Makefile for storage/aio
+#
+# src/backend/storage/aio/Makefile
+#
+
+subdir = src/backend/storage/aio
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+	streaming_read.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build
new file mode 100644
index 0000000000..156e87cab7
--- /dev/null
+++ b/src/backend/storage/aio/meson.build
@@ -0,0 +1,5 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+backend_sources += files(
+  'streaming_read.c',
+)
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
new file mode 100644
index 0000000000..de538af7f0
--- /dev/null
+++ b/src/backend/storage/aio/streaming_read.c
@@ -0,0 +1,593 @@
+/*-------------------------------------------------------------------------
+ *
+ * streaming_read.c
+ *	  Read a stream of buffers whose block numbers are chosen by a callback,
+ *	  pinning them ahead of time and reading neighboring blocks in clusters.
+ *
+ * IDENTIFICATION
+ *	  src/backend/storage/aio/streaming_read.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "storage/streaming_read.h"
+#include "utils/rel.h"
+
+/*
+ * Element type for PgStreamingRead's circular array of clusters of buffers.
+ *
+ * For hits and RBM_WILL_ZERO, need_complete is false, we have just one
+ * buffer in each cluster, already pinned and ready for use.
+ *
+ * For misses that require a physical read, need_complete is true, and
+ * buffers[] holds a group of neighboring blocks, so we can complete them
+ * with a single call to CompleteReadBuffers().  We can also issue a single
+ * prefetch for it as soon as it has grown to its largest possible size, if
+ * our random access heuristics determine that is a good idea.
+ */
+typedef struct PgStreamingReadCluster
+{
+	bool		advice_issued;
+	bool		need_complete;
+
+	BufferManagerRelation bmr;
+	ForkNumber	forknum;
+	BlockNumber blocknum;
+	int			nblocks;
+
+	int			per_io_data_index[MAX_BUFFERS_PER_TRANSFER];
+	bool		need_advice[MAX_BUFFERS_PER_TRANSFER];
+	Buffer		buffers[MAX_BUFFERS_PER_TRANSFER];
+} PgStreamingReadCluster;
+
+struct PgStreamingRead
+{
+	int			max_ios;
+	int			ios_in_progress;
+	int			ios_in_progress_trigger;
+	int			max_pinned_buffers;
+	int			pinned_buffers;
+	int			pinned_buffers_trigger;
+	int			next_tail_buffer;
+	int			ramp_up_pin_limit;
+	int			ramp_up_pin_stall;
+	bool		finished;
+	uintptr_t	pgsr_private;
+	PgStreamingReadBufferCB simple_cb;
+	PgStreamingReadBufferExtendedCB extended_cb;
+	BufferAccessStrategy strategy;
+
+	/* If using a simple callback, relation and fork are fixed. */
+	BufferManagerRelation simple_bmr;
+	ForkNumber	simple_forknum;
+
+	/* Next expected prefetch, for sequential prefetch avoidance. */
+	BufferManagerRelation seq_bmr;
+	ForkNumber	seq_forknum;
+	BlockNumber seq_blocknum;
+
+	/* Space for optional per-I/O private data. */
+	size_t		per_io_data_size;
+	void	   *per_io_data;
+	int			per_io_data_next;
+
+	/* Circular buffer of clusters. */
+	int			size;
+	int			head;
+	int			tail;
+	PgStreamingReadCluster clusters[FLEXIBLE_ARRAY_MEMBER];
+};
+
+/*
+ * Common setup for both kinds of streaming read: derive the pin and I/O
+ * limits from max_ios, allocate the object with room for its circular buffer
+ * of clusters, and allocate space for per-I/O data if requested.
+ */
+static PgStreamingRead *
+pg_streaming_read_buffer_alloc_internal(int max_ios,
+										size_t per_io_data_size,
+										uintptr_t pgsr_private,
+										BufferAccessStrategy strategy)
+{
+	PgStreamingRead *pgsr;
+	int			size;
+	int			max_pinned_buffers;
+
+	Assert(max_ios > 0);
+
+	/*
+	 * We allow twice as many buffers to be pinned as I/Os.  This allows us to
+	 * look further ahead for blocks that need to be read in.
+	 */
+	max_pinned_buffers = max_ios * 2;
+
+	/* Don't allow this backend to pin too many buffers. */
+	LimitAdditionalPins((uint32 *) &max_pinned_buffers);
+	max_pinned_buffers = Max(2, max_pinned_buffers);
+	max_ios = max_pinned_buffers / 2;
+	Assert(max_ios > 0);
+	Assert(max_pinned_buffers > 0);
+	Assert(max_pinned_buffers > max_ios);
+
+	/*
+	 * pgsr->clusters is a circular buffer.  When it is empty, head == tail.
+	 * When it is full, there is an empty element between head and tail.  Head
+	 * can also be empty (nblocks == 0).  So we need two extra elements.
+	 */
+	size = max_pinned_buffers + 2;
+
+	pgsr = (PgStreamingRead *)
+		palloc0(offsetof(PgStreamingRead, clusters) +
+				sizeof(pgsr->clusters[0]) * size);
+
+	pgsr->per_io_data_size = per_io_data_size;
+	pgsr->max_ios = max_ios;
+	pgsr->max_pinned_buffers = max_pinned_buffers;
+	pgsr->pgsr_private = pgsr_private;
+	pgsr->strategy = strategy;
+	pgsr->size = size;
+
+	/*
+	 * We don't try to reach max_pinned_buffers immediately, we start off with
+	 * a limit of 1 and increase that quickly as a way of ramping up.  This is
+	 * intended to avoid generating I/O for callers that give up after a small
+	 * number of pages.
+	 *
+	 * XXX Should this be a policy choice for the caller to specify?
+	 */
+	pgsr->ramp_up_pin_limit = 1;
+
+	/*
+	 * We look ahead when the number of pinned buffers falls below this
+	 * number.  This encourages the formation of large vectored reads.
+	 */
+	pgsr->pinned_buffers_trigger =
+		Max(max_ios, max_pinned_buffers - MAX_BUFFERS_PER_TRANSFER);
+
+	/* Space for the callback to store extra data along with each block. */
+	if (per_io_data_size)
+		pgsr->per_io_data = palloc(per_io_data_size * max_pinned_buffers);
+
+	return pgsr;
+}
+
+/*
+ * Create a streaming read object for a single relation fork.  next_block_cb
+ * returns the next block number to read (in RBM_NORMAL mode), or
+ * InvalidBlockNumber when there are no more blocks.
+ */
+PgStreamingRead *
+pg_streaming_read_buffer_alloc(int max_ios,
+							   size_t per_io_data_size,
+							   uintptr_t pgsr_private,
+							   BufferAccessStrategy strategy,
+							   BufferManagerRelation bmr,
+							   ForkNumber forknum,
+							   PgStreamingReadBufferCB next_block_cb)
+{
+	PgStreamingRead *result;
+
+	result = pg_streaming_read_buffer_alloc_internal(max_ios,
+													 per_io_data_size,
+													 pgsr_private,
+													 strategy);
+	result->simple_cb = next_block_cb;
+	result->simple_bmr = bmr;
+	result->simple_forknum = forknum;
+
+	return result;
+}
+
+/*
+ * Like pg_streaming_read_buffer_alloc(), but next_block_cb chooses the
+ * relation, fork, block number and mode (RBM_NORMAL or RBM_WILL_ZERO) of
+ * each block, and returns false when there are no more blocks.
+ */
+PgStreamingRead *
+pg_streaming_read_buffer_alloc_ext(int max_ios,
+								   size_t per_io_data_size,
+								   uintptr_t pgsr_private,
+								   BufferAccessStrategy strategy,
+								   PgStreamingReadBufferExtendedCB next_block_cb)
+{
+	PgStreamingRead *result;
+
+	result = pg_streaming_read_buffer_alloc_internal(max_ios,
+													 per_io_data_size,
+													 pgsr_private,
+													 strategy);
+	result->extended_cb = next_block_cb;
+
+	return result;
+}
+
+/*
+ * Issue WILLNEED advice for the head cluster, and allocate a new head
+ * cluster.
+ *
+ * We don't have true asynchronous I/O to actually submit, but this is
+ * equivalent because it might start I/O on systems that understand WILLNEED
+ * advice.  We count it as an I/O in progress.
+ */
+static PgStreamingReadCluster *
+pg_streaming_read_submit(PgStreamingRead *pgsr)
+{
+	PgStreamingReadCluster *head_cluster;
+
+	head_cluster = &pgsr->clusters[pgsr->head];
+	Assert(head_cluster->nblocks > 0);
+
+#ifdef USE_PREFETCH
+
+	/*
+	 * Don't bother with advice if there will be no call to
+	 * CompleteReadBuffers() or direct I/O is enabled.
+	 */
+	if (head_cluster->need_complete &&
+		(io_direct_flags & IO_DIRECT_DATA) == 0)
+	{
+		/*
+		 * Purely sequential advice is known to hurt performance on some
+		 * systems, so only issue it if this looks random.
+		 */
+		if (head_cluster->bmr.smgr != pgsr->seq_bmr.smgr ||
+			head_cluster->bmr.rel != pgsr->seq_bmr.rel ||
+			head_cluster->forknum != pgsr->seq_forknum ||
+			head_cluster->blocknum != pgsr->seq_blocknum)
+		{
+			SMgrRelation smgr =
+			head_cluster->bmr.smgr ? head_cluster->bmr.smgr
+			: RelationGetSmgr(head_cluster->bmr.rel);
+
+			Assert(!head_cluster->advice_issued);
+
+			for (int i = 0; i < head_cluster->nblocks; i++)
+			{
+				if (head_cluster->need_advice[i])
+				{
+					BlockNumber first_blocknum = head_cluster->blocknum + i;
+					int			nblocks = 1;
+
+					/*
+					 * How many adjacent blocks can we merge with to reduce
+					 * system calls?  Usually this is all of them, unless
+					 * there are overlapping reads and our timing is unlucky.
+					 */
+					while ((i + 1) < head_cluster->nblocks &&
+						   head_cluster->need_advice[i + 1])
+					{
+						nblocks++;
+						i++;
+					}
+
+					smgrprefetch(smgr,
+								 head_cluster->forknum,
+								 first_blocknum,
+								 nblocks);
+				}
+
+			}
+
+			/*
+			 * Count this as an I/O that is concurrently in progress.  We
+			 * might have called smgrprefetch() more than once, if some of the
+			 * buffers in the range were already in buffer pool but not valid
+			 * yet, because of a concurrent read, but for now we choose to
+			 * track this as one I/O.
+			 */
+			head_cluster->advice_issued = true;
+			pgsr->ios_in_progress++;
+		}
+
+		/* Remember the point after this, for the above heuristics. */
+		pgsr->seq_bmr = head_cluster->bmr;
+		pgsr->seq_forknum = head_cluster->forknum;
+		pgsr->seq_blocknum = head_cluster->blocknum + head_cluster->nblocks;
+	}
+#endif
+
+	/* Create a new head cluster.  There must be space. */
+	Assert(pgsr->size > pgsr->max_pinned_buffers);
+	Assert((pgsr->head + 1) % pgsr->size != pgsr->tail);
+	if (++pgsr->head == pgsr->size)
+		pgsr->head = 0;
+	head_cluster = &pgsr->clusters[pgsr->head];
+	head_cluster->nblocks = 0;
+
+	return head_cluster;
+}
+
+/*
+ * Look ahead: call the callback to pin more buffers, grouping them into
+ * clusters of neighboring blocks and submitting each cluster.  Does nothing
+ * while a ramp-up stall is in effect, after the stream has finished, while
+ * I/O is saturated, or while enough buffers are already pinned.
+ */
+void
+pg_streaming_read_prefetch(PgStreamingRead *pgsr)
+{
+	/*
+	 * If we're still ramping up, we may have to stall to wait for buffers to
+	 * be consumed first before we do any more prefetching.
+	 */
+	if (pgsr->ramp_up_pin_stall > 0)
+	{
+		Assert(pgsr->pinned_buffers > 0);
+		return;
+	}
+
+	/*
+	 * If we're finished or can't start one more I/O, then no prefetching.
+	 * Test with >=, because submitting the final head cluster at the end of
+	 * this function can take ios_in_progress past max_ios.
+	 */
+	if (pgsr->finished || pgsr->ios_in_progress >= pgsr->max_ios)
+		return;
+
+	/*
+	 * We'll also wait until the number of pinned buffers falls below our
+	 * trigger level, so that we have the chance to create a full cluster.
+	 */
+	if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger)
+		return;
+
+	do
+	{
+		BufferManagerRelation bmr;
+		ForkNumber	forknum;
+		BlockNumber blocknum;
+		ReadBufferMode mode;
+		Buffer		buffer;
+		bool		found;
+		bool		allocated;
+		bool		need_complete;
+		PgStreamingReadCluster *head_cluster;
+		void	   *per_io_data;
+
+		/* Do we have a full-sized cluster? */
+		head_cluster = &pgsr->clusters[pgsr->head];
+		if (head_cluster->nblocks == lengthof(head_cluster->buffers))
+		{
+			Assert(head_cluster->need_complete);
+			head_cluster = pg_streaming_read_submit(pgsr);
+
+			/*
+			 * Give up now if I/O is saturated, or we wouldn't be able to form
+			 * another full cluster after this due to the pin limit.  Once
+			 * ramp_up_pin_limit is not limiting us, we want to create full
+			 * sized clusters as much as possible.
+			 */
+			if (pgsr->ios_in_progress >= pgsr->max_ios ||
+				pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger)
+				break;
+		}
+
+		per_io_data = (char *) pgsr->per_io_data +
+			pgsr->per_io_data_size * pgsr->per_io_data_next;
+
+		/* Find out which block the callback wants to read next. */
+		if (pgsr->simple_cb)
+		{
+			/* Simple callbacks use fixed relation, fork and normal mode. */
+			blocknum = pgsr->simple_cb(pgsr, pgsr->pgsr_private, per_io_data);
+			if (blocknum == InvalidBlockNumber)
+			{
+				pgsr->finished = true;
+				break;
+			}
+			bmr = pgsr->simple_bmr;
+			forknum = pgsr->simple_forknum;
+			mode = RBM_NORMAL;
+		}
+		else
+		{
+			/* Extended callbacks choose the relation, fork, mode. */
+			if (!pgsr->extended_cb(pgsr, pgsr->pgsr_private, per_io_data,
+								   &bmr, &forknum, &blocknum, &mode))
+			{
+				pgsr->finished = true;
+				break;
+			}
+		}
+
+		Assert(mode == RBM_NORMAL || mode == RBM_WILL_ZERO);
+		Assert(pgsr->pinned_buffers < pgsr->max_pinned_buffers);
+
+		buffer = PrepareReadBuffer(bmr,
+								   forknum,
+								   blocknum,
+								   pgsr->strategy,
+								   &found,
+								   &allocated);
+		pgsr->pinned_buffers++;
+
+		need_complete = !found && mode != RBM_WILL_ZERO;
+
+		/* Is there a head cluster that we can't extend? */
+		head_cluster = &pgsr->clusters[pgsr->head];
+		if (head_cluster->nblocks > 0 &&
+			(!need_complete ||
+			 !head_cluster->need_complete ||
+			 head_cluster->bmr.smgr != bmr.smgr ||
+			 head_cluster->bmr.rel != bmr.rel ||
+			 head_cluster->forknum != forknum ||
+			 head_cluster->blocknum + head_cluster->nblocks != blocknum))
+		{
+			/* Submit it so we can start a new one. */
+			head_cluster = pg_streaming_read_submit(pgsr);
+			Assert(head_cluster->nblocks == 0);
+		}
+
+		if (head_cluster->nblocks == 0)
+		{
+			/* Initialize the cluster. */
+			head_cluster->bmr = bmr;
+			head_cluster->forknum = forknum;
+			head_cluster->blocknum = blocknum;
+			head_cluster->advice_issued = false;
+			head_cluster->need_complete = need_complete;
+		}
+		else
+		{
+			/* We'll extend an existing cluster by one buffer. */
+			Assert(head_cluster->bmr.smgr == bmr.smgr);
+			Assert(head_cluster->bmr.rel == bmr.rel);
+			Assert(head_cluster->forknum == forknum);
+			Assert(head_cluster->blocknum + head_cluster->nblocks == blocknum);
+			Assert(head_cluster->need_complete);
+		}
+
+		head_cluster->per_io_data_index[head_cluster->nblocks] = pgsr->per_io_data_next++;
+		head_cluster->need_advice[head_cluster->nblocks] = allocated;
+		head_cluster->buffers[head_cluster->nblocks] = buffer;
+		head_cluster->nblocks++;
+
+		if (pgsr->per_io_data_next == pgsr->max_pinned_buffers)
+			pgsr->per_io_data_next = 0;
+
+	} while (pgsr->ios_in_progress < pgsr->max_ios &&
+			 pgsr->pinned_buffers < pgsr->max_pinned_buffers &&
+			 pgsr->pinned_buffers < pgsr->ramp_up_pin_limit);
+
+	/* If we've hit the ramp-up limit, insert a stall. */
+	if (pgsr->pinned_buffers >= pgsr->ramp_up_pin_limit)
+	{
+		/* Can't get here if an earlier stall hasn't finished. */
+		Assert(pgsr->ramp_up_pin_stall == 0);
+		/* Don't do any more prefetching until these buffers are consumed. */
+		pgsr->ramp_up_pin_stall = pgsr->ramp_up_pin_limit;
+		/* Double it.  It will soon be out of the way. */
+		pgsr->ramp_up_pin_limit *= 2;
+	}
+
+	/* Submit the head cluster, if there is one. */
+	if (pgsr->clusters[pgsr->head].nblocks > 0)
+		pg_streaming_read_submit(pgsr);
+}
+
+/*
+ * Clear the finished flag, so that later calls to
+ * pg_streaming_read_prefetch() will consult the callback again.
+ */
+void
+pg_streaming_read_reset(PgStreamingRead *pgsr)
+{
+	pgsr->finished = false;
+}
+
+/*
+ * Return the next pinned buffer of the stream, or InvalidBuffer when there
+ * are no more.  The caller takes over the pin.  If per_io_data is not NULL,
+ * *per_io_data is set to the per-I/O data slot that was passed to the
+ * callback for this block.
+ */
+Buffer
+pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_io_data)
+{
+	pg_streaming_read_prefetch(pgsr);
+
+	/* See if we have one buffer to return. */
+	while (pgsr->tail != pgsr->head)
+	{
+		PgStreamingReadCluster *tail_cluster;
+
+		tail_cluster = &pgsr->clusters[pgsr->tail];
+
+		/*
+		 * Do we need to perform an I/O before returning the buffers from this
+		 * cluster?
+		 */
+		if (tail_cluster->need_complete)
+		{
+			CompleteReadBuffers(tail_cluster->bmr,
+								tail_cluster->buffers,
+								tail_cluster->forknum,
+								tail_cluster->blocknum,
+								tail_cluster->nblocks,
+								false,
+								pgsr->strategy);
+			tail_cluster->need_complete = false;
+
+			/* We only counted this I/O as running if we issued advice. */
+			if (tail_cluster->advice_issued)
+				pgsr->ios_in_progress--;
+		}
+
+		/* Are there more buffers available in this cluster? */
+		if (pgsr->next_tail_buffer < tail_cluster->nblocks)
+		{
+			int			buffer_index;
+			Buffer		buffer;
+
+			buffer_index = pgsr->next_tail_buffer++;
+			buffer = tail_cluster->buffers[buffer_index];
+
+			Assert(BufferIsValid(buffer));
+
+			/* We are giving away ownership of this pinned buffer. */
+			Assert(pgsr->pinned_buffers > 0);
+			pgsr->pinned_buffers--;
+
+			if (pgsr->ramp_up_pin_stall > 0)
+				pgsr->ramp_up_pin_stall--;
+
+			if (per_io_data)
+				*per_io_data = (char *) pgsr->per_io_data +
+					tail_cluster->per_io_data_index[buffer_index] *
+					pgsr->per_io_data_size;
+
+			return buffer;
+		}
+
+		/* Advance tail to next cluster, if there is one. */
+		if (++pgsr->tail == pgsr->size)
+			pgsr->tail = 0;
+		pgsr->next_tail_buffer = 0;
+	}
+
+	Assert(pgsr->pinned_buffers == 0);
+	Assert(pgsr->ramp_up_pin_stall == 0);
+
+	return InvalidBuffer;
+}
+
+/*
+ * Stop looking ahead, release the pins of any buffers that were not
+ * consumed, and free the object.  Reads that are still pending are completed
+ * by pg_streaming_read_buffer_get_next() before their buffers are released.
+ */
+void
+pg_streaming_read_free(PgStreamingRead *pgsr)
+{
+	Buffer		buffer;
+
+	/* Stop reading ahead, and unpin anything that wasn't consumed. */
+	pgsr->finished = true;
+	for (;;)
+	{
+		buffer = pg_streaming_read_buffer_get_next(pgsr, NULL);
+		if (buffer == InvalidBuffer)
+			break;
+		ReleaseBuffer(buffer);
+	}
+
+	if (pgsr->per_io_data)
+		pfree(pgsr->per_io_data);
+	pfree(pgsr);
+}
+
+/* Return the number of I/Os currently counted as in progress. */
+int
+pg_streaming_read_ios(PgStreamingRead *pgsr)
+{
+	return pgsr->ios_in_progress;
+}
+
+/* Return the number of buffers currently pinned by the stream. */
+int
+pg_streaming_read_pins(PgStreamingRead *pgsr)
+{
+	return pgsr->pinned_buffers;
+}
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 1d79907020..179f90ab99 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1908,7 +1908,7 @@ again:
  * pessimistic, but outside of toy-sized shared_buffers it should allow
  * sufficient pins.
  */
-static void
+void
 LimitAdditionalPins(uint32 *additional_pins)
 {
 	uint32		max_backends;
diff --git a/src/backend/storage/meson.build b/src/backend/storage/meson.build
index 6ea9faa439..b196d9f885 100644
--- a/src/backend/storage/meson.build
+++ b/src/backend/storage/meson.build
@@ -1,5 +1,6 @@
 # Copyright (c) 2022-2023, PostgreSQL Global Development Group
 
+subdir('aio')
 subdir('buffer')
 subdir('file')
 subdir('freespace')
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index e29ca85077..6e1e99a4db 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -270,6 +270,8 @@ extern void ZeroBuffer(Buffer buffer, ReadBufferMode mode);
 
 extern bool BgBufferSync(struct WritebackContext *wb_context);
 
+extern void LimitAdditionalPins(uint32 *additional_pins);
+
 /* in buf_init.c */
 extern void InitBufferPool(void);
 extern Size BufferShmemSize(void);
diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h
new file mode 100644
index 0000000000..dbf8c279d9
--- /dev/null
+++ b/src/include/storage/streaming_read.h
@@ -0,0 +1,53 @@
+#ifndef STREAMING_READ_H
+#define STREAMING_READ_H
+
+#include "storage/bufmgr.h"
+#include "storage/fd.h"
+#include "storage/smgr.h"
+
+/*
+ * For most sequential access, callers can use this size to build full sized
+ * reads without pinning many extra buffers.
+ */
+#define PG_STREAMING_READ_DEFAULT_MAX_IOS MAX_BUFFERS_PER_TRANSFER
+
+struct PgStreamingRead;
+typedef struct PgStreamingRead PgStreamingRead;
+
+/* "Simple" callback that returns a block number. */
+typedef BlockNumber (*PgStreamingReadBufferCB) (PgStreamingRead *pgsr,
+												uintptr_t pgsr_private,
+												void *per_io_private);
+
+/* Callback allowing the relation, fork, block and mode to vary. */
+typedef bool (*PgStreamingReadBufferExtendedCB) (PgStreamingRead *pgsr,
+												 uintptr_t pgsr_private,
+												 void *per_io_private,
+												 BufferManagerRelation *bmr,
+												 ForkNumber *forknum,
+												 BlockNumber *blockNum,
+												 ReadBufferMode *mode);
+
+extern PgStreamingRead *pg_streaming_read_buffer_alloc(int max_ios,
+													   size_t per_io_private_size,
+													   uintptr_t pgsr_private,
+													   BufferAccessStrategy strategy,
+													   BufferManagerRelation bmr,
+													   ForkNumber forknum,
+													   PgStreamingReadBufferCB next_block_cb);
+
+extern PgStreamingRead *pg_streaming_read_buffer_alloc_ext(int max_ios,
+														   size_t per_io_private_size,
+														   uintptr_t pgsr_private,
+														   BufferAccessStrategy strategy,
+														   PgStreamingReadBufferExtendedCB next_block_cb);
+
+extern void pg_streaming_read_prefetch(PgStreamingRead *pgsr);
+extern Buffer pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_io_private);
+extern void pg_streaming_read_reset(PgStreamingRead *pgsr);
+extern void pg_streaming_read_free(PgStreamingRead *pgsr);
+
+extern int	pg_streaming_read_ios(PgStreamingRead *pgsr);
+extern int	pg_streaming_read_pins(PgStreamingRead *pgsr);
+
+#endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index dba3498a13..7d00eff1d5 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2081,6 +2081,8 @@ PgStat_TableCounts
 PgStat_TableStatus
 PgStat_TableXactStatus
 PgStat_WalStats
+PgStreamingRead
+PgStreamingReadCluster
 PgXmlErrorContext
 PgXmlStrictness
 Pg_finfo_record
-- 
2.42.1