diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index c656ba5..1a52fae 100644
*** a/src/bin/pg_dump/parallel.c
--- b/src/bin/pg_dump/parallel.c
***************
*** 2,21 ****
   *
   * parallel.c
   *
!  *	Parallel support for the pg_dump archiver
   *
   * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
   * Portions Copyright (c) 1994, Regents of the University of California
   *
-  *	The author is not responsible for loss or damages that may
-  *	result from its use.
-  *
   * IDENTIFICATION
   *		src/bin/pg_dump/parallel.c
   *
   *-------------------------------------------------------------------------
   */
  
  #include "postgres_fe.h"
  
  #include "parallel.h"
--- 2,62 ----
   *
   * parallel.c
   *
!  *	Parallel support for pg_dump and pg_restore
   *
   * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
   * Portions Copyright (c) 1994, Regents of the University of California
   *
   * IDENTIFICATION
   *		src/bin/pg_dump/parallel.c
   *
   *-------------------------------------------------------------------------
   */
  
+ /*
+  * Parallel operation works like this:
+  *
+  * The original, master process calls ParallelBackupStart(), which forks off
+  * the desired number of worker processes, which each enter WaitForCommands().
+  *
+  * The master process dispatches an individual work item to one of the worker
+  * processes in DispatchJobForTocEntry().  That calls
+  * AH->MasterStartParallelItemPtr, a routine of the output format.  This
+  * function's arguments are the parent's archive handle AH (containing the full
+  * catalog information), the TocEntry that the worker should work on and a
+  * T_Action value indicating whether this is a backup or a restore task.  The
+  * function simply converts the TocEntry assignment into a command string that
+  * is then sent over to the worker process. In the simplest case that would be
+  * something like "DUMP 1234", with 1234 being the TocEntry id.
+  *
+  * The worker process receives and decodes the command and passes it to the
+  * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
+  * which are routines of the current archive format.  That routine performs
+  * the required action (dump or restore) and returns a malloc'd status string.
+  * The status string is passed back to the master where it is interpreted by
+  * AH->MasterEndParallelItemPtr, another format-specific routine.  That
+  * function can update state or catalog information on the master's side,
+  * depending on the reply from the worker process.  In the end it returns a
+  * status code, which is 0 for successful execution.
+  *
+  * Remember that we have forked off the workers only after we have read in
+  * the catalog.  That's why our worker processes can also access the catalog
+  * information.  (In the Windows case, the workers are threads in the same
+  * process.  To avoid problems, they work with cloned copies of the Archive
+  * data structure; see init_spawned_worker_win32().)
+  *
+  * In the master process, the workerStatus field for each worker has one of
+  * the following values:
+  *		WRKR_IDLE: it's waiting for a command
+  *		WRKR_WORKING: it's been sent a command
+  *		WRKR_FINISHED: it's returned a result
+  *		WRKR_TERMINATED: process ended
+  * The FINISHED state indicates that the worker is idle, but we've not yet
+  * dealt with the status code it returned from the prior command.
+  * ReapWorkerStatus() extracts the unhandled command status value and sets
+  * the workerStatus back to WRKR_IDLE.
+  */
+ 
  #include "postgres_fe.h"
  
  #include "parallel.h"
***************
*** 30,44 ****
  #include <fcntl.h>
  #endif
  
  #define PIPE_READ							0
  #define PIPE_WRITE							1
  
- /* file-scope variables */
  #ifdef WIN32
- static unsigned int tMasterThreadId = 0;
- static HANDLE termEvent = INVALID_HANDLE_VALUE;
- static int	pgpipe(int handles[2]);
- static int	piperead(int s, char *buf, int len);
  
  /*
   * Structure to hold info passed by _beginthreadex() to the function it calls
--- 71,81 ----
  #include <fcntl.h>
  #endif
  
+ /* Mnemonic macros for indexing the fd array returned by pipe(2) */
  #define PIPE_READ							0
  #define PIPE_WRITE							1
  
  #ifdef WIN32
  
  /*
   * Structure to hold info passed by _beginthreadex() to the function it calls
*************** static int	piperead(int s, char *buf, in
*** 47,71 ****
  typedef struct
  {
  	ArchiveHandle *AH;
- 	int			worker;
  	int			pipeRead;
  	int			pipeWrite;
  } WorkerInfo;
  
  #define pipewrite(a,b,c)	send(a,b,c,0)
! #else
  /*
!  * aborting is only ever used in the master, the workers are fine with just
!  * wantAbort.
   */
  static bool aborting = false;
  static volatile sig_atomic_t wantAbort = 0;
  
  #define pgpipe(a)			pipe(a)
  #define piperead(a,b,c)		read(a,b,c)
  #define pipewrite(a,b,c)	write(a,b,c)
