From 77fe22011ad412dffc960408e28994bb66d0b06a Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathan@postgresql.org>
Date: Thu, 20 Jul 2023 10:19:08 -0700
Subject: [PATCH v9 3/4] Convert pg_restore's ready_list to a priority queue.

Presently, we spend a lot of time sorting this list so that we pick
the largest items first.  With many tables, this sorting can become
a significant bottleneck.  There are a couple of reports from the
field about this, and it is easily reproducible, so this is not a
hypothetical issue.

This commit improves the performance of pg_restore with many tables
by converting its ready_list to a priority queue, i.e., a binary
heap.  We will first try to run the highest priority item, but if
it cannot be chosen due to the lock heuristic, we'll do a
sequential scan through the heap nodes until we find one that is
runnable.  This means that we might end up picking an item with
much lower priority, but since we expect that we'll typically be
able to choose one of the first few nodes, we should usually pick
an item with a relatively high priority.

On my machine, a basic test with 100,000 tables takes 11.5 minutes
without this patch and 1.5 minutes with it.  Pierre Ducroquet
claims to see a speedup from 30 minutes to 23 minutes for a
real-world dump of over 50,000 tables.

Suggested-by: Tom Lane
Tested-by: Pierre Ducroquet
Reviewed-by: Tom Lane
Discussion: https://postgr.es/m/3612876.1689443232%40sss.pgh.pa.us
---
 src/bin/pg_dump/pg_backup_archiver.c | 197 ++++++++-------------------
 1 file changed, 57 insertions(+), 140 deletions(-)

diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 4d83381d84..ea17ca4559 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -34,6 +34,7 @@
 #include "compress_io.h"
 #include "dumputils.h"
 #include "fe_utils/string_utils.h"
+#include "lib/binaryheap.h"
 #include "lib/stringinfo.h"
 #include "libpq/libpq-fs.h"
 #include "parallel.h"
@@ -44,24 +45,6 @@
 #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
 #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
 
-/*
- * State for tracking TocEntrys that are ready to process during a parallel
- * restore.  (This used to be a list, and we still call it that, though now
- * it's really an array so that we can apply qsort to it.)
- *
- * tes[] is sized large enough that we can't overrun it.
- * The valid entries are indexed first_te .. last_te inclusive.
- * We periodically sort the array to bring larger-by-dataLength entries to
- * the front; "sorted" is true if the valid entries are known sorted.
- */
-typedef struct _parallelReadyList
-{
-	TocEntry  **tes;			/* Ready-to-dump TocEntrys */
-	int			first_te;		/* index of first valid entry in tes[] */
-	int			last_te;		/* index of last valid entry in tes[] */
-	bool		sorted;			/* are valid entries currently sorted? */
-} ParallelReadyList;
-
 
 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
 							   const pg_compress_specification compression_spec,
@@ -111,16 +94,12 @@ static void restore_toc_entries_postfork(ArchiveHandle *AH,
 static void pending_list_header_init(TocEntry *l);
 static void pending_list_append(TocEntry *l, TocEntry *te);
 static void pending_list_remove(TocEntry *te);
-static void ready_list_init(ParallelReadyList *ready_list, int tocCount);
-static void ready_list_free(ParallelReadyList *ready_list);
-static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te);
-static void ready_list_remove(ParallelReadyList *ready_list, int i);
-static void ready_list_sort(ParallelReadyList *ready_list);
-static int	TocEntrySizeCompare(const void *p1, const void *p2);
-static void move_to_ready_list(TocEntry *pending_list,
-							   ParallelReadyList *ready_list,
+static int	TocEntrySizeCompareQsort(const void *p1, const void *p2);
+static int	TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg);
+static void move_to_ready_heap(TocEntry *pending_list,
+							   binaryheap *ready_heap,
 							   RestorePass pass);
