From 529fa1f57946d70736b2304c2883213e45f7c077 Mon Sep 17 00:00:00 2001
From: John Naylor <jcnaylor@gmail.com>
Date: Mon, 22 Oct 2018 13:39:25 +0700
Subject: [PATCH v6] Avoid creation of the free space map for small tables.

The FSM isn't created unless the heap has more than 8 blocks
(HEAP_FSM_CREATION_THRESHOLD). If the last known good block has
insufficient space, try every block before extending the heap.

If a heap with a FSM is truncated back to below the threshold, the FSM
stays around and can be used as usual.
---
 contrib/pageinspect/expected/page.out     |  77 ++++----
 contrib/pageinspect/sql/page.sql          |  33 ++--
 src/backend/access/brin/brin.c            |   2 +-
 src/backend/access/brin/brin_pageops.c    |   8 +-
 src/backend/access/heap/hio.c             |  57 ++++--
 src/backend/commands/vacuumlazy.c         |  17 +-
 src/backend/storage/freespace/freespace.c | 224 +++++++++++++++++++++-
 src/backend/storage/freespace/indexfsm.c  |   4 +-
 src/include/storage/freespace.h           |   7 +-
 9 files changed, 344 insertions(+), 85 deletions(-)

diff --git a/contrib/pageinspect/expected/page.out b/contrib/pageinspect/expected/page.out
index 3fcd9fbe6d..83e5910453 100644
--- a/contrib/pageinspect/expected/page.out
+++ b/contrib/pageinspect/expected/page.out
@@ -1,48 +1,69 @@
 CREATE EXTENSION pageinspect;
-CREATE TABLE test1 (a int, b int);
-INSERT INTO test1 VALUES (16777217, 131584);
-VACUUM test1;  -- set up FSM
+CREATE TABLE test_rel_forks (a int);
+-- Make sure there are enough blocks in the heap for the FSM to be created.
+INSERT INTO test_rel_forks SELECT g from generate_series(1,10000) g;
+-- set up FSM and VM
+VACUUM test_rel_forks;
 -- The page contents can vary, so just test that it can be read
 -- successfully, but don't keep the output.
-SELECT octet_length(get_raw_page('test1', 'main', 0)) AS main_0;
+SELECT octet_length(get_raw_page('test_rel_forks', 'main', 0)) AS main_0;
  main_0 
 --------
    8192
 (1 row)
 
-SELECT octet_length(get_raw_page('test1', 'main', 1)) AS main_1;
-ERROR:  block number 1 is out of range for relation "test1"
-SELECT octet_length(get_raw_page('test1', 'fsm', 0)) AS fsm_0;
+SELECT octet_length(get_raw_page('test_rel_forks', 'main', 100)) AS main_100;
+ERROR:  block number 100 is out of range for relation "test_rel_forks"
+SELECT octet_length(get_raw_page('test_rel_forks', 'fsm', 0)) AS fsm_0;
  fsm_0 
 -------
   8192
 (1 row)
 
-SELECT octet_length(get_raw_page('test1', 'fsm', 1)) AS fsm_1;
- fsm_1 
--------
-  8192
-(1 row)
-
-SELECT octet_length(get_raw_page('test1', 'vm', 0)) AS vm_0;
+SELECT octet_length(get_raw_page('test_rel_forks', 'fsm', 10)) AS fsm_10;
+ERROR:  block number 10 is out of range for relation "test_rel_forks"
+SELECT octet_length(get_raw_page('test_rel_forks', 'vm', 0)) AS vm_0;
  vm_0 
 ------
  8192
 (1 row)
 
-SELECT octet_length(get_raw_page('test1', 'vm', 1)) AS vm_1;
-ERROR:  block number 1 is out of range for relation "test1"
+SELECT octet_length(get_raw_page('test_rel_forks', 'vm', 1)) AS vm_1;
+ERROR:  block number 1 is out of range for relation "test_rel_forks"
 SELECT octet_length(get_raw_page('xxx', 'main', 0));
 ERROR:  relation "xxx" does not exist
-SELECT octet_length(get_raw_page('test1', 'xxx', 0));
+SELECT octet_length(get_raw_page('test_rel_forks', 'xxx', 0));
 ERROR:  invalid fork name
 HINT:  Valid fork names are "main", "fsm", "vm", and "init".
