WIP patch for parallel pg_dump
This is the second patch for parallel pg_dump, now containing the actual part
that parallelizes the whole thing. More precisely, it adds parallel
backup/restore to pg_dump/pg_restore for the directory archive format and
keeps the parallel restore part of the custom archive format. It applies on
top of my previous directory archive format patch, which also includes a
prototype of the liblzf compression; that compression can be combined with any
of the backup/restore scenarios just mentioned.
You would run a regular parallel dump with
$ pg_dump -j 4 -Fd -f out.dir dbname
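For the record, such an archive would then be restored in parallel via
pg_restore's existing -j option, along the lines of
$ pg_restore -j 4 -d dbname out.dir
(modulo the exact options).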
In previous discussions there was a request to add support for multiple
directories, which I have added as well, so that you can also run
$ pg_dump -j 4 -Fd -f dir1:dir2:dir3 dbname
to distribute the data equally among those three directories (we can still
discuss the syntax, I am not all that happy with the colon either...).
The dump always starts with the largest objects, estimated from the relpages
column of pg_class, which should give a good approximation. The order of the
objects to restore is determined by the dependencies among the objects (this
is what the parallel restore of the custom archive format already uses).
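Regarding the size estimate: the relpages-based ordering corresponds roughly
to what you get from
$ psql -c "SELECT relname, relpages FROM pg_class WHERE relkind = 'r' ORDER BY relpages DESC" dbname
(illustration only, not necessarily the exact query that pg_dump runs).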
The file test.sh contains some example commands that I have been running here
as a kind of regression test; they should give you an impression of how to
call the programs from the command line.
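Roughly, the commands in there are of this flavor (made-up examples in the
spirit of test.sh, not its literal contents):
$ pg_dump -j 4 -Fd -f dir1:dir2 dbname
$ pg_restore -j 4 -Fd -d otherdb dir1:dir2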
One thing that is currently missing is proper support for Windows; that is the
next thing I will be working on. Also, this version still prints quite a bit
of debug information about what the processes are doing, so don't try to pipe
the pg_dump output anywhere (even when not run in parallel); it will probably
just not work...
The missing part that would make parallel pg_dump work with no strings
attached is snapshot synchronization. As long as there are no synchronized
snapshots, you need to stop writing to your database before starting the
parallel pg_dump. However, it turns out that most often, when you are
especially concerned about a fast dump, you have shut down your applications
anyway (which is the reason why you are so concerned about speed in the first
place). These cases are typically database migrations from one host/platform
to another, or database upgrades without pg_migrator.
Joachim
Attachments:
pg_dump-parallel.diff (text/x-patch, charset=US-ASCII)
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 5def7a7..984e700 100644
*** a/src/bin/pg_dump/pg_backup.h
--- b/src/bin/pg_dump/pg_backup.h
*************** extern void ArchiveEntry(Archive *AHX,
*** 171,177 ****
CatalogId catalogId, DumpId dumpId,
const char *tag,
const char *namespace, const char *tablespace,
! const char *owner, bool withOids,
const char *desc, teSection section,
const char *defn,
const char *dropStmt, const char *copyStmt,
--- 171,178 ----
CatalogId catalogId, DumpId dumpId,
const char *tag,
const char *namespace, const char *tablespace,
! const char *owner,
! unsigned long int relpages, bool withOids,
const char *desc, teSection section,
const char *defn,
const char *dropStmt, const char *copyStmt,
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index c5b5fcc..e00505e 100644
*** a/src/bin/pg_dump/pg_backup_archiver.c
--- b/src/bin/pg_dump/pg_backup_archiver.c
***************
*** 25,30 ****
--- 25,31 ----
#include "compress_io.h"
#include <ctype.h>
+ #include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
***************
*** 44,86 ****
#define WORKER_INHIBIT_DATA 11
#define WORKER_IGNORED_ERRORS 12
- /*
- * Unix uses exit to return result from worker child, so function is void.
- * Windows thread result comes via function return.
- */
- #ifndef WIN32
- #define parallel_restore_result void
- #else
- #define parallel_restore_result DWORD
- #endif
-
- /* IDs for worker children are either PIDs or thread handles */
- #ifndef WIN32
- #define thandle pid_t
- #else
- #define thandle HANDLE
- #endif
-
- /* Arguments needed for a worker child */
- typedef struct _restore_args
- {
- ArchiveHandle *AH;
- TocEntry *te;
- } RestoreArgs;
-
- /* State for each parallel activity slot */
- typedef struct _parallel_slot
- {
- thandle child_id;
- RestoreArgs *args;
- } ParallelSlot;
-
- #define NO_SLOT (-1)
-
const char *progname;
static const char *modulename = gettext_noop("archiver");
static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
const int compression, ArchiveMode mode);
--- 45,56 ----
#define WORKER_INHIBIT_DATA 11
#define WORKER_IGNORED_ERRORS 12
const char *progname;
static const char *modulename = gettext_noop("archiver");
+ PGconn **g_conn_child;
+ PGconn *g_conn;
static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
const int compression, ArchiveMode mode);
*************** static void ResetOutput(ArchiveHandle *A
*** 119,139 ****
static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
RestoreOptions *ropt, bool is_parallel);
! static void restore_toc_entries_parallel(ArchiveHandle *AH);
! static thandle spawn_restore(RestoreArgs *args);
! static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
! static bool work_in_progress(ParallelSlot *slots, int n_slots);
! static int get_next_slot(ParallelSlot *slots, int n_slots);
static void par_list_header_init(TocEntry *l);
static void par_list_append(TocEntry *l, TocEntry *te);
static void par_list_remove(TocEntry *te);
static TocEntry *get_next_work_item(ArchiveHandle *AH,
TocEntry *ready_list,
! ParallelSlot *slots, int n_slots);
! static parallel_restore_result parallel_restore(RestoreArgs *args);
static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
! thandle worker, int status,
! ParallelSlot *slots, int n_slots);
static void fix_dependencies(ArchiveHandle *AH);
static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
static void repoint_table_dependencies(ArchiveHandle *AH,
--- 89,104 ----
static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
RestoreOptions *ropt, bool is_parallel);
! static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate);
static void par_list_header_init(TocEntry *l);
static void par_list_append(TocEntry *l, TocEntry *te);
static void par_list_remove(TocEntry *te);
static TocEntry *get_next_work_item(ArchiveHandle *AH,
TocEntry *ready_list,
! ParallelState *pstate);
static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
! int worker, int status,
! ParallelState *pstate);
static void fix_dependencies(ArchiveHandle *AH);
static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
static void repoint_table_dependencies(ArchiveHandle *AH,
*************** static void reduce_dependencies(ArchiveH
*** 145,153 ****
TocEntry *ready_list);
static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
- static ArchiveHandle *CloneArchive(ArchiveHandle *AH);
- static void DeCloneArchive(ArchiveHandle *AH);
/*
* Wrapper functions.
--- 110,123 ----
TocEntry *ready_list);
static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
+ static void ListenToChildren(ArchiveHandle *AH, ParallelState *pstate, bool do_wait);
+ static void WaitForCommands(ArchiveHandle *AH, int, int);
+ static void PrintStatus(ParallelState *pstate);
+ static int GetIdleChild(ParallelState *pstate);
+ static int ReapChildStatus(ParallelState *pstate, int *status);
+ static bool HasEveryChildTerminated(ParallelState *pstate);
+ static bool IsEveryChildIdle(ParallelState *pstate);
/*
* Wrapper functions.
*************** RestoreArchive(Archive *AHX, RestoreOpti
*** 245,251 ****
}
#endif
#ifndef HAVE_LIBLZF
- /* XXX are these checks correct?? */
if (AH->compression == COMPR_LZF_CODE && AH->PrintTocDataPtr !=NULL)
{
for (te = AH->toc->next; te != AH->toc; te = te->next)
--- 215,220 ----
*************** RestoreArchive(Archive *AHX, RestoreOpti
*** 389,395 ****
* In parallel mode, turn control over to the parallel-restore logic.
*/
if (ropt->number_of_jobs > 1 && ropt->useDB)
! restore_toc_entries_parallel(AH);
else
{
for (te = AH->toc->next; te != AH->toc; te = te->next)
--- 358,370 ----
* In parallel mode, turn control over to the parallel-restore logic.
*/
if (ropt->number_of_jobs > 1 && ropt->useDB)
! {
! ParallelState pstate;
! /* this will actually fork the processes */
! pstate = ParallelBackupStart(AH, ropt->number_of_jobs, ropt);
! restore_toc_entries_parallel(AH, &pstate);
! ParallelBackupEnd(AH, &pstate);
! }
else
{
for (te = AH->toc->next; te != AH->toc; te = te->next)
*************** ArchiveEntry(Archive *AHX,
*** 728,734 ****
const char *tag,
const char *namespace,
const char *tablespace,
! const char *owner, bool withOids,
const char *desc, teSection section,
const char *defn,
const char *dropStmt, const char *copyStmt,
--- 703,710 ----
const char *tag,
const char *namespace,
const char *tablespace,
! const char *owner,
! unsigned long int relpages, bool withOids,
const char *desc, teSection section,
const char *defn,
const char *dropStmt, const char *copyStmt,
*************** _discoverArchiveFormat(ArchiveHandle *AH
*** 1831,1839 ****
strcpy(buf, AH->fSpec);
fh = fopen(buf, PG_BINARY_R);
! if (!fh)
! die_horribly(AH, modulename, "could not open input file \"%s\": %s\n",
! AH->fSpec, strerror(errno));
}
else
{
--- 1807,1821 ----
strcpy(buf, AH->fSpec);
fh = fopen(buf, PG_BINARY_R);
! if (!fh) {
! const char* dirhint = "";
! if (strchr(buf, ':'))
! {
! dirhint = _(" (for multiple directories, please use -Fd explicitly)");
! }
! die_horribly(AH, modulename, "could not open input file \"%s\": %s%s\n",
! AH->fSpec, strerror(errno), dirhint);
! }
}
else
{
*************** _allocAH(const char *FileSpec, const Arc
*** 2065,2118 ****
return AH;
}
-
-
void
WriteDataChunks(ArchiveHandle *AH)
{
! TocEntry *te;
! StartDataPtr startPtr;
! EndDataPtr endPtr;
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
! if (te->dataDumper != NULL)
{
! AH->currToc = te;
! /* printf("Writing data for %d (%x)\n", te->id, te); */
! if (strcmp(te->desc, "BLOBS") == 0)
! {
! startPtr = AH->StartBlobsPtr;
! endPtr = AH->EndBlobsPtr;
! }
! else
{
! startPtr = AH->StartDataPtr;
! endPtr = AH->EndDataPtr;
! }
! if (startPtr != NULL)
! (*startPtr) (AH, te);
! /*
! * printf("Dumper arg for %d is %x\n", te->id, te->dataDumperArg);
! */
! /*
! * The user-provided DataDumper routine needs to call
! * AH->WriteData
! */
! (*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
! if (endPtr != NULL)
! (*endPtr) (AH, te);
! AH->currToc = NULL;
}
}
}
void
WriteToc(ArchiveHandle *AH)
{
TocEntry *te;
--- 2047,2161 ----
return AH;
}
void
WriteDataChunks(ArchiveHandle *AH)
{
! TocEntry *te;
! ParallelState *pstate = NULL;
!
! if (AH->GetParallelStatePtr)
! pstate = (AH->GetParallelStatePtr)(AH);
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
! if (!te->hadDumper)
! continue;
!
! printf("Dumping table %s (%d)\n", te->tag, te->dumpId);
! fflush(stdout);
! /*
! * If we are in a parallel backup, we are always the master process.
! */
! if (pstate)
{
! int ret_child;
! int work_status;
! for (;;)
{
! int nTerm = 0;
! while ((ret_child = ReapChildStatus(pstate, &work_status)) != NO_SLOT)
! {
! if (work_status != 0)
! die_horribly(AH, modulename, "Error processing a parallel work item\n");
! nTerm++;
! }
! /* We need to make sure that we have an idle child before dispatching
! * the next item. If nTerm > 0 we already have that (quick check). */
! if (nTerm > 0)
! break;
! /* explicit check for an idle child */
! if (GetIdleChild(pstate) != NO_SLOT)
! break;
! /*
! * If we have no idle child, read the result of one or more
! * children and loop the loop to call ReapChildStatus() on them
! */
! ListenToChildren(AH, pstate, true);
! }
!
! Assert(GetIdleChild(pstate) != NO_SLOT);
! DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP);
! }
! else
! {
! WriteDataChunksForTocEntry(AH, te);
! }
! }
! if (pstate)
! {
! int ret_child;
! int work_status;
!
! /* Waiting for the worker processes to finish */
! /* XXX "worker" vs "child" */
! while (!IsEveryChildIdle(pstate))
! {
! if ((ret_child = ReapChildStatus(pstate, &work_status)) == NO_SLOT)
! ListenToChildren(AH, pstate, true);
}
}
}
void
+ WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
+ {
+ StartDataPtr startPtr;
+ EndDataPtr endPtr;
+
+ AH->currToc = te;
+
+ if (strcmp(te->desc, "BLOBS") == 0)
+ {
+ startPtr = AH->StartBlobsPtr;
+ endPtr = AH->EndBlobsPtr;
+ }
+ else
+ {
+ startPtr = AH->StartDataPtr;
+ endPtr = AH->EndDataPtr;
+ }
+
+ if (startPtr != NULL)
+ (*startPtr) (AH, te);
+
+ /*
+ * The user-provided DataDumper routine needs to call
+ * AH->WriteData
+ */
+ (*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
+
+ if (endPtr != NULL)
+ (*endPtr) (AH, te);
+
+ AH->currToc = NULL;
+ }
+
+ void
WriteToc(ArchiveHandle *AH)
{
TocEntry *te;
*************** dumpTimestamp(ArchiveHandle *AH, const c
*** 3269,3281 ****
* entries in a single connection (that happens back in RestoreArchive).
*/
static void
! restore_toc_entries_parallel(ArchiveHandle *AH)
{
RestoreOptions *ropt = AH->ropt;
- int n_slots = ropt->number_of_jobs;
- ParallelSlot *slots;
int work_status;
- int next_slot;
TocEntry pending_list;
TocEntry ready_list;
TocEntry *next_work_item;
--- 3312,3321 ----
* entries in a single connection (that happens back in RestoreArchive).
*/
static void
! restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate)
{
RestoreOptions *ropt = AH->ropt;
int work_status;
TocEntry pending_list;
TocEntry ready_list;
TocEntry *next_work_item;
*************** restore_toc_entries_parallel(ArchiveHand
*** 3292,3299 ****
if (AH->version < K_VERS_1_8)
die_horribly(AH, modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n");
- slots = (ParallelSlot *) calloc(sizeof(ParallelSlot), n_slots);
-
/* Adjust dependency information */
fix_dependencies(AH);
--- 3332,3337 ----
*************** restore_toc_entries_parallel(ArchiveHand
*** 3362,3368 ****
--- 3400,3409 ----
if (next_work_item->depCount > 0)
par_list_append(&pending_list, next_work_item);
else
+ {
+ printf("Appending %d to ready_list\n", next_work_item->dumpId);
par_list_append(&ready_list, next_work_item);
+ }
}
}
*************** restore_toc_entries_parallel(ArchiveHand
*** 3376,3383 ****
ahlog(AH, 1, "entering main parallel loop\n");
while ((next_work_item = get_next_work_item(AH, &ready_list,
! slots, n_slots)) != NULL ||
! work_in_progress(slots, n_slots))
{
if (next_work_item != NULL)
{
--- 3417,3424 ----
ahlog(AH, 1, "entering main parallel loop\n");
while ((next_work_item = get_next_work_item(AH, &ready_list,
! pstate)) != NULL ||
! !IsEveryChildIdle(pstate))
{
if (next_work_item != NULL)
{
*************** restore_toc_entries_parallel(ArchiveHand
*** 3397,3447 ****
continue;
}
! if ((next_slot = get_next_slot(slots, n_slots)) != NO_SLOT)
! {
! /* There is work still to do and a worker slot available */
! thandle child;
! RestoreArgs *args;
!
! ahlog(AH, 1, "launching item %d %s %s\n",
! next_work_item->dumpId,
! next_work_item->desc, next_work_item->tag);
! par_list_remove(next_work_item);
! /* this memory is dealloced in mark_work_done() */
! args = malloc(sizeof(RestoreArgs));
! args->AH = CloneArchive(AH);
! args->te = next_work_item;
! /* run the step in a worker child */
! child = spawn_restore(args);
! slots[next_slot].child_id = child;
! slots[next_slot].args = args;
! continue;
}
- }
! /*
! * If we get here there must be work being done. Either there is no
! * work available to schedule (and work_in_progress returned true) or
! * there are no slots available. So we wait for a worker to finish,
! * and process the result.
! */
! ret_child = reap_child(slots, n_slots, &work_status);
! if (WIFEXITED(work_status))
! {
! mark_work_done(AH, &ready_list,
! ret_child, WEXITSTATUS(work_status),
! slots, n_slots);
! }
! else
! {
! die_horribly(AH, modulename, "worker process crashed: status %d\n",
! work_status);
}
}
--- 3438,3496 ----
continue;
}
! ahlog(AH, 1, "launching item %d %s %s\n",
! next_work_item->dumpId,
! next_work_item->desc, next_work_item->tag);
! par_list_remove(next_work_item);
! Assert(GetIdleChild(pstate) != NO_SLOT);
! DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE);
! }
! else
! {
! /* at least one child is working and we have nothing ready. */
! Assert(!IsEveryChildIdle(pstate));
! }
! for (;;)
! {
! int nTerm = 0;
! /*
! * In order to reduce dependencies as soon as possible and
! * especially to reap the status of children who are working on
! * items that pending items depend on, we do a non-blocking check
! * for ended children first.
! *
! * However, if we do not have any other work items currently that
! * children can work on, we do not busy-loop here but instead
! * really wait for at least one child to terminate. Hence we call
! * ListenToChildren(..., ..., true) in this case.
! */
! ListenToChildren(AH, pstate, !next_work_item);
! while ((ret_child = ReapChildStatus(pstate, &work_status)) != NO_SLOT)
! {
! nTerm++;
! printf("Marking the child's work as done\n");
! mark_work_done(AH, &ready_list, ret_child, work_status, pstate);
}
! /* We need to make sure that we have an idle child before re-running the
! * loop. If nTerm > 0 we already have that (quick check). */
! if (nTerm > 0)
! break;
! /* explicit check for an idle child */
! if (GetIdleChild(pstate) != NO_SLOT)
! break;
!
! /*
! * If we have no idle child, read the result of one or more
! * children and loop the loop to call ReapChildStatus() on them
! */
! ListenToChildren(AH, pstate, true);
}
}
*************** restore_toc_entries_parallel(ArchiveHand
*** 3474,3499 ****
/*
* create a worker child to perform a restore step in parallel
*/
static thandle
! spawn_restore(RestoreArgs *args)
{
! thandle child;
!
! /* Ensure stdio state is quiesced before forking */
! fflush(NULL);
#ifndef WIN32
child = fork();
if (child == 0)
{
! /* in child process */
parallel_restore(args);
die_horribly(args->AH, modulename,
"parallel_restore should not return\n");
}
else if (child < 0)
{
! /* fork failed */
die_horribly(args->AH, modulename,
"could not create worker process: %s\n",
strerror(errno));
--- 3523,3546 ----
/*
* create a worker child to perform a restore step in parallel
*/
+ /*
static thandle
! spawn_restore(ParallelArgs *args)
{
! DispatchJobForTocEntry(args->AH, args->te);
#ifndef WIN32
child = fork();
if (child == 0)
{
! /+ in child process +/
parallel_restore(args);
die_horribly(args->AH, modulename,
"parallel_restore should not return\n");
}
else if (child < 0)
{
! /+ fork failed +/
die_horribly(args->AH, modulename,
"could not create worker process: %s\n",
strerror(errno));
*************** spawn_restore(RestoreArgs *args)
*** 3509,3589 ****
return child;
}
!
! /*
! * collect status from a completed worker child
! */
! static thandle
! reap_child(ParallelSlot *slots, int n_slots, int *work_status)
! {
! #ifndef WIN32
! /* Unix is so much easier ... */
! return wait(work_status);
! #else
! static HANDLE *handles = NULL;
! int hindex,
! snum,
! tnum;
! thandle ret_child;
! DWORD res;
!
! /* first time around only, make space for handles to listen on */
! if (handles == NULL)
! handles = (HANDLE *) calloc(sizeof(HANDLE), n_slots);
!
! /* set up list of handles to listen to */
! for (snum = 0, tnum = 0; snum < n_slots; snum++)
! if (slots[snum].child_id != 0)
! handles[tnum++] = slots[snum].child_id;
!
! /* wait for one to finish */
! hindex = WaitForMultipleObjects(tnum, handles, false, INFINITE);
!
! /* get handle of finished thread */
! ret_child = handles[hindex - WAIT_OBJECT_0];
!
! /* get the result */
! GetExitCodeThread(ret_child, &res);
! *work_status = res;
!
! /* dispose of handle to stop leaks */
! CloseHandle(ret_child);
!
! return ret_child;
! #endif
! }
!
! /*
! * are we doing anything now?
! */
! static bool
! work_in_progress(ParallelSlot *slots, int n_slots)
! {
! int i;
!
! for (i = 0; i < n_slots; i++)
! {
! if (slots[i].child_id != 0)
! return true;
! }
! return false;
! }
!
! /*
! * find the first free parallel slot (if any).
! */
! static int
! get_next_slot(ParallelSlot *slots, int n_slots)
! {
! int i;
!
! for (i = 0; i < n_slots; i++)
! {
! if (slots[i].child_id == 0)
! return i;
! }
! return NO_SLOT;
! }
/*
--- 3556,3562 ----
return child;
}
! */
/*
*************** par_list_remove(TocEntry *te)
*** 3659,3665 ****
*/
static TocEntry *
get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
! ParallelSlot *slots, int n_slots)
{
bool pref_non_data = false; /* or get from AH->ropt */
TocEntry *data_te = NULL;
--- 3632,3638 ----
*/
static TocEntry *
get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
! ParallelState *pstate)
{
bool pref_non_data = false; /* or get from AH->ropt */
TocEntry *data_te = NULL;
*************** get_next_work_item(ArchiveHandle *AH, To
*** 3670,3684 ****
/*
* Bogus heuristics for pref_non_data
*/
if (pref_non_data)
{
int count = 0;
! for (k = 0; k < n_slots; k++)
! if (slots[k].args->te != NULL &&
! slots[k].args->te->section == SECTION_DATA)
count++;
! if (n_slots == 0 || count * 4 < n_slots)
pref_non_data = false;
}
--- 3643,3658 ----
/*
* Bogus heuristics for pref_non_data
*/
+ /* XXX */
if (pref_non_data)
{
int count = 0;
! for (k = 0; k < pstate->numWorkers; k++)
! if (pstate->parallelSlot[k].args->te != NULL &&
! pstate->parallelSlot[k].args->te->section == SECTION_DATA)
count++;
! if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
pref_non_data = false;
}
*************** get_next_work_item(ArchiveHandle *AH, To
*** 3694,3710 ****
* that a currently running item also needs lock on, or vice versa. If
* so, we don't want to schedule them together.
*/
! for (i = 0; i < n_slots && !conflicts; i++)
{
TocEntry *running_te;
! if (slots[i].args == NULL)
continue;
! running_te = slots[i].args->te;
if (has_lock_conflicts(te, running_te) ||
has_lock_conflicts(running_te, te))
{
conflicts = true;
break;
}
--- 3668,3685 ----
* that a currently running item also needs lock on, or vice versa. If
* so, we don't want to schedule them together.
*/
! for (i = 0; i < pstate->numWorkers && !conflicts; i++)
{
TocEntry *running_te;
! if (pstate->parallelSlot[i].ChildStatus != CS_WORKING)
continue;
! running_te = pstate->parallelSlot[i].args->te;
if (has_lock_conflicts(te, running_te) ||
has_lock_conflicts(running_te, te))
{
+ printf("lock conflicts detected. %d (want to schedule) with %d (running). i: %d. status: %d!!!\n", te->dumpId, running_te->dumpId, i, pstate->parallelSlot[i].ChildStatus);
conflicts = true;
break;
}
*************** get_next_work_item(ArchiveHandle *AH, To
*** 3738,3745 ****
* this is the procedure run as a thread (Windows) or a
* separate process (everything else).
*/
! static parallel_restore_result
! parallel_restore(RestoreArgs *args)
{
ArchiveHandle *AH = args->AH;
TocEntry *te = args->te;
--- 3713,3720 ----
* this is the procedure run as a thread (Windows) or a
* separate process (everything else).
*/
! parallel_restore_result
! parallel_restore(ParallelArgs *args)
{
ArchiveHandle *AH = args->AH;
TocEntry *te = args->te;
*************** parallel_restore(RestoreArgs *args)
*** 3759,3795 ****
(AH->ReopenPtr) (AH);
#ifndef WIN32
else
! (AH->ClosePtr) (AH);
#endif
- /*
- * We need our own database connection, too
- */
- ConnectDatabase((Archive *) AH, ropt->dbname,
- ropt->pghost, ropt->pgport, ropt->username,
- ropt->promptPassword);
-
_doSetFixedOutputState(AH);
/* Restore the TOC item */
retval = restore_toc_entry(AH, te, ropt, true);
/* And clean up */
- PQfinish(AH->connection);
- AH->connection = NULL;
/* If we reopened the file, we are done with it, so close it now */
if (te->section == SECTION_DATA)
(AH->ClosePtr) (AH);
if (retval == 0 && AH->public.n_errors)
retval = WORKER_IGNORED_ERRORS;
- #ifndef WIN32
- exit(retval);
- #else
return retval;
- #endif
}
--- 3734,3764 ----
(AH->ReopenPtr) (AH);
#ifndef WIN32
else
! {
! if (AH->FH)
! (AH->ClosePtr) (AH);
! }
#endif
_doSetFixedOutputState(AH);
+ Assert(AH->connection != NULL);
+
/* Restore the TOC item */
retval = restore_toc_entry(AH, te, ropt, true);
/* And clean up */
/* If we reopened the file, we are done with it, so close it now */
+ /* XXX
if (te->section == SECTION_DATA)
(AH->ClosePtr) (AH);
+ */
if (retval == 0 && AH->public.n_errors)
retval = WORKER_IGNORED_ERRORS;
return retval;
}
*************** parallel_restore(RestoreArgs *args)
*** 3801,3825 ****
*/
static void
mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
! thandle worker, int status,
! ParallelSlot *slots, int n_slots)
{
TocEntry *te = NULL;
- int i;
-
- for (i = 0; i < n_slots; i++)
- {
- if (slots[i].child_id == worker)
- {
- slots[i].child_id = 0;
- te = slots[i].args->te;
- DeCloneArchive(slots[i].args->AH);
- free(slots[i].args);
- slots[i].args = NULL;
! break;
! }
! }
if (te == NULL)
die_horribly(AH, modulename, "could not find slot of finished worker\n");
--- 3770,3785 ----
*/
static void
mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
! int worker, int status,
! ParallelState *pstate)
{
TocEntry *te = NULL;
! te = pstate->parallelSlot[worker].args->te;
! /* XXX */
! //DeCloneArchive(pstate->parallelSlot[worker].args->AH);
! //free(pstate->parallelSlot[worker].args);
! //pstate->parallelSlot[worker].args = NULL;
if (te == NULL)
die_horribly(AH, modulename, "could not find slot of finished worker\n");
*************** inhibit_data_for_failed_table(ArchiveHan
*** 4144,4153 ****
*
* Enough of the structure is cloned to ensure that there is no
* conflict between different threads each with their own clone.
- *
- * These could be public, but no need at present.
*/
! static ArchiveHandle *
CloneArchive(ArchiveHandle *AH)
{
ArchiveHandle *clone;
--- 4104,4111 ----
*
* Enough of the structure is cloned to ensure that there is no
* conflict between different threads each with their own clone.
*/
! ArchiveHandle *
CloneArchive(ArchiveHandle *AH)
{
ArchiveHandle *clone;
*************** CloneArchive(ArchiveHandle *AH)
*** 4188,4194 ****
*
* Note: we assume any clone-local connection was already closed.
*/
! static void
DeCloneArchive(ArchiveHandle *AH)
{
/* Clear format-specific state */
--- 4146,4152 ----
*
* Note: we assume any clone-local connection was already closed.
*/
! void
DeCloneArchive(ArchiveHandle *AH)
{
/* Clear format-specific state */
*************** DeCloneArchive(ArchiveHandle *AH)
*** 4212,4214 ****
--- 4170,4683 ----
free(AH);
}
+
+ ParallelState
+ ParallelBackupStart(ArchiveHandle *AH, int numWorkers, RestoreOptions *ropt)
+ {
+ ParallelState pstate;
+ int i;
+
+ /* Ensure stdio state is quiesced before forking */
+ fflush(NULL);
+
+ Assert(numWorkers > 0);
+
+ memset((void *) &pstate, 0, sizeof(ParallelState));
+
+ pstate.numWorkers = numWorkers;
+
+ if (numWorkers == 1)
+ return pstate;
+
+ pstate.pipeWorkerRead = (int *) malloc(numWorkers * sizeof(int));
+ pstate.pipeWorkerWrite = (int *) malloc(numWorkers * sizeof(int));
+ pstate.parallelSlot = (ParallelSlot *) malloc(numWorkers * sizeof(ParallelSlot));
+
+ for (i = 0; i < numWorkers; i++)
+ {
+ int pipeMW[2], pipeWM[2];
+ pid_t pid;
+
+ if (pipe(pipeMW) < 0 || pipe(pipeWM) < 0)
+ die_horribly(AH, modulename, "Cannot create communication channels: %s",
+ strerror(errno));
+ pid = fork();
+ if (pid == 0)
+ {
+ /* we are the worker */
+ close(pipeWM[0]); /* close read end of Worker -> Master */
+ close(pipeMW[1]); /* close write end of Master -> Worker */
+
+ free(pstate.pipeWorkerRead);
+ pstate.pipeWorkerRead = NULL;
+ free(pstate.pipeWorkerWrite);
+ pstate.pipeWorkerWrite = NULL;
+ free(pstate.parallelSlot);
+ pstate.parallelSlot = NULL;
+
+ if (ropt)
+ {
+ /*
+ * Restore mode - We need our own database connection, too
+ */
+ AH->connection = NULL;
+ printf("Connecting: Db: %s host %s port %s user %s\n", ropt->dbname,
+ ropt->pghost, ropt->pgport, ropt->username);
+
+ ConnectDatabase((Archive *) AH, ropt->dbname,
+ ropt->pghost, ropt->pgport, ropt->username,
+ ropt->promptPassword);
+
+ g_conn = AH->connection;
+ }
+ else
+ {
+ /*
+ * Dump mode - The parent has opened our connection
+ */
+ if (g_conn_child)
+ g_conn = AH->connection = g_conn_child[i];
+ }
+
+ free(g_conn_child);
+ g_conn_child = NULL;
+
+ Assert(AH->connection != NULL);
+ Assert(g_conn != NULL);
+
+ /* the worker will never return from this function */
+ WaitForCommands(AH, pipeMW[0], pipeWM[1]);
+ }
+ else
+ {
+ /* we are the Master */
+ close(pipeWM[1]); /* close write end of Worker -> Master */
+ close(pipeMW[0]); /* close read end of Master -> Worker */
+
+ pstate.pipeWorkerRead[i] = pipeWM[0];
+ pstate.pipeWorkerWrite[i] = pipeMW[1];
+
+ pstate.parallelSlot[i].args = (ParallelArgs *) malloc(sizeof(ParallelArgs));
+ pstate.parallelSlot[i].args->AH = AH;
+ pstate.parallelSlot[i].args->te = NULL;
+ pstate.parallelSlot[i].ChildStatus = CS_IDLE;
+ }
+ }
+ return pstate;
+ }
+
+ void
+ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
+ {
+ int i;
+
+ if (pstate->numWorkers == 1)
+ return;
+
+ Assert(IsEveryChildIdle(pstate));
+ printf("Asking children to terminate\n");
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ int ret;
+ printf("Asking child %d to terminate\n", i);
+ ret = write(pstate->pipeWorkerWrite[i], "TERMINATE", strlen("TERMINATE") + 1);
+ pstate->parallelSlot[i].ChildStatus = CS_WORKING;
+ }
+
+ while (!HasEveryChildTerminated(pstate))
+ {
+ ListenToChildren(AH, pstate, true);
+ }
+
+ PrintStatus(pstate);
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ close(pstate->pipeWorkerRead[i]);
+ close(pstate->pipeWorkerWrite[i]);
+ }
+ }
+
+
+ /*
+ * The sequence is the following (for dump, similar for restore):
+ *
+ * Master Worker
+ *
+ * enters WaitForCommands()
+ * DispatchJobForTocEntry(...te...)
+ *
+ * [ Worker is IDLE ]
+ *
+ * arg = (StartMasterParallelPtr)()
+ * send: DUMP arg
+ * receive: DUMP arg
+ * str = (WorkerJobDumpPtr)(arg)
+ * [ Worker is WORKING ] ... gets te from arg ...
+ * ... dump te ...
+ * send: OK DUMP info
+ *
+ * In ListenToChildren():
+ *
+ * [ Worker is FINISHED ]
+ * receive: OK DUMP info
+ * status = (EndMasterParallelPtr)(info)
+ *
+ * In ReapChildStatus(&ptr):
+ * *ptr = status;
+ * [ Worker is IDLE ]
+ */
+
+ void
+ DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te,
+ T_Action act)
+ {
+ int worker;
+ char *arg;
+ int len;
+
+ Assert(GetIdleChild(pstate) != NO_SLOT);
+
+ /* our caller must make sure that at least one child is idle */
+ worker = GetIdleChild(pstate);
+ Assert(worker != NO_SLOT);
+
+ arg = (AH->StartMasterParallelPtr)(AH, te, act);
+ len = strlen(arg) + 1;
+ if (write(pstate->pipeWorkerWrite[worker], arg, len) != len)
+ die_horribly(AH, modulename,
+ "Error writing to the communication channel: %s",
+ strerror(errno));
+ pstate->parallelSlot[worker].ChildStatus = CS_WORKING;
+ pstate->parallelSlot[worker].args->te = te;
+ PrintStatus(pstate);
+ }
+
+
+ static void
+ PrintStatus(ParallelState *pstate)
+ {
+ int i;
+ printf("------Status------\n");
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ printf("Status of child %d: ", i);
+ switch (pstate->parallelSlot[i].ChildStatus)
+ {
+ case CS_IDLE:
+ printf("IDLE");
+ break;
+ case CS_WORKING:
+ printf("WORKING");
+ break;
+ case CS_FINISHED:
+ printf("FINISHED");
+ break;
+ case CS_TERMINATED:
+ printf("TERMINATED");
+ break;
+ }
+ printf("\n");
+ }
+ printf("------------\n");
+ }
+
+
+ /*
+ * find the first free parallel slot (if any).
+ */
+ static int
+ GetIdleChild(ParallelState *pstate)
+ {
+ int i;
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ if (pstate->parallelSlot[i].ChildStatus == CS_IDLE)
+ return i;
+ }
+ return NO_SLOT;
+ }
+
+ static bool
+ HasEveryChildTerminated(ParallelState *pstate)
+ {
+ int i;
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ if (pstate->parallelSlot[i].ChildStatus != CS_TERMINATED)
+ return false;
+ }
+ return true;
+ }
+
+ static bool
+ IsEveryChildIdle(ParallelState *pstate)
+ {
+ int i;
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ if (pstate->parallelSlot[i].ChildStatus != CS_IDLE)
+ return false;
+ }
+ return true;
+ }
+
+ static char *
+ readMessageFromPipe(int fd, bool allowBlock)
+ {
+ static char *buf;
+ static int bufsize = 0;
+ char *msg;
+ int msgsize;
+ int ret;
+ int flags;
+
+ /*
+ * The problem here is that we need to deal with several possibilities:
+ * we could receive only a partial message or several messages at once.
+ * The caller expects us to return exactly one message however.
+ *
+ * We could either read in as much as we can and keep track of what we
+ * delivered back to the caller or we just read byte by byte. Once we see
+ * (char) 0, we know that it's the message's end. This is quite inefficient
+ * but since we are reading only on the command channel, the performance
+ * loss does not seem worth the trouble of keeping internal states for
+ * different file descriptors.
+ */
+
+ if (bufsize == 0)
+ {
+ buf = (char *) malloc(1);
+ bufsize = 1;
+ }
+
+ msg = buf;
+ msgsize = 0;
+
+
+ for (;;)
+ {
+ if (msgsize == 0 && !allowBlock)
+ {
+ flags = fcntl(fd, F_GETFL, 0);
+ fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+ }
+
+ ret = read(fd, msg + msgsize, 1);
+
+ if (msgsize == 0 && !allowBlock)
+ {
+ int saved_errno = errno;
+ fcntl(fd, F_SETFL, flags);
+ if (ret < 0 && saved_errno == EAGAIN)
+ return NULL;
+ }
+
+ if (ret == 0)
+ {
+ /* child has closed the connection */
+ write_msg(NULL, "the communication partner died\n");
+ exit(1);
+ }
+ if (ret < 0)
+ {
+ write_msg(NULL, "error reading from communication partner: %s\n",
+ strerror(errno));
+ exit(1);
+ }
+
+ if (msg[msgsize] == '\0')
+ return msg;
+
+ msgsize++;
+ if (msgsize == bufsize)
+ {
+ bufsize += 10;
+ buf = (char *) realloc(buf, bufsize);
+ msg = buf;
+ }
+ }
+ }
+
+
+ #define messageStartsWith(msg, prefix) \
+ (strncmp(msg, prefix, strlen(prefix)) == 0)
+ #define messageEquals(msg, pattern) \
+ (strcmp(msg, pattern) == 0)
+ static void
+ WaitForCommands(ArchiveHandle *AH, int rfd, int wfd)
+ {
+ char *command;
+ char *str = NULL;
+ int len;
+ bool shouldExit = false;
+
+ for(;;)
+ {
+ command = readMessageFromPipe(rfd, true);
+ printf("Read command: %s in pid %d\n", command, getpid());
+ fflush(stdout);
+ if (messageStartsWith(command, "DUMP "))
+ {
+ Assert(AH->format == archDirectory);
+
+ str = (AH->WorkerJobDumpPtr)(AH, command + strlen("DUMP "));
+ }
+ else if (messageStartsWith(command, "RESTORE "))
+ {
+ Assert(AH->format == archDirectory || AH->format == archCustom);
+ Assert(AH->connection != NULL);
+
+ str = (AH->WorkerJobRestorePtr)(AH, command + strlen("RESTORE "));
+
+ Assert(AH->connection != NULL);
+ }
+ else if (messageEquals(command, "TERMINATE"))
+ {
+ printf("Terminating in %d\n", getpid());
+ PQfinish(AH->connection);
+ close(rfd);
+ str = "TERMINATE OK";
+ shouldExit = true;
+ }
+ else
+ {
+ die_horribly(AH, modulename,
+ "Unknown command on communication channel: %s", command);
+ }
+ len = strlen(str) + 1;
+ if (write(wfd, str, len) != len)
+ die_horribly(AH, modulename,
+ "Error writing to the communication channel: %s",
+ strerror(errno));
+ if (shouldExit)
+ {
+ close(wfd);
+ exit(0);
+ }
+ }
+ }
+
+
+ /*
+ * Note the status change:
+ *
+ * DispatchJobForTocEntry CS_IDLE -> CS_WORKING
+ * ListenToChildren CS_WORKING -> CS_FINISHED / CS_TERMINATED
+ * ReapChildStatus CS_FINISHED -> CS_IDLE
+ *
+ * Just calling ReapChildStatus() when all children are working might or might
+ * not give you an idle child because you need to call ListenToChildren() in
+ * between and only thereafter ReapChildStatus(). This is necessary in order to
+ * get and deal with the status (=result) of the child's execution.
+ */
+ static void
+ ListenToChildren(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
+ {
+ int i;
+ fd_set childset;
+ int maxFd = -1;
+ struct timeval nowait = { 0, 0 };
+
+ FD_ZERO(&childset);
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ if (pstate->parallelSlot[i].ChildStatus == CS_TERMINATED)
+ continue;
+ FD_SET(pstate->pipeWorkerRead[i], &childset);
+ if (pstate->pipeWorkerRead[i] > maxFd)
+ maxFd = pstate->pipeWorkerRead[i];
+ }
+
+ if (do_wait)
+ {
+ i = select(maxFd + 1, &childset, NULL, NULL, NULL); /* no timeout */
+ Assert(i != 0);
+ }
+ else
+ {
+ if ((i = select(maxFd + 1, &childset, NULL, NULL, &nowait)) == 0)
+ return;
+ }
+
+ if (i < 0)
+ {
+ /* XXX Could there be a valid signal like SIGINT ? */
+ write_msg(NULL, "Error in ListenToChildren(): %s", strerror(errno));
+ exit(1);
+ }
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ char *msg;
+
+ if (!FD_ISSET(pstate->pipeWorkerRead[i], &childset))
+ continue;
+
+ while ((msg = readMessageFromPipe(pstate->pipeWorkerRead[i], false)))
+ {
+ if (messageStartsWith(msg, "OK "))
+ {
+ char *statusString;
+ TocEntry *te;
+
+ printf("Got OK with information from child %d (%s)\n", i, msg);
+
+ pstate->parallelSlot[i].ChildStatus = CS_FINISHED;
+ te = pstate->parallelSlot[i].args->te;
+ if (messageStartsWith(msg, "OK RESTORE "))
+ {
+ statusString = msg + strlen("OK RESTORE ");
+ pstate->parallelSlot[i].status =
+ (AH->EndMasterParallelPtr)
+ (AH, te, statusString, ACT_RESTORE);
+ }
+ else if (messageStartsWith(msg, "OK DUMP "))
+ {
+ statusString = msg + strlen("OK DUMP ");
+ pstate->parallelSlot[i].status =
+ (AH->EndMasterParallelPtr)
+ (AH, te, statusString, ACT_DUMP);
+ }
+ else
+ die_horribly(AH, modulename, "Invalid message received from child: %s", msg);
+ }
+ else if (messageStartsWith(msg, "TERMINATE OK"))
+ {
+ /* this child is idle again */
+ printf("Child %d has terminated\n", i);
+ pstate->parallelSlot[i].ChildStatus = CS_TERMINATED;
+ pstate->parallelSlot[i].status = 0;
+ /* do not read again from this fd, it will fail. */
+ break;
+ }
+ else
+ {
+ die_horribly(AH, modulename, "Invalid message received from child: %s", msg);
+ }
+ PrintStatus(pstate);
+ }
+ }
+ }
+
+ static int
+ ReapChildStatus(ParallelState *pstate, int *status)
+ {
+ int i;
+
+ for (i = 0; i < pstate->numWorkers; i++)
+ {
+ if (pstate->parallelSlot[i].ChildStatus == CS_FINISHED)
+ {
+ *status = pstate->parallelSlot[i].status;
+ pstate->parallelSlot[i].status = 0;
+ pstate->parallelSlot[i].ChildStatus = CS_IDLE;
+ PrintStatus(pstate);
+ return i;
+ }
+ }
+ return NO_SLOT;
+ }
+
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 9eb9f6f..62274f6 100644
*** a/src/bin/pg_dump/pg_backup_archiver.h
--- b/src/bin/pg_dump/pg_backup_archiver.h
*************** typedef z_stream *z_streamp;
*** 112,117 ****
--- 112,119 ----
struct _archiveHandle;
struct _tocEntry;
struct _restoreList;
+ enum _teReqs;
+ enum _action;
typedef enum
{
*************** typedef void (*PrintExtraTocPtr) (struct
*** 144,149 ****
--- 146,155 ----
typedef void (*PrintTocDataPtr) (struct _archiveHandle * AH, struct _tocEntry * te, RestoreOptions *ropt);
typedef void (*PrintExtraTocSummaryPtr) (struct _archiveHandle * AH);
+ /* XXX order similar to below */
+ typedef char *(*WorkerJobRestorePtr)(struct _archiveHandle * AH, const char *args);
+ typedef char *(*WorkerJobDumpPtr)(struct _archiveHandle * AH, const char *args);
+
typedef void (*ClonePtr) (struct _archiveHandle * AH);
typedef void (*DeClonePtr) (struct _archiveHandle * AH);
*************** typedef bool (*StartCheckArchivePtr)(str
*** 151,156 ****
--- 157,166 ----
typedef bool (*CheckTocEntryPtr)(struct _archiveHandle * AH, struct _tocEntry * te, teReqs reqs);
typedef bool (*EndCheckArchivePtr)(struct _archiveHandle * AH);
+ typedef struct _parallel_state *(*GetParallelStatePtr)(struct _archiveHandle * AH);
+ typedef char *(*StartMasterParallelPtr)(struct _archiveHandle * AH, struct _tocEntry * te, enum _action act);
+ typedef int (*EndMasterParallelPtr)(struct _archiveHandle * AH, struct _tocEntry * te, const char *str, enum _action act);
+
typedef size_t (*CustomOutPtr) (struct _archiveHandle * AH, const void *buf, size_t len);
typedef struct _outputContext
*************** typedef struct
*** 181,187 ****
int minTagEndPos; /* first possible end position of $-quote */
} sqlparseInfo;
! typedef enum
{
STAGE_NONE = 0,
STAGE_INITIALIZING,
--- 191,197 ----
int minTagEndPos; /* first possible end position of $-quote */
} sqlparseInfo;
! typedef enum _teReqs
{
STAGE_NONE = 0,
STAGE_INITIALIZING,
*************** typedef struct _archiveHandle
*** 251,256 ****
--- 261,273 ----
StartBlobPtr StartBlobPtr;
EndBlobPtr EndBlobPtr;
+ StartMasterParallelPtr StartMasterParallelPtr;
+ EndMasterParallelPtr EndMasterParallelPtr;
+
+ GetParallelStatePtr GetParallelStatePtr;
+ WorkerJobDumpPtr WorkerJobDumpPtr;
+ WorkerJobRestorePtr WorkerJobRestorePtr;
+
ClonePtr ClonePtr; /* Clone format-specific fields */
DeClonePtr DeClonePtr; /* Clean up cloned fields */
*************** typedef struct _tocEntry
*** 350,355 ****
--- 367,439 ----
int nLockDeps; /* number of such dependencies */
} TocEntry;
+ /* IDs for worker children are either PIDs or thread handles */
+ #ifndef WIN32
+ #define thandle pid_t
+ #else
+ #define thandle HANDLE
+ #endif
+
+ typedef enum
+ {
+ /* XXX move */
+ CS_IDLE,
+ CS_WORKING,
+ CS_FINISHED,
+ CS_TERMINATED
+ } T_ChildStatus;
+
+ typedef enum _action
+ {
+ ACT_DUMP,
+ ACT_RESTORE,
+ } T_Action;
+
+ /* Arguments needed for a worker child */
+ typedef struct _parallel_args
+ {
+ ArchiveHandle *AH;
+ TocEntry *te;
+ } ParallelArgs;
+
+ /* State for each parallel activity slot */
+ typedef struct _parallel_slot
+ {
+ thandle child_id;
+ ParallelArgs *args;
+ T_ChildStatus ChildStatus;
+ int status;
+ } ParallelSlot;
+
+ #define NO_SLOT (-1)
+
+ typedef struct _parallel_state
+ {
+ int numWorkers;
+ int *pipeWorkerRead;
+ int *pipeWorkerWrite;
+ ParallelSlot *parallelSlot;
+ } ParallelState;
+
+ /*
+ * Unix uses exit to return result from worker child, so function is void.
+ * Windows thread result comes via function return.
+ */
+ #ifndef WIN32
+ #define parallel_restore_result int
+ #else
+ #define parallel_restore_result DWORD
+ #endif
+
+ parallel_restore_result parallel_restore(ParallelArgs *args);
+
+ ParallelState ParallelBackupStart(ArchiveHandle *AH, int numWorker, RestoreOptions *ropt);
+ void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate);
+ void DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act);
+ void WaitForAllChildren(ArchiveHandle *AH, ParallelState *pstate);
+
+
+
/* Used everywhere */
extern const char *progname;
*************** extern void ReadHead(ArchiveHandle *AH);
*** 364,369 ****
--- 448,457 ----
extern void WriteToc(ArchiveHandle *AH);
extern void ReadToc(ArchiveHandle *AH);
extern void WriteDataChunks(ArchiveHandle *AH);
+ extern void WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te);
+
+ extern ArchiveHandle *CloneArchive(ArchiveHandle *AH);
+ extern void DeCloneArchive(ArchiveHandle *AH);
extern teReqs TocIDRequired(ArchiveHandle *AH, DumpId id, RestoreOptions *ropt);
extern bool checkSeek(FILE *fp);
*************** extern void InitArchiveFmt_Files(Archive
*** 397,402 ****
--- 485,492 ----
extern void InitArchiveFmt_Null(ArchiveHandle *AH);
extern void InitArchiveFmt_Tar(ArchiveHandle *AH);
+ extern void setupArchDirectory(ArchiveHandle *AH, int numWorkers);
+
extern bool isValidTarHeader(char *header);
extern int ReconnectToServer(ArchiveHandle *AH, const char *dbname, const char *newUser);
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index ccc9acb..57aae6d 100644
*** a/src/bin/pg_dump/pg_backup_custom.c
--- b/src/bin/pg_dump/pg_backup_custom.c
*************** static void _DeClone(ArchiveHandle *AH);
*** 62,67 ****
--- 62,73 ----
static size_t _CustomWriteFunc(ArchiveHandle *AH, const void *buf, size_t len);
static size_t _CustomReadFunction(ArchiveHandle *AH, void **buf, size_t sizeHint);
+ static char *_StartMasterParallel(ArchiveHandle *AH, TocEntry *te, T_Action act);
+ static int _EndMasterParallel(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act);
+
+ char *_WorkerJobRestoreCustom(ArchiveHandle *AH, const char *args);
+
+
typedef struct
{
CompressorState *cs;
*************** InitArchiveFmt_Custom(ArchiveHandle *AH)
*** 124,135 ****
AH->PrintExtraTocSummaryPtr = NULL;
AH->StartBlobsPtr = _StartBlobs;
AH->StartBlobPtr = _StartBlob;
AH->EndBlobPtr = _EndBlob;
! AH->EndBlobsPtr = _EndBlobs;
AH->ClonePtr = _Clone;
AH->DeClonePtr = _DeClone;
AH->StartCheckArchivePtr = NULL;
AH->CheckTocEntryPtr = NULL;
AH->EndCheckArchivePtr = NULL;
--- 130,150 ----
AH->PrintExtraTocSummaryPtr = NULL;
AH->StartBlobsPtr = _StartBlobs;
+ AH->EndBlobsPtr = _EndBlobs;
AH->StartBlobPtr = _StartBlob;
AH->EndBlobPtr = _EndBlob;
!
AH->ClonePtr = _Clone;
AH->DeClonePtr = _DeClone;
+ AH->StartMasterParallelPtr = _StartMasterParallel;
+ AH->EndMasterParallelPtr = _EndMasterParallel;
+
+ AH->GetParallelStatePtr = NULL;
+ /* no parallel dump in the custom archive */
+ AH->WorkerJobDumpPtr = NULL;
+ AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
+
AH->StartCheckArchivePtr = NULL;
AH->CheckTocEntryPtr = NULL;
AH->EndCheckArchivePtr = NULL;
*************** _DeClone(ArchiveHandle *AH)
*** 960,962 ****
--- 975,1049 ----
free(ctx);
}
+ char *
+ _WorkerJobRestoreCustom(ArchiveHandle *AH, const char *args)
+ {
+ static char buf[64]; /* short string + some ID so far */
+ ParallelArgs pargs;
+ int ret;
+ lclTocEntry *tctx;
+ TocEntry *te;
+ DumpId dumpId = InvalidDumpId;
+ int nBytes, nTok;
+
+ nTok = sscanf(args, "%d%n", &dumpId, &nBytes);
+ Assert(nBytes == strlen(args));
+ Assert(nTok == 1);
+
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ if (te->dumpId == dumpId)
+ break;
+
+ Assert(dumpId != InvalidDumpId);
+
+ tctx = (lclTocEntry *) te->formatData;
+
+ pargs.AH = AH;
+ pargs.te = te;
+
+ /* parallel_restore() will reconnect and establish the restore
+ * connection */
+ //AH->connection = NULL;
+
+ ret = parallel_restore(&pargs);
+
+ tctx->restore_status = ret;
+
+ /* XXX handle failure */
+ snprintf(buf, sizeof(buf), "OK RESTORE %d", te->dumpId);
+
+ return buf;
+ }
+
+ static char *
+ _StartMasterParallel(ArchiveHandle *AH, TocEntry *te, T_Action act)
+ {
+ static char buf[32]; /* short string + number */
+
+ /* no parallel dump in the custom archive */
+ Assert(act == ACT_RESTORE);
+
+ snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
+
+ return buf;
+ }
+
+ static int
+ _EndMasterParallel(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
+ {
+ DumpId dumpId;
+ int nBytes;
+ int nTok;
+
+ /* no parallel dump in the custom archive */
+ Assert(act == ACT_RESTORE);
+
+ nTok = sscanf(str, "%u%n", &dumpId, &nBytes);
+
+ Assert(nBytes == strlen(str));
+ Assert(nTok == 1);
+ Assert(dumpId == te->dumpId);
+
+ return 0;
+ }
+
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 1da57b3..b0676b4 100644
*** a/src/bin/pg_dump/pg_backup_directory.c
--- b/src/bin/pg_dump/pg_backup_directory.c
*************** static int _ReadByte(ArchiveHandle *);
*** 50,55 ****
--- 50,56 ----
static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
static void _CloseArchive(ArchiveHandle *AH);
+ static void _ReopenArchive(ArchiveHandle *AH);
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
*************** static void _StartBlob(ArchiveHandle *AH
*** 68,77 ****
--- 69,90 ----
static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt);
+ static void _Clone(ArchiveHandle *AH);
+ static void _DeClone(ArchiveHandle *AH);
+ /* XXX Name consistently. Archiveformat at the beginning or end of the name */
static size_t _DirectoryReadFunction(ArchiveHandle *AH, void **buf, size_t sizeHint);
+ static char *_StartMasterParallel(ArchiveHandle *AH, TocEntry *te, T_Action act);
+ static int _EndMasterParallel(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act);
+
+ static ParallelState *_GetParallelState(ArchiveHandle *AH);
+
+ /* XXX order */
+ static char *_WorkerJobRestoreDirectory(ArchiveHandle *AH, const char *args);
+ static char *_WorkerJobDumpDirectory(ArchiveHandle *AH, const char *args);
static bool _StartCheckArchive(ArchiveHandle *AH);
+ static bool _CheckDirectory(ArchiveHandle *AH, const char *dname, bool *tocSeen);
static bool _CheckTocEntry(ArchiveHandle *AH, TocEntry *te, teReqs reqs);
static bool _CheckFileContents(ArchiveHandle *AH, const char *fname, const char* idStr, bool terminateOnError);
static bool _CheckFileSize(ArchiveHandle *AH, const char *fname, pgoff_t pgSize, bool terminateOnError);
*************** static bool _CheckBlob(ArchiveHandle *AH
*** 79,95 ****
static bool _CheckBlobs(ArchiveHandle *AH, TocEntry *te, teReqs reqs);
static bool _EndCheckArchive(ArchiveHandle *AH);
! static char *prependDirectory(ArchiveHandle *AH, const char *relativeFilename);
! static char *prependBlobsDirectory(ArchiveHandle *AH, Oid oid);
! static void createDirectory(const char *dir, const char *subdir);
static char *getRandomData(char *s, int len);
static void _StartDataCompressor(ArchiveHandle *AH, TocEntry *te);
static void _EndDataCompressor(ArchiveHandle *AH, TocEntry *te);
! static bool isDirectory(const char *fname);
! static bool isRegularFile(const char *fname);
#define K_STD_BUF_SIZE 1024
#define FILE_SUFFIX ".dat"
--- 92,108 ----
static bool _CheckBlobs(ArchiveHandle *AH, TocEntry *te, teReqs reqs);
static bool _EndCheckArchive(ArchiveHandle *AH);
! static char *prependDirectory(ArchiveHandle *AH, const char *relativeFilename, int directoryIndex);
! static char *prependBlobsDirectory(ArchiveHandle *AH, Oid oid, int directoryIndex);
! static void createDirectoryGroup(char **dirs, int nDir, const char *subdir);
static char *getRandomData(char *s, int len);
static void _StartDataCompressor(ArchiveHandle *AH, TocEntry *te);
static void _EndDataCompressor(ArchiveHandle *AH, TocEntry *te);
! static bool isDirectory(const char *dname, const char *fname);
! static bool isRegularFile(const char *dname, const char *fname);
#define K_STD_BUF_SIZE 1024
#define FILE_SUFFIX ".dat"
*************** typedef struct _lclContext
*** 98,106 ****
{
/*
* Our archive location. This is basically what the user specified as his
! * backup file but of course here it is a directory.
*/
! char *directory;
/*
* As a directory archive contains of several files we want to make sure
--- 111,120 ----
{
/*
* Our archive location. This is basically what the user specified as his
! * backup file, but of course here it is one or several directories.
*/
! char **directories;
! int numDirectories;
/*
* As a directory archive contains of several files we want to make sure
*************** typedef struct _lclContext
*** 145,150 ****
--- 159,170 ----
DumpId *chkList;
int chkListSize;
+ /* this is for a parallel backup or restore */
+ int *directoryUsage; /* only used in the master */
+ ParallelState pstate;
+ int numWorkers;
+ bool is_parallel_child;
+
CompressorState *cs;
} lclContext;
*************** typedef struct
*** 152,159 ****
--- 172,187 ----
{
char *filename; /* filename excluding the directory (basename) */
pgoff_t fileSize;
+ int restore_status;
+ int directoryIndex;
} lclTocEntry;
+ static void splitDirectories(const char *spec, lclContext *ctx);
+ static int assignDirectory(lclContext *ctx);
+ static void unassignDirectory(lclContext *ctx, lclTocEntry *tctx);
+
+
+
typedef struct _lclFileHeader
{
int version;
*************** InitArchiveFmt_Directory(ArchiveHandle *
*** 188,194 ****
AH->WriteBufPtr = _WriteBuf;
AH->ReadBufPtr = _ReadBuf;
AH->ClosePtr = _CloseArchive;
! AH->ReopenPtr = NULL;
AH->PrintTocDataPtr = _PrintTocData;
AH->ReadExtraTocPtr = _ReadExtraToc;
AH->WriteExtraTocPtr = _WriteExtraToc;
--- 216,222 ----
AH->WriteBufPtr = _WriteBuf;
AH->ReadBufPtr = _ReadBuf;
AH->ClosePtr = _CloseArchive;
! AH->ReopenPtr = _ReopenArchive;
AH->PrintTocDataPtr = _PrintTocData;
AH->ReadExtraTocPtr = _ReadExtraToc;
AH->WriteExtraTocPtr = _WriteExtraToc;
*************** InitArchiveFmt_Directory(ArchiveHandle *
*** 200,207 ****
AH->EndBlobPtr = _EndBlob;
AH->EndBlobsPtr = _EndBlobs;
! AH->ClonePtr = NULL;
! AH->DeClonePtr = NULL;
AH->StartCheckArchivePtr = _StartCheckArchive;
AH->CheckTocEntryPtr = _CheckTocEntry;
--- 228,242 ----
AH->EndBlobPtr = _EndBlob;
AH->EndBlobsPtr = _EndBlobs;
! AH->ClonePtr = _Clone;
! AH->DeClonePtr = _DeClone;
!
! AH->GetParallelStatePtr = _GetParallelState;
! AH->WorkerJobRestorePtr = _WorkerJobRestoreDirectory;
! AH->WorkerJobDumpPtr = _WorkerJobDumpDirectory;
!
! AH->StartMasterParallelPtr = _StartMasterParallel;
! AH->EndMasterParallelPtr = _EndMasterParallel;
AH->StartCheckArchivePtr = _StartCheckArchive;
AH->CheckTocEntryPtr = _CheckTocEntry;
*************** InitArchiveFmt_Directory(ArchiveHandle *
*** 225,230 ****
--- 260,270 ----
if (AH->lo_buf == NULL)
die_horribly(AH, modulename, "out of memory\n");
+ ctx->directories = NULL;
+ ctx->numDirectories = 0;
+ ctx->directoryUsage = NULL;
+ ctx->is_parallel_child = false;
+
/*
* Now open the TOC file
*/
*************** InitArchiveFmt_Directory(ArchiveHandle *
*** 232,242 ****
if (!AH->fSpec || strcmp(AH->fSpec, "") == 0)
die_horribly(AH, modulename, "no directory specified\n");
! ctx->directory = AH->fSpec;
if (AH->mode == archModeWrite)
{
! char *fname = prependDirectory(AH, "TOC");
char buf[256];
/*
--- 272,283 ----
if (!AH->fSpec || strcmp(AH->fSpec, "") == 0)
die_horribly(AH, modulename, "no directory specified\n");
! /* Create the directory/directories */
! splitDirectories(AH->fSpec, ctx);
if (AH->mode == archModeWrite)
{
! char *fname = prependDirectory(AH, "TOC", 0);
char buf[256];
/*
*************** InitArchiveFmt_Directory(ArchiveHandle *
*** 245,254 ****
*/
getRandomData(buf, sizeof(buf));
if (!pg_md5_hash(buf, strlen(buf), ctx->idStr))
! die_horribly(AH, modulename, "Error computing checksum");
! /* Create the directory, errors are caught there */
! createDirectory(ctx->directory, NULL);
ctx->cs = AllocateCompressorState(AH);
--- 286,295 ----
*/
getRandomData(buf, sizeof(buf));
if (!pg_md5_hash(buf, strlen(buf), ctx->idStr))
! die_horribly(AH, modulename, "Error computing checksum\n");
! /* Create the directories, errors are caught there */
! createDirectoryGroup(ctx->directories, ctx->numDirectories, NULL);
ctx->cs = AllocateCompressorState(AH);
*************** InitArchiveFmt_Directory(ArchiveHandle *
*** 260,267 ****
else
{ /* Read Mode */
char *fname;
! fname = prependDirectory(AH, "TOC");
AH->FH = fopen(fname, PG_BINARY_R);
if (AH->FH == NULL)
--- 301,324 ----
else
{ /* Read Mode */
char *fname;
+ int i;
+ struct stat st;
! /* check the directories. As we are in read mode, they need to exist */
! for (i = 0; i < ctx->numDirectories; i++)
! {
! if (stat(ctx->directories[i], &st) != 0)
! die_horribly(NULL, modulename,
! "invalid input directory specified, cannot stat \"%s\": %s\n",
! ctx->directories[i], strerror(errno));
!
! if (!S_ISDIR(st.st_mode))
! die_horribly(NULL, modulename,
! "invalid input directory specified, \"%s\" is not a directory\n",
! ctx->directories[i]);
! }
!
! fname = prependDirectory(AH, "TOC", -1);
AH->FH = fopen(fname, PG_BINARY_R);
if (AH->FH == NULL)
*************** _StartData(ArchiveHandle *AH, TocEntry *
*** 423,429 ****
lclContext *ctx = (lclContext *) AH->formatData;
char *fname;
! fname = prependDirectory(AH, tctx->filename);
ctx->dataFH = (FILE *) fopen(fname, PG_BINARY_W);
if (ctx->dataFH == NULL)
--- 480,500 ----
lclContext *ctx = (lclContext *) AH->formatData;
char *fname;
! /*
! * If we are running in parallel mode, the master controls the directory
! * usage. Then the directory is already assigned.
! * If not (i.e. we are running with only one process), we assign it from
! * here.
! */
! if (ctx->is_parallel_child)
! {
! Assert(ctx->directoryUsage == NULL);
! Assert(tctx->directoryIndex >= 0);
! }
! else
! tctx->directoryIndex = assignDirectory(ctx);
!
! fname = prependDirectory(AH, tctx->filename, tctx->directoryIndex);
ctx->dataFH = (FILE *) fopen(fname, PG_BINARY_W);
if (ctx->dataFH == NULL)
*************** _EndData(ArchiveHandle *AH, TocEntry *te
*** 544,549 ****
--- 615,622 ----
tctx->fileSize = ctx->dataFilePos;
ctx->dataFH = NULL;
+ if (!ctx->is_parallel_child)
+ unassignDirectory(ctx, tctx);
}
/*
*************** _PrintTocData(ArchiveHandle *AH, TocEntr
*** 596,602 ****
_LoadBlobs(AH, ropt);
else
{
! char *fname = prependDirectory(AH, tctx->filename);
_PrintFileData(AH, fname, tctx->fileSize, ropt);
}
}
--- 669,675 ----
_LoadBlobs(AH, ropt);
else
{
! char *fname = prependDirectory(AH, tctx->filename, -1);
_PrintFileData(AH, fname, tctx->fileSize, ropt);
}
}
*************** _LoadBlobs(ArchiveHandle *AH, RestoreOpt
*** 611,618 ****
StartRestoreBlobs(AH);
! fname = prependDirectory(AH, "BLOBS.TOC");
!
ctx->blobsTocFH = fopen(fname, "rb");
if (ctx->blobsTocFH == NULL)
--- 684,690 ----
StartRestoreBlobs(AH);
! fname = prependDirectory(AH, "BLOBS.TOC", -1);
ctx->blobsTocFH = fopen(fname, "rb");
if (ctx->blobsTocFH == NULL)
*************** _LoadBlobs(ArchiveHandle *AH, RestoreOpt
*** 635,641 ****
ReadOffset(AH, &blobSize);
StartRestoreBlob(AH, oid, ropt->dropSchema);
! blobFname = prependBlobsDirectory(AH, oid);
_PrintFileData(AH, blobFname, blobSize, ropt);
EndRestoreBlob(AH, oid);
}
--- 707,713 ----
ReadOffset(AH, &blobSize);
StartRestoreBlob(AH, oid, ropt->dropSchema);
! blobFname = prependBlobsDirectory(AH, oid, -1);
_PrintFileData(AH, blobFname, blobSize, ropt);
EndRestoreBlob(AH, oid);
}
*************** _CloseArchive(ArchiveHandle *AH)
*** 813,836 ****
{
if (AH->mode == archModeWrite)
{
- #ifdef USE_ASSERT_CHECKING
lclContext *ctx = (lclContext *) AH->formatData;
! #endif
WriteDataChunks(AH);
Assert(TOC_FH_ACTIVE);
-
WriteHead(AH);
_WriteExtraHead(AH);
WriteToc(AH);
if (fclose(AH->FH) != 0)
die_horribly(AH, modulename, "could not close TOC file: %s\n", strerror(errno));
}
AH->FH = NULL;
}
/*
--- 885,921 ----
{
if (AH->mode == archModeWrite)
{
lclContext *ctx = (lclContext *) AH->formatData;
!
! /* this will actually fork the processes */
! ctx->pstate = ParallelBackupStart(AH, ctx->numWorkers, NULL);
WriteDataChunks(AH);
Assert(TOC_FH_ACTIVE);
WriteHead(AH);
_WriteExtraHead(AH);
WriteToc(AH);
+ ParallelBackupEnd(AH, &ctx->pstate);
+
if (fclose(AH->FH) != 0)
die_horribly(AH, modulename, "could not close TOC file: %s\n", strerror(errno));
}
AH->FH = NULL;
}
+ /*
+ * Reopen the archive's file handle.
+ */
+ static void
+ _ReopenArchive(ArchiveHandle *AH)
+ {
+ /*
+ * Our TOC is in memory and each child opens its own data files, so we
+ * can support reopening the archive by simply doing nothing.
+ */
+ }
/*
*************** _CloseArchive(ArchiveHandle *AH)
*** 849,859 ****
static void
_StartBlobs(ArchiveHandle *AH, TocEntry *te)
{
! lclContext *ctx = (lclContext *) AH->formatData;
! char *fname;
! fname = prependDirectory(AH, "BLOBS.TOC");
! createDirectory(ctx->directory, "blobs");
ctx->blobsTocFH = fopen(fname, "ab");
if (ctx->blobsTocFH == NULL)
--- 934,950 ----
static void
_StartBlobs(ArchiveHandle *AH, TocEntry *te)
{
! lclContext *ctx = (lclContext *) AH->formatData;
! lclTocEntry *tctx = (lclTocEntry *) te->formatData;
! char *fname;
! /* XXX see comment in _StartData */
! if (!ctx->is_parallel_child)
! tctx->directoryIndex = assignDirectory(ctx);
!
! fname = prependDirectory(AH, "BLOBS.TOC", 0);
! /* XXX could also create only one blobs dir */
! createDirectoryGroup(ctx->directories, ctx->numDirectories, "blobs");
ctx->blobsTocFH = fopen(fname, "ab");
if (ctx->blobsTocFH == NULL)
*************** static void
*** 878,886 ****
_StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
{
lclContext *ctx = (lclContext *) AH->formatData;
char *fname;
! fname = prependBlobsDirectory(AH, oid);
ctx->dataFH = (FILE *) fopen(fname, PG_BINARY_W);
if (ctx->dataFH == NULL)
--- 969,978 ----
_StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
{
lclContext *ctx = (lclContext *) AH->formatData;
+ lclTocEntry *tctx = (lclTocEntry *) te->formatData;
char *fname;
! fname = prependBlobsDirectory(AH, oid, tctx->directoryIndex);
ctx->dataFH = (FILE *) fopen(fname, PG_BINARY_W);
if (ctx->dataFH == NULL)
*************** _EndBlobs(ArchiveHandle *AH, TocEntry *t
*** 943,948 ****
--- 1035,1043 ----
ctx->blobsTocFH = NULL;
tctx->fileSize = ctx->blobsTocFilePos;
+
+ if (!ctx->is_parallel_child)
+ unassignDirectory(ctx, tctx);
}
/*
*************** _StartCheckArchive(ArchiveHandle *AH)
*** 965,976 ****
{
bool checkOK = true;
lclContext *ctx = (lclContext *) AH->formatData;
DIR *dir;
- char *dname = ctx->directory;
struct dirent *entry;
int idx = 0;
char *suffix;
- bool tocSeen = false;
dir = opendir(dname);
if (!dir)
--- 1060,1090 ----
{
bool checkOK = true;
lclContext *ctx = (lclContext *) AH->formatData;
+ int i;
+ bool tocSeen = false;
+
+ for (i = 0; i < ctx->numDirectories; i++)
+ {
+ Assert(ctx->directories[i] != NULL);
+ checkOK &= _CheckDirectory(AH, ctx->directories[i], &tocSeen);
+ }
+
+ if (!tocSeen)
+ printf("Could not locate the TOC file of the archive\n");
+
+ /* also return false if we haven't seen the TOC file */
+ return checkOK && tocSeen;
+ }
+
+ static bool
+ _CheckDirectory(ArchiveHandle *AH, const char *dname, bool *tocSeen)
+ {
+ bool checkOK = true;
+ lclContext *ctx = (lclContext *) AH->formatData;
DIR *dir;
struct dirent *entry;
int idx = 0;
char *suffix;
dir = opendir(dname);
if (!dir)
*************** _StartCheckArchive(ArchiveHandle *AH)
*** 1018,1033 ****
if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0)
continue;
! if (strcmp(entry->d_name, "blobs") == 0 &&
! isDirectory(prependDirectory(AH, entry->d_name)))
continue;
! if (strcmp(entry->d_name, "BLOBS.TOC") == 0 &&
! isRegularFile(prependDirectory(AH, entry->d_name)))
continue;
! if (strcmp(entry->d_name, "TOC") == 0 &&
! isRegularFile(prependDirectory(AH, entry->d_name)))
{
! tocSeen = true;
continue;
}
/* besides the above we only expect nnnn.dat, with nnnn being our numerical dumpID */
--- 1132,1145 ----
if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0)
continue;
! /* unfortunately Solaris doesn't have entry->d_type, so we can't use that */
! if (strcmp(entry->d_name, "blobs") == 0 && isDirectory(dname, entry->d_name))
continue;
! if (strcmp(entry->d_name, "BLOBS.TOC") == 0 && isRegularFile(dname, entry->d_name))
continue;
! if (strcmp(entry->d_name, "TOC") == 0 && isRegularFile(dname, entry->d_name))
{
! *tocSeen = true;
continue;
}
/* besides the above we only expect nnnn.dat, with nnnn being our numerical dumpID */
*************** _StartCheckArchive(ArchiveHandle *AH)
*** 1075,1082 ****
while (idx < ctx->chkListSize)
ctx->chkList[idx++] = InvalidDumpId;
! /* also return false if we haven't seen the TOC file */
! return checkOK && tocSeen;
}
static bool
--- 1187,1193 ----
while (idx < ctx->chkListSize)
ctx->chkList[idx++] = InvalidDumpId;
! return checkOK;
}
static bool
*************** static bool
*** 1188,1194 ****
_CheckBlob(ArchiveHandle *AH, Oid oid, pgoff_t size)
{
lclContext *ctx = (lclContext *) AH->formatData;
! char *fname = prependBlobsDirectory(AH, oid);
bool checkOK = true;
if (!_CheckFileSize(AH, fname, size, false))
--- 1299,1305 ----
_CheckBlob(ArchiveHandle *AH, Oid oid, pgoff_t size)
{
lclContext *ctx = (lclContext *) AH->formatData;
! char *fname = prependBlobsDirectory(AH, oid, -1);
bool checkOK = true;
if (!_CheckFileSize(AH, fname, size, false))
*************** _CheckBlobs(ArchiveHandle *AH, TocEntry
*** 1211,1223 ****
Oid oid;
/* check the BLOBS.TOC first */
! fname = prependDirectory(AH, "BLOBS.TOC");
!
! if (!fname)
! {
! printf("Could not find BLOBS.TOC. Check the archive!\n");
! return false;
! }
if (!_CheckFileSize(AH, fname, tctx->fileSize, false))
checkOK = false;
--- 1322,1328 ----
Oid oid;
/* check the BLOBS.TOC first */
! fname = prependDirectory(AH, "BLOBS.TOC", -1);
if (!_CheckFileSize(AH, fname, tctx->fileSize, false))
checkOK = false;
*************** _CheckTocEntry(ArchiveHandle *AH, TocEnt
*** 1291,1303 ****
{
char *fname;
! fname = prependDirectory(AH, tctx->filename);
! if (!fname)
! {
! printf("Could not find file %s\n", tctx->filename);
! checkOK = false;
! }
! else if (!_CheckFileSize(AH, fname, tctx->fileSize, false))
checkOK = false;
else if (!_CheckFileContents(AH, fname, ctx->idStr, false))
checkOK = false;
--- 1396,1403 ----
{
char *fname;
! fname = prependDirectory(AH, tctx->filename, -1);
! if (!_CheckFileSize(AH, fname, tctx->fileSize, false))
checkOK = false;
else if (!_CheckFileContents(AH, fname, ctx->idStr, false))
checkOK = false;
*************** _EndCheckArchive(ArchiveHandle *AH)
*** 1326,1384 ****
return checkOK;
}
-
- static void
- createDirectory(const char *dir, const char *subdir)
- {
- struct stat st;
- char dirname[MAXPGPATH];
-
- /* the directory must not yet exist, first check if it is existing */
- if (subdir && strlen(dir) + 1 + strlen(subdir) + 1 > MAXPGPATH)
- die_horribly(NULL, modulename, "directory name %s too long", dir);
-
- strcpy(dirname, dir);
-
- if (subdir)
- {
- strcat(dirname, "/");
- strcat(dirname, subdir);
- }
-
- if (stat(dirname, &st) == 0)
- {
- if (S_ISDIR(st.st_mode))
- die_horribly(NULL, modulename,
- "Cannot create directory %s, it exists already\n", dirname);
- else
- die_horribly(NULL, modulename,
- "Cannot create directory %s, a file with this name exists already\n", dirname);
- }
-
- /*
- * Now we create the directory. Note that for some race condition we
- * could also run into the situation that the directory has been created
- * just between our two calls.
- */
- if (mkdir(dirname, 0700) < 0)
- die_horribly(NULL, modulename, "Could not create directory %s: %s",
- dirname, strerror(errno));
- }
-
-
static char *
! prependDirectory(ArchiveHandle *AH, const char *relativeFilename)
{
lclContext *ctx = (lclContext *) AH->formatData;
static char buf[MAXPGPATH];
! char *dname;
! dname = ctx->directory;
! if (strlen(dname) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH)
! die_horribly(AH, modulename, "path name too long: %s", dname);
! strcpy(buf, dname);
strcat(buf, "/");
strcat(buf, relativeFilename);
--- 1426,1455 ----
return checkOK;
}
static char *
! prependDirectory(ArchiveHandle *AH, const char *relativeFilename, int directoryIndex)
{
lclContext *ctx = (lclContext *) AH->formatData;
static char buf[MAXPGPATH];
! int i;
! if (directoryIndex < 0)
! {
! /* detect the directory automatically (calls itself) */
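!
! /*
!  * For example, with a "dir1:dir2" spec (hypothetical paths) a lookup of
!  * "TOC" probes dir1/TOC first and then dir2/TOC, returning the first
!  * regular file found.
!  */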
! for (i = 0; i < ctx->numDirectories; i++)
! {
! struct stat st;
! char *fname = prependDirectory(AH, relativeFilename, i);
! if (stat(fname, &st) == 0 && S_ISREG(st.st_mode))
! return fname;
! }
! die_horribly(AH, modulename, "Could not find input file \"%s\" in the archive\n", relativeFilename);
! }
! if (strlen(ctx->directories[directoryIndex]) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH)
! die_horribly(AH, modulename, "directory name \"%s\" too long\n", ctx->directories[directoryIndex]);
! strcpy(buf, ctx->directories[directoryIndex]);
strcat(buf, "/");
strcat(buf, relativeFilename);
*************** prependDirectory(ArchiveHandle *AH, cons
*** 1386,1405 ****
}
static char *
! prependBlobsDirectory(ArchiveHandle *AH, Oid oid)
{
static char buf[MAXPGPATH];
char *dname;
lclContext *ctx = (lclContext *) AH->formatData;
int r;
! dname = ctx->directory;
r = snprintf(buf, MAXPGPATH, "%s/blobs/%d%s",
dname, oid, FILE_SUFFIX);
if (r < 0 || r >= MAXPGPATH)
! die_horribly(AH, modulename, "path name too long: %s", dname);
return buf;
}
--- 1457,1491 ----
}
static char *
! prependBlobsDirectory(ArchiveHandle *AH, Oid oid, int directoryIndex)
{
static char buf[MAXPGPATH];
char *dname;
lclContext *ctx = (lclContext *) AH->formatData;
int r;
! if (directoryIndex < 0)
! {
! int i;
!
! for (i = 0; i < ctx->numDirectories; i++)
! {
! struct stat st;
! char *fname = prependBlobsDirectory(AH, oid, i);
! if (stat(fname, &st) == 0 && S_ISREG(st.st_mode))
! return fname;
! }
! die_horribly(AH, modulename, "Could not find input file \"%d%s\" in the archive\n",
! oid, FILE_SUFFIX);
! }
!
! dname = ctx->directories[directoryIndex];
r = snprintf(buf, MAXPGPATH, "%s/blobs/%d%s",
dname, oid, FILE_SUFFIX);
if (r < 0 || r >= MAXPGPATH)
! die_horribly(AH, modulename, "directory name \"%s\" too long\n", dname);
return buf;
}
*************** getRandomData(char *s, int len)
*** 1473,1496 ****
}
static bool
! isDirectory(const char *fname)
{
! struct stat st;
! if (stat(fname, &st))
return false;
return S_ISDIR(st.st_mode);
}
static bool
! isRegularFile(const char *fname)
{
! struct stat st;
! if (stat(fname, &st))
return false;
return S_ISREG(st.st_mode);
}
--- 1559,1922 ----
}
static bool
! isDirectory(const char *dname, const char *fname)
{
! char buf[MAXPGPATH];
! struct stat st;
! if (strlen(dname) + 1 + strlen(fname) + 1 > sizeof(buf))
! die_horribly(NULL, modulename, "directory name \"%s\" too long\n", dname);
!
! strcpy(buf, dname);
! strcat(buf, "/");
! strcat(buf, fname);
!
! if (stat(buf, &st))
return false;
return S_ISDIR(st.st_mode);
}
static bool
! isRegularFile(const char *dname, const char *fname)
{
! char buf[MAXPGPATH];
! struct stat st;
! if (strlen(dname) + 1 + strlen(fname) + 1 > sizeof(buf))
! die_horribly(NULL, modulename, "directory name \"%s\" too long\n", dname);
!
! strcpy(buf, dname);
! strcat(buf, "/");
! strcat(buf, fname);
!
! if (stat(buf, &st))
return false;
return S_ISREG(st.st_mode);
}
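+
+ /*
+  * Worker-side handler for a "DUMP <dumpId> <directoryIndex>" command sent
+  * by the master (composed in _StartMasterParallel below): dump the given
+  * TOC entry into the assigned directory and reply with
+  * "OK DUMP <dumpId> <fileSize>".
+  */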
+ static char *
+ _WorkerJobDumpDirectory(ArchiveHandle *AH, const char *args)
+ {
+ static char buf[64]; /* short string + some ID so far */
+ lclContext *ctx = (lclContext *) AH->formatData;
+ TocEntry *te;
+ lclTocEntry *tctx = NULL;
+ DumpId dumpId;
+ int directoryIndex;
+ int nBytes, nTok;
+
+ nTok = sscanf(args, "%d %d%n", &dumpId, &directoryIndex, &nBytes);
+ Assert(nBytes == strlen(args));
+ Assert(nTok == 2); /* XXX remove, not safe acc. to manpage */
+
+ ctx->is_parallel_child = true;
+ ctx->directoryUsage = NULL;
+
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ if (te->dumpId == dumpId)
+ {
+ tctx = (lclTocEntry *) te->formatData;
+ tctx->directoryIndex = directoryIndex;
+ break;
+ }
+ }
+
+ Assert(te->dumpId == dumpId);
+ Assert(tctx != NULL);
+ /* This should never happen */
+ if (!tctx)
+ die_horribly(AH, modulename, "Error during backup\n");
+
+ WriteDataChunksForTocEntry(AH, te);
+
+ /* XXX handle failure */
+ snprintf(buf, sizeof(buf), "OK DUMP %d %lu", te->dumpId, (unsigned long int) tctx->fileSize);
+
+ return buf;
+ }
+
+ /*
+ * Clone format-specific fields during parallel restoration.
+ */
+ static void
+ _Clone(ArchiveHandle *AH)
+ {
+ lclContext *ctx = (lclContext *) AH->formatData;
+
+ AH->formatData = (lclContext *) malloc(sizeof(lclContext));
+ if (AH->formatData == NULL)
+ die_horribly(AH, modulename, "out of memory\n");
+ memcpy(AH->formatData, ctx, sizeof(lclContext));
+ ctx = (lclContext *) AH->formatData;
+
+ ctx->cs = AllocateCompressorState(AH);
+
+ /*
+ * Note: we do not make a local lo_buf because we expect at most one BLOBS
+ * entry per archive, so no parallelism is possible. Likewise,
+ * TOC-entry-local state isn't an issue because any one TOC entry is
+ * touched by just one worker child.
+ */
+ }
+
+ static void
+ _DeClone(ArchiveHandle *AH)
+ {
+ lclContext *ctx = (lclContext *) AH->formatData;
+ CompressorState *cs = ctx->cs;
+
+ FreeCompressorState(cs);
+
+ free(ctx);
+ }
+
+ /* XXX sort into a better place */
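+ /*
+  * Worker-side handler for a "RESTORE <dumpId>" command: restore the given
+  * TOC entry via parallel_restore() and reply with "OK RESTORE <dumpId>".
+  */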
+ static char *
+ _WorkerJobRestoreDirectory(ArchiveHandle *AH, const char *args)
+ {
+ static char buf[64]; /* short string + some ID so far */
+ lclContext *ctx = (lclContext *) AH->formatData;
+ ParallelArgs pargs;
+ int ret;
+ lclTocEntry *tctx = NULL;
+ TocEntry *te;
+ DumpId dumpId = InvalidDumpId;
+ int nBytes, nTok;
+
+ nTok = sscanf(args, "%d%n", &dumpId, &nBytes);
+ Assert(nBytes == strlen(args));
+ Assert(nTok == 1); /* XXX remove, not safe acc. to manpage */
+
+ ctx->is_parallel_child = true;
+ ctx->directoryUsage = NULL;
+
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ if (te->dumpId == dumpId)
+ {
+ tctx = (lclTocEntry *) te->formatData;
+ break;
+ }
+ }
+
+ Assert(te->dumpId == dumpId);
+ /* This should never happen */
+ if (!tctx)
+ die_horribly(AH, modulename, "Error during restore\n");
+
+ pargs.AH = AH;
+ pargs.te = te;
+
+ /*
+  * parallel_restore() will reconnect and establish the restore connection;
+  * is_parallel_child was already set above.
+  */
+ /* AH->connection = NULL; */
+
+ ret = parallel_restore(&pargs);
+
+ tctx->restore_status = ret;
+
+ /* XXX handle failure */
+ snprintf(buf, sizeof(buf), "OK RESTORE %d", dumpId);
+
+ return buf;
+ }
+
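+ /*
+  * Return the format's ParallelState if we are actually running with more
+  * than one worker, else NULL.
+  */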
+ static ParallelState *
+ _GetParallelState(ArchiveHandle *AH)
+ {
+ lclContext *ctx = (lclContext *) AH->formatData;
+ if (ctx->pstate.numWorkers > 1)
+ return &ctx->pstate;
+ else
+ return NULL;
+ }
+
+ /* XXX if numWorkers is the only piece of information that we pass to the
+ * format this way, consider introducing an AH->number_of_jobs or the like. */
+ void
+ setupArchDirectory(ArchiveHandle *AH, int numWorkers)
+ {
+ lclContext *ctx = (lclContext *) AH->formatData;
+ ctx->numWorkers = numWorkers;
+ }
+
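+ /*
+  * Split a colon-separated directory specification such as "dir1:dir2:dir3"
+  * into ctx->directories[] and initialize the per-directory usage counters.
+  */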
+ static void
+ splitDirectories(const char *spec, lclContext *ctx)
+ {
+ /* count the number of fragments */
+ char *p;
+ const char *q;
+
+ ctx->numDirectories = 1;
+ for (q = spec; *q != '\0'; q++)
+ {
+ if (*q == ':')
+ ctx->numDirectories++;
+ }
+
+ ctx->directories = (char **) malloc(ctx->numDirectories * sizeof(char *));
+ ctx->directoryUsage = (int *) malloc(ctx->numDirectories * sizeof(int));
+ p = strdup(spec);
+ if (!ctx->directories || !ctx->directoryUsage || !p)
+ die_horribly(NULL, modulename, "out of memory\n");
+
+ ctx->numDirectories = 1;
+ ctx->directories[0] = p;
+ ctx->directoryUsage[0] = 0;
+ while (*p)
+ {
+ if (*p == ':')
+ {
+ *p = '\0';
+ p++;
+ ctx->numDirectories++;
+ ctx->directories[ctx->numDirectories - 1] = p;
+ ctx->directoryUsage[ctx->numDirectories - 1] = 0;
+ }
+ else
+ p++;
+ }
+ }
+
+ static void
+ createDirectoryGroup(char **dirs, int nDir, const char *subdir)
+ {
+ /* The directories must not exist yet; check first whether they do. */
+ struct stat st;
+ int i;
+ char dirname[MAXPGPATH];
+
+ for (i = 0; i < nDir; i++)
+ {
+ if (subdir && strlen(dirs[i]) + 1 + strlen(subdir) + 1 > MAXPGPATH)
+ die_horribly(NULL, modulename, "directory name \"%s\" too long\n", dirs[i]);
+ strcpy(dirname, dirs[i]);
+
+ if (subdir)
+ {
+ strcat(dirname, "/");
+ strcat(dirname, subdir);
+ }
+
+ /* XXX extend checks - check for base path */
+ if (stat(dirname, &st) != 0)
+ continue;
+ if (S_ISDIR(st.st_mode))
+ die_horribly(NULL, modulename, "Cannot create directory \"%s\", it exists already\n", dirname);
+ else
+ die_horribly(NULL, modulename, "Cannot create directory \"%s\", a file with this name exists already\n", dirname);
+ }
+
+ /*
+  * Now create the directories. We could still fail here, for example
+  * because of insufficient privileges or a race condition.
+  */
+
+ for (i = 0; i < nDir; i++)
+ {
+ strcpy(dirname, dirs[i]);
+
+ if (subdir)
+ {
+ strcat(dirname, "/");
+ strcat(dirname, subdir);
+ }
+
+ if (mkdir(dirname, 0700) < 0)
+ die_horribly(NULL, modulename, "Could not create directory %s: %s\n",
+ dirname, strerror(errno));
+ }
+ }
+
+ static int
+ assignDirectory(lclContext *ctx)
+ {
+ /*
+  * With d directories and n parallel worker processes, every directory
+  * receives at most n / d items at a time. A directory that is still below
+  * this threshold is our next candidate. To distribute the load more evenly
+  * we also do a round-robin on the directory that we check first. (Imagine
+  * we have 3 very large tables and the rest small; we want those 3 tables
+  * to end up in different directories.)
+  */
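+
+ /*
+  * For example, with n = 4 workers and d = 2 directories each directory
+  * accepts at most 4 / 2 = 2 concurrently dumped items, so the directories
+  * together always have room for all n concurrent jobs.
+  */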
+
+ static int startIdx;
+ int i = startIdx;
+
+ Assert(ctx->directoryUsage != NULL);
+
+ do
+ {
+ if (ctx->directoryUsage[i] == 0 || (float) ctx->directoryUsage[i] < (float) ctx->numWorkers / (float) ctx->numDirectories)
+ {
+ ctx->directoryUsage[i]++;
+ startIdx = (i + 1) % ctx->numDirectories;
+ return i;
+ }
+ i = (i + 1) % ctx->numDirectories;
+ } while (true);
+ }
+
+ static void
+ unassignDirectory(lclContext *ctx, lclTocEntry *tctx)
+ {
+ Assert(ctx->directoryUsage != NULL);
+ Assert(ctx->directoryUsage[tctx->directoryIndex] > 0);
+ ctx->directoryUsage[tctx->directoryIndex]--;
+ tctx->directoryIndex = -1;
+ }
+
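+ /*
+  * Compose the command string that the master sends to a worker: either
+  * "DUMP <dumpId> <directoryIndex>" or "RESTORE <dumpId>". The worker's
+  * reply is consumed by _EndMasterParallel() below.
+  */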
+ static char *
+ _StartMasterParallel(ArchiveHandle *AH, TocEntry *te, T_Action act)
+ {
+ lclContext *ctx = (lclContext *) AH->formatData;
+ lclTocEntry *tctx = (lclTocEntry *) te->formatData;
+ static char buf[32];
+
+ if (act == ACT_DUMP)
+ {
+ tctx->directoryIndex = assignDirectory(ctx);
+ snprintf(buf, sizeof(buf), "DUMP %d %d",
+ te->dumpId, tctx->directoryIndex);
+ }
+ else if (act == ACT_RESTORE)
+ {
+ snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
+ }
+
+ return buf;
+ }
+
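+ /*
+  * Parse a worker's reply to a DUMP or RESTORE command; for a dump, record
+  * the size of the data file that the worker has written.
+  */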
+ static int
+ _EndMasterParallel(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
+ {
+ int nTok, nBytes;
+ DumpId dumpId;
+ lclTocEntry *tctx = (lclTocEntry *) te->formatData;
+ lclContext *ctx = (lclContext *) AH->formatData;
+
+ if (act == ACT_DUMP)
+ {
+ unsigned long int size;
+ unassignDirectory(ctx, tctx);
+
+ nTok = sscanf(str, "%u %lu%n", &dumpId, &size, &nBytes);
+
+ Assert(nTok == 2); /* XXX remove, not safe acc. to manpage */
+ Assert(dumpId == te->dumpId);
+ Assert(nBytes == strlen(str));
+
+ tctx->fileSize = size;
+ }
+ else if (act == ACT_RESTORE)
+ {
+ nTok = sscanf(str, "%u%n", &dumpId, &nBytes);
+
+ Assert(nTok == 1); /* XXX remove, not safe acc. to manpage */
+ Assert(dumpId == te->dumpId);
+ Assert(nBytes == strlen(str));
+ }
+
+ return 0;
+ }
+
diff --git a/src/bin/pg_dump/pg_backup_files.c b/src/bin/pg_dump/pg_backup_files.c
index 825c473..87a584b 100644
*** a/src/bin/pg_dump/pg_backup_files.c
--- b/src/bin/pg_dump/pg_backup_files.c
*************** InitArchiveFmt_Files(ArchiveHandle *AH)
*** 101,106 ****
--- 101,113 ----
AH->ClonePtr = NULL;
AH->DeClonePtr = NULL;
+ AH->StartMasterParallelPtr = NULL;
+ AH->EndMasterParallelPtr = NULL;
+
+ AH->GetParallelStatePtr = NULL;
+ AH->WorkerJobDumpPtr = NULL;
+ AH->WorkerJobRestorePtr = NULL;
+
AH->StartCheckArchivePtr = NULL;
AH->CheckTocEntryPtr = NULL;
AH->EndCheckArchivePtr = NULL;
diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c
index dcc13ee..229d9fb 100644
*** a/src/bin/pg_dump/pg_backup_tar.c
--- b/src/bin/pg_dump/pg_backup_tar.c
*************** InitArchiveFmt_Tar(ArchiveHandle *AH)
*** 153,158 ****
--- 153,165 ----
AH->ClonePtr = NULL;
AH->DeClonePtr = NULL;
+ AH->StartMasterParallelPtr = NULL;
+ AH->EndMasterParallelPtr = NULL;
+
+ AH->GetParallelStatePtr = NULL;
+ AH->WorkerJobDumpPtr = NULL;
+ AH->WorkerJobRestorePtr = NULL;
+
AH->StartCheckArchivePtr = NULL;
AH->CheckTocEntryPtr = NULL;
AH->EndCheckArchivePtr = NULL;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 39e68d9..abd90f5 100644
*** a/src/bin/pg_dump/pg_dump.c
--- b/src/bin/pg_dump/pg_dump.c
*************** bool g_verbose; /* User wants verbose
*** 85,90 ****
--- 85,91 ----
* activities. */
Archive *g_fout; /* the script file */
PGconn *g_conn; /* the database connection */
+ PGconn **g_conn_child;
/* various user-settable parameters */
bool schemaOnly;
*************** static void do_sql_command(PGconn *conn,
*** 237,242 ****
--- 238,246 ----
static void check_sql_result(PGresult *res, PGconn *conn, const char *query,
ExecStatusType expected);
+ static ArchiveFormat parseArchiveFormat(const char *format);
+
+ void SetupConnection(PGconn *conn, const char *syncId, const char *dumpencoding, const char *use_role);
int
main(int argc, char **argv)
*************** main(int argc, char **argv)
*** 249,261 ****
const char *pgport = NULL;
const char *username = NULL;
const char *dumpencoding = NULL;
- const char *std_strings;
bool oids = false;
TableInfo *tblinfo;
int numTables;
DumpableObject **dobjs;
int numObjs;
int i;
enum trivalue prompt_password = TRI_DEFAULT;
int compressLevel = COMPRESSION_UNKNOWN;
int plainText = 0;
--- 253,265 ----
const char *pgport = NULL;
const char *username = NULL;
const char *dumpencoding = NULL;
bool oids = false;
TableInfo *tblinfo;
int numTables;
DumpableObject **dobjs;
int numObjs;
int i;
+ int numWorkers = 1;
enum trivalue prompt_password = TRI_DEFAULT;
int compressLevel = COMPRESSION_UNKNOWN;
int plainText = 0;
*************** main(int argc, char **argv)
*** 356,362 ****
}
}
! while ((c = getopt_long(argc, argv, "abcCE:f:F:h:in:N:oOp:RsS:t:T:U:vwWxX:Z:",
long_options, &optindex)) != -1)
{
switch (c)
--- 360,366 ----
}
}
! while ((c = getopt_long(argc, argv, "abcCE:f:F:h:ij:n:N:oOp:RsS:t:T:U:vwWxX:Z:",
long_options, &optindex)) != -1)
{
switch (c)
*************** main(int argc, char **argv)
*** 397,402 ****
--- 401,410 ----
/* ignored, deprecated option */
break;
+ case 'j':
+ numWorkers = atoi(optarg);
+ break;
+
case 'n': /* include schema(s) */
simple_string_list_append(&schema_include_patterns, optarg);
include_everything = false;
*************** main(int argc, char **argv)
*** 542,547 ****
--- 550,561 ----
archiveFormat = parseArchiveFormat(format);
+ if (archiveFormat != archDirectory && numWorkers > 1)
+ {
+ write_msg(NULL, "parallel backup only supported by the directory format\n");
+ exit(1);
+ }
+
/* archiveFormat specific setup */
if (archiveFormat == archNull || archiveFormat == archNullAppend)
plainText = 1;
*************** main(int argc, char **argv)
*** 639,742 ****
* Open the database using the Archiver, so it knows about it. Errors mean
* death.
*/
- g_conn = ConnectDatabase(g_fout, dbname, pghost, pgport,
- username, prompt_password);
- /* Set the client encoding if requested */
- if (dumpencoding)
{
! if (PQsetClientEncoding(g_conn, dumpencoding) < 0)
! {
! write_msg(NULL, "invalid client encoding \"%s\" specified\n",
! dumpencoding);
! exit(1);
! }
! }
!
! /*
! * Get the active encoding and the standard_conforming_strings setting, so
! * we know how to escape strings.
! */
! g_fout->encoding = PQclientEncoding(g_conn);
!
! std_strings = PQparameterStatus(g_conn, "standard_conforming_strings");
! g_fout->std_strings = (std_strings && strcmp(std_strings, "on") == 0);
!
! /* Set the role if requested */
! if (use_role && g_fout->remoteVersion >= 80100)
! {
! PQExpBuffer query = createPQExpBuffer();
!
! appendPQExpBuffer(query, "SET ROLE %s", fmtId(use_role));
! do_sql_command(g_conn, query->data);
! destroyPQExpBuffer(query);
! }
!
! /* Set the datestyle to ISO to ensure the dump's portability */
! do_sql_command(g_conn, "SET DATESTYLE = ISO");
!
! /* Likewise, avoid using sql_standard intervalstyle */
! if (g_fout->remoteVersion >= 80400)
! do_sql_command(g_conn, "SET INTERVALSTYLE = POSTGRES");
!
! /*
! * If supported, set extra_float_digits so that we can dump float data
! * exactly (given correctly implemented float I/O code, anyway)
! */
! if (g_fout->remoteVersion >= 90000)
! do_sql_command(g_conn, "SET extra_float_digits TO 3");
! else if (g_fout->remoteVersion >= 70400)
! do_sql_command(g_conn, "SET extra_float_digits TO 2");
!
! /*
! * If synchronized scanning is supported, disable it, to prevent
! * unpredictable changes in row ordering across a dump and reload.
! */
! if (g_fout->remoteVersion >= 80300)
! do_sql_command(g_conn, "SET synchronize_seqscans TO off");
!
! /*
! * Disable timeouts if supported.
! */
! if (g_fout->remoteVersion >= 70300)
! do_sql_command(g_conn, "SET statement_timeout = 0");
!
! /*
! * Quote all identifiers, if requested.
! */
! if (quote_all_identifiers && g_fout->remoteVersion >= 90100)
! do_sql_command(g_conn, "SET quote_all_identifiers = true");
! /*
! * Disables security label support if server version < v9.1.x
! */
! if (!no_security_label && g_fout->remoteVersion < 90100)
! no_security_label = 1;
! /*
! * Start serializable transaction to dump consistent data.
! */
! do_sql_command(g_conn, "BEGIN");
! do_sql_command(g_conn, "SET TRANSACTION READ ONLY ISOLATION LEVEL SERIALIZABLE");
! /* Select the appropriate subquery to convert user IDs to names */
! if (g_fout->remoteVersion >= 80100)
! username_subquery = "SELECT rolname FROM pg_catalog.pg_roles WHERE oid =";
! else if (g_fout->remoteVersion >= 70300)
! username_subquery = "SELECT usename FROM pg_catalog.pg_user WHERE usesysid =";
! else
! username_subquery = "SELECT usename FROM pg_user WHERE usesysid =";
! /* Find the last built-in OID, if needed */
! if (g_fout->remoteVersion < 70300)
! {
! if (g_fout->remoteVersion >= 70100)
! g_last_builtin_oid = findLastBuiltinOid_V71(PQdb(g_conn));
! else
! g_last_builtin_oid = findLastBuiltinOid_V70();
! if (g_verbose)
! write_msg(NULL, "last built-in OID is %u\n", g_last_builtin_oid);
}
/* Expand schema selection patterns into OID lists */
--- 653,694 ----
* Open the database using the Archiver, so it knows about it. Errors mean
* death.
*/
{
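! /*
!  * For a parallel dump we open one connection for the master plus one per
!  * worker and apply the same session setup to each. (The unused "temp"
!  * connection and the fixed idString appear to be scaffolding for the
!  * snapshot-synchronization work mentioned in the cover letter.)
!  */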
! ArchiveHandle *AH;
! PGconn *backup;
! PGconn *temp;
! char *idString = "id";
! AH = (ArchiveHandle *) g_fout;
! if (archiveFormat == archDirectory)
! setupArchDirectory(AH, numWorkers);
! temp = ConnectDatabase(g_fout, dbname, pghost, pgport,
! username, prompt_password);
! PQsetnonblocking(temp, 1);
! AH->connection = NULL;
! g_conn = ConnectDatabase(g_fout, dbname, pghost, pgport,
! username, prompt_password);
! AH = (ArchiveHandle *) g_fout;
! backup = AH->connection;
! g_conn_child = (PGconn**) malloc(numWorkers * sizeof(PGconn *));
! for (i = 0; i < numWorkers; i++)
! {
! AH->connection = NULL;
! g_conn_child[i] = ConnectDatabase(g_fout, dbname,
! pghost, pgport,
! username, prompt_password);
! }
! SetupConnection(g_conn, idString, dumpencoding, use_role);
! for (i = 0; i < numWorkers; i++)
! {
! SetupConnection(g_conn_child[i], idString, dumpencoding, use_role);
! }
! AH->connection = backup;
}
/* Expand schema selection patterns into OID lists */
*************** main(int argc, char **argv)
*** 816,821 ****
--- 768,776 ----
else
sortDumpableObjectsByTypeOid(dobjs, numObjs);
+ if (archiveFormat == archDirectory && numWorkers > 1)
+ sortDataAndIndexObjectsBySize(dobjs, numObjs);
+
sortDumpableObjects(dobjs, numObjs);
/*
*************** dumpTableData(Archive *fout, TableDataIn
*** 1531,1537 ****
ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
tbinfo->dobj.name, tbinfo->dobj.namespace->dobj.name,
! NULL, tbinfo->rolname,
false, "TABLE DATA", SECTION_DATA,
"", "", copyStmt,
tdinfo->dobj.dependencies, tdinfo->dobj.nDeps,
--- 1486,1492 ----
ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
tbinfo->dobj.name, tbinfo->dobj.namespace->dobj.name,
! NULL, tbinfo->rolname, tbinfo->relpages,
false, "TABLE DATA", SECTION_DATA,
"", "", copyStmt,
tdinfo->dobj.dependencies, tdinfo->dobj.nDeps,
*************** dumpDatabase(Archive *AH)
*** 1899,1904 ****
--- 1854,1860 ----
NULL, /* Namespace */
NULL, /* Tablespace */
dba, /* Owner */
+ 0, /* relpages */
false, /* with oids */
"DATABASE", /* Desc */
SECTION_PRE_DATA, /* Section */
*************** dumpDatabase(Archive *AH)
*** 1944,1950 ****
atoi(PQgetvalue(lo_res, 0, i_relfrozenxid)),
LargeObjectRelationId);
ArchiveEntry(AH, nilCatalogId, createDumpId(),
! "pg_largeobject", NULL, NULL, "",
false, "pg_largeobject", SECTION_PRE_DATA,
loOutQry->data, "", NULL,
NULL, 0,
--- 1900,1906 ----
atoi(PQgetvalue(lo_res, 0, i_relfrozenxid)),
LargeObjectRelationId);
ArchiveEntry(AH, nilCatalogId, createDumpId(),
! "pg_largeobject", NULL, NULL, "", 0,
false, "pg_largeobject", SECTION_PRE_DATA,
loOutQry->data, "", NULL,
NULL, 0,
*************** dumpDatabase(Archive *AH)
*** 1977,1983 ****
appendPQExpBuffer(dbQry, ";\n");
ArchiveEntry(AH, dbCatId, createDumpId(), datname, NULL, NULL,
! dba, false, "COMMENT", SECTION_NONE,
dbQry->data, "", NULL,
&dbDumpId, 1, NULL, NULL);
}
--- 1933,1939 ----
appendPQExpBuffer(dbQry, ";\n");
ArchiveEntry(AH, dbCatId, createDumpId(), datname, NULL, NULL,
! dba, 0, false, "COMMENT", SECTION_NONE,
dbQry->data, "", NULL,
&dbDumpId, 1, NULL, NULL);
}
*************** dumpEncoding(Archive *AH)
*** 2015,2021 ****
appendPQExpBuffer(qry, ";\n");
ArchiveEntry(AH, nilCatalogId, createDumpId(),
! "ENCODING", NULL, NULL, "",
false, "ENCODING", SECTION_PRE_DATA,
qry->data, "", NULL,
NULL, 0,
--- 1971,1977 ----
appendPQExpBuffer(qry, ";\n");
ArchiveEntry(AH, nilCatalogId, createDumpId(),
! "ENCODING", NULL, NULL, "", 0,
false, "ENCODING", SECTION_PRE_DATA,
qry->data, "", NULL,
NULL, 0,
*************** dumpStdStrings(Archive *AH)
*** 2042,2048 ****
stdstrings);
ArchiveEntry(AH, nilCatalogId, createDumpId(),
! "STDSTRINGS", NULL, NULL, "",
false, "STDSTRINGS", SECTION_PRE_DATA,
qry->data, "", NULL,
NULL, 0,
--- 1998,2004 ----
stdstrings);
ArchiveEntry(AH, nilCatalogId, createDumpId(),
! "STDSTRINGS", NULL, NULL, "", 0,
false, "STDSTRINGS", SECTION_PRE_DATA,
qry->data, "", NULL,
NULL, 0,
*************** dumpBlob(Archive *AH, BlobInfo *binfo)
*** 2154,2160 ****
ArchiveEntry(AH, binfo->dobj.catId, binfo->dobj.dumpId,
binfo->dobj.name,
NULL, NULL,
! binfo->rolname, false,
"BLOB", SECTION_PRE_DATA,
cquery->data, dquery->data, NULL,
binfo->dobj.dependencies, binfo->dobj.nDeps,
--- 2110,2116 ----
ArchiveEntry(AH, binfo->dobj.catId, binfo->dobj.dumpId,
binfo->dobj.name,
NULL, NULL,
! binfo->rolname, 0, false,
"BLOB", SECTION_PRE_DATA,
cquery->data, dquery->data, NULL,
binfo->dobj.dependencies, binfo->dobj.nDeps,
*************** getTables(int *numTables)
*** 3540,3545 ****
--- 3496,3502 ----
int i_reloptions;
int i_toastreloptions;
int i_reloftype;
+ int i_relpages;
/* Make sure we are in proper schema */
selectSourceSchema("pg_catalog");
*************** getTables(int *numTables)
*** 3572,3582 ****
*/
appendPQExpBuffer(query,
"SELECT c.tableoid, c.oid, c.relname, "
! "c.relacl, c.relkind, c.relnamespace, "
"(%s c.relowner) AS rolname, "
"c.relchecks, c.relhastriggers, "
"c.relhasindex, c.relhasrules, c.relhasoids, "
! "c.relfrozenxid, "
"CASE WHEN c.reloftype <> 0 THEN c.reloftype::pg_catalog.regtype ELSE NULL END AS reloftype, "
"d.refobjid AS owning_tab, "
"d.refobjsubid AS owning_col, "
--- 3529,3539 ----
*/
appendPQExpBuffer(query,
"SELECT c.tableoid, c.oid, c.relname, "
! "c.relacl, c.relkind, c.relnamespace, c.relpages, "
"(%s c.relowner) AS rolname, "
"c.relchecks, c.relhastriggers, "
"c.relhasindex, c.relhasrules, c.relhasoids, "
! "c.relfrozenxid, c.relpages, "
"CASE WHEN c.reloftype <> 0 THEN c.reloftype::pg_catalog.regtype ELSE NULL END AS reloftype, "
"d.refobjid AS owning_tab, "
"d.refobjsubid AS owning_col, "
*************** getTables(int *numTables)
*** 3609,3615 ****
"(%s c.relowner) AS rolname, "
"c.relchecks, c.relhastriggers, "
"c.relhasindex, c.relhasrules, c.relhasoids, "
! "c.relfrozenxid, "
"NULL AS reloftype, "
"d.refobjid AS owning_tab, "
"d.refobjsubid AS owning_col, "
--- 3566,3572 ----
"(%s c.relowner) AS rolname, "
"c.relchecks, c.relhastriggers, "
"c.relhasindex, c.relhasrules, c.relhasoids, "
! "c.relfrozenxid, c.relpages, "
"NULL AS reloftype, "
"d.refobjid AS owning_tab, "
"d.refobjsubid AS owning_col, "
*************** getTables(int *numTables)
*** 3642,3648 ****
"(%s relowner) AS rolname, "
"relchecks, (reltriggers <> 0) AS relhastriggers, "
"relhasindex, relhasrules, relhasoids, "
! "relfrozenxid, "
"NULL AS reloftype, "
"d.refobjid AS owning_tab, "
"d.refobjsubid AS owning_col, "
--- 3599,3605 ----
"(%s relowner) AS rolname, "
"relchecks, (reltriggers <> 0) AS relhastriggers, "
"relhasindex, relhasrules, relhasoids, "
! "relfrozenxid, relpages, "
"NULL AS reloftype, "
"d.refobjid AS owning_tab, "
"d.refobjsubid AS owning_col, "
*************** getTables(int *numTables)
*** 3674,3680 ****
"(%s relowner) AS rolname, "
"relchecks, (reltriggers <> 0) AS relhastriggers, "
"relhasindex, relhasrules, relhasoids, "
! "0 AS relfrozenxid, "
"NULL AS reloftype, "
"d.refobjid AS owning_tab, "
"d.refobjsubid AS owning_col, "
--- 3631,3637 ----
"(%s relowner) AS rolname, "
"relchecks, (reltriggers <> 0) AS relhastriggers, "
"relhasindex, relhasrules, relhasoids, "
! "0 AS relfrozenxid, relpages, "
"NULL AS reloftype, "
"d.refobjid AS owning_tab, "
"d.refobjsubid AS owning_col, "
*************** getTables(int *numTables)
*** 3706,3712 ****
"(%s relowner) AS rolname, "
"relchecks, (reltriggers <> 0) AS relhastriggers, "
"relhasindex, relhasrules, relhasoids, "
! "0 AS relfrozenxid, "
"NULL AS reloftype, "
"d.refobjid AS owning_tab, "
"d.refobjsubid AS owning_col, "
--- 3663,3669 ----
"(%s relowner) AS rolname, "
"relchecks, (reltriggers <> 0) AS relhastriggers, "
"relhasindex, relhasrules, relhasoids, "
! "0 AS relfrozenxid, relpages, "
"NULL AS reloftype, "
"d.refobjid AS owning_tab, "
"d.refobjsubid AS owning_col, "
*************** getTables(int *numTables)
*** 3734,3740 ****
"(%s relowner) AS rolname, "
"relchecks, (reltriggers <> 0) AS relhastriggers, "
"relhasindex, relhasrules, relhasoids, "
! "0 AS relfrozenxid, "
"NULL AS reloftype, "
"NULL::oid AS owning_tab, "
"NULL::int4 AS owning_col, "
--- 3691,3697 ----
"(%s relowner) AS rolname, "
"relchecks, (reltriggers <> 0) AS relhastriggers, "
"relhasindex, relhasrules, relhasoids, "
! "0 AS relfrozenxid, relpages, "
"NULL AS reloftype, "
"NULL::oid AS owning_tab, "
"NULL::int4 AS owning_col, "
*************** getTables(int *numTables)
*** 3757,3763 ****
"relchecks, (reltriggers <> 0) AS relhastriggers, "
"relhasindex, relhasrules, "
"'t'::bool AS relhasoids, "
! "0 AS relfrozenxid, "
"NULL AS reloftype, "
"NULL::oid AS owning_tab, "
"NULL::int4 AS owning_col, "
--- 3714,3720 ----
"relchecks, (reltriggers <> 0) AS relhastriggers, "
"relhasindex, relhasrules, "
"'t'::bool AS relhasoids, "
! "0 AS relfrozenxid, relpages, "
"NULL AS reloftype, "
"NULL::oid AS owning_tab, "
"NULL::int4 AS owning_col, "
*************** getTables(int *numTables)
*** 3842,3847 ****
--- 3799,3805 ----
i_reloptions = PQfnumber(res, "reloptions");
i_toastreloptions = PQfnumber(res, "toast_reloptions");
i_reloftype = PQfnumber(res, "reloftype");
+ i_relpages = PQfnumber(res, "relpages");
if (lockWaitTimeout && g_fout->remoteVersion >= 70300)
{
*************** getTables(int *numTables)
*** 3893,3898 ****
--- 3851,3857 ----
tblinfo[i].reltablespace = strdup(PQgetvalue(res, i, i_reltablespace));
tblinfo[i].reloptions = strdup(PQgetvalue(res, i, i_reloptions));
tblinfo[i].toast_reloptions = strdup(PQgetvalue(res, i, i_toastreloptions));
+ tblinfo[i].relpages = atoi(PQgetvalue(res, i, i_relpages));
/* other fields were zeroed above */
*************** dumpComment(Archive *fout, const char *t
*** 6277,6283 ****
* post-data.
*/
ArchiveEntry(fout, nilCatalogId, createDumpId(),
! target, namespace, NULL, owner,
false, "COMMENT", SECTION_NONE,
query->data, "", NULL,
&(dumpId), 1,
--- 6236,6242 ----
* post-data.
*/
ArchiveEntry(fout, nilCatalogId, createDumpId(),
! target, namespace, NULL, owner, 0,
false, "COMMENT", SECTION_NONE,
query->data, "", NULL,
&(dumpId), 1,
*************** dumpTableComment(Archive *fout, TableInf
*** 6338,6344 ****
ArchiveEntry(fout, nilCatalogId, createDumpId(),
target->data,
tbinfo->dobj.namespace->dobj.name,
! NULL, tbinfo->rolname,
false, "COMMENT", SECTION_NONE,
query->data, "", NULL,
&(tbinfo->dobj.dumpId), 1,
--- 6297,6303 ----
ArchiveEntry(fout, nilCatalogId, createDumpId(),
target->data,
tbinfo->dobj.namespace->dobj.name,
! NULL, tbinfo->rolname, 0,
false, "COMMENT", SECTION_NONE,
query->data, "", NULL,
&(tbinfo->dobj.dumpId), 1,
*************** dumpTableComment(Archive *fout, TableInf
*** 6360,6366 ****
ArchiveEntry(fout, nilCatalogId, createDumpId(),
target->data,
tbinfo->dobj.namespace->dobj.name,
! NULL, tbinfo->rolname,
false, "COMMENT", SECTION_NONE,
query->data, "", NULL,
&(tbinfo->dobj.dumpId), 1,
--- 6319,6325 ----
ArchiveEntry(fout, nilCatalogId, createDumpId(),
target->data,
tbinfo->dobj.namespace->dobj.name,
! NULL, tbinfo->rolname, 0,
false, "COMMENT", SECTION_NONE,
query->data, "", NULL,
&(tbinfo->dobj.dumpId), 1,
*************** dumpDumpableObject(Archive *fout, Dumpab
*** 6640,6646 ****
break;
case DO_BLOB_DATA:
ArchiveEntry(fout, dobj->catId, dobj->dumpId,
! dobj->name, NULL, NULL, "",
false, "BLOBS", SECTION_DATA,
"", "", NULL,
dobj->dependencies, dobj->nDeps,
--- 6599,6605 ----
break;
case DO_BLOB_DATA:
ArchiveEntry(fout, dobj->catId, dobj->dumpId,
! dobj->name, NULL, NULL, "", 0,
false, "BLOBS", SECTION_DATA,
"", "", NULL,
dobj->dependencies, dobj->nDeps,
*************** dumpNamespace(Archive *fout, NamespaceIn
*** 6680,6686 ****
ArchiveEntry(fout, nspinfo->dobj.catId, nspinfo->dobj.dumpId,
nspinfo->dobj.name,
NULL, NULL,
! nspinfo->rolname,
false, "SCHEMA", SECTION_PRE_DATA,
q->data, delq->data, NULL,
nspinfo->dobj.dependencies, nspinfo->dobj.nDeps,
--- 6639,6645 ----
ArchiveEntry(fout, nspinfo->dobj.catId, nspinfo->dobj.dumpId,
nspinfo->dobj.name,
NULL, NULL,
! nspinfo->rolname, 0,
false, "SCHEMA", SECTION_PRE_DATA,
q->data, delq->data, NULL,
nspinfo->dobj.dependencies, nspinfo->dobj.nDeps,
*************** dumpEnumType(Archive *fout, TypeInfo *ty
*** 6822,6828 ****
tyinfo->dobj.name,
tyinfo->dobj.namespace->dobj.name,
NULL,
! tyinfo->rolname, false,
"TYPE", SECTION_PRE_DATA,
q->data, delq->data, NULL,
tyinfo->dobj.dependencies, tyinfo->dobj.nDeps,
--- 6781,6787 ----
tyinfo->dobj.name,
tyinfo->dobj.namespace->dobj.name,
NULL,
! tyinfo->rolname, 0, false,
"TYPE", SECTION_PRE_DATA,
q->data, delq->data, NULL,
tyinfo->dobj.dependencies, tyinfo->dobj.nDeps,
*************** dumpBaseType(Archive *fout, TypeInfo *ty
*** 7201,7207 ****
tyinfo->dobj.name,
tyinfo->dobj.namespace->dobj.name,
NULL,
! tyinfo->rolname, false,
"TYPE", SECTION_PRE_DATA,
q->data, delq->data, NULL,
tyinfo->dobj.dependencies, tyinfo->dobj.nDeps,
--- 7160,7166 ----
tyinfo->dobj.name,
tyinfo->dobj.namespace->dobj.name,
NULL,
! tyinfo->rolname, 0, false,
"TYPE", SECTION_PRE_DATA,
q->data, delq->data, NULL,
tyinfo->dobj.dependencies, tyinfo->dobj.nDeps,
*************** dumpDomain(Archive *fout, TypeInfo *tyin
*** 7328,7334 ****
tyinfo->dobj.name,
tyinfo->dobj.namespace->dobj.name,
NULL,
! tyinfo->rolname, false,
"DOMAIN", SECTION_PRE_DATA,
q->data, delq->data, NULL,
tyinfo->dobj.dependencies, tyinfo->dobj.nDeps,
--- 7287,7293 ----
tyinfo->dobj.name,
tyinfo->dobj.namespace->dobj.name,
NULL,
! tyinfo->rolname, 0, false,
"DOMAIN", SECTION_PRE_DATA,
q->data, delq->data, NULL,
tyinfo->dobj.dependencies, tyinfo->dobj.nDeps,
*************** dumpCompositeType(Archive *fout, TypeInf
*** 7430,7436 ****
tyinfo->dobj.name,
tyinfo->dobj.namespace->dobj.name,
NULL,
! tyinfo->rolname, false,
"TYPE", SECTION_PRE_DATA,
q->data, delq->data, NULL,
tyinfo->dobj.dependencies, tyinfo->dobj.nDeps,
--- 7389,7395 ----
tyinfo->dobj.name,
tyinfo->dobj.namespace->dobj.name,
NULL,
! tyinfo->rolname, 0, false,
"TYPE", SECTION_PRE_DATA,
q->data, delq->data, NULL,
tyinfo->dobj.dependencies, tyinfo->dobj.nDeps,
*************** dumpCompositeTypeColComments(Archive *fo
*** 7551,7557 ****
ArchiveEntry(fout, nilCatalogId, createDumpId(),
target->data,
tyinfo->dobj.namespace->dobj.name,
! NULL, tyinfo->rolname,
false, "COMMENT", SECTION_NONE,
query->data, "", NULL,
&(tyinfo->dobj.dumpId), 1,
--- 7510,7516 ----
ArchiveEntry(fout, nilCatalogId, createDumpId(),
target->data,
tyinfo->dobj.namespace->dobj.name,
! NULL, tyinfo->rolname, 0,
false, "COMMENT", SECTION_NONE,
query->data, "", NULL,
&(tyinfo->dobj.dumpId), 1,
*************** dumpShellType(Archive *fout, ShellTypeIn
*** 7604,7610 ****
stinfo->dobj.name,
stinfo->dobj.namespace->dobj.name,
NULL,
! stinfo->baseType->rolname, false,
"SHELL TYPE", SECTION_PRE_DATA,
q->data, "", NULL,
stinfo->dobj.dependencies, stinfo->dobj.nDeps,
--- 7563,7569 ----
stinfo->dobj.name,
stinfo->dobj.namespace->dobj.name,
NULL,
! stinfo->baseType->rolname, 0, false,
"SHELL TYPE", SECTION_PRE_DATA,
q->data, "", NULL,
stinfo->dobj.dependencies, stinfo->dobj.nDeps,
*************** dumpProcLang(Archive *fout, ProcLangInfo
*** 7758,7764 ****
ArchiveEntry(fout, plang->dobj.catId, plang->dobj.dumpId,
plang->dobj.name,
! lanschema, NULL, plang->lanowner,
false, "PROCEDURAL LANGUAGE", SECTION_PRE_DATA,
defqry->data, delqry->data, NULL,
plang->dobj.dependencies, plang->dobj.nDeps,
--- 7717,7723 ----
ArchiveEntry(fout, plang->dobj.catId, plang->dobj.dumpId,
plang->dobj.name,
! lanschema, NULL, plang->lanowner, 0,
false, "PROCEDURAL LANGUAGE", SECTION_PRE_DATA,
defqry->data, delqry->data, NULL,
plang->dobj.dependencies, plang->dobj.nDeps,
*************** dumpFunc(Archive *fout, FuncInfo *finfo)
*** 8322,8328 ****
funcsig_tag,
finfo->dobj.namespace->dobj.name,
NULL,
! finfo->rolname, false,
"FUNCTION", SECTION_PRE_DATA,
q->data, delqry->data, NULL,
finfo->dobj.dependencies, finfo->dobj.nDeps,
--- 8281,8287 ----
funcsig_tag,
finfo->dobj.namespace->dobj.name,
NULL,
! finfo->rolname, 0, false,
"FUNCTION", SECTION_PRE_DATA,
q->data, delqry->data, NULL,
finfo->dobj.dependencies, finfo->dobj.nDeps,
*************** dumpCast(Archive *fout, CastInfo *cast)
*** 8478,8484 ****
ArchiveEntry(fout, cast->dobj.catId, cast->dobj.dumpId,
castsig->data,
! "pg_catalog", NULL, "",
false, "CAST", SECTION_PRE_DATA,
defqry->data, delqry->data, NULL,
cast->dobj.dependencies, cast->dobj.nDeps,
--- 8437,8443 ----
ArchiveEntry(fout, cast->dobj.catId, cast->dobj.dumpId,
castsig->data,
! "pg_catalog", NULL, "", 0,
false, "CAST", SECTION_PRE_DATA,
defqry->data, delqry->data, NULL,
cast->dobj.dependencies, cast->dobj.nDeps,
*************** dumpOpr(Archive *fout, OprInfo *oprinfo)
*** 8722,8728 ****
oprinfo->dobj.name,
oprinfo->dobj.namespace->dobj.name,
NULL,
! oprinfo->rolname,
false, "OPERATOR", SECTION_PRE_DATA,
q->data, delq->data, NULL,
oprinfo->dobj.dependencies, oprinfo->dobj.nDeps,
--- 8681,8687 ----
oprinfo->dobj.name,
oprinfo->dobj.namespace->dobj.name,
NULL,
! oprinfo->rolname, 0,
false, "OPERATOR", SECTION_PRE_DATA,
q->data, delq->data, NULL,
oprinfo->dobj.dependencies, oprinfo->dobj.nDeps,
*************** dumpOpclass(Archive *fout, OpclassInfo *
*** 9181,9187 ****
opcinfo->dobj.name,
opcinfo->dobj.namespace->dobj.name,
NULL,
! opcinfo->rolname,
false, "OPERATOR CLASS", SECTION_PRE_DATA,
q->data, delq->data, NULL,
opcinfo->dobj.dependencies, opcinfo->dobj.nDeps,
--- 9140,9146 ----
opcinfo->dobj.name,
opcinfo->dobj.namespace->dobj.name,
NULL,
! opcinfo->rolname, 0,
false, "OPERATOR CLASS", SECTION_PRE_DATA,
q->data, delq->data, NULL,
opcinfo->dobj.dependencies, opcinfo->dobj.nDeps,
*************** dumpOpfamily(Archive *fout, OpfamilyInfo
*** 9462,9468 ****
opfinfo->dobj.name,
opfinfo->dobj.namespace->dobj.name,
NULL,
! opfinfo->rolname,
false, "OPERATOR FAMILY", SECTION_PRE_DATA,
q->data, delq->data, NULL,
opfinfo->dobj.dependencies, opfinfo->dobj.nDeps,
--- 9421,9427 ----
opfinfo->dobj.name,
opfinfo->dobj.namespace->dobj.name,
NULL,
! opfinfo->rolname, 0,
false, "OPERATOR FAMILY", SECTION_PRE_DATA,
q->data, delq->data, NULL,
opfinfo->dobj.dependencies, opfinfo->dobj.nDeps,
*************** dumpConversion(Archive *fout, ConvInfo *
*** 9578,9584 ****
convinfo->dobj.name,
convinfo->dobj.namespace->dobj.name,
NULL,
! convinfo->rolname,
false, "CONVERSION", SECTION_PRE_DATA,
q->data, delq->data, NULL,
convinfo->dobj.dependencies, convinfo->dobj.nDeps,
--- 9537,9543 ----
convinfo->dobj.name,
convinfo->dobj.namespace->dobj.name,
NULL,
! convinfo->rolname, 0,
false, "CONVERSION", SECTION_PRE_DATA,
q->data, delq->data, NULL,
convinfo->dobj.dependencies, convinfo->dobj.nDeps,
*************** dumpAgg(Archive *fout, AggInfo *agginfo)
*** 9822,9828 ****
aggsig_tag,
agginfo->aggfn.dobj.namespace->dobj.name,
NULL,
! agginfo->aggfn.rolname,
false, "AGGREGATE", SECTION_PRE_DATA,
q->data, delq->data, NULL,
agginfo->aggfn.dobj.dependencies, agginfo->aggfn.dobj.nDeps,
--- 9781,9787 ----
aggsig_tag,
agginfo->aggfn.dobj.namespace->dobj.name,
NULL,
! agginfo->aggfn.rolname, 0,
false, "AGGREGATE", SECTION_PRE_DATA,
q->data, delq->data, NULL,
agginfo->aggfn.dobj.dependencies, agginfo->aggfn.dobj.nDeps,
*************** dumpTSParser(Archive *fout, TSParserInfo
*** 9914,9919 ****
--- 9873,9879 ----
prsinfo->dobj.namespace->dobj.name,
NULL,
"",
+ 0,
false, "TEXT SEARCH PARSER", SECTION_PRE_DATA,
q->data, delq->data, NULL,
prsinfo->dobj.dependencies, prsinfo->dobj.nDeps,
*************** dumpTSDictionary(Archive *fout, TSDictIn
*** 10006,10011 ****
--- 9966,9972 ----
dictinfo->dobj.namespace->dobj.name,
NULL,
dictinfo->rolname,
+ 0,
false, "TEXT SEARCH DICTIONARY", SECTION_PRE_DATA,
q->data, delq->data, NULL,
dictinfo->dobj.dependencies, dictinfo->dobj.nDeps,
*************** dumpTSTemplate(Archive *fout, TSTemplate
*** 10066,10071 ****
--- 10027,10033 ----
tmplinfo->dobj.namespace->dobj.name,
NULL,
"",
+ 0,
false, "TEXT SEARCH TEMPLATE", SECTION_PRE_DATA,
q->data, delq->data, NULL,
tmplinfo->dobj.dependencies, tmplinfo->dobj.nDeps,
*************** dumpTSConfig(Archive *fout, TSConfigInfo
*** 10199,10204 ****
--- 10161,10167 ----
cfginfo->dobj.namespace->dobj.name,
NULL,
cfginfo->rolname,
+ 0,
false, "TEXT SEARCH CONFIGURATION", SECTION_PRE_DATA,
q->data, delq->data, NULL,
cfginfo->dobj.dependencies, cfginfo->dobj.nDeps,
*************** dumpForeignDataWrapper(Archive *fout, Fd
*** 10255,10260 ****
--- 10218,10224 ----
NULL,
NULL,
fdwinfo->rolname,
+ 0,
false, "FOREIGN DATA WRAPPER", SECTION_PRE_DATA,
q->data, delq->data, NULL,
fdwinfo->dobj.dependencies, fdwinfo->dobj.nDeps,
*************** dumpForeignServer(Archive *fout, Foreign
*** 10343,10348 ****
--- 10307,10313 ----
NULL,
NULL,
srvinfo->rolname,
+ 0,
false, "SERVER", SECTION_PRE_DATA,
q->data, delq->data, NULL,
srvinfo->dobj.dependencies, srvinfo->dobj.nDeps,
*************** dumpUserMappings(Archive *fout,
*** 10448,10454 ****
tag->data,
namespace,
NULL,
! owner, false,
"USER MAPPING", SECTION_PRE_DATA,
q->data, delq->data, NULL,
&dumpId, 1,
--- 10413,10419 ----
tag->data,
namespace,
NULL,
! owner, 0, false,
"USER MAPPING", SECTION_PRE_DATA,
q->data, delq->data, NULL,
&dumpId, 1,
*************** dumpDefaultACL(Archive *fout, DefaultACL
*** 10519,10524 ****
--- 10484,10490 ----
daclinfo->dobj.namespace ? daclinfo->dobj.namespace->dobj.name : NULL,
NULL,
daclinfo->defaclrole,
+ 0,
false, "DEFAULT ACL", SECTION_NONE,
q->data, "", NULL,
daclinfo->dobj.dependencies, daclinfo->dobj.nDeps,
*************** dumpACL(Archive *fout, CatalogId objCatI
*** 10576,10581 ****
--- 10542,10548 ----
tag, nspname,
NULL,
owner ? owner : "",
+ 0,
false, "ACL", SECTION_NONE,
sql->data, "", NULL,
&(objDumpId), 1,
*************** dumpSecLabel(Archive *fout, const char *
*** 10652,10658 ****
{
ArchiveEntry(fout, nilCatalogId, createDumpId(),
target, namespace, NULL, owner,
! false, "SECURITY LABEL", SECTION_NONE,
query->data, "", NULL,
&(dumpId), 1,
NULL, NULL);
--- 10619,10625 ----
{
ArchiveEntry(fout, nilCatalogId, createDumpId(),
target, namespace, NULL, owner,
! 0, false, "SECURITY LABEL", SECTION_NONE,
query->data, "", NULL,
&(dumpId), 1,
NULL, NULL);
*************** dumpTableSecLabel(Archive *fout, TableIn
*** 10730,10736 ****
target->data,
tbinfo->dobj.namespace->dobj.name,
NULL, tbinfo->rolname,
! false, "SECURITY LABEL", SECTION_NONE,
query->data, "", NULL,
&(tbinfo->dobj.dumpId), 1,
NULL, NULL);
--- 10697,10703 ----
target->data,
tbinfo->dobj.namespace->dobj.name,
NULL, tbinfo->rolname,
! 0, false, "SECURITY LABEL", SECTION_NONE,
query->data, "", NULL,
&(tbinfo->dobj.dumpId), 1,
NULL, NULL);
*************** dumpTableSchema(Archive *fout, TableInfo
*** 11384,11389 ****
--- 11351,11357 ----
tbinfo->dobj.namespace->dobj.name,
(tbinfo->relkind == RELKIND_VIEW) ? NULL : tbinfo->reltablespace,
tbinfo->rolname,
+ 0,
(strcmp(reltypename, "TABLE") == 0) ? tbinfo->hasoids : false,
reltypename, SECTION_PRE_DATA,
q->data, delq->data, NULL,
*************** dumpAttrDef(Archive *fout, AttrDefInfo *
*** 11456,11461 ****
--- 11424,11430 ----
tbinfo->dobj.namespace->dobj.name,
NULL,
tbinfo->rolname,
+ 0,
false, "DEFAULT", SECTION_PRE_DATA,
q->data, delq->data, NULL,
adinfo->dobj.dependencies, adinfo->dobj.nDeps,
*************** dumpIndex(Archive *fout, IndxInfo *indxi
*** 11552,11558 ****
indxinfo->dobj.name,
tbinfo->dobj.namespace->dobj.name,
indxinfo->tablespace,
! tbinfo->rolname, false,
"INDEX", SECTION_POST_DATA,
q->data, delq->data, NULL,
indxinfo->dobj.dependencies, indxinfo->dobj.nDeps,
--- 11521,11527 ----
indxinfo->dobj.name,
tbinfo->dobj.namespace->dobj.name,
indxinfo->tablespace,
! tbinfo->rolname, indxinfo->relpages, false,
"INDEX", SECTION_POST_DATA,
q->data, delq->data, NULL,
indxinfo->dobj.dependencies, indxinfo->dobj.nDeps,
*************** dumpConstraint(Archive *fout, Constraint
*** 11677,11683 ****
coninfo->dobj.name,
tbinfo->dobj.namespace->dobj.name,
indxinfo->tablespace,
! tbinfo->rolname, false,
"CONSTRAINT", SECTION_POST_DATA,
q->data, delq->data, NULL,
coninfo->dobj.dependencies, coninfo->dobj.nDeps,
--- 11646,11652 ----
coninfo->dobj.name,
tbinfo->dobj.namespace->dobj.name,
indxinfo->tablespace,
! tbinfo->rolname, 0, false,
"CONSTRAINT", SECTION_POST_DATA,
q->data, delq->data, NULL,
coninfo->dobj.dependencies, coninfo->dobj.nDeps,
*************** dumpConstraint(Archive *fout, Constraint
*** 11710,11716 ****
coninfo->dobj.name,
tbinfo->dobj.namespace->dobj.name,
NULL,
! tbinfo->rolname, false,
"FK CONSTRAINT", SECTION_POST_DATA,
q->data, delq->data, NULL,
coninfo->dobj.dependencies, coninfo->dobj.nDeps,
--- 11679,11685 ----
coninfo->dobj.name,
tbinfo->dobj.namespace->dobj.name,
NULL,
! tbinfo->rolname, 0, false,
"FK CONSTRAINT", SECTION_POST_DATA,
q->data, delq->data, NULL,
coninfo->dobj.dependencies, coninfo->dobj.nDeps,
*************** dumpConstraint(Archive *fout, Constraint
*** 11745,11751 ****
coninfo->dobj.name,
tbinfo->dobj.namespace->dobj.name,
NULL,
! tbinfo->rolname, false,
"CHECK CONSTRAINT", SECTION_POST_DATA,
q->data, delq->data, NULL,
coninfo->dobj.dependencies, coninfo->dobj.nDeps,
--- 11714,11720 ----
coninfo->dobj.name,
tbinfo->dobj.namespace->dobj.name,
NULL,
! tbinfo->rolname, 0, false,
"CHECK CONSTRAINT", SECTION_POST_DATA,
q->data, delq->data, NULL,
coninfo->dobj.dependencies, coninfo->dobj.nDeps,
*************** dumpConstraint(Archive *fout, Constraint
*** 11781,11787 ****
coninfo->dobj.name,
tyinfo->dobj.namespace->dobj.name,
NULL,
! tyinfo->rolname, false,
"CHECK CONSTRAINT", SECTION_POST_DATA,
q->data, delq->data, NULL,
coninfo->dobj.dependencies, coninfo->dobj.nDeps,
--- 11750,11756 ----
coninfo->dobj.name,
tyinfo->dobj.namespace->dobj.name,
NULL,
! tyinfo->rolname, 0, false,
"CHECK CONSTRAINT", SECTION_POST_DATA,
q->data, delq->data, NULL,
coninfo->dobj.dependencies, coninfo->dobj.nDeps,
*************** dumpSequence(Archive *fout, TableInfo *t
*** 12066,12072 ****
tbinfo->dobj.name,
tbinfo->dobj.namespace->dobj.name,
NULL,
! tbinfo->rolname,
false, "SEQUENCE", SECTION_PRE_DATA,
query->data, delqry->data, NULL,
tbinfo->dobj.dependencies, tbinfo->dobj.nDeps,
--- 12035,12041 ----
tbinfo->dobj.name,
tbinfo->dobj.namespace->dobj.name,
NULL,
! tbinfo->rolname, 0,
false, "SEQUENCE", SECTION_PRE_DATA,
query->data, delqry->data, NULL,
tbinfo->dobj.dependencies, tbinfo->dobj.nDeps,
*************** dumpSequence(Archive *fout, TableInfo *t
*** 12102,12108 ****
tbinfo->dobj.name,
tbinfo->dobj.namespace->dobj.name,
NULL,
! tbinfo->rolname,
false, "SEQUENCE OWNED BY", SECTION_PRE_DATA,
query->data, "", NULL,
&(tbinfo->dobj.dumpId), 1,
--- 12071,12077 ----
tbinfo->dobj.name,
tbinfo->dobj.namespace->dobj.name,
NULL,
! tbinfo->rolname, 0,
false, "SEQUENCE OWNED BY", SECTION_PRE_DATA,
query->data, "", NULL,
&(tbinfo->dobj.dumpId), 1,
*************** dumpSequence(Archive *fout, TableInfo *t
*** 12134,12139 ****
--- 12103,12109 ----
tbinfo->dobj.namespace->dobj.name,
NULL,
tbinfo->rolname,
+ 0,
false, "SEQUENCE SET", SECTION_PRE_DATA,
query->data, "", NULL,
&(tbinfo->dobj.dumpId), 1,
*************** dumpTrigger(Archive *fout, TriggerInfo *
*** 12326,12332 ****
tginfo->dobj.name,
tbinfo->dobj.namespace->dobj.name,
NULL,
! tbinfo->rolname, false,
"TRIGGER", SECTION_POST_DATA,
query->data, delqry->data, NULL,
tginfo->dobj.dependencies, tginfo->dobj.nDeps,
--- 12296,12302 ----
tginfo->dobj.name,
tbinfo->dobj.namespace->dobj.name,
NULL,
! tbinfo->rolname, 0, false,
"TRIGGER", SECTION_POST_DATA,
query->data, delqry->data, NULL,
tginfo->dobj.dependencies, tginfo->dobj.nDeps,
*************** dumpRule(Archive *fout, RuleInfo *rinfo)
*** 12446,12452 ****
rinfo->dobj.name,
tbinfo->dobj.namespace->dobj.name,
NULL,
! tbinfo->rolname, false,
"RULE", SECTION_POST_DATA,
cmd->data, delcmd->data, NULL,
rinfo->dobj.dependencies, rinfo->dobj.nDeps,
--- 12416,12422 ----
rinfo->dobj.name,
tbinfo->dobj.namespace->dobj.name,
NULL,
! tbinfo->rolname, 0, false,
"RULE", SECTION_POST_DATA,
cmd->data, delcmd->data, NULL,
rinfo->dobj.dependencies, rinfo->dobj.nDeps,
*************** check_sql_result(PGresult *res, PGconn *
*** 12880,12882 ****
--- 12850,12963 ----
write_msg(NULL, "The command was: %s\n", query);
exit_nicely();
}
+
+
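+ /*
+  * Apply the per-session settings that every dump connection needs
+  * (client encoding, role, datestyle, etc.). With parallel dump this runs
+  * once for the master connection and once for each worker connection so
+  * that all of them see the same environment; syncId is used only by the
+  * experimental snapshot-synchronization hook below.
+  */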
+ void
+ SetupConnection(PGconn *conn, const char *syncId, const char *dumpencoding, const char *use_role)
+ {
+ const char *std_strings;
+
+ /* Set the client encoding if requested */
+ if (dumpencoding)
+ {
+ if (PQsetClientEncoding(conn, dumpencoding) < 0)
+ {
+ write_msg(NULL, "invalid client encoding \"%s\" specified\n",
+ dumpencoding);
+ exit(1);
+ }
+ }
+
+ /*
+ * Get the active encoding and the standard_conforming_strings setting, so
+ * we know how to escape strings.
+ */
+ g_fout->encoding = PQclientEncoding(conn);
+
+ std_strings = PQparameterStatus(conn, "standard_conforming_strings");
+ g_fout->std_strings = (std_strings && strcmp(std_strings, "on") == 0);
+
+ /* Set the role if requested */
+ if (use_role && g_fout->remoteVersion >= 80100)
+ {
+ PQExpBuffer query = createPQExpBuffer();
+
+ appendPQExpBuffer(query, "SET ROLE %s", fmtId(use_role));
+ do_sql_command(conn, query->data);
+ destroyPQExpBuffer(query);
+ }
+
+ /* Set the datestyle to ISO to ensure the dump's portability */
+ do_sql_command(conn, "SET DATESTYLE = ISO");
+
+ /* Likewise, avoid using sql_standard intervalstyle */
+ if (g_fout->remoteVersion >= 80400)
+ do_sql_command(conn, "SET INTERVALSTYLE = POSTGRES");
+
+ /*
+ * If supported, set extra_float_digits so that we can dump float data
+ * exactly (given correctly implemented float I/O code, anyway)
+ */
+ if (g_fout->remoteVersion >= 80500)
+ do_sql_command(conn, "SET extra_float_digits TO 3");
+ else if (g_fout->remoteVersion >= 70400)
+ do_sql_command(conn, "SET extra_float_digits TO 2");
+
+ /*
+ * If synchronized scanning is supported, disable it, to prevent
+ * unpredictable changes in row ordering across a dump and reload.
+ */
+ if (g_fout->remoteVersion >= 80300)
+ do_sql_command(conn, "SET synchronize_seqscans TO off");
+
+ /*
+ * Quote all identifiers, if requested.
+ */
+ if (quote_all_identifiers && g_fout->remoteVersion >= 90100)
+ do_sql_command(conn, "SET quote_all_identifiers = true");
+
+ /*
+ * Disable security label support if the server version is older than 9.1.
+ */
+ if (!no_security_label && g_fout->remoteVersion < 90100)
+ no_security_label = 1;
+
+ /*
+ * Disable timeouts if supported.
+ */
+ if (g_fout->remoteVersion >= 70300)
+ do_sql_command(conn, "SET statement_timeout = 0");
+
+ /*
+ * Start serializable transaction to dump consistent data.
+ */
+ do_sql_command(conn, "BEGIN");
+
+ do_sql_command(conn, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
+
+ #ifdef HAVE_SNAPSHOT_HACK
+ {
+ PQExpBuffer buf = createPQExpBuffer();
+ PGresult *res;
+ appendPQExpBuffer(buf, "SELECT pg_synchronize_snapshot_taken('%s')", syncId);
+ res = PQexec(conn, buf->data);
+ check_sql_result(res, conn, buf->data, PGRES_TUPLES_OK);
+ PQclear(res);
+ destroyPQExpBuffer(buf);
+ }
+ #endif
+
+ /* Select the appropriate subquery to convert user IDs to names */
+ if (g_fout->remoteVersion >= 80100)
+ username_subquery = "SELECT rolname FROM pg_catalog.pg_roles WHERE oid =";
+ else if (g_fout->remoteVersion >= 70300)
+ username_subquery = "SELECT usename FROM pg_catalog.pg_user WHERE usesysid =";
+ else
+ username_subquery = "SELECT usename FROM pg_user WHERE usesysid =";
+
+ /* Find the last built-in OID, if needed */
+ if (g_fout->remoteVersion < 70300)
+ {
+ if (g_fout->remoteVersion >= 70100)
+ g_last_builtin_oid = findLastBuiltinOid_V71(PQdb(conn));
+ else
+ g_last_builtin_oid = findLastBuiltinOid_V70();
+ if (g_verbose)
+ write_msg(NULL, "last built-in OID is %u\n", g_last_builtin_oid);
+ }
+ }
+
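To make the intended call pattern concrete, here is a minimal sketch of how a worker might drive SetupConnection() (hypothetical worker code, not part of the patch: worker_main and its conninfo handling are invented for illustration, and error handling is trimmed):

static void
worker_main(const char *conninfo, const char *syncId,
            const char *dumpencoding, const char *use_role)
{
    /* each worker opens its own connection to the database */
    PGconn *conn = PQconnectdb(conninfo);

    if (PQstatus(conn) != CONNECTION_OK)
    {
        write_msg(NULL, "connection to database failed: %s",
                  PQerrorMessage(conn));
        exit_nicely();
    }

    /* replay the session settings and open the snapshot transaction */
    SetupConnection(conn, syncId, dumpencoding, use_role);

    /* ... dump the TABLE DATA entries assigned by the master ... */
}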
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 0f643b9..42463e2 100644
*** a/src/bin/pg_dump/pg_dump.h
--- b/src/bin/pg_dump/pg_dump.h
*************** typedef struct _tableInfo
*** 234,239 ****
--- 234,240 ----
/* these two are set only if table is a sequence owned by a column: */
Oid owning_tab; /* OID of table owning sequence */
int owning_col; /* attr # of column owning sequence */
+ int relpages; /* table size in pages, from pg_class.relpages */
bool interesting; /* true if need to collect more data */
*************** typedef struct _indxInfo
*** 302,307 ****
--- 303,309 ----
bool indisclustered;
/* if there is an associated constraint object, its dumpId: */
DumpId indexconstraint;
+ int relpages; /* relpages of the underlying table */
} IndxInfo;
typedef struct _ruleInfo
*************** extern void parseOidArray(const char *st
*** 508,513 ****
--- 510,516 ----
extern void sortDumpableObjects(DumpableObject **objs, int numObjs);
extern void sortDumpableObjectsByTypeName(DumpableObject **objs, int numObjs);
extern void sortDumpableObjectsByTypeOid(DumpableObject **objs, int numObjs);
+ extern void sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs);
/*
* version specific routines
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index a52d03c..5853c13 100644
*** a/src/bin/pg_dump/pg_dump_sort.c
--- b/src/bin/pg_dump/pg_dump_sort.c
*************** static void repairDependencyLoop(Dumpabl
*** 116,121 ****
--- 116,198 ----
static void describeDumpableObject(DumpableObject *obj,
char *buf, int bufsize);
+ static int DOSizeCompare(const void *p1, const void *p2);
+
+ static int
+ findFirstEqualType(DumpableObjectType type, DumpableObject **objs, int numObjs)
+ {
+ int i;
+ for (i = 0; i < numObjs; i++)
+ if (objs[i]->objType == type)
+ return i;
+ return -1;
+ }
+
+ static int
+ findFirstDifferentType(DumpableObjectType type, DumpableObject **objs, int numObjs, int start)
+ {
+ int i;
+ for (i = start; i < numObjs; i++)
+ if (objs[i]->objType != type)
+ return i;
+ /* the run extends to the end of the array */
+ return numObjs;
+ }
+
+
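+ /*
+ * Sort the contiguous DO_TABLE_DATA and DO_INDEX runs of the TOC array
+ * by relpages, biggest first, leaving all other entries untouched.
+ */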
+ void
+ sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs)
+ {
+ int startIdx, endIdx;
+ void *startPtr;
+
+ if (numObjs <= 1)
+ return;
+
+ startIdx = findFirstEqualType(DO_TABLE_DATA, objs, numObjs);
+ if (startIdx >= 0)
+ {
+ endIdx = findFirstDifferentType(DO_TABLE_DATA, objs, numObjs, startIdx);
+ startPtr = objs + startIdx;
+ qsort(startPtr, endIdx - startIdx, sizeof(DumpableObject *),
+ DOSizeCompare);
+ }
+
+ startIdx = findFirstEqualType(DO_INDEX, objs, numObjs);
+ if (startIdx >= 0)
+ {
+ endIdx = findFirstDifferentType(DO_INDEX, objs, numObjs, startIdx);
+ startPtr = objs + startIdx;
+ qsort(startPtr, endIdx - startIdx, sizeof(DumpableObject *),
+ DOSizeCompare);
+ }
+ }
+
+ static int
+ DOSizeCompare(const void *p1, const void *p2)
+ {
+ DumpableObject *obj1 = *(DumpableObject **) p1;
+ DumpableObject *obj2 = *(DumpableObject **) p2;
+ int obj1_size = 0;
+ int obj2_size = 0;
+
+ if (obj1->objType == DO_TABLE_DATA)
+ obj1_size = ((TableDataInfo *) obj1)->tdtable->relpages;
+ if (obj1->objType == DO_INDEX)
+ obj1_size = ((IndxInfo *) obj1)->relpages;
+
+ if (obj2->objType == DO_TABLE_DATA)
+ obj2_size = ((TableDataInfo *) obj2)->tdtable->relpages;
+ if (obj2->objType == DO_INDEX)
+ obj2_size = ((IndxInfo *) obj2)->relpages;
+
+ /* we want to see the biggest item go first */
+ if (obj1_size > obj2_size)
+ return -1;
+ if (obj2_size > obj1_size)
+ return 1;
+
+ return 0;
+ }
/*
* Sort the given objects into a type/name-based ordering
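The slice sorting above is plain qsort(3) applied to a subrange of the TOC array; here is a tiny standalone illustration of the idea (not part of the patch, with plain ints standing in for relpages, sorted descending as in DOSizeCompare):

#include <stdio.h>
#include <stdlib.h>

static int
desc_compare(const void *p1, const void *p2)
{
    int a = *(const int *) p1;
    int b = *(const int *) p2;

    /* biggest first, as in DOSizeCompare */
    return (a > b) ? -1 : ((a < b) ? 1 : 0);
}

int
main(void)
{
    /* pretend slots 2..4 are the contiguous DO_TABLE_DATA run */
    int toc[] = {7, 3, 10, 500, 30, 8};
    int i;

    qsort(toc + 2, 3, sizeof(int), desc_compare);
    for (i = 0; i < 6; i++)
        printf("%d ", toc[i]);  /* prints: 7 3 500 30 10 8 */
    printf("\n");
    return 0;
}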
diff --git a/src/bin/pg_dump/test.sh b/src/bin/pg_dump/test.sh
index 23547fa..cb984ca 100755
*** a/src/bin/pg_dump/test.sh
--- b/src/bin/pg_dump/test.sh
***************
*** 1,5 ****
--- 1,45 ----
#!/bin/sh -x
+ # parallel lzf directory (multiple directories)
+ rm -rf dir1 dir2 dir3
+ dropdb foodb
+ createdb --template=template0 foodb --lc-ctype=C
+ psql foodb -c "alter database foodb set lc_monetary to 'C'"
+ ./pg_dump -j 4 --compress-lzf -Fd -f dir1:dir2:dir3 regression || exit 1
+ ./pg_restore -k -Fd dir3:dir1:dir2 -d foodb || exit 1
+ ./pg_restore -j 4 -Fd dir1:dir2:dir3 -d foodb || exit 1
+ psql foodb -c 'select distinct loid from pg_largeobject'
+ psql foodb -c '\lo_list'
+
+ # parallel lzf directory
+ rm -rf out.dir
+ dropdb foodb
+ createdb --template=template0 foodb --lc-ctype=C
+ psql foodb -c "alter database foodb set lc_monetary to 'C'"
+ ./pg_dump -j 4 --compress-lzf -Fd -f out.dir regression || exit 1
+ ./pg_restore -j 4 out.dir -d foodb || exit 1
+ psql foodb -c 'select distinct loid from pg_largeobject'
+ psql foodb -c '\lo_list'
+
+ # parallel gzip directory
+ rm -rf out.dir
+ dropdb foodb
+ createdb --template=template0 foodb --lc-ctype=C
+ psql foodb -c "alter database foodb set lc_monetary to 'C'"
+ ./pg_dump -j 7 --compress=4 -Fd -f out.dir regression || exit 1
+ ./pg_restore -j 5 out.dir -d foodb || exit 1
+ psql foodb -c 'select distinct loid from pg_largeobject'
+ psql foodb -c '\lo_list'
+
+ # parallel lzf custom
+ rm out.custom
+ dropdb foodb
+ createdb --template=template0 foodb --lc-ctype=C
+ psql foodb -c "alter database foodb set lc_monetary to 'C'"
+ ./pg_dump --compress-lzf -Fc -f out.custom regression || exit 1
+ ./pg_restore -j 4 out.custom -d foodb || exit 1
+ psql foodb -c 'select distinct loid from pg_largeobject'
+ psql foodb -c '\lo_list'
# lzf compression
rm -rf out.dir
*************** psql foodb -c "alter database foodb set
*** 9,14 ****
--- 49,56 ----
#./pg_dump --column-inserts --compress-lzf -Fd -f out.dir regression || exit 1
./pg_dump --compress-lzf -Fd -f out.dir regression || exit 1
./pg_restore out.dir -d foodb && ./pg_restore -k out.dir || exit 1
+ psql foodb -c 'select distinct loid from pg_largeobject'
+ psql foodb -c '\lo_list'
# zlib compression
rm -rf out.dir
*************** psql foodb -c "alter database foodb set
*** 18,24 ****
--- 60,69 ----
./pg_dump --compress=4 -Fd -f out.dir regression || exit 1
./pg_restore out.dir -d foodb || exit 1
./pg_restore -k out.dir || exit 1
+ psql foodb -c 'select distinct loid from pg_largeobject'
+ psql foodb -c '\lo_list'
+ # zlib custom
rm out.custom
dropdb foodb
createdb --template=template0 foodb --lc-ctype=C
*************** psql foodb -c "alter database foodb set
*** 26,31 ****
--- 71,88 ----
#./pg_dump --inserts --compress=8 -Fc -f out.custom regression || exit 1
./pg_dump --compress=8 -Fc -f out.custom regression || exit 1
./pg_restore out.custom -d foodb || exit 1
+ psql foodb -c 'select distinct loid from pg_largeobject'
+ psql foodb -c '\lo_list'
+
+ # lzf custom
+ rm out.custom
+ dropdb foodb
+ createdb --template=template0 foodb --lc-ctype=C
+ psql foodb -c "alter database foodb set lc_monetary to 'C'"
+ ./pg_dump --compress-lzf -Fc -f out.custom regression || exit 1
+ ./pg_restore out.custom -d foodb || exit 1
+ psql foodb -c 'select distinct loid from pg_largeobject'
+ psql foodb -c '\lo_list'
# no compression
rm -rf out.dir
*************** psql foodb -c "alter database foodb set
*** 35,40 ****
--- 92,99 ----
./pg_dump --disable-dollar-quoting --compress=0 -Fd -f out.dir regression || exit 1
./pg_restore out.dir -d foodb || exit 1
./pg_restore -k out.dir || exit 1
+ psql foodb -c 'select distinct loid from pg_largeobject'
+ psql foodb -c '\lo_list'
rm out.custom
dropdb foodb
*************** createdb --template=template0 foodb --lc
*** 42,68 ****
--- 101,137 ----
psql foodb -c "alter database foodb set lc_monetary to 'C'"
./pg_dump --quote-all-identifiers --compress=0 -Fc -f out.custom regression || exit 1
./pg_restore out.custom -d foodb || exit 1
+ psql foodb -c 'select distinct loid from pg_largeobject'
+ psql foodb -c '\lo_list'
dropdb foodb
createdb --template=template0 foodb --lc-ctype=C
psql foodb -c "alter database foodb set lc_monetary to 'C'"
pg_dump -Ft regression | pg_restore -d foodb || exit 1
+ psql foodb -c 'select distinct loid from pg_largeobject'
+ psql foodb -c '\lo_list'
dropdb foodb
createdb --template=template0 foodb --lc-ctype=C
psql foodb -c "alter database foodb set lc_monetary to 'C'"
pg_dump regression | psql foodb || exit 1
+ psql foodb -c 'select distinct loid from pg_largeobject'
+ psql foodb -c '\lo_list'
# restore 9.0 archives
dropdb foodb
createdb --template=template0 foodb --lc-ctype=C
psql foodb -c "alter database foodb set lc_monetary to 'C'"
./pg_restore out.cust.none.90 -d foodb || exit 1
+ psql foodb -c 'select distinct loid from pg_largeobject'
+ psql foodb -c '\lo_list'
dropdb foodb
createdb --template=template0 foodb --lc-ctype=C
psql foodb -c "alter database foodb set lc_monetary to 'C'"
./pg_restore out.cust.z.90 -d foodb || exit 1
+ psql foodb -c 'select distinct loid from pg_largeobject'
+ psql foodb -c '\lo_list'
echo Success
On Sun, Nov 14, 2010 at 6:52 PM, Joachim Wieland <joe@mcknight.de> wrote:
> You would add a regular parallel dump with
> $ pg_dump -j 4 -Fd -f out.dir dbname
So this is an updated series of patches for my parallel pg_dump WIP
patch. Most importantly, it now runs on Windows once you get it to
compile there (I have added the new files to the respective project in
Mkvcbuild.pm, though I wondered why the other archive formats do not
need to be defined in that file...).
So far nobody has volunteered to review this patch. It would be great
if people could at least check it out, run it, and let me know whether
it works and whether they have any comments.
I have put all four patches in a tar archive, the patches must be
applied sequentially:
1. pg_dump_compression-refactor.diff
2. pg_dump_directory.diff
3. pg_dump_directory_parallel.diff
4. pg_dump_directory_parallel_lzf.diff
The compression-refactor patch does not include Heikki's latest changes yet.
The last of the four patches adds LZF compression for whoever wants to
try it out. You need to link against an already-installed liblzf and
call pg_dump with --compress-lzf.
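For example, combining a parallel dump with LZF compression (cf. test.sh):
$ pg_dump -j 4 --compress-lzf -Fd -f out.dir dbname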
Joachim
Attachments:
pg_dump_parallel.tar.gz (application/x-gzip)