Logical Decoding follows timelines

Started by Simon Riggsabout 11 years ago9 messages
#1Simon Riggs
simon@2ndQuadrant.com
1 attachment(s)

Currently, it doesn't.

This patch is a WIP version of doing that, but only currently attempts
to do this in the WALSender.

Objective is to allow cascaded logical replication.

Very WIP, but here for comments.

--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

Attachments:

logical_timeline_following.v1.patchapplication/octet-stream; name=logical_timeline_following.v1.patchDownload
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5937cbb..b4ae782 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -929,6 +929,8 @@ static void
 StartLogicalReplication(StartReplicationCmd *cmd)
 {
 	StringInfoData buf;
+	XLogRecPtr	FlushPtr;
+	List	   *timeLineHistory;
 
 	/* make sure that our requirements are still fulfilled */
 	CheckLogicalDecodingRequirements();
@@ -941,6 +943,8 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 * Force a disconnect, so that the decoding code doesn't need to care
 	 * about a eventual switch from running in recovery, to running in a
 	 * normal environment. Client code is expected to handle reconnects.
+	 * This covers the race condition where we are promoted half way
+	 * through starting up.
 	 */
 	if (am_cascading_walsender && !RecoveryInProgress())
 	{
@@ -949,6 +953,14 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 		walsender_ready_to_stop = true;
 	}
 
+	if (am_cascading_walsender)
+	{
+		/* this also updates ThisTimeLineID */
+		FlushPtr = GetStandbyFlushRecPtr();
+	}
+	else
+		FlushPtr = GetFlushRecPtr();
+
 	WalSndSetState(WALSNDSTATE_CATCHUP);
 
 	/* Send a CopyBothResponse message, and start streaming */
@@ -975,6 +987,24 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
 
 	/*
+	 * Find the timeline for the start location, or throw an error.
+	 *
+	 * Logical replication relies upon replication slots. Each slot has a
+	 * single timeline history baked into it, so this should be easy.
+	 */
+	timeLineHistory = readTimeLineHistory(ThisTimeLineID);
+	sendTimeLine = tliOfPointInHistory(logical_startptr, timeLineHistory);
+	if (sendTimeLine != ThisTimeLineID)
+	{
+		sendTimeLineIsHistoric = true;
+		sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, timeLineHistory,
+										 &sendTimeLineNextTLI);
+	}
+	list_free_deep(timeLineHistory);
+
+	streamingDoneSending = streamingDoneReceiving = false;
+
+	/*
 	 * Report the location after which we'll send out further commits as the
 	 * current sentPtr.
 	 */
@@ -2424,6 +2454,7 @@ XLogSendPhysical(void)
 static void
 XLogSendLogical(void)
 {
+	XLogRecPtr	SendRqstPtr;
 	XLogRecord *record;
 	char	   *errm;
 
@@ -2458,6 +2489,105 @@ XLogSendLogical(void)
 			WalSndCaughtUp = true;
 	}
 
+	if (am_cascading_walsender && !sendTimeLineIsHistoric)
+	{
+		/*
+		 * Streaming the latest timeline on a standby.
+		 *
+		 * The timeline we're recovering from can change, or we can be
+		 * promoted. In either case, the current timeline becomes historic. We
+		 * need to detect that so that we don't try to stream past the point
+		 * where we switched to another timeline. We check for promotion or
+		 * timeline switch after calculating FlushPtr, to avoid a race
+		 * condition: if the timeline becomes historic just after we checked
+		 * that it was still current, it's still be OK to stream it up to the
+		 * FlushPtr that was calculated before it became historic.
+		 */
+		bool		becameHistoric = false;
+
+		SendRqstPtr = GetStandbyFlushRecPtr();
+
+		if (!RecoveryInProgress())
+		{
+			/*
+			 * We have been promoted. RecoveryInProgress() updated
+			 * ThisTimeLineID to the new current timeline.
+			 */
+			am_cascading_walsender = false;
+			becameHistoric = true;
+		}
+		else
+		{
+			/*
+			 * Still a cascading standby. But is the timeline we're sending
+			 * still the one recovery is recovering from? ThisTimeLineID was
+			 * updated by the GetStandbyFlushRecPtr() call above.
+			 */
+			if (sendTimeLine != ThisTimeLineID)
+				becameHistoric = true;
+		}
+
+		if (becameHistoric)
+		{
+			/*
+			 * The timeline we were sending has become historic. Read the
+			 * timeline history file of the new timeline to see where exactly
+			 * we forked off from the timeline we were sending.
+			 */
+			List	   *history;
+
+			history = readTimeLineHistory(ThisTimeLineID);
+			sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history,
+												   &sendTimeLineNextTLI);
+
+			Assert(sendTimeLine < sendTimeLineNextTLI);
+			list_free_deep(history);
+
+			sendTimeLineIsHistoric = true;
+
+			SendRqstPtr = sendTimeLineValidUpto;
+		}
+	}
+
+	/*
+	 * If this is a historic timeline and we've reached the point where we
+	 * forked to the next timeline, switch to new timeline.
+	 */
+	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
+	{
+		List	   *history;
+
+		/* close the current file. */
+		if (sendFile >= 0)
+			close(sendFile);
+		sendFile = -1;
+
+		elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
+			 (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
+			 (uint32) (sentPtr >> 32), (uint32) sentPtr);
+
+		/*
+		 * Did we reach the current timeline yet? If not, switch to the
+		 * next one and follow that to its endpoint.
+		 */
+		if (sendTimeLineNextTLI == ThisTimeLineID)
+			sendTimeLineIsHistoric = false;
+		else
+		{
+			List	   *history;
+
+			sendTimeLine = sendTimeLineNextTLI;
+			history = readTimeLineHistory(sendTimeLine);
+			sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history,
+												   &sendTimeLineNextTLI);
+
+			Assert(sendTimeLine < sendTimeLineNextTLI);
+			list_free_deep(history);
+
+			SendRqstPtr = sendTimeLineValidUpto;
+		}
+	}
+
 	/* Update shared memory status */
 	{
 		/* use volatile pointer to prevent code rearrangement */
@@ -2467,6 +2597,8 @@ XLogSendLogical(void)
 		walsnd->sentPtr = sentPtr;
 		SpinLockRelease(&walsnd->mutex);
 	}