-SELECT get_raw_page('test1', 0) = get_raw_page('test1', 'main', 0);
+SELECT * FROM fsm_page_contents(get_raw_page('test_rel_forks', 'fsm', 0));
+ fsm_page_contents 
+-------------------
+ 0: 192           +
+ 1: 192           +
+ 3: 192           +
+ 7: 192           +
+ 15: 192          +
+ 31: 192          +
+ 63: 192          +
+ 127: 192         +
+ 255: 192         +
+ 511: 192         +
+ 1023: 192        +
+ 2047: 192        +
+ 4095: 192        +
+ fp_next_slot: 0  +
+ 
+(1 row)
+
+SELECT get_raw_page('test_rel_forks', 0) = get_raw_page('test_rel_forks', 'main', 0);
  ?column? 
 ----------
  t
 (1 row)
 
+DROP TABLE test_rel_forks;
+CREATE TABLE test1 (a int, b int);
+INSERT INTO test1 VALUES (16777217, 131584);
 SELECT pagesize, version FROM page_header(get_raw_page('test1', 0));
  pagesize | version 
 ----------+---------
@@ -62,26 +83,6 @@ SELECT tuple_data_split('test1'::regclass, t_data, t_infomask, t_infomask2, t_bi
  {"\\x01000001","\\x00020200"}
 (1 row)
 
-SELECT * FROM fsm_page_contents(get_raw_page('test1', 'fsm', 0));
- fsm_page_contents 
--------------------
- 0: 254           +
- 1: 254           +
- 3: 254           +
- 7: 254           +
- 15: 254          +
- 31: 254          +
- 63: 254          +
- 127: 254         +
- 255: 254         +
- 511: 254         +
- 1023: 254        +
- 2047: 254        +
- 4095: 254        +
- fp_next_slot: 0  +
- 
-(1 row)
-
 DROP TABLE test1;
 -- check that using any of these functions with a partitioned table or index
 -- would fail
diff --git a/contrib/pageinspect/sql/page.sql b/contrib/pageinspect/sql/page.sql
index 8ac9991837..ee811759d5 100644
--- a/contrib/pageinspect/sql/page.sql
+++ b/contrib/pageinspect/sql/page.sql
@@ -1,26 +1,35 @@
 CREATE EXTENSION pageinspect;
 
-CREATE TABLE test1 (a int, b int);
-INSERT INTO test1 VALUES (16777217, 131584);
+CREATE TABLE test_rel_forks (a int);
+-- Make sure there are enough blocks in the heap for the FSM to be created.
+INSERT INTO test_rel_forks SELECT g from generate_series(1,10000) g;
 
-VACUUM test1;  -- set up FSM
+-- set up FSM and VM
+VACUUM test_rel_forks;
 
 -- The page contents can vary, so just test that it can be read
 -- successfully, but don't keep the output.
 
-SELECT octet_length(get_raw_page('test1', 'main', 0)) AS main_0;
-SELECT octet_length(get_raw_page('test1', 'main', 1)) AS main_1;
+SELECT octet_length(get_raw_page('test_rel_forks', 'main', 0)) AS main_0;
+SELECT octet_length(get_raw_page('test_rel_forks', 'main', 100)) AS main_100;
 
-SELECT octet_length(get_raw_page('test1', 'fsm', 0)) AS fsm_0;
-SELECT octet_length(get_raw_page('test1', 'fsm', 1)) AS fsm_1;
+SELECT octet_length(get_raw_page('test_rel_forks', 'fsm', 0)) AS fsm_0;
+SELECT octet_length(get_raw_page('test_rel_forks', 'fsm', 10)) AS fsm_10;
 
-SELECT octet_length(get_raw_page('test1', 'vm', 0)) AS vm_0;
-SELECT octet_length(get_raw_page('test1', 'vm', 1)) AS vm_1;
+SELECT octet_length(get_raw_page('test_rel_forks', 'vm', 0)) AS vm_0;
+SELECT octet_length(get_raw_page('test_rel_forks', 'vm', 1)) AS vm_1;
 
 SELECT octet_length(get_raw_page('xxx', 'main', 0));
-SELECT octet_length(get_raw_page('test1', 'xxx', 0));
+SELECT octet_length(get_raw_page('test_rel_forks', 'xxx', 0));
+
+SELECT * FROM fsm_page_contents(get_raw_page('test_rel_forks', 'fsm', 0));
+
+SELECT get_raw_page('test_rel_forks', 0) = get_raw_page('test_rel_forks', 'main', 0);
 
-SELECT get_raw_page('test1', 0) = get_raw_page('test1', 'main', 0);
+DROP TABLE test_rel_forks;
+
+CREATE TABLE test1 (a int, b int);
+INSERT INTO test1 VALUES (16777217, 131584);
 
 SELECT pagesize, version FROM page_header(get_raw_page('test1', 0));
 
@@ -29,8 +38,6 @@ SELECT page_checksum(get_raw_page('test1', 0), 0) IS NOT NULL AS silly_checksum_
 SELECT tuple_data_split('test1'::regclass, t_data, t_infomask, t_infomask2, t_bits)
     FROM heap_page_items(get_raw_page('test1', 0));
 
-SELECT * FROM fsm_page_contents(get_raw_page('test1', 'fsm', 0));
-
 DROP TABLE test1;
 
 -- check that using any of these functions with a partitioned table or index
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index e95fbbcea7..7c5b1af764 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -1148,7 +1148,7 @@ terminate_brin_buildstate(BrinBuildState *state)
 		freespace = PageGetFreeSpace(page);
 		blk = BufferGetBlockNumber(state->bs_currentInsertBuf);
 		ReleaseBuffer(state->bs_currentInsertBuf);
-		RecordPageWithFreeSpace(state->bs_irel, blk, freespace);
+		RecordPageWithFreeSpace(state->bs_irel, blk, freespace, InvalidBlockNumber);
 		FreeSpaceMapVacuumRange(state->bs_irel, blk, blk + 1);
 	}
 
