From dbdc0bc8f448cc6a560fd615c6c37ed8059e4343 Mon Sep 17 00:00:00 2001
From: alterego655 <824662526@qq.com>
Date: Thu, 22 Jan 2026 11:51:26 +0800
Subject: [PATCH v6 2/2] Add WALRCV_CONNECTING state to walreceiver
Previously, walreceiver set its state to WALRCV_STREAMING immediately
at startup, before actually establishing a replication connection. This
was misleading for monitoring, as pg_stat_wal_receiver would show
"streaming" even while the connection was still being established.
Introduce WALRCV_CONNECTING state to accurately reflect the period
between walreceiver startup and successful START_REPLICATION. The
transition to WALRCV_STREAMING now occurs only after
walrcv_startstreaming() returns successfully.
Update pg_stat_wal_receiver documentation to describe the new state as well.
---
doc/src/sgml/monitoring.sgml | 6 ++++++
src/backend/replication/walreceiver.c | 16 +++++++++++++---
src/backend/replication/walreceiverfuncs.c | 3 ++-
src/include/replication/walreceiver.h | 2 ++
4 files changed, 23 insertions(+), 4 deletions(-)
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index e36d330fe4f..ad6b819b6e1 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1754,6 +1754,12 @@ description | Waiting for a newly initialized WAL file to reach durable storage
but is not yet initialized.
+
+
+ connecting: WAL receiver is connecting to the
+ primary, replication has not yet started.
+
+
streaming: WAL receiver is streaming WAL data.
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index a41453530a1..92e54e52e95 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -205,6 +205,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
/* The usual case */
break;
+ case WALRCV_CONNECTING:
case WALRCV_WAITING:
case WALRCV_STREAMING:
case WALRCV_RESTARTING:
@@ -215,7 +216,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
}
/* Advertise our PID so that the startup process can kill us */
walrcv->pid = MyProcPid;
- walrcv->walRcvState = WALRCV_STREAMING;
+ walrcv->walRcvState = WALRCV_CONNECTING;
/* Fetch information required to start streaming */
walrcv->ready_to_display = false;
@@ -395,6 +396,12 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
LSN_FORMAT_ARGS(startpoint), startpointTLI));
first_stream = false;
+ /* Connection established, switch to streaming state */
+ SpinLockAcquire(&walrcv->mutex);
+ Assert(walrcv->walRcvState == WALRCV_CONNECTING);
+ walrcv->walRcvState = WALRCV_STREAMING;
+ SpinLockRelease(&walrcv->mutex);
+
/* Initialize LogstreamResult and buffers for processing messages */
LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
initStringInfo(&reply_message);
@@ -650,7 +657,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
SpinLockAcquire(&walrcv->mutex);
state = walrcv->walRcvState;
- if (state != WALRCV_STREAMING)
+ if (state != WALRCV_STREAMING && state != WALRCV_CONNECTING)
{
SpinLockRelease(&walrcv->mutex);
if (state == WALRCV_STOPPING)
@@ -689,7 +696,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
*/
*startpoint = walrcv->receiveStart;
*startpointTLI = walrcv->receiveStartTLI;
- walrcv->walRcvState = WALRCV_STREAMING;
+ walrcv->walRcvState = WALRCV_CONNECTING;
SpinLockRelease(&walrcv->mutex);
break;
}
@@ -792,6 +799,7 @@ WalRcvDie(int code, Datum arg)
/* Mark ourselves inactive in shared memory */
SpinLockAcquire(&walrcv->mutex);
Assert(walrcv->walRcvState == WALRCV_STREAMING ||
+ walrcv->walRcvState == WALRCV_CONNECTING ||
walrcv->walRcvState == WALRCV_RESTARTING ||
walrcv->walRcvState == WALRCV_STARTING ||
walrcv->walRcvState == WALRCV_WAITING ||
@@ -1391,6 +1399,8 @@ WalRcvGetStateString(WalRcvState state)
return "stopped";
case WALRCV_STARTING:
return "starting";
+ case WALRCV_CONNECTING:
+ return "connecting";
case WALRCV_STREAMING:
return "streaming";
case WALRCV_WAITING:
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index da8794cba7c..42e3e170bc0 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -179,7 +179,7 @@ WalRcvStreaming(void)
}
if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
- state == WALRCV_RESTARTING)
+ state == WALRCV_CONNECTING || state == WALRCV_RESTARTING)
return true;
else
return false;
@@ -211,6 +211,7 @@ ShutdownWalRcv(void)
stopped = true;
break;
+ case WALRCV_CONNECTING:
case WALRCV_STREAMING:
case WALRCV_WAITING:
case WALRCV_RESTARTING:
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index f3ad00fb6f3..872deb00633 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -47,6 +47,8 @@ typedef enum
WALRCV_STOPPED, /* stopped and mustn't start up again */
WALRCV_STARTING, /* launched, but the process hasn't
* initialized yet */
+ WALRCV_CONNECTING, /* connecting to primary, replication not yet
+ * started */
WALRCV_STREAMING, /* walreceiver is streaming */
WALRCV_WAITING, /* stopped streaming, waiting for orders */
WALRCV_RESTARTING, /* asked to restart streaming */
--
2.51.0