From b016c9ee2e47efe434bf805fb0b310e9a302b0f9 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Tue, 28 Sep 2021 14:51:11 -0400
Subject: [PATCH v4 1/2] Add unbuffered IO and avoid immed fsync

Replace unbuffered extends and writes

When writing data outside of shared buffers, the backend must do a
series of steps to ensure the data is both durable and recoverable. When
writing or extending a page of data, the backend must log, checksum, and
write out the page before freeing it.

Additionally, though the data is durable once the WAL entries are on
permanent storage, the XLOG Redo pointer cannot be moved past this WAL
until the page data is safely on permanent storage. If a crash were to
occur before the data is fsync'd, the WAL wouldn't be replayed during
recovery, and the data would be lost. Thus, the backend must ensure that
either the Redo pointer has not moved or that the data is fsync'd before
freeing the page.

This is not a problem with pages written in shared buffers because the
checkpointer will block until all buffers that were dirtied before it
began finish before it moves the Redo pointer past their associated WAL
entries.

This commit makes two main changes:

1) It wraps smgrextend() and smgrwrite() in functions from a new API
   for writing data outside of shared buffers, directmgr.

2) It saves the XLOG Redo pointer location before doing the write or
   extend. It also adds an fsync request for the page to the
   checkpointer's pending-ops table. Then, after doing the write or
   extend, if the Redo pointer has moved (meaning a checkpoint has
   started since it saved it last), then the backend fsync's the page
   itself. Otherwise, it lets the checkpointer take care of fsync'ing
   the page the next time it processes the pending-ops table.
---
 src/backend/access/gist/gistbuild.c       | 17 +++----
 src/backend/access/hash/hashpage.c        |  9 ++--
 src/backend/access/heap/heapam_handler.c  | 16 ++++---
 src/backend/access/heap/rewriteheap.c     | 26 ++++-------
 src/backend/access/heap/visibilitymap.c   |  8 ++--
 src/backend/access/nbtree/nbtree.c        | 17 ++++---
 src/backend/access/nbtree/nbtsort.c       | 39 +++++-----------
 src/backend/access/spgist/spginsert.c     | 25 +++++-----
 src/backend/access/transam/xlog.c         | 13 ++++++
 src/backend/catalog/storage.c             | 25 +++-------
 src/backend/storage/Makefile              |  2 +-
 src/backend/storage/direct/Makefile       | 17 +++++++
 src/backend/storage/direct/directmgr.c    | 57 +++++++++++++++++++++++
 src/backend/storage/freespace/freespace.c | 10 ++--
 src/include/access/xlog.h                 |  1 +
 src/include/storage/directmgr.h           | 44 +++++++++++++++++
 16 files changed, 219 insertions(+), 107 deletions(-)
 create mode 100644 src/backend/storage/direct/Makefile
 create mode 100644 src/backend/storage/direct/directmgr.c
 create mode 100644 src/include/storage/directmgr.h

diff --git a/src/backend/access/gist/gistbuild.c b/src/backend/access/gist/gistbuild.c
index 9854116fca..748ca65492 100644
--- a/src/backend/access/gist/gistbuild.c
+++ b/src/backend/access/gist/gistbuild.c
@@ -43,6 +43,7 @@
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
 #include "storage/bufmgr.h"
+#include "storage/directmgr.h"
 #include "storage/smgr.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -91,6 +92,7 @@ typedef struct
 
 	int64		indtuples;		/* number of tuples indexed */
 
+	UnBufferedWriteState ub_wstate;
 	/*
 	 * Extra data structures used during a buffering build. 'gfbb' contains
 	 * information related to managing the build buffers. 'parentMap' is a
@@ -194,6 +196,7 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	buildstate.heaprel = heap;
 	buildstate.sortstate = NULL;
 	buildstate.giststate = initGISTstate(index);
+	buildstate.ub_wstate.smgr_rel = RelationGetSmgr(index);
 
 	/*
 	 * Create a temporary memory context that is reset once for each tuple
@@ -403,14 +406,15 @@ gist_indexsortbuild(GISTBuildState *state)
 	state->pages_allocated = 0;
 	state->pages_written = 0;
 	state->ready_num_pages = 0;
+	unbuffered_prep(&state->ub_wstate);
 
 	/*
 	 * Write an empty page as a placeholder for the root page. It will be
 	 * replaced with the real root page at the end.
 	 */
 	page = palloc0(BLCKSZ);
