From 60f76fc83ee8752362e037c1e19ed089d861e026 Mon Sep 17 00:00:00 2001
From: Sokolov Yura <funny.falcon@postgrespro.ru>
Date: Mon, 3 Jul 2017 15:14:07 +0300
Subject: [PATCH] fsm&vacuum: write increasing of free space on upper levels

Every RecordPageWithFreeSpace update upper levels, if amount of free
spaces increased.
Also, do FreeSpaceMapVacuum after scanning heap and before vacuuming
indices.
---
 src/backend/commands/vacuumlazy.c         | 16 +++++-----
 src/backend/storage/freespace/freespace.c | 49 ++++++++++++++++++++++++++-----
 src/backend/storage/freespace/fsmpage.c   |  4 ++-
 src/include/storage/fsm_internals.h       |  2 +-
 4 files changed, 53 insertions(+), 18 deletions(-)

diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index fc9c4f0fb1..a7fff0c5ae 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -595,7 +595,6 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 					maxoff;
 		bool		tupgone,
 					hastup;
-		int			prev_dead_count;
 		int			nfrozen;
 		Size		freespace;
 		bool		all_visible_according_to_vm = false;
@@ -925,7 +924,6 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 		has_dead_tuples = false;
 		nfrozen = 0;
 		hastup = false;
-		prev_dead_count = vacrelstats->num_dead_tuples;
 		maxoff = PageGetMaxOffsetNumber(page);
 
 		/*
@@ -1245,16 +1243,16 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			vacrelstats->nonempty_pages = blkno + 1;
 
 		/*
-		 * If we remembered any tuples for deletion, then the page will be
-		 * visited again by lazy_vacuum_heap, which will compute and record
-		 * its post-compaction free space.  If not, then we're done with this
-		 * page, so remember its free space as-is.  (This path will always be
-		 * taken if there are no indexes.)
+		 * heap_page_prune could free a bit of space. Lets record it
+		 * immediatly despite it will by recorded again in lazy_vacuum_heap
+		 * after more compaction.
 		 */
-		if (vacrelstats->num_dead_tuples == prev_dead_count)
-			RecordPageWithFreeSpace(onerel, blkno, freespace);
+		RecordPageWithFreeSpace(onerel, blkno, freespace);
 	}
 
+	/* fix up all tiny bits of freed space before vacuuming indices */
+	FreeSpaceMapVacuum(onerel);
+
 	/* report that everything is scanned and vacuumed */
 	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
 
diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index 4648473523..ca0c356f28 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -107,6 +107,8 @@ static Size fsm_space_cat_to_avail(uint8 cat);
 /* workhorse functions for various operations */
 static int fsm_set_and_search(Relation rel, FSMAddress addr, uint16 slot,
 				   uint8 newValue, uint8 minValue);
+static void fsm_set_recursive(Relation rel, FSMAddress addr, uint16 slot,
+				  uint8 new_cat, bool only_increase);
 static BlockNumber fsm_search(Relation rel, uint8 min_cat);
 static uint8 fsm_vacuum_page(Relation rel, FSMAddress addr, bool *eof);
 static BlockNumber fsm_get_lastblckno(Relation rel, FSMAddress addr);
@@ -173,9 +175,8 @@ RecordAndGetPageWithFreeSpace(Relation rel, BlockNumber oldPage,
 /*
  * RecordPageWithFreeSpace - update info about a page.
  *
- * Note that if the new spaceAvail value is higher than the old value stored
- * in the FSM, the space might not become visible to searchers until the next
- * FreeSpaceMapVacuum call, which updates the upper level pages.
+ * It tries to updates upper levels immediatly, if new spaceAvail is higher
+ * than the old value stored in the FSM.
  */
 void
 RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk, Size spaceAvail)
@@ -187,7 +188,7 @@ RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk, Size spaceAvail)
 	/* Get the location of the FSM byte representing the heap block */
 	addr = fsm_get_location(heapBlk, &slot);
 
