Index: src/backend/commands/vacuum.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/vacuum.c,v
retrieving revision 1.349
diff -c -p -r1.349 vacuum.c
*** src/backend/commands/vacuum.c	14 Mar 2007 18:48:55 -0000	1.349
--- src/backend/commands/vacuum.c	11 Apr 2007 23:43:23 -0000
*************** vacuum_delay_point(void)
*** 3504,3509 ****
--- 3504,3512 ----
  
  		VacuumCostBalance = 0;
  
+ 		/* update balance values for workers */
+ 		AutoVacuumUpdateDelay();
+ 
  		/* Might have gotten an interrupt while sleeping */
  		CHECK_FOR_INTERRUPTS();
  	}
Index: src/backend/postmaster/autovacuum.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/postmaster/autovacuum.c,v
retrieving revision 1.40
diff -c -p -r1.40 autovacuum.c
*** src/backend/postmaster/autovacuum.c	28 Mar 2007 22:17:12 -0000	1.40
--- src/backend/postmaster/autovacuum.c	11 Apr 2007 23:43:31 -0000
***************
*** 43,48 ****
--- 43,49 ----
  #include "storage/proc.h"
  #include "storage/procarray.h"
  #include "storage/sinval.h"
+ #include "storage/spin.h"
  #include "tcop/tcopprot.h"
  #include "utils/flatfiles.h"
  #include "utils/fmgroids.h"
***************
*** 52,57 ****
--- 53,59 ----
  #include "utils/syscache.h"
  
  
+ static volatile sig_atomic_t got_SIGUSR1 = false;
  static volatile sig_atomic_t got_SIGHUP = false;
  static volatile sig_atomic_t avlauncher_shutdown_request = false;
  
*************** static volatile sig_atomic_t avlauncher_
*** 59,64 ****
--- 61,67 ----
   * GUC parameters
   */
  bool		autovacuum_start_daemon = false;
+ int			autovacuum_max_workers;
  int			autovacuum_naptime;
  int			autovacuum_vac_thresh;
  double		autovacuum_vac_scale;
*************** int			autovacuum_freeze_max_age;
*** 69,75 ****
  int			autovacuum_vac_cost_delay;
  int			autovacuum_vac_cost_limit;
  
! /* Flag to tell if we are in the autovacuum daemon process */
  static bool am_autovacuum_launcher = false;
  static bool am_autovacuum_worker = false;
  
--- 72,78 ----
  int			autovacuum_vac_cost_delay;
  int			autovacuum_vac_cost_limit;
  
! /* Flags to tell if we are in an autovacuum process */
  static bool am_autovacuum_launcher = false;
  static bool am_autovacuum_worker = false;
  
*************** static int	default_freeze_min_age;
*** 82,95 ****
  /* Memory context for long-lived data */
  static MemoryContext AutovacMemCxt;
  
! /* struct to keep list of candidate databases for vacuum */
! typedef struct autovac_dbase
  {
! 	Oid			ad_datid;
! 	char	   *ad_name;
! 	TransactionId ad_frozenxid;
! 	PgStat_StatDBEntry *ad_entry;
! } autovac_dbase;
  
  /* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
  typedef struct av_relation
--- 85,106 ----
  /* Memory context for long-lived data */
  static MemoryContext AutovacMemCxt;
  
! /* struct to keep track of databases in launcher */
! typedef struct avl_dbase
  {
! 	Oid			adl_datid;			/* hash key -- must be first */
! 	TimestampTz	adl_next_worker;
! 	int			adl_score;
! } avl_dbase;
! 
! /* struct to keep track of databases in worker */
! typedef struct avw_dbase
! {
! 	Oid			adw_datid;
! 	char	   *adw_name;
! 	TransactionId adw_frozenxid;
! 	PgStat_StatDBEntry *adw_entry;
! } avw_dbase;
  
  /* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
  typedef struct av_relation
*************** typedef struct autovac_table
*** 110,123 ****
  	int			at_vacuum_cost_limit;
  } autovac_table;
  
  typedef struct
  {
! 	Oid		process_db;			/* OID of database to process */
! 	int		worker_pid;			/* PID of the worker process, if any */
  } AutoVacuumShmemStruct;
  
  static AutoVacuumShmemStruct *AutoVacuumShmem;
  
  #ifdef EXEC_BACKEND
  static pid_t avlauncher_forkexec(void);
  static pid_t avworker_forkexec(void);
--- 121,195 ----
  	int			at_vacuum_cost_limit;
  } autovac_table;
  
+ /*-------------
+  * This struct holds information about a single worker's whereabouts.  We keep
+  * an array of these in shared memory, sized according to
+  * autovacuum_max_workers.
+  *
+  * wi_links		entry into free list or running list
+  * wi_dboid		OID of the database this worker is supposed to work on
+  * wi_tableoid	OID of the table currently being vacuumed
+  * wi_workerpid	PID of the running worker, 0 if not yet started
+  * wi_launchtime Time this worker was launched
+  *
+  * All fields are protected by AutovacuumLock, except for wi_tableoid which is
+  * protected by AutovacuumScheduleLock (which is read-only for everyone except
+  * that worker itself).
+  *-------------
+  */
+ typedef struct WorkerInfoData
+ {
+ 	SHM_QUEUE	wi_links;
+ 	Oid			wi_dboid;
+ 	Oid			wi_tableoid;
+ 	int			wi_workerpid;
+ 	TimestampTz	wi_launchtime;
+ 	int			wi_cost_delay;
+ 	int			wi_cost_limit;
+ 	int			wi_cost_limit_base;
+ } WorkerInfoData;
+ 
+ typedef struct WorkerInfoData *WorkerInfo;
+ 
+ /* the spinlock protecting the PGPROC array */
+ NON_EXEC_STATIC slock_t *AutovacProcLock = NULL;
+ 
+ /*-------------
+  * The main autovacuum shmem struct.  On shared memory we store: 1) this main
+  * struct; 2) the array of WorkerInfo structs; 3) the array of PGPROCs.
+  *
+  * av_launcherpid	the PID of the autovacuum launcher
+  * av_freeProcs		the PGPROC freelist
+  * av_freeWorkers	the WorkerInfo freelist
+  * av_runningWorkers the WorkerInfo non-free queue
+  * av_startingWorker pointer to WorkerInfo currently being started (cleared by
+  *					the worker itself as soon as it's up and running)
+  * av_rebalance		true when a worker determines that cost limits must be
+  * 					rebalanced
+  *
+  * This struct is protected by AutovacuumLock, except for the PGPROC list which
+  * is protected by the AutovacProcLock spinlock.
+  *-------------
+  */
  typedef struct
  {
! 	pid_t			av_launcherpid;
! 	SHMEM_OFFSET	av_freeProcs;
! 	SHMEM_OFFSET	av_freeWorkers;
! 	SHM_QUEUE		av_runningWorkers;
! 	SHMEM_OFFSET	av_startingWorker;
! 	bool			av_rebalance;
  } AutoVacuumShmemStruct;
  
  static AutoVacuumShmemStruct *AutoVacuumShmem;
  
+ /* the database list in the launcher, and the context that contains it */
+ static Dllist *DatabaseList = NULL;
+ static MemoryContext DatabaseListCxt = NULL;
+ 
+ /* Pointer to my own WorkerInfo, valid on each worker */
+ static WorkerInfo	MyWorkerInfo = NULL;
+ 
  #ifdef EXEC_BACKEND
  static pid_t avlauncher_forkexec(void);
  static pid_t avworker_forkexec(void);
*************** static pid_t avworker_forkexec(void);
*** 125,133 ****
  NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
  NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
  
! static void do_start_worker(void);
  static void do_autovacuum(void);
! static List *autovac_get_database_list(void);
  
  static void relation_check_autovac(Oid relid, Form_pg_class classForm,
  					   Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
--- 197,212 ----
  NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
  NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
  
! static Oid do_start_worker(void);
! static uint64 launcher_determine_sleep(bool canlaunch);
! static void launch_worker(TimestampTz now);
! static List *get_database_list(void);
! static void rebuild_database_list(Oid newdb);
! static int db_comparator(const void *a, const void *b);
! static void autovac_balance_cost(void);
! 
  static void do_autovacuum(void);
! static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
  
  static void relation_check_autovac(Oid relid, Form_pg_class classForm,
  					   Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
*************** static void relation_needs_vacanalyze(Oi
*** 141,152 ****
  
  static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
  						  bool doanalyze, int freeze_min_age);
- static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
  static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
  						  PgStat_StatDBEntry *shared,
  						  PgStat_StatDBEntry *dbentry);
  static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
  static void avl_sighup_handler(SIGNAL_ARGS);
  static void avlauncher_shutdown(SIGNAL_ARGS);
  static void avl_quickdie(SIGNAL_ARGS);
  
--- 220,231 ----
  
  static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
  						  bool doanalyze, int freeze_min_age);
  static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
  						  PgStat_StatDBEntry *shared,
  						  PgStat_StatDBEntry *dbentry);
  static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
  static void avl_sighup_handler(SIGNAL_ARGS);
+ static void avl_sigusr1_handler(SIGNAL_ARGS);
  static void avlauncher_shutdown(SIGNAL_ARGS);
  static void avl_quickdie(SIGNAL_ARGS);
  
*************** StartAutoVacLauncher(void)
*** 230,241 ****
  
  /*
   * Main loop for the autovacuum launcher process.
   */
  NON_EXEC_STATIC void
  AutoVacLauncherMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
- 	MemoryContext	avlauncher_cxt;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
--- 309,340 ----
  
  /*
   * Main loop for the autovacuum launcher process.
+  *
+  * The signalling between launcher and worker is as follows:
+  *
+  * When the worker has finished starting up, it stores its PID in wi_workerpid
+  * and sends a SIGUSR1 signal to the launcher.  The launcher then knows that
+  * the postmaster is ready to start a new worker.  We do it this way because
+  * otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't
+  * yet processed the last one, in which case the second signal would be lost.
+  * This is only useful when two workers need to be started close to one
+  * another, which should be rare but it's possible.
+  *
+  * When the worker exits, its ProcKill routine (actually AutovacWorkerProcKill)
+  * is in charge of resetting the WorkerInfo entry and signalling the launcher.
+  * The launcher then wakes up and can launch a new worker if need be, or just
+  * go back to sleep.
+  *
+  * There is a potential problem if, for some reason, a worker starts and is not
+  * able to bootstrap itself correctly.  To prevent this situation from starving
+  * the whole system, the launcher checks the launch time of the "starting
+  * worker".  If it's too old (older than autovacuum_naptime seconds), it resets
+  * the worker entry and puts it back into the free list.
   */
  NON_EXEC_STATIC void
  AutoVacLauncherMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
