*** a/src/bin/pg_dump/Makefile
--- b/src/bin/pg_dump/Makefile
***************
*** 20,26 **** override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
  
  OBJS=	pg_backup_archiver.o pg_backup_db.o pg_backup_custom.o \
  	pg_backup_null.o pg_backup_tar.o \
! 	pg_backup_directory.o dumpmem.o dumputils.o compress_io.o $(WIN32RES)
  
  KEYWRDOBJS = keywords.o kwlookup.o
  
--- 20,27 ----
  
  OBJS=	pg_backup_archiver.o pg_backup_db.o pg_backup_custom.o \
  	pg_backup_null.o pg_backup_tar.o \
! 	pg_backup_directory.o dumpmem.o dumputils.o compress_io.o \
! 	parallel.o $(WIN32RES)
  
  KEYWRDOBJS = keywords.o kwlookup.o
  
*** a/src/bin/pg_dump/compress_io.c
--- b/src/bin/pg_dump/compress_io.c
***************
*** 55,60 ****
--- 55,61 ----
  #include "compress_io.h"
  #include "dumpmem.h"
  #include "dumputils.h"
+ #include "parallel.h"
  
  /*----------------------
   * Compressor API
***************
*** 183,188 **** size_t
--- 184,192 ----
  WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
  				   const void *data, size_t dLen)
  {
+ 	/* Are we aborting? */
+ 	checkAborting(AH);
+ 
  	switch (cs->comprAlg)
  	{
  		case COMPR_ALG_LIBZ:
***************
*** 352,357 **** ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF)
--- 356,364 ----
  	/* no minimal chunk size for zlib */
  	while ((cnt = readF(AH, &buf, &buflen)))
  	{
+ 		/* Are we aborting? */
+ 		checkAborting(AH);
+ 
  		zp->next_in = (void *) buf;
  		zp->avail_in = cnt;
  
***************
*** 412,417 **** ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF)
--- 419,427 ----
  
  	while ((cnt = readF(AH, &buf, &buflen)))
  	{
+ 		/* Are we aborting? */
+ 		checkAborting(AH);
+ 
  		ahwrite(buf, 1, cnt, AH);
  	}
  
*** a/src/bin/pg_dump/dumputils.c
--- b/src/bin/pg_dump/dumputils.c
***************
*** 16,21 ****
--- 16,22 ----
  
  #include <ctype.h>
  
+ #include "dumpmem.h"
  #include "dumputils.h"
  
  #include "parser/keywords.h"
***************
*** 38,43 **** static struct
--- 39,45 ----
  }	on_exit_nicely_list[MAX_ON_EXIT_NICELY];
  
  static int	on_exit_nicely_index;
+ void (*on_exit_msg_func)(const char *modulename, const char *fmt, va_list ap) = vwrite_msg;
  
  #define supports_grant_options(version) ((version) >= 70400)
  
***************
*** 48,58 **** static bool parseAclItem(const char *item, const char *type,
--- 50,70 ----
  static char *copyAclUserName(PQExpBuffer output, char *input);
  static void AddAcl(PQExpBuffer aclbuf, const char *keyword,
  	   const char *subname);
+ static PQExpBuffer getThreadLocalPQExpBuffer(void);
  
  #ifdef WIN32
+ static void shutdown_parallel_dump_utils(int code, void *unused);
  static bool parallel_init_done = false;
  static DWORD tls_index;
  static DWORD mainThreadId;
+ 
+ static void
+ shutdown_parallel_dump_utils(int code, void *unused)
+ {
+ 	/* Call the cleanup function only from the main thread */
+ 	if (mainThreadId == GetCurrentThreadId())
+ 		WSACleanup();
+ }
  #endif
  
  void
***************
*** 61,83 **** init_parallel_dump_utils(void)
  #ifdef WIN32
  	if (!parallel_init_done)
  	{
  		tls_index = TlsAlloc();
- 		parallel_init_done = true;
  		mainThreadId = GetCurrentThreadId();
  	}
  #endif
  }
  
  /*
!  *	Quotes input string if it's not a legitimate SQL identifier as-is.
!  *
!  *	Note that the returned string must be used before calling fmtId again,
!  *	since we re-use the same return buffer each time.  Non-reentrant but
!  *	reduces memory leakage. (On Windows the memory leakage will be one buffer
!  *	per thread, which is at least better than one per call).
   */
! const char *
! fmtId(const char *rawid)
  {
  	/*
  	 * The Tls code goes awry if we use a static var, so we provide for both
--- 73,101 ----
  #ifdef WIN32
  	if (!parallel_init_done)
  	{
+ 		WSADATA	wsaData;
+ 		int		err;
+ 
  		tls_index = TlsAlloc();
  		mainThreadId = GetCurrentThreadId();
+ 		err = WSAStartup(MAKEWORD(2, 2), &wsaData);
+ 		if (err != 0)
+ 		{
+ 			fprintf(stderr, _("WSAStartup failed: %d\n"), err);
+ 			exit_nicely(1);
+ 		}
+ 		on_exit_nicely(shutdown_parallel_dump_utils, NULL);
+ 		parallel_init_done = true;
  	}
  #endif
  }
  
  /*
!  * Non-reentrant but reduces memory leakage. (On Windows the memory leakage
!  * will be one buffer per thread, which is at least better than one per call).
   */
! static PQExpBuffer
! getThreadLocalPQExpBuffer(void)
  {
  	/*
  	 * The Tls code goes awry if we use a static var, so we provide for both
***************
*** 86,94 **** fmtId(const char *rawid)
  	static PQExpBuffer s_id_return = NULL;
  	PQExpBuffer id_return;
  
- 	const char *cp;
- 	bool		need_quotes = false;
- 
  #ifdef WIN32
  	if (parallel_init_done)
  		id_return = (PQExpBuffer) TlsGetValue(tls_index);		/* 0 when not set */
--- 104,109 ----
***************
*** 118,123 **** fmtId(const char *rawid)
--- 133,155 ----
  
  	}
  
+ 	return id_return;
+ }
+ 
+ /*
+  *	Quotes input string if it's not a legitimate SQL identifier as-is.
+  *
+  *	Note that the returned string must be used before calling fmtId again,
+  *	since we re-use the same return buffer each time.
+  */
+ const char *
+ fmtId(const char *rawid)
+ {
+ 	PQExpBuffer id_return = getThreadLocalPQExpBuffer();
+ 
+ 	const char *cp;
+ 	bool		need_quotes = false;
+ 
  	/*
  	 * These checks need to match the identifier production in scan.l. Don't
  	 * use islower() etc.
***************
*** 185,190 **** fmtId(const char *rawid)
--- 217,251 ----
  	return id_return->data;
  }
  
+ /*
+  * fmtQualifiedId - convert a qualified name to the proper format for
+  * the source database.
+  *
+  * Like fmtId, use the result before calling again.
+  *
+  * Since fmtId() also uses getThreadLocalPQExpBuffer(), we must not fetch
+  * our own thread-local buffer until we are done calling fmtId().
+  */
+ const char *
+ fmtQualifiedId(int remoteVersion, const char *schema, const char *id)
+ {
+ 	PQExpBuffer id_return;
+ 	PQExpBuffer lcl_pqexp = createPQExpBuffer();
+ 
+ 	/* Suppress schema name if fetching from pre-7.3 DB */
+ 	if (remoteVersion >= 70300 && schema && *schema)
+ 	{
+ 		appendPQExpBuffer(lcl_pqexp, "%s.", fmtId(schema));
+ 	}
+ 	appendPQExpBuffer(lcl_pqexp, "%s", fmtId(id));
+ 
+ 	id_return = getThreadLocalPQExpBuffer();
+ 
+ 	appendPQExpBuffer(id_return, "%s", lcl_pqexp->data);
+ 	destroyPQExpBuffer(lcl_pqexp);
+ 
+ 	return id_return->data;
+ }
  
  /*
   * Convert a string value to an SQL string literal and append it to
***************
*** 1312,1318 **** exit_horribly(const char *modulename, const char *fmt,...)
  	va_list		ap;
  
  	va_start(ap, fmt);
! 	vwrite_msg(modulename, fmt, ap);
  	va_end(ap);
  
  	exit_nicely(1);
--- 1373,1379 ----
  	va_list		ap;
  
  	va_start(ap, fmt);
! 	on_exit_msg_func(modulename, fmt, ap);
  	va_end(ap);
  
  	exit_nicely(1);
*** a/src/bin/pg_dump/dumputils.h
--- b/src/bin/pg_dump/dumputils.h
***************
*** 34,39 **** extern const char *progname;
--- 34,41 ----
  
  extern void init_parallel_dump_utils(void);
  extern const char *fmtId(const char *identifier);
+ extern const char *fmtQualifiedId(int remoteVersion,
+ 								  const char *schema, const char *id);
  extern void appendStringLiteral(PQExpBuffer buf, const char *str,
  					int encoding, bool std_strings);
  extern void appendStringLiteralConn(PQExpBuffer buf, const char *str,
***************
*** 72,77 **** __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 0)));
--- 74,81 ----
  extern void
  exit_horribly(const char *modulename, const char *fmt,...)
  __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3), noreturn));
+ extern void (*on_exit_msg_func)(const char *modulename, const char *fmt, va_list ap)
+ 				__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 0)));
  extern void on_exit_nicely(on_exit_nicely_callback function, void *arg);
  extern void exit_nicely(int code) __attribute__((noreturn));
  
*** a/src/bin/pg_dump/pg_backup.h
--- b/src/bin/pg_dump/pg_backup.h
***************
*** 81,89 **** struct Archive
--- 81,93 ----
  	int			minRemoteVersion;		/* allowable range */
  	int			maxRemoteVersion;
  
+ 	int			numWorkers;		/* number of parallel processes */
+ 	char	   *sync_snapshot_id;  /* sync snapshot id for parallel operation */
+ 
  	/* info needed for string escaping */
  	int			encoding;		/* libpq code for client_encoding */
  	bool		std_strings;	/* standard_conforming_strings */
+ 	char	   *use_role;		/* Issue SET ROLE to this */
  
  	/* error handling */
  	bool		exit_on_error;	/* whether to exit on SQL errors... */
***************
*** 141,147 **** typedef struct _restoreOptions
  	int			suppressDumpWarnings;	/* Suppress output of WARNING entries
  										 * to stderr */
  	bool		single_txn;
- 	int			number_of_jobs;
  
  	bool	   *idWanted;		/* array showing which dump IDs to emit */
  } RestoreOptions;
--- 145,150 ----
***************
*** 195,200 **** extern void PrintTOCSummary(Archive *AH, RestoreOptions *ropt);
--- 198,206 ----
  
  extern RestoreOptions *NewRestoreOptions(void);
  
+ /* We have one in pg_dump.c and another one in pg_restore.c */
+ extern void _SetupWorker(Archive *AHX, RestoreOptions *ropt);
+ 
  /* Rearrange and filter TOC entries */
  extern void SortTocFromFile(Archive *AHX, RestoreOptions *ropt);
  
*** a/src/bin/pg_dump/pg_backup_archiver.c
--- b/src/bin/pg_dump/pg_backup_archiver.c
***************
*** 23,30 ****
--- 23,32 ----
  #include "pg_backup_db.h"
  #include "dumpmem.h"
  #include "dumputils.h"
+ #include "parallel.h"
  
  #include <ctype.h>
+ #include <fcntl.h>
  #include <unistd.h>
  #include <sys/stat.h>
  #include <sys/types.h>
***************
*** 36,107 ****
  
  #include "libpq/libpq-fs.h"
  
- /*
-  * Special exit values from worker children.  We reserve 0 for normal
-  * success; 1 and other small values should be interpreted as crashes.
-  */
- #define WORKER_CREATE_DONE		10
- #define WORKER_INHIBIT_DATA		11
- #define WORKER_IGNORED_ERRORS	12
- 
- /*
-  * Unix uses exit to return result from worker child, so function is void.
-  * Windows thread result comes via function return.
-  */
- #ifndef WIN32
- #define parallel_restore_result void
- #else
- #define parallel_restore_result DWORD
- #endif
- 
- /* IDs for worker children are either PIDs or thread handles */
- #ifndef WIN32
- #define thandle pid_t
- #else
- #define thandle HANDLE
- #endif
- 
- typedef struct ParallelStateEntry
- {
- #ifdef WIN32
- 	unsigned int threadId;
- #else
- 	pid_t		pid;
- #endif
- 	ArchiveHandle *AH;
- } ParallelStateEntry;
- 
- typedef struct ParallelState
- {
- 	int			numWorkers;
- 	ParallelStateEntry *pse;
- } ParallelState;
- 
- /* Arguments needed for a worker child */
- typedef struct _restore_args
- {
- 	ArchiveHandle *AH;
- 	TocEntry   *te;
- 	ParallelStateEntry *pse;
- } RestoreArgs;
- 
- /* State for each parallel activity slot */
- typedef struct _parallel_slot
- {
- 	thandle		child_id;
- 	RestoreArgs *args;
- } ParallelSlot;
- 
- typedef struct ShutdownInformation
- {
- 	ParallelState *pstate;
- 	Archive    *AHX;
- } ShutdownInformation;
- 
- static ShutdownInformation shutdown_info;
- 
- #define NO_SLOT (-1)
- 
  #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
  #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
  
--- 38,43 ----
***************
*** 137,143 **** static bool _tocEntryIsACL(TocEntry *te);
  static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
  static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
  static void buildTocEntryArrays(ArchiveHandle *AH);
- static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
  static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
  static int	_discoverArchiveFormat(ArchiveHandle *AH);
  
--- 73,78 ----
***************
*** 150,170 **** static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
  
  static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
  				  RestoreOptions *ropt, bool is_parallel);
! static void restore_toc_entries_parallel(ArchiveHandle *AH);
! static thandle spawn_restore(RestoreArgs *args);
! static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
! static bool work_in_progress(ParallelSlot *slots, int n_slots);
! static int	get_next_slot(ParallelSlot *slots, int n_slots);
  static void par_list_header_init(TocEntry *l);
  static void par_list_append(TocEntry *l, TocEntry *te);
  static void par_list_remove(TocEntry *te);
  static TocEntry *get_next_work_item(ArchiveHandle *AH,
  				   TocEntry *ready_list,
! 				   ParallelSlot *slots, int n_slots);
! static parallel_restore_result parallel_restore(RestoreArgs *args);
  static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
! 			   thandle worker, int status,
! 			   ParallelSlot *slots, int n_slots);
  static void fix_dependencies(ArchiveHandle *AH);
  static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
  static void repoint_table_dependencies(ArchiveHandle *AH);
--- 85,103 ----
  
  static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
  				  RestoreOptions *ropt, bool is_parallel);
! static void restore_toc_entries_prefork(ArchiveHandle *AH);
! static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
! 										 TocEntry *pending_list);
! static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list);
  static void par_list_header_init(TocEntry *l);
  static void par_list_append(TocEntry *l, TocEntry *te);
  static void par_list_remove(TocEntry *te);
  static TocEntry *get_next_work_item(ArchiveHandle *AH,
  				   TocEntry *ready_list,
! 				   ParallelState *pstate);
  static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
