More protocol.h replacements this time into walsender.c
Greetings,
Patch attached
Dave Cramer
Attachments:
0001-replace-protocol-constants-with-named-constants-from.patchapplication/octet-stream; name=0001-replace-protocol-constants-with-named-constants-from.patchDownload
From a8f7e338476564a8e605ca9273edb75c2a32cb67 Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Tue, 22 Jul 2025 17:48:39 -0400
Subject: [PATCH] replace protocol constants with named constants from
protocol.h
---
src/backend/replication/walsender.c | 25 +++++++++++++------------
1 file changed, 13 insertions(+), 12 deletions(-)
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 28b8591efa5..26502477f70 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -65,6 +65,7 @@
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
+#include "libpq/protocol.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
@@ -733,13 +734,13 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset,
switch (mtype)
{
- case 'd': /* CopyData */
+ case PqMsg_CopyData:
maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
break;
- case 'c': /* CopyDone */
- case 'f': /* CopyFail */
- case 'H': /* Flush */
- case 'S': /* Sync */
+ case PqMsg_CopyDone:
+ case PqMsg_CopyFail:
+ case PqMsg_Flush:
+ case PqMsg_Sync:
maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
break;
default:
@@ -761,19 +762,19 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset,
/* Process the message */
switch (mtype)
{
- case 'd': /* CopyData */
+ case PqMsg_CopyData:
AppendIncrementalManifestData(ib, buf->data, buf->len);
return true;
- case 'c': /* CopyDone */
+ case PqMsg_CopyDone:
return false;
- case 'H': /* Sync */
- case 'S': /* Flush */
+ case PqMsg_Sync:
+ case PqMsg_Flush:
/* Ignore these while in CopyOut mode as we do elsewhere. */
return true;
- case 'f':
+ case PqMsg_CopyFail:
ereport(ERROR,
(errcode(ERRCODE_QUERY_CANCELED),
errmsg("COPY from stdin failed: %s",
@@ -2303,7 +2304,7 @@ ProcessRepliesIfAny(void)
case PqMsg_CopyDone:
if (!streamingDoneSending)
{
- pq_putmessage_noblock('c', NULL, 0);
+ pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0);
streamingDoneSending = true;
}
@@ -3246,7 +3247,7 @@ XLogSendPhysical(void)
wal_segment_close(xlogreader);
/* Send CopyDone */
- pq_putmessage_noblock('c', NULL, 0);
+ pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0);
streamingDoneSending = true;
WalSndCaughtUp = true;
--
2.39.5 (Apple Git-154)
On Tue, Jul 22, 2025 at 05:54:48PM -0400, Dave Cramer wrote:
Patch attached
Thanks. I plan to look into committing this tomorrow.
--
nathan
Committed. I noticed that there are several characters with no match in
protocol.h. It might be worth adding those.
In walsender.c:
1537: pq_sendbyte(ctx->out, 'w');
2353: case 'r':
2357: case 'h':
2361: case 'p':
2755: pq_sendbyte(&output_message, 's');
3367: pq_sendbyte(&output_message, 'w');
4138: pq_sendbyte(&output_message, 'k');
In walreceiver.c:
829: case 'w': /* WAL records */
853: case 'k': /* Keepalive */
1133: pq_sendbyte(&reply_message, 'r');
1237: pq_sendbyte(&reply_message, 'h');
In logical/worker.c:
3854: if (c == 'w')
3876: else if (c == 'k')
3895: else if (c == 's') /* Primary status update */
4127: pq_sendbyte(reply_message, 'r');
4298: pq_sendbyte(request_message, 'p');
--
nathan
On Wed, 23 Jul 2025 at 11:40, Nathan Bossart <nathandbossart@gmail.com>
wrote:
Committed. I noticed that there are several characters with no match in
protocol.h. It might be worth adding those.In walsender.c:
1537: pq_sendbyte(ctx->out, 'w');
2353: case 'r':
2357: case 'h':
2361: case 'p':
2755: pq_sendbyte(&output_message, 's');
3367: pq_sendbyte(&output_message, 'w');
4138: pq_sendbyte(&output_message, 'k');In walreceiver.c:
829: case 'w': /* WAL
records */
853: case 'k': /*
Keepalive */
1133: pq_sendbyte(&reply_message, 'r');
1237: pq_sendbyte(&reply_message, 'h');In logical/worker.c:
3854: if (c == 'w')
3876: else if (c == 'k')
3895: else if (c == 's') /* Primary
status update */
4127: pq_sendbyte(reply_message, 'r');
4298: pq_sendbyte(request_message, 'p');
Interesting, yes I will add those
Thanks!
Dave
Show quoted text
--
nathan
On Thu, 24 Jul 2025 at 05:34, Dave Cramer <davecramer@gmail.com> wrote:
On Wed, 23 Jul 2025 at 11:40, Nathan Bossart <nathandbossart@gmail.com>
wrote:Committed. I noticed that there are several characters with no match in
protocol.h. It might be worth adding those.In walsender.c:
1537: pq_sendbyte(ctx->out, 'w');
2353: case 'r':
2357: case 'h':
2361: case 'p':
2755: pq_sendbyte(&output_message, 's');
3367: pq_sendbyte(&output_message, 'w');
4138: pq_sendbyte(&output_message, 'k');In walreceiver.c:
829: case 'w': /* WAL
records */
853: case 'k': /*
Keepalive */
1133: pq_sendbyte(&reply_message, 'r');
1237: pq_sendbyte(&reply_message, 'h');In logical/worker.c:
3854: if (c == 'w')
3876: else if (c == 'k')
3895: else if (c == 's') /*
Primary status update */
4127: pq_sendbyte(reply_message, 'r');
4298: pq_sendbyte(request_message, 'p');Interesting, yes I will add those
Patch attached
Dave Cramer
Show quoted text
Attachments:
0001-replace-protocol-replication-constants-with-named-co.patchapplication/octet-stream; name=0001-replace-protocol-replication-constants-with-named-co.patchDownload
From 936e6b7ec334b57d3b1482729fee3189afdfddad Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Thu, 24 Jul 2025 14:38:03 -0400
Subject: [PATCH] replace protocol replication constants with named constants
from protocol.h
---
src/backend/replication/logical/worker.c | 10 +++++-----
src/backend/replication/walreceiver.c | 4 ++--
src/backend/replication/walsender.c | 18 +++++++++---------
src/include/libpq/protocol.h | 18 ++++++++++++++++++
4 files changed, 34 insertions(+), 16 deletions(-)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b59221c4d06..46d69da88d5 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3851,7 +3851,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
c = pq_getmsgbyte(&s);
- if (c == 'w')
+ if (c == PqMsg_XlogData)
{
XLogRecPtr start_lsn;
XLogRecPtr end_lsn;
@@ -3873,7 +3873,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
maybe_advance_nonremovable_xid(&rdt_data, false);
}
- else if (c == 'k')
+ else if (c == PqMsg_PrimaryKeepAlive)
{
XLogRecPtr end_lsn;
TimestampTz timestamp;
@@ -3892,7 +3892,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
UpdateWorkerStats(last_received, timestamp, true);
}
- else if (c == 's') /* Primary status update */
+ else if (c == PqMsg_PrimaryStatusUpdate) /* Primary status update */
{
rdt_data.remote_lsn = pq_getmsgint64(&s);
rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
@@ -4124,7 +4124,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
else
resetStringInfo(reply_message);
- pq_sendbyte(reply_message, 'r');
+ pq_sendbyte(reply_message, PqMsg_StandbyStatus);
pq_sendint64(reply_message, recvpos); /* write */
pq_sendint64(reply_message, flushpos); /* flush */
pq_sendint64(reply_message, writepos); /* apply */
@@ -4295,7 +4295,7 @@ request_publisher_status(RetainDeadTuplesData *rdt_data)
* Send the current time to update the remote walsender's latest reply
* message received time.
*/
- pq_sendbyte(request_message, 'p');
+ pq_sendbyte(request_message, PqMsg_RequestPrimaryStatus);
pq_sendint64(request_message, GetCurrentTimestamp());
elog(DEBUG2, "sending publisher status request message");
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b6281101711..a138e9c3556 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1130,7 +1130,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
applyPtr = GetXLogReplayRecPtr(NULL);
resetStringInfo(&reply_message);
- pq_sendbyte(&reply_message, 'r');
+ pq_sendbyte(&reply_message, PqMsg_StandbyStatus);
pq_sendint64(&reply_message, writePtr);
pq_sendint64(&reply_message, flushPtr);
pq_sendint64(&reply_message, applyPtr);
@@ -1234,7 +1234,7 @@ XLogWalRcvSendHSFeedback(bool immed)
/* Construct the message and send it. */
resetStringInfo(&reply_message);
- pq_sendbyte(&reply_message, 'h');
+ pq_sendbyte(&reply_message, PqMsg_HotStandbyFeedback);
pq_sendint64(&reply_message, GetCurrentTimestamp());
pq_sendint32(&reply_message, xmin);
pq_sendint32(&reply_message, xmin_epoch);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ee911394a23..e00bc74e68f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1534,7 +1534,7 @@ WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
resetStringInfo(ctx->out);
- pq_sendbyte(ctx->out, 'w');
+ pq_sendbyte(ctx->out, PqMsg_XlogData);
pq_sendint64(ctx->out, lsn); /* dataStart */
pq_sendint64(ctx->out, lsn); /* walEnd */
@@ -2292,7 +2292,7 @@ ProcessRepliesIfAny(void)
switch (firstchar)
{
/*
- * 'd' means a standby reply wrapped in a CopyData packet.
+ * standby reply wrapped in a CopyData packet.
*/
case PqMsg_CopyData:
ProcessStandbyMessage();
@@ -2315,7 +2315,7 @@ ProcessRepliesIfAny(void)
break;
/*
- * 'X' means that the standby is closing down the socket.
+ * the standby is closing down the socket.
*/
case PqMsg_Terminate:
proc_exit(0);
@@ -2350,15 +2350,15 @@ ProcessStandbyMessage(void)
switch (msgtype)
{
- case 'r':
+ case PqMsg_StandbyStatus:
ProcessStandbyReplyMessage();
break;
- case 'h':
+ case PqMsg_HotStandbyFeedback:
ProcessStandbyHSFeedbackMessage();
break;
- case 'p':
+ case PqMsg_RequestPrimaryStatus:
ProcessStandbyPSRequestMessage();
break;
@@ -2752,7 +2752,7 @@ ProcessStandbyPSRequestMessage(void)
/* construct the message... */
resetStringInfo(&output_message);
- pq_sendbyte(&output_message, 's');
+ pq_sendbyte(&output_message, PqMsg_PrimaryStatusUpdate);
pq_sendint64(&output_message, lsn);
pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit));
pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid));
@@ -3364,7 +3364,7 @@ XLogSendPhysical(void)
* OK to read and send the slice.
*/
resetStringInfo(&output_message);
- pq_sendbyte(&output_message, 'w');
+ pq_sendbyte(&output_message, PqMsg_XlogData);
pq_sendint64(&output_message, startptr); /* dataStart */
pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
@@ -4135,7 +4135,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
/* construct the message... */
resetStringInfo(&output_message);
- pq_sendbyte(&output_message, 'k');
+ pq_sendbyte(&output_message, PqMsg_PrimaryKeepAlive);
pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
pq_sendint64(&output_message, GetCurrentTimestamp());
pq_sendbyte(&output_message, requestReply ? 1 : 0);
diff --git a/src/include/libpq/protocol.h b/src/include/libpq/protocol.h
index b0bcb3cdc26..06653a7c2c6 100644
--- a/src/include/libpq/protocol.h
+++ b/src/include/libpq/protocol.h
@@ -68,6 +68,24 @@
/* These are the codes sent by parallel workers to leader processes. */
#define PqMsg_Progress 'P'
+/* Replication Protocol sent by the primary */
+
+#define PqMsg_XlogData 'w'
+#define PqMsg_PrimaryKeepAlive 'k'
+#define PqMsg_PrimaryStatusUpdate 's'
+
+
+/* Replication Protocol sent by the standby */
+
+#define PqMsg_StandbyStatus 'r'
+#define PqMsg_HotStandbyFeedback 'h'
+#define PqMsg_RequestPrimaryStatus 'p'
+
+/* These are the codes sent by the frontend and backend. */
+
+#define PqMsg_PasswordMessage 'p'
+
+/* These are the codes sent by the frontend and backend. */
/* These are the authentication request codes sent by the backend. */
--
2.39.5 (Apple Git-154)
Hello,
Since this is a whole new symbol, I'd rather you use the term WAL rather than Xlog ...
--
Álvaro Herrera
On Thu, Jul 24, 2025 at 12:04 PM Dave Cramer <davecramer@gmail.com> wrote:
Patch attached
+/* Replication Protocol sent by the primary */
+
+#define PqMsg_XlogData 'w'
+#define PqMsg_PrimaryKeepAlive 'k'
+#define PqMsg_PrimaryStatusUpdate 's'
+
+
+/* Replication Protocol sent by the standby */
+
+#define PqMsg_StandbyStatus 'r'
+#define PqMsg_HotStandbyFeedback 'h'
+#define PqMsg_RequestPrimaryStatus 'p'
Since these are part of the replication subprotocol (i.e. tunneled,
via CopyData) rather than the top-level wire protocol, do they deserve
their own prefix? PqReplMsg_* maybe?
+/* These are the codes sent by the frontend and backend. */
+
+#define PqMsg_PasswordMessage 'p'
+
+/* These are the codes sent by the frontend and backend. */
Is this change intended?
--Jacob
On Thu, 24 Jul 2025 at 17:05, Jacob Champion <
jacob.champion@enterprisedb.com> wrote:
On Thu, Jul 24, 2025 at 12:04 PM Dave Cramer <davecramer@gmail.com> wrote:
Patch attached
+/* Replication Protocol sent by the primary */ + +#define PqMsg_XlogData 'w' +#define PqMsg_PrimaryKeepAlive 'k' +#define PqMsg_PrimaryStatusUpdate 's' + + +/* Replication Protocol sent by the standby */ + +#define PqMsg_StandbyStatus 'r' +#define PqMsg_HotStandbyFeedback 'h' +#define PqMsg_RequestPrimaryStatus 'p'Since these are part of the replication subprotocol (i.e. tunneled,
via CopyData) rather than the top-level wire protocol, do they deserve
their own prefix? PqReplMsg_* maybe?
I'm going to wait to see if there are any other opinions. Last time I did
this there were quite a few opinions before finally settling on the naming
+/* These are the codes sent by the frontend and backend. */ + +#define PqMsg_PasswordMessage 'p' + +/* These are the codes sent by the frontend and backend. */Is this change intended?
It was as it lines up with the others at least in my editor.
I'm not married to it.
Dave
On Thu, 24 Jul 2025 at 16:49, Álvaro Herrera <alvherre@kurilemu.de> wrote:
Hello,
Since this is a whole new symbol, I'd rather you use the term WAL rather
than Xlog ...
Fair enough
Dave
On 2025-Jul-24, Dave Cramer wrote:
On Thu, 24 Jul 2025 at 17:05, Jacob Champion <
jacob.champion@enterprisedb.com> wrote:On Thu, Jul 24, 2025 at 12:04 PM Dave Cramer <davecramer@gmail.com> wrote:
+/* Replication Protocol sent by the primary */ + +#define PqMsg_XlogData 'w' +#define PqMsg_PrimaryKeepAlive 'k' +#define PqMsg_PrimaryStatusUpdate 's' + + +/* Replication Protocol sent by the standby */ + +#define PqMsg_StandbyStatus 'r' +#define PqMsg_HotStandbyFeedback 'h' +#define PqMsg_RequestPrimaryStatus 'p'Since these are part of the replication subprotocol (i.e. tunneled,
via CopyData) rather than the top-level wire protocol, do they deserve
their own prefix? PqReplMsg_* maybe?I'm going to wait to see if there are any other opinions. Last time I did
this there were quite a few opinions before finally settling on the naming
Count me in.
--
Álvaro Herrera 48°01'N 7°57'E — https://www.EnterpriseDB.com/
"The Postgresql hackers have what I call a "NASA space shot" mentality.
Quite refreshing in a world of "weekend drag racer" developers."
(Scott Marlowe)
Dave Cramer
On Fri, 25 Jul 2025 at 04:11, Álvaro Herrera <alvherre@kurilemu.de> wrote:
On 2025-Jul-24, Dave Cramer wrote:
On Thu, 24 Jul 2025 at 17:05, Jacob Champion <
jacob.champion@enterprisedb.com> wrote:On Thu, Jul 24, 2025 at 12:04 PM Dave Cramer <davecramer@gmail.com>
wrote:
+/* Replication Protocol sent by the primary */ + +#define PqMsg_XlogData 'w' +#define PqMsg_PrimaryKeepAlive 'k' +#define PqMsg_PrimaryStatusUpdate 's' + + +/* Replication Protocol sent by the standby */ + +#define PqMsg_StandbyStatus 'r' +#define PqMsg_HotStandbyFeedback 'h' +#define PqMsg_RequestPrimaryStatus 'p'Since these are part of the replication subprotocol (i.e. tunneled,
via CopyData) rather than the top-level wire protocol, do they deserve
their own prefix? PqReplMsg_* maybe?I'm going to wait to see if there are any other opinions. Last time I did
this there were quite a few opinions before finally settling on thenaming
Count me in.
FYI, the reason I used XLogData is because the term is used multiple times
here https://www.postgresql.org/docs/devel/protocol-replication.html
Dave
On 2025-Jul-25, Dave Cramer wrote:
FYI, the reason I used XLogData is because the term is used multiple times
here https://www.postgresql.org/docs/devel/protocol-replication.html
Yeah, we could rename it, as in the attached. It doesn't harm anything.
--
Álvaro Herrera Breisgau, Deutschland — https://www.EnterpriseDB.com/
Attachments:
0001-rename-XLogData-to-WALData.patchtext/x-diff; charset=utf-8Download
From de850f70b793f27c0aaa5a1f8589d44450691767 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C3=81lvaro=20Herrera?= <alvherre@kurilemu.de>
Date: Fri, 25 Jul 2025 12:12:03 +0200
Subject: [PATCH] rename XLogData to WALData
---
doc/src/sgml/protocol.sgml | 8 ++++----
src/bin/pg_basebackup/pg_recvlogical.c | 4 ++--
src/bin/pg_basebackup/receivelog.c | 18 +++++++++---------
3 files changed, 15 insertions(+), 15 deletions(-)
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index b115884acb3..eadac60224e 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2550,8 +2550,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
</para>
<variablelist>
- <varlistentry id="protocol-replication-xlogdata">
- <term>XLogData (B)</term>
+ <varlistentry id="protocol-replication-waldata">
+ <term>WALData (B)</term>
<listitem>
<variablelist>
<varlistentry>
@@ -2599,11 +2599,11 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
</para>
<para>
- A single WAL record is never split across two XLogData messages.
+ A single WAL record is never split across two WALData messages.
When a WAL record crosses a WAL page boundary, and is therefore
already split using continuation records, it can be split at the page
boundary. In other words, the first main WAL record and its
- continuation records can be sent in different XLogData messages.
+ continuation records can be sent in different WALData messages.
</para>
</listitem>
</varlistentry>
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 8a5dd24e6c9..0e9d2e23947 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -517,7 +517,7 @@ StreamLogicalLog(void)
}
/*
- * Read the header of the XLogData message, enclosed in the CopyData
+ * Read the header of the WALData message, enclosed in the CopyData
* message. We only need the WAL location field (dataStart), the rest
* of the header is ignored.
*/
@@ -605,7 +605,7 @@ StreamLogicalLog(void)
/*
* We're doing a client-initiated clean exit and have sent CopyDone to
* the server. Drain any messages, so we don't miss a last-minute
- * ErrorResponse. The walsender stops generating XLogData records once
+ * ErrorResponse. The walsender stops generating WALData records once
* it sees CopyDone, so expect this to finish quickly. After CopyDone,
* it's too late for sendFeedback(), even if this were to take a long
* time. Hence, use synchronous-mode PQgetCopyData().
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index d6b7f117fa3..f2b54d3c501 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -38,8 +38,8 @@ static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
int len, XLogRecPtr blockpos, TimestampTz *last_status);
-static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
- XLogRecPtr *blockpos);
+static bool ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
+ XLogRecPtr *blockpos);
static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
XLogRecPtr blockpos, XLogRecPtr *stoppos);
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
@@ -831,7 +831,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
}
else if (copybuf[0] == 'w')
{
- if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
+ if (!ProcessWALDataMsg(conn, stream, copybuf, r, &blockpos))
goto error;
/*
@@ -1041,11 +1041,11 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
}
/*
- * Process XLogData message.
+ * Process WALData message.
*/
static bool
-ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
- XLogRecPtr *blockpos)
+ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
+ XLogRecPtr *blockpos)
{
int xlogoff;
int bytes_left;
@@ -1054,13 +1054,13 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
/*
* Once we've decided we don't want to receive any more, just ignore any
- * subsequent XLogData messages.
+ * subsequent WALData messages.
*/
if (!(still_sending))
return true;
/*
- * Read the header of the XLogData message, enclosed in the CopyData
+ * Read the header of the WALData message, enclosed in the CopyData
* message. We only need the WAL location field (dataStart), the rest of
* the header is ignored.
*/
@@ -1162,7 +1162,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
return false;
}
still_sending = false;
- return true; /* ignore the rest of this XLogData packet */
+ return true; /* ignore the rest of this WALData packet */
}
}
}
--
2.39.5
On Fri, 25 Jul 2025 at 06:23, Álvaro Herrera <alvherre@kurilemu.de> wrote:
On 2025-Jul-25, Dave Cramer wrote:
FYI, the reason I used XLogData is because the term is used multiple
times
here https://www.postgresql.org/docs/devel/protocol-replication.html
Yeah, we could rename it, as in the attached. It doesn't harm anything.
Consistency is good. If your patch were applied, then it would be
consistent to use WALData
Dave
On Fri, Jul 25, 2025 at 06:28:28AM -0400, Dave Cramer wrote:
On Fri, 25 Jul 2025 at 06:23, �lvaro Herrera <alvherre@kurilemu.de> wrote:
Yeah, we could rename it, as in the attached. It doesn't harm anything.
Consistency is good. If your patch were applied, then it would be
consistent to use WALData
+1
--
nathan
On Thu, Jul 24, 2025 at 05:39:14PM -0400, Dave Cramer wrote:
On Thu, 24 Jul 2025 at 17:05, Jacob Champion <
jacob.champion@enterprisedb.com> wrote:Since these are part of the replication subprotocol (i.e. tunneled,
via CopyData) rather than the top-level wire protocol, do they deserve
their own prefix? PqReplMsg_* maybe?I'm going to wait to see if there are any other opinions. Last time I did
this there were quite a few opinions before finally settling on the naming
+1 to a new prefix. I don't have any strong opinions on the exact choice,
though. PqReplMsg, ReplMsg, PqMsg_Repl, etc. seem like some obvious
options.
--
nathan
On Fri, 25 Jul 2025 at 10:38, Nathan Bossart <nathandbossart@gmail.com>
wrote:
On Thu, Jul 24, 2025 at 05:39:14PM -0400, Dave Cramer wrote:
On Thu, 24 Jul 2025 at 17:05, Jacob Champion <
jacob.champion@enterprisedb.com> wrote:Since these are part of the replication subprotocol (i.e. tunneled,
via CopyData) rather than the top-level wire protocol, do they deserve
their own prefix? PqReplMsg_* maybe?I'm going to wait to see if there are any other opinions. Last time I did
this there were quite a few opinions before finally settling on thenaming
+1 to a new prefix. I don't have any strong opinions on the exact choice,
though. PqReplMsg, ReplMsg, PqMsg_Repl, etc. seem like some obvious
options.I chose PqReplMsg patch attached
Dave
Attachments:
0001-Rename-replication-messages-to-start-with-PqReplMsg.patchapplication/octet-stream; name=0001-Rename-replication-messages-to-start-with-PqReplMsg.patchDownload
From 1279eaf23a5a636e624d623f2bc830421afc97ad Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Mon, 28 Jul 2025 15:12:32 -0400
Subject: [PATCH] Rename replication messages to start with PqReplMsg
---
src/backend/replication/logical/worker.c | 10 +++++-----
src/backend/replication/walreceiver.c | 4 ++--
src/backend/replication/walsender.c | 14 +++++++-------
src/include/libpq/protocol.h | 12 ++++++------
4 files changed, 20 insertions(+), 20 deletions(-)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 46d69da88d5..73482feb550 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3851,7 +3851,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
c = pq_getmsgbyte(&s);
- if (c == PqMsg_XlogData)
+ if (c == PqReplMsg_WALData)
{
XLogRecPtr start_lsn;
XLogRecPtr end_lsn;
@@ -3873,7 +3873,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
maybe_advance_nonremovable_xid(&rdt_data, false);
}
- else if (c == PqMsg_PrimaryKeepAlive)
+ else if (c == PqReplMsg_PrimaryKeepAlive)
{
XLogRecPtr end_lsn;
TimestampTz timestamp;
@@ -3892,7 +3892,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
UpdateWorkerStats(last_received, timestamp, true);
}
- else if (c == PqMsg_PrimaryStatusUpdate) /* Primary status update */
+ else if (c == PqReplMsg_PrimaryStatusUpdate) /* Primary status update */
{
rdt_data.remote_lsn = pq_getmsgint64(&s);
rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
@@ -4124,7 +4124,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
else
resetStringInfo(reply_message);
- pq_sendbyte(reply_message, PqMsg_StandbyStatus);
+ pq_sendbyte(reply_message, PqReplMsg_StandbyStatus);
pq_sendint64(reply_message, recvpos); /* write */
pq_sendint64(reply_message, flushpos); /* flush */
pq_sendint64(reply_message, writepos); /* apply */
@@ -4295,7 +4295,7 @@ request_publisher_status(RetainDeadTuplesData *rdt_data)
* Send the current time to update the remote walsender's latest reply
* message received time.
*/
- pq_sendbyte(request_message, PqMsg_RequestPrimaryStatus);
+ pq_sendbyte(request_message, PqReplMsg_RequestPrimaryStatus);
pq_sendint64(request_message, GetCurrentTimestamp());
elog(DEBUG2, "sending publisher status request message");
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index a138e9c3556..82798f2aedb 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1130,7 +1130,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
applyPtr = GetXLogReplayRecPtr(NULL);
resetStringInfo(&reply_message);
- pq_sendbyte(&reply_message, PqMsg_StandbyStatus);
+ pq_sendbyte(&reply_message, PqReplMsg_StandbyStatus);
pq_sendint64(&reply_message, writePtr);
pq_sendint64(&reply_message, flushPtr);
pq_sendint64(&reply_message, applyPtr);
@@ -1234,7 +1234,7 @@ XLogWalRcvSendHSFeedback(bool immed)
/* Construct the message and send it. */
resetStringInfo(&reply_message);
- pq_sendbyte(&reply_message, PqMsg_HotStandbyFeedback);
+ pq_sendbyte(&reply_message, PqReplMsg_HotStandbyFeedback);
pq_sendint64(&reply_message, GetCurrentTimestamp());
pq_sendint32(&reply_message, xmin);
pq_sendint32(&reply_message, xmin_epoch);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e00bc74e68f..037e3cb9e74 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1534,7 +1534,7 @@ WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
resetStringInfo(ctx->out);
- pq_sendbyte(ctx->out, PqMsg_XlogData);
+ pq_sendbyte(ctx->out, PqReplMsg_WALData);
pq_sendint64(ctx->out, lsn); /* dataStart */
pq_sendint64(ctx->out, lsn); /* walEnd */
@@ -2350,15 +2350,15 @@ ProcessStandbyMessage(void)
switch (msgtype)
{
- case PqMsg_StandbyStatus:
+ case PqReplMsg_StandbyStatus:
ProcessStandbyReplyMessage();
break;
- case PqMsg_HotStandbyFeedback:
+ case PqReplMsg_HotStandbyFeedback:
ProcessStandbyHSFeedbackMessage();
break;
- case PqMsg_RequestPrimaryStatus:
+ case PqReplMsg_RequestPrimaryStatus:
ProcessStandbyPSRequestMessage();
break;
@@ -2752,7 +2752,7 @@ ProcessStandbyPSRequestMessage(void)
/* construct the message... */
resetStringInfo(&output_message);
- pq_sendbyte(&output_message, PqMsg_PrimaryStatusUpdate);
+ pq_sendbyte(&output_message, PqReplMsg_PrimaryStatusUpdate);
pq_sendint64(&output_message, lsn);
pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit));
pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid));
@@ -3364,7 +3364,7 @@ XLogSendPhysical(void)
* OK to read and send the slice.
*/
resetStringInfo(&output_message);
- pq_sendbyte(&output_message, PqMsg_XlogData);
+ pq_sendbyte(&output_message, PqReplMsg_WALData);
pq_sendint64(&output_message, startptr); /* dataStart */
pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
@@ -4135,7 +4135,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
/* construct the message... */
resetStringInfo(&output_message);
- pq_sendbyte(&output_message, PqMsg_PrimaryKeepAlive);
+ pq_sendbyte(&output_message, PqReplMsg_PrimaryKeepAlive);
pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
pq_sendint64(&output_message, GetCurrentTimestamp());
pq_sendbyte(&output_message, requestReply ? 1 : 0);
diff --git a/src/include/libpq/protocol.h b/src/include/libpq/protocol.h
index 06653a7c2c6..121704fa00f 100644
--- a/src/include/libpq/protocol.h
+++ b/src/include/libpq/protocol.h
@@ -70,16 +70,16 @@
/* Replication Protocol sent by the primary */
-#define PqMsg_XlogData 'w'
-#define PqMsg_PrimaryKeepAlive 'k'
-#define PqMsg_PrimaryStatusUpdate 's'
+#define PqReplMsg_WALData 'w'
+#define PqReplMsg_PrimaryKeepAlive 'k'
+#define PqReplMsg_PrimaryStatusUpdate 's'
/* Replication Protocol sent by the standby */
-#define PqMsg_StandbyStatus 'r'
-#define PqMsg_HotStandbyFeedback 'h'
-#define PqMsg_RequestPrimaryStatus 'p'
+#define PqReplMsg_StandbyStatus 'r'
+#define PqReplMsg_HotStandbyFeedback 'h'
+#define PqReplMsg_RequestPrimaryStatus 'p'
/* These are the codes sent by the frontend and backend. */
--
2.39.5 (Apple Git-154)
On 2025-Jul-25, Nathan Bossart wrote:
On Fri, Jul 25, 2025 at 06:28:28AM -0400, Dave Cramer wrote:
On Fri, 25 Jul 2025 at 06:23, Álvaro Herrera <alvherre@kurilemu.de> wrote:
Yeah, we could rename it, as in the attached. It doesn't harm anything.
Consistency is good. If your patch were applied, then it would be
consistent to use WALData+1
Okay, done, thanks.
--
Álvaro Herrera 48°01'N 7°57'E — https://www.EnterpriseDB.com/
On 2025-Jul-28, Dave Cramer wrote:
I chose PqReplMsg patch attached
I think you sent a patch that applied on top of your previous patch
instead of a patch on top of master. Here it is as a standalone patch.
--
Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/
Attachments:
v3-0001-Replace-protocol-replication-constants-with-named.patchtext/x-diff; charset=utf-8Download
From b21aa102999bf26ee20bf9eb916ea6aeee3e1edf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C3=81lvaro=20Herrera?= <alvherre@kurilemu.de>
Date: Mon, 4 Aug 2025 14:27:53 +0200
Subject: [PATCH v3] Replace protocol replication constants with named
constants from protocol.h
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Author: Dave Cramer <davecramer@gmail.com>
Reviewed-by: Nathan Bossart <nathandbossart@gmail.com>
Reviewed-by: Jacob Champion <jacob.champion@enterprisedb.com>
Reviewed-by: Ãlvaro Herrera <alvherre@kurilemu.de>
Discussion: https://postgr.es/m/aIECfYfevCUpenBT@nathan
---
src/backend/replication/logical/worker.c | 10 +++++-----
src/backend/replication/walreceiver.c | 4 ++--
src/backend/replication/walsender.c | 18 +++++++++---------
src/include/libpq/protocol.h | 13 +++++++++++++
4 files changed, 29 insertions(+), 16 deletions(-)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 89e241c8392..4594db65fec 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3994,7 +3994,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
c = pq_getmsgbyte(&s);
- if (c == 'w')
+ if (c == PqReplMsg_WALData)
{
XLogRecPtr start_lsn;
XLogRecPtr end_lsn;
@@ -4016,7 +4016,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
maybe_advance_nonremovable_xid(&rdt_data, false);
}
- else if (c == 'k')
+ else if (c == PqReplMsg_PrimaryKeepAlive)
{
XLogRecPtr end_lsn;
TimestampTz timestamp;
@@ -4035,7 +4035,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
UpdateWorkerStats(last_received, timestamp, true);
}
- else if (c == 's') /* Primary status update */
+ else if (c == PqReplMsg_PrimaryStatusUpdate) /* Primary status update */
{
rdt_data.remote_lsn = pq_getmsgint64(&s);
rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
@@ -4267,7 +4267,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
else
resetStringInfo(reply_message);
- pq_sendbyte(reply_message, 'r');
+ pq_sendbyte(reply_message, PqReplMsg_StandbyStatus);
pq_sendint64(reply_message, recvpos); /* write */
pq_sendint64(reply_message, flushpos); /* flush */
pq_sendint64(reply_message, writepos); /* apply */
@@ -4438,7 +4438,7 @@ request_publisher_status(RetainDeadTuplesData *rdt_data)
* Send the current time to update the remote walsender's latest reply
* message received time.
*/
- pq_sendbyte(request_message, 'p');
+ pq_sendbyte(request_message, PqReplMsg_RequestPrimaryStatus);
pq_sendint64(request_message, GetCurrentTimestamp());
elog(DEBUG2, "sending publisher status request message");
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b6281101711..82798f2aedb 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1130,7 +1130,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
applyPtr = GetXLogReplayRecPtr(NULL);
resetStringInfo(&reply_message);
- pq_sendbyte(&reply_message, 'r');
+ pq_sendbyte(&reply_message, PqReplMsg_StandbyStatus);
pq_sendint64(&reply_message, writePtr);
pq_sendint64(&reply_message, flushPtr);
pq_sendint64(&reply_message, applyPtr);
@@ -1234,7 +1234,7 @@ XLogWalRcvSendHSFeedback(bool immed)
/* Construct the message and send it. */
resetStringInfo(&reply_message);
- pq_sendbyte(&reply_message, 'h');
+ pq_sendbyte(&reply_message, PqReplMsg_HotStandbyFeedback);
pq_sendint64(&reply_message, GetCurrentTimestamp());
pq_sendint32(&reply_message, xmin);
pq_sendint32(&reply_message, xmin_epoch);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ee911394a23..b19a7551b9f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1534,7 +1534,7 @@ WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
resetStringInfo(ctx->out);
- pq_sendbyte(ctx->out, 'w');
+ pq_sendbyte(ctx->out, PqReplMsg_WALData);
pq_sendint64(ctx->out, lsn); /* dataStart */
pq_sendint64(ctx->out, lsn); /* walEnd */
@@ -2292,7 +2292,7 @@ ProcessRepliesIfAny(void)
switch (firstchar)
{
/*
- * 'd' means a standby reply wrapped in a CopyData packet.
+ * standby reply wrapped in a CopyData packet.
*/
case PqMsg_CopyData:
ProcessStandbyMessage();
@@ -2315,7 +2315,7 @@ ProcessRepliesIfAny(void)
break;
/*
- * 'X' means that the standby is closing down the socket.
+ * the standby is closing down the socket.
*/
case PqMsg_Terminate:
proc_exit(0);
@@ -2350,15 +2350,15 @@ ProcessStandbyMessage(void)
switch (msgtype)
{
- case 'r':
+ case PqReplMsg_StandbyStatus:
ProcessStandbyReplyMessage();
break;
- case 'h':
+ case PqReplMsg_HotStandbyFeedback:
ProcessStandbyHSFeedbackMessage();
break;
- case 'p':
+ case PqReplMsg_RequestPrimaryStatus:
ProcessStandbyPSRequestMessage();
break;
@@ -2752,7 +2752,7 @@ ProcessStandbyPSRequestMessage(void)
/* construct the message... */
resetStringInfo(&output_message);
- pq_sendbyte(&output_message, 's');
+ pq_sendbyte(&output_message, PqReplMsg_PrimaryStatusUpdate);
pq_sendint64(&output_message, lsn);
pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit));
pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid));
@@ -3364,7 +3364,7 @@ XLogSendPhysical(void)
* OK to read and send the slice.
*/
resetStringInfo(&output_message);
- pq_sendbyte(&output_message, 'w');
+ pq_sendbyte(&output_message, PqReplMsg_WALData);
pq_sendint64(&output_message, startptr); /* dataStart */
pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
@@ -4135,7 +4135,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
/* construct the message... */
resetStringInfo(&output_message);
- pq_sendbyte(&output_message, 'k');
+ pq_sendbyte(&output_message, PqReplMsg_PrimaryKeepAlive);
pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
pq_sendint64(&output_message, GetCurrentTimestamp());
pq_sendbyte(&output_message, requestReply ? 1 : 0);
diff --git a/src/include/libpq/protocol.h b/src/include/libpq/protocol.h
index b0bcb3cdc26..ce37e11c3a3 100644
--- a/src/include/libpq/protocol.h
+++ b/src/include/libpq/protocol.h
@@ -86,4 +86,17 @@
#define AUTH_REQ_SASL_FIN 12 /* Final SASL message */
#define AUTH_REQ_MAX AUTH_REQ_SASL_FIN /* maximum AUTH_REQ_* value */
+/* Replication Protocol, sent by the primary */
+
+#define PqReplMsg_WALData 'w'
+#define PqReplMsg_PrimaryKeepAlive 'k'
+#define PqReplMsg_PrimaryStatusUpdate 's'
+
+/* Replication Protocol, sent by the standby */
+
+#define PqReplMsg_StandbyStatus 'r'
+#define PqReplMsg_HotStandbyFeedback 'h'
+#define PqReplMsg_RequestPrimaryStatus 'p'
+
+
#endif /* PROTOCOL_H */
--
2.39.5
On Mon, Aug 04, 2025 at 02:31:05PM +0200, �lvaro Herrera wrote:
+/* Replication Protocol, sent by the primary */ + +#define PqReplMsg_WALData 'w' +#define PqReplMsg_PrimaryKeepAlive 'k' +#define PqReplMsg_PrimaryStatusUpdate 's' + +/* Replication Protocol, sent by the standby */ + +#define PqReplMsg_StandbyStatus 'r' +#define PqReplMsg_HotStandbyFeedback 'h' +#define PqReplMsg_RequestPrimaryStatus 'p'
I know I previously +1'd a new prefix for these, but upon further review,
I'm not so sure about that. The replication protocol uses many of the
existing PqMsg macros already, so it would be a little strange if only a
subset of the replication protocol messages used the special prefix. And
IMO it would also be weird to duplicate all the macros used by both
protocols. There's also backups, which use the replication protocol but
have their own special characters [0]/messages/by-id/aIOkE7fgvFOu0FI_@nathan. If we're going the prefix route,
would we add another prefix for those, or use the replication one?
[0]: /messages/by-id/aIOkE7fgvFOu0FI_@nathan
--
nathan
On Mon, Aug 4, 2025 at 12:56 PM Nathan Bossart <nathandbossart@gmail.com> wrote:
The replication protocol uses many of the
existing PqMsg macros already, so it would be a little strange if only a
subset of the replication protocol messages used the special prefix.
May I ask why? These messages are legitimately different; they're
tunneled through CopyData, so their reservations don't collide with
the top-level codes.
There's also backups, which use the replication protocol but
have their own special characters [0]. If we're going the prefix route,
would we add another prefix for those, or use the replication one?
My vote would be to add another. 'p' is a password message in the
top-level protocol (one of many, actually), a progress message in a
backup stream, and a status request in a replication stream, so I
think they deserve their own namespaces.
--Jacob
On Mon, Aug 04, 2025 at 02:11:26PM -0700, Jacob Champion wrote:
On Mon, Aug 4, 2025 at 12:56 PM Nathan Bossart <nathandbossart@gmail.com> wrote:
The replication protocol uses many of the
existing PqMsg macros already, so it would be a little strange if only a
subset of the replication protocol messages used the special prefix.May I ask why? These messages are legitimately different; they're
tunneled through CopyData, so their reservations don't collide with
the top-level codes.
Ah, I missed that finer detail. IIUC the codes at hands are _only_ used in
these tunneled messages, in which case they belong to a distinct category.
There's also backups, which use the replication protocol but
have their own special characters [0]. If we're going the prefix route,
would we add another prefix for those, or use the replication one?My vote would be to add another. 'p' is a password message in the
top-level protocol (one of many, actually), a progress message in a
backup stream, and a status request in a replication stream, so I
think they deserve their own namespaces.
These also seem to use the same tunneling mechanism. I retract my
objection.
--
nathan
Here is an updated patch that includes 1) added uses of PqMsg_* macros, 2)
new PqReplMsg_* macros, and 3) new PqBackupMsg_* macros. Thoughts?
--
nathan
Attachments:
v4-0001-Expand-usage-of-protocol-characters.patchtext/plain; charset=iso-8859-1Download
From 6a1d03725009837c5ce99dcfc283fa565d587d13 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathan@postgresql.org>
Date: Tue, 5 Aug 2025 14:53:42 -0500
Subject: [PATCH v4 1/1] Expand usage of protocol characters.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Author: Dave Cramer <davecramer@gmail.com>
Co-authored-by: FabrÃzio de Royes Mello <fabriziomello@gmail.com>
Co-authored-by: Nathan Bossart <nathandbossart@gmail.com>
Reviewed-by: Jacob Champion <jacob.champion@enterprisedb.com>
Reviewed-by: Ãlvaro Herrera <alvherre@kurilemu.de>
Discussion: https://postgr.es/m/aIECfYfevCUpenBT@nathan
Discussion: https://postgr.es/m/CAFcNs%2Br73NOUb7%2BqKrV4HHEki02CS96Z%2Bx19WaFgE087BWwEng%40mail.gmail.com
---
src/backend/backup/basebackup_copy.c | 14 +++++++-------
src/backend/replication/logical/worker.c | 10 +++++-----
src/backend/replication/walreceiver.c | 4 ++--
src/backend/replication/walsender.c | 20 +++++++++++---------
src/include/libpq/protocol.h | 21 +++++++++++++++++++++
5 files changed, 46 insertions(+), 23 deletions(-)
diff --git a/src/backend/backup/basebackup_copy.c b/src/backend/backup/basebackup_copy.c
index 18b0b5a52d3..eb45d3bcb66 100644
--- a/src/backend/backup/basebackup_copy.c
+++ b/src/backend/backup/basebackup_copy.c
@@ -143,7 +143,7 @@ bbsink_copystream_begin_backup(bbsink *sink)
buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
- mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
+ mysink->msgbuffer[0] = PqMsg_CopyData; /* archive or manifest data */
/* Tell client the backup start location. */
SendXlogRecPtrResult(state->startptr, state->starttli);
@@ -170,7 +170,7 @@ bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
ti = list_nth(state->tablespaces, state->tablespace_num);
pq_beginmessage(&buf, PqMsg_CopyData);
- pq_sendbyte(&buf, 'n'); /* New archive */
+ pq_sendbyte(&buf, PqBackupMsg_NewArchive);
pq_sendstring(&buf, archive_name);
pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
pq_endmessage(&buf);
@@ -191,7 +191,7 @@ bbsink_copystream_archive_contents(bbsink *sink, size_t len)
if (mysink->send_to_client)
{
/* Add one because we're also sending a leading type byte. */
- pq_putmessage('d', mysink->msgbuffer, len + 1);
+ pq_putmessage(PqMsg_CopyData, mysink->msgbuffer, len + 1);
}
/* Consider whether to send a progress report to the client. */
@@ -221,7 +221,7 @@ bbsink_copystream_archive_contents(bbsink *sink, size_t len)
mysink->last_progress_report_time = now;
pq_beginmessage(&buf, PqMsg_CopyData);
- pq_sendbyte(&buf, 'p'); /* Progress report */
+ pq_sendbyte(&buf, PqBackupMsg_ProgressReport);
pq_sendint64(&buf, state->bytes_done);
pq_endmessage(&buf);
pq_flush_if_writable();
@@ -247,7 +247,7 @@ bbsink_copystream_end_archive(bbsink *sink)
mysink->bytes_done_at_last_time_check = state->bytes_done;
mysink->last_progress_report_time = GetCurrentTimestamp();
pq_beginmessage(&buf, PqMsg_CopyData);
- pq_sendbyte(&buf, 'p'); /* Progress report */
+ pq_sendbyte(&buf, PqBackupMsg_ProgressReport);
pq_sendint64(&buf, state->bytes_done);
pq_endmessage(&buf);
pq_flush_if_writable();
@@ -262,7 +262,7 @@ bbsink_copystream_begin_manifest(bbsink *sink)
StringInfoData buf;
pq_beginmessage(&buf, PqMsg_CopyData);
- pq_sendbyte(&buf, 'm'); /* Manifest */
+ pq_sendbyte(&buf, PqBackupMsg_Manifest);
pq_endmessage(&buf);
}
@@ -277,7 +277,7 @@ bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
if (mysink->send_to_client)
{
/* Add one because we're also sending a leading type byte. */
- pq_putmessage('d', mysink->msgbuffer, len + 1);
+ pq_putmessage(PqMsg_CopyData, mysink->msgbuffer, len + 1);
}
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 89e241c8392..2035b54a065 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3994,7 +3994,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
c = pq_getmsgbyte(&s);
- if (c == 'w')
+ if (c == PqReplMsg_WALData)
{
XLogRecPtr start_lsn;
XLogRecPtr end_lsn;
@@ -4016,7 +4016,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
maybe_advance_nonremovable_xid(&rdt_data, false);
}
- else if (c == 'k')
+ else if (c == PqReplMsg_PrimaryKeepAlive)
{
XLogRecPtr end_lsn;
TimestampTz timestamp;
@@ -4035,7 +4035,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
UpdateWorkerStats(last_received, timestamp, true);
}
- else if (c == 's') /* Primary status update */
+ else if (c == PqReplMsg_PrimaryStatusUpdate)
{
rdt_data.remote_lsn = pq_getmsgint64(&s);
rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
@@ -4267,7 +4267,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
else
resetStringInfo(reply_message);
- pq_sendbyte(reply_message, 'r');
+ pq_sendbyte(reply_message, PqReplMsg_StandbyStatus);
pq_sendint64(reply_message, recvpos); /* write */
pq_sendint64(reply_message, flushpos); /* flush */
pq_sendint64(reply_message, writepos); /* apply */
@@ -4438,7 +4438,7 @@ request_publisher_status(RetainDeadTuplesData *rdt_data)
* Send the current time to update the remote walsender's latest reply
* message received time.
*/
- pq_sendbyte(request_message, 'p');
+ pq_sendbyte(request_message, PqReplMsg_RequestPrimaryStatus);
pq_sendint64(request_message, GetCurrentTimestamp());
elog(DEBUG2, "sending publisher status request message");
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b6281101711..82798f2aedb 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1130,7 +1130,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
applyPtr = GetXLogReplayRecPtr(NULL);
resetStringInfo(&reply_message);
- pq_sendbyte(&reply_message, 'r');
+ pq_sendbyte(&reply_message, PqReplMsg_StandbyStatus);
pq_sendint64(&reply_message, writePtr);
pq_sendint64(&reply_message, flushPtr);
pq_sendint64(&reply_message, applyPtr);
@@ -1234,7 +1234,7 @@ XLogWalRcvSendHSFeedback(bool immed)
/* Construct the message and send it. */
resetStringInfo(&reply_message);
- pq_sendbyte(&reply_message, 'h');
+ pq_sendbyte(&reply_message, PqReplMsg_HotStandbyFeedback);
pq_sendint64(&reply_message, GetCurrentTimestamp());
pq_sendint32(&reply_message, xmin);
pq_sendint32(&reply_message, xmin_epoch);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ee911394a23..bf40bfbd3af 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1534,7 +1534,7 @@ WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
resetStringInfo(ctx->out);
- pq_sendbyte(ctx->out, 'w');
+ pq_sendbyte(ctx->out, PqReplMsg_WALData);
pq_sendint64(ctx->out, lsn); /* dataStart */
pq_sendint64(ctx->out, lsn); /* walEnd */
@@ -2292,7 +2292,8 @@ ProcessRepliesIfAny(void)
switch (firstchar)
{
/*
- * 'd' means a standby reply wrapped in a CopyData packet.
+ * CopyData means a standby reply wrapped in a CopyData
+ * packet.
*/
case PqMsg_CopyData:
ProcessStandbyMessage();
@@ -2315,7 +2316,8 @@ ProcessRepliesIfAny(void)
break;
/*
- * 'X' means that the standby is closing down the socket.
+ * PqMsg_Terminate means that the standby is closing down the
+ * socket.
*/
case PqMsg_Terminate:
proc_exit(0);
@@ -2350,15 +2352,15 @@ ProcessStandbyMessage(void)
switch (msgtype)
{
- case 'r':
+ case PqReplMsg_StandbyStatus:
ProcessStandbyReplyMessage();
break;
- case 'h':
+ case PqReplMsg_HotStandbyFeedback:
ProcessStandbyHSFeedbackMessage();
break;
- case 'p':
+ case PqReplMsg_RequestPrimaryStatus:
ProcessStandbyPSRequestMessage();
break;
@@ -2752,7 +2754,7 @@ ProcessStandbyPSRequestMessage(void)
/* construct the message... */
resetStringInfo(&output_message);
- pq_sendbyte(&output_message, 's');
+ pq_sendbyte(&output_message, PqReplMsg_PrimaryStatusUpdate);
pq_sendint64(&output_message, lsn);
pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit));
pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid));
@@ -3364,7 +3366,7 @@ XLogSendPhysical(void)
* OK to read and send the slice.
*/
resetStringInfo(&output_message);
- pq_sendbyte(&output_message, 'w');
+ pq_sendbyte(&output_message, PqReplMsg_WALData);
pq_sendint64(&output_message, startptr); /* dataStart */
pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
@@ -4135,7 +4137,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
/* construct the message... */
resetStringInfo(&output_message);
- pq_sendbyte(&output_message, 'k');
+ pq_sendbyte(&output_message, PqReplMsg_PrimaryKeepAlive);
pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
pq_sendint64(&output_message, GetCurrentTimestamp());
pq_sendbyte(&output_message, requestReply ? 1 : 0);
diff --git a/src/include/libpq/protocol.h b/src/include/libpq/protocol.h
index b0bcb3cdc26..077a1af46a5 100644
--- a/src/include/libpq/protocol.h
+++ b/src/include/libpq/protocol.h
@@ -86,4 +86,25 @@
#define AUTH_REQ_SASL_FIN 12 /* Final SASL message */
#define AUTH_REQ_MAX AUTH_REQ_SASL_FIN /* maximum AUTH_REQ_* value */
+
+/* Replication codes sent by the primary (wrapped in CopyData messages). */
+
+#define PqReplMsg_PrimaryKeepAlive 'k'
+#define PqReplMsg_PrimaryStatusUpdate 's'
+#define PqReplMsg_WALData 'w'
+
+
+/* Replication codes sent by the standby (wrapped in CopyData messages). */
+
+#define PqReplMsg_HotStandbyFeedback 'h'
+#define PqReplMsg_RequestPrimaryStatus 'p'
+#define PqReplMsg_StandbyStatus 'r'
+
+
+/* Codes used for backups via COPY OUT (wrapped in CopyData messages). */
+
+#define PqBackupMsg_Manifest 'm'
+#define PqBackupMsg_NewArchive 'n'
+#define PqBackupMsg_ProgressReport 'p'
+
#endif /* PROTOCOL_H */
--
2.39.5 (Apple Git-154)
On 2025-Aug-05, Nathan Bossart wrote:
Here is an updated patch that includes 1) added uses of PqMsg_* macros, 2)
new PqReplMsg_* macros, and 3) new PqBackupMsg_* macros. Thoughts?
Hmm, I think if you're going to add the backup ones, then it'd be good
to update ReceiveArchiveStreamChunk() to use these macros and this
header file.
--
Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/
On Tue, Aug 5, 2025, at 4:58 PM, Nathan Bossart wrote:
Here is an updated patch that includes 1) added uses of PqMsg_* macros, 2)
new PqReplMsg_* macros, and 3) new PqBackupMsg_* macros. Thoughts?
- * 'd' means a standby reply wrapped in a CopyData packet.
+ * CopyData means a standby reply wrapped in a CopyData
+ * packet.
*/
Shouldn't it be PqMsg_CopyData instead of CopyData (first reference)?
The function LogicalParallelApplyLoop() has
/*
* The first byte of messages sent from leader apply worker to
* parallel apply workers can only be 'w'.
*/
c = pq_getmsgbyte(&s);
if (c != 'w')
elog(ERROR, "unexpected message \"%c\"", c);
it needs to be adjusted.
The function XLogWalRcvProcessMsg() has
switch (type)
{
case 'w': /* WAL records */
{
StringInfoData incoming_message;
...
case 'k': /* Keepalive */
{
StringInfoData incoming_message;
it also needs to be adjusted.
There is also some references into receivelog.c. The functions sendFeedback()
and HandleCopyStream() contain some references to be replaced. There are other
functions too that refers to these replication codes on comments. It seems a
good idea to replace these references too.
There are also some references in pg_recvlogical.c. The functions
sendFeedback() and StreamLogicalLog() contain some references to be replaced.
Alvaro already pointed out the other cases in the pg_basebackup.c file.
May I suggest that you put these code before authentication codes? It seems
natural to have these new codes near the existing ones.
For the reference, I used grep -r -E "'[a-z]'" in some directories to catch
these cases.
It is a bit off-topic for this thread but looking at the replication protocol
messages, a different pattern is used for naming the messages such as
WALData
Primary keepalive message
Standby status update
Hot standby feedback message
new archive
manifest
archive or manifest data
progress report
I would expect to see the same pattern (Pascal case) as the wire protocol.
AuthenticationSASLFinal
BackendKeyData
Bind
--
Euler Taveira
EDB https://www.enterprisedb.com/
Okay, I think I've addressed all the latest feedback in v5.
--
nathan
Attachments:
v5-0001-Expand-usage-of-protocol-characters.patchtext/plain; charset=iso-8859-1Download
From 00540a80854d3fc598a4b99daddfe1e7c0817b5c Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathan@postgresql.org>
Date: Tue, 5 Aug 2025 22:18:11 -0500
Subject: [PATCH v5 1/1] Expand usage of protocol characters.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Author: Dave Cramer <davecramer@gmail.com>
Co-authored-by: FabrÃzio de Royes Mello <fabriziomello@gmail.com>
Co-authored-by: Nathan Bossart <nathandbossart@gmail.com>
Reviewed-by: Jacob Champion <jacob.champion@enterprisedb.com>
Reviewed-by: Ãlvaro Herrera <alvherre@kurilemu.de>
Reviewed-by: Euler Taveira <euler@eulerto.com>
Discussion: https://postgr.es/m/aIECfYfevCUpenBT@nathan
Discussion: https://postgr.es/m/CAFcNs%2Br73NOUb7%2BqKrV4HHEki02CS96Z%2Bx19WaFgE087BWwEng%40mail.gmail.com
---
src/backend/backup/basebackup_copy.c | 14 +++++------
.../replication/logical/applyparallelworker.c | 4 +--
src/backend/replication/logical/worker.c | 10 ++++----
src/backend/replication/walreceiver.c | 8 +++---
src/backend/replication/walsender.c | 25 +++++++++++--------
src/bin/pg_basebackup/pg_basebackup.c | 9 ++++---
src/bin/pg_basebackup/pg_recvlogical.c | 11 ++++----
src/bin/pg_basebackup/receivelog.c | 11 ++++----
src/include/libpq/protocol.h | 21 ++++++++++++++++
9 files changed, 70 insertions(+), 43 deletions(-)
diff --git a/src/backend/backup/basebackup_copy.c b/src/backend/backup/basebackup_copy.c
index 18b0b5a52d3..eb45d3bcb66 100644
--- a/src/backend/backup/basebackup_copy.c
+++ b/src/backend/backup/basebackup_copy.c
@@ -143,7 +143,7 @@ bbsink_copystream_begin_backup(bbsink *sink)
buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
- mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
+ mysink->msgbuffer[0] = PqMsg_CopyData; /* archive or manifest data */
/* Tell client the backup start location. */
SendXlogRecPtrResult(state->startptr, state->starttli);
@@ -170,7 +170,7 @@ bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
ti = list_nth(state->tablespaces, state->tablespace_num);
pq_beginmessage(&buf, PqMsg_CopyData);
- pq_sendbyte(&buf, 'n'); /* New archive */
+ pq_sendbyte(&buf, PqBackupMsg_NewArchive);
pq_sendstring(&buf, archive_name);
pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
pq_endmessage(&buf);
@@ -191,7 +191,7 @@ bbsink_copystream_archive_contents(bbsink *sink, size_t len)
if (mysink->send_to_client)
{
/* Add one because we're also sending a leading type byte. */
- pq_putmessage('d', mysink->msgbuffer, len + 1);
+ pq_putmessage(PqMsg_CopyData, mysink->msgbuffer, len + 1);
}
/* Consider whether to send a progress report to the client. */
@@ -221,7 +221,7 @@ bbsink_copystream_archive_contents(bbsink *sink, size_t len)
mysink->last_progress_report_time = now;
pq_beginmessage(&buf, PqMsg_CopyData);
- pq_sendbyte(&buf, 'p'); /* Progress report */
+ pq_sendbyte(&buf, PqBackupMsg_ProgressReport);
pq_sendint64(&buf, state->bytes_done);
pq_endmessage(&buf);
pq_flush_if_writable();
@@ -247,7 +247,7 @@ bbsink_copystream_end_archive(bbsink *sink)
mysink->bytes_done_at_last_time_check = state->bytes_done;
mysink->last_progress_report_time = GetCurrentTimestamp();
pq_beginmessage(&buf, PqMsg_CopyData);
- pq_sendbyte(&buf, 'p'); /* Progress report */
+ pq_sendbyte(&buf, PqBackupMsg_ProgressReport);
pq_sendint64(&buf, state->bytes_done);
pq_endmessage(&buf);
pq_flush_if_writable();
@@ -262,7 +262,7 @@ bbsink_copystream_begin_manifest(bbsink *sink)
StringInfoData buf;
pq_beginmessage(&buf, PqMsg_CopyData);
- pq_sendbyte(&buf, 'm'); /* Manifest */
+ pq_sendbyte(&buf, PqBackupMsg_Manifest);
pq_endmessage(&buf);
}
@@ -277,7 +277,7 @@ bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
if (mysink->send_to_client)
{
/* Add one because we're also sending a leading type byte. */
- pq_putmessage('d', mysink->msgbuffer, len + 1);
+ pq_putmessage(PqMsg_CopyData, mysink->msgbuffer, len + 1);
}
}
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 1fa931a7422..cd0e19176fd 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -778,10 +778,10 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
/*
* The first byte of messages sent from leader apply worker to
- * parallel apply workers can only be 'w'.
+ * parallel apply workers can only be PqReplMsg_WALData.
*/
c = pq_getmsgbyte(&s);
- if (c != 'w')
+ if (c != PqReplMsg_WALData)
elog(ERROR, "unexpected message \"%c\"", c);
/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 89e241c8392..0fdc5de57ba 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3994,7 +3994,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
c = pq_getmsgbyte(&s);
- if (c == 'w')
+ if (c == PqReplMsg_WALData)
{
XLogRecPtr start_lsn;
XLogRecPtr end_lsn;
@@ -4016,7 +4016,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
maybe_advance_nonremovable_xid(&rdt_data, false);
}
- else if (c == 'k')
+ else if (c == PqReplMsg_Keepalive)
{
XLogRecPtr end_lsn;
TimestampTz timestamp;
@@ -4035,7 +4035,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
UpdateWorkerStats(last_received, timestamp, true);
}
- else if (c == 's') /* Primary status update */
+ else if (c == PqReplMsg_PrimaryStatusUpdate)
{
rdt_data.remote_lsn = pq_getmsgint64(&s);
rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
@@ -4267,7 +4267,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
else
resetStringInfo(reply_message);
- pq_sendbyte(reply_message, 'r');
+ pq_sendbyte(reply_message, PqReplMsg_StandbyStatusUpdate);
pq_sendint64(reply_message, recvpos); /* write */
pq_sendint64(reply_message, flushpos); /* flush */
pq_sendint64(reply_message, writepos); /* apply */
@@ -4438,7 +4438,7 @@ request_publisher_status(RetainDeadTuplesData *rdt_data)
* Send the current time to update the remote walsender's latest reply
* message received time.
*/
- pq_sendbyte(request_message, 'p');
+ pq_sendbyte(request_message, PqReplMsg_PrimaryStatusRequest);
pq_sendint64(request_message, GetCurrentTimestamp());
elog(DEBUG2, "sending publisher status request message");
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b6281101711..7361ffc9dcf 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -826,7 +826,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
switch (type)
{
- case 'w': /* WAL records */
+ case PqReplMsg_WALData:
{
StringInfoData incoming_message;
@@ -850,7 +850,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
XLogWalRcvWrite(buf, len, dataStart, tli);
break;
}
- case 'k': /* Keepalive */
+ case PqReplMsg_Keepalive:
{
StringInfoData incoming_message;
@@ -1130,7 +1130,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
applyPtr = GetXLogReplayRecPtr(NULL);
resetStringInfo(&reply_message);
- pq_sendbyte(&reply_message, 'r');
+ pq_sendbyte(&reply_message, PqReplMsg_StandbyStatusUpdate);
pq_sendint64(&reply_message, writePtr);
pq_sendint64(&reply_message, flushPtr);
pq_sendint64(&reply_message, applyPtr);
@@ -1234,7 +1234,7 @@ XLogWalRcvSendHSFeedback(bool immed)
/* Construct the message and send it. */
resetStringInfo(&reply_message);
- pq_sendbyte(&reply_message, 'h');
+ pq_sendbyte(&reply_message, PqReplMsg_HotStandbyFeedback);
pq_sendint64(&reply_message, GetCurrentTimestamp());
pq_sendint32(&reply_message, xmin);
pq_sendint32(&reply_message, xmin_epoch);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ee911394a23..0855bae3535 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1534,7 +1534,7 @@ WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
resetStringInfo(ctx->out);
- pq_sendbyte(ctx->out, 'w');
+ pq_sendbyte(ctx->out, PqReplMsg_WALData);
pq_sendint64(ctx->out, lsn); /* dataStart */
pq_sendint64(ctx->out, lsn); /* walEnd */
@@ -2292,7 +2292,8 @@ ProcessRepliesIfAny(void)
switch (firstchar)
{
/*
- * 'd' means a standby reply wrapped in a CopyData packet.
+ * PqMsg_CopyData means a standby reply wrapped in a CopyData
+ * packet.
*/
case PqMsg_CopyData:
ProcessStandbyMessage();
@@ -2300,8 +2301,9 @@ ProcessRepliesIfAny(void)
break;
/*
- * CopyDone means the standby requested to finish streaming.
- * Reply with CopyDone, if we had not sent that already.
+ * PqMsg_CopyDone means the standby requested to finish
+ * streaming. Reply with CopyDone, if we had not sent that
+ * already.
*/
case PqMsg_CopyDone:
if (!streamingDoneSending)
@@ -2315,7 +2317,8 @@ ProcessRepliesIfAny(void)
break;
/*
- * 'X' means that the standby is closing down the socket.
+ * PqMsg_Terminate means that the standby is closing down the
+ * socket.
*/
case PqMsg_Terminate:
proc_exit(0);
@@ -2350,15 +2353,15 @@ ProcessStandbyMessage(void)
switch (msgtype)
{
- case 'r':
+ case PqReplMsg_StandbyStatusUpdate:
ProcessStandbyReplyMessage();
break;
- case 'h':
+ case PqReplMsg_HotStandbyFeedback:
ProcessStandbyHSFeedbackMessage();
break;
- case 'p':
+ case PqReplMsg_PrimaryStatusRequest:
ProcessStandbyPSRequestMessage();
break;
@@ -2752,7 +2755,7 @@ ProcessStandbyPSRequestMessage(void)
/* construct the message... */
resetStringInfo(&output_message);
- pq_sendbyte(&output_message, 's');
+ pq_sendbyte(&output_message, PqReplMsg_PrimaryStatusUpdate);
pq_sendint64(&output_message, lsn);
pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit));
pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid));
@@ -3364,7 +3367,7 @@ XLogSendPhysical(void)
* OK to read and send the slice.
*/
resetStringInfo(&output_message);
- pq_sendbyte(&output_message, 'w');
+ pq_sendbyte(&output_message, PqReplMsg_WALData);
pq_sendint64(&output_message, startptr); /* dataStart */
pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
@@ -4135,7 +4138,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
/* construct the message... */
resetStringInfo(&output_message);
- pq_sendbyte(&output_message, 'k');
+ pq_sendbyte(&output_message, PqReplMsg_Keepalive);
pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
pq_sendint64(&output_message, GetCurrentTimestamp());
pq_sendbyte(&output_message, requestReply ? 1 : 0);
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 55621f35fb6..0a3ca4315de 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -35,6 +35,7 @@
#include "fe_utils/option_utils.h"
#include "fe_utils/recovery_gen.h"
#include "getopt_long.h"
+#include "libpq/protocol.h"
#include "receivelog.h"
#include "streamutil.h"
@@ -1338,7 +1339,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
/* Each CopyData message begins with a type byte. */
switch (GetCopyDataByte(r, copybuf, &cursor))
{
- case 'n':
+ case PqBackupMsg_NewArchive:
{
/* New archive. */
char *archive_name;
@@ -1410,7 +1411,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
break;
}
- case 'd':
+ case PqMsg_CopyData:
{
/* Archive or manifest data. */
if (state->manifest_buffer != NULL)
@@ -1446,7 +1447,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
break;
}
- case 'p':
+ case PqBackupMsg_ProgressReport:
{
/*
* Progress report.
@@ -1465,7 +1466,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
break;
}
- case 'm':
+ case PqBackupMsg_Manifest:
{
/*
* Manifest data will be sent next. This message is not
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 0e9d2e23947..7a4d1a2d2ca 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -24,6 +24,7 @@
#include "getopt_long.h"
#include "libpq-fe.h"
#include "libpq/pqsignal.h"
+#include "libpq/protocol.h"
#include "pqexpbuffer.h"
#include "streamutil.h"
@@ -149,7 +150,7 @@ sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
LSN_FORMAT_ARGS(output_fsync_lsn),
replication_slot);
- replybuf[len] = 'r';
+ replybuf[len] = PqReplMsg_StandbyStatusUpdate;
len += 1;
fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
len += 8;
@@ -454,7 +455,7 @@ StreamLogicalLog(void)
}
/* Check the message type. */
- if (copybuf[0] == 'k')
+ if (copybuf[0] == PqReplMsg_Keepalive)
{
int pos;
bool replyRequested;
@@ -466,7 +467,7 @@ StreamLogicalLog(void)
* We just check if the server requested a reply, and ignore the
* rest.
*/
- pos = 1; /* skip msgtype 'k' */
+ pos = 1; /* skip msgtype PqReplMsg_Keepalive */
walEnd = fe_recvint64(©buf[pos]);
output_written_lsn = Max(walEnd, output_written_lsn);
@@ -509,7 +510,7 @@ StreamLogicalLog(void)
continue;
}
- else if (copybuf[0] != 'w')
+ else if (copybuf[0] != PqReplMsg_WALData)
{
pg_log_error("unrecognized streaming header: \"%c\"",
copybuf[0]);
@@ -521,7 +522,7 @@ StreamLogicalLog(void)
* message. We only need the WAL location field (dataStart), the rest
* of the header is ignored.
*/
- hdr_len = 1; /* msgtype 'w' */
+ hdr_len = 1; /* msgtype PqReplMsg_WALData */
hdr_len += 8; /* dataStart */
hdr_len += 8; /* walEnd */
hdr_len += 8; /* sendTime */
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index f2b54d3c501..25b13c7f55c 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -21,6 +21,7 @@
#include "access/xlog_internal.h"
#include "common/logging.h"
#include "libpq-fe.h"
+#include "libpq/protocol.h"
#include "receivelog.h"
#include "streamutil.h"
@@ -338,7 +339,7 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyReque
char replybuf[1 + 8 + 8 + 8 + 8 + 1];
int len = 0;
- replybuf[len] = 'r';
+ replybuf[len] = PqReplMsg_StandbyStatusUpdate;
len += 1;
fe_sendint64(blockpos, &replybuf[len]); /* write */
len += 8;
@@ -823,13 +824,13 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
}
/* Check the message type. */
- if (copybuf[0] == 'k')
+ if (copybuf[0] == PqReplMsg_Keepalive)
{
if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
&last_status))
goto error;
}
- else if (copybuf[0] == 'w')
+ else if (copybuf[0] == PqReplMsg_WALData)
{
if (!ProcessWALDataMsg(conn, stream, copybuf, r, &blockpos))
goto error;
@@ -1001,7 +1002,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
* Parse the keepalive message, enclosed in the CopyData message. We just
* check if the server requested a reply, and ignore the rest.
*/
- pos = 1; /* skip msgtype 'k' */
+ pos = 1; /* skip msgtype PqReplMsg_Keepalive */
pos += 8; /* skip walEnd */
pos += 8; /* skip sendTime */
@@ -1064,7 +1065,7 @@ ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
* message. We only need the WAL location field (dataStart), the rest of
* the header is ignored.
*/
- hdr_len = 1; /* msgtype 'w' */
+ hdr_len = 1; /* msgtype PqReplMsg_WALData */
hdr_len += 8; /* dataStart */
hdr_len += 8; /* walEnd */
hdr_len += 8; /* sendTime */
diff --git a/src/include/libpq/protocol.h b/src/include/libpq/protocol.h
index b0bcb3cdc26..c64e628628d 100644
--- a/src/include/libpq/protocol.h
+++ b/src/include/libpq/protocol.h
@@ -69,6 +69,27 @@
#define PqMsg_Progress 'P'
+/* Replication codes sent by the primary (wrapped in CopyData messages). */
+
+#define PqReplMsg_Keepalive 'k'
+#define PqReplMsg_PrimaryStatusUpdate 's'
+#define PqReplMsg_WALData 'w'
+
+
+/* Replication codes sent by the standby (wrapped in CopyData messages). */
+
+#define PqReplMsg_HotStandbyFeedback 'h'
+#define PqReplMsg_PrimaryStatusRequest 'p'
+#define PqReplMsg_StandbyStatusUpdate 'r'
+
+
+/* Codes used for backups via COPY OUT (wrapped in CopyData messages). */
+
+#define PqBackupMsg_Manifest 'm'
+#define PqBackupMsg_NewArchive 'n'
+#define PqBackupMsg_ProgressReport 'p'
+
+
/* These are the authentication request codes sent by the backend. */
#define AUTH_REQ_OK 0 /* User is authenticated */
--
2.39.5 (Apple Git-154)
On 2025-Aug-05, Nathan Bossart wrote:
Okay, I think I've addressed all the latest feedback in v5.
LGTM.
--
Álvaro Herrera 48°01'N 7°57'E — https://www.EnterpriseDB.com/
"Nunca confiaré en un traidor. Ni siquiera si el traidor lo he creado yo"
(Barón Vladimir Harkonnen)
On Wed, Aug 6, 2025, at 12:26 AM, Nathan Bossart wrote:
Okay, I think I've addressed all the latest feedback in v5.
LGTM.
I tried to apply your patch and it says
$ git am -3 v5-0001-Expand-usage-of-protocol-characters.patch
Applying: Expand usage of protocol characters.
Warning: commit message did not conform to UTF-8.
You may want to amend it after fixing the message, or set the config
variable i18n.commitEncoding to the encoding your project uses.
It seems your file is ISO-8859-1 but your commit message claims to be UTF-8.
--
Euler Taveira
EDB https://www.enterprisedb.com/
On Wed, Aug 06, 2025 at 02:54:13PM -0300, Euler Taveira wrote:
I tried to apply your patch and it says
$ git am -3 v5-0001-Expand-usage-of-protocol-characters.patch
Applying: Expand usage of protocol characters.
Warning: commit message did not conform to UTF-8.
You may want to amend it after fixing the message, or set the config
variable i18n.commitEncoding to the encoding your project uses.It seems your file is ISO-8859-1 but your commit message claims to be UTF-8.
I think my .muttrc needed some adjustments. Let's see if this works...
--
nathan
Attachments:
v6-0001-Expand-usage-of-protocol-characters.patchtext/plain; charset=utf-8Download
From 328cf708480fc6c7e4687f96938b584b1cf67d00 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathan@postgresql.org>
Date: Tue, 5 Aug 2025 22:18:11 -0500
Subject: [PATCH v6 1/1] Expand usage of protocol characters.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Author: Dave Cramer <davecramer@gmail.com>
Co-authored-by: FabrÃzio de Royes Mello <fabriziomello@gmail.com>
Co-authored-by: Nathan Bossart <nathandbossart@gmail.com>
Reviewed-by: Jacob Champion <jacob.champion@enterprisedb.com>
Reviewed-by: Ãlvaro Herrera <alvherre@kurilemu.de>
Reviewed-by: Euler Taveira <euler@eulerto.com>
Discussion: https://postgr.es/m/aIECfYfevCUpenBT@nathan
Discussion: https://postgr.es/m/CAFcNs%2Br73NOUb7%2BqKrV4HHEki02CS96Z%2Bx19WaFgE087BWwEng%40mail.gmail.com
---
src/backend/backup/basebackup_copy.c | 14 +++++------
.../replication/logical/applyparallelworker.c | 4 +--
src/backend/replication/logical/worker.c | 10 ++++----
src/backend/replication/walreceiver.c | 8 +++---
src/backend/replication/walsender.c | 25 +++++++++++--------
src/bin/pg_basebackup/pg_basebackup.c | 9 ++++---
src/bin/pg_basebackup/pg_recvlogical.c | 11 ++++----
src/bin/pg_basebackup/receivelog.c | 11 ++++----
src/include/libpq/protocol.h | 21 ++++++++++++++++
9 files changed, 70 insertions(+), 43 deletions(-)
diff --git a/src/backend/backup/basebackup_copy.c b/src/backend/backup/basebackup_copy.c
index 18b0b5a52d3..eb45d3bcb66 100644
--- a/src/backend/backup/basebackup_copy.c
+++ b/src/backend/backup/basebackup_copy.c
@@ -143,7 +143,7 @@ bbsink_copystream_begin_backup(bbsink *sink)
buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
- mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
+ mysink->msgbuffer[0] = PqMsg_CopyData; /* archive or manifest data */
/* Tell client the backup start location. */
SendXlogRecPtrResult(state->startptr, state->starttli);
@@ -170,7 +170,7 @@ bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
ti = list_nth(state->tablespaces, state->tablespace_num);
pq_beginmessage(&buf, PqMsg_CopyData);
- pq_sendbyte(&buf, 'n'); /* New archive */
+ pq_sendbyte(&buf, PqBackupMsg_NewArchive);
pq_sendstring(&buf, archive_name);
pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
pq_endmessage(&buf);
@@ -191,7 +191,7 @@ bbsink_copystream_archive_contents(bbsink *sink, size_t len)
if (mysink->send_to_client)
{
/* Add one because we're also sending a leading type byte. */
- pq_putmessage('d', mysink->msgbuffer, len + 1);
+ pq_putmessage(PqMsg_CopyData, mysink->msgbuffer, len + 1);
}
/* Consider whether to send a progress report to the client. */
@@ -221,7 +221,7 @@ bbsink_copystream_archive_contents(bbsink *sink, size_t len)
mysink->last_progress_report_time = now;
pq_beginmessage(&buf, PqMsg_CopyData);
- pq_sendbyte(&buf, 'p'); /* Progress report */
+ pq_sendbyte(&buf, PqBackupMsg_ProgressReport);
pq_sendint64(&buf, state->bytes_done);
pq_endmessage(&buf);
pq_flush_if_writable();
@@ -247,7 +247,7 @@ bbsink_copystream_end_archive(bbsink *sink)
mysink->bytes_done_at_last_time_check = state->bytes_done;
mysink->last_progress_report_time = GetCurrentTimestamp();
pq_beginmessage(&buf, PqMsg_CopyData);
- pq_sendbyte(&buf, 'p'); /* Progress report */
+ pq_sendbyte(&buf, PqBackupMsg_ProgressReport);
pq_sendint64(&buf, state->bytes_done);
pq_endmessage(&buf);
pq_flush_if_writable();
@@ -262,7 +262,7 @@ bbsink_copystream_begin_manifest(bbsink *sink)
StringInfoData buf;
pq_beginmessage(&buf, PqMsg_CopyData);
- pq_sendbyte(&buf, 'm'); /* Manifest */
+ pq_sendbyte(&buf, PqBackupMsg_Manifest);
pq_endmessage(&buf);
}
@@ -277,7 +277,7 @@ bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
if (mysink->send_to_client)
{
/* Add one because we're also sending a leading type byte. */
- pq_putmessage('d', mysink->msgbuffer, len + 1);
+ pq_putmessage(PqMsg_CopyData, mysink->msgbuffer, len + 1);
}
}
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 1fa931a7422..cd0e19176fd 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -778,10 +778,10 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
/*
* The first byte of messages sent from leader apply worker to
- * parallel apply workers can only be 'w'.
+ * parallel apply workers can only be PqReplMsg_WALData.
*/
c = pq_getmsgbyte(&s);
- if (c != 'w')
+ if (c != PqReplMsg_WALData)
elog(ERROR, "unexpected message \"%c\"", c);
/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 89e241c8392..0fdc5de57ba 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3994,7 +3994,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
c = pq_getmsgbyte(&s);
- if (c == 'w')
+ if (c == PqReplMsg_WALData)
{
XLogRecPtr start_lsn;
XLogRecPtr end_lsn;
@@ -4016,7 +4016,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
maybe_advance_nonremovable_xid(&rdt_data, false);
}
- else if (c == 'k')
+ else if (c == PqReplMsg_Keepalive)
{
XLogRecPtr end_lsn;
TimestampTz timestamp;
@@ -4035,7 +4035,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
UpdateWorkerStats(last_received, timestamp, true);
}
- else if (c == 's') /* Primary status update */
+ else if (c == PqReplMsg_PrimaryStatusUpdate)
{
rdt_data.remote_lsn = pq_getmsgint64(&s);
rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
@@ -4267,7 +4267,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
else
resetStringInfo(reply_message);
- pq_sendbyte(reply_message, 'r');
+ pq_sendbyte(reply_message, PqReplMsg_StandbyStatusUpdate);
pq_sendint64(reply_message, recvpos); /* write */
pq_sendint64(reply_message, flushpos); /* flush */
pq_sendint64(reply_message, writepos); /* apply */
@@ -4438,7 +4438,7 @@ request_publisher_status(RetainDeadTuplesData *rdt_data)
* Send the current time to update the remote walsender's latest reply
* message received time.
*/
- pq_sendbyte(request_message, 'p');
+ pq_sendbyte(request_message, PqReplMsg_PrimaryStatusRequest);
pq_sendint64(request_message, GetCurrentTimestamp());
elog(DEBUG2, "sending publisher status request message");
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b6281101711..7361ffc9dcf 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -826,7 +826,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
switch (type)
{
- case 'w': /* WAL records */
+ case PqReplMsg_WALData:
{
StringInfoData incoming_message;
@@ -850,7 +850,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
XLogWalRcvWrite(buf, len, dataStart, tli);
break;
}
- case 'k': /* Keepalive */
+ case PqReplMsg_Keepalive:
{
StringInfoData incoming_message;
@@ -1130,7 +1130,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
applyPtr = GetXLogReplayRecPtr(NULL);
resetStringInfo(&reply_message);
- pq_sendbyte(&reply_message, 'r');
+ pq_sendbyte(&reply_message, PqReplMsg_StandbyStatusUpdate);
pq_sendint64(&reply_message, writePtr);
pq_sendint64(&reply_message, flushPtr);
pq_sendint64(&reply_message, applyPtr);
@@ -1234,7 +1234,7 @@ XLogWalRcvSendHSFeedback(bool immed)
/* Construct the message and send it. */
resetStringInfo(&reply_message);
- pq_sendbyte(&reply_message, 'h');
+ pq_sendbyte(&reply_message, PqReplMsg_HotStandbyFeedback);
pq_sendint64(&reply_message, GetCurrentTimestamp());
pq_sendint32(&reply_message, xmin);
pq_sendint32(&reply_message, xmin_epoch);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ee911394a23..0855bae3535 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1534,7 +1534,7 @@ WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
resetStringInfo(ctx->out);
- pq_sendbyte(ctx->out, 'w');
+ pq_sendbyte(ctx->out, PqReplMsg_WALData);
pq_sendint64(ctx->out, lsn); /* dataStart */
pq_sendint64(ctx->out, lsn); /* walEnd */
@@ -2292,7 +2292,8 @@ ProcessRepliesIfAny(void)
switch (firstchar)
{
/*
- * 'd' means a standby reply wrapped in a CopyData packet.
+ * PqMsg_CopyData means a standby reply wrapped in a CopyData
+ * packet.
*/
case PqMsg_CopyData:
ProcessStandbyMessage();
@@ -2300,8 +2301,9 @@ ProcessRepliesIfAny(void)
break;
/*
- * CopyDone means the standby requested to finish streaming.
- * Reply with CopyDone, if we had not sent that already.
+ * PqMsg_CopyDone means the standby requested to finish
+ * streaming. Reply with CopyDone, if we had not sent that
+ * already.
*/
case PqMsg_CopyDone:
if (!streamingDoneSending)
@@ -2315,7 +2317,8 @@ ProcessRepliesIfAny(void)
break;
/*
- * 'X' means that the standby is closing down the socket.
+ * PqMsg_Terminate means that the standby is closing down the
+ * socket.
*/
case PqMsg_Terminate:
proc_exit(0);
@@ -2350,15 +2353,15 @@ ProcessStandbyMessage(void)
switch (msgtype)
{
- case 'r':
+ case PqReplMsg_StandbyStatusUpdate:
ProcessStandbyReplyMessage();
break;
- case 'h':
+ case PqReplMsg_HotStandbyFeedback:
ProcessStandbyHSFeedbackMessage();
break;
- case 'p':
+ case PqReplMsg_PrimaryStatusRequest:
ProcessStandbyPSRequestMessage();
break;
@@ -2752,7 +2755,7 @@ ProcessStandbyPSRequestMessage(void)
/* construct the message... */
resetStringInfo(&output_message);
- pq_sendbyte(&output_message, 's');
+ pq_sendbyte(&output_message, PqReplMsg_PrimaryStatusUpdate);
pq_sendint64(&output_message, lsn);
pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit));
pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid));
@@ -3364,7 +3367,7 @@ XLogSendPhysical(void)
* OK to read and send the slice.
*/
resetStringInfo(&output_message);
- pq_sendbyte(&output_message, 'w');
+ pq_sendbyte(&output_message, PqReplMsg_WALData);
pq_sendint64(&output_message, startptr); /* dataStart */
pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
@@ -4135,7 +4138,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
/* construct the message... */
resetStringInfo(&output_message);
- pq_sendbyte(&output_message, 'k');
+ pq_sendbyte(&output_message, PqReplMsg_Keepalive);
pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
pq_sendint64(&output_message, GetCurrentTimestamp());
pq_sendbyte(&output_message, requestReply ? 1 : 0);
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 55621f35fb6..0a3ca4315de 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -35,6 +35,7 @@
#include "fe_utils/option_utils.h"
#include "fe_utils/recovery_gen.h"
#include "getopt_long.h"
+#include "libpq/protocol.h"
#include "receivelog.h"
#include "streamutil.h"
@@ -1338,7 +1339,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
/* Each CopyData message begins with a type byte. */
switch (GetCopyDataByte(r, copybuf, &cursor))
{
- case 'n':
+ case PqBackupMsg_NewArchive:
{
/* New archive. */
char *archive_name;
@@ -1410,7 +1411,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
break;
}
- case 'd':
+ case PqMsg_CopyData:
{
/* Archive or manifest data. */
if (state->manifest_buffer != NULL)
@@ -1446,7 +1447,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
break;
}
- case 'p':
+ case PqBackupMsg_ProgressReport:
{
/*
* Progress report.
@@ -1465,7 +1466,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
break;
}
- case 'm':
+ case PqBackupMsg_Manifest:
{
/*
* Manifest data will be sent next. This message is not
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 0e9d2e23947..7a4d1a2d2ca 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -24,6 +24,7 @@
#include "getopt_long.h"
#include "libpq-fe.h"
#include "libpq/pqsignal.h"
+#include "libpq/protocol.h"
#include "pqexpbuffer.h"
#include "streamutil.h"
@@ -149,7 +150,7 @@ sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
LSN_FORMAT_ARGS(output_fsync_lsn),
replication_slot);
- replybuf[len] = 'r';
+ replybuf[len] = PqReplMsg_StandbyStatusUpdate;
len += 1;
fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
len += 8;
@@ -454,7 +455,7 @@ StreamLogicalLog(void)
}
/* Check the message type. */
- if (copybuf[0] == 'k')
+ if (copybuf[0] == PqReplMsg_Keepalive)
{
int pos;
bool replyRequested;
@@ -466,7 +467,7 @@ StreamLogicalLog(void)
* We just check if the server requested a reply, and ignore the
* rest.
*/
- pos = 1; /* skip msgtype 'k' */
+ pos = 1; /* skip msgtype PqReplMsg_Keepalive */
walEnd = fe_recvint64(©buf[pos]);
output_written_lsn = Max(walEnd, output_written_lsn);
@@ -509,7 +510,7 @@ StreamLogicalLog(void)
continue;
}
- else if (copybuf[0] != 'w')
+ else if (copybuf[0] != PqReplMsg_WALData)
{
pg_log_error("unrecognized streaming header: \"%c\"",
copybuf[0]);
@@ -521,7 +522,7 @@ StreamLogicalLog(void)
* message. We only need the WAL location field (dataStart), the rest
* of the header is ignored.
*/
- hdr_len = 1; /* msgtype 'w' */
+ hdr_len = 1; /* msgtype PqReplMsg_WALData */
hdr_len += 8; /* dataStart */
hdr_len += 8; /* walEnd */
hdr_len += 8; /* sendTime */
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index f2b54d3c501..25b13c7f55c 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -21,6 +21,7 @@
#include "access/xlog_internal.h"
#include "common/logging.h"
#include "libpq-fe.h"
+#include "libpq/protocol.h"
#include "receivelog.h"
#include "streamutil.h"
@@ -338,7 +339,7 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyReque
char replybuf[1 + 8 + 8 + 8 + 8 + 1];
int len = 0;
- replybuf[len] = 'r';
+ replybuf[len] = PqReplMsg_StandbyStatusUpdate;
len += 1;
fe_sendint64(blockpos, &replybuf[len]); /* write */
len += 8;
@@ -823,13 +824,13 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
}
/* Check the message type. */
- if (copybuf[0] == 'k')
+ if (copybuf[0] == PqReplMsg_Keepalive)
{
if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
&last_status))
goto error;
}
- else if (copybuf[0] == 'w')
+ else if (copybuf[0] == PqReplMsg_WALData)
{
if (!ProcessWALDataMsg(conn, stream, copybuf, r, &blockpos))
goto error;
@@ -1001,7 +1002,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
* Parse the keepalive message, enclosed in the CopyData message. We just
* check if the server requested a reply, and ignore the rest.
*/
- pos = 1; /* skip msgtype 'k' */
+ pos = 1; /* skip msgtype PqReplMsg_Keepalive */
pos += 8; /* skip walEnd */
pos += 8; /* skip sendTime */
@@ -1064,7 +1065,7 @@ ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
* message. We only need the WAL location field (dataStart), the rest of
* the header is ignored.
*/
- hdr_len = 1; /* msgtype 'w' */
+ hdr_len = 1; /* msgtype PqReplMsg_WALData */
hdr_len += 8; /* dataStart */
hdr_len += 8; /* walEnd */
hdr_len += 8; /* sendTime */
diff --git a/src/include/libpq/protocol.h b/src/include/libpq/protocol.h
index b0bcb3cdc26..c64e628628d 100644
--- a/src/include/libpq/protocol.h
+++ b/src/include/libpq/protocol.h
@@ -69,6 +69,27 @@
#define PqMsg_Progress 'P'
+/* Replication codes sent by the primary (wrapped in CopyData messages). */
+
+#define PqReplMsg_Keepalive 'k'
+#define PqReplMsg_PrimaryStatusUpdate 's'
+#define PqReplMsg_WALData 'w'
+
+
+/* Replication codes sent by the standby (wrapped in CopyData messages). */
+
+#define PqReplMsg_HotStandbyFeedback 'h'
+#define PqReplMsg_PrimaryStatusRequest 'p'
+#define PqReplMsg_StandbyStatusUpdate 'r'
+
+
+/* Codes used for backups via COPY OUT (wrapped in CopyData messages). */
+
+#define PqBackupMsg_Manifest 'm'
+#define PqBackupMsg_NewArchive 'n'
+#define PqBackupMsg_ProgressReport 'p'
+
+
/* These are the authentication request codes sent by the backend. */
#define AUTH_REQ_OK 0 /* User is authenticated */
--
2.39.5 (Apple Git-154)