Index: src/backend/commands/vacuum.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/vacuum.c,v
retrieving revision 1.349
diff -c -p -r1.349 vacuum.c
*** src/backend/commands/vacuum.c	14 Mar 2007 18:48:55 -0000	1.349
--- src/backend/commands/vacuum.c	11 Apr 2007 23:43:23 -0000
*************** vacuum_delay_point(void)
*** 3504,3509 ****
--- 3504,3512 ----
  
  		VacuumCostBalance = 0;
  
+ 		/* update balance values for workers */
+ 		AutoVacuumUpdateDelay();
+ 
  		/* Might have gotten an interrupt while sleeping */
  		CHECK_FOR_INTERRUPTS();
  	}
Index: src/backend/postmaster/autovacuum.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/postmaster/autovacuum.c,v
retrieving revision 1.40
diff -c -p -r1.40 autovacuum.c
*** src/backend/postmaster/autovacuum.c	28 Mar 2007 22:17:12 -0000	1.40
--- src/backend/postmaster/autovacuum.c	12 Apr 2007 20:16:08 -0000
***************
*** 52,57 ****
--- 52,58 ----
  #include "utils/syscache.h"
  
  
+ static volatile sig_atomic_t got_SIGUSR1 = false;
  static volatile sig_atomic_t got_SIGHUP = false;
  static volatile sig_atomic_t avlauncher_shutdown_request = false;
  
*************** static volatile sig_atomic_t avlauncher_
*** 59,64 ****
--- 60,66 ----
   * GUC parameters
   */
  bool		autovacuum_start_daemon = false;
+ int			autovacuum_max_workers;
  int			autovacuum_naptime;
  int			autovacuum_vac_thresh;
  double		autovacuum_vac_scale;
*************** int			autovacuum_freeze_max_age;
*** 69,75 ****
  int			autovacuum_vac_cost_delay;
  int			autovacuum_vac_cost_limit;
  
! /* Flag to tell if we are in the autovacuum daemon process */
  static bool am_autovacuum_launcher = false;
  static bool am_autovacuum_worker = false;
  
--- 71,77 ----
  int			autovacuum_vac_cost_delay;
  int			autovacuum_vac_cost_limit;
  
! /* Flags to tell if we are in an autovacuum process */
  static bool am_autovacuum_launcher = false;
  static bool am_autovacuum_worker = false;
  
*************** static int	default_freeze_min_age;
*** 82,95 ****
  /* Memory context for long-lived data */
  static MemoryContext AutovacMemCxt;
  
! /* struct to keep list of candidate databases for vacuum */
! typedef struct autovac_dbase
  {
! 	Oid			ad_datid;
! 	char	   *ad_name;
! 	TransactionId ad_frozenxid;
! 	PgStat_StatDBEntry *ad_entry;
! } autovac_dbase;
  
  /* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
  typedef struct av_relation
--- 84,105 ----
  /* Memory context for long-lived data */
  static MemoryContext AutovacMemCxt;
  
! /* struct to keep track of databases in launcher */
! typedef struct avl_dbase
  {
! 	Oid			adl_datid;			/* hash key -- must be first */
! 	TimestampTz	adl_next_worker;
! 	int			adl_score;
! } avl_dbase;
! 
! /* struct to keep track of databases in worker */
! typedef struct avw_dbase
! {
! 	Oid			adw_datid;
! 	char	   *adw_name;
! 	TransactionId adw_frozenxid;
! 	PgStat_StatDBEntry *adw_entry;
! } avw_dbase;
  
  /* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
  typedef struct av_relation
*************** typedef struct autovac_table
*** 110,123 ****
  	int			at_vacuum_cost_limit;
  } autovac_table;
  
  typedef struct
  {
! 	Oid		process_db;			/* OID of database to process */
! 	int		worker_pid;			/* PID of the worker process, if any */
  } AutoVacuumShmemStruct;
  
  static AutoVacuumShmemStruct *AutoVacuumShmem;
  
  #ifdef EXEC_BACKEND
  static pid_t avlauncher_forkexec(void);
  static pid_t avworker_forkexec(void);
--- 120,189 ----
  	int			at_vacuum_cost_limit;
  } autovac_table;
  
+ /*-------------
+  * This struct holds information about a single worker's whereabouts.  We keep
+  * an array of these in shared memory, sized according to
+  * autovacuum_max_workers.
+  *
+  * wi_links		entry into free list or running list
+  * wi_dboid		OID of the database this worker is supposed to work on
+  * wi_tableoid	OID of the table currently being vacuumed
+  * wi_workerpid	PID of the running worker, 0 if not yet started
+  * wi_launchtime Time at which this worker was launched
+  * wi_cost_*	Vacuum cost-based delay parameters current in this worker
+  *
+  * All fields are protected by AutovacuumLock, except for wi_tableoid which is
+  * protected by AutovacuumScheduleLock (which is read-only for everyone except
+  * that worker itself).
+  *-------------
+  */
+ typedef struct WorkerInfoData
+ {
+ 	SHM_QUEUE	wi_links;
+ 	Oid			wi_dboid;
+ 	Oid			wi_tableoid;
+ 	int			wi_workerpid;
+ 	TimestampTz	wi_launchtime;
+ 	int			wi_cost_delay;
+ 	int			wi_cost_limit;
+ 	int			wi_cost_limit_base;
+ } WorkerInfoData;
+ 
+ typedef struct WorkerInfoData *WorkerInfo;
+ 
+ /*-------------
+  * The main autovacuum shmem struct.  In shared memory we store this main
+  * struct and the array of WorkerInfo structs.  This struct keeps:
+  *
+  * av_launcherpid	the PID of the autovacuum launcher
+  * av_freeWorkers	the WorkerInfo freelist
+  * av_runningWorkers the WorkerInfo non-free queue
+  * av_startingWorker pointer to WorkerInfo currently being started (cleared by
+  *					the worker itself as soon as it's up and running)
+  * av_rebalance		true when a worker determines that cost limits must be
+  * 					rebalanced
+  *
+  * This struct is protected by AutovacuumLock.
+  *-------------
+  */
  typedef struct
  {
! 	pid_t			av_launcherpid;
! 	SHMEM_OFFSET	av_freeWorkers;
! 	SHM_QUEUE		av_runningWorkers;
! 	SHMEM_OFFSET	av_startingWorker;
! 	bool			av_rebalance;
  } AutoVacuumShmemStruct;
  
  static AutoVacuumShmemStruct *AutoVacuumShmem;
  
+ /* the database list in the launcher, and the context that contains it */
+ static Dllist *DatabaseList = NULL;
+ static MemoryContext DatabaseListCxt = NULL;
+ 
+ /* Pointer to my own WorkerInfo, valid on each worker */
+ static WorkerInfo	MyWorkerInfo = NULL;
+ 
  #ifdef EXEC_BACKEND
  static pid_t avlauncher_forkexec(void);
  static pid_t avworker_forkexec(void);
*************** static pid_t avworker_forkexec(void);
*** 125,133 ****
  NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
  NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
  
! static void do_start_worker(void);
  static void do_autovacuum(void);
