pg_basebackup WAL streamer shutdown is bogus - leading to slow tests

Started by Andres Freund almost 4 years ago, 15 messages
#1 Andres Freund
andres@anarazel.de

Hi,

I was wondering in [1] what we could do about the slowest tests on
windows.

On 2021-12-31 11:25:28 -0800, Andres Freund wrote:

Picking a random successful cfbot run [1] I see the following tap tests taking
more than 20 seconds:

67188 ms pg_basebackup t/010_pg_basebackup.pl
59710 ms recovery t/001_stream_rep.pl

Comparing these times to measurements taken on my normal linux workstation,
something seemed just *very* off, even with a slow CI instance and windows in
the mix.

A bunch of printf debugging later, I realized the problem is that several of
the pg_basebackups in tests take a *long* time. E.g. for t/001_stream_rep.pl
the backups from the standby each take just over 10s. That's awfully
specific...

# Taking pg_basebackup my_backup from node "standby_1"
# Running: pg_basebackup -D C:/dev/postgres/./tmp_check/t_001_stream_rep_standby_1_data/backup/my_backup -h C:/Users/myadmin/AppData/Local/Temp/yba26PBYX1 -p 59181 --checkpoint fast --no-sync --label my_backup -v
# ran in 10.145s
# Backup finished

This reproducibly happens and it's *not* related to the socket shutdown()
changes we've been debugging lately - even after a revert the problem
persists.

Because our logging for basebackups is quite weak, both for server and client
side, I needed to add a fair bit more debugging to figure it out:

pg_basebackup: wait to finish at 0.492
pg_basebackup: waiting for background process to finish streaming ...
pg_basebackup: stream poll timeout 10.112

The problem is that there's just no implemented way to timely shutdown the WAL
streaming thread in pg_basebackup. The code in pg_basebackup.c says:

    if (verbose)
        pg_log_info("waiting for background process to finish streaming ...");
    ...
    /*
     * On Windows, since we are in the same process, we can just store the
     * value directly in the variable, and then set the flag that says
     * it's there.
     */
    ...
    xlogendptr = ((uint64) hi) << 32 | lo;
    InterlockedIncrement(&has_xlogendptr);

But just setting a variable doesn't do much if the thread is blocked in
HandleCopyStream()->CopyStreamPoll()->select().
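
The xlogendptr assignment quoted above combines the two 32-bit halves of an LSN printed as "hi/lo" into one 64-bit value. A minimal, portable sketch of that arithmetic follows; parse_lsn is a hypothetical helper name used only for illustration, and uint64_t stands in for PostgreSQL's uint64 typedef.

```c
#include <stdint.h>
#include <stdio.h>

/*
 * Parse an LSN written as "hi/lo" (two hexadecimal 32-bit halves) into a
 * single 64-bit value.  Returns 1 on success, 0 if the text does not match.
 * parse_lsn is a hypothetical name, not a PostgreSQL function.
 */
int
parse_lsn(const char *text, uint64_t *result)
{
	unsigned int hi;
	unsigned int lo;

	if (sscanf(text, "%X/%X", &hi, &lo) != 2)
		return 0;

	/* widen hi to 64 bits before shifting, otherwise the high half is lost */
	*result = ((uint64_t) hi) << 32 | lo;
	return 1;
}
```

The computation itself is cheap; the point made in the message is that nothing wakes the streaming thread to look at the result.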

The only reason we ever succeed shutting down, without more WAL coming in, is
that pg_basebackup defaults to sending a status message every 10 seconds. At
which point the thread sees has_xlogendptr = true, and shuts down.
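
As a rough illustration of why the delay tracks the status interval, the following sketch models a thread that only re-checks the stop flag when its poll times out. It is a simplified model, not the pg_basebackup code: it assumes no WAL arrives, that polling starts at time zero, and that the timeout is a fixed interval (the real sleep time comes from CalculateCopyStreamSleeptime()). The function name stop_noticed_at_ms is hypothetical.

```c
/*
 * Time (in ms since polling started) at which a stop request made at
 * request_ms is noticed, if the thread only wakes up when its poll times
 * out every interval_ms.  Simplified model: no incoming data, fixed
 * interval, hypothetical function name.
 */
long
stop_noticed_at_ms(long request_ms, long interval_ms)
{
	/* number of timeouts needed to reach or pass the request (ceiling) */
	long		wakeups = (request_ms + interval_ms - 1) / interval_ms;

	if (wakeups == 0)
		wakeups = 1;			/* the first check happens after one interval */

	return wakeups * interval_ms;
}
```

With the numbers from the debug output above, a request at 492 ms and a 10000 ms interval give 10000 ms, roughly the observed 10.1 s. With --status-interval=1 the same request would be noticed after about one second, which is why that workaround helps without fixing the underlying problem.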

A test specific workaround would be to just add --status-interval=1 to
Cluster.pm::backup(). But that seems very unsatisfying.

I don't immediately see a solution for this, other than to add
StreamCtl->stop_event (mirroring ->stop_socket) and then convert
CopyStreamPoll() to use WaitForMultipleObjects(). Microsoft's select()
doesn't support pipes and there's no socketpair().

Any more straightforward ideas?

From a cursory look at history, it used to be that pg_basebackup had this
behaviour on all platforms, but it got fixed for other platforms in
7834d20b57a by Tom (assuming the problem wasn't present there).
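
For comparison, the approach available on platforms where select() accepts pipes can be sketched as below: the poll watches a second descriptor, and writing a byte to it wakes the waiter at once. This is a minimal POSIX sketch under stated assumptions, not the code from 7834d20b57a. The name poll_with_stop is hypothetical, negative (infinite) timeouts are not handled, and every select() failure is reported as -1.

```c
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

/*
 * Wait until data_fd is readable, stop_fd is readable, or timeout_ms
 * elapses.  Returns 1 for input on data_fd, 0 for a timeout or input on
 * stop_fd, and -1 on error.  Pass stop_fd < 0 to watch data_fd only.
 */
int
poll_with_stop(int data_fd, int stop_fd, long timeout_ms)
{
	fd_set		input_mask;
	struct timeval timeout;
	int			maxfd = data_fd;
	int			ret;

	FD_ZERO(&input_mask);
	FD_SET(data_fd, &input_mask);
	if (stop_fd >= 0)
	{
		FD_SET(stop_fd, &input_mask);
		if (stop_fd > maxfd)
			maxfd = stop_fd;
	}

	timeout.tv_sec = timeout_ms / 1000;
	timeout.tv_usec = (timeout_ms % 1000) * 1000;

	ret = select(maxfd + 1, &input_mask, NULL, NULL, &timeout);
	if (ret < 0)
		return -1;
	if (ret > 0 && FD_ISSET(data_fd, &input_mask))
		return 1;				/* got input on the data descriptor */

	return 0;					/* timeout, or input on stop_fd */
}
```

After the requesting side writes one byte to the write end of the stop pipe, a call with a 10 s timeout returns 0 immediately, and the caller can then check its stop condition. Windows needs a different mechanism because its select() only works on sockets.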

Greetings,

Andres Freund

[1]: /messages/by-id/20211231192528.wirwj4qaaw3ted5g@alap3.anarazel.de

#2 Magnus Hagander
magnus@hagander.net
In reply to: Andres Freund (#1)
Re: pg_basebackup WAL streamer shutdown is bogus - leading to slow tests

On Sun, Jan 16, 2022 at 10:22 AM Andres Freund <andres@anarazel.de> wrote:

Hi,

I was wondering in [1] what we could do about the slowest tests on
windows.

On 2021-12-31 11:25:28 -0800, Andres Freund wrote:

Picking a random successful cfbot run [1] I see the following tap tests taking
more than 20 seconds:

67188 ms pg_basebackup t/010_pg_basebackup.pl
59710 ms recovery t/001_stream_rep.pl

Comparing these times to measurements taken on my normal linux workstation,
something seemed just *very* off, even with a slow CI instance and windows in
the mix.

A bunch of printf debugging later, I realized the problem is that several of
the pg_basebackups in tests take a *long* time. E.g. for t/001_stream_rep.pl
the backups from the standby each take just over 10s. That's awfully
specific...

# Taking pg_basebackup my_backup from node "standby_1"
# Running: pg_basebackup -D C:/dev/postgres/./tmp_check/t_001_stream_rep_standby_1_data/backup/my_backup -h C:/Users/myadmin/AppData/Local/Temp/yba26PBYX1 -p 59181 --checkpoint fast --no-sync --label my_backup -v
# ran in 10.145s
# Backup finished

This reproducibly happens and it's *not* related to the socket shutdown()
changes we've been debugging lately - even after a revert the problem
persists.

Because our logging for basebackups is quite weak, both for server and client
side, I needed to add a fair bit more debugging to figure it out:

pg_basebackup: wait to finish at 0.492
pg_basebackup: waiting for background process to finish streaming ...
pg_basebackup: stream poll timeout 10.112

The problem is that there's just no implemented way to timely shutdown the WAL
streaming thread in pg_basebackup. The code in pg_basebackup.c says:

    if (verbose)
        pg_log_info("waiting for background process to finish streaming ...");
    ...
    /*
     * On Windows, since we are in the same process, we can just store the
     * value directly in the variable, and then set the flag that says
     * it's there.
     */
    ...
    xlogendptr = ((uint64) hi) << 32 | lo;
    InterlockedIncrement(&has_xlogendptr);

But just setting a variable doesn't do much if the thread is in
HandleCopyStream()->CopyStreamPoll()->select()

The only reason we ever succeed shutting down, without more WAL coming in, is
that pg_basebackup defaults to sending a status message every 10 seconds. At
which point the thread sees has_xlogendptr = true, and shuts down.

A test specific workaround would be to just add --status-interval=1 to
Cluster.pm::backup(). But that seems very unsatisfying.

I don't immediately see a solution for this, other than to add
StreamCtl->stop_event (mirroring ->stop_socket) and then convert
CopyStreamPoll() to use WaitForMultipleObjects(). Microsoft's select()
doesn't support pipes and there's no socketpair().

Any more straightforward ideas?

From a cursory look at history, it used to be that pg_basebackup had this
behaviour on all platforms, but it got fixed for other platforms in
7834d20b57a by Tom (assuming the problem wasn't present there).

Ugh, yeah that sounds like a correct analysis to me, and ugh, yeah
that's not very nice.

And yes, I think we have to create an event, and then use
WSAEventSelect() + WaitForSingleObjectEx(). Should be enough to just
use one event I think, and then the timeout -- but it might be more
readable to have a separate event for the socket and the stop? But we
can have just one event that's both used to stop and then use
WSAEventSelect() to associate it with the socket as well as needed.

(And yes, I agree that it's a lot better to fix it properly than to
just reduce the timeout)

--
Magnus Hagander
Me: https://www.hagander.net/
Work: https://www.redpill-linpro.com/

#3 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Andres Freund (#1)
Re: pg_basebackup WAL streamer shutdown is bogus - leading to slow tests

Andres Freund <andres@anarazel.de> writes:

I don't immediately see a solution for this, other than to add
StreamCtl->stop_event (mirroring ->stop_socket) and then convert
CopyStreamPoll() to use WaitForMultipleObjects(). Microsoft's select()
doesn't support pipes and there's no socketpair().
Any more straightforward ideas?
From a cursory look at history, it used to be that pg_basebackup had this
behaviour on all platforms, but it got fixed for other platforms in
7834d20b57a by Tom (assuming the problem wasn't present there).

Hmm --- I see that I thought Windows was unaffected, but I didn't
consider this angle.

Can we send the child process a signal to kick it off its wait?

regards, tom lane

#4 Magnus Hagander
magnus@hagander.net
In reply to: Tom Lane (#3)
Re: pg_basebackup WAL streamer shutdown is bogus - leading to slow tests

On Sun, Jan 16, 2022 at 5:34 PM Tom Lane <tgl@sss.pgh.pa.us> wrote:

Andres Freund <andres@anarazel.de> writes:

I don't immediately see a solution for this, other than to add
StreamCtl->stop_event (mirroring ->stop_socket) and then convert
CopyStreamPoll() to use WaitForMultipleObjects(). Microsoft's select()
doesn't support pipes and there's no socketpair().
Any more straightforward ideas?
From a cursory look at history, it used to be that pg_basebackup had this
behaviour on all platforms, but it got fixed for other platforms in
7834d20b57a by Tom (assuming the problem wasn't present there).

Hmm --- I see that I thought Windows was unaffected, but I didn't
consider this angle.

Can we send the child process a signal to kick it off its wait?

No. (1) on Windows it's not a child process, it's a thread. And (2)
Windows doesn't have signals. We emulate those *in the backend* for
win32, but this problem is in the frontend where that emulation layer
doesn't exist.

--
Magnus Hagander
Me: https://www.hagander.net/
Work: https://www.redpill-linpro.com/

#5 Magnus Hagander
magnus@hagander.net
In reply to: Magnus Hagander (#4)
Re: pg_basebackup WAL streamer shutdown is bogus - leading to slow tests

On Sun, Jan 16, 2022 at 5:36 PM Magnus Hagander <magnus@hagander.net> wrote:

On Sun, Jan 16, 2022 at 5:34 PM Tom Lane <tgl@sss.pgh.pa.us> wrote:

Andres Freund <andres@anarazel.de> writes:

I don't immediately see a solution for this, other than to add
StreamCtl->stop_event (mirroring ->stop_socket) and then convert
CopyStreamPoll() to use WaitForMultipleObjects(). Microsoft's select()
doesn't support pipes and there's no socketpair().
Any more straightforward ideas?
From a cursory look at history, it used to be that pg_basebackup had this
behaviour on all platforms, but it got fixed for other platforms in
7834d20b57a by Tom (assuming the problem wasn't present there).

Hmm --- I see that I thought Windows was unaffected, but I didn't
consider this angle.

Can we send the child process a signal to kick it off its wait?

No. (1) on Windows it's not a child process, it's a thread. And (2)
Windows doesn't have signals. We emulate those *in the backend* for
win32, but this problem is in the frontend where that emulation layer
doesn't exist.

Actually, just after sending that...

What we could do is do a WSACancelBlockingCall() which will cancel the
select() thereby making us do the check. However, per the docs
(https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-wsacancelblockingcall)
this function is no longer exported in Winsock 2, so this does not
seem to be the right way forward. There is no replacement function for
it -- the suggestion is basically "don't do that, use multithreading
instead" which I think brings us back to the original suggestion of
WSAEventSelect().

--
Magnus Hagander
Me: https://www.hagander.net/
Work: https://www.redpill-linpro.com/

#6 Andres Freund
andres@anarazel.de
In reply to: Magnus Hagander (#5)
Re: pg_basebackup WAL streamer shutdown is bogus - leading to slow tests

Hi,

On 2022-01-16 17:39:11 +0100, Magnus Hagander wrote:

On Sun, Jan 16, 2022 at 5:36 PM Magnus Hagander <magnus@hagander.net> wrote:

On Sun, Jan 16, 2022 at 5:34 PM Tom Lane <tgl@sss.pgh.pa.us> wrote:

Andres Freund <andres@anarazel.de> writes:

I don't immediately see a solution for this, other than to add
StreamCtl->stop_event (mirroring ->stop_socket) and then convert
CopyStreamPoll() to use WaitForMultipleObjects(). Microsoft's select()
doesn't support pipes and there's no socketpair().
Any more straightforward ideas?
From a cursory look at history, it used to be that pg_basebackup had this
behaviour on all platforms, but it got fixed for other platforms in
7834d20b57a by Tom (assuming the problem wasn't present there).

Hmm --- I see that I thought Windows was unaffected, but I didn't
consider this angle.

Can we send the child process a signal to kick it off its wait?

No. (1) on Windows it's not a child process, it's a thread. And (2)
Windows doesn't have signals. We emulate those *in the backend* for
win32, but this problem is in the frontend where that emulation layer
doesn't exist.

[...] which I think brings us back to the original suggestion of
WSAEventSelect().

I hacked that up last night. And a fix or two later, it seems to be
working. What I'd missed at first is that the event needs to be reset in
reached_end_position(), otherwise we'll busy loop.

I wonder if using a short-lived event handle would have dangers of missing
FD_CLOSE here as well? It'd probably be worth avoiding the risk by creating
the event just once.

I just wasn't immediately sure where to stash it. Probably just by adding a
field in StreamCtl, that ReceiveXlogStream() then sets? So far it's constant
once passed to ReceiveXlogStream(), but I don't really see a reason why it'd
need to stay that way?

Greetings,

Andres Freund

#7 Andres Freund
andres@anarazel.de
In reply to: Andres Freund (#6)
1 attachment(s)
Re: pg_basebackup WAL streamer shutdown is bogus - leading to slow tests

Hi,

On 2022-01-16 15:28:00 -0800, Andres Freund wrote:

I hacked that up last night. And a fix or two later, it seems to be
working. What I'd missed at first is that the event needs to be reset in
reached_end_position(), otherwise we'll busy loop.

I wonder if using a short-lived event handle would have dangers of missing
FD_CLOSE here as well? It'd probably be worth avoiding the risk by creating
the event just once.

I just wasn't immediately sure where to stash it. Probably just by adding a
field in StreamCtl, that ReceiveXlogStream() then sets? So far it's constant
once passed to ReceiveXlogStream(), but I don't really see a reason why it'd
need to stay that way?

Oops, attached the patch this time.

Greetings,

Andres Freund

Attachments:

v2-0001-Avoid-slow-shutdown-of-pg_basebackup-windows-edit.patch (text/x-diff; charset=us-ascii)
From 5f7f9b49d2e443b9299bc273d06696b5bf6cbaa4 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sun, 16 Jan 2022 01:58:24 -0800
Subject: [PATCH v2] Avoid slow shutdown of pg_basebackup, windows edition.

See also 7834d20b57a.
---
 src/bin/pg_basebackup/pg_basebackup.c | 24 ++++++-
 src/bin/pg_basebackup/pg_receivewal.c |  4 ++
 src/bin/pg_basebackup/receivelog.c    | 92 ++++++++++++++++++++++++---
 src/bin/pg_basebackup/receivelog.h    |  4 ++
 4 files changed, 112 insertions(+), 12 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index aa43fc09241..51f48ce587f 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -146,6 +146,8 @@ static const char *progress_filename;
 /* Pipe to communicate with background wal receiver process */
 #ifndef WIN32
 static int	bgpipe[2] = {-1, -1};
+#else
+HANDLE *bgevent = NULL;
 #endif
 
 /* Handle to child process */
@@ -473,7 +475,14 @@ reached_end_position(XLogRecPtr segendpos, uint32 timeline,
 	/*
 	 * At this point we have an end pointer, so compare it to the current
 	 * position to figure out if it's time to stop.
+	 *
+	 * On windows we need to reset the event used to wake up the streaming
+	 * thread, otherwise CopyStreamPoll() will start to immediately return.
 	 */
+#ifdef WIN32
+	ResetEvent(bgevent);
+#endif
+
 	if (segendpos >= xlogendptr)
 		return true;
 
@@ -508,7 +517,7 @@ LogStreamerMain(logstreamer_param *param)
 #ifndef WIN32
 	stream.stop_socket = bgpipe[0];
 #else
-	stream.stop_socket = PGINVALID_SOCKET;
+	stream.stop_event = bgevent;
 #endif
 	stream.standby_message_timeout = standby_message_timeout;
 	stream.synchronous = false;
@@ -590,6 +599,14 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
 		pg_log_error("could not create pipe for background process: %m");
 		exit(1);
 	}
+#else
+	bgevent = CreateEvent(NULL, TRUE, FALSE, NULL);
+	if (bgevent == NULL)
+	{
+		pg_log_error("could not create event for background thread: %lu",
+					 GetLastError());
+		exit(1);
+	}
 #endif
 
 	/* Get a second connection */
@@ -1635,7 +1652,9 @@ BaseBackup(void)
 		/*
 		 * On Windows, since we are in the same process, we can just store the
 		 * value directly in the variable, and then set the flag that says
-		 * it's there.
+		 * it's there. To interrupt the thread while it's waiting for network
+		 * IO, we set an event (which the thread waits on in addition to the
+		 * socket).
 		 */
 		if (sscanf(xlogend, "%X/%X", &hi, &lo) != 2)
 		{
@@ -1645,6 +1664,7 @@ BaseBackup(void)
 		}
 		xlogendptr = ((uint64) hi) << 32 | lo;
 		InterlockedIncrement(&has_xlogendptr);
+		SetEvent(bgevent);
 
 		/* First wait for the thread to exit */
 		if (WaitForSingleObjectEx((HANDLE) bgchild_handle, INFINITE, FALSE) !=
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index ccb215c398c..d27bd85b7ce 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -618,7 +618,11 @@ StreamLog(void)
 					stream.timeline);
 
 	stream.stream_stop = stop_streaming;
+#ifndef WIN32
 	stream.stop_socket = PGINVALID_SOCKET;
+#else
+	stream.stop_event = NULL;
+#endif
 	stream.standby_message_timeout = standby_message_timeout;
 	stream.synchronous = synchronous;
 	stream.do_sync = do_sync;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index d39e4b11a1a..a64a1a8e8fe 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -37,8 +37,8 @@ static bool still_sending = true;	/* feedback still needs to be sent? */
 
 static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
 								  XLogRecPtr *stoppos);
-static int	CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
-static int	CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
+static int	CopyStreamPoll(PGconn *conn, long timeout_ms, StreamCtl *stream);
+static int	CopyStreamReceive(PGconn *conn, long timeout, StreamCtl *stream,
 							  char **buffer);
 static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
 								int len, XLogRecPtr blockpos, TimestampTz *last_status);
@@ -813,7 +813,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
 		sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
 												 last_status);
 
-		r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
+		r = CopyStreamReceive(conn, sleeptime, stream, &copybuf);
 		while (r != 0)
 		{
 			if (r == -1)
@@ -858,7 +858,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
 			 * Process the received data, and any subsequent data we can read
 			 * without blocking.
 			 */
-			r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
+			r = CopyStreamReceive(conn, 0, stream, &copybuf);
 		}
 	}
 
@@ -877,8 +877,9 @@ error:
  * or interrupted by signal or stop_socket input, and -1 on an error.
  */
 static int
-CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
+CopyStreamPoll(PGconn *conn, long timeout_ms, StreamCtl *stream)
 {
+#ifndef WIN32
 	int			ret;
 	fd_set		input_mask;
 	int			connsocket;
@@ -896,10 +897,10 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
 	FD_ZERO(&input_mask);
 	FD_SET(connsocket, &input_mask);
 	maxfd = connsocket;
-	if (stop_socket != PGINVALID_SOCKET)
+	if (stream->stop_socket != PGINVALID_SOCKET)
 	{
-		FD_SET(stop_socket, &input_mask);
-		maxfd = Max(maxfd, stop_socket);
+		FD_SET(stream->stop_socket, &input_mask);
+		maxfd = Max(maxfd, stream->stop_socket);
 	}
 
 	if (timeout_ms < 0)
@@ -924,6 +925,77 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
 		return 1;				/* Got input on connection socket */
 
 	return 0;					/* Got timeout or input on stop_socket */
+#else
+	int			ret;
+	int			rc;
+	HANDLE	   *network_event;
+	int			nevents = 0;
+	HANDLE		events[2];
+
+	network_event = WSACreateEvent();
+	if (network_event == WSA_INVALID_EVENT)
+	{
+		pg_log_error("failed to create event for socket: error code %d",
+					 WSAGetLastError());
+		exit(1);
+	}
+
+	if (WSAEventSelect(PQsocket(conn), network_event, FD_READ | FD_CLOSE) != 0)
+	{
+		pg_log_error("failed to set up event for socket: error code %d",
+					 WSAGetLastError());
+		exit(1);
+	}
+
+	events[0] = network_event;
+	nevents++;
+
+	if (stream->stop_event != NULL)
+	{
+		events[1] = stream->stop_event;
+		nevents++;
+	}
+
+	/* map timeout_ms to WaitForMultipleObjects expectations */
+	if (timeout_ms < 0)
+		timeout_ms = INFINITE;
+
+	rc = WaitForMultipleObjects(nevents, events, FALSE, timeout_ms);
+
+	if (rc == WAIT_FAILED)
+	{
+		pg_log_error("WaitForMultipleObjects() failed: error code %lu",
+					 GetLastError());
+		exit(1);
+	}
+	else if (rc == WAIT_TIMEOUT)
+	{
+		/* timeout exceeded */
+		ret = 0;
+	}
+	else if (rc == WAIT_OBJECT_0)
+	{
+		/* Got input on connection socket */;
+		ret = 1;
+	}
+	else if (rc == (WAIT_OBJECT_0 + 1))
+	{
+		Assert(stream->stop_event != NULL);
+		/* Got event on stop socket  */;
+		ret = 0;
+	}
+	else
+	{
+		pg_log_error("unexpected return from WaitForMultipleObjects(): %d", rc);
+		exit(1);
+	}
+
+	/* reset event association for libpq socket, clean up event */
+	WSAEventSelect(PQsocket(conn), NULL, 0);
+	WSACloseEvent(network_event);
+
+	return ret;
+#endif /* WIN32 */
 }
 
 /*
@@ -939,7 +1011,7 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
  * -1 on error. -2 if the server ended the COPY.
  */
 static int
-CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
+CopyStreamReceive(PGconn *conn, long timeout, StreamCtl *stream,
 				  char **buffer)
 {
 	char	   *copybuf = NULL;
@@ -960,7 +1032,7 @@ CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
 		 * the specified timeout, so that we can ping the server.  Also stop
 		 * waiting if input appears on stop_socket.
 		 */
-		ret = CopyStreamPoll(conn, timeout, stop_socket);
+		ret = CopyStreamPoll(conn, timeout, stream);
 		if (ret <= 0)
 			return ret;
 
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index 050d4bc69fd..2a643e71a74 100644
--- a/src/bin/pg_basebackup/receivelog.h
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -40,8 +40,12 @@ typedef struct StreamCtl
 
 	stream_stop_callback stream_stop;	/* Stop streaming when returns true */
 
+#ifndef WIN32
 	pgsocket	stop_socket;	/* if valid, watch for input on this socket
 								 * and check stream_stop() when there is any */
+#else
+	HANDLE	   *stop_event;
+#endif
 
 	WalWriteMethod *walmethod;	/* How to write the WAL */
 	char	   *partial_suffix; /* Suffix appended to partially received files */
-- 
2.34.0

#8 Magnus Hagander
magnus@hagander.net
In reply to: Andres Freund (#7)
Re: pg_basebackup WAL streamer shutdown is bogus - leading to slow tests

On Mon, Jan 17, 2022 at 12:31 AM Andres Freund <andres@anarazel.de> wrote:

Hi,

On 2022-01-16 15:28:00 -0800, Andres Freund wrote:

I hacked that up last night. And a fix or two later, it seems to be
working. What I'd missed at first is that the event needs to be reset in
reached_end_position(), otherwise we'll busy loop.

You can create the event with bManualReset set to False to avoid that,
no? With this usecase, I don't really see a reason not to do that
instead?
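
The difference between the two event types discussed here can be sketched with a small portable model. This is a toy simulation, not the Win32 API: ToyEvent, toy_set, toy_reset, toy_wait and immediate_wakeups are hypothetical names, and the only behaviour modelled is that a manual-reset event stays signaled until explicitly reset while an auto-reset event clears itself when a wait is satisfied.

```c
#include <stdbool.h>

/* Toy stand-in for an event object; not a Win32 type. */
typedef struct ToyEvent
{
	bool		signaled;
	bool		manual_reset;
} ToyEvent;

void
toy_set(ToyEvent *ev)
{
	ev->signaled = true;
}

void
toy_reset(ToyEvent *ev)
{
	ev->signaled = false;
}

/* Returns true if a wait on the event would be satisfied immediately. */
bool
toy_wait(ToyEvent *ev)
{
	bool		was_signaled = ev->signaled;

	/* an auto-reset event clears itself once a wait has been satisfied */
	if (was_signaled && !ev->manual_reset)
		ev->signaled = false;

	return was_signaled;
}

/*
 * Count how many of `rounds` consecutive waits return immediately after a
 * single toy_set().  reset_in_callback mimics resetting the event in the
 * stop callback, as the v2 patch does in reached_end_position().
 */
int
immediate_wakeups(bool manual_reset, bool reset_in_callback, int rounds)
{
	ToyEvent	ev = {false, manual_reset};
	int			count = 0;

	toy_set(&ev);
	for (int i = 0; i < rounds; i++)
	{
		if (toy_wait(&ev))
		{
			count++;
			if (reset_in_callback)
				toy_reset(&ev);
		}
	}
	return count;
}
```

In this model a manual-reset event without a reset wakes every round (the busy loop), while either an explicit reset or an auto-reset event yields exactly one wakeup per set.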

I wonder if using a short-lived event handle would have dangers of missing
FD_CLOSE here as well? It'd probably be worth avoiding the risk by creating
the event just once.

I just wasn't immediately sure where to stash it. Probably just by adding a
field in StreamCtl, that ReceiveXlogStream() then sets? So far it's constant
once passed to ReceiveXlogStream(), but I don't really see a reason why it'd
need to stay that way?

Oops, attached the patch this time.

Do we really want to create a new event every time? Those are kernel
objects, so they're not entirely free, but that part maybe doesn't
matter. Wouldn't it be cleaner to do it like we do in
pgwin32_waitforsinglesocket() which is create it once and store it in
a static variable? Or is that what you're suggesting above in the "I
wonder if" part?

--
Magnus Hagander
Me: https://www.hagander.net/
Work: https://www.redpill-linpro.com/

#9 Andres Freund
andres@anarazel.de
In reply to: Magnus Hagander (#8)
Re: pg_basebackup WAL streamer shutdown is bogus - leading to slow tests

Hi,

On 2022-01-17 14:50:27 +0100, Magnus Hagander wrote:

On Mon, Jan 17, 2022 at 12:31 AM Andres Freund <andres@anarazel.de> wrote:

Hi,

On 2022-01-16 15:28:00 -0800, Andres Freund wrote:

I hacked that up last night. And a fix or two later, it seems to be
working. What I'd missed at first is that the event needs to be reset in
reached_end_position(), otherwise we'll busy loop.

You can create the event with bManualReset set to False to avoid that,
no? With this usecase, I don't really see a reason not to do that
instead?

The problem I'm referring to is that some types of events are edge
triggered. Which we've been painfully reminded of recently:
/messages/by-id/CA+hUKG+OeoETZQ=Qw5Ub5h3tmwQhBmDA=nuNO3KG=zWfUypFAw@mail.gmail.com

It appears there's no guarantee that you'll see e.g. FD_CLOSE if you use
short-lived events (the FD_CLOSE is recorded internally but not signalled
immediately if there's still readable data, and the internal record is reset
by WSAEventSelect()).
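
The hazard described in the previous paragraph can be illustrated with a toy model. It encodes only the behaviour as stated in this message (a close is recorded internally, is not reported while unread data remains, and the record is cleared when an event is associated again). It is not Winsock code, and ToySocket and the toy_* and close_seen names are hypothetical.

```c
#include <stdbool.h>

/* Toy stand-in for a socket's notification state; not a Winsock type. */
typedef struct ToySocket
{
	int			unread_bytes;
	bool		close_recorded;	/* internal record of the peer's close */
} ToySocket;

/* The peer closes the connection: recorded internally. */
void
toy_peer_close(ToySocket *s)
{
	s->close_recorded = true;
}

/* (Re-)associating an event is modelled as clearing the internal record. */
void
toy_event_select(ToySocket *s)
{
	s->close_recorded = false;
}

/* The close is only reported once all readable data has been consumed. */
bool
toy_close_signalled(const ToySocket *s)
{
	return s->close_recorded && s->unread_bytes == 0;
}

/*
 * Returns true if the close notification is eventually seen.
 * reselect_each_wait models a short-lived event that is created and
 * associated anew for every wait.
 */
bool
close_seen(bool reselect_each_wait)
{
	ToySocket	s = {100, false};

	toy_event_select(&s);		/* initial association */
	toy_peer_close(&s);			/* close arrives while data is unread */

	if (toy_close_signalled(&s))
		return true;			/* not reached: data is still pending */

	s.unread_bytes = 0;			/* the reader drains the socket */

	if (reselect_each_wait)
		toy_event_select(&s);	/* new association wipes the record */

	return toy_close_signalled(&s);
}
```

Under these assumptions a long-lived association sees the close after the data is drained, while re-associating for every wait loses it, which is the argument for creating the event once.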

I wonder if using a short-lived event handle would have dangers of missing
FD_CLOSE here as well? It'd probably be worth avoiding the risk by creating
the event just once.

I just wasn't immediately sure where to stash it. Probably just by adding a
field in StreamCtl, that ReceiveXlogStream() then sets? So far it's constant
once passed to ReceiveXlogStream(), but I don't really see a reason why it'd
need to stay that way?

Oops, attached the patch this time.

Do we really want to create a new event every time? Those are kernel
objects, so they're not entirely free, but that part maybe doesn't
matter. Wouldn't it be cleaner to do it like we do in
pgwin32_waitforsinglesocket() which is create it once and store it in
a static variable? Or is that what you're suggesting above in the "I
wonder if" part?

Yes, that's what I was suggesting. I wasn't thinking of using a static var,
but putting it in StreamCtl. Note that what pgwin32_waitforsinglesocket()
is doing doesn't protect against the problem referenced above, because it
still is reset by WSAEventSelect.

Greetings,

Andres Freund

#10 Andres Freund
andres@anarazel.de
In reply to: Andres Freund (#9)
Re: pg_basebackup WAL streamer shutdown is bogus - leading to slow tests

On 2022-01-17 10:06:56 -0800, Andres Freund wrote:

Yes, that's what I was suggesting. I wasn't thinking of using a static var,
but putting it in StreamCtl. Note that what pgwin32_waitforsinglesocket()
is doing doesn't protect against the problem referenced above, because it
still is reset by WSAEventSelect.

Do we care about breaking StreamCtl ABI? I don't think so?

#11 Andres Freund
andres@anarazel.de
In reply to: Andres Freund (#10)
1 attachment(s)
Re: pg_basebackup WAL streamer shutdown is bogus - leading to slow tests

Hi,

On 2022-01-29 12:44:22 -0800, Andres Freund wrote:

On 2022-01-17 10:06:56 -0800, Andres Freund wrote:

Yes, that's what I was suggesting. I wasn't thinking of using a static var,
but putting it in StreamCtl. Note that what pgwin32_waitforsinglesocket()
is doing doesn't protect against the problem referenced above, because it
still is reset by WSAEventSelect.

Do we care about breaking StreamCtl ABI? I don't think so?

Here's a version of the patch only creating the event once. Needs a small bit
of comment polishing, but otherwise I think it's sane?

Greetings,

Andres Freund

Attachments:

v3-0001-Avoid-slow-shutdown-of-pg_basebackup-windows-edit.patch (text/x-diff; charset=us-ascii)
From 2c1d67a0b8dff8b6be9683d422d251c937db9121 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sun, 16 Jan 2022 01:58:24 -0800
Subject: [PATCH v3] Avoid slow shutdown of pg_basebackup, windows edition.

See also 7834d20b57a.

Discussion: https://postgr.es/m/20220129204422.ljyxclfy5ubwsciu@alap3.anarazel.de
---
 src/bin/pg_basebackup/pg_basebackup.c | 24 ++++++-
 src/bin/pg_basebackup/pg_receivewal.c |  4 ++
 src/bin/pg_basebackup/receivelog.c    | 96 ++++++++++++++++++++++++---
 src/bin/pg_basebackup/receivelog.h    |  6 ++
 4 files changed, 118 insertions(+), 12 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index c40925c1f04..a539fbe6e0e 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -169,6 +169,8 @@ static const char *progress_filename;
 /* Pipe to communicate with background wal receiver process */
 #ifndef WIN32
 static int	bgpipe[2] = {-1, -1};
+#else
+HANDLE *bgevent = NULL;
 #endif
 
 /* Handle to child process */
@@ -506,7 +508,14 @@ reached_end_position(XLogRecPtr segendpos, uint32 timeline,
 	/*
 	 * At this point we have an end pointer, so compare it to the current
 	 * position to figure out if it's time to stop.
+	 *
+	 * On windows we need to reset the event used to wake up the streaming
+	 * thread, otherwise CopyStreamPoll() will start to immediately return.
 	 */
+#ifdef WIN32
+	ResetEvent(bgevent);
+#endif
+
 	if (segendpos >= xlogendptr)
 		return true;
 
@@ -541,7 +550,7 @@ LogStreamerMain(logstreamer_param *param)
 #ifndef WIN32
 	stream.stop_socket = bgpipe[0];
 #else
-	stream.stop_socket = PGINVALID_SOCKET;
+	stream.stop_event = bgevent;
 #endif
 	stream.standby_message_timeout = standby_message_timeout;
 	stream.synchronous = false;
@@ -627,6 +636,14 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
 		pg_log_error("could not create pipe for background process: %m");
 		exit(1);
 	}
+#else
+	bgevent = CreateEvent(NULL, TRUE, FALSE, NULL);
+	if (bgevent == NULL)
+	{
+		pg_log_error("could not create event for background thread: %lu",
+					 GetLastError());
+		exit(1);
+	}
 #endif
 
 	/* Get a second connection */
@@ -2216,7 +2233,9 @@ BaseBackup(void)
 		/*
 		 * On Windows, since we are in the same process, we can just store the
 		 * value directly in the variable, and then set the flag that says
-		 * it's there.
+		 * it's there. To interrupt the thread while it's waiting for network
+		 * IO, we set an event (which the thread waits on in addition to the
+		 * socket).
 		 */
 		if (sscanf(xlogend, "%X/%X", &hi, &lo) != 2)
 		{
@@ -2226,6 +2245,7 @@ BaseBackup(void)
 		}
 		xlogendptr = ((uint64) hi) << 32 | lo;
 		InterlockedIncrement(&has_xlogendptr);
+		SetEvent(bgevent);
 
 		/* First wait for the thread to exit */
 		if (WaitForSingleObjectEx((HANDLE) bgchild_handle, INFINITE, FALSE) !=
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index ccb215c398c..d27bd85b7ce 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -618,7 +618,11 @@ StreamLog(void)
 					stream.timeline);
 
 	stream.stream_stop = stop_streaming;
+#ifndef WIN32
 	stream.stop_socket = PGINVALID_SOCKET;
+#else
+	stream.stop_event = NULL;
+#endif
 	stream.standby_message_timeout = standby_message_timeout;
 	stream.synchronous = synchronous;
 	stream.do_sync = do_sync;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index d39e4b11a1a..cc46de0252e 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -37,8 +37,8 @@ static bool still_sending = true;	/* feedback still needs to be sent? */
 
 static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
 								  XLogRecPtr *stoppos);
-static int	CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
-static int	CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
+static int	CopyStreamPoll(PGconn *conn, long timeout_ms, StreamCtl *stream);
+static int	CopyStreamReceive(PGconn *conn, long timeout, StreamCtl *stream,
 							  char **buffer);
 static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
 								int len, XLogRecPtr blockpos, TimestampTz *last_status);
@@ -414,6 +414,27 @@ CheckServerVersionForStreaming(PGconn *conn)
 	return true;
 }
 
+static void
+XLogStreamInit(PGconn *conn, StreamCtl *stream)
+{
+#ifdef WIN32
+	stream->net_event = WSACreateEvent();
+	if (stream->net_event == WSA_INVALID_EVENT)
+	{
+		pg_log_error("failed to create event for socket: error code %d",
+					 WSAGetLastError());
+		exit(1);
+	}
+
+	if (WSAEventSelect(PQsocket(conn), stream->net_event, FD_READ | FD_CLOSE) != 0)
+	{
+		pg_log_error("failed to set up event for socket: error code %d",
+					 WSAGetLastError());
+		exit(1);
+	}
+#endif
+}
+
 /*
  * Receive a log stream starting at the specified position.
  *
@@ -463,6 +484,8 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 	PGresult   *res;
 	XLogRecPtr	stoppos;
 
+	XLogStreamInit(conn, stream);
+
 	/*
 	 * The caller should've checked the server version already, but doesn't do
 	 * any harm to check it here too.
@@ -813,7 +836,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
 		sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
 												 last_status);
 
-		r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
+		r = CopyStreamReceive(conn, sleeptime, stream, &copybuf);
 		while (r != 0)
 		{
 			if (r == -1)
@@ -858,7 +881,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
 			 * Process the received data, and any subsequent data we can read
 			 * without blocking.
 			 */
-			r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
+			r = CopyStreamReceive(conn, 0, stream, &copybuf);
 		}
 	}
 
@@ -877,8 +900,9 @@ error:
  * or interrupted by signal or stop_socket input, and -1 on an error.
  */
 static int
-CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
+CopyStreamPoll(PGconn *conn, long timeout_ms, StreamCtl *stream)
 {
+#ifndef WIN32
 	int			ret;
 	fd_set		input_mask;
 	int			connsocket;
@@ -896,10 +920,10 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
 	FD_ZERO(&input_mask);
 	FD_SET(connsocket, &input_mask);
 	maxfd = connsocket;
-	if (stop_socket != PGINVALID_SOCKET)
+	if (stream->stop_socket != PGINVALID_SOCKET)
 	{
-		FD_SET(stop_socket, &input_mask);
-		maxfd = Max(maxfd, stop_socket);
+		FD_SET(stream->stop_socket, &input_mask);
+		maxfd = Max(maxfd, stream->stop_socket);
 	}
 
 	if (timeout_ms < 0)
@@ -924,6 +948,58 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
 		return 1;				/* Got input on connection socket */
 
 	return 0;					/* Got timeout or input on stop_socket */
+#else
+	int			ret;
+	int			rc;
+	int			nevents = 0;
+	HANDLE		events[2];
+
+
+	events[0] = stream->net_event;
+	nevents++;
+
+	if (stream->stop_event != NULL)
+	{
+		events[1] = stream->stop_event;
+		nevents++;
+	}
+
+	/* map timeout_ms to WaitForMultipleObjects expectations */
+	if (timeout_ms < 0)
+		timeout_ms = INFINITE;
+
+	rc = WaitForMultipleObjects(nevents, events, FALSE, timeout_ms);
+
+	if (rc == WAIT_FAILED)
+	{
+		pg_log_error("WaitForMultipleObjects() failed: error code %lu",
+					 GetLastError());
+		exit(1);
+	}
+	else if (rc == WAIT_TIMEOUT)
+	{
+		/* timeout exceeded */
+		ret = 0;
+	}
+	else if (rc == WAIT_OBJECT_0)
+	{
+		/* Got input on connection socket */;
+		ret = 1;
+	}
+	else if (rc == (WAIT_OBJECT_0 + 1))
+	{
+		Assert(stream->stop_event != NULL);
+		/* Got event on stop socket  */;
+		ret = 0;
+	}
+	else
+	{
+		pg_log_error("unexpected return from WaitForMultipleObjects(): %d", rc);
+		exit(1);
+	}
+
+	return ret;
+#endif /* WIN32 */
 }
 
 /*
@@ -939,7 +1015,7 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
  * -1 on error. -2 if the server ended the COPY.
  */
 static int
-CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
+CopyStreamReceive(PGconn *conn, long timeout, StreamCtl *stream,
 				  char **buffer)
 {
 	char	   *copybuf = NULL;
@@ -960,7 +1036,7 @@ CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
 		 * the specified timeout, so that we can ping the server.  Also stop
 		 * waiting if input appears on stop_socket.
 		 */
-		ret = CopyStreamPoll(conn, timeout, stop_socket);
+		ret = CopyStreamPoll(conn, timeout, stream);
 		if (ret <= 0)
 			return ret;
 
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index 050d4bc69fd..c0b1911a6c1 100644
--- a/src/bin/pg_basebackup/receivelog.h
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -40,8 +40,14 @@ typedef struct StreamCtl
 
 	stream_stop_callback stream_stop;	/* Stop streaming when returns true */
 
+#ifndef WIN32
 	pgsocket	stop_socket;	/* if valid, watch for input on this socket
 								 * and check stream_stop() when there is any */
+#else
+	HANDLE	   *stop_event;		/* on windows, check an event instead */
+
+	HANDLE	   *net_event;		/* event to wait for network IO */
+#endif
 
 	WalWriteMethod *walmethod;	/* How to write the WAL */
 	char	   *partial_suffix; /* Suffix appended to partially received files */
-- 
2.34.0

#12Magnus Hagander
magnus@hagander.net
In reply to: Andres Freund (#10)
Re: pg_basebackup WAL streamer shutdown is bogus - leading to slow tests

On Sat, Jan 29, 2022 at 9:44 PM Andres Freund <andres@anarazel.de> wrote:

On 2022-01-17 10:06:56 -0800, Andres Freund wrote:

Yes, that's what I was suggesting. I wasn't thinking of using a static var,
but putting it in StreamCtl. Note that what pgwin32_waitforsinglesocket()
is doing doesn't protect against the problem referenced above, because it
still is reset by WSAEventSelect.

Do we care about breaking StreamCtl ABI? I don't think so?

I would say no. It's an internal API and it's not like pg_basebackup
can load plugins.

--
Magnus Hagander
Me: https://www.hagander.net/
Work: https://www.redpill-linpro.com/

#13Magnus Hagander
magnus@hagander.net
In reply to: Andres Freund (#11)
Re: pg_basebackup WAL streamer shutdown is bogus - leading to slow tests

On Sat, Jan 29, 2022 at 10:47 PM Andres Freund <andres@anarazel.de> wrote:

Hi,

On 2022-01-29 12:44:22 -0800, Andres Freund wrote:

On 2022-01-17 10:06:56 -0800, Andres Freund wrote:

Yes, that's what I was suggesting. I wasn't thinking of using a static var,
but putting it in StreamCtl. Note that what pgwin32_waitforsinglesocket()
is doing doesn't protect against the problem referenced above, because it
still is reset by WSAEventSelect.

Do we care about breaking StreamCtl ABI? I don't think so?

Here's a version of the patch only creating the event once. Needs a small bit
of comment polishing, but otherwise I think it's sane?

LGTM in general, yes.

I'm wondering about the part that does:
+       events[0] = stream->net_event;
+       nevents++;
+
+       if (stream->stop_event != NULL)
+       {
+               events[1] = stream->stop_event;
+               nevents++;
+       }
+

Using a combination of nevents but hardcoded indexes does work -- but
only as long as there is only one optional entry. Should they perhaps
be written
+ events[nevents++] = stream->net_event;

instead, for future proofing? But then you'd also have to change the
if() statement on the return side I guess.

Can of course also be changed at such a point where a third event
might be added. Not important, but it poked me in the eye when I was
reading it.

--
Magnus Hagander
Me: https://www.hagander.net/
Work: https://www.redpill-linpro.com/

#14Andres Freund
andres@anarazel.de
In reply to: Magnus Hagander (#13)
Re: pg_basebackup WAL streamer shutdown is bogus - leading to slow tests

Hi,

On 2022-01-30 16:51:12 +0100, Magnus Hagander wrote:

On Sat, Jan 29, 2022 at 10:47 PM Andres Freund <andres@anarazel.de> wrote:

Hi,

On 2022-01-29 12:44:22 -0800, Andres Freund wrote:

On 2022-01-17 10:06:56 -0800, Andres Freund wrote:

Yes, that's what I was suggesting. I wasn't thinking of using a static var,
but putting it in StreamCtl. Note that what pgwin32_waitforsinglesocket()
is doing doesn't protect against the problem referenced above, because it
still is reset by WSAEventSelect.

Do we care about breaking StreamCtl ABI? I don't think so?

Here's a version of the patch only creating the event once. Needs a small bit
of comment polishing, but otherwise I think it's sane?

LGTM in general, yes.

Thanks for checking.

I'm wondering about the part that does:
+       events[0] = stream->net_event;
+       nevents++;
+
+       if (stream->stop_event != NULL)
+       {
+               events[1] = stream->stop_event;
+               nevents++;
+       }
+

Using a combination of nevents but hardcoded indexes does work -- but
only as long as there is only one optional entry. Should they perhaps
be written
+ events[nevents++] = stream->net_event;

instead, for future proofing? But then you'd also have to change the
if() statement on the return side I guess.

I did wonder about it, but the index checks get sufficiently more complicated
that it didn't quite seem worth it. It didn't seem that likely these would get
a third event to check...
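
For illustration, a minimal sketch of what the events[nevents++] variant might
look like, including the return-side mapping that makes it more involved. This
is not part of the patch: wait_set, wait_set_build() and wait_set_classify()
are hypothetical names, and plain pointers stand in for the Windows HANDLEs so
the logic compiles anywhere. The signaled index is assumed to be what
WaitForMultipleObjects() reports as rc - WAIT_OBJECT_0.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a Windows HANDLE; not part of the patch. */
typedef void *wait_handle;

typedef struct wait_set
{
	wait_handle events[2];
	int			nevents;
	int			net_idx;		/* slot of the network event, always set */
	int			stop_idx;		/* slot of the stop event, or -1 if absent */
} wait_set;

/* Fill the array with events[nevents++], remembering each event's slot. */
static void
wait_set_build(wait_set *ws, wait_handle net_event, wait_handle stop_event)
{
	assert(net_event != NULL);

	ws->nevents = 0;
	ws->stop_idx = -1;

	ws->net_idx = ws->nevents;
	ws->events[ws->nevents++] = net_event;

	if (stop_event != NULL)
	{
		ws->stop_idx = ws->nevents;
		ws->events[ws->nevents++] = stop_event;
	}
}

/*
 * Map the signaled slot to CopyStreamPoll()'s return convention:
 * 1 for input on the connection, 0 for the stop event, and -1 for a
 * slot that was never registered.
 */
static int
wait_set_classify(const wait_set *ws, int signaled_idx)
{
	if (signaled_idx == ws->net_idx)
		return 1;
	if (ws->stop_idx >= 0 && signaled_idx == ws->stop_idx)
		return 0;
	return -1;
}
```

The extra bookkeeping (net_idx, stop_idx) is the cost being weighed above: with
only one optional entry, the hardcoded indexes in the patch give the same
result with less code.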

I think we're going to have to generalize something like our wait events to be
frontend usable at some point. The proportion and complexity of frontend code
is increasing...

Greetings,

Andres Freund

#15Andres Freund
andres@anarazel.de
In reply to: Andres Freund (#11)
1 attachment(s)
Re: pg_basebackup WAL streamer shutdown is bogus - leading to slow tests

Hi,

On 2022-01-29 13:47:13 -0800, Andres Freund wrote:

Here's a version of the patch only creating the event once. Needs a small bit
of comment polishing, but otherwise I think it's sane?

Ah, it needs a bit more. I was not cleaning up the event at the exit of
ReceiveXlogStream(). For pg_basebackup that perhaps wouldn't matter, but
pg_receivewal loops...

We don't have a good spot for cleaning up right now. ReceiveXlogStream() has
plenty of returns. The attached patch changes those to a goto done; pretty it
is not, but it is probably still the best way for the backbranches?
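
A minimal sketch of the single-exit shape described above, reduced to a
portable toy so the prepare/cleanup pairing is easy to see. None of these names
exist in the patch: demo_stream, demo_prepare(), demo_done() and demo_receive()
are hypothetical stand-ins for StreamCtl, XLogStreamPrepare(), XLogStreamDone()
and ReceiveXlogStream(), and the exit_at parameter only simulates which early
exit is taken.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the per-stream state; not the real StreamCtl. */
typedef struct demo_stream
{
	void	   *net_event;		/* NULL while no stream is active */
	int			prepare_calls;
	int			done_calls;
} demo_stream;

static int	demo_event_storage;

static void
demo_prepare(demo_stream *stream)
{
	/* mirrors the patch's Assert: no event may be left over from a prior run */
	assert(stream->net_event == NULL);
	stream->net_event = &demo_event_storage;
	stream->prepare_calls++;
}

static void
demo_done(demo_stream *stream)
{
	assert(stream->net_event != NULL);
	stream->net_event = NULL;
	stream->done_calls++;
}

/*
 * exit_at simulates the control flow: 1 = an early check fails,
 * 2 = the stop callback says we are finished, anything else = error path.
 */
static bool
demo_receive(demo_stream *stream, int exit_at)
{
	bool		ret = false;

	demo_prepare(stream);

	if (exit_at == 1)
		goto done;				/* was: return false */

	if (exit_at == 2)
	{
		ret = true;				/* was: return true */
		goto done;
	}

	/* error path: ret stays false and we fall through to the cleanup */

done:
	demo_done(stream);
	return ret;
}
```

Calling demo_receive() repeatedly on the same demo_stream, the way
pg_receivewal loops over ReceiveXlogStream(), only works because every exit
passes through done; a stray return would leave net_event set and trip the
assertion in demo_prepare() on the next call.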

I think the receivelog.c interface probably could do with a bit of
cleanup... The control flow is quite complicated, with repeated checks all
over etc :(. And the whole thing with giving the appearance of being
instantiatable multiple times, but then using global variables for state, is
...

Attached a revised version.

Greetings,

Andres Freund

Attachments:

v3-0001-Avoid-slow-shutdown-of-pg_basebackup-windows-edit.patch (text/x-diff; charset=us-ascii)
From 0faf35162c7a1dd1720cfe83e3a7166700148f16 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sun, 16 Jan 2022 01:58:24 -0800
Subject: [PATCH v3 1/3] Avoid slow shutdown of pg_basebackup, windows edition.

See also 7834d20b57a.

Reviewed-By: Magnus Hagander <magnus@hagander.net>
Discussion: https://postgr.es/m/20220129204422.ljyxclfy5ubwsciu@alap3.anarazel.de
---
 src/bin/pg_basebackup/pg_basebackup.c |  24 ++++-
 src/bin/pg_basebackup/pg_receivewal.c |   4 +
 src/bin/pg_basebackup/receivelog.c    | 146 ++++++++++++++++++++++----
 src/bin/pg_basebackup/receivelog.h    |   6 ++
 4 files changed, 159 insertions(+), 21 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index c40925c1f04..9f80cdc4fef 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -169,6 +169,8 @@ static const char *progress_filename;
 /* Pipe to communicate with background wal receiver process */
 #ifndef WIN32
 static int	bgpipe[2] = {-1, -1};
+#else
+HANDLE	   *bgevent = NULL;
 #endif
 
 /* Handle to child process */
@@ -506,7 +508,14 @@ reached_end_position(XLogRecPtr segendpos, uint32 timeline,
 	/*
 	 * At this point we have an end pointer, so compare it to the current
 	 * position to figure out if it's time to stop.
+	 *
+	 * On windows we need to reset the event used to wake up the streaming
+	 * thread, otherwise CopyStreamPoll() will start to immediately return.
 	 */
+#ifdef WIN32
+	ResetEvent(bgevent);
+#endif
+
 	if (segendpos >= xlogendptr)
 		return true;
 
@@ -541,7 +550,7 @@ LogStreamerMain(logstreamer_param *param)
 #ifndef WIN32
 	stream.stop_socket = bgpipe[0];
 #else
-	stream.stop_socket = PGINVALID_SOCKET;
+	stream.stop_event = bgevent;
 #endif
 	stream.standby_message_timeout = standby_message_timeout;
 	stream.synchronous = false;
@@ -627,6 +636,14 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
 		pg_log_error("could not create pipe for background process: %m");
 		exit(1);
 	}
+#else
+	bgevent = CreateEvent(NULL, TRUE, FALSE, NULL);
+	if (bgevent == NULL)
+	{
+		pg_log_error("could not create event for background thread: %lu",
+					 GetLastError());
+		exit(1);
+	}
 #endif
 
 	/* Get a second connection */
@@ -2216,7 +2233,9 @@ BaseBackup(void)
 		/*
 		 * On Windows, since we are in the same process, we can just store the
 		 * value directly in the variable, and then set the flag that says
-		 * it's there.
+		 * it's there. To interrupt the thread while it's waiting for network
+		 * IO, we set an event (which the thread waits on in addition to the
+		 * socket).
 		 */
 		if (sscanf(xlogend, "%X/%X", &hi, &lo) != 2)
 		{
@@ -2226,6 +2245,7 @@ BaseBackup(void)
 		}
 		xlogendptr = ((uint64) hi) << 32 | lo;
 		InterlockedIncrement(&has_xlogendptr);
+		SetEvent(bgevent);
 
 		/* First wait for the thread to exit */
 		if (WaitForSingleObjectEx((HANDLE) bgchild_handle, INFINITE, FALSE) !=
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index ccb215c398c..d27bd85b7ce 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -618,7 +618,11 @@ StreamLog(void)
 					stream.timeline);
 
 	stream.stream_stop = stop_streaming;
+#ifndef WIN32
 	stream.stop_socket = PGINVALID_SOCKET;
+#else
+	stream.stop_event = NULL;
+#endif
 	stream.standby_message_timeout = standby_message_timeout;
 	stream.synchronous = synchronous;
 	stream.do_sync = do_sync;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index d39e4b11a1a..b3cb552c3f7 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -37,8 +37,8 @@ static bool still_sending = true;	/* feedback still needs to be sent? */
 
 static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
 								  XLogRecPtr *stoppos);
-static int	CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
-static int	CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
+static int	CopyStreamPoll(PGconn *conn, long timeout_ms, StreamCtl *stream);
+static int	CopyStreamReceive(PGconn *conn, long timeout, StreamCtl *stream,
 							  char **buffer);
 static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
 								int len, XLogRecPtr blockpos, TimestampTz *last_status);
@@ -414,6 +414,48 @@ CheckServerVersionForStreaming(PGconn *conn)
 	return true;
 }
 
+/*
+ * Prepare for ReceiveXlogStream() doing its work.
+ *
+ * Right now we just need to prepare an event for CopyStreamPoll() on windows.
+ */
+static void
+XLogStreamPrepare(PGconn *conn, StreamCtl *stream)
+{
+#ifdef WIN32
+	Assert(stream->net_event == NULL);
+	stream->net_event = WSACreateEvent();
+	if (stream->net_event == WSA_INVALID_EVENT)
+	{
+		pg_log_error("failed to create event for socket: error code %d",
+					 WSAGetLastError());
+		exit(1);
+	}
+
+	if (WSAEventSelect(PQsocket(conn), stream->net_event, FD_READ | FD_CLOSE) != 0)
+	{
+		pg_log_error("failed to set up event for socket: error code %d",
+					 WSAGetLastError());
+		exit(1);
+	}
+#endif
+}
+
+/*
+ * Clean up after ReceiveXlogStream(), undoing XLogStreamPrepare()'s work.
+ */
+static void
+XLogStreamDone(PGconn *conn, StreamCtl *stream)
+{
+#ifdef WIN32
+	Assert(stream->net_event != NULL);
+	/* reset event association for libpq socket, clean up event */
+	WSAEventSelect(PQsocket(conn), NULL, 0);
+	WSACloseEvent(stream->net_event);
+	stream->net_event = NULL;
+#endif
+}
+
 /*
  * Receive a log stream starting at the specified position.
  *
@@ -462,13 +504,16 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 	char		slotcmd[128];
 	PGresult   *res;
 	XLogRecPtr	stoppos;
+	bool		ret = false;
+
+	XLogStreamPrepare(conn, stream);
 
 	/*
 	 * The caller should've checked the server version already, but doesn't do
 	 * any harm to check it here too.
 	 */
 	if (!CheckServerVersionForStreaming(conn))
-		return false;
+		goto done;
 
 	/*
 	 * Decide whether we want to report the flush position. If we report the
@@ -506,14 +551,14 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 		if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
 		{
 			pg_free(sysidentifier);
-			return false;
+			goto done;
 		}
 
 		if (strcmp(stream->sysidentifier, sysidentifier) != 0)
 		{
 			pg_log_error("system identifier does not match between base backup and streaming connection");
 			pg_free(sysidentifier);
-			return false;
+			goto done;
 		}
 		pg_free(sysidentifier);
 
@@ -521,7 +566,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 		{
 			pg_log_error("starting timeline %u is not present in the server",
 						 stream->timeline);
-			return false;
+			goto done;
 		}
 	}
 
@@ -549,7 +594,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 				pg_log_error("could not send replication command \"%s\": %s",
 							 "TIMELINE_HISTORY", PQresultErrorMessage(res));
 				PQclear(res);
-				return false;
+				goto done;
 			}
 
 			/*
@@ -575,7 +620,10 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 		 * callback tells us to stop here.
 		 */
 		if (stream->stream_stop(stream->startpos, stream->timeline, false))
-			return true;
+		{
+			ret = true;
+			goto done;
+		}
 
 		/* Initiate the replication stream at specified location */
 		snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
@@ -588,7 +636,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 			pg_log_error("could not send replication command \"%s\": %s",
 						 "START_REPLICATION", PQresultErrorMessage(res));
 			PQclear(res);
-			return false;
+			goto done;
 		}
 		PQclear(res);
 
@@ -672,7 +720,10 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 			 * complain.
 			 */
 			if (stream->stream_stop(stoppos, stream->timeline, false))
-				return true;
+			{
+				ret = true;
+				goto done;
+			}
 			else
 			{
 				pg_log_error("replication stream was terminated before stop point");
@@ -690,11 +741,15 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 	}
 
 error:
+	Assert(ret == false);
 	if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
 		pg_log_error("could not close file \"%s\": %s",
 					 current_walfile_name, stream->walmethod->getlasterror());
 	walfile = NULL;
-	return false;
+
+done:
+	XLogStreamDone(conn, stream);
+	return ret;
 }
 
 /*
@@ -813,7 +868,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
 		sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
 												 last_status);
 
-		r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
+		r = CopyStreamReceive(conn, sleeptime, stream, &copybuf);
 		while (r != 0)
 		{
 			if (r == -1)
@@ -858,7 +913,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
 			 * Process the received data, and any subsequent data we can read
 			 * without blocking.
 			 */
-			r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
+			r = CopyStreamReceive(conn, 0, stream, &copybuf);
 		}
 	}
 
@@ -877,8 +932,9 @@ error:
  * or interrupted by signal or stop_socket input, and -1 on an error.
  */
 static int
-CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
+CopyStreamPoll(PGconn *conn, long timeout_ms, StreamCtl *stream)
 {
+#ifndef WIN32
 	int			ret;
 	fd_set		input_mask;
 	int			connsocket;
@@ -896,10 +952,10 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
 	FD_ZERO(&input_mask);
 	FD_SET(connsocket, &input_mask);
 	maxfd = connsocket;
-	if (stop_socket != PGINVALID_SOCKET)
+	if (stream->stop_socket != PGINVALID_SOCKET)
 	{
-		FD_SET(stop_socket, &input_mask);
-		maxfd = Max(maxfd, stop_socket);
+		FD_SET(stream->stop_socket, &input_mask);
+		maxfd = Max(maxfd, stream->stop_socket);
 	}
 
 	if (timeout_ms < 0)
@@ -924,6 +980,58 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
 		return 1;				/* Got input on connection socket */
 
 	return 0;					/* Got timeout or input on stop_socket */
+#else
+	int			ret;
+	int			rc;
+	int			nevents = 0;
+	HANDLE		events[2];
+
+
+	events[0] = stream->net_event;
+	nevents++;
+
+	if (stream->stop_event != NULL)
+	{
+		events[1] = stream->stop_event;
+		nevents++;
+	}
+
+	/* map timeout_ms to WaitForMultipleObjects expectations */
+	if (timeout_ms < 0)
+		timeout_ms = INFINITE;
+
+	rc = WaitForMultipleObjects(nevents, events, FALSE, timeout_ms);
+
+	if (rc == WAIT_FAILED)
+	{
+		pg_log_error("WaitForMultipleObjects() failed: error code %lu",
+					 GetLastError());
+		exit(1);
+	}
+	else if (rc == WAIT_TIMEOUT)
+	{
+		/* timeout exceeded */
+		ret = 0;
+	}
+	else if (rc == WAIT_OBJECT_0)
+	{
+		/* Got input on connection socket */
+		ret = 1;
+	}
+	else if (rc == (WAIT_OBJECT_0 + 1))
+	{
+		Assert(stream->stop_event != NULL);
+		/* Got event on stop socket  */
+		ret = 0;
+	}
+	else
+	{
+		pg_log_error("unexpected return from WaitForMultipleObjects(): %d", rc);
+		exit(1);
+	}
+
+	return ret;
+#endif							/* WIN32 */
 }
 
 /*
@@ -939,7 +1047,7 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
  * -1 on error. -2 if the server ended the COPY.
  */
 static int
-CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
+CopyStreamReceive(PGconn *conn, long timeout, StreamCtl *stream,
 				  char **buffer)
 {
 	char	   *copybuf = NULL;
@@ -960,7 +1068,7 @@ CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
 		 * the specified timeout, so that we can ping the server.  Also stop
 		 * waiting if input appears on stop_socket.
 		 */
-		ret = CopyStreamPoll(conn, timeout, stop_socket);
+		ret = CopyStreamPoll(conn, timeout, stream);
 		if (ret <= 0)
 			return ret;
 
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index 050d4bc69fd..c0b1911a6c1 100644
--- a/src/bin/pg_basebackup/receivelog.h
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -40,8 +40,14 @@ typedef struct StreamCtl
 
 	stream_stop_callback stream_stop;	/* Stop streaming when returns true */
 
+#ifndef WIN32
 	pgsocket	stop_socket;	/* if valid, watch for input on this socket
 								 * and check stream_stop() when there is any */
+#else
+	HANDLE	   *stop_event;		/* on windows, check an event instead */
+
+	HANDLE	   *net_event;		/* event to wait for network IO */
+#endif
 
 	WalWriteMethod *walmethod;	/* How to write the WAL */
 	char	   *partial_suffix; /* Suffix appended to partially received files */
-- 
2.34.0
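
A note on the ResetEvent() call in reached_end_position(): bgevent is created
with CreateEvent(NULL, TRUE, FALSE, NULL), which makes it a manual-reset event,
so a successful wait does not clear it. The toy model below is only a sketch of
that behaviour and does not use the Windows API; model_event and the model_*
functions are hypothetical names. It counts how many consecutive zero-timeout
polls see the event as signaled, with and without a reset after the first
wakeup.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy model of a manual-reset event; not the Windows API. */
typedef struct model_event
{
	bool		signaled;
} model_event;

static void
model_set(model_event *ev)
{
	ev->signaled = true;
}

static void
model_reset(model_event *ev)
{
	ev->signaled = false;
}

/*
 * Model of a zero-timeout wait. A manual-reset event stays signaled after
 * a successful wait, so this deliberately does not clear the flag.
 */
static bool
model_wait(const model_event *ev)
{
	return ev->signaled;
}

/* Count how many of nwaits consecutive polls see the event as signaled. */
static int
model_count_wakeups(model_event *ev, int nwaits, bool reset_on_wakeup)
{
	int			wakeups = 0;

	assert(nwaits >= 0);

	for (int i = 0; i < nwaits; i++)
	{
		if (model_wait(ev))
		{
			wakeups++;
			if (reset_on_wakeup)
				model_reset(ev);
		}
	}
	return wakeups;
}
```

Without the reset, every poll after model_set() reports the event, which
corresponds to CopyStreamPoll() returning immediately on each call; with the
reset, only the first poll does.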