*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 2236,2245 **** include 'filename'
         </listitem>
        </varlistentry>
  
!      <varlistentry id="guc-replication-timeout" xreflabel="replication_timeout">
!       <term><varname>replication_timeout</varname> (<type>integer</type>)</term>
        <indexterm>
!        <primary><varname>replication_timeout</> configuration parameter</primary>
        </indexterm>
        <listitem>
         <para>
--- 2236,2245 ----
         </listitem>
        </varlistentry>
  
!      <varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout">
!       <term><varname>wal_sender_timeout</varname> (<type>integer</type>)</term>
        <indexterm>
!        <primary><varname>wal_sender_timeout</> configuration parameter</primary>
        </indexterm>
        <listitem>
         <para>
***************
*** 2251,2262 **** include 'filename'
          the <filename>postgresql.conf</> file or on the server command line.
          The default value is 60 seconds.
         </para>
-        <para>
-         To prevent connections from being terminated prematurely,
-         <xref linkend="guc-wal-receiver-status-interval">
-         must be enabled on the standby, and its value must be less than the
-         value of <varname>replication_timeout</>.
-        </para>
        </listitem>
       </varlistentry>
  
--- 2251,2256 ----
***************
*** 2474,2484 **** include 'filename'
         the <filename>postgresql.conf</> file or on the server command line.
         The default value is 10 seconds.
        </para>
-       <para>
-        When <xref linkend="guc-replication-timeout"> is enabled on a sending server,
-        <varname>wal_receiver_status_interval</> must be enabled, and its value
-        must be less than the value of <varname>replication_timeout</>.
-       </para>
        </listitem>
       </varlistentry>
  
--- 2468,2473 ----
***************
*** 2507,2512 **** include 'filename'
--- 2496,2520 ----
        </listitem>
       </varlistentry>
  
+      <varlistentry id="guc-wal-receiver-timeout" xreflabel="wal_receiver_timeout">
+       <term><varname>wal_receiver_timeout</varname> (<type>integer</type>)</term>
+       <indexterm>
+        <primary><varname>wal_receiver_timeout</> configuration parameter</primary>
+       </indexterm>
+       <listitem>
+        <para>
+         Terminate replication connections that are inactive longer
+         than the specified number of milliseconds. This is useful for
+         the receiving standby server to detect a primary node crash or network
+         outage.
+         A value of zero disables the timeout mechanism.  This parameter
+         can only be set in
+         the <filename>postgresql.conf</> file or on the server command line.
+         The default value is 60 seconds.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
       </variablelist>
      </sect2>
     </sect1>
*** a/doc/src/sgml/release-9.1.sgml
--- b/doc/src/sgml/release-9.1.sgml
***************
*** 3322,3328 ****
       <listitem>
        <para>
         Add
!        <link linkend="guc-replication-timeout"><varname>replication_timeout</></link>
         setting (Fujii Masao, Heikki Linnakangas)
        </para>
  
--- 3322,3328 ----
       <listitem>
        <para>
         Add
!        <varname>replication_timeout</>
         setting (Fujii Masao, Heikki Linnakangas)
        </para>
  
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 55,60 ****
--- 55,61 ----
  
  /* GUC variables */
  int			wal_receiver_status_interval;
+ int			wal_receiver_timeout;
  bool		hot_standby_feedback;
  
  /* libpqreceiver hooks to these when loaded */
***************
*** 121,127 **** static void WalRcvDie(int code, Datum arg);
  static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
  static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
  static void XLogWalRcvFlush(bool dying);
! static void XLogWalRcvSendReply(void);
  static void XLogWalRcvSendHSFeedback(void);
  static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
  
--- 122,128 ----
  static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
  static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
  static void XLogWalRcvFlush(bool dying);
! static void XLogWalRcvSendReply(bool force, bool requestReply);
  static void XLogWalRcvSendHSFeedback(void);
  static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
  