*************** AutoVacLauncherMain(int argc, char *argv
*** 264,272 ****
  	 * Set up signal handlers.	Since this is an auxiliary process, it has
  	 * particular signal requirements -- no deadlock checker or sinval
  	 * catchup, for example.
- 	 *
- 	 * XXX It may be a good idea to receive signals when an avworker process
- 	 * finishes.
  	 */
  	pqsignal(SIGHUP, avl_sighup_handler);
  
--- 363,368 ----
*************** AutoVacLauncherMain(int argc, char *argv
*** 276,282 ****
  	pqsignal(SIGALRM, SIG_IGN);
  
  	pqsignal(SIGPIPE, SIG_IGN);
! 	pqsignal(SIGUSR1, SIG_IGN);
  	/* We don't listen for async notifies */
  	pqsignal(SIGUSR2, SIG_IGN);
  	pqsignal(SIGFPE, FloatExceptionHandler);
--- 372,378 ----
  	pqsignal(SIGALRM, SIG_IGN);
  
  	pqsignal(SIGPIPE, SIG_IGN);
! 	pqsignal(SIGUSR1, avl_sigusr1_handler);
  	/* We don't listen for async notifies */
  	pqsignal(SIGUSR2, SIG_IGN);
  	pqsignal(SIGFPE, FloatExceptionHandler);
*************** AutoVacLauncherMain(int argc, char *argv
*** 300,311 ****
  	 * that we can reset the context during error recovery and thereby avoid
  	 * possible memory leaks.
  	 */
! 	avlauncher_cxt = AllocSetContextCreate(TopMemoryContext,
! 										   "Autovacuum Launcher",
! 										   ALLOCSET_DEFAULT_MINSIZE,
! 										   ALLOCSET_DEFAULT_INITSIZE,
! 										   ALLOCSET_DEFAULT_MAXSIZE);
! 	MemoryContextSwitchTo(avlauncher_cxt);
  
  
  	/*
--- 396,407 ----
  	 * that we can reset the context during error recovery and thereby avoid
  	 * possible memory leaks.
  	 */
! 	AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! 										  "Autovacuum Launcher",
! 										  ALLOCSET_DEFAULT_MINSIZE,
! 										  ALLOCSET_DEFAULT_INITSIZE,
! 										  ALLOCSET_DEFAULT_MAXSIZE);
! 	MemoryContextSwitchTo(AutovacMemCxt);
  
  
  	/*
*************** AutoVacLauncherMain(int argc, char *argv
*** 336,346 ****
  		 * Now return to normal top-level context and clear ErrorContext for
  		 * next time.
  		 */
! 		MemoryContextSwitchTo(avlauncher_cxt);
  		FlushErrorState();
  
  		/* Flush any leaked data in the top-level context */
! 		MemoryContextResetAndDeleteChildren(avlauncher_cxt);
  
  		/* Make sure pgstat also considers our stat data as gone */
  		pgstat_clear_snapshot();
--- 432,442 ----
  		 * Now return to normal top-level context and clear ErrorContext for
  		 * next time.
  		 */
! 		MemoryContextSwitchTo(AutovacMemCxt);
  		FlushErrorState();
  
  		/* Flush any leaked data in the top-level context */
! 		MemoryContextResetAndDeleteChildren(AutovacMemCxt);
  
  		/* Make sure pgstat also considers our stat data as gone */
  		pgstat_clear_snapshot();
*************** AutoVacLauncherMain(int argc, char *argv
*** 361,378 ****
  	ereport(LOG,
  			(errmsg("autovacuum launcher started")));
  
  	PG_SETMASK(&UnBlockSig);
  
  	/*
! 	 * take a nap before executing the first iteration, unless we were
! 	 * requested an emergency run.
  	 */
! 	if (autovacuum_start_daemon)
! 		pg_usleep(autovacuum_naptime * 1000000L); 
  
  	for (;;)
  	{
! 		int		worker_pid;
  
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
--- 457,488 ----
  	ereport(LOG,
  			(errmsg("autovacuum launcher started")));
  
+ 	/* must unblock signals before calling rebuild_database_list */
  	PG_SETMASK(&UnBlockSig);
  
+ 	/* in emergency mode, just start a worker and go away */
+ 	if (!autovacuum_start_daemon)
+ 	{
+ 		do_start_worker();
+ 		proc_exit(0);		/* done */
+ 	}
+ 
+ 	AutoVacuumShmem->av_launcherpid = MyProcPid;
+ 
  	/*
! 	 * Create the initial database list.  The invariant we want this list to
! 	 * keep is that it's ordered by decreasing next_time.  As soon as an entry is updated to
! 	 * a higher time, it will be moved to the front (which is correct because
! 	 * the only operation is to add autovacuum_naptime to the entry, and time
! 	 * always increases).
  	 */
! 	rebuild_database_list(InvalidOid);
  
  	for (;;)
  	{
! 		uint64		micros;
! 		bool	can_launch;
! 		TimestampTz current_time = 0;
  
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
*************** AutoVacLauncherMain(int argc, char *argv
*** 381,386 ****
--- 491,503 ----
  		if (!PostmasterIsAlive(true))
  			exit(1);
  
+ 		micros = launcher_determine_sleep(AutoVacuumShmem->av_freeWorkers !=
+ 										  INVALID_OFFSET);
+ 
+ 		/* Sleep for a while according to schedule */
+ 		pg_usleep(micros);
+ 
+ 		/* the normal shutdown case */
  		if (avlauncher_shutdown_request)
  			break;
  
*************** AutoVacLauncherMain(int argc, char *argv
*** 388,469 ****
  		{
  			got_SIGHUP = false;
  			ProcessConfigFile(PGC_SIGHUP);
  		}
  
  		/*
! 		 * if there's a worker already running, sleep until it
! 		 * disappears.
  		 */
  		LWLockAcquire(AutovacuumLock, LW_SHARED);
- 		worker_pid = AutoVacuumShmem->worker_pid;
- 		LWLockRelease(AutovacuumLock);
  
! 		if (worker_pid != 0)
  		{
! 			PGPROC *proc = BackendPidGetProc(worker_pid);
  
! 			if (proc != NULL && proc->isAutovacuum)
! 				goto sleep;
! 			else
  			{
  				/*
! 				 * if the worker is not really running (or it's a process
! 				 * that's not an autovacuum worker), remove the PID from shmem.
! 				 * This should not happen, because either the worker exits
! 				 * cleanly, in which case it'll remove the PID, or it dies, in
! 				 * which case postmaster will cause a system reset cycle.
  				 */
! 				LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 				worker_pid = 0;
! 				LWLockRelease(AutovacuumLock);
  			}
  		}
  
! 		do_start_worker();
  
! sleep:
! 		/*
! 		 * in emergency mode, exit immediately so that the postmaster can
! 		 * request another run right away if needed.
! 		 *
! 		 * XXX -- maybe it would be better to handle this inside the launcher
! 		 * itself.
! 		 */
! 		if (!autovacuum_start_daemon)
! 			break;
  
! 		/* have pgstat read the file again next time */
! 		pgstat_clear_snapshot();
  
! 		/* now sleep until the next autovac iteration */
! 		pg_usleep(autovacuum_naptime * 1000000L); 
  	}
  
  	/* Normal exit from the autovac launcher is here */
  	ereport(LOG,
  			(errmsg("autovacuum launcher shutting down")));
  
  	proc_exit(0);		/* done */
  }
  
  /*
   * do_start_worker
   *
   * Bare-bones procedure for starting an autovacuum worker from the launcher.
   * It determines what database to work on, sets up shared memory stuff and
!  * signals postmaster to start the worker.
   */
! static void
  do_start_worker(void)
  {
  	List	   *dblist;
! 	bool		for_xid_wrap;
! 	autovac_dbase *db;
! 	ListCell *cell;
  	TransactionId xidForceLimit;
  
  	/* Get a list of databases */
! 	dblist = autovac_get_database_list();
  
  	/*
  	 * Determine the oldest datfrozenxid/relfrozenxid that we will allow
--- 505,927 ----
  		{
  			got_SIGHUP = false;
  			ProcessConfigFile(PGC_SIGHUP);
+ 
+ 			LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ 			autovac_balance_cost();
+ 			LWLockRelease(AutovacuumLock);
+ 		}
+ 
+ 		/* a worker started up or finished */
+ 		if (got_SIGUSR1)
+ 		{
+ 			got_SIGUSR1 = false;
+ 
+ 			/* rebalance cost limits, if needed */
+ 			if (AutoVacuumShmem->av_rebalance)
+ 			{
+ 				LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ 				autovac_balance_cost();
+ 				LWLockRelease(AutovacuumLock);
+ 			}
  		}
  
  		/*
! 		 * There are some conditions that we need to check before trying to
! 		 * start a launcher.  First, we need to make sure that there is a
! 		 * launcher slot available.  Second, we need to make sure that no other
! 		 * worker is still starting up.
  		 */
+ 
  		LWLockAcquire(AutovacuumLock, LW_SHARED);
  
! 		can_launch = (AutoVacuumShmem->av_freeWorkers != INVALID_OFFSET);
! 
! 		if (can_launch && AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
  		{
! 			long	secs;
! 			int		usecs;
! 			WorkerInfo worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
! 
! 			if (current_time == 0)
! 				current_time = GetCurrentTimestamp();
! 
! 			/*
! 			 * We can't launch another worker when another one is still
! 			 * starting up, so just sleep for a bit more; that worker will wake
! 			 * us up again as soon as it's ready.  We will only wait
! 			 * autovacuum_naptime seconds for this to happen however.  Note
! 			 * that failure to connect to a particular database is not a
! 			 * problem here, because the worker removes itself from the
! 			 * startingWorker pointer before trying to connect; only low-level
! 			 * problems, like fork() failure, can get us here.
! 			 */
! 			TimestampDifference(worker->wi_launchtime, current_time,
! 								&secs, &usecs);
  
! 			/* ignore microseconds, as they cannot make any difference */
! 			if (secs > autovacuum_naptime)
  			{
+ 				LWLockRelease(AutovacuumLock);
+ 				LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
  				/*
! 				 * No other process can put a worker in starting mode, so if
! 				 * startingWorker is still INVALID after exchanging our lock,
! 				 * we assume it's the same one we saw above (so we don't
! 				 * recheck the launch time).
  				 */
! 				if (AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
! 				{
! 					worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
! 					worker->wi_dboid = InvalidOid;
! 					worker->wi_tableoid = InvalidOid;
! 					worker->wi_workerpid = 0;
! 					worker->wi_launchtime = 0;
! 					worker->wi_links.next = AutoVacuumShmem->av_freeWorkers;
! 					AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(worker);
! 					AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
! 				}
  			}
+ 			else
+ 				can_launch = false;
  		}
+ 		LWLockRelease(AutovacuumLock);		/* either shared or exclusive */
  
! 		if (can_launch)
! 		{
! 			Dlelem	   *elem;
  
! 			elem = DLGetTail(DatabaseList);
  
! 			if (current_time == 0)
! 				current_time = GetCurrentTimestamp();
  
! 			if (elem != NULL)
! 			{
! 				avl_dbase *avdb = DLE_VAL(elem);
! 				long	secs;
! 				int		usecs;
! 
! 				TimestampDifference(current_time, avdb->adl_next_worker, &secs, &usecs);
! 
! 				/* do we have to start a worker? */
! 				if (secs <= 0 && usecs <= 0)
! 					launch_worker(current_time);
! 			}
! 			else
! 			{
! 				/*
! 				 * Special case when the list is empty: start a worker right
! 				 * away.  This covers the initial case, when no database is in
! 				 * pgstats (thus the list is empty).
! 				 */
! 				launch_worker(current_time);
! 			}
! 		}
  	}
  
  	/* Normal exit from the autovac launcher is here */
  	ereport(LOG,
  			(errmsg("autovacuum launcher shutting down")));
+ 	AutoVacuumShmem->av_launcherpid = 0;
  
  	proc_exit(0);		/* done */
  }
  
+ 
+ /*
+  * Determine the time to sleep, in microseconds, based on the database list.
+  *
+  * The "canlaunch" parameter indicates whether we can start a worker right now,
+  * for example due to the workers being all busy.
+  */
+ static uint64
+ launcher_determine_sleep(bool canlaunch)
+ {
+ 	long	secs;
+ 	int		usecs;
+ 	Dlelem *elem;
+ 
+ 	/*
+ 	 * We sleep until the next scheduled vacuum.  We trust that when the
+ 	 * database list was built, care was taken so that no entries have times in
+ 	 * the past; if the first entry has too close a next_worker value, or a
+ 	 * time in the past, we will sleep a small nominal time.
+ 	 */
+ 	if (!canlaunch)
+ 	{
+ 		secs = autovacuum_naptime;
+ 		usecs = 0;
+ 	}
+ 	else if ((elem = DLGetTail(DatabaseList)) != NULL)
+ 	{
+ 		avl_dbase  *avdb = DLE_VAL(elem);
+ 		TimestampTz	current_time = GetCurrentTimestamp();
+ 		TimestampTz	next_wakeup;
+ 
+ 		next_wakeup = avdb->adl_next_worker;
+ 		TimestampDifference(current_time, next_wakeup, &secs, &usecs);
+ 	}
+ 	else
+ 	{
+ 		/* list is empty, sleep for whole autovacuum_naptime seconds  */
+ 		secs = autovacuum_naptime;
+ 		usecs = 0;
+ 	}
+ 
+ 	/* 100ms is the smallest time we'll allow the launcher to sleep */
+ 	if (secs <= 0L && usecs <= 100000)
+ 	{
+ 		secs = 0L;
+ 		usecs = 100000;	/* 100 ms */
+ 	}
+ 
+ 	return secs * 1000000 + usecs;
+ }
+ 
+ /*
+  * Build an updated DatabaseList.  It must only contain databases that appear
+  * in pgstats, and must be sorted by next_worker from highest to lowest,
+  * distributed regularly across the next autovacuum_naptime interval.
+  *
+  * Receives the Oid of the database that made this list be generated (we call
+  * this the "new" database, because when the database was already present on
+  * the list, we expect that this function is not called at all).  The
+  * preexisting list, if any, will be used to preserve the order of the
+  * databases in the autovacuum_naptime period.  The new database is put at the
+  * end of the interval.  The actual values are not saved, which should not be
+  * much of a problem.
+  */
+ static void
+ rebuild_database_list(Oid newdb)
+ {
+ 	List	   *dblist;
+ 	ListCell   *cell;
+ 	MemoryContext newcxt;
+ 	MemoryContext oldcxt;
+ 	MemoryContext tmpcxt;
+ 	HASHCTL		hctl;
+ 	int			score;
+ 	int			nelems;
+ 	HTAB	   *dbhash;
+ 
+ 	/* use fresh stats */
+ 	pgstat_clear_snapshot();
+ 
+ 	newcxt = AllocSetContextCreate(AutovacMemCxt,
+ 								   "AV dblist",
+ 								   ALLOCSET_DEFAULT_MINSIZE,
+ 								   ALLOCSET_DEFAULT_INITSIZE,
+ 								   ALLOCSET_DEFAULT_MAXSIZE);
+ 	tmpcxt = AllocSetContextCreate(newcxt,
+ 								   "tmp AV dblist",
+ 								   ALLOCSET_DEFAULT_MINSIZE,
+ 								   ALLOCSET_DEFAULT_INITSIZE,
+ 								   ALLOCSET_DEFAULT_MAXSIZE);
+ 	oldcxt = MemoryContextSwitchTo(tmpcxt);
+ 
+ 	/*
+ 	 * Implementing this is not as simple as it sounds, because we need to put
+ 	 * the new database at the end of the list; next the databases that were
+ 	 * already on the list, and finally (at the tail of the list) all the other
+ 	 * databases that are not on the existing list.
+ 	 *
+ 	 * To do this, we build an empty hash table of scored databases.  We will
+ 	 * start with the lowest score (zero) for the new database, then increasing
+ 	 * scores for the databases in the existing list, in order, and lastly
+ 	 * increasing scores for all databases gotten via get_database_list() that
+ 	 * are not already on the hash.
+ 	 *
+ 	 * Then we will put all the hash elements into an array, sort the array by
+ 	 * score, and finally put the array elements into the new doubly linked
+ 	 * list.
+ 	 */
+ 	hctl.keysize = sizeof(Oid);
+ 	hctl.entrysize = sizeof(avl_dbase);
+ 	hctl.hash = oid_hash;
+ 	hctl.hcxt = tmpcxt;
+ 	dbhash = hash_create("db hash", 20, &hctl,	/* magic number here FIXME */
+ 						 HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+ 
+ 	/* start by inserting the new database */
+ 	score = 0;
+ 	if (OidIsValid(newdb))
+ 	{
+ 		avl_dbase	*db;
+ 		PgStat_StatDBEntry *entry;
+ 
+ 		/* only consider this database if it has a pgstat entry */
+ 		entry = pgstat_fetch_stat_dbentry(newdb);
+ 		if (entry != NULL)
+ 		{
+ 			/* we assume it isn't found because the hash was just created */
+ 			db = hash_search(dbhash, &newdb, HASH_ENTER, NULL);
+ 
+ 			/* hash_search already filled in the key */
+ 			db->adl_score = score++;
+ 			/* next_worker is filled in later */
+ 		}
+ 	}
+ 
+ 	/* Now insert the databases from the existing list */
+ 	if (DatabaseList != NULL)
+ 	{
+ 		Dlelem	*elem;
+ 
+ 		elem = DLGetHead(DatabaseList);
+ 		while (elem != NULL)
+ 		{
+ 			avl_dbase  *avdb = DLE_VAL(elem);
+ 			avl_dbase  *db;
+ 			bool		found;
+ 			PgStat_StatDBEntry *entry;
+ 
+ 			elem = DLGetSucc(elem);
+ 
+ 			/*
+ 			 * skip databases with no stat entries -- in particular, this
+ 			 * gets rid of dropped databases
+ 			 */
+ 			entry = pgstat_fetch_stat_dbentry(avdb->adl_datid);
+ 			if (entry == NULL)
+ 				continue;
+ 
+ 			db = hash_search(dbhash, &(avdb->adl_datid), HASH_ENTER, &found);
+ 
+ 			if (!found)
+ 			{
+ 				/* hash_search already filled in the key */
+ 				db->adl_score = score++;
+ 				/* next_worker is filled in later */
+ 			}
+ 		}
+ 	}
+ 
+ 	/* finally, insert all qualifying databases not previously inserted */
+ 	dblist = get_database_list();
+ 	foreach(cell, dblist)
+ 	{
+ 		avw_dbase  *avdb = lfirst(cell);
+ 		avl_dbase  *db;
+ 		bool		found;
+ 		PgStat_StatDBEntry *entry;
+ 
+ 		/* only consider databases with a pgstat entry */
+ 		entry = pgstat_fetch_stat_dbentry(avdb->adw_datid);
+ 		if (entry == NULL)
+ 			continue;
+ 
+ 		db = hash_search(dbhash, &(avdb->adw_datid), HASH_ENTER, &found);
+ 		/* only update the score if the database was not already on the hash */
+ 		if (!found)
+ 		{
+ 			/* hash_search already filled in the key */
+ 			db->adl_score = score++;
+ 			/* next_worker is filled in later */
+ 		}
+ 	}
+ 	nelems = score;
+ 
+ 	/* from here on, the allocated memory belongs to the new list */
+ 	MemoryContextSwitchTo(newcxt);
+ 	DatabaseList = DLNewList();
+ 
+ 	if (nelems > 0)
+ 	{
+ 		TimestampTz		current_time;
+ 		int				millis_increment;
+ 		avl_dbase	   *dbary;
+ 		avl_dbase	   *db;
+ 		HASH_SEQ_STATUS	seq;
+ 		int				i;
+ 
+ 		/* put all the hash elements into an array */
+ 		dbary = palloc(nelems * sizeof(avl_dbase));
+ 
+ 		i = 0;
+ 		hash_seq_init(&seq, dbhash);
+ 		while ((db = hash_seq_search(&seq)) != NULL)
+ 			memcpy(&(dbary[i++]), db, sizeof(avl_dbase));
+ 
+ 		/* sort the array */
+ 		qsort(dbary, nelems, sizeof(avl_dbase), db_comparator);
+ 
+ 		/* this is the time interval between databases in the schedule */
+ 		millis_increment = 1000.0 * autovacuum_naptime / nelems;
+ 		current_time = GetCurrentTimestamp();
+ 
+ 		/*
+ 		 * move the elements from the array into the dllist, setting the 
+ 		 * next_worker while walking the array
+ 		 */
+ 		for (i = 0; i < nelems; i++)
+ 		{
+ 			avl_dbase  *db = &(dbary[i]);
+ 			Dlelem	   *elem;
+ 
+ 			current_time = TimestampTzPlusMilliseconds(current_time,
+ 													   millis_increment);
+ 			db->adl_next_worker = current_time;
+ 
+ 			elem = DLNewElem(db);
+ 			/* later elements should go closer to the head of the list */
+ 			DLAddHead(DatabaseList, elem);
+ 		}
+ 	}
+ 
+ 	/* all done, clean up memory */
+ 	if (DatabaseListCxt != NULL)
+ 		MemoryContextDelete(DatabaseListCxt);
+ 	MemoryContextDelete(tmpcxt);
+ 	DatabaseListCxt = newcxt;
+ 	MemoryContextSwitchTo(oldcxt);
+ }
+ 
+ /* qsort comparator for avl_dbase, using adl_score */
+ static int
+ db_comparator(const void *a, const void *b)
+ {
+ 	if (((avl_dbase *) a)->adl_score == ((avl_dbase *) b)->adl_score)
+ 		return 0;
+ 	else
+ 		return (((avl_dbase *) a)->adl_score < ((avl_dbase *) b)->adl_score) ? 1 : -1;
+ }
+ 
  /*
   * do_start_worker
   *
   * Bare-bones procedure for starting an autovacuum worker from the launcher.
   * It determines what database to work on, sets up shared memory stuff and
!  * signals postmaster to start the worker.  It fails gracefully if invoked when
!  * autovacuum_workers are already active.
!  *
!  * Return value is the OID of the database that the worker is going to process,
!  * or InvalidOid if no worker was actually started.
   */
! static Oid
  do_start_worker(void)
  {
  	List	   *dblist;
! 	ListCell   *cell;
  	TransactionId xidForceLimit;
+ 	bool		for_xid_wrap;
+ 	avw_dbase  *avdb;
+ 	TimestampTz	current_time;
+ 	bool		skipit = false;
+ 
+ 	/* return quickly when there are no free workers */
+ 	LWLockAcquire(AutovacuumLock, LW_SHARED);
+ 	if (AutoVacuumShmem->av_freeWorkers == INVALID_OFFSET)
+ 	{
+ 		LWLockRelease(AutovacuumLock);
+ 		return InvalidOid;
+ 	}
+ 	LWLockRelease(AutovacuumLock);
+ 
+ 	/* use fresh stats */
+ 	pgstat_clear_snapshot();
  
  	/* Get a list of databases */
! 	dblist = get_database_list();
  
  	/*
  	 * Determine the oldest datfrozenxid/relfrozenxid that we will allow
*************** do_start_worker(void)
*** 495,515 ****
  	 * isn't clear how to construct a metric that measures that and not cause
  	 * starvation for less busy databases.
  	 */
! 	db = NULL;
  	for_xid_wrap = false;
  	foreach(cell, dblist)
  	{
! 		autovac_dbase *tmp = lfirst(cell);
  
  		/* Find pgstat entry if any */
! 		tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid);
  
  		/* Check to see if this one is at risk of wraparound */
! 		if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit))
  		{
! 			if (db == NULL ||
! 				TransactionIdPrecedes(tmp->ad_frozenxid, db->ad_frozenxid))
! 				db = tmp;
  			for_xid_wrap = true;
  			continue;
  		}
--- 953,975 ----
  	 * isn't clear how to construct a metric that measures that and not cause
  	 * starvation for less busy databases.
  	 */
! 	avdb = NULL;
  	for_xid_wrap = false;
+ 	current_time = GetCurrentTimestamp();
  	foreach(cell, dblist)
  	{
! 		avw_dbase  *tmp = lfirst(cell);
! 		Dlelem	   *elem;
  
  		/* Find pgstat entry if any */
! 		tmp->adw_entry = pgstat_fetch_stat_dbentry(tmp->adw_datid);
  
  		/* Check to see if this one is at risk of wraparound */
! 		if (TransactionIdPrecedes(tmp->adw_frozenxid, xidForceLimit))
  		{
! 			if (avdb == NULL ||
! 				TransactionIdPrecedes(tmp->adw_frozenxid, avdb->adw_frozenxid))
! 				avdb = tmp;
  			for_xid_wrap = true;
  			continue;
  		}
*************** do_start_worker(void)
*** 520,545 ****
  		 * Otherwise, skip a database with no pgstat entry; it means it
  		 * hasn't seen any activity.
  		 */
! 		if (!tmp->ad_entry)
  			continue;
  
  		/*
  		 * Remember the db with oldest autovac time.  (If we are here,
  		 * both tmp->entry and db->entry must be non-null.)
  		 */
! 		if (db == NULL ||
! 			tmp->ad_entry->last_autovac_time < db->ad_entry->last_autovac_time)
! 			db = tmp;
  	}
  
  	/* Found a database -- process it */
! 	if (db != NULL)
  	{
  		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 		AutoVacuumShmem->process_db = db->ad_datid;
  		LWLockRelease(AutovacuumLock);
  
  		SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
  	}
  }
  
--- 980,1135 ----
  		 * Otherwise, skip a database with no pgstat entry; it means it
  		 * hasn't seen any activity.
  		 */
! 		if (!tmp->adw_entry)
! 			continue;
! 
! 		/*
! 		 * Also, skip a database that appears on the database list as having
! 		 * been processed recently (less than autovacuum_naptime seconds ago).
! 		 * We do this so that we don't select a database which we just
! 		 * selected, but that pgstat hasn't gotten around to updating the last
! 		 * autovacuum time yet.
! 		 */
! 		skipit = false;
! 		elem = DatabaseList ? DLGetTail(DatabaseList) : NULL;
! 
! 		while (elem != NULL)
! 		{
! 			avl_dbase *dbp = DLE_VAL(elem);
! 
! 			if (dbp->adl_datid == tmp->adw_datid)
! 			{
! 				TimestampTz		curr_plus_naptime;
! 				TimestampTz		next = dbp->adl_next_worker;
! 				
! 				curr_plus_naptime =
! 					TimestampTzPlusMilliseconds(current_time,
! 												autovacuum_naptime * 1000);
! 
! 				/*
! 				 * What we want here if to skip if next_worker falls between
! 				 * the current time and the current time plus naptime.
! 				 */
! 				if (timestamp_cmp_internal(current_time, next) > 0)
! 					skipit = false;
! 				else if (timestamp_cmp_internal(next, curr_plus_naptime) > 0)
! 					skipit = false;
! 				else
! 					skipit = true;
! 
! 				break;
! 			}
! 			elem = DLGetPred(elem);
! 		}
! 		if (skipit)
  			continue;
  
  		/*
  		 * Remember the db with oldest autovac time.  (If we are here,
  		 * both tmp->entry and db->entry must be non-null.)
  		 */
! 		if (avdb == NULL ||
! 			tmp->adw_entry->last_autovac_time < avdb->adw_entry->last_autovac_time)
! 			avdb = tmp;
  	}
  
  	/* Found a database -- process it */
! 	if (avdb != NULL)
  	{
+ 		WorkerInfo	worker;
+ 		SHMEM_OFFSET sworker;
+ 
  		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 
! 		/*
! 		 * Get a worker entry from the freelist.  We checked above, so there
! 		 * really should be a free slot -- complain very loudly if it isn't.
! 		 */
! 		sworker = AutoVacuumShmem->av_freeWorkers;
! 		if (sworker == INVALID_OFFSET)
! 			elog(FATAL, "no free worker found");
! 
! 		worker = (WorkerInfo) MAKE_PTR(sworker);
! 		AutoVacuumShmem->av_freeWorkers = worker->wi_links.next;
! 
! 		worker->wi_dboid = avdb->adw_datid;
! 		worker->wi_workerpid = 0;
! 		worker->wi_launchtime = GetCurrentTimestamp();
! 
! 		AutoVacuumShmem->av_startingWorker = sworker;
! 
  		LWLockRelease(AutovacuumLock);
  
  		SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
+ 
+ 		return avdb->adw_datid;
+ 	}
+ 	else if (skipit)
+ 	{
+ 		/*
+ 		 * If we skipped all databases on the list, rebuild it, because it
+ 		 * probably contains a dropped database.
+ 		 */
+ 		rebuild_database_list(InvalidOid);
+ 	}
+ 
+ 	return InvalidOid;
+ }
+ 
+ /*
+  * launch_worker
+  *
+  * Wrapper for starting a worker from the launcher.  Besides actually starting
+  * it, update the database list to reflect the next time that another one will
+  * need to be started on the selected database.  The actual database choice is
+  * left to do_start_worker.
+  *
+  * This routine is also expected to insert an entry into the database list if
+  * the selected database was previously absent from the list.  It returns the
+  * new database list.
+  */
+ static void
+ launch_worker(TimestampTz now)
+ {
+ 	Oid		dbid;
+ 	Dlelem *elem;
+ 
+ 	dbid = do_start_worker();
+ 	if (OidIsValid(dbid))
+ 	{
+ 		/*
+ 		 * Walk the database list and update the corresponding entry.  If the
+ 		 * database is not on the list, we'll recreate the list.
+ 		 */
+ 		elem = (DatabaseList == NULL) ? NULL : DLGetHead(DatabaseList);
+ 		while (elem != NULL)
+ 		{
+ 			avl_dbase *avdb = DLE_VAL(elem);
+ 
+ 			if (avdb->adl_datid == dbid)
+ 			{
+ 				/*
+ 				 * add autovacuum_naptime seconds to the current time, and use
+ 				 * that as the new "next_worker" field for this database.
+ 				 */
+ 				avdb->adl_next_worker =
+ 					TimestampTzPlusMilliseconds(now, autovacuum_naptime * 1000);
+ 
+ 				DLMoveToFront(elem);
+ 				break;
+ 			}
+ 			elem = DLGetSucc(elem);
+ 		}
+ 
+ 		/*
+ 		 * If the database was not present in the database list, we rebuild the
+ 		 * list.  It's possible that the database does not get into the list
+ 		 * anyway, for example if it's a database that doesn't have a pgstat
+ 		 * entry, but this is not a problem because we don't want to schedule
+ 		 * workers regularly into those in any case.
+ 		 */
+ 		if (elem == NULL)
+ 			rebuild_database_list(dbid);
  	}
  }
  
*************** avl_sighup_handler(SIGNAL_ARGS)
*** 550,555 ****
--- 1140,1152 ----
  	got_SIGHUP = true;
  }
  
+ /* SIGUSR1: a worker is up and running, or just finished */
+ static void
+ avl_sigusr1_handler(SIGNAL_ARGS)
+ {
+ 	got_SIGUSR1 = true;
+ }
+ 
  static void
  avlauncher_shutdown(SIGNAL_ARGS)
  {
*************** NON_EXEC_STATIC void
*** 665,671 ****
  AutoVacWorkerMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
! 	Oid			dbid;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
--- 1262,1268 ----
  AutoVacWorkerMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
! 	Oid			dbid = InvalidOid;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 744,751 ****
  
  		/*
  		 * We can now go away.	Note that because we called InitProcess, a
! 		 * callback was registered to do ProcKill, which will clean up
! 		 * necessary state.
  		 */
  		proc_exit(0);
  	}
--- 1341,1348 ----
  
  		/*
  		 * We can now go away.	Note that because we called InitProcess, a
! 		 * callback was registered to do AutovacWorkerProcKill, which will
! 		 * clean up necessary state.
  		 */
  		proc_exit(0);
  	}
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 763,780 ****
  	SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
  
  	/*
! 	 * Get the database Id we're going to work on, and announce our PID
! 	 * in the shared memory area.  We remove the database OID immediately
! 	 * from the shared memory area.
  	 */
! 	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
  
! 	dbid = AutoVacuumShmem->process_db;
! 	AutoVacuumShmem->process_db = InvalidOid;
! 	AutoVacuumShmem->worker_pid = MyProcPid;
  
  	LWLockRelease(AutovacuumLock);
  
  	if (OidIsValid(dbid))
  	{
  		char	*dbname;
--- 1360,1393 ----
  	SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
  
  	/*
! 	 * Force statement_timeout to zero to avoid a timeout setting from
! 	 * preventing regular maintenance from being executed.
  	 */
! 	SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
  
! 	/*
! 	 * Get the info about the database we're going to work on.
! 	 */
! 	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 	MyWorkerInfo = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
! 	dbid = MyWorkerInfo->wi_dboid;
! 	MyWorkerInfo->wi_workerpid = MyProcPid;
! 
! 	/* insert into the running list */
! 	SHMQueueInsertBefore(&AutoVacuumShmem->av_runningWorkers, 
! 						 &MyWorkerInfo->wi_links);
! 	/*
! 	 * remove from the "starting" pointer, so that the launcher can start a new
! 	 * worker if required
! 	 */
! 	AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
  
  	LWLockRelease(AutovacuumLock);
  
+ 	/* wake up the launcher */
+ 	if (AutoVacuumShmem->av_launcherpid != 0)
+ 		kill(AutoVacuumShmem->av_launcherpid, SIGUSR1);
+ 
  	if (OidIsValid(dbid))
  	{
  		char	*dbname;
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 803,809 ****
  
  		/* Create the memory context where cross-transaction state is stored */
  		AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! 											  "Autovacuum context",
  											  ALLOCSET_DEFAULT_MINSIZE,
  											  ALLOCSET_DEFAULT_INITSIZE,
  											  ALLOCSET_DEFAULT_MAXSIZE);
--- 1416,1422 ----
  
  		/* Create the memory context where cross-transaction state is stored */
  		AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! 											  "AV worker",
  											  ALLOCSET_DEFAULT_MINSIZE,
  											  ALLOCSET_DEFAULT_INITSIZE,
  											  ALLOCSET_DEFAULT_MAXSIZE);
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 813,838 ****
  		do_autovacuum();
  	}
  
