Autovacuum launcher patch

Started by Alvaro Herrera, almost 19 years ago (6 messages)
#1 Alvaro Herrera
alvherre@commandprompt.com
1 attachment(s)

Hello,

This patch separates autovacuum into two families of processes: one is the
"launcher", in charge of examining statistics and deciding when to start
a worker. The other is the worker, which is started by the postmaster
at the request of the launcher, and processes what the launcher tells it
to process (by way of setting info up in shared memory).

The postmaster treats workers as regular backends; they are listed in
the backend list, so when it wants to shut down, it'll send a SIGTERM
signal just like everyone else, meaning it'll Just Work(tm).

The launcher is a dummy process; it never connects to any database.
Right now, the scheduling is more or less the same as before: it'll only
start a single worker, which will process a whole database. Or rather,
all tables in it that are determined to need vacuuming, per the old
rules. Currently, the launcher first examines the last autovacuum time
to determine which database to vacuum; the worker then examines the
stats to determine which tables to vacuum. Eventually this will need to
be changed so that the launcher tells the worker exactly what table to
work on.

I've been wondering how to make the scheduling work in the future, when
we need to have the launcher read stuff from catalogs to configure the
scheduling ... Maybe the solution will be to store flatfiles based on
the catalogs, like we do for pg_database and pg_authid.

Comments are welcome.

--
Alvaro Herrera http://www.CommandPrompt.com/
The PostgreSQL Company - Command Prompt, Inc.

Attachments:

autovac-launcher.patch (text/x-diff; charset=us-ascii) [Download]
Index: src/backend/access/transam/varsup.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/access/transam/varsup.c,v
retrieving revision 1.77
diff -c -p -r1.77 varsup.c
*** src/backend/access/transam/varsup.c	5 Jan 2007 22:19:23 -0000	1.77
--- src/backend/access/transam/varsup.c	26 Jan 2007 23:13:35 -0000
*************** GetNewTransactionId(bool isSubXact)
*** 72,78 ****
  		 * still gives plenty of chances before we get into real trouble.
  		 */
  		if (IsUnderPostmaster && (xid % 65536) == 0)
! 			SendPostmasterSignal(PMSIGNAL_START_AUTOVAC);
  
  		if (IsUnderPostmaster &&
  		 TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidStopLimit))
--- 72,78 ----
  		 * still gives plenty of chances before we get into real trouble.
  		 */
  		if (IsUnderPostmaster && (xid % 65536) == 0)
! 			SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);
  
  		if (IsUnderPostmaster &&
  		 TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidStopLimit))
*************** SetTransactionIdLimit(TransactionId olde
*** 286,292 ****
  	 */
  	if (TransactionIdFollowsOrEquals(curXid, xidVacLimit) &&
  		IsUnderPostmaster)
! 		SendPostmasterSignal(PMSIGNAL_START_AUTOVAC);
  
  	/* Give an immediate warning if past the wrap warn point */
  	if (TransactionIdFollowsOrEquals(curXid, xidWarnLimit))
--- 286,292 ----
  	 */
  	if (TransactionIdFollowsOrEquals(curXid, xidVacLimit) &&
  		IsUnderPostmaster)
! 		SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);
  
  	/* Give an immediate warning if past the wrap warn point */
  	if (TransactionIdFollowsOrEquals(curXid, xidWarnLimit))
