From 1a385b30d6cb4ca111cbcc16ea14017c08f9a579 Mon Sep 17 00:00:00 2001
From: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com>
Date: Fri, 27 Jun 2025 09:16:23 +0530
Subject: [PATCH] Report data sent statistics in pg_stat_replication_slots

pg_stat_replication_slots reports the total number of transactions and bytes
added to the reorder buffer. This is the amount of WAL that is processed by the
WAL sender. This data undergoes filtering and logical decoding before sending to
the downstream. Hence the amount of WAL added to the reorder buffer does not
serve as a good metric for the amount of data sent downstream. Knowing the
amount of data sent downstream is useful when debugging slowly moving logical
replication.

This patch adds two new columns to pg_stat_replication_slots:
- sent_txns: The total number of transactions sent downstream.
- sent_bytes: The total number of bytes sent downstream in data messages.

Ashutosh Bapat
---
 contrib/test_decoding/expected/stats.out      | 54 ++++++++++---------
 contrib/test_decoding/sql/stats.sql           | 12 +++--
 contrib/test_decoding/t/001_repl_stats.pl     | 14 ++---
 contrib/test_decoding/test_decoding.c         |  1 +
 doc/src/sgml/monitoring.sgml                  | 27 ++++++++++
 src/backend/catalog/system_views.sql          |  2 +
 src/backend/replication/logical/logical.c     | 10 +++-
 .../replication/logical/logicalfuncs.c        |  2 +
 .../replication/logical/reorderbuffer.c       |  2 +
 src/backend/replication/pgoutput/pgoutput.c   |  1 +
 src/backend/replication/walsender.c           |  2 +
 src/backend/utils/activity/pgstat_replslot.c  |  2 +
 src/backend/utils/adt/pgstatfuncs.c           | 14 +++--
 src/include/catalog/pg_proc.dat               |  6 +--
 src/include/pgstat.h                          |  2 +
 src/include/replication/reorderbuffer.h       |  8 +++
 src/test/recovery/t/006_logical_decoding.pl   | 12 ++---
 src/test/regress/expected/rules.out           |  4 +-
 18 files changed, 126 insertions(+), 49 deletions(-)

diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out
index de6dc416130..867a8506051 100644
--- a/contrib/test_decoding/expected/stats.out
+++ b/contrib/test_decoding/expected/stats.out
@@ -37,28 +37,34 @@ SELECT pg_stat_force_next_flush();
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes 
-------------------------+------------+-------------+------------+-------------
- regression_slot_stats1 | t          | t           | t          | t
- regression_slot_stats2 | t          | t           | t          | t
- regression_slot_stats3 | t          | t           | t          | t
+-- total_txns may vary based on the background activity but sent_txns should
+-- always be 1 since the background transactions are always skipped.
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_txns, sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_txns | total_bytes | sent_txns | sent_bytes 
+------------------------+------------+-------------+------------+-------------+-----------+------------
+ regression_slot_stats1 | t          | t           | t          | t           |         1 | t
+ regression_slot_stats2 | t          | t           | t          | t           |         1 | t
+ regression_slot_stats3 | t          | t           | t          | t           |         1 | t
 (3 rows)
 
 RESET logical_decoding_work_mem;
 -- reset stats for one slot, others should be unaffected
+-- total_txns may vary based on the background activity but sent_txns should
+-- always be 0 for the slot whose stats were reset since the background
+-- transactions are always skipped and not transaction, which would be sent
+-- downstream, has happened since the reset.
 SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
  pg_stat_reset_replication_slot 
 --------------------------------
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes 
-------------------------+------------+-------------+------------+-------------
- regression_slot_stats1 | t          | t           | f          | f
- regression_slot_stats2 | t          | t           | t          | t
- regression_slot_stats3 | t          | t           | t          | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_txns, sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_txns | total_bytes | sent_txns | sent_bytes 
+------------------------+------------+-------------+------------+-------------+-----------+------------
+ regression_slot_stats1 | t          | t           | f          | f           |         0 | f
+ regression_slot_stats2 | t          | t           | t          | t           |         1 | t
+ regression_slot_stats3 | t          | t           | t          | t           |         1 | t
 (3 rows)
 
 -- reset stats for all slots