! 	/*
! 	 * Now remove our PID from shared memory, so that the launcher can start
! 	 * another worker as soon as appropriate.
! 	 */
! 	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 	AutoVacuumShmem->worker_pid = 0;
! 	LWLockRelease(AutovacuumLock);
  
  	/* All done, go away */
  	proc_exit(0);
  }
  
  /*
!  * autovac_get_database_list
   *
   *		Return a list of all databases.  Note we cannot use pg_database,
   *		because we aren't connected; we use the flat database file.
   */
  static List *
! autovac_get_database_list(void)
  {
  	char	   *filename;
  	List	   *dblist = NIL;
--- 1426,1605 ----
  		do_autovacuum();
  	}
  
! 	/* AutovacWorkerProcKill cleans up and notifies the launcher. */
  
  	/* All done, go away */
  	proc_exit(0);
  }
  
  /*
!  * Find and return an unused PGPROC entry to set up, for the new worker
!  * process.
!  */
! PGPROC *
! AutoVacuumGetFreeProc()
! {
! 	PGPROC   *proc;
! 	volatile SHMEM_OFFSET	freeprocs;
! 
! 	SpinLockAcquire(AutovacProcLock);
! 
! 	freeprocs = AutoVacuumShmem->av_freeProcs;
! 
! 	/*
! 	 * This shouldn't happen, because we check the number of workers we launch
! 	 */
! 	if (freeprocs == INVALID_OFFSET)
! 		elog(FATAL, "too many autovacuum workers already");
! 
! 	/* get a free PGPROC slot from the freelist */
! 	proc = (PGPROC *) MAKE_PTR(freeprocs);
! 	AutoVacuumShmem->av_freeProcs = proc->links.next;
! 
! 	SpinLockRelease(AutovacProcLock);
! 
! 	return proc;
! }
! 
! /*
!  * on_shmem_exit hook for returning the PGPROC struct into the freelist and
!  * releasing any held LWLocks.
!  */
! void
! AutovacWorkerProcKill(int code, Datum arg)
! {
! 	volatile SHMEM_OFFSET	freeprocs;
! 
! 	Assert(MyProc != NULL);
! 
! 	LWLockReleaseAll();
! 
! 	/*
! 	 * Remove the WorkerInfo entry from the running list, and put it back into
! 	 * the free list.
! 	 */
! 	if (MyWorkerInfo != NULL)
! 	{
! 		/*
! 		 * XXX Acquiring a lwlock just after LWLockReleaseAll seems a bit
! 		 * contradictory.  Is it a problem?
! 		 */
! 		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 
! 		SHMQueueDelete(&MyWorkerInfo->wi_links);
! 		MyWorkerInfo->wi_links.next = AutoVacuumShmem->av_freeWorkers;
! 		AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(MyWorkerInfo);
! 		AutoVacuumShmem->av_rebalance = true;
! 		MyWorkerInfo = NULL;
! 
! 		LWLockRelease(AutovacuumLock);
! 	}
! 
! 	/* Put my PGPROC back into the freelist */
! 	SpinLockAcquire(AutovacProcLock);
! 
! 	freeprocs = AutoVacuumShmem->av_freeProcs;
! 	MyProc->links.next = freeprocs;
! 	AutoVacuumShmem->av_freeProcs = MAKE_OFFSET(MyProc);
! 
! 	SpinLockRelease(AutovacProcLock);
! 
! 	/* It isn't mine anymore */
! 	MyProc = NULL;
! }
! 
! /*
!  * Update the cost-based delay parameters, so that multiple workers consume
!  * each a fraction of the total available I/O.
!  */
! void
! AutoVacuumUpdateDelay(void)
! {
! 	if (MyWorkerInfo)
! 	{
! 		VacuumCostDelay = MyWorkerInfo->wi_cost_delay;
! 		VacuumCostLimit = MyWorkerInfo->wi_cost_limit;
! 	}
! }
! 
! /*
!  * autovac_balance_cost
!  *		Recalculate the cost limit setting for each active workers.
!  *
!  * Caller must hold the AutovacuumLock in exclusive mode.
!  */
! static void
! autovac_balance_cost(void)
! {
! 	WorkerInfo	worker;
! 	int         vac_cost_limit = (autovacuum_vac_cost_limit >= 0 ?
! 								  autovacuum_vac_cost_limit : VacuumCostLimit);
! 	int         vac_cost_delay = (autovacuum_vac_cost_delay >= 0 ?
! 								  autovacuum_vac_cost_delay : VacuumCostDelay);
! 	double      cost_total;
! 	double      cost_avail;
! 
! 	/* not set? nothing to do */
! 	if (vac_cost_limit <= 0 || vac_cost_delay <= 0)
! 		return;
! 
! 	/* caculate the total base cost limit of active workers */
! 	cost_total = 0.0;
! 	worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
! 									   &AutoVacuumShmem->av_runningWorkers,
! 									   offsetof(WorkerInfoData, wi_links));
! 	while (worker)
! 	{
! 		if (worker->wi_workerpid != 0 &&
! 			worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0)
! 			cost_total +=
! 				(double) worker->wi_cost_limit_base / worker->wi_cost_delay;
! 
! 		worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
! 										   &worker->wi_links,
! 										   offsetof(WorkerInfoData, wi_links));
! 	}
! 	/* there are no cost limits -- nothing to do */
! 	if (cost_total <= 0)
! 		return;
! 
! 	/*
! 	 * Adjust each cost limit of active workers to balance the total of
! 	 * cost limit to autovacuum_vacuum_cost_limit.
! 	 */
! 	cost_avail = (double) vac_cost_limit / vac_cost_delay;
! 	worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
! 									   &AutoVacuumShmem->av_runningWorkers,
! 									   offsetof(WorkerInfoData, wi_links));
! 	while (worker)
! 	{
! 		if (worker->wi_workerpid != 0 &&
! 			worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0)
! 		{
! 			int     limit = (int)
! 				(cost_avail * worker->wi_cost_limit_base / cost_total);
! 
! 			worker->wi_cost_limit = Min(limit, worker->wi_cost_limit_base);
! 
! 			elog(DEBUG1, "autovac_balance_cost(pid=%u db=%u, rel=%u, cost_limit=%d, cost_delay=%d)",
! 				 worker->wi_workerpid, worker->wi_dboid,
! 				 worker->wi_tableoid, worker->wi_cost_limit, worker->wi_cost_delay);
! 		}
! 
! 		worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
! 										   &worker->wi_links,
! 										   offsetof(WorkerInfoData, wi_links));
! 	}
! }
! 
! /*
!  * get_database_list
   *
   *		Return a list of all databases.  Note we cannot use pg_database,
   *		because we aren't connected; we use the flat database file.
   */
  static List *
