bg worker: patch 5 of 6 - splitting
Started by Markus Wanner over 15 years ago · 1 message
Splits the coordinator and background worker infrastructure code out of
autovacuum.[ch] into separate source and header files, namely the new
coordinator.c and coordinator.h. Again, hopefully no functional change.
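For orientation, here is a rough sketch of the kind of interface the new
postmaster/coordinator.h ends up providing, reconstructed from the symbols the
rest of the patch references (StartCoordinator, StartBackgroundWorker,
IsCoordinatorProcess, get_co_database, dispatch_job, the *_background_workers
GUCs, ...). The attached diff is authoritative; the exact contents and the
split of declarations between coordinator.h and autovacuum.h may differ:

/*
 * postmaster/coordinator.h -- sketch only, not the patch's actual header.
 *
 * Reconstructed from symbols referenced elsewhere in this patch; header
 * guard, includes and the coordinator.h vs. autovacuum.h split are guesses.
 */
#ifndef COORDINATOR_H
#define COORDINATOR_H

#include "storage/backendid.h"
#include "storage/imsg.h"			/* IMessage, from earlier patches in this series */

/* GUCs moved out of autovacuum.c */
extern int	max_background_workers;
extern int	min_spare_background_workers;
extern int	max_spare_background_workers;

/* process startup, called from the postmaster */
extern int	StartCoordinator(void);
extern int	StartBackgroundWorker(void);
#ifdef EXEC_BACKEND
extern void CoordinatorIAm(void);
extern void BackgroundWorkerIAm(void);
#endif

/* process identification, used e.g. by pgstat.c and varsup.c */
extern bool IsCoordinatorProcess(void);
extern BackendId GetCoordinatorId(void);

/* per-database job dispatching; callers hold CoordinatorDatabasesLock */
typedef struct co_database co_database;

extern co_database *get_co_database(Oid dboid);
extern void dispatch_job(IMessage *msg, co_database *codb);

/* background worker job lifecycle */
extern void bgworker_reset(void);
extern void bgworker_job_failed(int errcode);

#endif   /* COORDINATOR_H */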
Attachments:
step5-splitted.diff (text/x-diff; charset=iso-8859-1)
*** src/backend/access/transam/varsup.c 5e3252795acfc8e04994336e4b432bdd0bc13238
--- src/backend/access/transam/varsup.c 561fdf418ae6a9cbd546d15cc529eaa6743fb2b3
***************
*** 19,24 ****
--- 19,26 ----
#include "commands/dbcommands.h"
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
+ #include "postmaster/coordinator.h"
+ #include "storage/imsg.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "utils/builtins.h"
============================================================
*** src/backend/commands/vacuum.c 5cad7d65313d1dd67a70016906a2ee6d88043022
--- src/backend/commands/vacuum.c c9c1f73b74e94457609806d447ea8f32ca45dfaa
***************
*** 33,38 ****
--- 33,39 ----
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
+ #include "postmaster/coordinator.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
============================================================
*** src/backend/postmaster/postmaster.c 7c8ba1424aa374a2583c65fd912f563e94ab0a73
--- src/backend/postmaster/postmaster.c daf15ef4bd10eb88bd547d14eb69fb54264935fa
***************
*** 104,109 ****
--- 104,110 ----
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
+ #include "postmaster/coordinator.h"
#include "postmaster/fork_process.h"
#include "postmaster/pgarch.h"
#include "postmaster/postmaster.h"
============================================================
*** src/backend/storage/ipc/ipc.c af20d2d3315ef31aa41441a3e77b8b975ee1a890
--- src/backend/storage/ipc/ipc.c 08ec591d004b63b5bd4e49d3e1476ab839fa10ae
***************
*** 25,31 ****
#include "miscadmin.h"
#ifdef PROFILE_PID_DIR
! #include "postmaster/autovacuum.h"
#endif
#include "storage/ipc.h"
#include "tcop/tcopprot.h"
--- 25,31 ----
#include "miscadmin.h"
#ifdef PROFILE_PID_DIR
! #include "postmaster/coordinator.h"
#endif
#include "storage/ipc.h"
#include "tcop/tcopprot.h"
============================================================
*** src/backend/storage/ipc/ipci.c 7e32569fd32941d7c26800f3077b0ff94e210f9e
--- src/backend/storage/ipc/ipci.c b63e952dcfaf0e502a17a5b9ed833d24420b4be4
***************
*** 23,30 ****
#include "commands/async.h"
#include "miscadmin.h"
#include "pgstat.h"
- #include "postmaster/autovacuum.h"
#include "postmaster/bgwriter.h"
#include "postmaster/postmaster.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
--- 23,30 ----
#include "commands/async.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgwriter.h"
+ #include "postmaster/coordinator.h"
#include "postmaster/postmaster.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
============================================================
*** src/backend/storage/lmgr/proc.c 227fffe14ecf0833f313fc7eba58a45e968210fe
--- src/backend/storage/lmgr/proc.c fa15621f3050e53eda5a0f23a4d2f9dc318b967b
***************
*** 38,44 ****
#include "access/transam.h"
#include "access/xact.h"
#include "miscadmin.h"
! #include "postmaster/autovacuum.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
--- 38,44 ----
#include "access/transam.h"
#include "access/xact.h"
#include "miscadmin.h"
! #include "postmaster/coordinator.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
============================================================
*** src/backend/tcop/postgres.c bbc0e7db5e6aae98d7077b6321089f4262547ca9
--- src/backend/tcop/postgres.c 713974257158c361ec0523636499eb0d6978e975
***************
*** 54,60 ****
#include "pg_trace.h"
#include "parser/analyze.h"
#include "parser/parser.h"
! #include "postmaster/autovacuum.h"
#include "postmaster/postmaster.h"
#include "replication/walsender.h"
#include "rewrite/rewriteHandler.h"
--- 54,60 ----
#include "pg_trace.h"
#include "parser/analyze.h"
#include "parser/parser.h"
! #include "postmaster/coordinator.h"
#include "postmaster/postmaster.h"
#include "replication/walsender.h"
#include "rewrite/rewriteHandler.h"
============================================================
*** src/backend/utils/init/miscinit.c d3022f8ecf3b3b7084e064ac2661d1ff7ec1c560
--- src/backend/utils/init/miscinit.c 36a9660601820685485763e3a24c7a7a5a266279
***************
*** 32,38 ****
#include "catalog/pg_authid.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
! #include "postmaster/autovacuum.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pg_shmem.h"
--- 32,38 ----
#include "catalog/pg_authid.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
! #include "postmaster/coordinator.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pg_shmem.h"
============================================================
*** src/backend/utils/init/postinit.c 7516795aa54c52d299b38a133329aa1ac8449a10
--- src/backend/utils/init/postinit.c 2311940745481586653cfe0f6e63ded907d079ae
***************
*** 34,40 ****
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
! #include "postmaster/autovacuum.h"
#include "postmaster/postmaster.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
--- 34,40 ----
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
! #include "postmaster/coordinator.h"
#include "postmaster/postmaster.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
============================================================
*** src/backend/postmaster/Makefile d1d789f751266c3718eeea6562fd89269f9bb9c0
--- src/backend/postmaster/Makefile 71cebd6ba9affa26f68e0323815ded2d848dc9c1
*************** include $(top_builddir)/src/Makefile.glo
*** 12,18 ****
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
! OBJS = autovacuum.o bgwriter.o fork_process.o pgarch.o pgstat.o postmaster.o \
! syslogger.o walwriter.o
include $(top_srcdir)/src/backend/common.mk
--- 12,18 ----
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
! OBJS = autovacuum.o coordinator.o bgwriter.o fork_process.o pgarch.o pgstat.o \
! postmaster.o syslogger.o walwriter.o
include $(top_srcdir)/src/backend/common.mk
============================================================
*** src/backend/commands/analyze.c 618166236ea3b1f37c763478c0000bf14654717c
--- src/backend/commands/analyze.c ca6c52f63e546ac4c0ca8cdb16e52ed87caa93d4
***************
*** 35,40 ****
--- 35,41 ----
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
+ #include "postmaster/coordinator.h"
#include "storage/bufmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
============================================================
*** src/backend/utils/misc/guc.c 194be259610536302033f04f41fcd59e83620e77
--- src/backend/utils/misc/guc.c 3be36ece10fafd251f2a75df97502d8fdde15b5f
***************
*** 52,57 ****
--- 52,58 ----
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "postmaster/bgwriter.h"
+ #include "postmaster/coordinator.h"
#include "postmaster/postmaster.h"
#include "postmaster/syslogger.h"
#include "postmaster/walwriter.h"
============================================================
*** src/backend/postmaster/pgstat.c 6f82a2553021742674644dccbb7072db58aa3711
--- src/backend/postmaster/pgstat.c 4db8ac340bca15d58b0684a59b043f053b3a2d86
***************
*** 49,55 ****
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pg_trace.h"
! #include "postmaster/autovacuum.h"
#include "postmaster/fork_process.h"
#include "postmaster/postmaster.h"
#include "storage/backendid.h"
--- 49,55 ----
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pg_trace.h"
! #include "postmaster/coordinator.h"
#include "postmaster/fork_process.h"
#include "postmaster/postmaster.h"
#include "storage/backendid.h"
*************** backend_read_statsfile(void)
*** 3699,3705 ****
if (count >= PGSTAT_POLL_LOOP_COUNT)
elog(WARNING, "pgstat wait timeout");
! /* Autovacuum launcher wants stats about all databases */
if (IsCoordinatorProcess())
pgStatDBHash = pgstat_read_statsfile(InvalidOid, false);
else
--- 3699,3705 ----
if (count >= PGSTAT_POLL_LOOP_COUNT)
elog(WARNING, "pgstat wait timeout");
! /* Coordinator wants stats about all databases */
if (IsCoordinatorProcess())
pgStatDBHash = pgstat_read_statsfile(InvalidOid, false);
else
============================================================
*** src/backend/commands/vacuumlazy.c ee705c606001471649559bc45d1676b7c943c3a2
--- src/backend/commands/vacuumlazy.c b07c6b71ca110d5302c58219c92398f1769bfa3d
***************
*** 47,52 ****
--- 47,53 ----
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
+ #include "postmaster/coordinator.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
============================================================
*** src/backend/postmaster/autovacuum.c 9f0dd25e08c15c2cf3989e24996554fa94c99f08
--- src/backend/postmaster/autovacuum.c c172d5501939734e131661bbf9264522ceafa467
***************
*** 1,47 ****
/*-------------------------------------------------------------------------
*
* autovacuum.c
*
! * PostgreSQL Integrated Autovacuum Daemon
*
! * The autovacuum system is structured in two different kinds of processes: the
! * autovacuum launcher and the autovacuum worker. The launcher is an
! * always-running process, started by the postmaster when the autovacuum GUC
! * parameter is set. The launcher schedules autovacuum workers to be started
! * when appropriate. The workers are the processes which execute the actual
! * vacuuming; they connect to a database as determined in the launcher, and
! * once connected they examine the catalogs to select the tables to vacuum.
*
! * The autovacuum launcher cannot start the worker processes by itself,
! * because doing so would cause robustness issues (namely, failure to shut
! * them down on exceptional conditions, and also, since the launcher is
! * connected to shared memory and is thus subject to corruption there, it is
! * not as robust as the postmaster). So it leaves that task to the postmaster.
*
- * There is an autovacuum shared memory area, where the launcher stores
- * information about the database it wants vacuumed. When it wants a new
- * worker to start, it sets a flag in shared memory and sends a signal to the
- * postmaster. Then postmaster knows nothing more than it must start a worker;
- * so it forks a new child, which turns into a worker. This new process
- * connects to shared memory, and there it can inspect the information that the
- * launcher has set up.
- *
- * If the fork() call fails in the postmaster, it sets a flag in the shared
- * memory area, and sends a signal to the launcher. The launcher, upon
- * noticing the flag, can try starting the worker again by resending the
- * signal. Note that the failure can only be transient (fork failure due to
- * high load, memory pressure, too many processes, etc); more permanent
- * problems, like failure to connect to a database, are detected later in the
- * worker and dealt with just by having the worker exit normally. The launcher
- * will launch a new worker again later, per schedule.
- *
- * When the worker is done vacuuming it sends SIGUSR2 to the launcher. The
- * launcher then wakes up and is able to launch another worker, if the schedule
- * is so tight that a new worker is needed immediately. At this time the
- * launcher can also balance the settings for the various remaining workers'
- * cost-based vacuum delay feature.
- *
- * Note that there can be more than one worker in a database concurrently.
* They will store the table they are currently vacuuming in shared memory, so
* that other workers avoid being blocked waiting for the vacuum lock for that
* table. They will also reload the pgstats data just before vacuuming each
--- 1,20 ----
/*-------------------------------------------------------------------------
*
* autovacuum.c
*
! * PostgreSQL Integrated Autovacuum
*
! * The autovacuum system now is a user of the generalized background worker
! * system. When the autovacuum GUC parameter is set, the coordinator begins
! * to schedule autovacuum jobs for background workers when appropriate. The
! * workers then execute the actual vacuuming; once they receive their
! * job, they examine the catalogs to select the table(s) to vacuum.
*
! * Whenever a worker is done vacuuming, the coordinator wakes up and rebalances
! * the settings for the various remaining workers' cost-based vacuum delay
! * feature. Note that there can well be more than one worker in a database
! * performing vacuum concurrently.
*
* They will store the table they are currently vacuuming in shared memory, so
* that other workers avoid being blocked waiting for the vacuum lock for that
* table. They will also reload the pgstats data just before vacuuming each
***************
*** 80,97 ****
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "postmaster/fork_process.h"
#include "postmaster/postmaster.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
- #include "storage/procarray.h"
#include "storage/procsignal.h"
- #include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "tcop/tcopprot.h"
- #include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
--- 53,68 ----
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
+ #include "postmaster/coordinator.h"
#include "postmaster/fork_process.h"
#include "postmaster/postmaster.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/sinvaladt.h"
#include "tcop/tcopprot.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
*************** bool autovacuum_enabled = false;
*** 105,113 ****
* GUC parameters
*/
bool autovacuum_enabled = false;
- int max_background_workers;
- int min_spare_background_workers;
- int max_spare_background_workers;
int autovacuum_naptime;
int autovacuum_vac_thresh;
double autovacuum_vac_scale;
--- 76,81 ----
*************** int Log_autovacuum_min_duration = -1;
*** 120,140 ****
int Log_autovacuum_min_duration = -1;
! /* how long to keep pgstat data in the launcher, in milliseconds */
#define STATS_READ_DELAY 1000
! /* the minimum allowed time between two awakenings of the launcher */
#define MIN_AUTOVAC_SLEEPTIME 100.0 /* milliseconds */
- /* Flags to tell if we are in a coordinator or background worker process */
- static bool am_coordinator = false;
- static bool am_background_worker = false;
-
- /* Flags set by signal handlers */
- static volatile sig_atomic_t got_SIGHUP = false;
- static volatile sig_atomic_t got_SIGUSR2 = false;
- static volatile sig_atomic_t got_SIGTERM = false;
-
/* Comparison point for determining whether freeze_max_age is exceeded */
static TransactionId recentXid;
--- 88,99 ----
int Log_autovacuum_min_duration = -1;
! /* how long to keep pgstat data in the coordinator, in milliseconds */
#define STATS_READ_DELAY 1000
! /* the minimum allowed time between two awakenings of the coordinator */
#define MIN_AUTOVAC_SLEEPTIME 100.0 /* milliseconds */
/* Comparison point for determining whether freeze_max_age is exceeded */
static TransactionId recentXid;
*************** static int default_freeze_table_age;
*** 142,177 ****
static int default_freeze_min_age;
static int default_freeze_table_age;
- /* Memory contexts for long-lived data */
- static MemoryContext CoordinatorMemCxt;
- static MemoryContext BgWorkerMemCxt;
-
- /* job handling routines */
- static void bgworker_job_initialize(worker_state new_state);
- static void bgworker_job_completed(void);
-
- /* struct to keep track of databases in launcher */
- typedef struct avl_dbase
- {
- Oid adl_datid; /* hash key -- must be first */
- TimestampTz adl_next_worker;
- int adl_score;
- } avl_dbase;
-
- /* struct to keep track of databases in worker */
- typedef struct avw_dbase
- {
- Oid adw_datid; /* hash key -- must be first */
- char *adw_name;
- TransactionId adw_frozenxid;
- PgStat_StatDBEntry *adw_entry;
- } avw_dbase;
-
- typedef struct cached_job {
- Dlelem cj_links;
- IMessage *cj_msg;
- } cached_job;
-
/* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
typedef struct av_relation
{
--- 101,106 ----
*************** typedef struct autovac_table
*** 198,282 ****
char *at_datname;
} autovac_table;
- /*-------------
- * The main background worker shmem struct. On shared memory we store this
- * main struct and the array of WorkerInfo structs. This struct keeps:
- *
- * co_coordinatorid the BackendId of the coordinator
- * co_freeWorkers the WorkerInfo freelist
- * co_runningWorkers the WorkerInfo non-free queue
- * co_startingWorker pointer to WorkerInfo currently being started
- * (cleared by the worker itself as soon as it's up and
- * running)
- *
- * This struct is protected by WorkerInfoLock, except for parts of the worker
- * list (see above).
- *-------------
- */
- typedef struct
- {
- BackendId co_coordinatorid;
- WorkerInfo co_freeWorkers;
- SHM_QUEUE co_runningWorkers;
- WorkerInfo co_startingWorker;
- } CoordinatorShmemStruct;
-
- static CoordinatorShmemStruct *CoordinatorShmem;
-
- /*
- * Table of databases with at least one connected worker, resides in shared
- * memory, protected by CoordinatorDatabasesLock
- */
- static HTAB *co_databases = NULL;
-
- /* the database list in the launcher, and the context that contains it */
- static Dllist *DatabaseList = NULL;
- static MemoryContext DatabaseListCxt = NULL;
-
- /* Pointer to my own WorkerInfo, valid on each worker */
- static WorkerInfo MyWorkerInfo = NULL;
- static WorkerInfo terminatable_worker = NULL;
-
- #ifdef EXEC_BACKEND
- static pid_t coordinator_forkexec(void);
- static pid_t bgworker_forkexec(void);
- #endif
- NON_EXEC_STATIC void BackgroundWorkerMain(int argc, char *argv[]);
- static void handle_imessage(IMessage *msg);
- NON_EXEC_STATIC void CoordinatorMain(int argc, char *argv[]);
-
- static void init_co_database(co_database *codb);
- static co_database *get_co_database(Oid dboid);
- static void populate_co_databases(void);
-
- static bool can_deliver_cached_job(co_database *codb, IMessage *msg,
- BackendId *target);
- static WorkerInfo get_idle_worker(co_database *codb);
- static void cache_job(IMessage *msg, co_database *codb);
- static void forward_job(IMessage *msg, co_database *codb, BackendId backend_id);
- static void dispatch_job(IMessage *msg, co_database *codb);
- static void process_cached_jobs(co_database *codb);
-
- static bool CoordinatorCanLaunchWorker(TimestampTz current_time);
- static void manage_workers(bool can_launch);
- static void autovacuum_maybe_trigger_job(TimestampTz current_time,
- bool can_launch);
-
- static void do_start_worker(Oid dboid);
- static Oid autovacuum_select_database(void);
- static void coordinator_determine_sleep(bool can_launch, bool recursing,
- struct timespec *nap);
- static void autovacuum_update_timing(Oid dbid, TimestampTz now);
- static List *get_database_list(void);
- static void rebuild_database_list(Oid newdb);
static int db_comparator(const void *a, const void *b);
- static void autovac_balance_cost(void);
- static void add_as_idle_worker(Oid dbid, bool inc_connected_count);
-
- static void do_autovacuum(void);
- static void FreeWorkerInfo(int code, Datum arg);
-
static autovac_table *table_recheck_autovac(Oid relid, HTAB *table_toast_map,
TupleDesc pg_class_desc);
static void relation_needs_vacanalyze(Oid relid, AutoVacOpts *relopts,
--- 127,134 ----
*************** static void autovac_report_activity(auto
*** 292,1279 ****
PgStat_StatDBEntry *shared,
PgStat_StatDBEntry *dbentry);
static void autovac_report_activity(autovac_table *tab);
- static void avl_sighup_handler(SIGNAL_ARGS);
- static void avl_sigusr2_handler(SIGNAL_ARGS);
- static void avl_sigterm_handler(SIGNAL_ARGS);
static void autovac_refresh_stats(void);
- char *
- decode_worker_state(worker_state state)
- {
- switch (state)
- {
- case WS_IDLE: return "WS_IDLE";
- case WS_AUTOVACUUM: return "WS_AUTOVACUUM";
-
- default: return "UNKNOWN STATE";
- }
- }
-
-
/********************************************************************
* COORDINATOR CODE
********************************************************************/
- #ifdef EXEC_BACKEND
- /*
- * forkexec routine for the coordinator process.
- *
- * Format up the arglist, then fork and exec.
- */
- static pid_t
- coordinator_forkexec(void)
- {
- char *av[10];
- int ac = 0;
-
- av[ac++] = "postgres";
- av[ac++] = "--forkcoordinator";
- av[ac++] = NULL; /* filled in by postmaster_forkexec */
- av[ac] = NULL;
-
- Assert(ac < lengthof(av));
-
- return postmaster_forkexec(ac, av);
- }
-
- /*
- * We need this set from the outside, before InitProcess is called
- */
void
- CoordinatorIAm(void)
- {
- am_coordinator = true;
- }
- #endif
-
- /*
- * Main entry point for coordinator process, to be called from the postmaster.
- */
- int
- StartCoordinator(void)
- {
- pid_t CoordinatorPID;
-
- #ifdef EXEC_BACKEND
- switch ((CoordinatorPID = coordinator_forkexec()))
- #else
- switch ((CoordinatorPID = fork_process()))
- #endif
- {
- case -1:
- ereport(LOG,
- (errmsg("could not fork the coordinator process: %m")));
- return 0;
-
- #ifndef EXEC_BACKEND
- case 0:
- /* in postmaster child ... */
- /* Close the postmaster's sockets */
- ClosePostmasterPorts(false);
-
- /* Lose the postmaster's on-exit routines */
- on_exit_reset();
-
- CoordinatorMain(0, NULL);
- break;
- #endif
- default:
- return (int) CoordinatorPID;
- }
-
- /* shouldn't get here */
- return 0;
- }
-
- void
- init_co_database(co_database *codb)
- {
- Assert(ShmemAddrIsValid(codb));
- SHMQueueInit(&codb->codb_idle_workers);
- codb->codb_num_idle_workers = 0;
-
- /*
- * While only the coordinator may fiddle with this list, as its entries
- * reside in that process' memory, it's safe to set the counters to 0
- * and initialize the list headers with NULL values using DLInitList().
- */
- codb->codb_num_cached_jobs = 0;
- DLInitList(&codb->codb_cached_jobs);
-
- codb->codb_num_connected_workers = 0;
- }
-
- static void
- cache_job(IMessage *msg, co_database *codb)
- {
- cached_job *job;
-
- #ifdef COORDINATOR_DEBUG
- elog(DEBUG5, "Coordinator: caching job of type %s for database %d",
- decode_imessage_type(msg->type), codb->codb_dboid);
- #endif
-
- job = palloc(sizeof(cached_job));
- DLInitElem(&job->cj_links, job);
- job->cj_msg = msg;
- DLAddTail(&codb->codb_cached_jobs, &job->cj_links);
- codb->codb_num_cached_jobs++;
- }
-
- /*
- * get_idle_worker
- *
- * Returns the first idle worker for a given database, removing it from its
- * list of idle workers. The caller is expected to make sure that there is
- * at least one idle worker and it must hold the CoordinatorDatabasesLock.
- */
- static WorkerInfo
- get_idle_worker(co_database *codb)
- {
- WorkerInfo worker;
-
- /* remove a worker from the list of idle workers */
- worker = (WorkerInfo) SHMQueueNext(&codb->codb_idle_workers,
- &codb->codb_idle_workers,
- offsetof(WorkerInfoData, wi_links));
- Assert(worker);
- SHMQueueDelete(&worker->wi_links);
- Assert(worker->wi_backend_id != InvalidBackendId);
-
- /* maintain per-database counter */
- codb->codb_num_idle_workers--;
-
- return worker;
- }
-
- /*
- * forward_job
- *
- * Takes an imessage and forwards it to the first idle backend for the given
- * database as its next job to process. The caller must hold the
- * CoordinatorDatabasesLock.
- */
- static void
- forward_job(IMessage *msg, co_database *codb, BackendId backend_id)
- {
- /* various actions before job delivery depending on the message type */
- switch (msg->type)
- {
- case IMSGT_TERM_WORKER:
- break;
-
- case IMSGT_PERFORM_VACUUM:
- #ifdef COORDINATOR_DEBUG
- elog(DEBUG1, "Coordinator: delivering msg %s of size %d for "
- "database %d to backend %d",
- decode_imessage_type(msg->type), msg->size, codb->codb_dboid,
- backend_id);
- #endif
- autovacuum_update_timing(codb->codb_dboid, GetCurrentTimestamp());
- break;
-
- default:
- /* no-op */
- break;
- }
-
- IMessageActivate(msg, backend_id);
- }
-
- /*
- * dispatch_job
- *
- * Depending on the status of idle workers, this either forwards a job to an
- * idle worker directly or caches it for later processing. The caller must
- * hold the CoordinatorDatabasesLock.
- */
- void
- dispatch_job(IMessage *msg, co_database *codb)
- {
- WorkerInfo worker;
-
- if (codb->codb_num_idle_workers > 0)
- {
- worker = get_idle_worker(codb);
- forward_job(msg, codb, worker->wi_backend_id);
- }
- else
- cache_job(msg, codb);
- }
-
- /*
- * process_cached_jobs
- *
- * Dispatches cached jobs to idle background workers, as long as there are
- * of both. Before delivering a job, an additional check is performed with
- * can_deliver_cached_job(), which also chooses the background worker to run
- * the job on.
- */
- static void
- process_cached_jobs(co_database *codb)
- {
- BackendId target;
- cached_job *job;
-
- #ifdef COORDINATOR_DEBUG
- elog(DEBUG5, "Coordinator: cached jobs: %d, idle workers: %d",
- codb->codb_num_cached_jobs, codb->codb_num_idle_workers);
- #endif
-
- job = (cached_job*) DLGetHead(&codb->codb_cached_jobs);
- while ((codb->codb_num_cached_jobs > 0) &&
- (codb->codb_num_idle_workers > 0) &&
- (job != NULL))
- {
- target = InvalidBackendId;
- if (can_deliver_cached_job(codb, job->cj_msg, &target))
- {
- /* remove the job from the cache */
- DLRemove(&job->cj_links);
- codb->codb_num_cached_jobs--;
-
- /* forward the job to some idle worker and cleanup */
- if (target == InvalidBackendId)
- target = get_idle_worker(codb)->wi_backend_id;
-
- forward_job(job->cj_msg, codb, target);
- pfree(job);
-
- job = (cached_job*) DLGetHead(&codb->codb_cached_jobs);
- }
- else
- job = (cached_job*) DLGetSucc(&job->cj_links);
- }
- }
-
- /*
- * populate_co_databases
- *
- * Called at startup of the coordinator to scan pg_database. Schedules an
- * initial VACUUM job on the template database to populate pg_stat.
- */
- static void
- populate_co_databases()
- {
- List *dblist;
- ListCell *cell;
- IMessage *msg;
-
- dblist = get_database_list();
- LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
- foreach(cell, dblist)
- {
- avw_dbase *avdb = lfirst(cell);
- co_database *codb = get_co_database(avdb->adw_datid);
- if (codb->codb_dboid == TemplateDbOid)
- {
- /*
- * Create a cached job as an imessage to ourselves, but without
- * activating it. It can get forwarded to a backend later on.
- */
- msg = IMessageCreate(IMSGT_PERFORM_VACUUM, 0);
- cache_job(msg, codb);
- }
- }
- LWLockRelease(CoordinatorDatabasesLock);
- }
-
- /*
- * Main loop for the coordinator process.
- */
- NON_EXEC_STATIC void
- CoordinatorMain(int argc, char *argv[])
- {
- sigjmp_buf local_sigjmp_buf;
- IMessage *msg = NULL;
- bool can_launch;
-
- /* we are a postmaster subprocess now */
- IsUnderPostmaster = true;
- am_coordinator = true;
-
- /* reset MyProcPid */
- MyProcPid = getpid();
-
- /* record Start Time for logging */
- MyStartTime = time(NULL);
-
- /* Identify myself via ps */
- init_ps_display("coordinator process", "", "", "");
-
- ereport(LOG,
- (errmsg("coordinator started")));
-
- if (PostAuthDelay)
- pg_usleep(PostAuthDelay * 1000000L);
-
- SetProcessingMode(InitProcessing);
-
- /*
- * If possible, make this process a group leader, so that the postmaster
- * can signal any child processes too. (coordinator probably never has
- * any child processes, but for consistency we make all postmaster child
- * processes do this.)
- */
- #ifdef HAVE_SETSID
- if (setsid() < 0)
- elog(FATAL, "setsid() failed: %m");
- #endif
-
- /*
- * Set up signal handlers. We operate on databases much like a regular
- * backend, so we use the same signal handling. See equivalent code in
- * tcop/postgres.c.
- */
- pqsignal(SIGHUP, avl_sighup_handler);
- pqsignal(SIGINT, StatementCancelHandler);
- pqsignal(SIGTERM, avl_sigterm_handler);
-
- pqsignal(SIGQUIT, quickdie);
- pqsignal(SIGALRM, handle_sig_alarm);
-
- pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGUSR1, procsignal_sigusr1_handler);
- pqsignal(SIGUSR2, avl_sigusr2_handler);
- pqsignal(SIGFPE, FloatExceptionHandler);
- pqsignal(SIGCHLD, SIG_DFL);
-
- /* Early initialization */
- BaseInit();
-
- /*
- * Create a per-backend PGPROC struct in shared memory, except in the
- * EXEC_BACKEND case where this was done in SubPostmasterMain. We must do
- * this before we can use LWLocks (and in the EXEC_BACKEND case we already
- * had to do some stuff with LWLocks).
- */
- #ifndef EXEC_BACKEND
- InitProcess();
- #endif
-
- InitPostgres(NULL, InvalidOid, NULL, NULL);
-
- SetProcessingMode(NormalProcessing);
-
- /*
- * Create a memory context that we will do all our work in. We do this so
- * that we can reset the context during error recovery and thereby avoid
- * possible memory leaks.
- */
- CoordinatorMemCxt = AllocSetContextCreate(TopMemoryContext,
- "Coordinator",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
- MemoryContextSwitchTo(CoordinatorMemCxt);
-
- /*
- * If an exception is encountered, processing resumes here.
- *
- * This code is a stripped down version of PostgresMain error recovery.
- */
- if (sigsetjmp(local_sigjmp_buf, 1) != 0)
- {
- /* since not using PG_TRY, must reset error stack by hand */
- error_context_stack = NULL;
-
- /* Prevents interrupts while cleaning up */
- HOLD_INTERRUPTS();
-
- /* Forget any pending QueryCancel request */
- QueryCancelPending = false;
- disable_sig_alarm(true);
- QueryCancelPending = false; /* again in case timeout occurred */
-
- /* Report the error to the server log */
- EmitErrorReport();
-
- /* Abort the current transaction in order to recover */
- AbortOutOfAnyTransaction();
-
- /*
- * Now return to normal top-level context and clear ErrorContext for
- * next time.
- */
- MemoryContextSwitchTo(CoordinatorMemCxt);
- FlushErrorState();
-
- /* Flush any leaked data in the top-level context */
- MemoryContextResetAndDeleteChildren(CoordinatorMemCxt);
-
- /* don't leave dangling pointers to freed memory */
- DatabaseListCxt = NULL;
- DatabaseList = NULL;
-
- /*
- * Make sure pgstat also considers our stat data as gone. Note: we
- * mustn't use autovac_refresh_stats here.
- */
- pgstat_clear_snapshot();
-
- /* Now we can allow interrupts again */
- RESUME_INTERRUPTS();
-
- /*
- * Sleep at least 1 second after any error. We don't want to be
- * filling the error logs as fast as we can.
- */
- pg_usleep(1000000L);
- }
-
- /* We can now handle ereport(ERROR) */
- PG_exception_stack = &local_sigjmp_buf;
-
- /* must unblock signals before calling rebuild_database_list */
- PG_SETMASK(&UnBlockSig);
-
- CoordinatorShmem->co_coordinatorid = MyBackendId;
-
- /*
- * Initial population of the database list from pg_database
- */
- populate_co_databases();
-
- /*
- * Create the initial database list. The invariant we want this list to
- * keep is that it's ordered by decreasing next_time. As soon as an entry
- * is updated to a higher time, it will be moved to the front (which is
- * correct because the only operation is to add autovacuum_naptime to the
- * entry, and time always increases).
- */
- rebuild_database_list(InvalidOid);
-
- for (;;)
- {
- TimestampTz current_time;
- struct timespec nap;
- sigset_t sigmask, oldmask;
- fd_set socks;
- int max_sock_id;
- bool socket_ready;
-
- /*
- * Emergency bailout if postmaster has died. This is to avoid the
- * necessity for manual cleanup of all postmaster children.
- */
- if (!PostmasterIsAlive(true))
- proc_exit(1);
-
- can_launch = (CoordinatorShmem->co_freeWorkers != NULL);
- coordinator_determine_sleep(can_launch, false, &nap);
-
- /* Initialize variables for listening on sockets */
- FD_ZERO(&socks);
- max_sock_id = 0;
- socket_ready = false;
-
- #ifdef COORDINATOR_DEBUG
- elog(DEBUG1, "Coordinator: listening...");
- #endif
-
- /* Allow sinval catchup interrupts while sleeping */
- EnableCatchupInterrupt();
-
- /*
- * Sleep for a while according to schedule - and possibly interrupted
- * by messages from one of the sockets or by internal messages from
- * background workers or normal backends.
- *
- * Using pselect here prevents the possible loss of a signal in
- * between the last check for imessages and following select call.
- * However, it requires a newish platform that supports pselect.
- *
- * On some platforms, signals won't interrupt select. Postgres used
- * to split the nap time into one second intervals to ensure to react
- * reasonably promptly for autovacuum purposes. However, for
- * Postgres-R this is not tolerable, so that mechanism has been
- * removed.
- *
- * FIXME: to support these platforms or others that don't implement
- * pselect properly, another work-around like for example the
- * self-pipe trick needs to be implemented. On Windows, we
- * could implement pselect based on the current port's select
- * method.
- */
-
- /* FIXME: indentation */
- {
-
- sigemptyset(&sigmask);
- sigaddset(&sigmask, SIGINT);
- sigaddset(&sigmask, SIGHUP);
- sigaddset(&sigmask, SIGUSR2);
- sigprocmask(SIG_BLOCK, &sigmask, &oldmask);
-
- sigemptyset(&sigmask);
-
- if (pselect(max_sock_id + 1, &socks, NULL, NULL, &nap,
- &sigmask) < 0)
- {
- if (errno != EINTR)
- {
- elog(WARNING, "Coordinator: pselect failed: %m");
- socket_ready = true;
- }
- }
- else
- socket_ready = true;
-
- sigprocmask(SIG_SETMASK, &oldmask, NULL);
-
- /*
- * Emergency bailout if postmaster has died. This is to avoid the
- * necessity for manual cleanup of all postmaster children.
- */
- if (!PostmasterIsAlive(true))
- proc_exit(1);
- }
-
- DisableCatchupInterrupt();
-
- /* the normal shutdown case */
- if (got_SIGTERM)
- break;
-
- if (got_SIGHUP)
- {
- #ifdef COORDINATOR_DEBUG
- elog(DEBUG5, "Coordinator: got SIGHUP");
- #endif
-
- got_SIGHUP = false;
- ProcessConfigFile(PGC_SIGHUP);
-
- /* rebalance in case the default cost parameters changed */
- LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
- autovac_balance_cost();
- LWLockRelease(WorkerInfoLock);
-
- /* rebuild the list in case the naptime changed */
- rebuild_database_list(InvalidOid);
- }
-
- /*
- * postmaster signalled failure to start a worker
- */
- if (got_SIGUSR2)
- {
- #ifdef COORDINATOR_DEBUG
- elog(DEBUG5, "Coordinator: got SIGUSR2");
- #endif
-
- got_SIGUSR2 = false;
-
- /* FIXME: fix indentation after merging */
- {
-
- /*
- * If the postmaster failed to start a new worker, we sleep
- * for a little while and resend the signal. The new worker's
- * state is still in memory, so this is sufficient. After
- * that, we restart the main loop.
- *
- * XXX should we put a limit to the number of times we retry?
- * I don't think it makes much sense, because a future start
- * of a worker will continue to fail in the same way.
- *
- * FIXME: for the autovac launcher, it might have been okay
- * to just sleep. But the coordinator needs to remain
- * as responsive as possible, even if the postmaster
- * is currently unable to fork new workers.
- */
- pg_usleep(1000000L); /* 1s */
- SendPostmasterSignal(PMSIGNAL_START_BGWORKER);
- continue;
- }
- }
-
- /* handle sockets with pending reads, just a placeholder for now */
- if (socket_ready)
- {
- }
-
- /* handle pending imessages */
- while ((msg = IMessageCheck()) != NULL)
- handle_imessage(msg);
-
- current_time = GetCurrentTimestamp();
- can_launch = CoordinatorCanLaunchWorker(current_time);
-
- /*
- * Periodically check and trigger autovacuum workers, if autovacuum
- * is enabled.
- */
- if (autovacuum_enabled)
- autovacuum_maybe_trigger_job(current_time, can_launch);
-
- manage_workers(can_launch);
- }
-
- /* Normal exit from the coordinator is here */
- ereport(LOG,
- (errmsg("coordinator shutting down")));
- CoordinatorShmem->co_coordinatorid = InvalidBackendId;
-
- proc_exit(0); /* done */
- }
-
- void
- handle_imessage(IMessage *msg)
- {
- BackendId msg_sender;
- PGPROC *proc;
- TransactionId local_xid = InvalidTransactionId;
- co_database *codb = NULL;
- Oid dboid = InvalidOid;
-
- /*
- * Get the PGPROC entry of the sender and the related database info, if
- * any.
- */
- msg_sender = msg->sender;
-
- LWLockAcquire(ProcArrayLock, LW_SHARED);
-
- proc = BackendIdGetProc(msg_sender);
- if (proc)
- {
- local_xid = proc->xid;
- dboid = proc->databaseId;
- }
-
- LWLockRelease(ProcArrayLock);
-
- #ifdef COORDINATOR_DEBUG
- if (proc)
- elog(DEBUG3, "Coordinator: received %s of size %d from backend %d\n"
- "\t(connected to db %d, local xid %d)",
- decode_imessage_type(msg->type), msg->size, msg_sender,
- dboid, local_xid);
- else
- elog(DEBUG3, "Coordinator: received %s of size %d from backend %d\n"
- "\t(for which no PGPROC could be found)",
- decode_imessage_type(msg->type), msg->size, msg_sender);
- #endif
-
- switch (msg->type)
- {
- /*
- * Standard messages from background worker processes
- */
- case IMSGT_REGISTER_WORKER:
- case IMSGT_READY:
- /* consume the message */
- IMessageRemove(msg);
-
- LWLockAcquire(CoordinatorDatabasesLock, LW_SHARED);
- codb = get_co_database(dboid);
- process_cached_jobs(codb);
- LWLockRelease(CoordinatorDatabasesLock);
-
- /*
- * We trigger a DatabaseList rebuild if it is still empty and
- * after a job is done. This mainly covers the initialization
- * phase after the first background worker is done with vacuuming
- * template1 (and thus having populated pgstat).
- */
- if (DLGetHead(DatabaseList) == NULL)
- rebuild_database_list(InvalidOid);
-
- /*
- * Rebalance cost limits, as the worker has already reported its
- * startup to the stats collector. However, that needs to be
- * removed, so there's probably no point in rebalancing here.
- * So: FIXME.
- */
- LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
- autovac_balance_cost();
- LWLockRelease(WorkerInfoLock);
-
- break;
-
- case IMSGT_FORCE_VACUUM:
- /* consume the message */
- IMessageRemove(msg);
-
- /* trigger an autovacuum worker */
- dboid = autovacuum_select_database();
- if (dboid != InvalidOid)
- {
- LWLockAcquire(CoordinatorDatabasesLock, LW_SHARED);
- codb = get_co_database(dboid);
- msg = IMessageCreate(IMSGT_PERFORM_VACUUM, 0);
- dispatch_job(msg, codb);
- LWLockRelease(CoordinatorDatabasesLock);
- }
- break;
-
- default:
- elog(WARNING, "Coordinator: unknown message type: %c, ignored!",
- msg->type);
- IMessageRemove(msg);
- }
- }
-
-
-
- /*
- * get_co_database
- *
- * Gets or creates the database info for replication in shared memory.
- * Expects the caller to have the CoordinatorDatabasesLock.
- */
- co_database *
- get_co_database(Oid dboid)
- {
- co_database *codb;
- bool found;
-
- codb = hash_search(co_databases, &dboid, HASH_ENTER, &found);
- if (!found)
- init_co_database(codb);
-
- return codb;
- }
-
- static bool
- can_deliver_cached_job(co_database *codb, IMessage *msg, BackendId *target)
- {
- #ifdef COORDINATOR_DEBUG
- elog(DEBUG5, "Coordinator: checking deliverability of job type %s",
- decode_imessage_type(msg->type));
- #endif
-
- switch (msg->type)
- {
- case IMSGT_TERM_WORKER:
- case IMSGT_PERFORM_VACUUM:
- return true;
-
- default:
- elog(WARNING, "Coordinator: missing deliverability check for "
- "message type %s", decode_imessage_type(msg->type));
- return false;
- }
- }
-
- /*
- * manage_workers
- *
- * Starts background workers for databases which have at least one cached
- * job or which have less than min_background_workers connected. Within the
- * same loop, the max_background_workers is checked and terminates a worker
- * accordingly.
- *
- * Note that at max one worker can be requested to start or stop per
- * invocation.
- */
- static void
- manage_workers(bool can_launch)
- {
- HASH_SEQ_STATUS hash_status;
- co_database *codb;
- Oid launch_dboid = InvalidOid;
- float max_score = 0.0,
- score;
- bool worker_slots_available;
- int idle_workers_required;
- int job_workers_required;
-
- LWLockAcquire(WorkerInfoLock, LW_SHARED);
- worker_slots_available = (CoordinatorShmem->co_freeWorkers != NULL);
- LWLockRelease(WorkerInfoLock);
-
- /*
- * Terminate an unneeded worker that has been fetched from the list of
- * idle workers in the last invocation. We defer sending the signal one
- * invocation to make sure the coordinator had time to handle all
- * pending messages from that worker. As idle workers don't ever send
- * messages, we can safely assume there is no pending message from that
- * worker by now.
- */
- if (terminatable_worker != NULL)
- {
- IMessage *msg;
-
- #ifdef COORDINATOR_DEBUG
- PGPROC *proc = BackendIdGetProc(terminatable_worker->wi_backend_id);
- if (proc)
- elog(DEBUG3, "Coordinator: terminating worker [%d/%d].",
- proc->pid, terminatable_worker->wi_backend_id);
- else
- elog(WARNING, "Coordinator: terminating worker (no PGPROC, backend %d).",
- terminatable_worker->wi_backend_id);
- #endif
-
- msg = IMessageCreate(IMSGT_TERM_WORKER, 0);
- IMessageActivate(msg, terminatable_worker->wi_backend_id);
-
- terminatable_worker = NULL;
- }
-
- #ifdef COORDINATOR_DEBUG
- elog(DEBUG3, "Coordinator: manage_workers: can_launch: %s, slots_available: %s",
- (can_launch ? "true" : "false"), (worker_slots_available ? "true" : "false"));
- #endif
-
- /*
- * Check the list of databases and fire the first pending request
- * we find.
- */
- idle_workers_required = 0;
- job_workers_required = 0;
- LWLockAcquire(CoordinatorDatabasesLock, LW_SHARED);
- hash_seq_init(&hash_status, co_databases);
- while ((codb = (co_database*) hash_seq_search(&hash_status)))
- {
- score = ((float) codb->codb_num_cached_jobs /
- (float) (codb->codb_num_connected_workers + 1)) * 100.0;
-
- if (codb->codb_num_idle_workers < min_spare_background_workers)
- score += (min_spare_background_workers -
- codb->codb_num_idle_workers) * 10.0;
-
- #ifdef COORDINATOR_DEBUG
- elog(DEBUG3, "Coordinator: db %d, idle/conn: %d/%d, jobs: %d, score: %0.1f",
- codb->codb_dboid, codb->codb_num_idle_workers,
- codb->codb_num_connected_workers, codb->codb_num_cached_jobs,
- score);
- #endif
-
- if (codb->codb_num_cached_jobs &&
- (codb->codb_num_connected_workers == 0))
- job_workers_required++;
-
- if (codb->codb_num_idle_workers < min_spare_background_workers)
- idle_workers_required += (min_spare_background_workers -
- codb->codb_num_idle_workers);
-
- /*
- * FIXME: "misconfiguration" allows "starvation" in case the global
- * maximum is reached all with idle workers, but other dbs
- * w/o a single worker still have jobs.
- */
- if (can_launch && ((codb->codb_num_cached_jobs > 0) ||
- (codb->codb_num_idle_workers <
- min_spare_background_workers)))
- {
- if (can_launch && (score > max_score))
- {
- launch_dboid = codb->codb_dboid;
- max_score = score;
- }
- }
-
- /*
- * If we are above limit, we fetch an idle worker from the list
- * and mark it as terminatable. Actual termination happens in
- * the following invocation, see above.
- */
- if ((terminatable_worker == NULL) &&
- (codb->codb_num_idle_workers > max_spare_background_workers))
- terminatable_worker = get_idle_worker(codb);
- }
- LWLockRelease(CoordinatorDatabasesLock);
-
- if (!worker_slots_available && idle_workers_required > 0)
- {
- elog(WARNING, "Coordinator: no more background workers available, but requiring %d more, according to min_spare_background_workers.",
- idle_workers_required);
- }
-
- if (!worker_slots_available && job_workers_required > 0)
- {
- elog(WARNING, "Coordinator: no background workers avalibale, but %d databases have background jobs pending.",
- job_workers_required);
- }
-
- /* request a worker for the first database found, which needs one */
- if (OidIsValid(launch_dboid))
- do_start_worker(launch_dboid);
- }
-
- static bool
- CoordinatorCanLaunchWorker(TimestampTz current_time)
- {
- bool can_launch;
-
- /* FIXME: indentation */
- {
-
- /*
- * There are some conditions that we need to check before trying to
- * start a launcher. First, we need to make sure that there is a
- * launcher slot available. Second, we need to make sure that no
- * other worker failed while starting up.
- */
-
- LWLockAcquire(WorkerInfoLock, LW_SHARED);
-
- can_launch = (CoordinatorShmem->co_freeWorkers != NULL);
-
- if (CoordinatorShmem->co_startingWorker != NULL)
- {
- int waittime;
- WorkerInfo worker = CoordinatorShmem->co_startingWorker;
-
- #ifdef COORDINATOR_DEBUG
- elog(DEBUG5, "Coordinator: another worker is starting...");
- #endif
-
- /*
- * We can't launch another worker when another one is still
- * starting up (or failed while doing so), so just sleep for a bit
- * more; that worker will wake us up again as soon as it's ready.
- * We will only wait autovacuum_naptime seconds (up to a maximum
- * of 60 seconds) for this to happen however. Note that failure
- * to connect to a particular database is not a problem here,
- * because the worker removes itself from the startingWorker
- * pointer before trying to connect. Problems detected by the
- * postmaster (like fork() failure) are also reported and handled
- * differently. The only problems that may cause this code to
- * fire are errors in the earlier sections of BackgroundWorkerMain,
- * before the worker removes the WorkerInfo from the
- * startingWorker pointer.
- */
- waittime = Min(autovacuum_naptime, 60) * 1000;
- if (TimestampDifferenceExceeds(worker->wi_launchtime, current_time,
- waittime))
- {
- LWLockRelease(WorkerInfoLock);
- LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
-
- /*
- * No other process can put a worker in starting mode, so if
- * startingWorker is still INVALID after exchanging our lock,
- * we assume it's the same one we saw above (so we don't
- * recheck the launch time).
- */
- if (CoordinatorShmem->co_startingWorker != NULL)
- {
- worker = CoordinatorShmem->co_startingWorker;
- worker->wi_dboid = InvalidOid;
- worker->wi_tableoid = InvalidOid;
- worker->wi_backend_id = InvalidBackendId;
- worker->wi_launchtime = 0;
- worker->wi_links.next = (SHM_QUEUE *) CoordinatorShmem->co_freeWorkers;
- CoordinatorShmem->co_freeWorkers = worker;
- CoordinatorShmem->co_startingWorker = NULL;
- elog(WARNING, "worker took too long to start; cancelled");
- }
- }
- else
- can_launch = false;
- }
- LWLockRelease(WorkerInfoLock); /* either shared or exclusive */
- }
-
- return can_launch;
- }
-
- static void
autovacuum_maybe_trigger_job(TimestampTz current_time, bool can_launch)
{
Oid dboid = InvalidOid;
--- 144,158 ----
*************** autovacuum_maybe_trigger_job(TimestampTz
*** 1302,1310 ****
if (OidIsValid(dboid))
{
LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
- codb = get_co_database(dboid);
-
/*
* Only dispatch a job, if it can be processed immediately
* so we don't end up having lots of autovacuum requests
--- 181,188 ----
if (OidIsValid(dboid))
{
LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
+ codb = get_co_database(dboid);
/*
* Only dispatch a job, if it can be processed immediately
* so we don't end up having lots of autovacuum requests
*************** autovacuum_maybe_trigger_job(TimestampTz
*** 1341,1347 ****
* for example due to the workers being all busy. If this is false, we will
* cause a long sleep, which will be interrupted when a worker exits.
*/
! static void
coordinator_determine_sleep(bool can_launch, bool recursing, struct timespec *nap)
{
Dlelem *elem;
--- 219,225 ----
* for example due to the workers being all busy. If this is false, we will
* cause a long sleep, which will be interrupted when a worker exits.
*/
! void
coordinator_determine_sleep(bool can_launch, bool recursing, struct timespec *nap)
{
Dlelem *elem;
*************** coordinator_determine_sleep(bool can_lau
*** 1416,1422 ****
* end of the interval. The actual values are not saved, which should not be
* much of a problem.
*/
! static void
rebuild_database_list(Oid newdb)
{
List *dblist;
--- 294,300 ----
* end of the interval. The actual values are not saved, which should not be
* much of a problem.
*/
! void
rebuild_database_list(Oid newdb)
{
List *dblist;
*************** db_comparator(const void *a, const void
*** 1621,1668 ****
}
/*
- * do_start_worker
- *
- * Bare-bones procedure for starting a background worker from the
- * coordinator. It sets up shared memory stuff and signals the postmaster to
- * start a worker.
- */
- void
- do_start_worker(Oid dboid)
- {
- WorkerInfo worker;
-
- Assert(OidIsValid(dboid));
-
- #ifdef COORDINATOR_DEBUG
- elog(DEBUG3, "Coordinator: requesting worker for database %d.", dboid);
- #endif
-
- LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
-
- /*
- * Get a worker entry from the freelist. We checked above, so there
- * really should be a free slot -- complain very loudly if there
- * isn't.
- */
- worker = CoordinatorShmem->co_freeWorkers;
- if (worker == NULL)
- elog(FATAL, "no free worker found");
-
- CoordinatorShmem->co_freeWorkers = (WorkerInfo) worker->wi_links.next;
-
- worker->wi_dboid = dboid;
- worker->wi_backend_id = InvalidBackendId;
- worker->wi_launchtime = GetCurrentTimestamp();
-
- CoordinatorShmem->co_startingWorker = worker;
-
- LWLockRelease(WorkerInfoLock);
-
- SendPostmasterSignal(PMSIGNAL_START_BGWORKER);
- }
-
- /*
* autovacuum_select_database
*
* It determines what database to work on and sets up shared memory stuff. It
--- 499,504 ----
*************** do_start_worker(Oid dboid)
*** 1671,1677 ****
* Returns a pointer to the coordinator info struct of the database that the
* next worker should process, or NULL if no database needs vacuuming.
*/
! static Oid
autovacuum_select_database(void)
{
List *dblist;
--- 507,513 ----
* Returns a pointer to the coordinator info struct of the database that the
* next worker should process, or NULL if no database needs vacuuming.
*/
! Oid
autovacuum_select_database(void)
{
List *dblist;
*************** autovacuum_select_database(void)
*** 1845,1851 ****
* the selected database was previously absent from the list.
*/
! static void
autovacuum_update_timing(Oid dbid, TimestampTz now)
{
Dlelem *elem;
--- 681,687 ----
* the selected database was previously absent from the list.
*/
! void
autovacuum_update_timing(Oid dbid, TimestampTz now)
{
Dlelem *elem;
*************** autovacuum_update_timing(Oid dbid, Times
*** 1889,2536 ****
}
}
- /* SIGHUP: set flag to re-read config file at next convenient time */
- static void
- avl_sighup_handler(SIGNAL_ARGS)
- {
- got_SIGHUP = true;
- }
-
- /* SIGUSR2: postmaster failed to fork a worker for us */
- static void
- avl_sigusr2_handler(SIGNAL_ARGS)
- {
- got_SIGUSR2 = true;
- }
-
- /* SIGTERM: time to die */
- static void
- avl_sigterm_handler(SIGNAL_ARGS)
- {
- got_SIGTERM = true;
- }
-
-
/********************************************************************
* AUTOVACUUM WORKER CODE
********************************************************************/
- #ifdef EXEC_BACKEND
/*
- * forkexec routines for background workers.
- *
- * Format up the arglist, then fork and exec.
- */
- static pid_t
- bgworker_forkexec(void)
- {
- char *av[10];
- int ac = 0;
-
- av[ac++] = "postgres";
- av[ac++] = "--forkbgworker";
- av[ac++] = NULL; /* filled in by postmaster_forkexec */
- av[ac] = NULL;
-
- Assert(ac < lengthof(av));
-
- return postmaster_forkexec(ac, av);
- }
-
- /*
- * We need this set from the outside, before InitProcess is called
- */
- void
- BackgroundWorkerIAm(void)
- {
- am_background_worker = true;
- }
- #endif
-
- /*
- * Main entry point for a background worker process.
- *
- * This code is heavily based on pgarch.c, q.v.
- */
- int
- StartBackgroundWorker(void)
- {
- pid_t worker_pid;
-
- #ifdef EXEC_BACKEND
- switch ((worker_pid = bgworker_forkexec()))
- #else
- switch ((worker_pid = fork_process()))
- #endif
- {
- case -1:
- ereport(LOG,
- (errmsg("could not fork background worker process: %m")));
- return 0;
-
- #ifndef EXEC_BACKEND
- case 0:
- /* in postmaster child ... */
- /* Close the postmaster's sockets */
- ClosePostmasterPorts(false);
-
- /* Lose the postmaster's on-exit routines */
- on_exit_reset();
-
- BackgroundWorkerMain(0, NULL);
- break;
- #endif
- default:
- return (int) worker_pid;
- }
-
- /* shouldn't get here */
- return 0;
- }
-
- /*
- * add_as_idle_worker
- *
- * Marks the current worker as idle by adding it to the database's list of
- * idle worker backends. The caller is expected to hold the WorkerInfoLock.
- */
- static void
- add_as_idle_worker(Oid dbid, bool inc_connected_count)
- {
- co_database *codb;
-
- Assert(SHMQueueIsDetached(&MyWorkerInfo->wi_links));
-
- /* Lookup the corresponding database, or create an entry for it */
- LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
- codb = get_co_database(dbid);
-
- if (inc_connected_count)
- codb->codb_num_connected_workers++;
-
- /* add as an idle worker */
- SHMQueueInsertBefore(&codb->codb_idle_workers, &MyWorkerInfo->wi_links);
- codb->codb_num_idle_workers++;
-
- LWLockRelease(CoordinatorDatabasesLock);
- }
-
- /*
- * bgworker_job_initialize
- *
- * Initializes the memory contexts for a background job.
- */
- void
- bgworker_job_initialize(worker_state new_state)
- {
- /*
- * Note that the coordinator is responsible for dequeuing the worker from
- * the list of idle backends, but it shall *NOT* assign a worker state,
- * we do that from the worker exclusively.
- */
- Assert(SHMQueueIsDetached(&MyWorkerInfo->wi_links));
- Assert(MyWorkerInfo->wi_state == WS_IDLE);
-
- MyWorkerInfo->wi_state = new_state;
- switch (new_state)
- {
- case WS_IDLE:
- Assert(false); /* use bgworker_job_completed instead */
- break;
- case WS_AUTOVACUUM:
- set_ps_display("bg worker: autovacuum", false);
- break;
- default:
- set_ps_display("bg worker: unknown", false);
- }
-
- /*
- * StartTransactionCommand and CommitTransactionCommand will
- * automatically switch to other contexts. None the less we need this
- * one for other book-keeping of the various background jobs across
- * transactions, for example to keep the list of relations to vacuum.
- */
- Assert(BgWorkerMemCxt == NULL);
- BgWorkerMemCxt = AllocSetContextCreate(TopMemoryContext,
- "Background Worker",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
-
- MessageContext = AllocSetContextCreate(TopMemoryContext,
- "MessageContext",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
-
- MemoryContextSwitchTo(BgWorkerMemCxt);
- }
-
- /*
- * bgworker_job_completed
- *
- * Cleans up the memory contexts used for the worker's current job and
- * informs the coordinator.
- */
- void
- bgworker_job_completed(void)
- {
- /* Notify the coordinator of the job completion. */
- #ifdef COORDINATOR_DEBUG
- ereport(DEBUG3,
- (errmsg("bg worker [%d]: job completed.", MyProcPid)));
- #endif
-
- /* reset the worker state */
- bgworker_reset();
- }
-
- void
- bgworker_reset(void)
- {
- BackendId CoordinatorId;
- IMessage *msg;
-
- elog(DEBUG5, "bg worker [%d/%d]: resetting",
- MyProcPid, MyBackendId);
-
- Assert(MyWorkerInfo->wi_state != WS_IDLE);
- Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
-
- /* reset the worker state */
- MyWorkerInfo->wi_state = WS_IDLE;
- set_ps_display("bg worker: idle", false);
-
- /* clean up memory contexts */
- Assert(BgWorkerMemCxt);
- MemoryContextSwitchTo(TopMemoryContext);
- MemoryContextDelete(BgWorkerMemCxt);
- BgWorkerMemCxt = NULL;
- MemoryContextDelete(MessageContext);
- MessageContext = NULL;
-
- /* Reset the process-local cleanup handler state. */
- BgWorkerCleanupInProgress = false;
-
- /* propagate as idle worker, inform the coordinator */
- LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
- add_as_idle_worker(MyDatabaseId, false);
- LWLockRelease(WorkerInfoLock);
-
- CoordinatorId = GetCoordinatorId();
- if (CoordinatorId != InvalidBackendId)
- {
- msg = IMessageCreate(IMSGT_READY, 0);
- IMessageActivate(msg, CoordinatorId);
- }
- else
- elog(WARNING, "bg worker [%d/%d]: no coordinator?!?",
- MyProcPid, MyBackendId);
- }
-
- void
- bgworker_job_failed(int errcode)
- {
- TransactionId xid;
-
- xid = GetTopTransactionIdIfAny();
-
- #ifdef COORDINATOR_DEBUG
- ereport(DEBUG3,
- (errmsg("bg worker [%d/%d]: job failed (xid: %d)",
- MyProcPid, MyBackendId, xid)));
- #endif
-
- /*
- * Abort any transaction that might still be running and tell the
- * coordinator that we are ready to process the next background job.
- */
- AbortOutOfAnyTransaction();
-
- /*
- * Flush the error state.
- */
- FlushErrorState();
-
- /*
- * Make sure pgstat also considers our stat data as gone.
- */
- pgstat_clear_snapshot();
-
- Assert(!IMessageCheck());
- }
-
- /*
- * BackgroundWorkerMain
- */
- NON_EXEC_STATIC void
- BackgroundWorkerMain(int argc, char *argv[])
- {
- sigjmp_buf local_sigjmp_buf;
- BackendId coordinator_id;
- IMessage *msg;
- Oid dbid;
- char dbname[NAMEDATALEN];
- bool terminate_worker = false;
-
- /* we are a postmaster subprocess now */
- IsUnderPostmaster = true;
- am_background_worker = true;
-
- /* reset MyProcPid */
- MyProcPid = getpid();
-
- /* record Start Time for logging */
- MyStartTime = time(NULL);
-
- /* Identify myself via ps */
- init_ps_display("background worker process", "", "", "");
-
- SetProcessingMode(InitProcessing);
-
- /*
- * If possible, make this process a group leader, so that the postmaster
- * can signal any child processes too. (autovacuum probably never has any
- * child processes, but for consistency we make all postmaster child
- * processes do this.)
- */
- #ifdef HAVE_SETSID
- if (setsid() < 0)
- elog(FATAL, "setsid() failed: %m");
- #endif
-
- /*
- * Set up signal handlers. We operate on databases much like a regular
- * backend, so we use the same signal handling. See equivalent code in
- * tcop/postgres.c.
- *
- * Currently, we don't pay attention to postgresql.conf changes that
- * happen during a single daemon iteration, so we can ignore SIGHUP.
- */
- pqsignal(SIGHUP, SIG_IGN);
-
- /*
- * SIGINT is used to signal cancelling the current table's vacuum; SIGTERM
- * means abort and exit cleanly, and SIGQUIT means abandon ship.
- */
- pqsignal(SIGINT, StatementCancelHandler);
- pqsignal(SIGTERM, die);
- pqsignal(SIGQUIT, quickdie);
- pqsignal(SIGALRM, handle_sig_alarm);
-
- pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGUSR1, procsignal_sigusr1_handler);
- pqsignal(SIGUSR2, SIG_IGN);
- pqsignal(SIGFPE, FloatExceptionHandler);
- pqsignal(SIGCHLD, SIG_DFL);
-
- /* Early initialization */
- BaseInit();
-
- /*
- * Create a per-backend PGPROC struct in shared memory, except in the
- * EXEC_BACKEND case where this was done in SubPostmasterMain. We must do
- * this before we can use LWLocks (and in the EXEC_BACKEND case we already
- * had to do some stuff with LWLocks).
- */
- #ifndef EXEC_BACKEND
- InitProcess();
- #endif
-
- /*
- * If an exception is encountered, processing resumes here.
- *
- * See notes in postgres.c about the design of this coding.
- */
- if (sigsetjmp(local_sigjmp_buf, 1) != 0)
- {
- /* Prevents interrupts while cleaning up */
- HOLD_INTERRUPTS();
-
- /* Report the error to the server log */
- EmitErrorReport();
-
- /*
- * We can now go away. Note that because we called InitProcess, a
- * callback was registered to do ProcKill, which will clean up
- * necessary state.
- */
- proc_exit(0);
- }
-
- /* We can now handle ereport(ERROR) */
- PG_exception_stack = &local_sigjmp_buf;
-
- PG_SETMASK(&UnBlockSig);
-
- /*
- * Force zero_damaged_pages OFF in the background worker, even if it is
- * set in postgresql.conf. We don't really want such a dangerous option
- * being applied non-interactively.
- */
- SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
-
- /*
- * Force statement_timeout to zero to avoid a timeout setting from
- * preventing regular maintenance from being executed.
- */
- SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
-
- /*
- * Get the info about the database we're going to work on.
- */
- LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
-
- /*
- * beware of startingWorker being INVALID; this should normally not
- * happen, but if a worker fails after forking and before this, the
- * launcher might have decided to remove it from the queue and start
- * again.
- */
- if (CoordinatorShmem->co_startingWorker == NULL)
- {
- /* no worker entry for me, go away */
- elog(WARNING, "background worker started without a worker entry");
- LWLockRelease(WorkerInfoLock);
- proc_exit(0);
- }
-
- MyWorkerInfo = CoordinatorShmem->co_startingWorker;
- dbid = MyWorkerInfo->wi_dboid;
-
- /* FIXME: indentation */
- {
-
- /*
- * remove from the "starting" pointer, so that the launcher can start
- * a new worker if required
- */
- CoordinatorShmem->co_startingWorker = NULL;
-
- coordinator_id = CoordinatorShmem->co_coordinatorid;
- LWLockRelease(WorkerInfoLock);
-
- on_shmem_exit(FreeWorkerInfo, 0);
-
- /*
- * Report autovac startup to the stats collector. We deliberately do
- * this before InitPostgres, so that the last_autovac_time will get
- * updated even if the connection attempt fails. This is to prevent
- * autovac from getting "stuck" repeatedly selecting an unopenable
- * database, rather than making any progress on stuff it can connect
- * to.
- */
- pgstat_report_autovac(dbid);
-
- /*
- * Connect to the selected database
- *
- * Note: if we have selected a just-deleted database (due to using
- * stale stats info), we'll fail and exit here.
- */
- InitPostgres(NULL, dbid, NULL, dbname);
- SetProcessingMode(NormalProcessing);
- set_ps_display("bg worker: idle", false);
- }
-
- BgWorkerMemCxt = NULL;
-
- MyWorkerInfo->wi_backend_id = MyBackendId;
- MyWorkerInfo->wi_state = WS_IDLE;
-
- #ifdef COORDINATOR_DEBUG
- elog(DEBUG3, "bg worker [%d/%d]: connected to database %d",
- MyProcPid, MyBackendId, dbid);
- #endif
-
- /*
- * Add as an idle worker and notify the coordinator only *after* having
- * set MyProc->databaseId in InitPostgres, so the coordinator can
- * determine which database we are connected to.
- */
- LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
- add_as_idle_worker(dbid, true);
- LWLockRelease(WorkerInfoLock);
-
- /* register with the coordinator */
- if (coordinator_id != InvalidBackendId)
- {
- msg = IMessageCreate(IMSGT_REGISTER_WORKER, 0);
- IMessageActivate(msg, coordinator_id);
- }
-
- if (PostAuthDelay)
- pg_usleep(PostAuthDelay * 1000000L);
-
- while (!terminate_worker)
- {
- PG_TRY();
- {
- /* FIXME: indentation */
-
- CHECK_FOR_INTERRUPTS();
-
- ImmediateInterruptOK = true;
- pg_usleep(1000000L);
- ImmediateInterruptOK = false;
-
- /*
- * FIXME: check different ways of terminating a background worker
- * via ProcDiePending. How about postmaster initiated
- * restarts?
- */
- if (ProcDiePending)
- elog(FATAL, "bg worker [%d/%d]: Terminated via ProcDie",
- MyProcPid, MyBackendId);
-
- while ((msg = IMessageCheck()) && !terminate_worker)
- {
- #ifdef COORDINATOR_DEBUG
- ereport(DEBUG3,
- (errmsg("bg worker [%d/%d]: received message %s of size %d "
- "from backend id %d, db %d",
- MyProcPid, MyBackendId,
- decode_imessage_type(msg->type),
- msg->size, msg->sender,
- MyDatabaseId)));
- #endif
-
- switch (msg->type)
- {
- case IMSGT_TERM_WORKER:
- IMessageRemove(msg);
- terminate_worker = true;
- break;
-
- case IMSGT_PERFORM_VACUUM:
- /* immediately remove the message to free shared memory */
- IMessageRemove(msg);
-
- bgworker_job_initialize(WS_AUTOVACUUM);
-
- /*
- * Add ourselves to the list of runningWorkers
- */
- LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
- SHMQueueInsertBefore(&CoordinatorShmem->co_runningWorkers,
- &MyWorkerInfo->wi_links);
- LWLockRelease(WorkerInfoLock);
-
- /* do an appropriate amount of work */
- do_autovacuum();
-
- /*
- * Remove ourselves from the list of runningWorkers and
- * mark as available background worker.
- */
- LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
- SHMQueueDelete(&MyWorkerInfo->wi_links);
- LWLockRelease(WorkerInfoLock);
-
- bgworker_job_completed();
- break;
-
- default:
- /* keep shared memory clean */
- IMessageRemove(msg);
-
- ereport(WARNING,
- (errmsg("bg worker [%d]: invalid message type "
- "'%c' ignored",
- MyProcPid, msg->type)));
- }
-
- CHECK_FOR_INTERRUPTS();
- }
-
- }
- PG_CATCH();
- {
- ErrorData *errdata;
- MemoryContext ecxt;
-
- ecxt = MemoryContextSwitchTo(BgWorkerMemCxt);
- errdata = CopyErrorData();
-
- elog(WARNING, "bg worker [%d/%d]: caught error '%s' in %s:%d, state %s",
- MyProcPid, MyBackendId, errdata->message,
- errdata->filename, errdata->lineno,
- decode_worker_state(MyWorkerInfo->wi_state));
-
- /*
- * Inform the coordinator about the failure.
- */
- bgworker_job_failed(errdata->sqlerrcode);
-
- if (errdata->sqlerrcode == ERRCODE_QUERY_CANCELED)
- {
- #ifdef DEBUG_CSET_APPL
- elog(DEBUG3, "bg worker [%d/%d]: cancelled active job.",
- MyProcPid, MyBackendId);
- #endif
-
- bgworker_reset();
- }
- else
- {
- elog(WARNING, "bg worker [%s:%d]: unexpected error %d: '%s'!\n"
- " triggered from %s:%d (in %s)\n",
- __FILE__, __LINE__, errdata->sqlerrcode,
- errdata->message, errdata->filename, errdata->lineno,
- errdata->funcname);
- /* re-throw the error, so the backend quits */
- MemoryContextSwitchTo(ecxt);
- PG_RE_THROW();
- }
- }
- PG_END_TRY();
- }
-
-
- /* All done, go away */
- ereport(DEBUG1, (errmsg("bg worker [%d/%d]: terminating",
- MyProcPid, MyBackendId)));
- proc_exit(0);
- }
-
- /*
- * Return a WorkerInfo to the free list
- */
- static void
- FreeWorkerInfo(int code, Datum arg)
- {
- co_database *codb;
- if (MyWorkerInfo != NULL)
- {
- LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
-
- if (!SHMQueueIsDetached(&MyWorkerInfo->wi_links))
- SHMQueueDelete(&MyWorkerInfo->wi_links);
-
- MyWorkerInfo->wi_links.next = (SHM_QUEUE *) CoordinatorShmem->co_freeWorkers;
- MyWorkerInfo->wi_dboid = InvalidOid;
- MyWorkerInfo->wi_tableoid = InvalidOid;
- MyWorkerInfo->wi_backend_id = InvalidBackendId;
- MyWorkerInfo->wi_launchtime = 0;
- MyWorkerInfo->wi_cost_delay = 0;
- MyWorkerInfo->wi_cost_limit = 0;
- MyWorkerInfo->wi_cost_limit_base = 0;
- CoordinatorShmem->co_freeWorkers = MyWorkerInfo;
- /* not mine anymore */
- MyWorkerInfo = NULL;
-
- /* decrease the conn count */
- LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
- codb = hash_search(co_databases, &MyDatabaseId, HASH_FIND, NULL);
- Assert(codb);
- codb->codb_num_connected_workers--;
- LWLockRelease(CoordinatorDatabasesLock);
-
- LWLockRelease(WorkerInfoLock);
- }
- }
-
- /*
* Update the cost-based delay parameters, so that multiple workers consume
* each a fraction of the total available I/O.
*/
--- 725,735 ----
*************** AutoVacuumUpdateDelay(void)
*** 2550,2556 ****
*
* Caller must hold the WorkerInfoLock in exclusive mode.
*/
! static void
autovac_balance_cost(void)
{
WorkerInfo worker;
--- 749,755 ----
*
* Caller must hold the WorkerInfoLock in exclusive mode.
*/
! void
autovac_balance_cost(void)
{
WorkerInfo worker;
*************** autovac_balance_cost(void)
*** 2624,2692 ****
}
/*
- * get_database_list
- * Return a list of all databases found in pg_database.
- *
- * Note: this is the only function in which the coordinator uses a
- * transaction. Although we aren't attached to any particular database and
- * therefore can't access most catalogs, we do have enough infrastructure
- * to do a seqscan on pg_database.
- */
- static List *
- get_database_list(void)
- {
- List *dblist = NIL;
- Relation rel;
- HeapScanDesc scan;
- HeapTuple tup;
-
- /*
- * Start a transaction so we can access pg_database, and get a snapshot.
- * We don't have a use for the snapshot itself, but we're interested in
- * the secondary effect that it sets RecentGlobalXmin. (This is critical
- * for anything that reads heap pages, because HOT may decide to prune
- * them even if the process doesn't attempt to modify any tuples.)
- */
- StartTransactionCommand();
- (void) GetTransactionSnapshot();
-
- /* Allocate our results in CoordinatorMemCxt, not transaction context */
- MemoryContextSwitchTo(CoordinatorMemCxt);
-
- rel = heap_open(DatabaseRelationId, AccessShareLock);
- scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
-
- while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
- {
- Form_pg_database pgdatabase = (Form_pg_database) GETSTRUCT(tup);
- avw_dbase *avdb;
-
- avdb = (avw_dbase *) palloc(sizeof(avw_dbase));
-
- avdb->adw_datid = HeapTupleGetOid(tup);
- avdb->adw_name = pstrdup(NameStr(pgdatabase->datname));
- avdb->adw_frozenxid = pgdatabase->datfrozenxid;
- /* this gets set later: */
- avdb->adw_entry = NULL;
-
- dblist = lappend(dblist, avdb);
- }
-
- heap_endscan(scan);
- heap_close(rel, AccessShareLock);
-
- CommitTransactionCommand();
-
- return dblist;
- }
-
- /*
* Process a database table-by-table
*
* Note that CHECK_FOR_INTERRUPTS is supposed to be used in certain spots in
* order not to ignore shutdown commands for too long.
*/
! static void
do_autovacuum(void)
{
Relation classRel;
--- 823,834 ----
}
/*
* Process a database table-by-table
*
* Note that CHECK_FOR_INTERRUPTS is supposed to be used in certain spots in
* order not to ignore shutdown commands for too long.
*/
! void
do_autovacuum(void)
{
Relation classRel;
*************** autovac_init(void)
*** 3576,3688 ****
}
/*
- * process identification functions
- * Return whether this is either a coordinator process or a background
- * worker process.
- */
- bool
- IsCoordinatorProcess(void)
- {
- return am_coordinator;
- }
-
- bool
- IsBackgroundWorkerProcess(void)
- {
- return am_background_worker;
- }
-
- /*
- * GetCoordinatorId
- * Returns the backendId of the currently active coordinator process.
- */
- BackendId
- GetCoordinatorId(void)
- {
- BackendId CoordinatorId;
-
- LWLockAcquire(WorkerInfoLock, LW_SHARED);
- CoordinatorId = CoordinatorShmem->co_coordinatorid;
- LWLockRelease(WorkerInfoLock);
-
- return CoordinatorId;
- }
-
- /*
- * CoordinatorShmemSize
- * Compute space needed for autovacuum-related shared memory
- */
- Size
- CoordinatorShmemSize(void)
- {
- Size size;
-
- /*
- * Need the fixed struct and the array of WorkerInfoData, plus per
- * database entries in a hash. As we only track databases which have at
- * least one worker attached, we won't ever need more than
- * max_background_workers entries.
- */
- size = sizeof(CoordinatorShmemStruct);
- size = MAXALIGN(size);
- size = add_size(size, mul_size(max_background_workers,
- sizeof(WorkerInfoData)));
- size = add_size(size, hash_estimate_size(max_background_workers,
- sizeof(co_database)));
- return size;
- }
-
- /*
- * CoordinatorShmemInit
- * Allocate and initialize autovacuum-related shared memory
- */
- void
- CoordinatorShmemInit(void)
- {
- HASHCTL hctl;
- bool found;
-
- CoordinatorShmem = (CoordinatorShmemStruct *)
- ShmemInitStruct("Background Worker Data",
- CoordinatorShmemSize(),
- &found);
-
- if (!IsUnderPostmaster)
- {
- WorkerInfo worker;
- int i;
-
- Assert(!found);
-
- CoordinatorShmem->co_coordinatorid = InvalidBackendId;
- CoordinatorShmem->co_freeWorkers = NULL;
- SHMQueueInit(&CoordinatorShmem->co_runningWorkers);
- CoordinatorShmem->co_startingWorker = NULL;
-
- worker = (WorkerInfo) ((char *) CoordinatorShmem +
- MAXALIGN(sizeof(CoordinatorShmemStruct)));
-
- /* initialize the WorkerInfo free list */
- for (i = 0; i < max_background_workers; i++)
- {
- worker[i].wi_links.next = (SHM_QUEUE *) CoordinatorShmem->co_freeWorkers;
- CoordinatorShmem->co_freeWorkers = &worker[i];
- }
- }
- else
- Assert(found);
-
- hctl.keysize = sizeof(Oid);
- hctl.entrysize = sizeof(co_database);
- hctl.hash = oid_hash;
- co_databases = ShmemInitHash("Coordinator Database Info",
- max_background_workers,
- max_background_workers,
- &hctl,
- HASH_ELEM | HASH_FUNCTION);
- }
-
- /*
* autovac_refresh_stats
* Refresh pgstats data for an autovacuum process
*
--- 1718,1723 ----
============================================================
*** src/include/postmaster/autovacuum.h dc90d7a23ab19ae8e9f5fb9815469058bce97f31
--- src/include/postmaster/autovacuum.h fdfbead934ecaafdcfbed3908d62cd76853b9cc1
***************
*** 1,7 ****
/*-------------------------------------------------------------------------
*
* autovacuum.h
! * header file for integrated autovacuum daemon
*
*
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
--- 1,7 ----
/*-------------------------------------------------------------------------
*
* autovacuum.h
! * header file for integrated autovacuum feature
*
*
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
***************
*** 17,96 ****
#include "postgres.h"
#include "pgstat.h"
#include "lib/dllist.h"
- #include "storage/imsg.h"
#include "storage/lock.h"
- /*
- * Valid backend states for background workers.
- */
- typedef enum
- {
- WS_IDLE = 'I',
-
- WS_AUTOVACUUM = 'V',
-
- } worker_state;
-
- #define IsIdleWorker(wi) (IsBackgroundWorkerProcess() && (wi->wi_state == WS_IDLE))
- #define IsAutoVacuumWorker(wi) (IsBackgroundWorkerProcess() && (wi->wi_state == WS_AUTOVACUUM))
-
-
- /*-------------
- * This struct holds information about a single worker's whereabouts. We keep
- * an array of these in shared memory, sized according to
- * max_background_workers.
- *
- * wi_links entry into free list or running list
- * wi_dboid OID of the database this worker is supposed to work on
- * wi_backend_id id of the running worker backend, NULL if not started
- * wi_launchtime Time at which this worker was launched
- *
- * wi_tableoid OID of the table currently being vacuumed
- * wi_cost_* Vacuum cost-based delay parameters current in this worker
- *
- * All fields are protected by WorkerInfoLock, except for wi_tableoid which is
- * protected by WorkerScheduleLock (which is read-only for everyone except
- * that worker itself).
- *-------------
- */
- typedef struct WorkerInfoData
- {
- SHM_QUEUE wi_links;
- Oid wi_dboid;
- BackendId wi_backend_id;
- TimestampTz wi_launchtime;
- worker_state wi_state;
-
- /* autovacuum specific fields */
- Oid wi_tableoid;
- int wi_cost_delay;
- int wi_cost_limit;
- int wi_cost_limit_base;
- } WorkerInfoData;
-
- typedef struct WorkerInfoData *WorkerInfo;
-
- /* struct to keep track of databases in the coordinator */
- typedef struct co_database
- {
- Oid codb_dboid;
-
- /* for internal use by the coordinator */
- int codb_num_cached_jobs;
- Dllist codb_cached_jobs;
-
- /* tracking of idle workers */
- int codb_num_idle_workers;
- SHM_QUEUE codb_idle_workers;
-
- int codb_num_connected_workers;
- } co_database;
-
/* GUC variables */
extern bool autovacuum_enabled;
- extern int max_background_workers;
- extern int min_spare_background_workers;
- extern int max_spare_background_workers;
extern int autovacuum_naptime;
extern int autovacuum_vac_thresh;
extern double autovacuum_vac_scale;
--- 17,26 ----
*************** extern int autovacuum_vac_cost_limit;
*** 99,135 ****
extern int autovacuum_freeze_max_age;
extern int autovacuum_vac_cost_delay;
extern int autovacuum_vac_cost_limit;
-
extern int Log_autovacuum_min_duration;
- extern char *decode_worker_state(worker_state state);
-
- /* Status inquiry functions */
- extern bool IsCoordinatorProcess(void);
- extern bool IsBackgroundWorkerProcess(void);
- extern BackendId GetCoordinatorId(void);
-
/* Functions to start autovacuum process, called from postmaster */
extern void autovac_init(void);
- extern int StartCoordinator(void);
- extern int StartBackgroundWorker(void);
/* autovacuum cost-delay balancer */
extern void AutoVacuumUpdateDelay(void);
- #ifdef EXEC_BACKEND
- extern void CoordinatorMain(int argc, char *argv[]);
- extern void BackgroundWorkerMain(int argc, char *argv[]);
- extern void BackgroundWorkerIAm(void);
- extern void CoordinatorIAm(void);
- #endif
-
- /* shared memory stuff */
- extern Size CoordinatorShmemSize(void);
- extern void CoordinatorShmemInit(void);
-
- /* bgworker job management functions */
- extern void bgworker_job_failed(int errcode);
- extern void bgworker_reset(void);
-
#endif /* AUTOVACUUM_H */
--- 29,52 ----
extern int autovacuum_freeze_max_age;
extern int autovacuum_vac_cost_delay;
extern int autovacuum_vac_cost_limit;
extern int Log_autovacuum_min_duration;
/* Functions to start autovacuum process, called from postmaster */
extern void autovac_init(void);
+ /* Functions called from the coordinator */
+ extern void autovacuum_maybe_trigger_job(TimestampTz current_time,
+ bool can_launch);
+ extern void coordinator_determine_sleep(bool canlaunch, bool recursing,
+ struct timespec *nap);
+ extern void autovacuum_update_timing(Oid dbid, TimestampTz now);
+ extern void autovacuum_check_timings(void);
+ extern void rebuild_database_list(Oid newdb);
+ extern void do_autovacuum(void);
+ extern void autovac_balance_cost(void);
+ extern Oid autovacuum_select_database(void);
+
/* autovacuum cost-delay balancer */
extern void AutoVacuumUpdateDelay(void);
#endif /* AUTOVACUUM_H */
============================================================
*** src/backend/access/gin/ginvacuum.c a5ee1da6e8a01eaa4e7df35f77e1b3696de99f4b
--- src/backend/access/gin/ginvacuum.c 9797e5672e4c59ddfdc9f6b6b5a141c0eeacce09
***************
*** 19,25 ****
#include "catalog/storage.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
! #include "postmaster/autovacuum.h"
#include "storage/bufmgr.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
--- 19,25 ----
#include "catalog/storage.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
! #include "postmaster/coordinator.h"
#include "storage/bufmgr.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
============================================================
*** /dev/null
--- src/backend/postmaster/coordinator.c c12c70fed2f6025831ef4ba9555af0debe063003
***************
*** 1,0 ****
--- 1,1993 ----
+ /*-------------------------------------------------------------------------
+ *
+ * coordinator.c
+ *
+ * PostgreSQL Coordinator of Background Worker
+ *
+ * The background worker system is structured in two different kinds of
+ * processes: the coordinator and the background workers. The coordinator
+ * is an always-running process, started by the postmaster. It schedules
+ * background workers to be started when appropriate. The workers are the
+ * processes which execute the actual job, for example vacuuming; they
+ * connect to a database as requested by the coordinator, and once connected
+ * register with the coordinator to receive background jobs to process for
+ * that database.
+ *
+ * The coordinator cannot start the worker processes by itself, because doing
+ * so would cause robustness issues (namely, failure to shut them down on
+ * exceptional conditions, and also, since the coordinator is connected to
+ * shared memory and is thus subject to corruption there, it is not as robust
+ * as the postmaster). So it leaves that task to the postmaster.
+ *
+ * There is a shared memory area for the coordinator, where it stores state
+ * information about the background workers. When it wants a new worker to
+ * start, it sets a flag in shared memory and sends a signal to the
+ * postmaster. The postmaster then knows nothing more than that it must start
+ * a worker; so it forks a new child, which turns into a background worker.
+ * This new process connects to shared memory, and there it can inspect the
+ * information that the coordinator has set up, including what database to
+ * connect to.
+ *
+ * If the fork() call fails in the postmaster, it sets a flag in the shared
+ * memory area, and sends a signal to the coordinator. The coordinator, upon
+ * noticing the flag, can try again to start a worker for the same database
+ * by resending the signal. Note that the failure can only be transient (fork
+ * failure due to high load, memory pressure, too many processes, etc); more
+ * permanent problems, like failure to connect to a database, are detected
+ * later in the worker and dealt with just by having the background worker
+ * exit normally. The coordinator will launch a new worker again later, per
+ * schedule. (FIXME: that might be fine for VACUUM, but not for replication).
+ *
+ * When a background worker is done with a job it sends an IMSGT_READY to the
+ * coordinator, signaling that it is ready to process another job for the
+ * database it is connected to.
+ *
+ * Note that there can be more than one background worker in a database
+ * concurrently. The coordinator takes care of enforcing a lower and an
+ * upper limit of spare workers (i.e. workers that are still connected to a
+ * database, but are waiting for their next job to process).
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
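
To make the message flow described above concrete, here is a minimal sketch
(illustration only, not part of the patch) of the worker-side handshake, using
only calls that appear in this patch (GetCoordinatorId, IMessageCreate,
IMessageActivate) and the IMSGT_REGISTER_WORKER / IMSGT_READY message types:

    /*
     * Sketch: a worker announces itself once connected and reports
     * readiness after each job; the coordinator answers by activating
     * an IMSGT_PERFORM_VACUUM (or IMSGT_TERM_WORKER) message for it.
     */
    static void
    worker_handshake_sketch(void)
    {
        BackendId   coordinator = GetCoordinatorId();
        IMessage   *msg;

        if (coordinator == InvalidBackendId)
            return;                 /* no coordinator running */

        /* after connecting to a database: register with the coordinator */
        msg = IMessageCreate(IMSGT_REGISTER_WORKER, 0);
        IMessageActivate(msg, coordinator);

        /* ... wait for a job message and process it, e.g. do_autovacuum() ... */

        /* once the job is done: report as idle again */
        msg = IMessageCreate(IMSGT_READY, 0);
        IMessageActivate(msg, coordinator);
    }
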
+ #include "postgres.h"
+
+ #include <signal.h>
+ #include <sys/types.h>
+ #include <sys/time.h>
+ #include <time.h>
+ #include <unistd.h>
+
+ #include "access/heapam.h"
+ #include "access/reloptions.h"
+ #include "access/transam.h"
+ #include "access/xact.h"
+ #include "catalog/dependency.h"
+ #include "catalog/namespace.h"
+ #include "catalog/pg_database.h"
+ #include "commands/dbcommands.h"
+ #include "commands/vacuum.h"
+ #include "libpq/pqsignal.h"
+ #include "miscadmin.h"
+ #include "pgstat.h"
+ #include "postmaster/autovacuum.h"
+ #include "postmaster/coordinator.h"
+ #include "postmaster/fork_process.h"
+ #include "postmaster/postmaster.h"
+ #include "storage/bufmgr.h"
+ #include "storage/ipc.h"
+ #include "storage/pmsignal.h"
+ #include "storage/proc.h"
+ #include "storage/procarray.h"
+ #include "storage/procsignal.h"
+ #include "storage/sinvaladt.h"
+ #include "tcop/tcopprot.h"
+ #include "utils/fmgroids.h"
+ #include "utils/lsyscache.h"
+ #include "utils/memutils.h"
+ #include "utils/ps_status.h"
+ #include "utils/snapmgr.h"
+ #include "utils/syscache.h"
+ #include "utils/tqual.h"
+
+
+ /*
+ * GUC parameters
+ */
+ int max_background_workers;
+ int min_spare_background_workers;
+ int max_spare_background_workers;
+
+ /* Flags to tell if we are in a coordinator or background worker process */
+ static bool am_coordinator = false;
+ static bool am_background_worker = false;
+
+ /* Flags set by signal handlers */
+ static volatile sig_atomic_t got_SIGHUP = false;
+ static volatile sig_atomic_t got_SIGUSR2 = false;
+ static volatile sig_atomic_t got_SIGTERM = false;
+
+ /* Memory contexts for long-lived data */
+ MemoryContext CoordinatorMemCxt;
+ MemoryContext BgWorkerMemCxt;
+
+ typedef struct cached_job {
+ Dlelem cj_links;
+ IMessage *cj_msg;
+ } cached_job;
+
+ CoordinatorShmemStruct *CoordinatorShmem;
+
+ /*
+ * Table of databases with at least one connected worker, resides in shared
+ * memory, protected by CoordinatorDatabasesLock
+ */
+ HTAB *co_databases = NULL;
+
+ /* the database list in the coordinator, and the context that contains it */
+ Dllist *DatabaseList = NULL;
+ MemoryContext DatabaseListCxt = NULL;
+
+ /* Pointer to my own WorkerInfo, valid on each worker */
+ WorkerInfo MyWorkerInfo = NULL;
+ WorkerInfo terminatable_worker = NULL;
+
+ #ifdef EXEC_BACKEND
+ static pid_t coordinator_forkexec(void);
+ static pid_t bgworker_forkexec(void);
+ #endif
+ NON_EXEC_STATIC void BackgroundWorkerMain(int argc, char *argv[]);
+ static void handle_imessage(IMessage *msg);
+ NON_EXEC_STATIC void CoordinatorMain(int argc, char *argv[]);
+
+ static void init_co_database(co_database *codb);
+ static void populate_co_databases(void);
+
+ static bool can_deliver_cached_job(co_database *codb, IMessage *msg,
+ BackendId *target);
+ static WorkerInfo get_idle_worker(co_database *codb);
+ static void cache_job(IMessage *msg, co_database *codb);
+ static void forward_job(IMessage *msg, co_database *codb,
+ BackendId backend_id);
+ static void process_cached_jobs(co_database *codb);
+
+ static void manage_workers(bool can_launch);
+
+ static void do_start_worker(Oid dboid);
+
+ static void add_as_idle_worker(Oid dbid, bool inc_connected_count);
+ static void FreeWorkerInfo(int code, Datum arg);
+
+ static void avl_sighup_handler(SIGNAL_ARGS);
+ static void avl_sigusr2_handler(SIGNAL_ARGS);
+ static void avl_sigterm_handler(SIGNAL_ARGS);
+
+
+ char *
+ decode_worker_state(worker_state state)
+ {
+ switch (state)
+ {
+ case WS_IDLE: return "WS_IDLE";
+ case WS_AUTOVACUUM: return "WS_AUTOVACUUM";
+
+ default: return "UNKNOWN STATE";
+ }
+ }
+
+
+ /********************************************************************
+ * COORDINATOR CODE
+ ********************************************************************/
+
+ #ifdef EXEC_BACKEND
+ /*
+ * forkexec routine for the coordinator process.
+ *
+ * Format up the arglist, then fork and exec.
+ */
+ static pid_t
+ coordinator_forkexec(void)
+ {
+ char *av[10];
+ int ac = 0;
+
+ av[ac++] = "postgres";
+ av[ac++] = "--forkcoordinator";
+ av[ac++] = NULL; /* filled in by postmaster_forkexec */
+ av[ac] = NULL;
+
+ Assert(ac < lengthof(av));
+
+ return postmaster_forkexec(ac, av);
+ }
+
+ /*
+ * We need this set from the outside, before InitProcess is called
+ */
+ void
+ CoordinatorIAm(void)
+ {
+ am_coordinator = true;
+ }
+ #endif
+
+ /*
+ * Main entry point for coordinator process, to be called from the postmaster.
+ */
+ int
+ StartCoordinator(void)
+ {
+ pid_t CoordinatorPID;
+
+ #ifdef EXEC_BACKEND
+ switch ((CoordinatorPID = coordinator_forkexec()))
+ #else
+ switch ((CoordinatorPID = fork_process()))
+ #endif
+ {
+ case -1:
+ ereport(LOG,
+ (errmsg("could not fork the coordinator process: %m")));
+ return 0;
+
+ #ifndef EXEC_BACKEND
+ case 0:
+ /* in postmaster child ... */
+ /* Close the postmaster's sockets */
+ ClosePostmasterPorts(false);
+
+ /* Lose the postmaster's on-exit routines */
+ on_exit_reset();
+
+ CoordinatorMain(0, NULL);
+ break;
+ #endif
+ default:
+ return (int) CoordinatorPID;
+ }
+
+ /* shouldn't get here */
+ return 0;
+ }
+
+ static void
+ init_co_database(co_database *codb)
+ {
+ Assert(ShmemAddrIsValid(codb));
+ SHMQueueInit(&codb->codb_idle_workers);
+ codb->codb_num_idle_workers = 0;
+
+ /*
+ * Only the coordinator may fiddle with the cached jobs list, as its entries
+ * reside in that process's local memory. It is nonetheless safe for us to
+ * set the counters to 0 and initialize the list headers with NULL values
+ * using DLInitList().
+ */
+ codb->codb_num_cached_jobs = 0;
+ DLInitList(&codb->codb_cached_jobs);
+
+ codb->codb_num_connected_workers = 0;
+ }
+
+ static void
+ cache_job(IMessage *msg, co_database *codb)
+ {
+ cached_job *job;
+
+ #ifdef COORDINATOR_DEBUG
+ elog(DEBUG5, "Coordinator: caching job of type %s for database %d",
+ decode_imessage_type(msg->type), codb->codb_dboid);
+ #endif
+
+ job = palloc(sizeof(cached_job));
+ DLInitElem(&job->cj_links, job);
+ job->cj_msg = msg;
+ DLAddTail(&codb->codb_cached_jobs, &job->cj_links);
+ codb->codb_num_cached_jobs++;
+ }
+
+ /*
+ * get_idle_worker
+ *
+ * Returns the first idle worker for a given database, removing it from its
+ * list of idle workers. The caller is expected to make sure that there is
+ * at least one idle worker and it must hold the CoordinatorDatabasesLock.
+ */
+ static WorkerInfo
+ get_idle_worker(co_database *codb)
+ {
+ WorkerInfo worker;
+
+ /* remove a worker from the list of idle workers */
+ worker = (WorkerInfo) SHMQueueNext(&codb->codb_idle_workers,
+ &codb->codb_idle_workers,
+ offsetof(WorkerInfoData, wi_links));
+ Assert(worker);
+ SHMQueueDelete(&worker->wi_links);
+ Assert(worker->wi_backend_id != InvalidBackendId);
+
+ /* maintain per-database counter */
+ codb->codb_num_idle_workers--;
+
+ return worker;
+ }
+
+ /*
+ * forward_job
+ *
+ * Takes an imessage and forwards it to the first idle backend for the given
+ * database as its next job to process. The caller must hold the
+ * CoordinatorDatabasesLock.
+ */
+ static void
+ forward_job(IMessage *msg, co_database *codb, BackendId backend_id)
+ {
+ /* various actions before job delivery depending on the message type */
+ switch (msg->type)
+ {
+ case IMSGT_TERM_WORKER:
+ break;
+
+ case IMSGT_PERFORM_VACUUM:
+ #ifdef COORDINATOR_DEBUG
+ elog(DEBUG1, "Coordinator: delivering msg %s of size %d for "
+ "database %d to backend %d",
+ decode_imessage_type(msg->type), msg->size, codb->codb_dboid,
+ backend_id);
+ #endif
+ autovacuum_update_timing(codb->codb_dboid, GetCurrentTimestamp());
+ break;
+
+ default:
+ /* no-op */
+ break;
+ }
+
+ IMessageActivate(msg, backend_id);
+ }
+
+ /*
+ * dispatch_job
+ *
+ * Depending on the status of idle workers, this either forwards a job to an
+ * idle worker directly or caches it for later processing. The caller must
+ * hold the CoordinatorDatabasesLock.
+ */
+ void
+ dispatch_job(IMessage *msg, co_database *codb)
+ {
+ WorkerInfo worker;
+
+ if (codb->codb_num_idle_workers > 0)
+ {
+ worker = get_idle_worker(codb);
+ forward_job(msg, codb, worker->wi_backend_id);
+ }
+ else
+ cache_job(msg, codb);
+ }
+
+ /*
+ * process_cached_jobs
+ *
+ * Dispatches cached jobs to idle background workers, as long as there are
+ * some of both. Before delivering a job, an additional check is performed with
+ * can_deliver_cached_job(), which also chooses the background worker to run
+ * the job on.
+ */
+ static void
+ process_cached_jobs(co_database *codb)
+ {
+ BackendId target;
+ cached_job *job;
+
+ #ifdef COORDINATOR_DEBUG
+ elog(DEBUG5, "Coordinator: cached jobs: %d, idle workers: %d",
+ codb->codb_num_cached_jobs, codb->codb_num_idle_workers);
+ #endif
+
+ job = (cached_job*) DLGetHead(&codb->codb_cached_jobs);
+ while ((codb->codb_num_cached_jobs > 0) &&
+ (codb->codb_num_idle_workers > 0) &&
+ (job != NULL))
+ {
+ target = InvalidBackendId;
+ if (can_deliver_cached_job(codb, job->cj_msg, &target))
+ {
+ /* remove the job from the cache */
+ DLRemove(&job->cj_links);
+ codb->codb_num_cached_jobs--;
+
+ /* forward the job to some idle worker and cleanup */
+ if (target == InvalidBackendId)
+ target = get_idle_worker(codb)->wi_backend_id;
+
+ forward_job(job->cj_msg, codb, target);
+ pfree(job);
+
+ job = (cached_job*) DLGetHead(&codb->codb_cached_jobs);
+ }
+ else
+ job = (cached_job*) DLGetSucc(&job->cj_links);
+ }
+ }
+
+ /*
+ * populate_co_databases
+ *
+ * Called at startup of the coordinator to scan pg_database. Schedules an
+ * initial VACUUM job on the template database to populate pg_stat.
+ */
+ static void
+ populate_co_databases()
+ {
+ List *dblist;
+ ListCell *cell;
+ IMessage *msg;
+
+ dblist = get_database_list();
+ LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
+ foreach(cell, dblist)
+ {
+ avw_dbase *avdb = lfirst(cell);
+ co_database *codb = get_co_database(avdb->adw_datid);
+ if (codb->codb_dboid == TemplateDbOid)
+ {
+ /*
+ * Create a cached job as an imessage to ourselves, but without
+ * activating it. It can get forwarded to a backend later on.
+ */
+ msg = IMessageCreate(IMSGT_PERFORM_VACUUM, 0);
+ cache_job(msg, codb);
+ }
+ }
+ LWLockRelease(CoordinatorDatabasesLock);
+ }
+
+ /*
+ * Main loop for the coordinator process.
+ */
+ NON_EXEC_STATIC void
+ CoordinatorMain(int argc, char *argv[])
+ {
+ sigjmp_buf local_sigjmp_buf;
+ IMessage *msg = NULL;
+ bool can_launch;
+
+ /* we are a postmaster subprocess now */
+ IsUnderPostmaster = true;
+ am_coordinator = true;
+
+ /* reset MyProcPid */
+ MyProcPid = getpid();
+
+ /* record Start Time for logging */
+ MyStartTime = time(NULL);
+
+ /* Identify myself via ps */
+ init_ps_display("coordinator process", "", "", "");
+
+ ereport(LOG,
+ (errmsg("coordinator started")));
+
+ if (PostAuthDelay)
+ pg_usleep(PostAuthDelay * 1000000L);
+
+ SetProcessingMode(InitProcessing);
+
+ /*
+ * If possible, make this process a group leader, so that the postmaster
+ * can signal any child processes too. (coordinator probably never has
+ * any child processes, but for consistency we make all postmaster child
+ * processes do this.)
+ */
+ #ifdef HAVE_SETSID
+ if (setsid() < 0)
+ elog(FATAL, "setsid() failed: %m");
+ #endif
+
+ /*
+ * Set up signal handlers. We operate on databases much like a regular
+ * backend, so we use the same signal handling. See equivalent code in
+ * tcop/postgres.c.
+ */
+ pqsignal(SIGHUP, avl_sighup_handler);
+ pqsignal(SIGINT, StatementCancelHandler);
+ pqsignal(SIGTERM, avl_sigterm_handler);
+
+ pqsignal(SIGQUIT, quickdie);
+ pqsignal(SIGALRM, handle_sig_alarm);
+
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, avl_sigusr2_handler);
+ pqsignal(SIGFPE, FloatExceptionHandler);
+ pqsignal(SIGCHLD, SIG_DFL);
+
+ /* Early initialization */
+ BaseInit();
+
+ /*
+ * Create a per-backend PGPROC struct in shared memory, except in the
+ * EXEC_BACKEND case where this was done in SubPostmasterMain. We must do
+ * this before we can use LWLocks (and in the EXEC_BACKEND case we already
+ * had to do some stuff with LWLocks).
+ */
+ #ifndef EXEC_BACKEND
+ InitProcess();
+ #endif
+
+ InitPostgres(NULL, InvalidOid, NULL, NULL);
+
+ SetProcessingMode(NormalProcessing);
+
+ /*
+ * Create a memory context that we will do all our work in. We do this so
+ * that we can reset the context during error recovery and thereby avoid
+ * possible memory leaks.
+ */
+ CoordinatorMemCxt = AllocSetContextCreate(TopMemoryContext,
+ "Coordinator",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ MemoryContextSwitchTo(CoordinatorMemCxt);
+
+ /*
+ * If an exception is encountered, processing resumes here.
+ *
+ * This code is a stripped down version of PostgresMain error recovery.
+ */
+ if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+ {
+ /* since not using PG_TRY, must reset error stack by hand */
+ error_context_stack = NULL;
+
+ /* Prevents interrupts while cleaning up */
+ HOLD_INTERRUPTS();
+
+ /* Forget any pending QueryCancel request */
+ QueryCancelPending = false;
+ disable_sig_alarm(true);
+ QueryCancelPending = false; /* again in case timeout occurred */
+
+ /* Report the error to the server log */
+ EmitErrorReport();
+
+ /* Abort the current transaction in order to recover */
+ AbortOutOfAnyTransaction();
+
+ /*
+ * Now return to normal top-level context and clear ErrorContext for
+ * next time.
+ */
+ MemoryContextSwitchTo(CoordinatorMemCxt);
+ FlushErrorState();
+
+ /* Flush any leaked data in the top-level context */
+ MemoryContextResetAndDeleteChildren(CoordinatorMemCxt);
+
+ /* don't leave dangling pointers to freed memory */
+ DatabaseListCxt = NULL;
+ DatabaseList = NULL;
+
+ /*
+ * Make sure pgstat also considers our stat data as gone. Note: we
+ * mustn't use autovac_refresh_stats here.
+ */
+ pgstat_clear_snapshot();
+
+ /* Now we can allow interrupts again */
+ RESUME_INTERRUPTS();
+
+ /*
+ * Sleep at least 1 second after any error. We don't want to be
+ * filling the error logs as fast as we can.
+ */
+ pg_usleep(1000000L);
+ }
+
+ /* We can now handle ereport(ERROR) */
+ PG_exception_stack = &local_sigjmp_buf;
+
+ /* must unblock signals before calling rebuild_database_list */
+ PG_SETMASK(&UnBlockSig);
+
+ CoordinatorShmem->co_coordinatorid = MyBackendId;
+
+ /*
+ * Initial population of the database list from pg_database
+ */
+ populate_co_databases();
+
+ /*
+ * Create the initial database list. The invariant we want this list to
+ * keep is that it's ordered by decreasing next_time. As soon as an entry
+ * is updated to a higher time, it will be moved to the front (which is
+ * correct because the only operation is to add autovacuum_naptime to the
+ * entry, and time always increases).
+ */
+ rebuild_database_list(InvalidOid);
+
+ for (;;)
+ {
+ TimestampTz current_time;
+ struct timespec nap;
+ sigset_t sigmask, oldmask;
+ fd_set socks;
+ int max_sock_id;
+ bool socket_ready;
+
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (!PostmasterIsAlive(true))
+ proc_exit(1);
+
+ can_launch = (CoordinatorShmem->co_freeWorkers != NULL);
+ coordinator_determine_sleep(can_launch, false, &nap);
+
+ /* Initialize variables for listening on sockets */
+ FD_ZERO(&socks);
+ max_sock_id = 0;
+ socket_ready = false;
+
+ #ifdef COORDINATOR_DEBUG
+ elog(DEBUG1, "Coordinator: listening...");
+ #endif
+
+ /* Allow sinval catchup interrupts while sleeping */
+ EnableCatchupInterrupt();
+
+ /*
+ * Sleep for a while according to schedule - and possibly interrupted
+ * by messages from one of the sockets or by internal messages from
+ * background workers or normal backends.
+ *
+ * Using pselect here prevents the possible loss of a signal in
+ * between the last check for imessages and the following select call.
+ * However, it requires a newish platform that supports pselect.
+ *
+ * On some platforms, signals won't interrupt select. Postgres used
+ * to split the nap time into one second intervals to ensure it reacts
+ * reasonably promptly for autovacuum purposes. However, for
+ * Postgres-R this is not tolerable, so that mechanism has been
+ * removed.
+ *
+ * FIXME: to support these platforms or others that don't implement
+ * pselect properly, another work-around like for example the
+ * self-pipe trick needs to be implemented. On Windows, we
+ * could implement pselect based on the current port's select
+ * method.
+ */
+
+ /* FIXME: indentation */
+ {
+
+ sigemptyset(&sigmask);
+ sigaddset(&sigmask, SIGINT);
+ sigaddset(&sigmask, SIGHUP);
+ sigaddset(&sigmask, SIGUSR2);
+ sigprocmask(SIG_BLOCK, &sigmask, &oldmask);
+
+ sigemptyset(&sigmask);
+
+ if (pselect(max_sock_id + 1, &socks, NULL, NULL, &nap,
+ &sigmask) < 0)
+ {
+ if (errno != EINTR)
+ {
+ elog(WARNING, "Coordinator: pselect failed: %m");
+ socket_ready = true;
+ }
+ }
+ else
+ socket_ready = true;
+
+ sigprocmask(SIG_SETMASK, &oldmask, NULL);
+
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (!PostmasterIsAlive(true))
+ proc_exit(1);
+ }
+
+ DisableCatchupInterrupt();
+
+ /* the normal shutdown case */
+ if (got_SIGTERM)
+ break;
+
+ if (got_SIGHUP)
+ {
+ #ifdef COORDINATOR_DEBUG
+ elog(DEBUG5, "Coordinator: got SIGHUP");
+ #endif
+
+ got_SIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+
+ /* rebalance in case the default cost parameters changed */
+ LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
+ autovac_balance_cost();
+ LWLockRelease(WorkerInfoLock);
+
+ /* rebuild the list in case the naptime changed */
+ rebuild_database_list(InvalidOid);
+ }
+
+ /*
+ * postmaster signalled failure to start a worker
+ */
+ if (got_SIGUSR2)
+ {
+ #ifdef COORDINATOR_DEBUG
+ elog(DEBUG5, "Coordinator: got SIGUSR2");
+ #endif
+
+ got_SIGUSR2 = false;
+
+ /* FIXME: fix indentation after merging */
+ {
+
+ /*
+ * If the postmaster failed to start a new worker, we sleep
+ * for a little while and resend the signal. The new worker's
+ * state is still in memory, so this is sufficient. After
+ * that, we restart the main loop.
+ *
+ * XXX should we put a limit to the number of times we retry?
+ * I don't think it makes much sense, because a future start
+ * of a worker will continue to fail in the same way.
+ *
+ * FIXME: for the autovac launcher, it might have been okay
+ * to just sleep. But the coordinator needs to remain
+ * as responsive as possible, even if the postmaster
+ * is currently unable to fork new workers.
+ */
+ pg_usleep(1000000L); /* 1s */
+ SendPostmasterSignal(PMSIGNAL_START_BGWORKER);
+ continue;
+ }
+ }
+
+ /* handle sockets with pending reads, just a placeholder for now */
+ if (socket_ready)
+ {
+ }
+
+ /* handle pending imessages */
+ while ((msg = IMessageCheck()) != NULL)
+ handle_imessage(msg);
+
+ current_time = GetCurrentTimestamp();
+ can_launch = CoordinatorCanLaunchWorker(current_time);
+
+ /*
+ * Periodically check and trigger autovacuum workers, if autovacuum
+ * is enabled.
+ */
+ if (autovacuum_enabled)
+ autovacuum_maybe_trigger_job(current_time, can_launch);
+
+ manage_workers(can_launch);
+ }
+
+ /* Normal exit from the coordinator is here */
+ ereport(LOG,
+ (errmsg("coordinator shutting down")));
+ CoordinatorShmem->co_coordinatorid = InvalidBackendId;
+
+ proc_exit(0); /* done */
+ }
+
+ static void
+ handle_imessage(IMessage *msg)
+ {
+ BackendId msg_sender;
+ PGPROC *proc;
+ TransactionId local_xid = InvalidTransactionId;
+ co_database *codb = NULL;
+ Oid dboid = InvalidOid;
+
+ /*
+ * Get the PGPROC entry of the sender and the related database info, if
+ * any.
+ */
+ msg_sender = msg->sender;
+
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+ proc = BackendIdGetProc(msg_sender);
+ if (proc)
+ {
+ local_xid = proc->xid;
+ dboid = proc->databaseId;
+ }
+
+ LWLockRelease(ProcArrayLock);
+
+ #ifdef COORDINATOR_DEBUG
+ if (proc)
+ elog(DEBUG3, "Coordinator: received %s of size %d from backend %d\n"
+ "\t(connected to db %d, local xid %d)",
+ decode_imessage_type(msg->type), msg->size, msg_sender,
+ dboid, local_xid);
+ else
+ elog(DEBUG3, "Coordinator: received %s of size %d from backend %d\n"
+ "\t(for which no PGPROC could be found)",
+ decode_imessage_type(msg->type), msg->size, msg_sender);
+ #endif
+
+ switch (msg->type)
+ {
+ /*
+ * Standard messages from background worker processes
+ */
+ case IMSGT_REGISTER_WORKER:
+ case IMSGT_READY:
+ /* consume the message */
+ IMessageRemove(msg);
+
+ LWLockAcquire(CoordinatorDatabasesLock, LW_SHARED);
+ codb = get_co_database(dboid);
+ process_cached_jobs(codb);
+ LWLockRelease(CoordinatorDatabasesLock);
+
+ /*
+ * We trigger a DatabaseList rebuild after a job is done if the list
+ * is still empty. This mainly covers the initialization phase, after
+ * the first background worker has finished vacuuming template1 (and
+ * has thus populated pgstat).
+ */
+ if (DLGetHead(DatabaseList) == NULL)
+ rebuild_database_list(InvalidOid);
+
+ /*
+ * Rebalance cost limits, as the worker has already reported its
+ * startup to the stats collector. However, that needs to be
+ * removed, so there's probably no point in rebalancing here.
+ * So: FIXME.
+ */
+ LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
+ autovac_balance_cost();
+ LWLockRelease(WorkerInfoLock);
+
+ break;
+
+ case IMSGT_FORCE_VACUUM:
+ /* consume the message */
+ IMessageRemove(msg);
+
+ /* trigger an autovacuum worker */
+ dboid = autovacuum_select_database();
+ if (dboid != InvalidOid)
+ {
+ LWLockAcquire(CoordinatorDatabasesLock, LW_SHARED);
+ codb = get_co_database(dboid);
+ msg = IMessageCreate(IMSGT_PERFORM_VACUUM, 0);
+ dispatch_job(msg, codb);
+ LWLockRelease(CoordinatorDatabasesLock);
+ }
+ break;
+
+ default:
+ elog(WARNING, "Coordinator: unknown message type: %c, ignored!",
+ msg->type);
+ IMessageRemove(msg);
+ }
+ }
+
+
+
+ /*
+ * get_co_database
+ *
+ * Gets or creates the per-database coordinator info in shared memory.
+ * Expects the caller to hold the CoordinatorDatabasesLock.
+ */
+ co_database *
+ get_co_database(Oid dboid)
+ {
+ co_database *codb;
+ bool found;
+
+ codb = hash_search(co_databases, &dboid, HASH_ENTER, &found);
+ if (!found)
+ init_co_database(codb);
+
+ return codb;
+ }
+
+ static bool
+ can_deliver_cached_job(co_database *codb, IMessage *msg, BackendId *target)
+ {
+ #ifdef COORDINATOR_DEBUG
+ elog(DEBUG5, "Coordinator: checking deliverability of job type %s",
+ decode_imessage_type(msg->type));
+ #endif
+
+ switch (msg->type)
+ {
+ case IMSGT_TERM_WORKER:
+ case IMSGT_PERFORM_VACUUM:
+ return true;
+
+ default:
+ elog(WARNING, "Coordinator: missing deliverability check for "
+ "message type %s", decode_imessage_type(msg->type));
+ return false;
+ }
+ }
+
+ /*
+ * manage_workers
+ *
+ * Starts background workers for databases which have at least one cached
+ * job or which have fewer than min_spare_background_workers idle workers
+ * connected. Within the same loop, max_spare_background_workers is checked
+ * and a worker is terminated if that limit is exceeded.
+ *
+ * Note that at most one worker can be requested to start or stop per
+ * invocation.
+ */
+ static void
+ manage_workers(bool can_launch)
+ {
+ HASH_SEQ_STATUS hash_status;
+ co_database *codb;
+ Oid launch_dboid = InvalidOid;
+ float max_score = 0.0,
+ score;
+ bool worker_slots_available;
+ int idle_workers_required;
+ int job_workers_required;
+
+ LWLockAcquire(WorkerInfoLock, LW_SHARED);
+ worker_slots_available = (CoordinatorShmem->co_freeWorkers != NULL);
+ LWLockRelease(WorkerInfoLock);
+
+ /*
+ * Terminate an unneeded worker that has been fetched from the list of
+ * idle workers in the previous invocation. We defer sending the signal
+ * by one invocation to make sure the coordinator has had time to handle
+ * all pending messages from that worker. As idle workers don't ever send
+ * messages, we can safely assume there is no pending message from that
+ * worker by now.
+ */
+ if (terminatable_worker != NULL)
+ {
+ IMessage *msg;
+
+ #ifdef COORDINATOR_DEBUG
+ PGPROC *proc = BackendIdGetProc(terminatable_worker->wi_backend_id);
+ if (proc)
+ elog(DEBUG3, "Coordinator: terminating worker [%d/%d].",
+ proc->pid, terminatable_worker->wi_backend_id);
+ else
+ elog(WARNING, "Coordinator: terminating worker (no PGPROC, backend %d).",
+ terminatable_worker->wi_backend_id);
+ #endif
+
+ msg = IMessageCreate(IMSGT_TERM_WORKER, 0);
+ IMessageActivate(msg, terminatable_worker->wi_backend_id);
+
+ terminatable_worker = NULL;
+ }
+
+ #ifdef COORDINATOR_DEBUG
+ elog(DEBUG3, "Coordinator: manage_workers: can_launch: %s, slots_available: %s",
+ (can_launch ? "true" : "false"), (worker_slots_available ? "true" : "false"));
+ #endif
+
+ /*
+ * Check the list of databases and fire the first pending request
+ * we find.
+ */
+ idle_workers_required = 0;
+ job_workers_required = 0;
+ LWLockAcquire(CoordinatorDatabasesLock, LW_SHARED);
+ hash_seq_init(&hash_status, co_databases);
+ while ((codb = (co_database*) hash_seq_search(&hash_status)))
+ {
+ score = ((float) codb->codb_num_cached_jobs /
+ (float) (codb->codb_num_connected_workers + 1)) * 100.0;
+
+ if (codb->codb_num_idle_workers < min_spare_background_workers)
+ score += (min_spare_background_workers -
+ codb->codb_num_idle_workers) * 10.0;
+
+ #ifdef COORDINATOR_DEBUG
+ elog(DEBUG3, "Coordinator: db %d, idle/conn: %d/%d, jobs: %d, score: %0.1f",
+ codb->codb_dboid, codb->codb_num_idle_workers,
+ codb->codb_num_connected_workers, codb->codb_num_cached_jobs,
+ score);
+ #endif
+
+ if (codb->codb_num_cached_jobs &&
+ (codb->codb_num_connected_workers == 0))
+ job_workers_required++;
+
+ if (codb->codb_num_idle_workers < min_spare_background_workers)
+ idle_workers_required += (min_spare_background_workers -
+ codb->codb_num_idle_workers);
+
+ /*
+ * FIXME: "misconfiguration" allows "starvation" in case the global
+ * maximum is reached all with idle workers, but other dbs
+ * w/o a single worker still have jobs.
+ */
+ if (can_launch && ((codb->codb_num_cached_jobs > 0) ||
+ (codb->codb_num_idle_workers <
+ min_spare_background_workers)))
+ {
+ if (can_launch && (score > max_score))
+ {
+ launch_dboid = codb->codb_dboid;
+ max_score = score;
+ }
+ }
+
+ /*
+ * If we are above limit, we fetch an idle worker from the list
+ * and mark it as terminatable. Actual termination happens in
+ * the following invocation, see above.
+ */
+ if ((terminatable_worker == NULL) &&
+ (codb->codb_num_idle_workers > max_spare_background_workers))
+ terminatable_worker = get_idle_worker(codb);
+ }
+ LWLockRelease(CoordinatorDatabasesLock);
+
+ if (!worker_slots_available && idle_workers_required > 0)
+ {
+ elog(WARNING, "Coordinator: no more background workers available, but requiring %d more, according to min_spare_background_workers.",
+ idle_workers_required);
+ }
+
+ if (!worker_slots_available && job_workers_required > 0)
+ {
+ elog(WARNING, "Coordinator: no background workers avalibale, but %d databases have background jobs pending.",
+ job_workers_required);
+ }
+
+ /* request a worker for the first database found, which needs one */
+ if (OidIsValid(launch_dboid))
+ do_start_worker(launch_dboid);
+ }
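
As a worked example of the scoring above: with min_spare_background_workers
set to 2, a database with 3 cached jobs, 1 connected worker and no idle worker
scores 3 / (1 + 1) * 100 + (2 - 0) * 10 = 170, whereas a database with no
cached jobs and a single idle worker scores only (2 - 1) * 10 = 10; the first
database therefore wins the at most one worker launch this invocation may
request.
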
+
+ bool
+ CoordinatorCanLaunchWorker(TimestampTz current_time)
+ {
+ bool can_launch;
+
+ /* FIXME: indentation */
+ {
+
+ /*
+ * There are some conditions that we need to check before trying to
+ * start a worker. First, we need to make sure that there is a free
+ * worker slot available. Second, we need to make sure that no
+ * other worker failed while starting up.
+ */
+
+ LWLockAcquire(WorkerInfoLock, LW_SHARED);
+
+ can_launch = (CoordinatorShmem->co_freeWorkers != NULL);
+
+ if (CoordinatorShmem->co_startingWorker != NULL)
+ {
+ int waittime;
+ WorkerInfo worker = CoordinatorShmem->co_startingWorker;
+
+ #ifdef COORDINATOR_DEBUG
+ elog(DEBUG5, "Coordinator: another worker is starting...");
+ #endif
+
+ /*
+ * We can't launch another worker when another one is still
+ * starting up (or failed while doing so), so just sleep for a bit
+ * more; that worker will wake us up again as soon as it's ready.
+ * We will only wait autovacuum_naptime seconds (up to a maximum
+ * of 60 seconds) for this to happen however. Note that failure
+ * to connect to a particular database is not a problem here,
+ * because the worker removes itself from the startingWorker
+ * pointer before trying to connect. Problems detected by the
+ * postmaster (like fork() failure) are also reported and handled
+ * differently. The only problems that may cause this code to
+ * fire are errors in the earlier sections of BackgroundWorkerMain,
+ * before the worker removes the WorkerInfo from the
+ * startingWorker pointer.
+ */
+ waittime = Min(autovacuum_naptime, 60) * 1000;
+ if (TimestampDifferenceExceeds(worker->wi_launchtime, current_time,
+ waittime))
+ {
+ LWLockRelease(WorkerInfoLock);
+ LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
+
+ /*
+ * No other process can put a worker in starting mode, so if
+ * startingWorker is still set after exchanging our lock,
+ * we assume it's the same one we saw above (so we don't
+ * recheck the launch time).
+ */
+ if (CoordinatorShmem->co_startingWorker != NULL)
+ {
+ worker = CoordinatorShmem->co_startingWorker;
+ worker->wi_dboid = InvalidOid;
+ worker->wi_tableoid = InvalidOid;
+ worker->wi_backend_id = InvalidBackendId;
+ worker->wi_launchtime = 0;
+ worker->wi_links.next = (SHM_QUEUE *) CoordinatorShmem->co_freeWorkers;
+ CoordinatorShmem->co_freeWorkers = worker;
+ CoordinatorShmem->co_startingWorker = NULL;
+ elog(WARNING, "worker took too long to start; cancelled");
+ }
+ }
+ else
+ can_launch = false;
+ }
+ LWLockRelease(WorkerInfoLock); /* either shared or exclusive */
+ }
+
+ return can_launch;
+ }
+
+
+ /*
+ * do_start_worker
+ *
+ * Bare-bones procedure for starting a background worker from the
+ * coordinator. It sets up shared memory stuff and signals the postmaster to
+ * start a worker.
+ */
+ static void
+ do_start_worker(Oid dboid)
+ {
+ WorkerInfo worker;
+
+ Assert(OidIsValid(dboid));
+
+ #ifdef COORDINATOR_DEBUG
+ elog(DEBUG3, "Coordinator: requesting worker for database %d.", dboid);
+ #endif
+
+ LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
+
+ /*
+ * Get a worker entry from the freelist. We checked above, so there
+ * really should be a free slot -- complain very loudly if there
+ * isn't.
+ */
+ worker = CoordinatorShmem->co_freeWorkers;
+ if (worker == NULL)
+ elog(FATAL, "no free worker found");
+
+ CoordinatorShmem->co_freeWorkers = (WorkerInfo) worker->wi_links.next;
+
+ worker->wi_dboid = dboid;
+ worker->wi_backend_id = InvalidBackendId;
+ worker->wi_launchtime = GetCurrentTimestamp();
+
+ CoordinatorShmem->co_startingWorker = worker;
+
+ LWLockRelease(WorkerInfoLock);
+
+ SendPostmasterSignal(PMSIGNAL_START_BGWORKER);
+ }
+
+ /* SIGHUP: set flag to re-read config file at next convenient time */
+ static void
+ avl_sighup_handler(SIGNAL_ARGS)
+ {
+ got_SIGHUP = true;
+ }
+
+ /* SIGUSR2: postmaster failed to fork a worker for us */
+ static void
+ avl_sigusr2_handler(SIGNAL_ARGS)
+ {
+ got_SIGUSR2 = true;
+ }
+
+ /* SIGTERM: time to die */
+ static void
+ avl_sigterm_handler(SIGNAL_ARGS)
+ {
+ got_SIGTERM = true;
+ }
+
+
+ /********************************************************************
+ * AUTOVACUUM WORKER CODE
+ ********************************************************************/
+
+ #ifdef EXEC_BACKEND
+ /*
+ * forkexec routines for background workers.
+ *
+ * Format up the arglist, then fork and exec.
+ */
+ static pid_t
+ bgworker_forkexec(void)
+ {
+ char *av[10];
+ int ac = 0;
+
+ av[ac++] = "postgres";
+ av[ac++] = "--forkbgworker";
+ av[ac++] = NULL; /* filled in by postmaster_forkexec */
+ av[ac] = NULL;
+
+ Assert(ac < lengthof(av));
+
+ return postmaster_forkexec(ac, av);
+ }
+
+ /*
+ * We need this set from the outside, before InitProcess is called
+ */
+ void
+ BackgroundWorkerIAm(void)
+ {
+ am_background_worker = true;
+ }
+ #endif
+
+ /*
+ * Main entry point for a background worker process.
+ *
+ * This code is heavily based on pgarch.c, q.v.
+ */
+ int
+ StartBackgroundWorker(void)
+ {
+ pid_t worker_pid;
+
+ #ifdef EXEC_BACKEND
+ switch ((worker_pid = bgworker_forkexec()))
+ #else
+ switch ((worker_pid = fork_process()))
+ #endif
+ {
+ case -1:
+ ereport(LOG,
+ (errmsg("could not fork background worker process: %m")));
+ return 0;
+
+ #ifndef EXEC_BACKEND
+ case 0:
+ /* in postmaster child ... */
+ /* Close the postmaster's sockets */
+ ClosePostmasterPorts(false);
+
+ /* Lose the postmaster's on-exit routines */
+ on_exit_reset();
+
+ BackgroundWorkerMain(0, NULL);
+ break;
+ #endif
+ default:
+ return (int) worker_pid;
+ }
+
+ /* shouldn't get here */
+ return 0;
+ }
+
+ /*
+ * add_as_idle_worker
+ *
+ * Marks the current worker as idle by adding it to the database's list of
+ * idle worker backends. The caller is expected to hold the WorkerInfoLock.
+ */
+ static void
+ add_as_idle_worker(Oid dbid, bool inc_connected_count)
+ {
+ co_database *codb;
+
+ Assert(SHMQueueIsDetached(&MyWorkerInfo->wi_links));
+
+ /* Lookup the corresponding database, or create an entry for it */
+ LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
+ codb = get_co_database(dbid);
+
+ if (inc_connected_count)
+ codb->codb_num_connected_workers++;
+
+ /* add as an idle worker */
+ SHMQueueInsertBefore(&codb->codb_idle_workers, &MyWorkerInfo->wi_links);
+ codb->codb_num_idle_workers++;
+
+ LWLockRelease(CoordinatorDatabasesLock);
+ }
+
+ /*
+ * bgworker_job_initialize
+ *
+ * Sets the worker state, updates the ps display and initializes the
+ * memory contexts for a new background job.
+ */
+ void
+ bgworker_job_initialize(worker_state new_state)
+ {
+ /*
+ * Note that the coordinator is responsible for dequeuing the worker from
+ * the list of idle backends, but it shall *NOT* assign a worker state;
+ * that is done exclusively by the worker itself.
+ */
+ Assert(SHMQueueIsDetached(&MyWorkerInfo->wi_links));
+ Assert(MyWorkerInfo->wi_state == WS_IDLE);
+
+ MyWorkerInfo->wi_state = new_state;
+ switch (new_state)
+ {
+ case WS_IDLE:
+ Assert(false); /* use bgworker_job_completed instead */
+ break;
+ case WS_AUTOVACUUM:
+ set_ps_display("bg worker: autovacuum", false);
+ break;
+ default:
+ set_ps_display("bg worker: unknown", false);
+ }
+
+ /*
+ * StartTransactionCommand and CommitTransactionCommand will
+ * automatically switch to other contexts. Nonetheless we need this
+ * one for bookkeeping of the various background jobs across
+ * transactions, for example to keep the list of relations to vacuum.
+ */
+ Assert(BgWorkerMemCxt == NULL);
+ BgWorkerMemCxt = AllocSetContextCreate(TopMemoryContext,
+ "Background Worker",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ MessageContext = AllocSetContextCreate(TopMemoryContext,
+ "MessageContext",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ MemoryContextSwitchTo(BgWorkerMemCxt);
+ }
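+
+ /*
+  * Illustrative sketch (not part of this patch): the expected calling
+  * sequence for a background job, as used by BackgroundWorkerMain() below:
+  *
+  *     bgworker_job_initialize(WS_AUTOVACUUM);
+  *     do_autovacuum();
+  *     bgworker_job_completed();
+  *
+  * On error, the worker calls bgworker_job_failed() from its error handler
+  * and, if it intends to keep serving jobs, bgworker_reset().
+  */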
+
+ /*
+ * bgworker_job_completed
+ *
+ * Cleans up the memory contexts used for the worker's current job and
+ * informs the coordinator.
+ */
+ void
+ bgworker_job_completed(void)
+ {
+ /* Notify the coordinator of the job completion. */
+ #ifdef COORDINATOR_DEBUG
+ ereport(DEBUG3,
+ (errmsg("bg worker [%d]: job completed.", MyProcPid)));
+ #endif
+
+ /* reset the worker state */
+ bgworker_reset();
+ }
+
+ void
+ bgworker_reset(void)
+ {
+ BackendId CoordinatorId;
+ IMessage *msg;
+
+ elog(DEBUG5, "bg worker [%d/%d]: resetting",
+ MyProcPid, MyBackendId);
+
+ Assert(MyWorkerInfo->wi_state != WS_IDLE);
+ Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
+
+ /* reset the worker state */
+ MyWorkerInfo->wi_state = WS_IDLE;
+ set_ps_display("bg worker: idle", false);
+
+ /* clean up memory contexts */
+ Assert(BgWorkerMemCxt);
+ MemoryContextSwitchTo(TopMemoryContext);
+ MemoryContextDelete(BgWorkerMemCxt);
+ BgWorkerMemCxt = NULL;
+ MemoryContextDelete(MessageContext);
+ MessageContext = NULL;
+
+ /* Reset the process-local cleanup handler state. */
+ BgWorkerCleanupInProgress = false;
+
+ /* propagate as idle worker, inform the coordinator */
+ LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
+ add_as_idle_worker(MyDatabaseId, false);
+ LWLockRelease(WorkerInfoLock);
+
+ CoordinatorId = GetCoordinatorId();
+ if (CoordinatorId != InvalidBackendId)
+ {
+ msg = IMessageCreate(IMSGT_READY, 0);
+ IMessageActivate(msg, CoordinatorId);
+ }
+ else
+ elog(WARNING, "bg worker [%d/%d]: no coordinator process found",
+ MyProcPid, MyBackendId);
+ }
+
+ void
+ bgworker_job_failed(int errcode)
+ {
+ TransactionId xid;
+
+ xid = GetTopTransactionIdIfAny();
+
+ #ifdef COORDINATOR_DEBUG
+ ereport(DEBUG3,
+ (errmsg("bg worker [%d/%d]: job failed (xid: %u)",
+ MyProcPid, MyBackendId, xid)));
+ #endif
+
+ /*
+ * Abort any transaction that might still be running; informing the
+ * coordinator that we are ready for the next job is left to
+ * bgworker_reset().
+ */
+ AbortOutOfAnyTransaction();
+
+ /*
+ * Flush the error state.
+ */
+ FlushErrorState();
+
+ /*
+ * Make sure pgstat also considers our stat data as gone.
+ */
+ pgstat_clear_snapshot();
+
+ Assert(!IMessageCheck());
+ }
+
+ /*
+ * BackgroundWorkerMain
+ */
+ NON_EXEC_STATIC void
+ BackgroundWorkerMain(int argc, char *argv[])
+ {
+ sigjmp_buf local_sigjmp_buf;
+ BackendId coordinator_id;
+ IMessage *msg;
+ Oid dbid;
+ char dbname[NAMEDATALEN];
+ bool terminate_worker = false;
+
+ /* we are a postmaster subprocess now */
+ IsUnderPostmaster = true;
+ am_background_worker = true;
+
+ /* reset MyProcPid */
+ MyProcPid = getpid();
+
+ /* record Start Time for logging */
+ MyStartTime = time(NULL);
+
+ /* Identify myself via ps */
+ init_ps_display("background worker process", "", "", "");
+
+ SetProcessingMode(InitProcessing);
+
+ /*
+ * If possible, make this process a group leader, so that the postmaster
+ * can signal any child processes too. (A background worker probably never
+ * has any child processes, but for consistency we make all postmaster
+ * child processes do this.)
+ */
+ #ifdef HAVE_SETSID
+ if (setsid() < 0)
+ elog(FATAL, "setsid() failed: %m");
+ #endif
+
+ /*
+ * Set up signal handlers. We operate on databases much like a regular
+ * backend, so we use the same signal handling. See equivalent code in
+ * tcop/postgres.c.
+ *
+ * Currently, we don't pay attention to postgresql.conf changes that
+ * happen during a single daemon iteration, so we can ignore SIGHUP.
+ */
+ pqsignal(SIGHUP, SIG_IGN);
+
+ /*
+ * SIGINT is used to signal cancelling the current table's vacuum; SIGTERM
+ * means abort and exit cleanly, and SIGQUIT means abandon ship.
+ */
+ pqsignal(SIGINT, StatementCancelHandler);
+ pqsignal(SIGTERM, die);
+ pqsignal(SIGQUIT, quickdie);
+ pqsignal(SIGALRM, handle_sig_alarm);
+
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SIG_IGN);
+ pqsignal(SIGFPE, FloatExceptionHandler);
+ pqsignal(SIGCHLD, SIG_DFL);
+
+ /* Early initialization */
+ BaseInit();
+
+ /*
+ * Create a per-backend PGPROC struct in shared memory, except in the
+ * EXEC_BACKEND case where this was done in SubPostmasterMain. We must do
+ * this before we can use LWLocks (and in the EXEC_BACKEND case we already
+ * had to do some stuff with LWLocks).
+ */
+ #ifndef EXEC_BACKEND
+ InitProcess();
+ #endif
+
+ /*
+ * If an exception is encountered, processing resumes here.
+ *
+ * See notes in postgres.c about the design of this coding.
+ */
+ if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+ {
+ /* Prevents interrupts while cleaning up */
+ HOLD_INTERRUPTS();
+
+ /* Report the error to the server log */
+ EmitErrorReport();
+
+ /*
+ * We can now go away. Note that because we called InitProcess, a
+ * callback was registered to do ProcKill, which will clean up
+ * necessary state.
+ */
+ proc_exit(0);
+ }
+
+ /* We can now handle ereport(ERROR) */
+ PG_exception_stack = &local_sigjmp_buf;
+
+ PG_SETMASK(&UnBlockSig);
+
+ /*
+ * Force zero_damaged_pages OFF in the background worker, even if it is
+ * set in postgresql.conf. We don't really want such a dangerous option
+ * being applied non-interactively.
+ */
+ SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
+
+ /*
+ * Force statement_timeout to zero to prevent a timeout setting from
+ * interfering with regular maintenance.
+ */
+ SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
+
+ /*
+ * Get the info about the database we're going to work on.
+ */
+ LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
+
+ /*
+ * Beware of co_startingWorker being NULL; this should normally not
+ * happen, but if a worker fails after forking and before reaching this
+ * point, the coordinator might have decided to remove it from the queue
+ * and start over.
+ */
+ if (CoordinatorShmem->co_startingWorker == NULL)
+ {
+ /* no worker entry for me, go away */
+ elog(WARNING, "background worker started without a worker entry");
+ LWLockRelease(WorkerInfoLock);
+ proc_exit(0);
+ }
+
+ MyWorkerInfo = CoordinatorShmem->co_startingWorker;
+ dbid = MyWorkerInfo->wi_dboid;
+
+ /* FIXME: indentation */
+ {
+
+ /*
+ * Clear the "starting worker" pointer, so that the coordinator can start
+ * another worker if required.
+ */
+ CoordinatorShmem->co_startingWorker = NULL;
+
+ coordinator_id = CoordinatorShmem->co_coordinatorid;
+ LWLockRelease(WorkerInfoLock);
+
+ on_shmem_exit(FreeWorkerInfo, 0);
+
+ /*
+ * Report autovac startup to the stats collector. We deliberately do
+ * this before InitPostgres, so that the last_autovac_time will get
+ * updated even if the connection attempt fails. This is to prevent
+ * autovac from getting "stuck" repeatedly selecting an unopenable
+ * database, rather than making any progress on stuff it can connect
+ * to.
+ */
+ pgstat_report_autovac(dbid);
+
+ /*
+ * Connect to the selected database
+ *
+ * Note: if we have selected a just-deleted database (due to using
+ * stale stats info), we'll fail and exit here.
+ */
+ InitPostgres(NULL, dbid, NULL, dbname);
+ SetProcessingMode(NormalProcessing);
+ set_ps_display("bg worker: idle", false);
+ }
+
+ BgWorkerMemCxt = NULL;
+
+ MyWorkerInfo->wi_backend_id = MyBackendId;
+ MyWorkerInfo->wi_state = WS_IDLE;
+
+ #ifdef COORDINATOR_DEBUG
+ elog(DEBUG3, "bg worker [%d/%d]: connected to database %u",
+ MyProcPid, MyBackendId, dbid);
+ #endif
+
+ /*
+ * Add as an idle worker and notify the coordinator only *after* having
+ * set MyProc->databaseId in InitPostgres, so the coordinator can
+ * determine which database we are connected to.
+ */
+ LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
+ add_as_idle_worker(dbid, true);
+ LWLockRelease(WorkerInfoLock);
+
+ /* register with the coordinator */
+ if (coordinator_id != InvalidBackendId)
+ {
+ msg = IMessageCreate(IMSGT_REGISTER_WORKER, 0);
+ IMessageActivate(msg, coordinator_id);
+ }
+
+ if (PostAuthDelay)
+ pg_usleep(PostAuthDelay * 1000000L);
+
+ while (!terminate_worker)
+ {
+ PG_TRY();
+ {
+ /* FIXME: indentation */
+
+ CHECK_FOR_INTERRUPTS();
+
+ ImmediateInterruptOK = true;
+ pg_usleep(1000000L);
+ ImmediateInterruptOK = false;
+
+ /*
+ * FIXME: check different ways of terminating a background worker
+ * via ProcDiePending. How about postmaster initiated
+ * restarts?
+ */
+ if (ProcDiePending)
+ elog(FATAL, "bg worker [%d/%d]: Terminated via ProcDie",
+ MyProcPid, MyBackendId);
+
+ while ((msg = IMessageCheck()) && !terminate_worker)
+ {
+ #ifdef COORDINATOR_DEBUG
+ ereport(DEBUG3,
+ (errmsg("bg worker [%d/%d]: received message %s of size %d "
+ "from backend id %d, db %d",
+ MyProcPid, MyBackendId,
+ decode_imessage_type(msg->type),
+ msg->size, msg->sender,
+ MyDatabaseId)));
+ #endif
+
+ switch (msg->type)
+ {
+ case IMSGT_TERM_WORKER:
+ IMessageRemove(msg);
+ terminate_worker = true;
+ break;
+
+ case IMSGT_PERFORM_VACUUM:
+ /* immediately remove the message to free shared memory */
+ IMessageRemove(msg);
+
+ bgworker_job_initialize(WS_AUTOVACUUM);
+
+ /*
+ * Add ourselves to the list of runningWorkers
+ */
+ LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
+ SHMQueueInsertBefore(&CoordinatorShmem->co_runningWorkers,
+ &MyWorkerInfo->wi_links);
+ LWLockRelease(WorkerInfoLock);
+
+ /* do an appropriate amount of work */
+ do_autovacuum();
+
+ /*
+ * Remove ourselves from the list of runningWorkers and
+ * mark as available background worker.
+ */
+ LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
+ SHMQueueDelete(&MyWorkerInfo->wi_links);
+ LWLockRelease(WorkerInfoLock);
+
+ bgworker_job_completed();
+ break;
+
+ default:
+ /* keep shared memory clean */
+ IMessageRemove(msg);
+
+ ereport(WARNING,
+ (errmsg("bg worker [%d]: invalid message type "
+ "'%c' ignored",
+ MyProcPid, msg->type)));
+ }
+
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ }
+ PG_CATCH();
+ {
+ ErrorData *errdata;
+ MemoryContext ecxt;
+
+ ecxt = MemoryContextSwitchTo(BgWorkerMemCxt);
+ errdata = CopyErrorData();
+
+ elog(WARNING, "bg worker [%d/%d]: caught error '%s' in %s:%d, state %s",
+ MyProcPid, MyBackendId, errdata->message,
+ errdata->filename, errdata->lineno,
+ decode_worker_state(MyWorkerInfo->wi_state));
+
+ /*
+ * Inform the coordinator about the failure.
+ */
+ bgworker_job_failed(errdata->sqlerrcode);
+
+ if (errdata->sqlerrcode == ERRCODE_QUERY_CANCELED)
+ {
+ #ifdef DEBUG_CSET_APPL
+ elog(DEBUG3, "bg worker [%d/%d]: cancelled active job.",
+ MyProcPid, MyBackendId);
+ #endif
+
+ bgworker_reset();
+ }
+ else
+ {
+ elog(WARNING, "bg worker [%s:%d]: unexpected error %d: '%s'!\n"
+ " triggered from %s:%d (in %s)\n",
+ __FILE__, __LINE__, errdata->sqlerrcode,
+ errdata->message, errdata->filename, errdata->lineno,
+ errdata->funcname);
+ /* re-throw the error, so the backend quits */
+ MemoryContextSwitchTo(ecxt);
+ PG_RE_THROW();
+ }
+ }
+ PG_END_TRY();
+ }
+
+
+ /* All done, go away */
+ ereport(DEBUG1, (errmsg("bg worker [%d/%d]: terminating",
+ MyProcPid, MyBackendId)));
+ proc_exit(0);
+ }
+
+ /*
+ * Return a WorkerInfo to the free list
+ */
+ static void
+ FreeWorkerInfo(int code, Datum arg)
+ {
+ co_database *codb;
+
+ if (MyWorkerInfo != NULL)
+ {
+ LWLockAcquire(WorkerInfoLock, LW_EXCLUSIVE);
+
+ if (!SHMQueueIsDetached(&MyWorkerInfo->wi_links))
+ SHMQueueDelete(&MyWorkerInfo->wi_links);
+
+ MyWorkerInfo->wi_links.next = (SHM_QUEUE *) CoordinatorShmem->co_freeWorkers;
+ MyWorkerInfo->wi_dboid = InvalidOid;
+ MyWorkerInfo->wi_tableoid = InvalidOid;
+ MyWorkerInfo->wi_backend_id = InvalidBackendId;
+ MyWorkerInfo->wi_launchtime = 0;
+ MyWorkerInfo->wi_cost_delay = 0;
+ MyWorkerInfo->wi_cost_limit = 0;
+ MyWorkerInfo->wi_cost_limit_base = 0;
+ CoordinatorShmem->co_freeWorkers = MyWorkerInfo;
+ /* not mine anymore */
+ MyWorkerInfo = NULL;
+
+ /* decrease the conn count */
+ LWLockAcquire(CoordinatorDatabasesLock, LW_EXCLUSIVE);
+ codb = hash_search(co_databases, &MyDatabaseId, HASH_FIND, NULL);
+ Assert(codb);
+ codb->codb_num_connected_workers--;
+ LWLockRelease(CoordinatorDatabasesLock);
+
+ LWLockRelease(WorkerInfoLock);
+ }
+ }
+
+ /*
+ * get_database_list
+ * Return a list of all databases found in pg_database.
+ *
+ * Note: this is the only function in which the coordinator uses a
+ * transaction. Although we aren't attached to any particular database and
+ * therefore can't access most catalogs, we do have enough infrastructure
+ * to do a seqscan on pg_database.
+ */
+ List *
+ get_database_list(void)
+ {
+ List *dblist = NIL;
+ Relation rel;
+ HeapScanDesc scan;
+ HeapTuple tup;
+
+ /*
+ * Start a transaction so we can access pg_database, and get a snapshot.
+ * We don't have a use for the snapshot itself, but we're interested in
+ * the secondary effect that it sets RecentGlobalXmin. (This is critical
+ * for anything that reads heap pages, because HOT may decide to prune
+ * them even if the process doesn't attempt to modify any tuples.)
+ */
+ StartTransactionCommand();
+ (void) GetTransactionSnapshot();
+
+ /* Allocate our results in CoordinatorMemCxt, not transaction context */
+ MemoryContextSwitchTo(CoordinatorMemCxt);
+
+ rel = heap_open(DatabaseRelationId, AccessShareLock);
+ scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
+
+ while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+ {
+ Form_pg_database pgdatabase = (Form_pg_database) GETSTRUCT(tup);
+ avw_dbase *avdb;
+
+ avdb = (avw_dbase *) palloc(sizeof(avw_dbase));
+
+ avdb->adw_datid = HeapTupleGetOid(tup);
+ avdb->adw_name = pstrdup(NameStr(pgdatabase->datname));
+ avdb->adw_frozenxid = pgdatabase->datfrozenxid;
+ /* this gets set later: */
+ avdb->adw_entry = NULL;
+
+ dblist = lappend(dblist, avdb);
+ }
+
+ heap_endscan(scan);
+ heap_close(rel, AccessShareLock);
+
+ CommitTransactionCommand();
+
+ return dblist;
+ }
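+
+ /*
+  * Illustrative sketch (not part of this patch): callers walk the returned
+  * list with the usual List machinery, for example:
+  *
+  *     List       *dblist = get_database_list();
+  *     ListCell   *cell;
+  *
+  *     foreach(cell, dblist)
+  *     {
+  *         avw_dbase  *avdb = (avw_dbase *) lfirst(cell);
+  *
+  *         ... inspect avdb->adw_name, avdb->adw_frozenxid, etc ...
+  *     }
+  *
+  * The entries are allocated in CoordinatorMemCxt, so they survive the
+  * transaction in which they were collected.
+  */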
+
+ /*
+ * process identification functions
+ * Return whether the current process is the coordinator or a background
+ * worker process, respectively.
+ */
+ bool
+ IsCoordinatorProcess(void)
+ {
+ return am_coordinator;
+ }
+
+ bool
+ IsBackgroundWorkerProcess(void)
+ {
+ return am_background_worker;
+ }
+
+ /*
+ * GetCoordinatorId
+ * Returns the backendId of the currently active coordinator process.
+ */
+ BackendId
+ GetCoordinatorId(void)
+ {
+ BackendId CoordinatorId;
+
+ LWLockAcquire(WorkerInfoLock, LW_SHARED);
+ CoordinatorId = CoordinatorShmem->co_coordinatorid;
+ LWLockRelease(WorkerInfoLock);
+
+ return CoordinatorId;
+ }
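+
+ /*
+  * Illustrative sketch (not part of this patch): the usual way for a worker
+  * to notify the coordinator is to look up its backend id and send an
+  * imessage, just as bgworker_reset() does above:
+  *
+  *     BackendId   coordinator_id = GetCoordinatorId();
+  *
+  *     if (coordinator_id != InvalidBackendId)
+  *         IMessageActivate(IMessageCreate(IMSGT_READY, 0), coordinator_id);
+  */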
+
+ /*
+ * CoordinatorShmemSize
+ * Compute space needed for coordinator-related shared memory
+ */
+ Size
+ CoordinatorShmemSize(void)
+ {
+ Size size;
+
+ /*
+ * Need the fixed struct and the array of WorkerInfoData, plus per
+ * database entries in a hash. As we only track databases which have at
+ * least one worker attached, we won't ever need more than
+ * max_background_workers entries.
+ */
+ size = sizeof(CoordinatorShmemStruct);
+ size = MAXALIGN(size);
+ size = add_size(size, mul_size(max_background_workers,
+ sizeof(WorkerInfoData)));
+ size = add_size(size, hash_estimate_size(max_background_workers,
+ sizeof(co_database)));
+ return size;
+ }
+
+ /*
+ * CoordinatorShmemInit
+ * Allocate and initialize coordinator-related shared memory
+ */
+ void
+ CoordinatorShmemInit(void)
+ {
+ HASHCTL hctl;
+ bool found;
+
+ CoordinatorShmem = (CoordinatorShmemStruct *)
+ ShmemInitStruct("Background Worker Data",
+ CoordinatorShmemSize(),
+ &found);
+
+ if (!IsUnderPostmaster)
+ {
+ WorkerInfo worker;
+ int i;
+
+ Assert(!found);
+
+ CoordinatorShmem->co_coordinatorid = InvalidBackendId;
+ CoordinatorShmem->co_freeWorkers = NULL;
+ SHMQueueInit(&CoordinatorShmem->co_runningWorkers);
+ CoordinatorShmem->co_startingWorker = NULL;
+
+ worker = (WorkerInfo) ((char *) CoordinatorShmem +
+ MAXALIGN(sizeof(CoordinatorShmemStruct)));
+
+ /* initialize the WorkerInfo free list */
+ for (i = 0; i < max_background_workers; i++)
+ {
+ worker[i].wi_links.next = (SHM_QUEUE *) CoordinatorShmem->co_freeWorkers;
+ CoordinatorShmem->co_freeWorkers = &worker[i];
+ }
+ }
+ else
+ Assert(found);
+
+ memset(&hctl, 0, sizeof(hctl));
+ hctl.keysize = sizeof(Oid);
+ hctl.entrysize = sizeof(co_database);
+ hctl.hash = oid_hash;
+ co_databases = ShmemInitHash("Coordinator Database Info",
+ max_background_workers,
+ max_background_workers,
+ &hctl,
+ HASH_ELEM | HASH_FUNCTION);
+ }
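+
+ /*
+  * Illustrative sketch (not part of this patch): as with other shared memory
+  * consumers, these two routines are presumably hooked into
+  * CreateSharedMemoryAndSemaphores() in ipci.c, roughly:
+  *
+  *     size = add_size(size, CoordinatorShmemSize());
+  *     ...
+  *     CoordinatorShmemInit();
+  */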
+
+
============================================================
*** /dev/null
--- src/include/postmaster/coordinator.h 77a64f4b18c483a00779f8d5922066d8aa6fc156
***************
*** 1,0 ****
--- 1,187 ----
+ /*-------------------------------------------------------------------------
+ *
+ * coordinator.h
+ * header file for the coordinator and background worker infrastructure
+ *
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+ #ifndef COORDINATOR_H
+ #define COORDINATOR_H
+
+ #include "pgstat.h"
+ #include "lib/dllist.h"
+ #include "utils/palloc.h"
+ #include "storage/imsg.h"
+ #include "storage/lock.h"
+
+ /*
+ * Valid backend states for background workers.
+ */
+ typedef enum
+ {
+ WS_IDLE = 'I',
+
+ WS_AUTOVACUUM = 'V',
+
+ } worker_state;
+
+ #define IsIdleWorker(wi) (IsBackgroundWorkerProcess() && ((wi)->wi_state == WS_IDLE))
+ #define IsAutoVacuumWorker(wi) (IsBackgroundWorkerProcess() && ((wi)->wi_state == WS_AUTOVACUUM))
+
+
+ /*-------------
+ * This struct holds information about a single worker's whereabouts. We keep
+ * an array of these in shared memory, sized according to
+ * max_background_workers.
+ *
+ * wi_links entry into free list or running list
+ * wi_dboid OID of the database this worker is supposed to work on
+ * wi_backend_id backend id of the running worker, InvalidBackendId if
+ * not started
+ * wi_launchtime Time at which this worker was launched
+ * wi_state current state of the worker (see worker_state above)
+ *
+ * wi_tableoid OID of the table currently being vacuumed
+ * wi_cost_* Vacuum cost-based delay parameters current in this worker
+ *
+ * All fields are protected by WorkerInfoLock, except for wi_tableoid which is
+ * protected by WorkerScheduleLock (which is read-only for everyone except
+ * that worker itself).
+ *-------------
+ */
+ typedef struct WorkerInfoData
+ {
+ SHM_QUEUE wi_links;
+ Oid wi_dboid;
+ BackendId wi_backend_id;
+ TimestampTz wi_launchtime;
+ worker_state wi_state;
+
+ /* autovacuum specific fields */
+ Oid wi_tableoid;
+ int wi_cost_delay;
+ int wi_cost_limit;
+ int wi_cost_limit_base;
+ } WorkerInfoData;
+
+ typedef struct WorkerInfoData *WorkerInfo;
+
+ /*-------------
+ * The main background worker shmem struct. On shared memory we store this
+ * main struct and the array of WorkerInfo structs. This struct keeps:
+ *
+ * co_coordinatorid the backend id of the coordinator process
+ * co_freeWorkers the WorkerInfo freelist
+ * co_runningWorkers the WorkerInfo non-free queue
+ * co_startingWorker pointer to WorkerInfo currently being started
+ * (cleared by the worker itself as soon as it's up and
+ * running)
+ *
+ * This struct is protected by WorkerInfoLock, except for parts of the worker
+ * list (see above).
+ *-------------
+ */
+ typedef struct
+ {
+ BackendId co_coordinatorid;
+ WorkerInfo co_freeWorkers;
+ SHM_QUEUE co_runningWorkers;
+ WorkerInfo co_startingWorker;
+ } CoordinatorShmemStruct;
+
+ extern CoordinatorShmemStruct *CoordinatorShmem;
+
+ /* struct to keep track of databases in the coordinator's autovacuum schedule */
+ typedef struct avl_dbase
+ {
+ Oid adl_datid; /* hash key -- must be first */
+ TimestampTz adl_next_worker;
+ int adl_score;
+ } avl_dbase;
+
+ /* struct to keep track of databases in worker */
+ typedef struct avw_dbase
+ {
+ Oid adw_datid; /* hash key -- must be first */
+ char *adw_name;
+ TransactionId adw_frozenxid;
+ PgStat_StatDBEntry *adw_entry;
+ } avw_dbase;
+
+ /* struct to keep track of databases in the coordinator */
+ typedef struct co_database
+ {
+ Oid codb_dboid;
+
+ /* for internal use by the coordinator */
+ int codb_num_cached_jobs;
+ Dllist codb_cached_jobs;
+
+ /* tracking of idle workers, shared */
+ int codb_num_idle_workers;
+ SHM_QUEUE codb_idle_workers;
+
+ /* tracking of connected workers, shared */
+ int codb_num_connected_workers;
+ } co_database;
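+
+ /*
+  * Illustrative sketch (not part of this patch): to hand a job to an idle
+  * worker, the coordinator presumably pops the first entry off
+  * codb_idle_workers while holding CoordinatorDatabasesLock, roughly:
+  *
+  *     WorkerInfo  worker;
+  *
+  *     worker = (WorkerInfo) SHMQueueNext(&codb->codb_idle_workers,
+  *                                        &codb->codb_idle_workers,
+  *                                        offsetof(WorkerInfoData, wi_links));
+  *     if (worker != NULL)
+  *     {
+  *         SHMQueueDelete(&worker->wi_links);
+  *         codb->codb_num_idle_workers--;
+  *     }
+  *
+  * The actual dispatch logic lives in coordinator.c (see dispatch_job()).
+  */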
+
+
+ /* GUC variables */
+ extern int max_background_workers;
+ extern int min_spare_background_workers;
+ extern int max_spare_background_workers;
+
+ extern HTAB *co_databases;
+
+ /*
+ * The database list in the coordinator, and the context that contains it,
+ * for use by autovacuum
+ */
+ extern Dllist *DatabaseList;
+ extern MemoryContext DatabaseListCxt;
+
+ extern MemoryContext CoordinatorMemCxt;
+ extern MemoryContext BgWorkerMemCxt;
+
+ extern WorkerInfo MyWorkerInfo;
+
+ extern char *decode_worker_state(worker_state state);
+ extern co_database *get_co_database(Oid dboid);
+
+ /* Status inquiry functions */
+ extern bool IsCoordinatorProcess(void);
+ extern bool IsBackgroundWorkerProcess(void);
+ extern BackendId GetCoordinatorId(void);
+ extern bool CoordinatorCanLaunchWorker(TimestampTz current_time);
+
+ extern void dispatch_job(IMessage *msg, co_database *codb);
+
+ /* Process startup functions */
+ extern int StartCoordinator(void);
+ extern int StartBackgroundWorker(void);
+
+ #ifdef EXEC_BACKEND
+ extern void CoordinatorMain(int argc, char *argv[]);
+ extern void BackgroundWorkerMain(int argc, char *argv[]);
+ extern void BackgroundWorkerIAm(void);
+ extern void CoordinatorIAm(void);
+ #endif
+
+ /* shared memory stuff */
+ extern Size CoordinatorShmemSize(void);
+ extern void CoordinatorShmemInit(void);
+
+ /* various query functions */
+ extern List *get_database_list(void);
+
+ /* background worker control */
+ extern void bgworker_job_initialize(worker_state new_state);
+ extern void bgworker_job_completed(void);
+ extern void bgworker_job_failed(int errcode);
+ extern void bgworker_reset(void);
+
+ #endif /* COORDINATOR_H */