commit c3222ed9ef49bbe00af4b75362c59622834bea8d
Author: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date:   Tue May 7 15:37:02 2013 +0300

    Fix walsender failure at promotion.
    
    If a standby server has a cascading standby server connected to it, it's
    possible that WAL has already been sent up to the next WAL page boundary,
    splitting a WAL record in the middle, when the first standby server is
    promoted. Don't throw an assertion failure or error in walsender if that
    happens.
    
    Also, adjust the logic in receiving end so that if a timeline appears after
    reading half of a WAL record, WAL streaming is restarted at the end of the
    last completely replayed record. Otherwise the receiving server would get
    stuck trying to find the missing half of the WAL record on the old timeline,
    instead of backing off to the previous record boundary, where it will find
    the timeline switch record on the new timeline. This isn't a perfect fix, as
    the WAL receiver is restarted when that happens. That's OK from a
    correctness point of view, but it establishing a new connection causes a
    small delay. We might want to re-visit that later.
    
    This also fixes a variant of the same bug in pg_receivexlog: if pg_receivexlog
    had already received WAL on previous timeline up to a segment boundary, when
    the upstream standby server is promoted so that the timeline switch record
    falls on the previous segment, pg_receivexlog would miss the segment containing
    the timeline switch. To fix that, have walsender send the position of the
    timeline switch at end-of-streaming, in addition to the next timeline's ID.
    It was previously assumed that the switch happened exactly where the streaming
    stopped.
    
    Reported by Fujii Masao.

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 1e2604b..7016511 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1423,10 +1423,15 @@ The commands accepted in walsender mode are:
      <para>
       After streaming all the WAL on a timeline that is not the latest one,
       the server will end streaming by exiting the COPY mode. When the client
-      acknowledges this by also exiting COPY mode, the server sends a
-      single-row, single-column result set indicating the next timeline in
-      this server's history. That is followed by a CommandComplete message,
-      and the server is ready to accept a new command.
+      acknowledges this by also exiting COPY mode, the server sends a result
+      set with one row and two columns, indicating the next timeline in this
+      server's history. The first column is the next timeline's ID, and the
+      second column is the XLOG position where the switch happened. Usually,
+      the switch position is the end of the WAL that was streamed, but there
+      are corner cases where the server can send some WAL from the old
+      timeline that it has not itself replayed before promoting. Finally, the
+      server sends CommandComplete message, and is ready to accept a new
+      command.
      </para>
 
      <para>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 959f423..f7dd61c 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9598,7 +9598,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 						}
 						else
 						{
-							ptr = RecPtr;
+							ptr = tliRecPtr;
 							tli = tliOfPointInHistory(tliRecPtr, expectedTLEs);
 
 							if (curFileTLI > 0 && tli < curFileTLI)
@@ -9607,7 +9607,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 									 tli, curFileTLI);
 						}
 						curFileTLI = tli;
-						RequestXLogStreaming(curFileTLI, ptr, PrimaryConnInfo);
+						RequestXLogStreaming(tli, ptr, PrimaryConnInfo);
+						receivedUpto = 0;
 					}
 					/*
 					 * Move to XLOG_FROM_STREAM state in either case. We'll get
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index e6e670e..f7cc6e3 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -224,8 +224,11 @@ libpqrcv_endstreaming(TimeLineID *next_tli)
 	res = PQgetResult(streamConn);
 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
 	{
-		/* Read the next timeline's ID */
-		if (PQnfields(res) != 1 || PQntuples(res) != 1)
+		/*
+		 * Read the next timeline's ID. The server also sends the timeline's
+		 * starting point, but it is ignored.
+		 */
+		if (PQnfields(res) < 2 || PQntuples(res) != 1)
 			ereport(ERROR,
 					(errmsg("unexpected result set after end-of-streaming")));
 		*next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index d414808..e5ad843 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -260,12 +260,13 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
 	walrcv->startTime = now;
 
 	/*
-	 * If this is the first startup of walreceiver, we initialize receivedUpto
-	 * and latestChunkStart to receiveStart.
+	 * If this is the first startup of walreceiver (on this timeline),
+	 * initialize receivedUpto and latestChunkStart to the starting point.
 	 */
-	if (walrcv->receiveStart == 0)
+	if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
 	{
 		walrcv->receivedUpto = recptr;
+		walrcv->receivedTLI = tli;
 		walrcv->latestChunkStart = recptr;
 	}
 	walrcv->receiveStart = recptr;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c05bb1e..1dcb0f5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -567,16 +567,21 @@ StartReplication(StartReplicationCmd *cmd)
 	 */
 	if (sendTimeLineIsHistoric)
 	{
-		char		str[11];
-		snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI);
+		char		tli_str[11];
+		char		startpos_str[8+1+8+1];
 