! static List *autovac_get_database_list(void);
  
  static void relation_check_autovac(Oid relid, Form_pg_class classForm,
  					   Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
--- 191,206 ----
  NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
  NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
  
! static Oid do_start_worker(void);
! static uint64 launcher_determine_sleep(bool canlaunch, bool recursing);
! static void launch_worker(TimestampTz now);
! static List *get_database_list(void);
! static void rebuild_database_list(Oid newdb);
! static int db_comparator(const void *a, const void *b);
! static void autovac_balance_cost(void);
! 
  static void do_autovacuum(void);
! static void FreeWorkerInfo(int code, Datum arg);
  
  static void relation_check_autovac(Oid relid, Form_pg_class classForm,
  					   Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
*************** static PgStat_StatTabEntry *get_pgstat_t
*** 147,152 ****
--- 220,226 ----
  						  PgStat_StatDBEntry *dbentry);
  static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
  static void avl_sighup_handler(SIGNAL_ARGS);
+ static void avl_sigusr1_handler(SIGNAL_ARGS);
  static void avlauncher_shutdown(SIGNAL_ARGS);
  static void avl_quickdie(SIGNAL_ARGS);
  
*************** StartAutoVacLauncher(void)
*** 230,241 ****
  
  /*
   * Main loop for the autovacuum launcher process.
   */
  NON_EXEC_STATIC void
  AutoVacLauncherMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
- 	MemoryContext	avlauncher_cxt;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
--- 304,334 ----
  
  /*
   * Main loop for the autovacuum launcher process.
+  *
+  * The signalling between launcher and worker is as follows:
+  *
+  * When the worker has finished starting up, it stores its PID in wi_workerpid
+  * and sends a SIGUSR1 signal to the launcher.  The launcher then knows that
+  * the postmaster is ready to start a new worker.  We do it this way because
+  * otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't
+  * yet processed the last one, in which case the second signal would be lost.
+  * This is only useful when two workers need to be started close to one
+  * another, which should be rare but it's possible.
+  *
+  * When the worker exits, it resets the WorkerInfo struct and puts it back into
+  * the free list.  It must also signal the launcher.  The launcher wakes up and
+  * can launch a new worker if it needs to, or it might just go back to sleep.
+  *
+  * There is a potential problem if, for some reason, a worker starts and is not
+  * able to bootstrap itself correctly.  To prevent this situation from starving
+  * the whole system, the launcher checks the launch time of the "starting
+  * worker".  If it's too old (older than autovacuum_naptime seconds), it resets
+  * the worker entry and puts it back into the free list.
   */
  NON_EXEC_STATIC void
  AutoVacLauncherMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
*************** AutoVacLauncherMain(int argc, char *argv
*** 264,272 ****
  	 * Set up signal handlers.	Since this is an auxiliary process, it has
  	 * particular signal requirements -- no deadlock checker or sinval
  	 * catchup, for example.
- 	 *
- 	 * XXX It may be a good idea to receive signals when an avworker process
- 	 * finishes.
  	 */
  	pqsignal(SIGHUP, avl_sighup_handler);
  
--- 357,362 ----
*************** AutoVacLauncherMain(int argc, char *argv
*** 276,282 ****
  	pqsignal(SIGALRM, SIG_IGN);
  
  	pqsignal(SIGPIPE, SIG_IGN);
! 	pqsignal(SIGUSR1, SIG_IGN);
  	/* We don't listen for async notifies */
  	pqsignal(SIGUSR2, SIG_IGN);
  	pqsignal(SIGFPE, FloatExceptionHandler);
--- 366,372 ----
  	pqsignal(SIGALRM, SIG_IGN);
  
  	pqsignal(SIGPIPE, SIG_IGN);
! 	pqsignal(SIGUSR1, avl_sigusr1_handler);
  	/* We don't listen for async notifies */
  	pqsignal(SIGUSR2, SIG_IGN);
  	pqsignal(SIGFPE, FloatExceptionHandler);
*************** AutoVacLauncherMain(int argc, char *argv
*** 300,311 ****
  	 * that we can reset the context during error recovery and thereby avoid
  	 * possible memory leaks.
  	 */
! 	avlauncher_cxt = AllocSetContextCreate(TopMemoryContext,
! 										   "Autovacuum Launcher",
! 										   ALLOCSET_DEFAULT_MINSIZE,
! 										   ALLOCSET_DEFAULT_INITSIZE,
! 										   ALLOCSET_DEFAULT_MAXSIZE);
! 	MemoryContextSwitchTo(avlauncher_cxt);
  
  
  	/*
--- 390,401 ----
  	 * that we can reset the context during error recovery and thereby avoid
  	 * possible memory leaks.
  	 */
! 	AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! 										  "Autovacuum Launcher",
! 										  ALLOCSET_DEFAULT_MINSIZE,
! 										  ALLOCSET_DEFAULT_INITSIZE,
! 										  ALLOCSET_DEFAULT_MAXSIZE);
! 	MemoryContextSwitchTo(AutovacMemCxt);
  
  
  	/*
*************** AutoVacLauncherMain(int argc, char *argv
*** 336,346 ****
  		 * Now return to normal top-level context and clear ErrorContext for
  		 * next time.
  		 */
! 		MemoryContextSwitchTo(avlauncher_cxt);
  		FlushErrorState();
  
  		/* Flush any leaked data in the top-level context */
! 		MemoryContextResetAndDeleteChildren(avlauncher_cxt);
  
  		/* Make sure pgstat also considers our stat data as gone */
  		pgstat_clear_snapshot();
--- 426,436 ----
  		 * Now return to normal top-level context and clear ErrorContext for
  		 * next time.
  		 */
! 		MemoryContextSwitchTo(AutovacMemCxt);
  		FlushErrorState();
  
  		/* Flush any leaked data in the top-level context */
! 		MemoryContextResetAndDeleteChildren(AutovacMemCxt);
  
  		/* Make sure pgstat also considers our stat data as gone */
  		pgstat_clear_snapshot();
*************** AutoVacLauncherMain(int argc, char *argv
*** 361,378 ****
  	ereport(LOG,
  			(errmsg("autovacuum launcher started")));
  
  	PG_SETMASK(&UnBlockSig);
  
  	/*
! 	 * take a nap before executing the first iteration, unless we were
! 	 * requested an emergency run.
  	 */
! 	if (autovacuum_start_daemon)
! 		pg_usleep(autovacuum_naptime * 1000000L); 
  
  	for (;;)
  	{
! 		int		worker_pid;
  
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
--- 451,482 ----
  	ereport(LOG,
  			(errmsg("autovacuum launcher started")));
  
+ 	/* must unblock signals before calling rebuild_database_list */
  	PG_SETMASK(&UnBlockSig);
  
+ 	/* in emergency mode, just start a worker and go away */
+ 	if (!autovacuum_start_daemon)
+ 	{
+ 		do_start_worker();
+ 		proc_exit(0);		/* done */
+ 	}
+ 
+ 	AutoVacuumShmem->av_launcherpid = MyProcPid;
+ 
  	/*
! 	 * Create the initial database list.  The invariant we want this list to
! 	 * keep is that it's ordered by decreasing next_time.  As soon as an entry
! 	 * is updated to a higher time, it will be moved to the front (which is
! 	 * correct because the only operation is to add autovacuum_naptime to the
! 	 * entry, and time always increases).
  	 */
! 	rebuild_database_list(InvalidOid);
  
  	for (;;)
  	{
! 		uint64		micros;
! 		bool	can_launch;
! 		TimestampTz current_time = 0;
  
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
*************** AutoVacLauncherMain(int argc, char *argv
*** 381,386 ****
--- 485,497 ----
  		if (!PostmasterIsAlive(true))
  			exit(1);
  
+ 		micros = launcher_determine_sleep(AutoVacuumShmem->av_freeWorkers !=
+ 										  INVALID_OFFSET, false);
+ 
+ 		/* Sleep for a while according to schedule */
+ 		pg_usleep(micros);
+ 
+ 		/* the normal shutdown case */
  		if (avlauncher_shutdown_request)
  			break;
  
*************** AutoVacLauncherMain(int argc, char *argv
*** 388,469 ****
  		{
  			got_SIGHUP = false;
  			ProcessConfigFile(PGC_SIGHUP);
  		}
  
  		/*
! 		 * if there's a worker already running, sleep until it
! 		 * disappears.
  		 */
  		LWLockAcquire(AutovacuumLock, LW_SHARED);
- 		worker_pid = AutoVacuumShmem->worker_pid;
- 		LWLockRelease(AutovacuumLock);
  
! 		if (worker_pid != 0)
  		{
! 			PGPROC *proc = BackendPidGetProc(worker_pid);
  
! 			if (proc != NULL && proc->isAutovacuum)
! 				goto sleep;
  			else
  			{
  				/*
! 				 * if the worker is not really running (or it's a process
! 				 * that's not an autovacuum worker), remove the PID from shmem.
! 				 * This should not happen, because either the worker exits
! 				 * cleanly, in which case it'll remove the PID, or it dies, in
! 				 * which case postmaster will cause a system reset cycle.
  				 */
! 				LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 				worker_pid = 0;
! 				LWLockRelease(AutovacuumLock);
  			}
  		}
  
! 		do_start_worker();
  
! sleep:
! 		/*
! 		 * in emergency mode, exit immediately so that the postmaster can
! 		 * request another run right away if needed.
! 		 *
! 		 * XXX -- maybe it would be better to handle this inside the launcher
! 		 * itself.
! 		 */
! 		if (!autovacuum_start_daemon)
! 			break;
  
! 		/* have pgstat read the file again next time */
! 		pgstat_clear_snapshot();
  
! 		/* now sleep until the next autovac iteration */
! 		pg_usleep(autovacuum_naptime * 1000000L); 
  	}
  
  	/* Normal exit from the autovac launcher is here */
  	ereport(LOG,
  			(errmsg("autovacuum launcher shutting down")));
  
  	proc_exit(0);		/* done */
  }
  
  /*
   * do_start_worker
   *
   * Bare-bones procedure for starting an autovacuum worker from the launcher.
   * It determines what database to work on, sets up shared memory stuff and
!  * signals postmaster to start the worker.
   */
! static void
  do_start_worker(void)
  {
  	List	   *dblist;
! 	bool		for_xid_wrap;
! 	autovac_dbase *db;
! 	ListCell *cell;
  	TransactionId xidForceLimit;
  
  	/* Get a list of databases */
! 	dblist = autovac_get_database_list();
  
  	/*
  	 * Determine the oldest datfrozenxid/relfrozenxid that we will allow
--- 499,949 ----
  		{
  			got_SIGHUP = false;
  			ProcessConfigFile(PGC_SIGHUP);
+ 
+ 			LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ 			autovac_balance_cost();
+ 			LWLockRelease(AutovacuumLock);
+ 		}
+ 
+ 		/* a worker started up or finished */
+ 		if (got_SIGUSR1)
+ 		{
+ 			got_SIGUSR1 = false;
+ 
+ 			/* rebalance cost limits, if needed */
+ 			if (AutoVacuumShmem->av_rebalance)
+ 			{
+ 				LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ 				AutoVacuumShmem->av_rebalance = false;
+ 				autovac_balance_cost();
+ 				LWLockRelease(AutovacuumLock);
+ 			}
  		}
  
  		/*
! 		 * There are some conditions that we need to check before trying to
! 		 * start a worker.  First, we need to make sure that there is a
! 		 * worker slot available.  Second, we need to make sure that no other
! 		 * worker is still starting up.
  		 */
+ 
  		LWLockAcquire(AutovacuumLock, LW_SHARED);
  
! 		can_launch = (AutoVacuumShmem->av_freeWorkers != INVALID_OFFSET);
! 
! 		if (can_launch && AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
  		{
! 			long	secs;
! 			int		usecs;
! 			WorkerInfo worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
! 
! 			if (current_time == 0)
! 				current_time = GetCurrentTimestamp();
! 
! 			/*
! 			 * We can't launch another worker when another one is still
! 			 * starting up, so just sleep for a bit more; that worker will wake
! 			 * us up again as soon as it's ready.  We will only wait
! 			 * autovacuum_naptime seconds for this to happen however.  Note
! 			 * that failure to connect to a particular database is not a
! 			 * problem here, because the worker removes itself from the
! 			 * startingWorker pointer before trying to connect; only low-level
! 			 * problems, like fork() failure, can get us here.
! 			 */
! 			TimestampDifference(worker->wi_launchtime, current_time,
! 								&secs, &usecs);
  
! 			/* ignore microseconds, as they cannot make any difference */
! 			if (secs > autovacuum_naptime)
! 			{
! 				LWLockRelease(AutovacuumLock);
! 				LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 				/*
! 				 * No other process can put a worker in starting mode, so if
! 				 * startingWorker is still valid after exchanging our lock,
! 				 * we assume it's the same one we saw above (so we don't
! 				 * recheck the launch time).
! 				 */
! 				if (AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
! 				{
! 					worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
! 					worker->wi_dboid = InvalidOid;
! 					worker->wi_tableoid = InvalidOid;
! 					worker->wi_workerpid = 0;
! 					worker->wi_launchtime = 0;
! 					worker->wi_links.next = AutoVacuumShmem->av_freeWorkers;
! 					AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(worker);
! 					AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
! 				}
! 			}
  			else
  			{
  				/*
! 				 * maybe the postmaster neglected this start signal --
! 				 * resend it.  Note: the constraints in
! 				 * launcher_determine_sleep keep us from delivering signals too
! 				 * quickly (at most once every 100ms).
  				 */
! 				SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
! 				can_launch = false;
  			}
  		}
+ 		LWLockRelease(AutovacuumLock);		/* either shared or exclusive */
  
! 		if (can_launch)
! 		{
! 			Dlelem	   *elem;
  
! 			elem = DLGetTail(DatabaseList);
  
! 			if (current_time == 0)
! 				current_time = GetCurrentTimestamp();
  
! 			if (elem != NULL)
! 			{
! 				avl_dbase *avdb = DLE_VAL(elem);
! 				long	secs;
! 				int		usecs;
! 
! 				TimestampDifference(current_time, avdb->adl_next_worker, &secs, &usecs);
! 
! 				/* do we have to start a worker? */
! 				if (secs <= 0 && usecs <= 0)
! 					launch_worker(current_time);
! 			}
! 			else
! 			{
! 				/*
! 				 * Special case when the list is empty: start a worker right
! 				 * away.  This covers the initial case, when no database is in
! 				 * pgstats (thus the list is empty).  Note that the constraints
! 				 * in launcher_determine_sleep keep us from starting workers
! 				 * too quickly (at most once every autovacuum_naptime when the
! 				 * list is empty).
! 				 */
! 				launch_worker(current_time);
! 			}
! 		}
  	}
  
  	/* Normal exit from the autovac launcher is here */
  	ereport(LOG,
  			(errmsg("autovacuum launcher shutting down")));
+ 	AutoVacuumShmem->av_launcherpid = 0;
  
  	proc_exit(0);		/* done */
  }
  
  /*
+  * Determine the time to sleep, in microseconds, based on the database list.
+  *
+  * The "canlaunch" parameter indicates whether we can start a worker right now;
+  * it is false, for example, when all the workers are busy.
+  */
+ static uint64
+ launcher_determine_sleep(bool canlaunch, bool recursing)
+ {
+ 	long	secs;
+ 	int		usecs;
+ 	Dlelem *elem;
+ 
+ 	/*
+ 	 * We sleep until the next scheduled vacuum.  We trust that when the
+ 	 * database list was built, care was taken so that no entries have times in
+ 	 * the past; if the first entry has too close a next_worker value, or a
+ 	 * time in the past, we will sleep a small nominal time.
+ 	 */
+ 	if (!canlaunch)
+ 	{
+ 		secs = autovacuum_naptime;
+ 		usecs = 0;
+ 	}
+ 	else if ((elem = DLGetTail(DatabaseList)) != NULL)
+ 	{
+ 		avl_dbase  *avdb = DLE_VAL(elem);
+ 		TimestampTz	current_time = GetCurrentTimestamp();
+ 		TimestampTz	next_wakeup;
+ 
+ 		next_wakeup = avdb->adl_next_worker;
+ 		TimestampDifference(current_time, next_wakeup, &secs, &usecs);
+ 	}
+ 	else
+ 	{
+ 		/* list is empty, sleep for whole autovacuum_naptime seconds  */
+ 		secs = autovacuum_naptime;
+ 		usecs = 0;
+ 	}
+ 
+ 	/*
+ 	 * If the result is exactly zero, it means a database had an entry with
+ 	 * time in the past.  Rebuild the list so that the databases are evenly
+ 	 * distributed again, and recalculate the time to sleep.  This can happen
+ 	 * if there are more tables needing vacuum than workers, and they all take
+ 	 * longer to vacuum than autovacuum_naptime.
+ 	 *
+ 	 * We only recurse once.  rebuild_database_list should always return times
+ 	 * in the future, but it seems best not to trust too much on that.
+ 	 */
+ 	if (secs == 0L && usecs == 0 && !recursing)
+ 	{
+ 		rebuild_database_list(InvalidOid);
+ 		return launcher_determine_sleep(canlaunch, true);
+ 	}
+ 
+ 	/* 100ms is the smallest time we'll allow the launcher to sleep */
+ 	if (secs <= 0L && usecs <= 100000)
+ 	{
+ 		secs = 0L;
+ 		usecs = 100000;	/* 100 ms */
+ 	}
+ 
+ 	return secs * 1000000 + usecs;
+ }
+ 
+ /*
+  * Build an updated DatabaseList.  It must only contain databases that appear
+  * in pgstats, and must be sorted by next_worker from highest to lowest,
+  * distributed regularly across the next autovacuum_naptime interval.
+  *
+  * Receives the Oid of the database that made this list be generated (we call
+  * this the "new" database, because when the database was already present on
+  * the list, we expect that this function is not called at all).  The
+  * preexisting list, if any, will be used to preserve the order of the
+  * databases in the autovacuum_naptime period.  The new database is put at the
+  * end of the interval.  The actual values are not saved, which should not be
+  * much of a problem.
+  */
+ static void
+ rebuild_database_list(Oid newdb)
+ {
+ 	List	   *dblist;
+ 	ListCell   *cell;
+ 	MemoryContext newcxt;
+ 	MemoryContext oldcxt;
+ 	MemoryContext tmpcxt;
+ 	HASHCTL		hctl;
+ 	int			score;
+ 	int			nelems;
+ 	HTAB	   *dbhash;
+ 
+ 	/* use fresh stats */
+ 	pgstat_clear_snapshot();
+ 
+ 	newcxt = AllocSetContextCreate(AutovacMemCxt,
+ 								   "AV dblist",
+ 								   ALLOCSET_DEFAULT_MINSIZE,
+ 								   ALLOCSET_DEFAULT_INITSIZE,
+ 								   ALLOCSET_DEFAULT_MAXSIZE);
+ 	tmpcxt = AllocSetContextCreate(newcxt,
+ 								   "tmp AV dblist",
+ 								   ALLOCSET_DEFAULT_MINSIZE,
+ 								   ALLOCSET_DEFAULT_INITSIZE,
+ 								   ALLOCSET_DEFAULT_MAXSIZE);
+ 	oldcxt = MemoryContextSwitchTo(tmpcxt);
+ 
+ 	/*
+ 	 * Implementing this is not as simple as it sounds, because we need to put
+ 	 * the new database at the head of the list (the latest slot in the
+ 	 * schedule); next the databases that were already on the list, and finally
+ 	 * (at the tail of the list) all other databases not on the existing list.
+ 	 *
+ 	 * To do this, we build an empty hash table of scored databases.  We will
+ 	 * start with the lowest score (zero) for the new database, then increasing
+ 	 * scores for the databases in the existing list, in order, and lastly
+ 	 * increasing scores for all databases gotten via get_database_list() that
+ 	 * are not already on the hash.
+ 	 *
+ 	 * Then we will put all the hash elements into an array, sort the array by
+ 	 * score, and finally put the array elements into the new doubly linked
+ 	 * list.
+ 	 */
+ 	hctl.keysize = sizeof(Oid);
+ 	hctl.entrysize = sizeof(avl_dbase);
+ 	hctl.hash = oid_hash;
+ 	hctl.hcxt = tmpcxt;
+ 	dbhash = hash_create("db hash", 20, &hctl,	/* magic number here FIXME */
+ 						 HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+ 
+ 	/* start by inserting the new database */
+ 	score = 0;
+ 	if (OidIsValid(newdb))
+ 	{
+ 		avl_dbase	*db;
+ 		PgStat_StatDBEntry *entry;
+ 
+ 		/* only consider this database if it has a pgstat entry */
+ 		entry = pgstat_fetch_stat_dbentry(newdb);
+ 		if (entry != NULL)
+ 		{
+ 			/* we assume it isn't found because the hash was just created */
+ 			db = hash_search(dbhash, &newdb, HASH_ENTER, NULL);
+ 
+ 			/* hash_search already filled in the key */
+ 			db->adl_score = score++;
+ 			/* next_worker is filled in later */
+ 		}
+ 	}
+ 
+ 	/* Now insert the databases from the existing list */
+ 	if (DatabaseList != NULL)
+ 	{
+ 		Dlelem	*elem;
+ 
+ 		elem = DLGetHead(DatabaseList);
+ 		while (elem != NULL)
+ 		{
+ 			avl_dbase  *avdb = DLE_VAL(elem);
+ 			avl_dbase  *db;
+ 			bool		found;
+ 			PgStat_StatDBEntry *entry;
+ 
+ 			elem = DLGetSucc(elem);
+ 
+ 			/*
+ 			 * skip databases with no stat entries -- in particular, this
+ 			 * gets rid of dropped databases
+ 			 */
+ 			entry = pgstat_fetch_stat_dbentry(avdb->adl_datid);
+ 			if (entry == NULL)
+ 				continue;
+ 
+ 			db = hash_search(dbhash, &(avdb->adl_datid), HASH_ENTER, &found);
+ 
+ 			if (!found)
+ 			{
+ 				/* hash_search already filled in the key */
+ 				db->adl_score = score++;
+ 				/* next_worker is filled in later */
+ 			}
+ 		}
+ 	}
+ 
+ 	/* finally, insert all qualifying databases not previously inserted */
+ 	dblist = get_database_list();
+ 	foreach(cell, dblist)
+ 	{
+ 		avw_dbase  *avdb = lfirst(cell);
+ 		avl_dbase  *db;
+ 		bool		found;
+ 		PgStat_StatDBEntry *entry;
+ 
+ 		/* only consider databases with a pgstat entry */
+ 		entry = pgstat_fetch_stat_dbentry(avdb->adw_datid);
+ 		if (entry == NULL)
+ 			continue;
+ 
+ 		db = hash_search(dbhash, &(avdb->adw_datid), HASH_ENTER, &found);
+ 		/* only update the score if the database was not already on the hash */
+ 		if (!found)
+ 		{
+ 			/* hash_search already filled in the key */
+ 			db->adl_score = score++;
+ 			/* next_worker is filled in later */
+ 		}
+ 	}
+ 	nelems = score;
+ 
+ 	/* from here on, the allocated memory belongs to the new list */
+ 	MemoryContextSwitchTo(newcxt);
+ 	DatabaseList = DLNewList();
+ 
+ 	if (nelems > 0)
+ 	{
+ 		TimestampTz		current_time;
+ 		int				millis_increment;
+ 		avl_dbase	   *dbary;
+ 		avl_dbase	   *db;
+ 		HASH_SEQ_STATUS	seq;
+ 		int				i;
+ 
+ 		/* put all the hash elements into an array */
+ 		dbary = palloc(nelems * sizeof(avl_dbase));
+ 
+ 		i = 0;
+ 		hash_seq_init(&seq, dbhash);
+ 		while ((db = hash_seq_search(&seq)) != NULL)
+ 			memcpy(&(dbary[i++]), db, sizeof(avl_dbase));
+ 
+ 		/* sort the array */
+ 		qsort(dbary, nelems, sizeof(avl_dbase), db_comparator);
+ 
+ 		/* this is the time interval between databases in the schedule */
+ 		millis_increment = 1000.0 * autovacuum_naptime / nelems;
+ 		current_time = GetCurrentTimestamp();
+ 
+ 		/*
+ 		 * move the elements from the array into the dllist, setting the 
+ 		 * next_worker while walking the array
+ 		 */
+ 		for (i = 0; i < nelems; i++)
+ 		{
+ 			avl_dbase  *db = &(dbary[i]);
+ 			Dlelem	   *elem;
+ 
+ 			current_time = TimestampTzPlusMilliseconds(current_time,
+ 													   millis_increment);
+ 			db->adl_next_worker = current_time;
+ 
+ 			elem = DLNewElem(db);
+ 			/* later elements should go closer to the head of the list */
+ 			DLAddHead(DatabaseList, elem);
+ 		}
+ 	}
+ 
+ 	/* all done, clean up memory */
+ 	if (DatabaseListCxt != NULL)
+ 		MemoryContextDelete(DatabaseListCxt);
+ 	MemoryContextDelete(tmpcxt);
+ 	DatabaseListCxt = newcxt;
+ 	MemoryContextSwitchTo(oldcxt);
+ }
+ 
+ /* qsort comparator for avl_dbase: orders by adl_score, highest score first */
+ static int
+ db_comparator(const void *a, const void *b)
+ {
+ 	if (((avl_dbase *) a)->adl_score == ((avl_dbase *) b)->adl_score)
+ 		return 0;
+ 	else
+ 		return (((avl_dbase *) a)->adl_score < ((avl_dbase *) b)->adl_score) ? 1 : -1;
+ }
+ 
+ /*
   * do_start_worker
   *
   * Bare-bones procedure for starting an autovacuum worker from the launcher.
   * It determines what database to work on, sets up shared memory stuff and
!  * signals postmaster to start the worker.  It fails gracefully if invoked when
!  * autovacuum_max_workers are already active.
!  *
!  * Return value is the OID of the database that the worker is going to process,
!  * or InvalidOid if no worker was actually started.
   */
! static Oid
  do_start_worker(void)
  {
  	List	   *dblist;
! 	ListCell   *cell;
  	TransactionId xidForceLimit;
+ 	bool		for_xid_wrap;
+ 	avw_dbase  *avdb;
+ 	TimestampTz	current_time;
+ 	bool		skipit = false;
+ 
+ 	/* return quickly when there are no free workers */
+ 	LWLockAcquire(AutovacuumLock, LW_SHARED);
+ 	if (AutoVacuumShmem->av_freeWorkers == INVALID_OFFSET)
+ 	{
+ 		LWLockRelease(AutovacuumLock);
+ 		return InvalidOid;
+ 	}
+ 	LWLockRelease(AutovacuumLock);
+ 
+ 	/* use fresh stats */
+ 	pgstat_clear_snapshot();
  
  	/* Get a list of databases */
! 	dblist = get_database_list();
  
  	/*
  	 * Determine the oldest datfrozenxid/relfrozenxid that we will allow
*************** do_start_worker(void)
*** 495,515 ****
  	 * isn't clear how to construct a metric that measures that and not cause
  	 * starvation for less busy databases.
  	 */
! 	db = NULL;
  	for_xid_wrap = false;
  	foreach(cell, dblist)
  	{
! 		autovac_dbase *tmp = lfirst(cell);
  
  		/* Find pgstat entry if any */
! 		tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid);
  
  		/* Check to see if this one is at risk of wraparound */
! 		if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit))
  		{
! 			if (db == NULL ||
! 				TransactionIdPrecedes(tmp->ad_frozenxid, db->ad_frozenxid))
! 				db = tmp;
  			for_xid_wrap = true;
  			continue;
  		}
--- 975,997 ----
  	 * isn't clear how to construct a metric that measures that and not cause
  	 * starvation for less busy databases.
  	 */
! 	avdb = NULL;
  	for_xid_wrap = false;
+ 	current_time = GetCurrentTimestamp();
  	foreach(cell, dblist)
  	{
! 		avw_dbase  *tmp = lfirst(cell);
! 		Dlelem	   *elem;
  
  		/* Find pgstat entry if any */
! 		tmp->adw_entry = pgstat_fetch_stat_dbentry(tmp->adw_datid);
  
  		/* Check to see if this one is at risk of wraparound */
! 		if (TransactionIdPrecedes(tmp->adw_frozenxid, xidForceLimit))
  		{
! 			if (avdb == NULL ||
! 				TransactionIdPrecedes(tmp->adw_frozenxid, avdb->adw_frozenxid))
! 				avdb = tmp;
  			for_xid_wrap = true;
  			continue;
  		}
*************** do_start_worker(void)
*** 520,545 ****
  		 * Otherwise, skip a database with no pgstat entry; it means it
  		 * hasn't seen any activity.
  		 */
! 		if (!tmp->ad_entry)
  			continue;
  
  		/*
  		 * Remember the db with oldest autovac time.  (If we are here,
  		 * both tmp->entry and db->entry must be non-null.)
  		 */
! 		if (db == NULL ||
! 			tmp->ad_entry->last_autovac_time < db->ad_entry->last_autovac_time)
! 			db = tmp;
  	}
  
  	/* Found a database -- process it */
! 	if (db != NULL)
  	{
  		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 		AutoVacuumShmem->process_db = db->ad_datid;
  		LWLockRelease(AutovacuumLock);
  
  		SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
  	}
  }
  
--- 1002,1157 ----
  		 * Otherwise, skip a database with no pgstat entry; it means it
  		 * hasn't seen any activity.
  		 */
! 		if (!tmp->adw_entry)
! 			continue;
! 
! 		/*
! 		 * Also, skip a database that appears on the database list as having
! 		 * been processed recently (less than autovacuum_naptime seconds ago).
! 		 * We do this so that we don't select a database which we just
! 		 * selected, but that pgstat hasn't gotten around to updating the last
! 		 * autovacuum time yet.
! 		 */
! 		skipit = false;
! 		elem = DatabaseList ? DLGetTail(DatabaseList) : NULL;
! 
! 		while (elem != NULL)
! 		{
! 			avl_dbase *dbp = DLE_VAL(elem);
! 
! 			if (dbp->adl_datid == tmp->adw_datid)
! 			{
! 				TimestampTz		curr_plus_naptime;
! 				TimestampTz		next = dbp->adl_next_worker;
! 				
! 				curr_plus_naptime =
! 					TimestampTzPlusMilliseconds(current_time,
! 												autovacuum_naptime * 1000);
! 
! 				/*
! 				 * What we want here is to skip if next_worker falls between
! 				 * the current time and the current time plus naptime.
! 				 */
! 				if (timestamp_cmp_internal(current_time, next) > 0)
! 					skipit = false;
! 				else if (timestamp_cmp_internal(next, curr_plus_naptime) > 0)
! 					skipit = false;
! 				else
! 					skipit = true;
! 
! 				break;
! 			}
! 			elem = DLGetPred(elem);
! 		}
! 		if (skipit)
  			continue;
  
  		/*
  		 * Remember the db with oldest autovac time.  (If we are here,
  		 * both tmp->entry and db->entry must be non-null.)
  		 */
! 		if (avdb == NULL ||
! 			tmp->adw_entry->last_autovac_time < avdb->adw_entry->last_autovac_time)
! 			avdb = tmp;
  	}
  
  	/* Found a database -- process it */
! 	if (avdb != NULL)
  	{
+ 		WorkerInfo	worker;
+ 		SHMEM_OFFSET sworker;
+ 
  		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 
! 		/*
! 		 * Get a worker entry from the freelist.  We checked above, so there
! 		 * really should be a free slot -- complain very loudly if there isn't.
! 		 */
! 		sworker = AutoVacuumShmem->av_freeWorkers;
! 		if (sworker == INVALID_OFFSET)
! 			elog(FATAL, "no free worker found");
! 
! 		worker = (WorkerInfo) MAKE_PTR(sworker);
! 		AutoVacuumShmem->av_freeWorkers = worker->wi_links.next;
! 
! 		worker->wi_dboid = avdb->adw_datid;
! 		worker->wi_workerpid = 0;
! 		worker->wi_launchtime = GetCurrentTimestamp();
! 
! 		AutoVacuumShmem->av_startingWorker = sworker;
! 
  		LWLockRelease(AutovacuumLock);
  
  		SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
+ 
+ 		return avdb->adw_datid;
+ 	}
+ 	else if (skipit)
+ 	{
+ 		/*
+ 		 * If we skipped all databases on the list, rebuild it, because it
+ 		 * probably contains a dropped database.
+ 		 */
+ 		rebuild_database_list(InvalidOid);
+ 	}
+ 
+ 	return InvalidOid;
+ }
+ 
+ /*
+  * launch_worker
+  *
+  * Wrapper for starting a worker from the launcher.  Besides actually starting
+  * it, update the database list to reflect the next time that another one will
+  * need to be started on the selected database.  The actual database choice is
+  * left to do_start_worker.
+  *
+  * This routine is also expected to insert an entry into the database list if
+  * the selected database was previously absent from the list.  It does not
+  * return a value.
+  */
+ static void
+ launch_worker(TimestampTz now)
+ {
+ 	Oid		dbid;
+ 	Dlelem *elem;
+ 
+ 	dbid = do_start_worker();
+ 	if (OidIsValid(dbid))
+ 	{
+ 		/*
+ 		 * Walk the database list and update the corresponding entry.  If the
+ 		 * database is not on the list, we'll recreate the list.
+ 		 */
+ 		elem = (DatabaseList == NULL) ? NULL : DLGetHead(DatabaseList);
+ 		while (elem != NULL)
+ 		{
+ 			avl_dbase *avdb = DLE_VAL(elem);
+ 
+ 			if (avdb->adl_datid == dbid)
+ 			{
+ 				/*
+ 				 * add autovacuum_naptime seconds to the current time, and use
+ 				 * that as the new "next_worker" field for this database.
+ 				 */
+ 				avdb->adl_next_worker =
+ 					TimestampTzPlusMilliseconds(now, autovacuum_naptime * 1000);
+ 
+ 				DLMoveToFront(elem);
+ 				break;
+ 			}
+ 			elem = DLGetSucc(elem);
+ 		}
+ 
+ 		/*
+ 		 * If the database was not present in the database list, we rebuild the
+ 		 * list.  It's possible that the database does not get into the list
+ 		 * anyway, for example if it's a database that doesn't have a pgstat
+ 		 * entry, but this is not a problem because we don't want to schedule
+ 		 * workers regularly into those in any case.
+ 		 */
+ 		if (elem == NULL)
+ 			rebuild_database_list(dbid);
  	}
  }
  
*************** avl_sighup_handler(SIGNAL_ARGS)
*** 550,555 ****
--- 1162,1174 ----
  	got_SIGHUP = true;
  }
  
+ /* SIGUSR1: a worker is up and running, or just finished */
+ static void
+ avl_sigusr1_handler(SIGNAL_ARGS)
+ {
+ 	got_SIGUSR1 = true;
+ }
+ 
  static void
  avlauncher_shutdown(SIGNAL_ARGS)
  {
*************** NON_EXEC_STATIC void
*** 665,671 ****
  AutoVacWorkerMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
! 	Oid			dbid;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
--- 1284,1290 ----
  AutoVacWorkerMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
! 	Oid			dbid = InvalidOid;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 763,779 ****
  	SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
  
  	/*
! 	 * Get the database Id we're going to work on, and announce our PID
! 	 * in the shared memory area.  We remove the database OID immediately
! 	 * from the shared memory area.
  	 */
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
  
! 	dbid = AutoVacuumShmem->process_db;
! 	AutoVacuumShmem->process_db = InvalidOid;
! 	AutoVacuumShmem->worker_pid = MyProcPid;
  
! 	LWLockRelease(AutovacuumLock);
  
  	if (OidIsValid(dbid))
  	{
--- 1382,1415 ----
  	SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
  
  	/*
! 	 * Force statement_timeout to zero to avoid a timeout setting from
! 	 * preventing regular maintenance from being executed.
! 	 */
! 	SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
! 
! 	/*
! 	 * Get the info about the database we're going to work on.
  	 */
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ 	MyWorkerInfo = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
+ 	dbid = MyWorkerInfo->wi_dboid;
+ 	MyWorkerInfo->wi_workerpid = MyProcPid;
+ 
+ 	/* insert into the running list */
+ 	SHMQueueInsertBefore(&AutoVacuumShmem->av_runningWorkers, 
+ 						 &MyWorkerInfo->wi_links);
+ 	/*
+ 	 * remove from the "starting" pointer, so that the launcher can start a new
+ 	 * worker if required
+ 	 */
+ 	AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
+ 	LWLockRelease(AutovacuumLock);
  
! 	on_shmem_exit(FreeWorkerInfo, 0);
  
! 	/* wake up the launcher */
! 	if (AutoVacuumShmem->av_launcherpid != 0)
! 		kill(AutoVacuumShmem->av_launcherpid, SIGUSR1);
  
  	if (OidIsValid(dbid))
  	{
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 803,809 ****
  
  		/* Create the memory context where cross-transaction state is stored */
  		AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! 											  "Autovacuum context",
  											  ALLOCSET_DEFAULT_MINSIZE,
  											  ALLOCSET_DEFAULT_INITSIZE,
  											  ALLOCSET_DEFAULT_MAXSIZE);
--- 1439,1445 ----
  
  		/* Create the memory context where cross-transaction state is stored */
  		AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! 											  "AV worker",
  											  ALLOCSET_DEFAULT_MINSIZE,
  											  ALLOCSET_DEFAULT_INITSIZE,
  											  ALLOCSET_DEFAULT_MAXSIZE);
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 814,838 ****
  	}
  
  	/*
! 	 * Now remove our PID from shared memory, so that the launcher can start
! 	 * another worker as soon as appropriate.
  	 */
- 	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
- 	AutoVacuumShmem->worker_pid = 0;
- 	LWLockRelease(AutovacuumLock);
  
  	/* All done, go away */
  	proc_exit(0);
  }
  
  /*
!  * autovac_get_database_list
   *
   *		Return a list of all databases.  Note we cannot use pg_database,
   *		because we aren't connected; we use the flat database file.
   */
  static List *
! autovac_get_database_list(void)
  {
  	char	   *filename;
  	List	   *dblist = NIL;
--- 1450,1586 ----
  	}
  
  	/*
! 	 * FIXME -- we need to notify the launcher when we are gone.  But this
! 	 * should be done after our PGPROC is released, in ProcKill.
  	 */
  
  	/* All done, go away */
  	proc_exit(0);
  }
  
  /*
!  * Return a WorkerInfo to the free list */
! static void
! FreeWorkerInfo(int code, Datum arg)
! {
! 	if (MyWorkerInfo != NULL)
! 	{
! 		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 
! 		SHMQueueDelete(&MyWorkerInfo->wi_links);
! 		MyWorkerInfo->wi_links.next = AutoVacuumShmem->av_freeWorkers;
! 		MyWorkerInfo->wi_dboid = InvalidOid;
! 		MyWorkerInfo->wi_tableoid = InvalidOid;
! 		MyWorkerInfo->wi_workerpid = 0;
! 		MyWorkerInfo->wi_launchtime = 0;
! 		MyWorkerInfo->wi_cost_delay = 0;
! 		MyWorkerInfo->wi_cost_limit = 0;
! 		MyWorkerInfo->wi_cost_limit_base = 0;
! 		AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(MyWorkerInfo);
! 		/* not mine anymore */
! 		MyWorkerInfo = NULL;
! 
! 		/*
! 		 * now that we're inactive, cause a rebalancing of the surviving
! 		 * workers
! 		 */
! 		AutoVacuumShmem->av_rebalance = true;
! 		LWLockRelease(AutovacuumLock);
! 	}
! }
! 
! /*
!  * Update the cost-based delay parameters, so that multiple workers each
!  * consume a fraction of the total available I/O.
!  */
! void
! AutoVacuumUpdateDelay(void)
! {
! 	if (MyWorkerInfo)
! 	{
! 		VacuumCostDelay = MyWorkerInfo->wi_cost_delay;
! 		VacuumCostLimit = MyWorkerInfo->wi_cost_limit;
! 	}
! }
! 
! /*
!  * autovac_balance_cost
!  *		Recalculate the cost limit setting for each active worker.
!  *
!  * Caller must hold the AutovacuumLock in exclusive mode.
!  */
! static void
! autovac_balance_cost(void)
! {
! 	WorkerInfo	worker;
! 	int         vac_cost_limit = (autovacuum_vac_cost_limit >= 0 ?
! 								  autovacuum_vac_cost_limit : VacuumCostLimit);
! 	int         vac_cost_delay = (autovacuum_vac_cost_delay >= 0 ?
! 								  autovacuum_vac_cost_delay : VacuumCostDelay);
! 	double      cost_total;
! 	double      cost_avail;
! 
! 	/* not set? nothing to do */
! 	if (vac_cost_limit <= 0 || vac_cost_delay <= 0)
! 		return;
! 
! 	/* calculate the total base cost limit of active workers */
! 	cost_total = 0.0;
! 	worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
! 									   &AutoVacuumShmem->av_runningWorkers,
! 									   offsetof(WorkerInfoData, wi_links));
! 	while (worker)
! 	{
! 		if (worker->wi_workerpid != 0 &&
! 			worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0)
! 			cost_total +=
! 				(double) worker->wi_cost_limit_base / worker->wi_cost_delay;
! 
! 		worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
! 										   &worker->wi_links,
! 										   offsetof(WorkerInfoData, wi_links));
! 	}
! 	/* there are no cost limits -- nothing to do */
! 	if (cost_total <= 0)
! 		return;
! 
! 	/*
! 	 * Adjust the cost limit of each active worker so that the total of the
! 	 * cost limits is balanced to autovacuum_vacuum_cost_limit.
! 	 */
! 	cost_avail = (double) vac_cost_limit / vac_cost_delay;
! 	worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
! 									   &AutoVacuumShmem->av_runningWorkers,
! 									   offsetof(WorkerInfoData, wi_links));
! 	while (worker)
! 	{
! 		if (worker->wi_workerpid != 0 &&
! 			worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0)
! 		{
! 			int     limit = (int)
! 				(cost_avail * worker->wi_cost_limit_base / cost_total);
! 
! 			worker->wi_cost_limit = Min(limit, worker->wi_cost_limit_base);
! 
! 			elog(DEBUG2, "autovac_balance_cost(pid=%u db=%u, rel=%u, cost_limit=%d, cost_delay=%d)",
! 				 worker->wi_workerpid, worker->wi_dboid,
! 				 worker->wi_tableoid, worker->wi_cost_limit, worker->wi_cost_delay);
! 		}
! 
! 		worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
! 										   &worker->wi_links,
! 										   offsetof(WorkerInfoData, wi_links));
! 	}
! }
! 
! /*
!  * get_database_list
   *
   *		Return a list of all databases.  Note we cannot use pg_database,
   *		because we aren't connected; we use the flat database file.
   */
  static List *
