*** src/backend/storage/ipc/shmqueue.c	2fbe744d1fc4b2a45f1eaa1704276a23edd3f542
--- src/backend/storage/ipc/shmqueue.c	de5c363e57d6086b863eb3b580f235e4dec92fd9
*************** SHMQueueInit(SHM_QUEUE *queue)
*** 44,57 ****
   * SHMQueueIsDetached -- TRUE if element is not currently
   *		in a queue.
   */
- #ifdef NOT_USED
  bool
  SHMQueueIsDetached(SHM_QUEUE *queue)
  {
  	Assert(ShmemAddrIsValid(queue));
  	return (queue->prev == NULL);
  }
- #endif
  
  /*
   * SHMQueueElemInit -- clear an element's links
--- 44,55 ----
============================================================
*** src/backend/tcop/postgres.c	62792c84c23e8b39d78a49f4113cdea2828c0cb7
--- src/backend/tcop/postgres.c	778b93a7067312d73d4c68561d8fa23476c0a292
*************** ProcessInterrupts(void)
*** 2936,2947 ****
  		}
  		if (IsAutoVacuumWorkerProcess())
  		{
  			ImmediateInterruptOK = false;		/* not idle anymore */
  			DisableNotifyInterrupt();
  			DisableCatchupInterrupt();
  			ereport(ERROR,
  					(errcode(ERRCODE_QUERY_CANCELED),
! 					 errmsg("canceling autovacuum task")));
  		}
  		if (RecoveryConflictPending)
  		{
--- 2936,2948 ----
  		}
  		if (IsAutoVacuumWorkerProcess())
  		{
+ 			/* FIXME: make sure this works fine for all background jobs */
  			ImmediateInterruptOK = false;		/* not idle anymore */
  			DisableNotifyInterrupt();
  			DisableCatchupInterrupt();
  			ereport(ERROR,
  					(errcode(ERRCODE_QUERY_CANCELED),
! 					 errmsg("bg worker [%d/%d]: canceling autovacuum task", MyProcPid, MyBackendId)));
  		}
  		if (RecoveryConflictPending)
  		{
============================================================
*** src/backend/utils/init/globals.c	0c8c71495665f7b1077f5c3f3e5a02c98c92be3a
--- src/backend/utils/init/globals.c	626dcd5939ff8882d155015f0aecae78e2945728
*************** volatile uint32 CritSectionCount = 0;
*** 31,36 ****
--- 31,37 ----
  volatile bool ImmediateInterruptOK = false;
  volatile uint32 InterruptHoldoffCount = 0;
  volatile uint32 CritSectionCount = 0;
+ volatile bool BgWorkerCleanupInProgress = false;
  
  int			MyProcPid;
  pg_time_t	MyStartTime;
============================================================
*** src/include/storage/shmem.h	068b4aff617ae86bce155915e1a8137cb16337c0
--- src/include/storage/shmem.h	8a0aa9faddf9bdc9bd1cdf7032ca89f9ce52bcc2
*************** extern void SHMQueueInit(SHM_QUEUE *queu
*** 119,124 ****
--- 119,125 ----
   * prototypes for functions in shmqueue.c
   */
  extern void SHMQueueInit(SHM_QUEUE *queue);
+ extern bool SHMQueueIsDetached(SHM_QUEUE *queue);
  extern void SHMQueueElemInit(SHM_QUEUE *queue);
  extern void SHMQueueDelete(SHM_QUEUE *queue);
  extern void SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem);
============================================================
*** src/include/miscadmin.h	726d181c386b7dbd5202e3b19819876f7dadc0f7
--- src/include/miscadmin.h	a1e372be7cde09b2aeecb5d6ba5c59003fd0e831
*************** extern PGDLLIMPORT volatile uint32 CritS
*** 75,80 ****
--- 75,82 ----
  extern PGDLLIMPORT volatile uint32 InterruptHoldoffCount;
  extern PGDLLIMPORT volatile uint32 CritSectionCount;
  
+ extern volatile bool BgWorkerCleanupInProgress;
+ 
  /* in tcop/postgres.c */
  extern void ProcessInterrupts(void);
  
============================================================
*** src/include/storage/lwlock.h	3a8cf5b9c3534985e33cac96819ca385878887d2
--- src/include/storage/lwlock.h	2aea3a2fcddca3a9f9eea62f65d08930d221beb5
*************** typedef enum LWLockId
*** 66,71 ****
--- 66,72 ----
  	AddinShmemInitLock,
  	AutovacuumLock,
  	AutovacuumScheduleLock,
+ 	CoordinatorDatabasesLock,
  	SyncScanLock,
  	RelationMappingLock,
  	AsyncCtlLock,
============================================================
*** src/backend/storage/ipc/procarray.c	62107b3a36554510ce82208eaa98cec1b40f3e14
--- src/backend/storage/ipc/procarray.c	903e6def62dfd4119752e20cb16711b66fdf6996
*************** BackendPidGetProc(int pid)
*** 1652,1657 ****
--- 1652,1692 ----
  }
  
  /*
+  * BackendIdGetProc -- look up the PGPROC of the backend with the given id
+  *
+  * Scans the proc array under a shared ProcArrayLock and returns the first
+  * entry whose backendId equals id, or NULL if there is none (or if id is
+  * InvalidBackendId).  The lock is released again before returning.
+  */
+ PGPROC *
+ BackendIdGetProc(BackendId id)
+ {
+ 	ProcArrayStruct *arrayP = procArray;
+ 	PGPROC	   *found = NULL;
+ 	int			i;
+ 
+ 	/* an invalid id cannot match anything; don't bother locking */
+ 	if (id == InvalidBackendId)
+ 		return NULL;
+ 
+ 	LWLockAcquire(ProcArrayLock, LW_SHARED);
+ 
+ 	/* linear search, ending as soon as a match has been recorded */
+ 	for (i = 0; found == NULL && i < arrayP->numProcs; i++)
+ 	{
+ 		PGPROC	   *candidate = arrayP->procs[i];
+ 
+ 		if (candidate->backendId == id)
+ 			found = candidate;
+ 	}
+ 
+ 	LWLockRelease(ProcArrayLock);
+ 
+ 	/* the caller must ensure the answer stays meaningful while using it */
+ 	return found;
+ }
+ 
+ /*
   * BackendXidGetPid -- get a backend's pid given its XID
   *
   * Returns 0 if not found or it's a prepared transaction.  Note that
============================================================
*** src/include/storage/procarray.h	dd1cc5e6251c6a4d901434e91504b134e9e2b1dc
--- src/include/storage/procarray.h	2b020c22a35cecbbf0da861c0699d3546dfa9bac
*************** extern PGPROC *BackendPidGetProc(int pid
*** 52,57 ****
--- 52,58 ----
  extern bool HaveTransactionsInCommit(TransactionId *xids, int nxids);
  
  extern PGPROC *BackendPidGetProc(int pid);
+ extern PGPROC *BackendIdGetProc(BackendId id);
  extern int	BackendXidGetPid(TransactionId xid);
  extern bool IsBackendPid(int pid);
  
============================================================
*** src/backend/postmaster/autovacuum.c	3b9cbe9ba0d576b32d490f21969b073a7ec55b98
--- src/backend/postmaster/autovacuum.c	50ad2e93982867e91a47e8ca7af5f4be8b975d8f
***************
*** 86,94 ****
--- 86,97 ----
  #include "storage/ipc.h"
  #include "storage/pmsignal.h"
  #include "storage/proc.h"
+ #include "storage/procarray.h"
  #include "storage/procsignal.h"
+ #include "storage/shmem.h"
  #include "storage/sinvaladt.h"
  #include "tcop/tcopprot.h"
+ #include "utils/builtins.h"
  #include "utils/fmgroids.h"
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
*************** static int	default_freeze_table_age;
*** 139,147 ****
  static int	default_freeze_min_age;
  static int	default_freeze_table_age;
  
! /* Memory context for long-lived data */
! static MemoryContext AutovacMemCxt;
  
  /* struct to keep track of databases in launcher */
  typedef struct avl_dbase
  {
--- 142,155 ----
  static int	default_freeze_min_age;
  static int	default_freeze_table_age;
  
! /* Memory contexts for long-lived data */
! static MemoryContext AutovacLauncherMemCxt;
! static MemoryContext AutovacWorkerMemCxt;
  
+ /* job handling routines */
+ static void bgworker_job_initialize(worker_state new_state);
+ static void bgworker_job_completed(void);
+ 
  /* struct to keep track of databases in launcher */
  typedef struct avl_dbase
  {
*************** typedef struct avw_dbase
*** 153,164 ****
  /* struct to keep track of databases in worker */
  typedef struct avw_dbase
  {
! 	Oid			adw_datid;
  	char	   *adw_name;
  	TransactionId adw_frozenxid;
  	PgStat_StatDBEntry *adw_entry;
  } avw_dbase;
  
  /* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
  typedef struct av_relation
  {
--- 161,177 ----
  /* struct to keep track of databases in worker */
  typedef struct avw_dbase
  {
! 	Oid			adw_datid;		/* hash key -- must be first */
  	char	   *adw_name;
  	TransactionId adw_frozenxid;
  	PgStat_StatDBEntry *adw_entry;
  } avw_dbase;
  
+ typedef struct cached_job {
+ 	Dlelem cj_links;	/* links in codb_cached_jobs; process_cached_jobs casts list elems to cached_job, so first member */
+ 	IMessage *cj_msg;	/* the message that makes up the cached job */
+ } cached_job;
+ 
  /* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
  typedef struct av_relation
  {
*************** typedef struct autovac_table
*** 186,222 ****
  } autovac_table;
  
  /*-------------
-  * This struct holds information about a single worker's whereabouts.  We keep
-  * an array of these in shared memory, sized according to
-  * autovacuum_max_workers.
-  *
-  * wi_links			entry into free list or running list
-  * wi_dboid			OID of the database this worker is supposed to work on
-  * wi_tableoid		OID of the table currently being vacuumed
-  * wi_backend_id	id of the running worker backend, NULL if not started
-  * wi_launchtime	Time at which this worker was launched
-  * wi_cost_*		Vacuum cost-based delay parameters current in this worker
-  *
-  * All fields are protected by AutovacuumLock, except for wi_tableoid which is
-  * protected by AutovacuumScheduleLock (which is read-only for everyone except
-  * that worker itself).
-  *-------------
-  */
- typedef struct WorkerInfoData
- {
- 	SHM_QUEUE	wi_links;
- 	Oid			wi_dboid;
- 	Oid			wi_tableoid;
- 	BackendId	wi_backend_id;
- 	TimestampTz wi_launchtime;
- 	int			wi_cost_delay;
- 	int			wi_cost_limit;
- 	int			wi_cost_limit_base;
- } WorkerInfoData;
- 
- typedef struct WorkerInfoData *WorkerInfo;
- 
- /*-------------
   * The main autovacuum shmem struct.  On shared memory we store this main
   * struct and the array of WorkerInfo structs.	This struct keeps:
   *
--- 199,204 ----
*************** static AutoVacuumShmemStruct *AutoVacuum
*** 240,253 ****
  
  static AutoVacuumShmemStruct *AutoVacuumShmem;
  
  /* the database list in the launcher, and the context that contains it */
  static Dllist *DatabaseList = NULL;
  static MemoryContext DatabaseListCxt = NULL;
  
  /* Pointer to my own WorkerInfo, valid on each worker */
  static WorkerInfo MyWorkerInfo = NULL;
  
- 
  #ifdef EXEC_BACKEND
  static pid_t avlauncher_forkexec(void);
  static pid_t avworker_forkexec(void);
--- 222,241 ----
  
  static AutoVacuumShmemStruct *AutoVacuumShmem;
  
+ /*
+  * Table of databases with at least one connected worker, resides in shared
+  * memory, protected by CoordinatorDatabasesLock
+  */
+ static HTAB *co_databases = NULL;
+ 
  /* the database list in the launcher, and the context that contains it */
  static Dllist *DatabaseList = NULL;
  static MemoryContext DatabaseListCxt = NULL;
  
  /* Pointer to my own WorkerInfo, valid on each worker */
  static WorkerInfo MyWorkerInfo = NULL;
+ static WorkerInfo terminatable_worker = NULL;
  
  #ifdef EXEC_BACKEND
  static pid_t avlauncher_forkexec(void);
  static pid_t avworker_forkexec(void);
*************** NON_EXEC_STATIC void AutoVacLauncherMain
*** 256,273 ****
  static void handle_imessage(IMessage *msg);
  NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
  
! static void CoordinatorMaybeTriggerAutoVacuum(void);
  
! static void do_start_worker(avw_dbase *avdb);
! static Oid	do_start_autovacuum_worker(void);
  static void launcher_determine_sleep(bool can_launch, bool recursing,
  									 struct timeval *nap);
! static void launch_worker(TimestampTz now);
  static List *get_database_list(void);
  static void rebuild_database_list(Oid newdb);
  static int	db_comparator(const void *a, const void *b);
  static void autovac_balance_cost(void);
  
  static void do_autovacuum(void);
  static void FreeWorkerInfo(int code, Datum arg);
  
--- 244,278 ----
  static void handle_imessage(IMessage *msg);
  NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
  
! static void init_co_database(co_database *codb);
! static co_database *get_co_database(Oid dboid);
! static void populate_co_databases(void);
  
! static bool can_deliver_cached_job(co_database *codb, IMessage *msg,
!                                    BackendId *target);
! static WorkerInfo get_idle_worker(co_database *codb);
! static void cache_job(IMessage *msg, co_database *codb);
! static void forward_job(IMessage *msg, co_database *codb, BackendId backend_id);
! static void dispatch_job(IMessage *msg, co_database *codb);
! static void process_cached_jobs(co_database *codb);
! 
! static bool CoordinatorCanLaunchWorker(TimestampTz current_time);
! static void manage_workers(bool can_launch);
! static void autovacuum_maybe_trigger_job(TimestampTz current_time,
! 										 bool can_launch);
! 
! static void do_start_worker(Oid dboid);
! static Oid autovacuum_select_database(void);
  static void launcher_determine_sleep(bool can_launch, bool recursing,
  									 struct timeval *nap);
! static void autovacuum_update_timing(Oid dbid, TimestampTz now);
  static List *get_database_list(void);
  static void rebuild_database_list(Oid newdb);
  static int	db_comparator(const void *a, const void *b);
  static void autovac_balance_cost(void);
  
+ static void add_as_idle_worker(Oid dbid, bool inc_connected_count);
+ 
  static void do_autovacuum(void);
  static void FreeWorkerInfo(int code, Datum arg);
  
*************** static void autovac_refresh_stats(void);
*** 293,298 ****
--- 298,316 ----
  
  
  
+ char *
+ decode_worker_state(worker_state state)
+ {
+ 	/* map each known worker state onto its symbolic name */
+ 	if (state == WS_IDLE)
+ 		return "WS_IDLE";
+ 	if (state == WS_AUTOVACUUM)
+ 		return "WS_AUTOVACUUM";
+ 
+ 	return "UNKNOWN STATE";		/* any value without a name of its own */
+ }
+ 
+ 
  /********************************************************************
   *					  AUTOVACUUM LAUNCHER CODE
   ********************************************************************/
*************** StartAutoVacLauncher(void)
*** 369,375 ****
--- 387,586 ----
  	return 0;
  }
  
+ static void
+ init_co_database(co_database *codb)
+ {
+ 	/* reset a co_database entry: no idle or connected workers, no jobs */
+ 	Assert(ShmemAddrIsValid(codb));
+ 	SHMQueueInit(&codb->codb_idle_workers);
+ 	codb->codb_num_idle_workers = 0;
+ 	codb->codb_num_connected_workers = 0;
+ 
+ 	/*
+ 	 * The entries of the cached-jobs list reside in the coordinator's own
+ 	 * memory, so only the coordinator may fiddle with that list. Emptying
+ 	 * the list header and zeroing its counter is nevertheless safe here.
+ 	 */
+ 	codb->codb_num_cached_jobs = 0;
+ 	DLInitList(&codb->codb_cached_jobs);
+ }
+ 
+ static void
+ cache_job(IMessage *msg, co_database *codb)
+ {
+ 	cached_job *entry;
+ 
+ #ifdef COORDINATOR_DEBUG
+ 	elog(DEBUG5, "Coordinator: caching job of type %s for database %d",
+ 		 decode_imessage_type(msg->type), codb->codb_dboid);
+ #endif
+ 
+ 	/* wrap the message in a list cell and append it to the db's job list */
+ 	entry = palloc(sizeof(cached_job));
+ 	entry->cj_msg = msg;
+ 	DLInitElem(&entry->cj_links, entry);
+ 	DLAddTail(&codb->codb_cached_jobs, &entry->cj_links);
+ 	codb->codb_num_cached_jobs++;
+ }
+ 
  /*
+  * get_idle_worker
+  *
+  * Detaches the worker at the head of the database's idle-worker queue and
+  * hands it back. Callers must hold the CoordinatorDatabasesLock and must
+  * already know that the queue holds at least one worker.
+  */
+ static WorkerInfo
+ get_idle_worker(co_database *codb)
+ {
+ 	SHM_QUEUE  *idle_queue = &codb->codb_idle_workers;
+ 	WorkerInfo	head;
+ 
+ 	/* starting from the queue header itself yields the first entry */
+ 	head = (WorkerInfo) SHMQueueNext(idle_queue, idle_queue,
+ 									 offsetof(WorkerInfoData, wi_links));
+ 	Assert(head);
+ 	SHMQueueDelete(&head->wi_links);
+ 	Assert(head->wi_backend_id != InvalidBackendId);
+ 
+ 	/* keep the per-database idle counter in sync with the queue */
+ 	codb->codb_num_idle_workers--;
+ 
+ 	return head;
+ }
+ 
+ /*
+  * forward_job
+  *
+  * Hands an imessage to the background worker identified by backend_id as
+  * its next job to process, after performing any bookkeeping that depends
+  * on the message type. The codb argument names the database the job
+  * belongs to. The caller must hold the CoordinatorDatabasesLock.
+  */
+ static void
+ forward_job(IMessage *msg, co_database *codb, BackendId backend_id)
+ {
+ 	/*
+ 	 * Only vacuum requests need extra work ahead of delivery; every other
+ 	 * message type (IMSGT_TERM_WORKER included) goes out untouched.
+ 	 */
+ 	if (msg->type == IMSGT_PERFORM_VACUUM)
+ 	{
+ #ifdef COORDINATOR_DEBUG
+ 		elog(DEBUG1, "Coordinator: delivering msg %s of size %d for "
+ 			 "database %d to backend %d",
+ 			 decode_imessage_type(msg->type), msg->size, codb->codb_dboid,
+ 			 backend_id);
+ #endif
+ 
+ 		/* update the autovacuum timing of this database as of now */
+ 		autovacuum_update_timing(codb->codb_dboid, GetCurrentTimestamp());
+ 	}
+ 
+ 	/*
+ 	 * common to all message types: activate the message for the backend
+ 	 */
+ 	IMessageActivate(msg, backend_id);
+ }
+ 
+ /*
+  * dispatch_job
+  *
+  * Forwards the job to an idle worker of the given database if there is
+  * one, removing that worker from the idle list; otherwise caches the job
+  * for later processing. The caller must hold the CoordinatorDatabasesLock.
+  */
+ static void
+ dispatch_job(IMessage *msg, co_database *codb)
+ {
+ 	/* hand the job out directly when possible, else queue it for later */
+ 	if (codb->codb_num_idle_workers > 0)
+ 	{
+ 		WorkerInfo	worker = get_idle_worker(codb);
+ 
+ 		forward_job(msg, codb, worker->wi_backend_id);
+ 	}
+ 	else
+ 		cache_job(msg, codb);
+ }
+ 
+ /*
+  * process_cached_jobs
+  *
+  * Hands cached jobs to idle background workers while there are both. A
+  * job is only sent once can_deliver_cached_job() agrees; that check may
+  * also choose the target backend, else the next idle worker is used.
+  * List elements are cast to cached_job, as cj_links is its first member.
+  */
+ static void
+ process_cached_jobs(co_database *codb)
+ {
+ 	BackendId	target;
+ 	cached_job *job;
+ 
+ #ifdef COORDINATOR_DEBUG
+ 	elog(DEBUG5, "Coordinator: cached jobs: %d, idle workers: %d",
+ 		 codb->codb_num_cached_jobs, codb->codb_num_idle_workers);
+ #endif
+ 
+ 	job = (cached_job*) DLGetHead(&codb->codb_cached_jobs);
+ 	while ((codb->codb_num_cached_jobs > 0) &&
+ 		   (codb->codb_num_idle_workers > 0) &&
+ 		   (job != NULL))
+ 	{
+ 		target = InvalidBackendId;
+ 		if (can_deliver_cached_job(codb, job->cj_msg, &target))
+ 		{
+ 			/* remove the job from the cache */
+ 			DLRemove(&job->cj_links);
+ 			codb->codb_num_cached_jobs--;
+ 			
+ 			/* forward the job (to any idle worker if no target was chosen) */
+ 			if (target == InvalidBackendId)
+ 				target = get_idle_worker(codb)->wi_backend_id;
+ 
+ 			forward_job(job->cj_msg, codb, target);
+ 			pfree(job);
+ 			/* start over at the head, revisiting jobs skipped before */
+ 			job = (cached_job*) DLGetHead(&codb->codb_cached_jobs);
+ 		}
+ 		else
+ 			job = (cached_job*) DLGetSucc(&job->cj_links);
+ 	}
+ }
+ 
+ /*
+  * populate_co_databases
+  *
+  * Called at startup of the coordinator to scan pg_database. Makes sure a
+  * co_database entry exists for every database found and caches an initial
+  * VACUUM job for the template database to populate pg_stat.
+  */
+ static void
+ populate_co_databases(void)
+ {
+ 	List	   *dblist;
+ 	ListCell   *cell;
+ 
+ 	/* fetch the list of databases before taking the lock */
+ 	dblist = get_database_list();
+ 
+ 	LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
+ 	foreach(cell, dblist)
+ 	{
+ 		avw_dbase  *avdb = lfirst(cell);
+ 		co_database *codb = get_co_database(avdb->adw_datid);
+ 
+ 		/*
+ 		 * For the template database, create a job as an imessage without
+ 		 * activating it, so it can get forwarded to a backend later on.
+ 		 */
+ 		if (codb->codb_dboid == TemplateDbOid)
+ 			cache_job(IMessageCreate(IMSGT_PERFORM_VACUUM, 0), codb);
+ 	}
+ 	LWLockRelease(CoordinatorDatabasesLock);
+ }
+ 
+ /*
   * Main loop for the autovacuum launcher process.
   */
  NON_EXEC_STATIC void
*************** AutoVacLauncherMain(int argc, char *argv
*** 377,382 ****
--- 588,594 ----
  {
  	sigjmp_buf	local_sigjmp_buf;
  	IMessage   *msg = NULL;
+ 	bool        can_launch;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
*************** AutoVacLauncherMain(int argc, char *argv
*** 450,461 ****
  	 * that we can reset the context during error recovery and thereby avoid
  	 * possible memory leaks.
  	 */
! 	AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
! 										  "Autovacuum Launcher",
! 										  ALLOCSET_DEFAULT_MINSIZE,
! 										  ALLOCSET_DEFAULT_INITSIZE,
! 										  ALLOCSET_DEFAULT_MAXSIZE);
! 	MemoryContextSwitchTo(AutovacMemCxt);
  
  	/*
  	 * If an exception is encountered, processing resumes here.
--- 662,673 ----
  	 * that we can reset the context during error recovery and thereby avoid
  	 * possible memory leaks.
  	 */
! 	AutovacLauncherMemCxt = AllocSetContextCreate(TopMemoryContext,
! 												  "Autovacuum Launcher",
! 												  ALLOCSET_DEFAULT_MINSIZE,
! 												  ALLOCSET_DEFAULT_INITSIZE,
! 												  ALLOCSET_DEFAULT_MAXSIZE);
! 	MemoryContextSwitchTo(AutovacLauncherMemCxt);
  
  	/*
  	 * If an exception is encountered, processing resumes here.
*************** AutoVacLauncherMain(int argc, char *argv
*** 479,495 ****
  		EmitErrorReport();
  
  		/* Abort the current transaction in order to recover */
! 		AbortCurrentTransaction();
  
  		/*
  		 * Now return to normal top-level context and clear ErrorContext for
  		 * next time.
  		 */
! 		MemoryContextSwitchTo(AutovacMemCxt);
  		FlushErrorState();
  
  		/* Flush any leaked data in the top-level context */
! 		MemoryContextResetAndDeleteChildren(AutovacMemCxt);
  
  		/* don't leave dangling pointers to freed memory */
  		DatabaseListCxt = NULL;
--- 691,707 ----
  		EmitErrorReport();
  
  		/* Abort the current transaction in order to recover */
! 		AbortOutOfAnyTransaction();
  
  		/*
  		 * Now return to normal top-level context and clear ErrorContext for
  		 * next time.
  		 */
! 		MemoryContextSwitchTo(AutovacLauncherMemCxt);
  		FlushErrorState();
  
  		/* Flush any leaked data in the top-level context */
! 		MemoryContextResetAndDeleteChildren(AutovacLauncherMemCxt);
  
  		/* don't leave dangling pointers to freed memory */
  		DatabaseListCxt = NULL;
*************** AutoVacLauncherMain(int argc, char *argv
*** 520,525 ****
--- 732,742 ----
  	AutoVacuumShmem->av_launcherid = MyBackendId;
  
  	/*
+ 	 * Initial population of the database list from pg_database
+ 	 */
+ 	populate_co_databases();
+ 
+ 	/*
  	 * Create the initial database list.  The invariant we want this list to
  	 * keep is that it's ordered by decreasing next_time.  As soon as an entry
  	 * is updated to a higher time, it will be moved to the front (which is
*************** AutoVacLauncherMain(int argc, char *argv
*** 530,536 ****
  
  	for (;;)
  	{
! 		struct timeval nap;
  
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
--- 747,754 ----
  
  	for (;;)
  	{
! 		TimestampTz		current_time;
! 		struct timeval	nap;
  
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
*************** AutoVacLauncherMain(int argc, char *argv
*** 539,546 ****
  		if (!PostmasterIsAlive(true))
  			proc_exit(1);
  
! 		launcher_determine_sleep((AutoVacuumShmem->av_freeWorkers != NULL),
! 								 false, &nap);
  
  		/* Allow sinval catchup interrupts while sleeping */
  		EnableCatchupInterrupt();
--- 757,764 ----
  		if (!PostmasterIsAlive(true))
  			proc_exit(1);
  
! 		can_launch = (AutoVacuumShmem->av_freeWorkers != NULL);
! 		launcher_determine_sleep(can_launch, false, &nap);
  
  		/* Allow sinval catchup interrupts while sleeping */
  		EnableCatchupInterrupt();
*************** AutoVacLauncherMain(int argc, char *argv
*** 647,658 ****
  			msg = IMessageCheck();
  		}
  
  		/*
  		 * Periodically check and trigger autovacuum workers, if autovacuum
  		 * is enabled.
  		 */
  		if (autovacuum_enabled)
! 			CoordinatorMaybeTriggerAutoVacuum();
  	}
  
  	/* Normal exit from the autovac launcher is here */
--- 865,881 ----
  			msg = IMessageCheck();
  		}
  
+ 		current_time  = GetCurrentTimestamp();
+ 		can_launch = CoordinatorCanLaunchWorker(current_time);
+ 
  		/*
  		 * Periodically check and trigger autovacuum workers, if autovacuum
  		 * is enabled.
  		 */
  		if (autovacuum_enabled)
! 			autovacuum_maybe_trigger_job(current_time, can_launch);
! 
! 		manage_workers(can_launch);
  	}
  
  	/* Normal exit from the autovac launcher is here */
*************** handle_imessage(IMessage *msg)
*** 666,680 ****
  void
  handle_imessage(IMessage *msg)
  {
- 	IMessageType	msg_type;
  	BackendId		msg_sender;
  
! 	msg_type = msg->type;
  	msg_sender = msg->sender;
  
  #ifdef COORDINATOR_DEBUG
! 	elog(DEBUG3, "Coordinator: received %s from pid %d",
! 		 decode_imessage_type(msg->type), msg->sender);
  #endif
  
  	switch (msg->type)
--- 889,927 ----
  void
  handle_imessage(IMessage *msg)
  {
  	BackendId		msg_sender;
+ 	PGPROC         *proc;
+ 	TransactionId   local_xid = InvalidTransactionId;
+ 	co_database    *codb = NULL;
+ 	Oid             dboid = InvalidOid;
  
! 	/*
! 	 * Get the PGPROC entry of the sender and the related database info, if
! 	 * any.
! 	 */
  	msg_sender = msg->sender;
  
+ 	LWLockAcquire(ProcArrayLock, LW_SHARED);
+ 
+ 	proc = BackendIdGetProc(msg_sender);
+ 	if (proc)
+ 	{
+ 		local_xid = proc->xid;
+ 		dboid = proc->databaseId;
+ 	}
+ 
+ 	LWLockRelease(ProcArrayLock);
+ 
  #ifdef COORDINATOR_DEBUG
! 	if (proc)
! 		elog(DEBUG3, "Coordinator: received %s of size %d from backend %d\n"
! 			 "\t(connected to db %d, local xid %d)",
! 			 decode_imessage_type(msg->type), msg->size, msg_sender,
! 			 dboid, local_xid);
! 	else
! 		elog(DEBUG3, "Coordinator: received %s of size %d from backend %d\n"
! 			 "\t(for which no PGPROC could be found)",
! 			 decode_imessage_type(msg->type), msg->size, msg_sender);
  #endif
  
  	switch (msg->type)
*************** handle_imessage(IMessage *msg)
*** 687,701 ****
  			/* consume the message */
  			IMessageRemove(msg);
  
  			/*
! 			 * default to immediately command the new background
! 			 * worker to perform an autovacuum step.
  			 */
! 			if (IMSGT_REGISTER_WORKER)
! 			{
! 				msg = IMessageCreate(IMSGT_PERFORM_VACUUM, 0);
! 				IMessageActivate(msg, msg_sender);
! 			}
  
  			/*
  			 * Rebalance cost limits, as the worker has already reported its
--- 934,952 ----
  			/* consume the message */
  			IMessageRemove(msg);
  
+ 			LWLockAcquire(CoordinatorDatabasesLock, LW_SHARED);
+ 			codb = get_co_database(dboid);
+ 			process_cached_jobs(codb);
+ 			LWLockRelease(CoordinatorDatabasesLock);
+ 
  			/*
! 			 * We trigger a DatabaseList rebuild if it is still empty and
! 			 * after a job is done. This mainly covers the initialization
! 			 * phase after the first background worker is done with vacuuming
! 			 * template1 (and thus having populated pgstat).
  			 */
! 			if (DLGetHead(DatabaseList) == NULL)
! 				rebuild_database_list(InvalidOid);
  
  			/*
  			 * Rebalance cost limits, as the worker has already reported its
*************** handle_imessage(IMessage *msg)
*** 714,735 ****
  			IMessageRemove(msg);
  
  			/* trigger an autovacuum worker */
! 			do_start_autovacuum_worker();
! 
  			break;
  
  		default:
! 			elog(WARNING, "Unknown message type: %c, ignored!", msg->type);
  			IMessageRemove(msg);
  	}
  }
  
  static void
! CoordinatorMaybeTriggerAutoVacuum()
  {
! 	TimestampTz current_time = 0;
  	bool		can_launch;
- 	Dlelem	   *elem;
  
  	/* FIXME: indentation */
  	{
--- 965,1170 ----
  			IMessageRemove(msg);
  
  			/* trigger an autovacuum worker */
! 			dboid = autovacuum_select_database();
! 			if (dboid != InvalidOid)
! 			{
! 				LWLockAcquire(CoordinatorDatabasesLock, LW_SHARED);
! 				codb = get_co_database(dboid);
! 				msg = IMessageCreate(IMSGT_PERFORM_VACUUM, 0);
! 				dispatch_job(msg, codb);
! 				LWLockRelease(CoordinatorDatabasesLock);
! 			}
  			break;
  
  		default:
! 			elog(WARNING, "Coordinator: unknown message type: %c, ignored!",
! 				 msg->type);
  			IMessageRemove(msg);
  	}
  }
  
+ 
+ 
+ /*
+  * get_co_database
+  *
+  * Returns the co_databases entry for dboid, creating and initializing it
+  * first if none exists yet. The caller must hold CoordinatorDatabasesLock.
+  */
+ static co_database *
+ get_co_database(Oid dboid)
+ {
+ 	co_database *codb;
+ 	bool		found;
+ 
+ 	/* HASH_ENTER adds an entry for dboid if the table has none yet */
+ 	codb = hash_search(co_databases, &dboid, HASH_ENTER, &found);
+ 	if (!found)
+ 		init_co_database(codb);
+ 	return codb;
+ }
+ 
+ static bool
+ can_deliver_cached_job(co_database *codb, IMessage *msg, BackendId *target)
+ {
+ #ifdef COORDINATOR_DEBUG
+ 	elog(DEBUG5, "Coordinator: checking deliverability of job type %s",
+ 		 decode_imessage_type(msg->type));
+ #endif
+ 
+ 	/*
+ 	 * Termination and vacuum jobs can always go out; *target is left just
+ 	 * as the caller passed it in.
+ 	 */
+ 	if (msg->type == IMSGT_TERM_WORKER || msg->type == IMSGT_PERFORM_VACUUM)
+ 		return true;
+ 
+ 	/* no rule for this message type: complain and refuse delivery */
+ 	elog(WARNING, "Coordinator: missing deliverability check for "
+ 		 "message type %s", decode_imessage_type(msg->type));
+ 	return false;
+ }
+ 
+ /*
+  * manage_workers
+  *
+  * Starts background workers for databases which have at least one cached
+  * job or which have less than min_background_workers connected. Within the
+  * same loop, the autovacuum_max_workers is checked and terminates a worker
+  * accordingly.
+  * 
+  * Note that at max one worker can be requested to start or stop per
+  * invocation.
+  */
  static void
! manage_workers(bool can_launch)
  {
! 	HASH_SEQ_STATUS			hash_status;
! 	co_database	           *codb;
! 	Oid                     launch_dboid = InvalidOid;
! 	float                   max_score = 0.0,
! 		                    score;
! 	bool                    worker_slots_available;
! 	int                     idle_workers_required;
! 	int                     job_workers_required;
! 
! 	LWLockAcquire(AutovacuumLock, LW_SHARED);
! 	worker_slots_available = (AutoVacuumShmem->av_freeWorkers != NULL);
! 	LWLockRelease(AutovacuumLock);
! 
! 	/*
! 	 * Terminate an unneeded worker that has been fetched from the list of
! 	 * idle workers in the last invocation. We defer sending the signal one
! 	 * invocation to make sure the coordinator had time to handle all
! 	 * pending messages from that worker. As idle workers don't ever send
! 	 * messages, we can safely assume there is no pending message from that
! 	 * worker by now.
! 	 */
! 	if (terminatable_worker != NULL)
! 	{
! 		IMessage *msg;
! 
! #ifdef COORDINATOR_DEBUG
! 		PGPROC *proc = BackendIdGetProc(terminatable_worker->wi_backend_id);
! 		if (proc)
! 			elog(DEBUG3, "Coordinator: terminating worker [%d/%d].",
! 				 proc->pid, terminatable_worker->wi_backend_id);
! 		else
! 			elog(WARNING, "Coordinator: terminating worker (no PGPROC, backend %d).",
! 				 terminatable_worker->wi_backend_id);
! #endif
! 
! 		msg = IMessageCreate(IMSGT_TERM_WORKER, 0);
! 		IMessageActivate(msg, terminatable_worker->wi_backend_id);
! 
! 		terminatable_worker = NULL;
! 	}
! 
! #ifdef COORDINATOR_DEBUG
! 	elog(DEBUG3, "Coordinator: manage_workers: can_launch: %s, slots_available: %s",
! 		 (can_launch ? "true" : "false"), (worker_slots_available ? "true" : "false"));
! #endif
! 
! 	/*
! 	 * Check the list of databases and fire the first pending request
! 	 * we find.
! 	 */
! 	idle_workers_required = 0;
! 	job_workers_required = 0;
! 	LWLockAcquire(CoordinatorDatabasesLock, LW_SHARED);
! 	hash_seq_init(&hash_status, co_databases);
! 	while ((codb = (co_database*) hash_seq_search(&hash_status)))
! 	{
! 		score = ((float) codb->codb_num_cached_jobs /
! 				 (float) (codb->codb_num_connected_workers + 1)) * 100.0;
! 
! 		if (codb->codb_num_idle_workers < min_spare_background_workers)
! 			score += (min_spare_background_workers -
! 					  codb->codb_num_idle_workers) * 10.0;
! 
! #ifdef COORDINATOR_DEBUG
! 		elog(DEBUG3, "Coordinator:     db %d, idle/conn: %d/%d, jobs: %d, score: %0.1f",
! 			 codb->codb_dboid, codb->codb_num_idle_workers,
! 			 codb->codb_num_connected_workers, codb->codb_num_cached_jobs,
! 			 score);
! #endif
! 
! 		if (codb->codb_num_cached_jobs &&
! 			(codb->codb_num_connected_workers == 0))
! 			job_workers_required++;
! 
! 		if (codb->codb_num_idle_workers < min_spare_background_workers)
! 			idle_workers_required += (min_spare_background_workers -
! 									  codb->codb_num_idle_workers);
! 
! 		/*
! 		 * FIXME: "misconfiguration" allows "starvation" in case the global
! 		 *        maximum is reached all with idle workers, but other dbs
! 		 *        w/o a single worker still have jobs.
! 		 */
! 		if (can_launch && ((codb->codb_num_cached_jobs > 0) ||
! 						   (codb->codb_num_idle_workers <
! 							min_spare_background_workers)))
! 		{
! 			if (can_launch && (score > max_score))
! 			{
! 				launch_dboid = codb->codb_dboid;
! 				max_score = score;
! 			}
! 		}
! 
! 		/*
! 		 * If we are above limit, we fetch an idle worker from the list
! 		 * and mark it as terminatable. Actual termination happens in
! 		 * the following invocation, see above.
! 		 */
! 		if ((terminatable_worker == NULL) &&
! 			(codb->codb_num_idle_workers > max_spare_background_workers))
! 			terminatable_worker = get_idle_worker(codb);
! 	}
! 	LWLockRelease(CoordinatorDatabasesLock);
! 
! 	if (!worker_slots_available && idle_workers_required > 0)
! 	{
! 		elog(WARNING, "Coordinator: no more background workers available, but requiring %d more, according to min_spare_background_workers.",
! 			 idle_workers_required);
! 	}
! 
! 	if (!worker_slots_available && job_workers_required > 0)
! 	{
! 		elog(WARNING, "Coordinator: no background workers avalibale, but %d databases have background jobs pending.",
! 			 job_workers_required);
! 	}
! 
! 	/* request a worker for the highest-scoring database that needs one */
! 	if (OidIsValid(launch_dboid))
! 		do_start_worker(launch_dboid);
! }
! 
! static bool
! CoordinatorCanLaunchWorker(TimestampTz current_time)
! {
  	bool		can_launch;
  
  	/* FIXME: indentation */
  	{
*************** CoordinatorMaybeTriggerAutoVacuum()
*** 741,747 ****
  		 * other worker failed while starting up.
  		 */
  
- 		current_time = GetCurrentTimestamp();
  		LWLockAcquire(AutovacuumLock, LW_SHARED);
  
  		can_launch = (AutoVacuumShmem->av_freeWorkers != NULL);
--- 1176,1181 ----
*************** CoordinatorMaybeTriggerAutoVacuum()
*** 800,810 ****
  				can_launch = false;
  		}
  		LWLockRelease(AutovacuumLock);	/* either shared or exclusive */
  
! 		/* if we can't do anything, just return. */
! 		if (!can_launch)
! 			return;
  
  		/* We're OK to start a new worker */
  
  		elem = DLGetTail(DatabaseList);
--- 1234,1255 ----
  				can_launch = false;
  		}
  		LWLockRelease(AutovacuumLock);	/* either shared or exclusive */
+ 	}
  
