From f68147500e88741debdae730763fb57d42c6ab79 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Tue, 28 Sep 2021 14:51:11 -0400
Subject: [PATCH v1] Add unbuffered IO and avoid immed fsync

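Add a small API in storage/direct/directmgr.c (unbuffered_prep(),
unbuffered_write(), unbuffered_extend() and unbuffered_finish()) for
writing relation pages outside of shared buffers, and replace the direct
smgrwrite()/smgrextend() calls in the access methods, storage.c, the
visibility map and the free space map with it.

Instead of unconditionally calling smgrimmedsync() after such writes,
unbuffered_finish() only does so if the redo pointer has changed since
unbuffered_prep(), i.e. if a checkpoint began in the meantime.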
---
 src/backend/access/gist/gistbuild.c       | 16 +++----
 src/backend/access/hash/hashpage.c        |  9 ++--
 src/backend/access/heap/heapam_handler.c  | 16 ++++---
 src/backend/access/heap/rewriteheap.c     | 25 ++++-------
 src/backend/access/heap/visibilitymap.c   |  8 ++--
 src/backend/access/nbtree/nbtree.c        | 17 ++++---
 src/backend/access/nbtree/nbtsort.c       | 39 +++++-----------
 src/backend/access/spgist/spginsert.c     | 25 +++++------
 src/backend/access/transam/xlog.c         | 13 ++++++
 src/backend/catalog/storage.c             | 25 +++--------
 src/backend/storage/Makefile              |  2 +-
 src/backend/storage/direct/Makefile       | 17 +++++++
 src/backend/storage/direct/directmgr.c    | 55 +++++++++++++++++++++++
 src/backend/storage/freespace/freespace.c | 10 +++--
 src/include/access/xlog.h                 |  1 +
 15 files changed, 171 insertions(+), 107 deletions(-)
 create mode 100644 src/backend/storage/direct/Makefile
 create mode 100644 src/backend/storage/direct/directmgr.c

diff --git a/src/backend/access/gist/gistbuild.c b/src/backend/access/gist/gistbuild.c
index baad28c09f..34f712590c 100644
--- a/src/backend/access/gist/gistbuild.c
+++ b/src/backend/access/gist/gistbuild.c
@@ -43,6 +43,7 @@
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
 #include "storage/bufmgr.h"
+#include "storage/directmgr.h"
 #include "storage/smgr.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -91,6 +92,7 @@ typedef struct
 
 	int64		indtuples;		/* number of tuples indexed */
 
+	UnBufferedWriteState ub_wstate;
 	/*
 	 * Extra data structures used during a buffering build. 'gfbb' contains
 	 * information related to managing the build buffers. 'parentMap' is a
@@ -194,6 +196,7 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	buildstate.heaprel = heap;
 	buildstate.sortstate = NULL;
 	buildstate.giststate = initGISTstate(index);
+	buildstate.ub_wstate.smgr_rel = RelationGetSmgr(index);
 
 	/*
 	 * Create a temporary memory context that is reset once for each tuple
@@ -403,14 +406,14 @@ gist_indexsortbuild(GISTBuildState *state)
 	state->pages_allocated = 0;
 	state->pages_written = 0;
 	state->ready_num_pages = 0;
+	unbuffered_prep(&state->ub_wstate);
 
 	/*
 	 * Write an empty page as a placeholder for the root page. It will be
 	 * replaced with the real root page at the end.
 	 */
 	page = palloc0(BLCKSZ);
-	smgrextend(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, GIST_ROOT_BLKNO,
-			   page, true);
+	unbuffered_extend(&state->ub_wstate, MAIN_FORKNUM, GIST_ROOT_BLKNO, page, true);
 	state->pages_allocated++;
 	state->pages_written++;
 
@@ -450,13 +453,12 @@ gist_indexsortbuild(GISTBuildState *state)
 
 	/* Write out the root */
 	PageSetLSN(pagestate->page, GistBuildLSN);
-	PageSetChecksumInplace(pagestate->page, GIST_ROOT_BLKNO);
-	smgrwrite(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, GIST_ROOT_BLKNO,
-			  pagestate->page, true);
+	unbuffered_write(&state->ub_wstate, MAIN_FORKNUM, GIST_ROOT_BLKNO, pagestate->page);
 	if (RelationNeedsWAL(state->indexrel))
 		log_newpage(&state->indexrel->rd_node, MAIN_FORKNUM, GIST_ROOT_BLKNO,
 					pagestate->page, true);
 
+	unbuffered_finish(&state->ub_wstate, MAIN_FORKNUM);
 	pfree(pagestate->page);
 	pfree(pagestate);
 }
@@ -570,9 +572,7 @@ gist_indexsortbuild_flush_ready_pages(GISTBuildState *state)
 			elog(ERROR, "unexpected block number to flush GiST sorting build");
 
 		PageSetLSN(page, GistBuildLSN);
-		PageSetChecksumInplace(page, blkno);
-		smgrextend(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, blkno, page,
-				   true);
+		unbuffered_extend(&state->ub_wstate, MAIN_FORKNUM, blkno, page, false);
 
 		state->pages_written++;
 	}
diff --git a/src/backend/access/hash/hashpage.c b/src/backend/access/hash/hashpage.c
index 159646c7c3..fcc0e28a36 100644
--- a/src/backend/access/hash/hashpage.c
+++ b/src/backend/access/hash/hashpage.c
@@ -32,6 +32,7 @@
 #include "access/hash_xlog.h"
 #include "miscadmin.h"
 #include "port/pg_bitutils.h"
+#include "storage/directmgr.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "storage/smgr.h"
@@ -990,8 +991,10 @@ _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
 	PGAlignedBlock zerobuf;
 	Page		page;
 	HashPageOpaque ovflopaque;
+	UnBufferedWriteState ub_wstate;
 
 	lastblock = firstblock + nblocks - 1;
+	ub_wstate.smgr_rel = RelationGetSmgr(rel);
 
 	/*
 	 * Check for overflow in block number calculation; if so, we cannot extend
@@ -1000,6 +1003,8 @@ _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
 	if (lastblock < firstblock || lastblock == InvalidBlockNumber)
 		return false;
 
+	unbuffered_prep(&ub_wstate);
+
 	page = (Page) zerobuf.data;
 
 	/*
@@ -1024,9 +1029,7 @@ _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
 					zerobuf.data,
 					true);
 
-	PageSetChecksumInplace(page, lastblock);
-	smgrextend(RelationGetSmgr(rel), MAIN_FORKNUM, lastblock, zerobuf.data,
-			   false);
+	unbuffered_extend(&ub_wstate, MAIN_FORKNUM, lastblock, zerobuf.data, false);
 
 	return true;
 }
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 9befe012a9..fa4780e186 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -38,6 +38,7 @@
 #include "pgstat.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
+#include "storage/directmgr.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "storage/procarray.h"
@@ -575,6 +576,7 @@ heapam_relation_set_new_filenode(Relation rel,
 								 MultiXactId *minmulti)
 {
 	SMgrRelation srel;
+	UnBufferedWriteState ub_wstate;
 
 	/*
 	 * Initialize to the minimum XID that could put tuples in the table. We
@@ -594,15 +596,15 @@ heapam_relation_set_new_filenode(Relation rel,
 	*minmulti = GetOldestMultiXactId();
 
 	srel = RelationCreateStorage(*newrnode, persistence);
+	ub_wstate.smgr_rel = srel;
+	unbuffered_prep(&ub_wstate);
 
 	/*
 	 * If required, set up an init fork for an unlogged table so that it can
-	 * be correctly reinitialized on restart.  An immediate sync is required
-	 * even if the page has been logged, because the write did not go through
-	 * shared_buffers and therefore a concurrent checkpoint may have moved the
-	 * redo pointer past our xlog record.  Recovery may as well remove it
-	 * while replaying, for example, XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE
-	 * record. Therefore, logging is necessary even if wal_level=minimal.
+	 * be correctly reinitialized on restart.  Recovery may as well remove
+	 * the init fork while replaying, for example, an XLOG_DBASE_CREATE or
+	 * XLOG_TBLSPC_CREATE record.  Therefore, logging is necessary even if
+	 * wal_level=minimal.
 	 */
 	if (persistence == RELPERSISTENCE_UNLOGGED)
 	{
@@ -611,7 +613,7 @@ heapam_relation_set_new_filenode(Relation rel,
 			   rel->rd_rel->relkind == RELKIND_TOASTVALUE);
 		smgrcreate(srel, INIT_FORKNUM, false);
 		log_smgrcreate(newrnode, INIT_FORKNUM);
-		smgrimmedsync(srel, INIT_FORKNUM);
+		unbuffered_finish(&ub_wstate, INIT_FORKNUM);
 	}
 
 	smgrclose(srel);
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 986a776bbd..6d1f4c8f6b 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -119,6 +119,7 @@
 #include "replication/logical.h"
 #include "replication/slot.h"
 #include "storage/bufmgr.h"
+#include "storage/directmgr.h"
 #include "storage/fd.h"
 #include "storage/procarray.h"
 #include "storage/smgr.h"
@@ -152,6 +153,7 @@ typedef struct RewriteStateData
 	HTAB	   *rs_old_new_tid_map; /* unmatched B tuples */
 	HTAB	   *rs_logical_mappings;	/* logical remapping files */
 	uint32		rs_num_rewrite_mappings;	/* # in memory mappings */
+	UnBufferedWriteState rs_unbuffered_wstate;
 }			RewriteStateData;
 
 /*
@@ -264,6 +266,9 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm
 	state->rs_freeze_xid = freeze_xid;
 	state->rs_cutoff_multi = cutoff_multi;
 	state->rs_cxt = rw_cxt;
+	state->rs_unbuffered_wstate.smgr_rel = RelationGetSmgr(state->rs_new_rel);
+
+	unbuffered_prep(&state->rs_unbuffered_wstate);
 
 	/* Initialize hash tables used to track update chains */
 	hash_ctl.keysize = sizeof(TidHashKey);
@@ -324,21 +329,12 @@ end_heap_rewrite(RewriteState state)
 						state->rs_buffer,
 						true);
 
-		PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
-
-		smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
-				   state->rs_blockno, (char *) state->rs_buffer, true);
+		unbuffered_extend(&state->rs_unbuffered_wstate, MAIN_FORKNUM,
+				state->rs_blockno, state->rs_buffer, false);
 	}
 
-	/*
-	 * When we WAL-logged rel pages, we must nonetheless fsync them.  The
-	 * reason is the same as in storage.c's RelationCopyStorage(): we're
-	 * writing data that's not in shared buffers, and so a CHECKPOINT
-	 * occurring during the rewriteheap operation won't have fsync'd data we
-	 * wrote before the checkpoint.
-	 */
 	if (RelationNeedsWAL(state->rs_new_rel))
-		smgrimmedsync(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM);
+		unbuffered_finish(&state->rs_unbuffered_wstate, MAIN_FORKNUM);
 
 	logical_end_heap_rewrite(state);
 
@@ -690,10 +686,7 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
 			 * need for smgr to schedule an fsync for this write; we'll do it
 			 * ourselves in end_heap_rewrite.
 			 */
-			PageSetChecksumInplace(page, state->rs_blockno);
-
-			smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
-					   state->rs_blockno, (char *) page, true);
+			unbuffered_extend(&state->rs_unbuffered_wstate, MAIN_FORKNUM, state->rs_blockno, page, false);
 
 			state->rs_blockno++;
 			state->rs_buffer_valid = false;
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index 114fbbdd30..0f357fc4ff 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -92,6 +92,7 @@
 #include "miscadmin.h"
 #include "port/pg_bitutils.h"
 #include "storage/bufmgr.h"
+#include "storage/directmgr.h"
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
 #include "utils/inval.h"
@@ -616,7 +617,9 @@ vm_extend(Relation rel, BlockNumber vm_nblocks)
 	BlockNumber vm_nblocks_now;
 	PGAlignedBlock pg;
 	SMgrRelation reln;
+	UnBufferedWriteState ub_wstate;
 
+	ub_wstate.smgr_rel = RelationGetSmgr(rel);
 	PageInit((Page) pg.data, BLCKSZ, 0);
 
 	/*
@@ -654,9 +657,8 @@ vm_extend(Relation rel, BlockNumber vm_nblocks)
 	/* Now extend the file */
 	while (vm_nblocks_now < vm_nblocks)
 	{
-		PageSetChecksumInplace((Page) pg.data, vm_nblocks_now);
-
-		smgrextend(reln, VISIBILITYMAP_FORKNUM, vm_nblocks_now, pg.data, false);
+		/* TODO: confirm these PageInit()'d pages still need a checksum */
+		unbuffered_extend(&ub_wstate, VISIBILITYMAP_FORKNUM, vm_nblocks_now, (Page) pg.data, false);
 		vm_nblocks_now++;
 	}
 
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 40ad0956e0..c3e3418570 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -29,6 +29,7 @@
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
 #include "storage/condition_variable.h"
+#include "storage/directmgr.h"
 #include "storage/indexfsm.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
@@ -150,6 +151,11 @@ void
 btbuildempty(Relation index)
 {
 	Page		metapage;
+	UnBufferedWriteState wstate;
+
+	wstate.smgr_rel = RelationGetSmgr(index);
+
+	unbuffered_prep(&wstate);
 
 	/* Construct metapage. */
 	metapage = (Page) palloc(BLCKSZ);
@@ -162,18 +168,15 @@ btbuildempty(Relation index)
 	 * XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE record.  Therefore, we need
 	 * this even when wal_level=minimal.
 	 */
-	PageSetChecksumInplace(metapage, BTREE_METAPAGE);
-	smgrwrite(RelationGetSmgr(index), INIT_FORKNUM, BTREE_METAPAGE,
-			  (char *) metapage, true);
+	unbuffered_write(&wstate, INIT_FORKNUM, BTREE_METAPAGE, metapage);
 	log_newpage(&RelationGetSmgr(index)->smgr_rnode.node, INIT_FORKNUM,
 				BTREE_METAPAGE, metapage, true);
 
 	/*
-	 * An immediate sync is required even if we xlog'd the page, because the
-	 * write did not go through shared_buffers and therefore a concurrent
-	 * checkpoint may have moved the redo pointer past our xlog record.
+	 * Even though we xlog'd the page, a concurrent checkpoint may have moved
+	 * the redo pointer past our xlog record, so we may still need to fsync.
 	 */
-	smgrimmedsync(RelationGetSmgr(index), INIT_FORKNUM);
+	unbuffered_finish(&wstate, INIT_FORKNUM);
 }
 
 /*
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 54c8eb1289..9cb9757875 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -57,6 +57,7 @@
 #include "executor/instrument.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "storage/directmgr.h"
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"		/* pgrminclude ignore */
 #include "utils/rel.h"
@@ -253,6 +254,7 @@ typedef struct BTWriteState
 	BlockNumber btws_pages_alloced; /* # pages allocated */
 	BlockNumber btws_pages_written; /* # pages written out */
 	Page		btws_zeropage;	/* workspace for filling zeroes */
+	UnBufferedWriteState ub_wstate;
 } BTWriteState;
 
 
@@ -560,6 +562,8 @@ _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
 
 	wstate.heap = btspool->heap;
 	wstate.index = btspool->index;
+	wstate.ub_wstate.smgr_rel = RelationGetSmgr(btspool->index);
+	wstate.ub_wstate.redo = InvalidXLogRecPtr;
 	wstate.inskey = _bt_mkscankey(wstate.index, NULL);
 	/* _bt_mkscankey() won't set allequalimage without metapage */
 	wstate.inskey->allequalimage = _bt_allequalimage(wstate.index, true);
@@ -656,31 +660,19 @@ _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
 		if (!wstate->btws_zeropage)
 			wstate->btws_zeropage = (Page) palloc0(BLCKSZ);
 		/* don't set checksum for all-zero page */
-		smgrextend(RelationGetSmgr(wstate->index), MAIN_FORKNUM,
-				   wstate->btws_pages_written++,
-				   (char *) wstate->btws_zeropage,
-				   true);
+		unbuffered_extend(&wstate->ub_wstate, MAIN_FORKNUM, wstate->btws_pages_written++, wstate->btws_zeropage, true);
 	}
 
-	PageSetChecksumInplace(page, blkno);
 
-	/*
-	 * Now write the page.  There's no need for smgr to schedule an fsync for
-	 * this write; we'll do it ourselves before ending the build.
-	 */
+	/* Now write the page. Either we are extending the file... */
 	if (blkno == wstate->btws_pages_written)
 	{
-		/* extending the file... */
-		smgrextend(RelationGetSmgr(wstate->index), MAIN_FORKNUM, blkno,
-				   (char *) page, true);
+		unbuffered_extend(&wstate->ub_wstate, MAIN_FORKNUM, blkno, page, false);
 		wstate->btws_pages_written++;
 	}
+	/* or we are overwriting a block we zero-filled before. */
 	else
-	{
-		/* overwriting a block we zero-filled before */
-		smgrwrite(RelationGetSmgr(wstate->index), MAIN_FORKNUM, blkno,
-				  (char *) page, true);
-	}
+		unbuffered_write(&wstate->ub_wstate, MAIN_FORKNUM, blkno, page);
 
 	pfree(page);
 }
@@ -1189,6 +1181,8 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 	int64		tuples_done = 0;
 	bool		deduplicate;
 
+
+	unbuffered_prep(&wstate->ub_wstate);
 	deduplicate = wstate->inskey->allequalimage && !btspool->isunique &&
 		BTGetDeduplicateItems(wstate->index);
 
@@ -1415,17 +1409,8 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 	/* Close down final pages and write the metapage */
 	_bt_uppershutdown(wstate, state);
 
-	/*
-	 * When we WAL-logged index pages, we must nonetheless fsync index files.
-	 * Since we're building outside shared buffers, a CHECKPOINT occurring
-	 * during the build has no way to flush the previously written data to
-	 * disk (indeed it won't know the index even exists).  A crash later on
-	 * would replay WAL from the checkpoint, therefore it wouldn't replay our
-	 * earlier WAL entries. If we do not fsync those pages here, they might
-	 * still not be on disk when the crash occurs.
-	 */
 	if (wstate->btws_use_wal)
-		smgrimmedsync(RelationGetSmgr(wstate->index), MAIN_FORKNUM);
+		unbuffered_finish(&wstate->ub_wstate, MAIN_FORKNUM);
 }
 
 /*
diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c
index cc4394b1c8..1aeb8bc714 100644
--- a/src/backend/access/spgist/spginsert.c
+++ b/src/backend/access/spgist/spginsert.c
@@ -25,6 +25,7 @@
 #include "catalog/index.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
+#include "storage/directmgr.h"
 #include "storage/smgr.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -156,6 +157,10 @@ void
 spgbuildempty(Relation index)
 {
 	Page		page;
+	UnBufferedWriteState wstate;
+
+	wstate.smgr_rel = RelationGetSmgr(index);
+	unbuffered_prep(&wstate);
 
 	/* Construct metapage. */
 	page = (Page) palloc(BLCKSZ);
@@ -168,36 +173,30 @@ spgbuildempty(Relation index)
 	 * of their existing content when the corresponding create records are
 	 * replayed.
 	 */
-	PageSetChecksumInplace(page, SPGIST_METAPAGE_BLKNO);
-	smgrwrite(RelationGetSmgr(index), INIT_FORKNUM, SPGIST_METAPAGE_BLKNO,
-			  (char *) page, true);
+	unbuffered_write(&wstate, INIT_FORKNUM, SPGIST_METAPAGE_BLKNO, page);
 	log_newpage(&(RelationGetSmgr(index))->smgr_rnode.node, INIT_FORKNUM,
 				SPGIST_METAPAGE_BLKNO, page, true);
 
 	/* Likewise for the root page. */
 	SpGistInitPage(page, SPGIST_LEAF);
 
-	PageSetChecksumInplace(page, SPGIST_ROOT_BLKNO);
-	smgrwrite(RelationGetSmgr(index), INIT_FORKNUM, SPGIST_ROOT_BLKNO,
-			  (char *) page, true);
+	unbuffered_write(&wstate, INIT_FORKNUM, SPGIST_ROOT_BLKNO, page);
 	log_newpage(&(RelationGetSmgr(index))->smgr_rnode.node, INIT_FORKNUM,
 				SPGIST_ROOT_BLKNO, page, true);
 
 	/* Likewise for the null-tuples root page. */
 	SpGistInitPage(page, SPGIST_LEAF | SPGIST_NULLS);
 
-	PageSetChecksumInplace(page, SPGIST_NULL_BLKNO);
-	smgrwrite(RelationGetSmgr(index), INIT_FORKNUM, SPGIST_NULL_BLKNO,
-			  (char *) page, true);
+	unbuffered_write(&wstate, INIT_FORKNUM, SPGIST_NULL_BLKNO, page);
 	log_newpage(&(RelationGetSmgr(index))->smgr_rnode.node, INIT_FORKNUM,
 				SPGIST_NULL_BLKNO, page, true);
 
 	/*
-	 * An immediate sync is required even if we xlog'd the pages, because the
-	 * writes did not go through shared buffers and therefore a concurrent
-	 * checkpoint may have moved the redo pointer past our xlog record.
+	 * Because the writes did not go through shared buffers, if a concurrent
+	 * checkpoint moved the redo pointer past our xlog record, an immediate
+	 * sync is required even if we xlog'd the pages.
 	 */
-	smgrimmedsync(RelationGetSmgr(index), INIT_FORKNUM);
+	unbuffered_finish(&wstate, INIT_FORKNUM);
 }
 
 /*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index e51a7a749d..d11a928b62 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -12941,6 +12941,29 @@ CheckForStandbyTrigger(void)
 	return false;
 }
 
+/*
+ * RedoRecPtrChanged
+ *		Report whether the redo pointer differs from comparator_ptr.
+ *
+ * Reads XLogCtl->RedoRecPtr under info_lck and, if that value is ahead of
+ * the file's own RedoRecPtr variable (presumably this backend's cached
+ * copy), advances the latter.  Returns true if RedoRecPtr then differs from
+ * comparator_ptr.
+ */
+bool
+RedoRecPtrChanged(XLogRecPtr comparator_ptr)
+{
+	XLogRecPtr	ptr;
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	ptr = XLogCtl->RedoRecPtr;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	/* only ever move RedoRecPtr forward */
+	if (RedoRecPtr < ptr)
+		RedoRecPtr = ptr;
+
+	return RedoRecPtr != comparator_ptr;
+}
+
 /*
  * Remove the files signaling a standby promotion request.
  */
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index c5ad28d71f..c63085b1aa 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -28,6 +28,7 @@
 #include "catalog/storage.h"
 #include "catalog/storage_xlog.h"
 #include "miscadmin.h"
+#include "storage/directmgr.h"
 #include "storage/freespace.h"
 #include "storage/smgr.h"
 #include "utils/hsearch.h"
@@ -420,6 +421,10 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
 	bool		copying_initfork;
 	BlockNumber nblocks;
 	BlockNumber blkno;
+	UnBufferedWriteState wstate;
+
+	wstate.smgr_rel = dst;
+	unbuffered_prep(&wstate);
 
 	page = (Page) buf.data;
 
@@ -477,27 +482,11 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
 		if (use_wal)
 			log_newpage(&dst->smgr_rnode.node, forkNum, blkno, page, false);
 
-		PageSetChecksumInplace(page, blkno);
-
-		/*
-		 * Now write the page.  We say skipFsync = true because there's no
-		 * need for smgr to schedule an fsync for this write; we'll do it
-		 * ourselves below.
-		 */
-		smgrextend(dst, forkNum, blkno, buf.data, true);
+		unbuffered_extend(&wstate, forkNum, blkno, buf.data, false);
 	}
 
-	/*
-	 * When we WAL-logged rel pages, we must nonetheless fsync them.  The
-	 * reason is that since we're copying outside shared buffers, a CHECKPOINT
-	 * occurring during the copy has no way to flush the previously written
-	 * data to disk (indeed it won't know the new rel even exists).  A crash
-	 * later on would replay WAL from the checkpoint, therefore it wouldn't
-	 * replay our earlier WAL entries. If we do not fsync those pages here,
-	 * they might still not be on disk when the crash occurs.
-	 */
 	if (use_wal || copying_initfork)
-		smgrimmedsync(dst, forkNum);
+		unbuffered_finish(&wstate, forkNum);
 }
 
 /*
diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile
index 8376cdfca2..501fae5f9d 100644
--- a/src/backend/storage/Makefile
+++ b/src/backend/storage/Makefile
@@ -8,6 +8,6 @@ subdir = src/backend/storage
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS     = buffer file freespace ipc large_object lmgr page smgr sync
+SUBDIRS     = buffer direct file freespace ipc large_object lmgr page smgr sync
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/direct/Makefile b/src/backend/storage/direct/Makefile
new file mode 100644
index 0000000000..d82bbed48c
--- /dev/null
+++ b/src/backend/storage/direct/Makefile
@@ -0,0 +1,17 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for storage/direct (routines for managing unbuffered IO)
+#
+# IDENTIFICATION
+#    src/backend/storage/direct/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/storage/direct
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = directmgr.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/direct/directmgr.c b/src/backend/storage/direct/directmgr.c
new file mode 100644
index 0000000000..d6f02487f8
--- /dev/null
+++ b/src/backend/storage/direct/directmgr.c
@@ -0,0 +1,90 @@
+/*-------------------------------------------------------------------------
+ *
+ * directmgr.c
+ *	  routines for managing unbuffered IO
+ *
+ * Unbuffered IO writes relation pages directly through smgr.  Callers set
+ * wstate->smgr_rel, call unbuffered_prep(), issue unbuffered_write() and
+ * unbuffered_extend() calls, and end with unbuffered_finish().
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/storage/direct/directmgr.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/xlog.h"
+#include "storage/bufpage.h"
+#include "storage/directmgr.h"
+#include "storage/smgr.h"
+#include "utils/rel.h"
+
+/*
+ * unbuffered_prep
+ *		Start a series of unbuffered writes.
+ *
+ * Records the current redo pointer in wstate->redo so that
+ * unbuffered_finish() can tell whether it has changed since.  This does not
+ * set wstate->smgr_rel; the caller is responsible for that.
+ */
+void
+unbuffered_prep(UnBufferedWriteState *wstate)
+{
+	wstate->redo = GetRedoRecPtr();
+}
+
+/*
+ * unbuffered_finish
+ *		End a series of unbuffered writes to fork 'forknum'.
+ *
+ * When writing data outside shared buffers, a concurrent CHECKPOINT can move
+ * the redo pointer past our WAL entries and won't flush our data to disk. If
+ * the database crashes before the data makes it to disk, our WAL won't be
+ * replayed and the data will be lost.
+ * Thus, if a CHECKPOINT begins between unbuffered_prep() and
+ * unbuffered_finish(), the backend must fsync the data itself.
+ */
+void
+unbuffered_finish(UnBufferedWriteState *wstate, ForkNumber forknum)
+{
+	if (RedoRecPtrChanged(wstate->redo))
+		smgrimmedsync(wstate->smgr_rel, forknum);
+}
+
+/*
+ * unbuffered_write
+ *		Write 'page' over block 'blocknum' of the given fork.
+ *
+ * The page checksum is set in place first.  smgrwrite() is called with
+ * skipFsync = false, i.e. smgr is not told to skip scheduling an fsync for
+ * this write.
+ */
+void
+unbuffered_write(UnBufferedWriteState *wstate, ForkNumber forknum,
+				 BlockNumber blocknum, Page page)
+{
+	PageSetChecksumInplace(page, blocknum);
+	smgrwrite(wstate->smgr_rel, forknum, blocknum, (char *) page, false);
+}
+
+/*
+ * unbuffered_extend
+ *		Extend the given fork with 'page' as block 'blocknum'.
+ *
+ * Pass empty = true for an all-zero page; no checksum is set on it.
+ * Otherwise the page checksum is set in place before the write.  As in
+ * unbuffered_write(), skipFsync is passed as false.
+ */
+void
+unbuffered_extend(UnBufferedWriteState *wstate, ForkNumber forknum,
+				  BlockNumber blocknum, Page page, bool empty)
+{
+	if (!empty)
+		PageSetChecksumInplace(page, blocknum);
+	smgrextend(wstate->smgr_rel, forknum, blocknum, (char *) page, false);
+}
diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index 09d4b16067..26abdfa589 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -26,6 +26,7 @@
 #include "access/htup_details.h"
 #include "access/xlogutils.h"
 #include "miscadmin.h"
+#include "storage/directmgr.h"
 #include "storage/freespace.h"
 #include "storage/fsm_internals.h"
 #include "storage/lmgr.h"
@@ -608,6 +609,9 @@ fsm_extend(Relation rel, BlockNumber fsm_nblocks)
 	BlockNumber fsm_nblocks_now;
 	PGAlignedBlock pg;
 	SMgrRelation reln;
+	UnBufferedWriteState ub_wstate;
+
+	ub_wstate.smgr_rel = RelationGetSmgr(rel);
 
 	PageInit((Page) pg.data, BLCKSZ, 0);
 
@@ -647,10 +651,8 @@ fsm_extend(Relation rel, BlockNumber fsm_nblocks)
 	/* Extend as needed. */
 	while (fsm_nblocks_now < fsm_nblocks)
 	{
-		PageSetChecksumInplace((Page) pg.data, fsm_nblocks_now);
-
-		smgrextend(reln, FSM_FORKNUM, fsm_nblocks_now,
-				   pg.data, false);
+		/* TODO: confirm these PageInit()'d pages still need a checksum */
+		unbuffered_extend(&ub_wstate, FSM_FORKNUM, fsm_nblocks_now, (Page) pg.data, false);
 		fsm_nblocks_now++;
 	}
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 5e2c94a05f..4a7b0d42de 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -314,6 +314,7 @@ extern XLogRecPtr GetRedoRecPtr(void);
 extern XLogRecPtr GetInsertRecPtr(void);
 extern XLogRecPtr GetFlushRecPtr(void);
 extern XLogRecPtr GetLastImportantRecPtr(void);
+extern bool RedoRecPtrChanged(XLogRecPtr comparator_ptr);
 extern void RemovePromoteSignalFiles(void);
 
 extern bool PromoteIsTriggered(void);
-- 
2.27.0

