From a06407b19c8d168ea966e45c0e483b38d49ddc12 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Fri, 4 Mar 2022 14:48:39 -0500
Subject: [PATCH v6 1/4] Add unbuffered IO API

Wrap unbuffered extends and writes in a new API, directmgr.

When writing data outside of shared buffers, the backend must do a
series of steps to ensure the data is both durable and recoverable.

When writing or extending a page of data outside of shared buffers the
backend must log the write or extend (if table is WAL-logged), checksum
the page (if it is not empty), and write it out before moving on.

Additionally, the backend must fsync the page data to ensure it reaches
permanent storage since checkpointer is unaware of the buffer and could
move the Redo pointer past the associated WAL for this write/extend
before the data is safely on permanent storage.

This commit introduces no functional change. It replaces many current
callers of smgrimmedsync(), smgrextend(), and smgrwrite() with the
equivalent directmgr functions. Consolidating these steps makes IO
outside of shared buffers less error-prone.
---
 contrib/bloom/blinsert.c               | 25 ++++---
 src/backend/access/gist/gistbuild.c    | 50 +++++--------
 src/backend/access/heap/rewriteheap.c  | 71 ++++++-------------
 src/backend/access/nbtree/nbtree.c     | 26 ++++---
 src/backend/access/nbtree/nbtsort.c    | 73 ++++++++-----------
 src/backend/access/spgist/spginsert.c  | 41 +++++------
 src/backend/catalog/storage.c          | 29 ++------
 src/backend/storage/Makefile           |  2 +-
 src/backend/storage/direct/Makefile    | 17 +++++
 src/backend/storage/direct/directmgr.c | 98 ++++++++++++++++++++++++++
 src/include/storage/directmgr.h        | 53 ++++++++++++++
 11 files changed, 296 insertions(+), 189 deletions(-)
 create mode 100644 src/backend/storage/direct/Makefile
 create mode 100644 src/backend/storage/direct/directmgr.c
 create mode 100644 src/include/storage/directmgr.h

diff --git a/contrib/bloom/blinsert.c b/contrib/bloom/blinsert.c
index c94cf34e69..7954a17e2d 100644
--- a/contrib/bloom/blinsert.c
+++ b/contrib/bloom/blinsert.c
@@ -19,8 +19,8 @@
 #include "catalog/index.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
+#include "storage/directmgr.h"
 #include "storage/indexfsm.h"
-#include "storage/smgr.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
@@ -164,6 +164,16 @@ void
 blbuildempty(Relation index)
 {
 	Page		metapage;
+	UnBufferedWriteState wstate;
+
+	/*
+	 * Though this is an unlogged relation, pass do_wal=true since the init
+	 * fork of an unlogged index must be wal-logged and fsync'd. This does not
+	 * affect WAL-logging, as unbuffered_write() does not do log_newpage()
+	 * internally, but it does make unbuffered_finish() fsync the fork. Were
+	 * this replaced with unbuffered_extend(), do_wal=true would also log it.
+	 */
+	unbuffered_prep(&wstate, true);
 
 	/* Construct metapage. */
 	metapage = (Page) palloc(BLCKSZ);
@@ -176,18 +186,13 @@ blbuildempty(Relation index)
 	 * XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE record.  Therefore, we need
 	 * this even when wal_level=minimal.
 	 */
-	PageSetChecksumInplace(metapage, BLOOM_METAPAGE_BLKNO);
-	smgrwrite(RelationGetSmgr(index), INIT_FORKNUM, BLOOM_METAPAGE_BLKNO,
-			  (char *) metapage, true);
+	unbuffered_write(&wstate, RelationGetSmgr(index), INIT_FORKNUM,
+			BLOOM_METAPAGE_BLKNO, metapage);
+
 	log_newpage(&(RelationGetSmgr(index))->smgr_rnode.node, INIT_FORKNUM,
 				BLOOM_METAPAGE_BLKNO, metapage, true);
 
-	/*
-	 * An immediate sync is required even if we xlog'd the page, because the
-	 * write did not go through shared_buffers and therefore a concurrent
-	 * checkpoint may have moved the redo pointer past our xlog record.
-	 */
-	smgrimmedsync(RelationGetSmgr(index), INIT_FORKNUM);
+	unbuffered_finish(&wstate, RelationGetSmgr(index), INIT_FORKNUM);
 }
 
 /*
diff --git a/src/backend/access/gist/gistbuild.c b/src/backend/access/gist/gistbuild.c
index e081e6571a..fc09938f80 100644
--- a/src/backend/access/gist/gistbuild.c
+++ b/src/backend/access/gist/gistbuild.c
@@ -43,6 +43,7 @@
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
 #include "storage/bufmgr.h"
+#include "storage/directmgr.h"
 #include "storage/smgr.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -91,6 +92,7 @@ typedef struct
 
 	int64		indtuples;		/* number of tuples indexed */
 