diff --git a/src/backend/access/brin/brin_pageops.c b/src/backend/access/brin/brin_pageops.c
index 040cb62e55..70d93878ba 100644
--- a/src/backend/access/brin/brin_pageops.c
+++ b/src/backend/access/brin/brin_pageops.c
@@ -310,7 +310,7 @@ brin_doupdate(Relation idxrel, BlockNumber pagesPerRange,
 
 		if (extended)
 		{
-			RecordPageWithFreeSpace(idxrel, newblk, freespace);
+			RecordPageWithFreeSpace(idxrel, newblk, freespace, InvalidBlockNumber);
 			FreeSpaceMapVacuumRange(idxrel, newblk, newblk + 1);
 		}
 
@@ -461,7 +461,7 @@ brin_doinsert(Relation idxrel, BlockNumber pagesPerRange,
 
 	if (extended)
 	{
-		RecordPageWithFreeSpace(idxrel, blk, freespace);
+		RecordPageWithFreeSpace(idxrel, blk, freespace, InvalidBlockNumber);
 		FreeSpaceMapVacuumRange(idxrel, blk, blk + 1);
 	}
 
@@ -654,7 +654,7 @@ brin_page_cleanup(Relation idxrel, Buffer buf)
 
 	/* Measure free space and record it */
 	RecordPageWithFreeSpace(idxrel, BufferGetBlockNumber(buf),
-							br_page_get_freespace(page));
+							br_page_get_freespace(page), InvalidBlockNumber);
 }
 
 /*
@@ -895,7 +895,7 @@ brin_initialize_empty_new_buffer(Relation idxrel, Buffer buffer)
 	 * pages whose FSM records were forgotten in a crash.
 	 */
 	RecordPageWithFreeSpace(idxrel, BufferGetBlockNumber(buffer),
-							br_page_get_freespace(page));
+							br_page_get_freespace(page), InvalidBlockNumber);
 }
 
 
diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c
index b8b5871559..092ca5b571 100644
--- a/src/backend/access/heap/hio.c
+++ b/src/backend/access/heap/hio.c
@@ -24,6 +24,8 @@
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
 