-	fsm_set_and_search(rel, addr, slot, new_cat, 0);
+	fsm_set_recursive(rel, addr, slot, new_cat, false);
 }
 
 /*
@@ -257,7 +258,7 @@ XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
 	if (PageIsNew(page))
 		PageInit(page, BLCKSZ, 0);
 
-	if (fsm_set_avail(page, slot, new_cat))
+	if (fsm_set_avail(page, slot, new_cat, false))
 		MarkBufferDirtyHint(buf, false);
 	UnlockReleaseBuffer(buf);
 }
@@ -677,7 +678,7 @@ fsm_set_and_search(Relation rel, FSMAddress addr, uint16 slot,
 
 	page = BufferGetPage(buf);
 
-	if (fsm_set_avail(page, slot, newValue))
+	if (fsm_set_avail(page, slot, newValue, false))
 		MarkBufferDirtyHint(buf, false);
 
 	if (minValue != 0)
@@ -693,6 +694,40 @@ fsm_set_and_search(Relation rel, FSMAddress addr, uint16 slot,
 	return newslot;
 }
 
+
+/*
+ * Set value in given FSM page and slot. If maximum value of a FSM page
+ * increased, then update value on upper FSM page.
+ */
+static void
+fsm_set_recursive(Relation rel, FSMAddress addr, uint16 slot, uint8 new_cat, bool only_increase)
+{
+	Buffer		buf;
+	Page		page;
+	uint8		max_avail;
+	uint8		max_avail2;
+
+	buf = fsm_readbuf(rel, addr, true);
+	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+
+	page = BufferGetPage(buf);
+
+	max_avail = fsm_get_max_avail(page);
+	if (fsm_set_avail(page, slot, new_cat, only_increase))
+		MarkBufferDirtyHint(buf, false);
+	max_avail2 = fsm_get_max_avail(page);
+
+	UnlockReleaseBuffer(buf);
+
+	if (max_avail < new_cat && addr.level != FSM_ROOT_LEVEL)
+	{
+		Assert(max_avail2 >= new_cat);
+
+		addr = fsm_get_parent(addr, &slot);
+		fsm_set_recursive(rel, addr, slot, max_avail2, true);
+	}
+}
+
 /*
  * Search the tree for a heap page with at least min_cat of free space
  */
@@ -828,7 +863,7 @@ fsm_vacuum_page(Relation rel, FSMAddress addr, bool *eof_p)
 			if (fsm_get_avail(page, slot) != child_avail)
 			{
 				LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
-				fsm_set_avail(BufferGetPage(buf), slot, child_avail);
+				fsm_set_avail(BufferGetPage(buf), slot, child_avail, false);
 				MarkBufferDirtyHint(buf, false);
 				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 			}
diff --git a/src/backend/storage/freespace/fsmpage.c b/src/backend/storage/freespace/fsmpage.c
index 987a2f5e53..97999adfad 100644
--- a/src/backend/storage/freespace/fsmpage.c
+++ b/src/backend/storage/freespace/fsmpage.c
@@ -60,7 +60,7 @@ rightneighbor(int x)
  * The caller must hold an exclusive lock on the page.
  */
 bool
-fsm_set_avail(Page page, int slot, uint8 value)
+fsm_set_avail(Page page, int slot, uint8 value, bool only_increasing)
 {
 	int			nodeno = NonLeafNodesPerPage + slot;
 	FSMPage		fsmpage = (FSMPage) PageGetContents(page);
@@ -73,6 +73,8 @@ fsm_set_avail(Page page, int slot, uint8 value)
 	/* If the value hasn't changed, we don't need to do anything */
 	if (oldvalue == value && value <= fsmpage->fp_nodes[0])
 		return false;
+	if (only_increasing && oldvalue > value && value <= fsmpage->fp_nodes[0])
+		return false;
 
 	fsmpage->fp_nodes[nodeno] = value;
 
diff --git a/src/include/storage/fsm_internals.h b/src/include/storage/fsm_internals.h
index 4eb3fc12b1..12f4265c52 100644
--- a/src/include/storage/fsm_internals.h
+++ b/src/include/storage/fsm_internals.h
@@ -65,7 +65,7 @@ extern int fsm_search_avail(Buffer buf, uint8 min_cat, bool advancenext,
 				 bool exclusive_lock_held);
 extern uint8 fsm_get_avail(Page page, int slot);
 extern uint8 fsm_get_max_avail(Page page);
-extern bool fsm_set_avail(Page page, int slot, uint8 value);
+extern bool fsm_set_avail(Page page, int slot, uint8 value, bool only_increasing);
 extern bool fsm_truncate_avail(Page page, int nslots);
 extern bool fsm_rebuild_page(Page page);
 
-- 
2.11.0