! get_database_list(void)
  {
  	char	   *filename;
  	List	   *dblist = NIL;
*************** autovac_get_database_list(void)
*** 852,866 ****
  	while (read_pg_database_line(db_file, thisname, &db_id,
  								 &db_tablespace, &db_frozenxid))
  	{
! 		autovac_dbase *avdb;
  
! 		avdb = (autovac_dbase *) palloc(sizeof(autovac_dbase));
  
! 		avdb->ad_datid = db_id;
! 		avdb->ad_name = pstrdup(thisname);
! 		avdb->ad_frozenxid = db_frozenxid;
  		/* this gets set later: */
! 		avdb->ad_entry = NULL;
  
  		dblist = lappend(dblist, avdb);
  	}
--- 1619,1633 ----
  	while (read_pg_database_line(db_file, thisname, &db_id,
  								 &db_tablespace, &db_frozenxid))
  	{
! 		avw_dbase *avdb;
  
! 		avdb = (avw_dbase *) palloc(sizeof(avw_dbase));
  
! 		avdb->adw_datid = db_id;
! 		avdb->adw_name = pstrdup(thisname);
! 		avdb->adw_frozenxid = db_frozenxid;
  		/* this gets set later: */
! 		avdb->adw_entry = NULL;
  
  		dblist = lappend(dblist, avdb);
  	}
*************** do_autovacuum(void)
*** 1008,1019 ****
  	 * Add to the list of tables to vacuum, the OIDs of the tables that
  	 * correspond to the saved OIDs of toast tables needing vacuum.
  	 */
! 	foreach (cell, toast_oids)
  	{
  		Oid		toastoid = lfirst_oid(cell);
  		ListCell *cell2;
  
! 		foreach (cell2, table_toast_list)
  		{
  			av_relation	   *ar = lfirst(cell2);
  
--- 1775,1786 ----
  	 * Add to the list of tables to vacuum, the OIDs of the tables that
  	 * correspond to the saved OIDs of toast tables needing vacuum.
  	 */
! 	foreach(cell, toast_oids)
  	{
  		Oid		toastoid = lfirst_oid(cell);
  		ListCell *cell2;
  
! 		foreach(cell2, table_toast_list)
  		{
  			av_relation	   *ar = lfirst(cell2);
  
*************** do_autovacuum(void)
*** 1038,1047 ****
--- 1805,1860 ----
  		Oid		relid = lfirst_oid(cell);
  		autovac_table *tab;
  		char   *relname;
+ 		WorkerInfo	worker;
+ 		bool        skipit;
  
  		CHECK_FOR_INTERRUPTS();
  
  		/*
+ 		 * hold schedule lock from here until we're sure that this table
+ 		 * still needs vacuuming.  We also need the AutovacuumLock to walk
+ 		 * the worker array, but we'll let go of that one quickly.
+ 		 */
+ 		LWLockAcquire(AutovacuumScheduleLock, LW_EXCLUSIVE);
+ 		LWLockAcquire(AutovacuumLock, LW_SHARED);
+ 
+ 		/*
+ 		 * Check whether the table is being vacuumed concurrently by another
+ 		 * worker.
+ 		 */
+ 		skipit = false;
+ 		worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+ 										   &AutoVacuumShmem->av_runningWorkers,
+ 										   offsetof(WorkerInfoData, wi_links));
+ 		while (worker)
+ 		{
+ 			/* ignore myself */
+ 			if (worker == MyWorkerInfo)
+ 				goto next_worker;
+ 
+ 			/* ignore workers in other databases */
+ 			if (worker->wi_dboid != MyDatabaseId)
+ 				goto next_worker;
+ 
+ 			if (worker->wi_tableoid == relid)
+ 			{
+ 				skipit = true;
+ 				break;
+ 			}
+ 
+ next_worker:
+ 			worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
+ 											   &worker->wi_links,
+ 											   offsetof(WorkerInfoData, wi_links));
+ 		}
+ 		LWLockRelease(AutovacuumLock);
+ 		if (skipit)
+ 		{
+ 			LWLockRelease(AutovacuumScheduleLock);
+ 			continue;
+ 		}
+ 
+ 		/*
  		 * Check whether pgstat data still says we need to vacuum this table.
  		 * It could have changed if something else processed the table while we
  		 * weren't looking.
*************** do_autovacuum(void)
*** 1053,1063 ****
  		if (tab == NULL)
  		{
  			/* someone else vacuumed the table */
  			continue;
  		}
- 		/* Ok, good to go! */
  
! 		/* Set the vacuum cost parameters for this table */
  		VacuumCostDelay = tab->at_vacuum_cost_delay;
  		VacuumCostLimit = tab->at_vacuum_cost_limit;
  
--- 1866,1883 ----
  		if (tab == NULL)
  		{
  			/* someone else vacuumed the table */
+ 			LWLockRelease(AutovacuumScheduleLock);
  			continue;
  		}
  
! 		/*
! 		 * Ok, good to go.  Store the table in shared memory before releasing
! 		 * the lock so that other workers don't vacuum it concurrently.
! 		 */
! 		MyWorkerInfo->wi_tableoid = relid;
! 		LWLockRelease(AutovacuumScheduleLock);
! 
! 		/* Set the initial vacuum cost parameters for this table */
  		VacuumCostDelay = tab->at_vacuum_cost_delay;
  		VacuumCostLimit = tab->at_vacuum_cost_limit;
  
*************** do_autovacuum(void)
*** 1067,1072 ****
--- 1887,1904 ----
  			 (tab->at_doanalyze ? " ANALYZE" : ""),
  			 relname);
  
+ 		/*
+ 		 * Advertise my cost delay parameters for the balancing algorithm, and
+ 		 * do a balance
+ 		 */
+ 		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ 		MyWorkerInfo->wi_cost_delay = tab->at_vacuum_cost_delay;
+ 		MyWorkerInfo->wi_cost_limit = tab->at_vacuum_cost_limit;
+ 		MyWorkerInfo->wi_cost_limit_base = tab->at_vacuum_cost_limit;
+ 		autovac_balance_cost();
+ 		LWLockRelease(AutovacuumLock);
+ 
+ 		/* have at it */
  		autovacuum_do_vac_analyze(tab->at_relid,
  								  tab->at_dovacuum,
  								  tab->at_doanalyze,
*************** table_recheck_autovac(Oid relid)
*** 1211,1217 ****
  	PgStat_StatDBEntry *shared;
  	PgStat_StatDBEntry *dbentry;
  
! 	/* We need fresh pgstat data for this */
  	pgstat_clear_snapshot();
  
  	shared = pgstat_fetch_stat_dbentry(InvalidOid);
--- 2043,2049 ----
  	PgStat_StatDBEntry *shared;
  	PgStat_StatDBEntry *dbentry;
  
! 	/* use fresh stats */
  	pgstat_clear_snapshot();
  
  	shared = pgstat_fetch_stat_dbentry(InvalidOid);
*************** table_recheck_autovac(Oid relid)
*** 1219,1226 ****
  
  	/* fetch the relation's relcache entry */
  	classTup = SearchSysCacheCopy(RELOID,
! 							  ObjectIdGetDatum(relid),
! 							  0, 0, 0);
  	if (!HeapTupleIsValid(classTup))
  		return NULL;
  	classForm = (Form_pg_class) GETSTRUCT(classTup);
--- 2051,2058 ----
  
  	/* fetch the relation's relcache entry */
  	classTup = SearchSysCacheCopy(RELOID,
! 								  ObjectIdGetDatum(relid),
! 								  0, 0, 0);
  	if (!HeapTupleIsValid(classTup))
  		return NULL;
  	classForm = (Form_pg_class) GETSTRUCT(classTup);
*************** IsAutoVacuumWorkerProcess(void)
*** 1630,1636 ****
  Size
  AutoVacuumShmemSize(void)
  {
! 	return sizeof(AutoVacuumShmemStruct);
  }
  
  /*
--- 2462,2482 ----
  Size
  AutoVacuumShmemSize(void)
  {
! 	Size	size;
! 
! 	/*
! 	 * Need the fixed struct, the array of WorkerInfoData, and the array
! 	 * of PGPROCs
! 	 */
! 	size = sizeof(AutoVacuumShmemStruct);
! 	size = MAXALIGN(size);
! 	size = add_size(size, mul_size(autovacuum_max_workers,
! 								   sizeof(WorkerInfoData)));
! 	size = MAXALIGN(size);
! 	size = add_size(size, mul_size(autovacuum_max_workers,
! 								   sizeof(PGPROC)));
! 
! 	return size;
  }
  
  /*
*************** AutoVacuumShmemInit(void)
*** 1650,1657 ****
  		ereport(FATAL,
  				(errcode(ERRCODE_OUT_OF_MEMORY),
  				 errmsg("not enough shared memory for autovacuum")));
- 	if (found)
- 		return;                 /* already initialized */
  
! 	MemSet(AutoVacuumShmem, 0, sizeof(AutoVacuumShmemStruct));
  }
