bg worker: patch 2 of 6 - job cache

Started by Markus Wanner over 15 years ago · 1 message
#1Markus Wanner
markus@bluegap.ch
1 attachment(s)

This is the major refactoring patch, which turns the background workers
into more permanent work-horses. After having processed a job, they now
stay connected to their database and wait for more jobs from the
coordinator.

On the coordinator side, a job cache was added, so the coordinator can
queue jobs for databases that don't currently have an idle background
worker available. The existing autovacuum functionality was separated
from the background worker infrastructure.

The coordinator uses a transaction to read pg_database and start
background workers as appropriate. However, autovacuum still relies on
pg_stat, which needs to be initialized after startup. The coordinator
therefore triggers an initial VACUUM on the template database (template1),
which populates the statistics required to kick off autovacuum.

Note that none of the statistics functions differentiate between
autovacuum workers and other background job types, which might or might
not be what we want.

Another open issue is the starvation problem: jobs for a database that
doesn't currently have any connected background worker might starve if
the coordinator isn't allowed to fork any new worker (i.e. the global
maximum is reached and all other workers are idle on other databases).

Attachments:

step2-job_cache.diff (text/x-diff; charset=iso-8859-1; name=step2-job_cache.diff) [Download]
*** src/backend/storage/ipc/shmqueue.c	2fbe744d1fc4b2a45f1eaa1704276a23edd3f542
--- src/backend/storage/ipc/shmqueue.c	de5c363e57d6086b863eb3b580f235e4dec92fd9
*************** SHMQueueInit(SHM_QUEUE *queue)
*** 44,57 ****
   * SHMQueueIsDetached -- TRUE if element is not currently
   *		in a queue.
   */
- #ifdef NOT_USED
  bool
  SHMQueueIsDetached(SHM_QUEUE *queue)
  {
  	Assert(ShmemAddrIsValid(queue));
  	return (queue->prev == NULL);
  }
