From f138cbab018b104e416d23175a38141d8827232d Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sat, 31 Aug 2024 22:33:30 -0400
Subject: [PATCH v2.1 13/20] aio: Implement smgr/md.c aio methods

---
 src/include/storage/aio.h             |  17 +-
 src/include/storage/fd.h              |   6 +
 src/include/storage/md.h              |  12 ++
 src/include/storage/smgr.h            |  21 +++
 src/backend/storage/aio/aio_subject.c |   4 +
 src/backend/storage/file/fd.c         |  68 ++++++++
 src/backend/storage/smgr/md.c         | 217 ++++++++++++++++++++++++++
 src/backend/storage/smgr/smgr.c       |  91 +++++++++++
 8 files changed, 434 insertions(+), 2 deletions(-)

diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index b8c743548c9..07bf92a6b7a 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -57,9 +57,10 @@ typedef enum PgAioSubjectID
 {
 	/* intentionally the zero value, to help catch zeroed memory etc */
 	ASI_INVALID = 0,
+	ASI_SMGR,
 } PgAioSubjectID;
 
-#define ASI_COUNT (ASI_INVALID + 1)
+#define ASI_COUNT (ASI_SMGR + 1)
 
 /*
  * Flags for an IO that can be set with pgaio_io_set_flag().
@@ -90,7 +91,8 @@ typedef enum PgAioHandleFlags
  */
 typedef enum PgAioHandleSharedCallbackID
 {
-	ASC_PLACEHOLDER /* empty enums are invalid */ ,
+	ASC_MD_READV,
+	ASC_MD_WRITEV,
 } PgAioHandleSharedCallbackID;
 
 
@@ -139,6 +141,17 @@ typedef union
 
 typedef union PgAioSubjectData
 {
+	struct
+	{
+		RelFileLocator rlocator;	/* physical relation identifier */
+		BlockNumber blockNum;	/* blknum relative to begin of reln */
+		int			nblocks;
+		ForkNumber	forkNum:8;	/* don't waste 4 byte for four values */
+		bool		is_temp;	/* proc can be inferred by owning AIO */
+		bool		release_lock;
+		int8		mode;
+	}			smgr;
+
 	/* just as an example placeholder for later */
 	struct
 	{
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 1456ab383a4..e993e1b671f 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -101,6 +101,8 @@ extern PGDLLIMPORT int max_safe_fds;
  * prototypes for functions in fd.c
  */
 
+struct PgAioHandle;
+
 /* Operations on virtual Files --- equivalent to Unix kernel file ops */
 extern File PathNameOpenFile(const char *fileName, int fileFlags);
 extern File PathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode);
@@ -109,6 +111,8 @@ extern void FileClose(File file);
 extern int	FilePrefetch(File file, off_t offset, off_t amount, uint32 wait_event_info);
 extern ssize_t FileReadV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
+extern int	FileStartReadV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info);
 extern ssize_t FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
+extern int	FileStartWriteV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info);
 extern int	FileSync(File file, uint32 wait_event_info);
 extern int	FileZero(File file, off_t offset, off_t amount, uint32 wait_event_info);
 extern int	FileFallocate(File file, off_t offset, off_t amount, uint32 wait_event_info);
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index b72293c79a5..ede77695853 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -19,6 +19,10 @@
 #include "storage/smgr.h"
 #include "storage/sync.h"
 
+struct PgAioHandleSharedCallbacks;
+extern const struct PgAioHandleSharedCallbacks aio_md_readv_cb;
+extern const struct PgAioHandleSharedCallbacks aio_md_writev_cb;
+
 /* md storage manager functionality */
 extern void mdinit(void);
 extern void mdopen(SMgrRelation reln);
@@ -36,9 +40,16 @@ extern uint32 mdmaxcombine(SMgrRelation reln, ForkNumber forknum,
 						   BlockNumber blocknum);
 extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 					void **buffers, BlockNumber nblocks);