-	smgrextend(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, GIST_ROOT_BLKNO,
-			   page, true);
+	unbuffered_extend(&state->ub_wstate, MAIN_FORKNUM, GIST_ROOT_BLKNO, page,
+			true);
 	state->pages_allocated++;
 	state->pages_written++;
 
@@ -450,13 +454,12 @@ gist_indexsortbuild(GISTBuildState *state)
 
 	/* Write out the root */
 	PageSetLSN(pagestate->page, GistBuildLSN);
-	PageSetChecksumInplace(pagestate->page, GIST_ROOT_BLKNO);
-	smgrwrite(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, GIST_ROOT_BLKNO,
-			  pagestate->page, true);
+	unbuffered_write(&state->ub_wstate, MAIN_FORKNUM, GIST_ROOT_BLKNO, pagestate->page);
 	if (RelationNeedsWAL(state->indexrel))
 		log_newpage(&state->indexrel->rd_node, MAIN_FORKNUM, GIST_ROOT_BLKNO,
 					pagestate->page, true);
 
+	unbuffered_finish(&state->ub_wstate, MAIN_FORKNUM);
 	pfree(pagestate->page);
 	pfree(pagestate);
 }
@@ -570,9 +573,7 @@ gist_indexsortbuild_flush_ready_pages(GISTBuildState *state)
 			elog(ERROR, "unexpected block number to flush GiST sorting build");
 
 		PageSetLSN(page, GistBuildLSN);
-		PageSetChecksumInplace(page, blkno);
-		smgrextend(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, blkno, page,
-				   true);
+		unbuffered_extend(&state->ub_wstate, MAIN_FORKNUM, blkno, page, false);
 
 		state->pages_written++;
 	}
diff --git a/src/backend/access/hash/hashpage.c b/src/backend/access/hash/hashpage.c
index ee351aea09..722420adf5 100644
--- a/src/backend/access/hash/hashpage.c
+++ b/src/backend/access/hash/hashpage.c
@@ -32,6 +32,7 @@
 #include "access/hash_xlog.h"
 #include "miscadmin.h"
 #include "port/pg_bitutils.h"
+#include "storage/directmgr.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "storage/smgr.h"
@@ -990,8 +991,10 @@ _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
 	PGAlignedBlock zerobuf;
 	Page		page;
 	HashPageOpaque ovflopaque;
+	UnBufferedWriteState ub_wstate;
 
 	lastblock = firstblock + nblocks - 1;