--- 2496,2548 ----
  		ereport(FATAL,
  				(errcode(ERRCODE_OUT_OF_MEMORY),
  				 errmsg("not enough shared memory for autovacuum")));
  
! 	if (!IsUnderPostmaster)
! 	{
! 		WorkerInfo	worker;
! 		PGPROC	   *proc;
! 		int			i;
! 
! 		Assert(!found);
! 
! 		/* initialize our spinlock */
! 		AutovacProcLock = (slock_t *) ShmemAlloc(sizeof(slock_t));
! 		SpinLockInit(AutovacProcLock);
! 
! 		AutoVacuumShmem->av_launcherpid = 0;
! 		AutoVacuumShmem->av_freeProcs = INVALID_OFFSET;
! 		AutoVacuumShmem->av_freeWorkers = INVALID_OFFSET;
! 		SHMQueueInit(&AutoVacuumShmem->av_runningWorkers);
! 		AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
! 
! 		worker = (WorkerInfo) ((char *) AutoVacuumShmem +
! 							   MAXALIGN(sizeof(AutoVacuumShmemStruct)));
! 
! 		/* initialize the WorkerInfo free list */
! 		for (i = 0; i < autovacuum_max_workers; i++)
! 		{
! 			worker[i].wi_links.next = AutoVacuumShmem->av_freeWorkers;
! 			AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(&worker[i]);
! 		}
! 		
! 		proc = (PGPROC *)
! 			((char *) worker +
! 			 MAXALIGN(sizeof(WorkerInfoData) * autovacuum_max_workers));
! 
! 		/* initialize the PGPROC free list and semaphores */
! 		for (i = 0; i < autovacuum_max_workers; i++)
! 		{
! 			proc[i].links.next = AutoVacuumShmem->av_freeProcs;
! 			PGSemaphoreCreate(&proc[i].sem);
! 			AutoVacuumShmem->av_freeProcs = MAKE_OFFSET(&proc[i]);
! 		}
! 	}
! 	else
! 		Assert(found);
! }
! 
! int
! AutoVacuumSemas(void)
! {
! 	return autovacuum_max_workers;
  }