+/*#define TRACE_TARGETBLOCK */
+
 
 /*
  * RelationPutHeapTuple - place tuple at specified page
@@ -239,8 +241,12 @@ RelationAddExtraBlocks(Relation relation, BulkInsertState bistate)
 		 * Immediately update the bottom level of the FSM.  This has a good
 		 * chance of making this page visible to other concurrently inserting
 		 * backends, and we want that to happen without delay.
+		 *
+		 * The table will end up with more than extraBlocks pages, so pass
+		 * that to skip a system call and ensure the FSM gets created.
+		 * NOTE(review): this loop decrements extraBlocks -- confirm.
 		 */
-		RecordPageWithFreeSpace(relation, blockNum, freespace);
+		RecordPageWithFreeSpace(relation, blockNum, freespace, extraBlocks);
 	}
 	while (--extraBlocks > 0);
 
@@ -378,24 +384,15 @@ RelationGetBufferForTuple(Relation relation, Size len,
 		 * target.
 		 */
 		targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);
-
-		/*
-		 * If the FSM knows nothing of the rel, try the last page before we
-		 * give up and extend.  This avoids one-tuple-per-page syndrome during
-		 * bootstrapping or in a recently-started system.
-		 */
-		if (targetBlock == InvalidBlockNumber)
-		{
-			BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
-
-			if (nblocks > 0)
-				targetBlock = nblocks - 1;
-		}
 	}
 
 loop:
 	while (targetBlock != InvalidBlockNumber)
 	{
+
+#ifdef TRACE_TARGETBLOCK
+		elog(DEBUG1, "Attempting block %u", targetBlock);
+#endif
 		/*
 		 * Read and exclusive-lock the target block, as well as the other
 		 * block if one was given, taking suitable care with lock ordering and
@@ -484,6 +481,16 @@ loop:
 		{
 			/* use this page as future insert target, too */
 			RelationSetTargetBlock(relation, targetBlock);
+
+			/*
+			 * In case we used an in-memory map of available blocks, reset
+			 * it for next use.
+			 */
+			ClearLocalMap();
+
+#ifdef TRACE_TARGETBLOCK
+			elog(DEBUG1, "Returning buffer for block %u", targetBlock);
+#endif
 			return buffer;
 		}
 
@@ -557,11 +564,18 @@ loop:
 				goto loop;
 			}
 
+#ifdef TRACE_TARGETBLOCK
+			elog(DEBUG1, "Bulk-extending relation");
+#endif
 			/* Time to bulk-extend. */
 			RelationAddExtraBlocks(relation, bistate);
 		}
 	}
 
+#ifdef TRACE_TARGETBLOCK
+	elog(DEBUG1, "Extending relation");
+#endif
+
 	/*
 	 * In addition to whatever extension we performed above, we always add at
 	 * least one block to satisfy our own request.
@@ -600,10 +614,11 @@ loop:
 	 * risk wiping out valid data).
 	 */
 	page = BufferGetPage(buffer);
+	targetBlock = BufferGetBlockNumber(buffer);
 
 	if (!PageIsNew(page))
 		elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
-			 BufferGetBlockNumber(buffer),
+			 targetBlock,
 			 RelationGetRelationName(relation));
 
 	PageInit(page, BufferGetPageSize(buffer), 0);
@@ -623,7 +638,17 @@ loop:
 	 * current backend to make more insertions or not, which is probably a
 	 * good bet most of the time.  So for now, don't add it to FSM yet.
 	 */
-	RelationSetTargetBlock(relation, BufferGetBlockNumber(buffer));
+	RelationSetTargetBlock(relation, targetBlock);
+
+	/*
+	 * In case we used an in-memory map of available blocks, reset
+	 * it for next use.
+	 */
+	ClearLocalMap();
+
+#ifdef TRACE_TARGETBLOCK
+	elog(DEBUG1, "Returning buffer for block %u", targetBlock);
+#endif
 
 	return buffer;
 }
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 8996d366e9..9f4d8adcdb 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -154,7 +154,7 @@ static BufferAccessStrategy vac_strategy;
 static void lazy_scan_heap(Relation onerel, int options,
 			   LVRelStats *vacrelstats, Relation *Irel, int nindexes,
 			   bool aggressive);