+	ub_wstate.smgr_rel = RelationGetSmgr(rel);
 
 	/*
 	 * Check for overflow in block number calculation; if so, we cannot extend
@@ -1000,6 +1003,8 @@ _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
 	if (lastblock < firstblock || lastblock == InvalidBlockNumber)
 		return false;
 
+	unbuffered_prep(&ub_wstate);
+
 	page = (Page) zerobuf.data;
 
 	/*
@@ -1024,9 +1029,7 @@ _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
 					zerobuf.data,
 					true);
 
-	PageSetChecksumInplace(page, lastblock);
-	smgrextend(RelationGetSmgr(rel), MAIN_FORKNUM, lastblock, zerobuf.data,
-			   false);
+	unbuffered_extend(&ub_wstate, MAIN_FORKNUM, lastblock, zerobuf.data, false);
 
 	return true;
 }
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 39ef8a0b77..8824e39a91 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -38,6 +38,7 @@
 #include "pgstat.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
+#include "storage/directmgr.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "storage/procarray.h"
@@ -575,6 +576,7 @@ heapam_relation_set_new_filenode(Relation rel,
 								 MultiXactId *minmulti)
 {
 	SMgrRelation srel;
+	UnBufferedWriteState ub_wstate;
 
 	/*
 	 * Initialize to the minimum XID that could put tuples in the table. We
@@ -594,15 +596,15 @@ heapam_relation_set_new_filenode(Relation rel,
 	*minmulti = GetOldestMultiXactId();
 
 	srel = RelationCreateStorage(*newrnode, persistence);
+	ub_wstate.smgr_rel = srel;
+	unbuffered_prep(&ub_wstate);
 
 	/*
 	 * If required, set up an init fork for an unlogged table so that it can
-	 * be correctly reinitialized on restart.  An immediate sync is required
-	 * even if the page has been logged, because the write did not go through
-	 * shared_buffers and therefore a concurrent checkpoint may have moved the
-	 * redo pointer past our xlog record.  Recovery may as well remove it
-	 * while replaying, for example, XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE
-	 * record. Therefore, logging is necessary even if wal_level=minimal.
+	 * be correctly reinitialized on restart.
+	 * Recovery may as well remove our xlog record while replaying, for
+	 * example, XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE record. Therefore,
+	 * logging is necessary even if wal_level=minimal.
 	 */
 	if (persistence == RELPERSISTENCE_UNLOGGED)
 	{
@@ -611,7 +613,7 @@ heapam_relation_set_new_filenode(Relation rel,
 			   rel->rd_rel->relkind == RELKIND_TOASTVALUE);
 		smgrcreate(srel, INIT_FORKNUM, false);
 		log_smgrcreate(newrnode, INIT_FORKNUM);
-		smgrimmedsync(srel, INIT_FORKNUM);
+		unbuffered_finish(&ub_wstate, INIT_FORKNUM);
 	}
 
 	smgrclose(srel);
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index aa265edf60..0bb6eec9f9 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -119,6 +119,7 @@
 #include "replication/logical.h"
 #include "replication/slot.h"
 #include "storage/bufmgr.h"
+#include "storage/directmgr.h"
 #include "storage/fd.h"
 #include "storage/procarray.h"
 #include "storage/smgr.h"
@@ -152,6 +153,7 @@ typedef struct RewriteStateData
 	HTAB	   *rs_old_new_tid_map; /* unmatched B tuples */
 	HTAB	   *rs_logical_mappings;	/* logical remapping files */
 	uint32		rs_num_rewrite_mappings;	/* # in memory mappings */
+	UnBufferedWriteState rs_unbuffered_wstate;
 }			RewriteStateData;
 
 /*
@@ -264,6 +266,9 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm
 	state->rs_freeze_xid = freeze_xid;
 	state->rs_cutoff_multi = cutoff_multi;
 	state->rs_cxt = rw_cxt;
+	state->rs_unbuffered_wstate.smgr_rel = RelationGetSmgr(state->rs_new_rel);
+
+	unbuffered_prep(&state->rs_unbuffered_wstate);
 
 	/* Initialize hash tables used to track update chains */
 	hash_ctl.keysize = sizeof(TidHashKey);
@@ -324,21 +329,12 @@ end_heap_rewrite(RewriteState state)
 						state->rs_buffer,
 						true);
 
-		PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
-
-		smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
-				   state->rs_blockno, (char *) state->rs_buffer, true);
+		unbuffered_extend(&state->rs_unbuffered_wstate, MAIN_FORKNUM,
+				state->rs_blockno, state->rs_buffer, false);
 	}
 
-	/*
-	 * When we WAL-logged rel pages, we must nonetheless fsync them.  The
-	 * reason is the same as in storage.c's RelationCopyStorage(): we're
-	 * writing data that's not in shared buffers, and so a CHECKPOINT
-	 * occurring during the rewriteheap operation won't have fsync'd data we
-	 * wrote before the checkpoint.
-	 */
 	if (RelationNeedsWAL(state->rs_new_rel))
