diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f3082c3..6b3abc5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -217,7 +217,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static bool XLogRead(char *buf, XLogRecPtr startptr, Size count, bool noutfoundok);
 
 
 /* Initialize walsender process before entering the main command loop */
@@ -774,7 +774,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 		count = flushptr - targetPagePtr;
 
 	/* now actually read the data, we know it's there */
-	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, false);
 
 	return count;
 }
@@ -1551,8 +1551,9 @@ static void
 ProcessStandbyReplyMessage(void)
 {
 	XLogRecPtr	writePtr,
-				flushPtr,
-				applyPtr;
+				flushPtr, oldFlushPtr,
+				applyPtr,
+				keepPtr;
 	bool		replyRequested;
 
 	/* the caller already consumed the msgtype byte */
@@ -1580,24 +1581,99 @@ ProcessStandbyReplyMessage(void)
 		WalSnd	   *walsnd = MyWalSnd;
 
 		SpinLockAcquire(&walsnd->mutex);
+		keepPtr = walsnd->keep;
+		oldFlushPtr = walsnd->flush;
 		walsnd->write = writePtr;
 		walsnd->flush = flushPtr;
 		walsnd->apply = applyPtr;
 		SpinLockRelease(&walsnd->mutex);
 	}
 
+	/*
+	 * If we are managed by a replication slot, maintain keepPtr on the page
+	 * where the first fragment of the continuation record at flushPtr. Since
+	 * this doesn't look into individual record, keepPtr may stay a bit too
+	 * behind.
+	 */
+	if (MyReplicationSlot &&
+		flushPtr != InvalidXLogRecPtr && oldFlushPtr != InvalidXLogRecPtr)
+	{
+		/*
+		 * If keepPtr is cathing up, we do nothing until the next segment
+		 * comes. Otherwise check on every page boundary.
+		 */
+		if (oldKeepPtr == InvalidXLogRecPtr ?
+			keepPtr / XLOG_SEG_SIZE != flushPtr / XLOG_SEG_SIZE :
+			keepPtr / XLOG_BLCKSZ != flushPtr / XLOG_BLCKSZ)
+		{
+			XLogRecPtr oldKeepPtr = keepPtr;
+			XLogRecPtr rp;
+
+			if (keepPtr == InvalidXLogRecPtr)
+				keepPtr = oldFlushPtr;
+
+			rp = keepPtr - (keepPtr % XLOG_BLCKSZ);
+
+			/* We may have the record at flushPtr, so it's worth looking */
+			while (rp <= flushPtr)
+			{
+				XLogPageHeaderData header;
+
+				/*
+				 * If we don't have enough wal data, don't move keepPtr
+				 * forward. We may read it by the next chance.
+				 */
+				if(sentPtr - rp >= sizeof(XLogPageHeaderData))
+				{
+					bool found;
+					/*
+					 * Fetch the page header of the next page. Move keepPtr
+					 * forward only if when it is not a continuing page.
+					 */
+					found = XLogRead((char *)&header,
+									 rp, sizeof(XLogPageHeaderData), true);
+					if (found &&
+						(header.xlp_info & XLP_FIRST_IS_CONTRECORD) == 0)
+						keepPtr = rp;
+				}
+				rp += XLOG_BLCKSZ;
+			}
+
+			/*
+			 * If keepPtr is on the same page with flushPtr, it means catching
+			 * up
+			 */
+			if (keepPtr / XLOG_BLCKSZ == flushPtr / XLOG_BLCKSZ)
+				keepPtr = InvalidXLogRecPtr;
+
+			if (oldKeepPtr != keepPtr)
+			{
+				WalSnd	   *walsnd = MyWalSnd;
+				elog(LOG, "%lX => %lX / %lX", oldKeepPtr, keepPtr, flushPtr); 
+				SpinLockAcquire(&walsnd->mutex);
+				walsnd->keep = keepPtr;
+				SpinLockRelease(&walsnd->mutex);
+			}
+		}
+	}
+
 	if (!am_cascading_walsender)
 		SyncRepReleaseWaiters();
 
 	/*
 	 * Advance our local xmin horizon when the client confirmed a flush.
 	 */
-	if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
+	if (MyReplicationSlot)
 	{
-		if (SlotIsLogical(MyReplicationSlot))
+		if (SlotIsLogical(MyReplicationSlot) && flushPtr != InvalidXLogRecPtr)
 			LogicalConfirmReceivedLocation(flushPtr);
 		else
-			PhysicalConfirmReceivedLocation(flushPtr);
+		{
+			/* keepPtr == InvalidXLogRecPtr means catching up */
+			if (keepPtr == InvalidXLogRecPtr)
+				keepPtr = flushPtr;
+			PhysicalConfirmReceivedLocation(keepPtr);
+		}
 	}
 }
 
@@ -2019,6 +2095,7 @@ WalSndKill(int code, Datum arg)
 
 /*
  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
+ * Returns false if the segment file is not found iff notfoundok is true.
  *
  * XXX probably this should be improved to suck data directly from the
  * WAL buffers when possible.
@@ -2028,8 +2105,8 @@ WalSndKill(int code, Datum arg)
  * always be one descriptor left open until the process ends, but never
  * more than one.
  */
-static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+static bool
+XLogRead(char *buf, XLogRecPtr startptr, Size count, bool notfoundok)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
@@ -2106,10 +2183,15 @@ retry:
 				 * removed or recycled.
 				 */
 				if (errno == ENOENT)
+				{
+					if (notfoundok)
+						return false;
+
 					ereport(ERROR,
 							(errcode_for_file_access(),
 							 errmsg("requested WAL segment %s has already been removed",
 								XLogFileNameP(curFileTimeLine, sendSegNo))));
+				}
 				else
 					ereport(ERROR,
 							(errcode_for_file_access(),
@@ -2189,6 +2271,8 @@ retry:
 			goto retry;
 		}
 	}
+
+	return true;
 }
 
 /*
@@ -2393,7 +2477,7 @@ XLogSendPhysical(void)
 	 * calls.
 	 */
 	enlargeStringInfo(&output_message, nbytes);
-	XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+	XLogRead(&output_message.data[output_message.len], startptr, nbytes, false);
 	output_message.len += nbytes;
 	output_message.data[output_message.len] = '\0';
 
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 5e6ccfc..084146d 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -47,6 +47,13 @@ typedef struct WalSnd
 	XLogRecPtr	flush;
 	XLogRecPtr	apply;
 
+	/*
+	 * Segment-spanning continuation records requires that the all related
+	 * segments preserved. This holds how far we should preserve older
+	 * segments only when it differs to flush location.
+	 */
+	XLogRecPtr	keep;
+
 	/* Protects shared variables shown above. */
 	slock_t		mutex;
 
