From 57cf4fca23fc6481f79d94b9261a2a5c14c6ba9a Mon Sep 17 00:00:00 2001
From: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com>
Date: Mon, 11 Aug 2025 11:02:58 +0530
Subject: [PATCH 1/2] Report size of reorder buffer contents

pg_stat_replication_slots reports cumulative statistics about spilled or
streamed transactions. They help to know whether the logical decoding
work memory is sufficient to hold all the changes. These statistics do
not give an idea about the total size of changes belonging to the
transactions tracked by the reorder buffer at a given point in time.
That information is important for sizing logical decoding work memory
so as to make optimal use of memory, disk (in case of spilled
transactions) and downstream resources (in case of streamed
transactions).

This patch reports the total size of the reorder buffer contents through
the pg_stat_replication view. The size includes the contents in memory,
spilled to disk or sent downstream as part of an unfinished streamed
transaction. Effectively it counts the size of all changes belonging to
the transactions that are currently active in the reorder buffer. The
trendline of the total reorder buffer size should help users set
logical_decoding_work_mem after considering various trade-offs.

It's arguable whether the size should be reported in
pg_stat_replication_slots or pg_stat_replication.
pg_stat_replication_slots covers all replication slots, in turn covering
WAL senders as well as backends using replication slots, e.g. clients
receiving logical changes via SQL-callable functions. But it is a
cumulative statistics view, whereas the size of the reorder buffer is a
point-in-time statistic. Such statistics are covered by
pg_stat_replication, which doesn't report statistics for client backends
receiving logical changes via a SQL-callable function.
pg_stat_replication_slots statistics are updated at strategic events
during logical decoding. Updating the reorder buffer size only at those
points may not always suffice; it will not give the accurate value at
the time when the view is queried. pg_stat_replication fetches the
real-time state of the WAL sender process and thus will report the
reorder buffer size more accurately. But users may miss the strategic
events when sampling pg_stat_replication.

Any serious user of logical replication who is looking to optimally
size logical_decoding_work_mem had better be using a WAL sender. Thus
pg_stat_replication makes more sense.

Author: Ashutosh Bapat
---
 doc/src/sgml/monitoring.sgml                    | 14 ++++++++++++++
 src/backend/catalog/system_views.sql            |  3 ++-
 src/backend/replication/logical/decode.c        |  4 ++--
 src/backend/replication/logical/reorderbuffer.c | 16 +++++++++++++++-
 src/backend/replication/walsender.c             | 15 ++++++++++++++-
 src/include/catalog/pg_proc.dat                 |  6 +++---
 src/include/replication/reorderbuffer.h         | 16 +++++++++++++++-
 src/include/replication/walsender_private.h     |  7 +++++++
 src/test/regress/expected/rules.out             |  5 +++--
 9 files changed, 75 insertions(+), 11 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3f4a27a736e..c40f5be9a59 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1464,6 +1464,20 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        Send time of last reply message received from standby server
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>rb_total_size</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Total size of the changes belonging to all the transactions queued in the
+       reorder buffer. This includes the size of changes queued in memory,
+       spilled to disk or sent downstream as part of an unfinished
+       streamed transaction. It can be used to tune
+       <literal>logical_decoding_work_mem</literal> or estimate the logical
+       decoding load on the WAL sender. This is NULL for physical WAL senders.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 1b3c5a55882..ca31bf5375b 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -932,7 +932,8 @@ CREATE VIEW pg_stat_replication AS
             W.replay_lag,
             W.sync_priority,
             W.sync_state,
-            W.reply_time
+            W.reply_time,
+            W.rb_total_size
     FROM pg_stat_get_activity(NULL) AS S
         JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index cc03f0706e9..5fd8f1848ce 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -713,9 +713,9 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	{
 		for (i = 0; i < parsed->nsubxacts; i++)
 		{
-			ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
+			ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr, true);
 		}
-		ReorderBufferForget(ctx->reorder, xid, buf->origptr);
+		ReorderBufferForget(ctx->reorder, xid, buf->origptr, true);
 
 		return;
 	}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 34cf05668ae..0adec1fddb8 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -392,6 +392,7 @@ ReorderBufferAllocate(void)
 	buffer->streamBytes = 0;
 	buffer->totalTxns = 0;
 	buffer->totalBytes = 0;
+	buffer->totalSize = 0;
 
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
@@ -3167,7 +3168,7 @@ ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
  * to this xid might re-create the transaction incompletely.
  */
 void
-ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
+ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, bool upd_rb_total_size)
 {
 	ReorderBufferTXN *txn;
 
@@ -3198,6 +3199,12 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
+	if (upd_rb_total_size)
+	{
+		/* Must be done before ReorderBufferCleanupTXN() deallocates txn. */
+		rb->totalSize -= txn->rb_size;
+	}
+
 	/* remove potential on-disk data, and deallocate */
 	ReorderBufferCleanupTXN(rb, txn);
 }
 
 /*
@@ -3403,6 +3410,13 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 		txn->size += sz;
 		rb->size += sz;
 
+		/*
+		 * TODO: count only new changes, not ones restored from a spill file,
+		 * and decrement totalSize on commit/abort cleanup, not just in Forget.
+		 */
+		rb->totalSize += sz;
+		txn->rb_size += sz;
+
 		/* Update the total size in the top transaction. */
 		toptxn->total_size += sz;
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 0855bae3535..670f49f9286 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1495,6 +1495,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	/* Also update the sent position status in shared memory */
 	SpinLockAcquire(&MyWalSnd->mutex);
 	MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