@@ -68,27 +74,27 @@ SELECT pg_stat_reset_replication_slot(NULL);
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes 
-------------------------+------------+-------------+------------+-------------
- regression_slot_stats1 | t          | t           | f          | f
- regression_slot_stats2 | t          | t           | f          | f
- regression_slot_stats3 | t          | t           | f          | f
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_txns, sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_txns | total_bytes | sent_txns | sent_bytes 
+------------------------+------------+-------------+------------+-------------+-----------+------------
+ regression_slot_stats1 | t          | t           | f          | f           |         0 | f
+ regression_slot_stats2 | t          | t           | f          | f           |         0 | f
+ regression_slot_stats3 | t          | t           | f          | f           |         0 | f
 (3 rows)
 
 -- verify accessing/resetting stats for non-existent slot does something reasonable
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
-  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset 
---------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-------------
- do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |          0 |           0 | 
+  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | sent_txns | sent_bytes | stats_reset 
+--------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-----------+------------+-------------
+ do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |          0 |           0 |         0 |          0 | 
 (1 row)
 
 SELECT pg_stat_reset_replication_slot('do-not-exist');
 ERROR:  replication slot "do-not-exist" does not exist
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
-  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset 
---------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-------------
- do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |          0 |           0 | 
+  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | sent_txns | sent_bytes | stats_reset 
+--------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-----------+------------+-------------
+ do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |          0 |           0 |         0 |          0 | 
 (1 row)
 
 -- spilling the xact
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
index a022fe1bf07..8581387d867 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -15,16 +15,22 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats1', NULL,
 SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats2', NULL, NULL, 'skip-empty-xacts', '1');
 SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats3', NULL, NULL, 'skip-empty-xacts', '1');
 SELECT pg_stat_force_next_flush();
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+-- total_txns may vary based on the background activity but sent_txns should
+-- always be 1 since the background transactions are always skipped.
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_txns, sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
 RESET logical_decoding_work_mem;
 
 -- reset stats for one slot, others should be unaffected
+-- total_txns may vary based on the background activity but sent_txns should
+-- always be 0 for the slot whose stats were reset since the background
+-- transactions are always skipped and not transaction, which would be sent
+-- downstream, has happened since the reset.
 SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_txns, sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
 
 -- reset stats for all slots
 SELECT pg_stat_reset_replication_slot(NULL);
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_txns, sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
 
 -- verify accessing/resetting stats for non-existent slot does something reasonable
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl
index 0de62edb7d8..ba887d752eb 100644
--- a/contrib/test_decoding/t/001_repl_stats.pl
+++ b/contrib/test_decoding/t/001_repl_stats.pl
@@ -26,7 +26,9 @@ sub test_slot_stats
 	my $result = $node->safe_psql(
 		'postgres', qq[
 		SELECT slot_name, total_txns > 0 AS total_txn,
-			   total_bytes > 0 AS total_bytes
+			   total_bytes > 0 AS total_bytes,
+			   sent_txns > 0 AS sent_txn,
+			   sent_bytes > 0 AS sent_bytes
 			   FROM pg_stat_replication_slots
 			   ORDER BY slot_name]);
 	is($result, $expected, $msg);
@@ -80,9 +82,9 @@ $node->start;
 # restart.
 test_slot_stats(
 	$node,
-	qq(regression_slot1|t|t
-regression_slot2|t|t
-regression_slot3|t|t),
+	qq(regression_slot1|t|t|t|t
+regression_slot2|t|t|t|t
+regression_slot3|t|t|t|t),
 	'check replication statistics are updated');
 
 # Test to remove one of the replication slots and adjust