Index: src/backend/postmaster/postmaster.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/postmaster/postmaster.c,v
retrieving revision 1.527
diff -c -p -r1.527 postmaster.c
*** src/backend/postmaster/postmaster.c	22 Mar 2007 19:53:30 -0000	1.527
--- src/backend/postmaster/postmaster.c	10 Apr 2007 20:31:40 -0000
*************** typedef struct
*** 327,332 ****
--- 327,333 ----
  	Backend    *ShmemBackendArray;
  	LWLock	   *LWLockArray;
  	slock_t    *ProcStructLock;
+ 	slock_t	   *AutovacProcLock;
  	PROC_HDR   *ProcGlobal;
  	PGPROC	   *AuxiliaryProcs;
  	InheritableSocket pgStatSock;
*************** CreateOptsFile(int argc, char *argv[], c
*** 3899,3904 ****
--- 3900,3906 ----
  extern slock_t *ShmemLock;
  extern LWLock *LWLockArray;
  extern slock_t *ProcStructLock;
+ extern slock_t *AutovacProcLock;
  extern PROC_HDR *ProcGlobal;
  extern PGPROC *AuxiliaryProcs;
  extern int	pgStatSock;
*************** save_backend_variables(BackendParameters
*** 3942,3947 ****
--- 3944,3950 ----
  
  	param->LWLockArray = LWLockArray;
  	param->ProcStructLock = ProcStructLock;
+ 	param->AutovacProcLock = AutovacProcLock;
  	param->ProcGlobal = ProcGlobal;
  	param->AuxiliaryProcs = AuxiliaryProcs;
  	write_inheritable_socket(&param->pgStatSock, pgStatSock, childPid);
*************** restore_backend_variables(BackendParamet
*** 4145,4150 ****
--- 4148,4154 ----
  
  	LWLockArray = param->LWLockArray;
  	ProcStructLock = param->ProcStructLock;
+ 	AutovacProcLock = param->AutovacProcLock;
  	ProcGlobal = param->ProcGlobal;
  	AuxiliaryProcs = param->AuxiliaryProcs;
  	read_inheritable_socket(&pgStatSock, &param->pgStatSock);
Index: src/backend/storage/ipc/ipci.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/ipc/ipci.c,v
retrieving revision 1.91
diff -c -p -r1.91 ipci.c
*** src/backend/storage/ipc/ipci.c	15 Feb 2007 23:23:23 -0000	1.91
--- src/backend/storage/ipc/ipci.c	9 Apr 2007 16:07:08 -0000
*************** CreateSharedMemoryAndSemaphores(bool mak
*** 138,143 ****
--- 138,144 ----
  		 */
  		numSemas = ProcGlobalSemas();
  		numSemas += SpinlockSemas();
+ 		numSemas += AutoVacuumSemas();
  		PGReserveSemaphores(numSemas, port);
  	}
  	else
Index: src/backend/storage/lmgr/proc.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/lmgr/proc.c,v
retrieving revision 1.187
diff -c -p -r1.187 proc.c
*** src/backend/storage/lmgr/proc.c	3 Apr 2007 16:34:36 -0000	1.187
--- src/backend/storage/lmgr/proc.c	10 Apr 2007 20:50:14 -0000
*************** ProcGlobalShmemSize(void)
*** 96,103 ****
  	size = add_size(size, sizeof(PROC_HDR));
  	/* AuxiliaryProcs */
  	size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC)));
