From dbdc0bc8f448cc6a560fd615c6c37ed8059e4343 Mon Sep 17 00:00:00 2001 From: alterego655 <824662526@qq.com> Date: Thu, 22 Jan 2026 11:51:26 +0800 Subject: [PATCH v6 2/2] Add WALRCV_CONNECTING state to walreceiver Previously, walreceiver set its state to WALRCV_STREAMING immediately at startup, before actually establishing a replication connection. This was misleading for monitoring, as pg_stat_wal_receiver would show "streaming" even while the connection was still being established. Introduce WALRCV_CONNECTING state to accurately reflect the period between walreceiver startup and successful START_REPLICATION. The transition to WALRCV_STREAMING now occurs only after walrcv_startstreaming() returns successfully. A walreceiver that restarts streaming after waiting for a new start position likewise goes back to WALRCV_CONNECTING until START_REPLICATION succeeds again. Update pg_stat_wal_receiver documentation to describe the new state as well. --- doc/src/sgml/monitoring.sgml | 6 ++++++ src/backend/replication/walreceiver.c | 16 +++++++++++++--- src/backend/replication/walreceiverfuncs.c | 3 ++- src/include/replication/walreceiver.h | 2 ++ 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index e36d330fe4f..ad6b819b6e1 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1754,6 +1754,12 @@ description | Waiting for a newly initialized WAL file to reach durable storage but is not yet initialized. + + + connecting: WAL receiver is connecting to the + primary, replication has not yet started. + + streaming: WAL receiver is streaming WAL data. 
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index a41453530a1..92e54e52e95 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -205,6 +205,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) /* The usual case */ break; + case WALRCV_CONNECTING: case WALRCV_WAITING: case WALRCV_STREAMING: case WALRCV_RESTARTING: @@ -215,7 +216,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) } /* Advertise our PID so that the startup process can kill us */ walrcv->pid = MyProcPid; - walrcv->walRcvState = WALRCV_STREAMING; + walrcv->walRcvState = WALRCV_CONNECTING; /* Fetch information required to start streaming */ walrcv->ready_to_display = false; @@ -395,6 +396,12 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) LSN_FORMAT_ARGS(startpoint), startpointTLI)); first_stream = false; + /* Connection established, switch to streaming state */ + SpinLockAcquire(&walrcv->mutex); + Assert(walrcv->walRcvState == WALRCV_CONNECTING); + walrcv->walRcvState = WALRCV_STREAMING; + SpinLockRelease(&walrcv->mutex); + /* Initialize LogstreamResult and buffers for processing messages */ LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL); initStringInfo(&reply_message); @@ -650,7 +657,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) SpinLockAcquire(&walrcv->mutex); state = walrcv->walRcvState; - if (state != WALRCV_STREAMING) + if (state != WALRCV_STREAMING && state != WALRCV_CONNECTING) { SpinLockRelease(&walrcv->mutex); if (state == WALRCV_STOPPING) @@ -689,7 +696,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) */ *startpoint = walrcv->receiveStart; *startpointTLI = walrcv->receiveStartTLI; - walrcv->walRcvState = WALRCV_STREAMING; + walrcv->walRcvState = WALRCV_CONNECTING; SpinLockRelease(&walrcv->mutex); break; } @@ -792,6 +799,7 @@ WalRcvDie(int code, 
Datum arg) /* Mark ourselves inactive in shared memory */ SpinLockAcquire(&walrcv->mutex); Assert(walrcv->walRcvState == WALRCV_STREAMING || + walrcv->walRcvState == WALRCV_CONNECTING || walrcv->walRcvState == WALRCV_RESTARTING || walrcv->walRcvState == WALRCV_STARTING || walrcv->walRcvState == WALRCV_WAITING || @@ -1391,6 +1399,8 @@ WalRcvGetStateString(WalRcvState state) return "stopped"; case WALRCV_STARTING: return "starting"; + case WALRCV_CONNECTING: + return "connecting"; case WALRCV_STREAMING: return "streaming"; case WALRCV_WAITING: diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index da8794cba7c..42e3e170bc0 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -179,7 +179,7 @@ WalRcvStreaming(void) } if (state == WALRCV_STREAMING || state == WALRCV_STARTING || - state == WALRCV_RESTARTING) + state == WALRCV_CONNECTING || state == WALRCV_RESTARTING) return true; else return false; @@ -211,6 +211,7 @@ ShutdownWalRcv(void) stopped = true; break; + case WALRCV_CONNECTING: case WALRCV_STREAMING: case WALRCV_WAITING: case WALRCV_RESTARTING: diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index f3ad00fb6f3..872deb00633 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -47,6 +47,8 @@ typedef enum WALRCV_STOPPED, /* stopped and mustn't start up again */ WALRCV_STARTING, /* launched, but the process hasn't * initialized yet */ + WALRCV_CONNECTING, /* connecting to primary, replication not yet + * started */ WALRCV_STREAMING, /* walreceiver is streaming */ WALRCV_WAITING, /* stopped streaming, waiting for orders */ WALRCV_RESTARTING, /* asked to restart streaming */ -- 2.51.0