***************
*** 170,178 **** WalReceiverMain(void)
  {
  	char		conninfo[MAXCONNINFO];
  	XLogRecPtr	startpoint;
- 
  	/* use volatile pointer to prevent code rearrangement */
  	volatile WalRcvData *walrcv = WalRcv;
  
  	/*
  	 * WalRcv should be set up already (if we are a backend, we inherit this
--- 171,180 ----
  {
  	char		conninfo[MAXCONNINFO];
  	XLogRecPtr	startpoint;
  	/* use volatile pointer to prevent code rearrangement */
  	volatile WalRcvData *walrcv = WalRcv;
+ 	TimestampTz last_recv_timestamp;
+ 	bool		ping_sent;
  
  	/*
  	 * WalRcv should be set up already (if we are a backend, we inherit this
***************
*** 282,287 **** WalReceiverMain(void)
--- 284,293 ----
  	MemSet(&reply_message, 0, sizeof(reply_message));
  	MemSet(&feedback_message, 0, sizeof(feedback_message));
  
+ 	/* Initialize the last recv timestamp */
+ 	last_recv_timestamp = GetCurrentTimestamp();
+ 	ping_sent = false;
+ 
  	/* Loop until end-of-streaming or error */
  	for (;;)
  	{
***************
*** 316,330 **** WalReceiverMain(void)
  		/* Wait a while for data to arrive */
  		if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
  		{
  			/* Accept the received data, and process it */
  			XLogWalRcvProcessMsg(type, buf, len);
  
  			/* Receive any more data we can without sleeping */
  			while (walrcv_receive(0, &type, &buf, &len))
  				XLogWalRcvProcessMsg(type, buf, len);
  
  			/* Let the master know that we received some data. */
! 			XLogWalRcvSendReply();
  
  			/*
  			 * If we've written some records, flush them to disk and let the
--- 322,344 ----
  		/* Wait a while for data to arrive */
  		if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
  		{
+ 			/* Something was received from master, so reset timeout */
+ 			last_recv_timestamp = GetCurrentTimestamp();
+ 			ping_sent = false;
+ 
  			/* Accept the received data, and process it */
  			XLogWalRcvProcessMsg(type, buf, len);
  
  			/* Receive any more data we can without sleeping */
  			while (walrcv_receive(0, &type, &buf, &len))
+ 			{
+ 				last_recv_timestamp = GetCurrentTimestamp();
+ 				ping_sent = false;
  				XLogWalRcvProcessMsg(type, buf, len);
+ 			}
  
  			/* Let the master know that we received some data. */
! 			XLogWalRcvSendReply(false, false);
  
  			/*
  			 * If we've written some records, flush them to disk and let the
***************
*** 335,344 **** WalReceiverMain(void)
  		else
  		{
  			/*
! 			 * We didn't receive anything new, but send a status update to the
! 			 * master anyway, to report any progress in applying WAL.
  			 */
! 			XLogWalRcvSendReply();
  			XLogWalRcvSendHSFeedback();
  		}
  	}
--- 349,396 ----
  		else
  		{
  			/*
! 			 * We didn't receive anything new. If we haven't heard anything
! 			 * from the server for more than wal_receiver_timeout / 2,
! 			 * ping the server. Also, if it's been longer than
! 			 * wal_receiver_status_interval since the last update we sent,
! 			 * send a status update to the master anyway, to report any
! 			 * progress in applying WAL.
! 			 */
! 			bool requestReply = false;
! 
! 			/*
! 			 * Check if time since last receive from master has reached the
! 			 * configured limit.
  			 */
! 			if (wal_receiver_timeout > 0)
! 			{
! 				TimestampTz now = GetCurrentTimestamp();
! 				TimestampTz timeout;
! 
! 				timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
! 													  wal_receiver_timeout);
! 
! 				if (now >= timeout)
! 					ereport(ERROR,
! 							(errmsg("terminating walreceiver due to timeout")));
! 
! 				/*
! 				 * We haven't received anything new for half of
! 				 * wal_receiver_timeout. Ping the server.
! 				 */
! 				if (!ping_sent)
! 				{
! 					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
! 														  (wal_receiver_timeout/2));
! 					if (now >= timeout)
! 					{
! 						requestReply = true;
! 						ping_sent = true;
! 					}
! 				}
! 			}
! 
! 			XLogWalRcvSendReply(requestReply, requestReply);
  			XLogWalRcvSendHSFeedback();
  		}
  	}
