diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index e0ebee6..3192ef7 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2019,6 +2019,28 @@ SET ENABLE_SEQSCAN TO OFF;
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry id="guc-replication-timeout" xreflabel="replication_timeout">
+      <term><varname>replication_timeout</varname> (<type>integer</type>)</term>
+      <indexterm>
+       <primary><varname>replication_timeout</> configuration parameter</primary>
+      </indexterm>
+      <listitem>
+       <para>
+        Specifies the maximum time, in milliseconds, to wait for a reply
+        from the standby before terminating replication.  This is useful for
+        the primary server to detect a standby crash or network outage.
+        A value of zero turns this off.  This parameter can only be set in
+        the <filename>postgresql.conf</> file or on the server command line.
+        The default value is 60 seconds.
+       </para>
+       <para>
+        To make the timeout work properly, <xref linkend="guc-wal-receiver-status-interval">
+        must be enabled on the standby, and its value must be less than the
+        value of <varname>replication_timeout</>.
+       </para>
+      </listitem>
+     </varlistentry>
      </variablelist>
     </sect2>
 
@@ -2216,6 +2238,11 @@ SET ENABLE_SEQSCAN TO OFF;
        the <filename>postgresql.conf</> file or on the server command line.
        The default value is 10 seconds.
       </para>
+      <para>
+       When <xref linkend="guc-replication-timeout"> is enabled on the primary,
+       <varname>wal_receiver_status_interval</> must be enabled, and its value
+       must be less than the value of <varname>replication_timeout</>.
+      </para>
       </listitem>
      </varlistentry>
 
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 3c7b05b..b6dc8cc 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -56,9 +56,11 @@
  *		pq_putbytes		- send bytes to connection (not flushed until pq_flush)
  *		pq_flush		- flush pending output
  *		pq_getbyte_if_available - get a byte if available without blocking
+ *		pq_flush_if_writable	- flush pending output if writable without blocking
  *
  * message-level I/O (and old-style-COPY-OUT cruft):
  *		pq_putmessage	- send a normal message (suppressed in COPY OUT mode)
+ *		pq_putmessage_noblock - buffer a normal message without blocking (suppressed in COPY OUT mode)
  *		pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
  *		pq_endcopyout	- end a COPY OUT transfer
  *
@@ -92,6 +94,7 @@
 #include "miscadmin.h"
 #include "storage/ipc.h"
 #include "utils/guc.h"
+#include "utils/memutils.h"
 
 /*
  * Configuration options
@@ -108,12 +111,15 @@ static char sock_path[MAXPGPATH];
  * Buffers for low-level I/O
  */
 
-#define PQ_BUFFER_SIZE 8192
+#define PQ_SEND_BUFFER_SIZE 8192
+#define PQ_RECV_BUFFER_SIZE 8192
 
-static char PqSendBuffer[PQ_BUFFER_SIZE];
+static char *PqSendBuffer;
+static int	PqSendBufferSize;
 static int	PqSendPointer;		/* Next index to store a byte in PqSendBuffer */
+static int	PqSendStart;		/* Next index to send a byte in PqSendBuffer */
 
-static char PqRecvBuffer[PQ_BUFFER_SIZE];
+static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
 static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
 static int	PqRecvLength;		/* End of data available in PqRecvBuffer */
 
