diff --git a/src/backend/port/pipe.c b/src/backend/port/pipe.c
index 357f5ec..4e47d13 100644
*** a/src/backend/port/pipe.c
--- b/src/backend/port/pipe.c
***************
*** 15,23 ****
   *-------------------------------------------------------------------------
   */
  
  #include "postgres.h"
  
- #ifdef WIN32
  int
  pgpipe(int handles[2])
  {
--- 15,42 ----
   *-------------------------------------------------------------------------
   */
  
+ #ifdef WIN32
+ 
+ /*
+  * This pipe implementation is used in both the server and non-server programs.
+  * In the default case we run within the server and use the standard ereport
+  * error reporting.
+  * If the code runs in a non-server program (like pg_dump), then that program
+  * #defines an error routine and includes this .c file.
+  */
+ #ifndef PGPIPE_EREPORT
  #include "postgres.h"
+ #define PGPIPE_EREPORT pgpipe_ereport
+ static void
+ pgpipe_ereport(const char *fmt, ...)
+ {
+ 	va_list args;
+ 	va_start(args, fmt);
+ 	ereport(LOG, (errmsg_internal(fmt, args)));
+ 	va_end(args);
+ }
+ #endif
  
  int
  pgpipe(int handles[2])
  {
*************** pgpipe(int handles[2])
*** 29,35 ****
  
  	if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
  	{
! 		ereport(LOG, (errmsg_internal("pgpipe could not create socket: %ui", WSAGetLastError())));
  		return -1;
  	}
  
--- 48,54 ----
  
  	if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
  	{
! 		PGPIPE_EREPORT("pgpipe could not create socket: %ui", WSAGetLastError());
  		return -1;
  	}
  
*************** pgpipe(int handles[2])
*** 39,76 ****
  	serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  	if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
  	{
! 		ereport(LOG, (errmsg_internal("pgpipe could not bind: %ui", WSAGetLastError())));
  		closesocket(s);
  		return -1;
  	}
  	if (listen(s, 1) == SOCKET_ERROR)
  	{
! 		ereport(LOG, (errmsg_internal("pgpipe could not listen: %ui", WSAGetLastError())));
  		closesocket(s);
  		return -1;
  	}
  	if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
  	{
! 		ereport(LOG, (errmsg_internal("pgpipe could not getsockname: %ui", WSAGetLastError())));
  		closesocket(s);
  		return -1;
  	}
  	if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
  	{
! 		ereport(LOG, (errmsg_internal("pgpipe could not create socket 2: %ui", WSAGetLastError())));
  		closesocket(s);
  		return -1;
  	}
  
  	if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
  	{
! 		ereport(LOG, (errmsg_internal("pgpipe could not connect socket: %ui", WSAGetLastError())));
  		closesocket(s);
  		return -1;
  	}
  	if ((handles[0] = accept(s, (SOCKADDR *) &serv_addr, &len)) == INVALID_SOCKET)
  	{
! 		ereport(LOG, (errmsg_internal("pgpipe could not accept socket: %ui", WSAGetLastError())));
  		closesocket(handles[1]);
  		handles[1] = INVALID_SOCKET;
  		closesocket(s);
--- 58,95 ----
  	serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  	if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
  	{
! 		PGPIPE_EREPORT("pgpipe could not bind: %ui", WSAGetLastError());
  		closesocket(s);
  		return -1;
  	}
  	if (listen(s, 1) == SOCKET_ERROR)
  	{
! 		PGPIPE_EREPORT("pgpipe could not listen: %ui", WSAGetLastError());
  		closesocket(s);
  		return -1;
  	}
  	if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
  	{
! 		PGPIPE_EREPORT("pgpipe could not getsockname: %ui", WSAGetLastError());
  		closesocket(s);
  		return -1;
  	}
  	if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
  	{
! 		PGPIPE_EREPORT("pgpipe could not create socket 2: %ui", WSAGetLastError());
  		closesocket(s);
  		return -1;
  	}
  
  	if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
  	{
! 		PGPIPE_EREPORT("pgpipe could not connect socket: %ui", WSAGetLastError());
  		closesocket(s);
  		return -1;
  	}
  	if ((handles[0] = accept(s, (SOCKADDR *) &serv_addr, &len)) == INVALID_SOCKET)
  	{
! 		PGPIPE_EREPORT("pgpipe could not accept socket: %ui", WSAGetLastError());
  		closesocket(handles[1]);
  		handles[1] = INVALID_SOCKET;
  		closesocket(s);
diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile
index 033fb1e..00501de 100644
*** a/src/bin/pg_dump/Makefile
--- b/src/bin/pg_dump/Makefile
*************** override CPPFLAGS := -I$(libpq_srcdir) $
*** 20,26 ****
  
  OBJS=	pg_backup_archiver.o pg_backup_db.o pg_backup_custom.o \
  	pg_backup_files.o pg_backup_null.o pg_backup_tar.o \
! 	pg_backup_directory.o dumpmem.o dumputils.o compress_io.o $(WIN32RES)
  
  KEYWRDOBJS = keywords.o kwlookup.o
  
--- 20,27 ----
  
  OBJS=	pg_backup_archiver.o pg_backup_db.o pg_backup_custom.o \
  	pg_backup_files.o pg_backup_null.o pg_backup_tar.o \
! 	pg_backup_directory.o dumpmem.o dumputils.o compress_io.o \
! 	parallel.o $(WIN32RES)
  
  KEYWRDOBJS = keywords.o kwlookup.o
  
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index d48b276..3bc224b 100644
*** a/src/bin/pg_dump/compress_io.c
--- b/src/bin/pg_dump/compress_io.c
***************
*** 54,59 ****
--- 54,60 ----
  
  #include "compress_io.h"
  #include "dumpmem.h"
+ #include "parallel.h"
  
  /*----------------------
   * Compressor API
*************** size_t
*** 181,186 ****
--- 182,190 ----
  WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
  				   const void *data, size_t dLen)
  {
+ 	/* Are we aborting? */
+ 	checkAborting(AH);
+ 
  	switch (cs->comprAlg)
  	{
  		case COMPR_ALG_LIBZ:
*************** ReadDataFromArchiveZlib(ArchiveHandle *A
*** 350,355 ****
--- 354,362 ----
  	/* no minimal chunk size for zlib */
  	while ((cnt = readF(AH, &buf, &buflen)))
  	{
+ 		/* Are we aborting? */
+ 		checkAborting(AH);
+ 
  		zp->next_in = (void *) buf;
  		zp->avail_in = cnt;
  
*************** ReadDataFromArchiveNone(ArchiveHandle *A
*** 410,415 ****
--- 417,425 ----
  
  	while ((cnt = readF(AH, &buf, &buflen)))
  	{
+ 		/* Are we aborting? */
+ 		checkAborting(AH);
+ 
  		ahwrite(buf, 1, cnt, AH);
  	}
  
diff --git a/src/bin/pg_dump/dumputils.c b/src/bin/pg_dump/dumputils.c
index 3493e39..352daba 100644
*** a/src/bin/pg_dump/dumputils.c
--- b/src/bin/pg_dump/dumputils.c
***************
*** 16,21 ****
--- 16,22 ----
  
  #include <ctype.h>
  
+ #include "dumpmem.h"
  #include "dumputils.h"
  #include "pg_backup.h"
  
*************** static bool parseAclItem(const char *ite
*** 36,41 ****
--- 37,43 ----
  static char *copyAclUserName(PQExpBuffer output, char *input);
  static void AddAcl(PQExpBuffer aclbuf, const char *keyword,
  	   const char *subname);
+ static PQExpBuffer getThreadLocalPQExpBuffer(void);
  
  #ifdef WIN32
  static bool parallel_init_done = false;
*************** init_parallel_dump_utils(void)
*** 55,69 ****
  }
  
  /*
!  *	Quotes input string if it's not a legitimate SQL identifier as-is.
!  *
!  *	Note that the returned string must be used before calling fmtId again,
!  *	since we re-use the same return buffer each time.  Non-reentrant but
!  *	reduces memory leakage. (On Windows the memory leakage will be one buffer
!  *	per thread, which is at least better than one per call).
   */
! const char *
! fmtId(const char *rawid)
  {
  	/*
  	 * The Tls code goes awry if we use a static var, so we provide for both
--- 57,67 ----
  }
  
  /*
!  * Non-reentrant but reduces memory leakage. (On Windows the memory leakage
!  * will be one buffer per thread, which is at least better than one per call).
   */
! static PQExpBuffer
! getThreadLocalPQExpBuffer(void)
  {
  	/*
  	 * The Tls code goes awry if we use a static var, so we provide for both
*************** fmtId(const char *rawid)
*** 72,80 ****
  	static PQExpBuffer s_id_return = NULL;
  	PQExpBuffer id_return;
  
- 	const char *cp;
- 	bool		need_quotes = false;
- 
  #ifdef WIN32
  	if (parallel_init_done)
  		id_return = (PQExpBuffer) TlsGetValue(tls_index);		/* 0 when not set */
--- 70,75 ----
*************** fmtId(const char *rawid)
*** 101,109 ****
  #else
  		s_id_return = id_return;
  #endif
- 
  	}
  
  	/*
  	 * These checks need to match the identifier production in scan.l. Don't
  	 * use islower() etc.
--- 96,120 ----
  #else
  		s_id_return = id_return;
  #endif
  	}
  
+ 	return id_return;
+ }
+ 
+ /*
+  *	Quotes input string if it's not a legitimate SQL identifier as-is.
+  *
+  *	Note that the returned string must be used before calling fmtId again,
+  *	since we re-use the same return buffer each time.
+  */
+ const char *
+ fmtId(const char *rawid)
+ {
+ 	PQExpBuffer id_return = getThreadLocalPQExpBuffer();
+ 
+ 	const char *cp;
+ 	bool		need_quotes = false;
+ 
  	/*
  	 * These checks need to match the identifier production in scan.l. Don't
  	 * use islower() etc.
*************** fmtId(const char *rawid)
*** 171,176 ****
--- 182,226 ----
  	return id_return->data;
  }
  
+ /*
+  * fmtQualifiedId - convert a qualified name to the proper format for
+  * the source database.
+  *
+  * Like fmtId, use the result before calling again.
+  */
+ const char *
+ fmtQualifiedId(const char *schema, const char *id, int remoteVersion)
+ {
+ 	PQExpBuffer id_return;
+ 	/*
+ 	 * We're using the same PQExpBuffer as fmtId(), that's why we first get all
+ 	 * the values from fmtId and then return them appended in the PQExpBuffer.
+ 	 * The reason we still use the PQExpBuffer to return the string is just for
+ 	 * ease of use in the caller, that doesn't have to free the string
+ 	 * explicitly.
+ 	 */
+ 	char	   *schemaBuf, *idBuf;
+ 
+ 	/* Suppress schema name if fetching from pre-7.3 DB */
+ 	if (remoteVersion >= 70300 && schema && *schema)
+ 	{
+ 		schemaBuf = pg_strdup(fmtId(schema));
+ 	} else
+ 		schemaBuf = pg_strdup("");
+ 
+ 	idBuf = pg_strdup(fmtId(id));
+ 
+ 	/* this will reset the PQExpBuffer */
+ 	id_return = getThreadLocalPQExpBuffer();
+ 	appendPQExpBuffer(id_return, "%s%s%s",
+ 								 schemaBuf,
+ 								 strlen(schemaBuf) > 0 ? "." : "",
+ 								 idBuf);
+ 	free(schemaBuf);
+ 	free(idBuf);
+ 
+ 	return id_return->data;
+ }
  
  /*
   * Convert a string value to an SQL string literal and append it to
diff --git a/src/bin/pg_dump/dumputils.h b/src/bin/pg_dump/dumputils.h
index b4cf730..060c95d 100644
*** a/src/bin/pg_dump/dumputils.h
--- b/src/bin/pg_dump/dumputils.h
*************** extern const char *progname;
*** 24,29 ****
--- 24,30 ----
  
  extern void init_parallel_dump_utils(void);
  extern const char *fmtId(const char *identifier);
+ extern const char *fmtQualifiedId(const char *schema, const char *id, int remoteVersion);
  extern void appendStringLiteral(PQExpBuffer buf, const char *str,
  					int encoding, bool std_strings);
  extern void appendStringLiteralConn(PQExpBuffer buf, const char *str,
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index ...bcde24c .
*** a/src/bin/pg_dump/parallel.c
--- b/src/bin/pg_dump/parallel.c
***************
*** 0 ****
--- 1,1289 ----
+ /*-------------------------------------------------------------------------
+  *
+  * parallel.c
+  *
+  *	Parallel support for the pg_dump archiver
+  *
+  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *	The author is not responsible for loss or damages that may
+  *	result from its use.
+  *
+  * IDENTIFICATION
+  *		src/bin/pg_dump/parallel.c
+  *
+  *-------------------------------------------------------------------------
+  */
+ 
+ #include "pg_backup_db.h"
+ 
+ #include "dumpmem.h"
+ #include "dumputils.h"
+ #include "parallel.h"
+ 
+ #ifndef WIN32
+ #include <sys/types.h>
+ #include <sys/wait.h>
+ #include "signal.h"
+ #include <unistd.h>
+ #include <fcntl.h>
+ #endif
+ 
+ #define PIPE_READ							0
+ #define PIPE_WRITE							1
+ #define SHUTDOWN_GRACE_PERIOD				(500)
+ 
+ /* file-scope variables */
+ #ifdef WIN32
+ static unsigned int	tMasterThreadId = 0;
+ static HANDLE		termEvent = INVALID_HANDLE_VALUE;
+ #else
+ static volatile sig_atomic_t wantAbort = 0;
+ static bool aborting = false;
+ #endif
+ 
+ /*
+  * The parallel error handler is called for any die_horribly() in a child or master process.
+  * It then takes control over shutting down the rest of the gang.
+  */
+ void (*volatile vparallel_error_handler)(ArchiveHandle *AH, const char *modulename,
+ 								const char *fmt, va_list ap)
+ 						__attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 0))) = NULL;
+ 
+ /* The actual implementation of the error handler function */
+ static void vparallel_error_handler_imp(ArchiveHandle *AH, const char *modulename,
+ 										const char *fmt, va_list ap)
+ 								__attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 0)));
+ 
+ static const char *modulename = gettext_noop("parallel archiver");
+ 
+ static int ShutdownConnection(PGconn **conn);
+ 
+ static void WaitForTerminatingWorkers(ArchiveHandle *AH, ParallelState *pstate);
+ static void ShutdownWorkersHard(ArchiveHandle *AH, ParallelState *pstate);
+ static void ShutdownWorkersSoft(ArchiveHandle *AH, ParallelState *pstate, bool do_wait);
+ static void PrintStatus(ParallelState *pstate);
+ static bool HasEveryWorkerTerminated(ParallelState *pstate);
+ 
+ static void lockTableNoWait(ArchiveHandle *AH, TocEntry *te);
+ static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
+ static char *getMessageFromMaster(ArchiveHandle *AH, int pipefd[2]);
+ static void sendMessageToMaster(ArchiveHandle *AH, int pipefd[2], const char *str);
+ static char *getMessageFromWorker(ArchiveHandle *AH, ParallelState *pstate,
+ 								  bool do_wait, int *worker);
+ static void sendMessageToWorker(ArchiveHandle *AH, ParallelState *pstate,
+ 							    int worker, const char *str);
+ static char *readMessageFromPipe(int fd, bool do_wait);
+ 
+ static void SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker,
+ 						RestoreOptions *ropt);
+ 
+ #define messageStartsWith(msg, prefix) \
+ 	(strncmp(msg, prefix, strlen(prefix)) == 0)
+ #define messageEquals(msg, pattern) \
+ 	(strcmp(msg, pattern) == 0)
+ 
+ /* architecture dependent #defines */
+ #ifdef WIN32
+ 	/* WIN32 */
+ 	/* pgpipe implemented in src/backend/port/pipe.c */
+ 	#define setnonblocking(fd) \
+ 		do { u_long mode = 1; \
+ 			 ioctlsocket((fd), FIONBIO, &mode); \
+ 		} while(0);
+ 	#define setblocking(fd) \
+ 		do { u_long mode = 0; \
+ 			 ioctlsocket((fd), FIONBIO, &mode); \
+ 		} while(0);
+ #else /* UNIX */
+ 	#define setnonblocking(fd) \
+ 		do { long flags = (long) fcntl((fd), F_GETFL, 0); \
+ 			fcntl(fd, F_SETFL, flags | O_NONBLOCK); \
+ 		} while(0);
+ 	#define setblocking(fd) \
+ 		do { long flags = (long) fcntl((fd), F_GETFL, 0); \
+ 			fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); \
+ 		} while(0);
+ #endif
+ 
+ #ifdef WIN32
+ /*
+  * On Windows, source in the pgpipe implementation from the backend and provide
+  * an own error reporting routine, the backend usually uses ereport() for that.
+  */
+ static void pgdump_pgpipe_ereport(const char* fmt, ...);
+ #define PGPIPE_EREPORT pgdump_pgpipe_ereport
+ #include "../../backend/port/pipe.c"
+ static void
+ pgdump_pgpipe_ereport(const char* fmt, ...)
+ {
+ 	va_list args;
+ 	va_start(args, fmt);
+ 	vwrite_msg("pgpipe", fmt, args);
+ 	va_end(args);
+ }
+ #endif
+ 
+ static int
+ #ifdef WIN32
+ GetSlotOfThread(ParallelState *pstate, unsigned int threadId)
+ #else
+ GetSlotOfProcess(ParallelState *pstate, pid_t pid)
+ #endif
+ {
+ 	int i;
+ 
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ #ifdef WIN32
+ 		if (pstate->parallelSlot[i].threadId == threadId)
+ #else
+ 		if (pstate->parallelSlot[i].pid == pid)
+ #endif
+ 			return i;
+ 
+ 	Assert(false);
+ 	return NO_SLOT;
+ }
+ 
+ enum escrow_action { GET, SET };
+ static void
+ parallel_error_handler_escrow_data(enum escrow_action act, ParallelState *pstate)
+ {
+ 	static ParallelState *s_pstate = NULL;
+ 
+ 	if (act == SET)
+ 		s_pstate = pstate;
+ 	else
+ 		*pstate = *s_pstate;
+ }
+ 
+ static void
+ vparallel_error_handler_imp(ArchiveHandle *AH,
+ 							const char *modulename,
+ 							const char *fmt, va_list ap)
+ {
+ 	ParallelState pstate;
+ 	char		buf[512];
+ 	int			pipefd[2];
+ 	int			i;
+ 
+ 	if (AH->is_clone)
+ 	{
+ 		/* we are the child, get the message out to the parent */
+ 		parallel_error_handler_escrow_data(GET, &pstate);
+ #ifdef WIN32
+ 		i = GetSlotOfThread(&pstate, GetCurrentThreadId());
+ #else
+ 		i = GetSlotOfProcess(&pstate, getpid());
+ #endif
+ 		if (pstate.parallelSlot[i].inErrorHandling)
+ 			return;
+ 
+ 		pstate.parallelSlot[i].inErrorHandling = true;
+ 
+ 		pipefd[PIPE_READ] = pstate.parallelSlot[i].pipeRevRead;
+ 		pipefd[PIPE_WRITE] = pstate.parallelSlot[i].pipeRevWrite;
+ 
+ 		strcpy(buf, "ERROR ");
+ 		vsnprintf(buf + strlen("ERROR "), sizeof(buf) - strlen("ERROR "), fmt, ap);
+ 
+ 		sendMessageToMaster(AH, pipefd, buf);
+ 		if (AH->connection)
+ 			ShutdownConnection(&(AH->connection));
+ #ifdef WIN32
+ 		ExitThread(1);
+ #else
+ 		exit(1);
+ #endif
+ 	}
+ 	else
+ 	{
+ #ifndef WIN32
+ 		/*
+ 		 * We are the parent. We need the handling variable to see if we're
+ 		 * already handling an error.
+ 		 */
+ 		if (aborting)
+ 			return;
+ 		aborting = 1;
+ 
+ 		signal(SIGPIPE, SIG_IGN);
+ #endif
+ 		/*
+ 		 * Note that technically we're using a new pstate here, the old one
+ 		 * is copied over and then the old one isn't updated anymore. Only
+ 		 * the new one is, which is okay because control will never return
+ 		 * from this function.
+ 		 */
+ 		parallel_error_handler_escrow_data(GET, &pstate);
+ 		ShutdownWorkersHard(AH, &pstate);
+ 		/* Terminate own connection */
+ 		if (AH->connection)
+ 			ShutdownConnection(&(AH->connection));
+ 		vwrite_msg(NULL, fmt, ap);
+ 		exit(1);
+ 	}
+ 	Assert(false);
+ }
+ 
+ /*
+  * If we have one worker that terminates for some reason, we'd like the other
+  * threads to terminate as well (and not finish with their 70 GB table dump
+  * first...). Now in UNIX we can just kill these processes, and let the signal
+  * handler set wantAbort to 1 or more. In Windows we set a termEvent and this
+  * serves as the signal for everyone to terminate.
+  */
+ void
+ checkAborting(ArchiveHandle *AH)
+ {
+ #ifdef WIN32
+ 	if (WaitForSingleObject(termEvent, 0) == WAIT_OBJECT_0)
+ #else
+ 	if (wantAbort)
+ #endif
+ 		/*
+ 		 * Terminate, this error will actually never show up somewhere
+ 		 * because if termEvent/wantAbort is set, then we are already in the
+ 		 * process of going down and already have a reason why we're
+ 		 * terminating.
+ 		 */
+ 		die_horribly(AH, modulename, "worker is terminating");
+ }
+ 
+ /*
+  * A select loop that repeats calling select until a descriptor in the read set
+  * becomes readable. On Windows we have to check for the termination event from
+  * time to time, on Unix we can just block forever.
+  */
+ #ifdef WIN32
+ static int
+ select_loop(int maxFd, fd_set *workerset)
+ {
+ 	int			i;
+ 	fd_set		saveSet = *workerset;
+ 
+ 	/* should always be the master */
+ 	Assert(tMasterThreadId == GetCurrentThreadId());
+ 
+ 	for (;;)
+ 	{
+ 		/*
+ 		 * sleep a quarter of a second before checking if we should
+ 		 * terminate.
+ 		 */
+ 		struct timeval tv = { 0, 250000 };
+ 		*workerset = saveSet;
+ 		i = select(maxFd + 1, workerset, NULL, NULL, &tv);
+ 
+ 		if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
+ 			continue;
+ 		if (i)
+ 			break;
+ 	}
+ 
+ 	return i;
+ }
+ #else /* UNIX */
+ static int
+ select_loop(int maxFd, fd_set *workerset)
+ {
+ 	int		i;
+ 
+ 	fd_set saveSet = *workerset;
+ 	for (;;)
+ 	{
+ 		*workerset = saveSet;
+ 		i = select(maxFd + 1, workerset, NULL, NULL, NULL);
+ 		Assert(i != 0);
+ 		if (wantAbort && !aborting) {
+ 			return NO_SLOT;
+ 		}
+ 		if (i < 0 && errno == EINTR)
+ 			continue;
+ 		break;
+ 	}
+ 
+ 	return i;
+ }
+ #endif
+ 
+ /*
+  * Shut down any remaining workers, this has an implicit do_wait == true
+  */
+ static void
+ ShutdownWorkersHard(ArchiveHandle *AH, ParallelState *pstate)
+ {
+ #ifdef WIN32
+ 	/* The workers monitor this event via checkAborting(). */
+ 	SetEvent(termEvent);
+ #endif
+ 	/*
+ 	 * The fastest way we can make them terminate is when they are listening
+ 	 * for new commands and we just tell them to terminate.
+ 	 */
+ 	ShutdownWorkersSoft(AH, pstate, false);
+ 
+ #ifndef WIN32
+ 	{
+ 		int i;
+ 		for (i = 0; i < pstate->numWorkers; i++)
+ 			kill(pstate->parallelSlot[i].pid, SIGTERM);
+ 
+ 		/* Reset our signal handler, if we get signaled again, terminate normally */
+ 		signal(SIGINT, SIG_DFL);
+ 		signal(SIGTERM, SIG_DFL);
+ 		signal(SIGQUIT, SIG_DFL);
+ 	}
+ #endif
+ 
+ 	WaitForTerminatingWorkers(AH, pstate);
+ }
+ 
+ static void
+ WaitForTerminatingWorkers(ArchiveHandle *AH, ParallelState *pstate)
+ {
+ 	while (!HasEveryWorkerTerminated(pstate))
+ 	{
+ 		int			worker;
+ 		char	   *msg;
+ 
+ 		PrintStatus(pstate);
+ 
+ 		msg = getMessageFromWorker(AH, pstate, true, &worker);
+ 		if (!msg || messageStartsWith(msg, "ERROR "))
+ 			pstate->parallelSlot[worker].workerStatus = WRKR_TERMINATED;
+ 		if (msg)
+ 			free(msg);
+ 	}
+ 	Assert(HasEveryWorkerTerminated(pstate));
+ }
+ 
+ #ifndef WIN32
+ /* Signal handling (UNIX only) */
+ static void
+ sigTermHandler(int signum)
+ {
+ 	wantAbort++;
+ }
+ #endif
+ 
+ /*
+  * This function is called by both UNIX and Windows variants to set up a
+  * worker process.
+  */
+ static void
+ SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker,
+ 			RestoreOptions *ropt)
+ {
+ 	/*
+ 	 * In dump mode (pg_dump) this calls _SetupWorker() as defined in
+ 	 * pg_dump.c, while in restore mode (pg_restore) it calls _SetupWorker() as
+ 	 * defined in pg_restore.c.
+      *
+ 	 * We get the raw connection only for the reason that we can close it
+ 	 * properly when we shut down. This happens only that way when it is
+ 	 * brought down because of an error.
+ 	 */
+ 	_SetupWorker((Archive *) AH, ropt);
+ 
+ 	Assert(AH->connection != NULL);
+ 
+ 	WaitForCommands(AH, pipefd);
+ 
+ 	closesocket(pipefd[PIPE_READ]);
+ 	closesocket(pipefd[PIPE_WRITE]);
+ }
+ 
+ #ifdef WIN32
+ /*
+  * On Windows the _beginthreadex() function allows us to pass one parameter.
+  * Since we need to pass a few values however, we define a structure here
+  * and then pass a pointer to such a structure in _beginthreadex().
+  */
+ typedef struct {
+ 	ArchiveHandle  *AH;
+ 	RestoreOptions *ropt;
+ 	int				worker;
+ 	int				pipeRead;
+ 	int				pipeWrite;
+ } WorkerInfo;
+ 
+ static unsigned __stdcall
+ init_spawned_worker_win32(WorkerInfo *wi)
+ {
+ 	ArchiveHandle *AH;
+ 	int pipefd[2] = { wi->pipeRead, wi->pipeWrite };
+ 	int worker = wi->worker;
+ 	RestoreOptions *ropt = wi->ropt;
+ 
+ 	AH = CloneArchive(wi->AH);
+ 
+ 	free(wi);
+ 	SetupWorker(AH, pipefd, worker, ropt);
+ 
+ 	DeCloneArchive(AH);
+ 	_endthreadex(0);
+ 	return 0;
+ }
+ #endif
+ 
+ /*
+  * This function starts the parallel dump or restore by spawning off the worker
+  * processes in both Unix and Windows. For Windows, it creates a number of
+  * threads while it does a fork() on Unix.
+  */
+ ParallelState *
+ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
+ {
+ 	ParallelState  *pstate;
+ 	int				i;
+ 	const size_t	slotSize = AH->public.numWorkers * sizeof(ParallelSlot);
+ 
+ 	Assert(AH->public.numWorkers > 0);
+ 
+ 	/* Ensure stdio state is quiesced before forking */
+ 	fflush(NULL);
+ 
+ 	pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
+ 
+ 	pstate->numWorkers = AH->public.numWorkers;
+ 	pstate->parallelSlot = NULL;
+ 
+ 	if (AH->public.numWorkers == 1)
+ 		return pstate;
+ 
+ 	pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
+ 	memset((void *) pstate->parallelSlot, 0, slotSize);
+ 
+ 	parallel_error_handler_escrow_data(SET, pstate);
+ 	vparallel_error_handler = vparallel_error_handler_imp;
+ 
+ #ifdef WIN32
+ 	tMasterThreadId = GetCurrentThreadId();
+ 	termEvent = CreateEvent(NULL, true, false, "Terminate");
+ #else
+ 	signal(SIGTERM, sigTermHandler);
+ 	signal(SIGINT, sigTermHandler);
+ 	signal(SIGQUIT, sigTermHandler);
+ #endif
+ 
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 	{
+ #ifdef WIN32
+ 		WorkerInfo *wi;
+ 		uintptr_t	handle;
+ #else
+ 		pid_t		pid;
+ #endif
+ 		int			pipeMW[2], pipeWM[2];
+ 
+ 		if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
+ 			die_horribly(AH, modulename, "Cannot create communication channels: %s",
+ 						 strerror(errno));
+ 
+ 		pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
+ #ifdef WIN32
+ 		/* Allocate a new structure for every worker */
+ 		wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
+ 
+ 		wi->ropt = ropt;
+ 		wi->worker = i;
+ 		wi->AH = AH;
+ 		wi->pipeRead = pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
+ 		wi->pipeWrite = pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];
+ 
+ 		handle = _beginthreadex(NULL, 0, &init_spawned_worker_win32,
+ 								wi, 0, &(pstate->parallelSlot[i].threadId));
+ 		pstate->parallelSlot[i].hThread = handle;
+ #else
+ 		pid = fork();
+ 		if (pid == 0)
+ 		{
+ 			/* we are the worker */
+ 			int j;
+ 			int pipefd[2] = { pipeMW[PIPE_READ], pipeWM[PIPE_WRITE] };
+ 
+ 			/*
+ 			 * Store the fds for the reverse communication in pstate. Actually
+ 			 * we only use this in case of an error and don't use pstate
+ 			 * otherwise in the worker process. On Windows we write to the
+ 			 * global pstate, in Unix we write to our process-local copy but
+ 			 * that's also where we'd retrieve this information back from.
+ 			 */
+ 			pstate->parallelSlot[i].pipeRevRead = pipefd[PIPE_READ];
+ 			pstate->parallelSlot[i].pipeRevWrite = pipefd[PIPE_WRITE];
+ 			pstate->parallelSlot[i].pid = getpid();
+ 
+ 			/*
+ 			 * Call CloneArchive on Unix as well even though technically we
+ 			 * don't need to because fork() gives us a copy in our own address space
+ 			 * already. But CloneArchive resets the state information, sets is_clone
+ 			 * and also clones the database connection (for parallel dump)
+ 			 * which all seems kinda helpful.
+ 			 */
+ 			AH = CloneArchive(AH);
+ 
+ #ifdef HAVE_SETSID
+ 			/*
+ 			 * If we can, we try to make each process the leader of its own
+ 			 * process group. The reason is that if you hit Ctrl-C and they are
+ 			 * all in the same process group, any termination sequence is
+ 			 * possible, because every process will receive the signal. What
+ 			 * often happens is that a worker receives the signal, terminates
+ 			 * and the master detects that one of the workers had a problem,
+ 			 * even before acting on its own signal. That's still okay because
+ 			 * everyone still terminates but it looks a bit weird.
+ 			 *
+ 			 * With setsid() however, a Ctrl-C is only sent to the master and
+ 			 * it can then forward the signal to the worker processes.
+ 			 */
+ 			setsid();
+ #endif
+ 
+ 			closesocket(pipeWM[PIPE_READ]);		/* close read end of Worker -> Master */
+ 			closesocket(pipeMW[PIPE_WRITE]);	/* close write end of Master -> Worker */
+ 
+ 			/*
+ 			 * Close all inherited fds for communication of the master with
+ 			 * the other workers.
+ 			 */
+ 			for (j = 0; j < i; j++)
+ 			{
+ 				closesocket(pstate->parallelSlot[j].pipeRead);
+ 				closesocket(pstate->parallelSlot[j].pipeWrite);
+ 			}
+ 
+ 			SetupWorker(AH, pipefd, i, ropt);
+ 
+ 			exit(0);
+ 		}
+ 		else if (pid < 0)
+ 			/* fork failed */
+ 			die_horribly(AH, modulename,
+ 						 "could not create worker process: %s\n",
+ 						 strerror(errno));
+ 
+ 		/* we are the Master, pid > 0 here */
+ 		Assert(pid > 0);
+ 		closesocket(pipeMW[PIPE_READ]);		/* close read end of Master -> Worker */
+ 		closesocket(pipeWM[PIPE_WRITE]);	/* close write end of Worker -> Master */
+ 
+ 		pstate->parallelSlot[i].pid = pid;
+ #endif
+ 
+ 		pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
+ 		pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
+ 
+ 		pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
+ 		pstate->parallelSlot[i].args->AH = AH;
+ 		pstate->parallelSlot[i].args->te = NULL;
+ 		pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
+ 	}
+ 
+ 	return pstate;
+ }
+ 
+ /*
+  * Tell all of our workers to terminate.
+  *
+  * Pretty straightforward routine, first we tell everyone to terminate, then we
+  * listen to the workers' replies and finally close the sockets that we have
+  * used for communication.
+  */
+ void
+ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
+ {
+ 	int i;
+ 
+ 	if (pstate->numWorkers == 1)
+ 		return;
+ 
+ 	PrintStatus(pstate);
+ 	Assert(IsEveryWorkerIdle(pstate));
+ 
+ 	/* no hard shutdown, let workers exit by themselves and wait for them */
+ 	ShutdownWorkersSoft(AH, pstate, true);
+ 
+ 	PrintStatus(pstate);
+ 
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 	{
+ 		closesocket(pstate->parallelSlot[i].pipeRead);
+ 		closesocket(pstate->parallelSlot[i].pipeWrite);
+ 	}
+ 
+ 	vparallel_error_handler = NULL;
+ 
+ 	free(pstate->parallelSlot);
+ 	free(pstate);
+ }
+ 
+ 
+ /*
+  * The sequence is the following (for dump, similar for restore):
+  *
+  * Master                                   Worker
+  *
+  *                                          enters WaitForCommands()
+  * DispatchJobForTocEntry(...te...)
+  *
+  * [ Worker is IDLE ]
+  *
+  * arg = (MasterStartParallelItemPtr)()
+  * send: DUMP arg
+  *                                          receive: DUMP arg
+  *                                          str = (WorkerJobDumpPtr)(arg)
+  * [ Worker is WORKING ]                    ... gets te from arg ...
+  *                                          ... dump te ...
+  *                                          send: OK DUMP info
+  *
+  * In ListenToWorkers():
+  *
+  * [ Worker is FINISHED ]
+  * receive: OK DUMP info
+  * status = (MasterEndParallelItemPtr)(info)
+  *
+  * In ReapWorkerStatus(&ptr):
+  * *ptr = status;
+  * [ Worker is IDLE ]
+  */
+ void
+ DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te,
+ 					   T_Action act)
+ {
+ 	int		worker;
+ 	char   *arg;
+ 
+ 	/* our caller makes sure that at least one worker is idle */
+ 	Assert(GetIdleWorker(pstate) != NO_SLOT);
+ 	worker = GetIdleWorker(pstate);
+ 	Assert(worker != NO_SLOT);
+ 
+ 	arg = (AH->MasterStartParallelItemPtr)(AH, te, act);
+ 
+ 	sendMessageToWorker(AH, pstate, worker, arg);
+ 
+ 	pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
+ 	pstate->parallelSlot[worker].args->te = te;
+ 	PrintStatus(pstate);
+ }
+ 
+ static void
+ PrintStatus(ParallelState *pstate)
+ {
+ 	int			i;
+ 	printf("------Status------\n");
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 	{
+ 		printf("Status of worker %d: ", i);
+ 		switch (pstate->parallelSlot[i].workerStatus)
+ 		{
+ 			case WRKR_IDLE:
+ 				printf("IDLE");
+ 				break;
+ 			case WRKR_WORKING:
+ 				printf("WORKING");
+ 				break;
+ 			case WRKR_FINISHED:
+ 				printf("FINISHED");
+ 				break;
+ 			case WRKR_TERMINATED:
+ 				printf("TERMINATED");
+ 				break;
+ 		}
+ 		printf("\n");
+ 	}
+ 	printf("------------\n");
+ }
+ 
+ 
+ /*
+  * Find the first free parallel slot (if any).
+  */
+ int
+ GetIdleWorker(ParallelState *pstate)
+ {
+ 	int			i;
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 		if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
+ 			return i;
+ 	return NO_SLOT;
+ }
+ 
+ /*
+  * Return true iff every worker process is in the WRKR_TERMINATED state.
+  */
+ static bool
+ HasEveryWorkerTerminated(ParallelState *pstate)
+ {
+ 	int			i;
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 		if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED)
+ 			return false;
+ 	return true;
+ }
+ 
+ /*
+  * Return true iff every worker is in the WRKR_IDLE state.
+  */
+ bool
+ IsEveryWorkerIdle(ParallelState *pstate)
+ {
+ 	int			i;
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 		if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
+ 			return false;
+ 	return true;
+ }
+ 
+ /*
+  * Perform a soft shutdown: send a "TERMINATE" message to every worker that
+  * has not already terminated, and optionally (do_wait) wait until all of
+  * them are gone.
+  *
+  * After sending TERMINATE the slot is marked WRKR_WORKING, i.e. busy with
+  * its shutdown, until the termination is actually observed.
+  */
+ static void
+ ShutdownWorkersSoft(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
+ {
+ 	int			slot;
+ 
+ 	/* ask every still-living worker to shut down */
+ 	for (slot = 0; slot < pstate->numWorkers; slot++)
+ 	{
+ 		if (pstate->parallelSlot[slot].workerStatus == WRKR_TERMINATED)
+ 			continue;
+ 		sendMessageToWorker(AH, pstate, slot, "TERMINATE");
+ 		pstate->parallelSlot[slot].workerStatus = WRKR_WORKING;
+ 	}
+ 
+ 	if (do_wait)
+ 		WaitForTerminatingWorkers(AH, pstate);
+ }
+ 
+ /*
+  * Cancel any running query on, and then close, the given connection.
+  *
+  * This routine does some effort to gracefully shut down the database
+  * connection, but not too much, since the parent is waiting for the workers
+  * to terminate. The cancellation of the database connection is done in an
+  * asynchronous way, so we need to wait a bit after sending PQcancel().
+  * Calling PQcancel() and then PQfinish() immediately afterwards would still
+  * cancel an active connection because most likely the PQfinish() has not
+  * yet been processed.
+  *
+  * On Windows, when the master process terminates the children's database
+  * connections it forks off new threads that do nothing else than close the
+  * connection. These threads only live as long as they are in this function.
+  * And since a thread needs to return a value this function needs to as
+  * well. Hence this function returns an (unsigned) int.
+  */
+ static int
+ ShutdownConnection(PGconn **conn)
+ {
+ 	PGcancel   *cancel;
+ 	char		errbuf[1];		/* cancel error text is not used */
+ 	int			i;
+ 
+ 	Assert(conn != NULL);
+ 	Assert(*conn != NULL);
+ 
+ 	/* best effort: if no cancel object can be built, just proceed */
+ 	if ((cancel = PQgetCancel(*conn)))
+ 	{
+ 		PQcancel(cancel, errbuf, sizeof(errbuf));
+ 		PQfreeCancel(cancel);
+ 	}
+ 
+ 	/* give the server a little while (SHUTDOWN_GRACE_PERIOD ms in total) */
+ 	for (i = 0; i < 10; i++)
+ 	{
+ 		PQconsumeInput(*conn);
+ 		if (!PQisBusy(*conn))
+ 			break;
+ 		pg_usleep((SHUTDOWN_GRACE_PERIOD / 10) * 1000);
+ 	}
+ 
+ 	PQfinish(*conn);
+ 	*conn = NULL;
+ 	return 0;
+ }
+ 
+ /*
+  * One danger of the parallel backup is a possible deadlock:
+  *
+  * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode.
+  * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
+  *    because the master holds a conflicting ACCESS SHARE lock).
+  * 3) The worker process also requests an ACCESS SHARE lock to read the table.
+  *    The worker's not granted that lock but is enqueued behind the ACCESS
+  *    EXCLUSIVE lock request.
+  *
+  * Now what we do here is to just request a lock in ACCESS SHARE but with
+  * NOWAIT in the worker prior to touching the table. If we don't get the lock,
+  * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
+  * are good to just fail the whole backup because we have detected a deadlock.
+  */
+ static void
+ lockTableNoWait(ArchiveHandle *AH, TocEntry *te)
+ {
+ 	const char *qualId;
+ 	PQExpBuffer query = createPQExpBuffer();
+ 	PGresult   *res;
+ 
+ 	Assert(AH->format == archDirectory);
+ 	Assert(strcmp(te->desc, "BLOBS") != 0);
+ 
+ 	/*
+ 	 * Look up the fully qualified name of the relation in the catalogs.
+ 	 * Note that an OID is an unsigned int, hence the %u format.
+ 	 */
+ 	appendPQExpBuffer(query, "SELECT pg_namespace.nspname,"
+ 							 "       pg_class.relname "
+ 							 "  FROM pg_class "
+ 							 "  JOIN pg_namespace on pg_namespace.oid = relnamespace "
+ 							 " WHERE pg_class.oid = %u", te->catalogId.oid);
+ 
+ 	res = PQexec(AH->connection, query->data);
+ 
+ 	/* demand exactly one row: a missing relation is just as fatal */
+ 	if (!res || PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) != 1)
+ 		die_horribly(AH, modulename, "could not get relation name for oid %u: %s",
+ 					 te->catalogId.oid, PQerrorMessage(AH->connection));
+ 
+ 	resetPQExpBuffer(query);
+ 
+ 	qualId = fmtQualifiedId(PQgetvalue(res, 0, 0),
+ 							PQgetvalue(res, 0, 1),
+ 							AH->public.remoteVersion);
+ 
+ 	appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT", qualId);
+ 	PQclear(res);
+ 
+ 	res = PQexec(AH->connection, query->data);
+ 
+ 	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
+ 		die_horribly(AH, modulename, "could not obtain lock on relation \"%s\". This "
+ 					 "usually means that someone requested an ACCESS EXCLUSIVE lock "
+ 					 "on the table after the pg_dump parent process has gotten the "
+ 					 "initial ACCESS SHARE lock on the table.", qualId);
+ 
+ 	PQclear(res);
+ 	destroyPQExpBuffer(query);
+ }
+ 
+ /*
+  * This is the main routine for the worker process.
+  *
+  * When it starts up it enters this routine and waits for commands from the
+  * master process. After having processed a command it comes back here to
+  * wait for the next command. Finally it will receive a TERMINATE command
+  * and exit.
+  */
+ static void
+ WaitForCommands(ArchiveHandle *AH, int pipefd[2])
+ {
+ 	char	   *command;
+ 	DumpId		dumpId;
+ 	int			nBytes;
+ 	char	   *str;
+ 	TocEntry   *te;
+ 
+ 	for (;;)
+ 	{
+ 		command = getMessageFromMaster(AH, pipefd);
+ 
+ 		if (messageStartsWith(command, "DUMP "))
+ 		{
+ 			Assert(AH->format == archDirectory);
+ 			sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes);
+ 			Assert(nBytes == strlen(command) - strlen("DUMP "));
+ 
+ 			te = getTocEntryByDumpId(AH, dumpId);
+ 			Assert(te != NULL);
+ 
+ 			/*
+ 			 * Lock the table but with NOWAIT. Note that the parent is already
+ 			 * holding a lock. If we cannot acquire another ACCESS SHARE MODE
+ 			 * lock, then somebody else has requested an exclusive lock in the
+ 			 * meantime.  lockTableNoWait dies in this case to prevent a
+ 			 * deadlock.
+ 			 */
+ 			if (strcmp(te->desc, "BLOBS") != 0)
+ 				lockTableNoWait(AH, te);
+ 
+ 			/*
+ 			 * The message we return here has been pg_malloc()ed and we are
+ 			 * responsible for free()ing it.
+ 			 */
+ 			str = (AH->WorkerJobDumpPtr)(AH, te);
+ 			Assert(AH->connection != NULL);
+ 			sendMessageToMaster(AH, pipefd, str);
+ 			free(str);
+ 		}
+ 		else if (messageStartsWith(command, "RESTORE "))
+ 		{
+ 			Assert(AH->format == archDirectory || AH->format == archCustom);
+ 			Assert(AH->connection != NULL);
+ 
+ 			sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes);
+ 			Assert(nBytes == strlen(command) - strlen("RESTORE "));
+ 
+ 			te = getTocEntryByDumpId(AH, dumpId);
+ 			Assert(te != NULL);
+ 
+ 			/*
+ 			 * The message we return here has been pg_malloc()ed and we are
+ 			 * responsible for free()ing it.
+ 			 */
+ 			str = (AH->WorkerJobRestorePtr)(AH, te);
+ 			Assert(AH->connection != NULL);
+ 			sendMessageToMaster(AH, pipefd, str);
+ 			free(str);
+ 		}
+ 		else if (messageEquals(command, "TERMINATE"))
+ 		{
+ 			PQfinish(AH->connection);
+ 			AH->connection = NULL;
+ 			free(command);
+ 			return;
+ 		}
+ 		else
+ 		{
+ 			die_horribly(AH, modulename,
+ 						 "Unknown command on communication channel: %s", command);
+ 		}
+ 
+ 		/*
+ 		 * The command string was pg_malloc()ed by getMessageFromMaster()
+ 		 * (via readMessageFromPipe); free it here to avoid leaking once per
+ 		 * processed command in this long-lived loop.
+ 		 */
+ 		free(command);
+ 	}
+ }
+ 
+ /*
+  * Receive and process one status message from a worker, if available.
+  *
+  * Note the status change:
+  *
+  * DispatchJobForTocEntry		WRKR_IDLE -> WRKR_WORKING
+  * ListenToWorkers				WRKR_WORKING -> WRKR_FINISHED / WRKR_TERMINATED
+  * ReapWorkerStatus				WRKR_FINISHED -> WRKR_IDLE
+  *
+  * Just calling ReapWorkerStatus() when all workers are working might or might
+  * not give you an idle worker because you need to call ListenToWorkers() in
+  * between and only thereafter ReapWorkerStatus(). This is necessary in order to
+  * get and deal with the status (=result) of the worker's execution.
+  */
+ void
+ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
+ {
+ 	int			worker;
+ 	char	   *msg;
+ 
+ 	/* NULL is only possible in the non-blocking case: nothing pending */
+ 	msg = getMessageFromWorker(AH, pstate, do_wait, &worker);
+ 
+ 	if (!msg)
+ 	{
+ 		Assert(!do_wait);
+ 		return;
+ 	}
+ 
+ 	if (messageStartsWith(msg, "OK "))
+ 	{
+ 		char	   *statusString;
+ 		TocEntry   *te;
+ 
+ 		pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED;
+ 		te = pstate->parallelSlot[worker].args->te;
+ 		if (messageStartsWith(msg, "OK RESTORE "))
+ 		{
+ 			statusString = msg + strlen("OK RESTORE ");
+ 			pstate->parallelSlot[worker].status =
+ 				(AH->MasterEndParallelItemPtr)
+ 					(AH, te, statusString, ACT_RESTORE);
+ 		}
+ 		else if (messageStartsWith(msg, "OK DUMP "))
+ 		{
+ 			statusString = msg + strlen("OK DUMP ");
+ 			pstate->parallelSlot[worker].status =
+ 				(AH->MasterEndParallelItemPtr)
+ 					(AH, te, statusString, ACT_DUMP);
+ 		}
+ 		else
+ 			die_horribly(AH, modulename, "Invalid message received from worker: %s", msg);
+ 	}
+ 	else if (messageStartsWith(msg, "ERROR "))
+ 	{
+ 		Assert(AH->format == archDirectory || AH->format == archCustom);
+ 		/* mark the slot dead before dying so shutdown code skips it */
+ 		pstate->parallelSlot[worker].workerStatus = WRKR_TERMINATED;
+ 		die_horribly(AH, modulename, "%s", msg + strlen("ERROR "));
+ 	}
+ 	else
+ 		die_horribly(AH, modulename, "Invalid message received from worker: %s", msg);
+ 
+ 	PrintStatus(pstate);
+ 
+ 	/* both Unix and Win32 return pg_malloc()ed space, so we free it */
+ 	free(msg);
+ }
+ 
+ /*
+  * This function is executed in the master process.
+  *
+  * Look for a worker in the WRKR_FINISHED state; if one is found, store its
+  * job status in *status, move the slot back to WRKR_IDLE and return the
+  * worker's index. Return NO_SLOT if no worker has finished.
+  */
+ int
+ ReapWorkerStatus(ParallelState *pstate, int *status)
+ {
+ 	int			slot;
+ 
+ 	for (slot = 0; slot < pstate->numWorkers; slot++)
+ 	{
+ 		ParallelSlot *ps = &pstate->parallelSlot[slot];
+ 
+ 		if (ps->workerStatus != WRKR_FINISHED)
+ 			continue;
+ 
+ 		*status = ps->status;
+ 		ps->status = 0;
+ 		ps->workerStatus = WRKR_IDLE;
+ 		PrintStatus(pstate);
+ 		return slot;
+ 	}
+ 
+ 	return NO_SLOT;
+ }
+ 
+ /*
+  * This function is executed in the master process.
+  *
+  * It looks for an idle worker process and only returns once there is one,
+  * reaping the status of every finished job on the way and dying if any of
+  * them reported an error.
+  */
+ void
+ EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate)
+ {
+ 	int		ret_worker;
+ 	int		work_status;
+ 
+ 	for (;;)
+ 	{
+ 		int nTerm = 0;
+ 		/* collect every finished job; each reap frees up one slot */
+ 		while ((ret_worker = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
+ 		{
+ 			if (work_status != 0)
+ 				die_horribly(AH, modulename, "Error processing a parallel work item.\n");
+ 
+ 			nTerm++;
+ 		}
+ 
+ 		/*
+ 		 * We need to make sure that we have an idle worker before dispatching
+ 		 * the next item. If nTerm > 0 we already have that (quick check).
+ 		 */
+ 		if (nTerm > 0)
+ 			return;
+ 
+ 		/* explicit check for an idle worker */
+ 		if (GetIdleWorker(pstate) != NO_SLOT)
+ 			return;
+ 
+ 		/*
+ 		 * If we have no idle worker, read the result of one or more
+ 		 * workers and loop the loop to call ReapWorkerStatus() on them
+ 		 */
+ 		ListenToWorkers(AH, pstate, true);
+ 	}
+ }
+ 
+ /*
+  * This function is executed in the master process.
+  *
+  * Wait until every worker is idle again, collecting and checking the
+  * status of each finished job on the way.
+  */
+ void
+ EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate)
+ {
+ 	int			work_status;
+ 
+ 	/* nothing to wait for without actual parallelism */
+ 	if (!pstate || pstate->numWorkers == 1)
+ 		return;
+ 
+ 	while (!IsEveryWorkerIdle(pstate))
+ 	{
+ 		if (ReapWorkerStatus(pstate, &work_status) != NO_SLOT)
+ 		{
+ 			if (work_status != 0)
+ 				die_horribly(AH, modulename, "Error processing a parallel work item");
+ 		}
+ 		else
+ 			ListenToWorkers(AH, pstate, true);
+ 	}
+ }
+ 
+ /*
+  * This function is executed in the worker process.
+  *
+  * It returns the next message on the communication channel, blocking until
+  * it becomes available. The returned string was allocated with pg_malloc()
+  * (see readMessageFromPipe).
+  */
+ static char *
+ getMessageFromMaster(ArchiveHandle *AH, int pipefd[2])
+ {
+ 	return readMessageFromPipe(pipefd[PIPE_READ], true);
+ }
+ 
+ /*
+  * This function is executed in the worker process.
+  *
+  * Send the given null-terminated string (including its terminator) to the
+  * master over the communication channel; die on a failed or short write.
+  */
+ static void
+ sendMessageToMaster(ArchiveHandle *AH, int pipefd[2], const char *str)
+ {
+ 	int			msglen = strlen(str) + 1;	/* include the '\0' */
+ 
+ 	if (pipewrite(pipefd[PIPE_WRITE], str, msglen) != msglen)
+ 		die_horribly(AH, modulename,
+ 					 "Error writing to the communication channel: %s",
+ 					 strerror(errno));
+ }
+ 
+ /*
+  * This function is executed in the master process.
+  *
+  * It returns the next message from any worker on the communication channel,
+  * optionally blocking (do_wait) until it becomes available; in the
+  * non-blocking case NULL is returned when no message is pending.
+  *
+  * The id of the worker is returned in *worker.
+  */
+ static char *
+ getMessageFromWorker(ArchiveHandle *AH, ParallelState *pstate, bool do_wait, int *worker)
+ {
+ 	int			i;
+ 	fd_set		workerset;
+ 	int			maxFd = -1;
+ 	struct		timeval nowait = { 0, 0 };	/* zero timeout: pure poll */
+ 
+ 	/* build the set of read pipes of all not-yet-terminated workers */
+ 	FD_ZERO(&workerset);
+ 
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 	{
+ 		if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
+ 			continue;
+ 		FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
+ 		/* actually WIN32 ignores the first parameter to select()... */
+ 		if (pstate->parallelSlot[i].pipeRead > maxFd)
+ 			maxFd = pstate->parallelSlot[i].pipeRead;
+ 	}
+ 
+ 	/*
+ 	 * NOTE(review): if every worker is terminated, maxFd stays -1 here and
+ 	 * select() is called with an empty set — confirm callers never do that.
+ 	 */
+ 	if (do_wait)
+ 	{
+ 		i = select_loop(maxFd, &workerset);
+ 		Assert(i != 0);
+ 	}
+ 	else
+ 	{
+ 		if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
+ 			return NULL;
+ 	}
+ 
+ #ifndef WIN32
+ 	/* presumably wantAbort is set by a signal handler — die if requested */
+ 	if (wantAbort && !aborting)
+ 		die_horribly(AH, modulename, "terminated by user\n");
+ #endif
+ 
+ 	if (i < 0)
+ 	{
+ 		write_msg(NULL, "Error in ListenToWorkers(): %s", strerror(errno));
+ 		exit(1);
+ 	}
+ 
+ 	/* hand back the message of the first worker with data pending */
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 	{
+ 		char	   *msg;
+ 
+ 		if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
+ 			continue;
+ 
+ 		msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead, false);
+ 		*worker = i;
+ 		return msg;
+ 	}
+ 	/* unreachable: select() reported at least one readable descriptor */
+ 	Assert(false);
+ 	return NULL;
+ }
+ 
+ /*
+  * This function is executed in the master process.
+  *
+  * Send the given null-terminated string (including its terminator) to the
+  * given worker over the communication channel; die on a failed or short
+  * write.
+  */
+ static void
+ sendMessageToWorker(ArchiveHandle *AH, ParallelState *pstate, int worker, const char *str)
+ {
+ 	int			msglen = strlen(str) + 1;	/* include the '\0' */
+ 
+ 	if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, msglen) != msglen)
+ 		die_horribly(AH, modulename,
+ 					 "Error writing to the communication channel: %s",
+ 					 strerror(errno));
+ }
+ 
+ /*
+  * The underlying function to read a message from the communication channel
+  * (fd) with optional blocking (do_wait).
+  *
+  * Returns a pg_malloc()ed, \0-terminated message that the caller must
+  * free, or NULL if either no data was available in non-blocking mode or
+  * the other end closed the pipe / an error occurred.
+  */
+ static char *
+ readMessageFromPipe(int fd, bool do_wait)
+ {
+ 	char	   *msg;
+ 	int			msgsize, bufsize;
+ 	int			ret;
+ 
+ 	/*
+ 	 * The problem here is that we need to deal with several possibilities:
+ 	 * we could receive only a partial message or several messages at once.
+ 	 * The caller expects us to return exactly one message however.
+ 	 *
+ 	 * We could either read in as much as we can and keep track of what we
+ 	 * delivered back to the caller or we just read byte by byte. Once we see
+ 	 * (char) 0, we know that it's the message's end. This would be quite
+ 	 * inefficient for more data but since we are reading only on the command
+ 	 * channel, the performance loss does not seem worth the trouble of keeping
+ 	 * internal states for different file descriptors.
+ 	 */
+ 
+ 	bufsize = 64;  /* could be any number */
+ 	msg = (char *) pg_malloc(bufsize);
+ 
+ 	msgsize = 0;
+ 	for (;;)
+ 	{
+ 		Assert(msgsize <= bufsize);
+ 		/*
+ 		 * If we do non-blocking read, only set the channel non-blocking for
+ 		 * the very first character. We trust in our messages to be
+ 		 * \0-terminated, so if there is any character in the beginning, then
+ 		 * we read the message until we find a \0 somewhere, which indicates
+ 		 * the end of the message.
+ 		 */
+ 		if (msgsize == 0 && !do_wait)
+ 			setnonblocking(fd);
+ 
+ 		ret = piperead(fd, msg + msgsize, 1);
+ 
+ 		if (msgsize == 0 && !do_wait)
+ 		{
+ 			int		saved_errno = errno;
+ 
+ 			setblocking(fd);
+ 			/* no data has been available */
+ 			if (ret < 0 && saved_errno == EAGAIN)
+ 			{
+ 				free(msg);
+ 				return NULL;
+ 			}
+ 		}
+ 
+ 		/* worker has closed the connection or another error happened */
+ 		if (ret <= 0)
+ 		{
+ 			free(msg);
+ 			return NULL;
+ 		}
+ 
+ 		Assert(ret == 1);
+ 
+ 		if (msg[msgsize] == '\0')
+ 			return msg;
+ 
+ 		msgsize++;
+ 		if (msgsize == bufsize)
+ 		{
+ 			/*
+ 			 * Grow the buffer (the increment could be any number).  Use
+ 			 * pg_realloc, consistent with pg_malloc above: a plain realloc
+ 			 * overwriting msg would leak the buffer and return NULL on OOM.
+ 			 */
+ 			bufsize += 16;
+ 			msg = (char *) pg_realloc(msg, bufsize);
+ 		}
+ 	}
+ }
+ 
diff --git a/src/bin/pg_dump/parallel.h b/src/bin/pg_dump/parallel.h
index ...4c86b9b .
*** a/src/bin/pg_dump/parallel.h
--- b/src/bin/pg_dump/parallel.h
***************
*** 0 ****
--- 1,91 ----
+ /*-------------------------------------------------------------------------
+  *
+  * parallel.h
+  *
+  *	Parallel support header file for the pg_dump archiver
+  *
+  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *	The author is not responsible for loss or damages that may
+  *	result from its use.
+  *
+  * IDENTIFICATION
+  *		src/bin/pg_dump/parallel.h
+  *
+  *-------------------------------------------------------------------------
+  */
+ 
+ #include "pg_backup_db.h"
+ 
+ struct _archiveHandle;
+ struct _tocEntry;
+ 
+ typedef enum
+ {
+ 	WRKR_TERMINATED = 0,		/* worker is not running (any more) */
+ 	WRKR_IDLE,					/* worker is waiting for a command */
+ 	WRKR_WORKING,				/* worker is processing a command */
+ 	WRKR_FINISHED				/* job done, result not yet collected */
+ } T_WorkerStatus;
+ 
+ typedef enum _action
+ {
+ 	ACT_DUMP,					/* dump the given TOC entry */
+ 	ACT_RESTORE,				/* restore the given TOC entry */
+ } T_Action;
+ 
+ /* Arguments needed for a worker process */
+ typedef struct _parallel_args
+ {
+ 	struct _archiveHandle *AH;	/* archive handle the worker operates on */
+ 	struct _tocEntry	  *te;	/* TOC entry the worker is processing */
+ } ParallelArgs;
+ 
+ /* State for each parallel activity slot */
+ typedef struct _parallel_slot
+ {
+ 	ParallelArgs	   *args;			/* per-job arguments (AH, te) */
+ 	T_WorkerStatus		workerStatus;	/* see T_WorkerStatus above */
+ 	int					status;			/* result of the last finished job */
+ 	int					pipeRead;		/* master reads worker replies here */
+ 	int					pipeWrite;		/* master sends commands here */
+ 	int					pipeRevRead;	/* presumably the worker-side read end — confirm */
+ 	int					pipeRevWrite;	/* presumably the worker-side write end — confirm */
+ #ifdef WIN32
+ 	uintptr_t			hThread;		/* handle of the worker thread */
+ 	unsigned int		threadId;		/* id of the worker thread */
+ #else
+ 	pid_t				pid;			/* pid of the worker process */
+ #endif
+ 	bool				inErrorHandling;	/* NOTE(review): not used in visible code — verify */
+ } ParallelSlot;
+ 
+ #define NO_SLOT (-1)
+ 
+ typedef struct _parallel_state
+ {
+ 	int			numWorkers;		/* length of the parallelSlot array */
+ 	ParallelSlot *parallelSlot;	/* one slot per worker */
+ } ParallelState;
+ 
+ extern int GetIdleWorker(ParallelState *pstate);
+ extern bool IsEveryWorkerIdle(ParallelState *pstate);
+ extern void ListenToWorkers(struct _archiveHandle *AH, ParallelState *pstate, bool do_wait);
+ extern int ReapWorkerStatus(ParallelState *pstate, int *status);
+ extern void EnsureIdleWorker(struct _archiveHandle *AH, ParallelState *pstate);
+ extern void EnsureWorkersFinished(struct _archiveHandle *AH, ParallelState *pstate);
+ 
+ extern ParallelState *ParallelBackupStart(struct _archiveHandle *AH,
+ 										  RestoreOptions *ropt);
+ extern void DispatchJobForTocEntry(struct _archiveHandle *AH,
+ 								   ParallelState *pstate,
+ 								   struct _tocEntry *te, T_Action act);
+ extern void ParallelBackupEnd(struct _archiveHandle *AH, ParallelState *pstate);
+ 
+ extern void (* volatile vparallel_error_handler)(struct _archiveHandle *AH,
+ 									const char *modulename,
+ 									const char *fmt, va_list ap);
+ 
+ extern void checkAborting(struct _archiveHandle *AH);
+ 
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 8926488..767f865 100644
*** a/src/bin/pg_dump/pg_backup.h
--- b/src/bin/pg_dump/pg_backup.h
*************** typedef struct _Archive
*** 90,95 ****
--- 90,97 ----
  	int			minRemoteVersion;		/* allowable range */
  	int			maxRemoteVersion;
  
