From efb4f15d7b1f3f97da370cf4b7db7562eec08cb3 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Wed, 19 Jul 2023 11:29:02 +0900
Subject: [PATCH v5] Fix pg_recvlogical error message upon SIGINT/SIGTERM

When pg_recvlogical needs to abort on a signal like SIGINT/SIGTERM, it
is expected to exit cleanly.  However, the code forgot to clean up the
state of the connection before leaving.  This would cause the tool to
emit an error like "unexpected termination of replication stream",
which is meant for a really unexpected termination or a crash.

The code is refactored to apply the same termination steps for the
signal, end LSN and keepalive cases.

Reported-by: Andres Freund
Author: Bharath Rupireddy
Reviewed-by: Kyotaro Horiguchi, Andres Freund, Cary Huang
Discussion: https://www.postgresql.org/message-id/20221019213953.htdtzikf4f45ywil%40awork3.anarazel.de
---
 src/bin/pg_basebackup/pg_recvlogical.c | 52 ++++++++++++++++++++------
 1 file changed, 41 insertions(+), 11 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index f3c7937a1d..3bd83deee7 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -32,6 +32,13 @@
 /* Time to sleep between reconnection attempts */
 #define RECONNECT_SLEEP_TIME 5
 
+typedef enum
+{
+	STREAM_STOP_END_OF_WAL,		/* a WAL record at or past endpos was received */
+	STREAM_STOP_KEEPALIVE,		/* endpos was reached while handling a keepalive */
+	STREAM_STOP_SIGNAL			/* stopped on a signal; initial stop_reason value */
+}			StreamStopReason;
+
 /* Global Options */
 static char *outfile = NULL;
 static int	verbose = 0;
@@ -66,7 +73,7 @@ static void usage(void);
 static void StreamLogicalLog(void);
 static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
 static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
-							   bool keepalive, XLogRecPtr lsn);
+							   StreamStopReason reason, XLogRecPtr lsn);
 
 static void
 usage(void)
@@ -207,6 +214,8 @@ StreamLogicalLog(void)
 	TimestampTz last_status = -1;
 	int			i;
 	PQExpBuffer query;
+	XLogRecPtr	stop_lsn = InvalidXLogRecPtr;
+	StreamStopReason stop_reason = STREAM_STOP_SIGNAL;
 
 	output_written_lsn = InvalidXLogRecPtr;
 	output_fsync_lsn = InvalidXLogRecPtr;
@@ -487,7 +496,7 @@ StreamLogicalLog(void)
 
 			if (endposReached)
 			{
-				prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
+				stop_reason = STREAM_STOP_KEEPALIVE;
 				time_to_abort = true;
 				break;
 			}
@@ -519,6 +528,12 @@ StreamLogicalLog(void)
 		/* Extract WAL location for this block */
 		cur_record_lsn = fe_recvint64(&copybuf[1]);
 
+		/*
+		 * Save the LSN of this record, so that prepareToTerminate() can
+		 * report it if streaming is stopped from this point on.
+		 */
+		stop_lsn = cur_record_lsn;
+
 		if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
 		{
 			/*
@@ -527,7 +542,7 @@ StreamLogicalLog(void)
 			 */
 			if (!flushAndSendFeedback(conn, &now))
 				goto error;
-			prepareToTerminate(conn, endpos, false, cur_record_lsn);
+			stop_reason = STREAM_STOP_END_OF_WAL;
 			time_to_abort = true;
 			break;
 		}
@@ -572,12 +587,16 @@ StreamLogicalLog(void)
 			/* endpos was exactly the record we just processed, we're done */
 			if (!flushAndSendFeedback(conn, &now))
 				goto error;
-			prepareToTerminate(conn, endpos, false, cur_record_lsn);
+			stop_reason = STREAM_STOP_END_OF_WAL;
 			time_to_abort = true;
 			break;
 		}
 	}
 
+	/* Clean up connection state if stream has been aborted */
+	if (time_to_abort)
+		prepareToTerminate(conn, endpos, stop_reason, stop_lsn);
+
 	res = PQgetResult(conn);
 	if (PQresultStatus(res) == PGRES_COPY_OUT)
 	{
@@ -1021,18 +1040,29 @@ flushAndSendFeedback(PGconn *conn, TimestampTz *now)
  * retry on failure.
  */
 static void
-prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
+prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
+				   XLogRecPtr lsn)
 {
 	(void) PQputCopyEnd(conn, NULL);
 	(void) PQflush(conn);
 
 	if (verbose)
 	{
-		if (keepalive)
-			pg_log_info("end position %X/%X reached by keepalive",
-						LSN_FORMAT_ARGS(endpos));
-		else
-			pg_log_info("end position %X/%X reached by WAL record at %X/%X",
-						LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
+		switch (reason)			/* log why streaming is being stopped */
+		{
+			case STREAM_STOP_SIGNAL:
+				/* No end position was reached, and lsn may be invalid here */
+				pg_log_info("received interrupt signal, exiting");
+				break;
+			case STREAM_STOP_KEEPALIVE:
+				pg_log_info("end position %X/%X reached by keepalive",
+							LSN_FORMAT_ARGS(endpos));
+				break;
+			case STREAM_STOP_END_OF_WAL:
+				Assert(!XLogRecPtrIsInvalid(lsn));	/* lsn is the record that hit endpos */
+				pg_log_info("end position %X/%X reached by WAL record at %X/%X",
+							LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
+				break;
+		}
 	}
 }
-- 
2.40.1