-		smgrimmedsync(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM);
+		unbuffered_finish(&state->rs_unbuffered_wstate, MAIN_FORKNUM);
 
 	logical_end_heap_rewrite(state);
 
@@ -690,10 +686,8 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
 			 * need for smgr to schedule an fsync for this write; we'll do it
 			 * ourselves in end_heap_rewrite.
 			 */
-			PageSetChecksumInplace(page, state->rs_blockno);
-
-			smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
-					   state->rs_blockno, (char *) page, true);
+			unbuffered_extend(&state->rs_unbuffered_wstate, MAIN_FORKNUM,
+					state->rs_blockno, page, false);
 
 			state->rs_blockno++;
 			state->rs_buffer_valid = false;
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index 9032d4758f..1a7482e267 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -92,6 +92,7 @@
 #include "miscadmin.h"
 #include "port/pg_bitutils.h"
 #include "storage/bufmgr.h"
+#include "storage/directmgr.h"
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
 #include "utils/inval.h"
@@ -616,7 +617,9 @@ vm_extend(Relation rel, BlockNumber vm_nblocks)
 	BlockNumber vm_nblocks_now;
 	PGAlignedBlock pg;
 	SMgrRelation reln;
+	UnBufferedWriteState ub_wstate;
 
+	ub_wstate.smgr_rel = RelationGetSmgr(rel);
 	PageInit((Page) pg.data, BLCKSZ, 0);
 
 	/*
@@ -654,9 +657,8 @@ vm_extend(Relation rel, BlockNumber vm_nblocks)
 	/* Now extend the file */
 	while (vm_nblocks_now < vm_nblocks)
 	{
-		PageSetChecksumInplace((Page) pg.data, vm_nblocks_now);
-
-		smgrextend(reln, VISIBILITYMAP_FORKNUM, vm_nblocks_now, pg.data, false);
+		// TODO: aren't these pages empty? why checksum them
+		unbuffered_extend(&ub_wstate, VISIBILITYMAP_FORKNUM, vm_nblocks_now, (Page) pg.data, false);
 		vm_nblocks_now++;
 	}
 
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 13024af2fa..6ab6651420 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -29,6 +29,7 @@
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
 #include "storage/condition_variable.h"
+#include "storage/directmgr.h"
 #include "storage/indexfsm.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
@@ -151,6 +152,11 @@ void
 btbuildempty(Relation index)
 {
 	Page		metapage;
+	UnBufferedWriteState wstate;
+
+	wstate.smgr_rel = RelationGetSmgr(index);
+
+	unbuffered_prep(&wstate);
 
 	/* Construct metapage. */
 	metapage = (Page) palloc(BLCKSZ);
@@ -163,18 +169,15 @@ btbuildempty(Relation index)
 	 * XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE record.  Therefore, we need
 	 * this even when wal_level=minimal.
 	 */
-	PageSetChecksumInplace(metapage, BTREE_METAPAGE);
-	smgrwrite(RelationGetSmgr(index), INIT_FORKNUM, BTREE_METAPAGE,
-			  (char *) metapage, true);
+	unbuffered_write(&wstate, INIT_FORKNUM, BTREE_METAPAGE, metapage);
 	log_newpage(&RelationGetSmgr(index)->smgr_rnode.node, INIT_FORKNUM,
 				BTREE_METAPAGE, metapage, true);
 
 	/*
-	 * An immediate sync is required even if we xlog'd the page, because the
-	 * write did not go through shared_buffers and therefore a concurrent
-	 * checkpoint may have moved the redo pointer past our xlog record.
+	 * Even though we xlog'd the page, a concurrent checkpoint may have moved
+	 * the redo pointer past our xlog record, so we may still need to fsync.
 	 */
-	smgrimmedsync(RelationGetSmgr(index), INIT_FORKNUM);
+	unbuffered_finish(&wstate, INIT_FORKNUM);
 }
 
 /*
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index dc220146fd..5687acd99d 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -57,6 +57,7 @@
 #include "executor/instrument.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "storage/directmgr.h"
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"		/* pgrminclude ignore */
 #include "utils/rel.h"