! get_database_list(void)
  {
  	char	   *filename;
  	List	   *dblist = NIL;
*************** autovac_get_database_list(void)
*** 852,866 ****
  	while (read_pg_database_line(db_file, thisname, &db_id,
  								 &db_tablespace, &db_frozenxid))
  	{
! 		autovac_dbase *avdb;
  
! 		avdb = (autovac_dbase *) palloc(sizeof(autovac_dbase));
  
! 		avdb->ad_datid = db_id;
! 		avdb->ad_name = pstrdup(thisname);
! 		avdb->ad_frozenxid = db_frozenxid;
  		/* this gets set later: */
! 		avdb->ad_entry = NULL;
  
  		dblist = lappend(dblist, avdb);
  	}
--- 1600,1614 ----
  	while (read_pg_database_line(db_file, thisname, &db_id,
  								 &db_tablespace, &db_frozenxid))
  	{
! 		avw_dbase *avdb;
  
! 		avdb = (avw_dbase *) palloc(sizeof(avw_dbase));
  
! 		avdb->adw_datid = db_id;
! 		avdb->adw_name = pstrdup(thisname);
! 		avdb->adw_frozenxid = db_frozenxid;
  		/* this gets set later: */
! 		avdb->adw_entry = NULL;
  
  		dblist = lappend(dblist, avdb);
  	}
*************** do_autovacuum(void)
*** 1008,1019 ****
  	 * Add to the list of tables to vacuum, the OIDs of the tables that
  	 * correspond to the saved OIDs of toast tables needing vacuum.
  	 */
! 	foreach (cell, toast_oids)
  	{
  		Oid		toastoid = lfirst_oid(cell);
  		ListCell *cell2;
  
! 		foreach (cell2, table_toast_list)
  		{
  			av_relation	   *ar = lfirst(cell2);
  
--- 1756,1767 ----
  	 * Add to the list of tables to vacuum, the OIDs of the tables that
  	 * correspond to the saved OIDs of toast tables needing vacuum.
  	 */
! 	foreach(cell, toast_oids)
  	{
  		Oid		toastoid = lfirst_oid(cell);
  		ListCell *cell2;
  
! 		foreach(cell2, table_toast_list)
  		{
  			av_relation	   *ar = lfirst(cell2);
  
*************** do_autovacuum(void)
*** 1038,1047 ****
--- 1786,1841 ----
  		Oid		relid = lfirst_oid(cell);
  		autovac_table *tab;
  		char   *relname;
+ 		WorkerInfo	worker;
+ 		bool        skipit;
  
  		CHECK_FOR_INTERRUPTS();
  
  		/*
+ 		 * hold schedule lock from here until we're sure that this table
+ 		 * still needs vacuuming.  We also need the AutovacuumLock to walk
+ 		 * the worker array, but we'll let go of that one quickly.
+ 		 */
+ 		LWLockAcquire(AutovacuumScheduleLock, LW_EXCLUSIVE);
+ 		LWLockAcquire(AutovacuumLock, LW_SHARED);
+ 
+ 		/*
+ 		 * Check whether the table is being vacuumed concurrently by another
+ 		 * worker.
+ 		 */
+ 		skipit = false;
+ 		worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+ 										   &AutoVacuumShmem->av_runningWorkers,
+ 										   offsetof(WorkerInfoData, wi_links));
+ 		while (worker)
+ 		{
+ 			/* ignore myself */
+ 			if (worker == MyWorkerInfo)
+ 				goto next_worker;
+ 
+ 			/* ignore workers in other databases */
+ 			if (worker->wi_dboid != MyDatabaseId)
+ 				goto next_worker;
+ 
+ 			if (worker->wi_tableoid == relid)
+ 			{
+ 				skipit = true;
+ 				break;
+ 			}
+ 
+ next_worker:
+ 			worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+ 											   &worker->wi_links,
+ 											   offsetof(WorkerInfoData, wi_links));
+ 		}
+ 		LWLockRelease(AutovacuumLock);
+ 		if (skipit)
+ 		{
+ 			LWLockRelease(AutovacuumScheduleLock);
+ 			continue;
+ 		}
+ 
+ 		/*
  		 * Check whether pgstat data still says we need to vacuum this table.
  		 * It could have changed if something else processed the table while we
  		 * weren't looking.
*************** do_autovacuum(void)
*** 1053,1063 ****
  		if (tab == NULL)
  		{
  			/* someone else vacuumed the table */
  			continue;
  		}
- 		/* Ok, good to go! */
  
! 		/* Set the vacuum cost parameters for this table */
  		VacuumCostDelay = tab->at_vacuum_cost_delay;
  		VacuumCostLimit = tab->at_vacuum_cost_limit;
  
--- 1847,1864 ----
  		if (tab == NULL)
  		{
  			/* someone else vacuumed the table */
+ 			LWLockRelease(AutovacuumScheduleLock);
  			continue;
  		}
  
! 		/*
! 		 * Ok, good to go.  Store the table in shared memory before releasing
! 		 * the lock so that other workers don't vacuum it concurrently.
! 		 */
! 		MyWorkerInfo->wi_tableoid = relid;
! 		LWLockRelease(AutovacuumScheduleLock);
! 
! 		/* Set the initial vacuum cost parameters for this table */
  		VacuumCostDelay = tab->at_vacuum_cost_delay;
  		VacuumCostLimit = tab->at_vacuum_cost_limit;
  
*************** do_autovacuum(void)
*** 1067,1072 ****
--- 1868,1885 ----
  			 (tab->at_doanalyze ? " ANALYZE" : ""),
  			 relname);
  
+ 		/*
+ 		 * Advertise my cost delay parameters for the balancing algorithm, and
+ 		 * do a balance
+ 		 */
+ 		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ 		MyWorkerInfo->wi_cost_delay = tab->at_vacuum_cost_delay;
+ 		MyWorkerInfo->wi_cost_limit = tab->at_vacuum_cost_limit;
+ 		MyWorkerInfo->wi_cost_limit_base = tab->at_vacuum_cost_limit;
+ 		autovac_balance_cost();
+ 		LWLockRelease(AutovacuumLock);
+ 
+ 		/* have at it */
  		autovacuum_do_vac_analyze(tab->at_relid,
  								  tab->at_dovacuum,
  								  tab->at_doanalyze,
*************** table_recheck_autovac(Oid relid)
*** 1211,1217 ****
  	PgStat_StatDBEntry *shared;
  	PgStat_StatDBEntry *dbentry;
  
! 	/* We need fresh pgstat data for this */
  	pgstat_clear_snapshot();
  
  	shared = pgstat_fetch_stat_dbentry(InvalidOid);
--- 2024,2030 ----
  	PgStat_StatDBEntry *shared;
  	PgStat_StatDBEntry *dbentry;
  
! 	/* use fresh stats */
  	pgstat_clear_snapshot();
  
  	shared = pgstat_fetch_stat_dbentry(InvalidOid);
*************** table_recheck_autovac(Oid relid)
*** 1219,1226 ****
  
  	/* fetch the relation's relcache entry */
  	classTup = SearchSysCacheCopy(RELOID,
! 							  ObjectIdGetDatum(relid),
! 							  0, 0, 0);
  	if (!HeapTupleIsValid(classTup))
  		return NULL;
  	classForm = (Form_pg_class) GETSTRUCT(classTup);
--- 2032,2039 ----
  
  	/* fetch the relation's relcache entry */
  	classTup = SearchSysCacheCopy(RELOID,
! 								  ObjectIdGetDatum(relid),
! 								  0, 0, 0);
  	if (!HeapTupleIsValid(classTup))
  		return NULL;
  	classForm = (Form_pg_class) GETSTRUCT(classTup);
*************** IsAutoVacuumWorkerProcess(void)
*** 1630,1636 ****
  Size
  AutoVacuumShmemSize(void)
  {
! 	return sizeof(AutoVacuumShmemStruct);
  }
  
  /*
--- 2443,2458 ----
  Size
  AutoVacuumShmemSize(void)
  {
! 	Size	size;
! 
! 	/*
! 	 * Need the fixed struct and the array of WorkerInfoData.
! 	 */
! 	size = sizeof(AutoVacuumShmemStruct);
! 	size = MAXALIGN(size);
! 	size = add_size(size, mul_size(autovacuum_max_workers,
! 								   sizeof(WorkerInfoData)));
! 	return size;
  }
  
  /*
*************** AutoVacuumShmemInit(void)
*** 1650,1657 ****
  		ereport(FATAL,
  				(errcode(ERRCODE_OUT_OF_MEMORY),
  				 errmsg("not enough shared memory for autovacuum")));
- 	if (found)
- 		return;                 /* already initialized */
  
! 	MemSet(AutoVacuumShmem, 0, sizeof(AutoVacuumShmemStruct));
  }