+	UnBufferedWriteState ub_wstate;
 	/*
 	 * Extra data structures used during a buffering build. 'gfbb' contains
 	 * information related to managing the build buffers. 'parentMap' is a
@@ -410,13 +412,15 @@ gist_indexsortbuild(GISTBuildState *state)
 	state->pages_written = 0;
 	state->ready_num_pages = 0;
 
+	unbuffered_prep(&state->ub_wstate, RelationNeedsWAL(state->indexrel));
+
 	/*
 	 * Write an empty page as a placeholder for the root page. It will be
 	 * replaced with the real root page at the end.
 	 */
 	page = palloc0(BLCKSZ);
-	smgrextend(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, GIST_ROOT_BLKNO,
-			   page, true);
+	unbuffered_extend(&state->ub_wstate, RelationGetSmgr(state->indexrel),
+			MAIN_FORKNUM, GIST_ROOT_BLKNO, page, true);
 	state->pages_allocated++;
 	state->pages_written++;
 
@@ -458,27 +462,19 @@ gist_indexsortbuild(GISTBuildState *state)
 
 	/* Write out the root */
 	PageSetLSN(levelstate->pages[0], GistBuildLSN);
-	PageSetChecksumInplace(levelstate->pages[0], GIST_ROOT_BLKNO);
-	smgrwrite(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, GIST_ROOT_BLKNO,
-			  levelstate->pages[0], true);
+
+	unbuffered_write(&state->ub_wstate, RelationGetSmgr(state->indexrel),
+			MAIN_FORKNUM, GIST_ROOT_BLKNO, levelstate->pages[0]);
+
 	if (RelationNeedsWAL(state->indexrel))
 		log_newpage(&state->indexrel->rd_node, MAIN_FORKNUM, GIST_ROOT_BLKNO,
 					levelstate->pages[0], true);
 
+	unbuffered_finish(&state->ub_wstate, RelationGetSmgr(state->indexrel),
+			MAIN_FORKNUM);
+
 	pfree(levelstate->pages[0]);
 	pfree(levelstate);
-
-	/*
-	 * When we WAL-logged index pages, we must nonetheless fsync index files.
-	 * Since we're building outside shared buffers, a CHECKPOINT occurring
-	 * during the build has no way to flush the previously written data to
-	 * disk (indeed it won't know the index even exists).  A crash later on
-	 * would replay WAL from the checkpoint, therefore it wouldn't replay our
-	 * earlier WAL entries. If we do not fsync those pages here, they might
-	 * still not be on disk when the crash occurs.
-	 */
-	if (RelationNeedsWAL(state->indexrel))
-		smgrimmedsync(RelationGetSmgr(state->indexrel), MAIN_FORKNUM);
 }
 
 /*
@@ -645,26 +641,18 @@ gist_indexsortbuild_flush_ready_pages(GISTBuildState *state)
 	if (state->ready_num_pages == 0)
 		return;
 
+	/* Currently, the blocks must be buffered in order. */
 	for (int i = 0; i < state->ready_num_pages; i++)
 	{
-		Page		page = state->ready_pages[i];
-		BlockNumber blkno = state->ready_blknos[i];
-
-		/* Currently, the blocks must be buffered in order. */
-		if (blkno != state->pages_written)
+		if (state->ready_blknos[i] != state->pages_written)
 			elog(ERROR, "unexpected block number to flush GiST sorting build");
-
-		PageSetLSN(page, GistBuildLSN);
-		PageSetChecksumInplace(page, blkno);
-		smgrextend(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, blkno, page,
-				   true);
-
 		state->pages_written++;
 	}
 