+extern void mdstartreadv(struct PgAioHandle *ioh,
+						 SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+						 void **buffers, BlockNumber nblocks);
 extern void mdwritev(SMgrRelation reln, ForkNumber forknum,
 					 BlockNumber blocknum,
 					 const void **buffers, BlockNumber nblocks, bool skipFsync);
+extern void mdstartwritev(struct PgAioHandle *ioh,
+						  SMgrRelation reln, ForkNumber forknum,
+						  BlockNumber blocknum,
+						  const void **buffers, BlockNumber nblocks, bool skipFsync);
 extern void mdwriteback(SMgrRelation reln, ForkNumber forknum,
 						BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
@@ -46,6 +57,7 @@ extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber nblocks);
 extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
 extern void mdregistersync(SMgrRelation reln, ForkNumber forknum);
+extern int	mdfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off);
 
 extern void ForgetDatabaseSyncRequests(Oid dbid);
 extern void DropRelationFiles(RelFileLocator *delrels, int ndelrels, bool isRedo);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 899d0d681c5..66730bc24fa 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -73,6 +73,11 @@ typedef SMgrRelationData *SMgrRelation;
 #define SmgrIsTemp(smgr) \
 	RelFileLocatorBackendIsTemp((smgr)->smgr_rlocator)
 
+struct PgAioHandle;
+struct PgAioSubjectInfo;
+
+extern const struct PgAioSubjectInfo aio_smgr_subject_info;
+
 extern void smgrinit(void);
 extern SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend);
 extern bool smgrexists(SMgrRelation reln, ForkNumber forknum);
@@ -97,10 +102,19 @@ extern uint32 smgrmaxcombine(SMgrRelation reln, ForkNumber forknum,
 extern void smgrreadv(SMgrRelation reln, ForkNumber forknum,
 					  BlockNumber blocknum,
 					  void **buffers, BlockNumber nblocks);
+extern void smgrstartreadv(struct PgAioHandle *ioh,
+						   SMgrRelation reln, ForkNumber forknum,
+						   BlockNumber blocknum,
+						   void **buffers, BlockNumber nblocks);
 extern void smgrwritev(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber blocknum,
 					   const void **buffers, BlockNumber nblocks,
 					   bool skipFsync);
+extern void smgrstartwritev(struct PgAioHandle *ioh,
+							SMgrRelation reln, ForkNumber forknum,
+							BlockNumber blocknum,
+							const void **buffers, BlockNumber nblocks,
+							bool skipFsync);
 extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum,
 						  BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
@@ -109,6 +123,7 @@ extern void smgrtruncate(SMgrRelation reln, ForkNumber *forknum,
 						 int nforks, BlockNumber *nblocks);
 extern void smgrimmedsync(SMgrRelation reln, ForkNumber forknum);
 extern void smgrregistersync(SMgrRelation reln, ForkNumber forknum);
+extern int	smgrfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off);
 extern void AtEOXact_SMgr(void);
 extern bool ProcessBarrierSmgrRelease(void);
 
@@ -126,4 +141,10 @@ smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	smgrwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
 }
 
+extern void pgaio_io_set_subject_smgr(struct PgAioHandle *ioh,
+									  SMgrRelationData *smgr,
+									  ForkNumber forknum,
+									  BlockNumber blocknum,
+									  int nblocks);
+
 #endif							/* SMGR_H */
diff --git a/src/backend/storage/aio/aio_subject.c b/src/backend/storage/aio/aio_subject.c
index 51ee3b3969d..14be8432f5a 100644
--- a/src/backend/storage/aio/aio_subject.c
+++ b/src/backend/storage/aio/aio_subject.c
@@ -20,6 +20,7 @@
 #include "storage/aio_internal.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
+#include "storage/md.h"
 #include "storage/smgr.h"
 #include "utils/memutils.h"
 