@@ -142,7 +148,9 @@ static int	Setup_AF_UNIX(void);
 void
 pq_init(void)
 {
-	PqSendPointer = PqRecvPointer = PqRecvLength = 0;
+	PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
+	PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
+	PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
 	PqCommBusy = false;
 	DoingCopyOut = false;
 	on_proc_exit(pq_close, 0);
@@ -762,7 +770,7 @@ pq_recvbuf(void)
 		int			r;
 
 		r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
-						PQ_BUFFER_SIZE - PqRecvLength);
+						PQ_RECV_BUFFER_SIZE - PqRecvLength);
 
 		if (r < 0)
 		{
@@ -1138,10 +1146,10 @@ internal_putbytes(const char *s, size_t len)
 	while (len > 0)
 	{
 		/* If buffer is full, then flush it out */
-		if (PqSendPointer >= PQ_BUFFER_SIZE)
+		if (PqSendPointer >= PqSendBufferSize)
 			if (internal_flush())
 				return EOF;
-		amount = PQ_BUFFER_SIZE - PqSendPointer;
+		amount = PqSendBufferSize - PqSendPointer;
 		if (amount > len)
 			amount = len;
 		memcpy(PqSendBuffer + PqSendPointer, s, amount);
@@ -1172,12 +1180,19 @@ pq_flush(void)
 	return res;
 }
 
+/* --------------------------------
+ *		internal_flush - flush pending output
+ *
+ * Returns 0 if OK (meaning everything was sent, or operation would block
+ * and the socket is in non-blocking mode), or EOF if trouble.
+ * --------------------------------
+ */
 static int
 internal_flush(void)
 {
 	static int	last_reported_send_errno = 0;
 
-	char	   *bufptr = PqSendBuffer;
+	char	   *bufptr = PqSendBuffer + PqSendStart;
 	char	   *bufend = PqSendBuffer + PqSendPointer;
 
 	while (bufptr < bufend)
@@ -1192,6 +1207,16 @@ internal_flush(void)
 				continue;		/* Ok if we were interrupted */
 
 			/*
+			 * Ok if no data writable without blocking, and the socket
+			 * is in non-blocking mode.
+			 */
+			if (errno == EAGAIN ||
+				errno == EWOULDBLOCK)
+			{
+				return 0;
+			}
+
+			/*
 			 * Careful: an ereport() that tries to write to the client would
 			 * cause recursion to here, leading to stack overflow and core
 			 * dump!  This message must go *only* to the postmaster log.
@@ -1212,18 +1237,74 @@ internal_flush(void)
 			 * We drop the buffered data anyway so that processing can
 			 * continue, even though we'll probably quit soon.
 			 */
-			PqSendPointer = 0;
+			PqSendStart = PqSendPointer = 0;
 			return EOF;
 		}
 
 		last_reported_send_errno = 0;	/* reset after any successful send */
 		bufptr += r;
+		PqSendStart += r;
 	}
 
-	PqSendPointer = 0;
+	PqSendStart = PqSendPointer = 0;
 	return 0;
 }
 
+/* --------------------------------
+ *		pq_flush_if_writable - flush pending output if writable
+ *
+ * Returns 0 if OK (unsent data may remain buffered), or EOF if trouble.
+ * --------------------------------
+ */
+int
+pq_flush_if_writable(void)
+{
+	int			res;
+
+	/* Quick exit if nothing to do */
+	if (PqSendPointer == PqSendStart)
+		return 0;
+
+	/* No-op if reentrant call */
+	if (PqCommBusy)
+		return 0;
+
+	/* Temporarily put the socket into non-blocking mode */
+#ifdef WIN32
+	pgwin32_noblock = 1;
+#else
+	if (!pg_set_noblock(MyProcPort->sock))
+		ereport(ERROR,
+				(errmsg("could not set socket to non-blocking mode: %m")));
+#endif
+	MyProcPort->noblock = true;
+
+	/* Mark busy only after the ERROR exit above, else the flag stays stuck */
+	PqCommBusy = true;
+	res = internal_flush();
+
+#ifdef WIN32
+	pgwin32_noblock = 0;
+#else
+	if (!pg_set_block(MyProcPort->sock))
+		ereport(FATAL,
+				(errmsg("could not set socket to blocking mode: %m")));
+#endif
+	MyProcPort->noblock = false;
+
+	PqCommBusy = false;
+	return res;
+}
+
+/* --------------------------------
+ *		pq_is_send_pending	- true if buffered output has not been sent yet
+ * --------------------------------
+ */
+bool
+pq_is_send_pending(void)
+{
+	return PqSendPointer > PqSendStart;
+}
 
 /* --------------------------------
  * Message-level I/O routines begin here.
@@ -1286,6 +1367,25 @@ fail:
 }
 
 /* --------------------------------
+ *		pq_putmessage_noblock	- like pq_putmessage, but never blocks
+ *
+ *		The output buffer is enlarged if needed to hold the whole message
+ *		(1 type byte + 4 length bytes + payload); the size is summed in
+ *		size_t so a huge len cannot wrap an int and skip the enlargement.
+ */
+int
+pq_putmessage_noblock(char msgtype, const char *s, size_t len)
+{
+	size_t		required = (size_t) PqSendPointer + 1 + 4 + len;
+
+	if (required > (size_t) PqSendBufferSize)
+	{
+		PqSendBuffer = repalloc(PqSendBuffer, required);
+		PqSendBufferSize = (int) required;
+	}
+	return pq_putmessage(msgtype, s, len);
+}
+/* --------------------------------
  *		pq_startcopyout - inform libpq that an old-style COPY OUT transfer
  *			is beginning
  * --------------------------------
diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
index a4f559e..32d0cb5 100644
--- a/src/backend/port/unix_latch.c
+++ b/src/backend/port/unix_latch.c
@@ -193,19 +193,21 @@ DisownLatch(volatile Latch *latch)
 bool
 WaitLatch(volatile Latch *latch, long timeout)
 {
-	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
+	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
 }
 
 /*
  * Like WaitLatch, but will also return when there's data available in
- * 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch
- * was set, or 2 if the scoket became readable.
+ * 'sock' for reading (forRead) or when 'sock' is writable (forWrite).
+ * Returns 0 on timeout, 1 if the latch was set, 2 if the socket is ready.
  */
 int
-WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
+WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead,
+				  bool forWrite, long timeout)
 {
 	struct timeval tv, *tvp = NULL;
 	fd_set		input_mask;
+	fd_set		output_mask;
 	int			rc;
 	int			result = 0;
 
@@ -241,14 +243,22 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
 		FD_ZERO(&input_mask);
 		FD_SET(selfpipe_readfd, &input_mask);
 		hifd = selfpipe_readfd;
-		if (sock != PGINVALID_SOCKET)
+		if (sock != PGINVALID_SOCKET && forRead)
 		{
 			FD_SET(sock, &input_mask);
 			if (sock > hifd)
 				hifd = sock;
 		}
 
-		rc = select(hifd + 1, &input_mask, NULL, NULL, tvp);
+		FD_ZERO(&output_mask);
+		if (sock != PGINVALID_SOCKET && forWrite)
+		{
+			FD_SET(sock, &output_mask);
+			if (sock > hifd)
+				hifd = sock;
+		}
+
+		rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp);
 		if (rc < 0)
 		{
 			if (errno == EINTR)
@@ -263,7 +273,9 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
 			result = 0;
 			break;
 		}
-		if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask))
+		if (sock != PGINVALID_SOCKET &&
+			((forRead && FD_ISSET(sock, &input_mask)) ||
+			 (forWrite && FD_ISSET(sock, &output_mask))))
 		{
 			result = 2;
 			break;		/* data available in socket */
diff --git a/src/backend/port/win32/socket.c b/src/backend/port/win32/socket.c
index 76dd6be..dbbd4a3 100644
--- a/src/backend/port/win32/socket.c
+++ b/src/backend/port/win32/socket.c
@@ -14,7 +14,8 @@
 #include "postgres.h"
 
 /*
- * Indicate if pgwin32_recv() should operate in non-blocking mode.
+ * Indicate if pgwin32_recv() and pgwin32_send() should operate
+ * in non-blocking mode.
  *
  * Since the socket emulation layer always sets the actual socket to
  * non-blocking mode in order to be able to deliver signals, we must
@@ -399,6 +400,16 @@ pgwin32_send(SOCKET s, char *buf, int len, int flags)
 			return -1;
 		}
 
+		if (pgwin32_noblock)
+		{
+			/*
+			 * No data sent, and we are in "emulated non-blocking mode", so
+			 * return indicating that we'd block if we were to continue.
+			 */
+			errno = EWOULDBLOCK;
+			return -1;
+		}
+
 		/* No error, zero bytes (win2000+) or error+WSAEWOULDBLOCK (<=nt4) */
 
 		if (pgwin32_waitforsinglesocket(s, FD_WRITE | FD_CLOSE, INFINITE) == 0)
diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c
index ac20c49..f42cfef 100644
--- a/src/backend/port/win32_latch.c
+++ b/src/backend/port/win32_latch.c
@@ -85,11 +85,12 @@ DisownLatch(volatile Latch *latch)
 bool
 WaitLatch(volatile Latch *latch, long timeout)
 {
-	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
+	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
 }
 
 int
-WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
+WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, bool forRead,
+				  bool forWrite, long timeout)
 {
 	DWORD		rc;
 	HANDLE		events[3];
@@ -103,10 +104,17 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
 	events[0] = latchevent;
 	events[1] = pgwin32_signal_event;
 	numevents = 2;
-	if (sock != PGINVALID_SOCKET)
+	if (sock != PGINVALID_SOCKET && (forRead || forWrite))
 	{
+		int		flags = 0;
+
+		if (forRead)
+			flags |= FD_READ;
+		if (forWrite)
+			flags |= FD_WRITE;
+
 		sockevent = WSACreateEvent();
-		WSAEventSelect(sock, sockevent, FD_READ);
+		WSAEventSelect(sock, sockevent, flags);
 		events[numevents++] = sockevent;
 	}
 
@@ -139,8 +147,18 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
 			pgwin32_dispatch_queued_signals();
 		else if (rc == WAIT_OBJECT_0 + 2)
 		{
+			WSANETWORKEVENTS resEvents;
+
 			Assert(sock != PGINVALID_SOCKET);
-			result = 2;
+
+			ZeroMemory(&resEvents, sizeof(resEvents));
+			if (WSAEnumNetworkEvents(sock, sockevent, &resEvents) == SOCKET_ERROR)
+				ereport(FATAL,
+						(errmsg_internal("failed to enumerate network events: %i", (int) GetLastError())));
+
+			if ((forRead && resEvents.lNetworkEvents & FD_READ) ||
+				(forWrite && resEvents.lNetworkEvents & FD_WRITE))
+				result = 2;
 			break;
 		}
 		else if (rc != WAIT_OBJECT_0)
@@ -148,7 +166,7 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
 	}
 
 	/* Clean up the handle we created for the socket */
-		if (sock != PGINVALID_SOCKET)
+	if (sock != PGINVALID_SOCKET && (forRead || forWrite))
 	{
 		WSAEventSelect(sock, sockevent, 0);
 		WSACloseEvent(sockevent);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f76b5b0..36406d2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -74,6 +74,7 @@ bool		am_walsender = false;		/* Am I a walsender process ? */
 /* User-settable parameters for walsender */
 int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
 int			WalSndDelay = 1000;	/* max sleep time between some actions */
+int			replication_timeout = 60 * 1000;	/* max time to wait for a reply from the standby, in ms; 0 disables */
 
 /*
  * These variables are used similarly to openLogFile/Id/Seg/Off,
@@ -95,6 +96,11 @@ static XLogRecPtr sentPtr = {0, 0};
  */
 static StringInfoData reply_message;
 
+/*
+ * Timestamp of the last receipt of the reply from the standby.
+ */
+static TimestampTz last_reply_timestamp;
+
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
 volatile sig_atomic_t walsender_shutdown_requested = false;
@@ -113,7 +119,7 @@ static int	WalSndLoop(void);
 static void InitWalSnd(void);
 static void WalSndHandshake(void);
 static void WalSndKill(int code, Datum arg);
-static bool XLogSend(char *msgbuf, bool *caughtup);
+static void XLogSend(char *msgbuf, bool *caughtup);
 static void IdentifySystem(void);
 static void StartReplication(StartReplicationCmd * cmd);
 static void ProcessStandbyMessage(void);
@@ -469,6 +475,7 @@ ProcessRepliesIfAny(void)
 {
 	unsigned char firstchar;
 	int			r;
+	int		received = false;
 
 	for (;;)
 	{
@@ -484,7 +491,7 @@ ProcessRepliesIfAny(void)
 		if (r == 0)
 		{
 			/* no data available without blocking */
-			return;
+			break;
 		}
 
 		/* Handle the very limited subset of commands expected in this phase */
@@ -495,6 +502,7 @@ ProcessRepliesIfAny(void)
 				 */
 			case 'd':
 				ProcessStandbyMessage();
+				received = true;
 				break;
 
 				/*
@@ -510,6 +518,12 @@ ProcessRepliesIfAny(void)
 								firstchar)));
 		}
 	}
+	/*
+	 * Save the last reply timestamp if we've received at least
+	 * one reply.
+	 */
+	if (received)
+		last_reply_timestamp = GetCurrentTimestamp();
 }
 
 /*
@@ -688,6 +702,9 @@ WalSndLoop(void)
 	 */
 	initStringInfo(&reply_message);
 
+	/* Initialize the last reply timestamp */
+	last_reply_timestamp = GetCurrentTimestamp();
+
 	/* Loop forever, unless we get an error */
 	for (;;)
 	{
@@ -706,19 +723,6 @@ WalSndLoop(void)
 			SyncRepInitConfig();
 		}
 
-		/*
-		 * When SIGUSR2 arrives, we send all outstanding logs up to the
-		 * shutdown checkpoint record (i.e., the latest record) and exit.
-		 */
-		if (walsender_ready_to_stop)
-		{
-			if (!XLogSend(output_message, &caughtup))
-				break;
-			ProcessRepliesIfAny();
-			if (caughtup)
-				walsender_shutdown_requested = true;
-		}
-
 		/* Normal exit from the walsender is here */
 		if (walsender_shutdown_requested)
 		{
@@ -730,11 +734,13 @@ WalSndLoop(void)
 		}
 
 		/*
-		 * If we had sent all accumulated WAL in last round, nap for the
-		 * configured time before retrying.
+		 * If we don't have any pending data in the output buffer, try to
+		 * send some more.
 		 */
-		if (caughtup)
+		if (!pq_is_send_pending())
 		{
+			XLogSend(output_message, &caughtup);
+
 			/*
 			 * Even if we wrote all the WAL that was available when we started
 			 * sending, more might have arrived while we were sending this
@@ -742,28 +748,79 @@ WalSndLoop(void)
 			 * received any signals from that time. Let's arm the latch
 			 * again, and after that check that we're still up-to-date.
 			 */
-			ResetLatch(&MyWalSnd->latch);
-
-			if (!XLogSend(output_message, &caughtup))
-				break;
-			if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested)
+			if (caughtup && !pq_is_send_pending())
 			{
-				/*
-				 * XXX: We don't really need the periodic wakeups anymore,
-				 * WaitLatchOrSocket should reliably wake up as soon as
-				 * something interesting happens.
-				 */
+				ResetLatch(&MyWalSnd->latch);
 
-				/* Sleep */
-				WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
-								  WalSndDelay * 1000L);
+				XLogSend(output_message, &caughtup);
 			}
 		}