@@ -104,8 +106,8 @@ $node->start;
 # restart.
 test_slot_stats(
 	$node,
-	qq(regression_slot1|t|t
-regression_slot2|t|t),
+	qq(regression_slot1|t|t|t|t
+regression_slot2|t|t|t|t),
 	'check replication statistics after removing the slot file');
 
 # cleanup
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index bb495563200..4a7e918efbe 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -310,6 +310,7 @@ static void
 pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 {
 	OutputPluginPrepareWrite(ctx, last_write);
+	ctx->reorder->sentTxns++;
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
 	else
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 4265a22d4de..e5af284df90 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1644,6 +1644,33 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>sent_txns</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Number of decoded transactions sent downstream for
+        this slot. This counts top-level transactions only, and is not incremented
+        for subtransactions. These transactions are a subset of the transactions
+        sent to the decoding plugin. Hence this count is expected to be less than or equal to <structfield>total_txns</structfield>.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>sent_bytes</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Amount of decoded transaction changes sent downstream for this slot. The
+        amount of WAL corresponding to the changes sent downstream is a subset of
+        the total WAL sent to the decoding plugin. But the amount of data sent
+        downstream for a given decoded WAL record may not match the size of the
+        WAL record. Thus values in this column do not have a strong correlation
+        with the values in <structfield>total_bytes</structfield>.
+       </para>
+      </entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 08f780a2e63..bcc210dd754 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1053,6 +1053,8 @@ CREATE VIEW pg_stat_replication_slots AS
             s.stream_bytes,
             s.total_txns,
             s.total_bytes,
+            s.sent_txns,
+            s.sent_bytes,
             s.stats_reset
     FROM pg_replication_slots as r,
         LATERAL pg_stat_get_replication_slot(slot_name) as s
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index f1eb798f3e9..14615fa70d7 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1958,7 +1958,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
 		return;
 
-	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
+	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
 		 rb,
 		 rb->spillTxns,
 		 rb->spillCount,
@@ -1967,7 +1967,9 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 		 rb->streamCount,
 		 rb->streamBytes,
 		 rb->totalTxns,
-		 rb->totalBytes);
+		 rb->totalBytes,
+		 rb->sentTxns,
+		 rb->sentBytes);
 
 	repSlotStat.spill_txns = rb->spillTxns;
 	repSlotStat.spill_count = rb->spillCount;
@@ -1977,6 +1979,8 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	repSlotStat.stream_bytes = rb->streamBytes;
 	repSlotStat.total_txns = rb->totalTxns;
 	repSlotStat.total_bytes = rb->totalBytes;
+	repSlotStat.sent_txns = rb->sentTxns;
+	repSlotStat.sent_bytes = rb->sentBytes;
 
 	pgstat_report_replslot(ctx->slot, &repSlotStat);
 
@@ -1988,6 +1992,8 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	rb->streamBytes = 0;
 	rb->totalTxns = 0;
 	rb->totalBytes = 0;
+	rb->sentTxns = 0;
+	rb->sentBytes = 0;
 }
 
 /*
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index ca53caac2f2..f85e21b1a50 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -89,6 +89,8 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 	values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
 
 	tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
+
+	ctx->reorder->sentBytes += ctx->out->len + sizeof(XLogRecPtr) + sizeof(TransactionId);
 	p->returned_rows++;
 }
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c4299c76fb1..fa4c61192e8 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -392,6 +392,8 @@ ReorderBufferAllocate(void)
 	buffer->streamBytes = 0;
 	buffer->totalTxns = 0;
 	buffer->totalBytes = 0;
+	buffer->sentTxns = 0;
+	buffer->sentBytes = 0;
 
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 082b4d9d327..daafac7049c 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -589,6 +589,7 @@ pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
 	logicalrep_write_begin(ctx->out, txn);
 	txndata->sent_begin_txn = true;
+	ctx->reorder->sentTxns++;
 
 	send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
 					 send_replication_origin);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f2c33250e8b..f097aa92c64 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1569,6 +1569,8 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	/* output previously gathered data in a CopyData packet */
 	pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
 
+	ctx->reorder->sentBytes += ctx->out->len + 1;	/* +1 for the 'd' */
+
 	CHECK_FOR_INTERRUPTS();
 
 	/* Try to flush pending output to the client */
diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c
index ccfb11c49bf..583ac090cdd 100644
--- a/src/backend/utils/activity/pgstat_replslot.c
+++ b/src/backend/utils/activity/pgstat_replslot.c
@@ -96,6 +96,8 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re
 	REPLSLOT_ACC(stream_bytes);
 	REPLSLOT_ACC(total_txns);
 	REPLSLOT_ACC(total_bytes);