- #endif
  
  /*
   * SHMQueueElemInit -- clear an element's links
--- 44,55 ----
============================================================
*** src/backend/tcop/postgres.c	62792c84c23e8b39d78a49f4113cdea2828c0cb7
--- src/backend/tcop/postgres.c	778b93a7067312d73d4c68561d8fa23476c0a292
*************** ProcessInterrupts(void)
*** 2936,2947 ****
  		}
  		if (IsAutoVacuumWorkerProcess())
  		{
  			ImmediateInterruptOK = false;		/* not idle anymore */
  			DisableNotifyInterrupt();
  			DisableCatchupInterrupt();
  			ereport(ERROR,
  					(errcode(ERRCODE_QUERY_CANCELED),
! 					 errmsg("canceling autovacuum task")));
  		}
  		if (RecoveryConflictPending)
  		{
--- 2936,2948 ----
  		}
  		if (IsAutoVacuumWorkerProcess())
  		{
+ 			/* FIXME: make sure this works fine for all background jobs */
  			ImmediateInterruptOK = false;		/* not idle anymore */
  			DisableNotifyInterrupt();
  			DisableCatchupInterrupt();
  			ereport(ERROR,
  					(errcode(ERRCODE_QUERY_CANCELED),
! 					 errmsg("bg worker [%d/%d]: canceling autovacuum task", MyProcPid, MyBackendId)));
  		}
  		if (RecoveryConflictPending)
  		{
============================================================
*** src/backend/utils/init/globals.c	0c8c71495665f7b1077f5c3f3e5a02c98c92be3a
--- src/backend/utils/init/globals.c	626dcd5939ff8882d155015f0aecae78e2945728
*************** volatile uint32 CritSectionCount = 0;
*** 31,36 ****
--- 31,37 ----
  volatile bool ImmediateInterruptOK = false;
  volatile uint32 InterruptHoldoffCount = 0;
  volatile uint32 CritSectionCount = 0;
+ volatile bool BgWorkerCleanupInProgress = false;
  
  int			MyProcPid;
  pg_time_t	MyStartTime;
============================================================
*** src/include/storage/shmem.h	068b4aff617ae86bce155915e1a8137cb16337c0
--- src/include/storage/shmem.h	8a0aa9faddf9bdc9bd1cdf7032ca89f9ce52bcc2
*************** extern void SHMQueueInit(SHM_QUEUE *queu
*** 119,124 ****
--- 119,125 ----
   * prototypes for functions in shmqueue.c
   */
  extern void SHMQueueInit(SHM_QUEUE *queue);
+ extern bool SHMQueueIsDetached(SHM_QUEUE *queue);
  extern void SHMQueueElemInit(SHM_QUEUE *queue);
  extern void SHMQueueDelete(SHM_QUEUE *queue);
  extern void SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem);
============================================================
*** src/include/miscadmin.h	726d181c386b7dbd5202e3b19819876f7dadc0f7
--- src/include/miscadmin.h	a1e372be7cde09b2aeecb5d6ba5c59003fd0e831
*************** extern PGDLLIMPORT volatile uint32 CritS
*** 75,80 ****
--- 75,82 ----
  extern PGDLLIMPORT volatile uint32 InterruptHoldoffCount;
  extern PGDLLIMPORT volatile uint32 CritSectionCount;
  
+ extern PGDLLIMPORT volatile bool BgWorkerCleanupInProgress;
+ 
  /* in tcop/postgres.c */
  extern void ProcessInterrupts(void);
  
============================================================
*** src/include/storage/lwlock.h	3a8cf5b9c3534985e33cac96819ca385878887d2
--- src/include/storage/lwlock.h	2aea3a2fcddca3a9f9eea62f65d08930d221beb5
*************** typedef enum LWLockId
*** 66,71 ****
--- 66,72 ----
  	AddinShmemInitLock,
  	AutovacuumLock,
  	AutovacuumScheduleLock,
+ 	CoordinatorDatabasesLock,
  	SyncScanLock,
  	RelationMappingLock,
  	AsyncCtlLock,
============================================================
*** src/backend/storage/ipc/procarray.c	62107b3a36554510ce82208eaa98cec1b40f3e14
--- src/backend/storage/ipc/procarray.c	903e6def62dfd4119752e20cb16711b66fdf6996
*************** BackendPidGetProc(int pid)
*** 1652,1657 ****
--- 1652,1692 ----
  }
  
  /*
+  * BackendIdGetProc -- get a backend's PGPROC given its backendId
+  *
+  * Returns NULL if not found or if id is InvalidBackendId.  Acquires
+  * ProcArrayLock in shared mode internally; the caller is responsible for
+  * the returned PGPROC remaining meaningful long enough to be used.
+  */
+ PGPROC *
+ BackendIdGetProc(BackendId id)
+ {
+ 	PGPROC	   *result = NULL;
+ 	ProcArrayStruct *arrayP = procArray;
+ 	int			index;
+ 
+ 	if (id == InvalidBackendId)
+ 		return NULL;
+ 
+ 	LWLockAcquire(ProcArrayLock, LW_SHARED);
+ 
+ 	for (index = 0; index < arrayP->numProcs; index++)
+ 	{
+ 		PGPROC	   *proc = arrayP->procs[index];
+ 
+ 		if (proc->backendId == id)
+ 		{
+ 			result = proc;
+ 			break;
+ 		}
+ 	}
+ 
+ 	LWLockRelease(ProcArrayLock);
+ 
+ 	return result;
+ }
+ 
+ /*
   * BackendXidGetPid -- get a backend's pid given its XID
   *
   * Returns 0 if not found or it's a prepared transaction.  Note that
============================================================
*** src/include/storage/procarray.h	dd1cc5e6251c6a4d901434e91504b134e9e2b1dc
--- src/include/storage/procarray.h	2b020c22a35cecbbf0da861c0699d3546dfa9bac
*************** extern PGPROC *BackendPidGetProc(int pid
*** 52,57 ****
--- 52,58 ----
  extern bool HaveTransactionsInCommit(TransactionId *xids, int nxids);
  
  extern PGPROC *BackendPidGetProc(int pid);
+ extern PGPROC *BackendIdGetProc(BackendId id);
  extern int	BackendXidGetPid(TransactionId xid);
  extern bool IsBackendPid(int pid);
  
============================================================
*** src/backend/postmaster/autovacuum.c	3b9cbe9ba0d576b32d490f21969b073a7ec55b98
--- src/backend/postmaster/autovacuum.c	50ad2e93982867e91a47e8ca7af5f4be8b975d8f
***************
*** 86,94 ****
--- 86,97 ----
  #include "storage/ipc.h"
  #include "storage/pmsignal.h"
  #include "storage/proc.h"
+ #include "storage/procarray.h"
  #include "storage/procsignal.h"
+ #include "storage/shmem.h"
  #include "storage/sinvaladt.h"
  #include "tcop/tcopprot.h"
+ #include "utils/builtins.h"
  #include "utils/fmgroids.h"
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
*************** static int	default_freeze_table_age;
*** 139,147 ****
  static int	default_freeze_min_age;
  static int	default_freeze_table_age;
  
! /* Memory context for long-lived data */
! static MemoryContext AutovacMemCxt;
  
  /* struct to keep track of databases in launcher */
  typedef struct avl_dbase
  {
--- 142,155 ----
  static int	default_freeze_min_age;
  static int	default_freeze_table_age;
  
! /* Memory contexts for long-lived data */
! static MemoryContext AutovacLauncherMemCxt;
! static MemoryContext AutovacWorkerMemCxt;
  
+ /* job handling routines */
+ static void bgworker_job_initialize(worker_state new_state);
+ static void bgworker_job_completed(void);
+ 
  /* struct to keep track of databases in launcher */
  typedef struct avl_dbase
  {
*************** typedef struct avw_dbase
*** 153,164 ****
  /* struct to keep track of databases in worker */
  typedef struct avw_dbase
  {
! 	Oid			adw_datid;
  	char	   *adw_name;
  	TransactionId adw_frozenxid;
  	PgStat_StatDBEntry *adw_entry;
  } avw_dbase;
  
  /* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
  typedef struct av_relation
  {
--- 161,177 ----
  /* struct to keep track of databases in worker */
  typedef struct avw_dbase
  {
! 	Oid			adw_datid;		/* hash key -- must be first */
  	char	   *adw_name;
  	TransactionId adw_frozenxid;
  	PgStat_StatDBEntry *adw_entry;
  } avw_dbase;
  
+ typedef struct cached_job {	/* a job queued by cache_job() until forwarded */
+ 	Dlelem cj_links;	/* link in the owning co_database's codb_cached_jobs */
+ 	IMessage *cj_msg;	/* the job itself; activated only when forwarded */
+ } cached_job;
+ 
  /* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
  typedef struct av_relation
  {
*************** typedef struct autovac_table
*** 186,222 ****
  } autovac_table;
  
  /*-------------
-  * This struct holds information about a single worker's whereabouts.  We keep
-  * an array of these in shared memory, sized according to
-  * autovacuum_max_workers.
-  *
-  * wi_links			entry into free list or running list
-  * wi_dboid			OID of the database this worker is supposed to work on
-  * wi_tableoid		OID of the table currently being vacuumed
-  * wi_backend_id	id of the running worker backend, NULL if not started
-  * wi_launchtime	Time at which this worker was launched
-  * wi_cost_*		Vacuum cost-based delay parameters current in this worker
-  *
-  * All fields are protected by AutovacuumLock, except for wi_tableoid which is
-  * protected by AutovacuumScheduleLock (which is read-only for everyone except
-  * that worker itself).
-  *-------------
-  */
- typedef struct WorkerInfoData
- {
- 	SHM_QUEUE	wi_links;
- 	Oid			wi_dboid;
- 	Oid			wi_tableoid;
- 	BackendId	wi_backend_id;
- 	TimestampTz wi_launchtime;
- 	int			wi_cost_delay;
- 	int			wi_cost_limit;
- 	int			wi_cost_limit_base;
- } WorkerInfoData;
- 
- typedef struct WorkerInfoData *WorkerInfo;
- 
- /*-------------
   * The main autovacuum shmem struct.  On shared memory we store this main
   * struct and the array of WorkerInfo structs.	This struct keeps:
   *
--- 199,204 ----
*************** static AutoVacuumShmemStruct *AutoVacuum
*** 240,253 ****
  
  static AutoVacuumShmemStruct *AutoVacuumShmem;
  
  /* the database list in the launcher, and the context that contains it */
  static Dllist *DatabaseList = NULL;
  static MemoryContext DatabaseListCxt = NULL;
  
  /* Pointer to my own WorkerInfo, valid on each worker */
  static WorkerInfo MyWorkerInfo = NULL;
  
- 
  #ifdef EXEC_BACKEND
  static pid_t avlauncher_forkexec(void);
  static pid_t avworker_forkexec(void);
--- 222,241 ----
  
  static AutoVacuumShmemStruct *AutoVacuumShmem;
  
+ /*
+  * Per-database coordinator state (co_database entries keyed by database
+  * OID), resides in shared memory, protected by CoordinatorDatabasesLock.
+  */
+ static HTAB *co_databases = NULL;
+ 
  /* the database list in the launcher, and the context that contains it */
  static Dllist *DatabaseList = NULL;
  static MemoryContext DatabaseListCxt = NULL;
  
  /* Pointer to my own WorkerInfo, valid on each worker */
  static WorkerInfo MyWorkerInfo = NULL;
+ static WorkerInfo terminatable_worker = NULL;	/* coordinator: see manage_workers() */
  
  #ifdef EXEC_BACKEND
  static pid_t avlauncher_forkexec(void);
  static pid_t avworker_forkexec(void);
*************** NON_EXEC_STATIC void AutoVacLauncherMain
*** 256,273 ****
  static void handle_imessage(IMessage *msg);
  NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
  
! static void CoordinatorMaybeTriggerAutoVacuum(void);
  
! static void do_start_worker(avw_dbase *avdb);
! static Oid	do_start_autovacuum_worker(void);
  static void launcher_determine_sleep(bool can_launch, bool recursing,
  									 struct timeval *nap);
! static void launch_worker(TimestampTz now);
  static List *get_database_list(void);
  static void rebuild_database_list(Oid newdb);
  static int	db_comparator(const void *a, const void *b);
  static void autovac_balance_cost(void);
  
  static void do_autovacuum(void);
  static void FreeWorkerInfo(int code, Datum arg);
  
--- 244,278 ----
  static void handle_imessage(IMessage *msg);
  NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
  
! static void init_co_database(co_database *codb);
! static co_database *get_co_database(Oid dboid);
! static void populate_co_databases(void);
  
! static bool can_deliver_cached_job(co_database *codb, IMessage *msg,
!                                    BackendId *target);
! static WorkerInfo get_idle_worker(co_database *codb);
! static void cache_job(IMessage *msg, co_database *codb);
! static void forward_job(IMessage *msg, co_database *codb, BackendId backend_id);
! static void dispatch_job(IMessage *msg, co_database *codb);
! static void process_cached_jobs(co_database *codb);
! 
! static bool CoordinatorCanLaunchWorker(TimestampTz current_time);
! static void manage_workers(bool can_launch);
! static void autovacuum_maybe_trigger_job(TimestampTz current_time,
! 										 bool can_launch);
! 
! static void do_start_worker(Oid dboid);
! static Oid autovacuum_select_database(void);
  static void launcher_determine_sleep(bool can_launch, bool recursing,
  									 struct timeval *nap);
! static void autovacuum_update_timing(Oid dbid, TimestampTz now);
  static List *get_database_list(void);
  static void rebuild_database_list(Oid newdb);
  static int	db_comparator(const void *a, const void *b);
  static void autovac_balance_cost(void);
  
+ static void add_as_idle_worker(Oid dbid, bool inc_connected_count);
+ 
  static void do_autovacuum(void);
  static void FreeWorkerInfo(int code, Datum arg);
  
*************** static void autovac_refresh_stats(void);
*** 293,298 ****
--- 298,316 ----
  
  
  
+ char *
+ decode_worker_state(worker_state state)
+ {
+ 	/* printable name of a worker_state; unknown values get a fixed marker */
+ 	if (state == WS_IDLE)
+ 		return "WS_IDLE";
+ 	if (state == WS_AUTOVACUUM)
+ 		return "WS_AUTOVACUUM";
+ 
+ 	return "UNKNOWN STATE";
+ }
+ 
+ 
  /********************************************************************
   *					  AUTOVACUUM LAUNCHER CODE
   ********************************************************************/
*************** StartAutoVacLauncher(void)
*** 369,375 ****
--- 387,586 ----
  	return 0;
  }
  
+ static void
+ init_co_database(co_database *codb)
+ {
+ 	Assert(ShmemAddrIsValid(codb));
+ 	SHMQueueInit(&codb->codb_idle_workers);
+ 	codb->codb_num_idle_workers = 0;
+ 
+ 	/*
+ 	 * Only the coordinator uses the cached-job list, whose elements live in
+ 	 * that process' private memory, so resetting the shared counter and the
+ 	 * list header (via DLInitList()) is all that is needed here.
+ 	 */
+ 	codb->codb_num_cached_jobs = 0;
+ 	DLInitList(&codb->codb_cached_jobs);
+ 
+ 	codb->codb_num_connected_workers = 0;
+ }
+ 
+ static void
+ cache_job(IMessage *msg, co_database *codb)
+ {
+ 	cached_job *job;
+ 
+ #ifdef COORDINATOR_DEBUG
+ 	elog(DEBUG5, "Coordinator: caching job of type %s for database %u",
+ 		 decode_imessage_type(msg->type), codb->codb_dboid);
+ #endif
+ 	/* must outlive error-recovery resets of the launcher's memory context */
+ 	job = MemoryContextAlloc(TopMemoryContext, sizeof(cached_job));
+ 	DLInitElem(&job->cj_links, job);
+ 	job->cj_msg = msg;
+ 	DLAddTail(&codb->codb_cached_jobs, &job->cj_links);
+ 	codb->codb_num_cached_jobs++;
+ }
+ 
  /*
+  * get_idle_worker
+  *
+  * Removes and returns the first idle worker of the given database. The
+  * caller must ensure there is at least one idle worker and must hold
+  * CoordinatorDatabasesLock exclusively, as the shared queue is modified.
+  */
+ static WorkerInfo
+ get_idle_worker(co_database *codb)
+ {
+ 	WorkerInfo worker;
+ 
+ 	/* remove a worker from the list of idle workers */
+ 	worker = (WorkerInfo) SHMQueueNext(&codb->codb_idle_workers,
+ 									   &codb->codb_idle_workers,
+ 									   offsetof(WorkerInfoData, wi_links));
+ 	Assert(worker);
+ 	SHMQueueDelete(&worker->wi_links);
+ 	Assert(worker->wi_backend_id != InvalidBackendId);
+ 
+ 	/* maintain per-database counter */
+ 	codb->codb_num_idle_workers--;
+ 
+ 	return worker;
+ }
+ 
+ /*
+  * forward_job
+  *
+  * Activates the given imessage for delivery to backend_id as that worker's
+  * next job, after any per-type bookkeeping. The caller must hold the
+  * CoordinatorDatabasesLock; backend_id usually comes from get_idle_worker().
+  */
+ static void
+ forward_job(IMessage *msg, co_database *codb, BackendId backend_id)
+ {
+ 	/* various actions before job delivery depending on the message type */
+ 	switch (msg->type)
+ 	{
+ 		case IMSGT_TERM_WORKER:
+ 			break;
+ 
+ 		case IMSGT_PERFORM_VACUUM:
+ #ifdef COORDINATOR_DEBUG
+ 			elog(DEBUG1, "Coordinator: delivering msg %s of size %d for "
+ 				 "database %u to backend %d",
+ 				 decode_imessage_type(msg->type), msg->size, codb->codb_dboid,
+ 				 backend_id);
+ #endif
+ 			autovacuum_update_timing(codb->codb_dboid, GetCurrentTimestamp());
+ 			break;
+ 
+ 		default:
+ 			/* other job types need no preparation */
+ 			break;
+ 	}
+ 
+ 	IMessageActivate(msg, backend_id);
+ }
+ 
+ /*
+  * dispatch_job
+  *
+  * Forwards a job directly to an idle worker of the database, if there is
+  * one, or caches it for later processing otherwise. The caller must hold
+  * CoordinatorDatabasesLock in exclusive mode.
+  */
+ static void
+ dispatch_job(IMessage *msg, co_database *codb)
+ {
+ 	WorkerInfo	worker;
+ 
+ 	if (codb->codb_num_idle_workers > 0)
+ 	{
+ 		worker = get_idle_worker(codb);
+ 		forward_job(msg, codb, worker->wi_backend_id);
+ 	}
+ 	else
+ 		cache_job(msg, codb);
+ }
+ 
+ /*
+  * process_cached_jobs
+  *
+  * Dispatches cached jobs of codb to its idle workers while there are both.
+  * Each job is first checked with can_deliver_cached_job(), which may also
+  * pick the target backend; otherwise the first idle worker is used. The
+  * caller must hold CoordinatorDatabasesLock exclusively (codb is modified).
+  */
+ static void
+ process_cached_jobs(co_database *codb)
+ {
+ 	BackendId	target;
+ 	cached_job *job;
+ 
+ #ifdef COORDINATOR_DEBUG
+ 	elog(DEBUG5, "Coordinator: cached jobs: %d, idle workers: %d",
+ 		 codb->codb_num_cached_jobs, codb->codb_num_idle_workers);
+ #endif
+ 	/* the casts below rely on cj_links being cached_job's first member */
+ 	job = (cached_job*) DLGetHead(&codb->codb_cached_jobs);
+ 	while ((codb->codb_num_cached_jobs > 0) &&
+ 		   (codb->codb_num_idle_workers > 0) &&
+ 		   (job != NULL))
+ 	{
+ 		target = InvalidBackendId;
+ 		if (can_deliver_cached_job(codb, job->cj_msg, &target))
+ 		{
+ 			/* remove the job from the cache */
+ 			DLRemove(&job->cj_links);
+ 			codb->codb_num_cached_jobs--;
+ 			/* XXX: a target preset by can_deliver_cached_job() stays on the idle list */
+ 			/* forward the job to some idle worker and cleanup */
+ 			if (target == InvalidBackendId)
+ 				target = get_idle_worker(codb)->wi_backend_id;
+ 
+ 			forward_job(job->cj_msg, codb, target);
+ 			pfree(job);
+ 			/* the list changed, so restart from its head */
+ 			job = (cached_job*) DLGetHead(&codb->codb_cached_jobs);
+ 		}
+ 		else
+ 			job = (cached_job*) DLGetSucc(&job->cj_links);
+ 	}
+ }
+ 
+ /*
+  * populate_co_databases
+  *
+  * Called at coordinator startup; creates a co_database entry per database
+  * and caches an initial VACUUM job on template1 to populate pg_stat.
+  */
+ static void
+ populate_co_databases(void)
+ {
+ 	List     *dblist;
+ 	ListCell *cell;
+ 	IMessage *msg;
+ 
+ 	dblist = get_database_list();
+ 	LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
+ 	foreach(cell, dblist)
+ 	{
+ 		avw_dbase  *avdb = lfirst(cell);
+ 		co_database *codb = get_co_database(avdb->adw_datid);
+ 		if (codb->codb_dboid == TemplateDbOid)
+ 		{
+ 			/*
+ 			 * Create a cached job as an imessage to ourselves, but without
+ 			 * activating it. It can get forwarded to a backend later on.
+ 			 */
+ 			msg = IMessageCreate(IMSGT_PERFORM_VACUUM, 0);
+ 			cache_job(msg, codb);
+ 		}
+ 	}
+ 	LWLockRelease(CoordinatorDatabasesLock);
+ }
+ 
+ /*
   * Main loop for the autovacuum launcher process.
   */
  NON_EXEC_STATIC void
*************** AutoVacLauncherMain(int argc, char *argv
*** 377,382 ****
--- 588,594 ----
  {
  	sigjmp_buf	local_sigjmp_buf;
  	IMessage   *msg = NULL;
+ 	bool        can_launch;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
*************** AutoVacLauncherMain(int argc, char *argv
*** 450,461 ****
  	 * that we can reset the context during error recovery and thereby avoid
  	 * possible memory leaks.
  	 */
! 	AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! 										  "Autovacuum Launcher",
! 										  ALLOCSET_DEFAULT_MINSIZE,
! 										  ALLOCSET_DEFAULT_INITSIZE,
! 										  ALLOCSET_DEFAULT_MAXSIZE);
! 	MemoryContextSwitchTo(AutovacMemCxt);
  
  	/*
  	 * If an exception is encountered, processing resumes here.
--- 662,673 ----
  	 * that we can reset the context during error recovery and thereby avoid
  	 * possible memory leaks.
  	 */
! 	AutovacLauncherMemCxt = AllocSetContextCreate(TopMemoryContext,
! 												  "Autovacuum Launcher",
! 												  ALLOCSET_DEFAULT_MINSIZE,
! 												  ALLOCSET_DEFAULT_INITSIZE,
! 												  ALLOCSET_DEFAULT_MAXSIZE);
! 	MemoryContextSwitchTo(AutovacLauncherMemCxt);
  
  	/*
  	 * If an exception is encountered, processing resumes here.
*************** AutoVacLauncherMain(int argc, char *argv
*** 479,495 ****
  		EmitErrorReport();
  
  		/* Abort the current transaction in order to recover */
! 		AbortCurrentTransaction();
  
  		/*
  		 * Now return to normal top-level context and clear ErrorContext for
  		 * next time.
  		 */
! 		MemoryContextSwitchTo(AutovacMemCxt);
  		FlushErrorState();
  
  		/* Flush any leaked data in the top-level context */
! 		MemoryContextResetAndDeleteChildren(AutovacMemCxt);
  
  		/* don't leave dangling pointers to freed memory */
  		DatabaseListCxt = NULL;
--- 691,707 ----
  		EmitErrorReport();
  
  		/* Abort the current transaction in order to recover */
! 		AbortOutOfAnyTransaction();
  
  		/*
  		 * Now return to normal top-level context and clear ErrorContext for
  		 * next time.
  		 */
! 		MemoryContextSwitchTo(AutovacLauncherMemCxt);
  		FlushErrorState();
  
  		/* Flush any leaked data in the top-level context */
! 		MemoryContextResetAndDeleteChildren(AutovacLauncherMemCxt);
  
  		/* don't leave dangling pointers to freed memory */
  		DatabaseListCxt = NULL;
*************** AutoVacLauncherMain(int argc, char *argv
*** 520,525 ****
--- 732,742 ----
  	AutoVacuumShmem->av_launcherid = MyBackendId;
  
  	/*
+ 	 * Initial population of the database list from pg_database
+ 	 */
+ 	populate_co_databases();
+ 
+ 	/*
  	 * Create the initial database list.  The invariant we want this list to
  	 * keep is that it's ordered by decreasing next_time.  As soon as an entry
  	 * is updated to a higher time, it will be moved to the front (which is
*************** AutoVacLauncherMain(int argc, char *argv
*** 530,536 ****
  
  	for (;;)
  	{
! 		struct timeval nap;
  
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
--- 747,754 ----
  
  	for (;;)
  	{
! 		TimestampTz		current_time;
! 		struct timeval	nap;
  
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
*************** AutoVacLauncherMain(int argc, char *argv
*** 539,546 ****
  		if (!PostmasterIsAlive(true))
  			proc_exit(1);
  
! 		launcher_determine_sleep((AutoVacuumShmem->av_freeWorkers != NULL),
! 								 false, &nap);
  
  		/* Allow sinval catchup interrupts while sleeping */
  		EnableCatchupInterrupt();
--- 757,764 ----
  		if (!PostmasterIsAlive(true))
  			proc_exit(1);
  
! 		can_launch = (AutoVacuumShmem->av_freeWorkers != NULL);
! 		launcher_determine_sleep(can_launch, false, &nap);
  
  		/* Allow sinval catchup interrupts while sleeping */
  		EnableCatchupInterrupt();
*************** AutoVacLauncherMain(int argc, char *argv
*** 647,658 ****
  			msg = IMessageCheck();
  		}
  
  		/*
  		 * Periodically check and trigger autovacuum workers, if autovacuum
  		 * is enabled.
  		 */
  		if (autovacuum_enabled)
! 			CoordinatorMaybeTriggerAutoVacuum();
  	}
  
  	/* Normal exit from the autovac launcher is here */
--- 865,881 ----
  			msg = IMessageCheck();
  		}
  
+ 		current_time  = GetCurrentTimestamp();
+ 		can_launch = CoordinatorCanLaunchWorker(current_time);
+ 
  		/*
  		 * Periodically check and trigger autovacuum workers, if autovacuum
  		 * is enabled.
  		 */
  		if (autovacuum_enabled)
! 			autovacuum_maybe_trigger_job(current_time, can_launch);
! 
! 		manage_workers(can_launch);
  	}
  
  	/* Normal exit from the autovac launcher is here */
*************** handle_imessage(IMessage *msg)
*** 666,680 ****
  void
  handle_imessage(IMessage *msg)
  {
- 	IMessageType	msg_type;
  	BackendId		msg_sender;
  
! 	msg_type = msg->type;
  	msg_sender = msg->sender;
  
  #ifdef COORDINATOR_DEBUG
! 	elog(DEBUG3, "Coordinator: received %s from pid %d",
! 		 decode_imessage_type(msg->type), msg->sender);
  #endif
  
  	switch (msg->type)
--- 889,927 ----
  void
  handle_imessage(IMessage *msg)
  {
  	BackendId		msg_sender;
+ 	PGPROC         *proc;
+ 	TransactionId   local_xid = InvalidTransactionId;
+ 	co_database    *codb = NULL;
+ 	Oid             dboid = InvalidOid;
  
! 	/*
! 	 * Get the PGPROC entry of the sender and the related database info, if
! 	 * any.
! 	 */
  	msg_sender = msg->sender;
  
+ 	LWLockAcquire(ProcArrayLock, LW_SHARED);
+ 
+ 	proc = BackendIdGetProc(msg_sender);
+ 	if (proc)
+ 	{
+ 		local_xid = proc->xid;
+ 		dboid = proc->databaseId;
+ 	}
+ 
+ 	LWLockRelease(ProcArrayLock);
+ 
  #ifdef COORDINATOR_DEBUG
! 	if (proc)
! 		elog(DEBUG3, "Coordinator: received %s of size %d from backend %d\n"
! 			 "\t(connected to db %d, local xid %d)",
! 			 decode_imessage_type(msg->type), msg->size, msg_sender,
! 			 dboid, local_xid);
! 	else
! 		elog(DEBUG3, "Coordinator: received %s of size %d from backend %d\n"
! 			 "\t(for which no PGPROC could be found)",
! 			 decode_imessage_type(msg->type), msg->size, msg_sender);
  #endif
  
  	switch (msg->type)
*************** handle_imessage(IMessage *msg)
*** 687,701 ****
  			/* consume the message */
  			IMessageRemove(msg);
  
  			/*
! 			 * default to immediately command the new background
! 			 * worker to perform an autovacuum step.
  			 */
! 			if (IMSGT_REGISTER_WORKER)
! 			{
! 				msg = IMessageCreate(IMSGT_PERFORM_VACUUM, 0);
! 				IMessageActivate(msg, msg_sender);
! 			}
  
  			/*
  			 * Rebalance cost limits, as the worker has already reported its
--- 934,952 ----
  			/* consume the message */
  			IMessageRemove(msg);
  
+ 			LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
+ 			if (OidIsValid(dboid))	/* sender's PGPROC may already be gone */
+ 				process_cached_jobs(get_co_database(dboid));
+ 			LWLockRelease(CoordinatorDatabasesLock);
+ 
  			/*
! 			 * We trigger a DatabaseList rebuild if it is still empty and
! 			 * after a job is done. This mainly covers the initialization
! 			 * phase after the first background worker is done with vacuuming
! 			 * template1 (and thus having populated pgstat).
  			 */
! 			if (DLGetHead(DatabaseList) == NULL)
! 				rebuild_database_list(InvalidOid);
  
  			/*
  			 * Rebalance cost limits, as the worker has already reported its
*************** handle_imessage(IMessage *msg)
*** 714,735 ****
  			IMessageRemove(msg);
  
  			/* trigger an autovacuum worker */
! 			do_start_autovacuum_worker();
! 
  			break;
  
  		default:
! 			elog(WARNING, "Unknown message type: %c, ignored!", msg->type);
  			IMessageRemove(msg);
  	}
  }
  
  static void
! CoordinatorMaybeTriggerAutoVacuum()
  {
! 	TimestampTz current_time = 0;
  	bool		can_launch;
- 	Dlelem	   *elem;
  
  	/* FIXME: indentation */
  	{
--- 965,1170 ----
  			IMessageRemove(msg);
  
  			/* trigger an autovacuum worker */
! 			dboid = autovacuum_select_database();
! 			if (OidIsValid(dboid))
! 			{
! 				LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
! 				codb = get_co_database(dboid);
! 				msg = IMessageCreate(IMSGT_PERFORM_VACUUM, 0);
! 				dispatch_job(msg, codb);
! 				LWLockRelease(CoordinatorDatabasesLock);
! 			}
  			break;
  
  		default:
! 			elog(WARNING, "Coordinator: unknown message type: %c, ignored!",
! 				 msg->type);
  			IMessageRemove(msg);
  	}
  }
  
+ 
+ 
+ /*
+  * get_co_database
+  *
+  * Gets or creates the shared-memory co_database entry for dboid. The
+  * caller must hold CoordinatorDatabasesLock exclusively (it may insert).
+  */
+ static co_database *
+ get_co_database(Oid dboid)
+ {
+ 	co_database *codb;
+ 	bool		found;
+ 
+ 	codb = hash_search(co_databases, &dboid, HASH_ENTER, &found);
+ 	if (!found)
+ 		init_co_database(codb);
+ 
+ 	return codb;
+ }
+ 
+ static bool
+ can_deliver_cached_job(co_database *codb, IMessage *msg, BackendId *target)
+ {
+ #ifdef COORDINATOR_DEBUG
+ 	elog(DEBUG5, "Coordinator: checking deliverability of job type %s",
+ 		 decode_imessage_type(msg->type));
+ #endif
+ 	/* Decides by job type only; codb is not consulted yet. */
+ 	switch (msg->type)
+ 	{
+ 		case IMSGT_TERM_WORKER:
+ 		case IMSGT_PERFORM_VACUUM:
+ 			return true;	/* any idle worker will do; *target left unset */
+ 
+ 		default:
+ 			elog(WARNING, "Coordinator: missing deliverability check for "
+ 				 "message type %s", decode_imessage_type(msg->type));
+ 			return false;	/* job stays cached, see process_cached_jobs() */
+ 	}
+ }
+ 
+ /*
+  * manage_workers
+  *
+  * Starts background workers for databases which have at least one cached
+  * job or fewer than min_spare_background_workers idle workers. Within the
+  * same loop, a database with more than max_spare_background_workers idle
+  * workers gets one of them marked for termination.
+  *
+  * Note that at most one worker can be requested to start or stop per
+  * invocation.
+  */
  static void
! manage_workers(bool can_launch)
  {
! 	HASH_SEQ_STATUS			hash_status;
! 	co_database	           *codb;
! 	Oid                     launch_dboid = InvalidOid;
! 	float                   max_score = 0.0,
! 		                    score;
! 	bool                    worker_slots_available;
! 	int                     idle_workers_required;
! 	int                     job_workers_required;
! 
! 	LWLockAcquire(AutovacuumLock, LW_SHARED);
! 	worker_slots_available = (AutoVacuumShmem->av_freeWorkers != NULL);
! 	LWLockRelease(AutovacuumLock);
! 
! 	/*
! 	 * Terminate an unneeded worker that has been fetched from the list of
! 	 * idle workers in the last invocation. We defer sending the signal one
! 	 * invocation to make sure the coordinator had time to handle all
! 	 * pending messages from that worker. As idle workers don't ever send
! 	 * messages, we can safely assume there is no pending message from that
! 	 * worker by now.
! 	 */
! 	if (terminatable_worker != NULL)
! 	{
! 		IMessage *msg;
! 
! #ifdef COORDINATOR_DEBUG
! 		PGPROC *proc = BackendIdGetProc(terminatable_worker->wi_backend_id);
! 		if (proc)
! 			elog(DEBUG3, "Coordinator: terminating worker [%d/%d].",
! 				 proc->pid, terminatable_worker->wi_backend_id);
! 		else
! 			elog(WARNING, "Coordinator: terminating worker (no PGPROC, backend %d).",
! 				 terminatable_worker->wi_backend_id);
! #endif
! 
! 		msg = IMessageCreate(IMSGT_TERM_WORKER, 0);
! 		IMessageActivate(msg, terminatable_worker->wi_backend_id);
! 
! 		terminatable_worker = NULL;
! 	}
! 
! #ifdef COORDINATOR_DEBUG
! 	elog(DEBUG3, "Coordinator: manage_workers: can_launch: %s, slots_available: %s",
! 		 (can_launch ? "true" : "false"), (worker_slots_available ? "true" : "false"));
! #endif
! 
! 	/*
! 	 * Score every database by its pending jobs and missing spare workers;
! 	 * the best-scoring one that needs a worker gets one launched below.
! 	 */
! 	idle_workers_required = 0;
! 	job_workers_required = 0;
! 	LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
! 	hash_seq_init(&hash_status, co_databases);
! 	while ((codb = (co_database*) hash_seq_search(&hash_status)))
! 	{
! 		score = ((float) codb->codb_num_cached_jobs /
! 				 (float) (codb->codb_num_connected_workers + 1)) * 100.0;
! 
! 		if (codb->codb_num_idle_workers < min_spare_background_workers)
! 			score += (min_spare_background_workers -
! 					  codb->codb_num_idle_workers) * 10.0;
! 
! #ifdef COORDINATOR_DEBUG
! 		elog(DEBUG3, "Coordinator:     db %u, idle/conn: %d/%d, jobs: %d, score: %0.1f",
! 			 codb->codb_dboid, codb->codb_num_idle_workers,
! 			 codb->codb_num_connected_workers, codb->codb_num_cached_jobs,
! 			 score);
! #endif
! 
! 		if (codb->codb_num_cached_jobs &&
! 			(codb->codb_num_connected_workers == 0))
! 			job_workers_required++;
! 
! 		if (codb->codb_num_idle_workers < min_spare_background_workers)
! 			idle_workers_required += (min_spare_background_workers -
! 									  codb->codb_num_idle_workers);
! 
! 		/*
! 		 * FIXME: "misconfiguration" allows "starvation" in case the global
! 		 *        maximum is reached all with idle workers, but other dbs
! 		 *        w/o a single worker still have jobs.
! 		 */
! 		if (can_launch && ((codb->codb_num_cached_jobs > 0) ||
! 						   (codb->codb_num_idle_workers <
! 							min_spare_background_workers)))
! 		{
! 			if (score > max_score)
! 			{
! 				launch_dboid = codb->codb_dboid;
! 				max_score = score;
! 			}
! 		}
! 
! 		/*
! 		 * If we are above limit, we fetch an idle worker from the list
! 		 * and mark it as terminatable. Actual termination happens in
! 		 * the following invocation, see above.
! 		 */
! 		if ((terminatable_worker == NULL) &&
! 			(codb->codb_num_idle_workers > max_spare_background_workers))
! 			terminatable_worker = get_idle_worker(codb);
! 	}
! 	LWLockRelease(CoordinatorDatabasesLock);
! 
! 	if (!worker_slots_available && idle_workers_required > 0)
! 	{
! 		elog(WARNING, "Coordinator: no more background workers available, but requiring %d more, according to min_spare_background_workers.",
! 			 idle_workers_required);
! 	}
! 
! 	if (!worker_slots_available && job_workers_required > 0)
! 	{
! 		elog(WARNING, "Coordinator: no background workers available, but %d databases have background jobs pending.",
! 			 job_workers_required);
! 	}
! 
! 	/* request a worker for the highest-scoring database that needs one */
! 	if (OidIsValid(launch_dboid))
! 		do_start_worker(launch_dboid);
! }
! 
! static bool
! CoordinatorCanLaunchWorker(TimestampTz current_time)
! {
  	bool		can_launch;
  
  	/* FIXME: indentation */
  	{
*************** CoordinatorMaybeTriggerAutoVacuum()
*** 741,747 ****
  		 * other worker failed while starting up.
  		 */
  
- 		current_time = GetCurrentTimestamp();
  		LWLockAcquire(AutovacuumLock, LW_SHARED);
  
  		can_launch = (AutoVacuumShmem->av_freeWorkers != NULL);
--- 1176,1181 ----
*************** CoordinatorMaybeTriggerAutoVacuum()
*** 800,810 ****
  				can_launch = false;
  		}
  		LWLockRelease(AutovacuumLock);	/* either shared or exclusive */
  
! 		/* if we can't do anything, just return. */
! 		if (!can_launch)
! 			return;
  
  		/* We're OK to start a new worker */
  
  		elem = DLGetTail(DatabaseList);
--- 1234,1255 ----
  				can_launch = false;
  		}
  		LWLockRelease(AutovacuumLock);	/* either shared or exclusive */
+ 	}
  