@@ -253,6 +254,7 @@ typedef struct BTWriteState
 	BlockNumber btws_pages_alloced; /* # pages allocated */
 	BlockNumber btws_pages_written; /* # pages written out */
 	Page		btws_zeropage;	/* workspace for filling zeroes */
+	UnBufferedWriteState ub_wstate;
 } BTWriteState;
 
 
@@ -560,6 +562,8 @@ _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
 
 	wstate.heap = btspool->heap;
 	wstate.index = btspool->index;
+	wstate.ub_wstate.smgr_rel = RelationGetSmgr(btspool->index);
+	wstate.ub_wstate.redo = InvalidXLogRecPtr;
 	wstate.inskey = _bt_mkscankey(wstate.index, NULL);
 	/* _bt_mkscankey() won't set allequalimage without metapage */
 	wstate.inskey->allequalimage = _bt_allequalimage(wstate.index, true);
@@ -656,31 +660,19 @@ _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
 		if (!wstate->btws_zeropage)
 			wstate->btws_zeropage = (Page) palloc0(BLCKSZ);
 		/* don't set checksum for all-zero page */
-		smgrextend(RelationGetSmgr(wstate->index), MAIN_FORKNUM,
-				   wstate->btws_pages_written++,
-				   (char *) wstate->btws_zeropage,
-				   true);
+		unbuffered_extend(&wstate->ub_wstate, MAIN_FORKNUM, wstate->btws_pages_written++, wstate->btws_zeropage, true);
 	}
 
-	PageSetChecksumInplace(page, blkno);
 
-	/*
-	 * Now write the page.  There's no need for smgr to schedule an fsync for
-	 * this write; we'll do it ourselves before ending the build.
-	 */
+	/* Now write the page. Either we are extending the file... */
 	if (blkno == wstate->btws_pages_written)
 	{
-		/* extending the file... */
-		smgrextend(RelationGetSmgr(wstate->index), MAIN_FORKNUM, blkno,
-				   (char *) page, true);
+		unbuffered_extend(&wstate->ub_wstate, MAIN_FORKNUM, blkno, page, false);
 		wstate->btws_pages_written++;
 	}
+	/* or we are overwriting a block we zero-filled before. */
 	else
-	{
-		/* overwriting a block we zero-filled before */
-		smgrwrite(RelationGetSmgr(wstate->index), MAIN_FORKNUM, blkno,
-				  (char *) page, true);
-	}
+		unbuffered_write(&wstate->ub_wstate, MAIN_FORKNUM, blkno, page);
 
 	pfree(page);
 }
@@ -1189,6 +1181,8 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 	int64		tuples_done = 0;
 	bool		deduplicate;
 
+
+	unbuffered_prep(&wstate->ub_wstate);
 	deduplicate = wstate->inskey->allequalimage && !btspool->isunique &&
 		BTGetDeduplicateItems(wstate->index);
 
@@ -1415,17 +1409,8 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 	/* Close down final pages and write the metapage */
 	_bt_uppershutdown(wstate, state);
 
-	/*
-	 * When we WAL-logged index pages, we must nonetheless fsync index files.
-	 * Since we're building outside shared buffers, a CHECKPOINT occurring
-	 * during the build has no way to flush the previously written data to
-	 * disk (indeed it won't know the index even exists).  A crash later on
-	 * would replay WAL from the checkpoint, therefore it wouldn't replay our
-	 * earlier WAL entries. If we do not fsync those pages here, they might
-	 * still not be on disk when the crash occurs.
-	 */
 	if (wstate->btws_use_wal)