-	if (RelationNeedsWAL(state->indexrel))
-		log_newpages(&state->indexrel->rd_node, MAIN_FORKNUM, state->ready_num_pages,
-					 state->ready_blknos, state->ready_pages, true);
+	unbuffered_extend_range(&state->ub_wstate,
+			RelationGetSmgr(state->indexrel), MAIN_FORKNUM,
+			state->ready_num_pages, state->ready_blknos, state->ready_pages,
+			false, GistBuildLSN);
 
 	for (int i = 0; i < state->ready_num_pages; i++)
 		pfree(state->ready_pages[i]);
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 2a53826736..c8fa8bb27c 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -81,15 +81,15 @@
  * in the whole table.  Note that if we do fail halfway through a CLUSTER,
  * the old table is still valid, so failure is not catastrophic.
  *
- * We can't use the normal heap_insert function to insert into the new
- * heap, because heap_insert overwrites the visibility information.
- * We use a special-purpose raw_heap_insert function instead, which
- * is optimized for bulk inserting a lot of tuples, knowing that we have
- * exclusive access to the heap.  raw_heap_insert builds new pages in
- * local storage.  When a page is full, or at the end of the process,
- * we insert it to WAL as a single record and then write it to disk
- * directly through smgr.  Note, however, that any data sent to the new
- * heap's TOAST table will go through the normal bufmgr.
+ * We can't use the normal heap_insert function to insert into the new heap,
+ * because heap_insert overwrites the visibility information. We use a
+ * special-purpose raw_heap_insert function instead, which is optimized for
+ * bulk inserting a lot of tuples, knowing that we have exclusive access to the
+ * heap.  raw_heap_insert builds new pages in local storage.  When a page is
+ * full, or at the end of the process, we insert it to WAL as a single record
+ * and then write it to disk directly through directmgr.  Note, however, that
+ * any data sent to the new heap's TOAST table will go through the normal
+ * bufmgr.
  *
  *
  * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
@@ -119,9 +119,9 @@
 #include "replication/logical.h"
 #include "replication/slot.h"
 #include "storage/bufmgr.h"
+#include "storage/directmgr.h"
 #include "storage/fd.h"
 #include "storage/procarray.h"
-#include "storage/smgr.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
@@ -152,6 +152,7 @@ typedef struct RewriteStateData
 	HTAB	   *rs_old_new_tid_map; /* unmatched B tuples */
 	HTAB	   *rs_logical_mappings;	/* logical remapping files */
 	uint32		rs_num_rewrite_mappings;	/* # in memory mappings */
+	UnBufferedWriteState rs_unbuffered_wstate;
 }			RewriteStateData;
 
 /*
@@ -265,6 +266,10 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm
 	state->rs_cutoff_multi = cutoff_multi;
 	state->rs_cxt = rw_cxt;
 
+	unbuffered_prep(&state->rs_unbuffered_wstate,
+			RelationNeedsWAL(state->rs_new_rel));
+
+
 	/* Initialize hash tables used to track update chains */
 	hash_ctl.keysize = sizeof(TidHashKey);
 	hash_ctl.entrysize = sizeof(UnresolvedTupData);
@@ -317,28 +322,13 @@ end_heap_rewrite(RewriteState state)
 	/* Write the last page, if any */
 	if (state->rs_buffer_valid)
 	{
-		if (RelationNeedsWAL(state->rs_new_rel))
-			log_newpage(&state->rs_new_rel->rd_node,
-						MAIN_FORKNUM,
-						state->rs_blockno,
-						state->rs_buffer,
-						true);
-
-		PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
-
-		smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
-				   state->rs_blockno, (char *) state->rs_buffer, true);
+		unbuffered_extend(&state->rs_unbuffered_wstate,
+				RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
+				state->rs_blockno, state->rs_buffer, false);
 	}
 
-	/*
-	 * When we WAL-logged rel pages, we must nonetheless fsync them.  The
-	 * reason is the same as in storage.c's RelationCopyStorage(): we're
-	 * writing data that's not in shared buffers, and so a CHECKPOINT
-	 * occurring during the rewriteheap operation won't have fsync'd data we
-	 * wrote before the checkpoint.
-	 */
-	if (RelationNeedsWAL(state->rs_new_rel))
-		smgrimmedsync(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM);
+	unbuffered_finish(&state->rs_unbuffered_wstate,
+			RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM);
 
 	logical_end_heap_rewrite(state);
 
@@ -676,24 +666,9 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
 			 * contains a tuple.  Hence, unlike RelationGetBufferForTuple(),
 			 * enforce saveFreeSpace unconditionally.
 			 */