! 	return can_launch;
! }
  
+ static void
+ autovacuum_maybe_trigger_job(TimestampTz current_time, bool can_launch)
+ {
+ 	Oid         dboid = InvalidOid;
+ 	Dlelem	   *elem;
+ 	IMessage   *msg;
+ 	co_database *codb;
+ 
+ 	/* FIXME: indentation */
+ 	{
+ 
  		/* We're OK to start a new worker */
  
  		elem = DLGetTail(DatabaseList);
*************** CoordinatorMaybeTriggerAutoVacuum()
*** 818,836 ****
  			 */
  			if (TimestampDifferenceExceeds(avdb->adl_next_worker,
  										   current_time, 0))
! 				launch_worker(current_time);
  		}
  		else
  		{
  			/*
! 			 * Special case when the list is empty: start a worker right away.
! 			 * This covers the initial case, when no database is in pgstats
! 			 * (thus the list is empty).  Note that the constraints in
! 			 * launcher_determine_sleep keep us from starting workers too
! 			 * quickly (at most once every autovacuum_naptime when the list is
! 			 * empty).
  			 */
! 			launch_worker(current_time);
  		}
  	}
  }
--- 1263,1301 ----
  			 */
  			if (TimestampDifferenceExceeds(avdb->adl_next_worker,
  										   current_time, 0))