+
+	/* ps display updated by plugin, if desired */
 }
 
 /*
#2Heikki Linnakangas
hlinnakangas@vmware.com
In reply to: Simon Riggs (#1)
Re: Logical Decoding follows timelines

On 12/15/2014 08:54 PM, Simon Riggs wrote:

Currently, it doesn't.

This patch is a WIP version of doing that, but only currently attempts
to do this in the WALSender.

Objective is to allow cascaded logical replication.

Very WIP, but here for comments.

With the patch, XLogSendLogical uses the same logic to calculate
SendRqstPtr that XLogSendPhysical does. It would be good to refactor
that into a common function, rather than copy-paste.

SendRqstPtr isn't actually used for anything in XLogSendLogical.

- Heikki

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#3Simon Riggs
simon@2ndQuadrant.com
In reply to: Heikki Linnakangas (#2)
Re: Logical Decoding follows timelines

On 16 December 2014 at 14:25, Heikki Linnakangas
<hlinnakangas@vmware.com> wrote:

On 12/15/2014 08:54 PM, Simon Riggs wrote:

Currently, it doesn't.

This patch is a WIP version of doing that, but only currently attempts
to do this in the WALSender.

Objective is to allow cascaded logical replication.

Very WIP, but here for comments.

With the patch, XLogSendLogical uses the same logic to calculate SendRqstPtr
that XLogSendPhysical does. It would be good to refactor that into a common
function, rather than copy-paste.

Some of the logic is similar, but not all.

SendRqstPtr isn't actually used for anything in XLogSendLogical.

It exists to allow the call which resets TLI.

I'll see if I can make it exactly identical; I didn't think so when I
first looked, will look again.

Thanks

--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#4Simon Riggs
simon@2ndQuadrant.com
In reply to: Simon Riggs (#3)
1 attachment(s)
Re: Logical Decoding follows timelines

On 16 December 2014 at 21:17, Simon Riggs <simon@2ndquadrant.com> wrote:

This patch is a WIP version of doing that, but only currently attempts

With the patch, XLogSendLogical uses the same logic to calculate SendRqstPtr
that XLogSendPhysical does. It would be good to refactor that into a common
function, rather than copy-paste.

Some of the logic is similar, but not all.

SendRqstPtr isn't actually used for anything in XLogSendLogical.

It exists to allow the call which resets TLI.

I'll see if I can make it exactly identical; I didn't think so when I
first looked, will look again.

Yes, that works. New version attached

--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

Attachments:

logical_timeline_following.v2.patchapplication/octet-stream; name=logical_timeline_following.v2.patchDownload
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 019ae6a..6b7f219 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -217,7 +217,8 @@ static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, Transac
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
-
+static XLogRecPtr GetLatestRequestPtr(void);
+static TimeLineID ReadSendTimeLine(TimeLineID tli);
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -536,8 +537,6 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->timeline != 0)
 	{
-		XLogRecPtr	switchpoint;
-
 		sendTimeLine = cmd->timeline;
 		if (sendTimeLine == ThisTimeLineID)
 		{
@@ -546,18 +545,13 @@ StartReplication(StartReplicationCmd *cmd)
 		}
 		else
 		{
-			List	   *timeLineHistory;
-
 			sendTimeLineIsHistoric = true;
 
 			/*
 			 * Check that the timeline the client requested for exists, and
 			 * the requested start location is on that timeline.
 			 */
-			timeLineHistory = readTimeLineHistory(ThisTimeLineID);
-			switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
-										 &sendTimeLineNextTLI);
-			list_free_deep(timeLineHistory);
+			(void) ReadSendTimeLine(cmd->timeline);
 
 			/*
 			 * Found the requested timeline in the history. Check that
@@ -577,8 +571,8 @@ StartReplication(StartReplicationCmd *cmd)
 			 * that's older than the switchpoint, if it's still in the same
 			 * WAL segment.
 			 */
-			if (!XLogRecPtrIsInvalid(switchpoint) &&
-				switchpoint < cmd->startpoint)
+			if (!XLogRecPtrIsInvalid(sendTimeLineValidUpto) &&
+				sendTimeLineValidUpto < cmd->startpoint)
 			{
 				ereport(ERROR,
 						(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
@@ -587,10 +581,9 @@ StartReplication(StartReplicationCmd *cmd)
 								cmd->timeline),
 						 errdetail("This server's history forked from timeline %u at %X/%X.",
 								   cmd->timeline,
-								   (uint32) (switchpoint >> 32),
-								   (uint32) (switchpoint))));
+								   (uint32) (sendTimeLineValidUpto >> 32),
+								   (uint32) (sendTimeLineValidUpto))));
 			}