- #endif
  
  typedef struct ShutdownInformation
  {
  	ParallelState *pstate;
--- 84,117 ----
  typedef struct
  {
  	ArchiveHandle *AH;
  	int			pipeRead;
  	int			pipeWrite;
  } WorkerInfo;
  
+ /* Windows implementation of pipe access */
+ static int	pgpipe(int handles[2]);
+ static int	piperead(int s, char *buf, int len);
  #define pipewrite(a,b,c)	send(a,b,c,0)
! 
! #else							/* !WIN32 */
! 
  /*
!  * Variables for handling signals.  aborting is only ever used in the master,
!  * the workers just need wantAbort.
   */
  static bool aborting = false;
  static volatile sig_atomic_t wantAbort = 0;
  
+ /* Non-Windows implementation of pipe access */
  #define pgpipe(a)			pipe(a)
  #define piperead(a,b,c)		read(a,b,c)
  #define pipewrite(a,b,c)	write(a,b,c)
  
+ #endif   /* WIN32 */
+ 
+ /*
+  * State info for archive_close_connection() shutdown callback.
+  */
  typedef struct ShutdownInformation
  {
  	ParallelState *pstate;
*************** typedef struct ShutdownInformation
*** 74,93 ****
  
  static ShutdownInformation shutdown_info;
  
  static const char *modulename = gettext_noop("parallel archiver");
  
  static ParallelSlot *GetMyPSlot(ParallelState *pstate);
  static void archive_close_connection(int code, void *arg);
  static void ShutdownWorkersHard(ParallelState *pstate);
  static void WaitForTerminatingWorkers(ParallelState *pstate);
! 
! #ifndef WIN32
! static void sigTermHandler(int signum);
! #endif
! static void SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker);
  static bool HasEveryWorkerTerminated(ParallelState *pstate);
! 
! static void lockTableNoWait(ArchiveHandle *AH, TocEntry *te);
  static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
  static char *getMessageFromMaster(int pipefd[2]);
  static void sendMessageToMaster(int pipefd[2], const char *str);
--- 120,146 ----
  
  static ShutdownInformation shutdown_info;
  
+ #ifdef WIN32
+ /* file-scope variables */
+ static unsigned int tMasterThreadId = 0;
+ static HANDLE termEvent = INVALID_HANDLE_VALUE;
+ static DWORD tls_index;
+ 
+ /* globally visible variables (needed by exit_nicely) */
+ bool		parallel_init_done = false;
+ DWORD		mainThreadId;
+ #endif   /* WIN32 */
+ 
  static const char *modulename = gettext_noop("parallel archiver");
  
+ /* Local function prototypes */
  static ParallelSlot *GetMyPSlot(ParallelState *pstate);
  static void archive_close_connection(int code, void *arg);
  static void ShutdownWorkersHard(ParallelState *pstate);
  static void WaitForTerminatingWorkers(ParallelState *pstate);
! static void RunWorker(ArchiveHandle *AH, int pipefd[2]);
  static bool HasEveryWorkerTerminated(ParallelState *pstate);
! static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
  static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
  static char *getMessageFromMaster(int pipefd[2]);
  static void sendMessageToMaster(int pipefd[2], const char *str);
*************** static char *readMessageFromPipe(int fd)
*** 103,117 ****
  #define messageEquals(msg, pattern) \
  	(strcmp(msg, pattern) == 0)
  
- #ifdef WIN32
- static void shutdown_parallel_dump_utils(int code, void *unused);
- bool		parallel_init_done = false;
- static DWORD tls_index;
- DWORD		mainThreadId;
- #endif
- 
  
  #ifdef WIN32
  static void
  shutdown_parallel_dump_utils(int code, void *unused)
  {
--- 156,166 ----
  #define messageEquals(msg, pattern) \
  	(strcmp(msg, pattern) == 0)
  
  
  #ifdef WIN32
+ /*
+  * Shutdown callback to clean up socket access
+  */
  static void
  shutdown_parallel_dump_utils(int code, void *unused)
  {
*************** shutdown_parallel_dump_utils(int code, v
*** 121,126 ****
--- 170,180 ----
  }
  #endif
  
+ /*
+  * Initialize parallel dump support --- should be called early in process
+  * startup.  (Currently, this is called whether or not we intend parallel
+  * activity.)
+  */
  void
  init_parallel_dump_utils(void)
  {
*************** init_parallel_dump_utils(void)
*** 130,161 ****
--- 184,226 ----
  		WSADATA		wsaData;
  		int			err;
  
+ 		/* Prepare for threaded operation */
  		tls_index = TlsAlloc();
  		mainThreadId = GetCurrentThreadId();
+ 
+ 		/* Initialize socket access */
  		err = WSAStartup(MAKEWORD(2, 2), &wsaData);
  		if (err != 0)
  		{
  			fprintf(stderr, _("%s: WSAStartup failed: %d\n"), progname, err);
  			exit_nicely(1);
  		}
+ 		/* ... and arrange to shut it down at exit */
  		on_exit_nicely(shutdown_parallel_dump_utils, NULL);
  		parallel_init_done = true;
  	}
  #endif
  }
  
+ /*
+  * Find the ParallelSlot for the current worker process or thread.
+  *
+  * Returns NULL if no matching slot is found (this implies we're the master).
+  */
  static ParallelSlot *
  GetMyPSlot(ParallelState *pstate)
  {
  	int			i;
  
  	for (i = 0; i < pstate->numWorkers; i++)
+ 	{
  #ifdef WIN32
  		if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
  #else
  		if (pstate->parallelSlot[i].pid == getpid())
  #endif
  			return &(pstate->parallelSlot[i]);
+ 	}
  
  	return NULL;
  }
*************** GetMyPSlot(ParallelState *pstate)
*** 163,189 ****
  /*
   * A thread-local version of getLocalPQExpBuffer().
   *
!  * Non-reentrant but reduces memory leakage. (On Windows the memory leakage
!  * will be one buffer per thread, which is at least better than one per call).
   */
  static PQExpBuffer
  getThreadLocalPQExpBuffer(void)
  {
  	/*
  	 * The Tls code goes awry if we use a static var, so we provide for both
! 	 * static and auto, and omit any use of the static var when using Tls.
  	 */
  	static PQExpBuffer s_id_return = NULL;
  	PQExpBuffer id_return;
  
- #ifdef WIN32
  	if (parallel_init_done)
! 		id_return = (PQExpBuffer) TlsGetValue(tls_index);		/* 0 when not set */
  	else
  		id_return = s_id_return;
- #else
- 	id_return = s_id_return;
- #endif
  
  	if (id_return)				/* first time through? */
  	{
--- 228,252 ----
  /*
   * A thread-local version of getLocalPQExpBuffer().
   *
!  * Non-reentrant but reduces memory leakage: we'll consume one buffer per
!  * thread, which is much better than one per fmtId/fmtQualifiedId call.
   */
+ #ifdef WIN32
  static PQExpBuffer
  getThreadLocalPQExpBuffer(void)
  {
  	/*
  	 * The Tls code goes awry if we use a static var, so we provide for both
! 	 * static and auto, and omit any use of the static var when using Tls. We
! 	 * rely on TlsGetValue() to return 0 if the value is not yet set.
  	 */
  	static PQExpBuffer s_id_return = NULL;
  	PQExpBuffer id_return;
  
  	if (parallel_init_done)
! 		id_return = (PQExpBuffer) TlsGetValue(tls_index);
  	else
  		id_return = s_id_return;
  
  	if (id_return)				/* first time through? */
  	{
*************** getThreadLocalPQExpBuffer(void)
*** 194,217 ****
  	{
  		/* new buffer */
  		id_return = createPQExpBuffer();
- #ifdef WIN32
  		if (parallel_init_done)
  			TlsSetValue(tls_index, id_return);
  		else
  			s_id_return = id_return;
- #else
- 		s_id_return = id_return;
- #endif
- 
  	}
  
  	return id_return;
  }
  
  /*
!  * pg_dump and pg_restore register the Archive pointer for the exit handler
!  * (called from exit_nicely). This function mainly exists so that we can
!  * keep shutdown_info in file scope only.
   */
  void
  on_exit_close_archive(Archive *AHX)
--- 257,275 ----
  	{
  		/* new buffer */
  		id_return = createPQExpBuffer();
  		if (parallel_init_done)
  			TlsSetValue(tls_index, id_return);
  		else
  			s_id_return = id_return;
  	}
  
  	return id_return;
  }
+ #endif   /* WIN32 */
  
  /*
!  * pg_dump and pg_restore call this to register the cleanup handler
!  * as soon as they've created the ArchiveHandle.
   */
  void
  on_exit_close_archive(Archive *AHX)
*************** archive_close_connection(int code, void 
*** 281,292 ****
  }
  
  /*
   * If we have one worker that terminates for some reason, we'd like the other
   * threads to terminate as well (and not finish with their 70 GB table dump
!  * first...). Now in UNIX we can just kill these processes, and let the signal
!  * handler set wantAbort to 1. In Windows we set a termEvent and this serves
!  * as the signal for everyone to terminate.  We don't print any error message,
!  * that would just clutter the screen.
   */
  void
  checkAborting(ArchiveHandle *AH)
--- 339,357 ----
  }
  
  /*
+  * Check to see if we've been told to abort, and exit the process/thread if
+  * so.  We don't print any error message; that would just clutter the screen.
+  *
   * If we have one worker that terminates for some reason, we'd like the other
   * threads to terminate as well (and not finish with their 70 GB table dump
!  * first...).  In Unix, the master sends SIGTERM and the worker's signal
!  * handler sets wantAbort to 1.  In Windows we set a termEvent and this serves
!  * as the signal for worker threads to exit.  Note that while we check this
!  * fairly frequently during data transfers, an idle worker doesn't come here
!  * at all, so additional measures are needed to force shutdown.
!  *
!  * XXX in parallel restore, slow server-side operations like CREATE INDEX
!  * are not interrupted by anything we do here.  This needs more work.
   */
  void
  checkAborting(ArchiveHandle *AH)
*************** checkAborting(ArchiveHandle *AH)
*** 300,306 ****
  }
  
  /*
!  * Shut down any remaining workers, waiting for them to finish.
   */
  static void
  ShutdownWorkersHard(ParallelState *pstate)
--- 365,371 ----
  }
  
  /*
!  * Forcibly shut down any remaining workers, waiting for them to finish.
   */
  static void
  ShutdownWorkersHard(ParallelState *pstate)
*************** WaitForTerminatingWorkers(ParallelState 
*** 392,401 ****
  	}
  }
  
  #ifndef WIN32