***************
*** 460,465 **** XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
--- 512,521 ----
  				memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage));
  
  				ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime);
+ 
+ 				/* If the primary requested a reply, send one immediately */
+ 				if (keepalive.replyRequested)
+ 					XLogWalRcvSendReply(true, false);
  				break;
  			}
  		default:
***************
*** 609,627 **** XLogWalRcvFlush(bool dying)
  
  		/* Also let the master know that we made some progress */
  		if (!dying)
! 		{
! 			XLogWalRcvSendReply();
! 			XLogWalRcvSendHSFeedback();
! 		}
  	}
  }
  
  /*
!  * Send reply message to primary, indicating our current XLOG positions and
!  * the current time.
   */
  static void
! XLogWalRcvSendReply(void)
  {
  	char		buf[sizeof(StandbyReplyMessage) + 1];
  	TimestampTz now;
--- 665,688 ----
  
  		/* Also let the master know that we made some progress */
  		if (!dying)
! 			XLogWalRcvSendReply(false, false);
  	}
  }
  
  /*
!  * Send reply message to primary, indicating our current XLOG positions, oldest
!  * xmin and the current time.
!  *
!  * If 'force' is not true, the message is not sent unless enough time has
!  * passed since last status update to reach wal_receiver_status_interval (and
!  * never if wal_receiver_status_interval is disabled altogether).
!  *
!  * If 'requestReply' is true, requests the server to reply immediately upon
!  * receiving this message. This is used for heartbeats, when approaching
!  * wal_receiver_timeout.
   */
  static void
