--- configure.in.orig 2014-05-28 08:29:09.146829394 -0400 +++ configure.in 2014-05-28 16:45:42.406505235 -0400 @@ -1771,6 +1771,12 @@ operating system; use --disable-thread- fi fi +# test whether this system has both the librt-style async io and the gcc atomic compare_and_swap +# and test operation of the latter. +PGAC_FUNC_AIO_ATOMIC_BUILTIN_COMP_SWAP +if test x"$pgac_cv_aio_atomic_builtin_comp_swap" = x"yes"; then + AC_DEFINE(USE_AIO_ATOMIC_BUILTIN_COMP_SWAP, 1, [Define to select librt-style async io and the gcc atomic compare_and_swap.]) +fi # Select semaphore implementation type. if test "$PORTNAME" != "win32"; then --- contrib/pg_stat_statements/pg_stat_statements--1.3.sql.orig 2014-05-28 08:50:32.110559768 -0400 +++ contrib/pg_stat_statements/pg_stat_statements--1.3.sql 2014-05-28 16:45:42.570505896 -0400 @@ -0,0 +1,52 @@ +/* contrib/pg_stat_statements/pg_stat_statements--1.3.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION pg_stat_statements VERSION '1.3'" to load this file. \quit + +-- Register functions. 
+CREATE FUNCTION pg_stat_statements_reset() +RETURNS void +AS 'MODULE_PATHNAME' +LANGUAGE C; + +CREATE FUNCTION pg_stat_statements(IN showtext boolean, + OUT userid oid, + OUT dbid oid, + OUT queryid bigint, + OUT query text, + OUT calls int8, + OUT total_time float8, + OUT rows int8, + OUT shared_blks_hit int8, + OUT shared_blks_read int8, + OUT shared_blks_dirtied int8, + OUT shared_blks_written int8, + OUT local_blks_hit int8, + OUT local_blks_read int8, + OUT local_blks_dirtied int8, + OUT local_blks_written int8, + OUT temp_blks_read int8, + OUT temp_blks_written int8, + OUT blk_read_time float8, + OUT blk_write_time float8 + , OUT aio_read_noneed int8 + , OUT aio_read_discrd int8 + , OUT aio_read_forgot int8 + , OUT aio_read_noblok int8 + , OUT aio_read_failed int8 + , OUT aio_read_wasted int8 + , OUT aio_read_waited int8 + , OUT aio_read_ontime int8 +) +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'pg_stat_statements_1_3' +LANGUAGE C STRICT VOLATILE; + +-- Register a view on the function for ease of use. +CREATE VIEW pg_stat_statements AS + SELECT * FROM pg_stat_statements(true); + +GRANT SELECT ON pg_stat_statements TO PUBLIC; + +-- Don't want this to be available to non-superusers. 
+REVOKE ALL ON FUNCTION pg_stat_statements_reset() FROM PUBLIC; --- contrib/pg_stat_statements/Makefile.orig 2014-05-28 08:29:09.166829383 -0400 +++ contrib/pg_stat_statements/Makefile 2014-05-28 16:45:42.590505977 -0400 @@ -4,7 +4,8 @@ MODULE_big = pg_stat_statements OBJS = pg_stat_statements.o EXTENSION = pg_stat_statements -DATA = pg_stat_statements--1.2.sql pg_stat_statements--1.1--1.2.sql \ +DATA = pg_stat_statements--1.3.sql pg_stat_statements--1.2--1.3.sql \ + pg_stat_statements--1.2.sql pg_stat_statements--1.1--1.2.sql \ pg_stat_statements--1.0--1.1.sql pg_stat_statements--unpackaged--1.0.sql ifdef USE_PGXS --- contrib/pg_stat_statements/pg_stat_statements.c.orig 2014-05-28 08:29:09.166829383 -0400 +++ contrib/pg_stat_statements/pg_stat_statements.c 2014-05-28 16:45:42.630506138 -0400 @@ -117,6 +117,7 @@ typedef enum pgssVersion PGSS_V1_0 = 0, PGSS_V1_1, PGSS_V1_2 + ,PGSS_V1_3 } pgssVersion; /* @@ -148,6 +149,16 @@ typedef struct Counters int64 local_blks_written; /* # of local disk blocks written */ int64 temp_blks_read; /* # of temp blocks read */ int64 temp_blks_written; /* # of temp blocks written */ + + int64 aio_read_noneed; /* # of prefetches for which no need for prefetch as block already in buffer pool */ + int64 aio_read_discrd; /* # of prefetches for which buffer not subsequently read and therefore discarded */ + int64 aio_read_forgot; /* # of prefetches for which buffer not subsequently read and then forgotten about */ + int64 aio_read_noblok; /* # of prefetches for which no available BufferAiocb control block */ + int64 aio_read_failed; /* # of aio reads for which aio itself failed or the read failed with an errno */ + int64 aio_read_wasted; /* # of aio reads for which in-progress aio cancelled and disk block not used */ + int64 aio_read_waited; /* # of aio reads for which disk block used but had to wait for it */ + int64 aio_read_ontime; /* # of aio reads for which disk block used and ready on time when requested */ + double blk_read_time; /* 
time spent reading, in msec */ double blk_write_time; /* time spent writing, in msec */ double usage; /* usage factor */ @@ -274,7 +285,8 @@ void _PG_init(void); void _PG_fini(void); PG_FUNCTION_INFO_V1(pg_stat_statements_reset); PG_FUNCTION_INFO_V1(pg_stat_statements_1_2); +PG_FUNCTION_INFO_V1(pg_stat_statements_1_3); PG_FUNCTION_INFO_V1(pg_stat_statements); static void pgss_shmem_startup(void); @@ -1026,7 +1038,25 @@ pgss_ProcessUtility(Node *parsetree, con bufusage.temp_blks_read = pgBufferUsage.temp_blks_read - bufusage_start.temp_blks_read; bufusage.temp_blks_written = pgBufferUsage.temp_blks_written - bufusage_start.temp_blks_written; + /* asynchronous-IO prefetch counters, also as deltas since bufusage_start */ + bufusage.aio_read_noneed = + pgBufferUsage.aio_read_noneed - bufusage_start.aio_read_noneed; + bufusage.aio_read_discrd = + pgBufferUsage.aio_read_discrd - bufusage_start.aio_read_discrd; + bufusage.aio_read_forgot = + pgBufferUsage.aio_read_forgot - bufusage_start.aio_read_forgot; + bufusage.aio_read_noblok = + pgBufferUsage.aio_read_noblok - bufusage_start.aio_read_noblok; + bufusage.aio_read_failed = + pgBufferUsage.aio_read_failed - bufusage_start.aio_read_failed; + bufusage.aio_read_wasted = + pgBufferUsage.aio_read_wasted - bufusage_start.aio_read_wasted; + bufusage.aio_read_waited = + pgBufferUsage.aio_read_waited - bufusage_start.aio_read_waited; + bufusage.aio_read_ontime = + pgBufferUsage.aio_read_ontime - bufusage_start.aio_read_ontime; + bufusage.blk_read_time = pgBufferUsage.blk_read_time; INSTR_TIME_SUBTRACT(bufusage.blk_read_time, bufusage_start.blk_read_time); bufusage.blk_write_time = pgBufferUsage.blk_write_time; @@ -1224,6 +1254,16 @@ pgss_store(const char *query, uint32 que e->counters.local_blks_written += bufusage->local_blks_written; e->counters.temp_blks_read += bufusage->temp_blks_read; e->counters.temp_blks_written += bufusage->temp_blks_written; + + e->counters.aio_read_noneed += 
bufusage->aio_read_noneed; + e->counters.aio_read_discrd += bufusage->aio_read_discrd; + e->counters.aio_read_forgot += bufusage->aio_read_forgot; + e->counters.aio_read_noblok += bufusage->aio_read_noblok; + e->counters.aio_read_failed += bufusage->aio_read_failed; + e->counters.aio_read_wasted += bufusage->aio_read_wasted; + e->counters.aio_read_waited += bufusage->aio_read_waited; + e->counters.aio_read_ontime += bufusage->aio_read_ontime; + e->counters.blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->blk_read_time); e->counters.blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->blk_write_time); e->counters.usage += USAGE_EXEC(total_time); @@ -1257,7 +1297,8 @@ pg_stat_statements_reset(PG_FUNCTION_ARG #define PG_STAT_STATEMENTS_COLS_V1_0 14 #define PG_STAT_STATEMENTS_COLS_V1_1 18 #define PG_STAT_STATEMENTS_COLS_V1_2 19 -#define PG_STAT_STATEMENTS_COLS 19 /* maximum of above */ +#define PG_STAT_STATEMENTS_COLS_V1_3 27 +#define PG_STAT_STATEMENTS_COLS 27 /* maximum of above */ /* * Retrieve statement statistics. @@ -1270,6 +1311,16 @@ pg_stat_statements_reset(PG_FUNCTION_ARG * function. Unfortunately we weren't bright enough to do that for 1.1. 
*/ Datum +pg_stat_statements_1_3(PG_FUNCTION_ARGS) +{ + bool showtext = PG_GETARG_BOOL(0); + + pg_stat_statements_internal(fcinfo, PGSS_V1_3, showtext); + + return (Datum) 0; +} + +Datum pg_stat_statements_1_2(PG_FUNCTION_ARGS) { bool showtext = PG_GETARG_BOOL(0); @@ -1358,6 +1409,10 @@ pg_stat_statements_internal(FunctionCall if (api_version != PGSS_V1_2) elog(ERROR, "incorrect number of output arguments"); break; + case PG_STAT_STATEMENTS_COLS_V1_3: + if (api_version != PGSS_V1_3) + elog(ERROR, "incorrect number of output arguments"); + break; default: elog(ERROR, "incorrect number of output arguments"); } @@ -1534,11 +1589,24 @@ pg_stat_statements_internal(FunctionCall { values[i++] = Float8GetDatumFast(tmp.blk_read_time); values[i++] = Float8GetDatumFast(tmp.blk_write_time); + + if (api_version >= PGSS_V1_3) + { + values[i++] = Int64GetDatumFast(tmp.aio_read_noneed); + values[i++] = Int64GetDatumFast(tmp.aio_read_discrd); + values[i++] = Int64GetDatumFast(tmp.aio_read_forgot); + values[i++] = Int64GetDatumFast(tmp.aio_read_noblok); + values[i++] = Int64GetDatumFast(tmp.aio_read_failed); + values[i++] = Int64GetDatumFast(tmp.aio_read_wasted); + values[i++] = Int64GetDatumFast(tmp.aio_read_waited); + values[i++] = Int64GetDatumFast(tmp.aio_read_ontime); + } } Assert(i == (api_version == PGSS_V1_0 ? PG_STAT_STATEMENTS_COLS_V1_0 : api_version == PGSS_V1_1 ? PG_STAT_STATEMENTS_COLS_V1_1 : api_version == PGSS_V1_2 ? PG_STAT_STATEMENTS_COLS_V1_2 : + api_version == PGSS_V1_3 ? 
PG_STAT_STATEMENTS_COLS_V1_3 : -1 /* fail if you forget to update this assert */ )); tuplestore_putvalues(tupstore, tupdesc, values, nulls); --- contrib/pg_stat_statements/pg_stat_statements--1.2--1.3.sql.orig 2014-05-28 08:50:32.110559768 -0400 +++ contrib/pg_stat_statements/pg_stat_statements--1.2--1.3.sql 2014-05-28 16:45:42.658506251 -0400 @@ -0,0 +1,51 @@ +/* contrib/pg_stat_statements/pg_stat_statements--1.2--1.3.sql */ + +-- complain if script is sourced in psql, rather than via ALTER EXTENSION +\echo Use "ALTER EXTENSION pg_stat_statements UPDATE TO '1.3'" to load this file. \quit + +/* First we have to remove them from the extension (the 1.2 function is pg_stat_statements(boolean)) */ +ALTER EXTENSION pg_stat_statements DROP VIEW pg_stat_statements; +ALTER EXTENSION pg_stat_statements DROP FUNCTION pg_stat_statements(boolean); + +/* Then we can drop them */ +DROP VIEW pg_stat_statements; +DROP FUNCTION pg_stat_statements(boolean); + +/* Now redefine with the additional aio_read_* output columns */ +CREATE FUNCTION pg_stat_statements(IN showtext boolean, + OUT userid oid, + OUT dbid oid, + OUT queryid bigint, + OUT query text, + OUT calls int8, + OUT total_time float8, + OUT rows int8, + OUT shared_blks_hit int8, + OUT shared_blks_read int8, + OUT shared_blks_dirtied int8, + OUT shared_blks_written int8, + OUT local_blks_hit int8, + OUT local_blks_read int8, + OUT local_blks_dirtied int8, + OUT local_blks_written int8, + OUT temp_blks_read int8, + OUT temp_blks_written int8, + OUT blk_read_time float8, + OUT blk_write_time float8 + , OUT aio_read_noneed int8 + , OUT aio_read_discrd int8 + , OUT aio_read_forgot int8 + , OUT aio_read_noblok int8 + , OUT aio_read_failed int8 + , OUT aio_read_wasted int8 + , OUT aio_read_waited int8 + , OUT aio_read_ontime int8 +) +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'pg_stat_statements_1_3' +LANGUAGE C STRICT VOLATILE; + +CREATE VIEW pg_stat_statements AS + SELECT * FROM pg_stat_statements(true); + +GRANT SELECT ON pg_stat_statements TO PUBLIC; --- postgresql-prefetching-asyncio.README.orig 2014-05-28 09:18:59.386318460 -0400 
+++ postgresql-prefetching-asyncio.README 2014-05-28 16:45:42.702506429 -0400 @@ -0,0 +1,542 @@ +Postgresql -- Extended Prefetching using Asynchronous IO +============================================================ + +Postgresql currently (9.3.4) provides a limited prefetching capability +using posix_fadvise to give hints to the Operating System kernel +about which pages it expects to read in the near future. +This capability is used only during the heap-scan phase of bitmap-index scans. +It is controlled via the effective_io_concurrency configuration parameter. + +This capability is now extended in two ways : + . use asynchronous IO into Postgresql shared buffers as an + alternative to posix_fadvise + . Implement prefetching in other types of scan : + . non-bitmap (i.e. simple) index scans - index pages + currently only for B-tree indexes. + (developed by Claudio Freire ) + . non-bitmap (i.e. simple) index scans - heap pages + currently only for B-tree indexes. + . simple heap scans + +Posix asynchronous IO is chosen as the function library for asynchronous IO, +since this is well supported and also fits very well with the model of +the prefetching process, particularly as regards checking for completion +of an asynchronous read. On linux, Posix asynchronous IO is provided +in the librt library. librt uses independently-schedulable threads to +achieve the asynchronicity, rather than kernel functionality. + +In this implementation, use of asynchronous IO is limited to prefetching +while performing one of the three types of scan + . B-tree bitmap index scan - heap pages (as already exists) + . B-tree non-bitmap (i.e. simple) index scans - index and heap pages + . simple heap scans +on permanent relations. It is not used on temporary tables nor for writes. + +The advantages of Posix asynchronous IO into shared buffers +compared to posix_fadvise are : + . Beneficial for non-sequential access patterns as well as sequential + . 
No restriction on the kinds of IO which can be used + (other kinds of asynchronous IO impose restrictions such as + buffer alignment, use of non-buffered IO). + . Does not interfere with standard linux kernel read-ahead functionality. + (It has been stated in + www.postgresql.org/message-id/CAGTBQpbu2M=-M7NUr6DWr0K8gUVmXVhwKohB-Cnj7kYS1AhH4A@mail.gmail.com + that : + "the kernel stops doing read-ahead when a call to posix_fadvise comes. + I noticed the performance hit, and checked the kernel's code. + It effectively changes the prediction mode from sequential to fadvise, + negating the (assumed) kernel's prefetch logic") + . When the read request is issued after a prefetch has completed, + no delay associated with a kernel call to copy the page from + kernel page buffers into the Postgresql shared buffer, + since it is already there. + Also, in a memory-constrained environment, there is a greater + probability that the prefetched page will "stick" in memory + since the linux kernel victimizes the filesystem page cache in preference + to swapping out user process pages. + . Statistics on prefetch success can be gathered (see "Statistics" below) + which helps the administrator to tune the prefetching settings. + +These benefits are most likely to be obtained in a system whose usage profile +(e.g. from iostat) shows: + . high IO wait from mostly-read activity + . disk access pattern is not entirely sequential + (so kernel readahead can't predict it but postgresql can) + . sufficient spare idle CPU to run the librt pthreads + or, stated another way, the CPU subsystem is relatively powerful + compared to the disk subsystem. +In such ideal conditions, and with a workload with plenty of index scans, +around 10% - 20% improvement in throughput has been achieved. 
+In an admittedly extreme environment measured by this author, with a workload +consisting of 8 client applications each running similar complex queries +(same query structure but different predicates and constants), +including 2 Bitmap Index Scans and 17 non-bitmap index scans, +on a dual-core Intel laptop (4 hyperthreads) with the database on a single +USB3-attached 500GB disk drive, and no part of the database in filesystem buffers +initially, (filesystem freshly mounted), comparing unpatched build +using posix_fadvise with effective_io_concurrency 4 against same build patched +with async IO and effective_io_concurrency 4 and max_async_io_prefetchers 32, +elapsed time repeatably improved from around 640-670 seconds to around 530-550 seconds, +a 17% - 18% improvement. + +The disadvantages of Posix asynchronous IO compared to posix_fadvise are: + . probably higher CPU utilization: + Firstly, the extra work performed by the librt threads adds CPU + overhead, and secondly, if the asynchronous prefetching is effective, + then it will deliver better (greater) overlap of CPU with IO, which + will reduce elapsed times and hence increase CPU utilization percentage + still more (during that shorter elapsed time). + . more context switching, because of the additional threads. + + +Statistics: +___________ + +A number of additional statistics relating to effectiveness of asynchronous IO +are provided as an extension of the existing pg_stat_statements loadable module. +Refer to the appendix "Additional Supplied Modules" in the current +PostgreSQL Documentation for details of this module. + +The following additional statistics are provided for asynchronous IO prefetching: + + . aio_read_noneed : number of prefetches for which no need for prefetch as block already in buffer pool + . aio_read_discrd : number of prefetches for which buffer not subsequently read and therefore discarded + . 
aio_read_forgot : number of prefetches for which buffer not subsequently read and then forgotten about + . aio_read_noblok : number of prefetches for which no available BufferAiocb control block + . aio_read_failed : number of aio reads for which aio itself failed or the read failed with an errno + . aio_read_wasted : number of aio reads for which in-progress aio cancelled and disk block not used + . aio_read_waited : number of aio reads for which disk block used but had to wait for it + . aio_read_ontime : number of aio reads for which disk block used and ready on time when requested + +Some of these are (hopefully) self-explanatory. Some additional notes: + + . aio_read_discrd and aio_read_forgot : + The prefetch was wasted work since the buffer was not subsequently read. + The discrd case indicates that the scanner realized this and discarded the buffer, + whereas the forgot case indicates that the scanner did not realize it, + which should not normally occur. + A high number in either suggests lowering effective_io_concurrency. + + . aio_read_noblok : + Any significant number in relation to all the other numbers indicates that + max_async_io_prefetchers should be increased. + + . aio_read_waited : + The page was prefetched but the asynchronous read had not completed by the time the + scanner requested to read it. This causes extra overhead in waiting and indicates + prefetching is not providing much if any benefit. + The disk subsystem may be underpowered/overloaded in relation to the available CPU power. + + . aio_read_ontime : + The page was prefetched and the asynchronous read had completed by the time the + scanner requested to read it. Optimal behaviour. If this number is large + in relation to all the other numbers except (possibly) aio_read_noneed, + then prefetching is working well. 
+ +To create the extension with support for these additional statistics, use the following syntax: + CREATE EXTENSION pg_stat_statements VERSION '1.3' +or, if you run the new code against an existing database which already has the extension +( see installation and migration below ), you can + ALTER EXTENSION pg_stat_statements UPDATE TO '1.3' + +A suggested set of commands for displaying these statistics might be : + + /* OPTIONALLY */ DROP extension pg_stat_statements; + CREATE extension pg_stat_statements VERSION '1.3'; + /* run your workload */ + select userid , dbid , substring(query from 1 for 24) , calls , total_time , rows , shared_blks_read , blk_read_time , blk_write_time \ + , aio_read_noneed , aio_read_discrd , aio_read_noblok , aio_read_failed , aio_read_wasted , aio_read_waited , aio_read_ontime , aio_read_forgot \ + from pg_stat_statements where shared_blks_read > 0; + + +Installation and Build Configuration: +_____________________________________ + +1. First - a prerequisite: +# as well as requiring all the usual package build tools such as gcc , make etc, +# as described in the instructions for building postgresql, +# the following is required : + gnu autoconf at version 2.69 : +# run the following command +autoconf -V +# it *must* return +autoconf (GNU Autoconf) 2.69 + +2. If you don't have it or it is a different version, +then you must obtain version 2.69 (which is the current version) +from your distribution provider or from the gnu software download site. + +3. Also you must have the source tree for postgresql version 9.4 (development version). +# all the following commands assume your current working directory is the top of the source tree. + +4. cd to top of source tree : +# check it appears to be a postgresql source tree +ls -ld configure.in src +# should show both the file and the directory +grep PostgreSQL COPYRIGHT +# should show PostgreSQL Database Management System + +5. 
Apply the patch : +patch -b -p0 -i <path-to-patch-file> +# should report no errors, 43 files patched (see list at bottom of this README) +# and all hunks applied +# check the patch was applied to configure.in +ls -ld configure.in.orig configure.in +# should show both files + +6. Rebuild the configure script with the patched configure.in : +mv configure configure.orig; +autoconf configure.in >configure;echo "rc= $? from autoconf"; chmod +x configure; +ls -lrt configure.orig configure; + +7. run the new configure script : +# if you have run configure before, +# then you may first want to save existing config.status and config.log if they exist, +# and then specify same configure flags and options as you specified before. +# the patch does not alter or extend the set of configure options +# if unsure, run ./configure --help +# if still unsure, run ./configure +./configure + + + +8. now check that configure decided that this environment supports asynchronous IO : +grep USE_AIO_ATOMIC_BUILTIN_COMP_SWAP src/include/pg_config.h +# it should show +#define USE_AIO_ATOMIC_BUILTIN_COMP_SWAP 1 +# if not, apparently your environment does not support asynch IO - +# the config.log will show how it came to that conclusion, +# also check for : +# . a librt.so somewhere in the loader's library path (probably under /lib , /lib64 , or /usr) +# . your gcc must support the atomic compare_and_swap __sync_bool_compare_and_swap built-in function +# do not proceed without this define being set. + +9. do you want to use the new code on an existing cluster + that was created using the same code base but without the patch? + If so then run this nasty-looking command : + (cut-and-paste it into a terminal window or a shell-script file) + Otherwise continue to step 10. + see Migration note below for explanation. 
+############################################################################################### + fl=src/Makefile.global; typeset -i bkx=0; while [[ $bkx < 200 ]]; do { + bkfl="${fl}.bak${bkx}"; if [[ -a ${bkfl} ]]; then ((bkx=bkx+1)); else break; fi; + }; done; + if [[ -a ${bkfl} ]]; then echo "sorry cannot find a backup name for $fl"; + elif [[ -a $fl ]]; then { + mv $fl $bkfl && { + sed -e "/^CFLAGS =/ s/\$/ -DAVOID_CATALOG_MIGRATION_FOR_ASYNCIO/" $bkfl > $fl; + str="diff -w $bkfl $fl";echo "$str"; eval "$str"; + }; + }; + else echo "ooopppss $fl is missing"; + fi; +############################################################################################### +# it should report something like +diff -w Makefile.global.bak0 Makefile.global +222c222 +< CFLAGS = XXXX +--- +> CFLAGS = XXXX -DAVOID_CATALOG_MIGRATION_FOR_ASYNCIO +# where XXXX is some set of flags + + +10. now run the rest of the build process as usual - + follow instructions in file INSTALL if that file exists, + else e.g. run +make && make install + +If the build fails with the following error: +undefined reference to `aio_init' +Then edit the following file +src/include/pg_config_manual.h +and add the following line at the bottom: + +#define DONT_HAVE_AIO_INIT + +and then run +make clean && make && make install +See notes to section Runtime Configuration below for more information on this. + + + +Migration , Runtime Configuration, and Use: +___________________________________________ + + +Database Migration: +___________________ + +The new prefetching code for non-bitmap index scans introduces a new btree-index +function named btpeeknexttuple. The correct way to add such a function involves +also adding it to the catalog as an internal function in pg_proc. +However, this results in the new built code considering an existing database to be +incompatible, i.e requiring backup on the old code and restore on the new. 
+This is normal behaviour for migration to a new version of postgresql, and is +also a valid way of migrating a database for use with this asynchronous IO feature, +but in this case it may be inconvenient. + +As an alternative, the new code may be compiled with the macro define +AVOID_CATALOG_MIGRATION_FOR_ASYNCIO +which does what it says by not altering the catalog. The patched build can then +be run against an existing database cluster initdb'd using the unpatched build. + +There are no known ill-effects of so doing, but : + . in any case, it is strongly suggested to make a backup of any precious database + before accessing it with a patched build + . be aware that if this asynchronous IO feature is eventually released as part of postgresql, + migration will probably be required anyway. + +This option to avoid catalog migration is intended as a convenience for a quick test, +and also makes it easier to obtain performance comparisons on the same database. + + + +Runtime Configuration: +______________________ + +One new configuration parameter settable in postgresql.conf and +in any other way as described in the postgresql documentation : + +max_async_io_prefetchers + Maximum number of background processes concurrently using asynchronous + librt threads to prefetch pages into shared memory buffers + +This number can be thought of as the maximum number +of librt threads concurrently active, each working on a list of +from 1 to target_prefetch_pages pages ( see notes 1 and 2 ). 
+ +In practice, this number simply controls how many prefetch requests in total +may be active concurrently : + max_async_io_prefetchers * target_prefetch_pages ( see note 1) + +default is max_connections/6 +and recall that the default for max_connections is 100 + + +note 1 target_prefetch_pages is a number based on effective_io_concurrency and approximately n * ln(n) + where n is effective_io_concurrency + +note 2 Provided that the gnu extension to Posix AIO which provides the +aio_init() function is present, then aio_init() is called +to set the librt maximum number of threads to max_async_io_prefetchers, +and to set the maximum number of concurrent aio read requests to the product of + max_async_io_prefetchers * target_prefetch_pages + + +As well as this regular configuration parameter, +there are several other parameters that can be set via environment variable. +The reason why they are environment vars rather than regular configuration parameters +is that it is not expected that they should need to be set, but they may be useful : + variable name values default meaning + PG_TRY_PREFETCHING_FOR_BITMAP [Y|N] Y whether to prefetch bitmap heap scans + PG_TRY_PREFETCHING_FOR_ISCAN [Y|N|integer[,[N|Y]]] 256,N whether to prefetch non-bitmap index scans + also numeric size of list of prefetched blocks + also whether to prefetch forward-sequential-pattern index pages + PG_TRY_PREFETCHING_FOR_BTREE [Y|N] Y whether to prefetch heap pages in non-bitmap index scans + PG_TRY_PREFETCHING_FOR_HEAP [Y|N] N whether to prefetch relation (un-indexed) heap scans + + +The setting for PG_TRY_PREFETCHING_FOR_ISCAN is a little complicated. +It can be set to Y or N to control prefetching of non-bitmap index scans; +but in addition it can be set to an integer, which both implies Y +and also sets the size of a list used to remember prefetched but unread heap pages. +This list is an optimization used to avoid re-prefetching and maximise the potential +set of prefetchable blocks indexed by one index page. 
+And if set to an integer, this integer may be followed by either ,Y or ,N +to specify to prefetch index pages which are being accessed forward-sequentially. +It has been found that prefetching is not of great benefit for this access pattern, +and so it is not the default, but also does no harm (provided sufficient CPU capacity). + + + +Usage : +______ + + +There are no changes in usage other than as noted under Configuration and Statistics. +However, in order to assess benefit from this feature, it will be useful to +understand the query access plans of your workload using EXPLAIN. Before doing that, +make sure that statistics are up to date using ANALYZE. + + + +Internals: +__________ + + +Internal changes span two areas and the interface between them : + + . buffer manager layer + . programming interface for scanner to call buffer manager + . scanner layer + + . buffer manager layer + ____________________ + + changes comprise : + . allocating, pinning , unpinning buffers + this is complex and discussed briefly below in "Buffer Management" + . acquiring and releasing a BufferAiocb, the control block + associated with a single aio_read, and checking for its completion + a new file, backend/storage/buffer/buf_async.c, provides three new functions, + BufStartAsync BufReleaseAsync BufCheckAsync + which handle this. + . calling librt asynch io functions + this follows the example of all other filesystem interfaces + and is straightforward. + two new functions are provided in fd.c: + FileStartaio FileCompleteaio + and corresponding interfaces in smgr.c + + . programming interface for scanner to call buffer manager + ________________________________________________________ + . calling interface for existing function PrefetchBuffer is modified : + . one new argument, BufferAccessStrategy strategy + . now returns an int return code which indicates : + whether pin count on buffer has been increased by 1 + whether block was already present in a buffer + . 
new function DiscardBuffer + . discard buffer used for a previously prefetched page + which scanner decides it does not want to read. + . same arguments as for PrefetchBuffer except for omission of BufferAccessStrategy + . note - this is different from the existing function ReleaseBuffer + in that ReleaseBuffer takes a buffer_descriptor as argument + for a buffer which has been read, but has similar purpose. + + . scanner layer + _____________ + common to all scanners is that the scanner which wishes to prefetch must do three things: + . decide which pages to prefetch and call PrefetchBuffer to prefetch them + nodeBitmapHeapscan already does this (but note one extra argument on PrefetchBuffer) + . remember which pages it has prefetched in some list (actual or conceptual, e.g. a page range), + removing each page from this list if and when it subsequently reads the page. + . at end of scan, call DiscardBuffer for every remembered (i.e. prefetched but not yet read) page + how this list of prefetched pages is implemented varies for each of the three scanners and four scan types: + . bitmap index scan - heap pages + . non-bitmap (i.e. simple) index scans - index pages + . non-bitmap (i.e. simple) index scans - heap pages + . simple heap scans + The consequences of forgetting to call DiscardBuffer on a prefetched but unread page are: + . counted in aio_read_forgot (see "Statistics" above) + . may incur an annoying but harmless warning in the pg_log "Buffer Leak ... " + (the buffer is released at commit) + This does sometimes happen ... + + + +Buffer Management +_________________ + +With async io, PrefetchBuffer must allocate and pin a buffer, which is relatively straightforward, +but also every other part of buffer manager must know about the possibility that a buffer may be in +an async_io_in_progress state and be prepared to determine the possible completion. +That is, one backend BK1 may start the io but another BK2 may try to read it before BK1 does. 
+Posix Asynchronous IO provides a means for waiting on this or another task's read if in progress, +namely aio_suspend(), which this extension uses. Therefore, although StartBufferIO and TerminateBufferIO +are called as part of asynchronous prefetching, their role is limited to maintaining the buffer descriptor flags, +and they do not track the asynchronous IO itself. Instead, asynchronous IOs are tracked in +a separate set of shared control blocks, the BufferAiocb list - +refer to include/storage/buf_internals.h +Checking asynchronous io status is handled in backend/storage/buffer/buf_async.c BufCheckAsync function. +Read the commentary for this function for more details. + +Pinning and unpinning of buffers is the most complex aspect of asynch io prefetching, +and the logic is spread throughout BufStartAsync , BufCheckAsync , and many functions in bufmgr.c. +When a backend BK2 requests ReadBuffer of a page for which asynch read is in progress, +buffer manager has to determine which backend BK1 pinned this buffer during previous PrefetchBuffer, +and for example must not be re-pinned a second time if BK2 is BK1. +Information concerning which backend initiated the prefetch is held in the BufferAiocb. + +The trickiest case concerns the scenario in which : + . BK1 initiates prefetch and acquires a pin + . BK2 possibly waits for completion and then reads the buffer, and perhaps later on + releases it by ReleaseBuffer. + . Since the asynchronous IO is no longer in progress, there is no longer any + BufferAiocb associated with it. Yet buffer manager must remember that BK1 holds a + "prefetch" pin, i.e. a pin which must not be repeated if and when BK1 finally issues ReadBuffer. + . 
The solution to this problem is to invent the concept of a "banked" pin, + which is a pin obtained when prefetch was issued, identified as being in "banked" status only if and when + the associated asynchronous IO terminates, and redeemable by the next use by same task, + either by ReadBuffer or DiscardBuffer. + The pid of the backend which holds a banked pin on a buffer (there can be at most one such backend) + is stored in the buffer descriptor. + This is done without increasing size of the buffer descriptor, which is important since + there may be a very large number of these. This does overload the relevant field in the descriptor. + Refer to include/storage/buf_internals.h for more details + and search for BM_AIO_PREFETCH_PIN_BANKED in storage/buffer/bufmgr.c and backend/storage/buffer/buf_async.c + +______________________________________________________________________________ +The following 43 files are changed in this feature (output of the patch command) : + +patching file configure.in +patching file contrib/pg_stat_statements/pg_stat_statements--1.3.sql +patching file contrib/pg_stat_statements/Makefile +patching file contrib/pg_stat_statements/pg_stat_statements.c +patching file contrib/pg_stat_statements/pg_stat_statements--1.2--1.3.sql +patching file config/c-library.m4 +patching file src/backend/postmaster/postmaster.c +patching file src/backend/executor/nodeBitmapHeapscan.c +patching file src/backend/executor/nodeIndexscan.c +patching file src/backend/executor/instrument.c +patching file src/backend/storage/buffer/Makefile +patching file src/backend/storage/buffer/bufmgr.c +patching file src/backend/storage/buffer/buf_async.c +patching file src/backend/storage/buffer/buf_init.c +patching file src/backend/storage/smgr/md.c +patching file src/backend/storage/smgr/smgr.c +patching file src/backend/storage/file/fd.c +patching file src/backend/storage/lmgr/proc.c +patching file src/backend/access/heap/heapam.c +patching file src/backend/access/heap/syncscan.c
+patching file src/backend/access/index/indexam.c +patching file src/backend/access/index/genam.c +patching file src/backend/access/nbtree/nbtsearch.c +patching file src/backend/access/nbtree/nbtinsert.c +patching file src/backend/access/nbtree/nbtpage.c +patching file src/backend/access/nbtree/nbtree.c +patching file src/backend/nodes/tidbitmap.c +patching file src/backend/utils/misc/guc.c +patching file src/backend/utils/mmgr/aset.c +patching file src/include/executor/instrument.h +patching file src/include/storage/bufmgr.h +patching file src/include/storage/smgr.h +patching file src/include/storage/fd.h +patching file src/include/storage/buf_internals.h +patching file src/include/catalog/pg_am.h +patching file src/include/catalog/pg_proc.h +patching file src/include/pg_config_manual.h +patching file src/include/access/nbtree.h +patching file src/include/access/heapam.h +patching file src/include/access/relscan.h +patching file src/include/nodes/tidbitmap.h +patching file src/include/utils/rel.h +patching file src/include/pg_config.h.in + + +Future Possibilities: +____________________ + +There are several possible extensions of this feature : + . Extend prefetching of index scans to types of index + other than B-tree. + This should be fairly straightforward, but requires some + good base of benchmarkable workloads to prove the value. + . Investigate why asynchronous IO prefetching does not greatly + improve sequential relation heap scans and possibly find how to + achieve a benefit. + . Build knowledge of asynchronous IO prefetching into the + Query Planner costing. + This is far from straightforward. The PostgreSQL Query Planner's + costing model is based on resource consumption rather than elapsed time. + Use of asynchronous IO prefetching is intended to improve elapsed time + at the expense of (probably) higher resource consumption.
+ Although Costing understands about the reduced cost of reading buffered + blocks, it does not take asynchronicity or overlap of CPU with disk + into account. A naive approach might be to try to tweak the Query + Planner's Cost Constant configuration parameters + such as seq_page_cost , random_page_cost + but this is hazardous as explained in the Documentation. + + + +John Lumby, johnlumby(at)hotmail(dot)com --- config/c-library.m4.orig 2014-05-28 08:29:09.142829396 -0400 +++ config/c-library.m4 2014-05-28 16:45:42.746506606 -0400 @@ -367,3 +367,50 @@ if test "$pgac_cv_type_locale_t" = 'yes AC_DEFINE(LOCALE_T_IN_XLOCALE, 1, [Define to 1 if `locale_t' requires .]) fi])])# PGAC_HEADER_XLOCALE + + +# PGAC_FUNC_AIO_ATOMIC_BUILTIN_COMP_SWAP +# --------------------------------------- +# test whether this system has both the librt-style async io and the gcc atomic compare_and_swap +# and test operation of the latter. +# +AC_DEFUN([PGAC_FUNC_AIO_ATOMIC_BUILTIN_COMP_SWAP], +[AC_MSG_CHECKING([whether have both librt-style async io and the gcc atomic compare_and_swap]) +AC_CACHE_VAL(pgac_cv_aio_atomic_builtin_comp_swap, +pgac_save_LIBS=$LIBS +LIBS=" -lrt $pgac_save_LIBS" +[AC_TRY_RUN([#include +#include +#include "aio.h" + +int main (int argc, char *argv[]) + { + int rc; + struct aiocb volatile * first_aiocb; + struct aiocb volatile * second_aiocb; + struct aiocb volatile * my_aiocbp = (struct aiocb *)20000008; + + first_aiocb = (struct aiocb *)20000008; + second_aiocb = (struct aiocb *)40000008; + + /* return zero as success if two comp-swaps both worked as expected - + ** first compares equal and swaps, second compares unequal + */ + rc = (__sync_bool_compare_and_swap (&my_aiocbp, first_aiocb, second_aiocb)); + if (rc) { + rc = (__sync_bool_compare_and_swap (&my_aiocbp, first_aiocb, second_aiocb)); + } else { + rc = -1; + } + + return rc; +}], +[pgac_cv_aio_atomic_builtin_comp_swap=yes], +[pgac_cv_aio_atomic_builtin_comp_swap=no], 
+[pgac_cv_aio_atomic_builtin_comp_swap=cross]) +])dnl AC_CACHE_VAL +AC_MSG_RESULT([$pgac_cv_aio_atomic_builtin_comp_swap]) +if test x"$pgac_cv_aio_atomic_builtin_comp_swap" != x"yes"; then +LIBS=$pgac_save_LIBS +fi +])# PGAC_FUNC_AIO_ATOMIC_BUILTIN_COMP_SWAP --- src/backend/postmaster/postmaster.c.orig 2014-05-28 08:29:09.322829301 -0400 +++ src/backend/postmaster/postmaster.c 2014-05-28 16:45:42.814506880 -0400 @@ -123,6 +123,11 @@ #include "storage/spin.h" #endif +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP +void ReportFreeBAiocbs(void); +int CountInuseBAiocbs(void); +extern int hwmBufferAiocbs; /* high water mark of in-use BufferAiocbs in pool */ +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ /* * Possible types of a backend. Beyond being the possible bkend_type values in @@ -1493,9 +1498,15 @@ ServerLoop(void) fd_set readmask; int nSockets; time_t now, +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP + count_baiocb_time, +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ last_touch_time; last_touch_time = time(NULL); +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP + count_baiocb_time = time(NULL); +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ nSockets = initMasks(&readmask); @@ -1654,6 +1665,19 @@ ServerLoop(void) last_touch_time = now; } +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP + /* maintain the hwm of used baiocbs every 10 seconds */ + if ((now - count_baiocb_time) >= 10) + { + int inuseBufferAiocbs; /* current in-use BufferAiocbs in pool */ + inuseBufferAiocbs = CountInuseBAiocbs(); + if (inuseBufferAiocbs > hwmBufferAiocbs) { + hwmBufferAiocbs = inuseBufferAiocbs; + } + count_baiocb_time = now; + } +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ + /* * If we already sent SIGQUIT to children and they are slow to shut * down, it's time to send them SIGKILL. 
This doesn't happen @@ -3444,6 +3468,9 @@ PostmasterStateMachine(void) signal_child(PgStatPID, SIGQUIT); } } +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP + ReportFreeBAiocbs(); +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ } } --- src/backend/executor/nodeBitmapHeapscan.c.orig 2014-05-28 08:29:09.270829328 -0400 +++ src/backend/executor/nodeBitmapHeapscan.c 2014-05-28 16:45:42.834506961 -0400 @@ -34,6 +34,8 @@ * ExecEndBitmapHeapScan releases all storage. */ #include "postgres.h" +#include +#include #include "access/relscan.h" #include "access/transam.h" @@ -47,6 +49,10 @@ #include "utils/snapmgr.h" #include "utils/tqual.h" +#ifdef USE_PREFETCH +extern unsigned int prefetch_dbOid; /* database oid of relations on which prefetching to be done - 0 means all */ +extern unsigned int prefetch_bitmap_scans; /* boolean whether to prefetch bitmap heap scans */ +#endif /* USE_PREFETCH */ static TupleTableSlot *BitmapHeapNext(BitmapHeapScanState *node); static void bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres); @@ -111,10 +117,21 @@ BitmapHeapNext(BitmapHeapScanState *node node->tbmres = tbmres = NULL; #ifdef USE_PREFETCH - if (target_prefetch_pages > 0) - { + if ( prefetch_bitmap_scans + && (target_prefetch_pages > 0) + && ( ( (prefetch_dbOid > 0) + && (prefetch_dbOid == scan->rs_rd->rd_node.dbNode) + ) + || (prefetch_dbOid == 0) + ) + /* sufficient number of blocks - at least twice the target_prefetch_pages */ + && (scan->rs_nblocks > (2*target_prefetch_pages)) + ) { node->prefetch_iterator = prefetch_iterator = tbm_begin_iterate(tbm); node->prefetch_pages = 0; + if (prefetch_iterator) { + tbm_zero(prefetch_iterator); /* zero list of prefetched and unread blocknos */ + } node->prefetch_target = -1; } #endif /* USE_PREFETCH */ @@ -138,12 +155,14 @@ BitmapHeapNext(BitmapHeapScanState *node } #ifdef USE_PREFETCH + if (prefetch_iterator) { if (node->prefetch_pages > 0) { /* The main iterator has closed the distance by one page */ node->prefetch_pages--; + 
tbm_subtract(prefetch_iterator, tbmres->blockno); /* remove this blockno from list of prefetched and unread blocknos */ } - else if (prefetch_iterator) + else { /* Do not let the prefetch iterator get behind the main one */ TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator); @@ -151,6 +170,7 @@ BitmapHeapNext(BitmapHeapScanState *node if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno) elog(ERROR, "prefetch and main iterators are out of sync"); } + } #endif /* USE_PREFETCH */ /* @@ -239,16 +259,26 @@ BitmapHeapNext(BitmapHeapScanState *node while (node->prefetch_pages < node->prefetch_target) { TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator); + int PrefetchBufferRc; /* return value from PrefetchBuffer - refer to bufmgr.h */ + if (tbmpre == NULL) { /* No more pages to prefetch */ - tbm_end_iterate(prefetch_iterator); - node->prefetch_iterator = prefetch_iterator = NULL; + /* let ExecEndBitmapHeapScan terminate the prefetch_iterator + ** tbm_end_iterate(prefetch_iterator); + ** node->prefetch_iterator = NULL; + */ + prefetch_iterator = NULL; break; } node->prefetch_pages++; - PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno); + PrefetchBufferRc = PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno , 0); + /* add this blockno to list of prefetched and unread blocknos + ** if pin count did not increase then indicate so in the Unread_Pfetched list + */ + tbm_add(prefetch_iterator + ,( (PrefetchBufferRc & PREFTCHRC_BUF_PIN_INCREASED) ? 
tbmpre->blockno : InvalidBlockNumber ) ); } } #endif /* USE_PREFETCH */ @@ -482,12 +512,31 @@ ExecEndBitmapHeapScan(BitmapHeapScanStat { Relation relation; HeapScanDesc scanDesc; + TBMIterator *prefetch_iterator; /* * extract information from the node */ relation = node->ss.ss_currentRelation; scanDesc = node->ss.ss_currentScanDesc; + prefetch_iterator = node->prefetch_iterator; + +#ifdef USE_PREFETCH + /* before any other cleanup, discard any prefetched but unread buffers */ + if (prefetch_iterator != NULL) { + TBMIterateResult *tbmpre = tbm_locate_IterateResult(prefetch_iterator); + BlockNumber *Unread_Pfetched_base = tbmpre->Unread_Pfetched_base; + unsigned int Unread_Pfetched_next = tbmpre->Unread_Pfetched_next; /* where the next unread blockno is relative to start */ + unsigned int Unread_Pfetched_count = tbmpre->Unread_Pfetched_count; + + while ((Unread_Pfetched_count--) > 0) { + DiscardBuffer( scanDesc->rs_rd, MAIN_FORKNUM, *(Unread_Pfetched_base+Unread_Pfetched_next)); + Unread_Pfetched_next++; + if (Unread_Pfetched_next >= target_prefetch_pages) + Unread_Pfetched_next = 0; + } + } +#endif /* USE_PREFETCH */ /* * Free the exprcontext --- src/backend/executor/nodeIndexscan.c.orig 2014-05-28 08:29:09.270829328 -0400 +++ src/backend/executor/nodeIndexscan.c 2014-05-28 16:45:42.858507057 -0400 @@ -35,8 +35,13 @@ #include "utils/rel.h" + static TupleTableSlot *IndexNext(IndexScanState *node); +#ifdef USE_PREFETCH +extern unsigned int prefetch_dbOid; /* database oid of relations on which prefetching to be done - 0 means all */ +extern unsigned int prefetch_index_scans; /* boolean whether to prefetch bitmap heap scans */ +#endif /* USE_PREFETCH */ /* ---------------------------------------------------------------- * IndexNext @@ -418,7 +423,12 @@ ExecEndIndexScan(IndexScanState *node) * close the index relation (no-op if we didn't open it) */ if (indexScanDesc) + { index_endscan(indexScanDesc); + + /* note - at this point all scan controlblock resources have been 
freed by IndexScanEnd called by index_endscan */ + + } if (indexRelationDesc) index_close(indexRelationDesc, NoLock); @@ -609,6 +619,33 @@ ExecInitIndexScan(IndexScan *node, EStat indexstate->iss_NumScanKeys, indexstate->iss_NumOrderByKeys); +#ifdef USE_PREFETCH + /* initialize prefetching */ + indexstate->iss_ScanDesc->pfch_index_page_list = (struct pfch_index_pagelist*)0; + indexstate->iss_ScanDesc->pfch_block_item_list = (struct pfch_block_item*)0; + if ( prefetch_index_scans + && (target_prefetch_pages > 0) + && (!RelationUsesLocalBuffers(indexstate->iss_ScanDesc->heapRelation)) /* I think this must always be true for an indexed heap ? */ + && ( ( (prefetch_dbOid > 0) + && (prefetch_dbOid == indexstate->iss_ScanDesc->heapRelation->rd_node.dbNode) + ) + || (prefetch_dbOid == 0) + ) + ) { + indexstate->iss_ScanDesc->pfch_index_page_list = palloc( sizeof(struct pfch_index_pagelist) + ( (target_prefetch_pages-1) * sizeof(struct pfch_index_item) ) ); + indexstate->iss_ScanDesc->pfch_block_item_list = palloc( prefetch_index_scans * sizeof(struct pfch_block_item) ); + if ( ( (struct pfch_index_pagelist*)0 != indexstate->iss_ScanDesc->pfch_index_page_list ) + && ( (struct pfch_block_item*)0 != indexstate->iss_ScanDesc->pfch_block_item_list ) + ) { + indexstate->iss_ScanDesc->pfch_used = 0; + indexstate->iss_ScanDesc->pfch_next = prefetch_index_scans; /* ensure first entry is at index 0 */ + indexstate->iss_ScanDesc->pfch_index_page_list->pfch_index_pagelist_next = (struct pfch_index_pagelist*)0; + indexstate->iss_ScanDesc->pfch_index_page_list->pfch_index_item_count = 0; + indexstate->iss_ScanDesc->do_prefetch = 1; + } + } +#endif /* USE_PREFETCH */ + /* * If no run-time keys to calculate, go ahead and pass the scankeys to the * index AM. 
--- src/backend/executor/instrument.c.orig 2014-05-28 08:29:09.266829330 -0400 +++ src/backend/executor/instrument.c 2014-05-28 16:45:42.882507154 -0400 @@ -41,6 +41,14 @@ InstrAlloc(int n, int instrument_options { instr[i].need_bufusage = need_buffers; instr[i].need_timer = need_timer; + instr[i].bufusage_start.aio_read_noneed = 0; + instr[i].bufusage_start.aio_read_discrd = 0; + instr[i].bufusage_start.aio_read_forgot = 0; + instr[i].bufusage_start.aio_read_noblok = 0; + instr[i].bufusage_start.aio_read_failed = 0; + instr[i].bufusage_start.aio_read_wasted = 0; + instr[i].bufusage_start.aio_read_waited = 0; + instr[i].bufusage_start.aio_read_ontime = 0; } } @@ -143,6 +151,16 @@ BufferUsageAccumDiff(BufferUsage *dst, dst->local_blks_written += add->local_blks_written - sub->local_blks_written; dst->temp_blks_read += add->temp_blks_read - sub->temp_blks_read; dst->temp_blks_written += add->temp_blks_written - sub->temp_blks_written; + + dst->aio_read_noneed += add->aio_read_noneed - sub->aio_read_noneed; + dst->aio_read_discrd += add->aio_read_discrd - sub->aio_read_discrd; + dst->aio_read_forgot += add->aio_read_forgot - sub->aio_read_forgot; + dst->aio_read_noblok += add->aio_read_noblok - sub->aio_read_noblok; + dst->aio_read_failed += add->aio_read_failed - sub->aio_read_failed; + dst->aio_read_wasted += add->aio_read_wasted - sub->aio_read_wasted; + dst->aio_read_waited += add->aio_read_waited - sub->aio_read_waited; + dst->aio_read_ontime += add->aio_read_ontime - sub->aio_read_ontime; + INSTR_TIME_ACCUM_DIFF(dst->blk_read_time, add->blk_read_time, sub->blk_read_time); INSTR_TIME_ACCUM_DIFF(dst->blk_write_time, --- src/backend/storage/buffer/Makefile.orig 2014-05-28 08:29:09.330829297 -0400 +++ src/backend/storage/buffer/Makefile 2014-05-28 16:45:42.942507396 -0400 @@ -12,6 +12,6 @@ subdir = src/backend/storage/buffer top_builddir = ../../../.. 
include $(top_builddir)/src/Makefile.global -OBJS = buf_table.o buf_init.o bufmgr.o freelist.o localbuf.o +OBJS = buf_table.o buf_init.o bufmgr.o freelist.o localbuf.o buf_async.o include $(top_srcdir)/src/backend/common.mk --- src/backend/storage/buffer/bufmgr.c.orig 2014-05-28 08:29:09.334829294 -0400 +++ src/backend/storage/buffer/bufmgr.c 2014-05-28 16:45:42.978507541 -0400 @@ -29,7 +29,7 @@ * buf_table.c -- manages the buffer lookup table */ #include "postgres.h" - +#include #include #include @@ -50,7 +50,6 @@ #include "utils/resowner_private.h" #include "utils/timestamp.h" - /* Note: these two macros only work on shared buffers, not local ones! */ #define BufHdrGetBlock(bufHdr) ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ)) #define BufferGetLSN(bufHdr) (PageGetLSN(BufHdrGetBlock(bufHdr))) @@ -63,6 +62,8 @@ #define BUF_WRITTEN 0x01 #define BUF_REUSABLE 0x02 +extern volatile struct BAiocbAnchor *BAiocbAnchr; /* anchor for all control blocks pertaining to aio */ + #define DROP_RELS_BSEARCH_THRESHOLD 20 /* GUC variables */ @@ -78,26 +79,33 @@ bool track_io_timing = false; */ int target_prefetch_pages = 0; -/* local state for StartBufferIO and related functions */ +/* local state for StartBufferIO and related functions +** but ONLY for synchronous IO - not altered for aio +*/ static volatile BufferDesc *InProgressBuf = NULL; static bool IsForInput; +pid_t this_backend_pid = 0; /* pid of this backend */ /* local state for LockBufferForCleanup */ static volatile BufferDesc *PinCountWaitBuf = NULL; - -static Buffer ReadBuffer_common(SMgrRelation reln, char relpersistence, +extern int +BufStartAsync(Relation reln, ForkNumber forkNum, BlockNumber blockNum , BufferAccessStrategy strategy); +extern int +BufCheckAsync(SMgrRelation caller_smgr, Relation caller_reln, volatile BufferDesc *buf_desc, int intention + ,BufferAccessStrategy strategy , int index_for_aio , bool spinLockHeld , LWLockId PartitionLock ); +Buffer ReadBuffer_common(SMgrRelation reln, char 
relpersistence, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy, - bool *hit); -static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy); -static void PinBuffer_Locked(volatile BufferDesc *buf); -static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner); + bool *hit , int index_for_aio); +bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy); +void PinBuffer_Locked(volatile BufferDesc *buf); +void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner); static void BufferSync(int flags); static int SyncOneBuffer(int buf_id, bool skip_recently_used); static void WaitIO(volatile BufferDesc *buf); -static bool StartBufferIO(volatile BufferDesc *buf, bool forInput); -static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, +static bool StartBufferIO(volatile BufferDesc *buf, bool forInput , int index_for_aio ); +void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, int set_flag_bits); static void shared_buffer_write_error_callback(void *arg); static void local_buffer_write_error_callback(void *arg); @@ -106,24 +114,66 @@ static volatile BufferDesc *BufferAlloc( ForkNumber forkNum, BlockNumber blockNum, BufferAccessStrategy strategy, - bool *foundPtr); + int *foundPtr , int index_for_aio ); static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln); static void AtProcExit_Buffers(int code, Datum arg); static int rnode_comparator(const void *p1, const void *p2); +BlockNumber BlocknumOfBuffer(Buffer buffer); +bool BlocknotinBuffer(Buffer buffer, Relation relation, BlockNumber blockNum); /* * PrefetchBuffer -- initiate asynchronous read of a block of a relation * - * This is named by analogy to ReadBuffer but doesn't actually allocate a - * buffer. Instead it tries to ensure that a future ReadBuffer for the given - * block will not be delayed by the I/O. Prefetching is optional. 
+ * This is named by analogy to ReadBuffer but allocates a buffer only if using asynchronous I/O. + * Its purpose is to try to ensure that a future ReadBuffer for the given block + * will not be delayed by the I/O. Prefetching is optional. * No-op if prefetching isn't compiled in. - */ -void -PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum) -{ + * + * Originally the prefetch simply called posix_fadvise() to recommend read-ahead into kernel page cache. + * Extended to provide an alternative of issuing an asynchronous aio_read() to read into a buffer. + * This extension has an implication on how this bufmgr component manages concurrent requests + * for the same disk block. + * + * Synchronous IO (read()) does not provide a means for waiting on another task's read if in progress, + * and bufmgr implements its own scheme in StartBufferIO, WaitIO, and TerminateBufferIO. + * + * Asynchronous IO (aio_read()) provides a means for waiting on this or another task's read if in progress, + * namely aio_suspend(), which this extension uses. Therefore, although StartBufferIO and TerminateBufferIO + * are called as part of asynchronous prefetching, their role is limited to maintaining the buffer desc flags, + * and they do not track the asynchronous IO itself. Instead, asynchronous IOs are tracked in + * a separate set of shared control blocks, the BufferAiocb list - + * refer to include/storage/buf_internals.h and storage/buffer/buf_init.c + * + * Another implication of asynchronous IO concerns buffer pinning. + * The buffer used for the prefetch is pinned before aio_read is issued. + * It is expected that the same task (and possibly others) will later ask to read the page + * and eventually release and unpin the buffer. 
+ * However, if the task which issued the aio_read later decides not to read the page, + * and return code indicates delta_pin_count > 0 (see below) + * it *must* instead issue a DiscardBuffer() (see function later in this file) + * so that its pin is released. + * Therefore, each client which uses the PrefetchBuffer service must either always read all + * prefetched pages, or keep track of prefetched pages and discard unread ones at end of scan. + * + * return code: is an int bitmask defined in bufmgr.h + PREFTCHRC_BUF_PIN_INCREASED 0x01 pin count on buffer has been increased by 1 + PREFTCHRC_BLK_ALREADY_PRESENT 0x02 block was already present in a buffer + * + * PREFTCHRC_BLK_ALREADY_PRESENT is a hint to caller that the prefetch may be unnecessary + */ +int +PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum , BufferAccessStrategy strategy) +{ + Buffer buf_id; /* indicates buffer containing the requested block */ + int PrefetchBufferRc = 0; /* return value as described above */ + int PinCountOnEntry = 0; /* pin count on entry */ + int PinCountdelta = 0; /* pin count delta increase */ + + #ifdef USE_PREFETCH + + buf_id = -1; Assert(RelationIsValid(reln)); Assert(BlockNumberIsValid(blockNum)); @@ -145,8 +195,13 @@ PrefetchBuffer(Relation reln, ForkNumber { BufferTag newTag; /* identity of requested block */ uint32 newHash; /* hash value for newTag */ + int BufStartAsyncrc = -1; /* retcode from BufStartAsync : + ** 0 if started successfully (which implies buffer was newly pinned ) + ** -1 if failed for some reason + ** 1+PrivateRefCount if we found desired buffer in buffer pool + ** and we set it likewise if we find buffer in buffer pool + */ LWLock *newPartitionLock; /* buffer partition lock for it */ - int buf_id; /* create a tag so we can lookup the buffer */ INIT_BUFFERTAG(newTag, reln->rd_smgr->smgr_rnode.node, @@ -158,28 +213,119 @@ PrefetchBuffer(Relation reln, ForkNumber /* see if the block is in the buffer pool already */ 
LWLockAcquire(newPartitionLock, LW_SHARED); - buf_id = BufTableLookup(&newTag, newHash); + buf_id = (Buffer)BufTableLookup(&newTag, newHash); + if (buf_id >= 0) { + PinCountOnEntry = PrivateRefCount[buf_id]; /* pin count on entry */ + BufStartAsyncrc = 1 + PinCountOnEntry; /* indicate this backends pin count - see above comment */ + PrefetchBufferRc = PREFTCHRC_BLK_ALREADY_PRESENT; /* indicate buffer present */ + } else { + PrefetchBufferRc = 0; /* indicate buffer not present */ + } LWLockRelease(newPartitionLock); + not_in_buffers: /* If not in buffers, initiate prefetch */ - if (buf_id < 0) + if (buf_id < 0) { + /* try using async aio_read with a buffer */ +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP + BufStartAsyncrc = BufStartAsync( reln, forkNum, blockNum , strategy ); + if (BufStartAsyncrc < 0) { + pgBufferUsage.aio_read_noblok++; + } +#else /* not USE_AIO_ATOMIC_BUILTIN_COMP_SWAP so try the alternative that does not read the block into a postgresql buffer */ smgrprefetch(reln->rd_smgr, forkNum, blockNum); +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ + } - /* - * If the block *is* in buffers, we do nothing. This is not really - * ideal: the block might be just about to be evicted, which would be - * stupid since we know we are going to need it soon. But the only - * easy answer is to bump the usage_count, which does not seem like a - * great solution: when the caller does ultimately touch the block, - * usage_count would get bumped again, resulting in too much - * favoritism for blocks that are involved in a prefetch sequence. A - * real fix would involve some additional per-buffer state, and it's - * not clear that there's enough of a problem to justify that. + if ( (buf_id >= 0) || (BufStartAsyncrc >= 1) ) { + /* The block *is* in buffers. */ +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP + pgBufferUsage.aio_read_noneed++; +#ifndef USE_PREFETCH_BUT_DONT_PIN_ALREADY_PRESENT /* jury is out on whether the following wins but it ought to ... 
*/ + /* + ** If this backend already had pinned it, + ** or another backend had banked a pin on it, + ** or there is an IO in progress, + ** or it is not marked valid, + ** then do nothing. + ** Otherwise pin it and mark the buffer's pin as banked by this backend. + ** Note - it may or not be pinned by another backend - + ** it is ok for us to bank a pin on it + ** *provided* the other backend did not bank its pin. + ** The reason for this is that the banked-pin indicator is global - + ** it can identify at most one process. + */ + /* pgBufferUsage.aio_read_wasted++; overload counter - only for debugging */ + if (BufStartAsyncrc == 1) { /* not pinned by me */ + /* pgBufferUsage.aio_read_wasted++; overload counter - only for debugging */ + /* note - all we can say with certainty is that the buffer is not pinned by me + ** we cannot be sure that it is still in buffer pool + ** so must go through the entire locking and searching all over again ... */ + LWLockAcquire(newPartitionLock, LW_SHARED); + buf_id = (Buffer)BufTableLookup(&newTag, newHash); + /* If in buffers, proceed */ + if (buf_id >= 0) { + /* since the block is now present, + ** save the current pin count to ensure final delta is calculated correctly + */ + PinCountOnEntry = PrivateRefCount[buf_id]; /* pin count on entry */ + if ( PinCountOnEntry == 0) { /* paranoid check it's still not pinned by me */ + volatile BufferDesc *buf_desc; + + buf_desc = &BufferDescriptors[buf_id]; /* found buffer descriptor */ + LockBufHdr(buf_desc); + if ( (buf_desc->flags & BM_VALID) /* buffer is valid */ + && (!(buf_desc->flags & (BM_IO_IN_PROGRESS|BM_AIO_IN_PROGRESS|BM_AIO_PREFETCH_PIN_BANKED))) /* buffer is not any of ... 
*/ + ) { + buf_desc->flags |= BM_AIO_PREFETCH_PIN_BANKED; /* bank the pin for next use by this task */ + /* note - we can call PinBuffer_Locked with the BM_AIO_PREFETCH_PIN_BANKED flag set because it is not yet pinned by me */ + buf_desc->freeNext = -(this_backend_pid); /* remember which pid banked it */ + /* pgBufferUsage.aio_read_wasted--; overload counter - not wasted after all - only for debugging */ + + /* Make sure we will have room to remember the buffer pin */ + ResourceOwnerEnlargeBuffers(CurrentResourceOwner); + + PinBuffer_Locked(buf_desc); } -#endif /* USE_PREFETCH */ + else { + UnlockBufHdr(buf_desc); + } + } + } + LWLockRelease(newPartitionLock); + /* although unlikely, maybe it was evicted while we were puttering about */ + if (buf_id < 0) { + pgBufferUsage.aio_read_noneed--; /* back out the accounting */ + goto not_in_buffers; /* and try again */ + } + } +#endif /* USE_PREFETCH_BUT_DONT_PIN_ALREADY_PRESENT */ + +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ + + } + + if (buf_id >= 0) { + PinCountdelta = PrivateRefCount[buf_id] - PinCountOnEntry; /* pin count delta increase */ + if ( (PinCountdelta < 0) || (PinCountdelta > 1) ) { + elog(ERROR, + "PrefetchBuffer #%d : incremented pin count by %d on bufdesc %p refcount %u localpins %d\n" + ,(buf_id+1) , PinCountdelta , &BufferDescriptors[buf_id] ,BufferDescriptors[buf_id].refcount , PrivateRefCount[buf_id]); } + } else + if (BufStartAsyncrc == 0) { /* aio started successfully (which implies buffer was newly pinned ) */ + PinCountdelta = 1; + } + + /* set final PrefetchBufferRc according to previous value */ + PrefetchBufferRc |= PinCountdelta; /* set the PREFTCHRC_BUF_PIN_INCREASED bit */ + } + +#endif /* USE_PREFETCH */ + return PrefetchBufferRc; /* return value as described above */ +} /* * ReadBuffer -- a shorthand for ReadBufferExtended, for reading from main @@ -252,7 +398,7 @@ ReadBufferExtended(Relation reln, ForkNu */ pgstat_count_buffer_read(reln); buf = ReadBuffer_common(reln->rd_smgr, 
reln->rd_rel->relpersistence, - forkNum, blockNum, mode, strategy, &hit); + forkNum, blockNum, mode, strategy, &hit , 0); if (hit) pgstat_count_buffer_hit(reln); return buf; @@ -280,7 +426,7 @@ ReadBufferWithoutRelcache(RelFileNode rn Assert(InRecovery); return ReadBuffer_common(smgr, RELPERSISTENCE_PERMANENT, forkNum, blockNum, - mode, strategy, &hit); + mode, strategy, &hit , 0); } @@ -288,15 +434,18 @@ ReadBufferWithoutRelcache(RelFileNode rn * ReadBuffer_common -- common logic for all ReadBuffer variants * * *hit is set to true if the request was satisfied from shared buffer cache. + * index_for_aio , if -ve , is negative of ( index of the aiocb in the BufferAiocbs array + 3 ) + * which is passed through to StartBufferIO */ -static Buffer +Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, - BufferAccessStrategy strategy, bool *hit) + BufferAccessStrategy strategy, bool *hit , int index_for_aio ) { volatile BufferDesc *bufHdr; Block bufBlock; bool found; + int allocrc; /* retcode from BufferAlloc */ bool isExtend; bool isLocalBuf = SmgrIsTemp(smgr); @@ -328,16 +477,40 @@ ReadBuffer_common(SMgrRelation smgr, cha } else { + allocrc = mode; /* pass mode to BufferAlloc since it must not wait for async io if RBM_NOREAD_FOR_PREFETCH */ /* * lookup the buffer. IO_IN_PROGRESS is set if the requested block is * not currently in memory. 
*/ bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum, - strategy, &found); - if (found) - pgBufferUsage.shared_blks_hit++; + strategy, &allocrc , index_for_aio ); + if (allocrc < 0) { + if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages) + { + ereport(WARNING, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page header in block %u of relation %s; zeroing out page", + blockNum, + relpath(smgr->smgr_rnode, forkNum)))); + bufBlock = BufHdrGetBlock(bufHdr); + MemSet((char *) bufBlock, 0, BLCKSZ); + } else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page header in block %u of relation %s", + blockNum, + relpath(smgr->smgr_rnode, forkNum)))); + found = true; + } + else if (allocrc > 0) { + pgBufferUsage.shared_blks_hit++; + found = true; + } + else { pgBufferUsage.shared_blks_read++; + found = false; + } } /* At this point we do NOT hold any locks. */ @@ -410,7 +583,7 @@ ReadBuffer_common(SMgrRelation smgr, cha Assert(bufHdr->flags & BM_VALID); bufHdr->flags &= ~BM_VALID; UnlockBufHdr(bufHdr); - } while (!StartBufferIO(bufHdr, true)); + } while (!StartBufferIO(bufHdr, true, 0)); } } @@ -430,6 +603,7 @@ ReadBuffer_common(SMgrRelation smgr, cha bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); + if (mode != RBM_NOREAD_FOR_PREFETCH) { if (isExtend) { /* new buffers are zero-filled */ @@ -499,6 +673,7 @@ ReadBuffer_common(SMgrRelation smgr, cha VacuumPageMiss++; if (VacuumCostActive) VacuumCostBalance += VacuumCostPageMiss; + } TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, smgr->smgr_rnode.node.spcNode, @@ -520,21 +695,39 @@ ReadBuffer_common(SMgrRelation smgr, cha * the default strategy. The selected buffer's usage_count is advanced when * using the default strategy, but otherwise possibly not (see PinBuffer). * - * The returned buffer is pinned and is already marked as holding the - * desired page. If it already did have the desired page, *foundPtr is - * set TRUE. 
Otherwise, *foundPtr is set FALSE and the buffer is marked + * index_for_aio is an input parm which, if non-zero, identifies a BufferAiocb + * acquired by caller and to be used for any StartBufferIO performed by this routine. + * In this case, if block not found in buffer pool and we allocate a new buffer, + * then we must maintain the spinlock on the buffer and pass it back to caller. + * + * foundPtr is input and output : + * . input - indicates the read-buffer mode ( see bufmgr.h ) + * . output - indicates the status of the buffer - see below + * + * Except for the case of RBM_NOREAD_FOR_PREFETCH and buffer is found, + * the returned buffer is pinned and is already marked as holding the + * desired page. + * If it already did have the desired page and page content is valid, + * *foundPtr is set to 1 + * If it already did have the desired page and mode is RBM_NOREAD_FOR_PREFETCH + * and StartBufferIO returned false + * (meaning it could not initialise the buffer for aio) + * *foundPtr is set to 2 + * If it already did have the desired page but page content is invalid, + * *foundPtr is set to -1 + * this can happen only if the buffer was read by an async read + * and the aio is still in progress or pinned by the issuer of the startaio. + * Otherwise, *foundPtr is set to 0 and the buffer is marked * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it. * - * *foundPtr is actually redundant with the buffer's BM_VALID flag, but - * we keep it for simplicity in ReadBuffer. - * - * No locks are held either at entry or exit. + * No locks are held either at entry or exit EXCEPT for case noted above + * of passing an empty buffer back to async io caller ( index_for_aio set ). 
*/ static volatile BufferDesc * BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, BlockNumber blockNum, BufferAccessStrategy strategy, - bool *foundPtr) + int *foundPtr , int index_for_aio ) { BufferTag newTag; /* identity of requested block */ uint32 newHash; /* hash value for newTag */ @@ -546,6 +739,13 @@ BufferAlloc(SMgrRelation smgr, char relp int buf_id; volatile BufferDesc *buf; bool valid; + int IntentionBufferrc; /* retcode from BufCheckAsync */ + bool StartBufferIOrc; /* retcode from StartBufferIO */ + ReadBufferMode mode; + + + mode = *foundPtr; + *foundPtr = 0; /* create a tag so we can lookup the buffer */ INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum); @@ -560,21 +760,53 @@ BufferAlloc(SMgrRelation smgr, char relp if (buf_id >= 0) { /* - * Found it. Now, pin the buffer so no one can steal it from the - * buffer pool, and check to see if the correct data has been loaded - * into the buffer. + * Found it. */ + *foundPtr = 1; buf = &BufferDescriptors[buf_id]; - valid = PinBuffer(buf, strategy); - - /* Can release the mapping lock as soon as we've pinned it */ + /* If prefetch mode, then return immediately indicating found, + ** and NOTE in this case only, we did not pin buffer. + ** In theory we might try to check whether the buffer is valid, io in progress, etc + ** but in practice it is simpler to abandon the prefetch if the buffer exists + */ + if (mode == RBM_NOREAD_FOR_PREFETCH) { + /* release the mapping lock and return */ LWLockRelease(newPartitionLock); + } else { + /* note that the current request is for same tag as the one associated with the aio - + ** so simply complete the aio and we have our buffer. + ** If an aio was started on this buffer, + ** check complete and wait for it if not. 
+ ** And, if aio had been started, then the task + ** which issued the start aio already pinned it for this read, + ** so if that task was me and the aio was successful, + ** pass the current pin to this read without dropping and re-acquiring. + ** this is all done by BufCheckAsync + */ + IntentionBufferrc = BufCheckAsync(smgr , 0 , buf, BUF_INTENTION_WANT , strategy , index_for_aio , false , newPartitionLock ); - *foundPtr = TRUE; + /* check to see if the correct data has been loaded into the buffer. */ + valid = (IntentionBufferrc == BUF_INTENT_RC_VALID); - if (!valid) - { + /* check for serious IO errors */ + if (!valid) { + if ( (IntentionBufferrc != BUF_INTENT_RC_INVALID_NO_AIO) + && (IntentionBufferrc != BUF_INTENT_RC_INVALID_AIO) + ) { + *foundPtr = -1; /* inform caller of serious error */ + } + else + if (IntentionBufferrc == BUF_INTENT_RC_INVALID_AIO) { + goto proceed_with_not_found; /* yes, I know, a goto ... think of it as a break out of the if */ + } + } + + /* BufCheckAsync pinned the buffer */ + /* so can now release the mapping lock */ + LWLockRelease(newPartitionLock); + + if (!valid) { /* * We can only get here if (a) someone else is still reading in * the page, or (b) a previous read attempt failed. We have to @@ -582,19 +814,21 @@ BufferAlloc(SMgrRelation smgr, char relp * own read attempt if the page is still not BM_VALID. * StartBufferIO does it all. */ - if (StartBufferIO(buf, true)) + if (StartBufferIO(buf, true, index_for_aio)) { /* * If we get here, previous attempts to read the buffer must * have failed ... but we shall bravely try again. */ - *foundPtr = FALSE; + *foundPtr = 0; + } } } return buf; } + proceed_with_not_found: /* * Didn't find it in the buffer pool. We'll have to initialize a new * buffer. Remember to unlock the mapping lock while doing the work. 
@@ -619,8 +853,10 @@ BufferAlloc(SMgrRelation smgr, char relp /* Must copy buffer flags while we still hold the spinlock */ oldFlags = buf->flags; - /* Pin the buffer and then release the buffer spinlock */ - PinBuffer_Locked(buf); + /* If an aio was started on this buffer, + ** check complete and cancel it if not. + */ + BufCheckAsync(smgr , 0 , buf, BUF_INTENTION_REJECT_OBTAIN_PIN , 0 , index_for_aio, true , 0 ); /* Now it's safe to release the freelist lock */ if (lock_held) @@ -791,13 +1027,18 @@ BufferAlloc(SMgrRelation smgr, char relp * then set up our own read attempt if the page is still not * BM_VALID. StartBufferIO does it all. */ - if (StartBufferIO(buf, true)) + StartBufferIOrc = StartBufferIO(buf, true , index_for_aio); /* retcode from StartBufferIO */ + if (StartBufferIOrc) { /* * If we get here, previous attempts to read the buffer * must have failed ... but we shall bravely try again. */ - *foundPtr = FALSE; + *foundPtr = 0; + } else + if (mode == RBM_NOREAD_FOR_PREFETCH) { + UnpinBuffer(buf, true); /* must unpin if RBM_NOREAD_FOR_PREFETCH and found */ + *foundPtr = 2; /* inform BufStartAsync that buffer must not be used */ } } @@ -860,10 +1101,17 @@ BufferAlloc(SMgrRelation smgr, char relp * lock. If StartBufferIO returns false, then someone else managed to * read it before we did, so there's nothing left for BufferAlloc() to do. */ - if (StartBufferIO(buf, true)) - *foundPtr = FALSE; - else - *foundPtr = TRUE; + StartBufferIOrc = StartBufferIO(buf, true , index_for_aio); /* retcode from StartBufferIO */ + if (StartBufferIOrc) { + *foundPtr = 0; + } else { + if (mode == RBM_NOREAD_FOR_PREFETCH) { + UnpinBuffer(buf, true); /* must unpin if RBM_NOREAD_FOR_PREFETCH and found */ + *foundPtr = 2; /* inform BufStartAsync that buffer must not be used */ + } else { + *foundPtr = 1; + } + } return buf; } @@ -970,6 +1218,10 @@ retry: /* * Insert the buffer at the head of the list of free buffers. 
*/ + /* avoid confusing freelist with strange-looking freeNext */ + if (buf->freeNext <= FREENEXT_BAIOCB_ORIGIN) { /* means was used for aiocb index */ + buf->freeNext = FREENEXT_NOT_IN_LIST; + } StrategyFreeBuffer(buf); } @@ -1022,6 +1274,56 @@ MarkBufferDirty(Buffer buffer) UnlockBufHdr(bufHdr); } +/* return the block number recorded in the tag of the given buffer, or 0 if the buffer number is not valid (so a result of 0 does not by itself identify block 0) +** if a shared buffer, it must be pinned +*/ +BlockNumber +BlocknumOfBuffer(Buffer buffer) +{ + volatile BufferDesc *bufHdr; + BlockNumber rc = 0; + + if (BufferIsValid(buffer)) { + if (BufferIsLocal(buffer)) { + bufHdr = &LocalBufferDescriptors[-buffer - 1]; + } else { + bufHdr = &BufferDescriptors[buffer - 1]; + } + + rc = bufHdr->tag.blockNum; + } + + return rc; +} + +/* report whether specified buffer holds something other than block blockNum of the main fork of relation : true if block number, relfilenode or fork differs, false if all three match or the buffer number is not valid +** if a shared buffer, it must be pinned +*/ +bool +BlocknotinBuffer(Buffer buffer, + Relation relation, + BlockNumber blockNum) +{ + volatile BufferDesc *bufHdr; + bool rc = false; + + if (BufferIsValid(buffer)) { + if (BufferIsLocal(buffer)) { + bufHdr = &LocalBufferDescriptors[-buffer - 1]; + } else { + bufHdr = &BufferDescriptors[buffer - 1]; + } + + rc = + ( (bufHdr->tag.blockNum != blockNum) + || (!(RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) )) + || (bufHdr->tag.forkNum != MAIN_FORKNUM) + ); + } + + return rc; +} + /* * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer() * @@ -1040,18 +1342,18 @@ ReleaseAndReadBuffer(Buffer buffer, Relation relation, BlockNumber blockNum) { - ForkNumber forkNum = MAIN_FORKNUM; volatile BufferDesc *bufHdr; + bool isDifferentBlock; /* requesting different block from that already in buffer ? */ if (BufferIsValid(buffer)) { + /* if a shared buff, we have pin, so it's ok to examine tag without spinlock */ + isDifferentBlock = BlocknotinBuffer(buffer,relation,blockNum); /* requesting different block from that already in buffer ?
*/ if (BufferIsLocal(buffer)) { Assert(LocalRefCount[-buffer - 1] > 0); bufHdr = &LocalBufferDescriptors[-buffer - 1]; - if (bufHdr->tag.blockNum == blockNum && - RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) && - bufHdr->tag.forkNum == forkNum) + if (!isDifferentBlock) return buffer; ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer); LocalRefCount[-buffer - 1]--; @@ -1060,12 +1362,12 @@ ReleaseAndReadBuffer(Buffer buffer, { Assert(PrivateRefCount[buffer - 1] > 0); bufHdr = &BufferDescriptors[buffer - 1]; - /* we have pin, so it's ok to examine tag without spinlock */ - if (bufHdr->tag.blockNum == blockNum && - RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) && - bufHdr->tag.forkNum == forkNum) + BufCheckAsync(0 , relation , bufHdr , ( isDifferentBlock ? BUF_INTENTION_REJECT_FORGET + : BUF_INTENTION_REJECT_KEEP_PIN ) + , 0 , 0 , false , 0 ); /* end any IO and maybe unpin */ + if (!isDifferentBlock) { return buffer; - UnpinBuffer(bufHdr, true); + } } } @@ -1090,11 +1392,12 @@ ReleaseAndReadBuffer(Buffer buffer, * Returns TRUE if buffer is BM_VALID, else FALSE. This provision allows * some callers to avoid an extra spinlock cycle. */ -static bool +bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy) { int b = buf->buf_id; bool result; + bool pin_already_banked_by_me = 0; /* buffer is already pinned by me and redeemable */ if (PrivateRefCount[b] == 0) { @@ -1116,12 +1419,34 @@ PinBuffer(volatile BufferDesc *buf, Buff else { /* If we previously pinned the buffer, it must surely be valid */ + /* Errr - is that really true ??? I don't think so : + ** what if I pin, try an IO, in progress, then mistakenly pin again result = true; + */ + LockBufHdr(buf); + pin_already_banked_by_me = ( (PrivateRefCount[b] > 0) && (buf->flags & BM_AIO_PREFETCH_PIN_BANKED) + && ( ( (buf->flags & BM_AIO_IN_PROGRESS) ? 
( ((BAiocbAnchr->BufferAiocbs)+(FREENEXT_BAIOCB_ORIGIN - buf->freeNext))->pidOfAio ) + : (-(buf->freeNext)) ) == this_backend_pid ) + ); + if (pin_already_banked_by_me) { + elog(LOG, "PinBuffer : on buffer %d with banked pin rel=%s, blockNum=%u, with flags %X refcount=%u" + ,buf->buf_id,relpathbackend(buf->tag.rnode, InvalidBackendId, buf->tag.forkNum) + ,buf->tag.blockNum, buf->flags, buf->refcount); + buf->flags &= ~BM_AIO_PREFETCH_PIN_BANKED; + if (!(buf->flags & BM_AIO_IN_PROGRESS)) { + buf->freeNext = FREENEXT_NOT_IN_LIST; /* forget the bank client */ } + } + result = (buf->flags & BM_VALID) != 0; + UnlockBufHdr(buf); + } + + if (!pin_already_banked_by_me) { PrivateRefCount[b]++; Assert(PrivateRefCount[b] > 0); ResourceOwnerRememberBuffer(CurrentResourceOwner, BufferDescriptorGetBuffer(buf)); + } return result; } @@ -1138,19 +1463,36 @@ PinBuffer(volatile BufferDesc *buf, Buff * to save a spin lock/unlock cycle, because we need to pin a buffer before * its state can change under us. */ -static void +void PinBuffer_Locked(volatile BufferDesc *buf) { int b = buf->buf_id; + bool pin_already_banked_by_me; /* buffer is already pinned by me and redeemable */ - if (PrivateRefCount[b] == 0) + pin_already_banked_by_me = ( (PrivateRefCount[b] > 0) && (buf->flags & BM_AIO_PREFETCH_PIN_BANKED) + && ( ( (buf->flags & BM_AIO_IN_PROGRESS) ? 
( ((BAiocbAnchr->BufferAiocbs)+(FREENEXT_BAIOCB_ORIGIN - buf->freeNext))->pidOfAio ) + : (-(buf->freeNext)) ) == this_backend_pid ) + ); + if (PrivateRefCount[b] == 0) { buf->refcount++; + } + if (pin_already_banked_by_me) { + elog(LOG, "PinBuffer_Locked : on buffer %d with banked pin rel=%s, blockNum=%u, with flags %X refcount=%u" + ,buf->buf_id,relpathbackend(buf->tag.rnode, InvalidBackendId, buf->tag.forkNum) + ,buf->tag.blockNum, buf->flags, buf->refcount); + buf->flags &= ~BM_AIO_PREFETCH_PIN_BANKED; + if (!(buf->flags & BM_AIO_IN_PROGRESS)) { + buf->freeNext = FREENEXT_NOT_IN_LIST; /* forget the bank client */ + } + } UnlockBufHdr(buf); + if (!pin_already_banked_by_me) { PrivateRefCount[b]++; Assert(PrivateRefCount[b] > 0); ResourceOwnerRememberBuffer(CurrentResourceOwner, BufferDescriptorGetBuffer(buf)); } +} /* * UnpinBuffer -- make buffer available for replacement. @@ -1160,29 +1502,68 @@ PinBuffer_Locked(volatile BufferDesc *bu * Most but not all callers want CurrentResourceOwner to be adjusted. * Those that don't should pass fixOwner = FALSE. */ -static void +void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner) { + int b = buf->buf_id; + bool pin_already_banked_by_me; /* buffer is already pinned by me and redeemable */ - if (fixOwner) + if (fixOwner) { ResourceOwnerForgetBuffer(CurrentResourceOwner, BufferDescriptorGetBuffer(buf)); + } Assert(PrivateRefCount[b] > 0); PrivateRefCount[b]--; if (PrivateRefCount[b] == 0) { + /* I'd better not still hold any locks on the buffer */ Assert(!LWLockHeldByMe(buf->content_lock)); Assert(!LWLockHeldByMe(buf->io_in_progress_lock)); LockBufHdr(buf); + /* this backend has released last pin - buffer should not have pin banked by me + ** and if AIO in progress then there should be another backend pin + */ + pin_already_banked_by_me = ( (buf->flags & BM_AIO_PREFETCH_PIN_BANKED) + && ( ( (buf->flags & BM_AIO_IN_PROGRESS) + ? 
( ((BAiocbAnchr->BufferAiocbs)+(FREENEXT_BAIOCB_ORIGIN - buf->freeNext))->pidOfAio ) + : (-(buf->freeNext)) + ) == this_backend_pid ) + ); + if (pin_already_banked_by_me) { + /* this is a strange situation - caller had a banked pin (which callers are supposed not to know about) + ** but either discovered it had it or has over-counted how many pins it has + */ + buf->flags &= ~BM_AIO_PREFETCH_PIN_BANKED; /* redeem the pin although it is now of no use since about to release */ + if (!(buf->flags & BM_AIO_IN_PROGRESS)) { + buf->freeNext = FREENEXT_NOT_IN_LIST; /* forget the bank client */ + } + + /* temporarily suppress logging error to avoid performance degradation - + ** either this task really does not need the buffer in which case the error is harmless + ** or a more severe error will be detected later (possibly immediately below) + elog(LOG, "UnpinBuffer : released last this-backend pin on buffer %d rel=%s, blockNum=%u, but had banked pin flags %X refcount=%u" + ,buf->buf_id,relpathbackend(buf->tag.rnode, InvalidBackendId, buf->tag.forkNum) + ,buf->tag.blockNum, buf->flags, buf->refcount); + */ + } + /* Decrement the shared reference count */ Assert(buf->refcount > 0); buf->refcount--; + if ( (buf->refcount == 0) && (buf->flags & BM_AIO_IN_PROGRESS) ) { + UnlockBufHdr(buf); /* elog(ERROR) does not return here and error cleanup never releases spinlocks, so drop the header spinlock taken by LockBufHdr above before reporting; the values printed below are then an unlocked snapshot */ + elog(ERROR, "UnpinBuffer : released last any-backend pin on buffer %d rel=%s, blockNum=%u, but AIO in progress flags %X refcount=%u" + ,buf->buf_id,relpathbackend(buf->tag.rnode, InvalidBackendId, buf->tag.forkNum) + ,buf->tag.blockNum, buf->flags, buf->refcount); + } + + /* Support LockBufferForCleanup() */ if ((buf->flags & BM_PIN_COUNT_WAITER) && buf->refcount == 1) @@ -1657,6 +2038,7 @@ SyncOneBuffer(int buf_id, bool skip_rece volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id]; int result = 0; + /* * Check whether buffer needs writing.
* @@ -1789,6 +2171,8 @@ PrintBufferLeakWarning(Buffer buffer) char *path; BackendId backend; + + Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) { @@ -1799,12 +2183,28 @@ PrintBufferLeakWarning(Buffer buffer) else { buf = &BufferDescriptors[buffer - 1]; +#ifdef USE_PREFETCH + /* If reason that this buffer is pinned + ** is that it was prefetched with async_io + ** and never read or discarded, then omit the + ** warning, because this is expected in some + ** cases when a scan is closed abnormally. + ** Note that the buffer will be released soon by our caller. + */ + if (buf->flags & BM_AIO_PREFETCH_PIN_BANKED) { + pgBufferUsage.aio_read_forgot++; /* account for it */ + return; + } +#endif /* USE_PREFETCH */ loccount = PrivateRefCount[buffer - 1]; backend = InvalidBackendId; } +/* #if defined(DO_ISCAN_DISCARD_BUFFER) && defined(DO_HEAPSCAN_DISCARD_BUFFER) not yet working safely 130530 */ /* theoretically we should lock the bufhdr here */ path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum); + + elog(WARNING, "buffer refcount leak: [%03d] " "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)", @@ -1812,6 +2212,7 @@ PrintBufferLeakWarning(Buffer buffer) buf->tag.blockNum, buf->flags, buf->refcount, loccount); pfree(path); +/* #endif defined(DO_ISCAN_DISCARD_BUFFER) && defined(DO_HEAPSCAN_DISCARD_BUFFER) not yet working safely 130530 */ } /* @@ -1928,7 +2329,7 @@ FlushBuffer(volatile BufferDesc *buf, SM * false, then someone else flushed the buffer before we could, so we need * not do anything. */ - if (!StartBufferIO(buf, false)) + if (!StartBufferIO(buf, false, 0)) return; /* Setup error traceback support for ereport() */ @@ -2512,6 +2913,70 @@ FlushDatabaseBuffers(Oid dbid) } } +#ifdef USE_PREFETCH +/* + * DiscardBuffer -- discard shared buffer used for a previously + * prefetched but unread block of a relation + * + * If the buffer is found and pinned with a banked pin, then : + * . if AIO in progress, terminate AIO without waiting + * . 
if AIO had already completed successfully, + * then mark buffer valid (in case someone else wants it) + * . redeem the banked pin and unpin it. + * + * This function is similar in purpose to ReleaseBuffer (below) + * but sufficiently different that it is a separate function. + * Two important differences are : + * . caller identifies buffer by blocknumber, not buffer number + * . we unpin buffer *only* if the pin is banked, + * *never* if pinned but not banked. + * This is essential as caller may perform a sequence of + * SCAN1 . PrefetchBuffer (and remember block was prefetched) + * SCAN2 . ReadBuffer (but fails to connect this read to the prefetch by SCAN1) + * SCAN1 . DiscardBuffer (SCAN1 terminates early) + * SCAN2 . access tuples in buffer + * Clearly the Discard *must not* unpin the buffer since SCAN2 needs it! + * + * + * caller may pass InvalidBlockNumber as blockNum to mean do nothing; relations for which SmgrIsTemp() holds are ignored as well + */ +void +DiscardBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum) +{ + BufferTag newTag; /* identity of requested block */ + uint32 newHash; /* hash value for newTag */ + LWLockId newPartitionLock; /* buffer partition lock for it */ + Buffer buf_id; + volatile BufferDesc *buf_desc; + + Assert(RelationIsValid(reln)); /* check before the first dereference of reln, not after it */ + RelationOpenSmgr(reln); /* open at the smgr level if not already done, so that rd_smgr is not NULL when dereferenced below */ + if (!SmgrIsTemp(reln->rd_smgr)) { + if (BlockNumberIsValid(blockNum)) { + /* create a tag so we can lookup the buffer */ + INIT_BUFFERTAG(newTag, reln->rd_smgr->smgr_rnode.node, + forkNum, blockNum); + + /* determine its hash code and partition lock ID */ + newHash = BufTableHashCode(&newTag); + newPartitionLock = BufMappingPartitionLock(newHash); + + /* see if the block is in the buffer pool already */ + LWLockAcquire(newPartitionLock, LW_SHARED); + buf_id = BufTableLookup(&newTag, newHash); + LWLockRelease(newPartitionLock); + + /* If in buffers, proceed */ + if (buf_id >= 0) { + buf_desc = &BufferDescriptors[buf_id]; /* found buffer descriptor */ + BufCheckAsync(0 , reln, buf_desc , BUF_INTENTION_REJECT_UNBANK , 0 , 0 , false , 0); /* end
the IO and unpin if banked */ + pgBufferUsage.aio_read_discrd++; /* account for it */ + } + } + } +} +#endif /* USE_PREFETCH */ + /* * ReleaseBuffer -- release the pin on a buffer */ @@ -2520,26 +2985,23 @@ ReleaseBuffer(Buffer buffer) { volatile BufferDesc *bufHdr; + if (!BufferIsValid(buffer)) elog(ERROR, "bad buffer ID: %d", buffer); - ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer); if (BufferIsLocal(buffer)) { + ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer); Assert(LocalRefCount[-buffer - 1] > 0); LocalRefCount[-buffer - 1]--; return; } - - bufHdr = &BufferDescriptors[buffer - 1]; - - Assert(PrivateRefCount[buffer - 1] > 0); - - if (PrivateRefCount[buffer - 1] > 1) - PrivateRefCount[buffer - 1]--; else - UnpinBuffer(bufHdr, false); + { + bufHdr = &BufferDescriptors[buffer - 1]; + BufCheckAsync(0 , 0 , bufHdr , BUF_INTENTION_REJECT_NOADJUST , 0 , 0 , false , 0 ); + } } /* @@ -2565,14 +3027,41 @@ UnlockReleaseBuffer(Buffer buffer) void IncrBufferRefCount(Buffer buffer) { + bool pin_already_banked_by_me = false; /* buffer is already pinned by me and redeemable */ + volatile BufferDesc *buf; /* descriptor for a shared buffer */ + Assert(BufferIsPinned(buffer)); + + if (!(BufferIsLocal(buffer))) { + buf = &BufferDescriptors[buffer - 1]; + LockBufHdr(buf); + pin_already_banked_by_me = + ( (buf->flags & BM_AIO_PREFETCH_PIN_BANKED) + && ( ( (buf->flags & BM_AIO_IN_PROGRESS) ? 
( ((BAiocbAnchr->BufferAiocbs)+(FREENEXT_BAIOCB_ORIGIN - buf->freeNext))->pidOfAio ) + : (-(buf->freeNext)) ) == this_backend_pid ) + ); + } + + if (!pin_already_banked_by_me) { ResourceOwnerEnlargeBuffers(CurrentResourceOwner); ResourceOwnerRememberBuffer(CurrentResourceOwner, buffer); + } + if (BufferIsLocal(buffer)) LocalRefCount[-buffer - 1]++; - else + else { + if (pin_already_banked_by_me) { + buf->flags &= ~BM_AIO_PREFETCH_PIN_BANKED; + if (!(buf->flags & BM_AIO_IN_PROGRESS)) { + buf->freeNext = FREENEXT_NOT_IN_LIST; /* forget the bank client */ + } + } + UnlockBufHdr(buf); + if (!pin_already_banked_by_me) { PrivateRefCount[buffer - 1]++; } + } +} /* * MarkBufferDirtyHint @@ -2994,61 +3483,138 @@ WaitIO(volatile BufferDesc *buf) * * In some scenarios there are race conditions in which multiple backends * could attempt the same I/O operation concurrently. If someone else - * has already started I/O on this buffer then we will block on the + * has already started synchronous I/O on this buffer then we will block on the * io_in_progress lock until he's done. * + * if an async io is in progress and we are doing synchronous io, + * then readbuffer uses call to smgrcompleteaio to wait, + * and so we treat this request as if no io in progress + * * Input operations are only attempted on buffers that are not BM_VALID, * and output operations only on buffers that are BM_VALID and BM_DIRTY, * so we can always tell if the work is already done. * + * index_for_aio is an input parm which, if non-zero, identifies a BufferAiocb + * acquired by caller and to be attached to the buffer header for use with async io + * * Returns TRUE if we successfully marked the buffer as I/O busy, * FALSE if someone else already did the work. 
*/ static bool -StartBufferIO(volatile BufferDesc *buf, bool forInput) +StartBufferIO(volatile BufferDesc *buf, bool forInput , int index_for_aio ) { +#ifdef USE_PREFETCH + struct BufferAiocb volatile * BAiocb = (struct BufferAiocb*)0; /* BufferAiocb for use with aio */ +#endif /* USE_PREFETCH */ + + if (!index_for_aio) Assert(!InProgressBuf); for (;;) { + if (!index_for_aio) { /* * Grab the io_in_progress lock so that other processes can wait for * me to finish the I/O. */ LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE); + } LockBufHdr(buf); - if (!(buf->flags & BM_IO_IN_PROGRESS)) + /* the following test is intended to distinguish between : + ** . buffer which : + ** . has io in progress + ** AND is not associated with a current aio + ** . not the above + ** Here, "recent" means an aio marked by buf->freeNext <= FREENEXT_BAIOCB_ORIGIN but no longer in progress - + ** this situation arises when the aio has just been cancelled and this process now wishes to recycle the buffer. + ** In this case, the first such would-be recycler (i.e. me) must : + ** . avoid waiting for the cancelled aio to complete + ** . if not myself doing async read, then assume responsibility for posting other future readbuffers. + */ + if ( (buf->flags & BM_AIO_IN_PROGRESS) + || (!(buf->flags & BM_IO_IN_PROGRESS)) + ) break; /* - * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress + * The only way BM_IO_IN_PROGRESS without AIO in progress could be set when the io_in_progress * lock isn't held is if the process doing the I/O is recovering from * an error (see AbortBufferIO). If that's the case, we must wait for * him to get unwedged. 
*/ UnlockBufHdr(buf); + if (!index_for_aio) { LWLockRelease(buf->io_in_progress_lock); + } WaitIO(buf); } - /* Once we get here, there is definitely no I/O active on this buffer */ - +#ifdef USE_PREFETCH + /* Once we get here, there is definitely no synchronous I/O active on this buffer + ** but if being asked to attach a BufferAiocb to the buf header, + ** then we must also check whether a different task has an async io + ** in progress on this buffer, or a banked pin left by a completed one. + */ + if (index_for_aio) { + BAiocb = (BAiocbAnchr->BufferAiocbs)+(FREENEXT_BAIOCB_ORIGIN - buf->freeNext); /* dereferenced below only while BM_AIO_IN_PROGRESS, i.e. only while freeNext encodes an aiocb index */ + if ( (buf->flags & (BM_AIO_IN_PROGRESS|BM_AIO_PREFETCH_PIN_BANKED)) + && (buf->freeNext <= FREENEXT_BAIOCB_ORIGIN) + && ( ( (buf->flags & BM_AIO_IN_PROGRESS) ? BAiocb->pidOfAio : (-(buf->freeNext)) ) != this_backend_pid ) /* once the aio has ended, a banked pin keeps -(owner pid) in freeNext, so it must not be used as an index into BufferAiocbs */ + ) { + /* someone else already doing async I/O */ + UnlockBufHdr(buf); + return false; + } + } +#endif /* USE_PREFETCH */ if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY)) { /* someone else already did the I/O */ UnlockBufHdr(buf); + if (!index_for_aio) { LWLockRelease(buf->io_in_progress_lock); + } return false; } buf->flags |= BM_IO_IN_PROGRESS; +#ifdef USE_PREFETCH + if (index_for_aio) { + BAiocb = (BAiocbAnchr->BufferAiocbs)+(FREENEXT_BAIOCB_ORIGIN - index_for_aio); + /* insist that no other buffer is using this BufferAiocb for async IO */ + if (BAiocb->BAiocbbufh == (struct sbufdesc *)0) { + BAiocb->BAiocbbufh = buf; + } + if (BAiocb->BAiocbbufh != buf) { UnlockBufHdr(buf); /* never raise an error with the header spinlock held - error cleanup does not release spinlocks. NOTE(review): BM_IO_IN_PROGRESS set above stays set on this path, as before - confirm who clears it */ + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("AIO control block %p to be used by %p already in use by %p" + ,BAiocb ,buf , BAiocb->BAiocbbufh))); + } + /* note - there is no need to register self as a dependent of BAiocb + ** as we shall not unlock buf_desc before we free the BAiocb + */ + + buf->flags |= BM_AIO_IN_PROGRESS; + buf->freeNext = index_for_aio; + /* at this point, this buffer appears to have an in-progress aio_read, + ** and any other task which is able to look inside the buffer might try waiting on that aio - + ** except we have
not yet issued the aio! So we must keep the buffer header locked + ** from here all the way back to the BufStartAsync caller + */ + } else { +#endif /* USE_PREFETCH */ + UnlockBufHdr(buf); InProgressBuf = buf; IsForInput = forInput; +#ifdef USE_PREFETCH + } +#endif /* USE_PREFETCH */ return true; } @@ -3058,7 +3624,7 @@ StartBufferIO(volatile BufferDesc *buf, * (Assumptions) * My process is executing IO for the buffer * BM_IO_IN_PROGRESS bit is set for the buffer - * We hold the buffer's io_in_progress lock + * if no async IO in progress, then We hold the buffer's io_in_progress lock * The buffer is Pinned * * If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the @@ -3070,26 +3636,32 @@ StartBufferIO(volatile BufferDesc *buf, * BM_IO_ERROR in a failure case. For successful completion it could * be 0, or BM_VALID if we just finished reading in the page. */ -static void +void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, int set_flag_bits) { - Assert(buf == InProgressBuf); + int flags_on_entry; LockBufHdr(buf); + flags_on_entry = buf->flags; + + if (! (flags_on_entry & BM_AIO_IN_PROGRESS) ) + Assert( buf == InProgressBuf ); + Assert(buf->flags & BM_IO_IN_PROGRESS); - buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR); + buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR | BM_AIO_IN_PROGRESS); if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED)) buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); buf->flags |= set_flag_bits; UnlockBufHdr(buf); + if (! (flags_on_entry & BM_AIO_IN_PROGRESS) ) { InProgressBuf = NULL; - LWLockRelease(buf->io_in_progress_lock); } +} /* * AbortBufferIO: Clean up any active buffer I/O after an error. 
--- src/backend/storage/buffer/buf_async.c.orig 2014-05-28 08:50:32.446571884 -0400 +++ src/backend/storage/buffer/buf_async.c 2014-05-28 16:45:43.014507687 -0400 @@ -0,0 +1,920 @@ +/*------------------------------------------------------------------------- + * + * buf_async.c + * buffer manager asynchronous disk read routines + * + * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/storage/buffer/buf_async.c + * + *------------------------------------------------------------------------- + */ +/* + * Principal entry points: + * + * BufStartAsync() -- start an asynchronous read of a block into a buffer and + * pin it so that no one can destroy it while this process is using it. + * + * BufCheckAsync() -- check completion of an asynchronous read + * and either claim buffer or discard it + * + * Private helper + * + * BufReleaseAsync() -- release the BAiocb resources used for an asynchronous read + * + * See also these files: + * bufmgr.c -- main buffer manager functions + * buf_init.c -- initialisation of resources + */ +#include "postgres.h" +#include +#include +#include +#include + +#include "catalog/catalog.h" +#include "common/relpath.h" +#include "executor/instrument.h" +#include "miscadmin.h" +#include "pg_trace.h" +#include "pgstat.h" +#include "postmaster/bgwriter.h" +#include "storage/buf_internals.h" +#include "storage/bufmgr.h" +#include "storage/ipc.h" +#include "storage/proc.h" +#include "storage/smgr.h" +#include "storage/standby.h" +#include "utils/rel.h" +#include "utils/resowner_private.h" + +/* + * GUC parameters + */ +int max_async_io_prefetchers; /* Maximum number of backends using asynchronous librt threads to read pages into our buffers */ + +/* Note: these two macros only work on shared buffers, not local ones! 
*/ +#define BufHdrGetBlock(bufHdr) ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ)) +#define BufferGetLSN(bufHdr) (PageGetLSN(BufHdrGetBlock(bufHdr))) + +extern volatile struct BAiocbAnchor *BAiocbAnchr; /* anchor for all control blocks pertaining to aio */ +extern int numBufferAiocbs; /* total number of BufferAiocbs in pool */ +extern int maxGetBAiocbTries; /* max times we will try to get a free BufferAiocb */ +extern int maxRelBAiocbTries; /* max times we will try to release a BufferAiocb back to freelist */ +extern pid_t this_backend_pid; /* pid of this backend */ + +extern bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy); +extern void PinBuffer_Locked(volatile BufferDesc *buf); +extern Buffer ReadBuffer_common(SMgrRelation reln, char relpersistence, + ForkNumber forkNum, BlockNumber blockNum, + ReadBufferMode mode, BufferAccessStrategy strategy, + bool *hit , int index_for_aio); +extern void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner); +extern void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty, + int set_flag_bits); +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP +int BufStartAsync(Relation reln, ForkNumber forkNum, BlockNumber blockNum , BufferAccessStrategy strategy); +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ +int BufCheckAsync(SMgrRelation caller_smgr, Relation caller_reln, volatile BufferDesc *buf_desc + ,int intention , BufferAccessStrategy strategy , int index_for_aio , bool spinLockHeld , LWLockId PartitionLock ); +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP + +static struct BufferAiocb volatile * cachedBAiocb = (struct BufferAiocb*)0; /* one cached BufferAiocb for use with aio */ + +#ifdef USE_PREFETCH +/* BufReleaseAsync releases a BufferAiocb and returns 0 if successful else non-zero +** it *must* be called : +** EITHER with a valid BAiocb->BAiocbbufh -> buf_desc +** and that buf_desc must be spin-locked +** OR with BAiocb->BAiocbbufh == 0 +*/ +static int +BufReleaseAsync(struct BufferAiocb 
volatile * BAiocb) +{ + int LockTries; /* max times we will try to release the BufferAiocb */ + volatile struct BufferAiocb *BufferAiocbs; + struct BufferAiocb volatile * oldFreeBAiocb; /* old value of FreeBAiocbs */ + + int failed = 1; /* by end of this function, non-zero will indicate if we failed to return the BAiocb */ + + + if ( ( BAiocb == (struct BufferAiocb*)0 ) + || ( BAiocb == (struct BufferAiocb*)BAIOCB_OCCUPIED ) + || ( ((unsigned long)BAiocb) & 0x1 ) + ) { + elog(ERROR, + "AIO control block corruption on release of aiocb %p - invalid BAiocb" + ,BAiocb); + } + else + if ( (0 == BAiocb->BAiocbDependentCount) /* no dependents */ + && ((struct BufferAiocb*)BAIOCB_OCCUPIED == BAiocb->BAiocbnext) /* not already on freelist */ + ) { + + if ((struct sbufdesc*)0 != BAiocb->BAiocbbufh) { /* if a buffer was attached */ + volatile BufferDesc *buf_desc = BAiocb->BAiocbbufh; + + /* spinlock held so instead of TerminateBufferIO(buf, false , 0); ... */ + if (buf_desc->flags & BM_AIO_PREFETCH_PIN_BANKED) { /* if a pid banked the pin */ + buf_desc->freeNext = -(BAiocb->pidOfAio); /* then remember which pid */ + } + else if (buf_desc->freeNext <= FREENEXT_BAIOCB_ORIGIN) { + buf_desc->freeNext = FREENEXT_NOT_IN_LIST; /* disconnect BufferAiocb from buf_desc */ + } + buf_desc->flags &= ~BM_AIO_IN_PROGRESS; + } + + BAiocb->BAiocbbufh = (struct sbufdesc *)0; /* disconnect buf_desc from BufferAiocb */ + BAiocb->pidOfAio = 0; /* clean */ + LockTries = maxRelBAiocbTries; /* max times we will try to release the BufferAiocb */ + do { + register long long int dividend , remainder; + + /* retrieve old value of FreeBAiocbs */ + BAiocb->BAiocbnext = oldFreeBAiocb = BAiocbAnchr->FreeBAiocbs; /* old value of FreeBAiocbs */ + + /* this is a volatile value unprotected by any lock, so must validate it; + ** safest is to verify that it is identical to one of the BufferAiocbs + ** to do so, verify by direct division that its address offset from first control block + ** is an integral 
multiple of the control block size + ** that lies within the range [ 0 , (numBufferAiocbs-1) ] + */ + BufferAiocbs = BAiocbAnchr->BufferAiocbs; + remainder = ((long long int)oldFreeBAiocb - (long long int)BufferAiocbs) + % (long long int)(sizeof(struct BufferAiocb)); + failed = (int)remainder; + if (!failed) { + dividend = ((long long int)oldFreeBAiocb - (long long int)BufferAiocbs) + / (long long int)(sizeof(struct BufferAiocb)); + failed = ( (dividend < 0 ) || ( dividend >= numBufferAiocbs) ); + if (!failed) { + if (__sync_bool_compare_and_swap (&(BAiocbAnchr->FreeBAiocbs), oldFreeBAiocb, BAiocb)) { + LockTries = 0; /* end the do loop */ + + goto cheering; /* cant simply break because then failed would be set incorrectly */ + } + } + } + /* if we reach here, we have failed and failed is set to -1 */ + + cheering: ; + + if ( LockTries > 1 ) { + sched_yield(); /* yield to another process, (hopefully a backend) */ + } + } while (LockTries-- > 0); + + if (failed) { +#ifdef LOG_RELBAIOCB_DEPLETION + elog(LOG, + "BufReleaseAsync:AIO control block %p unreleased after tries= %d\n" + ,BAiocb,maxRelBAiocbTries); +#endif /* LOG_RELBAIOCB_DEPLETION */ + } + + } + else + elog(LOG, + "BufReleaseAsync:AIO control block %p either has dependents= %d or is already on freelist %p or has no buf_header %p\n" + ,BAiocb , BAiocb->BAiocbDependentCount , BAiocb->BAiocbnext , BAiocb->BAiocbbufh); + return failed; +} + +/* try using asynchronous aio_read to prefetch into a buffer +** return code : +** 0 if started successfully +** -1 if failed for some reason +** 1+PrivateRefCount if we found desired buffer in buffer pool +** +** There is a harmless race condition here : +** two different backends may both arrive here simultaneously +** to prefetch the same buffer. This is not unlikely when a syncscan is in progress. +** . One will acquire the buffer and issue the smgrstartaio +** . 
Other will find the buffer on return from ReadBuffer_common with hit = true +** Only the first task has a pin on the buffer since ReadBuffer_common knows not to get a pin +** on a found buffer in prefetch mode. +** Therefore - the second task must simply abandon the prefetch if it finds the buffer in the buffer pool. +** +** if we fail to acquire a BAiocb because of concurrent theft from freelist by other backend, +** retry up to maxGetBAiocbTries times provided that there actually was at least one BAiocb on the freelist. +*/ +int +BufStartAsync(Relation reln, ForkNumber forkNum, BlockNumber blockNum , BufferAccessStrategy strategy) { + + int retcode = -1; + struct BufferAiocb volatile * BAiocb = (struct BufferAiocb*)0; /* BufferAiocb for use with aio */ + int smgrstartaio_rc = -1; /* retcode from smgrstartaio */ + bool do_unpin_buffer = false; /* unpin must be deferred until after buffer descriptor is unlocked */ + Buffer buf_id; + bool hit = false; + volatile BufferDesc *buf_desc = (BufferDesc *)0; + + int LockTries; /* max times we will try to get a free BufferAiocb */ + + struct BufferAiocb volatile * oldFreeBAiocb; /* old value of FreeBAiocbs */ + struct BufferAiocb volatile * newFreeBAiocb; /* new value of FreeBAiocbs */ + + + /* return immediately if no async io resources */ + if (numBufferAiocbs > 0) { + buf_id = (Buffer)0; + + if ( (struct BAiocbAnchor *)0 != BAiocbAnchr ) { + + volatile struct BufferAiocb *BufferAiocbs; + + if ((struct BufferAiocb*)0 != cachedBAiocb) { /* any cached BufferAiocb ? 
*/ + BAiocb = cachedBAiocb; /* yes use it */ + cachedBAiocb = BAiocb->BAiocbnext; + BAiocb->BAiocbnext = (struct BufferAiocb*)BAIOCB_OCCUPIED; /* mark as not on freelist */ + BAiocb->BAiocbDependentCount = 0; /* no dependent yet */ + BAiocb->BAiocbbufh = (struct sbufdesc*)0; + BAiocb->pidOfAio = 0; + } else { + + LockTries = maxGetBAiocbTries; /* max times we will try to get a free BufferAiocb */ + do { + register long long int dividend = -1 , remainder; + /* check if we have a free BufferAiocb */ + + oldFreeBAiocb = BAiocbAnchr->FreeBAiocbs; /* old value of FreeBAiocbs */ + + /* check if we have a free BufferAiocb */ + + /* BAiocbAnchr->FreeBAiocbs is a volatile value unprotected by any lock, + ** and use of compare-and-swap to add and remove items from the list has + ** two potential pitfalls, both relating to the fact that we must + ** access data de-referenced from this pointer before the compare-and-swap. + ** 1) The value we load may be corrupt, e.g. mixture of bytes from + ** two different values, so must validate it; + ** safest is to verify that it is identical to one of the BufferAiocbs. + ** to do so, verify by direct division that its address offset from + ** first control block is an integral multiple of the control block size + ** that lies within the range [ 0 , (numBufferAiocbs-1) ] + ** Thus we completely prevent this pitfall. + ** 2) The content of the item's next pointer may have changed between the + ** time we de-reference it and the time of the compare-and-swap. + ** Thus even though the compare-and-swap succeeds, we might set the + ** new head of the freelist to an invalid value (either a free item + ** that is not the first in the free chain - resulting only in + ** loss of the orphaned free items, or, much worse, an in-use item). + ** In practice this is extremely unlikely because of the implied huge delay + ** in this window interval in this (current) process. 
Here are two scenarios: + ** legend: + ** P0 - this (current) process, P1, P2 , ... other processes + ** content of freelist shown as BAiocbAnchr->FreeBAiocbs -> first item -> 2nd item ... + ** @[X] means address of X + ** | timeline of window of exposure to problems + ** successive lines in chronological order content of freelist + ** 2.1 P0 access oldFreeBAiocb = BAiocbAnchr->FreeBAiocbs = @[I0] F -> I0 -> I1 -> I2 -> I3 ... + ** P0 access newFreeBAiocb = oldFreeBAiocb->BAiocbnext = @[I1] F -> I0 -> I1 -> I2 -> I3 ... + ** | P1 access oldFreeBAiocb = BAiocbAnchr->FreeBAiocbs = @[I0] F -> I0 -> I1 -> I2 -> I3 ... + ** | P1 access newFreeBAiocb = oldFreeBAiocb->BAiocbnext = @[I1] F -> I0 -> I1 -> I2 -> I3 ... + ** | P1 swap-remove I0, place I1 at head of list F -> I1 -> I2 -> I3 ... + ** | P2 access oldFreeBAiocb = BAiocbAnchr->FreeBAiocbs = @[I1] F -> I1 -> I2 -> I3 ... + ** | P2 access newFreeBAiocb = oldFreeBAiocb->BAiocbnext = @[I2] F -> I1 -> I2 -> I3 ... + ** | P2 swap-remove I1, place I2 at head of list F -> I2 -> I3 ... + ** | P1 complete aio, replace I0 at head of list F -> I0 -> I2 -> I3 ... + ** P0 swap-remove I0, place I1 at head of list F -> I1 IS IN USE !! CORRUPT !! + ** compare-and-swap succeeded but the value of newFreeBAiocb was stale + ** and had become in-use during the window. + ** 2.2 P0 access oldFreeBAiocb = BAiocbAnchr->FreeBAiocbs = @[I0] F -> I0 -> I1 -> I2 -> I3 ... + ** P0 access newFreeBAiocb = oldFreeBAiocb->BAiocbnext = @[I1] F -> I0 -> I1 -> I2 -> I3 ... + ** | P1 access oldFreeBAiocb = BAiocbAnchr->FreeBAiocbs = @[I0] F -> I0 -> I1 -> I2 -> I3 ... + ** | P1 access newFreeBAiocb = oldFreeBAiocb->BAiocbnext = @[I1] F -> I0 -> I1 -> I2 -> I3 ... + ** | P1 swap-remove I0, place I1 at head of list F -> I1 -> I2 -> I3 ... + ** | P2 access oldFreeBAiocb = BAiocbAnchr->FreeBAiocbs = @[I1] F -> I1 -> I2 -> I3 ... + ** | P2 access newFreeBAiocb = oldFreeBAiocb->BAiocbnext = @[I2] F -> I1 -> I2 -> I3 ... 
+ ** | P2 swap-remove I1, place I2 at head of list F -> I2 -> I3 ... + ** | P3 access oldFreeBAiocb = BAiocbAnchr->FreeBAiocbs = @[I2] F -> I2 -> I3 ... + ** | P3 access newFreeBAiocb = oldFreeBAiocb->BAiocbnext = @[I3] F -> I2 -> I3 ... + ** | P3 swap-remove I2, place I3 at head of list F -> I3 ... + ** | P2 complete aio, replace I1 at head of list F -> I1 -> I3 ... + ** | P3 complete aio, replace I2 at head of list F -> I2 -> I1 -> I3 ... + ** | P1 complete aio, replace I0 at head of list F -> I0 -> I2 -> I1 -> I3 ... + ** P0 swap-remove I0, place I1 at head of list F -> I1 -> I3 ... ! I2 is orphaned ! + ** compare-and-swap succeeded but the value of newFreeBAiocb was stale + ** and had moved further down the free list during the window. + ** Unfortunately, we cannot prevent this pitfall but we can detect it (after the fact), + ** by checking that the next pointer of the item we have just removed for our use still points to the same item. + ** This test is not subject to any timing or uncertainty since : + ** . The fact that the compare-and-swap succeeded implies that the item we removed + ** was defintely on the freelist (at the head) when it was removed, + ** and therefore cannot be in use, and therefore its next pointer is no longer volatile. + ** . Although pointers of the anchor and items on the freelist are volatile, + ** the addresses of items never change - they are in an allocated array and never move. + ** E.g. in the above two scenarios, the test is that I0.next still -> I1, + ** and this is true if and only if the second item on the freelist is + ** still the same at the end of the window as it was at the start of the window. + ** Note that we do not insist that it did not change during the window, + ** only that it is still the correct new head of freelist. + ** If this test fails, we abort immediately as the subsystem is damaged and cannot be repaired. 
+ ** Note that at least one aio must have been issued *and* completed during the window + ** for this to occur, and since the window is just one single machine instruction, + ** it is very unlikely in practice. + */ + BufferAiocbs = BAiocbAnchr->BufferAiocbs; + remainder = ((long long int)oldFreeBAiocb - (long long int)BufferAiocbs) + % (long long int)(sizeof(struct BufferAiocb)); + if (remainder == 0) { + dividend = ((long long int)oldFreeBAiocb - (long long int)BufferAiocbs) + / (long long int)(sizeof(struct BufferAiocb)); + } + if ( (remainder == 0) + && ( (dividend >= 0 ) && ( dividend < numBufferAiocbs) ) + ) + { + newFreeBAiocb = oldFreeBAiocb->BAiocbnext; /* tentative new value is second on free list */ + /* Here we are in the exposure window referred to in the above comments, + ** so moving along rapidly ... + */ + if (__sync_bool_compare_and_swap (&(BAiocbAnchr->FreeBAiocbs), oldFreeBAiocb, newFreeBAiocb)) { /* did we get it ? */ + /* We have successfully swapped head of freelist pointed to by oldFreeBAiocb off the list; + ** Here we check that the item we just placed at head of freelist, pointed to by newFreeBAiocb, + ** is the right one + ** + ** also check that the BAiocb we have acquired was not in use + ** i.e. that scenario 2.1 above did not occur just before our compare-and-swap + ** The test is that the BAiocb is not in use. + ** + ** in one hypothetical case, + ** we can be certain that there is no corruption - + ** the case where newFreeBAiocb == 0 and oldFreeBAiocb->BAiocbnext != BAIOCB_OCCUPIED - + ** i.e. we have set the freelist to empty but we have a baiocb chained from ours. + ** in this case our comp_swap removed all BAiocbs from the list (including ours) + ** so the others chained from ours are either orphaned (no harm done) + ** or in use by another backend and will eventually be returned (fine). 
+ */ + if ((struct BufferAiocb *)0 == newFreeBAiocb) { + if ((struct BufferAiocb *)BAIOCB_OCCUPIED == oldFreeBAiocb->BAiocbnext) { + goto baiocb_corruption; + } else if ((struct BufferAiocb *)0 != oldFreeBAiocb->BAiocbnext) { + elog(LOG, + "AIO control block inconsistency on acquiring aiocb %p - its next free %p may be orphaned (no corruption has occurred)" + ,oldFreeBAiocb , oldFreeBAiocb->BAiocbnext); + } + } else { + /* case of newFreeBAiocb not null - so must check more carefully ... */ + remainder = ((long long int)newFreeBAiocb - (long long int)BufferAiocbs) + % (long long int)(sizeof(struct BufferAiocb)); + dividend = ((long long int)newFreeBAiocb - (long long int)BufferAiocbs) + / (long long int)(sizeof(struct BufferAiocb)); + + if ( (newFreeBAiocb != oldFreeBAiocb->BAiocbnext) + || (remainder != 0) + || ( (dividend < 0 ) || ( dividend >= numBufferAiocbs) ) + ) { + goto baiocb_corruption; + } + } + BAiocb = oldFreeBAiocb; + BAiocb->BAiocbnext = (struct BufferAiocb*)BAIOCB_OCCUPIED; /* mark as not on freelist */ + BAiocb->BAiocbDependentCount = 0; /* no dependent yet */ + BAiocb->BAiocbbufh = (struct sbufdesc*)0; + BAiocb->pidOfAio = 0; + + LockTries = 0; /* end the do loop */ + + } + } + + if ( LockTries > 1 ) { + sched_yield(); /* yield to another process, (hopefully a backend) */ + } + } while ( ((struct BufferAiocb*)0 == BAiocb) /* did not get a BAiocb */ + && ((struct BufferAiocb*)0 != oldFreeBAiocb) /* there was a free BAiocb */ + && (LockTries-- > 0) /* told to retry */ + ); + } + } + + if ( BAiocb != (struct BufferAiocb*)0 ) { + /* try an async io */ + BAiocb->BAiocbthis.aio_fildes = -1; /* necessary to ensure any thief realizes aio not yet started */ + BAiocb->pidOfAio = this_backend_pid; + + /* now try to acquire a buffer : + ** note - ReadBuffer_common returns hit=true if the block is found in the buffer pool, + ** in which case there is no need to prefetch. 
+ ** otherwise ReadBuffer_common pins returned buffer and calls StartBufferIO + ** and StartBufferIO : + ** . sets buf_desc->freeNext to negative of ( index of the aiocb in the BufferAiocbs array + 3 ) + ** . sets BAiocb->BAiocbbufh -> buf_desc + ** and in this case the buffer spinlock is held. + ** This is essential as no other task must issue any intention with respect + ** to the buffer until we have started the aio_read. + ** Also note that ReadBuffer_common handles enlarging the ResourceOwner buffer list as needed + ** so I dont need to do that + */ + buf_id = ReadBuffer_common(reln->rd_smgr, reln->rd_rel->relpersistence, + forkNum, blockNum + ,RBM_NOREAD_FOR_PREFETCH /* tells ReadBuffer not to do any read, just alloc buf */ + ,strategy , &hit , (FREENEXT_BAIOCB_ORIGIN - (BAiocb - (BAiocbAnchr->BufferAiocbs)))); + buf_desc = &BufferDescriptors[buf_id-1]; /* find buffer descriptor */ + + /* normally hit will be false as presumably it was not in the pool + ** when our caller looked - but it could be there now ... + */ + if (hit) { + /* see earlier comments - we must abandon the prefetch */ + retcode = 1 + PrivateRefCount[buf_id]; + BAiocb->BAiocbbufh = (struct sbufdesc *)0; /* indicator that we must release the BAiocb before exiting */ + } else + if ( (buf_id > 0) && ((BufferDesc *)0 != buf_desc) && (buf_desc == BAiocb->BAiocbbufh) ) { + /* buff descriptor header lock should be held. + ** However, just to be safe , now validate that + ** we are still the owner and no other task already stole it. 
+ */ + + buf_desc->flags &= ~BM_AIO_PREFETCH_PIN_BANKED; /* ensure no banked pin */ + /* there should not be any other pid waiting on this buffer + ** so check both of BM_VALID and BM_PIN_COUNT_WAITER are not set + */ + if ( ( !(buf_desc->flags & (BM_VALID|BM_PIN_COUNT_WAITER) ) ) + && (buf_desc->flags & BM_AIO_IN_PROGRESS) + && ((FREENEXT_BAIOCB_ORIGIN - (BAiocb - (BAiocbAnchr->BufferAiocbs))) == buf_desc->freeNext) /* it is still mine */ + && (-1 == BAiocb->BAiocbthis.aio_fildes) /* no thief stole it */ + && (0 == BAiocb->BAiocbDependentCount) /* no dependent */ + ) { + /* we have an empty buffer for our use */ + + BAiocb->BAiocbthis.aio_buf = (void *)(BufHdrGetBlock(buf_desc)); /* Location of actual buffer. */ + + /* note - there is no need to register self as an dependent of BAiocb + ** as we shall not unlock buf_desc before we free the BAiocb + */ + + /* smgrstartaio retcode is returned in smgrstartaio_rc - + ** it indicates whether started or not + */ + smgrstartaio(reln->rd_smgr, forkNum, blockNum , (char *)&(BAiocb->BAiocbthis) , &smgrstartaio_rc ); + + if (smgrstartaio_rc == 0) { + retcode = 0; + buf_desc->flags |= BM_AIO_PREFETCH_PIN_BANKED; /* bank the pin for next use by this task */ + /* we did not register register self as an dependent of BAiocb so no need to unregister */ + } else { + /* failed - return the BufferAiocb to the free list */ + do_unpin_buffer = true; /* unpin must be deferred until after buffer descriptor is unlocked */ + /* spinlock held so instead of TerminateBufferIO(buf_desc, false , 0); ... */ + buf_desc->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR | BM_AIO_IN_PROGRESS | BM_AIO_PREFETCH_PIN_BANKED | BM_VALID); + /* we did not register register self as an dependent of BAiocb so no need to unregister */ + + /* return the BufferAiocb to the free list */ + if ( + BufReleaseAsync(BAiocb) + ) { /* failed ? */ + BAiocb->BAiocbnext = cachedBAiocb; /* then ... */ + cachedBAiocb = BAiocb; /* ... 
cache it */ + } + + pgBufferUsage.aio_read_failed++; + smgrstartaio_rc = 1; /* to distinguish from aio not even attempted */ + } + } + else { + /* buffer was stolen or in use by other task - return the BufferAiocb to the free list */ + do_unpin_buffer = true; /* unpin must be deferred until after buffer descriptor is unlocked */ + } + + UnlockBufHdr(buf_desc); + if (do_unpin_buffer) { + if (smgrstartaio_rc >= 0) { /* if aio was attempted */ + TerminateBufferIO(buf_desc, false , 0); + } + UnpinBuffer(buf_desc, true); + } + } + else { + BAiocb->BAiocbbufh = (struct sbufdesc *)0; /* indicator that we must release the BAiocb before exiting */ + } + + if ((struct sbufdesc*)0 == BAiocb->BAiocbbufh) { /* we did not associate a buffer */ + /* so return the BufferAiocb to the free list */ + if ( + BufReleaseAsync(BAiocb) + ) { /* failed ? */ + BAiocb->BAiocbnext = cachedBAiocb; /* then ... */ + cachedBAiocb = BAiocb; /* ... cache it */ + } + } + } + } + + return retcode; + + baiocb_corruption:; + elog(PANIC, + "AIO control block corruption on acquiring aiocb %p - its next free %p conflicts with new freelist pointer %p which may be invalid (corruption may have occurred)" + ,oldFreeBAiocb , oldFreeBAiocb->BAiocbnext , newFreeBAiocb); +} +#endif /* USE_PREFETCH */ +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ + +/* + * BufCheckAsync -- act upon caller's intention regarding a shared buffer, + * primarily in connection with any async io in progress on the buffer. + * class subvalue intention has two main classes and some subvalues within those : + * +ve 1 . want <=> caller wants the buffer, + * wait for in-progress aio and then always pin + * -ve . reject <=> caller does not want the buffer, + * if there are no dependents, then cancel the aio + * -1, -2 , -3 , ... (see below) and then optionally unpin + * Used when there may have been a previous fetch or prefetch. + * + * buffer is assumed to be an existing member of the shared buffer pool + * as returned by BufTableLookup. 
+ * if AIO in progress, then : + * . terminate AIO, waiting for completion if +ve intention, else without waiting + * . if the AIO had already completed successfully, then mark buffer valid + * . pin/unpin as requested + * + * +ve intention indicates that buffer must be pinned : + * if the strategy parameter is null, then use the PinBuffer_Locked optimization + * to pin and unlock in one operation. But always update buffer usage count. + * + * -ve intention indicates whether and how to unpin : + * BUF_INTENTION_REJECT_KEEP_PIN -1 pin already held, do not unpin, (caller wants to keep it) + * BUF_INTENTION_REJECT_OBTAIN_PIN -2 obtain pin, caller wants it for same buffer + * BUF_INTENTION_REJECT_FORGET -3 unpin and tell resource owner to forget + * BUF_INTENTION_REJECT_NOADJUST -4 unpin and call ResourceOwnerForgetBuffer myself + * instead of telling UnpinBuffer to adjust CurrentResource owner + * (quirky simulation of ReleaseBuffer logic) + * BUF_INTENTION_REJECT_UNBANK -5 unpin only if pin banked by caller + * The behaviour for the -ve case is based on that of ReleaseBuffer, adding handling of async io. + * + * pin/unpin action must take account of whether this backend hold a "disposable" pin on the particular buffer. + * A "disposable" pin is a pin acquired by buffer manager without caller knowing, such as : + * when required to safeguard an async AIO - pin can be held across multiple bufmgr calls + * when required to safeguard waiting for an async AIO - pin acquired and released within this function + * if a disposable pin is held, then : + * if a new pin is requested, the disposable pin must be retained (redeemed) and any flags relating to it unset + * if an unpin is requested, then : + * if either no AIO in progress or this backend did not initiate the AIO + * then the disposable pin must be dropped (redeemed) and any flags relating to it unset + * else log warning and do nothing + * i.e. 
in either case, there is no longer a disposable pin after this function has completed. + * Note that if intention is BUF_INTENTION_REJECT_UNBANK, + * then caller expects there to be a disposable banked pin + * and if there isn't one, we do nothing + * for all other intentions, if there is no disposable pin, we pin/unpin normally. + * + * index_for_aio indicates the BAiocb to be used for next aio (see PrefetchBuffer) + * BufFreelistLockHeld indicates whether freelistlock is held + * spinLockHeld indicates whether buffer header spinlock is held + * PartitionLock is the buffer partition lock to be used + * + * return code (meaningful ONLY if intention is +ve) indicates validity of buffer : + * -1 buffer is invalid and failed PageHeaderIsValid check + * 0 buffer is not valid + * 1 buffer is valid + * 2 buffer is valid but tag changed - (so content does not match the relation block that caller expects) + */ +int +BufCheckAsync(SMgrRelation caller_smgr, Relation caller_reln, BufferDesc volatile * buf_desc, int intention , BufferAccessStrategy strategy , int index_for_aio , bool spinLockHeld , LWLockId PartitionLock ) +{ + + int retcode = BUF_INTENT_RC_INVALID_NO_AIO; + bool valid = false; + + BufferTag origTag = buf_desc->tag; /* original identity of selected buffer */ + +#ifdef USE_PREFETCH + int smgrcompleteaio_rc; /* retcode from smgrcompleteaio */ + SMgrRelation smgr = caller_smgr; + int aio_successful = -1; /* did the aio_read succeed ? 
-1 = no aio, 0 unsuccessful , 1 successful */ + BufFlags flags_on_entry; /* for debugging - can be printed in gdb */ + int freeNext_on_entry; /* for debugging - can be printed in gdb */ + int BAiocbDependentCount_after_aio_finished = -1; /* for debugging - can be printed in gdb */ + bool disposable_pin = false; /* this backend had a disposable pin on entry or pins the buffer while waiting for aio_read to complete */ + bool pin_already_banked_by_me; /* buffer is already pinned by me and redeemable */ + int local_intention; +#endif /* USE_PREFETCH */ + + + +#ifdef USE_PREFETCH + if (!spinLockHeld) { + /* lock buffer header */ + LockBufHdr(buf_desc); + } + + flags_on_entry = buf_desc->flags; + freeNext_on_entry = buf_desc->freeNext; + pin_already_banked_by_me = + ( (flags_on_entry & BM_AIO_PREFETCH_PIN_BANKED) + && ( ( (flags_on_entry & BM_AIO_IN_PROGRESS) ? ( ((BAiocbAnchr->BufferAiocbs)+(FREENEXT_BAIOCB_ORIGIN - freeNext_on_entry))->pidOfAio ) + : (-(freeNext_on_entry)) ) == this_backend_pid ) + ); + + if (pin_already_banked_by_me) { + if (PrivateRefCount[buf_desc->buf_id] == 0) { /* but do we actually have a pin ?? 
*/ + /* this is an anomalous situation - somehow our disposable pin was lost without us noticing + ** if AIO is in progress and we started it, + ** then this is disastrous - two backends might both issue IO on same buffer + ** otherwise, it is harmless, and simply means we have no disposable pin, + ** but we must update flags to "notice" the fact now + */ + if (flags_on_entry & BM_AIO_IN_PROGRESS) { + elog(ERROR, "BufCheckAsync : AIO control block issuer of aio_read lost pin with BM_AIO_IN_PROGRESS on buffer %d rel=%s, blockNum=%u, flags 0x%X refcount=%u intention= %d" + ,buf_desc->buf_id,relpathbackend(buf_desc->tag.rnode, InvalidBackendId, buf_desc->tag.forkNum) + ,buf_desc->tag.blockNum, flags_on_entry, buf_desc->refcount ,intention); + } else { + elog(LOG, "BufCheckAsync : AIO control block issuer of aio_read lost pin on buffer %d rel=%s, blockNum=%u, with flags 0x%X refcount=%u intention= %d" + ,buf_desc->buf_id,relpathbackend(buf_desc->tag.rnode, InvalidBackendId, buf_desc->tag.forkNum) + ,buf_desc->tag.blockNum, flags_on_entry, buf_desc->refcount ,intention); + buf_desc->flags &= ~BM_AIO_PREFETCH_PIN_BANKED; /* redeem the banked pin */ + /* since AIO not in progress, disconnect the buffer from banked pin */ + buf_desc->freeNext = FREENEXT_NOT_IN_LIST; /* and forget the bank client */ + pin_already_banked_by_me = false; + } + } else { + disposable_pin = true; + } + } + + /* the case of BUF_INTENTION_REJECT_UNBANK is handled specially : + ** if this backend has a banked pin, then proceed just as for BUF_INTENTION_REJECT_FORGET + ** else the call is a no-op -- unlock buf header and return immediately + */ + local_intention = intention; + if (intention == BUF_INTENTION_REJECT_UNBANK) { + if (pin_already_banked_by_me) { + local_intention = BUF_INTENTION_REJECT_FORGET; + } else { + goto unlock_buf_header; /* code following the unlock will do nothing since local_intention still set to BUF_INTENTION_REJECT_UNBANK */ + } + } + +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP 
+ /* we do not expect that BM_AIO_IN_PROGRESS is set without freeNext identifying the BAiocb */ + if ( (buf_desc->flags & BM_AIO_IN_PROGRESS) && (buf_desc->freeNext == FREENEXT_NOT_IN_LIST) ) { + + elog(ERROR, "BufCheckAsync : found BM_AIO_IN_PROGRESS without a BAiocb on buffer %d rel=%s, blockNum=%u, flags %X refcount=%u" + ,buf_desc->buf_id,relpathbackend(buf_desc->tag.rnode, InvalidBackendId, buf_desc->tag.forkNum) + ,buf_desc->tag.blockNum, buf_desc->flags, buf_desc->refcount); + } + /* check whether aio in progress */ + if ( ( (struct BAiocbAnchor *)0 != BAiocbAnchr ) + && (buf_desc->flags & BM_AIO_IN_PROGRESS) + && (buf_desc->freeNext <= FREENEXT_BAIOCB_ORIGIN) /* has a valid BAiocb */ + && ((FREENEXT_BAIOCB_ORIGIN - buf_desc->freeNext) < numBufferAiocbs) /* double-check */ + ) { /* this is aio */ + struct BufferAiocb volatile * BAiocb = (BAiocbAnchr->BufferAiocbs)+(FREENEXT_BAIOCB_ORIGIN - buf_desc->freeNext); /* BufferAiocb associated with this aio */ + if ((struct BufferAiocb*)BAIOCB_OCCUPIED == BAiocb->BAiocbnext) { /* ensure BAiocb is occupied */ + aio_successful = 0; /* tentatively the aio_read did not succeed */ + retcode = BUF_INTENT_RC_INVALID_AIO; + + if (smgr == NULL) { + if (caller_reln == NULL) { + smgr = smgropen(buf_desc->tag.rnode, InvalidBackendId); + } else { + smgr = caller_reln->rd_smgr; + } + } + + /* assert that this AIO is not using the same BufferAiocb as the one caller asked us to use */ + if ((index_for_aio < 0) && (index_for_aio == buf_desc->freeNext)) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("AIO control block index %d to be used by %p already in use by %p" + ,index_for_aio, buf_desc, BAiocb->BAiocbbufh))); + } + + /* Call smgrcompleteaio only if either we want buffer or there are no dependents. + ** In the other case of reject and there are dependents, + ** then one of them will do it. 
+ */ + if ( (local_intention > 0) || (0 == BAiocb->BAiocbDependentCount) ) { + if (local_intention > 0) { + /* wait for in-progress aio and then pin + ** OR if I did not issue the aio and do not have a pin + ** then pin now before waiting to ensure the buffer does not become unpinned while I wait + ** we may potentially wait for io to complete + ** so release buf header lock so that others may also wait here + */ + BAiocb->BAiocbDependentCount++; /* register self as dependent */ + if (PrivateRefCount[buf_desc->buf_id] == 0) { /* if this buffer not pinned by me */ + disposable_pin = true; /* this backend has pinned the buffer while waiting for aio_read to complete */ + PinBuffer_Locked(buf_desc); + } else { + UnlockBufHdr(buf_desc); + } + LWLockRelease(PartitionLock); + + smgrcompleteaio_rc = 1; /* tell smgrcompleteaio to wait */ + } else { + smgrcompleteaio_rc = 0; /* tell smgrcompleteaio to cancel */ + } + + smgrcompleteaio( smgr , (char *)&(BAiocb->BAiocbthis) , &smgrcompleteaio_rc ); + if ( (smgrcompleteaio_rc == 0) || (smgrcompleteaio_rc == 1) ) { + aio_successful = 1; + } + + /* statistics */ + if (local_intention > 0) { + if (smgrcompleteaio_rc == 0) { + /* completed successfully and did not have to wait */ + pgBufferUsage.aio_read_ontime++; + } else if (smgrcompleteaio_rc == 1) { + /* completed successfully and did have to wait */ + pgBufferUsage.aio_read_waited++; + } else { + /* bad news - read failed and so buffer not usable + ** the buffer is still pinned so unpin and proceed with "not found" case + */ + pgBufferUsage.aio_read_failed++; + } + + /* regain locks and handle the validity of the buffer and intention regarding it */ + LWLockAcquire(PartitionLock, LW_SHARED); + LockBufHdr(buf_desc); + BAiocb->BAiocbDependentCount--; /* unregister self as dependent */ + } else { + pgBufferUsage.aio_read_wasted++; /* regardless of whether aio_successful */ + } + + + if (local_intention > 0) { + /* verify the buffer is still ours and has same identity + ** There 
is one slightly tricky point here - + ** if there are other dependents, then each of them will perform this same check + ** this is unavoidable as the correct setting of retcode and the BM_VALID flag + ** is required by each dependent, so we may not leave it to the last one to do it. + ** It should not do any harm and easier to let them all do it than try to avoid. + */ + if ((FREENEXT_BAIOCB_ORIGIN - (BAiocb - (BAiocbAnchr->BufferAiocbs))) == buf_desc->freeNext) { /* it is still mine */ + + if (aio_successful) { + /* validate page header. If valid, then mark the buffer as valid */ + if (PageIsVerified((Page)(BufHdrGetBlock(buf_desc)) , ((BAiocb->BAiocbthis).aio_offset/BLCKSZ))) { + buf_desc->flags |= BM_VALID; + if (BUFFERTAGS_EQUAL(origTag , buf_desc->tag)) { + retcode = BUF_INTENT_RC_VALID; + } else { + retcode = BUF_INTENT_RC_CHANGED_TAG; + } + } else { + retcode = BUF_INTENT_RC_BADPAGE; + } + } + } + } + + BAiocbDependentCount_after_aio_finished = BAiocb->BAiocbDependentCount; + + /* if no dependents, then disconnect the BAiocb and update buffer header */ + if (BAiocbDependentCount_after_aio_finished == 0 ) { + + + /* return the BufferAiocb to the free list */ + buf_desc->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR | BM_AIO_IN_PROGRESS); + if ( + BufReleaseAsync(BAiocb) + ) { /* failed ? */ + BAiocb->BAiocbnext = cachedBAiocb; /* then ... */ + cachedBAiocb = BAiocb; /* ... cache it */ + } + } + + } + } + } +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ + + /* note whether buffer is valid before unlocking spinlock */ + valid = ((buf_desc->flags & BM_VALID) != 0); + + /* if there was a disposable pin on entry to this function (i.e. marked in buffer flags) + ** then unmark it - refer to prologue comments talking about : + ** if a disposable pin is held, then : + ** ... + ** i.e. in either case, there is no longer a disposable pin after this function has completed. 
+ */ + if (pin_already_banked_by_me) { + buf_desc->flags &= ~BM_AIO_PREFETCH_PIN_BANKED; /* redeem the banked pin */ + /* if AIO not in progress, then disconnect the buffer from BAiocb and/or banked pin */ + if (!(buf_desc->flags & BM_AIO_IN_PROGRESS)) { + buf_desc->freeNext = FREENEXT_NOT_IN_LIST; /* and forget the bank client */ + } + /********** for debugging ***************** + else elog(LOG, "BufCheckAsync : found BM_AIO_IN_PROGRESS when redeeming banked pin on buffer %d rel=%s, blockNum=%u, flags %X refcount=%u" + ,buf_desc->buf_id,relpathbackend(buf_desc->tag.rnode, InvalidBackendId, buf_desc->tag.forkNum) + ,buf_desc->tag.blockNum, buf_desc->flags, buf_desc->refcount); + ********** for debugging *****************/ + } + + /* If we are to obtain new pin, then use pin optimization - pin and unlock. + ** However, if the caller is the same backend who issued the aio_read, + ** then he ought to have obtained the pin at that time and must not acquire + ** a "second" one since this is logically the same read - he would have obtained + ** a single pin if using synchronous read and we emulate that behaviour. + ** Its important to understand that the caller is not aware that he already obtained a pin - + ** because calling PrefetchBuffer did not imply a pin - + ** so we must track that via the pidOfAio field in the BAiocb. + ** And to add one further complication : + ** we assume that although PrefetchBuffer pinned the buffer, + ** it did not increment the usage count. + ** (because it called PinBuffer_Locked which does not do that) + ** so in this case, we must increment the usage count without double-pinning. + ** yes its ugly - and theres a goto! + */ + if ( (local_intention > 0) + || (local_intention == BUF_INTENTION_REJECT_OBTAIN_PIN) + ) { + + /* Make sure we will have room to remember the buffer pin */ + ResourceOwnerEnlargeBuffers(CurrentResourceOwner); + + /* here we really want a version of PinBuffer_Locked which updates usage count ... 
*/ + if ( (PrivateRefCount[buf_desc->buf_id] == 0) /* if this buffer not previously pinned by me */ + || pin_already_banked_by_me /* or I had a disposable pin on entry */ + ) { + if (strategy == NULL) + { + if (buf_desc->usage_count < BM_MAX_USAGE_COUNT) + buf_desc->usage_count++; + } + else + { + if (buf_desc->usage_count == 0) + buf_desc->usage_count = 1; + } + } + + /* now pin buffer unless we have a disposable */ + if (!disposable_pin) { /* this backend neither banked pin for aio nor pinned the buffer while waiting for aio_read to complete */ + PinBuffer_Locked(buf_desc); + goto unlocked_it; + } + else + /* if this task previously issued the aio or pinned the buffer while waiting for aio_read to complete + ** and aio was unsuccessful, then release the pin + */ + if ( disposable_pin + && (aio_successful == 0) /* aio_read failed ? */ + ) { + UnpinBuffer(buf_desc, true); + } + } + + unlock_buf_header: + UnlockBufHdr(buf_desc); + unlocked_it: +#endif /* USE_PREFETCH */ + + /* now do any requested pin (if not done immediately above) or unpin/forget */ + if (local_intention == BUF_INTENTION_REJECT_KEEP_PIN) { + /* the caller is supposed to hold a pin already so there should be nothing to do ... */ + if (PrivateRefCount[buf_desc->buf_id] == 0) { + elog(LOG, "request to keep pin on unpinned buffer %d",buf_desc->buf_id); + + valid = PinBuffer(buf_desc, strategy); + } + } + else + if ( ( (local_intention == BUF_INTENTION_REJECT_FORGET) + || (local_intention == BUF_INTENTION_REJECT_NOADJUST) + ) + && (PrivateRefCount[buf_desc->buf_id] > 0) /* if this buffer was previously pinned by me ... */ + ) { + + if (local_intention == BUF_INTENTION_REJECT_FORGET) { + UnpinBuffer(buf_desc, true); /* ... 
then release the pin */ + } else + if (local_intention == BUF_INTENTION_REJECT_NOADJUST) { + /* following code moved from ReleaseBuffer : + ** not sure why we can't simply UnpinBuffer(buf_desc, true) + ** but better leave it the way it was + */ + ResourceOwnerForgetBuffer(CurrentResourceOwner, BufferDescriptorGetBuffer(buf_desc)); + if (PrivateRefCount[buf_desc->buf_id] > 1) { + PrivateRefCount[buf_desc->buf_id]--; + } else { + UnpinBuffer(buf_desc, false); + } + } + } + + /* if retcode has not been set to one of the unusual conditions + ** namely failed header validity or tag changed + ** then the setting of valid takes precedence + ** over whatever retcode may be currently set to. + */ + if ( ( (retcode == BUF_INTENT_RC_INVALID_NO_AIO) || (retcode == BUF_INTENT_RC_INVALID_AIO) ) && valid) { + retcode = BUF_INTENT_RC_VALID; + } else + if ((retcode == BUF_INTENT_RC_VALID) && (!valid)) { + if (aio_successful == -1) { /* aio not attempted */ + retcode = BUF_INTENT_RC_INVALID_NO_AIO; + } else { + retcode = BUF_INTENT_RC_INVALID_AIO; + } + } + + return retcode; +} --- src/backend/storage/buffer/buf_init.c.orig 2014-05-28 08:29:09.330829297 -0400 +++ src/backend/storage/buffer/buf_init.c 2014-05-28 16:45:43.038507784 -0400 @@ -13,15 +13,89 @@ *------------------------------------------------------------------------- */ #include "postgres.h" +#include +#include #include "storage/bufmgr.h" #include "storage/buf_internals.h" - +#include /* for getenv() */ +#include /* for strtoul() */ BufferDesc *BufferDescriptors; char *BufferBlocks; -int32 *PrivateRefCount; +int32 *PrivateRefCount; /* array of counts per buffer of how many times this task has pinned this buffer */ + +volatile struct BAiocbAnchor *BAiocbAnchr = (struct BAiocbAnchor *)0; /* anchor for all control blocks pertaining to aio */ +int CountInuseBAiocbs(void); /* keep compiler happy */ +void ReportFreeBAiocbs(void); /* keep compiler happy */ + +extern int MaxConnections; /* max number of client connections which 
postmaster will allow */ +int numBufferAiocbs = 0; /* total number of BufferAiocbs in pool (0 <=> no async io) */ +int hwmBufferAiocbs = 0; /* high water mark of in-use BufferAiocbs in pool + ** (not required to be accurate, kindly maintained for us somehow by postmaster) + */ + +#ifdef USE_PREFETCH +unsigned int prefetch_dbOid = 0; /* database oid of relations on which prefetching to be done - 0 means all */ +unsigned int prefetch_bitmap_scans = 1; /* boolean whether to prefetch bitmap heap scans */ +unsigned int prefetch_heap_scans = 0; /* boolean whether to prefetch non-bitmap heap scans */ +unsigned int prefetch_sequential_index_scans = 0; /* boolean whether to prefetch sequential-access non-bitmap index scans */ +unsigned int prefetch_index_scans = 256; /* boolean whether to prefetch non-bitmap index scans also numeric size of pfch_list */ +unsigned int prefetch_btree_heaps = 1; /* boolean whether to prefetch heap pages in _bt_next for non-bitmap index scans */ +#endif /* USE_PREFETCH */ + +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP +int maxGetBAiocbTries = 1; /* max times we will try to get a free BufferAiocb */ +int maxRelBAiocbTries = 1; /* max times we will try to release a BufferAiocb back to freelist */ + +/* locking protocol for manipulating the BufferAiocbs and FreeBAiocbs list : +** 1. ownership of a BufferAiocb : +** to gain ownership of a BufferAiocb, a task must +** EITHER remove it from FreeBAiocbs (it is now temporary owner and no other task can find it) +** if decision is to attach it to a buffer descriptor header, then +** . lock the buffer descriptor header +** . check NOT flags & BM_AIO_IN_PROGRESS +** . attach to buffer descriptor header +** . increment the BufferAiocb.dependent_count +** . unlock the buffer descriptor header +** and ownership scope is from lock to unlock +*** OR locate it by dereferencing the pointer in a buffer descriptor, +** in which case : +** . lock the buffer descriptor header +** . check flags & BM_AIO_IN_PROGRESS +** . 
increment the BufferAiocb.dependent_count +** . if decision is to return to FreeBAiocbs, +** then (with buffer descriptor header still locked) +** . turn off BM_AIO_IN_PROGRESS +** . IF the BufferAiocb.dependent_count == 1 (I am sole dependent) +** . THEN +** . . decrement the BufferAiocb.dependent_count +** . return to FreeBAiocbs (see below) +** . unlock the buffer descriptor header +** and ownership scope is from lock to either return to FreeBAiocbs or unlock +** 2. adding and removing from FreeBAiocbs : +** two alternative methods - controlled by conditional macro definition LOCK_BAIOCB_FOR_GET_REL +** 2.1 LOCK_BAIOCB_FOR_GET_REL is defined - use a lock +** . lock BufFreelistLock exclusive +** . add / remove from FreeBAiocbs +** . unlock BufFreelistLock exclusive +** advantage of this method - never fails to add or remove +** 2.2 LOCK_BAIOCB_FOR_GET_REL is not defined - use compare_and_swap +** . retrieve the current Freelist pointer and validate +** . compare_and_swap on/off the FreeBAiocb list +** . 
unlock BufFreelistLock exclusive +** advantage of this method - never waits +** to avoid losing a FreeBAiocbs, save it in a process-local cache and reuse +*/ +#else /* not USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ + /* this dummy structure is to ensure that references to these fields in other bufmgr runtime code + ** that is not conditionally ifdefd on USE_AIO_ATOMIC_BUILTIN_COMP_SWAP compiles and runs correctly + */ + struct BufferAiocb dummy_BAiocbAnchr = { (struct BufferAiocb*)0 , (struct BufferAiocb*)0 }; +int maxGetBAiocbTries = -1; /* max times we will try to get a free BufferAiocb */ +int maxRelBAiocbTries = -1; /* max times we will try to release a BufferAiocb back to freelist */ +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ /* * Data Structures: @@ -73,7 +147,14 @@ void InitBufferPool(void) { bool foundBufs, - foundDescs; + foundDescs + , foundAiocbs + ; +#if defined(USE_PREFETCH) || defined(USE_AIO_ATOMIC_BUILTIN_COMP_SWAP) + char *envvarpointer = (char *)0; /* might point to an environment variable string */ + char *charptr; +#endif /* USE_PREFETCH */ + BufferDescriptors = (BufferDesc *) ShmemInitStruct("Buffer Descriptors", @@ -83,6 +164,142 @@ InitBufferPool(void) ShmemInitStruct("Buffer Blocks", NBuffers * (Size) BLCKSZ, &foundBufs); + +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP + BAiocbAnchr = (struct BAiocbAnchor *)0; /* anchor for all control blocks pertaining to aio */ + if (max_async_io_prefetchers < 0) { /* negative value indicates to initialize to something sensible during buf_init */ + max_async_io_prefetchers = MaxConnections/6; /* default allows for average of MaxConnections/6 concurrent prefetchers - reasonable ??? 
*/ + } + + if ((target_prefetch_pages > 0) && (max_async_io_prefetchers > 0)) { + int ix; + volatile struct BufferAiocb *BufferAiocbs; + volatile struct BufferAiocb * volatile FreeBAiocbs; + + numBufferAiocbs = (target_prefetch_pages*max_async_io_prefetchers); /* target_prefetch_pages per prefetcher */ + BAiocbAnchr = (struct BAiocbAnchor *) + ShmemInitStruct("Buffer Aiocbs", + sizeof(struct BAiocbAnchor) + (numBufferAiocbs * sizeof(struct BufferAiocb)), &foundAiocbs); + if (BAiocbAnchr) { + BufferAiocbs = BAiocbAnchr->BufferAiocbs = (struct BufferAiocb*)(((char *)BAiocbAnchr) + sizeof(struct BAiocbAnchor)); + FreeBAiocbs = (struct BufferAiocb*)0; + for (ix = (numBufferAiocbs-1); ix>=0; ix--) { + (BufferAiocbs+ix)->BAiocbnext = FreeBAiocbs; /* init the free list, last one -> 0 */ + (BufferAiocbs+ix)->BAiocbbufh = (struct sbufdesc*)0; + (BufferAiocbs+ix)->BAiocbDependentCount = 0; + (BufferAiocbs+ix)->pidOfAio = 0; + FreeBAiocbs = (BufferAiocbs+ix); + + } + BAiocbAnchr->FreeBAiocbs = FreeBAiocbs; + envvarpointer = getenv("PG_MAX_GET_BAIOCB_TRIES"); + if ( (envvarpointer != (char *)0) + && ( ('1' <= *envvarpointer) && ('9' >= *envvarpointer) ) + ) { + maxGetBAiocbTries = strtol(envvarpointer, 0, 10); + } + envvarpointer = getenv("PG_MAX_REL_BAIOCB_TRIES"); + if ( (envvarpointer != (char *)0) + && ( ('1' <= *envvarpointer) && ('9' >= *envvarpointer) ) + ) { + maxRelBAiocbTries = strtol(envvarpointer, 0, 10); + } + + /* init the aio subsystem max number of threads and max number of requests + ** max number of threads <--> max_async_io_prefetchers + ** max number of requests <--> numBufferAiocbs = (target_prefetch_pages*max_async_io_prefetchers) + ** there is no return code so we just hope. 
+ */ + smgrinitaio(max_async_io_prefetchers , numBufferAiocbs); + + } + } +#else /* not USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ + /* this dummy structure is to ensure that references to these fields in other bufmgr runtime code + ** that is not conditionally ifdefd on USE_AIO_ATOMIC_BUILTIN_COMP_SWAP compiles and runs correctly + */ + BAiocbAnchr = &dummy_BAiocbAnchr; +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ + + +#ifdef USE_PREFETCH + envvarpointer = getenv("PG_TRY_PREFETCHING_FOR_BITMAP"); + if (envvarpointer != (char *)0) { + if ( ('N' == *envvarpointer) || ('n' == *envvarpointer) ) { + prefetch_bitmap_scans = 0; + } else + if ( ('Y' == *envvarpointer) || ('y' == *envvarpointer) ) { + prefetch_bitmap_scans = 1; + } + } + envvarpointer = getenv("PG_TRY_PREFETCHING_FOR_ISCAN"); + if (envvarpointer != (char *)0) { + if ( ('N' == *envvarpointer) || ('n' == *envvarpointer) ) { + prefetch_index_scans = 0; + } else + if ( ('Y' == *envvarpointer) || ('y' == *envvarpointer) ) { + prefetch_index_scans = 1; + } else + if ( ('1' <= *envvarpointer) && ('9' >= *envvarpointer) ) { + prefetch_index_scans = strtol(envvarpointer, &charptr, 10); + if (charptr && (',' == *charptr)) { /* optional sequential prefetch in index scans */ + charptr++; /* following the comma ... 
*/ + if ( ('Y' == *charptr) || ('y' == *charptr) || ('1' == *charptr) ) { + prefetch_sequential_index_scans = 1; + } + } + } + /* if prefeching for ISCAN, then we require size of pfch_list to be at least target_prefetch_pages */ + if ( (prefetch_index_scans > 0) + && (prefetch_index_scans < target_prefetch_pages) + ) { + prefetch_index_scans = target_prefetch_pages; + } + } + envvarpointer = getenv("PG_TRY_PREFETCHING_FOR_BTREE"); + if (envvarpointer != (char *)0) { + if ( ('N' == *envvarpointer) || ('n' == *envvarpointer) ) { + prefetch_btree_heaps = 0; + } else + if ( ('Y' == *envvarpointer) || ('y' == *envvarpointer) ) { + prefetch_btree_heaps = 1; + } + } + envvarpointer = getenv("PG_TRY_PREFETCHING_FOR_HEAP"); + if (envvarpointer != (char *)0) { + if ( ('N' == *envvarpointer) || ('n' == *envvarpointer) ) { + prefetch_heap_scans = 0; + } else + if ( ('Y' == *envvarpointer) || ('y' == *envvarpointer) ) { + prefetch_heap_scans = 1; + } + } + envvarpointer = getenv("PG_PREFETCH_DBOID"); + if ( (envvarpointer != (char *)0) + && ( ('1' <= *envvarpointer) && ('9' >= *envvarpointer) ) + ) { + errno = 0; /* required in order to distinguish error from 0 */ + prefetch_dbOid = (unsigned int)strtoul((const char *)envvarpointer, 0, 10); + if (errno) { + prefetch_dbOid = 0; + } + } + elog(LOG, "prefetching initialised with target_prefetch_pages= %d " + ", max_async_io_prefetchers= %d implying aio concurrency= %d " + ", prefetching_for_bitmap= %s " + ", prefetching_for_heap= %s " + ", prefetching_for_iscan= %d with sequential_index_page_prefetching= %s " + ", prefetching_for_btree= %s" + ,target_prefetch_pages ,max_async_io_prefetchers ,numBufferAiocbs + ,(prefetch_bitmap_scans ? "Y" : "N") + ,(prefetch_heap_scans ? "Y" : "N") + ,prefetch_index_scans + ,(prefetch_sequential_index_scans ? "Y" : "N") + ,(prefetch_btree_heaps ? 
"Y" : "N") + ); +#endif /* USE_PREFETCH */ + + if (foundDescs || foundBufs) { /* both should be present or neither */ @@ -176,3 +393,80 @@ BufferShmemSize(void) return size; } + +/* imprecise count of number of in-use BAiocbs at any time + * we scan the array read-only without latching so are subject to unstable result + * (but since the array is in well-known contiguous storage, + * we are not subject to segmentation violation) + * This function may be called at any time and just does its best + * return the count of what we counted. + */ +int +CountInuseBAiocbs(void) +{ + volatile struct BufferAiocb *BAiocb; + int count = 0; + int ix; + + + if (BAiocbAnchr != (struct BAiocbAnchor *)0 ) { + BAiocb = BAiocbAnchr->BufferAiocbs; /* start of list */ + + for (ix = (numBufferAiocbs-1); ix>=0; ix--) { + if ((struct BufferAiocb*)BAIOCB_OCCUPIED == (BAiocb+ix)->BAiocbnext) { /* not on freelist ? */ + count++; + } + } + } + return count; +} + +/* + * report how many free BAiocbs at shutdown + * DO NOT call this while backends are actively working!! + * this report is useful when compare_and_swap method used (see above) + * as it can be used to deduce how many BAiocbs were in process-local caches - + * (original_number_on_freelist_at_startup - this_reported_number_at_shutdown) + */ +void +ReportFreeBAiocbs(void) +{ + volatile struct BufferAiocb *BAiocb; + volatile struct BufferAiocb *BufferAiocbs; + int count = 0; + int fx , ix; + + + if (BAiocbAnchr != (struct BAiocbAnchor *)0 ) { + BAiocb = BAiocbAnchr->FreeBAiocbs; /* start of free list */ + BufferAiocbs = BAiocbAnchr->BufferAiocbs; + + for (ix = (numBufferAiocbs-1); ix>=0; ix--) { + (BufferAiocbs+ix)->BAiocbDependentCount = 0; /* use this as marker for finding it on freelist */ + } + for (fx = (numBufferAiocbs-1); ( (fx>=0) && ( BAiocb != (struct BufferAiocb*)0 ) ); fx--) { + + /* check if it is a valid BufferAiocb */ + for (ix = (numBufferAiocbs-1); ix>=0; ix--) { + if ((BufferAiocbs+ix) == BAiocb) { /* is it this one ? 
*/ + break; + } + } + if (ix >= 0) { + if (BAiocb->BAiocbDependentCount) { /* seen it already ? */ + elog(LOG, "ReportFreeBAiocbs closed cycle on AIO control block freelist %p" + ,BAiocb); + fx = 0; /* give up at this point */ + } + BAiocb->BAiocbDependentCount = 1; /* use this as marker for finding it on freelist */ + count++; + BAiocb = BAiocb->BAiocbnext; + } else { + elog(LOG, "ReportFreeBAiocbs invalid item on AIO control block freelist %p" + ,BAiocb); + fx = 0; /* give up at this point */ + } + } + } + elog(LOG, "ReportFreeBAiocbs AIO control block list : poolsize= %d in-use-hwm= %d final-free= %d" ,numBufferAiocbs , hwmBufferAiocbs , count); +} --- src/backend/storage/smgr/md.c.orig 2014-05-28 08:29:09.338829292 -0400 +++ src/backend/storage/smgr/md.c 2014-05-28 16:45:43.070507912 -0400 @@ -647,6 +647,62 @@ mdprefetch(SMgrRelation reln, ForkNumber } +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP +/* + * mdinitaio() -- init the aio subsystem max number of threads and max number of requests + */ +void +mdinitaio(int max_aio_threads, int max_aio_num) +{ + FileInitaio( max_aio_threads, max_aio_num ); +} + +/* + * mdstartaio() -- start aio read of the specified block of a relation + */ +void +mdstartaio(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum , char *aiocbp , int *retcode ) +{ +#ifdef USE_PREFETCH + off_t seekpos; + MdfdVec *v; + int local_retcode; + + v = _mdfd_getseg(reln, forknum, blocknum, false, EXTENSION_FAIL); + + seekpos = (off_t) BLCKSZ *(blocknum % ((BlockNumber) RELSEG_SIZE)); + + Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE); + + local_retcode = FileStartaio(v->mdfd_vfd, seekpos, BLCKSZ , aiocbp); + if (retcode) { + *retcode = local_retcode; + } +#endif /* USE_PREFETCH */ +} + + +/* + * mdcompleteaio() -- complete aio read of the specified block of a relation + * on entry, *inoutcode should indicate : + * . non-0 <=> check if complete and wait if not + * . 
0 <=> cancel io immediately + */ +void +mdcompleteaio( char *aiocbp , int *inoutcode ) +{ +#ifdef USE_PREFETCH + int local_retcode; + + local_retcode = FileCompleteaio(aiocbp, (inoutcode ? *inoutcode : 0)); + if (inoutcode) { + *inoutcode = local_retcode; + } +#endif /* USE_PREFETCH */ +} +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ + + /* * mdread() -- Read the specified block from a relation. */ --- src/backend/storage/smgr/smgr.c.orig 2014-05-28 08:29:09.338829292 -0400 +++ src/backend/storage/smgr/smgr.c 2014-05-28 16:45:43.094508008 -0400 @@ -49,6 +49,12 @@ typedef struct f_smgr BlockNumber blocknum, char *buffer, bool skipFsync); void (*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum); +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP + void (*smgr_initaio) (int max_aio_threads, int max_aio_num); + void (*smgr_startaio) (SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum , char *aiocbp , int *retcode ); + void (*smgr_completeaio) ( char *aiocbp , int *inoutcode ); +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ void (*smgr_read) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer); void (*smgr_write) (SMgrRelation reln, ForkNumber forknum, @@ -66,7 +72,11 @@ typedef struct f_smgr static const f_smgr smgrsw[] = { /* magnetic disk */ {mdinit, NULL, mdclose, mdcreate, mdexists, mdunlink, mdextend, - mdprefetch, mdread, mdwrite, mdnblocks, mdtruncate, mdimmedsync, + mdprefetch +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP + ,mdinitaio, mdstartaio, mdcompleteaio +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ + , mdread, mdwrite, mdnblocks, mdtruncate, mdimmedsync, mdpreckpt, mdsync, mdpostckpt } }; @@ -612,6 +622,35 @@ smgrprefetch(SMgrRelation reln, ForkNumb (*(smgrsw[reln->smgr_which].smgr_prefetch)) (reln, forknum, blocknum); } +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP +/* + * smgrinitaio() -- initialize the aio subsystem max number of threads and max number of requests + */ +void +smgrinitaio(int max_aio_threads, 
int max_aio_num) +{ + (*(smgrsw[0].smgr_initaio)) ( max_aio_threads, max_aio_num ); +} + +/* + * smgrstartaio() -- Initiate aio read of the specified block of a relation. + */ +void +smgrstartaio(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum , char *aiocbp , int *retcode ) +{ + (*(smgrsw[reln->smgr_which].smgr_startaio)) (reln, forknum, blocknum , aiocbp , retcode ); +} + +/* + * smgrcompleteaio() -- Complete aio read of the specified block of a relation. + */ +void +smgrcompleteaio(SMgrRelation reln, char *aiocbp , int *inoutcode ) +{ + (*(smgrsw[reln->smgr_which].smgr_completeaio)) ( aiocbp , inoutcode ); +} +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ + /* * smgrread() -- read a particular block from a relation into the supplied * buffer. --- src/backend/storage/file/fd.c.orig 2014-05-28 08:29:09.334829294 -0400 +++ src/backend/storage/file/fd.c 2014-05-28 16:45:43.122508122 -0400 @@ -77,6 +77,9 @@ #include "utils/guc.h" #include "utils/resowner_private.h" +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP +#include "aio.h" +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ /* * We must leave some file descriptors free for system(), the dynamic loader, @@ -1239,6 +1242,10 @@ FileClose(File file) * We could add an implementation using libaio in the future; but note that * this API is inappropriate for libaio, which wants to have a buffer provided * to read into. + * Also note that a new, different implementation of asynchronous prefetch + * using librt, not libaio, is provided by the two functions following this one, + * FileStartaio and FileCompleteaio. These also require to have a buffer provided + * to read into, which the new async_io support provides. 
*/ int FilePrefetch(File file, off_t offset, int amount) @@ -1266,6 +1273,139 @@ FilePrefetch(File file, off_t offset, in #endif } +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP +/* + * FileInitaio - initialize the aio subsystem max number of threads and max number of requests + * input parms + * max_aio_threads; maximum number of threads + * max_aio_num; maximum number of concurrent aio read requests + * + * on linux, man page for the librt implemenation of aio_init() says : + * This function is a GNU extension. + * If your posix aio does not have it, then add the following line to + * src/include/pg_config_manual.h + * #define DONT_HAVE_AIO_INIT + * to render it as a no-op + */ +void +FileInitaio(int max_aio_threads, int max_aio_num ) +{ +#ifndef DONT_HAVE_AIO_INIT + struct aioinit aioinit_struct; /* structure to pass to aio_init */ + + aioinit_struct.aio_threads = max_aio_threads; /* maximum number of threads */ + aioinit_struct.aio_num = max_aio_num; /* maximum number of concurrent aio read requests */ + aioinit_struct.aio_idle_time = 1; /* we dont want to alter this but aio_init does not ignore it so set to the default */ + aio_init(&aioinit_struct); +#endif /* ndef DONT_HAVE_AIO_INIT */ + return; +} + +/* + * FileStartaio - initiate asynchronous read of a given range of the file. + * The logical seek position is unaffected. 
+ *
+ * use standard posix aio (librt)
+ * ASSUME BufferAiocb.aio_buf already set to -> buffer by caller
+ * return 0 if successfully started, else non-zero
+ */
+int
+FileStartaio(File file, off_t offset, int amount , char *aiocbp )
+{
+	int			returnCode;
+	struct aiocb *my_aiocbp = (struct aiocb *)aiocbp;
+
+	Assert(FileIsValid(file));
+
+	DO_DB(elog(LOG, "FileStartaio: %d (%s) " INT64_FORMAT " %d",
+			   file, VfdCache[file].fileName,
+			   (int64) offset, amount));
+
+	returnCode = FileAccess(file);
+	if (returnCode >= 0) {
+		/* NOTE(review): aio_sigevent and aio_reqprio are not set here - presumably the caller zeroes the aiocb; confirm */
+		my_aiocbp->aio_fildes = VfdCache[file].fd;
+		my_aiocbp->aio_lio_opcode = LIO_READ;
+		my_aiocbp->aio_nbytes = amount;
+		my_aiocbp->aio_offset = offset;
+		returnCode = aio_read(my_aiocbp);
+	}
+
+	return returnCode;
+}
+
+/*
+ * FileCompleteaio - complete asynchronous aio read
+ * normal_wait indicates whether to cancel or wait -
+ *      0 <=> cancel
+ *      1 <=> wait
+ *
+ * use standard posix aio (librt)
+ * return 0 if successful and did not have to wait,
+ *        1 if successful and had to wait,
+ *        else x'ff'
+ * successful means aio_error reported 0 and aio_return reported aio_nbytes bytes
+ */
+int
+FileCompleteaio( char *aiocbp , int normal_wait )
+{
+	int			returnCode;
+	int			aio_errno;
+	struct aiocb *my_aiocbp = (struct aiocb *)aiocbp;
+	const struct aiocb *cblist[1];
+	int			fd;
+	struct timespec my_timeout = { 0 , 10000 };
+	int			max_polls;
+
+	fd = my_aiocbp->aio_fildes;
+	cblist[0] = my_aiocbp;
+	returnCode = aio_errno = aio_error(my_aiocbp);
+	/* note that aio_error returns 0 if op already completed successfully */
+	if (normal_wait) { /* normal case : wait for op to complete, polling with a short timeout */
+		while (aio_errno == EINPROGRESS) {
+			max_polls = 256;
+			do {
+				my_timeout.tv_sec = 0; my_timeout.tv_nsec = 10000;
+				returnCode = aio_suspend(cblist , 1 , &my_timeout);
+			} while ((returnCode < 0) && (EAGAIN == errno) && (max_polls-- > 0));
+			aio_errno = aio_error(my_aiocbp);
+			returnCode = (aio_errno == 0) ? 1 : aio_errno; /* 1 <=> successful but had to wait */
+		}
+		if (aio_errno) {
+			elog(LOG, "FileCompleteaio: %d %d", fd, aio_errno);
+			returnCode = 0xff; /* unsuccessful */
+		}
+	} else if (aio_errno == EINPROGRESS) { /* cancel case : keep asking until op is no longer in progress */
+		do {
+			max_polls = 256;
+			do {
+				returnCode = aio_cancel(fd, my_aiocbp);
+			} while ((returnCode == AIO_NOTCANCELED) && (max_polls-- > 0));
+			aio_errno = aio_error(my_aiocbp);
+		} while (aio_errno == EINPROGRESS);
+		returnCode = 0xff; /* unsuccessful */
+	} else if (returnCode != 0) {
+		returnCode = 0xff; /* unsuccessful */
+	}
+
+	/* aio_error == 0 only says the request finished without error.
+	** Collect the byte count too, so that a short read (e.g. at end of file)
+	** is reported as unsuccessful rather than as a usable block.
+	** NOTE(review): several dependents may complete the same aiocb and POSIX
+	** defines only the first aio_return per request - confirm for your librt.
+	*/
+	if ( ((returnCode == 0) || (returnCode == 1))
+	  && (aio_return(my_aiocbp) != (ssize_t) my_aiocbp->aio_nbytes)
+	   ) {
+		elog(LOG, "FileCompleteaio: %d short read", fd);
+		returnCode = 0xff; /* unsuccessful */
+	}
+
+	DO_DB(elog(LOG, "FileCompleteaio: %d %d",
+			   fd, returnCode));
+	return returnCode;
+}
+#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */
+
 int
 FileRead(File file, char *buffer, int amount)
 {
--- src/backend/storage/lmgr/proc.c.orig	2014-05-28 08:29:09.338829292 -0400
+++ src/backend/storage/lmgr/proc.c	2014-05-28 16:45:43.146508219 -0400
@@ -52,6 +52,7 @@
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
 
+extern pid_t this_backend_pid; /* pid of this backend */
 
 /* GUC variables */
 int DeadlockTimeout = 1000;
@@ -361,6 +362,7 @@ InitProcess(void)
 	MyPgXact->xid = InvalidTransactionId;
 	MyPgXact->xmin = InvalidTransactionId;
 	MyProc->pid = MyProcPid;
+	this_backend_pid = getpid(); /* pid of this backend */
 	/* backendId, databaseId and roleId will be filled in later */
 	MyProc->backendId = InvalidBackendId;
 	MyProc->databaseId = InvalidOid;
--- src/backend/access/heap/heapam.c.orig	2014-05-28 08:29:09.242829343 -0400
+++ src/backend/access/heap/heapam.c	2014-05-28 16:45:43.202508444 -0400
@@ -71,6 +71,28 @@
 #include "utils/syscache.h"
 #include "utils/tqual.h"
 
+#ifdef USE_PREFETCH
+#include
+#include
+#include
+#include
+
+#include "executor/instrument.h"
+
+extern unsigned int prefetch_dbOid; /* database oid of relations on which prefetching to
be done - 0 means all */ +extern unsigned int prefetch_heap_scans; /* boolean whether to prefetch non-bitmap heap scans */ + +/* special values for scan->rs_prefetch_target indicating as follows : */ +#define PREFETCH_MAYBE 0xffffffff /* prefetch permitted but not yet in effect */ +#define PREFETCH_DISABLED 0xfffffffe /* prefetch disabled and not permitted */ +/* PREFETCH_WRAP_POINT indicates a pretcher who has reached the point where the scan would wrap - +** at this point the prefetcher runs on the spot until scan catches up. +** This *must* be < maximum valid setting of target_prefetch_pages aka effective_io_concurrency. +*/ +#define PREFETCH_WRAP_POINT 0x0fffffff + +#endif /* USE_PREFETCH */ + /* GUC variable */ bool synchronize_seqscans = true; @@ -115,6 +137,8 @@ static XLogRecPtr log_heap_new_cid(Relat static HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup, bool key_modified, bool *copy); +static void heap_unread_add(HeapScanDesc scan, BlockNumber blockno); +static void heap_unread_subtract(HeapScanDesc scan, BlockNumber blockno); /* * Each tuple lock mode has a corresponding heavyweight lock, and one or two @@ -292,9 +316,148 @@ initscan(HeapScanDesc scan, ScanKey key, * Currently, we don't have a stats counter for bitmap heap scans (but the * underlying bitmap index scans will be counted). 
*/ - if (!scan->rs_bitmapscan) +#ifdef USE_PREFETCH + /* by default, no prefetching on any scan */ + scan->rs_prefetch_target = PREFETCH_DISABLED; /* tentatively disable */ + scan->rs_pfchblock = 0; /* scanner will reset this to be ahead of scan */ + scan->rs_Unread_Pfetched_base = (BlockNumber *)0; /* list of prefetched but unread blocknos */ + scan->rs_Unread_Pfetched_next = 0; /* next unread blockno */ + scan->rs_Unread_Pfetched_count = 0; /* number of valid unread blocknos */ +#endif /* USE_PREFETCH */ + if (!scan->rs_bitmapscan) { + pgstat_count_heap_scan(scan->rs_rd); +#ifdef USE_PREFETCH + /* bitmap scans do their own prefetching - + ** for others, set up prefetching now + */ + if ( prefetch_heap_scans + && (target_prefetch_pages > 0) + && (!RelationUsesLocalBuffers(scan->rs_rd)) + ) { + /* prefetch_dbOid may be set to a database Oid to specify only prefetch in that db */ + if ( ( (prefetch_dbOid > 0) + && (prefetch_dbOid == scan->rs_rd->rd_node.dbNode) + ) + || (prefetch_dbOid == 0) + ) { + scan->rs_prefetch_target = PREFETCH_MAYBE; /* permitted but let the scan decide */ + } + else { + } + } +#endif /* USE_PREFETCH */ + } +} + +/* add this blockno to list of prefetched and unread blocknos +** use the one identified by the (next+count|modulo circumference) index if it is unused, +** else search for the first available slot if there is one, +** else error. 
+*/
+static void
+heap_unread_add(HeapScanDesc scan, BlockNumber blockno)
+{
+	BlockNumber *slots = scan->rs_Unread_Pfetched_base; /* list of prefetched but unread blocknos */
+	unsigned int slot; /* index of the slot under consideration */
+
+	/* caller is not supposed to pass InvalidBlockNumber but check anyway.
+	** InvalidBlockNumber is the marker for an unused slot, so there is nothing to record -
+	** return before any slot is dereferenced.
+	*/
+	if (blockno == InvalidBlockNumber) {
+		return;
+	}
+
+	/* ensure there is some room somewhere */
+	if (scan->rs_Unread_Pfetched_count < target_prefetch_pages) {
+
+		/* try the "next+count" one */
+		slot = scan->rs_Unread_Pfetched_next + scan->rs_Unread_Pfetched_count;
+		if (slot >= target_prefetch_pages) {
+			slot -= target_prefetch_pages; /* modulo circumference */
+		}
+
+		if (slots[slot] != InvalidBlockNumber) { /* in use */
+			/* slow-search the entire list */
+			for (slot = 0; slot < target_prefetch_pages; slot++) {
+				if (slots[slot] == InvalidBlockNumber) { /* unused */
+					/* before storing this blockno,
+					** since the next pointer did not locate an unused slot,
+					** set it to one which is more likely to be so for the next time
+					*/
+					scan->rs_Unread_Pfetched_next = slot;
+					break;
+				}
+			}
+		}
+
+		/* slot is inside the list only if an unused slot was located */
+		if (slot < target_prefetch_pages) {
+			slots[slot] = blockno;
+			scan->rs_Unread_Pfetched_count++; /* update count */
+			return;
+		}
+	}
+
+	/* if we reach here, either there was no available slot
+	** or we thought there was one and didn't find any --
+	*/
+	ereport(ERROR,
+			(errcode(ERRCODE_INTERNAL_ERROR),
+			 errmsg("heap_unread_add overflowed list cannot add blockno %u", blockno)));
+}
+
+/* remove specified blockno from list of prefetched and unread blocknos.
+** Usually this will be found at the rs_Unread_Pfetched_next item -
+** else search for it.
If not found, ignore it - no error results.
+*/
+static void
+heap_unread_subtract(HeapScanDesc scan, BlockNumber blockno)
+{
+	BlockNumber *base = scan->rs_Unread_Pfetched_base;	/* circular list of blocknos */
+	unsigned int ix = scan->rs_Unread_Pfetched_next;	/* index of next unread blockno */
+	bool found;		/* did we locate callers blockno in the list */
+
+	/* caller is not supposed to pass InvalidBlockNumber but check anyway */
+	if ( (blockno != InvalidBlockNumber)
+	  && (scan->rs_Unread_Pfetched_count > 0)	/* if the list is not empty */
+	   ) {
+
+		/* take modulo of the circumference.
+		** actually rs_Unread_Pfetched_next should never exceed the circumference but check anyway.
+		*/
+		if (ix >= target_prefetch_pages) {
+			ix -= target_prefetch_pages;
+		}
+
+		/* usual case is the "next" slot, else slow-search the entire list */
+		found = (base[ix] == blockno);
+		if (!found) {
+			for (ix = 0; ix < target_prefetch_pages; ix++) {
+				if (base[ix] == blockno) {
+					found = true;
+					break;
+				}
+			}
+		}
+
+		/* not in the list : ignore it, leaving the slots, next and count untouched */
+		if (found) {
+			base[ix] = InvalidBlockNumber;	/* slot is now unused */
+
+			scan->rs_Unread_Pfetched_next = (ix + 1);	/* update next pfchd unread */
+			if (scan->rs_Unread_Pfetched_next >= target_prefetch_pages) {
+				scan->rs_Unread_Pfetched_next = 0;
+			}
+			scan->rs_Unread_Pfetched_count--;	/* update count */
+		}
+	}
+
+	return;
+}
+ /* * heapgetpage - subroutine for heapgettup() @@ -304,7 +467,7 @@ initscan(HeapScanDesc scan, ScanKey key, * which tuples on the page are visible.
*/ static void -heapgetpage(HeapScanDesc scan, BlockNumber page) +heapgetpage(HeapScanDesc scan, BlockNumber page , BlockNumber prefetchHWM) { Buffer buffer; Snapshot snapshot; @@ -314,6 +477,10 @@ heapgetpage(HeapScanDesc scan, BlockNumb OffsetNumber lineoff; ItemId lpp; bool all_visible; +#ifdef USE_PREFETCH + int PrefetchBufferRc; /* indicates whether requested prefetch block already in a buffer and if pin count on buffer has been increased */ +#endif /* USE_PREFETCH */ + Assert(page < scan->rs_nblocks); @@ -336,6 +503,98 @@ heapgetpage(HeapScanDesc scan, BlockNumb RBM_NORMAL, scan->rs_strategy); scan->rs_cblock = page; +#ifdef USE_PREFETCH + + heap_unread_subtract(scan, page); + + /* maybe prefetch some pages starting with rs_pfchblock */ + if (scan->rs_prefetch_target >= 0) { /* prefetching enabled on this scan ? */ + int next_block_to_be_read = (page+1); /* next block to be read = lowest possible prefetchable block */ + int num_to_pfch_this_time; /* eventually holds the number of blocks to prefetch now */ + int prefetchable_range; /* size of the area ahead of the current prefetch position */ + + /* check if prefetcher reached wrap point and the scan has now wrapped */ + if ( (page == 0) && (scan->rs_prefetch_target == PREFETCH_WRAP_POINT) ) { + scan->rs_prefetch_target = 1; + scan->rs_pfchblock = next_block_to_be_read; + } else + if (scan->rs_pfchblock < next_block_to_be_read) { + scan->rs_pfchblock = next_block_to_be_read; /* next block to be prefetched must be ahead of one we just read */ + } + + /* now we know where we would start prefetching - + ** next question - if this is a sync scan, ensure we do not prefetch behind the HWM + ** debatable whether to require strict inequality or >= - >= works better in practice + */ + if ( (!scan->rs_syncscan) || (scan->rs_pfchblock >= prefetchHWM) ) { + + /* now we know where we will start prefetching - + ** next question - how many? + ** apply two limits : + ** 1. target prefetch distance + ** 2. 
number of available blocks ahead of us + */ + + /* 1. target prefetch distance */ + num_to_pfch_this_time = next_block_to_be_read + scan->rs_prefetch_target; /* page beyond prefetch target */ + num_to_pfch_this_time -= scan->rs_pfchblock; /* convert to offset */ + + /* first do prefetching up to our current limit ... + ** highest page number that a scan (pre)-fetches is scan->rs_nblocks-1 + ** note - prefetcher does not wrap a prefetch range - + ** instead just stop and then start again if and when main scan wraps + */ + if (scan->rs_pfchblock <= scan->rs_startblock) { /* if on second leg towards startblock */ + prefetchable_range = ((int)(scan->rs_startblock) - (int)(scan->rs_pfchblock)); + } + else { /* on first leg towards nblocks */ + prefetchable_range = ((int)(scan->rs_nblocks) - (int)(scan->rs_pfchblock)); + } + if (prefetchable_range > 0) { /* if theres a range to prefetch */ + + /* 2. number of available blocks ahead of us */ + if (num_to_pfch_this_time > prefetchable_range) { + num_to_pfch_this_time = prefetchable_range; + } + while (num_to_pfch_this_time-- > 0) { + PrefetchBufferRc = PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, scan->rs_pfchblock, scan->rs_strategy); + /* if pin acquired on buffer, then remember in case of future Discard */ + if (PrefetchBufferRc & PREFTCHRC_BUF_PIN_INCREASED) { + heap_unread_add(scan, scan->rs_pfchblock); + } + scan->rs_pfchblock++; + /* if syncscan and requested block was already in buffer pool, + ** this suggests that another scanner is ahead of us and we should advance + */ + if ( (scan->rs_syncscan) && (PrefetchBufferRc & PREFTCHRC_BLK_ALREADY_PRESENT) ) { + scan->rs_pfchblock++; + num_to_pfch_this_time--; + } + } + } + else { + /* we must not modify scan->rs_pfchblock here + ** because it is needed for possible DiscardBuffer at end of scan ... + ** ... instead ... + */ + scan->rs_prefetch_target = PREFETCH_WRAP_POINT; /* mark this prefetcher as waiting to wrap */ + } + + /* ... 
then adjust prefetching limit : by doubling on each iteration */ + if (scan->rs_prefetch_target == 0) { + scan->rs_prefetch_target = 1; + } + else { + scan->rs_prefetch_target *= 2; + if (scan->rs_prefetch_target > target_prefetch_pages) { + scan->rs_prefetch_target = target_prefetch_pages; + } + } + } + } +#endif /* USE_PREFETCH */ + + if (!scan->rs_pageatatime) return; @@ -452,6 +711,8 @@ heapgettup(HeapScanDesc scan, OffsetNumber lineoff; int linesleft; ItemId lpp; + BlockNumber prefetchHWM = 0; /* HWM of prefetch BlockNum for all participants in same sync scan */ + int ix; /* * calculate next starting lineoff, given scan direction @@ -470,7 +731,25 @@ heapgettup(HeapScanDesc scan, return; } page = scan->rs_startblock; /* first page */ - heapgetpage(scan, page); +#ifdef USE_PREFETCH + /* decide if we shall do prefetching : only if : + ** . prefetching enabled for this scan + ** . not a bitmap scan (which do their own) + ** . sufficient number of blocks - at least twice the target_prefetch_pages + */ + if ( (!scan->rs_bitmapscan) + && (scan->rs_prefetch_target >= PREFETCH_MAYBE) + && (scan->rs_nblocks > (2*target_prefetch_pages)) + ) { + scan->rs_prefetch_target = 1; /* do prefetching on forward non-bitmap scan */ + scan->rs_Unread_Pfetched_base = (BlockNumber *)palloc(sizeof(BlockNumber)*target_prefetch_pages); + /* Initialise the list */ + for (ix = 0; ix < target_prefetch_pages; ix++) { + *(scan->rs_Unread_Pfetched_base + ix) = InvalidBlockNumber; + } + } +#endif /* USE_PREFETCH */ + heapgetpage(scan, page, prefetchHWM); lineoff = FirstOffsetNumber; /* first offnum */ scan->rs_inited = true; } @@ -516,7 +795,7 @@ heapgettup(HeapScanDesc scan, page = scan->rs_startblock - 1; else page = scan->rs_nblocks - 1; - heapgetpage(scan, page); + heapgetpage(scan, page, prefetchHWM); } else { @@ -557,7 +836,7 @@ heapgettup(HeapScanDesc scan, page = ItemPointerGetBlockNumber(&(tuple->t_self)); if (page != scan->rs_cblock) - heapgetpage(scan, page); + heapgetpage(scan, 
page, prefetchHWM); /* Since the tuple was previously fetched, needn't lock page here */ dp = (Page) BufferGetPage(scan->rs_cbuf); @@ -660,8 +939,10 @@ heapgettup(HeapScanDesc scan, * a little bit backwards on every invocation, which is confusing. * We don't guarantee any specific ordering in general, though. */ - if (scan->rs_syncscan) - ss_report_location(scan->rs_rd, page); + if (scan->rs_syncscan) { + prefetchHWM = scan->rs_pfchblock; + ss_report_location(scan->rs_rd, page, &prefetchHWM); + } } /* @@ -671,6 +952,22 @@ heapgettup(HeapScanDesc scan, { if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); +#ifdef USE_PREFETCH + if ( (scan->rs_pfchblock > 0) && (scan->rs_cblock != InvalidBlockNumber) ) { + BlockNumber *Unread_Pfetched_base = scan->rs_Unread_Pfetched_base; + unsigned int Unread_Pfetched_next = scan->rs_Unread_Pfetched_next; /* where the next unread blockno is relative to start */ + unsigned int Unread_Pfetched_count = scan->rs_Unread_Pfetched_count; + + /* we have prefetched but not read the range from cblock+1 to pfchblock-1 if that is non-empty */ + while ((Unread_Pfetched_count--) > 0) { + DiscardBuffer( scan->rs_rd, MAIN_FORKNUM, *(Unread_Pfetched_base+Unread_Pfetched_next)); + heap_unread_subtract(scan, *(Unread_Pfetched_base+Unread_Pfetched_next)); /* remove this blockno from list of prefetched and unread blocknos */ + Unread_Pfetched_next++; + if (Unread_Pfetched_next >= target_prefetch_pages) + Unread_Pfetched_next = 0; + } + } +#endif /* USE_PREFETCH */ scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; tuple->t_data = NULL; @@ -678,7 +975,7 @@ heapgettup(HeapScanDesc scan, return; } - heapgetpage(scan, page); + heapgetpage(scan, page, prefetchHWM); LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); @@ -727,6 +1024,8 @@ heapgettup_pagemode(HeapScanDesc scan, OffsetNumber lineoff; int linesleft; ItemId lpp; + BlockNumber prefetchHWM = 0; /* HWM of prefetch BlockNum for all participants in same sync scan */ + int ix; /* 
* calculate next starting lineindex, given scan direction @@ -745,7 +1044,25 @@ heapgettup_pagemode(HeapScanDesc scan, return; } page = scan->rs_startblock; /* first page */ - heapgetpage(scan, page); +#ifdef USE_PREFETCH + /* decide if we shall do prefetching : only if : + ** . prefetching enabled for this scan + ** . not a bitmap scan (which do their own) + ** . sufficient number of blocks - at least twice the target_prefetch_pages + */ + if ( (!scan->rs_bitmapscan) + && (scan->rs_prefetch_target >= PREFETCH_MAYBE) + && (scan->rs_nblocks > (2*target_prefetch_pages)) + ) { + scan->rs_prefetch_target = 1; /* do prefetching on forward non-bitmap scan */ + scan->rs_Unread_Pfetched_base = (BlockNumber *)palloc(sizeof(BlockNumber)*target_prefetch_pages); + /* Initialise the list */ + for (ix = 0; ix < target_prefetch_pages; ix++) { + *(scan->rs_Unread_Pfetched_base + ix) = InvalidBlockNumber; + } + } +#endif /* USE_PREFETCH */ + heapgetpage(scan, page, prefetchHWM); lineindex = 0; scan->rs_inited = true; } @@ -788,7 +1105,7 @@ heapgettup_pagemode(HeapScanDesc scan, page = scan->rs_startblock - 1; else page = scan->rs_nblocks - 1; - heapgetpage(scan, page); + heapgetpage(scan, page, prefetchHWM); } else { @@ -826,7 +1143,7 @@ heapgettup_pagemode(HeapScanDesc scan, page = ItemPointerGetBlockNumber(&(tuple->t_self)); if (page != scan->rs_cblock) - heapgetpage(scan, page); + heapgetpage(scan, page, prefetchHWM); /* Since the tuple was previously fetched, needn't lock page here */ dp = (Page) BufferGetPage(scan->rs_cbuf); @@ -921,8 +1238,10 @@ heapgettup_pagemode(HeapScanDesc scan, * a little bit backwards on every invocation, which is confusing. * We don't guarantee any specific ordering in general, though. 
*/ - if (scan->rs_syncscan) - ss_report_location(scan->rs_rd, page); + if (scan->rs_syncscan) { + prefetchHWM = scan->rs_pfchblock; + ss_report_location(scan->rs_rd, page, &prefetchHWM); + } } /* @@ -932,6 +1251,22 @@ heapgettup_pagemode(HeapScanDesc scan, { if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); +#ifdef USE_PREFETCH + if ( (scan->rs_pfchblock > 0) && (scan->rs_cblock != InvalidBlockNumber) ) { + BlockNumber *Unread_Pfetched_base = scan->rs_Unread_Pfetched_base; + unsigned int Unread_Pfetched_next = scan->rs_Unread_Pfetched_next; /* where the next unread blockno is relative to start */ + unsigned int Unread_Pfetched_count = scan->rs_Unread_Pfetched_count; + + /* we have prefetched but not read the range from cblock+1 to pfchblock-1 if that is non-empty */ + while ((Unread_Pfetched_count--) > 0) { + DiscardBuffer( scan->rs_rd, MAIN_FORKNUM, *(Unread_Pfetched_base+Unread_Pfetched_next)); + heap_unread_subtract(scan, *(Unread_Pfetched_base+Unread_Pfetched_next)); /* remove this blockno from list of prefetched and unread blocknos */ + Unread_Pfetched_next++; + if (Unread_Pfetched_next >= target_prefetch_pages) + Unread_Pfetched_next = 0; + } + } +#endif /* USE_PREFETCH */ scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; tuple->t_data = NULL; @@ -939,7 +1274,7 @@ heapgettup_pagemode(HeapScanDesc scan, return; } - heapgetpage(scan, page); + heapgetpage(scan, page, prefetchHWM); dp = (Page) BufferGetPage(scan->rs_cbuf); lines = scan->rs_ntuples; @@ -1394,6 +1729,23 @@ void heap_rescan(HeapScanDesc scan, ScanKey key) { + +#ifdef USE_PREFETCH + if ( (scan->rs_pfchblock > 0) && (scan->rs_cblock != InvalidBlockNumber) ) { + BlockNumber *Unread_Pfetched_base = scan->rs_Unread_Pfetched_base; + unsigned int Unread_Pfetched_next = scan->rs_Unread_Pfetched_next; /* where the next unread blockno is relative to start */ + unsigned int Unread_Pfetched_count = scan->rs_Unread_Pfetched_count; + + /* we have prefetched but not read the range 
from cblock+1 to pfchblock-1 if that is non-empty */ + while ((Unread_Pfetched_count--) > 0) { + DiscardBuffer( scan->rs_rd, MAIN_FORKNUM, *(Unread_Pfetched_base+Unread_Pfetched_next)); + heap_unread_subtract(scan, *(Unread_Pfetched_base+Unread_Pfetched_next)); /* remove this blockno from list of prefetched and unread blocknos */ + Unread_Pfetched_next++; + if (Unread_Pfetched_next >= target_prefetch_pages) + Unread_Pfetched_next = 0; + } + } +#endif /* USE_PREFETCH */ /* * unpin scan buffers */ @@ -1418,6 +1770,23 @@ heap_endscan(HeapScanDesc scan) { /* Note: no locking manipulations needed */ + +#ifdef USE_PREFETCH + if ( (scan->rs_pfchblock > 0) && (scan->rs_cblock != InvalidBlockNumber) ) { + BlockNumber *Unread_Pfetched_base = scan->rs_Unread_Pfetched_base; + unsigned int Unread_Pfetched_next = scan->rs_Unread_Pfetched_next; /* where the next unread blockno is relative to start */ + unsigned int Unread_Pfetched_count = scan->rs_Unread_Pfetched_count; + + /* we have prefetched but not read the range from cblock+1 to pfchblock-1 if that is non-empty */ + while ((Unread_Pfetched_count--) > 0) { + DiscardBuffer( scan->rs_rd, MAIN_FORKNUM, *(Unread_Pfetched_base+Unread_Pfetched_next)); + heap_unread_subtract(scan, *(Unread_Pfetched_base+Unread_Pfetched_next)); /* remove this blockno from list of prefetched and unread blocknos */ + Unread_Pfetched_next++; + if (Unread_Pfetched_next >= target_prefetch_pages) + Unread_Pfetched_next = 0; + } + } +#endif /* USE_PREFETCH */ /* * unpin scan buffers */ @@ -1435,6 +1804,10 @@ heap_endscan(HeapScanDesc scan) if (scan->rs_strategy != NULL) FreeAccessStrategy(scan->rs_strategy); + if (scan->rs_Unread_Pfetched_base) { + pfree(scan->rs_Unread_Pfetched_base); + } + if (scan->rs_temp_snap) UnregisterSnapshot(scan->rs_snapshot); @@ -1464,7 +1837,6 @@ heap_endscan(HeapScanDesc scan) #define HEAPDEBUG_3 #endif /* !defined(HEAPDEBUGALL) */ - HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction) { @@ -6347,6 +6719,25 @@ 
heap_markpos(HeapScanDesc scan) void heap_restrpos(HeapScanDesc scan) { + + +#ifdef USE_PREFETCH + if ( (scan->rs_pfchblock > 0) && (scan->rs_cblock != InvalidBlockNumber) ) { + BlockNumber *Unread_Pfetched_base = scan->rs_Unread_Pfetched_base; + unsigned int Unread_Pfetched_next = scan->rs_Unread_Pfetched_next; /* where the next unread blockno is relative to start */ + unsigned int Unread_Pfetched_count = scan->rs_Unread_Pfetched_count; + + /* we have prefetched but not read the range from cblock+1 to pfchblock-1 if that is non-empty */ + while ((Unread_Pfetched_count--) > 0) { + DiscardBuffer( scan->rs_rd, MAIN_FORKNUM, *(Unread_Pfetched_base+Unread_Pfetched_next)); + heap_unread_subtract(scan, *(Unread_Pfetched_base+Unread_Pfetched_next)); /* remove this blockno from list of prefetched and unread blocknos */ + Unread_Pfetched_next++; + if (Unread_Pfetched_next >= target_prefetch_pages) + Unread_Pfetched_next = 0; + } + } +#endif /* USE_PREFETCH */ + /* XXX no amrestrpos checking that ammarkpos called */ if (!ItemPointerIsValid(&scan->rs_mctid)) --- src/backend/access/heap/syncscan.c.orig 2014-05-28 08:29:09.242829343 -0400 +++ src/backend/access/heap/syncscan.c 2014-05-28 16:45:43.254508653 -0400 @@ -90,6 +90,7 @@ typedef struct ss_scan_location_t { RelFileNode relfilenode; /* identity of a relation */ BlockNumber location; /* last-reported location in the relation */ + BlockNumber prefetchHWM; /* high-water-mark of prefetched Blocknum */ } ss_scan_location_t; typedef struct ss_lru_item_t @@ -113,7 +114,7 @@ static ss_scan_locations_t *scan_locatio /* prototypes for internal functions */ static BlockNumber ss_search(RelFileNode relfilenode, - BlockNumber location, bool set); + BlockNumber location, bool set , BlockNumber *prefetchHWMp); /* @@ -160,6 +161,7 @@ SyncScanShmemInit(void) item->location.relfilenode.dbNode = InvalidOid; item->location.relfilenode.relNode = InvalidOid; item->location.location = InvalidBlockNumber; + item->location.prefetchHWM = 
InvalidBlockNumber; item->prev = (i > 0) ? (&scan_locations->items[i - 1]) : NULL; @@ -185,7 +187,7 @@ SyncScanShmemInit(void) * data structure. */ static BlockNumber -ss_search(RelFileNode relfilenode, BlockNumber location, bool set) +ss_search(RelFileNode relfilenode, BlockNumber location, bool set , BlockNumber *prefetchHWMp) { ss_lru_item_t *item; @@ -206,6 +208,22 @@ ss_search(RelFileNode relfilenode, Block { item->location.relfilenode = relfilenode; item->location.location = location; + /* if prefetch information requested, + ** then reconcile and either update or report back the new HWM. + */ + if (prefetchHWMp) + { + if ( (item->location.prefetchHWM == InvalidBlockNumber) + || (item->location.prefetchHWM < *prefetchHWMp) + ) + { + item->location.prefetchHWM = *prefetchHWMp; + } + else + { + *prefetchHWMp = item->location.prefetchHWM; + } + } } else if (set) item->location.location = location; @@ -252,7 +270,7 @@ ss_get_location(Relation rel, BlockNumbe BlockNumber startloc; LWLockAcquire(SyncScanLock, LW_EXCLUSIVE); - startloc = ss_search(rel->rd_node, 0, false); + startloc = ss_search(rel->rd_node, 0, false , 0); LWLockRelease(SyncScanLock); /* @@ -282,7 +300,7 @@ ss_get_location(Relation rel, BlockNumbe * same relfilenode. 
*/ void -ss_report_location(Relation rel, BlockNumber location) +ss_report_location(Relation rel, BlockNumber location , BlockNumber *prefetchHWMp) { #ifdef TRACE_SYNCSCAN if (trace_syncscan) @@ -306,7 +324,7 @@ ss_report_location(Relation rel, BlockNu { if (LWLockConditionalAcquire(SyncScanLock, LW_EXCLUSIVE)) { - (void) ss_search(rel->rd_node, location, true); + (void) ss_search(rel->rd_node, location, true , prefetchHWMp); LWLockRelease(SyncScanLock); } #ifdef TRACE_SYNCSCAN --- src/backend/access/index/indexam.c.orig 2014-05-28 08:29:09.242829343 -0400 +++ src/backend/access/index/indexam.c 2014-05-28 16:45:43.298508831 -0400 @@ -79,6 +79,55 @@ #include "utils/tqual.h" +#ifdef USE_PREFETCH +bool BlocknotinBuffer(Buffer buffer, Relation relation, BlockNumber blockNum); +BlockNumber BlocknumOfBuffer(Buffer buffer); +void index_mark_or_evict_block(IndexScanDesc scan , BlockNumber blocknumber , int markit); + +extern unsigned int prefetch_index_scans; /* boolean whether to prefetch bitmap heap scans */ + +/* if specified block number is present in the prefetch array, +** then either mark it as not to be discarded or evict it according to input param +*/ +void index_mark_or_evict_block(IndexScanDesc scan , BlockNumber blocknumber , int markit) +{ + unsigned short int pfchx , pfchy , pfchz; /* indexes in BlockIdData array */ + + if ( scan->do_prefetch + && ((struct pfch_block_item*)0 != scan->pfch_block_item_list) + /* no need to check for scan->pfch_next < prefetch_index_scans + ** since we will do nothing if scan->pfch_used == 0 + */ + ) { + /* search the prefetch list to find if the block is a member */ + for (pfchx = 0; pfchx < scan->pfch_used; pfchx++) { + if (BlockIdGetBlockNumber(&(((scan->pfch_block_item_list)+pfchx)->pfch_blockid)) == blocknumber) { + if (markit) { + /* mark it as not to be discarded */ + ((scan->pfch_block_item_list)+pfchx)->pfch_discard &= ~PREFTCHRC_BUF_PIN_INCREASED; + } else { + /* shuffle all following the evictee to the left + ** and 
update next pointer if its element moves + */ + pfchy = (scan->pfch_used - 1); /* current rightmost */ + scan->pfch_used = pfchy; + + while (pfchy > pfchx) { + pfchz = pfchx + 1; + BlockIdCopy((&(((scan->pfch_block_item_list)+pfchx)->pfch_blockid)), (&(((scan->pfch_block_item_list)+pfchz)->pfch_blockid))); + ((scan->pfch_block_item_list)+pfchx)->pfch_discard = ((scan->pfch_block_item_list)+pfchz)->pfch_discard; + if (scan->pfch_next == pfchz) { + scan->pfch_next = pfchx; + } + pfchx = pfchz; /* advance */ + } + } + } + } + } +} +#endif /* USE_PREFETCH */ + /* ---------------------------------------------------------------- * macros used in index_ routines * @@ -253,6 +302,11 @@ index_beginscan(Relation heapRelation, */ scan->heapRelation = heapRelation; scan->xs_snapshot = snapshot; +#ifdef USE_PREFETCH + scan->do_prefetch = 0; /* no prefetching by default */ + scan->pfch_index_page_list = (struct pfch_index_pagelist*)0; + scan->pfch_block_item_list = (struct pfch_block_item*)0; +#endif /* USE_PREFETCH */ return scan; } @@ -277,6 +331,11 @@ index_beginscan_bitmap(Relation indexRel * up by RelationGetIndexScan. 
*/ scan->xs_snapshot = snapshot; +#ifdef USE_PREFETCH + scan->do_prefetch = 0; /* no prefetching by default */ + scan->pfch_index_page_list = (struct pfch_index_pagelist*)0; + scan->pfch_block_item_list = (struct pfch_block_item*)0; +#endif /* USE_PREFETCH */ return scan; } @@ -311,6 +370,9 @@ index_beginscan_internal(Relation indexR Int32GetDatum(nkeys), Int32GetDatum(norderbys))); + scan->heap_tids_seen = 0; + scan->heap_tids_fetched = 0; + return scan; } @@ -342,6 +404,12 @@ index_rescan(IndexScanDesc scan, /* Release any held pin on a heap page */ if (BufferIsValid(scan->xs_cbuf)) { +#ifdef USE_PREFETCH + /* if specified block number is present in the prefetch array, then evict it */ + if (scan->do_prefetch) { + index_mark_or_evict_block(scan , BlocknumOfBuffer(scan->xs_cbuf) , 0); + } +#endif /* USE_PREFETCH */ ReleaseBuffer(scan->xs_cbuf); scan->xs_cbuf = InvalidBuffer; } @@ -373,10 +441,30 @@ index_endscan(IndexScanDesc scan) /* Release any held pin on a heap page */ if (BufferIsValid(scan->xs_cbuf)) { +#ifdef USE_PREFETCH + /* if specified block number is present in the prefetch array, then evict it */ + if (scan->do_prefetch) { + index_mark_or_evict_block(scan , BlocknumOfBuffer(scan->xs_cbuf) , 0); + } +#endif /* USE_PREFETCH */ ReleaseBuffer(scan->xs_cbuf); scan->xs_cbuf = InvalidBuffer; } +#ifdef USE_PREFETCH + /* discard prefetched but unread buffers */ + if ( scan->do_prefetch + && ((struct pfch_block_item*)0 != scan->pfch_block_item_list) + ) { + unsigned short int pfchx; /* index in BlockIdData array */ + for (pfchx = 0; pfchx < scan->pfch_used; pfchx++) { + if (((scan->pfch_block_item_list)+pfchx)->pfch_discard) { + DiscardBuffer(scan->heapRelation, MAIN_FORKNUM, BlockIdGetBlockNumber(&(((scan->pfch_block_item_list)+pfchx)->pfch_blockid))); + } + } + } +#endif /* USE_PREFETCH */ + /* End the AM's scan */ FunctionCall1(procedure, PointerGetDatum(scan)); @@ -472,6 +560,12 @@ index_getnext_tid(IndexScanDesc scan, Sc /* ... 
but first, release any held pin on a heap page */ if (BufferIsValid(scan->xs_cbuf)) { +#ifdef USE_PREFETCH + /* if specified block number is present in the prefetch array, then evict it */ + if (scan->do_prefetch) { + index_mark_or_evict_block(scan , BlocknumOfBuffer(scan->xs_cbuf) , 0); + } +#endif /* USE_PREFETCH */ ReleaseBuffer(scan->xs_cbuf); scan->xs_cbuf = InvalidBuffer; } @@ -479,6 +573,11 @@ index_getnext_tid(IndexScanDesc scan, Sc } pgstat_count_index_tuples(scan->indexRelation, 1); + if (scan->heap_tids_seen++ >= (~0)) { + /* Avoid integer overflow */ + scan->heap_tids_seen = 1; + scan->heap_tids_fetched = 0; + } /* Return the TID of the tuple we found. */ return &scan->xs_ctup.t_self; @@ -502,6 +601,10 @@ index_getnext_tid(IndexScanDesc scan, Sc * enough information to do it efficiently in the general case. * ---------------- */ +#if defined(USE_PREFETCH) && defined(AVOID_CATALOG_MIGRATION_FOR_ASYNCIO) +extern Datum btpeeknexttuple(IndexScanDesc scan); +#endif /* USE_PREFETCH */ + HeapTuple index_fetch_heap(IndexScanDesc scan) { @@ -509,16 +612,105 @@ index_fetch_heap(IndexScanDesc scan) bool all_dead = false; bool got_heap_tuple; + + /* We can skip the buffer-switching logic if we're in mid-HOT chain. */ if (!scan->xs_continue_hot) { /* Switch to correct buffer if we don't have it already */ Buffer prev_buf = scan->xs_cbuf; +#ifdef USE_PREFETCH + + /* If the old block is different from new block, then evict old + ** block from prefetched array. It is arguable we should leave it + ** in the array because it's likely to remain in the buffer pool + ** for a while, but in that case , if we encounter the block + ** again, prefetching it again does no harm. + ** (and note that, if it's not pinned, prefetching it will try to + ** pin it since prefetch tries to bank a pin for a buffer in the buffer pool). + ** therefore it should usually win. 
+ */ + if ( scan->do_prefetch + && ( BufferIsValid(prev_buf) ) + && (BlocknotinBuffer(prev_buf,scan->heapRelation,ItemPointerGetBlockNumber(tid))) + && (scan->pfch_next < prefetch_index_scans) /* ensure there is an entry */ + ) { + index_mark_or_evict_block(scan , BlocknumOfBuffer(prev_buf) , 0); + } + +#endif /* USE_PREFETCH */ scan->xs_cbuf = ReleaseAndReadBuffer(scan->xs_cbuf, scan->heapRelation, ItemPointerGetBlockNumber(tid)); + /* If the new block had been prefetched and pinned, + ** then mark that it no longer requires to be discarded. + ** Of course, we don't evict the entry, + ** because we want to remember that it was recently prefetched. + */ + index_mark_or_evict_block(scan , BlocknumOfBuffer(prev_buf) , 1); + + scan->heap_tids_fetched++; + +#ifdef USE_PREFETCH + /* try prefetching next data block + ** (next meaning one containing TIDs from matching keys + ** in same index page and different from any block + ** we previously prefetched and listed in prefetched array) + */ + { + FmgrInfo *procedure; + bool found; /* did we find the "next" heap tid in current index page */ + int PrefetchBufferRc; /* indicates whether requested prefetch block already in a buffer and if pin count on buffer has been increased */ + + if (scan->do_prefetch) { +#ifdef AVOID_CATALOG_MIGRATION_FOR_ASYNCIO + procedure = &scan->indexRelation->rd_aminfo->ampeeknexttuple; /* is incorrect but avoids adding function to catalog */ +#else /* not AVOID_CATALOG_MIGRATION_FOR_ASYNCIO */ + GET_SCAN_PROCEDURE(ampeeknexttuple); /* is correct but requires adding function to catalog */ +#endif /* not AVOID_CATALOG_MIGRATION_FOR_ASYNCIO */ + + if ( procedure /* does the index access method support peektuple? 
*/ +#ifndef AVOID_CATALOG_MIGRATION_FOR_ASYNCIO + && procedure->fn_addr /* procedure->fn_addr is non-null only if in catalog */ +#endif /* not AVOID_CATALOG_MIGRATION_FOR_ASYNCIO */ + ) { + int iterations = 1; /* how many iterations of prefetching shall we try - + ** if used entries in prefetch list is < target_prefetch_pages + ** then 2, else 1 + ** this should result in gradually and smoothly increasing up to target_prefetch_pages + */ + /* note we trust InitIndexScan verified this scan is forwards only and so set that */ + if (scan->pfch_used < target_prefetch_pages) { + iterations = 2; + } + do { + found = DatumGetBool( +#ifdef AVOID_CATALOG_MIGRATION_FOR_ASYNCIO + btpeeknexttuple(scan) /* pass scan as direct parameter since cant use fmgr because not in catalog */ +#else /* not AVOID_CATALOG_MIGRATION_FOR_ASYNCIO */ + FunctionCall1(procedure, PointerGetDatum(scan)) /* use fmgr to call it because in catalog */ +#endif /* not AVOID_CATALOG_MIGRATION_FOR_ASYNCIO */ + ); + if (found) { + /* btpeeknexttuple set pfch_next to point to the item in block_item_list to be prefetched */ + PrefetchBufferRc = PrefetchBuffer(scan->heapRelation, MAIN_FORKNUM, BlockIdGetBlockNumber((&((scan->pfch_block_item_list + scan->pfch_next))->pfch_blockid)) , 0); + /* elog(LOG,"index_fetch_heap prefetched rel %u blockNum %u" + ,scan->heapRelation->rd_node.relNode ,BlockIdGetBlockNumber(scan->pfch_block_item_list + scan->pfch_next)); + */ + + /* if pin acquired on buffer, then remember in case of future Discard */ + (scan->pfch_block_item_list + scan->pfch_next)->pfch_discard = (PrefetchBufferRc & PREFTCHRC_BUF_PIN_INCREASED); + + + } + } while (--iterations > 0); + } + } + } +#endif /* USE_PREFETCH */ + /* * Prune page, but only if we weren't already on this page */ --- src/backend/access/index/genam.c.orig 2014-05-28 08:29:09.242829343 -0400 +++ src/backend/access/index/genam.c 2014-05-28 16:45:43.322508927 -0400 @@ -77,6 +77,12 @@ RelationGetIndexScan(Relation indexRelat scan = 
(IndexScanDesc) palloc(sizeof(IndexScanDescData)); +#ifdef USE_PREFETCH + scan->do_prefetch = 0; /* no prefetching by default */ + scan->pfch_index_page_list = (struct pfch_index_pagelist*)0; + scan->pfch_block_item_list = (struct pfch_block_item*)0; +#endif /* USE_PREFETCH */ + scan->heapRelation = NULL; /* may be set later */ scan->indexRelation = indexRelation; scan->xs_snapshot = InvalidSnapshot; /* caller must initialize this */ @@ -139,6 +145,19 @@ RelationGetIndexScan(Relation indexRelat void IndexScanEnd(IndexScanDesc scan) { +#ifdef USE_PREFETCH + if (scan->do_prefetch) { + if ( (struct pfch_block_item*)0 != scan->pfch_block_item_list ) { + pfree(scan->pfch_block_item_list); + scan->pfch_block_item_list = (struct pfch_block_item*)0; + } + if ( (struct pfch_index_pagelist*)0 != scan->pfch_index_page_list ) { + pfree(scan->pfch_index_page_list); + scan->pfch_index_page_list = (struct pfch_index_pagelist*)0; + } + } +#endif /* USE_PREFETCH */ + if (scan->keyData != NULL) pfree(scan->keyData); if (scan->orderByData != NULL) --- src/backend/access/nbtree/nbtsearch.c.orig 2014-05-28 08:29:09.242829343 -0400 +++ src/backend/access/nbtree/nbtsearch.c 2014-05-28 16:45:43.350509042 -0400 @@ -23,13 +23,16 @@ #include "utils/lsyscache.h" #include "utils/rel.h" +extern unsigned int prefetch_btree_heaps; /* boolean whether to prefetch heap pages in _bt_next for non-bitmap index scans */ +extern unsigned int prefetch_sequential_index_scans; /* boolean whether to prefetch sequential-access non-bitmap index scans */ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum); static void _bt_saveitem(BTScanOpaque so, int itemIndex, OffsetNumber offnum, IndexTuple itup); -static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir); -static Buffer _bt_walk_left(Relation rel, Buffer buf); +static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir, + bool prefetch); +static Buffer _bt_walk_left(IndexScanDesc scan, Relation rel, Buffer buf); 
static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir); @@ -226,7 +229,7 @@ _bt_moveright(Relation rel, _bt_relbuf(rel, buf); /* re-acquire the lock in the right mode, and re-check */ - buf = _bt_getbuf(rel, blkno, access); + buf = _bt_getbuf(rel, blkno, access , (struct pfch_index_pagelist*)0); continue; } @@ -1005,7 +1008,7 @@ _bt_first(IndexScanDesc scan, ScanDirect * There's no actually-matching data on this page. Try to advance to * the next page. Return false if there's no matching data at all. */ - if (!_bt_steppage(scan, dir)) + if (!_bt_steppage(scan, dir, false)) return false; } @@ -1040,6 +1043,8 @@ _bt_next(IndexScanDesc scan, ScanDirecti { BTScanOpaque so = (BTScanOpaque) scan->opaque; BTScanPosItem *currItem; + BlockNumber prevblkno = ItemPointerGetBlockNumber( + &scan->xs_ctup.t_self); /* * Advance to next tuple on current page; or if there's no more, try to @@ -1052,11 +1057,53 @@ _bt_next(IndexScanDesc scan, ScanDirecti /* We must acquire lock before applying _bt_steppage */ Assert(BufferIsValid(so->currPos.buf)); LockBuffer(so->currPos.buf, BT_READ); - if (!_bt_steppage(scan, dir)) + if (!_bt_steppage(scan, dir, target_prefetch_pages > 0)) return false; /* Drop the lock, but not pin, on the new page */ LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK); } + + if (prefetch_btree_heaps && (target_prefetch_pages > 0)) { + BlockNumber currblkno = ItemPointerGetBlockNumber( + &so->currPos.items[so->currPos.itemIndex].heapTid); + + if (currblkno != prevblkno) { + if (so->prefetchBlockCount > 0) + so->prefetchBlockCount--; + + /* Initiate heap prefetches only when more than 256 heap tids have been seen and at least + * 15/16 (~94%) of them were fetched, and this page has moreRight set */ + if (so->currPos.moreRight + && scan->heap_tids_seen > 256 + && ( (scan->heap_tids_seen - scan->heap_tids_seen/16) + <= scan->heap_tids_fetched ) ) + { + bool nonsequential = false; + + if (so->prefetchItemIndex <= so->currPos.itemIndex) + so->prefetchItemIndex = so->currPos.itemIndex + 1; + while ( (so->prefetchItemIndex <=
so->currPos.lastItem) + && (so->prefetchBlockCount < target_prefetch_pages) ) + { + ItemPointer tid = &so->currPos.items[so->prefetchItemIndex++].heapTid; + BlockNumber blkno = ItemPointerGetBlockNumber(tid); + if (blkno != so->lastHeapPrefetchBlkno) { /* skip a block equal to the one last considered for prefetch */ + /* start prefetch of this heap block, provided : + ** EITHER . a non-sequential step was seen earlier in this loop or for this block + ** OR . the user explicitly asked (prefetch_sequential_index_scans) to prefetch sequential patterns, + ** which may be counterproductive otherwise + */ + nonsequential = (nonsequential || blkno != (so->lastHeapPrefetchBlkno+1)); + if (prefetch_sequential_index_scans || nonsequential) { + _bt_prefetchbuf(scan->heapRelation, blkno , &scan->pfch_index_page_list ); /* NOTE(review): records a heap block number in the index page list, which btendscan discards against indexRelation - confirm intended */ + } + so->lastHeapPrefetchBlkno = blkno; + so->prefetchBlockCount++; + } + } + } + } + } } else { @@ -1065,11 +1112,53 @@ _bt_next(IndexScanDesc scan, ScanDirecti /* We must acquire lock before applying _bt_steppage */ Assert(BufferIsValid(so->currPos.buf)); LockBuffer(so->currPos.buf, BT_READ); - if (!_bt_steppage(scan, dir)) + if (!_bt_steppage(scan, dir, target_prefetch_pages > 0)) return false; /* Drop the lock, but not pin, on the new page */ LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK); } + + if (prefetch_btree_heaps && (target_prefetch_pages > 0)) { + BlockNumber currblkno = ItemPointerGetBlockNumber( + &so->currPos.items[so->currPos.itemIndex].heapTid); + + if (currblkno != prevblkno) { + if (so->prefetchBlockCount > 0) + so->prefetchBlockCount--; + + /* Initiate heap prefetches only when more than 256 heap tids have been seen and at least + * 15/16 (~94%) of them were fetched, and this page has moreLeft set */ + if (so->currPos.moreLeft + && scan->heap_tids_seen > 256 + && ( (scan->heap_tids_seen - scan->heap_tids_seen/16) + <= scan->heap_tids_fetched ) ) + { + bool nonsequential = false; + + if (so->prefetchItemIndex >= so->currPos.itemIndex) + so->prefetchItemIndex = so->currPos.itemIndex - 1; + while ( (so->prefetchItemIndex >= so->currPos.firstItem) + && (so->prefetchBlockCount <
target_prefetch_pages) ) + { + ItemPointer tid = &so->currPos.items[so->prefetchItemIndex--].heapTid; + BlockNumber blkno = ItemPointerGetBlockNumber(tid); + if (blkno != so->lastHeapPrefetchBlkno) { /* skip a block equal to the one last considered for prefetch */ + /* start prefetch of this heap block, provided : + ** EITHER . a non-sequential step was seen earlier in this loop or for this block (NOTE(review): sequential is tested as lastHeapPrefetchBlkno+1 even though this branch walks the items downwards - confirm) + ** OR . the user explicitly asked (prefetch_sequential_index_scans) to prefetch sequential patterns, + ** which may be counterproductive otherwise + */ + nonsequential = (nonsequential || blkno != (so->lastHeapPrefetchBlkno+1)); + if (prefetch_sequential_index_scans || nonsequential) { + _bt_prefetchbuf(scan->heapRelation, blkno , &scan->pfch_index_page_list ); /* NOTE(review): records a heap block number in the index page list, which btendscan discards against indexRelation - confirm intended */ + } + so->lastHeapPrefetchBlkno = blkno; + so->prefetchBlockCount++; + } + } + } + } + } } /* OK, itemIndex says what to return */ @@ -1119,9 +1208,11 @@ _bt_readpage(IndexScanDesc scan, ScanDir /* * we must save the page's right-link while scanning it; this tells us * where to step right to after we're done with these items. There is no - * corresponding need for the left-link, since splits always go right. + * corresponding need for the left-link, since splits always go right, + * but we need it for back-sequential scan detection.
*/ so->currPos.nextPage = opaque->btpo_next; + so->currPos.prevPage = opaque->btpo_prev; /* initialize tuple workspace to empty */ so->currPos.nextTupleOffset = 0; @@ -1156,6 +1247,7 @@ _bt_readpage(IndexScanDesc scan, ScanDir so->currPos.firstItem = 0; so->currPos.lastItem = itemIndex - 1; so->currPos.itemIndex = 0; + so->prefetchItemIndex = 0; } else { @@ -1187,6 +1279,7 @@ _bt_readpage(IndexScanDesc scan, ScanDir so->currPos.firstItem = itemIndex; so->currPos.lastItem = MaxIndexTuplesPerPage - 1; so->currPos.itemIndex = MaxIndexTuplesPerPage - 1; + so->prefetchItemIndex = MaxIndexTuplesPerPage - 1; } return (so->currPos.firstItem <= so->currPos.lastItem); @@ -1224,7 +1317,7 @@ _bt_saveitem(BTScanOpaque so, int itemIn * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE. */ static bool -_bt_steppage(IndexScanDesc scan, ScanDirection dir) +_bt_steppage(IndexScanDesc scan, ScanDirection dir, bool prefetch) { BTScanOpaque so = (BTScanOpaque) scan->opaque; Relation rel; @@ -1278,7 +1371,7 @@ _bt_steppage(IndexScanDesc scan, ScanDir /* check for interrupts while we're not holding any buffer lock */ CHECK_FOR_INTERRUPTS(); /* step right one page */ - so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ); + so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ , scan->pfch_index_page_list); /* check for deleted page */ page = BufferGetPage(so->currPos.buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); @@ -1287,9 +1380,20 @@ _bt_steppage(IndexScanDesc scan, ScanDir PredicateLockPage(rel, blkno, scan->xs_snapshot); /* see if there are any matches on this page */ /* note that this will clear moreRight if we can stop */ - if (_bt_readpage(scan, dir, P_FIRSTDATAKEY(opaque))) + if (_bt_readpage(scan, dir, P_FIRSTDATAKEY(opaque))) { + if ( prefetch && so->currPos.moreRight + /* start prefetch on next page, provided : + ** EITHER . the next page is not the block physically following this one (btpo_next != blkno+1) + ** OR .
the user explicitly asked (prefetch_sequential_index_scans) to prefetch sequential patterns, + ** which may be counterproductive otherwise + */ + && (prefetch_sequential_index_scans || opaque->btpo_next != (blkno+1)) + ) { + _bt_prefetchbuf(rel, opaque->btpo_next , &scan->pfch_index_page_list); + } break; } + } /* nope, keep going */ blkno = opaque->btpo_next; } @@ -1317,7 +1421,7 @@ _bt_steppage(IndexScanDesc scan, ScanDir } /* Step to next physical page */ - so->currPos.buf = _bt_walk_left(rel, so->currPos.buf); + so->currPos.buf = _bt_walk_left(scan , rel, so->currPos.buf); /* if we're physically at end of index, return failure */ if (so->currPos.buf == InvalidBuffer) @@ -1332,14 +1436,58 @@ _bt_steppage(IndexScanDesc scan, ScanDir opaque = (BTPageOpaque) PageGetSpecialPointer(page); if (!P_IGNORE(opaque)) { + /* Take the left-link saved by the most recent _bt_readpage call now, since the _bt_readpage call below overwrites so->currPos.prevPage */ + BlockNumber blkno = so->currPos.prevPage; + PredicateLockPage(rel, BufferGetBlockNumber(so->currPos.buf), scan->xs_snapshot); /* see if there are any matches on this page */ /* note that this will clear moreLeft if we can stop */ - if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page))) + if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page))) { + if (prefetch && so->currPos.moreLeft) { + /* detect back-sequential runs and increase prefetch window blindly + * downwards 2 blocks at a time.
This only works in our favor + * for index-only scans, by merging read requests at the kernel, + * so we want to inflate target_prefetch_pages since merged + * back-sequential requests are about as expensive as a single one + */ + if (scan->xs_want_itup && blkno > 0 && opaque->btpo_prev == (blkno-1)) { + BlockNumber backPos; + unsigned int back_prefetch_pages = target_prefetch_pages * 16; /* window is 16 x target_prefetch_pages, capped at 64 pages just below */ + if (back_prefetch_pages > 64) + back_prefetch_pages = 64; + + if (so->backSeqRun == 0) + backPos = (blkno-1); + else + backPos = so->backSeqPos; + so->backSeqRun++; + + if (backPos > 0 && (blkno - backPos) <= back_prefetch_pages) { + _bt_prefetchbuf(rel, backPos-- , &scan->pfch_index_page_list); + /* issue the second prefetch of this step only once the run has lasted at least back_prefetch_pages pages */ + if (so->backSeqRun >= back_prefetch_pages + && backPos > 0 + && (blkno - backPos) <= back_prefetch_pages) + { + _bt_prefetchbuf(rel, backPos-- , &scan->pfch_index_page_list); + } + } + + so->backSeqPos = backPos; + } else { + /* not back-sequential : end any current run if the left-link lies above blkno or below backSeqPos, then prefetch the left sibling */ + if (so->backSeqRun != 0) { + if (opaque->btpo_prev > blkno || opaque->btpo_prev < so->backSeqPos) + so->backSeqRun = 0; + } + _bt_prefetchbuf(rel, opaque->btpo_prev , &scan->pfch_index_page_list); + } + } break; } } } + } return true; } @@ -1359,7 +1507,7 @@ _bt_steppage(IndexScanDesc scan, ScanDir * again if it's important. */ static Buffer -_bt_walk_left(Relation rel, Buffer buf) +_bt_walk_left(IndexScanDesc scan, Relation rel, Buffer buf) { Page page; BTPageOpaque opaque; @@ -1387,7 +1535,7 @@ _bt_walk_left(Relation rel, Buffer buf) _bt_relbuf(rel, buf); /* check for interrupts while we're not holding any buffer lock */ CHECK_FOR_INTERRUPTS(); - buf = _bt_getbuf(rel, blkno, BT_READ); + buf = _bt_getbuf(rel, blkno, BT_READ , scan->pfch_index_page_list ); page = BufferGetPage(buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); @@ -1631,7 +1779,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDir * There's no actually-matching data on this page. Try to advance to * the next page.
Return false if there's no matching data at all. */ - if (!_bt_steppage(scan, dir)) + if (!_bt_steppage(scan, dir, false)) return false; } --- src/backend/access/nbtree/nbtinsert.c.orig 2014-05-28 08:29:09.242829343 -0400 +++ src/backend/access/nbtree/nbtinsert.c 2014-05-28 16:45:43.394509218 -0400 @@ -793,7 +793,7 @@ _bt_insertonpg(Relation rel, { Assert(!P_ISLEAF(lpageop)); - metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE); + metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE , (struct pfch_index_pagelist*)0); metapg = BufferGetPage(metabuf); metad = BTPageGetMeta(metapg); @@ -972,7 +972,7 @@ _bt_split(Relation rel, Buffer buf, Buff bool isleaf; /* Acquire a new page to split into */ - rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); + rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE , (struct pfch_index_pagelist*)0); /* * origpage is the original page to be split. leftpage is a temporary @@ -1175,7 +1175,7 @@ _bt_split(Relation rel, Buffer buf, Buff if (!P_RIGHTMOST(oopaque)) { - sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE); + sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE , (struct pfch_index_pagelist*)0); spage = BufferGetPage(sbuf); sopaque = (BTPageOpaque) PageGetSpecialPointer(spage); if (sopaque->btpo_prev != origpagenumber) @@ -1817,7 +1817,7 @@ _bt_finish_split(Relation rel, Buffer lb Assert(P_INCOMPLETE_SPLIT(lpageop)); /* Lock right sibling, the one missing the downlink */ - rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE); + rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE , (struct pfch_index_pagelist*)0); rpage = BufferGetPage(rbuf); rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage); @@ -1829,7 +1829,7 @@ _bt_finish_split(Relation rel, Buffer lb BTMetaPageData *metad; /* acquire lock on the metapage */ - metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE); + metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE , (struct pfch_index_pagelist*)0); metapg = BufferGetPage(metabuf); metad = BTPageGetMeta(metapg); @@ -1877,7 +1877,7 @@ 
_bt_getstackbuf(Relation rel, BTStack st Page page; BTPageOpaque opaque; - buf = _bt_getbuf(rel, blkno, access); + buf = _bt_getbuf(rel, blkno, access , (struct pfch_index_pagelist*)0); page = BufferGetPage(buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); @@ -2008,12 +2008,12 @@ _bt_newroot(Relation rel, Buffer lbuf, B lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage); /* get a new root page */ - rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); + rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE , (struct pfch_index_pagelist*)0); rootpage = BufferGetPage(rootbuf); rootblknum = BufferGetBlockNumber(rootbuf); /* acquire lock on the metapage */ - metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE); + metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE , (struct pfch_index_pagelist*)0); metapg = BufferGetPage(metabuf); metad = BTPageGetMeta(metapg); --- src/backend/access/nbtree/nbtpage.c.orig 2014-05-28 08:29:09.242829343 -0400 +++ src/backend/access/nbtree/nbtpage.c 2014-05-28 16:45:43.426509347 -0400 @@ -127,7 +127,7 @@ _bt_getroot(Relation rel, int access) Assert(rootblkno != P_NONE); rootlevel = metad->btm_fastlevel; - rootbuf = _bt_getbuf(rel, rootblkno, BT_READ); + rootbuf = _bt_getbuf(rel, rootblkno, BT_READ , (struct pfch_index_pagelist*)0); rootpage = BufferGetPage(rootbuf); rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage); @@ -153,7 +153,7 @@ _bt_getroot(Relation rel, int access) rel->rd_amcache = NULL; } - metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ); + metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ , (struct pfch_index_pagelist*)0); metapg = BufferGetPage(metabuf); metaopaque = (BTPageOpaque) PageGetSpecialPointer(metapg); metad = BTPageGetMeta(metapg); @@ -209,7 +209,7 @@ _bt_getroot(Relation rel, int access) * the new root page. Since this is the first page in the tree, it's * a leaf as well as the root. 
*/ - rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); + rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE , (struct pfch_index_pagelist*)0); rootblkno = BufferGetBlockNumber(rootbuf); rootpage = BufferGetPage(rootbuf); rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage); @@ -350,7 +350,7 @@ _bt_gettrueroot(Relation rel) pfree(rel->rd_amcache); rel->rd_amcache = NULL; - metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ); + metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ , (struct pfch_index_pagelist*)0); metapg = BufferGetPage(metabuf); metaopaque = (BTPageOpaque) PageGetSpecialPointer(metapg); metad = BTPageGetMeta(metapg); @@ -436,7 +436,7 @@ _bt_getrootheight(Relation rel) Page metapg; BTPageOpaque metaopaque; - metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ); + metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ , (struct pfch_index_pagelist*)0); metapg = BufferGetPage(metabuf); metaopaque = (BTPageOpaque) PageGetSpecialPointer(metapg); metad = BTPageGetMeta(metapg); @@ -562,6 +562,170 @@ _bt_log_reuse_page(Relation rel, BlockNu } /* + * _bt_prefetchbuf() -- Start a prefetch of block blkno of rel, unless blkno is P_NEW or P_NONE + * or the block is already listed in the pagelist as prefetched and not yet read. + * Input parameters : + * rel and blkno identify the block to be prefetched + * pfch_index_page_list_P points to the pointer anchoring the head of the index page list (may be null) + * Since the pagelist is only an optimization, the return code of _bt_add_block is ignored, + * so a block that could not be added is prefetched but not tracked.
+ */ +void +_bt_prefetchbuf(Relation rel, BlockNumber blkno , struct pfch_index_pagelist** pfch_index_page_list_P) +{ + + int rc = 0; + struct pfch_index_pagelist* pfch_index_plp; /* pointer to current chunk */ + struct pfch_index_item* found_item = 0; + + if ((struct pfch_index_pagelist**)0 == pfch_index_page_list_P) { + pfch_index_plp = (struct pfch_index_pagelist*)0; + } else { + pfch_index_plp = *pfch_index_page_list_P; + } + + if (blkno != P_NEW && blkno != P_NONE) + { + /* prefetch an existing block of the relation + ** but first, check that it is not already in the list of blocks prefetched and not yet read + */ + found_item = _bt_find_block(blkno , pfch_index_plp); + if ((struct pfch_index_item*)0 == found_item) { /* not found */ + + rc = PrefetchBuffer(rel, MAIN_FORKNUM, blkno , 0); + + /* add the pagenum to the list , recording as its discard status whether rc has PREFTCHRC_BUF_PIN_INCREASED set + ** since it's only an optimization, ignore failure such as exceeded allowed space (NOTE(review): a block that is not recorded is never seen by the discard loop in btendscan - confirm that is harmless when the pin was increased) + */ + _bt_add_block( blkno , pfch_index_page_list_P , (uint32)(rc & PREFTCHRC_BUF_PIN_INCREASED)); + + } + } + return; +} + +/* _bt_find_block searches every chunk of the index page list, starting at pfch_index_page_list (which may be null), for an item whose pfch_blocknum equals blkno +** and returns the pointer to the first such pfch_index_item if found, or null if not +*/ +struct pfch_index_item* +_bt_find_block(BlockNumber blkno , struct pfch_index_pagelist* pfch_index_page_list) +{ + + struct pfch_index_item* found_item = 0; + struct pfch_index_pagelist* pfch_index_plp; /* pointer to current chunk */ + int ix, tx; + + pfch_index_plp = pfch_index_page_list; + + while ( (struct pfch_index_pagelist*)0 != pfch_index_plp + && ( (struct pfch_index_item*)0 == found_item) + ) { + ix = 0; + tx = pfch_index_plp->pfch_index_item_count; + while ( (ix < tx) + && ( (struct pfch_index_item*)0 == found_item) + ) { + if (pfch_index_plp->pfch_indexid[ix].pfch_blocknum == blkno) { + found_item = &pfch_index_plp->pfch_indexid[ix]; + } + ix++; + } + pfch_index_plp = pfch_index_plp->pfch_index_pagelist_next; + } + + return found_item; +}
+ +/* _bt_add_block records blkno and its discard status in the first chunk of the index page list that has a free slot, +** or else in a newly allocated chunk that becomes the new head of the list; returns 0 if recorded, non-zero if not +*/ +int +_bt_add_block(BlockNumber blkno , struct pfch_index_pagelist** pfch_index_page_list_P , uint32 discard_status) +{ + int rc = 1; + int ix; + struct pfch_index_pagelist* pfch_index_plp; /* pointer to current chunk */ + struct pfch_index_pagelist* pfch_index_page_list_anchor; /* pointer to first chunk if any */ + /* allow expansion of pagelist to 16 chunks + ** which accommodates backwards-sequential index scans + ** where the scanner increases target_prefetch_pages by a factor of up to 16 + ** see code in _bt_steppage + ** note - this creates an undesirable weak dependency on this number in _bt_steppage, + ** but : + ** there is no disaster if the numbers disagree - just sub-optimal use of the list + ** to implement a proper interface would require that chunks have a variable size + ** which would require an extra size variable in each chunk + */ + int num_chunks = 16; + + if ((struct pfch_index_pagelist**)0 == pfch_index_page_list_P) { + pfch_index_page_list_anchor = (struct pfch_index_pagelist*)0; + } else { + pfch_index_page_list_anchor = *pfch_index_page_list_P; + } + pfch_index_plp = pfch_index_page_list_anchor; /* pointer to current chunk */ + + while ( (struct pfch_index_pagelist*)0 != pfch_index_plp ) { + ix = pfch_index_plp->pfch_index_item_count; + if (ix < target_prefetch_pages) { /* a chunk holds up to target_prefetch_pages items (see the palloc below); NOTE(review): assumes target_prefetch_pages has not grown since the chunk was allocated */ + pfch_index_plp->pfch_indexid[ix].pfch_blocknum = blkno; + pfch_index_plp->pfch_indexid[ix].pfch_discard = discard_status; + pfch_index_plp->pfch_index_item_count = (ix+1); + rc = 0; + goto stored_pagenum; + } + pfch_index_plp = pfch_index_plp->pfch_index_pagelist_next; + num_chunks--; /* keep track of number of chunks */ + } + + /* we did not find any free space in existing chunks - + ** create new chunk if within our limit and the list already has a first chunk (this function never creates the first one) + */ + if ( (num_chunks > 0) && ((struct pfch_index_pagelist*)0 != pfch_index_page_list_anchor)
) { + pfch_index_plp = (struct pfch_index_pagelist*)palloc( sizeof(struct pfch_index_pagelist) + ( (target_prefetch_pages-1) * sizeof(struct pfch_index_item) ) ); /* NOTE(review): allocated in the current memory context - confirm it lives as long as the scan; IndexScanEnd pfree's only the head chunk */ + if ( (struct pfch_index_pagelist*)0 != pfch_index_plp ) { + pfch_index_plp->pfch_index_pagelist_next = pfch_index_page_list_anchor; /* old head of list is next after this */ + pfch_index_plp->pfch_indexid[0].pfch_blocknum = blkno; + pfch_index_plp->pfch_indexid[0].pfch_discard = discard_status; + pfch_index_plp->pfch_index_item_count = 1; + *pfch_index_page_list_P = pfch_index_plp; /* publish the new chunk as head of the caller's list; the anchor is non-null here, so pfch_index_page_list_P is non-null too */ + rc = 0; + } + } + + stored_pagenum:; + return rc; +} + +/* _bt_subtract_block removes the first item found for blkno from the prefetched-but-unread pagelist, if present, by moving the last item of that chunk into its slot; P_NEW and P_NONE are ignored */ +void +_bt_subtract_block(BlockNumber blkno , struct pfch_index_pagelist* pfch_index_page_list) +{ + struct pfch_index_pagelist* pfch_index_plp = pfch_index_page_list; + if ( (blkno != P_NEW) && (blkno != P_NONE) ) { + int ix , jx; + while ( (struct pfch_index_pagelist*)0 != pfch_index_plp ) { + ix = pfch_index_plp->pfch_index_item_count; + while (ix-- > 0) { + if (pfch_index_plp->pfch_indexid[ix].pfch_blocknum == blkno) { + /* move the last item to the current (now deleted) position and decrement count */ + jx = (pfch_index_plp->pfch_index_item_count-1); /* index of last item ... */ + if (jx > ix) { /* ... is not the current one so move is required */ + pfch_index_plp->pfch_indexid[ix].pfch_blocknum = pfch_index_plp->pfch_indexid[jx].pfch_blocknum; + pfch_index_plp->pfch_indexid[ix].pfch_discard = pfch_index_plp->pfch_indexid[jx].pfch_discard; + ix = jx; + } + pfch_index_plp->pfch_index_item_count = ix; + goto done_subtract; + } + } + pfch_index_plp = pfch_index_plp->pfch_index_pagelist_next; + } + } + done_subtract: return; +} + +/* * _bt_getbuf() -- Get a buffer by block number for read or write. * * blkno == P_NEW means to get an unallocated index page.
The page @@ -573,7 +737,7 @@ _bt_log_reuse_page(Relation rel, BlockNu * _bt_checkpage to sanity-check the page (except in P_NEW case). */ Buffer -_bt_getbuf(Relation rel, BlockNumber blkno, int access) +_bt_getbuf(Relation rel, BlockNumber blkno, int access , struct pfch_index_pagelist* pfch_index_page_list) { Buffer buf; @@ -581,6 +745,10 @@ _bt_getbuf(Relation rel, BlockNumber blk { /* Read an existing block of the relation */ buf = ReadBuffer(rel, blkno); + + /* if the block is in the prefetched-but-unread pagelist, remove it */ + _bt_subtract_block( blkno , pfch_index_page_list); + LockBuffer(buf, access); _bt_checkpage(rel, buf); } @@ -702,6 +870,10 @@ _bt_getbuf(Relation rel, BlockNumber blk * bufmgr when one would do. However, now it's mainly just a notational * convenience. The only case where it saves work over _bt_relbuf/_bt_getbuf * is when the target page is the same one already in the buffer. + * + * if prefetching of index pages is changed to use this function, + * then it should be extended to take the index_page_list as parameter + * and call_bt_subtract_block in the same way that _bt_getbuf does. 
*/ Buffer _bt_relandgetbuf(Relation rel, Buffer obuf, BlockNumber blkno, int access) @@ -712,6 +884,7 @@ _bt_relandgetbuf(Relation rel, Buffer ob if (BufferIsValid(obuf)) LockBuffer(obuf, BUFFER_LOCK_UNLOCK); buf = ReleaseAndReadBuffer(obuf, rel, blkno); + LockBuffer(buf, access); _bt_checkpage(rel, buf); return buf; @@ -965,7 +1138,7 @@ _bt_is_page_halfdead(Relation rel, Block BTPageOpaque opaque; bool result; - buf = _bt_getbuf(rel, blk, BT_READ); + buf = _bt_getbuf(rel, blk, BT_READ , (struct pfch_index_pagelist*)0); page = BufferGetPage(buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); @@ -1069,7 +1242,7 @@ _bt_lock_branch_parent(Relation rel, Blo Page lpage; BTPageOpaque lopaque; - lbuf = _bt_getbuf(rel, leftsib, BT_READ); + lbuf = _bt_getbuf(rel, leftsib, BT_READ, (struct pfch_index_pagelist*)0); lpage = BufferGetPage(lbuf); lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage); @@ -1265,7 +1438,7 @@ _bt_pagedel(Relation rel, Buffer buf) BTPageOpaque lopaque; Page lpage; - lbuf = _bt_getbuf(rel, leftsib, BT_READ); + lbuf = _bt_getbuf(rel, leftsib, BT_READ, (struct pfch_index_pagelist*)0); lpage = BufferGetPage(lbuf); lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage); /* @@ -1340,7 +1513,7 @@ _bt_pagedel(Relation rel, Buffer buf) if (!rightsib_empty) break; - buf = _bt_getbuf(rel, rightsib, BT_WRITE); + buf = _bt_getbuf(rel, rightsib, BT_WRITE, (struct pfch_index_pagelist*)0); } return ndeleted; @@ -1593,7 +1766,7 @@ _bt_unlink_halfdead_page(Relation rel, B target = topblkno; /* fetch the block number of the topmost parent's left sibling */ - buf = _bt_getbuf(rel, topblkno, BT_READ); + buf = _bt_getbuf(rel, topblkno, BT_READ, (struct pfch_index_pagelist*)0); page = BufferGetPage(buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); leftsib = opaque->btpo_prev; @@ -1632,7 +1805,7 @@ _bt_unlink_halfdead_page(Relation rel, B LockBuffer(leafbuf, BT_WRITE); if (leftsib != P_NONE) { - lbuf = _bt_getbuf(rel, leftsib, BT_WRITE); + lbuf = 
_bt_getbuf(rel, leftsib, BT_WRITE , (struct pfch_index_pagelist*)0); page = BufferGetPage(lbuf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); while (P_ISDELETED(opaque) || opaque->btpo_next != target) @@ -1646,7 +1819,7 @@ _bt_unlink_halfdead_page(Relation rel, B RelationGetRelationName(rel)); return false; } - lbuf = _bt_getbuf(rel, leftsib, BT_WRITE); + lbuf = _bt_getbuf(rel, leftsib, BT_WRITE , (struct pfch_index_pagelist*)0); page = BufferGetPage(lbuf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); } @@ -1701,7 +1874,7 @@ _bt_unlink_halfdead_page(Relation rel, B * And next write-lock the (current) right sibling. */ rightsib = opaque->btpo_next; - rbuf = _bt_getbuf(rel, rightsib, BT_WRITE); + rbuf = _bt_getbuf(rel, rightsib, BT_WRITE , (struct pfch_index_pagelist*)0); page = BufferGetPage(rbuf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); if (opaque->btpo_prev != target) @@ -1731,7 +1904,7 @@ _bt_unlink_halfdead_page(Relation rel, B if (P_RIGHTMOST(opaque)) { /* rightsib will be the only one left on the level */ - metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE); + metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE , (struct pfch_index_pagelist*)0); metapg = BufferGetPage(metabuf); metad = BTPageGetMeta(metapg); --- src/backend/access/nbtree/nbtree.c.orig 2014-05-28 08:29:09.242829343 -0400 +++ src/backend/access/nbtree/nbtree.c 2014-05-28 16:45:43.450509443 -0400 @@ -30,6 +30,18 @@ #include "tcop/tcopprot.h" #include "utils/memutils.h" +#ifdef USE_PREFETCH +extern unsigned int prefetch_index_scans; /* boolean whether to prefetch non-bitmap index scans also numeric size of pfch_list */ +#endif /* USE_PREFETCH */ + +Datum +btpeeknexttuple( +#ifdef AVOID_CATALOG_MIGRATION_FOR_ASYNCIO + IndexScanDesc scan +#else /* not AVOID_CATALOG_MIGRATION_FOR_ASYNCIO */ + PG_FUNCTION_ARGS +#endif /* not AVOID_CATALOG_MIGRATION_FOR_ASYNCIO */ +); /* Working state for btbuild and its callback */ typedef struct @@ -332,6 +344,74 @@ 
btgettuple(PG_FUNCTION_ARGS) } /* + * btpeeknexttuple() -- look ahead, on the current index page only, for the next item whose heap block is not already in pfch_block_item_list + * without reading a new index page and without altering the BTScanOpaque position; + * if found, store its blocknum in pfch_block_item_list (updating scan->pfch_next and scan->pfch_used) and return true, else return false + * NOTE(review): prefetch_index_scans is declared in this file only under USE_PREFETCH but is used here unconditionally - confirm non-prefetch builds + */ +Datum +btpeeknexttuple( +#ifdef AVOID_CATALOG_MIGRATION_FOR_ASYNCIO + IndexScanDesc scan +#else /* not AVOID_CATALOG_MIGRATION_FOR_ASYNCIO */ + PG_FUNCTION_ARGS +#endif /* not AVOID_CATALOG_MIGRATION_FOR_ASYNCIO */ +) +{ +#ifndef AVOID_CATALOG_MIGRATION_FOR_ASYNCIO + IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); +#endif /* not AVOID_CATALOG_MIGRATION_FOR_ASYNCIO */ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + bool res = false; + int itemIndex; /* current index in items[] */ + + /* + * If the scan position is valid, look ahead from the item after the current one + * (towards lastItem only). If it is not valid yet, bail out and return false + */ + if ( BTScanPosIsValid(so->currPos) ) { + + itemIndex = so->currPos.itemIndex+1; /* next item */ + + /* This loop handles advancing till we find different data block or end of index page */ + while (itemIndex <= so->currPos.lastItem) { + unsigned short int pfchx; /* index in BlockIdData array */ + for (pfchx = 0; pfchx < scan->pfch_used; pfchx++) { + if (BlockIdEquals((&(((scan->pfch_block_item_list)+pfchx)->pfch_blockid)) , &(so->currPos.items[itemIndex].heapTid.ip_blkid))) { + goto block_match; + } + } + + /* if we reach here, no block in list matched this item */ + res = true; + /* set item in prefetch list + ** prefer unused entry if there is one, else overwrite the entry after pfch_next, wrapping at prefetch_index_scans + */ + if (scan->pfch_used < prefetch_index_scans) { + scan->pfch_next = scan->pfch_used; + } else { + scan->pfch_next++; + if (scan->pfch_next >= prefetch_index_scans) { + scan->pfch_next = 0; + } + } + + BlockIdCopy((&((scan->pfch_block_item_list + scan->pfch_next)->pfch_blockid)) , &(so->currPos.items[itemIndex].heapTid.ip_blkid)); + if
(scan->pfch_used <= scan->pfch_next) { + scan->pfch_used = (scan->pfch_next + 1); + } + + goto peek_complete; + + block_match: itemIndex++; + } + } + + peek_complete: + PG_RETURN_BOOL(res); +} + +/* * btgetbitmap() -- gets all matching tuples, and adds them to a bitmap */ Datum @@ -425,6 +505,12 @@ btbeginscan(PG_FUNCTION_ARGS) so->killedItems = NULL; /* until needed */ so->numKilled = 0; + so->backSeqRun = 0; + so->backSeqPos = 0; + so->prefetchItemIndex = 0; + so->lastHeapPrefetchBlkno = P_NONE; + so->prefetchBlockCount = 0; + /* * We don't know yet whether the scan will be index-only, so we do not * allocate the tuple workspace arrays until btrescan. However, we set up @@ -516,6 +602,23 @@ btendscan(PG_FUNCTION_ARGS) { IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); BTScanOpaque so = (BTScanOpaque) scan->opaque; +#ifdef USE_PREFETCH + /* locals used only by the prefetch cleanup, so declared only when it is compiled */ + struct pfch_index_pagelist* pfch_index_plp; + int ix; + + /* discard every block listed in the pagelist (prefetched but not yet read) whose pfch_discard flag is set */ + pfch_index_plp = scan->pfch_index_page_list; + while ( (struct pfch_index_pagelist*)0 != pfch_index_plp ) { + ix = pfch_index_plp->pfch_index_item_count; + while (ix-- > 0) { + if (pfch_index_plp->pfch_indexid[ix].pfch_discard) { + DiscardBuffer( scan->indexRelation , MAIN_FORKNUM , pfch_index_plp->pfch_indexid[ix].pfch_blocknum); + } + } + pfch_index_plp = pfch_index_plp->pfch_index_pagelist_next; + } +#endif /* USE_PREFETCH */ /* we aren't holding any read locks, but gotta drop the pins */ if (BTScanPosIsValid(so->currPos)) --- src/backend/nodes/tidbitmap.c.orig 2014-05-28 08:29:09.278829325 -0400 +++ src/backend/nodes/tidbitmap.c 2014-05-28 16:45:43.474509540 -0400 @@ -44,6 +44,9 @@ #include "nodes/bitmapset.h" #include "nodes/tidbitmap.h" #include "utils/hsearch.h" +#ifdef USE_PREFETCH +extern int target_prefetch_pages; +#endif /* USE_PREFETCH */ /* * The maximum number of tuples per page is not large (typically 256 with @@ -572,7 +575,12 @@ tbm_begin_iterate(TIDBitmap *tbm) * needs
of the TBMIterateResult sub-struct. */ iterator = (TBMIterator *) palloc(sizeof(TBMIterator) + - MAX_TUPLES_PER_PAGE * sizeof(OffsetNumber)); + MAX_TUPLES_PER_PAGE * sizeof(OffsetNumber) +#ifdef USE_PREFETCH + /* space for remembering every prefetched but unread blockno */ + + (target_prefetch_pages * sizeof(BlockNumber)) +#endif /* USE_PREFETCH */ + ); iterator->tbm = tbm; /* @@ -1020,3 +1028,68 @@ tbm_comparator(const void *left, const v return 1; return 0; } + +void +tbm_zero(TBMIterator *iterator) /* zero list of prefetched and unread blocknos */ +{ + /* locate the list of prefetched but unread blocknos immediately following the array of offsets + ** and note that tbm_begin_iterate allocates space for (1 + MAX_TUPLES_PER_PAGE) offsets - + ** 1 included in struct TBMIterator and MAX_TUPLES_PER_PAGE additional + */ + iterator->output.Unread_Pfetched_base = ((BlockNumber *)(&(iterator->output.offsets[MAX_TUPLES_PER_PAGE+1]))); + iterator->output.Unread_Pfetched_next = iterator->output.Unread_Pfetched_count = 0; +} + +void +tbm_add(TBMIterator *iterator, BlockNumber blockno) /* add this blockno to list of prefetched and unread blocknos; raises ERROR if the list already holds target_prefetch_pages entries */ +{ + unsigned int Unread_Pfetched_index = iterator->output.Unread_Pfetched_next + iterator->output.Unread_Pfetched_count++; + + if (iterator->output.Unread_Pfetched_count > target_prefetch_pages) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("tbm_add overflowed list cannot add blockno %u", blockno))); + } + + if (Unread_Pfetched_index >= target_prefetch_pages) + Unread_Pfetched_index -= target_prefetch_pages; + *(iterator->output.Unread_Pfetched_base + Unread_Pfetched_index) = blockno; +} + +void +tbm_subtract(TBMIterator *iterator, BlockNumber blockno) /* remove this blockno from list of prefetched and unread blocknos; raises ERROR if the list is empty */ +{ + unsigned int Unread_Pfetched_index = iterator->output.Unread_Pfetched_next++; + BlockNumber nextUnreadPfetched; + + /* make a weak check that the next blockno is the one to be removed, + **
although actually in case of disagreement, we ignore caller's blockno and remove next anyway, + ** which is really what caller wants + */ + if ( iterator->output.Unread_Pfetched_count == 0 ) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("tbm_subtract empty list cannot subtract blockno %u", blockno))); + } + + if (Unread_Pfetched_index >= target_prefetch_pages) + Unread_Pfetched_index = 0; + nextUnreadPfetched = *(iterator->output.Unread_Pfetched_base + Unread_Pfetched_index); + if ( ( nextUnreadPfetched != blockno ) + && ( nextUnreadPfetched != InvalidBlockNumber ) /* don't report it if the block in the list was InvalidBlockNumber */ + ) { + ereport(NOTICE, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("tbm_subtract will subtract blockno %u not %u", + nextUnreadPfetched, blockno))); + } + if (iterator->output.Unread_Pfetched_next >= target_prefetch_pages) + iterator->output.Unread_Pfetched_next = 0; + iterator->output.Unread_Pfetched_count--; +} + +TBMIterateResult * +tbm_locate_IterateResult(TBMIterator *iterator) +{ + return &(iterator->output); +} --- src/backend/utils/misc/guc.c.orig 2014-05-28 08:29:09.406829256 -0400 +++ src/backend/utils/misc/guc.c 2014-05-28 16:45:43.550509846 -0400 @@ -2264,6 +2264,25 @@ static struct config_int ConfigureNamesI }, { + {"max_async_io_prefetchers", +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP + PGC_USERSET, +#else + PGC_INTERNAL, +#endif + RESOURCES_ASYNCHRONOUS, + gettext_noop("Maximum number of background processes concurrently using asynchronous librt threads to prefetch pages into shared memory buffers."), + }, + &max_async_io_prefetchers, +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP + -1, 0, 8192, /* boot val -1 indicates to initialize to something sensible during buf_init */ +#else + 0, 0, 0, +#endif + NULL, NULL, NULL + }, + + { {"max_worker_processes", PGC_POSTMASTER, RESOURCES_ASYNCHRONOUS, --- src/backend/utils/mmgr/aset.c.orig 2014-05-28 08:29:09.406829256 -0400 +++ src/backend/utils/mmgr/aset.c 2014-05-28
16:45:43.610510088 -0400 @@ -733,6 +733,48 @@ AllocSetAlloc(MemoryContext context, Siz */ fidx = AllocSetFreeIndex(size); chunk = set->freelist[fidx]; +#ifdef MEMORY_CONTEXT_CHECKING + /* an instance of segfault caused by a rogue value in set->freelist[fidx] + ** has been seen - check for it using crude sanity check based on neighbours : + ** if at least one is sufficiently close, then pass, else fail + */ + if (chunk != 0) { + int frx, nrx; /* frx is index, nrx is index of failing neighbour for errmsg */ + for (nrx = -1, frx = 0; (frx < ALLOCSET_NUM_FREELISTS); frx++) { + if ( (frx != fidx) /* not the chosen one */ + && ( ( (unsigned long)(set->freelist[frx]) ) != 0 ) /* not empty */ + ) { + if ( ( (unsigned long)chunk < ( ( (unsigned long)(set->freelist[frx]) ) / 2 ) ) + && ( ( (unsigned long)(set->freelist[frx]) ) < 0x4000000 ) + /*** || ( (unsigned long)chunk > ( ( (unsigned long)(set->freelist[frx]) ) * 2 ) ) ***/ + ) { + nrx = frx; + } else { + nrx = -1; + break; + } + } + } + + if (nrx >= 0) { + /* this must be a rogue value - put this list in the garbage */ + /* build message but be careful to avoid recursively triggering same fault; the chunk size is cast to unsigned long so that it matches the %lu conversion */ + set->freelist[fidx] = NULL; /* mark this list empty */ + elog(WARNING, "detected rogue value %p in freelist index %d compared with neighbour %p whose chunksize %lu" + , chunk , fidx , set->freelist[nrx] , (unsigned long) set->freelist[nrx]->size); + chunk = NULL; + } + } +#else /* if not MEMORY_CONTEXT_CHECKING make very simple-minded check*/ + if ( (chunk != 0) && ( (unsigned long)chunk < 0x40000 ) ) { + /* this must be a rogue value - put this list in the garbage */ + /* build message but be careful to avoid recursively triggering same fault */ + set->freelist[fidx] = NULL; /* mark this list empty */ + elog(WARNING, "detected rogue value %p in freelist index %d" + , chunk , fidx); + chunk = NULL; + } +#endif if (chunk != NULL) { Assert(chunk->size >= size); --- src/include/executor/instrument.h.orig 2014-05-28 08:29:09.454829232 -0400 +++
src/include/executor/instrument.h 2014-05-28 16:45:43.798510846 -0400 @@ -28,8 +28,18 @@ typedef struct BufferUsage long local_blks_written; /* # of local disk blocks written */ long temp_blks_read; /* # of temp blocks read */ long temp_blks_written; /* # of temp blocks written */ + instr_time blk_read_time; /* time spent reading */ instr_time blk_write_time; /* time spent writing */ + + long aio_read_noneed; /* # of prefetches for which no need for prefetch as block already in buffer pool */ + long aio_read_discrd; /* # of prefetches discarded -- NOTE(review): meaning inferred from the field name only; the previous comment was a copy of aio_read_noneed's, confirm */ + long aio_read_forgot; /* # of prefetches forgotten -- NOTE(review): meaning inferred from the field name only; the previous comment was a copy of aio_read_noneed's, confirm */ + long aio_read_noblok; /* # of prefetches for which no available BufferAiocb */ + long aio_read_failed; /* # of aio reads for which aio itself failed or the read failed with an errno */ + long aio_read_wasted; /* # of aio reads for which disk block not used */ + long aio_read_waited; /* # of aio reads for which disk block used but had to wait for it */ + long aio_read_ontime; /* # of aio reads for which disk block used and ready on time when requested */ } BufferUsage; /* Flag bits included in InstrAlloc's instrument_options bitmask */ --- src/include/storage/bufmgr.h.orig 2014-05-28 08:29:09.462829227 -0400 +++ src/include/storage/bufmgr.h 2014-05-28 16:45:43.830510976 -0400 @@ -41,6 +41,7 @@ typedef enum RBM_ZERO_ON_ERROR, /* Read, but return an all-zeros page on error */ RBM_NORMAL_NO_LOG /* Don't log page as invalid during WAL * replay; otherwise same as RBM_NORMAL */ + ,RBM_NOREAD_FOR_PREFETCH /* Don't read from disk, don't zero buffer, find buffer only */ } ReadBufferMode; /* in globals.c ...
this duplicates miscadmin.h */ @@ -57,6 +58,9 @@ extern int target_prefetch_pages; extern PGDLLIMPORT char *BufferBlocks; extern PGDLLIMPORT int32 *PrivateRefCount; +/* in buf_async.c */ +extern int max_async_io_prefetchers; /* Maximum number of backends using asynchronous librt threads to read pages into our buffers */ + /* in localbuf.c */ extern PGDLLIMPORT int NLocBuffer; extern PGDLLIMPORT Block *LocalBufferBlockPointers; @@ -159,9 +163,15 @@ extern PGDLLIMPORT int32 *LocalRefCount; #define BufferGetPage(buffer) ((Page)BufferGetBlock(buffer)) /* - * prototypes for functions in bufmgr.c + * prototypes for external functions in bufmgr.c and buf_async.c */ -extern void PrefetchBuffer(Relation reln, ForkNumber forkNum, +extern int PrefetchBuffer(Relation reln, ForkNumber forkNum, + BlockNumber blockNum , BufferAccessStrategy strategy); +/* PrefetchBuffer return code is an int bitmask of the PREFTCHRC_* flags : */ +#define PREFTCHRC_BUF_PIN_INCREASED 0x01 /* pin count on buffer has been increased by 1 */ +#define PREFTCHRC_BLK_ALREADY_PRESENT 0x02 /* block was already present in a buffer */ + +extern void DiscardBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum); extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum); extern Buffer ReadBufferExtended(Relation reln, ForkNumber forkNum, --- src/include/storage/smgr.h.orig 2014-05-28 08:29:09.462829227 -0400 +++ src/include/storage/smgr.h 2014-05-28 16:45:43.854511072 -0400 @@ -92,6 +92,12 @@ extern void smgrextend(SMgrRelation reln BlockNumber blocknum, char *buffer, bool skipFsync); extern void smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum); +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP +extern void smgrinitaio(int max_aio_threads, int max_aio_num); +extern void smgrstartaio(SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum , char *aiocbp , int *retcode); +extern void smgrcompleteaio( SMgrRelation reln, char *aiocbp , int *inoutcode ); +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ extern void
smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer); extern void smgrwrite(SMgrRelation reln, ForkNumber forknum, @@ -118,6 +124,11 @@ extern void mdextend(SMgrRelation reln, BlockNumber blocknum, char *buffer, bool skipFsync); extern void mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum); +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP +extern void mdinitaio(int max_aio_threads, int max_aio_num); +extern void mdstartaio(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum , char *aiocbp , int *retcode ); +extern void mdcompleteaio( char *aiocbp , int *inoutcode ); +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer); extern void mdwrite(SMgrRelation reln, ForkNumber forknum, --- src/include/storage/fd.h.orig 2014-05-28 08:29:09.462829227 -0400 +++ src/include/storage/fd.h 2014-05-28 16:45:43.882511185 -0400 @@ -69,6 +69,11 @@ extern File PathNameOpenFile(FileName fi extern File OpenTemporaryFile(bool interXact); extern void FileClose(File file); extern int FilePrefetch(File file, off_t offset, int amount); +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP +extern void FileInitaio(int max_aio_threads, int max_aio_num ); +extern int FileStartaio(File file, off_t offset, int amount , char *aiocbp); +extern int FileCompleteaio( char *aiocbp , int normal_wait ); +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ extern int FileRead(File file, char *buffer, int amount); extern int FileWrite(File file, char *buffer, int amount); extern int FileSync(File file); --- src/include/storage/buf_internals.h.orig 2014-05-28 08:29:09.462829227 -0400 +++ src/include/storage/buf_internals.h 2014-05-28 16:45:43.906511281 -0400 @@ -22,7 +22,9 @@ #include "storage/smgr.h" #include "storage/spin.h" #include "utils/relcache.h" - +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP +#include "aio.h" +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ /* * Flags for buffer 
descriptors @@ -38,8 +40,23 @@ #define BM_JUST_DIRTIED (1 << 5) /* dirtied since write started */ #define BM_PIN_COUNT_WAITER (1 << 6) /* have waiter for sole pin */ #define BM_CHECKPOINT_NEEDED (1 << 7) /* must write for checkpoint */ -#define BM_PERMANENT (1 << 8) /* permanent relation (not - * unlogged) */ +#define BM_PERMANENT (1 << 8) /* permanent relation (not unlogged) */ +#define BM_AIO_IN_PROGRESS (1 << 9) /* aio in progress */ +#define BM_AIO_PREFETCH_PIN_BANKED (1 << 10) /* pinned when prefetch issued + ** and this pin is banked - i.e. + ** redeemable by the next use by same task + ** note that for any one buffer, a pin can be banked + ** by at most one process globally, + ** that is, only one process may bank a pin on the buffer + ** and it may do so only once (may not be stacked) + */ + +/********* +for asynchronous aio-read prefetching, two golden rules concerning buffer pinning and buffer-header flags must be observed: + R1. a buffer marked as BM_AIO_IN_PROGRESS must be pinned by at least one backend + R2. a buffer marked as BM_AIO_PREFETCH_PIN_BANKED must be pinned by the backend identified by + (buf->flags & BM_AIO_IN_PROGRESS) ? 
( ((BAiocbAnchr->BufferAiocbs)+(FREENEXT_BAIOCB_ORIGIN - buf->freeNext))->pidOfAio : (-(buf->freeNext)) +*********/ typedef bits16 BufFlags; @@ -140,17 +157,83 @@ typedef struct sbufdesc BufFlags flags; /* see bit definitions above */ uint16 usage_count; /* usage counter for clock sweep code */ unsigned refcount; /* # of backends holding pins on buffer */ - int wait_backend_pid; /* backend PID of pin-count waiter */ + int wait_backend_pid; /* if flags & BM_PIN_COUNT_WAITER + ** then backend PID of pin-count waiter + ** else not set + */ slock_t buf_hdr_lock; /* protects the above fields */ int buf_id; /* buffer's index number (from 0) */ - int freeNext; /* link in freelist chain */ + int volatile freeNext; /* overloaded and much-abused field : + ** EITHER + ** if >= 0 + ** then link in freelist chain + ** OR + ** if < 0 + ** then EITHER + ** if flags & BM_AIO_IN_PROGRESS + ** then negative of (the index of the aiocb in the BufferAiocbs array + 3) + ** else if flags & BM_AIO_PREFETCH_PIN_BANKED + ** then -(pid of task that issued aio_read and pinned buffer) + ** else one of the special values -1 or -2 listed below + */ LWLock *io_in_progress_lock; /* to wait for I/O to complete */ LWLock *content_lock; /* to lock access to buffer contents */ } BufferDesc; +/* structures for control blocks for our implementation of async io */ + +/* if USE_AIO_ATOMIC_BUILTIN_COMP_SWAP is not defined, the following struct is not put into use at runtime +** but it is easier to let the compiler find the definition but hide the reference to aiocb +** which is the only type it would not understand +*/ + +struct BufferAiocb { + struct BufferAiocb volatile * volatile BAiocbnext; /* next free entry or value of BAIOCB_OCCUPIED means in use */ + struct sbufdesc volatile * volatile BAiocbbufh; /* there can be at most one BufferDesc marked BM_AIO_IN_PROGRESS + ** and using this BufferAiocb - + ** if there is one, BAiocbbufh points to it, else BAiocbbufh is zero + ** NOTE BAiocbbufh should be 
zero for every BufferAiocb on the free list + */ +#ifdef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP + struct aiocb volatile BAiocbthis; /* the aio library's control block for one async io */ +#endif /* USE_AIO_ATOMIC_BUILTIN_COMP_SWAP */ + int volatile BAiocbDependentCount; /* count of tasks who depend on this BufferAiocb + ** in the sense that they are waiting for io completion. + ** only a Dependent may move the BufferAiocb onto the freelist + ** and only when that Dependent is the *only* Dependent (count == 1) + ** BAiocbDependentCount is protected by bufferheader spinlock + ** and must be updated only when that spinlock is held + */ + pid_t volatile pidOfAio; /* pid of backend who issued an aio_read using this BAiocb - + ** this backend must have pinned the associated buffer. + */ +}; + +#define BAIOCB_OCCUPIED 0x75f1 /* distinct indicator of a BufferAiocb.BAiocbnext that is NOT on free list */ +#define BAIOCB_FREE 0x7b9d /* distinct indicator of a BufferAiocb.BAiocbbufh that IS on free list */ + +struct BAiocbAnchor { /* anchor for all control blocks pertaining to aio */ + volatile struct BufferAiocb* BufferAiocbs; /* aiocbs ... */ + volatile struct BufferAiocb* volatile FreeBAiocbs; /* ... 
and their free list */ +}; + +/* values for BufCheckAsync input and retcode; negative values are parenthesized so that they expand safely inside larger expressions */ +#define BUF_INTENTION_WANT 1 /* wants the buffer, wait for in-progress aio and then pin */ +#define BUF_INTENTION_REJECT_KEEP_PIN (-1) /* pin already held, do not unpin */ +#define BUF_INTENTION_REJECT_OBTAIN_PIN (-2) /* obtain pin, caller wants it for same buffer */ +#define BUF_INTENTION_REJECT_FORGET (-3) /* unpin and tell resource owner to forget */ +#define BUF_INTENTION_REJECT_NOADJUST (-4) /* unpin and call ResourceOwnerForgetBuffer */ +#define BUF_INTENTION_REJECT_UNBANK (-5) /* unpin only if pin banked by caller */ + +#define BUF_INTENT_RC_CHANGED_TAG (-5) +#define BUF_INTENT_RC_BADPAGE (-4) +#define BUF_INTENT_RC_INVALID_AIO (-3) /* invalid and aio was in progress */ +#define BUF_INTENT_RC_INVALID_NO_AIO (-1) /* invalid and no aio was in progress */ +#define BUF_INTENT_RC_VALID 1 + #define BufferDescriptorGetBuffer(bdesc) ((bdesc)->buf_id + 1) /* @@ -159,6 +242,7 @@ typedef struct sbufdesc */ #define FREENEXT_END_OF_LIST (-1) #define FREENEXT_NOT_IN_LIST (-2) +#define FREENEXT_BAIOCB_ORIGIN (-3) /* * Macros for acquiring/releasing a shared buffer header's spinlock. --- src/include/catalog/pg_am.h.orig 2014-05-28 08:29:09.446829236 -0400 +++ src/include/catalog/pg_am.h 2014-05-28 16:45:43.926511362 -0400 @@ -67,6 +67,7 @@ CATALOG(pg_am,2601) regproc amcanreturn; /* can indexscan return IndexTuples?
*/ regproc amcostestimate; /* estimate cost of an indexscan */ regproc amoptions; /* parse AM-specific parameters */ + regproc ampeeknexttuple; /* peek at the next tuple different from any blocknum in pfch_list without reading a new index page */ } FormData_pg_am; /* ---------------- @@ -117,19 +118,19 @@ typedef FormData_pg_am *Form_pg_am; * ---------------- */ -DATA(insert OID = 403 ( btree 5 2 t f t t t t t t f t t 0 btinsert btbeginscan btgettuple btgetbitmap btrescan btendscan btmarkpos btrestrpos btbuild btbuildempty btbulkdelete btvacuumcleanup btcanreturn btcostestimate btoptions )); +DATA(insert OID = 403 ( btree 5 2 t f t t t t t t f t t 0 btinsert btbeginscan btgettuple btgetbitmap btrescan btendscan btmarkpos btrestrpos btbuild btbuildempty btbulkdelete btvacuumcleanup btcanreturn btcostestimate btoptions btpeeknexttuple )); DESCR("b-tree index access method"); #define BTREE_AM_OID 403 -DATA(insert OID = 405 ( hash 1 1 f f t f f f f f f f f 23 hashinsert hashbeginscan hashgettuple hashgetbitmap hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbuildempty hashbulkdelete hashvacuumcleanup - hashcostestimate hashoptions )); +DATA(insert OID = 405 ( hash 1 1 f f t f f f f f f f f 23 hashinsert hashbeginscan hashgettuple hashgetbitmap hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbuildempty hashbulkdelete hashvacuumcleanup - hashcostestimate hashoptions - )); DESCR("hash index access method"); #define HASH_AM_OID 405 -DATA(insert OID = 783 ( gist 0 8 f t f f t t f t t t f 0 gistinsert gistbeginscan gistgettuple gistgetbitmap gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbuildempty gistbulkdelete gistvacuumcleanup - gistcostestimate gistoptions )); +DATA(insert OID = 783 ( gist 0 8 f t f f t t f t t t f 0 gistinsert gistbeginscan gistgettuple gistgetbitmap gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbuildempty gistbulkdelete gistvacuumcleanup - gistcostestimate gistoptions - )); DESCR("GiST index 
access method"); #define GIST_AM_OID 783 -DATA(insert OID = 2742 ( gin 0 6 f f f f t t f f t f f 0 gininsert ginbeginscan - gingetbitmap ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbuildempty ginbulkdelete ginvacuumcleanup - gincostestimate ginoptions )); +DATA(insert OID = 2742 ( gin 0 6 f f f f t t f f t f f 0 gininsert ginbeginscan - gingetbitmap ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbuildempty ginbulkdelete ginvacuumcleanup - gincostestimate ginoptions - )); DESCR("GIN index access method"); #define GIN_AM_OID 2742 -DATA(insert OID = 4000 ( spgist 0 5 f f f f f t f t f f f 0 spginsert spgbeginscan spggettuple spggetbitmap spgrescan spgendscan spgmarkpos spgrestrpos spgbuild spgbuildempty spgbulkdelete spgvacuumcleanup spgcanreturn spgcostestimate spgoptions )); +DATA(insert OID = 4000 ( spgist 0 5 f f f f f t f t f f f 0 spginsert spgbeginscan spggettuple spggetbitmap spgrescan spgendscan spgmarkpos spgrestrpos spgbuild spgbuildempty spgbulkdelete spgvacuumcleanup spgcanreturn spgcostestimate spgoptions - )); DESCR("SP-GiST index access method"); #define SPGIST_AM_OID 4000 --- src/include/catalog/pg_proc.h.orig 2014-05-28 08:29:09.450829234 -0400 +++ src/include/catalog/pg_proc.h 2014-05-28 16:45:43.966511524 -0400 @@ -536,6 +536,12 @@ DESCR("convert float4 to int4"); DATA(insert OID = 330 ( btgettuple PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 16 "2281 2281" _null_ _null_ _null_ _null_ btgettuple _null_ _null_ _null_ )); DESCR("btree(internal)"); + +#ifndef AVOID_CATALOG_MIGRATION_FOR_ASYNCIO +DATA(insert OID = 3251 ( btpeeknexttuple PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 16 "2281" _null_ _null_ _null_ _null_ btpeeknexttuple _null_ _null_ _null_ )); +DESCR("btree(internal)"); +#endif /* not AVOID_CATALOG_MIGRATION_FOR_ASYNCIO */ + DATA(insert OID = 636 ( btgetbitmap PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 20 "2281 2281" _null_ _null_ _null_ _null_ btgetbitmap _null_ _null_ _null_ )); DESCR("btree(internal)"); DATA(insert OID = 
331 ( btinsert PGNSP PGUID 12 1 0 0 0 f f f f t f v 6 0 16 "2281 2281 2281 2281 2281 2281" _null_ _null_ _null_ _null_ btinsert _null_ _null_ _null_ )); --- src/include/pg_config_manual.h.orig 2014-05-28 08:29:09.458829229 -0400 +++ src/include/pg_config_manual.h 2014-05-28 16:45:43.994511636 -0400 @@ -138,9 +138,11 @@ /* * USE_PREFETCH code should be compiled only if we have a way to implement * prefetching. (This is decoupled from USE_POSIX_FADVISE because there - * might in future be support for alternative low-level prefetch APIs.) + * might in future be support for alternative low-level prefetch APIs -- + * -- update October 2013 -- now there is such a new prefetch capability -- + * async_io into postgres buffers - configuration parameter max_async_io_prefetchers) */ -#ifdef USE_POSIX_FADVISE +#if defined(USE_POSIX_FADVISE) || defined(USE_AIO_ATOMIC_BUILTIN_COMP_SWAP) #define USE_PREFETCH #endif --- src/include/access/nbtree.h.orig 2014-05-28 08:29:09.442829238 -0400 +++ src/include/access/nbtree.h 2014-05-28 16:45:44.022511749 -0400 @@ -19,6 +19,7 @@ #include "access/sdir.h" #include "access/xlog.h" #include "access/xlogutils.h" +#include "access/relscan.h" #include "catalog/pg_index.h" /* There's room for a 16-bit vacuum cycle ID in BTPageOpaqueData */ @@ -524,6 +525,7 @@ typedef struct BTScanPosData Buffer buf; /* if valid, the buffer is pinned */ BlockNumber nextPage; /* page's right link when we scanned it */ + BlockNumber prevPage; /* page's left link when we scanned it */ /* * moreLeft and moreRight track whether we think there may be matching @@ -603,6 +605,15 @@ typedef struct BTScanOpaqueData */ int markItemIndex; /* itemIndex, or -1 if not valid */ + /* prefetch logic state */ + unsigned int backSeqRun; /* number of back-sequential pages in a run */ + BlockNumber backSeqPos; /* blkid last prefetched in back-sequential + runs */ + BlockNumber lastHeapPrefetchBlkno; /* blkid last prefetched from heap */ + int prefetchItemIndex; /* item index within
currPos last + fetched by heap prefetch */ + int prefetchBlockCount; /* number of prefetched heap blocks */ + /* keep these last in struct for efficiency */ BTScanPosData currPos; /* current position data */ BTScanPosData markPos; /* marked position, if any */ @@ -655,7 +666,11 @@ extern Buffer _bt_getroot(Relation rel, extern Buffer _bt_gettrueroot(Relation rel); extern int _bt_getrootheight(Relation rel); extern void _bt_checkpage(Relation rel, Buffer buf); -extern Buffer _bt_getbuf(Relation rel, BlockNumber blkno, int access); +extern Buffer _bt_getbuf(Relation rel, BlockNumber blkno, int access , struct pfch_index_pagelist* pfch_index_page_list); +extern void _bt_prefetchbuf(Relation rel, BlockNumber blkno , struct pfch_index_pagelist** pfch_index_page_list_P); +extern struct pfch_index_item* _bt_find_block(BlockNumber blkno , struct pfch_index_pagelist* pfch_index_page_list); +extern int _bt_add_block(BlockNumber blkno , struct pfch_index_pagelist** pfch_index_page_list_P , uint32 discard_status); +extern void _bt_subtract_block(BlockNumber blkno , struct pfch_index_pagelist* pfch_index_page_list); extern Buffer _bt_relandgetbuf(Relation rel, Buffer obuf, BlockNumber blkno, int access); extern void _bt_relbuf(Relation rel, Buffer buf); --- src/include/access/heapam.h.orig 2014-05-28 08:29:09.442829238 -0400 +++ src/include/access/heapam.h 2014-05-28 16:45:44.046511845 -0400 @@ -175,7 +175,7 @@ extern void heap_page_prune_execute(Buff extern void heap_get_root_tuples(Page page, OffsetNumber *root_offsets); /* in heap/syncscan.c */ -extern void ss_report_location(Relation rel, BlockNumber location); +extern void ss_report_location(Relation rel, BlockNumber location , BlockNumber *prefetchHWMp); extern BlockNumber ss_get_location(Relation rel, BlockNumber relnblocks); extern void SyncScanShmemInit(void); extern Size SyncScanShmemSize(void); --- src/include/access/relscan.h.orig 2014-05-28 08:29:09.446829236 -0400 +++ src/include/access/relscan.h 2014-05-28 
16:45:44.066511925 -0400 @@ -44,6 +44,24 @@ typedef struct HeapScanDescData bool rs_inited; /* false = scan not init'd yet */ HeapTupleData rs_ctup; /* current tuple in scan, if any */ BlockNumber rs_cblock; /* current block # in scan, if any */ +#ifdef USE_PREFETCH + int rs_prefetch_target; /* target distance (numblocks) for prefetch to reach beyond main scan */ + BlockNumber rs_pfchblock; /* next block # to be prefetched in scan, if any */ + + /* Unread_Pfetched is a "mostly" circular list of recently prefetched blocknos of size target_prefetch_pages + ** the index of the first unread block is held in Unread_Pfetched_next + ** and is advanced when a block is read + ** the count of number of unread blocks is in Unread_Pfetched_count (and this subset can wrap around) + ** "mostly" means that there may be gaps caused by storing entries for blocks which do not need to be discarded - + ** these are indicated by blockno = InvalidBlockNumber, and these slots are reused when found. + */ + BlockNumber *rs_Unread_Pfetched_base; /* where the list of prefetched but unread blocknos starts */ + unsigned int rs_Unread_Pfetched_next; /* where the next unread blockno probably is relative to start -- + ** this is only a hint which may be temporarily stale. 
+ */ + unsigned int rs_Unread_Pfetched_count; /* number of valid unread blocknos in list */ +#endif /* USE_PREFETCH */ + Buffer rs_cbuf; /* current buffer in scan, if any */ /* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */ ItemPointerData rs_mctid; /* marked scan position, if any */ @@ -55,6 +73,27 @@ typedef struct HeapScanDescData OffsetNumber rs_vistuples[MaxHeapTuplesPerPage]; /* their offsets */ } HeapScanDescData; +/* pfch_index_items track prefetched and unread index pages - chunks of blocknumbers are chained in singly-linked list from scan->pfch_index_item_list */ +struct pfch_index_item { /* index-relation BlockIds which we will/have prefetched */ + BlockNumber pfch_blocknum; /* Blocknum which we will/have prefetched */ + uint32 pfch_discard; /* whether block is to be discarded when scan is closed */ +}; + +struct pfch_block_item { + struct BlockIdData pfch_blockid; /* BlockId which we will/have prefetched */ + uint32 pfch_discard; /* whether block is to be discarded when scan is closed */ +}; + +/* pfch_index_page_items track prefetched and unread index pages - +** chunks of blocknumbers are chained backwards (newest first, oldest last) +** in singly-linked list from scan->pfch_index_item_list +*/ +struct pfch_index_pagelist { /* index-relation BlockIds which we will/have prefetched */ + struct pfch_index_pagelist* pfch_index_pagelist_next; /* pointer to next chunk if any */ + unsigned int pfch_index_item_count; /* number of used entries in this chunk */ + struct pfch_index_item pfch_indexid[1]; /* in-line list of Blocknums which we will/have prefetched and whether to be discarded */ +}; + /* * We use the same IndexScanDescData structure for both amgettuple-based * and amgetbitmap-based index scans. 
Some fields are only relevant in @@ -75,8 +114,15 @@ typedef struct IndexScanDescData /* signaling to index AM about killing index tuples */ bool kill_prior_tuple; /* last-returned tuple is dead */ bool ignore_killed_tuples; /* do not return killed entries */ - bool xactStartedInRecovery; /* prevents killing/seeing killed - * tuples */ + bool xactStartedInRecovery; /* prevents killing/seeing killed tuples */ + +#ifdef USE_PREFETCH + struct pfch_index_pagelist* pfch_index_page_list; /* array of index-relation BlockIds which we will/have prefetched */ + struct pfch_block_item* pfch_block_item_list; /* array of heap-relation BlockIds which we will/have prefetched */ + unsigned short int pfch_used; /* number of used elements in BlockIdData array */ + unsigned short int pfch_next; /* next element for prefetch in BlockIdData array */ + int do_prefetch; /* should I prefetch ? */ +#endif /* USE_PREFETCH */ /* index access method's private state */ void *opaque; /* access-method-specific info */ @@ -91,6 +137,10 @@ typedef struct IndexScanDescData /* NB: if xs_cbuf is not InvalidBuffer, we hold a pin on that buffer */ bool xs_recheck; /* T means scan keys must be rechecked */ + /* heap fetch statistics for read-ahead logic */ + unsigned int heap_tids_seen; + unsigned int heap_tids_fetched; + /* state data for traversing HOT chains in index_getnext */ bool xs_continue_hot; /* T if must keep walking HOT chain */ } IndexScanDescData; --- src/include/nodes/tidbitmap.h.orig 2014-05-28 08:29:09.458829229 -0400 +++ src/include/nodes/tidbitmap.h 2014-05-28 16:45:44.106512088 -0400 @@ -41,6 +41,16 @@ typedef struct int ntuples; /* -1 indicates lossy result */ bool recheck; /* should the tuples be rechecked? 
*/ /* Note: recheck is always true if ntuples < 0 */ +#ifdef USE_PREFETCH + /* Unread_Pfetched is a circular list of recently prefetched blocknos of size target_prefetch_pages + ** the index of the first unread block is held in Unread_Pfetched_next + ** and is advanced when a block is read + ** the count of number of unread blocks is in Unread_Pfetched_count (and this subset can wrap around) + */ + BlockNumber *Unread_Pfetched_base; /* where the list of prefetched but unread blocknos starts */ + unsigned int Unread_Pfetched_next; /* where the next unread blockno is relative to start */ + unsigned int Unread_Pfetched_count; /* number of valid unread blocknos in list */ +#endif /* USE_PREFETCH */ OffsetNumber offsets[1]; /* VARIABLE LENGTH ARRAY */ } TBMIterateResult; /* VARIABLE LENGTH STRUCT */ @@ -62,5 +72,8 @@ extern bool tbm_is_empty(const TIDBitmap extern TBMIterator *tbm_begin_iterate(TIDBitmap *tbm); extern TBMIterateResult *tbm_iterate(TBMIterator *iterator); extern void tbm_end_iterate(TBMIterator *iterator); - +extern void tbm_zero(TBMIterator *iterator); /* zero list of prefetched and unread blocknos */ +extern void tbm_add(TBMIterator *iterator, BlockNumber blockno); /* add this blockno to list of prefetched and unread blocknos */ +extern void tbm_subtract(TBMIterator *iterator, BlockNumber blockno); /* remove this blockno from list of prefetched and unread blocknos */ +extern TBMIterateResult *tbm_locate_IterateResult(TBMIterator *iterator); /* locate the TBMIterateResult of an iterator */ #endif /* TIDBITMAP_H */ --- src/include/utils/rel.h.orig 2014-05-28 08:29:09.466829225 -0400 +++ src/include/utils/rel.h 2014-05-28 16:45:44.134512200 -0400 @@ -61,6 +61,7 @@ typedef struct RelationAmInfo FmgrInfo ammarkpos; FmgrInfo amrestrpos; FmgrInfo amcanreturn; + FmgrInfo ampeeknexttuple; /* peek at the next tuple different from any blocknum in pfch_list without reading a new index page */ } RelationAmInfo; --- src/include/pg_config.h.in.orig 2014-05-28 
08:29:09.458829229 -0400 +++ src/include/pg_config.h.in 2014-05-28 16:45:44.150512266 -0400 @@ -1,4 +1,4 @@ -/* src/include/pg_config.h.in. Generated from configure.in by autoheader. */ +/* src/include/pg_config.h.in. Generated from - by autoheader. */ /* Define to the type of arg 1 of 'accept' */ #undef ACCEPT_TYPE_ARG1 @@ -748,6 +748,10 @@ /* Define to the appropriate snprintf format for unsigned 64-bit ints. */ #undef UINT64_FORMAT +/* Define to select librt-style async io and the gcc atomic compare_and_swap. + */ +#undef USE_AIO_ATOMIC_BUILTIN_COMP_SWAP + /* Define to 1 to build with assertion checks. (--enable-cassert) */ #undef USE_ASSERT_CHECKING