walreceiver is uninterruptible on win32
Hi,
http://archives.postgresql.org/pgsql-hackers/2010-01/msg01672.php
On win32, the blocking libpq functions like PQconnectdb() and
PQexec() are uninterruptible since they use the vanilla select()
instead of our signal emulation layer compatible select().
Nevertheless, currently walreceiver uses them to establish a
connection, send a handshake message and wait for the reply.
So walreceiver also becomes uninterruptible for a while. This
is the must-fix problem for 9.0.
I replaced the blocking libpq functions currently used with
asynchronous ones, and used the emulated version of select()
to wait, to make walreceiver interruptible. Here is the patch.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
win32_interruptible_walreceiver_v1.patch - text/x-patch; charset=US-ASCII; name=win32_interruptible_walreceiver_v1.patch (Download)
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 18,23 ****
--- 18,24 ----
#include <unistd.h>
#include <sys/time.h>
+ #include <time.h>
#include "libpq-fe.h"
#include "access/xlog.h"
***************
*** 53,59 **** static bool libpqrcv_receive(int timeout, unsigned char *type,
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
! static bool libpq_select(int timeout_ms);
/*
* Module load callback
--- 54,61 ----
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
! static int libpq_select(bool forRead, bool forWrite, int timeout_ms);
! static PGresult *libpqrcv_PQexec(const char *query);
/*
* Module load callback
***************
*** 83,103 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
TimeLineID standby_tli;
PGresult *res;
char cmd[64];
/* Connect */
snprintf(conninfo_repl, sizeof(conninfo_repl), "%s replication=true", conninfo);
! streamConn = PQconnectdb(conninfo_repl);
! if (PQstatus(streamConn) != CONNECTION_OK)
ereport(ERROR,
(errmsg("could not connect to the primary server : %s",
PQerrorMessage(streamConn))));
/*
* Get the system identifier and timeline ID as a DataRow message from the
* primary server.
*/
! res = PQexec(streamConn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
--- 85,198 ----
TimeLineID standby_tli;
PGresult *res;
char cmd[64];
+ PQconninfoOption *options;
+ time_t finish_time = ((time_t) -1);
+
+ /*
+ * Extract timeout from the connection string
+ */
+ options = PQconninfoParse(conninfo, NULL);
+ if (options)
+ {
+ PQconninfoOption *option;
+ for (option = options; option->keyword != NULL; option++)
+ {
+ if (strcmp(option->keyword, "connect_timeout") == 0)
+ {
+ if (option->val != NULL && option->val[0] != '\0')
+ {
+ int timeout = atoi(option->val);
+
+ if (timeout > 0)
+ {
+ /*
+ * Rounding could cause connection to fail;
+ * need at least 2 secs
+ */
+ if (timeout < 2)
+ timeout = 2;
+ /* calculate the finish time based on start + timeout */
+ finish_time = time(NULL) + timeout;
+ }
+ }
+ }
+ }
+ PQconninfoFree(options);
+ }
/* Connect */
snprintf(conninfo_repl, sizeof(conninfo_repl), "%s replication=true", conninfo);
! streamConn = PQconnectStart(conninfo_repl);
! if (PQstatus(streamConn) == CONNECTION_BAD)
ereport(ERROR,
(errmsg("could not connect to the primary server : %s",
PQerrorMessage(streamConn))));
/*
+ * Wait for connection to be established
+ */
+ for (;;)
+ {
+ PostgresPollingStatusType status;
+ bool established = false;
+ bool forRead = false;
+ bool forWrite = false;
+ int timeout_ms;
+ int ret;
+
+ status = PQconnectPoll(streamConn);
+ switch (status)
+ {
+ case PGRES_POLLING_READING:
+ forRead = true;
+ break;
+ case PGRES_POLLING_WRITING:
+ forWrite = true;
+ break;
+ case PGRES_POLLING_OK:
+ established = true;
+ break;
+ case PGRES_POLLING_FAILED:
+ default:
+ ereport(ERROR,
+ (errmsg("could not connect to the primary server : %s",
+ PQerrorMessage(streamConn))));
+ }
+
+ if (established)
+ break;
+
+ retry:
+ /* Compute appropriate timeout interval */
+ if (finish_time == ((time_t) -1))
+ timeout_ms = -1;
+ else
+ {
+ time_t now = time(NULL);
+
+ if (finish_time > now)
+ timeout_ms = (finish_time - now) * 1000;
+ else
+ timeout_ms = 0;
+ }
+
+ /*
+ * Wait until we can read or write the connection socket
+ */
+ ret = libpq_select(forRead, forWrite, timeout_ms);
+ if (ret == 0) /* timeout */
+ ereport(ERROR,
+ (errmsg("could not connect to the primary server : timeout expired")));
+ if (ret < 0) /* interrupted */
+ goto retry;
+ }
+
+ /*
* Get the system identifier and timeline ID as a DataRow message from the
* primary server.
*/
! res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
***************
*** 149,159 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
/* Start streaming from the point requested by startup process */
snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
startpoint.xlogid, startpoint.xrecoff);
! res = PQexec(streamConn, cmd);
if (PQresultStatus(res) != PGRES_COPY_OUT)
ereport(ERROR,
(errmsg("could not start XLOG streaming: %s",
PQerrorMessage(streamConn))));
PQclear(res);
justconnected = true;
--- 244,257 ----
/* Start streaming from the point requested by startup process */
snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
startpoint.xlogid, startpoint.xrecoff);
! res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) != PGRES_COPY_OUT)
+ {
+ PQclear(res);
ereport(ERROR,
(errmsg("could not start XLOG streaming: %s",
PQerrorMessage(streamConn))));
+ }
PQclear(res);
justconnected = true;
***************
*** 162,176 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
}
/*
! * Wait until we can read WAL stream, or timeout.
*
! * Returns true if data has become available for reading, false if timed out
! * or interrupted by signal.
*
* This is based on pqSocketCheck.
*/
! static bool
! libpq_select(int timeout_ms)
{
int ret;
--- 260,275 ----
}
/*
! * Wait until we can read or write the connection socket
*
! * Returns >0 if data has been ready to be read, written,
! * or both, 0 if timed out, -1 if interrupted by signal.
! * Throws an error if an error occurred.
*
* This is based on pqSocketCheck.
*/
! static int
! libpq_select(bool forRead, bool forWrite, int timeout_ms)
{
int ret;
***************
*** 180,203 **** libpq_select(int timeout_ms)
(errcode_for_socket_access(),
errmsg("socket not open")));
/* We use poll(2) if available, otherwise select(2) */
{
#ifdef HAVE_POLL
struct pollfd input_fd;
input_fd.fd = PQsocket(streamConn);
! input_fd.events = POLLIN | POLLERR;
input_fd.revents = 0;
ret = poll(&input_fd, 1, timeout_ms);
#else /* !HAVE_POLL */
fd_set input_mask;
struct timeval timeout;
struct timeval *ptr_timeout;
FD_ZERO(&input_mask);
! FD_SET (PQsocket(streamConn), &input_mask);
if (timeout_ms < 0)
ptr_timeout = NULL;
--- 279,314 ----
(errcode_for_socket_access(),
errmsg("socket not open")));
+ Assert(forRead || forWrite);
+
/* We use poll(2) if available, otherwise select(2) */
{
#ifdef HAVE_POLL
struct pollfd input_fd;
input_fd.fd = PQsocket(streamConn);
! input_fd.events = POLLERR;
input_fd.revents = 0;
+ if (forRead)
+ input_fd.events |= POLLIN;
+ if (forWrite)
+ input_fd.events |= POLLOUT;
+
ret = poll(&input_fd, 1, timeout_ms);
#else /* !HAVE_POLL */
fd_set input_mask;
+ fd_set output_mask;
struct timeval timeout;
struct timeval *ptr_timeout;
FD_ZERO(&input_mask);
! FD_ZERO(&output_mask);
! if (forRead)
! FD_SET (PQsocket(streamConn), &input_mask);
! if (forWrite)
! FD_SET (PQsocket(streamConn), &output_mask);
if (timeout_ms < 0)
ptr_timeout = NULL;
***************
*** 209,225 **** libpq_select(int timeout_ms)
}
ret = select(PQsocket(streamConn) + 1, &input_mask,
! NULL, NULL, ptr_timeout);
#endif /* HAVE_POLL */
}
! if (ret == 0 || (ret < 0 && errno == EINTR))
! return false;
if (ret < 0)
ereport(ERROR,
(errcode_for_socket_access(),
errmsg("select() failed: %m")));
! return true;
}
/*
--- 320,397 ----
}
ret = select(PQsocket(streamConn) + 1, &input_mask,
! &output_mask, NULL, ptr_timeout);
#endif /* HAVE_POLL */
}
! if (ret == 0) /* timeout */
! return 0;
! if (ret < 0 && errno == EINTR) /* interrupted */
! return -1;
if (ret < 0)
ereport(ERROR,
(errcode_for_socket_access(),
errmsg("select() failed: %m")));
! return ret;
! }
!
! /*
! * Send a query and wait for the result by using the asynchronous libpq
! * functions and the backend version of select().
! *
! * On Windows, walreceiver must use this function instead of the blocking
! * libpq functions like PQexec() since they use the vanilla select() and
! * are uninterruptible by our emulated signals. On the other hand, this
! * function is interruptible since it uses the signal emulation layer
! * compatible select().
! */
! static PGresult *
! libpqrcv_PQexec(const char *query)
! {
! PGresult *res = NULL;
!
! /*
! * Submit a query. Since we don't use non-blocking mode, this also
! * can block. But its risk is relatively small, so we ignore that
! * for now.
! */
! if (!PQsendQuery(streamConn, query))
! return NULL;
!
! for (;;)
! {
! PGresult *next;
!
! /*
! * Receive data until PQgetResult has been ready to get the
! * result without blocking.
! */
! while (PQisBusy(streamConn))
! {
! if (libpq_select(true, false, -1) < 0)
! continue; /* interrupted */
! if (PQconsumeInput(streamConn) == 0)
! return NULL; /* trouble */
! }
!
! /*
! * Don't emulate the PQexec()'s behavior of returning the last
! * result, if there's many, since walreceiver never sends a query
! * returning multiple results.
! */
! if ((next = PQgetResult(streamConn)) == NULL)
! break; /* query is complete */
! if (PQresultStatus(next) == PGRES_FATAL_ERROR)
! return next;
! PQclear(res);
! res = next;
! if (PQresultStatus(res) == PGRES_COPY_IN ||
! PQresultStatus(res) == PGRES_COPY_OUT ||
! PQstatus(streamConn) == CONNECTION_BAD)
! break;
! }
!
! return res;
}
/*
***************
*** 268,274 **** libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
*/
if (timeout > 0 && !justconnected)
{
! if (!libpq_select(timeout))
return false;
if (PQconsumeInput(streamConn) == 0)
--- 440,446 ----
*/
if (timeout > 0 && !justconnected)
{
! if (libpq_select(true, false, timeout) <= 0)
return false;
if (PQconsumeInput(streamConn) == 0)
On Wed, Mar 10, 2010 at 10:09, Fujii Masao <masao.fujii@gmail.com> wrote:
Hi,
http://archives.postgresql.org/pgsql-hackers/2010-01/msg01672.php
On win32, the blocking libpq functions like PQconnectdb() and
PQexec() are uninterruptible since they use the vanilla select()
instead of our signal emulation layer compatible select().
Nevertheless, currently walreceiver uses them to establish a
connection, send a handshake message and wait for the reply.
So walreceiver also becomes uninterruptible for a while. This
is the must-fix problem for 9.0.
I replaced the blocking libpq functions currently used with
asynchronous ones, and used the emulated version of select()
to wait, to make walreceiver interruptible. Here is the patch.
These are issues that affect other things running libpq in the backend
as well, right? Such as dblink? Perhaps we can factor out most of this
into functions in backend/port/win32 so that we can re-use it from
there?
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On Fri, Mar 12, 2010 at 8:13 PM, Magnus Hagander <magnus@hagander.net> wrote:
On Wed, Mar 10, 2010 at 10:09, Fujii Masao <masao.fujii@gmail.com> wrote:
Hi,
http://archives.postgresql.org/pgsql-hackers/2010-01/msg01672.php
On win32, the blocking libpq functions like PQconnectdb() and
PQexec() are uninterruptible since they use the vanilla select()
instead of our signal emulation layer compatible select().
Nevertheless, currently walreceiver uses them to establish a
connection, send a handshake message and wait for the reply.
So walreceiver also becomes uninterruptible for a while. This
is the must-fix problem for 9.0.
I replaced the blocking libpq functions currently used with
asynchronous ones, and used the emulated version of select()
to wait, to make walreceiver interruptible. Here is the patch.
These are issues that affect other things running libpq in the backend
as well, right? Such as dblink?
Yes. So Heikki wrote the patch for dblink.
http://archives.postgresql.org/pgsql-hackers/2010-01/msg02072.php
Perhaps we can factor out most of this
into functions in backend/port/win32 so that we can re-use it from
there?
Sorry. I couldn't get your point. Could you explain it in detail?
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Mon, Mar 15, 2010 at 10:14, Fujii Masao <masao.fujii@gmail.com> wrote:
On Fri, Mar 12, 2010 at 8:13 PM, Magnus Hagander <magnus@hagander.net> wrote:
On Wed, Mar 10, 2010 at 10:09, Fujii Masao <masao.fujii@gmail.com> wrote:
Hi,
http://archives.postgresql.org/pgsql-hackers/2010-01/msg01672.php
On win32, the blocking libpq functions like PQconnectdb() and
PQexec() are uninterruptible since they use the vanilla select()
instead of our signal emulation layer compatible select().
Nevertheless, currently walreceiver uses them to establish a
connection, send a handshake message and wait for the reply.
So walreceiver also becomes uninterruptible for a while. This
is the must-fix problem for 9.0.
I replaced the blocking libpq functions currently used with
asynchronous ones, and used the emulated version of select()
to wait, to make walreceiver interruptible. Here is the patch.
These are issues that affect other things running libpq in the backend
as well, right? Such as dblink?
Yes. So Heikki wrote the patch for dblink.
http://archives.postgresql.org/pgsql-hackers/2010-01/msg02072.php
IIRC that was never applied.
Perhaps we can factor out most of this
into functions in backend/port/win32 so that we can re-use it from
there?
Sorry. I couldn't get your point. Could you explain it in detail?
What I'm referring to is the part that Heikki writes as "The
implementation should be shared between the two, but I'm not sure
how". I think we should try to factor out things that can be shared
into separate functions and stick those in port/win32 (assuming
they're win32-specific, otherwise, in another suitable location), and
then call them from both. There seems to be a lot of things that
should be doable that way.
I notice for example that the dblink patch doesn't have the code for
timeout handling - shouldn't it?
I think we need to look at this as a single problem needing to be
solved, and then have the same solution applied to dblink and
walreceiver.
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On Mon, Mar 15, 2010 at 6:42 PM, Magnus Hagander <magnus@hagander.net> wrote:
Perhaps we can factor out most of this
into functions in backend/port/win32 so that we can re-use it from
there?
Sorry. I couldn't get your point. Could you explain it in detail?
What I'm referring to is the part that Heikki writes as "The
implementation should be shared between the two, but I'm not sure
how". I think we should try to factor out things that can be shared
into separate functions and stick those in port/win32 (assuming
they're win32-specific, otherwise, in another suitable location), and
then call them from both. There seems to be a lot of things that
should be doable that way.
I notice for example that the dblink patch doesn't have the code for
timeout handling - shouldn't it?
I think we need to look at this as a single problem needing to be
solved, and then have the same solution applied to dblink and
walreceiver.
Thanks for the explanation. I agree that the code should be shared,
but am not sure how, too.
Something like libpq_select() which waits for the socket to become
ready would be required for walreceiver and dblink. But it's necessary
for walreceiver on not only win32 but also the other, so some functions
might need to be placed in the location other than port/win32.
I'll think of this issue for a while.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Fujii Masao wrote:
On Mon, Mar 15, 2010 at 6:42 PM, Magnus Hagander <magnus@hagander.net> wrote:
I think we need to look at this as a single problem needing to be
solved, and then have the same solution applied to dblink and
walreceiver.
Agreed.
Something like libpq_select() which waits for the socket to become
ready would be required for walreceiver and dblink. But it's necessary
for walreceiver on not only win32 but also the other, ...
Really, why? I thought this is a purely Windows specific problem.
Just replacing PQexec() with PQsendQuery() is pretty straightforward, we
could put that replacement in a file in port/win32. Replacing
PQconnectdb() is more complicated because you need to handle connection
timeout. I suggest that we only add the replacement for PQexec(), and
live with the situation for PQconnectdb(), that covers 99% of the
scenarios anyway.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
On 03/15/2010 02:42 AM, Magnus Hagander wrote:
I think we need to look at this as a single problem needing to be
solved, and then have the same solution applied to dblink and
walreceiver.
+1
Joe
On Tue, Mar 16, 2010 at 12:32 AM, Heikki Linnakangas
<heikki.linnakangas@enterprisedb.com> wrote:
Something like libpq_select() which waits for the socket to become
ready would be required for walreceiver and dblink. But it's necessary
for walreceiver on not only win32 but also the other, ...
Really, why? I thought this is a purely Windows specific problem.
Because, on all the platforms, libpqrcv_receive() needs to call libpq_select().
Or you mean that we should leave the existing libpq_select() as it is, and
create new win32 specific function which waits for the socket to become ready,
for this issue?
Just replacing PQexec() with PQsendQuery() is pretty straightforward, we
could put that replacement in a file in port/win32. Replacing
PQconnectdb() is more complicated because you need to handle connection
timeout. I suggest that we only add the replacement for PQexec(), and
live with the situation for PQconnectdb(), that covers 99% of the
scenarios anyway.
I'll try to replace PQexec() first, and PQconnectdb() second if I have
enough time.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Tue, Mar 16, 2010 at 10:35 AM, Fujii Masao <masao.fujii@gmail.com> wrote:
Just replacing PQexec() with PQsendQuery() is pretty straightforward, we
could put that replacement in a file in port/win32. Replacing
PQconnectdb() is more complicated because you need to handle connection
timeout. I suggest that we only add the replacement for PQexec(), and
live with the situation for PQconnectdb(), that covers 99% of the
scenarios anyway.
I'll try to replace PQexec() first, and PQconnectdb() second if I have
enough time.
Sorry for the delay. The attached patch replaces PQexec() used by dblink
and libpqwalreceiver with pgwin32_PQexec() which is the win32 version of
PQexec().
pgwin32_PQexec() is provided as the library 'libpqbe.dll', which is created
only on win32. dblink.dll and libpqwalreceiver.dll refer to libpqbe.dll.
Also libpqbe.dll refers to libpq.dll.
I'm not sure if my patch is in the right way. If you notice anything,
please feel free to comment!
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
pgwin32_PQexec_v1.patch - application/octet-stream; name=pgwin32_PQexec_v1.patch (Download)
*** a/contrib/dblink/Makefile
--- b/contrib/dblink/Makefile
***************
*** 20,22 **** top_builddir = ../..
--- 20,27 ----
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif
+
+ ifeq ($(PORTNAME), win32)
+ override PG_CPPFLAGS := -I$(libpqbe_srcdir) $(PG_CPPFLAGS)
+ override SHLIB_LINK := $(libpqbe) $(SHLIB_LINK)
+ endif
*** a/contrib/dblink/dblink.c
--- b/contrib/dblink/dblink.c
***************
*** 67,72 ****
--- 67,76 ----
#include "dblink.h"
+ #ifdef WIN32
+ #include "port/win32/libpqbe.h"
+ #endif
+
PG_MODULE_MAGIC;
typedef struct remoteConn
*** a/src/Makefile
--- b/src/Makefile
***************
*** 21,26 **** all install installdirs uninstall distprep:
--- 21,27 ----
$(MAKE) -C backend/snowball $@
$(MAKE) -C include $@
$(MAKE) -C interfaces $@
+ $(MAKE) -C backend/port/win32/libpqbe $@
$(MAKE) -C backend/replication/libpqwalreceiver $@
$(MAKE) -C bin $@
$(MAKE) -C pl $@
***************
*** 52,57 **** clean:
--- 53,59 ----
$(MAKE) -C backend/snowball $@
$(MAKE) -C include $@
$(MAKE) -C interfaces $@
+ $(MAKE) -C backend/port/win32/libpqbe $@
$(MAKE) -C backend/replication/libpqwalreceiver $@
$(MAKE) -C bin $@
$(MAKE) -C pl $@
***************
*** 67,72 **** distclean maintainer-clean:
--- 69,75 ----
$(MAKE) -C backend/snowball $@
$(MAKE) -C include $@
$(MAKE) -C interfaces $@
+ $(MAKE) -C backend/port/win32/libpqbe $@
$(MAKE) -C backend/replication/libpqwalreceiver $@
$(MAKE) -C bin $@
$(MAKE) -C pl $@
***************
*** 82,87 **** coverage:
--- 85,91 ----
$(MAKE) -C backend/utils/mb/conversion_procs $@
$(MAKE) -C backend/snowball $@
$(MAKE) -C interfaces $@
+ $(MAKE) -C backend/port/win32/libpqbe $@
$(MAKE) -C backend/replication/libpqwalreceiver $@
$(MAKE) -C bin $@
$(MAKE) -C pl $@
*** a/src/Makefile.global.in
--- b/src/Makefile.global.in
***************
*** 395,400 **** else
--- 395,411 ----
libpq_pgport = -L$(top_builddir)/src/port -lpgport $(libpq)
endif
+ # This macro is for use by backend libraries that use libpq.
+ ifdef PGXS
+ libpqbe_srcdir = $(includedir)
+ libpqbe_builddir = $(libdir)
+ libpqbe = -L$(libdir) -lpqbe $(libpq)
+ else
+ libpqbe_srcdir = $(top_srcdir)/src/include/port/win32
+ libpqbe_builddir = $(top_builddir)/src/backend/port/win32/libpqbe
+ libpqbe = -L$(libpqbe_builddir) -lpqbe $(libpq)
+ endif
+
submake-libpq:
$(MAKE) -C $(libpq_builddir) all
***************
*** 402,408 **** submake-libpq:
submake-libpgport:
$(MAKE) -C $(top_builddir)/src/port all
! .PHONY: submake-libpq submake-libpgport
##########################################################################
--- 413,422 ----
submake-libpgport:
$(MAKE) -C $(top_builddir)/src/port all
! submake-libpqbe: submake-libpq
! $(MAKE) -C $(libpqbe_builddir) all
!
! .PHONY: submake-libpq submake-libpgport submake-libpqbe
##########################################################################
*** /dev/null
--- b/src/backend/port/win32/libpqbe/Makefile
***************
*** 0 ****
--- 1,36 ----
+ #-------------------------------------------------------------------------
+ #
+ # Makefile--
+ # Makefile for src/backend/port/win32/libpqbe
+ #
+ # IDENTIFICATION
+ # $PostgreSQL$
+ #
+ #-------------------------------------------------------------------------
+
+ subdir = src/backend/port/win32/libpqbe
+ top_builddir = ../../../../..
+ include $(top_builddir)/src/Makefile.global
+
+ override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
+
+ OBJS = libpqbe.o
+ SHLIB_LINK = $(libpq)
+ NAME = libpqbe
+
+ SHLIB_EXPORTS = exports.txt
+
+ ifeq ($(PORTNAME),win32)
+ all: submake-libpq all-shared-lib
+
+ include $(top_srcdir)/src/Makefile.shlib
+
+ install: all installdirs install-lib
+
+ installdirs: installdirs-lib
+
+ uninstall: uninstall-lib
+
+ clean distclean maintainer-clean: clean-lib
+ rm -f $(OBJS)
+ endif
*** /dev/null
--- b/src/backend/port/win32/libpqbe/exports.txt
***************
*** 0 ****
--- 1,3 ----
+ # $PostgreSQL$
+ # Functions to be exported by libpqbe DLL
+ pgwin32_PQexec 1
*** /dev/null
--- b/src/backend/port/win32/libpqbe/libpqbe.c
***************
*** 0 ****
--- 1,102 ----
+ /*-------------------------------------------------------------------------
+ *
+ * libpqbe.c
+ * functions related to sending a query from the backend
+ *
+ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+ #include "postgres.h"
+
+ #include "libpq-fe.h"
+ #include "port/win32/libpqbe.h"
+
+ /*
+ * Send a query and wait for the results by using the asynchronous libpq
+ * functions and the backend version of select().
+ *
+ * On Windows, dblink and walreceiver must use this function instead of
+ * the blocking libpq functions like PQexec() since they use the vanilla
+ * select() and are uninterruptible by our emulated signals. On the other
+ * hand, this function is interruptible since it uses the signal emulation
+ * layer compatible select().
+ */
+ PGresult *
+ pgwin32_PQexec(PGconn *conn, const char *query)
+ {
+ PGresult *result = NULL;
+ PGresult *lastResult = NULL;
+
+ /*
+ * Submit a query. Since we don't use non-blocking mode, this also
+ * can block. But its risk is relatively small, so we ignore that
+ * for now.
+ */
+ if (!PQsendQuery(conn, query))
+ return NULL;
+
+ for (;;)
+ {
+ /*
+ * Receive data until PQgetResult has been ready to get the
+ * result without blocking.
+ */
+ while (PQisBusy(conn))
+ {
+ int ret;
+ int sock = PQsocket(conn);
+ fd_set input_mask;
+
+ if (sock < 0)
+ ereport(ERROR,
+ (errcode_for_socket_access(),
+ errmsg("socket not open")));
+
+ FD_ZERO(&input_mask);
+ FD_SET (sock, &input_mask);
+
+ ret = select(sock + 1, &input_mask, NULL, NULL, NULL);
+ if (ret == 0 || (ret < 0 && errno == EINTR))
+ continue; /* interrupted */
+ if (ret < 0)
+ ereport(ERROR,
+ (errcode_for_socket_access(),
+ errmsg("select() failed: %m")));
+ if (PQconsumeInput(conn) == 0)
+ return NULL;
+ }
+
+ /*
+ * Emulate the PQexec()'s behavior of returning the last result,
+ * if there's many.
+ *
+ * We don't try to concatenate error messages like PQexec() does.
+ * Doesn't seem worth the effort.
+ */
+ if ((result = PQgetResult(conn)) == NULL)
+ break; /* query is complete */
+ if (lastResult)
+ {
+ if (PQresultStatus(lastResult) == PGRES_FATAL_ERROR &&
+ PQresultStatus(result) == PGRES_FATAL_ERROR)
+ {
+ PQclear(result);
+ result = lastResult;
+ }
+ else
+ PQclear(lastResult);
+ }
+ lastResult = result;
+ if (PQresultStatus(result) == PGRES_COPY_IN ||
+ PQresultStatus(result) == PGRES_COPY_OUT ||
+ PQstatus(conn) == CONNECTION_BAD)
+ break;
+ }
+
+ return lastResult;
+ }
*** a/src/backend/replication/libpqwalreceiver/Makefile
--- b/src/backend/replication/libpqwalreceiver/Makefile
***************
*** 15,24 **** include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
OBJS = libpqwalreceiver.o
SHLIB_LINK = $(libpq)
NAME = libpqwalreceiver
! all: submake-libpq all-shared-lib
include $(top_srcdir)/src/Makefile.shlib
--- 15,33 ----
override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
OBJS = libpqwalreceiver.o
+ ifeq ($(PORTNAME), win32)
+ override CPPFLAGS := -I$(libpqbe_srcdir) $(CPPFLAGS)
+ SHLIB_LINK = $(libpqbe)
+ else
SHLIB_LINK = $(libpq)
+ endif
NAME = libpqwalreceiver
! ifeq ($(PORTNAME), win32)
! all: submake-libpqbe all-shared-lib
! else
! all: submake-libpq all-shared-lib
! endif
include $(top_srcdir)/src/Makefile.shlib
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 25,30 ****
--- 25,34 ----
#include "replication/walreceiver.h"
#include "utils/builtins.h"
+ #ifdef WIN32
+ #include "port/win32/libpqbe.h"
+ #endif
+
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
*** /dev/null
--- b/src/include/port/win32/libpqbe.h
***************
*** 0 ****
--- 1,23 ----
+ /*-------------------------------------------------------------------------
+ *
+ * libpqbe.h
+ * functions related to sending a query from the backend
+ *
+ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+
+ #ifndef LIBPQBE_H
+ #define LIBPQBE_H
+
+ #include "libpq-fe.h"
+
+ #define PQexec(conn, query) pgwin32_PQexec(conn, query)
+ PGresult *pgwin32_PQexec(PGconn *conn, const char *query);
+
+ #endif /* LIBPQBE_H */
On Thu, Mar 25, 2010 at 15:33, Fujii Masao <masao.fujii@gmail.com> wrote:
On Tue, Mar 16, 2010 at 10:35 AM, Fujii Masao <masao.fujii@gmail.com> wrote:
Just replacing PQexec() with PQsendQuery() is pretty straightforward, we
could put that replacement in a file in port/win32. Replacing
PQconnectdb() is more complicated because you need to handle connection
timeout. I suggest that we only add the replacement for PQexec(), and
live with the situation for PQconnectdb(), that covers 99% of the
scenarios anyway.
I'll try to replace PQexec() first, and PQconnectdb() second if I have
enough time.
Sorry for the delay. The attached patch replaces PQexec() used by dblink
and libpqwalreceiver with pgwin32_PQexec() which is the win32 version of
PQexec().
pgwin32_PQexec() is provided as the library 'libpqbe.dll', which is created
only on win32. dblink.dll and libpqwalreceiver.dll refer to libpqbe.dll.
Also libpqbe.dll refers to libpq.dll.
I'm not sure if my patch is in the right way. If you notice anything,
please feel free to comment!
Well, first of all, it would require the addition of the new target to
the msvc build system, but that's easy - I can do that for you.
More to the point, I'm not sure I like the creation of yet another DLL
to deal with this. The reason this isn't just exported from the main
backend is the same reason we created the libpqwalreceiver library I'm
sure - but that means we already have one.
How about we just use this same source file, but compile and link it
directly into both dblink and libpqwalreceiver? That'd leave us with
one DLL less, making life easier.
The downside would be that other third-party modules who need to call
libpq wouldn't be able to access this without copying the function.
But is this really something that's useful for third party modules?
As for the code itself, don't we need a CHECK_FOR_INTERRUPTS in there
for it to be actually useful?
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
Magnus Hagander <magnus@hagander.net> writes:
On Thu, Mar 25, 2010 at 15:33, Fujii Masao <masao.fujii@gmail.com> wrote:
Sorry for the delay. The attached patch replaces PQexec() used by dblink
and libpqwalreceiver with pgwin32_PQexec() which is the win32 version of
PQexec().
pgwin32_PQexec() is provided as the library 'libpqbe.dll', which is created
only on win32. dblink.dll and libpqwalreceiver.dll refer to libpqbe.dll.
Also libpqbe.dll refers to libpq.dll.
[ assorted objections ]
I disapprove of the whole approach, actually. The right way to fix this
is to not touch or replace libpq at all, but to change walreceiver to
use libpq's async-query facilities directly. Instead of PQexec, use
PQsendQuery and then a loop involving PQisBusy, PQgetResult, etc.
You've more or less done that loop, but you've put it in the wrong
place.
The larger point is that I don't believe this issue exists only on
Windows. I think we're going to want something like this on all
platforms, and that implies supporting poll() not just select() for the
waiting part.
The patch also seems confused about whether it's intending to be a
general-purpose solution or not. You can maybe take some shortcuts
if it's only going to be for walreceiver, but if you're going to put
it in dblink it is *not* acceptable to take shortcuts like not
processing errors completely.
regards, tom lane
On Fri, Apr 2, 2010 at 17:26, Tom Lane <tgl@sss.pgh.pa.us> wrote:
Magnus Hagander <magnus@hagander.net> writes:
On Thu, Mar 25, 2010 at 15:33, Fujii Masao <masao.fujii@gmail.com> wrote:
Sorry for the delay. The attached patch replaces PQexec() used by dblink
and libpqwalreceiver with pgwin32_PQexec() which is the win32 version of
PQexec().
pgwin32_PQexec() is provided as the library 'libpqbe.dll', which is created
only on win32. dblink.dll and libpqwalreceiver.dll refer to libpqbe.dll.
Also libpqbe.dll refers to libpq.dll.
[ assorted objections ]
I disapprove of the whole approach, actually. The right way to fix this
is to not touch or replace libpq at all, but to change walreceiver to
use libpq's async-query facilities directly. Instead of PQexec, use
PQsendQuery and then a loop involving PQisBusy, PQgetResult, etc.
You've more or less done that loop, but you've put it in the wrong
place.
Any particular reason not to wrap that in a function? Not called
pgwin32_PQexec() then, but something more generic? And not doing any
#defines to change PQexec, but call that wrapper directly?
The larger point is that I don't believe this issue exists only on
Windows. I think we're going to want something like this on all
platforms, and that implies supporting poll() not just select() for the
waiting part.
The most important part of the issue doesn't (because PQexec will be
interrupted by a signal), but there may certainly be others.
The patch also seems confused about whether it's intending to be a
general-purpose solution or not. You can maybe take some shortcuts
if it's only going to be for walreceiver, but if you're going to put
it in dblink it is *not* acceptable to take shortcuts like not
processing errors completely.
Yeah, good point.
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
Magnus Hagander <magnus@hagander.net> writes:
On Fri, Apr 2, 2010 at 17:26, Tom Lane <tgl@sss.pgh.pa.us> wrote:
I disapprove of the whole approach, actually. The right way to fix this
is to not touch or replace libpq at all, but to change walreceiver to
use libpq's async-query facilities directly. Instead of PQexec, use
PQsendQuery and then a loop involving PQisBusy, PQgetResult, etc.
You've more or less done that loop, but you've put it in the wrong
place.
Any particular reason not to wrap that in a function? Not called
pgwin32_PQexec() then, but something more generic? And not doing any
#defines to change PQexec, but call that wrapper directly?
Yeah, that's fine. I just think it's easier to deal with this as a
local issue in walreceiver and dblink than to try to pretend we're
changing libpq's API.
The larger point is that I don't believe this issue exists only on
Windows. I think we're going to want something like this on all
platforms, and that implies supporting poll() not just select() for the
waiting part.
The most important part of the issue doesn't (because PQexec will be
interrupted by a signal), but there may certainly be others.
Really? As you pointed out yourself, if control doesn't reach a
CHECK_FOR_INTERRUPTS then we have a problem. We also know that select
isn't interrupted by signals on all platforms.
regards, tom lane
On Fri, Apr 2, 2010 at 11:11 PM, Magnus Hagander <magnus@hagander.net> wrote:
More to the point, I'm not sure I like the creation of yet another DLL
to deal with this. The reason this isn't just exported from the main
backend is the same reason we created the libpqwalreceiver library I'm
sure - but that means we already have one. How about we just use this same source file, but compile and link it
directly into both dblink and libpqwalreceiver? That'd leave us with
one DLL less, making life easier.
ISTM that we cannot compile dblink using USE_PGXS=1, if that DLL doesn't
exist in the installation directory. No?
As for the code itself, don't we need a CHECK_FOR_INTERRUPTS in there
for it to be actually useful?
Yes. I'll add a CHECK_FOR_INTERRUPTS in the loop.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Sat, Apr 3, 2010 at 12:26 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
I disapprove of the whole approach, actually. The right way to fix this
is to not touch or replace libpq at all, but to change walreceiver to
use libpq's async-query facilities directly. Instead of PQexec, use
PQsendQuery and then a loop involving PQisBusy, PQgetResult, etc.
Yes.
You've more or less done that loop, but you've put it in the wrong
place.
OK. I'll reconsider about how to use those asynchronous libpq functions.
But, if possible, could you point out where "the right place" is?
The larger point is that I don't believe this issue exists only on
Windows. I think we're going to want something like this on all
platforms, and that implies supporting poll() not just select() for the
waiting part.
OK. I'll change the part so that poll() is used if HAVE_POLL is defined,
select() otherwise.
The patch also seems confused about whether it's intending to be a
general-purpose solution or not. You can maybe take some shortcuts
if it's only going to be for walreceiver, but if you're going to put
it in dblink it is *not* acceptable to take shortcuts like not
processing errors completely.
OK. I'll address this problem. Since PGconn->errorMessage cannot be
updated from outside of libpq, I'm thinking of making the caller give
the StringInfo variable as a parameter to pgwin32_PQexec(), and
putting error messages in it. Then the caller use the StringInfo
instead of PQerrorMessage(PGconn), to get the error messages.
And ISTM that dblink needs to hold the StringInfo using hash as
do the PGconn.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Mon, Apr 5, 2010 at 3:18 PM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Fri, Apr 2, 2010 at 11:11 PM, Magnus Hagander <magnus@hagander.net> wrote:
More to the point, I'm not sure I like the creation of yet another DLL
to deal with this. The reason this isn't just exported from the main
backend is the same reason we created the libpqwalreceiver library I'm
sure - but that means we already have one. How about we just use this same source file, but compile and link it
directly into both dblink and libpqwalreceiver? That'd leave us with
one DLL less, making life easier. ISTM that we cannot compile dblink using USE_PGXS=1, if that DLL doesn't
exist in the installation directory. No?
I might have misinterpreted your point. You mean that the same source
file defining something like pgwin32_PQexec should be placed in both
contrib/dblink and src/backend/replication/libpqwalreceiver? If so,
we can compile dblink using USE_PGXS without the DLL.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Tue, Apr 6, 2010 at 2:25 PM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Mon, Apr 5, 2010 at 3:18 PM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Fri, Apr 2, 2010 at 11:11 PM, Magnus Hagander <magnus@hagander.net> wrote:
More to the point, I'm not sure I like the creation of yet another DLL
to deal with this. The reason this isn't just exported from the main
backend is the same reason we created the libpqwalreceiver library I'm
sure - but that means we already have one. How about we just use this same source file, but compile and link it
directly into both dblink and libpqwalreceiver? That'd leave us with
one DLL less, making life easier. ISTM that we cannot compile dblink using USE_PGXS=1, if that DLL doesn't
exist in the installation directory. No? I might have misinterpreted your point. You mean that the same source
file defining something like pgwin32_PQexec should be placed in both
contrib/dblink and src/backend/replication/libpqwalreceiver? If so,
we can compile dblink using USE_PGXS without the DLL.
No, I don't mean that. I mean store it in one place, and copy/link it
into where it's used. Look at for example how crypt.c and
getaddrinfo.c are handled in libpq.
Not sure how that will play with PGXS, though, but I'm not entirely
sure we care if it can be built that way? If it does, there should be
some way to get PGXS to execute that rule as well, I'm sure.
Also note that per Tom's comments this is not a win32 only fix, so it
shouldn't be called pgwin32_*().
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On Wed, Apr 7, 2010 at 1:45 AM, Magnus Hagander <magnus@hagander.net> wrote:
No, I don't mean that. I mean store it in one place, and copy/link it
into where it's used. Look at for example how crypt.c and
getaddrinfo.c are handled in libpq.
Thanks for the advice!
Not sure how that will play with PGXS, though, but I'm not entirely
sure we care if it can be built that way?
Probably Yes.
If it does, there should be
some way to get PGXS to execute that rule as well, I'm sure.
If we can copy/link the source file defining "new PQexec" when
we compile the dblink, DLL doesn't seem to be required. So I
stop creating new DLL for PGXS.
Also note that per Tom's comments this is not a win32 only fix, so it
shouldn't be called pgwin32_*().
Yep.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Thu, Apr 8, 2010 at 5:01 PM, Fujii Masao <masao.fujii@gmail.com> wrote:
If it does, there should be
some way to get PGXS to execute that rule as well, I'm sure. If we can copy/link the source file defining "new PQexec" when
we compile the dblink, DLL doesn't seem to be required. So I
stop creating new DLL for PGXS.
On second thought, ISTM that we cannot use any source files which exist
in places other than contrib/dblink and installation directory when we
compile dblink under USE_PGXS=1. But we can put the file implementing
new PQexec on those neither. So I'm thinking again that it should be
provided as the shared library and be linked from walreceiver and dblink.
Is this right?
If adding new shared library is too big change at this point, I think
that we should postpone the fix only for dblink to 9.1 or later. Since
no one has complained about this long-term problem of dblink, I'm not
sure it really should be fixed right now. Thought?
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Fujii Masao wrote:
If adding new shared library is too big change at this point, I think
that we should postpone the fix only for dblink to 9.1 or later. Since
no one has complained about this long-term problem of dblink, I'm not
sure it really should be fixed right now. Thought?
I would agree with this. No one has ever complained that I am aware of.
Joe
On Mon, Apr 12, 2010 at 13:54, Fujii Masao <masao.fujii@gmail.com> wrote:
On Thu, Apr 8, 2010 at 5:01 PM, Fujii Masao <masao.fujii@gmail.com> wrote:
If it does, there should be
some way to get PGXS to execute that rule as well, I'm sure. If we can copy/link the source file defining "new PQexec" when
we compile the dblink, DLL doesn't seem to be required. So I
stop creating new DLL for PGXS. On second thought, ISTM that we cannot use any source files which exist
in places other than contrib/dblink and installation directory when we
compile dblink under USE_PGXS=1. But we can put the file implementing
new PQexec on those neither. So I'm thinking again that it should be
provided as the shared library and be linked from walreceiver and dblink.
Is this right? If adding new shared library is too big change at this point, I think
that we should postpone the fix only for dblink to 9.1 or later. Since
no one has complained about this long-term problem of dblink, I'm not
sure it really should be fixed right now. Thought?
+1. Let's fix walreceiver for now, and we can revisit dblink later.
Since we haven't had any complaints so far...
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On Tue, Apr 13, 2010 at 1:56 AM, Magnus Hagander <magnus@hagander.net> wrote:
If adding new shared library is too big change at this point, I think
that we should postpone the fix only for dblink to 9.1 or later. Since
no one has complained about this long-term problem of dblink, I'm not
sure it really should be fixed right now. Thought? +1. Let's fix walreceiver for now, and we can revisit dblink later.
Since we haven't had any complaints so far...
OK. I'll focus on walreceiver now.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Tue, Apr 13, 2010 at 9:21 AM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Tue, Apr 13, 2010 at 1:56 AM, Magnus Hagander <magnus@hagander.net> wrote:
If adding new shared library is too big change at this point, I think
that we should postpone the fix only for dblink to 9.1 or later. Since
no one has complained about this long-term problem of dblink, I'm not
sure it really should be fixed right now. Thought? +1. Let's fix walreceiver for now, and we can revisit dblink later.
Since we haven't had any complaints so far... OK. I'll focus on walreceiver now.
The attached patch changes walreceiver so that it uses new function
libpqrcv_PQexec() instead of PQexec() for sending handshake message.
libpqrcv_PQexec() sends a query and wait for the results by using
the asynchronous libpq functions and the backend version of select().
It's interruptible by signals. You would be able to shut the standby
server down in the middle of handshake for replication connection.
http://archives.postgresql.org/pgsql-hackers/2010-03/msg00625.php
Just replacing PQexec() with PQsendQuery() is pretty straightforward, we
could put that replacement in a file in port/win32. Replacing
PQconnectdb() is more complicated because you need to handle connection
timeout. I suggest that we only add the replacement for PQexec(), and
live with the situation for PQconnectdb(), that covers 99% of the
scenarios anyway.
According to the suggestion, I replaced only the PQexec() and didn't
add the replacement of PQconnect() for now.
http://archives.postgresql.org/pgsql-hackers/2010-04/msg00077.php
As for the code itself, don't we need a CHECK_FOR_INTERRUPTS in there
for it to be actually useful?
Since the backend version of select() receives any incoming signals
on Win32, CHECK_FOR_INTERRUPTS seems not to be needed in the loop,
at least in walreceiver. No? The patch doesn't put it in there, and
I was able to interrupt walreceiver executing libpqrcv_PQexec() when
I tested the patch on Win32.
http://archives.postgresql.org/pgsql-hackers/2010-04/msg00084.php
The larger point is that I don't believe this issue exists only on
Windows. I think we're going to want something like this on all
platforms, and that implies supporting poll() not just select() for the
waiting part.
The patch uses libpq_select() to check for the socket. It supports
poll() and select().
The patch also seems confused about whether it's intending to be a
general-purpose solution or not. You can maybe take some shortcuts
if it's only going to be for walreceiver, but if you're going to put
it in dblink it is *not* acceptable to take shortcuts like not
processing errors completely.
The patch still takes some shortcuts since we decided to postpone
the fix for dblink to 9.1 or later.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
libpqrcv_PQexec_v1.patchapplication/octet-stream; name=libpqrcv_PQexec_v1.patchDownload
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 54,59 **** static void libpqrcv_disconnect(void);
--- 54,60 ----
/* Prototypes for private functions */
static bool libpq_select(int timeout_ms);
+ static PGresult *libpqrcv_PQexec(const char *query);
/*
* Module load callback
***************
*** 97,103 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
* Get the system identifier and timeline ID as a DataRow message from the
* primary server.
*/
! res = PQexec(streamConn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
--- 98,104 ----
* Get the system identifier and timeline ID as a DataRow message from the
* primary server.
*/
! res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
***************
*** 149,155 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
/* Start streaming from the point requested by startup process */
snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
startpoint.xlogid, startpoint.xrecoff);
! res = PQexec(streamConn, cmd);
if (PQresultStatus(res) != PGRES_COPY_OUT)
ereport(ERROR,
(errmsg("could not start WAL streaming: %s",
--- 150,156 ----
/* Start streaming from the point requested by startup process */
snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
startpoint.xlogid, startpoint.xrecoff);
! res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) != PGRES_COPY_OUT)
ereport(ERROR,
(errmsg("could not start WAL streaming: %s",
***************
*** 225,230 **** libpq_select(int timeout_ms)
--- 226,302 ----
}
/*
+ * Send a query and wait for the results by using the asynchronous libpq
+ * functions and the backend version of select().
+ *
+ * We must not use the blocking libpq functions like PQexec() for that
+ * purpose because they are uninterruptible by signals on some platforms.
+ * Similarly, we must not use the vanilla select() here because it cannot
+ * handle the signals emulated for Windows. The signal emulation layer
+ * compatible select() must be called instead.
+ */
+ static PGresult *
+ libpqrcv_PQexec(const char *query)
+ {
+ PGresult *res = NULL;
+
+ /*
+ * PQexec() silently discards any prior query results at first.
+ * But this preparation is not required for walreceiver because
+ * it's expected that walsender doesn't generate such junk results.
+ */
+
+ /*
+ * Submit a query. Since we don't use non-blocking mode, this also
+ * can block. But its risk is relatively small, so we ignore that
+ * for now.
+ */
+ if (!PQsendQuery(streamConn, query))
+ return NULL;
+
+ for (;;)
+ {
+ PGresult *next;
+
+ /*
+ * Receive data until PQgetResult has been ready to get the
+ * result without blocking.
+ */
+ while (PQisBusy(streamConn))
+ {
+ /*
+ * We don't need to break down the sleep into smaller increments,
+ * and check for interrupts after each nap. Because we can just
+ * elog(FATAL) within SIGTERM signal handler when the signal
+ * arrives in the middle of establishment of replication connection.
+ */
+ if (!libpq_select(-1))
+ continue; /* interrupted */
+ if (PQconsumeInput(streamConn) == 0)
+ return NULL; /* trouble */
+ }
+
+ /*
+ * Don't emulate the PQexec()'s behavior of returning the last
+ * result when there are many, since walreceiver never sends a
+ * query returning multiple results.
+ */
+ if ((next = PQgetResult(streamConn)) == NULL)
+ break; /* query is complete */
+ if (PQresultStatus(next) == PGRES_FATAL_ERROR)
+ return next;
+ PQclear(res);
+ res = next;
+ if (PQresultStatus(res) == PGRES_COPY_IN ||
+ PQresultStatus(res) == PGRES_COPY_OUT ||
+ PQstatus(streamConn) == CONNECTION_BAD)
+ break;
+ }
+
+ return res;
+ }
+
+ /*
* Disconnect connection to primary, if any.
*/
static void
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 86,93 **** static void DisableWalRcvImmediateExit(void);
* We can't just exit(1) within SIGTERM signal handler, because the signal
* might arrive in the middle of some critical operation, like while we're
* holding a spinlock. We also can't just set a flag in signal handler and
! * check it in the main loop, because we perform some blocking libpq
! * operations like PQexec(), which can take a long time to finish.
*
* We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
* safe for the signal handler to elog(FATAL) immediately. Otherwise it just
--- 86,93 ----
* We can't just exit(1) within SIGTERM signal handler, because the signal
* might arrive in the middle of some critical operation, like while we're
* holding a spinlock. We also can't just set a flag in signal handler and
! * check it in the main loop, because we perform some blocking operations
! * like libpqrcv_PQexec(), which can take a long time to finish.
*
* We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
* safe for the signal handler to elog(FATAL) immediately. Otherwise it just
On Wed, Apr 14, 2010 at 10:02 AM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Tue, Apr 13, 2010 at 9:21 AM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Tue, Apr 13, 2010 at 1:56 AM, Magnus Hagander <magnus@hagander.net> wrote:
If adding new shared library is too big change at this point, I think
that we should postpone the fix only for dblink to 9.1 or later. Since
no one has complained about this long-term problem of dblink, I'm not
sure it really should be fixed right now. Thought? +1. Let's fix walreceiver for now, and we can revisit dblink later.
Since we haven't had any complaints so far... OK. I'll focus on walreceiver now.
The attached patch changes walreceiver so that it uses new function
libpqrcv_PQexec() instead of PQexec() for sending handshake message.
libpqrcv_PQexec() sends a query and wait for the results by using
the asynchronous libpq functions and the backend version of select().
It's interruptible by signals. You would be able to shut the standby
server down in the middle of handshake for replication connection. http://archives.postgresql.org/pgsql-hackers/2010-03/msg00625.php
Just replacing PQexec() with PQsendQuery() is pretty straightforward, we
could put that replacement in a file in port/win32. Replacing
PQconnectdb() is more complicated because you need to handle connection
timeout. I suggest that we only add the replacement for PQexec(), and
live with the situation for PQconnectdb(), that covers 99% of the
scenarios anyway. According to the suggestion, I replaced only the PQexec() and didn't
add the replacement of PQconnect() for now. http://archives.postgresql.org/pgsql-hackers/2010-04/msg00077.php
As for the code itself, don't we need a CHECK_FOR_INTERRUPTS in there
for it to be actually useful? Since the backend version of select() receives any incoming signals
on Win32, CHECK_FOR_INTERRUPTS seems not to be needed in the loop,
at least in walreceiver. No? The patch doesn't put it in there, and
I was able to interrupt walreceiver executing libpqrcv_PQexec() when
I tested the patch on Win32.
It will call the signal handler, yes. Normally, the signal handler
just sets a flag somewhere, which needs to be checked with
CHECK_FOR_INTERRUPTS.
From how I read the walreceiver.c code (which I'm not that familiar
with), the signal handlers call ProcessWalRcvInterrupts() which in
turn has CHECK_FOR_INTERRUPTS in it, and this is where it ends up
being called.
The patch also seems confused about whether it's intending to be a
general-purpose solution or not. You can maybe take some shortcuts
if it's only going to be for walreceiver, but if you're going to put
it in dblink it is *not* acceptable to take shortcuts like not
processing errors completely. The patch still takes some shortcuts since we decided to postpone
the fix for dblink to 9.1 or later.
Given those shortcuts, can't we simplify it even further like
attached? (If nothing else, your code did PQclear() on an
uninitialized pointer - must've been pure luck that it worked)
Looking at the call-sites, there are bugs now - if PQexec() returns
NULL, we don't deal with it. It also doesn't always free the result
properly. I've added checks for that.
Finally, I've updated some of the comments.
Note: I've only tested this patch as far as that it compiles. I don't
have a SR env around right now, so I'll have to complete with that
later. But if you have a chance to test that it fixes your test case,
please do!
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
Attachments:
libpqrcv_PQexec_v2.patchapplication/octet-stream; name=libpqrcv_PQexec_v2.patchDownload
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index e451c55..5e759b6 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -54,6 +54,7 @@ static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
static bool libpq_select(int timeout_ms);
+static PGresult *libpqrcv_PQexec(const char *query);
/*
* Module load callback
@@ -97,10 +98,11 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
* Get the system identifier and timeline ID as a DataRow message from the
* primary server.
*/
- res = PQexec(streamConn, "IDENTIFY_SYSTEM");
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
+ if (res == NULL || PQresultStatus(res) != PGRES_TUPLES_OK)
{
- PQclear(res);
+ if (res)
+ PQclear(res);
ereport(ERROR,
(errmsg("could not receive database system identifier and timeline ID from "
"the primary server: %s",
@@ -149,11 +151,15 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
/* Start streaming from the point requested by startup process */
snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
startpoint.xlogid, startpoint.xrecoff);
- res = PQexec(streamConn, cmd);
- if (PQresultStatus(res) != PGRES_COPY_OUT)
+ res = libpqrcv_PQexec(cmd);
+ if (res == NULL || PQresultStatus(res) != PGRES_COPY_OUT)
+ {
+ if (res)
+ PQclear(res);
ereport(ERROR,
(errmsg("could not start WAL streaming: %s",
PQerrorMessage(streamConn))));
+ }
PQclear(res);
justconnected = true;
@@ -225,6 +231,66 @@ libpq_select(int timeout_ms)
}
/*
+ * Send a query and wait for the results by using the asynchronous libpq
+ * functions and the backend version of select().
+ *
+ * We must not use the regular blocking libpq functions like PQexec()
+ * since they are uninterruptible by signals on some platforms, such as
+ * Windows.
+ *
+ * We must also not use vanilla select() here since it cannot handle the
+ * signal emulation layer on Windows.
+ *
+ * The function is modeled on PQexec() in libpq, but only implements
+ * those parts that are in use in the walreceiver.
+ *
+ * Queries are always executed on the connection in streamConn.
+ */
+static PGresult *
+libpqrcv_PQexec(const char *query)
+{
+ /*
+ * PQexec() silently discards any prior query results on the
+ * connection. This is not required for walreceiver since it's
+ * expected that walsender won't generate any such junk results.
+ */
+
+ /*
+ * Submit a query. Since we don't use non-blocking mode, this also
+ * can block. But the risk is relatively small, so we ignore that
+ * for now.
+ */
+ if (!PQsendQuery(streamConn, query))
+ return NULL;
+
+ /*
+ * Receive data until PQgetResult is ready to get the result
+ * without blocking.
+ */
+ while (PQisBusy(streamConn))
+ {
+ /*
+ * We don't need to break down the sleep into smaller increments,
+ * and check for interrupts after each nap, since we can just
+ * elog(FATAL) within SIGTERM signal handler if the signal
+ * arrives in the middle of establishment of replication connection.
+ */
+ if (!libpq_select(-1))
+ continue; /* interrupted */
+ if (PQconsumeInput(streamConn) == 0)
+ return NULL; /* trouble */
+ }
+
+ /*
+ * Don't emulate the PQexec()'s behavior of returning the last
+ * result when there are many, since walreceiver never sends a
+ * query returning multiple results. Just return the first result
+ * available on the connection.
+ */
+ return PQgetResult(streamConn);
+}
+
+/*
* Disconnect connection to primary, if any.
*/
static void
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ff5f293..5ab8f68 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -86,8 +86,8 @@ static void DisableWalRcvImmediateExit(void);
* We can't just exit(1) within SIGTERM signal handler, because the signal
* might arrive in the middle of some critical operation, like while we're
* holding a spinlock. We also can't just set a flag in signal handler and
- * check it in the main loop, because we perform some blocking libpq
- * operations like PQexec(), which can take a long time to finish.
+ * check it in the main loop, because we perform some blocking operations
+ * like libpqrcv_PQexec(), which can take a long time to finish.
*
* We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
* safe for the signal handler to elog(FATAL) immediately. Otherwise it just
Magnus Hagander <magnus@hagander.net> writes:
Looking at the call-sites, there are bugs now - if PQexec() returns
NULL, we don't deal with it. It also doesn't always free the result
properly. I've added checks for that.
I think you're just adding useless complexity there. PQresultStatus
defends itself just fine against a NULL input, and most other libpq
functions likewise.
regards, tom lane
On Wed, Apr 14, 2010 at 11:15 PM, Magnus Hagander <magnus@hagander.net> wrote:
http://archives.postgresql.org/pgsql-hackers/2010-04/msg00077.php
As for the code itself, don't we need a CHECK_FOR_INTERRUPTS in there
for it to be actually useful? Since the backend version of select() receives any incoming signals
on Win32, CHECK_FOR_INTERRUPTS seems not to be needed in the loop,
at least in walreceiver. No? The patch doesn't put it in there, and
I was able to interrupt walreceiver executing libpqrcv_PQexec() when
I tested the patch on Win32. It will call the signal handler, yes. Normally, the signal handler
just sets a flag somewhere, which needs to be checked with
CHECK_FOR_INTERRUPTS. From how I read the walreceiver.c code (which I'm not that familiar
with), the signal handlers call ProcessWalRcvInterrupts() which in
turn has CHECK_FOR_INTERRUPTS in it, and this is where it ends up
being called.
Yes. While establishing replication connection (i.e., executing
walrcv_connect function), the SIGTERM signal handler directly calls
ProcessWalRcvInterrupts() which does CHECK_FOR_INTERRUPTS() and
elog(FATAL).
The patch also seems confused about whether it's intending to be a
general-purpose solution or not. You can maybe take some shortcuts
if it's only going to be for walreceiver, but if you're going to put
it in dblink it is *not* acceptable to take shortcuts like not
processing errors completely. The patch still takes some shortcuts since we decided to postpone
the fix for dblink to 9.1 or later. Given those shortcuts, can't we simplify it even further like
attached?
No, we need to repeat PQgetResult() at least two times. The first call
of it reads the RowDescription, DataRow and CommandComplete messages
and switches the state to PGASYNC_READY. The second one reads the
ReadyForQuery message and switches the state to PGASYNC_IDLE. So if we
don't repeat it, libpqrcv_PQexec() would end in a half-finished state.
(If nothing else, your code did PQclear() on an
uninitialized pointer - must've been pure luck that it worked)
PQclear(NULL) might be called in my patch, but this is not a problem
since PQclear() does nothing if the specified PGresult argument is NULL.
Looking at the call-sites, there are bugs now - if PQexec() returns
NULL, we don't deal with it. It also doesn't always free the result
properly. I've added checks for that.
As Tom pointed out in another post, we don't need to treat the
"result is NULL" case as special. OTOH, as you pointed out, I
forgot calling PQclear() when the second call of libpqrcv_PQexec()
in libpqrcv_connect() fails. I added it to the patch. Thanks!
Finally, I've updated some of the comments.
Thanks a lot! I applied that improvements to the patch.
I attached the revised patch.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
libpqrcv_PQexec_v3.patchapplication/octet-stream; name=libpqrcv_PQexec_v3.patchDownload
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 54,59 **** static void libpqrcv_disconnect(void);
--- 54,60 ----
/* Prototypes for private functions */
static bool libpq_select(int timeout_ms);
+ static PGresult *libpqrcv_PQexec(const char *query);
/*
* Module load callback
***************
*** 97,103 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
* Get the system identifier and timeline ID as a DataRow message from the
* primary server.
*/
! res = PQexec(streamConn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
--- 98,104 ----
* Get the system identifier and timeline ID as a DataRow message from the
* primary server.
*/
! res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
***************
*** 149,159 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
/* Start streaming from the point requested by startup process */
snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
startpoint.xlogid, startpoint.xrecoff);
! res = PQexec(streamConn, cmd);
if (PQresultStatus(res) != PGRES_COPY_OUT)
ereport(ERROR,
(errmsg("could not start WAL streaming: %s",
PQerrorMessage(streamConn))));
PQclear(res);
justconnected = true;
--- 150,163 ----
/* Start streaming from the point requested by startup process */
snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
startpoint.xlogid, startpoint.xrecoff);
! res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) != PGRES_COPY_OUT)
+ {
+ PQclear(res);
ereport(ERROR,
(errmsg("could not start WAL streaming: %s",
PQerrorMessage(streamConn))));
+ }
PQclear(res);
justconnected = true;
***************
*** 225,230 **** libpq_select(int timeout_ms)
--- 229,311 ----
}
/*
+ * Send a query and wait for the results by using the asynchronous libpq
+ * functions and the backend version of select().
+ *
+ * We must not use the regular blocking libpq functions like PQexec()
+ * since they are uninterruptible by signals on some platforms, such as
+ * Windows.
+ *
+ * We must also not use vanilla select() here since it cannot handle the
+ * signal emulation layer on Windows.
+ *
+ * The function is modeled on PQexec() in libpq, but only implements
+ * those parts that are in use in the walreceiver.
+ *
+ * Queries are always executed on the connection in streamConn.
+ */
+ static PGresult *
+ libpqrcv_PQexec(const char *query)
+ {
+ PGresult *res = NULL;
+
+ /*
+ * PQexec() silently discards any prior query results on the
+ * connection. This is not required for walreceiver since it's
+ * expected that walsender won't generate any such junk results.
+ */
+
+ /*
+ * Submit a query. Since we don't use non-blocking mode, this also
+ * can block. But its risk is relatively small, so we ignore that
+ * for now.
+ */
+ if (!PQsendQuery(streamConn, query))
+ return NULL;
+
+ for (;;)
+ {
+ PGresult *next;
+
+ /*
+ * Receive data until PQgetResult is ready to get the result
+ * without blocking.
+ */
+ while (PQisBusy(streamConn))
+ {
+ /*
+ * We don't need to break down the sleep into smaller increments,
+ * and check for interrupts after each nap, since we can just
+ * elog(FATAL) within SIGTERM signal handler if the signal
+ * arrives in the middle of establishment of replication connection.
+ */
+ if (!libpq_select(-1))
+ continue; /* interrupted */
+ if (PQconsumeInput(streamConn) == 0)
+ return NULL; /* trouble */
+ }
+
+ /*
+ * Don't emulate the PQexec()'s behavior of returning the last
+ * result when there are many, since walreceiver never sends a
+ * query returning multiple results.
+ */
+ if ((next = PQgetResult(streamConn)) == NULL)
+ break; /* query is complete */
+ if (PQresultStatus(next) == PGRES_FATAL_ERROR)
+ return next;
+ PQclear(res);
+ res = next;
+ if (PQresultStatus(res) == PGRES_COPY_IN ||
+ PQresultStatus(res) == PGRES_COPY_OUT ||
+ PQstatus(streamConn) == CONNECTION_BAD)
+ break;
+ }
+
+ return res;
+ }
+
+ /*
* Disconnect connection to primary, if any.
*/
static void
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 86,93 **** static void DisableWalRcvImmediateExit(void);
* We can't just exit(1) within SIGTERM signal handler, because the signal
* might arrive in the middle of some critical operation, like while we're
* holding a spinlock. We also can't just set a flag in signal handler and
! * check it in the main loop, because we perform some blocking libpq
! * operations like PQexec(), which can take a long time to finish.
*
* We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
* safe for the signal handler to elog(FATAL) immediately. Otherwise it just
--- 86,93 ----
* We can't just exit(1) within SIGTERM signal handler, because the signal
* might arrive in the middle of some critical operation, like while we're
* holding a spinlock. We also can't just set a flag in signal handler and
! * check it in the main loop, because we perform some blocking operations
! * like libpqrcv_PQexec(), which can take a long time to finish.
*
* We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
* safe for the signal handler to elog(FATAL) immediately. Otherwise it just
On Thu, Apr 15, 2010 at 4:17 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
Magnus Hagander <magnus@hagander.net> writes:
Looking at the call-sites, there are bugs now - if PQexec() returns
NULL, we don't deal with it. It also doesn't always free the result
properly. I've added checks for that.
I think you're just adding useless complexity there. PQresultStatus
defends itself just fine against a NULL input, and most other libpq
functions likewise.
Yeah, I realized that after posting it. I was still stuck in ancient
times when at least some of those functions couldn't be called with
NULL pointers, so I just put that in there by default :-)
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On Thu, Apr 15, 2010 at 5:13 AM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Wed, Apr 14, 2010 at 11:15 PM, Magnus Hagander <magnus@hagander.net> wrote:
The patch also seems confused about whether it's intending to be a
general-purpose solution or not. You can maybe take some shortcuts
if it's only going to be for walreceiver, but if you're going to put
it in dblink it is *not* acceptable to take shortcuts like not
processing errors completely.
The patch still takes some shortcuts since we decided to postpone
the fix for dblink to 9.1 or later.
Given those shortcuts, can't we simplify it even further like
attached?
No, we need to repeat PQgetResult() at least two times. The first call
of it reads the RowDescription, DataRow and CommandComplete messages
and switches the state to PGASYNC_READY. The second one reads the
ReadyForQuery message and switches the state to PGASYNC_IDLE. So if we
don't repeat it, libpqrcv_PQexec() would end in a half-finished state.
Ah, ok. That's what I get for not testing it :-)
I still think that could be implemented in a much clearer way though.
Just calling PQgetResult() twice, and checking the return values
sequentially would be much easier to read, imho. Looking through that
set of "break" statements at the end of the loop is just confusing. If
nothing else, it needs more comments.
But maybe I'm just bikeshedding ;)
(If nothing else, your code did PQclear() on an
uninitialized pointer - must've been pure luck that it worked)
PQclear(NULL) might be called in my patch, but this is not a problem
since PQclear() does nothing if the specified PGresult argument is NULL.
Ah, I missed that you initialized it to NULL.
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
On Wed, Apr 14, 2010 at 11:13 PM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Wed, Apr 14, 2010 at 11:15 PM, Magnus Hagander <magnus@hagander.net> wrote:
http://archives.postgresql.org/pgsql-hackers/2010-04/msg00077.php
As for the code itself, don't we need a CHECK_FOR_INTERRUPTS in there
for it to be actually useful?
Since the backend version of select() receives any incoming signals
on Win32, CHECK_FOR_INTERRUPTS seems not to be needed in the loop,
at least in walreceiver. No? The patch doesn't put it in there, and
I was able to interrupt walreceiver executing libpqrcv_PQexec() when
I tested the patch on Win32.
It will call the signal handler, yes. Normally, the signal handler
just sets a flag somewhere, which needs to be checked with
CHECK_FOR_INTERRUPTS.
From how I read the walreceiver.c code (which I'm not that familiar
with), the signal handlers call ProcessWalRcvInterrupts() which in
turn has CHECK_FOR_INTERRUPTS in it, and this is where it ends up
being called.
Yes. While establishing replication connection (i.e., executing
walrcv_connect function), the SIGTERM signal handler directly calls
ProcessWalRcvInterrupts() which does CHECK_FOR_INTERRUPTS() and
elog(FATAL).
The patch also seems confused about whether it's intending to be a
general-purpose solution or not. You can maybe take some shortcuts
if it's only going to be for walreceiver, but if you're going to put
it in dblink it is *not* acceptable to take shortcuts like not
processing errors completely.
The patch still takes some shortcuts since we decided to postpone
the fix for dblink to 9.1 or later.
Given those shortcuts, can't we simplify it even further like
attached?
No, we need to repeat PQgetResult() at least two times. The first call
of it reads the RowDescription, DataRow and CommandComplete messages
and switches the state to PGASYNC_READY. The second one reads the
ReadyForQuery message and switches the state to PGASYNC_IDLE. So if we
don't repeat it, libpqrcv_PQexec() would end in a half-finished state.
(If nothing else, your code did PQclear() on an
uninitialized pointer - must've been pure luck that it worked)
PQclear(NULL) might be called in my patch, but this is not a problem
since PQclear() does nothing if the specified PGresult argument is NULL.
Looking at the call-sites, there are bugs now - if PQexec() returns
NULL, we don't deal with it. It also doesn't always free the result
properly. I've added checks for that.
As Tom pointed out in another post, we don't need to treat the
"result is NULL" case as special. OTOH, as you pointed out, I
forgot calling PQclear() when the second call of libpqrcv_PQexec()
in libpqrcv_connect() fails. I added it to the patch. Thanks!
Finally, I've updated some of the comments.
Thanks a lot! I applied those improvements to the patch.
I attached the revised patch.
I have to admit to finding this confusing. According to the comments:
+ /*
+ * Don't emulate the PQexec()'s behavior of returning the last
+ * result when there are many, since walreceiver never sends a
+ * query returning multiple results.
+ */
...but it looks like the code actually is implementing some sort of
loop-that-returns-the-last result.
...Robert
On Thu, Apr 15, 2010 at 11:26 PM, Robert Haas <robertmhaas@gmail.com> wrote:
I have to admit to finding this confusing. According to the comments:
+ /*
+ * Don't emulate the PQexec()'s behavior of returning the last
+ * result when there are many, since walreceiver never sends a
+ * query returning multiple results.
+ */
...but it looks like the code actually is implementing some sort of
loop-that-returns-the-last result.
Yeah, it's not a very accurate description. And I found another problem:
libpqrcv_PQexec() ends as soon as an error result arrives even if its
state has not been PGASYNC_IDLE yet.
So I changed libpqrcv_PQexec() so that it emulates the PQexec()'s behavior
except the concatenation of error messages. How about the attached patch?
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
libpqrcv_PQexec_v4.patchapplication/octet-stream; name=libpqrcv_PQexec_v4.patchDownload
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 54,59 **** static void libpqrcv_disconnect(void);
--- 54,60 ----
/* Prototypes for private functions */
static bool libpq_select(int timeout_ms);
+ static PGresult *libpqrcv_PQexec(const char *query);
/*
* Module load callback
***************
*** 97,103 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
* Get the system identifier and timeline ID as a DataRow message from the
* primary server.
*/
! res = PQexec(streamConn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
--- 98,104 ----
* Get the system identifier and timeline ID as a DataRow message from the
* primary server.
*/
! res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
***************
*** 149,159 **** libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
/* Start streaming from the point requested by startup process */
snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
startpoint.xlogid, startpoint.xrecoff);
! res = PQexec(streamConn, cmd);
if (PQresultStatus(res) != PGRES_COPY_OUT)
ereport(ERROR,
(errmsg("could not start WAL streaming: %s",
PQerrorMessage(streamConn))));
PQclear(res);
justconnected = true;
--- 150,163 ----
/* Start streaming from the point requested by startup process */
snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
startpoint.xlogid, startpoint.xrecoff);
! res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) != PGRES_COPY_OUT)
+ {
+ PQclear(res);
ereport(ERROR,
(errmsg("could not start WAL streaming: %s",
PQerrorMessage(streamConn))));
+ }
PQclear(res);
justconnected = true;
***************
*** 225,230 **** libpq_select(int timeout_ms)
--- 229,309 ----
}
/*
+ * Send a query and wait for the results by using the asynchronous libpq
+ * functions and the backend version of select().
+ *
+ * We must not use the regular blocking libpq functions like PQexec()
+ * since they are uninterruptible by signals on some platforms, such as
+ * Windows.
+ *
+ * We must also not use vanilla select() here since it cannot handle the
+ * signal emulation layer on Windows.
+ *
+ * The function is modeled on PQexec() in libpq, but only implements
+ * those parts that are in use in the walreceiver.
+ *
+ * Queries are always executed on the connection in streamConn.
+ */
+ static PGresult *
+ libpqrcv_PQexec(const char *query)
+ {
+ PGresult *result = NULL;
+ PGresult *lastResult = NULL;
+
+ /*
+ * PQexec() silently discards any prior query results on the
+ * connection. This is not required for walreceiver since it's
+ * expected that walsender won't generate any such junk results.
+ */
+
+ /*
+ * Submit a query. Since we don't use non-blocking mode, this also
+ * can block. But its risk is relatively small, so we ignore that
+ * for now.
+ */
+ if (!PQsendQuery(streamConn, query))
+ return NULL;
+
+ for (;;)
+ {
+ /*
+ * Receive data until PQgetResult is ready to get the result
+ * without blocking.
+ */
+ while (PQisBusy(streamConn))
+ {
+ /*
+ * We don't need to break down the sleep into smaller increments,
+ * and check for interrupts after each nap, since we can just
+ * elog(FATAL) within SIGTERM signal handler if the signal
+ * arrives in the middle of establishment of replication connection.
+ */
+ if (!libpq_select(-1))
+ continue; /* interrupted */
+ if (PQconsumeInput(streamConn) == 0)
+ return NULL; /* trouble */
+ }
+
+ /*
+ * Emulate the PQexec()'s behavior of returning the last result
+ * when there are many. But we don't try to concatenate error
+ * results like PQexec() does since it's expected that walsender
+ * won't return multiple error results.
+ */
+ if ((result = PQgetResult(streamConn)) == NULL)
+ break; /* query is complete */
+ PQclear(lastResult);
+ lastResult = result;
+ if (PQresultStatus(result) == PGRES_COPY_IN ||
+ PQresultStatus(result) == PGRES_COPY_OUT ||
+ PQstatus(streamConn) == CONNECTION_BAD)
+ break;
+ }
+
+ return lastResult;
+ }
+
+ /*
* Disconnect connection to primary, if any.
*/
static void
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 86,93 **** static void DisableWalRcvImmediateExit(void);
* We can't just exit(1) within SIGTERM signal handler, because the signal
* might arrive in the middle of some critical operation, like while we're
* holding a spinlock. We also can't just set a flag in signal handler and
! * check it in the main loop, because we perform some blocking libpq
! * operations like PQexec(), which can take a long time to finish.
*
* We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
* safe for the signal handler to elog(FATAL) immediately. Otherwise it just
--- 86,93 ----
* We can't just exit(1) within SIGTERM signal handler, because the signal
* might arrive in the middle of some critical operation, like while we're
* holding a spinlock. We also can't just set a flag in signal handler and
! * check it in the main loop, because we perform some blocking operations
! * like libpqrcv_PQexec(), which can take a long time to finish.
*
* We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
* safe for the signal handler to elog(FATAL) immediately. Otherwise it just
On Fri, Apr 16, 2010 at 3:03 AM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Thu, Apr 15, 2010 at 11:26 PM, Robert Haas <robertmhaas@gmail.com> wrote:
I have to admit to finding this confusing. According to the comments:
+ /*
+ * Don't emulate the PQexec()'s behavior of returning the last
+ * result when there are many, since walreceiver never sends a
+ * query returning multiple results.
+ */
...but it looks like the code actually is implementing some sort of
loop-that-returns-the-last result.
Yeah, it's not a very accurate description. And I found another problem:
libpqrcv_PQexec() ends as soon as an error result arrives even if its
state has not been PGASYNC_IDLE yet.
So I changed libpqrcv_PQexec() so that it emulates the PQexec()'s behavior
except the concatenation of error messages. How about the attached patch?
Well, the comment definitely makes more sense in this version. I
can't speak to the behavior.
...Robert
On Fri, Apr 16, 2010 at 4:03 PM, Fujii Masao <masao.fujii@gmail.com> wrote:
On Thu, Apr 15, 2010 at 11:26 PM, Robert Haas <robertmhaas@gmail.com> wrote:
I have to admit to finding this confusing. According to the comments:
+ /*
+ * Don't emulate the PQexec()'s behavior of returning the last
+ * result when there are many, since walreceiver never sends a
+ * query returning multiple results.
+ */
...but it looks like the code actually is implementing some sort of
loop-that-returns-the-last result.
Yeah, it's not a very accurate description. And I found another problem:
libpqrcv_PQexec() ends as soon as an error result arrives even if its
state has not been PGASYNC_IDLE yet.
So I changed libpqrcv_PQexec() so that it emulates the PQexec()'s behavior
except the concatenation of error messages. How about the attached patch?
BTW, even if you apply the patch, you would not be able to interrupt the
walreceiver by using smart shutdown because of the bug reported in another
thread. Please be careful about that when you test the patch.
http://archives.postgresql.org/pgsql-hackers/2010-04/msg00592.php
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
On Fri, Apr 16, 2010 at 09:03, Fujii Masao <masao.fujii@gmail.com> wrote:
On Thu, Apr 15, 2010 at 11:26 PM, Robert Haas <robertmhaas@gmail.com> wrote:
I have to admit to finding this confusing. According to the comments:
+ /*
+ * Don't emulate the PQexec()'s behavior of returning the last
+ * result when there are many, since walreceiver never sends a
+ * query returning multiple results.
+ */
...but it looks like the code actually is implementing some sort of
loop-that-returns-the-last result.
Yeah, it's not a very accurate description. And I found another problem:
libpqrcv_PQexec() ends as soon as an error result arrives even if its
state has not been PGASYNC_IDLE yet.
So I changed libpqrcv_PQexec() so that it emulates the PQexec()'s behavior
except the concatenation of error messages. How about the attached patch?
Applied with only minor stylistic changes. Thanks!
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/