! 	return can_launch;
! }
  
+ static void
+ autovacuum_maybe_trigger_job(TimestampTz current_time, bool can_launch)
+ {
+ 	Oid         dboid = InvalidOid;
+ 	Dlelem	   *elem;
+ 	IMessage   *msg;
+ 	co_database *codb;
+ 
+ 	/* FIXME: indentation */
+ 	{
+ 
  		/* We're OK to start a new worker */
  
  		elem = DLGetTail(DatabaseList);
*************** CoordinatorMaybeTriggerAutoVacuum()
*** 818,836 ****
  			 */
  			if (TimestampDifferenceExceeds(avdb->adl_next_worker,
  										   current_time, 0))
! 				launch_worker(current_time);
  		}
  		else
  		{
  			/*
! 			 * Special case when the list is empty: start a worker right away.
! 			 * This covers the initial case, when no database is in pgstats
! 			 * (thus the list is empty).  Note that the constraints in
! 			 * launcher_determine_sleep keep us from starting workers too
! 			 * quickly (at most once every autovacuum_naptime when the list is
! 			 * empty).
  			 */
! 			launch_worker(current_time);
  		}
  	}
  }
--- 1263,1301 ----
  			 */
  			if (TimestampDifferenceExceeds(avdb->adl_next_worker,
  										   current_time, 0))