-			sendTimeLineValidUpto = switchpoint;
 		}
 	}
 	else
@@ -929,6 +922,8 @@ static void
 StartLogicalReplication(StartReplicationCmd *cmd)
 {
 	StringInfoData buf;
+	XLogRecPtr	FlushPtr;
+	List	   *timeLineHistory;
 
 	/* make sure that our requirements are still fulfilled */
 	CheckLogicalDecodingRequirements();
@@ -941,6 +936,8 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 * Force a disconnect, so that the decoding code doesn't need to care
 	 * about an eventual switch from running in recovery, to running in a
 	 * normal environment. Client code is expected to handle reconnects.
+	 * This covers the race condition where we are promoted half way
+	 * through starting up.
 	 */
 	if (am_cascading_walsender && !RecoveryInProgress())
 	{
@@ -949,6 +946,14 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 		walsender_ready_to_stop = true;
 	}
 
+	if (am_cascading_walsender)
+	{
+		/* this also updates ThisTimeLineID */
+		FlushPtr = GetStandbyFlushRecPtr();
+	}
+	else
+		FlushPtr = GetFlushRecPtr();
+
 	WalSndSetState(WALSNDSTATE_CATCHUP);
 
 	/* Send a CopyBothResponse message, and start streaming */
@@ -975,6 +980,24 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
 
 	/*
+	 * Find the timeline for the start location, or throw an error.
+	 *
+	 * Logical replication relies upon replication slots. Each slot has a
+	 * single timeline history baked into it, so this should be easy.
+	 */
+	timeLineHistory = readTimeLineHistory(ThisTimeLineID);
+	sendTimeLine = tliOfPointInHistory(logical_startptr, timeLineHistory);
+	if (sendTimeLine != ThisTimeLineID)
+	{
+		sendTimeLineIsHistoric = true;
+		sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, timeLineHistory,
+										 &sendTimeLineNextTLI);
+	}
+	list_free_deep(timeLineHistory);
+
+	streamingDoneSending = streamingDoneReceiving = false;
+
+	/*
 	 * Report the location after which we'll send out further commits as the
 	 * current sentPtr.
 	 */
@@ -2201,93 +2224,10 @@ XLogSendPhysical(void)
 		return;
 	}
 
-	/* Figure out how far we can safely send the WAL. */
-	if (sendTimeLineIsHistoric)
-	{
-		/*
-		 * Streaming an old timeline timeline that's in this server's history,
-		 * but is not the one we're currently inserting or replaying. It can
-		 * be streamed up to the point where we switched off that timeline.
-		 */
-		SendRqstPtr = sendTimeLineValidUpto;
-	}
-	else if (am_cascading_walsender)
-	{
-		/*
-		 * Streaming the latest timeline on a standby.
-		 *
-		 * Attempt to send all WAL that has already been replayed, so that we
-		 * know it's valid. If we're receiving WAL through streaming
-		 * replication, it's also OK to send any WAL that has been received
-		 * but not replayed.
-		 *
-		 * The timeline we're recovering from can change, or we can be
-		 * promoted. In either case, the current timeline becomes historic. We
-		 * need to detect that so that we don't try to stream past the point
-		 * where we switched to another timeline. We check for promotion or
-		 * timeline switch after calculating FlushPtr, to avoid a race
-		 * condition: if the timeline becomes historic just after we checked
-		 * that it was still current, it's still be OK to stream it up to the
-		 * FlushPtr that was calculated before it became historic.
-		 */
-		bool		becameHistoric = false;
-
-		SendRqstPtr = GetStandbyFlushRecPtr();
-
-		if (!RecoveryInProgress())
-		{
-			/*
-			 * We have been promoted. RecoveryInProgress() updated
-			 * ThisTimeLineID to the new current timeline.
-			 */
-			am_cascading_walsender = false;
-			becameHistoric = true;
-		}
-		else
-		{
-			/*
-			 * Still a cascading standby. But is the timeline we're sending
-			 * still the one recovery is recovering from? ThisTimeLineID was
-			 * updated by the GetStandbyFlushRecPtr() call above.
-			 */
-			if (sendTimeLine != ThisTimeLineID)
-				becameHistoric = true;
-		}
-
-		if (becameHistoric)
-		{
-			/*
-			 * The timeline we were sending has become historic. Read the
-			 * timeline history file of the new timeline to see where exactly
-			 * we forked off from the timeline we were sending.
-			 */
-			List	   *history;
-
-			history = readTimeLineHistory(ThisTimeLineID);
-			sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
-
-			Assert(sendTimeLine < sendTimeLineNextTLI);
-			list_free_deep(history);
-
-			sendTimeLineIsHistoric = true;
-
-			SendRqstPtr = sendTimeLineValidUpto;
-		}
-	}
-	else
-	{
-		/*
-		 * Streaming the current timeline on a master.
-		 *
-		 * Attempt to send all data that's already been written out and
-		 * fsync'd to disk.  We cannot go further than what's been written out
-		 * given the current implementation of XLogRead().  And in any case
-		 * it's unsafe to send WAL that is not securely down to disk on the
-		 * master: if the master subsequently crashes and restarts, slaves
-		 * must not have applied any WAL that gets lost on the master.
-		 */
-		SendRqstPtr = GetFlushRecPtr();
-	}
+	/*
+	 * Get the SendRqstPtr and follow any timeline changes.
+	 */
+	SendRqstPtr = GetLatestRequestPtr();
 
 	/*
 	 * If this is a historic timeline and we've reached the point where we
@@ -2424,6 +2364,7 @@ XLogSendPhysical(void)
 static void
 XLogSendLogical(void)
 {
+	XLogRecPtr	SendRqstPtr;
 	XLogRecord *record;
 	char	   *errm;
 
@@ -2458,6 +2399,42 @@ XLogSendLogical(void)
 			WalSndCaughtUp = true;
 	}
 
+	/*
+	 * We don't need the SendRqstPtr, but we want to follow timeline
+	 * changes and set sendTimeLineIsHistoric if required.
+	 */
+	if (!sendTimeLineIsHistoric)
+		SendRqstPtr = GetLatestRequestPtr();
+
+	/*
+	 * If this is a historic timeline and we've reached the point where we
+	 * forked to the next timeline, switch to new timeline.
+	 */
+	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
+	{
+		/* close the current file. */
+		if (sendFile >= 0)
+			close(sendFile);
+		sendFile = -1;
+
+		elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
+			 (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
+			 (uint32) (sentPtr >> 32), (uint32) sentPtr);
+
+		/*
+		 * Did we reach the current timeline yet? If not, switch to the
+		 * next one and follow that to its endpoint.
+		 */
+		if (sendTimeLineNextTLI == ThisTimeLineID)
+		{
+			sendTimeLineIsHistoric = false;
+			sendTimeLine = sendTimeLineNextTLI;
+			sendTimeLineValidUpto = InvalidXLogRecPtr;
+		}
+		else
+			sendTimeLine = ReadSendTimeLine(sendTimeLineNextTLI);
+	}
+
 	/* Update shared memory status */
 	{
 		/* use volatile pointer to prevent code rearrangement */
@@ -2467,6 +2444,8 @@ XLogSendLogical(void)
 		walsnd->sentPtr = sentPtr;
 		SpinLockRelease(&walsnd->mutex);
 	}
