LISTEN denial of service with aborted transaction
This morning I began receiving reports that our production database was
"down".
I checked the log and was surprised to see extremely long durations for the
LISTEN that our database library issues after each new connection.
This coincided with many (approx. 600) new connections being opened in a short
time window, because render nodes are automatically turned on when the first
job of the morning is submitted (idle nodes are turned off to save power). My
initial hunch was that there was some code in postgres that resulted in
exponential execution time if enough LISTENs on new connections happened at
the same time. As I was trying to gather more information the listen times
began to decrease, and after about 20 minutes things were back to normal.
A few hours later the symptoms returned, but this time the LISTEN was taking
upwards of 15 minutes. I did some more reading and checked the pg_notify
directory and found that there were 49 files. Having never checked that
directory before I wasn't sure if that was normal. A short time later I
noticed there was a process sitting idle with an aborted transaction. After
killing the process things quickly returned to normal.
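For reference, such sessions can be spotted in pg_stat_activity with something
like:

SELECT pid, datname, state, state_change
FROM pg_stat_activity
WHERE state = 'idle in transaction (aborted)';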
After doing a search for "idle in transaction (aborted)" I came upon
http://stackoverflow.com/questions/15036438/postgres-connection-leaks-idle-in-transaction-aborted
While that is a potential way of dealing with the problem, it seems that the
PostgreSQL developers have decided to let connections stay in the "idle in
transaction (aborted)" state for a reason, most likely under the assumption
that it's relatively safe and only eats up the resources of a single
connection. However, it's easy to demonstrate that doing:
listen "abc";
begin;
select bad_column from non_existant_table;
...will eventually cause a denial-of-service situation if the DBA hasn't set up
guards against connections sitting idle in an aborted transaction.
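One stopgap guard, along the lines of the Stack Overflow answer above, is to
periodically terminate sessions that have been stuck in that state for a while
(the threshold here is arbitrary):

SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE state = 'idle in transaction (aborted)'
  AND state_change < now() - interval '10 minutes';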
Looking at the code in src/backend/commands/async.c I think there are a couple
ways to eliminate this problem.
1. When a connection issues its first LISTEN command, in Exec_ListenPreCommit
QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
this causes the connection to iterate through every notification queued in the
SLRU, even though at that point I believe the connection can safely ignore any
notifications from transactions that are already committed, and if I
understand correctly notifications aren't put into the SLRU until precommit,
so wouldn't it be safe to do:
QUEUE_BACKEND_POS(MyBackendId) = QUEUE_HEAD;
inside Async_Listen? If that's not safe, then could a new member be added to
AsyncQueueControl that points to the first uncommitted QueuePosition (it
wouldn't have to be kept completely up to date)?
This would solve the problem of slow initial LISTEN in the face of a growing
pg_notify queue.
2. Would it be possible, when a backend is signaled, to check if it is idle
inside an aborted transaction, and if so process the notifications and put any
that match what the backend is listening on into a local list? This would
allow the SLRU to be cleaned up. In practice I think most notifications would
either be disregarded or combined with a duplicate, so the list would most
likely end up staying very small. If the backend-local list does grow too
large then the connection could be killed or handled in some other appropriate
way.
I am happy to attempt coming up with a patch if the ideas are deemed
worthwhile.
Thanks,
Matt Newell
Matt Newell <newellm@blur.com> writes:
> 1. When a connection issues its first LISTEN command, in Exec_ListenPreCommit
> QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
> this causes the connection to iterate through every notification queued in the
> SLRU, even though at that point I believe the connection can safely ignore any
> notifications from transactions that are already committed, and if I
> understand correctly notifications aren't put into the SLRU until precommit,
> so wouldn't it be safe to do:
> QUEUE_BACKEND_POS(MyBackendId) = QUEUE_HEAD;
> inside Async_Listen?
Per the comment in Exec_ListenPreCommit, the goal here is to see entries
that have been made and not yet committed. If you don't do it like this,
then a new listener has the possibility of receiving notifications from
one transaction, while not seeing notifications from another one that
committed later, even though longer-established listeners saw outputs from
both transactions. We felt that that sort of application-visible
inconsistency would be a bad thing.
> If that's not safe, then could a new member be added to
> AsyncQueueControl that points to the first uncommitted QueuePosition (it
> wouldn't have to be kept completely up to date)?
Err ... isn't that QUEUE_TAIL already? I've not studied this code in
detail recently, though.
> 2. Would it be possible, when a backend is signaled, to check if it is idle
> inside an aborted transaction, and if so process the notifications and put
> any that match what the backend is listening on into a local list?
Possibly. sinval catchup notification works like that, so you could maybe
invent a similar mechanism for notify. Beware that we've had to fix
performance issues with sinval catchup; you don't want to release a whole
bunch of processes to do work at the same time.
regards, tom lane
On Monday, September 28, 2015 07:22:26 PM Tom Lane wrote:
> Matt Newell <newellm@blur.com> writes:
> > 1. When a connection issues its first LISTEN command, in
> > Exec_ListenPreCommit QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
> > this causes the connection to iterate through every notification queued in
> > the SLRU, even though at that point I believe the connection can safely
> > ignore any notifications from transactions that are already committed,
> > and if I understand correctly notifications aren't put into the SLRU
> > until precommit, so wouldn't it be safe to do:
> > QUEUE_BACKEND_POS(MyBackendId) = QUEUE_HEAD;
> > inside Async_Listen?
> Per the comment in Exec_ListenPreCommit, the goal here is to see entries
> that have been made and not yet committed. If you don't do it like this,
> then a new listener has the possibility of receiving notifications from
> one transaction, while not seeing notifications from another one that
> committed later, even though longer-established listeners saw outputs from
> both transactions. We felt that that sort of application-visible
> inconsistency would be a bad thing.
Right, QUEUE_HEAD can't be used, however...
> > If that's not safe, then could a new member be added to
> > AsyncQueueControl that points to the first uncommitted QueuePosition
> > (it wouldn't have to be kept completely up to date)?
> Err ... isn't that QUEUE_TAIL already? I've not studied this code in
> detail recently, though.
QUEUE_TAIL is the oldest notification. My idea is to keep a somewhat up-to-
date pointer to the oldest uncommitted notification, which can be used as a
starting point when a connection issues its first LISTEN, just as the
current code will advance a backend's pointer past any committed notifications
when it calls asyncQueueReadAllNotifications in Exec_ListenPreCommit with no
registered listeners.
I came up with a proof of concept and it appears to work as expected. With
500,000 notifications queued, a LISTEN is consistently under 0.5ms, while my
9.4.4 install is taking 70ms, and the speedup should be much greater on a
busy server where there is more lock contention.
Attached is the patch and the ugly test script.
> > 2. Would it be possible, when a backend is signaled, to check if it is idle
> > inside an aborted transaction, and if so process the notifications and put
> > any that match what the backend is listening on into a local list?
> Possibly. sinval catchup notification works like that, so you could maybe
> invent a similar mechanism for notify. Beware that we've had to fix
> performance issues with sinval catchup; you don't want to release a whole
> bunch of processes to do work at the same time.
I'll have to think about this more, but I don't envision it causing such a
scenario.
The extra processing that will happen at signaling time would have been done
anyway when the aborted transaction is finally rolled back; the only extra
work is copying the relevant notifications to local memory.
Regardless, it might not be worthwhile since my patch for #1 seems to address
the symptom that bit me.
Matt Newell
Attachment: async.c.diff (text/x-patch)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 3b71174..e89ece5 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -196,12 +196,24 @@ typedef struct QueuePosition
#define QUEUE_POS_EQUAL(x,y) \
((x).page == (y).page && (x).offset == (y).offset)
+/* Returns 1 if x is larger than y, 0 if equal, else -1 */
+#define QUEUE_POS_COMPARE(x,y) \
+ (((x).page > (y).page) ? 1 : \
+ ((x).page < (y).page) ? -1 : \
+ ((x).offset > (y).offset ? 1 : ((x).offset == (y).offset ? 0 : -1)))
+
/* choose logically smaller QueuePosition */
#define QUEUE_POS_MIN(x,y) \
(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))
+/* choose logically larger QueuePosition */
+#define QUEUE_POS_MAX(x,y) \
+ (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
+ (x).page != (y).page ? (x) : \
+ (x).offset < (y).offset ? (y) : (x))
+
/*
* Struct describing a listening backend's status
*/
@@ -217,12 +229,13 @@ typedef struct QueueBackendStatus
* The AsyncQueueControl structure is protected by the AsyncQueueLock.
*
* When holding the lock in SHARED mode, backends may only inspect their own
- * entries as well as the head and tail pointers. Consequently we can allow a
- * backend to update its own record while holding only SHARED lock (since no
- * other backend will inspect it).
+ * entries as well as the head, tail, and firstUncommitted pointers.
+ * Consequently we can allow a backend to update its own record while holding
+ * only SHARED lock (since no other backend will inspect it).
*
* When holding the lock in EXCLUSIVE mode, backends can inspect the entries
- * of other backends and also change the head and tail pointers.
+ * of other backends and also change the head, tail and firstUncommitted
+ * pointers.
*
* In order to avoid deadlocks, whenever we need both locks, we always first
* get AsyncQueueLock and then AsyncCtlLock.
@@ -230,12 +243,23 @@ typedef struct QueueBackendStatus
* Each backend uses the backend[] array entry with index equal to its
* BackendId (which can range from 1 to MaxBackends). We rely on this to make
* SendProcSignal fast.
+ *
+ * In case a long running transaction is causing the size of the queue to grow
+ * we keep track of the first uncommitted transaction to prevent a
+ * connection from having to process a lot of old notifications when it
+ * issues its first listen. To facilitate this we allow one process
+ * at a time to advance firstUncommitted by using advancingFirstUncommitted
+ * as a mutex.
*/
typedef struct AsyncQueueControl
{
QueuePosition head; /* head points to the next free location */
QueuePosition tail; /* the global tail is equivalent to the tail
* of the "slowest" backend */
+
+ QueuePosition firstUncommitted;
+ int32 advancingFirstUncommitted; /* Backend ID */
+
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
@@ -245,6 +269,8 @@ static AsyncQueueControl *asyncQueueControl;
#define QUEUE_HEAD (asyncQueueControl->head)
#define QUEUE_TAIL (asyncQueueControl->tail)
+#define QUEUE_FIRST_UNCOMMITTED (asyncQueueControl->firstUncommitted)
+#define QUEUE_ADVANCING_FIRST_UNCOMMITTED (asyncQueueControl->advancingFirstUncommitted)
#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
@@ -376,6 +402,7 @@ static void asyncQueueFillWarning(void);
static bool SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
+ volatile QueuePosition *firstUncommitted,
QueuePosition stop,
char *page_buffer);
static void asyncQueueAdvanceTail(void);
@@ -455,6 +482,9 @@ AsyncShmemInit(void)
SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
+ SET_QUEUE_POS(QUEUE_FIRST_UNCOMMITTED, 0, 0);
+ QUEUE_ADVANCING_FIRST_UNCOMMITTED = 0;
+
asyncQueueControl->lastQueueFillWarn = 0;
/* zero'th entry won't be used, but let's initialize it anyway */
for (i = 0; i <= MaxBackends; i++)
@@ -935,10 +965,12 @@ Exec_ListenPreCommit(void)
* doesn't hurt.
*/
LWLockAcquire(AsyncQueueLock, LW_SHARED);
- QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
+ QUEUE_BACKEND_POS(MyBackendId) = QUEUE_FIRST_UNCOMMITTED;
QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
LWLockRelease(AsyncQueueLock);
+ elog(DEBUG1, "Listener registered, queue position is: %i:%i", QUEUE_POS_PAGE(QUEUE_FIRST_UNCOMMITTED),QUEUE_POS_OFFSET(QUEUE_FIRST_UNCOMMITTED));
+
/* Now we are listed in the global array, so remember we're listening */
amRegisteredListener = true;
@@ -1703,7 +1735,9 @@ asyncQueueReadAllNotifications(void)
volatile QueuePosition pos;
QueuePosition oldpos;
QueuePosition head;
+ QueuePosition firstUncommitted;
bool advanceTail;
+ bool advanceFirstUncommitted;
/* page_buffer must be adequately aligned, so use a union */
union
@@ -1718,6 +1752,18 @@ asyncQueueReadAllNotifications(void)
Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
head = QUEUE_HEAD;
+
+ if (!QUEUE_POS_EQUAL(pos,head) && /* If we aren't bailing out early */
+ (!QUEUE_ADVANCING_FIRST_UNCOMMITTED ||
+ QUEUE_BACKEND_PID(QUEUE_ADVANCING_FIRST_UNCOMMITTED) == InvalidPid))
+ {
+ advanceFirstUncommitted = true;
+ QUEUE_ADVANCING_FIRST_UNCOMMITTED = MyBackendId;
+ firstUncommitted = QUEUE_FIRST_UNCOMMITTED;
+ elog(DEBUG1, "Attempting to advance first uncommitted from: %i:%i", QUEUE_POS_PAGE(QUEUE_FIRST_UNCOMMITTED),QUEUE_POS_OFFSET(QUEUE_FIRST_UNCOMMITTED));
+ } else
+ advanceFirstUncommitted = false;
+
LWLockRelease(AsyncQueueLock);
if (QUEUE_POS_EQUAL(pos, head))
@@ -1812,8 +1858,9 @@ asyncQueueReadAllNotifications(void)
* rewrite pages under us. Especially we don't want to hold a lock
* while sending the notifications to the frontend.
*/
- reachedStop = asyncQueueProcessPageEntries(&pos, head,
- page_buffer.buf);
+ reachedStop = asyncQueueProcessPageEntries(&pos,
+ advanceFirstUncommitted ? &firstUncommitted : 0,
+ head, page_buffer.buf);
} while (!reachedStop);
}
PG_CATCH();
@@ -1822,6 +1869,11 @@ asyncQueueReadAllNotifications(void)
LWLockAcquire(AsyncQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = pos;
advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
+ if (advanceFirstUncommitted) {
+ QUEUE_FIRST_UNCOMMITTED = QUEUE_POS_MAX(QUEUE_FIRST_UNCOMMITTED, firstUncommitted);
+ QUEUE_ADVANCING_FIRST_UNCOMMITTED = 0;
+ elog(DEBUG1, "Advanced first uncommitted to: %i:%i", QUEUE_POS_PAGE(QUEUE_FIRST_UNCOMMITTED),QUEUE_POS_OFFSET(QUEUE_FIRST_UNCOMMITTED));
+ }
LWLockRelease(AsyncQueueLock);
/* If we were the laziest backend, try to advance the tail pointer */
@@ -1836,6 +1888,11 @@ asyncQueueReadAllNotifications(void)
LWLockAcquire(AsyncQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = pos;
advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
+ if (advanceFirstUncommitted) {
+ QUEUE_FIRST_UNCOMMITTED = QUEUE_POS_MAX(QUEUE_FIRST_UNCOMMITTED, firstUncommitted);
+ QUEUE_ADVANCING_FIRST_UNCOMMITTED = 0;
+ elog(DEBUG1, "Advanced first uncommitted to: %i:%i", QUEUE_POS_PAGE(QUEUE_FIRST_UNCOMMITTED),QUEUE_POS_OFFSET(QUEUE_FIRST_UNCOMMITTED));
+ }
LWLockRelease(AsyncQueueLock);
/* If we were the laziest backend, try to advance the tail pointer */
@@ -1861,6 +1918,7 @@ asyncQueueReadAllNotifications(void)
*/
static bool
asyncQueueProcessPageEntries(volatile QueuePosition *current,
+ volatile QueuePosition *firstUncommitted,
QueuePosition stop,
char *page_buffer)
{
@@ -1928,6 +1986,29 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
*/
}
}
+ else if(firstUncommitted && QUEUE_POS_COMPARE(*firstUncommitted,thisentry) <= 0)
+ {
+ /*
+ * If we are trying to advance firstUncommitted we need
+ * to check for live transactions at every queue position.
+ */
+ if(qe->dboid != InvalidOid && TransactionIdIsInProgress(qe->xid))
+ {
+ /*
+ * We hit an uncommitted transaction so there is no possibility
+ * to further advance firstUncommitted
+ */
+ firstUncommitted = 0;
+ }
+ }
+
+ /*
+ * If we got to here with a valid firstUncommitted pointer then
+ * we know that all transactions up to thisentry are committed.
+ * If *firstUncommitted matches thisentry then we can advance.
+ */
+ if (firstUncommitted && QUEUE_POS_EQUAL(*firstUncommitted,thisentry))
+ *firstUncommitted = *current;
/* Loop back if we're not at end of page */
} while (!reachedEndOfPage);
@@ -1960,6 +2041,7 @@ asyncQueueAdvanceTail(void)
}
oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
QUEUE_TAIL = min;
+ QUEUE_FIRST_UNCOMMITTED = QUEUE_POS_MAX(min, QUEUE_FIRST_UNCOMMITTED);
LWLockRelease(AsyncQueueLock);
/*
Matt Newell <newellm@blur.com> writes:
> On Monday, September 28, 2015 07:22:26 PM Tom Lane wrote:
> > Possibly. sinval catchup notification works like that, so you could maybe
> > invent a similar mechanism for notify. Beware that we've had to fix
> > performance issues with sinval catchup; you don't want to release a whole
> > bunch of processes to do work at the same time.
> I'll have to think about this more, but I don't envision it causing such a
> scenario.
> The extra processing that will happen at signaling time would have been done
> anyway when the aborted transaction is finally rolled back; the only extra
> work is copying the relevant notifications to local memory.
Hm. An idle-in-transaction listening backend will eventually cause a more
serious form of denial of service: once the pg_notify SLRU area fills up
to its maximum of 8GB, no more new messages can be inserted, and every
transaction that tries to do a NOTIFY will abort. However, that's a
documented hazard and we've not really had complaints about it. In any
case, trying to fix that by having such a backend absorb messages into
local memory doesn't seem like a great idea: if it sits for long enough,
its local memory usage will bloat. Eventually you'd have a failure
anyway.
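As a side note, newer releases expose pg_notification_queue_usage(), which
returns the fraction of that maximum queue size currently in use, so the
queue's approach toward the 8GB limit can at least be monitored (it isn't
available on the 9.4 install discussed earlier in this thread):

SELECT pg_notification_queue_usage();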
> Regardless, it might not be worthwhile since my patch for #1 seems to address
> the symptom that bit me.
I looked at this patch for a bit and found it kind of ugly, particularly
the business about "we allow one process at a time to advance
firstUncommitted by using advancingFirstUncommitted as a mutex".
That doesn't sound terribly safe or performant.
After further thought, I wonder whether instead of having an incoming
listener initialize its "pos" to QUEUE_TAIL, we couldn't have it
initialize to the maximum "pos" among existing listeners (with a floor of
QUEUE_TAIL of course). If any other listener has advanced over a given
message X, then X was committed (or aborted) earlier and the new listener
is not required to return it; so it should be all right to take that
listener's "pos" as a starting point.
The minimum-code-change way to do that would be to compute the max pos
the hard way while Exec_ListenPreCommit holds the AsyncQueueLock, which
it would now have to do in exclusive mode. That's a little bit annoying
but it's surely not much worse than what SignalBackends has to do after
every notification.
Alternatively, we could try to maintain a shared pointer that is always
equal to the frontmost backend's "pos". But I think that would require
more locking during queue reading than exists now; so it's far from clear
it would be a win, especially in systems where listening sessions last for
a reasonable amount of time (so that Exec_ListenPreCommit is infrequent).
regards, tom lane
On Tuesday, September 29, 2015 09:58:35 PM Tom Lane wrote:
> Matt Newell <newellm@blur.com> writes:
> > On Monday, September 28, 2015 07:22:26 PM Tom Lane wrote:
> > > Possibly. sinval catchup notification works like that, so you could maybe
> > > invent a similar mechanism for notify. Beware that we've had to fix
> > > performance issues with sinval catchup; you don't want to release a whole
> > > bunch of processes to do work at the same time.
> > I'll have to think about this more, but I don't envision it causing such a
> > scenario.
> > The extra processing that will happen at signaling time would have been
> > done anyway when the aborted transaction is finally rolled back; the only
> > extra work is copying the relevant notifications to local memory.
> Hm. An idle-in-transaction listening backend will eventually cause a more
> serious form of denial of service: once the pg_notify SLRU area fills up
> to its maximum of 8GB, no more new messages can be inserted, and every
> transaction that tries to do a NOTIFY will abort. However, that's a
> documented hazard and we've not really had complaints about it. In any
> case, trying to fix that by having such a backend absorb messages into
> local memory doesn't seem like a great idea: if it sits for long enough,
> its local memory usage will bloat. Eventually you'd have a failure
> anyway.
> > Regardless, it might not be worthwhile since my patch for #1 seems to
> > address the symptom that bit me.
> I looked at this patch for a bit and found it kind of ugly, particularly
> the business about "we allow one process at a time to advance
> firstUncommitted by using advancingFirstUncommitted as a mutex".
> That doesn't sound terribly safe or performant.
It can be implemented without that, but then you'll have multiple backends
trying to do the same work. This might not be an issue at all, but I couldn't
tell at first glance the potential cost of extra calls to
TransactionIdIsInProgress. Since there are no extra locks taken, I figured
ensuring the work is only done once is good for performance.
If the cluster only has one database generating notifies then there is
practically no extra work with the patch.
As far as safety is concerned I think the worst case is that behavior returns
to what it is now, where firstUncommitted ends up tracking QUEUE_TAIL.
I originally used a boolean then realized I could get some extra safety by
using the backendId so that the mutex would be released automatically if the
backend dies.
> After further thought, I wonder whether instead of having an incoming
> listener initialize its "pos" to QUEUE_TAIL, we couldn't have it
> initialize to the maximum "pos" among existing listeners (with a floor of
> QUEUE_TAIL of course). If any other listener has advanced over a given
> message X, then X was committed (or aborted) earlier and the new listener
> is not required to return it; so it should be all right to take that
> listener's "pos" as a starting point.
That's a great idea and I will try it. It does need to be the maximum
position of existing listeners *with matching db oid*, since
asyncQueueReadAllNotifications doesn't check transaction status if the db oid
doesn't match its own. Another advantage of this approach is that a queued
notify from an open transaction in one database won't incur additional cost on
listens coming from other databases, whereas with my patch such a situation
will prevent firstUncommitted from advancing.
> The minimum-code-change way to do that would be to compute the max pos
> the hard way while Exec_ListenPreCommit holds the AsyncQueueLock, which
> it would now have to do in exclusive mode. That's a little bit annoying
> but it's surely not much worse than what SignalBackends has to do after
> every notification.
Yeah, I think it's going to be a win regardless. We could also skip the whole
thing if QUEUE_HEAD - QUEUE_TAIL is under a fixed amount, which would probably
cover a lot of use cases.
> Alternatively, we could try to maintain a shared pointer that is always
> equal to the frontmost backend's "pos". But I think that would require
> more locking during queue reading than exists now; so it's far from clear
> it would be a win, especially in systems where listening sessions last for
> a reasonable amount of time (so that Exec_ListenPreCommit is infrequent).
And that would be even more complicated since it has to be per db oid.
Matt Newell
> > After further thought, I wonder whether instead of having an incoming
> > listener initialize its "pos" to QUEUE_TAIL, we couldn't have it
> > initialize to the maximum "pos" among existing listeners (with a floor of
> > QUEUE_TAIL of course). If any other listener has advanced over a given
> > message X, then X was committed (or aborted) earlier and the new listener
> > is not required to return it; so it should be all right to take that
> > listener's "pos" as a starting point.
> That's a great idea and I will try it. It does need to be the maximum
> position of existing listeners *with matching db oid*, since
> asyncQueueReadAllNotifications doesn't check transaction status if the db
> oid doesn't match its own. Another advantage of this approach is that a
> queued notify from an open transaction in one database won't incur
> additional cost on listens coming from other databases, whereas with my
> patch such a situation will prevent firstUncommitted from advancing.
Patch attached works great. I added the dboid to the QueueBackendStatus
struct but that might not be needed if there is an easy and fast way to get a
db oid from a backend pid.
I also skip the max pos calculation loop if QUEUE_HEAD is on the same page as
QUEUE_TAIL, with the thought that it's not desirable to increase the time that
AsyncQueueLock is held if the queue is small and
asyncQueueReadAllNotifications will execute quickly.
I then noticed that we are taking the same lock twice in a row to read the
same values, first in Exec_ListenPreCommit and then in
asyncQueueReadAllNotifications, and much of the time the latter will simply
return because pos will already be at QUEUE_HEAD. I prepared a second patch
that splits asyncQueueReadAllNotifications; Exec_ListenPreCommit then calls
the worker version only when needed. This avoids the duplicate lock.
Thanks,
Matt Newell
Attachment: fix_slow_listen.patch (text/x-patch)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 91baede..58682dc 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -202,6 +202,12 @@ typedef struct QueuePosition
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))
+/* choose logically larger QueuePosition */
+#define QUEUE_POS_MAX(x,y) \
+ (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
+ (x).page != (y).page ? (x) : \
+ (x).offset < (y).offset ? (y) : (x))
+
/*
* Struct describing a listening backend's status
*/
@@ -209,6 +215,7 @@ typedef struct QueueBackendStatus
{
int32 pid; /* either a PID or InvalidPid */
QueuePosition pos; /* backend has read queue up to here */
+ Oid dboid; /* backend's database OID */
} QueueBackendStatus;
/*
@@ -248,6 +255,7 @@ static AsyncQueueControl *asyncQueueControl;
#define QUEUE_TAIL (asyncQueueControl->tail)
#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
/*
* The SLRU buffer area through which we access the notification queue
@@ -461,6 +469,7 @@ AsyncShmemInit(void)
for (i = 0; i <= MaxBackends; i++)
{
QUEUE_BACKEND_PID(i) = InvalidPid;
+ QUEUE_BACKEND_DBOID(i) = InvalidOid;
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
}
}
@@ -907,6 +916,9 @@ AtCommit_Notify(void)
static void
Exec_ListenPreCommit(void)
{
+ QueuePosition max;
+ int i;
+
/*
* Nothing to do if we are already listening to something, nor if we
* already ran this routine in this transaction.
@@ -936,7 +948,25 @@ Exec_ListenPreCommit(void)
* doesn't hurt.
*/
LWLockAcquire(AsyncQueueLock, LW_SHARED);
- QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
+ max = QUEUE_TAIL;
+ /*
+ * If there is over a page of notifications queued, then we should find
+ * the maximum position of all backends connected to our database, to
+ * avoid having to process all of the notifications that belong to already
+ * committed transactions that we will disregard anyway. This solves
+ * a significant performance problem with listen when there is a long
+ * running transaction
+ */
+ if( QUEUE_POS_PAGE(max) < QUEUE_POS_PAGE(QUEUE_HEAD) )
+ {
+ for (i = 1; i <= MaxBackends; i++)
+ {
+ if (QUEUE_BACKEND_PID(i) != InvalidPid && QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+ max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
+ }
+ }
+ QUEUE_BACKEND_POS(MyBackendId) = max;
+ QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
LWLockRelease(AsyncQueueLock);
@@ -1156,6 +1186,7 @@ asyncQueueUnregister(void)
QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
/* ... then mark it invalid */
QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
+ QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
LWLockRelease(AsyncQueueLock);
/* mark ourselves as no longer listed in the global array */
Attachment: avoid_redundant_lock.patch (text/x-patch)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 58682dc..e2f942f 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -384,6 +384,8 @@ static double asyncQueueUsage(void);
static void asyncQueueFillWarning(void);
static bool SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
+static void asyncQueueReadAllNotificationsWorker(volatile QueuePosition pos,
+ QueuePosition head);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
char *page_buffer);
@@ -917,6 +919,7 @@ static void
Exec_ListenPreCommit(void)
{
QueuePosition max;
+ QueuePosition head;
int i;
/*
@@ -949,6 +952,7 @@ Exec_ListenPreCommit(void)
*/
LWLockAcquire(AsyncQueueLock, LW_SHARED);
max = QUEUE_TAIL;
+ head = QUEUE_HEAD;
/*
* If there is over a page of notifications queued, then we should find
* the maximum position of all backends connected to our database, to
@@ -957,7 +961,7 @@ Exec_ListenPreCommit(void)
* a significant performance problem with listen when there is a long
* running transaction
*/
- if( QUEUE_POS_PAGE(max) < QUEUE_POS_PAGE(QUEUE_HEAD) )
+ if( QUEUE_POS_PAGE(max) < QUEUE_POS_PAGE(head) )
{
for (i = 1; i <= MaxBackends; i++)
{
@@ -983,7 +987,10 @@ Exec_ListenPreCommit(void)
*
* This will also advance the global tail pointer if possible.
*/
- asyncQueueReadAllNotifications();
+ if (!QUEUE_POS_EQUAL(max,head))
+ {
+ asyncQueueReadAllNotificationsWorker(max,head);
+ }
}
/*
@@ -1732,23 +1739,14 @@ ProcessNotifyInterrupt(void)
static void
asyncQueueReadAllNotifications(void)
{
- volatile QueuePosition pos;
- QueuePosition oldpos;
+ QueuePosition pos;
QueuePosition head;
- bool advanceTail;
-
- /* page_buffer must be adequately aligned, so use a union */
- union
- {
- char buf[QUEUE_PAGESIZE];
- AsyncQueueEntry align;
- } page_buffer;
/* Fetch current state */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
/* Assert checks that we have a valid state entry */
Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
- pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
+ pos = QUEUE_BACKEND_POS(MyBackendId);
head = QUEUE_HEAD;
LWLockRelease(AsyncQueueLock);
@@ -1757,6 +1755,24 @@ asyncQueueReadAllNotifications(void)
/* Nothing to do, we have read all notifications already. */
return;
}
+ asyncQueueReadAllNotificationsWorker(pos, head);
+}
+
+static void
+asyncQueueReadAllNotificationsWorker(volatile QueuePosition pos,
+ QueuePosition head)
+{
+ QueuePosition oldpos;
+ bool advanceTail;
+
+ /* page_buffer must be adequately aligned, so use a union */
+ union
+ {
+ char buf[QUEUE_PAGESIZE];
+ AsyncQueueEntry align;
+ } page_buffer;
+
+ oldpos = pos;
/*----------
* Note that we deliver everything that we see in the queue and that
Matt Newell <newellm@blur.com> writes:
> Patch attached works great. I added the dboid to the QueueBackendStatus
> struct but that might not be needed if there is an easy and fast way to get a
> db oid from a backend pid.
Not particularly; I agree with adding it to this data structure. One
reason is it makes the array stride a power of 2, so array accesses may be
a shade cheaper than before.
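(For the record: pid is an int32, QueuePosition is two ints, and the new dboid
is an Oid, so assuming no unusual padding each per-backend entry goes from
4 + 8 = 12 bytes to 4 + 8 + 4 = 16 bytes.)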
> I also skip the max pos calculation loop if QUEUE_HEAD is on the same page as
> QUEUE_TAIL, with the thought that it's not desirable to increase the time that
> AsyncQueueLock is held if the queue is small and
> asyncQueueReadAllNotifications will execute quickly.
Check.
There were a couple of minor bugs in this, but I fixed them and committed
it.
> I then noticed that we are taking the same lock twice in a row to read the
> same values, first in Exec_ListenPreCommit and then in
> asyncQueueReadAllNotifications, and much of the time the latter will simply
> return because pos will already be at QUEUE_HEAD. I prepared a second patch
> that splits asyncQueueReadAllNotifications; Exec_ListenPreCommit then calls
> the worker version only when needed. This avoids the duplicate lock.
I was unimpressed with this; the refactoring of asyncQueueReadAllNotifications
seemed messy and definitely was undocumented. I think the main actual
value is to skip doing anything during Exec_ListenPreCommit when
max == head, so I extracted that part and included it.
regards, tom lane