From 31d5e1ed8ced85ea01de18fbd70af0ad38956e55 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.kumar@enterprisedb.com>
Date: Thu, 9 Jan 2020 09:45:27 +0530
Subject: [PATCH v6 09/12] Track statistics for streaming

---
 doc/src/sgml/monitoring.sgml                  | 25 +++++++++++++++
 src/backend/catalog/system_views.sql          |  5 ++-
 .../replication/logical/reorderbuffer.c       | 13 ++++++++
 src/backend/replication/walsender.c           | 32 ++++++++++++++++---
 src/include/catalog/pg_proc.dat               |  6 ++--
 src/include/replication/reorderbuffer.h       | 13 +++++---
 src/include/replication/walsender_private.h   |  5 +++
 src/test/regress/expected/rules.out           |  7 ++--
 8 files changed, 91 insertions(+), 15 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index dcb58115af..180ea880a4 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1996,6 +1996,31 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       may get spilled repeatedly, and this counter gets incremented on every
       such invocation.</entry>
     </row>
+    <row>
+     <entry><structfield>stream_txns</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of in-progress transactions streamed to subscriber after
+      memory used by logical decoding exceeds <literal>logical_work_mem</literal>.
+      Streaming only works with toplevel transactions (subtransactions can't
+      be streamed independently), so the counter does not get incremented for
+      subtransactions.
+      </entry>
+    </row>
+    <row>
+     <entry><structfield>stream_count</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of times in-progress transactions were streamed to subscriber.
+      Transactions may get streamed repeatedly, and this counter gets incremented
+      on every such invocation.
+      </entry>
+    </row>
+    <row>
+     <entry><structfield>stream_bytes</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Amount of decoded in-progress transaction data streamed to subscriber.
+      </entry>
+    </row>
+
    </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 773edf85e7..cb9e6ee9ea 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -785,7 +785,10 @@ CREATE VIEW pg_stat_replication AS
             W.reply_time,
             W.spill_txns,
             W.spill_count,
-            W.spill_bytes
+            W.spill_bytes,
+            W.stream_txns,
+            W.stream_count,
+            W.stream_bytes
     FROM pg_stat_get_activity(NULL) AS S
         JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 8e4744f73a..16515d0a12 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -331,6 +331,10 @@ ReorderBufferAllocate(void)
 	buffer->spillTxns = 0;
 	buffer->spillBytes = 0;
 
+	buffer->streamCount = 0;
+	buffer->streamTxns = 0;
+	buffer->streamBytes = 0;
+
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
 	dlist_init(&buffer->toplevel_by_lsn);
@@ -3264,6 +3268,15 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
 							command_id, true);
 
+	/*
+	 * Update the stream statistics.
+	 */
+	rb->streamCount += 1;
+	rb->streamBytes += txn->size;
+
+	/* Don't consider already streamed transaction. */
+	rb->streamTxns += (rbtxn_is_streamed(txn)) ? 0 : 1;
+
 	Assert(dlist_is_empty(&txn->changes));
 	Assert(txn->nentries == 0);
 	Assert(txn->nentries_mem == 0);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 63fc2c7ff2..d7f22ae960 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1293,7 +1293,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
  * LogicalDecodingContext 'update_progress' callback.
  *
  * Write the current position to the lag tracker (see XLogSendPhysical),
- * and update the spill statistics.
+ * and update the spill/stream statistics.
  */
 static void
 WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
@@ -1314,7 +1314,8 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
 	sendTime = now;
 
 	/*
-	 * Update statistics about transactions that spilled to disk.
+	 * Update statistics about transactions that spilled to disk or streamed to
+	 * subscriber (before being committed).
 	 */
 	UpdateSpillStats(ctx);
 }
@@ -2357,6 +2358,9 @@ InitWalSenderSlot(void)
 			walsnd->spillTxns = 0;
 			walsnd->spillCount = 0;
 			walsnd->spillBytes = 0;
+			walsnd->streamTxns = 0;
+			walsnd->streamCount = 0;
+			walsnd->streamBytes = 0;
 			SpinLockRelease(&walsnd->mutex);
 			/* don't need the lock anymore */
 			MyWalSnd = (WalSnd *) walsnd;
@@ -3196,7 +3200,7 @@ offset_to_interval(TimeOffset offset)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS	15
+#define PG_STAT_GET_WAL_SENDERS_COLS	18
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -3253,6 +3257,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		int64		spillTxns;
 		int64		spillCount;
 		int64		spillBytes;
