Built-in connection pooling
Hi hackers,
My recent experiments with a pthread version of Postgres show that
although pthreads offer some performance advantages over processes for a
large number of connections, they still cannot eliminate the need for
connection pooling. Even a large number of inactive connections causes a
significant degradation of Postgres performance.
So we need connection pooling. Most enterprise systems working with
Postgres use pgbouncer or similar tools.
But pgbouncer has the following drawbacks:
1. It is an extra entity which complicates system installation and
administration.
2. Pgbouncer itself can be a bottleneck and a point of failure. For
example, with SSL enabled, the single-threaded model of pgbouncer becomes
the limiting factor when a lot of clients try to reestablish connections
simultaneously. This is why some companies build hierarchies of pgbouncers.
3. Using a pool_mode other than "session" makes it impossible to use
prepared statements and session variables.
The lack of prepared statements can by itself slow down simple queries by
up to a factor of two.
So I thought about built-in connection pooling for Postgres. Ideally it
should be integrated with pthreads, because in that case scheduling of
sessions can be done more flexibly and easily.
But I decided to start with a patch to vanilla Postgres.
Idea is the following:
1. We start some number of normal backends (which form a backend pool for
serving client sessions).
2. When the number of connections exceeds the number of backends, then
instead of spawning a new backend we choose one of the existing backends
and redirect the connection to it.
There is a more or less portable way in Unix to pass socket descriptors
between processes using Unix sockets: see, for example,
https://stackoverflow.com/questions/28003921/sending-file-descriptor-by-linux-socket/
(this is one of the places where a pthreads Postgres would win); a
minimal sketch of the technique is shown after this list. So a session is
bound to a backend. Backends are chosen using a round-robin policy, which
should guarantee a more or less uniform distribution of sessions between
backends if the number of sessions is much larger than the number of
backends. But certainly skews in client application access patterns can
violate this assumption.
3. Rescheduling is done at the transaction level, so it is enough to have
one entry in the procarray per backend to correctly handle locks.
Transaction-level pooling also eliminates the
problem of false deadlocks (caused by a lack of free executors in the
pool), and it minimizes the changes in the Postgres core
needed to maintain correct session context:
there is no need to suspend/resume transaction state, static variables, ...
4. In the main Postgres query loop in PostgresMain we determine the
moment when the backend is not in a transaction state, perform a select
over the sockets of all active sessions, and choose one of them (a
condensed sketch of this loop appears after this list).
5. When a client disconnects, we close the session but do not
terminate the backend.
6. To support prepared statements, we append the session identifier to
the name of the statement, so prepared statements of different sessions
do not interfere with each other (see the name-prefixing sketch after
this list). Since a session is bound to the backend, it is
possible to use prepared statements.
This is the minimal plan for embedded session pooling that I decided to
implement as a prototype.
Several things are not addressed now:
1. Temporary tables. In principle they can be handled in the same way as
prepared statements: by prepending the session identifier to the name of
the table.
But that requires adjusting references to the table in all queries, which
is much more complicated than in the case of prepared statements.
2. Session-level GUCs. In principle it is not difficult to remember the
GUCs modified by a session and save/restore them on session switch.
But it is just not implemented yet.
3. Support for multiple users/databases/... This is the most critical
drawback. Right now my prototype implementation assumes that all clients
are connected to the same database
under the same user with the same connection options. This is a challenge
on which I want to hear the opinion of the community. The name of the
database and the user are retrieved from the client connection by the
ProcessStartupPacket function. In vanilla Postgres this function is
executed by the spawned backend, so I do not know which database a client
is going to access before calling this function and reading data from the
client's socket.
Now I just choose a random backend and assign the connection to it.
But it can happen that this backend is working with a different
database/user; currently I just return an error in this case. Certainly
it is possible to call ProcessStartupPacket in the postmaster and then
select a proper backend working with the specified database/user.
But I am afraid that the postmaster can become a bottleneck in this case,
especially when SSL is used. Also, a larger number of databases/users can
significantly reduce the efficiency of pooling if each backend is
responsible for only one database/user combination. Maybe a backend
should be bound only to a database and the concrete role should be set on
session switch, but that may require flushing backend caches, which
devalues the idea of embedded session pooling. This problem could easily
be solved with multithreaded Postgres, where it is possible to reassign a
session to another thread.
Now the results shown by my prototype. I used pgbench with scale factor
100 in read-only mode (-S option).
The precise pgbench command is "pgbench -S -c N -M prepared -T 100 -P 1 -n".
Results in the table below are in kTPS:

Connections | Vanilla Postgres | Postgres with session pool size=10
         10 |              186 |                                181
        100 |              118 |                                224
       1000 |               59 |                                191
As you can see, instead of performance degrading with an increasing
number of connections, Postgres with a session pool shows stable
performance.
Moreover, for vanilla Postgres the best results on my system are obtained
with 10 connections, but Postgres with a session pool shows better
performance for 100 connections with the same number of spawned backends.
My patch to Postgres is attached to this mail.
To switch on session pooling, set session_pool_size to some non-zero
value. Another GUC variable which I have added is "max_sessions", which
specifies the maximum number of sessions handled by one backend. So the
total number of handled client connections is session_pool_size*max_sessions.
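For example, with the prototype applied, settings like the following in
postgresql.conf (values are purely illustrative; both GUCs are
PGC_POSTMASTER, so they require a server restart) would let 10 backends
multiplex up to 10000 client connections:

    session_pool_size = 10    # number of backends serving client sessions
    max_sessions = 1000       # maximal number of sessions per backend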
Certainly it is just a prototype, far from practical use.
In addition to the challenges mentioned above, there are also some other
issues which should be considered:
1. A long-living transaction in a client application blocks all other
sessions in the backend and so can suspend the work of Postgres.
So Uber-style programming, where a database transaction is started when
the car door is opened and finished at the end of the trip, is completely
incompatible with this approach.
2. Fatal errors cause the disconnect not only of the one client that
caused the problem but of the whole bunch of client sessions scheduled to
this backend.
3. It is possible to use PL APIs, such as plpython, but session-level
variables may not be used.
4. There may be some memory leaks caused by allocating memory with malloc
or in the top memory context, which is expected to be freed on backend
exit.
It is not deallocated at session close, so a large number of handled
sessions can cause memory overflow.
5. Some applications, handling multiple connections inside a single
thread and multiplexing them at the statement level (rather than at the
transaction level), may not work correctly.
It seems to be quite an exotic use case, but pgbench actually behaves
this way! This is why an attempt to start pgbench with multistatement
transactions (-N) will fail if the number of threads (-j) is smaller than
the number of connections (-c).
6. The approach of passing socket descriptors between processes is
implemented only for Unix and was tested only on Linux, although it is
expected to also work on macOS and other Unix dialects. Windows is not
supported now.
I will be glad to receive any feedback and suggestions concerning the
prospects of embedded connection pooling.
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
Attachment: session_pool.patch (text/x-patch)
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index b945b15..a73d584 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -813,3 +813,30 @@ build_regtype_array(Oid *param_types, int num_params)
result = construct_array(tmp_ary, num_params, REGTYPEOID, 4, true, 'i');
return PointerGetDatum(result);
}
+
+
+void
+DropSessionPreparedStatements(char const* sessionId)
+{
+ HASH_SEQ_STATUS seq;
+ PreparedStatement *entry;
+ size_t idLen = strlen(sessionId);
+
+ /* nothing cached */
+ if (!prepared_queries)
+ return;
+
+ /* walk over cache */
+ hash_seq_init(&seq, prepared_queries);
+ while ((entry = hash_seq_search(&seq)) != NULL)
+ {
+ if (strncmp(entry->stmt_name, sessionId, idLen) == 0 && entry->stmt_name[idLen] == '.')
+ {
+ /* Release the plancache entry */
+ DropCachedPlan(entry->plansource);
+
+ /* Now we can remove the hash table entry */
+ hash_search(prepared_queries, entry->stmt_name, HASH_REMOVE, NULL);
+ }
+ }
+}
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index a4f6d4d..5b07a88 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -1029,6 +1029,18 @@ pq_peekbyte(void)
}
/* --------------------------------
+ * pq_available_bytes - report number of bytes already buffered
+ *
+ * Returns the number of received bytes that have not yet been consumed.
+ * --------------------------------
+ */
+int
+pq_available_bytes(void)
+{
+ return PqRecvLength - PqRecvPointer;
+}
+
+/* --------------------------------
* pq_getbyte_if_available - get a single byte from connection,
* if available
*
diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile
index aba1e92..56ec998 100644
--- a/src/backend/port/Makefile
+++ b/src/backend/port/Makefile
@@ -21,7 +21,7 @@ subdir = src/backend/port
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o $(TAS)
+OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o send_sock.o $(TAS)
ifeq ($(PORTNAME), win32)
SUBDIRS += win32
diff --git a/src/backend/port/send_sock.c b/src/backend/port/send_sock.c
new file mode 100644
index 0000000..7b36923
--- /dev/null
+++ b/src/backend/port/send_sock.c
@@ -0,0 +1,89 @@
+/*-------------------------------------------------------------------------
+ *
+ * send_sock.c
+ * Send socket descriptor to another process
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/port/send_sock.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <time.h>
+#include <unistd.h>
+
+/*
+ * Send socket descriptor "sock" to backend process through Unix socket "chan"
+ */
+int pg_send_sock(pgsocket chan, pgsocket sock)
+{
+ struct msghdr msg = { 0 };
+ struct iovec io;
+ struct cmsghdr * cmsg;
+ char buf[CMSG_SPACE(sizeof(sock))];
+ memset(buf, '\0', sizeof(buf));
+
+ /* On Mac OS X, the struct iovec is needed, even if it points to minimal data */
+ io.iov_base = "";
+ io.iov_len = 1;
+
+ msg.msg_iov = &io;
+ msg.msg_iovlen = 1;
+ msg.msg_control = buf;
+ msg.msg_controllen = sizeof(buf);
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(sock));
+
+ memcpy(CMSG_DATA(cmsg), &sock, sizeof(sock));
+ msg.msg_controllen = cmsg->cmsg_len;
+
+ if (sendmsg(chan, &msg, 0) < 0)
+ {
+ return -1;
+ }
+ return 0;
+}
+
+
+/*
+ * Receive socket descriptor from postmaster process through Unix socket "chan"
+ */
+pgsocket pg_recv_sock(pgsocket chan)
+{
+ struct msghdr msg = {0};
+ char c_buffer[256];
+ char m_buffer[256];
+ struct iovec io;
+ struct cmsghdr * cmsg;
+ pgsocket sock;
+
+ io.iov_base = m_buffer;
+ io.iov_len = sizeof(m_buffer);
+ msg.msg_iov = &io;
+ msg.msg_iovlen = 1;
+
+ msg.msg_control = c_buffer;
+ msg.msg_controllen = sizeof(c_buffer);
+
+ if (recvmsg(chan, &msg, 0) < 0)
+ {
+ return -1;
+ }
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ memcpy(&sock, CMSG_DATA(cmsg), sizeof(sock));
+
+ return sock;
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index f3ddf82..996a41c 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -169,6 +169,7 @@ typedef struct bkend
pid_t pid; /* process id of backend */
int32 cancel_key; /* cancel key for cancels for this backend */
int child_slot; /* PMChildSlot for this backend, if any */
+ pgsocket session_send_sock; /* Write end of socket pipe to this backend used to send session socket descriptor to the backend process */
/*
* Flavor of backend or auxiliary process. Note that BACKEND_TYPE_WALSND
@@ -182,6 +183,15 @@ typedef struct bkend
} Backend;
static dlist_head BackendList = DLIST_STATIC_INIT(BackendList);
+/*
+ * Pointer into the backend list used to implement round-robin distribution of sessions across backends.
+ * This variable is either NULL or points to a normal backend.
+ */
+static Backend* BackendListClockPtr;
+/*
+ * Number of active normal backends
+ */
+static int nNormalBackends;
#ifdef EXEC_BACKEND
static Backend *ShmemBackendArray;
@@ -412,7 +422,6 @@ static void BackendRun(Port *port) pg_attribute_noreturn();
static void ExitPostmaster(int status) pg_attribute_noreturn();
static int ServerLoop(void);
static int BackendStartup(Port *port);
-static int ProcessStartupPacket(Port *port, bool SSLdone);
static void SendNegotiateProtocolVersion(List *unrecognized_protocol_options);
static void processCancelRequest(Port *port, void *pkt);
static int initMasks(fd_set *rmask);
@@ -568,6 +577,22 @@ HANDLE PostmasterHandle;
#endif
/*
+ * Move current backend pointer to the next normal backend.
+ * This function is called either when a new session is started (to implement the round-robin policy) or when the backend currently pointed to by BackendListClockPtr is terminated
+ */
+static void AdvanceBackendListClockPtr(void)
+{
+ Backend* b = BackendListClockPtr;
+ do {
+ dlist_node* node = &b->elem;
+ node = node->next ? node->next : BackendList.head.next;
+ b = dlist_container(Backend, elem, node);
+ } while (b->bkend_type != BACKEND_TYPE_NORMAL && b != BackendListClockPtr);
+
+ BackendListClockPtr = (b != BackendListClockPtr) ? b : NULL;
+}
+
+/*
* Postmaster main entry point
*/
void
@@ -1944,8 +1969,8 @@ initMasks(fd_set *rmask)
* send anything to the client, which would typically be appropriate
* if we detect a communications failure.)
*/
-static int
-ProcessStartupPacket(Port *port, bool SSLdone)
+int
+ProcessStartupPacket(Port *port, bool SSLdone, MemoryContext memctx)
{
int32 len;
void *buf;
@@ -2043,7 +2068,7 @@ retry1:
#endif
/* regular startup packet, cancel, etc packet should follow... */
/* but not another SSL negotiation request */
- return ProcessStartupPacket(port, true);
+ return ProcessStartupPacket(port, true, memctx);
}
/* Could add additional special packet types here */
@@ -2073,7 +2098,7 @@ retry1:
* not worry about leaking this storage on failure, since we aren't in the
* postmaster process anymore.
*/
- oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ oldcontext = MemoryContextSwitchTo(memctx);
if (PG_PROTOCOL_MAJOR(proto) >= 3)
{
@@ -2449,7 +2474,7 @@ ConnCreate(int serverFd)
ConnFree(port);
return NULL;
}
-
+ port->session_recv_sock = PGINVALID_SOCKET;
/*
* Allocate GSSAPI specific state struct
*/
@@ -3236,6 +3261,24 @@ CleanupBackgroundWorker(int pid,
}
/*
+ * Unlink backend from backend's list and free memory
+ */
+static void UnlinkBackend(Backend* bp)
+{
+ if (bp->bkend_type == BACKEND_TYPE_NORMAL)
+ {
+ if (bp == BackendListClockPtr)
+ AdvanceBackendListClockPtr();
+ if (bp->session_send_sock != PGINVALID_SOCKET)
+ close(bp->session_send_sock);
+ elog(DEBUG2, "Cleanup backend %d", bp->pid);
+ nNormalBackends -= 1;
+ }
+ dlist_delete(&bp->elem);
+ free(bp);
+}
+
+/*
* CleanupBackend -- cleanup after terminated backend.
*
* Remove all local state associated with backend.
@@ -3312,8 +3355,7 @@ CleanupBackend(int pid,
*/
BackgroundWorkerStopNotifications(bp->pid);
}
- dlist_delete(iter.cur);
- free(bp);
+ UnlinkBackend(bp);
break;
}
}
@@ -3415,8 +3457,7 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
ShmemBackendArrayRemove(bp);
#endif
}
- dlist_delete(iter.cur);
- free(bp);
+ UnlinkBackend(bp);
/* Keep looping so we can signal remaining backends */
}
else
@@ -4017,6 +4058,19 @@ BackendStartup(Port *port)
{
Backend *bn; /* for backend cleanup */
pid_t pid;
+ int session_pipe[2];
+
+ if (SessionPoolSize != 0 && nNormalBackends >= SessionPoolSize)
+ {
+ /* Instead of spawning a new backend, open a new session in one of the existing backends. */
+ Assert(BackendListClockPtr && BackendListClockPtr->session_send_sock != PGINVALID_SOCKET);
+ elog(DEBUG2, "Start new session for socket %d at backend %d total %d", port->sock, BackendListClockPtr->pid, nNormalBackends);
+ if (pg_send_sock(BackendListClockPtr->session_send_sock, port->sock) < 0)
+ elog(FATAL, "Failed to send session socket: %m");
+ AdvanceBackendListClockPtr(); /* round-robin */
+ return STATUS_OK;
+ }
+
/*
* Create backend data structure. Better before the fork() so we can
@@ -4030,7 +4084,6 @@ BackendStartup(Port *port)
errmsg("out of memory")));
return STATUS_ERROR;
}
-
/*
* Compute the cancel key that will be assigned to this backend. The
* backend will have its own copy in the forked-off process' value of
@@ -4063,12 +4116,23 @@ BackendStartup(Port *port)
/* Hasn't asked to be notified about any bgworkers yet */
bn->bgworker_notify = false;
+ if (SessionPoolSize != 0)
+ if (socketpair(AF_UNIX, SOCK_DGRAM, 0, session_pipe) < 0)
+ ereport(FATAL,
+ (errcode_for_file_access(),
+ errmsg_internal("could not create socket pair for launching sessions: %m")));
+
#ifdef EXEC_BACKEND
pid = backend_forkexec(port);
#else /* !EXEC_BACKEND */
pid = fork_process();
if (pid == 0) /* child */
{
+ if (SessionPoolSize != 0)
+ {
+ port->session_recv_sock = session_pipe[0];
+ close(session_pipe[1]);
+ }
free(bn);
/* Detangle from postmaster */
@@ -4110,9 +4174,19 @@ BackendStartup(Port *port)
* of backends.
*/
bn->pid = pid;
+ if (SessionPoolSize != 0)
+ {
+ bn->session_send_sock = session_pipe[1];
+ close(session_pipe[0]);
+ }
+ else
+ bn->session_send_sock = PGINVALID_SOCKET;
bn->bkend_type = BACKEND_TYPE_NORMAL; /* Can change later to WALSND */
dlist_push_head(&BackendList, &bn->elem);
-
+ if (BackendListClockPtr == NULL)
+ BackendListClockPtr = bn;
+ nNormalBackends += 1;
+ elog(DEBUG2, "Start backend %d total %d", pid, nNormalBackends);
#ifdef EXEC_BACKEND
if (!bn->dead_end)
ShmemBackendArrayAdd(bn);
@@ -4299,7 +4373,7 @@ BackendInitialize(Port *port)
* Receive the startup packet (which might turn out to be a cancel request
* packet).
*/
- status = ProcessStartupPacket(port, false);
+ status = ProcessStartupPacket(port, false, TopMemoryContext);
/*
* Stop here if it was bad or a cancel packet. ProcessStartupPacket
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index e6706f7..9c42fab 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -76,6 +76,7 @@ struct WaitEventSet
{
int nevents; /* number of registered events */
int nevents_space; /* maximum number of events in this set */
+ int free_events; /* head of singly linked free list of events, linked through "pos" and terminated by -1 */
/*
* Array, of nevents_space length, storing the definition of events this
@@ -129,7 +130,7 @@ static void drainSelfPipe(void);
#if defined(WAIT_USE_EPOLL)
static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
#elif defined(WAIT_USE_POLL)
-static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove);
#elif defined(WAIT_USE_WIN32)
static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event);
#endif
@@ -562,6 +563,7 @@ CreateWaitEventSet(MemoryContext context, int nevents)
set->latch = NULL;
set->nevents_space = nevents;
+ set->free_events = -1;
#if defined(WAIT_USE_EPOLL)
#ifdef EPOLL_CLOEXEC
@@ -667,6 +669,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
void *user_data)
{
WaitEvent *event;
+ int free_event;
/* not enough space */
Assert(set->nevents < set->nevents_space);
@@ -690,8 +693,19 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
elog(ERROR, "cannot wait on socket event without a socket");
- event = &set->events[set->nevents];
- event->pos = set->nevents++;
+ free_event = set->free_events;
+ if (free_event >= 0)
+ {
+ event = &set->events[free_event];
+ set->free_events = event->pos;
+ event->pos = free_event;
+ }
+ else
+ {
+ event = &set->events[set->nevents];
+ event->pos = set->nevents;
+ }
+ set->nevents += 1;
event->fd = fd;
event->events = events;
event->user_data = user_data;
@@ -718,7 +732,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
#if defined(WAIT_USE_EPOLL)
WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
#elif defined(WAIT_USE_POLL)
- WaitEventAdjustPoll(set, event);
+ WaitEventAdjustPoll(set, event, false);
#elif defined(WAIT_USE_WIN32)
WaitEventAdjustWin32(set, event);
#endif
@@ -727,6 +741,27 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
}
/*
+ * Remove event with specified socket descriptor
+ */
+void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd)
+{
+ int i, n = set->nevents;
+ for (i = 0; i < n; i++)
+ {
+ WaitEvent *event = &set->events[i];
+ if (event->fd == fd)
+ {
+#if defined(WAIT_USE_EPOLL)
+ WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL);
+#elif defined(WAIT_USE_POLL)
+ WaitEventAdjustPoll(set, event, true);
+#endif
+ break;
+ }
+ }
+}
+
+/*
* Change the event mask and, in the WL_LATCH_SET case, the latch associated
* with the WaitEvent.
*
@@ -774,7 +809,7 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
#if defined(WAIT_USE_EPOLL)
WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
#elif defined(WAIT_USE_POLL)
- WaitEventAdjustPoll(set, event);
+ WaitEventAdjustPoll(set, event, false);
#elif defined(WAIT_USE_WIN32)
WaitEventAdjustWin32(set, event);
#endif
@@ -827,14 +862,33 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
ereport(ERROR,
(errcode_for_socket_access(),
errmsg("epoll_ctl() failed: %m")));
+
+ if (action == EPOLL_CTL_DEL)
+ {
+ int pos = event->pos;
+ event->fd = PGINVALID_SOCKET;
+ set->nevents -= 1;
+ event->pos = set->free_events;
+ set->free_events = pos;
+ }
}
#endif
#if defined(WAIT_USE_POLL)
static void
-WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
+WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove)
{
- struct pollfd *pollfd = &set->pollfds[event->pos];
+ int pos = event->pos;
+ struct pollfd *pollfd = &set->pollfds[pos];
+
+ if (remove)
+ {
+ set->nevents -= 1;
+ *pollfd = set->pollfds[set->nevents];
+ set->events[pos] = set->events[set->nevents];
+ event->pos = pos;
+ return;
+ }
pollfd->revents = 0;
pollfd->fd = event->fd;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index ddc3ec8..779ebc0 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -75,6 +75,7 @@
#include "utils/snapmgr.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
+#include "utils/builtins.h"
#include "mb/pg_wchar.h"
@@ -169,6 +170,10 @@ static ProcSignalReason RecoveryConflictReason;
static MemoryContext row_description_context = NULL;
static StringInfoData row_description_buf;
+static WaitEventSet* SessionPool;
+static int64 SessionCount;
+static char* CurrentSessionId;
+
/* ----------------------------------------------------------------
* decls for routines only used in this file
* ----------------------------------------------------------------
@@ -194,6 +199,15 @@ static void log_disconnections(int code, Datum arg);
static void enable_statement_timeout(void);
static void disable_statement_timeout(void);
+/*
+ * Generate session ID unique within this backend
+ */
+static char* CreateSessionId(void)
+{
+ char buf[64];
+ pg_lltoa(++SessionCount, buf);
+ return strdup(buf);
+}
/* ----------------------------------------------------------------
* routines to obtain user input
@@ -473,7 +487,7 @@ SocketBackend(StringInfo inBuf)
* fatal because we have probably lost message boundary sync, and
* there's no good way to recover.
*/
- ereport(FATAL,
+ ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid frontend message type %d", qtype)));
break;
@@ -1232,6 +1246,12 @@ exec_parse_message(const char *query_string, /* string to execute */
bool save_log_statement_stats = log_statement_stats;
char msec_str[32];
+ if (CurrentSessionId && stmt_name[0] != '\0')
+ {
+ /* Make names of prepared statements unique for session in case of using internal session pool */
+ stmt_name = psprintf("%s.%s", CurrentSessionId, stmt_name);
+ }
+
/*
* Report query to various monitoring facilities.
*/
@@ -1503,6 +1523,12 @@ exec_bind_message(StringInfo input_message)
portal_name = pq_getmsgstring(input_message);
stmt_name = pq_getmsgstring(input_message);
+ if (CurrentSessionId && stmt_name[0] != '\0')
+ {
+ /* Make names of prepared statements unique for session in case of using internal session pool */
+ stmt_name = psprintf("%s.%s", CurrentSessionId, stmt_name);
+ }
+
ereport(DEBUG2,
(errmsg("bind %s to %s",
*portal_name ? portal_name : "<unnamed>",
@@ -2325,6 +2351,12 @@ exec_describe_statement_message(const char *stmt_name)
CachedPlanSource *psrc;
int i;
+ if (CurrentSessionId && stmt_name[0] != '\0')
+ {
+ /* Make names of prepared statements unique for session in case of using internal session pool */
+ stmt_name = psprintf("%s.%s", CurrentSessionId, stmt_name);
+ }
+
/*
* Start up a transaction command. (Note that this will normally change
* current memory context.) Nothing happens if we are already in one.
@@ -3603,7 +3635,6 @@ process_postgres_switches(int argc, char *argv[], GucContext ctx,
#endif
}
-
/* ----------------------------------------------------------------
* PostgresMain
* postgres main loop -- all backends, interactive or otherwise start here
@@ -3654,6 +3685,10 @@ PostgresMain(int argc, char *argv[],
progname)));
}
+ /* Assign session ID if use session pooling */
+ if (SessionPoolSize != 0)
+ CurrentSessionId = CreateSessionId();
+
/* Acquire configuration parameters, unless inherited from postmaster */
if (!IsUnderPostmaster)
{
@@ -3783,7 +3818,7 @@ PostgresMain(int argc, char *argv[],
* ... else we'd need to copy the Port data first. Also, subsidiary data
* such as the username isn't lost either; see ProcessStartupPacket().
*/
- if (PostmasterContext)
+ if (PostmasterContext && SessionPoolSize == 0)
{
MemoryContextDelete(PostmasterContext);
PostmasterContext = NULL;
@@ -4069,6 +4104,102 @@ PostgresMain(int argc, char *argv[],
ReadyForQuery(whereToSendOutput);
send_ready_for_query = false;
+
+ if (MyProcPort && MyProcPort->session_recv_sock != PGINVALID_SOCKET && !IsTransactionState() && pq_available_bytes() == 0)
+ {
+ WaitEvent ready_client;
+ whereToSendOutput = DestRemote;
+ if (SessionPool == NULL)
+ {
+ SessionPool = CreateWaitEventSet(TopMemoryContext, MaxSessions);
+ AddWaitEventToSet(SessionPool, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
+ AddWaitEventToSet(SessionPool, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+ AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, MyProcPort->session_recv_sock, NULL, NULL);
+ AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, MyProcPort->sock, NULL, NULL);
+ }
+ Retry:
+ DoingCommandRead = true;
+ if (WaitEventSetWait(SessionPool, -1, &ready_client, 1, PG_WAIT_CLIENT) != 1)
+ {
+ /* TODO: do some error recovery here */
+ elog(FATAL, "Failed to poll client sessions");
+ }
+ CHECK_FOR_INTERRUPTS();
+ DoingCommandRead = false;
+
+ if (ready_client.events & WL_POSTMASTER_DEATH)
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating connection due to unexpected postmaster exit")));
+
+ if (ready_client.events & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ ProcessClientReadInterrupt(true);
+ goto Retry;
+ }
+
+ if (ready_client.fd == MyProcPort->session_recv_sock)
+ {
+ int status;
+ Port port;
+ Port* myPort;
+ StringInfoData buf;
+ pgsocket sock = pg_recv_sock(MyProcPort->session_recv_sock);
+ if (sock < 0)
+ elog(FATAL, "Failed to receive session socket: %m");
+
+ /*
+ * Receive the startup packet (which might turn out to be a cancel request
+ * packet).
+ */
+ port.sock = sock;
+ myPort = MyProcPort;
+ MyProcPort = &port;
+ status = ProcessStartupPacket(&port, false, MessageContext);
+ MyProcPort = myPort;
+ if (strcmp(port.database_name, MyProcPort->database_name) ||
+ strcmp(port.user_name, MyProcPort->user_name))
+ {
+ elog(FATAL, "Failed to open session (dbname=%s user=%s) in backend %d (dbname=%s user=%s)",
+ port.database_name, port.user_name,
+ MyProcPid, MyProcPort->database_name, MyProcPort->user_name);
+ }
+ else if (status == STATUS_OK)
+ {
+ elog(DEBUG2, "Start new session %d in backend %d for database %s user %s",
+ sock, MyProcPid, port.database_name, port.user_name);
+ CurrentSessionId = CreateSessionId();
+ AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, sock, NULL, CurrentSessionId);
+ MyProcPort->sock = sock;
+ send_ready_for_query = true;
+
+ SetCurrentStatementStartTimestamp();
+ StartTransactionCommand();
+ PerformAuthentication(MyProcPort);
+ CommitTransactionCommand();
+
+ /*
+ * Send this backend's cancellation info to the frontend.
+ */
+ pq_beginmessage(&buf, 'K');
+ pq_sendint32(&buf, (int32) MyProcPid);
+ pq_sendint32(&buf, (int32) MyCancelKey);
+ pq_endmessage(&buf);
+ /* Need not flush since ReadyForQuery will do it. */
+ continue;
+ }
+ elog(LOG, "Session startup failed");
+ close(sock);
+ goto Retry;
+ }
+ else
+ {
+ elog(DEBUG2, "Switch to session %d in backend %d", ready_client.fd, MyProcPid);
+ MyProcPort->sock = ready_client.fd;
+ CurrentSessionId = (char*)ready_client.user_data;
+ }
+ }
}
/*
@@ -4350,13 +4481,33 @@ PostgresMain(int argc, char *argv[],
* it will fail to be called during other backend-shutdown
* scenarios.
*/
+ if (SessionPool)
+ {
+ DeleteWaitEventFromSet(SessionPool, MyProcPort->sock);
+ elog(DEBUG1, "Close session %d in backend %d", MyProcPort->sock, MyProcPid);
+ pq_getmsgend(&input_message);
+
+ if (CurrentSessionId)
+ {
+ DropSessionPreparedStatements(CurrentSessionId);
+ free(CurrentSessionId);
+ CurrentSessionId = NULL;
+ }
+
+ close(MyProcPort->sock);
+ MyProcPort->sock = PGINVALID_SOCKET;
+
+ send_ready_for_query = true;
+ break;
+ }
+ elog(DEBUG1, "Terminate backend %d", MyProcPid);
proc_exit(0);
case 'd': /* copy data */
case 'c': /* copy done */
case 'f': /* copy fail */
- /*
+ /*!
* Accept but ignore these messages, per protocol spec; we
* probably got here because a COPY failed, and the frontend
* is still sending data.
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 54fa4a3..b2f43a8 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -120,7 +120,9 @@ int maintenance_work_mem = 16384;
* register background workers.
*/
int NBuffers = 1000;
+int SessionPoolSize = 0;
int MaxConnections = 90;
+int MaxSessions = 1000;
int max_worker_processes = 8;
int max_parallel_workers = 8;
int MaxBackends = 0;
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index f9b3309..571c80f 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -65,7 +65,7 @@
static HeapTuple GetDatabaseTuple(const char *dbname);
static HeapTuple GetDatabaseTupleByOid(Oid dboid);
-static void PerformAuthentication(Port *port);
+void PerformAuthentication(Port *port);
static void CheckMyDatabase(const char *name, bool am_superuser);
static void InitCommunication(void);
static void ShutdownPostgres(int code, Datum arg);
@@ -180,7 +180,7 @@ GetDatabaseTupleByOid(Oid dboid)
*
* returns: nothing. Will not return at all if there's any failure.
*/
-static void
+void
PerformAuthentication(Port *port)
{
/* This should be set already, but let's make sure */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 72f6be3..02373a3 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1871,6 +1871,26 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"max_sessions", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+ gettext_noop("Sets the maximum number of client session."),
+ NULL
+ },
+ &MaxSessions,
+ 1000, 1, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"session_pool_size", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+ gettext_noop("Sets number of backends serving client sessions."),
+ NULL
+ },
+ &SessionPoolSize,
+ 0, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
{"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the number of connection slots reserved for superusers."),
NULL
diff --git a/src/include/commands/prepare.h b/src/include/commands/prepare.h
index ffec029..cb5f8d4 100644
--- a/src/include/commands/prepare.h
+++ b/src/include/commands/prepare.h
@@ -56,5 +56,6 @@ extern TupleDesc FetchPreparedStatementResultDesc(PreparedStatement *stmt);
extern List *FetchPreparedStatementTargetList(PreparedStatement *stmt);
extern void DropAllPreparedStatements(void);
+extern void DropSessionPreparedStatements(char const* sessionId);
#endif /* PREPARE_H */
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 49cb263..f31f89b 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -127,7 +127,8 @@ typedef struct Port
int remote_hostname_errcode; /* see above */
char *remote_port; /* text rep of remote port */
CAC_state canAcceptConnections; /* postmaster connection status */
-
+ pgsocket session_recv_sock; /* socket for receiving descriptor of new session sockets */
+
/*
* Information that needs to be saved from the startup packet and passed
* into backend execution. "char *" fields are NULL if not set.
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 2e7725d..9169b21 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -71,6 +71,7 @@ extern int pq_getbyte(void);
extern int pq_peekbyte(void);
extern int pq_getbyte_if_available(unsigned char *c);
extern int pq_putbytes(const char *s, size_t len);
+extern int pq_available_bytes(void);
/*
* prototypes for functions in be-secure.c
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 54ee273..a9f9228 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -157,6 +157,8 @@ extern PGDLLIMPORT char *DataDir;
extern PGDLLIMPORT int NBuffers;
extern PGDLLIMPORT int MaxBackends;
extern PGDLLIMPORT int MaxConnections;
+extern PGDLLIMPORT int MaxSessions;
+extern PGDLLIMPORT int SessionPoolSize;
extern PGDLLIMPORT int max_worker_processes;
extern int max_parallel_workers;
@@ -420,6 +422,7 @@ extern void InitializeMaxBackends(void);
extern void InitPostgres(const char *in_dbname, Oid dboid, const char *username,
Oid useroid, char *out_dbname);
extern void BaseInit(void);
+extern void PerformAuthentication(struct Port *port);
/* in utils/init/miscinit.c */
extern bool IgnoreSystemIndexes;
diff --git a/src/include/port.h b/src/include/port.h
index 3e528fa..c14a20d 100644
--- a/src/include/port.h
+++ b/src/include/port.h
@@ -41,6 +41,10 @@ typedef SOCKET pgsocket;
extern bool pg_set_noblock(pgsocket sock);
extern bool pg_set_block(pgsocket sock);
+/* send/receive socket descriptor */
+extern int pg_send_sock(pgsocket chan, pgsocket sock);
+extern pgsocket pg_recv_sock(pgsocket chan);
+
/* Portable path handling for Unix/Win32 (in path.c) */
extern bool has_drive_prefix(const char *filename);
diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h
index 1877eef..c9527c9 100644
--- a/src/include/postmaster/postmaster.h
+++ b/src/include/postmaster/postmaster.h
@@ -62,6 +62,9 @@ extern Size ShmemBackendArraySize(void);
extern void ShmemBackendArrayAllocation(void);
#endif
+struct Port;
+extern int ProcessStartupPacket(struct Port *port, bool SSLdone, MemoryContext memctx);
+
/*
* Note: MAX_BACKENDS is limited to 2^18-1 because that's the width reserved
* for buffer references in buf_internals.h. This limitation could be lifted
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index a4bcb48..10f30d1 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -176,6 +176,8 @@ extern int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout,
extern int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents,
pgsocket sock, long timeout, uint32 wait_event_info);
+extern void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd);
+
/*
* Unix implementation uses SIGUSR1 for inter-process signaling.
* Win32 doesn't need this.
+1 to the concept... A lot of users could benefit if we did this in a
good way.
--
Ivan Novick, Product Manager Pivotal Greenplum
inovick@pivotal.io -- (Mobile) 408-230-6491
https://www.youtube.com/GreenplumDatabase
Attached please find a new version of the patch with a few fixes.
And here are more results, on a NUMA system with 144 cores and 3 TB of RAM.
Read-only pgbench (-S), results in kTPS:

#Connections | Vanilla Postgres | Session pool size 256
          1k |             1300 |                   1505
         10k |              633 |                   1519
        100k |                - |                   1425
Read-write contention test: access to a small number of records with 1%
of updates.
#Clients | Vanilla Postgres (TPS) | Session pool size 256 (TPS)
     100 |                 557232 |                      573319
     200 |                 520395 |                      551670
     300 |                 511423 |                      533773
     400 |                 468562 |                      523091
     500 |                 442268 |                      514056
     600 |                 401860 |                      526704
     700 |                 363912 |                      530317
     800 |                 325148 |                      512238
     900 |                 301310 |                      512844
    1000 |                 278829 |                      554516
So, as you can see, there is no degradation of performance with an
increasing number of connections when session pooling is used.
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
Attachment: session_pool-2.patch (text/x-patch)
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index b945b15..a73d584 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -813,3 +813,30 @@ build_regtype_array(Oid *param_types, int num_params)
result = construct_array(tmp_ary, num_params, REGTYPEOID, 4, true, 'i');
return PointerGetDatum(result);
}
+
+
+void
+DropSessionPreparedStatements(char const* sessionId)
+{
+ HASH_SEQ_STATUS seq;
+ PreparedStatement *entry;
+ size_t idLen = strlen(sessionId);
+
+ /* nothing cached */
+ if (!prepared_queries)
+ return;
+
+ /* walk over cache */
+ hash_seq_init(&seq, prepared_queries);
+ while ((entry = hash_seq_search(&seq)) != NULL)
+ {
+ if (strncmp(entry->stmt_name, sessionId, idLen) == 0 && entry->stmt_name[idLen] == '.')
+ {
+ /* Release the plancache entry */
+ DropCachedPlan(entry->plansource);
+
+ /* Now we can remove the hash table entry */
+ hash_search(prepared_queries, entry->stmt_name, HASH_REMOVE, NULL);
+ }
+ }
+}
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index a4f6d4d..5b07a88 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -1029,6 +1029,18 @@ pq_peekbyte(void)
}
/* --------------------------------
+ * pq_available_bytes - report number of bytes already buffered
+ *
+ * Returns the number of received bytes that have not yet been consumed.
+ * --------------------------------
+ */
+int
+pq_available_bytes(void)
+{
+ return PqRecvLength - PqRecvPointer;
+}
+
+/* --------------------------------
* pq_getbyte_if_available - get a single byte from connection,
* if available
*
diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile
index aba1e92..56ec998 100644
--- a/src/backend/port/Makefile
+++ b/src/backend/port/Makefile
@@ -21,7 +21,7 @@ subdir = src/backend/port
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o $(TAS)
+OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o send_sock.o $(TAS)
ifeq ($(PORTNAME), win32)
SUBDIRS += win32
diff --git a/src/backend/port/send_sock.c b/src/backend/port/send_sock.c
new file mode 100644
index 0000000..7b36923
--- /dev/null
+++ b/src/backend/port/send_sock.c
@@ -0,0 +1,89 @@
+/*-------------------------------------------------------------------------
+ *
+ * send_sock.c
+ * Send socket descriptor to another process
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/port/send_sock.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <time.h>
+#include <unistd.h>
+
+/*
+ * Send socket descriptor "sock" to backend process through Unix socket "chan"
+ */
+int pg_send_sock(pgsocket chan, pgsocket sock)
+{
+ struct msghdr msg = { 0 };
+ struct iovec io;
+ struct cmsghdr * cmsg;
+ char buf[CMSG_SPACE(sizeof(sock))];
+ memset(buf, '\0', sizeof(buf));
+
+ /* On Mac OS X, the struct iovec is needed, even if it points to minimal data */
+ io.iov_base = "";
+ io.iov_len = 1;
+
+ msg.msg_iov = &io;
+ msg.msg_iovlen = 1;
+ msg.msg_control = buf;
+ msg.msg_controllen = sizeof(buf);
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(sock));
+
+ memcpy(CMSG_DATA(cmsg), &sock, sizeof(sock));
+ msg.msg_controllen = cmsg->cmsg_len;
+
+ if (sendmsg(chan, &msg, 0) < 0)
+ {
+ return -1;
+ }
+ return 0;
+}
+
+
+/*
+ * Receive socket descriptor from postmaster process through Unix socket "chan"
+ */
+pgsocket pg_recv_sock(pgsocket chan)
+{
+ struct msghdr msg = {0};
+ char c_buffer[256];
+ char m_buffer[256];
+ struct iovec io;
+ struct cmsghdr * cmsg;
+ pgsocket sock;
+
+ io.iov_base = m_buffer;
+ io.iov_len = sizeof(m_buffer);
+ msg.msg_iov = &io;
+ msg.msg_iovlen = 1;
+
+ msg.msg_control = c_buffer;
+ msg.msg_controllen = sizeof(c_buffer);
+
+ if (recvmsg(chan, &msg, 0) < 0)
+ {
+ return -1;
+ }
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ memcpy(&sock, CMSG_DATA(cmsg), sizeof(sock));
+
+ return sock;
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index f3ddf82..710e22c 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -169,6 +169,7 @@ typedef struct bkend
pid_t pid; /* process id of backend */
int32 cancel_key; /* cancel key for cancels for this backend */
int child_slot; /* PMChildSlot for this backend, if any */
+ pgsocket session_send_sock; /* Write end of socket pipe to this backend used to send session socket descriptor to the backend process */
/*
* Flavor of backend or auxiliary process. Note that BACKEND_TYPE_WALSND
@@ -182,6 +183,15 @@ typedef struct bkend
} Backend;
static dlist_head BackendList = DLIST_STATIC_INIT(BackendList);
+/*
+ * Pointer in backend list used to implement round-robin distribution of sessions through backends.
+ * This variable is either NULL or points to a normal backend.
+ */
+static Backend* BackendListClockPtr;
+/*
+ * Number of active normal backends
+ */
+static int nNormalBackends;
#ifdef EXEC_BACKEND
static Backend *ShmemBackendArray;
@@ -412,7 +422,6 @@ static void BackendRun(Port *port) pg_attribute_noreturn();
static void ExitPostmaster(int status) pg_attribute_noreturn();
static int ServerLoop(void);
static int BackendStartup(Port *port);
-static int ProcessStartupPacket(Port *port, bool SSLdone);
static void SendNegotiateProtocolVersion(List *unrecognized_protocol_options);
static void processCancelRequest(Port *port, void *pkt);
static int initMasks(fd_set *rmask);
@@ -568,6 +577,22 @@ HANDLE PostmasterHandle;
#endif
/*
+ * Move current backend pointer to the next normal backend.
+ * This function is called either when a new session is started (to implement the round-robin policy) or when the backend pointed to by BackendListClockPtr is terminated.
+ */
+static void AdvanceBackendListClockPtr(void)
+{
+ Backend* b = BackendListClockPtr;
+ do {
+ dlist_node* node = &b->elem;
+ node = node->next ? node->next : BackendList.head.next;
+ b = dlist_container(Backend, elem, node);
+ } while (b->bkend_type != BACKEND_TYPE_NORMAL && b != BackendListClockPtr);
+
+ BackendListClockPtr = b;
+}
+
+/*
* Postmaster main entry point
*/
void
@@ -1944,8 +1969,8 @@ initMasks(fd_set *rmask)
* send anything to the client, which would typically be appropriate
* if we detect a communications failure.)
*/
-static int
-ProcessStartupPacket(Port *port, bool SSLdone)
+int
+ProcessStartupPacket(Port *port, bool SSLdone, MemoryContext memctx)
{
int32 len;
void *buf;
@@ -2043,7 +2068,7 @@ retry1:
#endif
/* regular startup packet, cancel, etc packet should follow... */
/* but not another SSL negotiation request */
- return ProcessStartupPacket(port, true);
+ return ProcessStartupPacket(port, true, memctx);
}
/* Could add additional special packet types here */
@@ -2073,7 +2098,7 @@ retry1:
* not worry about leaking this storage on failure, since we aren't in the
* postmaster process anymore.
*/
- oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ oldcontext = MemoryContextSwitchTo(memctx);
if (PG_PROTOCOL_MAJOR(proto) >= 3)
{
@@ -2449,7 +2474,7 @@ ConnCreate(int serverFd)
ConnFree(port);
return NULL;
}
-
+ port->session_recv_sock = PGINVALID_SOCKET;
/*
* Allocate GSSAPI specific state struct
*/
@@ -3236,6 +3261,24 @@ CleanupBackgroundWorker(int pid,
}
/*
+ * Unlink backend from backend's list and free memory
+ */
+static void UnlinkBackend(Backend* bp)
+{
+ if (bp->bkend_type == BACKEND_TYPE_NORMAL)
+ {
+ if (bp == BackendListClockPtr)
+ AdvanceBackendListClockPtr();
+ if (bp->session_send_sock != PGINVALID_SOCKET)
+ close(bp->session_send_sock);
+ elog(DEBUG2, "Cleanup backend %d", bp->pid);
+ nNormalBackends -= 1;
+ }
+ dlist_delete(&bp->elem);
+ free(bp);
+}
+
+/*
* CleanupBackend -- cleanup after terminated backend.
*
* Remove all local state associated with backend.
@@ -3312,8 +3355,7 @@ CleanupBackend(int pid,
*/
BackgroundWorkerStopNotifications(bp->pid);
}
- dlist_delete(iter.cur);
- free(bp);
+ UnlinkBackend(bp);
break;
}
}
@@ -3415,8 +3457,7 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
ShmemBackendArrayRemove(bp);
#endif
}
- dlist_delete(iter.cur);
- free(bp);
+ UnlinkBackend(bp);
/* Keep looping so we can signal remaining backends */
}
else
@@ -4017,6 +4058,19 @@ BackendStartup(Port *port)
{
Backend *bn; /* for backend cleanup */
pid_t pid;
+ int session_pipe[2];
+
+ if (SessionPoolSize != 0 && nNormalBackends >= SessionPoolSize)
+ {
+ /* Instead of spawning a new backend, open a new session at one of the existing backends. */
+ Assert(BackendListClockPtr && BackendListClockPtr->session_send_sock != PGINVALID_SOCKET);
+ elog(DEBUG2, "Start new session for socket %d at backend %d total %d", port->sock, BackendListClockPtr->pid, nNormalBackends);
+ if (pg_send_sock(BackendListClockPtr->session_send_sock, port->sock) < 0)
+ elog(FATAL, "Failed to send session socket: %m");
+ AdvanceBackendListClockPtr(); /* round-robin */
+ return STATUS_OK;
+ }
+
/*
* Create backend data structure. Better before the fork() so we can
@@ -4030,7 +4084,6 @@ BackendStartup(Port *port)
errmsg("out of memory")));
return STATUS_ERROR;
}
-
/*
* Compute the cancel key that will be assigned to this backend. The
* backend will have its own copy in the forked-off process' value of
@@ -4063,12 +4116,23 @@ BackendStartup(Port *port)
/* Hasn't asked to be notified about any bgworkers yet */
bn->bgworker_notify = false;
+ if (SessionPoolSize != 0)
+ if (socketpair(AF_UNIX, SOCK_DGRAM, 0, session_pipe) < 0)
+ ereport(FATAL,
+ (errcode_for_file_access(),
+ errmsg_internal("could not create socket pair for launching sessions: %m")));
+
#ifdef EXEC_BACKEND
pid = backend_forkexec(port);
#else /* !EXEC_BACKEND */
pid = fork_process();
if (pid == 0) /* child */
{
+ if (SessionPoolSize != 0)
+ {
+ port->session_recv_sock = session_pipe[0];
+ close(session_pipe[1]);
+ }
free(bn);
/* Detangle from postmaster */
@@ -4110,9 +4174,19 @@ BackendStartup(Port *port)
* of backends.
*/
bn->pid = pid;
+ if (SessionPoolSize != 0)
+ {
+ bn->session_send_sock = session_pipe[1];
+ close(session_pipe[0]);
+ }
+ else
+ bn->session_send_sock = PGINVALID_SOCKET;
bn->bkend_type = BACKEND_TYPE_NORMAL; /* Can change later to WALSND */
dlist_push_head(&BackendList, &bn->elem);
-
+ if (BackendListClockPtr == NULL)
+ BackendListClockPtr = bn;
+ nNormalBackends += 1;
+ elog(DEBUG2, "Start backend %d total %d", pid, nNormalBackends);
#ifdef EXEC_BACKEND
if (!bn->dead_end)
ShmemBackendArrayAdd(bn);
@@ -4299,7 +4373,7 @@ BackendInitialize(Port *port)
* Receive the startup packet (which might turn out to be a cancel request
* packet).
*/
- status = ProcessStartupPacket(port, false);
+ status = ProcessStartupPacket(port, false, TopMemoryContext);
/*
* Stop here if it was bad or a cancel packet. ProcessStartupPacket
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index e6706f7..9c42fab 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -76,6 +76,7 @@ struct WaitEventSet
{
int nevents; /* number of registered events */
int nevents_space; /* maximum number of events in this set */
+ int free_events; /* L1-list of free events linked by "pos" and terminated by -1*/
/*
* Array, of nevents_space length, storing the definition of events this
@@ -129,7 +130,7 @@ static void drainSelfPipe(void);
#if defined(WAIT_USE_EPOLL)
static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
#elif defined(WAIT_USE_POLL)
-static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove);
#elif defined(WAIT_USE_WIN32)
static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event);
#endif
@@ -562,6 +563,7 @@ CreateWaitEventSet(MemoryContext context, int nevents)
set->latch = NULL;
set->nevents_space = nevents;
+ set->free_events = -1;
#if defined(WAIT_USE_EPOLL)
#ifdef EPOLL_CLOEXEC
@@ -667,6 +669,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
void *user_data)
{
WaitEvent *event;
+ int free_event;
/* not enough space */
Assert(set->nevents < set->nevents_space);
@@ -690,8 +693,19 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
elog(ERROR, "cannot wait on socket event without a socket");
- event = &set->events[set->nevents];
- event->pos = set->nevents++;
+ free_event = set->free_events;
+ if (free_event >= 0)
+ {
+ event = &set->events[free_event];
+ set->free_events = event->pos;
+ event->pos = free_event;
+ }
+ else
+ {
+ event = &set->events[set->nevents];
+ event->pos = set->nevents;
+ }
+ set->nevents += 1;
event->fd = fd;
event->events = events;
event->user_data = user_data;
@@ -718,7 +732,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
#if defined(WAIT_USE_EPOLL)
WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
#elif defined(WAIT_USE_POLL)
- WaitEventAdjustPoll(set, event);
+ WaitEventAdjustPoll(set, event, false);
#elif defined(WAIT_USE_WIN32)
WaitEventAdjustWin32(set, event);
#endif
@@ -727,6 +741,27 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
}
/*
+ * Remove event with specified socket descriptor
+ */
+void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd)
+{
+ int i, n = set->nevents;
+ for (i = 0; i < n; i++)
+ {
+ WaitEvent *event = &set->events[i];
+ if (event->fd == fd)
+ {
+#if defined(WAIT_USE_EPOLL)
+ WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL);
+#elif defined(WAIT_USE_POLL)
+ WaitEventAdjustPoll(set, event, true);
+#endif
+ break;
+ }
+ }
+}
+
+/*
* Change the event mask and, in the WL_LATCH_SET case, the latch associated
* with the WaitEvent.
*
@@ -774,7 +809,7 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
#if defined(WAIT_USE_EPOLL)
WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
#elif defined(WAIT_USE_POLL)
- WaitEventAdjustPoll(set, event);
+ WaitEventAdjustPoll(set, event, false);
#elif defined(WAIT_USE_WIN32)
WaitEventAdjustWin32(set, event);
#endif
@@ -827,14 +862,33 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
ereport(ERROR,
(errcode_for_socket_access(),
errmsg("epoll_ctl() failed: %m")));
+
+ if (action == EPOLL_CTL_DEL)
+ {
+ int pos = event->pos;
+ event->fd = PGINVALID_SOCKET;
+ set->nevents -= 1;
+ event->pos = set->free_events;
+ set->free_events = pos;
+ }
}
#endif
#if defined(WAIT_USE_POLL)
static void
-WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
+WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove)
{
- struct pollfd *pollfd = &set->pollfds[event->pos];
+ int pos = event->pos;
+ struct pollfd *pollfd = &set->pollfds[pos];
+
+ if (remove)
+ {
+ set->nevents -= 1;
+ *pollfd = set->pollfds[set->nevents];
+ set->events[pos] = set->events[set->nevents];
+ event->pos = pos;
+ return;
+ }
pollfd->revents = 0;
pollfd->fd = event->fd;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index ddc3ec8..ffc1494 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -75,6 +75,7 @@
#include "utils/snapmgr.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
+#include "utils/builtins.h"
#include "mb/pg_wchar.h"
@@ -169,6 +170,10 @@ static ProcSignalReason RecoveryConflictReason;
static MemoryContext row_description_context = NULL;
static StringInfoData row_description_buf;
+static WaitEventSet* SessionPool;
+static int64 SessionCount;
+static char* CurrentSessionId;
+
/* ----------------------------------------------------------------
* decls for routines only used in this file
* ----------------------------------------------------------------
@@ -194,6 +199,15 @@ static void log_disconnections(int code, Datum arg);
static void enable_statement_timeout(void);
static void disable_statement_timeout(void);
+/*
+ * Generate session ID unique within this backend
+ */
+static char* CreateSessionId(void)
+{
+ char buf[64];
+ pg_lltoa(++SessionCount, buf);
+ return strdup(buf);
+}
/* ----------------------------------------------------------------
* routines to obtain user input
@@ -473,7 +487,7 @@ SocketBackend(StringInfo inBuf)
* fatal because we have probably lost message boundary sync, and
* there's no good way to recover.
*/
- ereport(FATAL,
+ ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid frontend message type %d", qtype)));
break;
@@ -1232,6 +1246,12 @@ exec_parse_message(const char *query_string, /* string to execute */
bool save_log_statement_stats = log_statement_stats;
char msec_str[32];
+ if (CurrentSessionId && stmt_name[0] != '\0')
+ {
+ /* Make names of prepared statements unique for session in case of using internal session pool */
+ stmt_name = psprintf("%s.%s", CurrentSessionId, stmt_name);
+ }
+
/*
* Report query to various monitoring facilities.
*/
@@ -1503,6 +1523,12 @@ exec_bind_message(StringInfo input_message)
portal_name = pq_getmsgstring(input_message);
stmt_name = pq_getmsgstring(input_message);
+ if (CurrentSessionId && stmt_name[0] != '\0')
+ {
+ /* Make names of prepared statements unique for session in case of using internal session pool */
+ stmt_name = psprintf("%s.%s", CurrentSessionId, stmt_name);
+ }
+
ereport(DEBUG2,
(errmsg("bind %s to %s",
*portal_name ? portal_name : "<unnamed>",
@@ -2325,6 +2351,12 @@ exec_describe_statement_message(const char *stmt_name)
CachedPlanSource *psrc;
int i;
+ if (CurrentSessionId && stmt_name[0] != '\0')
+ {
+ /* Make names of prepared statements unique for session in case of using internal session pool */
+ stmt_name = psprintf("%s.%s", CurrentSessionId, stmt_name);
+ }
+
/*
* Start up a transaction command. (Note that this will normally change
* current memory context.) Nothing happens if we are already in one.
@@ -3603,7 +3635,6 @@ process_postgres_switches(int argc, char *argv[], GucContext ctx,
#endif
}
-
/* ----------------------------------------------------------------
* PostgresMain
* postgres main loop -- all backends, interactive or otherwise start here
@@ -3654,6 +3685,10 @@ PostgresMain(int argc, char *argv[],
progname)));
}
+ /* Assign session ID if use session pooling */
+ if (SessionPoolSize != 0)
+ CurrentSessionId = CreateSessionId();
+
/* Acquire configuration parameters, unless inherited from postmaster */
if (!IsUnderPostmaster)
{
@@ -3783,7 +3818,7 @@ PostgresMain(int argc, char *argv[],
* ... else we'd need to copy the Port data first. Also, subsidiary data
* such as the username isn't lost either; see ProcessStartupPacket().
*/
- if (PostmasterContext)
+ if (PostmasterContext && SessionPoolSize == 0)
{
MemoryContextDelete(PostmasterContext);
PostmasterContext = NULL;
@@ -4069,6 +4104,102 @@ PostgresMain(int argc, char *argv[],
ReadyForQuery(whereToSendOutput);
send_ready_for_query = false;
+
+ if (MyProcPort && MyProcPort->session_recv_sock != PGINVALID_SOCKET && !IsTransactionState() && pq_available_bytes() == 0)
+ {
+ WaitEvent ready_client;
+ whereToSendOutput = DestRemote;
+ if (SessionPool == NULL)
+ {
+ SessionPool = CreateWaitEventSet(TopMemoryContext, MaxSessions);
+ AddWaitEventToSet(SessionPool, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
+ AddWaitEventToSet(SessionPool, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+ AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, MyProcPort->session_recv_sock, NULL, NULL);
+ AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, MyProcPort->sock, NULL, NULL);
+ }
+ Retry:
+ DoingCommandRead = true;
+ if (WaitEventSetWait(SessionPool, -1, &ready_client, 1, PG_WAIT_CLIENT) != 1)
+ {
+ /* TODO: do some error recovery here */
+ elog(FATAL, "Failed to poll client sessions");
+ }
+ CHECK_FOR_INTERRUPTS();
+ DoingCommandRead = false;
+
+ if (ready_client.events & WL_POSTMASTER_DEATH)
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating connection due to unexpected postmaster exit")));
+
+ if (ready_client.events & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ ProcessClientReadInterrupt(true);
+ goto Retry;
+ }
+
+ if (ready_client.fd == MyProcPort->session_recv_sock)
+ {
+ int status;
+ Port port;
+ Port* myPort;
+ StringInfoData buf;
+ pgsocket sock = pg_recv_sock(MyProcPort->session_recv_sock);
+ if (sock < 0)
+ elog(FATAL, "Failed to receive session socket: %m");
+
+ /*
+ * Receive the startup packet (which might turn out to be a cancel request
+ * packet).
+ */
+ port.sock = sock;
+ myPort = MyProcPort;
+ MyProcPort = &port;
+ status = ProcessStartupPacket(&port, false, MessageContext);
+ MyProcPort = myPort;
+ if (strcmp(port.database_name, MyProcPort->database_name) ||
+ strcmp(port.user_name, MyProcPort->user_name))
+ {
+ elog(FATAL, "Failed to open session (dbname=%s user=%s) in backend %d (dbname=%s user=%s)",
+ port.database_name, port.user_name,
+ MyProcPid, MyProcPort->database_name, MyProcPort->user_name);
+ }
+ else if (status == STATUS_OK)
+ {
+ elog(DEBUG2, "Start new session %d in backend %d for database %s user %s",
+ sock, MyProcPid, port.database_name, port.user_name);
+ CurrentSessionId = CreateSessionId();
+ AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, sock, NULL, CurrentSessionId);
+ MyProcPort->sock = sock;
+ send_ready_for_query = true;
+
+ SetCurrentStatementStartTimestamp();
+ StartTransactionCommand();
+ PerformAuthentication(MyProcPort);
+ CommitTransactionCommand();
+
+ /*
+ * Send this backend's cancellation info to the frontend.
+ */
+ pq_beginmessage(&buf, 'K');
+ pq_sendint32(&buf, (int32) MyProcPid);
+ pq_sendint32(&buf, (int32) MyCancelKey);
+ pq_endmessage(&buf);
+ /* Need not flush since ReadyForQuery will do it. */
+ continue;
+ }
+ elog(LOG, "Session startup failed");
+ close(sock);
+ goto Retry;
+ }
+ else
+ {
+ elog(DEBUG2, "Switch to session %d in backend %d", ready_client.fd, MyProcPid);
+ MyProcPort->sock = ready_client.fd;
+ CurrentSessionId = (char*)ready_client.user_data;
+ }
+ }
}
/*
@@ -4350,13 +4481,36 @@ PostgresMain(int argc, char *argv[],
* it will fail to be called during other backend-shutdown
* scenarios.
*/
+ if (SessionPool)
+ {
+ DeleteWaitEventFromSet(SessionPool, MyProcPort->sock);
+ elog(DEBUG1, "Close session %d in backend %d", MyProcPort->sock, MyProcPid);
+
+ pq_getmsgend(&input_message);
+ if (pq_is_reading_msg())
+ pq_endmsgread();
+
+ if (CurrentSessionId)
+ {
+ DropSessionPreparedStatements(CurrentSessionId);
+ free(CurrentSessionId);
+ CurrentSessionId = NULL;
+ }
+
+ close(MyProcPort->sock);
+ MyProcPort->sock = PGINVALID_SOCKET;
+
+ send_ready_for_query = true;
+ break;
+ }
+ elog(DEBUG1, "Terminate backend %d", MyProcPid);
proc_exit(0);
case 'd': /* copy data */
case 'c': /* copy done */
case 'f': /* copy fail */
- /*
+ /*!
* Accept but ignore these messages, per protocol spec; we
* probably got here because a COPY failed, and the frontend
* is still sending data.
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 54fa4a3..b2f43a8 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -120,7 +120,9 @@ int maintenance_work_mem = 16384;
* register background workers.
*/
int NBuffers = 1000;
+int SessionPoolSize = 0;
int MaxConnections = 90;
+int MaxSessions = 1000;
int max_worker_processes = 8;
int max_parallel_workers = 8;
int MaxBackends = 0;
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index f9b3309..571c80f 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -65,7 +65,7 @@
static HeapTuple GetDatabaseTuple(const char *dbname);
static HeapTuple GetDatabaseTupleByOid(Oid dboid);
-static void PerformAuthentication(Port *port);
+void PerformAuthentication(Port *port);
static void CheckMyDatabase(const char *name, bool am_superuser);
static void InitCommunication(void);
static void ShutdownPostgres(int code, Datum arg);
@@ -180,7 +180,7 @@ GetDatabaseTupleByOid(Oid dboid)
*
* returns: nothing. Will not return at all if there's any failure.
*/
-static void
+void
PerformAuthentication(Port *port)
{
/* This should be set already, but let's make sure */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 72f6be3..02373a3 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1871,6 +1871,26 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"max_sessions", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+ gettext_noop("Sets the maximum number of client session."),
+ NULL
+ },
+ &MaxSessions,
+ 1000, 1, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"session_pool_size", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+ gettext_noop("Sets number of backends serving client sessions."),
+ NULL
+ },
+ &SessionPoolSize,
+ 0, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
{"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the number of connection slots reserved for superusers."),
NULL
diff --git a/src/include/commands/prepare.h b/src/include/commands/prepare.h
index ffec029..cb5f8d4 100644
--- a/src/include/commands/prepare.h
+++ b/src/include/commands/prepare.h
@@ -56,5 +56,6 @@ extern TupleDesc FetchPreparedStatementResultDesc(PreparedStatement *stmt);
extern List *FetchPreparedStatementTargetList(PreparedStatement *stmt);
extern void DropAllPreparedStatements(void);
+extern void DropSessionPreparedStatements(char const* sessionId);
#endif /* PREPARE_H */
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 49cb263..f31f89b 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -127,7 +127,8 @@ typedef struct Port
int remote_hostname_errcode; /* see above */
char *remote_port; /* text rep of remote port */
CAC_state canAcceptConnections; /* postmaster connection status */
-
+ pgsocket session_recv_sock; /* socket for receiving descriptor of new session sockets */
+
/*
* Information that needs to be saved from the startup packet and passed
* into backend execution. "char *" fields are NULL if not set.
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 2e7725d..9169b21 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -71,6 +71,7 @@ extern int pq_getbyte(void);
extern int pq_peekbyte(void);
extern int pq_getbyte_if_available(unsigned char *c);
extern int pq_putbytes(const char *s, size_t len);
+extern int pq_available_bytes(void);
/*
* prototypes for functions in be-secure.c
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 54ee273..a9f9228 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -157,6 +157,8 @@ extern PGDLLIMPORT char *DataDir;
extern PGDLLIMPORT int NBuffers;
extern PGDLLIMPORT int MaxBackends;
extern PGDLLIMPORT int MaxConnections;
+extern PGDLLIMPORT int MaxSessions;
+extern PGDLLIMPORT int SessionPoolSize;
extern PGDLLIMPORT int max_worker_processes;
extern int max_parallel_workers;
@@ -420,6 +422,7 @@ extern void InitializeMaxBackends(void);
extern void InitPostgres(const char *in_dbname, Oid dboid, const char *username,
Oid useroid, char *out_dbname);
extern void BaseInit(void);
+extern void PerformAuthentication(struct Port *port);
/* in utils/init/miscinit.c */
extern bool IgnoreSystemIndexes;
diff --git a/src/include/port.h b/src/include/port.h
index 3e528fa..c14a20d 100644
--- a/src/include/port.h
+++ b/src/include/port.h
@@ -41,6 +41,10 @@ typedef SOCKET pgsocket;
extern bool pg_set_noblock(pgsocket sock);
extern bool pg_set_block(pgsocket sock);
+/* send/receive socket descriptor */
+extern int pg_send_sock(pgsocket chan, pgsocket sock);
+extern pgsocket pg_recv_sock(pgsocket chan);
+
/* Portable path handling for Unix/Win32 (in path.c) */
extern bool has_drive_prefix(const char *filename);
diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h
index 1877eef..c9527c9 100644
--- a/src/include/postmaster/postmaster.h
+++ b/src/include/postmaster/postmaster.h
@@ -62,6 +62,9 @@ extern Size ShmemBackendArraySize(void);
extern void ShmemBackendArrayAllocation(void);
#endif
+struct Port;
+extern int ProcessStartupPacket(struct Port *port, bool SSLdone, MemoryContext memctx);
+
/*
* Note: MAX_BACKENDS is limited to 2^18-1 because that's the width reserved
* for buffer references in buf_internals.h. This limitation could be lifted
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index a4bcb48..10f30d1 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -176,6 +176,8 @@ extern int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout,
extern int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents,
pgsocket sock, long timeout, uint32 wait_event_info);
+extern void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd);
+
/*
* Unix implementation uses SIGUSR1 for inter-process signaling.
* Win32 doesn't need this.
On Thu, Jan 18, 2018 at 11:48 AM, Konstantin Knizhnik <k.knizhnik@postgrespro.ru> wrote:
So, as you can see, there is no degradation of performance with an increased number of connections when session pooling is used.
TBH, the tests you should be running are comparisons with a similar pool
size managed by pgbouncer, not just vanilla unlimited postgres.
Of course a limited pool size will beat thousands of concurrent queries by
a large margin. The real question is whether a pthread-based approach beats
the pgbouncer approach.
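For concreteness, the kind of comparable setup being asked for (pgbouncer in transaction pooling mode with a pool of 256, as benchmarked later in this thread) would look roughly like the following pgbouncer.ini; host, port, auth and database names below are placeholders:

[databases]
postgres = host=127.0.0.1 port=5432 dbname=postgres

[pgbouncer]
listen_addr = 127.0.0.1
listen_port = 6432
auth_type = trust
pool_mode = transaction
default_pool_size = 256
max_client_conn = 10000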
Hi Konstantin,
On 01/18/2018 03:48 PM, Konstantin Knizhnik wrote:
On 17.01.2018 19:09, Konstantin Knizhnik wrote:
Hi hackers,
...
I haven't looked at the code yet, but after reading your message I have
a simple question - how is this going to work with SSL? If you're only
passing a file descriptor, that does not seem to be sufficient for the
backends to do crypto (that requires the SSL stuff from Port).
Maybe I'm missing something and it already works, though ...
regards
--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On 18.01.2018 18:02, Tomas Vondra wrote:
I haven't looked at the code yet, but after reading your message I have a simple question - how is this going to work with SSL? If you're only passing a file descriptor, that does not seem to be sufficient for the backends to do crypto (that requires the SSL stuff from Port).
Ooops, I missed this aspect with SSL. Thank you.
A new version of the patch, which correctly maintains session context, is attached.
Now each session has its own memory context (allocator), which is used instead of TopMemoryContext.
SSL connections work now.
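To summarize the approach, the per-session state introduced by the attached session_pool-3.patch boils down to the following structure (comments added here for readability):

typedef struct SessionContext
{
    MemoryContext memory;  /* per-session memory context, used instead of TopMemoryContext */
    Port         *port;    /* per-session Port, so the SSL state survives switching between sessions */
    char         *id;      /* session identifier used to qualify prepared statement names */
} SessionContext;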
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
Attachments:
session_pool-3.patch (text/x-patch)
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index b945b15..a73d584 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -813,3 +813,30 @@ build_regtype_array(Oid *param_types, int num_params)
result = construct_array(tmp_ary, num_params, REGTYPEOID, 4, true, 'i');
return PointerGetDatum(result);
}
+
+
+void
+DropSessionPreparedStatements(char const* sessionId)
+{
+ HASH_SEQ_STATUS seq;
+ PreparedStatement *entry;
+ size_t idLen = strlen(sessionId);
+
+ /* nothing cached */
+ if (!prepared_queries)
+ return;
+
+ /* walk over cache */
+ hash_seq_init(&seq, prepared_queries);
+ while ((entry = hash_seq_search(&seq)) != NULL)
+ {
+ if (strncmp(entry->stmt_name, sessionId, idLen) == 0 && entry->stmt_name[idLen] == '.')
+ {
+ /* Release the plancache entry */
+ DropCachedPlan(entry->plansource);
+
+ /* Now we can remove the hash table entry */
+ hash_search(prepared_queries, entry->stmt_name, HASH_REMOVE, NULL);
+ }
+ }
+}
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index a4f6d4d..5b07a88 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -1029,6 +1029,18 @@ pq_peekbyte(void)
}
/* --------------------------------
+ * pq_available_bytes - get number of unread bytes in receive buffer
+ *
+ * Returns the number of bytes already received from the client but not yet consumed.
+ * --------------------------------
+ */
+int
+pq_available_bytes(void)
+{
+ return PqRecvLength - PqRecvPointer;
+}
+
+/* --------------------------------
* pq_getbyte_if_available - get a single byte from connection,
* if available
*
diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile
index aba1e92..56ec998 100644
--- a/src/backend/port/Makefile
+++ b/src/backend/port/Makefile
@@ -21,7 +21,7 @@ subdir = src/backend/port
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o $(TAS)
+OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o send_sock.o $(TAS)
ifeq ($(PORTNAME), win32)
SUBDIRS += win32
diff --git a/src/backend/port/send_sock.c b/src/backend/port/send_sock.c
new file mode 100644
index 0000000..7b36923
--- /dev/null
+++ b/src/backend/port/send_sock.c
@@ -0,0 +1,89 @@
+/*-------------------------------------------------------------------------
+ *
+ * send_sock.c
+ * Send socket descriptor to another process
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/port/send_sock.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <time.h>
+#include <unistd.h>
+
+/*
+ * Send socket descriptor "sock" to backend process through Unix socket "chan"
+ */
+int pg_send_sock(pgsocket chan, pgsocket sock)
+{
+ struct msghdr msg = { 0 };
+ struct iovec io;
+ struct cmsghdr * cmsg;
+ char buf[CMSG_SPACE(sizeof(sock))];
+ memset(buf, '\0', sizeof(buf));
+
+ /* On Mac OS X, the struct iovec is needed, even if it points to minimal data */
+ io.iov_base = "";
+ io.iov_len = 1;
+
+ msg.msg_iov = &io;
+ msg.msg_iovlen = 1;
+ msg.msg_control = buf;
+ msg.msg_controllen = sizeof(buf);
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(sock));
+
+ memcpy(CMSG_DATA(cmsg), &sock, sizeof(sock));
+ msg.msg_controllen = cmsg->cmsg_len;
+
+ if (sendmsg(chan, &msg, 0) < 0)
+ {
+ return -1;
+ }
+ return 0;
+}
+
+
+/*
+ * Receive socket descriptor from postmaster process through Unix socket "chan"
+ */
+pgsocket pg_recv_sock(pgsocket chan)
+{
+ struct msghdr msg = {0};
+ char c_buffer[256];
+ char m_buffer[256];
+ struct iovec io;
+ struct cmsghdr * cmsg;
+ pgsocket sock;
+
+ io.iov_base = m_buffer;
+ io.iov_len = sizeof(m_buffer);
+ msg.msg_iov = &io;
+ msg.msg_iovlen = 1;
+
+ msg.msg_control = c_buffer;
+ msg.msg_controllen = sizeof(c_buffer);
+
+ if (recvmsg(chan, &msg, 0) < 0)
+ {
+ return -1;
+ }
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ memcpy(&sock, CMSG_DATA(cmsg), sizeof(sock));
+
+ return sock;
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index f3ddf82..4586b57 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -169,6 +169,7 @@ typedef struct bkend
pid_t pid; /* process id of backend */
int32 cancel_key; /* cancel key for cancels for this backend */
int child_slot; /* PMChildSlot for this backend, if any */
+ pgsocket session_send_sock; /* Write end of socket pipe to this backend used to send session socket descriptor to the backend process */
/*
* Flavor of backend or auxiliary process. Note that BACKEND_TYPE_WALSND
@@ -182,6 +183,15 @@ typedef struct bkend
} Backend;
static dlist_head BackendList = DLIST_STATIC_INIT(BackendList);
+/*
+ * Pointer in backend list used to implement round-robin distribution of sessions through backends.
+ * This variable is either NULL or points to a normal backend.
+ */
+static Backend* BackendListClockPtr;
+/*
+ * Number of active normal backends
+ */
+static int nNormalBackends;
#ifdef EXEC_BACKEND
static Backend *ShmemBackendArray;
@@ -412,7 +422,6 @@ static void BackendRun(Port *port) pg_attribute_noreturn();
static void ExitPostmaster(int status) pg_attribute_noreturn();
static int ServerLoop(void);
static int BackendStartup(Port *port);
-static int ProcessStartupPacket(Port *port, bool SSLdone);
static void SendNegotiateProtocolVersion(List *unrecognized_protocol_options);
static void processCancelRequest(Port *port, void *pkt);
static int initMasks(fd_set *rmask);
@@ -568,6 +577,22 @@ HANDLE PostmasterHandle;
#endif
/*
+ * Move current backend pointer to the next normal backend.
+ * This function is called either when a new session is started (to implement the round-robin policy) or when the backend pointed to by BackendListClockPtr is terminated.
+ */
+static void AdvanceBackendListClockPtr(void)
+{
+ Backend* b = BackendListClockPtr;
+ do {
+ dlist_node* node = &b->elem;
+ node = node->next ? node->next : BackendList.head.next;
+ b = dlist_container(Backend, elem, node);
+ } while (b->bkend_type != BACKEND_TYPE_NORMAL && b != BackendListClockPtr);
+
+ BackendListClockPtr = b;
+}
+
+/*
* Postmaster main entry point
*/
void
@@ -1944,8 +1969,8 @@ initMasks(fd_set *rmask)
* send anything to the client, which would typically be appropriate
* if we detect a communications failure.)
*/
-static int
-ProcessStartupPacket(Port *port, bool SSLdone)
+int
+ProcessStartupPacket(Port *port, bool SSLdone, MemoryContext memctx)
{
int32 len;
void *buf;
@@ -2043,7 +2068,7 @@ retry1:
#endif
/* regular startup packet, cancel, etc packet should follow... */
/* but not another SSL negotiation request */
- return ProcessStartupPacket(port, true);
+ return ProcessStartupPacket(port, true, memctx);
}
/* Could add additional special packet types here */
@@ -2073,7 +2098,7 @@ retry1:
* not worry about leaking this storage on failure, since we aren't in the
* postmaster process anymore.
*/
- oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ oldcontext = MemoryContextSwitchTo(memctx);
if (PG_PROTOCOL_MAJOR(proto) >= 3)
{
@@ -2449,7 +2474,7 @@ ConnCreate(int serverFd)
ConnFree(port);
return NULL;
}
-
+ SessionPoolSock = PGINVALID_SOCKET;
/*
* Allocate GSSAPI specific state struct
*/
@@ -3236,6 +3261,24 @@ CleanupBackgroundWorker(int pid,
}
/*
+ * Unlink backend from backend's list and free memory
+ */
+static void UnlinkBackend(Backend* bp)
+{
+ if (bp->bkend_type == BACKEND_TYPE_NORMAL)
+ {
+ if (bp == BackendListClockPtr)
+ AdvanceBackendListClockPtr();
+ if (bp->session_send_sock != PGINVALID_SOCKET)
+ close(bp->session_send_sock);
+ elog(DEBUG2, "Cleanup backend %d", bp->pid);
+ nNormalBackends -= 1;
+ }
+ dlist_delete(&bp->elem);
+ free(bp);
+}
+
+/*
* CleanupBackend -- cleanup after terminated backend.
*
* Remove all local state associated with backend.
@@ -3312,8 +3355,7 @@ CleanupBackend(int pid,
*/
BackgroundWorkerStopNotifications(bp->pid);
}
- dlist_delete(iter.cur);
- free(bp);
+ UnlinkBackend(bp);
break;
}
}
@@ -3415,8 +3457,7 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
ShmemBackendArrayRemove(bp);
#endif
}
- dlist_delete(iter.cur);
- free(bp);
+ UnlinkBackend(bp);
/* Keep looping so we can signal remaining backends */
}
else
@@ -4017,6 +4058,19 @@ BackendStartup(Port *port)
{
Backend *bn; /* for backend cleanup */
pid_t pid;
+ int session_pipe[2];
+
+ if (SessionPoolSize != 0 && nNormalBackends >= SessionPoolSize)
+ {
+ /* Instead of spawning a new backend, open a new session at one of the existing backends. */
+ Assert(BackendListClockPtr && BackendListClockPtr->session_send_sock != PGINVALID_SOCKET);
+ elog(DEBUG2, "Start new session for socket %d at backend %d total %d", port->sock, BackendListClockPtr->pid, nNormalBackends);
+ if (pg_send_sock(BackendListClockPtr->session_send_sock, port->sock) < 0)
+ elog(FATAL, "Failed to send session socket: %m");
+ AdvanceBackendListClockPtr(); /* round-robin */
+ return STATUS_OK;
+ }
+
/*
* Create backend data structure. Better before the fork() so we can
@@ -4030,7 +4084,6 @@ BackendStartup(Port *port)
errmsg("out of memory")));
return STATUS_ERROR;
}
-
/*
* Compute the cancel key that will be assigned to this backend. The
* backend will have its own copy in the forked-off process' value of
@@ -4063,12 +4116,23 @@ BackendStartup(Port *port)
/* Hasn't asked to be notified about any bgworkers yet */
bn->bgworker_notify = false;
+ if (SessionPoolSize != 0)
+ if (socketpair(AF_UNIX, SOCK_DGRAM, 0, session_pipe) < 0)
+ ereport(FATAL,
+ (errcode_for_file_access(),
+ errmsg_internal("could not create socket pair for launching sessions: %m")));
+
#ifdef EXEC_BACKEND
pid = backend_forkexec(port);
#else /* !EXEC_BACKEND */
pid = fork_process();
if (pid == 0) /* child */
{
+ if (SessionPoolSize != 0)
+ {
+ SessionPoolSock = session_pipe[0];
+ close(session_pipe[1]);
+ }
free(bn);
/* Detangle from postmaster */
@@ -4110,9 +4174,19 @@ BackendStartup(Port *port)
* of backends.
*/
bn->pid = pid;
+ if (SessionPoolSize != 0)
+ {
+ bn->session_send_sock = session_pipe[1];
+ close(session_pipe[0]);
+ }
+ else
+ bn->session_send_sock = PGINVALID_SOCKET;
bn->bkend_type = BACKEND_TYPE_NORMAL; /* Can change later to WALSND */
dlist_push_head(&BackendList, &bn->elem);
-
+ if (BackendListClockPtr == NULL)
+ BackendListClockPtr = bn;
+ nNormalBackends += 1;
+ elog(DEBUG2, "Start backend %d total %d", pid, nNormalBackends);
#ifdef EXEC_BACKEND
if (!bn->dead_end)
ShmemBackendArrayAdd(bn);
@@ -4299,7 +4373,7 @@ BackendInitialize(Port *port)
* Receive the startup packet (which might turn out to be a cancel request
* packet).
*/
- status = ProcessStartupPacket(port, false);
+ status = ProcessStartupPacket(port, false, TopMemoryContext);
/*
* Stop here if it was bad or a cancel packet. ProcessStartupPacket
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index e6706f7..9c42fab 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -76,6 +76,7 @@ struct WaitEventSet
{
int nevents; /* number of registered events */
int nevents_space; /* maximum number of events in this set */
+ int free_events; /* L1-list of free events linked by "pos" and terminated by -1*/
/*
* Array, of nevents_space length, storing the definition of events this
@@ -129,7 +130,7 @@ static void drainSelfPipe(void);
#if defined(WAIT_USE_EPOLL)
static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
#elif defined(WAIT_USE_POLL)
-static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove);
#elif defined(WAIT_USE_WIN32)
static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event);
#endif
@@ -562,6 +563,7 @@ CreateWaitEventSet(MemoryContext context, int nevents)
set->latch = NULL;
set->nevents_space = nevents;
+ set->free_events = -1;
#if defined(WAIT_USE_EPOLL)
#ifdef EPOLL_CLOEXEC
@@ -667,6 +669,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
void *user_data)
{
WaitEvent *event;
+ int free_event;
/* not enough space */
Assert(set->nevents < set->nevents_space);
@@ -690,8 +693,19 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
elog(ERROR, "cannot wait on socket event without a socket");
- event = &set->events[set->nevents];
- event->pos = set->nevents++;
+ free_event = set->free_events;
+ if (free_event >= 0)
+ {
+ event = &set->events[free_event];
+ set->free_events = event->pos;
+ event->pos = free_event;
+ }
+ else
+ {
+ event = &set->events[set->nevents];
+ event->pos = set->nevents;
+ }
+ set->nevents += 1;
event->fd = fd;
event->events = events;
event->user_data = user_data;
@@ -718,7 +732,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
#if defined(WAIT_USE_EPOLL)
WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
#elif defined(WAIT_USE_POLL)
- WaitEventAdjustPoll(set, event);
+ WaitEventAdjustPoll(set, event, false);
#elif defined(WAIT_USE_WIN32)
WaitEventAdjustWin32(set, event);
#endif
@@ -727,6 +741,27 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
}
/*
+ * Remove event with specified socket descriptor
+ */
+void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd)
+{
+ int i, n = set->nevents;
+ for (i = 0; i < n; i++)
+ {
+ WaitEvent *event = &set->events[i];
+ if (event->fd == fd)
+ {
+#if defined(WAIT_USE_EPOLL)
+ WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL);
+#elif defined(WAIT_USE_POLL)
+ WaitEventAdjustPoll(set, event, true);
+#endif
+ break;
+ }
+ }
+}
+
+/*
* Change the event mask and, in the WL_LATCH_SET case, the latch associated
* with the WaitEvent.
*
@@ -774,7 +809,7 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
#if defined(WAIT_USE_EPOLL)
WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
#elif defined(WAIT_USE_POLL)
- WaitEventAdjustPoll(set, event);
+ WaitEventAdjustPoll(set, event, false);
#elif defined(WAIT_USE_WIN32)
WaitEventAdjustWin32(set, event);
#endif
@@ -827,14 +862,33 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
ereport(ERROR,
(errcode_for_socket_access(),
errmsg("epoll_ctl() failed: %m")));
+
+ if (action == EPOLL_CTL_DEL)
+ {
+ int pos = event->pos;
+ event->fd = PGINVALID_SOCKET;
+ set->nevents -= 1;
+ event->pos = set->free_events;
+ set->free_events = pos;
+ }
}
#endif
#if defined(WAIT_USE_POLL)
static void
-WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
+WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove)
{
- struct pollfd *pollfd = &set->pollfds[event->pos];
+ int pos = event->pos;
+ struct pollfd *pollfd = &set->pollfds[pos];
+
+ if (remove)
+ {
+ set->nevents -= 1;
+ *pollfd = set->pollfds[set->nevents];
+ set->events[pos] = set->events[set->nevents];
+ event->pos = pos;
+ return;
+ }
pollfd->revents = 0;
pollfd->fd = event->fd;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index ddc3ec8..f8abfd0 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -75,9 +75,17 @@
#include "utils/snapmgr.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
+#include "utils/builtins.h"
#include "mb/pg_wchar.h"
+typedef struct SessionContext
+{
+ MemoryContext memory;
+ Port* port;
+ char* id;
+} SessionContext;
+
/* ----------------
* global variables
* ----------------
@@ -98,6 +106,8 @@ int max_stack_depth = 100;
/* wait N seconds to allow attach from a debugger */
int PostAuthDelay = 0;
+/* Local socket for redirecting sessions to the backends */
+pgsocket SessionPoolSock = PGINVALID_SOCKET;
/* ----------------
@@ -169,6 +179,11 @@ static ProcSignalReason RecoveryConflictReason;
static MemoryContext row_description_context = NULL;
static StringInfoData row_description_buf;
+static WaitEventSet* SessionPool;
+static int64 SessionCount;
+static SessionContext* CurrentSession;
+static Port* BackendPort;
+
/* ----------------------------------------------------------------
* decls for routines only used in this file
* ----------------------------------------------------------------
@@ -194,6 +209,22 @@ static void log_disconnections(int code, Datum arg);
static void enable_statement_timeout(void);
static void disable_statement_timeout(void);
+/*
+ * Generate session ID unique within this backend
+ */
+static char* CreateSessionId(void)
+{
+ char buf[64];
+ pg_lltoa(++SessionCount, buf);
+ return pstrdup(buf);
+}
+
+static void DeleteSession(SessionContext* session)
+{
+ elog(LOG, "Delete session %p, id=%s, memory context=%p", session, session->id, session->memory);
+ MemoryContextDelete(session->memory);
+ free(session);
+}
/* ----------------------------------------------------------------
* routines to obtain user input
@@ -1232,6 +1263,12 @@ exec_parse_message(const char *query_string, /* string to execute */
bool save_log_statement_stats = log_statement_stats;
char msec_str[32];
+ if (CurrentSession && stmt_name[0] != '\0')
+ {
+ /* Make names of prepared statements unique for session in case of using internal session pool */
+ stmt_name = psprintf("%s.%s", CurrentSession->id, stmt_name);
+ }
+
/*
* Report query to various monitoring facilities.
*/
@@ -1503,6 +1540,12 @@ exec_bind_message(StringInfo input_message)
portal_name = pq_getmsgstring(input_message);
stmt_name = pq_getmsgstring(input_message);
+ if (CurrentSession && stmt_name[0] != '\0')
+ {
+ /* Make names of prepared statements unique for session in case of using internal session pool */
+ stmt_name = psprintf("%s.%s", CurrentSession->id, stmt_name);
+ }
+
ereport(DEBUG2,
(errmsg("bind %s to %s",
*portal_name ? portal_name : "<unnamed>",
@@ -2325,6 +2368,12 @@ exec_describe_statement_message(const char *stmt_name)
CachedPlanSource *psrc;
int i;
+ if (CurrentSession && stmt_name[0] != '\0')
+ {
+ /* Make names of prepared statements unique for session in case of using internal session pool */
+ stmt_name = psprintf("%s.%s", CurrentSession->id, stmt_name);
+ }
+
/*
* Start up a transaction command. (Note that this will normally change
* current memory context.) Nothing happens if we are already in one.
@@ -3603,7 +3652,6 @@ process_postgres_switches(int argc, char *argv[], GucContext ctx,
#endif
}
-
/* ----------------------------------------------------------------
* PostgresMain
* postgres main loop -- all backends, interactive or otherwise start here
@@ -3654,6 +3702,21 @@ PostgresMain(int argc, char *argv[],
progname)));
}
+ /* Assign session ID if use session pooling */
+ if (SessionPoolSize != 0)
+ {
+ MemoryContext oldcontext;
+ CurrentSession = (SessionContext*)malloc(sizeof(SessionContext));
+ CurrentSession->memory = AllocSetContextCreate(TopMemoryContext,
+ "SessionMemoryContext",
+ ALLOCSET_DEFAULT_SIZES);
+ oldcontext = MemoryContextSwitchTo(CurrentSession->memory);
+ CurrentSession->id = CreateSessionId();
+ CurrentSession->port = MyProcPort;
+ BackendPort = MyProcPort;
+ MemoryContextSwitchTo(oldcontext);
+ }
+
/* Acquire configuration parameters, unless inherited from postmaster */
if (!IsUnderPostmaster)
{
@@ -3783,7 +3846,7 @@ PostgresMain(int argc, char *argv[],
* ... else we'd need to copy the Port data first. Also, subsidiary data
* such as the username isn't lost either; see ProcessStartupPacket().
*/
- if (PostmasterContext)
+ if (PostmasterContext && SessionPoolSize == 0)
{
MemoryContextDelete(PostmasterContext);
PostmasterContext = NULL;
@@ -4069,6 +4132,120 @@ PostgresMain(int argc, char *argv[],
ReadyForQuery(whereToSendOutput);
send_ready_for_query = false;
+
+ if (SessionPoolSock != PGINVALID_SOCKET && !IsTransactionState() && pq_available_bytes() == 0)
+ {
+ WaitEvent ready_client;
+ if (SessionPool == NULL)
+ {
+ SessionPool = CreateWaitEventSet(TopMemoryContext, MaxSessions);
+ AddWaitEventToSet(SessionPool, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, CurrentSession);
+ AddWaitEventToSet(SessionPool, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, CurrentSession);
+ AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, SessionPoolSock, NULL, CurrentSession);
+ AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, MyProcPort->sock, NULL, CurrentSession);
+ }
+ ChooseSession:
+ DoingCommandRead = true;
+ if (WaitEventSetWait(SessionPool, -1, &ready_client, 1, PG_WAIT_CLIENT) != 1)
+ {
+ /* TODO: do some error recovery here */
+ elog(FATAL, "Failed to poll client sessions");
+ }
+ CHECK_FOR_INTERRUPTS();
+ DoingCommandRead = false;
+
+ if (ready_client.events & WL_POSTMASTER_DEATH)
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating connection due to unexpected postmaster exit")));
+
+ if (ready_client.events & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ ProcessClientReadInterrupt(true);
+ goto ChooseSession;
+ }
+
+ if (ready_client.fd == SessionPoolSock)
+ {
+ int status;
+ SessionContext* session;
+ StringInfoData buf;
+ Port* port;
+ pgsocket sock;
+ MemoryContext oldcontext;
+
+ sock = pg_recv_sock(SessionPoolSock);
+ if (sock < 0)
+ elog(FATAL, "Failed to receive session socket: %m");
+
+ session = (SessionContext*)malloc(sizeof(SessionContext));
+ session->memory = AllocSetContextCreate(TopMemoryContext,
+ "SessionMemoryContext",
+ ALLOCSET_DEFAULT_SIZES);
+ oldcontext = MemoryContextSwitchTo(session->memory);
+ port = palloc(sizeof(Port));
+ memcpy(port, BackendPort, sizeof(Port));
+
+ /*
+ * Receive the startup packet (which might turn out to be a cancel request
+ * packet).
+ */
+ port->sock = sock;
+ session->port = port;
+ session->id = CreateSessionId();
+
+ MyProcPort = port;
+ status = ProcessStartupPacket(port, false, session->memory);
+ MemoryContextSwitchTo(oldcontext);
+
+ if (strcmp(port->database_name, MyProcPort->database_name) ||
+ strcmp(port->user_name, MyProcPort->user_name))
+ {
+ elog(FATAL, "Failed to open session (dbname=%s user=%s) in backend %d (dbname=%s user=%s)",
+ port->database_name, port->user_name,
+ MyProcPid, MyProcPort->database_name, MyProcPort->user_name);
+ }
+ else if (status == STATUS_OK)
+ {
+ elog(DEBUG2, "Start new session %d in backend %d for database %s user %s",
+ sock, MyProcPid, port->database_name, port->user_name);
+ CurrentSession = session;
+ AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, sock, NULL, session);
+
+ SetCurrentStatementStartTimestamp();
+ StartTransactionCommand();
+ PerformAuthentication(MyProcPort);
+ CommitTransactionCommand();
+
+ BeginReportingGUCOptions();
+ /*
+ * Send this backend's cancellation info to the frontend.
+ */
+ pq_beginmessage(&buf, 'K');
+ pq_sendint32(&buf, (int32) MyProcPid);
+ pq_sendint32(&buf, (int32) MyCancelKey);
+ pq_endmessage(&buf);
+
+ /* Need not flush since ReadyForQuery will do it. */
+ send_ready_for_query = true;
+ continue;
+ }
+ else
+ {
+ DeleteSession(session);
+ elog(LOG, "Session startup failed");
+ close(sock);
+ goto ChooseSession;
+ }
+ }
+ else
+ {
+ elog(DEBUG2, "Switch to session %d in backend %d", ready_client.fd, MyProcPid);
+ CurrentSession = (SessionContext*)ready_client.user_data;
+ MyProcPort = CurrentSession->port;
+ }
+ }
}
/*
@@ -4350,6 +4527,29 @@ PostgresMain(int argc, char *argv[],
* it will fail to be called during other backend-shutdown
* scenarios.
*/
+ if (SessionPool)
+ {
+ DeleteWaitEventFromSet(SessionPool, MyProcPort->sock);
+ elog(DEBUG1, "Close session %d in backend %d", MyProcPort->sock, MyProcPid);
+
+ pq_getmsgend(&input_message);
+ if (pq_is_reading_msg())
+ pq_endmsgread();
+
+ close(MyProcPort->sock);
+ MyProcPort->sock = PGINVALID_SOCKET;
+ MyProcPort = NULL;
+
+ if (CurrentSession)
+ {
+ DropSessionPreparedStatements(CurrentSession->id);
+ DeleteSession(CurrentSession);
+ CurrentSession = NULL;
+ }
+ whereToSendOutput = DestRemote;
+ goto ChooseSession;
+ }
+ elog(DEBUG1, "Terminate backend %d", MyProcPid);
proc_exit(0);
case 'd': /* copy data */
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 54fa4a3..b2f43a8 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -120,7 +120,9 @@ int maintenance_work_mem = 16384;
* register background workers.
*/
int NBuffers = 1000;
+int SessionPoolSize = 0;
int MaxConnections = 90;
+int MaxSessions = 1000;
int max_worker_processes = 8;
int max_parallel_workers = 8;
int MaxBackends = 0;
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index f9b3309..571c80f 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -65,7 +65,7 @@
static HeapTuple GetDatabaseTuple(const char *dbname);
static HeapTuple GetDatabaseTupleByOid(Oid dboid);
-static void PerformAuthentication(Port *port);
+void PerformAuthentication(Port *port);
static void CheckMyDatabase(const char *name, bool am_superuser);
static void InitCommunication(void);
static void ShutdownPostgres(int code, Datum arg);
@@ -180,7 +180,7 @@ GetDatabaseTupleByOid(Oid dboid)
*
* returns: nothing. Will not return at all if there's any failure.
*/
-static void
+void
PerformAuthentication(Port *port)
{
/* This should be set already, but let's make sure */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 72f6be3..02373a3 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1871,6 +1871,26 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"max_sessions", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+ gettext_noop("Sets the maximum number of client session."),
+ NULL
+ },
+ &MaxSessions,
+ 1000, 1, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"session_pool_size", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+ gettext_noop("Sets number of backends serving client sessions."),
+ NULL
+ },
+ &SessionPoolSize,
+ 0, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
{"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the number of connection slots reserved for superusers."),
NULL
diff --git a/src/include/commands/prepare.h b/src/include/commands/prepare.h
index ffec029..cb5f8d4 100644
--- a/src/include/commands/prepare.h
+++ b/src/include/commands/prepare.h
@@ -56,5 +56,6 @@ extern TupleDesc FetchPreparedStatementResultDesc(PreparedStatement *stmt);
extern List *FetchPreparedStatementTargetList(PreparedStatement *stmt);
extern void DropAllPreparedStatements(void);
+extern void DropSessionPreparedStatements(char const* sessionId);
#endif /* PREPARE_H */
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 2e7725d..9169b21 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -71,6 +71,7 @@ extern int pq_getbyte(void);
extern int pq_peekbyte(void);
extern int pq_getbyte_if_available(unsigned char *c);
extern int pq_putbytes(const char *s, size_t len);
+extern int pq_available_bytes(void);
/*
* prototypes for functions in be-secure.c
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 54ee273..a9f9228 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -157,6 +157,8 @@ extern PGDLLIMPORT char *DataDir;
extern PGDLLIMPORT int NBuffers;
extern PGDLLIMPORT int MaxBackends;
extern PGDLLIMPORT int MaxConnections;
+extern PGDLLIMPORT int MaxSessions;
+extern PGDLLIMPORT int SessionPoolSize;
extern PGDLLIMPORT int max_worker_processes;
extern int max_parallel_workers;
@@ -420,6 +422,7 @@ extern void InitializeMaxBackends(void);
extern void InitPostgres(const char *in_dbname, Oid dboid, const char *username,
Oid useroid, char *out_dbname);
extern void BaseInit(void);
+extern void PerformAuthentication(struct Port *port);
/* in utils/init/miscinit.c */
extern bool IgnoreSystemIndexes;
diff --git a/src/include/port.h b/src/include/port.h
index 3e528fa..c14a20d 100644
--- a/src/include/port.h
+++ b/src/include/port.h
@@ -41,6 +41,10 @@ typedef SOCKET pgsocket;
extern bool pg_set_noblock(pgsocket sock);
extern bool pg_set_block(pgsocket sock);
+/* send/receive socket descriptor */
+extern int pg_send_sock(pgsocket chan, pgsocket sock);
+extern pgsocket pg_recv_sock(pgsocket chan);
+
/* Portable path handling for Unix/Win32 (in path.c) */
extern bool has_drive_prefix(const char *filename);
diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h
index 1877eef..c9527c9 100644
--- a/src/include/postmaster/postmaster.h
+++ b/src/include/postmaster/postmaster.h
@@ -62,6 +62,9 @@ extern Size ShmemBackendArraySize(void);
extern void ShmemBackendArrayAllocation(void);
#endif
+struct Port;
+extern int ProcessStartupPacket(struct Port *port, bool SSLdone, MemoryContext memctx);
+
/*
* Note: MAX_BACKENDS is limited to 2^18-1 because that's the width reserved
* for buffer references in buf_internals.h. This limitation could be lifted
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index a4bcb48..10f30d1 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -176,6 +176,8 @@ extern int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout,
extern int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents,
pgsocket sock, long timeout, uint32 wait_event_info);
+extern void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd);
+
/*
* Unix implementation uses SIGUSR1 for inter-process signaling.
* Win32 doesn't need this.
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index 63b4e48..191eeaa 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -34,6 +34,7 @@ extern CommandDest whereToSendOutput;
extern PGDLLIMPORT const char *debug_query_string;
extern int max_stack_depth;
extern int PostAuthDelay;
+extern pgsocket SessionPoolSock;
/* GUC-configurable parameters */
On 18.01.2018 18:00, Claudio Freire wrote:
On Thu, Jan 18, 2018 at 11:48 AM, Konstantin Knizhnik
<k.knizhnik@postgrespro.ru> wrote:

Attached please find new version of the patch with a few fixes.
And more results at a NUMA system with 144 cores and 3Tb of RAM.

Read-only pgbench (-S):

#Connections\kTPS    Vanilla Postgres    Session pool size 256
1k                   1300                1505
10k                  633                 1519
100k                 -                   1425

Read-write contention test: access to a small number of records with 1% of updates.

#Clients\TPS    Vanilla Postgres    Session pool size 256
100             557232              573319
200             520395              551670
300             511423              533773
400             468562              523091
500             442268              514056
600             401860              526704
700             363912              530317
800             325148              512238
900             301310              512844
1000            278829              554516

So, as you can see, there is no degradation of performance with an increased
number of connections when session pooling is used.
TBH, the tests you should be running are comparisons with a similar
pool size managed by pgbouncer, not just vanilla unlimited postgres.

Of course a limited pool size will beat thousands of concurrent
queries by a large margin. The real question is whether a
pthread-based approach beats the pgbouncer approach.
Below are the results with pgbouncer:

#Connections\kTPS:
    (1) Vanilla Postgres
    (2) Built-in session pool, size 256
    (3) Postgres + pgbouncer, transaction pooling mode, pool size 256
    (4) Postgres + 10 pgbouncers with pool size 20

        (1)     (2)     (3)     (4)
1k      1300    1505    105     751
10k     633     1519    94      664
100k    -       1425    -       -

(-) here means that I failed to start that number of connections
(because of "resource temporarily unavailable" and similar errors).
So a single pgbouncer is 10 times slower than a direct connection to
Postgres.
No surprise here: pgbouncer is single-threaded, and its CPU usage is
almost 100%.
So we have to launch several instances of pgbouncer and somehow
distribute the load between them.
On Linux it is possible to use SO_REUSEPORT
(https://lwn.net/Articles/542629/) to perform load balancing
between several pgbouncer instances.
But that requires editing the pgbouncer code: it doesn't support such a
mode. So I started several instances of pgbouncer at different ports and
explicitly distributed several pgbench instances between them.
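For reference, a minimal sketch of what the SO_REUSEPORT mode amounts to
(plain C, Linux-specific, illustration only, not actual pgbouncer code):
each instance would simply have to open its listening socket this way, and
the kernel would then balance incoming connections between the instances.

#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Open a listening socket on the given port with SO_REUSEPORT set, so that
 * several independent processes (e.g. pgbouncer instances) can bind the
 * same port.  Returns the listening fd, or -1 on failure.
 */
static int
listen_reuseport(int port)
{
    struct sockaddr_in addr;
    int         one = 1;
    int         fd = socket(AF_INET, SOCK_STREAM, 0);

    if (fd < 0)
        return -1;

    if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0)
    {
        close(fd);
        return -1;
    }

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(port);

    if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0 ||
        listen(fd, 128) < 0)
    {
        close(fd);
        return -1;
    }
    return fd;
}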
But even in this case performance is half that of a direct connection
and of built-in session pooling.
This is because of the lack of prepared statements, which I cannot use
with pgbouncer in statement/transaction pooling mode.
Also please notice that with session pooling performance is better than
with vanilla Postgres.
This is because with session pooling we can open more connections without
launching more backends.
It is especially noticeable on my local desktop with 4 cores: for normal
Postgres the optimal number of connections is about 10, but with session
pooling 100 connections show an about 30% better result.
So, summarizing all of the above:
1. pgbouncer doesn't allow using prepared statements, which can cause up
to a twofold performance penalty.
2. pgbouncer is single-threaded and cannot efficiently handle more than
about 1k connections.
3. pgbouncer can never provide better performance than an application
connected directly to Postgres with the optimal number of connections. In
contrast, built-in session pooling can provide better performance than
vanilla Postgres at its optimal number of connections.
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
On 01/19/2018 10:52 AM, Konstantin Knizhnik wrote:
On 18.01.2018 18:02, Tomas Vondra wrote:
Hi Konstantin,
On 01/18/2018 03:48 PM, Konstantin Knizhnik wrote:
On 17.01.2018 19:09, Konstantin Knizhnik wrote:
Hi hackers,
...
I haven't looked at the code yet, but after reading your message I have
a simple question - how is this going to work with SSL? If you're only
passing a file descriptor, that does not seem to be sufficient for the
backends to do crypto (that requires the SSL stuff from Port).

Maybe I'm missing something and it already works, though ...
regards
Ooops, I missed this aspect with SSL. Thank you.
New version of the patch which correctly maintains session context is
attached.
Now each session has its own allocator which should be used instead
of TopMemoryAllocator. SSL connections work now.
OK. I've looked at the code, but I have a rather hard time understanding
it, because there are pretty much no comments explaining the intent of
the added code :-( I strongly suggest improving that, to help reviewers.
The questions I'm asking myself are mostly these:
1) When assigning a backend, we first try to get one from a pool, which
happens right at the beginning of BackendStartup. If we find a usable
backend, we send the info to the backend (pg_send_sock/pg_recv_sock).
But AFAICS this only happens at connection time, right? But in your
initial message you say "Rescheduling is done at transaction level,"
which in my understanding means "transaction pooling". So, how does that
part work?
2) How does this deal with backends for different databases? I don't see
any checks that the requested database matches the backend database (nor
any code switching the backend from one db to another - which would be
very tricky, I think).
3) Is there any sort of shrinking of the pools? I mean, if a backend is
idle for a certain period of time (or when we need backends for other
databases), does it get closed automatically?
Furthermore, I'm rather confused about the meaning of session_pool_size.
I mean, that GUC determines the number of backends in the pool, it has
nothing to do with sessions per se, right? Which would mean it's a bit
misleading to name it "session_..." (particularly if the pooling happens
at transaction level, not session level - which is question #1).
When I've been thinking about adding a built-in connection pool, my
rough plan was mostly "bgworker doing something like pgbouncer" (that
is, listening on a separate port and proxying everything to regular
backends). Obviously, that has pros and cons, and probably would not
serve the threading use case well.
But it would have some features that I find valuable - for example, it's
trivial to decide which connection requests may or may not be served
from a pool (by connection to the main port or pool port).
That is not to say the bgworker approach is better than what you're
proposing, but I wonder if that would be possible with your approach.
regards
--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On 19.01.2018 18:53, Tomas Vondra wrote:
On 01/19/2018 10:52 AM, Konstantin Knizhnik wrote:
On 18.01.2018 18:02, Tomas Vondra wrote:
Hi Konstantin,
On 01/18/2018 03:48 PM, Konstantin Knizhnik wrote:
On 17.01.2018 19:09, Konstantin Knizhnik wrote:
Hi hackers,
...
I haven't looked at the code yet, but after reading your message I have
a simple question - how is this going to work with SSL? If you're only
passing a file descriptor, that does not seem to be sufficient for the
backends to do crypto (that requires the SSL stuff from Port).

Maybe I'm missing something and it already works, though ...
regards
Ooops, I missed this aspect with SSL. Thank you.
New version of the patch which correctly maintain session context is
attached.
Now each session has its own allocator which should be used instead
of TopMemoryAllocator. SSL connections work now.

OK. I've looked at the code, but I have a rather hard time understanding
it, because there are pretty much no comments explaining the intent of
the added code :-( I strongly suggest improving that, to help reviewers.
Sorry, sorry, sorry...
There are some comments and I will add more.
The questions I'm asking myself are mostly these:
1) When assigning a backend, we first try to get one from a pool, which
happens right at the beginning of BackendStartup. If we find a usable
backend, we send the info to the backend (pg_send_sock/pg_recv_sock).

But AFAICS this only happens at connection time, right? But in your
initial message you say "Rescheduling is done at transaction level,"
which in my understanding means "transaction pooling". So, how does that
part work?
Here it is:

ChooseSession:
    DoingCommandRead = true;
    /* Select which client session is ready to send new query */
    if (WaitEventSetWait(SessionPool, -1, &ready_client, 1,
                         PG_WAIT_CLIENT) != 1)
        ...
    if (ready_client.fd == SessionPoolSock)
    {
        /* Here we handle case of attaching new session */
        ...
    }
    else /* and here we handle case when there is query (new transaction)
            from some client */
    {
        elog(DEBUG2, "Switch to session %d in backend %d",
             ready_client.fd, MyProcPid);
        CurrentSession = (SessionContext *) ready_client.user_data;
        MyProcPort = CurrentSession->port;
    }
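To make that flow a bit more concrete, here is a minimal sketch (not taken
from the patch; PoolSession, AddSessionToPool and the field names are made
up, standing in for the patch's SessionContext handling) of how a client
socket can be registered in such a WaitEventSet with the existing latch
API, so that WaitEventSetWait() later returns the session through
ready_client.user_data:

#include "postgres.h"

#include "miscadmin.h"
#include "storage/latch.h"
#include "utils/memutils.h"

/* Illustrative per-session state, analogous to SessionContext in the patch */
typedef struct PoolSession
{
    pgsocket    sock;       /* client socket received from the postmaster */
    struct Port *port;      /* client Port, switched into MyProcPort */
} PoolSession;

static WaitEventSet *SessionPool;   /* one wait event set per pooled backend */

/*
 * Register a newly attached client session, so that the main query loop
 * can wait for the next command on any of the pooled sockets.
 */
static void
AddSessionToPool(PoolSession *session)
{
    if (SessionPool == NULL)
        SessionPool = CreateWaitEventSet(TopMemoryContext, MaxSessions);

    AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE,
                      session->sock,
                      NULL,     /* no latch, we only wait on the socket */
                      session); /* comes back as ready_client.user_data */
}

The postmaster side hands over the socket itself with pg_send_sock(), as
in the patch.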
2) How does this deal with backends for different databases? I don't see
any checks that the requested database matches the backend database (not
any code switching the backend from one db to another - which would be
very tricky, I think).
As I wrote in the initial mail, this problem is not handled now.
It is expected that all clients connect to the same database using
the same user.
I only check and report an error if this assumption is violated.
Definitely it should be fixed, and it is one of the main challenges of
this approach! I want to receive some advice from the community about
the best ways of solving it.
The problem is that we get information about the database/user in the
ProcessStartupPacket function in the backend, when the session is already
assigned to a particular backend.
We either have to somehow redirect the session to some other backend
(somehow notify the postmaster that we are not able to handle it), or
obtain the database/user name in the postmaster. But that means
ProcessStartupPacket would have to be called in the postmaster, and the
postmaster would have to read from the client's socket.
I am afraid that the postmaster could become a bottleneck in this case.
The problem could be solved much more easily with a pthread version
of Postgres. In that case reassigning a session to another executor
(thread) can be done much more easily,
and there is no need to use the unportable trick of passing a file
descriptor to another process.
In the future I am going to combine the two. The problem is that the
pthread version of Postgres is still in a very raw state.
3) Is there any sort of shrinking the pools? I mean, if the backend is
idle for certain period of time (or when we need backends for other
databases), does it get closed automatically?
When a client is disconnected, the client session is closed, but the
backend is not terminated even if there are no more sessions at this
backend.
This was done intentionally, to avoid permanently spawning new processes
when there are one or a few clients which frequently connect/disconnect
to the database.
Furthermore, I'm rather confused about the meaning of session_pool_size.
I mean, that GUC determines the number of backends in the pool, it has
nothing to do with sessions per se, right? Which would mean it's a bit
misleading to name it "session_..." (particularly if the pooling happens
at transaction level, not session level - which is question #1).
Yeah, it is not the right name. It means the maximal number of backends
which should be used to serve client sessions.
But "max backends" is already used and has a completely different meaning.
When I've been thinking about adding a built-in connection pool, my
rough plan was mostly "bgworker doing something like pgbouncer" (that
is, listening on a separate port and proxying everything to regular
backends). Obviously, that has pros and cons, and probably would not
work serve the threading use case well.
And we will get the same problem as with pgbouncer: one process will not
be able to handle all connections...
Certainly it is possible to start several such scheduling bgworkers...
But in any case it is more efficient to multiplex sessions in the
backends themselves.
But it would have some features that I find valuable - for example, it's
trivial to decide which connection requests may or may not be served
from a pool (by connection to the main port or pool port).

That is not to say the bgworker approach is better than what you're
proposing, but I wonder if that would be possible with your approach.

regards
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
When I've been thinking about adding a built-in connection pool, my
rough plan was mostly "bgworker doing something like pgbouncer" (that
is, listening on a separate port and proxying everything to regular
backends). Obviously, that has pros and cons, and probably would not
work serve the threading use case well.

And we will get the same problem as with pgbouncer: one process will not
be able to handle all connections...
Certainly it is possible to start several such scheduling bgworkers... But
in any case it is more efficient to multiplex session in backend themselves.
pgbouncer holds the client connection the whole time. If we implement
listeners, then all the work can be done by worker processes, not by the
listeners.
Regards
Pavel
On 19.01.2018 19:28, Pavel Stehule wrote:
When I've been thinking about adding a built-in connection
pool, my
rough plan was mostly "bgworker doing something like
pgbouncer" (that
is, listening on a separate port and proxying everything to
regular
backends). Obviously, that has pros and cons, and probably
would not
work serve the threading use case well.

And we will get the same problem as with pgbouncer: one process
will not be able to handle all connections...
Certainly it is possible to start several such scheduling
bgworkers... But in any case it is more efficient to multiplex
session in backend themselves.

pgbouncer hold all time client connect. When we implement the
listeners, then all work can be done by worker processes not by listeners.
Sorry, I do not understand your point.
In my case pgbench establishes the connection to pgbouncer only once, at
the beginning of the test.
And pgbouncer spends all its time in context switches (CPU usage is 100%,
mostly in kernel space: the top of the profile is kernel functions).
The picture will be the same if, instead of pgbouncer, you do such
scheduling in one bgworker.
Modern systems are not able to perform more than several hundred
thousand context switches per second.
So with a single multiplexing thread or process you cannot get more than
about 100 kTPS, while on a powerful NUMA system it is possible to achieve
millions of TPS.
This is illustrated by the results I sent in the previous mail: by
spawning 10 instances of pgbouncer I was able to get 7 times higher
throughput.
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
On 01/19/2018 05:17 PM, Konstantin Knizhnik wrote:
On 19.01.2018 18:53, Tomas Vondra wrote:
...
The questions I'm asking myself are mostly these:
1) When assigning a backend, we first try to get one from a pool, which
happens right at the beginning of BackendStartup. If we find a usable
backend, we send the info to the backend (pg_send_sock/pg_recv_sock).

But AFAICS this only only happens at connection time, right? But it your
initial message you say "Rescheduling is done at transaction level,"
which in my understanding means "transaction pooling". So, how does that
part work?

Here it is:
ChooseSession:
...
OK, thanks.
2) How does this deal with backends for different databases? I
don't see any checks that the requested database matches the
backend database (not any code switching the backend from one db to
another - which would be very tricky, I think).

As I wrote in the initial mail this problem is not handled now.
It is expected that all clients are connected to the same database using
the same user.
I only check and report an error if this assumption is violated.
Definitely it should be fixed. And it is one of the main challenge with
this approach! And I want to receive some advices from community about
the best ways of solving it.
The problem is that we get information about database/user in
ProcessStartupPackage function in the beackend, when session is already
assigned to the particular backend.
We either have to somehow redirect session to some other backend
(somehow notify postmaster that we are not able to handle it)?
either obtain database/user name in postmaster. But it meas that
ProcessStartupPackage should be called in postmaster and Postmaster has
to read from client's socket.
I afraid that postmaster can be a bottleneck in this case.
Hmmm, that's unfortunate. I guess you'll have process the startup packet
in the main process, before it gets forked. At least partially.
The problem can be much easily solved in case of using pthread version
of Postgres. In this case reassigning session to another executor
(thread) can be don much easily.
And there is no need to use unportable trick with passing fiel
descriptor to other process.
And in future I am going to combine them. The problem is that pthread
version of Postgres is still in very raw state.
Yeah. Unfortunately, we're using processes now, and switching to threads
will take time (assuming it happens at all).
3) Is there any sort of shrinking the pools? I mean, if the backend is
idle for certain period of time (or when we need backends for other
databases), does it get closed automatically?

When client is disconnected, client session is closed. But backen is not
terminated even if there are no more sessions at this backend.
It was done intentionally, to avoid permanent spawning of new processes
when there is one or few clients which frequently connect/disconnect to
the database.
Sure, but it means a short peak will exhaust the backends indefinitely.
That's acceptable for a PoC, but I think it needs to be fixed eventually.
Furthermore, I'm rather confused about the meaning of session_pool_size.
I mean, that GUC determines the number of backends in the pool, it has
nothing to do with sessions per se, right? Which would mean it's a bit
misleading to name it "session_..." (particularly if the pooling happens
at transaction level, not session level - which is question #1).

Yehh, yes it is not right name. It means maximal number of backends
which should be used to serve client's sessions.
But "max backends" is already used and has completely different meaning.

When I've been thinking about adding a built-in connection pool, my
rough plan was mostly "bgworker doing something like pgbouncer" (that
is, listening on a separate port and proxying everything to regular
backends). Obviously, that has pros and cons, and probably would not
work serve the threading use case well.

And we will get the same problem as with pgbouncer: one process will not
be able to handle all connections...
Certainly it is possible to start several such scheduling bgworkers...
But in any case it is more efficient to multiplex session in backend
themselves.
Well, I haven't said it has to be single-threaded like pgbouncer. I
don't see why the bgworker could not use multiple threads internally (of
course, it'd need to be careful not to mess with the stuff that is not
thread-safe).
regards
--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
2018-01-19 17:53 GMT+01:00 Konstantin Knizhnik <k.knizhnik@postgrespro.ru>:
On 19.01.2018 19:28, Pavel Stehule wrote:
When I've been thinking about adding a built-in connection pool, my
rough plan was mostly "bgworker doing something like pgbouncer" (that
is, listening on a separate port and proxying everything to regular
backends). Obviously, that has pros and cons, and probably would not
work serve the threading use case well.

And we will get the same problem as with pgbouncer: one process will not
be able to handle all connections...
Certainly it is possible to start several such scheduling bgworkers...
But in any case it is more efficient to multiplex session in backend
themselves.

pgbouncer hold all time client connect. When we implement the listeners,
then all work can be done by worker processes not by listeners.

Sorry, I do not understand your point.
In my case pgbench establish connection to the pgbouncer only once at the
beginning of the test.
And pgbouncer spends all time in context switches (CPU usage is 100% and
it is mostly in kernel space: top of profile are kernel functions).
The same picture will be if instead of pgbouncer you will do such
scheduling in one bgworker.
For the modern systems are not able to perform more than several hundreds
of connection switches per second.
So with single multiplexing thread or process you can not get speed more
than 100k, while at powerful NUMA system it is possible to achieve millions
of TPS.
It is illustrated by the results I have sent in the previous mail: by
spawning 10 instances of pgbouncer I was able to receive 7 times bigger
speed.
pgbouncer is proxy software. I don't think a native pooler should be a
proxy too. So comparing pgbouncer with a hypothetical native pooler is
not fair, because pgbouncer passes through all of the communication.
Regards
Pavel
On 01/19/2018 05:53 PM, Konstantin Knizhnik wrote:
On 19.01.2018 19:28, Pavel Stehule wrote:
When I've been thinking about adding a built-in connection
pool, my
rough plan was mostly "bgworker doing something like
pgbouncer" (that
is, listening on a separate port and proxying everything to
regular
backends). Obviously, that has pros and cons, and probably
would not
work serve the threading use case well.And we will get the same problem as with pgbouncer: one process
will not be able to handle all connections...
Certainly it is possible to start several such scheduling
bgworkers... But in any case it is more efficient to multiplex
session in backend themselves.pgbouncer hold all time client connect. When we implement the
listeners, then all work can be done by worker processes not by listeners.Sorry, I do not understand your point.
In my case pgbench establish connection to the pgbouncer only once at
the beginning of the test.
And pgbouncer spends all time in context switches (CPU usage is 100% and
it is mostly in kernel space: top of profile are kernel functions).
The same picture will be if instead of pgbouncer you will do such
scheduling in one bgworker.
For the modern systems are not able to perform more than several
hundreds of connection switches per second.
So with single multiplexing thread or process you can not get speed more
than 100k, while at powerful NUMA system it is possible to achieve
millions of TPS.
It is illustrated by the results I have sent in the previous mail: by
spawning 10 instances of pgbouncer I was able to receive 7 times bigger
speed.
AFAICS making pgbouncer multi-threaded would not be hugely complicated.
A simple solution would be a fixed number of worker threads, and client
connections randomly assigned to them.
But this generally is not a common bottleneck in practical workloads (of
course, YMMV).
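Just to illustrate that this is indeed not much code, a rough sketch of
such a scheme in plain C with pthreads (all names hypothetical,
round-robin rather than random assignment, error handling omitted); since
the threads share one fd table, only the descriptor number has to be
handed over:

#include <pthread.h>
#include <stdint.h>
#include <sys/socket.h>
#include <unistd.h>

#define N_WORKERS 8

static int  worker_pipe[N_WORKERS][2];  /* acceptor -> worker */

/* Each worker reads client fds from its pipe and serves them (stubbed). */
static void *
worker_main(void *arg)
{
    int         idx = (int) (intptr_t) arg;
    int         client;

    while (read(worker_pipe[idx][0], &client, sizeof(client)) == sizeof(client))
    {
        /* ... pgbouncer-style handling of this client would go here ... */
        close(client);
    }
    return NULL;
}

/* Accept loop: distribute incoming connections over the worker threads. */
static void
accept_loop(int listen_fd)
{
    pthread_t   workers[N_WORKERS];
    int         next = 0;

    for (int i = 0; i < N_WORKERS; i++)
    {
        pipe(worker_pipe[i]);
        pthread_create(&workers[i], NULL, worker_main, (void *) (intptr_t) i);
    }

    for (;;)
    {
        int         client = accept(listen_fd, NULL, NULL);

        if (client < 0)
            continue;
        write(worker_pipe[next][1], &client, sizeof(client));
        next = (next + 1) % N_WORKERS;
    }
}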
regards
--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Fri, Jan 19, 2018 at 1:53 PM, Konstantin Knizhnik <
k.knizhnik@postgrespro.ru> wrote:
On 19.01.2018 19:28, Pavel Stehule wrote:
When I've been thinking about adding a built-in connection pool, my
rough plan was mostly "bgworker doing something like pgbouncer" (that
is, listening on a separate port and proxying everything to regular
backends). Obviously, that has pros and cons, and probably would not
work serve the threading use case well.And we will get the same problem as with pgbouncer: one process will not
be able to handle all connections...
Certainly it is possible to start several such scheduling bgworkers...
But in any case it is more efficient to multiplex session in backend
themselves.pgbouncer hold all time client connect. When we implement the listeners,
then all work can be done by worker processes not by listeners.Sorry, I do not understand your point.
In my case pgbench establish connection to the pgbouncer only once at the
beginning of the test.
And pgbouncer spends all time in context switches (CPU usage is 100% and
it is mostly in kernel space: top of profile are kernel functions).
The same picture will be if instead of pgbouncer you will do such
scheduling in one bgworker.
For the modern systems are not able to perform more than several hundreds
of connection switches per second.
So with single multiplexing thread or process you can not get speed more
than 100k, while at powerful NUMA system it is possible to achieve millions
of TPS.
It is illustrated by the results I have sent in the previous mail: by
spawning 10 instances of pgbouncer I was able to receive 7 times bigger
speed.
I'm sure pgbouncer can be improved. I've seen async code handle millions of
packets per second (zmq), pgbouncer shouldn't be radically different.
On 01/19/2018 06:03 PM, Claudio Freire wrote:
On Fri, Jan 19, 2018 at 1:53 PM, Konstantin Knizhnik
<k.knizhnik@postgrespro.ru <mailto:k.knizhnik@postgrespro.ru>> wrote:On 19.01.2018 19:28, Pavel Stehule wrote:
When I've been thinking about adding a built-in connection
pool, my
rough plan was mostly "bgworker doing something like
pgbouncer" (that
is, listening on a separate port and proxying everything
to regular
backends). Obviously, that has pros and cons, and probably
would not
work serve the threading use case well.And we will get the same problem as with pgbouncer: one
process will not be able to handle all connections...
Certainly it is possible to start several such scheduling
bgworkers... But in any case it is more efficient to multiplex
session in backend themselves.pgbouncer hold all time client connect. When we implement the
listeners, then all work can be done by worker processes not by
listeners.Sorry, I do not understand your point.
In my case pgbench establish connection to the pgbouncer only once
at the beginning of the test.
And pgbouncer spends all time in context switches (CPU usage is 100%
and it is mostly in kernel space: top of profile are kernel functions).
The same picture will be if instead of pgbouncer you will do such
scheduling in one bgworker.
For the modern systems are not able to perform more than several
hundreds of connection switches per second.
So with single multiplexing thread or process you can not get speed
more than 100k, while at powerful NUMA system it is possible to
achieve millions of TPS.
It is illustrated by the results I have sent in the previous mail:
by spawning 10 instances of pgbouncer I was able to receive 7 times
bigger speed.I'm sure pgbouncer can be improved. I've seen async code handle millions
of packets per second (zmq), pgbouncer shouldn't be radically different.
The trouble is pgbouncer is not handling individual packets. It needs to
do additional processing to assemble the messages, understand the state
of the connection (e.g. to do transaction pooling) etc. Or handle SSL.
regards
--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On 19.01.2018 19:59, Tomas Vondra wrote:
The problem can be much easily solved in case of using pthread version
of Postgres. In this case reassigning session to another executor
(thread) can be don much easily.
And there is no need to use unportable trick with passing fiel
descriptor to other process.
And in future I am going to combine them. The problem is that pthread
version of Postgres is still in very raw state.Yeah. Unfortunately, we're using processes now, and switching to threads
will take time (assuming it happens at all).
I have to agree with you.
3) Is there any sort of shrinking the pools? I mean, if the backend is
idle for certain period of time (or when we need backends for other
databases), does it get closed automatically?When client is disconnected, client session is closed. But backen is not
terminated even if there are no more sessions at this backend.
It was done intentionally, to avoid permanent spawning of new processes
when there is one or few clients which frequently connect/disconnect to
the database.Sure, but it means a short peak will exhaust the backends indefinitely.
That's acceptable for a PoC, but I think needs to be fixed eventually.
Sorry, I do not understand it.
You specify the size of the backend pool which will serve client sessions.
The size of this pool is chosen to provide the best performance on the
particular system and workload.
So the number of backends will never exceed this optimal value even in
case of a "short peak".
From my point of view, terminating backends when there are no active
sessions is the wrong idea in any case; it was not a temporary decision
just for the PoC.
Well, I haven't said it has to be single-threaded like pgbouncer. I
don't see why the bgworker could not use multiple threads internally (of
course, it'd need to be not to mess the stuff that is not thread-safe).
Certainly an architecture with N scheduling bgworkers and M
executors (backends) may be more flexible
than a solution where scheduling is done in the executor itself. But we
will have to pay an extra cost for the redirection.
I am not sure that it will ultimately allow us to reach better performance.
A more flexible solution in many cases doesn't mean a more efficient solution.
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
On Fri, Jan 19, 2018 at 2:06 PM, Tomas Vondra <tomas.vondra@2ndquadrant.com>
wrote:
On 01/19/2018 06:03 PM, Claudio Freire wrote:
On Fri, Jan 19, 2018 at 1:53 PM, Konstantin Knizhnik
<k.knizhnik@postgrespro.ru <mailto:k.knizhnik@postgrespro.ru>> wrote:On 19.01.2018 19:28, Pavel Stehule wrote:
When I've been thinking about adding a built-in connection
pool, my
rough plan was mostly "bgworker doing something like
pgbouncer" (that
is, listening on a separate port and proxying everything
to regular
backends). Obviously, that has pros and cons, and probably
would not
work serve the threading use case well.And we will get the same problem as with pgbouncer: one
process will not be able to handle all connections...
Certainly it is possible to start several such scheduling
bgworkers... But in any case it is more efficient to multiplex
session in backend themselves.pgbouncer hold all time client connect. When we implement the
listeners, then all work can be done by worker processes not by
listeners.Sorry, I do not understand your point.
In my case pgbench establish connection to the pgbouncer only once
at the beginning of the test.
And pgbouncer spends all time in context switches (CPU usage is 100%
and it is mostly in kernel space: top of profile are kernel functions).
The same picture will be if instead of pgbouncer you will do such
scheduling in one bgworker.
For the modern systems are not able to perform more than several
hundreds of connection switches per second.
So with single multiplexing thread or process you can not get speed
more than 100k, while at powerful NUMA system it is possible to
achieve millions of TPS.
It is illustrated by the results I have sent in the previous mail:
by spawning 10 instances of pgbouncer I was able to receive 7 times
bigger speed.I'm sure pgbouncer can be improved. I've seen async code handle millions
of packets per second (zmq), pgbouncer shouldn't be radically different.The trouble is pgbouncer is not handling individual packets. It needs to
do additional processing to assemble the messages, understand the state
of the connection (e.g. to do transaction pooling) etc. Or handle SSL.
I understand. But zmq also has to process framing very similar to the fe
protocol, so I'm still hopeful.
On 19.01.2018 20:01, Pavel Stehule wrote:
2018-01-19 17:53 GMT+01:00 Konstantin Knizhnik
<k.knizhnik@postgrespro.ru <mailto:k.knizhnik@postgrespro.ru>>:On 19.01.2018 19:28, Pavel Stehule wrote:
When I've been thinking about adding a built-in
connection pool, my
rough plan was mostly "bgworker doing something like
pgbouncer" (that
is, listening on a separate port and proxying everything
to regular
backends). Obviously, that has pros and cons, and
probably would not
work serve the threading use case well.And we will get the same problem as with pgbouncer: one
process will not be able to handle all connections...
Certainly it is possible to start several such scheduling
bgworkers... But in any case it is more efficient to
multiplex session in backend themselves.pgbouncer hold all time client connect. When we implement the
listeners, then all work can be done by worker processes not by
listeners.Sorry, I do not understand your point.
In my case pgbench establish connection to the pgbouncer only
once at the beginning of the test.
And pgbouncer spends all time in context switches (CPU usage is
100% and it is mostly in kernel space: top of profile are kernel
functions).
The same picture will be if instead of pgbouncer you will do such
scheduling in one bgworker.
For the modern systems are not able to perform more than several
hundreds of connection switches per second.
So with single multiplexing thread or process you can not get
speed more than 100k, while at powerful NUMA system it is possible
to achieve millions of TPS.
It is illustrated by the results I have sent in the previous mail:
by spawning 10 instances of pgbouncer I was able to receive 7
times bigger speed.pgbouncer is proxy sw. I don't think so native pooler should be proxy
too. So the compare pgbouncer with hypothetical native pooler is not
fair, because pgbouncer pass all communication
If we have separate scheduling bgworker(s), as Tomas proposed, then
in any case we will have to do some kind of redirection.
It can be done in a more efficient way than using Unix sockets (as in
the case of a locally installed pgbouncer), but even if we use a shared
memory queue,
performance will be comparable and limited by the number of context
switches. It is possible to increase it by combining several requests
into one parcel,
but that complicates the communication protocol between clients,
scheduling proxies and executors even more.
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
On Fri, Jan 19, 2018 at 2:07 PM, Konstantin Knizhnik <
k.knizhnik@postgrespro.ru> wrote:
Well, I haven't said it has to be single-threaded like pgbouncer. I
don't see why the bgworker could not use multiple threads internally (of
course, it'd need to be not to mess the stuff that is not thread-safe).Certainly architecture with N multiple scheduling bgworkers and M
executors (backends) may be more flexible
than solution when scheduling is done in executor itself. But we will have
to pay extra cost for redirection.
I am not sure that finally it will allow to reach better performance.
More flexible solution in many cases doesn't mean more efficient solution.
I think you can take the best of both worlds.
You can take your approach of passing around fds, and build a "load
balancing protocol" in a bgworker.
The postmaster sends the socket to the bgworker, the bgworker waits for a
command as pgbouncer does, but instead of proxying everything, when
commands arrive, it passes the socket to a backend to handle.
That way, the bgworker can do what pgbouncer does, handle different pooling
modes, match backends to databases, etc, but it doesn't have to proxy all
data, it just delegates handling of a command to a backend, and forgets
about that socket.
Sounds like it could work.
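If I understand the idea correctly, the dispatcher loop of such a bgworker
could stay quite small by reusing the pg_send_sock() and
DeleteWaitEventFromSet() primitives the patch already adds; a rough sketch
(function and parameter names are made up):

#include "postgres.h"

#include "pgstat.h"
#include "storage/latch.h"

/*
 * Hypothetical dispatcher loop for a pooling bgworker: it owns the idle
 * client sockets, waits until one of them becomes readable (a new command
 * is arriving) and then forwards the descriptor to a pooled backend
 * instead of proxying the data itself.
 */
static void
dispatcher_loop(WaitEventSet *idle_clients, pgsocket *backend_chan, int n_backends)
{
    int         next = 0;

    for (;;)
    {
        WaitEvent   ev;

        if (WaitEventSetWait(idle_clients, -1, &ev, 1, PG_WAIT_CLIENT) != 1)
            continue;

        /*
         * Hand the client socket to the next backend (round-robin) and stop
         * watching it; the backend reads and executes the incoming command.
         * How the socket eventually comes back to the dispatcher is exactly
         * the open question discussed below.
         */
        if (pg_send_sock(backend_chan[next], ev.fd) < 0)
            elog(LOG, "could not pass client socket to backend");
        else
            DeleteWaitEventFromSet(idle_clients, ev.fd);

        next = (next + 1) % n_backends;
    }
}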
On 19.01.2018 20:03, Claudio Freire wrote:
On Fri, Jan 19, 2018 at 1:53 PM, Konstantin Knizhnik
<k.knizhnik@postgrespro.ru <mailto:k.knizhnik@postgrespro.ru>> wrote:On 19.01.2018 19:28, Pavel Stehule wrote:
When I've been thinking about adding a built-in
connection pool, my
rough plan was mostly "bgworker doing something like
pgbouncer" (that
is, listening on a separate port and proxying everything
to regular
backends). Obviously, that has pros and cons, and
probably would not
work serve the threading use case well.And we will get the same problem as with pgbouncer: one
process will not be able to handle all connections...
Certainly it is possible to start several such scheduling
bgworkers... But in any case it is more efficient to
multiplex session in backend themselves.pgbouncer hold all time client connect. When we implement the
listeners, then all work can be done by worker processes not by
listeners.Sorry, I do not understand your point.
In my case pgbench establish connection to the pgbouncer only
once at the beginning of the test.
And pgbouncer spends all time in context switches (CPU usage is
100% and it is mostly in kernel space: top of profile are kernel
functions).
The same picture will be if instead of pgbouncer you will do such
scheduling in one bgworker.
For the modern systems are not able to perform more than several
hundreds of connection switches per second.
So with single multiplexing thread or process you can not get
speed more than 100k, while at powerful NUMA system it is possible
to achieve millions of TPS.
It is illustrated by the results I have sent in the previous mail:
by spawning 10 instances of pgbouncer I was able to receive 7
times bigger speed.I'm sure pgbouncer can be improved. I've seen async code handle
millions of packets per second (zmq), pgbouncer shouldn't be radically
different.
With pgbouncer you will never be able to use prepared statements, and
their absence slows down simple queries almost twice (unless my patch
with autoprepared statements is committed).
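To make the prepared statement point concrete, here is a plain libpq
client (nothing from the patch; the table and parameter are made up) that
parses a query once and then executes it many times. It only works if
"get_user" stays known to the backend that executes it, which is exactly
what a session bound to one pooled backend preserves and what
transaction-level pooling through a proxy breaks:

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
    PGconn     *conn = PQconnectdb("dbname=postgres");
    PGresult   *res;
    const char *param[1] = {"42"};

    if (PQstatus(conn) != CONNECTION_OK)
    {
        fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
        return 1;
    }

    /* Parse and plan the statement once ... */
    res = PQprepare(conn, "get_user",
                    "SELECT name FROM users WHERE id = $1", 1, NULL);
    PQclear(res);

    /* ... then execute it many times without re-planning. */
    for (int i = 0; i < 1000; i++)
    {
        res = PQexecPrepared(conn, "get_user", 1, param, NULL, NULL, 0);
        PQclear(res);
    }

    PQfinish(conn);
    return 0;
}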
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
On 01/19/2018 06:07 PM, Konstantin Knizhnik wrote:
...
3) Is there any sort of shrinking the pools? I mean, if the backend is
idle for certain period of time (or when we need backends for other
databases), does it get closed automatically?When client is disconnected, client session is closed. But backen is not
terminated even if there are no more sessions at this backend.
It was done intentionally, to avoid permanent spawning of new processes
when there is one or few clients which frequently connect/disconnect to
the database.Sure, but it means a short peak will exhaust the backends indefinitely.
That's acceptable for a PoC, but I think needs to be fixed eventually.Sorry, I do not understand it.
You specify size of backends pool which will server client session.
Size of this pool is chosen to provide the best performance at the
particular system and workload.
So number of backends will never exceed this optimal value even in case
of "short peak".
From my point of view terminating backends when there are no active
sessions is wrong idea in any case, it was not temporary decision just
for PoC.
That is probably true when there is just a single pool (for one
database/user). But when there are multiple such pools, it forces you to
keep the sum(pool_size) below max_connections. Which seems strange.
I do think the ability to evict backends after some timeout, or when
there is pressure in other pools (different user/database) is rather useful.
Well, I haven't said it has to be single-threaded like pgbouncer. I
don't see why the bgworker could not use multiple threads internally (of
course, it'd need to be not to mess the stuff that is not thread-safe).Certainly architecture with N multiple scheduling bgworkers and M
executors (backends) may be more flexible
than solution when scheduling is done in executor itself. But we will
have to pay extra cost for redirection.I am not sure that finally it will allow to reach better performance.
More flexible solution in many cases doesn't mean more efficient solution.
Sure, I wasn't really suggesting it's a clear win. I was responding to
your argument that pgbouncer in some cases reaches 100% CPU utilization
- that can be mitigated to a large extent by adding threads. Of course,
the cost for extra level of indirection is not zero.
regards
--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On 01/19/2018 06:13 PM, Claudio Freire wrote:
On Fri, Jan 19, 2018 at 2:07 PM, Konstantin Knizhnik
<k.knizhnik@postgrespro.ru <mailto:k.knizhnik@postgrespro.ru>> wrote:Well, I haven't said it has to be single-threaded like pgbouncer. I
don't see why the bgworker could not use multiple threads
internally (of
course, it'd need to be not to mess the stuff that is not
thread-safe).Certainly architecture with N multiple scheduling bgworkers and M
executors (backends) may be more flexible
than solution when scheduling is done in executor itself. But we
will have to pay extra cost for redirection.
I am not sure that finally it will allow to reach better performance.
More flexible solution in many cases doesn't mean more efficient
solution.I think you can take the best of both worlds.
You can take your approach of passing around fds, and build a "load
balancing protocol" in a bgworker.The postmaster sends the socket to the bgworker, the bgworker waits for
a command as pgbouncer does, but instead of proxying everything, when
commands arrive, it passes the socket to a backend to handle.That way, the bgworker can do what pgbouncer does, handle different
pooling modes, match backends to databases, etc, but it doesn't have to
proxy all data, it just delegates handling of a command to a backend,
and forgets about that socket.Sounds like it could work.
How could it do all that without actually processing all the data? For
example, how could it determine the statement/transaction boundaries?
--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On 01/19/2018 06:19 PM, Konstantin Knizhnik wrote:
On 19.01.2018 20:03, Claudio Freire wrote:
On Fri, Jan 19, 2018 at 1:53 PM, Konstantin Knizhnik
<k.knizhnik@postgrespro.ru <mailto:k.knizhnik@postgrespro.ru>> wrote:On 19.01.2018 19:28, Pavel Stehule wrote:
When I've been thinking about adding a built-in
connection pool, my
rough plan was mostly "bgworker doing something like
pgbouncer" (that
is, listening on a separate port and proxying everything
to regular
backends). Obviously, that has pros and cons, and
probably would not
work serve the threading use case well.And we will get the same problem as with pgbouncer: one
process will not be able to handle all connections...
Certainly it is possible to start several such scheduling
bgworkers... But in any case it is more efficient to
multiplex session in backend themselves.pgbouncer hold all time client connect. When we implement the
listeners, then all work can be done by worker processes not by
listeners.Sorry, I do not understand your point.
In my case pgbench establish connection to the pgbouncer only
once at the beginning of the test.
And pgbouncer spends all time in context switches (CPU usage is
100% and it is mostly in kernel space: top of profile are kernel
functions).
The same picture will be if instead of pgbouncer you will do such
scheduling in one bgworker.
For the modern systems are not able to perform more than several
hundreds of connection switches per second.
So with single multiplexing thread or process you can not get
speed more than 100k, while at powerful NUMA system it is possible
to achieve millions of TPS.
It is illustrated by the results I have sent in the previous mail:
by spawning 10 instances of pgbouncer I was able to receive 7
times bigger speed.I'm sure pgbouncer can be improved. I've seen async code handle
millions of packets per second (zmq), pgbouncer shouldn't be radically
different.With pgbouncer you will never be able to use prepared statements which
slows down simple queries almost twice (unless my patch with
autoprepared statements is committed).
I don't see why that wouldn't be possible? Perhaps not for prepared
statements with simple protocol, but I'm pretty sure it's doable for
extended protocol (which seems like a reasonable limitation).
That being said, I think it's a mistake to turn this thread into a
pgbouncer vs. the world battle. I could name things that are possible
only with standalone connection pool - e.g. pausing connections and
restarting the database without interrupting the clients.
But that does not mean built-in connection pool is not useful.
regards
--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Fri, Jan 19, 2018 at 2:22 PM, Tomas Vondra <tomas.vondra@2ndquadrant.com>
wrote:
On 01/19/2018 06:13 PM, Claudio Freire wrote:
On Fri, Jan 19, 2018 at 2:07 PM, Konstantin Knizhnik
<k.knizhnik@postgrespro.ru <mailto:k.knizhnik@postgrespro.ru>> wrote:Well, I haven't said it has to be single-threaded like
pgbouncer. I
don't see why the bgworker could not use multiple threads
internally (of
course, it'd need to be not to mess the stuff that is not
thread-safe).Certainly architecture with N multiple scheduling bgworkers and M
executors (backends) may be more flexible
than solution when scheduling is done in executor itself. But we
will have to pay extra cost for redirection.
I am not sure that finally it will allow to reach better performance.
More flexible solution in many cases doesn't mean more efficient
solution.I think you can take the best of both worlds.
You can take your approach of passing around fds, and build a "load
balancing protocol" in a bgworker.The postmaster sends the socket to the bgworker, the bgworker waits for
a command as pgbouncer does, but instead of proxying everything, when
commands arrive, it passes the socket to a backend to handle.That way, the bgworker can do what pgbouncer does, handle different
pooling modes, match backends to databases, etc, but it doesn't have to
proxy all data, it just delegates handling of a command to a backend,
and forgets about that socket.Sounds like it could work.
How could it do all that without actually processing all the data? For
example, how could it determine the statement/transaction boundaries?
It only needs to determine statement/transaction start.
After that, it hands off the connection to a backend, and the backend
determines when to give it back.
So instead of processing all the data, it only processes a tiny part of it.
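A sketch of what "processing only a tiny part" could mean in practice:
peek at the type byte of the next frontend message ('Q' simple query, 'P'
parse, 'X' terminate, ...) without consuming anything from the stream, and
only then hand the socket over. Plain C, the helper name is made up, and
as noted elsewhere in the thread this only works on a non-SSL stream:

#include <errno.h>
#include <sys/socket.h>
#include <sys/types.h>

/*
 * Peek at the next frontend-protocol message without removing it from the
 * socket buffer.  Returns the one-byte message type, 0 if nothing is
 * buffered yet, or -1 on error/EOF.
 */
static int
peek_message_type(int client_sock)
{
    char        type;
    ssize_t     n = recv(client_sock, &type, 1, MSG_PEEK | MSG_DONTWAIT);

    if (n == 1)
        return (unsigned char) type;
    if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
        return 0;
    return -1;
}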
On 01/19/2018 07:35 PM, Claudio Freire wrote:
On Fri, Jan 19, 2018 at 2:22 PM, Tomas Vondra
<tomas.vondra@2ndquadrant.com> wrote:

On 01/19/2018 06:13 PM, Claudio Freire wrote:
On Fri, Jan 19, 2018 at 2:07 PM, Konstantin Knizhnik
<k.knizhnik@postgrespro.ru> wrote:

Well, I haven't said it has to be single-threaded like
pgbouncer. I
don't see why the bgworker could not use multiple threads
internally (of
course, it'd need to be not to mess the stuff that is not
thread-safe).Certainly architecture with N multiple scheduling bgworkers and M
executors (backends) may be more flexible
than solution when scheduling is done in executor itself. But we
will have to pay extra cost for redirection.
I am not sure that finally it will allow to reach better performance.
More flexible solution in many cases doesn't mean more efficient
solution.I think you can take the best of both worlds.
You can take your approach of passing around fds, and build a "load
balancing protocol" in a bgworker.The postmaster sends the socket to the bgworker, the bgworker
waits for
a command as pgbouncer does, but instead of proxying everything, when
commands arrive, it passes the socket to a backend to handle.That way, the bgworker can do what pgbouncer does, handle different
pooling modes, match backends to databases, etc, but it doesn't have to
proxy all data, it just delegates handling of a command to a backend,
and forgets about that socket.Sounds like it could work.
How could it do all that without actually processing all the data? For
example, how could it determine the statement/transaction boundaries?It only needs to determine statement/transaction start.
After that, it hands off the connection to a backend, and the
backend determines when to give it back.So instead of processing all the data, it only processes a tiny part of it.
How exactly would the backend "give back" the connection? The only way
for the backend and pgbouncer to communicate is by embedding information
in the data stream. Which means pgbouncer still has to parse it.
Furthermore, those are not the only bits of information pgbouncer may
need. For example, if pgbouncer gets improved to handle prepared
statements (which is likely) it'd need to handle PARSE/BIND/EXECUTE. And
it already needs to handle SET parameters. And so on.
In any case, this discussion is somewhat off topic in this thread, so
let's not hijack it.
regards
--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On 19.01.2018 20:28, Tomas Vondra wrote:
With pgbouncer you will never be able to use prepared statements which
slows down simple queries almost twice (unless my patch with
autoprepared statements is committed).I don't see why that wouldn't be possible? Perhaps not for prepared
statements with simple protocol, but I'm pretty sure it's doable for
extended protocol (which seems like a reasonable limitation).That being said, I think it's a mistake to turn this thread into a
pgbouncer vs. the world battle. I could name things that are possible
only with standalone connection pool - e.g. pausing connections and
restarting the database without interrupting the clients.But that does not mean built-in connection pool is not useful.
regards
Sorry, I do not understand how the extended protocol can help to handle
prepared statements without a shared prepared statement cache or built-in
connection pooling.
The problem is that now in Postgres most of the caches, including the
catalog cache, relation cache and prepared statement cache, are private
to a backend.
There is certainly one big advantage of such an approach: no need to
synchronize access to the cache. But it seems to be the only advantage.
And there are a lot of drawbacks:
inefficient use of memory, a complex invalidation mechanism, not
compatible with connection pooling...
So there are three possible ways (maybe more, but I know only three):
1. Implement built-in connection pooling which will be aware of the
proper use of the local caches. This is what I have implemented with the
proposed approach.
2. Implicit autoprepare. Clients will not be able to use the standard
Postgres prepare mechanism, but the executor will try to generate a
generic plan for ordinary queries. My implementation of this approach is
at the commit fest.
3. Global caches. It seems to be the best solution, but the most
difficult to implement.
Actually I think that the discussion about the value of built-in
connection pooling is very important.
Yes, external connection pooling is more flexible: it allows pooling to
be performed either on the client side or on the server side (or the two
approaches can even be combined).
Also, external connection pooling for PostgreSQL is not limited to
pgbouncer/pgpool.
There are many frameworks maintaining their own connection pools, for
example J2EE, JBoss, Hibernate, ...
I have a feeling that about 70% of enterprise systems working with
databases are written in Java and do connection pooling in their own way.
So maybe embedded connection pooling is not needed for such applications...
But what I have heard from many people is that Postgres' poor connection
pooling is one of the main drawbacks of Postgres, complicating its usage
in enterprise environments.
In any case, please find an updated patch with some code cleanup and more
comments added.
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
Attachments:
session_pool-4.patch (text/x-patch)
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index b945b15..8e8a737 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -813,3 +813,32 @@ build_regtype_array(Oid *param_types, int num_params)
result = construct_array(tmp_ary, num_params, REGTYPEOID, 4, true, 'i');
return PointerGetDatum(result);
}
+
+/*
+ * Drop all statements prepared in the specified session.
+ */
+void
+DropSessionPreparedStatements(char const* sessionId)
+{
+ HASH_SEQ_STATUS seq;
+ PreparedStatement *entry;
+ size_t idLen = strlen(sessionId);
+
+ /* nothing cached */
+ if (!prepared_queries)
+ return;
+
+ /* walk over cache */
+ hash_seq_init(&seq, prepared_queries);
+ while ((entry = hash_seq_search(&seq)) != NULL)
+ {
+ if (strncmp(entry->stmt_name, sessionId, idLen) == 0 && entry->stmt_name[idLen] == '.')
+ {
+ /* Release the plancache entry */
+ DropCachedPlan(entry->plansource);
+
+ /* Now we can remove the hash table entry */
+ hash_search(prepared_queries, entry->stmt_name, HASH_REMOVE, NULL);
+ }
+ }
+}
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index a4f6d4d..7f40edb 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -1029,6 +1029,17 @@ pq_peekbyte(void)
}
/* --------------------------------
+ * pq_available_bytes - get number of buffered bytes available for reading.
+ *
+ * --------------------------------
+ */
+int
+pq_available_bytes(void)
+{
+ return PqRecvLength - PqRecvPointer;
+}
+
+/* --------------------------------
* pq_getbyte_if_available - get a single byte from connection,
* if available
*
diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile
index aba1e92..56ec998 100644
--- a/src/backend/port/Makefile
+++ b/src/backend/port/Makefile
@@ -21,7 +21,7 @@ subdir = src/backend/port
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o $(TAS)
+OBJS = atomics.o dynloader.o pg_sema.o pg_shmem.o send_sock.o $(TAS)
ifeq ($(PORTNAME), win32)
SUBDIRS += win32
diff --git a/src/backend/port/send_sock.c b/src/backend/port/send_sock.c
new file mode 100644
index 0000000..7b36923
--- /dev/null
+++ b/src/backend/port/send_sock.c
@@ -0,0 +1,89 @@
+/*-------------------------------------------------------------------------
+ *
+ * send_sock.c
+ * Send socket descriptor to another process
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/port/send_sock.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <time.h>
+#include <unistd.h>
+
+/*
+ * Send socket descriptor "sock" to backend process through Unix socket "chan"
+ */
+int pg_send_sock(pgsocket chan, pgsocket sock)
+{
+ struct msghdr msg = { 0 };
+ struct iovec io;
+ struct cmsghdr * cmsg;
+ char buf[CMSG_SPACE(sizeof(sock))];
+ memset(buf, '\0', sizeof(buf));
+
+ /* On Mac OS X, the struct iovec is needed, even if it points to minimal data */
+ io.iov_base = "";
+ io.iov_len = 1;
+
+ msg.msg_iov = &io;
+ msg.msg_iovlen = 1;
+ msg.msg_control = buf;
+ msg.msg_controllen = sizeof(buf);
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(sock));
+
+ memcpy(CMSG_DATA(cmsg), &sock, sizeof(sock));
+ msg.msg_controllen = cmsg->cmsg_len;
+
+ if (sendmsg(chan, &msg, 0) < 0)
+ {
+ return -1;
+ }
+ return 0;
+}
+
+
+/*
+ * Receive socket descriptor from postmaster process through Unix socket "chan"
+ */
+pgsocket pg_recv_sock(pgsocket chan)
+{
+ struct msghdr msg = {0};
+ char c_buffer[256];
+ char m_buffer[256];
+ struct iovec io;
+ struct cmsghdr * cmsg;
+ pgsocket sock;
+
+ io.iov_base = m_buffer;
+ io.iov_len = sizeof(m_buffer);
+ msg.msg_iov = &io;
+ msg.msg_iovlen = 1;
+
+ msg.msg_control = c_buffer;
+ msg.msg_controllen = sizeof(c_buffer);
+
+ if (recvmsg(chan, &msg, 0) < 0)
+ {
+ return -1;
+ }
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ memcpy(&sock, CMSG_DATA(cmsg), sizeof(sock));
+
+ return sock;
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index f3ddf82..2554075 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -169,6 +169,7 @@ typedef struct bkend
pid_t pid; /* process id of backend */
int32 cancel_key; /* cancel key for cancels for this backend */
int child_slot; /* PMChildSlot for this backend, if any */
+ pgsocket session_send_sock; /* Write end of socket pipe to this backend used to send session socket descriptor to the backend process */
/*
* Flavor of backend or auxiliary process. Note that BACKEND_TYPE_WALSND
@@ -182,6 +183,15 @@ typedef struct bkend
} Backend;
static dlist_head BackendList = DLIST_STATIC_INIT(BackendList);
+/*
+ * Pointer in the backend list used to implement round-robin distribution of sessions across backends.
+ * This variable is either NULL or points to a normal backend.
+ */
+static Backend* BackendListClockPtr;
+/*
+ * Number of active normal backends
+ */
+static int nNormalBackends;
#ifdef EXEC_BACKEND
static Backend *ShmemBackendArray;
@@ -412,7 +422,6 @@ static void BackendRun(Port *port) pg_attribute_noreturn();
static void ExitPostmaster(int status) pg_attribute_noreturn();
static int ServerLoop(void);
static int BackendStartup(Port *port);
-static int ProcessStartupPacket(Port *port, bool SSLdone);
static void SendNegotiateProtocolVersion(List *unrecognized_protocol_options);
static void processCancelRequest(Port *port, void *pkt);
static int initMasks(fd_set *rmask);
@@ -568,6 +577,22 @@ HANDLE PostmasterHandle;
#endif
/*
+ * Move current backend pointer to the next normal backend.
+ * This function is called either when a new session is started (to implement the round-robin policy) or when the backend pointed to by BackendListClockPtr is terminated.
+ */
+static void AdvanceBackendListClockPtr(void)
+{
+ Backend* b = BackendListClockPtr;
+ do {
+ dlist_node* node = &b->elem;
+ node = node->next ? node->next : BackendList.head.next;
+ b = dlist_container(Backend, elem, node);
+ } while (b->bkend_type != BACKEND_TYPE_NORMAL && b != BackendListClockPtr);
+
+ BackendListClockPtr = b;
+}
+
+/*
* Postmaster main entry point
*/
void
@@ -1944,8 +1969,8 @@ initMasks(fd_set *rmask)
* send anything to the client, which would typically be appropriate
* if we detect a communications failure.)
*/
-static int
-ProcessStartupPacket(Port *port, bool SSLdone)
+int
+ProcessStartupPacket(Port *port, bool SSLdone, MemoryContext memctx)
{
int32 len;
void *buf;
@@ -2043,7 +2068,7 @@ retry1:
#endif
/* regular startup packet, cancel, etc packet should follow... */
/* but not another SSL negotiation request */
- return ProcessStartupPacket(port, true);
+ return ProcessStartupPacket(port, true, memctx);
}
/* Could add additional special packet types here */
@@ -2073,7 +2098,7 @@ retry1:
* not worry about leaking this storage on failure, since we aren't in the
* postmaster process anymore.
*/
- oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ oldcontext = MemoryContextSwitchTo(memctx);
if (PG_PROTOCOL_MAJOR(proto) >= 3)
{
@@ -2449,7 +2474,7 @@ ConnCreate(int serverFd)
ConnFree(port);
return NULL;
}
-
+ SessionPoolSock = PGINVALID_SOCKET;
/*
* Allocate GSSAPI specific state struct
*/
@@ -3236,6 +3261,24 @@ CleanupBackgroundWorker(int pid,
}
/*
+ * Unlink backend from backend's list and free memory
+ */
+static void UnlinkBackend(Backend* bp)
+{
+ if (bp->bkend_type == BACKEND_TYPE_NORMAL)
+ {
+ if (bp == BackendListClockPtr)
+ AdvanceBackendListClockPtr();
+ if (bp->session_send_sock != PGINVALID_SOCKET)
+ close(bp->session_send_sock);
+ elog(DEBUG2, "Cleanup backend %d", bp->pid);
+ nNormalBackends -= 1;
+ }
+ dlist_delete(&bp->elem);
+ free(bp);
+}
+
+/*
* CleanupBackend -- cleanup after terminated backend.
*
* Remove all local state associated with backend.
@@ -3312,8 +3355,7 @@ CleanupBackend(int pid,
*/
BackgroundWorkerStopNotifications(bp->pid);
}
- dlist_delete(iter.cur);
- free(bp);
+ UnlinkBackend(bp);
break;
}
}
@@ -3415,8 +3457,7 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
ShmemBackendArrayRemove(bp);
#endif
}
- dlist_delete(iter.cur);
- free(bp);
+ UnlinkBackend(bp);
/* Keep looping so we can signal remaining backends */
}
else
@@ -4017,6 +4058,20 @@ BackendStartup(Port *port)
{
Backend *bn; /* for backend cleanup */
pid_t pid;
+ int session_pipe[2];
+
+ if (SessionPoolSize != 0 && nNormalBackends >= SessionPoolSize)
+ {
+ /* With session pooling, instead of spawning a new backend, open the new session at one of the existing backends. */
+ Assert(BackendListClockPtr && BackendListClockPtr->session_send_sock != PGINVALID_SOCKET);
+ elog(DEBUG2, "Start new session for socket %d at backend %d total %d", port->sock, BackendListClockPtr->pid, nNormalBackends);
+ /* Send connection socket to the backend pointed by BackendListClockPtr */
+ if (pg_send_sock(BackendListClockPtr->session_send_sock, port->sock) < 0)
+ elog(FATAL, "Failed to send session socket: %m");
+ AdvanceBackendListClockPtr(); /* round-robin backends */
+ return STATUS_OK;
+ }
+
/*
* Create backend data structure. Better before the fork() so we can
@@ -4030,7 +4085,6 @@ BackendStartup(Port *port)
errmsg("out of memory")));
return STATUS_ERROR;
}
-
/*
* Compute the cancel key that will be assigned to this backend. The
* backend will have its own copy in the forked-off process' value of
@@ -4063,12 +4117,24 @@ BackendStartup(Port *port)
/* Hasn't asked to be notified about any bgworkers yet */
bn->bgworker_notify = false;
+ /* Create socket pair for sending session sockets to the backend */
+ if (SessionPoolSize != 0)
+ if (socketpair(AF_UNIX, SOCK_DGRAM, 0, session_pipe) < 0)
+ ereport(FATAL,
+ (errcode_for_file_access(),
+ errmsg_internal("could not create socket pair for launching sessions: %m")));
+
#ifdef EXEC_BACKEND
pid = backend_forkexec(port);
#else /* !EXEC_BACKEND */
pid = fork_process();
if (pid == 0) /* child */
{
+ if (SessionPoolSize != 0)
+ {
+ SessionPoolSock = session_pipe[0]; /* Use this socket for receiving client session socket descriptor */
+ close(session_pipe[1]); /* Close unused end of the pipe */
+ }
free(bn);
/* Detangle from postmaster */
@@ -4110,9 +4176,19 @@ BackendStartup(Port *port)
* of backends.
*/
bn->pid = pid;
+ if (SessionPoolSize != 0)
+ {
+ bn->session_send_sock = session_pipe[1]; /* Use this socket for sending client session socket descriptor */
+ close(session_pipe[0]); /* Close unused end of the pipe */
+ }
+ else
+ bn->session_send_sock = PGINVALID_SOCKET;
bn->bkend_type = BACKEND_TYPE_NORMAL; /* Can change later to WALSND */
dlist_push_head(&BackendList, &bn->elem);
-
+ if (BackendListClockPtr == NULL)
+ BackendListClockPtr = bn;
+ nNormalBackends += 1;
+ elog(DEBUG2, "Start backend %d total %d", pid, nNormalBackends);
#ifdef EXEC_BACKEND
if (!bn->dead_end)
ShmemBackendArrayAdd(bn);
@@ -4299,7 +4375,7 @@ BackendInitialize(Port *port)
* Receive the startup packet (which might turn out to be a cancel request
* packet).
*/
- status = ProcessStartupPacket(port, false);
+ status = ProcessStartupPacket(port, false, TopMemoryContext);
/*
* Stop here if it was bad or a cancel packet. ProcessStartupPacket
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index e6706f7..9c42fab 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -76,6 +76,7 @@ struct WaitEventSet
{
int nevents; /* number of registered events */
int nevents_space; /* maximum number of events in this set */
+ int free_events; /* head of single-linked list of free events, linked through "pos" and terminated by -1 */
/*
* Array, of nevents_space length, storing the definition of events this
@@ -129,7 +130,7 @@ static void drainSelfPipe(void);
#if defined(WAIT_USE_EPOLL)
static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
#elif defined(WAIT_USE_POLL)
-static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
+static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove);
#elif defined(WAIT_USE_WIN32)
static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event);
#endif
@@ -562,6 +563,7 @@ CreateWaitEventSet(MemoryContext context, int nevents)
set->latch = NULL;
set->nevents_space = nevents;
+ set->free_events = -1;
#if defined(WAIT_USE_EPOLL)
#ifdef EPOLL_CLOEXEC
@@ -667,6 +669,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
void *user_data)
{
WaitEvent *event;
+ int free_event;
/* not enough space */
Assert(set->nevents < set->nevents_space);
@@ -690,8 +693,19 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
elog(ERROR, "cannot wait on socket event without a socket");
- event = &set->events[set->nevents];
- event->pos = set->nevents++;
+ free_event = set->free_events;
+ if (free_event >= 0)
+ {
+ event = &set->events[free_event];
+ set->free_events = event->pos;
+ event->pos = free_event;
+ }
+ else
+ {
+ event = &set->events[set->nevents];
+ event->pos = set->nevents;
+ }
+ set->nevents += 1;
event->fd = fd;
event->events = events;
event->user_data = user_data;
@@ -718,7 +732,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
#if defined(WAIT_USE_EPOLL)
WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
#elif defined(WAIT_USE_POLL)
- WaitEventAdjustPoll(set, event);
+ WaitEventAdjustPoll(set, event, false);
#elif defined(WAIT_USE_WIN32)
WaitEventAdjustWin32(set, event);
#endif
@@ -727,6 +741,27 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
}
/*
+ * Remove event with specified socket descriptor
+ */
+void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd)
+{
+ int i, n = set->nevents;
+ for (i = 0; i < n; i++)
+ {
+ WaitEvent *event = &set->events[i];
+ if (event->fd == fd)
+ {
+#if defined(WAIT_USE_EPOLL)
+ WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL);
+#elif defined(WAIT_USE_POLL)
+ WaitEventAdjustPoll(set, event, true);
+#endif
+ break;
+ }
+ }
+}
+
+/*
* Change the event mask and, in the WL_LATCH_SET case, the latch associated
* with the WaitEvent.
*
@@ -774,7 +809,7 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
#if defined(WAIT_USE_EPOLL)
WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
#elif defined(WAIT_USE_POLL)
- WaitEventAdjustPoll(set, event);
+ WaitEventAdjustPoll(set, event, false);
#elif defined(WAIT_USE_WIN32)
WaitEventAdjustWin32(set, event);
#endif
@@ -827,14 +862,33 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
ereport(ERROR,
(errcode_for_socket_access(),
errmsg("epoll_ctl() failed: %m")));
+
+ if (action == EPOLL_CTL_DEL)
+ {
+ int pos = event->pos;
+ event->fd = PGINVALID_SOCKET;
+ set->nevents -= 1;
+ event->pos = set->free_events;
+ set->free_events = pos;
+ }
}
#endif
#if defined(WAIT_USE_POLL)
static void
-WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
+WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove)
{
- struct pollfd *pollfd = &set->pollfds[event->pos];
+ int pos = event->pos;
+ struct pollfd *pollfd = &set->pollfds[pos];
+
+ if (remove)
+ {
+ set->nevents -= 1;
+ *pollfd = set->pollfds[set->nevents];
+ set->events[pos] = set->events[set->nevents];
+ event->pos = pos;
+ return;
+ }
pollfd->revents = 0;
pollfd->fd = event->fd;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index ddc3ec8..7dc8049 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -75,8 +75,18 @@
#include "utils/snapmgr.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
+#include "utils/builtins.h"
#include "mb/pg_wchar.h"
+/*
+ * Information associated with client session
+ */
+typedef struct SessionContext
+{
+ MemoryContext memory; /* memory context used for global session data (replacement of TopMemoryContext) */
+ Port* port; /* connection port */
+ char* id; /* session identifier used to construct unique prepared statement names */
+} SessionContext;
/* ----------------
* global variables
@@ -98,6 +108,8 @@ int max_stack_depth = 100;
/* wait N seconds to allow attach from a debugger */
int PostAuthDelay = 0;
+/* Local socket for redirecting sessions to the backends */
+pgsocket SessionPoolSock = PGINVALID_SOCKET;
/* ----------------
@@ -169,6 +181,13 @@ static ProcSignalReason RecoveryConflictReason;
static MemoryContext row_description_context = NULL;
static StringInfoData row_description_buf;
+static WaitEventSet* SessionPool; /* Set of all sessions sockets */
+static int64 SessionCount; /* Number of sessions */
+static SessionContext* CurrentSession; /* Pointer to the active session */
+static Port* BackendPort; /* Reference to the original port of this backend created when this backend was launched.
+ * Session using this port may be already terminated, but since it is allocated in TopMemoryContext,
+ * its content is still valid and is used as template for ports of new sessions */
+
/* ----------------------------------------------------------------
* decls for routines only used in this file
* ----------------------------------------------------------------
@@ -194,6 +213,25 @@ static void log_disconnections(int code, Datum arg);
static void enable_statement_timeout(void);
static void disable_statement_timeout(void);
+/*
+ * Generate session ID unique within this backend
+ */
+static char* CreateSessionId(void)
+{
+ char buf[64];
+ pg_lltoa(++SessionCount, buf);
+ return pstrdup(buf);
+}
+
+/*
+ * Free all memory associated with session and delete session object itself
+ */
+static void DeleteSession(SessionContext* session)
+{
+ elog(DEBUG1, "Delete session %p, id=%s, memory context=%p", session, session->id, session->memory);
+ MemoryContextDelete(session->memory);
+ free(session);
+}
/* ----------------------------------------------------------------
* routines to obtain user input
@@ -1232,6 +1270,12 @@ exec_parse_message(const char *query_string, /* string to execute */
bool save_log_statement_stats = log_statement_stats;
char msec_str[32];
+ if (CurrentSession && stmt_name[0] != '\0')
+ {
+ /* Make names of prepared statements unique for session in case of using internal session pool */
+ stmt_name = psprintf("%s.%s", CurrentSession->id, stmt_name);
+ }
+
/*
* Report query to various monitoring facilities.
*/
@@ -1503,6 +1547,12 @@ exec_bind_message(StringInfo input_message)
portal_name = pq_getmsgstring(input_message);
stmt_name = pq_getmsgstring(input_message);
+ if (CurrentSession && stmt_name[0] != '\0')
+ {
+ /* Make names of prepared statements unique for session in case of using internal session pool */
+ stmt_name = psprintf("%s.%s", CurrentSession->id, stmt_name);
+ }
+
ereport(DEBUG2,
(errmsg("bind %s to %s",
*portal_name ? portal_name : "<unnamed>",
@@ -2325,6 +2375,12 @@ exec_describe_statement_message(const char *stmt_name)
CachedPlanSource *psrc;
int i;
+ if (CurrentSession && stmt_name[0] != '\0')
+ {
+ /* Make names of prepared statements unique for session in case of using internal session pool */
+ stmt_name = psprintf("%s.%s", CurrentSession->id, stmt_name);
+ }
+
/*
* Start up a transaction command. (Note that this will normally change
* current memory context.) Nothing happens if we are already in one.
@@ -3603,7 +3659,6 @@ process_postgres_switches(int argc, char *argv[], GucContext ctx,
#endif
}
-
/* ----------------------------------------------------------------
* PostgresMain
* postgres main loop -- all backends, interactive or otherwise start here
@@ -3654,6 +3709,21 @@ PostgresMain(int argc, char *argv[],
progname)));
}
+ /* Assign session for this backend in case of session pooling */
+ if (SessionPoolSize != 0)
+ {
+ MemoryContext oldcontext;
+ CurrentSession = (SessionContext*)malloc(sizeof(SessionContext));
+ CurrentSession->memory = AllocSetContextCreate(TopMemoryContext,
+ "SessionMemoryContext",
+ ALLOCSET_DEFAULT_SIZES);
+ oldcontext = MemoryContextSwitchTo(CurrentSession->memory);
+ CurrentSession->id = CreateSessionId();
+ CurrentSession->port = MyProcPort;
+ BackendPort = MyProcPort;
+ MemoryContextSwitchTo(oldcontext);
+ }
+
/* Acquire configuration parameters, unless inherited from postmaster */
if (!IsUnderPostmaster)
{
@@ -3783,7 +3853,7 @@ PostgresMain(int argc, char *argv[],
* ... else we'd need to copy the Port data first. Also, subsidiary data
* such as the username isn't lost either; see ProcessStartupPacket().
*/
- if (PostmasterContext)
+ if (PostmasterContext && SessionPoolSize == 0)
{
MemoryContextDelete(PostmasterContext);
PostmasterContext = NULL;
@@ -4069,6 +4139,142 @@ PostgresMain(int argc, char *argv[],
ReadyForQuery(whereToSendOutput);
send_ready_for_query = false;
+
+ /*
+ * Here we perform multiplexing of client sessions if session pooling is enabled.
+ * Since we perform transaction-level pooling, rescheduling is done only when we are not inside a transaction.
+ */
+ if (SessionPoolSock != PGINVALID_SOCKET && !IsTransactionState() && pq_available_bytes() == 0)
+ {
+ WaitEvent ready_client;
+ if (SessionPool == NULL)
+ {
+ /* Construct wait event set if not constructed yet */
+ SessionPool = CreateWaitEventSet(TopMemoryContext, MaxSessions);
+ /* Add event to detect postmaster death */
+ AddWaitEventToSet(SessionPool, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, CurrentSession);
+ /* Add event for backends latch */
+ AddWaitEventToSet(SessionPool, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, CurrentSession);
+ /* Add event for accepting new sessions */
+ AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, SessionPoolSock, NULL, CurrentSession);
+ /* Add event for current session */
+ AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, MyProcPort->sock, NULL, CurrentSession);
+ }
+ ChooseSession:
+ DoingCommandRead = true;
+ /* Select which client session is ready to send new query */
+ if (WaitEventSetWait(SessionPool, -1, &ready_client, 1, PG_WAIT_CLIENT) != 1)
+ {
+ /* TODO: do some error recovery here */
+ elog(FATAL, "Failed to poll client sessions");
+ }
+ CHECK_FOR_INTERRUPTS();
+ DoingCommandRead = false;
+
+ if (ready_client.events & WL_POSTMASTER_DEATH)
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating connection due to unexpected postmaster exit")));
+
+ if (ready_client.events & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ ProcessClientReadInterrupt(true);
+ goto ChooseSession;
+ }
+
+ if (ready_client.fd == SessionPoolSock)
+ {
+ /* Here we handle case of attaching new session */
+ int status;
+ SessionContext* session;
+ StringInfoData buf;
+ Port* port;
+ pgsocket sock;
+ MemoryContext oldcontext;
+
+ sock = pg_recv_sock(SessionPoolSock);
+ if (sock < 0)
+ elog(FATAL, "Failed to receive session socket: %m");
+
+ session = (SessionContext*)malloc(sizeof(SessionContext));
+ session->memory = AllocSetContextCreate(TopMemoryContext,
+ "SessionMemoryContext",
+ ALLOCSET_DEFAULT_SIZES);
+ oldcontext = MemoryContextSwitchTo(session->memory);
+ port = palloc(sizeof(Port));
+ memcpy(port, BackendPort, sizeof(Port));
+
+ /*
+ * Receive the startup packet (which might turn out to be a cancel request
+ * packet).
+ */
+ port->sock = sock;
+ session->port = port;
+ session->id = CreateSessionId();
+
+ MyProcPort = port;
+ status = ProcessStartupPacket(port, false, session->memory);
+ MemoryContextSwitchTo(oldcontext);
+
+ /*
+ * TODO: Currently we assume that all sessions are accessing the same database under the same user.
+ * Just report an error if it is not true
+ */
+ if (strcmp(port->database_name, MyProcPort->database_name) ||
+ strcmp(port->user_name, MyProcPort->user_name))
+ {
+ elog(FATAL, "Failed to open session (dbname=%s user=%s) in backend %d (dbname=%s user=%s)",
+ port->database_name, port->user_name,
+ MyProcPid, MyProcPort->database_name, MyProcPort->user_name);
+ }
+ else if (status == STATUS_OK)
+ {
+ elog(DEBUG2, "Start new session %d in backend %d for database %s user %s",
+ sock, MyProcPid, port->database_name, port->user_name);
+ CurrentSession = session;
+ AddWaitEventToSet(SessionPool, WL_SOCKET_READABLE, sock, NULL, session);
+
+ SetCurrentStatementStartTimestamp();
+ StartTransactionCommand();
+ PerformAuthentication(MyProcPort);
+ CommitTransactionCommand();
+
+ /*
+ * Send GUC options to the client
+ */
+ BeginReportingGUCOptions();
+
+ /*
+ * Send this backend's cancellation info to the frontend.
+ */
+ pq_beginmessage(&buf, 'K');
+ pq_sendint32(&buf, (int32) MyProcPid);
+ pq_sendint32(&buf, (int32) MyCancelKey);
+ pq_endmessage(&buf);
+
+ /* Need not flush since ReadyForQuery will do it. */
+ send_ready_for_query = true;
+ continue;
+ }
+ else
+ {
+ /* Error while processing the startup packet.
+ * Reject this session and return back to listening for sockets.
+ */
+ DeleteSession(session);
+ elog(LOG, "Session startup failed");
+ close(sock);
+ goto ChooseSession;
+ }
+ }
+ else
+ {
+ elog(DEBUG2, "Switch to session %d in backend %d", ready_client.fd, MyProcPid);
+ CurrentSession = (SessionContext*)ready_client.user_data;
+ MyProcPort = CurrentSession->port;
+ }
+ }
}
/*
@@ -4350,6 +4556,39 @@ PostgresMain(int argc, char *argv[],
* it will fail to be called during other backend-shutdown
* scenarios.
*/
+
+ if (SessionPool)
+ {
+ /* In case of session pooling, close the session but do not terminate the backend,
+ * even if there are no more sessions in this backend.
+ * The reason for keeping the backend alive is to prevent redundant process launches if
+ * some client repeatedly opens/closes connections to the database.
+ * The maximum number of launched backends in case of connection pooling is intended to be
+ * optimal for this system and workload, so there is no reason to try to reduce this number
+ * when there are no active sessions.
+ */
+ DeleteWaitEventFromSet(SessionPool, MyProcPort->sock);
+ elog(DEBUG1, "Close session %d in backend %d", MyProcPort->sock, MyProcPid);
+
+ pq_getmsgend(&input_message);
+ if (pq_is_reading_msg())
+ pq_endmsgread();
+
+ close(MyProcPort->sock);
+ MyProcPort->sock = PGINVALID_SOCKET;
+ MyProcPort = NULL;
+
+ if (CurrentSession)
+ {
+ DropSessionPreparedStatements(CurrentSession->id);
+ DeleteSession(CurrentSession);
+ CurrentSession = NULL;
+ }
+ whereToSendOutput = DestRemote;
+ /* Need to perform rescheduling to some other session or accept new session */
+ goto ChooseSession;
+ }
+ elog(DEBUG1, "Terminate backend %d", MyProcPid);
proc_exit(0);
case 'd': /* copy data */
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 54fa4a3..b2f43a8 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -120,7 +120,9 @@ int maintenance_work_mem = 16384;
* register background workers.
*/
int NBuffers = 1000;
+int SessionPoolSize = 0;
int MaxConnections = 90;
+int MaxSessions = 1000;
int max_worker_processes = 8;
int max_parallel_workers = 8;
int MaxBackends = 0;
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index f9b3309..571c80f 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -65,7 +65,7 @@
static HeapTuple GetDatabaseTuple(const char *dbname);
static HeapTuple GetDatabaseTupleByOid(Oid dboid);
-static void PerformAuthentication(Port *port);
+void PerformAuthentication(Port *port);
static void CheckMyDatabase(const char *name, bool am_superuser);
static void InitCommunication(void);
static void ShutdownPostgres(int code, Datum arg);
@@ -180,7 +180,7 @@ GetDatabaseTupleByOid(Oid dboid)
*
* returns: nothing. Will not return at all if there's any failure.
*/
-static void
+void
PerformAuthentication(Port *port)
{
/* This should be set already, but let's make sure */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 72f6be3..9202728 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1871,6 +1871,29 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"max_sessions", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+ gettext_noop("Sets the maximum number of client session."),
+ gettext_noop("Maximal number of client sessions which can be handled by one backend if session pooling is switched on. "
+ "So maximal number of client connections is session_pool_size*max_sessions")
+ },
+ &MaxSessions,
+ 1000, 1, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"session_pool_size", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+ gettext_noop("Sets number of backends serving client sessions."),
+ gettext_noop("If non-zero then session pooling will be used: "
+ "client connections will be redirected to one of the backends and maximal number of backends is determined by this parameter."
+ "Launched backend are never terminated even in case of no active sessions.")
+ },
+ &SessionPoolSize,
+ 0, 0, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
{"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the number of connection slots reserved for superusers."),
NULL
diff --git a/src/include/commands/prepare.h b/src/include/commands/prepare.h
index ffec029..cb5f8d4 100644
--- a/src/include/commands/prepare.h
+++ b/src/include/commands/prepare.h
@@ -56,5 +56,6 @@ extern TupleDesc FetchPreparedStatementResultDesc(PreparedStatement *stmt);
extern List *FetchPreparedStatementTargetList(PreparedStatement *stmt);
extern void DropAllPreparedStatements(void);
+extern void DropSessionPreparedStatements(char const* sessionId);
#endif /* PREPARE_H */
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 2e7725d..9169b21 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -71,6 +71,7 @@ extern int pq_getbyte(void);
extern int pq_peekbyte(void);
extern int pq_getbyte_if_available(unsigned char *c);
extern int pq_putbytes(const char *s, size_t len);
+extern int pq_available_bytes(void);
/*
* prototypes for functions in be-secure.c
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 54ee273..a9f9228 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -157,6 +157,8 @@ extern PGDLLIMPORT char *DataDir;
extern PGDLLIMPORT int NBuffers;
extern PGDLLIMPORT int MaxBackends;
extern PGDLLIMPORT int MaxConnections;
+extern PGDLLIMPORT int MaxSessions;
+extern PGDLLIMPORT int SessionPoolSize;
extern PGDLLIMPORT int max_worker_processes;
extern int max_parallel_workers;
@@ -420,6 +422,7 @@ extern void InitializeMaxBackends(void);
extern void InitPostgres(const char *in_dbname, Oid dboid, const char *username,
Oid useroid, char *out_dbname);
extern void BaseInit(void);
+extern void PerformAuthentication(struct Port *port);
/* in utils/init/miscinit.c */
extern bool IgnoreSystemIndexes;
diff --git a/src/include/port.h b/src/include/port.h
index 3e528fa..c14a20d 100644
--- a/src/include/port.h
+++ b/src/include/port.h
@@ -41,6 +41,10 @@ typedef SOCKET pgsocket;
extern bool pg_set_noblock(pgsocket sock);
extern bool pg_set_block(pgsocket sock);
+/* send/receive socket descriptor */
+extern int pg_send_sock(pgsocket chan, pgsocket sock);
+extern pgsocket pg_recv_sock(pgsocket chan);
+
/* Portable path handling for Unix/Win32 (in path.c) */
extern bool has_drive_prefix(const char *filename);
diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h
index 1877eef..c9527c9 100644
--- a/src/include/postmaster/postmaster.h
+++ b/src/include/postmaster/postmaster.h
@@ -62,6 +62,9 @@ extern Size ShmemBackendArraySize(void);
extern void ShmemBackendArrayAllocation(void);
#endif
+struct Port;
+extern int ProcessStartupPacket(struct Port *port, bool SSLdone, MemoryContext memctx);
+
/*
* Note: MAX_BACKENDS is limited to 2^18-1 because that's the width reserved
* for buffer references in buf_internals.h. This limitation could be lifted
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index a4bcb48..10f30d1 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -176,6 +176,8 @@ extern int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout,
extern int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents,
pgsocket sock, long timeout, uint32 wait_event_info);
+extern void DeleteWaitEventFromSet(WaitEventSet *set, pgsocket fd);
+
/*
* Unix implementation uses SIGUSR1 for inter-process signaling.
* Win32 doesn't need this.
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index 63b4e48..191eeaa 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -34,6 +34,7 @@ extern CommandDest whereToSendOutput;
extern PGDLLIMPORT const char *debug_query_string;
extern int max_stack_depth;
extern int PostAuthDelay;
+extern pgsocket SessionPoolSock;
/* GUC-configurable parameters */
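As a minimal sketch of how the new GUCs introduced by this patch might be enabled
(assuming the patch is applied; both settings are PGC_POSTMASTER, so a server
restart is required afterwards):
-- sketch only: session_pool_size and max_sessions exist only with this patch applied
ALTER SYSTEM SET session_pool_size = 10;  -- 10 pooled backends serve all client sessions
ALTER SYSTEM SET max_sessions = 1000;     -- each backend may multiplex up to 1000 sessions
-- restart the server for the new values to take effect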
On 01/22/2018 05:05 PM, Konstantin Knizhnik wrote:
On 19.01.2018 20:28, Tomas Vondra wrote:
With pgbouncer you will never be able to use prepared statements which
slows down simple queries almost twice (unless my patch with
autoprepared statements is committed).
I don't see why that wouldn't be possible? Perhaps not for prepared
statements with simple protocol, but I'm pretty sure it's doable for
extended protocol (which seems like a reasonable limitation).
That being said, I think it's a mistake to turn this thread into a
pgbouncer vs. the world battle. I could name things that are possible
only with standalone connection pool - e.g. pausing connections and
restarting the database without interrupting the clients.
But that does not mean built-in connection pool is not useful.
regards
Sorry, I do not understand how extended protocol can help to handle
prepared statements without shared prepared statement cache or
built-in connection pooling.
The extended protocol makes it easy for pgbouncer (or any other proxy)
to identify prepared statements, so that it can track (a) which prepared
statements a client defined, and (b) what prepared statements are
defined on a connection. And then do something when a client gets
assigned a connection missing some of those.
I do not claim doing this would be trivial, but I don't see why would
that be impossible.
Of course, the built-in pool can handle this in different ways, as it
has access to the internal caches.
The problem is that now in Postgres most of the caches, including the catalog
cache, relation cache, and prepared statement cache, are private to a backend.
True. I wouldn't say it's a "problem" but it's certainly a challenge for
certain features.
There is certainly one big advantage of such an approach: no need to
synchronize access to the cache. But it seems to be the only advantage.
And there are a lot of drawbacks:
inefficient use of memory, a complex invalidation mechanism, incompatibility
with connection pooling...
Perhaps. I personally see the minimal synchronization as a quite
valuable feature.
So there are three possible ways (maybe more, but I know only three):
1. Implement built-in connection pooling which is aware of the proper
use of local caches. This is what I have implemented with the proposed
approach.
2. Implicit autoprepare. Clients will not be able to use the standard
Postgres prepare mechanism, but the executor will try to generate a generic
plan for ordinary queries. My implementation of this approach is in the
commitfest.
3. Global caches. It seems to be the best solution but the most
difficult to implement.
Perhaps.
Actually I think that the discussion about the value of built-in
connection pooling is very important.
I agree, and I wasn't speaking against built-in connection pooling.
Yes, external connection pooling is more flexible. It allows to
perform pooling either at client side either at server side (or even
combine two approaches).
Also external connection pooling for PostgreSQL is not limited by
pgbouncer/pgpool.
There are many frameworks maintaining their own connection pool, for
example J2EE, jboss, hibernate,...
I have a filling than about 70% of enterprise systems working with
databases are written in Java and doing connection pooling in their
own way.
True, but that does not really mean we don't need "our" connection
pooling (built-in or not). The connection pools are usually built into
the application servers, so each application server has their own
independent pool. With larger deployments (a couple of application
servers) that quickly causes problems with max_connections.
So may be embedded connection pooling is not needed for such
applications...But what I have heard from main people is that Postgres' poor
connection pooling is one of the main drawbacks of Postgres
complicating it's usage in enterprise environments.
Maybe. I'm sure there's room for improvement.
That being said, when enterprise developers tell me PostgreSQL is
missing some feature, 99% of the time it turns out they're doing
something quite stupid.
In any case please find updated patch with some code cleanup and
more comments added.
OK, will look.
regards
--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Mon, Jan 22, 2018 at 06:51:08PM +0100, Tomas Vondra wrote:
Yes, external connection pooling is more flexible. It allows to
perform pooling either at client side either at server side (or even
combine two approaches).
Also external connection pooling for PostgreSQL is not limited by
pgbouncer/pgpool.
There are many frameworks maintaining their own connection pool, for
example J2EE, jboss, hibernate,...
I have a filling than about 70% of enterprise systems working with
databases are written in Java and doing connection pooling in their
own way.
True, but that does not really mean we don't need "our" connection
pooling (built-in or not). The connection pools are usually built into
the application servers, so each application server has their own
independent pool. With larger deployments (a couple of application
servers) that quickly causes problems with max_connections.
I found this thread and the pthread thread very interesting. Konstantin,
thank you for writing prototypes and giving us very useful benchmarks
for ideas I thought I might never see.
As much as I would like to move forward with coding, I would like to
back up and understand where we need to go with these ideas.
First, it looks like pthreads and a builtin pooler help mostly with
1000+ connections. It seems like you found that pthreads wasn't
sufficient and the builtin pooler was better. Is that correct?
Is there anything we can do differently about allowing long-idle
connections to reduce their resource usage, e.g. free their caches?
Remove from PGPROC? Could we do it conditionally, e.g. only sessions
that don't have open transactions or cursors?
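(For comparison, today the closest thing is client-initiated cleanup: a pooler can
issue DISCARD ALL when a connection is returned to the pool, which releases
session-level objects but not the backend's private catalog/relation caches.
A rough illustration, assuming the client decides when to do it:)
-- drops prepared statements, temp tables, cursors, session GUCs, advisory locks
DISCARD ALL;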
It feels like user and db mismatches are always going to cause pooling
problems. Could we actually exit and restart connections that have
default session state?
Right now, if you hit max_connections, we start rejecting new
connections. Would it make sense to allow an option to exit idle
connections when this happens so new users can connect?
I know we have relied on external connection poolers to solve all the
high connection problems but it seems there might be simple things we
can do to improve matters. FYI, I did write a blog entry comparing
external and internal connection poolers:
https://momjian.us/main/blogs/pgblog/2017.html#April_21_2017
--
Bruce Momjian <bruce@momjian.us> http://momjian.us
EnterpriseDB http://enterprisedb.com
+ As you are, so once was I. As I am, so you will be. +
+ Ancient Roman grave inscription +
On Sat, Jan 27, 2018 at 4:40 PM, Bruce Momjian <bruce@momjian.us> wrote:
On Mon, Jan 22, 2018 at 06:51:08PM +0100, Tomas Vondra wrote:
Right now, if you hit max_connections, we start rejecting new
connections. Would it make sense to allow an option to exit idle
connections when this happens so new users can connect?
A lot of users have bash scripts to check the system periodically and cancel
idle connections to prevent other users from getting rejected by
max_connections. They do this on a timer, for example if the session appears
to have been idle for more than 10 minutes.
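(As a rough sketch, such scripts typically boil down to something like the
following, here with a 10-minute idle threshold:)
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE state = 'idle'
  AND state_change < now() - interval '10 minutes'
  AND pid <> pg_backend_pid();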
I know we have relied on external connection poolers to solve all the
high connection problems but it seems there might be simple things we
can do to improve matters. FYI, I did write a blog entry comparing
external and internal connection poolers:
Yes, that would be great.
The simplest thing sounds like a GUC that will automatically end a
connection idle for X seconds.
Another option could be, as you suggested, Bruce: if a user would have
been rejected because max_connections was already reached, then terminate the
connection that has been idle the longest and allow the new connection to
come in.
These would greatly improve the user experience, as most folks have to automate
all of this themselves anyway.
Cheers,
Ivan
On Sun, Jan 28, 2018 at 02:01:07PM -0800, Ivan Novick wrote:
On Sat, Jan 27, 2018 at 4:40 PM, Bruce Momjian <bruce@momjian.us> wrote:
On Mon, Jan 22, 2018 at 06:51:08PM +0100, Tomas Vondra wrote:
Right now, if you hit max_connections, we start rejecting new
connections. Would it make sense to allow an option to exit idle
connections when this happens so new users can connect?
A lot of users have bash scripts to check the system periodically and canel
idle connections to prevent other users from getting rejected by max
connections. They do this on a timer, like if the session appears to be idle
more than 10 minutes.
I know we have relied on external connection poolers to solve all the
high connection problems but it seems there might be simple things we
can do to improve matters. FYI, I did write a blog entry comparing
external and internal connection poolers:
Yes, that would be great.
The simplest thing sounds like a GUC that will automitcally end a connection
idle for X seconds.
Uh, we already have idle_in_transaction_session_timeout so we would just
need a simpler version.
Another option could be as you suggested, Bruce, if a user would have failed
because of max connections already reached, then terminate the connection that
has been idle the longest and allow a new connection to come in.
These would greatly improve user experience as most folks have to automate this
all themselves anyway.
Plus the ability to auto-free resources like cached system tables if the
backend is idle for a specified duration.
--
Bruce Momjian <bruce@momjian.us> http://momjian.us
EnterpriseDB http://enterprisedb.com
+ As you are, so once was I. As I am, so you will be. +
+ Ancient Roman grave inscription +
The simplest thing sounds like a GUC that will automitcally end a
connection
idle for X seconds.
Uh, we already have idle_in_transaction_session_timeout so we would just
need a simpler version.
Oh I see, it's in 9.6, AWESOME!
Cheers
On Sun, Jan 28, 2018 at 03:11:25PM -0800, Ivan Novick wrote:
The simplest thing sounds like a GUC that will automitcally end a connection
idle for X seconds.
Uh, we already have idle_in_transaction_session_timeout so we would just
need a simpler version.
Oh i see its in 9.6, AWESOME!
In summary, the good news is that adding an idle-session-timeout GUC, a GUC to
cancel idle connections when the max_connections limit is hit, and a GUC for
idle connections to reduce their resource usage shouldn't be too hard to
implement and would provide useful benefits.
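(For reference, the existing transaction-level timeout is just a GUC; an
idle-session variant would presumably be configured the same way. Sketch:)
ALTER SYSTEM SET idle_in_transaction_session_timeout = '10min';
SELECT pg_reload_conf();
-- a hypothetical idle-session timeout GUC, as discussed above, would look analogous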
--
Bruce Momjian <bruce@momjian.us> http://momjian.us
EnterpriseDB http://enterprisedb.com
+ As you are, so once was I. As I am, so you will be. +
+ Ancient Roman grave inscription +
On 28.01.2018 03:40, Bruce Momjian wrote:
On Mon, Jan 22, 2018 at 06:51:08PM +0100, Tomas Vondra wrote:
Yes, external connection pooling is more flexible. It allows to
perform pooling either at client side either at server side (or even
combine two approaches).
Also external connection pooling for PostgreSQL is not limited by
pgbouncer/pgpool.
There are many frameworks maintaining their own connection pool, for
example J2EE, jboss, hibernate,...
I have a filling than about 70% of enterprise systems working with
databases are written in Java and doing connection pooling in their
own way.
True, but that does not really mean we don't need "our" connection
pooling (built-in or not). The connection pools are usually built into
the application servers, so each application server has their own
independent pool. With larger deployments (a couple of application
servers) that quickly causes problems with max_connections.
I found this thread and the pthread thread very interesting. Konstantin,
thank you for writing prototypes and giving us very useful benchmarks
for ideas I thought I might never see.
As much as I would like to move forward with coding, I would like to
back up and understand where we need to go with these ideas.
First, it looks like pthreads and a builtin pooler help mostly with
1000+ connections. It seems like you found that pthreads wasn't
sufficient and the builtin pooler was better. Is that correct?
The brief answer is yes.
Pthreads allow minimizing per-connection overhead and make it possible
to obtain better results for a large number of connections.
But there is a principal problem: a Postgres connection is a "heavy weight"
object: each connection maintains its own private caches of the catalog,
relations, temporary table pages, prepared statements, ...
So even though pthreads allow minimizing per-connection memory usage, that
saving is negligible compared with all these connection-private memory
resources. It means that we still need to use connection pooling.
Pthreads provide two main advantages:
1. Simpler interaction between different workers: no need to use shared
memory with its fixed size limitation and the impossibility of using normal
pointers for dynamic shared memory. Also no need to implement a specialized
memory allocator for shared memory.
It makes the implementation of parallel query execution and built-in
connection pooling much easier.
2. More efficient virtual-to-physical address translation. There is no need to
maintain a separate address space for each backend, so the TLB (translation
lookaside buffer) becomes more efficient.
So it is not completely correct to consider session pooling as an
alternative to pthreads.
Ideally these two approaches should be combined.
Is there anything we can do differently about allowing long-idle
connections to reduce their resource usage, e.g. free their caches?
Remove from PGPROC? Could we do it conditionally, e.g. only sessions
that don't have open transactions or cursors?
I think that the best approach is to switch to global (shared) caches
for execution plans, the catalog, ...
Most of the time these metadata caches are identical for all
clients, so it is just a waste of memory and time to maintain them
separately in each backend.
Certainly a shared cache requires some synchronization, which can be a
point of contention and cause significant performance degradation.
But taking into account that metadata is updated much more rarely than data, I
hope that using copy-on-write and atomic operations can help to solve this
problem.
And it can give a lot of other advantages. For example, it will be
possible to spend more time in the optimizer to detect the optimal execution
plan and to store plans manually for future use.
It feels like user and db mismatches are always going to cause pooling
problems. Could we actually exit and restart connections that have
default session state?
Well, combining multiuser access and connection pooling is really a
challenging problem.
I do not know the best solution for it now. It would be much simpler to
find a solution with the pthreads model...
Most enterprise systems are using pgbouncer or a similar connection
pooler. With pgbouncer in statement/transaction pooling mode, access to the
database is performed under the same user. So it means that many existing
systems are built on the assumption that the database is accessed in this
manner.
Concerning "default session state": one of the main drawbacks of
pgbouncer and other external poolers is that they do not allow the use of
prepared statements.
And it leads to up to a two times performance penalty on typical OLTP
queries. One of the main ideas of built-in session pooling was to
eliminate this limitation.
Right now, if you hit max_connections, we start rejecting new
connections. Would it make sense to allow an option to exit idle
connections when this happens so new users can connect?
It will require changes in client applications, won't it? They should be
ready for the connection to be dropped by the server at any moment.
I do not know whether it is possible to drop an idle connection and hide this
fact from the client. In my implementation each session keeps the minimal
information required for interaction with the client (session context). It
includes the socket, struct Port and a session memory context which should be
used instead of TopMemoryContext for session-specific data.
I know we have relied on external connection poolers to solve all the
high connection problems but it seems there might be simple things we
can do to improve matters. FYI, I did write a blog entry comparing
external and internal connection poolers:
https://momjian.us/main/blogs/pgblog/2017.html#April_21_2017
I completely agree with your arguments in this post.
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
On Mon, Jan 29, 2018 at 11:57:36AM +0300, Konstantin Knizhnik wrote:
Right now, if you hit max_connections, we start rejecting new
connections. Would it make sense to allow an option to exit idle
connections when this happens so new users can connect?
It will require changes in client applications, will not it? Them should be
ready that connection can be dropped by server at any moment of time.
I do not know it is possible to drop idle connection and hide this fact from
the client. In my implementation each session keeps minimal necessary
information requires for interaction with client (session context). It
includes socket, struct Port and session memory context which should be used
instead of TopMemoryContext for session specific data.
Yes, it would impact applications and you are right most applications
could not handle that cleanly. It is probably better to look into
freeing resources for idle connections instead and keep the socket open.
--
Bruce Momjian <bruce@momjian.us> http://momjian.us
EnterpriseDB http://enterprisedb.com
+ As you are, so once was I. As I am, so you will be. +
+ Ancient Roman grave inscription +
Bruce>Yes, it would impact applications and you are right most applications
could not handle that cleanly.
I would disagree here.
We are discussing applications that produce "lots of idle" connections,
aren't we? That typically comes from an application-level connection pool.
Most of the connection pools have a setting that would "validate" a
connection in case it was not used for a certain period of time.
That plays nicely in case the server drops an "idle, not in a transaction"
connection.
Of course, there are cases when application just grabs a connection from a
pool and uses it in a non-transacted way (e.g. does some action once an
hour and commits immediately). However that kind of application would
already face firewalls, etc. I mean the application should already be
prepared to handle "network issues".
Bruce> It is probably better to look into
Bruce>freeing resources for idle connections instead and keep the socket
open.
The application might expect the session-specific data to be present,
so it might be even worse if the database deallocates everything but the
TCP connection.
For instance: the application might expect the server-prepared statements
to be there. Would you deallocate server-prepared statements for those
"idle" connections? The app would just break. There's no way (currently)
for the application to know that the statement expired unexpectedly.
Vladimir
On Mon, Jan 29, 2018 at 04:02:22PM +0000, Vladimir Sitnikov wrote:
Bruce>Yes, it would impact applications and you are right most applications
could not handle that cleanly.
I would disagree here.
We are discussing applications that produce "lots of idle" connections, aren't
we? That typically comes from an application-level connection pool.
Most of the connection pools have a setting that would "validate" connection in
case it was not used for a certain period of time.
That plays nicely in case server drops "idle, not in a transaction" connection.
Well, we could have the connection pooler disconnect those, right?
Of course, there are cases when application just grabs a connection from a pool
and uses it in a non-transacted way (e.g. does some action once an hour and
commits immediately). However that kind of application would already face
firewalls, etc. I mean the application should already be prepared to handle
"network issues".
Bruce> It is probably better to look into
Bruce>freeing resources for idle connections instead and keep the socket open.
The application might expect for the session-specific data to be present, so it
might be even worse if the database deallocates all the things but TCP
connection.
For instance: application might expect for the server-prepared statements to be
there. Would you deallocate server-prepared statements for those "idle"
connections? The app would just break. There's no way (currently) for the
application to know that the statement expired unexpectedly.
I don't know what we would deallocate yet.
--
Bruce Momjian <bruce@momjian.us> http://momjian.us
EnterpriseDB http://enterprisedb.com
+ As you are, so once was I. As I am, so you will be. +
+ Ancient Roman grave inscription +
Bruce>Well, we could have the connection pooler disconnect those, right?
I agree. Do you think we could rely on all the applications being
configured in a sane way?
A fallback configuration at DB level could still be useful to ensure the DB
keeps running in case multiple applications access it. It might be
non-trivial to ensure proper configurations across all the apps.
What I do like is that the behaviour of dropping connections should already be
considered by existing applications, so it should fit naturally with the
existing apps.
An alternative approach might be to dump relevant resources for inactive
sessions to disk, so the session could be recreated in case the connection
is requested again after a long pause (e.g. reprepare all the statements),
however that sounds scary.
Vladimir
I have obtained more results with the YCSB benchmark and built-in connection
pooling.
An explanation of the benchmark and all results for vanilla Postgres and
Mongo are available in Oleg Bartunov's presentation about JSON (at the
end of the presentation):
http://www.sai.msu.su/~megera/postgres/talks/sqljson-pgconf.eu-2017.pdf
As you can see, Postgres shows a significant slowdown with an increasing
number of connections in the case of conflicting updates.
Built-in connection pooling can mitigate this problem:
Workload-B (5% of updates), ops/sec:

Session pool size \ clients      250      500      750     1000
                0             151511    78078    48742    30186
               32             522347   543863   546971   540462
               64             736323   770010   763649   763358
              128             245167   241377   243322   232482
              256             144964   146723   149317   141049
Here the maximum is reached near 70 backends, which corresponds to the
number of physical cores on the target system.
But for workload A (50% of updates), the optimum is achieved at a much smaller
number of backends, after which we get very fast performance degradation:
Session pool size    kops/sec
               16         220
               30         353
               32         362
               40         120
               70          53
              256          20
Here the maximum is reached at 32 backends, and with 70 backends
performance is 6 times worse.
It means that it is difficult to find the optimal size of the session pool if
we have a varying workload.
If we set it too large, then we get high contention from conflicting
update queries; if it is too small, then we do not utilize all system
resources on read-only or non-conflicting queries.
It looks like we have to do something with the Postgres locking mechanism and
maybe implement some contention-aware scheduler as described here:
http://www.vldb.org/pvldb/vol11/p648-tian.pdf
But this is a different story, not related to built-in connection pooling.
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
Konstantin>I have obtained more results with YCSB benchmark and built-in
connection pooling
Could you provide more information on the benchmark setup you have used?
For instance: benchmark library versions, PostgreSQL client version,
additional/default benchmark parameters.
Konstantin>Postgres shows significant slow down with increasing number of
connections in case of conflicting updates.
Konstantin>Built-in connection pooling can somehow eliminate this problem
Can you please clarify how connection pooling eliminates slow down?
Is the case as follows?
1) The application updates multiple of rows in a single transaction
2) There are multiple concurrent threads
3) The threads update the same rows at the same time
If that is the case, then the actual workload is different each time you
vary connection pool size.
For instance, if you use 1 thread, then the writes become uncontended.
Of course, you might use just it as a "black box" workload, however I
wonder if that kind of workload ever appears in a real-life applications. I
would expect for the applications to update the same row multiple times,
however I would expect the app is doing subsequent updates, not the
concurrent ones.
On the other hand, as you vary the pool size, the workload varies as well
(the resulting database contents is different), so it looks like comparing
apples to oranges.
Vladimir
On 01.02.2018 15:21, Vladimir Sitnikov wrote:
Konstantin>I have obtained more results with YCSB benchmark and
built-in connection pooling
Could you provide more information on the benchmark setup you have used?
For instance: benchmark library versions, PostgreSQL client version,
additional/default benchmark parameters.
I am using the latest Postgres sources with the connection pooling patch
applied.
I have not built YCSB myself; I used an existing installation.
To launch the tests I used the following YCSB command lines:
To load data:
YCSB_MAXRUNTIME=60 YCSB_OPS=1000000000 YCSB_DBS="pgjsonb-local"
YCSB_CFG="bt" YCSB_CLIENTS="250" YCSB_WORKLOADS="load_a" ./ycsb.sh
To run test:
YCSB_MAXRUNTIME=60 YCSB_OPS=1000000000 YCSB_DBS="pgjsonb-local"
YCSB_CFG="bt" YCSB_CLIENTS="250 500 750 1000" YCSB_WORKLOADS="run_a"
./ycsb.sh
$ cat config/pgjsonb-local.dat
db.driver=org.postgresql.Driver
db.url=jdbc:postgresql://localhost:5432/ycsb
db.user=ycsb
db.passwd=ycsb
db.batchsize=100
jdbc.batchupdateapi=true
table=usertable
Konstantin>Postgres shows significant slow down with increasing number
of connections in case of conflicting updates.
Konstantin>Built-in connection pooling can somehow eliminate this problem
Can you please clarify how connection pooling eliminates slow down?
Is the case as follows?
1) The application updates multiple of rows in a single transaction
2) There are multiple concurrent threads
3) The threads update the same rows at the same time
If that is the case, then the actual workload is different each time
you vary connection pool size.
For instance, if you use 1 thread, then the writes become uncontended.
Of course, you might use just it as a "black box" workload, however I
wonder if that kind of workload ever appears in a real-life
applications. I would expect for the applications to update the same
row multiple times, however I would expect the app is doing subsequent
updates, not the concurrent ones.
On the other hand, as you vary the pool size, the workload varies as
well (the resulting database contents is different), so it looks like
comparing apples to oranges.
Vladimir
Sorry, I am not sure that I completely understand your question.
The YCSB (Yahoo! Cloud Serving Benchmark) framework is essentially a
multi-client benchmark which assumes a large number of concurrent requests
to the database.
The requests themselves are very simple (the benchmark emulates a
key-value store).
In my tests I perform measurements for 250, 500, 750 and 1000 connections.
One of the main problems of Postgres is significant degradation of
performance in case of concurrent write access by multiple transactions
to the same rows.
This is why the performance of pgbench and the YCSB benchmark degrades
significantly (more than linearly) with an increasing number of client
connections, especially with a Zipfian distribution
(which significantly increases the probability of conflict).
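To illustrate why a Zipfian distribution makes conflicts so frequent, here
is a small standalone sketch of mine (not YCSB code; 1000 keys and the skew
constant 0.99 are just illustrative values): with this skew a handful of hot
keys receive a large share of all operations, so concurrent clients keep
colliding on the same rows.

import java.util.Random;

public class ZipfSketch {
    public static void main(String[] args) {
        int keys = 1000;            // number of distinct keys
        double s = 0.99;            // Zipfian skew constant
        double[] cdf = new double[keys];
        double norm = 0;
        for (int i = 1; i <= keys; i++) {
            norm += 1.0 / Math.pow(i, s);   // weight of the i-th most popular key
            cdf[i - 1] = norm;
        }
        Random rnd = new Random(42);
        int[] hits = new int[keys];
        for (int op = 0; op < 1_000_000; op++) {
            double u = rnd.nextDouble() * norm;
            int lo = 0, hi = keys - 1;
            while (lo < hi) {                // binary search in the CDF
                int mid = (lo + hi) / 2;
                if (cdf[mid] < u) lo = mid + 1; else hi = mid;
            }
            hits[lo]++;
        }
        long top10 = 0;
        for (int i = 0; i < 10; i++) top10 += hits[i];
        System.out.printf("top-10 keys received %.1f%% of 1M operations%n",
                          100.0 * top10 / 1_000_000);
    }
}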
Connection pooling makes it possible to fix the number of backends and
serve almost any number of connections with that fixed set of backends.
So the results are almost the same for 250, 500, 750 and 1000 connections.
The problem is choosing the optimal number of backends.
For read-only pgbench the best results are achieved with 300 backends, for
YCSB with 5% of updates with 70 backends, and for YCSB with 50% of updates
with 30 backends.
So something definitely needs to be changed in the Postgres locking
mechanism.
Connection pooling helps to minimize contention on resources and the
performance degradation caused by such contention.
But unfortunately it is not a silver bullet that fixes all Postgres
scalability problems.
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
Konstantin>I have not built YCSB myself; I used an existing installation.
Which pgjdbc version was in use?
Konstantin>One of the main problems of Postgres is significant degradation
of performance in case of concurrent write access by multiple transactions
to the same rows.
I would consider that a workload "problem" rather than a PostgreSQL problem.
That is, if an application (e.g. YCSB) is trying to update the same rows in
multiple transactions concurrently, then the outcome of such updates is
likely to be unpredictable. Does that make sense?
At least, I do not see why Mongo would degrade in a different way there.
Oleg's charts suggest that Mongo does not degrade there, so I wonder whether
we are comparing apples to apples in the first place.
Vladimir
On 01.02.2018 16:33, Vladimir Sitnikov wrote:
Konstantin>I have not built YCSB myself; I used an existing installation.
Which pgjdbc version was in use?
postgresql-9.4.1212.jar
Konstantin>One of the main problems of Postgres is significant degradation
of performance in case of concurrent write access by multiple
transactions to the same rows.
I would consider that a workload "problem" rather than a PostgreSQL problem.
That is, if an application (e.g. YCSB) is trying to update the same
rows in multiple transactions concurrently, then the outcome of such
updates is likely to be unpredictable. Does that make sense?
I can't agree with you.
Yes, there are workloads where updates are more or less local: clients
mostly update their own private data.
But there are many systems with "shared" resources which are
concurrently accessed by different users. They may just increment an access
counter or perform a deposit/withdrawal...
Just a simple example: consider something like an app store where some
popular application is bought by a lot of users.
From the DBMS point of view, a lot of clients perform concurrent updates of
the same record.
So performance on such a workload is also very important. And
unfortunately here Postgres loses the competition to MySQL and most
other DBMSes.
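Just to make the access pattern concrete, this is the kind of client I have
in mind (a sketch of mine with made-up table and column names, not the YCSB
schema): every client increments the same counter row, so all transactions
conflict on one tuple.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class HotRowUpdateSketch {
    static final String URL = "jdbc:postgresql://localhost:5432/ycsb";

    public static void main(String[] args) throws Exception {
        int clients = 50;
        Thread[] threads = new Thread[clients];
        for (int c = 0; c < clients; c++) {
            threads[c] = new Thread(() -> {
                try (Connection con = DriverManager.getConnection(URL, "ycsb", "ycsb");
                     PreparedStatement ps = con.prepareStatement(
                         "UPDATE app_stats SET purchases = purchases + 1 WHERE app_id = ?")) {
                    ps.setInt(1, 1);            // every client updates the same row
                    for (int n = 0; n < 1000; n++) {
                        ps.executeUpdate();     // autocommit: one transaction per update
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            threads[c].start();
        }
        for (Thread t : threads) t.join();
    }
}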
At least, I do not see why Mongo would degrade in a different way
there. Oleg's charts suggest that Mongo does not degrade there, so I
wonder whether we are comparing apples to apples in the first place.
Postgres locks tuples in a very inefficient way in case of high contention.
It first locks the buffer and checks whether the tuple is locked by some
other backend.
Then it tries to take a heavyweight lock on the tuple's ctid. If there are
several processes trying to update this tuple, then all of them will be
queued on this heavyweight tuple lock.
After getting this tuple lock, the backend waits for the transaction which
updated the tuple (by locking its transaction ID).
Once the transaction which updated this tuple is completed, Postgres
unblocks the backends waiting for this transaction: it checks the status of
the tuple, releases the tuple lock and wakes up one of the waiting clients.
Since Postgres uses MVCC, it creates a new version of the tuple on
each update.
So the tuple all clients are waiting for is not the latest version of
the tuple any more.
Depending on the isolation level, they either need to report an error (in
case of repeatable read) or update the snapshot and repeat the search with
the new snapshot...
and perform all the checks and locks mentioned above once again.
I hope it is clear from this brief and not so precise explanation
that Postgres has to do a lot of redundant work if several clients are
competing for the same tuple.
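From the client side, the repeatable read case above surfaces as a
serialization failure (SQLSTATE 40001, "could not serialize access due to
concurrent update") which the application has to retry; a minimal sketch of
such a retry loop (mine, with made-up table and column names, not part of
the patch):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class SerializationRetrySketch {
    public static void main(String[] args) throws Exception {
        try (Connection con = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/ycsb", "ycsb", "ycsb")) {
            con.setAutoCommit(false);
            con.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
            for (int attempt = 1; ; attempt++) {
                try (PreparedStatement ps = con.prepareStatement(
                        "UPDATE app_stats SET purchases = purchases + 1 WHERE app_id = ?")) {
                    ps.setInt(1, 1);
                    ps.executeUpdate();
                    con.commit();
                    break;                      // success
                } catch (SQLException e) {
                    con.rollback();
                    if ("40001".equals(e.getSQLState()) && attempt < 5) {
                        continue;               // serialization failure: retry the transaction
                    }
                    throw e;                    // some other error, or too many retries
                }
            }
        }
    }
}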
There is a well-known rule that pessimistic locking is more efficient than
optimistic locking in case of high contention.
So Postgres could provide better performance on this workload if it were
more pessimistic:
take the lock not on the ctid (the identifier of a particular tuple
version), but on the tuple's PK (primary key), and hold it till the end of
the transaction (because until the transaction is completed nobody else will
be able to update this tuple). This trick with locking the PK really helps
to improve performance on this workload, but unfortunately it cannot reverse
the trend of performance degradation with an increasing number of
competing transactions.
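At the SQL level a rough analogue of this idea is to take the row lock via
the primary key at the very beginning of the transaction with
SELECT ... FOR UPDATE and hold it until commit (again just an illustration
of mine with made-up names; the trick above is about changing the lock
target inside the server, not about rewriting application queries):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class PessimisticPkLockSketch {
    public static void main(String[] args) throws Exception {
        try (Connection con = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/ycsb", "ycsb", "ycsb")) {
            con.setAutoCommit(false);
            try (PreparedStatement lock = con.prepareStatement(
                     "SELECT purchases FROM app_stats WHERE app_id = ? FOR UPDATE");
                 PreparedStatement upd = con.prepareStatement(
                     "UPDATE app_stats SET purchases = ? WHERE app_id = ?")) {
                lock.setInt(1, 1);
                long purchases = 0;
                try (ResultSet rs = lock.executeQuery()) {
                    if (rs.next()) purchases = rs.getLong(1);
                }
                upd.setLong(1, purchases + 1);  // the row stays locked until commit
                upd.setInt(2, 1);
                upd.executeUpdate();
                con.commit();
            }
        }
    }
}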
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
config/pgjsonb-local.dat
Do you use standard "workload" configuration values?
(e.g. recordcount=1000, maxscanlength=100)
Could you share ycsb output (e.g. for workload a)?
I mean lines like
[TOTAL_GC_TIME], Time(ms), xxx
[TOTAL_GC_TIME_%], Time(%), xxx
postgresql-9.4.1212.jar
OK, you have the relevant performance fixes then.
Konstantin>Just a simple example: consider something like an app store
where some popular application is bought by a lot of users.
Konstantin>From the DBMS point of view, a lot of clients perform concurrent
updates of the same record.
I thought YCSB updated *multiple rows* per transaction. It turns out all
the default YCSB workloads update just one row per transaction. There is no
batching, etc. Batch-related parameters are used at "DB initial load" time
only.
Konstantin>Postgres locks tuples in a very inefficient way in case of high
contention
Thank you for the explanation.
Vladimir
On 01.02.2018 23:28, Vladimir Sitnikov wrote:
config/pgjsonb-local.dat
Do you use standard "workload" configuration values?
(e.g. recordcount=1000, maxscanlength=100)
Yes, I used the default values for the workloads. For example, workload A
has the following settings:
# Yahoo! Cloud System Benchmark
# Workload A: Update heavy workload
# Application example: Session store recording recent actions
#
# Read/update ratio: 50/50
# Default data size: 1 KB records (10 fields, 100 bytes each, plus key)
# Request distribution: zipfian
recordcount=1000
operationcount=1000
workload=com.yahoo.ycsb.workloads.CoreWorkload
readallfields=true
readproportion=0.5
updateproportion=0.5
scanproportion=0
insertproportion=0
requestdistribution=zipfian
Could you share ycsb output (e.g. for workload a)?
I mean lines like
[TOTAL_GC_TIME], Time(ms), xxx
[TOTAL_GC_TIME_%], Time(%), xxx
$ cat results/last/run_pgjsonb-local_workloada_70_bt.out
[OVERALL], RunTime(ms), 60099.0
[OVERALL], Throughput(ops/sec), 50444.83269272367
[TOTAL_GCS_PS_Scavenge], Count, 6.0
[TOTAL_GC_TIME_PS_Scavenge], Time(ms), 70.0
[TOTAL_GC_TIME_%_PS_Scavenge], Time(%), 0.11647448376844872
[TOTAL_GCS_PS_MarkSweep], Count, 0.0
[TOTAL_GC_TIME_PS_MarkSweep], Time(ms), 0.0
[TOTAL_GC_TIME_%_PS_MarkSweep], Time(%), 0.0
[TOTAL_GCs], Count, 6.0
[TOTAL_GC_TIME], Time(ms), 70.0
[TOTAL_GC_TIME_%], Time(%), 0.11647448376844872
[READ], Operations, 1516174.0
[READ], AverageLatency(us), 135.802076146933
[READ], MinLatency(us), 57.0
[READ], MaxLatency(us), 23327.0
[READ], 95thPercentileLatency(us), 382.0
[READ], 99thPercentileLatency(us), 828.0
[READ], Return=OK, 1516174
[CLEANUP], Operations, 70.0
[CLEANUP], AverageLatency(us), 134.21428571428572
[CLEANUP], MinLatency(us), 55.0
[CLEANUP], MaxLatency(us), 753.0
[CLEANUP], 95thPercentileLatency(us), 728.0
[CLEANUP], 99thPercentileLatency(us), 750.0
[UPDATE], Operations, 1515510.0
[UPDATE], AverageLatency(us), 2622.6653258639008
[UPDATE], MinLatency(us), 86.0
[UPDATE], MaxLatency(us), 1059839.0
[UPDATE], 95thPercentileLatency(us), 1261.0
[UPDATE], 99thPercentileLatency(us), 87039.0
[UPDATE], Return=OK, 1515510
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
Nikita Glukhov has added the results of the YCSB benchmark with connection
pooling to the common diagram (attached).
As you can see, connection pooling provides stable results for all numbers
of backends, except for workload E.
I do not have an explanation for the performance degradation in this
particular workload.
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
Attachments:
ycsb-zipf-pool.png (image/png)