! XLogWalRcvSendReply(bool force, bool requestReply)
  {
  	char		buf[sizeof(StandbyReplyMessage) + 1];
  	TimestampTz now;
***************
*** 630,636 **** XLogWalRcvSendReply(void)
  	 * If the user doesn't want status to be reported to the master, be sure
  	 * to exit before doing anything at all.
  	 */
! 	if (wal_receiver_status_interval <= 0)
  		return;
  
  	/* Get current timestamp. */
--- 691,697 ----
  	 * If the user doesn't want status to be reported to the master, be sure
  	 * to exit before doing anything at all.
  	 */
! 	if (!force && wal_receiver_status_interval <= 0)
  		return;
  
  	/* Get current timestamp. */
***************
*** 645,651 **** XLogWalRcvSendReply(void)
  	 * this is only for reporting purposes and only on idle systems, that's
  	 * probably OK.
  	 */
! 	if (XLByteEQ(reply_message.write, LogstreamResult.Write)
  		&& XLByteEQ(reply_message.flush, LogstreamResult.Flush)
  		&& !TimestampDifferenceExceeds(reply_message.sendTime, now,
  									   wal_receiver_status_interval * 1000))
--- 706,713 ----
  	 * this is only for reporting purposes and only on idle systems, that's
  	 * probably OK.
  	 */
! 	if (!force
! 		&& XLByteEQ(reply_message.write, LogstreamResult.Write)
  		&& XLByteEQ(reply_message.flush, LogstreamResult.Flush)
  		&& !TimestampDifferenceExceeds(reply_message.sendTime, now,
  									   wal_receiver_status_interval * 1000))
***************
*** 656,661 **** XLogWalRcvSendReply(void)
--- 718,724 ----
  	reply_message.flush = LogstreamResult.Flush;
  	reply_message.apply = GetXLogReplayRecPtr(NULL);
  	reply_message.sendTime = now;
+ 	reply_message.replyRequested = requestReply;
  
  	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
  		 (uint32) (reply_message.write >> 32), (uint32) reply_message.write,
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 82,88 **** static bool	replication_started = false; /* Started streaming yet? */
  
  /* User-settable parameters for walsender */
  int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
! int			replication_timeout = 60 * 1000;	/* maximum time to send one
  												 * WAL data message */
  /*
   * State for WalSndWakeupRequest
--- 82,88 ----
  
  /* User-settable parameters for walsender */
  int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
! int			wal_sender_timeout = 60 * 1000;	/* maximum time to send one
  												 * WAL data message */
  /*
   * State for WalSndWakeupRequest
***************
*** 103,117 **** static uint32 sendOff = 0;
   */
  static XLogRecPtr sentPtr = 0;
  
  /*
!  * Buffer for processing reply messages.
   */
! static StringInfoData reply_message;
  
  /*
   * Timestamp of the last receipt of the reply from the standby.
   */
  static TimestampTz last_reply_timestamp;
  
  /* Flags set by signal handlers for later service in main loop */
  static volatile sig_atomic_t got_SIGHUP = false;
--- 103,122 ----
   */
  static XLogRecPtr sentPtr = 0;
  
+ /* Buffer for processing reply messages. */
+ static StringInfoData reply_message;
  /*
!  * Buffer for constructing outgoing messages
!  * (1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE bytes)
   */
! static char *output_message;
  
  /*
   * Timestamp of the last receipt of the reply from the standby.
   */
  static TimestampTz last_reply_timestamp;
+ /* Have we sent a heartbeat message asking for reply, since last reply? */
+ static bool	ping_sent = false;
  
  /* Flags set by signal handlers for later service in main loop */
  static volatile sig_atomic_t got_SIGHUP = false;
***************
*** 126,139 **** static void WalSndLastCycleHandler(SIGNAL_ARGS);
  static void WalSndLoop(void) __attribute__((noreturn));
  static void InitWalSenderSlot(void);
  static void WalSndKill(int code, Datum arg);
! static void XLogSend(char *msgbuf, bool *caughtup);
  static void IdentifySystem(void);
  static void StartReplication(StartReplicationCmd *cmd);
  static void ProcessStandbyMessage(void);
  static void ProcessStandbyReplyMessage(void);
  static void ProcessStandbyHSFeedbackMessage(void);
  static void ProcessRepliesIfAny(void);
! static void WalSndKeepalive(char *msgbuf);
  
  
  /* Initialize walsender process before entering the main command loop */
--- 131,144 ----
  static void WalSndLoop(void) __attribute__((noreturn));
  static void InitWalSenderSlot(void);
  static void WalSndKill(int code, Datum arg);
! static void XLogSend(bool *caughtup);
  static void IdentifySystem(void);
  static void StartReplication(StartReplicationCmd *cmd);
  static void ProcessStandbyMessage(void);
  static void ProcessStandbyReplyMessage(void);
  static void ProcessStandbyHSFeedbackMessage(void);
  static void ProcessRepliesIfAny(void);
! static void WalSndKeepalive(bool requestReply);
  
  
  /* Initialize walsender process before entering the main command loop */
***************
*** 465,471 **** ProcessRepliesIfAny(void)
--- 470,479 ----
  	 * Save the last reply timestamp if we've received at least one reply.
  	 */
  	if (received)
+ 	{
  		last_reply_timestamp = GetCurrentTimestamp();
+ 		ping_sent = false;
+ 	}
  }
  
  /*
***************
*** 527,532 **** ProcessStandbyReplyMessage(void)
--- 535,544 ----
  		 (uint32) (reply.flush >> 32), (uint32) reply.flush,
  		 (uint32) (reply.apply >> 32), (uint32) reply.apply);
  
+ 	/* Send a reply if the standby requested one. */
+ 	if (reply.replyRequested)
+ 		WalSndKeepalive(false);
+ 
  	/*
  	 * Update shared state for this WalSender process based on reply data from
  	 * standby.
***************
*** 620,626 **** ProcessStandbyHSFeedbackMessage(void)
  static void
  WalSndLoop(void)
  {
- 	char	   *output_message;
  	bool		caughtup = false;
  
  	/*
--- 632,637 ----
***************
*** 638,643 **** WalSndLoop(void)
--- 649,655 ----
  
  	/* Initialize the last reply timestamp */
  	last_reply_timestamp = GetCurrentTimestamp();
+ 	ping_sent = false;
  
  	/* Loop forever, unless we get an error */
  	for (;;)