@@ -28,9 +29,12 @@ static const PgAioSubjectInfo *aio_subject_info[] = {
 	[ASI_INVALID] = &(PgAioSubjectInfo) {
 		.name = "invalid",
 	},
+	[ASI_SMGR] = &aio_smgr_subject_info,
 };
 
 static const PgAioHandleSharedCallbacks *aio_shared_cbs[] = {
+	[ASC_MD_READV] = &aio_md_readv_cb,
+	[ASC_MD_WRITEV] = &aio_md_writev_cb,
 };
 
 
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index ec1505802b9..f5ff554f946 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -95,6 +95,7 @@
 #include "pgstat.h"
 #include "portability/mem.h"
 #include "postmaster/startup.h"
+#include "storage/aio.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "utils/guc.h"
@@ -1295,6 +1296,8 @@ LruDelete(File file)
 
 	vfdP = &VfdCache[file];
 
+	pgaio_closing_fd(vfdP->fd);
+
 	/*
 	 * Close the file.  We aren't expecting this to fail; if it does, better
 	 * to leak the FD than to mess up our internal state.
@@ -1988,6 +1991,8 @@ FileClose(File file)
 
 	if (!FileIsNotOpen(file))
 	{
+		pgaio_closing_fd(vfdP->fd);
+
 		/* close the file */
 		if (close(vfdP->fd) != 0)
 		{
@@ -2211,6 +2216,32 @@ retry:
 	return returnCode;
 }
 
+/*
+ * FileStartReadV() -- prepare "ioh" for a vectored read of "file" at "offset".
+ * Returns 0, or FileAccess()'s negative result.  wait_event_info is unused.
+ */
+int
+FileStartReadV(struct PgAioHandle *ioh, File file,
+			   int iovcnt, off_t offset,
+			   uint32 wait_event_info)
+{
+	int			returnCode;
+	Vfd		   *vfdP;
+
+	Assert(FileIsValid(file));
+	DO_DB(elog(LOG, "FileStartReadV: %d (%s) " INT64_FORMAT " %d",
+			   file, VfdCache[file].fileName, (int64) offset, iovcnt));
+
+	returnCode = FileAccess(file);
+	if (returnCode < 0)
+		return returnCode;
+
+	vfdP = &VfdCache[file];
+	pgaio_io_prep_readv(ioh, vfdP->fd, iovcnt, offset);
+
+	return 0;
+}
+
 ssize_t
 FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset,
 		   uint32 wait_event_info)
@@ -2316,6 +2347,34 @@ retry:
 	return returnCode;
 }
 
+/*
+ * FileStartWriteV() -- prepare "ioh" for a vectored write to "file" at "offset".
+ * Returns 0, or FileAccess()'s negative result.  wait_event_info is unused.
+ */
+int
+FileStartWriteV(struct PgAioHandle *ioh, File file,
+				int iovcnt, off_t offset,
+				uint32 wait_event_info)
+{
+	int			returnCode;
+	Vfd		   *vfdP;
+
+	Assert(FileIsValid(file));
+	DO_DB(elog(LOG, "FileStartWriteV: %d (%s) " INT64_FORMAT " %d",
+			   file, VfdCache[file].fileName, (int64) offset, iovcnt));
+
+	returnCode = FileAccess(file);
+	if (returnCode < 0)
+		return returnCode;
+
+	vfdP = &VfdCache[file];
+
+	/* FIXME: think about / reimplement  temp_file_limit */
+	pgaio_io_prep_writev(ioh, vfdP->fd, iovcnt, offset);
+
+	return 0;
+}
+
 int
 FileSync(File file, uint32 wait_event_info)
 {
@@ -2499,6 +2558,12 @@ FilePathName(File file)
 int
 FileGetRawDesc(File file)
 {
+	int			returnCode;
+
 	Assert(FileIsValid(file));
+	/* only hand out the raw fd once FileAccess() has succeeded */
+	returnCode = FileAccess(file);
+	if (returnCode < 0)
+		return returnCode;
 	return VfdCache[file].fd;
 }
@@ -2779,6 +2844,7 @@ FreeDesc(AllocateDesc *desc)
 			result = closedir(desc->desc.dir);
 			break;
 		case AllocateDescRawFD:
+			pgaio_closing_fd(desc->desc.fd);
 			result = close(desc->desc.fd);
 			break;
 		default:
@@ -2847,6 +2913,8 @@ CloseTransientFile(int fd)
 	/* Only get here if someone passes us a file not in allocatedDescs */
 	elog(WARNING, "fd passed to CloseTransientFile was not obtained from OpenTransientFile");
 
+	pgaio_closing_fd(fd);
+
 	return close(fd);
 }
 
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 6cd81a61faa..f96308490d9 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -31,6 +31,7 @@
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "pgstat.h"
+#include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/md.h"
@@ -931,6 +932,49 @@ mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	}
 }
 