-		else
+
+		/* Flush pending output to the client */
+		if (pq_flush_if_writable() != 0)
+			break;
+
+		/*
+		 * When SIGUSR2 arrives, we send any outstanding logs up to the
+		 * shutdown checkpoint record (i.e., the latest record) and exit.
+		 */
+		if (walsender_ready_to_stop && !pq_is_send_pending())
 		{
-			/* Attempt to send the log once every loop */
-			if (!XLogSend(output_message, &caughtup))
+			XLogSend(output_message, &caughtup);
+			ProcessRepliesIfAny();
+			if (caughtup && !pq_is_send_pending())
+				walsender_shutdown_requested = true;
+		}
+
+		if ((caughtup || pq_is_send_pending()) &&
+			!got_SIGHUP &&
+			!walsender_shutdown_requested)
+		{
+			TimestampTz	finish_time;
+			long		sleeptime;
+
+			/* Reschedule replication timeout */
+			if (replication_timeout > 0)
+			{
+				long		secs;
+				int		usecs;
+
+				finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+														  replication_timeout);
+				TimestampDifference(GetCurrentTimestamp(),
+									finish_time, &secs, &usecs);
+				sleeptime = secs * 1000 + usecs / 1000;
+				if (WalSndDelay < sleeptime)
+					sleeptime = WalSndDelay;
+			}
+			else
+			{
+				/*
+				 * XXX: Without timeout, we don't really need the periodic
+				 * wakeups anymore, WaitLatchOrSocket should reliably wake up
+				 * as soon as something interesting happens.
+				 */
+				sleeptime = WalSndDelay;
+			}
+
+			/* Sleep */
+			WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
+							  true, pq_is_send_pending(),
+							  sleeptime * 1000L);
+
+			/* Check for replication timeout */
+			if (replication_timeout > 0 &&
+				GetCurrentTimestamp() >= finish_time)
+			{
+				/*
+				 * Since typically expiration of replication timeout means
+				 * communication problem, we don't send the error message
+				 * to the standby.
+				 */
+				ereport(COMMERROR,
+						(errmsg("terminating walsender process due to replication timeout")));
 				break;
+			}
 		}
 
 		/*
@@ -993,7 +1050,8 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
 
 /*
  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
- * but not yet sent to the client, and send it.
+ * but not yet sent to the client, and buffer it in the libpq output
+ * buffer.
  *
  * msgbuf is a work area in which the output message is constructed.  It's
  * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
@@ -1001,10 +1059,9 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  *
  * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
  * *caughtup is set to false.
- *
- * Returns true if OK, false if trouble.
+
  */