- /* Signal handling (UNIX only) */
  static void
! sigTermHandler(int signum)
  {
  	wantAbort = 1;
  }
--- 457,468 ----
  	}
  }
  
+ /*
+  * Signal handler (UNIX only)
+  */
  #ifndef WIN32
  static void
! sigTermHandler(SIGNAL_ARGS)
  {
  	wantAbort = 1;
  }
*************** sigTermHandler(int signum)
*** 407,413 ****
   * upon return.
   */
  static void
! SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker)
  {
  	/*
  	 * Call the setup worker function that's defined in the ArchiveHandle.
--- 474,480 ----
   * upon return.
   */
  static void
! RunWorker(ArchiveHandle *AH, int pipefd[2])
  {
  	/*
  	 * Call the setup worker function that's defined in the ArchiveHandle.
*************** SetupWorker(ArchiveHandle *AH, int pipef
*** 416,447 ****
  
  	Assert(AH->connection != NULL);
  
  	WaitForCommands(AH, pipefd);
  }
  
  #ifdef WIN32
  static unsigned __stdcall
  init_spawned_worker_win32(WorkerInfo *wi)
  {
  	ArchiveHandle *AH;
  	int			pipefd[2] = {wi->pipeRead, wi->pipeWrite};
- 	int			worker = wi->worker;
  
  	AH = CloneArchive(wi->AH);
  
  	free(wi);
- 	SetupWorker(AH, pipefd, worker);
  
  	DeCloneArchive(AH);
  	_endthreadex(0);
  	return 0;
  }
! #endif
  
  /*
!  * This function starts the parallel dump or restore by spawning off the
!  * worker processes in both Unix and Windows. For Windows, it creates a number
!  * of threads while it does a fork() on Unix.
   */
  ParallelState *
  ParallelBackupStart(ArchiveHandle *AH)
--- 483,526 ----
  
  	Assert(AH->connection != NULL);
  
+ 	/*
+ 	 * Execute commands until done.
+ 	 */
  	WaitForCommands(AH, pipefd);
  }
  
+ /*
+  * Thread base function for Windows
+  */
  #ifdef WIN32
  static unsigned __stdcall
  init_spawned_worker_win32(WorkerInfo *wi)
  {
  	ArchiveHandle *AH;
  	int			pipefd[2] = {wi->pipeRead, wi->pipeWrite};
  
+ 	/*
+ 	 * Clone the archive so that we have our own state to work with, and in
+ 	 * particular our own database connection.
+ 	 */
  	AH = CloneArchive(wi->AH);
  
  	free(wi);
  
+ 	/* Run the worker ... */
+ 	RunWorker(AH, pipefd);
+ 
+ 	/* Clean up and exit the thread */
  	DeCloneArchive(AH);
  	_endthreadex(0);
  	return 0;
  }
! #endif   /* WIN32 */
  
  /*
!  * This function starts a parallel dump or restore by spawning off the worker
!  * processes.  For Windows, it creates a number of threads; on Unix the
!  * workers are created with fork().
   */
  ParallelState *
  ParallelBackupStart(ArchiveHandle *AH)
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 471,487 ****
  	 * set and falls back to AHX otherwise.
  	 */
  	shutdown_info.pstate = pstate;
- 	getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
  
  #ifdef WIN32
  	tMasterThreadId = GetCurrentThreadId();
  	termEvent = CreateEvent(NULL, true, false, "Terminate");
  #else
  	signal(SIGTERM, sigTermHandler);
  	signal(SIGINT, sigTermHandler);
  	signal(SIGQUIT, sigTermHandler);
  #endif
  
  	for (i = 0; i < pstate->numWorkers; i++)
  	{
  #ifdef WIN32
--- 550,570 ----
  	 * set and falls back to AHX otherwise.
  	 */
  	shutdown_info.pstate = pstate;
  
  #ifdef WIN32
+ 	/* Set up thread management state */
  	tMasterThreadId = GetCurrentThreadId();
  	termEvent = CreateEvent(NULL, true, false, "Terminate");
+ 	/* Make fmtId() and fmtQualifiedId() use thread-local storage */
+ 	getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
  #else
+ 	/* Set up signal handling state */
  	signal(SIGTERM, sigTermHandler);
  	signal(SIGINT, sigTermHandler);
  	signal(SIGQUIT, sigTermHandler);
  #endif
  
+ 	/* Create desired number of workers */
  	for (i = 0; i < pstate->numWorkers; i++)
  	{
  #ifdef WIN32
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 493,498 ****
--- 576,582 ----
  		int			pipeMW[2],
  					pipeWM[2];
  
+ 		/* Create communication pipes for this worker */
  		if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
  			exit_horribly(modulename,
  						  "could not create communication channels: %s\n",
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 511,520 ****
  		pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];
  
  #ifdef WIN32
! 		/* Allocate a new structure for every worker */
  		wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
  
- 		wi->worker = i;
  		wi->AH = AH;
  		wi->pipeRead = pipeMW[PIPE_READ];
  		wi->pipeWrite = pipeWM[PIPE_WRITE];
--- 595,603 ----
  		pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];
  
  #ifdef WIN32
! 		/* Create transient structure to pass args to worker function */
  		wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
  
  		wi->AH = AH;
  		wi->pipeRead = pipeMW[PIPE_READ];
  		wi->pipeWrite = pipeWM[PIPE_WRITE];
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 522,528 ****
  		handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
  								wi, 0, &(pstate->parallelSlot[i].threadId));
  		pstate->parallelSlot[i].hThread = handle;
! #else
  		pid = fork();
  		if (pid == 0)
  		{
--- 605,611 ----
  		handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
  								wi, 0, &(pstate->parallelSlot[i].threadId));
  		pstate->parallelSlot[i].hThread = handle;
! #else							/* !WIN32 */
  		pid = fork();
  		if (pid == 0)
  		{
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 535,549 ****
  
  			pstate->parallelSlot[i].pid = getpid();
  
- 			/*
- 			 * Call CloneArchive on Unix as well even though technically we
- 			 * don't need to because fork() gives us a copy in our own address
- 			 * space already. But CloneArchive resets the state information
- 			 * and also clones the database connection (for parallel dump)
- 			 * which both seem kinda helpful.
- 			 */
- 			pstate->parallelSlot[i].args->AH = CloneArchive(AH);
- 
  			/* close read end of Worker -> Master */
  			closesocket(pipeWM[PIPE_READ]);
  			/* close write end of Master -> Worker */
--- 618,623 ----
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 559,589 ****
  				closesocket(pstate->parallelSlot[j].pipeWrite);
  			}
  
! 			SetupWorker(pstate->parallelSlot[i].args->AH, pipefd, i);
  
  			exit(0);
  		}
  		else if (pid < 0)
  			/* fork failed */
  			exit_horribly(modulename,
  						  "could not create worker process: %s\n",
  						  strerror(errno));
  
! 		/* we are the Master, pid > 0 here */
! 		Assert(pid > 0);
  
  		/* close read end of Master -> Worker */
  		closesocket(pipeMW[PIPE_READ]);
  		/* close write end of Worker -> Master */
  		closesocket(pipeWM[PIPE_WRITE]);