Index: src/backend/commands/vacuum.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/commands/vacuum.c,v
retrieving revision 1.343
diff -c -p -r1.343 vacuum.c
*** src/backend/commands/vacuum.c	5 Jan 2007 22:19:26 -0000	1.343
--- src/backend/commands/vacuum.c	26 Jan 2007 17:23:00 -0000
*************** vacuum(VacuumStmt *vacstmt, List *relids
*** 303,309 ****
  	 * Send info about dead objects to the statistics collector, unless we are
  	 * in autovacuum --- autovacuum.c does this for itself.
  	 */
! 	if (vacstmt->vacuum && !IsAutoVacuumProcess())
  		pgstat_vacuum_tabstat();
  
  	/*
--- 303,309 ----
  	 * Send info about dead objects to the statistics collector, unless we are
  	 * in autovacuum --- autovacuum.c does this for itself.
  	 */
! 	if (vacstmt->vacuum && !IsAutoVacuumWorkerProcess())
  		pgstat_vacuum_tabstat();
  
  	/*
*************** vacuum(VacuumStmt *vacstmt, List *relids
*** 472,478 ****
  		ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  	}
  
! 	if (vacstmt->vacuum && !IsAutoVacuumProcess())
  	{
  		/*
  		 * Update pg_database.datfrozenxid, and truncate pg_clog if possible.
--- 472,478 ----
  		ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  	}
  
! 	if (vacstmt->vacuum && !IsAutoVacuumWorkerProcess())
  	{
  		/*
  		 * Update pg_database.datfrozenxid, and truncate pg_clog if possible.
Index: src/backend/postmaster/autovacuum.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/postmaster/autovacuum.c,v
retrieving revision 1.31
diff -c -p -r1.31 autovacuum.c
*** src/backend/postmaster/autovacuum.c	16 Jan 2007 13:28:56 -0000	1.31
--- src/backend/postmaster/autovacuum.c	26 Jan 2007 23:07:29 -0000
***************
*** 39,45 ****
--- 39,47 ----
  #include "postmaster/postmaster.h"
  #include "storage/fd.h"
  #include "storage/ipc.h"
+ #include "storage/pmsignal.h"
  #include "storage/proc.h"
+ #include "storage/procarray.h"
  #include "storage/sinval.h"
  #include "tcop/tcopprot.h"
  #include "utils/flatfiles.h"
***************
*** 50,55 ****
--- 52,59 ----
  #include "utils/syscache.h"
  
  
+ static bool avlauncher_exit_request = false;
+ 
  /*
   * GUC parameters
   */
*************** int			autovacuum_vac_cost_delay;
*** 65,71 ****
  int			autovacuum_vac_cost_limit;
  
  /* Flag to tell if we are in the autovacuum daemon process */
! static bool am_autovacuum = false;
  
  /* Last time autovac daemon started/stopped (only valid in postmaster) */
  static time_t last_autovac_start_time = 0;
--- 69,76 ----
  int			autovacuum_vac_cost_limit;
  
  /* Flag to tell if we are in the autovacuum daemon process */
! static bool am_autovacuum_launcher = false;
! static bool am_autovacuum_worker = false;
  
  /* Last time autovac daemon started/stopped (only valid in postmaster) */
  static time_t last_autovac_start_time = 0;
*************** static int	default_freeze_min_age;
*** 80,85 ****
--- 85,93 ----
  /* Memory context for long-lived data */
  static MemoryContext AutovacMemCxt;
  
+ /* Memory context for stat data */
+ static MemoryContext AutoVacStatContext;
+ 
  /* struct to keep list of candidate databases for vacuum */
  typedef struct autovac_dbase
  {
*************** typedef struct autovac_table
*** 101,113 ****
  	int			vacuum_cost_limit;
  } autovac_table;
  
  
  #ifdef EXEC_BACKEND
! static pid_t autovac_forkexec(void);
  #endif
! NON_EXEC_STATIC void AutoVacMain(int argc, char *argv[]);
  static void do_autovacuum(PgStat_StatDBEntry *dbentry);
  static List *autovac_get_database_list(void);
  static void test_rel_for_autovac(Oid relid, PgStat_StatTabEntry *tabentry,
  					 Form_pg_class classForm,
  					 Form_pg_autovacuum avForm,
--- 109,133 ----
  	int			vacuum_cost_limit;
  } autovac_table;
  
+ typedef struct
+ {
+ 	Oid		process_db;			/* OID of database to process */
+ 	int		worker_pid;			/* PID of the worker process, if any */
+ } AutoVacuumShmemStruct;
+ 
+ static AutoVacuumShmemStruct *AutoVacuumShmem;
+ 
  
  #ifdef EXEC_BACKEND
! static pid_t avlauncher_forkexec(void);
! static pid_t avworker_forkexec(void);
  #endif
! NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
! NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
! 
  static void do_autovacuum(PgStat_StatDBEntry *dbentry);
  static List *autovac_get_database_list(void);
+ static char *autovac_get_database_name(Oid dbid);
  static void test_rel_for_autovac(Oid relid, PgStat_StatTabEntry *tabentry,
  					 Form_pg_class classForm,
  					 Form_pg_autovacuum avForm,
*************** static void test_rel_for_autovac(Oid rel
*** 116,130 ****
  static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
  						  bool doanalyze, int freeze_min_age);
  static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
  
  
  /*
!  * Main entry point for autovacuum controller process.
   *
!  * This code is heavily based on pgarch.c, q.v.
   */
  int
! autovac_start(void)
  {
  	time_t		curtime;
  	pid_t		AutoVacPID;
--- 136,188 ----
  static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
  						  bool doanalyze, int freeze_min_age);
  static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
+ static void avlauncher_exit(SIGNAL_ARGS);
+ static void avl_quickdie(SIGNAL_ARGS);
+ 
+ 
  
+ /********************************************************************
+  *                    AUTOVACUUM LAUNCHER CODE
+  ********************************************************************/
  
+ #ifdef EXEC_BACKEND
  /*
!  * forkexec routine for the autovacuum launcher process.
   *
!  * Format up the arglist, then fork and exec.
!  */
! static pid_t
! avlauncher_forkexec(void)
! {
! 	char	   *av[10];
! 	int			ac = 0;
! 
! 	av[ac++] = "postgres";
! 	av[ac++] = "--forkavlauncher";
! 	av[ac++] = NULL;			/* filled in by postmaster_forkexec */
! 	av[ac] = NULL;
! 
! 	Assert(ac < lengthof(av));
! 
! 	return postmaster_forkexec(ac, av);
! }
! 
! /*
!  * We need this set from the outside, before InitProcess is called
!  */
! void
! AutovacuumLauncherIAm(void)
! {
! 	am_autovacuum_launcher = true;
! }
! #endif
! 
! /*
!  * Main entry point for autovacuum launcher process, to be called from the
!  * postmaster.
   */
  int
! StartAutoVacLauncher(void)
  {
  	time_t		curtime;
  	pid_t		AutoVacPID;
*************** autovac_start(void)
*** 142,147 ****
--- 200,209 ----
  	 * will get another chance later if we do nothing now.
  	 *
  	 * XXX todo: implement sleep scale factor that existed in contrib code.
+ 	 *
+ 	 * FIXME -- figure out how much of this is still relevant.  It's leftover
+ 	 * from before we had a separation between launcher and worker, probably
+ 	 * most of it is obsolete.
  	 */
  
  	curtime = time(NULL);
*************** autovac_start(void)
*** 156,162 ****
  	last_autovac_start_time = curtime;
  
  #ifdef EXEC_BACKEND
! 	switch ((AutoVacPID = autovac_forkexec()))
  #else
  	switch ((AutoVacPID = fork_process()))
  #endif
--- 218,224 ----
  	last_autovac_start_time = curtime;
  
  #ifdef EXEC_BACKEND
! 	switch ((AutoVacPID = avlauncher_forkexec()))
  #else
  	switch ((AutoVacPID = fork_process()))
  #endif
*************** autovac_start(void)
*** 175,181 ****
  			/* Lose the postmaster's on-exit routines */
  			on_exit_reset();
  
! 			AutoVacMain(0, NULL);
  			break;
  #endif
  		default:
--- 237,243 ----
  			/* Lose the postmaster's on-exit routines */
  			on_exit_reset();
  
! 			AutoVacLauncherMain(0, NULL);
  			break;
  #endif
  		default:
*************** autovac_start(void)
*** 187,214 ****
  }
  
  /*
!  * autovac_stopped --- called by postmaster when subprocess exit is detected
   */
! void
! autovac_stopped(void)
  {
! 	last_autovac_stop_time = time(NULL);
  }
  
  #ifdef EXEC_BACKEND
  /*
!  * autovac_forkexec()
   *
!  * Format up the arglist for the autovacuum process, then fork and exec.
   */
  static pid_t
! autovac_forkexec(void)
  {
  	char	   *av[10];
  	int			ac = 0;
  
  	av[ac++] = "postgres";
! 	av[ac++] = "--forkautovac";
  	av[ac++] = NULL;			/* filled in by postmaster_forkexec */
  	av[ac] = NULL;
  
--- 249,596 ----
  }
  
  /*
!  * Main loop for the autovacuum launcher process.
   */
! NON_EXEC_STATIC void
! AutoVacLauncherMain(int argc, char *argv[])
! {
! 	sigjmp_buf	local_sigjmp_buf;
! 	List	   *dblist;
! 	bool		for_xid_wrap;
! 	autovac_dbase *db;
! 	MemoryContext	avlauncher_cxt;
! 
! 	/* we are a postmaster subprocess now */
! 	IsUnderPostmaster = true;
! 	am_autovacuum_launcher = true;
! 
! 	/* reset MyProcPid */
! 	MyProcPid = getpid();
! 
! 	/* Identify myself via ps */
! 	init_ps_display("autovacuum launcher process", "", "", "");
! 
! 	SetProcessingMode(InitProcessing);
! 
! 	/*
! 	 * If possible, make this process a group leader, so that the postmaster
! 	 * can signal any child processes too.  (autovacuum probably never has
! 	 * any child processes, but for consistency we make all postmaster
! 	 * child processes do this.)
! 	 */
! #ifdef HAVE_SETSID
! 	if (setsid() < 0)
! 		elog(FATAL, "setsid() failed: %m");
! #endif
! 
! 	/*
! 	 * Set up signal handlers.	Since this is a "dummy" process, it has
! 	 * particular signal requirements -- no deadlock checker or sinval
! 	 * catchup, for example.
! 	 *
! 	 * Currently, we don't pay attention to postgresql.conf changes that
! 	 * happen during a single daemon iteration, so we can ignore SIGHUP.
! 	 *
! 	 * FIXME -- this was correct for the old autovacuum, but the avlauncher
! 	 * should be paying attention.  Additionally, it may be a good idea to
! 	 * receive signals when an avworker process finishes.
! 	 */
! 	pqsignal(SIGHUP, SIG_IGN);
! 
! 	pqsignal(SIGINT, SIG_IGN);
! 	pqsignal(SIGTERM, avlauncher_exit);
! 	pqsignal(SIGQUIT, avl_quickdie);
! 	pqsignal(SIGALRM, SIG_IGN);
! 
! 	pqsignal(SIGPIPE, SIG_IGN);
! 	pqsignal(SIGUSR1, SIG_IGN);
! 	/* We don't listen for async notifies */
! 	pqsignal(SIGUSR2, SIG_IGN);
! 	pqsignal(SIGFPE, FloatExceptionHandler);
! 	pqsignal(SIGCHLD, SIG_DFL);
! 
! 	/* Early initialization */
! 	BaseInit();
! 
! 	/*
! 	 * Create a per-backend PGPROC struct in shared memory, except in the
! 	 * EXEC_BACKEND case where this was done in SubPostmasterMain. We must do
! 	 * this before we can use LWLocks (and in the EXEC_BACKEND case we already
! 	 * had to do some stuff with LWLocks).
! 	 */
! #ifndef EXEC_BACKEND
! 	InitDummyProcess();
! #endif
! 
! 	/*
! 	 * Create a memory context that we will do all our work in.  We do this so
! 	 * that we can reset the context during error recovery and thereby avoid
! 	 * possible memory leaks.
! 	 */
! 	avlauncher_cxt = AllocSetContextCreate(TopMemoryContext,
! 										   "Autovacuum Launcher",
! 										   ALLOCSET_DEFAULT_MINSIZE,
! 										   ALLOCSET_DEFAULT_INITSIZE,
! 										   ALLOCSET_DEFAULT_MAXSIZE);
! 	MemoryContextSwitchTo(avlauncher_cxt);
! 
! 
! 	/*
! 	 * If an exception is encountered, processing resumes here.
! 	 *
! 	 * This code is heavily based on bgwriter.c, q.v.
! 	 */
! 	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
! 	{
! 		/* since not using PG_TRY, must reset error stack by hand */
! 		error_context_stack = NULL;
! 
! 		/* Prevents interrupts while cleaning up */
! 		HOLD_INTERRUPTS();
! 
! 		/* Report the error to the server log */
! 		EmitErrorReport();
! 
! 		/*
! 		 * These operations are really just a minimal subset of
! 		 * AbortTransaction().  We don't have very many resources to worry
! 		 * about, but we do have LWLocks.
! 		 */
! 		LWLockReleaseAll();
! 		AtEOXact_Files();
! 
! 		/*
! 		 * Now return to normal top-level context and clear ErrorContext for
! 		 * next time.
! 		 */
! 		MemoryContextSwitchTo(avlauncher_cxt);
! 		FlushErrorState();
! 
! 		/* Flush any leaked data in the top-level context */
! 		MemoryContextResetAndDeleteChildren(avlauncher_cxt);
! 		/* Make sure pgstat also considers our stat data as gone */
! 		pgstat_stathash_reset();
! 
! 		/* Now we can allow interrupts again */
! 		RESUME_INTERRUPTS();
! 
! 		/*
! 		 * Sleep at least 1 second after any error.  We don't want to be
! 		 * filling the error logs as fast as we can.
! 		 */
! 		pg_usleep(1000000L);
! 	}
! 
! 	/* We can now handle ereport(ERROR) */
! 	PG_exception_stack = &local_sigjmp_buf;
! 
! 	ereport(LOG,
! 			(errmsg("autovacuum launcher started")));
! 
! 	PG_SETMASK(&UnBlockSig);
! 
! 	AutoVacStatContext = AllocSetContextCreate(avlauncher_cxt,
! 											   "AVLauncher Stat Hash",
! 											   ALLOCSET_DEFAULT_MINSIZE,
! 											   ALLOCSET_DEFAULT_INITSIZE,
! 											   ALLOCSET_DEFAULT_MAXSIZE);
! 
! 	for (;;)
! 	{
! 		TransactionId xidForceLimit;
! 		ListCell *cell;
! 		int		worker_pid;
! 
! 		/*
! 		 * Emergency bailout if postmaster has died.  This is to avoid the
! 		 * necessity for manual cleanup of all postmaster children.
! 		 */
! 		if (!PostmasterIsAlive(true))
! 			exit(1);
! 
! 		if (avlauncher_exit_request)
! 		{
! 			ereport(LOG,
! 					(errmsg("autovacuum launcher shutting down")));
! 
! 			/* Normal exit from the autovac launcher is here */
! 			proc_exit(0);		/* done */
! 		}
! 
! 		/*
! 		 * if there's a worker already running, sleep until it
! 		 * disappears.
! 		 */
! 		LWLockAcquire(AutovacuumLock, LW_SHARED);
! 		worker_pid = AutoVacuumShmem->worker_pid;
! 		LWLockRelease(AutovacuumLock);
! 
! 		if (worker_pid != 0)
! 		{
! 			PGPROC *proc = BackendPidGetProc(worker_pid);
! 
! 			if (proc != NULL && proc->isAutovacuum)
! 				goto sleep;
! 			else
! 			{
! 				/*
! 				 * if the worker is not really running (or it's a process
! 				 * that's not an autovacuum worker), clear the PID in shmem.
! 				 * This should not happen, because either the worker exits
! 				 * cleanly, in which case it'll remove the PID, or it dies, in
! 				 * which case postmaster will cause a system reset cycle.
! 				 */
! 				LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 				AutoVacuumShmem->worker_pid = 0;
! 				LWLockRelease(AutovacuumLock);
! 			}
! 		}
! 
! 		/* Get a list of databases */
! 		dblist = autovac_get_database_list();
! 
! 		/*
! 		 * Determine the oldest datfrozenxid/relfrozenxid that we will allow
! 		 * to pass without forcing a vacuum.  (This limit can be tightened for
! 		 * particular tables, but not loosened.)
! 		 */
! 		recentXid = ReadNewTransactionId();
! 		xidForceLimit = recentXid - autovacuum_freeze_max_age;
! 		/* ensure it's a "normal" XID, else TransactionIdPrecedes misbehaves */
! 		if (xidForceLimit < FirstNormalTransactionId)
! 			xidForceLimit -= FirstNormalTransactionId;
! 
! 		/*
! 		 * Choose a database to connect to.  We pick the database that was least
! 		 * recently auto-vacuumed, or one that needs vacuuming to prevent Xid
! 		 * wraparound-related data loss.  If any db at risk of wraparound is
! 		 * found, we pick the one with oldest datfrozenxid, independently of
! 		 * autovacuum times.
! 		 *
! 		 * Note that a database with no stats entry is not considered, except for
! 		 * Xid wraparound purposes.  The theory is that if no one has ever
! 		 * connected to it since the stats were last initialized, it doesn't need
! 		 * vacuuming.
! 		 *
! 		 * XXX This could be improved if we had more info about whether it needs
! 		 * vacuuming before connecting to it.  Perhaps look through the pgstats
! 		 * data for the database's tables?  One idea is to keep track of the
! 		 * number of new and dead tuples per database in pgstats.  However it
! 		 * isn't clear how to construct a metric that measures that and not cause
! 		 * starvation for less busy databases.
! 		 */
! 		db = NULL;
! 		for_xid_wrap = false;
! 		foreach(cell, dblist)
! 		{
! 			autovac_dbase *tmp = lfirst(cell);
! 
! 			/* Find pgstat entry if any */
! 			tmp->entry = pgstat_fetch_stat_dbentry(tmp->oid);
! 
! 			/* Check to see if this one is at risk of wraparound */
! 			if (TransactionIdPrecedes(tmp->frozenxid, xidForceLimit))
! 			{
! 				if (db == NULL ||
! 					TransactionIdPrecedes(tmp->frozenxid, db->frozenxid))
! 					db = tmp;
! 				for_xid_wrap = true;
! 				continue;
! 			}
! 			else if (for_xid_wrap)
! 				continue;			/* ignore not-at-risk DBs */
! 
! 			/*
! 			 * Otherwise, skip a database with no pgstat entry; it means it
! 			 * hasn't seen any activity.
! 			 */
! 			if (!tmp->entry)
! 				continue;
! 
! 			/*
! 			 * Remember the db with oldest autovac time.  (If we are here,
! 			 * both tmp->entry and db->entry must be non-null.)
! 			 */
! 			if (db == NULL ||
! 				tmp->entry->last_autovac_time < db->entry->last_autovac_time)
! 				db = tmp;
! 		}
! 
! 		/* Found a database -- process it */
! 		if (db != NULL)
! 		{
! 			LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 			AutoVacuumShmem->process_db = db->oid;
! 			LWLockRelease(AutovacuumLock);
! 
! 			SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
! 		}
! 
! sleep:
! 		/* have pgstat read the file again next time */
! 		MemoryContextReset(AutoVacStatContext);
! 		pgstat_stathash_reset();
! 
! 		/* now sleep until the next autovac iteration */
! 		pg_usleep(autovacuum_naptime * 1000000L);
! 	}
! }
! 
! MemoryContext
! AutoVacGetStatsContext(void)
! {
! 	return AutoVacStatContext;
! }
! 
! static void
! avlauncher_exit(SIGNAL_ARGS)
! {
! 	avlauncher_exit_request = true;
! }
! 
! /*
!  * avl_quickdie occurs when signalled SIGQUIT from postmaster.
!  *
!  * Some backend has bought the farm, so we need to stop what we're doing
!  * and exit.
!  */
! static void
! avl_quickdie(SIGNAL_ARGS)
  {
! 	PG_SETMASK(&BlockSig);
! 
! 	/*
! 	 * DO NOT proc_exit() -- we're here because shared memory may be
! 	 * corrupted, so we don't want to try to clean up our transaction. Just
! 	 * nail the windows shut and get out of town.
! 	 *
! 	 * Note we do exit(2) not exit(0).	This is to force the postmaster into a
! 	 * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
! 	 * backend.  This is necessary precisely because we don't clean up our
! 	 * shared memory state.
! 	 */
! 	exit(2);
  }
  
+ 
+ /********************************************************************
+  *                    AUTOVACUUM WORKER CODE
+  ********************************************************************/
+ 
  #ifdef EXEC_BACKEND
  /*
!  * forkexec routines for the autovacuum worker.
   *
!  * Format up the arglist, then fork and exec.
   */
  static pid_t
! avworker_forkexec(void)
  {
  	char	   *av[10];
  	int			ac = 0;
  
  	av[ac++] = "postgres";
! 	av[ac++] = "--forkavworker";
  	av[ac++] = NULL;			/* filled in by postmaster_forkexec */
  	av[ac] = NULL;
  
*************** autovac_forkexec(void)
*** 221,254 ****
   * We need this set from the outside, before InitProcess is called
   */
  void
! AutovacuumIAm(void)
  {
! 	am_autovacuum = true;
  }
- #endif   /* EXEC_BACKEND */
  
  /*
!  * AutoVacMain
   */
  NON_EXEC_STATIC void
! AutoVacMain(int argc, char *argv[])
  {
- 	ListCell   *cell;
- 	List	   *dblist;
- 	autovac_dbase *db;
- 	TransactionId xidForceLimit;
- 	bool		for_xid_wrap;
  	sigjmp_buf	local_sigjmp_buf;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
! 	am_autovacuum = true;
  
  	/* reset MyProcPid */
  	MyProcPid = getpid();
  
  	/* Identify myself via ps */
! 	init_ps_display("autovacuum process", "", "", "");
  
  	SetProcessingMode(InitProcessing);
  
--- 603,673 ----
   * We need this set from the outside, before InitProcess is called
   */
  void
! AutovacuumWorkerIAm(void)
! {
! 	am_autovacuum_worker = true;
! }
! #endif
! 
! /*
!  * Main entry point for autovacuum worker process.
!  *
!  * This code is heavily based on pgarch.c, q.v.
!  */
! int
! StartAutoVacWorker(void)
  {
! 	pid_t		worker_pid;
! 
! #ifdef EXEC_BACKEND
! 	switch ((worker_pid = avworker_forkexec()))
! #else
! 	switch ((worker_pid = fork_process()))
! #endif
! 	{
! 		case -1:
! 			ereport(LOG,
! 					(errmsg("could not fork autovacuum process: %m")));
! 			return 0;
! 
! #ifndef EXEC_BACKEND
! 		case 0:
! 			/* in postmaster child ... */
! 			/* Close the postmaster's sockets */
! 			ClosePostmasterPorts(false);
! 
! 			/* Lose the postmaster's on-exit routines */
! 			on_exit_reset();
! 
! 			AutoVacWorkerMain(0, NULL);
! 			break;
! #endif
! 		default:
! 			return (int) worker_pid;
! 	}
! 
! 	/* shouldn't get here */
! 	return 0;
  }
  
  /*
!  * AutoVacWorkerMain
   */
  NON_EXEC_STATIC void
! AutoVacWorkerMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
+ 	Oid			dbid;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
! 	am_autovacuum_worker = true;
  
  	/* reset MyProcPid */
  	MyProcPid = getpid();
  
  	/* Identify myself via ps */
! 	init_ps_display("autovacuum worker process", "", "", "");
  
  	SetProcessingMode(InitProcessing);
  
*************** AutoVacMain(int argc, char *argv[])
*** 335,412 ****
  	 */
  	SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
  
- 	/* Get a list of databases */
- 	dblist = autovac_get_database_list();
- 
  	/*
! 	 * Determine the oldest datfrozenxid/relfrozenxid that we will allow
! 	 * to pass without forcing a vacuum.  (This limit can be tightened for
! 	 * particular tables, but not loosened.)
  	 */
! 	recentXid = ReadNewTransactionId();
! 	xidForceLimit = recentXid - autovacuum_freeze_max_age;
! 	/* ensure it's a "normal" XID, else TransactionIdPrecedes misbehaves */
! 	if (xidForceLimit < FirstNormalTransactionId)
! 		xidForceLimit -= FirstNormalTransactionId;
  
! 	/*
! 	 * Choose a database to connect to.  We pick the database that was least
! 	 * recently auto-vacuumed, or one that needs vacuuming to prevent Xid
! 	 * wraparound-related data loss.  If any db at risk of wraparound is
! 	 * found, we pick the one with oldest datfrozenxid,
! 	 * independently of autovacuum times.
! 	 *
! 	 * Note that a database with no stats entry is not considered, except for
! 	 * Xid wraparound purposes.  The theory is that if no one has ever
! 	 * connected to it since the stats were last initialized, it doesn't need
! 	 * vacuuming.
! 	 *
! 	 * XXX This could be improved if we had more info about whether it needs
! 	 * vacuuming before connecting to it.  Perhaps look through the pgstats
! 	 * data for the database's tables?  One idea is to keep track of the
! 	 * number of new and dead tuples per database in pgstats.  However it
! 	 * isn't clear how to construct a metric that measures that and not cause
! 	 * starvation for less busy databases.
! 	 */
! 	db = NULL;
! 	for_xid_wrap = false;
! 	foreach(cell, dblist)
! 	{
! 		autovac_dbase *tmp = lfirst(cell);
  
! 		/* Find pgstat entry if any */
! 		tmp->entry = pgstat_fetch_stat_dbentry(tmp->oid);
  
! 		/* Check to see if this one is at risk of wraparound */
! 		if (TransactionIdPrecedes(tmp->frozenxid, xidForceLimit))
! 		{
! 			if (db == NULL ||
! 				TransactionIdPrecedes(tmp->frozenxid, db->frozenxid))
! 				db = tmp;
! 			for_xid_wrap = true;
! 			continue;
! 		}
! 		else if (for_xid_wrap)
! 			continue;			/* ignore not-at-risk DBs */
! 
! 		/*
! 		 * Otherwise, skip a database with no pgstat entry; it means it
! 		 * hasn't seen any activity.
! 		 */
! 		if (!tmp->entry)
! 			continue;
! 
! 		/*
! 		 * Remember the db with oldest autovac time.  (If we are here,
! 		 * both tmp->entry and db->entry must be non-null.)
! 		 */
! 		if (db == NULL ||
! 			tmp->entry->last_autovac_time < db->entry->last_autovac_time)
! 			db = tmp;
! 	}
! 
! 	if (db)
  	{
  		/*
  		 * Report autovac startup to the stats collector.  We deliberately do
  		 * this before InitPostgres, so that the last_autovac_time will get
--- 754,777 ----
  	 */
  	SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
  
  	/*
! 	 * Get the database Id we're going to work on, and announce our PID
! 	 * in the shared memory area.  We remove the database OID immediately
! 	 * from the shared memory area.
  	 */
! 	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
  
! 	dbid = AutoVacuumShmem->process_db;
! 	AutoVacuumShmem->process_db = InvalidOid;
! 	AutoVacuumShmem->worker_pid = MyProcPid;
  
! 	LWLockRelease(AutovacuumLock);
  
! 	if (OidIsValid(dbid))
  	{
+ 		char	*dbname;
+ 		PgStat_StatDBEntry *dbentry;
+ 
  		/*
  		 * Report autovac startup to the stats collector.  We deliberately do
  		 * this before InitPostgres, so that the last_autovac_time will get
*************** AutoVacMain(int argc, char *argv[])
*** 415,421 ****
  		 * database, rather than making any progress on stuff it can connect
  		 * to.
  		 */
! 		pgstat_report_autovac(db->oid);
  
  		/*
  		 * Connect to the selected database
--- 780,793 ----
  		 * database, rather than making any progress on stuff it can connect
  		 * to.
  		 */
! 		pgstat_report_autovac(dbid);
! 
! 		/*
! 		 * FIXME -- maybe it's better to change InitPostgres to be able to
! 		 * receive an OID, so that we don't have to read the pg_database
! 		 * flatfile again?
! 		 */
! 		dbname = autovac_get_database_name(dbid);
  
  		/*
  		 * Connect to the selected database
*************** AutoVacMain(int argc, char *argv[])
*** 423,433 ****
  		 * Note: if we have selected a just-deleted database (due to using
  		 * stale stats info), we'll fail and exit here.
  		 */
! 		InitPostgres(db->name, NULL);
  		SetProcessingMode(NormalProcessing);
! 		set_ps_display(db->name, false);
  		ereport(DEBUG1,
! 				(errmsg("autovacuum: processing database \"%s\"", db->name)));
  
  		/* Create the memory context where cross-transaction state is stored */
  		AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
--- 795,805 ----
  		 * Note: if we have selected a just-deleted database (due to using
  		 * stale stats info), we'll fail and exit here.
  		 */
! 		InitPostgres(dbname, NULL);
  		SetProcessingMode(NormalProcessing);
! 		set_ps_display(dbname, false);
  		ereport(DEBUG1,
! 				(errmsg("autovacuum: processing database \"%s\"", dbname)));
  
  		/* Create the memory context where cross-transaction state is stored */
  		AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
*************** AutoVacMain(int argc, char *argv[])
*** 439,448 ****
  		/*
  		 * And do an appropriate amount of work
  		 */
! 		do_autovacuum(db->entry);
  	}
  
! 	/* One iteration done, go away */
  	proc_exit(0);
  }
  
--- 811,829 ----
  		/*
  		 * And do an appropriate amount of work
  		 */
! 		dbentry = pgstat_fetch_stat_dbentry(dbid);
! 		do_autovacuum(dbentry);
  	}
  
! 	/*
! 	 * Now remove our PID from shared memory, so that the launcher can start
! 	 * another worker as soon as appropriate.
! 	 */
! 	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 	AutoVacuumShmem->worker_pid = 0;
! 	LWLockRelease(AutovacuumLock);
! 
! 	/* All done, go away */
  	proc_exit(0);
  }
  
*************** AutoVacMain(int argc, char *argv[])
*** 450,456 ****
   * autovac_get_database_list
   *
   *		Return a list of all databases.  Note we cannot use pg_database,
!  *		because we aren't connected yet; we use the flat database file.
   */
  static List *
  autovac_get_database_list(void)
--- 831,837 ----
   * autovac_get_database_list
   *
   *		Return a list of all databases.  Note we cannot use pg_database,
!  *		because we aren't connected; we use the flat database file.
   */
  static List *
  autovac_get_database_list(void)
*************** autovac_get_database_list(void)
*** 493,498 ****
--- 874,920 ----
  }
  
  /*
+  * autovac_get_database_name
+  *
+  * 		Given a database OID, get its name from the flat database file.
+  */
+ static char *
+ autovac_get_database_name(Oid dbid)
+ {
+ 	char   *filename;
+ 	char   *dbname = NULL;
+ 	Oid		this_oid;
+ 	Oid		dummy_oid;
+ 	FILE   *db_file;
+ 	char	thisname[NAMEDATALEN];
+ 	
+ 	filename = database_getflatfilename();
+ 	db_file = AllocateFile(filename, "r");
+ 	if (db_file == NULL)
+ 		ereport(FATAL,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not open file \"%s\": %m", filename)));
+ 
+ 	while (read_pg_database_line(db_file, thisname, &this_oid,
+ 								 &dummy_oid, &dummy_oid))
+ 	{
+ 		if (this_oid == dbid)
+ 		{
+ 			dbname = pstrdup(thisname);
+ 			break;
+ 		}
+ 	}
+ 
+ 	if (dbname == NULL)
+ 		elog(FATAL, "could not find name for database %u", dbid);
+ 
+ 	FreeFile(db_file);
+ 	pfree(filename);
+ 
+ 	return dbname;
+ }
+ 
+ /*
   * Process a database table-by-table
   *
   * dbentry is either a pointer to the database entry in the stats databases
*************** autovac_init(void)
*** 1011,1021 ****
  }
  
  /*
!  * IsAutoVacuumProcess
!  *		Return whether this process is an autovacuum process.
   */
  bool
