diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile
index e9be18b..f3d554f 100644
*** a/src/bin/pg_dump/Makefile
--- b/src/bin/pg_dump/Makefile
*************** override CPPFLAGS := -I$(libpq_srcdir) $
*** 20,26 ****
  
  OBJS=	pg_backup_archiver.o pg_backup_db.o pg_backup_custom.o \
  	pg_backup_null.o pg_backup_tar.o \
! 	pg_backup_directory.o dumpmem.o dumputils.o compress_io.o $(WIN32RES)
  
  KEYWRDOBJS = keywords.o kwlookup.o
  
--- 20,27 ----
  
  OBJS=	pg_backup_archiver.o pg_backup_db.o pg_backup_custom.o \
  	pg_backup_null.o pg_backup_tar.o \
! 	pg_backup_directory.o dumpmem.o dumputils.o compress_io.o \
! 	parallel.o $(WIN32RES)
  
  KEYWRDOBJS = keywords.o kwlookup.o
  
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index dea0588..fddf767 100644
*** a/src/bin/pg_dump/compress_io.c
--- b/src/bin/pg_dump/compress_io.c
***************
*** 55,60 ****
--- 55,61 ----
  #include "compress_io.h"
  #include "dumpmem.h"
  #include "dumputils.h"
+ #include "parallel.h"
  
  /*----------------------
   * Compressor API
*************** size_t
*** 183,188 ****
--- 184,192 ----
  WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
  				   const void *data, size_t dLen)
  {
+ 	/* Are we aborting? */
+ 	checkAborting(AH);
+ 
  	switch (cs->comprAlg)
  	{
  		case COMPR_ALG_LIBZ:
*************** ReadDataFromArchiveZlib(ArchiveHandle *A
*** 352,357 ****
--- 356,364 ----
  	/* no minimal chunk size for zlib */
  	while ((cnt = readF(AH, &buf, &buflen)))
  	{
+ 		/* Are we aborting? */
+ 		checkAborting(AH);
+ 
  		zp->next_in = (void *) buf;
  		zp->avail_in = cnt;
  
*************** ReadDataFromArchiveNone(ArchiveHandle *A
*** 412,417 ****
--- 419,427 ----
  
  	while ((cnt = readF(AH, &buf, &buflen)))
  	{
+ 		/* Are we aborting? */
+ 		checkAborting(AH);
+ 
  		ahwrite(buf, 1, cnt, AH);
  	}
  
diff --git a/src/bin/pg_dump/dumputils.c b/src/bin/pg_dump/dumputils.c
index def8e71..c55eaa1 100644
*** a/src/bin/pg_dump/dumputils.c
--- b/src/bin/pg_dump/dumputils.c
*************** static void AddAcl(PQExpBuffer aclbuf, c
*** 53,61 ****
--- 53,70 ----
  static PQExpBuffer getThreadLocalPQExpBuffer(void);
  
  #ifdef WIN32
+ static void shutdown_parallel_dump_utils(int code, void* unused);
  static bool parallel_init_done = false;
  static DWORD tls_index;
  static DWORD mainThreadId;
+ 
+ static void
+ shutdown_parallel_dump_utils(int code, void* unused)
+ {
+ 	/* Call the cleanup function only from the main thread */
+ 	if (mainThreadId == GetCurrentThreadId())
+ 		WSACleanup();
+ }
  #endif
  
  void
*************** init_parallel_dump_utils(void)
*** 64,72 ****
  #ifdef WIN32
  	if (!parallel_init_done)
  	{
  		tls_index = TlsAlloc();
- 		parallel_init_done = true;
  		mainThreadId = GetCurrentThreadId();
  	}
  #endif
  }
--- 73,91 ----
  #ifdef WIN32
  	if (!parallel_init_done)
  	{
+ 		WSADATA	wsaData;
+ 		int		err;
+ 
  		tls_index = TlsAlloc();
  		mainThreadId = GetCurrentThreadId();
+ 		err = WSAStartup(MAKEWORD(2, 2), &wsaData);
+ 		if (err != 0)
+ 		{
+ 			fprintf(stderr, _("WSAStartup failed: %d\n"), err);
+ 			exit_nicely(1);
+ 		}
+ 		on_exit_nicely(shutdown_parallel_dump_utils, NULL);
+ 		parallel_init_done = true;
  	}
  #endif
  }
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index 0000000..65bc8bb
*** a/src/bin/pg_dump/parallel.c
--- b/src/bin/pg_dump/parallel.c
***************
*** 0 ****
--- 1,1283 ----
+ /*-------------------------------------------------------------------------
+  *
+  * parallel.c
+  *
+  *	Parallel support for the pg_dump archiver
+  *
+  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *	The author is not responsible for loss or damages that may
+  *	result from its use.
+  *
+  * IDENTIFICATION
+  *		src/bin/pg_dump/parallel.c
+  *
+  *-------------------------------------------------------------------------
+  */
+ 
+ #include "pg_backup_db.h"
+ 
+ #include "dumpmem.h"
+ #include "dumputils.h"
+ #include "parallel.h"
+ 
+ #ifndef WIN32
+ #include <sys/types.h>
+ #include <sys/wait.h>
+ #include "signal.h"
+ #include <unistd.h>
+ #include <fcntl.h>
+ #endif
+ 
+ #define PIPE_READ							0
+ #define PIPE_WRITE							1
+ 
+ /* file-scope variables */
+ #ifdef WIN32
+ static unsigned int	tMasterThreadId = 0;
+ static HANDLE		termEvent = INVALID_HANDLE_VALUE;
+ static int pgpipe(int handles[2]);
+ static int piperead(int s, char *buf, int len);
+ #define pipewrite(a,b,c)	send(a,b,c,0)
+ #else
+ /*
+  * aborting is only ever used in the master, the workers are fine with just
+  * wantAbort.
+  */
+ static bool aborting = false;
+ static volatile sig_atomic_t wantAbort = 0;
+ #define pgpipe(a)			pipe(a)
+ #define piperead(a,b,c)		read(a,b,c)
+ #define pipewrite(a,b,c)	write(a,b,c)
+ #endif
+ 
+ typedef struct ShutdownInformation
+ {
+     ParallelState *pstate;
+     Archive       *AHX;
+ } ShutdownInformation;
+ 
+ static ShutdownInformation shutdown_info;
+ 
+ static const char *modulename = gettext_noop("parallel archiver");
+ 
+ static ParallelSlot *GetMyPSlot(ParallelState *pstate);
+ static void parallel_exit_msg_func(const char *modulename,
+ 								   const char *fmt, va_list ap)
+ 			__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 0)));
+ static void parallel_msg_master(ParallelSlot *slot, const char *modulename,
+ 								const char *fmt, va_list ap)
+ 			__attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 0)));
+ static void archive_close_connection(int code, void *arg);
+ static void ShutdownWorkersHard(ParallelState *pstate);
+ static void WaitForTerminatingWorkers(ParallelState *pstate);
+ #ifndef WIN32
+ static void sigTermHandler(int signum);
+ #endif
+ static void SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker,
+ 						RestoreOptions *ropt);
+ static bool HasEveryWorkerTerminated(ParallelState *pstate);
+ 
+ static void lockTableNoWait(ArchiveHandle *AH, TocEntry *te);
+ static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
+ static char *getMessageFromMaster(int pipefd[2]);
+ static void sendMessageToMaster(int pipefd[2], const char *str);
+ static int select_loop(int maxFd, fd_set *workerset);
+ static char *getMessageFromWorker(ParallelState *pstate,
+ 								  bool do_wait, int *worker);
+ static void sendMessageToWorker(ParallelState *pstate,
+ 							    int worker, const char *str);
+ static char *readMessageFromPipe(int fd);
+ 
+ #define messageStartsWith(msg, prefix) \
+ 	(strncmp(msg, prefix, strlen(prefix)) == 0)
+ #define messageEquals(msg, pattern) \
+ 	(strcmp(msg, pattern) == 0)
+ 
+ static ParallelSlot *
+ GetMyPSlot(ParallelState *pstate)
+ {
+ 	int i;
+ 
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ #ifdef WIN32
+ 		if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
+ #else
+ 		if (pstate->parallelSlot[i].pid == getpid())
+ #endif
+ 			return &(pstate->parallelSlot[i]);
+ 
+ 	return NULL;
+ }
+ 
+ /*
+  * This is the function that will be called from exit_horribly() to print the
+  * error message. If the worker process does exit_horribly(), we forward its
+  * last words to the master process. The master process then does
+  * exit_horribly() with this error message itself and prints it normally.
+  * After printing the message, exit_horribly() on the master will shut down
+  * the remaining worker processes.
+  */
+ static void
+ parallel_exit_msg_func(const char *modulename, const char *fmt, va_list ap)
+ {
+ 	ParallelState *pstate = shutdown_info.pstate;
+ 	ParallelSlot *slot;
+ 
+ 	Assert(pstate);
+ 
+ 	slot = GetMyPSlot(pstate);
+ 
+ 	if (!slot)
+ 		/* We're the parent, just write the message out */
+ 		vwrite_msg(modulename, fmt, ap);
+ 	else
+ 		/* If we're a worker process, send the msg to the master process */
+ 		parallel_msg_master(slot, modulename, fmt, ap);
+ }
+ 
+ /* Sends the error message from the worker to the master process */
+ static void
+ parallel_msg_master(ParallelSlot *slot, const char *modulename,
+ 					const char *fmt, va_list ap)
+ {
+ 	char		buf[512];
+ 	int			pipefd[2];
+ 
+ 	pipefd[PIPE_READ] = slot->pipeRevRead;
+ 	pipefd[PIPE_WRITE] = slot->pipeRevWrite;
+ 
+ 	strcpy(buf, "ERROR ");
+ 	vsnprintf(buf + strlen("ERROR "),
+ 			  sizeof(buf) - strlen("ERROR "), fmt, ap);
+ 
+ 	sendMessageToMaster(pipefd, buf);
+ }
+ 
+ /*
+  * pg_dump and pg_restore register the Archive pointer for the exit handler
+  * (called from exit_horribly). This function mainly exists so that we can
+  * keep shutdown_info in file scope only.
+  */
+ void
+ on_exit_close_archive(Archive *AHX)
+ {
+ 	shutdown_info.AHX = AHX;
+ 	on_exit_nicely(archive_close_connection, &shutdown_info);
+ }
+ 
+ /*
+  * This function can close archives in both the parallel and non-parallel
+  * case.
+  */
+ static void
+ archive_close_connection(int code, void *arg)
+ {
+ 	ShutdownInformation *si = (ShutdownInformation *) arg;
+ 
+ 	if (si->pstate)
+ 	{
+ 		ParallelSlot *slot = GetMyPSlot(si->pstate);
+ 
+ 		if (!slot) {
+ 			/*
+ 			 * We're the master: We have already printed out the message
+ 			 * passed to exit_horribly() either from the master itself or from
+ 			 * a worker process. Now we need to close our own database
+ 			 * connection (only open during parallel dump but not restore) and
+ 			 * shut down the remaining workers.
+ 			 */
+ 			DisconnectDatabase(si->AHX);
+ #ifndef WIN32
+ 			/*
+ 			 * Setting aborting to true switches to best-effort-mode
+ 			 * (send/receive but ignore errors) in communicating with our
+ 			 * workers.
+ 			 */
+ 			aborting = true;
+ #endif
+ 			ShutdownWorkersHard(si->pstate);
+ 		}
+ 		else if (slot->args->AH)
+ 			DisconnectDatabase(&(slot->args->AH->public));
+ 	}
+ 	else if (si->AHX)
+ 		DisconnectDatabase(si->AHX);
+ }
+ 
+ /*
+  * If we have one worker that terminates for some reason, we'd like the other
+  * threads to terminate as well (and not finish with their 70 GB table dump
+  * first...). Now in UNIX we can just kill these processes, and let the signal
+  * handler set wantAbort to 1. In Windows we set a termEvent and this serves
+  * as the signal for everyone to terminate.
+  */
+ void
+ checkAborting(ArchiveHandle *AH)
+ {
+ #ifdef WIN32
+ 	if (WaitForSingleObject(termEvent, 0) == WAIT_OBJECT_0)
+ #else
+ 	if (wantAbort)
+ #endif
+ 		exit_horribly(modulename, "worker is terminating\n");
+ }
+ 
+ /*
+  * Shut down any remaining workers, this has an implicit do_wait == true.
+  *
+  * The fastest way we can make the workers terminate gracefully is when
+  * they are listening for new commands and we just tell them to terminate.
+  */
+ static void
+ ShutdownWorkersHard(ParallelState *pstate)
+ {
+ #ifndef WIN32
+ 	int i;
+ 	signal(SIGPIPE, SIG_IGN);
+ 
+ 	/*
+ 	 * Close our write end of the sockets so that the workers know they can
+ 	 * exit.
+ 	 */
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 		closesocket(pstate->parallelSlot[i].pipeWrite);
+ 
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 		kill(pstate->parallelSlot[i].pid, SIGTERM);
+ 
+ #else
+ 	/* The workers monitor this event via checkAborting(). */
+ 	SetEvent(termEvent);
+ #endif
+ 
+ 	WaitForTerminatingWorkers(pstate);
+ }
+ 
+ /*
+  * Wait for the termination of the processes using the OS-specific method.
+  */
+ static void
+ WaitForTerminatingWorkers(ParallelState *pstate)
+ {
+ 	while (!HasEveryWorkerTerminated(pstate))
+ 	{
+ 		ParallelSlot *slot = NULL;
+ 		int j;
+ #ifndef WIN32
+ 		int		status;
+ 		pid_t	pid = wait(&status);
+ 		for (j = 0; j < pstate->numWorkers; j++)
+ 			if (pstate->parallelSlot[j].pid == pid)
+ 				slot = &(pstate->parallelSlot[j]);
+ #else
+ 		uintptr_t hThread;
+ 		DWORD	ret;
+ 		uintptr_t *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
+ 		int nrun = 0;
+ 		for (j = 0; j < pstate->numWorkers; j++)
+ 			if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
+ 			{
+ 				lpHandles[nrun] = pstate->parallelSlot[j].hThread;
+ 				nrun++;
+ 			}
+ 		ret = WaitForMultipleObjects(nrun, (HANDLE*) lpHandles, false, INFINITE);
+ 		Assert(ret != WAIT_FAILED);
+ 		hThread = lpHandles[ret - WAIT_OBJECT_0];
+ 
+ 		for (j = 0; j < pstate->numWorkers; j++)
+ 			if (pstate->parallelSlot[j].hThread == hThread)
+ 				slot = &(pstate->parallelSlot[j]);
+ 
+ 		free(lpHandles);
+ #endif
+ 		Assert(slot);
+ 
+ 		slot->workerStatus = WRKR_TERMINATED;
+ 	}
+ 	Assert(HasEveryWorkerTerminated(pstate));
+ }
+ 
+ #ifndef WIN32
+ /* Signal handling (UNIX only) */
+ static void
+ sigTermHandler(int signum)
+ {
+ 	wantAbort = 1;
+ }
+ #endif
+ 
+ /*
+  * This function is called by both UNIX and Windows variants to set up a
+  * worker process.
+  */
+ static void
+ SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker,
+ 			RestoreOptions *ropt)
+ {
+ 	/*
+ 	 * In dump mode (pg_dump) this calls _SetupWorker() as defined in
+ 	 * pg_dump.c, while in restore mode (pg_restore) it calls _SetupWorker()
+ 	 * as defined in pg_restore.c.
+      *
+ 	 * We get the raw connection only for the reason that we can close it
+ 	 * properly when we shut down. This happens only that way when it is
+ 	 * brought down because of an error.
+ 	 */
+ 	_SetupWorker((Archive *) AH, ropt);
+ 
+ 	Assert(AH->connection != NULL);
+ 
+ 	WaitForCommands(AH, pipefd);
+ 
+ 	closesocket(pipefd[PIPE_READ]);
+ 	closesocket(pipefd[PIPE_WRITE]);
+ }
+ 
+ #ifdef WIN32
+ /*
+  * On Windows the _beginthreadex() function allows us to pass one parameter.
+  * Since we need to pass a few values however, we define a structure here
+  * and then pass a pointer to such a structure in _beginthreadex().
+  */
+ typedef struct {
+ 	ArchiveHandle  *AH;
+ 	RestoreOptions *ropt;
+ 	int				worker;
+ 	int				pipeRead;
+ 	int				pipeWrite;
+ } WorkerInfo;
+ 
+ static unsigned __stdcall
+ init_spawned_worker_win32(WorkerInfo *wi)
+ {
+ 	ArchiveHandle *AH;
+ 	int pipefd[2] = { wi->pipeRead, wi->pipeWrite };
+ 	int worker = wi->worker;
+ 	RestoreOptions *ropt = wi->ropt;
+ 
+ 	AH = CloneArchive(wi->AH);
+ 
+ 	free(wi);
+ 	SetupWorker(AH, pipefd, worker, ropt);
+ 
+ 	DeCloneArchive(AH);
+ 	_endthreadex(0);
+ 	return 0;
+ }
+ #endif
+ 
+ /*
+  * This function starts the parallel dump or restore by spawning off the
+  * worker processes in both Unix and Windows. For Windows, it creates a number
+  * of threads while it does a fork() on Unix.
+  */
+ ParallelState *
+ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
+ {
+ 	ParallelState  *pstate;
+ 	int				i;
+ 	const size_t	slotSize = AH->public.numWorkers * sizeof(ParallelSlot);
+ 
+ 	Assert(AH->public.numWorkers > 0);
+ 
+ 	/* Ensure stdio state is quiesced before forking */
+ 	fflush(NULL);
+ 
+ 	pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
+ 
+ 	pstate->numWorkers = AH->public.numWorkers;
+ 	pstate->parallelSlot = NULL;
+ 
+ 	if (AH->public.numWorkers == 1)
+ 		return pstate;
+ 
+ 	pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
+ 	memset((void *) pstate->parallelSlot, 0, slotSize);
+ 
+ 	/*
+ 	 * Set the pstate in the shutdown_info. The exit handler uses pstate if
+ 	 * set and falls back to AHX otherwise.
+ 	 */
+ 	shutdown_info.pstate = pstate;
+ 	on_exit_msg_func = parallel_exit_msg_func;
+ 
+ #ifdef WIN32
+ 	tMasterThreadId = GetCurrentThreadId();
+ 	termEvent = CreateEvent(NULL, true, false, "Terminate");
+ #else
+ 	signal(SIGTERM, sigTermHandler);
+ 	signal(SIGINT, sigTermHandler);
+ 	signal(SIGQUIT, sigTermHandler);
+ #endif
+ 
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 	{
+ #ifdef WIN32
+ 		WorkerInfo *wi;
+ 		uintptr_t	handle;
+ #else
+ 		pid_t		pid;
+ #endif
+ 		int			pipeMW[2], pipeWM[2];
+ 
+ 		if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
+ 			exit_horribly(modulename,
+ 						  "Cannot create communication channels: %s\n",
+ 						  strerror(errno));
+ 
+ 		pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
+ 		pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
+ 		pstate->parallelSlot[i].args->AH = NULL;
+ 		pstate->parallelSlot[i].args->te = NULL;
+ #ifdef WIN32
+ 		/* Allocate a new structure for every worker */
+ 		wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
+ 
+ 		wi->ropt = ropt;
+ 		wi->worker = i;
+ 		wi->AH = AH;
+ 		wi->pipeRead = pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
+ 		wi->pipeWrite = pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];
+ 
+ 		handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
+ 								wi, 0, &(pstate->parallelSlot[i].threadId));
+ 		pstate->parallelSlot[i].hThread = handle;
+ #else
+ 		pid = fork();
+ 		if (pid == 0)
+ 		{
+ 			/* we are the worker */
+ 			int j;
+ 			int pipefd[2] = { pipeMW[PIPE_READ], pipeWM[PIPE_WRITE] };
+ 
+ 			/*
+ 			 * Store the fds for the reverse communication in pstate. Actually
+ 			 * we only use this in case of an error and don't use pstate
+ 			 * otherwise in the worker process. On Windows we write to the
+ 			 * global pstate, in Unix we write to our process-local copy but
+ 			 * that's also where we'd retrieve this information back from.
+ 			 */
+ 			pstate->parallelSlot[i].pipeRevRead = pipefd[PIPE_READ];
+ 			pstate->parallelSlot[i].pipeRevWrite = pipefd[PIPE_WRITE];
+ 			pstate->parallelSlot[i].pid = getpid();
+ 
+ 			/*
+ 			 * Call CloneArchive on Unix as well even though technically we
+ 			 * don't need to because fork() gives us a copy in our own address
+ 			 * space already. But CloneArchive resets the state information
+ 			 * and also clones the database connection (for parallel dump)
+ 			 * which both seem kinda helpful.
+ 			 */
+ 			pstate->parallelSlot[i].args->AH = CloneArchive(AH);
+ 
+ 			/* close read end of Worker -> Master */
+ 			closesocket(pipeWM[PIPE_READ]);
+ 			/* close write end of Master -> Worker */
+ 			closesocket(pipeMW[PIPE_WRITE]);
+ 
+ 			/*
+ 			 * Close all inherited fds for communication of the master with
+ 			 * the other workers.
+ 			 */
+ 			for (j = 0; j < i; j++)
+ 			{
+ 				closesocket(pstate->parallelSlot[j].pipeRead);
+ 				closesocket(pstate->parallelSlot[j].pipeWrite);
+ 			}
+ 
+ 			SetupWorker(pstate->parallelSlot[i].args->AH, pipefd, i, ropt);
+ 
+ 			exit(0);
+ 		}
+ 		else if (pid < 0)
+ 			/* fork failed */
+ 			exit_horribly(modulename,
+ 						  "could not create worker process: %s\n",
+ 						  strerror(errno));
+ 
+ 		/* we are the Master, pid > 0 here */
+ 		Assert(pid > 0);
+ 
+ 		/* close read end of Master -> Worker */
+ 		closesocket(pipeMW[PIPE_READ]);
+ 		/* close write end of Worker -> Master */
+ 		closesocket(pipeWM[PIPE_WRITE]);
+ 
+ 		pstate->parallelSlot[i].pid = pid;
+ #endif
+ 
+ 		pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
+ 		pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
+ 	}
+ 
+ 	return pstate;
+ }
+ 
+ /*
+  * Tell all of our workers to terminate.
+  *
+  * Pretty straightforward routine, first we tell everyone to terminate, then
+  * we listen to the workers' replies and finally close the sockets that we
+  * have used for communication.
+  */
+ void
+ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
+ {
+ 	int i;
+ 
+ 	if (pstate->numWorkers == 1)
+ 		return;
+ 
+ 	Assert(IsEveryWorkerIdle(pstate));
+ 
+ 	/* close the sockets so that the workers know they can exit */
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 	{
+ 		closesocket(pstate->parallelSlot[i].pipeRead);
+ 		closesocket(pstate->parallelSlot[i].pipeWrite);
+ 	}
+ 	WaitForTerminatingWorkers(pstate);
+ 
+ 	/*
+ 	 * Remove the pstate again, so the exit handler in the parent will now
+ 	 * again fall back to closing AH->connection (if connected).
+ 	 */
+ 	shutdown_info.pstate = NULL;
+ 
+ 	free(pstate->parallelSlot);
+ 	free(pstate);
+ }
+ 
+ 
+ /*
+  * The sequence is the following (for dump, similar for restore):
+  *
+  * The master process starts the parallel backup in ParallelBackupStart(); this
+  * forks the worker processes which enter WaitForCommands().
+  *
+  * The master process dispatches an individual work item to one of the worker
+  * processes in DispatchJobForTocEntry(). It calls
+  * AH->MasterStartParallelItemPtr, a routine of the output format. This
+  * function's arguments are the parent's archive handle AH (containing the full
+  * catalog information), the TocEntry that the worker should work on and a
+  * T_Action act indicating whether this is a backup or a restore item.  The
+  * function then converts the TocEntry assignment into a string that is then
+  * sent over to the worker process. In the simplest case that would be
+  * something like "DUMP 1234", with 1234 being the TocEntry id.
+  *
+  * The worker receives the message in the routine pointed to by
+  * WorkerJobDumpPtr or WorkerJobRestorePtr. These are also pointers to
+  * corresponding routines of the respective output format, e.g.
+  * _WorkerJobDumpDirectory().
+  *
+  * Remember that we have forked off the workers only after we have read in the
+  * catalog. That's why our worker processes can also access the catalog
+  * information. Now they re-translate the textual representation to a TocEntry
+  * on their side and do the required action (restore or dump).
+  *
+  * The result is again a textual string that is sent back to the master and is
+  * interpreted by AH->MasterEndParallelItemPtr. This function can update state
+  * or catalog information on the master's side, depending on the reply from
+  * the worker process. In the end it returns status which is 0 for successful
+  * execution.
+  *
+  * ---------------------------------------------------------------------
+  * Master                                   Worker
+  *
+  *                                          enters WaitForCommands()
+  * DispatchJobForTocEntry(...te...)
+  *
+  * [ Worker is IDLE ]
+  *
+  * arg = (MasterStartParallelItemPtr)()
+  * send: DUMP arg
+  *                                          receive: DUMP arg
+  *                                          str = (WorkerJobDumpPtr)(arg)
+  * [ Worker is WORKING ]                    ... gets te from arg ...
+  *                                          ... dump te ...
+  *                                          send: OK DUMP info
+  *
+  * In ListenToWorkers():
+  *
+  * [ Worker is FINISHED ]
+  * receive: OK DUMP info
+  * status = (MasterEndParallelItemPtr)(info)
+  *
+  * In ReapWorkerStatus(&ptr):
+  * *ptr = status;
+  * [ Worker is IDLE ]
+  * ---------------------------------------------------------------------
+  */
+ void
+ DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te,
+ 					   T_Action act)
+ {
+ 	int		worker;
+ 	char   *arg;
+ 
+ 	/* our caller makes sure that at least one worker is idle */
+ 	Assert(GetIdleWorker(pstate) != NO_SLOT);
+ 	worker = GetIdleWorker(pstate);
+ 	Assert(worker != NO_SLOT);
+ 
+ 	arg = (AH->MasterStartParallelItemPtr)(AH, te, act);
+ 
+ 	sendMessageToWorker(pstate, worker, arg);
+ 
+ 	pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
+ 	pstate->parallelSlot[worker].args->te = te;
+ }
+ 
+ /*
+  * Find the first free parallel slot (if any).
+  */
+ int
+ GetIdleWorker(ParallelState *pstate)
+ {
+ 	int			i;
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 		if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
+ 			return i;
+ 	return NO_SLOT;
+ }
+ 
+ /*
+  * Return true iff every worker process is in the WRKR_TERMINATED state.
+  */
+ static bool
+ HasEveryWorkerTerminated(ParallelState *pstate)
+ {
+ 	int			i;
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 		if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED)
+ 			return false;
+ 	return true;
+ }
+ 
+ /*
+  * Return true iff every worker is in the WRKR_IDLE state.
+  */
+ bool
+ IsEveryWorkerIdle(ParallelState *pstate)
+ {
+ 	int			i;
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 		if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
+ 			return false;
+ 	return true;
+ }
+ 
+ /*
+  * ---------------------------------------------------------------------
+  * One danger of the parallel backup is a possible deadlock:
+  *
+  * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode.
+  * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
+  *    because the master holds a conflicting ACCESS SHARE lock).
+  * 3) The worker process also requests an ACCESS SHARE lock to read the table.
+  *    The worker's not granted that lock but is enqueued behind the ACCESS
+  *    EXCLUSIVE lock request.
+  * ---------------------------------------------------------------------
+  *
+  * Now what we do here is to just request a lock in ACCESS SHARE but with
+  * NOWAIT in the worker prior to touching the table. If we don't get the lock,
+  * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
+  * we are good to just fail the whole backup because we have detected a deadlock.
+  */
+ static void
+ lockTableNoWait(ArchiveHandle *AH, TocEntry *te)
+ {
+ 	Archive *AHX = (Archive *) AH;
+ 	const char *qualId;
+ 	PQExpBuffer query = createPQExpBuffer();
+ 	PGresult   *res;
+ 
+ 	Assert(AH->format == archDirectory);
+ 	Assert(strcmp(te->desc, "BLOBS") != 0);
+ 
+ 	appendPQExpBuffer(query,
+ 					  "SELECT pg_namespace.nspname,"
+ 					  "       pg_class.relname "
+ 					  "  FROM pg_class "
+ 					  "  JOIN pg_namespace on pg_namespace.oid = relnamespace "
+ 					  " WHERE pg_class.oid = %d", te->catalogId.oid);
+ 
+ 	res = PQexec(AH->connection, query->data);
+ 
+ 	if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
+ 		exit_horribly(modulename,
+ 					  "could not get relation name for oid %d: %s\n",
+ 					  te->catalogId.oid, PQerrorMessage(AH->connection));
+ 
+ 	resetPQExpBuffer(query);
+ 
+ 	qualId = fmtQualifiedId(AHX->remoteVersion,
+ 							PQgetvalue(res, 0, 0),
+ 							PQgetvalue(res, 0, 1));
+ 
+ 	appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
+ 					  qualId);
+ 	PQclear(res);
+ 
+ 	res = PQexec(AH->connection, query->data);
+ 
+ 	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
+ 		exit_horribly(modulename,
+ 					  "could not obtain lock on relation \"%s\". This "
+ 					  "usually means that someone requested an ACCESS EXCLUSIVE lock "
+ 					  "on the table after the pg_dump parent process has gotten the "
+ 					  "initial ACCESS SHARE lock on the table.\n", qualId);
+ 
+ 	PQclear(res);
+ 	destroyPQExpBuffer(query);
+ }
+ 
+ /*
+  * That's the main routine for the worker.
+  * When it starts up it enters this routine and waits for commands from the
+  * master process. After having processed a command it comes back to here to
+  * wait for the next command. It exits when the master closes the command
+  * pipe, i.e. when getMessageFromMaster() returns NULL.
+  */
+ static void
+ WaitForCommands(ArchiveHandle *AH, int pipefd[2])
+ {
+ 	char	   *command;
+ 	DumpId		dumpId;
+ 	int			nBytes;
+ 	char	   *str = NULL;
+ 	TocEntry   *te;
+ 
+ 	for(;;)
+ 	{
+ 		if (!(command = getMessageFromMaster(pipefd)))
+ 		{
+ 			PQfinish(AH->connection);
+ 			AH->connection = NULL;
+ 			return;
+ 		}
+ 
+ 		if (messageStartsWith(command, "DUMP "))
+ 		{
+ 			Assert(AH->format == archDirectory);
+ 			sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes);
+ 			Assert(nBytes == strlen(command) - strlen("DUMP "));
+ 
+ 			te = getTocEntryByDumpId(AH, dumpId);
+ 			Assert(te != NULL);
+ 
+ 			/*
+ 			 * Lock the table but with NOWAIT. Note that the parent is already
+ 			 * holding a lock. If we cannot acquire another ACCESS SHARE MODE
+ 			 * lock, then somebody else has requested an exclusive lock in the
+ 			 * meantime.  lockTableNoWait dies in this case to prevent a
+ 			 * deadlock.
+ 			 */
+ 			if (strcmp(te->desc, "BLOBS") != 0)
+ 				lockTableNoWait(AH, te);
+ 
+ 			/*
+ 			 * The message we return here has been pg_malloc()ed and we are
+ 			 * responsible for free()ing it.
+ 			 */
+ 			str = (AH->WorkerJobDumpPtr)(AH, te);
+ 			Assert(AH->connection != NULL);
+ 			sendMessageToMaster(pipefd, str);
+ 			free(str);
+ 		}
+ 		else if (messageStartsWith(command, "RESTORE "))
+ 		{
+ 			Assert(AH->format == archDirectory || AH->format == archCustom);
+ 			Assert(AH->connection != NULL);
+ 
+ 			sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes);
+ 			Assert(nBytes == strlen(command) - strlen("RESTORE "));
+ 
+ 			te = getTocEntryByDumpId(AH, dumpId);
+ 			Assert(te != NULL);
+ 			/*
+ 			 * The message we return here has been pg_malloc()ed and we are
+ 			 * responsible for free()ing it.
+ 			 */
+ 			str = (AH->WorkerJobRestorePtr)(AH, te);
+ 			Assert(AH->connection != NULL);
+ 			sendMessageToMaster(pipefd, str);
+ 			free(str);
+ 		}
+ 		else
+ 			exit_horribly(modulename,
+ 						  "Unknown command on communication channel: %s\n",
+ 						  command);
+ 	}
+ }
+ 
+ /*
+  * ---------------------------------------------------------------------
+  * Note the status change:
+  *
+  * DispatchJobForTocEntry		WRKR_IDLE -> WRKR_WORKING
+  * ListenToWorkers				WRKR_WORKING -> WRKR_FINISHED / WRKR_TERMINATED
+  * ReapWorkerStatus				WRKR_FINISHED -> WRKR_IDLE
+  * ---------------------------------------------------------------------
+  *
+  * Just calling ReapWorkerStatus() when all workers are working might or might
+  * not give you an idle worker because you need to call ListenToWorkers() in
+  * between and only thereafter ReapWorkerStatus(). This is necessary in order
+  * to get and deal with the status (=result) of the worker's execution.
+  */
+ void
+ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
+ {
+ 	int			worker;
+ 	char	   *msg;
+ 
+ 	msg = getMessageFromWorker(pstate, do_wait, &worker);
+ 
+ 	if (!msg)
+ 	{
+ 		if (do_wait)
+ 			exit_horribly(modulename, "A worker process died unexpectedly\n");
+ 		return;
+ 	}
+ 
+ 	if (messageStartsWith(msg, "OK "))
+ 	{
+ 		char	   *statusString;
+ 		TocEntry   *te;
+ 
+ 		pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED;
+ 		te = pstate->parallelSlot[worker].args->te;
+ 		if (messageStartsWith(msg, "OK RESTORE "))
+ 		{
+ 			statusString = msg + strlen("OK RESTORE ");
+ 			pstate->parallelSlot[worker].status =
+ 				(AH->MasterEndParallelItemPtr)
+ 					(AH, te, statusString, ACT_RESTORE);
+ 		}
+ 		else if (messageStartsWith(msg, "OK DUMP "))
+ 		{
+ 			statusString = msg + strlen("OK DUMP ");
+ 			pstate->parallelSlot[worker].status =
+ 				(AH->MasterEndParallelItemPtr)
+ 					(AH, te, statusString, ACT_DUMP);
+ 		}
+ 		else
+ 			exit_horribly(modulename,
+ 						  "Invalid message received from worker: %s\n", msg);
+ 	}
+ 	else if (messageStartsWith(msg, "ERROR "))
+ 	{
+ 		Assert(AH->format == archDirectory || AH->format == archCustom);
+ 		pstate->parallelSlot[worker].workerStatus = WRKR_TERMINATED;
+ 		exit_horribly(modulename, "%s", msg + strlen("ERROR "));
+ 	}
+ 	else
+ 		exit_horribly(modulename, "Invalid message received from worker: %s\n", msg);
+ 
+ 	/* both Unix and Win32 return pg_malloc()ed space, so we free it */
+ 	free(msg);
+ }
+ 
+ /*
+  * This function is executed in the master process.
+  *
+  * This function is used to get the return value of a terminated worker
+  * process. If a process has terminated, its status is stored in *status and
+  * the id of the worker is returned.
+  */
+ int
+ ReapWorkerStatus(ParallelState *pstate, int *status)
+ {
+ 	int			i;
+ 
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 	{
+ 		if (pstate->parallelSlot[i].workerStatus == WRKR_FINISHED)
+ 		{
+ 			*status = pstate->parallelSlot[i].status;
+ 			pstate->parallelSlot[i].status = 0;
+ 			pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
+ 			return i;
+ 		}
+ 	}
+ 	return NO_SLOT;
+ }
+ 
+ /*
+  * This function is executed in the master process.
+  *
+  * It looks for an idle worker process and returns only once there is one.
+  */
+ void
+ EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate)
+ {
+ 	int		ret_worker;
+ 	int		work_status;
+ 
+ 	for (;;)
+ 	{
+ 		int nTerm = 0;
+ 
+ 		/* Reap everyone who has finished; any non-zero status is fatal. */
+ 		while ((ret_worker = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
+ 		{
+ 			if (work_status != 0)
+ 				exit_horribly(modulename, "Error processing a parallel work item\n");
+ 
+ 			nTerm++;
+ 		}
+ 
+ 		/*
+ 		 * We need to make sure that we have an idle worker before dispatching
+ 		 * the next item. If nTerm > 0 we already have that (quick check).
+ 		 */
+ 		if (nTerm > 0)
+ 			return;
+ 
+ 		/* explicit check for an idle worker */
+ 		if (GetIdleWorker(pstate) != NO_SLOT)
+ 			return;
+ 
+ 		/*
+ 		 * If we have no idle worker, read the result of one or more
+ 		 * workers and loop the loop to call ReapWorkerStatus() on them
+ 		 */
+ 		ListenToWorkers(AH, pstate, true);
+ 	}
+ }
+ 
+ /*
+  * This function is executed in the master process.
+  *
+  * It waits for all workers to terminate.
+  */
+ void
+ EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate)
+ {
+ 	int			work_status;
+ 
+ 	/* non-parallel operation: there are no workers to wait for */
+ 	if (!pstate || pstate->numWorkers == 1)
+ 		return;
+ 
+ 	/* Waiting for the remaining worker processes to finish */
+ 	while (!IsEveryWorkerIdle(pstate))
+ 	{
+ 		if (ReapWorkerStatus(pstate, &work_status) == NO_SLOT)
+ 			ListenToWorkers(AH, pstate, true);
+ 		else if (work_status != 0)
+ 			exit_horribly(modulename,
+ 						  "Error processing a parallel work item\n");
+ 	}
+ }
+ 
+ /*
+  * This function is executed in the worker process.
+  *
+  * It returns the next message on the communication channel, blocking until it
+  * becomes available. The result is pg_malloc'ed and must be freed by the
+  * caller; NULL is returned if the channel was closed (see
+  * readMessageFromPipe).
+  */
+ static char *
+ getMessageFromMaster(int pipefd[2])
+ {
+ 	return readMessageFromPipe(pipefd[PIPE_READ]);
+ }
+ 
+ /*
+  * This function is executed in the worker process.
+  *
+  * It sends a message to the master on the communication channel. The
+  * terminating zero byte is transmitted as well; it is what delimits
+  * messages on the channel (readMessageFromPipe splits on it).
+  */
+ static void
+ sendMessageToMaster(int pipefd[2], const char *str)
+ {
+ 	int			len = strlen(str) + 1;
+ 
+ 	if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
+ 		exit_horribly(modulename,
+ 					  "Error writing to the communication channel: %s\n",
+ 					  strerror(errno));
+ }
+ 
+ /*
+  * A select loop that repeats calling select until a descriptor in the read
+  * set becomes readable. On Windows we have to check for the termination event
+  * from time to time, on Unix we can just block forever.
+  */
+ #ifdef WIN32
+ static int
+ select_loop(int maxFd, fd_set *workerset)
+ {
+ 	int			i;
+ 	fd_set		saveSet = *workerset;
+ 
+ 	/* should always be the master */
+ 	Assert(tMasterThreadId == GetCurrentThreadId());
+ 
+ 	for (;;)
+ 	{
+ 		/*
+ 		 * sleep a quarter of a second before checking if we should
+ 		 * terminate.
+ 		 */
+ 		struct timeval tv = { 0, 250000 };
+ 		*workerset = saveSet;
+ 		i = select(maxFd + 1, workerset, NULL, NULL, &tv);
+ 
+ 		/*
+ 		 * NOTE(review): despite the comments above, no termination check is
+ 		 * visible in this loop body -- a timeout merely re-runs select().
+ 		 * Confirm that the Windows abort/termination test happens elsewhere;
+ 		 * otherwise the 250 ms timeout serves no purpose.
+ 		 */
+ 		if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
+ 			continue;
+ 		if (i)
+ 			break;
+ 	}
+ 
+ 	return i;
+ }
+ #else /* UNIX */
+ static int
+ select_loop(int maxFd, fd_set *workerset)
+ {
+ 	int		i;
+ 
+ 	fd_set saveSet = *workerset;
+ 	for (;;)
+ 	{
+ 		/* select() modifies the set in place, so restore it each time */
+ 		*workerset = saveSet;
+ 		i = select(maxFd + 1, workerset, NULL, NULL, NULL);
+ 
+ 		/*
+ 		 * If we Ctrl-C the master process, it's likely that we interrupt
+ 		 * select() here. The signal handler will set wantAbort == true and
+ 		 * the shutdown journey starts from here. Note that we'll come back
+ 		 * here later when we tell all workers to terminate and read their
+ 		 * responses. But then we have aborting set to true.
+ 		 */
+ 		if (wantAbort && !aborting)
+ 			exit_horribly(modulename, "terminated by user\n");
+ 
+ 		/* an ordinary signal interruption: just retry the select() */
+ 		if (i < 0 && errno == EINTR)
+ 			continue;
+ 		break;
+ 	}
+ 
+ 	return i;
+ }
+ #endif
+ 
+ /*
+  * This function is executed in the master process.
+  *
+  * It returns the next message from the worker on the communication channel,
+  * optionally blocking (do_wait) until it becomes available.
+  *
+  * The id of the worker is returned in *worker.
+  */
+ static char *
+ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
+ {
+ 	int			i;
+ 	fd_set		workerset;
+ 	int			maxFd = -1;
+ 	struct		timeval nowait = { 0, 0 };
+ 
+ 	FD_ZERO(&workerset);
+ 
+ 	/*
+ 	 * Build the descriptor set of all live workers.  NOTE(review): if every
+ 	 * worker is already WRKR_TERMINATED, maxFd stays -1 and the set is
+ 	 * empty; presumably callers never invoke us in that state -- confirm.
+ 	 */
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 	{
+ 		if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
+ 			continue;
+ 		FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
+ 		/* actually WIN32 ignores the first parameter to select()... */
+ 		if (pstate->parallelSlot[i].pipeRead > maxFd)
+ 			maxFd = pstate->parallelSlot[i].pipeRead;
+ 	}
+ 
+ 	if (do_wait)
+ 	{
+ 		i = select_loop(maxFd, &workerset);
+ 		Assert(i != 0);
+ 	}
+ 	else
+ 	{
+ 		/* non-blocking poll: return at once when nothing is pending */
+ 		if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
+ 			return NULL;
+ 	}
+ 
+ 	if (i < 0)
+ 		exit_horribly(modulename, "Error in ListenToWorkers(): %s", strerror(errno));
+ 
+ 	/* hand back the message of the first worker whose pipe is readable */
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 	{
+ 		char	   *msg;
+ 
+ 		if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
+ 			continue;
+ 
+ 		msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
+ 		*worker = i;
+ 		return msg;
+ 	}
+ 	/* select() reported a readable descriptor, so we cannot get here */
+ 	Assert(false);
+ 	return NULL;
+ }
+ 
+ /*
+  * This function is executed in the master process.
+  *
+  * It sends a message to a certain worker on the communication channel.
+  */
+ static void
+ sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
+ {
+ 	int			len = strlen(str) + 1;
+ 
+ 	if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
+ 	{
+ 		/*
+ 		 * If we're already aborting anyway, don't care if we succeed or not.
+ 		 * The child might have gone already.  (Note the !aborting test is
+ 		 * compiled out on Windows, so there a write failure is always
+ 		 * fatal.)
+ 		 */
+ #ifndef WIN32
+ 		if (!aborting)
+ #endif
+ 			exit_horribly(modulename,
+ 						  "Error writing to the communication channel: %s\n",
+ 						  strerror(errno));
+ 	}
+ }
+ 
+ /*
+  * The underlying function to read a message from the communication channel
+  * (fd), blocking until a complete NUL-terminated message has arrived.
+  *
+  * Returns a pg_malloc'ed buffer that the caller must free, or NULL if the
+  * other end has closed the channel (or a read error occurred).
+  */
+ static char *
+ readMessageFromPipe(int fd)
+ {
+ 	char	   *msg;
+ 	int			msgsize, bufsize;
+ 	int			ret;
+ 
+ 	/*
+ 	 * The problem here is that we need to deal with several possibilities:
+ 	 * we could receive only a partial message or several messages at once.
+ 	 * The caller expects us to return exactly one message however.
+ 	 *
+ 	 * We could either read in as much as we can and keep track of what we
+ 	 * delivered back to the caller or we just read byte by byte. Once we see
+ 	 * (char) 0, we know that it's the message's end. This would be quite
+ 	 * inefficient for more data but since we are reading only on the command
+ 	 * channel, the performance loss does not seem worth the trouble of
+ 	 * keeping internal states for different file descriptors.
+ 	 */
+ 	bufsize = 64;  /* could be any number */
+ 	msg = (char *) pg_malloc(bufsize);
+ 
+ 	msgsize = 0;
+ 	for (;;)
+ 	{
+ 		Assert(msgsize <= bufsize);
+ 		ret = piperead(fd, msg + msgsize, 1);
+ 
+ 		/* worker has closed the connection or another error happened */
+ 		if (ret <= 0)
+ 		{
+ 			/* don't leak the partially assembled message */
+ 			free(msg);
+ 			return NULL;
+ 		}
+ 
+ 		Assert(ret == 1);
+ 
+ 		if (msg[msgsize] == '\0')
+ 			return msg;
+ 
+ 		msgsize++;
+ 		if (msgsize == bufsize)
+ 		{
+ 			/* could be any number */
+ 			bufsize += 16;
+ 
+ 			/*
+ 			 * Use pg_realloc rather than bare realloc: it matches the
+ 			 * pg_malloc above and exits cleanly on OOM instead of returning
+ 			 * NULL, which would be dereferenced on the next iteration.
+ 			 */
+ 			msg = (char *) pg_realloc(msg, bufsize);
+ 		}
+ 	}
+ }
+ 
+ #ifdef WIN32
+ /*
+  * This is a replacement version of pipe for Win32 which allows returned
+  * handles to be used in select(). Note that read/write calls must be replaced
+  * with recv/send.
+  *
+  * Returns 0 on success, -1 on failure. On failure both handles are
+  * INVALID_SOCKET and every socket created here has been closed again.
+  */
+ static int
+ pgpipe(int handles[2])
+ {
+ 	SOCKET		s;
+ 	struct sockaddr_in serv_addr;
+ 	int			len = sizeof(serv_addr);
+ 
+ 	handles[0] = handles[1] = INVALID_SOCKET;
+ 
+ 	if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
+ 	{
+ 		write_msg(modulename, "pgpipe could not create socket: %d",
+ 				  WSAGetLastError());
+ 		return -1;
+ 	}
+ 
+ 	/* Listen on an ephemeral port on the loopback interface. */
+ 	memset((void *) &serv_addr, 0, sizeof(serv_addr));
+ 	serv_addr.sin_family = AF_INET;
+ 	serv_addr.sin_port = htons(0);
+ 	serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ 	if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
+ 	{
+ 		write_msg(modulename, "pgpipe could not bind: %d",
+ 				  WSAGetLastError());
+ 		closesocket(s);
+ 		return -1;
+ 	}
+ 	if (listen(s, 1) == SOCKET_ERROR)
+ 	{
+ 		write_msg(modulename, "pgpipe could not listen: %d",
+ 				  WSAGetLastError());
+ 		closesocket(s);
+ 		return -1;
+ 	}
+ 	/* Find out which port we were actually assigned. */
+ 	if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
+ 	{
+ 		write_msg(modulename, "pgpipe could not getsockname: %d",
+ 				  WSAGetLastError());
+ 		closesocket(s);
+ 		return -1;
+ 	}
+ 	if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
+ 	{
+ 		write_msg(modulename, "pgpipe could not create socket 2: %d",
+ 				  WSAGetLastError());
+ 		closesocket(s);
+ 		return -1;
+ 	}
+ 
+ 	if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
+ 	{
+ 		write_msg(modulename, "pgpipe could not connect socket: %d",
+ 				  WSAGetLastError());
+ 		/* don't leak the write-side socket created just above */
+ 		closesocket(handles[1]);
+ 		handles[1] = INVALID_SOCKET;
+ 		closesocket(s);
+ 		return -1;
+ 	}
+ 	if ((handles[0] = accept(s, (SOCKADDR *) &serv_addr, &len)) == INVALID_SOCKET)
+ 	{
+ 		write_msg(modulename, "pgpipe could not accept socket: %d",
+ 				  WSAGetLastError());
+ 		closesocket(handles[1]);
+ 		handles[1] = INVALID_SOCKET;
+ 		closesocket(s);
+ 		return -1;
+ 	}
+ 	/* The listening socket is no longer needed once the pair is connected. */
+ 	closesocket(s);
+ 	return 0;
+ }
+ 
+ /*
+  * Socket-based replacement for read() on our Win32 "pipe" handles; a
+  * connection reset by the peer is reported as plain EOF (return 0).
+  */
+ static int
+ piperead(int s, char *buf, int len)
+ {
+ 	int			ret = recv(s, buf, len, 0);
+ 
+ 	if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
+ 		/* EOF on the pipe! (win32 socket based implementation) */
+ 		ret = 0;
+ 	return ret;
+ }
+ #endif
diff --git a/src/bin/pg_dump/parallel.h b/src/bin/pg_dump/parallel.h
index ...3eafe2f .
*** a/src/bin/pg_dump/parallel.h
--- b/src/bin/pg_dump/parallel.h
***************
*** 0 ****
--- 1,86 ----
+ /*-------------------------------------------------------------------------
+  *
+  * parallel.h
+  *
+  *	Parallel support header file for the pg_dump archiver
+  *
+  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *	The author is not responsible for loss or damages that may
+  *	result from its use.
+  *
+  * IDENTIFICATION
+  *		src/bin/pg_dump/parallel.h
+  *
+  *-------------------------------------------------------------------------
+  */
+ 
+ #include "pg_backup_db.h"
+ 
+ struct _archiveHandle;
+ struct _tocEntry;
+ 
+ typedef enum
+ {
+ 	WRKR_TERMINATED = 0,
+ 	WRKR_IDLE,
+ 	WRKR_WORKING,
+ 	WRKR_FINISHED
+ } T_WorkerStatus;
+ 
+ typedef enum T_Action
+ {
+ 	ACT_DUMP,
+ 	ACT_RESTORE				/* no trailing comma: required for strict C89 */
+ } T_Action;
+ 
+ /* Arguments needed for a worker process */
+ typedef struct ParallelArgs
+ {
+ 	struct _archiveHandle *AH;
+ 	struct _tocEntry	  *te;
+ } ParallelArgs;
+ 
+ /* State for each parallel activity slot */
+ typedef struct ParallelSlot
+ {
+ 	ParallelArgs	   *args;
+ 	T_WorkerStatus		workerStatus;
+ 	int					status;
+ 	int					pipeRead;
+ 	int					pipeWrite;
+ 	int					pipeRevRead;
+ 	int					pipeRevWrite;
+ #ifdef WIN32
+ 	uintptr_t			hThread;
+ 	unsigned int		threadId;
+ #else
+ 	pid_t				pid;
+ #endif
+ } ParallelSlot;
+ 
+ #define NO_SLOT (-1)
+ 
+ typedef struct ParallelState
+ {
+ 	int			numWorkers;
+ 	ParallelSlot *parallelSlot;
+ } ParallelState;
+ 
+ extern int GetIdleWorker(ParallelState *pstate);
+ extern bool IsEveryWorkerIdle(ParallelState *pstate);
+ extern void ListenToWorkers(struct _archiveHandle *AH, ParallelState *pstate, bool do_wait);
+ extern int ReapWorkerStatus(ParallelState *pstate, int *status);
+ extern void EnsureIdleWorker(struct _archiveHandle *AH, ParallelState *pstate);
+ extern void EnsureWorkersFinished(struct _archiveHandle *AH, ParallelState *pstate);
+ 
+ extern ParallelState *ParallelBackupStart(struct _archiveHandle *AH,
+ 										  RestoreOptions *ropt);
+ extern void DispatchJobForTocEntry(struct _archiveHandle *AH,
+ 								   ParallelState *pstate,
+ 								   struct _tocEntry *te, T_Action act);
+ extern void ParallelBackupEnd(struct _archiveHandle *AH, ParallelState *pstate);
+ 
+ extern void checkAborting(struct _archiveHandle *AH);
+ 
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index b82171c..6236544 100644
*** a/src/bin/pg_dump/pg_backup.h
--- b/src/bin/pg_dump/pg_backup.h
*************** struct Archive
*** 82,91 ****
--- 82,93 ----
  	int			maxRemoteVersion;
  
  	int			numWorkers;		/* number of parallel processes */
+ 	char	   *sync_snapshot_id;  /* sync snapshot id for parallel operation */
  
  	/* info needed for string escaping */
  	int			encoding;		/* libpq code for client_encoding */
  	bool		std_strings;	/* standard_conforming_strings */
+ 	char	   *use_role;		/* Issue SET ROLE to this */
  
  	/* error handling */
  	bool		exit_on_error;	/* whether to exit on SQL errors... */
*************** extern void PrintTOCSummary(Archive *AH,
*** 196,201 ****
--- 198,206 ----
  
  extern RestoreOptions *NewRestoreOptions(void);
  
+ /* We have one in pg_dump.c and another one in pg_restore.c */
+ extern void _SetupWorker(Archive *AHX, RestoreOptions *ropt);
+ 
  /* Rearrange and filter TOC entries */
  extern void SortTocFromFile(Archive *AHX, RestoreOptions *ropt);
  
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 017cc73..417e961 100644
*** a/src/bin/pg_dump/pg_backup_archiver.c
--- b/src/bin/pg_dump/pg_backup_archiver.c
***************
*** 23,30 ****
--- 23,32 ----
  #include "pg_backup_db.h"
  #include "dumpmem.h"
  #include "dumputils.h"
+ #include "parallel.h"
  
  #include <ctype.h>
+ #include <fcntl.h>
  #include <unistd.h>
  #include <sys/stat.h>
  #include <sys/types.h>
***************
*** 36,107 ****
  
  #include "libpq/libpq-fs.h"
  
- /*
-  * Special exit values from worker children.  We reserve 0 for normal
-  * success; 1 and other small values should be interpreted as crashes.
-  */
- #define WORKER_CREATE_DONE		10
- #define WORKER_INHIBIT_DATA		11
- #define WORKER_IGNORED_ERRORS	12
- 
- /*
-  * Unix uses exit to return result from worker child, so function is void.
-  * Windows thread result comes via function return.
-  */
- #ifndef WIN32
- #define parallel_restore_result void
- #else
- #define parallel_restore_result DWORD
- #endif
- 
- /* IDs for worker children are either PIDs or thread handles */
- #ifndef WIN32
- #define thandle pid_t
- #else
- #define thandle HANDLE
- #endif
- 
- typedef struct ParallelStateEntry
- {
- #ifdef WIN32
- 	unsigned int threadId;
- #else
- 	pid_t		pid;
- #endif
- 	ArchiveHandle *AH;
- } ParallelStateEntry;
- 
- typedef struct ParallelState
- {
- 	int			numWorkers;
- 	ParallelStateEntry *pse;
- } ParallelState;
- 
- /* Arguments needed for a worker child */
- typedef struct _restore_args
- {
- 	ArchiveHandle *AH;
- 	TocEntry   *te;
- 	ParallelStateEntry *pse;
- } RestoreArgs;
- 
- /* State for each parallel activity slot */
- typedef struct _parallel_slot
- {
- 	thandle		child_id;
- 	RestoreArgs *args;
- } ParallelSlot;
- 
- typedef struct ShutdownInformation
- {
- 	ParallelState *pstate;
- 	Archive    *AHX;
- } ShutdownInformation;
- 
- static ShutdownInformation shutdown_info;
- 
- #define NO_SLOT (-1)
- 
  #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
  #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
  
--- 38,43 ----
*************** static void RestoreOutput(ArchiveHandle
*** 150,171 ****
  static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
  				  RestoreOptions *ropt, bool is_parallel);
  static void restore_toc_entries_prefork(ArchiveHandle *AH);
! static void restore_toc_entries_parallel(ArchiveHandle *AH, TocEntry *pending_list);
  static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list);
- static thandle spawn_restore(RestoreArgs *args);
- static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
- static bool work_in_progress(ParallelSlot *slots, int n_slots);
- static int	get_next_slot(ParallelSlot *slots, int n_slots);
  static void par_list_header_init(TocEntry *l);
  static void par_list_append(TocEntry *l, TocEntry *te);
  static void par_list_remove(TocEntry *te);
  static TocEntry *get_next_work_item(ArchiveHandle *AH,
  				   TocEntry *ready_list,
! 				   ParallelSlot *slots, int n_slots);
! static parallel_restore_result parallel_restore(RestoreArgs *args);
  static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
! 			   thandle worker, int status,
! 			   ParallelSlot *slots, int n_slots);
  static void fix_dependencies(ArchiveHandle *AH);
  static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
  static void repoint_table_dependencies(ArchiveHandle *AH);