-static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
+static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats, BlockNumber nblocks);
 static bool lazy_check_needs_freeze(Buffer buf, bool *hastup);
 static void lazy_vacuum_index(Relation indrel,
 				  IndexBulkDeleteResult **stats,
@@ -759,7 +759,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			pgstat_progress_update_multi_param(2, hvp_index, hvp_val);
 
 			/* Remove tuples from heap */
-			lazy_vacuum_heap(onerel, vacrelstats);
+			lazy_vacuum_heap(onerel, vacrelstats, nblocks);
 
 			/*
 			 * Forget the now-vacuumed tuples, and press on, but be careful
@@ -897,7 +897,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			MarkBufferDirty(buf);
 			UnlockReleaseBuffer(buf);
 
-			RecordPageWithFreeSpace(onerel, blkno, freespace);
+			RecordPageWithFreeSpace(onerel, blkno, freespace, nblocks);
 			continue;
 		}
 
@@ -936,7 +936,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			}
 
 			UnlockReleaseBuffer(buf);
-			RecordPageWithFreeSpace(onerel, blkno, freespace);
+			RecordPageWithFreeSpace(onerel, blkno, freespace, nblocks);
 			continue;
 		}
 
@@ -1339,7 +1339,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		 * taken if there are no indexes.)
 		 */
 		if (vacrelstats->num_dead_tuples == prev_dead_count)
-			RecordPageWithFreeSpace(onerel, blkno, freespace);
+			RecordPageWithFreeSpace(onerel, blkno, freespace, nblocks);
 	}
 
 	/* report that everything is scanned and vacuumed */
@@ -1401,7 +1401,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		/* Remove tuples from heap */
 		pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
 									 PROGRESS_VACUUM_PHASE_VACUUM_HEAP);
-		lazy_vacuum_heap(onerel, vacrelstats);
+		lazy_vacuum_heap(onerel, vacrelstats, nblocks);
 		vacrelstats->num_index_scans++;
 	}
 
@@ -1472,9 +1472,10 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
  * Note: the reason for doing this as a second pass is we cannot remove
  * the tuples until we've removed their index entries, and we want to
  * process index entry removal in batches as large as possible.
+ * Note: nblocks is passed as an optimization for RecordPageWithFreeSpace().
  */
 static void
-lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
+lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats, BlockNumber nblocks)
 {
 	int			tupindex;
 	int			npages;
@@ -1511,7 +1512,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
 		freespace = PageGetHeapFreeSpace(page);
 
 		UnlockReleaseBuffer(buf);
-		RecordPageWithFreeSpace(onerel, tblk, freespace);
+		RecordPageWithFreeSpace(onerel, tblk, freespace, nblocks);
 		npages++;
 	}
 
diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index 7c4ad1c449..1005459f7c 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -76,6 +76,11 @@
 #define FSM_ROOT_LEVEL	(FSM_TREE_DEPTH - 1)
 #define FSM_BOTTOM_LEVEL 0
 
+/* Status codes for local map. */
+#define FSM_LOCAL_ZERO	0x00	/* Beyond the end of the relation */
+#define FSM_LOCAL_AVAIL	0x01	/* Available to try */
+#define FSM_LOCAL_TRIED	0x02	/* Already tried, not enough space */
+
 /*
  * The internal FSM routines work on a logical addressing scheme. Each
  * level of the tree can be thought of as a separately addressable file.
@@ -89,6 +94,9 @@ typedef struct
 /* Address of the root page. */
 static const FSMAddress FSM_ROOT_ADDRESS = {FSM_ROOT_LEVEL, 0};
 
+/* Local map of block numbers to try. */
+static char FSMLocalMap[HEAP_FSM_CREATION_THRESHOLD] = {FSM_LOCAL_ZERO};
+
 /* functions to navigate the tree */
 static FSMAddress fsm_get_child(FSMAddress parent, uint16 slot);
 static FSMAddress fsm_get_parent(FSMAddress child, uint16 *slot);
@@ -111,6 +119,10 @@ static BlockNumber fsm_search(Relation rel, uint8 min_cat);
 static uint8 fsm_vacuum_page(Relation rel, FSMAddress addr,
 				BlockNumber start, BlockNumber end,
 				bool *eof);