! 			   int worker, int status,
! 			   ParallelState *pstate);
  static void fix_dependencies(ArchiveHandle *AH);
  static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
  static void repoint_table_dependencies(ArchiveHandle *AH);
***************
*** 173,186 **** static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
  					TocEntry *ready_list);
  static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
  static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
- static ArchiveHandle *CloneArchive(ArchiveHandle *AH);
- static void DeCloneArchive(ArchiveHandle *AH);
- 
- static void setProcessIdentifier(ParallelStateEntry *pse, ArchiveHandle *AH);
- static void unsetProcessIdentifier(ParallelStateEntry *pse);
- static ParallelStateEntry *GetMyPSEntry(ParallelState *pstate);
- static void archive_close_connection(int code, void *arg);
- 
  
  /*
   *	Wrapper functions.
--- 106,111 ----
***************
*** 321,327 **** RestoreArchive(Archive *AHX)
  	/*
  	 * If we're going to do parallel restore, there are some restrictions.
  	 */
! 	parallel_mode = (ropt->number_of_jobs > 1 && ropt->useDB);
  	if (parallel_mode)
  	{
  		/* We haven't got round to making this work for all archive formats */
--- 246,252 ----
  	/*
  	 * If we're going to do parallel restore, there are some restrictions.
  	 */
! 	parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
  	if (parallel_mode)
  	{
  		/* We haven't got round to making this work for all archive formats */
***************
*** 491,497 **** RestoreArchive(Archive *AHX)
  	 * In parallel mode, turn control over to the parallel-restore logic.
  	 */
  	if (parallel_mode)
! 		restore_toc_entries_parallel(AH);
  	else
  	{
  		for (te = AH->toc->next; te != AH->toc; te = te->next)
--- 416,440 ----
  	 * In parallel mode, turn control over to the parallel-restore logic.
  	 */
  	if (parallel_mode)
! 	{
! 		ParallelState  *pstate;
! 		TocEntry		pending_list;
! 
! 		par_list_header_init(&pending_list);
! 
! 		/* This runs PRE_DATA items and then disconnects from the database */
! 		restore_toc_entries_prefork(AH);
! 		Assert(AH->connection == NULL);
! 
! 		/* ParallelBackupStart() will actually fork the processes */
! 		pstate = ParallelBackupStart(AH, ropt);
! 		restore_toc_entries_parallel(AH, pstate, &pending_list);
! 		ParallelBackupEnd(AH, pstate);
! 
! 		/* reconnect the master and see if we missed something */
! 		restore_toc_entries_postfork(AH, &pending_list);
! 		Assert(AH->connection != NULL);
! 	}
  	else
  	{
  		for (te = AH->toc->next; te != AH->toc; te = te->next)
***************
*** 550,556 **** static int
  restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
  				  RestoreOptions *ropt, bool is_parallel)
  {
! 	int			retval = 0;
  	teReqs		reqs;
  	bool		defnDumped;
  
--- 493,499 ----
  restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
  				  RestoreOptions *ropt, bool is_parallel)
  {
! 	int			status = WORKER_OK;
  	teReqs		reqs;
  	bool		defnDumped;
  
***************
*** 603,609 **** restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
  				if (ropt->noDataForFailedTables)
  				{
  					if (is_parallel)
! 						retval = WORKER_INHIBIT_DATA;
  					else
  						inhibit_data_for_failed_table(AH, te);
  				}
--- 546,552 ----
  				if (ropt->noDataForFailedTables)
  				{
  					if (is_parallel)
! 						status = WORKER_INHIBIT_DATA;
  					else
  						inhibit_data_for_failed_table(AH, te);
  				}
***************
*** 618,624 **** restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
  				 * just set the return value.
  				 */
  				if (is_parallel)
! 					retval = WORKER_CREATE_DONE;
  				else
  					mark_create_done(AH, te);
  			}
--- 561,567 ----
  				 * just set the return value.
  				 */
  				if (is_parallel)
! 					status = WORKER_CREATE_DONE;
  				else
  					mark_create_done(AH, te);
  			}
***************
*** 736,742 **** restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
  		}
  	}
  
! 	return retval;
  }
  
  /*
--- 679,688 ----
  		}
  	}
  
! 	if (AH->public.n_errors > 0 && status == WORKER_OK)
! 		status = WORKER_IGNORED_ERRORS;
! 
! 	return status;
  }
  
  /*
***************
*** 1629,1635 **** buildTocEntryArrays(ArchiveHandle *AH)
  	}
  }
  
! static TocEntry *
  getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
  {
  	/* build index arrays if we didn't already */
--- 1575,1581 ----
  	}
  }
  
! TocEntry *
  getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
  {
  	/* build index arrays if we didn't already */
***************
*** 2127,2176 **** _allocAH(const char *FileSpec, const ArchiveFormat fmt,
  	return AH;
  }
  
- 
  void
! WriteDataChunks(ArchiveHandle *AH)
  {
  	TocEntry   *te;
- 	StartDataPtr startPtr;
- 	EndDataPtr	endPtr;
  
  	for (te = AH->toc->next; te != AH->toc; te = te->next)
  	{
! 		if (te->dataDumper != NULL && (te->reqs & REQ_DATA) != 0)
! 		{
! 			AH->currToc = te;
! 			/* printf("Writing data for %d (%x)\n", te->id, te); */
! 
! 			if (strcmp(te->desc, "BLOBS") == 0)
! 			{
! 				startPtr = AH->StartBlobsPtr;
! 				endPtr = AH->EndBlobsPtr;
! 			}
! 			else
! 			{
! 				startPtr = AH->StartDataPtr;
! 				endPtr = AH->EndDataPtr;
! 			}
  
! 			if (startPtr != NULL)
! 				(*startPtr) (AH, te);
  
  			/*
! 			 * printf("Dumper arg for %d is %x\n", te->id, te->dataDumperArg);
  			 */
  
! 			/*
! 			 * The user-provided DataDumper routine needs to call
! 			 * AH->WriteData
! 			 */
! 			(*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
  
! 			if (endPtr != NULL)
! 				(*endPtr) (AH, te);
! 			AH->currToc = NULL;
! 		}
  	}
  }
  
  void
--- 2073,2139 ----
  	return AH;
  }
  
  void
! WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
  {
  	TocEntry   *te;
  
  	for (te = AH->toc->next; te != AH->toc; te = te->next)
  	{
! 		if (!te->dataDumper)
! 			continue;
  
! 		if ((te->reqs & REQ_DATA) == 0)
! 			continue;
  
+ 		if (pstate && pstate->numWorkers > 1)
+ 		{
  			/*
! 			 * If we are in a parallel backup, then we are always the master
! 			 * process.
  			 */
+ 			EnsureIdleWorker(AH, pstate);
+ 			Assert(GetIdleWorker(pstate) != NO_SLOT);
+ 			DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP);
+ 		}
+ 		else
+ 			WriteDataChunksForTocEntry(AH, te);
+ 	}
+ 	EnsureWorkersFinished(AH, pstate);
+ }
  
! void
! WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
! {
! 	StartDataPtr startPtr;
! 	EndDataPtr	endPtr;
  
! 	AH->currToc = te;
! 
! 	if (strcmp(te->desc, "BLOBS") == 0)
! 	{
! 		startPtr = AH->StartBlobsPtr;
! 		endPtr = AH->EndBlobsPtr;
  	}
+ 	else
+ 	{
+ 		startPtr = AH->StartDataPtr;
+ 		endPtr = AH->EndDataPtr;
+ 	}
+ 
+ 	if (startPtr != NULL)
+ 		(*startPtr) (AH, te);
+ 
+ 	/*
+ 	 * The user-provided DataDumper routine needs to call
+ 	 * AH->WriteData
+ 	 */
+ 	(*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
+ 
+ 	if (endPtr != NULL)
+ 		(*endPtr) (AH, te);
+ 
+ 	AH->currToc = NULL;
  }
  
  void
***************
*** 3394,3460 **** dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
  		ahprintf(AH, "-- %s %s\n\n", msg, buf);
  }
  
- static void
- setProcessIdentifier(ParallelStateEntry *pse, ArchiveHandle *AH)
- {
- #ifdef WIN32
- 	pse->threadId = GetCurrentThreadId();
- #else
- 	pse->pid = getpid();
- #endif
- 	pse->AH = AH;
- }
- 
- static void
- unsetProcessIdentifier(ParallelStateEntry *pse)
- {
- #ifdef WIN32
- 	pse->threadId = 0;
- #else
- 	pse->pid = 0;
- #endif
- 	pse->AH = NULL;
- }
- 
- static ParallelStateEntry *
- GetMyPSEntry(ParallelState *pstate)
- {
- 	int			i;
- 
- 	for (i = 0; i < pstate->numWorkers; i++)
- #ifdef WIN32
- 		if (pstate->pse[i].threadId == GetCurrentThreadId())
- #else
- 		if (pstate->pse[i].pid == getpid())
- #endif
- 			return &(pstate->pse[i]);
- 
- 	return NULL;
- }
- 
- static void
- archive_close_connection(int code, void *arg)
- {
- 	ShutdownInformation *si = (ShutdownInformation *) arg;
- 
- 	if (si->pstate)
- 	{
- 		ParallelStateEntry *entry = GetMyPSEntry(si->pstate);
- 
- 		if (entry != NULL && entry->AH)
- 			DisconnectDatabase(&(entry->AH->public));
- 	}
- 	else if (si->AHX)
- 		DisconnectDatabase(si->AHX);
- }
- 
- void
- on_exit_close_archive(Archive *AHX)
- {
- 	shutdown_info.AHX = AHX;
- 	on_exit_nicely(archive_close_connection, &shutdown_info);
- }
- 
  /*
   * Main engine for parallel restore.
   *
--- 3357,3362 ----
***************
*** 3467,3496 **** on_exit_close_archive(Archive *AHX)
   * RestoreArchive).
   */
  static void
! restore_toc_entries_parallel(ArchiveHandle *AH)
! {
! 	RestoreOptions *ropt = AH->ropt;
! 	int			n_slots = ropt->number_of_jobs;
! 	ParallelSlot *slots;
! 	int			work_status;
! 	int			next_slot;
! 	bool		skipped_some;
! 	TocEntry	pending_list;
! 	TocEntry	ready_list;
! 	TocEntry   *next_work_item;
! 	thandle		ret_child;
! 	TocEntry   *te;
! 	ParallelState *pstate;
! 	int			i;
! 
! 	ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
! 
! 	slots = (ParallelSlot *) pg_malloc0(n_slots * sizeof(ParallelSlot));
! 	pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
! 	pstate->pse = (ParallelStateEntry *) pg_malloc0(n_slots * sizeof(ParallelStateEntry));
! 	pstate->numWorkers = ropt->number_of_jobs;
! 	for (i = 0; i < pstate->numWorkers; i++)
! 		unsetProcessIdentifier(&(pstate->pse[i]));
  
  	/* Adjust dependency information */
  	fix_dependencies(AH);
--- 3369,3381 ----
   * RestoreArchive).
   */
  static void
! restore_toc_entries_prefork(ArchiveHandle *AH)
! {
! 	RestoreOptions *ropt = AH->ropt;
! 	bool		skipped_some;
! 	TocEntry   *next_work_item;
! 
! 	ahlog(AH, 2, "entering restore_toc_entries_prefork\n");
  
  	/* Adjust dependency information */
  	fix_dependencies(AH);
***************
*** 3551,3562 **** restore_toc_entries_parallel(ArchiveHandle *AH)
  	 */
  	DisconnectDatabase(&AH->public);
  
- 	/*
- 	 * Set the pstate in the shutdown_info. The exit handler uses pstate if
- 	 * set and falls back to AHX otherwise.
- 	 */
- 	shutdown_info.pstate = pstate;
- 
  	/* blow away any transient state from the old connection */
  	if (AH->currUser)
  		free(AH->currUser);
--- 3436,3441 ----
***************
*** 3568,3584 **** restore_toc_entries_parallel(ArchiveHandle *AH)
  		free(AH->currTablespace);
  	AH->currTablespace = NULL;
  	AH->currWithOids = -1;
  
  	/*
! 	 * Initialize the lists of pending and ready items.  After this setup, the
! 	 * pending list is everything that needs to be done but is blocked by one
! 	 * or more dependencies, while the ready list contains items that have no
! 	 * remaining dependencies.	Note: we don't yet filter out entries that
! 	 * aren't going to be restored.  They might participate in dependency
! 	 * chains connecting entries that should be restored, so we treat them as
! 	 * live until we actually process them.
  	 */
- 	par_list_header_init(&pending_list);
  	par_list_header_init(&ready_list);
  	skipped_some = false;
  	for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
--- 3447,3488 ----
  		free(AH->currTablespace);
  	AH->currTablespace = NULL;
  	AH->currWithOids = -1;
+ }
+ 
+ /*
+  * Main engine for parallel restore.
+  *
+  * Work is done in three phases.
+  * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
+  * just as for a standard restore. This is done in restore_toc_entries_prefork().
+  * Second we process the remaining non-ACL steps in parallel worker children
+  * (threads on Windows, processes on Unix); these fork off and set up their
+  * connections before we call restore_toc_entries_parallel().
+  * Finally we process all the ACL entries in a single connection (that happens
+  * back in RestoreArchive).
+  */
+ static void
+ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
+ 							 TocEntry *pending_list)
+ {
+ 	int			work_status;
+ 	bool		skipped_some;
+ 	TocEntry	ready_list;
+ 	TocEntry   *next_work_item;
+ 	int			ret_child;
+ 
+ 	ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
  
  	/*
! 	 * Initialize the lists of ready items, the list for pending items has
! 	 * already been initialized in the caller.  After this setup, the pending
! 	 * list is everything that needs to be done but is blocked by one or more
! 	 * dependencies, while the ready list contains items that have no remaining
! 	 * dependencies. Note: we don't yet filter out entries that aren't going
! 	 * to be restored. They might participate in dependency chains connecting
! 	 * entries that should be restored, so we treat them as live until we
! 	 * actually process them.
  	 */
  	par_list_header_init(&ready_list);
  	skipped_some = false;
  	for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
***************
*** 3603,3609 **** restore_toc_entries_parallel(ArchiveHandle *AH)
  		}
  
  		if (next_work_item->depCount > 0)
! 			par_list_append(&pending_list, next_work_item);
  		else
  			par_list_append(&ready_list, next_work_item);
  	}
--- 3507,3513 ----
  		}
  
  		if (next_work_item->depCount > 0)
! 			par_list_append(pending_list, next_work_item);
  		else
  			par_list_append(&ready_list, next_work_item);
  	}
***************
*** 3617,3625 **** restore_toc_entries_parallel(ArchiveHandle *AH)
  
  	ahlog(AH, 1, "entering main parallel loop\n");
  