-		smgrimmedsync(RelationGetSmgr(wstate->index), MAIN_FORKNUM);
+		unbuffered_finish(&wstate->ub_wstate, MAIN_FORKNUM);
 }
 
 /*
diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c
index bfb74049d0..9fd5686b8d 100644
--- a/src/backend/access/spgist/spginsert.c
+++ b/src/backend/access/spgist/spginsert.c
@@ -25,6 +25,7 @@
 #include "catalog/index.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
+#include "storage/directmgr.h"
 #include "storage/smgr.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -156,6 +157,10 @@ void
 spgbuildempty(Relation index)
 {
 	Page		page;
+	UnBufferedWriteState wstate;
+
+	wstate.smgr_rel = RelationGetSmgr(index);
+	unbuffered_prep(&wstate);
 
 	/* Construct metapage. */
 	page = (Page) palloc(BLCKSZ);
@@ -168,36 +173,30 @@ spgbuildempty(Relation index)
 	 * of their existing content when the corresponding create records are
 	 * replayed.
 	 */
-	PageSetChecksumInplace(page, SPGIST_METAPAGE_BLKNO);
-	smgrwrite(RelationGetSmgr(index), INIT_FORKNUM, SPGIST_METAPAGE_BLKNO,
-			  (char *) page, true);
+	unbuffered_write(&wstate, INIT_FORKNUM, SPGIST_METAPAGE_BLKNO, page);
 	log_newpage(&(RelationGetSmgr(index))->smgr_rnode.node, INIT_FORKNUM,
 				SPGIST_METAPAGE_BLKNO, page, true);
 
 	/* Likewise for the root page. */
 	SpGistInitPage(page, SPGIST_LEAF);
 
-	PageSetChecksumInplace(page, SPGIST_ROOT_BLKNO);
-	smgrwrite(RelationGetSmgr(index), INIT_FORKNUM, SPGIST_ROOT_BLKNO,
-			  (char *) page, true);
+	unbuffered_write(&wstate, INIT_FORKNUM, SPGIST_ROOT_BLKNO, page);
 	log_newpage(&(RelationGetSmgr(index))->smgr_rnode.node, INIT_FORKNUM,
 				SPGIST_ROOT_BLKNO, page, true);
 
 	/* Likewise for the null-tuples root page. */
 	SpGistInitPage(page, SPGIST_LEAF | SPGIST_NULLS);
 
-	PageSetChecksumInplace(page, SPGIST_NULL_BLKNO);
-	smgrwrite(RelationGetSmgr(index), INIT_FORKNUM, SPGIST_NULL_BLKNO,
-			  (char *) page, true);
+	unbuffered_write(&wstate, INIT_FORKNUM, SPGIST_NULL_BLKNO, page);
 	log_newpage(&(RelationGetSmgr(index))->smgr_rnode.node, INIT_FORKNUM,
 				SPGIST_NULL_BLKNO, page, true);
 
 	/*
-	 * An immediate sync is required even if we xlog'd the pages, because the
-	 * writes did not go through shared buffers and therefore a concurrent
-	 * checkpoint may have moved the redo pointer past our xlog record.
+	 * Because the writes did not go through shared buffers, if a concurrent
+	 * checkpoint moved the redo pointer past our xlog record, an immediate
+	 * sync is required even if we xlog'd the pages.
 	 */
-	smgrimmedsync(RelationGetSmgr(index), INIT_FORKNUM);
+	unbuffered_finish(&wstate, INIT_FORKNUM);
 }
 
 /*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index c9d4cbf3ff..a06e8d005b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -13191,6 +13191,19 @@ CheckForStandbyTrigger(void)
 	return false;
 }
 
+bool RedoRecPtrChanged(XLogRecPtr comparator_ptr)
+{
+	XLogRecPtr ptr;
+	SpinLockAcquire(&XLogCtl->info_lck);
+	ptr = XLogCtl->RedoRecPtr;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	if (RedoRecPtr < ptr)
+		RedoRecPtr = ptr;
+
+	return RedoRecPtr != comparator_ptr;
+}
+
 /*
  * Remove the files signaling a standby promotion request.
  */
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index 9b8075536a..05e66d0434 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -28,6 +28,7 @@
 #include "catalog/storage.h"
 #include "catalog/storage_xlog.h"
 #include "miscadmin.h"