+/*
+ * mdstartreadv() -- prepare "ioh" to read nblocks blocks starting at blocknum.
+ * The range must lie within one segment; ERROR if the IO cannot be started.
+ */
+void
+mdstartreadv(PgAioHandle *ioh,
+			 SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+			 void **buffers, BlockNumber nblocks)
+{
+	off_t		seekpos;
+	MdfdVec    *v;
+	BlockNumber nblocks_this_segment;
+	struct iovec *iov;
+	int			iovcnt;
+	int			ret;
+
+	v = _mdfd_getseg(reln, forknum, blocknum, false,
+					 EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
+
+	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
+	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
+
+	nblocks_this_segment =
+		Min(nblocks, RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE)));
+	if (nblocks_this_segment != nblocks)
+		elog(ERROR, "read crossing segment boundary");
+
+	iovcnt = pgaio_io_get_iovec(ioh, &iov);
+	Assert(nblocks <= iovcnt);
+	iovcnt = buffers_to_iovec(iov, buffers, nblocks_this_segment);
+	Assert(iovcnt <= nblocks_this_segment);
+
+	pgaio_io_set_subject_smgr(ioh, reln, forknum, blocknum, nblocks);
+	pgaio_io_add_shared_cb(ioh, ASC_MD_READV);
+
+	ret = FileStartReadV(ioh, v->mdfd_vfd, iovcnt, seekpos, WAIT_EVENT_DATA_FILE_READ);
+	if (ret != 0)				/* do not silently leave the IO unprepared */
+		ereport(ERROR, errcode_for_file_access(),
+				errmsg("could not start reading blocks %u..%u in file \"%s\": %m",
+					   blocknum, blocknum + nblocks_this_segment - 1,
+					   FilePathName(v->mdfd_vfd)));
+}
+
 /*
  * mdwritev() -- Write the supplied blocks at the appropriate location.
  *
@@ -1036,6 +1080,49 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	}
 }
 
+/*
+ * mdstartwritev() -- prepare "ioh" to write nblocks blocks starting at blocknum.
+ * NOTE(review): skipFsync is unused; confirm the fsync request is made elsewhere.
+ */
+void
+mdstartwritev(PgAioHandle *ioh,
+			  SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+			  const void **buffers, BlockNumber nblocks, bool skipFsync)
+{
+	off_t		seekpos;
+	MdfdVec    *v;
+	BlockNumber nblocks_this_segment;
+	struct iovec *iov;
+	int			iovcnt;
+	int			ret;
+
+	v = _mdfd_getseg(reln, forknum, blocknum, false,
+					 EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
+
+	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
+	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
+
+	nblocks_this_segment =
+		Min(nblocks, RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE)));
+	if (nblocks_this_segment != nblocks)
+		elog(ERROR, "write crossing segment boundary");
+
+	iovcnt = pgaio_io_get_iovec(ioh, &iov);
+	Assert(nblocks <= iovcnt);
+	iovcnt = buffers_to_iovec(iov, unconstify(void **, buffers), nblocks_this_segment);
+	Assert(iovcnt <= nblocks_this_segment);
+
+	pgaio_io_set_subject_smgr(ioh, reln, forknum, blocknum, nblocks);
+	pgaio_io_add_shared_cb(ioh, ASC_MD_WRITEV);
+
+	ret = FileStartWriteV(ioh, v->mdfd_vfd, iovcnt, seekpos, WAIT_EVENT_DATA_FILE_WRITE);
+	if (ret != 0)				/* do not silently leave the IO unprepared */
+		ereport(ERROR, errcode_for_file_access(),
+				errmsg("could not start writing blocks %u..%u in file \"%s\": %m",
+					   blocknum, blocknum + nblocks_this_segment - 1,
+					   FilePathName(v->mdfd_vfd)));
+}
+
 
 /*
  * mdwriteback() -- Tell the kernel to write pages back to storage.
@@ -1357,6 +1444,21 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum)
 	}
 }
 
+/* mdfd() -- raw fd of the segment holding blocknum; *off = its byte offset there */
+int
+mdfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off)
+{
+	/* NOTE(review): this result is overwritten below; presumably only opens the fork */
+	MdfdVec    *v = mdopenfork(reln, forknum, EXTENSION_FAIL);
+
+	v = _mdfd_getseg(reln, forknum, blocknum, false, EXTENSION_FAIL);
+
+	*off = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
+	Assert(*off < (off_t) BLCKSZ * RELSEG_SIZE);
+
+	return FileGetRawDesc(v->mdfd_vfd);
+}
+
 /*
  * register_dirty_segment() -- Mark a relation segment as needing fsync
  *
@@ -1832,3 +1934,118 @@ mdfiletagmatches(const FileTag *ftag, const FileTag *candidate)
 	 */
 	return ftag->rlocator.dbOid == candidate->rlocator.dbOid;
 }