--- 2472,2500 ----
  		ereport(FATAL,
  				(errcode(ERRCODE_OUT_OF_MEMORY),
  				 errmsg("not enough shared memory for autovacuum")));
  
! 	if (!IsUnderPostmaster)
! 	{
! 		WorkerInfo	worker;
! 		int			i;
! 
! 		Assert(!found);
! 
! 		AutoVacuumShmem->av_launcherpid = 0;
! 		AutoVacuumShmem->av_freeWorkers = INVALID_OFFSET;
! 		SHMQueueInit(&AutoVacuumShmem->av_runningWorkers);
! 		AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
! 
! 		worker = (WorkerInfo) ((char *) AutoVacuumShmem +
! 							   MAXALIGN(sizeof(AutoVacuumShmemStruct)));
! 
! 		/* initialize the WorkerInfo free list */
! 		for (i = 0; i < autovacuum_max_workers; i++)
! 		{
! 			worker[i].wi_links.next = AutoVacuumShmem->av_freeWorkers;
! 			AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(&worker[i]);
! 		}
! 	}
! 	else
! 		Assert(found);
  }
Index: src/backend/storage/lmgr/proc.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/lmgr/proc.c,v
retrieving revision 1.187
diff -c -p -r1.187 proc.c
*** src/backend/storage/lmgr/proc.c	3 Apr 2007 16:34:36 -0000	1.187
--- src/backend/storage/lmgr/proc.c	12 Apr 2007 16:07:09 -0000
*************** ProcGlobalShmemSize(void)
*** 96,102 ****
  	size = add_size(size, sizeof(PROC_HDR));
  	/* AuxiliaryProcs */
  	size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC)));