! 	while ((next_work_item = get_next_work_item(AH, &ready_list,
! 												slots, n_slots)) != NULL ||
! 		   work_in_progress(slots, n_slots))
  	{
  		if (next_work_item != NULL)
  		{
--- 3521,3528 ----
  
  	ahlog(AH, 1, "entering main parallel loop\n");
  
! 	while ((next_work_item = get_next_work_item(AH, &ready_list, pstate)) != NULL ||
! 		   !IsEveryWorkerIdle(pstate))
  	{
  		if (next_work_item != NULL)
  		{
***************
*** 3637,3698 **** restore_toc_entries_parallel(ArchiveHandle *AH)
  				continue;
  			}
  
! 			if ((next_slot = get_next_slot(slots, n_slots)) != NO_SLOT)
! 			{
! 				/* There is work still to do and a worker slot available */
! 				thandle		child;
! 				RestoreArgs *args;
  
! 				ahlog(AH, 1, "launching item %d %s %s\n",
! 					  next_work_item->dumpId,
! 					  next_work_item->desc, next_work_item->tag);
! 
! 				par_list_remove(next_work_item);
  
! 				/* this memory is dealloced in mark_work_done() */
! 				args = pg_malloc(sizeof(RestoreArgs));
! 				args->AH = CloneArchive(AH);
! 				args->te = next_work_item;
! 				args->pse = &pstate->pse[next_slot];
  
! 				/* run the step in a worker child */
! 				child = spawn_restore(args);
  
! 				slots[next_slot].child_id = child;
! 				slots[next_slot].args = args;
  
! 				continue;
  			}
- 		}
  
! 		/*
! 		 * If we get here there must be work being done.  Either there is no
! 		 * work available to schedule (and work_in_progress returned true) or
! 		 * there are no slots available.  So we wait for a worker to finish,
! 		 * and process the result.
! 		 */
! 		ret_child = reap_child(slots, n_slots, &work_status);
  
! 		if (WIFEXITED(work_status))
! 		{
! 			mark_work_done(AH, &ready_list,
! 						   ret_child, WEXITSTATUS(work_status),
! 						   slots, n_slots);
! 		}
! 		else
! 		{
! 			exit_horribly(modulename, "worker process crashed: status %d\n",
! 						  work_status);
  		}
  	}
  
  	ahlog(AH, 1, "finished main parallel loop\n");
  
! 	/*
! 	 * Remove the pstate again, so the exit handler will now fall back to
! 	 * closing AH->connection again.
! 	 */
! 	shutdown_info.pstate = NULL;
  
  	/*
  	 * Now reconnect the single parent connection.
--- 3540,3610 ----
  				continue;
  			}
  
! 			ahlog(AH, 1, "launching item %d %s %s\n",
! 				  next_work_item->dumpId,
! 				  next_work_item->desc, next_work_item->tag);
  
! 			par_list_remove(next_work_item);
  
! 			Assert(GetIdleWorker(pstate) != NO_SLOT);
! 			DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE);
! 		}
! 		else
! 			/* at least one child is working and we have nothing ready. */
! 			Assert(!IsEveryWorkerIdle(pstate));
  
! 		for (;;)
! 		{
! 			int nTerm = 0;
  
! 			/*
! 			 * In order to reduce dependencies as soon as possible and
! 			 * especially to reap the status of workers who are working on
! 			 * items that pending items depend on, we do a non-blocking check
! 			 * for ended workers first.
! 			 *
! 			 * However, if we do not have any other work items currently that
! 			 * workers can work on, we do not busy-loop here but instead
! 			 * really wait for at least one worker to terminate. Hence we call
! 			 * ListenToWorkers(..., ..., do_wait = true) in this case.
! 			 */
! 			ListenToWorkers(AH, pstate, !next_work_item);
  
! 			while ((ret_child = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
! 			{
! 				nTerm++;
! 				mark_work_done(AH, &ready_list, ret_child, work_status, pstate);
  			}
  
! 			/*
! 			 * We need to make sure that we have an idle worker before re-running the
! 			 * loop. If nTerm > 0 we already have that (quick check).
! 			 */
! 			if (nTerm > 0)
! 				break;
  
! 			/* if nobody terminated, explicitly check for an idle worker */
! 			if (GetIdleWorker(pstate) != NO_SLOT)
! 				break;
! 
! 			/*
! 			 * If we have no idle worker, read the result of one or more
! 			 * workers and loop the loop to call ReapWorkerStatus() on them.
! 			 */
! 			ListenToWorkers(AH, pstate, true);
  		}
  	}
  
  	ahlog(AH, 1, "finished main parallel loop\n");
+ }
  
! static void
! restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
! {
! 	RestoreOptions *ropt = AH->ropt;
! 	TocEntry   *te;
! 
! 	ahlog(AH, 2, "entering restore_toc_entries_postfork\n");
  
  	/*
  	 * Now reconnect the single parent connection.
***************
*** 3708,3714 **** restore_toc_entries_parallel(ArchiveHandle *AH)
  	 * dependencies, or some other pathological condition. If so, do it in the
  	 * single parent connection.
  	 */
! 	for (te = pending_list.par_next; te != &pending_list; te = te->par_next)
  	{
  		ahlog(AH, 1, "processing missed item %d %s %s\n",
  			  te->dumpId, te->desc, te->tag);
--- 3620,3626 ----
  	 * dependencies, or some other pathological condition. If so, do it in the
  	 * single parent connection.
  	 */
! 	for (te = pending_list->par_next; te != pending_list; te = te->par_next)
  	{
  		ahlog(AH, 1, "processing missed item %d %s %s\n",
  			  te->dumpId, te->desc, te->tag);
***************
*** 3719,3839 **** restore_toc_entries_parallel(ArchiveHandle *AH)
  }
  
  /*
-  * create a worker child to perform a restore step in parallel
-  */
- static thandle
- spawn_restore(RestoreArgs *args)
- {
- 	thandle		child;
- 
- 	/* Ensure stdio state is quiesced before forking */
- 	fflush(NULL);
- 
- #ifndef WIN32
- 	child = fork();
- 	if (child == 0)
- 	{
- 		/* in child process */
- 		parallel_restore(args);
- 		exit_horribly(modulename,
- 					  "parallel_restore should not return\n");
- 	}
- 	else if (child < 0)
- 	{
- 		/* fork failed */
- 		exit_horribly(modulename,
- 					  "could not create worker process: %s\n",
- 					  strerror(errno));
- 	}
- #else
- 	child = (HANDLE) _beginthreadex(NULL, 0, (void *) parallel_restore,
- 									args, 0, NULL);
- 	if (child == 0)
- 		exit_horribly(modulename,
- 					  "could not create worker thread: %s\n",
- 					  strerror(errno));
- #endif
- 
- 	return child;
- }
- 
- /*
-  *	collect status from a completed worker child
-  */
- static thandle
- reap_child(ParallelSlot *slots, int n_slots, int *work_status)
- {
- #ifndef WIN32
- 	/* Unix is so much easier ... */
- 	return wait(work_status);
- #else
- 	static HANDLE *handles = NULL;
- 	int			hindex,
- 				snum,
- 				tnum;
- 	thandle		ret_child;
- 	DWORD		res;
- 
- 	/* first time around only, make space for handles to listen on */
- 	if (handles == NULL)
- 		handles = (HANDLE *) pg_malloc0(n_slots * sizeof(HANDLE));
- 
- 	/* set up list of handles to listen to */
- 	for (snum = 0, tnum = 0; snum < n_slots; snum++)
- 		if (slots[snum].child_id != 0)
- 			handles[tnum++] = slots[snum].child_id;
- 
- 	/* wait for one to finish */
- 	hindex = WaitForMultipleObjects(tnum, handles, false, INFINITE);
- 
- 	/* get handle of finished thread */
- 	ret_child = handles[hindex - WAIT_OBJECT_0];
- 
- 	/* get the result */
- 	GetExitCodeThread(ret_child, &res);
- 	*work_status = res;
- 
- 	/* dispose of handle to stop leaks */
- 	CloseHandle(ret_child);
- 
- 	return ret_child;
- #endif
- }
- 
- /*
-  * are we doing anything now?
-  */
- static bool
- work_in_progress(ParallelSlot *slots, int n_slots)
- {
- 	int			i;
- 
- 	for (i = 0; i < n_slots; i++)
- 	{
- 		if (slots[i].child_id != 0)
- 			return true;
- 	}
- 	return false;
- }
- 
- /*
-  * find the first free parallel slot (if any).
-  */
- static int
- get_next_slot(ParallelSlot *slots, int n_slots)
- {
- 	int			i;
- 
- 	for (i = 0; i < n_slots; i++)
- 	{
- 		if (slots[i].child_id == 0)
- 			return i;
- 	}
- 	return NO_SLOT;
- }
- 
- 
- /*
   * Check if te1 has an exclusive lock requirement for an item that te2 also
   * requires, whether or not te2's requirement is for an exclusive lock.
   */
--- 3631,3636 ----
***************
*** 3906,3912 **** par_list_remove(TocEntry *te)
   */
  static TocEntry *
  get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
! 				   ParallelSlot *slots, int n_slots)
  {
  	bool		pref_non_data = false;	/* or get from AH->ropt */
  	TocEntry   *data_te = NULL;
--- 3703,3709 ----
   */
  static TocEntry *
  get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
! 				   ParallelState *pstate)
  {
  	bool		pref_non_data = false;	/* or get from AH->ropt */
  	TocEntry   *data_te = NULL;
***************
*** 3921,3931 **** get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
  	{
  		int			count = 0;
  
! 		for (k = 0; k < n_slots; k++)
! 			if (slots[k].args->te != NULL &&
! 				slots[k].args->te->section == SECTION_DATA)
  				count++;
! 		if (n_slots == 0 || count * 4 < n_slots)
  			pref_non_data = false;
  	}
  
--- 3718,3728 ----
  	{
  		int			count = 0;
  
! 		for (k = 0; k < pstate->numWorkers; k++)
! 			if (pstate->parallelSlot[k].args->te != NULL &&
! 				pstate->parallelSlot[k].args->te->section == SECTION_DATA)
  				count++;
! 		if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
  			pref_non_data = false;
  	}
  
***************
*** 3941,3953 **** get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
  		 * that a currently running item also needs lock on, or vice versa. If
  		 * so, we don't want to schedule them together.
  		 */
! 		for (i = 0; i < n_slots && !conflicts; i++)
  		{
  			TocEntry   *running_te;
  
! 			if (slots[i].args == NULL)
  				continue;
! 			running_te = slots[i].args->te;
  
  			if (has_lock_conflicts(te, running_te) ||
  				has_lock_conflicts(running_te, te))
--- 3738,3750 ----
  		 * that a currently running item also needs lock on, or vice versa. If
  		 * so, we don't want to schedule them together.
  		 */
! 		for (i = 0; i < pstate->numWorkers && !conflicts; i++)
  		{
  			TocEntry   *running_te;
  
! 			if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING)
  				continue;
! 			running_te = pstate->parallelSlot[i].args->te;
  
  			if (has_lock_conflicts(te, running_te) ||
  				has_lock_conflicts(running_te, te))
***************
*** 3982,4044 **** get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
  /*
   * Restore a single TOC item in parallel with others
   *
!  * this is the procedure run as a thread (Windows) or a
!  * separate process (everything else).
   */
! static parallel_restore_result
! parallel_restore(RestoreArgs *args)
  {
  	ArchiveHandle *AH = args->AH;
  	TocEntry   *te = args->te;
  	RestoreOptions *ropt = AH->ropt;
! 	int			retval;
! 
! 	setProcessIdentifier(args->pse, AH);
! 
! 	/*
! 	 * Close and reopen the input file so we have a private file pointer that
! 	 * doesn't stomp on anyone else's file pointer, if we're actually going to
! 	 * need to read from the file. Otherwise, just close it except on Windows,
! 	 * where it will possibly be needed by other threads.
! 	 *
! 	 * Note: on Windows, since we are using threads not processes, the reopen
! 	 * call *doesn't* close the original file pointer but just open a new one.
! 	 */
! 	if (te->section == SECTION_DATA)
! 		(AH->ReopenPtr) (AH);
! #ifndef WIN32
! 	else
! 		(AH->ClosePtr) (AH);
! #endif
! 
! 	/*
! 	 * We need our own database connection, too
! 	 */
! 	ConnectDatabase((Archive *) AH, ropt->dbname,
! 					ropt->pghost, ropt->pgport, ropt->username,
! 					ropt->promptPassword);
  
  	_doSetFixedOutputState(AH);
  
! 	/* Restore the TOC item */
! 	retval = restore_toc_entry(AH, te, ropt, true);
! 
! 	/* And clean up */
! 	DisconnectDatabase((Archive *) AH);
! 	unsetProcessIdentifier(args->pse);
  
! 	/* If we reopened the file, we are done with it, so close it now */
! 	if (te->section == SECTION_DATA)
! 		(AH->ClosePtr) (AH);
  
! 	if (retval == 0 && AH->public.n_errors)
! 		retval = WORKER_IGNORED_ERRORS;
  
! #ifndef WIN32
! 	exit(retval);
! #else
! 	return retval;
! #endif
  }
  
  
--- 3779,3807 ----
  /*
   * Restore a single TOC item in parallel with others
   *
!  * this is run in the worker, i.e. in a thread (Windows) or a separate process
!  * (everything else). A worker process executes several such work items during
!  * a parallel backup or restore. Once we terminate here and report back that
!  * our work is finished, the master process will assign us a new work item.
   */
! int
! parallel_restore(ParallelArgs *args)
  {
  	ArchiveHandle *AH = args->AH;
  	TocEntry   *te = args->te;
  	RestoreOptions *ropt = AH->ropt;
! 	int			status;
  
  	_doSetFixedOutputState(AH);
  
! 	Assert(AH->connection != NULL);
  
! 	AH->public.n_errors = 0;
  
! 	/* Restore the TOC item */
! 	status = restore_toc_entry(AH, te, ropt, true);
  
! 	return status;
  }
  
  
***************
*** 4050,4074 **** parallel_restore(RestoreArgs *args)
   */
  static void
  mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
! 			   thandle worker, int status,
! 			   ParallelSlot *slots, int n_slots)
  {
  	TocEntry   *te = NULL;
- 	int			i;
- 
- 	for (i = 0; i < n_slots; i++)
- 	{
- 		if (slots[i].child_id == worker)
- 		{
- 			slots[i].child_id = 0;
- 			te = slots[i].args->te;
- 			DeCloneArchive(slots[i].args->AH);
- 			free(slots[i].args);
- 			slots[i].args = NULL;
  
! 			break;
! 		}
! 	}
  
  	if (te == NULL)
  		exit_horribly(modulename, "could not find slot of finished worker\n");
--- 3813,3824 ----
   */
  static void
  mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
! 			   int worker, int status,
! 			   ParallelState *pstate)
  {
  	TocEntry   *te = NULL;
  
! 	te = pstate->parallelSlot[worker].args->te;
  
  	if (te == NULL)
  		exit_horribly(modulename, "could not find slot of finished worker\n");
***************
*** 4367,4382 **** inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
  	}
  }
  
- 
  /*
   * Clone and de-clone routines used in parallel restoration.
   *
   * Enough of the structure is cloned to ensure that there is no
   * conflict between different threads each with their own clone.
-  *
-  * These could be public, but no need at present.
   */
! static ArchiveHandle *
  CloneArchive(ArchiveHandle *AH)
  {
  	ArchiveHandle *clone;
--- 4117,4129 ----
  	}
  }
  
  /*
   * Clone and de-clone routines used in parallel restoration.
   *
   * Enough of the structure is cloned to ensure that there is no
   * conflict between different threads each with their own clone.
   */
! ArchiveHandle *
  CloneArchive(ArchiveHandle *AH)
  {
  	ArchiveHandle *clone;
***************
*** 4402,4410 **** CloneArchive(ArchiveHandle *AH)
--- 4149,4207 ----
  	/* clone has its own error count, too */
  	clone->public.n_errors = 0;
  
+ 	/*
+ 	 * Connect our new clone object to the database:
+ 	 * In parallel restore the parent is already disconnected, because we can
+ 	 * connect the worker processes independently to the database (no snapshot
+ 	 * sync required).
+ 	 * In parallel backup we clone the parent's existing connection.
+ 	 */
+ 	if (AH->mode == archModeRead)
+ 	{
+ 		RestoreOptions *ropt = AH->ropt;
+ 		Assert(AH->connection == NULL);
+ 		/* this also sets clone->connection */
+ 		ConnectDatabase((Archive *) clone, ropt->dbname,
+ 					ropt->pghost, ropt->pgport, ropt->username,
+ 					ropt->promptPassword);
+ 	}
+ 	else
+ 	{
+ 		char	   *dbname;
+ 		char	   *pghost;
+ 		char	   *pgport;
+ 		char	   *username;
+ 		const char *encname;
+ 
+ 		Assert(AH->connection != NULL);
+ 
+ 		/*
+ 		 * Even though we are technically accessing the parent's database object
+ 		 * here, these functions are safe to call because they all just
+ 		 * return a pointer and do not actually send/receive any data to/from the
+ 		 * database.
+ 		 */
+ 		dbname = PQdb(AH->connection);
+ 		pghost = PQhost(AH->connection);
+ 		pgport = PQport(AH->connection);
+ 		username = PQuser(AH->connection);
+ 		encname = pg_encoding_to_char(AH->public.encoding);
+ 
+ 		/* this also sets clone->connection */
+ 		ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username, TRI_NO);
+ 
+ 		/*
+ 		 * Set the same encoding.  Whatever we set here is what we got from
+ 		 * pg_encoding_to_char(), so we really shouldn't run into an error
+ 		 * setting that very same value.  Also see the comment in SetupConnection().
+ 		 */
+ 		PQsetClientEncoding(clone->connection, encname);
+ 	}
+ 
  	/* Let the format-specific code have a chance too */
  	(clone->ClonePtr) (clone);
  
+ 	Assert(clone->connection != NULL);
  	return clone;
  }
  