-
-			/* XLOG stuff */
-			if (RelationNeedsWAL(state->rs_new_rel))
-				log_newpage(&state->rs_new_rel->rd_node,
-							MAIN_FORKNUM,
-							state->rs_blockno,
-							page,
-							true);
-
-			/*
-			 * Now write the page. We say skipFsync = true because there's no
-			 * need for smgr to schedule an fsync for this write; we'll do it
-			 * ourselves in end_heap_rewrite.
-			 */
-			PageSetChecksumInplace(page, state->rs_blockno);
-
-			smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
-					   state->rs_blockno, (char *) page, true);
+			unbuffered_extend(&state->rs_unbuffered_wstate,
+					RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
+					state->rs_blockno, page, false);
 
 			state->rs_blockno++;
 			state->rs_buffer_valid = false;
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index c9b4964c1e..c7bf971917 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -30,10 +30,10 @@
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
 #include "storage/condition_variable.h"
+#include "storage/directmgr.h"
 #include "storage/indexfsm.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
-#include "storage/smgr.h"
 #include "utils/builtins.h"
 #include "utils/index_selfuncs.h"
 #include "utils/memutils.h"
@@ -152,6 +152,16 @@ void
 btbuildempty(Relation index)
 {
 	Page		metapage;
+	UnBufferedWriteState wstate;
+
+	/*
+	 * Though this is an unlogged relation, pass do_wal=true since the init
+	 * fork of an unlogged index must be wal-logged and fsync'd. This does not
+	 * affect WAL-logging, as unbuffered_write() does not do log_newpage()
+	 * internally, but it does make unbuffered_finish() fsync the fork. Were
+	 * this replaced with unbuffered_extend(), do_wal=true would also log it.
+	 */
+	unbuffered_prep(&wstate, true);
 
 	/* Construct metapage. */
 	metapage = (Page) palloc(BLCKSZ);
@@ -164,18 +174,12 @@ btbuildempty(Relation index)
 	 * XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE record.  Therefore, we need
 	 * this even when wal_level=minimal.
 	 */
-	PageSetChecksumInplace(metapage, BTREE_METAPAGE);
-	smgrwrite(RelationGetSmgr(index), INIT_FORKNUM, BTREE_METAPAGE,
-			  (char *) metapage, true);
+	unbuffered_write(&wstate, RelationGetSmgr(index), INIT_FORKNUM,
+			BTREE_METAPAGE, metapage);
 	log_newpage(&RelationGetSmgr(index)->smgr_rnode.node, INIT_FORKNUM,
 				BTREE_METAPAGE, metapage, true);
 
-	/*
-	 * An immediate sync is required even if we xlog'd the page, because the
-	 * write did not go through shared_buffers and therefore a concurrent
-	 * checkpoint may have moved the redo pointer past our xlog record.
-	 */
-	smgrimmedsync(RelationGetSmgr(index), INIT_FORKNUM);
+	unbuffered_finish(&wstate, RelationGetSmgr(index), INIT_FORKNUM);
 }
 
 /*
@@ -959,7 +963,7 @@ btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	 * delete some deletable tuples.  Hence, we must repeatedly check the
 	 * relation length.  We must acquire the relation-extension lock while
 	 * doing so to avoid a race condition: if someone else is extending the
-	 * relation, there is a window where bufmgr/smgr have created a new
+	 * relation, there is a window where bufmgr/smgr have created a new
 	 * all-zero page but it hasn't yet been write-locked by _bt_getbuf(). If
 	 * we manage to scan such a page here, we'll improperly assume it can be
 	 * recycled.  Taking the lock synchronizes things enough to prevent a
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 8a19de2f66..e280253127 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -23,13 +23,13 @@
  * many upper pages if the keys are reasonable-size) without risking a lot of
  * cascading splits during early insertions.
  *
- * Formerly the index pages being built were kept in shared buffers, but
- * that is of no value (since other backends have no interest in them yet)
- * and it created locking problems for CHECKPOINT, because the upper-level
- * pages were held exclusive-locked for long periods.  Now we just build
- * the pages in local memory and smgrwrite or smgrextend them as we finish
- * them.  They will need to be re-read into shared buffers on first use after
- * the build finishes.
+ * Formerly the index pages being built were kept in shared buffers, but that
+ * is of no value (since other backends have no interest in them yet) and it
+ * created locking problems for CHECKPOINT, because the upper-level pages were
+ * held exclusive-locked for long periods.  Now we just build the pages in
+ * local memory and write or extend them with directmgr as we finish them.
+ * They will need to be re-read into shared buffers on first use after the
+ * build finishes.
  *
  * This code isn't concerned about the FSM at all. The caller is responsible
  * for initializing that.
@@ -57,7 +57,7 @@
 #include "executor/instrument.h"
 #include "miscadmin.h"
 #include "pgstat.h"
-#include "storage/smgr.h"
+#include "storage/directmgr.h"
 #include "tcop/tcopprot.h"		/* pgrminclude ignore */
 #include "utils/rel.h"
 #include "utils/sortsupport.h"