-static bool
+static void
 XLogSend(char *msgbuf, bool *caughtup)
 {
 	XLogRecPtr	SendRqstPtr;
@@ -1027,7 +1084,7 @@ XLogSend(char *msgbuf, bool *caughtup)
 	if (XLByteLE(SendRqstPtr, sentPtr))
 	{
 		*caughtup = true;
-		return true;
+		return;
 	}
 
 	/*
@@ -1099,11 +1156,7 @@ XLogSend(char *msgbuf, bool *caughtup)
 
 	memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
 
-	pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
-
-	/* Flush pending output to the client */
-	if (pq_flush())
-		return false;
+	pq_putmessage_noblock('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
 
 	sentPtr = endptr;
 
@@ -1127,7 +1180,7 @@ XLogSend(char *msgbuf, bool *caughtup)
 		set_ps_display(activitymsg, false);
 	}
 
-	return true;
+	return;
 }
 
 /* SIGHUP: set flag to re-read config file at next convenient time */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 9ca1329..b49bdae 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1856,6 +1856,16 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"replication_timeout", PGC_SIGHUP, WAL_REPLICATION,
+			gettext_noop("Sets the maximum time to wait for WAL replication."),
+			NULL,
+			GUC_UNIT_MS
+		},
+		&replication_timeout,
+		60 * 1000, 0, INT_MAX, NULL, NULL
+	},
+
+	{
 		{"commit_delay", PGC_USERSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
 						 "flushing WAL to disk."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ed70223..4348185 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -200,6 +200,7 @@
 #wal_sender_delay = 1s		# walsender cycle time, 1-10000 milliseconds
 #wal_keep_segments = 0		# in logfile segments, 16MB each; 0 disables
 #vacuum_defer_cleanup_age = 0	# number of xacts by which cleanup is delayed
+#replication_timeout = 60s	# in milliseconds; 0 disables
 
 # - Standby Servers -
 
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 8ecab6d..b20b0c2 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -60,7 +60,10 @@ extern int	pq_peekbyte(void);
 extern int	pq_getbyte_if_available(unsigned char *c);
 extern int	pq_putbytes(const char *s, size_t len);
 extern int	pq_flush(void);
+extern int	pq_flush_if_writable(void);
+extern bool	pq_is_send_pending(void);
 extern int	pq_putmessage(char msgtype, const char *s, size_t len);
+extern int	pq_putmessage_noblock(char msgtype, const char *s, size_t len);
 extern void pq_startcopyout(void);
 extern void pq_endcopyout(bool errorAbort);
 
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 150a71f..2670a2e 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -98,6 +98,7 @@ extern volatile sig_atomic_t walsender_ready_to_stop;
 /* user-settable parameters */
 extern int	WalSndDelay;
 extern int	max_wal_senders;
+extern int	replication_timeout;
 
 extern int	WalSenderMain(void);
 extern void WalSndSignals(void);
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 31744ff..f64e13b 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -40,7 +40,7 @@ extern void OwnLatch(volatile Latch *latch);
 extern void DisownLatch(volatile Latch *latch);
 extern bool WaitLatch(volatile Latch *latch, long timeout);
 extern int	WaitLatchOrSocket(volatile Latch *latch, pgsocket sock,
-				  long timeout);
+				  bool forRead, bool forWrite, long timeout);
 extern void SetLatch(volatile Latch *latch);
 extern void ResetLatch(volatile Latch *latch);
 #define TestLatch(latch) (((volatile Latch *) latch)->is_set)