! 
! 		pstate->parallelSlot[i].pid = pid;
! #endif
  	}
  
  	/*
  	 * Having forked off the workers, disable SIGPIPE so that master isn't
! 	 * killed if it tries to send a command to a dead worker.
  	 */
  #ifndef WIN32
  	signal(SIGPIPE, SIG_IGN);
--- 633,675 ----
  				closesocket(pstate->parallelSlot[j].pipeWrite);
  			}
  
! 			/*
! 			 * Call CloneArchive on Unix as well as Windows, even though
! 			 * technically we don't need to because fork() gives us a copy in
! 			 * our own address space already.  But CloneArchive resets the
! 			 * state information and also clones the database connection which
! 			 * both seem kinda helpful.
! 			 */
! 			pstate->parallelSlot[i].args->AH = CloneArchive(AH);
  
+ 			/* Run the worker ... */
+ 			RunWorker(pstate->parallelSlot[i].args->AH, pipefd);
+ 
+ 			/* We can just exit(0) when done */
  			exit(0);
  		}
  		else if (pid < 0)
+ 		{
  			/* fork failed */
  			exit_horribly(modulename,
  						  "could not create worker process: %s\n",
  						  strerror(errno));
+ 		}
  
! 		/* In Master after successful fork */
! 		pstate->parallelSlot[i].pid = pid;
  
  		/* close read end of Master -> Worker */
  		closesocket(pipeMW[PIPE_READ]);
  		/* close write end of Worker -> Master */
  		closesocket(pipeWM[PIPE_WRITE]);
! #endif   /* WIN32 */
  	}
  
  	/*
  	 * Having forked off the workers, disable SIGPIPE so that master isn't
! 	 * killed if it tries to send a command to a dead worker.  We don't want
! 	 * the workers to inherit this setting, though.
  	 */
  #ifndef WIN32
  	signal(SIGPIPE, SIG_IGN);
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 593,691 ****
  }
  
  /*
!  * Tell all of our workers to terminate.
!  *
!  * Pretty straightforward routine, first we tell everyone to terminate, then
!  * we listen to the workers' replies and finally close the sockets that we
!  * have used for communication.
   */
  void
  ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
  {
  	int			i;
  
  	if (pstate->numWorkers == 1)
  		return;
  
  	Assert(IsEveryWorkerIdle(pstate));
  
! 	/* close the sockets so that the workers know they can exit */
  	for (i = 0; i < pstate->numWorkers; i++)
  	{
  		closesocket(pstate->parallelSlot[i].pipeRead);
  		closesocket(pstate->parallelSlot[i].pipeWrite);
  	}
  	WaitForTerminatingWorkers(pstate);
  
  	/*
! 	 * Remove the pstate again, so the exit handler in the parent will now
! 	 * again fall back to closing AH->connection (if connected).
  	 */
  	shutdown_info.pstate = NULL;
  
  	free(pstate->parallelSlot);
  	free(pstate);
  }
  
- 
  /*
!  * The sequence is the following (for dump, similar for restore):
!  *
!  * The master process starts the parallel backup in ParllelBackupStart, this
!  * forks the worker processes which enter WaitForCommand().
!  *
!  * The master process dispatches an individual work item to one of the worker
!  * processes in DispatchJobForTocEntry(). It calls
!  * AH->MasterStartParallelItemPtr, a routine of the output format. This
!  * function's arguments are the parents archive handle AH (containing the full
!  * catalog information), the TocEntry that the worker should work on and a
!  * T_Action act indicating whether this is a backup or a restore item.  The
!  * function then converts the TocEntry assignment into a string that is then
!  * sent over to the worker process. In the simplest case that would be
!  * something like "DUMP 1234", with 1234 being the TocEntry id.
!  *
!  * The worker receives the message in the routine pointed to by
!  * WorkerJobDumpPtr or WorkerJobRestorePtr. These are also pointers to
!  * corresponding routines of the respective output format, e.g.
!  * _WorkerJobDumpDirectory().
!  *
!  * Remember that we have forked off the workers only after we have read in the
!  * catalog. That's why our worker processes can also access the catalog
!  * information. Now they re-translate the textual representation to a TocEntry
!  * on their side and do the required action (restore or dump).
!  *
!  * The result is again a textual string that is sent back to the master and is
!  * interpreted by AH->MasterEndParallelItemPtr. This function can update state
!  * or catalog information on the master's side, depending on the reply from
!  * the worker process. In the end it returns status which is 0 for successful
!  * execution.
!  *
!  * ---------------------------------------------------------------------
!  * Master									Worker
!  *
!  *											enters WaitForCommands()
!  * DispatchJobForTocEntry(...te...)
!  *
!  * [ Worker is IDLE ]
!  *
!  * arg = (MasterStartParallelItemPtr)()
!  * send: DUMP arg
!  *											receive: DUMP arg
!  *											str = (WorkerJobDumpPtr)(arg)
!  * [ Worker is WORKING ]					... gets te from arg ...
!  *											... dump te ...
!  *											send: OK DUMP info
!  *
!  * In ListenToWorkers():
!  *
!  * [ Worker is FINISHED ]
!  * receive: OK DUMP info
!  * status = (MasterEndParallelItemPtr)(info)
   *
!  * In ReapWorkerStatus(&ptr):
!  * *ptr = status;
!  * [ Worker is IDLE ]
!  * ---------------------------------------------------------------------
   */
  void
  DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te,
--- 679,723 ----
  }
  
  /*
!  * Close down a parallel dump or restore.
   */
  void
  ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
  {
  	int			i;
  
+ 	/* No work if non-parallel */
  	if (pstate->numWorkers == 1)
  		return;
  
+ 	/* There should not be any unfinished jobs */
  	Assert(IsEveryWorkerIdle(pstate));
  
! 	/* Close the sockets so that the workers know they can exit */
  	for (i = 0; i < pstate->numWorkers; i++)
  	{
  		closesocket(pstate->parallelSlot[i].pipeRead);
  		closesocket(pstate->parallelSlot[i].pipeWrite);
  	}
+ 
+ 	/* Wait for them to exit */
  	WaitForTerminatingWorkers(pstate);
  
  	/*
! 	 * Unlink pstate from shutdown_info, so the exit handler will again fall
! 	 * back to closing AH->connection (if connected).
  	 */
  	shutdown_info.pstate = NULL;
  
+ 	/* Release state (mere neatnik-ism, since we're about to terminate) */
  	free(pstate->parallelSlot);
  	free(pstate);
  }
  
  /*
!  * Dispatch a job to some free worker (caller must ensure there is one!)
   *
!  * te is the TocEntry to be processed, act is the action to be taken on it.
   */
  void
  DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te,
*************** DispatchJobForTocEntry(ArchiveHandle *AH
*** 695,714 ****
  	char	   *arg;
  
  	/* our caller makes sure that at least one worker is idle */
- 	Assert(GetIdleWorker(pstate) != NO_SLOT);
  	worker = GetIdleWorker(pstate);
  	Assert(worker != NO_SLOT);
  
  	arg = (AH->MasterStartParallelItemPtr) (AH, te, act);
  
  	sendMessageToWorker(pstate, worker, arg);
  
  	pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
  	pstate->parallelSlot[worker].args->te = te;
  }
  
  /*
!  * Find the first free parallel slot (if any).
   */
  int
  GetIdleWorker(ParallelState *pstate)
--- 727,750 ----
  	char	   *arg;
  
  	/* our caller makes sure that at least one worker is idle */
  	worker = GetIdleWorker(pstate);
  	Assert(worker != NO_SLOT);
  