! IsAutoVacuumProcess(void)
  {
! 	return am_autovacuum;
  }
--- 1433,1484 ----
  }
  
  /*
!  * IsAutoVacuum functions
!  *		Return whether this is either a launcher autovacuum process or a worker
!  *		process.
   */
  bool
! IsAutoVacuumLauncherProcess(void)
! {
! 	return am_autovacuum_launcher;
! }
! 
! bool
! IsAutoVacuumWorkerProcess(void)
! {
! 	return am_autovacuum_worker;
! }
! 
! 
! /*
!  * AutoVacuumShmemSize
!  * 		Compute space needed for autovacuum-related shared memory
!  */
! Size
! AutoVacuumShmemSize(void)
  {
! 	return sizeof(AutoVacuumShmemStruct);
! }
! 
! /*
!  * AutoVacuumShmemInit
!  *		Allocate and initialize autovacuum-related shared memory
!  */
! void
! AutoVacuumShmemInit(void)
! {
! 	bool        found;
! 
! 	AutoVacuumShmem = (AutoVacuumShmemStruct *)
! 		ShmemInitStruct("AutoVacuum Data",
! 						AutoVacuumShmemSize(),
! 						&found);
! 	if (AutoVacuumShmem == NULL)
! 		ereport(FATAL,
! 				(errcode(ERRCODE_OUT_OF_MEMORY),
! 				 errmsg("not enough shared memory for autovacuum")));
! 	if (found)
! 		return;                 /* already initialized */
! 
! 	MemSet(AutoVacuumShmem, 0, sizeof(AutoVacuumShmemStruct));
  }