+		int64		streamTxns;
+		int64		streamCount;
+		int64		streamBytes;
 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
 		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS];
 
@@ -3276,6 +3283,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		spillTxns = walsnd->spillTxns;
 		spillCount = walsnd->spillCount;
 		spillBytes = walsnd->spillBytes;
+		streamTxns = walsnd->streamTxns;
+		streamCount = walsnd->streamCount;
+		streamBytes = walsnd->streamBytes;
 		SpinLockRelease(&walsnd->mutex);
 
 		memset(nulls, 0, sizeof(nulls));
@@ -3362,6 +3372,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 			values[12] = Int64GetDatum(spillTxns);
 			values[13] = Int64GetDatum(spillCount);
 			values[14] = Int64GetDatum(spillBytes);
+
+			/* stream over-sized transactions */
+			values[15] = Int64GetDatum(streamTxns);
+			values[16] = Int64GetDatum(streamCount);
+			values[17] = Int64GetDatum(streamBytes);
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -3610,11 +3625,18 @@ UpdateSpillStats(LogicalDecodingContext *ctx)
 	MyWalSnd->spillCount = rb->spillCount;
 	MyWalSnd->spillBytes = rb->spillBytes;
 
-	elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld",
+	MyWalSnd->streamTxns = rb->streamTxns;
+	MyWalSnd->streamCount = rb->streamCount;
+	MyWalSnd->streamBytes = rb->streamBytes;
+
+	elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld %lld %lld %lld",
 		 rb,
 		 (long long) rb->spillTxns,
 		 (long long) rb->spillCount,
-		 (long long) rb->spillBytes);
+		 (long long) rb->spillBytes,
+		 (long long) rb->streamTxns,
+		 (long long) rb->streamCount,
+		 (long long) rb->streamBytes);
 
 	SpinLockRelease(&MyWalSnd->mutex);
 }
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 427faa3c3b..9ef4fbf4f9 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5173,9 +5173,9 @@
   proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz,int8,int8,int8}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time,spill_txns,spill_count,spill_bytes}',
+  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz,int8,int8,int8,int8,int8,int8}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes}',
   prosrc => 'pg_stat_get_wal_senders' },
 { oid => '3317', descr => 'statistics: information about WAL receiver',
   proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0510d3831f..7259c66c3f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -524,15 +524,20 @@ struct ReorderBuffer
 	Size		size;
 
 	/*
-	 * Statistics about transactions spilled to disk.
+	 * Statistics about transactions spilled to disk or streamed to subscriber.
 	 *
-	 * A single transaction may be spilled repeatedly, which is why we keep
-	 * two different counters. For spilling, the transaction counter includes
-	 * both toplevel transactions and subtransactions.
+	 * A single transaction may be streamed/spilled repeatedly, which is
+	 * why we keep two different counters. For spilling, the transaction
+	 * counter includes both toplevel transactions and subtransactions.
+	 * For streaming, it only includes toplevel transactions (we never
+	 * stream individual subtransactions).
 	 */
 	int64		spillCount;		/* spill-to-disk invocation counter */
 	int64		spillTxns;		/* number of transactions spilled to disk  */
 	int64		spillBytes;		/* amount of data spilled to disk */
+	int64		streamCount;	/* streaming invocation counter */
+	int64		streamTxns;		/* number of transactions streamed */
+	int64		streamBytes;	/* amount of data streamed to subscriber */
 };
 
 
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 366828f0a4..3888b0c2f8 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -85,6 +85,11 @@ typedef struct WalSnd
 	int64		spillTxns;
 	int64		spillCount;
 	int64		spillBytes;
+
+	/* Statistics for in-progress transactions streamed to subscriber. */
+	int64           streamTxns;
+	int64           streamCount;
+	int64           streamBytes;
 } WalSnd;
 
 extern WalSnd *MyWalSnd;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 62eaf90a0f..2dcb063912 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1960,9 +1960,12 @@ pg_stat_replication| SELECT s.pid,
     w.reply_time,
     w.spill_txns,
     w.spill_count,
-    w.spill_bytes
+    w.spill_bytes,
+    w.stream_txns,
+    w.stream_count,
+    w.stream_bytes
    FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc)
-     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time, spill_txns, spill_count, spill_bytes) ON ((s.pid = w.pid)))
+     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_ssl| SELECT s.pid,
     s.ssl,
-- 
2.20.1