***************
*** 4413,4419 **** CloneArchive(ArchiveHandle *AH)
   *
   * Note: we assume any clone-local connection was already closed.
   */
! static void
  DeCloneArchive(ArchiveHandle *AH)
  {
  	/* Clear format-specific state */
--- 4210,4216 ----
   *
   * Note: we assume any clone-local connection was already closed.
   */
! void
  DeCloneArchive(ArchiveHandle *AH)
  {
  	/* Clear format-specific state */
*** a/src/bin/pg_dump/pg_backup_archiver.h
--- b/src/bin/pg_dump/pg_backup_archiver.h
***************
*** 100,107 **** typedef z_stream *z_streamp;
--- 100,120 ----
  #define K_OFFSET_POS_SET 2
  #define K_OFFSET_NO_DATA 3
  
+ /*
+  * Special exit values from worker children.  We reserve 0 for normal
+  * success; 1 and other small values should be interpreted as crashes.
+  */
+ #define WORKER_OK                     0
+ #define WORKER_CREATE_DONE            10
+ #define WORKER_INHIBIT_DATA           11
+ #define WORKER_IGNORED_ERRORS         12
+ 
  struct _archiveHandle;
  struct _tocEntry;
+ struct _restoreList;
+ struct ParallelArgs;
+ struct ParallelState;
+ enum T_Action;
  
  typedef void (*ClosePtr) (struct _archiveHandle * AH);
  typedef void (*ReopenPtr) (struct _archiveHandle * AH);
***************
*** 129,134 **** typedef void (*PrintTocDataPtr) (struct _archiveHandle * AH, struct _tocEntry *
--- 142,154 ----
  typedef void (*ClonePtr) (struct _archiveHandle * AH);
  typedef void (*DeClonePtr) (struct _archiveHandle * AH);
  
+ typedef char *(*WorkerJobRestorePtr)(struct _archiveHandle * AH, struct _tocEntry * te);
+ typedef char *(*WorkerJobDumpPtr)(struct _archiveHandle * AH, struct _tocEntry * te);
+ typedef char *(*MasterStartParallelItemPtr)(struct _archiveHandle * AH, struct _tocEntry * te,
+ 											enum T_Action act);
+ typedef int (*MasterEndParallelItemPtr)(struct _archiveHandle * AH, struct _tocEntry * te,
+ 										const char *str, enum T_Action act);
+ 
  typedef size_t (*CustomOutPtr) (struct _archiveHandle * AH, const void *buf, size_t len);
  
  typedef enum
***************
*** 227,232 **** typedef struct _archiveHandle
--- 247,258 ----
  	StartBlobPtr StartBlobPtr;
  	EndBlobPtr EndBlobPtr;
  
+ 	MasterStartParallelItemPtr MasterStartParallelItemPtr;
+ 	MasterEndParallelItemPtr MasterEndParallelItemPtr;
+ 
+ 	WorkerJobDumpPtr WorkerJobDumpPtr;
+ 	WorkerJobRestorePtr WorkerJobRestorePtr;
+ 
  	ClonePtr ClonePtr;			/* Clone format-specific fields */
  	DeClonePtr DeClonePtr;		/* Clean up cloned fields */
  
***************
*** 236,241 **** typedef struct _archiveHandle
--- 262,268 ----
  	char	   *archdbname;		/* DB name *read* from archive */
  	enum trivalue promptPassword;
  	char	   *savedPassword;	/* password for ropt->username, if known */
+ 	char	   *use_role;
  	PGconn	   *connection;
  	int			connectToDB;	/* Flag to indicate if direct DB connection is
  								 * required */
***************
*** 327,332 **** typedef struct _tocEntry
--- 354,360 ----
  	int			nLockDeps;		/* number of such dependencies */
  } TocEntry;
  
+ extern int parallel_restore(struct ParallelArgs *args);
  extern void on_exit_close_archive(Archive *AHX);
  
  extern void warn_or_exit_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...) __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4)));
***************
*** 337,345 **** extern void WriteHead(ArchiveHandle *AH);
  extern void ReadHead(ArchiveHandle *AH);
  extern void WriteToc(ArchiveHandle *AH);
  extern void ReadToc(ArchiveHandle *AH);
! extern void WriteDataChunks(ArchiveHandle *AH);
  
  extern teReqs TocIDRequired(ArchiveHandle *AH, DumpId id);
  extern bool checkSeek(FILE *fp);
  
  #define appendStringLiteralAHX(buf,str,AH) \
--- 365,377 ----
  extern void ReadHead(ArchiveHandle *AH);
  extern void WriteToc(ArchiveHandle *AH);
  extern void ReadToc(ArchiveHandle *AH);
! extern void WriteDataChunks(ArchiveHandle *AH, struct ParallelState *pstate);
! extern void WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te);
! extern ArchiveHandle *CloneArchive(ArchiveHandle *AH);
! extern void DeCloneArchive(ArchiveHandle *AH);
  
  extern teReqs TocIDRequired(ArchiveHandle *AH, DumpId id);
+ TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
  extern bool checkSeek(FILE *fp);
  
  #define appendStringLiteralAHX(buf,str,AH) \
***************
*** 380,383 **** int			ahprintf(ArchiveHandle *AH, const char *fmt,...) __attribute__((format(PG_
--- 412,427 ----
  
  void		ahlog(ArchiveHandle *AH, int level, const char *fmt,...) __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4)));
  
+ #ifdef USE_ASSERT_CHECKING
+ #define Assert(condition) \
+ 	do { if (!(condition)) \
+ 	{ \
+ 		write_msg(NULL, "Failed assertion in %s, line %d\n", \
+ 				  __FILE__, __LINE__); \
+ 		abort(); \
+ 	} } while (0)
+ #else
+ #define Assert(condition) ((void) 0)
+ #endif
+ 
  #endif
*** a/src/bin/pg_dump/pg_backup_custom.c
--- b/src/bin/pg_dump/pg_backup_custom.c
***************
*** 27,32 ****
--- 27,33 ----
  #include "compress_io.h"
  #include "dumputils.h"
  #include "dumpmem.h"
+ #include "parallel.h"
  
  /*--------
   * Routines in the format interface
***************
*** 60,65 **** static void _LoadBlobs(ArchiveHandle *AH, bool drop);
--- 61,70 ----
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
+ static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
+ static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act);
+ char *_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
+ 
  typedef struct
  {
  	CompressorState *cs;
***************
*** 128,133 **** InitArchiveFmt_Custom(ArchiveHandle *AH)
--- 133,145 ----
  	AH->ClonePtr = _Clone;
  	AH->DeClonePtr = _DeClone;
  
+ 	AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
+ 	AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
+ 
+ 	/* no parallel dump in the custom archive, only parallel restore */
+ 	AH->WorkerJobDumpPtr = NULL;
+ 	AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
+ 
  	/* Set up a private area. */
  	ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
  	AH->formatData = (void *) ctx;
***************
*** 699,705 **** _CloseArchive(ArchiveHandle *AH)
  		tpos = ftello(AH->FH);
  		WriteToc(AH);
  		ctx->dataStart = _getFilePos(AH, ctx);
! 		WriteDataChunks(AH);
  
  		/*
  		 * If possible, re-write the TOC in order to update the data offset
--- 711,717 ----
  		tpos = ftello(AH->FH);
  		WriteToc(AH);
  		ctx->dataStart = _getFilePos(AH, ctx);
! 		WriteDataChunks(AH, NULL);
  
  		/*
  		 * If possible, re-write the TOC in order to update the data offset
***************
*** 797,802 **** _DeClone(ArchiveHandle *AH)
--- 809,889 ----
  	free(ctx);
  }
  
+ /*
+  * This function is executed in the child of a parallel restore for the
+  * custom format archive and restores the actual data for one TOC entry.
+  */
+ char *
+ _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
+ {
+ 	/* short fixed-size string + some ID so far, this needs to be malloc'ed
+ 	 * instead of static because we work with threads on Windows */
+ 	const int	buflen = 64;
+ 	char	   *buf = (char*) pg_malloc(buflen);
+ 	ParallelArgs pargs;
+ 	int			status;
+ 	lclTocEntry *tctx;
+ 
+ 	tctx = (lclTocEntry *) te->formatData;
+ 
+ 	pargs.AH = AH;
+ 	pargs.te = te;
+ 
+ 	status = parallel_restore(&pargs);
+ 
+ 	snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
+ 			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
+ 
+ 	return buf;
+ }
+ 
+ /*
+  * This function is executed in the parent process. Depending on the desired
+  * action (dump or restore) it creates a string that is understood by the
+  * _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory functions of the
+  * respective dump format.
+  */
+ static char *
+ _MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
+ {
+ 	/*
+ 	 * A static char is okay here, even on Windows because we call this
+ 	 * function only from one process (the master).
+ 	 */
+ 	static char			buf[64]; /* short fixed-size string + number */
+ 
+ 	/* no parallel dump in the custom archive format */
+ 	Assert(act == ACT_RESTORE);
+ 
+ 	snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
+ 
+ 	return buf;
+ }
+ 
+ /*
+  * This function is executed in the parent process. It analyzes the response of
+  * the _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory functions of the
+  * respective dump format.
+  */
+ static int
+ _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
+ {
+ 	DumpId		dumpId;
+ 	int			nBytes, status, n_errors;
+ 
+ 	/* no parallel dump in the custom archive */
+ 	Assert(act == ACT_RESTORE);
+ 
+ 	sscanf(str, "%u %u %u%n", &dumpId, &status, &n_errors, &nBytes);
+ 
+ 	Assert(nBytes == strlen(str));
+ 	Assert(dumpId == te->dumpId);
+ 
+ 	AH->public.n_errors += n_errors;
+ 
+ 	return status;
+ }
+ 
  /*--------------------------------------------------
   * END OF FORMAT CALLBACKS
   *--------------------------------------------------
*** a/src/bin/pg_dump/pg_backup_db.c
--- b/src/bin/pg_dump/pg_backup_db.c
***************
*** 309,320 **** ConnectDatabase(Archive *AHX,
  	PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
  }
  
  void
  DisconnectDatabase(Archive *AHX)
  {
  	ArchiveHandle *AH = (ArchiveHandle *) AHX;
  
! 	PQfinish(AH->connection);	/* noop if AH->connection is NULL */
  	AH->connection = NULL;
  }
  
--- 309,338 ----
  	PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
  }
  
+ /*
+  * Close the connection to the database and also cancel off the query if we
+  * have one running.
+  */
  void
  DisconnectDatabase(Archive *AHX)
  {
  	ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ 	PGcancel   *cancel;
+ 	char		errbuf[1];
+ 
+ 	if (!AH->connection)
+ 		return;
  
! 	if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
! 	{
! 		if ((cancel = PQgetCancel(AH->connection)))
! 		{
! 			PQcancel(cancel, errbuf, sizeof(errbuf));
! 			PQfreeCancel(cancel);
! 		}
! 	}
! 
! 	PQfinish(AH->connection);
  	AH->connection = NULL;
  }
  
*** a/src/bin/pg_dump/pg_backup_directory.c
--- b/src/bin/pg_dump/pg_backup_directory.c
***************
*** 36,41 ****
--- 36,42 ----
  #include "compress_io.h"
  #include "dumpmem.h"
  #include "dumputils.h"
+ #include "parallel.h"
  
  #include <dirent.h>
  #include <sys/stat.h>
***************
*** 51,56 **** typedef struct
--- 52,58 ----
  	cfp		   *dataFH;			/* currently open data file */
  
  	cfp		   *blobsTocFH;		/* file handle for blobs.toc */
+ 	ParallelState *pstate;		/* for parallel backup / restore */
  } lclContext;
  
  typedef struct
***************
*** 71,76 **** static int	_ReadByte(ArchiveHandle *);
--- 73,79 ----
  static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
  static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
  static void _CloseArchive(ArchiveHandle *AH);
+ static void _ReopenArchive(ArchiveHandle *AH);
  static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
  
  static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
***************
*** 83,90 **** static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
  static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
  static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt);
  