Index: src/backend/postmaster/pgstat.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/postmaster/pgstat.c,v
retrieving revision 1.143
diff -c -p -r1.143 pgstat.c
*** src/backend/postmaster/pgstat.c	11 Jan 2007 23:06:03 -0000	1.143
--- src/backend/postmaster/pgstat.c	26 Jan 2007 17:23:00 -0000
*************** static time_t last_pgstat_start_time;
*** 110,115 ****
--- 110,116 ----
  
  static bool pgStatRunningInCollector = false;
  
+ 
  /*
   * Place where backends store per-table info to be sent to the collector.
   * We store shared relations separately from non-shared ones, to be able to
*************** static TabStatArray SharedTabStat = {0, 
*** 130,135 ****
--- 131,137 ----
  static int	pgStatXactCommit = 0;
  static int	pgStatXactRollback = 0;
  
+ static bool stathash_set = false;
  static TransactionId pgStatDBHashXact = InvalidTransactionId;
  static HTAB *pgStatDBHash = NULL;
  static TransactionId pgStatLocalStatusXact = InvalidTransactionId;
*************** pgstat_report_vacuum(Oid tableoid, bool 
*** 930,936 ****
  	msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
  	msg.m_tableoid = tableoid;
  	msg.m_analyze = analyze;
! 	msg.m_autovacuum = IsAutoVacuumProcess();	/* is this autovacuum? */
  	msg.m_vacuumtime = GetCurrentTimestamp();
  	msg.m_tuples = tuples;
  	pgstat_send(&msg, sizeof(msg));
--- 932,938 ----
  	msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
  	msg.m_tableoid = tableoid;
  	msg.m_analyze = analyze;
! 	msg.m_autovacuum = IsAutoVacuumWorkerProcess();	/* is this autovacuum? */
  	msg.m_vacuumtime = GetCurrentTimestamp();
  	msg.m_tuples = tuples;
  	pgstat_send(&msg, sizeof(msg));
*************** pgstat_report_analyze(Oid tableoid, bool
*** 955,961 ****
  	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE);
  	msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
  	msg.m_tableoid = tableoid;
! 	msg.m_autovacuum = IsAutoVacuumProcess();	/* is this autovacuum? */
  	msg.m_analyzetime = GetCurrentTimestamp();
  	msg.m_live_tuples = livetuples;
  	msg.m_dead_tuples = deadtuples;
--- 957,963 ----
  	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE);
  	msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
  	msg.m_tableoid = tableoid;
! 	msg.m_autovacuum = IsAutoVacuumWorkerProcess();	/* is this autovacuum? */
  	msg.m_analyzetime = GetCurrentTimestamp();
  	msg.m_live_tuples = livetuples;
  	msg.m_dead_tuples = deadtuples;
*************** pgstat_read_statsfile(HTAB **dbhash, Oid
*** 2098,2110 ****
  	int			mcxt_flags;
  
  	/*
! 	 * If running in the collector or the autovacuum process, we use the
! 	 * DynaHashCxt memory context.	If running in a backend, we use the
! 	 * TopTransactionContext instead, so the caller must only know the last
! 	 * XactId when this call happened to know if his tables are still valid or
! 	 * already gone!
  	 */
! 	if (pgStatRunningInCollector || IsAutoVacuumProcess())
  	{
  		use_mcxt = NULL;
  		mcxt_flags = 0;
--- 2100,2118 ----
  	int			mcxt_flags;
  
  	/*
! 	 * If running in the collector or an autovacuum worker process, we use the
! 	 * DynaHashCxt memory context.	If running in the autovacuum launcher, we
! 	 * use a context that process will provide us.  If running in a backend, we
! 	 * use the TopTransactionContext instead, so the caller must only know the
! 	 * last XactId when this call happened to know if his tables are still
! 	 * valid or already gone!
  	 */
! 	if (IsAutoVacuumLauncherProcess())
! 	{
! 		use_mcxt = AutoVacGetStatsContext();
! 		mcxt_flags = HASH_CONTEXT;
! 	}
! 	else if (pgStatRunningInCollector || IsAutoVacuumWorkerProcess())
  	{
  		use_mcxt = NULL;
  		mcxt_flags = 0;
*************** done:
*** 2269,2289 ****
  }
  
  /*
   * If not done for this transaction, read the statistics collector
   * stats file into some hash tables.
   *
   * Because we store the tables in TopTransactionContext, the result
   * is good for the entire current main transaction.
   *
!  * Inside the autovacuum process, the statfile is assumed to be valid
   * "forever", that is one iteration, within one database.  This means
   * we only consider the statistics as they were when the autovacuum
   * iteration started.
   */
  static void
  backend_read_statsfile(void)
  {
! 	if (IsAutoVacuumProcess())
  	{
  		/* already read it? */
  		if (pgStatDBHash)
--- 2277,2319 ----
  }
  
  /*
+  * Called when the memory context that holds the stathash has been reset,
+  * i.e., we have to read the stats file again.
+  */
+ void
+ pgstat_stathash_reset(void)
+ {
+ 	stathash_set = false;
+ }
+ 
+ /*
   * If not done for this transaction, read the statistics collector
   * stats file into some hash tables.
   *
   * Because we store the tables in TopTransactionContext, the result
   * is good for the entire current main transaction.
   *
!  * Inside an autovacuum worker process, the statfile is assumed to be valid
   * "forever", that is one iteration, within one database.  This means
   * we only consider the statistics as they were when the autovacuum
   * iteration started.
+  *
+  * Inside the autovacuum launcher process, we store the tables in a
+  * memory context that's reset after each check iteration.  It'll tell us
+  * when the context has been reset by calling pgstat_stathash_reset.
   */
  static void
  backend_read_statsfile(void)
  {
! 	if (IsAutoVacuumLauncherProcess())
! 	{
! 		if (stathash_set)
! 			return;
! 		Assert(!pgStatRunningInCollector);
! 		pgstat_read_statsfile(&pgStatDBHash, InvalidOid);
! 		stathash_set = true;
! 	}
! 	else if (IsAutoVacuumWorkerProcess())
  	{
  		/* already read it? */
  		if (pgStatDBHash)
Index: src/backend/postmaster/postmaster.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/postmaster/postmaster.c,v
retrieving revision 1.512
diff -c -p -r1.512 postmaster.c
*** src/backend/postmaster/postmaster.c	23 Jan 2007 03:28:49 -0000	1.512
--- src/backend/postmaster/postmaster.c	26 Jan 2007 23:13:43 -0000
***************
*** 129,141 ****
   * authorization phase).  This is used mainly to keep track of how many
   * children we have and send them appropriate signals when necessary.
   *
!  * "Special" children such as the startup and bgwriter tasks are not in
!  * this list.
   */
  typedef struct bkend
  {
  	pid_t		pid;			/* process id of backend */
  	long		cancel_key;		/* cancel key for cancels for this backend */
  } Backend;
  
  static Dllist *BackendList;
--- 129,143 ----
   * authorization phase).  This is used mainly to keep track of how many
   * children we have and send them appropriate signals when necessary.
   *
!  * "Special" children such as the startup, bgwriter and autovacuum launcher
!  * tasks are not in this list.  Autovacuum worker processes, however, *are* on
!  * it.
   */
  typedef struct bkend
  {
  	pid_t		pid;			/* process id of backend */
  	long		cancel_key;		/* cancel key for cancels for this backend */
+ 	bool		is_autovacuum;	/* is it an autovacuum worker? */
  } Backend;
  
  static Dllist *BackendList;
*************** bool		ClientAuthInProgress = false;		/* 
*** 218,223 ****
--- 220,226 ----
  												 * authentication */
  
  static bool force_autovac = false; /* received START_AUTOVAC signal */
+ static bool start_autovac_worker = false;
  
  /*
   * State for assigning random salts and cancel keys.
*************** ServerLoop(void)
*** 1145,1157 ****
  		/*
  		 * Wait for something to happen.
  		 *
! 		 * We wait at most one minute, or the minimum autovacuum delay, to
! 		 * ensure that the other background tasks handled below get done even
! 		 * when no requests are arriving.
  		 */
  		memcpy((char *) &rmask, (char *) &readmask, sizeof(fd_set));
  
! 		timeout.tv_sec = Min(60, autovacuum_naptime);
  		timeout.tv_usec = 0;
  
  		PG_SETMASK(&UnBlockSig);
--- 1148,1159 ----
  		/*
  		 * Wait for something to happen.
  		 *
! 		 * We wait at most one minute, to ensure that the other background
! 		 * tasks handled below get done even when no requests are arriving.
  		 */
  		memcpy((char *) &rmask, (char *) &readmask, sizeof(fd_set));
  
! 		timeout.tv_sec = 60;
  		timeout.tv_usec = 0;
  
  		PG_SETMASK(&UnBlockSig);
*************** ServerLoop(void)
*** 1238,1256 ****
  				signal_child(BgWriterPID, SIGUSR2);
  		}
  
! 		/*
! 		 * Start a new autovacuum process, if there isn't one running already.
! 		 * (It'll die relatively quickly.)  We check that it's not started too
! 		 * frequently in autovac_start.
! 		 */
  		if ((AutoVacuumingActive() || force_autovac) && AutoVacPID == 0 &&
  			StartupPID == 0 && !FatalError && Shutdown == NoShutdown)
  		{
! 			AutoVacPID = autovac_start();
  			if (AutoVacPID != 0)
  				force_autovac = false;	/* signal successfully processed */
  		}
  
  		/* If we have lost the archiver, try to start a new one */
  		if (XLogArchivingActive() && PgArchPID == 0 &&
  			StartupPID == 0 && !FatalError && Shutdown == NoShutdown)
--- 1240,1279 ----
  				signal_child(BgWriterPID, SIGUSR2);
  		}
  
! 		/* If we have lost the autovacuum launcher, try to start a new one */
  		if ((AutoVacuumingActive() || force_autovac) && AutoVacPID == 0 &&
  			StartupPID == 0 && !FatalError && Shutdown == NoShutdown)
  		{
! 			AutoVacPID = StartAutoVacLauncher();
  			if (AutoVacPID != 0)
  				force_autovac = false;	/* signal successfully processed */
  		}
  
+ 		if (start_autovac_worker && !FatalError && Shutdown == NoShutdown)
+ 		{
+ 			Backend	   *bn;
+ 
+ 			start_autovac_worker = false;
+ 			bn = (Backend *) malloc(sizeof(Backend));
+ 			if (!bn)
+ 			{
+ 				ereport(LOG,
+ 						(errcode(ERRCODE_OUT_OF_MEMORY),
+ 						 errmsg("out of memory")));
+ 			}
+ 			else
+ 			{
+ 				bn->pid = StartAutoVacWorker();
+ 				if (bn->pid > 0)
+ 				{
+ 					DLAddHead(BackendList, DLNewElem(bn));
+ #ifdef EXEC_BACKEND
+ 					ShmemBackendArrayAdd(bn);
+ #endif
+ 				}
+ 			}
+ 		}
+ 
  		/* If we have lost the archiver, try to start a new one */
  		if (XLogArchivingActive() && PgArchPID == 0 &&
  			StartupPID == 0 && !FatalError && Shutdown == NoShutdown)
*************** pmdie(SIGNAL_ARGS)
*** 1873,1888 ****
  			ereport(LOG,
  					(errmsg("received smart shutdown request")));
  
- 			/*
- 			 * We won't wait out an autovacuum iteration ...
- 			 */
- 			if (AutoVacPID != 0)
- 			{
- 				/* Use statement cancel to shut it down */
- 				signal_child(AutoVacPID, SIGINT);
- 				break;			/* let reaper() handle this */
- 			}
- 
  			if (DLGetHead(BackendList))
  				break;			/* let reaper() handle this */
  
--- 1896,1901 ----
*************** pmdie(SIGNAL_ARGS)
*** 1903,1908 ****
--- 1916,1924 ----
  			/* Tell pgstat to shut down too; nothing left for it to do */
  			if (PgStatPID != 0)
  				signal_child(PgStatPID, SIGQUIT);
+ 			/* Tell autovac launcher to shut down too */
+ 			if (AutoVacPID != 0)
+ 				signal_child(AutoVacPID, SIGTERM);
  			break;
  
  		case SIGINT:
*************** pmdie(SIGNAL_ARGS)
*** 1919,1933 ****
  			ereport(LOG,
  					(errmsg("received fast shutdown request")));
  
! 			if (DLGetHead(BackendList) || AutoVacPID != 0)
  			{
  				if (!FatalError)
  				{
  					ereport(LOG,
  							(errmsg("aborting any active transactions")));
  					SignalChildren(SIGTERM);
- 					if (AutoVacPID != 0)
- 						signal_child(AutoVacPID, SIGTERM);
  					/* reaper() does the rest */
  				}
  				break;
--- 1935,1947 ----
  			ereport(LOG,
  					(errmsg("received fast shutdown request")));
  
! 			if (DLGetHead(BackendList))
  			{
  				if (!FatalError)
  				{
  					ereport(LOG,
  							(errmsg("aborting any active transactions")));
  					SignalChildren(SIGTERM);
  					/* reaper() does the rest */
  				}
  				break;
*************** pmdie(SIGNAL_ARGS)
*** 1958,1963 ****
--- 1972,1980 ----
  			/* Tell pgstat to shut down too; nothing left for it to do */
  			if (PgStatPID != 0)
  				signal_child(PgStatPID, SIGQUIT);
+ 			/* Tell autovac launcher to shut down too */
+ 			if (AutoVacPID != 0)
+ 				signal_child(AutoVacPID, SIGTERM);
  			break;
  
  		case SIGQUIT:
*************** reaper(SIGNAL_ARGS)
*** 2072,2079 ****
  
  			/*
  			 * Go to shutdown mode if a shutdown request was pending.
! 			 * Otherwise, try to start the archiver and stats collector too.
! 			 * (We could, but don't, try to start autovacuum here.)
  			 */
  			if (Shutdown > NoShutdown && BgWriterPID != 0)
  				signal_child(BgWriterPID, SIGUSR2);
--- 2089,2096 ----
  
  			/*
  			 * Go to shutdown mode if a shutdown request was pending.
! 			 * Otherwise, try to start the archiver, stats collector and
! 			 * autovacuum launcher.
  			 */
  			if (Shutdown > NoShutdown && BgWriterPID != 0)
  				signal_child(BgWriterPID, SIGUSR2);
*************** reaper(SIGNAL_ARGS)
*** 2083,2088 ****
--- 2100,2107 ----
  					PgArchPID = pgarch_start();
  				if (PgStatPID == 0)
  					PgStatPID = pgstat_start();
+ 				if (AutoVacuumingActive() && AutoVacPID == 0)
+ 					AutoVacPID = StartAutoVacLauncher();
  			}
  
  			continue;
*************** reaper(SIGNAL_ARGS)
*** 2137,2154 ****
  		}
  
  		/*
! 		 * Was it the autovacuum process?  Normal or FATAL exit can be
! 		 * ignored; we'll start a new one at the next iteration of the
! 		 * postmaster's main loop, if necessary.  Any other exit condition
! 		 * is treated as a crash.
  		 */
  		if (AutoVacPID != 0 && pid == AutoVacPID)
  		{
  			AutoVacPID = 0;
! 			autovac_stopped();
! 			if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus))
  				HandleChildCrash(pid, exitstatus,
! 								 _("autovacuum process"));
  			continue;
  		}
  
