bg worker: patch 3 of 6 - sockets
Started by Markus Wannerover 15 years ago1 messages
This patch adds the capability for the coordinator to listen on sockets
while waiting for imessages to arrive. Before the coordinator just slept
until a signal arrives, notifying the coordinator about an internal message.
Major caveat here: I'm using pselect(), which might still not be
portable enough. The work-around for platforms on which a signal doesn't
interrupt select has been removed as well. I can't think of any way to
support platforms as broken as that.
Attachments:
step3-sockets.difftext/x-diff; charset=iso-8859-1; name=step3-sockets.diffDownload
*** src/backend/postmaster/autovacuum.c 50ad2e93982867e91a47e8ca7af5f4be8b975d8f
--- src/backend/postmaster/autovacuum.c ca02f9e08a1dbe34ff8049c0d51cc76594fb16a0
*************** static void launcher_determine_sleep(boo
*** 264,270 ****
static void do_start_worker(Oid dboid);
static Oid autovacuum_select_database(void);
static void launcher_determine_sleep(bool can_launch, bool recursing,
! struct timeval *nap);
static void autovacuum_update_timing(Oid dbid, TimestampTz now);
static List *get_database_list(void);
static void rebuild_database_list(Oid newdb);
--- 264,270 ----
static void do_start_worker(Oid dboid);
static Oid autovacuum_select_database(void);
static void launcher_determine_sleep(bool can_launch, bool recursing,
! struct timespec *nap);
static void autovacuum_update_timing(Oid dbid, TimestampTz now);
static List *get_database_list(void);
static void rebuild_database_list(Oid newdb);
*************** AutoVacLauncherMain(int argc, char *argv
*** 748,754 ****
for (;;)
{
TimestampTz current_time;
! struct timeval nap;
/*
* Emergency bailout if postmaster has died. This is to avoid the
--- 748,758 ----
for (;;)
{
TimestampTz current_time;
! struct timespec nap;
! sigset_t sigmask, oldmask;
! fd_set socks;
! int max_sock_id;
! bool socket_ready;
/*
* Emergency bailout if postmaster has died. This is to avoid the
*************** AutoVacLauncherMain(int argc, char *argv
*** 760,802 ****
can_launch = (AutoVacuumShmem->av_freeWorkers != NULL);
launcher_determine_sleep(can_launch, false, &nap);
/* Allow sinval catchup interrupts while sleeping */
EnableCatchupInterrupt();
/*
! * Sleep for a while according to schedule.
*
! * On some platforms, signals won't interrupt the sleep. To ensure we
! * respond reasonably promptly when someone signals us, break down the
! * sleep into 1-second increments, and check for interrupts after each
! * nap.
*/
! while (nap.tv_sec > 0 || nap.tv_usec > 0)
{
- uint32 sleeptime;
! if (nap.tv_sec > 0)
{
! sleeptime = 1000000;
! nap.tv_sec--;
}
else
! {
! sleeptime = nap.tv_usec;
! nap.tv_usec = 0;
! }
! pg_usleep(sleeptime);
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
*/
if (!PostmasterIsAlive(true))
proc_exit(1);
-
- msg = IMessageCheck();
- if (got_SIGTERM || got_SIGHUP || got_SIGUSR2 || msg)
- break;
}
DisableCatchupInterrupt();
--- 764,834 ----
can_launch = (AutoVacuumShmem->av_freeWorkers != NULL);
launcher_determine_sleep(can_launch, false, &nap);
+ /* Initialize variables for listening on sockets */
+ FD_ZERO(&socks);
+ max_sock_id = 0;
+ socket_ready = false;
+
+ #ifdef COORDINATOR_DEBUG
+ elog(DEBUG1, "Coordinator: listening...");
+ #endif
+
/* Allow sinval catchup interrupts while sleeping */
EnableCatchupInterrupt();
/*
! * Sleep for a while according to schedule - and possibly interrupted
! * by messages from one of the sockets or by internal messages from
! * background workers or normal backends.
*
! * Using pselect here prevents the possible loss of a singnal in
! * between the last check for imessages and following select call.
! * However, it requires a newish platform that supports pselect.
! *
! * On some platforms, signals won't interrupt select. Postgres used
! * to split the nap time into one second intervals to ensure to react
! * reasonably promptly for autovacuum purposes. However, for
! * Postgres-R this is not tolerable, so that mechanism has been
! * removed.
! *
! * FIXME: to support these platforms or others that don't implement
! * pselect properly, another work-around like for example the
! * self-pipe trick needs to be implemented. On Windows, we
! * could implement pselect based on the current port's select
! * method.
*/
!
! /* FIXME: indentation */
{
! sigemptyset(&sigmask);
! sigaddset(&sigmask, SIGINT);
! sigaddset(&sigmask, SIGHUP);
! sigaddset(&sigmask, SIGUSR2);
! sigprocmask(SIG_BLOCK, &sigmask, &oldmask);
!
! sigemptyset(&sigmask);
!
! if (pselect(max_sock_id + 1, &socks, NULL, NULL, &nap,
! &sigmask) < 0)
{
! if (errno != EINTR)
! {
! elog(WARNING, "Coordinator: pselect failed: %m");
! socket_ready = true;
! }
}
else
! socket_ready = true;
+ sigprocmask(SIG_SETMASK, &oldmask, NULL);
+
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
*/
if (!PostmasterIsAlive(true))
proc_exit(1);
}
DisableCatchupInterrupt();
*************** AutoVacLauncherMain(int argc, char *argv
*** 858,865 ****
}
}
/* handle pending imessages */
! while (msg != NULL)
{
handle_imessage(msg);
msg = IMessageCheck();
--- 890,902 ----
}
}
+ /* handle sockets with pending reads, just a placeholder for now */
+ if (socket_ready)
+ {
+ }
+
/* handle pending imessages */
! while ((msg = IMessageCheck()) != NULL)
{
handle_imessage(msg);
msg = IMessageCheck();
*************** static void
*** 1308,1314 ****
* cause a long sleep, which will be interrupted when a worker exits.
*/
static void
! launcher_determine_sleep(bool can_launch, bool recursing, struct timeval *nap)
{
Dlelem *elem;
--- 1345,1351 ----
* cause a long sleep, which will be interrupted when a worker exits.
*/
static void
! launcher_determine_sleep(bool can_launch, bool recursing, struct timespec *nap)
{
Dlelem *elem;
*************** launcher_determine_sleep(bool can_launch
*** 1321,1327 ****
if (!can_launch)
{
nap->tv_sec = autovacuum_naptime;
! nap->tv_usec = 0;
}
else if ((elem = DLGetTail(DatabaseList)) != NULL)
{
--- 1358,1364 ----
if (!can_launch)
{
nap->tv_sec = autovacuum_naptime;
! nap->tv_nsec = 0;
}
else if ((elem = DLGetTail(DatabaseList)) != NULL)
{
*************** launcher_determine_sleep(bool can_launch
*** 1335,1347 ****
TimestampDifference(current_time, next_wakeup, &secs, &usecs);
nap->tv_sec = secs;
! nap->tv_usec = usecs;
}
else
{
/* list is empty, sleep for whole autovacuum_naptime seconds */
nap->tv_sec = autovacuum_naptime;
! nap->tv_usec = 0;
}
/*
--- 1372,1384 ----
TimestampDifference(current_time, next_wakeup, &secs, &usecs);
nap->tv_sec = secs;
! nap->tv_nsec = usecs * 1000;
}
else
{
/* list is empty, sleep for whole autovacuum_naptime seconds */
nap->tv_sec = autovacuum_naptime;
! nap->tv_nsec = 0;
}
/*
*************** launcher_determine_sleep(bool can_launch
*** 1354,1360 ****
* We only recurse once. rebuild_database_list should always return times
* in the future, but it seems best not to trust too much on that.
*/
! if (nap->tv_sec == 0 && nap->tv_usec == 0 && !recursing)
{
rebuild_database_list(InvalidOid);
launcher_determine_sleep(can_launch, true, nap);
--- 1391,1397 ----
* We only recurse once. rebuild_database_list should always return times
* in the future, but it seems best not to trust too much on that.
*/
! if (nap->tv_sec == 0 && nap->tv_nsec == 0 && !recursing)
{
rebuild_database_list(InvalidOid);
launcher_determine_sleep(can_launch, true, nap);
*************** launcher_determine_sleep(bool can_launch
*** 1362,1371 ****
}
/* The smallest time we'll allow the launcher to sleep. */
! if (nap->tv_sec <= 0 && nap->tv_usec <= MIN_AUTOVAC_SLEEPTIME * 1000)
{
nap->tv_sec = 0;
! nap->tv_usec = MIN_AUTOVAC_SLEEPTIME * 1000;
}
}
--- 1399,1408 ----
}
/* The smallest time we'll allow the launcher to sleep. */
! if (nap->tv_sec <= 0 && nap->tv_nsec <= MIN_AUTOVAC_SLEEPTIME * 1000000)
{
nap->tv_sec = 0;
! nap->tv_nsec = MIN_AUTOVAC_SLEEPTIME * 1000000;
}
}