! 	/* MyProcs */
! 	size = add_size(size, mul_size(MaxBackends, sizeof(PGPROC)));
  	/* ProcStructLock */
  	size = add_size(size, sizeof(slock_t));
  
--- 96,106 ----
  	size = add_size(size, sizeof(PROC_HDR));
  	/* AuxiliaryProcs */
  	size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC)));
! 	/*
! 	 * MyProcs.  Note we use MaxConnections here instead of MaxBackends,
! 	 * because the autovacuum PGPROC structs are reserved by autovacuum itself.
! 	 */
! 	size = add_size(size, mul_size(MaxConnections, sizeof(PGPROC)));
  	/* ProcStructLock */
  	size = add_size(size, sizeof(slock_t));
  
*************** ProcGlobalShmemSize(void)
*** 110,117 ****
  int
  ProcGlobalSemas(void)
  {
! 	/* We need a sema per backend, plus one for each auxiliary process. */
! 	return MaxBackends + NUM_AUXILIARY_PROCS;
  }
  
  /*
--- 113,123 ----
  int
  ProcGlobalSemas(void)
  {
! 	/*
! 	 * We need a sema per backend (but don't count autovacuum), plus one for
! 	 * each auxiliary process.
! 	 */
! 	return MaxConnections + NUM_AUXILIARY_PROCS;
  }
  
  /*
*************** ProcGlobalSemas(void)
*** 127,133 ****
   *	  running out when trying to start another backend is a common failure.
   *	  So, now we grab enough semaphores to support the desired max number
   *	  of backends immediately at initialization --- if the sysadmin has set
!  *	  MaxBackends higher than his kernel will support, he'll find out sooner
   *	  rather than later.
   *
   *	  Another reason for creating semaphores here is that the semaphore
--- 133,139 ----
   *	  running out when trying to start another backend is a common failure.
   *	  So, now we grab enough semaphores to support the desired max number
   *	  of backends immediately at initialization --- if the sysadmin has set
!  *	  MaxConnections higher than his kernel will support, he'll find out sooner
   *	  rather than later.
   *
   *	  Another reason for creating semaphores here is that the semaphore
*************** InitProcGlobal(void)
*** 169,181 ****
  	/*
  	 * Pre-create the PGPROC structures and create a semaphore for each.
  	 */
! 	procs = (PGPROC *) ShmemAlloc(MaxBackends * sizeof(PGPROC));
  	if (!procs)
  		ereport(FATAL,
  				(errcode(ERRCODE_OUT_OF_MEMORY),
  				 errmsg("out of shared memory")));