-		pq_beginmessage(&buf, 'T'); /* RowDescription */
-		pq_sendint(&buf, 1, 2);		/* 1 field */
+		snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI);
+		snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
+				 (uint32) (sendTimeLineValidUpto >> 32),
+				 (uint32) sendTimeLineValidUpto);
+
+		pq_beginmessage(&buf, 'T'); 	/* RowDescription */
+		pq_sendint(&buf, 2, 2);			/* 2 fields */
 
 		/* Field header */
 		pq_sendstring(&buf, "next_tli");
-		pq_sendint(&buf, 0, 4);		/* table oid */
-		pq_sendint(&buf, 0, 2);		/* attnum */
+		pq_sendint(&buf, 0, 4);			/* table oid */
+		pq_sendint(&buf, 0, 2);			/* attnum */
 		/*
 		 * int8 may seem like a surprising data type for this, but in theory
 		 * int4 would not be wide enough for this, as TimeLineID is unsigned.
@@ -585,13 +590,26 @@ StartReplication(StartReplicationCmd *cmd)
 		pq_sendint(&buf, -1, 2);
 		pq_sendint(&buf, 0, 4);
 		pq_sendint(&buf, 0, 2);
+
+		pq_sendstring(&buf, "next_tli_startpos");
+		pq_sendint(&buf, 0, 4);			/* table oid */
+		pq_sendint(&buf, 0, 2);			/* attnum */
+		pq_sendint(&buf, TEXTOID, 4);	/* type oid */
+		pq_sendint(&buf, -1, 2);
+		pq_sendint(&buf, 0, 4);
+		pq_sendint(&buf, 0, 2);
 		pq_endmessage(&buf);
 
 		/* Data row */
 		pq_beginmessage(&buf, 'D');
-		pq_sendint(&buf, 1, 2);		/* number of columns */
-		pq_sendint(&buf, strlen(str), 4);	/* length */
-		pq_sendbytes(&buf, str, strlen(str));
+		pq_sendint(&buf, 2, 2);			/* number of columns */
+
+		pq_sendint(&buf, strlen(tli_str), 4);	/* length */
+		pq_sendbytes(&buf, tli_str, strlen(tli_str));
+
+		pq_sendint(&buf, strlen(startpos_str), 4);	/* length */
+		pq_sendbytes(&buf, startpos_str, strlen(startpos_str));
+
 		pq_endmessage(&buf);
 	}
 
@@ -1462,19 +1480,10 @@ XLogSend(bool *caughtup)
 
 			history = readTimeLineHistory(ThisTimeLineID);
 			sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
-			Assert(sentPtr <= sendTimeLineValidUpto);
+
 			Assert(sendTimeLine < sendTimeLineNextTLI);
 			list_free_deep(history);
 
-			/* the current send pointer should be <= the switchpoint */
-			if (!(sentPtr <= sendTimeLineValidUpto))
-				elog(ERROR, "server switched off timeline %u at %X/%X, but walsender already streamed up to %X/%X",
-					 sendTimeLine,
-					 (uint32) (sendTimeLineValidUpto >> 32),
-					 (uint32) sendTimeLineValidUpto,
-					 (uint32) (sentPtr >> 32),
-					 (uint32) sentPtr);
-
 			sendTimeLineIsHistoric = true;
 
 			SendRqstPtr = sendTimeLineValidUpto;
@@ -1498,6 +1507,15 @@ XLogSend(bool *caughtup)
 	/*
 	 * If this is a historic timeline and we've reached the point where we
 	 * forked to the next timeline, stop streaming.
+	 *
+	 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
+	 * startup process will normally replay all WAL that has been received from
+	 * the master, before promoting, but if the WAL streaming is terminated at
+	 * a WAL page boundary, the valid portion of the timeline might end in the
+	 * middle of a WAL record. We might've already sent the first half of that
+	 * partial WAL record to the cascading standby, so that sentPtr >
+	 * sendTimeLineValidUpto. That's OK; the cascading standby can't replay the
+	 * partial WAL record either, so it can still follow our timeline switch.
 	 */
 	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
 	{
@@ -1511,6 +1529,10 @@ XLogSend(bool *caughtup)
 		streamingDoneSending = true;
 
 		*caughtup = true;
+
+		elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
+			 (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
+			 (uint32) (sentPtr >> 32), (uint32) sentPtr);
 		return;
 	}
 
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index e4da799..fa0ac51 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -83,10 +83,12 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
 				timeline);
 
 	/*
-	 * Note that we report the previous, not current, position here. That's
-	 * the exact location where the timeline switch happend. After the switch,
-	 * we restart streaming from the beginning of the segment, so xlogpos can
-	 * smaller than prevpos if we just switched to new timeline.
+	 * Note that we report the previous, not current, position here. After a
+	 * timeline switch, xlogpos points to the beginning of the segment because
+	 * that's where we always begin streaming. Reporting the end of previous
+	 * timeline isn't totally accurate, because the next timeline can begin
+	 * slightly before the end of the WAL that we received on the previous
+	 * timeline, but it's close enough for reporting purposes.
 	 */
 	if (prevtimeline != 0 && prevtimeline != timeline)
 		fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"),
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index f297003..98e874f 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -37,6 +37,9 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
 				 stream_stop_callback stream_stop, int standby_message_timeout,
 				 char *partial_suffix, XLogRecPtr *stoppos);
 