--- 86,103 ----
  static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
  				  RestoreOptions *ropt, bool is_parallel);
  static void restore_toc_entries_prefork(ArchiveHandle *AH);
! static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
! 										 TocEntry *pending_list);
  static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list);
  static void par_list_header_init(TocEntry *l);
  static void par_list_append(TocEntry *l, TocEntry *te);
  static void par_list_remove(TocEntry *te);
  static TocEntry *get_next_work_item(ArchiveHandle *AH,
  				   TocEntry *ready_list,
! 				   ParallelState *pstate);
  static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
! 			   int worker, int status,
! 			   ParallelState *pstate);
  static void fix_dependencies(ArchiveHandle *AH);
  static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
  static void repoint_table_dependencies(ArchiveHandle *AH);
*************** static void reduce_dependencies(ArchiveH
*** 175,186 ****
  static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
  static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
  
- static void setProcessIdentifier(ParallelStateEntry *pse, ArchiveHandle *AH);
- static void unsetProcessIdentifier(ParallelStateEntry *pse);
- static ParallelStateEntry *GetMyPSEntry(ParallelState *pstate);
- static void archive_close_connection(int code, void *arg);
- 
- 
  /*
   *	Wrapper functions.
   *
--- 107,112 ----
*************** RestoreArchive(Archive *AHX)
*** 491,497 ****
  	 */
  	if (parallel_mode)
  	{
! 		TocEntry pending_list;
  
  		par_list_header_init(&pending_list);
  
--- 417,424 ----
  	 */
  	if (parallel_mode)
  	{
! 		ParallelState  *pstate;
! 		TocEntry		pending_list;
  
  		par_list_header_init(&pending_list);
  
*************** RestoreArchive(Archive *AHX)
*** 499,506 ****
  		restore_toc_entries_prefork(AH);
  		Assert(AH->connection == NULL);
  
! 		/* This will actually fork the processes */
! 		restore_toc_entries_parallel(AH, &pending_list);
  
  		/* reconnect the master and see if we missed something */
  		restore_toc_entries_postfork(AH, &pending_list);
--- 426,435 ----
  		restore_toc_entries_prefork(AH);
  		Assert(AH->connection == NULL);
  
! 		/* ParallelBackupStart() will actually fork the processes */
! 		pstate = ParallelBackupStart(AH, ropt);
! 		restore_toc_entries_parallel(AH, pstate, &pending_list);
! 		ParallelBackupEnd(AH, pstate);
  
  		/* reconnect the master and see if we missed something */
  		restore_toc_entries_postfork(AH, &pending_list);
*************** static int
*** 564,570 ****
  restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
  				  RestoreOptions *ropt, bool is_parallel)
  {
! 	int			retval = 0;
  	teReqs		reqs;
  	bool		defnDumped;
  
--- 493,499 ----
  restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
  				  RestoreOptions *ropt, bool is_parallel)
  {
! 	int			status = WORKER_OK;
  	teReqs		reqs;
  	bool		defnDumped;
  
*************** restore_toc_entry(ArchiveHandle *AH, Toc
*** 617,623 ****
  				if (ropt->noDataForFailedTables)
  				{
  					if (is_parallel)
! 						retval = WORKER_INHIBIT_DATA;
  					else
  						inhibit_data_for_failed_table(AH, te);
  				}
--- 546,552 ----
  				if (ropt->noDataForFailedTables)
  				{
  					if (is_parallel)
! 						status = WORKER_INHIBIT_DATA;
  					else
  						inhibit_data_for_failed_table(AH, te);
  				}
*************** restore_toc_entry(ArchiveHandle *AH, Toc
*** 632,638 ****
  				 * just set the return value.
  				 */
  				if (is_parallel)
! 					retval = WORKER_CREATE_DONE;
  				else
  					mark_create_done(AH, te);
  			}
--- 561,567 ----
  				 * just set the return value.
  				 */
  				if (is_parallel)
! 					status = WORKER_CREATE_DONE;
  				else
  					mark_create_done(AH, te);
  			}
*************** restore_toc_entry(ArchiveHandle *AH, Toc
*** 750,756 ****
  		}
  	}
  
! 	return retval;
  }
  
  /*
--- 679,688 ----
  		}
  	}
  
! 	if (AH->public.n_errors > 0 && status == WORKER_OK)
! 		status = WORKER_IGNORED_ERRORS;
! 
! 	return status;
  }
  
  /*
*************** _allocAH(const char *FileSpec, const Arc
*** 2141,2190 ****
  	return AH;
  }
  
- 
  void
! WriteDataChunks(ArchiveHandle *AH)
  {
  	TocEntry   *te;
- 	StartDataPtr startPtr;
- 	EndDataPtr	endPtr;
  
  	for (te = AH->toc->next; te != AH->toc; te = te->next)
  	{
! 		if (te->dataDumper != NULL && (te->reqs & REQ_DATA) != 0)
! 		{
! 			AH->currToc = te;
! 			/* printf("Writing data for %d (%x)\n", te->id, te); */
! 
! 			if (strcmp(te->desc, "BLOBS") == 0)
! 			{
! 				startPtr = AH->StartBlobsPtr;
! 				endPtr = AH->EndBlobsPtr;
! 			}
! 			else
! 			{
! 				startPtr = AH->StartDataPtr;
! 				endPtr = AH->EndDataPtr;
! 			}
  
! 			if (startPtr != NULL)
! 				(*startPtr) (AH, te);
  
  			/*
! 			 * printf("Dumper arg for %d is %x\n", te->id, te->dataDumperArg);
  			 */
  
! 			/*
! 			 * The user-provided DataDumper routine needs to call
! 			 * AH->WriteData
! 			 */
! 			(*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
  
! 			if (endPtr != NULL)
! 				(*endPtr) (AH, te);
! 			AH->currToc = NULL;
! 		}
  	}
  }
  
  void