! static char *prependDirectory(ArchiveHandle *AH, const char *relativeFilename);
  
  
  /*
   *	Init routine required by ALL formats. This is a global routine
--- 86,102 ----
  static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
  static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt);
  
! static void _Clone(ArchiveHandle *AH);
! static void _DeClone(ArchiveHandle *AH);
  
+ static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
+ static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te,
+ 								  const char *str, T_Action act);
+ static char *_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te);
+ static char *_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te);
+ 
+ static char *prependDirectory(ArchiveHandle *AH, char *buf,
+ 							  const char *relativeFilename);
  
  /*
   *	Init routine required by ALL formats. This is a global routine
***************
*** 111,117 **** InitArchiveFmt_Directory(ArchiveHandle *AH)
  	AH->WriteBufPtr = _WriteBuf;
  	AH->ReadBufPtr = _ReadBuf;
  	AH->ClosePtr = _CloseArchive;
! 	AH->ReopenPtr = NULL;
  	AH->PrintTocDataPtr = _PrintTocData;
  	AH->ReadExtraTocPtr = _ReadExtraToc;
  	AH->WriteExtraTocPtr = _WriteExtraToc;
--- 123,129 ----
  	AH->WriteBufPtr = _WriteBuf;
  	AH->ReadBufPtr = _ReadBuf;
  	AH->ClosePtr = _CloseArchive;
! 	AH->ReopenPtr = _ReopenArchive;
  	AH->PrintTocDataPtr = _PrintTocData;
  	AH->ReadExtraTocPtr = _ReadExtraToc;
  	AH->WriteExtraTocPtr = _WriteExtraToc;
***************
*** 122,129 **** InitArchiveFmt_Directory(ArchiveHandle *AH)
  	AH->EndBlobPtr = _EndBlob;
  	AH->EndBlobsPtr = _EndBlobs;
  
! 	AH->ClonePtr = NULL;
! 	AH->DeClonePtr = NULL;
  
  	/* Set up our private context */
  	ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
--- 134,147 ----
  	AH->EndBlobPtr = _EndBlob;
  	AH->EndBlobsPtr = _EndBlobs;
  
! 	AH->ClonePtr = _Clone;
! 	AH->DeClonePtr = _DeClone;
! 
! 	AH->WorkerJobRestorePtr = _WorkerJobRestoreDirectory;
! 	AH->WorkerJobDumpPtr = _WorkerJobDumpDirectory;
! 
! 	AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
! 	AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
  
  	/* Set up our private context */
  	ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
***************
*** 147,162 **** InitArchiveFmt_Directory(ArchiveHandle *AH)
  
  	if (AH->mode == archModeWrite)
  	{
! 		if (mkdir(ctx->directory, 0700) < 0)
  			exit_horribly(modulename, "could not create directory \"%s\": %s\n",
  						  ctx->directory, strerror(errno));
  	}
  	else
  	{							/* Read Mode */
! 		char	   *fname;
  		cfp		   *tocFH;
  
! 		fname = prependDirectory(AH, "toc.dat");
  
  		tocFH = cfopen_read(fname, PG_BINARY_R);
  		if (tocFH == NULL)
--- 165,201 ----
  
  	if (AH->mode == archModeWrite)
  	{
! 		struct stat st;
! 		bool is_empty = false;
! 
! 		/* we accept an empty existing directory */
! 		if (stat(ctx->directory, &st) == 0 && S_ISDIR(st.st_mode))
! 		{
! 			DIR* dir = opendir(ctx->directory);
! 			if (dir) {
! 				struct dirent *d;
! 				is_empty = true;
! 				while ((d = readdir(dir))) {
! 					if (strcmp(d->d_name, ".") != 0 && strcmp(d->d_name, "..") != 0)
! 					{
! 						is_empty = false;
! 						break;
! 					}
! 				}
! 				closedir(dir);
! 			}
! 		}
! 
! 		if (!is_empty && mkdir(ctx->directory, 0700) < 0)
  			exit_horribly(modulename, "could not create directory \"%s\": %s\n",
  						  ctx->directory, strerror(errno));
  	}
  	else
  	{							/* Read Mode */
! 		char	   fname[MAXPGPATH];
  		cfp		   *tocFH;
  
! 		prependDirectory(AH, fname, "toc.dat");
  
  		tocFH = cfopen_read(fname, PG_BINARY_R);
  		if (tocFH == NULL)
***************
*** 282,290 **** _StartData(ArchiveHandle *AH, TocEntry *te)
  {
  	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
  	lclContext *ctx = (lclContext *) AH->formatData;
! 	char	   *fname;
  
! 	fname = prependDirectory(AH, tctx->filename);
  
  	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
  	if (ctx->dataFH == NULL)
--- 321,329 ----
  {
  	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
  	lclContext *ctx = (lclContext *) AH->formatData;
! 	char		fname[MAXPGPATH];
  
! 	prependDirectory(AH, fname, tctx->filename);
  
  	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
  	if (ctx->dataFH == NULL)
***************
*** 309,314 **** _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
--- 348,356 ----
  	if (dLen == 0)
  		return 0;
  
+ 	/* Are we aborting? */
+ 	checkAborting(AH);
+ 
  	return cfwrite(data, dLen, ctx->dataFH);
  }
  
***************
*** 376,383 **** _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
  		_LoadBlobs(AH, ropt);
  	else
  	{
! 		char	   *fname = prependDirectory(AH, tctx->filename);
  
  		_PrintFileData(AH, fname, ropt);
  	}
  }
--- 418,426 ----
  		_LoadBlobs(AH, ropt);
  	else
  	{
! 		char		fname[MAXPGPATH];
  
+ 		prependDirectory(AH, fname, tctx->filename);
  		_PrintFileData(AH, fname, ropt);
  	}
  }
***************
*** 387,398 **** _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt)
  {
  	Oid			oid;
  	lclContext *ctx = (lclContext *) AH->formatData;
! 	char	   *fname;
  	char		line[MAXPGPATH];
  
  	StartRestoreBlobs(AH);
  
! 	fname = prependDirectory(AH, "blobs.toc");
  
  	ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R);
  
--- 430,441 ----
  {
  	Oid			oid;
  	lclContext *ctx = (lclContext *) AH->formatData;
! 	char		fname[MAXPGPATH];
  	char		line[MAXPGPATH];
  
  	StartRestoreBlobs(AH);
  
! 	prependDirectory(AH, fname, "blobs.toc");
  
  	ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R);
  
***************
*** 475,480 **** _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
--- 518,526 ----
  	lclContext *ctx = (lclContext *) AH->formatData;
  	size_t		res;
  
+ 	/* Are we aborting? */
+ 	checkAborting(AH);
+ 
  	res = cfwrite(buf, len, ctx->dataFH);
  	if (res != len)
  		exit_horribly(modulename, "could not write to output file: %s\n",
***************
*** 519,525 **** _CloseArchive(ArchiveHandle *AH)
  	if (AH->mode == archModeWrite)
  	{
  		cfp		   *tocFH;
! 		char	   *fname = prependDirectory(AH, "toc.dat");
  
  		/* The TOC is always created uncompressed */
  		tocFH = cfopen_write(fname, PG_BINARY_W, 0);
--- 565,576 ----
  	if (AH->mode == archModeWrite)
  	{
  		cfp		   *tocFH;
! 		char		fname[MAXPGPATH];
! 
! 		prependDirectory(AH, fname, "toc.dat");
! 
! 		/* this will actually fork the processes for a parallel backup */
! 		ctx->pstate = ParallelBackupStart(AH, NULL);
  
  		/* The TOC is always created uncompressed */
  		tocFH = cfopen_write(fname, PG_BINARY_W, 0);
***************
*** 540,550 **** _CloseArchive(ArchiveHandle *AH)
  		if (cfclose(tocFH) != 0)
  			exit_horribly(modulename, "could not close TOC file: %s\n",
  						  strerror(errno));
! 		WriteDataChunks(AH);
  	}
  	AH->FH = NULL;
  }
  
  
  /*
   * BLOB support
--- 591,614 ----
  		if (cfclose(tocFH) != 0)
  			exit_horribly(modulename, "could not close TOC file: %s\n",
  						  strerror(errno));
! 		WriteDataChunks(AH, ctx->pstate);
! 
! 		ParallelBackupEnd(AH, ctx->pstate);
  	}
  	AH->FH = NULL;
  }
  
+ /*
+  * Reopen the archive's file handle.
+  */
+ static void
+ _ReopenArchive(ArchiveHandle *AH)
+ {
+ 	/*
+ 	 * Our TOC is in memory, our data files are opened by each child anyway as
+ 	 * they are separate. We support reopening the archive by just doing nothing.
+ 	 */
+ }
  
  /*
   * BLOB support
***************
*** 561,569 **** static void
  _StartBlobs(ArchiveHandle *AH, TocEntry *te)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
! 	char	   *fname;
  
! 	fname = prependDirectory(AH, "blobs.toc");
  
  	/* The blob TOC file is never compressed */
  	ctx->blobsTocFH = cfopen_write(fname, "ab", 0);
--- 625,633 ----
  _StartBlobs(ArchiveHandle *AH, TocEntry *te)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
! 	char		fname[MAXPGPATH];
  
! 	prependDirectory(AH, fname, "blobs.toc");
  
  	/* The blob TOC file is never compressed */
  	ctx->blobsTocFH = cfopen_write(fname, "ab", 0);
***************
*** 628,639 **** _EndBlobs(ArchiveHandle *AH, TocEntry *te)
  	ctx->blobsTocFH = NULL;
  }
  
! 
  static char *
! prependDirectory(ArchiveHandle *AH, const char *relativeFilename)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
- 	static char buf[MAXPGPATH];
  	char	   *dname;
  
  	dname = ctx->directory;
--- 692,707 ----
  	ctx->blobsTocFH = NULL;
  }
  
! /*
!  * Gets a relative file name and prepends the output directory, writing the
!  * result to buf. The caller needs to make sure that buf is MAXPGPATH bytes
!  * big. Can't use a static char[MAXPGPATH] inside the function because we run
!  * multithreaded on Windows.
!  */
  static char *
! prependDirectory(ArchiveHandle *AH, char *buf, const char *relativeFilename)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
  	char	   *dname;
  
  	dname = ctx->directory;
***************
*** 647,649 **** prependDirectory(ArchiveHandle *AH, const char *relativeFilename)
--- 715,866 ----
  
  	return buf;
  }
+ 
+ /*
+  * Clone format-specific fields during parallel restoration.
+  */
+ static void
+ _Clone(ArchiveHandle *AH)
+ {
+ 	lclContext *ctx = (lclContext *) AH->formatData;
+ 
+ 	AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
+ 	memcpy(AH->formatData, ctx, sizeof(lclContext));
+ 	ctx = (lclContext *) AH->formatData;
+ 
+ 	/*
+ 	 * Note: we do not make a local lo_buf because we expect at most one BLOBS
+ 	 * entry per archive, so no parallelism is possible.  Likewise,
+ 	 * TOC-entry-local state isn't an issue because any one TOC entry is
+ 	 * touched by just one worker child.
+ 	 */
+ 
+ 	/*
+ 	 * We also don't copy the ParallelState pointer (pstate), only the master
+ 	 * process ever writes to it.
+ 	 */
+ }
+ 
+ static void
+ _DeClone(ArchiveHandle *AH)
+ {
+ 	lclContext *ctx = (lclContext *) AH->formatData;
+ 	free(ctx);
+ }
+ 
+ /*
+  * This function is executed in the parent process. Depending on the desired
+  * action (dump or restore) it creates a string that is understood by the
+  * _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory functions of the
+  * respective dump format.
+  */
+ static char *
+ _MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
+ {
+ 	/*
+ 	 * A static char is okay here, even on Windows because we call this
+ 	 * function only from one process (the master).
+ 	 */
+ 	static char	buf[64];
+ 
+ 	if (act == ACT_DUMP)
+ 		snprintf(buf, sizeof(buf), "DUMP %d", te->dumpId);
+ 	else if (act == ACT_RESTORE)
+ 		snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
+ 
+ 	return buf;
+ }
+ 
+ /*
+  * This function is executed in the child of a parallel backup for the
+  * directory archive and dumps the actual data.
+  *
+  * We are currently returning only the DumpId so theoretically we could
+  * make this function returning an int (or a DumpId). However, to
+  * facilitate further enhancements and because sooner or later we need to
+  * convert this to a string and send it via a message anyway, we stick with
+  * char *. It is parsed on the other side by the _EndMasterParallel()
+  * function of the respective dump format.
+  */
+ static char *
+ _WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te)
+ {
+ 	/* short fixed-size string + some ID so far, this needs to be malloc'ed
+ 	 * instead of static because we work with threads on windows */
+ 	const int	buflen = 64;
+ 	char	   *buf = (char*) pg_malloc(buflen);
+ 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
+ 
+ 	/* This should never happen */
+ 	if (!tctx)
+ 		exit_horribly(modulename, "error during backup\n");
+ 
+ 	/*
+ 	 * This function returns void. We either fail and die horribly or succeed...
+ 	 * A failure will be detected by the parent when the child dies unexpectedly.
+ 	 */
+ 	WriteDataChunksForTocEntry(AH, te);
+ 
+ 	snprintf(buf, buflen, "OK DUMP %d", te->dumpId);
+ 
+ 	return buf;
+ }
+ 
+ /*
+  * This function is executed in the child of a parallel backup for the
+  * directory archive and dumps the actual data.
+  */
+ static char *
+ _WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te)
+ {
+ 	/* short fixed-size string + some ID so far, this needs to be malloc'ed
+ 	 * instead of static because we work with threads on windows */
+ 	const int	buflen = 64;
+ 	char	   *buf = (char*) pg_malloc(buflen);
+ 	ParallelArgs pargs;
+ 	int			status;
+ 	lclTocEntry *tctx;
+ 
+ 	tctx = (lclTocEntry *) te->formatData;
+ 
+ 	pargs.AH = AH;
+ 	pargs.te = te;
+ 
+ 	status = parallel_restore(&pargs);
+ 
+ 	snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
+ 			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
+ 
+ 	return buf;
+ }
+ /*
+  * This function is executed in the parent process. It analyzes the response of
+  * the _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory functions of the
+  * respective dump format.
+  */
+ static int
+ _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
+ {
+ 	DumpId		dumpId;
+ 	int			nBytes, n_errors;
+ 	int			status = 0;
+ 
+ 	if (act == ACT_DUMP)
+ 	{
+ 		sscanf(str, "%u%n", &dumpId, &nBytes);
+ 
+ 		Assert(dumpId == te->dumpId);
+ 		Assert(nBytes == strlen(str));
+ 	}
+ 	else if (act == ACT_RESTORE)
+ 	{
+ 		sscanf(str, "%u %u %u%n", &dumpId, &status, &n_errors, &nBytes);
+ 
+ 		Assert(dumpId == te->dumpId);
+ 		Assert(nBytes == strlen(str));
+ 
+ 		AH->public.n_errors += n_errors;
+ 	}
+ 
+ 	return status;
+ }
*** a/src/bin/pg_dump/pg_backup_tar.c
--- b/src/bin/pg_dump/pg_backup_tar.c
***************
*** 156,161 **** InitArchiveFmt_Tar(ArchiveHandle *AH)
--- 156,167 ----
  	AH->ClonePtr = NULL;
  	AH->DeClonePtr = NULL;
  
+ 	AH->MasterStartParallelItemPtr = NULL;
+ 	AH->MasterEndParallelItemPtr = NULL;
+ 
+ 	AH->WorkerJobDumpPtr = NULL;
+ 	AH->WorkerJobRestorePtr = NULL;
+ 
  	/*
  	 * Set up some special context used in compressing data.
  	 */