! 	/* MyProcs */
  	size = add_size(size, mul_size(MaxBackends, sizeof(PGPROC)));
  	/* ProcStructLock */
  	size = add_size(size, sizeof(slock_t));
--- 96,102 ----
  	size = add_size(size, sizeof(PROC_HDR));
  	/* AuxiliaryProcs */
  	size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC)));
! 	/* MyProcs, including autovacuum */
  	size = add_size(size, mul_size(MaxBackends, sizeof(PGPROC)));
  	/* ProcStructLock */
  	size = add_size(size, sizeof(slock_t));
*************** ProcGlobalShmemSize(void)
*** 110,116 ****
  int
  ProcGlobalSemas(void)
  {
! 	/* We need a sema per backend, plus one for each auxiliary process. */
  	return MaxBackends + NUM_AUXILIARY_PROCS;
  }
  
--- 110,119 ----
  int
  ProcGlobalSemas(void)
  {
! 	/*
! 	 * We need a sema per backend (including autovacuum), plus one for each
! 	 * auxiliary process.
! 	 */
  	return MaxBackends + NUM_AUXILIARY_PROCS;
  }
  
*************** ProcGlobalSemas(void)
*** 127,134 ****
   *	  running out when trying to start another backend is a common failure.
   *	  So, now we grab enough semaphores to support the desired max number
   *	  of backends immediately at initialization --- if the sysadmin has set