--- 2156,2171 ----
  		}
  
  		/*
! 		 * Was it the autovacuum launcher?  Normal exit can be ignored; we'll
! 		 * start a new one at the next iteration of the postmaster's main loop,
! 		 * if necessary.  Any other exit condition is treated as a crash.
  		 */
  		if (AutoVacPID != 0 && pid == AutoVacPID)
  		{
  			AutoVacPID = 0;
! 			if (!EXIT_STATUS_0(exitstatus))
  				HandleChildCrash(pid, exitstatus,
! 								 _("autovacuum launcher process"));
  			continue;
  		}
  
*************** reaper(SIGNAL_ARGS)
*** 2226,2232 ****
  
  	if (Shutdown > NoShutdown)
  	{
! 		if (DLGetHead(BackendList) || StartupPID != 0 || AutoVacPID != 0)
  			goto reaper_done;
  		/* Start the bgwriter if not running */
  		if (BgWriterPID == 0)
--- 2243,2249 ----
  
  	if (Shutdown > NoShutdown)
  	{
! 		if (DLGetHead(BackendList) || StartupPID != 0)
  			goto reaper_done;
  		/* Start the bgwriter if not running */
  		if (BgWriterPID == 0)
*************** reaper(SIGNAL_ARGS)
*** 2240,2245 ****
--- 2257,2265 ----
  		/* Tell pgstat to shut down too; nothing left for it to do */
  		if (PgStatPID != 0)
  			signal_child(PgStatPID, SIGQUIT);
+ 		/* Tell autovac launcher to shut down too */
+ 		if (AutoVacPID != 0)
+ 			signal_child(AutoVacPID, SIGTERM);
  	}
  
  reaper_done:
*************** HandleChildCrash(int pid, int exitstatus
*** 2367,2373 ****
  		signal_child(BgWriterPID, (SendStop ? SIGSTOP : SIGQUIT));
  	}
  
! 	/* Take care of the autovacuum daemon too */
  	if (pid == AutoVacPID)
  		AutoVacPID = 0;
  	else if (AutoVacPID != 0 && !FatalError)
--- 2387,2393 ----
  		signal_child(BgWriterPID, (SendStop ? SIGSTOP : SIGQUIT));
  	}
  
! 	/* Take care of the autovacuum launcher too */
  	if (pid == AutoVacPID)
  		AutoVacPID = 0;
  	else if (AutoVacPID != 0 && !FatalError)
*************** signal_child(pid_t pid, int signal)
*** 2487,2493 ****
  }
  
  /*
!  * Send a signal to all backend children (but NOT special children)
   */
  static void
  SignalChildren(int signal)
--- 2507,2514 ----
  }
  
  /*
!  * Send a signal to all backend children, including autovacuum workers (but NOT
!  * special children)
   */
  static void
  SignalChildren(int signal)
*************** SubPostmasterMain(int argc, char *argv[]
*** 3305,3317 ****
  	 * same address the postmaster used.
  	 */
  	if (strcmp(argv[1], "--forkbackend") == 0 ||
! 		strcmp(argv[1], "--forkautovac") == 0 ||
  		strcmp(argv[1], "--forkboot") == 0)
  		PGSharedMemoryReAttach();
  
  	/* autovacuum needs this set before calling InitProcess */
! 	if (strcmp(argv[1], "--forkautovac") == 0)
! 		AutovacuumIAm();
  
  	/*
  	 * Start our win32 signal implementation. This has to be done after we
--- 3326,3341 ----
  	 * same address the postmaster used.
  	 */
  	if (strcmp(argv[1], "--forkbackend") == 0 ||
! 		strcmp(argv[1], "--forkavlauncher") == 0 ||
! 		strcmp(argv[1], "--forkavworker") == 0 ||
  		strcmp(argv[1], "--forkboot") == 0)
  		PGSharedMemoryReAttach();
  
  	/* autovacuum needs this set before calling InitProcess */
! 	if (strcmp(argv[1], "--forkavlauncher") == 0)
! 		AutovacuumLauncherIAm();
! 	if (strcmp(argv[1], "--forkavworker") == 0)
! 		AutovacuumWorkerIAm();
  
  	/*
  	 * Start our win32 signal implementation. This has to be done after we
*************** SubPostmasterMain(int argc, char *argv[]
*** 3397,3403 ****
  		BootstrapMain(argc - 2, argv + 2);
  		proc_exit(0);
  	}
! 	if (strcmp(argv[1], "--forkautovac") == 0)
  	{
  		/* Close the postmaster's sockets */
  		ClosePostmasterPorts(false);
--- 3421,3444 ----
  		BootstrapMain(argc - 2, argv + 2);
  		proc_exit(0);
  	}
! 	if (strcmp(argv[1], "--forkavlauncher") == 0)
! 	{
! 		/* Close the postmaster's sockets */
! 		ClosePostmasterPorts(false);
! 
! 		/* Restore basic shared memory pointers */
! 		InitShmemAccess(UsedShmemSegAddr);
! 
! 		/* Need a PGPROC to run CreateSharedMemoryAndSemaphores */
! 		InitDummyProcess();
! 
! 		/* Attach process to shared data structures */
! 		CreateSharedMemoryAndSemaphores(false, 0);
! 
! 		AutoVacLauncherMain(argc - 2, argv + 2);
! 		proc_exit(0);
! 	}
! 	if (strcmp(argv[1], "--forkavworker") == 0)
  	{
  		/* Close the postmaster's sockets */
  		ClosePostmasterPorts(false);
*************** SubPostmasterMain(int argc, char *argv[]
*** 3411,3417 ****
  		/* Attach process to shared data structures */
  		CreateSharedMemoryAndSemaphores(false, 0);
  
! 		AutoVacMain(argc - 2, argv + 2);
  		proc_exit(0);
  	}
  	if (strcmp(argv[1], "--forkarch") == 0)
--- 3452,3458 ----
  		/* Attach process to shared data structures */
  		CreateSharedMemoryAndSemaphores(false, 0);
  
! 		AutoVacWorkerMain(argc - 2, argv + 2);
  		proc_exit(0);
  	}
  	if (strcmp(argv[1], "--forkarch") == 0)
*************** sigusr1_handler(SIGNAL_ARGS)
*** 3495,3505 ****
  		 * See storage/ipc/sinval[adt].c for the use of this.
  		 */
  		if (Shutdown <= SmartShutdown)
- 		{
  			SignalChildren(SIGUSR1);
- 			if (AutoVacPID != 0)
- 				signal_child(AutoVacPID, SIGUSR1);
- 		}
  	}
  
  	if (CheckPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER) &&
--- 3536,3542 ----
*************** sigusr1_handler(SIGNAL_ARGS)
*** 3519,3525 ****
  		signal_child(SysLoggerPID, SIGUSR1);
  	}
  
! 	if (CheckPostmasterSignal(PMSIGNAL_START_AUTOVAC))
  	{
  		/*
  		 * Start one iteration of the autovacuum daemon, even if autovacuuming
--- 3556,3562 ----
  		signal_child(SysLoggerPID, SIGUSR1);
  	}
  
! 	if (CheckPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER))
  	{
  		/*
  		 * Start one iteration of the autovacuum daemon, even if autovacuuming
*************** sigusr1_handler(SIGNAL_ARGS)
*** 3533,3538 ****
--- 3570,3584 ----
  		force_autovac = true;
  	}
  
+ 	if (CheckPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER))
+ 	{
+ 		/*
+ 		 * The autovacuum launcher wants us to start an autovacuum worker
+ 		 * process.  Let the main loop do it.
+ 		 */
+ 		start_autovac_worker = true;
+ 	}
+ 
  	PG_SETMASK(&UnBlockSig);
  
  	errno = save_errno;
Index: src/backend/storage/ipc/ipci.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/storage/ipc/ipci.c,v
retrieving revision 1.90
diff -c -p -r1.90 ipci.c
*** src/backend/storage/ipc/ipci.c	5 Jan 2007 22:19:37 -0000	1.90
--- src/backend/storage/ipc/ipci.c	26 Jan 2007 17:23:00 -0000
***************
*** 21,26 ****
--- 21,27 ----
  #include "access/twophase.h"
  #include "miscadmin.h"
  #include "pgstat.h"
+ #include "postmaster/autovacuum.h"
  #include "postmaster/bgwriter.h"
  #include "postmaster/postmaster.h"
  #include "storage/freespace.h"