+#include "storage/directmgr.h"
 #include "storage/freespace.h"
 #include "storage/smgr.h"
 #include "utils/hsearch.h"
@@ -420,6 +421,10 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
 	bool		copying_initfork;
 	BlockNumber nblocks;
 	BlockNumber blkno;
+	UnBufferedWriteState wstate;
+
+	wstate.smgr_rel = dst;
+	unbuffered_prep(&wstate);
 
 	page = (Page) buf.data;
 
@@ -477,27 +482,11 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
 		if (use_wal)
 			log_newpage(&dst->smgr_rnode.node, forkNum, blkno, page, false);
 
-		PageSetChecksumInplace(page, blkno);
-
-		/*
-		 * Now write the page.  We say skipFsync = true because there's no
-		 * need for smgr to schedule an fsync for this write; we'll do it
-		 * ourselves below.
-		 */
-		smgrextend(dst, forkNum, blkno, buf.data, true);
+		unbuffered_extend(&wstate, forkNum, blkno, buf.data, false);
 	}
 
-	/*
-	 * When we WAL-logged rel pages, we must nonetheless fsync them.  The
-	 * reason is that since we're copying outside shared buffers, a CHECKPOINT
-	 * occurring during the copy has no way to flush the previously written
-	 * data to disk (indeed it won't know the new rel even exists).  A crash
-	 * later on would replay WAL from the checkpoint, therefore it wouldn't
-	 * replay our earlier WAL entries. If we do not fsync those pages here,
-	 * they might still not be on disk when the crash occurs.
-	 */
 	if (use_wal || copying_initfork)
-		smgrimmedsync(dst, forkNum);
+		unbuffered_finish(&wstate, forkNum);
 }
 
 /*
diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile
index 8376cdfca2..501fae5f9d 100644
--- a/src/backend/storage/Makefile
+++ b/src/backend/storage/Makefile
@@ -8,6 +8,6 @@ subdir = src/backend/storage
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS     = buffer file freespace ipc large_object lmgr page smgr sync
+SUBDIRS     = buffer direct file freespace ipc large_object lmgr page smgr sync
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/direct/Makefile b/src/backend/storage/direct/Makefile
new file mode 100644
index 0000000000..d82bbed48c
--- /dev/null
+++ b/src/backend/storage/direct/Makefile
@@ -0,0 +1,17 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for storage/direct
+#
+# IDENTIFICATION
+#    src/backend/storage/direct/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/storage/direct
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = directmgr.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/direct/directmgr.c b/src/backend/storage/direct/directmgr.c
new file mode 100644
index 0000000000..d33c2a7a5f
--- /dev/null
+++ b/src/backend/storage/direct/directmgr.c
@@ -0,0 +1,57 @@
+/*-------------------------------------------------------------------------
+ *
+ * directmgr.c
+ *	  routines for managing unbuffered IO
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/storage/direct/directmgr.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+
+#include "access/xlog.h"
+#include "storage/directmgr.h"
+#include "utils/rel.h"
+
+void unbuffered_prep(UnBufferedWriteState *wstate)
+{
+	wstate->redo = GetRedoRecPtr();
+}
+
+/*
+ * When writing data outside shared buffers, a concurrent CHECKPOINT can move
+ * the redo pointer past our WAL entries and won't flush our data to disk. If
+ * the database crashes before the data makes it to disk, our WAL won't be
+ * replayed and the data will be lost.
+ * Thus, if a CHECKPOINT begins between unbuffered_prep() and
+ * unbuffered_finish(), the backend must fsync the data itself.
+ */
+void unbuffered_finish(UnBufferedWriteState *wstate, ForkNumber forknum)
+{
+	if (RedoRecPtrChanged(wstate->redo))
+		smgrimmedsync(wstate->smgr_rel, forknum);
+}
+
+void
+unbuffered_write(UnBufferedWriteState *wstate, ForkNumber forknum, BlockNumber
+		blocknum, Page page)
+{
+	PageSetChecksumInplace(page, blocknum);
+	smgrwrite(wstate->smgr_rel, forknum, blocknum, (char *) page, false);
+}
+
+void
+unbuffered_extend(UnBufferedWriteState *wstate, ForkNumber forknum, BlockNumber
+		blocknum, Page page, bool empty)
+{
+	if (!empty)
+		PageSetChecksumInplace(page, blocknum);
+	smgrextend(wstate->smgr_rel, forknum, blocknum, (char *) page, false);
+}
+
diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index c88cb91f06..d31f75bd60 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -26,6 +26,7 @@
 #include "access/htup_details.h"
 #include "access/xlogutils.h"
 #include "miscadmin.h"