! 	MemSet(procs, 0, MaxBackends * sizeof(PGPROC));
! 	for (i = 0; i < MaxBackends; i++)
  	{
  		PGSemaphoreCreate(&(procs[i].sem));
  		procs[i].links.next = ProcGlobal->freeProcs;
--- 175,187 ----
  	/*
  	 * Pre-create the PGPROC structures and create a semaphore for each.
  	 */
! 	procs = (PGPROC *) ShmemAlloc((MaxConnections) * sizeof(PGPROC));
  	if (!procs)
  		ereport(FATAL,
  				(errcode(ERRCODE_OUT_OF_MEMORY),
  				 errmsg("out of shared memory")));
! 	MemSet(procs, 0, MaxConnections * sizeof(PGPROC));
! 	for (i = 0; i < MaxConnections; i++)
  	{
  		PGSemaphoreCreate(&(procs[i].sem));
  		procs[i].links.next = ProcGlobal->freeProcs;
*************** InitProcGlobal(void)
*** 200,250 ****
  void
  InitProcess(void)
  {
- 	/* use volatile pointer to prevent code rearrangement */
- 	volatile PROC_HDR *procglobal = ProcGlobal;
- 	SHMEM_OFFSET myOffset;
  	int			i;
  
! 	/*
! 	 * ProcGlobal should be set up already (if we are a backend, we inherit
! 	 * this by fork() or EXEC_BACKEND mechanism from the postmaster).
! 	 */
! 	if (procglobal == NULL)
! 		elog(PANIC, "proc header uninitialized");
! 
! 	if (MyProc != NULL)
! 		elog(ERROR, "you already exist");
! 
! 	/*
! 	 * Try to get a proc struct from the free list.  If this fails, we must be
! 	 * out of PGPROC structures (not to mention semaphores).
! 	 *
! 	 * While we are holding the ProcStructLock, also copy the current shared
! 	 * estimate of spins_per_delay to local storage.
! 	 */
! 	SpinLockAcquire(ProcStructLock);
  
! 	set_spins_per_delay(procglobal->spins_per_delay);
  
! 	myOffset = procglobal->freeProcs;
  
- 	if (myOffset != INVALID_OFFSET)
- 	{
- 		MyProc = (PGPROC *) MAKE_PTR(myOffset);
- 		procglobal->freeProcs = MyProc->links.next;
- 		SpinLockRelease(ProcStructLock);
- 	}
- 	else
- 	{
  		/*
! 		 * If we reach here, all the PGPROCs are in use.  This is one of the
! 		 * possible places to detect "too many backends", so give the standard
! 		 * error message.
  		 */
! 		SpinLockRelease(ProcStructLock);
! 		ereport(FATAL,
! 				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
! 				 errmsg("sorry, too many clients already")));
  	}
  
  	/*
--- 206,262 ----
  void
  InitProcess(void)
  {
  	int			i;
  
! 	if (IsAutoVacuumWorkerProcess())
! 		MyProc = AutoVacuumGetFreeProc();
! 	else
! 	{
! 		/* use volatile pointer to prevent code rearrangement */
! 		volatile PROC_HDR *procglobal = ProcGlobal;
! 		SHMEM_OFFSET myOffset;
  
! 		/*
! 		 * ProcGlobal should be set up already (if we are a backend, we inherit
! 		 * this by fork() or EXEC_BACKEND mechanism from the postmaster).
! 		 */
! 		if (procglobal == NULL)
! 			elog(PANIC, "proc header uninitialized");
  
! 		if (MyProc != NULL)
! 			elog(ERROR, "you already exist");
  
  		/*
! 		 * Try to get a proc struct from the free list.  If this fails, we must be
! 		 * out of PGPROC structures (not to mention semaphores).
! 		 *
! 		 * While we are holding the ProcStructLock, also copy the current shared
! 		 * estimate of spins_per_delay to local storage.
  		 */
! 		SpinLockAcquire(ProcStructLock);
! 
! 		set_spins_per_delay(procglobal->spins_per_delay);
! 
! 		myOffset = procglobal->freeProcs;
! 
! 		if (myOffset != INVALID_OFFSET)
! 		{
! 			MyProc = (PGPROC *) MAKE_PTR(myOffset);
! 			procglobal->freeProcs = MyProc->links.next;
! 			SpinLockRelease(ProcStructLock);
! 		}
! 		else
! 		{
! 			/*
! 			 * If we reach here, all the PGPROCs are in use.  This is one of the
! 			 * possible places to detect "too many backends", so give the standard
! 			 * error message.
! 			 */
! 			SpinLockRelease(ProcStructLock);
! 			ereport(FATAL,
! 					(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
! 					 errmsg("sorry, too many clients already")));
! 		}
  	}
  
  	/*
*************** InitProcess(void)
*** 280,286 ****
  	/*
  	 * Arrange to clean up at backend exit.
  	 */
! 	on_shmem_exit(ProcKill, 0);
  
  	/*
  	 * Now that we have a PGPROC, we could try to acquire locks, so initialize
--- 292,301 ----
  	/*
  	 * Arrange to clean up at backend exit.
  	 */
! 	if (IsAutoVacuumWorkerProcess())
! 		on_shmem_exit(AutovacWorkerProcKill, 0);
! 	else
! 		on_shmem_exit(ProcKill, 0);
  
  	/*
  	 * Now that we have a PGPROC, we could try to acquire locks, so initialize
Index: src/backend/utils/init/globals.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/init/globals.c,v
retrieving revision 1.100
diff -c -p -r1.100 globals.c
*** src/backend/utils/init/globals.c	5 Jan 2007 22:19:44 -0000	1.100
--- src/backend/utils/init/globals.c	10 Apr 2007 21:00:28 -0000
*************** bool		allowSystemTableMods = false;
*** 95,103 ****
  int			work_mem = 1024;
  int			maintenance_work_mem = 16384;
  
! /* Primary determinants of sizes of shared-memory structures: */
  int			NBuffers = 1000;
  int			MaxBackends = 100;
  
  int			VacuumCostPageHit = 1;		/* GUC parameters for vacuum */
  int			VacuumCostPageMiss = 10;
--- 95,108 ----
  int			work_mem = 1024;
  int			maintenance_work_mem = 16384;
  
! /*
!  * Primary determinants of sizes of shared-memory structures.  MaxBackends is
!  * MaxConnections + autovacuum_max_workers (it is computed by the GUC assign
!  * hook):
!  */
  int			NBuffers = 1000;
  int			MaxBackends = 100;
+ int			MaxConnections = 90;
  
  int			VacuumCostPageHit = 1;		/* GUC parameters for vacuum */
  int			VacuumCostPageMiss = 10;
Index: src/backend/utils/misc/guc.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/misc/guc.c,v
retrieving revision 1.383
diff -c -p -r1.383 guc.c
*** src/backend/utils/misc/guc.c	19 Mar 2007 23:38:30 -0000	1.383
--- src/backend/utils/misc/guc.c	10 Apr 2007 20:51:14 -0000
*************** static bool assign_tcp_keepalives_count(
*** 161,166 ****
--- 161,168 ----
  static const char *show_tcp_keepalives_idle(void);
  static const char *show_tcp_keepalives_interval(void);
  static const char *show_tcp_keepalives_count(void);
+ static bool assign_autovacuum_max_workers(int newval, bool doit, GucSource source);
+ static bool assign_maxconnections(int newval, bool doit, GucSource source);
  
  /*
   * GUC option variables that are exported from this module
*************** static struct config_int ConfigureNamesI
*** 1147,1162 ****
  	 * number.
  	 *
  	 * MaxBackends is limited to INT_MAX/4 because some places compute
! 	 * 4*MaxBackends without any overflow check.  Likewise we have to limit
! 	 * NBuffers to INT_MAX/2.
  	 */
  	{
  		{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
  			gettext_noop("Sets the maximum number of concurrent connections."),
  			NULL
  		},
! 		&MaxBackends,
! 		100, 1, INT_MAX / 4, NULL, NULL
  	},
  
  	{
--- 1149,1167 ----
  	 * number.
  	 *
  	 * MaxBackends is limited to INT_MAX/4 because some places compute
! 	 * 4*MaxBackends without any overflow check.  This check is made on the
! 	 * assign_maxconnections, since MaxBackends is computed as MaxConnections +
! 	 * autovacuum_max_workers.
! 	 *
! 	 * Likewise we have to limit NBuffers to INT_MAX/2.
  	 */
  	{
  		{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
  			gettext_noop("Sets the maximum number of concurrent connections."),
  			NULL
  		},
! 		&MaxConnections,
! 		100, 1, INT_MAX / 4, assign_maxconnections, NULL
  	},
  
  	{
*************** static struct config_int ConfigureNamesI
*** 1620,1625 ****
--- 1625,1639 ----
  		&autovacuum_freeze_max_age,
  		200000000, 100000000, 2000000000, NULL, NULL
  	},
+ 	{
+ 		/* see max_connections */
+ 		{"autovacuum_max_workers", PGC_POSTMASTER, AUTOVACUUM,
+ 			gettext_noop("Sets the maximum number of simultaneously running autovacuum worker processes."),
+ 			NULL
+ 		},
+ 		&autovacuum_max_workers,
+ 		10, 1, INT_MAX / 4, assign_autovacuum_max_workers, NULL
+ 	},
  
  	{
  		{"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER,
*************** show_tcp_keepalives_count(void)
*** 6659,6663 ****
--- 6673,6704 ----
  	return nbuf;
  }
  
+ static bool
+ assign_maxconnections(int newval, bool doit, GucSource source)
+ {
+ 	if (doit)
+ 	{
+ 		if (newval + autovacuum_max_workers > INT_MAX / 4)
+ 			return false;
+ 
+ 		MaxBackends = newval + autovacuum_max_workers;
+ 	}
+ 
+ 	return true;
+ }
+ 
+ static bool
+ assign_autovacuum_max_workers(int newval, bool doit, GucSource source)
+ {
+ 	if (doit)
+ 	{
+ 		if (newval + MaxConnections > INT_MAX / 4)
+ 			return false;
+ 
+ 		MaxBackends = newval + MaxConnections;
+ 	}
+ 
+ 	return true;
+ }
  
  #include "guc-file.c"
Index: src/backend/utils/misc/postgresql.conf.sample
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/misc/postgresql.conf.sample,v
retrieving revision 1.213
diff -c -p -r1.213 postgresql.conf.sample
*** src/backend/utils/misc/postgresql.conf.sample	19 Mar 2007 23:38:30 -0000	1.213
--- src/backend/utils/misc/postgresql.conf.sample	11 Apr 2007 20:42:25 -0000
***************
*** 376,381 ****
--- 376,382 ----
  #autovacuum = on			# enable autovacuum subprocess?
  					# 'on' requires stats_start_collector
  					# and stats_row_level to also be on
+ #autovacuum_max_workers = 10 # max # of autovacuum subprocesses
  #autovacuum_naptime = 1min		# time between autovacuum runs
  #autovacuum_vacuum_threshold = 500	# min # of tuple updates before
  					# vacuum
Index: src/include/miscadmin.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/miscadmin.h,v
retrieving revision 1.193
diff -c -p -r1.193 miscadmin.h
*** src/include/miscadmin.h	1 Mar 2007 14:52:04 -0000	1.193
--- src/include/miscadmin.h	10 Apr 2007 20:51:06 -0000
*************** extern DLLIMPORT char *DataDir;
*** 129,134 ****
--- 129,135 ----
  
  extern DLLIMPORT int NBuffers;
  extern int	MaxBackends;
+ extern int	MaxConnections;
  
  extern DLLIMPORT int MyProcPid;
  extern DLLIMPORT struct Port *MyProcPort;
Index: src/include/postmaster/autovacuum.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/postmaster/autovacuum.h,v
retrieving revision 1.8
diff -c -p -r1.8 autovacuum.h
*** src/include/postmaster/autovacuum.h	15 Feb 2007 23:23:23 -0000	1.8
--- src/include/postmaster/autovacuum.h	11 Apr 2007 23:43:45 -0000
***************
*** 14,21 ****
--- 14,24 ----
  #ifndef AUTOVACUUM_H
  #define AUTOVACUUM_H
  
+ #include "storage/lock.h"
+ 
  /* GUC variables */
  extern bool autovacuum_start_daemon;
+ extern int	autovacuum_max_workers;
  extern int	autovacuum_naptime;
  extern int	autovacuum_vac_thresh;
  extern double autovacuum_vac_scale;
*************** extern bool IsAutoVacuumWorkerProcess(vo
*** 34,39 ****
--- 37,48 ----
  extern void autovac_init(void);
  extern int	StartAutoVacLauncher(void);
  extern int	StartAutoVacWorker(void);
+ extern PGPROC *AutoVacuumGetFreeProc(void);
+ extern void AutovacWorkerProcKill(int code, Datum arg);
+ extern int AutoVacuumSemas(void);
+ 
+ /* autovacuum cost-delay balancer */
+ extern void AutoVacuumUpdateDelay(void);
  
  #ifdef EXEC_BACKEND
  extern void AutoVacLauncherMain(int argc, char *argv[]);
Index: src/include/storage/lwlock.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/storage/lwlock.h,v
retrieving revision 1.35
diff -c -p -r1.35 lwlock.h
*** src/include/storage/lwlock.h	3 Apr 2007 16:34:36 -0000	1.35
--- src/include/storage/lwlock.h	6 Apr 2007 02:20:17 -0000
*************** typedef enum LWLockId
*** 61,66 ****
--- 61,67 ----
  	BtreeVacuumLock,
  	AddinShmemInitLock,
  	AutovacuumLock,
+ 	AutovacuumScheduleLock,
  	/* Individual lock IDs end here */
  	FirstBufMappingLock,
  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