+static bool fsm_allow_writes(Relation rel, BlockNumber heapblk,
+				BlockNumber nblocks, BlockNumber *get_nblocks);
+static void fsm_local_set(Relation rel, BlockNumber nblocks);
+static BlockNumber fsm_local_search(void);
 
 
 /******** Public API ********/
@@ -132,8 +144,35 @@ BlockNumber
 GetPageWithFreeSpace(Relation rel, Size spaceNeeded)
 {
 	uint8		min_cat = fsm_space_needed_to_cat(spaceNeeded);
+	BlockNumber target_block,
+				nblocks;
+
+	/* First try the FSM, if it exists. */
+	target_block = fsm_search(rel, min_cat);
 
-	return fsm_search(rel, min_cat);
+	if (target_block == InvalidBlockNumber &&
+		rel->rd_rel->relkind == RELKIND_RELATION)
+	{
+		nblocks = RelationGetNumberOfBlocks(rel);
+
+		if (nblocks > HEAP_FSM_CREATION_THRESHOLD)
+		{
+			/*
+			 * If the FSM knows nothing of the rel, try the last page before
+			 * we give up and extend.  This avoids one-tuple-per-page syndrome
+			 * during bootstrapping or in a recently-started system.
+			 */
+			target_block = nblocks - 1;
+		}
+		else if (nblocks > 0)
+		{
+			/* Create or update local map and get first candidate block. */
+			fsm_local_set(rel, nblocks);
+			target_block = fsm_local_search();
+		}
+	}
+
+	return target_block;
 }
 
 /*
@@ -154,6 +193,32 @@ RecordAndGetPageWithFreeSpace(Relation rel, BlockNumber oldPage,
 	FSMAddress	addr;
 	uint16		slot;
 	int			search_slot;
+	BlockNumber nblocks = InvalidBlockNumber;
+
+	if (oldPage < HEAP_FSM_CREATION_THRESHOLD &&
+		rel->rd_rel->relkind == RELKIND_RELATION)
+	{
+		/* First try the local map, if it exists. */
+		if (FSMLocalMap[oldPage] == FSM_LOCAL_AVAIL)
+		{
+			FSMLocalMap[oldPage] = FSM_LOCAL_TRIED;
+			return fsm_local_search();
+		}
+
+		/*
+		 * If we have neither a local map nor an FSM, we probably just
+		 * tried the target block in the relcache and failed, so we'll
+		 * need to create the local map.
+		 */
+		if (!fsm_allow_writes(rel, oldPage, InvalidBlockNumber, &nblocks))
+		{
+			/* Create local map and get first candidate block. */
+			fsm_local_set(rel, nblocks);
+			return fsm_local_search();
+		}
+	}
+
+	/* Normal FSM logic follows */
 
 	/* Get the location of the FSM byte representing the heap block */
 	addr = fsm_get_location(oldPage, &slot);
@@ -178,11 +243,17 @@ RecordAndGetPageWithFreeSpace(Relation rel, BlockNumber oldPage,
  * FreeSpaceMapVacuum call, which updates the upper level pages.
  */
 void
-RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk, Size spaceAvail)
+RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk,
+						Size spaceAvail, BlockNumber nblocks)
 {
 	int			new_cat = fsm_space_avail_to_cat(spaceAvail);
 	FSMAddress	addr;
 	uint16		slot;
+	BlockNumber dummy;
+
+	/* Note: We do not have a local map to update. */
+	if (!fsm_allow_writes(rel, heapBlk, nblocks, &dummy))
+		return;
 
 	/* Get the location of the FSM byte representing the heap block */
 	addr = fsm_get_location(heapBlk, &slot);
@@ -190,6 +261,16 @@ RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk, Size spaceAvail)
 	fsm_set_and_search(rel, addr, slot, new_cat, 0);
 }
 
