[PATCH] OAuth: fix performance bug with stuck multiplexer events
Hi all,
The current implementation of the OAuth flow is overly eager to signal
readiness when there's nothing to do, so sometimes we burn CPU for no
reason. This was discussed briefly back in [1]/messages/by-id/CAOYmi+n4EDOOUL27_OqYT2-F2rS6S+3mK-ppWb2Ec92UEoUbYA@mail.gmail.com, but I'll summarize
here so no one has to dig through that megathread.
= Background =
A major interaction between libpq and libcurl happens through a
callback -- register_socket() in oauth-curl.c -- which links the
sockets that Curl cares about into a single descriptor that our
clients can wait on. This descriptor, which is either an epoll set or
a kqueue, is called the multiplexer.
Curl has wide latitude to make this interaction as simple or as
complicated as it wants, and the complexity additionally depends on
the transport in use. For a simple HTTP request/response with older
Curls, the callback order might look like
- wait for reads on fd 5
- done with fd 5
But for a modern dual-stack system speaking HTTPS, you might get something like
- wait for writes on fd 6, time out after 200ms
- wait for writes on fd 7
- done with fd 6, cancel the timeout
- wait for reads only on fd 7, time out after five minutes
- wait for both reads and writes on fd 7
...
- done with fd 7, cancel the timeout
= Getting Stuck =
The Linux/epoll implementation handles the latter case well (with one
caveat, discussed later), but with the BSD/kqueue implementation, the
multiplexer can get "stuck open" on stale events that Curl no longer
cares about, and clients of libpq can loop on PQconnectPoll()
pointlessly.
The flow still works -- there's no bug in functionality, just in
performance -- but that also makes it hard to detect in tests. And for
simpler network cases (the ones in our tests), the bug is nearly
invisible, because Curl will unregister descriptors and clear the
stuck events before they have any real effect.
0001, attached, fixes this by counting the number of outstanding event
registrations and draining up to that many events from the kqueue
after libcurl gives control back to us. This ensures that any stale
events will be knocked back down. (This design was originally
suggested by Thomas Munro in a related conversation about kqueue
corner cases; thanks Thomas!)
If you follow the flow from start to finish, you may notice the
following oddity: I took great pains not to track which events had
been set, instead telling libcurl "something happened, figure it out"
-- and now in this patchset I'm pulling all of those events off in a
big wave only to discard them. As punishment for trying to keep the
epoll side of things simple, I'm now having to add unnecessary
complexity to the kqueue side. Seems like I'll eventually need to
balance the two out again.
= Timeouts =
There is a similar "stale event" bug for the timer, with both epoll
and kqueue: if Curl does not disable a timeout explicitly after it
fires, that timer's descriptor will simply remain signaled until
something else happens to set it.
Unfortunately, we can't solve this bug in the same place as 0001. If
we clear stale timers after calling into Curl, that would introduce a
race: Curl may have set a short timeout that has already passed, and
we need that event to remain signaled so that the client will call us
back.
So expired timers have to be cleared _before_ calling back into Curl,
which I have done in 0002. As part of that, I've strengthened the
timer_expired() API so that it always returns false when the timer is
disabled. It was previously a don't-care, since we only called it when
we knew the timer must be running, so the kqueue and epoll
implementations used to give different answers.
= Testing =
It was hard to notice any of this with the current tests, because
they're mostly focused on ensuring that the flow works, not on
internal efficiency. I tried to find a way to internally detect when a
file descriptor had gone stale by accident, but I think it's hard to
do that without false positives if we don't know what Curl is doing.
In the end, I decided to test a heuristic: we expect roughly 5-15
calls for these simple localhost flows, so if we see something more
than an order of magnitude above that, something is wrong. (When "bad
behavior" happens in the CI, I see hundreds or thousands of calls.)
This is implemented in 0003 by simply counting the number of calls and
printing them at the end of the flow in debug mode.
I strengthened a bunch of postconditions in the first two patches, so
to try to prevent decay (and keep other maintainers from having to
remember all this), I've added a unit test executable that pins many
internal expectations. It's essentially a version of the libpq-oauth
plugin which has been given a main() function and TAP macros. This is
done in 0004.
I feel confident that these new tests are useful, because:
- The new unit tests caught that my initial register_socket attempt,
written on macOS, did not work on the other BSDs. (EV_RECEIPT works
differently there.)
- If I revert the changes to register_socket, the new unit tests fail
on all BSDs in the CI, and the call-count test fails on NetBSD and
OpenBSD.
- If I comment out the new call to drain_timer_events(), the
call-count test fails on macOS.
- If I revert the changes to timer_expired, the new unit tests fail on
Linux. (There are no user-visible effects of that change, because it
doesn't lead to more calls.)
Unfortunately, if I just comment out the new call to
drain_socket_events(), nothing fails in the CI. :( But my personal
test suite (which makes use of TLS plus dual-stack sockets) does
notice the regression in the call count immediately, with some tests
using tens of thousands of calls for a single client flow without the
fix. I hope that's good enough for now, because I'm not sure if I can
safely push the additional TLS infrastructure into the oauth_validator
tests for 18.
My plan, if this code seems reasonable, is to backport 0001-0003, but
keep the larger 0004 on HEAD only until it has proven to be stable.
It's a big new suite and I want to make sure it's not flapping on some
buildfarm animal. Eventually I'll backport that too.
WDYT?
Thanks,
--Jacob
[1]: /messages/by-id/CAOYmi+n4EDOOUL27_OqYT2-F2rS6S+3mK-ppWb2Ec92UEoUbYA@mail.gmail.com
Attachments:
0001-oauth-Remove-stale-events-from-the-kqueue-multiplexe.patchapplication/octet-stream; name=0001-oauth-Remove-stale-events-from-the-kqueue-multiplexe.patchDownload
From fd9660170073fd86da033608f52c604b9664ae91 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Tue, 10 Jun 2025 16:38:59 -0700
Subject: [PATCH 1/4] oauth: Remove stale events from the kqueue multiplexer
If a socket is added to the kqueue, becomes readable/writable, and
subsequently becomes non-readable/writable again, the kqueue itself will
remain readable until either the socket registration is removed, or the
stale event is cleared via a call to kevent().
In many simple cases, Curl itself will remove the socket registration
quickly, but in real-world usage, this is not guaranteed to happen. The
kqueue can then remain stuck in a permanently readable state until the
request ends, which results in pointless wakeups for the client and
wasted CPU time.
Implement drain_socket_events() to call kevent() and unstick any stale
events. This is called right after drive_request(), before we return
control to the client to wait. To make sure we've taken a look at the
entire queue, register_socket() now tracks the number of outstanding
registrations.
Suggested-by: Thomas Munro <thomas.munro@gmail.com>
---
src/interfaces/libpq-oauth/oauth-curl.c | 218 ++++++++++++++++++------
1 file changed, 166 insertions(+), 52 deletions(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index dba9a684fa8..8430356cfb5 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -278,6 +278,10 @@ struct async_ctx
bool user_prompted; /* have we already sent the authz prompt? */
bool used_basic_auth; /* did we send a client secret? */
bool debugging; /* can we give unsafe developer assistance? */
+
+#if defined(HAVE_SYS_EVENT_H)
+ int nevents; /* how many events are we waiting on? */
+#endif
};
/*
@@ -1291,41 +1295,95 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
return 0;
#elif defined(HAVE_SYS_EVENT_H)
- struct kevent ev[2] = {0};
+ struct kevent ev[2];
struct kevent ev_out[2];
struct timespec timeout = {0};
- int nev = 0;
+ int nev;
int res;
+ /*
+ * First, any existing registrations for this socket need to be removed,
+ * both to track the outstanding number of events, and to ensure that
+ * we're not woken up for things that Curl no longer cares about.
+ *
+ * ENOENT is okay, but we have to track how many we get, so use
+ * EV_RECEIPT.
+ */
+ nev = 0;
+ EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+ nev++;
+ EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+ nev++;
+
+ Assert(nev <= lengthof(ev));
+ Assert(nev <= lengthof(ev_out));
+
+ res = kevent(actx->mux, ev, nev, ev_out, nev, &timeout);
+ if (res < 0)
+ {
+ actx_error(actx, "could not delete from kqueue: %m");
+ return -1;
+ }
+
+ /*
+ * We can't use the simple errno version of kevent, because we need to
+ * skip over ENOENT while still allowing a second change to be processed.
+ * So we need a longer-form error checking loop.
+ */
+ for (int i = 0; i < res; ++i)
+ {
+ /*
+ * EV_RECEIPT should guarantee one EV_ERROR result for every change,
+ * whether successful or not. Failed entries contain a non-zero errno
+ * in the data field.
+ */
+ Assert(ev_out[i].flags & EV_ERROR);
+
+ errno = ev_out[i].data;
+ if (!errno)
+ {
+ /* Successfully removed; update the event count. */
+ Assert(actx->nevents > 0);
+ actx->nevents--;
+ }
+ else if (errno != ENOENT)
+ {
+ actx_error(actx, "could not delete from kqueue: %m");
+ return -1;
+ }
+ }
+
+ /* If we're only removing registrations, we're done. */
+ if (what == CURL_POLL_REMOVE)
+ return 0;
+
+ /*
+ * Now add the new filters. This is more straightfoward than deletion.
+ *
+ * Combining this kevent() call with the one above seems like it should be
+ * theoretically possible, but beware that not all BSDs keep the original
+ * event flags when using EV_RECEIPT, so it's tricky to figure out which
+ * operations succeeded. For now we keep the deletions and the additions
+ * separate.
+ */
+ nev = 0;
+
switch (what)
{
case CURL_POLL_IN:
- EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, 0);
+ EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD, 0, 0, 0);
nev++;
break;
case CURL_POLL_OUT:
- EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD | EV_RECEIPT, 0, 0, 0);
+ EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD, 0, 0, 0);
nev++;
break;
case CURL_POLL_INOUT:
- EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, 0);
- nev++;
- EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD | EV_RECEIPT, 0, 0, 0);
- nev++;
- break;
-
- case CURL_POLL_REMOVE:
-
- /*
- * We don't know which of these is currently registered, perhaps
- * both, so we try to remove both. This means we need to tolerate
- * ENOENT below.
- */
- EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+ EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD, 0, 0, 0);
nev++;
- EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+ EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD, 0, 0, 0);
nev++;
break;
@@ -1334,45 +1392,91 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
return -1;
}
- res = kevent(actx->mux, ev, nev, ev_out, lengthof(ev_out), &timeout);
+ Assert(nev <= lengthof(ev));
+
+ res = kevent(actx->mux, ev, nev, NULL, 0, NULL);
if (res < 0)
{
actx_error(actx, "could not modify kqueue: %m");
return -1;
}
+ /* Update the event count, and we're done. */
+ actx->nevents += nev;
+
+ return 0;
+#else
+#error register_socket is not implemented on this platform
+#endif
+}
+
+/*-------
+ * Drains any stale level-triggered events out of the multiplexer. This is
+ * necessary only if the mux implementation requires it.
+ *
+ * As an example, consider the following sequence of events:
+ * 1. libcurl tries to write data to the send buffer, but it fills up.
+ * 2. libcurl registers CURL_POLL_OUT on the socket and returns control to the
+ * client to wait.
+ * 3. The kernel partially drains the send buffer. The socket becomes writable,
+ * and the client wakes up and calls back into the flow.
+ * 4. libcurl continues writing data to the send buffer, but it fills up again.
+ * The socket is no longer writable.
+ *
+ * At this point, an epoll-based mux no longer signals readiness, so nothing
+ * further needs to be done. But a kqueue-based mux will continue to signal
+ * "ready" until either the EVFILT_WRITE registration is dropped for the socket,
+ * or the old socket-writable event is read from the queue. Since Curl isn't
+ * guaranteed to do the former, we must do the latter here.
+ */
+static bool
+drain_socket_events(struct async_ctx *actx)
+{
+#if defined(HAVE_SYS_EPOLL_H)
+ /* The epoll implementation doesn't need to drain pending events. */
+ return true;
+#elif defined(HAVE_SYS_EVENT_H)
+ struct timespec timeout = {0};
+ struct kevent *drain;
+ int drain_len;
+
/*
- * We can't use the simple errno version of kevent, because we need to
- * skip over ENOENT while still allowing a second change to be processed.
- * So we need a longer-form error checking loop.
+ * Drain the events in one call, rather than looping. (We could maybe call
+ * kevent() drain_len times, instead of allocating space for the maximum
+ * number of events, but that relies on the events being in FIFO order to
+ * avoid starvation. The kqueue man pages don't seem to make any
+ * guarantees about that.)
+ *
+ * register_socket() keeps actx->nevents updated with the number of
+ * outstanding event filters. We don't track the registration of the
+ * timer; we just assume one could be registered here.
*/
- for (int i = 0; i < res; ++i)
+ drain_len = actx->nevents + 1;
+
+ drain = malloc(sizeof(*drain) * drain_len);
+ if (!drain)
{
- /*
- * EV_RECEIPT should guarantee one EV_ERROR result for every change,
- * whether successful or not. Failed entries contain a non-zero errno
- * in the data field.
- */
- Assert(ev_out[i].flags & EV_ERROR);
+ actx_error(actx, "out of memory");
+ return false;
+ }
- errno = ev_out[i].data;
- if (errno && errno != ENOENT)
- {
- switch (what)
- {
- case CURL_POLL_REMOVE:
- actx_error(actx, "could not delete from kqueue: %m");
- break;
- default:
- actx_error(actx, "could not add to kqueue: %m");
- }
- return -1;
- }
+ /*
+ * Discard all pending events. Since our registrations are level-triggered
+ * (even the timer, since we use a chained kqueue for that instead of an
+ * EVFILT_TIMER on the top-level mux!), any events that we still need will
+ * remain signalled, and the stale ones will be swept away.
+ */
+ if (kevent(actx->mux, NULL, 0, drain, drain_len, &timeout) < 0)
+ {
+ actx_error(actx, "could not drain kqueue: %m");
+ free(drain);
+ return false;
}
- return 0;
+ free(drain);
+ return true;
#else
-#error register_socket is not implemented on this platform
+#error drain_socket_events is not implemented on this platform
#endif
}
@@ -1441,7 +1545,8 @@ set_timer(struct async_ctx *actx, long timeout)
* macOS.)
*
* If there was no previous timer set, the kevent calls will result in
- * ENOENT, which is fine.
+ * ENOENT, which is fine. (We don't track actx->nevents for this case;
+ * instead, drain_socket_events() just assumes a timer could be set.)
*/
EV_SET(&ev, 1, EVFILT_TIMER, EV_DELETE, 0, 0, 0);
if (kevent(actx->timerfd, &ev, 1, NULL, 0, NULL) < 0 && errno != ENOENT)
@@ -2755,13 +2860,22 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
if (status == PGRES_POLLING_FAILED)
goto error_return;
- else if (status != PGRES_POLLING_OK)
- {
- /* not done yet */
- return status;
- }
+ else if (status == PGRES_POLLING_OK)
+ break; /* done! */
+
+ /*
+ * This request is still running.
+ *
+ * Drain any stale socket events from the mux before we
+ * ask the client to poll. (Currently, this can occur only
+ * with kqueue.) If this is forgotten, the multiplexer can
+ * get stuck in a signalled state and we'll burn CPU
+ * cycles pointlessly.
+ */
+ if (!drain_socket_events(actx))
+ goto error_return;
- break;
+ return status;
}
case OAUTH_STEP_WAIT_INTERVAL:
--
2.34.1
0002-oauth-Remove-expired-timers-from-the-multiplexer.patchapplication/octet-stream; name=0002-oauth-Remove-expired-timers-from-the-multiplexer.patchDownload
From d3cf69f3462e06a04b85eddf63d048cfc05c6b44 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Wed, 5 Mar 2025 13:16:48 -0800
Subject: [PATCH 2/4] oauth: Remove expired timers from the multiplexer
In a case similar to the previous commit, an expired timer can remain
permanently readable if Curl does not remove the timeout itself. Since
that removal isn't guaranteed to happen in real-world situations,
implement drain_timer_events() to reset the timer before calling into
drive_request().
Moving to drain_timer_events() happens to fix a logic bug in the
previous caller of timer_expired(), which treated an error condition as
if the timer were expired instead of bailing out.
The previous implementation of timer_expired() gave differing results
for epoll and kqueue if the timer was reset. (For epoll, a reset timer
was considered to be expired, and for kqueue it was not.) This didn't
previously cause problems, since timer_expired() was only called while
the timer was known to be set, but both implementations now use the
kqueue logic.
---
src/interfaces/libpq-oauth/oauth-curl.c | 108 +++++++++++++++---------
1 file changed, 68 insertions(+), 40 deletions(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index 8430356cfb5..78ba3399495 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -1588,40 +1588,20 @@ set_timer(struct async_ctx *actx, long timeout)
/*
* Returns 1 if the timeout in the multiplexer set has expired since the last
- * call to set_timer(), 0 if the timer is still running, or -1 (with an
- * actx_error() report) if the timer cannot be queried.
+ * call to set_timer(), 0 if the timer is either still running or disarmed, or
+ * -1 (with an actx_error() report) if the timer cannot be queried.
*/
static int
timer_expired(struct async_ctx *actx)
{
-#if defined(HAVE_SYS_EPOLL_H)
- struct itimerspec spec = {0};
-
- if (timerfd_gettime(actx->timerfd, &spec) < 0)
- {
- actx_error(actx, "getting timerfd value: %m");
- return -1;
- }
-
- /*
- * This implementation assumes we're using single-shot timers. If you
- * change to using intervals, you'll need to reimplement this function
- * too, possibly with the read() or select() interfaces for timerfd.
- */
- Assert(spec.it_interval.tv_sec == 0
- && spec.it_interval.tv_nsec == 0);
-
- /* If the remaining time to expiration is zero, we're done. */
- return (spec.it_value.tv_sec == 0
- && spec.it_value.tv_nsec == 0);
-#elif defined(HAVE_SYS_EVENT_H)
+#if defined(HAVE_SYS_EPOLL_H) || defined(HAVE_SYS_EVENT_H)
int res;
- /* Is the timer queue ready? */
+ /* Is the timer ready? */
res = PQsocketPoll(actx->timerfd, 1 /* forRead */ , 0, 0);
if (res < 0)
{
- actx_error(actx, "checking kqueue for timeout: %m");
+ actx_error(actx, "checking timer expiration: %m");
return -1;
}
@@ -1653,6 +1633,36 @@ register_timer(CURLM *curlm, long timeout, void *ctx)
return 0;
}
+/*
+ * Removes any expired-timer event from the multiplexer. If was_expired is not
+ * NULL, it will contain whether or not the timer was expired at time of call.
+ */
+static bool
+drain_timer_events(struct async_ctx *actx, bool *was_expired)
+{
+ int res;
+
+ res = timer_expired(actx);
+ if (res < 0)
+ return false;
+
+ if (res > 0)
+ {
+ /*
+ * Timer is expired. We could drain the event manually from the
+ * timerfd, but it's easier to simply disable it; that keeps the
+ * platform-specific code in set_timer().
+ */
+ if (!set_timer(actx, -1))
+ return false;
+ }
+
+ if (was_expired)
+ *was_expired = (res > 0);
+
+ return true;
+}
+
/*
* Prints Curl request debugging information to stderr.
*
@@ -2856,6 +2866,22 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
{
PostgresPollingStatusType status;
+ /*
+ * Clear any expired timeout before calling back into
+ * Curl. Curl is not guaranteed to do this for us, because
+ * its API expects us to use single-shot (i.e.
+ * edge-triggered) timeouts, and ours are level-triggered
+ * via the mux.
+ *
+ * This can't be combined with the drain_socket_events()
+ * call below: we might accidentally clear a short timeout
+ * that was both set and expired during the call to
+ * drive_request().
+ */
+ if (!drain_timer_events(actx, NULL))
+ goto error_return;
+
+ /* Move the request forward. */
status = drive_request(actx);
if (status == PGRES_POLLING_FAILED)
@@ -2879,24 +2905,26 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
}
case OAUTH_STEP_WAIT_INTERVAL:
-
- /*
- * The client application is supposed to wait until our timer
- * expires before calling PQconnectPoll() again, but that
- * might not happen. To avoid sending a token request early,
- * check the timer before continuing.
- */
- if (!timer_expired(actx))
{
- set_conn_altsock(conn, actx->timerfd);
- return PGRES_POLLING_READING;
- }
+ bool expired;
- /* Disable the expired timer. */
- if (!set_timer(actx, -1))
- goto error_return;
+ /*
+ * The client application is supposed to wait until our
+ * timer expires before calling PQconnectPoll() again, but
+ * that might not happen. To avoid sending a token request
+ * early, check the timer before continuing.
+ */
+ if (!drain_timer_events(actx, &expired))
+ goto error_return;
- break;
+ if (!expired)
+ {
+ set_conn_altsock(conn, actx->timerfd);
+ return PGRES_POLLING_READING;
+ }
+
+ break;
+ }
}
/*
--
2.34.1
0003-oauth-Track-total-call-count-during-a-client-flow.patchapplication/octet-stream; name=0003-oauth-Track-total-call-count-during-a-client-flow.patchDownload
From e96aab665c33b2a8f198fe802dae7ab410a01471 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Fri, 6 Jun 2025 15:22:41 -0700
Subject: [PATCH 3/4] oauth: Track total call count during a client flow
Tracking down the bugs that led to the addition of drain_socket_events()
and drain_timer_events() was difficult, because an inefficient flow is
not visibly different from one that is working properly. To help
maintainers notice when something has gone wrong, track the number of
calls into the flow as part of debug mode, and print the total when the
flow finishes.
A new test makes sure the total count is less than 100. (We expect
something on the order of 10.) This isn't foolproof, but it is able to
catch several regressions in the logic of the prior two commits, and
future work to add TLS support to the oauth_validator test server should
strengthen it as well.
---
src/interfaces/libpq-oauth/oauth-curl.c | 22 +++++++++++++
.../modules/oauth_validator/t/001_server.pl | 31 ++++++++++++++++++-
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index 78ba3399495..68303106a5d 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -278,6 +278,7 @@ struct async_ctx
bool user_prompted; /* have we already sent the authz prompt? */
bool used_basic_auth; /* did we send a client secret? */
bool debugging; /* can we give unsafe developer assistance? */
+ int dbg_num_calls; /* (debug mode) how many times were we called? */
#if defined(HAVE_SYS_EVENT_H)
int nevents; /* how many events are we waiting on? */
@@ -3074,6 +3075,8 @@ PostgresPollingStatusType
pg_fe_run_oauth_flow(PGconn *conn)
{
PostgresPollingStatusType result;
+ fe_oauth_state *state = conn_sasl_state(conn);
+ struct async_ctx *actx;
#ifndef WIN32
sigset_t osigset;
bool sigpipe_pending;
@@ -3102,6 +3105,25 @@ pg_fe_run_oauth_flow(PGconn *conn)
result = pg_fe_run_oauth_flow_impl(conn);
+ /*
+ * To assist with finding bugs in drain_socket_events() and
+ * drain_timer_events(), when we're in debug mode, track the total number
+ * of calls to this function and print that at the end of the flow.
+ *
+ * Be careful that state->async_ctx could be NULL if early initialization
+ * fails during the first call.
+ */
+ actx = state->async_ctx;
+ Assert(actx || result == PGRES_POLLING_FAILED);
+
+ if (actx && actx->debugging)
+ {
+ actx->dbg_num_calls++;
+ if (result == PGRES_POLLING_OK || result == PGRES_POLLING_FAILED)
+ fprintf(stderr, "[libpq] total number of polls: %d\n",
+ actx->dbg_num_calls);
+ }
+
#ifndef WIN32
if (masked)
{
diff --git a/src/test/modules/oauth_validator/t/001_server.pl b/src/test/modules/oauth_validator/t/001_server.pl
index 41672ebd5c6..c0dafb8be76 100644
--- a/src/test/modules/oauth_validator/t/001_server.pl
+++ b/src/test/modules/oauth_validator/t/001_server.pl
@@ -418,6 +418,35 @@ $node->connect_fails(
qr/failed to obtain access token: mutual TLS required for client \(invalid_client\)/
);
+# Count the number of calls to the internal flow when multiple retries are
+# triggered. The exact number depends on many things -- the TCP stack, the
+# version of Curl in use, random chance -- but a ridiculously high number
+# suggests something is wrong with our ability to clear multiplexer events after
+# they're no longer applicable.
+my ($ret, $stdout, $stderr) = $node->psql(
+ 'postgres',
+ "SELECT 'connected for call count'",
+ extra_params => ['-w'],
+ connstr => connstr(stage => 'token', retries => 2),
+ on_error_stop => 0);
+
+is($ret, 0, "call count connection succeeds");
+like(
+ $stderr,
+ qr@Visit https://example\.com/ and enter the code: postgresuser@,
+ "call count: stderr matches");
+
+my $count_pattern = qr/\[libpq\] total number of polls: (\d+)/;
+if (like($stderr, $count_pattern, "call count: count is printed"))
+{
+ # For reference, a typical flow with two retries might take between 5-15
+ # calls to the client implementation. And while this will probably continue
+ # to change across OSes and Curl updates, we're likely in trouble if we see
+ # hundreds or thousands of calls.
+ $stderr =~ $count_pattern;
+ cmp_ok($1, '<', 100, "call count is reasonably small");
+}
+
# Stress test: make sure our builtin flow operates correctly even if the client
# application isn't respecting PGRES_POLLING_READING/WRITING signals returned
# from PQconnectPoll().
@@ -428,7 +457,7 @@ my @cmd = (
connstr(stage => 'all', retries => 1, interval => 1));
note "running '" . join("' '", @cmd) . "'";
-my ($stdout, $stderr) = run_command(\@cmd);
+($stdout, $stderr) = run_command(\@cmd);
like($stdout, qr/connection succeeded/, "stress-async: stdout matches");
unlike(
--
2.34.1
0004-oauth-Add-unit-tests-for-multiplexer-handling.patchapplication/octet-stream; name=0004-oauth-Add-unit-tests-for-multiplexer-handling.patchDownload
From 36dc710b84b86931963a3fb586d7279d4548bc36 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Wed, 5 Mar 2025 15:04:34 -0800
Subject: [PATCH 4/4] oauth: Add unit tests for multiplexer handling
To better record the internal behaviors of oauth-curl.c, add a unit test
suite for the socket and timer handling code. This is all based on TAP
and driven by our existing Test::More infrastructure.
---
src/interfaces/libpq-oauth/Makefile | 14 +
src/interfaces/libpq-oauth/meson.build | 35 ++
src/interfaces/libpq-oauth/t/001_oauth.pl | 24 +
src/interfaces/libpq-oauth/test-oauth-curl.c | 474 +++++++++++++++++++
4 files changed, 547 insertions(+)
create mode 100644 src/interfaces/libpq-oauth/t/001_oauth.pl
create mode 100644 src/interfaces/libpq-oauth/test-oauth-curl.c
diff --git a/src/interfaces/libpq-oauth/Makefile b/src/interfaces/libpq-oauth/Makefile
index 270fc0cf2d9..9da8e4b7143 100644
--- a/src/interfaces/libpq-oauth/Makefile
+++ b/src/interfaces/libpq-oauth/Makefile
@@ -79,5 +79,19 @@ uninstall:
rm -f '$(DESTDIR)$(libdir)/$(stlib)'
rm -f '$(DESTDIR)$(libdir)/$(shlib)'
+.PHONY: all-tests
+all-tests: oauth_tests$(X)
+
+oauth_tests$(X): test-oauth-curl.o oauth-utils.o $(WIN32RES) | submake-libpgport submake-libpq
+ $(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(SHLIB_LINK) -o $@
+
+check: all-tests
+ $(prove_check)
+
+installcheck: all-tests
+ $(prove_installcheck)
+
clean distclean: clean-lib
rm -f $(OBJS) $(OBJS_STATIC) $(OBJS_SHLIB)
+ rm -f test-oauth-curl.o oauth_tests$(X)
+ rm -rf tmp_check
diff --git a/src/interfaces/libpq-oauth/meson.build b/src/interfaces/libpq-oauth/meson.build
index df064c59a40..505e1671b86 100644
--- a/src/interfaces/libpq-oauth/meson.build
+++ b/src/interfaces/libpq-oauth/meson.build
@@ -47,3 +47,38 @@ libpq_oauth_so = shared_module(libpq_oauth_name,
link_args: export_fmt.format(export_file.full_path()),
kwargs: default_lib_args,
)
+
+libpq_oauth_test_deps = []
+
+oauth_test_sources = files('test-oauth-curl.c') + libpq_oauth_so_sources
+
+if host_system == 'windows'
+ oauth_test_sources += rc_bin_gen.process(win32ver_rc, extra_args: [
+ '--NAME', 'oauth_tests',
+ '--FILEDESC', 'OAuth unit test program',])
+endif
+
+libpq_oauth_test_deps += executable('oauth_tests',
+ oauth_test_sources,
+ dependencies: [frontend_shlib_code, libpq, libpq_oauth_deps],
+ kwargs: default_bin_args + {
+ 'c_args': default_bin_args.get('c_args', []) + libpq_oauth_so_c_args,
+ 'c_pch': pch_postgres_fe_h,
+ 'include_directories': [libpq_inc, postgres_inc],
+ 'install': false,
+ }
+)
+
+testprep_targets += libpq_oauth_test_deps
+
+tests += {
+ 'name': 'libpq-oauth',
+ 'sd': meson.current_source_dir(),
+ 'bd': meson.current_build_dir(),
+ 'tap': {
+ 'tests': [
+ 't/001_oauth.pl',
+ ],
+ 'deps': libpq_oauth_test_deps,
+ },
+}
diff --git a/src/interfaces/libpq-oauth/t/001_oauth.pl b/src/interfaces/libpq-oauth/t/001_oauth.pl
new file mode 100644
index 00000000000..e769856c2c9
--- /dev/null
+++ b/src/interfaces/libpq-oauth/t/001_oauth.pl
@@ -0,0 +1,24 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Defer entirely to the oauth_tests executable. stdout/err is routed through
+# Test::More so that our logging infrastructure can handle it correctly. Using
+# IPC::Run::new_chunker seems to help interleave the two streams a little better
+# than without.
+#
+# TODO: prove can also deal with native executables itself, which we could
+# probably make use of via PROVE_TESTS on the Makefile side. But the Meson setup
+# calls Perl directly, which would require more code to work around... and
+# there's still the matter of logging.
+my $builder = Test::More->builder;
+my $out = $builder->output;
+my $err = $builder->failure_output;
+
+IPC::Run::run ['oauth_tests'],
+ '>', IPC::Run::new_chunker, sub { print {$out} $_[0] },
+ '2>', IPC::Run::new_chunker, sub { print {$err} $_[0] }
+ or die "oauth_tests returned $?";
diff --git a/src/interfaces/libpq-oauth/test-oauth-curl.c b/src/interfaces/libpq-oauth/test-oauth-curl.c
new file mode 100644
index 00000000000..e52d59863dd
--- /dev/null
+++ b/src/interfaces/libpq-oauth/test-oauth-curl.c
@@ -0,0 +1,474 @@
+/*
+ * test-oauth-curl.c
+ *
+ * A unit test driver for libpq-oauth. This #includes oauth-curl.c, which lets
+ * the tests reference static functions and other internals.
+ *
+ * USE_ASSERT_CHECKING is required, to make it easy for tests to wrap
+ * must-succeed code as part of test setup.
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ */
+
+#include "oauth-curl.c"
+
+#include <fcntl.h>
+
+#ifdef USE_ASSERT_CHECKING
+
+/*
+ * TAP Helpers
+ */
+
+static int num_tests = 0;
+
+/*
+ * Reports ok/not ok to the TAP stream on stdout.
+ */
+#define ok(OK, TEST) \
+ ok_impl(OK, TEST, #OK, __FILE__, __LINE__)
+
+static bool
+ok_impl(bool ok, const char *test, const char *teststr, const char *file, int line)
+{
+ printf("%sok %d - %s\n", ok ? "" : "not ", ++num_tests, test);
+
+ if (!ok)
+ {
+ printf("# at %s:%d:\n", file, line);
+ printf("# expression is false: %s\n", teststr);
+ }
+
+ return ok;
+}
+
+/*
+ * Like ok(this == that), but with more diagnostics on failure.
+ *
+ * Only works on ints, but luckily that's all we need here. Note that the much
+ * simpler-looking macro implementation
+ *
+ * is_diag(ok(THIS == THAT, TEST), THIS, #THIS, THAT, #THAT)
+ *
+ * suffers from multiple evaluation of the macro arguments...
+ */
+#define is(THIS, THAT, TEST) \
+ do { \
+ int this_ = (THIS), \
+ that_ = (THAT); \
+ is_diag( \
+ ok_impl(this_ == that_, TEST, #THIS " == " #THAT, __FILE__, __LINE__), \
+ this_, #THIS, that_, #THAT \
+ ); \
+ } while (0)
+
+static bool
+is_diag(bool ok, int this, const char *thisstr, int that, const char *thatstr)
+{
+ if (!ok)
+ printf("# %s = %d; %s = %d\n", thisstr, this, thatstr, that);
+
+ return ok;
+}
+
+/*
+ * Utilities
+ */
+
+/*
+ * Creates a partially-initialized async_ctx for the purposes of testing. Free
+ * with free_test_actx().
+ */
+static struct async_ctx *
+init_test_actx(void)
+{
+ struct async_ctx *actx;
+
+ actx = calloc(1, sizeof(*actx));
+ Assert(actx);
+
+ actx->mux = PGINVALID_SOCKET;
+ actx->timerfd = -1;
+ actx->debugging = true;
+
+ initPQExpBuffer(&actx->errbuf);
+
+ Assert(setup_multiplexer(actx));
+
+ return actx;
+}
+
+static void
+free_test_actx(struct async_ctx *actx)
+{
+ termPQExpBuffer(&actx->errbuf);
+
+ if (actx->mux != PGINVALID_SOCKET)
+ close(actx->mux);
+ if (actx->timerfd >= 0)
+ close(actx->timerfd);
+
+ free(actx);
+}
+
+static char dummy_buf[4 * 1024]; /* for fill_pipe/drain_pipe */
+
+/*
+ * Writes to the write side of a pipe until it won't take any more data. Returns
+ * the amount written.
+ */
+static ssize_t
+fill_pipe(int fd)
+{
+ int mode;
+ ssize_t written = 0;
+
+ /* Don't block. */
+ Assert((mode = fcntl(fd, F_GETFL)) != -1);
+ Assert(fcntl(fd, F_SETFL, mode | O_NONBLOCK) == 0);
+
+ while (true)
+ {
+ ssize_t w;
+
+ w = write(fd, dummy_buf, sizeof(dummy_buf));
+ if (w < 0)
+ {
+ if (errno != EAGAIN && errno != EWOULDBLOCK)
+ {
+ perror("write to pipe");
+ written = -1;
+ }
+ break;
+ }
+
+ written += w;
+ }
+
+ /* Reset the descriptor flags. */
+ Assert(fcntl(fd, F_SETFD, mode) == 0);
+
+ return written;
+}
+
+/*
+ * Drains the requested amount of data from the read side of a pipe.
+ */
+static bool
+drain_pipe(int fd, ssize_t n)
+{
+ Assert(n > 0);
+
+ while (n)
+ {
+ size_t to_read = (n <= sizeof(dummy_buf)) ? n : sizeof(dummy_buf);
+ ssize_t drained;
+
+ drained = read(fd, dummy_buf, to_read);
+ if (drained < 0)
+ {
+ perror("read from pipe");
+ return false;
+ }
+
+ n -= drained;
+ }
+
+ return true;
+}
+
+/*
+ * Tests whether the multiplexer is marked ready by the deadline. This is a
+ * macro so that file/line information makes sense during failures.
+ *
+ * NB: our current multiplexer implementations (epoll/kqueue) are *readable*
+ * when the underlying libcurl sockets are *writable*. This behavior is pinned
+ * here to record that expectation, but it's not a required part of the API. If
+ * you've added a new implementation that doesn't have that behavior, feel free
+ * to modify this test.
+ */
+#define mux_is_ready(MUX, DEADLINE, TEST) \
+ do { \
+ int res_ = PQsocketPoll(MUX, 1, 0, DEADLINE); \
+ Assert(res_ != -1); \
+ ok(res_ > 0, "multiplexer is ready " TEST); \
+ } while (0)
+
+/*
+ * The opposite of mux_is_ready().
+ */
+#define mux_is_not_ready(MUX, TEST) \
+ do { \
+ int res_ = PQsocketPoll(MUX, 1, 0, 0); \
+ Assert(res_ != -1); \
+ is(res_, 0, "multiplexer is not ready " TEST); \
+ } while (0)
+
+/*
+ * Test Suites
+ */
+
+/* Per-suite timeout. Set via the PG_TEST_TIMEOUT_DEFAULT envvar. */
+static pg_usec_time_t timeout_us = 180 * 1000 * 1000;
+
+static void
+test_set_timer(void)
+{
+ struct async_ctx *actx = init_test_actx();
+ const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
+
+ printf("# test_set_timer\n");
+
+ /* A zero-duration timer should result in a near-immediate ready signal. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer expires");
+ is(timer_expired(actx), 1, "timer_expired() returns 1 when timer expires");
+
+ /* Resetting the timer far in the future should unset the ready signal. */
+ Assert(set_timer(actx, INT_MAX));
+ mux_is_not_ready(actx->mux, "when timer is reset to the future");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 with unexpired timer");
+
+ /* Setting another zero-duration timer should override the previous one. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer is re-expired");
+ is(timer_expired(actx), 1, "timer_expired() returns 1 when timer is re-expired");
+
+ /* And disabling that timer should once again unset the ready signal. */
+ Assert(set_timer(actx, -1));
+ mux_is_not_ready(actx->mux, "when timer is unset");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 when timer is unset");
+
+ {
+ bool expired;
+
+ /* Make sure drain_timer_events() functions correctly as well. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer is re-expired (drain_timer_events)");
+
+ Assert(drain_timer_events(actx, &expired));
+ mux_is_not_ready(actx->mux, "when timer is drained after expiring");
+ is(expired, 1, "drain_timer_events() reports expiration");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 after timer is drained");
+
+ /* A second drain should do nothing. */
+ Assert(drain_timer_events(actx, &expired));
+ mux_is_not_ready(actx->mux, "when timer is drained a second time");
+ is(expired, 0, "drain_timer_events() reports no expiration");
+ is(timer_expired(actx), 0, "timer_expired() still returns 0");
+ }
+
+ free_test_actx(actx);
+}
+
+static void
+test_register_socket(void)
+{
+ struct async_ctx *actx = init_test_actx();
+ int pipefd[2];
+ int rfd,
+ wfd;
+ bool bidirectional;
+
+ /* Create a local pipe for communication. */
+ Assert(pipe(pipefd) == 0);
+ rfd = pipefd[0];
+ wfd = pipefd[1];
+
+ /*
+ * Some platforms (FreeBSD) implement bidirectional pipes, affecting the
+ * behavior of some of these tests. Store that knowledge for later.
+ */
+ bidirectional = PQsocketPoll(rfd /* read */ , 0, 1 /* write */ , 0) > 0;
+
+ /*
+ * This suite runs twice -- once using CURL_POLL_IN/CURL_POLL_OUT for
+ * read/write operations, respectively, and once using CURL_POLL_INOUT for
+ * both sides.
+ */
+ for (int inout = 0; inout < 2; inout++)
+ {
+ const int in_event = inout ? CURL_POLL_INOUT : CURL_POLL_IN;
+ const int out_event = inout ? CURL_POLL_INOUT : CURL_POLL_OUT;
+ const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
+ size_t bidi_pipe_size = 0; /* silence compiler warnings */
+
+ printf("# test_register_socket %s\n", inout ? "(INOUT)" : "");
+
+ /*
+ * At the start of the test, the read side should be blocked and the
+ * write side should be open. (There's a mistake at the end of this
+ * loop otherwise.)
+ */
+ Assert(PQsocketPoll(rfd, 1, 0, 0) == 0);
+ Assert(PQsocketPoll(wfd, 0, 1, 0) > 0);
+
+ /*
+ * For bidirectional systems, emulate unidirectional behavior here by
+ * filling up the "read side" of the pipe.
+ */
+ if (bidirectional)
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+
+ /* Listen on the read side. The multiplexer shouldn't be ready yet. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when fd is not readable");
+
+ /* Writing to the pipe should result in a read-ready multiplexer. */
+ Assert(write(wfd, "x", 1) == 1);
+ mux_is_ready(actx->mux, deadline, "when fd is readable");
+
+ /*
+ * Update the registration to wait on write events instead. The
+ * multiplexer should be unset.
+ */
+ Assert(register_socket(NULL, rfd, CURL_POLL_OUT, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when waiting for writes on readable fd");
+
+ /* Re-register for read events. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when waiting for reads again");
+
+ /* Stop listening. The multiplexer should be unset. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when readable fd is removed");
+
+ /* Listen again. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when readable fd is re-added");
+
+ /*
+ * Draining the pipe should unset the multiplexer again, once the old
+ * event is drained.
+ */
+ Assert(drain_pipe(rfd, 1));
+ Assert(drain_socket_events(actx));
+ mux_is_not_ready(actx->mux, "when fd is drained");
+
+ /* Undo any unidirectional emulation. */
+ if (bidirectional)
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+
+ /* Listen on the write side. An empty buffer should be writable. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when fd is writable");
+
+ /* As above, wait on read events instead. */
+ Assert(register_socket(NULL, wfd, CURL_POLL_IN, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when waiting for reads on writable fd");
+
+ /* Re-register for write events. */
+ Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when waiting for writes again");
+
+ {
+ ssize_t written;
+
+ /*
+ * Fill the pipe. Once the old writable event is drained, the mux
+ * should not be ready.
+ */
+ Assert((written = fill_pipe(wfd)) > 0);
+ printf("# pipe buffer is full at %zd bytes\n", written);
+
+ Assert(drain_socket_events(actx));
+ mux_is_not_ready(actx->mux, "when fd buffer is full");
+
+ /* Drain the pipe again. */
+ Assert(drain_pipe(rfd, written));
+ mux_is_ready(actx->mux, deadline, "when fd buffer is drained");
+ }
+
+ /* Stop listening. */
+ Assert(register_socket(NULL, wfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when fd is removed");
+
+ /* Make sure an expired timer doesn't interfere with event draining. */
+ {
+ /* Make the rfd appear unidirectional if necessary. */
+ if (bidirectional)
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+
+ /* Set the timer and wait for it to expire. */
+ Assert(set_timer(actx, 0));
+ Assert(PQsocketPoll(actx->timerfd, 1, 0, deadline) > 0);
+ is(timer_expired(actx), 1, "timer is expired");
+
+ /* Register for read events and make the fd readable. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ Assert(write(wfd, "x", 1) == 1);
+ mux_is_ready(actx->mux, deadline, "when fd is readable and timer expired");
+
+ /*
+ * Draining the pipe should unset the multiplexer again, once the
+ * old event is drained and the timer is reset.
+ *
+ * Order matters to avoid false negatives. First drain the socket,
+ * then unset the timer. We're trying to catch the case where the
+ * pending timer expiration event takes the place of one of the
+ * socket events we're attempting to drain.
+ */
+ Assert(drain_pipe(rfd, 1));
+ Assert(drain_socket_events(actx));
+ Assert(set_timer(actx, -1));
+
+ is(timer_expired(actx), 0, "timer is no longer expired");
+ mux_is_not_ready(actx->mux, "when fd is drained and timer reset");
+
+ /* Stop listening. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+
+ /* Undo any unidirectional emulation. */
+ if (bidirectional)
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+ }
+ }
+
+ close(rfd);
+ close(wfd);
+ free_test_actx(actx);
+}
+
+int
+main(int argc, char *argv[])
+{
+ const char *timeout;
+
+ /* Grab the default timeout. */
+ timeout = getenv("PG_TEST_TIMEOUT_DEFAULT");
+ if (timeout)
+ {
+ int timeout_s = atoi(timeout);
+
+ if (timeout_s > 0)
+ timeout_us = timeout_s * 1000 * 1000;
+ }
+
+ /*
+ * Set up line buffering for our output, to let stderr interleave in the
+ * log files.
+ */
+ setvbuf(stdout, NULL, PG_IOLBF, 0);
+
+ test_set_timer();
+ test_register_socket();
+
+ printf("1..%d\n", num_tests);
+ return 0;
+}
+
+#else /* !USE_ASSERT_CHECKING */
+
+/*
+ * Skip the test suite when we don't have assertions.
+ */
+int
+main(int argc, char *argv[])
+{
+ printf("1..0 # skip: cassert is not enabled\n");
+
+ return 0;
+}
+
+#endif /* USE_ASSERT_CHECKING */
--
2.34.1
Hi all,
On Thu, Jun 26, 2025 at 4:33 PM Jacob Champion
<jacob.champion@enterprisedb.com> wrote:
My plan, if this code seems reasonable, is to backport 0001-0003, but
keep the larger 0004 on HEAD only until it has proven to be stable.
It's a big new suite and I want to make sure it's not flapping on some
buildfarm animal. Eventually I'll backport that too.
Any thoughts on the approach? Too big/too scary/too BSD-specific?
A small bit of self-review: a comment I wrote in the tests suggested
that the choice of readable/writable events was up to the multiplexer
implementation, but it *must* choose readable, due to the hardcoded
use of PGRES_POLLING_READING throughout the current code. Updated in
v2.
Thanks,
--Jacob
Attachments:
since-v1.diff.txttext/plain; charset=US-ASCII; name=since-v1.diff.txtDownload
1: ddb7875bb58 = 1: 379c12b5d26 oauth: Remove stale events from the kqueue multiplexer
2: a871ce498ea = 2: f30317d7265 oauth: Remove expired timers from the multiplexer
3: 5033b6d51c1 = 3: d243d28964d oauth: Track total call count during a client flow
4: f4a640f7995 ! 4: ca6fd237653 oauth: Add unit tests for multiplexer handling
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+ *
+ * NB: our current multiplexer implementations (epoll/kqueue) are *readable*
+ * when the underlying libcurl sockets are *writable*. This behavior is pinned
-+ * here to record that expectation, but it's not a required part of the API. If
-+ * you've added a new implementation that doesn't have that behavior, feel free
-+ * to modify this test.
++ * here to record that expectation; PGRES_POLLING_READING is hardcoded
++ * throughout the flow and would need to be changed if a new multiplexer does
++ * something different.
+ */
+#define mux_is_ready(MUX, DEADLINE, TEST) \
+ do { \
v2-0001-oauth-Remove-stale-events-from-the-kqueue-multipl.patchapplication/octet-stream; name=v2-0001-oauth-Remove-stale-events-from-the-kqueue-multipl.patchDownload
From 379c12b5d26c93eaf104bed128f58a7a99b1e0b4 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Tue, 10 Jun 2025 16:38:59 -0700
Subject: [PATCH v2 1/4] oauth: Remove stale events from the kqueue multiplexer
If a socket is added to the kqueue, becomes readable/writable, and
subsequently becomes non-readable/writable again, the kqueue itself will
remain readable until either the socket registration is removed, or the
stale event is cleared via a call to kevent().
In many simple cases, Curl itself will remove the socket registration
quickly, but in real-world usage, this is not guaranteed to happen. The
kqueue can then remain stuck in a permanently readable state until the
request ends, which results in pointless wakeups for the client and
wasted CPU time.
Implement drain_socket_events() to call kevent() and unstick any stale
events. This is called right after drive_request(), before we return
control to the client to wait. To make sure we've taken a look at the
entire queue, register_socket() now tracks the number of outstanding
registrations.
Suggested-by: Thomas Munro <thomas.munro@gmail.com>
---
src/interfaces/libpq-oauth/oauth-curl.c | 218 ++++++++++++++++++------
1 file changed, 166 insertions(+), 52 deletions(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index dba9a684fa8..8430356cfb5 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -278,6 +278,10 @@ struct async_ctx
bool user_prompted; /* have we already sent the authz prompt? */
bool used_basic_auth; /* did we send a client secret? */
bool debugging; /* can we give unsafe developer assistance? */
+
+#if defined(HAVE_SYS_EVENT_H)
+ int nevents; /* how many events are we waiting on? */
+#endif
};
/*
@@ -1291,41 +1295,95 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
return 0;
#elif defined(HAVE_SYS_EVENT_H)
- struct kevent ev[2] = {0};
+ struct kevent ev[2];
struct kevent ev_out[2];
struct timespec timeout = {0};
- int nev = 0;
+ int nev;
int res;
+ /*
+ * First, any existing registrations for this socket need to be removed,
+ * both to track the outstanding number of events, and to ensure that
+ * we're not woken up for things that Curl no longer cares about.
+ *
+ * ENOENT is okay, but we have to track how many we get, so use
+ * EV_RECEIPT.
+ */
+ nev = 0;
+ EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+ nev++;
+ EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+ nev++;
+
+ Assert(nev <= lengthof(ev));
+ Assert(nev <= lengthof(ev_out));
+
+ res = kevent(actx->mux, ev, nev, ev_out, nev, &timeout);
+ if (res < 0)
+ {
+ actx_error(actx, "could not delete from kqueue: %m");
+ return -1;
+ }
+
+ /*
+ * We can't use the simple errno version of kevent, because we need to
+ * skip over ENOENT while still allowing a second change to be processed.
+ * So we need a longer-form error checking loop.
+ */
+ for (int i = 0; i < res; ++i)
+ {
+ /*
+ * EV_RECEIPT should guarantee one EV_ERROR result for every change,
+ * whether successful or not. Failed entries contain a non-zero errno
+ * in the data field.
+ */
+ Assert(ev_out[i].flags & EV_ERROR);
+
+ errno = ev_out[i].data;
+ if (!errno)
+ {
+ /* Successfully removed; update the event count. */
+ Assert(actx->nevents > 0);
+ actx->nevents--;
+ }
+ else if (errno != ENOENT)
+ {
+ actx_error(actx, "could not delete from kqueue: %m");
+ return -1;
+ }
+ }
+
+ /* If we're only removing registrations, we're done. */
+ if (what == CURL_POLL_REMOVE)
+ return 0;
+
+ /*
+ * Now add the new filters. This is more straightfoward than deletion.
+ *
+ * Combining this kevent() call with the one above seems like it should be
+ * theoretically possible, but beware that not all BSDs keep the original
+ * event flags when using EV_RECEIPT, so it's tricky to figure out which
+ * operations succeeded. For now we keep the deletions and the additions
+ * separate.
+ */
+ nev = 0;
+
switch (what)
{
case CURL_POLL_IN:
- EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, 0);
+ EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD, 0, 0, 0);
nev++;
break;
case CURL_POLL_OUT:
- EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD | EV_RECEIPT, 0, 0, 0);
+ EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD, 0, 0, 0);
nev++;
break;
case CURL_POLL_INOUT:
- EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, 0);
- nev++;
- EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD | EV_RECEIPT, 0, 0, 0);
- nev++;
- break;
-
- case CURL_POLL_REMOVE:
-
- /*
- * We don't know which of these is currently registered, perhaps
- * both, so we try to remove both. This means we need to tolerate
- * ENOENT below.
- */
- EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+ EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD, 0, 0, 0);
nev++;
- EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+ EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD, 0, 0, 0);
nev++;
break;
@@ -1334,45 +1392,91 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
return -1;
}
- res = kevent(actx->mux, ev, nev, ev_out, lengthof(ev_out), &timeout);
+ Assert(nev <= lengthof(ev));
+
+ res = kevent(actx->mux, ev, nev, NULL, 0, NULL);
if (res < 0)
{
actx_error(actx, "could not modify kqueue: %m");
return -1;
}
+ /* Update the event count, and we're done. */
+ actx->nevents += nev;
+
+ return 0;
+#else
+#error register_socket is not implemented on this platform
+#endif
+}
+
+/*-------
+ * Drains any stale level-triggered events out of the multiplexer. This is
+ * necessary only if the mux implementation requires it.
+ *
+ * As an example, consider the following sequence of events:
+ * 1. libcurl tries to write data to the send buffer, but it fills up.
+ * 2. libcurl registers CURL_POLL_OUT on the socket and returns control to the
+ * client to wait.
+ * 3. The kernel partially drains the send buffer. The socket becomes writable,
+ * and the client wakes up and calls back into the flow.
+ * 4. libcurl continues writing data to the send buffer, but it fills up again.
+ * The socket is no longer writable.
+ *
+ * At this point, an epoll-based mux no longer signals readiness, so nothing
+ * further needs to be done. But a kqueue-based mux will continue to signal
+ * "ready" until either the EVFILT_WRITE registration is dropped for the socket,
+ * or the old socket-writable event is read from the queue. Since Curl isn't
+ * guaranteed to do the former, we must do the latter here.
+ */
+static bool
+drain_socket_events(struct async_ctx *actx)
+{
+#if defined(HAVE_SYS_EPOLL_H)
+ /* The epoll implementation doesn't need to drain pending events. */
+ return true;
+#elif defined(HAVE_SYS_EVENT_H)
+ struct timespec timeout = {0};
+ struct kevent *drain;
+ int drain_len;
+
/*
- * We can't use the simple errno version of kevent, because we need to
- * skip over ENOENT while still allowing a second change to be processed.
- * So we need a longer-form error checking loop.
+ * Drain the events in one call, rather than looping. (We could maybe call
+ * kevent() drain_len times, instead of allocating space for the maximum
+ * number of events, but that relies on the events being in FIFO order to
+ * avoid starvation. The kqueue man pages don't seem to make any
+ * guarantees about that.)
+ *
+ * register_socket() keeps actx->nevents updated with the number of
+ * outstanding event filters. We don't track the registration of the
+ * timer; we just assume one could be registered here.
*/
- for (int i = 0; i < res; ++i)
+ drain_len = actx->nevents + 1;
+
+ drain = malloc(sizeof(*drain) * drain_len);
+ if (!drain)
{
- /*
- * EV_RECEIPT should guarantee one EV_ERROR result for every change,
- * whether successful or not. Failed entries contain a non-zero errno
- * in the data field.
- */
- Assert(ev_out[i].flags & EV_ERROR);
+ actx_error(actx, "out of memory");
+ return false;
+ }
- errno = ev_out[i].data;
- if (errno && errno != ENOENT)
- {
- switch (what)
- {
- case CURL_POLL_REMOVE:
- actx_error(actx, "could not delete from kqueue: %m");
- break;
- default:
- actx_error(actx, "could not add to kqueue: %m");
- }
- return -1;
- }
+ /*
+ * Discard all pending events. Since our registrations are level-triggered
+ * (even the timer, since we use a chained kqueue for that instead of an
+ * EVFILT_TIMER on the top-level mux!), any events that we still need will
+ * remain signalled, and the stale ones will be swept away.
+ */
+ if (kevent(actx->mux, NULL, 0, drain, drain_len, &timeout) < 0)
+ {
+ actx_error(actx, "could not drain kqueue: %m");
+ free(drain);
+ return false;
}
- return 0;
+ free(drain);
+ return true;
#else
-#error register_socket is not implemented on this platform
+#error drain_socket_events is not implemented on this platform
#endif
}
@@ -1441,7 +1545,8 @@ set_timer(struct async_ctx *actx, long timeout)
* macOS.)
*
* If there was no previous timer set, the kevent calls will result in
- * ENOENT, which is fine.
+ * ENOENT, which is fine. (We don't track actx->nevents for this case;
+ * instead, drain_socket_events() just assumes a timer could be set.)
*/
EV_SET(&ev, 1, EVFILT_TIMER, EV_DELETE, 0, 0, 0);
if (kevent(actx->timerfd, &ev, 1, NULL, 0, NULL) < 0 && errno != ENOENT)
@@ -2755,13 +2860,22 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
if (status == PGRES_POLLING_FAILED)
goto error_return;
- else if (status != PGRES_POLLING_OK)
- {
- /* not done yet */
- return status;
- }
+ else if (status == PGRES_POLLING_OK)
+ break; /* done! */
+
+ /*
+ * This request is still running.
+ *
+ * Drain any stale socket events from the mux before we
+ * ask the client to poll. (Currently, this can occur only
+ * with kqueue.) If this is forgotten, the multiplexer can
+ * get stuck in a signalled state and we'll burn CPU
+ * cycles pointlessly.
+ */
+ if (!drain_socket_events(actx))
+ goto error_return;
- break;
+ return status;
}
case OAUTH_STEP_WAIT_INTERVAL:
--
2.34.1
v2-0002-oauth-Remove-expired-timers-from-the-multiplexer.patchapplication/octet-stream; name=v2-0002-oauth-Remove-expired-timers-from-the-multiplexer.patchDownload
From f30317d7265bf463b0c0f6c3b92097e021761c95 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Wed, 5 Mar 2025 13:16:48 -0800
Subject: [PATCH v2 2/4] oauth: Remove expired timers from the multiplexer
In a case similar to the previous commit, an expired timer can remain
permanently readable if Curl does not remove the timeout itself. Since
that removal isn't guaranteed to happen in real-world situations,
implement drain_timer_events() to reset the timer before calling into
drive_request().
Moving to drain_timer_events() happens to fix a logic bug in the
previous caller of timer_expired(), which treated an error condition as
if the timer were expired instead of bailing out.
The previous implementation of timer_expired() gave differing results
for epoll and kqueue if the timer was reset. (For epoll, a reset timer
was considered to be expired, and for kqueue it was not.) This didn't
previously cause problems, since timer_expired() was only called while
the timer was known to be set, but both implementations now use the
kqueue logic.
---
src/interfaces/libpq-oauth/oauth-curl.c | 108 +++++++++++++++---------
1 file changed, 68 insertions(+), 40 deletions(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index 8430356cfb5..78ba3399495 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -1588,40 +1588,20 @@ set_timer(struct async_ctx *actx, long timeout)
/*
* Returns 1 if the timeout in the multiplexer set has expired since the last
- * call to set_timer(), 0 if the timer is still running, or -1 (with an
- * actx_error() report) if the timer cannot be queried.
+ * call to set_timer(), 0 if the timer is either still running or disarmed, or
+ * -1 (with an actx_error() report) if the timer cannot be queried.
*/
static int
timer_expired(struct async_ctx *actx)
{
-#if defined(HAVE_SYS_EPOLL_H)
- struct itimerspec spec = {0};
-
- if (timerfd_gettime(actx->timerfd, &spec) < 0)
- {
- actx_error(actx, "getting timerfd value: %m");
- return -1;
- }
-
- /*
- * This implementation assumes we're using single-shot timers. If you
- * change to using intervals, you'll need to reimplement this function
- * too, possibly with the read() or select() interfaces for timerfd.
- */
- Assert(spec.it_interval.tv_sec == 0
- && spec.it_interval.tv_nsec == 0);
-
- /* If the remaining time to expiration is zero, we're done. */
- return (spec.it_value.tv_sec == 0
- && spec.it_value.tv_nsec == 0);
-#elif defined(HAVE_SYS_EVENT_H)
+#if defined(HAVE_SYS_EPOLL_H) || defined(HAVE_SYS_EVENT_H)
int res;
- /* Is the timer queue ready? */
+ /* Is the timer ready? */
res = PQsocketPoll(actx->timerfd, 1 /* forRead */ , 0, 0);
if (res < 0)
{
- actx_error(actx, "checking kqueue for timeout: %m");
+ actx_error(actx, "checking timer expiration: %m");
return -1;
}
@@ -1653,6 +1633,36 @@ register_timer(CURLM *curlm, long timeout, void *ctx)
return 0;
}
+/*
+ * Removes any expired-timer event from the multiplexer. If was_expired is not
+ * NULL, it will contain whether or not the timer was expired at time of call.
+ */
+static bool
+drain_timer_events(struct async_ctx *actx, bool *was_expired)
+{
+ int res;
+
+ res = timer_expired(actx);
+ if (res < 0)
+ return false;
+
+ if (res > 0)
+ {
+ /*
+ * Timer is expired. We could drain the event manually from the
+ * timerfd, but it's easier to simply disable it; that keeps the
+ * platform-specific code in set_timer().
+ */
+ if (!set_timer(actx, -1))
+ return false;
+ }
+
+ if (was_expired)
+ *was_expired = (res > 0);
+
+ return true;
+}
+
/*
* Prints Curl request debugging information to stderr.
*
@@ -2856,6 +2866,22 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
{
PostgresPollingStatusType status;
+ /*
+ * Clear any expired timeout before calling back into
+ * Curl. Curl is not guaranteed to do this for us, because
+ * its API expects us to use single-shot (i.e.
+ * edge-triggered) timeouts, and ours are level-triggered
+ * via the mux.
+ *
+ * This can't be combined with the drain_socket_events()
+ * call below: we might accidentally clear a short timeout
+ * that was both set and expired during the call to
+ * drive_request().
+ */
+ if (!drain_timer_events(actx, NULL))
+ goto error_return;
+
+ /* Move the request forward. */
status = drive_request(actx);
if (status == PGRES_POLLING_FAILED)
@@ -2879,24 +2905,26 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
}
case OAUTH_STEP_WAIT_INTERVAL:
-
- /*
- * The client application is supposed to wait until our timer
- * expires before calling PQconnectPoll() again, but that
- * might not happen. To avoid sending a token request early,
- * check the timer before continuing.
- */
- if (!timer_expired(actx))
{
- set_conn_altsock(conn, actx->timerfd);
- return PGRES_POLLING_READING;
- }
+ bool expired;
- /* Disable the expired timer. */
- if (!set_timer(actx, -1))
- goto error_return;
+ /*
+ * The client application is supposed to wait until our
+ * timer expires before calling PQconnectPoll() again, but
+ * that might not happen. To avoid sending a token request
+ * early, check the timer before continuing.
+ */
+ if (!drain_timer_events(actx, &expired))
+ goto error_return;
- break;
+ if (!expired)
+ {
+ set_conn_altsock(conn, actx->timerfd);
+ return PGRES_POLLING_READING;
+ }
+
+ break;
+ }
}
/*
--
2.34.1
v2-0003-oauth-Track-total-call-count-during-a-client-flow.patchapplication/octet-stream; name=v2-0003-oauth-Track-total-call-count-during-a-client-flow.patchDownload
From d243d28964dd7afb28abfaf470dc7612e7235cb9 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Fri, 6 Jun 2025 15:22:41 -0700
Subject: [PATCH v2 3/4] oauth: Track total call count during a client flow
Tracking down the bugs that led to the addition of drain_socket_events()
and drain_timer_events() was difficult, because an inefficient flow is
not visibly different from one that is working properly. To help
maintainers notice when something has gone wrong, track the number of
calls into the flow as part of debug mode, and print the total when the
flow finishes.
A new test makes sure the total count is less than 100. (We expect
something on the order of 10.) This isn't foolproof, but it is able to
catch several regressions in the logic of the prior two commits, and
future work to add TLS support to the oauth_validator test server should
strengthen it as well.
---
src/interfaces/libpq-oauth/oauth-curl.c | 22 +++++++++++++
.../modules/oauth_validator/t/001_server.pl | 31 ++++++++++++++++++-
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index 78ba3399495..68303106a5d 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -278,6 +278,7 @@ struct async_ctx
bool user_prompted; /* have we already sent the authz prompt? */
bool used_basic_auth; /* did we send a client secret? */
bool debugging; /* can we give unsafe developer assistance? */
+ int dbg_num_calls; /* (debug mode) how many times were we called? */
#if defined(HAVE_SYS_EVENT_H)
int nevents; /* how many events are we waiting on? */
@@ -3074,6 +3075,8 @@ PostgresPollingStatusType
pg_fe_run_oauth_flow(PGconn *conn)
{
PostgresPollingStatusType result;
+ fe_oauth_state *state = conn_sasl_state(conn);
+ struct async_ctx *actx;
#ifndef WIN32
sigset_t osigset;
bool sigpipe_pending;
@@ -3102,6 +3105,25 @@ pg_fe_run_oauth_flow(PGconn *conn)
result = pg_fe_run_oauth_flow_impl(conn);
+ /*
+ * To assist with finding bugs in drain_socket_events() and
+ * drain_timer_events(), when we're in debug mode, track the total number
+ * of calls to this function and print that at the end of the flow.
+ *
+ * Be careful that state->async_ctx could be NULL if early initialization
+ * fails during the first call.
+ */
+ actx = state->async_ctx;
+ Assert(actx || result == PGRES_POLLING_FAILED);
+
+ if (actx && actx->debugging)
+ {
+ actx->dbg_num_calls++;
+ if (result == PGRES_POLLING_OK || result == PGRES_POLLING_FAILED)
+ fprintf(stderr, "[libpq] total number of polls: %d\n",
+ actx->dbg_num_calls);
+ }
+
#ifndef WIN32
if (masked)
{
diff --git a/src/test/modules/oauth_validator/t/001_server.pl b/src/test/modules/oauth_validator/t/001_server.pl
index 41672ebd5c6..c0dafb8be76 100644
--- a/src/test/modules/oauth_validator/t/001_server.pl
+++ b/src/test/modules/oauth_validator/t/001_server.pl
@@ -418,6 +418,35 @@ $node->connect_fails(
qr/failed to obtain access token: mutual TLS required for client \(invalid_client\)/
);
+# Count the number of calls to the internal flow when multiple retries are
+# triggered. The exact number depends on many things -- the TCP stack, the
+# version of Curl in use, random chance -- but a ridiculously high number
+# suggests something is wrong with our ability to clear multiplexer events after
+# they're no longer applicable.
+my ($ret, $stdout, $stderr) = $node->psql(
+ 'postgres',
+ "SELECT 'connected for call count'",
+ extra_params => ['-w'],
+ connstr => connstr(stage => 'token', retries => 2),
+ on_error_stop => 0);
+
+is($ret, 0, "call count connection succeeds");
+like(
+ $stderr,
+ qr@Visit https://example\.com/ and enter the code: postgresuser@,
+ "call count: stderr matches");
+
+my $count_pattern = qr/\[libpq\] total number of polls: (\d+)/;
+if (like($stderr, $count_pattern, "call count: count is printed"))
+{
+ # For reference, a typical flow with two retries might take between 5-15
+ # calls to the client implementation. And while this will probably continue
+ # to change across OSes and Curl updates, we're likely in trouble if we see
+ # hundreds or thousands of calls.
+ $stderr =~ $count_pattern;
+ cmp_ok($1, '<', 100, "call count is reasonably small");
+}
+
# Stress test: make sure our builtin flow operates correctly even if the client
# application isn't respecting PGRES_POLLING_READING/WRITING signals returned
# from PQconnectPoll().
@@ -428,7 +457,7 @@ my @cmd = (
connstr(stage => 'all', retries => 1, interval => 1));
note "running '" . join("' '", @cmd) . "'";
-my ($stdout, $stderr) = run_command(\@cmd);
+($stdout, $stderr) = run_command(\@cmd);
like($stdout, qr/connection succeeded/, "stress-async: stdout matches");
unlike(
--
2.34.1
v2-0004-oauth-Add-unit-tests-for-multiplexer-handling.patchapplication/octet-stream; name=v2-0004-oauth-Add-unit-tests-for-multiplexer-handling.patchDownload
From ca6fd237653d8de9038b8f81a2c08c882a7e5f51 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Wed, 5 Mar 2025 15:04:34 -0800
Subject: [PATCH v2 4/4] oauth: Add unit tests for multiplexer handling
To better record the internal behaviors of oauth-curl.c, add a unit test
suite for the socket and timer handling code. This is all based on TAP
and driven by our existing Test::More infrastructure.
---
src/interfaces/libpq-oauth/Makefile | 14 +
src/interfaces/libpq-oauth/meson.build | 35 ++
src/interfaces/libpq-oauth/t/001_oauth.pl | 24 +
src/interfaces/libpq-oauth/test-oauth-curl.c | 474 +++++++++++++++++++
4 files changed, 547 insertions(+)
create mode 100644 src/interfaces/libpq-oauth/t/001_oauth.pl
create mode 100644 src/interfaces/libpq-oauth/test-oauth-curl.c
diff --git a/src/interfaces/libpq-oauth/Makefile b/src/interfaces/libpq-oauth/Makefile
index 270fc0cf2d9..9da8e4b7143 100644
--- a/src/interfaces/libpq-oauth/Makefile
+++ b/src/interfaces/libpq-oauth/Makefile
@@ -79,5 +79,19 @@ uninstall:
rm -f '$(DESTDIR)$(libdir)/$(stlib)'
rm -f '$(DESTDIR)$(libdir)/$(shlib)'
+.PHONY: all-tests
+all-tests: oauth_tests$(X)
+
+oauth_tests$(X): test-oauth-curl.o oauth-utils.o $(WIN32RES) | submake-libpgport submake-libpq
+ $(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(SHLIB_LINK) -o $@
+
+check: all-tests
+ $(prove_check)
+
+installcheck: all-tests
+ $(prove_installcheck)
+
clean distclean: clean-lib
rm -f $(OBJS) $(OBJS_STATIC) $(OBJS_SHLIB)
+ rm -f test-oauth-curl.o oauth_tests$(X)
+ rm -rf tmp_check
diff --git a/src/interfaces/libpq-oauth/meson.build b/src/interfaces/libpq-oauth/meson.build
index df064c59a40..505e1671b86 100644
--- a/src/interfaces/libpq-oauth/meson.build
+++ b/src/interfaces/libpq-oauth/meson.build
@@ -47,3 +47,38 @@ libpq_oauth_so = shared_module(libpq_oauth_name,
link_args: export_fmt.format(export_file.full_path()),
kwargs: default_lib_args,
)
+
+libpq_oauth_test_deps = []
+
+oauth_test_sources = files('test-oauth-curl.c') + libpq_oauth_so_sources
+
+if host_system == 'windows'
+ oauth_test_sources += rc_bin_gen.process(win32ver_rc, extra_args: [
+ '--NAME', 'oauth_tests',
+ '--FILEDESC', 'OAuth unit test program',])
+endif
+
+libpq_oauth_test_deps += executable('oauth_tests',
+ oauth_test_sources,
+ dependencies: [frontend_shlib_code, libpq, libpq_oauth_deps],
+ kwargs: default_bin_args + {
+ 'c_args': default_bin_args.get('c_args', []) + libpq_oauth_so_c_args,
+ 'c_pch': pch_postgres_fe_h,
+ 'include_directories': [libpq_inc, postgres_inc],
+ 'install': false,
+ }
+)
+
+testprep_targets += libpq_oauth_test_deps
+
+tests += {
+ 'name': 'libpq-oauth',
+ 'sd': meson.current_source_dir(),
+ 'bd': meson.current_build_dir(),
+ 'tap': {
+ 'tests': [
+ 't/001_oauth.pl',
+ ],
+ 'deps': libpq_oauth_test_deps,
+ },
+}
diff --git a/src/interfaces/libpq-oauth/t/001_oauth.pl b/src/interfaces/libpq-oauth/t/001_oauth.pl
new file mode 100644
index 00000000000..e769856c2c9
--- /dev/null
+++ b/src/interfaces/libpq-oauth/t/001_oauth.pl
@@ -0,0 +1,24 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Defer entirely to the oauth_tests executable. stdout/err is routed through
+# Test::More so that our logging infrastructure can handle it correctly. Using
+# IPC::Run::new_chunker seems to help interleave the two streams a little better
+# than without.
+#
+# TODO: prove can also deal with native executables itself, which we could
+# probably make use of via PROVE_TESTS on the Makefile side. But the Meson setup
+# calls Perl directly, which would require more code to work around... and
+# there's still the matter of logging.
+my $builder = Test::More->builder;
+my $out = $builder->output;
+my $err = $builder->failure_output;
+
+IPC::Run::run ['oauth_tests'],
+ '>', IPC::Run::new_chunker, sub { print {$out} $_[0] },
+ '2>', IPC::Run::new_chunker, sub { print {$err} $_[0] }
+ or die "oauth_tests returned $?";
diff --git a/src/interfaces/libpq-oauth/test-oauth-curl.c b/src/interfaces/libpq-oauth/test-oauth-curl.c
new file mode 100644
index 00000000000..1a03b0fc552
--- /dev/null
+++ b/src/interfaces/libpq-oauth/test-oauth-curl.c
@@ -0,0 +1,474 @@
+/*
+ * test-oauth-curl.c
+ *
+ * A unit test driver for libpq-oauth. This #includes oauth-curl.c, which lets
+ * the tests reference static functions and other internals.
+ *
+ * USE_ASSERT_CHECKING is required, to make it easy for tests to wrap
+ * must-succeed code as part of test setup.
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ */
+
+#include "oauth-curl.c"
+
+#include <fcntl.h>
+
+#ifdef USE_ASSERT_CHECKING
+
+/*
+ * TAP Helpers
+ */
+
+static int num_tests = 0;
+
+/*
+ * Reports ok/not ok to the TAP stream on stdout.
+ */
+#define ok(OK, TEST) \
+ ok_impl(OK, TEST, #OK, __FILE__, __LINE__)
+
+static bool
+ok_impl(bool ok, const char *test, const char *teststr, const char *file, int line)
+{
+ printf("%sok %d - %s\n", ok ? "" : "not ", ++num_tests, test);
+
+ if (!ok)
+ {
+ printf("# at %s:%d:\n", file, line);
+ printf("# expression is false: %s\n", teststr);
+ }
+
+ return ok;
+}
+
+/*
+ * Like ok(this == that), but with more diagnostics on failure.
+ *
+ * Only works on ints, but luckily that's all we need here. Note that the much
+ * simpler-looking macro implementation
+ *
+ * is_diag(ok(THIS == THAT, TEST), THIS, #THIS, THAT, #THAT)
+ *
+ * suffers from multiple evaluation of the macro arguments...
+ */
+#define is(THIS, THAT, TEST) \
+ do { \
+ int this_ = (THIS), \
+ that_ = (THAT); \
+ is_diag( \
+ ok_impl(this_ == that_, TEST, #THIS " == " #THAT, __FILE__, __LINE__), \
+ this_, #THIS, that_, #THAT \
+ ); \
+ } while (0)
+
+static bool
+is_diag(bool ok, int this, const char *thisstr, int that, const char *thatstr)
+{
+ if (!ok)
+ printf("# %s = %d; %s = %d\n", thisstr, this, thatstr, that);
+
+ return ok;
+}
+
+/*
+ * Utilities
+ */
+
+/*
+ * Creates a partially-initialized async_ctx for the purposes of testing. Free
+ * with free_test_actx().
+ */
+static struct async_ctx *
+init_test_actx(void)
+{
+ struct async_ctx *actx;
+
+ actx = calloc(1, sizeof(*actx));
+ Assert(actx);
+
+ actx->mux = PGINVALID_SOCKET;
+ actx->timerfd = -1;
+ actx->debugging = true;
+
+ initPQExpBuffer(&actx->errbuf);
+
+ Assert(setup_multiplexer(actx));
+
+ return actx;
+}
+
+static void
+free_test_actx(struct async_ctx *actx)
+{
+ termPQExpBuffer(&actx->errbuf);
+
+ if (actx->mux != PGINVALID_SOCKET)
+ close(actx->mux);
+ if (actx->timerfd >= 0)
+ close(actx->timerfd);
+
+ free(actx);
+}
+
+static char dummy_buf[4 * 1024]; /* for fill_pipe/drain_pipe */
+
+/*
+ * Writes to the write side of a pipe until it won't take any more data. Returns
+ * the amount written.
+ */
+static ssize_t
+fill_pipe(int fd)
+{
+ int mode;
+ ssize_t written = 0;
+
+ /* Don't block. */
+ Assert((mode = fcntl(fd, F_GETFL)) != -1);
+ Assert(fcntl(fd, F_SETFL, mode | O_NONBLOCK) == 0);
+
+ while (true)
+ {
+ ssize_t w;
+
+ w = write(fd, dummy_buf, sizeof(dummy_buf));
+ if (w < 0)
+ {
+ if (errno != EAGAIN && errno != EWOULDBLOCK)
+ {
+ perror("write to pipe");
+ written = -1;
+ }
+ break;
+ }
+
+ written += w;
+ }
+
+ /* Reset the descriptor flags. */
+ Assert(fcntl(fd, F_SETFD, mode) == 0);
+
+ return written;
+}
+
+/*
+ * Drains the requested amount of data from the read side of a pipe.
+ */
+static bool
+drain_pipe(int fd, ssize_t n)
+{
+ Assert(n > 0);
+
+ while (n)
+ {
+ size_t to_read = (n <= sizeof(dummy_buf)) ? n : sizeof(dummy_buf);
+ ssize_t drained;
+
+ drained = read(fd, dummy_buf, to_read);
+ if (drained < 0)
+ {
+ perror("read from pipe");
+ return false;
+ }
+
+ n -= drained;
+ }
+
+ return true;
+}
+
+/*
+ * Tests whether the multiplexer is marked ready by the deadline. This is a
+ * macro so that file/line information makes sense during failures.
+ *
+ * NB: our current multiplexer implementations (epoll/kqueue) are *readable*
+ * when the underlying libcurl sockets are *writable*. This behavior is pinned
+ * here to record that expectation; PGRES_POLLING_READING is hardcoded
+ * throughout the flow and would need to be changed if a new multiplexer does
+ * something different.
+ */
+#define mux_is_ready(MUX, DEADLINE, TEST) \
+ do { \
+ int res_ = PQsocketPoll(MUX, 1, 0, DEADLINE); \
+ Assert(res_ != -1); \
+ ok(res_ > 0, "multiplexer is ready " TEST); \
+ } while (0)
+
+/*
+ * The opposite of mux_is_ready().
+ */
+#define mux_is_not_ready(MUX, TEST) \
+ do { \
+ int res_ = PQsocketPoll(MUX, 1, 0, 0); \
+ Assert(res_ != -1); \
+ is(res_, 0, "multiplexer is not ready " TEST); \
+ } while (0)
+
+/*
+ * Test Suites
+ */
+
+/* Per-suite timeout. Set via the PG_TEST_TIMEOUT_DEFAULT envvar. */
+static pg_usec_time_t timeout_us = 180 * 1000 * 1000;
+
+static void
+test_set_timer(void)
+{
+ struct async_ctx *actx = init_test_actx();
+ const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
+
+ printf("# test_set_timer\n");
+
+ /* A zero-duration timer should result in a near-immediate ready signal. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer expires");
+ is(timer_expired(actx), 1, "timer_expired() returns 1 when timer expires");
+
+ /* Resetting the timer far in the future should unset the ready signal. */
+ Assert(set_timer(actx, INT_MAX));
+ mux_is_not_ready(actx->mux, "when timer is reset to the future");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 with unexpired timer");
+
+ /* Setting another zero-duration timer should override the previous one. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer is re-expired");
+ is(timer_expired(actx), 1, "timer_expired() returns 1 when timer is re-expired");
+
+ /* And disabling that timer should once again unset the ready signal. */
+ Assert(set_timer(actx, -1));
+ mux_is_not_ready(actx->mux, "when timer is unset");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 when timer is unset");
+
+ {
+ bool expired;
+
+ /* Make sure drain_timer_events() functions correctly as well. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer is re-expired (drain_timer_events)");
+
+ Assert(drain_timer_events(actx, &expired));
+ mux_is_not_ready(actx->mux, "when timer is drained after expiring");
+ is(expired, 1, "drain_timer_events() reports expiration");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 after timer is drained");
+
+ /* A second drain should do nothing. */
+ Assert(drain_timer_events(actx, &expired));
+ mux_is_not_ready(actx->mux, "when timer is drained a second time");
+ is(expired, 0, "drain_timer_events() reports no expiration");
+ is(timer_expired(actx), 0, "timer_expired() still returns 0");
+ }
+
+ free_test_actx(actx);
+}
+
+static void
+test_register_socket(void)
+{
+ struct async_ctx *actx = init_test_actx();
+ int pipefd[2];
+ int rfd,
+ wfd;
+ bool bidirectional;
+
+ /* Create a local pipe for communication. */
+ Assert(pipe(pipefd) == 0);
+ rfd = pipefd[0];
+ wfd = pipefd[1];
+
+ /*
+ * Some platforms (FreeBSD) implement bidirectional pipes, affecting the
+ * behavior of some of these tests. Store that knowledge for later.
+ */
+ bidirectional = PQsocketPoll(rfd /* read */ , 0, 1 /* write */ , 0) > 0;
+
+ /*
+ * This suite runs twice -- once using CURL_POLL_IN/CURL_POLL_OUT for
+ * read/write operations, respectively, and once using CURL_POLL_INOUT for
+ * both sides.
+ */
+ for (int inout = 0; inout < 2; inout++)
+ {
+ const int in_event = inout ? CURL_POLL_INOUT : CURL_POLL_IN;
+ const int out_event = inout ? CURL_POLL_INOUT : CURL_POLL_OUT;
+ const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
+ size_t bidi_pipe_size = 0; /* silence compiler warnings */
+
+ printf("# test_register_socket %s\n", inout ? "(INOUT)" : "");
+
+ /*
+ * At the start of the test, the read side should be blocked and the
+ * write side should be open. (There's a mistake at the end of this
+ * loop otherwise.)
+ */
+ Assert(PQsocketPoll(rfd, 1, 0, 0) == 0);
+ Assert(PQsocketPoll(wfd, 0, 1, 0) > 0);
+
+ /*
+ * For bidirectional systems, emulate unidirectional behavior here by
+ * filling up the "read side" of the pipe.
+ */
+ if (bidirectional)
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+
+ /* Listen on the read side. The multiplexer shouldn't be ready yet. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when fd is not readable");
+
+ /* Writing to the pipe should result in a read-ready multiplexer. */
+ Assert(write(wfd, "x", 1) == 1);
+ mux_is_ready(actx->mux, deadline, "when fd is readable");
+
+ /*
+ * Update the registration to wait on write events instead. The
+ * multiplexer should be unset.
+ */
+ Assert(register_socket(NULL, rfd, CURL_POLL_OUT, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when waiting for writes on readable fd");
+
+ /* Re-register for read events. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when waiting for reads again");
+
+ /* Stop listening. The multiplexer should be unset. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when readable fd is removed");
+
+ /* Listen again. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when readable fd is re-added");
+
+ /*
+ * Draining the pipe should unset the multiplexer again, once the old
+ * event is drained.
+ */
+ Assert(drain_pipe(rfd, 1));
+ Assert(drain_socket_events(actx));
+ mux_is_not_ready(actx->mux, "when fd is drained");
+
+ /* Undo any unidirectional emulation. */
+ if (bidirectional)
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+
+ /* Listen on the write side. An empty buffer should be writable. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when fd is writable");
+
+ /* As above, wait on read events instead. */
+ Assert(register_socket(NULL, wfd, CURL_POLL_IN, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when waiting for reads on writable fd");
+
+ /* Re-register for write events. */
+ Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when waiting for writes again");
+
+ {
+ ssize_t written;
+
+ /*
+ * Fill the pipe. Once the old writable event is drained, the mux
+ * should not be ready.
+ */
+ Assert((written = fill_pipe(wfd)) > 0);
+ printf("# pipe buffer is full at %zd bytes\n", written);
+
+ Assert(drain_socket_events(actx));
+ mux_is_not_ready(actx->mux, "when fd buffer is full");
+
+ /* Drain the pipe again. */
+ Assert(drain_pipe(rfd, written));
+ mux_is_ready(actx->mux, deadline, "when fd buffer is drained");
+ }
+
+ /* Stop listening. */
+ Assert(register_socket(NULL, wfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when fd is removed");
+
+ /* Make sure an expired timer doesn't interfere with event draining. */
+ {
+ /* Make the rfd appear unidirectional if necessary. */
+ if (bidirectional)
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+
+ /* Set the timer and wait for it to expire. */
+ Assert(set_timer(actx, 0));
+ Assert(PQsocketPoll(actx->timerfd, 1, 0, deadline) > 0);
+ is(timer_expired(actx), 1, "timer is expired");
+
+ /* Register for read events and make the fd readable. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ Assert(write(wfd, "x", 1) == 1);
+ mux_is_ready(actx->mux, deadline, "when fd is readable and timer expired");
+
+ /*
+ * Draining the pipe should unset the multiplexer again, once the
+ * old event is drained and the timer is reset.
+ *
+ * Order matters to avoid false negatives. First drain the socket,
+ * then unset the timer. We're trying to catch the case where the
+ * pending timer expiration event takes the place of one of the
+ * socket events we're attempting to drain.
+ */
+ Assert(drain_pipe(rfd, 1));
+ Assert(drain_socket_events(actx));
+ Assert(set_timer(actx, -1));
+
+ is(timer_expired(actx), 0, "timer is no longer expired");
+ mux_is_not_ready(actx->mux, "when fd is drained and timer reset");
+
+ /* Stop listening. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+
+ /* Undo any unidirectional emulation. */
+ if (bidirectional)
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+ }
+ }
+
+ close(rfd);
+ close(wfd);
+ free_test_actx(actx);
+}
+
+int
+main(int argc, char *argv[])
+{
+ const char *timeout;
+
+ /* Grab the default timeout. */
+ timeout = getenv("PG_TEST_TIMEOUT_DEFAULT");
+ if (timeout)
+ {
+ int timeout_s = atoi(timeout);
+
+ if (timeout_s > 0)
+ timeout_us = timeout_s * 1000 * 1000;
+ }
+
+ /*
+ * Set up line buffering for our output, to let stderr interleave in the
+ * log files.
+ */
+ setvbuf(stdout, NULL, PG_IOLBF, 0);
+
+ test_set_timer();
+ test_register_socket();
+
+ printf("1..%d\n", num_tests);
+ return 0;
+}
+
+#else /* !USE_ASSERT_CHECKING */
+
+/*
+ * Skip the test suite when we don't have assertions.
+ */
+int
+main(int argc, char *argv[])
+{
+ printf("1..0 # skip: cassert is not enabled\n");
+
+ return 0;
+}
+
+#endif /* USE_ASSERT_CHECKING */
--
2.34.1
On Tue, Jul 29, 2025 at 8:52 AM Jacob Champion
<jacob.champion@enterprisedb.com> wrote:
On Thu, Jun 26, 2025 at 4:33 PM Jacob Champion
<jacob.champion@enterprisedb.com> wrote:My plan, if this code seems reasonable, is to backport 0001-0003, but
keep the larger 0004 on HEAD only until it has proven to be stable.
It's a big new suite and I want to make sure it's not flapping on some
buildfarm animal. Eventually I'll backport that too.Any thoughts on the approach? Too big/too scary/too BSD-specific?
A small bit of self-review: a comment I wrote in the tests suggested
that the choice of readable/writable events was up to the multiplexer
implementation, but it *must* choose readable, due to the hardcoded
use of PGRES_POLLING_READING throughout the current code. Updated in
v2.
[FYI, I'm looking into this and planning to post a review in 1-2 days...]
On Mon, Aug 4, 2025 at 7:53 AM Thomas Munro <thomas.munro@gmail.com> wrote:
[FYI, I'm looking into this and planning to post a review in 1-2 days...]
Thanks so much!
--Jacob
On Tue, Aug 5, 2025 at 3:24 AM Jacob Champion
<jacob.champion@enterprisedb.com> wrote:
On Mon, Aug 4, 2025 at 7:53 AM Thomas Munro <thomas.munro@gmail.com> wrote:
[FYI, I'm looking into this and planning to post a review in 1-2 days...]
0001:
So, the problem is that poll(kqueue_fd) reports POLLIN if any events
are queued, but level-triggered events are only rechecked and possibly
cancelled if you actually call kevent(). Hmm, what if you just called
kevent() with an output array of size one?
* If it returns 0, that means it has rechecked all queued
level-triggered events and booted them all out because they are no
longer true. poll(fd) won't report POLLIN until one of them is queued
again.
* If it returns 1, then it stopped on the first level-triggered event
that it rechecked and found to be still true. Who cares if there are
more that didn't get rechecked? poll(fd) will report POLLIN either
way, and that's what you want.
On Wed, Aug 6, 2025 at 8:04 AM Thomas Munro <thomas.munro@gmail.com> wrote:
* If it returns 1, then it stopped on the first level-triggered event
that it rechecked and found to be still true. Who cares if there are
more that didn't get rechecked? poll(fd) will report POLLIN either
way, and that's what you want.
I think the weaker guarantee might be sufficient. I was trying to get
a stronger primitive in place so that we wouldn't have to worry about
it down the line, but it is a lot of code to pay...
One sharp edge of that strategy is caught by the new tests, which is
that if you call drain_socket_events() and then unset the timer, your
multiplexer is still readable until you call drain_socket_events() yet
again. At the moment, our code only ever calls those two in the
opposite order (due to the race condition pointed out in 0002); we'd
just have to keep that in mind. Maybe "drain" would no longer be the
verb to use there.
--Jacob
On Wed, Aug 6, 2025 at 9:13 AM Jacob Champion
<jacob.champion@enterprisedb.com> wrote:
Maybe "drain" would no longer be the
verb to use there.
I keep describing this as "combing" the queue when I talk about it in
person, so v3-0001 renames this new operation to comb_multiplexer().
And the CI (plus the more strenuous TLS tests) confirms that the
callback count is still stable with this weaker guarantee, so I've
gotten rid of the event-counting code.
Now that I'm no longer counting events, I can collapse the changes to
register_socket(). I can't revert those changes entirely, because then
we regress the case where Curl switches a socket from IN to OUT (this
is enforced by the new unit tests). But I'm not sure that the existing
comment adequately explained that fix anyway, and I didn't remember to
call it out in my initial email, so I've split it out into v3-0002.
It's much smaller.
The tests (now in 0005) have been adjusted for the new "combing"
behavior, and I've added a case to ensure that multiple stale events
are swept up by a single call to comb_multiplexer().
Thanks!
--Jacob
Attachments:
since-v2.diff.txttext/plain; charset=US-ASCII; name=since-v2.diff.txtDownload
1: 379c12b5d26 < -: ----------- oauth: Remove stale events from the kqueue multiplexer
-: ----------- > 1: c5cdccfe374 oauth: Remove stale events from the kqueue multiplexer
-: ----------- > 2: 7725e0c173b oauth: Ensure unused socket registrations are removed
2: f30317d7265 ! 3: 6ccf7a5d156 oauth: Remove expired timers from the multiplexer
@@ src/interfaces/libpq-oauth/oauth-curl.c: pg_fe_run_oauth_flow_impl(PGconn *conn)
+ * edge-triggered) timeouts, and ours are level-triggered
+ * via the mux.
+ *
-+ * This can't be combined with the drain_socket_events()
-+ * call below: we might accidentally clear a short timeout
-+ * that was both set and expired during the call to
++ * This can't be combined with the comb_multiplexer() call
++ * below: we might accidentally clear a short timeout that
++ * was both set and expired during the call to
+ * drive_request().
+ */
+ if (!drain_timer_events(actx, NULL))
3: d243d28964d ! 4: 2be993b8f07 oauth: Track total call count during a client flow
@@ Metadata
## Commit message ##
oauth: Track total call count during a client flow
- Tracking down the bugs that led to the addition of drain_socket_events()
+ Tracking down the bugs that led to the addition of comb_multiplexer()
and drain_timer_events() was difficult, because an inefficient flow is
not visibly different from one that is working properly. To help
maintainers notice when something has gone wrong, track the number of
@@ src/interfaces/libpq-oauth/oauth-curl.c: struct async_ctx
bool used_basic_auth; /* did we send a client secret? */
bool debugging; /* can we give unsafe developer assistance? */
+ int dbg_num_calls; /* (debug mode) how many times were we called? */
+ };
- #if defined(HAVE_SYS_EVENT_H)
- int nevents; /* how many events are we waiting on? */
+ /*
@@ src/interfaces/libpq-oauth/oauth-curl.c: PostgresPollingStatusType
pg_fe_run_oauth_flow(PGconn *conn)
{
@@ src/interfaces/libpq-oauth/oauth-curl.c: pg_fe_run_oauth_flow(PGconn *conn)
result = pg_fe_run_oauth_flow_impl(conn);
+ /*
-+ * To assist with finding bugs in drain_socket_events() and
++ * To assist with finding bugs in comb_multiplexer() and
+ * drain_timer_events(), when we're in debug mode, track the total number
+ * of calls to this function and print that at the end of the flow.
+ *
4: ca6fd237653 ! 5: 50257bf32eb oauth: Add unit tests for multiplexer handling
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+
+ /*
+ * Draining the pipe should unset the multiplexer again, once the old
-+ * event is drained.
++ * event is cleared.
+ */
+ Assert(drain_pipe(rfd, 1));
-+ Assert(drain_socket_events(actx));
++ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when fd is drained");
+
+ /* Undo any unidirectional emulation. */
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+ ssize_t written;
+
+ /*
-+ * Fill the pipe. Once the old writable event is drained, the mux
++ * Fill the pipe. Once the old writable event is cleared, the mux
+ * should not be ready.
+ */
+ Assert((written = fill_pipe(wfd)) > 0);
+ printf("# pipe buffer is full at %zd bytes\n", written);
+
-+ Assert(drain_socket_events(actx));
++ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when fd buffer is full");
+
+ /* Drain the pipe again. */
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+
+ /* Make sure an expired timer doesn't interfere with event draining. */
+ {
++ bool expired;
++
+ /* Make the rfd appear unidirectional if necessary. */
+ if (bidirectional)
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+ * Draining the pipe should unset the multiplexer again, once the
+ * old event is drained and the timer is reset.
+ *
-+ * Order matters to avoid false negatives. First drain the socket,
-+ * then unset the timer. We're trying to catch the case where the
-+ * pending timer expiration event takes the place of one of the
-+ * socket events we're attempting to drain.
++ * Order matters, since comb_multiplexer() doesn't have to remove
++ * stale events when active events exist. Follow the call sequence
++ * used in the code: drain the timer expiration, drain the pipe,
++ * then clear the stale events.
+ */
++ Assert(drain_timer_events(actx, &expired));
+ Assert(drain_pipe(rfd, 1));
-+ Assert(drain_socket_events(actx));
-+ Assert(set_timer(actx, -1));
++ Assert(comb_multiplexer(actx));
+
++ is(expired, 1, "drain_timer_events() reports expiration");
+ is(timer_expired(actx), 0, "timer is no longer expired");
+ mux_is_not_ready(actx->mux, "when fd is drained and timer reset");
+
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+ if (bidirectional)
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+ }
++
++ /* Ensure comb_multiplexer() can handle multiple stale events. */
++ {
++ int rfd2,
++ wfd2;
++
++ /* Create a second local pipe. */
++ Assert(pipe(pipefd) == 0);
++ rfd2 = pipefd[0];
++ wfd2 = pipefd[1];
++
++ /* Make both rfds appear unidirectional if necessary. */
++ if (bidirectional)
++ {
++ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
++ Assert(fill_pipe(rfd2) == bidi_pipe_size);
++ }
++
++ /* Register for read events on both fds, and make them readable. */
++ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
++ Assert(register_socket(NULL, rfd2, in_event, actx, NULL) == 0);
++
++ Assert(write(wfd, "x", 1) == 1);
++ Assert(write(wfd2, "x", 1) == 1);
++
++ mux_is_ready(actx->mux, deadline, "when two fds are readable");
++
++ /*
++ * Drain both fds. comb_multiplexer() should then ensure that the
++ * mux is no longer readable.
++ */
++ Assert(drain_pipe(rfd, 1));
++ Assert(drain_pipe(rfd2, 1));
++ Assert(comb_multiplexer(actx));
++ mux_is_not_ready(actx->mux, "when two fds are drained");
++
++ /* Stop listening. */
++ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
++ Assert(register_socket(NULL, rfd2, CURL_POLL_REMOVE, actx, NULL) == 0);
++
++ /* Undo any unidirectional emulation. */
++ if (bidirectional)
++ {
++ Assert(drain_pipe(wfd, bidi_pipe_size));
++ Assert(drain_pipe(wfd2, bidi_pipe_size));
++ }
++
++ close(rfd2);
++ close(wfd2);
++ }
+ }
+
+ close(rfd);
v3-0001-oauth-Remove-stale-events-from-the-kqueue-multipl.patchapplication/octet-stream; name=v3-0001-oauth-Remove-stale-events-from-the-kqueue-multipl.patchDownload
From c5cdccfe374ff9d45219f705511785318833f6eb Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Tue, 10 Jun 2025 16:38:59 -0700
Subject: [PATCH v3 1/5] oauth: Remove stale events from the kqueue multiplexer
If a socket is added to the kqueue, becomes readable/writable, and
subsequently becomes non-readable/writable again, the kqueue itself will
remain readable until either the socket registration is removed, or the
stale event is cleared via a call to kevent().
In many simple cases, Curl itself will remove the socket registration
quickly, but in real-world usage, this is not guaranteed to happen. The
kqueue can then remain stuck in a permanently readable state until the
request ends, which results in pointless wakeups for the client and
wasted CPU time.
Implement comb_multiplexer() to call kevent() and unstick any stale
events that would cause unnecessary callbacks. This is called right
after drive_request(), before we return control to the client to wait.
Suggested-by: Thomas Munro <thomas.munro@gmail.com>
---
src/interfaces/libpq-oauth/oauth-curl.c | 74 +++++++++++++++++++++++--
1 file changed, 68 insertions(+), 6 deletions(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index dba9a684fa8..3380a17628e 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -1376,6 +1376,60 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
#endif
}
+/*-------
+ * If there is no work to do on any of the descriptors in the multiplexer, then
+ * this function must ensure that the multiplexer is not readable.
+ *
+ * As a motivating example, consider the following sequence of events:
+ * 1. libcurl tries to write data to the send buffer, but it fills up.
+ * 2. libcurl registers CURL_POLL_OUT on the socket and returns control to the
+ * client to wait.
+ * 3. The kernel partially drains the send buffer. The socket becomes writable,
+ * and the client wakes up and calls back into the flow.
+ * 4. libcurl continues writing data to the send buffer, but it fills up again.
+ * The socket is no longer writable.
+ *
+ * At this point, an epoll-based mux no longer signals readiness, so nothing
+ * further needs to be done. But a kqueue-based mux will continue to signal
+ * "ready" until either the EVFILT_WRITE registration is dropped for the socket,
+ * or the old socket-writable event is read from the queue. Since Curl isn't
+ * guaranteed to do the former, we must do the latter here.
+ */
+static bool
+comb_multiplexer(struct async_ctx *actx)
+{
+#if defined(HAVE_SYS_EPOLL_H)
+ /* The epoll implementation doesn't hold onto stale events. */
+ return true;
+#elif defined(HAVE_SYS_EVENT_H)
+ struct timespec timeout = {0};
+ struct kevent ev;
+
+ /*
+ * Try to read a single pending event. We can actually ignore the result:
+ * either we found an event to process, in which case the multiplexer is
+ * correctly readable for that event at minimum, and it doesn't matter if
+ * there are any stale events; or we didn't find any, in which case the
+ * kernel will have discarded any stale events as it traveled to the end
+ * of the queue.
+ *
+ * Note that this depends on our registrations being level-triggered --
+ * even the timer, so we use a chained kqueue for that instead of an
+ * EVFILT_TIMER on the top-level mux. If we used edge-triggered events,
+ * this call would improperly discard them.
+ */
+ if (kevent(actx->mux, NULL, 0, &ev, 1, &timeout) < 0)
+ {
+ actx_error(actx, "could not comb kqueue: %m");
+ return false;
+ }
+
+ return true;
+#else
+#error comb_multiplexer is not implemented on this platform
+#endif
+}
+
/*
* Enables or disables the timer in the multiplexer set. The timeout value is
* in milliseconds (negative values disable the timer).
@@ -2755,13 +2809,21 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
if (status == PGRES_POLLING_FAILED)
goto error_return;
- else if (status != PGRES_POLLING_OK)
- {
- /* not done yet */
- return status;
- }
+ else if (status == PGRES_POLLING_OK)
+ break; /* done! */
- break;
+ /*
+ * This request is still running.
+ *
+ * Make sure that stale events don't cause us to come back
+ * early. (Currently, this can occur only with kqueue.) If
+ * this is forgotten, the multiplexer can get stuck in a
+ * signalled state and we'll burn CPU cycles pointlessly.
+ */
+ if (!comb_multiplexer(actx))
+ goto error_return;
+
+ return status;
}
case OAUTH_STEP_WAIT_INTERVAL:
--
2.34.1
v3-0002-oauth-Ensure-unused-socket-registrations-are-remo.patchapplication/octet-stream; name=v3-0002-oauth-Ensure-unused-socket-registrations-are-remo.patchDownload
From 7725e0c173bb42c1511e780b93d637a9b787904e Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Wed, 6 Aug 2025 15:18:08 -0700
Subject: [PATCH v3 2/5] oauth: Ensure unused socket registrations are removed
If Curl needs to switch the direction of a socket's registration (e.g.
from CURL_POLL_IN to CURL_POLL_OUT), it expects the old registration to
be discarded. For epoll, this happened via EPOLL_CTL_MOD, but for
kqueue, the old registration would remain if it was not explicitly
removed by Curl.
Explicitly remove the opposite-direction event during registrations. (If
that event doesn't exist, we'll just get an ENOENT, which will be
ignored by the same code that handles CURL_POLL_REMOVE.) A few
assertions are also added to strengthen the relationship between the
number of events added, the number of events pulled off the queue, and
the lengths of the kevent arrays.
---
src/interfaces/libpq-oauth/oauth-curl.c | 22 ++++++++++++++--------
1 file changed, 14 insertions(+), 8 deletions(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index 3380a17628e..7a87a96d2ed 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -1291,22 +1291,31 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
return 0;
#elif defined(HAVE_SYS_EVENT_H)
- struct kevent ev[2] = {0};
+ struct kevent ev[2];
struct kevent ev_out[2];
struct timespec timeout = {0};
int nev = 0;
int res;
+ /*
+ * We don't know which of the events is currently registered, perhaps
+ * both, so we always try to remove unneeded events. This means we need to
+ * tolerate ENOENT below.
+ */
switch (what)
{
case CURL_POLL_IN:
EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, 0);
nev++;
+ EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+ nev++;
break;
case CURL_POLL_OUT:
EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD | EV_RECEIPT, 0, 0, 0);
nev++;
+ EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+ nev++;
break;
case CURL_POLL_INOUT:
@@ -1317,12 +1326,6 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
break;
case CURL_POLL_REMOVE:
-
- /*
- * We don't know which of these is currently registered, perhaps
- * both, so we try to remove both. This means we need to tolerate
- * ENOENT below.
- */
EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0);
nev++;
EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0);
@@ -1334,7 +1337,10 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
return -1;
}
- res = kevent(actx->mux, ev, nev, ev_out, lengthof(ev_out), &timeout);
+ Assert(nev <= lengthof(ev));
+ Assert(nev <= lengthof(ev_out));
+
+ res = kevent(actx->mux, ev, nev, ev_out, nev, &timeout);
if (res < 0)
{
actx_error(actx, "could not modify kqueue: %m");
--
2.34.1
v3-0003-oauth-Remove-expired-timers-from-the-multiplexer.patchapplication/octet-stream; name=v3-0003-oauth-Remove-expired-timers-from-the-multiplexer.patchDownload
From 6ccf7a5d156975f8f4bf29a82cf3ef2f20d76853 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Wed, 5 Mar 2025 13:16:48 -0800
Subject: [PATCH v3 3/5] oauth: Remove expired timers from the multiplexer
In a case similar to the previous commit, an expired timer can remain
permanently readable if Curl does not remove the timeout itself. Since
that removal isn't guaranteed to happen in real-world situations,
implement drain_timer_events() to reset the timer before calling into
drive_request().
Moving to drain_timer_events() happens to fix a logic bug in the
previous caller of timer_expired(), which treated an error condition as
if the timer were expired instead of bailing out.
The previous implementation of timer_expired() gave differing results
for epoll and kqueue if the timer was reset. (For epoll, a reset timer
was considered to be expired, and for kqueue it was not.) This didn't
previously cause problems, since timer_expired() was only called while
the timer was known to be set, but both implementations now use the
kqueue logic.
---
src/interfaces/libpq-oauth/oauth-curl.c | 108 +++++++++++++++---------
1 file changed, 68 insertions(+), 40 deletions(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index 7a87a96d2ed..1c354174083 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -1543,40 +1543,20 @@ set_timer(struct async_ctx *actx, long timeout)
/*
* Returns 1 if the timeout in the multiplexer set has expired since the last
- * call to set_timer(), 0 if the timer is still running, or -1 (with an
- * actx_error() report) if the timer cannot be queried.
+ * call to set_timer(), 0 if the timer is either still running or disarmed, or
+ * -1 (with an actx_error() report) if the timer cannot be queried.
*/
static int
timer_expired(struct async_ctx *actx)
{
-#if defined(HAVE_SYS_EPOLL_H)
- struct itimerspec spec = {0};
-
- if (timerfd_gettime(actx->timerfd, &spec) < 0)
- {
- actx_error(actx, "getting timerfd value: %m");
- return -1;
- }
-
- /*
- * This implementation assumes we're using single-shot timers. If you
- * change to using intervals, you'll need to reimplement this function
- * too, possibly with the read() or select() interfaces for timerfd.
- */
- Assert(spec.it_interval.tv_sec == 0
- && spec.it_interval.tv_nsec == 0);
-
- /* If the remaining time to expiration is zero, we're done. */
- return (spec.it_value.tv_sec == 0
- && spec.it_value.tv_nsec == 0);
-#elif defined(HAVE_SYS_EVENT_H)
+#if defined(HAVE_SYS_EPOLL_H) || defined(HAVE_SYS_EVENT_H)
int res;
- /* Is the timer queue ready? */
+ /* Is the timer ready? */
res = PQsocketPoll(actx->timerfd, 1 /* forRead */ , 0, 0);
if (res < 0)
{
- actx_error(actx, "checking kqueue for timeout: %m");
+ actx_error(actx, "checking timer expiration: %m");
return -1;
}
@@ -1608,6 +1588,36 @@ register_timer(CURLM *curlm, long timeout, void *ctx)
return 0;
}
+/*
+ * Removes any expired-timer event from the multiplexer. If was_expired is not
+ * NULL, it will contain whether or not the timer was expired at time of call.
+ */
+static bool
+drain_timer_events(struct async_ctx *actx, bool *was_expired)
+{
+ int res;
+
+ res = timer_expired(actx);
+ if (res < 0)
+ return false;
+
+ if (res > 0)
+ {
+ /*
+ * Timer is expired. We could drain the event manually from the
+ * timerfd, but it's easier to simply disable it; that keeps the
+ * platform-specific code in set_timer().
+ */
+ if (!set_timer(actx, -1))
+ return false;
+ }
+
+ if (was_expired)
+ *was_expired = (res > 0);
+
+ return true;
+}
+
/*
* Prints Curl request debugging information to stderr.
*
@@ -2811,6 +2821,22 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
{
PostgresPollingStatusType status;
+ /*
+ * Clear any expired timeout before calling back into
+ * Curl. Curl is not guaranteed to do this for us, because
+ * its API expects us to use single-shot (i.e.
+ * edge-triggered) timeouts, and ours are level-triggered
+ * via the mux.
+ *
+ * This can't be combined with the comb_multiplexer() call
+ * below: we might accidentally clear a short timeout that
+ * was both set and expired during the call to
+ * drive_request().
+ */
+ if (!drain_timer_events(actx, NULL))
+ goto error_return;
+
+ /* Move the request forward. */
status = drive_request(actx);
if (status == PGRES_POLLING_FAILED)
@@ -2833,24 +2859,26 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
}
case OAUTH_STEP_WAIT_INTERVAL:
-
- /*
- * The client application is supposed to wait until our timer
- * expires before calling PQconnectPoll() again, but that
- * might not happen. To avoid sending a token request early,
- * check the timer before continuing.
- */
- if (!timer_expired(actx))
{
- set_conn_altsock(conn, actx->timerfd);
- return PGRES_POLLING_READING;
- }
+ bool expired;
- /* Disable the expired timer. */
- if (!set_timer(actx, -1))
- goto error_return;
+ /*
+ * The client application is supposed to wait until our
+ * timer expires before calling PQconnectPoll() again, but
+ * that might not happen. To avoid sending a token request
+ * early, check the timer before continuing.
+ */
+ if (!drain_timer_events(actx, &expired))
+ goto error_return;
- break;
+ if (!expired)
+ {
+ set_conn_altsock(conn, actx->timerfd);
+ return PGRES_POLLING_READING;
+ }
+
+ break;
+ }
}
/*
--
2.34.1
v3-0004-oauth-Track-total-call-count-during-a-client-flow.patchapplication/octet-stream; name=v3-0004-oauth-Track-total-call-count-during-a-client-flow.patchDownload
From 2be993b8f07e3bcb257f2bb8b579113985d4214c Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Fri, 6 Jun 2025 15:22:41 -0700
Subject: [PATCH v3 4/5] oauth: Track total call count during a client flow
Tracking down the bugs that led to the addition of comb_multiplexer()
and drain_timer_events() was difficult, because an inefficient flow is
not visibly different from one that is working properly. To help
maintainers notice when something has gone wrong, track the number of
calls into the flow as part of debug mode, and print the total when the
flow finishes.
A new test makes sure the total count is less than 100. (We expect
something on the order of 10.) This isn't foolproof, but it is able to
catch several regressions in the logic of the prior two commits, and
future work to add TLS support to the oauth_validator test server should
strengthen it as well.
---
src/interfaces/libpq-oauth/oauth-curl.c | 22 +++++++++++++
.../modules/oauth_validator/t/001_server.pl | 31 ++++++++++++++++++-
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index 1c354174083..41cd3bf9691 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -278,6 +278,7 @@ struct async_ctx
bool user_prompted; /* have we already sent the authz prompt? */
bool used_basic_auth; /* did we send a client secret? */
bool debugging; /* can we give unsafe developer assistance? */
+ int dbg_num_calls; /* (debug mode) how many times were we called? */
};
/*
@@ -3028,6 +3029,8 @@ PostgresPollingStatusType
pg_fe_run_oauth_flow(PGconn *conn)
{
PostgresPollingStatusType result;
+ fe_oauth_state *state = conn_sasl_state(conn);
+ struct async_ctx *actx;
#ifndef WIN32
sigset_t osigset;
bool sigpipe_pending;
@@ -3056,6 +3059,25 @@ pg_fe_run_oauth_flow(PGconn *conn)
result = pg_fe_run_oauth_flow_impl(conn);
+ /*
+ * To assist with finding bugs in comb_multiplexer() and
+ * drain_timer_events(), when we're in debug mode, track the total number
+ * of calls to this function and print that at the end of the flow.
+ *
+ * Be careful that state->async_ctx could be NULL if early initialization
+ * fails during the first call.
+ */
+ actx = state->async_ctx;
+ Assert(actx || result == PGRES_POLLING_FAILED);
+
+ if (actx && actx->debugging)
+ {
+ actx->dbg_num_calls++;
+ if (result == PGRES_POLLING_OK || result == PGRES_POLLING_FAILED)
+ fprintf(stderr, "[libpq] total number of polls: %d\n",
+ actx->dbg_num_calls);
+ }
+
#ifndef WIN32
if (masked)
{
diff --git a/src/test/modules/oauth_validator/t/001_server.pl b/src/test/modules/oauth_validator/t/001_server.pl
index 41672ebd5c6..c0dafb8be76 100644
--- a/src/test/modules/oauth_validator/t/001_server.pl
+++ b/src/test/modules/oauth_validator/t/001_server.pl
@@ -418,6 +418,35 @@ $node->connect_fails(
qr/failed to obtain access token: mutual TLS required for client \(invalid_client\)/
);
+# Count the number of calls to the internal flow when multiple retries are
+# triggered. The exact number depends on many things -- the TCP stack, the
+# version of Curl in use, random chance -- but a ridiculously high number
+# suggests something is wrong with our ability to clear multiplexer events after
+# they're no longer applicable.
+my ($ret, $stdout, $stderr) = $node->psql(
+ 'postgres',
+ "SELECT 'connected for call count'",
+ extra_params => ['-w'],
+ connstr => connstr(stage => 'token', retries => 2),
+ on_error_stop => 0);
+
+is($ret, 0, "call count connection succeeds");
+like(
+ $stderr,
+ qr@Visit https://example\.com/ and enter the code: postgresuser@,
+ "call count: stderr matches");
+
+my $count_pattern = qr/\[libpq\] total number of polls: (\d+)/;
+if (like($stderr, $count_pattern, "call count: count is printed"))
+{
+ # For reference, a typical flow with two retries might take between 5-15
+ # calls to the client implementation. And while this will probably continue
+ # to change across OSes and Curl updates, we're likely in trouble if we see
+ # hundreds or thousands of calls.
+ $stderr =~ $count_pattern;
+ cmp_ok($1, '<', 100, "call count is reasonably small");
+}
+
# Stress test: make sure our builtin flow operates correctly even if the client
# application isn't respecting PGRES_POLLING_READING/WRITING signals returned
# from PQconnectPoll().
@@ -428,7 +457,7 @@ my @cmd = (
connstr(stage => 'all', retries => 1, interval => 1));
note "running '" . join("' '", @cmd) . "'";
-my ($stdout, $stderr) = run_command(\@cmd);
+($stdout, $stderr) = run_command(\@cmd);
like($stdout, qr/connection succeeded/, "stress-async: stdout matches");
unlike(
--
2.34.1
v3-0005-oauth-Add-unit-tests-for-multiplexer-handling.patchapplication/octet-stream; name=v3-0005-oauth-Add-unit-tests-for-multiplexer-handling.patchDownload
From 50257bf32eb2b0972e5139ac4a79367372c77385 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Wed, 5 Mar 2025 15:04:34 -0800
Subject: [PATCH v3 5/5] oauth: Add unit tests for multiplexer handling
To better record the internal behaviors of oauth-curl.c, add a unit test
suite for the socket and timer handling code. This is all based on TAP
and driven by our existing Test::More infrastructure.
---
src/interfaces/libpq-oauth/Makefile | 14 +
src/interfaces/libpq-oauth/meson.build | 35 ++
src/interfaces/libpq-oauth/t/001_oauth.pl | 24 +
src/interfaces/libpq-oauth/test-oauth-curl.c | 527 +++++++++++++++++++
4 files changed, 600 insertions(+)
create mode 100644 src/interfaces/libpq-oauth/t/001_oauth.pl
create mode 100644 src/interfaces/libpq-oauth/test-oauth-curl.c
diff --git a/src/interfaces/libpq-oauth/Makefile b/src/interfaces/libpq-oauth/Makefile
index 682f17413b3..e73573694b9 100644
--- a/src/interfaces/libpq-oauth/Makefile
+++ b/src/interfaces/libpq-oauth/Makefile
@@ -79,5 +79,19 @@ uninstall:
rm -f '$(DESTDIR)$(libdir)/$(stlib)'
rm -f '$(DESTDIR)$(libdir)/$(shlib)'
+.PHONY: all-tests
+all-tests: oauth_tests$(X)
+
+oauth_tests$(X): test-oauth-curl.o oauth-utils.o $(WIN32RES) | submake-libpgport submake-libpq
+ $(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(SHLIB_LINK) -o $@
+
+check: all-tests
+ $(prove_check)
+
+installcheck: all-tests
+ $(prove_installcheck)
+
clean distclean: clean-lib
rm -f $(OBJS) $(OBJS_STATIC) $(OBJS_SHLIB)
+ rm -f test-oauth-curl.o oauth_tests$(X)
+ rm -rf tmp_check
diff --git a/src/interfaces/libpq-oauth/meson.build b/src/interfaces/libpq-oauth/meson.build
index df064c59a40..505e1671b86 100644
--- a/src/interfaces/libpq-oauth/meson.build
+++ b/src/interfaces/libpq-oauth/meson.build
@@ -47,3 +47,38 @@ libpq_oauth_so = shared_module(libpq_oauth_name,
link_args: export_fmt.format(export_file.full_path()),
kwargs: default_lib_args,
)
+
+libpq_oauth_test_deps = []
+
+oauth_test_sources = files('test-oauth-curl.c') + libpq_oauth_so_sources
+
+if host_system == 'windows'
+ oauth_test_sources += rc_bin_gen.process(win32ver_rc, extra_args: [
+ '--NAME', 'oauth_tests',
+ '--FILEDESC', 'OAuth unit test program',])
+endif
+
+libpq_oauth_test_deps += executable('oauth_tests',
+ oauth_test_sources,
+ dependencies: [frontend_shlib_code, libpq, libpq_oauth_deps],
+ kwargs: default_bin_args + {
+ 'c_args': default_bin_args.get('c_args', []) + libpq_oauth_so_c_args,
+ 'c_pch': pch_postgres_fe_h,
+ 'include_directories': [libpq_inc, postgres_inc],
+ 'install': false,
+ }
+)
+
+testprep_targets += libpq_oauth_test_deps
+
+tests += {
+ 'name': 'libpq-oauth',
+ 'sd': meson.current_source_dir(),
+ 'bd': meson.current_build_dir(),
+ 'tap': {
+ 'tests': [
+ 't/001_oauth.pl',
+ ],
+ 'deps': libpq_oauth_test_deps,
+ },
+}
diff --git a/src/interfaces/libpq-oauth/t/001_oauth.pl b/src/interfaces/libpq-oauth/t/001_oauth.pl
new file mode 100644
index 00000000000..e769856c2c9
--- /dev/null
+++ b/src/interfaces/libpq-oauth/t/001_oauth.pl
@@ -0,0 +1,24 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Defer entirely to the oauth_tests executable. stdout/err is routed through
+# Test::More so that our logging infrastructure can handle it correctly. Using
+# IPC::Run::new_chunker seems to help interleave the two streams a little better
+# than without.
+#
+# TODO: prove can also deal with native executables itself, which we could
+# probably make use of via PROVE_TESTS on the Makefile side. But the Meson setup
+# calls Perl directly, which would require more code to work around... and
+# there's still the matter of logging.
+my $builder = Test::More->builder;
+my $out = $builder->output;
+my $err = $builder->failure_output;
+
+IPC::Run::run ['oauth_tests'],
+ '>', IPC::Run::new_chunker, sub { print {$out} $_[0] },
+ '2>', IPC::Run::new_chunker, sub { print {$err} $_[0] }
+ or die "oauth_tests returned $?";
diff --git a/src/interfaces/libpq-oauth/test-oauth-curl.c b/src/interfaces/libpq-oauth/test-oauth-curl.c
new file mode 100644
index 00000000000..8263aff2f4a
--- /dev/null
+++ b/src/interfaces/libpq-oauth/test-oauth-curl.c
@@ -0,0 +1,527 @@
+/*
+ * test-oauth-curl.c
+ *
+ * A unit test driver for libpq-oauth. This #includes oauth-curl.c, which lets
+ * the tests reference static functions and other internals.
+ *
+ * USE_ASSERT_CHECKING is required, to make it easy for tests to wrap
+ * must-succeed code as part of test setup.
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ */
+
+#include "oauth-curl.c"
+
+#include <fcntl.h>
+
+#ifdef USE_ASSERT_CHECKING
+
+/*
+ * TAP Helpers
+ */
+
+static int num_tests = 0;
+
+/*
+ * Reports ok/not ok to the TAP stream on stdout.
+ */
+#define ok(OK, TEST) \
+ ok_impl(OK, TEST, #OK, __FILE__, __LINE__)
+
+static bool
+ok_impl(bool ok, const char *test, const char *teststr, const char *file, int line)
+{
+ printf("%sok %d - %s\n", ok ? "" : "not ", ++num_tests, test);
+
+ if (!ok)
+ {
+ printf("# at %s:%d:\n", file, line);
+ printf("# expression is false: %s\n", teststr);
+ }
+
+ return ok;
+}
+
+/*
+ * Like ok(this == that), but with more diagnostics on failure.
+ *
+ * Only works on ints, but luckily that's all we need here. Note that the much
+ * simpler-looking macro implementation
+ *
+ * is_diag(ok(THIS == THAT, TEST), THIS, #THIS, THAT, #THAT)
+ *
+ * suffers from multiple evaluation of the macro arguments...
+ */
+#define is(THIS, THAT, TEST) \
+ do { \
+ int this_ = (THIS), \
+ that_ = (THAT); \
+ is_diag( \
+ ok_impl(this_ == that_, TEST, #THIS " == " #THAT, __FILE__, __LINE__), \
+ this_, #THIS, that_, #THAT \
+ ); \
+ } while (0)
+
+static bool
+is_diag(bool ok, int this, const char *thisstr, int that, const char *thatstr)
+{
+ if (!ok)
+ printf("# %s = %d; %s = %d\n", thisstr, this, thatstr, that);
+
+ return ok;
+}
+
+/*
+ * Utilities
+ */
+
+/*
+ * Creates a partially-initialized async_ctx for the purposes of testing. Free
+ * with free_test_actx().
+ */
+static struct async_ctx *
+init_test_actx(void)
+{
+ struct async_ctx *actx;
+
+ actx = calloc(1, sizeof(*actx));
+ Assert(actx);
+
+ actx->mux = PGINVALID_SOCKET;
+ actx->timerfd = -1;
+ actx->debugging = true;
+
+ initPQExpBuffer(&actx->errbuf);
+
+ Assert(setup_multiplexer(actx));
+
+ return actx;
+}
+
+static void
+free_test_actx(struct async_ctx *actx)
+{
+ termPQExpBuffer(&actx->errbuf);
+
+ if (actx->mux != PGINVALID_SOCKET)
+ close(actx->mux);
+ if (actx->timerfd >= 0)
+ close(actx->timerfd);
+
+ free(actx);
+}
+
+static char dummy_buf[4 * 1024]; /* for fill_pipe/drain_pipe */
+
+/*
+ * Writes to the write side of a pipe until it won't take any more data. Returns
+ * the amount written.
+ */
+static ssize_t
+fill_pipe(int fd)
+{
+ int mode;
+ ssize_t written = 0;
+
+ /* Don't block. */
+ Assert((mode = fcntl(fd, F_GETFL)) != -1);
+ Assert(fcntl(fd, F_SETFL, mode | O_NONBLOCK) == 0);
+
+ while (true)
+ {
+ ssize_t w;
+
+ w = write(fd, dummy_buf, sizeof(dummy_buf));
+ if (w < 0)
+ {
+ if (errno != EAGAIN && errno != EWOULDBLOCK)
+ {
+ perror("write to pipe");
+ written = -1;
+ }
+ break;
+ }
+
+ written += w;
+ }
+
+ /* Reset the descriptor flags. */
+ Assert(fcntl(fd, F_SETFD, mode) == 0);
+
+ return written;
+}
+
+/*
+ * Drains the requested amount of data from the read side of a pipe.
+ */
+static bool
+drain_pipe(int fd, ssize_t n)
+{
+ Assert(n > 0);
+
+ while (n)
+ {
+ size_t to_read = (n <= sizeof(dummy_buf)) ? n : sizeof(dummy_buf);
+ ssize_t drained;
+
+ drained = read(fd, dummy_buf, to_read);
+ if (drained < 0)
+ {
+ perror("read from pipe");
+ return false;
+ }
+
+ n -= drained;
+ }
+
+ return true;
+}
+
+/*
+ * Tests whether the multiplexer is marked ready by the deadline. This is a
+ * macro so that file/line information makes sense during failures.
+ *
+ * NB: our current multiplexer implementations (epoll/kqueue) are *readable*
+ * when the underlying libcurl sockets are *writable*. This behavior is pinned
+ * here to record that expectation; PGRES_POLLING_READING is hardcoded
+ * throughout the flow and would need to be changed if a new multiplexer does
+ * something different.
+ */
+#define mux_is_ready(MUX, DEADLINE, TEST) \
+ do { \
+ int res_ = PQsocketPoll(MUX, 1, 0, DEADLINE); \
+ Assert(res_ != -1); \
+ ok(res_ > 0, "multiplexer is ready " TEST); \
+ } while (0)
+
+/*
+ * The opposite of mux_is_ready().
+ */
+#define mux_is_not_ready(MUX, TEST) \
+ do { \
+ int res_ = PQsocketPoll(MUX, 1, 0, 0); \
+ Assert(res_ != -1); \
+ is(res_, 0, "multiplexer is not ready " TEST); \
+ } while (0)
+
+/*
+ * Test Suites
+ */
+
+/* Per-suite timeout. Set via the PG_TEST_TIMEOUT_DEFAULT envvar. */
+static pg_usec_time_t timeout_us = 180 * 1000 * 1000;
+
+static void
+test_set_timer(void)
+{
+ struct async_ctx *actx = init_test_actx();
+ const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
+
+ printf("# test_set_timer\n");
+
+ /* A zero-duration timer should result in a near-immediate ready signal. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer expires");
+ is(timer_expired(actx), 1, "timer_expired() returns 1 when timer expires");
+
+ /* Resetting the timer far in the future should unset the ready signal. */
+ Assert(set_timer(actx, INT_MAX));
+ mux_is_not_ready(actx->mux, "when timer is reset to the future");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 with unexpired timer");
+
+ /* Setting another zero-duration timer should override the previous one. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer is re-expired");
+ is(timer_expired(actx), 1, "timer_expired() returns 1 when timer is re-expired");
+
+ /* And disabling that timer should once again unset the ready signal. */
+ Assert(set_timer(actx, -1));
+ mux_is_not_ready(actx->mux, "when timer is unset");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 when timer is unset");
+
+ {
+ bool expired;
+
+ /* Make sure drain_timer_events() functions correctly as well. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer is re-expired (drain_timer_events)");
+
+ Assert(drain_timer_events(actx, &expired));
+ mux_is_not_ready(actx->mux, "when timer is drained after expiring");
+ is(expired, 1, "drain_timer_events() reports expiration");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 after timer is drained");
+
+ /* A second drain should do nothing. */
+ Assert(drain_timer_events(actx, &expired));
+ mux_is_not_ready(actx->mux, "when timer is drained a second time");
+ is(expired, 0, "drain_timer_events() reports no expiration");
+ is(timer_expired(actx), 0, "timer_expired() still returns 0");
+ }
+
+ free_test_actx(actx);
+}
+
+static void
+test_register_socket(void)
+{
+ struct async_ctx *actx = init_test_actx();
+ int pipefd[2];
+ int rfd,
+ wfd;
+ bool bidirectional;
+
+ /* Create a local pipe for communication. */
+ Assert(pipe(pipefd) == 0);
+ rfd = pipefd[0];
+ wfd = pipefd[1];
+
+ /*
+ * Some platforms (FreeBSD) implement bidirectional pipes, affecting the
+ * behavior of some of these tests. Store that knowledge for later.
+ */
+ bidirectional = PQsocketPoll(rfd /* read */ , 0, 1 /* write */ , 0) > 0;
+
+ /*
+ * This suite runs twice -- once using CURL_POLL_IN/CURL_POLL_OUT for
+ * read/write operations, respectively, and once using CURL_POLL_INOUT for
+ * both sides.
+ */
+ for (int inout = 0; inout < 2; inout++)
+ {
+ const int in_event = inout ? CURL_POLL_INOUT : CURL_POLL_IN;
+ const int out_event = inout ? CURL_POLL_INOUT : CURL_POLL_OUT;
+ const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
+ size_t bidi_pipe_size = 0; /* silence compiler warnings */
+
+ printf("# test_register_socket %s\n", inout ? "(INOUT)" : "");
+
+ /*
+ * At the start of the test, the read side should be blocked and the
+ * write side should be open. (There's a mistake at the end of this
+ * loop otherwise.)
+ */
+ Assert(PQsocketPoll(rfd, 1, 0, 0) == 0);
+ Assert(PQsocketPoll(wfd, 0, 1, 0) > 0);
+
+ /*
+ * For bidirectional systems, emulate unidirectional behavior here by
+ * filling up the "read side" of the pipe.
+ */
+ if (bidirectional)
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+
+ /* Listen on the read side. The multiplexer shouldn't be ready yet. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when fd is not readable");
+
+ /* Writing to the pipe should result in a read-ready multiplexer. */
+ Assert(write(wfd, "x", 1) == 1);
+ mux_is_ready(actx->mux, deadline, "when fd is readable");
+
+ /*
+ * Update the registration to wait on write events instead. The
+ * multiplexer should be unset.
+ */
+ Assert(register_socket(NULL, rfd, CURL_POLL_OUT, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when waiting for writes on readable fd");
+
+ /* Re-register for read events. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when waiting for reads again");
+
+ /* Stop listening. The multiplexer should be unset. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when readable fd is removed");
+
+ /* Listen again. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when readable fd is re-added");
+
+ /*
+ * Draining the pipe should unset the multiplexer again, once the old
+ * event is cleared.
+ */
+ Assert(drain_pipe(rfd, 1));
+ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when fd is drained");
+
+ /* Undo any unidirectional emulation. */
+ if (bidirectional)
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+
+ /* Listen on the write side. An empty buffer should be writable. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when fd is writable");
+
+ /* As above, wait on read events instead. */
+ Assert(register_socket(NULL, wfd, CURL_POLL_IN, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when waiting for reads on writable fd");
+
+ /* Re-register for write events. */
+ Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when waiting for writes again");
+
+ {
+ ssize_t written;
+
+ /*
+ * Fill the pipe. Once the old writable event is cleared, the mux
+ * should not be ready.
+ */
+ Assert((written = fill_pipe(wfd)) > 0);
+ printf("# pipe buffer is full at %zd bytes\n", written);
+
+ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when fd buffer is full");
+
+ /* Drain the pipe again. */
+ Assert(drain_pipe(rfd, written));
+ mux_is_ready(actx->mux, deadline, "when fd buffer is drained");
+ }
+
+ /* Stop listening. */
+ Assert(register_socket(NULL, wfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when fd is removed");
+
+ /* Make sure an expired timer doesn't interfere with event draining. */
+ {
+ bool expired;
+
+ /* Make the rfd appear unidirectional if necessary. */
+ if (bidirectional)
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+
+ /* Set the timer and wait for it to expire. */
+ Assert(set_timer(actx, 0));
+ Assert(PQsocketPoll(actx->timerfd, 1, 0, deadline) > 0);
+ is(timer_expired(actx), 1, "timer is expired");
+
+ /* Register for read events and make the fd readable. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ Assert(write(wfd, "x", 1) == 1);
+ mux_is_ready(actx->mux, deadline, "when fd is readable and timer expired");
+
+ /*
+ * Draining the pipe should unset the multiplexer again, once the
+ * old event is drained and the timer is reset.
+ *
+ * Order matters, since comb_multiplexer() doesn't have to remove
+ * stale events when active events exist. Follow the call sequence
+ * used in the code: drain the timer expiration, drain the pipe,
+ * then clear the stale events.
+ */
+ Assert(drain_timer_events(actx, &expired));
+ Assert(drain_pipe(rfd, 1));
+ Assert(comb_multiplexer(actx));
+
+ is(expired, 1, "drain_timer_events() reports expiration");
+ is(timer_expired(actx), 0, "timer is no longer expired");
+ mux_is_not_ready(actx->mux, "when fd is drained and timer reset");
+
+ /* Stop listening. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+
+ /* Undo any unidirectional emulation. */
+ if (bidirectional)
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+ }
+
+ /* Ensure comb_multiplexer() can handle multiple stale events. */
+ {
+ int rfd2,
+ wfd2;
+
+ /* Create a second local pipe. */
+ Assert(pipe(pipefd) == 0);
+ rfd2 = pipefd[0];
+ wfd2 = pipefd[1];
+
+ /* Make both rfds appear unidirectional if necessary. */
+ if (bidirectional)
+ {
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+ Assert(fill_pipe(rfd2) == bidi_pipe_size);
+ }
+
+ /* Register for read events on both fds, and make them readable. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ Assert(register_socket(NULL, rfd2, in_event, actx, NULL) == 0);
+
+ Assert(write(wfd, "x", 1) == 1);
+ Assert(write(wfd2, "x", 1) == 1);
+
+ mux_is_ready(actx->mux, deadline, "when two fds are readable");
+
+ /*
+ * Drain both fds. comb_multiplexer() should then ensure that the
+ * mux is no longer readable.
+ */
+ Assert(drain_pipe(rfd, 1));
+ Assert(drain_pipe(rfd2, 1));
+ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when two fds are drained");
+
+ /* Stop listening. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ Assert(register_socket(NULL, rfd2, CURL_POLL_REMOVE, actx, NULL) == 0);
+
+ /* Undo any unidirectional emulation. */
+ if (bidirectional)
+ {
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+ Assert(drain_pipe(wfd2, bidi_pipe_size));
+ }
+
+ close(rfd2);
+ close(wfd2);
+ }
+ }
+
+ close(rfd);
+ close(wfd);
+ free_test_actx(actx);
+}
+
+int
+main(int argc, char *argv[])
+{
+ const char *timeout;
+
+ /* Grab the default timeout. */
+ timeout = getenv("PG_TEST_TIMEOUT_DEFAULT");
+ if (timeout)
+ {
+ int timeout_s = atoi(timeout);
+
+ if (timeout_s > 0)
+ timeout_us = timeout_s * 1000 * 1000;
+ }
+
+ /*
+ * Set up line buffering for our output, to let stderr interleave in the
+ * log files.
+ */
+ setvbuf(stdout, NULL, PG_IOLBF, 0);
+
+ test_set_timer();
+ test_register_socket();
+
+ printf("1..%d\n", num_tests);
+ return 0;
+}
+
+#else /* !USE_ASSERT_CHECKING */
+
+/*
+ * Skip the test suite when we don't have assertions.
+ */
+int
+main(int argc, char *argv[])
+{
+ printf("1..0 # skip: cassert is not enabled\n");
+
+ return 0;
+}
+
+#endif /* USE_ASSERT_CHECKING */
--
2.34.1
On Thu, Aug 7, 2025 at 11:55 AM Jacob Champion
<jacob.champion@enterprisedb.com> wrote:
On Wed, Aug 6, 2025 at 9:13 AM Jacob Champion
<jacob.champion@enterprisedb.com> wrote:Maybe "drain" would no longer be the
verb to use there.I keep describing this as "combing" the queue when I talk about it in
person, so v3-0001 renames this new operation to comb_multiplexer().
And the CI (plus the more strenuous TLS tests) confirms that the
callback count is still stable with this weaker guarantee, so I've
gotten rid of the event-counting code.
I was about to hit send on an email suggesting "reset_multiplexer()",
and an attempt at distilling the explanation to a short paragraph, but
your names and explanations are also good so please feel 100% free to
ignore these suggestions.
"Unlike epoll descriptors, kqueue descriptors only transition from
readable to unreadable when kevent() is called and finds nothing,
after removing level-triggered conditions that have gone away. We
therefore need a dummy kevent() call after operations might have been
performed on the monitored sockets or timer_fd. Any event returned is
ignored here, but it also remains queued (being level-triggered) and
leaves the descriptor readable. This is a no-op for epoll
descriptors."
Reviewing the timer stuff, it is again tempting to try to use an
EVFILT_TIMER directly, but I like your approach: it's nice to be able
to mirror the Linux coding, with this minor kink ironed out.
FWIW I re-read the kqueue paper's discussion of the goals of making
kqueue descriptors themselves monitorable/pollable, and it seems it
was mainly intended for hierarchies of kqueues, like your timer_fd,
with the specific aim of expressing priorities. It doesn't talk about
giving them to code that doesn't know it has a kqueue fd (the client)
and never calls kevent() and infers the events instead (libcurl).
That said, the fact that the filter function for kqueue fds just
checks queue size > 0 without running the filter functions for the
queued events does seem like a bit of an abstraction leak from this
vantage point. At least it's easy enough to work around in the
kqueue-managing middleman code once you understand it.
Now that I'm no longer counting events, I can collapse the changes to
register_socket(). I can't revert those changes entirely, because then
we regress the case where Curl switches a socket from IN to OUT (this
is enforced by the new unit tests). But I'm not sure that the existing
comment adequately explained that fix anyway, and I didn't remember to
call it out in my initial email, so I've split it out into v3-0002.
It's much smaller.
Much nicer! Yeah, that all makes sense.
The tests (now in 0005) have been adjusted for the new "combing"
behavior, and I've added a case to ensure that multiple stale events
are swept up by a single call to comb_multiplexer().
This all looks pretty good to me. I like the C TAP test. PostgreSQL
needs more of this.
s/signalled/signaled/ (= US spelling) in a couple of places.
On Thu, Aug 7, 2025 at 1:45 PM Thomas Munro <thomas.munro@gmail.com> wrote:
I like the C TAP test. PostgreSQL
needs more of this.
I should add, I didn't look closely at that part since you said it's
not in scope for back-patching. I'd like to, though, later.
I wonder if you would be interested in this attempt at centralised
infrastructure for unit testing our C code over here. I'm not
suggesting it for your immediate problem, just noting the overlap:
/messages/by-id/CA+hUKG+ajSQ_8eu2AogTncOnZ5me2D-Cn66iN_-wZnRjLN+icg@mail.gmail.com
Basically I would like to be able to dump easy-to-write files into the
tree that say stuff like this ↓ and have the build scripts find them,
build them and test them without all the module boilerplate stuff or a
running server (though that aspect is obviously not relevant for your
frontend case). Like you find in googletest or various xunit-style
systems in other projects, but integrated with our TAP universe (or
whatever replaces it if we escape from Perl). But I never got the
configure part of it working.
PG_BEGIN_TESTS();
...
PG_EXPECT_EQ(pg_preadv(fd, iov, 2, 11), 0);
PG_EXPECT_EQ(pg_pread(fd, buffer, 10, 0), 10);
PG_EXPECT_EQ_STR(buffer, "helloworld");
...
PG_END_TESTS();
Jacob Champion <jacob.champion@enterprisedb.com> writes:
From 50257bf32eb2b0972e5139ac4a79367372c77385 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Wed, 5 Mar 2025 15:04:34 -0800
Subject: [PATCH v3 5/5] oauth: Add unit tests for multiplexer handling
I haven't read the meat of the patch, but I have some comments on the
tests:
+IPC::Run::run ['oauth_tests'], + '>', IPC::Run::new_chunker, sub { print {$out} $_[0] }, + '2>', IPC::Run::new_chunker, sub { print {$err} $_[0] } + or die "oauth_tests returned $?";
We've recently switched to using fat commas (=>) between options and
their arguments, and that includes the file redirections in IPC::Run.
Although not semantically meaningful, I'd also be tempted to put parens
around the argument list for each redirect, so it's clear that they go
together.
Also, indirect object syntax (print {$fh} ...) is ugly and
old-fashioned, it's nicer to call it as a method on the filehandle.
So I'd write the above as:
IPC::Run::run ['oauth_tests'],
'>' => (IPC::Run::new_chunker, sub { $out->print($_[0]) }),
'2>' => (IPC::Run::new_chunker, sub { $err->print($_[0]) })
or die "oauth_tests returned $?";
As for the C TAP tests, there's already a bunch of TAP-outputting
infrastructure in pg_regress.c. Would it make sense to factor that out
into a common library?
- ilmari
On Wed, Aug 6, 2025 at 6:46 PM Thomas Munro <thomas.munro@gmail.com> wrote:
"Unlike epoll descriptors, kqueue descriptors only transition from
readable to unreadable when kevent() is called and finds nothing,
after removing level-triggered conditions that have gone away. We
therefore need a dummy kevent() call after operations might have been
performed on the monitored sockets or timer_fd. Any event returned is
ignored here, but it also remains queued (being level-triggered) and
leaves the descriptor readable. This is a no-op for epoll
descriptors."
I really like this; I'm working it into the doc comment.
FWIW I re-read the kqueue paper's discussion of the goals of making
kqueue descriptors themselves monitorable/pollable, and it seems it
was mainly intended for hierarchies of kqueues, like your timer_fd,
with the specific aim of expressing priorities. It doesn't talk about
giving them to code that doesn't know it has a kqueue fd (the client)
and never calls kevent() and infers the events instead (libcurl).
Interesting! It would be nice if they papered over this for us, but I
guess that's water under the bridge.
s/signalled/signaled/ (= US spelling) in a couple of places.
Ah. Will fix(?) or else lobby the dictionary companies.
Thank you so much for the reviews!
--Jacob
On Thu, Aug 7, 2025 at 9:35 AM Dagfinn Ilmari Mannsåker
<ilmari@ilmari.org> wrote:
I haven't read the meat of the patch, but I have some comments on the
tests:
Thanks for the review!
+IPC::Run::run ['oauth_tests'], + '>', IPC::Run::new_chunker, sub { print {$out} $_[0] }, + '2>', IPC::Run::new_chunker, sub { print {$err} $_[0] } + or die "oauth_tests returned $?";We've recently switched to using fat commas (=>) between options and
their arguments, and that includes the file redirections in IPC::Run.
Although not semantically meaningful, I'd also be tempted to put parens
around the argument list for each redirect, so it's clear that they go
together.
I have two concerns:
- If I don't put parentheses around the list, the fat comma is
actively misleading.
- As far as I can tell, IPC::Run neither documents nor tests the
ability to pass a list here. (But the tests are a bit of a maze, so
please correct me if there is one.) My fear is that I'll be coupling
against an implementation detail if I write it that way.
So I'm leaning towards keeping it as-is, unless you know of a reason
that the list syntax is guaranteed to work, with the understanding
that it does diverge from what you authored in 19c6e92b1. But I don't
think any of those examples use filters, so I don't feel too bad about
the difference yet?
Also, indirect object syntax (print {$fh} ...) is ugly and
old-fashioned, it's nicer to call it as a method on the filehandle.
That is much nicer; I'll do that.
As for the C TAP tests, there's already a bunch of TAP-outputting
infrastructure in pg_regress.c. Would it make sense to factor that out
into a common library?
Maybe if we got to rule-of-three, but I'd rather not make either
implementation compromise for the sake of the other. IMO, this is a
situation where a bad abstraction would be much costlier than the
duplication: TAP is lightweight, and I think the needs of a unit test
suite and the needs of a characterization test collector are very
different.
--Jacob
On Wed, Aug 6, 2025 at 7:43 PM Thomas Munro <thomas.munro@gmail.com> wrote:
On Thu, Aug 7, 2025 at 1:45 PM Thomas Munro <thomas.munro@gmail.com> wrote:
I wonder if you would be interested in this attempt at centralised
infrastructure for unit testing our C code over here. I'm not
suggesting it for your immediate problem, just noting the overlap:/messages/by-id/CA+hUKG+ajSQ_8eu2AogTncOnZ5me2D-Cn66iN_-wZnRjLN+icg@mail.gmail.com
I am all for better testability at the function level, and it even
looks like 0002 solved a problem that stopped me from getting rid of
the Perl shim...
The "test without a running server" part also might have some overlap
with my fuzzing work. So I'll take a closer look later; thanks!
--Jacob
On Thu, Aug 7, 2025 at 11:11 AM Jacob Champion
<jacob.champion@enterprisedb.com> wrote:
Thank you so much for the reviews!
Here is v4, with the feedback from both of you. 0001-0004 are planned
for backport; 0005 is slated for master only. Thanks again for the
reviews!
--Jacob
Attachments:
since-v3.diff.txttext/plain; charset=UTF-8; name=since-v3.diff.txtDownload
1: c5cdccfe374 ! 1: a515435d3b4 oauth: Remove stale events from the kqueue multiplexer
@@ Commit message
after drive_request(), before we return control to the client to wait.
Suggested-by: Thomas Munro <thomas.munro@gmail.com>
+ Co-authored-by: Thomas Munro <thomas.munro@gmail.com>
+ Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
+ Backpatch-through: 18
+ Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
## src/interfaces/libpq-oauth/oauth-curl.c ##
@@ src/interfaces/libpq-oauth/oauth-curl.c: register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
#endif
}
-+/*-------
++/*
+ * If there is no work to do on any of the descriptors in the multiplexer, then
+ * this function must ensure that the multiplexer is not readable.
+ *
-+ * As a motivating example, consider the following sequence of events:
-+ * 1. libcurl tries to write data to the send buffer, but it fills up.
-+ * 2. libcurl registers CURL_POLL_OUT on the socket and returns control to the
-+ * client to wait.
-+ * 3. The kernel partially drains the send buffer. The socket becomes writable,
-+ * and the client wakes up and calls back into the flow.
-+ * 4. libcurl continues writing data to the send buffer, but it fills up again.
-+ * The socket is no longer writable.
-+ *
-+ * At this point, an epoll-based mux no longer signals readiness, so nothing
-+ * further needs to be done. But a kqueue-based mux will continue to signal
-+ * "ready" until either the EVFILT_WRITE registration is dropped for the socket,
-+ * or the old socket-writable event is read from the queue. Since Curl isn't
-+ * guaranteed to do the former, we must do the latter here.
++ * Unlike epoll descriptors, kqueue descriptors only transition from readable to
++ * unreadable when kevent() is called and finds nothing, after removing
++ * level-triggered conditions that have gone away. We therefore need a dummy
++ * kevent() call after operations might have been performed on the monitored
++ * sockets or timer_fd. Any event returned is ignored here, but it also remains
++ * queued (being level-triggered) and leaves the descriptor readable. This is a
++ * no-op for epoll descriptors.
+ */
+static bool
+comb_multiplexer(struct async_ctx *actx)
@@ src/interfaces/libpq-oauth/oauth-curl.c: pg_fe_run_oauth_flow_impl(PGconn *conn)
+ * Make sure that stale events don't cause us to come back
+ * early. (Currently, this can occur only with kqueue.) If
+ * this is forgotten, the multiplexer can get stuck in a
-+ * signalled state and we'll burn CPU cycles pointlessly.
++ * signaled state and we'll burn CPU cycles pointlessly.
+ */
+ if (!comb_multiplexer(actx))
+ goto error_return;
2: 7725e0c173b ! 2: a34be19f17f oauth: Ensure unused socket registrations are removed
@@ Commit message
number of events added, the number of events pulled off the queue, and
the lengths of the kevent arrays.
+ Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
+ Backpatch-through: 18
+ Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
+
## src/interfaces/libpq-oauth/oauth-curl.c ##
@@ src/interfaces/libpq-oauth/oauth-curl.c: register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
3: 6ccf7a5d156 ! 3: 7408778d579 oauth: Remove expired timers from the multiplexer
@@ Commit message
the timer was known to be set, but both implementations now use the
kqueue logic.
+ Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
+ Backpatch-through: 18
+ Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
+
## src/interfaces/libpq-oauth/oauth-curl.c ##
@@ src/interfaces/libpq-oauth/oauth-curl.c: set_timer(struct async_ctx *actx, long timeout)
4: 2be993b8f07 ! 4: 8241255e84c oauth: Track total call count during a client flow
@@ Commit message
future work to add TLS support to the oauth_validator test server should
strengthen it as well.
+ Backpatch-through: 18
+ Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
+
## src/interfaces/libpq-oauth/oauth-curl.c ##
@@ src/interfaces/libpq-oauth/oauth-curl.c: struct async_ctx
bool user_prompted; /* have we already sent the authz prompt? */
5: 50257bf32eb ! 5: 337124064f3 oauth: Add unit tests for multiplexer handling
@@ Commit message
suite for the socket and timer handling code. This is all based on TAP
and driven by our existing Test::More infrastructure.
+ Reviewed-by: Dagfinn Ilmari Mannsåker <ilmari@ilmari.org>
+ Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
+
## src/interfaces/libpq-oauth/Makefile ##
@@ src/interfaces/libpq-oauth/Makefile: uninstall:
rm -f '$(DESTDIR)$(libdir)/$(stlib)'
@@ src/interfaces/libpq-oauth/t/001_oauth.pl (new)
+my $err = $builder->failure_output;
+
+IPC::Run::run ['oauth_tests'],
-+ '>', IPC::Run::new_chunker, sub { print {$out} $_[0] },
-+ '2>', IPC::Run::new_chunker, sub { print {$err} $_[0] }
++ '>', IPC::Run::new_chunker, sub { $out->print($_[0]) },
++ '2>', IPC::Run::new_chunker, sub { $err->print($_[0]) }
+ or die "oauth_tests returned $?";
## src/interfaces/libpq-oauth/test-oauth-curl.c (new) ##
v4-0001-oauth-Remove-stale-events-from-the-kqueue-multipl.patchapplication/octet-stream; name=v4-0001-oauth-Remove-stale-events-from-the-kqueue-multipl.patchDownload
From a515435d3b4e02d20563a75e3dfa4177d333ca4c Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Tue, 10 Jun 2025 16:38:59 -0700
Subject: [PATCH v4 1/5] oauth: Remove stale events from the kqueue multiplexer
If a socket is added to the kqueue, becomes readable/writable, and
subsequently becomes non-readable/writable again, the kqueue itself will
remain readable until either the socket registration is removed, or the
stale event is cleared via a call to kevent().
In many simple cases, Curl itself will remove the socket registration
quickly, but in real-world usage, this is not guaranteed to happen. The
kqueue can then remain stuck in a permanently readable state until the
request ends, which results in pointless wakeups for the client and
wasted CPU time.
Implement comb_multiplexer() to call kevent() and unstick any stale
events that would cause unnecessary callbacks. This is called right
after drive_request(), before we return control to the client to wait.
Suggested-by: Thomas Munro <thomas.munro@gmail.com>
Co-authored-by: Thomas Munro <thomas.munro@gmail.com>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Backpatch-through: 18
Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
---
src/interfaces/libpq-oauth/oauth-curl.c | 67 ++++++++++++++++++++++---
1 file changed, 61 insertions(+), 6 deletions(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index dba9a684fa8..433135cb86d 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -1376,6 +1376,53 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
#endif
}
+/*
+ * If there is no work to do on any of the descriptors in the multiplexer, then
+ * this function must ensure that the multiplexer is not readable.
+ *
+ * Unlike epoll descriptors, kqueue descriptors only transition from readable to
+ * unreadable when kevent() is called and finds nothing, after removing
+ * level-triggered conditions that have gone away. We therefore need a dummy
+ * kevent() call after operations might have been performed on the monitored
+ * sockets or timer_fd. Any event returned is ignored here, but it also remains
+ * queued (being level-triggered) and leaves the descriptor readable. This is a
+ * no-op for epoll descriptors.
+ */
+static bool
+comb_multiplexer(struct async_ctx *actx)
+{
+#if defined(HAVE_SYS_EPOLL_H)
+ /* The epoll implementation doesn't hold onto stale events. */
+ return true;
+#elif defined(HAVE_SYS_EVENT_H)
+ struct timespec timeout = {0};
+ struct kevent ev;
+
+ /*
+ * Try to read a single pending event. We can actually ignore the result:
+ * either we found an event to process, in which case the multiplexer is
+ * correctly readable for that event at minimum, and it doesn't matter if
+ * there are any stale events; or we didn't find any, in which case the
+ * kernel will have discarded any stale events as it traveled to the end
+ * of the queue.
+ *
+ * Note that this depends on our registrations being level-triggered --
+ * even the timer, so we use a chained kqueue for that instead of an
+ * EVFILT_TIMER on the top-level mux. If we used edge-triggered events,
+ * this call would improperly discard them.
+ */
+ if (kevent(actx->mux, NULL, 0, &ev, 1, &timeout) < 0)
+ {
+ actx_error(actx, "could not comb kqueue: %m");
+ return false;
+ }
+
+ return true;
+#else
+#error comb_multiplexer is not implemented on this platform
+#endif
+}
+
/*
* Enables or disables the timer in the multiplexer set. The timeout value is
* in milliseconds (negative values disable the timer).
@@ -2755,13 +2802,21 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
if (status == PGRES_POLLING_FAILED)
goto error_return;
- else if (status != PGRES_POLLING_OK)
- {
- /* not done yet */
- return status;
- }
+ else if (status == PGRES_POLLING_OK)
+ break; /* done! */
- break;
+ /*
+ * This request is still running.
+ *
+ * Make sure that stale events don't cause us to come back
+ * early. (Currently, this can occur only with kqueue.) If
+ * this is forgotten, the multiplexer can get stuck in a
+ * signaled state and we'll burn CPU cycles pointlessly.
+ */
+ if (!comb_multiplexer(actx))
+ goto error_return;
+
+ return status;
}
case OAUTH_STEP_WAIT_INTERVAL:
--
2.34.1
v4-0002-oauth-Ensure-unused-socket-registrations-are-remo.patchapplication/octet-stream; name=v4-0002-oauth-Ensure-unused-socket-registrations-are-remo.patchDownload
From a34be19f17fcf7a15d64fc241c73ead993ba89e6 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Wed, 6 Aug 2025 15:18:08 -0700
Subject: [PATCH v4 2/5] oauth: Ensure unused socket registrations are removed
If Curl needs to switch the direction of a socket's registration (e.g.
from CURL_POLL_IN to CURL_POLL_OUT), it expects the old registration to
be discarded. For epoll, this happened via EPOLL_CTL_MOD, but for
kqueue, the old registration would remain if it was not explicitly
removed by Curl.
Explicitly remove the opposite-direction event during registrations. (If
that event doesn't exist, we'll just get an ENOENT, which will be
ignored by the same code that handles CURL_POLL_REMOVE.) A few
assertions are also added to strengthen the relationship between the
number of events added, the number of events pulled off the queue, and
the lengths of the kevent arrays.
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Backpatch-through: 18
Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
---
src/interfaces/libpq-oauth/oauth-curl.c | 22 ++++++++++++++--------
1 file changed, 14 insertions(+), 8 deletions(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index 433135cb86d..97c33299a79 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -1291,22 +1291,31 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
return 0;
#elif defined(HAVE_SYS_EVENT_H)
- struct kevent ev[2] = {0};
+ struct kevent ev[2];
struct kevent ev_out[2];
struct timespec timeout = {0};
int nev = 0;
int res;
+ /*
+ * We don't know which of the events is currently registered, perhaps
+ * both, so we always try to remove unneeded events. This means we need to
+ * tolerate ENOENT below.
+ */
switch (what)
{
case CURL_POLL_IN:
EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, 0);
nev++;
+ EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+ nev++;
break;
case CURL_POLL_OUT:
EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD | EV_RECEIPT, 0, 0, 0);
nev++;
+ EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+ nev++;
break;
case CURL_POLL_INOUT:
@@ -1317,12 +1326,6 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
break;
case CURL_POLL_REMOVE:
-
- /*
- * We don't know which of these is currently registered, perhaps
- * both, so we try to remove both. This means we need to tolerate
- * ENOENT below.
- */
EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0);
nev++;
EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0);
@@ -1334,7 +1337,10 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
return -1;
}
- res = kevent(actx->mux, ev, nev, ev_out, lengthof(ev_out), &timeout);
+ Assert(nev <= lengthof(ev));
+ Assert(nev <= lengthof(ev_out));
+
+ res = kevent(actx->mux, ev, nev, ev_out, nev, &timeout);
if (res < 0)
{
actx_error(actx, "could not modify kqueue: %m");
--
2.34.1
v4-0003-oauth-Remove-expired-timers-from-the-multiplexer.patchapplication/octet-stream; name=v4-0003-oauth-Remove-expired-timers-from-the-multiplexer.patchDownload
From 7408778d57995eba1c4d82acf96497f5cd48f82c Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Wed, 5 Mar 2025 13:16:48 -0800
Subject: [PATCH v4 3/5] oauth: Remove expired timers from the multiplexer
In a case similar to the previous commit, an expired timer can remain
permanently readable if Curl does not remove the timeout itself. Since
that removal isn't guaranteed to happen in real-world situations,
implement drain_timer_events() to reset the timer before calling into
drive_request().
Moving to drain_timer_events() happens to fix a logic bug in the
previous caller of timer_expired(), which treated an error condition as
if the timer were expired instead of bailing out.
The previous implementation of timer_expired() gave differing results
for epoll and kqueue if the timer was reset. (For epoll, a reset timer
was considered to be expired, and for kqueue it was not.) This didn't
previously cause problems, since timer_expired() was only called while
the timer was known to be set, but both implementations now use the
kqueue logic.
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Backpatch-through: 18
Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
---
src/interfaces/libpq-oauth/oauth-curl.c | 108 +++++++++++++++---------
1 file changed, 68 insertions(+), 40 deletions(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index 97c33299a79..aa5d2bfd96c 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -1536,40 +1536,20 @@ set_timer(struct async_ctx *actx, long timeout)
/*
* Returns 1 if the timeout in the multiplexer set has expired since the last
- * call to set_timer(), 0 if the timer is still running, or -1 (with an
- * actx_error() report) if the timer cannot be queried.
+ * call to set_timer(), 0 if the timer is either still running or disarmed, or
+ * -1 (with an actx_error() report) if the timer cannot be queried.
*/
static int
timer_expired(struct async_ctx *actx)
{
-#if defined(HAVE_SYS_EPOLL_H)
- struct itimerspec spec = {0};
-
- if (timerfd_gettime(actx->timerfd, &spec) < 0)
- {
- actx_error(actx, "getting timerfd value: %m");
- return -1;
- }
-
- /*
- * This implementation assumes we're using single-shot timers. If you
- * change to using intervals, you'll need to reimplement this function
- * too, possibly with the read() or select() interfaces for timerfd.
- */
- Assert(spec.it_interval.tv_sec == 0
- && spec.it_interval.tv_nsec == 0);
-
- /* If the remaining time to expiration is zero, we're done. */
- return (spec.it_value.tv_sec == 0
- && spec.it_value.tv_nsec == 0);
-#elif defined(HAVE_SYS_EVENT_H)
+#if defined(HAVE_SYS_EPOLL_H) || defined(HAVE_SYS_EVENT_H)
int res;
- /* Is the timer queue ready? */
+ /* Is the timer ready? */
res = PQsocketPoll(actx->timerfd, 1 /* forRead */ , 0, 0);
if (res < 0)
{
- actx_error(actx, "checking kqueue for timeout: %m");
+ actx_error(actx, "checking timer expiration: %m");
return -1;
}
@@ -1601,6 +1581,36 @@ register_timer(CURLM *curlm, long timeout, void *ctx)
return 0;
}
+/*
+ * Removes any expired-timer event from the multiplexer. If was_expired is not
+ * NULL, it will contain whether or not the timer was expired at time of call.
+ */
+static bool
+drain_timer_events(struct async_ctx *actx, bool *was_expired)
+{
+ int res;
+
+ res = timer_expired(actx);
+ if (res < 0)
+ return false;
+
+ if (res > 0)
+ {
+ /*
+ * Timer is expired. We could drain the event manually from the
+ * timerfd, but it's easier to simply disable it; that keeps the
+ * platform-specific code in set_timer().
+ */
+ if (!set_timer(actx, -1))
+ return false;
+ }
+
+ if (was_expired)
+ *was_expired = (res > 0);
+
+ return true;
+}
+
/*
* Prints Curl request debugging information to stderr.
*
@@ -2804,6 +2814,22 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
{
PostgresPollingStatusType status;
+ /*
+ * Clear any expired timeout before calling back into
+ * Curl. Curl is not guaranteed to do this for us, because
+ * its API expects us to use single-shot (i.e.
+ * edge-triggered) timeouts, and ours are level-triggered
+ * via the mux.
+ *
+ * This can't be combined with the comb_multiplexer() call
+ * below: we might accidentally clear a short timeout that
+ * was both set and expired during the call to
+ * drive_request().
+ */
+ if (!drain_timer_events(actx, NULL))
+ goto error_return;
+
+ /* Move the request forward. */
status = drive_request(actx);
if (status == PGRES_POLLING_FAILED)
@@ -2826,24 +2852,26 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
}
case OAUTH_STEP_WAIT_INTERVAL:
-
- /*
- * The client application is supposed to wait until our timer
- * expires before calling PQconnectPoll() again, but that
- * might not happen. To avoid sending a token request early,
- * check the timer before continuing.
- */
- if (!timer_expired(actx))
{
- set_conn_altsock(conn, actx->timerfd);
- return PGRES_POLLING_READING;
- }
+ bool expired;
- /* Disable the expired timer. */
- if (!set_timer(actx, -1))
- goto error_return;
+ /*
+ * The client application is supposed to wait until our
+ * timer expires before calling PQconnectPoll() again, but
+ * that might not happen. To avoid sending a token request
+ * early, check the timer before continuing.
+ */
+ if (!drain_timer_events(actx, &expired))
+ goto error_return;
- break;
+ if (!expired)
+ {
+ set_conn_altsock(conn, actx->timerfd);
+ return PGRES_POLLING_READING;
+ }
+
+ break;
+ }
}
/*
--
2.34.1
v4-0004-oauth-Track-total-call-count-during-a-client-flow.patchapplication/octet-stream; name=v4-0004-oauth-Track-total-call-count-during-a-client-flow.patchDownload
From 8241255e84c20000446c8fb2786999b9a0c1dd5c Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Fri, 6 Jun 2025 15:22:41 -0700
Subject: [PATCH v4 4/5] oauth: Track total call count during a client flow
Tracking down the bugs that led to the addition of comb_multiplexer()
and drain_timer_events() was difficult, because an inefficient flow is
not visibly different from one that is working properly. To help
maintainers notice when something has gone wrong, track the number of
calls into the flow as part of debug mode, and print the total when the
flow finishes.
A new test makes sure the total count is less than 100. (We expect
something on the order of 10.) This isn't foolproof, but it is able to
catch several regressions in the logic of the prior two commits, and
future work to add TLS support to the oauth_validator test server should
strengthen it as well.
Backpatch-through: 18
Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
---
src/interfaces/libpq-oauth/oauth-curl.c | 22 +++++++++++++
.../modules/oauth_validator/t/001_server.pl | 31 ++++++++++++++++++-
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index aa5d2bfd96c..aa50b00d053 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -278,6 +278,7 @@ struct async_ctx
bool user_prompted; /* have we already sent the authz prompt? */
bool used_basic_auth; /* did we send a client secret? */
bool debugging; /* can we give unsafe developer assistance? */
+ int dbg_num_calls; /* (debug mode) how many times were we called? */
};
/*
@@ -3021,6 +3022,8 @@ PostgresPollingStatusType
pg_fe_run_oauth_flow(PGconn *conn)
{
PostgresPollingStatusType result;
+ fe_oauth_state *state = conn_sasl_state(conn);
+ struct async_ctx *actx;
#ifndef WIN32
sigset_t osigset;
bool sigpipe_pending;
@@ -3049,6 +3052,25 @@ pg_fe_run_oauth_flow(PGconn *conn)
result = pg_fe_run_oauth_flow_impl(conn);
+ /*
+ * To assist with finding bugs in comb_multiplexer() and
+ * drain_timer_events(), when we're in debug mode, track the total number
+ * of calls to this function and print that at the end of the flow.
+ *
+ * Be careful that state->async_ctx could be NULL if early initialization
+ * fails during the first call.
+ */
+ actx = state->async_ctx;
+ Assert(actx || result == PGRES_POLLING_FAILED);
+
+ if (actx && actx->debugging)
+ {
+ actx->dbg_num_calls++;
+ if (result == PGRES_POLLING_OK || result == PGRES_POLLING_FAILED)
+ fprintf(stderr, "[libpq] total number of polls: %d\n",
+ actx->dbg_num_calls);
+ }
+
#ifndef WIN32
if (masked)
{
diff --git a/src/test/modules/oauth_validator/t/001_server.pl b/src/test/modules/oauth_validator/t/001_server.pl
index 41672ebd5c6..c0dafb8be76 100644
--- a/src/test/modules/oauth_validator/t/001_server.pl
+++ b/src/test/modules/oauth_validator/t/001_server.pl
@@ -418,6 +418,35 @@ $node->connect_fails(
qr/failed to obtain access token: mutual TLS required for client \(invalid_client\)/
);
+# Count the number of calls to the internal flow when multiple retries are
+# triggered. The exact number depends on many things -- the TCP stack, the
+# version of Curl in use, random chance -- but a ridiculously high number
+# suggests something is wrong with our ability to clear multiplexer events after
+# they're no longer applicable.
+my ($ret, $stdout, $stderr) = $node->psql(
+ 'postgres',
+ "SELECT 'connected for call count'",
+ extra_params => ['-w'],
+ connstr => connstr(stage => 'token', retries => 2),
+ on_error_stop => 0);
+
+is($ret, 0, "call count connection succeeds");
+like(
+ $stderr,
+ qr@Visit https://example\.com/ and enter the code: postgresuser@,
+ "call count: stderr matches");
+
+my $count_pattern = qr/\[libpq\] total number of polls: (\d+)/;
+if (like($stderr, $count_pattern, "call count: count is printed"))
+{
+ # For reference, a typical flow with two retries might take between 5-15
+ # calls to the client implementation. And while this will probably continue
+ # to change across OSes and Curl updates, we're likely in trouble if we see
+ # hundreds or thousands of calls.
+ $stderr =~ $count_pattern;
+ cmp_ok($1, '<', 100, "call count is reasonably small");
+}
+
# Stress test: make sure our builtin flow operates correctly even if the client
# application isn't respecting PGRES_POLLING_READING/WRITING signals returned
# from PQconnectPoll().
@@ -428,7 +457,7 @@ my @cmd = (
connstr(stage => 'all', retries => 1, interval => 1));
note "running '" . join("' '", @cmd) . "'";
-my ($stdout, $stderr) = run_command(\@cmd);
+($stdout, $stderr) = run_command(\@cmd);
like($stdout, qr/connection succeeded/, "stress-async: stdout matches");
unlike(
--
2.34.1
v4-0005-oauth-Add-unit-tests-for-multiplexer-handling.patchapplication/octet-stream; name=v4-0005-oauth-Add-unit-tests-for-multiplexer-handling.patchDownload
From 337124064f37eccfb5d2f9ba27edbf70e945aeb2 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Wed, 5 Mar 2025 15:04:34 -0800
Subject: [PATCH v4 5/5] oauth: Add unit tests for multiplexer handling
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
To better record the internal behaviors of oauth-curl.c, add a unit test
suite for the socket and timer handling code. This is all based on TAP
and driven by our existing Test::More infrastructure.
Reviewed-by: Dagfinn Ilmari Mannsåker <ilmari@ilmari.org>
Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
---
src/interfaces/libpq-oauth/Makefile | 14 +
src/interfaces/libpq-oauth/meson.build | 35 ++
src/interfaces/libpq-oauth/t/001_oauth.pl | 24 +
src/interfaces/libpq-oauth/test-oauth-curl.c | 527 +++++++++++++++++++
4 files changed, 600 insertions(+)
create mode 100644 src/interfaces/libpq-oauth/t/001_oauth.pl
create mode 100644 src/interfaces/libpq-oauth/test-oauth-curl.c
diff --git a/src/interfaces/libpq-oauth/Makefile b/src/interfaces/libpq-oauth/Makefile
index 682f17413b3..e73573694b9 100644
--- a/src/interfaces/libpq-oauth/Makefile
+++ b/src/interfaces/libpq-oauth/Makefile
@@ -79,5 +79,19 @@ uninstall:
rm -f '$(DESTDIR)$(libdir)/$(stlib)'
rm -f '$(DESTDIR)$(libdir)/$(shlib)'
+.PHONY: all-tests
+all-tests: oauth_tests$(X)
+
+oauth_tests$(X): test-oauth-curl.o oauth-utils.o $(WIN32RES) | submake-libpgport submake-libpq
+ $(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(SHLIB_LINK) -o $@
+
+check: all-tests
+ $(prove_check)
+
+installcheck: all-tests
+ $(prove_installcheck)
+
clean distclean: clean-lib
rm -f $(OBJS) $(OBJS_STATIC) $(OBJS_SHLIB)
+ rm -f test-oauth-curl.o oauth_tests$(X)
+ rm -rf tmp_check
diff --git a/src/interfaces/libpq-oauth/meson.build b/src/interfaces/libpq-oauth/meson.build
index df064c59a40..505e1671b86 100644
--- a/src/interfaces/libpq-oauth/meson.build
+++ b/src/interfaces/libpq-oauth/meson.build
@@ -47,3 +47,38 @@ libpq_oauth_so = shared_module(libpq_oauth_name,
link_args: export_fmt.format(export_file.full_path()),
kwargs: default_lib_args,
)
+
+libpq_oauth_test_deps = []
+
+oauth_test_sources = files('test-oauth-curl.c') + libpq_oauth_so_sources
+
+if host_system == 'windows'
+ oauth_test_sources += rc_bin_gen.process(win32ver_rc, extra_args: [
+ '--NAME', 'oauth_tests',
+ '--FILEDESC', 'OAuth unit test program',])
+endif
+
+libpq_oauth_test_deps += executable('oauth_tests',
+ oauth_test_sources,
+ dependencies: [frontend_shlib_code, libpq, libpq_oauth_deps],
+ kwargs: default_bin_args + {
+ 'c_args': default_bin_args.get('c_args', []) + libpq_oauth_so_c_args,
+ 'c_pch': pch_postgres_fe_h,
+ 'include_directories': [libpq_inc, postgres_inc],
+ 'install': false,
+ }
+)
+
+testprep_targets += libpq_oauth_test_deps
+
+tests += {
+ 'name': 'libpq-oauth',
+ 'sd': meson.current_source_dir(),
+ 'bd': meson.current_build_dir(),
+ 'tap': {
+ 'tests': [
+ 't/001_oauth.pl',
+ ],
+ 'deps': libpq_oauth_test_deps,
+ },
+}
diff --git a/src/interfaces/libpq-oauth/t/001_oauth.pl b/src/interfaces/libpq-oauth/t/001_oauth.pl
new file mode 100644
index 00000000000..5af6c860768
--- /dev/null
+++ b/src/interfaces/libpq-oauth/t/001_oauth.pl
@@ -0,0 +1,24 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Defer entirely to the oauth_tests executable. stdout/err is routed through
+# Test::More so that our logging infrastructure can handle it correctly. Using
+# IPC::Run::new_chunker seems to help interleave the two streams a little better
+# than without.
+#
+# TODO: prove can also deal with native executables itself, which we could
+# probably make use of via PROVE_TESTS on the Makefile side. But the Meson setup
+# calls Perl directly, which would require more code to work around... and
+# there's still the matter of logging.
+my $builder = Test::More->builder;
+my $out = $builder->output;
+my $err = $builder->failure_output;
+
+IPC::Run::run ['oauth_tests'],
+ '>', IPC::Run::new_chunker, sub { $out->print($_[0]) },
+ '2>', IPC::Run::new_chunker, sub { $err->print($_[0]) }
+ or die "oauth_tests returned $?";
diff --git a/src/interfaces/libpq-oauth/test-oauth-curl.c b/src/interfaces/libpq-oauth/test-oauth-curl.c
new file mode 100644
index 00000000000..8263aff2f4a
--- /dev/null
+++ b/src/interfaces/libpq-oauth/test-oauth-curl.c
@@ -0,0 +1,527 @@
+/*
+ * test-oauth-curl.c
+ *
+ * A unit test driver for libpq-oauth. This #includes oauth-curl.c, which lets
+ * the tests reference static functions and other internals.
+ *
+ * USE_ASSERT_CHECKING is required, to make it easy for tests to wrap
+ * must-succeed code as part of test setup.
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ */
+
+#include "oauth-curl.c"
+
+#include <fcntl.h>
+
+#ifdef USE_ASSERT_CHECKING
+
+/*
+ * TAP Helpers
+ */
+
+static int num_tests = 0;
+
+/*
+ * Reports ok/not ok to the TAP stream on stdout.
+ */
+#define ok(OK, TEST) \
+ ok_impl(OK, TEST, #OK, __FILE__, __LINE__)
+
+static bool
+ok_impl(bool ok, const char *test, const char *teststr, const char *file, int line)
+{
+ printf("%sok %d - %s\n", ok ? "" : "not ", ++num_tests, test);
+
+ if (!ok)
+ {
+ printf("# at %s:%d:\n", file, line);
+ printf("# expression is false: %s\n", teststr);
+ }
+
+ return ok;
+}
+
+/*
+ * Like ok(this == that), but with more diagnostics on failure.
+ *
+ * Only works on ints, but luckily that's all we need here. Note that the much
+ * simpler-looking macro implementation
+ *
+ * is_diag(ok(THIS == THAT, TEST), THIS, #THIS, THAT, #THAT)
+ *
+ * suffers from multiple evaluation of the macro arguments...
+ */
+#define is(THIS, THAT, TEST) \
+ do { \
+ int this_ = (THIS), \
+ that_ = (THAT); \
+ is_diag( \
+ ok_impl(this_ == that_, TEST, #THIS " == " #THAT, __FILE__, __LINE__), \
+ this_, #THIS, that_, #THAT \
+ ); \
+ } while (0)
+
+static bool
+is_diag(bool ok, int this, const char *thisstr, int that, const char *thatstr)
+{
+ if (!ok)
+ printf("# %s = %d; %s = %d\n", thisstr, this, thatstr, that);
+
+ return ok;
+}
+
+/*
+ * Utilities
+ */
+
+/*
+ * Creates a partially-initialized async_ctx for the purposes of testing. Free
+ * with free_test_actx().
+ */
+static struct async_ctx *
+init_test_actx(void)
+{
+ struct async_ctx *actx;
+
+ actx = calloc(1, sizeof(*actx));
+ Assert(actx);
+
+ actx->mux = PGINVALID_SOCKET;
+ actx->timerfd = -1;
+ actx->debugging = true;
+
+ initPQExpBuffer(&actx->errbuf);
+
+ Assert(setup_multiplexer(actx));
+
+ return actx;
+}
+
+static void
+free_test_actx(struct async_ctx *actx)
+{
+ termPQExpBuffer(&actx->errbuf);
+
+ if (actx->mux != PGINVALID_SOCKET)
+ close(actx->mux);
+ if (actx->timerfd >= 0)
+ close(actx->timerfd);
+
+ free(actx);
+}
+
+static char dummy_buf[4 * 1024]; /* for fill_pipe/drain_pipe */
+
+/*
+ * Writes to the write side of a pipe until it won't take any more data. Returns
+ * the amount written.
+ */
+static ssize_t
+fill_pipe(int fd)
+{
+ int mode;
+ ssize_t written = 0;
+
+ /* Don't block. */
+ Assert((mode = fcntl(fd, F_GETFL)) != -1);
+ Assert(fcntl(fd, F_SETFL, mode | O_NONBLOCK) == 0);
+
+ while (true)
+ {
+ ssize_t w;
+
+ w = write(fd, dummy_buf, sizeof(dummy_buf));
+ if (w < 0)
+ {
+ if (errno != EAGAIN && errno != EWOULDBLOCK)
+ {
+ perror("write to pipe");
+ written = -1;
+ }
+ break;
+ }
+
+ written += w;
+ }
+
+ /* Reset the descriptor flags. */
+ Assert(fcntl(fd, F_SETFD, mode) == 0);
+
+ return written;
+}
+
+/*
+ * Drains the requested amount of data from the read side of a pipe.
+ */
+static bool
+drain_pipe(int fd, ssize_t n)
+{
+ Assert(n > 0);
+
+ while (n)
+ {
+ size_t to_read = (n <= sizeof(dummy_buf)) ? n : sizeof(dummy_buf);
+ ssize_t drained;
+
+ drained = read(fd, dummy_buf, to_read);
+ if (drained < 0)
+ {
+ perror("read from pipe");
+ return false;
+ }
+
+ n -= drained;
+ }
+
+ return true;
+}
+
+/*
+ * Tests whether the multiplexer is marked ready by the deadline. This is a
+ * macro so that file/line information makes sense during failures.
+ *
+ * NB: our current multiplexer implementations (epoll/kqueue) are *readable*
+ * when the underlying libcurl sockets are *writable*. This behavior is pinned
+ * here to record that expectation; PGRES_POLLING_READING is hardcoded
+ * throughout the flow and would need to be changed if a new multiplexer does
+ * something different.
+ */
+#define mux_is_ready(MUX, DEADLINE, TEST) \
+ do { \
+ int res_ = PQsocketPoll(MUX, 1, 0, DEADLINE); \
+ Assert(res_ != -1); \
+ ok(res_ > 0, "multiplexer is ready " TEST); \
+ } while (0)
+
+/*
+ * The opposite of mux_is_ready().
+ */
+#define mux_is_not_ready(MUX, TEST) \
+ do { \
+ int res_ = PQsocketPoll(MUX, 1, 0, 0); \
+ Assert(res_ != -1); \
+ is(res_, 0, "multiplexer is not ready " TEST); \
+ } while (0)
+
+/*
+ * Test Suites
+ */
+
+/* Per-suite timeout. Set via the PG_TEST_TIMEOUT_DEFAULT envvar. */
+static pg_usec_time_t timeout_us = 180 * 1000 * 1000;
+
+static void
+test_set_timer(void)
+{
+ struct async_ctx *actx = init_test_actx();
+ const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
+
+ printf("# test_set_timer\n");
+
+ /* A zero-duration timer should result in a near-immediate ready signal. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer expires");
+ is(timer_expired(actx), 1, "timer_expired() returns 1 when timer expires");
+
+ /* Resetting the timer far in the future should unset the ready signal. */
+ Assert(set_timer(actx, INT_MAX));
+ mux_is_not_ready(actx->mux, "when timer is reset to the future");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 with unexpired timer");
+
+ /* Setting another zero-duration timer should override the previous one. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer is re-expired");
+ is(timer_expired(actx), 1, "timer_expired() returns 1 when timer is re-expired");
+
+ /* And disabling that timer should once again unset the ready signal. */
+ Assert(set_timer(actx, -1));
+ mux_is_not_ready(actx->mux, "when timer is unset");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 when timer is unset");
+
+ {
+ bool expired;
+
+ /* Make sure drain_timer_events() functions correctly as well. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer is re-expired (drain_timer_events)");
+
+ Assert(drain_timer_events(actx, &expired));
+ mux_is_not_ready(actx->mux, "when timer is drained after expiring");
+ is(expired, 1, "drain_timer_events() reports expiration");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 after timer is drained");
+
+ /* A second drain should do nothing. */
+ Assert(drain_timer_events(actx, &expired));
+ mux_is_not_ready(actx->mux, "when timer is drained a second time");
+ is(expired, 0, "drain_timer_events() reports no expiration");
+ is(timer_expired(actx), 0, "timer_expired() still returns 0");
+ }
+
+ free_test_actx(actx);
+}
+
+static void
+test_register_socket(void)
+{
+ struct async_ctx *actx = init_test_actx();
+ int pipefd[2];
+ int rfd,
+ wfd;
+ bool bidirectional;
+
+ /* Create a local pipe for communication. */
+ Assert(pipe(pipefd) == 0);
+ rfd = pipefd[0];
+ wfd = pipefd[1];
+
+ /*
+ * Some platforms (FreeBSD) implement bidirectional pipes, affecting the
+ * behavior of some of these tests. Store that knowledge for later.
+ */
+ bidirectional = PQsocketPoll(rfd /* read */ , 0, 1 /* write */ , 0) > 0;
+
+ /*
+ * This suite runs twice -- once using CURL_POLL_IN/CURL_POLL_OUT for
+ * read/write operations, respectively, and once using CURL_POLL_INOUT for
+ * both sides.
+ */
+ for (int inout = 0; inout < 2; inout++)
+ {
+ const int in_event = inout ? CURL_POLL_INOUT : CURL_POLL_IN;
+ const int out_event = inout ? CURL_POLL_INOUT : CURL_POLL_OUT;
+ const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
+ size_t bidi_pipe_size = 0; /* silence compiler warnings */
+
+ printf("# test_register_socket %s\n", inout ? "(INOUT)" : "");
+
+ /*
+ * At the start of the test, the read side should be blocked and the
+ * write side should be open. (There's a mistake at the end of this
+ * loop otherwise.)
+ */
+ Assert(PQsocketPoll(rfd, 1, 0, 0) == 0);
+ Assert(PQsocketPoll(wfd, 0, 1, 0) > 0);
+
+ /*
+ * For bidirectional systems, emulate unidirectional behavior here by
+ * filling up the "read side" of the pipe.
+ */
+ if (bidirectional)
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+
+ /* Listen on the read side. The multiplexer shouldn't be ready yet. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when fd is not readable");
+
+ /* Writing to the pipe should result in a read-ready multiplexer. */
+ Assert(write(wfd, "x", 1) == 1);
+ mux_is_ready(actx->mux, deadline, "when fd is readable");
+
+ /*
+ * Update the registration to wait on write events instead. The
+ * multiplexer should be unset.
+ */
+ Assert(register_socket(NULL, rfd, CURL_POLL_OUT, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when waiting for writes on readable fd");
+
+ /* Re-register for read events. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when waiting for reads again");
+
+ /* Stop listening. The multiplexer should be unset. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when readable fd is removed");
+
+ /* Listen again. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when readable fd is re-added");
+
+ /*
+ * Draining the pipe should unset the multiplexer again, once the old
+ * event is cleared.
+ */
+ Assert(drain_pipe(rfd, 1));
+ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when fd is drained");
+
+ /* Undo any unidirectional emulation. */
+ if (bidirectional)
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+
+ /* Listen on the write side. An empty buffer should be writable. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when fd is writable");
+
+ /* As above, wait on read events instead. */
+ Assert(register_socket(NULL, wfd, CURL_POLL_IN, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when waiting for reads on writable fd");
+
+ /* Re-register for write events. */
+ Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when waiting for writes again");
+
+ {
+ ssize_t written;
+
+ /*
+ * Fill the pipe. Once the old writable event is cleared, the mux
+ * should not be ready.
+ */
+ Assert((written = fill_pipe(wfd)) > 0);
+ printf("# pipe buffer is full at %zd bytes\n", written);
+
+ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when fd buffer is full");
+
+ /* Drain the pipe again. */
+ Assert(drain_pipe(rfd, written));
+ mux_is_ready(actx->mux, deadline, "when fd buffer is drained");
+ }
+
+ /* Stop listening. */
+ Assert(register_socket(NULL, wfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when fd is removed");
+
+ /* Make sure an expired timer doesn't interfere with event draining. */
+ {
+ bool expired;
+
+ /* Make the rfd appear unidirectional if necessary. */
+ if (bidirectional)
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+
+ /* Set the timer and wait for it to expire. */
+ Assert(set_timer(actx, 0));
+ Assert(PQsocketPoll(actx->timerfd, 1, 0, deadline) > 0);
+ is(timer_expired(actx), 1, "timer is expired");
+
+ /* Register for read events and make the fd readable. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ Assert(write(wfd, "x", 1) == 1);
+ mux_is_ready(actx->mux, deadline, "when fd is readable and timer expired");
+
+ /*
+ * Draining the pipe should unset the multiplexer again, once the
+ * old event is drained and the timer is reset.
+ *
+ * Order matters, since comb_multiplexer() doesn't have to remove
+ * stale events when active events exist. Follow the call sequence
+ * used in the code: drain the timer expiration, drain the pipe,
+ * then clear the stale events.
+ */
+ Assert(drain_timer_events(actx, &expired));
+ Assert(drain_pipe(rfd, 1));
+ Assert(comb_multiplexer(actx));
+
+ is(expired, 1, "drain_timer_events() reports expiration");
+ is(timer_expired(actx), 0, "timer is no longer expired");
+ mux_is_not_ready(actx->mux, "when fd is drained and timer reset");
+
+ /* Stop listening. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+
+ /* Undo any unidirectional emulation. */
+ if (bidirectional)
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+ }
+
+ /* Ensure comb_multiplexer() can handle multiple stale events. */
+ {
+ int rfd2,
+ wfd2;
+
+ /* Create a second local pipe. */
+ Assert(pipe(pipefd) == 0);
+ rfd2 = pipefd[0];
+ wfd2 = pipefd[1];
+
+ /* Make both rfds appear unidirectional if necessary. */
+ if (bidirectional)
+ {
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+ Assert(fill_pipe(rfd2) == bidi_pipe_size);
+ }
+
+ /* Register for read events on both fds, and make them readable. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ Assert(register_socket(NULL, rfd2, in_event, actx, NULL) == 0);
+
+ Assert(write(wfd, "x", 1) == 1);
+ Assert(write(wfd2, "x", 1) == 1);
+
+ mux_is_ready(actx->mux, deadline, "when two fds are readable");
+
+ /*
+ * Drain both fds. comb_multiplexer() should then ensure that the
+ * mux is no longer readable.
+ */
+ Assert(drain_pipe(rfd, 1));
+ Assert(drain_pipe(rfd2, 1));
+ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when two fds are drained");
+
+ /* Stop listening. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ Assert(register_socket(NULL, rfd2, CURL_POLL_REMOVE, actx, NULL) == 0);
+
+ /* Undo any unidirectional emulation. */
+ if (bidirectional)
+ {
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+ Assert(drain_pipe(wfd2, bidi_pipe_size));
+ }
+
+ close(rfd2);
+ close(wfd2);
+ }
+ }
+
+ close(rfd);
+ close(wfd);
+ free_test_actx(actx);
+}
+
+int
+main(int argc, char *argv[])
+{
+ const char *timeout;
+
+ /* Grab the default timeout. */
+ timeout = getenv("PG_TEST_TIMEOUT_DEFAULT");
+ if (timeout)
+ {
+ int timeout_s = atoi(timeout);
+
+ if (timeout_s > 0)
+ timeout_us = timeout_s * 1000 * 1000;
+ }
+
+ /*
+ * Set up line buffering for our output, to let stderr interleave in the
+ * log files.
+ */
+ setvbuf(stdout, NULL, PG_IOLBF, 0);
+
+ test_set_timer();
+ test_register_socket();
+
+ printf("1..%d\n", num_tests);
+ return 0;
+}
+
+#else /* !USE_ASSERT_CHECKING */
+
+/*
+ * Skip the test suite when we don't have assertions.
+ */
+int
+main(int argc, char *argv[])
+{
+ printf("1..0 # skip: cassert is not enabled\n");
+
+ return 0;
+}
+
+#endif /* USE_ASSERT_CHECKING */
--
2.34.1
On Fri, Aug 8, 2025 at 8:04 AM Jacob Champion
<jacob.champion@enterprisedb.com> wrote:
On Thu, Aug 7, 2025 at 11:11 AM Jacob Champion
<jacob.champion@enterprisedb.com> wrote:Thank you so much for the reviews!
Here is v4, with the feedback from both of you. 0001-0004 are planned
for backport; 0005 is slated for master only. Thanks again for the
reviews!
LGTM.
On Fri, Aug 8, 2025 at 3:39 AM Thomas Munro <thomas.munro@gmail.com> wrote:
LGTM.
Committed!
Thanks,
--Jacob
Jacob Champion <jacob.champion@enterprisedb.com> writes:
On Thu, Aug 7, 2025 at 9:35 AM Dagfinn Ilmari Mannsåker
<ilmari@ilmari.org> wrote:I haven't read the meat of the patch, but I have some comments on the
tests:Thanks for the review!
+IPC::Run::run ['oauth_tests'], + '>', IPC::Run::new_chunker, sub { print {$out} $_[0] }, + '2>', IPC::Run::new_chunker, sub { print {$err} $_[0] } + or die "oauth_tests returned $?";We've recently switched to using fat commas (=>) between options and
their arguments, and that includes the file redirections in IPC::Run.
Although not semantically meaningful, I'd also be tempted to put parens
around the argument list for each redirect, so it's clear that they go
together.I have two concerns:
- If I don't put parentheses around the list, the fat comma is
actively misleading.
- As far as I can tell, IPC::Run neither documents nor tests the
ability to pass a list here. (But the tests are a bit of a maze, so
please correct me if there is one.) My fear is that I'll be coupling
against an implementation detail if I write it that way.
So I'm leaning towards keeping it as-is, unless you know of a reason
that the list syntax is guaranteed to work, with the understanding
that it does diverge from what you authored in 19c6e92b1. But I don't
think any of those examples use filters, so I don't feel too bad about
the difference yet?
When I said "not semantically meaningful" I meant that the parens don't
change what gets passed to the function. In Perl, parens only serve to
override precedence and as visual grouping, they don't affect the
structure of the data at all.
To demonstrate:
$ perl -MJSON::PP=encode_json -E 'say encode_json([1, 2, 3])'
[1,2,3]
$ perl -MJSON::PP=encode_json -E 'say encode_json([1 => (2, 3)])'
[1,2,3]
Also, indirect object syntax (print {$fh} ...) is ugly and
old-fashioned, it's nicer to call it as a method on the filehandle.That is much nicer; I'll do that.
As for the C TAP tests, there's already a bunch of TAP-outputting
infrastructure in pg_regress.c. Would it make sense to factor that out
into a common library?Maybe if we got to rule-of-three, but I'd rather not make either
implementation compromise for the sake of the other. IMO, this is a
situation where a bad abstraction would be much costlier than the
duplication: TAP is lightweight, and I think the needs of a unit test
suite and the needs of a characterization test collector are very
different.
Fair enough.
--Jacob
- ilmari
On Fri, Aug 8, 2025 at 1:07 PM Dagfinn Ilmari Mannsåker
<ilmari@ilmari.org> wrote:
$ perl -MJSON::PP=encode_json -E 'say encode_json([1, 2, 3])'
[1,2,3]$ perl -MJSON::PP=encode_json -E 'say encode_json([1 => (2, 3)])'
[1,2,3]
I swear, this language.
But:
$ perl -MJSON::PP=encode_json -E 'say encode_json(1,2)'
Too many arguments for JSON::PP::encode_json at -e line 1, near "2)
$ perl -MJSON::PP=encode_json -E 'say encode_json((1,2))'
2
So what's going on there? (Google is not very helpful for these sorts
of Perl problems; I don't even know how to describe this.)
I had to revert the test for unrelated reasons [1]/messages/by-id/CAOYmi+nCkoh3zB+GkZad44=FNskwUg6F1kmuxqQZzng7Zgj5tw@mail.gmail.com, so if this is
indeed guaranteed to be safe then I can make the change in my next
attempt.
Thanks!
--Jacob
[1]: /messages/by-id/CAOYmi+nCkoh3zB+GkZad44=FNskwUg6F1kmuxqQZzng7Zgj5tw@mail.gmail.com
Jacob Champion <jacob.champion@enterprisedb.com> writes:
On Fri, Aug 8, 2025 at 1:07 PM Dagfinn Ilmari Mannsåker
<ilmari@ilmari.org> wrote:$ perl -MJSON::PP=encode_json -E 'say encode_json([1, 2, 3])'
[1,2,3]$ perl -MJSON::PP=encode_json -E 'say encode_json([1 => (2, 3)])'
[1,2,3]I swear, this language.
But:
$ perl -MJSON::PP=encode_json -E 'say encode_json(1,2)'
Too many arguments for JSON::PP::encode_json at -e line 1, near "2)
$ perl -MJSON::PP=encode_json -E 'say encode_json((1,2))'
2So what's going on there? (Google is not very helpful for these sorts
of Perl problems; I don't even know how to describe this.)
That's because encode_json has a prototype[1]https://perldoc.perl.org/perlsub#Prototypes, which changes how the
argument list is parsed: no longer just as a flat list of values like a
normal function. Specifically, it has a prototype of '$', which means
it only takes one argument, which is evaluated in scalar context. So
the first example is a syntax error, but in the second example the
parenthesised expression is the single argument. Becuse it's in scalar
context, the comma is actually the scalar comma operator, not the list
element separator, so the return value is the right-hand side of the
comma (just like in C), not the length of the would-be list.
Onfusingly, they are both 2 here (which is why 1,2,3,... are bad values
to use when exploring presedence/context issues in Perl, sorry for doing
that in my example). More clearly (fsvo):
$ perl -MJSON::PP=encode_json -E 'say encode_json((11,12))'
12
- ilmari
On Fri, Aug 8, 2025 at 2:16 PM Dagfinn Ilmari Mannsåker
<ilmari@ilmari.org> wrote:
That's because encode_json has a prototype[1], which changes how the
argument list is parsed: no longer just as a flat list of values like a
normal function. Specifically, it has a prototype of '$', which means
it only takes one argument, which is evaluated in scalar context. So
the first example is a syntax error, but in the second example the
parenthesised expression is the single argument. Becuse it's in scalar
context, the comma is actually the scalar comma operator, not the list
element separator, so the return value is the right-hand side of the
comma (just like in C), not the length of the would-be list.
ron-swanson-throws-away-computer.gif
Well, thank you for the explanation. I'll make that change.
--Jacob
On Fri, Aug 8, 2025 at 2:31 PM Jacob Champion
<jacob.champion@enterprisedb.com> wrote:
Well, thank you for the explanation. I'll make that change.
Done in v5.
v5-0001 is planned for backport to 18 once the freeze lifts. It
ensures that -lm is part of the link line for libpq-oauth, since the
module uses floor(). I probably wouldn't have ever noticed, except
that the new test executable, which uses the same link flags,
complained on Clang [1]/messages/by-id/CAOYmi+m=xY0P_uAzAP_884uF-GhQ3wrineGwc9AEnb6fYxVqVQ@mail.gmail.com.
(In that thread, I incorrectly said the problem was with "Meson
animals". The Meson side is fine, and both alligator and bushmaster
use Autoconf, so I'm not sure how I ended up with that idea.)
v5-0002 should fix the more general buildfarm failure that caused the
revert. The farm finds the new t/ subdirectory and starts running Make
on src/interfaces/libpq-oauth directly, bypassing the skip logic in
src/interfaces/Makefile. So I've wrapped the "standard" top-level
targets that build and install things in a conditional. The targets
that clean things up have been left alone, at Tom's suggestion in [1]/messages/by-id/CAOYmi+m=xY0P_uAzAP_884uF-GhQ3wrineGwc9AEnb6fYxVqVQ@mail.gmail.com.
Thanks,
--Jacob
[1]: /messages/by-id/CAOYmi+m=xY0P_uAzAP_884uF-GhQ3wrineGwc9AEnb6fYxVqVQ@mail.gmail.com
Attachments:
since-v4.diff.txttext/plain; charset=UTF-8; name=since-v4.diff.txtDownload
1: a515435d3b4 < -: ----------- oauth: Remove stale events from the kqueue multiplexer
2: a34be19f17f < -: ----------- oauth: Ensure unused socket registrations are removed
3: 7408778d579 < -: ----------- oauth: Remove expired timers from the multiplexer
4: 8241255e84c < -: ----------- oauth: Track total call count during a client flow
-: ----------- > 1: c9962268ef0 oauth: Always link with -lm for floor()
5: 337124064f3 ! 2: 2e36b329c76 oauth: Add unit tests for multiplexer handling
@@ Commit message
suite for the socket and timer handling code. This is all based on TAP
and driven by our existing Test::More infrastructure.
+ This commit is a replay of 1443b6c0e, which was reverted due to
+ buildfarm failures. Compared with that, this version protects the build
+ targets in the Makefile with a with_libcurl conditional, and it tweaks
+ the code style in 001_oauth.pl.
+
Reviewed-by: Dagfinn Ilmari Mannsåker <ilmari@ilmari.org>
Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
## src/interfaces/libpq-oauth/Makefile ##
-@@ src/interfaces/libpq-oauth/Makefile: uninstall:
- rm -f '$(DESTDIR)$(libdir)/$(stlib)'
- rm -f '$(DESTDIR)$(libdir)/$(shlib)'
+@@ src/interfaces/libpq-oauth/Makefile: SHLIB_EXPORTS = exports.txt
+ # Disable -bundle_loader on macOS.
+ BE_DLLLIBS =
+
+-# By default, a library without an SONAME doesn't get a static library, so we
+-# add it to the build explicitly.
+-all: all-lib all-static-lib
+-
+ # Shared library stuff
+ include $(top_srcdir)/src/Makefile.shlib
+
+@@ src/interfaces/libpq-oauth/Makefile: include $(top_srcdir)/src/Makefile.shlib
+ %_shlib.o: %.c %.o
+ $(CC) $(CFLAGS) $(CFLAGS_SL) $(CPPFLAGS) $(CPPFLAGS_SHLIB) -c $< -o $@
+.PHONY: all-tests
+all-tests: oauth_tests$(X)
@@ src/interfaces/libpq-oauth/Makefile: uninstall:
+oauth_tests$(X): test-oauth-curl.o oauth-utils.o $(WIN32RES) | submake-libpgport submake-libpq
+ $(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(SHLIB_LINK) -o $@
+
++#
++# Top-Level Targets
++#
++# The existence of a t/ folder induces the buildfarm to run Make directly on
++# this subdirectory, bypassing the recursion skip in src/interfaces/Makefile.
++# Wrap the standard build targets in a with_libcurl conditional to avoid
++# building OAuth code on platforms that haven't requested it. (The "clean"-style
++# targets remain available.)
++#
++
++ifeq ($(with_libcurl), yes)
++
++# By default, a library without an SONAME doesn't get a static library, so we
++# add it to the build explicitly.
++all: all-lib all-static-lib
++
+ # Ignore the standard rules for SONAME-less installation; we want both the
+ # static and shared libraries to go into libdir.
+ install: all installdirs $(stlib) $(shlib)
+@@ src/interfaces/libpq-oauth/Makefile: install: all installdirs $(stlib) $(shlib)
+ installdirs:
+ $(MKDIR_P) '$(DESTDIR)$(libdir)'
+
+check: all-tests
+ $(prove_check)
+
+installcheck: all-tests
+ $(prove_installcheck)
+
++endif # with_libcurl
++
+ uninstall:
+ rm -f '$(DESTDIR)$(libdir)/$(stlib)'
+ rm -f '$(DESTDIR)$(libdir)/$(shlib)'
+
clean distclean: clean-lib
rm -f $(OBJS) $(OBJS_STATIC) $(OBJS_SHLIB)
+ rm -f test-oauth-curl.o oauth_tests$(X)
@@ src/interfaces/libpq-oauth/t/001_oauth.pl (new)
+my $err = $builder->failure_output;
+
+IPC::Run::run ['oauth_tests'],
-+ '>', IPC::Run::new_chunker, sub { $out->print($_[0]) },
-+ '2>', IPC::Run::new_chunker, sub { $err->print($_[0]) }
++ '>' => (IPC::Run::new_chunker, sub { $out->print($_[0]) }),
++ '2>' => (IPC::Run::new_chunker, sub { $err->print($_[0]) })
+ or die "oauth_tests returned $?";
## src/interfaces/libpq-oauth/test-oauth-curl.c (new) ##
v5-0001-oauth-Always-link-with-lm-for-floor.patchapplication/octet-stream; name=v5-0001-oauth-Always-link-with-lm-for-floor.patchDownload
From c9962268ef0448bfc1173990a265138af786beba Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Mon, 11 Aug 2025 13:56:51 -0700
Subject: [PATCH v5 1/2] oauth: Always link with -lm for floor()
libpq-oauth uses floor() but did not link against libm. Since libpq
itself uses -lm, nothing in the buildfarm has had problems with
libpq-oauth yet, and it seems difficult to hit a failure in practice.
But commit 1443b6c0e attempted to add an executable based on
libpq-oauth, which ran into link-time failures with Clang due to this
omission. It seems prudent to fix this for both the module and the
executable simultaneously so that no one trips over it in the future.
This is a Makefile-only change. The Meson side already pulls in libm,
through the os_deps dependency.
Backpatch-through: 18
---
src/interfaces/libpq-oauth/Makefile | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/interfaces/libpq-oauth/Makefile b/src/interfaces/libpq-oauth/Makefile
index 682f17413b3..8819fa8650e 100644
--- a/src/interfaces/libpq-oauth/Makefile
+++ b/src/interfaces/libpq-oauth/Makefile
@@ -47,7 +47,7 @@ $(stlib): override OBJS += $(OBJS_STATIC)
$(stlib): $(OBJS_STATIC)
SHLIB_LINK_INTERNAL = $(libpq_pgport_shlib)
-SHLIB_LINK = $(LIBCURL_LDFLAGS) $(LIBCURL_LDLIBS) $(filter -lintl, $(LIBS))
+SHLIB_LINK = $(LIBCURL_LDFLAGS) $(LIBCURL_LDLIBS) $(filter -lintl -lm, $(LIBS))
SHLIB_PREREQS = submake-libpq
SHLIB_EXPORTS = exports.txt
--
2.34.1
v5-0002-oauth-Add-unit-tests-for-multiplexer-handling.patchapplication/octet-stream; name=v5-0002-oauth-Add-unit-tests-for-multiplexer-handling.patchDownload
From 2e36b329c76df96d6f0385b2859e7bca1bb1a376 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Tue, 12 Aug 2025 14:47:34 -0700
Subject: [PATCH v5 2/2] oauth: Add unit tests for multiplexer handling
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
To better record the internal behaviors of oauth-curl.c, add a unit test
suite for the socket and timer handling code. This is all based on TAP
and driven by our existing Test::More infrastructure.
This commit is a replay of 1443b6c0e, which was reverted due to
buildfarm failures. Compared with that, this version protects the build
targets in the Makefile with a with_libcurl conditional, and it tweaks
the code style in 001_oauth.pl.
Reviewed-by: Dagfinn Ilmari Mannsåker <ilmari@ilmari.org>
Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
---
src/interfaces/libpq-oauth/Makefile | 36 +-
src/interfaces/libpq-oauth/meson.build | 35 ++
src/interfaces/libpq-oauth/t/001_oauth.pl | 24 +
src/interfaces/libpq-oauth/test-oauth-curl.c | 527 +++++++++++++++++++
4 files changed, 618 insertions(+), 4 deletions(-)
create mode 100644 src/interfaces/libpq-oauth/t/001_oauth.pl
create mode 100644 src/interfaces/libpq-oauth/test-oauth-curl.c
diff --git a/src/interfaces/libpq-oauth/Makefile b/src/interfaces/libpq-oauth/Makefile
index 8819fa8650e..c8c38947ace 100644
--- a/src/interfaces/libpq-oauth/Makefile
+++ b/src/interfaces/libpq-oauth/Makefile
@@ -54,10 +54,6 @@ SHLIB_EXPORTS = exports.txt
# Disable -bundle_loader on macOS.
BE_DLLLIBS =
-# By default, a library without an SONAME doesn't get a static library, so we
-# add it to the build explicitly.
-all: all-lib all-static-lib
-
# Shared library stuff
include $(top_srcdir)/src/Makefile.shlib
@@ -66,6 +62,28 @@ include $(top_srcdir)/src/Makefile.shlib
%_shlib.o: %.c %.o
$(CC) $(CFLAGS) $(CFLAGS_SL) $(CPPFLAGS) $(CPPFLAGS_SHLIB) -c $< -o $@
+.PHONY: all-tests
+all-tests: oauth_tests$(X)
+
+oauth_tests$(X): test-oauth-curl.o oauth-utils.o $(WIN32RES) | submake-libpgport submake-libpq
+ $(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(SHLIB_LINK) -o $@
+
+#
+# Top-Level Targets
+#
+# The existence of a t/ folder induces the buildfarm to run Make directly on
+# this subdirectory, bypassing the recursion skip in src/interfaces/Makefile.
+# Wrap the standard build targets in a with_libcurl conditional to avoid
+# building OAuth code on platforms that haven't requested it. (The "clean"-style
+# targets remain available.)
+#
+
+ifeq ($(with_libcurl), yes)
+
+# By default, a library without an SONAME doesn't get a static library, so we
+# add it to the build explicitly.
+all: all-lib all-static-lib
+
# Ignore the standard rules for SONAME-less installation; we want both the
# static and shared libraries to go into libdir.
install: all installdirs $(stlib) $(shlib)
@@ -75,9 +93,19 @@ install: all installdirs $(stlib) $(shlib)
installdirs:
$(MKDIR_P) '$(DESTDIR)$(libdir)'
+check: all-tests
+ $(prove_check)
+
+installcheck: all-tests
+ $(prove_installcheck)
+
+endif # with_libcurl
+
uninstall:
rm -f '$(DESTDIR)$(libdir)/$(stlib)'
rm -f '$(DESTDIR)$(libdir)/$(shlib)'
clean distclean: clean-lib
rm -f $(OBJS) $(OBJS_STATIC) $(OBJS_SHLIB)
+ rm -f test-oauth-curl.o oauth_tests$(X)
+ rm -rf tmp_check
diff --git a/src/interfaces/libpq-oauth/meson.build b/src/interfaces/libpq-oauth/meson.build
index df064c59a40..505e1671b86 100644
--- a/src/interfaces/libpq-oauth/meson.build
+++ b/src/interfaces/libpq-oauth/meson.build
@@ -47,3 +47,38 @@ libpq_oauth_so = shared_module(libpq_oauth_name,
link_args: export_fmt.format(export_file.full_path()),
kwargs: default_lib_args,
)
+
+libpq_oauth_test_deps = []
+
+oauth_test_sources = files('test-oauth-curl.c') + libpq_oauth_so_sources
+
+if host_system == 'windows'
+ oauth_test_sources += rc_bin_gen.process(win32ver_rc, extra_args: [
+ '--NAME', 'oauth_tests',
+ '--FILEDESC', 'OAuth unit test program',])
+endif
+
+libpq_oauth_test_deps += executable('oauth_tests',
+ oauth_test_sources,
+ dependencies: [frontend_shlib_code, libpq, libpq_oauth_deps],
+ kwargs: default_bin_args + {
+ 'c_args': default_bin_args.get('c_args', []) + libpq_oauth_so_c_args,
+ 'c_pch': pch_postgres_fe_h,
+ 'include_directories': [libpq_inc, postgres_inc],
+ 'install': false,
+ }
+)
+
+testprep_targets += libpq_oauth_test_deps
+
+tests += {
+ 'name': 'libpq-oauth',
+ 'sd': meson.current_source_dir(),
+ 'bd': meson.current_build_dir(),
+ 'tap': {
+ 'tests': [
+ 't/001_oauth.pl',
+ ],
+ 'deps': libpq_oauth_test_deps,
+ },
+}
diff --git a/src/interfaces/libpq-oauth/t/001_oauth.pl b/src/interfaces/libpq-oauth/t/001_oauth.pl
new file mode 100644
index 00000000000..6c972056bbd
--- /dev/null
+++ b/src/interfaces/libpq-oauth/t/001_oauth.pl
@@ -0,0 +1,24 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Defer entirely to the oauth_tests executable. stdout/err is routed through
+# Test::More so that our logging infrastructure can handle it correctly. Using
+# IPC::Run::new_chunker seems to help interleave the two streams a little better
+# than without.
+#
+# TODO: prove can also deal with native executables itself, which we could
+# probably make use of via PROVE_TESTS on the Makefile side. But the Meson setup
+# calls Perl directly, which would require more code to work around... and
+# there's still the matter of logging.
+my $builder = Test::More->builder;
+my $out = $builder->output;
+my $err = $builder->failure_output;
+
+IPC::Run::run ['oauth_tests'],
+ '>' => (IPC::Run::new_chunker, sub { $out->print($_[0]) }),
+ '2>' => (IPC::Run::new_chunker, sub { $err->print($_[0]) })
+ or die "oauth_tests returned $?";
diff --git a/src/interfaces/libpq-oauth/test-oauth-curl.c b/src/interfaces/libpq-oauth/test-oauth-curl.c
new file mode 100644
index 00000000000..8263aff2f4a
--- /dev/null
+++ b/src/interfaces/libpq-oauth/test-oauth-curl.c
@@ -0,0 +1,527 @@
+/*
+ * test-oauth-curl.c
+ *
+ * A unit test driver for libpq-oauth. This #includes oauth-curl.c, which lets
+ * the tests reference static functions and other internals.
+ *
+ * USE_ASSERT_CHECKING is required, to make it easy for tests to wrap
+ * must-succeed code as part of test setup.
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ */
+
+#include "oauth-curl.c"
+
+#include <fcntl.h>
+
+#ifdef USE_ASSERT_CHECKING
+
+/*
+ * TAP Helpers
+ */
+
+static int num_tests = 0;
+
+/*
+ * Reports ok/not ok to the TAP stream on stdout.
+ */
+#define ok(OK, TEST) \
+ ok_impl(OK, TEST, #OK, __FILE__, __LINE__)
+
+static bool
+ok_impl(bool ok, const char *test, const char *teststr, const char *file, int line)
+{
+ printf("%sok %d - %s\n", ok ? "" : "not ", ++num_tests, test);
+
+ if (!ok)
+ {
+ printf("# at %s:%d:\n", file, line);
+ printf("# expression is false: %s\n", teststr);
+ }
+
+ return ok;
+}
+
+/*
+ * Like ok(this == that), but with more diagnostics on failure.
+ *
+ * Only works on ints, but luckily that's all we need here. Note that the much
+ * simpler-looking macro implementation
+ *
+ * is_diag(ok(THIS == THAT, TEST), THIS, #THIS, THAT, #THAT)
+ *
+ * suffers from multiple evaluation of the macro arguments...
+ */
+#define is(THIS, THAT, TEST) \
+ do { \
+ int this_ = (THIS), \
+ that_ = (THAT); \
+ is_diag( \
+ ok_impl(this_ == that_, TEST, #THIS " == " #THAT, __FILE__, __LINE__), \
+ this_, #THIS, that_, #THAT \
+ ); \
+ } while (0)
+
+static bool
+is_diag(bool ok, int this, const char *thisstr, int that, const char *thatstr)
+{
+ if (!ok)
+ printf("# %s = %d; %s = %d\n", thisstr, this, thatstr, that);
+
+ return ok;
+}
+
+/*
+ * Utilities
+ */
+
+/*
+ * Creates a partially-initialized async_ctx for the purposes of testing. Free
+ * with free_test_actx().
+ */
+static struct async_ctx *
+init_test_actx(void)
+{
+ struct async_ctx *actx;
+
+ actx = calloc(1, sizeof(*actx));
+ Assert(actx);
+
+ actx->mux = PGINVALID_SOCKET;
+ actx->timerfd = -1;
+ actx->debugging = true;
+
+ initPQExpBuffer(&actx->errbuf);
+
+ Assert(setup_multiplexer(actx));
+
+ return actx;
+}
+
+static void
+free_test_actx(struct async_ctx *actx)
+{
+ termPQExpBuffer(&actx->errbuf);
+
+ if (actx->mux != PGINVALID_SOCKET)
+ close(actx->mux);
+ if (actx->timerfd >= 0)
+ close(actx->timerfd);
+
+ free(actx);
+}
+
+static char dummy_buf[4 * 1024]; /* for fill_pipe/drain_pipe */
+
+/*
+ * Writes to the write side of a pipe until it won't take any more data. Returns
+ * the amount written.
+ */
+static ssize_t
+fill_pipe(int fd)
+{
+ int mode;
+ ssize_t written = 0;
+
+ /* Don't block. */
+ Assert((mode = fcntl(fd, F_GETFL)) != -1);
+ Assert(fcntl(fd, F_SETFL, mode | O_NONBLOCK) == 0);
+
+ while (true)
+ {
+ ssize_t w;
+
+ w = write(fd, dummy_buf, sizeof(dummy_buf));
+ if (w < 0)
+ {
+ if (errno != EAGAIN && errno != EWOULDBLOCK)
+ {
+ perror("write to pipe");
+ written = -1;
+ }
+ break;
+ }
+
+ written += w;
+ }
+
+ /* Reset the descriptor flags. */
+ Assert(fcntl(fd, F_SETFD, mode) == 0);
+
+ return written;
+}
+
+/*
+ * Drains the requested amount of data from the read side of a pipe.
+ */
+static bool
+drain_pipe(int fd, ssize_t n)
+{
+ Assert(n > 0);
+
+ while (n)
+ {
+ size_t to_read = (n <= sizeof(dummy_buf)) ? n : sizeof(dummy_buf);
+ ssize_t drained;
+
+ drained = read(fd, dummy_buf, to_read);
+ if (drained < 0)
+ {
+ perror("read from pipe");
+ return false;
+ }
+
+ n -= drained;
+ }
+
+ return true;
+}
+
+/*
+ * Tests whether the multiplexer is marked ready by the deadline. This is a
+ * macro so that file/line information makes sense during failures.
+ *
+ * NB: our current multiplexer implementations (epoll/kqueue) are *readable*
+ * when the underlying libcurl sockets are *writable*. This behavior is pinned
+ * here to record that expectation; PGRES_POLLING_READING is hardcoded
+ * throughout the flow and would need to be changed if a new multiplexer does
+ * something different.
+ */
+#define mux_is_ready(MUX, DEADLINE, TEST) \
+ do { \
+ int res_ = PQsocketPoll(MUX, 1, 0, DEADLINE); \
+ Assert(res_ != -1); \
+ ok(res_ > 0, "multiplexer is ready " TEST); \
+ } while (0)
+
+/*
+ * The opposite of mux_is_ready().
+ */
+#define mux_is_not_ready(MUX, TEST) \
+ do { \
+ int res_ = PQsocketPoll(MUX, 1, 0, 0); \
+ Assert(res_ != -1); \
+ is(res_, 0, "multiplexer is not ready " TEST); \
+ } while (0)
+
+/*
+ * Test Suites
+ */
+
+/* Per-suite timeout. Set via the PG_TEST_TIMEOUT_DEFAULT envvar. */
+static pg_usec_time_t timeout_us = 180 * 1000 * 1000;
+
+static void
+test_set_timer(void)
+{
+ struct async_ctx *actx = init_test_actx();
+ const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
+
+ printf("# test_set_timer\n");
+
+ /* A zero-duration timer should result in a near-immediate ready signal. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer expires");
+ is(timer_expired(actx), 1, "timer_expired() returns 1 when timer expires");
+
+ /* Resetting the timer far in the future should unset the ready signal. */
+ Assert(set_timer(actx, INT_MAX));
+ mux_is_not_ready(actx->mux, "when timer is reset to the future");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 with unexpired timer");
+
+ /* Setting another zero-duration timer should override the previous one. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer is re-expired");
+ is(timer_expired(actx), 1, "timer_expired() returns 1 when timer is re-expired");
+
+ /* And disabling that timer should once again unset the ready signal. */
+ Assert(set_timer(actx, -1));
+ mux_is_not_ready(actx->mux, "when timer is unset");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 when timer is unset");
+
+ {
+ bool expired;
+
+ /* Make sure drain_timer_events() functions correctly as well. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer is re-expired (drain_timer_events)");
+
+ Assert(drain_timer_events(actx, &expired));
+ mux_is_not_ready(actx->mux, "when timer is drained after expiring");
+ is(expired, 1, "drain_timer_events() reports expiration");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 after timer is drained");
+
+ /* A second drain should do nothing. */
+ Assert(drain_timer_events(actx, &expired));
+ mux_is_not_ready(actx->mux, "when timer is drained a second time");
+ is(expired, 0, "drain_timer_events() reports no expiration");
+ is(timer_expired(actx), 0, "timer_expired() still returns 0");
+ }
+
+ free_test_actx(actx);
+}
+
+static void
+test_register_socket(void)
+{
+ struct async_ctx *actx = init_test_actx();
+ int pipefd[2];
+ int rfd,
+ wfd;
+ bool bidirectional;
+
+ /* Create a local pipe for communication. */
+ Assert(pipe(pipefd) == 0);
+ rfd = pipefd[0];
+ wfd = pipefd[1];
+
+ /*
+ * Some platforms (FreeBSD) implement bidirectional pipes, affecting the
+ * behavior of some of these tests. Store that knowledge for later.
+ */
+ bidirectional = PQsocketPoll(rfd /* read */ , 0, 1 /* write */ , 0) > 0;
+
+ /*
+ * This suite runs twice -- once using CURL_POLL_IN/CURL_POLL_OUT for
+ * read/write operations, respectively, and once using CURL_POLL_INOUT for
+ * both sides.
+ */
+ for (int inout = 0; inout < 2; inout++)
+ {
+ const int in_event = inout ? CURL_POLL_INOUT : CURL_POLL_IN;
+ const int out_event = inout ? CURL_POLL_INOUT : CURL_POLL_OUT;
+ const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
+ size_t bidi_pipe_size = 0; /* silence compiler warnings */
+
+ printf("# test_register_socket %s\n", inout ? "(INOUT)" : "");
+
+ /*
+ * At the start of the test, the read side should be blocked and the
+ * write side should be open. (There's a mistake at the end of this
+ * loop otherwise.)
+ */
+ Assert(PQsocketPoll(rfd, 1, 0, 0) == 0);
+ Assert(PQsocketPoll(wfd, 0, 1, 0) > 0);
+
+ /*
+ * For bidirectional systems, emulate unidirectional behavior here by
+ * filling up the "read side" of the pipe.
+ */
+ if (bidirectional)
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+
+ /* Listen on the read side. The multiplexer shouldn't be ready yet. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when fd is not readable");
+
+ /* Writing to the pipe should result in a read-ready multiplexer. */
+ Assert(write(wfd, "x", 1) == 1);
+ mux_is_ready(actx->mux, deadline, "when fd is readable");
+
+ /*
+ * Update the registration to wait on write events instead. The
+ * multiplexer should be unset.
+ */
+ Assert(register_socket(NULL, rfd, CURL_POLL_OUT, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when waiting for writes on readable fd");
+
+ /* Re-register for read events. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when waiting for reads again");
+
+ /* Stop listening. The multiplexer should be unset. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when readable fd is removed");
+
+ /* Listen again. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when readable fd is re-added");
+
+ /*
+ * Draining the pipe should unset the multiplexer again, once the old
+ * event is cleared.
+ */
+ Assert(drain_pipe(rfd, 1));
+ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when fd is drained");
+
+ /* Undo any unidirectional emulation. */
+ if (bidirectional)
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+
+ /* Listen on the write side. An empty buffer should be writable. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when fd is writable");
+
+ /* As above, wait on read events instead. */
+ Assert(register_socket(NULL, wfd, CURL_POLL_IN, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when waiting for reads on writable fd");
+
+ /* Re-register for write events. */
+ Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when waiting for writes again");
+
+ {
+ ssize_t written;
+
+ /*
+ * Fill the pipe. Once the old writable event is cleared, the mux
+ * should not be ready.
+ */
+ Assert((written = fill_pipe(wfd)) > 0);
+ printf("# pipe buffer is full at %zd bytes\n", written);
+
+ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when fd buffer is full");
+
+ /* Drain the pipe again. */
+ Assert(drain_pipe(rfd, written));
+ mux_is_ready(actx->mux, deadline, "when fd buffer is drained");
+ }
+
+ /* Stop listening. */
+ Assert(register_socket(NULL, wfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when fd is removed");
+
+ /* Make sure an expired timer doesn't interfere with event draining. */
+ {
+ bool expired;
+
+ /* Make the rfd appear unidirectional if necessary. */
+ if (bidirectional)
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+
+ /* Set the timer and wait for it to expire. */
+ Assert(set_timer(actx, 0));
+ Assert(PQsocketPoll(actx->timerfd, 1, 0, deadline) > 0);
+ is(timer_expired(actx), 1, "timer is expired");
+
+ /* Register for read events and make the fd readable. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ Assert(write(wfd, "x", 1) == 1);
+ mux_is_ready(actx->mux, deadline, "when fd is readable and timer expired");
+
+ /*
+ * Draining the pipe should unset the multiplexer again, once the
+ * old event is drained and the timer is reset.
+ *
+ * Order matters, since comb_multiplexer() doesn't have to remove
+ * stale events when active events exist. Follow the call sequence
+ * used in the code: drain the timer expiration, drain the pipe,
+ * then clear the stale events.
+ */
+ Assert(drain_timer_events(actx, &expired));
+ Assert(drain_pipe(rfd, 1));
+ Assert(comb_multiplexer(actx));
+
+ is(expired, 1, "drain_timer_events() reports expiration");
+ is(timer_expired(actx), 0, "timer is no longer expired");
+ mux_is_not_ready(actx->mux, "when fd is drained and timer reset");
+
+ /* Stop listening. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+
+ /* Undo any unidirectional emulation. */
+ if (bidirectional)
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+ }
+
+ /* Ensure comb_multiplexer() can handle multiple stale events. */
+ {
+ int rfd2,
+ wfd2;
+
+ /* Create a second local pipe. */
+ Assert(pipe(pipefd) == 0);
+ rfd2 = pipefd[0];
+ wfd2 = pipefd[1];
+
+ /* Make both rfds appear unidirectional if necessary. */
+ if (bidirectional)
+ {
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+ Assert(fill_pipe(rfd2) == bidi_pipe_size);
+ }
+
+ /* Register for read events on both fds, and make them readable. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ Assert(register_socket(NULL, rfd2, in_event, actx, NULL) == 0);
+
+ Assert(write(wfd, "x", 1) == 1);
+ Assert(write(wfd2, "x", 1) == 1);
+
+ mux_is_ready(actx->mux, deadline, "when two fds are readable");
+
+ /*
+ * Drain both fds. comb_multiplexer() should then ensure that the
+ * mux is no longer readable.
+ */
+ Assert(drain_pipe(rfd, 1));
+ Assert(drain_pipe(rfd2, 1));
+ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when two fds are drained");
+
+ /* Stop listening. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ Assert(register_socket(NULL, rfd2, CURL_POLL_REMOVE, actx, NULL) == 0);
+
+ /* Undo any unidirectional emulation. */
+ if (bidirectional)
+ {
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+ Assert(drain_pipe(wfd2, bidi_pipe_size));
+ }
+
+ close(rfd2);
+ close(wfd2);
+ }
+ }
+
+ close(rfd);
+ close(wfd);
+ free_test_actx(actx);
+}
+
+int
+main(int argc, char *argv[])
+{
+ const char *timeout;
+
+ /* Grab the default timeout. */
+ timeout = getenv("PG_TEST_TIMEOUT_DEFAULT");
+ if (timeout)
+ {
+ int timeout_s = atoi(timeout);
+
+ if (timeout_s > 0)
+ timeout_us = timeout_s * 1000 * 1000;
+ }
+
+ /*
+ * Set up line buffering for our output, to let stderr interleave in the
+ * log files.
+ */
+ setvbuf(stdout, NULL, PG_IOLBF, 0);
+
+ test_set_timer();
+ test_register_socket();
+
+ printf("1..%d\n", num_tests);
+ return 0;
+}
+
+#else /* !USE_ASSERT_CHECKING */
+
+/*
+ * Skip the test suite when we don't have assertions.
+ */
+int
+main(int argc, char *argv[])
+{
+ printf("1..0 # skip: cassert is not enabled\n");
+
+ return 0;
+}
+
+#endif /* USE_ASSERT_CHECKING */
--
2.34.1
On 2025-08-12 Tu 6:11 PM, Jacob Champion wrote:
On Fri, Aug 8, 2025 at 2:31 PM Jacob Champion
<jacob.champion@enterprisedb.com> wrote:Well, thank you for the explanation. I'll make that change.
Done in v5.
v5-0001 is planned for backport to 18 once the freeze lifts. It
ensures that -lm is part of the link line for libpq-oauth, since the
module uses floor(). I probably wouldn't have ever noticed, except
that the new test executable, which uses the same link flags,
complained on Clang [1].(In that thread, I incorrectly said the problem was with "Meson
animals". The Meson side is fine, and both alligator and bushmaster
use Autoconf, so I'm not sure how I ended up with that idea.)v5-0002 should fix the more general buildfarm failure that caused the
revert. The farm finds the new t/ subdirectory and starts running Make
on src/interfaces/libpq-oauth directly, bypassing the skip logic in
src/interfaces/Makefile. So I've wrapped the "standard" top-level
targets that build and install things in a conditional. The targets
that clean things up have been left alone, at Tom's suggestion in [1].Thanks,
--Jacob[1] /messages/by-id/CAOYmi+m=xY0P_uAzAP_884uF-GhQ3wrineGwc9AEnb6fYxVqVQ@mail.gmail.com
I don't think that's quite going to work. The buildfarm will now get a
"target not found" in the without-curl case, I suspect. I think you'll
need an alternative definition of the check target.
cheers
andrew
--
Andrew Dunstan
EDB: https://www.enterprisedb.com
On Tue, Aug 12, 2025 at 3:21 PM Andrew Dunstan <andrew@dunslane.net> wrote:
I don't think that's quite going to work. The buildfarm will now get a
"target not found" in the without-curl case, I suspect. I think you'll
need an alternative definition of the check target.
We're still including Makefile.global, so the targets are still there;
they just won't add the libpq-oauth specific pieces. So `make check -C
src/interfaces/libpq-oauth` still works on my machine. Or have I
misunderstood the problem?
--Jacob
On 2025-08-12 Tu 6:25 PM, Jacob Champion wrote:
On Tue, Aug 12, 2025 at 3:21 PM Andrew Dunstan <andrew@dunslane.net> wrote:
I don't think that's quite going to work. The buildfarm will now get a
"target not found" in the without-curl case, I suspect. I think you'll
need an alternative definition of the check target.We're still including Makefile.global, so the targets are still there;
they just won't add the libpq-oauth specific pieces. So `make check -C
src/interfaces/libpq-oauth` still works on my machine. Or have I
misunderstood the problem?
Oh, I see, and because the default definition of check doesn't call
prove_check nothing is done. OK, sorry for the noise.
cheers
andrew
--
Andrew Dunstan
EDB: https://www.enterprisedb.com