diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile new file mode 100644 index a6ab39d..6336edc *** a/src/bin/pg_dump/Makefile --- b/src/bin/pg_dump/Makefile *************** include $(top_builddir)/src/Makefile.glo *** 19,25 **** override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) OBJS= pg_backup_archiver.o pg_backup_db.o pg_backup_custom.o \ ! pg_backup_null.o pg_backup_tar.o \ pg_backup_directory.o dumputils.o compress_io.o $(WIN32RES) KEYWRDOBJS = keywords.o kwlookup.o --- 19,25 ---- override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) OBJS= pg_backup_archiver.o pg_backup_db.o pg_backup_custom.o \ ! pg_backup_null.o pg_backup_tar.o parallel.o \ pg_backup_directory.o dumputils.o compress_io.o $(WIN32RES) KEYWRDOBJS = keywords.o kwlookup.o diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c new file mode 100644 index 768b923..0308f66 *** a/src/bin/pg_dump/compress_io.c --- b/src/bin/pg_dump/compress_io.c *************** *** 54,59 **** --- 54,60 ---- #include "compress_io.h" #include "dumputils.h" + #include "parallel.h" /*---------------------- * Compressor API *************** size_t *** 182,187 **** --- 183,191 ---- WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen) { + /* Are we aborting? */ + checkAborting(AH); + switch (cs->comprAlg) { case COMPR_ALG_LIBZ: *************** ReadDataFromArchiveZlib(ArchiveHandle *A *** 351,356 **** --- 355,363 ---- /* no minimal chunk size for zlib */ while ((cnt = readF(AH, &buf, &buflen))) { + /* Are we aborting? */ + checkAborting(AH); + zp->next_in = (void *) buf; zp->avail_in = cnt; *************** ReadDataFromArchiveNone(ArchiveHandle *A *** 411,416 **** --- 418,426 ---- while ((cnt = readF(AH, &buf, &buflen))) { + /* Are we aborting? 
*/ + checkAborting(AH); + ahwrite(buf, 1, cnt, AH); } diff --git a/src/bin/pg_dump/dumputils.c b/src/bin/pg_dump/dumputils.c new file mode 100644 index 0a09882..50cdaf1 *** a/src/bin/pg_dump/dumputils.c --- b/src/bin/pg_dump/dumputils.c *************** static struct *** 38,43 **** --- 38,44 ---- } on_exit_nicely_list[MAX_ON_EXIT_NICELY]; static int on_exit_nicely_index; + void (*on_exit_msg_func)(const char *modulename, const char *fmt, va_list ap) = vwrite_msg; #define supports_grant_options(version) ((version) >= 70400) *************** static bool parseAclItem(const char *ite *** 48,58 **** --- 49,69 ---- static char *copyAclUserName(PQExpBuffer output, char *input); static void AddAcl(PQExpBuffer aclbuf, const char *keyword, const char *subname); + static PQExpBuffer getThreadLocalPQExpBuffer(void); #ifdef WIN32 + static void shutdown_parallel_dump_utils(int code, void* unused); static bool parallel_init_done = false; static DWORD tls_index; static DWORD mainThreadId; + + static void + shutdown_parallel_dump_utils(int code, void* unused) + { + /* Call the cleanup function only from the main thread */ + if (mainThreadId == GetCurrentThreadId()) + WSACleanup(); + } #endif void *************** init_parallel_dump_utils(void) *** 61,83 **** #ifdef WIN32 if (!parallel_init_done) { tls_index = TlsAlloc(); - parallel_init_done = true; mainThreadId = GetCurrentThreadId(); } #endif } /* ! * Quotes input string if it's not a legitimate SQL identifier as-is. ! * ! * Note that the returned string must be used before calling fmtId again, ! * since we re-use the same return buffer each time. Non-reentrant but ! * reduces memory leakage. (On Windows the memory leakage will be one buffer ! * per thread, which is at least better than one per call). */ ! const char * ! 
fmtId(const char *rawid) { /* * The Tls code goes awry if we use a static var, so we provide for both --- 72,100 ---- #ifdef WIN32 if (!parallel_init_done) { + WSADATA wsaData; + int err; + tls_index = TlsAlloc(); mainThreadId = GetCurrentThreadId(); + err = WSAStartup(MAKEWORD(2, 2), &wsaData); + if (err != 0) + { + fprintf(stderr, _("WSAStartup failed: %d\n"), err); + exit_nicely(1); + } + on_exit_nicely(shutdown_parallel_dump_utils, NULL); + parallel_init_done = true; } #endif } /* ! * Non-reentrant but reduces memory leakage. (On Windows the memory leakage ! * will be one buffer per thread, which is at least better than one per call). */ ! static PQExpBuffer ! getThreadLocalPQExpBuffer(void) { /* * The Tls code goes awry if we use a static var, so we provide for both *************** fmtId(const char *rawid) *** 86,94 **** static PQExpBuffer s_id_return = NULL; PQExpBuffer id_return; - const char *cp; - bool need_quotes = false; - #ifdef WIN32 if (parallel_init_done) id_return = (PQExpBuffer) TlsGetValue(tls_index); /* 0 when not set */ --- 103,108 ---- *************** fmtId(const char *rawid) *** 118,123 **** --- 132,154 ---- } + return id_return; + } + + /* + * Quotes input string if it's not a legitimate SQL identifier as-is. + * + * Note that the returned string must be used before calling fmtId again, + * since we re-use the same return buffer each time. + */ + const char * + fmtId(const char *rawid) + { + PQExpBuffer id_return = getThreadLocalPQExpBuffer(); + + const char *cp; + bool need_quotes = false; + /* * These checks need to match the identifier production in scan.l. Don't * use islower() etc. *************** fmtId(const char *rawid) *** 185,190 **** --- 216,250 ---- return id_return->data; } + /* + * fmtQualifiedId - convert a qualified name to the proper format for + * the source database. + * + * Like fmtId, use the result before calling again. 
+ * + * Since we call fmtId and it also uses getThreadLocalPQExpBuffer() we cannot + * use it until we're finished with calling fmtId(). + */ + const char * + fmtQualifiedId(int remoteVersion, const char *schema, const char *id) + { + PQExpBuffer id_return; + PQExpBuffer lcl_pqexp = createPQExpBuffer(); + + /* Suppress schema name if fetching from pre-7.3 DB */ + if (remoteVersion >= 70300 && schema && *schema) + { + appendPQExpBuffer(lcl_pqexp, "%s.", fmtId(schema)); + } + appendPQExpBuffer(lcl_pqexp, "%s", fmtId(id)); + + id_return = getThreadLocalPQExpBuffer(); + + appendPQExpBuffer(id_return, "%s", lcl_pqexp->data); + destroyPQExpBuffer(lcl_pqexp); + + return id_return->data; + } /* * Convert a string value to an SQL string literal and append it to *************** exit_horribly(const char *modulename, co *** 1315,1321 **** va_list ap; va_start(ap, fmt); ! vwrite_msg(modulename, fmt, ap); va_end(ap); exit_nicely(1); --- 1375,1381 ---- va_list ap; va_start(ap, fmt); ! on_exit_msg_func(modulename, fmt, ap); va_end(ap); exit_nicely(1); diff --git a/src/bin/pg_dump/dumputils.h b/src/bin/pg_dump/dumputils.h new file mode 100644 index a4b351d..3abf24d *** a/src/bin/pg_dump/dumputils.h --- b/src/bin/pg_dump/dumputils.h *************** extern const char *progname; *** 47,52 **** --- 47,54 ---- extern void init_parallel_dump_utils(void); extern const char *fmtId(const char *identifier); + extern const char *fmtQualifiedId(int remoteVersion, + const char *schema, const char *id); extern void appendStringLiteral(PQExpBuffer buf, const char *str, int encoding, bool std_strings); extern void appendStringLiteralConn(PQExpBuffer buf, const char *str, *************** __attribute__((format(PG_PRINTF_ATTRIBUT *** 85,90 **** --- 87,94 ---- extern void exit_horribly(const char *modulename, const char *fmt,...) 
__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3), noreturn)); + extern void (*on_exit_msg_func)(const char *modulename, const char *fmt, va_list ap) + __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 0))); extern void on_exit_nicely(on_exit_nicely_callback function, void *arg); extern void exit_nicely(int code) __attribute__((noreturn)); diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c new file mode 100644 index ...7d4e4e6 *** a/src/bin/pg_dump/parallel.c --- b/src/bin/pg_dump/parallel.c *************** *** 0 **** --- 1,1282 ---- + /*------------------------------------------------------------------------- + * + * parallel.c + * + * Parallel support for the pg_dump archiver + * + * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * The author is not responsible for loss or damages that may + * result from its use. + * + * IDENTIFICATION + * src/bin/pg_dump/parallel.c + * + *------------------------------------------------------------------------- + */ + + #include "pg_backup_db.h" + + #include "dumputils.h" + #include "parallel.h" + + #ifndef WIN32 + #include + #include + #include "signal.h" + #include + #include + #endif + + #define PIPE_READ 0 + #define PIPE_WRITE 1 + + /* file-scope variables */ + #ifdef WIN32 + static unsigned int tMasterThreadId = 0; + static HANDLE termEvent = INVALID_HANDLE_VALUE; + static int pgpipe(int handles[2]); + static int piperead(int s, char *buf, int len); + #define pipewrite(a,b,c) send(a,b,c,0) + #else + /* + * aborting is only ever used in the master, the workers are fine with just + * wantAbort. 
+ */ + static bool aborting = false; + static volatile sig_atomic_t wantAbort = 0; + #define pgpipe(a) pipe(a) + #define piperead(a,b,c) read(a,b,c) + #define pipewrite(a,b,c) write(a,b,c) + #endif + + typedef struct ShutdownInformation + { + ParallelState *pstate; + Archive *AHX; + } ShutdownInformation; + + static ShutdownInformation shutdown_info; + + static const char *modulename = gettext_noop("parallel archiver"); + + static ParallelSlot *GetMyPSlot(ParallelState *pstate); + static void parallel_exit_msg_func(const char *modulename, + const char *fmt, va_list ap) + __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 0))); + static void parallel_msg_master(ParallelSlot *slot, const char *modulename, + const char *fmt, va_list ap) + __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 0))); + static void archive_close_connection(int code, void *arg); + static void ShutdownWorkersHard(ParallelState *pstate); + static void WaitForTerminatingWorkers(ParallelState *pstate); + #ifndef WIN32 + static void sigTermHandler(int signum); + #endif + static void SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker, + RestoreOptions *ropt); + static bool HasEveryWorkerTerminated(ParallelState *pstate); + + static void lockTableNoWait(ArchiveHandle *AH, TocEntry *te); + static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]); + static char *getMessageFromMaster(int pipefd[2]); + static void sendMessageToMaster(int pipefd[2], const char *str); + static int select_loop(int maxFd, fd_set *workerset); + static char *getMessageFromWorker(ParallelState *pstate, + bool do_wait, int *worker); + static void sendMessageToWorker(ParallelState *pstate, + int worker, const char *str); + static char *readMessageFromPipe(int fd); + + #define messageStartsWith(msg, prefix) \ + (strncmp(msg, prefix, strlen(prefix)) == 0) + #define messageEquals(msg, pattern) \ + (strcmp(msg, pattern) == 0) + + static ParallelSlot * + GetMyPSlot(ParallelState *pstate) + { + int i; + + for (i = 0; i < 
pstate->numWorkers; i++)
+ #ifdef WIN32
+ 		if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
+ #else
+ 		if (pstate->parallelSlot[i].pid == getpid())
+ #endif
+ 			return &(pstate->parallelSlot[i]);
+
+ 	return NULL;
+ }
+
+ /*
+  * This is the function that will be called from exit_horribly() to print the
+  * error message. If the worker process does exit_horribly(), we forward its
+  * last words to the master process. The master process then does
+  * exit_horribly() with this error message itself and prints it normally.
+  * After printing the message, exit_horribly() on the master will shut down
+  * the remaining worker processes.
+  *
+  * shutdown_info.pstate can be NULL when we get here: ParallelBackupEnd()
+  * clears it once the workers are gone. In that case there is nobody to
+  * forward the message to, so it is printed directly.
+  */
+ static void
+ parallel_exit_msg_func(const char *modulename, const char *fmt, va_list ap)
+ {
+ 	ParallelState *pstate = shutdown_info.pstate;
+ 	ParallelSlot *slot = NULL;
+
+ 	/* Only look for our slot while a parallel state is registered */
+ 	if (pstate != NULL)
+ 		slot = GetMyPSlot(pstate);
+
+ 	if (slot == NULL)
+ 	{
+ 		/* We're the parent (or not parallel any more), just write it out */
+ 		vwrite_msg(modulename, fmt, ap);
+ 	}
+ 	else
+ 	{
+ 		/* If we're a worker process, send the msg to the master process */
+ 		parallel_msg_master(slot, modulename, fmt, ap);
+ 	}
+ }
+
+ /*
+  * Sends the error message from the worker to the master process.
+  *
+  * The message is formatted into a fixed-size buffer behind an "ERROR "
+  * prefix; vsnprintf() truncates anything that does not fit. The module name
+  * is not part of the forwarded text.
+  */
+ static void
+ parallel_msg_master(ParallelSlot *slot, const char *modulename,
+ 					const char *fmt, va_list ap)
+ {
+ 	char		buf[512];
+ 	int			pipefd[2];
+
+ 	pipefd[PIPE_READ] = slot->pipeRevRead;
+ 	pipefd[PIPE_WRITE] = slot->pipeRevWrite;
+
+ 	strcpy(buf, "ERROR ");
+ 	vsnprintf(buf + strlen("ERROR "),
+ 			  sizeof(buf) - strlen("ERROR "), fmt, ap);
+
+ 	sendMessageToMaster(pipefd, buf);
+ }
+
+ /*
+  * pg_dump and pg_restore register the Archive pointer for the exit handler
+  * (called from exit_horribly). This function mainly exists so that we can
+  * keep shutdown_info in file scope only.
+  */
+ void
+ on_exit_close_archive(Archive *AHX)
+ {
+ 	shutdown_info.AHX = AHX;
+ 	on_exit_nicely(archive_close_connection, &shutdown_info);
+ }
+
+ /*
+  * This function can close archives in both the parallel and non-parallel
+  * case.
+ */ + static void + archive_close_connection(int code, void *arg) + { + ShutdownInformation *si = (ShutdownInformation *) arg; + + if (si->pstate) + { + ParallelSlot *slot = GetMyPSlot(si->pstate); + + if (!slot) { + /* + * We're the master: We have already printed out the message + * passed to exit_horribly() either from the master itself or from + * a worker process. Now we need to close our own database + * connection (only open during parallel dump but not restore) and + * shut down the remaining workers. + */ + DisconnectDatabase(si->AHX); + #ifndef WIN32 + /* + * Setting aborting to true switches to best-effort-mode + * (send/receive but ignore errors) in communicating with our + * workers. + */ + aborting = true; + #endif + ShutdownWorkersHard(si->pstate); + } + else if (slot->args->AH) + DisconnectDatabase(&(slot->args->AH->public)); + } + else if (si->AHX) + DisconnectDatabase(si->AHX); + } + + /* + * If we have one worker that terminates for some reason, we'd like the other + * threads to terminate as well (and not finish with their 70 GB table dump + * first...). Now in UNIX we can just kill these processes, and let the signal + * handler set wantAbort to 1. In Windows we set a termEvent and this serves + * as the signal for everyone to terminate. + */ + void + checkAborting(ArchiveHandle *AH) + { + #ifdef WIN32 + if (WaitForSingleObject(termEvent, 0) == WAIT_OBJECT_0) + #else + if (wantAbort) + #endif + exit_horribly(modulename, "worker is terminating\n"); + } + + /* + * Shut down any remaining workers, this has an implicit do_wait == true. + * + * The fastest way we can make the workers terminate gracefully is when + * they are listening for new commands and we just tell them to terminate. + */ + static void + ShutdownWorkersHard(ParallelState *pstate) + { + #ifndef WIN32 + int i; + signal(SIGPIPE, SIG_IGN); + + /* + * Close our write end of the sockets so that the workers know they can + * exit. 
+ 	 */
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 		closesocket(pstate->parallelSlot[i].pipeWrite);
+
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 		kill(pstate->parallelSlot[i].pid, SIGTERM);
+
+ #else
+ 	/* The workers monitor this event via checkAborting(). */
+ 	SetEvent(termEvent);
+ #endif
+
+ 	WaitForTerminatingWorkers(pstate);
+ }
+
+ /*
+  * Wait for the termination of the processes using the OS-specific method.
+  *
+  * Each iteration waits for one worker to go away and marks its slot as
+  * WRKR_TERMINATED; the loop ends once every slot is in that state.
+  *
+  * If the wait primitive itself fails we cannot learn about any further
+  * terminations, so all remaining workers are marked as terminated rather
+  * than looping forever or touching a slot we never found.
+  */
+ static void
+ WaitForTerminatingWorkers(ParallelState *pstate)
+ {
+ 	while (!HasEveryWorkerTerminated(pstate))
+ 	{
+ 		ParallelSlot *slot = NULL;
+ 		bool		giveUp = false;
+ 		int			j;
+ #ifndef WIN32
+ 		int			status;
+ 		pid_t		pid = wait(&status);
+
+ 		if (pid == (pid_t) -1)
+ 		{
+ 			/* Interrupted by one of our signal handlers: just retry */
+ 			if (errno == EINTR)
+ 				continue;
+
+ 			/* Anything else (typically ECHILD): no child left to reap */
+ 			giveUp = true;
+ 		}
+ 		else
+ 		{
+ 			for (j = 0; j < pstate->numWorkers; j++)
+ 				if (pstate->parallelSlot[j].pid == pid)
+ 					slot = &(pstate->parallelSlot[j]);
+ 		}
+ #else
+ 		uintptr_t	hThread;
+ 		DWORD		ret;
+ 		uintptr_t  *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
+ 		int			nrun = 0;
+
+ 		/* Collect the handles of all threads that are still running */
+ 		for (j = 0; j < pstate->numWorkers; j++)
+ 			if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
+ 			{
+ 				lpHandles[nrun] = pstate->parallelSlot[j].hThread;
+ 				nrun++;
+ 			}
+ 		ret = WaitForMultipleObjects(nrun, (HANDLE*) lpHandles, false, INFINITE);
+
+ 		/* Only a result inside [WAIT_OBJECT_0, WAIT_OBJECT_0 + nrun) is an index */
+ 		if (ret == WAIT_FAILED || ret - WAIT_OBJECT_0 >= (DWORD) nrun)
+ 			giveUp = true;
+ 		else
+ 		{
+ 			hThread = lpHandles[ret - WAIT_OBJECT_0];
+
+ 			for (j = 0; j < pstate->numWorkers; j++)
+ 				if (pstate->parallelSlot[j].hThread == hThread)
+ 					slot = &(pstate->parallelSlot[j]);
+ 		}
+
+ 		free(lpHandles);
+ #endif
+ 		if (giveUp)
+ 		{
+ 			for (j = 0; j < pstate->numWorkers; j++)
+ 				pstate->parallelSlot[j].workerStatus = WRKR_TERMINATED;
+ 			break;
+ 		}
+
+ 		/* Whatever terminated was not one of our workers; keep waiting */
+ 		if (slot == NULL)
+ 			continue;
+
+ 		slot->workerStatus = WRKR_TERMINATED;
+ 	}
+ 	Assert(HasEveryWorkerTerminated(pstate));
+ }
+
+ #ifndef WIN32
+ /*
+  * Signal handling (UNIX only): just record the request; checkAborting()
+  * polls the flag.
+  */
+ static void
+ sigTermHandler(int signum)
+ {
+ 	wantAbort = 1;
+ }
+ #endif
+
+ /*
+  * This function is called by both UNIX and Windows variants to set up a
+  * worker process.
+ */ + static void + SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker, + RestoreOptions *ropt) + { + /* + * In dump mode (pg_dump) this calls _SetupWorker() as defined in + * pg_dump.c, while in restore mode (pg_restore) it calls _SetupWorker() + * as defined in pg_restore.c. + * + * We get the raw connection only for the reason that we can close it + * properly when we shut down. This happens only that way when it is + * brought down because of an error. + */ + _SetupWorker((Archive *) AH, ropt); + + Assert(AH->connection != NULL); + + WaitForCommands(AH, pipefd); + + closesocket(pipefd[PIPE_READ]); + closesocket(pipefd[PIPE_WRITE]); + } + + #ifdef WIN32 + /* + * On Windows the _beginthreadex() function allows us to pass one parameter. + * Since we need to pass a few values however, we define a structure here + * and then pass a pointer to such a structure in _beginthreadex(). + */ + typedef struct { + ArchiveHandle *AH; + RestoreOptions *ropt; + int worker; + int pipeRead; + int pipeWrite; + } WorkerInfo; + + static unsigned __stdcall + init_spawned_worker_win32(WorkerInfo *wi) + { + ArchiveHandle *AH; + int pipefd[2] = { wi->pipeRead, wi->pipeWrite }; + int worker = wi->worker; + RestoreOptions *ropt = wi->ropt; + + AH = CloneArchive(wi->AH); + + free(wi); + SetupWorker(AH, pipefd, worker, ropt); + + DeCloneArchive(AH); + _endthreadex(0); + return 0; + } + #endif + + /* + * This function starts the parallel dump or restore by spawning off the + * worker processes in both Unix and Windows. For Windows, it creates a number + * of threads while it does a fork() on Unix. 
+  *
+  * Returns the newly allocated ParallelState. With a single worker no
+  * process/thread is created and parallelSlot stays NULL.
+  */
+ ParallelState *
+ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
+ {
+ 	ParallelState *pstate;
+ 	int			i;
+ 	const size_t slotSize = AH->public.numWorkers * sizeof(ParallelSlot);
+
+ 	Assert(AH->public.numWorkers > 0);
+
+ 	/* Ensure stdio state is quiesced before forking */
+ 	fflush(NULL);
+
+ 	pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
+
+ 	pstate->numWorkers = AH->public.numWorkers;
+ 	pstate->parallelSlot = NULL;
+
+ 	if (AH->public.numWorkers == 1)
+ 		return pstate;
+
+ 	pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
+ 	memset((void *) pstate->parallelSlot, 0, slotSize);
+
+ #ifdef WIN32
+ 	tMasterThreadId = GetCurrentThreadId();
+
+ 	/*
+ 	 * The termination event must be unnamed: a named event would be shared
+ 	 * with every other process that creates an event of the same name, so
+ 	 * one aborting run could terminate the workers of another. Create it
+ 	 * before registering pstate so that a failure here is reported through
+ 	 * the normal, non-parallel exit path.
+ 	 */
+ 	termEvent = CreateEvent(NULL, true, false, NULL);
+ 	if (termEvent == NULL)
+ 		exit_horribly(modulename,
+ 					  "could not create termination event: error code %lu\n",
+ 					  GetLastError());
+ #else
+ 	signal(SIGTERM, sigTermHandler);
+ 	signal(SIGINT, sigTermHandler);
+ 	signal(SIGQUIT, sigTermHandler);
+ #endif
+
+ 	/*
+ 	 * Set the pstate in the shutdown_info. The exit handler uses pstate if
+ 	 * set and falls back to AHX otherwise.
+ 	 */
+ 	shutdown_info.pstate = pstate;
+ 	on_exit_msg_func = parallel_exit_msg_func;
+
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 	{
+ #ifdef WIN32
+ 		WorkerInfo *wi;
+ 		uintptr_t	handle;
+ #else
+ 		pid_t		pid;
+ #endif
+ 		/* MW: master writes, worker reads; WM: worker writes, master reads */
+ 		int			pipeMW[2], pipeWM[2];
+
+ 		if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
+ 			exit_horribly(modulename,
+ 						  "Cannot create communication channels: %s\n",
+ 						  strerror(errno));
+
+ 		pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
+ 		pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
+ 		pstate->parallelSlot[i].args->AH = NULL;
+ 		pstate->parallelSlot[i].args->te = NULL;
+ #ifdef WIN32
+ 		/* Allocate a new structure for every worker */
+ 		wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
+
+ 		wi->ropt = ropt;
+ 		wi->worker = i;
+ 		wi->AH = AH;
+ 		wi->pipeRead = pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
+ 		wi->pipeWrite = pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];
+
+ 		handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
+ 								wi, 0, &(pstate->parallelSlot[i].threadId));
+ 		if (handle == 0)
+ 		{
+ 			/* No thread was started, so there is nothing to wait for later */
+ 			pstate->parallelSlot[i].workerStatus = WRKR_TERMINATED;
+ 			exit_horribly(modulename,
+ 						  "could not create worker thread: %s\n",
+ 						  strerror(errno));
+ 		}
+ 		pstate->parallelSlot[i].hThread = handle;
+ #else
+ 		pid = fork();
+ 		if (pid == 0)
+ 		{
+ 			/* we are the worker */
+ 			int			j;
+ 			int			pipefd[2] = { pipeMW[PIPE_READ], pipeWM[PIPE_WRITE] };
+
+ 			/*
+ 			 * Store the fds for the reverse communication in pstate. Actually
+ 			 * we only use this in case of an error and don't use pstate
+ 			 * otherwise in the worker process. On Windows we write to the
+ 			 * global pstate, in Unix we write to our process-local copy but
+ 			 * that's also where we'd retrieve this information back from.
+ 			 */
+ 			pstate->parallelSlot[i].pipeRevRead = pipefd[PIPE_READ];
+ 			pstate->parallelSlot[i].pipeRevWrite = pipefd[PIPE_WRITE];
+ 			pstate->parallelSlot[i].pid = getpid();
+
+ 			/*
+ 			 * Call CloneArchive on Unix as well even though technically we
+ 			 * don't need to because fork() gives us a copy in our own address
+ 			 * space already. But CloneArchive resets the state information
+ 			 * and also clones the database connection (for parallel dump)
+ 			 * which both seem kinda helpful.
+ 			 */
+ 			pstate->parallelSlot[i].args->AH = CloneArchive(AH);
+
+ 			/* close read end of Worker -> Master */
+ 			closesocket(pipeWM[PIPE_READ]);
+ 			/* close write end of Master -> Worker */
+ 			closesocket(pipeMW[PIPE_WRITE]);
+
+ 			/*
+ 			 * Close all inherited fds for communication of the master with
+ 			 * the other workers.
+ 			 */
+ 			for (j = 0; j < i; j++)
+ 			{
+ 				closesocket(pstate->parallelSlot[j].pipeRead);
+ 				closesocket(pstate->parallelSlot[j].pipeWrite);
+ 			}
+
+ 			SetupWorker(pstate->parallelSlot[i].args->AH, pipefd, i, ropt);
+
+ 			exit(0);
+ 		}
+ 		else if (pid < 0)
+ 			/* fork failed */
+ 			exit_horribly(modulename,
+ 						  "could not create worker process: %s\n",
+ 						  strerror(errno));
+
+ 		/* we are the Master, pid > 0 here */
+ 		Assert(pid > 0);
+
+ 		/* close read end of Master -> Worker */
+ 		closesocket(pipeMW[PIPE_READ]);
+ 		/* close write end of Worker -> Master */
+ 		closesocket(pipeWM[PIPE_WRITE]);
+
+ 		pstate->parallelSlot[i].pid = pid;
+ #endif
+
+ 		pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
+ 		pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
+ 	}
+
+ 	return pstate;
+ }
+
+ /*
+  * Shut down all of our workers at the end of a parallel run.
+  *
+  * No explicit message is sent: closing the master's ends of the
+  * communication channels is what lets the workers know they can exit. We
+  * then wait until every worker has terminated and release the parallel
+  * state. Nothing is done (and nothing is freed) when numWorkers is 1.
+  */
+ void
+ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
+ {
+ 	int			i;
+
+ 	if (pstate->numWorkers == 1)
+ 		return;
+
+ 	Assert(IsEveryWorkerIdle(pstate));
+
+ 	/* close the sockets so that the workers know they can exit */
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 	{
+ 		closesocket(pstate->parallelSlot[i].pipeRead);
+ 		closesocket(pstate->parallelSlot[i].pipeWrite);
+ 	}
+ 	WaitForTerminatingWorkers(pstate);
+
+ 	/*
+ 	 * Remove the pstate again, so the exit handler in the parent will now
+ 	 * again fall back to closing AH->connection (if connected). The message
+ 	 * hook has to go back to the plain writer at the same time: the parallel
+ 	 * one looks at shutdown_info.pstate, which is no longer valid.
+ 	 */
+ 	shutdown_info.pstate = NULL;
+ 	on_exit_msg_func = vwrite_msg;
+
+ 	/* Release the per-slot argument blocks allocated at startup */
+ 	for (i = 0; i < pstate->numWorkers; i++)
+ 		free(pstate->parallelSlot[i].args);
+
+ 	free(pstate->parallelSlot);
+ 	free(pstate);
+ }
+
+
+ /*
+  * The sequence is the following (for dump, similar for restore):
+  *
+  * The master process starts the parallel backup in ParallelBackupStart, this
+  * forks the worker processes which enter WaitForCommands().
+ * + * The master process dispatches an individual work item to one of the worker + * processes in DispatchJobForTocEntry(). It calls + * AH->MasterStartParallelItemPtr, a routine of the output format. This + * function's arguments are the parents archive handle AH (containing the full + * catalog information), the TocEntry that the worker should work on and a + * T_Action act indicating whether this is a backup or a restore item. The + * function then converts the TocEntry assignment into a string that is then + * sent over to the worker process. In the simplest case that would be + * something like "DUMP 1234", with 1234 being the TocEntry id. + * + * The worker receives the message in the routine pointed to by + * WorkerJobDumpPtr or WorkerJobRestorePtr. These are also pointers to + * corresponding routines of the respective output format, e.g. + * _WorkerJobDumpDirectory(). + * + * Remember that we have forked off the workers only after we have read in the + * catalog. That's why our worker processes can also access the catalog + * information. Now they re-translate the textual representation to a TocEntry + * on their side and do the required action (restore or dump). + * + * The result is again a textual string that is sent back to the master and is + * interpreted by AH->MasterEndParallelItemPtr. This function can update state + * or catalog information on the master's side, depending on the reply from + * the worker process. In the end it returns status which is 0 for successful + * execution. + * + * --------------------------------------------------------------------- + * Master Worker + * + * enters WaitForCommands() + * DispatchJobForTocEntry(...te...) + * + * [ Worker is IDLE ] + * + * arg = (MasterStartParallelItemPtr)() + * send: DUMP arg + * receive: DUMP arg + * str = (WorkerJobDumpPtr)(arg) + * [ Worker is WORKING ] ... gets te from arg ... + * ... dump te ... 
+ * send: OK DUMP info + * + * In ListenToWorkers(): + * + * [ Worker is FINISHED ] + * receive: OK DUMP info + * status = (MasterEndParallelItemPtr)(info) + * + * In ReapWorkerStatus(&ptr): + * *ptr = status; + * [ Worker is IDLE ] + * --------------------------------------------------------------------- + */ + void + DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, + T_Action act) + { + int worker; + char *arg; + + /* our caller makes sure that at least one worker is idle */ + Assert(GetIdleWorker(pstate) != NO_SLOT); + worker = GetIdleWorker(pstate); + Assert(worker != NO_SLOT); + + arg = (AH->MasterStartParallelItemPtr)(AH, te, act); + + sendMessageToWorker(pstate, worker, arg); + + pstate->parallelSlot[worker].workerStatus = WRKR_WORKING; + pstate->parallelSlot[worker].args->te = te; + } + + /* + * Find the first free parallel slot (if any). + */ + int + GetIdleWorker(ParallelState *pstate) + { + int i; + for (i = 0; i < pstate->numWorkers; i++) + if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE) + return i; + return NO_SLOT; + } + + /* + * Return true iff every worker process is in the WRKR_TERMINATED state. + */ + static bool + HasEveryWorkerTerminated(ParallelState *pstate) + { + int i; + for (i = 0; i < pstate->numWorkers; i++) + if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED) + return false; + return true; + } + + /* + * Return true iff every worker is in the WRKR_IDLE state. + */ + bool + IsEveryWorkerIdle(ParallelState *pstate) + { + int i; + for (i = 0; i < pstate->numWorkers; i++) + if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE) + return false; + return true; + } + + /* + * --------------------------------------------------------------------- + * One danger of the parallel backup is a possible deadlock: + * + * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode. 
+  * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
+  *    because the master holds a conflicting ACCESS SHARE lock).
+  * 3) The worker process also requests an ACCESS SHARE lock to read the table.
+  *    The worker's not granted that lock but is enqueued behind the ACCESS
+  *    EXCLUSIVE lock request.
+  * ---------------------------------------------------------------------
+  *
+  * Now what we do here is to just request a lock in ACCESS SHARE but with
+  * NOWAIT in the worker prior to touching the table. If we don't get the lock,
+  * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
+  * are good to just fail the whole backup because we have detected a deadlock.
+  *
+  * Any failure (name lookup, missing relation, lock not granted) ends the
+  * worker through exit_horribly().
+  */
+ static void
+ lockTableNoWait(ArchiveHandle *AH, TocEntry *te)
+ {
+ 	Archive    *AHX = (Archive *) AH;
+ 	const char *qualId;
+ 	PQExpBuffer query = createPQExpBuffer();
+ 	PGresult   *res;
+
+ 	Assert(AH->format == archDirectory);
+ 	Assert(strcmp(te->desc, "BLOBS") != 0);
+
+ 	/* OIDs are unsigned, so they must be formatted with %u */
+ 	appendPQExpBuffer(query,
+ 					  "SELECT pg_namespace.nspname,"
+ 					  " pg_class.relname "
+ 					  " FROM pg_class "
+ 					  " JOIN pg_namespace on pg_namespace.oid = relnamespace "
+ 					  " WHERE pg_class.oid = %u", te->catalogId.oid);
+
+ 	res = PQexec(AH->connection, query->data);
+
+ 	if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
+ 		exit_horribly(modulename,
+ 					  "could not get relation name for oid %u: %s\n",
+ 					  te->catalogId.oid, PQerrorMessage(AH->connection));
+
+ 	/*
+ 	 * Without a row PQgetvalue() would hand us NULL pointers below, so make
+ 	 * sure the relation was actually found.
+ 	 */
+ 	if (PQntuples(res) != 1)
+ 		exit_horribly(modulename,
+ 					  "could not find relation with oid %u\n",
+ 					  te->catalogId.oid);
+
+ 	resetPQExpBuffer(query);
+
+ 	qualId = fmtQualifiedId(AHX->remoteVersion,
+ 							PQgetvalue(res, 0, 0),
+ 							PQgetvalue(res, 0, 1));
+
+ 	appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
+ 					  qualId);
+ 	PQclear(res);
+
+ 	res = PQexec(AH->connection, query->data);
+
+ 	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
+ 		exit_horribly(modulename,
+ 					  "could not obtain lock on relation \"%s\". This "
+ 					  "usually means that someone requested an ACCESS EXCLUSIVE lock "
+ 					  "on the table after the pg_dump parent process has gotten the "
+ 					  "initial ACCESS SHARE lock on the table.\n", qualId);
+
+ 	PQclear(res);
+ 	destroyPQExpBuffer(query);
+ }
+
+ /*
+  * Parse the dump id that follows the command keyword "prefix" in "command"
+  * and return the matching TocEntry.
+  *
+  * The argument must consist of exactly one integer. A malformed argument or
+  * an id without a TocEntry is treated like an unknown command and ends the
+  * worker through exit_horribly().
+  */
+ static TocEntry *
+ getTocEntryFromCommand(ArchiveHandle *AH, const char *command,
+ 					   const char *prefix)
+ {
+ 	const char *arg = command + strlen(prefix);
+ 	DumpId		dumpId;
+ 	int			nBytes = 0;
+ 	TocEntry   *te;
+
+ 	if (sscanf(arg, "%d%n", &dumpId, &nBytes) != 1 ||
+ 		nBytes != (int) strlen(arg))
+ 		exit_horribly(modulename,
+ 					  "Unknown command on communication channel: %s\n",
+ 					  command);
+
+ 	te = getTocEntryByDumpId(AH, dumpId);
+ 	if (te == NULL)
+ 		exit_horribly(modulename,
+ 					  "Unknown command on communication channel: %s\n",
+ 					  command);
+
+ 	return te;
+ }
+
+ /*
+  * That's the main routine for the worker.
+  * When it starts up it enters this routine and waits for commands from the
+  * master process. After having processed a command it comes back to here to
+  * wait for the next command. The loop ends when getMessageFromMaster()
+  * returns NULL; the worker then closes its database connection and returns.
+  */
+ static void
+ WaitForCommands(ArchiveHandle *AH, int pipefd[2])
+ {
+ 	char	   *command;
+ 	char	   *str = NULL;
+ 	TocEntry   *te;
+
+ 	for(;;)
+ 	{
+ 		if (!(command = getMessageFromMaster(pipefd)))
+ 		{
+ 			PQfinish(AH->connection);
+ 			AH->connection = NULL;
+ 			return;
+ 		}
+
+ 		if (messageStartsWith(command, "DUMP "))
+ 		{
+ 			Assert(AH->format == archDirectory);
+
+ 			te = getTocEntryFromCommand(AH, command, "DUMP ");
+
+ 			/*
+ 			 * Lock the table but with NOWAIT. Note that the parent is already
+ 			 * holding a lock. If we cannot acquire another ACCESS SHARE MODE
+ 			 * lock, then somebody else has requested an exclusive lock in the
+ 			 * meantime. lockTableNoWait dies in this case to prevent a
+ 			 * deadlock.
+ 			 */
+ 			if (strcmp(te->desc, "BLOBS") != 0)
+ 				lockTableNoWait(AH, te);
+
+ 			/*
+ 			 * The message we return here has been pg_malloc()ed and we are
+ 			 * responsible for free()ing it.
+ 			 */
+ 			str = (AH->WorkerJobDumpPtr)(AH, te);
+ 			Assert(AH->connection != NULL);
+ 			sendMessageToMaster(pipefd, str);
+ 			free(str);
+ 		}
+ 		else if (messageStartsWith(command, "RESTORE "))
+ 		{
+ 			Assert(AH->format == archDirectory || AH->format == archCustom);
+ 			Assert(AH->connection != NULL);
+
+ 			te = getTocEntryFromCommand(AH, command, "RESTORE ");
+
+ 			/*
+ 			 * The message we return here has been pg_malloc()ed and we are
+ 			 * responsible for free()ing it.
+ 			 */
+ 			str = (AH->WorkerJobRestorePtr)(AH, te);
+ 			Assert(AH->connection != NULL);
+ 			sendMessageToMaster(pipefd, str);
+ 			free(str);
+ 		}
+ 		else
+ 			exit_horribly(modulename,
+ 						  "Unknown command on communication channel: %s\n",
+ 						  command);
+
+ 		/*
+ 		 * NOTE(review): assumes getMessageFromMaster() returns pg_malloc()ed
+ 		 * space, as ListenToWorkers() states for worker messages - confirm.
+ 		 * Without this every processed command is leaked.
+ 		 */
+ 		free(command);
+ 	}
+ }
+
+ /*
+  * ---------------------------------------------------------------------
+  * Note the status change:
+  *
+  * DispatchJobForTocEntry		WRKR_IDLE -> WRKR_WORKING
+  * ListenToWorkers				WRKR_WORKING -> WRKR_FINISHED / WRKR_TERMINATED
+  * ReapWorkerStatus			WRKR_FINISHED -> WRKR_IDLE
+  * ---------------------------------------------------------------------
+  *
+  * Just calling ReapWorkerStatus() when all workers are working might or might
+  * not give you an idle worker because you need to call ListenToWorkers() in
+  * between and only thereafter ReapWorkerStatus(). This is necessary in order
+  * to get and deal with the status (=result) of the worker's execution.
+  */
+ void
+ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
+ {
+ 	int			worker;
+ 	char	   *msg = getMessageFromWorker(pstate, do_wait, &worker);
+ 
+ 	if (msg == NULL)
+ 	{
+ 		/* No message: that is only fatal if we were asked to block for one */
+ 		if (do_wait)
+ 			exit_horribly(modulename, "A worker process died unexpectedly\n");
+ 		return;
+ 	}
+ 
+ 	if (messageStartsWith(msg, "OK "))
+ 	{
+ 		ParallelSlot *slot = &pstate->parallelSlot[worker];
+ 		TocEntry   *te;
+ 
+ 		/* the worker is done; its result is picked up by ReapWorkerStatus() */
+ 		slot->workerStatus = WRKR_FINISHED;
+ 		te = slot->args->te;
+ 
+ 		/* hand the text following the prefix to the format's callback */
+ 		if (messageStartsWith(msg, "OK RESTORE "))
+ 			slot->status = (AH->MasterEndParallelItemPtr)
+ 				(AH, te, msg + strlen("OK RESTORE "), ACT_RESTORE);
+ 		else if (messageStartsWith(msg, "OK DUMP "))
+ 			slot->status = (AH->MasterEndParallelItemPtr)
+ 				(AH, te, msg + strlen("OK DUMP "), ACT_DUMP);
+ 		else
+ 			exit_horribly(modulename,
+ 						  "Invalid message received from worker: %s\n", msg);
+ 	}
+ 	else if (messageStartsWith(msg, "ERROR "))
+ 	{
+ 		Assert(AH->format == archDirectory || AH->format == archCustom);
+ 		pstate->parallelSlot[worker].workerStatus = WRKR_TERMINATED;
+ 		/* report the worker's own error text and give up */
+ 		exit_horribly(modulename, "%s", msg + strlen("ERROR "));
+ 	}
+ 	else
+ 		exit_horribly(modulename, "Invalid message received from worker: %s\n", msg);
+ 
+ 	/* both Unix and Win32 return pg_malloc()ed space, so we free it */
+ 	free(msg);
+ }
+ 
+ /*
+  * This function is executed in the master process.
+  *
+  * This function is used to get the return value of a terminated worker
+  * process.  If a process has terminated, its status is stored in *status and
+  * the id of the worker is returned.
+ */ + int + ReapWorkerStatus(ParallelState *pstate, int *status) + { + int i; + + for (i = 0; i < pstate->numWorkers; i++) + { + if (pstate->parallelSlot[i].workerStatus == WRKR_FINISHED) + { + *status = pstate->parallelSlot[i].status; + pstate->parallelSlot[i].status = 0; + pstate->parallelSlot[i].workerStatus = WRKR_IDLE; + return i; + } + } + return NO_SLOT; + } + + /* + * This function is executed in the master process. + * + * It looks for an idle worker process and only returns if there is one. + */ + void + EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate) + { + int ret_worker; + int work_status; + + for (;;) + { + int nTerm = 0; + while ((ret_worker = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT) + { + if (work_status != 0) + exit_horribly(modulename, "Error processing a parallel work item.\n"); + + nTerm++; + } + + /* + * We need to make sure that we have an idle worker before dispatching + * the next item. If nTerm > 0 we already have that (quick check). + */ + if (nTerm > 0) + return; + + /* explicit check for an idle worker */ + if (GetIdleWorker(pstate) != NO_SLOT) + return; + + /* + * If we have no idle worker, read the result of one or more + * workers and loop the loop to call ReapWorkerStatus() on them + */ + ListenToWorkers(AH, pstate, true); + } + } + + /* + * This function is executed in the master process. + * + * It waits for all workers to terminate. + */ + void + EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate) + { + int work_status; + + if (!pstate || pstate->numWorkers == 1) + return; + + /* Waiting for the remaining worker processes to finish */ + while (!IsEveryWorkerIdle(pstate)) + { + if (ReapWorkerStatus(pstate, &work_status) == NO_SLOT) + ListenToWorkers(AH, pstate, true); + else if (work_status != 0) + exit_horribly(modulename, + "Error processing a parallel work item\n"); + } + } + + /* + * This function is executed in the worker process. 
+ * + * It returns the next message on the communication channel, blocking until it + * becomes available. + */ + static char * + getMessageFromMaster(int pipefd[2]) + { + return readMessageFromPipe(pipefd[PIPE_READ]); + } + + /* + * This function is executed in the worker process. + * + * It sends a message to the master on the communication channel. + */ + static void + sendMessageToMaster(int pipefd[2], const char *str) + { + int len = strlen(str) + 1; + + if (pipewrite(pipefd[PIPE_WRITE], str, len) != len) + exit_horribly(modulename, + "Error writing to the communication channel: %s\n", + strerror(errno)); + } + + /* + * A select loop that repeats calling select until a descriptor in the read + * set becomes readable. On Windows we have to check for the termination event + * from time to time, on Unix we can just block forever. + */ + #ifdef WIN32 + static int + select_loop(int maxFd, fd_set *workerset) + { + int i; + fd_set saveSet = *workerset; + + /* should always be the master */ + Assert(tMasterThreadId == GetCurrentThreadId()); + + for (;;) + { + /* + * sleep a quarter of a second before checking if we should + * terminate. + */ + struct timeval tv = { 0, 250000 }; + *workerset = saveSet; + i = select(maxFd + 1, workerset, NULL, NULL, &tv); + + if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR) + continue; + if (i) + break; + } + + return i; + } + #else /* UNIX */ + static int + select_loop(int maxFd, fd_set *workerset) + { + int i; + + fd_set saveSet = *workerset; + for (;;) + { + *workerset = saveSet; + i = select(maxFd + 1, workerset, NULL, NULL, NULL); + + /* + * If we Ctrl-C the master process , it's likely that we interrupt + * select() here. The signal handler will set wantAbort == true and + * the shutdown journey starts from here. Note that we'll come back + * here later when we tell all workers to terminate and read their + * responses. But then we have aborting set to true. 
+ */ + if (wantAbort && !aborting) + exit_horribly(modulename, "terminated by user\n"); + + if (i < 0 && errno == EINTR) + continue; + break; + } + + return i; + } + #endif + + /* + * This function is executed in the master process. + * + * It returns the next message from the worker on the communication channel, + * optionally blocking (do_wait) until it becomes available. + * + * The id of the worker is returned in *worker. + */ + static char * + getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) + { + int i; + fd_set workerset; + int maxFd = -1; + struct timeval nowait = { 0, 0 }; + + FD_ZERO(&workerset); + + for (i = 0; i < pstate->numWorkers; i++) + { + if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED) + continue; + FD_SET(pstate->parallelSlot[i].pipeRead, &workerset); + /* actually WIN32 ignores the first parameter to select()... */ + if (pstate->parallelSlot[i].pipeRead > maxFd) + maxFd = pstate->parallelSlot[i].pipeRead; + } + + if (do_wait) + { + i = select_loop(maxFd, &workerset); + Assert(i != 0); + } + else + { + if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0) + return NULL; + } + + if (i < 0) + exit_horribly(modulename, "Error in ListenToWorkers(): %s", strerror(errno)); + + for (i = 0; i < pstate->numWorkers; i++) + { + char *msg; + + if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset)) + continue; + + msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead); + *worker = i; + return msg; + } + Assert(false); + return NULL; + } + + /* + * This function is executed in the master process. + * + * It sends a message to a certain worker on the communication channel. + */ + static void + sendMessageToWorker(ParallelState *pstate, int worker, const char *str) + { + int len = strlen(str) + 1; + + if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len) + { + /* + * If we're already aborting anyway, don't care if we succeed or not. + * The child might have gone already. 
+ */ + #ifndef WIN32 + if (!aborting) + #endif + exit_horribly(modulename, + "Error writing to the communication channel: %s\n", + strerror(errno)); + } + } + + /* + * The underlying function to read a message from the communication channel + * (fd) with optional blocking (do_wait). + */ + static char * + readMessageFromPipe(int fd) + { + char *msg; + int msgsize, bufsize; + int ret; + + /* + * The problem here is that we need to deal with several possibilites: + * we could receive only a partial message or several messages at once. + * The caller expects us to return exactly one message however. + * + * We could either read in as much as we can and keep track of what we + * delivered back to the caller or we just read byte by byte. Once we see + * (char) 0, we know that it's the message's end. This would be quite + * inefficient for more data but since we are reading only on the command + * channel, the performance loss does not seem worth the trouble of + * keeping internal states for different file descriptors. + */ + bufsize = 64; /* could be any number */ + msg = (char *) pg_malloc(bufsize); + + msgsize = 0; + for (;;) + { + Assert(msgsize <= bufsize); + ret = piperead(fd, msg + msgsize, 1); + + /* worker has closed the connection or another error happened */ + if (ret <= 0) + return NULL; + + Assert(ret == 1); + + if (msg[msgsize] == '\0') + return msg; + + msgsize++; + if (msgsize == bufsize) + { + /* could be any number */ + bufsize += 16; + msg = (char *) realloc(msg, bufsize); + } + } + } + + #ifdef WIN32 + /* + * This is a replacement version of pipe for Win32 which allows returned + * handles to be used in select(). Note that read/write calls must be replaced + * with recv/send. 
+ */ + static int + pgpipe(int handles[2]) + { + SOCKET s; + struct sockaddr_in serv_addr; + int len = sizeof(serv_addr); + + handles[0] = handles[1] = INVALID_SOCKET; + + if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) + { + write_msg(modulename, "pgpipe could not create socket: %ui", + WSAGetLastError()); + return -1; + } + + memset((void *) &serv_addr, 0, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(0); + serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR) + { + write_msg(modulename, "pgpipe could not bind: %ui", + WSAGetLastError()); + closesocket(s); + return -1; + } + if (listen(s, 1) == SOCKET_ERROR) + { + write_msg(modulename, "pgpipe could not listen: %ui", + WSAGetLastError()); + closesocket(s); + return -1; + } + if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR) + { + write_msg(modulename, "pgpipe could not getsockname: %ui", + WSAGetLastError()); + closesocket(s); + return -1; + } + if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) + { + write_msg(modulename, "pgpipe could not create socket 2: %ui", + WSAGetLastError()); + closesocket(s); + return -1; + } + + if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR) + { + write_msg(modulename, "pgpipe could not connect socket: %ui", + WSAGetLastError()); + closesocket(s); + return -1; + } + if ((handles[0] = accept(s, (SOCKADDR *) &serv_addr, &len)) == INVALID_SOCKET) + { + write_msg(modulename, "pgpipe could not accept socket: %ui", + WSAGetLastError()); + closesocket(handles[1]); + handles[1] = INVALID_SOCKET; + closesocket(s); + return -1; + } + closesocket(s); + return 0; + } + + static int + piperead(int s, char *buf, int len) + { + int ret = recv(s, buf, len, 0); + + if (ret < 0 && WSAGetLastError() == WSAECONNRESET) + /* EOF on the pipe! 
(win32 socket based implementation) */ + ret = 0; + return ret; + } + #endif diff --git a/src/bin/pg_dump/parallel.h b/src/bin/pg_dump/parallel.h new file mode 100644 index ...3eafe2f *** a/src/bin/pg_dump/parallel.h --- b/src/bin/pg_dump/parallel.h *************** *** 0 **** --- 1,86 ---- + /*------------------------------------------------------------------------- + * + * parallel.h + * + * Parallel support header file for the pg_dump archiver + * + * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * The author is not responsible for loss or damages that may + * result from its use. + * + * IDENTIFICATION + * src/bin/pg_dump/parallel.h + * + *------------------------------------------------------------------------- + */ + + #include "pg_backup_db.h" + + struct _archiveHandle; + struct _tocEntry; + + typedef enum + { + WRKR_TERMINATED = 0, + WRKR_IDLE, + WRKR_WORKING, + WRKR_FINISHED + } T_WorkerStatus; + + typedef enum T_Action + { + ACT_DUMP, + ACT_RESTORE, + } T_Action; + + /* Arguments needed for a worker process */ + typedef struct ParallelArgs + { + struct _archiveHandle *AH; + struct _tocEntry *te; + } ParallelArgs; + + /* State for each parallel activity slot */ + typedef struct ParallelSlot + { + ParallelArgs *args; + T_WorkerStatus workerStatus; + int status; + int pipeRead; + int pipeWrite; + int pipeRevRead; + int pipeRevWrite; + #ifdef WIN32 + uintptr_t hThread; + unsigned int threadId; + #else + pid_t pid; + #endif + } ParallelSlot; + + #define NO_SLOT (-1) + + typedef struct ParallelState + { + int numWorkers; + ParallelSlot *parallelSlot; + } ParallelState; + + extern int GetIdleWorker(ParallelState *pstate); + extern bool IsEveryWorkerIdle(ParallelState *pstate); + extern void ListenToWorkers(struct _archiveHandle *AH, ParallelState *pstate, bool do_wait); + extern int ReapWorkerStatus(ParallelState *pstate, int *status); + extern void 
EnsureIdleWorker(struct _archiveHandle *AH, ParallelState *pstate); + extern void EnsureWorkersFinished(struct _archiveHandle *AH, ParallelState *pstate); + + extern ParallelState *ParallelBackupStart(struct _archiveHandle *AH, + RestoreOptions *ropt); + extern void DispatchJobForTocEntry(struct _archiveHandle *AH, + ParallelState *pstate, + struct _tocEntry *te, T_Action act); + extern void ParallelBackupEnd(struct _archiveHandle *AH, ParallelState *pstate); + + extern void checkAborting(struct _archiveHandle *AH); + diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h new file mode 100644 index 473670d..4990eb1 *** a/src/bin/pg_dump/pg_backup.h --- b/src/bin/pg_dump/pg_backup.h *************** struct Archive *** 82,90 **** --- 82,94 ---- int minRemoteVersion; /* allowable range */ int maxRemoteVersion; + int numWorkers; /* number of parallel processes */ + char *sync_snapshot_id; /* sync snapshot id for parallel operation */ + /* info needed for string escaping */ int encoding; /* libpq code for client_encoding */ bool std_strings; /* standard_conforming_strings */ + char *use_role; /* Issue SET ROLE to this */ /* error handling */ bool exit_on_error; /* whether to exit on SQL errors... 
*/ *************** typedef struct _restoreOptions *** 142,148 **** int suppressDumpWarnings; /* Suppress output of WARNING entries * to stderr */ bool single_txn; - int number_of_jobs; bool *idWanted; /* array showing which dump IDs to emit */ } RestoreOptions; --- 146,151 ---- *************** extern void PrintTOCSummary(Archive *AH, *** 196,201 **** --- 199,207 ---- extern RestoreOptions *NewRestoreOptions(void); + /* We have one in pg_dump.c and another one in pg_restore.c */ + extern void _SetupWorker(Archive *AHX, RestoreOptions *ropt); + /* Rearrange and filter TOC entries */ extern void SortTocFromFile(Archive *AHX, RestoreOptions *ropt); diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c new file mode 100644 index d500bfd..562faad *** a/src/bin/pg_dump/pg_backup_archiver.c --- b/src/bin/pg_dump/pg_backup_archiver.c *************** *** 22,29 **** --- 22,31 ---- #include "pg_backup_db.h" #include "dumputils.h" + #include "parallel.h" #include + #include #include #include #include *************** *** 35,106 **** #include "libpq/libpq-fs.h" - /* - * Special exit values from worker children. We reserve 0 for normal - * success; 1 and other small values should be interpreted as crashes. - */ - #define WORKER_CREATE_DONE 10 - #define WORKER_INHIBIT_DATA 11 - #define WORKER_IGNORED_ERRORS 12 - - /* - * Unix uses exit to return result from worker child, so function is void. - * Windows thread result comes via function return. 
- */ - #ifndef WIN32 - #define parallel_restore_result void - #else - #define parallel_restore_result DWORD - #endif - - /* IDs for worker children are either PIDs or thread handles */ - #ifndef WIN32 - #define thandle pid_t - #else - #define thandle HANDLE - #endif - - typedef struct ParallelStateEntry - { - #ifdef WIN32 - unsigned int threadId; - #else - pid_t pid; - #endif - ArchiveHandle *AH; - } ParallelStateEntry; - - typedef struct ParallelState - { - int numWorkers; - ParallelStateEntry *pse; - } ParallelState; - - /* Arguments needed for a worker child */ - typedef struct _restore_args - { - ArchiveHandle *AH; - TocEntry *te; - ParallelStateEntry *pse; - } RestoreArgs; - - /* State for each parallel activity slot */ - typedef struct _parallel_slot - { - thandle child_id; - RestoreArgs *args; - } ParallelSlot; - - typedef struct ShutdownInformation - { - ParallelState *pstate; - Archive *AHX; - } ShutdownInformation; - - static ShutdownInformation shutdown_info; - - #define NO_SLOT (-1) - #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n" #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n" --- 37,42 ---- *************** static bool _tocEntryIsACL(TocEntry *te) *** 136,142 **** static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static void buildTocEntryArrays(ArchiveHandle *AH); - static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id); static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te); static int _discoverArchiveFormat(ArchiveHandle *AH); --- 72,77 ---- *************** static void RestoreOutput(ArchiveHandle *** 149,169 **** static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool is_parallel); ! static void restore_toc_entries_parallel(ArchiveHandle *AH); ! static thandle spawn_restore(RestoreArgs *args); ! 
static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status); ! static bool work_in_progress(ParallelSlot *slots, int n_slots); ! static int get_next_slot(ParallelSlot *slots, int n_slots); static void par_list_header_init(TocEntry *l); static void par_list_append(TocEntry *l, TocEntry *te); static void par_list_remove(TocEntry *te); static TocEntry *get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, ! ParallelSlot *slots, int n_slots); ! static parallel_restore_result parallel_restore(RestoreArgs *args); static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list, ! thandle worker, int status, ! ParallelSlot *slots, int n_slots); static void fix_dependencies(ArchiveHandle *AH); static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2); static void repoint_table_dependencies(ArchiveHandle *AH); --- 84,102 ---- static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool is_parallel); ! static void restore_toc_entries_prefork(ArchiveHandle *AH); ! static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, ! TocEntry *pending_list); ! static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list); static void par_list_header_init(TocEntry *l); static void par_list_append(TocEntry *l, TocEntry *te); static void par_list_remove(TocEntry *te); static TocEntry *get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, ! ParallelState *pstate); static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list, ! int worker, int status, ! 
ParallelState *pstate); static void fix_dependencies(ArchiveHandle *AH); static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2); static void repoint_table_dependencies(ArchiveHandle *AH); *************** static void reduce_dependencies(ArchiveH *** 172,185 **** TocEntry *ready_list); static void mark_create_done(ArchiveHandle *AH, TocEntry *te); static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te); - static ArchiveHandle *CloneArchive(ArchiveHandle *AH); - static void DeCloneArchive(ArchiveHandle *AH); - - static void setProcessIdentifier(ParallelStateEntry *pse, ArchiveHandle *AH); - static void unsetProcessIdentifier(ParallelStateEntry *pse); - static ParallelStateEntry *GetMyPSEntry(ParallelState *pstate); - static void archive_close_connection(int code, void *arg); - /* * Wrapper functions. --- 105,110 ---- *************** RestoreArchive(Archive *AHX) *** 311,317 **** /* * If we're going to do parallel restore, there are some restrictions. */ ! parallel_mode = (ropt->number_of_jobs > 1 && ropt->useDB); if (parallel_mode) { /* We haven't got round to making this work for all archive formats */ --- 236,242 ---- /* * If we're going to do parallel restore, there are some restrictions. */ ! parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB); if (parallel_mode) { /* We haven't got round to making this work for all archive formats */ *************** RestoreArchive(Archive *AHX) *** 499,505 **** * In parallel mode, turn control over to the parallel-restore logic. */ if (parallel_mode) ! restore_toc_entries_parallel(AH); else { for (te = AH->toc->next; te != AH->toc; te = te->next) --- 424,448 ---- * In parallel mode, turn control over to the parallel-restore logic. */ if (parallel_mode) ! { ! ParallelState *pstate; ! TocEntry pending_list; ! ! par_list_header_init(&pending_list); ! ! /* This runs PRE_DATA items and then disconnects from the database */ ! restore_toc_entries_prefork(AH); ! Assert(AH->connection == NULL); ! ! 
/* ParallelBackupStart() will actually fork the processes */ ! pstate = ParallelBackupStart(AH, ropt); ! restore_toc_entries_parallel(AH, pstate, &pending_list); ! ParallelBackupEnd(AH, pstate); ! ! /* reconnect the master and see if we missed something */ ! restore_toc_entries_postfork(AH, &pending_list); ! Assert(AH->connection != NULL); ! } else { for (te = AH->toc->next; te != AH->toc; te = te->next) *************** static int *** 558,564 **** restore_toc_entry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool is_parallel) { ! int retval = 0; teReqs reqs; bool defnDumped; --- 501,507 ---- restore_toc_entry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool is_parallel) { ! int status = WORKER_OK; teReqs reqs; bool defnDumped; *************** restore_toc_entry(ArchiveHandle *AH, Toc *** 611,617 **** if (ropt->noDataForFailedTables) { if (is_parallel) ! retval = WORKER_INHIBIT_DATA; else inhibit_data_for_failed_table(AH, te); } --- 554,560 ---- if (ropt->noDataForFailedTables) { if (is_parallel) ! status = WORKER_INHIBIT_DATA; else inhibit_data_for_failed_table(AH, te); } *************** restore_toc_entry(ArchiveHandle *AH, Toc *** 626,632 **** * just set the return value. */ if (is_parallel) ! retval = WORKER_CREATE_DONE; else mark_create_done(AH, te); } --- 569,575 ---- * just set the return value. */ if (is_parallel) ! status = WORKER_CREATE_DONE; else mark_create_done(AH, te); } *************** restore_toc_entry(ArchiveHandle *AH, Toc *** 744,750 **** } } ! return retval; } /* --- 687,696 ---- } } ! if (AH->public.n_errors > 0 && status == WORKER_OK) ! status = WORKER_IGNORED_ERRORS; ! ! return status; } /* *************** buildTocEntryArrays(ArchiveHandle *AH) *** 1634,1640 **** } } ! static TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id) { /* build index arrays if we didn't already */ --- 1580,1586 ---- } } ! 
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id) { /* build index arrays if we didn't already */ *************** _allocAH(const char *FileSpec, const Arc *** 2132,2181 **** return AH; } - void ! WriteDataChunks(ArchiveHandle *AH) { TocEntry *te; - StartDataPtr startPtr; - EndDataPtr endPtr; for (te = AH->toc->next; te != AH->toc; te = te->next) { ! if (te->dataDumper != NULL && (te->reqs & REQ_DATA) != 0) ! { ! AH->currToc = te; ! /* printf("Writing data for %d (%x)\n", te->id, te); */ ! ! if (strcmp(te->desc, "BLOBS") == 0) ! { ! startPtr = AH->StartBlobsPtr; ! endPtr = AH->EndBlobsPtr; ! } ! else ! { ! startPtr = AH->StartDataPtr; ! endPtr = AH->EndDataPtr; ! } ! if (startPtr != NULL) ! (*startPtr) (AH, te); /* ! * printf("Dumper arg for %d is %x\n", te->id, te->dataDumperArg); */ ! /* ! * The user-provided DataDumper routine needs to call ! * AH->WriteData ! */ ! (*te->dataDumper) ((Archive *) AH, te->dataDumperArg); ! if (endPtr != NULL) ! (*endPtr) (AH, te); ! AH->currToc = NULL; ! } } } void --- 2078,2144 ---- return AH; } void ! WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate) { TocEntry *te; for (te = AH->toc->next; te != AH->toc; te = te->next) { ! if (!te->dataDumper) ! continue; ! if ((te->reqs & REQ_DATA) == 0) ! continue; + if (pstate && pstate->numWorkers > 1) + { /* ! * If we are in a parallel backup, then we are always the master ! * process. */ + EnsureIdleWorker(AH, pstate); + Assert(GetIdleWorker(pstate) != NO_SLOT); + DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP); + } + else + WriteDataChunksForTocEntry(AH, te); + } + EnsureWorkersFinished(AH, pstate); + } ! void ! WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te) ! { ! StartDataPtr startPtr; ! EndDataPtr endPtr; ! AH->currToc = te; ! ! if (strcmp(te->desc, "BLOBS") == 0) ! { ! startPtr = AH->StartBlobsPtr; ! 
endPtr = AH->EndBlobsPtr; } + else + { + startPtr = AH->StartDataPtr; + endPtr = AH->EndDataPtr; + } + + if (startPtr != NULL) + (*startPtr) (AH, te); + + /* + * The user-provided DataDumper routine needs to call + * AH->WriteData + */ + (*te->dataDumper) ((Archive *) AH, te->dataDumperArg); + + if (endPtr != NULL) + (*endPtr) (AH, te); + + AH->currToc = NULL; } void *************** dumpTimestamp(ArchiveHandle *AH, const c *** 3401,3467 **** ahprintf(AH, "-- %s %s\n\n", msg, buf); } - static void - setProcessIdentifier(ParallelStateEntry *pse, ArchiveHandle *AH) - { - #ifdef WIN32 - pse->threadId = GetCurrentThreadId(); - #else - pse->pid = getpid(); - #endif - pse->AH = AH; - } - - static void - unsetProcessIdentifier(ParallelStateEntry *pse) - { - #ifdef WIN32 - pse->threadId = 0; - #else - pse->pid = 0; - #endif - pse->AH = NULL; - } - - static ParallelStateEntry * - GetMyPSEntry(ParallelState *pstate) - { - int i; - - for (i = 0; i < pstate->numWorkers; i++) - #ifdef WIN32 - if (pstate->pse[i].threadId == GetCurrentThreadId()) - #else - if (pstate->pse[i].pid == getpid()) - #endif - return &(pstate->pse[i]); - - return NULL; - } - - static void - archive_close_connection(int code, void *arg) - { - ShutdownInformation *si = (ShutdownInformation *) arg; - - if (si->pstate) - { - ParallelStateEntry *entry = GetMyPSEntry(si->pstate); - - if (entry != NULL && entry->AH) - DisconnectDatabase(&(entry->AH->public)); - } - else if (si->AHX) - DisconnectDatabase(si->AHX); - } - - void - on_exit_close_archive(Archive *AHX) - { - shutdown_info.AHX = AHX; - on_exit_nicely(archive_close_connection, &shutdown_info); - } - /* * Main engine for parallel restore. * --- 3364,3369 ---- *************** on_exit_close_archive(Archive *AHX) *** 3474,3503 **** * RestoreArchive). */ static void ! restore_toc_entries_parallel(ArchiveHandle *AH) ! { ! RestoreOptions *ropt = AH->ropt; ! int n_slots = ropt->number_of_jobs; ! ParallelSlot *slots; ! int work_status; ! int next_slot; ! 
bool skipped_some; ! TocEntry pending_list; ! TocEntry ready_list; ! TocEntry *next_work_item; ! thandle ret_child; ! TocEntry *te; ! ParallelState *pstate; ! int i; ! ! ahlog(AH, 2, "entering restore_toc_entries_parallel\n"); ! ! slots = (ParallelSlot *) pg_malloc0(n_slots * sizeof(ParallelSlot)); ! pstate = (ParallelState *) pg_malloc(sizeof(ParallelState)); ! pstate->pse = (ParallelStateEntry *) pg_malloc0(n_slots * sizeof(ParallelStateEntry)); ! pstate->numWorkers = ropt->number_of_jobs; ! for (i = 0; i < pstate->numWorkers; i++) ! unsetProcessIdentifier(&(pstate->pse[i])); /* Adjust dependency information */ fix_dependencies(AH); --- 3376,3388 ---- * RestoreArchive). */ static void ! restore_toc_entries_prefork(ArchiveHandle *AH) ! { ! RestoreOptions *ropt = AH->ropt; ! bool skipped_some; ! TocEntry *next_work_item; ! ! ahlog(AH, 2, "entering restore_toc_entries_prefork\n"); /* Adjust dependency information */ fix_dependencies(AH); *************** restore_toc_entries_parallel(ArchiveHand *** 3558,3569 **** */ DisconnectDatabase(&AH->public); - /* - * Set the pstate in the shutdown_info. The exit handler uses pstate if - * set and falls back to AHX otherwise. - */ - shutdown_info.pstate = pstate; - /* blow away any transient state from the old connection */ if (AH->currUser) free(AH->currUser); --- 3443,3448 ---- *************** restore_toc_entries_parallel(ArchiveHand *** 3575,3591 **** free(AH->currTablespace); AH->currTablespace = NULL; AH->currWithOids = -1; /* ! * Initialize the lists of pending and ready items. After this setup, the ! * pending list is everything that needs to be done but is blocked by one ! * or more dependencies, while the ready list contains items that have no ! * remaining dependencies. Note: we don't yet filter out entries that ! * aren't going to be restored. They might participate in dependency ! * chains connecting entries that should be restored, so we treat them as ! * live until we actually process them. 
*/ - par_list_header_init(&pending_list); par_list_header_init(&ready_list); skipped_some = false; for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next) --- 3454,3495 ---- free(AH->currTablespace); AH->currTablespace = NULL; AH->currWithOids = -1; + } + + /* + * Main engine for parallel restore. + * + * Work is done in three phases. + * First we process all SECTION_PRE_DATA tocEntries, in a single connection, + * just as for a standard restore. This is done in restore_toc_entries_prefork(). + * Second we process the remaining non-ACL steps in parallel worker children + * (threads on Windows, processes on Unix); these fork off and set up their + * connections before we call restore_toc_entries_parallel(). + * Finally we process all the ACL entries in a single connection (that happens + * back in RestoreArchive). + */ + static void + restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, + TocEntry *pending_list) + { + int work_status; + bool skipped_some; + TocEntry ready_list; + TocEntry *next_work_item; + int ret_child; + + ahlog(AH, 2, "entering restore_toc_entries_parallel\n"); /* ! * Initialize the list of ready items; the list of pending items has ! * already been initialized in the caller. After this setup, the pending ! * list is everything that needs to be done but is blocked by one or more ! * dependencies, while the ready list contains items that have no remaining ! * dependencies. Note: we don't yet filter out entries that aren't going ! * to be restored. They might participate in dependency chains connecting ! * entries that should be restored, so we treat them as live until we ! * actually process them. */ par_list_header_init(&ready_list); skipped_some = false; for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next) *************** restore_toc_entries_parallel(ArchiveHand *** 3610,3616 **** } if (next_work_item->depCount > 0) !
par_list_append(&pending_list, next_work_item); else par_list_append(&ready_list, next_work_item); } --- 3514,3520 ---- } if (next_work_item->depCount > 0) ! par_list_append(pending_list, next_work_item); else par_list_append(&ready_list, next_work_item); } *************** restore_toc_entries_parallel(ArchiveHand *** 3624,3632 **** ahlog(AH, 1, "entering main parallel loop\n"); ! while ((next_work_item = get_next_work_item(AH, &ready_list, ! slots, n_slots)) != NULL || ! work_in_progress(slots, n_slots)) { if (next_work_item != NULL) { --- 3528,3535 ---- ahlog(AH, 1, "entering main parallel loop\n"); ! while ((next_work_item = get_next_work_item(AH, &ready_list, pstate)) != NULL || ! !IsEveryWorkerIdle(pstate)) { if (next_work_item != NULL) { *************** restore_toc_entries_parallel(ArchiveHand *** 3644,3705 **** continue; } ! if ((next_slot = get_next_slot(slots, n_slots)) != NO_SLOT) ! { ! /* There is work still to do and a worker slot available */ ! thandle child; ! RestoreArgs *args; ! ! ahlog(AH, 1, "launching item %d %s %s\n", ! next_work_item->dumpId, ! next_work_item->desc, next_work_item->tag); ! par_list_remove(next_work_item); ! /* this memory is dealloced in mark_work_done() */ ! args = pg_malloc(sizeof(RestoreArgs)); ! args->AH = CloneArchive(AH); ! args->te = next_work_item; ! args->pse = &pstate->pse[next_slot]; ! /* run the step in a worker child */ ! child = spawn_restore(args); ! slots[next_slot].child_id = child; ! slots[next_slot].args = args; ! continue; } - } ! /* ! * If we get here there must be work being done. Either there is no ! * work available to schedule (and work_in_progress returned true) or ! * there are no slots available. So we wait for a worker to finish, ! * and process the result. ! */ ! ret_child = reap_child(slots, n_slots, &work_status); ! if (WIFEXITED(work_status)) ! { ! mark_work_done(AH, &ready_list, ! ret_child, WEXITSTATUS(work_status), ! slots, n_slots); ! } ! else ! { ! 
exit_horribly(modulename, "worker process crashed: status %d\n", ! work_status); } } ahlog(AH, 1, "finished main parallel loop\n"); ! /* ! * Remove the pstate again, so the exit handler will now fall back to ! * closing AH->connection again. ! */ ! shutdown_info.pstate = NULL; /* * Now reconnect the single parent connection. --- 3547,3617 ---- continue; } ! ahlog(AH, 1, "launching item %d %s %s\n", ! next_work_item->dumpId, ! next_work_item->desc, next_work_item->tag); ! par_list_remove(next_work_item); ! Assert(GetIdleWorker(pstate) != NO_SLOT); ! DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE); ! } ! else ! /* at least one child is working and we have nothing ready. */ ! Assert(!IsEveryWorkerIdle(pstate)); ! for (;;) ! { ! int nTerm = 0; ! /* ! * In order to reduce dependencies as soon as possible and ! * especially to reap the status of workers who are working on ! * items that pending items depend on, we do a non-blocking check ! * for ended workers first. ! * ! * However, if we do not have any other work items currently that ! * workers can work on, we do not busy-loop here but instead ! * really wait for at least one worker to terminate. Hence we call ! * ListenToWorkers(..., ..., do_wait = true) in this case. ! */ ! ListenToWorkers(AH, pstate, !next_work_item); ! while ((ret_child = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT) ! { ! nTerm++; ! mark_work_done(AH, &ready_list, ret_child, work_status, pstate); } ! /* ! * We need to make sure that we have an idle worker before re-running the ! * loop. If nTerm > 0 we already have that (quick check). ! */ ! if (nTerm > 0) ! break; ! /* if nobody terminated, explicitly check for an idle worker */ ! if (GetIdleWorker(pstate) != NO_SLOT) ! break; ! ! /* ! * If we have no idle worker, read the result of one or more ! * workers and loop the loop to call ReapWorkerStatus() on them. ! */ ! ListenToWorkers(AH, pstate, true); } } ahlog(AH, 1, "finished main parallel loop\n"); + } ! static void ! 
restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list) ! { ! RestoreOptions *ropt = AH->ropt; ! TocEntry *te; ! ! ahlog(AH, 2, "entering restore_toc_entries_postfork\n"); /* * Now reconnect the single parent connection. *************** restore_toc_entries_parallel(ArchiveHand *** 3715,3721 **** * dependencies, or some other pathological condition. If so, do it in the * single parent connection. */ ! for (te = pending_list.par_next; te != &pending_list; te = te->par_next) { ahlog(AH, 1, "processing missed item %d %s %s\n", te->dumpId, te->desc, te->tag); --- 3627,3633 ---- * dependencies, or some other pathological condition. If so, do it in the * single parent connection. */ ! for (te = pending_list->par_next; te != pending_list; te = te->par_next) { ahlog(AH, 1, "processing missed item %d %s %s\n", te->dumpId, te->desc, te->tag); *************** restore_toc_entries_parallel(ArchiveHand *** 3726,3846 **** } /* - * create a worker child to perform a restore step in parallel - */ - static thandle - spawn_restore(RestoreArgs *args) - { - thandle child; - - /* Ensure stdio state is quiesced before forking */ - fflush(NULL); - - #ifndef WIN32 - child = fork(); - if (child == 0) - { - /* in child process */ - parallel_restore(args); - exit_horribly(modulename, - "parallel_restore should not return\n"); - } - else if (child < 0) - { - /* fork failed */ - exit_horribly(modulename, - "could not create worker process: %s\n", - strerror(errno)); - } - #else - child = (HANDLE) _beginthreadex(NULL, 0, (void *) parallel_restore, - args, 0, NULL); - if (child == 0) - exit_horribly(modulename, - "could not create worker thread: %s\n", - strerror(errno)); - #endif - - return child; - } - - /* - * collect status from a completed worker child - */ - static thandle - reap_child(ParallelSlot *slots, int n_slots, int *work_status) - { - #ifndef WIN32 - /* Unix is so much easier ... 
*/ - return wait(work_status); - #else - static HANDLE *handles = NULL; - int hindex, - snum, - tnum; - thandle ret_child; - DWORD res; - - /* first time around only, make space for handles to listen on */ - if (handles == NULL) - handles = (HANDLE *) pg_malloc0(n_slots * sizeof(HANDLE)); - - /* set up list of handles to listen to */ - for (snum = 0, tnum = 0; snum < n_slots; snum++) - if (slots[snum].child_id != 0) - handles[tnum++] = slots[snum].child_id; - - /* wait for one to finish */ - hindex = WaitForMultipleObjects(tnum, handles, false, INFINITE); - - /* get handle of finished thread */ - ret_child = handles[hindex - WAIT_OBJECT_0]; - - /* get the result */ - GetExitCodeThread(ret_child, &res); - *work_status = res; - - /* dispose of handle to stop leaks */ - CloseHandle(ret_child); - - return ret_child; - #endif - } - - /* - * are we doing anything now? - */ - static bool - work_in_progress(ParallelSlot *slots, int n_slots) - { - int i; - - for (i = 0; i < n_slots; i++) - { - if (slots[i].child_id != 0) - return true; - } - return false; - } - - /* - * find the first free parallel slot (if any). - */ - static int - get_next_slot(ParallelSlot *slots, int n_slots) - { - int i; - - for (i = 0; i < n_slots; i++) - { - if (slots[i].child_id == 0) - return i; - } - return NO_SLOT; - } - - - /* * Check if te1 has an exclusive lock requirement for an item that te2 also * requires, whether or not te2's requirement is for an exclusive lock. */ --- 3638,3643 ---- *************** par_list_remove(TocEntry *te) *** 3913,3919 **** */ static TocEntry * get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, ! ParallelSlot *slots, int n_slots) { bool pref_non_data = false; /* or get from AH->ropt */ TocEntry *data_te = NULL; --- 3710,3716 ---- */ static TocEntry * get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, ! 
ParallelState *pstate) { bool pref_non_data = false; /* or get from AH->ropt */ TocEntry *data_te = NULL; *************** get_next_work_item(ArchiveHandle *AH, To *** 3928,3938 **** { int count = 0; ! for (k = 0; k < n_slots; k++) ! if (slots[k].args->te != NULL && ! slots[k].args->te->section == SECTION_DATA) count++; ! if (n_slots == 0 || count * 4 < n_slots) pref_non_data = false; } --- 3725,3735 ---- { int count = 0; ! for (k = 0; k < pstate->numWorkers; k++) ! if (pstate->parallelSlot[k].args->te != NULL && ! pstate->parallelSlot[k].args->te->section == SECTION_DATA) count++; ! if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers) pref_non_data = false; } *************** get_next_work_item(ArchiveHandle *AH, To *** 3948,3960 **** * that a currently running item also needs lock on, or vice versa. If * so, we don't want to schedule them together. */ ! for (i = 0; i < n_slots && !conflicts; i++) { TocEntry *running_te; ! if (slots[i].args == NULL) continue; ! running_te = slots[i].args->te; if (has_lock_conflicts(te, running_te) || has_lock_conflicts(running_te, te)) --- 3745,3757 ---- * that a currently running item also needs lock on, or vice versa. If * so, we don't want to schedule them together. */ ! for (i = 0; i < pstate->numWorkers && !conflicts; i++) { TocEntry *running_te; ! if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING) continue; ! running_te = pstate->parallelSlot[i].args->te; if (has_lock_conflicts(te, running_te) || has_lock_conflicts(running_te, te)) *************** get_next_work_item(ArchiveHandle *AH, To *** 3989,4051 **** /* * Restore a single TOC item in parallel with others * ! * this is the procedure run as a thread (Windows) or a ! * separate process (everything else). */ ! static parallel_restore_result ! parallel_restore(RestoreArgs *args) { ArchiveHandle *AH = args->AH; TocEntry *te = args->te; RestoreOptions *ropt = AH->ropt; ! int retval; ! ! setProcessIdentifier(args->pse, AH); ! ! /* ! 
* Close and reopen the input file so we have a private file pointer that ! * doesn't stomp on anyone else's file pointer, if we're actually going to ! * need to read from the file. Otherwise, just close it except on Windows, ! * where it will possibly be needed by other threads. ! * ! * Note: on Windows, since we are using threads not processes, the reopen ! * call *doesn't* close the original file pointer but just open a new one. ! */ ! if (te->section == SECTION_DATA) ! (AH->ReopenPtr) (AH); ! #ifndef WIN32 ! else ! (AH->ClosePtr) (AH); ! #endif ! ! /* ! * We need our own database connection, too ! */ ! ConnectDatabase((Archive *) AH, ropt->dbname, ! ropt->pghost, ropt->pgport, ropt->username, ! ropt->promptPassword); _doSetFixedOutputState(AH); ! /* Restore the TOC item */ ! retval = restore_toc_entry(AH, te, ropt, true); ! ! /* And clean up */ ! DisconnectDatabase((Archive *) AH); ! unsetProcessIdentifier(args->pse); ! /* If we reopened the file, we are done with it, so close it now */ ! if (te->section == SECTION_DATA) ! (AH->ClosePtr) (AH); ! if (retval == 0 && AH->public.n_errors) ! retval = WORKER_IGNORED_ERRORS; ! #ifndef WIN32 ! exit(retval); ! #else ! return retval; ! #endif } --- 3786,3814 ---- /* * Restore a single TOC item in parallel with others * ! * this is run in the worker, i.e. in a thread (Windows) or a separate process ! * (everything else). A worker process executes several such work items during ! * a parallel backup or restore. Once we terminate here and report back that ! * our work is finished, the master process will assign us a new work item. */ ! int ! parallel_restore(ParallelArgs *args) { ArchiveHandle *AH = args->AH; TocEntry *te = args->te; RestoreOptions *ropt = AH->ropt; ! int status; _doSetFixedOutputState(AH); ! Assert(AH->connection != NULL); ! AH->public.n_errors = 0; ! /* Restore the TOC item */ ! status = restore_toc_entry(AH, te, ropt, true); ! 
return status; } *************** parallel_restore(RestoreArgs *args) *** 4057,4081 **** */ static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list, ! thandle worker, int status, ! ParallelSlot *slots, int n_slots) { TocEntry *te = NULL; - int i; ! for (i = 0; i < n_slots; i++) ! { ! if (slots[i].child_id == worker) ! { ! slots[i].child_id = 0; ! te = slots[i].args->te; ! DeCloneArchive(slots[i].args->AH); ! free(slots[i].args); ! slots[i].args = NULL; ! ! break; ! } ! } if (te == NULL) exit_horribly(modulename, "could not find slot of finished worker\n"); --- 3820,3831 ---- */ static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list, ! int worker, int status, ! ParallelState *pstate) { TocEntry *te = NULL; ! te = pstate->parallelSlot[worker].args->te; if (te == NULL) exit_horribly(modulename, "could not find slot of finished worker\n"); *************** inhibit_data_for_failed_table(ArchiveHan *** 4374,4389 **** } } - /* * Clone and de-clone routines used in parallel restoration. * * Enough of the structure is cloned to ensure that there is no * conflict between different threads each with their own clone. - * - * These could be public, but no need at present. */ ! static ArchiveHandle * CloneArchive(ArchiveHandle *AH) { ArchiveHandle *clone; --- 4124,4136 ---- } } /* * Clone and de-clone routines used in parallel restoration. * * Enough of the structure is cloned to ensure that there is no * conflict between different threads each with their own clone. */ ! ArchiveHandle * CloneArchive(ArchiveHandle *AH) { ArchiveHandle *clone; *************** CloneArchive(ArchiveHandle *AH) *** 4409,4417 **** --- 4156,4214 ---- /* clone has its own error count, too */ clone->public.n_errors = 0; + /* + * Connect our new clone object to the database: + * In parallel restore the parent is already disconnected, because we can + * connect the worker processes independently to the database (no snapshot + * sync required). 
+ * In parallel backup we clone the parent's existing connection. + */ + if (AH->mode == archModeRead) + { + RestoreOptions *ropt = AH->ropt; + Assert(AH->connection == NULL); + /* this also sets clone->connection */ + ConnectDatabase((Archive *) clone, ropt->dbname, + ropt->pghost, ropt->pgport, ropt->username, + ropt->promptPassword); + } + else + { + char *dbname; + char *pghost; + char *pgport; + char *username; + const char *encname; + + Assert(AH->connection != NULL); + + /* + * Even though we are technically accessing the parent's database object + * here, these functions are fine to be called like that because all just + * return a pointer and do not actually send/receive any data to/from the + * database. + */ + dbname = PQdb(AH->connection); + pghost = PQhost(AH->connection); + pgport = PQport(AH->connection); + username = PQuser(AH->connection); + encname = pg_encoding_to_char(AH->public.encoding); + + /* this also sets clone->connection */ + ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username, TRI_NO); + + /* + * Set the same encoding, whatever we set here is what we got from + * pg_encoding_to_char(), so we really shouldn't run into an error setting that + * very same value. Also see the comment in SetupConnection(). + */ + PQsetClientEncoding(clone->connection, encname); + } + /* Let the format-specific code have a chance too */ (clone->ClonePtr) (clone); + Assert(clone->connection != NULL); return clone; } *************** CloneArchive(ArchiveHandle *AH) *** 4420,4426 **** * * Note: we assume any clone-local connection was already closed. */ ! static void DeCloneArchive(ArchiveHandle *AH) { /* Clear format-specific state */ --- 4217,4223 ---- * * Note: we assume any clone-local connection was already closed. */ ! 
void DeCloneArchive(ArchiveHandle *AH) { /* Clear format-specific state */ diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h new file mode 100644 index 8859bd9..8883a19 *** a/src/bin/pg_dump/pg_backup_archiver.h --- b/src/bin/pg_dump/pg_backup_archiver.h *************** typedef z_stream *z_streamp; *** 100,107 **** --- 100,120 ---- #define K_OFFSET_POS_SET 2 #define K_OFFSET_NO_DATA 3 + /* + * Special exit values from worker children. We reserve 0 for normal + * success; 1 and other small values should be interpreted as crashes. + */ + #define WORKER_OK 0 + #define WORKER_CREATE_DONE 10 + #define WORKER_INHIBIT_DATA 11 + #define WORKER_IGNORED_ERRORS 12 + struct _archiveHandle; struct _tocEntry; + struct _restoreList; + struct ParallelArgs; + struct ParallelState; + enum T_Action; typedef void (*ClosePtr) (struct _archiveHandle * AH); typedef void (*ReopenPtr) (struct _archiveHandle * AH); *************** typedef void (*PrintTocDataPtr) (struct *** 129,134 **** --- 142,154 ---- typedef void (*ClonePtr) (struct _archiveHandle * AH); typedef void (*DeClonePtr) (struct _archiveHandle * AH); + typedef char *(*WorkerJobRestorePtr)(struct _archiveHandle * AH, struct _tocEntry * te); + typedef char *(*WorkerJobDumpPtr)(struct _archiveHandle * AH, struct _tocEntry * te); + typedef char *(*MasterStartParallelItemPtr)(struct _archiveHandle * AH, struct _tocEntry * te, + enum T_Action act); + typedef int (*MasterEndParallelItemPtr)(struct _archiveHandle * AH, struct _tocEntry * te, + const char *str, enum T_Action act); + typedef size_t (*CustomOutPtr) (struct _archiveHandle * AH, const void *buf, size_t len); typedef enum *************** typedef struct _archiveHandle *** 227,232 **** --- 247,258 ---- StartBlobPtr StartBlobPtr; EndBlobPtr EndBlobPtr; + MasterStartParallelItemPtr MasterStartParallelItemPtr; + MasterEndParallelItemPtr MasterEndParallelItemPtr; + + WorkerJobDumpPtr WorkerJobDumpPtr; + WorkerJobRestorePtr 
WorkerJobRestorePtr; + ClonePtr ClonePtr; /* Clone format-specific fields */ DeClonePtr DeClonePtr; /* Clean up cloned fields */ *************** typedef struct _archiveHandle *** 236,241 **** --- 262,268 ---- char *archdbname; /* DB name *read* from archive */ enum trivalue promptPassword; char *savedPassword; /* password for ropt->username, if known */ + char *use_role; PGconn *connection; int connectToDB; /* Flag to indicate if direct DB connection is * required */ *************** typedef struct _tocEntry *** 327,332 **** --- 354,360 ---- int nLockDeps; /* number of such dependencies */ } TocEntry; + extern int parallel_restore(struct ParallelArgs *args); extern void on_exit_close_archive(Archive *AHX); extern void warn_or_exit_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...) __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4))); *************** extern void WriteHead(ArchiveHandle *AH) *** 337,345 **** extern void ReadHead(ArchiveHandle *AH); extern void WriteToc(ArchiveHandle *AH); extern void ReadToc(ArchiveHandle *AH); ! extern void WriteDataChunks(ArchiveHandle *AH); extern teReqs TocIDRequired(ArchiveHandle *AH, DumpId id); extern bool checkSeek(FILE *fp); #define appendStringLiteralAHX(buf,str,AH) \ --- 365,377 ---- extern void ReadHead(ArchiveHandle *AH); extern void WriteToc(ArchiveHandle *AH); extern void ReadToc(ArchiveHandle *AH); ! extern void WriteDataChunks(ArchiveHandle *AH, struct ParallelState *pstate); ! extern void WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te); ! extern ArchiveHandle *CloneArchive(ArchiveHandle *AH); ! 
extern void DeCloneArchive(ArchiveHandle *AH); extern teReqs TocIDRequired(ArchiveHandle *AH, DumpId id); + TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id); extern bool checkSeek(FILE *fp); #define appendStringLiteralAHX(buf,str,AH) \ diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c new file mode 100644 index 7081598..ccfe202 *** a/src/bin/pg_dump/pg_backup_custom.c --- b/src/bin/pg_dump/pg_backup_custom.c *************** *** 26,31 **** --- 26,32 ---- #include "compress_io.h" #include "dumputils.h" + #include "parallel.h" /*-------- * Routines in the format interface *************** static void _LoadBlobs(ArchiveHandle *AH *** 59,64 **** --- 60,69 ---- static void _Clone(ArchiveHandle *AH); static void _DeClone(ArchiveHandle *AH); + static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act); + static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act); + char *_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te); + typedef struct { CompressorState *cs; *************** InitArchiveFmt_Custom(ArchiveHandle *AH) *** 127,132 **** --- 132,144 ---- AH->ClonePtr = _Clone; AH->DeClonePtr = _DeClone; + AH->MasterStartParallelItemPtr = _MasterStartParallelItem; + AH->MasterEndParallelItemPtr = _MasterEndParallelItem; + + /* no parallel dump in the custom archive, only parallel restore */ + AH->WorkerJobDumpPtr = NULL; + AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom; + /* Set up a private area. */ ctx = (lclContext *) pg_malloc0(sizeof(lclContext)); AH->formatData = (void *) ctx; *************** _CloseArchive(ArchiveHandle *AH) *** 698,704 **** tpos = ftello(AH->FH); WriteToc(AH); ctx->dataStart = _getFilePos(AH, ctx); ! WriteDataChunks(AH); /* * If possible, re-write the TOC in order to update the data offset --- 710,716 ---- tpos = ftello(AH->FH); WriteToc(AH); ctx->dataStart = _getFilePos(AH, ctx); ! 
WriteDataChunks(AH, NULL); /* * If possible, re-write the TOC in order to update the data offset *************** _DeClone(ArchiveHandle *AH) *** 796,801 **** --- 808,885 ---- free(ctx); } + /* + * This function is executed in the worker of a parallel restore from a + * custom format archive and restores the TOC entry it is given. + */ + char * + _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te) + { + /* short fixed-size string + some ID so far, this needs to be malloc'ed + * instead of static because we work with threads on windows */ + const int buflen = 64; + char *buf = (char*) pg_malloc(buflen); + ParallelArgs pargs; + int status; + + pargs.AH = AH; + pargs.te = te; + + status = parallel_restore(&pargs); + + snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status, + status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0); + + return buf; + } + + /* + * This function is executed in the parent process. It creates the command + * string for a restore job, identifying the TOC entry by its dump ID. The + * custom format has no parallel dump, so restore (done in the worker by + * _WorkerJobRestoreCustom) is the only supported action. + */ + static char * + _MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act) + { + /* + * A static char is okay here, even on Windows because we call this + * function only from one process (the master). + */ + static char buf[64]; /* short fixed-size string + number */ + + /* no parallel dump in the custom archive format */ + Assert(act == ACT_RESTORE); + + snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId); + + return buf; + } + + /* + * This function is executed in the parent process. It analyzes the response of + * the _WorkerJobRestoreCustom function; the custom format has no parallel + * dump, so restore is the only supported action.
+ */ + static int + _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act) + { + DumpId dumpId; + int nBytes, status, n_errors; + + /* no parallel dump in the custom archive */ + Assert(act == ACT_RESTORE); + + sscanf(str, "%u %u %u%n", &dumpId, &status, &n_errors, &nBytes); + + Assert(nBytes == strlen(str)); + Assert(dumpId == te->dumpId); + + AH->public.n_errors += n_errors; + + return status; + } + /*-------------------------------------------------- * END OF FORMAT CALLBACKS *-------------------------------------------------- diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c new file mode 100644 index 4c4f24f..544d01a *** a/src/bin/pg_dump/pg_backup_db.c --- b/src/bin/pg_dump/pg_backup_db.c *************** ConnectDatabase(Archive *AHX, *** 309,320 **** PQsetNoticeProcessor(AH->connection, notice_processor, NULL); } void DisconnectDatabase(Archive *AHX) { ArchiveHandle *AH = (ArchiveHandle *) AHX; ! PQfinish(AH->connection); /* noop if AH->connection is NULL */ AH->connection = NULL; } --- 309,338 ---- PQsetNoticeProcessor(AH->connection, notice_processor, NULL); } + /* + * Close the connection to the database and also cancel off the query if we + * have one running. + */ void DisconnectDatabase(Archive *AHX) { ArchiveHandle *AH = (ArchiveHandle *) AHX; + PGcancel *cancel; + char errbuf[1]; ! if (!AH->connection) ! return; ! ! if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE) ! { ! if ((cancel = PQgetCancel(AH->connection))) ! { ! PQcancel(cancel, errbuf, sizeof(errbuf)); ! PQfreeCancel(cancel); ! } ! } ! ! 
PQfinish(AH->connection); AH->connection = NULL; } diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c new file mode 100644 index 5b71eba..9be4f8d *** a/src/bin/pg_dump/pg_backup_directory.c --- b/src/bin/pg_dump/pg_backup_directory.c *************** *** 35,40 **** --- 35,41 ---- #include "compress_io.h" #include "dumputils.h" + #include "parallel.h" #include #include *************** typedef struct *** 50,55 **** --- 51,57 ---- cfp *dataFH; /* currently open data file */ cfp *blobsTocFH; /* file handle for blobs.toc */ + ParallelState *pstate; /* for parallel backup / restore */ } lclContext; typedef struct *************** static int _ReadByte(ArchiveHandle *); *** 70,75 **** --- 72,78 ---- static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len); static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len); static void _CloseArchive(ArchiveHandle *AH); + static void _ReopenArchive(ArchiveHandle *AH); static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te); *************** static void _EndBlob(ArchiveHandle *AH, *** 82,89 **** static void _EndBlobs(ArchiveHandle *AH, TocEntry *te); static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt); ! static char *prependDirectory(ArchiveHandle *AH, const char *relativeFilename); /* * Init routine required by ALL formats. This is a global routine --- 85,101 ---- static void _EndBlobs(ArchiveHandle *AH, TocEntry *te); static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt); ! static void _Clone(ArchiveHandle *AH); ! static void _DeClone(ArchiveHandle *AH); ! ! static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act); ! static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, ! const char *str, T_Action act); ! static char *_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te); ! 
static char *_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te); + static char *prependDirectory(ArchiveHandle *AH, char *buf, + const char *relativeFilename); /* * Init routine required by ALL formats. This is a global routine *************** InitArchiveFmt_Directory(ArchiveHandle * *** 110,116 **** AH->WriteBufPtr = _WriteBuf; AH->ReadBufPtr = _ReadBuf; AH->ClosePtr = _CloseArchive; ! AH->ReopenPtr = NULL; AH->PrintTocDataPtr = _PrintTocData; AH->ReadExtraTocPtr = _ReadExtraToc; AH->WriteExtraTocPtr = _WriteExtraToc; --- 122,128 ---- AH->WriteBufPtr = _WriteBuf; AH->ReadBufPtr = _ReadBuf; AH->ClosePtr = _CloseArchive; ! AH->ReopenPtr = _ReopenArchive; AH->PrintTocDataPtr = _PrintTocData; AH->ReadExtraTocPtr = _ReadExtraToc; AH->WriteExtraTocPtr = _WriteExtraToc; *************** InitArchiveFmt_Directory(ArchiveHandle * *** 121,128 **** AH->EndBlobPtr = _EndBlob; AH->EndBlobsPtr = _EndBlobs; ! AH->ClonePtr = NULL; ! AH->DeClonePtr = NULL; /* Set up our private context */ ctx = (lclContext *) pg_malloc0(sizeof(lclContext)); --- 133,146 ---- AH->EndBlobPtr = _EndBlob; AH->EndBlobsPtr = _EndBlobs; ! AH->ClonePtr = _Clone; ! AH->DeClonePtr = _DeClone; ! ! AH->WorkerJobRestorePtr = _WorkerJobRestoreDirectory; ! AH->WorkerJobDumpPtr = _WorkerJobDumpDirectory; ! ! AH->MasterStartParallelItemPtr = _MasterStartParallelItem; ! AH->MasterEndParallelItemPtr = _MasterEndParallelItem; /* Set up our private context */ ctx = (lclContext *) pg_malloc0(sizeof(lclContext)); *************** InitArchiveFmt_Directory(ArchiveHandle * *** 146,161 **** if (AH->mode == archModeWrite) { ! if (mkdir(ctx->directory, 0700) < 0) exit_horribly(modulename, "could not create directory \"%s\": %s\n", ctx->directory, strerror(errno)); } else { /* Read Mode */ ! char *fname; cfp *tocFH; ! fname = prependDirectory(AH, "toc.dat"); tocFH = cfopen_read(fname, PG_BINARY_R); if (tocFH == NULL) --- 164,200 ---- if (AH->mode == archModeWrite) { ! struct stat st; ! bool is_empty = false; ! ! 
/* we accept an empty existing directory */ ! if (stat(ctx->directory, &st) == 0 && S_ISDIR(st.st_mode)) ! { ! DIR* dir = opendir(ctx->directory); ! if (dir) { ! struct dirent *d; ! is_empty = true; ! while ((d = readdir(dir))) { ! if (strcmp(d->d_name, ".") != 0 && strcmp(d->d_name, "..") != 0) ! { ! is_empty = false; ! break; ! } ! } ! closedir(dir); ! } ! } ! ! if (!is_empty && mkdir(ctx->directory, 0700) < 0) exit_horribly(modulename, "could not create directory \"%s\": %s\n", ctx->directory, strerror(errno)); } else { /* Read Mode */ ! char fname[MAXPGPATH]; cfp *tocFH; ! prependDirectory(AH, fname, "toc.dat"); tocFH = cfopen_read(fname, PG_BINARY_R); if (tocFH == NULL) *************** _StartData(ArchiveHandle *AH, TocEntry * *** 281,289 **** { lclTocEntry *tctx = (lclTocEntry *) te->formatData; lclContext *ctx = (lclContext *) AH->formatData; ! char *fname; ! fname = prependDirectory(AH, tctx->filename); ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression); if (ctx->dataFH == NULL) --- 320,328 ---- { lclTocEntry *tctx = (lclTocEntry *) te->formatData; lclContext *ctx = (lclContext *) AH->formatData; ! char fname[MAXPGPATH]; ! prependDirectory(AH, fname, tctx->filename); ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression); if (ctx->dataFH == NULL) *************** _WriteData(ArchiveHandle *AH, const void *** 308,313 **** --- 347,355 ---- if (dLen == 0) return 0; + /* Are we aborting? */ + checkAborting(AH); + return cfwrite(data, dLen, ctx->dataFH); } *************** _PrintTocData(ArchiveHandle *AH, TocEntr *** 375,382 **** _LoadBlobs(AH, ropt); else { ! char *fname = prependDirectory(AH, tctx->filename); _PrintFileData(AH, fname, ropt); } } --- 417,425 ---- _LoadBlobs(AH, ropt); else { ! char fname[MAXPGPATH]; + prependDirectory(AH, fname, tctx->filename); _PrintFileData(AH, fname, ropt); } } *************** _LoadBlobs(ArchiveHandle *AH, RestoreOpt *** 386,397 **** { Oid oid; lclContext *ctx = (lclContext *) AH->formatData; ! 
char *fname; char line[MAXPGPATH]; StartRestoreBlobs(AH); ! fname = prependDirectory(AH, "blobs.toc"); ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R); --- 429,440 ---- { Oid oid; lclContext *ctx = (lclContext *) AH->formatData; ! char fname[MAXPGPATH]; char line[MAXPGPATH]; StartRestoreBlobs(AH); ! prependDirectory(AH, fname, "blobs.toc"); ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R); *************** _WriteBuf(ArchiveHandle *AH, const void *** 474,479 **** --- 517,525 ---- lclContext *ctx = (lclContext *) AH->formatData; size_t res; + /* Are we aborting? */ + checkAborting(AH); + res = cfwrite(buf, len, ctx->dataFH); if (res != len) exit_horribly(modulename, "could not write to output file: %s\n", *************** _CloseArchive(ArchiveHandle *AH) *** 518,524 **** if (AH->mode == archModeWrite) { cfp *tocFH; ! char *fname = prependDirectory(AH, "toc.dat"); /* The TOC is always created uncompressed */ tocFH = cfopen_write(fname, PG_BINARY_W, 0); --- 564,575 ---- if (AH->mode == archModeWrite) { cfp *tocFH; ! char fname[MAXPGPATH]; ! ! prependDirectory(AH, fname, "toc.dat"); ! ! /* this will actually fork the processes for a parallel backup */ ! ctx->pstate = ParallelBackupStart(AH, NULL); /* The TOC is always created uncompressed */ tocFH = cfopen_write(fname, PG_BINARY_W, 0); *************** _CloseArchive(ArchiveHandle *AH) *** 539,549 **** if (cfclose(tocFH) != 0) exit_horribly(modulename, "could not close TOC file: %s\n", strerror(errno)); ! WriteDataChunks(AH); } AH->FH = NULL; } /* * BLOB support --- 590,613 ---- if (cfclose(tocFH) != 0) exit_horribly(modulename, "could not close TOC file: %s\n", strerror(errno)); ! WriteDataChunks(AH, ctx->pstate); ! ! ParallelBackupEnd(AH, ctx->pstate); } AH->FH = NULL; } + /* + * Reopen the archive's file handle. + */ + static void + _ReopenArchive(ArchiveHandle *AH) + { + /* + * Our TOC is in memory, our data files are opened by each child anyway as + * they are separate. 
We support reopening the archive by just doing nothing. + */ + } /* * BLOB support *************** static void *** 560,568 **** _StartBlobs(ArchiveHandle *AH, TocEntry *te) { lclContext *ctx = (lclContext *) AH->formatData; ! char *fname; ! fname = prependDirectory(AH, "blobs.toc"); /* The blob TOC file is never compressed */ ctx->blobsTocFH = cfopen_write(fname, "ab", 0); --- 624,632 ---- _StartBlobs(ArchiveHandle *AH, TocEntry *te) { lclContext *ctx = (lclContext *) AH->formatData; ! char fname[MAXPGPATH]; ! prependDirectory(AH, fname, "blobs.toc"); /* The blob TOC file is never compressed */ ctx->blobsTocFH = cfopen_write(fname, "ab", 0); *************** _EndBlobs(ArchiveHandle *AH, TocEntry *t *** 627,638 **** ctx->blobsTocFH = NULL; } ! static char * ! prependDirectory(ArchiveHandle *AH, const char *relativeFilename) { lclContext *ctx = (lclContext *) AH->formatData; - static char buf[MAXPGPATH]; char *dname; dname = ctx->directory; --- 691,706 ---- ctx->blobsTocFH = NULL; } ! /* ! * Gets a relative file name and prepends the output directory, writing the ! * result to buf. The caller needs to make sure that buf is MAXPGPATH bytes ! * big. Can't use a static char[MAXPGPATH] inside the function because we run ! * multithreaded on Windows. ! */ static char * ! prependDirectory(ArchiveHandle *AH, char *buf, const char *relativeFilename) { lclContext *ctx = (lclContext *) AH->formatData; char *dname; dname = ctx->directory; *************** prependDirectory(ArchiveHandle *AH, cons *** 646,648 **** --- 714,865 ---- return buf; } + + /* + * Clone format-specific fields during parallel restoration. 
+ */ + static void + _Clone(ArchiveHandle *AH) + { + lclContext *ctx = (lclContext *) AH->formatData; + + AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext)); + memcpy(AH->formatData, ctx, sizeof(lclContext)); + ctx = (lclContext *) AH->formatData; + + /* + * Note: we do not make a local lo_buf because we expect at most one BLOBS + * entry per archive, so no parallelism is possible. Likewise, + * TOC-entry-local state isn't an issue because any one TOC entry is + * touched by just one worker child. + */ + + /* + * The memcpy above carries the ParallelState pointer (pstate) over as is; no + * separate copy is made because only the master process ever writes to it. + */ + } + + static void + _DeClone(ArchiveHandle *AH) + { + lclContext *ctx = (lclContext *) AH->formatData; + free(ctx); /* release this handle's private format context */ + } + + /* + * This function is executed in the parent process. Depending on the desired + * action (dump or restore) it creates a command string that is understood by + * the _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory functions of the + * respective dump format. + */ + static char * + _MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act) + { + /* + * A static buffer is okay here, even on Windows, because we call this + * function only from one process (the master). + */ + static char buf[64]; + + if (act == ACT_DUMP) + snprintf(buf, sizeof(buf), "DUMP %d", te->dumpId); + else if (act == ACT_RESTORE) + snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId); + + return buf; /* left unchanged if act is neither ACT_DUMP nor ACT_RESTORE */ + } + + /* + * This function is executed in the child of a parallel backup for the + * directory archive and dumps the actual data. + * + * We are currently returning only the DumpId so theoretically we could + * make this function return an int (or a DumpId). However, to + * facilitate further enhancements and because sooner or later we need to + * convert this to a string and send it via a message anyway, we stick with + * char *. It is parsed on the other side by the _MasterEndParallelItem() + * function of the respective dump format.
+ */ + static char * + _WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te) + { + /* short fixed-size string + some ID so far; this needs to be malloc'ed + * instead of static because we work with threads on Windows */ + const int buflen = 64; + char *buf = (char*) pg_malloc(buflen); + lclTocEntry *tctx = (lclTocEntry *) te->formatData; + + /* This should never happen */ + if (!tctx) + exit_horribly(modulename, "Error during backup\n"); + + /* + * WriteDataChunksForTocEntry() returns void. We either fail and die horribly or succeed... + * A failure will be detected by the parent when the child dies unexpectedly. + */ + WriteDataChunksForTocEntry(AH, te); + + snprintf(buf, buflen, "OK DUMP %d", te->dumpId); + + return buf; + } + + /* + * This function is executed in the child of a parallel restore for the + * directory archive and restores the actual data via parallel_restore(). + */ + static char * + _WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te) + { + /* short fixed-size string + some ID so far; this needs to be malloc'ed + * instead of static because we work with threads on Windows */ + const int buflen = 64; + char *buf = (char*) pg_malloc(buflen); + ParallelArgs pargs; + int status; + lclTocEntry *tctx; + + tctx = (lclTocEntry *) te->formatData; /* NOTE(review): tctx is never read afterwards */ + + pargs.AH = AH; + pargs.te = te; + + status = parallel_restore(&pargs); + + snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status, + status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0); + + return buf; + } + /* + * This function is executed in the parent process. It analyzes the response of + * the _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory functions of the + * respective dump format.
+ */ + static int + _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act) + { + DumpId dumpId = 0; + int nBytes = 0, n_errors = 0; /* zeroed: a failed sscanf() must not leak garbage */ + int status = 0; + + if (act == ACT_DUMP) + { + sscanf(str, "%d%n", &dumpId, &nBytes); /* %d: targets are signed ints */ + + Assert(dumpId == te->dumpId); + Assert(nBytes == strlen(str)); + } + else if (act == ACT_RESTORE) + { + sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes); /* matches the worker's "%d %d %d" */ + + Assert(dumpId == te->dumpId); + Assert(nBytes == strlen(str)); + + AH->public.n_errors += n_errors; + } + + return status; + } diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c new file mode 100644 index 03ae4f8..6465ac3 *** a/src/bin/pg_dump/pg_backup_tar.c --- b/src/bin/pg_dump/pg_backup_tar.c *************** InitArchiveFmt_Tar(ArchiveHandle *AH) *** 158,163 **** --- 158,169 ---- AH->ClonePtr = NULL; AH->DeClonePtr = NULL; + AH->MasterStartParallelItemPtr = NULL; + AH->MasterEndParallelItemPtr = NULL; + + AH->WorkerJobDumpPtr = NULL; + AH->WorkerJobRestorePtr = NULL; + /* * Set up some special context used in compressing data. */ *************** _CloseArchive(ArchiveHandle *AH) *** 828,834 **** /* * Now send the data (tables & blobs) */ ! WriteDataChunks(AH); /* * Now this format wants to append a script which does a full restore --- 834,840 ---- /* * Now send the data (tables & blobs) */ !
WriteDataChunks(AH, NULL); /* * Now this format wants to append a script which does a full restore diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c new file mode 100644 index e6c85ac..2b075aa *** a/src/bin/pg_dump/pg_dump.c --- b/src/bin/pg_dump/pg_dump.c *************** static int disable_dollar_quoting = 0; *** 135,140 **** --- 135,141 ---- static int dump_inserts = 0; static int column_inserts = 0; static int no_security_labels = 0; + static int no_synchronized_snapshots = 0; static int no_unlogged_table_data = 0; static int serializable_deferrable = 0; *************** static Oid findLastBuiltinOid_V70(Archiv *** 243,250 **** static void selectSourceSchema(Archive *fout, const char *schemaName); static char *getFormattedTypeName(Archive *fout, Oid oid, OidOptions opts); static char *myFormatType(const char *typname, int32 typmod); - static const char *fmtQualifiedId(Archive *fout, - const char *schema, const char *id); static void getBlobs(Archive *fout); static void dumpBlob(Archive *fout, BlobInfo *binfo); static int dumpBlobs(Archive *fout, void *arg); --- 244,249 ---- *************** static void binary_upgrade_extension_mem *** 262,268 **** DumpableObject *dobj, const char *objlabel); static const char *getAttrName(int attrnum, TableInfo *tblInfo); ! static const char *fmtCopyColumnList(const TableInfo *ti); static PGresult *ExecuteSqlQueryForSingleRow(Archive *fout, char *query); --- 261,268 ---- DumpableObject *dobj, const char *objlabel); static const char *getAttrName(int attrnum, TableInfo *tblInfo); ! static const char *fmtCopyColumnList(const TableInfo *ti, PQExpBuffer buffer); ! 
static char *get_synchronized_snapshot(Archive *fout); static PGresult *ExecuteSqlQueryForSingleRow(Archive *fout, char *query); *************** main(int argc, char **argv) *** 284,289 **** --- 284,290 ---- int numObjs; DumpableObject *boundaryObjs; int i; + int numWorkers = 1; enum trivalue prompt_password = TRI_DEFAULT; int compressLevel = -1; int plainText = 0; *************** main(int argc, char **argv) *** 314,319 **** --- 315,321 ---- {"format", required_argument, NULL, 'F'}, {"host", required_argument, NULL, 'h'}, {"ignore-version", no_argument, NULL, 'i'}, + {"jobs", 1, NULL, 'j'}, {"no-reconnect", no_argument, NULL, 'R'}, {"oids", no_argument, NULL, 'o'}, {"no-owner", no_argument, NULL, 'O'}, *************** main(int argc, char **argv) *** 353,358 **** --- 355,361 ---- {"serializable-deferrable", no_argument, &serializable_deferrable, 1}, {"use-set-session-authorization", no_argument, &use_setsessauth, 1}, {"no-security-labels", no_argument, &no_security_labels, 1}, + {"no-synchronized-snapshots", no_argument, &no_synchronized_snapshots, 1}, {"no-unlogged-table-data", no_argument, &no_unlogged_table_data, 1}, {NULL, 0, NULL, 0} *************** main(int argc, char **argv) *** 360,365 **** --- 363,374 ---- set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_dump")); + /* + * Initialize what we need for parallel execution, especially for thread + * support on Windows. + */ + init_parallel_dump_utils(); + g_verbose = false; strcpy(g_comment_start, "-- "); *************** main(int argc, char **argv) *** 390,396 **** } } ! while ((c = getopt_long(argc, argv, "abcCd:E:f:F:h:iK:n:N:oOp:RsS:t:T:U:vwWxZ:", long_options, &optindex)) != -1) { switch (c) --- 399,405 ---- } } ! 
while ((c = getopt_long(argc, argv, "abcCd:E:f:F:h:ij:K:n:N:oOp:RsS:t:T:U:vwWxZ:", long_options, &optindex)) != -1) { switch (c) *************** main(int argc, char **argv) *** 435,440 **** --- 444,453 ---- /* ignored, deprecated option */ break; + case 'j': /* number of dump jobs */ + numWorkers = atoi(optarg); + break; + case 'n': /* include schema(s) */ simple_string_list_append(&schema_include_patterns, optarg); include_everything = false; *************** main(int argc, char **argv) *** 577,582 **** --- 590,611 ---- compressLevel = 0; } + /* + * On Windows we can only have at most MAXIMUM_WAIT_OBJECTS (= 64 usually) + * parallel jobs because that's the maximum limit for the + * WaitForMultipleObjects() call. + */ + if (numWorkers <= 0 + #ifdef WIN32 + || numWorkers > MAXIMUM_WAIT_OBJECTS + #endif + ) + exit_horribly(NULL, "%s: invalid number of parallel jobs\n", progname); + + /* Parallel backup only in the directory archive format so far */ + if (archiveFormat != archDirectory && numWorkers > 1) + exit_horribly(NULL, "parallel backup only supported by the directory format\n"); + /* Open the output file */ fout = CreateArchive(filename, archiveFormat, compressLevel, archiveMode); *************** main(int argc, char **argv) *** 600,605 **** --- 629,636 ---- fout->minRemoteVersion = 70000; fout->maxRemoteVersion = (my_version / 100) * 100 + 99; + fout->numWorkers = numWorkers; + /* * Open the database using the Archiver, so it knows about it. Errors mean * death. *************** main(int argc, char **argv) *** 632,663 **** PQclear(res); } - /* - * Start transaction-snapshot mode transaction to dump consistent data. 
- */ - ExecuteSqlStatement(fout, "BEGIN"); - if (fout->remoteVersion >= 90100) - { - if (serializable_deferrable) - ExecuteSqlStatement(fout, - "SET TRANSACTION ISOLATION LEVEL " - "SERIALIZABLE, READ ONLY, DEFERRABLE"); - else - ExecuteSqlStatement(fout, - "SET TRANSACTION ISOLATION LEVEL " - "REPEATABLE READ, READ ONLY"); - } - else if (fout->remoteVersion >= 70400) - { - /* note: comma was not accepted in SET TRANSACTION before 8.0 */ - ExecuteSqlStatement(fout, - "SET TRANSACTION ISOLATION LEVEL " - "SERIALIZABLE READ ONLY"); - } - else - ExecuteSqlStatement(fout, - "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); - /* Select the appropriate subquery to convert user IDs to names */ if (fout->remoteVersion >= 80100) username_subquery = "SELECT rolname FROM pg_catalog.pg_roles WHERE oid ="; --- 663,668 ---- *************** main(int argc, char **argv) *** 666,671 **** --- 671,684 ---- else username_subquery = "SELECT usename FROM pg_user WHERE usesysid ="; + /* check the version for the synchronized snapshots feature */ + if (numWorkers > 1 && fout->remoteVersion < 90200 + && !no_synchronized_snapshots) + exit_horribly(NULL, + "No synchronized snapshots available in this server version.\n" + "Run with --no-synchronized-snapshots instead if you do not\n" + "need synchronized snapshots.\n"); + /* Find the last built-in OID, if needed */ if (fout->remoteVersion < 70300) { *************** main(int argc, char **argv) *** 763,768 **** --- 776,785 ---- else sortDumpableObjectsByTypeOid(dobjs, numObjs); + /* If we do a parallel dump, we want the largest tables to go first */ + if (archiveFormat == archDirectory && numWorkers > 1) + sortDataAndIndexObjectsBySize(dobjs, numObjs); + sortDumpableObjects(dobjs, numObjs, boundaryObjs[0].dumpId, boundaryObjs[1].dumpId); *************** help(const char *progname) *** 844,849 **** --- 861,867 ---- printf(_(" -f, --file=FILENAME output file or directory name\n")); printf(_(" -F, --format=c|d|t|p output file format (custom, 
directory, tar,\n" " plain text (default))\n")); + printf(_(" -j, --jobs=NUM use this many parallel jobs to dump\n")); printf(_(" -v, --verbose verbose mode\n")); printf(_(" -V, --version output version information, then exit\n")); printf(_(" -Z, --compress=0-9 compression level for compressed formats\n")); *************** help(const char *progname) *** 873,878 **** --- 891,897 ---- printf(_(" --exclude-table-data=TABLE do NOT dump data for the named table(s)\n")); printf(_(" --inserts dump data as INSERT commands, rather than COPY\n")); printf(_(" --no-security-labels do not dump security label assignments\n")); + printf(_(" --no-synchronized-snapshots parallel processes should not use synchronized snapshots\n")); printf(_(" --no-tablespaces do not dump tablespace assignments\n")); printf(_(" --no-unlogged-table-data do not dump unlogged table data\n")); printf(_(" --quote-all-identifiers quote all identifiers, even if not key words\n")); *************** setup_connection(Archive *AH, const char *** 902,908 **** PGconn *conn = GetConnection(AH); const char *std_strings; ! /* Set the client encoding if requested */ if (dumpencoding) { if (PQsetClientEncoding(conn, dumpencoding) < 0) --- 921,932 ---- PGconn *conn = GetConnection(AH); const char *std_strings; ! /* ! * Set the client encoding if requested. If dumpencoding == NULL then ! * either it hasn't been requested or we're a cloned connection and then this ! * has already been set in CloneArchive according to the original ! * connection encoding. ! 
*/ if (dumpencoding) { if (PQsetClientEncoding(conn, dumpencoding) < 0) *************** setup_connection(Archive *AH, const char *** 920,925 **** --- 944,953 ---- AH->std_strings = (std_strings && strcmp(std_strings, "on") == 0); /* Set the role if requested */ + if (!use_role && AH->use_role) + use_role = AH->use_role; + + /* Set the role if requested */ if (use_role && AH->remoteVersion >= 80100) { PQExpBuffer query = createPQExpBuffer(); *************** setup_connection(Archive *AH, const char *** 927,932 **** --- 955,964 ---- appendPQExpBuffer(query, "SET ROLE %s", fmtId(use_role)); ExecuteSqlStatement(AH, query->data); destroyPQExpBuffer(query); + + /* save this for later use on parallel connections */ + if (!AH->use_role) + AH->use_role = strdup(use_role); } /* Set the datestyle to ISO to ensure the dump's portability */ *************** setup_connection(Archive *AH, const char *** 963,968 **** --- 995,1064 ---- */ if (quote_all_identifiers && AH->remoteVersion >= 90100) ExecuteSqlStatement(AH, "SET quote_all_identifiers = true"); + + /* + * Start transaction-snapshot mode transaction to dump consistent data. 
+ */ + ExecuteSqlStatement(AH, "BEGIN"); + if (AH->remoteVersion >= 90100) + { + if (serializable_deferrable) + ExecuteSqlStatement(AH, + "SET TRANSACTION ISOLATION LEVEL " + "SERIALIZABLE, READ ONLY, DEFERRABLE"); + else + ExecuteSqlStatement(AH, + "SET TRANSACTION ISOLATION LEVEL " + "REPEATABLE READ, READ ONLY"); + } + else if (AH->remoteVersion >= 70400) + { + /* note: comma was not accepted in SET TRANSACTION before 8.0 */ + ExecuteSqlStatement(AH, + "SET TRANSACTION ISOLATION LEVEL " + "SERIALIZABLE READ ONLY"); + } + else + ExecuteSqlStatement(AH, + "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); + + if (AH->numWorkers > 1 && AH->remoteVersion >= 90200 && !no_synchronized_snapshots) + { + if (AH->sync_snapshot_id) + { + PQExpBuffer query = createPQExpBuffer(); + appendPQExpBuffer(query, "SET TRANSACTION SNAPSHOT "); + appendStringLiteralConn(query, AH->sync_snapshot_id, conn); + /* actually send the command; merely building it leaves this connection on its own snapshot */ + ExecuteSqlStatement(AH, query->data); + destroyPQExpBuffer(query); + } + else + AH->sync_snapshot_id = get_synchronized_snapshot(AH); + } + } + + /* + * Initialize the connection for a new worker process. + */ + void + _SetupWorker(Archive *AHX, RestoreOptions *ropt) + { + setup_connection(AHX, NULL, NULL); + } + + static char* + get_synchronized_snapshot(Archive *fout) + { + char *query = "select pg_export_snapshot()"; + char *result; + PGresult *res; + + res = ExecuteSqlQueryForSingleRow(fout, query); + result = strdup(PQgetvalue(res, 0, 0)); + PQclear(res); + + return result; } static ArchiveFormat *************** dumpTableData_copy(Archive *fout, void * *** 1280,1285 **** --- 1376,1386 ---- const bool hasoids = tbinfo->hasoids; const bool oids = tdinfo->oids; PQExpBuffer q = createPQExpBuffer(); + /* + * Note: can't use getThreadLocalPQExpBuffer() here, we're calling fmtId which + * uses it already.
+ */ + PQExpBuffer clistBuf = createPQExpBuffer(); PGconn *conn = GetConnection(fout); PGresult *res; int ret; *************** dumpTableData_copy(Archive *fout, void * *** 1304,1317 **** * cases involving ADD COLUMN and inheritance.) */ if (fout->remoteVersion >= 70300) ! column_list = fmtCopyColumnList(tbinfo); else column_list = ""; /* can't select columns in COPY */ if (oids && hasoids) { appendPQExpBuffer(q, "COPY %s %s WITH OIDS TO stdout;", ! fmtQualifiedId(fout, tbinfo->dobj.namespace->dobj.name, classname), column_list); --- 1405,1418 ---- * cases involving ADD COLUMN and inheritance.) */ if (fout->remoteVersion >= 70300) ! column_list = fmtCopyColumnList(tbinfo, clistBuf); else column_list = ""; /* can't select columns in COPY */ if (oids && hasoids) { appendPQExpBuffer(q, "COPY %s %s WITH OIDS TO stdout;", ! fmtQualifiedId(fout->remoteVersion, tbinfo->dobj.namespace->dobj.name, classname), column_list); *************** dumpTableData_copy(Archive *fout, void * *** 1329,1335 **** else appendPQExpBufferStr(q, "* "); appendPQExpBuffer(q, "FROM %s %s) TO stdout;", ! fmtQualifiedId(fout, tbinfo->dobj.namespace->dobj.name, classname), tdinfo->filtercond); --- 1430,1436 ---- else appendPQExpBufferStr(q, "* "); appendPQExpBuffer(q, "FROM %s %s) TO stdout;", ! fmtQualifiedId(fout->remoteVersion, tbinfo->dobj.namespace->dobj.name, classname), tdinfo->filtercond); *************** dumpTableData_copy(Archive *fout, void * *** 1337,1349 **** else { appendPQExpBuffer(q, "COPY %s %s TO stdout;", ! fmtQualifiedId(fout, tbinfo->dobj.namespace->dobj.name, classname), column_list); } res = ExecuteSqlQuery(fout, q->data, PGRES_COPY_OUT); PQclear(res); for (;;) { --- 1438,1451 ---- else { appendPQExpBuffer(q, "COPY %s %s TO stdout;", ! 
fmtQualifiedId(fout->remoteVersion, tbinfo->dobj.namespace->dobj.name, classname), column_list); } res = ExecuteSqlQuery(fout, q->data, PGRES_COPY_OUT); PQclear(res); + destroyPQExpBuffer(clistBuf); for (;;) { *************** dumpTableData_insert(Archive *fout, void *** 1462,1468 **** { appendPQExpBuffer(q, "DECLARE _pg_dump_cursor CURSOR FOR " "SELECT * FROM ONLY %s", ! fmtQualifiedId(fout, tbinfo->dobj.namespace->dobj.name, classname)); } --- 1564,1570 ---- { appendPQExpBuffer(q, "DECLARE _pg_dump_cursor CURSOR FOR " "SELECT * FROM ONLY %s", ! fmtQualifiedId(fout->remoteVersion, tbinfo->dobj.namespace->dobj.name, classname)); } *************** dumpTableData_insert(Archive *fout, void *** 1470,1476 **** { appendPQExpBuffer(q, "DECLARE _pg_dump_cursor CURSOR FOR " "SELECT * FROM %s", ! fmtQualifiedId(fout, tbinfo->dobj.namespace->dobj.name, classname)); } --- 1572,1578 ---- { appendPQExpBuffer(q, "DECLARE _pg_dump_cursor CURSOR FOR " "SELECT * FROM %s", ! fmtQualifiedId(fout->remoteVersion, tbinfo->dobj.namespace->dobj.name, classname)); } *************** dumpTableData(Archive *fout, TableDataIn *** 1602,1607 **** --- 1704,1710 ---- { TableInfo *tbinfo = tdinfo->tdtable; PQExpBuffer copyBuf = createPQExpBuffer(); + PQExpBuffer clistBuf = createPQExpBuffer(); DataDumperPtr dumpFn; char *copyStmt; *************** dumpTableData(Archive *fout, TableDataIn *** 1613,1619 **** appendPQExpBuffer(copyBuf, "COPY %s ", fmtId(tbinfo->dobj.name)); appendPQExpBuffer(copyBuf, "%s %sFROM stdin;\n", ! fmtCopyColumnList(tbinfo), (tdinfo->oids && tbinfo->hasoids) ? "WITH OIDS " : ""); copyStmt = copyBuf->data; } --- 1716,1722 ---- appendPQExpBuffer(copyBuf, "COPY %s ", fmtId(tbinfo->dobj.name)); appendPQExpBuffer(copyBuf, "%s %sFROM stdin;\n", ! fmtCopyColumnList(tbinfo, clistBuf), (tdinfo->oids && tbinfo->hasoids) ? 
"WITH OIDS " : ""); copyStmt = copyBuf->data; } *************** dumpTableData(Archive *fout, TableDataIn *** 1638,1643 **** --- 1741,1747 ---- dumpFn, tdinfo); destroyPQExpBuffer(copyBuf); + destroyPQExpBuffer(clistBuf); } /* *************** getTables(Archive *fout, int *numTables) *** 4117,4122 **** --- 4221,4227 ---- int i_reloptions; int i_toastreloptions; int i_reloftype; + int i_relpages; /* Make sure we are in proper schema */ selectSourceSchema(fout, "pg_catalog"); *************** getTables(Archive *fout, int *numTables) *** 4156,4161 **** --- 4261,4267 ---- "c.relfrozenxid, tc.oid AS toid, " "tc.relfrozenxid AS tfrozenxid, " "c.relpersistence, pg_relation_is_scannable(c.oid) as isscannable, " + "c.relpages, " "CASE WHEN c.reloftype <> 0 THEN c.reloftype::pg_catalog.regtype ELSE NULL END AS reloftype, " "d.refobjid AS owning_tab, " "d.refobjsubid AS owning_col, " *************** getTables(Archive *fout, int *numTables) *** 4192,4197 **** --- 4298,4304 ---- "c.relfrozenxid, tc.oid AS toid, " "tc.relfrozenxid AS tfrozenxid, " "'p' AS relpersistence, 't'::bool as isscannable, " + "c.relpages, " "CASE WHEN c.reloftype <> 0 THEN c.reloftype::pg_catalog.regtype ELSE NULL END AS reloftype, " "d.refobjid AS owning_tab, " "d.refobjsubid AS owning_col, " *************** getTables(Archive *fout, int *numTables) *** 4227,4232 **** --- 4334,4340 ---- "c.relfrozenxid, tc.oid AS toid, " "tc.relfrozenxid AS tfrozenxid, " "'p' AS relpersistence, 't'::bool as isscannable, " + "c.relpages, " "NULL AS reloftype, " "d.refobjid AS owning_tab, " "d.refobjsubid AS owning_col, " *************** getTables(Archive *fout, int *numTables) *** 4262,4267 **** --- 4370,4376 ---- "c.relfrozenxid, tc.oid AS toid, " "tc.relfrozenxid AS tfrozenxid, " "'p' AS relpersistence, 't'::bool as isscannable, " + "c.relpages, " "NULL AS reloftype, " "d.refobjid AS owning_tab, " "d.refobjsubid AS owning_col, " *************** getTables(Archive *fout, int *numTables) *** 4298,4303 **** --- 4407,4413 ---- 
"0 AS toid, " "0 AS tfrozenxid, " "'p' AS relpersistence, 't'::bool as isscannable, " + "relpages, " "NULL AS reloftype, " "d.refobjid AS owning_tab, " "d.refobjsubid AS owning_col, " *************** getTables(Archive *fout, int *numTables) *** 4333,4338 **** --- 4443,4449 ---- "0 AS toid, " "0 AS tfrozenxid, " "'p' AS relpersistence, 't'::bool as isscannable, " + "relpages, " "NULL AS reloftype, " "d.refobjid AS owning_tab, " "d.refobjsubid AS owning_col, " *************** getTables(Archive *fout, int *numTables) *** 4364,4369 **** --- 4475,4481 ---- "0 AS toid, " "0 AS tfrozenxid, " "'p' AS relpersistence, 't'::bool as isscannable, " + "relpages, " "NULL AS reloftype, " "NULL::oid AS owning_tab, " "NULL::int4 AS owning_col, " *************** getTables(Archive *fout, int *numTables) *** 4390,4395 **** --- 4502,4508 ---- "0 AS toid, " "0 AS tfrozenxid, " "'p' AS relpersistence, 't'::bool as isscannable, " + "relpages, " "NULL AS reloftype, " "NULL::oid AS owning_tab, " "NULL::int4 AS owning_col, " *************** getTables(Archive *fout, int *numTables) *** 4426,4431 **** --- 4539,4545 ---- "0 AS toid, " "0 AS tfrozenxid, " "'p' AS relpersistence, 't'::bool as isscannable, " + "0 AS relpages, " "NULL AS reloftype, " "NULL::oid AS owning_tab, " "NULL::int4 AS owning_col, " *************** getTables(Archive *fout, int *numTables) *** 4474,4479 **** --- 4588,4594 ---- i_toastfrozenxid = PQfnumber(res, "tfrozenxid"); i_relpersistence = PQfnumber(res, "relpersistence"); i_isscannable = PQfnumber(res, "isscannable"); + i_relpages = PQfnumber(res, "relpages"); i_owning_tab = PQfnumber(res, "owning_tab"); i_owning_col = PQfnumber(res, "owning_col"); i_reltablespace = PQfnumber(res, "reltablespace"); *************** getTables(Archive *fout, int *numTables) *** 4516,4521 **** --- 4631,4637 ---- tblinfo[i].hastriggers = (strcmp(PQgetvalue(res, i, i_relhastriggers), "t") == 0); tblinfo[i].hasoids = (strcmp(PQgetvalue(res, i, i_relhasoids), "t") == 0); tblinfo[i].isscannable = 
(strcmp(PQgetvalue(res, i, i_isscannable), "t") == 0); + tblinfo[i].relpages = atoi(PQgetvalue(res, i, i_relpages)); tblinfo[i].frozenxid = atooid(PQgetvalue(res, i, i_relfrozenxid)); tblinfo[i].toast_oid = atooid(PQgetvalue(res, i, i_toastoid)); tblinfo[i].toast_frozenxid = atooid(PQgetvalue(res, i, i_toastfrozenxid)); *************** getTables(Archive *fout, int *numTables) *** 4565,4571 **** resetPQExpBuffer(query); appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE", ! fmtQualifiedId(fout, tblinfo[i].dobj.namespace->dobj.name, tblinfo[i].dobj.name)); ExecuteSqlStatement(fout, query->data); --- 4681,4687 ---- resetPQExpBuffer(query); appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE", ! fmtQualifiedId(fout->remoteVersion, tblinfo[i].dobj.namespace->dobj.name, tblinfo[i].dobj.name)); ExecuteSqlStatement(fout, query->data); *************** getIndexes(Archive *fout, TableInfo tbli *** 4704,4710 **** i_conoid, i_condef, i_tablespace, ! i_options; int ntups; for (i = 0; i < numTables; i++) --- 4820,4827 ---- i_conoid, i_condef, i_tablespace, ! i_options, ! 
i_relpages; int ntups; for (i = 0; i < numTables; i++) *************** getIndexes(Archive *fout, TableInfo tbli *** 4749,4754 **** --- 4866,4872 ---- "pg_catalog.pg_get_indexdef(i.indexrelid) AS indexdef, " "t.relnatts AS indnkeys, " "i.indkey, i.indisclustered, " + "t.relpages, " "c.contype, c.conname, " "c.condeferrable, c.condeferred, " "c.tableoid AS contableoid, " *************** getIndexes(Archive *fout, TableInfo tbli *** 4774,4779 **** --- 4892,4898 ---- "pg_catalog.pg_get_indexdef(i.indexrelid) AS indexdef, " "t.relnatts AS indnkeys, " "i.indkey, i.indisclustered, " + "t.relpages, " "c.contype, c.conname, " "c.condeferrable, c.condeferred, " "c.tableoid AS contableoid, " *************** getIndexes(Archive *fout, TableInfo tbli *** 4802,4807 **** --- 4921,4927 ---- "pg_catalog.pg_get_indexdef(i.indexrelid) AS indexdef, " "t.relnatts AS indnkeys, " "i.indkey, i.indisclustered, " + "t.relpages, " "c.contype, c.conname, " "c.condeferrable, c.condeferred, " "c.tableoid AS contableoid, " *************** getIndexes(Archive *fout, TableInfo tbli *** 4830,4835 **** --- 4950,4956 ---- "pg_catalog.pg_get_indexdef(i.indexrelid) AS indexdef, " "t.relnatts AS indnkeys, " "i.indkey, i.indisclustered, " + "t.relpages, " "c.contype, c.conname, " "c.condeferrable, c.condeferred, " "c.tableoid AS contableoid, " *************** getIndexes(Archive *fout, TableInfo tbli *** 4858,4863 **** --- 4979,4985 ---- "pg_get_indexdef(i.indexrelid) AS indexdef, " "t.relnatts AS indnkeys, " "i.indkey, false AS indisclustered, " + "t.relpages, " "CASE WHEN i.indisprimary THEN 'p'::char " "ELSE '0'::char END AS contype, " "t.relname AS conname, " *************** getIndexes(Archive *fout, TableInfo tbli *** 4884,4889 **** --- 5006,5012 ---- "pg_get_indexdef(i.indexrelid) AS indexdef, " "t.relnatts AS indnkeys, " "i.indkey, false AS indisclustered, " + "t.relpages, " "CASE WHEN i.indisprimary THEN 'p'::char " "ELSE '0'::char END AS contype, " "t.relname AS conname, " *************** 
getIndexes(Archive *fout, TableInfo tbli *** 4912,4917 **** --- 5035,5041 ---- i_indnkeys = PQfnumber(res, "indnkeys"); i_indkey = PQfnumber(res, "indkey"); i_indisclustered = PQfnumber(res, "indisclustered"); + i_relpages = PQfnumber(res, "relpages"); i_contype = PQfnumber(res, "contype"); i_conname = PQfnumber(res, "conname"); i_condeferrable = PQfnumber(res, "condeferrable"); *************** getIndexes(Archive *fout, TableInfo tbli *** 4954,4959 **** --- 5078,5084 ---- parseOidArray(PQgetvalue(res, j, i_indkey), indxinfo[j].indkeys, INDEX_MAX_KEYS); indxinfo[j].indisclustered = (PQgetvalue(res, j, i_indisclustered)[0] == 't'); + indxinfo[j].relpages = atoi(PQgetvalue(res, j, i_relpages)); contype = *(PQgetvalue(res, j, i_contype)); if (contype == 'p' || contype == 'u' || contype == 'x') *************** findDumpableDependencies(ArchiveHandle * *** 14843,14864 **** * * Whenever the selected schema is not pg_catalog, be careful to qualify * references to system catalogs and types in our emitted commands! */ static void selectSourceSchema(Archive *fout, const char *schemaName) { - static char *curSchemaName = NULL; PQExpBuffer query; /* Not relevant if fetching from pre-7.3 DB */ if (fout->remoteVersion < 70300) return; - /* Ignore null schema names */ - if (schemaName == NULL || *schemaName == '\0') - return; - /* Optimize away repeated selection of same schema */ - if (curSchemaName && strcmp(curSchemaName, schemaName) == 0) - return; query = createPQExpBuffer(); appendPQExpBuffer(query, "SET search_path = %s", --- 14968,14988 ---- * * Whenever the selected schema is not pg_catalog, be careful to qualify * references to system catalogs and types in our emitted commands! + * + * This function is called only from selectSourceSchemaOnAH and + * selectSourceSchema. 
*/ static void selectSourceSchema(Archive *fout, const char *schemaName) { PQExpBuffer query; + /* This is checked by the callers already */ + Assert(schemaName != NULL && *schemaName != '\0'); + /* Not relevant if fetching from pre-7.3 DB */ if (fout->remoteVersion < 70300) return; query = createPQExpBuffer(); appendPQExpBuffer(query, "SET search_path = %s", *************** selectSourceSchema(Archive *fout, const *** 14869,14877 **** ExecuteSqlStatement(fout, query->data); destroyPQExpBuffer(query); - if (curSchemaName) - free(curSchemaName); - curSchemaName = pg_strdup(schemaName); } /* --- 14993,14998 ---- *************** myFormatType(const char *typname, int32 *** 15009,15079 **** } /* - * fmtQualifiedId - convert a qualified name to the proper format for - * the source database. - * - * Like fmtId, use the result before calling again. - */ - static const char * - fmtQualifiedId(Archive *fout, const char *schema, const char *id) - { - static PQExpBuffer id_return = NULL; - - if (id_return) /* first time through? */ - resetPQExpBuffer(id_return); - else - id_return = createPQExpBuffer(); - - /* Suppress schema name if fetching from pre-7.3 DB */ - if (fout->remoteVersion >= 70300 && schema && *schema) - { - appendPQExpBuffer(id_return, "%s.", - fmtId(schema)); - } - appendPQExpBuffer(id_return, "%s", - fmtId(id)); - - return id_return->data; - } - - /* * Return a column list clause for the given relation. * * Special case: if there are no undropped columns in the relation, return * "", not an invalid "()" column list. */ static const char * ! fmtCopyColumnList(const TableInfo *ti) { - static PQExpBuffer q = NULL; int numatts = ti->numatts; char **attnames = ti->attnames; bool *attisdropped = ti->attisdropped; bool needComma; int i; ! if (q) /* first time through? */ ! resetPQExpBuffer(q); ! else ! q = createPQExpBuffer(); ! ! appendPQExpBuffer(q, "("); needComma = false; for (i = 0; i < numatts; i++) { if (attisdropped[i]) continue; if (needComma) ! 
appendPQExpBuffer(q, ", "); ! appendPQExpBuffer(q, "%s", fmtId(attnames[i])); needComma = true; } if (!needComma) return ""; /* no undropped columns */ ! appendPQExpBuffer(q, ")"); ! return q->data; } /* --- 15130,15166 ---- } /* * Return a column list clause for the given relation. * * Special case: if there are no undropped columns in the relation, return * "", not an invalid "()" column list. + * + * The list is appended to the caller-supplied buffer, which is not reset + * here. In the special case "(" has already been appended to it, so callers + * must use the return value rather than buffer->data. */ static const char * ! fmtCopyColumnList(const TableInfo *ti, PQExpBuffer buffer) { int numatts = ti->numatts; char **attnames = ti->attnames; bool *attisdropped = ti->attisdropped; bool needComma; int i; ! appendPQExpBuffer(buffer, "("); needComma = false; for (i = 0; i < numatts; i++) { if (attisdropped[i]) continue; if (needComma) ! appendPQExpBuffer(buffer, ", "); ! appendPQExpBuffer(buffer, "%s", fmtId(attnames[i])); needComma = true; } if (!needComma) return ""; /* no undropped columns */ ! appendPQExpBuffer(buffer, ")"); ! return buffer->data; } /* diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h new file mode 100644 index 01ec27b..7970a35 *** a/src/bin/pg_dump/pg_dump.h --- b/src/bin/pg_dump/pg_dump.h *************** typedef struct _tableInfo *** 252,257 **** --- 252,258 ---- /* these two are set only if table is a sequence owned by a column: */ Oid owning_tab; /* OID of table owning sequence */ int owning_col; /* attr # of column owning sequence */ + int relpages; /* relpages as fetched by getTables (not part of the sequence-only pair above) */ bool interesting; /* true if need to collect more data */ *************** typedef struct _indxInfo *** 315,320 **** --- 316,322 ---- bool indisclustered; /* if there is an associated constraint object, its dumpId: */ DumpId indexconstraint; + int relpages; /* relpages as read from t.relpages in getIndexes; NOTE(review): t appears to be the index's own pg_class row rather than the table's -- confirm */ } IndxInfo; typedef struct _ruleInfo *************** extern void sortDumpableObjects(Dumpable *** 532,537 **** --- 534,540 ---- DumpId preBoundaryId, DumpId postBoundaryId); extern void sortDumpableObjectsByTypeName(DumpableObject **objs, int numObjs); extern void 
sortDumpableObjectsByTypeOid(DumpableObject **objs, int numObjs);
+ extern void sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs);
  
  /*
   * version specific routines
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
new file mode 100644
index 2c3d850..f1130c0
*** a/src/bin/pg_dump/pg_dump_sort.c
--- b/src/bin/pg_dump/pg_dump_sort.c
*************** static void repairDependencyLoop(Dumpabl
*** 143,148 ****
--- 143,256 ----
  static void describeDumpableObject(DumpableObject *obj,
                         char *buf, int bufsize);
  
+ static int DOSizeCompare(const void *p1, const void *p2);
+ 
+ /*
+  * Return the index of the first element of objs[0 .. numObjs-1] whose
+  * objType equals "type", or -1 if there is no such element.
+  */
+ static int
+ findFirstEqualType(DumpableObjectType type, DumpableObject **objs, int numObjs)
+ {
+     int i;
+     for (i = 0; i < numObjs; i++)
+         if (objs[i]->objType == type)
+             return i;
+     return -1;
+ }
+ 
+ /*
+  * Return the index of the first element at or after "start" whose objType
+  * differs from "type".  If every remaining element has that type, return
+  * numObjs (one past the last element), so that the result can always be
+  * used as the exclusive end of the run that begins at "start".
+  */
+ static int
+ findFirstDifferentType(DumpableObjectType type, DumpableObject **objs, int numObjs, int start)
+ {
+     int i;
+     for (i = start; i < numObjs; i++)
+         if (objs[i]->objType != type)
+             return i;
+     /* the run reaches the end of the array; do not cut off its last member */
+     return numObjs;
+ }
+ 
+ /*
+  * When we do a parallel dump, we want to start with the largest items first.
+  *
+  * Say we have the objects in this order:
+  * ....DDDDD....III....
+  *
+  * with D = Table data, I = Index, . = other object
+  *
+  * This sorting function now takes each of the D or I blocks and sorts them
+  * according to their size.
+  *
+  * Only the first contiguous block of each of the two kinds is sorted; all
+  * other array entries keep their positions.
+  */
+ void
+ sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs)
+ {
+     int     startIdx, endIdx;
+     void   *startPtr;
+ 
+     if (numObjs <= 1)
+         return;
+ 
+     startIdx = findFirstEqualType(DO_TABLE_DATA, objs, numObjs);
+     if (startIdx >= 0)
+     {
+         /* [startIdx, endIdx) is the block of table data entries */
+         endIdx = findFirstDifferentType(DO_TABLE_DATA, objs, numObjs, startIdx);
+         startPtr = objs + startIdx;
+         qsort(startPtr, endIdx - startIdx, sizeof(DumpableObject *),
+               DOSizeCompare);
+     }
+ 
+     startIdx = findFirstEqualType(DO_INDEX, objs, numObjs);
+     if (startIdx >= 0)
+     {
+         /* [startIdx, endIdx) is the block of index entries */
+         endIdx = findFirstDifferentType(DO_INDEX, objs, numObjs, startIdx);
+         startPtr = objs + startIdx;
+         qsort(startPtr, endIdx - startIdx, sizeof(DumpableObject *),
+               DOSizeCompare);
+     }
+ }
+ 
+ /*
+  * qsort comparator used by sortDataAndIndexObjectsBySize.  Orders
+  * DumpableObject pointers by descending relpages; an object that is neither
+  * table data nor an index is compared as if its size were 0.
+  */
+ static int
+ DOSizeCompare(const void *p1, const void *p2)
+ {
+     DumpableObject *obj1 = *(DumpableObject **) p1;
+     DumpableObject *obj2 = *(DumpableObject **) p2;
+     int         obj1_size = 0;
+     int         obj2_size = 0;
+ 
+     if (obj1->objType == DO_TABLE_DATA)
+         obj1_size = ((TableDataInfo *) obj1)->tdtable->relpages;
+     if (obj1->objType == DO_INDEX)
+         obj1_size = ((IndxInfo *) obj1)->relpages;
+ 
+     if (obj2->objType == DO_TABLE_DATA)
+         obj2_size = ((TableDataInfo *) obj2)->tdtable->relpages;
+     if (obj2->objType == DO_INDEX)
+         obj2_size = ((IndxInfo *) obj2)->relpages;
+ 
+     /* we want to see the biggest item go first */
+     if (obj1_size > obj2_size)
+         return -1;
+     if (obj2_size > obj1_size)
+         return 1;
+ 
+     return 0;
+ }
  
  /*
   * Sort the given objects into a type/name-based ordering
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
new file mode 100644
index 5dbe98f..9b36a09
*** a/src/bin/pg_dump/pg_restore.c
--- b/src/bin/pg_dump/pg_restore.c
*************** main(int argc, char **argv)
*** 71,76 ****
--- 71,77 ----
      RestoreOptions *opts;
      int         c;
      int         exit_code;
+     int         numWorkers = 1;
      Archive    *AH;
      char       *inputFileSpec;
      static int  disable_triggers = 0;
*************** main(int argc, char **argv)
*** 182,188 ****
                  break;
  
              case 'j':           /* number of restore jobs */
! 
opts->number_of_jobs = atoi(optarg); break; case 'l': /* Dump the TOC summary */ --- 183,189 ---- break; case 'j': /* number of restore jobs */ ! numWorkers = atoi(optarg); break; case 'l': /* Dump the TOC summary */ *************** main(int argc, char **argv) *** 313,319 **** } /* Can't do single-txn mode with multiple connections */ ! if (opts->single_txn && opts->number_of_jobs > 1) { fprintf(stderr, _("%s: cannot specify both --single-transaction and multiple jobs\n"), progname); --- 314,320 ---- } /* Can't do single-txn mode with multiple connections */ ! if (opts->single_txn && numWorkers > 1) { fprintf(stderr, _("%s: cannot specify both --single-transaction and multiple jobs\n"), progname); *************** main(int argc, char **argv) *** 372,377 **** --- 373,390 ---- if (opts->tocFile) SortTocFromFile(AH, opts); + /* See comments in pg_dump.c */ + #ifdef WIN32 + if (numWorkers > MAXIMUM_WAIT_OBJECTS) + { + fprintf(stderr, _("%s: maximum number of parallel jobs is %d\n"), + progname, MAXIMUM_WAIT_OBJECTS); + exit(1); + } + #endif + + AH->numWorkers = numWorkers; + if (opts->tocSummary) PrintTOCSummary(AH, opts); else *************** main(int argc, char **argv) *** 393,398 **** --- 406,418 ---- return exit_code; } + /* + * _SetupWorker for pg_restore: casts the Archive to its ArchiveHandle and + * invokes the handle's ReopenPtr callback on it. ropt is not used here. + */ + void + _SetupWorker(Archive *AHX, RestoreOptions *ropt) + { + ArchiveHandle *AH = (ArchiveHandle *) AHX; + (AH->ReopenPtr) (AH); + } + static void usage(const char *progname) {