@@ -256,6 +256,7 @@ typedef struct BTWriteState
 	BlockNumber btws_pages_alloced; /* # pages allocated */
 	BlockNumber btws_pages_written; /* # pages written out */
 	Page		btws_zeropage;	/* workspace for filling zeroes */
+	UnBufferedWriteState ub_wstate;
 } BTWriteState;
 
 
@@ -643,13 +644,6 @@ _bt_blnewpage(uint32 level)
 static void
 _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
 {
-	/* XLOG stuff */
-	if (wstate->btws_use_wal)
-	{
-		/* We use the XLOG_FPI record type for this */
-		log_newpage(&wstate->index->rd_node, MAIN_FORKNUM, blkno, page, true);
-	}
-
 	/*
 	 * If we have to write pages nonsequentially, fill in the space with
 	 * zeroes until we come back and overwrite.  This is not logically
@@ -661,33 +655,33 @@ _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
 	{
 		if (!wstate->btws_zeropage)
 			wstate->btws_zeropage = (Page) palloc0(BLCKSZ);
-		/* don't set checksum for all-zero page */
-		smgrextend(RelationGetSmgr(wstate->index), MAIN_FORKNUM,
-				   wstate->btws_pages_written++,
-				   (char *) wstate->btws_zeropage,
-				   true);
+
+		unbuffered_extend(&wstate->ub_wstate, RelationGetSmgr(wstate->index),
+				MAIN_FORKNUM, wstate->btws_pages_written++,
+				wstate->btws_zeropage, true);
 	}
 
-	PageSetChecksumInplace(page, blkno);
 
-	/*
-	 * Now write the page.  There's no need for smgr to schedule an fsync for
-	 * this write; we'll do it ourselves before ending the build.
-	 */
+	/* Now write the page. Either we are extending the file... */
 	if (blkno == wstate->btws_pages_written)
 	{
-		/* extending the file... */
-		smgrextend(RelationGetSmgr(wstate->index), MAIN_FORKNUM, blkno,
-				   (char *) page, true);
+		unbuffered_extend(&wstate->ub_wstate, RelationGetSmgr(wstate->index),
+				MAIN_FORKNUM, blkno, page, false);
+
 		wstate->btws_pages_written++;
 	}
+
+	/* or we are overwriting a block we zero-filled before. */
 	else
 	{
-		/* overwriting a block we zero-filled before */
-		smgrwrite(RelationGetSmgr(wstate->index), MAIN_FORKNUM, blkno,
-				  (char *) page, true);
-	}
+		unbuffered_write(&wstate->ub_wstate, RelationGetSmgr(wstate->index),
+				MAIN_FORKNUM, blkno, page);
+
+		/* We use the XLOG_FPI record type for this */
+		if (wstate->btws_use_wal)
+			log_newpage(&wstate->index->rd_node, MAIN_FORKNUM, blkno, page, true);
 
+	}
 	pfree(page);
 }
 
@@ -1195,6 +1189,8 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 	int64		tuples_done = 0;
 	bool		deduplicate;
 
+	unbuffered_prep(&wstate->ub_wstate, wstate->btws_use_wal);
+
 	deduplicate = wstate->inskey->allequalimage && !btspool->isunique &&
 		BTGetDeduplicateItems(wstate->index);
 
@@ -1421,17 +1417,8 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 	/* Close down final pages and write the metapage */
 	_bt_uppershutdown(wstate, state);
 