***************
*** 826,832 **** _CloseArchive(ArchiveHandle *AH)
  		/*
  		 * Now send the data (tables & blobs)
  		 */
! 		WriteDataChunks(AH);
  
  		/*
  		 * Now this format wants to append a script which does a full restore
--- 832,838 ----
  		/*
  		 * Now send the data (tables & blobs)
  		 */
! 		WriteDataChunks(AH, NULL);
  
  		/*
  		 * Now this format wants to append a script which does a full restore
*** a/src/bin/pg_dump/pg_dump.c
--- b/src/bin/pg_dump/pg_dump.c
***************
*** 136,141 **** static int	disable_dollar_quoting = 0;
--- 136,142 ----
  static int	dump_inserts = 0;
  static int	column_inserts = 0;
  static int	no_security_labels = 0;
+ static int  no_synchronized_snapshots = 0;
  static int	no_unlogged_table_data = 0;
  static int	serializable_deferrable = 0;
  
***************
*** 241,248 **** static Oid	findLastBuiltinOid_V70(Archive *fout);
  static void selectSourceSchema(Archive *fout, const char *schemaName);
  static char *getFormattedTypeName(Archive *fout, Oid oid, OidOptions opts);
  static char *myFormatType(const char *typname, int32 typmod);
- static const char *fmtQualifiedId(Archive *fout,
- 			   const char *schema, const char *id);
  static void getBlobs(Archive *fout);
  static void dumpBlob(Archive *fout, BlobInfo *binfo);
  static int	dumpBlobs(Archive *fout, void *arg);
--- 242,247 ----
***************
*** 260,266 **** static void binary_upgrade_extension_member(PQExpBuffer upgrade_buffer,
  								DumpableObject *dobj,
  								const char *objlabel);
  static const char *getAttrName(int attrnum, TableInfo *tblInfo);
! static const char *fmtCopyColumnList(const TableInfo *ti);
  static PGresult *ExecuteSqlQueryForSingleRow(Archive *fout, char *query);
  
  
--- 259,266 ----
  								DumpableObject *dobj,
  								const char *objlabel);
  static const char *getAttrName(int attrnum, TableInfo *tblInfo);
! static const char *fmtCopyColumnList(const TableInfo *ti, PQExpBuffer buffer);
! static char *get_synchronized_snapshot(Archive *fout);
  static PGresult *ExecuteSqlQueryForSingleRow(Archive *fout, char *query);
  
  
***************
*** 282,287 **** main(int argc, char **argv)
--- 282,288 ----
  	int			numObjs;
  	DumpableObject *boundaryObjs;
  	int			i;
+ 	int			numWorkers = 1;
  	enum trivalue prompt_password = TRI_DEFAULT;
  	int			compressLevel = -1;
  	int			plainText = 0;
***************
*** 311,316 **** main(int argc, char **argv)
--- 312,318 ----
  		{"format", required_argument, NULL, 'F'},
  		{"host", required_argument, NULL, 'h'},
  		{"ignore-version", no_argument, NULL, 'i'},
+ 		{"jobs", required_argument, NULL, 'j'},
  		{"no-reconnect", no_argument, NULL, 'R'},
  		{"oids", no_argument, NULL, 'o'},
  		{"no-owner", no_argument, NULL, 'O'},
***************
*** 350,355 **** main(int argc, char **argv)
--- 352,358 ----
  		{"serializable-deferrable", no_argument, &serializable_deferrable, 1},
  		{"use-set-session-authorization", no_argument, &use_setsessauth, 1},
  		{"no-security-labels", no_argument, &no_security_labels, 1},
+ 		{"no-synchronized-snapshots", no_argument, &no_synchronized_snapshots, 1},
  		{"no-unlogged-table-data", no_argument, &no_unlogged_table_data, 1},
  
  		{NULL, 0, NULL, 0}
***************
*** 357,362 **** main(int argc, char **argv)
--- 360,371 ----
  
  	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_dump"));
  
+ 	/*
+ 	 * Initialize what we need for parallel execution, especially for thread
+ 	 * support on Windows.
+ 	 */
+ 	init_parallel_dump_utils();
+ 
  	g_verbose = false;
  
  	strcpy(g_comment_start, "-- ");
***************
*** 387,393 **** main(int argc, char **argv)
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "abcCE:f:F:h:in:N:oOp:RsS:t:T:U:vwWxZ:",
  							long_options, &optindex)) != -1)
  	{
  		switch (c)
--- 396,402 ----
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "abcCE:f:F:h:ij:n:N:oOp:RsS:t:T:U:vwWxZ:",
  							long_options, &optindex)) != -1)
  	{
  		switch (c)
***************
*** 428,433 **** main(int argc, char **argv)
--- 437,446 ----
  				/* ignored, deprecated option */
  				break;
  
+ 			case 'j':			/* number of dump jobs */
+ 				numWorkers = atoi(optarg);
+ 				break;
+ 
  			case 'n':			/* include schema(s) */
  				simple_string_list_append(&schema_include_patterns, optarg);
  				include_everything = false;
***************
*** 567,572 **** main(int argc, char **argv)
--- 580,601 ----
  			compressLevel = 0;
  	}
  
+ 	/*
+ 	 * On Windows we can only have at most MAXIMUM_WAIT_OBJECTS (= 64 usually)
+ 	 * parallel jobs because that's the maximum limit for the
+ 	 * WaitForMultipleObjects() call.
+ 	 */
+ 	if (numWorkers <= 0
+ #ifdef WIN32
+ 			|| numWorkers > MAXIMUM_WAIT_OBJECTS
+ #endif
+ 		)
+ 		exit_horribly(NULL, "%s: invalid number of parallel jobs\n", progname);
+ 
+ 	/* Parallel backup only in the directory archive format so far */
+ 	if (archiveFormat != archDirectory && numWorkers > 1)
+ 		exit_horribly(NULL, "parallel backup only supported by the directory format\n");
+ 
  	/* Open the output file */
  	fout = CreateArchive(filename, archiveFormat, compressLevel, archiveMode);
  
***************
*** 590,595 **** main(int argc, char **argv)
--- 619,626 ----
  	fout->minRemoteVersion = 70000;
  	fout->maxRemoteVersion = (my_version / 100) * 100 + 99;
  