+ 	/* Construct and send command string */
  	arg = (AH->MasterStartParallelItemPtr) (AH, te, act);
  
  	sendMessageToWorker(pstate, worker, arg);
  
+ 	/* XXX aren't we leaking string here? (no, because it's static. Ick.) */
+ 
+ 	/* Remember worker is busy, and which TocEntry it's working on */
  	pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
  	pstate->parallelSlot[worker].args->te = te;
  }
  
  /*
!  * Find an idle worker and return its slot number.
!  * Return NO_SLOT if none are idle.
   */
  int
  GetIdleWorker(ParallelState *pstate)
*************** GetIdleWorker(ParallelState *pstate)
*** 716,728 ****
  	int			i;
  
  	for (i = 0; i < pstate->numWorkers; i++)
  		if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
  			return i;
  	return NO_SLOT;
  }
  
  /*
!  * Return true iff every worker process is in the WRKR_TERMINATED state.
   */
  static bool
  HasEveryWorkerTerminated(ParallelState *pstate)
--- 752,766 ----
  	int			i;
  
  	for (i = 0; i < pstate->numWorkers; i++)
+ 	{
  		if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
  			return i;
+ 	}
  	return NO_SLOT;
  }
  
  /*
!  * Return true iff every worker is in the WRKR_TERMINATED state.
   */
  static bool
  HasEveryWorkerTerminated(ParallelState *pstate)
*************** HasEveryWorkerTerminated(ParallelState *
*** 730,737 ****
--- 768,777 ----
  	int			i;
  
  	for (i = 0; i < pstate->numWorkers; i++)
+ 	{
  		if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED)
  			return false;
+ 	}
  	return true;
  }
  
*************** IsEveryWorkerIdle(ParallelState *pstate)
*** 744,782 ****
  	int			i;
  
  	for (i = 0; i < pstate->numWorkers; i++)
  		if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
  			return false;
  	return true;
  }
  
  /*
!  * ---------------------------------------------------------------------
!  * One danger of the parallel backup is a possible deadlock:
   *
   * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode.
   * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
   *	  because the master holds a conflicting ACCESS SHARE lock).
!  * 3) The worker process also requests an ACCESS SHARE lock to read the table.
!  *	  The worker's not granted that lock but is enqueued behind the ACCESS
!  *	  EXCLUSIVE lock request.
!  * ---------------------------------------------------------------------
   *
!  * Now what we do here is to just request a lock in ACCESS SHARE but with
!  * NOWAIT in the worker prior to touching the table. If we don't get the lock,
   * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
!  * are good to just fail the whole backup because we have detected a deadlock.
   */
  static void