-	/*
-	 * When we WAL-logged index pages, we must nonetheless fsync index files.
-	 * Since we're building outside shared buffers, a CHECKPOINT occurring
-	 * during the build has no way to flush the previously written data to
-	 * disk (indeed it won't know the index even exists).  A crash later on
-	 * would replay WAL from the checkpoint, therefore it wouldn't replay our
-	 * earlier WAL entries. If we do not fsync those pages here, they might
-	 * still not be on disk when the crash occurs.
-	 */
-	if (wstate->btws_use_wal)
-		smgrimmedsync(RelationGetSmgr(wstate->index), MAIN_FORKNUM);
+	unbuffered_finish(&wstate->ub_wstate, RelationGetSmgr(wstate->index),
+			MAIN_FORKNUM);
 }
 
 /*
diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c
index bfb74049d0..318dbee823 100644
--- a/src/backend/access/spgist/spginsert.c
+++ b/src/backend/access/spgist/spginsert.c
@@ -25,7 +25,7 @@
 #include "catalog/index.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
-#include "storage/smgr.h"
+#include "storage/directmgr.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
@@ -156,48 +156,43 @@ void
 spgbuildempty(Relation index)
 {
 	Page		page;
+	UnBufferedWriteState wstate;
+
+	/*
+	 * Though this is an unlogged relation, pass do_wal=true since the init
+	 * fork of an unlogged index must be wal-logged and fsync'd. This does not
+	 * affect WAL-logging, as unbuffered_write() does not do log_newpage()
+	 * internally, but it does make unbuffered_finish() fsync the fork. Were
+	 * this replaced with unbuffered_extend(), do_wal=true would also log it.
+	 */
+	unbuffered_prep(&wstate, true);
 
 	/* Construct metapage. */
 	page = (Page) palloc(BLCKSZ);
 	SpGistInitMetapage(page);
 
-	/*
-	 * Write the page and log it unconditionally.  This is important
-	 * particularly for indexes created on tablespaces and databases whose
-	 * creation happened after the last redo pointer as recovery removes any
-	 * of their existing content when the corresponding create records are
-	 * replayed.
-	 */
-	PageSetChecksumInplace(page, SPGIST_METAPAGE_BLKNO);
-	smgrwrite(RelationGetSmgr(index), INIT_FORKNUM, SPGIST_METAPAGE_BLKNO,
-			  (char *) page, true);
+	unbuffered_write(&wstate, RelationGetSmgr(index), INIT_FORKNUM,
+			SPGIST_METAPAGE_BLKNO, page);
 	log_newpage(&(RelationGetSmgr(index))->smgr_rnode.node, INIT_FORKNUM,
 				SPGIST_METAPAGE_BLKNO, page, true);
 
 	/* Likewise for the root page. */
 	SpGistInitPage(page, SPGIST_LEAF);
 
-	PageSetChecksumInplace(page, SPGIST_ROOT_BLKNO);
-	smgrwrite(RelationGetSmgr(index), INIT_FORKNUM, SPGIST_ROOT_BLKNO,
-			  (char *) page, true);
+	unbuffered_write(&wstate, RelationGetSmgr(index), INIT_FORKNUM,
+			SPGIST_ROOT_BLKNO, page);
 	log_newpage(&(RelationGetSmgr(index))->smgr_rnode.node, INIT_FORKNUM,
 				SPGIST_ROOT_BLKNO, page, true);
 
 	/* Likewise for the null-tuples root page. */
 	SpGistInitPage(page, SPGIST_LEAF | SPGIST_NULLS);
 
-	PageSetChecksumInplace(page, SPGIST_NULL_BLKNO);
-	smgrwrite(RelationGetSmgr(index), INIT_FORKNUM, SPGIST_NULL_BLKNO,
-			  (char *) page, true);
+	unbuffered_write(&wstate, RelationGetSmgr(index), INIT_FORKNUM,
+			SPGIST_NULL_BLKNO, page);
 	log_newpage(&(RelationGetSmgr(index))->smgr_rnode.node, INIT_FORKNUM,
 				SPGIST_NULL_BLKNO, page, true);
 
-	/*
-	 * An immediate sync is required even if we xlog'd the pages, because the
-	 * writes did not go through shared buffers and therefore a concurrent
-	 * checkpoint may have moved the redo pointer past our xlog record.
-	 */
-	smgrimmedsync(RelationGetSmgr(index), INIT_FORKNUM);
+	unbuffered_finish(&wstate, RelationGetSmgr(index), INIT_FORKNUM);
 }
 
 /*
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index 9b8075536a..0b211895c1 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -28,6 +28,7 @@
 #include "catalog/storage.h"
 #include "catalog/storage_xlog.h"
 #include "miscadmin.h"
+#include "storage/directmgr.h"
 #include "storage/freespace.h"
 #include "storage/smgr.h"
 #include "utils/hsearch.h"
@@ -420,6 +421,8 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
 	bool		copying_initfork;
 	BlockNumber nblocks;
 	BlockNumber blkno;
+	UnBufferedWriteState wstate;
+
 
 	page = (Page) buf.data;
 
@@ -440,6 +443,8 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
 	use_wal = XLogIsNeeded() &&
 		(relpersistence == RELPERSISTENCE_PERMANENT || copying_initfork);
 
+	unbuffered_prep(&wstate, use_wal);
+
 	nblocks = smgrnblocks(src, forkNum);
 
 	for (blkno = 0; blkno < nblocks; blkno++)
@@ -474,30 +479,10 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
 		 * page this is, so we have to log the full page including any unused
 		 * space.
 		 */