+static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
+						 uint32 *timeline);
+
 /*
  * Open a new WAL file in the specified directory.
  *
@@ -627,26 +630,44 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 		 * There are two possible reasons for that: a controlled shutdown,
 		 * or we reached the end of the current timeline. In case of
 		 * end-of-timeline, the server sends a result set after Copy has
-		 * finished, containing the next timeline's ID. Read that, and
-		 * restart streaming from the next timeline.
+		 * finished, containing information about the next timeline. Read
+		 * that, and restart streaming from the next timeline. In case of
+		 * controlled shutdown, stop here.
 		 */
-
 		if (PQresultStatus(res) == PGRES_TUPLES_OK)
 		{
 			/*
-			 * End-of-timeline. Read the next timeline's ID.
+			 * End-of-timeline. Read the next timeline's ID and starting
+			 * position. Usually, the starting position will match the end of
+			 * the previous timeline, but there are corner cases like if the
+			 * server had sent us half of a WAL record, when it was promoted.
+			 * The new timeline will begin at the end of the last complete
+			 * record in that case, overlapping the partial WAL record on the
+			 * the old timeline.
 			 */
 			uint32		newtimeline;
+			bool		parsed;
 
-			newtimeline = atoi(PQgetvalue(res, 0, 0));
+			parsed = ReadEndOfStreamingResult(res, &startpos, &newtimeline);
 			PQclear(res);
+			if (!parsed)
+				goto error;
 
+			/* Sanity check the values the server gave us */
 			if (newtimeline <= timeline)
 			{
-				/* shouldn't happen */
 				fprintf(stderr,
-						"server reported unexpected next timeline %u, following timeline %u\n",
-						newtimeline, timeline);
+						_("%s: server reported unexpected next timeline %u, following timeline %u\n"),
+						progname, newtimeline, timeline);
+				goto error;
+			}
+			if (startpos > stoppos)
+			{
+				fprintf(stderr,
+						_("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
+						progname,
+						timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
+						newtimeline, (uint32) (startpos >> 32), (uint32) startpos);
 				goto error;
 			}
 
@@ -666,7 +687,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			 * Always start streaming at the beginning of a segment.
 			 */
 			timeline = newtimeline;
-			startpos = stoppos - (stoppos % XLOG_SEG_SIZE);
+			startpos = startpos - (startpos % XLOG_SEG_SIZE);
 			continue;
 		}
 		else if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -705,6 +726,50 @@ error:
 }
 
 /*
+ * Helper function to parse the result set returned by server after streaming
+ * has finished. On failure, prints an error to stderr and returns false.
+ */
+static bool
+ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
+{
+	uint32		startpos_xlogid,
+				startpos_xrecoff;
+
+	/*----------
+	 * The result set consists of one row and two columns, e.g:
+	 *
+	 *  next_tli | next_tli_startpos
+	 * ----------+-------------------
+	 *         4 | 0/9949AE0
+	 *
+	 * next_tli is the timeline ID of the next timeline after the one that
+	 * just finished streaming. next_tli_startpos is the XLOG position where
+	 * the server switched to it.
+	 *----------
+	 */
+	if (PQnfields(res) < 2 || PQntuples(res) != 1)
+	{
+		fprintf(stderr,
+				_("%s: unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields\n"),
+				progname, PQntuples(res), PQnfields(res), 1, 2);
+		return false;
+	}
+
+	*timeline = atoi(PQgetvalue(res, 0, 0));
+	if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
+			   &startpos_xrecoff) != 2)
+	{
+		fprintf(stderr,
+				_("%s: could not parse next timeline's starting point \"%s\"\n"),
+				progname, PQgetvalue(res, 0, 1));
+		return false;
+	}
+	*startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
+
+	return true;
+}
+
+/*
  * The main loop of ReceiveXLogStream. Handles the COPY stream after
  * initiating streaming with the START_STREAMING command.
  *