! 			{
! 				dboid = autovacuum_select_database();
! 				if (OidIsValid(dboid))
! 				{
! 					LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
! 
! 					codb = get_co_database(dboid);
! 
! 					/*
! 					 * Only dispatch a job, if it can be processed immediately
! 					 * so we don't end up having lots of autovacuum requests
! 					 * in the job cache.
! 					 */
! 					if (can_launch || codb->codb_num_idle_workers > 0)
! 					{
! 						msg = IMessageCreate(IMSGT_PERFORM_VACUUM, 0);
! 						dispatch_job(msg, codb);
! 					}
! 
! 					LWLockRelease(CoordinatorDatabasesLock);
! 				}
! 			}
  		}
  		else
  		{
  			/*
! 			 * If the list is still empty, this is a no-op. Instead we simply
! 			 * wait until the initial vacuum on the template database is
! 			 * done. That will populate pg_stat.
  			 */
! #ifdef COORDINATOR_DEBUG
! 			elog(DEBUG5, "Coordinator: DatabaseList is still empty.");
! #endif
  		}
  	}
  }
*************** rebuild_database_list(Oid newdb)
*** 933,939 ****
  	/* use fresh stats */
  	autovac_refresh_stats();
  
! 	newcxt = AllocSetContextCreate(AutovacMemCxt,
  								   "AV dblist",
  								   ALLOCSET_DEFAULT_MINSIZE,
  								   ALLOCSET_DEFAULT_INITSIZE,
--- 1398,1404 ----
  	/* use fresh stats */
  	autovac_refresh_stats();
  
! 	newcxt = AllocSetContextCreate(AutovacLauncherMemCxt,
  								   "AV dblist",
  								   ALLOCSET_DEFAULT_MINSIZE,
  								   ALLOCSET_DEFAULT_INITSIZE,
*************** void
*** 1129,1141 ****
   * start a worker.
   */
  void
! do_start_worker(avw_dbase *avdb)
  {
  	WorkerInfo	worker;
  
  #ifdef COORDINATOR_DEBUG
! 	elog(DEBUG5, "Coordinator: requesting worker for database %s",
! 		 avdb->adw_name);
  #endif
  
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
--- 1594,1607 ----
   * start a worker.
   */
  void
! do_start_worker(Oid dboid)
  {
  	WorkerInfo	worker;
  
+ 	Assert(OidIsValid(dboid));
+ 
  #ifdef COORDINATOR_DEBUG
! 	elog(DEBUG3, "Coordinator: requesting worker for database %d.", dboid);
  #endif
  
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
*************** do_start_worker(avw_dbase *avdb)
*** 1151,1157 ****
  
  	AutoVacuumShmem->av_freeWorkers = (WorkerInfo) worker->wi_links.next;
  
! 	worker->wi_dboid = avdb->adw_datid;
  	worker->wi_backend_id = InvalidBackendId;
  	worker->wi_launchtime = GetCurrentTimestamp();
  
--- 1617,1623 ----
  
  	AutoVacuumShmem->av_freeWorkers = (WorkerInfo) worker->wi_links.next;
  
! 	worker->wi_dboid = dboid;
  	worker->wi_backend_id = InvalidBackendId;
  	worker->wi_launchtime = GetCurrentTimestamp();
  
*************** do_start_worker(avw_dbase *avdb)
*** 1163,1188 ****
  }
  
  /*
!  * do_start_autovacuum_worker
   *
!  * It determines what database to work on, sets up shared memory stuff and
!  * signals postmaster to start the worker.	It fails gracefully if invoked when
!  * autovacuum_workers are already active.
   *
!  * Return value is the OID of the database that the worker is going to process,
!  * or InvalidOid if no worker was actually started.
   */
  static Oid
! do_start_autovacuum_worker(void)
  {
! 	List	   *dblist;
! 	ListCell   *cell;
  	TransactionId xidForceLimit;
  	bool		for_xid_wrap;
  	avw_dbase  *avdb;
  	TimestampTz current_time;
  	bool		skipit = false;
! 	Oid			retval = InvalidOid;
  	MemoryContext tmpcxt,
  				oldcxt;
  
--- 1629,1653 ----
  }
  
  /*
!  * autovacuum_select_database
   *
!  * It determines what database to work on and sets up shared memory stuff. It
!  * fails gracefully if invoked when autovacuum_workers are already active.
   *
!  * Returns the OID of the database that the next worker should process, or
!  * InvalidOid if no database needs vacuuming.
   */
  static Oid
! autovacuum_select_database(void)
  {
! 	List		*dblist;
! 	ListCell    *cell;
  	TransactionId xidForceLimit;
  	bool		for_xid_wrap;
  	avw_dbase  *avdb;
  	TimestampTz current_time;
  	bool		skipit = false;
! 	Oid         retval = InvalidOid;
  	MemoryContext tmpcxt,
  				oldcxt;
  
*************** do_start_autovacuum_worker(void)
*** 1315,1324 ****
  			avdb = tmp;
  	}
  
- 	/* Found a database -- process it */
  	if (avdb != NULL)
  	{
! 		do_start_worker(avdb);
  		retval = avdb->adw_datid;
  	}
  	else if (skipit)
--- 1780,1788 ----
  			avdb = tmp;
  	}
  
  	if (avdb != NULL)
  	{
! 		/* We've found a database that needs vacuuming, return its id */
  		retval = avdb->adw_datid;
  	}
  	else if (skipit)
*************** do_start_autovacuum_worker(void)
*** 1337,1361 ****
  }
  
  /*
!  * launch_worker
   *
!  * Wrapper for starting a worker from the launcher.  Besides actually starting
!  * it, update the database list to reflect the next time that another one will
!  * need to be started on the selected database.  The actual database choice is
!  * left to do_start_autovacuum_worker.
   *
   * This routine is also expected to insert an entry into the database list if
   * the selected database was previously absent from the list.
   */
  static void
! launch_worker(TimestampTz now)
  {
! 	Oid			dbid;
! 	Dlelem	   *elem;
  
! 	dbid = do_start_autovacuum_worker();
! 	if (OidIsValid(dbid))
  	{
  		/*
  		 * Walk the database list and update the corresponding entry.  If the
  		 * database is not on the list, we'll recreate the list.
--- 1801,1824 ----
  }
  
  /*
!  * autovacuum_update_timing
   *
!  * After having started an autovacuum job, the coordinator needs to update
!  * the database list to reflect the next time that another one will need to
!  * be started on the selected database.
   *
   * This routine is also expected to insert an entry into the database list if
   * the selected database was previously absent from the list.
   */
+ 
  static void
! autovacuum_update_timing(Oid dbid, TimestampTz now)
  {
! 	Dlelem		   *elem;
  
! 	/* FIXME: indentation */
  	{
+ 
  		/*
  		 * Walk the database list and update the corresponding entry.  If the
  		 * database is not on the list, we'll recreate the list.
*************** StartAutoVacWorker(void)
*** 1492,1497 ****
--- 1955,2172 ----
  }
  
  /*
+  * add_as_idle_worker
+  *
+  * Adds the current worker (MyWorkerInfo) to the list of idle workers of
+  * database dbid, creating the coordinator's entry for that database if it
+  * does not exist yet, and increments that database's idle worker count.
+  * If inc_connected_count is true, the worker is also counted as a newly
+  * connected worker of that database.
+  *
+  * The caller is expected to hold the AutovacuumLock in exclusive mode;
+  * CoordinatorDatabasesLock is acquired and released here.  MyWorkerInfo
+  * must not currently be linked into any other list.
+  */
+ static void
+ add_as_idle_worker(Oid dbid, bool inc_connected_count)
+ {
+ 	co_database *codb;
+ 
+ 	/* wi_links can only be part of a single list at a time */
+ 	Assert(SHMQueueIsDetached(&MyWorkerInfo->wi_links));
+ 
+ 	/* Lookup the corresponding database, or create an entry for it */
+ 	LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
+ 	codb = get_co_database(dbid);
+ 
+ 	if (inc_connected_count)
+ 		codb->codb_num_connected_workers++;
+ 
+ 	/* add as an idle worker */
+ 	SHMQueueInsertBefore(&codb->codb_idle_workers, &MyWorkerInfo->wi_links);
+ 	codb->codb_num_idle_workers++;
+ 
+ 	LWLockRelease(CoordinatorDatabasesLock);
+ }
+ 
+ /*
+  * bgworker_job_initialize
+  *
+  * Prepares the current worker for processing a background job: switches its
+  * state from WS_IDLE to new_state, updates the ps display accordingly and
+  * creates fresh AutovacWorkerMemCxt and MessageContext memory contexts,
+  * leaving AutovacWorkerMemCxt as the current memory context.  The worker
+  * must already have been removed from its database's list of idle workers.
+  * bgworker_reset undoes all of this once the job is over.
+  */
+ void
+ bgworker_job_initialize(worker_state new_state)
+ {
+ 	/*
+ 	 * Note that the coordinator is responsible for dequeuing the worker from
+ 	 * the list of idle backends, but it shall *NOT* assign a worker state,
+ 	 * we do that from the worker exclusively.
+ 	 */
+ 	Assert(SHMQueueIsDetached(&MyWorkerInfo->wi_links));
+ 	Assert(MyWorkerInfo->wi_state == WS_IDLE);
+ 
+ 	MyWorkerInfo->wi_state = new_state;
+ 	switch (new_state)
+ 	{
+ 		case WS_IDLE:
+ 			Assert(false);    /* use bgworker_job_completed instead */
+ 			break;
+ 		case WS_AUTOVACUUM:
+ 			set_ps_display("bg worker: autovacuum", false);
+ 			break;
+ 		default:
+ 			set_ps_display("bg worker: unknown", false);
+ 			break;
+ 	}
+ 
+ 	/*
+ 	 * StartTransactionCommand and CommitTransactionCommand will
+ 	 * automatically switch to other contexts.  None the less we need this
+ 	 * one for other book-keeping of the various background jobs across
+ 	 * transactions, for example to keep the list of relations to vacuum.
+ 	 */
+ 	Assert(AutovacWorkerMemCxt == NULL);
+ 	AutovacWorkerMemCxt = AllocSetContextCreate(TopMemoryContext,
+ 										   "Background Worker",
+ 										   ALLOCSET_DEFAULT_MINSIZE,
+ 										   ALLOCSET_DEFAULT_INITSIZE,
+ 										   ALLOCSET_DEFAULT_MAXSIZE);
+ 
+ 	/* deleted again by bgworker_reset, like AutovacWorkerMemCxt */
+ 	MessageContext = AllocSetContextCreate(TopMemoryContext,
+ 										   "MessageContext",
+ 										   ALLOCSET_DEFAULT_MINSIZE,
+ 										   ALLOCSET_DEFAULT_INITSIZE,
+ 										   ALLOCSET_DEFAULT_MAXSIZE);
+ 
+ 	MemoryContextSwitchTo(AutovacWorkerMemCxt);
+ }
+ 
+ /*
+  * bgworker_job_completed
+  *
+  * Called by the worker after it has finished a background job.  Cleans up
+  * the memory contexts used for the job and informs the coordinator that the
+  * worker is available again, see bgworker_reset.
+  */
+ void
+ bgworker_job_completed(void)
+ {
+ #ifdef COORDINATOR_DEBUG
+ 	ereport(DEBUG3,
+ 			(errmsg("bg worker [%d]: job completed.", MyProcPid)));
+ #endif
+ 
+ 	/* reset the worker state and notify the coordinator */
+ 	bgworker_reset();
+ }
+ 
+ /*
+  * bgworker_reset
+  *
+  * Returns the current worker to the WS_IDLE state after a job, no matter
+  * whether the job completed or got cancelled: deletes the job's memory
+  * contexts, re-adds the worker to its database's list of idle workers and
+  * sends an IMSGT_READY message to the coordinator.  Must be called outside
+  * of any transaction.
+  */
+ void
+ bgworker_reset(void)
+ {
+ 	BackendId	AutovacuumLauncherId;
+ 	IMessage   *msg;
+ 
+ 	elog(DEBUG5, "bg worker [%d/%d]: resetting",
+ 		 MyProcPid, MyBackendId);
+ 
+ 	Assert(MyWorkerInfo->wi_state != WS_IDLE);
+ 	Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
+ 
+ 	/* reset the worker state */
+ 	MyWorkerInfo->wi_state = WS_IDLE;
+ 	set_ps_display("bg worker: idle", false);
+ 
+ 	/* clean up memory contexts */
+ 	Assert(AutovacWorkerMemCxt);
+ 	MemoryContextSwitchTo(TopMemoryContext);
+ 	MemoryContextDelete(AutovacWorkerMemCxt);
+ 	AutovacWorkerMemCxt = NULL;
+ 	MemoryContextDelete(MessageContext);
+ 	MessageContext = NULL;
+ 
+ 	/* Reset the process-local cleanup handler state. */
+ 	BgWorkerCleanupInProgress = false;
+ 
+ 	/*
+ 	 * Propagate as idle worker.  A job aborted by an error (e.g. a cancelled
+ 	 * autovacuum run) can leave our wi_links on another list, namely the
+ 	 * list of running workers.  Unlink it first: add_as_idle_worker requires
+ 	 * a detached entry, and inserting a still linked one would corrupt both
+ 	 * lists in non-assert builds.
+ 	 */
+ 	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ 	if (!SHMQueueIsDetached(&MyWorkerInfo->wi_links))
+ 		SHMQueueDelete(&MyWorkerInfo->wi_links);
+ 	add_as_idle_worker(MyDatabaseId, false);
+ 	LWLockRelease(AutovacuumLock);
+ 
+ 	/* inform the coordinator that we are ready for the next job */
+ 	AutovacuumLauncherId = GetAutovacuumLauncherId();
+ 	if (AutovacuumLauncherId != InvalidBackendId)
+ 	{
+ 		msg = IMessageCreate(IMSGT_READY, 0);
+ 		IMessageActivate(msg, AutovacuumLauncherId);
+ 	}
+ 	else
+ 		elog(WARNING, "bg worker [%d/%d]: no coordinator?!?",
+ 			 MyProcPid, MyBackendId);
+ }
+ 
+ /*
+  * bgworker_job_failed
+  *
+  * Error recovery for a background worker: aborts any transaction that might
+  * still be running, flushes the error state and discards the pgstat
+  * snapshot.  This neither marks the worker idle nor notifies the
+  * coordinator; the caller decides whether to continue via bgworker_reset.
+  *
+  * errcode is the sqlerrcode of the caught error; it is currently unused.
+  */
+ void
+ bgworker_job_failed(int errcode)
+ {
+ #ifdef COORDINATOR_DEBUG
+ 	/* TransactionId is unsigned, hence %u */
+ 	ereport(DEBUG3,
+ 			(errmsg("bg worker [%d/%d]: job failed (xid: %u)",
+ 					MyProcPid, MyBackendId, GetTopTransactionIdIfAny())));
+ #endif
+ 
+ 	/*
+ 	 * Abort any transaction that might still be running, so the worker is
+ 	 * in a clean state again for the next background job.
+ 	 */
+ 	AbortOutOfAnyTransaction();
+ 
+ 	/*
+ 	 * Flush the error state.
+ 	 */
+ 	FlushErrorState();
+ 
+ 	/*
+ 	 * Make sure pgstat also considers our stat data as gone.
+ 	 */
+ 	pgstat_clear_snapshot();
+ 
+ 	Assert(!IMessageCheck());
+ }
+ 
+ /*
   * AutoVacWorkerMain
   */
  NON_EXEC_STATIC void
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 1632,1641 ****
  	/* FIXME: indentation */
  	{
  
- 		/* insert into the running list */
- 		SHMQueueInsertBefore(&AutoVacuumShmem->av_runningWorkers,
- 							 &MyWorkerInfo->wi_links);
- 
  		/*
  		 * remove from the "starting" pointer, so that the launcher can start
  		 * a new worker if required
--- 2267,2272 ----
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 1665,1677 ****
  		 */
  		InitPostgres(NULL, dbid, NULL, dbname);
  		SetProcessingMode(NormalProcessing);
! 		set_ps_display(dbname, false);
! 		ereport(DEBUG1,
! 				(errmsg("autovacuum: processing database \"%s\"", dbname)));
  	}
  
  	MyWorkerInfo->wi_backend_id = MyBackendId;
  
  	/* register with the coordinator */
  	if (coordinator_id != InvalidBackendId)
  	{
--- 2296,2323 ----
  		 */
  		InitPostgres(NULL, dbid, NULL, dbname);
  		SetProcessingMode(NormalProcessing);
! 		set_ps_display("bg worker: idle", false);
  	}
  
+ 	AutovacWorkerMemCxt = NULL;
+ 
  	MyWorkerInfo->wi_backend_id = MyBackendId;
+ 	MyWorkerInfo->wi_state = WS_IDLE;
  
+ #ifdef COORDINATOR_DEBUG
+ 	elog(DEBUG3, "bg worker [%d/%d]: connected to database %d",
+ 		 MyProcPid, MyBackendId, dbid);
+ #endif
+ 
+ 	/*
+ 	 * Add as an idle worker and notify the coordinator only *after* having
+ 	 * set MyProc->databaseId in InitPostgres, so the coordinator can
+ 	 * determine which database we are connected to.
+ 	 */
+ 	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ 	add_as_idle_worker(dbid, true);
+ 	LWLockRelease(AutovacuumLock);
+ 
  	/* register with the coordinator */
  	if (coordinator_id != InvalidBackendId)
  	{
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 1684,1695 ****
--- 2330,2354 ----
  
  	while (!terminate_worker)
  	{
+ 		PG_TRY();
+ 		{
+ 		/* FIXME: indentation */
+ 
  		CHECK_FOR_INTERRUPTS();
  
  		ImmediateInterruptOK = true;
  		pg_usleep(1000000L);
  		ImmediateInterruptOK = false;
  
+ 		/*
+ 		 * FIXME: check different ways of terminating a background worker
+ 		 *        via ProcDiePending. How about postmaster initiated
+ 		 *        restarts?
+ 		 */
+ 		if (ProcDiePending)
+ 			elog(FATAL, "bg worker [%d/%d]: Terminated via ProcDie",
+ 				 MyProcPid, MyBackendId);
+ 
  		while ((msg = IMessageCheck()) && !terminate_worker)
  		{
  #ifdef COORDINATOR_DEBUG
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 1702,1762 ****
  							MyDatabaseId)));
  #endif
  
- 			/*
- 			 * StartTransactionCommand and CommitTransactionCommand will
- 			 * automatically switch to other contexts.  None the less we need
- 			 * this one for other book-keeping of the various background
- 			 * jobs across transactions, for example to keep the list of
- 			 * relations to vacuum.
- 			 */
- 			AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
- 												  "Background Worker",
- 												  ALLOCSET_DEFAULT_MINSIZE,
- 												  ALLOCSET_DEFAULT_INITSIZE,
- 												  ALLOCSET_DEFAULT_MAXSIZE);
- 			MemoryContextSwitchTo(AutovacMemCxt);
- 
  			switch (msg->type)
  			{
  				case IMSGT_PERFORM_VACUUM:
  					/* immediately remove the message to free shared memory */
  					IMessageRemove(msg);
  
  					/* do an appropriate amount of work */
  					do_autovacuum();
  
- 
  					/*
! 					 * send an IMSGT_READY to inform the coordinator that we
! 					 * finished vacuuming.
  					 */
! 					msg = IMessageCreate(IMSGT_READY, 0);
! 					IMessageActivate(msg, GetAutovacuumLauncherId());
  
! 					/*
! 					 * for now, we always terminate the worker after an 
! 					 * autovacuum job.
! 					 */
! 					terminate_worker = true;
  					break;
  
  				default:
  					ereport(WARNING,
  							(errmsg("bg worker [%d]: invalid message type "
  									"'%c' ignored",
  									MyProcPid, msg->type)));
  
! 					/* keep shared memory clean */
! 					IMessageRemove(msg);
! 			}
  
! 			MemoryContextSwitchTo(TopMemoryContext);
! 			MemoryContextDelete(AutovacMemCxt);
  
! 			CHECK_FOR_INTERRUPTS();
  		}
  	}
  
  	/* All done, go away */
  	ereport(DEBUG1, (errmsg("bg worker [%d/%d]: terminating",
  							MyProcPid, MyBackendId)));
--- 2361,2458 ----
  							MyDatabaseId)));
  #endif
  
  			switch (msg->type)
  			{
+ 				case IMSGT_TERM_WORKER:
+ 					IMessageRemove(msg);
+ 					terminate_worker = true;
+ 					break;
+ 
  				case IMSGT_PERFORM_VACUUM:
  					/* immediately remove the message to free shared memory */
  					IMessageRemove(msg);
  
+ 					bgworker_job_initialize(WS_AUTOVACUUM);
+ 
+ 					/*
+ 					 * Add ourselves to the list of runningWorkers
+ 					 */
+ 					LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ 					SHMQueueInsertBefore(&AutoVacuumShmem->av_runningWorkers,
+ 										 &MyWorkerInfo->wi_links);
+ 					LWLockRelease(AutovacuumLock);
+ 
  					/* do an appropriate amount of work */
  					do_autovacuum();
  
  					/*
! 					 * Remove ourselves from the list of runningWorkers and
! 					 * mark as available background worker.
  					 */
! 					LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 					SHMQueueDelete(&MyWorkerInfo->wi_links);
! 					LWLockRelease(AutovacuumLock);
  
! 					bgworker_job_completed();
  					break;
  
  				default:
+ 					/* keep shared memory clean */
+ 					IMessageRemove(msg);
+ 
  					ereport(WARNING,
  							(errmsg("bg worker [%d]: invalid message type "
  									"'%c' ignored",
  									MyProcPid, msg->type)));
+ 			}
  
