[PATCH] Improve performance of NOTIFY over many databases (v2)
Hi hackers,
Here is a reworked version of the previous patches.
The original three patches have been collapsed into one as, given the
changes discussed, it didn't make sense to keep them separate. There
are now two patches (the third is just to help with testing):
Patch 1: Tracks the listening backends in a list so non-listening
backends can be quickly skipped over. This is separate because it's
orthogonal to the rest of the changes and there are other ways to do
this.
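For anyone skimming without the patch open: the listener list is an intrusive singly linked list threaded through the per-backend array, kept in backend-id order. A minimal standalone sketch of the idea (the names, the array size, and the 0 = "none" sentinel are illustrative, not the actual shared-memory layout):

```c
#define MAX_BACKENDS 8        /* hypothetical; really MaxBackends */
#define INVALID_BACKEND 0     /* 0 = "no next listener", as in this version */

/* next-listener pointer per backend, plus the list head */
static int nextListener[MAX_BACKENDS + 1];
static int firstListener = INVALID_BACKEND;

/* Insert backend id, keeping the list sorted by backend id. */
static void
listen_register(int id)
{
    int prev = INVALID_BACKEND;

    /* find the last listener with a smaller id */
    for (int i = firstListener; i; i = nextListener[i])
    {
        if (i >= id)
            break;
        prev = i;
    }
    if (prev == INVALID_BACKEND)
    {
        nextListener[id] = firstListener;
        firstListener = id;
    }
    else
    {
        nextListener[id] = nextListener[prev];
        nextListener[prev] = id;
    }
}

/* Unlink backend id when it stops listening. */
static void
listen_unregister(int id)
{
    if (firstListener == id)
        firstListener = nextListener[id];
    else
    {
        for (int i = firstListener; i; i = nextListener[i])
        {
            if (nextListener[i] == id)
            {
                nextListener[i] = nextListener[id];
                break;
            }
        }
    }
    nextListener[id] = INVALID_BACKEND;
}
```

The payoff is that the loops over listeners (tail advancement, signalling) cost O(listeners) rather than O(MaxBackends).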
Patch 2: This is the meat of the change. It implements all the
suggestions discussed:
- The queue tail is now only updated lazily, whenever the notify queue
moves to a new page. This did require a new global to track this state
through the transaction commit, but it seems worth it.
- Only backends for the current database are signalled when a
notification is made
- Slow backends are woken up one at a time rather than all at once
- A backend is allowed to lag up to 4 SLRU pages behind before being
signalled. This is a tradeoff between how often to get woken up versus
how much work to do once woken up.
- All the relevant comments have been updated to describe the new
algorithm. Locking should also be correct now.
This means in the normal case where listening backends get a
notification occasionally, no-one will ever be considered slow. An
exclusive lock for cleanup will happen about once per SLRU page.
There's still the exclusive locks on adding notifications but that's
unavoidable.
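For intuition, the "4 pages behind" test is just circular page arithmetic over the SLRU page numbers. A sketch mirroring the patch's asyncQueuePageDiff(), with a deliberately tiny, hypothetical QUEUE_MAX_PAGE so the wraparound is visible:

```c
#define QUEUE_MAX_PAGE 15     /* tiny and illustrative; the real value is much larger */
#define QUEUE_CLEANUP_DELAY 4

/* Signed distance from page q to page p on the circular page range. */
static int
asyncQueuePageDiff(int p, int q)
{
    int diff = p - q;

    /* fold into the range [-(max+1)/2, (max+1)/2) */
    if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
        diff -= QUEUE_MAX_PAGE + 1;
    else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
        diff += QUEUE_MAX_PAGE + 1;
    return diff;
}

/* A backend is "slow" once it lags the head by more than the delay. */
static int
queue_slow_backend(int headPage, int backendPage)
{
    return asyncQueuePageDiff(headPage, backendPage) > QUEUE_CLEANUP_DELAY;
}
```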
One minor issue is that pg_notification_queue_usage() will now return
a small but non-zero number (about 3e-6) even when nothing is really
going on. This could be fixed by having it take an exclusive lock
instead and updating to the latest values but that barely seems worth
it.
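Rough arithmetic behind that 3e-6 figure, assuming the stock 8 GB queue cap and 8 kB SLRU pages (both assumptions for illustration, not read from the patch): a few pages of idle tail lag out of roughly a million total pages comes to a few millionths.

```c
/* Back-of-envelope for the idle pg_notification_queue_usage() value.
 * The 8 GB cap and 8 kB page size are assumptions for illustration. */
static double
idle_queue_usage(void)
{
    double total_pages = (8.0 * 1024 * 1024 * 1024) / 8192.0;   /* ~1e6 pages */
    double idle_lag_pages = 3.0;    /* tail lags a few pages when idle */

    return idle_lag_pages / total_pages;    /* a few millionths */
}
```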
Performance-wise it's even better than my original patches, with about
20-25% reduction in CPU usage in my test setup (using the test script
sent previously).
Here is the log output from my postgres, where you can see the signalling in action:
------
16:42:48.673 [10188] martijn@test_131 DEBUG: PreCommit_Notify
16:42:48.673 [10188] martijn@test_131 DEBUG: NOTIFY QUEUE = (74,896)...(79,0)
16:42:48.673 [10188] martijn@test_131 DEBUG: backendTryAdvanceTail -> true
16:42:48.673 [10188] martijn@test_131 DEBUG: AtCommit_Notify
16:42:48.673 [10188] martijn@test_131 DEBUG: ProcessCompletedNotifies
16:42:48.673 [10188] martijn@test_131 DEBUG: backendTryAdvanceTail -> false
16:42:48.673 [10188] martijn@test_131 DEBUG: asyncQueueAdvanceTail
16:42:48.673 [10188] martijn@test_131 DEBUG: waking backend 137 (pid 10055)
16:42:48.673 [10055] martijn@test_067 DEBUG: ProcessIncomingNotify
16:42:48.673 [10187] martijn@test_131 DEBUG: ProcessIncomingNotify
16:42:48.673 [10055] martijn@test_067 DEBUG: asyncQueueAdvanceTail
16:42:48.673 [10055] martijn@test_067 DEBUG: waking backend 138 (pid 10056)
16:42:48.673 [10187] martijn@test_131 DEBUG: ProcessIncomingNotify: done
16:42:48.673 [10055] martijn@test_067 DEBUG: ProcessIncomingNotify: done
16:42:48.673 [10056] martijn@test_067 DEBUG: ProcessIncomingNotify
16:42:48.673 [10056] martijn@test_067 DEBUG: asyncQueueAdvanceTail
16:42:48.673 [10056] martijn@test_067 DEBUG: ProcessIncomingNotify: done
16:42:48.683 [9991] martijn@test_042 DEBUG: Async_Notify(changes)
16:42:48.683 [9991] martijn@test_042 DEBUG: PreCommit_Notify
16:42:48.683 [9991] martijn@test_042 DEBUG: NOTIFY QUEUE = (75,7744)...(79,32)
16:42:48.683 [9991] martijn@test_042 DEBUG: AtCommit_Notify
-----
Have a nice weekend.
--
Martijn van Oosterhout <kleptog@gmail.com> http://svana.org/kleptog/
Attachments:
0001-Maintain-queue-of-listening-backends-to-speed-up-loo.patch
From 82366f1dbc0fc234fdd10dbc15519b3cf7104684 Mon Sep 17 00:00:00 2001
From: Martijn van Oosterhout <oosterhout@fox-it.com>
Date: Tue, 23 Jul 2019 16:49:30 +0200
Subject: [PATCH 1/3] Maintain queue of listening backends to speed up loops
Especially the loop to advance the tail pointer is called fairly often while
holding an exclusive lock.
---
src/backend/commands/async.c | 45 +++++++++++++++++++++++++++++++++++++++-----
1 file changed, 40 insertions(+), 5 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 6e9c580ec6..ba0b1baecc 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -217,6 +217,7 @@ typedef struct QueueBackendStatus
{
int32 pid; /* either a PID or InvalidPid */
Oid dboid; /* backend's database OID, or InvalidOid */
+ int nextListener; /* backendid of next listener, 0=last */
QueuePosition pos; /* backend has read queue up to here */
} QueueBackendStatus;
@@ -247,6 +248,7 @@ typedef struct AsyncQueueControl
QueuePosition tail; /* the global tail is equivalent to the pos of
* the "slowest" backend */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
+ int firstListener; /* backendId of first listener, 0=none */
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
} AsyncQueueControl;
@@ -257,8 +259,11 @@ static AsyncQueueControl *asyncQueueControl;
#define QUEUE_TAIL (asyncQueueControl->tail)
#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
+#define QUEUE_LISTENER_NEXT(i) (asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
+#define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener)
+
/*
* The SLRU buffer area through which we access the notification queue
*/
@@ -465,11 +470,13 @@ AsyncShmemInit(void)
SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
asyncQueueControl->lastQueueFillWarn = 0;
+ QUEUE_FIRST_LISTENER = 0;
/* zero'th entry won't be used, but let's initialize it anyway */
for (i = 0; i <= MaxBackends; i++)
{
QUEUE_BACKEND_PID(i) = InvalidPid;
QUEUE_BACKEND_DBOID(i) = InvalidOid;
+ QUEUE_LISTENER_NEXT(i) = InvalidBackendId;
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
}
}
@@ -908,6 +915,7 @@ Exec_ListenPreCommit(void)
QueuePosition head;
QueuePosition max;
int i;
+ int prevListener;
/*
* Nothing to do if we are already listening to something, nor if we
@@ -953,10 +961,14 @@ Exec_ListenPreCommit(void)
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
head = QUEUE_HEAD;
max = QUEUE_TAIL;
+ prevListener = 0;
if (QUEUE_POS_PAGE(max) != QUEUE_POS_PAGE(head))
{
- for (i = 1; i <= MaxBackends; i++)
+ for (i = QUEUE_FIRST_LISTENER; i; i = QUEUE_LISTENER_NEXT(i))
{
+ /* Find last listening backend before this one */
+ if (i < MyBackendId)
+ prevListener = i;
if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
}
@@ -964,6 +976,15 @@ Exec_ListenPreCommit(void)
QUEUE_BACKEND_POS(MyBackendId) = max;
QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
+
+ /* Insert backend into list of listeners */
+ if (prevListener == 0) {
+ QUEUE_LISTENER_NEXT(MyBackendId) = QUEUE_FIRST_LISTENER;
+ QUEUE_FIRST_LISTENER = MyBackendId;
+ } else {
+ QUEUE_LISTENER_NEXT(MyBackendId) = QUEUE_LISTENER_NEXT(prevListener);
+ QUEUE_LISTENER_NEXT(prevListener) = MyBackendId;
+ }
LWLockRelease(AsyncQueueLock);
/* Now we are listed in the global array, so remember we're listening */
@@ -1170,19 +1191,33 @@ static void
asyncQueueUnregister(void)
{
bool advanceTail;
+ int i;
Assert(listenChannels == NIL); /* else caller error */
if (!amRegisteredListener) /* nothing to do */
return;
- LWLockAcquire(AsyncQueueLock, LW_SHARED);
+ LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
/* check if entry is valid and oldest ... */
advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
/* ... then mark it invalid */
QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
+ /* and remove it from the list */
+ if (QUEUE_FIRST_LISTENER == MyBackendId)
+ QUEUE_FIRST_LISTENER = QUEUE_LISTENER_NEXT(MyBackendId);
+ else {
+ for (i = QUEUE_FIRST_LISTENER; i; i = QUEUE_LISTENER_NEXT(i))
+ {
+ if (QUEUE_LISTENER_NEXT(i) == MyBackendId) {
+ QUEUE_LISTENER_NEXT(i) = QUEUE_LISTENER_NEXT(QUEUE_LISTENER_NEXT(i));
+ break;
+ }
+ }
+ }
+ QUEUE_LISTENER_NEXT(MyBackendId) = InvalidBackendId;
LWLockRelease(AsyncQueueLock);
/* mark ourselves as no longer listed in the global array */
@@ -1459,7 +1494,7 @@ asyncQueueFillWarning(void)
int32 minPid = InvalidPid;
int i;
- for (i = 1; i <= MaxBackends; i++)
+ for (i = QUEUE_FIRST_LISTENER; i; i = QUEUE_LISTENER_NEXT(i))
{
if (QUEUE_BACKEND_PID(i) != InvalidPid)
{
@@ -1519,7 +1554,7 @@ SignalBackends(void)
count = 0;
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
- for (i = 1; i <= MaxBackends; i++)
+ for (i = QUEUE_FIRST_LISTENER; i; i = QUEUE_LISTENER_NEXT(i))
{
pid = QUEUE_BACKEND_PID(i);
if (pid != InvalidPid && pid != MyProcPid)
@@ -1995,7 +2030,7 @@ asyncQueueAdvanceTail(void)
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
min = QUEUE_HEAD;
- for (i = 1; i <= MaxBackends; i++)
+ for (i = QUEUE_FIRST_LISTENER; i; i = QUEUE_LISTENER_NEXT(i))
{
if (QUEUE_BACKEND_PID(i) != InvalidPid)
min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
--
2.11.0
0002-Improve-performance-of-async-notifications.patch
From a6d598dd26a0bd6ea4211a63571d0c78dbf8aea0 Mon Sep 17 00:00:00 2001
From: Martijn van Oosterhout <oosterhout@fox-it.com>
Date: Mon, 3 Jun 2019 17:13:31 +0200
Subject: [PATCH 2/3] Improve performance of async notifications
Advancing the tail pointer requires an exclusive lock which can block
backends from other databases, so it's worth keeping these attempts to a
minimum.
Instead of tracking the slowest backend exactly we update the queue more
lazily, only checking when we switch to a new SLRU page. Additionally,
instead of waking up every slow backend at once, we do them one at a time.
---
src/backend/commands/async.c | 145 ++++++++++++++++++++++++++++++-------------
1 file changed, 103 insertions(+), 42 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index ba0b1baecc..20bed5ca71 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -73,10 +73,11 @@
* Finally, after we are out of the transaction altogether, we check if
* we need to signal listening backends. In SignalBackends() we scan the
* list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
- * to every listening backend (we don't know which backend is listening on
- * which channel so we must signal them all). We can exclude backends that
- * are already up to date, though. We don't bother with a self-signal
- * either, but just process the queue directly.
+ * to every listening backend for the relevant database (we don't know
+ * which backend is listening on which channel so we must signal them
+ * all). We can exclude backends that are already up to date, though.
+ * We don't bother with a self-signal either, but just process the queue
+ * directly.
*
* 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
* sets the process's latch, which triggers the event to be processed
@@ -89,13 +90,25 @@
* Inbound-notify processing consists of reading all of the notifications
* that have arrived since scanning last time. We read every notification
* until we reach either a notification from an uncommitted transaction or
- * the head pointer's position. Then we check if we were the laziest
- * backend: if our pointer is set to the same position as the global tail
- * pointer is set, then we move the global tail pointer ahead to where the
- * second-laziest backend is (in general, we take the MIN of the current
- * head position and all active backends' new tail pointers). Whenever we
- * move the global tail pointer we also truncate now-unused pages (i.e.,
- * delete files in pg_notify/ that are no longer used).
+ * the head pointer's position.
+ *
+ * 6. To avoid SLRU wraparound and minimize disk space the tail pointer
+ * needs to be advanced so that old pages can be truncated. This
+ * however requires an exclusive lock and as such should be done
+ * infrequently.
+ *
+ * When a new notification is added, the writer checks to see if the
+ * tail pointer is more than QUEUE_CLEANUP_DELAY pages behind. If
+ * so, it attempts to advance the tail, and if there are slow
+ * backends (perhaps because all the notifications were for other
+ * databases), wake one of them up by sending a signal.
+ *
+ * When the slow backend processes the queue it notes it was behind
+ * and so also tries to advance the tail, possibly waking up another
+ * slow backend. Eventually all backends will have processed the
+ * queue and the global tail pointer is moved to a new page and we
+ * also truncate now-unused pages (i.e., delete files in pg_notify/
+ * that are no longer used).
*
* An application that listens on the same channel it notifies will get
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
@@ -210,6 +223,12 @@ typedef struct QueuePosition
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
+/* how many pages a backend may fall behind before it needs to be signalled */
+#define QUEUE_CLEANUP_DELAY 4
+
+/* is a backend so far behind it needs to be signalled? */
+#define QUEUE_SLOW_BACKEND(i) \
+ (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), QUEUE_POS_PAGE(QUEUE_BACKEND_POS(i))) > QUEUE_CLEANUP_DELAY)
/*
* Struct describing a listening backend's status
*/
@@ -245,7 +264,7 @@ typedef struct QueueBackendStatus
typedef struct AsyncQueueControl
{
QueuePosition head; /* head points to the next free location */
- QueuePosition tail; /* the global tail is equivalent to the pos of
+ QueuePosition tail; /* the global tail is some place older than
* the "slowest" backend */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
int firstListener; /* backendId of first listener, 0=none */
@@ -370,10 +389,15 @@ static bool amRegisteredListener = false;
/* has this backend sent notifications in the current transaction? */
static bool backendHasSentNotifications = false;
+/* has this backend switched to a new page, and so should attempt to advance
+ * the queue tail? */
+static bool backendTryAdvanceTail = false;
+
/* GUC parameter */
bool Trace_notify = false;
/* local function prototypes */
+static int asyncQueuePageDiff(int p, int q);
static bool asyncQueuePagePrecedes(int p, int q);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
@@ -389,7 +413,7 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
static double asyncQueueUsage(void);
static void asyncQueueFillWarning(void);
-static bool SignalBackends(void);
+static bool SignalMyDBBackends(void);
static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
@@ -403,8 +427,8 @@ static void ClearPendingActionsAndNotifies(void);
/*
* We will work on the page range of 0..QUEUE_MAX_PAGE.
*/
-static bool
-asyncQueuePagePrecedes(int p, int q)
+static int
+asyncQueuePageDiff(int p, int q)
{
int diff;
@@ -420,7 +444,13 @@ asyncQueuePagePrecedes(int p, int q)
diff -= QUEUE_MAX_PAGE + 1;
else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
diff += QUEUE_MAX_PAGE + 1;
- return diff < 0;
+ return diff;
+}
+
+static bool
+asyncQueuePagePrecedes(int p, int q)
+{
+ return asyncQueuePageDiff(p, q) < 0;
}
/*
@@ -850,6 +880,12 @@ PreCommit_Notify(void)
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("too many notifications in the NOTIFY queue")));
nextNotify = asyncQueueAddEntries(nextNotify);
+
+ /* If we are advancing to a new page, remember this so after the
+ * transaction commits we can attempt to advance the tail
+ * pointer, see ProcessCompletedNotifies() */
+ if (QUEUE_POS_OFFSET(QUEUE_HEAD) == 0)
+ backendTryAdvanceTail = true;
LWLockRelease(AsyncQueueLock);
}
}
@@ -999,8 +1035,6 @@ Exec_ListenPreCommit(void)
* notification to the frontend. Also, although our transaction might
* have executed NOTIFY, those message(s) aren't queued yet so we can't
* see them in the queue.
- *
- * This will also advance the global tail pointer if possible.
*/
if (!QUEUE_POS_EQUAL(max, head))
asyncQueueReadAllNotifications();
@@ -1133,7 +1167,7 @@ ProcessCompletedNotifies(void)
StartTransactionCommand();
/* Send signals to other backends */
- signalled = SignalBackends();
+ signalled = SignalMyDBBackends();
if (listenChannels != NIL)
{
@@ -1151,6 +1185,16 @@ ProcessCompletedNotifies(void)
* harmless.)
*/
asyncQueueAdvanceTail();
+ backendTryAdvanceTail = false;
+ }
+
+ if (backendTryAdvanceTail)
+ {
+ /* We switched to a new page while writing our notifies to the
+ * queue, so we try to advance the tail ourselves, possibly waking
+ * up another backend if it is running behind */
+ backendTryAdvanceTail = false;
+ asyncQueueAdvanceTail();
}
CommitTransactionCommand();
@@ -1190,7 +1234,6 @@ IsListeningOn(const char *channel)
static void
asyncQueueUnregister(void)
{
- bool advanceTail;
int i;
Assert(listenChannels == NIL); /* else caller error */
@@ -1199,10 +1242,7 @@ asyncQueueUnregister(void)
return;
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
- /* check if entry is valid and oldest ... */
- advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
- QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
- /* ... then mark it invalid */
+ /* Mark our entry as invalid */
QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
/* and remove it from the list */
@@ -1222,10 +1262,6 @@ asyncQueueUnregister(void)
/* mark ourselves as no longer listed in the global array */
amRegisteredListener = false;
-
- /* If we were the laziest backend, try to advance the tail pointer */
- if (advanceTail)
- asyncQueueAdvanceTail();
}
/*
@@ -1518,7 +1554,7 @@ asyncQueueFillWarning(void)
}
/*
- * Send signals to all listening backends (except our own).
+ * Send signals to all listening backends (except our own) for our database.
*
* Returns true if we sent at least one signal.
*
@@ -1531,7 +1567,7 @@ asyncQueueFillWarning(void)
* Since we know the BackendId and the Pid the signalling is quite cheap.
*/
static bool
-SignalBackends(void)
+SignalMyDBBackends(void)
{
bool signalled = false;
int32 *pids;
@@ -1541,9 +1577,9 @@ SignalBackends(void)
int32 pid;
/*
- * Identify all backends that are listening and not already up-to-date. We
- * don't want to send signals while holding the AsyncQueueLock, so we just
- * build a list of target PIDs.
+ * Identify all backends with MyDatabaseId that are listening and not
+ * already up-to-date. We don't want to send signals while holding the
+ * AsyncQueueLock, so we just build a list of target PIDs.
*
* XXX in principle these pallocs could fail, which would be bad. Maybe
* preallocate the arrays? But in practice this is only run in trivial
@@ -1557,7 +1593,7 @@ SignalBackends(void)
for (i = QUEUE_FIRST_LISTENER; i; i = QUEUE_LISTENER_NEXT(i))
{
pid = QUEUE_BACKEND_PID(i);
- if (pid != InvalidPid && pid != MyProcPid)
+ if (pid != InvalidPid && pid != MyProcPid && QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
{
QueuePosition pos = QUEUE_BACKEND_POS(i);
@@ -1782,6 +1818,9 @@ asyncQueueReadAllNotifications(void)
Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
head = QUEUE_HEAD;
+ /* If we're behind, we possibly got signalled to catch up. Remember
+ * this so we attempt to advance the tail later */
+ advanceTail = QUEUE_SLOW_BACKEND(MyBackendId);
LWLockRelease(AsyncQueueLock);
if (QUEUE_POS_EQUAL(pos, head))
@@ -1889,12 +1928,9 @@ asyncQueueReadAllNotifications(void)
/* Update shared state */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = pos;
- advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
LWLockRelease(AsyncQueueLock);
- /* If we were the laziest backend, try to advance the tail pointer */
- if (advanceTail)
- asyncQueueAdvanceTail();
+ /* We don't try to advance the tail here. */
PG_RE_THROW();
}
@@ -1903,10 +1939,10 @@ asyncQueueReadAllNotifications(void)
/* Update shared state */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = pos;
- advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
LWLockRelease(AsyncQueueLock);
- /* If we were the laziest backend, try to advance the tail pointer */
+ /* We were behind, so try to advance the tail pointer, possibly
+ * signalling another backend if necessary */
if (advanceTail)
asyncQueueAdvanceTail();
@@ -2016,8 +2052,8 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
}
/*
- * Advance the shared queue tail variable to the minimum of all the
- * per-backend tail pointers. Truncate pg_notify space if possible.
+ * Advance the shared queue tail variable if possible. If a slow backend is
+ * holding everything up, signal it. Truncate pg_notify space if possible.
*/
static void
asyncQueueAdvanceTail(void)
@@ -2027,18 +2063,43 @@ asyncQueueAdvanceTail(void)
int oldtailpage;
int newtailpage;
int boundary;
+ int slowbackendid = InvalidBackendId;
+ int slowbackendpid;
+ /* Advance the tail as far as possible, noting if there is a slow
+ * backend we could kick */
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
min = QUEUE_HEAD;
for (i = QUEUE_FIRST_LISTENER; i; i = QUEUE_LISTENER_NEXT(i))
{
if (QUEUE_BACKEND_PID(i) != InvalidPid)
+ {
+ if (QUEUE_BACKEND_PID(i) != MyProcPid && QUEUE_SLOW_BACKEND(i))
+ {
+ slowbackendid = i;
+ slowbackendpid = QUEUE_BACKEND_PID(i);
+ }
min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
+ }
}
oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
QUEUE_TAIL = min;
LWLockRelease(AsyncQueueLock);
+ /* At least one backend was slow, so signal a random one to wake it
+ * up. It should in turn call this function to signal the next,
+ * see asyncQueueReadAllNotifications() */
+ if (slowbackendid != InvalidBackendId) {
+
+ /* Note: assuming things aren't broken, a signal failure here could
+ * only occur if the target backend exited since we released
+ * AsyncQueueLock; which is unlikely but certainly possible. So we
+ * just log a low-level debug message if it happens.
+ */
+ if (SendProcSignal(slowbackendpid, PROCSIG_NOTIFY_INTERRUPT, slowbackendid) < 0)
+ elog(DEBUG3, "could not signal backend with PID %d: %m", slowbackendpid);
+ }
+
/*
* We can truncate something if the global tail advanced across an SLRU
* segment boundary.
--
2.11.0
0003-Debugging-for-NOTIFY.patch
From a499a683c6f4a93bdc58ea05cecb531d2b917cf7 Mon Sep 17 00:00:00 2001
From: Martijn van Oosterhout <oosterhout@fox-it.com>
Date: Fri, 2 Aug 2019 14:22:46 +0200
Subject: [PATCH 3/3] Debugging for NOTIFY
---
src/backend/commands/async.c | 14 ++++++++++++++
1 file changed, 14 insertions(+)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 20bed5ca71..597e568c85 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -880,12 +880,20 @@ PreCommit_Notify(void)
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("too many notifications in the NOTIFY queue")));
nextNotify = asyncQueueAddEntries(nextNotify);
+ if (Trace_notify)
+ elog(DEBUG1, "NOTIFY QUEUE = (%d,%d)...(%d,%d)",
+ QUEUE_POS_PAGE(QUEUE_TAIL), QUEUE_POS_OFFSET(QUEUE_TAIL),
+ QUEUE_POS_PAGE(QUEUE_HEAD), QUEUE_POS_OFFSET(QUEUE_HEAD));
/* If we are advancing to a new page, remember this so after the
* transaction commits we can attempt to advance the tail
* pointer, see ProcessCompletedNotifies() */
if (QUEUE_POS_OFFSET(QUEUE_HEAD) == 0)
+ {
+ if (Trace_notify)
+ elog(DEBUG1, "backendTryAdvanceTail -> true");
backendTryAdvanceTail = true;
+ }
LWLockRelease(AsyncQueueLock);
}
}
@@ -1194,6 +1202,8 @@ ProcessCompletedNotifies(void)
* queue, so we try to advance the tail ourselves, possibly waking
* up another backend if it is running behind */
backendTryAdvanceTail = false;
+ if (Trace_notify)
+ elog(DEBUG1, "backendTryAdvanceTail -> false");
asyncQueueAdvanceTail();
}
@@ -2066,6 +2076,8 @@ asyncQueueAdvanceTail(void)
int slowbackendid = InvalidBackendId;
int slowbackendpid;
+ if (Trace_notify)
+ elog(DEBUG1, "asyncQueueAdvanceTail");
/* Advance the tail as far as possible, noting if there is a slow
* backend we could kick */
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
@@ -2096,6 +2108,8 @@ asyncQueueAdvanceTail(void)
* AsyncQueueLock; which is unlikely but certainly possible. So we
* just log a low-level debug message if it happens.
*/
+ if (Trace_notify)
+ elog(DEBUG1, "waking backend %d (pid %d)", slowbackendid, slowbackendpid);
if (SendProcSignal(slowbackendpid, PROCSIG_NOTIFY_INTERRUPT, slowbackendid) < 0)
elog(DEBUG3, "could not signal backend with PID %d: %m", slowbackendpid);
}
--
2.11.0
Martijn van Oosterhout <kleptog@gmail.com> writes:
> The original three patches have been collapsed into one as, given the
> changes discussed, it didn't make sense to keep them separate. There
> are now two patches (the third is just to help with testing):
> Patch 1: Tracks the listening backends in a list so non-listening
> backends can be quickly skipped over. This is separate because it's
> orthogonal to the rest of the changes and there are other ways to do
> this.
> Patch 2: This is the meat of the change. It implements all the
> suggestions discussed:
I pushed 0001 after doing some hacking on it --- it was sloppy about
datatypes, and about whether the invalid-entry value is 0 or -1,
and it was just wrong about keeping the list in backendid order.
(You can't conditionally skip looking for where to put the new
entry, if you want to maintain the order. I thought about just
defining the list as unordered, which would simplify joining the
list initially, but that could get pretty cache-unfriendly when
there are lots of entries.)
0002 is now going to need a rebase, so please do that.
regards, tom lane
Hi Tom,
On Wed, 11 Sep 2019 at 00:18, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> I pushed 0001 after doing some hacking on it --- it was sloppy about
> datatypes, and about whether the invalid-entry value is 0 or -1,
> and it was just wrong about keeping the list in backendid order.
> (You can't conditionally skip looking for where to put the new
> entry, if you want to maintain the order. I thought about just
> defining the list as unordered, which would simplify joining the
> list initially, but that could get pretty cache-unfriendly when
> there are lots of entries.)
>
> 0002 is now going to need a rebase, so please do that.
Thanks for this, and good catch. Looks like I didn't test the first patch
by itself very well.
Here is the rebased second patch.
Thanks in advance,
--
Martijn van Oosterhout <kleptog@gmail.com> http://svana.org/kleptog/
Attachments:
0002-Improve-performance-of-async-notifications.patch
From bc4b1b458564f758b7fa1c1f7b0397aade71db06 Mon Sep 17 00:00:00 2001
From: Martijn van Oosterhout <oosterhout@fox-it.com>
Date: Mon, 3 Jun 2019 17:13:31 +0200
Subject: [PATCH 1/2] Improve performance of async notifications
Advancing the tail pointer requires an exclusive lock which can block
backends from other databases, so it's worth keeping these attempts to a
minimum.
Instead of tracking the slowest backend exactly we update the queue more
lazily, only checking when we switch to a new SLRU page. Additionally,
instead of waking up every slow backend at once, we do them one at a time.
---
src/backend/commands/async.c | 142 +++++++++++++++++++++++++----------
1 file changed, 101 insertions(+), 41 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index f26269b5ea..b9dd0ca139 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -73,10 +73,11 @@
* Finally, after we are out of the transaction altogether, we check if
* we need to signal listening backends. In SignalBackends() we scan the
* list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
- * to every listening backend (we don't know which backend is listening on
- * which channel so we must signal them all). We can exclude backends that
- * are already up to date, though. We don't bother with a self-signal
- * either, but just process the queue directly.
+ * to every listening backend for the relevant database (we don't know
+ * which backend is listening on which channel so we must signal them
+ * all). We can exclude backends that are already up to date, though.
+ * We don't bother with a self-signal either, but just process the queue
+ * directly.
*
* 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
* sets the process's latch, which triggers the event to be processed
@@ -89,13 +90,25 @@
* Inbound-notify processing consists of reading all of the notifications
* that have arrived since scanning last time. We read every notification
* until we reach either a notification from an uncommitted transaction or
- * the head pointer's position. Then we check if we were the laziest
- * backend: if our pointer is set to the same position as the global tail
- * pointer is set, then we move the global tail pointer ahead to where the
- * second-laziest backend is (in general, we take the MIN of the current
- * head position and all active backends' new tail pointers). Whenever we
- * move the global tail pointer we also truncate now-unused pages (i.e.,
- * delete files in pg_notify/ that are no longer used).
+ * the head pointer's position.
+ *
+ * 6. To avoid SLRU wraparound and minimize disk space the tail pointer
+ * needs to be advanced so that old pages can be truncated. This
+ * however requires an exclusive lock and as such should be done
+ * infrequently.
+ *
+ * When a new notification is added, the writer checks to see if the
+ * tail pointer is more than QUEUE_CLEANUP_DELAY pages behind. If
+ * so, it attempts to advance the tail, and if there are slow
+ * backends (perhaps because all the notifications were for other
+ * databases), wake one of them up by sending a signal.
+ *
+ * When the slow backend processes the queue it notes it was behind
+ * and so also tries to advance the tail, possibly waking up another
+ * slow backend. Eventually all backends will have processed the
+ * queue and the global tail pointer is moved to a new page and we
+ * also truncate now-unused pages (i.e., delete files in pg_notify/
+ * that are no longer used).
*
* An application that listens on the same channel it notifies will get
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
@@ -211,6 +224,12 @@ typedef struct QueuePosition
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
+/* how many pages a backend may fall behind before it needs to be signalled */
+#define QUEUE_CLEANUP_DELAY 4
+
+/* is a backend so far behind it needs to be signalled? */
+#define QUEUE_SLOW_BACKEND(i) \
+ (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), QUEUE_POS_PAGE(QUEUE_BACKEND_POS(i))) > QUEUE_CLEANUP_DELAY)
/*
* Struct describing a listening backend's status
*/
@@ -252,7 +271,7 @@ typedef struct QueueBackendStatus
typedef struct AsyncQueueControl
{
QueuePosition head; /* head points to the next free location */
- QueuePosition tail; /* the global tail is equivalent to the pos of
+ QueuePosition tail; /* the global tail is some place older than
* the "slowest" backend */
BackendId firstListener; /* id of first listener, or InvalidBackendId */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
@@ -402,10 +421,15 @@ static bool amRegisteredListener = false;
/* has this backend sent notifications in the current transaction? */
static bool backendHasSentNotifications = false;
+/* has this backend switched to a new page, and so should attempt to advance
+ * the queue tail? */
+static bool backendTryAdvanceTail = false;
+
/* GUC parameter */
bool Trace_notify = false;
/* local function prototypes */
+static int asyncQueuePageDiff(int p, int q);
static bool asyncQueuePagePrecedes(int p, int q);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
@@ -421,7 +445,7 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
static double asyncQueueUsage(void);
static void asyncQueueFillWarning(void);
-static bool SignalBackends(void);
+static bool SignalMyDBBackends(void);
static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
@@ -438,8 +462,8 @@ static void ClearPendingActionsAndNotifies(void);
/*
* We will work on the page range of 0..QUEUE_MAX_PAGE.
*/
-static bool
-asyncQueuePagePrecedes(int p, int q)
+static int
+asyncQueuePageDiff(int p, int q)
{
int diff;
@@ -455,7 +479,13 @@ asyncQueuePagePrecedes(int p, int q)
diff -= QUEUE_MAX_PAGE + 1;
else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
diff += QUEUE_MAX_PAGE + 1;
- return diff < 0;
+ return diff;
+}
+
+static bool
+asyncQueuePagePrecedes(int p, int q)
+{
+ return asyncQueuePageDiff(p, q) < 0;
}
/*
@@ -905,6 +935,12 @@ PreCommit_Notify(void)
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("too many notifications in the NOTIFY queue")));
nextNotify = asyncQueueAddEntries(nextNotify);
+
+ /* If we are advancing to a new page, remember this so after the
+ * transaction commits we can attempt to advance the tail
+ * pointer, see ProcessCompletedNotifies() */
+ if (QUEUE_POS_OFFSET(QUEUE_HEAD) == 0)
+ backendTryAdvanceTail = true;
LWLockRelease(AsyncQueueLock);
}
}
@@ -1051,8 +1087,6 @@ Exec_ListenPreCommit(void)
* notification to the frontend. Also, although our transaction might
* have executed NOTIFY, those message(s) aren't queued yet so we can't
* see them in the queue.
- *
- * This will also advance the global tail pointer if possible.
*/
if (!QUEUE_POS_EQUAL(max, head))
asyncQueueReadAllNotifications();
@@ -1185,7 +1219,7 @@ ProcessCompletedNotifies(void)
StartTransactionCommand();
/* Send signals to other backends */
- signalled = SignalBackends();
+ signalled = SignalMyDBBackends();
if (listenChannels != NIL)
{
@@ -1203,6 +1237,16 @@ ProcessCompletedNotifies(void)
* harmless.)
*/
asyncQueueAdvanceTail();
+ backendTryAdvanceTail = false;
+ }
+
+ if (backendTryAdvanceTail)
+ {
+ /* We switched to a new page while writing our notifies to the
+ * queue, so we try to advance the tail ourselves, possibly waking
+ * up another backend if it is running behind */
+ backendTryAdvanceTail = false;
+ asyncQueueAdvanceTail();
}
CommitTransactionCommand();
@@ -1253,10 +1297,7 @@ asyncQueueUnregister(void)
* Need exclusive lock here to manipulate list links.
*/
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
- /* check if entry is valid and oldest ... */
- advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
- QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
- /* ... then mark it invalid */
+ /* Mark our entry as invalid */
QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
/* and remove it from the list */
@@ -1278,10 +1319,6 @@ asyncQueueUnregister(void)
/* mark ourselves as no longer listed in the global array */
amRegisteredListener = false;
-
- /* If we were the laziest backend, try to advance the tail pointer */
- if (advanceTail)
- asyncQueueAdvanceTail();
}
/*
@@ -1570,7 +1607,7 @@ asyncQueueFillWarning(void)
}
/*
- * Send signals to all listening backends (except our own).
+ * Send signals to all listening backends (except our own) for our database.
*
* Returns true if we sent at least one signal.
*
@@ -1583,7 +1620,7 @@ asyncQueueFillWarning(void)
* Since we know the BackendId and the Pid the signalling is quite cheap.
*/
static bool
-SignalBackends(void)
+SignalMyDBBackends(void)
{
bool signalled = false;
int32 *pids;
@@ -1592,9 +1629,9 @@ SignalBackends(void)
int32 pid;
/*
- * Identify all backends that are listening and not already up-to-date. We
- * don't want to send signals while holding the AsyncQueueLock, so we just
- * build a list of target PIDs.
+ * Identify all backends with MyDatabaseId that are listening and not
+ * already up-to-date. We don't want to send signals while holding the
+ * AsyncQueueLock, so we just build a list of target PIDs.
*
* XXX in principle these pallocs could fail, which would be bad. Maybe
* preallocate the arrays? But in practice this is only run in trivial
@@ -1609,7 +1646,7 @@ SignalBackends(void)
{
pid = QUEUE_BACKEND_PID(i);
Assert(pid != InvalidPid);
- if (pid != MyProcPid)
+ if (pid != MyProcPid && QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
{
QueuePosition pos = QUEUE_BACKEND_POS(i);
@@ -1859,6 +1896,9 @@ asyncQueueReadAllNotifications(void)
Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
head = QUEUE_HEAD;
+ /* If we're behind, we possibly got signalled to catch up. Remember
+ * this so we attempt to advance the tail later */
+ advanceTail = QUEUE_SLOW_BACKEND(MyBackendId);
LWLockRelease(AsyncQueueLock);
if (QUEUE_POS_EQUAL(pos, head))
@@ -1966,12 +2006,9 @@ asyncQueueReadAllNotifications(void)
/* Update shared state */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = pos;
- advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
LWLockRelease(AsyncQueueLock);
- /* If we were the laziest backend, try to advance the tail pointer */
- if (advanceTail)
- asyncQueueAdvanceTail();
+ /* We don't try to advance the tail here. */
PG_RE_THROW();
}
@@ -1980,10 +2017,10 @@ asyncQueueReadAllNotifications(void)
/* Update shared state */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = pos;
- advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
LWLockRelease(AsyncQueueLock);
- /* If we were the laziest backend, try to advance the tail pointer */
+ /* We were behind, so try to advance the tail pointer, possibly
+ * signalling another backend if necessary */
if (advanceTail)
asyncQueueAdvanceTail();
@@ -2093,8 +2130,8 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
}
/*
- * Advance the shared queue tail variable to the minimum of all the
- * per-backend tail pointers. Truncate pg_notify space if possible.
+ * Advance the shared queue tail variable if possible. If a slow backend is
+ * holding everything up, signal it. Truncate pg_notify space if possible.
*/
static void
asyncQueueAdvanceTail(void)
@@ -2103,18 +2140,41 @@ asyncQueueAdvanceTail(void)
int oldtailpage;
int newtailpage;
int boundary;
+ int slowbackendid = InvalidBackendId;
+ int slowbackendpid;
+ /* Advance the tail as far as possible, noting if there is a slow
+ * backend we could kick */
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
min = QUEUE_HEAD;
for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
{
Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
+ if (QUEUE_BACKEND_PID(i) != MyProcPid && QUEUE_SLOW_BACKEND(i))
+ {
+ slowbackendid = i;
+ slowbackendpid = QUEUE_BACKEND_PID(i);
+ }
min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
}
oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
QUEUE_TAIL = min;
LWLockRelease(AsyncQueueLock);
+ /* At least one backend was slow, so signal a random one to wake it
+ * up. It should in turn call this function to signal the next,
+ * see asyncQueueReadAllNotifications() */
+ if (slowbackendid != InvalidBackendId) {
+
+ /* Note: assuming things aren't broken, a signal failure here could
+ * only occur if the target backend exited since we released
+ * AsyncQueueLock; which is unlikely but certainly possible. So we
+ * just log a low-level debug message if it happens.
+ */
+ if (SendProcSignal(slowbackendpid, PROCSIG_NOTIFY_INTERRUPT, slowbackendid) < 0)
+ elog(DEBUG3, "could not signal backend with PID %d: %m", slowbackendpid);
+ }
+
/*
* We can truncate something if the global tail advanced across an SLRU
* segment boundary.
--
2.20.1
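The wraparound-aware page arithmetic that the patch factors out of asyncQueuePagePrecedes() can be sketched as a standalone translation unit. The QUEUE_MAX_PAGE value below is illustrative only; the real one in async.c is derived from the SLRU configuration and is much larger.

```c
#include <assert.h>
#include <stdbool.h>

/* Illustrative only; the real QUEUE_MAX_PAGE is far larger. */
#define QUEUE_MAX_PAGE 7

/*
 * Signed circular distance from page q to page p, normalized into
 * [-(QUEUE_MAX_PAGE+1)/2, +(QUEUE_MAX_PAGE+1)/2), as in the patch.
 */
static int
asyncQueuePageDiff(int p, int q)
{
    int diff = p - q;

    if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
        diff -= QUEUE_MAX_PAGE + 1;
    else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
        diff += QUEUE_MAX_PAGE + 1;
    return diff;
}

/* Precedence is then just "the diff is negative". */
static bool
asyncQueuePagePrecedes(int p, int q)
{
    return asyncQueuePageDiff(p, q) < 0;
}
```

With this factoring, QUEUE_SLOW_BACKEND only has to compare the diff against QUEUE_CLEANUP_DELAY, and wraparound of the page counter is handled in one place.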
Martijn van Oosterhout <kleptog@gmail.com> writes:
Here is the rebased second patch.
This throws multiple compiler warnings for me:
async.c: In function 'asyncQueueUnregister':
async.c:1293: warning: unused variable 'advanceTail'
async.c: In function 'asyncQueueAdvanceTail':
async.c:2153: warning: 'slowbackendpid' may be used uninitialized in this function
Also, I don't exactly believe this bit:
+ /* If we are advancing to a new page, remember this so after the
+ * transaction commits we can attempt to advance the tail
+ * pointer, see ProcessCompletedNotifies() */
+ if (QUEUE_POS_OFFSET(QUEUE_HEAD) == 0)
+ backendTryAdvanceTail = true;
It seems unlikely that insertion would stop exactly at a page boundary,
but that seems to be what this is looking for.
But, really ... do we need the backendTryAdvanceTail flag at all?
I'm dubious, because it seems like asyncQueueReadAllNotifications
would have already covered the case if we're listening. If we're
not listening, but we signalled some other listeners, it falls
to them to kick us if we're the slowest backend. If we're not the
slowest backend then doing asyncQueueAdvanceTail isn't useful.
I agree with getting rid of the asyncQueueAdvanceTail call in
asyncQueueUnregister; on reflection doing that there seems pretty unsafe,
because we're not necessarily in a transaction and hence anything that
could possibly error is a bad idea. However, it'd be good to add a
comment explaining that we're not doing that and why it's ok not to.
I'm fairly unimpressed with the "kick a random slow backend" logic.
There can be no point in kicking any but the slowest backend, ie
one whose pointer is exactly the oldest. Since we're already computing
the min pointer in that loop, it would actually take *less* logic inside
the loop to remember the/a backend that had that pointer value, and then
decide afterwards whether it's slow enough to merit a kick.
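A miniature sketch of the loop shape suggested here, with the minimum position and its owner remembered together and the kick decision made once afterwards. The flat array of plain page numbers and the function name are simplifications for illustration; the real code walks the shared listener list under AsyncQueueLock and positions are page/offset pairs.

```c
#include <assert.h>

/* From the patch: a backend more than this many pages behind is "slow". */
#define QUEUE_CLEANUP_DELAY 4
#define INVALID_BACKEND (-1)

/*
 * Scan all listeners once, tracking the minimum page and which backend
 * holds it, then decide afterwards whether that backend is far enough
 * behind the head to merit a signal.  Returns the index to kick, or
 * INVALID_BACKEND if no kick is warranted (including when we ourselves
 * are the slowest).
 */
static int
pick_backend_to_kick(const int *backend_page, int nbackends,
                     int head_page, int my_index, int *min_page)
{
    int slowest = INVALID_BACKEND;

    *min_page = head_page;
    for (int i = 0; i < nbackends; i++)
    {
        if (backend_page[i] < *min_page)
        {
            *min_page = backend_page[i];
            slowest = i;        /* remember who holds the minimum */
        }
    }

    /* a single decision after the loop, not extra per-iteration work */
    if (slowest != INVALID_BACKEND && slowest != my_index &&
        head_page - *min_page > QUEUE_CLEANUP_DELAY)
        return slowest;
    return INVALID_BACKEND;
}
```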
regards, tom lane
Hoi Tom,
On Fri, 13 Sep 2019 at 22:04, Tom Lane <tgl@sss.pgh.pa.us> wrote:
This throws multiple compiler warnings for me:
Fixed.
Also, I don't exactly believe this bit:
[snip]
It seems unlikely that insertion would stop exactly at a page boundary,
but that seems to be what this is looking for.
This is how asyncQueueAddEntries() works. Entries are never split over
pages. If there is not enough room, then it advances to the beginning
of the next page and returns. Hence here the offset is zero. I could
set the global inside asyncQueueAddEntries() but that seems icky.
Another alternative is to have asyncQueueAddEntries() return a boolean
"moved to new page", but that's just a long-winded way of doing what
it is now.
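A toy model of that head movement may make the offset-zero condition clearer. The struct, function name, and page size are assumptions for illustration; the real asyncQueueAddEntries() also pads the old page with a dummy entry and the caller retries the unwritten notification on the next call.

```c
#include <assert.h>
#include <stdbool.h>

#define QUEUE_PAGESIZE 8192     /* illustrative; really the SLRU block size */

typedef struct { int page; int offset; } QueuePos;

/*
 * Entries are never split across pages: if the next entry doesn't fit,
 * the head jumps to offset 0 of the following page and the entry is
 * retried there.  Returns true exactly when we moved to a new page,
 * i.e. the "offset == 0" condition tested in PreCommit_Notify.
 */
static bool
advance_head(QueuePos *head, int entry_len)
{
    if (head->offset + entry_len > QUEUE_PAGESIZE)
    {
        head->page++;
        head->offset = 0;       /* entry not consumed; caller retries */
        return true;
    }
    head->offset += entry_len;
    return false;
}
```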
But, really ... do we need the backendTryAdvanceTail flag at all?
I'm dubious, because it seems like asyncQueueReadAllNotifications
would have already covered the case if we're listening. If we're
not listening, but we signalled some other listeners, it falls
to them to kick us if we're the slowest backend. If we're not the
slowest backend then doing asyncQueueAdvanceTail isn't useful.
There are multiple issues here. asyncQueueReadAllNotifications() is
going to be called by each listener simultaneously, so each listener
is going to come to the same conclusion. On the other side, there is
no guarantee we wake up anyone as a result of the NOTIFY, e.g. if
there are no listeners in the current database. To be sure you try to
advance the tail, you have to trigger on the sending side. The global
is there because at the point we are inserting entries we are still in
a user transaction, potentially holding many table locks (the issue we
were running into in the first place). By setting
backendTryAdvanceTail we can move the work to
ProcessCompletedNotifies() which is after the transaction has
committed and the locks released.
I agree with getting rid of the asyncQueueAdvanceTail call in
asyncQueueUnregister; on reflection doing that there seems pretty unsafe,
because we're not necessarily in a transaction and hence anything that
could possibly error is a bad idea. However, it'd be good to add a
comment explaining that we're not doing that and why it's ok not to.
Comment added.
I'm fairly unimpressed with the "kick a random slow backend" logic.
There can be no point in kicking any but the slowest backend, ie
one whose pointer is exactly the oldest. Since we're already computing
the min pointer in that loop, it would actually take *less* logic inside
the loop to remember the/a backend that had that pointer value, and then
decide afterwards whether it's slow enough to merit a kick.
Adjusted this. I'm not sure it's actually clearer this way, but it is
less work inside the loop. A small change is that now it won't signal
anyone if this backend is the slowest, which is more correct.
Thanks for the feedback. Attached is version 3.
Have a nice weekend,
--
Martijn van Oosterhout <kleptog@gmail.com> http://svana.org/kleptog/
Attachments:
0002-Improve-performance-of-async-notifications-v3.patch
From 539d97b47c4535314c23df22e5e87ecc43149f3a Mon Sep 17 00:00:00 2001
From: Martijn van Oosterhout <kleptog@svana.org>
Date: Sat, 14 Sep 2019 11:01:11 +0200
Subject: [PATCH 1/2] Improve performance of async notifications
Advancing the tail pointer requires an exclusive lock which can block
backends from other databases, so it's worth keeping these attempts to a
minimum.
Instead of tracking the slowest backend exactly we update the queue more
lazily, only checking when we switch to a new SLRU page. Additionally,
instead of waking up every slow backend at once, we do them one at a time.
---
src/backend/commands/async.c | 167 ++++++++++++++++++++++++++---------
1 file changed, 124 insertions(+), 43 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index f26269b5ea..ffd7c7e90b 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -73,10 +73,11 @@
* Finally, after we are out of the transaction altogether, we check if
* we need to signal listening backends. In SignalBackends() we scan the
* list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
- * to every listening backend (we don't know which backend is listening on
- * which channel so we must signal them all). We can exclude backends that
- * are already up to date, though. We don't bother with a self-signal
- * either, but just process the queue directly.
+ * to every listening backend for the relevant database (we don't know
+ * which backend is listening on which channel so we must signal them
+ * all). We can exclude backends that are already up to date, though.
+ * We don't bother with a self-signal either, but just process the queue
+ * directly.
*
* 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
* sets the process's latch, which triggers the event to be processed
@@ -89,13 +90,25 @@
* Inbound-notify processing consists of reading all of the notifications
* that have arrived since scanning last time. We read every notification
* until we reach either a notification from an uncommitted transaction or
- * the head pointer's position. Then we check if we were the laziest
- * backend: if our pointer is set to the same position as the global tail
- * pointer is set, then we move the global tail pointer ahead to where the
- * second-laziest backend is (in general, we take the MIN of the current
- * head position and all active backends' new tail pointers). Whenever we
- * move the global tail pointer we also truncate now-unused pages (i.e.,
- * delete files in pg_notify/ that are no longer used).
+ * the head pointer's position.
+ *
+ * 6. To avoid SLRU wraparound and minimize disk space the tail pointer
+ * needs to be advanced so that old pages can be truncated. This
+ * however requires an exclusive lock and as such should be done
+ * infrequently.
+ *
+ * When a new notification is added, the writer checks to see if the
+ * tail pointer is more than QUEUE_CLEANUP_DELAY pages behind. If
+ * so, it attempts to advance the tail, and if there are slow
+ * backends (perhaps because all the notifications were for other
+ * databases), wakes the slowest by sending it a signal.
+ *
+ * When the slow backend processes the queue, it notes it was behind
+ * and so also tries to advance the tail, possibly waking up another
+ * slow backend. Eventually all backends will have processed the
+ * queue and the global tail pointer is moved to a new page and we
+ * also truncate now-unused pages (i.e., delete files in pg_notify/
+ * that are no longer used).
*
* An application that listens on the same channel it notifies will get
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
@@ -199,6 +212,12 @@ typedef struct QueuePosition
#define QUEUE_POS_EQUAL(x,y) \
((x).page == (y).page && (x).offset == (y).offset)
+/* compare QueuePositions */
+#define QUEUE_POS_LT(x,y) \
+ (asyncQueuePagePrecedes((x).page, (y).page) ? (1) : \
+ (x).page != (y).page ? (0) : \
+ (x).offset < (y).offset ? (1) : (0))
+
/* choose logically smaller QueuePosition */
#define QUEUE_POS_MIN(x,y) \
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
@@ -211,6 +230,12 @@ typedef struct QueuePosition
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
+/* how many pages does a backend need to be behind before it needs to be signalled */
+#define QUEUE_CLEANUP_DELAY 4
+
+/* is a backend so far behind it needs to be signalled? */
+#define QUEUE_SLOW_BACKEND(i) \
+ (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), QUEUE_POS_PAGE(QUEUE_BACKEND_POS(i))) > QUEUE_CLEANUP_DELAY)
/*
* Struct describing a listening backend's status
*/
@@ -252,7 +277,7 @@ typedef struct QueueBackendStatus
typedef struct AsyncQueueControl
{
QueuePosition head; /* head points to the next free location */
- QueuePosition tail; /* the global tail is equivalent to the pos of
+ QueuePosition tail; /* the global tail is some place older than
* the "slowest" backend */
BackendId firstListener; /* id of first listener, or InvalidBackendId */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
@@ -402,10 +427,15 @@ static bool amRegisteredListener = false;
/* has this backend sent notifications in the current transaction? */
static bool backendHasSentNotifications = false;
+/* has this backend switched to a new page, and so should attempt to advance
+ * the queue tail? */
+static bool backendTryAdvanceTail = false;
+
/* GUC parameter */
bool Trace_notify = false;
/* local function prototypes */
+static int asyncQueuePageDiff(int p, int q);
static bool asyncQueuePagePrecedes(int p, int q);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
@@ -421,7 +451,7 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
static double asyncQueueUsage(void);
static void asyncQueueFillWarning(void);
-static bool SignalBackends(void);
+static bool SignalMyDBBackends(void);
static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
@@ -438,8 +468,8 @@ static void ClearPendingActionsAndNotifies(void);
/*
* We will work on the page range of 0..QUEUE_MAX_PAGE.
*/
-static bool
-asyncQueuePagePrecedes(int p, int q)
+static int
+asyncQueuePageDiff(int p, int q)
{
int diff;
@@ -455,7 +485,13 @@ asyncQueuePagePrecedes(int p, int q)
diff -= QUEUE_MAX_PAGE + 1;
else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
diff += QUEUE_MAX_PAGE + 1;
- return diff < 0;
+ return diff;
+}
+
+static bool
+asyncQueuePagePrecedes(int p, int q)
+{
+ return asyncQueuePageDiff(p, q) < 0;
}
/*
@@ -905,6 +941,12 @@ PreCommit_Notify(void)
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("too many notifications in the NOTIFY queue")));
nextNotify = asyncQueueAddEntries(nextNotify);
+
+ /* If we are advancing to a new page, remember this so after the
+ * transaction commits we can attempt to advance the tail
+ * pointer, see ProcessCompletedNotifies() */
+ if (QUEUE_POS_OFFSET(QUEUE_HEAD) == 0)
+ backendTryAdvanceTail = true;
LWLockRelease(AsyncQueueLock);
}
}
@@ -1051,8 +1093,6 @@ Exec_ListenPreCommit(void)
* notification to the frontend. Also, although our transaction might
* have executed NOTIFY, those message(s) aren't queued yet so we can't
* see them in the queue.
- *
- * This will also advance the global tail pointer if possible.
*/
if (!QUEUE_POS_EQUAL(max, head))
asyncQueueReadAllNotifications();
@@ -1185,7 +1225,7 @@ ProcessCompletedNotifies(void)
StartTransactionCommand();
/* Send signals to other backends */
- signalled = SignalBackends();
+ signalled = SignalMyDBBackends();
if (listenChannels != NIL)
{
@@ -1203,6 +1243,16 @@ ProcessCompletedNotifies(void)
* harmless.)
*/
asyncQueueAdvanceTail();
+ backendTryAdvanceTail = false;
+ }
+
+ if (backendTryAdvanceTail)
+ {
+ /* We switched to a new page while writing our notifies to the
+ * queue, so we try to advance the tail ourselves, possibly waking
+ * up another backend if it is running behind */
+ backendTryAdvanceTail = false;
+ asyncQueueAdvanceTail();
}
CommitTransactionCommand();
@@ -1242,8 +1292,6 @@ IsListeningOn(const char *channel)
static void
asyncQueueUnregister(void)
{
- bool advanceTail;
-
Assert(listenChannels == NIL); /* else caller error */
if (!amRegisteredListener) /* nothing to do */
@@ -1253,10 +1301,7 @@ asyncQueueUnregister(void)
* Need exclusive lock here to manipulate list links.
*/
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
- /* check if entry is valid and oldest ... */
- advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
- QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
- /* ... then mark it invalid */
+ /* Mark our entry as invalid */
QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
/* and remove it from the list */
@@ -1279,9 +1324,9 @@ asyncQueueUnregister(void)
/* mark ourselves as no longer listed in the global array */
amRegisteredListener = false;
- /* If we were the laziest backend, try to advance the tail pointer */
- if (advanceTail)
- asyncQueueAdvanceTail();
+ /* Don't try to advance the tail here: we're possibly not inside a
+ * transaction, so anything that could throw an error is unsafe, and
+ * the tail will get advanced later anyway. */
}
/*
@@ -1570,7 +1615,7 @@ asyncQueueFillWarning(void)
}
/*
- * Send signals to all listening backends (except our own).
+ * Send signals to all listening backends (except our own) for our database.
*
* Returns true if we sent at least one signal.
*
@@ -1583,7 +1628,7 @@ asyncQueueFillWarning(void)
* Since we know the BackendId and the Pid the signalling is quite cheap.
*/
static bool
-SignalBackends(void)
+SignalMyDBBackends(void)
{
bool signalled = false;
int32 *pids;
@@ -1592,9 +1637,9 @@ SignalBackends(void)
int32 pid;
/*
- * Identify all backends that are listening and not already up-to-date. We
- * don't want to send signals while holding the AsyncQueueLock, so we just
- * build a list of target PIDs.
+ * Identify all backends with MyDatabaseId that are listening and not
+ * already up-to-date. We don't want to send signals while holding the
+ * AsyncQueueLock, so we just build a list of target PIDs.
*
* XXX in principle these pallocs could fail, which would be bad. Maybe
* preallocate the arrays? But in practice this is only run in trivial
@@ -1609,7 +1654,7 @@ SignalBackends(void)
{
pid = QUEUE_BACKEND_PID(i);
Assert(pid != InvalidPid);
- if (pid != MyProcPid)
+ if (pid != MyProcPid && QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
{
QueuePosition pos = QUEUE_BACKEND_POS(i);
@@ -1859,6 +1904,9 @@ asyncQueueReadAllNotifications(void)
Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
head = QUEUE_HEAD;
+ /* If we're behind, we possibly got signalled to catch up. Remember
+ * this so we attempt to advance the tail later */
+ advanceTail = QUEUE_SLOW_BACKEND(MyBackendId);
LWLockRelease(AsyncQueueLock);
if (QUEUE_POS_EQUAL(pos, head))
@@ -1966,12 +2014,9 @@ asyncQueueReadAllNotifications(void)
/* Update shared state */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = pos;
- advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
LWLockRelease(AsyncQueueLock);
- /* If we were the laziest backend, try to advance the tail pointer */
- if (advanceTail)
- asyncQueueAdvanceTail();
+ /* We don't try to advance the tail here. */
PG_RE_THROW();
}
@@ -1980,10 +2025,10 @@ asyncQueueReadAllNotifications(void)
/* Update shared state */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = pos;
- advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
LWLockRelease(AsyncQueueLock);
- /* If we were the laziest backend, try to advance the tail pointer */
+ /* We were behind, so try to advance the tail pointer, possibly
+ * signalling another backend if necessary */
if (advanceTail)
asyncQueueAdvanceTail();
@@ -2093,8 +2138,8 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
}
/*
- * Advance the shared queue tail variable to the minimum of all the
- * per-backend tail pointers. Truncate pg_notify space if possible.
+ * Advance the shared queue tail variable if possible. If a slow backend is
+ * holding everything up, signal it. Truncate pg_notify space if possible.
*/
static void
asyncQueueAdvanceTail(void)
@@ -2103,18 +2148,54 @@ asyncQueueAdvanceTail(void)
int oldtailpage;
int newtailpage;
int boundary;
+ int slowestbackendid = InvalidBackendId;
+ int slowestbackendpid;
+ /* Advance the tail as far as possible, noting if there is a slow
+ * backend we could kick */
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
min = QUEUE_HEAD;
for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
{
Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
- min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
+ if (QUEUE_POS_LT(QUEUE_BACKEND_POS(i), min))
+ {
+ /* this finds the tail of the queue and remembers who */
+ min = QUEUE_BACKEND_POS(i);
+ slowestbackendid = i;
+ }
}
oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
QUEUE_TAIL = min;
+ /* if the slowest backend isn't us and is far enough behind, get its pid so we can kick it */
+ if (slowestbackendid != InvalidBackendId)
+ {
+ if (QUEUE_SLOW_BACKEND(slowestbackendid) &&
+ QUEUE_BACKEND_PID(slowestbackendid) != MyProcPid)
+ {
+ slowestbackendpid = QUEUE_BACKEND_PID(slowestbackendid);
+ }
+ else
+ {
+ slowestbackendid = InvalidBackendId;
+ }
+ }
LWLockRelease(AsyncQueueLock);
+ /* Wake up the backend furthest behind, if it is considered "slow".
+ * It should in turn call this function to signal the next, see
+ * asyncQueueReadAllNotifications() */
+ if (slowestbackendid != InvalidBackendId) {
+
+ /* Note: assuming things aren't broken, a signal failure here could
+ * only occur if the target backend exited since we released
+ * AsyncQueueLock; which is unlikely but certainly possible. So we
+ * just log a low-level debug message if it happens.
+ */
+ if (SendProcSignal(slowestbackendpid, PROCSIG_NOTIFY_INTERRUPT, slowestbackendid) < 0)
+ elog(DEBUG3, "could not signal backend with PID %d: %m", slowestbackendpid);
+ }
+
/*
* We can truncate something if the global tail advanced across an SLRU
* segment boundary.
--
2.17.1
Martijn van Oosterhout <kleptog@gmail.com> writes:
On Fri, 13 Sep 2019 at 22:04, Tom Lane <tgl@sss.pgh.pa.us> wrote:
But, really ... do we need the backendTryAdvanceTail flag at all?
There are multiple issues here. asyncQueueReadAllNotifications() is
going to be called by each listener simultaneously, so each listener
is going to come to the same conclusion. On the other side, there is
no guarantee we wake up anyone as a result of the NOTIFY, e.g. if
there are no listeners in the current database. To be sure you try to
advance the tail, you have to trigger on the sending side. The global
is there because at the point we are inserting entries we are still in
a user transaction, potentially holding many table locks (the issue we
were running into in the first place). By setting
backendTryAdvanceTail we can move the work to
ProcessCompletedNotifies() which is after the transaction has
committed and the locks released.
None of this seems to respond to my point: it looks to me like it would
work fine if you simply dropped the patch's additions in PreCommit_Notify
and ProcessCompletedNotifies, because there is already enough logic to
decide when to call asyncQueueAdvanceTail. In particular, the result from
Signal[MyDB]Backends tells us whether anyone else was awakened, and
ProcessCompletedNotifies already does asyncQueueAdvanceTail if not.
As long as we did awaken someone, the ball's now in their court to
make sure asyncQueueAdvanceTail happens eventually.
There are corner cases where someone else might get signaled but never
do asyncQueueAdvanceTail -- for example, if they're in process of exiting
--- but I think the whole point of this patch is that we don't care too
much if that occasionally fails to happen. If there's a continuing
stream of NOTIFY activity, asyncQueueAdvanceTail will happen often
enough to ensure that the queue storage doesn't bloat unreasonably.
regards, tom lane
On Sat, 14 Sep 2019 at 17:08, Tom Lane <tgl@sss.pgh.pa.us> wrote:
Martijn van Oosterhout <kleptog@gmail.com> writes:
On Fri, 13 Sep 2019 at 22:04, Tom Lane <tgl@sss.pgh.pa.us> wrote:
But, really ... do we need the backendTryAdvanceTail flag at all?
None of this seems to respond to my point: it looks to me like it would
work fine if you simply dropped the patch's additions in PreCommit_Notify
and ProcessCompletedNotifies, because there is already enough logic to
decide when to call asyncQueueAdvanceTail. In particular, the result from
Signal[MyDB]Backends tells us whether anyone else was awakened, and
ProcessCompletedNotifies already does asyncQueueAdvanceTail if not.
As long as we did awaken someone, the ball's now in their court to
make sure asyncQueueAdvanceTail happens eventually.
Ah, I think I see what you're getting at. As written,
asyncQueueReadAllNotifications() only calls asyncQueueAdvanceTail() if
*it* was a slow backend (advanceTail =
QUEUE_SLOW_BACKEND(MyBackendId)). In a situation where some databases
are regularly using NOTIFY while a few others never do (but are still
listening), the tail would never get advanced.
However, I guess you're thinking of asyncQueueReadAllNotifications()
triggering if the queue as a whole was too long. This could in
principle work but it does mean that at some point all backends
sending NOTIFY are going to start calling asyncQueueAdvanceTail()
every time, until the tail gets advanced, and if there are many idle
listening backends behind this could take a while. The slowest backend
might receive more signals while it is processing and so end up
running asyncQueueAdvanceTail() twice. The fact that signals coalesce
stops the process getting completely out of hand but it does feel a
little uncontrolled.
The whole point of this patch is to ensure that only one backend at a
time is woken up to call asyncQueueAdvanceTail().
But you do point out that the return value of SignalMyDBBackends() is
used wrongly. The fact that no-one got signalled only means there were
no other listeners on this database, which says nothing about global
queue cleanup.
know is if you're the only listener in the whole system and you can
test for that directly (QUEUE_FIRST_BACKEND == MyBackendId &&
QUEUE_NEXT_BACKEND(MyBackendId) == InvalidBackendId). I can adjust
this in the next version if necessary; it's fairly harmless as-is,
since it only triggers when a database is notifying only itself, which
probably isn't that common.
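A minimal sketch of that only-listener test. The function name and the array representation of the listener chain are illustrative; the email's QUEUE_FIRST_BACKEND / QUEUE_NEXT_BACKEND correspond to the patch's QUEUE_FIRST_LISTENER / QUEUE_NEXT_LISTENER macros over shared memory.

```c
#include <assert.h>
#include <stdbool.h>

#define InvalidBackendId 0

/*
 * We are the only listener in the whole cluster iff we are the head of
 * the listener list and have no successor.  first_listener is the list
 * head; next_listener[b] chains registered backends.
 */
static bool
am_only_listener(int my_backend_id, int first_listener,
                 const int *next_listener)
{
    return first_listener == my_backend_id &&
           next_listener[my_backend_id] == InvalidBackendId;
}
```

This test is cheap enough to evaluate while already holding AsyncQueueLock, unlike inferring it from how many backends got signalled.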
I hope I have correctly understood this time.
Have a nice weekend.
--
Martijn van Oosterhout <kleptog@gmail.com> http://svana.org/kleptog/
Martijn van Oosterhout <kleptog@gmail.com> writes:
On Sat, 14 Sep 2019 at 17:08, Tom Lane <tgl@sss.pgh.pa.us> wrote:
None of this seems to respond to my point: it looks to me like it would
work fine if you simply dropped the patch's additions in PreCommit_Notify
and ProcessCompletedNotifies, because there is already enough logic to
decide when to call asyncQueueAdvanceTail.
...
However, I guess you're thinking of asyncQueueReadAllNotifications()
triggering if the queue as a whole was too long. This could in
principle work but it does mean that at some point all backends
sending NOTIFY are going to start calling asyncQueueAdvanceTail()
every time, until the tail gets advanced, and if there are many idle
listening backends behind this could take a while. The slowest backend
might receive more signals while it is processing and so end up
running asyncQueueAdvanceTail() twice. The fact that signals coalesce
stops the process getting completely out of hand but it does feel a
little uncontrolled.
The whole point of this patch is to ensure that only one backend at a
time is being woken up and calling asyncQueueAdvanceTail().
I spent some more time thinking about this, and I'm still not too
satisfied with this patch's approach. It seems to me the key insights
we're trying to make use of are:
1. We don't really need to keep the global tail pointer exactly
up to date. It's bad if it falls way behind, but a few pages back
is fine.
2. When sending notifies, only listening backends connected to our
own database need be awakened immediately. Backends connected to
other DBs will need to advance their queue pointer sometime, but
again it doesn't need to be right away.
3. It's bad for multiple processes to all be trying to do
asyncQueueAdvanceTail concurrently: they'll contend for exclusive
access to the AsyncQueueLock. Therefore, having the listeners
do it is really the wrong thing, and instead we should do it on
the sending side.
However, the patch as presented doesn't go all the way on point 3,
instead having listeners maybe-or-maybe-not do asyncQueueAdvanceTail
in asyncQueueReadAllNotifications. I propose that we should go all
the way and just define tail-advancing as something that happens on
the sending side, and only once every few pages. I also think we
can simplify the handling of other-database listeners by including
them in the set signaled by SignalBackends, but only if they're
several pages behind. So that leads me to the attached patch;
what do you think?
BTW, in my hands it seems like point 2 (skip wakening other-database
listeners) is the only really significant win here, and of course
that only wins when the notify traffic is spread across a fair number
of databases. Which I fear is not the typical use-case. In single-DB
use-cases, point 2 helps not at all. I had a really hard time measuring
any benefit from point 3 --- I eventually saw a noticeable savings
when I tried having one notifier and 100 listen-only backends, but
again that doesn't seem like a typical use-case. I could not replicate
your report of lots of time spent in asyncQueueAdvanceTail's lock
acquisition. I wonder whether you're using a very large max_connections
setting and we already fixed most of the problem with that in bca6e6435.
Still, this patch doesn't seem to make any cases worse, so I don't mind
if it's just improving unusual use-cases.
regards, tom lane
Attachment: Improve-performance-of-async-notifications-v4.patch (text/x-diff)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index f26269b..7791f78 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -75,8 +75,10 @@
* list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
* to every listening backend (we don't know which backend is listening on
* which channel so we must signal them all). We can exclude backends that
- * are already up to date, though. We don't bother with a self-signal
- * either, but just process the queue directly.
+ * are already up to date, though, and we can also exclude backends that
+ * are in other databases (unless they are way behind and should be kicked
+ * to make them advance their pointers). We don't bother with a
+ * self-signal either, but just process the queue directly.
*
* 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
* sets the process's latch, which triggers the event to be processed
@@ -89,13 +91,14 @@
* Inbound-notify processing consists of reading all of the notifications
* that have arrived since scanning last time. We read every notification
* until we reach either a notification from an uncommitted transaction or
- * the head pointer's position. Then we check if we were the laziest
- * backend: if our pointer is set to the same position as the global tail
- * pointer is set, then we move the global tail pointer ahead to where the
- * second-laziest backend is (in general, we take the MIN of the current
- * head position and all active backends' new tail pointers). Whenever we
- * move the global tail pointer we also truncate now-unused pages (i.e.,
- * delete files in pg_notify/ that are no longer used).
+ * the head pointer's position.
+ *
+ * 6. To avoid SLRU wraparound and limit disk space consumption, the tail
+ * pointer needs to be advanced so that old pages can be truncated.
+ * This is relatively expensive (notably, it requires an exclusive lock),
+ * so we don't want to do it often. We make sending backends do this work
+ * if they advanced the queue head into a new page, but only once every
+ * QUEUE_CLEANUP_DELAY pages.
*
* An application that listens on the same channel it notifies will get
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
@@ -212,6 +215,19 @@ typedef struct QueuePosition
(x).offset > (y).offset ? (x) : (y))
/*
+ * Parameter determining how often we try to advance the tail pointer:
+ * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
+ * also the distance by which a backend in another database needs to be
+ * behind before we'll decide we need to wake it up to advance its pointer.
+ *
+ * Resist the temptation to make this really large. While that would save
+ * work in some places, it would add cost in others. In particular, this
+ * should likely be less than NUM_ASYNC_BUFFERS, to ensure that backends
+ * catch up before the pages they'll need to read fall out of SLRU cache.
+ */
+#define QUEUE_CLEANUP_DELAY 4
+
+/*
* Struct describing a listening backend's status
*/
typedef struct QueueBackendStatus
@@ -252,8 +268,8 @@ typedef struct QueueBackendStatus
typedef struct AsyncQueueControl
{
QueuePosition head; /* head points to the next free location */
- QueuePosition tail; /* the global tail is equivalent to the pos of
- * the "slowest" backend */
+ QueuePosition tail; /* tail must be <= the queue position of every
+ * listening backend */
BackendId firstListener; /* id of first listener, or InvalidBackendId */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
@@ -402,10 +418,14 @@ static bool amRegisteredListener = false;
/* has this backend sent notifications in the current transaction? */
static bool backendHasSentNotifications = false;
+/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
+static bool backendTryAdvanceTail = false;
+
/* GUC parameter */
bool Trace_notify = false;
/* local function prototypes */
+static int asyncQueuePageDiff(int p, int q);
static bool asyncQueuePagePrecedes(int p, int q);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
@@ -421,7 +441,7 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
static double asyncQueueUsage(void);
static void asyncQueueFillWarning(void);
-static bool SignalBackends(void);
+static void SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
@@ -436,10 +456,11 @@ static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);
/*
- * We will work on the page range of 0..QUEUE_MAX_PAGE.
+ * Compute the difference between two queue page numbers (i.e., p - q),
+ * accounting for wraparound.
*/
-static bool
-asyncQueuePagePrecedes(int p, int q)
+static int
+asyncQueuePageDiff(int p, int q)
{
int diff;
@@ -455,7 +476,14 @@ asyncQueuePagePrecedes(int p, int q)
diff -= QUEUE_MAX_PAGE + 1;
else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
diff += QUEUE_MAX_PAGE + 1;
- return diff < 0;
+ return diff;
+}
+
+/* Is p < q, accounting for wraparound? */
+static bool
+asyncQueuePagePrecedes(int p, int q)
+{
+ return asyncQueuePageDiff(p, q) < 0;
}
/*
@@ -1051,8 +1079,6 @@ Exec_ListenPreCommit(void)
* notification to the frontend. Also, although our transaction might
* have executed NOTIFY, those message(s) aren't queued yet so we can't
* see them in the queue.
- *
- * This will also advance the global tail pointer if possible.
*/
if (!QUEUE_POS_EQUAL(max, head))
asyncQueueReadAllNotifications();
@@ -1156,7 +1182,6 @@ void
ProcessCompletedNotifies(void)
{
MemoryContext caller_context;
- bool signalled;
/* Nothing to do if we didn't send any notifications */
if (!backendHasSentNotifications)
@@ -1185,23 +1210,20 @@ ProcessCompletedNotifies(void)
StartTransactionCommand();
/* Send signals to other backends */
- signalled = SignalBackends();
+ SignalBackends();
if (listenChannels != NIL)
{
/* Read the queue ourselves, and send relevant stuff to the frontend */
asyncQueueReadAllNotifications();
}
- else if (!signalled)
+
+ /*
+ * If it's time to try to advance the global tail pointer, do that.
+ */
+ if (backendTryAdvanceTail)
{
- /*
- * If we found no other listening backends, and we aren't listening
- * ourselves, then we must execute asyncQueueAdvanceTail to flush the
- * queue, because ain't nobody else gonna do it. This prevents queue
- * overflow when we're sending useless notifies to nobody. (A new
- * listener could have joined since we looked, but if so this is
- * harmless.)
- */
+ backendTryAdvanceTail = false;
asyncQueueAdvanceTail();
}
@@ -1242,8 +1264,6 @@ IsListeningOn(const char *channel)
static void
asyncQueueUnregister(void)
{
- bool advanceTail;
-
Assert(listenChannels == NIL); /* else caller error */
if (!amRegisteredListener) /* nothing to do */
@@ -1253,10 +1273,7 @@ asyncQueueUnregister(void)
* Need exclusive lock here to manipulate list links.
*/
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
- /* check if entry is valid and oldest ... */
- advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
- QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
- /* ... then mark it invalid */
+ /* Mark our entry as invalid */
QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
/* and remove it from the list */
@@ -1278,10 +1295,6 @@ asyncQueueUnregister(void)
/* mark ourselves as no longer listed in the global array */
amRegisteredListener = false;
-
- /* If we were the laziest backend, try to advance the tail pointer */
- if (advanceTail)
- asyncQueueAdvanceTail();
}
/*
@@ -1467,6 +1480,15 @@ asyncQueueAddEntries(ListCell *nextNotify)
* page without overrunning the queue.
*/
slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head));
+
+ /*
+ * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
+ * set flag to remember that we should try to advance the tail
+ * pointer (we don't want to actually do that right here).
+ */
+ if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
+ backendTryAdvanceTail = true;
+
/* And exit the loop */
break;
}
@@ -1570,31 +1592,30 @@ asyncQueueFillWarning(void)
}
/*
- * Send signals to all listening backends (except our own).
+ * Send signals to listening backends.
*
- * Returns true if we sent at least one signal.
+ * We never signal our own process; that should be handled by our caller.
*
- * Since we need EXCLUSIVE lock anyway we also check the position of the other
- * backends and in case one is already up-to-date we don't signal it.
- * This can happen if concurrent notifying transactions have sent a signal and
- * the signaled backend has read the other notifications and ours in the same
- * step.
+ * Normally we signal only backends in our own database, since only those
+ * backends could be interested in notifies we send. However, if there's
+ * notify traffic in our database but no traffic in another database that
+ * does have listener(s), those listeners will fall further and further
+ * behind. Waken them anyway if they're far enough behind, so that they'll
+ * advance their queue position pointers, allowing the global tail to advance.
*
* Since we know the BackendId and the Pid the signalling is quite cheap.
*/
-static bool
+static void
SignalBackends(void)
{
- bool signalled = false;
int32 *pids;
BackendId *ids;
int count;
- int32 pid;
/*
- * Identify all backends that are listening and not already up-to-date. We
- * don't want to send signals while holding the AsyncQueueLock, so we just
- * build a list of target PIDs.
+ * Identify backends that we need to signal. We don't want to send
+ * signals while holding the AsyncQueueLock, so this loop just builds a
+ * list of target PIDs.
*
* XXX in principle these pallocs could fail, which would be bad. Maybe
* preallocate the arrays? But in practice this is only run in trivial
@@ -1607,26 +1628,43 @@ SignalBackends(void)
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
{
- pid = QUEUE_BACKEND_PID(i);
+ int32 pid = QUEUE_BACKEND_PID(i);
+ QueuePosition pos;
+
Assert(pid != InvalidPid);
- if (pid != MyProcPid)
+ if (pid == MyProcPid)
+ continue; /* never signal self */
+ pos = QUEUE_BACKEND_POS(i);
+ if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
{
- QueuePosition pos = QUEUE_BACKEND_POS(i);
-
- if (!QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
- {
- pids[count] = pid;
- ids[count] = i;
- count++;
- }
+ /*
+ * Always signal listeners in our own database, unless they're
+ * already caught up (unlikely, but possible).
+ */
+ if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+ continue;
+ }
+ else
+ {
+ /*
+ * Listeners in other databases should be signaled only if they
+ * are far behind.
+ */
+ if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
+ QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
+ continue;
}
+ /* OK, need to signal this one */
+ pids[count] = pid;
+ ids[count] = i;
+ count++;
}
LWLockRelease(AsyncQueueLock);
/* Now send signals */
for (int i = 0; i < count; i++)
{
- pid = pids[i];
+ int32 pid = pids[i];
/*
* Note: assuming things aren't broken, a signal failure here could
@@ -1636,14 +1674,10 @@ SignalBackends(void)
*/
if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
- else
- signalled = true;
}
pfree(pids);
pfree(ids);
-
- return signalled;
}
/*
@@ -1844,7 +1878,6 @@ asyncQueueReadAllNotifications(void)
QueuePosition oldpos;
QueuePosition head;
Snapshot snapshot;
- bool advanceTail;
/* page_buffer must be adequately aligned, so use a union */
union
@@ -1966,13 +1999,8 @@ asyncQueueReadAllNotifications(void)
/* Update shared state */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = pos;
- advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
LWLockRelease(AsyncQueueLock);
- /* If we were the laziest backend, try to advance the tail pointer */
- if (advanceTail)
- asyncQueueAdvanceTail();
-
PG_RE_THROW();
}
PG_END_TRY();
@@ -1980,13 +2008,8 @@ asyncQueueReadAllNotifications(void)
/* Update shared state */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = pos;
- advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
LWLockRelease(AsyncQueueLock);
- /* If we were the laziest backend, try to advance the tail pointer */
- if (advanceTail)
- asyncQueueAdvanceTail();
-
/* Done with snapshot */
UnregisterSnapshot(snapshot);
}
Hoi Tom,
On Mon, 16 Sep 2019 at 00:14, Tom Lane <tgl@sss.pgh.pa.us> wrote:
I spent some more time thinking about this, and I'm still not too
satisfied with this patch's approach. It seems to me the key insights
we're trying to make use of are:
1. We don't really need to keep the global tail pointer exactly
up to date. It's bad if it falls way behind, but a few pages back
is fine.
Agreed.
2. When sending notifies, only listening backends connected to our
own database need be awakened immediately. Backends connected to
other DBs will need to advance their queue pointer sometime, but
again it doesn't need to be right away.
Agreed.
3. It's bad for multiple processes to all be trying to do
asyncQueueAdvanceTail concurrently: they'll contend for exclusive
access to the AsyncQueueLock. Therefore, having the listeners
do it is really the wrong thing, and instead we should do it on
the sending side.
Agreed, but I'd add that in databases that are largely idle there may
never be a sender, and so their listeners need to be advanced some
other way.
However, the patch as presented doesn't go all the way on point 3,
instead having listeners maybe-or-maybe-not do asyncQueueAdvanceTail
in asyncQueueReadAllNotifications. I propose that we should go all
the way and just define tail-advancing as something that happens on
the sending side, and only once every few pages. I also think we
can simplify the handling of other-database listeners by including
them in the set signaled by SignalBackends, but only if they're
several pages behind. So that leads me to the attached patch;
what do you think?
I think I like the idea of having SignalBackend do the waking up a
slow backend but I'm not enthused by the "lets wake up (at once)
everyone that is behind". That's one of the issues I was explicitly
trying to solve. If there are any significant number of "slow"
backends then we get the "thundering herd" again. If the number of
slow backends exceeds the number of cores then commits across the
system could be held up quite a while (which is what caused me to make
this patch, multiple seconds was not unusual).
The maybe/maybe-not in asyncQueueReadAllNotifications() is: "if I was
behind, then I probably got woken up, hence I need to wake up someone
else". This ensures the cleanup proceeds in an orderly fashion,
leaving gaps where the lock isn't held, allowing COMMITs to proceed.
BTW, in my hands it seems like point 2 (skip wakening other-database
listeners) is the only really significant win here, and of course
that only wins when the notify traffic is spread across a fair number
of databases. Which I fear is not the typical use-case. In single-DB
use-cases, point 2 helps not at all. I had a really hard time measuring
any benefit from point 3 --- I eventually saw a noticeable savings
when I tried having one notifier and 100 listen-only backends, but
again that doesn't seem like a typical use-case. I could not replicate
your report of lots of time spent in asyncQueueAdvanceTail's lock
acquisition. I wonder whether you're using a very large max_connections
setting and we already fixed most of the problem with that in bca6e6435.
Still, this patch doesn't seem to make any cases worse, so I don't mind
if it's just improving unusual use-cases.
I'm not sure if it's an unusual use-case, but it is my use-case :).
Specifically, there are 100+ instances of the same application running
on the same cluster with wildly different usage patterns. Some will be
idle because no-one is logged in, some will be quite busy. Although
there are only 2 listeners per database, that's still a lot of
listeners that can be behind. Though I agree that bca6e6435 will have
mitigated quite a lot (yes, max_connections is quite high). Another
mitigation would be to spread across more smaller database clusters,
which we need to do anyway.
That said, your approach is conceptually simpler, which is also worth
something, and it gets essentially all the same benefits for more
normal use cases. If QUEUE_CLEANUP_DELAY were raised a bit, we could
mitigate the rest on the client side by having idle databases send
dummy notifies every now and then to trigger cleanup for their
database. The flip-side is that slow backends will then have further
to catch up, thus holding the lock longer. It's not worth making it
configurable, so we have to guess; 16 is perhaps a good compromise.
Have a nice day,
--
Martijn van Oosterhout <kleptog@gmail.com> http://svana.org/kleptog/
Martijn van Oosterhout <kleptog@gmail.com> writes:
On Mon, 16 Sep 2019 at 00:14, Tom Lane <tgl@sss.pgh.pa.us> wrote:
... I also think we
can simplify the handling of other-database listeners by including
them in the set signaled by SignalBackends, but only if they're
several pages behind. So that leads me to the attached patch;
what do you think?
I think I like the idea of having SignalBackend do the waking up a
slow backend but I'm not enthused by the "lets wake up (at once)
everyone that is behind". That's one of the issues I was explicitly
trying to solve. If there are any significant number of "slow"
backends then we get the "thundering herd" again.
But do we care? With asyncQueueAdvanceTail gone from the listeners,
there's no longer an exclusive lock for them to contend on. And,
again, I failed to see any significant contention even in HEAD as it
stands; so I'm unconvinced that you're solving a live problem.
regards, tom lane
Hoi Tom,
On Mon, 16 Sep 2019 at 15:33, Tom Lane <tgl@sss.pgh.pa.us> wrote:
Martijn van Oosterhout <kleptog@gmail.com> writes:
I think I like the idea of having SignalBackend do the waking up a
slow backend but I'm not enthused by the "lets wake up (at once)
everyone that is behind". That's one of the issues I was explicitly
trying to solve. If there are any significant number of "slow"
backends then we get the "thundering herd" again.
But do we care? With asyncQueueAdvanceTail gone from the listeners,
there's no longer an exclusive lock for them to contend on. And,
again, I failed to see any significant contention even in HEAD as it
stands; so I'm unconvinced that you're solving a live problem.
You're right, they only acquire a shared lock, which is much less of a
problem. And I forgot that we're still reducing the load from a few
hundred signals and exclusive locks per NOTIFY to perhaps a dozen
shared locks every thousand messages. You'd be hard pressed to
demonstrate there's a real problem here.
So I think your patch is fine as is.
Looking at the release cycle it looks like the earliest either of
these patches will appear in a release is PG13, right?
Thanks again.
--
Martijn van Oosterhout <kleptog@gmail.com> http://svana.org/kleptog/
Martijn van Oosterhout <kleptog@gmail.com> writes:
On Mon, 16 Sep 2019 at 15:33, Tom Lane <tgl@sss.pgh.pa.us> wrote:
But do we care? With asyncQueueAdvanceTail gone from the listeners,
there's no longer an exclusive lock for them to contend on. And,
again, I failed to see any significant contention even in HEAD as it
stands; so I'm unconvinced that you're solving a live problem.
You're right, they only acquire a shared lock which is much less of a
problem. And I forgot that we're still reducing the load from a few
hundred signals and exclusive locks per NOTIFY to perhaps a dozen
shared locks every thousand messages. You'd be hard pressed to
demonstrate there's a real problem here.
So I think your patch is fine as is.
OK, pushed.
Looking at the release cycle it looks like the earliest either of
these patches will appear in a release is PG13, right?
Right.
regards, tom lane