+#include "storage/directmgr.h"
 #include "storage/freespace.h"
 #include "storage/fsm_internals.h"
 #include "storage/lmgr.h"
@@ -608,6 +609,9 @@ fsm_extend(Relation rel, BlockNumber fsm_nblocks)
 	BlockNumber fsm_nblocks_now;
 	PGAlignedBlock pg;
 	SMgrRelation reln;
+	UnBufferedWriteState ub_wstate;
+
+	ub_wstate.smgr_rel = RelationGetSmgr(rel);
 
 	PageInit((Page) pg.data, BLCKSZ, 0);
 
@@ -647,10 +651,8 @@ fsm_extend(Relation rel, BlockNumber fsm_nblocks)
 	/* Extend as needed. */
 	while (fsm_nblocks_now < fsm_nblocks)
 	{
-		PageSetChecksumInplace((Page) pg.data, fsm_nblocks_now);
-
-		smgrextend(reln, FSM_FORKNUM, fsm_nblocks_now,
-				   pg.data, false);
+		// TODO: why was it checksumming all zero pages?
+		unbuffered_extend(&ub_wstate, FSM_FORKNUM, fsm_nblocks_now, (Page) pg.data, false);
 		fsm_nblocks_now++;
 	}
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index bb0c52686a..4961067f81 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -312,6 +312,7 @@ extern XLogRecPtr GetInsertRecPtr(void);
 extern XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI);
 extern TimeLineID GetWALInsertionTimeLine(void);
 extern XLogRecPtr GetLastImportantRecPtr(void);
+extern bool RedoRecPtrChanged(XLogRecPtr comparator_ptr);
 extern void RemovePromoteSignalFiles(void);
 
 extern bool PromoteIsTriggered(void);
diff --git a/src/include/storage/directmgr.h b/src/include/storage/directmgr.h
new file mode 100644
index 0000000000..5435ab1d08
--- /dev/null
+++ b/src/include/storage/directmgr.h
@@ -0,0 +1,44 @@
+/*-------------------------------------------------------------------------
+ *
+ * directmgr.h
+ *	  POSTGRES unbuffered IO manager definitions.
+ *
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/directmgr.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "access/xlogdefs.h"
+#include "common/relpath.h"
+#include "storage/bufpage.h"
+#include "storage/smgr.h"
+#include "utils/relcache.h"
+
+#ifndef DIRECTMGR_H
+#define DIRECTMGR_H
+
+/*
+ * After committing the pg_buffer_stats patch, this will contain a pointer to a
+ * PgBufferAccess struct to count the writes and extends done in this way.
+ */ 
+typedef struct UnBufferedWriteState
+{
+	SMgrRelation smgr_rel;
+	XLogRecPtr redo;
+} UnBufferedWriteState;
+/*
+ * prototypes for functions in directmgr.c
+ */
+extern void unbuffered_prep(UnBufferedWriteState *wstate);
+extern void unbuffered_finish(UnBufferedWriteState *wstate, ForkNumber forknum);
+extern void
+unbuffered_extend(UnBufferedWriteState *wstate, ForkNumber forknum, BlockNumber
+		blocknum, Page page, bool empty);
+extern void
+unbuffered_write(UnBufferedWriteState *wstate, ForkNumber forknum, BlockNumber
+		blocknum, Page page);
+
+#endif							/* DIRECTMGR_H */
-- 
2.30.2