! 			{
! 				dboid = autovacuum_select_database();
! 				if (OidIsValid(dboid))
! 				{
! 					LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
! 
! 					codb = get_co_database(dboid);
! 
! 					/*
! 					 * Only dispatch a job, if it can be processed immediately
! 					 * so we don't end up having lots of autovacuum requests
! 					 * in the job cache.
! 					 */
! 					if (can_launch || codb->codb_num_idle_workers > 0)
! 					{
! 						msg = IMessageCreate(IMSGT_PERFORM_VACUUM, 0);
! 						dispatch_job(msg, codb);
! 					}
! 
! 					LWLockRelease(CoordinatorDatabasesLock);
! 				}
! 			}
  		}
  		else
  		{
  			/*
! 			 * If the list is still empty, this is a no-op. Instead we simply
! 			 * wait until the initial vacuum on the template database is
! 			 * done. That will populate pg_stat.
  			 */
! #ifdef COORDINATOR_DEBUG
! 			elog(DEBUG5, "Coordinator: DatabaseList is still empty.");
! #endif
  		}
  	}
  }
*************** rebuild_database_list(Oid newdb)
*** 933,939 ****
  	/* use fresh stats */
  	autovac_refresh_stats();
  
! 	newcxt = AllocSetContextCreate(AutovacMemCxt,
  								   "AV dblist",
  								   ALLOCSET_DEFAULT_MINSIZE,
  								   ALLOCSET_DEFAULT_INITSIZE,
--- 1398,1404 ----
  	/* use fresh stats */
  	autovac_refresh_stats();
  
! 	newcxt = AllocSetContextCreate(AutovacLauncherMemCxt,
  								   "AV dblist",
  								   ALLOCSET_DEFAULT_MINSIZE,
  								   ALLOCSET_DEFAULT_INITSIZE,
*************** void
*** 1129,1141 ****
   * start a worker.
   */
  void
! do_start_worker(avw_dbase *avdb)
  {
  	WorkerInfo	worker;
  
  #ifdef COORDINATOR_DEBUG
! 	elog(DEBUG5, "Coordinator: requesting worker for database %s",
! 		 avdb->adw_name);
  #endif
  
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
--- 1594,1607 ----
   * start a worker.
   */
  void
! do_start_worker(Oid dboid)
  {
  	WorkerInfo	worker;
  
+ 	Assert(OidIsValid(dboid));
+ 
  #ifdef COORDINATOR_DEBUG
! 	elog(DEBUG3, "Coordinator: requesting worker for database %d.", dboid);
  #endif
  
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
*************** do_start_worker(avw_dbase *avdb)
*** 1151,1157 ****
  
  	AutoVacuumShmem->av_freeWorkers = (WorkerInfo) worker->wi_links.next;
  
! 	worker->wi_dboid = avdb->adw_datid;
  	worker->wi_backend_id = InvalidBackendId;
  	worker->wi_launchtime = GetCurrentTimestamp();
  
--- 1617,1623 ----
  
  	AutoVacuumShmem->av_freeWorkers = (WorkerInfo) worker->wi_links.next;
  
! 	worker->wi_dboid = dboid;
  	worker->wi_backend_id = InvalidBackendId;
  	worker->wi_launchtime = GetCurrentTimestamp();
  
*************** do_start_worker(avw_dbase *avdb)
*** 1163,1188 ****
  }
  
  /*
!  * do_start_autovacuum_worker
   *
!  * It determines what database to work on, sets up shared memory stuff and
!  * signals postmaster to start the worker.	It fails gracefully if invoked when
!  * autovacuum_workers are already active.
   *
!  * Return value is the OID of the database that the worker is going to process,
!  * or InvalidOid if no worker was actually started.
   */
  static Oid
! do_start_autovacuum_worker(void)
  {
! 	List	   *dblist;
! 	ListCell   *cell;
  	TransactionId xidForceLimit;
  	bool		for_xid_wrap;
  	avw_dbase  *avdb;
  	TimestampTz current_time;
  	bool		skipit = false;
! 	Oid			retval = InvalidOid;
  	MemoryContext tmpcxt,
  				oldcxt;
  
--- 1629,1653 ----
  }
  
  /*
!  * autovacuum_select_database
   *
!  * It determines what database to work on and sets up shared memory stuff. It
!  * fails gracefully if invoked when autovacuum_workers are already active.
   *
!  * Returns the OID of the database that the next worker should process, or
!  * InvalidOid if no database needs vacuuming.
   */
  static Oid
! autovacuum_select_database(void)
  {
! 	List		*dblist;
! 	ListCell    *cell;
  	TransactionId xidForceLimit;
  	bool		for_xid_wrap;
  	avw_dbase  *avdb;
  	TimestampTz current_time;
  	bool		skipit = false;
! 	Oid         retval = InvalidOid;
  	MemoryContext tmpcxt,
  				oldcxt;
  
*************** do_start_autovacuum_worker(void)
*** 1315,1324 ****
  			avdb = tmp;
  	}
  
- 	/* Found a database -- process it */
  	if (avdb != NULL)
  	{
! 		do_start_worker(avdb);
  		retval = avdb->adw_datid;
  	}
  	else if (skipit)
--- 1780,1788 ----
  			avdb = tmp;
  	}
  
  	if (avdb != NULL)
  	{
! 		/* We've found a database that needs vacuuming, return its id */
  		retval = avdb->adw_datid;
  	}
  	else if (skipit)
*************** do_start_autovacuum_worker(void)
*** 1337,1361 ****
  }
  
  /*
!  * launch_worker
   *
!  * Wrapper for starting a worker from the launcher.  Besides actually starting
!  * it, update the database list to reflect the next time that another one will
!  * need to be started on the selected database.  The actual database choice is
!  * left to do_start_autovacuum_worker.
   *
   * This routine is also expected to insert an entry into the database list if
   * the selected database was previously absent from the list.
   */
  static void
! launch_worker(TimestampTz now)
  {
! 	Oid			dbid;
! 	Dlelem	   *elem;
  
! 	dbid = do_start_autovacuum_worker();
! 	if (OidIsValid(dbid))
  	{
  		/*
  		 * Walk the database list and update the corresponding entry.  If the
  		 * database is not on the list, we'll recreate the list.
--- 1801,1824 ----
  }
  
  /*
!  * autovacuum_update_timing
   *
!  * After having started an autovacuum job, the coordinator needs to update
!  * the database list to reflect the next time that another one will need to
!  * be started on the selected database.
   *
   * This routine is also expected to insert an entry into the database list if
   * the selected database was previously absent from the list.
   */
+ 
  static void
! autovacuum_update_timing(Oid dbid, TimestampTz now)
  {
! 	Dlelem		   *elem;
  
! 	/* FIXME: indentation */
  	{
+ 
  		/*
  		 * Walk the database list and update the corresponding entry.  If the
  		 * database is not on the list, we'll recreate the list.
*************** StartAutoVacWorker(void)
*** 1492,1497 ****
--- 1955,2132 ----
  }
  
  /*
+  * add_as_idle_worker
+  *
+  * Adds the current worker to the idle list of database dbid, also counting it
+  * as connected if inc_connected_count is set. Caller must hold AutovacuumLock.
+  */
+ static void
+ add_as_idle_worker(Oid dbid, bool inc_connected_count)
+ {
+ 	co_database *codb;
+ 
+ 	Assert(SHMQueueIsDetached(&MyWorkerInfo->wi_links));
+ 
+ 	/* Lookup the corresponding database, or create an entry for it */
+ 	LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
+ 	codb = get_co_database(dbid);
+ 
+ 	if (inc_connected_count)
+ 		codb->codb_num_connected_workers++;
+ 
+ 	/* link into the database's idle queue and keep its counter in sync */
+ 	SHMQueueInsertBefore(&codb->codb_idle_workers, &MyWorkerInfo->wi_links);
+ 	codb->codb_num_idle_workers++;
+ 
+ 	LWLockRelease(CoordinatorDatabasesLock);
+ }
+ 
+ /*
+  * bgworker_job_initialize
+  *
+  * Moves an idle worker into new_state and creates its per-job memory contexts.
+  */
+ void
+ bgworker_job_initialize(worker_state new_state)
+ {
+ 	/*
+ 	 * Note that the coordinator is responsible for dequeuing the worker from
+ 	 * the list of idle backends, but it shall *NOT* assign a worker state;
+ 	 * we do that from the worker exclusively.
+ 	 */
+ 	Assert(SHMQueueIsDetached(&MyWorkerInfo->wi_links));
+ 	Assert(MyWorkerInfo->wi_state == WS_IDLE);
+ 
+ 	MyWorkerInfo->wi_state = new_state;
+ 	switch (new_state)
+ 	{
+ 		case WS_IDLE:
+ 			Assert(false);    /* use bgworker_job_completed instead */
+ 			break;
+ 		case WS_AUTOVACUUM:
+ 			set_ps_display("bg worker: autovacuum", false);
+ 			break;
+ 		default:
+ 			set_ps_display("bg worker: unknown", false);
+ 	}
+ 
+ 	/*
+ 	 * StartTransactionCommand and CommitTransactionCommand will
+ 	 * automatically switch to other contexts.  Nonetheless we need this
+ 	 * one for other book-keeping of the various background jobs across
+ 	 * transactions, for example to keep the list of relations to vacuum.
+ 	 */
+ 	Assert(AutovacWorkerMemCxt == NULL);
+ 	AutovacWorkerMemCxt = AllocSetContextCreate(TopMemoryContext,
+ 										   "Background Worker",
+ 										   ALLOCSET_DEFAULT_MINSIZE,
+ 										   ALLOCSET_DEFAULT_INITSIZE,
+ 										   ALLOCSET_DEFAULT_MAXSIZE);
+ 
+ 	MessageContext = AllocSetContextCreate(TopMemoryContext,
+ 										   "MessageContext",
+ 										   ALLOCSET_DEFAULT_MINSIZE,
+ 										   ALLOCSET_DEFAULT_INITSIZE,
+ 										   ALLOCSET_DEFAULT_MAXSIZE);
+ 
+ 	MemoryContextSwitchTo(AutovacWorkerMemCxt);
+ }
+ 
+ /*
+  * bgworker_job_completed
+  *
+  * Cleans up the memory contexts used for the worker's current job and
+  * informs the coordinator; both are done by calling bgworker_reset.
+  */
+ void
+ bgworker_job_completed(void)
+ {
+ 	/* Log the job completion; only compiled in with COORDINATOR_DEBUG. */
+ #ifdef COORDINATOR_DEBUG
+ 	ereport(DEBUG3,
+ 			(errmsg("bg worker [%d]: job completed.", MyProcPid)));
+ #endif
+ 
+ 	/* reset the worker state, which also notifies the coordinator */
+ 	bgworker_reset();
+ }
+ 
+ void
+ bgworker_reset(void)
+ {
+ 	BackendId AutovacuumLauncherId;
+ 	IMessage *msg;
+ 
+ 	elog(DEBUG5, "bg worker [%d/%d]: resetting",
+ 		 MyProcPid, MyBackendId);
+ 
+ 	Assert(MyWorkerInfo->wi_state != WS_IDLE);
+ 	Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
+ 
+ 	/* mark the worker idle again, in shared state and in the ps display */
+ 	MyWorkerInfo->wi_state = WS_IDLE;
+ 	set_ps_display("bg worker: idle", false);
+ 
+ 	/* drop the per-job memory contexts, leaving TopMemoryContext current */
+ 	Assert(AutovacWorkerMemCxt);
+ 	MemoryContextSwitchTo(TopMemoryContext);
+ 	MemoryContextDelete(AutovacWorkerMemCxt);
+ 	AutovacWorkerMemCxt = NULL;
+ 	MemoryContextDelete(MessageContext);
+ 	MessageContext = NULL;
+ 
+ 	/* Reset the process-local cleanup handler state. */
+ 	BgWorkerCleanupInProgress = false;
+ 
+ 	/* re-queue as idle worker of our database (connected count unchanged) */
+ 	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ 	add_as_idle_worker(MyDatabaseId, false);
+ 	LWLockRelease(AutovacuumLock);
+ 
+ 	AutovacuumLauncherId = GetAutovacuumLauncherId();
+ 	if (AutovacuumLauncherId != InvalidBackendId)
+ 	{
+ 		msg = IMessageCreate(IMSGT_READY, 0);
+ 		IMessageActivate(msg, AutovacuumLauncherId);
+ 	}
+ 	else
+ 		elog(WARNING, "bg worker [%d/%d]: no coordinator?!?",
+ 			 MyProcPid, MyBackendId);
+ }
+ 
+ void
+ bgworker_job_failed(int errcode)
+ {
+ 	TransactionId xid;
+ 
+ 	xid = GetTopTransactionIdIfAny();
+ 
+ #ifdef COORDINATOR_DEBUG
+ 	ereport(DEBUG3,
+ 			(errmsg("bg worker [%d/%d]: job failed (xid: %u)",
+ 					MyProcPid, MyBackendId, xid)));
+ #endif
+ 
+ 	/*
+ 	 * Abort any transaction that might still be running.  Note that this
+ 	 * function does not itself send any message to the coordinator.
+ 	 */
+ 	AbortOutOfAnyTransaction();
+ 
+ 	/*
+ 	 * Flush the error state.
+ 	 */
+ 	FlushErrorState();
+ 
+ 	/*
+ 	 * Make sure pgstat also considers our stat data as gone.
+ 	 */
+ 	pgstat_clear_snapshot();
+ 
+ 	Assert(!IMessageCheck());
+ }
+ 
+ /*
   * AutoVacWorkerMain
   */
  NON_EXEC_STATIC void
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 1632,1641 ****
  	/* FIXME: indentation */
  	{
  
- 		/* insert into the running list */
- 		SHMQueueInsertBefore(&AutoVacuumShmem->av_runningWorkers,
- 							 &MyWorkerInfo->wi_links);
- 
  		/*
  		 * remove from the "starting" pointer, so that the launcher can start
  		 * a new worker if required
--- 2267,2272 ----
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 1665,1677 ****
  		 */
  		InitPostgres(NULL, dbid, NULL, dbname);
  		SetProcessingMode(NormalProcessing);
! 		set_ps_display(dbname, false);
! 		ereport(DEBUG1,
! 				(errmsg("autovacuum: processing database \"%s\"", dbname)));
  	}
  
  	MyWorkerInfo->wi_backend_id = MyBackendId;
  
  	/* register with the coordinator */
  	if (coordinator_id != InvalidBackendId)
  	{
--- 2296,2323 ----
  		 */
  		InitPostgres(NULL, dbid, NULL, dbname);
  		SetProcessingMode(NormalProcessing);
! 		set_ps_display("bg worker: idle", false);
  	}
  
+ 	AutovacWorkerMemCxt = NULL;
+ 
  	MyWorkerInfo->wi_backend_id = MyBackendId;
+ 	MyWorkerInfo->wi_state = WS_IDLE;
  
+ #ifdef COORDINATOR_DEBUG
+ 	elog(DEBUG3, "bg worker [%d/%d]: connected to database %d",
+ 		 MyProcPid, MyBackendId, dbid);
+ #endif
+ 
+ 	/*
+ 	 * Add as an idle worker and notify the coordinator only *after* having
+ 	 * set MyProc->databaseId in InitPostgres, so the coordinator can
+ 	 * determine which database we are connected to.
+ 	 */
+ 	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ 	add_as_idle_worker(dbid, true);
+ 	LWLockRelease(AutovacuumLock);
+ 
  	/* register with the coordinator */
  	if (coordinator_id != InvalidBackendId)
  	{
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 1684,1695 ****
--- 2330,2354 ----
  
  	while (!terminate_worker)
  	{
+ 		PG_TRY();
+ 		{
+ 		/* FIXME: indentation */
+ 
  		CHECK_FOR_INTERRUPTS();
  
  		ImmediateInterruptOK = true;
  		pg_usleep(1000000L);
  		ImmediateInterruptOK = false;
  
+ 		/*
+ 		 * FIXME: check different ways of terminating a background worker
+ 		 *        via ProcDiePending. How about postmaster initiated
+ 		 *        restarts?
+ 		 */
+ 		if (ProcDiePending)
+ 			elog(FATAL, "bg worker [%d/%d]: Terminated via ProcDie",
+ 				 MyProcPid, MyBackendId);
+ 
  		while ((msg = IMessageCheck()) && !terminate_worker)
  		{
  #ifdef COORDINATOR_DEBUG
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 1702,1762 ****
  							MyDatabaseId)));
  #endif
  
- 			/*
- 			 * StartTransactionCommand and CommitTransactionCommand will
- 			 * automatically switch to other contexts.  None the less we need
- 			 * this one for other book-keeping of the various background
- 			 * jobs across transactions, for example to keep the list of
- 			 * relations to vacuum.
- 			 */
- 			AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
- 												  "Background Worker",
- 												  ALLOCSET_DEFAULT_MINSIZE,
- 												  ALLOCSET_DEFAULT_INITSIZE,
- 												  ALLOCSET_DEFAULT_MAXSIZE);
- 			MemoryContextSwitchTo(AutovacMemCxt);
- 
  			switch (msg->type)
  			{
  				case IMSGT_PERFORM_VACUUM:
  					/* immediately remove the message to free shared memory */
  					IMessageRemove(msg);
  
  					/* do an appropriate amount of work */
  					do_autovacuum();
  
- 
  					/*
! 					 * send an IMSGT_READY to inform the coordinator that we
! 					 * finished vacuuming.
  					 */
! 					msg = IMessageCreate(IMSGT_READY, 0);
! 					IMessageActivate(msg, GetAutovacuumLauncherId());
  
! 					/*
! 					 * for now, we always terminate the worker after an 
! 					 * autovacuum job.
! 					 */
! 					terminate_worker = true;
  					break;
  
  				default:
  					ereport(WARNING,
  							(errmsg("bg worker [%d]: invalid message type "
  									"'%c' ignored",
  									MyProcPid, msg->type)));
  
! 					/* keep shared memory clean */
! 					IMessageRemove(msg);
! 			}
  
! 			MemoryContextSwitchTo(TopMemoryContext);
! 			MemoryContextDelete(AutovacMemCxt);
  
! 			CHECK_FOR_INTERRUPTS();
  		}
  	}
  
  	/* All done, go away */
  	ereport(DEBUG1, (errmsg("bg worker [%d/%d]: terminating",
  							MyProcPid, MyBackendId)));
--- 2361,2458 ----
  							MyDatabaseId)));
  #endif
  
  			switch (msg->type)
  			{
+ 				case IMSGT_TERM_WORKER:
+ 					IMessageRemove(msg);
+ 					terminate_worker = true;
+ 					break;
+ 
  				case IMSGT_PERFORM_VACUUM:
  					/* immediately remove the message to free shared memory */
  					IMessageRemove(msg);
  
+ 					bgworker_job_initialize(WS_AUTOVACUUM);
+ 
+ 					/*
+ 					 * Add ourselves to the list of runningWorkers
+ 					 */
+ 					LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ 					SHMQueueInsertBefore(&AutoVacuumShmem->av_runningWorkers,
+ 										 &MyWorkerInfo->wi_links);
+ 					LWLockRelease(AutovacuumLock);
+ 
  					/* do an appropriate amount of work */
  					do_autovacuum();
  
  					/*
! 					 * Remove ourselves from the list of runningWorkers and
! 					 * mark as available background worker.
  					 */
! 					LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 					SHMQueueDelete(&MyWorkerInfo->wi_links);
! 					LWLockRelease(AutovacuumLock);
  
! 					bgworker_job_completed();
  					break;
  
  				default:
+ 					/* keep shared memory clean */
+ 					IMessageRemove(msg);
+ 
  					ereport(WARNING,
  							(errmsg("bg worker [%d]: invalid message type "
  									"'%c' ignored",
  									MyProcPid, msg->type)));
+ 			}
  