! lockTableNoWait(ArchiveHandle *AH, TocEntry *te)
  {
  	Archive    *AHX = (Archive *) AH;
  	const char *qualId;
! 	PQExpBuffer query = createPQExpBuffer();
  	PGresult   *res;
  
! 	Assert(AH->format == archDirectory);
! 	Assert(strcmp(te->desc, "BLOBS") != 0);
  
  	appendPQExpBuffer(query,
  					  "SELECT pg_namespace.nspname,"
  					  "       pg_class.relname "
--- 784,834 ----
  	int			i;
  
  	for (i = 0; i < pstate->numWorkers; i++)
+ 	{
  		if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
  			return false;
+ 	}
  	return true;
  }
  
  /*
!  * Acquire lock on a table to be dumped by a worker process.
!  *
!  * The master process is already holding an ACCESS SHARE lock.  Ordinarily
!  * it's no problem for a worker to get one too, but if anything else besides
!  * pg_dump is running, there's a possible deadlock:
   *
   * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode.
   * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
   *	  because the master holds a conflicting ACCESS SHARE lock).
!  * 3) A worker process also requests an ACCESS SHARE lock to read the table.
!  *	  The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
!  * 4) Now we have a deadlock, since the master is effectively waiting for
!  *	  the worker.  The server cannot detect that, however.
   *
!  * To prevent an infinite wait, prior to touching a table in a worker, request
!  * a lock in ACCESS SHARE mode but with NOWAIT.  If we don't get the lock,
   * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
!  * so we have a deadlock.  We must fail the backup in that case.
   */
  static void
! lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
  {
  	Archive    *AHX = (Archive *) AH;
  	const char *qualId;
! 	PQExpBuffer query;
  	PGresult   *res;
  
! 	/* Nothing to do for BLOBS */
! 	if (strcmp(te->desc, "BLOBS") == 0)
! 		return;
  
+ 	query = createPQExpBuffer();
+ 
+ 	/*
+ 	 * XXX this is an unbelievably expensive substitute for knowing how to dig
+ 	 * a table name out of a TocEntry.
+ 	 */
  	appendPQExpBuffer(query,
  					  "SELECT pg_namespace.nspname,"
  					  "       pg_class.relname "
*************** lockTableNoWait(ArchiveHandle *AH, TocEn
*** 815,825 ****
  }
  
  /*
!  * That's the main routine for the worker.
!  * When it starts up it enters this routine and waits for commands from the
!  * master process. After having processed a command it comes back to here to
!  * wait for the next command. Finally it will receive a TERMINATE command and
!  * exit.
   */
  static void
  WaitForCommands(ArchiveHandle *AH, int pipefd[2])
--- 867,875 ----
  }
  
  /*
!  * WaitForCommands: main routine for a worker process.
!  *
!  * Read and execute commands from the master until we see EOF on the pipe.
   */
  static void
  WaitForCommands(ArchiveHandle *AH, int pipefd[2])
*************** WaitForCommands(ArchiveHandle *AH, int p
*** 827,839 ****
  	char	   *command;
  	DumpId		dumpId;
  	int			nBytes;
! 	char	   *str = NULL;
  	TocEntry   *te;
  
  	for (;;)
  	{
  		if (!(command = getMessageFromMaster(pipefd)))
  		{
  			PQfinish(AH->connection);
  			AH->connection = NULL;
  			return;
--- 877,890 ----
  	char	   *command;
  	DumpId		dumpId;
  	int			nBytes;
! 	char	   *str;
  	TocEntry   *te;
  
  	for (;;)
  	{
  		if (!(command = getMessageFromMaster(pipefd)))
  		{
+ 			/* EOF ... clean up */
  			PQfinish(AH->connection);
  			AH->connection = NULL;
  			return;
*************** WaitForCommands(ArchiveHandle *AH, int p
*** 841,895 ****
  
  		if (messageStartsWith(command, "DUMP "))
  		{
! 			Assert(AH->format == archDirectory);
  			sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes);
  			Assert(nBytes == strlen(command) - strlen("DUMP "));
- 
  			te = getTocEntryByDumpId(AH, dumpId);
  			Assert(te != NULL);
  
! 			/*
! 			 * Lock the table but with NOWAIT. Note that the parent is already
! 			 * holding a lock. If we cannot acquire another ACCESS SHARE MODE
! 			 * lock, then somebody else has requested an exclusive lock in the
! 			 * meantime.  lockTableNoWait dies in this case to prevent a
! 			 * deadlock.
! 			 */
! 			if (strcmp(te->desc, "BLOBS") != 0)
! 				lockTableNoWait(AH, te);
  
! 			/*
! 			 * The message we return here has been pg_malloc()ed and we are
! 			 * responsible for free()ing it.
! 			 */
  			str = (AH->WorkerJobDumpPtr) (AH, te);
! 			Assert(AH->connection != NULL);
  			sendMessageToMaster(pipefd, str);
  			free(str);
  		}
  		else if (messageStartsWith(command, "RESTORE "))
  		{
! 			Assert(AH->format == archDirectory || AH->format == archCustom);
! 			Assert(AH->connection != NULL);
! 
  			sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes);
  			Assert(nBytes == strlen(command) - strlen("RESTORE "));
- 
  			te = getTocEntryByDumpId(AH, dumpId);
  			Assert(te != NULL);
  
! 			/*
! 			 * The message we return here has been pg_malloc()ed and we are
! 			 * responsible for free()ing it.
! 			 */
  			str = (AH->WorkerJobRestorePtr) (AH, te);
! 			Assert(AH->connection != NULL);
  			sendMessageToMaster(pipefd, str);
  			free(str);
  		}
  		else
  			exit_horribly(modulename,
! 					   "unrecognized command on communication channel: %s\n",
  						  command);
  
  		/* command was pg_malloc'd and we are responsible for free()ing it. */
--- 892,935 ----
  
  		if (messageStartsWith(command, "DUMP "))
  		{
! 			/* Decode the command */
  			sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes);
  			Assert(nBytes == strlen(command) - strlen("DUMP "));
  			te = getTocEntryByDumpId(AH, dumpId);
  			Assert(te != NULL);
  
! 			/* Acquire lock on this table within the worker's session */
! 			lockTableForWorker(AH, te);
  
! 			/* Perform the dump command */
  			str = (AH->WorkerJobDumpPtr) (AH, te);
! 
! 			/* Return status to master */
  			sendMessageToMaster(pipefd, str);
+ 
+ 			/* we are responsible for freeing the status string */
  			free(str);
  		}
  		else if (messageStartsWith(command, "RESTORE "))
  		{
! 			/* Decode the command */
  			sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes);
  			Assert(nBytes == strlen(command) - strlen("RESTORE "));
  			te = getTocEntryByDumpId(AH, dumpId);
  			Assert(te != NULL);
  
! 			/* Perform the restore command */
  			str = (AH->WorkerJobRestorePtr) (AH, te);
! 
! 			/* Return status to master */
  			sendMessageToMaster(pipefd, str);
+ 
+ 			/* we are responsible for freeing the status string */
  			free(str);
  		}
  		else
  			exit_horribly(modulename,
! 					   "unrecognized command received from master: \"%s\"\n",
  						  command);
  
  		/* command was pg_malloc'd and we are responsible for free()ing it. */
*************** WaitForCommands(ArchiveHandle *AH, int p
*** 898,915 ****
  }
  
  /*
!  * ---------------------------------------------------------------------
!  * Note the status change:
   *
!  * DispatchJobForTocEntry		WRKR_IDLE -> WRKR_WORKING
!  * ListenToWorkers				WRKR_WORKING -> WRKR_FINISHED / WRKR_TERMINATED
!  * ReapWorkerStatus				WRKR_FINISHED -> WRKR_IDLE
!  * ---------------------------------------------------------------------
   *
!  * Just calling ReapWorkerStatus() when all workers are working might or might
!  * not give you an idle worker because you need to call ListenToWorkers() in
!  * between and only thereafter ReapWorkerStatus(). This is necessary in order
!  * to get and deal with the status (=result) of the worker's execution.
   */
  void
  ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
--- 938,958 ----
  }
  
  /*
!  * Check for status messages from workers.
   *
!  * If do_wait is true, wait to get a status message; otherwise, just return
!  * immediately if there is none available.
   *
!  * When we get a status message, we let MasterEndParallelItemPtr process it,
!  * then save the resulting status code and switch the worker's state to
!  * WRKR_FINISHED.  Later, caller must call ReapWorkerStatus() to verify
!  * that the status was "OK" and push the worker back to IDLE state.
!  *
!  * XXX Rube Goldberg would be proud of this API, but no one else should be.
!  *
!  * XXX is it worth checking for more than one status message per call?
!  * It seems somewhat unlikely that multiple workers would finish at exactly
!  * the same time.
   */
  void
  ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
*************** ListenToWorkers(ArchiveHandle *AH, Paral
*** 917,938 ****
  	int			worker;
  	char	   *msg;
  
  	msg = getMessageFromWorker(pstate, do_wait, &worker);
  
  	if (!msg)
  	{
  		if (do_wait)
  			exit_horribly(modulename, "a worker process died unexpectedly\n");
  		return;
  	}
  
  	if (messageStartsWith(msg, "OK "))
  	{
  		char	   *statusString;
- 		TocEntry   *te;
  
- 		pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED;
- 		te = pstate->parallelSlot[worker].args->te;
  		if (messageStartsWith(msg, "OK RESTORE "))
  		{
  			statusString = msg + strlen("OK RESTORE ");
--- 960,982 ----
  	int			worker;
  	char	   *msg;
  
+ 	/* Try to collect a status message */
  	msg = getMessageFromWorker(pstate, do_wait, &worker);
  
  	if (!msg)
  	{
+ 		/* If do_wait is true, we must have detected EOF on some socket */
  		if (do_wait)
  			exit_horribly(modulename, "a worker process died unexpectedly\n");
  		return;
  	}
  
+ 	/* Process it and update our idea of the worker's status */
  	if (messageStartsWith(msg, "OK "))
  	{
+ 		TocEntry   *te = pstate->parallelSlot[worker].args->te;
  		char	   *statusString;
  
  		if (messageStartsWith(msg, "OK RESTORE "))
  		{
  			statusString = msg + strlen("OK RESTORE ");
*************** ListenToWorkers(ArchiveHandle *AH, Paral
*** 951,972 ****
  			exit_horribly(modulename,
  						  "invalid message received from worker: \"%s\"\n",
  						  msg);
  	}
  	else
  		exit_horribly(modulename,
  					  "invalid message received from worker: \"%s\"\n",
  					  msg);
  
! 	/* both Unix and Win32 return pg_malloc()ed space, so we free it */
  	free(msg);
  }
  
  /*
!  * This function is executed in the master process.
   *
!  * This function is used to get the return value of a terminated worker
!  * process. If a process has terminated, its status is stored in *status and
!  * the id of the worker is returned.
   */
  int
  ReapWorkerStatus(ParallelState *pstate, int *status)
--- 995,1017 ----
  			exit_horribly(modulename,
  						  "invalid message received from worker: \"%s\"\n",
  						  msg);
+ 		pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED;
  	}
  	else
  		exit_horribly(modulename,
  					  "invalid message received from worker: \"%s\"\n",
  					  msg);
  
! 	/* Free the string returned from getMessageFromWorker */
  	free(msg);
  }
  
  /*
!  * Check to see if any worker is in WRKR_FINISHED state.  If so,
!  * store its command status code in *status, reset it to IDLE state,
!  * and return its slot number.  Otherwise return NO_SLOT.
   *
!  * This function is executed in the master process.
   */
  int
  ReapWorkerStatus(ParallelState *pstate, int *status)
*************** ReapWorkerStatus(ParallelState *pstate, 
*** 987,995 ****
  }
  
  /*
!  * This function is executed in the master process.
   *
!  * It looks for an idle worker process and only returns if there is one.
   */
  void
  EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate)
--- 1032,1047 ----
  }
  
  /*
!  * Wait, if necessary, until we have at least one idle worker.
!  * Reap worker status as necessary to move FINISHED workers to IDLE state.
   *
!  * We assume that no extra processing is required when reaping a finished
!  * command, except for checking that the status was OK (zero).
!  * Caution: that assumption means that this function can only be used in
!  * parallel dump, not parallel restore, because the latter has a more
!  * complex set of rules about handling status.
!  *
!  * This function is executed in the master process.
   */
  void
  EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate)
*************** EnsureIdleWorker(ArchiveHandle *AH, Para
*** 1029,1037 ****
  }
  
  /*
!  * This function is executed in the master process.
   *
!  * It waits for all workers to terminate.
   */
  void
  EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate)
--- 1081,1095 ----
  }
  
  /*
!  * Wait for all workers to be idle.
   *
!  * We assume that no extra processing is required when reaping a finished
!  * command, except for checking that the status was OK (zero).
!  * Caution: that assumption means that this function can only be used in
!  * parallel dump, not parallel restore, because the latter has a more
!  * complex set of rules about handling status.
!  *
!  * This function is executed in the master process.
   */
  void
  EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate)