! 			CHECK_FOR_INTERRUPTS();
! 		}
  
! 		}
! 		PG_CATCH();
! 		{
! 			ErrorData *errdata;
! 			MemoryContext ecxt;
  
! 			ecxt = MemoryContextSwitchTo(AutovacWorkerMemCxt);
! 			errdata = CopyErrorData();
! 
! 			elog(WARNING, "bg worker [%d/%d]: caught error '%s' in %s:%d, state %s",
! 				 MyProcPid, MyBackendId, errdata->message,
! 				 errdata->filename, errdata->lineno,
! 				 decode_worker_state(MyWorkerInfo->wi_state));
! 
! 			/*
! 			 * Inform the coordinator about the failure.
! 			 */
! 			bgworker_job_failed(errdata->sqlerrcode);
! 
! 			if (errdata->sqlerrcode == ERRCODE_QUERY_CANCELED)
! 			{
! #ifdef DEBUG_CSET_APPL
! 				elog(DEBUG3, "bg worker [%d/%d]: cancelled active job.",
! 					 MyProcPid, MyBackendId);
! #endif
! 
! 				bgworker_reset();
! 			}
! 			else
! 			{
! 				elog(WARNING, "bg worker [%s:%d]: unexpected error %d: '%s'!\n"
! 					 "    triggered from %s:%d (in %s)\n",
! 					 __FILE__, __LINE__, errdata->sqlerrcode,
! 					 errdata->message, errdata->filename, errdata->lineno,
! 					 errdata->funcname);
! 				/* re-throw the error, so the backend quits */
! 				MemoryContextSwitchTo(ecxt);
! 				PG_RE_THROW();
! 			}
  		}