!  *	  MaxBackends higher than his kernel will support, he'll find out sooner
!  *	  rather than later.
   *
   *	  Another reason for creating semaphores here is that the semaphore
   *	  implementation typically requires us to create semaphores in the
--- 130,137 ----
   *	  running out when trying to start another backend is a common failure.
   *	  So, now we grab enough semaphores to support the desired max number
   *	  of backends immediately at initialization --- if the sysadmin has set
!  *	  MaxConnections or autovacuum_max_workers higher than his kernel will
!  *	  support, he'll find out sooner rather than later.
   *
   *	  Another reason for creating semaphores here is that the semaphore
   *	  implementation typically requires us to create semaphores in the
*************** InitProcGlobal(void)
*** 163,187 ****
  	 * Initialize the data structures.
  	 */
  	ProcGlobal->freeProcs = INVALID_OFFSET;
  
  	ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY;
  
  	/*
  	 * Pre-create the PGPROC structures and create a semaphore for each.
  	 */
! 	procs = (PGPROC *) ShmemAlloc(MaxBackends * sizeof(PGPROC));
  	if (!procs)
  		ereport(FATAL,
  				(errcode(ERRCODE_OUT_OF_MEMORY),
  				 errmsg("out of shared memory")));
! 	MemSet(procs, 0, MaxBackends * sizeof(PGPROC));
! 	for (i = 0; i < MaxBackends; i++)
  	{
  		PGSemaphoreCreate(&(procs[i].sem));
  		procs[i].links.next = ProcGlobal->freeProcs;
  		ProcGlobal->freeProcs = MAKE_OFFSET(&procs[i]);
  	}
  
  	MemSet(AuxiliaryProcs, 0, NUM_AUXILIARY_PROCS * sizeof(PGPROC));
  	for (i = 0; i < NUM_AUXILIARY_PROCS; i++)
  	{
--- 166,204 ----
  	 * Initialize the data structures.
  	 */
  	ProcGlobal->freeProcs = INVALID_OFFSET;
+ 	ProcGlobal->autovacFreeProcs = INVALID_OFFSET;
  
  	ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY;
  
  	/*
  	 * Pre-create the PGPROC structures and create a semaphore for each.
  	 */
! 	procs = (PGPROC *) ShmemAlloc((MaxConnections) * sizeof(PGPROC));
  	if (!procs)
  		ereport(FATAL,
  				(errcode(ERRCODE_OUT_OF_MEMORY),
  				 errmsg("out of shared memory")));
! 	MemSet(procs, 0, MaxConnections * sizeof(PGPROC));
! 	for (i = 0; i < MaxConnections; i++)
  	{
  		PGSemaphoreCreate(&(procs[i].sem));
  		procs[i].links.next = ProcGlobal->freeProcs;
  		ProcGlobal->freeProcs = MAKE_OFFSET(&procs[i]);
  	}
  
+ 	procs = (PGPROC *) ShmemAlloc((autovacuum_max_workers) * sizeof(PGPROC));
+ 	if (!procs)
+ 		ereport(FATAL,
+ 				(errcode(ERRCODE_OUT_OF_MEMORY),
+ 				 errmsg("out of shared memory")));
+ 	MemSet(procs, 0, autovacuum_max_workers * sizeof(PGPROC));
+ 	for (i = 0; i < autovacuum_max_workers; i++)
+ 	{
+ 		PGSemaphoreCreate(&(procs[i].sem));
+ 		procs[i].links.next = ProcGlobal->autovacFreeProcs;
+ 		ProcGlobal->autovacFreeProcs = MAKE_OFFSET(&procs[i]);
+ 	}
+ 
  	MemSet(AuxiliaryProcs, 0, NUM_AUXILIARY_PROCS * sizeof(PGPROC));
  	for (i = 0; i < NUM_AUXILIARY_PROCS; i++)
  	{
*************** InitProcGlobal(void)
*** 200,209 ****
  void
  InitProcess(void)
  {
  	/* use volatile pointer to prevent code rearrangement */
  	volatile PROC_HDR *procglobal = ProcGlobal;
  	SHMEM_OFFSET myOffset;
- 	int			i;
  
  	/*
  	 * ProcGlobal should be set up already (if we are a backend, we inherit
--- 217,227 ----
  void
  InitProcess(void)
  {
+ 	int			i;
+ 
  	/* use volatile pointer to prevent code rearrangement */
  	volatile PROC_HDR *procglobal = ProcGlobal;
  	SHMEM_OFFSET myOffset;
  
  	/*
  	 * ProcGlobal should be set up already (if we are a backend, we inherit
*************** InitProcess(void)
*** 226,237 ****
  
  	set_spins_per_delay(procglobal->spins_per_delay);
  
! 	myOffset = procglobal->freeProcs;
  
  	if (myOffset != INVALID_OFFSET)
  	{
  		MyProc = (PGPROC *) MAKE_PTR(myOffset);
! 		procglobal->freeProcs = MyProc->links.next;
  		SpinLockRelease(ProcStructLock);
  	}
  	else
--- 244,261 ----
  
  	set_spins_per_delay(procglobal->spins_per_delay);
  
! 	if (IsAutoVacuumWorkerProcess())
! 		myOffset = procglobal->autovacFreeProcs;
! 	else
! 		myOffset = procglobal->freeProcs;
  
  	if (myOffset != INVALID_OFFSET)
  	{
  		MyProc = (PGPROC *) MAKE_PTR(myOffset);
! 		if (IsAutoVacuumWorkerProcess())
! 			procglobal->autovacFreeProcs = MyProc->links.next;
! 		else
! 			procglobal->freeProcs = MyProc->links.next;
  		SpinLockRelease(ProcStructLock);
  	}
  	else
*************** InitProcess(void)
*** 239,245 ****
  		/*
  		 * If we reach here, all the PGPROCs are in use.  This is one of the
  		 * possible places to detect "too many backends", so give the standard
! 		 * error message.
  		 */
  		SpinLockRelease(ProcStructLock);
  		ereport(FATAL,
--- 263,270 ----
  		/*
  		 * If we reach here, all the PGPROCs are in use.  This is one of the
  		 * possible places to detect "too many backends", so give the standard
! 		 * error message.  XXX do we need to give a different failure message
! 		 * in the autovacuum case?
  		 */
  		SpinLockRelease(ProcStructLock);
  		ereport(FATAL,
*************** ProcKill(int code, Datum arg)
*** 571,578 ****
  	SpinLockAcquire(ProcStructLock);
  
  	/* Return PGPROC structure (and semaphore) to freelist */
! 	MyProc->links.next = procglobal->freeProcs;
! 	procglobal->freeProcs = MAKE_OFFSET(MyProc);
  
  	/* PGPROC struct isn't mine anymore */
  	MyProc = NULL;
--- 596,611 ----
  	SpinLockAcquire(ProcStructLock);
  
  	/* Return PGPROC structure (and semaphore) to freelist */
! 	if (IsAutoVacuumWorkerProcess())
! 	{
! 		MyProc->links.next = procglobal->autovacFreeProcs;
! 		procglobal->autovacFreeProcs = MAKE_OFFSET(MyProc);
! 	}
! 	else
! 	{
! 		MyProc->links.next = procglobal->freeProcs;
! 		procglobal->freeProcs = MAKE_OFFSET(MyProc);
! 	}
  
  	/* PGPROC struct isn't mine anymore */
  	MyProc = NULL;
*************** ProcKill(int code, Datum arg)
*** 581,586 ****
--- 614,621 ----
  	procglobal->spins_per_delay = update_spins_per_delay(procglobal->spins_per_delay);
  
  	SpinLockRelease(ProcStructLock);
+ 
+ 	/* XXX need to notify the autovacuum launcher, in case this was a worker */
  }
  
  /*
Index: src/backend/utils/init/globals.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/init/globals.c,v
retrieving revision 1.100
diff -c -p -r1.100 globals.c
*** src/backend/utils/init/globals.c	5 Jan 2007 22:19:44 -0000	1.100
--- src/backend/utils/init/globals.c	10 Apr 2007 21:00:28 -0000
*************** bool		allowSystemTableMods = false;
*** 95,103 ****
  int			work_mem = 1024;
  int			maintenance_work_mem = 16384;
  
! /* Primary determinants of sizes of shared-memory structures: */
  int			NBuffers = 1000;
  int			MaxBackends = 100;
  
  int			VacuumCostPageHit = 1;		/* GUC parameters for vacuum */
  int			VacuumCostPageMiss = 10;
--- 95,108 ----
  int			work_mem = 1024;
  int			maintenance_work_mem = 16384;
  
! /*
!  * Primary determinants of sizes of shared-memory structures.  MaxBackends is
!  * MaxConnections + autovacuum_max_workers (it is computed by the GUC assign
!  * hook):
!  */
  int			NBuffers = 1000;
  int			MaxBackends = 100;
+ int			MaxConnections = 90;
  
  int			VacuumCostPageHit = 1;		/* GUC parameters for vacuum */
  int			VacuumCostPageMiss = 10;
Index: src/backend/utils/misc/guc.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/misc/guc.c,v
retrieving revision 1.384
diff -c -p -r1.384 guc.c
*** src/backend/utils/misc/guc.c	12 Apr 2007 06:53:47 -0000	1.384
--- src/backend/utils/misc/guc.c	12 Apr 2007 16:22:33 -0000
*************** static bool assign_tcp_keepalives_count(
*** 163,168 ****
--- 163,170 ----
  static const char *show_tcp_keepalives_idle(void);
  static const char *show_tcp_keepalives_interval(void);
  static const char *show_tcp_keepalives_count(void);
+ static bool assign_autovacuum_max_workers(int newval, bool doit, GucSource source);
+ static bool assign_maxconnections(int newval, bool doit, GucSource source);
  
  /*
   * GUC option variables that are exported from this module
*************** static struct config_int ConfigureNamesI
*** 1149,1164 ****
  	 * number.
  	 *
  	 * MaxBackends is limited to INT_MAX/4 because some places compute
! 	 * 4*MaxBackends without any overflow check.  Likewise we have to limit
! 	 * NBuffers to INT_MAX/2.
  	 */
  	{
  		{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
  			gettext_noop("Sets the maximum number of concurrent connections."),
  			NULL
  		},
! 		&MaxBackends,
! 		100, 1, INT_MAX / 4, NULL, NULL
  	},
  
  	{
--- 1151,1169 ----
  	 * number.
  	 *
  	 * MaxBackends is limited to INT_MAX/4 because some places compute
! 	 * 4*MaxBackends without any overflow check.  This check is made in
! 	 * assign_maxconnections, since MaxBackends is computed as MaxConnections +
! 	 * autovacuum_max_workers.
! 	 *
! 	 * Likewise we have to limit NBuffers to INT_MAX/2.
  	 */
  	{
  		{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
  			gettext_noop("Sets the maximum number of concurrent connections."),
  			NULL
  		},
! 		&MaxConnections,
! 		100, 1, INT_MAX / 4, assign_maxconnections, NULL
  	},
  
  	{
*************** static struct config_int ConfigureNamesI
*** 1622,1627 ****
--- 1627,1641 ----
  		&autovacuum_freeze_max_age,
  		200000000, 100000000, 2000000000, NULL, NULL
  	},
+ 	{
+ 		/* see max_connections */
+ 		{"autovacuum_max_workers", PGC_POSTMASTER, AUTOVACUUM,
+ 			gettext_noop("Sets the maximum number of simultaneously running autovacuum worker processes."),
+ 			NULL
+ 		},
+ 		&autovacuum_max_workers,
+ 		3, 1, INT_MAX / 4, assign_autovacuum_max_workers, NULL
+ 	},
  
  	{
  		{"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER,
*************** show_tcp_keepalives_count(void)
*** 6692,6696 ****
--- 6706,6737 ----
  	return nbuf;
  }
  
+ static bool
+ assign_maxconnections(int newval, bool doit, GucSource source)
+ {
+ 	/* MaxBackends = newval + autovacuum_max_workers must stay <= INT_MAX/4;
+ 	 * check it regardless of doit so the answer never depends on that flag. */
+ 	if (newval + autovacuum_max_workers > INT_MAX / 4)
+ 		return false;
+ 
+ 	if (doit)
+ 		MaxBackends = newval + autovacuum_max_workers;
+ 
+ 	return true;
+ }
+ 
+ static bool
+ assign_autovacuum_max_workers(int newval, bool doit, GucSource source)
+ {
+ 	/* MaxBackends = newval + MaxConnections must stay <= INT_MAX/4;
+ 	 * check it regardless of doit so the answer never depends on that flag. */
+ 	if (newval + MaxConnections > INT_MAX / 4)
+ 		return false;
+ 
+ 	if (doit)
+ 		MaxBackends = newval + MaxConnections;
+ 
+ 	return true;
+ }
  
  #include "guc-file.c"
Index: src/backend/utils/misc/postgresql.conf.sample
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/misc/postgresql.conf.sample,v
retrieving revision 1.213
diff -c -p -r1.213 postgresql.conf.sample
*** src/backend/utils/misc/postgresql.conf.sample	19 Mar 2007 23:38:30 -0000	1.213
--- src/backend/utils/misc/postgresql.conf.sample	12 Apr 2007 16:19:11 -0000
***************
*** 376,381 ****
--- 376,382 ----
  #autovacuum = on			# enable autovacuum subprocess?
  					# 'on' requires stats_start_collector
  					# and stats_row_level to also be on
+ #autovacuum_max_workers = 3		# max # of autovacuum subprocesses
  #autovacuum_naptime = 1min		# time between autovacuum runs
  #autovacuum_vacuum_threshold = 500	# min # of tuple updates before
  					# vacuum
Index: src/include/miscadmin.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/miscadmin.h,v
retrieving revision 1.193
diff -c -p -r1.193 miscadmin.h
*** src/include/miscadmin.h	1 Mar 2007 14:52:04 -0000	1.193
--- src/include/miscadmin.h	10 Apr 2007 20:51:06 -0000
*************** extern DLLIMPORT char *DataDir;
*** 129,134 ****
--- 129,135 ----
  
  extern DLLIMPORT int NBuffers;
  extern int	MaxBackends;
+ extern int	MaxConnections;
  
  extern DLLIMPORT int MyProcPid;
  extern DLLIMPORT struct Port *MyProcPort;
Index: src/include/postmaster/autovacuum.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/postmaster/autovacuum.h,v
retrieving revision 1.8
diff -c -p -r1.8 autovacuum.h
*** src/include/postmaster/autovacuum.h	15 Feb 2007 23:23:23 -0000	1.8
--- src/include/postmaster/autovacuum.h	12 Apr 2007 16:02:58 -0000
***************
*** 14,21 ****
--- 14,24 ----
  #ifndef AUTOVACUUM_H
  #define AUTOVACUUM_H
  
+ #include "storage/lock.h"
+ 
  /* GUC variables */
  extern bool autovacuum_start_daemon;
+ extern int	autovacuum_max_workers;
  extern int	autovacuum_naptime;
  extern int	autovacuum_vac_thresh;
  extern double autovacuum_vac_scale;
*************** extern void autovac_init(void);
*** 35,40 ****
--- 38,46 ----
  extern int	StartAutoVacLauncher(void);
  extern int	StartAutoVacWorker(void);
  
+ /* autovacuum cost-delay balancer */
+ extern void AutoVacuumUpdateDelay(void);
+ 
  #ifdef EXEC_BACKEND
  extern void AutoVacLauncherMain(int argc, char *argv[]);
  extern void AutoVacWorkerMain(int argc, char *argv[]);
Index: src/include/storage/lwlock.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/storage/lwlock.h,v
retrieving revision 1.35
diff -c -p -r1.35 lwlock.h
*** src/include/storage/lwlock.h	3 Apr 2007 16:34:36 -0000	1.35
--- src/include/storage/lwlock.h	6 Apr 2007 02:20:17 -0000
*************** typedef enum LWLockId
*** 61,66 ****
--- 61,67 ----
  	BtreeVacuumLock,
  	AddinShmemInitLock,
  	AutovacuumLock,
+ 	AutovacuumScheduleLock,
  	/* Individual lock IDs end here */
  	FirstBufMappingLock,
  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
Index: src/include/storage/proc.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/storage/proc.h,v
retrieving revision 1.97
diff -c -p -r1.97 proc.h
*** src/include/storage/proc.h	3 Apr 2007 16:34:36 -0000	1.97
--- src/include/storage/proc.h	12 Apr 2007 15:19:34 -0000
*************** typedef struct PROC_HDR
*** 115,120 ****
--- 115,122 ----
  {
  	/* Head of list of free PGPROC structures */
  	SHMEM_OFFSET freeProcs;
+ 	/* Head of list of autovacuum's free PGPROC structures */
+ 	SHMEM_OFFSET autovacFreeProcs;
  	/* Current shared estimate of appropriate spins_per_delay value */
  	int			spins_per_delay;
  } PROC_HDR;