! 			CHECK_FOR_INTERRUPTS();
! 		}
  
! 		}
! 		PG_CATCH();
! 		{
! 			ErrorData *errdata;
! 			MemoryContext ecxt;
  
! 			ecxt = MemoryContextSwitchTo(AutovacWorkerMemCxt);
! 			errdata = CopyErrorData();
! 
! 			elog(WARNING, "bg worker [%d/%d]: caught error '%s' in %s:%d, state %s",
! 				 MyProcPid, MyBackendId, errdata->message,
! 				 errdata->filename, errdata->lineno,
! 				 decode_worker_state(MyWorkerInfo->wi_state));
! 
! 			/*
! 			 * Inform the coordinator about the failure.
! 			 */
! 			bgworker_job_failed(errdata->sqlerrcode);
! 
! 			if (errdata->sqlerrcode == ERRCODE_QUERY_CANCELED)
! 			{
! #ifdef DEBUG_CSET_APPL
! 				elog(DEBUG3, "bg worker [%d/%d]: cancelled active job.",
! 					 MyProcPid, MyBackendId);
! #endif
! 
! 				bgworker_reset();
! 			}
! 			else
! 			{
! 				elog(WARNING, "bg worker [%s:%d]: unexpected error %d: '%s'!\n"
! 					 "    triggered from %s:%d (in %s)\n",
! 					 __FILE__, __LINE__, errdata->sqlerrcode,
! 					 errdata->message, errdata->filename, errdata->lineno,
! 					 errdata->funcname);
! 				/* re-throw the error, so the backend quits */
! 				MemoryContextSwitchTo(ecxt);
! 				PG_RE_THROW();
! 			}
  		}