-static TocEntry *pop_next_work_item(ParallelReadyList *ready_list,
+static TocEntry *pop_next_work_item(binaryheap *ready_heap,
 									ParallelState *pstate);
 static void mark_dump_job_done(ArchiveHandle *AH,
 							   TocEntry *te,
@@ -135,7 +114,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
 static void repoint_table_dependencies(ArchiveHandle *AH);
 static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
-								ParallelReadyList *ready_list);
+								binaryheap *ready_heap);
 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
 
@@ -2384,7 +2363,7 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
 		}
 
 		if (ntes > 1)
-			qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompare);
+			qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort);
 
 		for (int i = 0; i < ntes; i++)
 			DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
@@ -3984,7 +3963,7 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
 
 			(void) restore_toc_entry(AH, next_work_item, false);
 
-			/* Reduce dependencies, but don't move anything to ready_list */
+			/* Reduce dependencies, but don't move anything to ready_heap */
 			reduce_dependencies(AH, next_work_item, NULL);
 		}
 		else
@@ -4027,24 +4006,26 @@ static void
 restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 							 TocEntry *pending_list)
 {
-	ParallelReadyList ready_list;
+	binaryheap *ready_heap;
 	TocEntry   *next_work_item;
 
 	pg_log_debug("entering restore_toc_entries_parallel");
 
-	/* Set up ready_list with enough room for all known TocEntrys */
-	ready_list_init(&ready_list, AH->tocCount);
+	/* Set up ready_heap with enough room for all known TocEntrys */
+	ready_heap = binaryheap_allocate(AH->tocCount,
+									 TocEntrySizeCompareBinaryheap,
+									 NULL);
 
 	/*
 	 * The pending_list contains all items that we need to restore.  Move all
-	 * items that are available to process immediately into the ready_list.
+	 * items that are available to process immediately into the ready_heap.
 	 * After this setup, the pending list is everything that needs to be done
-	 * but is blocked by one or more dependencies, while the ready list
+	 * but is blocked by one or more dependencies, while the ready heap
 	 * contains items that have no remaining dependencies and are OK to
 	 * process in the current restore pass.
 	 */
 	AH->restorePass = RESTORE_PASS_MAIN;
-	move_to_ready_list(pending_list, &ready_list, AH->restorePass);
+	move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
 
 	/*
 	 * main parent loop
@@ -4058,7 +4039,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 	for (;;)
 	{
 		/* Look for an item ready to be dispatched to a worker */
-		next_work_item = pop_next_work_item(&ready_list, pstate);
+		next_work_item = pop_next_work_item(ready_heap, pstate);
 		if (next_work_item != NULL)
 		{
 			/* If not to be restored, don't waste time launching a worker */
@@ -4068,7 +4049,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 							next_work_item->dumpId,
 							next_work_item->desc, next_work_item->tag);
 				/* Update its dependencies as though we'd completed it */
-				reduce_dependencies(AH, next_work_item, &ready_list);
+				reduce_dependencies(AH, next_work_item, ready_heap);
 				/* Loop around to see if anything else can be dispatched */
 				continue;
 			}
@@ -4079,7 +4060,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 
 			/* Dispatch to some worker */
 			DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
-								   mark_restore_job_done, &ready_list);
+								   mark_restore_job_done, ready_heap);
 		}
 		else if (IsEveryWorkerIdle(pstate))
 		{
@@ -4093,7 +4074,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 			/* Advance to next restore pass */
 			AH->restorePass++;
 			/* That probably allows some stuff to be made ready */
-			move_to_ready_list(pending_list, &ready_list, AH->restorePass);
+			move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
 			/* Loop around to see if anything's now ready */
 			continue;
 		}
@@ -4122,10 +4103,10 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 					   next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
 	}
 
-	/* There should now be nothing in ready_list. */
-	Assert(ready_list.first_te > ready_list.last_te);
+	/* There should now be nothing in ready_heap. */
+	Assert(binaryheap_empty(ready_heap));
 
-	ready_list_free(&ready_list);
+	binaryheap_free(ready_heap);
 
 	pg_log_info("finished main parallel loop");
 }