+ 	fout->numWorkers = numWorkers;
+ 
  	/*
  	 * Open the database using the Archiver, so it knows about it. Errors mean
  	 * death.
***************
*** 604,628 **** main(int argc, char **argv)
  	if (fout->remoteVersion < 90100)
  		no_security_labels = 1;
  
- 	/*
- 	 * Start transaction-snapshot mode transaction to dump consistent data.
- 	 */
- 	ExecuteSqlStatement(fout, "BEGIN");
- 	if (fout->remoteVersion >= 90100)
- 	{
- 		if (serializable_deferrable)
- 			ExecuteSqlStatement(fout,
- 								"SET TRANSACTION ISOLATION LEVEL "
- 								"SERIALIZABLE, READ ONLY, DEFERRABLE");
- 		else
- 			ExecuteSqlStatement(fout,
- 								"SET TRANSACTION ISOLATION LEVEL "
- 								"REPEATABLE READ");
- 	}
- 	else
- 		ExecuteSqlStatement(fout,
- 							"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
- 
  	/* Select the appropriate subquery to convert user IDs to names */
  	if (fout->remoteVersion >= 80100)
  		username_subquery = "SELECT rolname FROM pg_catalog.pg_roles WHERE oid =";
--- 635,640 ----
***************
*** 631,636 **** main(int argc, char **argv)
--- 643,656 ----
  	else
  		username_subquery = "SELECT usename FROM pg_user WHERE usesysid =";
  
+ 	/* check the version for the synchronized snapshots feature */
+ 	if (numWorkers > 1 && fout->remoteVersion < 90200
+ 		&& !no_synchronized_snapshots)
+ 		exit_horribly(NULL,
+ 					 "No synchronized snapshots available in this server version.\n"
+ 					 "Run with --no-synchronized-snapshots instead if you do not\n"
+ 					 "need synchronized snapshots.\n");
+ 
  	/* Find the last built-in OID, if needed */
  	if (fout->remoteVersion < 70300)
  	{
***************
*** 727,732 **** main(int argc, char **argv)
--- 747,756 ----
  	else
  		sortDumpableObjectsByTypeOid(dobjs, numObjs);
  
+ 	/* If we do a parallel dump, we want the largest tables to go first */
+ 	if (archiveFormat == archDirectory && numWorkers > 1)
+ 		sortDataAndIndexObjectsBySize(dobjs, numObjs);
+ 
  	sortDumpableObjects(dobjs, numObjs,
  						boundaryObjs[0].dumpId, boundaryObjs[1].dumpId);
  
***************
*** 808,813 **** help(const char *progname)
--- 832,838 ----
  	printf(_("  -f, --file=FILENAME          output file or directory name\n"));
  	printf(_("  -F, --format=c|d|t|p         output file format (custom, directory, tar,\n"
  			 "                               plain text (default))\n"));
+ 	printf(_("  -j, --jobs=NUM               use this many parallel jobs to dump\n"));
  	printf(_("  -v, --verbose                verbose mode\n"));
  	printf(_("  -V, --version                output version information, then exit\n"));
  	printf(_("  -Z, --compress=0-9           compression level for compressed formats\n"));
***************
*** 837,842 **** help(const char *progname)
--- 862,868 ----
  	printf(_("  --exclude-table-data=TABLE   do NOT dump data for the named table(s)\n"));
  	printf(_("  --inserts                    dump data as INSERT commands, rather than COPY\n"));
  	printf(_("  --no-security-labels         do not dump security label assignments\n"));
+ 	printf(_("  --no-synchronized-snapshots  parallel processes should not use synchronized snapshots\n"));
  	printf(_("  --no-tablespaces             do not dump tablespace assignments\n"));
  	printf(_("  --no-unlogged-table-data     do not dump unlogged table data\n"));
  	printf(_("  --quote-all-identifiers      quote all identifiers, even if not key words\n"));
***************
*** 865,871 **** setup_connection(Archive *AH, const char *dumpencoding, char *use_role)
  	PGconn	   *conn = GetConnection(AH);
  	const char *std_strings;
  
! 	/* Set the client encoding if requested */
  	if (dumpencoding)
  	{
  		if (PQsetClientEncoding(conn, dumpencoding) < 0)
--- 891,902 ----
  	PGconn	   *conn = GetConnection(AH);
  	const char *std_strings;
  
! 	/*
! 	 * Set the client encoding if requested. If dumpencoding == NULL then
! 	 * either it hasn't been requested or we're a cloned connection and then this
! 	 * has already been set in CloneArchive according to the original
! 	 * connection encoding.
! 	 */
  	if (dumpencoding)
  	{
  		if (PQsetClientEncoding(conn, dumpencoding) < 0)
***************
*** 883,888 **** setup_connection(Archive *AH, const char *dumpencoding, char *use_role)
--- 914,923 ----
  	AH->std_strings = (std_strings && strcmp(std_strings, "on") == 0);
  
  	/* Set the role if requested */
+ 	if (!use_role && AH->use_role)
+ 		use_role = AH->use_role;
+ 
+ 	/* Set the role if requested */
  	if (use_role && AH->remoteVersion >= 80100)
  	{
  		PQExpBuffer query = createPQExpBuffer();
***************
*** 890,895 **** setup_connection(Archive *AH, const char *dumpencoding, char *use_role)
--- 925,934 ----
  		appendPQExpBuffer(query, "SET ROLE %s", fmtId(use_role));
  		ExecuteSqlStatement(AH, query->data);
  		destroyPQExpBuffer(query);
+ 
+ 		/* save this for later use on parallel connections */
+ 		if (!AH->use_role)
+ 			AH->use_role = strdup(use_role);
  	}
  
  	/* Set the datestyle to ISO to ensure the dump's portability */
***************
*** 926,931 **** setup_connection(Archive *AH, const char *dumpencoding, char *use_role)
--- 965,1023 ----
  	 */
  	if (quote_all_identifiers && AH->remoteVersion >= 90100)
  		ExecuteSqlStatement(AH, "SET quote_all_identifiers = true");
+ 
+ 	/*
+ 	 * Start transaction-snapshot mode transaction to dump consistent data.
+ 	 */
+ 	ExecuteSqlStatement(AH, "BEGIN");
+ 	if (AH->remoteVersion >= 90100)
+ 	{
+ 		if (serializable_deferrable)
+ 			ExecuteSqlStatement(AH,
+ 						   "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, "
+ 						   "READ ONLY, DEFERRABLE");
+ 		else
+ 			ExecuteSqlStatement(AH,
+ 						   "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
+ 	}
+ 	else
+ 		ExecuteSqlStatement(AH, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
+ 
+ 	if (AH->numWorkers > 1 && AH->remoteVersion >= 90200 && !no_synchronized_snapshots)
+ 	{
+ 		if (AH->sync_snapshot_id)
+ 		{
+ 			PQExpBuffer query = createPQExpBuffer();
+ 
+ 			/*
+ 			 * We are a worker connection: adopt the snapshot exported by
+ 			 * the master so all connections see identical data.
+ 			 */
+ 			appendPQExpBuffer(query, "SET TRANSACTION SNAPSHOT ");
+ 			appendStringLiteralConn(query, AH->sync_snapshot_id, conn);
+ 			ExecuteSqlStatement(AH, query->data);
+ 			destroyPQExpBuffer(query);
+ 		}
+ 		else
+ 			AH->sync_snapshot_id = get_synchronized_snapshot(AH);
+ 	}
+ }
+ 
+ /*
+  * Initialize the connection for a new worker process.
+  */
+ void
+ _SetupWorker(Archive *AHX, RestoreOptions *ropt)
+ {
+ 	setup_connection(AHX, NULL, NULL);
+ }
+ 
+ static char*
+ get_synchronized_snapshot(Archive *fout)
+ {
+ 	char	   *query = "select pg_export_snapshot()";
+ 	char	   *result;
+ 	PGresult   *res;
+ 
+ 	res = ExecuteSqlQueryForSingleRow(fout, query);
+ 	result = strdup(PQgetvalue(res, 0, 0));
+ 	PQclear(res);
+ 
+ 	return result;
  }
  
  static ArchiveFormat
***************
*** 1243,1248 **** dumpTableData_copy(Archive *fout, void *dcontext)
--- 1335,1345 ----
  	const bool	hasoids = tbinfo->hasoids;
  	const bool	oids = tdinfo->oids;
  	PQExpBuffer q = createPQExpBuffer();
+ 	/*
+ 	 * Note: can't use getThreadLocalPQExpBuffer() here, we're calling fmtId which
+ 	 * uses it already.
+ 	 */
+ 	PQExpBuffer clistBuf = createPQExpBuffer();
  	PGconn	   *conn = GetConnection(fout);
  	PGresult   *res;
  	int			ret;
***************
*** 1267,1280 **** dumpTableData_copy(Archive *fout, void *dcontext)
  	 * cases involving ADD COLUMN and inheritance.)
  	 */
  	if (fout->remoteVersion >= 70300)
! 		column_list = fmtCopyColumnList(tbinfo);
  	else
  		column_list = "";		/* can't select columns in COPY */
  
  	if (oids && hasoids)
  	{
  		appendPQExpBuffer(q, "COPY %s %s WITH OIDS TO stdout;",
! 						  fmtQualifiedId(fout,
  										 tbinfo->dobj.namespace->dobj.name,
  										 classname),
  						  column_list);
--- 1364,1377 ----
  	 * cases involving ADD COLUMN and inheritance.)
  	 */
  	if (fout->remoteVersion >= 70300)
! 		column_list = fmtCopyColumnList(tbinfo, clistBuf);
  	else
  		column_list = "";		/* can't select columns in COPY */
  
  	if (oids && hasoids)
  	{
  		appendPQExpBuffer(q, "COPY %s %s WITH OIDS TO stdout;",
! 						  fmtQualifiedId(fout->remoteVersion,
  										 tbinfo->dobj.namespace->dobj.name,
  										 classname),
  						  column_list);
***************
*** 1292,1298 **** dumpTableData_copy(Archive *fout, void *dcontext)
  		else
  			appendPQExpBufferStr(q, "* ");
  		appendPQExpBuffer(q, "FROM %s %s) TO stdout;",
! 						  fmtQualifiedId(fout,
  										 tbinfo->dobj.namespace->dobj.name,
  										 classname),
  						  tdinfo->filtercond);
--- 1389,1395 ----
  		else
  			appendPQExpBufferStr(q, "* ");
  		appendPQExpBuffer(q, "FROM %s %s) TO stdout;",
! 						  fmtQualifiedId(fout->remoteVersion,
  										 tbinfo->dobj.namespace->dobj.name,
  										 classname),
  						  tdinfo->filtercond);
***************
*** 1300,1312 **** dumpTableData_copy(Archive *fout, void *dcontext)
  	else
  	{
  		appendPQExpBuffer(q, "COPY %s %s TO stdout;",
! 						  fmtQualifiedId(fout,
  										 tbinfo->dobj.namespace->dobj.name,
  										 classname),
  						  column_list);
  	}
  	res = ExecuteSqlQuery(fout, q->data, PGRES_COPY_OUT);
  	PQclear(res);
  
  	for (;;)
  	{
--- 1397,1410 ----
  	else
  	{
  		appendPQExpBuffer(q, "COPY %s %s TO stdout;",
! 						  fmtQualifiedId(fout->remoteVersion,
  										 tbinfo->dobj.namespace->dobj.name,
  										 classname),
  						  column_list);
  	}
  	res = ExecuteSqlQuery(fout, q->data, PGRES_COPY_OUT);
  	PQclear(res);
+ 	destroyPQExpBuffer(clistBuf);
  
  	for (;;)
  	{
***************
*** 1425,1431 **** dumpTableData_insert(Archive *fout, void *dcontext)
  	{
  		appendPQExpBuffer(q, "DECLARE _pg_dump_cursor CURSOR FOR "
  						  "SELECT * FROM ONLY %s",
! 						  fmtQualifiedId(fout,
  										 tbinfo->dobj.namespace->dobj.name,
  										 classname));
  	}
--- 1523,1529 ----
  	{
  		appendPQExpBuffer(q, "DECLARE _pg_dump_cursor CURSOR FOR "
  						  "SELECT * FROM ONLY %s",
! 						  fmtQualifiedId(fout->remoteVersion,
  										 tbinfo->dobj.namespace->dobj.name,
  										 classname));
  	}
***************
*** 1433,1439 **** dumpTableData_insert(Archive *fout, void *dcontext)
  	{
  		appendPQExpBuffer(q, "DECLARE _pg_dump_cursor CURSOR FOR "
  						  "SELECT * FROM %s",
! 						  fmtQualifiedId(fout,
  										 tbinfo->dobj.namespace->dobj.name,
  										 classname));
  	}
--- 1531,1537 ----
  	{
  		appendPQExpBuffer(q, "DECLARE _pg_dump_cursor CURSOR FOR "
  						  "SELECT * FROM %s",
! 						  fmtQualifiedId(fout->remoteVersion,
  										 tbinfo->dobj.namespace->dobj.name,
  										 classname));
  	}
***************
*** 1565,1570 **** dumpTableData(Archive *fout, TableDataInfo *tdinfo)
--- 1663,1669 ----
  {
  	TableInfo  *tbinfo = tdinfo->tdtable;
  	PQExpBuffer copyBuf = createPQExpBuffer();
+ 	PQExpBuffer clistBuf = createPQExpBuffer();
  	DataDumperPtr dumpFn;
  	char	   *copyStmt;
  
***************
*** 1576,1582 **** dumpTableData(Archive *fout, TableDataInfo *tdinfo)
  		appendPQExpBuffer(copyBuf, "COPY %s ",
  						  fmtId(tbinfo->dobj.name));
  		appendPQExpBuffer(copyBuf, "%s %sFROM stdin;\n",
! 						  fmtCopyColumnList(tbinfo),
  					  (tdinfo->oids && tbinfo->hasoids) ? "WITH OIDS " : "");
  		copyStmt = copyBuf->data;
  	}
--- 1675,1681 ----
  		appendPQExpBuffer(copyBuf, "COPY %s ",
  						  fmtId(tbinfo->dobj.name));
  		appendPQExpBuffer(copyBuf, "%s %sFROM stdin;\n",
! 						  fmtCopyColumnList(tbinfo, clistBuf),
  					  (tdinfo->oids && tbinfo->hasoids) ? "WITH OIDS " : "");
  		copyStmt = copyBuf->data;
  	}
***************
*** 1601,1606 **** dumpTableData(Archive *fout, TableDataInfo *tdinfo)
--- 1700,1706 ----
  				 dumpFn, tdinfo);
  
  	destroyPQExpBuffer(copyBuf);
+ 	destroyPQExpBuffer(clistBuf);
  }
  
  /*
***************
*** 3867,3872 **** getTables(Archive *fout, int *numTables)
--- 3967,3973 ----
  	int			i_reloptions;
  	int			i_toastreloptions;
  	int			i_reloftype;
+ 	int			i_relpages;
  
  	/* Make sure we are in proper schema */
  	selectSourceSchema(fout, "pg_catalog");
***************
*** 3906,3911 **** getTables(Archive *fout, int *numTables)
--- 4007,4013 ----
  						  "c.relfrozenxid, tc.oid AS toid, "
  						  "tc.relfrozenxid AS tfrozenxid, "
  						  "c.relpersistence, "
+ 						  "c.relpages, "
  						  "CASE WHEN c.reloftype <> 0 THEN c.reloftype::pg_catalog.regtype ELSE NULL END AS reloftype, "
  						  "d.refobjid AS owning_tab, "
  						  "d.refobjsubid AS owning_col, "
***************
*** 3942,3947 **** getTables(Archive *fout, int *numTables)
--- 4044,4050 ----
  						  "c.relfrozenxid, tc.oid AS toid, "
  						  "tc.relfrozenxid AS tfrozenxid, "
  						  "'p' AS relpersistence, "
+ 						  "c.relpages, "
  						  "CASE WHEN c.reloftype <> 0 THEN c.reloftype::pg_catalog.regtype ELSE NULL END AS reloftype, "
  						  "d.refobjid AS owning_tab, "
  						  "d.refobjsubid AS owning_col, "
***************
*** 3977,3982 **** getTables(Archive *fout, int *numTables)
--- 4080,4086 ----
  						  "c.relfrozenxid, tc.oid AS toid, "
  						  "tc.relfrozenxid AS tfrozenxid, "
  						  "'p' AS relpersistence, "
+ 						  "c.relpages, "
  						  "NULL AS reloftype, "
  						  "d.refobjid AS owning_tab, "
  						  "d.refobjsubid AS owning_col, "
***************
*** 4012,4017 **** getTables(Archive *fout, int *numTables)
--- 4116,4122 ----
  						  "c.relfrozenxid, tc.oid AS toid, "
  						  "tc.relfrozenxid AS tfrozenxid, "
  						  "'p' AS relpersistence, "
+ 						  "c.relpages, "
  						  "NULL AS reloftype, "
  						  "d.refobjid AS owning_tab, "
  						  "d.refobjsubid AS owning_col, "
***************
*** 4048,4053 **** getTables(Archive *fout, int *numTables)
--- 4153,4159 ----
  						  "0 AS toid, "
  						  "0 AS tfrozenxid, "
  						  "'p' AS relpersistence, "
+ 						  "relpages, "
  						  "NULL AS reloftype, "
  						  "d.refobjid AS owning_tab, "
  						  "d.refobjsubid AS owning_col, "
***************
*** 4083,4088 **** getTables(Archive *fout, int *numTables)
--- 4189,4195 ----
  						  "0 AS toid, "
  						  "0 AS tfrozenxid, "
  						  "'p' AS relpersistence, "
+ 						  "relpages, "
  						  "NULL AS reloftype, "
  						  "d.refobjid AS owning_tab, "
  						  "d.refobjsubid AS owning_col, "
***************
*** 4114,4119 **** getTables(Archive *fout, int *numTables)
--- 4221,4227 ----
  						  "0 AS toid, "
  						  "0 AS tfrozenxid, "
  						  "'p' AS relpersistence, "
+ 						  "relpages, "
  						  "NULL AS reloftype, "
  						  "NULL::oid AS owning_tab, "
  						  "NULL::int4 AS owning_col, "
***************
*** 4140,4145 **** getTables(Archive *fout, int *numTables)
--- 4248,4254 ----
  						  "0 AS toid, "
  						  "0 AS tfrozenxid, "
  						  "'p' AS relpersistence, "
+ 						  "relpages, "
  						  "NULL AS reloftype, "
  						  "NULL::oid AS owning_tab, "
  						  "NULL::int4 AS owning_col, "
***************
*** 4176,4181 **** getTables(Archive *fout, int *numTables)
--- 4285,4291 ----
  						  "0 AS toid, "
  						  "0 AS tfrozenxid, "
  						  "'p' AS relpersistence, "
+ 						  "0 AS relpages, "
  						  "NULL AS reloftype, "
  						  "NULL::oid AS owning_tab, "
  						  "NULL::int4 AS owning_col, "
***************
*** 4229,4234 **** getTables(Archive *fout, int *numTables)
--- 4339,4345 ----
  	i_reloptions = PQfnumber(res, "reloptions");
  	i_toastreloptions = PQfnumber(res, "toast_reloptions");
  	i_reloftype = PQfnumber(res, "reloftype");
+ 	i_relpages = PQfnumber(res, "relpages");
  
  	if (lockWaitTimeout && fout->remoteVersion >= 70300)
  	{
***************
*** 4285,4290 **** getTables(Archive *fout, int *numTables)
--- 4396,4402 ----
  		tblinfo[i].reltablespace = pg_strdup(PQgetvalue(res, i, i_reltablespace));
  		tblinfo[i].reloptions = pg_strdup(PQgetvalue(res, i, i_reloptions));
  		tblinfo[i].toast_reloptions = pg_strdup(PQgetvalue(res, i, i_toastreloptions));
+ 		tblinfo[i].relpages = atoi(PQgetvalue(res, i, i_relpages));
  
  		/* other fields were zeroed above */
  
***************
*** 4313,4319 **** getTables(Archive *fout, int *numTables)
  			resetPQExpBuffer(query);
  			appendPQExpBuffer(query,
  							  "LOCK TABLE %s IN ACCESS SHARE MODE",
! 							  fmtQualifiedId(fout,
  										tblinfo[i].dobj.namespace->dobj.name,
  											 tblinfo[i].dobj.name));
  			ExecuteSqlStatement(fout, query->data);
--- 4425,4431 ----
  			resetPQExpBuffer(query);
  			appendPQExpBuffer(query,
  							  "LOCK TABLE %s IN ACCESS SHARE MODE",
! 							  fmtQualifiedId(fout->remoteVersion,
  										tblinfo[i].dobj.namespace->dobj.name,
  											 tblinfo[i].dobj.name));
  			ExecuteSqlStatement(fout, query->data);
***************
*** 4452,4458 **** getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
  				i_conoid,
  				i_condef,
  				i_tablespace,
! 				i_options;
  	int			ntups;
  
  	for (i = 0; i < numTables; i++)
--- 4564,4571 ----
  				i_conoid,
  				i_condef,
  				i_tablespace,
! 				i_options,
! 				i_relpages;
  	int			ntups;
  
  	for (i = 0; i < numTables; i++)
***************
*** 4494,4499 **** getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
--- 4607,4613 ----
  					 "pg_catalog.pg_get_indexdef(i.indexrelid) AS indexdef, "
  							  "t.relnatts AS indnkeys, "
  							  "i.indkey, i.indisclustered, "
+ 							  "t.relpages, "
  							  "c.contype, c.conname, "
  							  "c.condeferrable, c.condeferred, "
  							  "c.tableoid AS contableoid, "
***************
*** 4519,4524 **** getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
--- 4633,4639 ----
  					 "pg_catalog.pg_get_indexdef(i.indexrelid) AS indexdef, "
  							  "t.relnatts AS indnkeys, "
  							  "i.indkey, i.indisclustered, "
+ 							  "t.relpages, "
  							  "c.contype, c.conname, "
  							  "c.condeferrable, c.condeferred, "
  							  "c.tableoid AS contableoid, "
***************
*** 4547,4552 **** getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
--- 4662,4668 ----
  					 "pg_catalog.pg_get_indexdef(i.indexrelid) AS indexdef, "
  							  "t.relnatts AS indnkeys, "
  							  "i.indkey, i.indisclustered, "
+ 							  "t.relpages, "
  							  "c.contype, c.conname, "
  							  "c.condeferrable, c.condeferred, "
  							  "c.tableoid AS contableoid, "
***************
*** 4575,4580 **** getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
--- 4691,4697 ----
  					 "pg_catalog.pg_get_indexdef(i.indexrelid) AS indexdef, "
  							  "t.relnatts AS indnkeys, "
  							  "i.indkey, i.indisclustered, "
+ 							  "t.relpages, "
  							  "c.contype, c.conname, "
  							  "c.condeferrable, c.condeferred, "
  							  "c.tableoid AS contableoid, "
***************
*** 4603,4608 **** getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
--- 4720,4726 ----
  							  "pg_get_indexdef(i.indexrelid) AS indexdef, "
  							  "t.relnatts AS indnkeys, "
  							  "i.indkey, false AS indisclustered, "
+ 							  "t.relpages, "
  							  "CASE WHEN i.indisprimary THEN 'p'::char "
  							  "ELSE '0'::char END AS contype, "
  							  "t.relname AS conname, "
***************
*** 4629,4634 **** getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
--- 4747,4753 ----
  							  "pg_get_indexdef(i.indexrelid) AS indexdef, "
  							  "t.relnatts AS indnkeys, "
  							  "i.indkey, false AS indisclustered, "
+ 							  "t.relpages, "
  							  "CASE WHEN i.indisprimary THEN 'p'::char "
  							  "ELSE '0'::char END AS contype, "
  							  "t.relname AS conname, "
***************
*** 4657,4662 **** getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
--- 4776,4782 ----
  		i_indnkeys = PQfnumber(res, "indnkeys");
  		i_indkey = PQfnumber(res, "indkey");
  		i_indisclustered = PQfnumber(res, "indisclustered");
+ 		i_relpages = PQfnumber(res, "relpages");
  		i_contype = PQfnumber(res, "contype");
  		i_conname = PQfnumber(res, "conname");
  		i_condeferrable = PQfnumber(res, "condeferrable");
***************
*** 4699,4704 **** getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
--- 4819,4825 ----
  			parseOidArray(PQgetvalue(res, j, i_indkey),
  						  indxinfo[j].indkeys, INDEX_MAX_KEYS);
  			indxinfo[j].indisclustered = (PQgetvalue(res, j, i_indisclustered)[0] == 't');
+ 			indxinfo[j].relpages = atoi(PQgetvalue(res, j, i_relpages));
  			contype = *(PQgetvalue(res, j, i_contype));
  
  			if (contype == 'p' || contype == 'u' || contype == 'x')
***************
*** 14476,14497 **** findDumpableDependencies(ArchiveHandle *AH, DumpableObject *dobj,
   *
   * Whenever the selected schema is not pg_catalog, be careful to qualify
   * references to system catalogs and types in our emitted commands!
   */
  static void
  selectSourceSchema(Archive *fout, const char *schemaName)
  {
- 	static char *curSchemaName = NULL;
  	PQExpBuffer query;
  
  	/* Not relevant if fetching from pre-7.3 DB */
  	if (fout->remoteVersion < 70300)
  		return;
- 	/* Ignore null schema names */
- 	if (schemaName == NULL || *schemaName == '\0')
- 		return;
- 	/* Optimize away repeated selection of same schema */
- 	if (curSchemaName && strcmp(curSchemaName, schemaName) == 0)
- 		return;
  
  	query = createPQExpBuffer();
  	appendPQExpBuffer(query, "SET search_path = %s",
--- 14597,14617 ----
   *
   * Whenever the selected schema is not pg_catalog, be careful to qualify
   * references to system catalogs and types in our emitted commands!
+  *
+  * This function is called only from selectSourceSchemaOnAH and
+  * selectSourceSchema.
   */
  static void
  selectSourceSchema(Archive *fout, const char *schemaName)
  {
  	PQExpBuffer query;
  
+ 	/* This is checked by the callers already */
+ 	Assert(schemaName != NULL && *schemaName != '\0');
+ 
  	/* Not relevant if fetching from pre-7.3 DB */
  	if (fout->remoteVersion < 70300)
  		return;
  
  	query = createPQExpBuffer();
  	appendPQExpBuffer(query, "SET search_path = %s",
***************
*** 14502,14510 **** selectSourceSchema(Archive *fout, const char *schemaName)
  	ExecuteSqlStatement(fout, query->data);
  
  	destroyPQExpBuffer(query);
- 	if (curSchemaName)
- 		free(curSchemaName);
- 	curSchemaName = pg_strdup(schemaName);
  }
  
  /*
--- 14622,14627 ----
***************
*** 14642,14712 **** myFormatType(const char *typname, int32 typmod)
  }
  
  /*
-  * fmtQualifiedId - convert a qualified name to the proper format for
-  * the source database.
-  *
-  * Like fmtId, use the result before calling again.
-  */
- static const char *
- fmtQualifiedId(Archive *fout, const char *schema, const char *id)
- {
- 	static PQExpBuffer id_return = NULL;
- 
- 	if (id_return)				/* first time through? */
- 		resetPQExpBuffer(id_return);
- 	else
- 		id_return = createPQExpBuffer();
- 
- 	/* Suppress schema name if fetching from pre-7.3 DB */
- 	if (fout->remoteVersion >= 70300 && schema && *schema)
- 	{
- 		appendPQExpBuffer(id_return, "%s.",
- 						  fmtId(schema));
- 	}
- 	appendPQExpBuffer(id_return, "%s",
- 					  fmtId(id));
- 
- 	return id_return->data;
- }
- 
- /*
   * Return a column list clause for the given relation.
   *
   * Special case: if there are no undropped columns in the relation, return
   * "", not an invalid "()" column list.
   */
  static const char *
! fmtCopyColumnList(const TableInfo *ti)
  {
- 	static PQExpBuffer q = NULL;
  	int			numatts = ti->numatts;
  	char	  **attnames = ti->attnames;
  	bool	   *attisdropped = ti->attisdropped;
  	bool		needComma;
  	int			i;
  
! 	if (q)						/* first time through? */
! 		resetPQExpBuffer(q);
! 	else
! 		q = createPQExpBuffer();
! 
! 	appendPQExpBuffer(q, "(");
  	needComma = false;
  	for (i = 0; i < numatts; i++)
  	{
  		if (attisdropped[i])
  			continue;
  		if (needComma)
! 			appendPQExpBuffer(q, ", ");
! 		appendPQExpBuffer(q, "%s", fmtId(attnames[i]));
  		needComma = true;
  	}
  
  	if (!needComma)
  		return "";				/* no undropped columns */
  
! 	appendPQExpBuffer(q, ")");
! 	return q->data;
  }
  
  /*
--- 14759,14795 ----
  }
  
  /*
   * Return a column list clause for the given relation.
   *
   * Special case: if there are no undropped columns in the relation, return
   * "", not an invalid "()" column list.
   */
  static const char *
! fmtCopyColumnList(const TableInfo *ti, PQExpBuffer buffer)
  {
  	int			numatts = ti->numatts;
  	char	  **attnames = ti->attnames;
  	bool	   *attisdropped = ti->attisdropped;
  	bool		needComma;
  	int			i;
  
+ 	/* Build the parenthesized list into the caller-supplied scratch buffer */
! 	appendPQExpBuffer(buffer, "(");
  	needComma = false;
  	for (i = 0; i < numatts; i++)
  	{
  		if (attisdropped[i])
  			continue;
  		if (needComma)
! 			appendPQExpBuffer(buffer, ", ");
! 		appendPQExpBuffer(buffer, "%s", fmtId(attnames[i]));
  		needComma = true;
  	}
  
+ 	/*
+ 	 * NOTE(review): on this path the buffer is left holding a stray "(" even
+ 	 * though "" is returned -- presumably callers use only the return value
+ 	 * and reset the buffer before reuse; verify against call sites.
+ 	 */
  	if (!needComma)
  		return "";				/* no undropped columns */
  
! 	appendPQExpBuffer(buffer, ")");
! 	return buffer->data;
  }
  
  /*
*** a/src/bin/pg_dump/pg_dump.h
--- b/src/bin/pg_dump/pg_dump.h
***************
*** 259,264 **** typedef struct _tableInfo
--- 259,265 ----
  	/* these two are set only if table is a sequence owned by a column: */
  	Oid			owning_tab;		/* OID of table owning sequence */
  	int			owning_col;		/* attr # of column owning sequence */
+ 	int			relpages;
  
  	bool		interesting;	/* true if need to collect more data */
  
***************
*** 322,327 **** typedef struct _indxInfo
--- 323,329 ----
  	bool		indisclustered;
  	/* if there is an associated constraint object, its dumpId: */
  	DumpId		indexconstraint;
+ 	int			relpages;		/* relpages of the underlying table */
  } IndxInfo;
  
  typedef struct _ruleInfo
***************
*** 541,546 **** extern void sortDumpableObjects(DumpableObject **objs, int numObjs,
--- 543,549 ----
  					DumpId preBoundaryId, DumpId postBoundaryId);
  extern void sortDumpableObjectsByTypeName(DumpableObject **objs, int numObjs);
  extern void sortDumpableObjectsByTypeOid(DumpableObject **objs, int numObjs);
+ extern void sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs);
  
  /*
   * version specific routines
*** a/src/bin/pg_dump/pg_dump_sort.c
--- b/src/bin/pg_dump/pg_dump_sort.c
***************
*** 142,147 **** static void repairDependencyLoop(DumpableObject **loop,
--- 142,234 ----
  static void describeDumpableObject(DumpableObject *obj,
  					   char *buf, int bufsize);
  
+ static int DOSizeCompare(const void *p1, const void *p2);
+ 
+ /*
+  * Return the index of the first object in objs[] whose type equals "type",
+  * or -1 if no such object exists.
+  */
+ static int
+ findFirstEqualType(DumpableObjectType type, DumpableObject **objs, int numObjs)
+ {
+ 	int i;
+ 	for (i = 0; i < numObjs; i++)
+ 		if (objs[i]->objType == type)
+ 			return i;
+ 	return -1;
+ }
+ 
+ /*
+  * Starting at index "start", return the index of the first object in objs[]
+  * whose type differs from "type", i.e. the index just past the end of the
+  * consecutive run of same-type objects.  Returns numObjs when the run
+  * extends to the end of the array, so that (result - start) is always the
+  * length of the run.
+  */
+ static int
+ findFirstDifferentType(DumpableObjectType type, DumpableObject **objs, int numObjs, int start)
+ {
+ 	int i;
+ 	for (i = start; i < numObjs; i++)
+ 		if (objs[i]->objType != type)
+ 			return i;
+ 	/*
+ 	 * Return numObjs, not numObjs - 1: the caller uses this value as an
+ 	 * exclusive end index (qsort count is endIdx - startIdx), so returning
+ 	 * numObjs - 1 would wrongly leave the last object unsorted whenever the
+ 	 * run reaches the end of the array.
+ 	 */
+ 	return numObjs;
+ }
+ 
+ /*
+  * When we do a parallel dump, we want to start with the largest items first.
+  *
+  * Say we have the objects in this order:
+  * ....DDDDD....III....
+  *
+  * with D = Table data, I = Index, . = other object
+  *
+  * This sorting function now takes each of the D or I blocks and sorts them
+  * according to their size.
+  */
+ void
+ sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs)
+ {
+ 	int		startIdx, endIdx;
+ 	void   *startPtr;
+ 
+ 	/* Nothing to sort with zero or one object */
+ 	if (numObjs <= 1)
+ 		return;
+ 
+ 	/* Sort the contiguous run of table-data objects, largest first */
+ 	startIdx = findFirstEqualType(DO_TABLE_DATA, objs, numObjs);
+ 	if (startIdx >= 0)
+ 	{
+ 		endIdx = findFirstDifferentType(DO_TABLE_DATA, objs, numObjs, startIdx);
+ 		startPtr = objs + startIdx;
+ 		qsort(startPtr, endIdx - startIdx, sizeof(DumpableObject *),
+ 			  DOSizeCompare);
+ 	}
+ 
+ 	/* Likewise for the contiguous run of index objects */
+ 	startIdx = findFirstEqualType(DO_INDEX, objs, numObjs);
+ 	if (startIdx >= 0)
+ 	{
+ 		endIdx = findFirstDifferentType(DO_INDEX, objs, numObjs, startIdx);
+ 		startPtr = objs + startIdx;
+ 		qsort(startPtr, endIdx - startIdx, sizeof(DumpableObject *),
+ 			  DOSizeCompare);
+ 	}
+ }
+ 
+ /*
+  * qsort comparator ordering DumpableObjects by size, descending.
+  *
+  * Size is the relpages count recorded for table-data and index objects;
+  * any other object type counts as size 0.  Larger objects sort first so
+  * that parallel workers start on the biggest items.
+  */
+ static int
+ DOSizeCompare(const void *p1, const void *p2)
+ {
+ 	DumpableObject *obj1 = *(DumpableObject **) p1;
+ 	DumpableObject *obj2 = *(DumpableObject **) p2;
+ 	int			obj1_size = 0;
+ 	int			obj2_size = 0;
+ 
+ 	if (obj1->objType == DO_TABLE_DATA)
+ 		obj1_size = ((TableDataInfo *) obj1)->tdtable->relpages;
+ 	if (obj1->objType == DO_INDEX)
+ 		obj1_size = ((IndxInfo *) obj1)->relpages;
+ 
+ 	if (obj2->objType == DO_TABLE_DATA)
+ 		obj2_size = ((TableDataInfo *) obj2)->tdtable->relpages;
+ 	if (obj2->objType == DO_INDEX)
+ 		obj2_size = ((IndxInfo *) obj2)->relpages;
+ 
+ 	/* we want to see the biggest item go first */
+ 	if (obj1_size > obj2_size)
+ 		return -1;
+ 	if (obj2_size > obj1_size)
+ 		return 1;
+ 
+ 	return 0;
+ }
  
  /*
   * Sort the given objects into a type/name-based ordering
*** a/src/bin/pg_dump/pg_restore.c
--- b/src/bin/pg_dump/pg_restore.c
***************
*** 72,77 **** main(int argc, char **argv)
--- 72,78 ----
  	RestoreOptions *opts;
  	int			c;
  	int			exit_code;
+ 	int			numWorkers = 1;
  	Archive    *AH;
  	char	   *inputFileSpec;
  	static int	disable_triggers = 0;
***************
*** 183,189 **** main(int argc, char **argv)
  				break;
  
  			case 'j':			/* number of restore jobs */
! 				opts->number_of_jobs = atoi(optarg);
  				break;
  
  			case 'l':			/* Dump the TOC summary */
--- 184,190 ----
  				break;
  
  			case 'j':			/* number of restore jobs */
! 				numWorkers = atoi(optarg);
  				break;
  
  			case 'l':			/* Dump the TOC summary */
***************
*** 314,320 **** main(int argc, char **argv)
  	}
  
  	/* Can't do single-txn mode with multiple connections */
! 	if (opts->single_txn && opts->number_of_jobs > 1)
  	{
  		fprintf(stderr, _("%s: cannot specify both --single-transaction and multiple jobs\n"),
  				progname);
--- 315,321 ----
  	}
  
  	/* Can't do single-txn mode with multiple connections */
! 	if (opts->single_txn && numWorkers > 1)
  	{
  		fprintf(stderr, _("%s: cannot specify both --single-transaction and multiple jobs\n"),
  				progname);
***************
*** 373,378 **** main(int argc, char **argv)
--- 374,391 ----
  	if (opts->tocFile)
  		SortTocFromFile(AH, opts);
  
+ 	/* See comments in pg_dump.c */
+ #ifdef WIN32
+ 	if (numWorkers > MAXIMUM_WAIT_OBJECTS)
+ 	{
+ 		fprintf(stderr, _("%s: maximum number of parallel jobs is %d\n"),
+ 				progname, MAXIMUM_WAIT_OBJECTS);
+ 		exit(1);
+ 	}
+ #endif
+ 
+ 	AH->numWorkers = numWorkers;
+ 
  	if (opts->tocSummary)
  		PrintTOCSummary(AH, opts);
  	else
***************
*** 394,399 **** main(int argc, char **argv)
--- 407,419 ----
  	return exit_code;
  }
  
+ /*
+  * Per-worker initialization hook for parallel restore: reopen the archive
+  * via the format-specific ReopenPtr callback so each worker process has its
+  * own independent file handle / connection state.
+  *
+  * NOTE(review): ropt is unused here -- presumably the parameter is required
+  * by the shared worker-setup callback signature; confirm against parallel.c.
+  */
+ void
+ _SetupWorker(Archive *AHX, RestoreOptions *ropt)
+ {
+ 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ 	(AH->ReopenPtr) (AH);
+ }
+ 
  static void
  usage(const char *progname)
  {