+	REPLSLOT_ACC(sent_txns);
+	REPLSLOT_ACC(sent_bytes);
 #undef REPLSLOT_ACC
 
 	pgstat_unlock_entry(entry_ref);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 1c12ddbae49..4ed405c6ad5 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2100,7 +2100,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 12
 	text	   *slotname_text = PG_GETARG_TEXT_P(0);
 	NameData	slotname;
 	TupleDesc	tupdesc;
@@ -2129,7 +2129,11 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "sent_txns",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "sent_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -2154,11 +2158,13 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 	values[6] = Int64GetDatum(slotent->stream_bytes);
 	values[7] = Int64GetDatum(slotent->total_txns);
 	values[8] = Int64GetDatum(slotent->total_bytes);
+	values[9] = Int64GetDatum(slotent->sent_txns);
+	values[10] = Int64GetDatum(slotent->sent_bytes);
 
 	if (slotent->stat_reset_timestamp == 0)
-		nulls[9] = true;
+		nulls[11] = true;
 	else
-		values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
+		values[11] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index fb4f7f50350..49990bfe42e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5675,9 +5675,9 @@
 { oid => '6169', descr => 'statistics: information about replication slot',
   proname => 'pg_stat_get_replication_slot', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'text',
-  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
+  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,sent_txns,sent_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slot' },
 
 { oid => '6230', descr => 'statistics: check if a stats object exists',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 378f2f2c2ba..814c6024ba1 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -395,6 +395,8 @@ typedef struct PgStat_StatReplSlotEntry
 	PgStat_Counter stream_bytes;
 	PgStat_Counter total_txns;
 	PgStat_Counter total_bytes;
+	PgStat_Counter sent_txns;
+	PgStat_Counter sent_bytes;
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatReplSlotEntry;
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index fa0745552f8..b22e046a9ad 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -696,6 +696,14 @@ struct ReorderBuffer
 	 */
 	int64		totalTxns;		/* total number of transactions sent */
 	int64		totalBytes;		/* total amount of data decoded */
+
+	/*
+	 * Statistics about the transactions decoded by the output plugin and sent
+	 * downstream.
+	 */
+	int64		sentTxns;		/* number of transactions decoded and sent
+								 * downstream */
+	int64		sentBytes;		/* amount of data decoded and sent downstream */
 };
 
 
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index 2137c4e5e30..03b688f54eb 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -212,10 +212,10 @@ my $stats_test_slot2 = 'logical_slot';
 # Stats exist for stats test slot 1
 is( $node_primary->safe_psql(
 		'postgres',
-		qq(SELECT total_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
+		qq(SELECT total_bytes > 0, sent_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
 	),
-	qq(t|t),
-	qq(Total bytes is > 0 and stats_reset is NULL for slot '$stats_test_slot1'.)
+	qq(t|t|t),
+	qq(Total bytes and sent bytes are both > 0 and stats_reset is NULL for slot '$stats_test_slot1'.)
 );
 
 # Do reset of stats for stats test slot 1
@@ -233,10 +233,10 @@ $node_primary->safe_psql('postgres',
 
 is( $node_primary->safe_psql(
 		'postgres',
-		qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
+		qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0, sent_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
 	),
-	qq(t|t),
-	qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes was set to 0.)
+	qq(t|t|t),
+	qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes and sent_bytes were set to 0.)
 );
 
 # Check that test slot 2 has NULL in reset timestamp
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6cf828ca8d0..9fcf73152cd 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2131,9 +2131,11 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.stream_bytes,
     s.total_txns,
     s.total_bytes,
+    s.sent_txns,
+    s.sent_bytes,
     s.stats_reset
    FROM pg_replication_slots r,
-    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset)
+    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, sent_txns, sent_bytes, stats_reset)
   WHERE (r.datoid IS NOT NULL);
 pg_stat_slru| SELECT name,
     blks_zeroed,

base-commit: 3431e3e4aa3a33e8411f15e76c284cdd4c54ca28
-- 
2.34.1