--- 2073,2139 ----
  	return AH;
  }
  
  void
! WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
  {
  	TocEntry   *te;
  
  	for (te = AH->toc->next; te != AH->toc; te = te->next)
  	{
! 		if (!te->dataDumper)
! 			continue;
  
! 		if ((te->reqs & REQ_DATA) == 0)
! 			continue;
  
+ 		if (pstate && pstate->numWorkers > 1)
+ 		{
  			/*
! 			 * If we are in a parallel backup, then we are always the master
! 			 * process.
  			 */
+ 			EnsureIdleWorker(AH, pstate);
+ 			Assert(GetIdleWorker(pstate) != NO_SLOT);
+ 			DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP);
+ 		}
+ 		else
+ 			WriteDataChunksForTocEntry(AH, te);
+ 	}
+ 	EnsureWorkersFinished(AH, pstate);
+ }
  
! void
! WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
! {
! 	StartDataPtr startPtr;
! 	EndDataPtr	endPtr;
  
! 	AH->currToc = te;
! 
! 	if (strcmp(te->desc, "BLOBS") == 0)
! 	{
! 		startPtr = AH->StartBlobsPtr;
! 		endPtr = AH->EndBlobsPtr;
! 	}
! 	else
! 	{
! 		startPtr = AH->StartDataPtr;
! 		endPtr = AH->EndDataPtr;
  	}
+ 
+ 	if (startPtr != NULL)
+ 		(*startPtr) (AH, te);
+ 
+ 	/*
+ 	 * The user-provided DataDumper routine needs to call
+ 	 * AH->WriteData
+ 	 */
+ 	(*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
+ 
+ 	if (endPtr != NULL)
+ 		(*endPtr) (AH, te);
+ 
+ 	AH->currToc = NULL;
  }
  
  void
*************** dumpTimestamp(ArchiveHandle *AH, const c
*** 3409,3475 ****
  }
  
  static void
- setProcessIdentifier(ParallelStateEntry *pse, ArchiveHandle *AH)
- {
- #ifdef WIN32
- 	pse->threadId = GetCurrentThreadId();
- #else
- 	pse->pid = getpid();
- #endif
- 	pse->AH = AH;
- }
- 
- static void
- unsetProcessIdentifier(ParallelStateEntry *pse)
- {
- #ifdef WIN32
- 	pse->threadId = 0;
- #else
- 	pse->pid = 0;
- #endif
- 	pse->AH = NULL;
- }
- 
- static ParallelStateEntry *
- GetMyPSEntry(ParallelState *pstate)
- {
- 	int			i;
- 
- 	for (i = 0; i < pstate->numWorkers; i++)
- #ifdef WIN32
- 		if (pstate->pse[i].threadId == GetCurrentThreadId())
- #else
- 		if (pstate->pse[i].pid == getpid())
- #endif
- 			return &(pstate->pse[i]);
- 
- 	return NULL;
- }
- 
- static void
- archive_close_connection(int code, void *arg)
- {
- 	ShutdownInformation *si = (ShutdownInformation *) arg;
- 
- 	if (si->pstate)
- 	{
- 		ParallelStateEntry *entry = GetMyPSEntry(si->pstate);
- 
- 		if (entry != NULL && entry->AH)
- 			DisconnectDatabase(&(entry->AH->public));
- 	}
- 	else if (si->AHX)
- 		DisconnectDatabase(si->AHX);
- }
- 
- void
- on_exit_close_archive(Archive *AHX)
- {
- 	shutdown_info.AHX = AHX;
- 	on_exit_nicely(archive_close_connection, &shutdown_info);
- }
- 
- static void
  restore_toc_entries_prefork(ArchiveHandle *AH)
  {
  	RestoreOptions *ropt = AH->ropt;
--- 3358,3363 ----
*************** restore_toc_entries_prefork(ArchiveHandl
*** 3557,3603 ****
   * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
   * just as for a standard restore. This is done in restore_toc_entries_prefork().
   * Second we process the remaining non-ACL steps in parallel worker children
!  * (threads on Windows, processes on Unix), each of which connects separately
!  * to the database.
   * Finally we process all the ACL entries in a single connection (that happens
   * back in RestoreArchive).
   */
  static void
! restore_toc_entries_parallel(ArchiveHandle *AH, TocEntry *pending_list)
  {
! 	ParallelState *pstate;
! 	ParallelSlot *slots;
! 	int			n_slots = AH->public.numWorkers;
! 	TocEntry   *next_work_item;
! 	int			next_slot;
  	TocEntry	ready_list;
  	int			ret_child;
- 	bool		skipped_some;
- 	int			work_status;
- 	int			i;
  
  	ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
  
- 	slots = (ParallelSlot *) pg_calloc(n_slots, sizeof(ParallelSlot));
- 	pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
- 	pstate->pse = (ParallelStateEntry *) pg_calloc(n_slots, sizeof(ParallelStateEntry));
- 	pstate->numWorkers = AH->public.numWorkers;
- 	for (i = 0; i < pstate->numWorkers; i++)
- 		unsetProcessIdentifier(&(pstate->pse[i]));
- 
- 	/*
- 	 * Set the pstate in the shutdown_info. The exit handler uses pstate if set
- 	 * and falls back to AHX otherwise.
- 	 */
- 	shutdown_info.pstate = pstate;
- 
  	/*
  	 * Initialize the lists of ready items, the list for pending items has
  	 * already been initialized in the caller.  After this setup, the pending
  	 * list is everything that needs to be done but is blocked by one or more
  	 * dependencies, while the ready list contains items that have no remaining
! 	 * dependencies.	Note: we don't yet filter out entries that aren't going
! 	 * to be restored.  They might participate in dependency chains connecting
  	 * entries that should be restored, so we treat them as live until we
  	 * actually process them.
  	 */
--- 3445,3474 ----
   * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
   * just as for a standard restore. This is done in restore_toc_entries_prefork().
   * Second we process the remaining non-ACL steps in parallel worker children
!  * (threads on Windows, processes on Unix), these fork off and set up their
!  * connections before we call restore_toc_entries_parallel_forked.
   * Finally we process all the ACL entries in a single connection (that happens
   * back in RestoreArchive).
   */
  static void
! restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
! 							 TocEntry *pending_list)
  {
! 	int			work_status;
! 	bool		skipped_some;
  	TocEntry	ready_list;
+ 	TocEntry   *next_work_item;
  	int			ret_child;
  
  	ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
  
  	/*
  	 * Initialize the lists of ready items, the list for pending items has
  	 * already been initialized in the caller.  After this setup, the pending
  	 * list is everything that needs to be done but is blocked by one or more
  	 * dependencies, while the ready list contains items that have no remaining
! 	 * dependencies. Note: we don't yet filter out entries that aren't going
! 	 * to be restored. They might participate in dependency chains connecting
  	 * entries that should be restored, so we treat them as live until we
  	 * actually process them.
  	 */
*************** restore_toc_entries_parallel(ArchiveHand
*** 3639,3647 ****
  
  	ahlog(AH, 1, "entering main parallel loop\n");
  
! 	while ((next_work_item = get_next_work_item(AH, &ready_list,
! 												slots, n_slots)) != NULL ||
! 		   work_in_progress(slots, n_slots))
  	{
  		if (next_work_item != NULL)
  		{
--- 3510,3517 ----
  
  	ahlog(AH, 1, "entering main parallel loop\n");
  
! 	while ((next_work_item = get_next_work_item(AH, &ready_list, pstate)) != NULL ||
! 		   !IsEveryWorkerIdle(pstate))
  	{
  		if (next_work_item != NULL)
  		{
*************** restore_toc_entries_parallel(ArchiveHand
*** 3659,3710 ****
  				continue;
  			}
  
! 			if ((next_slot = get_next_slot(slots, n_slots)) != NO_SLOT)
! 			{
! 				/* There is work still to do and a worker slot available */
! 				thandle		child;
! 				RestoreArgs *args;
! 
! 				ahlog(AH, 1, "launching item %d %s %s\n",
! 					  next_work_item->dumpId,
! 					  next_work_item->desc, next_work_item->tag);
  
! 				par_list_remove(next_work_item);
  
! 				/* this memory is dealloced in mark_work_done() */
! 				args = pg_malloc(sizeof(RestoreArgs));
! 				args->AH = CloneArchive(AH);
! 				args->te = next_work_item;
! 				args->pse = &pstate->pse[next_slot];
  
! 				/* run the step in a worker child */
! 				child = spawn_restore(args);
  
! 				slots[next_slot].child_id = child;
! 				slots[next_slot].args = args;
  
! 				continue;
  			}
- 		}
  
! 		/*
! 		 * If we get here there must be work being done.  Either there is no
! 		 * work available to schedule (and work_in_progress returned true) or
! 		 * there are no slots available.  So we wait for a worker to finish,
! 		 * and process the result.
! 		 */
! 		ret_child = reap_child(slots, n_slots, &work_status);
  
! 		if (WIFEXITED(work_status))
! 		{
! 			mark_work_done(AH, &ready_list,
! 						   ret_child, WEXITSTATUS(work_status),
! 						   slots, n_slots);
! 		}
! 		else
! 		{
! 			exit_horribly(modulename, "worker process crashed: status %d\n",
! 						  work_status);
  		}
  	}
  
--- 3529,3586 ----
  				continue;
  			}
  
! 			ahlog(AH, 1, "launching item %d %s %s\n",
! 				  next_work_item->dumpId,
! 				  next_work_item->desc, next_work_item->tag);
  
! 			par_list_remove(next_work_item);
  
! 			Assert(GetIdleWorker(pstate) != NO_SLOT);
! 			DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE);
! 		}
! 		else
! 			/* at least one child is working and we have nothing ready. */
! 			Assert(!IsEveryWorkerIdle(pstate));
  
! 		for (;;)
! 		{
! 			int nTerm = 0;
  
! 			/*
! 			 * In order to reduce dependencies as soon as possible and
! 			 * especially to reap the status of workers who are working on
! 			 * items that pending items depend on, we do a non-blocking check
! 			 * for ended workers first.
! 			 *
! 			 * However, if we do not have any other work items currently that
! 			 * workers can work on, we do not busy-loop here but instead
! 			 * really wait for at least one worker to terminate. Hence we call
! 			 * ListenToWorkers(..., ..., do_wait = true) in this case.
! 			 */
! 			ListenToWorkers(AH, pstate, !next_work_item);
  
! 			while ((ret_child = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
! 			{
! 				nTerm++;
! 				mark_work_done(AH, &ready_list, ret_child, work_status, pstate);
  			}
  
! 			/*
! 			 * We need to make sure that we have an idle worker before re-running the
! 			 * loop. If nTerm > 0 we already have that (quick check).
! 			 */
! 			if (nTerm > 0)
! 				break;
  
! 			/* if nobody terminated, explicitly check for an idle worker */
! 			if (GetIdleWorker(pstate) != NO_SLOT)
! 				break;
! 
! 			/*
! 			 * If we have no idle worker, read the result of one or more
! 			 * workers and loop the loop to call ReapWorkerStatus() on them.
! 			 */
! 			ListenToWorkers(AH, pstate, true);
  		}
  	}
  
*************** restore_toc_entries_postfork(ArchiveHand
*** 3720,3731 ****
  	ahlog(AH, 2, "entering restore_toc_entries_postfork\n");
  
  	/*
- 	 * Remove the pstate again, so the exit handler will now fall back to
- 	 * closing AH->connection again.
- 	 */
- 	shutdown_info.pstate = NULL;
- 
- 	/*
  	 * Now reconnect the single parent connection.
  	 */
  	ConnectDatabase((Archive *) AH, ropt->dbname,
--- 3596,3601 ----
*************** restore_toc_entries_postfork(ArchiveHand
*** 3750,3870 ****
  }
  
  /*
-  * create a worker child to perform a restore step in parallel
-  */
- static thandle
- spawn_restore(RestoreArgs *args)
- {
- 	thandle		child;
- 
- 	/* Ensure stdio state is quiesced before forking */
- 	fflush(NULL);
- 
- #ifndef WIN32
- 	child = fork();
- 	if (child == 0)
- 	{
- 		/* in child process */
- 		parallel_restore(args);
- 		exit_horribly(modulename,
- 					  "parallel_restore should not return\n");
- 	}
- 	else if (child < 0)
- 	{
- 		/* fork failed */
- 		exit_horribly(modulename,
- 					  "could not create worker process: %s\n",
- 					  strerror(errno));
- 	}
- #else
- 	child = (HANDLE) _beginthreadex(NULL, 0, (void *) parallel_restore,
- 									args, 0, NULL);
- 	if (child == 0)
- 		exit_horribly(modulename,
- 					  "could not create worker thread: %s\n",
- 					  strerror(errno));
- #endif
- 
- 	return child;
- }
- 
- /*
-  *	collect status from a completed worker child
-  */
- static thandle
- reap_child(ParallelSlot *slots, int n_slots, int *work_status)
- {
- #ifndef WIN32
- 	/* Unix is so much easier ... */
- 	return wait(work_status);
- #else
- 	static HANDLE *handles = NULL;
- 	int			hindex,
- 				snum,
- 				tnum;
- 	thandle		ret_child;
- 	DWORD		res;
- 
- 	/* first time around only, make space for handles to listen on */
- 	if (handles == NULL)
- 		handles = (HANDLE *) pg_calloc(sizeof(HANDLE), n_slots);
- 
- 	/* set up list of handles to listen to */
- 	for (snum = 0, tnum = 0; snum < n_slots; snum++)
- 		if (slots[snum].child_id != 0)
- 			handles[tnum++] = slots[snum].child_id;
- 
- 	/* wait for one to finish */
- 	hindex = WaitForMultipleObjects(tnum, handles, false, INFINITE);
- 
- 	/* get handle of finished thread */
- 	ret_child = handles[hindex - WAIT_OBJECT_0];
- 
- 	/* get the result */
- 	GetExitCodeThread(ret_child, &res);
- 	*work_status = res;
- 
- 	/* dispose of handle to stop leaks */
- 	CloseHandle(ret_child);
- 
- 	return ret_child;
- #endif
- }
- 
- /*
-  * are we doing anything now?
-  */
- static bool
- work_in_progress(ParallelSlot *slots, int n_slots)
- {
- 	int			i;
- 
- 	for (i = 0; i < n_slots; i++)
- 	{
- 		if (slots[i].child_id != 0)
- 			return true;
- 	}
- 	return false;
- }
- 
- /*
-  * find the first free parallel slot (if any).
-  */
- static int
- get_next_slot(ParallelSlot *slots, int n_slots)
- {
- 	int			i;
- 
- 	for (i = 0; i < n_slots; i++)
- 	{
- 		if (slots[i].child_id == 0)
- 			return i;
- 	}
- 	return NO_SLOT;
- }
- 
- 
- /*
   * Check if te1 has an exclusive lock requirement for an item that te2 also
   * requires, whether or not te2's requirement is for an exclusive lock.
   */
--- 3620,3625 ----
*************** par_list_remove(TocEntry *te)
*** 3937,3943 ****
   */
  static TocEntry *
  get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
! 				   ParallelSlot *slots, int n_slots)
  {
  	bool		pref_non_data = false;	/* or get from AH->ropt */
  	TocEntry   *data_te = NULL;
--- 3692,3698 ----
   */
  static TocEntry *
  get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
! 				   ParallelState *pstate)
  {
  	bool		pref_non_data = false;	/* or get from AH->ropt */
  	TocEntry   *data_te = NULL;
*************** get_next_work_item(ArchiveHandle *AH, To
*** 3952,3962 ****
  	{
  		int			count = 0;
  
! 		for (k = 0; k < n_slots; k++)
! 			if (slots[k].args->te != NULL &&
! 				slots[k].args->te->section == SECTION_DATA)
  				count++;
! 		if (n_slots == 0 || count * 4 < n_slots)
  			pref_non_data = false;
  	}
  
--- 3707,3717 ----
  	{
  		int			count = 0;
  
! 		for (k = 0; k < pstate->numWorkers; k++)
! 			if (pstate->parallelSlot[k].args->te != NULL &&
! 				pstate->parallelSlot[k].args->te->section == SECTION_DATA)
  				count++;
! 		if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
  			pref_non_data = false;
  	}
  
*************** get_next_work_item(ArchiveHandle *AH, To
*** 3972,3984 ****
  		 * that a currently running item also needs lock on, or vice versa. If
  		 * so, we don't want to schedule them together.
  		 */
! 		for (i = 0; i < n_slots && !conflicts; i++)
  		{
  			TocEntry   *running_te;
  
! 			if (slots[i].args == NULL)
  				continue;
! 			running_te = slots[i].args->te;
  
  			if (has_lock_conflicts(te, running_te) ||
  				has_lock_conflicts(running_te, te))
--- 3727,3739 ----
  		 * that a currently running item also needs lock on, or vice versa. If
  		 * so, we don't want to schedule them together.
  		 */
! 		for (i = 0; i < pstate->numWorkers && !conflicts; i++)
  		{
  			TocEntry   *running_te;
  
! 			if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING)
  				continue;
! 			running_te = pstate->parallelSlot[i].args->te;
  
  			if (has_lock_conflicts(te, running_te) ||
  				has_lock_conflicts(running_te, te))
*************** get_next_work_item(ArchiveHandle *AH, To
*** 4013,4075 ****
  /*
   * Restore a single TOC item in parallel with others
   *
!  * this is the procedure run as a thread (Windows) or a
!  * separate process (everything else).
   */
! static parallel_restore_result
! parallel_restore(RestoreArgs *args)
  {
  	ArchiveHandle *AH = args->AH;
  	TocEntry   *te = args->te;
  	RestoreOptions *ropt = AH->ropt;
! 	int			retval;
! 
! 	setProcessIdentifier(args->pse, AH);
! 
! 	/*
! 	 * Close and reopen the input file so we have a private file pointer that
! 	 * doesn't stomp on anyone else's file pointer, if we're actually going to
! 	 * need to read from the file. Otherwise, just close it except on Windows,
! 	 * where it will possibly be needed by other threads.
! 	 *
! 	 * Note: on Windows, since we are using threads not processes, the reopen
! 	 * call *doesn't* close the original file pointer but just open a new one.
! 	 */
! 	if (te->section == SECTION_DATA)
! 		(AH->ReopenPtr) (AH);
! #ifndef WIN32
! 	else
! 		(AH->ClosePtr) (AH);
! #endif
! 
! 	/*
! 	 * We need our own database connection, too
! 	 */
! 	ConnectDatabase((Archive *) AH, ropt->dbname,
! 					ropt->pghost, ropt->pgport, ropt->username,
! 					ropt->promptPassword);
  
  	_doSetFixedOutputState(AH);
  
! 	/* Restore the TOC item */
! 	retval = restore_toc_entry(AH, te, ropt, true);
! 
! 	/* And clean up */
! 	DisconnectDatabase((Archive *) AH);
! 	unsetProcessIdentifier(args->pse);
  
! 	/* If we reopened the file, we are done with it, so close it now */
! 	if (te->section == SECTION_DATA)
! 		(AH->ClosePtr) (AH);
  
! 	if (retval == 0 && AH->public.n_errors)
! 		retval = WORKER_IGNORED_ERRORS;
  
! #ifndef WIN32
! 	exit(retval);
! #else
! 	return retval;
! #endif
  }
  
  
--- 3768,3796 ----
  /*
   * Restore a single TOC item in parallel with others
   *
!  * this is run in the worker, i.e. in a thread (Windows) or a separate process
!  * (everything else). A worker process executes several such work items during
!  * a parallel backup or restore. Once we terminate here and report back that
!  * our work is finished, the master process will assign us a new work item.
   */
! int
! parallel_restore(ParallelArgs *args)
  {
  	ArchiveHandle *AH = args->AH;
  	TocEntry   *te = args->te;
  	RestoreOptions *ropt = AH->ropt;
! 	int			status;
  
  	_doSetFixedOutputState(AH);
  
! 	Assert(AH->connection != NULL);
  
! 	AH->public.n_errors = 0;
  
! 	/* Restore the TOC item */
! 	status = restore_toc_entry(AH, te, ropt, true);
  
! 	return status;
  }
  
  
*************** parallel_restore(RestoreArgs *args)
*** 4081,4105 ****
   */
  static void
  mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
! 			   thandle worker, int status,
! 			   ParallelSlot *slots, int n_slots)
  {
  	TocEntry   *te = NULL;
- 	int			i;
- 
- 	for (i = 0; i < n_slots; i++)
- 	{
- 		if (slots[i].child_id == worker)
- 		{
- 			slots[i].child_id = 0;
- 			te = slots[i].args->te;
- 			DeCloneArchive(slots[i].args->AH);
- 			free(slots[i].args);
- 			slots[i].args = NULL;
  
! 			break;
! 		}
! 	}
  
  	if (te == NULL)
  		exit_horribly(modulename, "could not find slot of finished worker\n");
--- 3802,3813 ----
   */
  static void
  mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
! 			   int worker, int status,
! 			   ParallelState *pstate)
  {
  	TocEntry   *te = NULL;
  
! 	te = pstate->parallelSlot[worker].args->te;
  
  	if (te == NULL)
  		exit_horribly(modulename, "could not find slot of finished worker\n");
*************** inhibit_data_for_failed_table(ArchiveHan
*** 4398,4404 ****
  	}
  }
  
- 
  /*
   * Clone and de-clone routines used in parallel restoration.
   *
--- 4106,4111 ----
*************** CloneArchive(ArchiveHandle *AH)
*** 4431,4439 ****
--- 4138,4196 ----
  	/* clone has its own error count, too */
  	clone->public.n_errors = 0;
  
+ 	/*
+ 	 * Connect our new clone object to the database:
+ 	 * In parallel restore the parent is already disconnected, because we can
+ 	 * connect the worker processes independently to the database (no snapshot
+ 	 * sync required).
+ 	 * In parallel backup we clone the parent's existing connection.
+ 	 */
+ 	if (AH->mode == archModeRead)
+ 	{
+ 		RestoreOptions *ropt = AH->ropt;
+ 		Assert(AH->connection == NULL);
+ 		/* this also sets clone->connection */
+ 		ConnectDatabase((Archive *) clone, ropt->dbname,
+ 					ropt->pghost, ropt->pgport, ropt->username,
+ 					ropt->promptPassword);
+ 	}
+ 	else
+ 	{
+ 		char	   *dbname;
+ 		char	   *pghost;
+ 		char	   *pgport;
+ 		char	   *username;
+ 		const char *encname;
+ 
+ 		Assert(AH->connection != NULL);
+ 
+ 		/*
+ 		 * Even though we are technically accessing the parent's database object
+ 		 * here, these functions are safe to call from here because they all
+ 		 * just return a pointer and do not actually send/receive any data to/from the
+ 		 * database.
+ 		 */
+ 		dbname = PQdb(AH->connection);
+ 		pghost = PQhost(AH->connection);
+ 		pgport = PQport(AH->connection);
+ 		username = PQuser(AH->connection);
+ 		encname = pg_encoding_to_char(AH->public.encoding);
+ 
+ 		/* this also sets clone->connection */
+ 		ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username, TRI_NO);
+ 
+ 		/*
+ 		 * Set the same encoding; whatever we set here came from
+ 		 * pg_encoding_to_char(), so we really shouldn't run into an error setting
+ 		 * that very same value. Also see the comment in SetupConnection().
+ 		 */
+ 		PQsetClientEncoding(clone->connection, encname);
+ 	}
+ 
  	/* Let the format-specific code have a chance too */
  	(clone->ClonePtr) (clone);
  
+ 	Assert(clone->connection != NULL);
  	return clone;
  }
  
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 2c77adc..844066e 100644
*** a/src/bin/pg_dump/pg_backup_archiver.h
--- b/src/bin/pg_dump/pg_backup_archiver.h
*************** typedef z_stream *z_streamp;
*** 100,107 ****
--- 100,120 ----
  #define K_OFFSET_POS_SET 2
  #define K_OFFSET_NO_DATA 3
  
+ /*
+  * Special exit values from worker children.  We reserve 0 for normal
+  * success; 1 and other small values should be interpreted as crashes.
+  */
+ #define WORKER_OK                     0
+ #define WORKER_CREATE_DONE            10
+ #define WORKER_INHIBIT_DATA           11
+ #define WORKER_IGNORED_ERRORS         12
+ 
  struct _archiveHandle;
  struct _tocEntry;
+ struct _restoreList;
+ struct ParallelArgs;
+ struct ParallelState;
+ enum T_Action;
  
  typedef void (*ClosePtr) (struct _archiveHandle * AH);
  typedef void (*ReopenPtr) (struct _archiveHandle * AH);
*************** typedef void (*PrintTocDataPtr) (struct
*** 129,134 ****
--- 142,154 ----
  typedef void (*ClonePtr) (struct _archiveHandle * AH);
  typedef void (*DeClonePtr) (struct _archiveHandle * AH);
  
+ typedef char *(*WorkerJobRestorePtr)(struct _archiveHandle * AH, struct _tocEntry * te);
+ typedef char *(*WorkerJobDumpPtr)(struct _archiveHandle * AH, struct _tocEntry * te);
+ typedef char *(*MasterStartParallelItemPtr)(struct _archiveHandle * AH, struct _tocEntry * te,
+ 											enum T_Action act);
+ typedef int (*MasterEndParallelItemPtr)(struct _archiveHandle * AH, struct _tocEntry * te,
+ 										const char *str, enum T_Action act);
+ 
  typedef size_t (*CustomOutPtr) (struct _archiveHandle * AH, const void *buf, size_t len);
  
  typedef enum
*************** typedef struct _archiveHandle
*** 227,232 ****
--- 247,258 ----
  	StartBlobPtr StartBlobPtr;
  	EndBlobPtr EndBlobPtr;
  
+ 	MasterStartParallelItemPtr MasterStartParallelItemPtr;
+ 	MasterEndParallelItemPtr MasterEndParallelItemPtr;
+ 
+ 	WorkerJobDumpPtr WorkerJobDumpPtr;
+ 	WorkerJobRestorePtr WorkerJobRestorePtr;
+ 
  	ClonePtr ClonePtr;			/* Clone format-specific fields */
  	DeClonePtr DeClonePtr;		/* Clean up cloned fields */
  
*************** typedef struct _archiveHandle
*** 236,241 ****
--- 262,268 ----
  	char	   *archdbname;		/* DB name *read* from archive */
  	enum trivalue promptPassword;
  	char	   *savedPassword;	/* password for ropt->username, if known */
+ 	char	   *use_role;
  	PGconn	   *connection;
  	int			connectToDB;	/* Flag to indicate if direct DB connection is
  								 * required */
*************** typedef struct _tocEntry
*** 327,332 ****
--- 354,360 ----
  	int			nLockDeps;		/* number of such dependencies */
  } TocEntry;
  
+ extern int parallel_restore(struct ParallelArgs *args);
  extern void on_exit_close_archive(Archive *AHX);
  
  extern void warn_or_exit_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...) __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4)));
*************** extern void WriteHead(ArchiveHandle *AH)
*** 337,343 ****
  extern void ReadHead(ArchiveHandle *AH);
  extern void WriteToc(ArchiveHandle *AH);
  extern void ReadToc(ArchiveHandle *AH);
! extern void WriteDataChunks(ArchiveHandle *AH);
  extern ArchiveHandle *CloneArchive(ArchiveHandle *AH);
  extern void DeCloneArchive(ArchiveHandle *AH);
  
--- 365,372 ----
  extern void ReadHead(ArchiveHandle *AH);
  extern void WriteToc(ArchiveHandle *AH);
  extern void ReadToc(ArchiveHandle *AH);
! extern void WriteDataChunks(ArchiveHandle *AH, struct ParallelState *pstate);
! extern void WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te);
  extern ArchiveHandle *CloneArchive(ArchiveHandle *AH);
  extern void DeCloneArchive(ArchiveHandle *AH);
  
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index f7dc5be..3ae3459 100644
*** a/src/bin/pg_dump/pg_backup_custom.c
--- b/src/bin/pg_dump/pg_backup_custom.c
***************
*** 27,32 ****
--- 27,33 ----
  #include "compress_io.h"
  #include "dumputils.h"
  #include "dumpmem.h"
+ #include "parallel.h"
  
  /*--------
   * Routines in the format interface
*************** static void _LoadBlobs(ArchiveHandle *AH
*** 60,65 ****
--- 61,70 ----
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
+ static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
+ static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act);
+ char *_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
+ 
  typedef struct
  {
  	CompressorState *cs;
*************** InitArchiveFmt_Custom(ArchiveHandle *AH)
*** 128,133 ****
--- 133,145 ----
  	AH->ClonePtr = _Clone;
  	AH->DeClonePtr = _DeClone;
  
+ 	AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
+ 	AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
+ 
+ 	/* no parallel dump in the custom archive, only parallel restore */
+ 	AH->WorkerJobDumpPtr = NULL;
+ 	AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
+ 
  	/* Set up a private area. */
  	ctx = (lclContext *) pg_calloc(1, sizeof(lclContext));
  	AH->formatData = (void *) ctx;
*************** _CloseArchive(ArchiveHandle *AH)
*** 699,705 ****
  		tpos = ftello(AH->FH);
  		WriteToc(AH);
  		ctx->dataStart = _getFilePos(AH, ctx);
! 		WriteDataChunks(AH);
  
  		/*
  		 * If possible, re-write the TOC in order to update the data offset
--- 711,717 ----
  		tpos = ftello(AH->FH);
  		WriteToc(AH);
  		ctx->dataStart = _getFilePos(AH, ctx);
! 		WriteDataChunks(AH, NULL);
  
  		/*
  		 * If possible, re-write the TOC in order to update the data offset
*************** _DeClone(ArchiveHandle *AH)
*** 797,802 ****
--- 809,889 ----
  	free(ctx);
  }
  
+ /*
+  * This function is executed in the child of a parallel restore for the
+  * custom format archive and restores the actual data for one TOC entry.
+  */
+ char *
+ _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
+ {
+ 	/* short fixed-size string + some ID so far, this needs to be malloc'ed
+ 	 * instead of static because we work with threads on windows */
+ 	const int	buflen = 64;
+ 	char	   *buf = (char*) pg_malloc(buflen);
+ 	ParallelArgs pargs;
+ 	int			status;
+ 	lclTocEntry *tctx;
+ 
+ 	tctx = (lclTocEntry *) te->formatData;
+ 
+ 	pargs.AH = AH;
+ 	pargs.te = te;
+ 
+ 	status = parallel_restore(&pargs);
+ 
+ 	snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
+ 			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
+ 
+ 	return buf;
+ }
+ 
+ /*
+  * This function is executed in the parent process. Depending on the desired
+  * action (dump or restore) it creates a string that is understood by the
+  * _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory functions of the
+  * respective dump format.
+  */
+ static char *
+ _MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
+ {
+ 	/*
+ 	 * A static char is okay here, even on Windows because we call this
+ 	 * function only from one process (the master).
+ 	 */
+ 	static char			buf[64]; /* short fixed-size string + number */
+ 
+ 	/* no parallel dump in the custom archive format */
+ 	Assert(act == ACT_RESTORE);
+ 
+ 	snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
+ 
+ 	return buf;
+ }
+ 
+ /*
+  * This function is executed in the parent process. It analyzes the response of
+  * the _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory functions of the
+  * respective dump format.
+  */
+ static int
+ _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
+ {
+ 	DumpId		dumpId;
+ 	int			nBytes, status, n_errors;
+ 
+ 	/* no parallel dump in the custom archive */
+ 	Assert(act == ACT_RESTORE);
+ 
+ 	sscanf(str, "%u %u %u%n", &dumpId, &status, &n_errors, &nBytes);
+ 
+ 	Assert(nBytes == strlen(str));
+ 	Assert(dumpId == te->dumpId);
+ 
+ 	AH->public.n_errors += n_errors;
+ 
+ 	return status;
+ }
+ 
  /*--------------------------------------------------
   * END OF FORMAT CALLBACKS
   *--------------------------------------------------
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 5356f20..e028fda 100644
*** a/src/bin/pg_dump/pg_backup_directory.c
--- b/src/bin/pg_dump/pg_backup_directory.c
***************
*** 36,41 ****
--- 36,42 ----
  #include "compress_io.h"
  #include "dumpmem.h"
  #include "dumputils.h"
+ #include "parallel.h"
  
  #include <dirent.h>
  #include <sys/stat.h>
*************** typedef struct
*** 51,56 ****
--- 52,58 ----
  	cfp		   *dataFH;			/* currently open data file */
  
  	cfp		   *blobsTocFH;		/* file handle for blobs.toc */
+ 	ParallelState *pstate;		/* for parallel backup / restore */
  } lclContext;
  
  typedef struct
*************** static int	_ReadByte(ArchiveHandle *);
*** 71,76 ****
--- 73,79 ----
  static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
  static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
  static void _CloseArchive(ArchiveHandle *AH);
+ static void _ReopenArchive(ArchiveHandle *AH);
  static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
  
  static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
*************** static void _EndBlob(ArchiveHandle *AH,
*** 83,90 ****
  static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
  static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt);
  
! static char *prependDirectory(ArchiveHandle *AH, char *buf, const char *relativeFilename);
  
  
  /*
   *	Init routine required by ALL formats. This is a global routine
--- 86,102 ----
  static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
  static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt);
  
! static void _Clone(ArchiveHandle *AH);
! static void _DeClone(ArchiveHandle *AH);
! 
! static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
! static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te,
! 								  const char *str, T_Action act);
! static char *_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te);
! static char *_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te);
  
+ static char *prependDirectory(ArchiveHandle *AH, char *buf,
+ 							  const char *relativeFilename);
  
  /*
   *	Init routine required by ALL formats. This is a global routine
*************** InitArchiveFmt_Directory(ArchiveHandle *
*** 111,117 ****
  	AH->WriteBufPtr = _WriteBuf;
  	AH->ReadBufPtr = _ReadBuf;
  	AH->ClosePtr = _CloseArchive;
! 	AH->ReopenPtr = NULL;
  	AH->PrintTocDataPtr = _PrintTocData;
  	AH->ReadExtraTocPtr = _ReadExtraToc;
  	AH->WriteExtraTocPtr = _WriteExtraToc;
--- 123,129 ----
  	AH->WriteBufPtr = _WriteBuf;
  	AH->ReadBufPtr = _ReadBuf;
  	AH->ClosePtr = _CloseArchive;
! 	AH->ReopenPtr = _ReopenArchive;
  	AH->PrintTocDataPtr = _PrintTocData;
  	AH->ReadExtraTocPtr = _ReadExtraToc;
  	AH->WriteExtraTocPtr = _WriteExtraToc;
*************** InitArchiveFmt_Directory(ArchiveHandle *
*** 122,129 ****
  	AH->EndBlobPtr = _EndBlob;
  	AH->EndBlobsPtr = _EndBlobs;
  
! 	AH->ClonePtr = NULL;
! 	AH->DeClonePtr = NULL;
  
  	/* Set up our private context */
  	ctx = (lclContext *) pg_calloc(1, sizeof(lclContext));
--- 134,147 ----
  	AH->EndBlobPtr = _EndBlob;
  	AH->EndBlobsPtr = _EndBlobs;
  
! 	AH->ClonePtr = _Clone;
! 	AH->DeClonePtr = _DeClone;
! 
! 	AH->WorkerJobRestorePtr = _WorkerJobRestoreDirectory;
! 	AH->WorkerJobDumpPtr = _WorkerJobDumpDirectory;
! 
! 	AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
! 	AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
  
  	/* Set up our private context */
  	ctx = (lclContext *) pg_calloc(1, sizeof(lclContext));
*************** InitArchiveFmt_Directory(ArchiveHandle *
*** 147,153 ****
  
  	if (AH->mode == archModeWrite)
  	{
! 		if (mkdir(ctx->directory, 0700) < 0)
  			exit_horribly(modulename, "could not create directory \"%s\": %s\n",
  						  ctx->directory, strerror(errno));
  	}
--- 165,192 ----
  
  	if (AH->mode == archModeWrite)
  	{
! 		struct stat st;
! 		bool is_empty = false;
! 
! 		/* we accept an empty existing directory */
! 		if (stat(ctx->directory, &st) == 0 && S_ISDIR(st.st_mode))
! 		{
! 			DIR* dir = opendir(ctx->directory);
! 			if (dir) {
! 				struct dirent *d;
! 				is_empty = true;
! 				while ((d = readdir(dir))) {
! 					if (strcmp(d->d_name, ".") != 0 && strcmp(d->d_name, "..") != 0)
! 					{
! 						is_empty = false;
! 						break;
! 					}
! 				}
! 				closedir(dir);
! 			}
! 		}
! 
! 		if (!is_empty && mkdir(ctx->directory, 0700) < 0)
  			exit_horribly(modulename, "could not create directory \"%s\": %s\n",
  						  ctx->directory, strerror(errno));
  	}
*************** _WriteData(ArchiveHandle *AH, const void
*** 309,314 ****
--- 348,356 ----
  	if (dLen == 0)
  		return 0;
  
+ 	/* Are we aborting? */
+ 	checkAborting(AH);
+ 
  	return cfwrite(data, dLen, ctx->dataFH);
  }
  
*************** _WriteBuf(ArchiveHandle *AH, const void
*** 476,481 ****
--- 518,526 ----
  	lclContext *ctx = (lclContext *) AH->formatData;
  	size_t		res;
  
+ 	/* Are we aborting? */
+ 	checkAborting(AH);
+ 
  	res = cfwrite(buf, len, ctx->dataFH);
  	if (res != len)
  		exit_horribly(modulename, "could not write to output file: %s\n",
*************** _CloseArchive(ArchiveHandle *AH)
*** 524,529 ****
--- 569,577 ----
  
  		prependDirectory(AH, fname, "toc.dat");
  
+ 		/* this will actually fork the processes for a parallel backup */
+ 		ctx->pstate = ParallelBackupStart(AH, NULL);
+ 
  		/* The TOC is always created uncompressed */
  		tocFH = cfopen_write(fname, PG_BINARY_W, 0);
  		if (tocFH == NULL)
*************** _CloseArchive(ArchiveHandle *AH)
*** 543,553 ****
  		if (cfclose(tocFH) != 0)
  			exit_horribly(modulename, "could not close TOC file: %s\n",
  						  strerror(errno));
! 		WriteDataChunks(AH);
  	}
  	AH->FH = NULL;
  }
  
  
  /*
   * BLOB support
--- 591,614 ----
  		if (cfclose(tocFH) != 0)
  			exit_horribly(modulename, "could not close TOC file: %s\n",
  						  strerror(errno));
! 		WriteDataChunks(AH, ctx->pstate);
! 
! 		ParallelBackupEnd(AH, ctx->pstate);
  	}
  	AH->FH = NULL;
  }
  
+ /*
+  * Reopen the archive's file handle.
+  */
+ static void
+ _ReopenArchive(ArchiveHandle *AH)
+ {
+ 	/*
+ 	 * Our TOC is in memory, our data files are opened by each child anyway as
+ 	 * they are separate. We support reopening the archive by just doing nothing.
+ 	 */
+ }
  
  /*
   * BLOB support
*************** prependDirectory(ArchiveHandle *AH, char
*** 654,656 ****
--- 715,866 ----
  
  	return buf;
  }
+ 
+ /*
+  * Clone format-specific fields during parallel dump or restore.
+  */
+ static void
+ _Clone(ArchiveHandle *AH)
+ {
+ 	lclContext *ctx = (lclContext *) AH->formatData;
+ 
+ 	AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
+ 	memcpy(AH->formatData, ctx, sizeof(lclContext));
+ 	ctx = (lclContext *) AH->formatData;
+ 
+ 	/*
+ 	 * Note: we do not make a local lo_buf because we expect at most one BLOBS
+ 	 * entry per archive, so no parallelism is possible.  Likewise,
+ 	 * TOC-entry-local state isn't an issue because any one TOC entry is
+ 	 * touched by just one worker child.
+ 	 */
+ 
+ 	/*
+ 	 * We also don't copy the ParallelState pointer (pstate), only the master
+ 	 * process ever writes to it.
+ 	 */
+ }
+ 
+ static void
+ _DeClone(ArchiveHandle *AH)
+ {
+ 	lclContext *ctx = (lclContext *) AH->formatData;
+ 	free(ctx);
+ }
+ 
+ /*
+  * This function is executed in the parent process. Depending on the desired
+  * action (dump or restore) it creates a string that is understood by the
+  * _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory functions of the
+  * respective dump format.
+  */
+ static char *
+ _MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
+ {
+ 	/*
+ 	 * A static char is okay here, even on Windows because we call this
+ 	 * function only from one process (the master).
+ 	 */
+ 	static char	buf[64];
+ 
+ 	if (act == ACT_DUMP)
+ 		snprintf(buf, sizeof(buf), "DUMP %d", te->dumpId);
+ 	else if (act == ACT_RESTORE)
+ 		snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
+ 
+ 	return buf;
+ }
+ 
+ /*
+  * This function is executed in the child of a parallel backup for the
+  * directory archive and dumps the actual data.
+  *
+  * We are currently returning only the DumpId so theoretically we could
+  * make this function return an int (or a DumpId). However, to
+  * facilitate further enhancements and because sooner or later we need to
+  * convert this to a string and send it via a message anyway, we stick with
+  * char *. It is parsed on the other side by the _MasterEndParallelItem()
+  * function of the respective dump format.
+  */
+ static char *
+ _WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te)
+ {
+ 	/* short fixed-size string + some ID so far, this needs to be malloc'ed
+ 	 * instead of static because we work with threads on windows */
+ 	const int	buflen = 64;
+ 	char	   *buf = (char*) pg_malloc(buflen);
+ 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
+ 
+ 	/* This should never happen */
+ 	if (!tctx)
+ 		exit_horribly(modulename, "Error during backup\n");
+ 
+ 	/*
+ 	 * WriteDataChunksForTocEntry() reports no error status: we either fail and
+ 	 * die horribly or succeed. The parent detects a failure when the child dies unexpectedly.
+ 	 */
+ 	WriteDataChunksForTocEntry(AH, te);
+ 
+ 	snprintf(buf, buflen, "OK DUMP %d", te->dumpId);
+ 
+ 	return buf;
+ }
+ 
+ /*
+  * This function is executed in the child of a parallel restore for the
+  * directory archive and restores the actual data for one TOC entry.
+  */
+ static char *
+ _WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te)
+ {
+ 	/* short fixed-size string + some ID so far, this needs to be malloc'ed
+ 	 * instead of static because we work with threads on windows */
+ 	const int	buflen = 64;
+ 	char	   *buf = (char*) pg_malloc(buflen);
+ 	ParallelArgs pargs;
+ 	int			status;
+ 	lclTocEntry *tctx;
+ 
+ 	tctx = (lclTocEntry *) te->formatData;
+ 
+ 	pargs.AH = AH;
+ 	pargs.te = te;
+ 
+ 	status = parallel_restore(&pargs);
+ 
+ 	snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
+ 			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
+ 
+ 	return buf;
+ }
+ /*
+  * This function is executed in the parent process. It analyzes the response of
+  * the _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory functions of the
+  * respective dump format.
+  */
+ static int
+ _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
+ {
+ 	DumpId		dumpId;
+ 	int			nBytes, n_errors;
+ 	int			status = 0;
+ 
+ 	if (act == ACT_DUMP)
+ 	{
+ 		sscanf(str, "%u%n", &dumpId, &nBytes);
+ 
+ 		Assert(dumpId == te->dumpId);
+ 		Assert(nBytes == strlen(str));
+ 	}
+ 	else if (act == ACT_RESTORE)
+ 	{
+ 		sscanf(str, "%u %u %u%n", &dumpId, &status, &n_errors, &nBytes);
+ 
+ 		Assert(dumpId == te->dumpId);
+ 		Assert(nBytes == strlen(str));
+ 
+ 		AH->public.n_errors += n_errors;
+ 	}
+ 
+ 	return status;
+ }
diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c
index ced5c13..4b49b2e 100644
*** a/src/bin/pg_dump/pg_backup_tar.c
--- b/src/bin/pg_dump/pg_backup_tar.c
*************** InitArchiveFmt_Tar(ArchiveHandle *AH)
*** 156,161 ****
--- 156,167 ----
  	AH->ClonePtr = NULL;
  	AH->DeClonePtr = NULL;
  
+ 	AH->MasterStartParallelItemPtr = NULL;
+ 	AH->MasterEndParallelItemPtr = NULL;
+ 
+ 	AH->WorkerJobDumpPtr = NULL;
+ 	AH->WorkerJobRestorePtr = NULL;
+ 
  	/*
  	 * Set up some special context used in compressing data.
  	 */
*************** _CloseArchive(ArchiveHandle *AH)
*** 836,842 ****
  		/*
  		 * Now send the data (tables & blobs)
  		 */
! 		WriteDataChunks(AH);
  
  		/*
  		 * Now this format wants to append a script which does a full restore
--- 842,848 ----
  		/*
  		 * Now send the data (tables & blobs)
  		 */
! 		WriteDataChunks(AH, NULL);
  
  		/*
  		 * Now this format wants to append a script which does a full restore
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index c8b556c..b80b35f 100644
*** a/src/bin/pg_dump/pg_dump.c
--- b/src/bin/pg_dump/pg_dump.c
*************** static int	disable_dollar_quoting = 0;
*** 136,141 ****
--- 136,142 ----
  static int	dump_inserts = 0;
  static int	column_inserts = 0;
  static int	no_security_labels = 0;
+ static int  no_synchronized_snapshots = 0;
  static int	no_unlogged_table_data = 0;
  static int	serializable_deferrable = 0;
  
*************** static void binary_upgrade_extension_mem
*** 259,264 ****
--- 260,266 ----
  								const char *objlabel);
  static const char *getAttrName(int attrnum, TableInfo *tblInfo);
  static const char *fmtCopyColumnList(const TableInfo *ti, PQExpBuffer buffer);
+ static char *get_synchronized_snapshot(Archive *fout);
  static PGresult *ExecuteSqlQueryForSingleRow(Archive *fout, char *query);
  
  
*************** main(int argc, char **argv)
*** 280,285 ****
--- 282,288 ----
  	int			numObjs;
  	DumpableObject *boundaryObjs;
  	int			i;
+ 	int			numWorkers = 1;
  	enum trivalue prompt_password = TRI_DEFAULT;
  	int			compressLevel = -1;
  	int			plainText = 0;
*************** main(int argc, char **argv)
*** 309,314 ****
--- 312,318 ----
  		{"format", required_argument, NULL, 'F'},
  		{"host", required_argument, NULL, 'h'},
  		{"ignore-version", no_argument, NULL, 'i'},
+ 		{"jobs", 1, NULL, 'j'},
  		{"no-reconnect", no_argument, NULL, 'R'},
  		{"oids", no_argument, NULL, 'o'},
  		{"no-owner", no_argument, NULL, 'O'},
*************** main(int argc, char **argv)
*** 348,353 ****
--- 352,358 ----
  		{"serializable-deferrable", no_argument, &serializable_deferrable, 1},
  		{"use-set-session-authorization", no_argument, &use_setsessauth, 1},
  		{"no-security-labels", no_argument, &no_security_labels, 1},
+ 		{"no-synchronized-snapshots", no_argument, &no_synchronized_snapshots, 1},
  		{"no-unlogged-table-data", no_argument, &no_unlogged_table_data, 1},
  
  		{NULL, 0, NULL, 0}
*************** main(int argc, char **argv)
*** 355,360 ****
--- 360,371 ----
  
  	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_dump"));
  
+ 	/*
+ 	 * Initialize what we need for parallel execution, especially for thread
+ 	 * support on Windows.
+ 	 */
+ 	init_parallel_dump_utils();
+ 
  	g_verbose = false;
  
  	strcpy(g_comment_start, "-- ");
*************** main(int argc, char **argv)
*** 385,391 ****
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "abcCE:f:F:h:in:N:oOp:RsS:t:T:U:vwWxZ:",
  							long_options, &optindex)) != -1)
  	{
  		switch (c)
--- 396,402 ----
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "abcCE:f:F:h:ij:n:N:oOp:RsS:t:T:U:vwWxZ:",
  							long_options, &optindex)) != -1)
  	{
  		switch (c)
*************** main(int argc, char **argv)
*** 426,431 ****
--- 437,446 ----
  				/* ignored, deprecated option */
  				break;
  
+ 			case 'j':			/* number of dump jobs */
+ 				numWorkers = atoi(optarg);
+ 				break;
+ 
  			case 'n':			/* include schema(s) */
  				simple_string_list_append(&schema_include_patterns, optarg);
  				include_everything = false;
*************** main(int argc, char **argv)
*** 565,570 ****
--- 580,601 ----
  			compressLevel = 0;
  	}
  