+ 		PG_END_TRY();
  	}
  
+ 
  	/* All done, go away */
  	ereport(DEBUG1, (errmsg("bg worker [%d/%d]: terminating",
  							MyProcPid, MyBackendId)));
*************** FreeWorkerInfo(int code, Datum arg)
*** 1769,1779 ****
  static void
  FreeWorkerInfo(int code, Datum arg)
  {
  	if (MyWorkerInfo != NULL)
  	{
  		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
  
! 		SHMQueueDelete(&MyWorkerInfo->wi_links);
  		MyWorkerInfo->wi_links.next = (SHM_QUEUE *) AutoVacuumShmem->av_freeWorkers;
  		MyWorkerInfo->wi_dboid = InvalidOid;
  		MyWorkerInfo->wi_tableoid = InvalidOid;
--- 2465,2478 ----
  static void
  FreeWorkerInfo(int code, Datum arg)
  {
+ 	co_database *codb;
  	if (MyWorkerInfo != NULL)
  	{
  		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
  
! 		if (!SHMQueueIsDetached(&MyWorkerInfo->wi_links))
! 			SHMQueueDelete(&MyWorkerInfo->wi_links);
! 
  		MyWorkerInfo->wi_links.next = (SHM_QUEUE *) AutoVacuumShmem->av_freeWorkers;
  		MyWorkerInfo->wi_dboid = InvalidOid;
  		MyWorkerInfo->wi_tableoid = InvalidOid;
*************** FreeWorkerInfo(int code, Datum arg)
*** 1786,1791 ****
--- 2485,2497 ----
  		/* not mine anymore */
  		MyWorkerInfo = NULL;
  
+ 		/* decrease the conn count */
+ 		LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
+ 		codb = hash_search(co_databases, &MyDatabaseId, HASH_FIND, NULL);
+ 		Assert(codb);
+ 		codb->codb_num_connected_workers--;
+ 		LWLockRelease(CoordinatorDatabasesLock);
+ 
  		LWLockRelease(AutovacuumLock);
  	}
  }
*************** get_database_list(void)
*** 1910,1917 ****
  	StartTransactionCommand();
  	(void) GetTransactionSnapshot();
  
! 	/* Allocate our results in AutovacMemCxt, not transaction context */
! 	MemoryContextSwitchTo(AutovacMemCxt);
  
  	rel = heap_open(DatabaseRelationId, AccessShareLock);
  	scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
--- 2616,2623 ----
  	StartTransactionCommand();
  	(void) GetTransactionSnapshot();
  
! 	/* Allocate our results in AutovacLauncherMemCxt, not transaction context */
! 	MemoryContextSwitchTo(AutovacLauncherMemCxt);
  
  	rel = heap_open(DatabaseRelationId, AccessShareLock);
  	scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
*************** get_database_list(void)
*** 1919,1925 ****
  	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
  	{
  		Form_pg_database pgdatabase = (Form_pg_database) GETSTRUCT(tup);
! 		avw_dbase  *avdb;
  
  		avdb = (avw_dbase *) palloc(sizeof(avw_dbase));
  
--- 2625,2631 ----
  	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
  	{
  		Form_pg_database pgdatabase = (Form_pg_database) GETSTRUCT(tup);
! 		avw_dbase *avdb;
  
  		avdb = (avw_dbase *) palloc(sizeof(avw_dbase));
  
*************** do_autovacuum(void)
*** 2004,2011 ****
  
  	ReleaseSysCache(tuple);
  
  	/* StartTransactionCommand changed elsewhere */
! 	MemoryContextSwitchTo(AutovacMemCxt);
  
  	/* The database hash where pgstat keeps shared relations */
  	shared = pgstat_fetch_stat_dbentry(InvalidOid);
--- 2710,2721 ----
  
  	ReleaseSysCache(tuple);
  
+ 	ereport(DEBUG1,
+ 			(errmsg("autovacuum: processing database \"%s\"",
+ 					NameStr(dbForm->datname))));
+ 
  	/* StartTransactionCommand changed elsewhere */
! 	MemoryContextSwitchTo(AutovacWorkerMemCxt);
  
  	/* The database hash where pgstat keeps shared relations */
  	shared = pgstat_fetch_stat_dbentry(InvalidOid);
*************** do_autovacuum(void)
*** 2219,2225 ****
  	 * create a memory context to act as fake PortalContext, so that the
  	 * contexts created in the vacuum code are cleaned up for each table.
  	 */
! 	PortalContext = AllocSetContextCreate(AutovacMemCxt,
  										  "Autovacuum Portal",
  										  ALLOCSET_DEFAULT_INITSIZE,
  										  ALLOCSET_DEFAULT_MINSIZE,
--- 2929,2935 ----
  	 * create a memory context to act as fake PortalContext, so that the
  	 * contexts created in the vacuum code are cleaned up for each table.
  	 */
! 	PortalContext = AllocSetContextCreate(AutovacWorkerMemCxt,
  										  "Autovacuum Portal",
  										  ALLOCSET_DEFAULT_INITSIZE,
  										  ALLOCSET_DEFAULT_MINSIZE,
*************** do_autovacuum(void)
*** 2291,2297 ****
  		 * that somebody just finished vacuuming this table.  The window to
  		 * the race condition is not closed but it is very small.
  		 */
! 		MemoryContextSwitchTo(AutovacMemCxt);
  		tab = table_recheck_autovac(relid, table_toast_map, pg_class_desc);
  		if (tab == NULL)
  		{
--- 3001,3007 ----
  		 * that somebody just finished vacuuming this table.  The window to
  		 * the race condition is not closed but it is very small.
  		 */
! 		MemoryContextSwitchTo(AutovacWorkerMemCxt);
  		tab = table_recheck_autovac(relid, table_toast_map, pg_class_desc);
  		if (tab == NULL)
  		{
*************** AutoVacuumShmemSize(void)
*** 2874,2885 ****
  	Size		size;
  
  	/*
! 	 * Need the fixed struct and the array of WorkerInfoData.
  	 */
  	size = sizeof(AutoVacuumShmemStruct);
  	size = MAXALIGN(size);
  	size = add_size(size, mul_size(autovacuum_max_workers,
  								   sizeof(WorkerInfoData)));
  	return size;
  }
  
--- 3584,3600 ----
  	Size		size;
  
  	/*
! 	 * Need the fixed struct and the array of WorkerInfoData, plus per
! 	 * database entries in a hash. As we only track databases which have at
! 	 * least one worker attached, we won't ever need more than
! 	 * autovacuum_max_workers entries.
  	 */
  	size = sizeof(AutoVacuumShmemStruct);
  	size = MAXALIGN(size);
  	size = add_size(size, mul_size(autovacuum_max_workers,
  								   sizeof(WorkerInfoData)));
+ 	size = add_size(size, hash_estimate_size(autovacuum_max_workers,
+ 											 sizeof(co_database)));
  	return size;
  }
  
*************** AutoVacuumShmemInit(void)
*** 2890,2895 ****
--- 3605,3611 ----
  void
  AutoVacuumShmemInit(void)
  {
+ 	HASHCTL     hctl;
  	bool		found;
  
  	AutoVacuumShmem = (AutoVacuumShmemStruct *)
*************** AutoVacuumShmemInit(void)
*** 2921,2926 ****
--- 3637,3651 ----
  	}
  	else
  		Assert(found);
+ 
+ 	hctl.keysize = sizeof(Oid);
+ 	hctl.entrysize = sizeof(co_database);
+ 	hctl.hash = oid_hash;
+ 	co_databases = ShmemInitHash("Coordinator Database Info",
+ 								 autovacuum_max_workers,
+ 								 autovacuum_max_workers,
+ 								 &hctl,
+ 								 HASH_ELEM | HASH_FUNCTION);
  }
  
  /*
============================================================
*** src/include/postmaster/autovacuum.h	9d1ae53662236fcc871f84a9b1036619931bffe6
--- src/include/postmaster/autovacuum.h	0937b36c82e931677a8fed1cdd638f0138155086
***************
*** 14,22 ****
--- 14,91 ----
  #ifndef AUTOVACUUM_H
  #define AUTOVACUUM_H
  
+ #include "postgres.h"
+ #include "pgstat.h"
+ #include "lib/dllist.h"
  #include "storage/imsg.h"
  #include "storage/lock.h"
  
+ /*
+  * Valid backend states for background workers.
+  */
+ typedef enum
+ {
+ 	WS_IDLE = 'I',			/* no job in progress */
+ 
+ 	WS_AUTOVACUUM = 'V',	/* processing an autovacuum job */
+ 
+ } worker_state;
+ 
+ #define IsIdleWorker(wi)			(IsAutoVacuumWorkerProcess() && ((wi)->wi_state == WS_IDLE))
+ #define IsAutoVacuumWorker(wi)      (IsAutoVacuumWorkerProcess() && ((wi)->wi_state == WS_AUTOVACUUM))
+ 
+ 
+ /*-------------
+  * This struct holds information about a single worker's whereabouts.  We keep
+  * an array of these in shared memory, sized according to
+  * autovacuum_max_workers.
+  *
+  * wi_links			entry into free list, running list or a database's idle list
+  * wi_dboid			OID of the database this worker is supposed to work on
+  * wi_backend_id	id of the running worker backend, InvalidBackendId if not started
+  * wi_launchtime	Time at which this worker was launched
+  * wi_state			current job state of this worker (see worker_state)
+  * wi_tableoid		OID of the table currently being vacuumed
+  * wi_cost_*		Vacuum cost-based delay parameters current in this worker
+  *
+  * All fields are protected by AutovacuumLock, except for wi_tableoid which is
+  * protected by AutovacuumScheduleLock (which is read-only for everyone except
+  * that worker itself).
+  *-------------
+  */
+ typedef struct WorkerInfoData
+ {
+ 	SHM_QUEUE	wi_links;
+ 	Oid			wi_dboid;
+ 	BackendId   wi_backend_id;
+ 	TimestampTz wi_launchtime;
+ 	worker_state wi_state;
+ 
+ 	/* autovacuum specific fields */
+ 	Oid			wi_tableoid;
+ 	int			wi_cost_delay;
+ 	int			wi_cost_limit;
+ 	int			wi_cost_limit_base;
+ } WorkerInfoData;
+ 
+ typedef struct WorkerInfoData *WorkerInfo;
+ 
+ /* struct to keep track of databases in the coordinator (co_databases hash) */
+ typedef struct co_database
+ {
+ 	Oid					codb_dboid;		/* database OID, used as hash key */
+ 
+ 	/* for internal use by the coordinator */
+ 	int                 codb_num_cached_jobs;
+ 	Dllist              codb_cached_jobs;
+ 
+ 	/* tracking of idle workers: the counter mirrors the queue's length */
+ 	int				    codb_num_idle_workers;
+ 	SHM_QUEUE           codb_idle_workers;
+ 
+ 	int                 codb_num_connected_workers;	/* workers attached to this db */
+ } co_database;
+ 
  /* GUC variables */
  extern bool autovacuum_enabled;
  extern int	autovacuum_max_workers;
*************** extern int	Log_autovacuum_min_duration;
*** 33,38 ****
--- 102,109 ----
  
  extern int	Log_autovacuum_min_duration;
  
+ extern char *decode_worker_state(worker_state state);
+ 
  /* Status inquiry functions */
  extern bool IsAutoVacuumLauncherProcess(void);
  extern bool IsAutoVacuumWorkerProcess(void);
*************** extern void AutoVacuumShmemInit(void);
*** 57,60 ****
--- 128,135 ----
  extern Size AutoVacuumShmemSize(void);
  extern void AutoVacuumShmemInit(void);
  
+ /* bgworker job management functions */
+ extern void bgworker_job_failed(int errcode);
+ extern void bgworker_reset(void);
+ 
  #endif   /* AUTOVACUUM_H */
============================================================
*** src/backend/storage/ipc/imsg.c	81f58ac6067b9e4ebfaaf1e8ada1df68a20e8493
--- src/backend/storage/ipc/imsg.c	c9dbb1c2c90a6b1a5019b4edb42c682f92dac469
*************** decode_imessage_type(const IMessageType 
*** 82,87 ****
--- 82,89 ----
  
  		case IMSGT_REGISTER_WORKER:
  			return "IMSGT_REGISTER_WORKER";
+ 		case IMSGT_TERM_WORKER:
+ 			return "IMSGT_TERM_WORKER";
  		case IMSGT_READY:
  			return "IMSGT_READY";
  
============================================================
*** src/include/storage/imsg.h	33f7d6772ba8d599a4eed0b882ccfa474a5cf258
--- src/include/storage/imsg.h	036c47ec0fee900cdff2ab7a5264d5396eed89dd
*************** typedef enum
*** 33,38 ****
--- 33,39 ----
  	 * registration and job management.
  	 */
  	IMSGT_REGISTER_WORKER = 'W',
+ 	IMSGT_TERM_WORKER = 'M',
  	IMSGT_READY = 'r',
  
  	/* inform the coordinator about a database that needs vacuuming to