+ 	int			numWorkers;		/* number of parallel processes */
+ 
  	/* info needed for string escaping */
  	int			encoding;		/* libpq code for client_encoding */
  	bool		std_strings;	/* standard_conforming_strings */
*************** typedef struct _restoreOptions
*** 150,156 ****
  	int			suppressDumpWarnings;	/* Suppress output of WARNING entries
  										 * to stderr */
  	bool		single_txn;
- 	int			number_of_jobs;
  
  	bool	   *idWanted;		/* array showing which dump IDs to emit */
  } RestoreOptions;
--- 152,157 ----
*************** typedef struct _restoreOptions
*** 162,180 ****
  
  /* Lets the archive know we have a DB connection to shutdown if it dies */
  
! PGconn *ConnectDatabase(Archive *AH,
  				const char *dbname,
  				const char *pghost,
  				const char *pgport,
  				const char *username,
  				enum trivalue prompt_password);
  
  /* Called to add a TOC entry */
  extern void ArchiveEntry(Archive *AHX,
  			 CatalogId catalogId, DumpId dumpId,
  			 const char *tag,
  			 const char *namespace, const char *tablespace,
! 			 const char *owner, bool withOids,
  			 const char *desc, teSection section,
  			 const char *defn,
  			 const char *dropStmt, const char *copyStmt,
--- 163,183 ----
  
  /* Lets the archive know we have a DB connection to shutdown if it dies */
  
! PGconn *ConnectDatabase(Archive *AHX,
  				const char *dbname,
  				const char *pghost,
  				const char *pgport,
  				const char *username,
  				enum trivalue prompt_password);
+ PGconn *CloneDatabaseConnection(Archive *AHX);
  
  /* Called to add a TOC entry */
  extern void ArchiveEntry(Archive *AHX,
  			 CatalogId catalogId, DumpId dumpId,
  			 const char *tag,
  			 const char *namespace, const char *tablespace,
! 			 const char *owner,
! 			 unsigned long int relpages, bool withOids,
  			 const char *desc, teSection section,
  			 const char *defn,
  			 const char *dropStmt, const char *copyStmt,
*************** extern void PrintTOCSummary(Archive *AH,
*** 203,208 ****
--- 206,214 ----
  
  extern RestoreOptions *NewRestoreOptions(void);
  
+ /* We have one in pg_dump.c and another one in pg_restore.c */
+ extern void _SetupWorker(Archive *AHX, RestoreOptions *ropt);
+ 
  /* Rearrange and filter TOC entries */
  extern void SortTocFromFile(Archive *AHX, RestoreOptions *ropt);
  extern void InitDummyWantedList(Archive *AHX, RestoreOptions *ropt);
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 234e50f..0c81dfe 100644
*** a/src/bin/pg_dump/pg_backup_archiver.c
--- b/src/bin/pg_dump/pg_backup_archiver.c
***************
*** 23,82 ****
  #include "pg_backup_db.h"
  #include "dumpmem.h"
  #include "dumputils.h"
  
  #include <ctype.h>
  #include <unistd.h>
  #include <sys/stat.h>
  #include <sys/types.h>
  #include <sys/wait.h>
  
  #ifdef WIN32
  #include <io.h>
  #endif
  
  #include "libpq/libpq-fs.h"
  
- /*
-  * Special exit values from worker children.  We reserve 0 for normal
-  * success; 1 and other small values should be interpreted as crashes.
-  */
- #define WORKER_CREATE_DONE		10
- #define WORKER_INHIBIT_DATA		11
- #define WORKER_IGNORED_ERRORS	12
- 
- /*
-  * Unix uses exit to return result from worker child, so function is void.
-  * Windows thread result comes via function return.
-  */
- #ifndef WIN32
- #define parallel_restore_result void
- #else
- #define parallel_restore_result DWORD
- #endif
- 
- /* IDs for worker children are either PIDs or thread handles */
- #ifndef WIN32
- #define thandle pid_t
- #else
- #define thandle HANDLE
- #endif
- 
- /* Arguments needed for a worker child */
- typedef struct _restore_args
- {
- 	ArchiveHandle *AH;
- 	TocEntry   *te;
- } RestoreArgs;
- 
- /* State for each parallel activity slot */
- typedef struct _parallel_slot
- {
- 	thandle		child_id;
- 	RestoreArgs *args;
- } ParallelSlot;
- 
- #define NO_SLOT (-1)
- 
  #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
  #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
  
--- 23,46 ----
  #include "pg_backup_db.h"
  #include "dumpmem.h"
  #include "dumputils.h"
+ #include "parallel.h"
  
  #include <ctype.h>
+ #include <fcntl.h>
  #include <unistd.h>
  #include <sys/stat.h>
  #include <sys/types.h>
  #include <sys/wait.h>
  
+ static const char *modulename = gettext_noop("archiver");
+ 
+ 
  #ifdef WIN32
  #include <io.h>
  #endif
  
  #include "libpq/libpq-fs.h"
  
  #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
  #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
  
*************** typedef struct _outputContext
*** 87,94 ****
  	int			gzOut;
  } OutputContext;
  
- static const char *modulename = gettext_noop("archiver");
- 
  /* index array created by fix_dependencies -- only used in parallel restore */
  static TocEntry **tocsByDumpId; /* index by dumpId - 1 */
  static DumpId maxDumpId;		/* length of above array */
--- 51,56 ----
*************** static teReqs _tocEntryRequired(TocEntry
*** 115,121 ****
  static bool _tocEntryIsACL(TocEntry *te);
  static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
  static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
- static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
  static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
  static int	_discoverArchiveFormat(ArchiveHandle *AH);
  
--- 77,82 ----
*************** static void RestoreOutput(ArchiveHandle
*** 132,152 ****
  
  static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
  				  RestoreOptions *ropt, bool is_parallel);
! static void restore_toc_entries_parallel(ArchiveHandle *AH);
! static thandle spawn_restore(RestoreArgs *args);
! static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
! static bool work_in_progress(ParallelSlot *slots, int n_slots);
! static int	get_next_slot(ParallelSlot *slots, int n_slots);
  static void par_list_header_init(TocEntry *l);
  static void par_list_append(TocEntry *l, TocEntry *te);
  static void par_list_remove(TocEntry *te);
  static TocEntry *get_next_work_item(ArchiveHandle *AH,
  				   TocEntry *ready_list,
! 				   ParallelSlot *slots, int n_slots);
! static parallel_restore_result parallel_restore(RestoreArgs *args);
  static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
! 			   thandle worker, int status,
! 			   ParallelSlot *slots, int n_slots);
  static void fix_dependencies(ArchiveHandle *AH);
  static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
  static void repoint_table_dependencies(ArchiveHandle *AH,
--- 93,111 ----
  
  static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
  				  RestoreOptions *ropt, bool is_parallel);
! static void restore_toc_entries_prefork(ArchiveHandle *AH);
! static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
! 										 TocEntry *pending_list);
! static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list);
  static void par_list_header_init(TocEntry *l);
  static void par_list_append(TocEntry *l, TocEntry *te);
  static void par_list_remove(TocEntry *te);
  static TocEntry *get_next_work_item(ArchiveHandle *AH,
  				   TocEntry *ready_list,
! 				   ParallelState *pstate);
  static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