+
+
+/* Callbacks for md.c AIO, exported as aio_md_readv_cb / aio_md_writev_cb (writes have no .error). */
+static PgAioResult md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result);
+static PgAioResult md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result);
+static void md_readv_error(PgAioResult result, const PgAioSubjectData *subject_data, int elevel);
+
+const struct PgAioHandleSharedCallbacks aio_md_readv_cb = {
+	.complete = md_readv_complete,
+	.error = md_readv_error,
+};
+
+const struct PgAioHandleSharedCallbacks aio_md_writev_cb = {
+	.complete = md_writev_complete,
+};
+
+/*
+ * md_readv_complete() -- completion callback for md reads: turn the byte
+ * count (or negated errno) in prior_result into a result counted in blocks.
+ */
+static PgAioResult
+md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result)
+{
+	PgAioSubjectData *sd = pgaio_io_get_subject_data(ioh);
+	PgAioResult result = prior_result;
+
+	elog(DEBUG3, "%s: %d %d", __func__, prior_result.status, prior_result.result);
+
+	if (prior_result.result < 0)
+	{
+		result.status = ARS_ERROR;
+		result.id = ASC_MD_READV;
+		result.error_data = -prior_result.result;
+		result.result = 0;
+		md_readv_error(result, sd, LOG);
+		return result;
+	}
+
+	result.result /= BLCKSZ;
+
+	if (result.result == 0)
+	{
+		/* consider 0 blocks read a failure */
+		result.status = ARS_ERROR;
+		result.id = ASC_MD_READV;
+		result.error_data = 0;
+		md_readv_error(result, sd, LOG);
+	}
+
+	if (result.status != ARS_ERROR && result.result < sd->smgr.nblocks)
+	{
+		/* partial reads should be retried at upper level */
+		result.id = ASC_MD_READV;
+		result.status = ARS_PARTIAL;
+	}
+
+	/* AFIXME: post-read portion of mdreadv() */
+
+	return result;
+}
+
+/*
+ * md_readv_error() -- report a failed (error_data holds the errno) or short
+ * (error_data == 0) read of the subject's block range at level "elevel".
+ */
+static void
+md_readv_error(PgAioResult result, const PgAioSubjectData *subject_data, int elevel)
+{
+	MemoryContext oldContext;
+
+	/* AFIXME: */
+	oldContext = MemoryContextSwitchTo(ErrorContext);
+
+	if (result.error_data != 0)
+	{
+		errno = result.error_data;	/* for errcode_for_file_access() */
+
+		ereport(elevel,
+				errcode_for_file_access(),
+				errmsg("could not read blocks %u..%u in file \"%s\": %m",
+					   subject_data->smgr.blockNum,
+					   subject_data->smgr.blockNum + subject_data->smgr.nblocks - 1,
+					   relpathperm(subject_data->smgr.rlocator, subject_data->smgr.forkNum)));
+	}
+	else
+	{
+		ereport(elevel,
+				errcode(ERRCODE_DATA_CORRUPTED),
+				errmsg("could not read blocks %u..%u in file \"%s\": read only %zu of %zu bytes",
+					   subject_data->smgr.blockNum,
+					   subject_data->smgr.blockNum + subject_data->smgr.nblocks - 1,
+					   relpathperm(subject_data->smgr.rlocator, subject_data->smgr.forkNum),
+					   result.result * (size_t) BLCKSZ,
+					   subject_data->smgr.nblocks * (size_t) BLCKSZ));
+	}
+
+	MemoryContextSwitchTo(oldContext);
+}
+
+/* md_writev_complete() -- completion callback for md writes: convert bytes to blocks */
+static PgAioResult
+md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result)
+{
+	elog(DEBUG3, "%s: %d %d", __func__, prior_result.status, prior_result.result);
+
+	/* NOTE(review): md_readv_complete() detects failure via result < 0; confirm status is set here */
+	if (prior_result.status == ARS_ERROR)
+	{
+		/* AFIXME: complain */
+		return prior_result;
+	}
+
+	prior_result.result /= BLCKSZ;
+	return prior_result;
+}
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index ee31db85eec..2dacb361a4f 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -53,6 +53,7 @@
 
 #include "access/xlogutils.h"
 #include "lib/ilist.h"