+/*
+ * Reset every local map entry to FSM_LOCAL_ZERO.  Callers must do this once
+ * a block with enough free space was found or the relation was extended.
+ */
+void
+ClearLocalMap(void)
+{
+	memset(FSMLocalMap, FSM_LOCAL_ZERO, sizeof(FSMLocalMap));
+}
+
 /*
  * XLogRecordPageWithFreeSpace - like RecordPageWithFreeSpace, for use in
  *		WAL replay
@@ -204,11 +285,35 @@ XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
 	BlockNumber blkno;
 	Buffer		buf;
 	Page		page;
+	bool		write_to_fsm;
 
 	/* Get the location of the FSM byte representing the heap block */
 	addr = fsm_get_location(heapBlk, &slot);
 	blkno = fsm_logical_to_physical(addr);
 
+	/* This is meant to mirror the logic in fsm_allow_writes() */
+	if (heapBlk > HEAP_FSM_CREATION_THRESHOLD)
+		write_to_fsm = true;
+	else
+	{
+		/* Open the relation at smgr level */
+		SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
+
+		if (smgrexists(smgr, FSM_FORKNUM))
+			write_to_fsm = true;
+		else
+		{
+			BlockNumber heap_nblocks = smgrnblocks(smgr, MAIN_FORKNUM);
+			if (heap_nblocks > HEAP_FSM_CREATION_THRESHOLD)
+				write_to_fsm = true;
+			else
+				write_to_fsm = false;
+		}
+	}
+
+	if (!write_to_fsm)
+		return;
+
 	/* If the page doesn't exist already, extend */
 	buf = XLogReadBufferExtended(rnode, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR);
 	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
@@ -904,3 +1009,118 @@ fsm_vacuum_page(Relation rel, FSMAddress addr,
 
 	return max_avail;
 }
+
+/*
+ * Decide whether callers may write to (and thus extend) the FSM of 'rel'.
+ *
+ * For heaps we prevent extension of the FSM unless the number of pages
+ * exceeds HEAP_FSM_CREATION_THRESHOLD. For tables that don't already have
+ * an FSM, this will save an inode and a few kB of space.
+ *
+ * heapblk is the heap block about to be recorded.  nblocks is the caller's
+ * idea of the heap size, or InvalidBlockNumber if unknown; a valid value
+ * lets us avoid a system call.  *get_nblocks is set to an up-to-date
+ * relation size only when nblocks was InvalidBlockNumber and we had to
+ * count the pages ourselves; otherwise it is left untouched.
+ */
+static bool
+fsm_allow_writes(Relation rel, BlockNumber heapblk,
+				 BlockNumber nblocks, BlockNumber *get_nblocks)
+{
+	bool		skip_get_nblocks;
+
+	/* Blocks are numbered from 0, so the heap must exceed the threshold. */
+	if (heapblk >= HEAP_FSM_CREATION_THRESHOLD)
+		return true;
+
+	/* Only RELKIND_RELATION is restricted; other relkinds may always write. */
+	if (rel->rd_rel->relkind != RELKIND_RELATION)
+		return true;
+
+	/*
+	 * If the caller knows nblocks, we can avoid a system call later.
+	 * If it doesn't, maybe we have relpages from a previous VACUUM.
+	 * Since the table may have extended since then, we still have to
+	 * count the pages later if we can't return now.
+	 */
+	if (nblocks != InvalidBlockNumber)
+	{
+		if (nblocks > HEAP_FSM_CREATION_THRESHOLD)
+			return true;
+		skip_get_nblocks = true;
+	}
+	else
+	{
+		if (rel->rd_rel->relpages != InvalidBlockNumber &&
+			rel->rd_rel->relpages > HEAP_FSM_CREATION_THRESHOLD)
+			return true;
+		skip_get_nblocks = false;
+	}
+
+	/* An existing FSM stays usable, e.g. after the heap was truncated. */
+	RelationOpenSmgr(rel);
+	if (smgrexists(rel->rd_smgr, FSM_FORKNUM))
+		return true;
+
+	if (skip_get_nblocks)
+		return false;
+
+	/* Last resort: count the heap's pages ourselves. */
+	*get_nblocks = RelationGetNumberOfBlocks(rel);
+	return *get_nblocks > HEAP_FSM_CREATION_THRESHOLD;
+}
+
+/*
+ * Build or refresh the local map of candidate blocks for a heap that has
+ * no FSM.  Calling this again does not re-offer blocks that were already
+ * tried: their 'tried' bit survives, so after another backend extends the
+ * relation only the newly added blocks become 'available'.  The heap
+ * insert code is therefore expected to clear the map only once it has
+ * found a usable block or has extended the relation itself.
+ */
+static void
+fsm_local_set(Relation rel, BlockNumber nblocks)
+{
+	BlockNumber i,
+				target;
+
+	for (i = 0; i < HEAP_FSM_CREATION_THRESHOLD; i++)
+	{
+		/*
+		 * Slots past the end of the relation should already be zero, but
+		 * make sure.  Slots inside it gain the 'available' bit; one tried
+		 * before keeps its 'tried' bit, so it is not offered again.
+		 */
+		if (i >= nblocks)
+			FSMLocalMap[i] = FSM_LOCAL_ZERO;
+		else
+			FSMLocalMap[i] |= FSM_LOCAL_AVAIL;
+	}
+
+	/* The cached target block of the relation counts as already tried. */
+	target = RelationGetTargetBlock(rel);
+	if (target != InvalidBlockNumber &&
+		target < HEAP_FSM_CREATION_THRESHOLD)
+		FSMLocalMap[target] = FSM_LOCAL_TRIED;
+}
+
+/*
+ * Scan the local map from the highest block number down to block 0 and
+ * return the first block still marked as available, or InvalidBlockNumber
+ * if no such block remains.  For use when there is no FSM.
+ */
+static BlockNumber
+fsm_local_search(void)
+{
+	BlockNumber		blkno;
+
+	/* Index with blkno - 1 so the countdown ends cleanly after block 0. */
+	for (blkno = HEAP_FSM_CREATION_THRESHOLD; blkno > 0; blkno--)
+	{
+		if (FSMLocalMap[blkno - 1] == FSM_LOCAL_AVAIL)
+			return blkno - 1;
+	}
+
+	/* No entry in the map is marked available any more. */
+	return InvalidBlockNumber;
+}
diff --git a/src/backend/storage/freespace/indexfsm.c b/src/backend/storage/freespace/indexfsm.c
index e21047b96f..d8fd29a7eb 100644
--- a/src/backend/storage/freespace/indexfsm.c
+++ b/src/backend/storage/freespace/indexfsm.c
@@ -51,7 +51,7 @@ GetFreeIndexPage(Relation rel)
 void
 RecordFreeIndexPage(Relation rel, BlockNumber freeBlock)
 {
-	RecordPageWithFreeSpace(rel, freeBlock, BLCKSZ - 1);
+	RecordPageWithFreeSpace(rel, freeBlock, BLCKSZ - 1, InvalidBlockNumber);
 }
 
 