@@ -4225,80 +4206,9 @@ pending_list_remove(TocEntry *te)
 }
 
 
-/*
- * Initialize the ready_list with enough room for up to tocCount entries.
- */
-static void
-ready_list_init(ParallelReadyList *ready_list, int tocCount)
-{
-	ready_list->tes = (TocEntry **)
-		pg_malloc(tocCount * sizeof(TocEntry *));
-	ready_list->first_te = 0;
-	ready_list->last_te = -1;
-	ready_list->sorted = false;
-}
-
-/*
- * Free storage for a ready_list.
- */
-static void
-ready_list_free(ParallelReadyList *ready_list)
-{
-	pg_free(ready_list->tes);
-}
-
-/* Add te to the ready_list */
-static void
-ready_list_insert(ParallelReadyList *ready_list, TocEntry *te)
-{
-	ready_list->tes[++ready_list->last_te] = te;
-	/* List is (probably) not sorted anymore. */
-	ready_list->sorted = false;
-}
-
-/* Remove the i'th entry in the ready_list */
-static void
-ready_list_remove(ParallelReadyList *ready_list, int i)
-{
-	int			f = ready_list->first_te;
-
-	Assert(i >= f && i <= ready_list->last_te);
-
-	/*
-	 * In the typical case where the item to be removed is the first ready
-	 * entry, we need only increment first_te to remove it.  Otherwise, move
-	 * the entries before it to compact the list.  (This preserves sortedness,
-	 * if any.)  We could alternatively move the entries after i, but there
-	 * are typically many more of those.
-	 */
-	if (i > f)
-	{
-		TocEntry  **first_te_ptr = &ready_list->tes[f];
-
-		memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *));
-	}
-	ready_list->first_te++;
-}
-
-/* Sort the ready_list into the desired order */
-static void
-ready_list_sort(ParallelReadyList *ready_list)
-{
-	if (!ready_list->sorted)
-	{
-		int			n = ready_list->last_te - ready_list->first_te + 1;
-
-		if (n > 1)
-			qsort(ready_list->tes + ready_list->first_te, n,
-				  sizeof(TocEntry *),
-				  TocEntrySizeCompare);
-		ready_list->sorted = true;
-	}
-}
-
 /* qsort comparator for sorting TocEntries by dataLength */
 static int
-TocEntrySizeCompare(const void *p1, const void *p2)
+TocEntrySizeCompareQsort(const void *p1, const void *p2)
 {
 	const TocEntry *te1 = *(const TocEntry *const *) p1;
 	const TocEntry *te2 = *(const TocEntry *const *) p2;
@@ -4318,17 +4228,24 @@ TocEntrySizeCompare(const void *p1, const void *p2)
 	return 0;
 }
 
+/* binaryheap (max-heap) comparator: negate qsort order so largest is first */
+static int
+TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
+{
+	return -TocEntrySizeCompareQsort(&p1, &p2);
+}
+
 
 /*
- * Move all immediately-ready items from pending_list to ready_list.
+ * Move all immediately-ready items from pending_list to ready_heap.
  *
  * Items are considered ready if they have no remaining dependencies and
  * they belong in the current restore pass.  (See also reduce_dependencies,
  * which applies the same logic one-at-a-time.)
  */
 static void