***************
*** 672,678 **** WalSndLoop(void)
  		 * caught up.
  		 */
  		if (!pq_is_send_pending())
! 			XLogSend(output_message, &caughtup);
  		else
  			caughtup = false;
  
--- 684,690 ----
  		 * caught up.
  		 */
  		if (!pq_is_send_pending())
! 			XLogSend(&caughtup);
  		else
  			caughtup = false;
  
***************
*** 708,714 **** WalSndLoop(void)
  			if (walsender_ready_to_stop)
  			{
  				/* ... let's just be real sure we're caught up ... */
! 				XLogSend(output_message, &caughtup);
  				if (caughtup && !pq_is_send_pending())
  				{
  					/* Inform the standby that XLOG streaming is done */
--- 720,726 ----
  			if (walsender_ready_to_stop)
  			{
  				/* ... let's just be real sure we're caught up ... */
! 				XLogSend(&caughtup);
  				if (caughtup && !pq_is_send_pending())
  				{
  					/* Inform the standby that XLOG streaming is done */
***************
*** 738,760 **** WalSndLoop(void)
  
  			if (pq_is_send_pending())
  				wakeEvents |= WL_SOCKET_WRITEABLE;
! 			else if (MyWalSnd->sendKeepalive)
  			{
! 				WalSndKeepalive(output_message);
! 				/* Try to flush pending output to the client */
! 				if (pq_flush_if_writable() != 0)
! 					break;
  			}
  
  			/* Determine time until replication timeout */
! 			if (replication_timeout > 0)
  			{
  				timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
! 													  replication_timeout);
! 				sleeptime = 1 + (replication_timeout / 10);
  			}
  
! 			/* Sleep until something happens or replication timeout */
  			ImmediateInterruptOK = true;
  			CHECK_FOR_INTERRUPTS();
  			WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
--- 750,783 ----
  
  			if (pq_is_send_pending())
  				wakeEvents |= WL_SOCKET_WRITEABLE;
! 			else if (wal_sender_timeout > 0 && !ping_sent)
  			{
! 				/*
! 				 * If half of wal_sender_timeout has lapsed without receiving
! 				 * any reply from standby, send a keep-alive message to standby
! 				 * requesting an immediate reply.
! 				 */
! 				timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
! 													  wal_sender_timeout / 2);
! 				if (GetCurrentTimestamp() >= timeout)
! 				{
! 					WalSndKeepalive(true);
! 					ping_sent = true;
! 					/* Try to flush pending output to the client */
! 					if (pq_flush_if_writable() != 0)
! 						break;
! 				}
  			}
  
  			/* Determine time until replication timeout */
! 			if (wal_sender_timeout > 0)
  			{
  				timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
! 													  wal_sender_timeout);
! 				sleeptime = 1 + (wal_sender_timeout / 10);
  			}
  
! 			/* Sleep until something happens or we time out */
  			ImmediateInterruptOK = true;
  			CHECK_FOR_INTERRUPTS();
  			WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
***************
*** 766,773 **** WalSndLoop(void)
  			 * possibility that the client replied just as we reached the
  			 * timeout ... he's supposed to reply *before* that.
  			 */
! 			if (replication_timeout > 0 &&
! 				GetCurrentTimestamp() >= timeout)
  			{
  				/*
  				 * Since typically expiration of replication timeout means
--- 789,795 ----
  			 * possibility that the client replied just as we reached the
  			 * timeout ... he's supposed to reply *before* that.
  			 */
! 			if (wal_sender_timeout > 0 && GetCurrentTimestamp() >= timeout)
  			{
  				/*
  				 * Since typically expiration of replication timeout means
***************
*** 1016,1030 **** retry:
   * but not yet sent to the client, and buffer it in the libpq output
   * buffer.
   *
-  * msgbuf is a work area in which the output message is constructed.  It's
-  * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
-  * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
-  *
   * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
   * *caughtup is set to false.
   */
  static void
! XLogSend(char *msgbuf, bool *caughtup)
  {
  	XLogRecPtr	SendRqstPtr;
  	XLogRecPtr	startptr;
--- 1038,1048 ----
   * but not yet sent to the client, and buffer it in the libpq output
   * buffer.
   *
   * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
   * *caughtup is set to false.
   */
  static void
! XLogSend(bool *caughtup)
  {
  	XLogRecPtr	SendRqstPtr;
  	XLogRecPtr	startptr;
***************
*** 1107,1119 **** XLogSend(char *msgbuf, bool *caughtup)
  	/*
  	 * OK to read and send the slice.
  	 */
! 	msgbuf[0] = 'w';
  
  	/*
  	 * Read the log directly into the output buffer to avoid extra memcpy
  	 * calls.
  	 */
! 	XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
  
  	/*
  	 * We fill the message header last so that the send timestamp is taken as
--- 1125,1137 ----
  	/*
  	 * OK to read and send the slice.
  	 */
! 	output_message[0] = 'w';
  
  	/*
  	 * Read the log directly into the output buffer to avoid extra memcpy
  	 * calls.
  	 */
! 	XLogRead(output_message + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
  
  	/*
  	 * We fill the message header last so that the send timestamp is taken as
***************
*** 1123,1131 **** XLogSend(char *msgbuf, bool *caughtup)
  	msghdr.walEnd = SendRqstPtr;
  	msghdr.sendTime = GetCurrentTimestamp();
  
! 	memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
  
! 	pq_putmessage_noblock('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
  
  	sentPtr = endptr;
  
--- 1141,1149 ----
  	msghdr.walEnd = SendRqstPtr;
  	msghdr.sendTime = GetCurrentTimestamp();
  
! 	memcpy(output_message + 1, &msghdr, sizeof(WalDataMessageHeader));
  
! 	pq_putmessage_noblock('d', output_message, 1 + sizeof(WalDataMessageHeader) + nbytes);
  
  	sentPtr = endptr;
  
***************
*** 1492,1512 **** pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
  	return (Datum) 0;
  }
  
  static void
! WalSndKeepalive(char *msgbuf)
  {
  	PrimaryKeepaliveMessage keepalive_message;
  
  	/* Construct a new message */
  	keepalive_message.walEnd = sentPtr;
  	keepalive_message.sendTime = GetCurrentTimestamp();
  
  	elog(DEBUG2, "sending replication keepalive");
  
  	/* Prepend with the message type and send it. */
! 	msgbuf[0] = 'k';
! 	memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
! 	pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1);
  }
  
  /*
--- 1510,1536 ----
  	return (Datum) 0;
  }
  
+ /*
+  * Send a keepalive message to the standby.
+  * If requestReply is set, sets a flag in the message requesting the standby
+  * to send a message back to us, for heartbeat purposes.
+  */
  static void
! WalSndKeepalive(bool requestReply)
  {
  	PrimaryKeepaliveMessage keepalive_message;
  
  	/* Construct a new message */
  	keepalive_message.walEnd = sentPtr;
  	keepalive_message.sendTime = GetCurrentTimestamp();
+ 	keepalive_message.replyRequested = requestReply;
  
  	elog(DEBUG2, "sending replication keepalive");
  
  	/* Prepend with the message type and send it. */
! 	output_message[0] = 'k';
! 	memcpy(output_message + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
! 	pq_putmessage_noblock('d', output_message, sizeof(PrimaryKeepaliveMessage) + 1);
  }
  
  /*
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 1596,1601 **** static struct config_int ConfigureNamesInt[] =
--- 1596,1612 ----
  	},
  
  	{
+ 		{"wal_receiver_timeout", PGC_SIGHUP, REPLICATION_STANDBY,
+ 			gettext_noop("Sets the maximum wait time to receive data from master."),
+ 			NULL,
+ 			GUC_UNIT_MS
+ 		},
+ 		&wal_receiver_timeout,
+ 		60 * 1000, 0, INT_MAX,
+ 		NULL, NULL, NULL
+ 	},
+ 
+ 	{
  		{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
  			gettext_noop("Sets the maximum number of concurrent connections."),
  			NULL
***************
*** 2019,2030 **** static struct config_int ConfigureNamesInt[] =
  	},
  
  	{
! 		{"replication_timeout", PGC_SIGHUP, REPLICATION_SENDING,
  			gettext_noop("Sets the maximum time to wait for WAL replication."),
  			NULL,
  			GUC_UNIT_MS
  		},
! 		&replication_timeout,
  		60 * 1000, 0, INT_MAX,
  		NULL, NULL, NULL
  	},
--- 2030,2041 ----
  	},
  
  	{
! 		{"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING,
  			gettext_noop("Sets the maximum time to wait for WAL replication."),
  			NULL,
  			GUC_UNIT_MS
  		},
! 		&wal_sender_timeout,
  		60 * 1000, 0, INT_MAX,
  		NULL, NULL, NULL
  	},
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 210,216 ****
  #max_wal_senders = 0		# max number of walsender processes
  				# (change requires restart)
  #wal_keep_segments = 0		# in logfile segments, 16MB each; 0 disables
! #replication_timeout = 60s	# in milliseconds; 0 disables
  
  # - Master Server -
  
--- 210,216 ----
  #max_wal_senders = 0		# max number of walsender processes
  				# (change requires restart)
  #wal_keep_segments = 0		# in logfile segments, 16MB each; 0 disables
! #wal_sender_timeout = 60s	# in milliseconds; 0 disables
  
  # - Master Server -
  
***************
*** 237,242 ****
--- 237,245 ----
  					# 0 disables
  #hot_standby_feedback = off		# send info from standby to prevent
  					# query conflicts
+ #wal_receiver_timeout = 60s		# time that receiver waits for
+ 					# communication from master
+ 					# in milliseconds; 0 disables
  
  
  #------------------------------------------------------------------------------
*** a/src/include/replication/walprotocol.h
--- b/src/include/replication/walprotocol.h
***************
*** 27,32 **** typedef struct
--- 27,38 ----
  
  	/* Sender's system clock at the time of transmission */
  	TimestampTz sendTime;
+ 
+ 	/*
+ 	 * If replyRequested is set, the client should reply immediately to this
+ 	 * message, to avoid a timeout disconnect.
+ 	 */
+ 	bool		replyRequested;
  } WalSndrMessage;
  
  
***************
*** 80,85 **** typedef struct
--- 86,97 ----
  
  	/* Sender's system clock at the time of transmission */
  	TimestampTz sendTime;
+ 
+ 	/*
+ 	 * If replyRequested is set, the server should reply immediately to this
+ 	 * message, to avoid a timeout disconnect.
+ 	 */
+ 	bool		replyRequested;
  } StandbyReplyMessage;
  
  /*
*** a/src/include/replication/walreceiver.h
--- b/src/include/replication/walreceiver.h
***************
*** 17,23 ****
--- 17,25 ----
  #include "storage/spin.h"
  #include "pgtime.h"
  
+ /* user-settable parameters */
  extern int	wal_receiver_status_interval;
+ extern int	wal_receiver_timeout;
  extern bool hot_standby_feedback;
  
  /*
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 24,30 **** extern bool wake_wal_senders;
  
  /* user-settable parameters */
  extern int	max_wal_senders;
! extern int	replication_timeout;
  
  extern void InitWalSender(void);
  extern void exec_replication_command(const char *query_string);
--- 24,30 ----
  
  /* user-settable parameters */
  extern int	max_wal_senders;
! extern int	wal_sender_timeout;
  
  extern void InitWalSender(void);
  extern void exec_replication_command(const char *query_string);
*** a/src/include/replication/walsender_private.h
--- b/src/include/replication/walsender_private.h
***************
*** 37,43 **** typedef struct WalSnd
  	XLogRecPtr	sentPtr;		/* WAL has been sent up to this point */
  	bool		needreload;		/* does currently-open file need to be
  								 * reloaded? */
- 	bool		sendKeepalive;	/* do we send keepalives on this connection? */
  
  	/*
  	 * The xlog locations that have been written, flushed, and applied by
--- 37,42 ----