*************** EnsureWorkersFinished(ArchiveHandle *AH,
*** 1053,1062 ****
  }
  
  /*
!  * This function is executed in the worker process.
   *
!  * It returns the next message on the communication channel, blocking until it
!  * becomes available.
   */
  static char *
  getMessageFromMaster(int pipefd[2])
--- 1111,1121 ----
  }
  
  /*
!  * Read one command message from the master, blocking if necessary
!  * until one is available, and return it as a malloc'd string.
!  * On EOF, return NULL.
   *
!  * This function is executed in worker processes.
   */
  static char *
  getMessageFromMaster(int pipefd[2])
*************** getMessageFromMaster(int pipefd[2])
*** 1065,1073 ****
  }
  
  /*
!  * This function is executed in the worker process.
   *
!  * It sends a message to the master on the communication channel.
   */
  static void
  sendMessageToMaster(int pipefd[2], const char *str)
--- 1124,1132 ----
  }
  
  /*
!  * Send a status message to the master.
   *
!  * This function is executed in worker processes.
   */
  static void
  sendMessageToMaster(int pipefd[2], const char *str)
*************** sendMessageToMaster(int pipefd[2], const
*** 1081,1089 ****
  }
  
  /*
!  * A select loop that repeats calling select until a descriptor in the read
!  * set becomes readable. On Windows we have to check for the termination event
!  * from time to time, on Unix we can just block forever.
   */
  static int
  select_loop(int maxFd, fd_set *workerset)
--- 1140,1147 ----
  }
  
  /*
!  * Wait until some descriptor in "workerset" becomes readable.
!  * Returns -1 on error, else the number of readable descriptors.
   */
  static int
  select_loop(int maxFd, fd_set *workerset)