@@ -61,7 +61,7 @@ RecordFreeIndexPage(Relation rel, BlockNumber freeBlock)
 void
 RecordUsedIndexPage(Relation rel, BlockNumber usedBlock)
 {
-	RecordPageWithFreeSpace(rel, usedBlock, 0);
+	RecordPageWithFreeSpace(rel, usedBlock, 0, InvalidBlockNumber);
 }
 
 /*
diff --git a/src/include/storage/freespace.h b/src/include/storage/freespace.h
index 726eb30fb8..0d130ddfe0 100644
--- a/src/include/storage/freespace.h
+++ b/src/include/storage/freespace.h
@@ -18,6 +18,10 @@
 #include "storage/relfilenode.h"
 #include "utils/relcache.h"
 
+/* Only extend a heap's FSM if the heap has more than this many blocks. */
+/* TODO: Performance-test different values. */
+#define HEAP_FSM_CREATION_THRESHOLD 8
+
 /* prototypes for public functions in freespace.c */
 extern Size GetRecordedFreeSpace(Relation rel, BlockNumber heapBlk);
 extern BlockNumber GetPageWithFreeSpace(Relation rel, Size spaceNeeded);
@@ -26,7 +30,8 @@ extern BlockNumber RecordAndGetPageWithFreeSpace(Relation rel,
 							  Size oldSpaceAvail,
 							  Size spaceNeeded);
 extern void RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk,
-						Size spaceAvail);
+						Size spaceAvail, BlockNumber nblocks);
+extern void ClearLocalMap(void);
 extern void XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
 							Size spaceAvail);
 
-- 
2.17.1

