diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e47ddca6bc..3a0106bc93 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2169,7 +2169,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
 			if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
 			{
 				ereport(DEBUG1,
-						(errmsg("standby \"%s\" has now caught up with primary",
+						(errmsg("\"%s\" has now caught up with upstream server",
 								application_name)));
 				WalSndSetState(WALSNDSTATE_STREAMING);
 			}
@@ -2758,10 +2758,10 @@ XLogSendLogical(void)
 	char	   *errm;
 
 	/*
-	 * Don't know whether we've caught up yet. We'll set it to true in
-	 * WalSndWaitForWal, if we're actually waiting. We also set to true if
-	 * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
-	 * i.e. when we're shutting down.
+	 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
+	 * true in WalSndWaitForWal, if we're actually waiting. We also set to
+	 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
+	 * didn't wait - i.e. when we're shutting down.
 	 */
 	WalSndCaughtUp = false;
 
@@ -2774,6 +2774,9 @@ XLogSendLogical(void)
 
 	if (record != NULL)
 	{
+		/* XXX: Note that logical decoding cannot be used while in recovery */
+		XLogRecPtr	flushPtr = GetFlushRecPtr();
+
 		/*
 		 * Note the lack of any call to LagTrackerWrite() which is handled by
 		 * WalSndUpdateProgress which is called by output plugin through
@@ -2782,6 +2785,13 @@ XLogSendLogical(void)
 		LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
 
 		sentPtr = logical_decoding_ctx->reader->EndRecPtr;
+
+		/*
+		 * If we have sent a record that is at or beyond the flushed point, we
+		 * have caught up.
+		 */
+		if (sentPtr >= flushPtr)
+			WalSndCaughtUp = true;
 	}
 	else
 	{
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index a08af65695..79fb457075 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -1535,7 +1535,8 @@ also works for logical subscriptions)
 until its replication location in pg_stat_replication equals or passes the
 upstream's WAL insert point at the time this function is called. By default
 the replay_lsn is waited for, but 'mode' may be specified to wait for any of
-sent|write|flush|replay.
+sent|write|flush|replay. The connection catching up must be in a streaming
+state.
 
 If there is no active replication connection from this peer, waits until
 poll_query_until timeout.
@@ -1580,7 +1581,7 @@ sub wait_for_catchup
 	  . $lsn_expr . " on "
 	  . $self->name . "\n";
 	my $query =
-	  qq[SELECT $lsn_expr <= ${mode}_lsn FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
+	  qq[SELECT $lsn_expr <= ${mode}_lsn AND state = 'streaming' FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
 	$self->poll_query_until('postgres', $query)
 	  or croak "timed out waiting for catchup";
 	print "done\n";
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 503556fd6c..d94458e00e 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -188,6 +188,11 @@ $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_ins SELECT generate_series(1001,1100)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_rep");
 
+# Restart the publisher and check the state of the subscriber which
+# should be in a streaming state after catching up.
+$node_publisher->stop('fast');
+$node_publisher->start;
+
 $node_publisher->wait_for_catchup($appname);
 
 $result = $node_subscriber->safe_psql('postgres',
