libpqsrv_connect_params should call ProcessWalRcvInterrupts

Started by Kyotaro Horiguchi about 2 years ago, 19 messages
#1Kyotaro Horiguchi
horikyota.ntt@gmail.com
1 attachment(s)

Hello.

We've noticed that when the walreceiver is waiting for a connection to
complete, the standby does not respond immediately to promotion
requests. In PG14, the walreceiver terminates instantly upon receiving
a promotion request, but in PG16 it waits until the connection times
out. This behavior is attributed to commit 728f86fec65, which simply
replaced part of libpqrcv_connect with a call to
libpqsrv_connect_params. It can be verified by simply dropping packets
sent from the standby to the primary.

Naively, libpqsrv_connect_internal could just call
ProcessWalRcvInterrupts() instead of CHECK_FOR_INTERRUPTS() when
running in the walreceiver, but that approach is quite ugly. Since
ProcessWalRcvInterrupts() already calls CHECK_FOR_INTERRUPTS() and
there are no standalone calls to CHECK_FOR_INTERRUPTS() within the
walreceiver, I think it would be better to use ProcDiePending instead
of ShutdownRequestPending. The attached crude patch installs a subset
of die() as the SIGTERM handler in the walreceiver.
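A minimal, self-contained sketch of why the generic path misses the request. This is plain C, not PostgreSQL code: the flag names mirror PostgreSQL's globals, while generic_interrupt_check(), walrcv_interrupt_check() and wait_for_connection() are hypothetical simplifications of CHECK_FOR_INTERRUPTS(), the old ProcessWalRcvInterrupts() and the connect wait loop.

```c
#include <assert.h>
#include <signal.h>
#include <stdbool.h>

/* Hypothetical stand-ins for PostgreSQL's global interrupt flags. */
static volatile sig_atomic_t InterruptPending = 0;
static volatile sig_atomic_t ProcDiePending = 0;
static volatile sig_atomic_t ShutdownRequestPending = 0;

/* Like CHECK_FOR_INTERRUPTS(): only the generic die()-style flags are examined. */
static bool
generic_interrupt_check(void)
{
    return InterruptPending && ProcDiePending;
}

/* Like the old ProcessWalRcvInterrupts(): also honours ShutdownRequestPending. */
static bool
walrcv_interrupt_check(void)
{
    return generic_interrupt_check() || ShutdownRequestPending;
}

/*
 * Stand-in for a blocking connect loop whose peer never answers because
 * packets are dropped.  Each iteration models one latch/socket wakeup.
 * Returns the number of wakeups spent before the wait ended.
 */
static int
wait_for_connection(bool (*interrupt_check)(void), int timeout_wakeups)
{
    assert(timeout_wakeups >= 0);
    for (int i = 0; i < timeout_wakeups; i++)
    {
        if (interrupt_check())
            return i;           /* the process would exit here */
        /* ...the socket never becomes ready... */
    }
    return timeout_wakeups;     /* only the connection timeout ends the wait */
}
```

With only ShutdownRequestPending set, the generic check waits out the whole timeout, while the walreceiver-specific check returns on the first wakeup. Setting the die()-style flags instead lets the generic check stop immediately, which is the effect the proposed SIGTERM handler relies on.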

What do you think about the issue, and the approach?

If there are no issues or objections with this method, I will continue
to refine this patch. For now, I plan to register it for the upcoming
commitfest.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

Attachments:

walrcv_shutdown_deblocking.patch (text/x-patch; charset=us-ascii)
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 693b3669ba..e503799bd8 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -738,7 +738,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 		if (rc & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
-			ProcessWalRcvInterrupts();
+			CHECK_FOR_INTERRUPTS();
 		}
 
 		/* Consume whatever data is available from the socket */
@@ -1042,7 +1042,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 	{
 		char	   *cstrs[MaxTupleAttributeNumber];
 
-		ProcessWalRcvInterrupts();
+		CHECK_FOR_INTERRUPTS();
 
 		/* Do the allocations in temporary context. */
 		oldcontext = MemoryContextSwitchTo(rowcontext);
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 26ded928a7..c53a8e6c89 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -147,39 +147,34 @@ static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
+static void WalRcvShutdownSignalHandler(SIGNAL_ARGS);
 
-/*
- * Process any interrupts the walreceiver process may have received.
- * This should be called any time the process's latch has become set.
- *
- * Currently, only SIGTERM is of interest.  We can't just exit(1) within the
- * SIGTERM signal handler, because the signal might arrive in the middle of
- * some critical operation, like while we're holding a spinlock.  Instead, the
- * signal handler sets a flag variable as well as setting the process's latch.
- * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
- * latch has become set.  Operations that could block for a long time, such as
- * reading from a remote server, must pay attention to the latch too; see
- * libpqrcv_PQgetResult for example.
- */
 void
-ProcessWalRcvInterrupts(void)
+WalRcvShutdownSignalHandler(SIGNAL_ARGS)
 {
-	/*
-	 * Although walreceiver interrupt handling doesn't use the same scheme as
-	 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
-	 * any incoming signals on Win32, and also to make sure we process any
-	 * barrier events.
-	 */
-	CHECK_FOR_INTERRUPTS();
+	int			save_errno = errno;
 
-	if (ShutdownRequestPending)
+	/* Don't joggle the elbow of proc_exit */
+	if (!proc_exit_inprogress)
 	{
-		ereport(FATAL,
-				(errcode(ERRCODE_ADMIN_SHUTDOWN),
-				 errmsg("terminating walreceiver process due to administrator command")));
+		InterruptPending = true;
+		ProcDiePending = true;
 	}
+
+	SetLatch(MyLatch);
+
+	errno = save_errno;
+	
 }
 
+/*
+ * Is current process a wal receiver?
+ */
+bool
+IsWalReceiver(void)
+{
+	return WalRcv != NULL;
+}
 
 /* Main entry point for walreceiver process */
 void
@@ -277,7 +272,7 @@ WalReceiverMain(void)
 	pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
 													 * file */
 	pqsignal(SIGINT, SIG_IGN);
-	pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
+	pqsignal(SIGTERM, WalRcvShutdownSignalHandler); /* request shutdown */
 	/* SIGQUIT handler was already set up by InitPostmasterChild */
 	pqsignal(SIGALRM, SIG_IGN);
 	pqsignal(SIGPIPE, SIG_IGN);
@@ -456,7 +451,7 @@ WalReceiverMain(void)
 							 errmsg("cannot continue WAL streaming, recovery has already ended")));
 
 				/* Process any requests or signals received recently */
-				ProcessWalRcvInterrupts();
+				CHECK_FOR_INTERRUPTS();
 
 				if (ConfigReloadPending)
 				{
@@ -552,7 +547,7 @@ WalReceiverMain(void)
 				if (rc & WL_LATCH_SET)
 				{
 					ResetLatch(MyLatch);
-					ProcessWalRcvInterrupts();
+					CHECK_FOR_INTERRUPTS();
 
 					if (walrcv->force_reply)
 					{
@@ -691,7 +686,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 	{
 		ResetLatch(MyLatch);
 
-		ProcessWalRcvInterrupts();
+		CHECK_FOR_INTERRUPTS();
 
 		SpinLockAcquire(&walrcv->mutex);
 		Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 7298a187d1..04cab7dafa 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -59,6 +59,7 @@
 #include "replication/logicallauncher.h"
 #include "replication/logicalworker.h"
 #include "replication/slot.h"
+#include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/bufmgr.h"
@@ -3286,6 +3287,10 @@ ProcessInterrupts(void)
 			 */
 			proc_exit(1);
 		}
+		else if (IsWalReceiver())
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+					 errmsg("terminating walreceiver process due to administrator command")));
 		else if (IsBackgroundWorker)
 			ereport(FATAL,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 949e874f21..c69c8daa6a 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -456,8 +456,8 @@ walrcv_clear_result(WalRcvExecResult *walres)
 }
 
 /* prototypes for functions in walreceiver.c */
+extern bool IsWalReceiver(void);
 extern void WalReceiverMain(void) pg_attribute_noreturn();
-extern void ProcessWalRcvInterrupts(void);
 extern void WalRcvForceReply(void);
 
 /* prototypes for functions in walreceiverfuncs.c */
#2Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Kyotaro Horiguchi (#1)
Network failure may prevent promotion

(Apologies for resubmitting; the subject of my previous mail was poor.)

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

#3Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Kyotaro Horiguchi (#2)
1 attachment(s)
Re: Network failure may prevent promotion

At Sun, 31 Dec 2023 20:07:41 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in

We've noticed that when walreceiver is waiting for a connection to
complete, standby does not immediately respond to promotion
requests. In PG14, upon receiving a promotion request, walreceiver
terminates instantly, but in PG16, it waits for connection
timeout. This behavior is attributed to commit 728f86fec65, where a
part of libpqrcv_connect was simply replaced with a call to
libpqsrv_connect_params. This behavior can be verified by simply
dropping packets from the standby to the primary.

Apologies for the inconvenience on my part, but this behavior needs to
be fixed. To continue the discussion, I'm providing a repro script
here.

With the script, the standby is expected to promote immediately,
emitting the following log lines:

standby.log:

2024-01-18 16:25:22.245 JST [31849] LOG: received promote request
2024-01-18 16:25:22.245 JST [31850] FATAL: terminating walreceiver process due to administrator command
2024-01-18 16:25:22.246 JST [31849] LOG: redo is not required
2024-01-18 16:25:22.246 JST [31849] LOG: selected new timeline ID: 2
2024-01-18 16:25:22.274 JST [31849] LOG: archive recovery complete
2024-01-18 16:25:22.275 JST [31847] LOG: checkpoint starting: force
2024-01-18 16:25:22.277 JST [31846] LOG: database system is ready to accept connections
2024-01-18 16:25:22.280 JST [31847] LOG: checkpoint complete: wrote 3 buffers (0.0%); 0 WAL file(s) added, 0 removed, 0 recycled; write=0.001 s, sync=0.001 s, total=0.005 s; sync files=2, longest=0.001 s, average=0.001 s; distance=0 kB, estimate=0 kB; lsn=0/1548E98, redo lsn=0/1548E40
2024-01-18 16:25:22.356 JST [31846] LOG: received immediate shutdown request
2024-01-18 16:25:22.361 JST [31846] LOG: database system is shut down

After 728f86fec65 was introduced, the same operation no longer
completes the promotion, as shown below. The patch attached to the
previous mail restores the old behavior shown above.

2024-01-18 16:47:53.314 JST [34515] LOG: received promote request
2024-01-18 16:48:03.947 JST [34512] LOG: received immediate shutdown request
2024-01-18 16:48:03.952 JST [34512] LOG: database system is shut down

The attached script requires sudo. There is another point to note:
the script attempts to establish a replication connection to
$primary_address:$primary_port. For the packet filter to work, this
must be a remote address that is reachable when no packet filter is in
place, and the firewall-cmd setting must be configured to block this
connection. If you simply set an unreachable IP address, the
connection attempt fails immediately with a "No route to host" error
before the first packet is sent out, so it is never blocked as
intended.
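To illustrate the distinction, here is a hypothetical helper (plain C, not libpq code) that classifies the outcome of a non-blocking connect(). A pending connection (EINPROGRESS) keeps the caller waiting on the socket, which is what dropped packets produce; an unreachable address typically fails right away. The exact errno for an unreachable host depends on the platform and routing setup, so treat the failure cases as an assumption.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>

typedef enum
{
    CONN_ESTABLISHED,           /* connect() completed at once */
    CONN_PENDING,               /* handshake in flight; wait for writability */
    CONN_FAILED                 /* error reported before any waiting */
} ConnOutcome;

/* rc is connect()'s return value on a non-blocking socket, err the errno it left. */
static ConnOutcome
classify_connect(int rc, int err)
{
    if (rc == 0)
        return CONN_ESTABLISHED;
    assert(rc == -1);
    /* POSIX: after EINTR the connection continues asynchronously, too */
    if (err == EINPROGRESS || err == EINTR)
        return CONN_PENDING;
    return CONN_FAILED;         /* e.g. EHOSTUNREACH, ENETUNREACH, ECONNREFUSED */
}
```

Only the pending case exercises the wait loop in which promotion was delayed, because nothing but a connection timeout (or an interrupt check) ends it.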

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

Attachments:

promote_test.pl (text/plain; charset=us-ascii)
#4Heikki Linnakangas
hlinnaka@iki.fi
In reply to: Kyotaro Horiguchi (#3)
Re: Network failure may prevent promotion

On 18/01/2024 10:26, Kyotaro Horiguchi wrote:

At Sun, 31 Dec 2023 20:07:41 +0900 (JST), Kyotaro Horiguchi <horikyota.ntt@gmail.com> wrote in

We've noticed that when walreceiver is waiting for a connection to
complete, standby does not immediately respond to promotion
requests. In PG14, upon receiving a promotion request, walreceiver
terminates instantly, but in PG16, it waits for connection
timeout. This behavior is attributed to commit 728f86fec65, where a
part of libpqrcv_connect was simply replaced with a call to
libpqsrv_connect_params. This behavior can be verified by simply
dropping packets from the standby to the primary.

Apologize for the inconvenience on my part, but I need to fix this
behavior. To continue this discussion, I'm providing a repro script
here.

Thanks for the script; I can reproduce this with it.

Given that commit 728f86fec6 that introduced this issue was not strictly
required, perhaps we should just revert it for v16.

In your patch, there's one more stray reference to
ProcessWalRcvInterrupts() in the comment above libpqrcv_PQexec. That
makes me wonder, why didn't commit 728f86fec6 go all the way and also
replace libpqrcv_PQexec and libpqrcv_PQgetResult with libpqsrv_exec and
libpqsrv_get_result?
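For reference, the wait pattern these loops share can be sketched in plain POSIX C. This is an illustration under stated assumptions, not WaitLatchOrSocket() or the libpq-be-fe-helpers.h code: a self-pipe stands in for the latch, poll() for the wait, and wait_socket_or_latch() is a hypothetical name. The point is that the only interrupt check a shared helper can make is the generic one.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <unistd.h>

typedef enum
{
    WAIT_SOCKET_READY,
    WAIT_INTERRUPTED,
    WAIT_TIMEOUT,
    WAIT_ERROR
} WaitResult;

static volatile sig_atomic_t InterruptPending = 0;
static volatile sig_atomic_t ProcDiePending = 0;

/*
 * Wait for 'sock' to become readable, waking whenever the latch pipe is
 * written.  Simplification: the timeout restarts after each latch wakeup.
 */
static WaitResult
wait_socket_or_latch(int sock, int latch_rd, int timeout_ms)
{
    assert(sock >= 0 && latch_rd >= 0);
    for (;;)
    {
        struct pollfd fds[2] = {
            {.fd = latch_rd, .events = POLLIN},
            {.fd = sock, .events = POLLIN},
        };
        int         rc = poll(fds, 2, timeout_ms);

        if (rc < 0)
        {
            if (errno == EINTR)
                continue;       /* retry; a flag-setting handler also writes the latch */
            return WAIT_ERROR;
        }
        if (rc == 0)
            return WAIT_TIMEOUT;

        if (fds[0].revents & POLLIN)
        {
            char        buf[64];
            ssize_t     n = read(latch_rd, buf, sizeof(buf));   /* like ResetLatch() */

            (void) n;
            /* like CHECK_FOR_INTERRUPTS(): the generic check any helper can make */
            if (InterruptPending && ProcDiePending)
                return WAIT_INTERRUPTED;
        }
        if (fds[1].revents & POLLIN)
            return WAIT_SOCKET_READY;
    }
}
```

Checking the latch before the socket means a pending termination request wins even when data is also available.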

--
Heikki Linnakangas
Neon (https://neon.tech)

#5Michael Paquier
michael@paquier.xyz
In reply to: Heikki Linnakangas (#4)
Re: Network failure may prevent promotion

On Thu, Jan 18, 2024 at 03:42:28PM +0200, Heikki Linnakangas wrote:

Given that commit 728f86fec6 that introduced this issue was not strictly
required, perhaps we should just revert it for v16.

Is there a point in keeping 728f86fec6 on HEAD either? It does not
strike me as wise to keep it in the tree for now. If it needs to be
reworked, looking at this problem from scratch would be a safer
approach.
--
Michael

#6Peter Smith
smithpb2250@gmail.com
In reply to: Kyotaro Horiguchi (#3)
Re: Network failure may prevent promotion

2024-01 Commitfest.

Hi, This patch has a CF status of "Needs Review" [1], but it seems
there were CFbot test failures last time it was run [2]. Please have a
look and post an updated version if necessary.

======
[1]: https://commitfest.postgresql.org/46/4748/
[2]: https://cirrus-ci.com/github/postgresql-cfbot/postgresql/commitfest/46/4748

Kind Regards,
Peter Smith.

#7Fujii Masao
masao.fujii@gmail.com
In reply to: Heikki Linnakangas (#4)
Re: Network failure may prevent promotion

On Thu, Jan 18, 2024 at 10:42 PM Heikki Linnakangas <hlinnaka@iki.fi> wrote:

Given that commit 728f86fec6 that introduced this issue was not strictly
required, perhaps we should just revert it for v16.

+1 for the revert.

This issue should be fixed in the upcoming minor release
since it might cause unexpected delays in failover times.

Regards,

--
Fujii Masao

#8Andres Freund
andres@anarazel.de
In reply to: Michael Paquier (#5)
Re: Network failure may prevent promotion

Hi,

On 2024-01-19 12:28:05 +0900, Michael Paquier wrote:

On Thu, Jan 18, 2024 at 03:42:28PM +0200, Heikki Linnakangas wrote:

Given that commit 728f86fec6 that introduced this issue was not strictly
required, perhaps we should just revert it for v16.

Is there a point in keeping 728f86fec6 as well on HEAD? That does not
strike me as wise to keep that in the tree for now. If it needs to be
reworked, looking at this problem from scratch would be a safer
approach.

IDK, I think we'll introduce this type of bug over and over if we don't fix it
properly.

Greetings,

Andres Freund

#9Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Andres Freund (#8)
Re: Network failure may prevent promotion

At Mon, 22 Jan 2024 13:29:10 -0800, Andres Freund <andres@anarazel.de> wrote in

Hi,

On 2024-01-19 12:28:05 +0900, Michael Paquier wrote:

On Thu, Jan 18, 2024 at 03:42:28PM +0200, Heikki Linnakangas wrote:

Given that commit 728f86fec6 that introduced this issue was not strictly
required, perhaps we should just revert it for v16.

Is there a point in keeping 728f86fec6 as well on HEAD? That does not
strike me as wise to keep that in the tree for now. If it needs to be
reworked, looking at this problem from scratch would be a safer
approach.

IDK, I think we'll introduce this type of bug over and over if we don't fix it
properly.

Just to clarify my position, I thought that 728f86fec6 was heading in
the right direction. Given the current approach to signal handling in
walreceiver, I believed it would be better to generalize further in
this direction rather than revert it. That's why I proposed that
patch.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

#10Fujii Masao
masao.fujii@gmail.com
In reply to: Kyotaro Horiguchi (#9)
Re: Network failure may prevent promotion

On Tue, Jan 23, 2024 at 1:23 PM Kyotaro Horiguchi
<horikyota.ntt@gmail.com> wrote:

At Mon, 22 Jan 2024 13:29:10 -0800, Andres Freund <andres@anarazel.de> wrote in

Hi,

On 2024-01-19 12:28:05 +0900, Michael Paquier wrote:

On Thu, Jan 18, 2024 at 03:42:28PM +0200, Heikki Linnakangas wrote:

Given that commit 728f86fec6 that introduced this issue was not strictly
required, perhaps we should just revert it for v16.

Is there a point in keeping 728f86fec6 as well on HEAD? That does not
strike me as wise to keep that in the tree for now. If it needs to be
reworked, looking at this problem from scratch would be a safer
approach.

IDK, I think we'll introduce this type of bug over and over if we don't fix it
properly.

Just to clarify my position, I thought that 728f86fec6 was heading the
right direction. Considering the current approach to signal handling
in walreceiver, I believed that it would be better to further
generalize in this direction rather than reverting. That's why I
proposed that patch.

Regarding the patch, here are the review comments.

+/*
+ * Is current process a wal receiver?
+ */
+bool
+IsWalReceiver(void)
+{
+ return WalRcv != NULL;
+}

This looks wrong because WalRcv can be non-NULL in processes other
than walreceiver.

- pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
+ pqsignal(SIGTERM, WalRcvShutdownSignalHandler); /* request shutdown */

Can't we just use die(), instead?

Regards,

--
Fujii Masao

#11Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Fujii Masao (#10)
1 attachment(s)
Re: Network failure may prevent promotion

Thank you for looking at this!

At Tue, 23 Jan 2024 15:07:10 +0900, Fujii Masao <masao.fujii@gmail.com> wrote in

Regarding the patch, here are the review comments.

+/*
+ * Is current process a wal receiver?
+ */
+bool
+IsWalReceiver(void)
+{
+ return WalRcv != NULL;
+}

This looks wrong because WalRcv can be non-NULL in processes other
than walreceiver.

Mmm. Sorry for the silly mistake. We can use B_WAL_RECEIVER
instead. I'm not sure whether the new function IsWalReceiver() is
required, since the expression "MyBackendType == B_WAL_RECEIVER" is
quite descriptive. However, the function does make the process-type
checks in ProcessInterrupts() more uniform.
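A small sketch of the difference (plain C; B_WAL_RECEIVER, MyBackendType and an AmWalReceiverProcess()-style macro mirror the names used in this thread, while the other enum values, WalRcvData's field and start_process() are hypothetical simplifications). A pointer to shared memory is set in every process that attaches to it, so it cannot identify the current process; a per-process backend type can.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical subset of process types. */
typedef enum
{
    B_BACKEND,
    B_STARTUP,
    B_WAL_RECEIVER
} BackendType;

typedef struct
{
    int         pid;
} WalRcvData;

static WalRcvData walrcv_shmem;         /* one struct in "shared memory" */
static WalRcvData *WalRcv = NULL;       /* set by every attached process */
static BackendType MyBackendType = B_BACKEND;   /* private to each process */

#define AmWalReceiverProcess() (MyBackendType == B_WAL_RECEIVER)

/* Buggy check: true in any process that has attached the shared struct. */
static bool
IsWalReceiver_by_pointer(void)
{
    return WalRcv != NULL;
}

/* Simulate process startup: record the type, attach shared memory. */
static void
start_process(BackendType type)
{
    MyBackendType = type;
    WalRcv = &walrcv_shmem;
}
```

For a startup process, the pointer-based check gives a false positive while the backend-type check correctly says no.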

- pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
+ pqsignal(SIGTERM, WalRcvShutdownSignalHandler); /* request shutdown */

Can't we just use die(), instead?

There was a comment explaining the problems associated with exiting
within a signal handler:

- * Currently, only SIGTERM is of interest. We can't just exit(1) within the
- * SIGTERM signal handler, because the signal might arrive in the middle of
- * some critical operation, like while we're holding a spinlock. Instead, the

And I think we should keep the considerations it raises. The patch
removes the comment itself, but only because it switches to our
standard process exit procedure, which already incorporates the points
made in the now-removed comment.

--
Kyotaro Horiguchi
NTT Open Source Software Center

Attachments:

walrcv_shutdown_deblocking_v2.patch (text/x-patch; charset=us-ascii)
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 201c36cb22..db779dc6ca 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -749,7 +749,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 		if (rc & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
-			ProcessWalRcvInterrupts();
+			CHECK_FOR_INTERRUPTS();
 		}
 
 		/* Consume whatever data is available from the socket */
@@ -1053,7 +1053,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 	{
 		char	   *cstrs[MaxTupleAttributeNumber];
 
-		ProcessWalRcvInterrupts();
+		CHECK_FOR_INTERRUPTS();
 
 		/* Do the allocations in temporary context. */
 		oldcontext = MemoryContextSwitchTo(rowcontext);
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 728059518e..e491f7d4c5 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -147,39 +147,34 @@ static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
+static void WalRcvShutdownSignalHandler(SIGNAL_ARGS);
 
-/*
- * Process any interrupts the walreceiver process may have received.
- * This should be called any time the process's latch has become set.
- *
- * Currently, only SIGTERM is of interest.  We can't just exit(1) within the
- * SIGTERM signal handler, because the signal might arrive in the middle of
- * some critical operation, like while we're holding a spinlock.  Instead, the
- * signal handler sets a flag variable as well as setting the process's latch.
- * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
- * latch has become set.  Operations that could block for a long time, such as
- * reading from a remote server, must pay attention to the latch too; see
- * libpqrcv_PQgetResult for example.
- */
 void
-ProcessWalRcvInterrupts(void)
+WalRcvShutdownSignalHandler(SIGNAL_ARGS)
 {
-	/*
-	 * Although walreceiver interrupt handling doesn't use the same scheme as
-	 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
-	 * any incoming signals on Win32, and also to make sure we process any
-	 * barrier events.
-	 */
-	CHECK_FOR_INTERRUPTS();
+	int			save_errno = errno;
 
-	if (ShutdownRequestPending)
+	/* Don't joggle the elbow of proc_exit */
+	if (!proc_exit_inprogress)
 	{
-		ereport(FATAL,
-				(errcode(ERRCODE_ADMIN_SHUTDOWN),
-				 errmsg("terminating walreceiver process due to administrator command")));
+		InterruptPending = true;
+		ProcDiePending = true;
 	}
+
+	SetLatch(MyLatch);
+
+	errno = save_errno;
+	
 }
 
+/*
+ * Is current process a wal receiver?
+ */
+bool
+IsWalReceiver(void)
+{
+	return MyBackendType == B_WAL_RECEIVER;
+}
 
 /* Main entry point for walreceiver process */
 void
@@ -277,7 +272,7 @@ WalReceiverMain(void)
 	pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
 													 * file */
 	pqsignal(SIGINT, SIG_IGN);
-	pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
+	pqsignal(SIGTERM, WalRcvShutdownSignalHandler); /* request shutdown */
 	/* SIGQUIT handler was already set up by InitPostmasterChild */
 	pqsignal(SIGALRM, SIG_IGN);
 	pqsignal(SIGPIPE, SIG_IGN);
@@ -456,7 +451,7 @@ WalReceiverMain(void)
 							 errmsg("cannot continue WAL streaming, recovery has already ended")));
 
 				/* Process any requests or signals received recently */
-				ProcessWalRcvInterrupts();
+				CHECK_FOR_INTERRUPTS();
 
 				if (ConfigReloadPending)
 				{
@@ -552,7 +547,7 @@ WalReceiverMain(void)
 				if (rc & WL_LATCH_SET)
 				{
 					ResetLatch(MyLatch);
-					ProcessWalRcvInterrupts();
+					CHECK_FOR_INTERRUPTS();
 
 					if (walrcv->force_reply)
 					{
@@ -691,7 +686,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 	{
 		ResetLatch(MyLatch);
 
-		ProcessWalRcvInterrupts();
+		CHECK_FOR_INTERRUPTS();
 
 		SpinLockAcquire(&walrcv->mutex);
 		Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 1a34bd3715..2ce24d8a9a 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -59,6 +59,7 @@
 #include "replication/logicallauncher.h"
 #include "replication/logicalworker.h"
 #include "replication/slot.h"
+#include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/bufmgr.h"
@@ -3286,6 +3287,10 @@ ProcessInterrupts(void)
 			 */
 			proc_exit(1);
 		}
+		else if (IsWalReceiver())
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+					 errmsg("terminating walreceiver process due to administrator command")));
 		else if (IsBackgroundWorker)
 			ereport(FATAL,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 0899891cdb..a7684b7bdc 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -456,8 +456,8 @@ walrcv_clear_result(WalRcvExecResult *walres)
 }
 
 /* prototypes for functions in walreceiver.c */
+extern bool IsWalReceiver(void);
 extern void WalReceiverMain(void) pg_attribute_noreturn();
-extern void ProcessWalRcvInterrupts(void);
 extern void WalRcvForceReply(void);
 
 /* prototypes for functions in walreceiverfuncs.c */
#12Heikki Linnakangas
hlinnaka@iki.fi
In reply to: Kyotaro Horiguchi (#9)
Re: Network failure may prevent promotion

On 23/01/2024 06:23, Kyotaro Horiguchi wrote:

At Mon, 22 Jan 2024 13:29:10 -0800, Andres Freund <andres@anarazel.de> wrote in

Hi,

On 2024-01-19 12:28:05 +0900, Michael Paquier wrote:

On Thu, Jan 18, 2024 at 03:42:28PM +0200, Heikki Linnakangas wrote:

Given that commit 728f86fec6 that introduced this issue was not strictly
required, perhaps we should just revert it for v16.

Is there a point in keeping 728f86fec6 as well on HEAD? That does not
strike me as wise to keep that in the tree for now. If it needs to be
reworked, looking at this problem from scratch would be a safer
approach.

IDK, I think we'll introduce this type of bug over and over if we don't fix it
properly.

Just to clarify my position, I thought that 728f86fec6 was heading the
right direction. Considering the current approach to signal handling
in walreceiver, I believed that it would be better to further
generalize in this direction rather than reverting. That's why I
proposed that patch.

I reverted commit 728f86fec6 from REL_16_STABLE and master.

I agree it was the right direction, so let's develop a complete patch,
and re-apply it to master when we have the patch ready.

--
Heikki Linnakangas
Neon (https://neon.tech)

#13Heikki Linnakangas
hlinnaka@iki.fi
In reply to: Kyotaro Horiguchi (#11)
4 attachment(s)
Re: Network failure may prevent promotion

On 23/01/2024 10:24, Kyotaro Horiguchi wrote:

Thank you for looking this!

At Tue, 23 Jan 2024 15:07:10 +0900, Fujii Masao <masao.fujii@gmail.com> wrote in

Regarding the patch, here are the review comments.

+/*
+ * Is current process a wal receiver?
+ */
+bool
+IsWalReceiver(void)
+{
+ return WalRcv != NULL;
+}

This looks wrong because WalRcv can be non-NULL in processes other
than walreceiver.

Mmm. Sorry for the silly mistake. We can use B_WAL_RECEIVER
instead. I'm not sure if the new function IsWalReceiver() is
required. The expression "MyBackendType == B_WAL_RECEIVER" is quite
descriptive. However, the function does make ProcessInterrupts() more
aligned regarding process types.

There's an existing AmWalReceiverProcess() macro too. Let's use that.

(See also
/messages/by-id/f3ecd4cb-85ee-4e54-8278-5fabfb3a4ed0@iki.fi
for refactoring in this area)

Here's a patch set summarizing the changes so far. They should be
squashed, but I kept them separate for now to help with review:

1. revert the revert of 728f86fec6.
2. your walrcv_shutdown_deblocking_v2-2.patch
3. Also replace libpqrcv_PQexec() and libpqrcv_PQgetResult() with the
wrappers from libpq-be-fe-helpers.h
4. Replace IsWalReceiver() with AmWalReceiverProcess()

- pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
+ pqsignal(SIGTERM, WalRcvShutdownSignalHandler); /* request shutdown */

Can't we just use die(), instead?

There was a comment explaining the problems associated with exiting
within a signal handler;

- * Currently, only SIGTERM is of interest. We can't just exit(1) within the
- * SIGTERM signal handler, because the signal might arrive in the middle of
- * some critical operation, like while we're holding a spinlock. Instead, the

And I think we should keep the considerations it suggests. The patch
removes the comment itself, but it does so because it implements our
standard process exit procedure, which incorporates points suggested
by the now-removed comment.

die() doesn't call exit(1), unless DoingCommandRead is set, which is
never the case in the walreceiver. It looks just like the new
WalRcvShutdownSignalHandler() function. Am I missing something?

Hmm, but doesn't bgworker_die() have that problem with exit(1)ing in the
signal handler?
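A minimal POSIX sketch of a die()-style handler in the spirit of the patch's WalRcvShutdownSignalHandler() (plain C, not PostgreSQL code; the self-pipe latch and the helper names set_latch(), reset_latch() and install_sigterm_handler() are assumptions). The handler only saves errno, sets sig_atomic_t flags and wakes the main loop. It never calls exit(), so the process terminates later at a check point rather than, say, while holding a spinlock.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdbool.h>
#include <string.h>
#include <unistd.h>

static volatile sig_atomic_t InterruptPending = 0;
static volatile sig_atomic_t ProcDiePending = 0;
static volatile sig_atomic_t proc_exit_inprogress = 0;

/* Self-pipe standing in for the process latch (an assumption, not latch.c). */
static int  latch_pipe[2] = {-1, -1};

static void
set_latch(void)
{
    /* write() is async-signal-safe; EAGAIN on a full pipe means "already set". */
    ssize_t     rc = write(latch_pipe[1], "x", 1);

    (void) rc;
}

/* die()-style SIGTERM handler: record the request and wake the main loop. */
static void
die_like_handler(int signo)
{
    int         save_errno = errno; /* don't clobber errno of interrupted code */

    (void) signo;
    if (!proc_exit_inprogress)
    {
        InterruptPending = 1;
        ProcDiePending = 1;
    }
    set_latch();
    errno = save_errno;
}

static bool
set_nonblocking(int fd)
{
    int         flags = fcntl(fd, F_GETFL);

    return flags >= 0 && fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0;
}

static bool
install_sigterm_handler(void)
{
    struct sigaction sa;

    if (pipe(latch_pipe) != 0 ||
        !set_nonblocking(latch_pipe[0]) || !set_nonblocking(latch_pipe[1]))
        return false;
    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = die_like_handler;
    sigemptyset(&sa.sa_mask);
    return sigaction(SIGTERM, &sa, NULL) == 0;
}

/* Like ResetLatch(): drain the pipe and report whether it had been set. */
static bool
reset_latch(void)
{
    char        buf[64];
    bool        was_set = false;

    assert(latch_pipe[0] >= 0);
    while (read(latch_pipe[0], buf, sizeof(buf)) > 0)
        was_set = true;
    return was_set;
}
```

After raise(SIGTERM), the flags are set and the latch pipe is readable, but nothing has exited; acting on the flags is left to the next interrupt check in the main loop.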

I also wonder if we should replace SignalHandlerForShutdownRequest()
completely with die(), in all processes? The difference is that
SignalHandlerForShutdownRequest() uses ShutdownRequestPending, while
die() uses ProcDiePending && InterruptPending to indicate that the
signal was received. Or do some of the processes want to check for
ShutdownRequestPending only at specific places, and don't want to get
terminated at any random CHECK_FOR_INTERRUPTS()?
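For context, a die()-style flag need not take effect at every check: PostgreSQL lets code hold off interrupts around a region with HOLD_INTERRUPTS()/RESUME_INTERRUPTS(), tracked by InterruptHoldoffCount. The sketch below is a heavily simplified, hypothetical model of that mechanism in plain C (process_interrupts() and the terminated flag are illustrative), not the real ProcessInterrupts().

```c
#include <assert.h>
#include <signal.h>
#include <stdbool.h>

static volatile sig_atomic_t InterruptPending = 0;
static volatile sig_atomic_t ProcDiePending = 0;
static volatile unsigned InterruptHoldoffCount = 0;

#define HOLD_INTERRUPTS()   (InterruptHoldoffCount++)
#define RESUME_INTERRUPTS() (assert(InterruptHoldoffCount > 0), InterruptHoldoffCount--)

static bool terminated = false;  /* stands in for ereport(FATAL) */

static void
process_interrupts(void)
{
    /* Inside a held-off region: leave the request pending for later. */
    if (InterruptHoldoffCount != 0)
        return;
    InterruptPending = 0;
    if (ProcDiePending)
    {
        ProcDiePending = 0;
        terminated = true;
    }
}

#define CHECK_FOR_INTERRUPTS() \
    do { if (InterruptPending) process_interrupts(); } while (0)
```

A check inside the held-off region leaves the request pending; the first check after RESUME_INTERRUPTS() acts on it.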

--
Heikki Linnakangas
Neon (https://neon.tech)

Attachments:

v3-0001-Revert-Revert-libpqwalreceiver-Convert-to-libpq-b.patch (text/x-patch; charset=UTF-8)
From 27b9f8283b2caa7a4243fe57a8d14a127396e80f Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Tue, 23 Jan 2024 11:01:03 +0200
Subject: [PATCH v3 1/4] Revert "Revert "libpqwalreceiver: Convert to
 libpq-be-fe-helpers.h""

This reverts commit 21ef4d4d897563adb2f7920ad53b734950f1e0a4.
---
 .../libpqwalreceiver/libpqwalreceiver.c       | 55 +++----------------
 1 file changed, 8 insertions(+), 47 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 77669074e82..201c36cb220 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -24,6 +24,7 @@
 #include "common/connect.h"
 #include "funcapi.h"
 #include "libpq-fe.h"
+#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -132,7 +133,6 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
 				 const char *appname, char **err)
 {
 	WalReceiverConn *conn;
-	PostgresPollingStatusType status;
 	const char *keys[6];
 	const char *vals[6];
 	int			i = 0;
@@ -188,56 +188,17 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
 	Assert(i < sizeof(keys));
 
 	conn = palloc0(sizeof(WalReceiverConn));
-	conn->streamConn = PQconnectStartParams(keys, vals,
-											 /* expand_dbname = */ true);
-	if (PQstatus(conn->streamConn) == CONNECTION_BAD)
-		goto bad_connection_errmsg;
-
-	/*
-	 * Poll connection until we have OK or FAILED status.
-	 *
-	 * Per spec for PQconnectPoll, first wait till socket is write-ready.
-	 */
-	status = PGRES_POLLING_WRITING;
-	do
-	{
-		int			io_flag;
-		int			rc;
-
-		if (status == PGRES_POLLING_READING)
-			io_flag = WL_SOCKET_READABLE;
-#ifdef WIN32
-		/* Windows needs a different test while waiting for connection-made */
-		else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
-			io_flag = WL_SOCKET_CONNECTED;
-#endif
-		else
-			io_flag = WL_SOCKET_WRITEABLE;
-
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
-							   PQsocket(conn->streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
-
-		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			ProcessWalRcvInterrupts();
-		}
-
-		/* If socket is ready, advance the libpq state machine */
-		if (rc & io_flag)
-			status = PQconnectPoll(conn->streamConn);
-	} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
+	conn->streamConn =
+		libpqsrv_connect_params(keys, vals,
+								 /* expand_dbname = */ true,
+								WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
 
 	if (PQstatus(conn->streamConn) != CONNECTION_OK)
 		goto bad_connection_errmsg;
 
 	if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
 	{
-		PQfinish(conn->streamConn);
+		libpqsrv_disconnect(conn->streamConn);
 		pfree(conn);
 
 		ereport(ERROR,
@@ -273,7 +234,7 @@ bad_connection_errmsg:
 
 	/* error path, error already set */
 bad_connection:
-	PQfinish(conn->streamConn);
+	libpqsrv_disconnect(conn->streamConn);
 	pfree(conn);
 	return NULL;
 }
@@ -809,7 +770,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 static void
 libpqrcv_disconnect(WalReceiverConn *conn)
 {
-	PQfinish(conn->streamConn);
+	libpqsrv_disconnect(conn->streamConn);
 	PQfreemem(conn->recvBuf);
 	pfree(conn);
 }
-- 
2.39.2

v3-0002-Apply-walrcv_shutdown_deblocking_v2-2.patch.patch
From 9295f868fa207d71f7eef620e64f3047e8f45e13 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Tue, 23 Jan 2024 11:01:25 +0200
Subject: [PATCH v3 2/4] Apply walrcv_shutdown_deblocking_v2-2.patch

From https://www.postgresql.org/message-id/20240123.172410.1596193222420636986.horikyota.ntt%40gmail.com
---
 .../libpqwalreceiver/libpqwalreceiver.c       |  4 +-
 src/backend/replication/walreceiver.c         | 53 +++++++++----------
 src/backend/tcop/postgres.c                   |  5 ++
 src/include/replication/walreceiver.h         |  2 +-
 4 files changed, 32 insertions(+), 32 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 201c36cb220..db779dc6ca6 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -749,7 +749,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 		if (rc & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
-			ProcessWalRcvInterrupts();
+			CHECK_FOR_INTERRUPTS();
 		}
 
 		/* Consume whatever data is available from the socket */
@@ -1053,7 +1053,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 	{
 		char	   *cstrs[MaxTupleAttributeNumber];
 
-		ProcessWalRcvInterrupts();
+		CHECK_FOR_INTERRUPTS();
 
 		/* Do the allocations in temporary context. */
 		oldcontext = MemoryContextSwitchTo(rowcontext);
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 728059518e1..e491f7d4c5e 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -147,39 +147,34 @@ static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
+static void WalRcvShutdownSignalHandler(SIGNAL_ARGS);
 
-/*
- * Process any interrupts the walreceiver process may have received.
- * This should be called any time the process's latch has become set.
- *
- * Currently, only SIGTERM is of interest.  We can't just exit(1) within the
- * SIGTERM signal handler, because the signal might arrive in the middle of
- * some critical operation, like while we're holding a spinlock.  Instead, the
- * signal handler sets a flag variable as well as setting the process's latch.
- * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
- * latch has become set.  Operations that could block for a long time, such as
- * reading from a remote server, must pay attention to the latch too; see
- * libpqrcv_PQgetResult for example.
- */
 void
-ProcessWalRcvInterrupts(void)
+WalRcvShutdownSignalHandler(SIGNAL_ARGS)
 {
-	/*
-	 * Although walreceiver interrupt handling doesn't use the same scheme as
-	 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
-	 * any incoming signals on Win32, and also to make sure we process any
-	 * barrier events.
-	 */
-	CHECK_FOR_INTERRUPTS();
+	int			save_errno = errno;
 
-	if (ShutdownRequestPending)
+	/* Don't joggle the elbow of proc_exit */
+	if (!proc_exit_inprogress)
 	{
-		ereport(FATAL,
-				(errcode(ERRCODE_ADMIN_SHUTDOWN),
-				 errmsg("terminating walreceiver process due to administrator command")));
+		InterruptPending = true;
+		ProcDiePending = true;
 	}
+
+	SetLatch(MyLatch);
+
+	errno = save_errno;
+	
 }
 
+/*
+ * Is current process a wal receiver?
+ */
+bool
+IsWalReceiver(void)
+{
+	return MyBackendType == B_WAL_RECEIVER;
+}
 
 /* Main entry point for walreceiver process */
 void
@@ -277,7 +272,7 @@ WalReceiverMain(void)
 	pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
 													 * file */
 	pqsignal(SIGINT, SIG_IGN);
-	pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
+	pqsignal(SIGTERM, WalRcvShutdownSignalHandler); /* request shutdown */
 	/* SIGQUIT handler was already set up by InitPostmasterChild */
 	pqsignal(SIGALRM, SIG_IGN);
 	pqsignal(SIGPIPE, SIG_IGN);
@@ -456,7 +451,7 @@ WalReceiverMain(void)
 							 errmsg("cannot continue WAL streaming, recovery has already ended")));
 
 				/* Process any requests or signals received recently */
-				ProcessWalRcvInterrupts();
+				CHECK_FOR_INTERRUPTS();
 
 				if (ConfigReloadPending)
 				{
@@ -552,7 +547,7 @@ WalReceiverMain(void)
 				if (rc & WL_LATCH_SET)
 				{
 					ResetLatch(MyLatch);
-					ProcessWalRcvInterrupts();
+					CHECK_FOR_INTERRUPTS();
 
 					if (walrcv->force_reply)
 					{
@@ -691,7 +686,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 	{
 		ResetLatch(MyLatch);
 
-		ProcessWalRcvInterrupts();
+		CHECK_FOR_INTERRUPTS();
 
 		SpinLockAcquire(&walrcv->mutex);
 		Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 1a34bd3715f..2ce24d8a9a1 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -59,6 +59,7 @@
 #include "replication/logicallauncher.h"
 #include "replication/logicalworker.h"
 #include "replication/slot.h"
+#include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/bufmgr.h"
@@ -3286,6 +3287,10 @@ ProcessInterrupts(void)
 			 */
 			proc_exit(1);
 		}
+		else if (IsWalReceiver())
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+					 errmsg("terminating walreceiver process due to administrator command")));
 		else if (IsBackgroundWorker)
 			ereport(FATAL,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 0899891cdb8..a7684b7bdc8 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -456,8 +456,8 @@ walrcv_clear_result(WalRcvExecResult *walres)
 }
 
 /* prototypes for functions in walreceiver.c */
+extern bool IsWalReceiver(void);
 extern void WalReceiverMain(void) pg_attribute_noreturn();
-extern void ProcessWalRcvInterrupts(void);
 extern void WalRcvForceReply(void);
 
 /* prototypes for functions in walreceiverfuncs.c */
-- 
2.39.2

v3-0003-Use-libpq-be-fe-helpers.h-wrappers-more.patch
From 4a8fa778fe5ee1807b91d2587775b7a7ad250829 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Tue, 23 Jan 2024 11:16:23 +0200
Subject: [PATCH v3 3/4] Use libpq-be-fe-helpers.h wrappers more

---
 .../libpqwalreceiver/libpqwalreceiver.c       | 148 ++++--------------
 1 file changed, 31 insertions(+), 117 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index db779dc6ca6..c60a121093c 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -102,8 +102,6 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 };
 
 /* Prototypes for private functions */
-static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
-static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
 
 /*
@@ -212,8 +210,9 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
 	{
 		PGresult   *res;
 
-		res = libpqrcv_PQexec(conn->streamConn,
-							  ALWAYS_SECURE_SEARCH_PATH_SQL);
+		res = libpqsrv_exec(conn->streamConn,
+							ALWAYS_SECURE_SEARCH_PATH_SQL,
+							WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 		{
 			PQclear(res);
@@ -385,7 +384,9 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
 	 * Get the system identifier and timeline ID as a DataRow message from the
 	 * primary server.
 	 */
-	res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
+	res = libpqsrv_exec(conn->streamConn,
+						"IDENTIFY_SYSTEM",
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -518,7 +519,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 						 options->proto.physical.startpointTLI);
 
 	/* Start streaming. */
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd.data,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -548,7 +551,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	PGresult   *res;
 
 	/*
-	 * Send copy-end message.  As in libpqrcv_PQexec, this could theoretically
+	 * Send copy-end message.  As in libpqsrv_exec, this could theoretically
 	 * block, but the risk seems small.
 	 */
 	if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
@@ -568,7 +571,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	 * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
 	 * also possible in case we aborted the copy in mid-stream.
 	 */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqsrv_get_result(conn->streamConn,
+							  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
 	{
 		/*
@@ -583,7 +587,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 		PQclear(res);
 
 		/* the result set should be followed by CommandComplete */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	}
 	else if (PQresultStatus(res) == PGRES_COPY_OUT)
 	{
@@ -597,7 +602,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 							pchomp(PQerrorMessage(conn->streamConn)))));
 
 		/* CommandComplete should follow */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	}
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -608,7 +614,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	PQclear(res);
 
 	/* Verify that there are no more results */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqsrv_get_result(conn->streamConn,
+							  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (res != NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -633,7 +640,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 	 * Request the primary to send over the history file for given timeline.
 	 */
 	snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
-	res = libpqrcv_PQexec(conn->streamConn, cmd);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -663,107 +672,6 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 	PQclear(res);
 }
 
-/*
- * Send a query and wait for the results by using the asynchronous libpq
- * functions and socket readiness events.
- *
- * The function is modeled on libpqsrv_exec(), with the behavior difference
- * being that it calls ProcessWalRcvInterrupts().  As an optimization, it
- * skips try/catch, since all errors terminate the process.
- *
- * May return NULL, rather than an error result, on failure.
- */
-static PGresult *
-libpqrcv_PQexec(PGconn *streamConn, const char *query)
-{
-	PGresult   *lastResult = NULL;
-
-	/*
-	 * PQexec() silently discards any prior query results on the connection.
-	 * This is not required for this function as it's expected that the caller
-	 * (which is this library in all cases) will behave correctly and we don't
-	 * have to be backwards compatible with old libpq.
-	 */
-
-	/*
-	 * Submit the query.  Since we don't use non-blocking mode, this could
-	 * theoretically block.  In practice, since we don't send very long query
-	 * strings, the risk seems negligible.
-	 */
-	if (!PQsendQuery(streamConn, query))
-		return NULL;
-
-	for (;;)
-	{
-		/* Wait for, and collect, the next PGresult. */
-		PGresult   *result;
-
-		result = libpqrcv_PQgetResult(streamConn);
-		if (result == NULL)
-			break;				/* query is complete, or failure */
-
-		/*
-		 * Emulate PQexec()'s behavior of returning the last result when there
-		 * are many.  We are fine with returning just last error message.
-		 */
-		PQclear(lastResult);
-		lastResult = result;
-
-		if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
-			PQresultStatus(lastResult) == PGRES_COPY_OUT ||
-			PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
-			PQstatus(streamConn) == CONNECTION_BAD)
-			break;
-	}
-
-	return lastResult;
-}
-
-/*
- * Perform the equivalent of PQgetResult(), but watch for interrupts.
- */
-static PGresult *
-libpqrcv_PQgetResult(PGconn *streamConn)
-{
-	/*
-	 * Collect data until PQgetResult is ready to get the result without
-	 * blocking.
-	 */
-	while (PQisBusy(streamConn))
-	{
-		int			rc;
-
-		/*
-		 * We don't need to break down the sleep into smaller increments,
-		 * since we'll get interrupted by signals and can handle any
-		 * interrupts here.
-		 */
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
-							   WL_LATCH_SET,
-							   PQsocket(streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
-
-		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/* Consume whatever data is available from the socket */
-		if (PQconsumeInput(streamConn) == 0)
-		{
-			/* trouble; return NULL */
-			return NULL;
-		}
-	}
-
-	/* Now we can collect and return the next PGresult */
-	return PQgetResult(streamConn);
-}
-
 /*
  * Disconnect connection to primary, if any.
  */
@@ -824,13 +732,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
 	{
 		PGresult   *res;
 
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
 		{
 			PQclear(res);
 
 			/* Verify that there are no more results. */
-			res = libpqrcv_PQgetResult(conn->streamConn);
+			res = libpqsrv_get_result(conn->streamConn,
+									  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 			if (res != NULL)
 			{
 				PQclear(res);
@@ -972,7 +882,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 			appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
 	}
 
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd.data,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -1099,7 +1011,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("the query interface requires a database connection")));
 
-	pgres = libpqrcv_PQexec(conn->streamConn, query);
+	pgres = libpqsrv_exec(conn->streamConn,
+						  query,
+						  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 
 	switch (PQresultStatus(pgres))
 	{
-- 
2.39.2

v3-0004-Use-existing-AmWalReceiverProcess-function.patch
From c0797200f5b84505bfab618166b8f9d576685678 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Tue, 23 Jan 2024 11:19:07 +0200
Subject: [PATCH v3 4/4] Use existing AmWalReceiverProcess() function

---
 src/backend/replication/walreceiver.c | 9 ---------
 src/backend/tcop/postgres.c           | 2 +-
 2 files changed, 1 insertion(+), 10 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index e491f7d4c5e..3bd633e75cb 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -167,15 +167,6 @@ WalRcvShutdownSignalHandler(SIGNAL_ARGS)
 	
 }
 
-/*
- * Is current process a wal receiver?
- */
-bool
-IsWalReceiver(void)
-{
-	return MyBackendType == B_WAL_RECEIVER;
-}
-
 /* Main entry point for walreceiver process */
 void
 WalReceiverMain(void)
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 2ce24d8a9a1..5a4dc1977d3 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3287,7 +3287,7 @@ ProcessInterrupts(void)
 			 */
 			proc_exit(1);
 		}
-		else if (IsWalReceiver())
+		else if (AmWalReceiverProcess())
 			ereport(FATAL,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
 					 errmsg("terminating walreceiver process due to administrator command")));
-- 
2.39.2

#14 Fujii Masao
masao.fujii@gmail.com
In reply to: Heikki Linnakangas (#13)
Re: Network failure may prevent promotion

On Tue, Jan 23, 2024 at 6:43 PM Heikki Linnakangas <hlinnaka@iki.fi> wrote:

There's an existing AmWalReceiverProcess() macro too. Let's use that.

+1

Hmm, but doesn't bgworker_die() have that problem with exit(1)ing in the
signal handler?

Yes, that's a problem. This issue has been raised several times so far,
but has not been resolved yet.

I also wonder if we should replace SignalHandlerForShutdownRequest()
completely with die(), in all processes? The difference is that
SignalHandlerForShutdownRequest() uses ShutdownRequestPending, while
die() uses ProcDiePending && InterruptPending to indicate that the
signal was received. Or do some of the processes want to check for
ShutdownRequestPending only at specific places, and don't want to get
terminated at any random CHECK_FOR_INTERRUPTS()?

For example, checkpointer seems to want to handle a shutdown request
only when no other checkpoint is in progress because initiating a shutdown
checkpoint while another checkpoint is running could lead to issues.

Also I just wonder if even walreceiver can exit safely at any random
CHECK_FOR_INTERRUPTS()...

Regards,

--
Fujii Masao
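The distinction discussed above (ShutdownRequestPending checked only at chosen places versus ProcDiePending/InterruptPending acted on by any CHECK_FOR_INTERRUPTS()) is also what caused the original report: the connect loop in libpqsrv_connect_params() only calls CHECK_FOR_INTERRUPTS(), which never looks at ShutdownRequestPending. The following is a minimal standalone sketch, not PostgreSQL code; the flag names, check_for_interrupts() and would_exit_at_generic_check() are hypothetical, simplified stand-ins.

```c
#include <signal.h>
#include <stdbool.h>

/*
 * Hypothetical stand-ins for ShutdownRequestPending, InterruptPending and
 * ProcDiePending.  Flags written by a signal handler must be
 * volatile sig_atomic_t.
 */
static volatile sig_atomic_t shutdown_request_pending = 0;
static volatile sig_atomic_t interrupt_pending = 0;
static volatile sig_atomic_t die_pending = 0;

/* Style 1, like SignalHandlerForShutdownRequest(): only record the request. */
static void
handle_shutdown_request(int signo)
{
	(void) signo;
	shutdown_request_pending = 1;
}

/* Style 2, like die(): arm the generic interrupt machinery. */
static void
handle_die(int signo)
{
	(void) signo;
	interrupt_pending = 1;
	die_pending = 1;
}

/*
 * Stand-in for CHECK_FOR_INTERRUPTS(): returns true if the process would
 * terminate at this point.  It never looks at shutdown_request_pending, so
 * a style-1 request waits until some code tests that flag explicitly.
 */
static bool
check_for_interrupts(void)
{
	if (!interrupt_pending)
		return false;
	interrupt_pending = 0;
	return die_pending != 0;
}

/*
 * Install a SIGTERM handler, send SIGTERM to ourselves, and report whether
 * a generic interrupt check would now terminate the process.
 */
static bool
would_exit_at_generic_check(void (*handler) (int))
{
	shutdown_request_pending = 0;
	interrupt_pending = 0;
	die_pending = 0;

	signal(SIGTERM, handler);
	raise(SIGTERM);				/* the handler runs before raise() returns */
	signal(SIGTERM, SIG_DFL);

	return check_for_interrupts();
}
```

With style 1, a wait loop that only calls the generic check keeps waiting even though a shutdown was requested; with style 2 it reacts at the next check. That is the trade-off behind Fujii's checkpointer example: style 1 lets a process choose where it is safe to stop.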

#15 Fujii Masao
masao.fujii@gmail.com
In reply to: Fujii Masao (#14)
Re: Network failure may prevent promotion

On Wed, Jan 24, 2024 at 8:29 PM Fujii Masao <masao.fujii@gmail.com> wrote:

On Tue, Jan 23, 2024 at 6:43 PM Heikki Linnakangas <hlinnaka@iki.fi> wrote:

There's an existing AmWalReceiverProcess() macro too. Let's use that.

+1

Hmm, but doesn't bgworker_die() have that problem with exit(1)ing in the
signal handler?

Yes, that's a problem. This issue has been raised several times so far,
but has not been resolved yet.

I also wonder if we should replace SignalHandlerForShutdownRequest()
completely with die(), in all processes? The difference is that
SignalHandlerForShutdownRequest() uses ShutdownRequestPending, while
die() uses ProcDiePending && InterruptPending to indicate that the
signal was received. Or do some of the processes want to check for
ShutdownRequestPending only at specific places, and don't want to get
terminated at any random CHECK_FOR_INTERRUPTS()?

For example, checkpointer seems to want to handle a shutdown request
only when no other checkpoint is in progress because initiating a shutdown
checkpoint while another checkpoint is running could lead to issues.

This comment of mine is not right... Sorry for the noise.

Regards,

--
Fujii Masao

#16 Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Heikki Linnakangas (#13)
5 attachment(s)
Re: Network failure may prevent promotion

Thank you for fixing the issue.

At Tue, 23 Jan 2024 11:43:43 +0200, Heikki Linnakangas <hlinnaka@iki.fi> wrote in

There's an existing AmWalReceiverProcess() macro too. Let's use that.

Mmm. I looked for an Is* function because "IsLogicalWorker()" is placed on
the previous line. Our convention regarding those functions (macros)
and variables seems inconsistent. However, I can't say for sure that
we should unify all of them.

(See also
/messages/by-id/f3ecd4cb-85ee-4e54-8278-5fabfb3a4ed0@iki.fi
for refactoring in this area)

Here's a patch set summarizing the changes so far. They should be
squashed, but I kept them separate for now to help with review:

1. revert the revert of 728f86fec6.
2. your walrcv_shutdown_deblocking_v2-2.patch
3. Also replace libpqrcv_PQexec() and libpqrcv_PQgetResult() with the
wrappers from libpq-be-fe-helpers.h

Both replacements look fine. I didn't find another instance of similar
code.

4. Replace IsWalReceiver() with AmWalReceiverProcess()

Looks fine.

- pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
+ pqsignal(SIGTERM, WalRcvShutdownSignalHandler); /* request shutdown */

Can't we just use die(), instead?

There was a comment explaining the problems associated with exiting
within a signal handler:
- * Currently, only SIGTERM is of interest.  We can't just exit(1) within the
- * SIGTERM signal handler, because the signal might arrive in the middle of
- * some critical operation, like while we're holding a spinlock.  Instead, the
And I think we should keep the considerations it suggests. The patch
removes the comment itself, but only because it implements our
standard process exit procedure, which incorporates the points made
by the now-removed comment.

die() doesn't call exit(1), unless DoingCommandRead is set, and it
never is in the walreceiver. It looks just like the new
WalRcvShutdownSignalHandler() function. Am I missing something?

Ugh... Doesn't the name 'die()' suggest exit()?
I agree that die() can be used instead.
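The removed comment describes the pattern that die() and WalRcvShutdownSignalHandler() both follow: the handler only sets flags and the process latch, and every long wait must wake on the latch and check the flags, instead of calling exit() from the handler. Below is a standalone POSIX sketch of that pattern, assuming a non-blocking self-pipe as a stand-in for MyLatch and poll() as a stand-in for WaitLatchOrSocket(); latch_pipe, die_pending, latch_init() and wait_socket_or_latch() are hypothetical names, not PostgreSQL APIs.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <stdbool.h>
#include <unistd.h>

/* Hypothetical stand-ins: a self-pipe for MyLatch, a flag for ProcDiePending. */
static int	latch_pipe[2] = {-1, -1};
static volatile sig_atomic_t die_pending = 0;

/* Only async-signal-safe work: set a flag, "SetLatch", restore errno. */
static void
handle_sigterm(int signo)
{
	int			save_errno = errno;
	ssize_t		rc;

	(void) signo;
	die_pending = 1;
	rc = write(latch_pipe[1], "x", 1);	/* non-blocking; a full pipe is fine */
	(void) rc;
	errno = save_errno;
}

/* Create the non-blocking self-pipe and install the SIGTERM handler. */
static bool
latch_init(void)
{
	struct sigaction sa;

	if (pipe(latch_pipe) != 0)
		return false;
	for (int i = 0; i < 2; i++)
	{
		int			flags = fcntl(latch_pipe[i], F_GETFL);

		if (flags < 0 || fcntl(latch_pipe[i], F_SETFL, flags | O_NONBLOCK) < 0)
			return false;
	}
	sa.sa_handler = handle_sigterm;
	sa.sa_flags = 0;
	sigemptyset(&sa.sa_mask);
	return sigaction(SIGTERM, &sa, NULL) == 0;
}

typedef enum
{
	WAIT_SOCKET_READY,
	WAIT_INTERRUPTED,
	WAIT_ERROR
} WaitResult;

/*
 * Wait until sock is readable or the latch is set, checking the termination
 * flag every time the latch wakes us (cf. WaitLatchOrSocket() followed by
 * ResetLatch() and CHECK_FOR_INTERRUPTS()).
 */
static WaitResult
wait_socket_or_latch(int sock)
{
	for (;;)
	{
		struct pollfd fds[2] = {
			{.fd = sock, .events = POLLIN},
			{.fd = latch_pipe[0], .events = POLLIN},
		};

		if (poll(fds, 2, -1) < 0)
		{
			if (errno == EINTR)
				continue;		/* the handler also wrote to the self-pipe */
			return WAIT_ERROR;
		}

		if (fds[1].revents & POLLIN)
		{
			char		buf[16];

			/* "ResetLatch": drain the self-pipe before checking the flag */
			while (read(latch_pipe[0], buf, sizeof(buf)) > 0)
				;
			/* "CHECK_FOR_INTERRUPTS": honor a pending termination request */
			if (die_pending)
				return WAIT_INTERRUPTED;
		}

		if (fds[0].revents & (POLLIN | POLLHUP))
			return WAIT_SOCKET_READY;
	}
}
```

Because the handler writes to the pipe, a signal that arrives just before poll() still wakes the wait, so the termination request is not lost. The actual exit then happens in ordinary code, outside the handler.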

Hmm, but doesn't bgworker_die() have that problem with exit(1)ing in
the signal handler?

I noticed that, but ignored it this time.

I also wonder if we should replace SignalHandlerForShutdownRequest()
completely with die(), in all processes? The difference is that
SignalHandlerForShutdownRequest() uses ShutdownRequestPending, while
die() uses ProcDiePending && InterruptPending to indicate that the
signal was received. Or do some of the processes want to check for
ShutdownRequestPending only at specific places, and don't want to get
terminated at any random CHECK_FOR_INTERRUPTS()?

At least, pg_log_backend_memory_contexts(<chkpt_pid>) causes a call to
ProcessInterrupts() via ereport(LOG_SERVER_ONLY, ...), which can lead to an
exit due to ProcDiePending. In this regard, the checkpointer clearly
requires the distinction.

Rather than merely consolidating the notification variables and
trying to eliminate CFI calls from the execution path, I
believe we need a shutdown mechanism that CFI doesn't react
to. However, whether we should keep the notification variables
separate as they are now, or introduce a variable that causes CFI
to ignore ProcDiePending, is, I think, open to discussion.
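The second option above (a variable that makes CFI ignore ProcDiePending) could look roughly like the following standalone sketch. Everything in it is hypothetical: HOLD_DIE()/RESUME_DIE(), die_holdoff_count and process_interrupts() are illustrative names, not PostgreSQL APIs. PostgreSQL's existing HOLD_INTERRUPTS()/RESUME_INTERRUPTS() defer all interrupt processing, whereas this sketch defers only termination, so other requests (here a stand-in for a memory-context logging request) are still served.

```c
#include <signal.h>
#include <stdbool.h>

/* Hypothetical flags; names are illustrative, not PostgreSQL's. */
static volatile sig_atomic_t interrupt_pending = 0;
static volatile sig_atomic_t die_pending = 0;
static volatile sig_atomic_t log_request_pending = 0;

/* Touched only by ordinary code, never by signal handlers. */
static int	die_holdoff_count = 0;	/* > 0: defer termination only */
static int	log_requests_served = 0;

#define HOLD_DIE()		(die_holdoff_count++)
#define RESUME_DIE()	(die_holdoff_count--)

typedef enum
{
	CFI_CONTINUE,
	CFI_TERMINATE
} CfiResult;

/*
 * Stand-in for ProcessInterrupts(): always serves non-fatal requests, but
 * while termination is held off it keeps die_pending and re-arms
 * interrupt_pending so the request is acted on after RESUME_DIE().
 */
static CfiResult
process_interrupts(void)
{
	interrupt_pending = 0;

	if (log_request_pending)
	{
		log_request_pending = 0;
		log_requests_served++;
	}

	if (die_pending)
	{
		if (die_holdoff_count > 0)
		{
			interrupt_pending = 1;	/* don't lose the termination request */
			return CFI_CONTINUE;
		}
		return CFI_TERMINATE;
	}
	return CFI_CONTINUE;
}

/* Stand-in for CHECK_FOR_INTERRUPTS(). */
static CfiResult
check_for_interrupts(void)
{
	return interrupt_pending ? process_interrupts() : CFI_CONTINUE;
}
```

Used around a section such as a running checkpoint, HOLD_DIE() would let an ereport() that reaches CFI serve the logging request without exiting, and the first CFI after RESUME_DIE() would terminate.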

The attached patches are a rebased version of v3 (0003 is updated) plus
an additional 0005 that uses die() instead of walreceiver's
custom function.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

Attachments:

v4-0001-Revert-Revert-libpqwalreceiver-Convert-to-libpq-b.patch
From daac3aa06bd33f5e11118f04a406c8a59fd4cc97 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Tue, 23 Jan 2024 11:01:03 +0200
Subject: [PATCH v4 1/5] Revert "Revert "libpqwalreceiver: Convert to
 libpq-be-fe-helpers.h""

This reverts commit 21ef4d4d897563adb2f7920ad53b734950f1e0a4.
---
 .../libpqwalreceiver/libpqwalreceiver.c       | 55 +++----------------
 1 file changed, 8 insertions(+), 47 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 2439733b55..dbee2f8f0e 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -24,6 +24,7 @@
 #include "common/connect.h"
 #include "funcapi.h"
 #include "libpq-fe.h"
+#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -136,7 +137,6 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
 				 const char *appname, char **err)
 {
 	WalReceiverConn *conn;
-	PostgresPollingStatusType status;
 	const char *keys[6];
 	const char *vals[6];
 	int			i = 0;
@@ -192,56 +192,17 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
 	Assert(i < sizeof(keys));
 
 	conn = palloc0(sizeof(WalReceiverConn));
-	conn->streamConn = PQconnectStartParams(keys, vals,
-											 /* expand_dbname = */ true);
-	if (PQstatus(conn->streamConn) == CONNECTION_BAD)
-		goto bad_connection_errmsg;
-
-	/*
-	 * Poll connection until we have OK or FAILED status.
-	 *
-	 * Per spec for PQconnectPoll, first wait till socket is write-ready.
-	 */
-	status = PGRES_POLLING_WRITING;
-	do
-	{
-		int			io_flag;
-		int			rc;
-
-		if (status == PGRES_POLLING_READING)
-			io_flag = WL_SOCKET_READABLE;
-#ifdef WIN32
-		/* Windows needs a different test while waiting for connection-made */
-		else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
-			io_flag = WL_SOCKET_CONNECTED;
-#endif
-		else
-			io_flag = WL_SOCKET_WRITEABLE;
-
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
-							   PQsocket(conn->streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
-
-		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			ProcessWalRcvInterrupts();
-		}
-
-		/* If socket is ready, advance the libpq state machine */
-		if (rc & io_flag)
-			status = PQconnectPoll(conn->streamConn);
-	} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
+	conn->streamConn =
+		libpqsrv_connect_params(keys, vals,
+								 /* expand_dbname = */ true,
+								WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
 
 	if (PQstatus(conn->streamConn) != CONNECTION_OK)
 		goto bad_connection_errmsg;
 
 	if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
 	{
-		PQfinish(conn->streamConn);
+		libpqsrv_disconnect(conn->streamConn);
 		pfree(conn);
 
 		ereport(ERROR,
@@ -277,7 +238,7 @@ bad_connection_errmsg:
 
 	/* error path, error already set */
 bad_connection:
-	PQfinish(conn->streamConn);
+	libpqsrv_disconnect(conn->streamConn);
 	pfree(conn);
 	return NULL;
 }
@@ -813,7 +774,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 static void
 libpqrcv_disconnect(WalReceiverConn *conn)
 {
-	PQfinish(conn->streamConn);
+	libpqsrv_disconnect(conn->streamConn);
 	PQfreemem(conn->recvBuf);
 	pfree(conn);
 }
-- 
2.39.3

v4-0002-Apply-walrcv_shutdown_deblocking_v2-2.patch.patch
From 1d8f786f2b8185bde63bfb443152006b6aef9b0b Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Tue, 23 Jan 2024 11:01:25 +0200
Subject: [PATCH v4 2/5] Apply walrcv_shutdown_deblocking_v2-2.patch

From https://www.postgresql.org/message-id/20240123.172410.1596193222420636986.horikyota.ntt%40gmail.com
---
 .../libpqwalreceiver/libpqwalreceiver.c       |  4 +-
 src/backend/replication/walreceiver.c         | 53 +++++++++----------
 src/backend/tcop/postgres.c                   |  5 ++
 src/include/replication/walreceiver.h         |  2 +-
 4 files changed, 32 insertions(+), 32 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index dbee2f8f0e..41cf3dc853 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -753,7 +753,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 		if (rc & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
-			ProcessWalRcvInterrupts();
+			CHECK_FOR_INTERRUPTS();
 		}
 
 		/* Consume whatever data is available from the socket */
@@ -1093,7 +1093,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 	{
 		char	   *cstrs[MaxTupleAttributeNumber];
 
-		ProcessWalRcvInterrupts();
+		CHECK_FOR_INTERRUPTS();
 
 		/* Do the allocations in temporary context. */
 		oldcontext = MemoryContextSwitchTo(rowcontext);
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index e29a6196a3..110d96a4e6 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -147,39 +147,34 @@ static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
+static void WalRcvShutdownSignalHandler(SIGNAL_ARGS);
 
-/*
- * Process any interrupts the walreceiver process may have received.
- * This should be called any time the process's latch has become set.
- *
- * Currently, only SIGTERM is of interest.  We can't just exit(1) within the
- * SIGTERM signal handler, because the signal might arrive in the middle of
- * some critical operation, like while we're holding a spinlock.  Instead, the
- * signal handler sets a flag variable as well as setting the process's latch.
- * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
- * latch has become set.  Operations that could block for a long time, such as
- * reading from a remote server, must pay attention to the latch too; see
- * libpqrcv_PQgetResult for example.
- */
 void
-ProcessWalRcvInterrupts(void)
+WalRcvShutdownSignalHandler(SIGNAL_ARGS)
 {
-	/*
-	 * Although walreceiver interrupt handling doesn't use the same scheme as
-	 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
-	 * any incoming signals on Win32, and also to make sure we process any
-	 * barrier events.
-	 */
-	CHECK_FOR_INTERRUPTS();
+	int			save_errno = errno;
 
-	if (ShutdownRequestPending)
+	/* Don't joggle the elbow of proc_exit */
+	if (!proc_exit_inprogress)
 	{
-		ereport(FATAL,
-				(errcode(ERRCODE_ADMIN_SHUTDOWN),
-				 errmsg("terminating walreceiver process due to administrator command")));
+		InterruptPending = true;
+		ProcDiePending = true;
 	}
+
+	SetLatch(MyLatch);
+
+	errno = save_errno;
+	
 }
 
+/*
+ * Is current process a wal receiver?
+ */
+bool
+IsWalReceiver(void)
+{
+	return MyBackendType == B_WAL_RECEIVER;
+}
 
 /* Main entry point for walreceiver process */
 void
@@ -277,7 +272,7 @@ WalReceiverMain(void)
 	pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
 													 * file */
 	pqsignal(SIGINT, SIG_IGN);
-	pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
+	pqsignal(SIGTERM, WalRcvShutdownSignalHandler); /* request shutdown */
 	/* SIGQUIT handler was already set up by InitPostmasterChild */
 	pqsignal(SIGALRM, SIG_IGN);
 	pqsignal(SIGPIPE, SIG_IGN);
@@ -456,7 +451,7 @@ WalReceiverMain(void)
 							 errmsg("cannot continue WAL streaming, recovery has already ended")));
 
 				/* Process any requests or signals received recently */
-				ProcessWalRcvInterrupts();
+				CHECK_FOR_INTERRUPTS();
 
 				if (ConfigReloadPending)
 				{
@@ -552,7 +547,7 @@ WalReceiverMain(void)
 				if (rc & WL_LATCH_SET)
 				{
 					ResetLatch(MyLatch);
-					ProcessWalRcvInterrupts();
+					CHECK_FOR_INTERRUPTS();
 
 					if (walrcv->force_reply)
 					{
@@ -691,7 +686,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 	{
 		ResetLatch(MyLatch);
 
-		ProcessWalRcvInterrupts();
+		CHECK_FOR_INTERRUPTS();
 
 		SpinLockAcquire(&walrcv->mutex);
 		Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 1a34bd3715..2ce24d8a9a 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -59,6 +59,7 @@
 #include "replication/logicallauncher.h"
 #include "replication/logicalworker.h"
 #include "replication/slot.h"
+#include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/bufmgr.h"
@@ -3286,6 +3287,10 @@ ProcessInterrupts(void)
 			 */
 			proc_exit(1);
 		}
+		else if (IsWalReceiver())
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+					 errmsg("terminating walreceiver process due to administrator command")));
 		else if (IsBackgroundWorker)
 			ereport(FATAL,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index f566a99ba1..916d7b84fe 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -470,8 +470,8 @@ walrcv_clear_result(WalRcvExecResult *walres)
 }
 
 /* prototypes for functions in walreceiver.c */
+extern bool IsWalReceiver(void);
 extern void WalReceiverMain(void) pg_attribute_noreturn();
-extern void ProcessWalRcvInterrupts(void);
 extern void WalRcvForceReply(void);
 
 /* prototypes for functions in walreceiverfuncs.c */
-- 
2.39.3
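
The patch above replaces walreceiver's private SIGTERM handling with the regular backend scheme. The handler only sets InterruptPending and ProcDiePending (and the latch), and the process acts on the request later, at a CHECK_FOR_INTERRUPTS() call. As a standalone illustration of that deferred-interrupt pattern, here is a minimal POSIX C sketch. It is not PostgreSQL code: interrupt_pending, die_pending, exit_in_progress and check_for_interrupts() are hypothetical stand-ins for InterruptPending, ProcDiePending, proc_exit_inprogress and CHECK_FOR_INTERRUPTS(), and the latch wakeup is left out.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <signal.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical stand-ins for InterruptPending and ProcDiePending. */
static volatile sig_atomic_t interrupt_pending = 0;
static volatile sig_atomic_t die_pending = 0;

/* Hypothetical stand-in for proc_exit_inprogress. */
static volatile sig_atomic_t exit_in_progress = 0;

/*
 * SIGTERM handler: only record the request.  Exiting here could happen
 * in the middle of a critical section (e.g. while holding a spinlock).
 */
static void
shutdown_signal_handler(int signo)
{
	int			save_errno = errno;	/* don't clobber the interrupted code's errno */

	(void) signo;
	if (!exit_in_progress)
	{
		interrupt_pending = 1;
		die_pending = 1;
	}
	/* The real handler would also call SetLatch(MyLatch) here. */
	errno = save_errno;
}

/* Install the handler; returns 0 on success, -1 on failure. */
static int
install_shutdown_handler(void)
{
	struct sigaction sa;

	memset(&sa, 0, sizeof(sa));
	sa.sa_handler = shutdown_signal_handler;
	sigemptyset(&sa.sa_mask);
	return sigaction(SIGTERM, &sa, NULL);
}

/*
 * Analogue of CHECK_FOR_INTERRUPTS(), called only at safe points.
 * Returns true when the caller should terminate (where PostgreSQL
 * would ereport(FATAL, ...)).
 */
static bool
check_for_interrupts(void)
{
	if (!interrupt_pending)
		return false;
	interrupt_pending = 0;
	if (die_pending)
	{
		die_pending = 0;
		return true;
	}
	return false;
}
```

The real interrupt machinery does more (per the removed ProcessWalRcvInterrupts() comment, CHECK_FOR_INTERRUPTS() also receives signals on Win32 and processes barrier events); the sketch only shows why the handler records the request instead of exiting.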

v4-0003-Use-libpq-be-fe-helpers.h-wrappers-more.patch (text/x-patch; charset=us-ascii)
From 648538720d982a981f4b8f7b118d59d292392fe3 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Mon, 29 Jan 2024 16:24:21 +0900
Subject: [PATCH v4 3/5] Use libpq-be-fe-helpers.h wrappers more

---
 .../libpqwalreceiver/libpqwalreceiver.c       | 151 ++++--------------
 1 file changed, 33 insertions(+), 118 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 41cf3dc853..ae48dbafb5 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -106,8 +106,6 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 };
 
 /* Prototypes for private functions */
-static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
-static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
 
 /*
@@ -216,8 +214,9 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
 	{
 		PGresult   *res;
 
-		res = libpqrcv_PQexec(conn->streamConn,
-							  ALWAYS_SECURE_SEARCH_PATH_SQL);
+		res = libpqsrv_exec(conn->streamConn,
+							ALWAYS_SECURE_SEARCH_PATH_SQL,
+							WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 		{
 			PQclear(res);
@@ -389,7 +388,9 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
 	 * Get the system identifier and timeline ID as a DataRow message from the
 	 * primary server.
 	 */
-	res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
+	res = libpqsrv_exec(conn->streamConn,
+						"IDENTIFY_SYSTEM",
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -522,7 +523,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 						 options->proto.physical.startpointTLI);
 
 	/* Start streaming. */
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd.data,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -552,7 +555,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	PGresult   *res;
 
 	/*
-	 * Send copy-end message.  As in libpqrcv_PQexec, this could theoretically
+	 * Send copy-end message.  As in libpqsrv_exec, this could theoretically
 	 * block, but the risk seems small.
 	 */
 	if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
@@ -572,7 +575,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	 * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
 	 * also possible in case we aborted the copy in mid-stream.
 	 */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqsrv_get_result(conn->streamConn,
+							  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
 	{
 		/*
@@ -587,7 +591,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 		PQclear(res);
 
 		/* the result set should be followed by CommandComplete */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	}
 	else if (PQresultStatus(res) == PGRES_COPY_OUT)
 	{
@@ -601,7 +606,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 							pchomp(PQerrorMessage(conn->streamConn)))));
 
 		/* CommandComplete should follow */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	}
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -612,7 +618,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	PQclear(res);
 
 	/* Verify that there are no more results */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqsrv_get_result(conn->streamConn,
+							  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (res != NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -637,7 +644,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 	 * Request the primary to send over the history file for given timeline.
 	 */
 	snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
-	res = libpqrcv_PQexec(conn->streamConn, cmd);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -667,107 +676,6 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 	PQclear(res);
 }
 
-/*
- * Send a query and wait for the results by using the asynchronous libpq
- * functions and socket readiness events.
- *
- * The function is modeled on libpqsrv_exec(), with the behavior difference
- * being that it calls ProcessWalRcvInterrupts().  As an optimization, it
- * skips try/catch, since all errors terminate the process.
- *
- * May return NULL, rather than an error result, on failure.
- */
-static PGresult *
-libpqrcv_PQexec(PGconn *streamConn, const char *query)
-{
-	PGresult   *lastResult = NULL;
-
-	/*
-	 * PQexec() silently discards any prior query results on the connection.
-	 * This is not required for this function as it's expected that the caller
-	 * (which is this library in all cases) will behave correctly and we don't
-	 * have to be backwards compatible with old libpq.
-	 */
-
-	/*
-	 * Submit the query.  Since we don't use non-blocking mode, this could
-	 * theoretically block.  In practice, since we don't send very long query
-	 * strings, the risk seems negligible.
-	 */
-	if (!PQsendQuery(streamConn, query))
-		return NULL;
-
-	for (;;)
-	{
-		/* Wait for, and collect, the next PGresult. */
-		PGresult   *result;
-
-		result = libpqrcv_PQgetResult(streamConn);
-		if (result == NULL)
-			break;				/* query is complete, or failure */
-
-		/*
-		 * Emulate PQexec()'s behavior of returning the last result when there
-		 * are many.  We are fine with returning just last error message.
-		 */
-		PQclear(lastResult);
-		lastResult = result;
-
-		if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
-			PQresultStatus(lastResult) == PGRES_COPY_OUT ||
-			PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
-			PQstatus(streamConn) == CONNECTION_BAD)
-			break;
-	}
-
-	return lastResult;
-}
-
-/*
- * Perform the equivalent of PQgetResult(), but watch for interrupts.
- */
-static PGresult *
-libpqrcv_PQgetResult(PGconn *streamConn)
-{
-	/*
-	 * Collect data until PQgetResult is ready to get the result without
-	 * blocking.
-	 */
-	while (PQisBusy(streamConn))
-	{
-		int			rc;
-
-		/*
-		 * We don't need to break down the sleep into smaller increments,
-		 * since we'll get interrupted by signals and can handle any
-		 * interrupts here.
-		 */
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
-							   WL_LATCH_SET,
-							   PQsocket(streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
-
-		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/* Consume whatever data is available from the socket */
-		if (PQconsumeInput(streamConn) == 0)
-		{
-			/* trouble; return NULL */
-			return NULL;
-		}
-	}
-
-	/* Now we can collect and return the next PGresult */
-	return PQgetResult(streamConn);
-}
-
 /*
  * Disconnect connection to primary, if any.
  */
@@ -828,13 +736,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
 	{
 		PGresult   *res;
 
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
 		{
 			PQclear(res);
 
 			/* Verify that there are no more results. */
-			res = libpqrcv_PQgetResult(conn->streamConn);
+			res = libpqsrv_get_result(conn->streamConn,
+									  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 			if (res != NULL)
 			{
 				PQclear(res);
@@ -985,7 +895,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 			appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
 	}
 
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd.data,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -1026,7 +938,8 @@ libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
 					 quote_identifier(slotname),
 					 failover ? "true" : "false");
 
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqsrv_exec(conn->streamConn, cmd.data,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -1139,7 +1052,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("the query interface requires a database connection")));
 
-	pgres = libpqrcv_PQexec(conn->streamConn, query);
+	pgres = libpqsrv_exec(conn->streamConn,
+						  query,
+						  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 
 	switch (PQresultStatus(pgres))
 	{
-- 
2.39.3
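
libpqrcv_PQgetResult(), removed above in favour of libpqsrv_get_result(), shows the shape of an interruptible wait: sleep on both the socket and the process latch, and on a latch wakeup reset the latch and check for interrupts before going back to the socket. A minimal POSIX sketch of that shape follows, assuming a self-pipe (wakeup_fd) as the latch stand-in and a sig_atomic_t flag in place of CHECK_FOR_INTERRUPTS(); wait_readable_or_interrupt() is a hypothetical helper, not a libpq or PostgreSQL function.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <unistd.h>

enum wait_result
{
	WAIT_DATA_READY,			/* data_fd has input (or EOF) to consume */
	WAIT_INTERRUPTED,			/* shutdown was requested while waiting */
	WAIT_ERROR					/* poll() or read() failed */
};

/*
 * Wait until data_fd is readable, but stay responsive to shutdown
 * requests.  wakeup_fd is the read end of a self-pipe that a signal
 * handler writes to (the latch stand-in); *shutdown_requested is the
 * flag that handler sets.  Hypothetical helper, not a PostgreSQL API.
 */
static enum wait_result
wait_readable_or_interrupt(int data_fd, int wakeup_fd,
						   volatile sig_atomic_t *shutdown_requested)
{
	assert(data_fd >= 0 && wakeup_fd >= 0);

	for (;;)
	{
		struct pollfd fds[2] = {
			{.fd = data_fd, .events = POLLIN},
			{.fd = wakeup_fd, .events = POLLIN},
		};

		/* No timeout: like the removed loop, rely on the wakeup fd. */
		if (poll(fds, 2, -1) < 0)
		{
			if (errno == EINTR)
				continue;		/* a handler ran; poll again */
			return WAIT_ERROR;
		}

		if (fds[1].revents & POLLIN)
		{
			char		buf[64];

			/* "ResetLatch": drain the wakeup bytes, then check the flag. */
			if (read(wakeup_fd, buf, sizeof(buf)) < 0)
				return WAIT_ERROR;
			if (*shutdown_requested)
				return WAIT_INTERRUPTED;
		}

		if (fds[0].revents & (POLLIN | POLLHUP | POLLERR))
			return WAIT_DATA_READY;
	}
}
```

In the real loop, a socket wakeup is followed by PQconsumeInput() and another PQisBusy() check; the sketch stops at the readiness decision.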

v4-0004-Use-existing-AmWalReceiverProcess-function.patch (text/x-patch; charset=us-ascii)
From 1062917551121021c6ad9d56273da5e483ea7c29 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Tue, 23 Jan 2024 11:19:07 +0200
Subject: [PATCH v4 4/5] Use existing AmWalReceiverProcess() function

---
 src/backend/replication/walreceiver.c | 9 ---------
 src/backend/tcop/postgres.c           | 2 +-
 2 files changed, 1 insertion(+), 10 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 110d96a4e6..abaf21a20c 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -167,15 +167,6 @@ WalRcvShutdownSignalHandler(SIGNAL_ARGS)
 	
 }
 
-/*
- * Is current process a wal receiver?
- */
-bool
-IsWalReceiver(void)
-{
-	return MyBackendType == B_WAL_RECEIVER;
-}
-
 /* Main entry point for walreceiver process */
 void
 WalReceiverMain(void)
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 2ce24d8a9a..5a4dc1977d 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3287,7 +3287,7 @@ ProcessInterrupts(void)
 			 */
 			proc_exit(1);
 		}
-		else if (IsWalReceiver())
+		else if (AmWalReceiverProcess())
 			ereport(FATAL,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
 					 errmsg("terminating walreceiver process due to administrator command")));
-- 
2.39.3

v4-0005-Use-die-instead-of-WalRcvShutdownSignalHandler.patch (text/x-patch; charset=us-ascii)
From 68b3bbf30e33b9f79e605899b6540e619329a119 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Date: Mon, 29 Jan 2024 16:24:41 +0900
Subject: [PATCH v4 5/5] Use die() instead of WalRcvShutdownSignalHandler()

---
 src/backend/replication/walreceiver.c | 21 ++-------------------
 1 file changed, 2 insertions(+), 19 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index abaf21a20c..1e03d55062 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -73,6 +73,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/procsignal.h"
+#include "tcop/tcopprot.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
@@ -147,25 +148,7 @@ static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
-static void WalRcvShutdownSignalHandler(SIGNAL_ARGS);
 
-void
-WalRcvShutdownSignalHandler(SIGNAL_ARGS)
-{
-	int			save_errno = errno;
-
-	/* Don't joggle the elbow of proc_exit */
-	if (!proc_exit_inprogress)
-	{
-		InterruptPending = true;
-		ProcDiePending = true;
-	}
-
-	SetLatch(MyLatch);
-
-	errno = save_errno;
-	
-}
 
 /* Main entry point for walreceiver process */
 void
@@ -263,7 +246,7 @@ WalReceiverMain(void)
 	pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
 													 * file */
 	pqsignal(SIGINT, SIG_IGN);
-	pqsignal(SIGTERM, WalRcvShutdownSignalHandler); /* request shutdown */
+	pqsignal(SIGTERM, die); /* request shutdown */
 	/* SIGQUIT handler was already set up by InitPostmasterChild */
 	pqsignal(SIGALRM, SIG_IGN);
 	pqsignal(SIGPIPE, SIG_IGN);
-- 
2.39.3

#17 Robert Haas
robertmhaas@gmail.com
In reply to: Kyotaro Horiguchi (#16)
Re: Network failure may prevent promotion

On Mon, Jan 29, 2024 at 2:32 AM Kyotaro Horiguchi
<horikyota.ntt@gmail.com> wrote:

[ new patch set ]

Hi,

I think it would be helpful to make it more clear exactly what's going
on here. It looks like 0001 is intended to revert
21ef4d4d897563adb2f7920ad53b734950f1e0a4, which was itself a revert of
728f86fec65537eade8d9e751961782ddb527934, and then I guess the
remaining patches are to fix up issues created by that commit, but the
commit messages aren't meaningful so it's hard to understand what is
being fixed.

I think it would also be useful to clarify whether this is imagined to
be for master only, or something to be back-patched. In addition to
mentioning that here, it would be good to add that information to the
target version field of https://commitfest.postgresql.org/48/4748/

--
Robert Haas
EDB: http://www.enterprisedb.com

#18 Yura Sokolov
y.sokolov@postgrespro.ru
In reply to: Robert Haas (#17)
3 attachment(s)
Re: Network failure may prevent promotion

I've just rebased the patches and merged the last two fix commits (0003 and 0004)
into 0002.

--
regards
Yura Sokolov aka funny-falcon

Attachments:

v5-0001-Reapply-libpqwalreceiver-Convert-to-libpq-be-fe-h.patch (text/x-patch; charset=UTF-8)
From f8923c9b8e2f470dd3caaa1e71fb3b931389148b Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Fri, 21 Mar 2025 16:03:30 +0300
Subject: [PATCH v5 1/3] Reapply "libpqwalreceiver: Convert to
 libpq-be-fe-helpers.h"

This reverts commit 21ef4d4d897563adb2f7920ad53b734950f1e0a4.
---
 .../libpqwalreceiver/libpqwalreceiver.c       | 55 +++----------------
 1 file changed, 8 insertions(+), 47 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index c650935ef5d..5755ab2f072 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -25,6 +25,7 @@
 #include "common/connect.h"
 #include "funcapi.h"
 #include "libpq-fe.h"
+#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -145,7 +146,6 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
 				 bool must_use_password, const char *appname, char **err)
 {
 	WalReceiverConn *conn;
-	PostgresPollingStatusType status;
 	const char *keys[6];
 	const char *vals[6];
 	int			i = 0;
@@ -211,56 +211,17 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
 	Assert(i < lengthof(keys));
 
 	conn = palloc0(sizeof(WalReceiverConn));
-	conn->streamConn = PQconnectStartParams(keys, vals,
-											 /* expand_dbname = */ true);
-	if (PQstatus(conn->streamConn) == CONNECTION_BAD)
-		goto bad_connection_errmsg;
-
-	/*
-	 * Poll connection until we have OK or FAILED status.
-	 *
-	 * Per spec for PQconnectPoll, first wait till socket is write-ready.
-	 */
-	status = PGRES_POLLING_WRITING;
-	do
-	{
-		int			io_flag;
-		int			rc;
-
-		if (status == PGRES_POLLING_READING)
-			io_flag = WL_SOCKET_READABLE;
-#ifdef WIN32
-		/* Windows needs a different test while waiting for connection-made */
-		else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
-			io_flag = WL_SOCKET_CONNECTED;
-#endif
-		else
-			io_flag = WL_SOCKET_WRITEABLE;
-
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
-							   PQsocket(conn->streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
-
-		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			ProcessWalRcvInterrupts();
-		}
-
-		/* If socket is ready, advance the libpq state machine */
-		if (rc & io_flag)
-			status = PQconnectPoll(conn->streamConn);
-	} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
+	conn->streamConn =
+		libpqsrv_connect_params(keys, vals,
+								 /* expand_dbname = */ true,
+								WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
 
 	if (PQstatus(conn->streamConn) != CONNECTION_OK)
 		goto bad_connection_errmsg;
 
 	if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
 	{
-		PQfinish(conn->streamConn);
+		libpqsrv_disconnect(conn->streamConn);
 		pfree(conn);
 
 		ereport(ERROR,
@@ -300,7 +261,7 @@ bad_connection_errmsg:
 
 	/* error path, error already set */
 bad_connection:
-	PQfinish(conn->streamConn);
+	libpqsrv_disconnect(conn->streamConn);
 	pfree(conn);
 	return NULL;
 }
@@ -880,7 +841,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 static void
 libpqrcv_disconnect(WalReceiverConn *conn)
 {
-	PQfinish(conn->streamConn);
+	libpqsrv_disconnect(conn->streamConn);
 	PQfreemem(conn->recvBuf);
 	pfree(conn);
 }
-- 
2.43.0
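
The connect loop removed above is where the original report's delay shows up: while the TCP handshake is pending, the only way out before the connection timeout is a latch wakeup followed by an interrupt check that actually terminates the process. With libpqsrv_connect_params() that check is CHECK_FOR_INTERRUPTS(), which only ends walreceiver once its SIGTERM handler sets ProcDiePending, hence the switch to die() in v5-0002. A hedged POSIX sketch of the same idea follows, using a plain non-blocking connect() instead of PQconnectPoll(); connect_interruptible(), wakeup_fd and shutdown_requested are hypothetical names, and a self-pipe stands in for the latch.

```c
#define _POSIX_C_SOURCE 200809L
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <signal.h>
#include <sys/socket.h>
#include <unistd.h>

enum connect_result
{
	CONNECT_OK,
	CONNECT_FAILED,
	CONNECT_INTERRUPTED
};

/*
 * Non-blocking TCP connect that can be abandoned on a shutdown request
 * instead of waiting out the kernel's connect timeout.  wakeup_fd is the
 * read end of a self-pipe (latch stand-in); *shutdown_requested is set
 * by the signal handler that writes to it.  Hypothetical helper.
 * On CONNECT_OK, *sock_out receives the connected socket.
 */
static enum connect_result
connect_interruptible(const struct sockaddr_in *addr, int wakeup_fd,
					  volatile sig_atomic_t *shutdown_requested,
					  int *sock_out)
{
	int			sock;
	int			flags;

	assert(addr != NULL && sock_out != NULL);

	sock = socket(AF_INET, SOCK_STREAM, 0);
	if (sock < 0)
		return CONNECT_FAILED;
	flags = fcntl(sock, F_GETFL, 0);
	if (flags < 0 || fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0)
		goto fail;

	if (connect(sock, (const struct sockaddr *) addr, sizeof(*addr)) == 0)
		goto ok;				/* connected immediately */
	if (errno != EINPROGRESS)
		goto fail;

	for (;;)
	{
		/* As in the removed loop, write-readiness means connect() finished. */
		struct pollfd fds[2] = {
			{.fd = sock, .events = POLLOUT},
			{.fd = wakeup_fd, .events = POLLIN},
		};

		if (poll(fds, 2, -1) < 0)
		{
			if (errno == EINTR)
				continue;
			goto fail;
		}

		if (fds[1].revents & POLLIN)
		{
			char		buf[64];

			/* "ResetLatch", then act on a pending shutdown request. */
			if (read(wakeup_fd, buf, sizeof(buf)) < 0)
				goto fail;
			if (*shutdown_requested)
			{
				close(sock);
				return CONNECT_INTERRUPTED;
			}
		}

		if (fds[0].revents & (POLLOUT | POLLERR | POLLHUP))
		{
			int			err = 0;
			socklen_t	len = sizeof(err);

			/* Fetch the outcome of the asynchronous connect(). */
			if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &len) < 0 || err != 0)
				goto fail;
			goto ok;
		}
	}

ok:
	*sock_out = sock;
	return CONNECT_OK;

fail:
	close(sock);
	return CONNECT_FAILED;
}
```

The sketch is POSIX-only; the removed loop needed a different readiness test on Windows while the connection was being made (WL_SOCKET_CONNECTED).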

v5-0002-Apply-walrcv_shutdown_deblocking_v2-2.patch.patch (text/x-patch; charset=UTF-8)
From e0e7b14b9c4f3987224b2d0e8d3cb4be6e02003d Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Fri, 21 Mar 2025 16:13:38 +0300
Subject: [PATCH v5 2/3] Apply walrcv_shutdown_deblocking_v2-2.patch

From https://www.postgresql.org/message-id/20240123.172410.1596193222420636986.horikyota.ntt%40gmail.com
Plus a couple of fixes from review:
- use of AmWalReceiverProcess
- use of die
---
 .../libpqwalreceiver/libpqwalreceiver.c       |  4 +-
 src/backend/replication/walreceiver.c         | 41 +++----------------
 src/backend/tcop/postgres.c                   |  5 +++
 3 files changed, 12 insertions(+), 38 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 5755ab2f072..d51624ef3da 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -820,7 +820,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 		if (rc & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
-			ProcessWalRcvInterrupts();
+			CHECK_FOR_INTERRUPTS();
 		}
 
 		/* Consume whatever data is available from the socket */
@@ -1172,7 +1172,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
 	{
 		char	   *cstrs[MaxTupleAttributeNumber];
 
-		ProcessWalRcvInterrupts();
+		CHECK_FOR_INTERRUPTS();
 
 		/* Do the allocations in temporary context. */
 		oldcontext = MemoryContextSwitchTo(rowcontext);
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 2e5dd6deb2c..b51a6d06b21 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -71,6 +71,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/procsignal.h"
+#include "tcop/tcopprot.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
@@ -145,38 +146,6 @@ static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
 
-/*
- * Process any interrupts the walreceiver process may have received.
- * This should be called any time the process's latch has become set.
- *
- * Currently, only SIGTERM is of interest.  We can't just exit(1) within the
- * SIGTERM signal handler, because the signal might arrive in the middle of
- * some critical operation, like while we're holding a spinlock.  Instead, the
- * signal handler sets a flag variable as well as setting the process's latch.
- * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
- * latch has become set.  Operations that could block for a long time, such as
- * reading from a remote server, must pay attention to the latch too; see
- * libpqrcv_PQgetResult for example.
- */
-void
-ProcessWalRcvInterrupts(void)
-{
-	/*
-	 * Although walreceiver interrupt handling doesn't use the same scheme as
-	 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
-	 * any incoming signals on Win32, and also to make sure we process any
-	 * barrier events.
-	 */
-	CHECK_FOR_INTERRUPTS();
-
-	if (ShutdownRequestPending)
-	{
-		ereport(FATAL,
-				(errcode(ERRCODE_ADMIN_SHUTDOWN),
-				 errmsg("terminating walreceiver process due to administrator command")));
-	}
-}
-
 
 /* Main entry point for walreceiver process */
 void
@@ -280,7 +249,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 	pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
 													 * file */
 	pqsignal(SIGINT, SIG_IGN);
-	pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
+	pqsignal(SIGTERM, die); /* request shutdown */
 	/* SIGQUIT handler was already set up by InitPostmasterChild */
 	pqsignal(SIGALRM, SIG_IGN);
 	pqsignal(SIGPIPE, SIG_IGN);
@@ -459,7 +428,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 							 errmsg("cannot continue WAL streaming, recovery has already ended")));
 
 				/* Process any requests or signals received recently */
-				ProcessWalRcvInterrupts();
+				CHECK_FOR_INTERRUPTS();
 
 				if (ConfigReloadPending)
 				{
@@ -555,7 +524,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 				if (rc & WL_LATCH_SET)
 				{
 					ResetLatch(MyLatch);
-					ProcessWalRcvInterrupts();
+					CHECK_FOR_INTERRUPTS();
 
 					if (walrcv->force_reply)
 					{
@@ -704,7 +673,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 	{
 		ResetLatch(MyLatch);
 
-		ProcessWalRcvInterrupts();
+		CHECK_FOR_INTERRUPTS();
 
 		SpinLockAcquire(&walrcv->mutex);
 		Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 0554a4ae3c7..c7d3b25d3f2 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -58,6 +58,7 @@
 #include "replication/logicallauncher.h"
 #include "replication/logicalworker.h"
 #include "replication/slot.h"
+#include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/bufmgr.h"
@@ -3311,6 +3312,10 @@ ProcessInterrupts(void)
 			 */
 			proc_exit(1);
 		}
+		else if (AmWalReceiverProcess())
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+					 errmsg("terminating walreceiver process due to administrator command")));
 		else if (AmBackgroundWorkerProcess())
 			ereport(FATAL,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
-- 
2.43.0

v5-0003-Use-libpq-be-fe-helpers.h-wrappers-more.patch (text/x-patch; charset=UTF-8)
From 51e7c0cf4d3312df227f57b5bf61be7ce1d5f155 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Fri, 21 Mar 2025 16:15:05 +0300
Subject: [PATCH v5 3/3] Use libpq-be-fe-helpers.h wrappers more

---
 .../libpqwalreceiver/libpqwalreceiver.c       | 151 ++++--------------
 1 file changed, 33 insertions(+), 118 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index d51624ef3da..be6fbe41705 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -111,8 +111,6 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 };
 
 /* Prototypes for private functions */
-static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
-static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
 
 /*
@@ -239,8 +237,9 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
 	{
 		PGresult   *res;
 
-		res = libpqrcv_PQexec(conn->streamConn,
-							  ALWAYS_SECURE_SEARCH_PATH_SQL);
+		res = libpqsrv_exec(conn->streamConn,
+							ALWAYS_SECURE_SEARCH_PATH_SQL,
+							WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 		{
 			PQclear(res);
@@ -412,7 +411,9 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
 	 * Get the system identifier and timeline ID as a DataRow message from the
 	 * primary server.
 	 */
-	res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
+	res = libpqsrv_exec(conn->streamConn,
+						"IDENTIFY_SYSTEM",
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -589,7 +590,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 						 options->proto.physical.startpointTLI);
 
 	/* Start streaming. */
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd.data,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -619,7 +622,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	PGresult   *res;
 
 	/*
-	 * Send copy-end message.  As in libpqrcv_PQexec, this could theoretically
+	 * Send copy-end message.  As in libpqsrv_exec, this could theoretically
 	 * block, but the risk seems small.
 	 */
 	if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
@@ -639,7 +642,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	 * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
 	 * also possible in case we aborted the copy in mid-stream.
 	 */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqsrv_get_result(conn->streamConn,
+							  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
 	{
 		/*
@@ -654,7 +658,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 		PQclear(res);
 
 		/* the result set should be followed by CommandComplete */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	}
 	else if (PQresultStatus(res) == PGRES_COPY_OUT)
 	{
@@ -668,7 +673,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 							pchomp(PQerrorMessage(conn->streamConn)))));
 
 		/* CommandComplete should follow */
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	}
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -679,7 +685,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
 	PQclear(res);
 
 	/* Verify that there are no more results */
-	res = libpqrcv_PQgetResult(conn->streamConn);
+	res = libpqsrv_get_result(conn->streamConn,
+							  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (res != NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -704,7 +711,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 	 * Request the primary to send over the history file for given timeline.
 	 */
 	snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
-	res = libpqrcv_PQexec(conn->streamConn, cmd);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		PQclear(res);
@@ -734,107 +743,6 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 	PQclear(res);
 }
 
-/*
- * Send a query and wait for the results by using the asynchronous libpq
- * functions and socket readiness events.
- *
- * The function is modeled on libpqsrv_exec(), with the behavior difference
- * being that it calls ProcessWalRcvInterrupts().  As an optimization, it
- * skips try/catch, since all errors terminate the process.
- *
- * May return NULL, rather than an error result, on failure.
- */
-static PGresult *
-libpqrcv_PQexec(PGconn *streamConn, const char *query)
-{
-	PGresult   *lastResult = NULL;
-
-	/*
-	 * PQexec() silently discards any prior query results on the connection.
-	 * This is not required for this function as it's expected that the caller
-	 * (which is this library in all cases) will behave correctly and we don't
-	 * have to be backwards compatible with old libpq.
-	 */
-
-	/*
-	 * Submit the query.  Since we don't use non-blocking mode, this could
-	 * theoretically block.  In practice, since we don't send very long query
-	 * strings, the risk seems negligible.
-	 */
-	if (!PQsendQuery(streamConn, query))
-		return NULL;
-
-	for (;;)
-	{
-		/* Wait for, and collect, the next PGresult. */
-		PGresult   *result;
-
-		result = libpqrcv_PQgetResult(streamConn);
-		if (result == NULL)
-			break;				/* query is complete, or failure */
-
-		/*
-		 * Emulate PQexec()'s behavior of returning the last result when there
-		 * are many.  We are fine with returning just last error message.
-		 */
-		PQclear(lastResult);
-		lastResult = result;
-
-		if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
-			PQresultStatus(lastResult) == PGRES_COPY_OUT ||
-			PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
-			PQstatus(streamConn) == CONNECTION_BAD)
-			break;
-	}
-
-	return lastResult;
-}
-
-/*
- * Perform the equivalent of PQgetResult(), but watch for interrupts.
- */
-static PGresult *
-libpqrcv_PQgetResult(PGconn *streamConn)
-{
-	/*
-	 * Collect data until PQgetResult is ready to get the result without
-	 * blocking.
-	 */
-	while (PQisBusy(streamConn))
-	{
-		int			rc;
-
-		/*
-		 * We don't need to break down the sleep into smaller increments,
-		 * since we'll get interrupted by signals and can handle any
-		 * interrupts here.
-		 */
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
-							   WL_LATCH_SET,
-							   PQsocket(streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
-
-		/* Interrupted? */
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/* Consume whatever data is available from the socket */
-		if (PQconsumeInput(streamConn) == 0)
-		{
-			/* trouble; return NULL */
-			return NULL;
-		}
-	}
-
-	/* Now we can collect and return the next PGresult */
-	return PQgetResult(streamConn);
-}
-
 /*
  * Disconnect connection to primary, if any.
  */
@@ -895,13 +803,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
 	{
 		PGresult   *res;
 
-		res = libpqrcv_PQgetResult(conn->streamConn);
+		res = libpqsrv_get_result(conn->streamConn,
+								  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
 		{
 			PQclear(res);
 
 			/* Verify that there are no more results. */
-			res = libpqrcv_PQgetResult(conn->streamConn);
+			res = libpqsrv_get_result(conn->streamConn,
+									  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 			if (res != NULL)
 			{
 				PQclear(res);
@@ -1052,7 +962,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 			appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
 	}
 
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqsrv_exec(conn->streamConn,
+						cmd.data,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -1105,7 +1017,8 @@ libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
 
 	appendStringInfoString(&cmd, " );");
 
-	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	res = libpqsrv_exec(conn->streamConn, cmd.data,
+						WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 	pfree(cmd.data);
 
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -1218,7 +1131,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("the query interface requires a database connection")));
 
-	pgres = libpqrcv_PQexec(conn->streamConn, query);
+	pgres = libpqsrv_exec(conn->streamConn,
+						  query,
+						  WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 
 	switch (PQresultStatus(pgres))
 	{
-- 
2.43.0
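The wait loop that this patch deletes from libpqwalreceiver.c (libpqrcv_PQgetResult(), replaced by calls to libpqsrv_get_result()) follows one pattern. It sleeps until either the socket is readable or the latch is set. On a latch wakeup, it checks for interrupts before touching the connection again. The sketch below is a minimal model of that pattern outside PostgreSQL, assuming only POSIX poll() and a self-pipe. The pipe stands in for MyLatch and interrupt_pending stands in for the interrupt flags. Every name in it is hypothetical and is not a PostgreSQL API. It only illustrates why a SIGTERM can end the wait early instead of leaving the process blocked on the socket.

```c
/* Sketch only: hypothetical stand-ins, not PostgreSQL APIs. */
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <unistd.h>

/* Hypothetical stand-in for the interrupt flags set by a signal handler. */
static volatile sig_atomic_t interrupt_pending = 0;

/* Hypothetical stand-in for MyLatch: the handler writes here to wake poll(). */
static int	latch_pipe[2] = {-1, -1};

static void
handle_sigterm(int signo)
{
	int			save_errno = errno;

	(void) signo;
	interrupt_pending = 1;
	/* "SetLatch": non-blocking write; a full pipe already counts as set */
	(void) write(latch_pipe[1], "x", 1);
	errno = save_errno;
}

/* Create the non-blocking self-pipe and install the handler; 0 on success. */
static int
init_interrupt_handling(void)
{
	struct sigaction sa;

	if (pipe(latch_pipe) != 0)
		return -1;
	for (int i = 0; i < 2; i++)
	{
		int			flags = fcntl(latch_pipe[i], F_GETFL);

		if (flags < 0 || fcntl(latch_pipe[i], F_SETFL, flags | O_NONBLOCK) < 0)
			return -1;
	}
	sa.sa_handler = handle_sigterm;
	sigemptyset(&sa.sa_mask);
	sa.sa_flags = 0;
	return sigaction(SIGTERM, &sa, NULL);
}

typedef enum
{
	WAIT_SOCKET_READY,			/* data, EOF or error is pending on sock */
	WAIT_INTERRUPTED,			/* an interrupt arrived while waiting */
	WAIT_FAILED					/* poll() itself failed */
} WaitOutcome;

/* Wait for sock to become readable, but give interrupts priority. */
static WaitOutcome
wait_socket_or_interrupt(int sock)
{
	for (;;)
	{
		struct pollfd fds[2] = {
			{.fd = latch_pipe[0], .events = POLLIN},
			{.fd = sock, .events = POLLIN},
		};

		if (poll(fds, 2, -1) < 0)
		{
			if (errno == EINTR)
				continue;		/* the handler has already written to the pipe */
			return WAIT_FAILED;
		}

		if (fds[0].revents & POLLIN)
		{
			char		buf[64];

			/* "ResetLatch": drain every pending wakeup byte */
			while (read(latch_pipe[0], buf, sizeof(buf)) > 0)
				;
			/* "CHECK_FOR_INTERRUPTS": the real code would ereport() here */
			if (interrupt_pending)
				return WAIT_INTERRUPTED;
		}

		if (fds[1].revents & (POLLIN | POLLHUP | POLLERR))
			return WAIT_SOCKET_READY;
	}
}
```

In the removed libpq loop, a socket wakeup is followed by PQconsumeInput() and another PQisBusy() check. This sketch stops at the readiness signal, and it returns a status where the backend would raise an error.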

#19Heikki Linnakangas
hlinnaka@iki.fi
In reply to: Yura Sokolov (#18)
Re: Network failure may prevent promotion

On 21/03/2025 15:24, Yura Sokolov wrote:

> I've just rebased the patches and merged the last two fix-up commits (0003
> and 0004) into 0002.

Thanks for reviving this! I looked over this again and committed.

On May 14, 2024 at 14:16 Robert Haas <robertmhaas@gmail.com> wrote:

> I think it would be helpful to make it more clear exactly what's going
> on here. It looks like 0001 is intended to revert
> 21ef4d4d897563adb2f7920ad53b734950f1e0a4, which was itself a revert of
> 728f86fec65537eade8d9e751961782ddb527934, and then I guess the
> remaining patches are to fix up issues created by that commit, but the
> commit messages aren't meaningful so it's hard to understand what is
> being fixed.

Yes, that's exactly what's going on here :-). I squashed the patches
together and explained that in the commit message.

> I think it would also be useful to clarify whether this is imagined to
> be for master only, or something to be back-patched. In addition to
> mentioning that here, it would be good to add that information to the
> target version field of https://commitfest.postgresql.org/48/4748/

A lot of time has passed, and v17 has been released, since you wrote
this, but for the sake of completeness: this is for v18 only. It's not a
bug fix. There was a bug in the original commit 728f86fec6, which was
resolved by reverting it in v16 and v17. This commit brings it back in
v18, hopefully bug-free this time.

--
Heikki Linnakangas
Neon (https://neon.tech)