-move_to_ready_list(TocEntry *pending_list,
-				   ParallelReadyList *ready_list,
+move_to_ready_heap(TocEntry *pending_list,
+				   binaryheap *ready_heap,
 				   RestorePass pass)
 {
 	TocEntry   *te;
@@ -4344,38 +4261,38 @@ move_to_ready_list(TocEntry *pending_list,
 		{
 			/* Remove it from pending_list ... */
 			pending_list_remove(te);
-			/* ... and add to ready_list */
-			ready_list_insert(ready_list, te);
+			/* ... and add to ready_heap */
+			binaryheap_add(ready_heap, te);
 		}
 	}
 }
 
 /*
  * Find the next work item (if any) that is capable of being run now,
- * and remove it from the ready_list.
+ * and remove it from the ready_heap.
  *
  * Returns the item, or NULL if nothing is runnable.
  *
  * To qualify, the item must have no remaining dependencies
  * and no requirements for locks that are incompatible with
- * items currently running.  Items in the ready_list are known to have
+ * items currently running.  Items in the ready_heap are known to have
  * no remaining dependencies, but we have to check for lock conflicts.
  */
 static TocEntry *
-pop_next_work_item(ParallelReadyList *ready_list,
+pop_next_work_item(binaryheap *ready_heap,
 				   ParallelState *pstate)
 {
 	/*
-	 * Sort the ready_list so that we'll tackle larger jobs first.
-	 */
-	ready_list_sort(ready_list);
-
-	/*
-	 * Search the ready_list until we find a suitable item.
+	 * Search the ready_heap until we find a suitable item.  Note that we do a
+	 * sequential scan through the heap nodes, so even though we will first
+	 * try to choose the highest-priority item, we might end up picking
+	 * something with a much lower priority.  However, it is expected that we
+	 * will typically be able to pick one of the first few items, which should
+	 * usually have a relatively high priority.
 	 */
-	for (int i = ready_list->first_te; i <= ready_list->last_te; i++)
+	for (int i = 0; i < binaryheap_size(ready_heap); i++)
 	{
-		TocEntry   *te = ready_list->tes[i];
+		TocEntry   *te = (TocEntry *) binaryheap_get_node(ready_heap, i);
 		bool		conflicts = false;
 
 		/*
@@ -4401,7 +4318,7 @@ pop_next_work_item(ParallelReadyList *ready_list,
 			continue;
 
 		/* passed all tests, so this item can run */
-		ready_list_remove(ready_list, i);
+		binaryheap_remove_node(ready_heap, i);
 		return te;
 	}
 
@@ -4447,7 +4364,7 @@ mark_restore_job_done(ArchiveHandle *AH,
 					  int status,
 					  void *callback_data)
 {
-	ParallelReadyList *ready_list = (ParallelReadyList *) callback_data;
+	binaryheap *ready_heap = (binaryheap *) callback_data;
 
 	pg_log_info("finished item %d %s %s",
 				te->dumpId, te->desc, te->tag);
@@ -4465,7 +4382,7 @@ mark_restore_job_done(ArchiveHandle *AH,
 		pg_fatal("worker process failed: exit code %d",
 				 status);
 
-	reduce_dependencies(AH, te, ready_list);
+	reduce_dependencies(AH, te, ready_heap);
 }
 
 
@@ -4708,11 +4625,11 @@ identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
 /*
  * Remove the specified TOC entry from the depCounts of items that depend on
  * it, thereby possibly making them ready-to-run.  Any pending item that
- * becomes ready should be moved to the ready_list, if that's provided.
+ * becomes ready should be moved to the ready_heap, if that's provided.
  */
 static void
 reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
-					ParallelReadyList *ready_list)
+					binaryheap *ready_heap)
 {
 	int			i;
 
@@ -4730,18 +4647,18 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
 		 * the current restore pass, and it is currently a member of the
 		 * pending list (that check is needed to prevent double restore in
 		 * some cases where a list-file forces out-of-order restoring).
-		 * However, if ready_list == NULL then caller doesn't want any list
+		 * However, if ready_heap == NULL then caller doesn't want any list
 		 * memberships changed.
 		 */
 		if (otherte->depCount == 0 &&
 			_tocEntryRestorePass(otherte) == AH->restorePass &&
 			otherte->pending_prev != NULL &&
-			ready_list != NULL)
+			ready_heap != NULL)
 		{
 			/* Remove it from pending list ... */
 			pending_list_remove(otherte);
-			/* ... and add to ready_list */
-			ready_list_insert(ready_list, otherte);
+			/* ... and add to ready_heap */
+			binaryheap_add(ready_heap, otherte);
 		}
 	}
 }
-- 
2.25.1