+#include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/ipc.h"
 #include "storage/md.h"
@@ -93,10 +94,19 @@ typedef struct f_smgr
 	void		(*smgr_readv) (SMgrRelation reln, ForkNumber forknum,
 							   BlockNumber blocknum,
 							   void **buffers, BlockNumber nblocks);
+	void		(*smgr_startreadv) (struct PgAioHandle *ioh,
+									SMgrRelation reln, ForkNumber forknum,
+									BlockNumber blocknum,
+									void **buffers, BlockNumber nblocks);
 	void		(*smgr_writev) (SMgrRelation reln, ForkNumber forknum,
 								BlockNumber blocknum,
 								const void **buffers, BlockNumber nblocks,
 								bool skipFsync);
+	void		(*smgr_startwritev) (struct PgAioHandle *ioh,
+									 SMgrRelation reln, ForkNumber forknum,
+									 BlockNumber blocknum,
+									 const void **buffers, BlockNumber nblocks,
+									 bool skipFsync);
 	void		(*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
 								   BlockNumber blocknum, BlockNumber nblocks);
 	BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
@@ -104,6 +114,7 @@ typedef struct f_smgr
 								  BlockNumber nblocks);
 	void		(*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
 	void		(*smgr_registersync) (SMgrRelation reln, ForkNumber forknum);
+	int			(*smgr_fd) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off);
 } f_smgr;
 
 static const f_smgr smgrsw[] = {
@@ -121,12 +132,15 @@ static const f_smgr smgrsw[] = {
 		.smgr_prefetch = mdprefetch,
 		.smgr_maxcombine = mdmaxcombine,
 		.smgr_readv = mdreadv,
+		.smgr_startreadv = mdstartreadv,
 		.smgr_writev = mdwritev,
+		.smgr_startwritev = mdstartwritev,
 		.smgr_writeback = mdwriteback,
 		.smgr_nblocks = mdnblocks,
 		.smgr_truncate = mdtruncate,
 		.smgr_immedsync = mdimmedsync,
 		.smgr_registersync = mdregistersync,
+		.smgr_fd = mdfd,
 	}
 };
 
@@ -145,6 +159,14 @@ static void smgrshutdown(int code, Datum arg);
 static void smgrdestroy(SMgrRelation reln);
 
 