+
+	/* ps display updated by plugin, if desired */
 }
 
 /*
@@ -2957,3 +2936,106 @@ GetOldestWALSendPointer(void)
 }
 
 #endif
+
+static XLogRecPtr
+GetLatestRequestPtr(void)
+{
+	XLogRecPtr	SendRqstPtr;
+
+	/* Figure out how far we can safely send the WAL. */
+	if (sendTimeLineIsHistoric)
+	{
+		/*
+		 * Streaming an old timeline timeline that's in this server's history,
+		 * but is not the one we're currently inserting or replaying. It can
+		 * be streamed up to the point where we switched off that timeline.
+		 */
+		SendRqstPtr =  sendTimeLineValidUpto;
+	}
+	else if (am_cascading_walsender)
+	{
+		/*
+		 * Streaming the latest timeline on a standby.
+		 *
+		 * Attempt to send all WAL that has already been replayed, so that we
+		 * know it's valid. If we're receiving WAL through streaming
+		 * replication, it's also OK to send any WAL that has been received
+		 * but not replayed.
+		 *
+		 * The timeline we're recovering from can change, or we can be
+		 * promoted. In either case, the current timeline becomes historic. We
+		 * need to detect that so that we don't try to stream past the point
+		 * where we switched to another timeline. We check for promotion or
+		 * timeline switch after calculating FlushPtr, to avoid a race
+		 * condition: if the timeline becomes historic just after we checked
+		 * that it was still current, it's still be OK to stream it up to the
+		 * FlushPtr that was calculated before it became historic.
+		 */
+		bool		becameHistoric = false;
+
+		SendRqstPtr = GetStandbyFlushRecPtr();
+
+		if (!RecoveryInProgress())
+		{
+			/*
+			 * We have been promoted. RecoveryInProgress() updated
+			 * ThisTimeLineID to the new current timeline.
+			 */
+			am_cascading_walsender = false;
+			becameHistoric = true;
+		}
+		else
+		{
+			/*
+			 * Still a cascading standby. But is the timeline we're sending
+			 * still the one recovery is recovering from? ThisTimeLineID was
+			 * updated by the GetStandbyFlushRecPtr() call above.
+			 */
+			if (sendTimeLine != ThisTimeLineID)
+				becameHistoric = true;
+		}
+
+		if (becameHistoric)
+		{
+			/*
+			 * The timeline we were sending has become historic. Read the
+			 * timeline history file of the new timeline to see where exactly
+			 * we forked off from the timeline we were sending.
+			 */
+			(void) ReadSendTimeLine(ThisTimeLineID);
+
+			sendTimeLineIsHistoric = true;
+
+			SendRqstPtr = sendTimeLineValidUpto;
+		}
+	}
+	else
+	{
+		/*
+		 * Streaming the current timeline on a master.
+		 *
+		 * Attempt to send all data that's already been written out and
+		 * fsync'd to disk.  We cannot go further than what's been written out
+		 * given the current implementation of XLogRead().  And in any case
+		 * it's unsafe to send WAL that is not securely down to disk on the
+		 * master: if the master subsequently crashes and restarts, slaves
+		 * must not have applied any WAL that gets lost on the master.
+		 */
+		SendRqstPtr = GetFlushRecPtr();
+	}
+
+	return SendRqstPtr;
+}
+
+static TimeLineID
+ReadSendTimeLine(TimeLineID tli)
+{
+	List	   *history;
+
+	history = readTimeLineHistory(ThisTimeLineID);
+	sendTimeLineValidUpto = tliSwitchPoint(tli, history,
+										   &sendTimeLineNextTLI);
+
+	Assert(tli < sendTimeLineNextTLI);
+	list_free_deep(history);
+}
#5Heikki Linnakangas
hlinnakangas@vmware.com
In reply to: Simon Riggs (#4)
Re: Logical Decoding follows timelines

On 12/17/2014 10:35 AM, Simon Riggs wrote:

On 16 December 2014 at 21:17, Simon Riggs <simon@2ndquadrant.com> wrote:

This patch is a WIP version of doing that, but only currently attempts

With the patch, XLogSendLogical uses the same logic to calculate SendRqstPtr
that XLogSendPhysical does. It would be good to refactor that into a common
function, rather than copy-paste.

Some of the logic is similar, but not all.

SendRqstPtr isn't actually used for anything in XLogSendLogical.

It exists to allow the call which resets TLI.

I'll see if I can make it exactly identical; I didn't think so when I
first looked, will look again.

Yes, that works. New version attached

Some comments, mostly on readability (not all of these were this patch's
fault):

/*
* Check that the timeline the client requested for exists, and
* the requested start location is on that timeline.
*/
(void) ReadSendTimeLine(cmd->timeline);

/*
* Found the requested timeline in the history. Check that
* requested startpoint is on that timeline in our history.
*
* This is quite loose on purpose. We only check that we didn't
* fork off the requested timeline before the switchpoint. We
* don't check that we switched *to* it before the requested
* starting point. This is because the client can legitimately
* request to start replication from the beginning of the WAL
* segment that contains switchpoint, but on the new timeline, so
* that it doesn't end up with a partial segment. If you ask for a
* too old starting point, you'll get an error later when we fail
* to find the requested WAL segment in pg_xlog.
*
* XXX: we could be more strict here and only allow a startpoint
* that's older than the switchpoint, if it's still in the same
* WAL segment.
*/

The first comment implies that the ReadSendTimeLine call checks that the
requested start location is on the timeline, but that's actually done by
the code that follows the second comment. I would merge these two
comments, and move the ReadSendTimeLine call below the merged comment.

@@ -577,8 +571,8 @@ StartReplication(StartReplicationCmd *cmd)
* that's older than the switchpoint, if it's still in the same
* WAL segment.
*/
-			if (!XLogRecPtrIsInvalid(switchpoint) &&
-				switchpoint < cmd->startpoint)
+			if (!XLogRecPtrIsInvalid(sendTimeLineValidUpto) &&
+				sendTimeLineValidUpto < cmd->startpoint)
{
ereport(ERROR,
(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",

IMHO using the local 'switchpoint' variable was more clear.

@@ -941,6 +936,8 @@ StartLogicalReplication(StartReplicationCmd *cmd)
* Force a disconnect, so that the decoding code doesn't need to care
* about an eventual switch from running in recovery, to running in a
* normal environment. Client code is expected to handle reconnects.
+	 * This covers the race condition where we are promoted half way
+	 * through starting up.
*/
if (am_cascading_walsender && !RecoveryInProgress())
{

We could exit recovery immediately after this check. Why is this check
needed?

/*
+	 * Find the timeline for the start location, or throw an error.
+	 *
+	 * Logical replication relies upon replication slots. Each slot has a
+	 * single timeline history baked into it, so this should be easy.
+	 */

I don't understand what "baked in" means here.

+	/*
+	 * Get the SendRqstPtr and follow any timeline changes.
+	 */
+	SendRqstPtr = GetLatestRequestPtr();

The old comment used to say "Figure out how far we can safely send the
WAL". I think that was much more clear. It's not clear what following
timeline changes means here, and the fact that it "gets the SendRqstPtr"
is obvious from the code.

+
+static XLogRecPtr
+GetLatestRequestPtr(void)

This function desperately needs comment to explain what it does. I don't
much like its name either.

+static TimeLineID
+ReadSendTimeLine(TimeLineID tli)

Ditto. This function is also missing a "return".

I think it would slightly more intuitive if this function didn't set the
global variables directly, but simply returned the returned values to
the caller.

- Heikki

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#6Andres Freund
andres@2ndquadrant.com
In reply to: Heikki Linnakangas (#5)
Re: Logical Decoding follows timelines

On 2015-01-03 12:07:29 +0200, Heikki Linnakangas wrote:

@@ -941,6 +936,8 @@ StartLogicalReplication(StartReplicationCmd *cmd)
* Force a disconnect, so that the decoding code doesn't need to care
* about an eventual switch from running in recovery, to running in a
* normal environment. Client code is expected to handle reconnects.
+	 * This covers the race condition where we are promoted half way
+	 * through starting up.
*/
if (am_cascading_walsender && !RecoveryInProgress())
{

We could exit recovery immediately after this check. Why is this check
needed?

I probably wrote that ched and I don't think it really is needed. I
think that's a remnant of what the physical pendant used to do.

I think this needs slightly more abstraction because the infrastructure
is local to walsender.c - but logical decoding is also possible via
SQL. I'm not yet sure how that should look like. It'd be awesome if in
the course of that we could get rid of the nearly duplicated XLogRead()
:(

Simon, have you checked that this actually correctly follows timelines?
Afaics the patch as is won't allow to start logical decoding on a standby.

To allow logical decoding from clients I (apperently) wrote the the
following comment:
/* ----
* TODO: We got to change that someday soon...
*
* There's basically three things missing to allow this:
* 1) We need to be able to correctly and quickly identify the timeline a
* LSN belongs to
* 2) We need to force hot_standby_feedback to be enabled at all times so
* the primary cannot remove rows we need.
* 3) support dropping replication slots referring to a database, in
* dbase_redo. There can't be any active ones due to HS recovery
* conflicts, so that should be relatively easy.
* ----
*/
if (RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("logical decoding cannot be used while in recovery")));

You're implementing 1) here. 3) doesn't look very challenging.

But 2) imo is rather more interesting/complex. I guess we'd have to
force that streaming replication is used, that a physical replication
slot is used and that hot_standby_feedback is enabled.

Greetings,

Andres Freund

--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#7Michael Paquier
michael.paquier@gmail.com
In reply to: Simon Riggs (#4)
Re: Logical Decoding follows timelines

On Wed, Dec 17, 2014 at 5:35 PM, Simon Riggs <simon@2ndquadrant.com> wrote:

On 16 December 2014 at 21:17, Simon Riggs <simon@2ndquadrant.com> wrote:

This patch is a WIP version of doing that, but only currently attempts

With the patch, XLogSendLogical uses the same logic to calculate

SendRqstPtr

that XLogSendPhysical does. It would be good to refactor that into a

common

function, rather than copy-paste.

Some of the logic is similar, but not all.

SendRqstPtr isn't actually used for anything in XLogSendLogical.

It exists to allow the call which resets TLI.

I'll see if I can make it exactly identical; I didn't think so when I
first looked, will look again.

Yes, that works. New version attached

Moved patch to CF 2015-02 to not lose track of it, also because it does not
seem it received a proper review.
--
Michael

#8Michael Paquier
michael.paquier@gmail.com
In reply to: Michael Paquier (#7)
1 attachment(s)
Re: Logical Decoding follows timelines

On Fri, Feb 13, 2015 at 4:57 PM, Michael Paquier wrote:

Moved patch to CF 2015-02 to not lose track of it, also because it does not
seem it received a proper review.

This patch does not apply anymore, so attached is a rebased version.
The comments mentioned here have not been addressed:
/messages/by-id/54A7BF61.9080708@vmware.com
Also, what kind of tests have been done? Logical decoding cannot be
used while a node is in recovery.
Regards,
--
Michael

Attachments:

logical_timeline_following.v3.patchtext/x-patch; charset=US-ASCII; name=logical_timeline_following.v3.patchDownload
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4a20569..3036ce6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -216,7 +216,8 @@ static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, Transac
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
-
+static XLogRecPtr GetLatestRequestPtr(void);
+static TimeLineID ReadSendTimeLine(TimeLineID tli);
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -535,8 +536,6 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->timeline != 0)
 	{
-		XLogRecPtr	switchpoint;
-
 		sendTimeLine = cmd->timeline;
 		if (sendTimeLine == ThisTimeLineID)
 		{
@@ -545,18 +544,13 @@ StartReplication(StartReplicationCmd *cmd)
 		}
 		else
 		{
-			List	   *timeLineHistory;
-
 			sendTimeLineIsHistoric = true;
 
 			/*
 			 * Check that the timeline the client requested for exists, and
 			 * the requested start location is on that timeline.
 			 */
-			timeLineHistory = readTimeLineHistory(ThisTimeLineID);
-			switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
-										 &sendTimeLineNextTLI);
-			list_free_deep(timeLineHistory);
+			(void) ReadSendTimeLine(cmd->timeline);
 
 			/*
 			 * Found the requested timeline in the history. Check that
@@ -576,8 +570,8 @@ StartReplication(StartReplicationCmd *cmd)
 			 * that's older than the switchpoint, if it's still in the same
 			 * WAL segment.
 			 */
-			if (!XLogRecPtrIsInvalid(switchpoint) &&
-				switchpoint < cmd->startpoint)
+			if (!XLogRecPtrIsInvalid(sendTimeLineValidUpto) &&
+				sendTimeLineValidUpto < cmd->startpoint)
 			{
 				ereport(ERROR,
 						(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
@@ -586,10 +580,9 @@ StartReplication(StartReplicationCmd *cmd)
 								cmd->timeline),
 						 errdetail("This server's history forked from timeline %u at %X/%X.",
 								   cmd->timeline,
-								   (uint32) (switchpoint >> 32),
-								   (uint32) (switchpoint))));
+								   (uint32) (sendTimeLineValidUpto >> 32),
+								   (uint32) (sendTimeLineValidUpto))));
 			}
-			sendTimeLineValidUpto = switchpoint;
 		}
 	}
 	else
@@ -928,6 +921,8 @@ static void
 StartLogicalReplication(StartReplicationCmd *cmd)
 {
 	StringInfoData buf;
+	XLogRecPtr	FlushPtr;
+	List	   *timeLineHistory;
 
 	/* make sure that our requirements are still fulfilled */
 	CheckLogicalDecodingRequirements();
@@ -940,6 +935,8 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 * Force a disconnect, so that the decoding code doesn't need to care
 	 * about an eventual switch from running in recovery, to running in a
 	 * normal environment. Client code is expected to handle reconnects.
+	 * This covers the race condition where we are promoted half way
+	 * through starting up.
 	 */
 	if (am_cascading_walsender && !RecoveryInProgress())
 	{
@@ -948,6 +945,14 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 		walsender_ready_to_stop = true;
 	}
 
+	if (am_cascading_walsender)
+	{
+		/* this also updates ThisTimeLineID */
+		FlushPtr = GetStandbyFlushRecPtr();
+	}
+	else
+		FlushPtr = GetFlushRecPtr();
+
 	WalSndSetState(WALSNDSTATE_CATCHUP);
 
 	/* Send a CopyBothResponse message, and start streaming */
@@ -974,6 +979,24 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
 
 	/*
+	 * Find the timeline for the start location, or throw an error.
+	 *
+	 * Logical replication relies upon replication slots. Each slot has a
+	 * single timeline history baked into it, so this should be easy.
+	 */
+	timeLineHistory = readTimeLineHistory(ThisTimeLineID);
+	sendTimeLine = tliOfPointInHistory(logical_startptr, timeLineHistory);
+	if (sendTimeLine != ThisTimeLineID)
+	{
+		sendTimeLineIsHistoric = true;
+		sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, timeLineHistory,
+										 &sendTimeLineNextTLI);
+	}
+	list_free_deep(timeLineHistory);
+
+	streamingDoneSending = streamingDoneReceiving = false;
+
+	/*
 	 * Report the location after which we'll send out further commits as the
 	 * current sentPtr.
 	 */
@@ -2179,93 +2202,10 @@ XLogSendPhysical(void)
 		return;
 	}
 
-	/* Figure out how far we can safely send the WAL. */
-	if (sendTimeLineIsHistoric)
-	{
-		/*
-		 * Streaming an old timeline that's in this server's history, but is
-		 * not the one we're currently inserting or replaying. It can be
-		 * streamed up to the point where we switched off that timeline.
-		 */
-		SendRqstPtr = sendTimeLineValidUpto;
-	}
-	else if (am_cascading_walsender)
-	{
-		/*
-		 * Streaming the latest timeline on a standby.
-		 *
-		 * Attempt to send all WAL that has already been replayed, so that we
-		 * know it's valid. If we're receiving WAL through streaming
-		 * replication, it's also OK to send any WAL that has been received
-		 * but not replayed.
-		 *
-		 * The timeline we're recovering from can change, or we can be
-		 * promoted. In either case, the current timeline becomes historic. We
-		 * need to detect that so that we don't try to stream past the point
-		 * where we switched to another timeline. We check for promotion or
-		 * timeline switch after calculating FlushPtr, to avoid a race
-		 * condition: if the timeline becomes historic just after we checked
-		 * that it was still current, it's still be OK to stream it up to the
-		 * FlushPtr that was calculated before it became historic.
-		 */
-		bool		becameHistoric = false;
-
-		SendRqstPtr = GetStandbyFlushRecPtr();
-
-		if (!RecoveryInProgress())
-		{
-			/*
-			 * We have been promoted. RecoveryInProgress() updated
-			 * ThisTimeLineID to the new current timeline.
-			 */
-			am_cascading_walsender = false;
-			becameHistoric = true;
-		}
-		else
-		{
-			/*
-			 * Still a cascading standby. But is the timeline we're sending
-			 * still the one recovery is recovering from? ThisTimeLineID was
-			 * updated by the GetStandbyFlushRecPtr() call above.
-			 */
-			if (sendTimeLine != ThisTimeLineID)
-				becameHistoric = true;
-		}
-
-		if (becameHistoric)
-		{
-			/*
-			 * The timeline we were sending has become historic. Read the
-			 * timeline history file of the new timeline to see where exactly
-			 * we forked off from the timeline we were sending.
-			 */
-			List	   *history;
-
-			history = readTimeLineHistory(ThisTimeLineID);
-			sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
-
-			Assert(sendTimeLine < sendTimeLineNextTLI);
-			list_free_deep(history);
-
-			sendTimeLineIsHistoric = true;
-
-			SendRqstPtr = sendTimeLineValidUpto;
-		}
-	}
-	else
-	{
-		/*
-		 * Streaming the current timeline on a master.
-		 *
-		 * Attempt to send all data that's already been written out and
-		 * fsync'd to disk.  We cannot go further than what's been written out
-		 * given the current implementation of XLogRead().  And in any case
-		 * it's unsafe to send WAL that is not securely down to disk on the
-		 * master: if the master subsequently crashes and restarts, slaves
-		 * must not have applied any WAL that gets lost on the master.
-		 */
-		SendRqstPtr = GetFlushRecPtr();
-	}
+	/*
+	 * Get the SendRqstPtr and follow any timeline changes.
+	 */
+	SendRqstPtr = GetLatestRequestPtr();
 
 	/*
 	 * If this is a historic timeline and we've reached the point where we
@@ -2402,6 +2342,7 @@ XLogSendPhysical(void)
 static void
 XLogSendLogical(void)
 {
+	XLogRecPtr	SendRqstPtr;
 	XLogRecord *record;
 	char	   *errm;
 
@@ -2436,6 +2377,42 @@ XLogSendLogical(void)
 			WalSndCaughtUp = true;
 	}
 
+	/*
+	 * We don't need the SendRqstPtr, but we want to follow timeline
+	 * changes and set sendTimeLineIsHistoric if required.
+	 */
+	if (!sendTimeLineIsHistoric)
+		SendRqstPtr = GetLatestRequestPtr();
+
+	/*
+	 * If this is a historic timeline and we've reached the point where we
+	 * forked to the next timeline, switch to new timeline.
+	 */
+	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
+	{
+		/* close the current file. */
+		if (sendFile >= 0)
+			close(sendFile);
+		sendFile = -1;
+
+		elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
+			 (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
+			 (uint32) (sentPtr >> 32), (uint32) sentPtr);
+
+		/*
+		 * Did we reach the current timeline yet? If not, switch to the
+		 * next one and follow that to its endpoint.
+		 */
+		if (sendTimeLineNextTLI == ThisTimeLineID)
+		{
+			sendTimeLineIsHistoric = false;
+			sendTimeLine = sendTimeLineNextTLI;
+			sendTimeLineValidUpto = InvalidXLogRecPtr;
+		}
+		else
+			sendTimeLine = ReadSendTimeLine(sendTimeLineNextTLI);
+	}
+
 	/* Update shared memory status */
 	{
 		/* use volatile pointer to prevent code rearrangement */
@@ -2445,6 +2422,8 @@ XLogSendLogical(void)
 		walsnd->sentPtr = sentPtr;
 		SpinLockRelease(&walsnd->mutex);
 	}
+
+	/* ps display updated by plugin, if desired */
 }
 
 /*
@@ -2947,3 +2926,106 @@ GetOldestWALSendPointer(void)
 }
 
 #endif
+
+static XLogRecPtr
+GetLatestRequestPtr(void)
+{
+	XLogRecPtr	SendRqstPtr;
+
+	/* Figure out how far we can safely send the WAL. */
+	if (sendTimeLineIsHistoric)
+	{
+		/*
+		 * Streaming an old timeline timeline that's in this server's history,
+		 * but is not the one we're currently inserting or replaying. It can
+		 * be streamed up to the point where we switched off that timeline.
+		 */
+		SendRqstPtr =  sendTimeLineValidUpto;
+	}
+	else if (am_cascading_walsender)
+	{
+		/*
+		 * Streaming the latest timeline on a standby.
+		 *
+		 * Attempt to send all WAL that has already been replayed, so that we
+		 * know it's valid. If we're receiving WAL through streaming
+		 * replication, it's also OK to send any WAL that has been received
+		 * but not replayed.
+		 *
+		 * The timeline we're recovering from can change, or we can be
+		 * promoted. In either case, the current timeline becomes historic. We
+		 * need to detect that so that we don't try to stream past the point
+		 * where we switched to another timeline. We check for promotion or
+		 * timeline switch after calculating FlushPtr, to avoid a race
+		 * condition: if the timeline becomes historic just after we checked
+		 * that it was still current, it's still be OK to stream it up to the
+		 * FlushPtr that was calculated before it became historic.
+		 */
+		bool		becameHistoric = false;
+
+		SendRqstPtr = GetStandbyFlushRecPtr();
+
+		if (!RecoveryInProgress())
+		{
+			/*
+			 * We have been promoted. RecoveryInProgress() updated
+			 * ThisTimeLineID to the new current timeline.
+			 */
+			am_cascading_walsender = false;
+			becameHistoric = true;
+		}
+		else
+		{
+			/*
+			 * Still a cascading standby. But is the timeline we're sending
+			 * still the one recovery is recovering from? ThisTimeLineID was
+			 * updated by the GetStandbyFlushRecPtr() call above.
+			 */
+			if (sendTimeLine != ThisTimeLineID)
+				becameHistoric = true;
+		}
+
+		if (becameHistoric)
+		{
+			/*
+			 * The timeline we were sending has become historic. Read the
+			 * timeline history file of the new timeline to see where exactly
+			 * we forked off from the timeline we were sending.
+			 */
+			(void) ReadSendTimeLine(ThisTimeLineID);
+
+			sendTimeLineIsHistoric = true;
+
+			SendRqstPtr = sendTimeLineValidUpto;
+		}
+	}
+	else
+	{
+		/*
+		 * Streaming the current timeline on a master.
+		 *
+		 * Attempt to send all data that's already been written out and
+		 * fsync'd to disk.  We cannot go further than what's been written out
+		 * given the current implementation of XLogRead().  And in any case
+		 * it's unsafe to send WAL that is not securely down to disk on the
+		 * master: if the master subsequently crashes and restarts, slaves
+		 * must not have applied any WAL that gets lost on the master.
+		 */
+		SendRqstPtr = GetFlushRecPtr();
+	}
+
+	return SendRqstPtr;
+}
+
+static TimeLineID
+ReadSendTimeLine(TimeLineID tli)
+{
+	List	   *history;
+
+	history = readTimeLineHistory(ThisTimeLineID);
+	sendTimeLineValidUpto = tliSwitchPoint(tli, history,
+										   &sendTimeLineNextTLI);
+
+	Assert(tli < sendTimeLineNextTLI);
+	list_free_deep(history);
+}
#9Simon Riggs
simon@2ndQuadrant.com
In reply to: Michael Paquier (#8)
Re: Logical Decoding follows timelines

On 21 April 2015 at 05:49, Michael Paquier <michael.paquier@gmail.com>
wrote:

On Fri, Feb 13, 2015 at 4:57 PM, Michael Paquier wrote:

Moved patch to CF 2015-02 to not lose track of it, also because it does

not

seem it received a proper review.

This patch does not apply anymore, so attached is a rebased version.
The comments mentioned here have not been addressed:
/messages/by-id/54A7BF61.9080708@vmware.com
Also, what kind of tests have been done? Logical decoding cannot be
used while a node is in recovery.

Returned with Feedback, I think. I have a new approach to be coded for next
release.

--
Simon Riggs http://www.2ndQuadrant.com/
<http://www.2ndquadrant.com/&gt;
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services