autovacuum multiworkers, patch 5
Hi,
Here is the autovacuum patch I am currently working with. This is
basically the same as the previous patch; I have tweaked the database
list management so that after a change in databases (say a new database
is created or a database is dropped), the list is recomputed to account
for the change, keeping the ordering of the previous list.
Modulo two low probability failure scenarios, I feel this patch is ready
to be applied; I will do so on Friday unless there are objections.
The failure scenarios are detailed in the comment pasted below. I
intend to attack these problems next, but as the first one should be
fairly low probability, I don't think it should bar the current patch
from being applied. (The second problem, which seems to me to be the
most serious, should be easily fixable by checking launch times and
"aborting" processes that took longer than autovacuum_naptime to start).
/*
* Main loop for the autovacuum launcher process.
*
* The signalling between launcher and worker is as follows:
*
* When the worker has finished starting up, it stores its PID in wi_workerpid
* and sends a SIGUSR1 signal to the launcher. The launcher then knows that
* the postmaster is ready to start a new worker. We do it this way because
* otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't
* yet processed the last one, in which case the second signal would be lost.
* This is only useful when two workers need to be started close to one
* another, which should be rare but it's possible.
*
* Additionally, when the worker is finished with the vacuum work, it sets the
* wi_finished flag and sends a SIGUSR1 signal to the launcher. Upon receipt
* of this signal, the launcher then clears the entry for future use and may
* start another worker right away, if need be.
*
* There is at least one race condition here: if the workers are all busy, a
* database needs immediate attention and a worker finishes just after the
* launcher started a worker and sent the signal to postmaster, but before
* postmaster processes the signal; at this point, the launcher receives a
* signal from the finishing process, sees the empty slot, and sends the
* signal to postmaster again to start another worker. But the postmaster
* SendPostmasterSignal() flag was already set, so the signal is lost. To
* avoid this problem, the launcher should not try to start a new worker until
* all WorkerInfo entries that have the wi_dboid field set have a PID assigned.
* FIXME someday. The problem is that if we have workers failing to start for
* some reason, holding the start of new workers will worsen the starvation by
* disabling the start of a new worker as soon as one worker fails to start.
* So it's important to be able to distinguish a worker that has failed
* starting from a worker that is just taking its little bit of time to do so.
*
* There is another potential problem if, for some reason, a worker starts and
* is not able to finish correctly. It will not be able to set its finished
* flag, so the launcher will believe that it's still starting up. To prevent
* this problem, we should check the PGPROCs of worker processes, and clean
* them up if we find they are not actually running (or they correspond to
* processes that are not autovacuum workers.) FIXME someday.
*/
--
Alvaro Herrera http://www.CommandPrompt.com/
The PostgreSQL Company - Command Prompt, Inc.
Alvaro Herrera wrote:
Hi,
Here is the autovacuum patch I am currently working with.
Obviously I forgot to attach the patch, sorry.
--
Alvaro Herrera Developer, http://www.PostgreSQL.org/
"Para tener m�s hay que desear menos"
Attachments:
autovacuum-multiworkers-5.patchtext/x-diff; charset=us-asciiDownload
Index: src/backend/postmaster/autovacuum.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/postmaster/autovacuum.c,v
retrieving revision 1.40
diff -c -p -r1.40 autovacuum.c
*** src/backend/postmaster/autovacuum.c 28 Mar 2007 22:17:12 -0000 1.40
--- src/backend/postmaster/autovacuum.c 4 Apr 2007 23:34:15 -0000
***************
*** 52,57 ****
--- 52,58 ----
#include "utils/syscache.h"
+ static volatile sig_atomic_t got_SIGUSR1 = false;
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t avlauncher_shutdown_request = false;
*************** static volatile sig_atomic_t avlauncher_
*** 59,64 ****
--- 60,66 ----
* GUC parameters
*/
bool autovacuum_start_daemon = false;
+ int autovacuum_max_workers;
int autovacuum_naptime;
int autovacuum_vac_thresh;
double autovacuum_vac_scale;
*************** int autovacuum_freeze_max_age;
*** 69,75 ****
int autovacuum_vac_cost_delay;
int autovacuum_vac_cost_limit;
! /* Flag to tell if we are in the autovacuum daemon process */
static bool am_autovacuum_launcher = false;
static bool am_autovacuum_worker = false;
--- 71,77 ----
int autovacuum_vac_cost_delay;
int autovacuum_vac_cost_limit;
! /* Flags to tell if we are in an autovacuum process */
static bool am_autovacuum_launcher = false;
static bool am_autovacuum_worker = false;
*************** static int default_freeze_min_age;
*** 82,95 ****
/* Memory context for long-lived data */
static MemoryContext AutovacMemCxt;
! /* struct to keep list of candidate databases for vacuum */
! typedef struct autovac_dbase
{
! Oid ad_datid;
! char *ad_name;
! TransactionId ad_frozenxid;
! PgStat_StatDBEntry *ad_entry;
! } autovac_dbase;
/* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
typedef struct av_relation
--- 84,105 ----
/* Memory context for long-lived data */
static MemoryContext AutovacMemCxt;
! /* struct to keep track of databases in launcher */
! typedef struct avl_dbase
{
! Oid adl_datid; /* hash key -- must be first */
! TimestampTz adl_next_worker;
! int adl_score;
! } avl_dbase;
!
! /* struct to keep track of databases in worker */
! typedef struct avw_dbase
! {
! Oid adw_datid;
! char *adw_name;
! TransactionId adw_frozenxid;
! PgStat_StatDBEntry *adw_entry;
! } avw_dbase;
/* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
typedef struct av_relation
*************** typedef struct autovac_table
*** 110,123 ****
int at_vacuum_cost_limit;
} autovac_table;
typedef struct
{
! Oid process_db; /* OID of database to process */
! int worker_pid; /* PID of the worker process, if any */
} AutoVacuumShmemStruct;
static AutoVacuumShmemStruct *AutoVacuumShmem;
#ifdef EXEC_BACKEND
static pid_t avlauncher_forkexec(void);
static pid_t avworker_forkexec(void);
--- 120,169 ----
int at_vacuum_cost_limit;
} autovac_table;
+ /*-------------
+ * This struct holds information about a single worker's whereabouts. We keep
+ * an array of these in shared memory, sized according to
+ * autovacuum_max_workers.
+ *
+ * wi_dboid OID of the database this worker is supposed to work on
+ * wi_tableoid OID of the table currently being vacuumed
+ * wi_workerpid PID of the running worker, 0 if not yet started
+ * wi_finished True when the worker is done and about to exit
+ *
+ * The locking for this is a bit weird: all fields except wi_tableoid are
+ * protected by AutovacuumLock, and wi_tableoid is protected by
+ * AutovacuumScheduleLock.
+ *-------------
+ */
+ typedef struct
+ {
+ Oid wi_dboid;
+ Oid wi_tableoid;
+ int wi_workerpid;
+ bool wi_finished;
+ } WorkerInfo;
+
typedef struct
{
! pid_t av_launcherpid;
! WorkerInfo av_workers[1];
! /* VARIABLE LENGTH STRUCT */
} AutoVacuumShmemStruct;
+ /* Macro to iterate over all workers. Beware multiple evaluation of args! */
+ #define foreach_worker(_i, _worker) \
+ _worker = (WorkerInfo *) (AutoVacuumShmem + \
+ offsetof(AutoVacuumShmemStruct, av_workers)); \
+ for (_i = 0; _i < autovacuum_max_workers; _i++, _worker += sizeof(WorkerInfo))
+
static AutoVacuumShmemStruct *AutoVacuumShmem;
+ /* number of currently free worker slots; only valid in the launcher */
+ static int free_workers;
+ /* the database list in the launcher, and the context that contains it */
+ static Dllist *DatabaseList = NULL;
+ static MemoryContext DatabaseListCxt = NULL;
+
#ifdef EXEC_BACKEND
static pid_t avlauncher_forkexec(void);
static pid_t avworker_forkexec(void);
*************** static pid_t avworker_forkexec(void);
*** 125,133 ****
NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
! static void do_start_worker(void);
! static void do_autovacuum(void);
! static List *autovac_get_database_list(void);
static void relation_check_autovac(Oid relid, Form_pg_class classForm,
Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
--- 171,185 ----
NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
! static Oid do_start_worker(void);
! static int launcher_determine_sleep(bool canlaunch);
! static void launch_worker(TimestampTz now);
! static List *get_database_list(void);
! static void rebuild_database_list(Oid newdb);
! static int db_comparator(const void *a, const void *b);
!
! static void do_autovacuum(WorkerInfo *worker);
! static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
static void relation_check_autovac(Oid relid, Form_pg_class classForm,
Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
*************** static void relation_needs_vacanalyze(Oi
*** 141,152 ****
static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
bool doanalyze, int freeze_min_age);
- static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
PgStat_StatDBEntry *shared,
PgStat_StatDBEntry *dbentry);
static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
static void avl_sighup_handler(SIGNAL_ARGS);
static void avlauncher_shutdown(SIGNAL_ARGS);
static void avl_quickdie(SIGNAL_ARGS);
--- 193,204 ----
static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
bool doanalyze, int freeze_min_age);
static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
PgStat_StatDBEntry *shared,
PgStat_StatDBEntry *dbentry);
static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
static void avl_sighup_handler(SIGNAL_ARGS);
+ static void avl_sigusr1_handler(SIGNAL_ARGS);
static void avlauncher_shutdown(SIGNAL_ARGS);
static void avl_quickdie(SIGNAL_ARGS);
*************** StartAutoVacLauncher(void)
*** 230,241 ****
/*
* Main loop for the autovacuum launcher process.
*/
NON_EXEC_STATIC void
AutoVacLauncherMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
- MemoryContext avlauncher_cxt;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
--- 282,329 ----
/*
* Main loop for the autovacuum launcher process.
+ *
+ * The signalling between launcher and worker is as follows:
+ *
+ * When the worker has finished starting up, it stores its PID in wi_workerpid
+ * and sends a SIGUSR1 signal to the launcher. The launcher then knows that
+ * the postmaster is ready to start a new worker. We do it this way because
+ * otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't
+ * yet processed the last one, in which case the second signal would be lost.
+ * This is only useful when two workers need to be started close to one
+ * another, which should be rare but it's possible.
+ *
+ * Additionally, when the worker is finished with the vacuum work, it sets the
+ * wi_finished flag and sends a SIGUSR1 signal to the launcher. Upon receipt
+ * of this signal, the launcher then clears the entry for future use and may
+ * start another worker right away, if need be.
+ *
+ * There is at least one race condition here: if the workers are all busy, a
+ * database needs immediate attention and a worker finishes just after the
+ * launcher started a worker and sent the signal to postmaster, but before
+ * postmaster processes the signal; at this point, the launcher receives a
+ * signal from the finishing process, sees the empty slot, and sends the
+ * signal to postmaster again to start another worker. But the postmaster
+ * SendPostmasterSignal() flag was already set, so the signal is lost. To
+ * avoid this problem, the launcher should not try to start a new worker until
+ * all WorkerInfo entries that have the wi_dboid field set have a PID assigned.
+ * FIXME someday. The problem is that if we have workers failing to start for
+ * some reason, holding the start of new workers will worsen the starvation by
+ * disabling the start of a new worker as soon as one worker fails to start.
+ * So it's important to be able to distinguish a worker that has failed
+ * starting from a worker that is just taking its little bit of time to do so.
+ *
+ * There is another potential problem if, for some reason, a worker starts and
+ * is not able to finish correctly. It will not be able to set its finished
+ * flag, so the launcher will believe that it's still starting up. To prevent
+ * this problem, we should check the PGPROCs of worker processes, and clean
+ * them up if we find they are not actually running (or they correspond to
+ * processes that are not autovacuum workers.) FIXME someday.
*/
NON_EXEC_STATIC void
AutoVacLauncherMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
*************** AutoVacLauncherMain(int argc, char *argv
*** 264,272 ****
* Set up signal handlers. Since this is an auxiliary process, it has
* particular signal requirements -- no deadlock checker or sinval
* catchup, for example.
- *
- * XXX It may be a good idea to receive signals when an avworker process
- * finishes.
*/
pqsignal(SIGHUP, avl_sighup_handler);
--- 352,357 ----
*************** AutoVacLauncherMain(int argc, char *argv
*** 276,282 ****
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
! pqsignal(SIGUSR1, SIG_IGN);
/* We don't listen for async notifies */
pqsignal(SIGUSR2, SIG_IGN);
pqsignal(SIGFPE, FloatExceptionHandler);
--- 361,367 ----
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
! pqsignal(SIGUSR1, avl_sigusr1_handler);
/* We don't listen for async notifies */
pqsignal(SIGUSR2, SIG_IGN);
pqsignal(SIGFPE, FloatExceptionHandler);
*************** AutoVacLauncherMain(int argc, char *argv
*** 300,311 ****
* that we can reset the context during error recovery and thereby avoid
* possible memory leaks.
*/
! avlauncher_cxt = AllocSetContextCreate(TopMemoryContext,
! "Autovacuum Launcher",
! ALLOCSET_DEFAULT_MINSIZE,
! ALLOCSET_DEFAULT_INITSIZE,
! ALLOCSET_DEFAULT_MAXSIZE);
! MemoryContextSwitchTo(avlauncher_cxt);
/*
--- 385,396 ----
* that we can reset the context during error recovery and thereby avoid
* possible memory leaks.
*/
! AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! "Autovacuum Launcher",
! ALLOCSET_DEFAULT_MINSIZE,
! ALLOCSET_DEFAULT_INITSIZE,
! ALLOCSET_DEFAULT_MAXSIZE);
! MemoryContextSwitchTo(AutovacMemCxt);
/*
*************** AutoVacLauncherMain(int argc, char *argv
*** 336,346 ****
* Now return to normal top-level context and clear ErrorContext for
* next time.
*/
! MemoryContextSwitchTo(avlauncher_cxt);
FlushErrorState();
/* Flush any leaked data in the top-level context */
! MemoryContextResetAndDeleteChildren(avlauncher_cxt);
/* Make sure pgstat also considers our stat data as gone */
pgstat_clear_snapshot();
--- 421,431 ----
* Now return to normal top-level context and clear ErrorContext for
* next time.
*/
! MemoryContextSwitchTo(AutovacMemCxt);
FlushErrorState();
/* Flush any leaked data in the top-level context */
! MemoryContextResetAndDeleteChildren(AutovacMemCxt);
/* Make sure pgstat also considers our stat data as gone */
pgstat_clear_snapshot();
*************** AutoVacLauncherMain(int argc, char *argv
*** 361,378 ****
ereport(LOG,
(errmsg("autovacuum launcher started")));
PG_SETMASK(&UnBlockSig);
/*
! * take a nap before executing the first iteration, unless we were
! * requested an emergency run.
*/
! if (autovacuum_start_daemon)
! pg_usleep(autovacuum_naptime * 1000000L);
for (;;)
{
! int worker_pid;
/*
* Emergency bailout if postmaster has died. This is to avoid the
--- 446,476 ----
ereport(LOG,
(errmsg("autovacuum launcher started")));
+ /* must unblock signals before calling rebuild_database_list */
PG_SETMASK(&UnBlockSig);
+ /* in emergency mode, just start a worker and go away */
+ if (!autovacuum_start_daemon)
+ {
+ do_start_worker();
+ proc_exit(0); /* done */
+ }
+
+ AutoVacuumShmem->av_launcherpid = MyProcPid;
+
/*
! * Create the initial database list. The invariant we want this list to
! * keep is that it's ordered by decreasing next_time. As soon as an entry is updated to
! * a higher time, it will be moved to the front (which is correct because
! * the only operation is to add autovacuum_naptime to the entry, and time
! * always increases).
*/
! rebuild_database_list(InvalidOid);
! free_workers = autovacuum_max_workers;
for (;;)
{
! int millis;
/*
* Emergency bailout if postmaster has died. This is to avoid the
*************** AutoVacLauncherMain(int argc, char *argv
*** 381,386 ****
--- 479,490 ----
if (!PostmasterIsAlive(true))
exit(1);
+ millis = launcher_determine_sleep(free_workers > 0);
+
+ /* Sleep for a while according to schedule */
+ pg_usleep(millis * 1000);
+
+ /* the normal shutdown case */
if (avlauncher_shutdown_request)
break;
*************** AutoVacLauncherMain(int argc, char *argv
*** 390,469 ****
ProcessConfigFile(PGC_SIGHUP);
}
/*
! * if there's a worker already running, sleep until it
! * disappears.
*/
! LWLockAcquire(AutovacuumLock, LW_SHARED);
! worker_pid = AutoVacuumShmem->worker_pid;
! LWLockRelease(AutovacuumLock);
!
! if (worker_pid != 0)
{
! PGPROC *proc = BackendPidGetProc(worker_pid);
! if (proc != NULL && proc->isAutovacuum)
! goto sleep;
else
{
/*
! * if the worker is not really running (or it's a process
! * that's not an autovacuum worker), remove the PID from shmem.
! * This should not happen, because either the worker exits
! * cleanly, in which case it'll remove the PID, or it dies, in
! * which case postmaster will cause a system reset cycle.
*/
! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! worker_pid = 0;
! LWLockRelease(AutovacuumLock);
}
}
- do_start_worker();
-
- sleep:
- /*
- * in emergency mode, exit immediately so that the postmaster can
- * request another run right away if needed.
- *
- * XXX -- maybe it would be better to handle this inside the launcher
- * itself.
- */
- if (!autovacuum_start_daemon)
- break;
-
/* have pgstat read the file again next time */
pgstat_clear_snapshot();
-
- /* now sleep until the next autovac iteration */
- pg_usleep(autovacuum_naptime * 1000000L);
}
/* Normal exit from the autovac launcher is here */
ereport(LOG,
(errmsg("autovacuum launcher shutting down")));
proc_exit(0); /* done */
}
/*
* do_start_worker
*
* Bare-bones procedure for starting an autovacuum worker from the launcher.
* It determines what database to work on, sets up shared memory stuff and
! * signals postmaster to start the worker.
*/
! static void
do_start_worker(void)
{
List *dblist;
! bool for_xid_wrap;
! autovac_dbase *db;
! ListCell *cell;
TransactionId xidForceLimit;
/* Get a list of databases */
! dblist = autovac_get_database_list();
/*
* Determine the oldest datfrozenxid/relfrozenxid that we will allow
--- 494,877 ----
ProcessConfigFile(PGC_SIGHUP);
}
+ /* a worker started up or finished */
+ if (got_SIGUSR1)
+ {
+ WorkerInfo *worker;
+ int i;
+
+ got_SIGUSR1 = false;
+
+ /* Walk the workers and clean up finished entries. */
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ foreach_worker(i, worker)
+ {
+ if (worker->wi_finished)
+ {
+ worker->wi_tableoid = InvalidOid;
+ worker->wi_dboid = InvalidOid;
+ worker->wi_workerpid = 0;
+ worker->wi_finished = false;
+ free_workers++;
+ }
+ }
+ LWLockRelease(AutovacuumLock);
+ }
+
/*
! * See if there's need to start a new worker, and do so if possible.
! * If there are no free worker slots, avoid doing all this work, as
! * we will not be able to start the worker anyway.
*/
! if (free_workers > 0)
{
! TimestampTz current_time;
! Dlelem *elem;
!
! elem = DLGetTail(DatabaseList);
! current_time = GetCurrentTimestamp();
! if (elem != NULL)
! {
! avl_dbase *avdb = DLE_VAL(elem);
! long secs;
! int usecs;
!
! TimestampDifference(current_time, avdb->adl_next_worker, &secs, &usecs);
!
! /* do we have to start a worker? */
! if (secs <= 0 && usecs <= 0)
! launch_worker(current_time);
! }
else
{
/*
! * Special case when the list is empty: start a worker right
! * away. This covers the initial case, when no database is in
! * pgstats (thus the list is empty).
*/
! launch_worker(current_time);
}
}
/* have pgstat read the file again next time */
pgstat_clear_snapshot();
}
/* Normal exit from the autovac launcher is here */
ereport(LOG,
(errmsg("autovacuum launcher shutting down")));
+ AutoVacuumShmem->av_launcherpid = 0;
proc_exit(0); /* done */
}
+
+ /*
+ * Determine the time to sleep, in milliseconds, based on the database list.
+ *
+ * The "canlaunch" parameter indicates whether we can start a worker right now,
+ * for example due to the workers being all busy.
+ */
+ static int
+ launcher_determine_sleep(bool canlaunch)
+ {
+ long secs;
+ int usecs;
+ Dlelem *elem;
+
+ /*
+ * We sleep until the next scheduled vacuum. We trust that when the
+ * database list was built, care was taken so that no entries have times in
+ * the past; if the first entry has too close a next_worker value, or a
+ * time in the past, we will sleep a small nominal time.
+ */
+ if (!canlaunch)
+ {
+ secs = autovacuum_naptime;
+ usecs = 0;
+ }
+ else if ((elem = DLGetTail(DatabaseList)) != NULL)
+ {
+ avl_dbase *avdb = DLE_VAL(elem);
+ TimestampTz current_time = GetCurrentTimestamp();
+ TimestampTz next_wakeup;
+
+ next_wakeup = avdb->adl_next_worker;
+ TimestampDifference(current_time, next_wakeup, &secs, &usecs);
+ }
+ else
+ {
+ /* list is empty, sleep for whole autovacuum_naptime seconds */
+ secs = autovacuum_naptime;
+ usecs = 0;
+ }
+
+ /*
+ * someone screwed up (invalid entry on the list); sleep a nominal amount
+ */
+ if (secs <= 0L && usecs <= 0)
+ {
+ secs = 0;
+ usecs = 500000; /* 500 ms */
+ }
+
+ return secs * 1000 + usecs / 1000;
+ }
+
+ /*
+ * Build an updated DatabaseList. It must only contain databases that appear
+ * in pgstats, and must be sorted by next_worker from highest to lowest,
+ * distributed regularly across the next autovacuum_naptime interval.
+ *
+ * Receives the Oid of the database that made this list be generated (we call
+ * this the "new" database, because when the database was already present on
+ * the list, we expect that this function is not called at all). The
+ * preexisting list, if any, will be used to preserve the order of the
+ * databases in the autovacuum_naptime period. The new database is put at the
+ * end of the interval. The actual values are not saved, which should not be
+ * much of a problem.
+ */
+ static void
+ rebuild_database_list(Oid newdb)
+ {
+ List *dblist;
+ ListCell *cell;
+ MemoryContext newcxt;
+ MemoryContext oldcxt;
+ MemoryContext tmpcxt;
+ HASHCTL hctl;
+ int score;
+ int nelems;
+ HTAB *dbhash;
+
+ newcxt = AllocSetContextCreate(AutovacMemCxt,
+ "AV dblist",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ tmpcxt = AllocSetContextCreate(newcxt,
+ "tmp AV dblist",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ oldcxt = MemoryContextSwitchTo(tmpcxt);
+
+ /*
+ * Implementing this is not as simple as it sounds, because we need to put
+ * the new database at the end of the list; next the databases that were
+ * already on the list, and finally (at the tail of the list) all the other
+ * databases that are not on the existing list.
+ *
+ * To do this, we build an empty hash table of scored databases. We will
+ * start with the lowest score (zero) for the new database, then increasing
+ * scores for the databases in the existing list, in order, and lastly
+ * increasing scores for all databases gotten via get_database_list() that
+ * are not already on the hash.
+ *
+ * Then we will put all the hash elements into an array, sort the array by
+ * score, and finally put the array elements into the new doubly linked
+ * list.
+ */
+ hctl.keysize = sizeof(Oid);
+ hctl.entrysize = sizeof(avl_dbase);
+ hctl.hash = oid_hash;
+ hctl.hcxt = tmpcxt;
+ dbhash = hash_create("db hash", 20, &hctl, /* magic number here FIXME */
+ HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+ /* start by inserting the new database */
+ score = 0;
+ if (OidIsValid(newdb))
+ {
+ avl_dbase *db;
+ PgStat_StatDBEntry *entry;
+
+ /* only consider this database if it has a pgstat entry */
+ entry = pgstat_fetch_stat_dbentry(newdb);
+ if (entry != NULL)
+ {
+ /* we assume it isn't found because the hash was just created */
+ db = hash_search(dbhash, &newdb, HASH_ENTER, NULL);
+
+ /* hash_search already filled in the key */
+ db->adl_score = score++;
+ /* next_worker is filled in later */
+ }
+ }
+
+ /* Now insert the databases from the existing list */
+ if (DatabaseList != NULL)
+ {
+ Dlelem *elem;
+
+ elem = DLGetHead(DatabaseList);
+ while (elem != NULL)
+ {
+ avl_dbase *avdb = DLE_VAL(elem);
+ avl_dbase *db;
+ bool found;
+ PgStat_StatDBEntry *entry;
+
+ /*
+ * skip databases with no stat entries -- in particular, this
+ * gets rid of dropped databases
+ */
+ entry = pgstat_fetch_stat_dbentry(avdb->adl_datid);
+ if (entry == NULL)
+ continue;
+
+ db = hash_search(dbhash, &(avdb->adl_datid), HASH_ENTER, &found);
+
+ if (!found)
+ {
+ /* hash_search already filled in the key */
+ db->adl_score = score++;
+ /* next_worker is filled in later */
+ }
+
+ elem = DLGetSucc(elem);
+ }
+ }
+
+ /* finally, insert all qualifying databases not previously inserted */
+ dblist = get_database_list();
+ foreach (cell, dblist)
+ {
+ avw_dbase *avdb = lfirst(cell);
+ avl_dbase *db;
+ bool found;
+ PgStat_StatDBEntry *entry;
+
+ /* only consider databases with a pgstat entry */
+ entry = pgstat_fetch_stat_dbentry(avdb->adw_datid);
+ if (entry == NULL)
+ continue;
+
+ db = hash_search(dbhash, &(avdb->adw_datid), HASH_ENTER, &found);
+ /* only update the score if the database was not already on the hash */
+ if (!found)
+ {
+ /* hash_search already filled in the key */
+ db->adl_score = score++;
+ /* next_worker is filled in later */
+ }
+ }
+ nelems = score;
+
+ /* from here on, the allocated memory belongs to the new list */
+ MemoryContextSwitchTo(newcxt);
+ DatabaseList = DLNewList();
+
+ if (nelems > 0)
+ {
+ TimestampTz current_time;
+ int millis_increment;
+ avl_dbase *dbary;
+ avl_dbase *db;
+ HASH_SEQ_STATUS seq;
+ int i;
+
+ /* put all the hash elements into an array */
+ dbary = palloc(nelems * sizeof(avl_dbase));
+
+ i = 0;
+ hash_seq_init(&seq, dbhash);
+ while ((db = hash_seq_search(&seq)) != NULL)
+ memcpy(&(dbary[i++]), db, sizeof(avl_dbase));
+
+ /* sort the array */
+ qsort(dbary, nelems, sizeof(avl_dbase), db_comparator);
+
+ /* this is the time interval between databases in the schedule */
+ millis_increment = 1000.0 * autovacuum_naptime / nelems;
+ current_time = GetCurrentTimestamp();
+
+ /*
+ * move the elements from the array into the dllist, setting the
+ * next_worker while walking the array
+ */
+ for (i = 0; i < nelems; i++)
+ {
+ avl_dbase *db = &(dbary[i]);
+ Dlelem *elem;
+
+ current_time = TimestampTzPlusMilliseconds(current_time,
+ millis_increment);
+ db->adl_next_worker = current_time;
+
+ elem = DLNewElem(db);
+ /* later elements should go closer to the head of the list */
+ DLAddHead(DatabaseList, elem);
+ }
+ }
+
+ /* all done, clean up memory */
+ if (DatabaseListCxt != NULL)
+ MemoryContextDelete(DatabaseListCxt);
+ MemoryContextDelete(tmpcxt);
+ DatabaseListCxt = newcxt;
+ MemoryContextSwitchTo(oldcxt);
+ }
+
+ /* qsort comparator for avl_dbase, using adl_score */
+ static int
+ db_comparator(const void *a, const void *b)
+ {
+ if (((avl_dbase *) a)->adl_score == ((avl_dbase *) b)->adl_score)
+ return 0;
+ else
+ return (((avl_dbase *) a)->adl_score < ((avl_dbase *) b)->adl_score) ? 1 : -1;
+ }
+
/*
* do_start_worker
*
* Bare-bones procedure for starting an autovacuum worker from the launcher.
* It determines what database to work on, sets up shared memory stuff and
! * signals postmaster to start the worker. It fails gracefully if invoked when
! * autovacuum_workers are already active.
! *
! * Return value is the OID of the database that the worker is going to process,
! * or InvalidOid if no worker was actually started.
*/
! static Oid
do_start_worker(void)
{
List *dblist;
! WorkerInfo *worker;
! int i;
! ListCell *cell;
TransactionId xidForceLimit;
+ bool for_xid_wrap;
+ avw_dbase *avdb;
+ TimestampTz current_time;
+ bool skipit = false;
+
+ /*
+ * Find an unused WorkerInfo entry to set up. If there is none, go to
+ * sleep.
+ *
+ * NB: we only read the array here, and save a pointer where we'll
+ * write the entry later. Since this is the only process that creates
+ * new entries into the array, there's no risk that somebody else will
+ * use that pointer while we weren't looking.
+ */
+ LWLockAcquire(AutovacuumLock, LW_SHARED);
+ foreach_worker(i, worker)
+ {
+ /* Invalid database OID means unused worker entry; use it */
+ if (!OidIsValid(worker->wi_dboid))
+ break;
+ }
+ LWLockRelease(AutovacuumLock);
+
+ /* they're all used up */
+ if (i >= autovacuum_max_workers)
+ return InvalidOid;
/* Get a list of databases */
! dblist = get_database_list();
/*
* Determine the oldest datfrozenxid/relfrozenxid that we will allow
*************** do_start_worker(void)
*** 495,515 ****
* isn't clear how to construct a metric that measures that and not cause
* starvation for less busy databases.
*/
! db = NULL;
for_xid_wrap = false;
foreach(cell, dblist)
{
! autovac_dbase *tmp = lfirst(cell);
/* Find pgstat entry if any */
! tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid);
/* Check to see if this one is at risk of wraparound */
! if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit))
{
! if (db == NULL ||
! TransactionIdPrecedes(tmp->ad_frozenxid, db->ad_frozenxid))
! db = tmp;
for_xid_wrap = true;
continue;
}
--- 903,925 ----
* isn't clear how to construct a metric that measures that and not cause
* starvation for less busy databases.
*/
! avdb = NULL;
for_xid_wrap = false;
+ current_time = GetCurrentTimestamp();
foreach(cell, dblist)
{
! avw_dbase *tmp = lfirst(cell);
! Dlelem *elem;
/* Find pgstat entry if any */
! tmp->adw_entry = pgstat_fetch_stat_dbentry(tmp->adw_datid);
/* Check to see if this one is at risk of wraparound */
! if (TransactionIdPrecedes(tmp->adw_frozenxid, xidForceLimit))
{
! if (avdb == NULL ||
! TransactionIdPrecedes(tmp->adw_frozenxid, avdb->adw_frozenxid))
! avdb = tmp;
for_xid_wrap = true;
continue;
}
*************** do_start_worker(void)
*** 520,545 ****
* Otherwise, skip a database with no pgstat entry; it means it
* hasn't seen any activity.
*/
! if (!tmp->ad_entry)
continue;
/*
* Remember the db with oldest autovac time. (If we are here,
* both tmp->entry and db->entry must be non-null.)
*/
! if (db == NULL ||
! tmp->ad_entry->last_autovac_time < db->ad_entry->last_autovac_time)
! db = tmp;
}
/* Found a database -- process it */
! if (db != NULL)
{
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! AutoVacuumShmem->process_db = db->ad_datid;
LWLockRelease(AutovacuumLock);
SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
}
}
--- 930,1069 ----
* Otherwise, skip a database with no pgstat entry; it means it
* hasn't seen any activity.
*/
! if (!tmp->adw_entry)
! continue;
!
! /*
! * Also, skip a database that appears on the database list as having
! * been processed recently (less than autovacuum_naptime seconds ago).
! * We do this so that we don't select a database which we just
! * selected, but that pgstat hasn't gotten around to updating the last
! * autovacuum time yet.
! */
! skipit = false;
! elem = DatabaseList ? DLGetTail(DatabaseList) : NULL;
!
! while (elem != NULL)
! {
! avl_dbase *dbp = DLE_VAL(elem);
!
! if (dbp->adl_datid == tmp->adw_datid)
! {
! TimestampTz curr_plus_naptime;
! TimestampTz next = dbp->adl_next_worker;
!
! curr_plus_naptime =
! TimestampTzPlusMilliseconds(current_time,
! autovacuum_naptime * 1000);
!
! /*
! * What we want here if to skip if next_worker falls between
! * the current time and the current time plus naptime.
! */
! if (timestamp_cmp_internal(current_time, next) > 0)
! skipit = false;
! else if (timestamp_cmp_internal(next, curr_plus_naptime) > 0)
! skipit = false;
! else
! skipit = true;
!
! break;
! }
! elem = DLGetPred(elem);
! }
! if (skipit)
continue;
/*
* Remember the db with oldest autovac time. (If we are here,
* both tmp->entry and db->entry must be non-null.)
*/
! if (avdb == NULL ||
! tmp->adw_entry->last_autovac_time < avdb->adw_entry->last_autovac_time)
! avdb = tmp;
}
/* Found a database -- process it */
! if (avdb != NULL)
{
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! Assert(!OidIsValid(worker->wi_dboid));
! worker->wi_dboid = avdb->adw_datid;
! worker->wi_workerpid = 0;
LWLockRelease(AutovacuumLock);
SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
+
+ return avdb->adw_datid;
+ }
+ else if (skipit)
+ {
+ /*
+ * If we skipped all databases on the list, rebuild it, because it
+ * probably contains a dropped database.
+ */
+ rebuild_database_list(InvalidOid);
+ }
+
+ return InvalidOid;
+ }
+
+ /*
+ * launch_worker
+ *
+ * Wrapper for starting a worker from the launcher. Besides actually starting
+ * it, update the database list to reflect the next time that another one will
+ * need to be started on the selected database. The actual database choice is
+ * left to do_start_worker.
+ *
+ * This routine is also expected to insert an entry into the database list if
+ * the selected database was previously absent from the list. It returns the
+ * new database list.
+ */
+ static void
+ launch_worker(TimestampTz now)
+ {
+ Oid dbid;
+ Dlelem *elem;
+
+ dbid = do_start_worker();
+ if (OidIsValid(dbid))
+ {
+ free_workers--;
+
+ /*
+ * Walk the database list and update the corresponding entry. If the
+ * database is not on the list, we'll recreate the list.
+ */
+ elem = DatabaseList == NULL ? NULL : DLGetHead(DatabaseList);
+ while (elem != NULL)
+ {
+ avl_dbase *avdb = DLE_VAL(elem);
+
+ if (avdb->adl_datid == dbid)
+ {
+ /*
+ * add autovacuum_naptime seconds to the current time, and use
+ * that as the new "next_worker" field for this database.
+ */
+ avdb->adl_next_worker =
+ TimestampTzPlusMilliseconds(now, autovacuum_naptime * 1000);
+
+ DLMoveToFront(elem);
+ break;
+ }
+ elem = DLGetSucc(elem);
+ }
+
+ /*
+ * If the database was not present in the database list, we rebuild the
+ * list. It's possible that the database does not get into the list
+ * anyway, for example if it's a database that doesn't have a pgstat
+ * entry, but this is not a problem because we don't want to schedule
+ * workers regularly into those in any case.
+ */
+ if (elem == NULL)
+ rebuild_database_list(dbid);
}
}
*************** avl_sighup_handler(SIGNAL_ARGS)
*** 550,555 ****
--- 1074,1086 ----
got_SIGHUP = true;
}
+ /* SIGUSR1: a worker is up and running, or just finished */
+ static void
+ avl_sigusr1_handler(SIGNAL_ARGS)
+ {
+ got_SIGUSR1 = true;
+ }
+
static void
avlauncher_shutdown(SIGNAL_ARGS)
{
*************** NON_EXEC_STATIC void
*** 665,671 ****
AutoVacWorkerMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
! Oid dbid;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
--- 1196,1204 ----
AutoVacWorkerMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
! Oid dbid = InvalidOid;
! WorkerInfo *worker;
! int i;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 763,778 ****
SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
/*
! * Get the database Id we're going to work on, and announce our PID
! * in the shared memory area. We remove the database OID immediately
! * from the shared memory area.
*/
! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
!
! dbid = AutoVacuumShmem->process_db;
! AutoVacuumShmem->process_db = InvalidOid;
! AutoVacuumShmem->worker_pid = MyProcPid;
LWLockRelease(AutovacuumLock);
if (OidIsValid(dbid))
--- 1296,1323 ----
SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
/*
! * Force statement_timeout to zero to avoid a timeout setting from
! * preventing regular maintenance from being executed.
*/
! SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
+ /*
+ * Walk the WorkerInfo array, and get the database OID we're going to work
+ * on. Use the first entry with PID 0 in the list, and advertise our PID
+ * on it, thus marking it used.
+ */
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ foreach_worker(i, worker)
+ {
+ if (worker->wi_workerpid == 0)
+ {
+ dbid = worker->wi_dboid;
+ worker->wi_workerpid = MyProcPid;
+ break;
+ }
+ }
+ if (AutoVacuumShmem->av_launcherpid != 0)
+ kill(AutoVacuumShmem->av_launcherpid, SIGUSR1);
LWLockRelease(AutovacuumLock);
if (OidIsValid(dbid))
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 803,824 ****
/* Create the memory context where cross-transaction state is stored */
AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! "Autovacuum context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/* And do an appropriate amount of work */
recentXid = ReadNewTransactionId();
! do_autovacuum();
}
- /*
- * Now remove our PID from shared memory, so that the launcher can start
- * another worker as soon as appropriate.
- */
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! AutoVacuumShmem->worker_pid = 0;
LWLockRelease(AutovacuumLock);
/* All done, go away */
--- 1348,1385 ----
/* Create the memory context where cross-transaction state is stored */
AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! "AV worker",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/* And do an appropriate amount of work */
recentXid = ReadNewTransactionId();
! do_autovacuum(worker);
}
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! if (!autovacuum_start_daemon)
! {
! /* in emergency mode we must cleanup after ourselves */
! worker->wi_workerpid = 0;
! worker->wi_dboid = InvalidOid;
! worker->wi_tableoid = InvalidOid;
! worker->wi_finished = false;
! }
! else
! {
! /*
! * Otherwise, let the launcher know we're done. Warning: must set the
! * flag before sending the signal. Note: we don't care about code
! * rearrangement from the compiler, because we're doing this with the
! * lock held, thus the launcher can't read the flag until we've
! * released the lock, below.
! */
! worker->wi_finished = true;
! if (AutoVacuumShmem->av_launcherpid != 0)
! kill(AutoVacuumShmem->av_launcherpid, SIGUSR1);
! }
LWLockRelease(AutovacuumLock);
/* All done, go away */
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 826,838 ****
}
/*
! * autovac_get_database_list
*
* Return a list of all databases. Note we cannot use pg_database,
* because we aren't connected; we use the flat database file.
*/
static List *
! autovac_get_database_list(void)
{
char *filename;
List *dblist = NIL;
--- 1387,1399 ----
}
/*
! * get_database_list
*
* Return a list of all databases. Note we cannot use pg_database,
* because we aren't connected; we use the flat database file.
*/
static List *
! get_database_list(void)
{
char *filename;
List *dblist = NIL;
*************** autovac_get_database_list(void)
*** 852,866 ****
while (read_pg_database_line(db_file, thisname, &db_id,
&db_tablespace, &db_frozenxid))
{
! autovac_dbase *avdb;
! avdb = (autovac_dbase *) palloc(sizeof(autovac_dbase));
! avdb->ad_datid = db_id;
! avdb->ad_name = pstrdup(thisname);
! avdb->ad_frozenxid = db_frozenxid;
/* this gets set later: */
! avdb->ad_entry = NULL;
dblist = lappend(dblist, avdb);
}
--- 1413,1427 ----
while (read_pg_database_line(db_file, thisname, &db_id,
&db_tablespace, &db_frozenxid))
{
! avw_dbase *avdb;
! avdb = (avw_dbase *) palloc(sizeof(avw_dbase));
! avdb->adw_datid = db_id;
! avdb->adw_name = pstrdup(thisname);
! avdb->adw_frozenxid = db_frozenxid;
/* this gets set later: */
! avdb->adw_entry = NULL;
dblist = lappend(dblist, avdb);
}
*************** autovac_get_database_list(void)
*** 878,884 ****
* order not to ignore shutdown commands for too long.
*/
static void
! do_autovacuum(void)
{
Relation classRel,
avRel;
--- 1439,1445 ----
* order not to ignore shutdown commands for too long.
*/
static void
! do_autovacuum(WorkerInfo *worker)
{
Relation classRel,
avRel;
*************** do_autovacuum(void)
*** 1038,1047 ****
--- 1599,1642 ----
Oid relid = lfirst_oid(cell);
autovac_table *tab;
char *relname;
+ WorkerInfo *other_worker;
+ int i;
+ bool skipit;
CHECK_FOR_INTERRUPTS();
/*
+ * hold schedule lock from here until we're sure that this table
+ * still needs vacuuming
+ */
+ LWLockAcquire(AutovacuumScheduleLock, LW_EXCLUSIVE);
+
+ /*
+ * Check whether the table is being vacuumed concurrently by another
+ * worker.
+ */
+ skipit = false;
+ foreach_worker(i, other_worker)
+ {
+ /*
+ * ignore not-yet-registered or not running workers, and workers in
+ * other databases
+ */
+ if (other_worker->wi_workerpid == 0 ||
+ other_worker->wi_dboid != MyDatabaseId)
+ continue;
+
+ if (other_worker->wi_tableoid == relid)
+ {
+ LWLockRelease(AutovacuumScheduleLock);
+ skipit = true;
+ break;
+ }
+ }
+ if (skipit)
+ continue;
+
+ /*
* Check whether pgstat data still says we need to vacuum this table.
* It could have changed if something else processed the table while we
* weren't looking.
*************** do_autovacuum(void)
*** 1053,1061 ****
if (tab == NULL)
{
/* someone else vacuumed the table */
continue;
}
! /* Ok, good to go! */
/* Set the vacuum cost parameters for this table */
VacuumCostDelay = tab->at_vacuum_cost_delay;
--- 1648,1663 ----
if (tab == NULL)
{
/* someone else vacuumed the table */
+ LWLockRelease(AutovacuumScheduleLock);
continue;
}
!
! /*
! * Ok, good to go. Store the table in shared memory before releasing
! * the lock so that other workers don't vacuum it concurrently.
! */
! worker->wi_tableoid = relid;
! LWLockRelease(AutovacuumScheduleLock);
/* Set the vacuum cost parameters for this table */
VacuumCostDelay = tab->at_vacuum_cost_delay;
*************** IsAutoVacuumWorkerProcess(void)
*** 1630,1636 ****
Size
AutoVacuumShmemSize(void)
{
! return sizeof(AutoVacuumShmemStruct);
}
/*
--- 2232,2239 ----
Size
AutoVacuumShmemSize(void)
{
! return add_size(offsetof(AutoVacuumShmemStruct, av_workers),
! mul_size(autovacuum_max_workers, sizeof(WorkerInfo)));
}
/*
*************** AutoVacuumShmemInit(void)
*** 1653,1657 ****
if (found)
return; /* already initialized */
! MemSet(AutoVacuumShmem, 0, sizeof(AutoVacuumShmemStruct));
}
--- 2256,2260 ----
if (found)
return; /* already initialized */
! MemSet(AutoVacuumShmem, 0, AutoVacuumShmemSize());
}
Index: src/backend/storage/ipc/procarray.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/ipc/procarray.c,v
retrieving revision 1.23
diff -c -p -r1.23 procarray.c
*** src/backend/storage/ipc/procarray.c 25 Mar 2007 19:45:14 -0000 1.23
--- src/backend/storage/ipc/procarray.c 30 Mar 2007 20:46:08 -0000
***************
*** 36,41 ****
--- 36,42 ----
#include "access/xact.h"
#include "access/twophase.h"
#include "miscadmin.h"
+ #include "postmaster/autovacuum.h"
#include "storage/procarray.h"
#include "utils/tqual.h"
*************** ProcArrayShmemSize(void)
*** 89,95 ****
size = offsetof(ProcArrayStruct, procs);
size = add_size(size, mul_size(sizeof(PGPROC *),
! add_size(MaxBackends, max_prepared_xacts)));
return size;
}
--- 90,98 ----
size = offsetof(ProcArrayStruct, procs);
size = add_size(size, mul_size(sizeof(PGPROC *),
! add_size(add_size(MaxBackends,
! max_prepared_xacts),
! autovacuum_max_workers)));
return size;
}
*************** CreateSharedProcArray(void)
*** 112,118 ****
* We're the first - initialize.
*/
procArray->numProcs = 0;
! procArray->maxProcs = MaxBackends + max_prepared_xacts;
}
}
--- 115,121 ----
* We're the first - initialize.
*/
procArray->numProcs = 0;
! procArray->maxProcs = MaxBackends + max_prepared_xacts + autovacuum_max_workers;
}
}
Index: src/backend/storage/lmgr/lock.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/lmgr/lock.c,v
retrieving revision 1.176
diff -c -p -r1.176 lock.c
*** src/backend/storage/lmgr/lock.c 1 Feb 2007 19:10:28 -0000 1.176
--- src/backend/storage/lmgr/lock.c 30 Mar 2007 20:46:08 -0000
***************
*** 37,42 ****
--- 37,43 ----
#include "access/twophase_rmgr.h"
#include "miscadmin.h"
#include "pgstat.h"
+ #include "postmaster/autovacuum.h"
#include "storage/lmgr.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
***************
*** 47,53 ****
int max_locks_per_xact; /* set by guc.c */
#define NLOCKENTS() \
! mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
/*
--- 48,55 ----
int max_locks_per_xact; /* set by guc.c */
#define NLOCKENTS() \
! mul_size(max_locks_per_xact, \
! add_size(add_size(MaxBackends, max_prepared_xacts), autovacuum_max_workers))
/*
Index: src/backend/utils/misc/guc.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/misc/guc.c,v
retrieving revision 1.383
diff -c -p -r1.383 guc.c
*** src/backend/utils/misc/guc.c 19 Mar 2007 23:38:30 -0000 1.383
--- src/backend/utils/misc/guc.c 30 Mar 2007 20:46:08 -0000
*************** static struct config_int ConfigureNamesI
*** 1620,1625 ****
--- 1620,1634 ----
&autovacuum_freeze_max_age,
200000000, 100000000, 2000000000, NULL, NULL
},
+ {
+ /* this is PGC_POSTMASTER because it determines shared memory size */
+ {"autovacuum_max_workers", PGC_POSTMASTER, AUTOVACUUM,
+ gettext_noop("Sets the maximum number of simultaneously running autovacuum worker processes."),
+ NULL
+ },
+ &autovacuum_max_workers,
+ 10, 1, INT_MAX, NULL, NULL
+ },
{
{"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER,
Index: src/include/postmaster/autovacuum.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/postmaster/autovacuum.h,v
retrieving revision 1.8
diff -c -p -r1.8 autovacuum.h
*** src/include/postmaster/autovacuum.h 15 Feb 2007 23:23:23 -0000 1.8
--- src/include/postmaster/autovacuum.h 30 Mar 2007 20:46:08 -0000
***************
*** 16,21 ****
--- 16,22 ----
/* GUC variables */
extern bool autovacuum_start_daemon;
+ extern int autovacuum_max_workers;
extern int autovacuum_naptime;
extern int autovacuum_vac_thresh;
extern double autovacuum_vac_scale;
Index: src/include/storage/lwlock.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/storage/lwlock.h,v
retrieving revision 1.34
diff -c -p -r1.34 lwlock.h
*** src/include/storage/lwlock.h 15 Feb 2007 23:23:23 -0000 1.34
--- src/include/storage/lwlock.h 30 Mar 2007 20:25:59 -0000
*************** typedef enum LWLockId
*** 62,67 ****
--- 62,68 ----
BtreeVacuumLock,
AddinShmemInitLock,
AutovacuumLock,
+ AutovacuumScheduleLock,
/* Individual lock IDs end here */
FirstBufMappingLock,
FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
Alvaro Herrera wrote:
Hi,
uhmmm patch?
Here is the autovacuum patch I am currently working with. This is
basically the same as the previous patch; I have tweaked the database
list management so that after a change in databases (say a new database
is created or a database is dropped), the list is recomputed to account
for the change, keeping the ordering of the previous list.Modulo two low probability failure scenarios, I feel this patch is ready
to be applied; I will do so on Friday unless there are objections.The failure scenarios are detailed in the comment pasted below. I
intend to attack these problems next, but as the first one should be
fairly low probability, I don't think it should bar the current patch
from being applied. (The second problem, which seems to me to be the
most serious, should be easily fixable by checking launch times and
--
=== The PostgreSQL Company: Command Prompt, Inc. ===
Sales/Support: +1.503.667.4564 || 24x7/Emergency: +1.800.492.2240
Providing the most comprehensive PostgreSQL solutions since 1997
http://www.commandprompt.com/
Donate to the PostgreSQL Project: http://www.postgresql.org/about/donate
PostgreSQL Replication: http://www.commandprompt.com/products/
Alvaro Herrera <alvherre@commandprompt.com> wrote:
Here is the autovacuum patch I am currently working with. This is
basically the same as the previous patch; I have tweaked the database
list management so that after a change in databases (say a new database
is created or a database is dropped), the list is recomputed to account
for the change, keeping the ordering of the previous list.
I'm interested in your multiworkers autovacuum proposal.
I'm researching the impact of multiworkers with autovacuum_vacuum_cost_limit.
Autovacuum will consume server resources up to autovacuum_max_workers times
as many as before. I think we might need to change the semantics of
autovacuum_vacuum_cost_limit when we have multiworkers.
BTW, I found an unwitting mistake in the foreach_worker() macro.
These two operations are same in C.
- worker + 1
- (WorkerInfo *) (((char *) worker) + sizeof(WorkerInfo))
#define foreach_worker(_i, _worker) \
_worker = (WorkerInfo *) (AutoVacuumShmem + \
offsetof(AutoVacuumShmemStruct, av_workers)); \
for (_i = 0; _i < autovacuum_max_workers; _i++, _worker += sizeof(WorkerInfo))
should be:
#define foreach_worker(_worker) \
for ((_worker) = AutoVacuumShmem->av_workers; \
(_worker) < AutoVacuumShmem->av_workers + autovacuum_max_workers; \
(_worker)++)
Regards,
---
ITAGAKI Takahiro
NTT Open Source Software Center
ITAGAKI Takahiro wrote:
Alvaro Herrera <alvherre@commandprompt.com> wrote:
Here is the autovacuum patch I am currently working with. This is
basically the same as the previous patch; I have tweaked the database
list management so that after a change in databases (say a new database
is created or a database is dropped), the list is recomputed to account
for the change, keeping the ordering of the previous list.I'm interested in your multiworkers autovacuum proposal.
I'm researching the impact of multiworkers with autovacuum_vacuum_cost_limit.
Autovacuum will consume server resources up to autovacuum_max_workers times
as many as before. I think we might need to change the semantics of
autovacuum_vacuum_cost_limit when we have multiworkers.
Yes, that's correct. Per previous discussion, what I actually wanted to
do was to create a GUC setting to simplify the whole thing, something
like "autovacuum_max_mb_per_second" or "autovacuum_max_io_per_second".
Then, have each worker use up to (max_per_second/active workers) as much
IO resources. This way, the maximum use of IO resources by vacuum can
be easily determined and limited by the DBA; certainly much simpler than
the vacuum cost limiting feature.
BTW, I found an unwitting mistake in the foreach_worker() macro.
These two operations are same in C.
- worker + 1
- (WorkerInfo *) (((char *) worker) + sizeof(WorkerInfo))
Ah, thanks. I had originally coded the macro like you suggest, but then
during the development I needed to use the "i" variable as well, so I
added it. Apparently later I removed that usage; I see that there are
no such uses left in the current code. The "+ sizeof(WorkerInfo)" part
is just stupidity on my part, sorry about that.
--
Alvaro Herrera http://www.CommandPrompt.com/
PostgreSQL Replication, Consulting, Custom Development, 24x7 support
Alvaro Herrera wrote:
ITAGAKI Takahiro wrote:
Alvaro Herrera <alvherre@commandprompt.com> wrote:
Here is the autovacuum patch I am currently working with. This is
basically the same as the previous patch; I have tweaked the database
list management so that after a change in databases (say a new database
is created or a database is dropped), the list is recomputed to account
for the change, keeping the ordering of the previous list.I'm interested in your multiworkers autovacuum proposal.
I'm researching the impact of multiworkers with autovacuum_vacuum_cost_limit.
Autovacuum will consume server resources up to autovacuum_max_workers times
as many as before. I think we might need to change the semantics of
autovacuum_vacuum_cost_limit when we have multiworkers.Yes, that's correct. Per previous discussion, what I actually wanted to
do was to create a GUC setting to simplify the whole thing, something
like "autovacuum_max_mb_per_second" or "autovacuum_max_io_per_second".
Then, have each worker use up to (max_per_second/active workers) as much
IO resources. This way, the maximum use of IO resources by vacuum can
be easily determined and limited by the DBA; certainly much simpler than
the vacuum cost limiting feature.
+1
Joshua D. Drake
--
=== The PostgreSQL Company: Command Prompt, Inc. ===
Sales/Support: +1.503.667.4564 || 24x7/Emergency: +1.800.492.2240
Providing the most comprehensive PostgreSQL solutions since 1997
http://www.commandprompt.com/
Donate to the PostgreSQL Project: http://www.postgresql.org/about/donate
PostgreSQL Replication: http://www.commandprompt.com/products/
Joshua D. Drake wrote:
Alvaro Herrera wrote:
ITAGAKI Takahiro wrote:
Alvaro Herrera <alvherre@commandprompt.com> wrote:
Here is the autovacuum patch I am currently working with. This is
basically the same as the previous patch; I have tweaked the database
list management so that after a change in databases (say a new database
is created or a database is dropped), the list is recomputed to account
for the change, keeping the ordering of the previous list.I'm interested in your multiworkers autovacuum proposal.
I'm researching the impact of multiworkers with
autovacuum_vacuum_cost_limit.
Autovacuum will consume server resources up to autovacuum_max_workers
times
as many as before. I think we might need to change the semantics of
autovacuum_vacuum_cost_limit when we have multiworkers.Yes, that's correct. Per previous discussion, what I actually wanted to
do was to create a GUC setting to simplify the whole thing, something
like "autovacuum_max_mb_per_second" or "autovacuum_max_io_per_second".
Then, have each worker use up to (max_per_second/active workers) as much
IO resources. This way, the maximum use of IO resources by vacuum can
be easily determined and limited by the DBA; certainly much simpler than
the vacuum cost limiting feature.+1
One thing I forgot to mention is that this is unlikely to be implemented
in 8.3.
--
Alvaro Herrera http://www.CommandPrompt.com/
The PostgreSQL Company - Command Prompt, Inc.
Another problem seems to be that I'm not checking anywhere that a
regular connection (not autovac) is not using an autovac-reserved PGPROC
slot :-( I think I should tweak the logic that deals with
ReservedBackends but it doesn't look entirely trivial.
--
Alvaro Herrera http://www.CommandPrompt.com/
PostgreSQL Replication, Consulting, Custom Development, 24x7 support
Yes, that's correct. Per previous discussion, what I actually wanted to
do was to create a GUC setting to simplify the whole thing, something
like "autovacuum_max_mb_per_second" or "autovacuum_max_io_per_second".
Then, have each worker use up to (max_per_second/active workers) as much
IO resources.One thing I forgot to mention is that this is unlikely to be implemented
in 8.3.
This is a WIP cost balancing patch built on autovacuum-multiworkers-5.patch.
The total cost of workers are adjusted to autovacuum_vacuum_cost_delay.
I added copy of worker's cost parameters to the shared WorkerInfo array.
A launcher and each worker reads and writes the copied parameters when
a worker starts a vacuum job or exit the process. Workers assign their local
VacuumCostDelay from the shared value every sleep in vacuum_delay_point().
I agree that "mb_per_second" or "io_per_second" are easier to use than
present cost delay parameters, but we need more research to move to it.
I think it is better to keep "cost_limit" and "cost_delay" as of 8.3,
but we need cost-balanced multiworkers at any rate.
Regards,
---
ITAGAKI Takahiro
NTT Open Source Software Center
Attachments:
autovacuum_balance.patchapplication/octet-stream; name=autovacuum_balance.patchDownload
diff -cpr HEAD/src/backend/commands/vacuum.c autovacuum/src/backend/commands/vacuum.c
*** HEAD/src/backend/commands/vacuum.c Fri Mar 16 10:15:08 2007
--- autovacuum/src/backend/commands/vacuum.c Thu Mar 29 13:39:22 2007
*************** vacuum(VacuumStmt *vacstmt, List *relids
*** 384,390 ****
{
ListCell *cur;
! VacuumCostActive = (VacuumCostDelay > 0);
VacuumCostBalance = 0;
/*
--- 384,391 ----
{
ListCell *cur;
! VacuumCostActive = (VacuumCostDelay > 0 ||
! IsAutoVacuumWorkerProcess());
VacuumCostBalance = 0;
/*
*************** vacuum_delay_point(void)
*** 3503,3508 ****
--- 3504,3510 ----
pg_usleep(msec * 1000L);
VacuumCostBalance = 0;
+ AutoVacUpdateDelay();
/* Might have gotten an interrupt while sleeping */
CHECK_FOR_INTERRUPTS();
diff -cpr HEAD/src/backend/postmaster/autovacuum.c autovacuum/src/backend/postmaster/autovacuum.c
*** HEAD/src/backend/postmaster/autovacuum.c Mon Apr 9 19:34:55 2007
--- autovacuum/src/backend/postmaster/autovacuum.c Mon Apr 9 09:36:25 2007
*************** typedef struct
*** 141,146 ****
--- 141,149 ----
Oid wi_tableoid;
int wi_workerpid;
bool wi_finished;
+ int wi_cost_delay;
+ int wi_cost_limit;
+ int wi_cost_limit_base;
} WorkerInfo;
typedef struct
*************** typedef struct
*** 151,160 ****
} AutoVacuumShmemStruct;
/* Macro to iterate over all workers. Beware multiple evaluation of args! */
! #define foreach_worker(_i, _worker) \
! _worker = (WorkerInfo *) (AutoVacuumShmem + \
! offsetof(AutoVacuumShmemStruct, av_workers)); \
! for (_i = 0; _i < autovacuum_max_workers; _i++, _worker += sizeof(WorkerInfo))
static AutoVacuumShmemStruct *AutoVacuumShmem;
--- 154,163 ----
} AutoVacuumShmemStruct;
/* Macro to iterate over all workers. Beware multiple evaluation of args! */
! #define foreach_worker(_worker) \
! for ((_worker) = AutoVacuumShmem->av_workers; \
! (_worker) < AutoVacuumShmem->av_workers + autovacuum_max_workers; \
! (_worker)++)
static AutoVacuumShmemStruct *AutoVacuumShmem;
*************** static int free_workers;
*** 163,168 ****
--- 166,172 ----
/* the database list in the launcher, and the context that contains it */
static Dllist *DatabaseList = NULL;
static MemoryContext DatabaseListCxt = NULL;
+ static WorkerInfo *MyAutoVacWorker;
#ifdef EXEC_BACKEND
static pid_t avlauncher_forkexec(void);
*************** static int db_comparator(const void *a,
*** 180,185 ****
--- 184,190 ----
static void do_autovacuum(WorkerInfo *worker);
static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
+ static void autovac_balance_cost(void);
static void relation_check_autovac(Oid relid, Form_pg_class classForm,
Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
*************** AutoVacLauncherMain(int argc, char *argv
*** 492,510 ****
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
}
/* a worker started up or finished */
if (got_SIGUSR1)
{
WorkerInfo *worker;
! int i;
got_SIGUSR1 = false;
/* Walk the workers and clean up finished entries. */
! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! foreach_worker(i, worker)
{
if (worker->wi_finished)
{
--- 497,519 ----
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
+
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ autovac_balance_cost();
+ LWLockRelease(AutovacuumLock);
}
/* a worker started up or finished */
if (got_SIGUSR1)
{
WorkerInfo *worker;
! bool rebalance = false;
got_SIGUSR1 = false;
/* Walk the workers and clean up finished entries. */
! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! foreach_worker(worker)
{
if (worker->wi_finished)
{
*************** AutoVacLauncherMain(int argc, char *argv
*** 513,520 ****
--- 522,532 ----
worker->wi_workerpid = 0;
worker->wi_finished = false;
free_workers++;
+ rebalance = true;
}
}
+ if (rebalance)
+ autovac_balance_cost();
LWLockRelease(AutovacuumLock);
}
*************** do_start_worker(void)
*** 840,846 ****
{
List *dblist;
WorkerInfo *worker;
- int i;
ListCell *cell;
TransactionId xidForceLimit;
bool for_xid_wrap;
--- 852,857 ----
*************** do_start_worker(void)
*** 858,864 ****
* use that pointer while we weren't looking.
*/
LWLockAcquire(AutovacuumLock, LW_SHARED);
! foreach_worker(i, worker)
{
/* Invalid database OID means unused worker entry; use it */
if (!OidIsValid(worker->wi_dboid))
--- 869,875 ----
* use that pointer while we weren't looking.
*/
LWLockAcquire(AutovacuumLock, LW_SHARED);
! foreach_worker(worker)
{
/* Invalid database OID means unused worker entry; use it */
if (!OidIsValid(worker->wi_dboid))
*************** do_start_worker(void)
*** 867,873 ****
LWLockRelease(AutovacuumLock);
/* they're all used up */
! if (i >= autovacuum_max_workers)
return InvalidOid;
/* Get a list of databases */
--- 878,884 ----
LWLockRelease(AutovacuumLock);
/* they're all used up */
! if (worker >= AutoVacuumShmem->av_workers + autovacuum_max_workers)
return InvalidOid;
/* Get a list of databases */
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 1198,1204 ****
sigjmp_buf local_sigjmp_buf;
Oid dbid = InvalidOid;
WorkerInfo *worker;
- int i;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
--- 1209,1214 ----
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 1307,1313 ****
* on it, thus marking it used.
*/
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! foreach_worker(i, worker)
{
if (worker->wi_workerpid == 0)
{
--- 1317,1323 ----
* on it, thus marking it used.
*/
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! foreach_worker(worker)
{
if (worker->wi_workerpid == 0)
{
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 1355,1360 ****
--- 1365,1371 ----
/* And do an appropriate amount of work */
recentXid = ReadNewTransactionId();
+ MyAutoVacWorker = worker;
do_autovacuum(worker);
}
*************** do_autovacuum(WorkerInfo *worker)
*** 1600,1606 ****
autovac_table *tab;
char *relname;
WorkerInfo *other_worker;
- int i;
bool skipit;
CHECK_FOR_INTERRUPTS();
--- 1611,1616 ----
*************** do_autovacuum(WorkerInfo *worker)
*** 1616,1622 ****
* worker.
*/
skipit = false;
! foreach_worker(i, other_worker)
{
/*
* ignore not-yet-registered or not running workers, and workers in
--- 1626,1632 ----
* worker.
*/
skipit = false;
! foreach_worker(other_worker)
{
/*
* ignore not-yet-registered or not running workers, and workers in
*************** do_autovacuum(WorkerInfo *worker)
*** 1660,1667 ****
LWLockRelease(AutovacuumScheduleLock);
/* Set the vacuum cost parameters for this table */
! VacuumCostDelay = tab->at_vacuum_cost_delay;
! VacuumCostLimit = tab->at_vacuum_cost_limit;
relname = get_rel_name(relid);
elog(DEBUG2, "autovac: will%s%s %s",
--- 1670,1678 ----
LWLockRelease(AutovacuumScheduleLock);
/* Set the vacuum cost parameters for this table */
! VacuumCostDelay = worker->wi_cost_delay = tab->at_vacuum_cost_delay;
! VacuumCostLimit = worker->wi_cost_limit_base =
! worker->wi_cost_limit = tab->at_vacuum_cost_limit;
relname = get_rel_name(relid);
elog(DEBUG2, "autovac: will%s%s %s",
*************** do_autovacuum(WorkerInfo *worker)
*** 1669,1674 ****
--- 1680,1689 ----
(tab->at_doanalyze ? " ANALYZE" : ""),
relname);
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ autovac_balance_cost();
+ LWLockRelease(AutovacuumLock);
+
autovacuum_do_vac_analyze(tab->at_relid,
tab->at_dovacuum,
tab->at_doanalyze,
*************** AutoVacuumShmemInit(void)
*** 2258,2260 ****
--- 2273,2338 ----
MemSet(AutoVacuumShmem, 0, AutoVacuumShmemSize());
}
+
+ /*
+ * AutoVacUpdateDelay
+ */
+ void
+ AutoVacUpdateDelay(void)
+ {
+ if (MyAutoVacWorker)
+ {
+ VacuumCostDelay = MyAutoVacWorker->wi_cost_delay;
+ VacuumCostLimit = MyAutoVacWorker->wi_cost_limit;
+ }
+ }
+
+ /*
+ * autovac_balance_cost
+ * recaculate the cost limit setting for each active workers.
+ */
+ static void
+ autovac_balance_cost(void)
+ {
+ WorkerInfo *worker;
+ int vac_cost_limit = (autovacuum_vac_cost_limit >= 0 ?
+ autovacuum_vac_cost_limit : VacuumCostLimit);
+ int vac_cost_delay = (autovacuum_vac_cost_delay >= 0 ?
+ autovacuum_vac_cost_delay : VacuumCostDelay);
+ double cost_total;
+ double cost_avail;
+
+ if (vac_cost_limit <= 0 || vac_cost_delay <= 0)
+ return;
+
+ /* caculate the total base cost limit of active workers.*/
+ cost_total = 0.0;
+ foreach_worker(worker)
+ {
+ if (worker->wi_workerpid != 0 &&
+ worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0)
+ cost_total +=
+ (double) worker->wi_cost_limit_base / worker->wi_cost_delay;
+ }
+ if (cost_total <= 0)
+ return;
+
+ /*
+ * Adjust each cost limit of active workers to balance the total of
+ * cost limit to autovacuum_vacuum_cost_limit.
+ */
+ cost_avail = (double) vac_cost_limit / vac_cost_delay;
+ foreach_worker(worker)
+ {
+ if (worker->wi_workerpid != 0 &&
+ worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0)
+ {
+ int limit = (int)
+ (cost_avail * worker->wi_cost_limit_base / cost_total);
+ worker->wi_cost_limit = Min(limit, worker->wi_cost_limit_base);
+ elog(DEBUG1, "autovac_balance_cost(pid=%u db=%u, rel=%u, cost_limit=%d, cost_delay=%d)",
+ worker->wi_workerpid, worker->wi_dboid,
+ worker->wi_tableoid, worker->wi_cost_limit, worker->wi_cost_delay);
+ }
+ }
+ }
diff -cpr HEAD/src/backend/utils/misc/postgresql.conf.sample autovacuum/src/backend/utils/misc/postgresql.conf.sample
*** HEAD/src/backend/utils/misc/postgresql.conf.sample Thu Mar 22 09:42:41 2007
--- autovacuum/src/backend/utils/misc/postgresql.conf.sample Thu Apr 5 18:13:32 2007
***************
*** 376,381 ****
--- 376,382 ----
#autovacuum = on # enable autovacuum subprocess?
# 'on' requires stats_start_collector
# and stats_row_level to also be on
+ #autovacuum_max_workers = 10 # max # of autovacuum subprocess
#autovacuum_naptime = 1min # time between autovacuum runs
#autovacuum_vacuum_threshold = 500 # min # of tuple updates before
# vacuum
diff -cpr HEAD/src/include/postmaster/autovacuum.h autovacuum/src/include/postmaster/autovacuum.h
*** HEAD/src/include/postmaster/autovacuum.h Mon Apr 9 19:34:56 2007
--- autovacuum/src/include/postmaster/autovacuum.h Thu Apr 5 18:13:32 2007
*************** extern int autovacuum_vac_cost_limit;
*** 30,35 ****
--- 30,36 ----
extern bool AutoVacuumingActive(void);
extern bool IsAutoVacuumLauncherProcess(void);
extern bool IsAutoVacuumWorkerProcess(void);
+ extern void AutoVacUpdateDelay(void);
/* Functions to start autovacuum process, called from postmaster */
extern void autovac_init(void);
ITAGAKI Takahiro wrote:
Yes, that's correct. Per previous discussion, what I actually wanted to
do was to create a GUC setting to simplify the whole thing, something
like "autovacuum_max_mb_per_second" or "autovacuum_max_io_per_second".
Then, have each worker use up to (max_per_second/active workers) as much
IO resources.One thing I forgot to mention is that this is unlikely to be implemented
in 8.3.This is a WIP cost balancing patch built on autovacuum-multiworkers-5.patch.
The total cost of workers are adjusted to autovacuum_vacuum_cost_delay.I added copy of worker's cost parameters to the shared WorkerInfo array.
A launcher and each worker reads and writes the copied parameters when
a worker starts a vacuum job or exit the process. Workers assign their local
VacuumCostDelay from the shared value every sleep in vacuum_delay_point().
Thanks! I had already incorporated the foreach_worker changes into my
code, and later realized that there's an important bug regarding the
PGPROC of the workers, so I've reworked the patch, which meant that the
foreach_worker() macro went away completely.
I'll put it your changes in my current WIP patch; if you do any further
work on it, please let me have it to include it in the latest work.
--
Alvaro Herrera http://www.CommandPrompt.com/
The PostgreSQL Company - Command Prompt, Inc.
Alvaro Herrera wrote:
Thanks! I had already incorporated the foreach_worker changes into my
code, and later realized that there's an important bug regarding the
PGPROC of the workers, so I've reworked the patch, which meant that the
foreach_worker() macro went away completely.
FWIW, the problem was that it is difficult to keep the "max_connections"
control and still allow extra connections for autovacuum so that it
doesn't hinder regular operation. The first thing I tried was enlarging
the PGPROC array, but the problem with that is that the max_connection
tests get unwieldy (it would have to cycle through all used PGPROCs and
count the autovacuum ones).
So I'm now leaning towards having autovacuum keep their PGPROCs
separately, similarly to what the 2-phase code does, the main difference
being that 2PC doesn't have semaphores, while these ones will because
they need to acquire locks.
This needs a bit of rejigger in InitProcess() so that it acquires a
PGPROC from ProcGlobal if a regular backend, or from autovac's array
otherwise. This has not been very invasive.
If there's an objection to this, and/or better ideas, please speak
quickly!
--
Alvaro Herrera http://www.CommandPrompt.com/
The PostgreSQL Company - Command Prompt, Inc.
ITAGAKI Takahiro wrote:
Yes, that's correct. Per previous discussion, what I actually wanted to
do was to create a GUC setting to simplify the whole thing, something
like "autovacuum_max_mb_per_second" or "autovacuum_max_io_per_second".
Then, have each worker use up to (max_per_second/active workers) as much
IO resources.One thing I forgot to mention is that this is unlikely to be implemented
in 8.3.This is a WIP cost balancing patch built on autovacuum-multiworkers-5.patch.
The total cost of workers are adjusted to autovacuum_vacuum_cost_delay.
I manually merged your patch on top of my own. This is the result.
Please have a look at whether the new code is correct and behaves sanely
(I haven't tested it).
--
Alvaro Herrera http://www.CommandPrompt.com/
PostgreSQL Replication, Consulting, Custom Development, 24x7 support
Attachments:
autovacuum-multiworkers-7.patchtext/x-diff; charset=us-asciiDownload
Index: src/backend/commands/vacuum.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/vacuum.c,v
retrieving revision 1.349
diff -c -p -r1.349 vacuum.c
*** src/backend/commands/vacuum.c 14 Mar 2007 18:48:55 -0000 1.349
--- src/backend/commands/vacuum.c 11 Apr 2007 23:43:23 -0000
*************** vacuum_delay_point(void)
*** 3504,3509 ****
--- 3504,3512 ----
VacuumCostBalance = 0;
+ /* update balance values for workers */
+ AutoVacuumUpdateDelay();
+
/* Might have gotten an interrupt while sleeping */
CHECK_FOR_INTERRUPTS();
}
Index: src/backend/postmaster/autovacuum.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/postmaster/autovacuum.c,v
retrieving revision 1.40
diff -c -p -r1.40 autovacuum.c
*** src/backend/postmaster/autovacuum.c 28 Mar 2007 22:17:12 -0000 1.40
--- src/backend/postmaster/autovacuum.c 11 Apr 2007 23:43:31 -0000
***************
*** 43,48 ****
--- 43,49 ----
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/sinval.h"
+ #include "storage/spin.h"
#include "tcop/tcopprot.h"
#include "utils/flatfiles.h"
#include "utils/fmgroids.h"
***************
*** 52,57 ****
--- 53,59 ----
#include "utils/syscache.h"
+ static volatile sig_atomic_t got_SIGUSR1 = false;
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t avlauncher_shutdown_request = false;
*************** static volatile sig_atomic_t avlauncher_
*** 59,64 ****
--- 61,67 ----
* GUC parameters
*/
bool autovacuum_start_daemon = false;
+ int autovacuum_max_workers;
int autovacuum_naptime;
int autovacuum_vac_thresh;
double autovacuum_vac_scale;
*************** int autovacuum_freeze_max_age;
*** 69,75 ****
int autovacuum_vac_cost_delay;
int autovacuum_vac_cost_limit;
! /* Flag to tell if we are in the autovacuum daemon process */
static bool am_autovacuum_launcher = false;
static bool am_autovacuum_worker = false;
--- 72,78 ----
int autovacuum_vac_cost_delay;
int autovacuum_vac_cost_limit;
! /* Flags to tell if we are in an autovacuum process */
static bool am_autovacuum_launcher = false;
static bool am_autovacuum_worker = false;
*************** static int default_freeze_min_age;
*** 82,95 ****
/* Memory context for long-lived data */
static MemoryContext AutovacMemCxt;
! /* struct to keep list of candidate databases for vacuum */
! typedef struct autovac_dbase
{
! Oid ad_datid;
! char *ad_name;
! TransactionId ad_frozenxid;
! PgStat_StatDBEntry *ad_entry;
! } autovac_dbase;
/* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
typedef struct av_relation
--- 85,106 ----
/* Memory context for long-lived data */
static MemoryContext AutovacMemCxt;
! /* struct to keep track of databases in launcher */
! typedef struct avl_dbase
{
! Oid adl_datid; /* hash key -- must be first */
! TimestampTz adl_next_worker;
! int adl_score;
! } avl_dbase;
!
! /* struct to keep track of databases in worker */
! typedef struct avw_dbase
! {
! Oid adw_datid;
! char *adw_name;
! TransactionId adw_frozenxid;
! PgStat_StatDBEntry *adw_entry;
! } avw_dbase;
/* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
typedef struct av_relation
*************** typedef struct autovac_table
*** 110,123 ****
int at_vacuum_cost_limit;
} autovac_table;
typedef struct
{
! Oid process_db; /* OID of database to process */
! int worker_pid; /* PID of the worker process, if any */
} AutoVacuumShmemStruct;
static AutoVacuumShmemStruct *AutoVacuumShmem;
#ifdef EXEC_BACKEND
static pid_t avlauncher_forkexec(void);
static pid_t avworker_forkexec(void);
--- 121,195 ----
int at_vacuum_cost_limit;
} autovac_table;
+ /*-------------
+ * This struct holds information about a single worker's whereabouts. We keep
+ * an array of these in shared memory, sized according to
+ * autovacuum_max_workers.
+ *
+ * wi_links entry into free list or running list
+ * wi_dboid OID of the database this worker is supposed to work on
+ * wi_tableoid OID of the table currently being vacuumed
+ * wi_workerpid PID of the running worker, 0 if not yet started
+ * wi_launchtime Time this worker was launched
+ *
+ * All fields are protected by AutovacuumLock, except for wi_tableoid which is
+ * protected by AutovacuumScheduleLock (which is read-only for everyone except
+ * that worker itself).
+ *-------------
+ */
+ typedef struct WorkerInfoData
+ {
+ SHM_QUEUE wi_links;
+ Oid wi_dboid;
+ Oid wi_tableoid;
+ int wi_workerpid;
+ TimestampTz wi_launchtime;
+ int wi_cost_delay;
+ int wi_cost_limit;
+ int wi_cost_limit_base;
+ } WorkerInfoData;
+
+ typedef struct WorkerInfoData *WorkerInfo;
+
+ /* the spinlock protecting the PGPROC array */
+ NON_EXEC_STATIC slock_t *AutovacProcLock = NULL;
+
+ /*-------------
+ * The main autovacuum shmem struct. On shared memory we store: 1) this main
+ * struct; 2) the array of WorkerInfo structs; 3) the array of PGPROCs.
+ *
+ * av_launcherpid the PID of the autovacuum launcher
+ * av_freeProcs the PGPROC freelist
+ * av_freeWorkers the WorkerInfo freelist
+ * av_runningWorkers the WorkerInfo non-free queue
+ * av_startingWorker pointer to WorkerInfo currently being started (cleared by
+ * the worker itself as soon as it's up and running)
+ * av_rebalance true when a worker determines that cost limits must be
+ * rebalanced
+ *
+ * This struct is protected by AutovacuumLock, except for the PGPROC list which
+ * is protected by the AutovacProcLock spinlock.
+ *-------------
+ */
typedef struct
{
! pid_t av_launcherpid;
! SHMEM_OFFSET av_freeProcs;
! SHMEM_OFFSET av_freeWorkers;
! SHM_QUEUE av_runningWorkers;
! SHMEM_OFFSET av_startingWorker;
! bool av_rebalance;
} AutoVacuumShmemStruct;
static AutoVacuumShmemStruct *AutoVacuumShmem;
+ /* the database list in the launcher, and the context that contains it */
+ static Dllist *DatabaseList = NULL;
+ static MemoryContext DatabaseListCxt = NULL;
+
+ /* Pointer to my own WorkerInfo, valid on each worker */
+ static WorkerInfo MyWorkerInfo = NULL;
+
#ifdef EXEC_BACKEND
static pid_t avlauncher_forkexec(void);
static pid_t avworker_forkexec(void);
*************** static pid_t avworker_forkexec(void);
*** 125,133 ****
NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
! static void do_start_worker(void);
static void do_autovacuum(void);
! static List *autovac_get_database_list(void);
static void relation_check_autovac(Oid relid, Form_pg_class classForm,
Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
--- 197,212 ----
NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
! static Oid do_start_worker(void);
! static uint64 launcher_determine_sleep(bool canlaunch);
! static void launch_worker(TimestampTz now);
! static List *get_database_list(void);
! static void rebuild_database_list(Oid newdb);
! static int db_comparator(const void *a, const void *b);
! static void autovac_balance_cost(void);
!
static void do_autovacuum(void);
! static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
static void relation_check_autovac(Oid relid, Form_pg_class classForm,
Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
*************** static void relation_needs_vacanalyze(Oi
*** 141,152 ****
static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
bool doanalyze, int freeze_min_age);
- static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
PgStat_StatDBEntry *shared,
PgStat_StatDBEntry *dbentry);
static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
static void avl_sighup_handler(SIGNAL_ARGS);
static void avlauncher_shutdown(SIGNAL_ARGS);
static void avl_quickdie(SIGNAL_ARGS);
--- 220,231 ----
static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
bool doanalyze, int freeze_min_age);
static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
PgStat_StatDBEntry *shared,
PgStat_StatDBEntry *dbentry);
static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
static void avl_sighup_handler(SIGNAL_ARGS);
+ static void avl_sigusr1_handler(SIGNAL_ARGS);
static void avlauncher_shutdown(SIGNAL_ARGS);
static void avl_quickdie(SIGNAL_ARGS);
*************** StartAutoVacLauncher(void)
*** 230,241 ****
/*
* Main loop for the autovacuum launcher process.
*/
NON_EXEC_STATIC void
AutoVacLauncherMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
- MemoryContext avlauncher_cxt;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
--- 309,340 ----
/*
* Main loop for the autovacuum launcher process.
+ *
+ * The signalling between launcher and worker is as follows:
+ *
+ * When the worker has finished starting up, it stores its PID in wi_workerpid
+ * and sends a SIGUSR1 signal to the launcher. The launcher then knows that
+ * the postmaster is ready to start a new worker. We do it this way because
+ * otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't
+ * yet processed the last one, in which case the second signal would be lost.
+ * This is only useful when two workers need to be started close to one
+ * another, which should be rare but it's possible.
+ *
+ * When the worker exits, its ProcKill routine (actually AutovacWorkerProcKill)
+ * is in charge of resetting the WorkerInfo entry and signalling the launcher.
+ * The launcher then wakes up and can launch a new worker if need be, or just
+ * go back to sleep.
+ *
+ * There is a potential problem if, for some reason, a worker starts and is not
+ * able to bootstrap itself correctly. To prevent this situation from starving
+ * the whole system, the launcher checks the launch time of the "starting
+ * worker". If it's too old (older than autovacuum_naptime seconds), it resets
+ * the worker entry and puts it back into the free list.
*/
NON_EXEC_STATIC void
AutoVacLauncherMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
*************** AutoVacLauncherMain(int argc, char *argv
*** 264,272 ****
* Set up signal handlers. Since this is an auxiliary process, it has
* particular signal requirements -- no deadlock checker or sinval
* catchup, for example.
- *
- * XXX It may be a good idea to receive signals when an avworker process
- * finishes.
*/
pqsignal(SIGHUP, avl_sighup_handler);
--- 363,368 ----
*************** AutoVacLauncherMain(int argc, char *argv
*** 276,282 ****
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
! pqsignal(SIGUSR1, SIG_IGN);
/* We don't listen for async notifies */
pqsignal(SIGUSR2, SIG_IGN);
pqsignal(SIGFPE, FloatExceptionHandler);
--- 372,378 ----
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
! pqsignal(SIGUSR1, avl_sigusr1_handler);
/* We don't listen for async notifies */
pqsignal(SIGUSR2, SIG_IGN);
pqsignal(SIGFPE, FloatExceptionHandler);
*************** AutoVacLauncherMain(int argc, char *argv
*** 300,311 ****
* that we can reset the context during error recovery and thereby avoid
* possible memory leaks.
*/
! avlauncher_cxt = AllocSetContextCreate(TopMemoryContext,
! "Autovacuum Launcher",
! ALLOCSET_DEFAULT_MINSIZE,
! ALLOCSET_DEFAULT_INITSIZE,
! ALLOCSET_DEFAULT_MAXSIZE);
! MemoryContextSwitchTo(avlauncher_cxt);
/*
--- 396,407 ----
* that we can reset the context during error recovery and thereby avoid
* possible memory leaks.
*/
! AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! "Autovacuum Launcher",
! ALLOCSET_DEFAULT_MINSIZE,
! ALLOCSET_DEFAULT_INITSIZE,
! ALLOCSET_DEFAULT_MAXSIZE);
! MemoryContextSwitchTo(AutovacMemCxt);
/*
*************** AutoVacLauncherMain(int argc, char *argv
*** 336,346 ****
* Now return to normal top-level context and clear ErrorContext for
* next time.
*/
! MemoryContextSwitchTo(avlauncher_cxt);
FlushErrorState();
/* Flush any leaked data in the top-level context */
! MemoryContextResetAndDeleteChildren(avlauncher_cxt);
/* Make sure pgstat also considers our stat data as gone */
pgstat_clear_snapshot();
--- 432,442 ----
* Now return to normal top-level context and clear ErrorContext for
* next time.
*/
! MemoryContextSwitchTo(AutovacMemCxt);
FlushErrorState();
/* Flush any leaked data in the top-level context */
! MemoryContextResetAndDeleteChildren(AutovacMemCxt);
/* Make sure pgstat also considers our stat data as gone */
pgstat_clear_snapshot();
*************** AutoVacLauncherMain(int argc, char *argv
*** 361,378 ****
ereport(LOG,
(errmsg("autovacuum launcher started")));
PG_SETMASK(&UnBlockSig);
/*
! * take a nap before executing the first iteration, unless we were
! * requested an emergency run.
*/
! if (autovacuum_start_daemon)
! pg_usleep(autovacuum_naptime * 1000000L);
for (;;)
{
! int worker_pid;
/*
* Emergency bailout if postmaster has died. This is to avoid the
--- 457,488 ----
ereport(LOG,
(errmsg("autovacuum launcher started")));
+ /* must unblock signals before calling rebuild_database_list */
PG_SETMASK(&UnBlockSig);
+ /* in emergency mode, just start a worker and go away */
+ if (!autovacuum_start_daemon)
+ {
+ do_start_worker();
+ proc_exit(0); /* done */
+ }
+
+ AutoVacuumShmem->av_launcherpid = MyProcPid;
+
/*
! * Create the initial database list. The invariant we want this list to
! * keep is that it's ordered by decreasing next_time. As soon as an entry is updated to
! * a higher time, it will be moved to the front (which is correct because
! * the only operation is to add autovacuum_naptime to the entry, and time
! * always increases).
*/
! rebuild_database_list(InvalidOid);
for (;;)
{
! uint64 micros;
! bool can_launch;
! TimestampTz current_time = 0;
/*
* Emergency bailout if postmaster has died. This is to avoid the
*************** AutoVacLauncherMain(int argc, char *argv
*** 381,386 ****
--- 491,503 ----
if (!PostmasterIsAlive(true))
exit(1);
+ micros = launcher_determine_sleep(AutoVacuumShmem->av_freeWorkers !=
+ INVALID_OFFSET);
+
+ /* Sleep for a while according to schedule */
+ pg_usleep(micros);
+
+ /* the normal shutdown case */
if (avlauncher_shutdown_request)
break;
*************** AutoVacLauncherMain(int argc, char *argv
*** 388,469 ****
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
}
/*
! * if there's a worker already running, sleep until it
! * disappears.
*/
LWLockAcquire(AutovacuumLock, LW_SHARED);
- worker_pid = AutoVacuumShmem->worker_pid;
- LWLockRelease(AutovacuumLock);
! if (worker_pid != 0)
{
! PGPROC *proc = BackendPidGetProc(worker_pid);
! if (proc != NULL && proc->isAutovacuum)
! goto sleep;
! else
{
/*
! * if the worker is not really running (or it's a process
! * that's not an autovacuum worker), remove the PID from shmem.
! * This should not happen, because either the worker exits
! * cleanly, in which case it'll remove the PID, or it dies, in
! * which case postmaster will cause a system reset cycle.
*/
! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! worker_pid = 0;
! LWLockRelease(AutovacuumLock);
}
}
! do_start_worker();
! sleep:
! /*
! * in emergency mode, exit immediately so that the postmaster can
! * request another run right away if needed.
! *
! * XXX -- maybe it would be better to handle this inside the launcher
! * itself.
! */
! if (!autovacuum_start_daemon)
! break;
! /* have pgstat read the file again next time */
! pgstat_clear_snapshot();
! /* now sleep until the next autovac iteration */
! pg_usleep(autovacuum_naptime * 1000000L);
}
/* Normal exit from the autovac launcher is here */
ereport(LOG,
(errmsg("autovacuum launcher shutting down")));
proc_exit(0); /* done */
}
/*
* do_start_worker
*
* Bare-bones procedure for starting an autovacuum worker from the launcher.
* It determines what database to work on, sets up shared memory stuff and
! * signals postmaster to start the worker.
*/
! static void
do_start_worker(void)
{
List *dblist;
! bool for_xid_wrap;
! autovac_dbase *db;
! ListCell *cell;
TransactionId xidForceLimit;
/* Get a list of databases */
! dblist = autovac_get_database_list();
/*
* Determine the oldest datfrozenxid/relfrozenxid that we will allow
--- 505,927 ----
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
+
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ autovac_balance_cost();
+ LWLockRelease(AutovacuumLock);
+ }
+
+ /* a worker started up or finished */
+ if (got_SIGUSR1)
+ {
+ got_SIGUSR1 = false;
+
+ /* rebalance cost limits, if needed */
+ if (AutoVacuumShmem->av_rebalance)
+ {
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ autovac_balance_cost();
+ LWLockRelease(AutovacuumLock);
+ }
}
/*
! * There are some conditions that we need to check before trying to
! * start a launcher. First, we need to make sure that there is a
! * launcher slot available. Second, we need to make sure that no other
! * worker is still starting up.
*/
+
LWLockAcquire(AutovacuumLock, LW_SHARED);
! can_launch = (AutoVacuumShmem->av_freeWorkers != INVALID_OFFSET);
!
! if (can_launch && AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
{
! long secs;
! int usecs;
! WorkerInfo worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
!
! if (current_time == 0)
! current_time = GetCurrentTimestamp();
!
! /*
! * We can't launch another worker when another one is still
! * starting up, so just sleep for a bit more; that worker will wake
! * us up again as soon as it's ready. We will only wait
! * autovacuum_naptime seconds for this to happen however. Note
! * that failure to connect to a particular database is not a
! * problem here, because the worker removes itself from the
! * startingWorker pointer before trying to connect; only low-level
! * problems, like fork() failure, can get us here.
! */
! TimestampDifference(worker->wi_launchtime, current_time,
! &secs, &usecs);
! /* ignore microseconds, as they cannot make any difference */
! if (secs > autovacuum_naptime)
{
+ LWLockRelease(AutovacuumLock);
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
/*
! * No other process can put a worker in starting mode, so if
! * startingWorker is still INVALID after exchanging our lock,
! * we assume it's the same one we saw above (so we don't
! * recheck the launch time).
*/
! if (AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
! {
! worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
! worker->wi_dboid = InvalidOid;
! worker->wi_tableoid = InvalidOid;
! worker->wi_workerpid = 0;
! worker->wi_launchtime = 0;
! worker->wi_links.next = AutoVacuumShmem->av_freeWorkers;
! AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(worker);
! AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
! }
}
+ else
+ can_launch = false;
}
+ LWLockRelease(AutovacuumLock); /* either shared or exclusive */
! if (can_launch)
! {
! Dlelem *elem;
! elem = DLGetTail(DatabaseList);
! if (current_time == 0)
! current_time = GetCurrentTimestamp();
! if (elem != NULL)
! {
! avl_dbase *avdb = DLE_VAL(elem);
! long secs;
! int usecs;
!
! TimestampDifference(current_time, avdb->adl_next_worker, &secs, &usecs);
!
! /* do we have to start a worker? */
! if (secs <= 0 && usecs <= 0)
! launch_worker(current_time);
! }
! else
! {
! /*
! * Special case when the list is empty: start a worker right
! * away. This covers the initial case, when no database is in
! * pgstats (thus the list is empty).
! */
! launch_worker(current_time);
! }
! }
}
/* Normal exit from the autovac launcher is here */
ereport(LOG,
(errmsg("autovacuum launcher shutting down")));
+ AutoVacuumShmem->av_launcherpid = 0;
proc_exit(0); /* done */
}
+
+ /*
+ * Determine the time to sleep, in microseconds, based on the database list.
+ *
+ * The "canlaunch" parameter indicates whether we can start a worker right now,
+ * for example due to the workers being all busy.
+ */
+ static uint64
+ launcher_determine_sleep(bool canlaunch)
+ {
+ long secs;
+ int usecs;
+ Dlelem *elem;
+
+ /*
+ * We sleep until the next scheduled vacuum. We trust that when the
+ * database list was built, care was taken so that no entries have times in
+ * the past; if the first entry has too close a next_worker value, or a
+ * time in the past, we will sleep a small nominal time.
+ */
+ if (!canlaunch)
+ {
+ secs = autovacuum_naptime;
+ usecs = 0;
+ }
+ else if ((elem = DLGetTail(DatabaseList)) != NULL)
+ {
+ avl_dbase *avdb = DLE_VAL(elem);
+ TimestampTz current_time = GetCurrentTimestamp();
+ TimestampTz next_wakeup;
+
+ next_wakeup = avdb->adl_next_worker;
+ TimestampDifference(current_time, next_wakeup, &secs, &usecs);
+ }
+ else
+ {
+ /* list is empty, sleep for whole autovacuum_naptime seconds */
+ secs = autovacuum_naptime;
+ usecs = 0;
+ }
+
+ /* 100ms is the smallest time we'll allow the launcher to sleep */
+ if (secs <= 0L && usecs <= 100000)
+ {
+ secs = 0L;
+ usecs = 100000; /* 100 ms */
+ }
+
+ return secs * 1000000 + usecs;
+ }
+
+ /*
+ * Build an updated DatabaseList. It must only contain databases that appear
+ * in pgstats, and must be sorted by next_worker from highest to lowest,
+ * distributed regularly across the next autovacuum_naptime interval.
+ *
+ * Receives the Oid of the database that made this list be generated (we call
+ * this the "new" database, because when the database was already present on
+ * the list, we expect that this function is not called at all). The
+ * preexisting list, if any, will be used to preserve the order of the
+ * databases in the autovacuum_naptime period. The new database is put at the
+ * end of the interval. The actual values are not saved, which should not be
+ * much of a problem.
+ */
+ static void
+ rebuild_database_list(Oid newdb)
+ {
+ List *dblist;
+ ListCell *cell;
+ MemoryContext newcxt;
+ MemoryContext oldcxt;
+ MemoryContext tmpcxt;
+ HASHCTL hctl;
+ int score;
+ int nelems;
+ HTAB *dbhash;
+
+ /* use fresh stats */
+ pgstat_clear_snapshot();
+
+ newcxt = AllocSetContextCreate(AutovacMemCxt,
+ "AV dblist",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ tmpcxt = AllocSetContextCreate(newcxt,
+ "tmp AV dblist",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ oldcxt = MemoryContextSwitchTo(tmpcxt);
+
+ /*
+ * Implementing this is not as simple as it sounds, because we need to put
+ * the new database at the end of the list; next the databases that were
+ * already on the list, and finally (at the tail of the list) all the other
+ * databases that are not on the existing list.
+ *
+ * To do this, we build an empty hash table of scored databases. We will
+ * start with the lowest scorAlvaro Herrera <alvherre@commandprompt.com> wrote:
I manually merged your patch on top of my own. This is the result.
Please have a look at whether the new code is correct and behaves sanely
(I haven't tested it).
The patch seems to be broken -- the latter half is lost.
Regards,
---
ITAGAKI Takahiro
NTT Open Source Software Center
ITAGAKI Takahiro wrote:
Alvaro Herrera <alvherre@commandprompt.com> wrote:
I manually merged your patch on top of my own. This is the result.
Please have a look at whether the new code is correct and behaves sanely
(I haven't tested it).The patch seems to be broken -- the latter half is lost.
Huh, you are right, it is broken, even in my outgoing mailbox -- I don't
know what happened, as the file I have on disk is complete. Here is
attached again.
--
Alvaro Herrera http://www.CommandPrompt.com/
The PostgreSQL Company - Command Prompt, Inc.
Attachments:
autovacuum-multiworkers-7.patchtext/x-diff; charset=us-asciiDownload
Index: src/backend/commands/vacuum.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/vacuum.c,v
retrieving revision 1.349
diff -c -p -r1.349 vacuum.c
*** src/backend/commands/vacuum.c 14 Mar 2007 18:48:55 -0000 1.349
--- src/backend/commands/vacuum.c 11 Apr 2007 23:43:23 -0000
*************** vacuum_delay_point(void)
*** 3504,3509 ****
--- 3504,3512 ----
VacuumCostBalance = 0;
+ /* update balance values for workers */
+ AutoVacuumUpdateDelay();
+
/* Might have gotten an interrupt while sleeping */
CHECK_FOR_INTERRUPTS();
}
Index: src/backend/postmaster/autovacuum.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/postmaster/autovacuum.c,v
retrieving revision 1.40
diff -c -p -r1.40 autovacuum.c
*** src/backend/postmaster/autovacuum.c 28 Mar 2007 22:17:12 -0000 1.40
--- src/backend/postmaster/autovacuum.c 11 Apr 2007 23:43:31 -0000
***************
*** 43,48 ****
--- 43,49 ----
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/sinval.h"
+ #include "storage/spin.h"
#include "tcop/tcopprot.h"
#include "utils/flatfiles.h"
#include "utils/fmgroids.h"
***************
*** 52,57 ****
--- 53,59 ----
#include "utils/syscache.h"
+ static volatile sig_atomic_t got_SIGUSR1 = false;
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t avlauncher_shutdown_request = false;
*************** static volatile sig_atomic_t avlauncher_
*** 59,64 ****
--- 61,67 ----
* GUC parameters
*/
bool autovacuum_start_daemon = false;
+ int autovacuum_max_workers;
int autovacuum_naptime;
int autovacuum_vac_thresh;
double autovacuum_vac_scale;
*************** int autovacuum_freeze_max_age;
*** 69,75 ****
int autovacuum_vac_cost_delay;
int autovacuum_vac_cost_limit;
! /* Flag to tell if we are in the autovacuum daemon process */
static bool am_autovacuum_launcher = false;
static bool am_autovacuum_worker = false;
--- 72,78 ----
int autovacuum_vac_cost_delay;
int autovacuum_vac_cost_limit;
! /* Flags to tell if we are in an autovacuum process */
static bool am_autovacuum_launcher = false;
static bool am_autovacuum_worker = false;
*************** static int default_freeze_min_age;
*** 82,95 ****
/* Memory context for long-lived data */
static MemoryContext AutovacMemCxt;
! /* struct to keep list of candidate databases for vacuum */
! typedef struct autovac_dbase
{
! Oid ad_datid;
! char *ad_name;
! TransactionId ad_frozenxid;
! PgStat_StatDBEntry *ad_entry;
! } autovac_dbase;
/* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
typedef struct av_relation
--- 85,106 ----
/* Memory context for long-lived data */
static MemoryContext AutovacMemCxt;
! /* struct to keep track of databases in launcher */
! typedef struct avl_dbase
{
! Oid adl_datid; /* hash key -- must be first */
! TimestampTz adl_next_worker;
! int adl_score;
! } avl_dbase;
!
! /* struct to keep track of databases in worker */
! typedef struct avw_dbase
! {
! Oid adw_datid;
! char *adw_name;
! TransactionId adw_frozenxid;
! PgStat_StatDBEntry *adw_entry;
! } avw_dbase;
/* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
typedef struct av_relation
*************** typedef struct autovac_table
*** 110,123 ****
int at_vacuum_cost_limit;
} autovac_table;
typedef struct
{
! Oid process_db; /* OID of database to process */
! int worker_pid; /* PID of the worker process, if any */
} AutoVacuumShmemStruct;
static AutoVacuumShmemStruct *AutoVacuumShmem;
#ifdef EXEC_BACKEND
static pid_t avlauncher_forkexec(void);
static pid_t avworker_forkexec(void);
--- 121,195 ----
int at_vacuum_cost_limit;
} autovac_table;
+ /*-------------
+ * This struct holds information about a single worker's whereabouts. We keep
+ * an array of these in shared memory, sized according to
+ * autovacuum_max_workers.
+ *
+ * wi_links entry into free list or running list
+ * wi_dboid OID of the database this worker is supposed to work on
+ * wi_tableoid OID of the table currently being vacuumed
+ * wi_workerpid PID of the running worker, 0 if not yet started
+ * wi_launchtime Time this worker was launched
+ *
+ * All fields are protected by AutovacuumLock, except for wi_tableoid which is
+ * protected by AutovacuumScheduleLock (which is read-only for everyone except
+ * that worker itself).
+ *-------------
+ */
+ typedef struct WorkerInfoData
+ {
+ SHM_QUEUE wi_links;
+ Oid wi_dboid;
+ Oid wi_tableoid;
+ int wi_workerpid;
+ TimestampTz wi_launchtime;
+ int wi_cost_delay;
+ int wi_cost_limit;
+ int wi_cost_limit_base;
+ } WorkerInfoData;
+
+ typedef struct WorkerInfoData *WorkerInfo;
+
+ /* the spinlock protecting the PGPROC array */
+ NON_EXEC_STATIC slock_t *AutovacProcLock = NULL;
+
+ /*-------------
+ * The main autovacuum shmem struct. On shared memory we store: 1) this main
+ * struct; 2) the array of WorkerInfo structs; 3) the array of PGPROCs.
+ *
+ * av_launcherpid the PID of the autovacuum launcher
+ * av_freeProcs the PGPROC freelist
+ * av_freeWorkers the WorkerInfo freelist
+ * av_runningWorkers the WorkerInfo non-free queue
+ * av_startingWorker pointer to WorkerInfo currently being started (cleared by
+ * the worker itself as soon as it's up and running)
+ * av_rebalance true when a worker determines that cost limits must be
+ * rebalanced
+ *
+ * This struct is protected by AutovacuumLock, except for the PGPROC list which
+ * is protected by the AutovacProcLock spinlock.
+ *-------------
+ */
typedef struct
{
! pid_t av_launcherpid;
! SHMEM_OFFSET av_freeProcs;
! SHMEM_OFFSET av_freeWorkers;
! SHM_QUEUE av_runningWorkers;
! SHMEM_OFFSET av_startingWorker;
! bool av_rebalance;
} AutoVacuumShmemStruct;
static AutoVacuumShmemStruct *AutoVacuumShmem;
+ /* the database list in the launcher, and the context that contains it */
+ static Dllist *DatabaseList = NULL;
+ static MemoryContext DatabaseListCxt = NULL;
+
+ /* Pointer to my own WorkerInfo, valid on each worker */
+ static WorkerInfo MyWorkerInfo = NULL;
+
#ifdef EXEC_BACKEND
static pid_t avlauncher_forkexec(void);
static pid_t avworker_forkexec(void);
*************** static pid_t avworker_forkexec(void);
*** 125,133 ****
NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
! static void do_start_worker(void);
static void do_autovacuum(void);
! static List *autovac_get_database_list(void);
static void relation_check_autovac(Oid relid, Form_pg_class classForm,
Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
--- 197,212 ----
NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
! static Oid do_start_worker(void);
! static uint64 launcher_determine_sleep(bool canlaunch);
! static void launch_worker(TimestampTz now);
! static List *get_database_list(void);
! static void rebuild_database_list(Oid newdb);
! static int db_comparator(const void *a, const void *b);
! static void autovac_balance_cost(void);
!
static void do_autovacuum(void);
! static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
static void relation_check_autovac(Oid relid, Form_pg_class classForm,
Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
*************** static void relation_needs_vacanalyze(Oi
*** 141,152 ****
static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
bool doanalyze, int freeze_min_age);
- static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
PgStat_StatDBEntry *shared,
PgStat_StatDBEntry *dbentry);
static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
static void avl_sighup_handler(SIGNAL_ARGS);
static void avlauncher_shutdown(SIGNAL_ARGS);
static void avl_quickdie(SIGNAL_ARGS);
--- 220,231 ----
static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
bool doanalyze, int freeze_min_age);
static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
PgStat_StatDBEntry *shared,
PgStat_StatDBEntry *dbentry);
static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
static void avl_sighup_handler(SIGNAL_ARGS);
+ static void avl_sigusr1_handler(SIGNAL_ARGS);
static void avlauncher_shutdown(SIGNAL_ARGS);
static void avl_quickdie(SIGNAL_ARGS);
*************** StartAutoVacLauncher(void)
*** 230,241 ****
/*
* Main loop for the autovacuum launcher process.
*/
NON_EXEC_STATIC void
AutoVacLauncherMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
- MemoryContext avlauncher_cxt;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
--- 309,340 ----
/*
* Main loop for the autovacuum launcher process.
+ *
+ * The signalling between launcher and worker is as follows:
+ *
+ * When the worker has finished starting up, it stores its PID in wi_workerpid
+ * and sends a SIGUSR1 signal to the launcher. The launcher then knows that
+ * the postmaster is ready to start a new worker. We do it this way because
+ * otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't
+ * yet processed the last one, in which case the second signal would be lost.
+ * This is only useful when two workers need to be started close to one
+ * another, which should be rare but it's possible.
+ *
+ * When the worker exits, its ProcKill routine (actually AutovacWorkerProcKill)
+ * is in charge of resetting the WorkerInfo entry and signalling the launcher.
+ * The launcher then wakes up and can launch a new worker if need be, or just
+ * go back to sleep.
+ *
+ * There is a potential problem if, for some reason, a worker starts and is not
+ * able to bootstrap itself correctly. To prevent this situation from starving
+ * the whole system, the launcher checks the launch time of the "starting
+ * worker". If it's too old (older than autovacuum_naptime seconds), it resets
+ * the worker entry and puts it back into the free list.
*/
NON_EXEC_STATIC void
AutoVacLauncherMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
*************** AutoVacLauncherMain(int argc, char *argv
*** 264,272 ****
* Set up signal handlers. Since this is an auxiliary process, it has
* particular signal requirements -- no deadlock checker or sinval
* catchup, for example.
- *
- * XXX It may be a good idea to receive signals when an avworker process
- * finishes.
*/
pqsignal(SIGHUP, avl_sighup_handler);
--- 363,368 ----
*************** AutoVacLauncherMain(int argc, char *argv
*** 276,282 ****
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
! pqsignal(SIGUSR1, SIG_IGN);
/* We don't listen for async notifies */
pqsignal(SIGUSR2, SIG_IGN);
pqsignal(SIGFPE, FloatExceptionHandler);
--- 372,378 ----
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
! pqsignal(SIGUSR1, avl_sigusr1_handler);
/* We don't listen for async notifies */
pqsignal(SIGUSR2, SIG_IGN);
pqsignal(SIGFPE, FloatExceptionHandler);
*************** AutoVacLauncherMain(int argc, char *argv
*** 300,311 ****
* that we can reset the context during error recovery and thereby avoid
* possible memory leaks.
*/
! avlauncher_cxt = AllocSetContextCreate(TopMemoryContext,
! "Autovacuum Launcher",
! ALLOCSET_DEFAULT_MINSIZE,
! ALLOCSET_DEFAULT_INITSIZE,
! ALLOCSET_DEFAULT_MAXSIZE);
! MemoryContextSwitchTo(avlauncher_cxt);
/*
--- 396,407 ----
* that we can reset the context during error recovery and thereby avoid
* possible memory leaks.
*/
! AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! "Autovacuum Launcher",
! ALLOCSET_DEFAULT_MINSIZE,
! ALLOCSET_DEFAULT_INITSIZE,
! ALLOCSET_DEFAULT_MAXSIZE);
! MemoryContextSwitchTo(AutovacMemCxt);
/*
*************** AutoVacLauncherMain(int argc, char *argv
*** 336,346 ****
* Now return to normal top-level context and clear ErrorContext for
* next time.
*/
! MemoryContextSwitchTo(avlauncher_cxt);
FlushErrorState();
/* Flush any leaked data in the top-level context */
! MemoryContextResetAndDeleteChildren(avlauncher_cxt);
/* Make sure pgstat also considers our stat data as gone */
pgstat_clear_snapshot();
--- 432,442 ----
* Now return to normal top-level context and clear ErrorContext for
* next time.
*/
! MemoryContextSwitchTo(AutovacMemCxt);
FlushErrorState();
/* Flush any leaked data in the top-level context */
! MemoryContextResetAndDeleteChildren(AutovacMemCxt);
/* Make sure pgstat also considers our stat data as gone */
pgstat_clear_snapshot();
*************** AutoVacLauncherMain(int argc, char *argv
*** 361,378 ****
ereport(LOG,
(errmsg("autovacuum launcher started")));
PG_SETMASK(&UnBlockSig);
/*
! * take a nap before executing the first iteration, unless we were
! * requested an emergency run.
*/
! if (autovacuum_start_daemon)
! pg_usleep(autovacuum_naptime * 1000000L);
for (;;)
{
! int worker_pid;
/*
* Emergency bailout if postmaster has died. This is to avoid the
--- 457,488 ----
ereport(LOG,
(errmsg("autovacuum launcher started")));
+ /* must unblock signals before calling rebuild_database_list */
PG_SETMASK(&UnBlockSig);
+ /* in emergency mode, just start a worker and go away */
+ if (!autovacuum_start_daemon)
+ {
+ do_start_worker();
+ proc_exit(0); /* done */
+ }
+
+ AutoVacuumShmem->av_launcherpid = MyProcPid;
+
/*
! * Create the initial database list. The invariant we want this list to
! * keep is that it's ordered by decreasing next_time. As soon as an entry is updated to
! * a higher time, it will be moved to the front (which is correct because
! * the only operation is to add autovacuum_naptime to the entry, and time
! * always increases).
*/
! rebuild_database_list(InvalidOid);
for (;;)
{
! uint64 micros;
! bool can_launch;
! TimestampTz current_time = 0;
/*
* Emergency bailout if postmaster has died. This is to avoid the
*************** AutoVacLauncherMain(int argc, char *argv
*** 381,386 ****
--- 491,503 ----
if (!PostmasterIsAlive(true))
exit(1);
+ micros = launcher_determine_sleep(AutoVacuumShmem->av_freeWorkers !=
+ INVALID_OFFSET);
+
+ /* Sleep for a while according to schedule */
+ pg_usleep(micros);
+
+ /* the normal shutdown case */
if (avlauncher_shutdown_request)
break;
*************** AutoVacLauncherMain(int argc, char *argv
*** 388,469 ****
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
}
/*
! * if there's a worker already running, sleep until it
! * disappears.
*/
LWLockAcquire(AutovacuumLock, LW_SHARED);
- worker_pid = AutoVacuumShmem->worker_pid;
- LWLockRelease(AutovacuumLock);
! if (worker_pid != 0)
{
! PGPROC *proc = BackendPidGetProc(worker_pid);
! if (proc != NULL && proc->isAutovacuum)
! goto sleep;
! else
{
/*
! * if the worker is not really running (or it's a process
! * that's not an autovacuum worker), remove the PID from shmem.
! * This should not happen, because either the worker exits
! * cleanly, in which case it'll remove the PID, or it dies, in
! * which case postmaster will cause a system reset cycle.
*/
! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! worker_pid = 0;
! LWLockRelease(AutovacuumLock);
}
}
! do_start_worker();
! sleep:
! /*
! * in emergency mode, exit immediately so that the postmaster can
! * request another run right away if needed.
! *
! * XXX -- maybe it would be better to handle this inside the launcher
! * itself.
! */
! if (!autovacuum_start_daemon)
! break;
! /* have pgstat read the file again next time */
! pgstat_clear_snapshot();
! /* now sleep until the next autovac iteration */
! pg_usleep(autovacuum_naptime * 1000000L);
}
/* Normal exit from the autovac launcher is here */
ereport(LOG,
(errmsg("autovacuum launcher shutting down")));
proc_exit(0); /* done */
}
/*
* do_start_worker
*
* Bare-bones procedure for starting an autovacuum worker from the launcher.
* It determines what database to work on, sets up shared memory stuff and
! * signals postmaster to start the worker.
*/
! static void
do_start_worker(void)
{
List *dblist;
! bool for_xid_wrap;
! autovac_dbase *db;
! ListCell *cell;
TransactionId xidForceLimit;
/* Get a list of databases */
! dblist = autovac_get_database_list();
/*
* Determine the oldest datfrozenxid/relfrozenxid that we will allow
--- 505,927 ----
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
+
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ autovac_balance_cost();
+ LWLockRelease(AutovacuumLock);
+ }
+
+ /* a worker started up or finished */
+ if (got_SIGUSR1)
+ {
+ got_SIGUSR1 = false;
+
+ /* rebalance cost limits, if needed */
+ if (AutoVacuumShmem->av_rebalance)
+ {
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ autovac_balance_cost();
+ LWLockRelease(AutovacuumLock);
+ }
}
/*
! * There are some conditions that we need to check before trying to
! * start a launcher. First, we need to make sure that there is a
! * launcher slot available. Second, we need to make sure that no other
! * worker is still starting up.
*/
+
LWLockAcquire(AutovacuumLock, LW_SHARED);
! can_launch = (AutoVacuumShmem->av_freeWorkers != INVALID_OFFSET);
!
! if (can_launch && AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
{
! long secs;
! int usecs;
! WorkerInfo worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
!
! if (current_time == 0)
! current_time = GetCurrentTimestamp();
!
! /*
! * We can't launch another worker when another one is still
! * starting up, so just sleep for a bit more; that worker will wake
! * us up again as soon as it's ready. We will only wait
! * autovacuum_naptime seconds for this to happen however. Note
! * that failure to connect to a particular database is not a
! * problem here, because the worker removes itself from the
! * startingWorker pointer before trying to connect; only low-level
! * problems, like fork() failure, can get us here.
! */
! TimestampDifference(worker->wi_launchtime, current_time,
! &secs, &usecs);
! /* ignore microseconds, as they cannot make any difference */
! if (secs > autovacuum_naptime)
{
+ LWLockRelease(AutovacuumLock);
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
/*
! * No other process can put a worker in starting mode, so if
! * startingWorker is still INVALID after exchanging our lock,
! * we assume it's the same one we saw above (so we don't
! * recheck the launch time).
*/
! if (AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
! {
! worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
! worker->wi_dboid = InvalidOid;
! worker->wi_tableoid = InvalidOid;
! worker->wi_workerpid = 0;
! worker->wi_launchtime = 0;
! worker->wi_links.next = AutoVacuumShmem->av_freeWorkers;
! AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(worker);
! AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
! }
}
+ else
+ can_launch = false;
}
+ LWLockRelease(AutovacuumLock); /* either shared or exclusive */
! if (can_launch)
! {
! Dlelem *elem;
! elem = DLGetTail(DatabaseList);
! if (current_time == 0)
! current_time = GetCurrentTimestamp();
! if (elem != NULL)
! {
! avl_dbase *avdb = DLE_VAL(elem);
! long secs;
! int usecs;
!
! TimestampDifference(current_time, avdb->adl_next_worker, &secs, &usecs);
!
! /* do we have to start a worker? */
! if (secs <= 0 && usecs <= 0)
! launch_worker(current_time);
! }
! else
! {
! /*
! * Special case when the list is empty: start a worker right
! * away. This covers the initial case, when no database is in
! * pgstats (thus the list is empty).
! */
! launch_worker(current_time);
! }
! }
}
/* Normal exit from the autovac launcher is here */
ereport(LOG,
(errmsg("autovacuum launcher shutting down")));
+ AutoVacuumShmem->av_launcherpid = 0;
proc_exit(0); /* done */
}
+
+ /*
+ * Determine the time to sleep, in microseconds, based on the database list.
+ *
+ * The "canlaunch" parameter indicates whether we can start a worker right now,
+ * for example due to the workers being all busy.
+ */
+ static uint64
+ launcher_determine_sleep(bool canlaunch)
+ {
+ long secs;
+ int usecs;
+ Dlelem *elem;
+
+ /*
+ * We sleep until the next scheduled vacuum. We trust that when the
+ * database list was built, care was taken so that no entries have times in
+ * the past; if the first entry has too close a next_worker value, or a
+ * time in the past, we will sleep a small nominal time.
+ */
+ if (!canlaunch)
+ {
+ secs = autovacuum_naptime;
+ usecs = 0;
+ }
+ else if ((elem = DLGetTail(DatabaseList)) != NULL)
+ {
+ avl_dbase *avdb = DLE_VAL(elem);
+ TimestampTz current_time = GetCurrentTimestamp();
+ TimestampTz next_wakeup;
+
+ next_wakeup = avdb->adl_next_worker;
+ TimestampDifference(current_time, next_wakeup, &secs, &usecs);
+ }
+ else
+ {
+ /* list is empty, sleep for whole autovacuum_naptime seconds */
+ secs = autovacuum_naptime;
+ usecs = 0;
+ }
+
+ /* 100ms is the smallest time we'll allow the launcher to sleep */
+ if (secs <= 0L && usecs <= 100000)
+ {
+ secs = 0L;
+ usecs = 100000; /* 100 ms */
+ }
+
+ return secs * 1000000 + usecs;
+ }
+
+ /*
+ * Build an updated DatabaseList. It must only contain databases that appear
+ * in pgstats, and must be sorted by next_worker from highest to lowest,
+ * distributed regularly across the next autovacuum_naptime interval.
+ *
+ * Receives the Oid of the database that made this list be generated (we call
+ * this the "new" database, because when the database was already present on
+ * the list, we expect that this function is not called at all). The
+ * preexisting list, if any, will be used to preserve the order of the
+ * databases in the autovacuum_naptime period. The new database is put at the
+ * end of the interval. The actual values are not saved, which should not be
+ * much of a problem.
+ */
+ static void
+ rebuild_database_list(Oid newdb)
+ {
+ List *dblist;
+ ListCell *cell;
+ MemoryContext newcxt;
+ MemoryContext oldcxt;
+ MemoryContext tmpcxt;
+ HASHCTL hctl;
+ int score;
+ int nelems;
+ HTAB *dbhash;
+
+ /* use fresh stats */
+ pgstat_clear_snapshot();
+
+ newcxt = AllocSetContextCreate(AutovacMemCxt,
+ "AV dblist",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ tmpcxt = AllocSetContextCreate(newcxt,
+ "tmp AV dblist",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ oldcxt = MemoryContextSwitchTo(tmpcxt);
+
+ /*
+ * Implementing this is not as simple as it sounds, because we need to put
+ * the new database at the end of the list; next the databases that were
+ * already on the list, and finally (at the tail of the list) all the other
+ * databases that are not on the existing list.
+ *
+ * To do this, we build an empty hash table of scored databases. We will
+ * start with the lowest score (zero) for the new database, then increasing
+ * scores for the databases in the existing list, in order, and lastly
+ * increasing scores for all databases gotten via get_database_list() that
+ * are not already on the hash.
+ *
+ * Then we will put all the hash elements into an array, sort the array by
+ * score, and finally put the array elements into the new doubly linked
+ * list.
+ */
+ hctl.keysize = sizeof(Oid);
+ hctl.entrysize = sizeof(avl_dbase);
+ hctl.hash = oid_hash;
+ hctl.hcxt = tmpcxt;
+ dbhash = hash_create("db hash", 20, &hctl, /* magic number here FIXME */
+ HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+ /* start by inserting the new database */
+ score = 0;
+ if (OidIsValid(newdb))
+ {
+ avl_dbase *db;
+ PgStat_StatDBEntry *entry;
+
+ /* only consider this database if it has a pgstat entry */
+ entry = pgstat_fetch_stat_dbentry(newdb);
+ if (entry != NULL)
+ {
+ /* we assume it isn't found because the hash was just created */
+ db = hash_search(dbhash, &newdb, HASH_ENTER, NULL);
+
+ /* hash_search already filled in the key */
+ db->adl_score = score++;
+ /* next_worker is filled in later */
+ }
+ }
+
+ /* Now insert the databases from the existing list */
+ if (DatabaseList != NULL)
+ {
+ Dlelem *elem;
+
+ elem = DLGetHead(DatabaseList);
+ while (elem != NULL)
+ {
+ avl_dbase *avdb = DLE_VAL(elem);
+ avl_dbase *db;
+ bool found;
+ PgStat_StatDBEntry *entry;
+
+ elem = DLGetSucc(elem);
+
+ /*
+ * skip databases with no stat entries -- in particular, this
+ * gets rid of dropped databases
+ */
+ entry = pgstat_fetch_stat_dbentry(avdb->adl_datid);
+ if (entry == NULL)
+ continue;
+
+ db = hash_search(dbhash, &(avdb->adl_datid), HASH_ENTER, &found);
+
+ if (!found)
+ {
+ /* hash_search already filled in the key */
+ db->adl_score = score++;
+ /* next_worker is filled in later */
+ }
+ }
+ }
+
+ /* finally, insert all qualifying databases not previously inserted */
+ dblist = get_database_list();
+ foreach(cell, dblist)
+ {
+ avw_dbase *avdb = lfirst(cell);
+ avl_dbase *db;
+ bool found;
+ PgStat_StatDBEntry *entry;
+
+ /* only consider databases with a pgstat entry */
+ entry = pgstat_fetch_stat_dbentry(avdb->adw_datid);
+ if (entry == NULL)
+ continue;
+
+ db = hash_search(dbhash, &(avdb->adw_datid), HASH_ENTER, &found);
+ /* only update the score if the database was not already on the hash */
+ if (!found)
+ {
+ /* hash_search already filled in the key */
+ db->adl_score = score++;
+ /* next_worker is filled in later */
+ }
+ }
+ nelems = score;
+
+ /* from here on, the allocated memory belongs to the new list */
+ MemoryContextSwitchTo(newcxt);
+ DatabaseList = DLNewList();
+
+ if (nelems > 0)
+ {
+ TimestampTz current_time;
+ int millis_increment;
+ avl_dbase *dbary;
+ avl_dbase *db;
+ HASH_SEQ_STATUS seq;
+ int i;
+
+ /* put all the hash elements into an array */
+ dbary = palloc(nelems * sizeof(avl_dbase));
+
+ i = 0;
+ hash_seq_init(&seq, dbhash);
+ while ((db = hash_seq_search(&seq)) != NULL)
+ memcpy(&(dbary[i++]), db, sizeof(avl_dbase));
+
+ /* sort the array */
+ qsort(dbary, nelems, sizeof(avl_dbase), db_comparator);
+
+ /* this is the time interval between databases in the schedule */
+ millis_increment = 1000.0 * autovacuum_naptime / nelems;
+ current_time = GetCurrentTimestamp();
+
+ /*
+ * move the elements from the array into the dllist, setting the
+ * next_worker while walking the array
+ */
+ for (i = 0; i < nelems; i++)
+ {
+ avl_dbase *db = &(dbary[i]);
+ Dlelem *elem;
+
+ current_time = TimestampTzPlusMilliseconds(current_time,
+ millis_increment);
+ db->adl_next_worker = current_time;
+
+ elem = DLNewElem(db);
+ /* later elements should go closer to the head of the list */
+ DLAddHead(DatabaseList, elem);
+ }
+ }
+
+ /* all done, clean up memory */
+ if (DatabaseListCxt != NULL)
+ MemoryContextDelete(DatabaseListCxt);
+ MemoryContextDelete(tmpcxt);
+ DatabaseListCxt = newcxt;
+ MemoryContextSwitchTo(oldcxt);
+ }
+
+ /* qsort comparator for avl_dbase, using adl_score */
+ static int
+ db_comparator(const void *a, const void *b)
+ {
+ if (((avl_dbase *) a)->adl_score == ((avl_dbase *) b)->adl_score)
+ return 0;
+ else
+ return (((avl_dbase *) a)->adl_score < ((avl_dbase *) b)->adl_score) ? 1 : -1;
+ }
+
/*
* do_start_worker
*
* Bare-bones procedure for starting an autovacuum worker from the launcher.
* It determines what database to work on, sets up shared memory stuff and
! * signals postmaster to start the worker. It fails gracefully if invoked when
! * autovacuum_workers are already active.
! *
! * Return value is the OID of the database that the worker is going to process,
! * or InvalidOid if no worker was actually started.
*/
! static Oid
do_start_worker(void)
{
List *dblist;
! ListCell *cell;
TransactionId xidForceLimit;
+ bool for_xid_wrap;
+ avw_dbase *avdb;
+ TimestampTz current_time;
+ bool skipit = false;
+
+ /* return quickly when there are no free workers */
+ LWLockAcquire(AutovacuumLock, LW_SHARED);
+ if (AutoVacuumShmem->av_freeWorkers == INVALID_OFFSET)
+ {
+ LWLockRelease(AutovacuumLock);
+ return InvalidOid;
+ }
+ LWLockRelease(AutovacuumLock);
+
+ /* use fresh stats */
+ pgstat_clear_snapshot();
/* Get a list of databases */
! dblist = get_database_list();
/*
* Determine the oldest datfrozenxid/relfrozenxid that we will allow
*************** do_start_worker(void)
*** 495,515 ****
* isn't clear how to construct a metric that measures that and not cause
* starvation for less busy databases.
*/
! db = NULL;
for_xid_wrap = false;
foreach(cell, dblist)
{
! autovac_dbase *tmp = lfirst(cell);
/* Find pgstat entry if any */
! tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid);
/* Check to see if this one is at risk of wraparound */
! if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit))
{
! if (db == NULL ||
! TransactionIdPrecedes(tmp->ad_frozenxid, db->ad_frozenxid))
! db = tmp;
for_xid_wrap = true;
continue;
}
--- 953,975 ----
* isn't clear how to construct a metric that measures that and not cause
* starvation for less busy databases.
*/
! avdb = NULL;
for_xid_wrap = false;
+ current_time = GetCurrentTimestamp();
foreach(cell, dblist)
{
! avw_dbase *tmp = lfirst(cell);
! Dlelem *elem;
/* Find pgstat entry if any */
! tmp->adw_entry = pgstat_fetch_stat_dbentry(tmp->adw_datid);
/* Check to see if this one is at risk of wraparound */
! if (TransactionIdPrecedes(tmp->adw_frozenxid, xidForceLimit))
{
! if (avdb == NULL ||
! TransactionIdPrecedes(tmp->adw_frozenxid, avdb->adw_frozenxid))
! avdb = tmp;
for_xid_wrap = true;
continue;
}
*************** do_start_worker(void)
*** 520,545 ****
* Otherwise, skip a database with no pgstat entry; it means it
* hasn't seen any activity.
*/
! if (!tmp->ad_entry)
continue;
/*
* Remember the db with oldest autovac time. (If we are here,
* both tmp->entry and db->entry must be non-null.)
*/
! if (db == NULL ||
! tmp->ad_entry->last_autovac_time < db->ad_entry->last_autovac_time)
! db = tmp;
}
/* Found a database -- process it */
! if (db != NULL)
{
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! AutoVacuumShmem->process_db = db->ad_datid;
LWLockRelease(AutovacuumLock);
SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
}
}
--- 980,1135 ----
* Otherwise, skip a database with no pgstat entry; it means it
* hasn't seen any activity.
*/
! if (!tmp->adw_entry)
! continue;
!
! /*
! * Also, skip a database that appears on the database list as having
! * been processed recently (less than autovacuum_naptime seconds ago).
! * We do this so that we don't select a database which we just
! * selected, but that pgstat hasn't gotten around to updating the last
! * autovacuum time yet.
! */
! skipit = false;
! elem = DatabaseList ? DLGetTail(DatabaseList) : NULL;
!
! while (elem != NULL)
! {
! avl_dbase *dbp = DLE_VAL(elem);
!
! if (dbp->adl_datid == tmp->adw_datid)
! {
! TimestampTz curr_plus_naptime;
! TimestampTz next = dbp->adl_next_worker;
!
! curr_plus_naptime =
! TimestampTzPlusMilliseconds(current_time,
! autovacuum_naptime * 1000);
!
! /*
! * What we want here if to skip if next_worker falls between
! * the current time and the current time plus naptime.
! */
! if (timestamp_cmp_internal(current_time, next) > 0)
! skipit = false;
! else if (timestamp_cmp_internal(next, curr_plus_naptime) > 0)
! skipit = false;
! else
! skipit = true;
!
! break;
! }
! elem = DLGetPred(elem);
! }
! if (skipit)
continue;
/*
* Remember the db with oldest autovac time. (If we are here,
* both tmp->entry and db->entry must be non-null.)
*/
! if (avdb == NULL ||
! tmp->adw_entry->last_autovac_time < avdb->adw_entry->last_autovac_time)
! avdb = tmp;
}
/* Found a database -- process it */
! if (avdb != NULL)
{
+ WorkerInfo worker;
+ SHMEM_OFFSET sworker;
+
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
!
! /*
! * Get a worker entry from the freelist. We checked above, so there
! * really should be a free slot -- complain very loudly if it isn't.
! */
! sworker = AutoVacuumShmem->av_freeWorkers;
! if (sworker == INVALID_OFFSET)
! elog(FATAL, "no free worker found");
!
! worker = (WorkerInfo) MAKE_PTR(sworker);
! AutoVacuumShmem->av_freeWorkers = worker->wi_links.next;
!
! worker->wi_dboid = avdb->adw_datid;
! worker->wi_workerpid = 0;
! worker->wi_launchtime = GetCurrentTimestamp();
!
! AutoVacuumShmem->av_startingWorker = sworker;
!
LWLockRelease(AutovacuumLock);
SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
+
+ return avdb->adw_datid;
+ }
+ else if (skipit)
+ {
+ /*
+ * If we skipped all databases on the list, rebuild it, because it
+ * probably contains a dropped database.
+ */
+ rebuild_database_list(InvalidOid);
+ }
+
+ return InvalidOid;
+ }
+
+ /*
+ * launch_worker
+ *
+ * Wrapper for starting a worker from the launcher. Besides actually starting
+ * it, update the database list to reflect the next time that another one will
+ * need to be started on the selected database. The actual database choice is
+ * left to do_start_worker.
+ *
+ * This routine is also expected to insert an entry into the database list if
+ * the selected database was previously absent from the list. It returns the
+ * new database list.
+ */
+ static void
+ launch_worker(TimestampTz now)
+ {
+ Oid dbid;
+ Dlelem *elem;
+
+ dbid = do_start_worker();
+ if (OidIsValid(dbid))
+ {
+ /*
+ * Walk the database list and update the corresponding entry. If the
+ * database is not on the list, we'll recreate the list.
+ */
+ elem = (DatabaseList == NULL) ? NULL : DLGetHead(DatabaseList);
+ while (elem != NULL)
+ {
+ avl_dbase *avdb = DLE_VAL(elem);
+
+ if (avdb->adl_datid == dbid)
+ {
+ /*
+ * add autovacuum_naptime seconds to the current time, and use
+ * that as the new "next_worker" field for this database.
+ */
+ avdb->adl_next_worker =
+ TimestampTzPlusMilliseconds(now, autovacuum_naptime * 1000);
+
+ DLMoveToFront(elem);
+ break;
+ }
+ elem = DLGetSucc(elem);
+ }
+
+ /*
+ * If the database was not present in the database list, we rebuild the
+ * list. It's possible that the database does not get into the list
+ * anyway, for example if it's a database that doesn't have a pgstat
+ * entry, but this is not a problem because we don't want to schedule
+ * workers regularly into those in any case.
+ */
+ if (elem == NULL)
+ rebuild_database_list(dbid);
}
}
*************** avl_sighup_handler(SIGNAL_ARGS)
*** 550,555 ****
--- 1140,1152 ----
got_SIGHUP = true;
}
+ /* SIGUSR1: a worker is up and running, or just finished */
+ static void
+ avl_sigusr1_handler(SIGNAL_ARGS)
+ {
+ got_SIGUSR1 = true;
+ }
+
static void
avlauncher_shutdown(SIGNAL_ARGS)
{
*************** NON_EXEC_STATIC void
*** 665,671 ****
AutoVacWorkerMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
! Oid dbid;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
--- 1262,1268 ----
AutoVacWorkerMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
! Oid dbid = InvalidOid;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 744,751 ****
/*
* We can now go away. Note that because we called InitProcess, a
! * callback was registered to do ProcKill, which will clean up
! * necessary state.
*/
proc_exit(0);
}
--- 1341,1348 ----
/*
* We can now go away. Note that because we called InitProcess, a
! * callback was registered to do AutovacWorkerProcKill, which will
! * clean up necessary state.
*/
proc_exit(0);
}
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 763,780 ****
SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
/*
! * Get the database Id we're going to work on, and announce our PID
! * in the shared memory area. We remove the database OID immediately
! * from the shared memory area.
*/
! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! dbid = AutoVacuumShmem->process_db;
! AutoVacuumShmem->process_db = InvalidOid;
! AutoVacuumShmem->worker_pid = MyProcPid;
LWLockRelease(AutovacuumLock);
if (OidIsValid(dbid))
{
char *dbname;
--- 1360,1393 ----
SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
/*
! * Force statement_timeout to zero to avoid a timeout setting from
! * preventing regular maintenance from being executed.
*/
! SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
! /*
! * Get the info about the database we're going to work on.
! */
! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! MyWorkerInfo = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
! dbid = MyWorkerInfo->wi_dboid;
! MyWorkerInfo->wi_workerpid = MyProcPid;
!
! /* insert into the running list */
! SHMQueueInsertBefore(&AutoVacuumShmem->av_runningWorkers,
! &MyWorkerInfo->wi_links);
! /*
! * remove from the "starting" pointer, so that the launcher can start a new
! * worker if required
! */
! AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
LWLockRelease(AutovacuumLock);
+ /* wake up the launcher */
+ if (AutoVacuumShmem->av_launcherpid != 0)
+ kill(AutoVacuumShmem->av_launcherpid, SIGUSR1);
+
if (OidIsValid(dbid))
{
char *dbname;
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 803,809 ****
/* Create the memory context where cross-transaction state is stored */
AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! "Autovacuum context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
--- 1416,1422 ----
/* Create the memory context where cross-transaction state is stored */
AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! "AV worker",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 813,838 ****
do_autovacuum();
}
! /*
! * Now remove our PID from shared memory, so that the launcher can start
! * another worker as soon as appropriate.
! */
! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! AutoVacuumShmem->worker_pid = 0;
! LWLockRelease(AutovacuumLock);
/* All done, go away */
proc_exit(0);
}
/*
! * autovac_get_database_list
*
* Return a list of all databases. Note we cannot use pg_database,
* because we aren't connected; we use the flat database file.
*/
static List *
! autovac_get_database_list(void)
{
char *filename;
List *dblist = NIL;
--- 1426,1605 ----
do_autovacuum();
}
! /* AutovacWorkerProcKill cleans up and notifies the launcher. */
/* All done, go away */
proc_exit(0);
}
/*
! * Find and return an unused PGPROC entry to set up, for the new worker
! * process.
! */
! PGPROC *
! AutoVacuumGetFreeProc()
! {
! PGPROC *proc;
! volatile SHMEM_OFFSET freeprocs;
!
! SpinLockAcquire(AutovacProcLock);
!
! freeprocs = AutoVacuumShmem->av_freeProcs;
!
! /*
! * This shouldn't happen, because we check the number of workers we launch
! */
! if (freeprocs == INVALID_OFFSET)
! elog(FATAL, "too many autovacuum workers already");
!
! /* get a free PGPROC slot from the freelist */
! proc = (PGPROC *) MAKE_PTR(freeprocs);
! AutoVacuumShmem->av_freeProcs = proc->links.next;
!
! SpinLockRelease(AutovacProcLock);
!
! return proc;
! }
!
! /*
! * on_shmem_exit hook for returning the PGPROC struct into the freelist and
! * releasing any held LWLocks.
! */
! void
! AutovacWorkerProcKill(int code, Datum arg)
! {
! volatile SHMEM_OFFSET freeprocs;
!
! Assert(MyProc != NULL);
!
! LWLockReleaseAll();
!
! /*
! * Remove the WorkerInfo entry from the running list, and put it back into
! * the free list.
! */
! if (MyWorkerInfo != NULL)
! {
! /*
! * XXX Acquiring a lwlock just after LWLockReleaseAll seems a bit
! * contradictory. Is it a problem?
! */
! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
!
! SHMQueueDelete(&MyWorkerInfo->wi_links);
! MyWorkerInfo->wi_links.next = AutoVacuumShmem->av_freeWorkers;
! AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(MyWorkerInfo);
! AutoVacuumShmem->av_rebalance = true;
! MyWorkerInfo = NULL;
!
! LWLockRelease(AutovacuumLock);
! }
!
! /* Put my PGPROC back into the freelist */
! SpinLockAcquire(AutovacProcLock);
!
! freeprocs = AutoVacuumShmem->av_freeProcs;
! MyProc->links.next = freeprocs;
! AutoVacuumShmem->av_freeProcs = MAKE_OFFSET(MyProc);
!
! SpinLockRelease(AutovacProcLock);
!
! /* It isn't mine anymore */
! MyProc = NULL;
! }
!
! /*
! * Update the cost-based delay parameters, so that multiple workers consume
! * each a fraction of the total available I/O.
! */
! void
! AutoVacuumUpdateDelay(void)
! {
! if (MyWorkerInfo)
! {
! VacuumCostDelay = MyWorkerInfo->wi_cost_delay;
! VacuumCostLimit = MyWorkerInfo->wi_cost_limit;
! }
! }
!
! /*
! * autovac_balance_cost
! * Recalculate the cost limit setting for each active workers.
! *
! * Caller must hold the AutovacuumLock in exclusive mode.
! */
! static void
! autovac_balance_cost(void)
! {
! WorkerInfo worker;
! int vac_cost_limit = (autovacuum_vac_cost_limit >= 0 ?
! autovacuum_vac_cost_limit : VacuumCostLimit);
! int vac_cost_delay = (autovacuum_vac_cost_delay >= 0 ?
! autovacuum_vac_cost_delay : VacuumCostDelay);
! double cost_total;
! double cost_avail;
!
! /* not set? nothing to do */
! if (vac_cost_limit <= 0 || vac_cost_delay <= 0)
! return;
!
! /* caculate the total base cost limit of active workers */
! cost_total = 0.0;
! worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
! &AutoVacuumShmem->av_runningWorkers,
! offsetof(WorkerInfoData, wi_links));
! while (worker)
! {
! if (worker->wi_workerpid != 0 &&
! worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0)
! cost_total +=
! (double) worker->wi_cost_limit_base / worker->wi_cost_delay;
!
! worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
! &worker->wi_links,
! offsetof(WorkerInfoData, wi_links));
! }
! /* there are no cost limits -- nothing to do */
! if (cost_total <= 0)
! return;
!
! /*
! * Adjust each cost limit of active workers to balance the total of
! * cost limit to autovacuum_vacuum_cost_limit.
! */
! cost_avail = (double) vac_cost_limit / vac_cost_delay;
! worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
! &AutoVacuumShmem->av_runningWorkers,
! offsetof(WorkerInfoData, wi_links));
! while (worker)
! {
! if (worker->wi_workerpid != 0 &&
! worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0)
! {
! int limit = (int)
! (cost_avail * worker->wi_cost_limit_base / cost_total);
!
! worker->wi_cost_limit = Min(limit, worker->wi_cost_limit_base);
!
! elog(DEBUG1, "autovac_balance_cost(pid=%u db=%u, rel=%u, cost_limit=%d, cost_delay=%d)",
! worker->wi_workerpid, worker->wi_dboid,
! worker->wi_tableoid, worker->wi_cost_limit, worker->wi_cost_delay);
! }
!
! worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
! &worker->wi_links,
! offsetof(WorkerInfoData, wi_links));
! }
! }
!
! /*
! * get_database_list
*
* Return a list of all databases. Note we cannot use pg_database,
* because we aren't connected; we use the flat database file.
*/
static List *
! get_database_list(void)
{
char *filename;
List *dblist = NIL;
*************** autovac_get_database_list(void)
*** 852,866 ****
while (read_pg_database_line(db_file, thisname, &db_id,
&db_tablespace, &db_frozenxid))
{
! autovac_dbase *avdb;
! avdb = (autovac_dbase *) palloc(sizeof(autovac_dbase));
! avdb->ad_datid = db_id;
! avdb->ad_name = pstrdup(thisname);
! avdb->ad_frozenxid = db_frozenxid;
/* this gets set later: */
! avdb->ad_entry = NULL;
dblist = lappend(dblist, avdb);
}
--- 1619,1633 ----
while (read_pg_database_line(db_file, thisname, &db_id,
&db_tablespace, &db_frozenxid))
{
! avw_dbase *avdb;
! avdb = (avw_dbase *) palloc(sizeof(avw_dbase));
! avdb->adw_datid = db_id;
! avdb->adw_name = pstrdup(thisname);
! avdb->adw_frozenxid = db_frozenxid;
/* this gets set later: */
! avdb->adw_entry = NULL;
dblist = lappend(dblist, avdb);
}
*************** do_autovacuum(void)
*** 1008,1019 ****
* Add to the list of tables to vacuum, the OIDs of the tables that
* correspond to the saved OIDs of toast tables needing vacuum.
*/
! foreach (cell, toast_oids)
{
Oid toastoid = lfirst_oid(cell);
ListCell *cell2;
! foreach (cell2, table_toast_list)
{
av_relation *ar = lfirst(cell2);
--- 1775,1786 ----
* Add to the list of tables to vacuum, the OIDs of the tables that
* correspond to the saved OIDs of toast tables needing vacuum.
*/
! foreach(cell, toast_oids)
{
Oid toastoid = lfirst_oid(cell);
ListCell *cell2;
! foreach(cell2, table_toast_list)
{
av_relation *ar = lfirst(cell2);
*************** do_autovacuum(void)
*** 1038,1047 ****
--- 1805,1860 ----
Oid relid = lfirst_oid(cell);
autovac_table *tab;
char *relname;
+ WorkerInfo worker;
+ bool skipit;
CHECK_FOR_INTERRUPTS();
/*
+ * hold schedule lock from here until we're sure that this table
+ * still needs vacuuming. We also need the AutovacuumLock to walk
+ * the worker array, but we'll let go of that one quickly.
+ */
+ LWLockAcquire(AutovacuumScheduleLock, LW_EXCLUSIVE);
+ LWLockAcquire(AutovacuumLock, LW_SHARED);
+
+ /*
+ * Check whether the table is being vacuumed concurrently by another
+ * worker.
+ */
+ skipit = false;
+ worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+ &AutoVacuumShmem->av_runningWorkers,
+ offsetof(WorkerInfoData, wi_links));
+ while (worker)
+ {
+ /* ignore myself */
+ if (worker == MyWorkerInfo)
+ goto next_worker;
+
+ /* ignore workers in other databases */
+ if (worker->wi_dboid != MyDatabaseId)
+ goto next_worker;
+
+ if (worker->wi_tableoid == relid)
+ {
+ skipit = true;
+ break;
+ }
+
+ next_worker:
+ worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+ &worker->wi_links,
+ offsetof(WorkerInfoData, wi_links));
+ }
+ LWLockRelease(AutovacuumLock);
+ if (skipit)
+ {
+ LWLockRelease(AutovacuumScheduleLock);
+ continue;
+ }
+
+ /*
* Check whether pgstat data still says we need to vacuum this table.
* It could have changed if something else processed the table while we
* weren't looking.
*************** do_autovacuum(void)
*** 1053,1063 ****
if (tab == NULL)
{
/* someone else vacuumed the table */
continue;
}
- /* Ok, good to go! */
! /* Set the vacuum cost parameters for this table */
VacuumCostDelay = tab->at_vacuum_cost_delay;
VacuumCostLimit = tab->at_vacuum_cost_limit;
--- 1866,1883 ----
if (tab == NULL)
{
/* someone else vacuumed the table */
+ LWLockRelease(AutovacuumScheduleLock);
continue;
}
! /*
! * Ok, good to go. Store the table in shared memory before releasing
! * the lock so that other workers don't vacuum it concurrently.
! */
! MyWorkerInfo->wi_tableoid = relid;
! LWLockRelease(AutovacuumScheduleLock);
!
! /* Set the initial vacuum cost parameters for this table */
VacuumCostDelay = tab->at_vacuum_cost_delay;
VacuumCostLimit = tab->at_vacuum_cost_limit;
*************** do_autovacuum(void)
*** 1067,1072 ****
--- 1887,1904 ----
(tab->at_doanalyze ? " ANALYZE" : ""),
relname);
+ /*
+ * Advertise my cost delay parameters for the balancing algorithm, and
+ * do a balance
+ */
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ MyWorkerInfo->wi_cost_delay = tab->at_vacuum_cost_delay;
+ MyWorkerInfo->wi_cost_limit = tab->at_vacuum_cost_limit;
+ MyWorkerInfo->wi_cost_limit_base = tab->at_vacuum_cost_limit;
+ autovac_balance_cost();
+ LWLockRelease(AutovacuumLock);
+
+ /* have at it */
autovacuum_do_vac_analyze(tab->at_relid,
tab->at_dovacuum,
tab->at_doanalyze,
*************** table_recheck_autovac(Oid relid)
*** 1211,1217 ****
PgStat_StatDBEntry *shared;
PgStat_StatDBEntry *dbentry;
! /* We need fresh pgstat data for this */
pgstat_clear_snapshot();
shared = pgstat_fetch_stat_dbentry(InvalidOid);
--- 2043,2049 ----
PgStat_StatDBEntry *shared;
PgStat_StatDBEntry *dbentry;
! /* use fresh stats */
pgstat_clear_snapshot();
shared = pgstat_fetch_stat_dbentry(InvalidOid);
*************** table_recheck_autovac(Oid relid)
*** 1219,1226 ****
/* fetch the relation's relcache entry */
classTup = SearchSysCacheCopy(RELOID,
! ObjectIdGetDatum(relid),
! 0, 0, 0);
if (!HeapTupleIsValid(classTup))
return NULL;
classForm = (Form_pg_class) GETSTRUCT(classTup);
--- 2051,2058 ----
/* fetch the relation's relcache entry */
classTup = SearchSysCacheCopy(RELOID,
! ObjectIdGetDatum(relid),
! 0, 0, 0);
if (!HeapTupleIsValid(classTup))
return NULL;
classForm = (Form_pg_class) GETSTRUCT(classTup);
*************** IsAutoVacuumWorkerProcess(void)
*** 1630,1636 ****
Size
AutoVacuumShmemSize(void)
{
! return sizeof(AutoVacuumShmemStruct);
}
/*
--- 2462,2482 ----
Size
AutoVacuumShmemSize(void)
{
! Size size;
!
! /*
! * Need the fixed struct, the array of WorkerInfoData, and the array
! * of PGPROCs
! */
! size = sizeof(AutoVacuumShmemStruct);
! size = MAXALIGN(size);
! size = add_size(size, mul_size(autovacuum_max_workers,
! sizeof(WorkerInfoData)));
! size = MAXALIGN(size);
! size = add_size(size, mul_size(autovacuum_max_workers,
! sizeof(PGPROC)));
!
! return size;
}
/*
*************** AutoVacuumShmemInit(void)
*** 1650,1657 ****
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("not enough shared memory for autovacuum")));
- if (found)
- return; /* already initialized */
! MemSet(AutoVacuumShmem, 0, sizeof(AutoVacuumShmemStruct));
}
--- 2496,2548 ----
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("not enough shared memory for autovacuum")));
! if (!IsUnderPostmaster)
! {
! WorkerInfo worker;
! PGPROC *proc;
! int i;
!
! Assert(!found);
!
! /* initialize our spinlock */
! AutovacProcLock = (slock_t *) ShmemAlloc(sizeof(slock_t));
! SpinLockInit(AutovacProcLock);
!
! AutoVacuumShmem->av_launcherpid = 0;
! AutoVacuumShmem->av_freeProcs = INVALID_OFFSET;
! AutoVacuumShmem->av_freeWorkers = INVALID_OFFSET;
! SHMQueueInit(&AutoVacuumShmem->av_runningWorkers);
! AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
!
! worker = (WorkerInfo) ((char *) AutoVacuumShmem +
! MAXALIGN(sizeof(AutoVacuumShmemStruct)));
!
! /* initialize the WorkerInfo free list */
! for (i = 0; i < autovacuum_max_workers; i++)
! {
! worker[i].wi_links.next = AutoVacuumShmem->av_freeWorkers;
! AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(&worker[i]);
! }
!
! proc = (PGPROC *)
! ((char *) worker +
! MAXALIGN(sizeof(WorkerInfoData) * autovacuum_max_workers));
!
! /* initialize the PGPROC free list and semaphores */
! for (i = 0; i < autovacuum_max_workers; i++)
! {
! proc[i].links.next = AutoVacuumShmem->av_freeProcs;
! PGSemaphoreCreate(&proc[i].sem);
! AutoVacuumShmem->av_freeProcs = MAKE_OFFSET(&proc[i]);
! }
! }
! else
! Assert(found);
! }
!
! int
! AutoVacuumSemas(void)
! {
! return autovacuum_max_workers;
}
Index: src/backend/postmaster/postmaster.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/postmaster/postmaster.c,v
retrieving revision 1.527
diff -c -p -r1.527 postmaster.c
*** src/backend/postmaster/postmaster.c 22 Mar 2007 19:53:30 -0000 1.527
--- src/backend/postmaster/postmaster.c 10 Apr 2007 20:31:40 -0000
*************** typedef struct
*** 327,332 ****
--- 327,333 ----
Backend *ShmemBackendArray;
LWLock *LWLockArray;
slock_t *ProcStructLock;
+ slock_t *AutovacProcLock;
PROC_HDR *ProcGlobal;
PGPROC *AuxiliaryProcs;
InheritableSocket pgStatSock;
*************** CreateOptsFile(int argc, char *argv[], c
*** 3899,3904 ****
--- 3900,3906 ----
extern slock_t *ShmemLock;
extern LWLock *LWLockArray;
extern slock_t *ProcStructLock;
+ extern slock_t *AutovacProcLock;
extern PROC_HDR *ProcGlobal;
extern PGPROC *AuxiliaryProcs;
extern int pgStatSock;
*************** save_backend_variables(BackendParameters
*** 3942,3947 ****
--- 3944,3950 ----
param->LWLockArray = LWLockArray;
param->ProcStructLock = ProcStructLock;
+ param->AutovacProcLock = AutovacProcLock;
param->ProcGlobal = ProcGlobal;
param->AuxiliaryProcs = AuxiliaryProcs;
write_inheritable_socket(¶m->pgStatSock, pgStatSock, childPid);
*************** restore_backend_variables(BackendParamet
*** 4145,4150 ****
--- 4148,4154 ----
LWLockArray = param->LWLockArray;
ProcStructLock = param->ProcStructLock;
+ AutovacProcLock = param->AutovacProcLock;
ProcGlobal = param->ProcGlobal;
AuxiliaryProcs = param->AuxiliaryProcs;
read_inheritable_socket(&pgStatSock, ¶m->pgStatSock);
Index: src/backend/storage/ipc/ipci.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/ipc/ipci.c,v
retrieving revision 1.91
diff -c -p -r1.91 ipci.c
*** src/backend/storage/ipc/ipci.c 15 Feb 2007 23:23:23 -0000 1.91
--- src/backend/storage/ipc/ipci.c 9 Apr 2007 16:07:08 -0000
*************** CreateSharedMemoryAndSemaphores(bool mak
*** 138,143 ****
--- 138,144 ----
*/
numSemas = ProcGlobalSemas();
numSemas += SpinlockSemas();
+ numSemas += AutoVacuumSemas();
PGReserveSemaphores(numSemas, port);
}
else
Index: src/backend/storage/lmgr/proc.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/lmgr/proc.c,v
retrieving revision 1.187
diff -c -p -r1.187 proc.c
*** src/backend/storage/lmgr/proc.c 3 Apr 2007 16:34:36 -0000 1.187
--- src/backend/storage/lmgr/proc.c 10 Apr 2007 20:50:14 -0000
*************** ProcGlobalShmemSize(void)
*** 96,103 ****
size = add_size(size, sizeof(PROC_HDR));
/* AuxiliaryProcs */
size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC)));
! /* MyProcs */
! size = add_size(size, mul_size(MaxBackends, sizeof(PGPROC)));
/* ProcStructLock */
size = add_size(size, sizeof(slock_t));
--- 96,106 ----
size = add_size(size, sizeof(PROC_HDR));
/* AuxiliaryProcs */
size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC)));
! /*
! * MyProcs. Note we use MaxConnections here instead of MaxBackends,
! * because the autovacuum PGPROC structs are reserved by autovacuum itself.
! */
! size = add_size(size, mul_size(MaxConnections, sizeof(PGPROC)));
/* ProcStructLock */
size = add_size(size, sizeof(slock_t));
*************** ProcGlobalShmemSize(void)
*** 110,117 ****
int
ProcGlobalSemas(void)
{
! /* We need a sema per backend, plus one for each auxiliary process. */
! return MaxBackends + NUM_AUXILIARY_PROCS;
}
/*
--- 113,123 ----
int
ProcGlobalSemas(void)
{
! /*
! * We need a sema per backend (but don't count autovacuum), plus one for
! * each auxiliary process.
! */
! return MaxConnections + NUM_AUXILIARY_PROCS;
}
/*
*************** ProcGlobalSemas(void)
*** 127,133 ****
* running out when trying to start another backend is a common failure.
* So, now we grab enough semaphores to support the desired max number
* of backends immediately at initialization --- if the sysadmin has set
! * MaxBackends higher than his kernel will support, he'll find out sooner
* rather than later.
*
* Another reason for creating semaphores here is that the semaphore
--- 133,139 ----
* running out when trying to start another backend is a common failure.
* So, now we grab enough semaphores to support the desired max number
* of backends immediately at initialization --- if the sysadmin has set
! * MaxConnections higher than his kernel will support, he'll find out sooner
* rather than later.
*
* Another reason for creating semaphores here is that the semaphore
*************** InitProcGlobal(void)
*** 169,181 ****
/*
* Pre-create the PGPROC structures and create a semaphore for each.
*/
! procs = (PGPROC *) ShmemAlloc(MaxBackends * sizeof(PGPROC));
if (!procs)
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory")));
! MemSet(procs, 0, MaxBackends * sizeof(PGPROC));
! for (i = 0; i < MaxBackends; i++)
{
PGSemaphoreCreate(&(procs[i].sem));
procs[i].links.next = ProcGlobal->freeProcs;
--- 175,187 ----
/*
* Pre-create the PGPROC structures and create a semaphore for each.
*/
! procs = (PGPROC *) ShmemAlloc((MaxConnections) * sizeof(PGPROC));
if (!procs)
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory")));
! MemSet(procs, 0, MaxConnections * sizeof(PGPROC));
! for (i = 0; i < MaxConnections; i++)
{
PGSemaphoreCreate(&(procs[i].sem));
procs[i].links.next = ProcGlobal->freeProcs;
*************** InitProcGlobal(void)
*** 200,250 ****
void
InitProcess(void)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile PROC_HDR *procglobal = ProcGlobal;
- SHMEM_OFFSET myOffset;
int i;
! /*
! * ProcGlobal should be set up already (if we are a backend, we inherit
! * this by fork() or EXEC_BACKEND mechanism from the postmaster).
! */
! if (procglobal == NULL)
! elog(PANIC, "proc header uninitialized");
!
! if (MyProc != NULL)
! elog(ERROR, "you already exist");
!
! /*
! * Try to get a proc struct from the free list. If this fails, we must be
! * out of PGPROC structures (not to mention semaphores).
! *
! * While we are holding the ProcStructLock, also copy the current shared
! * estimate of spins_per_delay to local storage.
! */
! SpinLockAcquire(ProcStructLock);
! set_spins_per_delay(procglobal->spins_per_delay);
! myOffset = procglobal->freeProcs;
- if (myOffset != INVALID_OFFSET)
- {
- MyProc = (PGPROC *) MAKE_PTR(myOffset);
- procglobal->freeProcs = MyProc->links.next;
- SpinLockRelease(ProcStructLock);
- }
- else
- {
/*
! * If we reach here, all the PGPROCs are in use. This is one of the
! * possible places to detect "too many backends", so give the standard
! * error message.
*/
! SpinLockRelease(ProcStructLock);
! ereport(FATAL,
! (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
! errmsg("sorry, too many clients already")));
}
/*
--- 206,262 ----
void
InitProcess(void)
{
int i;
! if (IsAutoVacuumWorkerProcess())
! MyProc = AutoVacuumGetFreeProc();
! else
! {
! /* use volatile pointer to prevent code rearrangement */
! volatile PROC_HDR *procglobal = ProcGlobal;
! SHMEM_OFFSET myOffset;
! /*
! * ProcGlobal should be set up already (if we are a backend, we inherit
! * this by fork() or EXEC_BACKEND mechanism from the postmaster).
! */
! if (procglobal == NULL)
! elog(PANIC, "proc header uninitialized");
! if (MyProc != NULL)
! elog(ERROR, "you already exist");
/*
! * Try to get a proc struct from the free list. If this fails, we must be
! * out of PGPROC structures (not to mention semaphores).
! *
! * While we are holding the ProcStructLock, also copy the current shared
! * estimate of spins_per_delay to local storage.
*/
! SpinLockAcquire(ProcStructLock);
!
! set_spins_per_delay(procglobal->spins_per_delay);
!
! myOffset = procglobal->freeProcs;
!
! if (myOffset != INVALID_OFFSET)
! {
! MyProc = (PGPROC *) MAKE_PTR(myOffset);
! procglobal->freeProcs = MyProc->links.next;
! SpinLockRelease(ProcStructLock);
! }
! else
! {
! /*
! * If we reach here, all the PGPROCs are in use. This is one of the
! * possible places to detect "too many backends", so give the standard
! * error message.
! */
! SpinLockRelease(ProcStructLock);
! ereport(FATAL,
! (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
! errmsg("sorry, too many clients already")));
! }
}
/*
*************** InitProcess(void)
*** 280,286 ****
/*
* Arrange to clean up at backend exit.
*/
! on_shmem_exit(ProcKill, 0);
/*
* Now that we have a PGPROC, we could try to acquire locks, so initialize
--- 292,301 ----
/*
* Arrange to clean up at backend exit.
*/
! if (IsAutoVacuumWorkerProcess())
! on_shmem_exit(AutovacWorkerProcKill, 0);
! else
! on_shmem_exit(ProcKill, 0);
/*
* Now that we have a PGPROC, we could try to acquire locks, so initialize
Index: src/backend/utils/init/globals.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/init/globals.c,v
retrieving revision 1.100
diff -c -p -r1.100 globals.c
*** src/backend/utils/init/globals.c 5 Jan 2007 22:19:44 -0000 1.100
--- src/backend/utils/init/globals.c 10 Apr 2007 21:00:28 -0000
*************** bool allowSystemTableMods = false;
*** 95,103 ****
int work_mem = 1024;
int maintenance_work_mem = 16384;
! /* Primary determinants of sizes of shared-memory structures: */
int NBuffers = 1000;
int MaxBackends = 100;
int VacuumCostPageHit = 1; /* GUC parameters for vacuum */
int VacuumCostPageMiss = 10;
--- 95,108 ----
int work_mem = 1024;
int maintenance_work_mem = 16384;
! /*
! * Primary determinants of sizes of shared-memory structures. MaxBackends is
! * MaxConnections + autovacuum_max_workers (it is computed by the GUC assign
! * hook):
! */
int NBuffers = 1000;
int MaxBackends = 100;
+ int MaxConnections = 90;
int VacuumCostPageHit = 1; /* GUC parameters for vacuum */
int VacuumCostPageMiss = 10;
Index: src/backend/utils/misc/guc.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/misc/guc.c,v
retrieving revision 1.383
diff -c -p -r1.383 guc.c
*** src/backend/utils/misc/guc.c 19 Mar 2007 23:38:30 -0000 1.383
--- src/backend/utils/misc/guc.c 10 Apr 2007 20:51:14 -0000
*************** static bool assign_tcp_keepalives_count(
*** 161,166 ****
--- 161,168 ----
static const char *show_tcp_keepalives_idle(void);
static const char *show_tcp_keepalives_interval(void);
static const char *show_tcp_keepalives_count(void);
+ static bool assign_autovacuum_max_workers(int newval, bool doit, GucSource source);
+ static bool assign_maxconnections(int newval, bool doit, GucSource source);
/*
* GUC option variables that are exported from this module
*************** static struct config_int ConfigureNamesI
*** 1147,1162 ****
* number.
*
* MaxBackends is limited to INT_MAX/4 because some places compute
! * 4*MaxBackends without any overflow check. Likewise we have to limit
! * NBuffers to INT_MAX/2.
*/
{
{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the maximum number of concurrent connections."),
NULL
},
! &MaxBackends,
! 100, 1, INT_MAX / 4, NULL, NULL
},
{
--- 1149,1167 ----
* number.
*
* MaxBackends is limited to INT_MAX/4 because some places compute
! * 4*MaxBackends without any overflow check. This check is made on the
! * assign_maxconnections, since MaxBackends is computed as MaxConnections +
! * autovacuum_max_workers.
! *
! * Likewise we have to limit NBuffers to INT_MAX/2.
*/
{
{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the maximum number of concurrent connections."),
NULL
},
! &MaxConnections,
! 100, 1, INT_MAX / 4, assign_maxconnections, NULL
},
{
*************** static struct config_int ConfigureNamesI
*** 1620,1625 ****
--- 1625,1639 ----
&autovacuum_freeze_max_age,
200000000, 100000000, 2000000000, NULL, NULL
},
+ {
+ /* see max_connections */
+ {"autovacuum_max_workers", PGC_POSTMASTER, AUTOVACUUM,
+ gettext_noop("Sets the maximum number of simultaneously running autovacuum worker processes."),
+ NULL
+ },
+ &autovacuum_max_workers,
+ 10, 1, INT_MAX / 4, assign_autovacuum_max_workers, NULL
+ },
{
{"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER,
*************** show_tcp_keepalives_count(void)
*** 6659,6663 ****
--- 6673,6704 ----
return nbuf;
}
+ static bool
+ assign_maxconnections(int newval, bool doit, GucSource source)
+ {
+ if (doit)
+ {
+ if (newval + autovacuum_max_workers > INT_MAX / 4)
+ return false;
+
+ MaxBackends = newval + autovacuum_max_workers;
+ }
+
+ return true;
+ }
+
+ static bool
+ assign_autovacuum_max_workers(int newval, bool doit, GucSource source)
+ {
+ if (doit)
+ {
+ if (newval + MaxConnections > INT_MAX / 4)
+ return false;
+
+ MaxBackends = newval + MaxConnections;
+ }
+
+ return true;
+ }
#include "guc-file.c"
Index: src/backend/utils/misc/postgresql.conf.sample
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/misc/postgresql.conf.sample,v
retrieving revision 1.213
diff -c -p -r1.213 postgresql.conf.sample
*** src/backend/utils/misc/postgresql.conf.sample 19 Mar 2007 23:38:30 -0000 1.213
--- src/backend/utils/misc/postgresql.conf.sample 11 Apr 2007 20:42:25 -0000
***************
*** 376,381 ****
--- 376,382 ----
#autovacuum = on # enable autovacuum subprocess?
# 'on' requires stats_start_collector
# and stats_row_level to also be on
+ #autovacuum_max_workers = 10 # max # of autovacuum subprocesses
#autovacuum_naptime = 1min # time between autovacuum runs
#autovacuum_vacuum_threshold = 500 # min # of tuple updates before
# vacuum
Index: src/include/miscadmin.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/miscadmin.h,v
retrieving revision 1.193
diff -c -p -r1.193 miscadmin.h
*** src/include/miscadmin.h 1 Mar 2007 14:52:04 -0000 1.193
--- src/include/miscadmin.h 10 Apr 2007 20:51:06 -0000
*************** extern DLLIMPORT char *DataDir;
*** 129,134 ****
--- 129,135 ----
extern DLLIMPORT int NBuffers;
extern int MaxBackends;
+ extern int MaxConnections;
extern DLLIMPORT int MyProcPid;
extern DLLIMPORT struct Port *MyProcPort;
Index: src/include/postmaster/autovacuum.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/postmaster/autovacuum.h,v
retrieving revision 1.8
diff -c -p -r1.8 autovacuum.h
*** src/include/postmaster/autovacuum.h 15 Feb 2007 23:23:23 -0000 1.8
--- src/include/postmaster/autovacuum.h 11 Apr 2007 23:43:45 -0000
***************
*** 14,21 ****
--- 14,24 ----
#ifndef AUTOVACUUM_H
#define AUTOVACUUM_H
+ #include "storage/lock.h"
+
/* GUC variables */
extern bool autovacuum_start_daemon;
+ extern int autovacuum_max_workers;
extern int autovacuum_naptime;
extern int autovacuum_vac_thresh;
extern double autovacuum_vac_scale;
*************** extern bool IsAutoVacuumWorkerProcess(vo
*** 34,39 ****
--- 37,48 ----
extern void autovac_init(void);
extern int StartAutoVacLauncher(void);
extern int StartAutoVacWorker(void);
+ extern PGPROC *AutoVacuumGetFreeProc(void);
+ extern void AutovacWorkerProcKill(int code, Datum arg);
+ extern int AutoVacuumSemas(void);
+
+ /* autovacuum cost-delay balancer */
+ extern void AutoVacuumUpdateDelay(void);
#ifdef EXEC_BACKEND
extern void AutoVacLauncherMain(int argc, char *argv[]);
Index: src/include/storage/lwlock.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/storage/lwlock.h,v
retrieving revision 1.35
diff -c -p -r1.35 lwlock.h
*** src/include/storage/lwlock.h 3 Apr 2007 16:34:36 -0000 1.35
--- src/include/storage/lwlock.h 6 Apr 2007 02:20:17 -0000
*************** typedef enum LWLockId
*** 61,66 ****
--- 61,67 ----
BtreeVacuumLock,
AddinShmemInitLock,
AutovacuumLock,
+ AutovacuumScheduleLock,
/* Individual lock IDs end here */
FirstBufMappingLock,
FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
Alvaro Herrera <alvherre@commandprompt.com> wrote:
I manually merged your patch on top of my own. This is the result.
Please have a look at whether the new code is correct and behaves sanely
(I haven't tested it).Huh, you are right, it is broken, even in my outgoing mailbox -- I don't
know what happened, as the file I have on disk is complete. Here is
attached again.
I tested your patch on Linux and Windows. It works well on Linux,
where we use fork(), but falls into segfault on Windows, where we
use exec(). Maybe you forgot to initialize the shared memory stuff.
(I haven't find out where to be fixed, sorry.)
Multiworker and balancing seem to work well after they successfully start up.
Regards,
---
ITAGAKI Takahiro
NTT Open Source Software Center
ITAGAKI Takahiro wrote:
Alvaro Herrera <alvherre@commandprompt.com> wrote:
I manually merged your patch on top of my own. This is the result.
Please have a look at whether the new code is correct and behaves sanely
(I haven't tested it).Huh, you are right, it is broken, even in my outgoing mailbox -- I don't
know what happened, as the file I have on disk is complete. Here is
attached again.I tested your patch on Linux and Windows. It works well on Linux,
where we use fork(), but falls into segfault on Windows, where we
use exec(). Maybe you forgot to initialize the shared memory stuff.
(I haven't find out where to be fixed, sorry.)
Ok, thanks, this confirms that I have to try the EXEC_BACKEND code path.
Multiworker and balancing seem to work well after they successfully start up.
Great.
--
Alvaro Herrera http://www.CommandPrompt.com/
PostgreSQL Replication, Consulting, Custom Development, 24x7 support
Alvaro Herrera wrote:
ITAGAKI Takahiro wrote:
I tested your patch on Linux and Windows. It works well on Linux,
where we use fork(), but falls into segfault on Windows, where we
use exec(). Maybe you forgot to initialize the shared memory stuff.
(I haven't find out where to be fixed, sorry.)Ok, thanks, this confirms that I have to try the EXEC_BACKEND code path.
Oh, uh, the problem is that CreateSharedMemoryAndSemaphores wants to
have access to the PGPROC already, but to obtain the PGPROC we need
access to autovac shared memory (per AutoVacuumGetFreeProc). So this
wasn't too bright a choice :-(
--
Alvaro Herrera http://www.CommandPrompt.com/
The PostgreSQL Company - Command Prompt, Inc.
Alvaro Herrera wrote:
Oh, uh, the problem is that CreateSharedMemoryAndSemaphores wants to
have access to the PGPROC already, but to obtain the PGPROC we need
access to autovac shared memory (per AutoVacuumGetFreeProc). So this
wasn't too bright a choice :-(
It seems like I'll have to decouple autovacuum PGPROC's from
autovacuum's own shared memory. The most sensible way to do this seems
to be to store them in ProcGlobal, along with the regular backend's
PGPROCs. Is everyone OK with this plan?
Note that this will mean that those PGPROCs will be protected by the
same spinlock that protects the other PGPROCs. I can't think of any
reason why this would be a problem, but if you think otherwise please
speak up.
--
Alvaro Herrera http://www.CommandPrompt.com/
PostgreSQL Replication, Consulting, Custom Development, 24x7 support
Alvaro Herrera <alvherre@commandprompt.com> writes:
It seems like I'll have to decouple autovacuum PGPROC's from
autovacuum's own shared memory. The most sensible way to do this seems
to be to store them in ProcGlobal, along with the regular backend's
PGPROCs. Is everyone OK with this plan?
Note that this will mean that those PGPROCs will be protected by the
same spinlock that protects the other PGPROCs. I can't think of any
reason why this would be a problem, but if you think otherwise please
speak up.
I thought the separate pool of PGPROCs was a bit weird. If you're going
back to a common pool, I'm all for it.
regards, tom lane
Ok, here is a version which works on EXEC_BACKEND too.
The main change in this patch is that the PGPROC list has been moved
into ProcGlobal, as discussed. They are still separate from the main
PGPROCs, but they are protected by the same spinlock and thus a bit of
code dealing with that goes away. Also, there is no longer a separate
ProcKill, and the changes to InitProcess are less intrusive. The code
dealing with clearing the WorkerInfo has been moved into its own
on_shmem_exit hook.
Also, a new signal is sent to the postmaster when the worker which is
supposed to be starting has not started. This should not make much of a
difference in general, since the postmaster is not supposed to lose
signals, but it gives an extra guarantee that things are not going to
linger too long. The request to start is removed when
autovacuum_naptime seconds have elapsed anyway.
Oh, I also changed the default autovacuum_max_workers from 10 to 3.
Some other minor infelicites have been corrected, like resetting the
av_rebalance flag when the rebalancing takes place (doh); some comments
have been expanded a bit.
--
Alvaro Herrera http://www.CommandPrompt.com/
The PostgreSQL Company - Command Prompt, Inc.
Attachments:
autovacuum-multiworkers-8.patchtext/x-diff; charset=us-asciiDownload
Index: src/backend/commands/vacuum.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/vacuum.c,v
retrieving revision 1.349
diff -c -p -r1.349 vacuum.c
*** src/backend/commands/vacuum.c 14 Mar 2007 18:48:55 -0000 1.349
--- src/backend/commands/vacuum.c 11 Apr 2007 23:43:23 -0000
*************** vacuum_delay_point(void)
*** 3504,3509 ****
--- 3504,3512 ----
VacuumCostBalance = 0;
+ /* update balance values for workers */
+ AutoVacuumUpdateDelay();
+
/* Might have gotten an interrupt while sleeping */
CHECK_FOR_INTERRUPTS();
}
Index: src/backend/postmaster/autovacuum.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/postmaster/autovacuum.c,v
retrieving revision 1.40
diff -c -p -r1.40 autovacuum.c
*** src/backend/postmaster/autovacuum.c 28 Mar 2007 22:17:12 -0000 1.40
--- src/backend/postmaster/autovacuum.c 12 Apr 2007 20:16:08 -0000
***************
*** 52,57 ****
--- 52,58 ----
#include "utils/syscache.h"
+ static volatile sig_atomic_t got_SIGUSR1 = false;
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t avlauncher_shutdown_request = false;
*************** static volatile sig_atomic_t avlauncher_
*** 59,64 ****
--- 60,66 ----
* GUC parameters
*/
bool autovacuum_start_daemon = false;
+ int autovacuum_max_workers;
int autovacuum_naptime;
int autovacuum_vac_thresh;
double autovacuum_vac_scale;
*************** int autovacuum_freeze_max_age;
*** 69,75 ****
int autovacuum_vac_cost_delay;
int autovacuum_vac_cost_limit;
! /* Flag to tell if we are in the autovacuum daemon process */
static bool am_autovacuum_launcher = false;
static bool am_autovacuum_worker = false;
--- 71,77 ----
int autovacuum_vac_cost_delay;
int autovacuum_vac_cost_limit;
! /* Flags to tell if we are in an autovacuum process */
static bool am_autovacuum_launcher = false;
static bool am_autovacuum_worker = false;
*************** static int default_freeze_min_age;
*** 82,95 ****
/* Memory context for long-lived data */
static MemoryContext AutovacMemCxt;
! /* struct to keep list of candidate databases for vacuum */
! typedef struct autovac_dbase
{
! Oid ad_datid;
! char *ad_name;
! TransactionId ad_frozenxid;
! PgStat_StatDBEntry *ad_entry;
! } autovac_dbase;
/* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
typedef struct av_relation
--- 84,105 ----
/* Memory context for long-lived data */
static MemoryContext AutovacMemCxt;
! /* struct to keep track of databases in launcher */
! typedef struct avl_dbase
{
! Oid adl_datid; /* hash key -- must be first */
! TimestampTz adl_next_worker;
! int adl_score;
! } avl_dbase;
!
! /* struct to keep track of databases in worker */
! typedef struct avw_dbase
! {
! Oid adw_datid;
! char *adw_name;
! TransactionId adw_frozenxid;
! PgStat_StatDBEntry *adw_entry;
! } avw_dbase;
/* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
typedef struct av_relation
*************** typedef struct autovac_table
*** 110,123 ****
int at_vacuum_cost_limit;
} autovac_table;
typedef struct
{
! Oid process_db; /* OID of database to process */
! int worker_pid; /* PID of the worker process, if any */
} AutoVacuumShmemStruct;
static AutoVacuumShmemStruct *AutoVacuumShmem;
#ifdef EXEC_BACKEND
static pid_t avlauncher_forkexec(void);
static pid_t avworker_forkexec(void);
--- 120,189 ----
int at_vacuum_cost_limit;
} autovac_table;
+ /*-------------
+ * This struct holds information about a single worker's whereabouts. We keep
+ * an array of these in shared memory, sized according to
+ * autovacuum_max_workers.
+ *
+ * wi_links entry into free list or running list
+ * wi_dboid OID of the database this worker is supposed to work on
+ * wi_tableoid OID of the table currently being vacuumed
+ * wi_workerpid PID of the running worker, 0 if not yet started
+ * wi_launchtime Time at which this worker was launched
+ * wi_cost_* Vacuum cost-based delay parameters current in this worker
+ *
+ * All fields are protected by AutovacuumLock, except for wi_tableoid which is
+ * protected by AutovacuumScheduleLock (which is read-only for everyone except
+ * that worker itself).
+ *-------------
+ */
+ typedef struct WorkerInfoData
+ {
+ SHM_QUEUE wi_links;
+ Oid wi_dboid;
+ Oid wi_tableoid;
+ int wi_workerpid;
+ TimestampTz wi_launchtime;
+ int wi_cost_delay;
+ int wi_cost_limit;
+ int wi_cost_limit_base;
+ } WorkerInfoData;
+
+ typedef struct WorkerInfoData *WorkerInfo;
+
+ /*-------------
+ * The main autovacuum shmem struct. On shared memory we store this main
+ * struct and the array of WorkerInfo structs. This struct keeps:
+ *
+ * av_launcherpid the PID of the autovacuum launcher
+ * av_freeWorkers the WorkerInfo freelist
+ * av_runningWorkers the WorkerInfo non-free queue
+ * av_startingWorker pointer to WorkerInfo currently being started (cleared by
+ * the worker itself as soon as it's up and running)
+ * av_rebalance true when a worker determines that cost limits must be
+ * rebalanced
+ *
+ * This struct is protected by AutovacuumLock.
+ *-------------
+ */
typedef struct
{
! pid_t av_launcherpid;
! SHMEM_OFFSET av_freeWorkers;
! SHM_QUEUE av_runningWorkers;
! SHMEM_OFFSET av_startingWorker;
! bool av_rebalance;
} AutoVacuumShmemStruct;
static AutoVacuumShmemStruct *AutoVacuumShmem;
+ /* the database list in the launcher, and the context that contains it */
+ static Dllist *DatabaseList = NULL;
+ static MemoryContext DatabaseListCxt = NULL;
+
+ /* Pointer to my own WorkerInfo, valid on each worker */
+ static WorkerInfo MyWorkerInfo = NULL;
+
#ifdef EXEC_BACKEND
static pid_t avlauncher_forkexec(void);
static pid_t avworker_forkexec(void);
*************** static pid_t avworker_forkexec(void);
*** 125,133 ****
NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
! static void do_start_worker(void);
static void do_autovacuum(void);
! static List *autovac_get_database_list(void);
static void relation_check_autovac(Oid relid, Form_pg_class classForm,
Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
--- 191,206 ----
NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
! static Oid do_start_worker(void);
! static uint64 launcher_determine_sleep(bool canlaunch, bool recursing);
! static void launch_worker(TimestampTz now);
! static List *get_database_list(void);
! static void rebuild_database_list(Oid newdb);
! static int db_comparator(const void *a, const void *b);
! static void autovac_balance_cost(void);
!
static void do_autovacuum(void);
! static void FreeWorkerInfo(int code, Datum arg);
static void relation_check_autovac(Oid relid, Form_pg_class classForm,
Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
*************** static PgStat_StatTabEntry *get_pgstat_t
*** 147,152 ****
--- 220,226 ----
PgStat_StatDBEntry *dbentry);
static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
static void avl_sighup_handler(SIGNAL_ARGS);
+ static void avl_sigusr1_handler(SIGNAL_ARGS);
static void avlauncher_shutdown(SIGNAL_ARGS);
static void avl_quickdie(SIGNAL_ARGS);
*************** StartAutoVacLauncher(void)
*** 230,241 ****
/*
* Main loop for the autovacuum launcher process.
*/
NON_EXEC_STATIC void
AutoVacLauncherMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
- MemoryContext avlauncher_cxt;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
--- 304,334 ----
/*
* Main loop for the autovacuum launcher process.
+ *
+ * The signalling between launcher and worker is as follows:
+ *
+ * When the worker has finished starting up, it stores its PID in wi_workerpid
+ * and sends a SIGUSR1 signal to the launcher. The launcher then knows that
+ * the postmaster is ready to start a new worker. We do it this way because
+ * otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't
+ * yet processed the last one, in which case the second signal would be lost.
+ * This is only useful when two workers need to be started close to one
+ * another, which should be rare but it's possible.
+ *
+ * When the worker exits, it resets the WorkerInfo struct and puts it back into
+ * the free list. It must also signal the launcher. The launcher wakes up and
+ * can launch a new worker if it needs to, or it might just go back to sleep.
+ *
+ * There is a potential problem if, for some reason, a worker starts and is not
+ * able to bootstrap itself correctly. To prevent this situation from starving
+ * the whole system, the launcher checks the launch time of the "starting
+ * worker". If it's too old (older than autovacuum_naptime seconds), it resets
+ * the worker entry and puts it back into the free list.
*/
NON_EXEC_STATIC void
AutoVacLauncherMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
*************** AutoVacLauncherMain(int argc, char *argv
*** 264,272 ****
* Set up signal handlers. Since this is an auxiliary process, it has
* particular signal requirements -- no deadlock checker or sinval
* catchup, for example.
- *
- * XXX It may be a good idea to receive signals when an avworker process
- * finishes.
*/
pqsignal(SIGHUP, avl_sighup_handler);
--- 357,362 ----
*************** AutoVacLauncherMain(int argc, char *argv
*** 276,282 ****
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
! pqsignal(SIGUSR1, SIG_IGN);
/* We don't listen for async notifies */
pqsignal(SIGUSR2, SIG_IGN);
pqsignal(SIGFPE, FloatExceptionHandler);
--- 366,372 ----
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
! pqsignal(SIGUSR1, avl_sigusr1_handler);
/* We don't listen for async notifies */
pqsignal(SIGUSR2, SIG_IGN);
pqsignal(SIGFPE, FloatExceptionHandler);
*************** AutoVacLauncherMain(int argc, char *argv
*** 300,311 ****
* that we can reset the context during error recovery and thereby avoid
* possible memory leaks.
*/
! avlauncher_cxt = AllocSetContextCreate(TopMemoryContext,
! "Autovacuum Launcher",
! ALLOCSET_DEFAULT_MINSIZE,
! ALLOCSET_DEFAULT_INITSIZE,
! ALLOCSET_DEFAULT_MAXSIZE);
! MemoryContextSwitchTo(avlauncher_cxt);
/*
--- 390,401 ----
* that we can reset the context during error recovery and thereby avoid
* possible memory leaks.
*/
! AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! "Autovacuum Launcher",
! ALLOCSET_DEFAULT_MINSIZE,
! ALLOCSET_DEFAULT_INITSIZE,
! ALLOCSET_DEFAULT_MAXSIZE);
! MemoryContextSwitchTo(AutovacMemCxt);
/*
*************** AutoVacLauncherMain(int argc, char *argv
*** 336,346 ****
* Now return to normal top-level context and clear ErrorContext for
* next time.
*/
! MemoryContextSwitchTo(avlauncher_cxt);
FlushErrorState();
/* Flush any leaked data in the top-level context */
! MemoryContextResetAndDeleteChildren(avlauncher_cxt);
/* Make sure pgstat also considers our stat data as gone */
pgstat_clear_snapshot();
--- 426,436 ----
* Now return to normal top-level context and clear ErrorContext for
* next time.
*/
! MemoryContextSwitchTo(AutovacMemCxt);
FlushErrorState();
/* Flush any leaked data in the top-level context */
! MemoryContextResetAndDeleteChildren(AutovacMemCxt);
/* Make sure pgstat also considers our stat data as gone */
pgstat_clear_snapshot();
*************** AutoVacLauncherMain(int argc, char *argv
*** 361,378 ****
ereport(LOG,
(errmsg("autovacuum launcher started")));
PG_SETMASK(&UnBlockSig);
/*
! * take a nap before executing the first iteration, unless we were
! * requested an emergency run.
*/
! if (autovacuum_start_daemon)
! pg_usleep(autovacuum_naptime * 1000000L);
for (;;)
{
! int worker_pid;
/*
* Emergency bailout if postmaster has died. This is to avoid the
--- 451,482 ----
ereport(LOG,
(errmsg("autovacuum launcher started")));
+ /* must unblock signals before calling rebuild_database_list */
PG_SETMASK(&UnBlockSig);
+ /* in emergency mode, just start a worker and go away */
+ if (!autovacuum_start_daemon)
+ {
+ do_start_worker();
+ proc_exit(0); /* done */
+ }
+
+ AutoVacuumShmem->av_launcherpid = MyProcPid;
+
/*
! * Create the initial database list. The invariant we want this list to
! * keep is that it's ordered by decreasing next_time. As soon as an entry
! * is updated to a higher time, it will be moved to the front (which is
! * correct because the only operation is to add autovacuum_naptime to the
! * entry, and time always increases).
*/
! rebuild_database_list(InvalidOid);
for (;;)
{
! uint64 micros;
! bool can_launch;
! TimestampTz current_time = 0;
/*
* Emergency bailout if postmaster has died. This is to avoid the
*************** AutoVacLauncherMain(int argc, char *argv
*** 381,386 ****
--- 485,497 ----
if (!PostmasterIsAlive(true))
exit(1);
+ micros = launcher_determine_sleep(AutoVacuumShmem->av_freeWorkers !=
+ INVALID_OFFSET, false);
+
+ /* Sleep for a while according to schedule */
+ pg_usleep(micros);
+
+ /* the normal shutdown case */
if (avlauncher_shutdown_request)
break;
*************** AutoVacLauncherMain(int argc, char *argv
*** 388,469 ****
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
}
/*
! * if there's a worker already running, sleep until it
! * disappears.
*/
LWLockAcquire(AutovacuumLock, LW_SHARED);
- worker_pid = AutoVacuumShmem->worker_pid;
- LWLockRelease(AutovacuumLock);
! if (worker_pid != 0)
{
! PGPROC *proc = BackendPidGetProc(worker_pid);
! if (proc != NULL && proc->isAutovacuum)
! goto sleep;
else
{
/*
! * if the worker is not really running (or it's a process
! * that's not an autovacuum worker), remove the PID from shmem.
! * This should not happen, because either the worker exits
! * cleanly, in which case it'll remove the PID, or it dies, in
! * which case postmaster will cause a system reset cycle.
*/
! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! worker_pid = 0;
! LWLockRelease(AutovacuumLock);
}
}
! do_start_worker();
! sleep:
! /*
! * in emergency mode, exit immediately so that the postmaster can
! * request another run right away if needed.
! *
! * XXX -- maybe it would be better to handle this inside the launcher
! * itself.
! */
! if (!autovacuum_start_daemon)
! break;
! /* have pgstat read the file again next time */
! pgstat_clear_snapshot();
! /* now sleep until the next autovac iteration */
! pg_usleep(autovacuum_naptime * 1000000L);
}
/* Normal exit from the autovac launcher is here */
ereport(LOG,
(errmsg("autovacuum launcher shutting down")));
proc_exit(0); /* done */
}
/*
* do_start_worker
*
* Bare-bones procedure for starting an autovacuum worker from the launcher.
* It determines what database to work on, sets up shared memory stuff and
! * signals postmaster to start the worker.
*/
! static void
do_start_worker(void)
{
List *dblist;
! bool for_xid_wrap;
! autovac_dbase *db;
! ListCell *cell;
TransactionId xidForceLimit;
/* Get a list of databases */
! dblist = autovac_get_database_list();
/*
* Determine the oldest datfrozenxid/relfrozenxid that we will allow
--- 499,949 ----
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
+
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ autovac_balance_cost();
+ LWLockRelease(AutovacuumLock);
+ }
+
+ /* a worker started up or finished */
+ if (got_SIGUSR1)
+ {
+ got_SIGUSR1 = false;
+
+ /* rebalance cost limits, if needed */
+ if (AutoVacuumShmem->av_rebalance)
+ {
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ AutoVacuumShmem->av_rebalance = false;
+ autovac_balance_cost();
+ LWLockRelease(AutovacuumLock);
+ }
}
/*
! * There are some conditions that we need to check before trying to
! * start a launcher. First, we need to make sure that there is a
! * launcher slot available. Second, we need to make sure that no other
! * worker is still starting up.
*/
+
LWLockAcquire(AutovacuumLock, LW_SHARED);
! can_launch = (AutoVacuumShmem->av_freeWorkers != INVALID_OFFSET);
!
! if (can_launch && AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
{
! long secs;
! int usecs;
! WorkerInfo worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
!
! if (current_time == 0)
! current_time = GetCurrentTimestamp();
!
! /*
! * We can't launch another worker when another one is still
! * starting up, so just sleep for a bit more; that worker will wake
! * us up again as soon as it's ready. We will only wait
! * autovacuum_naptime seconds for this to happen however. Note
! * that failure to connect to a particular database is not a
! * problem here, because the worker removes itself from the
! * startingWorker pointer before trying to connect; only low-level
! * problems, like fork() failure, can get us here.
! */
! TimestampDifference(worker->wi_launchtime, current_time,
! &secs, &usecs);
! /* ignore microseconds, as they cannot make any difference */
! if (secs > autovacuum_naptime)
! {
! LWLockRelease(AutovacuumLock);
! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! /*
! * No other process can put a worker in starting mode, so if
! * startingWorker is still INVALID after exchanging our lock,
! * we assume it's the same one we saw above (so we don't
! * recheck the launch time).
! */
! if (AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
! {
! worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
! worker->wi_dboid = InvalidOid;
! worker->wi_tableoid = InvalidOid;
! worker->wi_workerpid = 0;
! worker->wi_launchtime = 0;
! worker->wi_links.next = AutoVacuumShmem->av_freeWorkers;
! AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(worker);
! AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
! }
! }
else
{
/*
! * maybe the postmaster neglected this start signal --
! * resend it. Note: the constraints in
! * launcher_determine_sleep keep us from delivering signals too
! * quickly (at most once every 100ms).
*/
! SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
! can_launch = false;
}
}
+ LWLockRelease(AutovacuumLock); /* either shared or exclusive */
! if (can_launch)
! {
! Dlelem *elem;
! elem = DLGetTail(DatabaseList);
! if (current_time == 0)
! current_time = GetCurrentTimestamp();
! if (elem != NULL)
! {
! avl_dbase *avdb = DLE_VAL(elem);
! long secs;
! int usecs;
!
! TimestampDifference(current_time, avdb->adl_next_worker, &secs, &usecs);
!
! /* do we have to start a worker? */
! if (secs <= 0 && usecs <= 0)
! launch_worker(current_time);
! }
! else
! {
! /*
! * Special case when the list is empty: start a worker right
! * away. This covers the initial case, when no database is in
! * pgstats (thus the list is empty). Note that the constraints
! * in launcher_determine_sleep keep us from starting workers
! * too quickly (at most once every autovacuum_naptime when the
! * list is empty).
! */
! launch_worker(current_time);
! }
! }
}
/* Normal exit from the autovac launcher is here */
ereport(LOG,
(errmsg("autovacuum launcher shutting down")));
+ AutoVacuumShmem->av_launcherpid = 0;
proc_exit(0); /* done */
}
/*
+ * Determine the time to sleep, in microseconds, based on the database list.
+ *
+ * The "canlaunch" parameter indicates whether we can start a worker right now,
+ * for example due to the workers being all busy.
+ */
+ static uint64
+ launcher_determine_sleep(bool canlaunch, bool recursing)
+ {
+ long secs;
+ int usecs;
+ Dlelem *elem;
+
+ /*
+ * We sleep until the next scheduled vacuum. We trust that when the
+ * database list was built, care was taken so that no entries have times in
+ * the past; if the first entry has too close a next_worker value, or a
+ * time in the past, we will sleep a small nominal time.
+ */
+ if (!canlaunch)
+ {
+ secs = autovacuum_naptime;
+ usecs = 0;
+ }
+ else if ((elem = DLGetTail(DatabaseList)) != NULL)
+ {
+ avl_dbase *avdb = DLE_VAL(elem);
+ TimestampTz current_time = GetCurrentTimestamp();
+ TimestampTz next_wakeup;
+
+ next_wakeup = avdb->adl_next_worker;
+ TimestampDifference(current_time, next_wakeup, &secs, &usecs);
+ }
+ else
+ {
+ /* list is empty, sleep for whole autovacuum_naptime seconds */
+ secs = autovacuum_naptime;
+ usecs = 0;
+ }
+
+ /*
+ * If the result is exactly zero, it means a database had an entry with
+ * time in the past. Rebuild the list so that the databases are evenly
+ * distributed again, and recalculate the time to sleep. This can happen
+ * if there are more tables needing vacuum than workers, and they all take
+ * longer to vacuum than autovacuum_naptime.
+ *
+ * We only recurse once. rebuild_database_list should always return times
+ * in the future, but it seems best not to trust too much on that.
+ */
+ if (secs == 0L && usecs == 0 && !recursing)
+ {
+ rebuild_database_list(InvalidOid);
+ return launcher_determine_sleep(canlaunch, true);
+ }
+
+ /* 100ms is the smallest time we'll allow the launcher to sleep */
+ if (secs <= 0L && usecs <= 100000)
+ {
+ secs = 0L;
+ usecs = 100000; /* 100 ms */
+ }
+
+ return secs * 1000000 + usecs;
+ }
+
+ /*
+ * Build an updated DatabaseList. It must only contain databases that appear
+ * in pgstats, and must be sorted by next_worker from highest to lowest,
+ * distributed regularly across the next autovacuum_naptime interval.
+ *
+ * Receives the Oid of the database that made this list be generated (we call
+ * this the "new" database, because when the database was already present on
+ * the list, we expect that this function is not called at all). The
+ * preexisting list, if any, will be used to preserve the order of the
+ * databases in the autovacuum_naptime period. The new database is put at the
+ * end of the interval. The actual values are not saved, which should not be
+ * much of a problem.
+ */
+ static void
+ rebuild_database_list(Oid newdb)
+ {
+ List *dblist;
+ ListCell *cell;
+ MemoryContext newcxt;
+ MemoryContext oldcxt;
+ MemoryContext tmpcxt;
+ HASHCTL hctl;
+ int score;
+ int nelems;
+ HTAB *dbhash;
+
+ /* use fresh stats */
+ pgstat_clear_snapshot();
+
+ newcxt = AllocSetContextCreate(AutovacMemCxt,
+ "AV dblist",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ tmpcxt = AllocSetContextCreate(newcxt,
+ "tmp AV dblist",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ oldcxt = MemoryContextSwitchTo(tmpcxt);
+
+ /*
+ * Implementing this is not as simple as it sounds, because we need to put
+ * the new database at the end of the list; next the databases that were
+ * already on the list, and finally (at the tail of the list) all the other
+ * databases that are not on the existing list.
+ *
+ * To do this, we build an empty hash table of scored databases. We will
+ * start with the lowest score (zero) for the new database, then increasing
+ * scores for the databases in the existing list, in order, and lastly
+ * increasing scores for all databases gotten via get_database_list() that
+ * are not already on the hash.
+ *
+ * Then we will put all the hash elements into an array, sort the array by
+ * score, and finally put the array elements into the new doubly linked
+ * list.
+ */
+ hctl.keysize = sizeof(Oid);
+ hctl.entrysize = sizeof(avl_dbase);
+ hctl.hash = oid_hash;
+ hctl.hcxt = tmpcxt;
+ dbhash = hash_create("db hash", 20, &hctl, /* magic number here FIXME */
+ HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+ /* start by inserting the new database */
+ score = 0;
+ if (OidIsValid(newdb))
+ {
+ avl_dbase *db;
+ PgStat_StatDBEntry *entry;
+
+ /* only consider this database if it has a pgstat entry */
+ entry = pgstat_fetch_stat_dbentry(newdb);
+ if (entry != NULL)
+ {
+ /* we assume it isn't found because the hash was just created */
+ db = hash_search(dbhash, &newdb, HASH_ENTER, NULL);
+
+ /* hash_search already filled in the key */
+ db->adl_score = score++;
+ /* next_worker is filled in later */
+ }
+ }
+
+ /* Now insert the databases from the existing list */
+ if (DatabaseList != NULL)
+ {
+ Dlelem *elem;
+
+ elem = DLGetHead(DatabaseList);
+ while (elem != NULL)
+ {
+ avl_dbase *avdb = DLE_VAL(elem);
+ avl_dbase *db;
+ bool found;
+ PgStat_StatDBEntry *entry;
+
+ elem = DLGetSucc(elem);
+
+ /*
+ * skip databases with no stat entries -- in particular, this
+ * gets rid of dropped databases
+ */
+ entry = pgstat_fetch_stat_dbentry(avdb->adl_datid);
+ if (entry == NULL)
+ continue;
+
+ db = hash_search(dbhash, &(avdb->adl_datid), HASH_ENTER, &found);
+
+ if (!found)
+ {
+ /* hash_search already filled in the key */
+ db->adl_score = score++;
+ /* next_worker is filled in later */
+ }
+ }
+ }
+
+ /* finally, insert all qualifying databases not previously inserted */
+ dblist = get_database_list();
+ foreach(cell, dblist)
+ {
+ avw_dbase *avdb = lfirst(cell);
+ avl_dbase *db;
+ bool found;
+ PgStat_StatDBEntry *entry;
+
+ /* only consider databases with a pgstat entry */
+ entry = pgstat_fetch_stat_dbentry(avdb->adw_datid);
+ if (entry == NULL)
+ continue;
+
+ db = hash_search(dbhash, &(avdb->adw_datid), HASH_ENTER, &found);
+ /* only update the score if the database was not already on the hash */
+ if (!found)
+ {
+ /* hash_search already filled in the key */
+ db->adl_score = score++;
+ /* next_worker is filled in later */
+ }
+ }
+ nelems = score;
+
+ /* from here on, the allocated memory belongs to the new list */
+ MemoryContextSwitchTo(newcxt);
+ DatabaseList = DLNewList();
+
+ if (nelems > 0)
+ {
+ TimestampTz current_time;
+ int millis_increment;
+ avl_dbase *dbary;
+ avl_dbase *db;
+ HASH_SEQ_STATUS seq;
+ int i;
+
+ /* put all the hash elements into an array */
+ dbary = palloc(nelems * sizeof(avl_dbase));
+
+ i = 0;
+ hash_seq_init(&seq, dbhash);
+ while ((db = hash_seq_search(&seq)) != NULL)
+ memcpy(&(dbary[i++]), db, sizeof(avl_dbase));
+
+ /* sort the array */
+ qsort(dbary, nelems, sizeof(avl_dbase), db_comparator);
+
+ /* this is the time interval between databases in the schedule */
+ millis_increment = 1000.0 * autovacuum_naptime / nelems;
+ current_time = GetCurrentTimestamp();
+
+ /*
+ * move the elements from the array into the dllist, setting the
+ * next_worker while walking the array
+ */
+ for (i = 0; i < nelems; i++)
+ {
+ avl_dbase *db = &(dbary[i]);
+ Dlelem *elem;
+
+ current_time = TimestampTzPlusMilliseconds(current_time,
+ millis_increment);
+ db->adl_next_worker = current_time;
+
+ elem = DLNewElem(db);
+ /* later elements should go closer to the head of the list */
+ DLAddHead(DatabaseList, elem);
+ }
+ }
+
+ /* all done, clean up memory */
+ if (DatabaseListCxt != NULL)
+ MemoryContextDelete(DatabaseListCxt);
+ MemoryContextDelete(tmpcxt);
+ DatabaseListCxt = newcxt;
+ MemoryContextSwitchTo(oldcxt);
+ }
+
+ /* qsort comparator for avl_dbase, using adl_score */
+ static int
+ db_comparator(const void *a, const void *b)
+ {
+ if (((avl_dbase *) a)->adl_score == ((avl_dbase *) b)->adl_score)
+ return 0;
+ else
+ return (((avl_dbase *) a)->adl_score < ((avl_dbase *) b)->adl_score) ? 1 : -1;
+ }
+
+ /*
* do_start_worker
*
* Bare-bones procedure for starting an autovacuum worker from the launcher.
* It determines what database to work on, sets up shared memory stuff and
! * signals postmaster to start the worker. It fails gracefully if invoked when
! * autovacuum_workers are already active.
! *
! * Return value is the OID of the database that the worker is going to process,
! * or InvalidOid if no worker was actually started.
*/
! static Oid
do_start_worker(void)
{
List *dblist;
! ListCell *cell;
TransactionId xidForceLimit;
+ bool for_xid_wrap;
+ avw_dbase *avdb;
+ TimestampTz current_time;
+ bool skipit = false;
+
+ /* return quickly when there are no free workers */
+ LWLockAcquire(AutovacuumLock, LW_SHARED);
+ if (AutoVacuumShmem->av_freeWorkers == INVALID_OFFSET)
+ {
+ LWLockRelease(AutovacuumLock);
+ return InvalidOid;
+ }
+ LWLockRelease(AutovacuumLock);
+
+ /* use fresh stats */
+ pgstat_clear_snapshot();
/* Get a list of databases */
! dblist = get_database_list();
/*
* Determine the oldest datfrozenxid/relfrozenxid that we will allow
*************** do_start_worker(void)
*** 495,515 ****
* isn't clear how to construct a metric that measures that and not cause
* starvation for less busy databases.
*/
! db = NULL;
for_xid_wrap = false;
foreach(cell, dblist)
{
! autovac_dbase *tmp = lfirst(cell);
/* Find pgstat entry if any */
! tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid);
/* Check to see if this one is at risk of wraparound */
! if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit))
{
! if (db == NULL ||
! TransactionIdPrecedes(tmp->ad_frozenxid, db->ad_frozenxid))
! db = tmp;
for_xid_wrap = true;
continue;
}
--- 975,997 ----
* isn't clear how to construct a metric that measures that and not cause
* starvation for less busy databases.
*/
! avdb = NULL;
for_xid_wrap = false;
+ current_time = GetCurrentTimestamp();
foreach(cell, dblist)
{
! avw_dbase *tmp = lfirst(cell);
! Dlelem *elem;
/* Find pgstat entry if any */
! tmp->adw_entry = pgstat_fetch_stat_dbentry(tmp->adw_datid);
/* Check to see if this one is at risk of wraparound */
! if (TransactionIdPrecedes(tmp->adw_frozenxid, xidForceLimit))
{
! if (avdb == NULL ||
! TransactionIdPrecedes(tmp->adw_frozenxid, avdb->adw_frozenxid))
! avdb = tmp;
for_xid_wrap = true;
continue;
}
*************** do_start_worker(void)
*** 520,545 ****
* Otherwise, skip a database with no pgstat entry; it means it
* hasn't seen any activity.
*/
! if (!tmp->ad_entry)
continue;
/*
* Remember the db with oldest autovac time. (If we are here,
* both tmp->entry and db->entry must be non-null.)
*/
! if (db == NULL ||
! tmp->ad_entry->last_autovac_time < db->ad_entry->last_autovac_time)
! db = tmp;
}
/* Found a database -- process it */
! if (db != NULL)
{
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! AutoVacuumShmem->process_db = db->ad_datid;
LWLockRelease(AutovacuumLock);
SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
}
}
--- 1002,1157 ----
* Otherwise, skip a database with no pgstat entry; it means it
* hasn't seen any activity.
*/
! if (!tmp->adw_entry)
! continue;
!
! /*
! * Also, skip a database that appears on the database list as having
! * been processed recently (less than autovacuum_naptime seconds ago).
! * We do this so that we don't select a database which we just
! * selected, but that pgstat hasn't gotten around to updating the last
! * autovacuum time yet.
! */
! skipit = false;
! elem = DatabaseList ? DLGetTail(DatabaseList) : NULL;
!
! while (elem != NULL)
! {
! avl_dbase *dbp = DLE_VAL(elem);
!
! if (dbp->adl_datid == tmp->adw_datid)
! {
! TimestampTz curr_plus_naptime;
! TimestampTz next = dbp->adl_next_worker;
!
! curr_plus_naptime =
! TimestampTzPlusMilliseconds(current_time,
! autovacuum_naptime * 1000);
!
! /*
! * What we want here if to skip if next_worker falls between
! * the current time and the current time plus naptime.
! */
! if (timestamp_cmp_internal(current_time, next) > 0)
! skipit = false;
! else if (timestamp_cmp_internal(next, curr_plus_naptime) > 0)
! skipit = false;
! else
! skipit = true;
!
! break;
! }
! elem = DLGetPred(elem);
! }
! if (skipit)
continue;
/*
* Remember the db with oldest autovac time. (If we are here,
* both tmp->entry and db->entry must be non-null.)
*/
! if (avdb == NULL ||
! tmp->adw_entry->last_autovac_time < avdb->adw_entry->last_autovac_time)
! avdb = tmp;
}
/* Found a database -- process it */
! if (avdb != NULL)
{
+ WorkerInfo worker;
+ SHMEM_OFFSET sworker;
+
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
!
! /*
! * Get a worker entry from the freelist. We checked above, so there
! * really should be a free slot -- complain very loudly if there isn't.
! */
! sworker = AutoVacuumShmem->av_freeWorkers;
! if (sworker == INVALID_OFFSET)
! elog(FATAL, "no free worker found");
!
! worker = (WorkerInfo) MAKE_PTR(sworker);
! AutoVacuumShmem->av_freeWorkers = worker->wi_links.next;
!
! worker->wi_dboid = avdb->adw_datid;
! worker->wi_workerpid = 0;
! worker->wi_launchtime = GetCurrentTimestamp();
!
! AutoVacuumShmem->av_startingWorker = sworker;
!
LWLockRelease(AutovacuumLock);
SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
+
+ return avdb->adw_datid;
+ }
+ else if (skipit)
+ {
+ /*
+ * If we skipped all databases on the list, rebuild it, because it
+ * probably contains a dropped database.
+ */
+ rebuild_database_list(InvalidOid);
+ }
+
+ return InvalidOid;
+ }
+
+ /*
+ * launch_worker
+ *
+ * Wrapper for starting a worker from the launcher. Besides actually starting
+ * it, update the database list to reflect the next time that another one will
+ * need to be started on the selected database. The actual database choice is
+ * left to do_start_worker.
+ *
+ * This routine is also expected to insert an entry into the database list if
+ * the selected database was previously absent from the list. It returns the
+ * new database list.
+ */
+ static void
+ launch_worker(TimestampTz now)
+ {
+ Oid dbid;
+ Dlelem *elem;
+
+ dbid = do_start_worker();
+ if (OidIsValid(dbid))
+ {
+ /*
+ * Walk the database list and update the corresponding entry. If the
+ * database is not on the list, we'll recreate the list.
+ */
+ elem = (DatabaseList == NULL) ? NULL : DLGetHead(DatabaseList);
+ while (elem != NULL)
+ {
+ avl_dbase *avdb = DLE_VAL(elem);
+
+ if (avdb->adl_datid == dbid)
+ {
+ /*
+ * add autovacuum_naptime seconds to the current time, and use
+ * that as the new "next_worker" field for this database.
+ */
+ avdb->adl_next_worker =
+ TimestampTzPlusMilliseconds(now, autovacuum_naptime * 1000);
+
+ DLMoveToFront(elem);
+ break;
+ }
+ elem = DLGetSucc(elem);
+ }
+
+ /*
+ * If the database was not present in the database list, we rebuild the
+ * list. It's possible that the database does not get into the list
+ * anyway, for example if it's a database that doesn't have a pgstat
+ * entry, but this is not a problem because we don't want to schedule
+ * workers regularly into those in any case.
+ */
+ if (elem == NULL)
+ rebuild_database_list(dbid);
}
}
*************** avl_sighup_handler(SIGNAL_ARGS)
*** 550,555 ****
--- 1162,1174 ----
got_SIGHUP = true;
}
+ /* SIGUSR1: a worker is up and running, or just finished */
+ static void
+ avl_sigusr1_handler(SIGNAL_ARGS)
+ {
+ got_SIGUSR1 = true;
+ }
+
static void
avlauncher_shutdown(SIGNAL_ARGS)
{
*************** NON_EXEC_STATIC void
*** 665,671 ****
AutoVacWorkerMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
! Oid dbid;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
--- 1284,1290 ----
AutoVacWorkerMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
! Oid dbid = InvalidOid;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 763,779 ****
SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
/*
! * Get the database Id we're going to work on, and announce our PID
! * in the shared memory area. We remove the database OID immediately
! * from the shared memory area.
*/
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! dbid = AutoVacuumShmem->process_db;
! AutoVacuumShmem->process_db = InvalidOid;
! AutoVacuumShmem->worker_pid = MyProcPid;
! LWLockRelease(AutovacuumLock);
if (OidIsValid(dbid))
{
--- 1382,1415 ----
SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
/*
! * Force statement_timeout to zero to avoid a timeout setting from
! * preventing regular maintenance from being executed.
! */
! SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
!
! /*
! * Get the info about the database we're going to work on.
*/
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ MyWorkerInfo = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
+ dbid = MyWorkerInfo->wi_dboid;
+ MyWorkerInfo->wi_workerpid = MyProcPid;
+
+ /* insert into the running list */
+ SHMQueueInsertBefore(&AutoVacuumShmem->av_runningWorkers,
+ &MyWorkerInfo->wi_links);
+ /*
+ * remove from the "starting" pointer, so that the launcher can start a new
+ * worker if required
+ */
+ AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
+ LWLockRelease(AutovacuumLock);
! on_shmem_exit(FreeWorkerInfo, 0);
! /* wake up the launcher */
! if (AutoVacuumShmem->av_launcherpid != 0)
! kill(AutoVacuumShmem->av_launcherpid, SIGUSR1);
if (OidIsValid(dbid))
{
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 803,809 ****
/* Create the memory context where cross-transaction state is stored */
AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! "Autovacuum context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
--- 1439,1445 ----
/* Create the memory context where cross-transaction state is stored */
AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! "AV worker",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 814,838 ****
}
/*
! * Now remove our PID from shared memory, so that the launcher can start
! * another worker as soon as appropriate.
*/
- LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
- AutoVacuumShmem->worker_pid = 0;
- LWLockRelease(AutovacuumLock);
/* All done, go away */
proc_exit(0);
}
/*
! * autovac_get_database_list
*
* Return a list of all databases. Note we cannot use pg_database,
* because we aren't connected; we use the flat database file.
*/
static List *
! autovac_get_database_list(void)
{
char *filename;
List *dblist = NIL;
--- 1450,1586 ----
}
/*
! * FIXME -- we need to notify the launcher when we are gone. But this
! * should be done after our PGPROC is released, in ProcKill.
*/
/* All done, go away */
proc_exit(0);
}
/*
! * Return a WorkerInfo to the free list */
! static void
! FreeWorkerInfo(int code, Datum arg)
! {
! if (MyWorkerInfo != NULL)
! {
! LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
!
! SHMQueueDelete(&MyWorkerInfo->wi_links);
! MyWorkerInfo->wi_links.next = AutoVacuumShmem->av_freeWorkers;
! MyWorkerInfo->wi_dboid = InvalidOid;
! MyWorkerInfo->wi_tableoid = InvalidOid;
! MyWorkerInfo->wi_workerpid = 0;
! MyWorkerInfo->wi_launchtime = 0;
! MyWorkerInfo->wi_cost_delay = 0;
! MyWorkerInfo->wi_cost_limit = 0;
! MyWorkerInfo->wi_cost_limit_base = 0;
! AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(MyWorkerInfo);
! /* not mine anymore */
! MyWorkerInfo = NULL;
!
! /*
! * now that we're inactive, cause a rebalancing of the surviving
! * workers
! */
! AutoVacuumShmem->av_rebalance = true;
! LWLockRelease(AutovacuumLock);
! }
! }
!
! /*
! * Update the cost-based delay parameters, so that multiple workers consume
! * each a fraction of the total available I/O.
! */
! void
! AutoVacuumUpdateDelay(void)
! {
! if (MyWorkerInfo)
! {
! VacuumCostDelay = MyWorkerInfo->wi_cost_delay;
! VacuumCostLimit = MyWorkerInfo->wi_cost_limit;
! }
! }
!
! /*
! * autovac_balance_cost
! * Recalculate the cost limit setting for each active workers.
! *
! * Caller must hold the AutovacuumLock in exclusive mode.
! */
! static void
! autovac_balance_cost(void)
! {
! WorkerInfo worker;
! int vac_cost_limit = (autovacuum_vac_cost_limit >= 0 ?
! autovacuum_vac_cost_limit : VacuumCostLimit);
! int vac_cost_delay = (autovacuum_vac_cost_delay >= 0 ?
! autovacuum_vac_cost_delay : VacuumCostDelay);
! double cost_total;
! double cost_avail;
!
! /* not set? nothing to do */
! if (vac_cost_limit <= 0 || vac_cost_delay <= 0)
! return;
!
! /* caculate the total base cost limit of active workers */
! cost_total = 0.0;
! worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
! &AutoVacuumShmem->av_runningWorkers,
! offsetof(WorkerInfoData, wi_links));
! while (worker)
! {
! if (worker->wi_workerpid != 0 &&
! worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0)
! cost_total +=
! (double) worker->wi_cost_limit_base / worker->wi_cost_delay;
!
! worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
! &worker->wi_links,
! offsetof(WorkerInfoData, wi_links));
! }
! /* there are no cost limits -- nothing to do */
! if (cost_total <= 0)
! return;
!
! /*
! * Adjust each cost limit of active workers to balance the total of
! * cost limit to autovacuum_vacuum_cost_limit.
! */
! cost_avail = (double) vac_cost_limit / vac_cost_delay;
! worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
! &AutoVacuumShmem->av_runningWorkers,
! offsetof(WorkerInfoData, wi_links));
! while (worker)
! {
! if (worker->wi_workerpid != 0 &&
! worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0)
! {
! int limit = (int)
! (cost_avail * worker->wi_cost_limit_base / cost_total);
!
! worker->wi_cost_limit = Min(limit, worker->wi_cost_limit_base);
!
! elog(DEBUG2, "autovac_balance_cost(pid=%u db=%u, rel=%u, cost_limit=%d, cost_delay=%d)",
! worker->wi_workerpid, worker->wi_dboid,
! worker->wi_tableoid, worker->wi_cost_limit, worker->wi_cost_delay);
! }
!
! worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
! &worker->wi_links,
! offsetof(WorkerInfoData, wi_links));
! }
! }
!
! /*
! * get_database_list
*
* Return a list of all databases. Note we cannot use pg_database,
* because we aren't connected; we use the flat database file.
*/
static List *
! get_database_list(void)
{
char *filename;
List *dblist = NIL;
*************** autovac_get_database_list(void)
*** 852,866 ****
while (read_pg_database_line(db_file, thisname, &db_id,
&db_tablespace, &db_frozenxid))
{
! autovac_dbase *avdb;
! avdb = (autovac_dbase *) palloc(sizeof(autovac_dbase));
! avdb->ad_datid = db_id;
! avdb->ad_name = pstrdup(thisname);
! avdb->ad_frozenxid = db_frozenxid;
/* this gets set later: */
! avdb->ad_entry = NULL;
dblist = lappend(dblist, avdb);
}
--- 1600,1614 ----
while (read_pg_database_line(db_file, thisname, &db_id,
&db_tablespace, &db_frozenxid))
{
! avw_dbase *avdb;
! avdb = (avw_dbase *) palloc(sizeof(avw_dbase));
! avdb->adw_datid = db_id;
! avdb->adw_name = pstrdup(thisname);
! avdb->adw_frozenxid = db_frozenxid;
/* this gets set later: */
! avdb->adw_entry = NULL;
dblist = lappend(dblist, avdb);
}
*************** do_autovacuum(void)
*** 1008,1019 ****
* Add to the list of tables to vacuum, the OIDs of the tables that
* correspond to the saved OIDs of toast tables needing vacuum.
*/
! foreach (cell, toast_oids)
{
Oid toastoid = lfirst_oid(cell);
ListCell *cell2;
! foreach (cell2, table_toast_list)
{
av_relation *ar = lfirst(cell2);
--- 1756,1767 ----
* Add to the list of tables to vacuum, the OIDs of the tables that
* correspond to the saved OIDs of toast tables needing vacuum.
*/
! foreach(cell, toast_oids)
{
Oid toastoid = lfirst_oid(cell);
ListCell *cell2;
! foreach(cell2, table_toast_list)
{
av_relation *ar = lfirst(cell2);
*************** do_autovacuum(void)
*** 1038,1047 ****
--- 1786,1841 ----
Oid relid = lfirst_oid(cell);
autovac_table *tab;
char *relname;
+ WorkerInfo worker;
+ bool skipit;
CHECK_FOR_INTERRUPTS();
/*
+ * hold schedule lock from here until we're sure that this table
+ * still needs vacuuming. We also need the AutovacuumLock to walk
+ * the worker array, but we'll let go of that one quickly.
+ */
+ LWLockAcquire(AutovacuumScheduleLock, LW_EXCLUSIVE);
+ LWLockAcquire(AutovacuumLock, LW_SHARED);
+
+ /*
+ * Check whether the table is being vacuumed concurrently by another
+ * worker.
+ */
+ skipit = false;
+ worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+ &AutoVacuumShmem->av_runningWorkers,
+ offsetof(WorkerInfoData, wi_links));
+ while (worker)
+ {
+ /* ignore myself */
+ if (worker == MyWorkerInfo)
+ goto next_worker;
+
+ /* ignore workers in other databases */
+ if (worker->wi_dboid != MyDatabaseId)
+ goto next_worker;
+
+ if (worker->wi_tableoid == relid)
+ {
+ skipit = true;
+ break;
+ }
+
+ next_worker:
+ worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+ &worker->wi_links,
+ offsetof(WorkerInfoData, wi_links));
+ }
+ LWLockRelease(AutovacuumLock);
+ if (skipit)
+ {
+ LWLockRelease(AutovacuumScheduleLock);
+ continue;
+ }
+
+ /*
* Check whether pgstat data still says we need to vacuum this table.
* It could have changed if something else processed the table while we
* weren't looking.
*************** do_autovacuum(void)
*** 1053,1063 ****
if (tab == NULL)
{
/* someone else vacuumed the table */
continue;
}
- /* Ok, good to go! */
! /* Set the vacuum cost parameters for this table */
VacuumCostDelay = tab->at_vacuum_cost_delay;
VacuumCostLimit = tab->at_vacuum_cost_limit;
--- 1847,1864 ----
if (tab == NULL)
{
/* someone else vacuumed the table */
+ LWLockRelease(AutovacuumScheduleLock);
continue;
}
! /*
! * Ok, good to go. Store the table in shared memory before releasing
! * the lock so that other workers don't vacuum it concurrently.
! */
! MyWorkerInfo->wi_tableoid = relid;
! LWLockRelease(AutovacuumScheduleLock);
!
! /* Set the initial vacuum cost parameters for this table */
VacuumCostDelay = tab->at_vacuum_cost_delay;
VacuumCostLimit = tab->at_vacuum_cost_limit;
*************** do_autovacuum(void)
*** 1067,1072 ****
--- 1868,1885 ----
(tab->at_doanalyze ? " ANALYZE" : ""),
relname);
+ /*
+ * Advertise my cost delay parameters for the balancing algorithm, and
+ * do a balance
+ */
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ MyWorkerInfo->wi_cost_delay = tab->at_vacuum_cost_delay;
+ MyWorkerInfo->wi_cost_limit = tab->at_vacuum_cost_limit;
+ MyWorkerInfo->wi_cost_limit_base = tab->at_vacuum_cost_limit;
+ autovac_balance_cost();
+ LWLockRelease(AutovacuumLock);
+
+ /* have at it */
autovacuum_do_vac_analyze(tab->at_relid,
tab->at_dovacuum,
tab->at_doanalyze,
*************** table_recheck_autovac(Oid relid)
*** 1211,1217 ****
PgStat_StatDBEntry *shared;
PgStat_StatDBEntry *dbentry;
! /* We need fresh pgstat data for this */
pgstat_clear_snapshot();
shared = pgstat_fetch_stat_dbentry(InvalidOid);
--- 2024,2030 ----
PgStat_StatDBEntry *shared;
PgStat_StatDBEntry *dbentry;
! /* use fresh stats */
pgstat_clear_snapshot();
shared = pgstat_fetch_stat_dbentry(InvalidOid);
*************** table_recheck_autovac(Oid relid)
*** 1219,1226 ****
/* fetch the relation's relcache entry */
classTup = SearchSysCacheCopy(RELOID,
! ObjectIdGetDatum(relid),
! 0, 0, 0);
if (!HeapTupleIsValid(classTup))
return NULL;
classForm = (Form_pg_class) GETSTRUCT(classTup);
--- 2032,2039 ----
/* fetch the relation's relcache entry */
classTup = SearchSysCacheCopy(RELOID,
! ObjectIdGetDatum(relid),
! 0, 0, 0);
if (!HeapTupleIsValid(classTup))
return NULL;
classForm = (Form_pg_class) GETSTRUCT(classTup);
*************** IsAutoVacuumWorkerProcess(void)
*** 1630,1636 ****
Size
AutoVacuumShmemSize(void)
{
! return sizeof(AutoVacuumShmemStruct);
}
/*
--- 2443,2458 ----
Size
AutoVacuumShmemSize(void)
{
! Size size;
!
! /*
! * Need the fixed struct and the array of WorkerInfoData.
! */
! size = sizeof(AutoVacuumShmemStruct);
! size = MAXALIGN(size);
! size = add_size(size, mul_size(autovacuum_max_workers,
! sizeof(WorkerInfoData)));
! return size;
}
/*
*************** AutoVacuumShmemInit(void)
*** 1650,1657 ****
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("not enough shared memory for autovacuum")));
- if (found)
- return; /* already initialized */
! MemSet(AutoVacuumShmem, 0, sizeof(AutoVacuumShmemStruct));
}
--- 2472,2500 ----
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("not enough shared memory for autovacuum")));
! if (!IsUnderPostmaster)
! {
! WorkerInfo worker;
! int i;
!
! Assert(!found);
!
! AutoVacuumShmem->av_launcherpid = 0;
! AutoVacuumShmem->av_freeWorkers = INVALID_OFFSET;
! SHMQueueInit(&AutoVacuumShmem->av_runningWorkers);
! AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
!
! worker = (WorkerInfo) ((char *) AutoVacuumShmem +
! MAXALIGN(sizeof(AutoVacuumShmemStruct)));
!
! /* initialize the WorkerInfo free list */
! for (i = 0; i < autovacuum_max_workers; i++)
! {
! worker[i].wi_links.next = AutoVacuumShmem->av_freeWorkers;
! AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(&worker[i]);
! }
! }
! else
! Assert(found);
}
Index: src/backend/storage/lmgr/proc.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/lmgr/proc.c,v
retrieving revision 1.187
diff -c -p -r1.187 proc.c
*** src/backend/storage/lmgr/proc.c 3 Apr 2007 16:34:36 -0000 1.187
--- src/backend/storage/lmgr/proc.c 12 Apr 2007 16:07:09 -0000
*************** ProcGlobalShmemSize(void)
*** 96,102 ****
size = add_size(size, sizeof(PROC_HDR));
/* AuxiliaryProcs */
size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC)));
! /* MyProcs */
size = add_size(size, mul_size(MaxBackends, sizeof(PGPROC)));
/* ProcStructLock */
size = add_size(size, sizeof(slock_t));
--- 96,102 ----
size = add_size(size, sizeof(PROC_HDR));
/* AuxiliaryProcs */
size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC)));
! /* MyProcs, including autovacuum */
size = add_size(size, mul_size(MaxBackends, sizeof(PGPROC)));
/* ProcStructLock */
size = add_size(size, sizeof(slock_t));
*************** ProcGlobalShmemSize(void)
*** 110,116 ****
int
ProcGlobalSemas(void)
{
! /* We need a sema per backend, plus one for each auxiliary process. */
return MaxBackends + NUM_AUXILIARY_PROCS;
}
--- 110,119 ----
int
ProcGlobalSemas(void)
{
! /*
! * We need a sema per backend (including autovacuum), plus one for each
! * auxiliary process.
! */
return MaxBackends + NUM_AUXILIARY_PROCS;
}
*************** ProcGlobalSemas(void)
*** 127,134 ****
* running out when trying to start another backend is a common failure.
* So, now we grab enough semaphores to support the desired max number
* of backends immediately at initialization --- if the sysadmin has set
! * MaxBackends higher than his kernel will support, he'll find out sooner
! * rather than later.
*
* Another reason for creating semaphores here is that the semaphore
* implementation typically requires us to create semaphores in the
--- 130,137 ----
* running out when trying to start another backend is a common failure.
* So, now we grab enough semaphores to support the desired max number
* of backends immediately at initialization --- if the sysadmin has set
! * MaxConnections or autovacuum_max_workers higher than his kernel will
! * support, he'll find out sooner rather than later.
*
* Another reason for creating semaphores here is that the semaphore
* implementation typically requires us to create semaphores in the
*************** InitProcGlobal(void)
*** 163,187 ****
* Initialize the data structures.
*/
ProcGlobal->freeProcs = INVALID_OFFSET;
ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY;
/*
* Pre-create the PGPROC structures and create a semaphore for each.
*/
! procs = (PGPROC *) ShmemAlloc(MaxBackends * sizeof(PGPROC));
if (!procs)
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory")));
! MemSet(procs, 0, MaxBackends * sizeof(PGPROC));
! for (i = 0; i < MaxBackends; i++)
{
PGSemaphoreCreate(&(procs[i].sem));
procs[i].links.next = ProcGlobal->freeProcs;
ProcGlobal->freeProcs = MAKE_OFFSET(&procs[i]);
}
MemSet(AuxiliaryProcs, 0, NUM_AUXILIARY_PROCS * sizeof(PGPROC));
for (i = 0; i < NUM_AUXILIARY_PROCS; i++)
{
--- 166,204 ----
* Initialize the data structures.
*/
ProcGlobal->freeProcs = INVALID_OFFSET;
+ ProcGlobal->autovacFreeProcs = INVALID_OFFSET;
ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY;
/*
* Pre-create the PGPROC structures and create a semaphore for each.
*/
! procs = (PGPROC *) ShmemAlloc((MaxConnections) * sizeof(PGPROC));
if (!procs)
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory")));
! MemSet(procs, 0, MaxConnections * sizeof(PGPROC));
! for (i = 0; i < MaxConnections; i++)
{
PGSemaphoreCreate(&(procs[i].sem));
procs[i].links.next = ProcGlobal->freeProcs;
ProcGlobal->freeProcs = MAKE_OFFSET(&procs[i]);
}
+ procs = (PGPROC *) ShmemAlloc((autovacuum_max_workers) * sizeof(PGPROC));
+ if (!procs)
+ ereport(FATAL,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory")));
+ MemSet(procs, 0, autovacuum_max_workers * sizeof(PGPROC));
+ for (i = 0; i < autovacuum_max_workers; i++)
+ {
+ PGSemaphoreCreate(&(procs[i].sem));
+ procs[i].links.next = ProcGlobal->autovacFreeProcs;
+ ProcGlobal->autovacFreeProcs = MAKE_OFFSET(&procs[i]);
+ }
+
MemSet(AuxiliaryProcs, 0, NUM_AUXILIARY_PROCS * sizeof(PGPROC));
for (i = 0; i < NUM_AUXILIARY_PROCS; i++)
{
*************** InitProcGlobal(void)
*** 200,209 ****
void
InitProcess(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile PROC_HDR *procglobal = ProcGlobal;
SHMEM_OFFSET myOffset;
- int i;
/*
* ProcGlobal should be set up already (if we are a backend, we inherit
--- 217,227 ----
void
InitProcess(void)
{
+ int i;
+
/* use volatile pointer to prevent code rearrangement */
volatile PROC_HDR *procglobal = ProcGlobal;
SHMEM_OFFSET myOffset;
/*
* ProcGlobal should be set up already (if we are a backend, we inherit
*************** InitProcess(void)
*** 226,237 ****
set_spins_per_delay(procglobal->spins_per_delay);
! myOffset = procglobal->freeProcs;
if (myOffset != INVALID_OFFSET)
{
MyProc = (PGPROC *) MAKE_PTR(myOffset);
! procglobal->freeProcs = MyProc->links.next;
SpinLockRelease(ProcStructLock);
}
else
--- 244,261 ----
set_spins_per_delay(procglobal->spins_per_delay);
! if (IsAutoVacuumWorkerProcess())
! myOffset = procglobal->autovacFreeProcs;
! else
! myOffset = procglobal->freeProcs;
if (myOffset != INVALID_OFFSET)
{
MyProc = (PGPROC *) MAKE_PTR(myOffset);
! if (IsAutoVacuumWorkerProcess())
! procglobal->autovacFreeProcs = MyProc->links.next;
! else
! procglobal->freeProcs = MyProc->links.next;
SpinLockRelease(ProcStructLock);
}
else
*************** InitProcess(void)
*** 239,245 ****
/*
* If we reach here, all the PGPROCs are in use. This is one of the
* possible places to detect "too many backends", so give the standard
! * error message.
*/
SpinLockRelease(ProcStructLock);
ereport(FATAL,
--- 263,270 ----
/*
* If we reach here, all the PGPROCs are in use. This is one of the
* possible places to detect "too many backends", so give the standard
! * error message. XXX do we need to give a different failure message
! * in the autovacuum case?
*/
SpinLockRelease(ProcStructLock);
ereport(FATAL,
*************** ProcKill(int code, Datum arg)
*** 571,578 ****
SpinLockAcquire(ProcStructLock);
/* Return PGPROC structure (and semaphore) to freelist */
! MyProc->links.next = procglobal->freeProcs;
! procglobal->freeProcs = MAKE_OFFSET(MyProc);
/* PGPROC struct isn't mine anymore */
MyProc = NULL;
--- 596,611 ----
SpinLockAcquire(ProcStructLock);
/* Return PGPROC structure (and semaphore) to freelist */
! if (IsAutoVacuumWorkerProcess())
! {
! MyProc->links.next = procglobal->autovacFreeProcs;
! procglobal->autovacFreeProcs = MAKE_OFFSET(MyProc);
! }
! else
! {
! MyProc->links.next = procglobal->freeProcs;
! procglobal->freeProcs = MAKE_OFFSET(MyProc);
! }
/* PGPROC struct isn't mine anymore */
MyProc = NULL;
*************** ProcKill(int code, Datum arg)
*** 581,586 ****
--- 614,621 ----
procglobal->spins_per_delay = update_spins_per_delay(procglobal->spins_per_delay);
SpinLockRelease(ProcStructLock);
+
+ /* XXX need to notify the autovacuum launcher, in case this was a worker */
}
/*
Index: src/backend/utils/init/globals.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/init/globals.c,v
retrieving revision 1.100
diff -c -p -r1.100 globals.c
*** src/backend/utils/init/globals.c 5 Jan 2007 22:19:44 -0000 1.100
--- src/backend/utils/init/globals.c 10 Apr 2007 21:00:28 -0000
*************** bool allowSystemTableMods = false;
*** 95,103 ****
int work_mem = 1024;
int maintenance_work_mem = 16384;
! /* Primary determinants of sizes of shared-memory structures: */
int NBuffers = 1000;
int MaxBackends = 100;
int VacuumCostPageHit = 1; /* GUC parameters for vacuum */
int VacuumCostPageMiss = 10;
--- 95,108 ----
int work_mem = 1024;
int maintenance_work_mem = 16384;
! /*
! * Primary determinants of sizes of shared-memory structures. MaxBackends is
! * MaxConnections + autovacuum_max_workers (it is computed by the GUC assign
! * hook):
! */
int NBuffers = 1000;
int MaxBackends = 100;
+ int MaxConnections = 90;
int VacuumCostPageHit = 1; /* GUC parameters for vacuum */
int VacuumCostPageMiss = 10;
Index: src/backend/utils/misc/guc.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/misc/guc.c,v
retrieving revision 1.384
diff -c -p -r1.384 guc.c
*** src/backend/utils/misc/guc.c 12 Apr 2007 06:53:47 -0000 1.384
--- src/backend/utils/misc/guc.c 12 Apr 2007 16:22:33 -0000
*************** static bool assign_tcp_keepalives_count(
*** 163,168 ****
--- 163,170 ----
static const char *show_tcp_keepalives_idle(void);
static const char *show_tcp_keepalives_interval(void);
static const char *show_tcp_keepalives_count(void);
+ static bool assign_autovacuum_max_workers(int newval, bool doit, GucSource source);
+ static bool assign_maxconnections(int newval, bool doit, GucSource source);
/*
* GUC option variables that are exported from this module
*************** static struct config_int ConfigureNamesI
*** 1149,1164 ****
* number.
*
* MaxBackends is limited to INT_MAX/4 because some places compute
! * 4*MaxBackends without any overflow check. Likewise we have to limit
! * NBuffers to INT_MAX/2.
*/
{
{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the maximum number of concurrent connections."),
NULL
},
! &MaxBackends,
! 100, 1, INT_MAX / 4, NULL, NULL
},
{
--- 1151,1169 ----
* number.
*
* MaxBackends is limited to INT_MAX/4 because some places compute
! * 4*MaxBackends without any overflow check. This check is made on
! * assign_maxconnections, since MaxBackends is computed as MaxConnections +
! * autovacuum_max_workers.
! *
! * Likewise we have to limit NBuffers to INT_MAX/2.
*/
{
{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the maximum number of concurrent connections."),
NULL
},
! &MaxConnections,
! 100, 1, INT_MAX / 4, assign_maxconnections, NULL
},
{
*************** static struct config_int ConfigureNamesI
*** 1622,1627 ****
--- 1627,1641 ----
&autovacuum_freeze_max_age,
200000000, 100000000, 2000000000, NULL, NULL
},
+ {
+ /* see max_connections */
+ {"autovacuum_max_workers", PGC_POSTMASTER, AUTOVACUUM,
+ gettext_noop("Sets the maximum number of simultaneously running autovacuum worker processes."),
+ NULL
+ },
+ &autovacuum_max_workers,
+ 3, 1, INT_MAX / 4, assign_autovacuum_max_workers, NULL
+ },
{
{"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER,
*************** show_tcp_keepalives_count(void)
*** 6692,6696 ****
--- 6706,6737 ----
return nbuf;
}
+ static bool
+ assign_maxconnections(int newval, bool doit, GucSource source)
+ {
+ if (doit)
+ {
+ if (newval + autovacuum_max_workers > INT_MAX / 4)
+ return false;
+
+ MaxBackends = newval + autovacuum_max_workers;
+ }
+
+ return true;
+ }
+
+ static bool
+ assign_autovacuum_max_workers(int newval, bool doit, GucSource source)
+ {
+ if (doit)
+ {
+ if (newval + MaxConnections > INT_MAX / 4)
+ return false;
+
+ MaxBackends = newval + MaxConnections;
+ }
+
+ return true;
+ }
#include "guc-file.c"
Index: src/backend/utils/misc/postgresql.conf.sample
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/misc/postgresql.conf.sample,v
retrieving revision 1.213
diff -c -p -r1.213 postgresql.conf.sample
*** src/backend/utils/misc/postgresql.conf.sample 19 Mar 2007 23:38:30 -0000 1.213
--- src/backend/utils/misc/postgresql.conf.sample 12 Apr 2007 16:19:11 -0000
***************
*** 376,381 ****
--- 376,382 ----
#autovacuum = on # enable autovacuum subprocess?
# 'on' requires stats_start_collector
# and stats_row_level to also be on
+ #autovacuum_max_workers = 3 # max # of autovacuum subprocesses
#autovacuum_naptime = 1min # time between autovacuum runs
#autovacuum_vacuum_threshold = 500 # min # of tuple updates before
# vacuum
Index: src/include/miscadmin.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/miscadmin.h,v
retrieving revision 1.193
diff -c -p -r1.193 miscadmin.h
*** src/include/miscadmin.h 1 Mar 2007 14:52:04 -0000 1.193
--- src/include/miscadmin.h 10 Apr 2007 20:51:06 -0000
*************** extern DLLIMPORT char *DataDir;
*** 129,134 ****
--- 129,135 ----
extern DLLIMPORT int NBuffers;
extern int MaxBackends;
+ extern int MaxConnections;
extern DLLIMPORT int MyProcPid;
extern DLLIMPORT struct Port *MyProcPort;
Index: src/include/postmaster/autovacuum.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/postmaster/autovacuum.h,v
retrieving revision 1.8
diff -c -p -r1.8 autovacuum.h
*** src/include/postmaster/autovacuum.h 15 Feb 2007 23:23:23 -0000 1.8
--- src/include/postmaster/autovacuum.h 12 Apr 2007 16:02:58 -0000
***************
*** 14,21 ****
--- 14,24 ----
#ifndef AUTOVACUUM_H
#define AUTOVACUUM_H
+ #include "storage/lock.h"
+
/* GUC variables */
extern bool autovacuum_start_daemon;
+ extern int autovacuum_max_workers;
extern int autovacuum_naptime;
extern int autovacuum_vac_thresh;
extern double autovacuum_vac_scale;
*************** extern void autovac_init(void);
*** 35,40 ****
--- 38,46 ----
extern int StartAutoVacLauncher(void);
extern int StartAutoVacWorker(void);
+ /* autovacuum cost-delay balancer */
+ extern void AutoVacuumUpdateDelay(void);
+
#ifdef EXEC_BACKEND
extern void AutoVacLauncherMain(int argc, char *argv[]);
extern void AutoVacWorkerMain(int argc, char *argv[]);
Index: src/include/storage/lwlock.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/storage/lwlock.h,v
retrieving revision 1.35
diff -c -p -r1.35 lwlock.h
*** src/include/storage/lwlock.h 3 Apr 2007 16:34:36 -0000 1.35
--- src/include/storage/lwlock.h 6 Apr 2007 02:20:17 -0000
*************** typedef enum LWLockId
*** 61,66 ****
--- 61,67 ----
BtreeVacuumLock,
AddinShmemInitLock,
AutovacuumLock,
+ AutovacuumScheduleLock,
/* Individual lock IDs end here */
FirstBufMappingLock,
FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
Index: src/include/storage/proc.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/storage/proc.h,v
retrieving revision 1.97
diff -c -p -r1.97 proc.h
*** src/include/storage/proc.h 3 Apr 2007 16:34:36 -0000 1.97
--- src/include/storage/proc.h 12 Apr 2007 15:19:34 -0000
*************** typedef struct PROC_HDR
*** 115,120 ****
--- 115,122 ----
{
/* Head of list of free PGPROC structures */
SHMEM_OFFSET freeProcs;
+ /* Head of list of autovacuum's free PGPROC structures */
+ SHMEM_OFFSET autovacFreeProcs;
/* Current shared estimate of appropriate spins_per_delay value */
int spins_per_delay;
} PROC_HDR;