+ 		PG_END_TRY();
  	}
  
+ 
  	/* All done, go away */
  	ereport(DEBUG1, (errmsg("bg worker [%d/%d]: terminating",
  							MyProcPid, MyBackendId)));
*************** FreeWorkerInfo(int code, Datum arg)
*** 1769,1779 ****
  static void
  FreeWorkerInfo(int code, Datum arg)
  {
  	if (MyWorkerInfo != NULL)
  	{
  		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
  
! 		SHMQueueDelete(&MyWorkerInfo->wi_links);
  		MyWorkerInfo->wi_links.next = (SHM_QUEUE *) AutoVacuumShmem->av_freeWorkers;
  		MyWorkerInfo->wi_dboid = InvalidOid;
  		MyWorkerInfo->wi_tableoid = InvalidOid;
--- 2465,2478 ----
  static void
  FreeWorkerInfo(int code, Datum arg)
  {
+ 	co_database *codb;
  	if (MyWorkerInfo != NULL)
  	{
  		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
  
! 		if (!SHMQueueIsDetached(&MyWorkerInfo->wi_links))
! 			SHMQueueDelete(&MyWorkerInfo->wi_links);
! 
  		MyWorkerInfo->wi_links.next = (SHM_QUEUE *) AutoVacuumShmem->av_freeWorkers;
  		MyWorkerInfo->wi_dboid = InvalidOid;
  		MyWorkerInfo->wi_tableoid = InvalidOid;
*************** FreeWorkerInfo(int code, Datum arg)
*** 1786,1791 ****
--- 2485,2497 ----
  		/* not mine anymore */
  		MyWorkerInfo = NULL;
  
+ 		/* decrease the conn count */
+ 		LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
+ 		codb = hash_search(co_databases, &MyDatabaseId, HASH_FIND, NULL);
+ 		Assert(codb);
+ 		codb->codb_num_connected_workers--;
+ 		LWLockRelease(CoordinatorDatabasesLock);
+ 
  		LWLockRelease(AutovacuumLock);
  	}
  }
*************** get_database_list(void)
*** 1910,1917 ****
  	StartTransactionCommand();
  	(void) GetTransactionSnapshot();
  
! 	/* Allocate our results in AutovacMemCxt, not transaction context */
! 	MemoryContextSwitchTo(AutovacMemCxt);
  
  	rel = heap_open(DatabaseRelationId, AccessShareLock);
  	scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
--- 2616,2623 ----
  	StartTransactionCommand();
  	(void) GetTransactionSnapshot();
  
! 	/* Allocate our results in AutovacLauncherMemCxt, not transaction context */
! 	MemoryContextSwitchTo(AutovacLauncherMemCxt);
  
  	rel = heap_open(DatabaseRelationId, AccessShareLock);
  	scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
*************** get_database_list(void)
*** 1919,1925 ****
  	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
  	{
  		Form_pg_database pgdatabase = (Form_pg_database) GETSTRUCT(tup);
! 		avw_dbase  *avdb;
  
  		avdb = (avw_dbase *) palloc(sizeof(avw_dbase));
  
--- 2625,2631 ----
  	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
  	{
  		Form_pg_database pgdatabase = (Form_pg_database) GETSTRUCT(tup);
! 		avw_dbase *avdb;
  
  		avdb = (avw_dbase *) palloc(sizeof(avw_dbase));
  
*************** do_autovacuum(void)
*** 2004,2011 ****
  
  	ReleaseSysCache(tuple);
  
  	/* StartTransactionCommand changed elsewhere */
! 	MemoryContextSwitchTo(AutovacMemCxt);
  
  	/* The database hash where pgstat keeps shared relations */
  	shared = pgstat_fetch_stat_dbentry(InvalidOid);
--- 2710,2721 ----
  
  	ReleaseSysCache(tuple);
  
+ 	ereport(DEBUG1,
+ 			(errmsg("autovacuum: processing database \"%s\"",
+ 					NameStr(dbForm->datname))));
+ 
  	/* StartTransactionCommand changed elsewhere */
! 	MemoryContextSwitchTo(AutovacWorkerMemCxt);
  
  	/* The database hash where pgstat keeps shared relations */
  	shared = pgstat_fetch_stat_dbentry(InvalidOid);
*************** do_autovacuum(void)
*** 2219,2225 ****
  	 * create a memory context to act as fake PortalContext, so that the
  	 * contexts created in the vacuum code are cleaned up for each table.
  	 */
! 	PortalContext = AllocSetContextCreate(AutovacMemCxt,
  										  "Autovacuum Portal",
  										  ALLOCSET_DEFAULT_INITSIZE,
  										  ALLOCSET_DEFAULT_MINSIZE,
--- 2929,2935 ----
  	 * create a memory context to act as fake PortalContext, so that the
  	 * contexts created in the vacuum code are cleaned up for each table.
  	 */
! 	PortalContext = AllocSetContextCreate(AutovacWorkerMemCxt,
  										  "Autovacuum Portal",
  										  ALLOCSET_DEFAULT_INITSIZE,
  										  ALLOCSET_DEFAULT_MINSIZE,
*************** do_autovacuum(void)
*** 2291,2297 ****
  		 * that somebody just finished vacuuming this table.  The window to
  		 * the race condition is not closed but it is very small.
  		 */
! 		MemoryContextSwitchTo(AutovacMemCxt);
  		tab = table_recheck_autovac(relid, table_toast_map, pg_class_desc);
  		if (tab == NULL)
  		{
--- 3001,3007 ----
  		 * that somebody just finished vacuuming this table.  The window to
  		 * the race condition is not closed but it is very small.
  		 */
! 		MemoryContextSwitchTo(AutovacWorkerMemCxt);
  		tab = table_recheck_autovac(relid, table_toast_map, pg_class_desc);
  		if (tab == NULL)
  		{
*************** AutoVacuumShmemSize(void)
*** 2874,2885 ****
  	Size		size;
  
  	/*
! 	 * Need the fixed struct and the array of WorkerInfoData.
  	 */
  	size = sizeof(AutoVacuumShmemStruct);
  	size = MAXALIGN(size);
  	size = add_size(size, mul_size(autovacuum_max_workers,
  								   sizeof(WorkerInfoData)));
  	return size;
  }
  
--- 3584,3600 ----
  	Size		size;
  
  	/*
! 	 * Need the fixed struct and the array of WorkerInfoData, plus per
! 	 * database entries in a hash. As we only track databases which have at
! 	 * least one worker attached, we won't ever need more than
! 	 * autovacuum_max_workers entries.
  	 */
  	size = sizeof(AutoVacuumShmemStruct);
  	size = MAXALIGN(size);
  	size = add_size(size, mul_size(autovacuum_max_workers,
  								   sizeof(WorkerInfoData)));
+ 	size = add_size(size, hash_estimate_size(autovacuum_max_workers,
+ 											 sizeof(co_database)));
  	return size;
  }
  
*************** AutoVacuumShmemInit(void)
*** 2890,2895 ****
--- 3605,3611 ----
  void
  AutoVacuumShmemInit(void)
  {
+ 	HASHCTL     hctl;
  	bool		found;
  
  	AutoVacuumShmem = (AutoVacuumShmemStruct *)
*************** AutoVacuumShmemInit(void)
*** 2921,2926 ****
--- 3637,3651 ----
  	}
  	else
  		Assert(found);
+ 
+ 	hctl.keysize = sizeof(Oid);
+ 	hctl.entrysize = sizeof(co_database);
+ 	hctl.hash = oid_hash;
+ 	co_databases = ShmemInitHash("Coordinator Database Info",
+ 								 autovacuum_max_workers,
+ 								 autovacuum_max_workers,
+ 								 &hctl,
+ 								 HASH_ELEM | HASH_FUNCTION);
  }
  
  /*
============================================================
*** src/include/postmaster/autovacuum.h	9d1ae53662236fcc871f84a9b1036619931bffe6
--- src/include/postmaster/autovacuum.h	0937b36c82e931677a8fed1cdd638f0138155086
***************
*** 14,22 ****
--- 14,108 ----
  #ifndef AUTOVACUUM_H
  #define AUTOVACUUM_H
  
+ #include "pgstat.h"
+ #include "lib/dllist.h"
  #include "storage/imsg.h"
  #include "storage/lock.h"
  
+ /*
+  * Valid states of a background worker.  A worker is in state WS_IDLE while
+  * it is connected to its database and waiting for a job.  Only the worker
+  * itself changes its state: it switches to a job specific state when it
+  * starts processing a job, and back to WS_IDLE once that job is over.
+  */
+ typedef enum
+ {
+ 	WS_IDLE = 'I',				/* waiting for a job */
+ 	WS_AUTOVACUUM = 'V'			/* processing an autovacuum job */
+ } worker_state;
+ 
+ /*
+  * State tests for a background worker's WorkerInfo.  Both are false unless
+  * IsAutoVacuumWorkerProcess() is true.
+  */
+ #define IsIdleWorker(wi) \
+ 	(IsAutoVacuumWorkerProcess() && (wi)->wi_state == WS_IDLE)
+ #define IsAutoVacuumWorker(wi) \
+ 	(IsAutoVacuumWorkerProcess() && (wi)->wi_state == WS_AUTOVACUUM)
+ 
+ /*-------------
+  * This struct holds information about a single worker's whereabouts.  We keep
+  * an array of these in shared memory, sized according to
+  * autovacuum_max_workers.
+  *
+  * wi_links			entry into the free list, the running list or the idle
+  *					list of the database the worker is connected to
+  * wi_dboid			OID of the database this worker is supposed to work on
+  * wi_backend_id	id of the running worker backend, InvalidBackendId if not
+  *					started yet
+  * wi_launchtime	Time at which this worker was launched
+  * wi_state			current state of the worker, see worker_state
+  *
+  * wi_tableoid		OID of the table currently being vacuumed
+  * wi_cost_*		Vacuum cost-based delay parameters current in this worker
+  *
+  * All fields are protected by AutovacuumLock, except for wi_tableoid which is
+  * protected by AutovacuumScheduleLock (which is read-only for everyone except
+  * that worker itself) and wi_state, which only the worker itself assigns.
+  *-------------
+  */
+ typedef struct WorkerInfoData
+ {
+ 	SHM_QUEUE	wi_links;
+ 	Oid			wi_dboid;
+ 	BackendId   wi_backend_id;
+ 	TimestampTz wi_launchtime;
+ 	worker_state wi_state;
+ 
+ 	/* autovacuum specific fields */
+ 	Oid			wi_tableoid;
+ 	int			wi_cost_delay;
+ 	int			wi_cost_limit;
+ 	int			wi_cost_limit_base;
+ } WorkerInfoData;
+ 
+ typedef struct WorkerInfoData *WorkerInfo;
+ 
+ /*
+  * Per-database bookkeeping of the coordinator.  Entries live in a shared
+  * memory hash table keyed by database OID and are accessed while holding
+  * CoordinatorDatabasesLock.
+  */
+ typedef struct co_database
+ {
+ 	Oid					codb_dboid;		/* hash key, must be first */
+ 
+ 	/*
+ 	 * Jobs queued for this database; for internal use by the coordinator
+ 	 * only.  NOTE(review): Dllist elements are process-local allocations,
+ 	 * so no other process may walk this list.
+ 	 */
+ 	int					codb_num_cached_jobs;
+ 	Dllist				codb_cached_jobs;
+ 
+ 	/* workers connected to this database and waiting for a job */
+ 	int					codb_num_idle_workers;
+ 	SHM_QUEUE			codb_idle_workers;
+ 
+ 	/* all workers connected to this database, idle or busy */
+ 	int					codb_num_connected_workers;
+ } co_database;
+ 
  /* GUC variables */
  extern bool autovacuum_enabled;
  extern int	autovacuum_max_workers;
*************** extern int	Log_autovacuum_min_duration;
*** 33,38 ****
--- 102,109 ----
  
  extern int	Log_autovacuum_min_duration;
  
+ extern char *decode_worker_state(worker_state state);
+ 
  /* Status inquiry functions */
  extern bool IsAutoVacuumLauncherProcess(void);
  extern bool IsAutoVacuumWorkerProcess(void);
*************** extern void AutoVacuumShmemInit(void);
*** 57,60 ****
--- 128,135 ----
  extern Size AutoVacuumShmemSize(void);
  extern void AutoVacuumShmemInit(void);
  
+ /* bgworker job management functions */
+ extern void bgworker_job_failed(int errcode);
+ extern void bgworker_reset(void);
+ 
  #endif   /* AUTOVACUUM_H */
============================================================
*** src/backend/storage/ipc/imsg.c	81f58ac6067b9e4ebfaaf1e8ada1df68a20e8493
--- src/backend/storage/ipc/imsg.c	c9dbb1c2c90a6b1a5019b4edb42c682f92dac469
*************** decode_imessage_type(const IMessageType 
*** 82,87 ****
--- 82,89 ----
  
  		case IMSGT_REGISTER_WORKER:
  			return "IMSGT_REGISTER_WORKER";
+ 		case IMSGT_TERM_WORKER:
+ 			return "IMSGT_TERM_WORKER";
  		case IMSGT_READY:
  			return "IMSGT_READY";
  
============================================================
*** src/include/storage/imsg.h	33f7d6772ba8d599a4eed0b882ccfa474a5cf258
--- src/include/storage/imsg.h	036c47ec0fee900cdff2ab7a5264d5396eed89dd
*************** typedef enum
*** 33,38 ****
--- 33,39 ----
  	 * registration and job management.
  	 */
  	IMSGT_REGISTER_WORKER = 'W',
+ 	IMSGT_TERM_WORKER = 'M',
  	IMSGT_READY = 'r',
  
  	/* inform the coordinator about a database that needs vacuuming to