*************** select_loop(int maxFd, fd_set *workerset
*** 1092,1104 ****
  	fd_set		saveSet = *workerset;
  
  #ifdef WIN32
- 	/* should always be the master */
- 	Assert(tMasterThreadId == GetCurrentThreadId());
- 
  	for (;;)
  	{
  		/*
  		 * sleep a quarter of a second before checking if we should terminate.
  		 */
  		struct timeval tv = {0, 250000};
  
--- 1150,1160 ----
  	fd_set		saveSet = *workerset;
  
  #ifdef WIN32
  	for (;;)
  	{
  		/*
  		 * sleep a quarter of a second before checking if we should terminate.
+ 		 * XXX this certainly looks useless, why not just wait indefinitely?
  		 */
  		struct timeval tv = {0, 250000};
  
*************** select_loop(int maxFd, fd_set *workerset
*** 1110,1117 ****
  		if (i)
  			break;
  	}
! #else							/* UNIX */
! 
  	for (;;)
  	{
  		*workerset = saveSet;
--- 1166,1172 ----
  		if (i)
  			break;
  	}
! #else							/* !WIN32 */
  	for (;;)
  	{
  		*workerset = saveSet;
*************** select_loop(int maxFd, fd_set *workerset
*** 1131,1149 ****
  			continue;
  		break;
  	}
! #endif
  
  	return i;
  }
  
  
  /*
!  * This function is executed in the master process.
   *
!  * It returns the next message from the worker on the communication channel,
!  * optionally blocking (do_wait) until it becomes available.
   *
!  * The id of the worker is returned in *worker.
   */
  static char *
  getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
--- 1186,1210 ----
  			continue;
  		break;
  	}
! #endif   /* WIN32 */
  
  	return i;
  }
  
  
  /*
!  * Check for messages from worker processes.
   *
!  * If a message is available, return it as a malloc'd string, and put the
!  * index of the sending worker in *worker.
   *
!  * If nothing is available, wait if "do_wait" is true, else return NULL.
!  *
!  * If we detect EOF on any socket, we'll return NULL.  It's not great that
!  * this is hard to distinguish from the no-data-available case, but for now
!  * our one caller is okay with that.
!  *
!  * This function is executed in the master process.
   */
  static char *
  getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
*************** getMessageFromWorker(ParallelState *psta
*** 1153,1166 ****
  	int			maxFd = -1;
  	struct timeval nowait = {0, 0};
  
  	FD_ZERO(&workerset);
- 
  	for (i = 0; i < pstate->numWorkers; i++)
  	{
  		if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
  			continue;
  		FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
- 		/* actually WIN32 ignores the first parameter to select()... */
  		if (pstate->parallelSlot[i].pipeRead > maxFd)
  			maxFd = pstate->parallelSlot[i].pipeRead;
  	}
--- 1214,1226 ----
  	int			maxFd = -1;
  	struct timeval nowait = {0, 0};
  
+ 	/* construct bitmap of socket descriptors for select() */
  	FD_ZERO(&workerset);
  	for (i = 0; i < pstate->numWorkers; i++)
  	{
  		if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
  			continue;
  		FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
  		if (pstate->parallelSlot[i].pipeRead > maxFd)
  			maxFd = pstate->parallelSlot[i].pipeRead;
  	}
*************** getMessageFromWorker(ParallelState *psta
*** 1177,1183 ****
  	}
  
  	if (i < 0)
! 		exit_horribly(modulename, "error in ListenToWorkers(): %s\n", strerror(errno));
  
  	for (i = 0; i < pstate->numWorkers; i++)
  	{
--- 1237,1243 ----
  	}
  
  	if (i < 0)
! 		exit_horribly(modulename, "select() failed: %s\n", strerror(errno));
  
  	for (i = 0; i < pstate->numWorkers; i++)
  	{
*************** getMessageFromWorker(ParallelState *psta
*** 1186,1191 ****
--- 1246,1261 ----
  		if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
  			continue;
  
+ 		/*
+ 		 * Read the message if any.  If the socket is ready because of EOF,
+ 		 * we'll return NULL instead (and the socket will stay ready, so the
+ 		 * condition will persist).
+ 		 *
+ 		 * Note: because this is a blocking read, we'll wait if only part of
+ 		 * the message is available.  Waiting a long time would be bad, but
+ 		 * since worker status messages are short and are always sent in one
+ 		 * operation, it shouldn't be a problem in practice.
+ 		 */
  		msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
  		*worker = i;
  		return msg;
*************** getMessageFromWorker(ParallelState *psta
*** 1195,1203 ****
  }
  
  /*
!  * This function is executed in the master process.
   *
!  * It sends a message to a certain worker on the communication channel.
   */
  static void
  sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
--- 1265,1273 ----
  }
  
  /*
!  * Send a command message to the specified worker process.
   *
!  * This function is executed in the master process.
   */
  static void
  sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
*************** sendMessageToWorker(ParallelState *pstat
*** 1208,1214 ****
  	{
  		/*
  		 * If we're already aborting anyway, don't care if we succeed or not.
! 		 * The child might have gone already.
  		 */
  #ifndef WIN32
  		if (!aborting)
--- 1278,1285 ----
  	{
  		/*
  		 * If we're already aborting anyway, don't care if we succeed or not.
! 		 * The child might have gone already.  (XXX but if we're aborting
! 		 * already, why are we here at all?)
  		 */
  #ifndef WIN32
  		if (!aborting)
*************** sendMessageToWorker(ParallelState *pstat
*** 1220,1227 ****
  }
  
  /*
!  * The underlying function to read a message from the communication channel
!  * (fd) with optional blocking (do_wait).
   */
  static char *
  readMessageFromPipe(int fd)
--- 1291,1301 ----
  }
  
  /*
!  * Read one message from the specified pipe (fd), blocking if necessary
!  * until one is available, and return it as a malloc'd string.
!  * On EOF, return NULL.
!  *
!  * A "message" on the channel is just a null-terminated string.
   */
  static char *
  readMessageFromPipe(int fd)
*************** readMessageFromPipe(int fd)
*** 1232,1290 ****
  	int			ret;
  
  	/*
! 	 * The problem here is that we need to deal with several possibilities: we
! 	 * could receive only a partial message or several messages at once. The
! 	 * caller expects us to return exactly one message however.
! 	 *
! 	 * We could either read in as much as we can and keep track of what we
! 	 * delivered back to the caller or we just read byte by byte. Once we see
! 	 * (char) 0, we know that it's the message's end. This would be quite
! 	 * inefficient for more data but since we are reading only on the command
! 	 * channel, the performance loss does not seem worth the trouble of
! 	 * keeping internal states for different file descriptors.
  	 */
  	bufsize = 64;				/* could be any number */
  	msg = (char *) pg_malloc(bufsize);
- 
  	msgsize = 0;
  	for (;;)
  	{
! 		Assert(msgsize <= bufsize);
  		ret = piperead(fd, msg + msgsize, 1);
- 
- 		/* worker has closed the connection or another error happened */
  		if (ret <= 0)
! 			break;
  
  		Assert(ret == 1);
  
  		if (msg[msgsize] == '\0')
! 			return msg;
  
  		msgsize++;
! 		if (msgsize == bufsize)
  		{
! 			/* could be any number */
! 			bufsize += 16;
  			msg = (char *) pg_realloc(msg, bufsize);
  		}
  	}
  
! 	/*
! 	 * Worker has closed the connection, make sure to clean up before return
! 	 * since we are not returning msg (but did allocate it).
! 	 */
  	pg_free(msg);
- 
  	return NULL;
  }
  
  #ifdef WIN32
  /*
!  * This is a replacement version of pipe for Win32 which allows returned
!  * handles to be used in select(). Note that read/write calls must be replaced
!  * with recv/send.  "handles" have to be integers so we check for errors then
!  * cast to integers.
   */
  static int
  pgpipe(int handles[2])
--- 1306,1357 ----
  	int			ret;
  
  	/*
! 	 * In theory, if we let piperead() read multiple bytes, it might give us
! 	 * back fragments of multiple messages.  (That can't actually occur, since
! 	 * neither master nor workers send more than one message without waiting
! 	 * for a reply, but we don't wish to assume that here.)  For simplicity,
! 	 * read a byte at a time until we get the terminating '\0'.  This method
! 	 * is a bit inefficient, but since this is only used for relatively short
! 	 * command and status strings, it shouldn't matter.
  	 */
  	bufsize = 64;				/* could be any number */
  	msg = (char *) pg_malloc(bufsize);
  	msgsize = 0;
  	for (;;)
  	{
! 		Assert(msgsize < bufsize);
  		ret = piperead(fd, msg + msgsize, 1);
  		if (ret <= 0)
! 			break;				/* error or connection closure */
  
  		Assert(ret == 1);
  
  		if (msg[msgsize] == '\0')
! 			return msg;			/* collected whole message */
  
  		msgsize++;
! 		if (msgsize == bufsize) /* enlarge buffer if needed */
  		{
! 			bufsize += 16;		/* could be any number */
  			msg = (char *) pg_realloc(msg, bufsize);
  		}
  	}
  
! 	/* Read error, or other end has closed the connection */
  	pg_free(msg);
  	return NULL;
  }
  
  #ifdef WIN32
  /*
!  * This is a replacement version of pipe(2) for Windows which allows the pipe
!  * handles to be used in select().
!  *
!  * Reads and writes on the pipe must go through piperead()/pipewrite().
!  *
!  * For consistency with Unix we declare the returned handles as "int".
!  * This is okay even on WIN64 because system handles are not more than
!  * 32 bits wide, but we do have to do some casting.
   */
  static int
  pgpipe(int handles[2])
*************** pgpipe(int handles[2])
*** 1349,1354 ****
--- 1416,1423 ----
  	{
  		write_msg(modulename, "pgpipe: could not connect socket: error code %d\n",
  				  WSAGetLastError());
+ 		closesocket(handles[1]);
+ 		handles[1] = -1;
  		closesocket(s);
  		return -1;
  	}
*************** pgpipe(int handles[2])
*** 1367,1381 ****
  	return 0;
  }
  
  static int
  piperead(int s, char *buf, int len)
  {
  	int			ret = recv(s, buf, len, 0);
  
  	if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
! 		/* EOF on the pipe! (win32 socket based implementation) */
  		ret = 0;
  	return ret;
  }
  
! #endif
--- 1436,1455 ----
  	return 0;
  }
  
+ /*
+  * Windows implementation of reading from a pipe.
+  */
  static int
  piperead(int s, char *buf, int len)
  {
  	int			ret = recv(s, buf, len, 0);
  
  	if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
! 	{
! 		/* EOF on the pipe! */
  		ret = 0;
+ 	}
  	return ret;
  }
  
! #endif   /* WIN32 */
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 8ffd8f7..ad8e132 100644
*** a/src/bin/pg_dump/pg_backup_archiver.c
--- b/src/bin/pg_dump/pg_backup_archiver.c
*************** _allocAH(const char *FileSpec, const Arc
*** 2326,2331 ****
--- 2326,2334 ----
  	return AH;
  }
  
+ /*
+  * Write out all data (tables & blobs)
+  */
  void
  WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
  {
*************** WriteDataChunks(ArchiveHandle *AH, Paral
*** 2343,2357 ****
  		{
  			/*
  			 * If we are in a parallel backup, then we are always the master
! 			 * process.
  			 */
  			EnsureIdleWorker(AH, pstate);
- 			Assert(GetIdleWorker(pstate) != NO_SLOT);
  			DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP);
  		}
  		else
  			WriteDataChunksForTocEntry(AH, te);
  	}
  	EnsureWorkersFinished(AH, pstate);
  }
  
--- 2346,2363 ----
  		{
  			/*
  			 * If we are in a parallel backup, then we are always the master
! 			 * process.  Dispatch each data-transfer job to a worker.
  			 */
  			EnsureIdleWorker(AH, pstate);
  			DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP);
  		}
  		else
  			WriteDataChunksForTocEntry(AH, te);
  	}
+ 
+ 	/*
+ 	 * If parallel, wait for workers to finish.
+ 	 */
  	EnsureWorkersFinished(AH, pstate);
  }
  
*************** restore_toc_entries_parallel(ArchiveHand
*** 3819,3831 ****
  
  			par_list_remove(next_work_item);
  
- 			Assert(GetIdleWorker(pstate) != NO_SLOT);
  			DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE);
  		}
  		else
  		{
  			/* at least one child is working and we have nothing ready. */
- 			Assert(!IsEveryWorkerIdle(pstate));
  		}
  
  		for (;;)
--- 3825,3835 ----