*************** CreateSharedMemoryAndSemaphores(bool mak
*** 109,114 ****
--- 110,116 ----
  		size = add_size(size, SInvalShmemSize());
  		size = add_size(size, FreeSpaceShmemSize());
  		size = add_size(size, BgWriterShmemSize());
+ 		size = add_size(size, AutoVacuumShmemSize());
  		size = add_size(size, BTreeShmemSize());
  #ifdef EXEC_BACKEND
  		size = add_size(size, ShmemBackendArraySize());
*************** CreateSharedMemoryAndSemaphores(bool mak
*** 208,213 ****
--- 210,216 ----
  	 */
  	PMSignalInit();
  	BgWriterShmemInit();
+ 	AutoVacuumShmemInit();
  
  	/*
  	 * Set up other modules that need some shared memory space
Index: src/backend/storage/lmgr/proc.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/storage/lmgr/proc.c,v
retrieving revision 1.183
diff -c -p -r1.183 proc.c
*** src/backend/storage/lmgr/proc.c	16 Jan 2007 13:28:56 -0000	1.183
--- src/backend/storage/lmgr/proc.c	26 Jan 2007 23:08:05 -0000
*************** InitProcess(void)
*** 259,265 ****
  	MyProc->databaseId = InvalidOid;
  	MyProc->roleId = InvalidOid;
  	MyProc->inVacuum = false;
! 	MyProc->isAutovacuum = IsAutoVacuumProcess();
  	MyProc->lwWaiting = false;
  	MyProc->lwExclusive = false;
  	MyProc->lwWaitLink = NULL;
--- 259,265 ----
  	MyProc->databaseId = InvalidOid;
  	MyProc->roleId = InvalidOid;
  	MyProc->inVacuum = false;
! 	MyProc->isAutovacuum = IsAutoVacuumWorkerProcess();
  	MyProc->lwWaiting = false;
  	MyProc->lwExclusive = false;
  	MyProc->lwWaitLink = NULL;
*************** InitDummyProcess(void)
*** 392,398 ****
  	MyProc->databaseId = InvalidOid;
  	MyProc->roleId = InvalidOid;
  	MyProc->inVacuum = false;
! 	MyProc->isAutovacuum = false;
  	MyProc->lwWaiting = false;
  	MyProc->lwExclusive = false;
  	MyProc->lwWaitLink = NULL;
--- 392,398 ----
  	MyProc->databaseId = InvalidOid;
  	MyProc->roleId = InvalidOid;
  	MyProc->inVacuum = false;
! 	MyProc->isAutovacuum = IsAutoVacuumLauncherProcess(); /* is this needed? */
  	MyProc->lwWaiting = false;
  	MyProc->lwExclusive = false;
  	MyProc->lwWaitLink = NULL;
Index: src/backend/utils/init/miscinit.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/utils/init/miscinit.c,v
retrieving revision 1.160
diff -c -p -r1.160 miscinit.c
*** src/backend/utils/init/miscinit.c	5 Jan 2007 22:19:44 -0000	1.160
--- src/backend/utils/init/miscinit.c	26 Jan 2007 23:08:32 -0000
*************** InitializeSessionUserId(const char *role
*** 401,407 ****
  	 *
  	 * We do not enforce them for the autovacuum process either.
  	 */
! 	if (IsUnderPostmaster && !IsAutoVacuumProcess())
  	{
  		/*
  		 * Is role allowed to login at all?
--- 401,407 ----
  	 *
  	 * We do not enforce them for the autovacuum process either.
  	 */
! 	if (IsUnderPostmaster && !IsAutoVacuumWorkerProcess())
  	{
  		/*
  		 * Is role allowed to login at all?
*************** void
*** 462,468 ****
  InitializeSessionUserIdStandalone(void)
  {
  	/* This function should only be called in a single-user backend. */
! 	AssertState(!IsUnderPostmaster || IsAutoVacuumProcess());
  
  	/* call only once */
  	AssertState(!OidIsValid(AuthenticatedUserId));
--- 462,468 ----
  InitializeSessionUserIdStandalone(void)
  {
  	/* This function should only be called in a single-user backend. */
! 	AssertState(!IsUnderPostmaster || IsAutoVacuumWorkerProcess());
  
  	/* call only once */
  	AssertState(!OidIsValid(AuthenticatedUserId));
Index: src/backend/utils/init/postinit.c
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/backend/utils/init/postinit.c,v
retrieving revision 1.173
diff -c -p -r1.173 postinit.c
*** src/backend/utils/init/postinit.c	5 Jan 2007 22:19:44 -0000	1.173
--- src/backend/utils/init/postinit.c	26 Jan 2007 23:05:23 -0000
*************** CheckMyDatabase(const char *name, bool a
*** 135,143 ****
  	 * a way to recover from disabling all access to all databases, for
  	 * example "UPDATE pg_database SET datallowconn = false;".
  	 *
! 	 * We do not enforce them for the autovacuum process either.
  	 */
! 	if (IsUnderPostmaster && !IsAutoVacuumProcess())
  	{
  		/*
  		 * Check that the database is currently allowing connections.
--- 135,143 ----
  	 * a way to recover from disabling all access to all databases, for
  	 * example "UPDATE pg_database SET datallowconn = false;".
  	 *
! 	 * We do not enforce them for the autovacuum worker processes either.
  	 */
! 	if (IsUnderPostmaster && !IsAutoVacuumWorkerProcess())
  	{
  		/*
  		 * Check that the database is currently allowing connections.
*************** bool
*** 288,294 ****
  InitPostgres(const char *dbname, const char *username)
  {
  	bool		bootstrap = IsBootstrapProcessingMode();
! 	bool		autovacuum = IsAutoVacuumProcess();
  	bool		am_superuser;
  	char	   *fullpath;
  
--- 288,294 ----
  InitPostgres(const char *dbname, const char *username)
  {
  	bool		bootstrap = IsBootstrapProcessingMode();
! 	bool		autovacuum = IsAutoVacuumWorkerProcess();
  	bool		am_superuser;
  	char	   *fullpath;
  
Index: src/include/c.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/c.h,v
retrieving revision 1.217
diff -c -p -r1.217 c.h
*** src/include/c.h	25 Jan 2007 03:30:43 -0000	1.217
--- src/include/c.h	26 Jan 2007 16:39:14 -0000
*************** extern int	fdatasync(int fildes);
*** 817,822 ****
--- 817,823 ----
  #define HAVE_STRTOULL 1
  #endif
  
+ #define EXEC_BACKEND
  /* EXEC_BACKEND defines */
  #ifdef EXEC_BACKEND
  #define NON_EXEC_STATIC
Index: src/include/pgstat.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/pgstat.h,v
retrieving revision 1.52
diff -c -p -r1.52 pgstat.h
*** src/include/pgstat.h	5 Jan 2007 22:19:50 -0000	1.52
--- src/include/pgstat.h	26 Jan 2007 16:39:14 -0000
*************** extern PgStat_StatDBEntry *pgstat_fetch_
*** 481,485 ****
--- 481,486 ----
  extern PgStat_StatTabEntry *pgstat_fetch_stat_tabentry(Oid relid);
  extern PgBackendStatus *pgstat_fetch_stat_beentry(int beid);
  extern int	pgstat_fetch_stat_numbackends(void);
+ extern void pgstat_stathash_reset(void);
  
  #endif   /* PGSTAT_H */
Index: src/include/postmaster/autovacuum.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/postmaster/autovacuum.h,v
retrieving revision 1.7
diff -c -p -r1.7 autovacuum.h
*** src/include/postmaster/autovacuum.h	16 Jan 2007 13:28:57 -0000	1.7
--- src/include/postmaster/autovacuum.h	26 Jan 2007 23:07:09 -0000
*************** extern int	autovacuum_vac_cost_limit;
*** 27,42 ****
  
  /* Status inquiry functions */
  extern bool AutoVacuumingActive(void);
! extern bool IsAutoVacuumProcess(void);
  
  /* Functions to start autovacuum process, called from postmaster */
  extern void autovac_init(void);
! extern int	autovac_start(void);
! extern void autovac_stopped(void);
  
  #ifdef EXEC_BACKEND
! extern void AutoVacMain(int argc, char *argv[]);
! extern void AutovacuumIAm(void);
  #endif
  
  #endif   /* AUTOVACUUM_H */
--- 27,51 ----
  
  /* Status inquiry functions */
  extern bool AutoVacuumingActive(void);
! extern bool IsAutoVacuumLauncherProcess(void);
! extern bool IsAutoVacuumWorkerProcess(void);
  
  /* Functions to start autovacuum process, called from postmaster */
  extern void autovac_init(void);
! extern int	StartAutoVacLauncher(void);
! extern int	StartAutoVacWorker(void);
  
  #ifdef EXEC_BACKEND
! extern void AutoVacLauncherMain(int argc, char *argv[]);
! extern void AutoVacWorkerMain(int argc, char *argv[]);
! extern void AutovacuumWorkerIAm(void);
! extern void AutovacuumLauncherIAm(void);
  #endif
  
+ /* shared memory stuff */
+ extern Size AutoVacuumShmemSize(void);
+ extern void AutoVacuumShmemInit(void);
+ 
+ extern MemoryContext AutoVacGetStatsContext(void);
+ 
  #endif   /* AUTOVACUUM_H */
Index: src/include/storage/lwlock.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/storage/lwlock.h,v
retrieving revision 1.33
diff -c -p -r1.33 lwlock.h
*** src/include/storage/lwlock.h	5 Jan 2007 22:19:58 -0000	1.33
--- src/include/storage/lwlock.h	26 Jan 2007 16:39:14 -0000
*************** typedef enum LWLockId
*** 62,67 ****
--- 62,68 ----
  	BtreeVacuumLock,
  	AddinShmemInitLock,
  	FirstBufMappingLock,
+ 	AutovacuumLock,
  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
  
  	/* must be last except for MaxDynamicLWLock: */
Index: src/include/storage/pmsignal.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/storage/pmsignal.h,v
retrieving revision 1.16
diff -c -p -r1.16 pmsignal.h
*** src/include/storage/pmsignal.h	5 Jan 2007 22:19:58 -0000	1.16
--- src/include/storage/pmsignal.h	26 Jan 2007 23:10:59 -0000
*************** typedef enum
*** 26,32 ****
  	PMSIGNAL_WAKEN_CHILDREN,	/* send a SIGUSR1 signal to all backends */
  	PMSIGNAL_WAKEN_ARCHIVER,	/* send a NOTIFY signal to xlog archiver */
  	PMSIGNAL_ROTATE_LOGFILE,	/* send SIGUSR1 to syslogger to rotate logfile */
! 	PMSIGNAL_START_AUTOVAC,		/* start an autovacuum iteration */
  
  	NUM_PMSIGNALS				/* Must be last value of enum! */
  } PMSignalReason;
--- 26,33 ----
  	PMSIGNAL_WAKEN_CHILDREN,	/* send a SIGUSR1 signal to all backends */
  	PMSIGNAL_WAKEN_ARCHIVER,	/* send a NOTIFY signal to xlog archiver */
  	PMSIGNAL_ROTATE_LOGFILE,	/* send SIGUSR1 to syslogger to rotate logfile */
! 	PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */
! 	PMSIGNAL_START_AUTOVAC_WORKER, /* start an autovacuum worker */
  
  	NUM_PMSIGNALS				/* Must be last value of enum! */
  } PMSignalReason;
Index: src/include/storage/proc.h
===================================================================
RCS file: /home/alvherre/cvs/pgsql/src/include/storage/proc.h,v
retrieving revision 1.93
diff -c -p -r1.93 proc.h
*** src/include/storage/proc.h	16 Jan 2007 13:28:57 -0000	1.93
--- src/include/storage/proc.h	26 Jan 2007 16:39:14 -0000
*************** typedef struct PROC_HDR
*** 121,127 ****
   * We set aside some extra PGPROC structures for "dummy" processes,
   * ie things that aren't full-fledged backends but need shmem access.
   */
! #define NUM_DUMMY_PROCS		2
  
  
  /* configurable options */
--- 121,127 ----
   * We set aside some extra PGPROC structures for "dummy" processes,
   * ie things that aren't full-fledged backends but need shmem access.
   */
! #define NUM_DUMMY_PROCS		3
  
  
  /* configurable options */
#2Tom Lane
tgl@sss.pgh.pa.us
In reply to: Alvaro Herrera (#1)
Re: Autovacuum launcher patch

Alvaro Herrera <alvherre@commandprompt.com> writes:

The launcher is a dummy process; it never connects to any database.
... Eventually this will need to
be changed so that the launcher tells the worker exactly what table to
work on.

I detect a certain lack of clarity of thinking here. Either the
launcher can read databases or it can't. Do you intend to solve the
problem of all the transaction/catcache infrastructure being designed
on the assumption of being in exactly one database?

I'd suggest sticking to something closer to the current two-phase design
where you make some preliminary decision which database to send a worker
to, and then the worker determines exactly what to do once it can look
around inside the DB. Possibly we need some back-signaling mechanism
whereby a worker can tell the launcher "hey boss, send help" if it sees
that there are enough tables that need work, but I'm unenthused about
having the launcher figure that out itself.

regards, tom lane

#3Alvaro Herrera
alvherre@commandprompt.com
In reply to: Tom Lane (#2)
Re: Autovacuum launcher patch

Tom Lane wrote:

Alvaro Herrera <alvherre@commandprompt.com> writes:

The launcher is a dummy process; it never connects to any database.
... Eventually this will need to
be changed so that the launcher tells the worker exactly what table to
work on.

I detect a certain lack of clarity of thinking here. Either the
launcher can read databases or it can't. Do you intend to solve the
problem of all the transaction/catcache infrastructure being designed
on the assumption of being in exactly one database?

I had the same thought, but then I realized that most of the needed data
is actually stored in pgstat, so we don't need to connect to any
database to get it. (Hmm, except pg_class.reltuples).

What will probably live in databases will be the scheduling catalogs,
but I think I suggested that we could solve that problem by storing the
contents of those in plain text files, like pg_database.

I don't think this is a fundamental problem with the current patch
though. I've refrained from committing it mostly because I'd like
someone else to eyeball it just for safety, so I'll still allow for
several days to pass (unless there is a rush for getting it in ...)

I'd suggest sticking to something closer to the current two-phase design
where you make some preliminary decision which database to send a worker
to, and then the worker determines exactly what to do once it can look
around inside the DB. Possibly we need some back-signaling mechanism
whereby a worker can tell the launcher "hey boss, send help" if it sees
that there are enough tables that need work, but I'm unenthused about
having the launcher figure that out itself.

Hmm, yeah, we'll probably need some communication channel eventually.

--
Alvaro Herrera http://www.CommandPrompt.com/
The PostgreSQL Company - Command Prompt, Inc.

#4Markus Schiltknecht
markus@bluegap.ch
In reply to: Alvaro Herrera (#3)
Re: [pgsql-patches] Autovacuum launcher patch

Alvaro Herrera wrote:

I'd suggest sticking to something closer to the current two-phase design
where you make some preliminary decision which database to send a worker
to, and then the worker determines exactly what to do once it can look
around inside the DB. Possibly we need some back-signaling mechanism
whereby a worker can tell the launcher "hey boss, send help" if it sees
that there are enough tables that need work, but I'm unenthused about
having the launcher figure that out itself.

Hmm, yeah, we'll probably need some communication channel eventually.

Maybe my IMessages code could be of use?

It's still awfully slow compared with UNIX pipes or even System V IPC
message queues, since it uses LWLocks for sending and retrieving
messages. That could certainly be optimized, maybe even towards a
lock-free implementation, which could theoretically be as fast as System
V IPC messages. OTOH, such stuff is hard to get right.

Regards

Markus

#5Alvaro Herrera
alvherre@commandprompt.com
In reply to: Markus Schiltknecht (#4)
Re: [pgsql-patches] Autovacuum launcher patch

Markus Schiltknecht wrote:

Alvaro Herrera wrote:

I'd suggest sticking to something closer to the current two-phase design
where you make some preliminary decision which database to send a worker
to, and then the worker determines exactly what to do once it can look
around inside the DB. Possibly we need some back-signaling mechanism
whereby a worker can tell the launcher "hey boss, send help" if it sees
that there are enough tables that need work, but I'm unenthused about
having the launcher figure that out itself.

Hmm, yeah, we'll probably need some communication channel eventually.

Maybe my IMessages code could be of use?

It's still awfully slow compared with UNIX pipes or even System V IPC
message queues, since it uses LWLocks for sending and retrieving
messages. That could certainly be optimized, maybe even towards a
lock-free implementation, which could theoretically be as fast as System
V IPC messages. OTOH, such stuff is hard to get right.

Hmm, I remember eyeballing that code. Would you mind sending me a URL
to that file, or something? Or maybe send me the files themselves?

--
Alvaro Herrera http://www.CommandPrompt.com/
The PostgreSQL Company - Command Prompt, Inc.

#6Markus Schiltknecht
markus@bluegap.ch
In reply to: Alvaro Herrera (#5)
1 attachment(s)
Re: Autovacuum launcher patch

Alvaro Herrera wrote:

Hmm, I remember eyeballing that code. Would you mind sending me a URL
to that file, or something? Or maybe send me the files themselves?

Sure, here's a patch against current CVS. Please remove all the
functions referencing "buffer" and "buffer.h" to compile.

Remember that it's a work in progress thing. It has flaws. One issue
that currently bugs me is that processes can deadlock if they keep
trying to create a message (IMessageCreate), but fail because the queue
is full of messages for themselves. A process should thus always try to
fetch messages (via IMessageCheck) and remove pending ones before
retrying to send one. That's not always practical.

One design limitation is that you have to know how large your message
is as soon as you reserve (shared) memory for it, but that's intended.

At least I've stress tested the wrap-around code and it seems to work.
No guarantees, though ;-)

Regards

Markus

Attachments:

curr_imessages.patchtext/x-diff; charset=iso-8859-1; name=curr_imessages.patchDownload
# 
# old_revision [9a68fa59cb0ca3246f03880664062abb98f1a61a]
# 
# add_file "src/backend/storage/ipc/imsg.c"
#  content [3e84c6372a47612a2fe233fee6b122808135580e]
# 
# add_file "src/include/storage/imsg.h"
#  content [3cf37b12a00b90f65b8393fc5e27c98d772dc22b]
# 
# patch "src/backend/storage/ipc/Makefile"
#  from [71276ab6483aebbb27f87c988d77ab876611f190]
#    to [9a99101d3e8bbfe52c97763db536804e94371828]
# 
# patch "src/backend/storage/ipc/ipci.c"
#  from [177f266b4668190a6ab1f2902305f7b7e577ef8d]
#    to [1971e2122ba4455c8b9784e70059d917fdf4f4c8]
# 
============================================================
--- src/backend/storage/ipc/imsg.c	3e84c6372a47612a2fe233fee6b122808135580e
+++ src/backend/storage/ipc/imsg.c	3e84c6372a47612a2fe233fee6b122808135580e
@@ -0,0 +1,375 @@
+/*-------------------------------------------------------------------------
+ *
+ * imsg.c
+ *    internal messages from process to process sent via shared memory.
+ *
+ *
+ * Copyright (c) 2006, Markus Schiltknecht <markus@bluegap.ch>
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include <unistd.h>
+#include <signal.h>
+#include <string.h>
+
+#ifdef HAVE_SYS_FILIO_H
+#include <sys/filio.h>
+#endif
+
+#include <sys/ioctl.h>
+
+#include "postgres.h"
+#include "miscadmin.h"
+#include "storage/proc.h"
+#include "storage/imsg.h"
+#include "storage/ipc.h"
+#include "storage/buffer.h"
+#include "storage/spin.h"
+#include "utils/elog.h"
+
+/* global variable pointing to the shmem area */
+IMessageCtlData *IMessageCtl = NULL;
+
+/*
+ * Initialization of shared memory for internal messages.
+ */
+int
+IMessageShmemSize(void)
+{
+	/* control structure and message queue live in one fixed-size area */
+	int			shmem_bytes = MAXALIGN(IMessageBufferSize);
+
+	return shmem_bytes;
+}
+
+void
+IMessageShmemInit(void)
+{
+	bool		already_set_up;
+	IMessage   *first_slot;
+
+#ifdef IMSG_DEBUG
+	elog(DEBUG3, "IMessageShmemInit(): initializing shared memory");
+#endif
+
+	IMessageCtl = (IMessageCtlData *)
+		ShmemInitStruct("IMsgCtl", MAXALIGN(IMessageBufferSize), &already_set_up);
+
+	if (!already_set_up)
+	{
+		/* fresh area: zero the control structure and the whole queue */
+		memset(IMessageCtl, 0, MAXALIGN(IMessageBufferSize));
+
+		/* an empty queue has start and end on the first message slot */
+		first_slot = (IMessage*) IMSG_BUFFER_START(IMessageCtl);
+		IMessageCtl->queue_start = first_slot;
+		IMessageCtl->queue_end = first_slot;
+
+		SpinLockInit(&IMessageCtl->msgs_lck);
+	}
+}
+
+/*
+ *   IMessageCreate
+ *
+ * Reserves room for a new message in the shared queue and returns the
+ * message header of the newly created message.  The message starts out
+ * deactivated (sender == 0), so IMessageCheck() does not hand it to the
+ * recipient until the caller has called IMessageActivate().
+ *
+ *   recipient: pid of the process the message is addressed to
+ *   msg_size:  payload size in bytes, not including the IMessage header
+ *
+ * If the queue cannot hold the message, even after reclaiming removed
+ * messages and trying to wrap around to the buffer start, an ERROR is
+ * raised once the spinlock has been released.
+ */
+IMessage*
+IMessageCreate(int recipient, int msg_size)
+{
+	IMessage	   *msg;
+	int				remaining_space;
+
+#ifdef IMSG_DEBUG
+	elog(DEBUG3, "IMessageCreate(): recipient: %d, size: %d",
+		recipient, msg_size);
+#endif
+
+	/* assert a reasonable maximum message size */
+	Assert(msg_size < (MAXALIGN(IMessageBufferSize) / 4));
+
+	START_CRIT_SECTION();
+	{
+		/* use volatile pointer to prevent code rearrangement */
+		volatile IMessageCtlData *imsgctl = IMessageCtl;
+
+		SpinLockAcquire(&imsgctl->msgs_lck);
+
+		/*
+		 * Check if there is enough space for the message plus the
+		 * terminating header
+		 */
+		if (imsgctl->queue_end < imsgctl->queue_start)
+			remaining_space = (int) imsgctl->queue_start -
+							  (int) imsgctl->queue_end;
+		else
+			remaining_space = (int) IMSG_BUFFER_END(imsgctl) -
+							  (int) imsgctl->queue_end;
+
+		if (remaining_space < (MAXALIGN(IMessageBufferSize) / 8))
+		{
+#ifdef IMSG_DEBUG
+			elog(DEBUG3, "IMessageCreate(): cleanup starting");
+#endif
+
+			/*
+			 * Clean up messages that have been removed: advance queue_start
+			 * over every leading entry whose recipient is zero.
+			 */
+			while (imsgctl->queue_start->recipient == 0)
+			{
+				if (imsgctl->queue_start > imsgctl->queue_end)
+				{
+					/*
+					 * The queue is wrapped; an all-zero header marks the
+					 * end of the upper part, so continue at the buffer start.
+					 */
+					if ((imsgctl->queue_start->sender == 0) &&
+						(imsgctl->queue_start->recipient == 0))
+					{
+#ifdef IMSG_DEBUG
+			elog(DEBUG3, "IMessageCreate(): cleanup wrapped");
+#endif
+						imsgctl->queue_start = (IMessage*) IMSG_BUFFER_START(imsgctl);
+						continue;
+					}
+				}
+				else if (imsgctl->queue_start >= imsgctl->queue_end)
+					break;
+
+				imsgctl->queue_start = (IMessage*) (
+					(int) imsgctl->queue_start +
+					IMSG_ALIGN(imsgctl->queue_start->size +
+							   sizeof(IMessage)));
+			}
+
+			/* recalc remaining space */
+			if (imsgctl->queue_end < imsgctl->queue_start)
+				remaining_space = (int) imsgctl->queue_start -
+								  (int) imsgctl->queue_end;
+			else
+				remaining_space = (int) IMSG_BUFFER_END(imsgctl) -
+								  (int) imsgctl->queue_end;
+
+		}
+
+		/* room is needed for this message plus the terminating header */
+		if (IMSG_ALIGN(msg_size + 2 * sizeof(IMessage)) < remaining_space)
+		{
+			msg = (IMessage*) imsgctl->queue_end;
+			imsgctl->queue_end = (IMessage*) ((int) imsgctl->queue_end + 
+								 IMSG_ALIGN(msg_size + sizeof(IMessage)));
+		}
+		else
+		{
+			remaining_space = (int) imsgctl->queue_start -
+							  (int) IMSG_BUFFER_START(imsgctl);
+#ifdef IMSG_DEBUG
+			elog(DEBUG5, "IMessageCreate:    remaining wrap space: %d",
+				 remaining_space);
+#endif
+
+			/* There is not enough space. But maybe we can wrap around? */
+			if ((imsgctl->queue_end >= imsgctl->queue_start) &&
+				((int) IMSG_BUFFER_START(imsgctl) +
+				IMSG_ALIGN(msg_size + 2 * sizeof(IMessage)) <
+				(int) imsgctl->queue_start))
+			{
+				/* Yes, wrap around */
+#ifdef IMSG_DEBUG
+				elog(DEBUG5, "IMessageCreate: wrapped around.");
+#endif
+				msg = (IMessage*) IMSG_BUFFER_START(imsgctl);
+				imsgctl->queue_end = (IMessage*) ((int) msg +
+									IMSG_ALIGN(msg_size + sizeof(IMessage)));
+			}
+			else
+			{
+				/*
+				 * Out of space.  Only remember the failure here; it is
+				 * reported below, after the spinlock has been released and
+				 * the critical section has been left.
+				 */
+				msg = NULL;
+			}
+		}
+
+		if (msg != NULL)
+		{
+			/* initialize the message as inactive */
+			msg->sender = 0;
+			msg->recipient = recipient;
+			msg->size = msg_size;
+
+			/* clean the following block, it terminates the queue */
+			imsgctl->queue_end->sender = 0;
+			imsgctl->queue_end->recipient = 0;
+		}
+
+		/* queue editing finished */
+		SpinLockRelease(&imsgctl->msgs_lck);
+
+#ifdef IMSG_DEBUG
+	if (msg != NULL)
+		elog(DEBUG3, "IMessageCreate(): created at %08X size: %d (next: %08X)",
+			 (int) msg, msg->size, (unsigned int) imsgctl->queue_end);
+#endif
+	}
+	END_CRIT_SECTION();
+
+	if (msg == NULL)
+	{
+		/*
+		 * TODO: correct error handling here...
+		 *
+		 * The error is raised only here because elog(ERROR) does not return
+		 * to its caller: raising it while msgs_lck was held would leave the
+		 * spinlock locked.
+		 */
+		elog(ERROR, "Not enough space within IMessages buffer.");
+		return NULL;			/* keep compiler quiet */
+	}
+
+	return msg;
+}
+
+void
+IMessageForward(IMessage *msg, int new_recipient)
+{
+	msg->recipient = new_recipient;	/* re-address the existing message in place */
+	msg->sender = 0;				/* sender == 0 marks it as not activated */
+
+	IMessageActivate(msg);			/* stamps our pid as sender, signals new_recipient */
+}
+
+void
+IMessageActivate(IMessage *msg)
+{
+	msg->sender = MyProcPid;	/* non-zero sender is what IMessageCheck() looks for */
+
+	/* TODO: use PGPROC to determine if the recipient wants to be signaled,
+	 *       probably we can save that signaling step in certain occasions.
+	 */
+
+	/*
+	 * send a signal to the recipient
+	 *
+	 * NOTE(review): msg->recipient is used directly as the pid for kill()
+	 * and the result is ignored.  A recipient of 0 (the value stored by
+	 * IMessageRemove()) would presumably signal the whole process group --
+	 * confirm callers never activate a removed message.
+	 */
+	kill(msg->recipient, SIGUSR1);
+}
+
+/*
+ *   IMessageRemove
+ *
+ * Marks a message as removable by setting the recipient to zero. The
+ * message itself stays in the queue; its space is reclaimed later by the
+ * cleanup loop that runs during creation of new messages, see
+ * IMessageCreate().
+ *
+ * The field is written without taking msgs_lck.
+ */
+void
+IMessageRemove(IMessage *msg)
+{
+	msg->recipient = 0;
+}
+
+/*
+ *   IMessageCheck
+ *
+ * Checks if there is a message in the queue for this process. Returns NULL
+ * if there is no message for this process, the message header otherwise. The
+ * message remains in the queue and should be removed by IMessageRemove().
+ *
+ * The queue is scanned from queue_start while holding msgs_lck; the first
+ * entry that is activated (sender != 0) and addressed to MyProcPid is
+ * returned. Since a returned message is not marked in any way, calling this
+ * again before removing it finds the same message again.
+ */
+IMessage*
+IMessageCheck(void)
+{
+	IMessage	   *msg,
+				   *res;
+
+	res = NULL;
+	START_CRIT_SECTION();
+	{
+		/* use volatile pointer to prevent code rearrangement */
+		volatile IMessageCtlData *imsgctl = IMessageCtl;
+
+		SpinLockAcquire(&imsgctl->msgs_lck);
+
+		/* Loop through the queue from the start. Wrapping might be
+		 * required: while queue_start lies behind queue_end in memory and
+		 * we are still in the upper part, an all-zero header (sender and
+		 * recipient both 0) is taken as the end of that part and the scan
+		 * continues at the buffer start. In every other case the scan ends
+		 * as soon as msg reaches queue_end.
+		 *
+		 * NOTE(review): a message created and removed without ever being
+		 * activated also has sender == 0 and recipient == 0 and would look
+		 * like that end marker -- confirm this cannot happen. */
+		msg = imsgctl->queue_start;
+		while (1)
+		{
+			if (((int) msg >= (int) imsgctl->queue_start) &&
+				((int) imsgctl->queue_start > (int) imsgctl->queue_end))
+			{
+				if ((msg->sender == 0) &&
+					(msg->recipient == 0))
+				{
+					msg = (IMessage*) IMSG_BUFFER_START(imsgctl);
+					continue;
+				}
+			}
+			else if (msg >= imsgctl->queue_end)
+				break;		/* reached the end of the queue, nothing found */
+
+			if ((msg->sender != 0) && (msg->recipient == MyProcPid))
+			{
+				res = msg;
+				break;
+			}
+ 
+			msg = (IMessage*) ((int) msg +		/* step over header plus aligned payload */
+					IMSG_ALIGN(msg->size + sizeof(IMessage)));
+		}
+
+		SpinLockRelease(&imsgctl->msgs_lck);
+	}
+	END_CRIT_SECTION();
+
+#ifdef IMSG_DEBUG
+	if (res == NULL)
+		elog(DEBUG3, "IMessageCheck(): no new message for %d.", MyProcPid);
+	else
+		elog(DEBUG3, "IMessageCheck(): new message of size %d for %d.",
+				msg->size, MyProcPid);	/* msg == res on this path */
+#endif
+
+	return res;
+}
+
+/*
+ *   IMessageAwait
+ *
+ * Blocks until IMessageCheck() finds a message for this process and returns
+ * it. The message is left in the queue.
+ */
+IMessage*
+IMessageAwait(void)
+{
+	IMessage	   *found;
+	struct timeval	timeout;
+
+	while ((found = IMessageCheck()) == NULL)
+	{
+		/*
+		 * TODO: we want to wait for signals here. Check if select() is
+		 * appropriate. Maybe pause() is better, but how about portability?
+		 * However, make sure we have a timeout here, since we could
+		 * probably miss a signal.
+		 */
+		timeout.tv_sec = 2;
+		timeout.tv_usec = 0;
+		select(1, NULL, NULL, NULL, &timeout);
+	}
+
+	return found;
+}
+
+/*
+ *   IMessageGetReadBuffer
+ *
+ * Allocates a buffer descriptor for reading the payload of the given
+ * message. The descriptor should be released with IMessageFreeBuffer().
+ */
+buffer *
+IMessageGetReadBuffer(IMessage *msg)
+{
+	buffer	   *rbuf;
+
+	rbuf = palloc(sizeof(buffer));
+
+	Assert(msg);
+	Assert(msg->size > 0);
+
+	/* point the descriptor at the payload behind the message header */
+	init_buffer(rbuf, IMSG_DATA(msg), msg->size, NULL, NULL, NULL);
+
+	/* for reading, the fill size is the full message size */
+	rbuf->fill_size = msg->size;
+
+	return rbuf;
+}
+
+/*
+ *   IMessageGetWriteBuffer
+ *
+ * Allocates a buffer descriptor for writing the payload of the given
+ * message. The descriptor should be released with IMessageFreeBuffer().
+ */
+buffer *
+IMessageGetWriteBuffer(IMessage *msg)
+{
+	buffer	   *wbuf;
+
+	wbuf = palloc(sizeof(buffer));
+
+	/* point the descriptor at the payload behind the message header */
+	init_buffer(wbuf, IMSG_DATA(msg), msg->size, NULL, NULL, NULL);
+
+	return wbuf;
+}
+
+void
+IMessageFreeBuffer(buffer *b)
+{
+	pfree(b);	/* frees only the descriptor palloc'd by IMessageGet*Buffer() */
+}
============================================================
--- src/include/storage/imsg.h	3cf37b12a00b90f65b8393fc5e27c98d772dc22b
+++ src/include/storage/imsg.h	3cf37b12a00b90f65b8393fc5e27c98d772dc22b
@@ -0,0 +1,85 @@
+/*-------------------------------------------------------------------------
+ *
+ * imsg.h
+ *    internal messages from process to process sent via shared memory.
+ *
+ *
+ * Copyright (c) 2006, Markus Schiltknecht <markus@bluegap.ch>
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef IMSG_H
+#define IMSG_H
+
+#include <sys/types.h>
+#include "storage/spin.h"
+#include "storage/buffer.h"
+
+/* TODO: replace with GUC variable to be configurable */
+#define IMessageBufferSize 8388608		/* 8 MB */
+
+/*
+ * alignment for messages (8 bytes)
+ *
+ * NOTE(review): the 32-bit mask here and the (int) casts of addresses below
+ * look like they assume that a pointer fits into an int -- confirm before
+ * relying on this on platforms with 64-bit pointers.
+ */
+#define IMSG_ALIGN(size) (((size) + 7) & 0xFFFFFFF8)
+
+/* for convenience to buffer access: first and one-past-last queue address */
+#define IMSG_BUFFER_START(imsgctl) ((int) \
+			(IMSG_ALIGN((int) (imsgctl) + sizeof(IMessageCtlData))))
+
+#define IMSG_BUFFER_END(imsgctl) ((int) \
+			(IMSG_ALIGN((int) (imsgctl) + MAXALIGN(IMessageBufferSize))))
+
+/* get a data pointer from the header */
+#define IMSG_DATA(imsg) ((void*) ((int) (imsg) + sizeof(IMessage)))
+
+/*
+ * Message descriptor in front of the message
+ */
+typedef struct
+{
+	/* pid of the sender, zero means not yet activated message */
+	pid_t		sender;
+
+	/* pid of the recipient, zero meaning the message has been removed */
+	pid_t		recipient;
+
+	/* message size following, but not including this header */
+	int			size;
+} IMessage;
+
+/*
+ * shared-memory pool for internal messages.
+ */
+typedef struct
+{
+	/*
+	 * currently active messages
+	 *
+	 * NOTE(review): no function in imsg.c of this patch updates this
+	 * counter after it is zeroed at initialization -- confirm it is needed.
+	 */
+	unsigned int		count_messages;
+
+	/* start of messages within the cycling queue */
+	IMessage		   *queue_start;
+
+	/* next free place, just after the last message */
+	IMessage		   *queue_end;
+
+	/* lock for editing the message queue */
+	slock_t				msgs_lck;
+} IMessageCtlData;
+
+/* the global variable storing pointer to the shared memory area (see imsg.c) */
+extern IMessageCtlData *IMessageCtl;
+
+/* routines to send and receive internal messages */
+extern int IMessageShmemSize(void);
+extern void IMessageShmemInit(void);
+extern IMessage* IMessageCreate(int recipient, int msg_size);
+extern void IMessageForward(IMessage *msg, int new_recipient);
+extern void IMessageActivate(IMessage *msg);
+extern void IMessageRemove(IMessage *msg);
+extern IMessage* IMessageCheck(void);
+extern IMessage* IMessageAwait(void);
+
+extern buffer *IMessageGetReadBuffer(IMessage *msg);
+extern buffer *IMessageGetWriteBuffer(IMessage *msg);
+extern void IMessageFreeBuffer(buffer *b);
+
+#endif   /* IMSG_H */
============================================================
--- src/backend/storage/ipc/Makefile	71276ab6483aebbb27f87c988d77ab876611f190
+++ src/backend/storage/ipc/Makefile	9a99101d3e8bbfe52c97763db536804e94371828
@@ -16,7 +16,7 @@ OBJS = ipc.o ipci.o pmsignal.o procarray
 endif
 
 OBJS = ipc.o ipci.o pmsignal.o procarray.o shmem.o shmqueue.o \
-	sinval.o sinvaladt.o
+	sinval.o sinvaladt.o imsg.o buffer.o
 
 all: SUBSYS.o
 
============================================================
--- src/backend/storage/ipc/ipci.c	177f266b4668190a6ab1f2902305f7b7e577ef8d
+++ src/backend/storage/ipc/ipci.c	1971e2122ba4455c8b9784e70059d917fdf4f4c8
@@ -24,6 +24,7 @@
 #include "postmaster/bgwriter.h"
 #include "postmaster/postmaster.h"
 #include "storage/freespace.h"
+#include "storage/imsg.h"
 #include "storage/ipc.h"
 #include "storage/pg_shmem.h"
 #include "storage/pmsignal.h"
@@ -110,6 +111,7 @@ CreateSharedMemoryAndSemaphores(bool mak
 		size = add_size(size, FreeSpaceShmemSize());
 		size = add_size(size, BgWriterShmemSize());
 		size = add_size(size, BTreeShmemSize());
+		size = add_size(size, IMessageShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -178,6 +180,7 @@ CreateSharedMemoryAndSemaphores(bool mak
 	SUBTRANSShmemInit();
 	TwoPhaseShmemInit();
 	MultiXactShmemInit();
+	IMessageShmemInit();
 	InitBufferPool();
 
 	/*