+	MyWalSnd->rb_total_size = logical_decoding_ctx->reorder->totalSize;
 	SpinLockRelease(&MyWalSnd->mutex);
 
 	replication_active = true;
@@ -3029,6 +3030,7 @@ InitWalSenderSlot(void)
 			walsnd->applyLag = -1;
 			walsnd->sync_standby_priority = 0;
 			walsnd->replyTime = 0;
+			walsnd->rb_total_size = 0;
 
 			/*
 			 * The kind assignment is done here and not in StartReplication()
@@ -3545,6 +3547,7 @@ XLogSendLogical(void)
 
 		SpinLockAcquire(&walsnd->mutex);
 		walsnd->sentPtr = sentPtr;
+		walsnd->rb_total_size = logical_decoding_ctx->reorder->totalSize;
 		SpinLockRelease(&walsnd->mutex);
 	}
 }
@@ -3963,7 +3966,7 @@ offset_to_interval(TimeOffset offset)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS	12
+#define PG_STAT_GET_WAL_SENDERS_COLS	13
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	SyncRepStandbyData *sync_standbys;
 	int			num_standbys;
@@ -3991,10 +3994,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		int			pid;
 		WalSndState state;
 		TimestampTz replyTime;
+		int64		rb_total_size;
 		bool		is_sync_standby;
 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
 		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
 		int			j;
+		ReplicationKind replkind;
 
 		/* Collect data from shared memory */
 		SpinLockAcquire(&walsnd->mutex);
@@ -4014,6 +4019,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		applyLag = walsnd->applyLag;
 		priority = walsnd->sync_standby_priority;
 		replyTime = walsnd->replyTime;
+		rb_total_size = walsnd->rb_total_size;
+		replkind = walsnd->kind;
 		SpinLockRelease(&walsnd->mutex);
 
 		/*
@@ -4110,6 +4117,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 				nulls[11] = true;
 			else
 				values[11] = TimestampTzGetDatum(replyTime);
+
+			/* Physical walsenders have no reorder buffer, so report NULL. */
+			if (replkind == REPLICATION_KIND_PHYSICAL)
+				nulls[12] = true;
+			else
+				values[12] = Int64GetDatum(rb_total_size);
 		}
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 118d6da1ace..f44407a6aa2 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5661,9 +5661,9 @@
   proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}',
+  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz,int8}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time,rb_total_size}',
   prosrc => 'pg_stat_get_wal_senders' },
 { oid => '3317', descr => 'statistics: information about WAL receiver',
   proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index fa0745552f8..932ce87b8c2 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -458,6 +458,12 @@ typedef struct ReorderBufferTXN
 	 */
 	Size		size;
 
+	/*
+	 * Size in bytes of this transaction's own changes (excluding subtxns),
+	 * whether currently in memory, spilled to disk or already streamed.
+	 */
+	Size		rb_size;
+
 	/* Size of top-transaction including sub-transactions. */
 	Size		total_size;
 
@@ -696,6 +702,14 @@ struct ReorderBuffer
 	 */
 	int64		totalTxns;		/* total number of transactions sent */
 	int64		totalBytes;		/* total amount of data decoded */
+
+	/*
+	 * Tracks total size of changes in the reorder buffer including the
+	 * changes spilled to disk or sent downstream as part of an unfinished
+	 * streamed transaction. In other words, this tracks the total size of all
+	 * the changes in reorder buffer if it would have been infinitely large.
+	 */
+	int64		totalSize;
 };
 
 
@@ -736,7 +750,7 @@ extern void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
 extern void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 							   TimestampTz abort_time);
 extern void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid);
-extern void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
+extern void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, bool upd_rb_total_size);
 extern void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
 
 extern void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index e98701038f5..0df7b6c1e75 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -75,6 +75,13 @@ typedef struct WalSnd
 	 */
 	TimestampTz replyTime;
+
+	/*
+	 * Size of all reorder buffer contents, including changes spilled to disk
+	 * or sent downstream in an unfinished streamed transaction. Only logical
+	 * walsenders update this; it stays 0 for physical walsenders.
+	 */
+	int64		rb_total_size;
 
 	ReplicationKind kind;
 } WalSnd;
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 35e8aad7701..6afddad4c59 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2127,9 +2127,10 @@ pg_stat_replication| SELECT s.pid,
     w.replay_lag,
     w.sync_priority,
     w.sync_state,
-    w.reply_time
+    w.reply_time,
+    w.rb_total_size
    FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, gss_delegation, leader_pid, query_id)
-     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
+     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time, rb_total_size) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_replication_slots| SELECT s.slot_name,
     s.spill_txns,

base-commit: 783cbb6d5e8bdf87d321286f210983c177ead967
-- 
2.34.1