+static void smgr_aio_reopen(PgAioHandle *ioh);
+
+/* Subject callbacks for smgr IOs; aio_subject.c installs this as aio_subject_info[ASI_SMGR]. */
+const struct PgAioSubjectInfo aio_smgr_subject_info = {
+	.name = "smgr",
+	.reopen = smgr_aio_reopen,
+};
+
 /*
  * smgrinit(), smgrshutdown() -- Initialize or shut down storage
  *								 managers.
@@ -620,6 +642,19 @@ smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 										nblocks);
 }
 
+/*
+ * smgrstartreadv() -- hand an AIO read of nblocks blocks to the relation's smgr.
+ */
+void
+smgrstartreadv(struct PgAioHandle *ioh,
+			   SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+			   void **buffers, BlockNumber nblocks)
+{
+	smgrsw[reln->smgr_which].smgr_startreadv(ioh,
+											 reln, forknum, blocknum, buffers,
+											 nblocks);
+}
+
 /*
  * smgrwritev() -- Write the supplied buffers out.
  *
@@ -651,6 +686,16 @@ smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 										 buffers, nblocks, skipFsync);
 }
 
+/* smgrstartwritev() -- hand an AIO write of nblocks blocks to the relation's smgr */
+void
+smgrstartwritev(struct PgAioHandle *ioh,
+				SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+				const void **buffers, BlockNumber nblocks, bool skipFsync)
+{
+	smgrsw[reln->smgr_which].smgr_startwritev(ioh, reln, forknum, blocknum,
+											  buffers, nblocks, skipFsync);
+}
+
 /*
  * smgrwriteback() -- Trigger kernel writeback for the supplied range of
  *					   blocks.
@@ -807,6 +852,12 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
 	smgrsw[reln->smgr_which].smgr_immedsync(reln, forknum);
 }
 
+int								/* result and *off come from the relation's smgr_fd method */
+smgrfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off)
+{
+	return smgrsw[reln->smgr_which].smgr_fd(reln, forknum, blocknum, off);
+}
+
 /*
  * AtEOXact_SMgr
  *
@@ -835,3 +886,43 @@ ProcessBarrierSmgrRelease(void)
 	smgrreleaseall();
 	return true;
 }
+
+/*
+ * pgaio_io_set_subject_smgr() -- record in "ioh" which smgr fork/blocks the IO covers.
+ */
+void
+pgaio_io_set_subject_smgr(PgAioHandle *ioh, struct SMgrRelationData *smgr,
+						  ForkNumber forknum, BlockNumber blocknum, int nblocks)
+{
+	PgAioSubjectData *sd = pgaio_io_get_subject_data(ioh);
+
+	pgaio_io_set_subject(ioh, ASI_SMGR);
+
+	/* backend is implied via IO owner */
+	sd->smgr.rlocator = smgr->smgr_rlocator.locator;
+	sd->smgr.forkNum = forknum;
+	sd->smgr.blockNum = blocknum;
+	sd->smgr.nblocks = nblocks;
+	sd->smgr.is_temp = SmgrIsTemp(smgr);
+	sd->smgr.release_lock = false;
+	sd->smgr.mode = RBM_NORMAL;
+}
+
+/* smgr_aio_reopen() -- reopen the IO's relation and store its fd in the op data */
+static void
+smgr_aio_reopen(PgAioHandle *ioh)
+{
+	PgAioSubjectData *sd = pgaio_io_get_subject_data(ioh);
+	PgAioOpData *od = pgaio_io_get_op_data(ioh);
+	SMgrRelation reln;
+	ProcNumber	procno;
+	uint32		off;
+
+	if (sd->smgr.is_temp)
+		procno = pgaio_io_get_owner(ioh);
+	else
+		procno = INVALID_PROC_NUMBER;
+	reln = smgropen(sd->smgr.rlocator, procno);
+	od->read.fd = smgrfd(reln, sd->smgr.forkNum, sd->smgr.blockNum, &off);
+	Assert(off == od->read.offset);	/* NOTE(review): "read" member used for write IOs too? */
+}
-- 
2.45.2.827.g557ae147e6