+ 	/*
+ 	 * On Windows we can only have at most MAXIMUM_WAIT_OBJECTS (= 64 usually)
+ 	 * parallel jobs because that's the maximum limit for the
+ 	 * WaitForMultipleObjects() call.
+ 	 */
+ 	if (numWorkers <= 0
+ #ifdef WIN32
+ 			|| numWorkers > MAXIMUM_WAIT_OBJECTS
+ #endif
+ 		)
+ 		exit_horribly(NULL, "%s: invalid number of parallel jobs\n", progname);
+ 
+ 	/* Parallel backup only in the directory archive format so far */
+ 	if (archiveFormat != archDirectory && numWorkers > 1)
+ 		exit_horribly(NULL, "parallel backup only supported by the directory format\n");
+ 
  	/* Open the output file */
  	fout = CreateArchive(filename, archiveFormat, compressLevel, archiveMode);
  
*************** main(int argc, char **argv)
*** 588,593 ****
--- 619,626 ----
  	fout->minRemoteVersion = 70000;
  	fout->maxRemoteVersion = (my_version / 100) * 100 + 99;
  
+ 	fout->numWorkers = numWorkers;
+ 
  	/*
  	 * Open the database using the Archiver, so it knows about it. Errors mean
  	 * death.
*************** main(int argc, char **argv)
*** 602,626 ****
  	if (fout->remoteVersion < 90100)
  		no_security_labels = 1;
  
- 	/*
- 	 * Start transaction-snapshot mode transaction to dump consistent data.
- 	 */
- 	ExecuteSqlStatement(fout, "BEGIN");
- 	if (fout->remoteVersion >= 90100)
- 	{
- 		if (serializable_deferrable)
- 			ExecuteSqlStatement(fout,
- 								"SET TRANSACTION ISOLATION LEVEL "
- 								"SERIALIZABLE, READ ONLY, DEFERRABLE");
- 		else
- 			ExecuteSqlStatement(fout,
- 								"SET TRANSACTION ISOLATION LEVEL "
- 								"REPEATABLE READ");
- 	}
- 	else
- 		ExecuteSqlStatement(fout,
- 							"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
- 
  	/* Select the appropriate subquery to convert user IDs to names */
  	if (fout->remoteVersion >= 80100)
  		username_subquery = "SELECT rolname FROM pg_catalog.pg_roles WHERE oid =";
--- 635,640 ----
*************** main(int argc, char **argv)
*** 629,634 ****
--- 643,656 ----
  	else
  		username_subquery = "SELECT usename FROM pg_user WHERE usesysid =";
  
+ 	/* check the version for the synchronized snapshots feature */
+ 	if (numWorkers > 1 && fout->remoteVersion < 90200
+ 		&& !no_synchronized_snapshots)
+ 		exit_horribly(NULL,
+ 					 "No synchronized snapshots available in this server version.\n"
+ 					 "Run with --no-synchronized-snapshots instead if you do not\n"
+ 					 "need synchronized snapshots.\n");
+ 
  	/* Find the last built-in OID, if needed */
  	if (fout->remoteVersion < 70300)
  	{
*************** main(int argc, char **argv)
*** 725,730 ****
--- 747,756 ----
  	else
  		sortDumpableObjectsByTypeOid(dobjs, numObjs);
  
+ 	/* If we do a parallel dump, we want the largest tables to go first */
+ 	if (archiveFormat == archDirectory && numWorkers > 1)
+ 		sortDataAndIndexObjectsBySize(dobjs, numObjs);
+ 
  	sortDumpableObjects(dobjs, numObjs,
  						boundaryObjs[0].dumpId, boundaryObjs[1].dumpId);
  
*************** help(const char *progname)
*** 806,811 ****
--- 832,838 ----
  	printf(_("  -f, --file=FILENAME          output file or directory name\n"));
  	printf(_("  -F, --format=c|d|t|p         output file format (custom, directory, tar,\n"
  			 "                               plain text (default))\n"));
+ 	printf(_("  -j, --jobs=NUM               use this many parallel jobs to dump\n"));
  	printf(_("  -v, --verbose                verbose mode\n"));
  	printf(_("  -V, --version                output version information, then exit\n"));
  	printf(_("  -Z, --compress=0-9           compression level for compressed formats\n"));
*************** help(const char *progname)
*** 835,840 ****
--- 862,868 ----
  	printf(_("  --exclude-table-data=TABLE   do NOT dump data for the named table(s)\n"));
  	printf(_("  --inserts                    dump data as INSERT commands, rather than COPY\n"));
  	printf(_("  --no-security-labels         do not dump security label assignments\n"));
+ 	printf(_("  --no-synchronized-snapshots  parallel processes should not use synchronized snapshots\n"));
  	printf(_("  --no-tablespaces             do not dump tablespace assignments\n"));
  	printf(_("  --no-unlogged-table-data     do not dump unlogged table data\n"));
  	printf(_("  --quote-all-identifiers      quote all identifiers, even if not key words\n"));
*************** setup_connection(Archive *AH, const char
*** 863,869 ****
  	PGconn	   *conn = GetConnection(AH);
  	const char *std_strings;
  
! 	/* Set the client encoding if requested */
  	if (dumpencoding)
  	{
  		if (PQsetClientEncoding(conn, dumpencoding) < 0)
--- 891,902 ----
  	PGconn	   *conn = GetConnection(AH);
  	const char *std_strings;
  
! 	/*
! 	 * Set the client encoding if requested.  If dumpencoding is NULL, either
! 	 * no encoding was requested, or this is a cloned connection, in which
! 	 * case CloneArchive has already set the encoding to match the original
! 	 * connection.
! 	 */
  	if (dumpencoding)
  	{
  		if (PQsetClientEncoding(conn, dumpencoding) < 0)
*************** setup_connection(Archive *AH, const char
*** 881,886 ****
--- 914,923 ----
  	AH->std_strings = (std_strings && strcmp(std_strings, "on") == 0);
  
  	/* Set the role if requested */
+ 	if (!use_role && AH->use_role)
+ 		use_role = AH->use_role;
+ 
+ 	/* Issue SET ROLE if a role was specified or inherited */
  	if (use_role && AH->remoteVersion >= 80100)
  	{
  		PQExpBuffer query = createPQExpBuffer();
*************** setup_connection(Archive *AH, const char
*** 888,893 ****
--- 925,934 ----
  		appendPQExpBuffer(query, "SET ROLE %s", fmtId(use_role));
  		ExecuteSqlStatement(AH, query->data);
  		destroyPQExpBuffer(query);
+ 
+ 		/* save this for later use on parallel connections */
+ 		if (!AH->use_role)
+ 			AH->use_role = strdup(use_role);
  	}
  
  	/* Set the datestyle to ISO to ensure the dump's portability */
*************** setup_connection(Archive *AH, const char
*** 924,929 ****
--- 965,1024 ----
  	 */
  	if (quote_all_identifiers && AH->remoteVersion >= 90100)
  		ExecuteSqlStatement(AH, "SET quote_all_identifiers = true");
+ 
+ 	/*
+ 	 * Start transaction-snapshot mode transaction to dump consistent data.
+ 	 */
+ 	ExecuteSqlStatement(AH, "BEGIN");
+ 	if (AH->remoteVersion >= 90100)
+ 	{
+ 		if (serializable_deferrable)
+ 			ExecuteSqlStatement(AH,
+ 						   "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, "
+ 						   "READ ONLY, DEFERRABLE");
+ 		else
+ 			ExecuteSqlStatement(AH,
+ 						   "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
+ 	}
+ 	else
+ 		ExecuteSqlStatement(AH, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
+ 
+ 	if (AH->numWorkers > 1 && AH->remoteVersion >= 90200 && !no_synchronized_snapshots)
+ 	{
+ 		if (AH->sync_snapshot_id)
+ 		{
+ 			PQExpBuffer query = createPQExpBuffer();
+ 			appendPQExpBuffer(query, "SET TRANSACTION SNAPSHOT ");
+ 			appendStringLiteralConn(query, AH->sync_snapshot_id, conn);
+ 			ExecuteSqlStatement(AH, query->data);
+ 			destroyPQExpBuffer(query);
+ 		}
+ 		else
+ 			AH->sync_snapshot_id = get_synchronized_snapshot(AH);
+ 	}
+ }
+ 
+ /*
+  * Initialize the connection for a new worker process.
+  */
+ void
+ _SetupWorker(Archive *AHX, RestoreOptions *ropt)
+ {
+ 	setup_connection(AHX, NULL, NULL);
+ }
+ 
+ static char*
+ get_synchronized_snapshot(Archive *fout)
+ {
+ 	char	   *query = "select pg_export_snapshot()";
+ 	char	   *result;
+ 	PGresult   *res;
+ 
+ 	res = ExecuteSqlQueryForSingleRow(fout, query);
+ 	result = strdup(PQgetvalue(res, 0, 0));
+ 	PQclear(res);
+ 
+ 	return result;
  }
  
  static ArchiveFormat
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index 64c9f17..0c917e4 100644
*** a/src/bin/pg_dump/pg_restore.c
--- b/src/bin/pg_dump/pg_restore.c
*************** main(int argc, char **argv)
*** 374,379 ****
--- 374,389 ----
  	if (opts->tocFile)
  		SortTocFromFile(AH, opts);
  
+ 	/* See comments in pg_dump.c */
+ #ifdef WIN32
+ 	if (numWorkers > MAXIMUM_WAIT_OBJECTS)
+ 	{
+ 		fprintf(stderr, _("%s: maximum number of parallel jobs is %d\n"),
+ 				progname, MAXIMUM_WAIT_OBJECTS);
+ 		exit(1);
+ 	}
+ #endif
+ 
  	AH->numWorkers = numWorkers;
  
  	if (opts->tocSummary)
*************** main(int argc, char **argv)
*** 397,402 ****
--- 407,419 ----
  	return exit_code;
  }
  
+ void
+ _SetupWorker(Archive *AHX, RestoreOptions *ropt)
+ {
+ 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ 	(AH->ReopenPtr) (AH);
+ }
+ 
  static void
  usage(const char *progname)
  {