-		if (use_wal)
-			log_newpage(&dst->smgr_rnode.node, forkNum, blkno, page, false);
-
-		PageSetChecksumInplace(page, blkno);
-
-		/*
-		 * Now write the page.  We say skipFsync = true because there's no
-		 * need for smgr to schedule an fsync for this write; we'll do it
-		 * ourselves below.
-		 */
-		smgrextend(dst, forkNum, blkno, buf.data, true);
+		unbuffered_extend(&wstate, dst, forkNum, blkno, page, false);
 	}
 
-	/*
-	 * When we WAL-logged rel pages, we must nonetheless fsync them.  The
-	 * reason is that since we're copying outside shared buffers, a CHECKPOINT
-	 * occurring during the copy has no way to flush the previously written
-	 * data to disk (indeed it won't know the new rel even exists).  A crash
-	 * later on would replay WAL from the checkpoint, therefore it wouldn't
-	 * replay our earlier WAL entries. If we do not fsync those pages here,
-	 * they might still not be on disk when the crash occurs.
-	 */
-	if (use_wal || copying_initfork)
-		smgrimmedsync(dst, forkNum);
+	unbuffered_finish(&wstate, dst, forkNum);
 }
 
 /*
diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile
index 8376cdfca2..501fae5f9d 100644
--- a/src/backend/storage/Makefile
+++ b/src/backend/storage/Makefile
@@ -8,6 +8,6 @@ subdir = src/backend/storage
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS     = buffer file freespace ipc large_object lmgr page smgr sync
+SUBDIRS     = buffer direct file freespace ipc large_object lmgr page smgr sync
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/direct/Makefile b/src/backend/storage/direct/Makefile
new file mode 100644
index 0000000000..d82bbed48c
--- /dev/null
+++ b/src/backend/storage/direct/Makefile
@@ -0,0 +1,17 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for storage/direct
+#
+# IDENTIFICATION
+#    src/backend/storage/direct/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/storage/direct
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = directmgr.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/direct/directmgr.c b/src/backend/storage/direct/directmgr.c
new file mode 100644
index 0000000000..42c37daa7a
--- /dev/null
+++ b/src/backend/storage/direct/directmgr.c
@@ -0,0 +1,98 @@
+/*-------------------------------------------------------------------------
+ *
+ * directmgr.c
+ *	  routines for managing unbuffered IO
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/storage/direct/directmgr.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/xlogdefs.h"
+#include "access/xloginsert.h"
+#include "storage/directmgr.h"
+
+/* Start a series of unbuffered writes; do_wal is remembered in wstate. */
+void
+unbuffered_prep(UnBufferedWriteState *wstate, bool do_wal)
+{
+	wstate->do_wal = do_wal;
+}
+
+/*
+ * Extend the fork by one block.  An empty page is written as-is; any other
+ * page is WAL-logged (if do_wal) and then checksummed first, in that order
+ * because log_newpage() may set the page LSN.  NOTE(review): page_std is
+ * hard-coded to true, but RelationCopyStorage() used to pass false.
+ */
+void
+unbuffered_extend(UnBufferedWriteState *wstate, SMgrRelation smgrrel,
+		ForkNumber forknum, BlockNumber blocknum, Page page, bool empty)
+{
+	if (!empty)
+	{
+		if (wstate->do_wal)
+			log_newpage(&smgrrel->smgr_rnode.node, forknum, blocknum, page,
+						true);
+		PageSetChecksumInplace(page, blocknum);
+	}
+
+	smgrextend(smgrrel, forknum, blocknum, (char *) page, true);
+}
+
+/*
+ * Extend the fork with num_pages pages taken from pages[]/blocknums[].  If
+ * custom_lsn is valid it is stamped on every page; non-empty pages are
+ * checksummed and, once all are written, WAL-logged together if do_wal.
+ */
+void
+unbuffered_extend_range(UnBufferedWriteState *wstate, SMgrRelation smgrrel,
+		ForkNumber forknum, int num_pages, BlockNumber *blocknums, Page *pages,
+		bool empty, XLogRecPtr custom_lsn)
+{
+	for (int i = 0; i < num_pages; i++)
+	{
+		if (!XLogRecPtrIsInvalid(custom_lsn))
+			PageSetLSN(pages[i], custom_lsn);
+		if (!empty)
+			PageSetChecksumInplace(pages[i], blocknums[i]);
+		smgrextend(smgrrel, forknum, blocknums[i], (char *) pages[i], true);
+	}
+
+	if (wstate->do_wal && !empty)
+		log_newpages(&smgrrel->smgr_rnode.node, forknum, num_pages, blocknums,
+					 pages, true);
+}
+
+/* Checksum and overwrite an existing block.  No WAL-logging is done here. */
+void
+unbuffered_write(UnBufferedWriteState *wstate, SMgrRelation smgrrel,
+		ForkNumber forknum, BlockNumber blocknum, Page page)
+{
+	PageSetChecksumInplace(page, blocknum);
+	smgrwrite(smgrrel, forknum, blocknum, (char *) page, true);
+}
+
+/*
+ * When writing data outside shared buffers, a concurrent CHECKPOINT can move
+ * the redo pointer past our WAL entries and won't flush our data to disk. If
+ * the database crashes before the data makes it to disk, our WAL won't be
+ * replayed and the data will be lost.
+ *
+ * We do not track whether a CHECKPOINT began between unbuffered_prep() and
+ * unbuffered_finish(), so always fsync the fork ourselves when do_wal is set.
+ * Nothing is done otherwise.
+ */
+void
+unbuffered_finish(UnBufferedWriteState *wstate, SMgrRelation smgrrel,
+		ForkNumber forknum)
+{
+	if (wstate->do_wal)
+		smgrimmedsync(smgrrel, forknum);
+}
diff --git a/src/include/storage/directmgr.h b/src/include/storage/directmgr.h
new file mode 100644
index 0000000000..db5e3b1cac
--- /dev/null
+++ b/src/include/storage/directmgr.h
@@ -0,0 +1,53 @@
+/*-------------------------------------------------------------------------
+ *
+ * directmgr.h
+ *	  POSTGRES unbuffered IO manager definitions.
+ *
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/directmgr.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef DIRECTMGR_H
+#define DIRECTMGR_H
+
+#include "common/relpath.h"
+#include "storage/block.h"
+#include "storage/bufpage.h"
+#include "storage/smgr.h"
+
+typedef struct UnBufferedWriteState
+{
+	/*
+	 * When writing WAL-logged relation data outside of shared buffers, a
+	 * concurrent CHECKPOINT may move the redo pointer past the data's WAL
+	 * entries without flushing the data. When do_wal is set (the relation is
+	 * WAL-logged, or special cases such as the init fork of an unlogged
+	 * index), the extend functions WAL-log non-empty pages and
+	 * unbuffered_finish() fsyncs what was written.
+	 */
+	bool do_wal;
+} UnBufferedWriteState;
+
+/*
+ * Prototypes for functions in directmgr.c.  Callers bracket their writes
+ * with unbuffered_prep() and unbuffered_finish().
+ */
+extern void unbuffered_prep(UnBufferedWriteState *wstate, bool do_wal);
+extern void unbuffered_extend(UnBufferedWriteState *wstate,
+		SMgrRelation smgrrel, ForkNumber forknum, BlockNumber blocknum,
+		Page page, bool empty);
+extern void unbuffered_extend_range(UnBufferedWriteState *wstate,
+		SMgrRelation smgrrel, ForkNumber forknum, int num_pages,
+		BlockNumber *blocknums, Page *pages, bool empty,
+		XLogRecPtr custom_lsn);
+extern void unbuffered_write(UnBufferedWriteState *wstate,
+		SMgrRelation smgrrel, ForkNumber forknum, BlockNumber blocknum,
+		Page page);
+extern void unbuffered_finish(UnBufferedWriteState *wstate,
+		SMgrRelation smgrrel, ForkNumber forknum);
+
+#endif							/* DIRECTMGR_H */
-- 
2.30.2