! 			   int worker, int status,
! 			   ParallelState *pstate);
  static void fix_dependencies(ArchiveHandle *AH);
  static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
  static void repoint_table_dependencies(ArchiveHandle *AH,
*************** static void reduce_dependencies(ArchiveH
*** 156,164 ****
  					TocEntry *ready_list);
  static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
  static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
- static ArchiveHandle *CloneArchive(ArchiveHandle *AH);
- static void DeCloneArchive(ArchiveHandle *AH);
- 
  
  /*
   *	Wrapper functions.
--- 115,120 ----
*************** RestoreArchive(Archive *AHX, RestoreOpti
*** 245,251 ****
  	/*
  	 * If we're going to do parallel restore, there are some restrictions.
  	 */
! 	parallel_mode = (ropt->number_of_jobs > 1 && ropt->useDB);
  	if (parallel_mode)
  	{
  		/* We haven't got round to making this work for all archive formats */
--- 201,207 ----
  	/*
  	 * If we're going to do parallel restore, there are some restrictions.
  	 */
! 	parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
  	if (parallel_mode)
  	{
  		/* We haven't got round to making this work for all archive formats */
*************** RestoreArchive(Archive *AHX, RestoreOpti
*** 411,417 ****
  	 * In parallel mode, turn control over to the parallel-restore logic.
  	 */
  	if (parallel_mode)
! 		restore_toc_entries_parallel(AH);
  	else
  	{
  		for (te = AH->toc->next; te != AH->toc; te = te->next)
--- 367,391 ----
  	 * In parallel mode, turn control over to the parallel-restore logic.
  	 */
  	if (parallel_mode)
! 	{
! 		ParallelState  *pstate;
! 		TocEntry		pending_list;
! 
! 		par_list_header_init(&pending_list);
! 
! 		/* This runs PRE_DATA items and then disconnects from the database */
! 		restore_toc_entries_prefork(AH);
! 		Assert(AH->connection == NULL);
! 
! 		/* ParallelBackupStart() will actually fork the processes */
! 		pstate = ParallelBackupStart(AH, ropt);
! 		restore_toc_entries_parallel(AH, pstate, &pending_list);
! 		ParallelBackupEnd(AH, pstate);
! 
! 		/* reconnect the master and see if we missed something */
! 		restore_toc_entries_postfork(AH, &pending_list);
! 		Assert(AH->connection != NULL);
! 	}
  	else
  	{
  		for (te = AH->toc->next; te != AH->toc; te = te->next)
*************** static int
*** 476,482 ****
  restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
  				  RestoreOptions *ropt, bool is_parallel)
  {
! 	int			retval = 0;
  	teReqs		reqs;
  	bool		defnDumped;
  
--- 450,456 ----
  restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
  				  RestoreOptions *ropt, bool is_parallel)
  {
! 	int			status = WORKER_OK;
  	teReqs		reqs;
  	bool		defnDumped;
  
*************** restore_toc_entry(ArchiveHandle *AH, Toc
*** 518,524 ****
  				if (ropt->noDataForFailedTables)
  				{
  					if (is_parallel)
! 						retval = WORKER_INHIBIT_DATA;
  					else
  						inhibit_data_for_failed_table(AH, te);
  				}
--- 492,498 ----
  				if (ropt->noDataForFailedTables)
  				{
  					if (is_parallel)
! 						status = WORKER_INHIBIT_DATA;
  					else
  						inhibit_data_for_failed_table(AH, te);
  				}
*************** restore_toc_entry(ArchiveHandle *AH, Toc
*** 533,539 ****
  				 * just set the return value.
  				 */
  				if (is_parallel)
! 					retval = WORKER_CREATE_DONE;
  				else
  					mark_create_done(AH, te);
  			}
--- 507,513 ----
  				 * just set the return value.
  				 */
  				if (is_parallel)
! 					status = WORKER_CREATE_DONE;
  				else
  					mark_create_done(AH, te);
  			}
*************** restore_toc_entry(ArchiveHandle *AH, Toc
*** 651,657 ****
  		}
  	}
  
! 	return retval;
  }
  
  /*
--- 625,634 ----
  		}
  	}
  
! 	if (AH->public.n_errors > 0 && status == WORKER_OK)
! 		status = WORKER_IGNORED_ERRORS;
! 
! 	return status;
  }
  
  /*
*************** ArchiveEntry(Archive *AHX,
*** 753,759 ****
  			 const char *tag,
  			 const char *namespace,
  			 const char *tablespace,
! 			 const char *owner, bool withOids,
  			 const char *desc, teSection section,
  			 const char *defn,
  			 const char *dropStmt, const char *copyStmt,
--- 730,737 ----
  			 const char *tag,
  			 const char *namespace,
  			 const char *tablespace,
! 			 const char *owner,
! 			 unsigned long int relpages, bool withOids,
  			 const char *desc, teSection section,
  			 const char *defn,
  			 const char *dropStmt, const char *copyStmt,
*************** static void
*** 1429,1434 ****
--- 1407,1421 ----
  vdie_horribly(ArchiveHandle *AH, const char *modulename,
  			  const char *fmt, va_list ap)
  {
+ 	/*
+ 	 * If we have an error handler for the parallel operation, then
+ 	 * control will not come back from there.
+ 	 */
+ 	if (vparallel_error_handler)
+ 		vparallel_error_handler(AH, modulename, fmt, ap);
+ 
+ 	Assert(!vparallel_error_handler);
+ 
  	vwrite_msg(modulename, fmt, ap);
  
  	if (AH)
*************** _moveBefore(ArchiveHandle *AH, TocEntry
*** 1534,1540 ****
  	pos->prev = te;
  }
  
! static TocEntry *
  getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
  {
  	TocEntry   *te;
--- 1521,1527 ----
  	pos->prev = te;
  }
  
! TocEntry *
  getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
  {
  	TocEntry   *te;
*************** _allocAH(const char *FileSpec, const Arc
*** 1944,1949 ****
--- 1931,1938 ----
  
  	AH->archiveDumpVersion = PG_VERSION;
  
+ 	AH->is_clone = false;
+ 
  	AH->createDate = time(NULL);
  
  	AH->intSize = sizeof(int);
*************** _allocAH(const char *FileSpec, const Arc
*** 2035,2082 ****
  
  
  void
! WriteDataChunks(ArchiveHandle *AH)
  {
  	TocEntry   *te;
- 	StartDataPtr startPtr;
- 	EndDataPtr	endPtr;
  
  	for (te = AH->toc->next; te != AH->toc; te = te->next)
  	{
! 		if (te->dataDumper != NULL)
! 		{
! 			AH->currToc = te;
! 			/* printf("Writing data for %d (%x)\n", te->id, te); */
! 
! 			if (strcmp(te->desc, "BLOBS") == 0)
! 			{
! 				startPtr = AH->StartBlobsPtr;
! 				endPtr = AH->EndBlobsPtr;
! 			}
! 			else
! 			{
! 				startPtr = AH->StartDataPtr;
! 				endPtr = AH->EndDataPtr;
! 			}
! 
! 			if (startPtr != NULL)
! 				(*startPtr) (AH, te);
  
  			/*
! 			 * printf("Dumper arg for %d is %x\n", te->id, te->dataDumperArg);
  			 */
  
! 			/*
! 			 * The user-provided DataDumper routine needs to call
! 			 * AH->WriteData
! 			 */
! 			(*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
  
! 			if (endPtr != NULL)
! 				(*endPtr) (AH, te);
! 			AH->currToc = NULL;
! 		}
  	}
  }
  
  void
--- 2024,2088 ----
  
  
  void
! WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
  {
  	TocEntry   *te;
  
  	for (te = AH->toc->next; te != AH->toc; te = te->next)
  	{
! 		if (!te->hadDumper)
! 			continue;
  
+ 		if (pstate && pstate->numWorkers > 1)
+ 		{
  			/*
! 			 * If we are in a parallel backup, then we are always the master
! 			 * process.
  			 */
+ 			EnsureIdleWorker(AH, pstate);
+ 			Assert(GetIdleWorker(pstate) != NO_SLOT);
+ 			DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP);
+ 		}
+ 		else
+ 		{
+ 			WriteDataChunksForTocEntry(AH, te);
+ 		}
+ 	}
+ 	EnsureWorkersFinished(AH, pstate);
+ }
  
! void
! WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
! {
! 	StartDataPtr startPtr;
! 	EndDataPtr	endPtr;
  
! 	AH->currToc = te;
! 
! 	if (strcmp(te->desc, "BLOBS") == 0)
! 	{
! 		startPtr = AH->StartBlobsPtr;
! 		endPtr = AH->EndBlobsPtr;
! 	}
! 	else
! 	{
! 		startPtr = AH->StartDataPtr;
! 		endPtr = AH->EndDataPtr;
  	}
+ 
+ 	if (startPtr != NULL)
+ 		(*startPtr) (AH, te);
+ 
+ 	/*
+ 	 * The user-provided DataDumper routine needs to call
+ 	 * AH->WriteData
+ 	 */
+ 	(*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
+ 
+ 	if (endPtr != NULL)
+ 		(*endPtr) (AH, te);
+ 
+ 	AH->currToc = NULL;
  }
  
  void
*************** WriteToc(ArchiveHandle *AH)
*** 2086,2093 ****
  	char		workbuf[32];
  	int			i;
  
- 	/* printf("%d TOC Entries to save\n", AH->tocCount); */
- 
  	WriteInt(AH, AH->tocCount);
  
  	for (te = AH->toc->next; te != AH->toc; te = te->next)
--- 2092,2097 ----
*************** dumpTimestamp(ArchiveHandle *AH, const c
*** 3239,3274 ****
  		ahprintf(AH, "-- %s %s\n\n", msg, buf);
  }
  
- 
- /*
-  * Main engine for parallel restore.
-  *
-  * Work is done in three phases.
-  * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
-  * just as for a standard restore.	Second we process the remaining non-ACL
-  * steps in parallel worker children (threads on Windows, processes on Unix),
-  * each of which connects separately to the database.  Finally we process all
-  * the ACL entries in a single connection (that happens back in
-  * RestoreArchive).
-  */
  static void
! restore_toc_entries_parallel(ArchiveHandle *AH)
  {
  	RestoreOptions *ropt = AH->ropt;
- 	int			n_slots = ropt->number_of_jobs;
- 	ParallelSlot *slots;
- 	int			work_status;
- 	int			next_slot;
  	bool		skipped_some;
- 	TocEntry	pending_list;
- 	TocEntry	ready_list;
  	TocEntry   *next_work_item;
- 	thandle		ret_child;
- 	TocEntry   *te;
  
! 	ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
  
! 	slots = (ParallelSlot *) pg_calloc(sizeof(ParallelSlot), n_slots);
  
  	/* Adjust dependency information */
  	fix_dependencies(AH);
--- 3243,3264 ----
  		ahprintf(AH, "-- %s %s\n\n", msg, buf);
  }
  
  static void
! restore_toc_entries_prefork(ArchiveHandle *AH)
  {
  	RestoreOptions *ropt = AH->ropt;
  	bool		skipped_some;
  	TocEntry   *next_work_item;
  
! 	ahlog(AH, 2, "entering restore_toc_entries_prefork\n");
  
! 	/* we haven't got round to making this work for all archive formats */
! 	if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
! 		die_horribly(AH, modulename, "parallel restore is not supported with this archive file format\n");
! 
! 	/* doesn't work if the archive represents dependencies as OIDs, either */
! 	if (AH->version < K_VERS_1_8)
! 		die_horribly(AH, modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n");
  
  	/* Adjust dependency information */
  	fix_dependencies(AH);
*************** restore_toc_entries_parallel(ArchiveHand
*** 3336,3352 ****
  		free(AH->currTablespace);
  	AH->currTablespace = NULL;
  	AH->currWithOids = -1;
  
  	/*
! 	 * Initialize the lists of pending and ready items.  After this setup, the
! 	 * pending list is everything that needs to be done but is blocked by one
! 	 * or more dependencies, while the ready list contains items that have no
! 	 * remaining dependencies.	Note: we don't yet filter out entries that
! 	 * aren't going to be restored.  They might participate in dependency
! 	 * chains connecting entries that should be restored, so we treat them as
! 	 * live until we actually process them.
  	 */
- 	par_list_header_init(&pending_list);
  	par_list_header_init(&ready_list);
  	skipped_some = false;
  	for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
--- 3326,3367 ----
  		free(AH->currTablespace);
  	AH->currTablespace = NULL;
  	AH->currWithOids = -1;
+ }
+ 
+ /*
+  * Main engine for parallel restore.
+  *
+  * Work is done in three phases.
+  * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
+  * just as for a standard restore. This is done in restore_toc_entries_prefork().
+  * Second we process the remaining non-ACL steps in parallel worker children
+  * (threads on Windows, processes on Unix), these fork off and set up their
+  * connections before we call restore_toc_entries_parallel().
+  * Finally we process all the ACL entries in a single connection (that happens
+  * back in RestoreArchive).
+  */
+ static void
+ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
+ 							 TocEntry *pending_list)
+ {
+ 	int			work_status;
+ 	bool		skipped_some;
+ 	TocEntry	ready_list;
+ 	TocEntry   *next_work_item;
+ 	int			ret_child;
+ 
+ 	ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
  
  	/*
! 	 * Initialize the lists of ready items, the list for pending items has
! 	 * already been initialized in the parent.  After this setup, the pending
! 	 * list is everything that needs to be done but is blocked by one or more
! 	 * dependencies, while the ready list contains items that have no remaining
! 	 * dependencies. Note: we don't yet filter out entries that aren't going
! 	 * to be restored. They might participate in dependency chains connecting
! 	 * entries that should be restored, so we treat them as live until we
! 	 * actually process them.
  	 */
  	par_list_header_init(&ready_list);
  	skipped_some = false;
  	for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
*************** restore_toc_entries_parallel(ArchiveHand
*** 3371,3377 ****
  		}
  
  		if (next_work_item->depCount > 0)
! 			par_list_append(&pending_list, next_work_item);
  		else
  			par_list_append(&ready_list, next_work_item);
  	}
--- 3386,3392 ----
  		}
  
  		if (next_work_item->depCount > 0)
! 			par_list_append(pending_list, next_work_item);
  		else
  			par_list_append(&ready_list, next_work_item);
  	}
*************** restore_toc_entries_parallel(ArchiveHand
*** 3385,3393 ****
  
  	ahlog(AH, 1, "entering main parallel loop\n");
  
! 	while ((next_work_item = get_next_work_item(AH, &ready_list,
! 												slots, n_slots)) != NULL ||
! 		   work_in_progress(slots, n_slots))
  	{
  		if (next_work_item != NULL)
  		{
--- 3400,3407 ----
  
  	ahlog(AH, 1, "entering main parallel loop\n");
  
! 	while ((next_work_item = get_next_work_item(AH, &ready_list, pstate)) != NULL ||
! 		   !IsEveryWorkerIdle(pstate))
  	{
  		if (next_work_item != NULL)
  		{
*************** restore_toc_entries_parallel(ArchiveHand
*** 3407,3461 ****
  				continue;
  			}
  
! 			if ((next_slot = get_next_slot(slots, n_slots)) != NO_SLOT)
! 			{
! 				/* There is work still to do and a worker slot available */
! 				thandle		child;
! 				RestoreArgs *args;
! 
! 				ahlog(AH, 1, "launching item %d %s %s\n",
! 					  next_work_item->dumpId,
! 					  next_work_item->desc, next_work_item->tag);
  
! 				par_list_remove(next_work_item);
  
! 				/* this memory is dealloced in mark_work_done() */
! 				args = pg_malloc(sizeof(RestoreArgs));
! 				args->AH = CloneArchive(AH);
! 				args->te = next_work_item;
  
! 				/* run the step in a worker child */
! 				child = spawn_restore(args);
  
! 				slots[next_slot].child_id = child;
! 				slots[next_slot].args = args;
  
! 				continue;
  			}
- 		}
  
! 		/*
! 		 * If we get here there must be work being done.  Either there is no
! 		 * work available to schedule (and work_in_progress returned true) or
! 		 * there are no slots available.  So we wait for a worker to finish,
! 		 * and process the result.
! 		 */
! 		ret_child = reap_child(slots, n_slots, &work_status);
  
! 		if (WIFEXITED(work_status))
! 		{
! 			mark_work_done(AH, &ready_list,
! 						   ret_child, WEXITSTATUS(work_status),
! 						   slots, n_slots);
! 		}
! 		else
! 		{
! 			die_horribly(AH, modulename, "worker process crashed: status %d\n",
! 						 work_status);
  		}
  	}
  
  	ahlog(AH, 1, "finished main parallel loop\n");
  
  	/*
  	 * Now reconnect the single parent connection.
--- 3421,3493 ----
  				continue;
  			}
  
! 			ahlog(AH, 1, "launching item %d %s %s\n",
! 				  next_work_item->dumpId,
! 				  next_work_item->desc, next_work_item->tag);
  
! 			par_list_remove(next_work_item);
  
! 			Assert(GetIdleWorker(pstate) != NO_SLOT);
! 			DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE);
! 		}
! 		else
! 		{
! 			/* at least one child is working and we have nothing ready. */
! 			Assert(!IsEveryWorkerIdle(pstate));
! 		}
  
! 		for (;;)
! 		{
! 			int nTerm = 0;
  
! 			/*
! 			 * In order to reduce dependencies as soon as possible and
! 			 * especially to reap the status of workers who are working on
! 			 * items that pending items depend on, we do a non-blocking check
! 			 * for ended workers first.
! 			 *
! 			 * However, if we do not have any other work items currently that
! 			 * workers can work on, we do not busy-loop here but instead
! 			 * really wait for at least one worker to terminate. Hence we call
! 			 * ListenToWorkers(..., ..., do_wait = true) in this case.
! 			 */
! 			ListenToWorkers(AH, pstate, !next_work_item);
  
! 			while ((ret_child = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
! 			{
! 				nTerm++;
! 				mark_work_done(AH, &ready_list, ret_child, work_status, pstate);
  			}
  
! 			/*
! 			 * We need to make sure that we have an idle worker before re-running the
! 			 * loop. If nTerm > 0 we already have that (quick check).
! 			 */
! 			if (nTerm > 0)
! 				break;
  
! 			/* if nobody terminated, explicitly check for an idle worker */
! 			if (GetIdleWorker(pstate) != NO_SLOT)
! 				break;
! 
! 			/*
! 			 * If we have no idle worker, read the result of one or more
! 			 * workers and loop again to call ReapWorkerStatus() on them.
! 			 */
! 			ListenToWorkers(AH, pstate, true);
  		}
  	}
  
  	ahlog(AH, 1, "finished main parallel loop\n");
+ }
+ 
+ static void
+ restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
+ {
+ 	RestoreOptions *ropt = AH->ropt;
+ 	TocEntry   *te;
+ 
+ 	ahlog(AH, 2, "entering restore_toc_entries_postfork\n");
  
  	/*
  	 * Now reconnect the single parent connection.
*************** restore_toc_entries_parallel(ArchiveHand
*** 3471,3477 ****
  	 * dependencies, or some other pathological condition. If so, do it in the
  	 * single parent connection.
  	 */
! 	for (te = pending_list.par_next; te != &pending_list; te = te->par_next)
  	{
  		ahlog(AH, 1, "processing missed item %d %s %s\n",
  			  te->dumpId, te->desc, te->tag);
--- 3503,3509 ----
  	 * dependencies, or some other pathological condition. If so, do it in the
  	 * single parent connection.
  	 */
! 	for (te = pending_list->par_next; te != pending_list; te = te->par_next)
  	{
  		ahlog(AH, 1, "processing missed item %d %s %s\n",
  			  te->dumpId, te->desc, te->tag);
*************** restore_toc_entries_parallel(ArchiveHand
*** 3482,3602 ****
  }
  
  /*
-  * create a worker child to perform a restore step in parallel
-  */
- static thandle
- spawn_restore(RestoreArgs *args)
- {
- 	thandle		child;
- 
- 	/* Ensure stdio state is quiesced before forking */
- 	fflush(NULL);
- 
- #ifndef WIN32
- 	child = fork();
- 	if (child == 0)
- 	{
- 		/* in child process */
- 		parallel_restore(args);
- 		die_horribly(args->AH, modulename,
- 					 "parallel_restore should not return\n");
- 	}
- 	else if (child < 0)
- 	{
- 		/* fork failed */
- 		die_horribly(args->AH, modulename,
- 					 "could not create worker process: %s\n",
- 					 strerror(errno));
- 	}
- #else
- 	child = (HANDLE) _beginthreadex(NULL, 0, (void *) parallel_restore,
- 									args, 0, NULL);
- 	if (child == 0)
- 		die_horribly(args->AH, modulename,
- 					 "could not create worker thread: %s\n",
- 					 strerror(errno));
- #endif
- 
- 	return child;
- }
- 
- /*
-  *	collect status from a completed worker child
-  */
- static thandle
- reap_child(ParallelSlot *slots, int n_slots, int *work_status)
- {
- #ifndef WIN32
- 	/* Unix is so much easier ... */
- 	return wait(work_status);
- #else
- 	static HANDLE *handles = NULL;
- 	int			hindex,
- 				snum,
- 				tnum;
- 	thandle		ret_child;
- 	DWORD		res;
- 
- 	/* first time around only, make space for handles to listen on */
- 	if (handles == NULL)
- 		handles = (HANDLE *) pg_calloc(sizeof(HANDLE), n_slots);
- 
- 	/* set up list of handles to listen to */
- 	for (snum = 0, tnum = 0; snum < n_slots; snum++)
- 		if (slots[snum].child_id != 0)
- 			handles[tnum++] = slots[snum].child_id;
- 
- 	/* wait for one to finish */
- 	hindex = WaitForMultipleObjects(tnum, handles, false, INFINITE);
- 
- 	/* get handle of finished thread */
- 	ret_child = handles[hindex - WAIT_OBJECT_0];
- 
- 	/* get the result */
- 	GetExitCodeThread(ret_child, &res);
- 	*work_status = res;
- 
- 	/* dispose of handle to stop leaks */
- 	CloseHandle(ret_child);
- 
- 	return ret_child;
- #endif
- }
- 
- /*
-  * are we doing anything now?
-  */
- static bool
- work_in_progress(ParallelSlot *slots, int n_slots)
- {
- 	int			i;
- 
- 	for (i = 0; i < n_slots; i++)
- 	{
- 		if (slots[i].child_id != 0)
- 			return true;
- 	}
- 	return false;
- }
- 
- /*
-  * find the first free parallel slot (if any).
-  */
- static int
- get_next_slot(ParallelSlot *slots, int n_slots)
- {
- 	int			i;
- 
- 	for (i = 0; i < n_slots; i++)
- 	{
- 		if (slots[i].child_id == 0)
- 			return i;
- 	}
- 	return NO_SLOT;
- }
- 
- 
- /*
   * Check if te1 has an exclusive lock requirement for an item that te2 also
   * requires, whether or not te2's requirement is for an exclusive lock.
   */
--- 3514,3519 ----
*************** par_list_remove(TocEntry *te)
*** 3669,3675 ****
   */
  static TocEntry *
  get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
! 				   ParallelSlot *slots, int n_slots)
  {
  	bool		pref_non_data = false;	/* or get from AH->ropt */
  	TocEntry   *data_te = NULL;
--- 3586,3592 ----
   */
  static TocEntry *
  get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
! 				   ParallelState *pstate)
  {
  	bool		pref_non_data = false;	/* or get from AH->ropt */
  	TocEntry   *data_te = NULL;
*************** get_next_work_item(ArchiveHandle *AH, To
*** 3684,3694 ****
  	{
  		int			count = 0;
  
! 		for (k = 0; k < n_slots; k++)
! 			if (slots[k].args->te != NULL &&
! 				slots[k].args->te->section == SECTION_DATA)
  				count++;
! 		if (n_slots == 0 || count * 4 < n_slots)
  			pref_non_data = false;
  	}
  
--- 3601,3611 ----
  	{
  		int			count = 0;
  
! 		for (k = 0; k < pstate->numWorkers; k++)
! 			if (pstate->parallelSlot[k].args->te != NULL &&
! 				pstate->parallelSlot[k].args->te->section == SECTION_DATA)
  				count++;
! 		if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
  			pref_non_data = false;
  	}
  
*************** get_next_work_item(ArchiveHandle *AH, To
*** 3704,3716 ****
  		 * that a currently running item also needs lock on, or vice versa. If
  		 * so, we don't want to schedule them together.
  		 */
! 		for (i = 0; i < n_slots && !conflicts; i++)
  		{
  			TocEntry   *running_te;
  
! 			if (slots[i].args == NULL)
  				continue;
! 			running_te = slots[i].args->te;
  
  			if (has_lock_conflicts(te, running_te) ||
  				has_lock_conflicts(running_te, te))
--- 3621,3633 ----
  		 * that a currently running item also needs lock on, or vice versa. If
  		 * so, we don't want to schedule them together.
  		 */
! 		for (i = 0; i < pstate->numWorkers && !conflicts; i++)
  		{
  			TocEntry   *running_te;
  
! 			if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING)
  				continue;
! 			running_te = pstate->parallelSlot[i].args->te;
  
  			if (has_lock_conflicts(te, running_te) ||
  				has_lock_conflicts(running_te, te))
*************** get_next_work_item(ArchiveHandle *AH, To
*** 3745,3805 ****
  /*
   * Restore a single TOC item in parallel with others
   *
!  * this is the procedure run as a thread (Windows) or a
!  * separate process (everything else).
   */
! static parallel_restore_result
! parallel_restore(RestoreArgs *args)
  {
  	ArchiveHandle *AH = args->AH;
  	TocEntry   *te = args->te;
  	RestoreOptions *ropt = AH->ropt;
! 	int			retval;
! 
! 	/*
! 	 * Close and reopen the input file so we have a private file pointer that
! 	 * doesn't stomp on anyone else's file pointer, if we're actually going to
! 	 * need to read from the file. Otherwise, just close it except on Windows,
! 	 * where it will possibly be needed by other threads.
! 	 *
! 	 * Note: on Windows, since we are using threads not processes, the reopen
! 	 * call *doesn't* close the original file pointer but just open a new one.
! 	 */
! 	if (te->section == SECTION_DATA)
! 		(AH->ReopenPtr) (AH);
! #ifndef WIN32
! 	else
! 		(AH->ClosePtr) (AH);
! #endif
! 
! 	/*
! 	 * We need our own database connection, too
! 	 */
! 	ConnectDatabase((Archive *) AH, ropt->dbname,
! 					ropt->pghost, ropt->pgport, ropt->username,
! 					ropt->promptPassword);
  
  	_doSetFixedOutputState(AH);
  
! 	/* Restore the TOC item */
! 	retval = restore_toc_entry(AH, te, ropt, true);
! 
! 	/* And clean up */
! 	PQfinish(AH->connection);
! 	AH->connection = NULL;
  
! 	/* If we reopened the file, we are done with it, so close it now */
! 	if (te->section == SECTION_DATA)
! 		(AH->ClosePtr) (AH);
  
! 	if (retval == 0 && AH->public.n_errors)
! 		retval = WORKER_IGNORED_ERRORS;
  
! #ifndef WIN32
! 	exit(retval);
! #else
! 	return retval;
! #endif
  }
  
  
--- 3662,3690 ----
  /*
   * Restore a single TOC item in parallel with others
   *
!  * this is run in the worker, i.e. in a thread (Windows) or a separate process
!  * (everything else). A worker process executes several such work items during
!  * a parallel backup or restore. Once we terminate here and report back that
!  * our work is finished, the master process will assign us a new work item.
   */
! int
! parallel_restore(ParallelArgs *args)
  {
  	ArchiveHandle *AH = args->AH;
  	TocEntry   *te = args->te;
  	RestoreOptions *ropt = AH->ropt;
! 	int			status;
  
  	_doSetFixedOutputState(AH);
  
! 	Assert(AH->connection != NULL);
  
! 	AH->public.n_errors = 0;
  
! 	/* Restore the TOC item */
! 	status = restore_toc_entry(AH, te, ropt, true);
  
! 	return status;
  }
  
  
*************** parallel_restore(RestoreArgs *args)
*** 3811,3835 ****
   */
  static void
  mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
! 			   thandle worker, int status,
! 			   ParallelSlot *slots, int n_slots)
  {
  	TocEntry   *te = NULL;
- 	int			i;
- 
- 	for (i = 0; i < n_slots; i++)
- 	{
- 		if (slots[i].child_id == worker)
- 		{
- 			slots[i].child_id = 0;
- 			te = slots[i].args->te;
- 			DeCloneArchive(slots[i].args->AH);
- 			free(slots[i].args);
- 			slots[i].args = NULL;
  
! 			break;
! 		}
! 	}
  
  	if (te == NULL)
  		die_horribly(AH, modulename, "could not find slot of finished worker\n");
--- 3696,3707 ----
   */
  static void
  mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
! 			   int worker, int status,
! 			   ParallelState *pstate)
  {
  	TocEntry   *te = NULL;
  
! 	te = pstate->parallelSlot[worker].args->te;
  
  	if (te == NULL)
  		die_horribly(AH, modulename, "could not find slot of finished worker\n");
*************** inhibit_data_for_failed_table(ArchiveHan
*** 4184,4193 ****
   *
   * Enough of the structure is cloned to ensure that there is no
   * conflict between different threads each with their own clone.
-  *
-  * These could be public, but no need at present.
   */
! static ArchiveHandle *
  CloneArchive(ArchiveHandle *AH)
  {
  	ArchiveHandle *clone;
--- 4056,4063 ----
   *
   * Enough of the structure is cloned to ensure that there is no
   * conflict between different threads each with their own clone.
   */
! ArchiveHandle *
  CloneArchive(ArchiveHandle *AH)
  {
  	ArchiveHandle *clone;
*************** CloneArchive(ArchiveHandle *AH)
*** 4213,4221 ****
--- 4083,4145 ----
  	/* clone has its own error count, too */
  	clone->public.n_errors = 0;
  
+ 	/*
+ 	 * Remember that we're a clone; this is used for deciding whether we
+ 	 * should install a snapshot.
+ 	 */
+ 	clone->is_clone = true;
+ 
+ 	/*
+ 	 * Connect our new clone object to the database:
+ 	 * In parallel restore the parent is already disconnected.
+ 	 * In parallel backup we open a new connection with the parent's parameters.
+ 	 */
+ 	if (AH->ropt)
+ 	{
+ 		RestoreOptions *ropt = AH->ropt;
+ 		Assert(AH->connection == NULL);
+ 		/* this also sets clone->connection */
+ 		ConnectDatabase((Archive *) clone, ropt->dbname,
+ 					ropt->pghost, ropt->pgport, ropt->username,
+ 					ropt->promptPassword);
+ 	}
+ 	else
+ 	{
+ 		char	   *dbname;
+ 		char	   *pghost;
+ 		char	   *pgport;
+ 		char	   *username;
+ 		const char *encname;
+ 
+ 		Assert(AH->connection != NULL);
+ 
+ 		/*
+ 		 * Even though we are technically accessing the parent's database object
+ 		 * here, these functions are fine to call because they all just
+ 		 * return a pointer and do not actually send/receive any data to/from the
+ 		 * database.
+ 		 */
+ 		dbname = PQdb(AH->connection);
+ 		pghost = PQhost(AH->connection);
+ 		pgport = PQport(AH->connection);
+ 		username = PQuser(AH->connection);
+ 		encname = pg_encoding_to_char(AH->public.encoding);
+ 
+ 		/* this also sets clone->connection */
+ 		ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username, TRI_NO);
+ 
+ 		/*
+ 		 * Set the same encoding; whatever we set here is what we got from
+ 		 * pg_encoding_to_char(), so we really shouldn't run into an error setting that
+ 		 * very same value. Also see the comment in SetupConnection().
+ 		 */
+ 		PQsetClientEncoding(clone->connection, encname);
+ 	}
+ 
  	/* Let the format-specific code have a chance too */
  	(clone->ClonePtr) (clone);
  
+ 	Assert(clone->connection != NULL);
  	return clone;
  }
  
*************** CloneArchive(ArchiveHandle *AH)
*** 4224,4230 ****
   *
   * Note: we assume any clone-local connection was already closed.
   */
! static void
  DeCloneArchive(ArchiveHandle *AH)
  {
  	/* Clear format-specific state */
--- 4148,4154 ----
   *
   * Note: we assume any clone-local connection was already closed.
   */
! void
  DeCloneArchive(ArchiveHandle *AH)
  {
  	/* Clear format-specific state */
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 6dd5158..3b10384 100644
*** a/src/bin/pg_dump/pg_backup_archiver.h
--- b/src/bin/pg_dump/pg_backup_archiver.h
*************** typedef z_stream *z_streamp;
*** 100,108 ****
--- 100,120 ----
  #define K_OFFSET_POS_SET 2
  #define K_OFFSET_NO_DATA 3
  
+ /*
+  * Special exit values from worker children.  We reserve 0 for normal
+  * success; 1 and other small values should be interpreted as crashes.
+  */
+ #define WORKER_OK                     0
+ #define WORKER_CREATE_DONE            10
+ #define WORKER_INHIBIT_DATA           11
+ #define WORKER_IGNORED_ERRORS         12
+ 
  struct _archiveHandle;
  struct _tocEntry;
  struct _restoreList;
+ struct _parallel_args;
+ struct _parallel_state;
+ enum _action;
  
  typedef void (*ClosePtr) (struct _archiveHandle * AH);
  typedef void (*ReopenPtr) (struct _archiveHandle * AH);
*************** typedef void (*PrintTocDataPtr) (struct
*** 130,135 ****
--- 142,154 ----
  typedef void (*ClonePtr) (struct _archiveHandle * AH);
  typedef void (*DeClonePtr) (struct _archiveHandle * AH);
  
+ typedef char *(*WorkerJobRestorePtr)(struct _archiveHandle * AH, struct _tocEntry * te);
+ typedef char *(*WorkerJobDumpPtr)(struct _archiveHandle * AH, struct _tocEntry * te);
+ typedef char *(*MasterStartParallelItemPtr)(struct _archiveHandle * AH, struct _tocEntry * te,
+ 											enum _action act);
+ typedef int (*MasterEndParallelItemPtr)(struct _archiveHandle * AH, struct _tocEntry * te,
+ 										const char *str, enum _action act);
+ 
  typedef size_t (*CustomOutPtr) (struct _archiveHandle * AH, const void *buf, size_t len);
  
  typedef enum
*************** typedef struct _archiveHandle
*** 188,193 ****
--- 207,213 ----
  								 * Added V1.7 */
  	ArchiveFormat format;		/* Archive format */
  
+ 	bool		is_clone;		/* have we been cloned ? */
  	sqlparseInfo sqlparse;		/* state for parsing INSERT data */
  
  	time_t		createDate;		/* Date archive created */
*************** typedef struct _archiveHandle
*** 228,233 ****
--- 248,259 ----
  	StartBlobPtr StartBlobPtr;
  	EndBlobPtr EndBlobPtr;
  
+ 	MasterStartParallelItemPtr MasterStartParallelItemPtr;
+ 	MasterEndParallelItemPtr MasterEndParallelItemPtr;
+ 
+ 	WorkerJobDumpPtr WorkerJobDumpPtr;
+ 	WorkerJobRestorePtr WorkerJobRestorePtr;
+ 
  	ClonePtr ClonePtr;			/* Clone format-specific fields */
  	DeClonePtr DeClonePtr;		/* Clean up cloned fields */
  
*************** typedef struct _archiveHandle
*** 237,242 ****
--- 263,271 ----
  	char	   *archdbname;		/* DB name *read* from archive */
  	enum trivalue promptPassword;
  	char	   *savedPassword;	/* password for ropt->username, if known */
+ 	char	   *use_role;
+ 	char	   *sync_snapshot_id;	/* sync snapshot id for parallel
+ 									   operation */
  	PGconn	   *connection;
  	int			connectToDB;	/* Flag to indicate if direct DB connection is
  								 * required */
*************** typedef struct _tocEntry
*** 324,329 ****
--- 353,359 ----
  	int			nLockDeps;		/* number of such dependencies */
  } TocEntry;
  
+ extern int parallel_restore(struct _parallel_args *args);
  
  extern void die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...) __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4)));
  extern void warn_or_die_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...) __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4)));
*************** extern void WriteHead(ArchiveHandle *AH)
*** 334,342 ****
  extern void ReadHead(ArchiveHandle *AH);
  extern void WriteToc(ArchiveHandle *AH);
  extern void ReadToc(ArchiveHandle *AH);
! extern void WriteDataChunks(ArchiveHandle *AH);
  
  extern teReqs TocIDRequired(ArchiveHandle *AH, DumpId id, RestoreOptions *ropt);
  extern bool checkSeek(FILE *fp);
  
  #define appendStringLiteralAHX(buf,str,AH) \
--- 364,377 ----
  extern void ReadHead(ArchiveHandle *AH);
  extern void WriteToc(ArchiveHandle *AH);
  extern void ReadToc(ArchiveHandle *AH);
! extern void WriteDataChunks(ArchiveHandle *AH, struct _parallel_state *pstate);
! extern void WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te);
! 
! extern ArchiveHandle *CloneArchive(ArchiveHandle *AH);
! extern void DeCloneArchive(ArchiveHandle *AH);
  
  extern teReqs TocIDRequired(ArchiveHandle *AH, DumpId id, RestoreOptions *ropt);
+ TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
  extern bool checkSeek(FILE *fp);
  
  #define appendStringLiteralAHX(buf,str,AH) \
*************** int			ahprintf(ArchiveHandle *AH, const
*** 378,381 ****
--- 413,428 ----
  
  void		ahlog(ArchiveHandle *AH, int level, const char *fmt,...) __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4)));
  
+ #ifdef USE_ASSERT_CHECKING
+ #define Assert(condition) \
+ 	if (!(condition)) \
+ 	{ \
+ 		write_msg(NULL, "Failed assertion in %s, line %d\n", \
+ 				  __FILE__, __LINE__); \
+ 		abort();\
+ 	}
+ #else
+ #define Assert(condition)
+ #endif
+ 
  #endif
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index 31fa373..f3070f7 100644
*** a/src/bin/pg_dump/pg_backup_custom.c
--- b/src/bin/pg_dump/pg_backup_custom.c
***************
*** 27,32 ****
--- 27,33 ----
  #include "compress_io.h"
  #include "dumputils.h"
  #include "dumpmem.h"
+ #include "parallel.h"
  
  /*--------
   * Routines in the format interface
*************** static void _LoadBlobs(ArchiveHandle *AH
*** 60,65 ****
--- 61,70 ----
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
+ static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
+ static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act);
+ char *_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
+ 
  typedef struct
  {
  	CompressorState *cs;
*************** static size_t _CustomReadFunc(ArchiveHan
*** 87,94 ****
  
  static const char *modulename = gettext_noop("custom archiver");
  
- 
- 
  /*
   *	Init routine required by ALL formats. This is a global routine
   *	and should be declared in pg_backup_archiver.h
--- 92,97 ----
*************** InitArchiveFmt_Custom(ArchiveHandle *AH)
*** 127,132 ****
--- 130,142 ----
  	AH->ClonePtr = _Clone;
  	AH->DeClonePtr = _DeClone;
  
+ 	AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
+ 	AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
+ 
+ 	/* no parallel dump in the custom archive, only parallel restore */
+ 	AH->WorkerJobDumpPtr = NULL;
+ 	AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
+ 
  	/* Set up a private area. */
  	ctx = (lclContext *) pg_calloc(1, sizeof(lclContext));
  	AH->formatData = (void *) ctx;
*************** _CloseArchive(ArchiveHandle *AH)
*** 698,704 ****
  		tpos = ftello(AH->FH);
  		WriteToc(AH);
  		ctx->dataStart = _getFilePos(AH, ctx);
! 		WriteDataChunks(AH);
  
  		/*
  		 * If possible, re-write the TOC in order to update the data offset
--- 708,714 ----
  		tpos = ftello(AH->FH);
  		WriteToc(AH);
  		ctx->dataStart = _getFilePos(AH, ctx);
! 		WriteDataChunks(AH, NULL);
  
  		/*
  		 * If possible, re-write the TOC in order to update the data offset
*************** _DeClone(ArchiveHandle *AH)
*** 796,801 ****
--- 806,886 ----
  	free(ctx);
  }
  
+ /*
+  * This function is executed in the child of a parallel restore from a
+  * custom format archive and restores the actual data for one TOC entry.
+  */
+ char *
+ _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
+ {
+ 	/* Short fixed-size string plus some ID; this needs to be malloc'ed
+ 	 * instead of static because we work with threads on Windows */
+ 	const int	buflen = 64;
+ 	char	   *buf = (char*) pg_malloc(buflen);
+ 	ParallelArgs pargs;
+ 	int			status;
+ 	lclTocEntry *tctx;
+ 
+ 	tctx = (lclTocEntry *) te->formatData;
+ 
+ 	pargs.AH = AH;
+ 	pargs.te = te;
+ 
+ 	status = parallel_restore(&pargs);
+ 
+ 	snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
+ 			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
+ 
+ 	return buf;
+ }
+ 
+ /*
+  * This function is executed in the master process. The custom archive
+  * supports parallel restore only, so this creates a RESTORE command
+  * string for the given TOC entry; it is parsed on the worker side by
+  * the custom format's restore worker.
+  */
+ static char *
+ _MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
+ {
+ 	/*
+ 	 * A static char is okay here, even on Windows because we call this
+ 	 * function only from one process (the master).
+ 	 */
+ 	static char			buf[64]; /* short fixed-size string + number */
+ 
+ 	/* no parallel dump in the custom archive format */
+ 	Assert(act == ACT_RESTORE);
+ 
+ 	snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
+ 
+ 	return buf;
+ }
+ 
+ /*
+  * This function is executed in the master process. It parses the response
+  * of the custom format's restore worker, accumulates any ignored errors
+  * into the archive handle, and returns the worker's restore status.
+  */
+ static int
+ _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
+ {
+ 	DumpId		dumpId;
+ 	int			nBytes, status, n_errors;
+ 
+ 	/* no parallel dump in the custom archive */
+ 	Assert(act == ACT_RESTORE);
+ 
+ 	/* use %d to match the "%d %d %d" the worker wrote with snprintf */
+ 	sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
+ 
+ 	Assert(nBytes == strlen(str));
+ 	Assert(dumpId == te->dumpId);
+ 
+ 	AH->public.n_errors += n_errors;
+ 
+ 	return status;
+ }
+ 
  /*--------------------------------------------------
   * END OF FORMAT CALLBACKS
   *--------------------------------------------------
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 9c6d7c1..995cf31 100644
*** a/src/bin/pg_dump/pg_backup_directory.c
--- b/src/bin/pg_dump/pg_backup_directory.c
***************
*** 35,40 ****
--- 35,42 ----
  
  #include "compress_io.h"
  #include "dumpmem.h"
+ #include "dumputils.h"
+ #include "parallel.h"
  
  #include <dirent.h>
  #include <sys/stat.h>
*************** typedef struct
*** 50,55 ****
--- 52,58 ----
  	cfp		   *dataFH;			/* currently open data file */
  
  	cfp		   *blobsTocFH;		/* file handle for blobs.toc */
+ 	ParallelState *pstate;		/* for parallel backup / restore */
  } lclContext;
  
  typedef struct
*************** static int	_ReadByte(ArchiveHandle *);
*** 69,74 ****
--- 72,78 ----
  static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
  static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
  static void _CloseArchive(ArchiveHandle *AH);
+ static void _ReopenArchive(ArchiveHandle *AH);
  static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
  
  static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
*************** static void _StartBlob(ArchiveHandle *AH
*** 80,90 ****
  static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
  static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
  static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt);
  
! static char *prependDirectory(ArchiveHandle *AH, const char *relativeFilename);
  
  static void createDirectory(const char *dir);
! 
  
  /*
   *	Init routine required by ALL formats. This is a global routine
--- 84,101 ----
  static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
  static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
  static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt);
+ static void _Clone(ArchiveHandle *AH);
+ static void _DeClone(ArchiveHandle *AH);
  
! static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
! static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act);
! static char *_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te);
! static char *_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te);
  
  static void createDirectory(const char *dir);
! static char *prependDirectory(ArchiveHandle *AH, char *buf, const char *relativeFilename);
! static char *_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te);
! static char *_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te);
  
  /*
   *	Init routine required by ALL formats. This is a global routine
*************** InitArchiveFmt_Directory(ArchiveHandle *
*** 111,117 ****
  	AH->WriteBufPtr = _WriteBuf;
  	AH->ReadBufPtr = _ReadBuf;
  	AH->ClosePtr = _CloseArchive;
! 	AH->ReopenPtr = NULL;
  	AH->PrintTocDataPtr = _PrintTocData;
  	AH->ReadExtraTocPtr = _ReadExtraToc;
  	AH->WriteExtraTocPtr = _WriteExtraToc;
--- 122,128 ----
  	AH->WriteBufPtr = _WriteBuf;
  	AH->ReadBufPtr = _ReadBuf;
  	AH->ClosePtr = _CloseArchive;
! 	AH->ReopenPtr = _ReopenArchive;
  	AH->PrintTocDataPtr = _PrintTocData;
  	AH->ReadExtraTocPtr = _ReadExtraToc;
  	AH->WriteExtraTocPtr = _WriteExtraToc;
*************** InitArchiveFmt_Directory(ArchiveHandle *
*** 122,129 ****
  	AH->EndBlobPtr = _EndBlob;
  	AH->EndBlobsPtr = _EndBlobs;
  
! 	AH->ClonePtr = NULL;
! 	AH->DeClonePtr = NULL;
  
  	/* Set up our private context */
  	ctx = (lclContext *) pg_calloc(1, sizeof(lclContext));
--- 133,146 ----
  	AH->EndBlobPtr = _EndBlob;
  	AH->EndBlobsPtr = _EndBlobs;
  
! 	AH->ClonePtr = _Clone;
! 	AH->DeClonePtr = _DeClone;
! 
! 	AH->WorkerJobRestorePtr = _WorkerJobRestoreDirectory;
! 	AH->WorkerJobDumpPtr = _WorkerJobDumpDirectory;
! 
! 	AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
! 	AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
  
  	/* Set up our private context */
  	ctx = (lclContext *) pg_calloc(1, sizeof(lclContext));
*************** InitArchiveFmt_Directory(ArchiveHandle *
*** 152,161 ****
  	}
  	else
  	{							/* Read Mode */
! 		char	   *fname;
  		cfp		   *tocFH;
  
! 		fname = prependDirectory(AH, "toc.dat");
  
  		tocFH = cfopen_read(fname, PG_BINARY_R);
  		if (tocFH == NULL)
--- 169,178 ----
  	}
  	else
  	{							/* Read Mode */
! 		char	   fname[MAXPGPATH];
  		cfp		   *tocFH;
  
! 		prependDirectory(AH, fname, "toc.dat");
  
  		tocFH = cfopen_read(fname, PG_BINARY_R);
  		if (tocFH == NULL)
*************** _StartData(ArchiveHandle *AH, TocEntry *
*** 281,289 ****
  {
  	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
  	lclContext *ctx = (lclContext *) AH->formatData;
! 	char	   *fname;
  
! 	fname = prependDirectory(AH, tctx->filename);
  
  	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
  	if (ctx->dataFH == NULL)
--- 298,306 ----
  {
  	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
  	lclContext *ctx = (lclContext *) AH->formatData;
! 	char		fname[MAXPGPATH];
  
! 	prependDirectory(AH, fname, tctx->filename);
  
  	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
  	if (ctx->dataFH == NULL)
*************** _PrintTocData(ArchiveHandle *AH, TocEntr
*** 372,379 ****
  		_LoadBlobs(AH, ropt);
  	else
  	{
! 		char	   *fname = prependDirectory(AH, tctx->filename);
  
  		_PrintFileData(AH, fname, ropt);
  	}
  }
--- 389,397 ----
  		_LoadBlobs(AH, ropt);
  	else
  	{
! 		char		fname[MAXPGPATH];
  
+ 		prependDirectory(AH, fname, tctx->filename);
  		_PrintFileData(AH, fname, ropt);
  	}
  }
*************** _LoadBlobs(ArchiveHandle *AH, RestoreOpt
*** 383,394 ****
  {
  	Oid			oid;
  	lclContext *ctx = (lclContext *) AH->formatData;
! 	char	   *fname;
  	char		line[MAXPGPATH];
  
  	StartRestoreBlobs(AH);
  
! 	fname = prependDirectory(AH, "blobs.toc");
  
  	ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R);
  
--- 401,412 ----
  {
  	Oid			oid;
  	lclContext *ctx = (lclContext *) AH->formatData;
! 	char		fname[MAXPGPATH];
  	char		line[MAXPGPATH];
  
  	StartRestoreBlobs(AH);
  
! 	prependDirectory(AH, fname, "blobs.toc");
  
  	ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R);
  
*************** _CloseArchive(ArchiveHandle *AH)
*** 515,521 ****
  	if (AH->mode == archModeWrite)
  	{
  		cfp		   *tocFH;
! 		char	   *fname = prependDirectory(AH, "toc.dat");
  
  		/* The TOC is always created uncompressed */
  		tocFH = cfopen_write(fname, PG_BINARY_W, 0);
--- 533,544 ----
  	if (AH->mode == archModeWrite)
  	{
  		cfp		   *tocFH;
! 		char		fname[MAXPGPATH];
! 
! 		prependDirectory(AH, fname, "toc.dat");
! 
! 		/* this will actually fork the processes for a parallel backup */
! 		ctx->pstate = ParallelBackupStart(AH, NULL);
  
  		/* The TOC is always created uncompressed */
  		tocFH = cfopen_write(fname, PG_BINARY_W, 0);
*************** _CloseArchive(ArchiveHandle *AH)
*** 536,546 ****
  		if (cfclose(tocFH) != 0)
  			die_horribly(AH, modulename, "could not close TOC file: %s\n",
  						 strerror(errno));
! 		WriteDataChunks(AH);
  	}
  	AH->FH = NULL;
  }
  
  
  /*
   * BLOB support
--- 559,582 ----
  		if (cfclose(tocFH) != 0)
  			die_horribly(AH, modulename, "could not close TOC file: %s\n",
  						 strerror(errno));
! 		WriteDataChunks(AH, ctx->pstate);
! 
! 		ParallelBackupEnd(AH, ctx->pstate);
  	}
  	AH->FH = NULL;
  }
  
+ /*
+  * Reopen the archive's file handle (no-op for the directory format).
+  */
+ static void
+ _ReopenArchive(ArchiveHandle *AH)
+ {
+ 	/*
+ 	 * Our TOC is in memory, and each worker opens its own data files, so
+ 	 * we support reopening the archive by simply doing nothing here.
+ 	 */
+ }
  
  /*
   * BLOB support
*************** static void
*** 557,565 ****
  _StartBlobs(ArchiveHandle *AH, TocEntry *te)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
! 	char	   *fname;
  
! 	fname = prependDirectory(AH, "blobs.toc");
  
  	/* The blob TOC file is never compressed */
  	ctx->blobsTocFH = cfopen_write(fname, "ab", 0);
--- 593,601 ----
  _StartBlobs(ArchiveHandle *AH, TocEntry *te)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
! 	char		fname[MAXPGPATH];
  
! 	prependDirectory(AH, fname, "blobs.toc");
  
  	/* The blob TOC file is never compressed */
  	ctx->blobsTocFH = cfopen_write(fname, "ab", 0);
*************** createDirectory(const char *dir)
*** 652,663 ****
  					 dir, strerror(errno));
  }
  
! 
  static char *
! prependDirectory(ArchiveHandle *AH, const char *relativeFilename)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
- 	static char buf[MAXPGPATH];
  	char	   *dname;
  
  	dname = ctx->directory;
--- 688,703 ----
  					 dir, strerror(errno));
  }
  
! /*
!  * Gets a relative file name and prepends the output directory, writing the
!  * result to buf. The caller needs to make sure that buf is MAXPGPATH bytes
!  * big. Can't use a static char[MAXPGPATH] inside the function because we run
!  * multithreaded on Windows.
!  */
  static char *
! prependDirectory(ArchiveHandle *AH, char* buf, const char *relativeFilename)
  {
  	lclContext *ctx = (lclContext *) AH->formatData;
  	char	   *dname;
  
  	dname = ctx->directory;
*************** prependDirectory(ArchiveHandle *AH, cons
*** 671,673 ****
--- 711,864 ----
  
  	return buf;
  }
+ 
+ /*
+  * Clone format-specific fields during parallel restoration.
+  */
+ static void
+ _Clone(ArchiveHandle *AH)
+ {
+ 	lclContext *ctx = (lclContext *) AH->formatData;
+ 
+ 	/* pg_malloc() exits on allocation failure, so no NULL check is needed */
+ 	AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
+ 	memcpy(AH->formatData, ctx, sizeof(lclContext));
+ 	ctx = (lclContext *) AH->formatData;
+ 
+ 	/*
+ 	 * Note: we do not make a local lo_buf because we expect at most one BLOBS
+ 	 * entry per archive, so no parallelism is possible.  Likewise,
+ 	 * TOC-entry-local state isn't an issue because any one TOC entry is
+ 	 * touched by just one worker child.
+ 	 */
+ 
+ 	/*
+ 	 * We also don't copy the ParallelState pointer (pstate), only the master
+ 	 * process ever writes to it.
+ 	 */
+ }
+ 
+ /*
+  * Release the format-private state that _Clone() allocated for a worker.
+  */
+ static void
+ _DeClone(ArchiveHandle *AH)
+ {
+ 	free(AH->formatData);
+ }
+ 
+ /*
+  * This function is executed in the master process. Depending on the
+  * desired action (dump or restore) it creates a command string that is
+  * understood by the _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory
+  * worker functions of the directory format.
+  */
+ static char *
+ _MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
+ {
+ 	/*
+ 	 * A static char is okay here, even on Windows because we call this
+ 	 * function only from one process (the master).
+ 	 */
+ 	static char	buf[64];
+ 
+ 	if (act == ACT_DUMP)
+ 		snprintf(buf, sizeof(buf), "DUMP %d", te->dumpId);
+ 	else if (act == ACT_RESTORE)
+ 		snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
+ 	else
+ 		buf[0] = '\0';			/* don't hand back a stale command string */
+ 
+ 	return buf;
+ }
+ 
+ /*
+  * This function is executed in the child of a parallel backup for the
+  * directory archive and dumps the actual data.
+  *
+  * We are currently returning only the DumpId so theoretically we could
+  * make this function returning an int (or a DumpId). However, to
+  * facilitate further enhancements and because sooner or later we need to
+  * convert this to a string and send it via a message anyway, we stick with
+  * char *. It is parsed on the other side by the _EndMasterParallel()
+  * function of the respective dump format.
+  */
+ static char *
+ _WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te)
+ {
+ 	/* short fixed-size string + some ID so far, this needs to be malloc'ed
+ 	 * instead of static because we work with threads on windows */
+ 	const int	buflen = 64;
+ 	char	   *buf = (char*) pg_malloc(buflen);
+ 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
+ 
+ 	/* This should never happen */
+ 	if (!tctx)
+ 		die_horribly(AH, modulename, "Error during backup\n");
+ 
+ 	/*
+ 	 * WriteDataChunksForTocEntry() returns void: it either succeeds or dies
+ 	 * horribly; the master detects failure when the worker exits unexpectedly.
+ 	 */
+ 	WriteDataChunksForTocEntry(AH, te);
+ 
+ 	snprintf(buf, buflen, "OK DUMP %d", te->dumpId);
+ 
+ 	return buf;
+ }
+ 
+ /*
+  * This function is executed in the child of a parallel restore from a
+  * directory format archive and restores the actual data for one TOC entry.
+  *
+  * Returns a malloc'ed status string for the master process.
+  */
+ static char *
+ _WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te)
+ {
+ 	/* short fixed-size string + some ID so far, this needs to be malloc'ed
+ 	 * instead of static because we work with threads on windows */
+ 	const int	buflen = 64;
+ 	char	   *buf = (char *) pg_malloc(buflen);
+ 	ParallelArgs pargs;
+ 	int			status;
+ 
+ 	pargs.AH = AH;
+ 	pargs.te = te;
+ 
+ 	status = parallel_restore(&pargs);
+ 
+ 	/* report the status and, if errors were ignored, the error count */
+ 	snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
+ 			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
+ 
+ 	return buf;
+ }
+ /*
+  * This function is executed in the master process. It parses the response
+  * of the _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory worker
+  * functions and returns the status (always 0 for a dump).
+  */
+ static int
+ _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
+ {
+ 	DumpId		dumpId;
+ 	int			nBytes, n_errors;
+ 	int			status = 0;
+ 
+ 	if (act == ACT_DUMP)
+ 	{
+ 		/* use %d to match the "%d" the worker wrote with snprintf */
+ 		sscanf(str, "%d%n", &dumpId, &nBytes);
+ 
+ 		Assert(dumpId == te->dumpId);
+ 		Assert(nBytes == strlen(str));
+ 	}
+ 	else if (act == ACT_RESTORE)
+ 	{
+ 		sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
+ 
+ 		Assert(dumpId == te->dumpId);
+ 		Assert(nBytes == strlen(str));
+ 
+ 		/* accumulate errors the worker chose to ignore */
+ 		AH->public.n_errors += n_errors;
+ 	}
+ 
+ 	return status;
+ }
diff --git a/src/bin/pg_dump/pg_backup_files.c b/src/bin/pg_dump/pg_backup_files.c
index ffcbb8f..13e8ed3 100644
*** a/src/bin/pg_dump/pg_backup_files.c
--- b/src/bin/pg_dump/pg_backup_files.c
*************** InitArchiveFmt_Files(ArchiveHandle *AH)
*** 102,107 ****
--- 102,113 ----
  	AH->ClonePtr = NULL;
  	AH->DeClonePtr = NULL;
  
+ 	AH->MasterStartParallelItemPtr = NULL;
+ 	AH->MasterEndParallelItemPtr = NULL;
+ 
+ 	AH->WorkerJobDumpPtr = NULL;
+ 	AH->WorkerJobRestorePtr = NULL;
+ 
  	/*
  	 * Set up some special context used in compressing data.
  	 */
*************** _CloseArchive(ArchiveHandle *AH)
*** 455,461 ****
  		WriteToc(AH);
  		if (fclose(AH->FH) != 0)
  			die_horribly(AH, modulename, "could not close TOC file: %s\n", strerror(errno));
! 		WriteDataChunks(AH);
  	}
  
  	AH->FH = NULL;
--- 461,467 ----
  		WriteToc(AH);
  		if (fclose(AH->FH) != 0)
  			die_horribly(AH, modulename, "could not close TOC file: %s\n", strerror(errno));
! 		WriteDataChunks(AH, NULL);
  	}
  
  	AH->FH = NULL;
diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c
index 39ce417..929083e 100644
*** a/src/bin/pg_dump/pg_backup_tar.c
--- b/src/bin/pg_dump/pg_backup_tar.c
*************** InitArchiveFmt_Tar(ArchiveHandle *AH)
*** 157,162 ****
--- 157,168 ----
  	AH->ClonePtr = NULL;
  	AH->DeClonePtr = NULL;
  
+ 	AH->MasterStartParallelItemPtr = NULL;
+ 	AH->MasterEndParallelItemPtr = NULL;
+ 
+ 	AH->WorkerJobDumpPtr = NULL;
+ 	AH->WorkerJobRestorePtr = NULL;
+ 
  	/*
  	 * Set up some special context used in compressing data.
  	 */
*************** _CloseArchive(ArchiveHandle *AH)
*** 835,841 ****
  		/*
  		 * Now send the data (tables & blobs)
  		 */
! 		WriteDataChunks(AH);
  
  		/*
  		 * Now this format wants to append a script which does a full restore
--- 841,847 ----
  		/*
  		 * Now send the data (tables & blobs)
  		 */
! 		WriteDataChunks(AH, NULL);
  
  		/*
  		 * Now this format wants to append a script which does a full restore
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 13fc667..5a15435 100644
*** a/src/bin/pg_dump/pg_dump.c
--- b/src/bin/pg_dump/pg_dump.c
*************** static int	disable_dollar_quoting = 0;
*** 140,145 ****
--- 140,146 ----
  static int	dump_inserts = 0;
  static int	column_inserts = 0;
  static int	no_security_labels = 0;
+ static int  no_synchronized_snapshots = 0;
  static int	no_unlogged_table_data = 0;
  static int	serializable_deferrable = 0;
  
*************** static const char *convertTSFunction(Oid
*** 227,235 ****
  static Oid	findLastBuiltinOid_V71(const char *);
  static Oid	findLastBuiltinOid_V70(void);
  static void selectSourceSchema(const char *schemaName);
  static char *getFormattedTypeName(Oid oid, OidOptions opts);
  static char *myFormatType(const char *typname, int32 typmod);
- static const char *fmtQualifiedId(const char *schema, const char *id);
  static void getBlobs(Archive *AH);
  static void dumpBlob(Archive *AH, BlobInfo *binfo);
  static int	dumpBlobs(Archive *AH, void *arg);
--- 228,237 ----
  static Oid	findLastBuiltinOid_V71(const char *);
  static Oid	findLastBuiltinOid_V70(void);
  static void selectSourceSchema(const char *schemaName);
+ static void selectSourceSchemaOnAH(ArchiveHandle *AH, const char *schemaName);
+ static void selectSourceSchemaOnConnection(PGconn *conn, const char *schemaName);
  static char *getFormattedTypeName(Oid oid, OidOptions opts);
  static char *myFormatType(const char *typname, int32 typmod);
  static void getBlobs(Archive *AH);
  static void dumpBlob(Archive *AH, BlobInfo *binfo);
  static int	dumpBlobs(Archive *AH, void *arg);
*************** static void binary_upgrade_extension_mem
*** 246,255 ****
  								DumpableObject *dobj,
  								const char *objlabel);
  static const char *getAttrName(int attrnum, TableInfo *tblInfo);
! static const char *fmtCopyColumnList(const TableInfo *ti);
  static void do_sql_command(PGconn *conn, const char *query);
  static void check_sql_result(PGresult *res, PGconn *conn, const char *query,
  				 ExecStatusType expected);
  
  int
  main(int argc, char **argv)
--- 248,260 ----
  								DumpableObject *dobj,
  								const char *objlabel);
  static const char *getAttrName(int attrnum, TableInfo *tblInfo);
! static const char *fmtCopyColumnList(const TableInfo *ti, PQExpBuffer buffer);
  static void do_sql_command(PGconn *conn, const char *query);
  static void check_sql_result(PGresult *res, PGconn *conn, const char *query,
  				 ExecStatusType expected);
+ static void SetupConnection(Archive *AHX, const char *dumpencoding,
+ 							const char *use_role);
+ static char *get_synchronized_snapshot(ArchiveHandle *AH);
  
  int
  main(int argc, char **argv)
*************** main(int argc, char **argv)
*** 262,274 ****
  	const char *pgport = NULL;
  	const char *username = NULL;
  	const char *dumpencoding = NULL;
- 	const char *std_strings;
  	bool		oids = false;
  	TableInfo  *tblinfo;
  	int			numTables;
  	DumpableObject **dobjs;
  	int			numObjs;
  	int			i;
  	enum trivalue prompt_password = TRI_DEFAULT;
  	int			compressLevel = -1;
  	int			plainText = 0;
--- 267,279 ----
  	const char *pgport = NULL;
  	const char *username = NULL;
  	const char *dumpencoding = NULL;
  	bool		oids = false;
  	TableInfo  *tblinfo;
  	int			numTables;
  	DumpableObject **dobjs;
  	int			numObjs;
  	int			i;
+ 	int			numWorkers = 1;
  	enum trivalue prompt_password = TRI_DEFAULT;
  	int			compressLevel = -1;
  	int			plainText = 0;
*************** main(int argc, char **argv)
*** 297,302 ****
--- 302,308 ----
  		{"format", required_argument, NULL, 'F'},
  		{"host", required_argument, NULL, 'h'},
  		{"ignore-version", no_argument, NULL, 'i'},
+ 		{"jobs", 1, NULL, 'j'},
  		{"no-reconnect", no_argument, NULL, 'R'},
  		{"oids", no_argument, NULL, 'o'},
  		{"no-owner", no_argument, NULL, 'O'},
*************** main(int argc, char **argv)
*** 336,341 ****
--- 342,348 ----
  		{"serializable-deferrable", no_argument, &serializable_deferrable, 1},
  		{"use-set-session-authorization", no_argument, &use_setsessauth, 1},
  		{"no-security-labels", no_argument, &no_security_labels, 1},
+ 		{"no-synchronized-snapshots", no_argument, &no_synchronized_snapshots, 1},
  		{"no-unlogged-table-data", no_argument, &no_unlogged_table_data, 1},
  
  		{NULL, 0, NULL, 0}
*************** main(int argc, char **argv)
*** 373,379 ****
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "abcCE:f:F:h:in:N:oOp:RsS:t:T:U:vwWxZ:",
  							long_options, &optindex)) != -1)
  	{
  		switch (c)
--- 380,386 ----
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "abcCE:f:F:h:ij:n:N:oOp:RsS:t:T:U:vwWxZ:",
  							long_options, &optindex)) != -1)
  	{
  		switch (c)
*************** main(int argc, char **argv)
*** 414,419 ****
--- 421,430 ----
  				/* ignored, deprecated option */
  				break;
  
+ 			case 'j':			/* number of dump jobs */
+ 				numWorkers = atoi(optarg);
+ 				break;
+ 
  			case 'n':			/* include schema(s) */
  				simple_string_list_append(&schema_include_patterns, optarg);
  				include_everything = false;
*************** main(int argc, char **argv)
*** 575,580 ****
--- 586,612 ----
  			compressLevel = 0;
  	}
  
+ 	/*
+ 	 * On Windows we can only have at most MAXIMUM_WAIT_OBJECTS (= 64 usually)
+ 	 * parallel jobs because that's the maximum limit for the
+ 	 * WaitForMultipleObjects() call.
+ 	 */
+ 	if (numWorkers <= 0
+ #ifdef WIN32
+ 			|| numWorkers > MAXIMUM_WAIT_OBJECTS
+ #endif
+ 		)
+ 	{
+ 		write_msg(NULL, _("%s: invalid number of parallel jobs\n"), progname);
+ 		exit(1);
+ 	}
+ 
+ 	/* Parallel backup only in the directory archive format so far */
+ 	if (archiveFormat != archDirectory && numWorkers > 1) {
+ 		write_msg(NULL, "parallel backup only supported by the directory format\n");
+ 		exit(1);
+ 	}
+ 
  	/* Open the output file */
  	g_fout = CreateArchive(filename, archiveFormat, compressLevel, archiveMode);
  
*************** main(int argc, char **argv)
*** 601,606 ****
--- 633,640 ----
  	g_fout->minRemoteVersion = 70000;
  	g_fout->maxRemoteVersion = (my_version / 100) * 100 + 99;
  
+ 	g_fout->numWorkers = numWorkers;
+ 
  	/*
  	 * Open the database using the Archiver, so it knows about it. Errors mean
  	 * death.
*************** main(int argc, char **argv)
*** 608,702 ****
  	g_conn = ConnectDatabase(g_fout, dbname, pghost, pgport,
  							 username, prompt_password);
  
! 	/* Set the client encoding if requested */
! 	if (dumpencoding)
  	{
! 		if (PQsetClientEncoding(g_conn, dumpencoding) < 0)
! 		{
! 			write_msg(NULL, "invalid client encoding \"%s\" specified\n",
! 					  dumpencoding);
! 			exit(1);
! 		}
  	}
  
  	/*
- 	 * Get the active encoding and the standard_conforming_strings setting, so
- 	 * we know how to escape strings.
- 	 */
- 	g_fout->encoding = PQclientEncoding(g_conn);
- 
- 	std_strings = PQparameterStatus(g_conn, "standard_conforming_strings");
- 	g_fout->std_strings = (std_strings && strcmp(std_strings, "on") == 0);
- 
- 	/* Set the role if requested */
- 	if (use_role && g_fout->remoteVersion >= 80100)
- 	{
- 		PQExpBuffer query = createPQExpBuffer();
- 
- 		appendPQExpBuffer(query, "SET ROLE %s", fmtId(use_role));
- 		do_sql_command(g_conn, query->data);
- 		destroyPQExpBuffer(query);
- 	}
- 
- 	/* Set the datestyle to ISO to ensure the dump's portability */
- 	do_sql_command(g_conn, "SET DATESTYLE = ISO");
- 
- 	/* Likewise, avoid using sql_standard intervalstyle */
- 	if (g_fout->remoteVersion >= 80400)
- 		do_sql_command(g_conn, "SET INTERVALSTYLE = POSTGRES");
- 
- 	/*
- 	 * If supported, set extra_float_digits so that we can dump float data
- 	 * exactly (given correctly implemented float I/O code, anyway)
- 	 */
- 	if (g_fout->remoteVersion >= 90000)
- 		do_sql_command(g_conn, "SET extra_float_digits TO 3");
- 	else if (g_fout->remoteVersion >= 70400)
- 		do_sql_command(g_conn, "SET extra_float_digits TO 2");
- 
- 	/*
- 	 * If synchronized scanning is supported, disable it, to prevent
- 	 * unpredictable changes in row ordering across a dump and reload.
- 	 */
- 	if (g_fout->remoteVersion >= 80300)
- 		do_sql_command(g_conn, "SET synchronize_seqscans TO off");
- 
- 	/*
- 	 * Disable timeouts if supported.
- 	 */
- 	if (g_fout->remoteVersion >= 70300)
- 		do_sql_command(g_conn, "SET statement_timeout = 0");
- 
- 	/*
- 	 * Quote all identifiers, if requested.
- 	 */
- 	if (quote_all_identifiers && g_fout->remoteVersion >= 90100)
- 		do_sql_command(g_conn, "SET quote_all_identifiers = true");
- 
- 	/*
  	 * Disable security label support if server version < v9.1.x (prevents
  	 * access to nonexistent pg_seclabel catalog)
  	 */
  	if (g_fout->remoteVersion < 90100)
  		no_security_labels = 1;
  
- 	/*
- 	 * Start transaction-snapshot mode transaction to dump consistent data.
- 	 */
- 	do_sql_command(g_conn, "BEGIN");
- 	if (g_fout->remoteVersion >= 90100)
- 	{
- 		if (serializable_deferrable)
- 			do_sql_command(g_conn,
- 						   "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, "
- 						   "READ ONLY, DEFERRABLE");
- 		else
- 			do_sql_command(g_conn,
- 						   "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
- 	}
- 	else
- 		do_sql_command(g_conn, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
- 
  	/* Select the appropriate subquery to convert user IDs to names */
  	if (g_fout->remoteVersion >= 80100)
  		username_subquery = "SELECT rolname FROM pg_catalog.pg_roles WHERE oid =";
--- 642,665 ----
  	g_conn = ConnectDatabase(g_fout, dbname, pghost, pgport,
  							 username, prompt_password);
  
! 	/* Find the last built-in OID, if needed */
! 	if (g_fout->remoteVersion < 70300)
  	{
! 		if (g_fout->remoteVersion >= 70100)
! 			g_last_builtin_oid = findLastBuiltinOid_V71(PQdb(g_conn));
! 		else
! 			g_last_builtin_oid = findLastBuiltinOid_V70();
! 		if (g_verbose)
! 			write_msg(NULL, "last built-in OID is %u\n", g_last_builtin_oid);
  	}
  
  	/*
  	 * Disable security label support if server version < v9.1.x (prevents
  	 * access to nonexistent pg_seclabel catalog)
  	 */
  	if (g_fout->remoteVersion < 90100)
  		no_security_labels = 1;
  
  	/* Select the appropriate subquery to convert user IDs to names */
  	if (g_fout->remoteVersion >= 80100)
  		username_subquery = "SELECT rolname FROM pg_catalog.pg_roles WHERE oid =";
*************** main(int argc, char **argv)
*** 705,721 ****
  	else
  		username_subquery = "SELECT usename FROM pg_user WHERE usesysid =";
  
! 	/* Find the last built-in OID, if needed */
! 	if (g_fout->remoteVersion < 70300)
  	{
! 		if (g_fout->remoteVersion >= 70100)
! 			g_last_builtin_oid = findLastBuiltinOid_V71(PQdb(g_conn));
! 		else
! 			g_last_builtin_oid = findLastBuiltinOid_V70();
! 		if (g_verbose)
! 			write_msg(NULL, "last built-in OID is %u\n", g_last_builtin_oid);
  	}
  
  	/* Expand schema selection patterns into OID lists */
  	if (schema_include_patterns.head != NULL)
  	{
--- 668,687 ----
  	else
  		username_subquery = "SELECT usename FROM pg_user WHERE usesysid =";
  
! 	if (numWorkers > 1)
  	{
! 		/* check the version for the synchronized snapshots feature */
! 		if (g_fout->remoteVersion < 90200 && !no_synchronized_snapshots)
! 		{
! 			write_msg(NULL, "No synchronized snapshots available in this version\n"
! 						 "You might have to run with --no-synchronized-snapshots\n");
! 			exit(1);
! 		} else if (g_fout->remoteVersion >= 90200 && no_synchronized_snapshots)
! 			write_msg(NULL, "Ignoring --no-synchronized-snapshots\n");
  	}
  
+ 	SetupConnection(g_fout, dumpencoding, use_role);
+ 
  	/* Expand schema selection patterns into OID lists */
  	if (schema_include_patterns.head != NULL)
  	{
*************** main(int argc, char **argv)
*** 797,802 ****
--- 763,772 ----
  	else
  		sortDumpableObjectsByTypeOid(dobjs, numObjs);
  
+ 	/* If we do a parallel dump, we want the largest tables to go first */
+ 	if (archiveFormat == archDirectory && numWorkers > 1)
+ 		sortDataAndIndexObjectsBySize(dobjs, numObjs);
+ 
  	sortDumpableObjects(dobjs, numObjs);
  
  	/*
*************** help(const char *progname)
*** 862,867 ****
--- 832,838 ----
  	printf(_("  -f, --file=FILENAME         output file or directory name\n"));
  	printf(_("  -F, --format=c|d|t|p        output file format (custom, directory, tar,\n"
  			 "                              plain text (default))\n"));
+ 	printf(_("  -j, --jobs=NUM              use this many parallel jobs to dump\n"));
  	printf(_("  -v, --verbose               verbose mode\n"));
  	printf(_("  -Z, --compress=0-9          compression level for compressed formats\n"));
  	printf(_("  --lock-wait-timeout=TIMEOUT fail after waiting TIMEOUT for a table lock\n"));
*************** help(const char *progname)
*** 891,896 ****
--- 862,868 ----
  	printf(_("  --exclude-table-data=TABLE  do NOT dump data for the named table(s)\n"));
  	printf(_("  --inserts                   dump data as INSERT commands, rather than COPY\n"));
  	printf(_("  --no-security-labels        do not dump security label assignments\n"));
+ 	printf(_("  --no-synchronized-snapshots parallel processes should not use synchronized snapshots\n"));
  	printf(_("  --no-tablespaces            do not dump tablespace assignments\n"));
  	printf(_("  --no-unlogged-table-data    do not dump unlogged table data\n"));
  	printf(_("  --quote-all-identifiers     quote all identifiers, even if not key words\n"));
*************** exit_nicely(void)
*** 922,927 ****
--- 894,1066 ----
  	exit(1);
  }
  
+ /*
+  * Initialize the connection for a new worker process (ropt is unused here).
+  */
+ void
+ _SetupWorker(Archive *AHX, RestoreOptions *ropt)
+ {
+ 	SetupConnection(AHX, NULL, NULL);
+ }
+ 
+ /*
+  * Set up session state on a dump connection: client encoding, role,
+  * datestyle, float precision, timeouts, and the transaction used to get a
+  * consistent view of the data.  For parallel dumps on servers that support
+  * synchronized snapshots, the master exports its snapshot and each cloned
+  * worker connection imports it.
+  *
+  * dumpencoding: requested client encoding, or NULL to keep the current one
+  * (NULL is also what cloned connections pass, since CloneArchive already
+  * set their encoding from the original connection).
+  * use_role: role to SET ROLE to, or NULL to fall back to AH->use_role.
+  */
+ static void
+ SetupConnection(Archive *AHX, const char *dumpencoding, const char *use_role)
+ {
+ 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ 	const char *std_strings;
+ 	PGconn *conn = AH->connection;
+ 
+ 	/* Set the client encoding if requested (see header comment re NULL) */
+ 	if (dumpencoding)
+ 	{
+ 		if (PQsetClientEncoding(AH->connection, dumpencoding) < 0)
+ 		{
+ 			write_msg(NULL, "invalid client encoding \"%s\" specified\n",
+ 					  dumpencoding);
+ 			exit(1);
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * Get the active encoding and the standard_conforming_strings setting, so
+ 	 * we know how to escape strings.
+ 	 */
+ 	AHX->encoding = PQclientEncoding(conn);
+ 
+ 	std_strings = PQparameterStatus(conn, "standard_conforming_strings");
+ 	AHX->std_strings = (std_strings && strcmp(std_strings, "on") == 0);
+ 
+ 	/* Set the role if requested */
+ 	if (!use_role && AH->use_role)
+ 		use_role = AH->use_role;
+ 
+ 	if (use_role && AHX->remoteVersion >= 80100)
+ 	{
+ 		PQExpBuffer query = createPQExpBuffer();
+ 
+ 		appendPQExpBuffer(query, "SET ROLE %s", fmtId(use_role));
+ 		do_sql_command(conn, query->data);
+ 		destroyPQExpBuffer(query);
+ 
+ 		/* save this for later use on parallel connections */
+ 		if (!AH->use_role)
+ 			AH->use_role = pg_strdup(use_role);
+ 	}
+ 
+ 	/* Set the datestyle to ISO to ensure the dump's portability */
+ 	do_sql_command(conn, "SET DATESTYLE = ISO");
+ 
+ 	/* Likewise, avoid using sql_standard intervalstyle */
+ 	if (AHX->remoteVersion >= 80400)
+ 		do_sql_command(conn, "SET INTERVALSTYLE = POSTGRES");
+ 
+ 	/*
+ 	 * If supported, set extra_float_digits so that we can dump float data
+ 	 * exactly (given correctly implemented float I/O code, anyway)
+ 	 */
+ 	if (AHX->remoteVersion >= 80500)
+ 		do_sql_command(conn, "SET extra_float_digits TO 3");
+ 	else if (AHX->remoteVersion >= 70400)
+ 		do_sql_command(conn, "SET extra_float_digits TO 2");
+ 
+ 	/*
+ 	 * If synchronized scanning is supported, disable it, to prevent
+ 	 * unpredictable changes in row ordering across a dump and reload.
+ 	 */
+ 	if (AHX->remoteVersion >= 80300)
+ 		do_sql_command(conn, "SET synchronize_seqscans TO off");
+ 
+ 	/*
+ 	 * Quote all identifiers, if requested.
+ 	 */
+ 	if (quote_all_identifiers && AHX->remoteVersion >= 90100)
+ 		do_sql_command(conn, "SET quote_all_identifiers = true");
+ 
+ 	/*
+ 	 * Disable timeouts if supported.
+ 	 */
+ 	if (AHX->remoteVersion >= 70300)
+ 		do_sql_command(conn, "SET statement_timeout = 0");
+ 
+ 	/*
+ 	 * Start transaction-snapshot mode transaction to dump consistent data.
+ 	 */
+ 	do_sql_command(conn, "BEGIN");
+ 	if (AHX->remoteVersion >= 90100)
+ 	{
+ 		if (serializable_deferrable)
+ 			do_sql_command(conn,
+ 						   "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, "
+ 						   "READ ONLY, DEFERRABLE");
+ 		else
+ 			do_sql_command(conn,
+ 						   "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
+ 	}
+ 	else
+ 		do_sql_command(conn, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
+ 
+ 	/*
+ 	 * Synchronize snapshots across the connections of a parallel dump: the
+ 	 * master exports its snapshot, each clone imports it.  If the server is
+ 	 * too old for synchronized snapshots we will have errored out earlier;
+ 	 * when the feature is available it is always used.
+ 	 */
+ 	if (AHX->numWorkers > 1 && AHX->remoteVersion >= 90200)
+ 	{
+ 		if (AH->is_clone)
+ 		{
+ 			PQExpBuffer query = createPQExpBuffer();
+ 
+ 			appendPQExpBuffer(query, "SET TRANSACTION SNAPSHOT ");
+ 			appendStringLiteralConn(query, AH->sync_snapshot_id, conn);
+ 			/* actually import the master's snapshot (was never executed) */
+ 			do_sql_command(conn, query->data);
+ 			destroyPQExpBuffer(query);
+ 		}
+ 		else
+ 		{
+ 			/* master connection: export the snapshot for the workers */
+ 			AH->sync_snapshot_id = get_synchronized_snapshot(AH);
+ 		}
+ 	}
+ }
+ 
+ /*
+  * Export the current transaction's snapshot on the server, so that other
+  * (worker) connections can adopt it via SET TRANSACTION SNAPSHOT.
+  *
+  * Returns an allocated copy of the snapshot identifier; the caller keeps it
+  * (in AH->sync_snapshot_id) for the duration of the parallel dump.  Exits
+  * via exit_nicely() if the query does not return exactly one row.
+  */
+ static char *
+ get_synchronized_snapshot(ArchiveHandle *AH)
+ {
+ 	const char *query = "select pg_export_snapshot()";
+ 	char	   *result;
+ 	int			ntups;
+ 	PGconn	   *conn = AH->connection;
+ 	PGresult   *res = PQexec(conn, query);
+ 
+ 	check_sql_result(res, conn, query, PGRES_TUPLES_OK);
+ 
+ 	/* Expecting a single result only */
+ 	ntups = PQntuples(res);
+ 	if (ntups != 1)
+ 	{
+ 		write_msg(NULL, ngettext("query returned %d row instead of one: %s\n",
+ 								 "query returned %d rows instead of one: %s\n",
+ 								 ntups),
+ 				  ntups, query);
+ 		exit_nicely();
+ 	}
+ 
+ 	/* pg_strdup, per file convention, exits on OOM instead of returning NULL */
+ 	result = pg_strdup(PQgetvalue(res, 0, 0));
+ 	PQclear(res);
+ 
+ 	return result;
+ }
+ 
  static ArchiveFormat
  parseArchiveFormat(const char *format, ArchiveMode *mode)
  {
*************** selectDumpableObject(DumpableObject *dob
*** 1252,1263 ****
--- 1391,1412 ----
  static int
  dumpTableData_copy(Archive *fout, void *dcontext)
  {
+ 	/*
+ 	 * This is a data dumper routine, executed in a child for parallel backup, so
+ 	 * it must not access the global g_conn but AH->connection instead.
+ 	 */
+ 	ArchiveHandle *AH = (ArchiveHandle *) fout;
  	TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
  	TableInfo  *tbinfo = tdinfo->tdtable;
  	const char *classname = tbinfo->dobj.name;
  	const bool	hasoids = tbinfo->hasoids;
  	const bool	oids = tdinfo->oids;
  	PQExpBuffer q = createPQExpBuffer();
+ 	/*
+ 	 * Note: can't use getThreadLocalPQExpBuffer() here, we're calling fmtId which
+ 	 * uses it already.
+ 	 */
+ 	PQExpBuffer clistBuf = createPQExpBuffer();
  	PGresult   *res;
  	int			ret;
  	char	   *copybuf;
*************** dumpTableData_copy(Archive *fout, void *
*** 1272,1278 ****
  	 * this ensures reproducible results in case the table contains regproc,
  	 * regclass, etc columns.
  	 */
! 	selectSourceSchema(tbinfo->dobj.namespace->dobj.name);
  
  	/*
  	 * If possible, specify the column list explicitly so that we have no
--- 1421,1427 ----
  	 * this ensures reproducible results in case the table contains regproc,
  	 * regclass, etc columns.
  	 */
! 	selectSourceSchemaOnAH(AH, tbinfo->dobj.namespace->dobj.name);
  
  	/*
  	 * If possible, specify the column list explicitly so that we have no
*************** dumpTableData_copy(Archive *fout, void *
*** 1280,1287 ****
  	 * column ordering of COPY will not be what we want in certain corner
  	 * cases involving ADD COLUMN and inheritance.)
  	 */
! 	if (g_fout->remoteVersion >= 70300)
! 		column_list = fmtCopyColumnList(tbinfo);
  	else
  		column_list = "";		/* can't select columns in COPY */
  
--- 1429,1436 ----
  	 * column ordering of COPY will not be what we want in certain corner
  	 * cases involving ADD COLUMN and inheritance.)
  	 */
! 	if (AH->public.remoteVersion >= 70300)
! 		column_list = fmtCopyColumnList(tbinfo, clistBuf);
  	else
  		column_list = "";		/* can't select columns in COPY */
  
*************** dumpTableData_copy(Archive *fout, void *
*** 1289,1295 ****
  	{
  		appendPQExpBuffer(q, "COPY %s %s WITH OIDS TO stdout;",
  						  fmtQualifiedId(tbinfo->dobj.namespace->dobj.name,
! 										 classname),
  						  column_list);
  	}
  	else if (tdinfo->filtercond)
--- 1438,1445 ----
  	{
  		appendPQExpBuffer(q, "COPY %s %s WITH OIDS TO stdout;",
  						  fmtQualifiedId(tbinfo->dobj.namespace->dobj.name,
! 										 classname,
! 										 AH->public.remoteVersion),
  						  column_list);
  	}
  	else if (tdinfo->filtercond)
*************** dumpTableData_copy(Archive *fout, void *
*** 1306,1328 ****
  			appendPQExpBufferStr(q, "* ");
  		appendPQExpBuffer(q, "FROM %s %s) TO stdout;",
  						  fmtQualifiedId(tbinfo->dobj.namespace->dobj.name,
! 										 classname),
  						  tdinfo->filtercond);
  	}
  	else
  	{
  		appendPQExpBuffer(q, "COPY %s %s TO stdout;",
  						  fmtQualifiedId(tbinfo->dobj.namespace->dobj.name,
! 										 classname),
  						  column_list);
  	}
! 	res = PQexec(g_conn, q->data);
! 	check_sql_result(res, g_conn, q->data, PGRES_COPY_OUT);
  	PQclear(res);
  
  	for (;;)
  	{
! 		ret = PQgetCopyData(g_conn, &copybuf, 0);
  
  		if (ret < 0)
  			break;				/* done or error */
--- 1456,1481 ----
  			appendPQExpBufferStr(q, "* ");
  		appendPQExpBuffer(q, "FROM %s %s) TO stdout;",
  						  fmtQualifiedId(tbinfo->dobj.namespace->dobj.name,
! 										 classname,
! 										 AH->public.remoteVersion),
  						  tdinfo->filtercond);
  	}
  	else
  	{
  		appendPQExpBuffer(q, "COPY %s %s TO stdout;",
  						  fmtQualifiedId(tbinfo->dobj.namespace->dobj.name,
! 										 classname,
! 										 AH->public.remoteVersion),
  						  column_list);
  	}
! 	res = PQexec(AH->connection, q->data);
! 	check_sql_result(res, AH->connection, q->data, PGRES_COPY_OUT);
  	PQclear(res);
+ 	destroyPQExpBuffer(clistBuf);
  
  	for (;;)
  	{
! 		ret = PQgetCopyData(AH->connection, &copybuf, 0);
  
  		if (ret < 0)
  			break;				/* done or error */
*************** dumpTableData_copy(Archive *fout, void *
*** 1385,1398 ****
  	{
  		/* copy data transfer failed */
  		write_msg(NULL, "Dumping the contents of table \"%s\" failed: PQgetCopyData() failed.\n", classname);
! 		write_msg(NULL, "Error message from server: %s", PQerrorMessage(g_conn));
  		write_msg(NULL, "The command was: %s\n", q->data);
  		exit_nicely();
  	}
  
  	/* Check command status and return to normal libpq state */
! 	res = PQgetResult(g_conn);
! 	check_sql_result(res, g_conn, q->data, PGRES_COMMAND_OK);
  	PQclear(res);
  
  	destroyPQExpBuffer(q);
--- 1538,1551 ----
  	{
  		/* copy data transfer failed */
  		write_msg(NULL, "Dumping the contents of table \"%s\" failed: PQgetCopyData() failed.\n", classname);
! 		write_msg(NULL, "Error message from server: %s", PQerrorMessage(AH->connection));
  		write_msg(NULL, "The command was: %s\n", q->data);
  		exit_nicely();
  	}
  
  	/* Check command status and return to normal libpq state */
! 	res = PQgetResult(AH->connection);
! 	check_sql_result(res, AH->connection, q->data, PGRES_COMMAND_OK);
  	PQclear(res);
  
  	destroyPQExpBuffer(q);
*************** dumpTableData_copy(Archive *fout, void *
*** 1410,1415 ****
--- 1563,1573 ----
  static int
  dumpTableData_insert(Archive *fout, void *dcontext)
  {
+ 	/*
+ 	 * This is a data dumper routine, executed in a child for parallel backup, so
+ 	 * it must not access the global g_conn but AH->connection instead.
+ 	 */
+ 	ArchiveHandle *AH = (ArchiveHandle *) fout;
  	TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
  	TableInfo  *tbinfo = tdinfo->tdtable;
  	const char *classname = tbinfo->dobj.name;
*************** dumpTableData_insert(Archive *fout, void
*** 1425,1458 ****
  	 * this ensures reproducible results in case the table contains regproc,
  	 * regclass, etc columns.
  	 */
! 	selectSourceSchema(tbinfo->dobj.namespace->dobj.name);
  
  	if (fout->remoteVersion >= 70100)
  	{
  		appendPQExpBuffer(q, "DECLARE _pg_dump_cursor CURSOR FOR "
  						  "SELECT * FROM ONLY %s",
  						  fmtQualifiedId(tbinfo->dobj.namespace->dobj.name,
! 										 classname));
  	}
  	else
  	{
  		appendPQExpBuffer(q, "DECLARE _pg_dump_cursor CURSOR FOR "
  						  "SELECT * FROM %s",
  						  fmtQualifiedId(tbinfo->dobj.namespace->dobj.name,
! 										 classname));
  	}
  	if (tdinfo->filtercond)
  		appendPQExpBuffer(q, " %s", tdinfo->filtercond);
  
! 	res = PQexec(g_conn, q->data);
! 	check_sql_result(res, g_conn, q->data, PGRES_COMMAND_OK);
  
  	do
  	{
  		PQclear(res);
  
! 		res = PQexec(g_conn, "FETCH 100 FROM _pg_dump_cursor");
! 		check_sql_result(res, g_conn, "FETCH 100 FROM _pg_dump_cursor",
  						 PGRES_TUPLES_OK);
  		nfields = PQnfields(res);
  		for (tuple = 0; tuple < PQntuples(res); tuple++)
--- 1583,1618 ----
  	 * this ensures reproducible results in case the table contains regproc,
  	 * regclass, etc columns.
  	 */
! 	selectSourceSchemaOnAH(AH, tbinfo->dobj.namespace->dobj.name);
  
  	if (fout->remoteVersion >= 70100)
  	{
  		appendPQExpBuffer(q, "DECLARE _pg_dump_cursor CURSOR FOR "
  						  "SELECT * FROM ONLY %s",
  						  fmtQualifiedId(tbinfo->dobj.namespace->dobj.name,
! 										 classname,
! 										 AH->public.remoteVersion));
  	}
  	else
  	{
  		appendPQExpBuffer(q, "DECLARE _pg_dump_cursor CURSOR FOR "
  						  "SELECT * FROM %s",
  						  fmtQualifiedId(tbinfo->dobj.namespace->dobj.name,
! 										 classname,
! 										 AH->public.remoteVersion));
  	}
  	if (tdinfo->filtercond)
  		appendPQExpBuffer(q, " %s", tdinfo->filtercond);
  
! 	res = PQexec(AH->connection, q->data);
! 	check_sql_result(res, AH->connection, q->data, PGRES_COMMAND_OK);
  
  	do
  	{
  		PQclear(res);
  
! 		res = PQexec(AH->connection, "FETCH 100 FROM _pg_dump_cursor");
! 		check_sql_result(res, AH->connection, "FETCH 100 FROM _pg_dump_cursor",
  						 PGRES_TUPLES_OK);
  		nfields = PQnfields(res);
  		for (tuple = 0; tuple < PQntuples(res); tuple++)
*************** dumpTableData_insert(Archive *fout, void
*** 1550,1556 ****
  
  	archprintf(fout, "\n\n");
  
! 	do_sql_command(g_conn, "CLOSE _pg_dump_cursor");
  
  	destroyPQExpBuffer(q);
  	return 1;
--- 1710,1716 ----
  
  	archprintf(fout, "\n\n");
  
! 	do_sql_command(AH->connection, "CLOSE _pg_dump_cursor");
  
  	destroyPQExpBuffer(q);
  	return 1;
*************** dumpTableData(Archive *fout, TableDataIn
*** 1568,1573 ****
--- 1728,1734 ----
  {
  	TableInfo  *tbinfo = tdinfo->tdtable;
  	PQExpBuffer copyBuf = createPQExpBuffer();
+ 	PQExpBuffer clistBuf = createPQExpBuffer();
  	DataDumperPtr dumpFn;
  	char	   *copyStmt;
  
*************** dumpTableData(Archive *fout, TableDataIn
*** 1583,1589 ****
  		appendPQExpBuffer(copyBuf, "COPY %s ",
  						  fmtId(tbinfo->dobj.name));
  		appendPQExpBuffer(copyBuf, "%s %sFROM stdin;\n",
! 						  fmtCopyColumnList(tbinfo),
  					  (tdinfo->oids && tbinfo->hasoids) ? "WITH OIDS " : "");
  		copyStmt = copyBuf->data;
  	}
--- 1744,1750 ----
  		appendPQExpBuffer(copyBuf, "COPY %s ",
  						  fmtId(tbinfo->dobj.name));
  		appendPQExpBuffer(copyBuf, "%s %sFROM stdin;\n",
! 						  fmtCopyColumnList(tbinfo, clistBuf),
  					  (tdinfo->oids && tbinfo->hasoids) ? "WITH OIDS " : "");
  		copyStmt = copyBuf->data;
  	}
*************** dumpTableData(Archive *fout, TableDataIn
*** 1596,1608 ****
  
  	ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
  				 tbinfo->dobj.name, tbinfo->dobj.namespace->dobj.name,
! 				 NULL, tbinfo->rolname,
  				 false, "TABLE DATA", SECTION_DATA,
  				 "", "", copyStmt,
  				 tdinfo->dobj.dependencies, tdinfo->dobj.nDeps,
  				 dumpFn, tdinfo);
  
  	destroyPQExpBuffer(copyBuf);
  }
  
  /*
--- 1757,1770 ----
  
  	ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
  				 tbinfo->dobj.name, tbinfo->dobj.namespace->dobj.name,
! 				 NULL, tbinfo->rolname, tbinfo->relpages,
  				 false, "TABLE DATA", SECTION_DATA,
  				 "", "", copyStmt,
  				 tdinfo->dobj.dependencies, tdinfo->dobj.nDeps,
  				 dumpFn, tdinfo);
  
  	destroyPQExpBuffer(copyBuf);
+ 	destroyPQExpBuffer(clistBuf);
  }
  
  /*
*************** dumpDatabase(Archive *AH)
*** 1979,1984 ****
--- 2141,2147 ----
  				 NULL,			/* Namespace */
  				 NULL,			/* Tablespace */
  				 dba,			/* Owner */
+ 				 0,				/* relpages */
  				 false,			/* with oids */
  				 "DATABASE",	/* Desc */
  				 SECTION_PRE_DATA,		/* Section */
*************** dumpDatabase(Archive *AH)
*** 2027,2033 ****
  						  atoi(PQgetvalue(lo_res, 0, i_relfrozenxid)),
  						  LargeObjectRelationId);
  		ArchiveEntry(AH, nilCatalogId, createDumpId(),
! 					 "pg_largeobject", NULL, NULL, "",
  					 false, "pg_largeobject", SECTION_PRE_DATA,
  					 loOutQry->data, "", NULL,
  					 NULL, 0,
--- 2190,2196 ----
  						  atoi(PQgetvalue(lo_res, 0, i_relfrozenxid)),
  						  LargeObjectRelationId);
  		ArchiveEntry(AH, nilCatalogId, createDumpId(),
! 					 "pg_largeobject", NULL, NULL, "", 0,
  					 false, "pg_largeobject", SECTION_PRE_DATA,
  					 loOutQry->data, "", NULL,
  					 NULL, 0,
*************** dumpDatabase(Archive *AH)
*** 2066,2072 ****
  							  atoi(PQgetvalue(lo_res, 0, i_relfrozenxid)),
  							  LargeObjectMetadataRelationId);
  			ArchiveEntry(AH, nilCatalogId, createDumpId(),
! 						 "pg_largeobject_metadata", NULL, NULL, "",
  						 false, "pg_largeobject_metadata", SECTION_PRE_DATA,
  						 loOutQry->data, "", NULL,
  						 NULL, 0,
--- 2229,2235 ----
  							  atoi(PQgetvalue(lo_res, 0, i_relfrozenxid)),
  							  LargeObjectMetadataRelationId);
  			ArchiveEntry(AH, nilCatalogId, createDumpId(),
! 						 "pg_largeobject_metadata", NULL, NULL, "", 0,
  						 false, "pg_largeobject_metadata", SECTION_PRE_DATA,
  						 loOutQry->data, "", NULL,
  						 NULL, 0,
*************** dumpDatabase(Archive *AH)
*** 2101,2107 ****
  			appendPQExpBuffer(dbQry, ";\n");
  
  			ArchiveEntry(AH, dbCatId, createDumpId(), datname, NULL, NULL,
! 						 dba, false, "COMMENT", SECTION_NONE,
  						 dbQry->data, "", NULL,
  						 &dbDumpId, 1, NULL, NULL);
  		}
--- 2264,2270 ----
  			appendPQExpBuffer(dbQry, ";\n");
  
  			ArchiveEntry(AH, dbCatId, createDumpId(), datname, NULL, NULL,
! 						 dba, 0, false, "COMMENT", SECTION_NONE,
  						 dbQry->data, "", NULL,
  						 &dbDumpId, 1, NULL, NULL);
  		}
*************** dumpDatabase(Archive *AH)
*** 2128,2134 ****
  		emitShSecLabels(g_conn, res, seclabelQry, "DATABASE", datname);
  		if (strlen(seclabelQry->data))
  			ArchiveEntry(AH, dbCatId, createDumpId(), datname, NULL, NULL,
! 						 dba, false, "SECURITY LABEL", SECTION_NONE,
  						 seclabelQry->data, "", NULL,
  						 &dbDumpId, 1, NULL, NULL);
  		destroyPQExpBuffer(seclabelQry);
--- 2291,2297 ----
  		emitShSecLabels(g_conn, res, seclabelQry, "DATABASE", datname);
  		if (strlen(seclabelQry->data))
  			ArchiveEntry(AH, dbCatId, createDumpId(), datname, NULL, NULL,
! 						 dba, 0, false, "SECURITY LABEL", SECTION_NONE,
  						 seclabelQry->data, "", NULL,
  						 &dbDumpId, 1, NULL, NULL);
  		destroyPQExpBuffer(seclabelQry);
*************** dumpEncoding(Archive *AH)
*** 2157,2163 ****
  	appendPQExpBuffer(qry, ";\n");
  
  	ArchiveEntry(AH, nilCatalogId, createDumpId(),
! 				 "ENCODING", NULL, NULL, "",
  				 false, "ENCODING", SECTION_PRE_DATA,
  				 qry->data, "", NULL,
  				 NULL, 0,
--- 2320,2326 ----
  	appendPQExpBuffer(qry, ";\n");
  
  	ArchiveEntry(AH, nilCatalogId, createDumpId(),
! 				 "ENCODING", NULL, NULL, "", 0,
  				 false, "ENCODING", SECTION_PRE_DATA,
  				 qry->data, "", NULL,
  				 NULL, 0,
*************** dumpStdStrings(Archive *AH)
*** 2184,2190 ****
  					  stdstrings);
  
  	ArchiveEntry(AH, nilCatalogId, createDumpId(),
! 				 "STDSTRINGS", NULL, NULL, "",
  				 false, "STDSTRINGS", SECTION_PRE_DATA,
  				 qry->data, "", NULL,
  				 NULL, 0,
--- 2347,2353 ----
  					  stdstrings);
  
  	ArchiveEntry(AH, nilCatalogId, createDumpId(),
! 				 "STDSTRINGS", NULL, NULL, "", 0,
  				 false, "STDSTRINGS", SECTION_PRE_DATA,
  				 qry->data, "", NULL,
  				 NULL, 0,
*************** dumpBlob(Archive *AH, BlobInfo *binfo)
*** 2296,2302 ****
  	ArchiveEntry(AH, binfo->dobj.catId, binfo->dobj.dumpId,
  				 binfo->dobj.name,
  				 NULL, NULL,
! 				 binfo->rolname, false,
  				 "BLOB", SECTION_PRE_DATA,
  				 cquery->data, dquery->data, NULL,
  				 binfo->dobj.dependencies, binfo->dobj.nDeps,
--- 2459,2465 ----
  	ArchiveEntry(AH, binfo->dobj.catId, binfo->dobj.dumpId,
  				 binfo->dobj.name,
  				 NULL, NULL,
! 				 binfo->rolname, 0, false,
  				 "BLOB", SECTION_PRE_DATA,
  				 cquery->data, dquery->data, NULL,
  				 binfo->dobj.dependencies, binfo->dobj.nDeps,
*************** dumpBlob(Archive *AH, BlobInfo *binfo)
*** 2331,2338 ****
   *	dump the data contents of all large objects
   */
  static int
! dumpBlobs(Archive *AH, void *arg)
  {
  	const char *blobQry;
  	const char *blobFetchQry;
  	PGresult   *res;
--- 2494,2506 ----
   *	dump the data contents of all large objects
   */
  static int
! dumpBlobs(Archive *AHX, void *arg)
  {
+ 	/*
+ 	 * This is a data dumper routine, executed in a child for parallel backup,
+ 	 * so it must not access the global g_conn but AH->connection instead.
+ 	 */
+ 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
  	const char *blobQry;
  	const char *blobFetchQry;
  	PGresult   *res;
*************** dumpBlobs(Archive *AH, void *arg)
*** 2345,2365 ****
  		write_msg(NULL, "saving large objects\n");
  
  	/* Make sure we are in proper schema */
! 	selectSourceSchema("pg_catalog");
  
  	/*
  	 * Currently, we re-fetch all BLOB OIDs using a cursor.  Consider scanning
  	 * the already-in-memory dumpable objects instead...
  	 */
! 	if (AH->remoteVersion >= 90000)
  		blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_largeobject_metadata";
! 	else if (AH->remoteVersion >= 70100)
  		blobQry = "DECLARE bloboid CURSOR FOR SELECT DISTINCT loid FROM pg_largeobject";
  	else
  		blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_class WHERE relkind = 'l'";
  
! 	res = PQexec(g_conn, blobQry);
! 	check_sql_result(res, g_conn, blobQry, PGRES_COMMAND_OK);
  
  	/* Command to fetch from cursor */
  	blobFetchQry = "FETCH 1000 IN bloboid";
--- 2513,2533 ----
  		write_msg(NULL, "saving large objects\n");
  
  	/* Make sure we are in proper schema */
! 	selectSourceSchemaOnAH(AH, "pg_catalog");
  
  	/*
  	 * Currently, we re-fetch all BLOB OIDs using a cursor.  Consider scanning
  	 * the already-in-memory dumpable objects instead...
  	 */
! 	if (AH->public.remoteVersion >= 90000)
  		blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_largeobject_metadata";
! 	else if (AH->public.remoteVersion >= 70100)
  		blobQry = "DECLARE bloboid CURSOR FOR SELECT DISTINCT loid FROM pg_largeobject";
  	else
  		blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_class WHERE relkind = 'l'";
  
! 	res = PQexec(AH->connection, blobQry);
! 	check_sql_result(res, AH->connection, blobQry, PGRES_COMMAND_OK);
  
  	/* Command to fetch from cursor */
  	blobFetchQry = "FETCH 1000 IN bloboid";
*************** dumpBlobs(Archive *AH, void *arg)
*** 2369,2376 ****
  		PQclear(res);
  
  		/* Do a fetch */
! 		res = PQexec(g_conn, blobFetchQry);
! 		check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK);
  
  		/* Process the tuples, if any */
  		ntups = PQntuples(res);
--- 2537,2544 ----
  		PQclear(res);
  
  		/* Do a fetch */
! 		res = PQexec(AH->connection, blobFetchQry);
! 		check_sql_result(res, AH->connection, blobFetchQry, PGRES_TUPLES_OK);
  
  		/* Process the tuples, if any */
  		ntups = PQntuples(res);
*************** dumpBlobs(Archive *AH, void *arg)
*** 2381,2413 ****
  
  			blobOid = atooid(PQgetvalue(res, i, 0));
  			/* Open the BLOB */
! 			loFd = lo_open(g_conn, blobOid, INV_READ);
  			if (loFd == -1)
  			{
  				write_msg(NULL, "could not open large object %u: %s",
! 						  blobOid, PQerrorMessage(g_conn));
  				exit_nicely();
  			}
  
! 			StartBlob(AH, blobOid);
  
  			/* Now read it in chunks, sending data to archive */
  			do
  			{
! 				cnt = lo_read(g_conn, loFd, buf, LOBBUFSIZE);
  				if (cnt < 0)
  				{
  					write_msg(NULL, "error reading large object %u: %s",
! 							  blobOid, PQerrorMessage(g_conn));
  					exit_nicely();
  				}
  
! 				WriteData(AH, buf, cnt);
  			} while (cnt > 0);
  
! 			lo_close(g_conn, loFd);
  
! 			EndBlob(AH, blobOid);
  		}
  	} while (ntups > 0);
  
--- 2549,2583 ----
  
  			blobOid = atooid(PQgetvalue(res, i, 0));
  			/* Open the BLOB */
! 			loFd = lo_open(AH->connection, blobOid, INV_READ);
  			if (loFd == -1)
  			{
  				write_msg(NULL, "could not open large object %u: %s",
! 						  blobOid, PQerrorMessage(AH->connection));
  				exit_nicely();
  			}
  
! 			StartBlob(AHX, blobOid);
  
  			/* Now read it in chunks, sending data to archive */
  			do
  			{
! 				cnt = lo_read(AH->connection, loFd, buf, LOBBUFSIZE);
  				if (cnt < 0)
  				{
  					write_msg(NULL, "error reading large object %u: %s",
! 							  blobOid, PQerrorMessage(AH->connection));
  					exit_nicely();
  				}
  
! 				/* we try to avoid writing empty chunks */
! 				if (cnt > 0)
! 					WriteData(AHX, buf, cnt);
  			} while (cnt > 0);
  
! 			lo_close(AH->connection, loFd);
  
! 			EndBlob(AHX, blobOid);
  		}
  	} while (ntups > 0);
  
*************** getTables(int *numTables)
*** 3929,3934 ****
--- 4099,4105 ----
  	int			i_reloptions;
  	int			i_toastreloptions;
  	int			i_reloftype;
+ 	int			i_relpages;
  
  	/* Make sure we are in proper schema */
  	selectSourceSchema("pg_catalog");
*************** getTables(int *numTables)
*** 3968,3973 ****
--- 4139,4145 ----
  						  "c.relfrozenxid, tc.oid AS toid, "
  						  "tc.relfrozenxid AS tfrozenxid, "
  						  "c.relpersistence, "
+ 						  "c.relpages, "
  						  "CASE WHEN c.reloftype <> 0 THEN c.reloftype::pg_catalog.regtype ELSE NULL END AS reloftype, "
  						  "d.refobjid AS owning_tab, "
  						  "d.refobjsubid AS owning_col, "
*************** getTables(int *numTables)
*** 4004,4009 ****
--- 4176,4182 ----
  						  "c.relfrozenxid, tc.oid AS toid, "
  						  "tc.relfrozenxid AS tfrozenxid, "
  						  "'p' AS relpersistence, "
+ 						  "c.relpages, "
  						  "CASE WHEN c.reloftype <> 0 THEN c.reloftype::pg_catalog.regtype ELSE NULL END AS reloftype, "
  						  "d.refobjid AS owning_tab, "
  						  "d.refobjsubid AS owning_col, "
*************** getTables(int *numTables)
*** 4039,4044 ****
--- 4212,4218 ----
  						  "c.relfrozenxid, tc.oid AS toid, "
  						  "tc.relfrozenxid AS tfrozenxid, "
  						  "'p' AS relpersistence, "
+ 						  "c.relpages, "
  						  "NULL AS reloftype, "
  						  "d.refobjid AS owning_tab, "
  						  "d.refobjsubid AS owning_col, "
*************** getTables(int *numTables)
*** 4074,4079 ****
--- 4248,4254 ----
  						  "c.relfrozenxid, tc.oid AS toid, "
  						  "tc.relfrozenxid AS tfrozenxid, "
  						  "'p' AS relpersistence, "
+ 						  "c.relpages, "
  						  "NULL AS reloftype, "
  						  "d.refobjid AS owning_tab, "
  						  "d.refobjsubid AS owning_col, "
*************** getTables(int *numTables)
*** 4110,4115 ****
--- 4285,4291 ----
  						  "0 AS toid, "
  						  "0 AS tfrozenxid, "
  						  "'p' AS relpersistence, "
+ 						  "relpages, "
  						  "NULL AS reloftype, "
  						  "d.refobjid AS owning_tab, "
  						  "d.refobjsubid AS owning_col, "
*************** getTables(int *numTables)
*** 4145,4150 ****
--- 4321,4327 ----
  						  "0 AS toid, "
  						  "0 AS tfrozenxid, "
  						  "'p' AS relpersistence, "
+ 						  "relpages, "
  						  "NULL AS reloftype, "
  						  "d.refobjid AS owning_tab, "
  						  "d.refobjsubid AS owning_col, "
*************** getTables(int *numTables)
*** 4176,4181 ****
--- 4353,4359 ----
  						  "0 AS toid, "
  						  "0 AS tfrozenxid, "
  						  "'p' AS relpersistence, "
+ 						  "relpages, "
  						  "NULL AS reloftype, "
  						  "NULL::oid AS owning_tab, "
  						  "NULL::int4 AS owning_col, "
*************** getTables(int *numTables)
*** 4202,4207 ****
--- 4380,4386 ----
  						  "0 AS toid, "
  						  "0 AS tfrozenxid, "
  						  "'p' AS relpersistence, "
+ 						  "relpages, "
  						  "NULL AS reloftype, "
  						  "NULL::oid AS owning_tab, "
  						  "NULL::int4 AS owning_col, "
*************** getTables(int *numTables)
*** 4238,4243 ****
--- 4417,4423 ----
  						  "0 AS toid, "
  						  "0 AS tfrozenxid, "
  						  "'p' AS relpersistence, "
+ 						  "0 AS relpages, "
  						  "NULL AS reloftype, "
  						  "NULL::oid AS owning_tab, "
  						  "NULL::int4 AS owning_col, "
*************** getTables(int *numTables)
*** 4292,4297 ****
--- 4472,4478 ----
  	i_reloptions = PQfnumber(res, "reloptions");
  	i_toastreloptions = PQfnumber(res, "toast_reloptions");
  	i_reloftype = PQfnumber(res, "reloftype");
+ 	i_relpages = PQfnumber(res, "relpages");
  
  	if (lockWaitTimeout && g_fout->remoteVersion >= 70300)
  	{
*************** getTables(int *numTables)
*** 4346,4351 ****
--- 4527,4533 ----
  		tblinfo[i].reltablespace = pg_strdup(PQgetvalue(res, i, i_reltablespace));
  		tblinfo[i].reloptions = pg_strdup(PQgetvalue(res, i, i_reloptions));
  		tblinfo[i].toast_reloptions = pg_strdup(PQgetvalue(res, i, i_toastreloptions));
+ 		tblinfo[i].relpages = atoi(PQgetvalue(res, i, i_relpages));
  
  		/* other fields were zeroed above */
  
*************** getTables(int *numTables)
*** 4375,4381 ****
  			appendPQExpBuffer(query,
  							  "LOCK TABLE %s IN ACCESS SHARE MODE",
  						 fmtQualifiedId(tblinfo[i].dobj.namespace->dobj.name,
! 										tblinfo[i].dobj.name));
  			do_sql_command(g_conn, query->data);
  		}
  
--- 4557,4564 ----
  			appendPQExpBuffer(query,
  							  "LOCK TABLE %s IN ACCESS SHARE MODE",
  						 fmtQualifiedId(tblinfo[i].dobj.namespace->dobj.name,
! 										tblinfo[i].dobj.name,
! 										g_fout->remoteVersion));
  			do_sql_command(g_conn, query->data);
  		}
  
*************** dumpComment(Archive *fout, const char *t
*** 6874,6880 ****
  		 * post-data.
  		 */
  		ArchiveEntry(fout, nilCatalogId, createDumpId(),
! 					 target, namespace, NULL, owner,
  					 false, "COMMENT", SECTION_NONE,
  					 query->data, "", NULL,
  					 &(dumpId), 1,
--- 7057,7063 ----
  		 * post-data.
  		 */
  		ArchiveEntry(fout, nilCatalogId, createDumpId(),
! 					 target, namespace, NULL, owner, 0,
  					 false, "COMMENT", SECTION_NONE,
  					 query->data, "", NULL,
  					 &(dumpId), 1,
*************** dumpTableComment(Archive *fout, TableInf
*** 6935,6941 ****
  			ArchiveEntry(fout, nilCatalogId, createDumpId(),
  						 target->data,
  						 tbinfo->dobj.namespace->dobj.name,
! 						 NULL, tbinfo->rolname,
  						 false, "COMMENT", SECTION_NONE,
  						 query->data, "", NULL,
  						 &(tbinfo->dobj.dumpId), 1,
--- 7118,7124 ----
  			ArchiveEntry(fout, nilCatalogId, createDumpId(),
  						 target->data,
  						 tbinfo->dobj.namespace->dobj.name,
! 						 NULL, tbinfo->rolname, 0,
  						 false, "COMMENT", SECTION_NONE,
  						 query->data, "", NULL,
  						 &(tbinfo->dobj.dumpId), 1,
*************** dumpTableComment(Archive *fout, TableInf
*** 6957,6963 ****
  			ArchiveEntry(fout, nilCatalogId, createDumpId(),
  						 target->data,
  						 tbinfo->dobj.namespace->dobj.name,
! 						 NULL, tbinfo->rolname,
  						 false, "COMMENT", SECTION_NONE,
  						 query->data, "", NULL,
  						 &(tbinfo->dobj.dumpId), 1,
--- 7140,7146 ----
  			ArchiveEntry(fout, nilCatalogId, createDumpId(),
  						 target->data,
  						 tbinfo->dobj.namespace->dobj.name,
! 						 NULL, tbinfo->rolname, 0,
  						 false, "COMMENT", SECTION_NONE,
  						 query->data, "", NULL,
  						 &(tbinfo->dobj.dumpId), 1,
*************** dumpDumpableObject(Archive *fout, Dumpab
*** 7265,7271 ****
  			break;
  		case DO_BLOB_DATA:
  			ArchiveEntry(fout, dobj->catId, dobj->dumpId,
! 						 dobj->name, NULL, NULL, "",
  						 false, "BLOBS", SECTION_DATA,
  						 "", "", NULL,
  						 dobj->dependencies, dobj->nDeps,
--- 7448,7454 ----
  			break;
  		case DO_BLOB_DATA:
  			ArchiveEntry(fout, dobj->catId, dobj->dumpId,
! 						 dobj->name, NULL, NULL, "", 0,
  						 false, "BLOBS", SECTION_DATA,
  						 "", "", NULL,
  						 dobj->dependencies, dobj->nDeps,
*************** dumpNamespace(Archive *fout, NamespaceIn
*** 7312,7318 ****
  	ArchiveEntry(fout, nspinfo->dobj.catId, nspinfo->dobj.dumpId,
  				 nspinfo->dobj.name,
  				 NULL, NULL,
! 				 nspinfo->rolname,
  				 false, "SCHEMA", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 nspinfo->dobj.dependencies, nspinfo->dobj.nDeps,
--- 7495,7501 ----
  	ArchiveEntry(fout, nspinfo->dobj.catId, nspinfo->dobj.dumpId,
  				 nspinfo->dobj.name,
  				 NULL, NULL,
! 				 nspinfo->rolname, 0,
  				 false, "SCHEMA", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 nspinfo->dobj.dependencies, nspinfo->dobj.nDeps,
*************** dumpExtension(Archive *fout, ExtensionIn
*** 7430,7436 ****
  	ArchiveEntry(fout, extinfo->dobj.catId, extinfo->dobj.dumpId,
  				 extinfo->dobj.name,
  				 NULL, NULL,
! 				 "",
  				 false, "EXTENSION", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 extinfo->dobj.dependencies, extinfo->dobj.nDeps,
--- 7613,7619 ----
  	ArchiveEntry(fout, extinfo->dobj.catId, extinfo->dobj.dumpId,
  				 extinfo->dobj.name,
  				 NULL, NULL,
! 				 "", 0,
  				 false, "EXTENSION", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 extinfo->dobj.dependencies, extinfo->dobj.nDeps,
*************** dumpEnumType(Archive *fout, TypeInfo *ty
*** 7578,7584 ****
  				 tyinfo->dobj.name,
  				 tyinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 tyinfo->rolname, false,
  				 "TYPE", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 tyinfo->dobj.dependencies, tyinfo->dobj.nDeps,
--- 7761,7767 ----
  				 tyinfo->dobj.name,
  				 tyinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 tyinfo->rolname, 0, false,
  				 "TYPE", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 tyinfo->dobj.dependencies, tyinfo->dobj.nDeps,
*************** dumpRangeType(Archive *fout, TypeInfo *t
*** 7709,7715 ****
  				 tyinfo->dobj.name,
  				 tyinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 tyinfo->rolname, false,
  				 "TYPE", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 tyinfo->dobj.dependencies, tyinfo->dobj.nDeps,
--- 7892,7898 ----
  				 tyinfo->dobj.name,
  				 tyinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 tyinfo->rolname, 0, false,
  				 "TYPE", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 tyinfo->dobj.dependencies, tyinfo->dobj.nDeps,
*************** dumpBaseType(Archive *fout, TypeInfo *ty
*** 8103,8109 ****
  				 tyinfo->dobj.name,
  				 tyinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 tyinfo->rolname, false,
  				 "TYPE", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 tyinfo->dobj.dependencies, tyinfo->dobj.nDeps,
--- 8286,8292 ----
  				 tyinfo->dobj.name,
  				 tyinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 tyinfo->rolname, 0, false,
  				 "TYPE", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 tyinfo->dobj.dependencies, tyinfo->dobj.nDeps,
*************** dumpDomain(Archive *fout, TypeInfo *tyin
*** 8270,8276 ****
  				 tyinfo->dobj.name,
  				 tyinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 tyinfo->rolname, false,
  				 "DOMAIN", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 tyinfo->dobj.dependencies, tyinfo->dobj.nDeps,
--- 8453,8459 ----
  				 tyinfo->dobj.name,
  				 tyinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 tyinfo->rolname, 0, false,
  				 "DOMAIN", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 tyinfo->dobj.dependencies, tyinfo->dobj.nDeps,
*************** dumpCompositeType(Archive *fout, TypeInf
*** 8477,8483 ****
  				 tyinfo->dobj.name,
  				 tyinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 tyinfo->rolname, false,
  				 "TYPE", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 tyinfo->dobj.dependencies, tyinfo->dobj.nDeps,
--- 8660,8666 ----
  				 tyinfo->dobj.name,
  				 tyinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 tyinfo->rolname, 0, false,
  				 "TYPE", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 tyinfo->dobj.dependencies, tyinfo->dobj.nDeps,
*************** dumpCompositeTypeColComments(Archive *fo
*** 8597,8603 ****
  			ArchiveEntry(fout, nilCatalogId, createDumpId(),
  						 target->data,
  						 tyinfo->dobj.namespace->dobj.name,
! 						 NULL, tyinfo->rolname,
  						 false, "COMMENT", SECTION_NONE,
  						 query->data, "", NULL,
  						 &(tyinfo->dobj.dumpId), 1,
--- 8780,8786 ----
  			ArchiveEntry(fout, nilCatalogId, createDumpId(),
  						 target->data,
  						 tyinfo->dobj.namespace->dobj.name,
! 						 NULL, tyinfo->rolname, 0,
  						 false, "COMMENT", SECTION_NONE,
  						 query->data, "", NULL,
  						 &(tyinfo->dobj.dumpId), 1,
*************** dumpShellType(Archive *fout, ShellTypeIn
*** 8650,8656 ****
  				 stinfo->dobj.name,
  				 stinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 stinfo->baseType->rolname, false,
  				 "SHELL TYPE", SECTION_PRE_DATA,
  				 q->data, "", NULL,
  				 stinfo->dobj.dependencies, stinfo->dobj.nDeps,
--- 8833,8839 ----
  				 stinfo->dobj.name,
  				 stinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 stinfo->baseType->rolname, 0, false,
  				 "SHELL TYPE", SECTION_PRE_DATA,
  				 q->data, "", NULL,
  				 stinfo->dobj.dependencies, stinfo->dobj.nDeps,
*************** dumpProcLang(Archive *fout, ProcLangInfo
*** 8824,8830 ****
  
  	ArchiveEntry(fout, plang->dobj.catId, plang->dobj.dumpId,
  				 plang->dobj.name,
! 				 lanschema, NULL, plang->lanowner,
  				 false, "PROCEDURAL LANGUAGE", SECTION_PRE_DATA,
  				 defqry->data, delqry->data, NULL,
  				 plang->dobj.dependencies, plang->dobj.nDeps,
--- 9007,9013 ----
  
  	ArchiveEntry(fout, plang->dobj.catId, plang->dobj.dumpId,
  				 plang->dobj.name,
! 				 lanschema, NULL, plang->lanowner, 0,
  				 false, "PROCEDURAL LANGUAGE", SECTION_PRE_DATA,
  				 defqry->data, delqry->data, NULL,
  				 plang->dobj.dependencies, plang->dobj.nDeps,
*************** dumpFunc(Archive *fout, FuncInfo *finfo)
*** 9394,9400 ****
  				 funcsig_tag,
  				 finfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 finfo->rolname, false,
  				 "FUNCTION", SECTION_PRE_DATA,
  				 q->data, delqry->data, NULL,
  				 finfo->dobj.dependencies, finfo->dobj.nDeps,
--- 9577,9583 ----
  				 funcsig_tag,
  				 finfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 finfo->rolname, 0, false,
  				 "FUNCTION", SECTION_PRE_DATA,
  				 q->data, delqry->data, NULL,
  				 finfo->dobj.dependencies, finfo->dobj.nDeps,
*************** dumpCast(Archive *fout, CastInfo *cast)
*** 9558,9564 ****
  
  	ArchiveEntry(fout, cast->dobj.catId, cast->dobj.dumpId,
  				 labelq->data,
! 				 "pg_catalog", NULL, "",
  				 false, "CAST", SECTION_PRE_DATA,
  				 defqry->data, delqry->data, NULL,
  				 cast->dobj.dependencies, cast->dobj.nDeps,
--- 9741,9747 ----
  
  	ArchiveEntry(fout, cast->dobj.catId, cast->dobj.dumpId,
  				 labelq->data,
! 				 "pg_catalog", NULL, "", 0,
  				 false, "CAST", SECTION_PRE_DATA,
  				 defqry->data, delqry->data, NULL,
  				 cast->dobj.dependencies, cast->dobj.nDeps,
*************** dumpOpr(Archive *fout, OprInfo *oprinfo)
*** 9805,9811 ****
  				 oprinfo->dobj.name,
  				 oprinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 oprinfo->rolname,
  				 false, "OPERATOR", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 oprinfo->dobj.dependencies, oprinfo->dobj.nDeps,
--- 9988,9994 ----
  				 oprinfo->dobj.name,
  				 oprinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 oprinfo->rolname, 0,
  				 false, "OPERATOR", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 oprinfo->dobj.dependencies, oprinfo->dobj.nDeps,
*************** dumpOpclass(Archive *fout, OpclassInfo *
*** 10338,10344 ****
  				 opcinfo->dobj.name,
  				 opcinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 opcinfo->rolname,
  				 false, "OPERATOR CLASS", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 opcinfo->dobj.dependencies, opcinfo->dobj.nDeps,
--- 10521,10527 ----
  				 opcinfo->dobj.name,
  				 opcinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 opcinfo->rolname, 0,
  				 false, "OPERATOR CLASS", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 opcinfo->dobj.dependencies, opcinfo->dobj.nDeps,
*************** dumpOpfamily(Archive *fout, OpfamilyInfo
*** 10666,10672 ****
  				 opfinfo->dobj.name,
  				 opfinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 opfinfo->rolname,
  				 false, "OPERATOR FAMILY", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 opfinfo->dobj.dependencies, opfinfo->dobj.nDeps,
--- 10849,10855 ----
  				 opfinfo->dobj.name,
  				 opfinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 opfinfo->rolname, 0,
  				 false, "OPERATOR FAMILY", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 opfinfo->dobj.dependencies, opfinfo->dobj.nDeps,
*************** dumpCollation(Archive *fout, CollInfo *c
*** 10768,10774 ****
  				 collinfo->dobj.name,
  				 collinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 collinfo->rolname,
  				 false, "COLLATION", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 collinfo->dobj.dependencies, collinfo->dobj.nDeps,
--- 10951,10957 ----
  				 collinfo->dobj.name,
  				 collinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 collinfo->rolname, 0,
  				 false, "COLLATION", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 collinfo->dobj.dependencies, collinfo->dobj.nDeps,
*************** dumpConversion(Archive *fout, ConvInfo *
*** 10880,10886 ****
  				 convinfo->dobj.name,
  				 convinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 convinfo->rolname,
  				 false, "CONVERSION", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 convinfo->dobj.dependencies, convinfo->dobj.nDeps,
--- 11063,11069 ----
  				 convinfo->dobj.name,
  				 convinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 convinfo->rolname, 0,
  				 false, "CONVERSION", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 convinfo->dobj.dependencies, convinfo->dobj.nDeps,
*************** dumpAgg(Archive *fout, AggInfo *agginfo)
*** 11129,11135 ****
  				 aggsig_tag,
  				 agginfo->aggfn.dobj.namespace->dobj.name,
  				 NULL,
! 				 agginfo->aggfn.rolname,
  				 false, "AGGREGATE", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 agginfo->aggfn.dobj.dependencies, agginfo->aggfn.dobj.nDeps,
--- 11312,11318 ----
  				 aggsig_tag,
  				 agginfo->aggfn.dobj.namespace->dobj.name,
  				 NULL,
! 				 agginfo->aggfn.rolname, 0,
  				 false, "AGGREGATE", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 agginfo->aggfn.dobj.dependencies, agginfo->aggfn.dobj.nDeps,
*************** dumpTSParser(Archive *fout, TSParserInfo
*** 11228,11233 ****
--- 11411,11417 ----
  				 prsinfo->dobj.namespace->dobj.name,
  				 NULL,
  				 "",
+ 				 0,
  				 false, "TEXT SEARCH PARSER", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 prsinfo->dobj.dependencies, prsinfo->dobj.nDeps,
*************** dumpTSDictionary(Archive *fout, TSDictIn
*** 11326,11331 ****
--- 11510,11516 ----
  				 dictinfo->dobj.namespace->dobj.name,
  				 NULL,
  				 dictinfo->rolname,
+ 				 0,
  				 false, "TEXT SEARCH DICTIONARY", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 dictinfo->dobj.dependencies, dictinfo->dobj.nDeps,
*************** dumpTSTemplate(Archive *fout, TSTemplate
*** 11392,11397 ****
--- 11577,11583 ----
  				 tmplinfo->dobj.namespace->dobj.name,
  				 NULL,
  				 "",
+ 				 0,
  				 false, "TEXT SEARCH TEMPLATE", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 tmplinfo->dobj.dependencies, tmplinfo->dobj.nDeps,
*************** dumpTSConfig(Archive *fout, TSConfigInfo
*** 11531,11536 ****
--- 11717,11723 ----
  				 cfginfo->dobj.namespace->dobj.name,
  				 NULL,
  				 cfginfo->rolname,
+ 				 0,
  				 false, "TEXT SEARCH CONFIGURATION", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 cfginfo->dobj.dependencies, cfginfo->dobj.nDeps,
*************** dumpForeignDataWrapper(Archive *fout, Fd
*** 11605,11610 ****
--- 11792,11798 ----
  				 NULL,
  				 NULL,
  				 fdwinfo->rolname,
+ 				 0,
  				 false, "FOREIGN DATA WRAPPER", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 fdwinfo->dobj.dependencies, fdwinfo->dobj.nDeps,
*************** dumpForeignServer(Archive *fout, Foreign
*** 11708,11713 ****
--- 11896,11902 ----
  				 NULL,
  				 NULL,
  				 srvinfo->rolname,
+ 				 0,
  				 false, "SERVER", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 srvinfo->dobj.dependencies, srvinfo->dobj.nDeps,
*************** dumpUserMappings(Archive *fout,
*** 11825,11831 ****
  					 tag->data,
  					 namespace,
  					 NULL,
! 					 owner, false,
  					 "USER MAPPING", SECTION_PRE_DATA,
  					 q->data, delq->data, NULL,
  					 &dumpId, 1,
--- 12014,12020 ----
  					 tag->data,
  					 namespace,
  					 NULL,
! 					 owner, 0, false,
  					 "USER MAPPING", SECTION_PRE_DATA,
  					 q->data, delq->data, NULL,
  					 &dumpId, 1,
*************** dumpDefaultACL(Archive *fout, DefaultACL
*** 11896,11901 ****
--- 12085,12091 ----
  	   daclinfo->dobj.namespace ? daclinfo->dobj.namespace->dobj.name : NULL,
  				 NULL,
  				 daclinfo->defaclrole,
+ 				 0,
  				 false, "DEFAULT ACL", SECTION_NONE,
  				 q->data, "", NULL,
  				 daclinfo->dobj.dependencies, daclinfo->dobj.nDeps,
*************** dumpACL(Archive *fout, CatalogId objCatI
*** 11953,11958 ****
--- 12143,12149 ----
  					 tag, nspname,
  					 NULL,
  					 owner ? owner : "",
+ 					 0,
  					 false, "ACL", SECTION_NONE,
  					 sql->data, "", NULL,
  					 &(objDumpId), 1,
*************** dumpSecLabel(Archive *fout, const char *
*** 12029,12035 ****
  	{
  		ArchiveEntry(fout, nilCatalogId, createDumpId(),
  					 target, namespace, NULL, owner,
! 					 false, "SECURITY LABEL", SECTION_NONE,
  					 query->data, "", NULL,
  					 &(dumpId), 1,
  					 NULL, NULL);
--- 12220,12226 ----
  	{
  		ArchiveEntry(fout, nilCatalogId, createDumpId(),
  					 target, namespace, NULL, owner,
! 					 0, false, "SECURITY LABEL", SECTION_NONE,
  					 query->data, "", NULL,
  					 &(dumpId), 1,
  					 NULL, NULL);
*************** dumpTableSecLabel(Archive *fout, TableIn
*** 12107,12113 ****
  					 target->data,
  					 tbinfo->dobj.namespace->dobj.name,
  					 NULL, tbinfo->rolname,
! 					 false, "SECURITY LABEL", SECTION_NONE,
  					 query->data, "", NULL,
  					 &(tbinfo->dobj.dumpId), 1,
  					 NULL, NULL);
--- 12298,12304 ----
  					 target->data,
  					 tbinfo->dobj.namespace->dobj.name,
  					 NULL, tbinfo->rolname,
! 					 0, false, "SECURITY LABEL", SECTION_NONE,
  					 query->data, "", NULL,
  					 &(tbinfo->dobj.dumpId), 1,
  					 NULL, NULL);
*************** dumpTableSchema(Archive *fout, TableInfo
*** 12888,12893 ****
--- 13079,13085 ----
  				 tbinfo->dobj.namespace->dobj.name,
  			(tbinfo->relkind == RELKIND_VIEW) ? NULL : tbinfo->reltablespace,
  				 tbinfo->rolname,
+ 				 0,
  			   (strcmp(reltypename, "TABLE") == 0) ? tbinfo->hasoids : false,
  				 reltypename, SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
*************** dumpAttrDef(Archive *fout, AttrDefInfo *
*** 12961,12966 ****
--- 13153,13159 ----
  				 tbinfo->dobj.namespace->dobj.name,
  				 NULL,
  				 tbinfo->rolname,
+ 				 0,
  				 false, "DEFAULT", SECTION_PRE_DATA,
  				 q->data, delq->data, NULL,
  				 adinfo->dobj.dependencies, adinfo->dobj.nDeps,
*************** dumpIndex(Archive *fout, IndxInfo *indxi
*** 13062,13068 ****
  					 indxinfo->dobj.name,
  					 tbinfo->dobj.namespace->dobj.name,
  					 indxinfo->tablespace,
! 					 tbinfo->rolname, false,
  					 "INDEX", SECTION_POST_DATA,
  					 q->data, delq->data, NULL,
  					 indxinfo->dobj.dependencies, indxinfo->dobj.nDeps,
--- 13255,13261 ----
  					 indxinfo->dobj.name,
  					 tbinfo->dobj.namespace->dobj.name,
  					 indxinfo->tablespace,
! 					 tbinfo->rolname, indxinfo->relpages, false,
  					 "INDEX", SECTION_POST_DATA,
  					 q->data, delq->data, NULL,
  					 indxinfo->dobj.dependencies, indxinfo->dobj.nDeps,
*************** dumpConstraint(Archive *fout, Constraint
*** 13185,13191 ****
  					 coninfo->dobj.name,
  					 tbinfo->dobj.namespace->dobj.name,
  					 indxinfo->tablespace,
! 					 tbinfo->rolname, false,
  					 "CONSTRAINT", SECTION_POST_DATA,
  					 q->data, delq->data, NULL,
  					 coninfo->dobj.dependencies, coninfo->dobj.nDeps,
--- 13378,13384 ----
  					 coninfo->dobj.name,
  					 tbinfo->dobj.namespace->dobj.name,
  					 indxinfo->tablespace,
! 					 tbinfo->rolname, 0, false,
  					 "CONSTRAINT", SECTION_POST_DATA,
  					 q->data, delq->data, NULL,
  					 coninfo->dobj.dependencies, coninfo->dobj.nDeps,
*************** dumpConstraint(Archive *fout, Constraint
*** 13218,13224 ****
  					 coninfo->dobj.name,
  					 tbinfo->dobj.namespace->dobj.name,
  					 NULL,
! 					 tbinfo->rolname, false,
  					 "FK CONSTRAINT", SECTION_POST_DATA,
  					 q->data, delq->data, NULL,
  					 coninfo->dobj.dependencies, coninfo->dobj.nDeps,
--- 13411,13417 ----
  					 coninfo->dobj.name,
  					 tbinfo->dobj.namespace->dobj.name,
  					 NULL,
! 					 tbinfo->rolname, 0, false,
  					 "FK CONSTRAINT", SECTION_POST_DATA,
  					 q->data, delq->data, NULL,
  					 coninfo->dobj.dependencies, coninfo->dobj.nDeps,
*************** dumpConstraint(Archive *fout, Constraint
*** 13253,13259 ****
  						 coninfo->dobj.name,
  						 tbinfo->dobj.namespace->dobj.name,
  						 NULL,
! 						 tbinfo->rolname, false,
  						 "CHECK CONSTRAINT", SECTION_POST_DATA,
  						 q->data, delq->data, NULL,
  						 coninfo->dobj.dependencies, coninfo->dobj.nDeps,
--- 13446,13452 ----
  						 coninfo->dobj.name,
  						 tbinfo->dobj.namespace->dobj.name,
  						 NULL,
! 						 tbinfo->rolname, 0, false,
  						 "CHECK CONSTRAINT", SECTION_POST_DATA,
  						 q->data, delq->data, NULL,
  						 coninfo->dobj.dependencies, coninfo->dobj.nDeps,
*************** dumpConstraint(Archive *fout, Constraint
*** 13289,13295 ****
  						 coninfo->dobj.name,
  						 tyinfo->dobj.namespace->dobj.name,
  						 NULL,
! 						 tyinfo->rolname, false,
  						 "CHECK CONSTRAINT", SECTION_POST_DATA,
  						 q->data, delq->data, NULL,
  						 coninfo->dobj.dependencies, coninfo->dobj.nDeps,
--- 13482,13488 ----
  						 coninfo->dobj.name,
  						 tyinfo->dobj.namespace->dobj.name,
  						 NULL,
! 						 tyinfo->rolname, 0, false,
  						 "CHECK CONSTRAINT", SECTION_POST_DATA,
  						 q->data, delq->data, NULL,
  						 coninfo->dobj.dependencies, coninfo->dobj.nDeps,
*************** dumpSequence(Archive *fout, TableInfo *t
*** 13579,13585 ****
  					 tbinfo->dobj.name,
  					 tbinfo->dobj.namespace->dobj.name,
  					 NULL,
! 					 tbinfo->rolname,
  					 false, "SEQUENCE", SECTION_PRE_DATA,
  					 query->data, delqry->data, NULL,
  					 tbinfo->dobj.dependencies, tbinfo->dobj.nDeps,
--- 13772,13778 ----
  					 tbinfo->dobj.name,
  					 tbinfo->dobj.namespace->dobj.name,
  					 NULL,
! 					 tbinfo->rolname, 0,
  					 false, "SEQUENCE", SECTION_PRE_DATA,
  					 query->data, delqry->data, NULL,
  					 tbinfo->dobj.dependencies, tbinfo->dobj.nDeps,
*************** dumpSequence(Archive *fout, TableInfo *t
*** 13615,13621 ****
  							 tbinfo->dobj.name,
  							 tbinfo->dobj.namespace->dobj.name,
  							 NULL,
! 							 tbinfo->rolname,
  							 false, "SEQUENCE OWNED BY", SECTION_PRE_DATA,
  							 query->data, "", NULL,
  							 &(tbinfo->dobj.dumpId), 1,
--- 13808,13814 ----
  							 tbinfo->dobj.name,
  							 tbinfo->dobj.namespace->dobj.name,
  							 NULL,
! 							 tbinfo->rolname, 0,
  							 false, "SEQUENCE OWNED BY", SECTION_PRE_DATA,
  							 query->data, "", NULL,
  							 &(tbinfo->dobj.dumpId), 1,
*************** dumpSequence(Archive *fout, TableInfo *t
*** 13645,13650 ****
--- 13838,13844 ----
  					 tbinfo->dobj.namespace->dobj.name,
  					 NULL,
  					 tbinfo->rolname,
+ 					 0,
  					 false, "SEQUENCE SET", SECTION_PRE_DATA,
  					 query->data, "", NULL,
  					 &(tbinfo->dobj.dumpId), 1,
*************** dumpTrigger(Archive *fout, TriggerInfo *
*** 13845,13851 ****
  				 tginfo->dobj.name,
  				 tbinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 tbinfo->rolname, false,
  				 "TRIGGER", SECTION_POST_DATA,
  				 query->data, delqry->data, NULL,
  				 tginfo->dobj.dependencies, tginfo->dobj.nDeps,
--- 14039,14045 ----
  				 tginfo->dobj.name,
  				 tbinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 tbinfo->rolname, 0, false,
  				 "TRIGGER", SECTION_POST_DATA,
  				 query->data, delqry->data, NULL,
  				 tginfo->dobj.dependencies, tginfo->dobj.nDeps,
*************** dumpRule(Archive *fout, RuleInfo *rinfo)
*** 13967,13973 ****
  				 rinfo->dobj.name,
  				 tbinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 tbinfo->rolname, false,
  				 "RULE", SECTION_POST_DATA,
  				 cmd->data, delcmd->data, NULL,
  				 rinfo->dobj.dependencies, rinfo->dobj.nDeps,
--- 14161,14167 ----
  				 rinfo->dobj.name,
  				 tbinfo->dobj.namespace->dobj.name,
  				 NULL,
! 				 tbinfo->rolname, 0, false,
  				 "RULE", SECTION_POST_DATA,
  				 cmd->data, delcmd->data, NULL,
  				 rinfo->dobj.dependencies, rinfo->dobj.nDeps,
*************** getDependencies(void)
*** 14268,14273 ****
--- 14462,14509 ----
  	destroyPQExpBuffer(query);
  }
  
+ /*
+  * Select the source schema and use a static var to remember what we have set
+  * as the default schema right now.  This function is never called in parallel
+  * context, so the static var is okay. The parallel context will call
+  * selectSourceSchemaOnAH, and this remembers the current schema via
+  * AH->currSchema.
+  */
+ static void
+ selectSourceSchema(const char *schemaName)
+ {
+ 	static char *currSchemaName = NULL;	/* last schema applied to g_conn */
+ 
+ 	if (!schemaName || *schemaName == '\0' ||
+ 		(currSchemaName && strcmp(currSchemaName, schemaName) == 0))
+ 		return;					/* no need to do anything */
+ 
+ 	selectSourceSchemaOnConnection(g_conn, schemaName);
+ 
+ 	if (currSchemaName)
+ 		free(currSchemaName);	/* NOTE(review): free(NULL) is a no-op, so this guard is redundant */
+ 	currSchemaName = pg_strdup(schemaName);
+ }
+ 
+ /*
+  * This function lets a DataDumper function select a schema on an
+  * ArchiveHandle. These functions can be called from a threaded program for
+  * parallel dump/restore and must therefore not access global variables (read
+  * only access to g_fout->remoteVersion is okay however).
+  */
+ static void
+ selectSourceSchemaOnAH(ArchiveHandle *AH, const char *schemaName)
+ {
+ 	if (!schemaName || *schemaName == '\0' ||
+ 		(AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
+ 		return;					/* no need to do anything */
+ 
+ 	selectSourceSchemaOnConnection(AH->connection, schemaName);	/* uses AH's own connection, not g_conn */
+ 
+ 	if (AH->currSchema)
+ 		free(AH->currSchema);	/* NOTE(review): free(NULL) is a no-op; guard is redundant */
+ 	AH->currSchema = pg_strdup(schemaName);
+ }
  
  /*
   * selectSourceSchema - make the specified schema the active search path
*************** getDependencies(void)
*** 14280,14301 ****
   *
   * Whenever the selected schema is not pg_catalog, be careful to qualify
   * references to system catalogs and types in our emitted commands!
   */
  static void
! selectSourceSchema(const char *schemaName)
  {
- 	static char *curSchemaName = NULL;
  	PQExpBuffer query;
  
  	/* Not relevant if fetching from pre-7.3 DB */
  	if (g_fout->remoteVersion < 70300)
  		return;
- 	/* Ignore null schema names */
- 	if (schemaName == NULL || *schemaName == '\0')
- 		return;
- 	/* Optimize away repeated selection of same schema */
- 	if (curSchemaName && strcmp(curSchemaName, schemaName) == 0)
- 		return;
  
  	query = createPQExpBuffer();
  	appendPQExpBuffer(query, "SET search_path = %s",
--- 14516,14536 ----
   *
   * Whenever the selected schema is not pg_catalog, be careful to qualify
   * references to system catalogs and types in our emitted commands!
+  *
+  * This function is called only from selectSourceSchemaOnAH and
+  * selectSourceSchema.
   */
  static void
! selectSourceSchemaOnConnection(PGconn *conn, const char *schemaName)
  {
  	PQExpBuffer query;
  
+ 	/* This is checked by the callers already */
+ 	Assert(schemaName != NULL && *schemaName != '\0');
+ 
  	/* Not relevant if fetching from pre-7.3 DB */
  	if (g_fout->remoteVersion < 70300)
  		return;
  
  	query = createPQExpBuffer();
  	appendPQExpBuffer(query, "SET search_path = %s",
*************** selectSourceSchema(const char *schemaNam
*** 14303,14314 ****
  	if (strcmp(schemaName, "pg_catalog") != 0)
  		appendPQExpBuffer(query, ", pg_catalog");
  
! 	do_sql_command(g_conn, query->data);
  
  	destroyPQExpBuffer(query);
- 	if (curSchemaName)
- 		free(curSchemaName);
- 	curSchemaName = pg_strdup(schemaName);
  }
  
  /*
--- 14538,14546 ----
  	if (strcmp(schemaName, "pg_catalog") != 0)
  		appendPQExpBuffer(query, ", pg_catalog");
  
! 	do_sql_command(conn, query->data);
  
  	destroyPQExpBuffer(query);
  }
  
  /*
*************** myFormatType(const char *typname, int32
*** 14459,14529 ****
  }
  
  /*
-  * fmtQualifiedId - convert a qualified name to the proper format for
-  * the source database.
-  *
-  * Like fmtId, use the result before calling again.
-  */
- static const char *
- fmtQualifiedId(const char *schema, const char *id)
- {
- 	static PQExpBuffer id_return = NULL;
- 
- 	if (id_return)				/* first time through? */
- 		resetPQExpBuffer(id_return);
- 	else
- 		id_return = createPQExpBuffer();
- 
- 	/* Suppress schema name if fetching from pre-7.3 DB */
- 	if (g_fout->remoteVersion >= 70300 && schema && *schema)
- 	{
- 		appendPQExpBuffer(id_return, "%s.",
- 						  fmtId(schema));
- 	}
- 	appendPQExpBuffer(id_return, "%s",
- 					  fmtId(id));
- 
- 	return id_return->data;
- }
- 
- /*
   * Return a column list clause for the given relation.
   *
   * Special case: if there are no undropped columns in the relation, return
   * "", not an invalid "()" column list.
   */
  static const char *
! fmtCopyColumnList(const TableInfo *ti)
  {
- 	static PQExpBuffer q = NULL;
  	int			numatts = ti->numatts;
  	char	  **attnames = ti->attnames;
  	bool	   *attisdropped = ti->attisdropped;
  	bool		needComma;
  	int			i;
  
! 	if (q)						/* first time through? */
! 		resetPQExpBuffer(q);
! 	else
! 		q = createPQExpBuffer();
! 
! 	appendPQExpBuffer(q, "(");
  	needComma = false;
  	for (i = 0; i < numatts; i++)
  	{
  		if (attisdropped[i])
  			continue;
  		if (needComma)
! 			appendPQExpBuffer(q, ", ");
! 		appendPQExpBuffer(q, "%s", fmtId(attnames[i]));
  		needComma = true;
  	}
  
  	if (!needComma)
  		return "";				/* no undropped columns */
  
! 	appendPQExpBuffer(q, ")");
! 	return q->data;
  }
  
  /*
--- 14691,14727 ----
  }
  
  /*
   * Return a column list clause for the given relation.
   *
   * Special case: if there are no undropped columns in the relation, return
   * "", not an invalid "()" column list.
   */
  static const char *
! fmtCopyColumnList(const TableInfo *ti, PQExpBuffer buffer)
  {
  	int			numatts = ti->numatts;
  	char	  **attnames = ti->attnames;
  	bool	   *attisdropped = ti->attisdropped;
  	bool		needComma;
  	int			i;
  
! 	appendPQExpBuffer(buffer, "(");	/* NOTE(review): this "(" is left in the caller's buffer even on the "" return path below — confirm callers never use buffer->data directly */
  	needComma = false;
  	for (i = 0; i < numatts; i++)
  	{
  		if (attisdropped[i])
  			continue;
  		if (needComma)
! 			appendPQExpBuffer(buffer, ", ");
! 		appendPQExpBuffer(buffer, "%s", fmtId(attnames[i]));
  		needComma = true;
  	}
  
  	if (!needComma)
  		return "";				/* no undropped columns */
  
! 	appendPQExpBuffer(buffer, ")");
! 	return buffer->data;		/* result points into the caller-provided buffer */
  }
  
  /*
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 11c4d37..dba617b 100644
*** a/src/bin/pg_dump/pg_dump.h
--- b/src/bin/pg_dump/pg_dump.h
*************** typedef struct _tableInfo
*** 257,262 ****
--- 257,263 ----
  	/* these two are set only if table is a sequence owned by a column: */
  	Oid			owning_tab;		/* OID of table owning sequence */
  	int			owning_col;		/* attr # of column owning sequence */
+ 	int			relpages;
  
  	bool		interesting;	/* true if need to collect more data */
  
*************** typedef struct _indxInfo
*** 328,333 ****
--- 329,335 ----
  	bool		indisclustered;
  	/* if there is an associated constraint object, its dumpId: */
  	DumpId		indexconstraint;
+ 	int			relpages;		/* relpages of the underlying table */
  } IndxInfo;
  
  typedef struct _ruleInfo
*************** extern void parseOidArray(const char *st
*** 532,537 ****
--- 534,540 ----
  extern void sortDumpableObjects(DumpableObject **objs, int numObjs);
  extern void sortDumpableObjectsByTypeName(DumpableObject **objs, int numObjs);
  extern void sortDumpableObjectsByTypeOid(DumpableObject **objs, int numObjs);
+ extern void sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs);
  
  /*
   * version specific routines
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index 4d1ae94..1286124 100644
*** a/src/bin/pg_dump/pg_dump_sort.c
--- b/src/bin/pg_dump/pg_dump_sort.c
*************** static void repairDependencyLoop(Dumpabl
*** 121,126 ****
--- 121,213 ----
  static void describeDumpableObject(DumpableObject *obj,
  					   char *buf, int bufsize);
  
+ static int DOSizeCompare(const void *p1, const void *p2);
+ 
+ static int
+ findFirstEqualType(DumpableObjectType type, DumpableObject **objs, int numObjs)	/* first index in objs[] whose objType matches, or -1 if none */
+ {
+ 	int i;
+ 	for (i = 0; i < numObjs; i++)
+ 		if (objs[i]->objType == type)
+ 			return i;
+ 	return -1;
+ }
+ 
+ static int
+ findFirstDifferentType(DumpableObjectType type, DumpableObject **objs, int numObjs, int start)	/* first index >= start whose objType differs; numObjs if the run reaches the array end */
+ {
+ 	int i;
+ 	for (i = start; i < numObjs; i++)
+ 		if (objs[i]->objType != type)
+ 			return i;
+ 	return numObjs;				/* was "numObjs - 1": callers use this as an exclusive qsort bound, so the old value skipped the last object */
+ }
+ 
+ /*
+  * When we do a parallel dump, we want to start with the largest items first.
+  *
+  * Say we have the objects in this order:
+  * ....DDDDD....III....
+  *
+  * with D = Table data, I = Index, . = other object
+  *
+  * This sorting function now takes each of the D or I blocks and sorts them
+  * according to their size.
+  */
+ void
+ sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs)
+ {
+ 	int		startIdx, endIdx;
+ 	void   *startPtr;
+ 
+ 	if (numObjs <= 1)
+ 		return;					/* nothing to sort */
+ 
+ 	startIdx = findFirstEqualType(DO_TABLE_DATA, objs, numObjs);
+ 	if (startIdx >= 0)			/* any table-data objects at all? */
+ 	{
+ 		endIdx = findFirstDifferentType(DO_TABLE_DATA, objs, numObjs, startIdx);
+ 		startPtr = objs + startIdx;
+ 		qsort(startPtr, endIdx - startIdx, sizeof(DumpableObject *),	/* sort just this run, biggest first */
+ 			  DOSizeCompare);
+ 	}
+ 
+ 	startIdx = findFirstEqualType(DO_INDEX, objs, numObjs);
+ 	if (startIdx >= 0)			/* any index objects at all? */
+ 	{
+ 		endIdx = findFirstDifferentType(DO_INDEX, objs, numObjs, startIdx);
+ 		startPtr = objs + startIdx;
+ 		qsort(startPtr, endIdx - startIdx, sizeof(DumpableObject *),	/* sort just this run, biggest first */
+ 			  DOSizeCompare);
+ 	}
+ }
+ 
+ static int
+ DOSizeCompare(const void *p1, const void *p2)
+ {
+ 	DumpableObject *left = *(DumpableObject **) p1;
+ 	DumpableObject *right = *(DumpableObject **) p2;
+ 	int			left_size = 0;
+ 	int			right_size = 0;
+ 
+ 	/* Only table-data and index entries carry a relpages size; others are 0. */
+ 	if (left->objType == DO_TABLE_DATA)
+ 		left_size = ((TableDataInfo *) left)->tdtable->relpages;
+ 	else if (left->objType == DO_INDEX)
+ 		left_size = ((IndxInfo *) left)->relpages;
+ 
+ 	if (right->objType == DO_TABLE_DATA)
+ 		right_size = ((TableDataInfo *) right)->tdtable->relpages;
+ 	else if (right->objType == DO_INDEX)
+ 		right_size = ((IndxInfo *) right)->relpages;
+ 
+ 	/*
+ 	 * Descending order of size: the biggest item sorts first.
+ 	 */
+ 	if (left_size != right_size)
+ 		return (left_size > right_size) ? -1 : 1;
+ 
+ 	return 0;
+ }
  
  /*
   * Sort the given objects into a type/name-based ordering
diff --git a/src/bin/pg_dump/pg_dumpall.c b/src/bin/pg_dump/pg_dumpall.c
index 4c93667..438b8f0 100644
*** a/src/bin/pg_dump/pg_dumpall.c
--- b/src/bin/pg_dump/pg_dumpall.c
*************** doShellQuoting(PQExpBuffer buf, const ch
*** 1913,1915 ****
--- 1913,1918 ----
  	appendPQExpBufferChar(buf, '"');
  #endif   /* WIN32 */
  }
+ 
+ /* Empty stub: pg_dumpall has no parallel dump/restore yet; presumably the symbol is still required at link time by the shared archiver code — confirm */
+ void _SetupWorker(Archive *AHX, RestoreOptions *ropt) {}
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index 6ff1ab8..965e9f2 100644
*** a/src/bin/pg_dump/pg_restore.c
--- b/src/bin/pg_dump/pg_restore.c
*************** main(int argc, char **argv)
*** 72,77 ****
--- 72,78 ----
  	RestoreOptions *opts;
  	int			c;
  	int			exit_code;
+ 	int			numWorkers = 1;
  	Archive    *AH;
  	char	   *inputFileSpec;
  	static int	disable_triggers = 0;
*************** main(int argc, char **argv)
*** 183,189 ****
  				break;
  
  			case 'j':			/* number of restore jobs */
! 				opts->number_of_jobs = atoi(optarg);
  				break;
  
  			case 'l':			/* Dump the TOC summary */
--- 184,190 ----
  				break;
  
  			case 'j':			/* number of restore jobs */
! 				numWorkers = atoi(optarg);
  				break;
  
  			case 'l':			/* Dump the TOC summary */
*************** main(int argc, char **argv)
*** 338,344 ****
  	}
  
  	/* Can't do single-txn mode with multiple connections */
! 	if (opts->single_txn && opts->number_of_jobs > 1)
  	{
  		fprintf(stderr, _("%s: cannot specify both --single-transaction and multiple jobs\n"),
  				progname);
--- 339,345 ----
  	}
  
  	/* Can't do single-txn mode with multiple connections */
! 	if (opts->single_txn && numWorkers > 1)
  	{
  		fprintf(stderr, _("%s: cannot specify both --single-transaction and multiple jobs\n"),
  				progname);
*************** main(int argc, char **argv)
*** 403,408 ****
--- 404,420 ----
  		InitDummyWantedList(AH, opts);
  	}
  
+ 	/* See comments in pg_dump.c */
+ #ifdef WIN32
+ 	if (numWorkers > MAXIMUM_WAIT_OBJECTS)
+ 	{
+ 		fprintf(stderr, _("%s: invalid number of parallel jobs\n"),	progname);
+ 		exit(1);
+ 	}
+ #endif
+ 
+ 	AH->numWorkers = numWorkers;
+ 
  	if (opts->tocSummary)
  		PrintTOCSummary(AH, opts);
  	else
*************** main(int argc, char **argv)
*** 421,426 ****
--- 433,445 ----
  	return exit_code;
  }
  
+ void
+ _SetupWorker(Archive *AHX, RestoreOptions *ropt)	/* per-worker setup; ropt is unused in pg_restore */
+ {
+ 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ 	(AH->ReopenPtr) (AH);		/* presumably reopens the archive in this worker's context — confirm against the format's ReopenPtr implementation */
+ }
+ 
  static void
  usage(const char *progname)
  {
