Track statistics for streaming of in-progress transactions
Commit 464824323e added support for streaming in-progress
transactions to the built-in logical replication. The attached patch
adds statistics about transactions streamed to the decoding output
plugin from ReorderBuffer. Users can query the
pg_stat_replication_slots view to check these stats and call
pg_stat_reset_replication_slot to reset the stats of a particular
slot. Passing NULL to pg_stat_reset_replication_slot resets the stats
of all slots.
Commit 9868167500 added the basic infrastructure to capture slot
stats, and this commit extends the statistics collector to track
additional information about slots.
This patch was originally written by Ajin Cherian [1]. I have fixed
bugs and modified some comments in the code.
Thoughts?
[1]: /messages/by-id/CAFPTHDZ8RnOovefzB+OMoRxLSD404WRLqWBUHe6bWqM5ew1bNA@mail.gmail.com
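As an illustration only (this is not part of the attached patch), the reporting flow the patch extends can be sketched in plain C. The decoding side accumulates local deltas, hands them to the collector, and zeroes them; the collector adds them to per-slot totals that a reset clears. The type and function names below (LocalDeltas, SlotTotals, flush_stream_stats, reset_slot_totals) are hypothetical stand-ins, and the sketch models only the three streaming counters, not the spill counters that the real early-return check also looks at.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for the per-slot totals kept by the stats collector. */
typedef struct SlotTotals
{
    int64_t stream_txns;
    int64_t stream_count;
    int64_t stream_bytes;
} SlotTotals;

/* Hypothetical stand-in for the deltas accumulated while decoding. */
typedef struct LocalDeltas
{
    int64_t streamTxns;
    int64_t streamCount;
    int64_t streamBytes;
} LocalDeltas;

/*
 * Add the local deltas to the slot totals and zero the deltas.
 * Returns 1 if something was reported, 0 if there was nothing to report.
 */
static int
flush_stream_stats(LocalDeltas *local, SlotTotals *totals)
{
    /* Nothing streamed since the last report: skip the update. */
    if (local->streamBytes <= 0)
        return 0;

    totals->stream_txns += local->streamTxns;
    totals->stream_count += local->streamCount;
    totals->stream_bytes += local->streamBytes;

    /* The deltas have been handed over, so start counting from zero again. */
    local->streamTxns = 0;
    local->streamCount = 0;
    local->streamBytes = 0;
    return 1;
}

/* Clear the totals of one slot, as a stats reset would. */
static void
reset_slot_totals(SlotTotals *totals)
{
    totals->stream_txns = 0;
    totals->stream_count = 0;
    totals->stream_bytes = 0;
}
```

Because the totals are the sum of whatever deltas have been flushed so far, the values visible at any moment depend on when the flushes happened.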
--
With Regards,
Amit Kapila
Attachments:
v2-0001-Track-statistics-for-streaming-of-changes-from-Re.patch (application/octet-stream)
From f385b89e938efaeab201abddf507fa47e5c43a1e Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Wed, 14 Oct 2020 08:28:35 +0530
Subject: [PATCH v2] Track statistics for streaming of changes from
ReorderBuffer.
This adds the statistics about transactions streamed to the decoding
output plugin from ReorderBuffer. Users can query the
pg_stat_replication_slots view to check these stats and call
pg_stat_reset_replication_slot to reset the stats of a particular slot.
Users can pass NULL in pg_stat_reset_replication_slot to reset stats of
all the slots.
Commit 9868167500 has added the basic infrastructure to capture the stats
of slot and this commit extends the statistics collector to track
additional information about slots.
Author: Ajin Cherian and Amit Kapila
---
doc/src/sgml/monitoring.sgml | 34 +++++++++++++++++++
src/backend/catalog/system_views.sql | 3 ++
src/backend/postmaster/pgstat.c | 11 +++++-
src/backend/replication/logical/logical.c | 19 +++++++----
.../replication/logical/reorderbuffer.c | 20 +++++++++++
src/backend/replication/slot.c | 2 +-
src/backend/utils/adt/pgstatfuncs.c | 9 +++--
src/include/catalog/pg_proc.dat | 6 ++--
src/include/pgstat.h | 8 ++++-
src/include/replication/reorderbuffer.h | 3 ++
src/test/regress/expected/rules.out | 5 ++-
11 files changed, 104 insertions(+), 16 deletions(-)
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 66566765f0..5b5222e3fa 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2629,6 +2629,40 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
</para></entry>
</row>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>stream_txns</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of in-progress transactions streamed to the decoding output plugin
+ after memory used by logical decoding exceeds
+ <literal>logical_decoding_work_mem</literal>. Streaming only works with
+ toplevel transactions (subtransactions can't be streamed independently),
+ so the counter does not get incremented for subtransactions.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>stream_count</structfield><type>bigint</type>
+ </para>
+ <para>
+ Number of times in-progress transactions were streamed to subscriber.
+ Transactions may get streamed repeatedly, and this counter gets incremented
+ on every such invocation.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>stream_bytes</structfield><type>bigint</type>
+ </para>
+ <para>
+ Amount of decoded in-progress transaction data streamed to subscriber.
+ </para>
+ </entry>
+ </row>
+
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index c29390760f..dd5584f1d0 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -802,6 +802,9 @@ CREATE VIEW pg_stat_replication_slots AS
s.spill_txns,
s.spill_count,
s.spill_bytes,
+ s.stream_txns,
+ s.stream_count,
+ s.stream_bytes,
s.stats_reset
FROM pg_stat_get_replication_slots() AS s;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 822f0ebc62..b3513f3943 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1708,7 +1708,7 @@ pgstat_report_tempfile(size_t filesize)
*/
void
pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
- int spillbytes)
+ int spillbytes, int streamtxns, int streamcount, int streambytes)
{
PgStat_MsgReplSlot msg;
@@ -1721,6 +1721,9 @@ pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
msg.m_spill_txns = spilltxns;
msg.m_spill_count = spillcount;
msg.m_spill_bytes = spillbytes;
+ msg.m_stream_txns = streamtxns;
+ msg.m_stream_count = streamcount;
+ msg.m_stream_bytes = streambytes;
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
}
@@ -6892,6 +6895,9 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
replSlotStats[idx].spill_txns += msg->m_spill_txns;
replSlotStats[idx].spill_count += msg->m_spill_count;
replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
+ replSlotStats[idx].stream_txns += msg->m_stream_txns;
+ replSlotStats[idx].stream_count += msg->m_stream_count;
+ replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
}
}
@@ -7125,6 +7131,9 @@ pgstat_reset_replslot(int i, TimestampTz ts)
replSlotStats[i].spill_txns = 0;
replSlotStats[i].spill_count = 0;
replSlotStats[i].spill_bytes = 0;
+ replSlotStats[i].stream_txns = 0;
+ replSlotStats[i].stream_count = 0;
+ replSlotStats[i].stream_bytes = 0;
replSlotStats[i].stat_reset_timestamp = ts;
}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 8675832f4d..d5cfbeaa4a 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1471,21 +1471,28 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
ReorderBuffer *rb = ctx->reorder;
/*
- * Nothing to do if we haven't spilled anything since the last time the
- * stats has been sent.
+ * Nothing to do if we haven't spilled or streamed anything since the last
+ * time the stats has been sent.
*/
- if (rb->spillBytes <= 0)
+ if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
return;
- elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld",
+ elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
rb,
(long long) rb->spillTxns,
(long long) rb->spillCount,
- (long long) rb->spillBytes);
+ (long long) rb->spillBytes,
+ (long long) rb->streamTxns,
+ (long long) rb->streamCount,
+ (long long) rb->streamBytes);
pgstat_report_replslot(NameStr(ctx->slot->data.name),
- rb->spillTxns, rb->spillCount, rb->spillBytes);
+ rb->spillTxns, rb->spillCount, rb->spillBytes,
+ rb->streamTxns, rb->streamCount, rb->streamBytes);
rb->spillTxns = 0;
rb->spillCount = 0;
rb->spillBytes = 0;
+ rb->streamTxns = 0;
+ rb->streamCount = 0;
+ rb->streamBytes = 0;
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 4cb27f2224..8585d1d6c7 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -346,6 +346,9 @@ ReorderBufferAllocate(void)
buffer->spillTxns = 0;
buffer->spillCount = 0;
buffer->spillBytes = 0;
+ buffer->streamTxns = 0;
+ buffer->streamCount = 0;
+ buffer->streamBytes = 0;
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
@@ -3440,6 +3443,8 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
Snapshot snapshot_now;
CommandId command_id;
+ Size stream_bytes;
+ bool txn_is_streamed;
/* We can never reach here for a subtransaction. */
Assert(txn->toptxn == NULL);
@@ -3520,10 +3525,25 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
txn->snapshot_now = NULL;
}
+ /*
+ * Remember this information to be used later to update stats. We can't
+ * update the stats here as an error while processing the changes would
+ * lead to the accumulation of stats even though we haven't streamed all
+ * the changes.
+ */
+ txn_is_streamed = rbtxn_is_streamed(txn);
+ stream_bytes = txn->total_size;
+
/* Process and send the changes to output plugin. */
ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
command_id, true);
+ rb->streamCount += 1;
+ rb->streamBytes += stream_bytes;
+
+ /* Don't consider already streamed transaction. */
+ rb->streamTxns += (txn_is_streamed) ? 0 : 1;
+
Assert(dlist_is_empty(&txn->changes));
Assert(txn->nentries == 0);
Assert(txn->nentries_mem == 0);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 220b4cd6e9..09be1d8c48 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -320,7 +320,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
* ReplicationSlotAllocationLock.
*/
if (SlotIsLogical(slot))
- pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0);
+ pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0);
/*
* Now that the slot has been marked as in_use and active, it's safe to
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 0d0d2e6d2b..ae87bc1953 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2153,7 +2153,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
Datum
pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_REPLICATION_SLOT_CLOS 5
+#define PG_STAT_GET_REPLICATION_SLOT_CLOS 8
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -2201,11 +2201,14 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
values[1] = Int64GetDatum(s->spill_txns);
values[2] = Int64GetDatum(s->spill_count);
values[3] = Int64GetDatum(s->spill_bytes);
+ values[4] = Int64GetDatum(s->stream_txns);
+ values[5] = Int64GetDatum(s->stream_count);
+ values[6] = Int64GetDatum(s->stream_bytes);
if (s->stat_reset_timestamp == 0)
- nulls[4] = true;
+ nulls[7] = true;
else
- values[4] = TimestampTzGetDatum(s->stat_reset_timestamp);
+ values[7] = TimestampTzGetDatum(s->stat_reset_timestamp);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 22340baf1c..1b64aa831e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5261,9 +5261,9 @@
proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => '',
- proallargtypes => '{text,int8,int8,int8,timestamptz}',
- proargmodes => '{o,o,o,o,o}',
- proargnames => '{name,spill_txns,spill_count,spill_bytes,stats_reset}',
+ proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{o,o,o,o,o,o,o,o}',
+ proargnames => '{name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
prosrc => 'pg_stat_get_replication_slots' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index a821ff4f15..960533beb2 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -492,6 +492,9 @@ typedef struct PgStat_MsgReplSlot
PgStat_Counter m_spill_txns;
PgStat_Counter m_spill_count;
PgStat_Counter m_spill_bytes;
+ PgStat_Counter m_stream_txns;
+ PgStat_Counter m_stream_count;
+ PgStat_Counter m_stream_bytes;
} PgStat_MsgReplSlot;
@@ -823,6 +826,9 @@ typedef struct PgStat_ReplSlotStats
PgStat_Counter spill_txns;
PgStat_Counter spill_count;
PgStat_Counter spill_bytes;
+ PgStat_Counter stream_txns;
+ PgStat_Counter stream_count;
+ PgStat_Counter stream_bytes;
TimestampTz stat_reset_timestamp;
} PgStat_ReplSlotStats;
@@ -1387,7 +1393,7 @@ extern void pgstat_report_deadlock(void);
extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
extern void pgstat_report_checksum_failure(void);
extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
- int spillbytes);
+ int spillbytes, int streamtxns, int streamcount, int streambytes);
extern void pgstat_report_replslot_drop(const char *slotname);
extern void pgstat_initialize(void);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0cc3aebb11..84d67db32e 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -543,6 +543,9 @@ struct ReorderBuffer
int64 spillTxns; /* number of transactions spilled to disk */
int64 spillCount; /* spill-to-disk invocation counter */
int64 spillBytes; /* amount of data spilled to disk */
+ int64 streamTxns; /* number of transactions streamed to the decoding output plugin */
+ int64 streamCount; /* streaming invocation counter */
+ int64 streamBytes; /* amount of data streamed to subscriber */
};
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index cf2a9b4408..576166dac5 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2022,8 +2022,11 @@ pg_stat_replication_slots| SELECT s.name,
s.spill_txns,
s.spill_count,
s.spill_bytes,
+ s.stream_txns,
+ s.stream_count,
+ s.stream_bytes,
s.stats_reset
- FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes, stats_reset);
+ FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset);
pg_stat_slru| SELECT s.name,
s.blks_zeroed,
s.blks_hit,
--
2.28.0.windows.1
On Wed, Oct 14, 2020 at 2:39 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
Commit 464824323e has added the support of the streaming of
in-progress transactions into the built-in logical replication. The
attached patch adds the statistics about transactions streamed to the
decoding output plugin from ReorderBuffer. Users can query the
pg_stat_replication_slots view to check these stats and call
pg_stat_reset_replication_slot to reset the stats of a particular
slot. Users can pass NULL in pg_stat_reset_replication_slot to reset
stats of all the slots.
Commit 9868167500 has added the basic infrastructure to capture the
stats of slot and this commit extends the statistics collector to
track additional information about slots.
This patch was originally written by Ajin Cherian [1]. I have fixed
bugs and modified some comments in the code.
Thoughts?
[1] - /messages/by-id/CAFPTHDZ8RnOovefzB+OMoRxLSD404WRLqWBUHe6bWqM5ew1bNA@mail.gmail.com
I've applied the patch. It applies cleanly. I've reviewed the patch
and have no comments to report.
I have also run some tests to get streaming stats as well as reset the
stats counter; everything seems to be working as expected.
I am fine with the changes.
regards,
Ajin Cherian
Fujitsu Australia
On Mon, Oct 19, 2020 at 1:52 PM Ajin Cherian <itsajin@gmail.com> wrote:
I've applied the patch. It applies cleanly. I've reviewed the patch
and have no comments to report.
I have also run some tests to get streaming stats as well as reset the
stats counter, everything seems to be working as expected.
I am fine with the changes.
Thanks. One thing I considered while updating this patch was writing a
test case similar to the one we have for spilled stats in
test_decoding/sql/stats.sql. I decided not to, as it doesn't seem to
add much value for the streaming case: we already have some tests in
test_decoding/sql/stream.sql which indicate that streaming is
happening. It would have been better if we had a way to get the exact
streaming stats, but while writing tests for spilled stats we found
that this is not possible because some background transactions (like
autovacuum) might send the stats earlier, making the actual numbers
inconsistent. What do you think?
Sawada-San, do you have any thoughts on this matter?
--
With Regards,
Amit Kapila.
On Tue, 20 Oct 2020 at 14:29, Amit Kapila <amit.kapila16@gmail.com> wrote:
On Mon, Oct 19, 2020 at 1:52 PM Ajin Cherian <itsajin@gmail.com> wrote:
I've applied the patch. It applies cleanly. I've reviewed the patch
and have no comments to report.
I have also run some tests to get streaming stats as well as reset the
stats counter, everything seems to be working as expected.
I am fine with the changes.
Thanks. One thing I have considered while updating this patch was to
write a test case similar to what we have for spilled stats in
test_decoding/sql/stats.sql but I decided not to do it as that doesn't
seem to add much value for the streaming case because we already have
some tests in test_decoding/sql/stream.sql which indicates that the
streaming is happening. If we could have a way to get the exact
streaming stats then it would have been better but while writing tests
for spilled stats we found that it is not possible because some
background transactions (like autovacuum) might send the stats earlier
making the actual number inconsistent. What do you think?
Sawada-San, do you have any thoughts on this matter?
I basically agree with that. Reading the patch, I have a question that
might be relevant to this matter:
The patch has the following code:
+ /*
+ * Remember this information to be used later to update stats. We can't
+ * update the stats here as an error while processing the changes would
+ * lead to the accumulation of stats even though we haven't streamed all
+ * the changes.
+ */
+ txn_is_streamed = rbtxn_is_streamed(txn);
+ stream_bytes = txn->total_size;
The comment seems to cover only the case where an error happens while
processing the changes, but I wonder if the same is true for an
aborted transaction. That is, if we catch an error due to a concurrent
transaction abort while processing the changes, we stop streaming the
changes. But the patch accumulates the stats even in this case. If we
don't want to accumulate the stats of the aborted transaction and it's
easily reproducible, it might be better to add a test checking that we
don't accumulate in that case.
Regards,
--
Masahiko Sawada http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Wed, Oct 21, 2020 at 8:15 AM Masahiko Sawada
<masahiko.sawada@2ndquadrant.com> wrote:
On Tue, 20 Oct 2020 at 14:29, Amit Kapila <amit.kapila16@gmail.com> wrote:
Thanks. One thing I have considered while updating this patch was to
write a test case similar to what we have for spilled stats in
test_decoding/sql/stats.sql but I decided not to do it as that doesn't
seem to add much value for the streaming case because we already have
some tests in test_decoding/sql/stream.sql which indicates that the
streaming is happening. If we could have a way to get the exact
streaming stats then it would have been better but while writing tests
for spilled stats we found that it is not possible because some
background transactions (like autovacuum) might send the stats earlier
making the actual number inconsistent. What do you think?
Sawada-San, do you have any thoughts on this matter?
I basically agree with that. Reading the patch, I have a question that
might be relevant to this matter:
The patch has the following code:
+ /*
+ * Remember this information to be used later to update stats. We can't
+ * update the stats here as an error while processing the changes would
+ * lead to the accumulation of stats even though we haven't streamed all
+ * the changes.
+ */
+ txn_is_streamed = rbtxn_is_streamed(txn);
+ stream_bytes = txn->total_size;
The comment seems to mention only the case where an error happened while
processing the changes but I wonder if the same is true for the
aborted transaction. That is, if we catch an error due to concurrent
transaction abort while processing the changes, we stop to stream the
changes. But the patch accumulates the stats even in this case.
It would only add the stats for the current stream, and I don't think
that is wrong, because we would have sent some data (at least the
start message), for which we send the stream_stop message later while
decoding the Abort message. I had thought that, to avoid this, we could
update the stats at the end of ReorderBufferProcessTXN, when we know
streaming is complete. But that would again miss the counter update
for the data we have sent before an error occurred, and updating the
streaming counters in ReorderBufferStreamTXN also seems more logical
to me.
If we
don’t want to accumulate the stats of the abort transaction and it’s
easily reproducible, it might be better to add a test checking if we
don’t accumulate in that case.
But as explained above, I think we count it as we would have sent at
least one message (could be more) before we encounter this error.
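To make the ordering concrete, here is a minimal sketch in plain C of the pattern used in ReorderBufferStreamTXN. It is an illustration, not the patch itself. The names SketchTxn, SketchCounters, sketch_process and sketch_stream_txn are hypothetical, and a false return from sketch_process stands in for an ERROR that never returns to the caller. A concurrent abort that is caught inside processing would correspond to a normal (true) return here, so the counters are still updated in that case. The sketch also assumes that processing clears the transaction's size and marks it as streamed, which is why both values are captured beforehand.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, reduced view of a top-level transaction. */
typedef struct SketchTxn
{
    bool    already_streamed;   /* stands in for rbtxn_is_streamed(txn) */
    int64_t total_size;         /* stands in for txn->total_size */
} SketchTxn;

/* Hypothetical, reduced view of the ReorderBuffer streaming counters. */
typedef struct SketchCounters
{
    int64_t streamTxns;
    int64_t streamCount;
    int64_t streamBytes;
} SketchCounters;

/*
 * Stand-in for processing and sending the changes. Returning false models
 * a hard error; on success the changes are consumed and the transaction
 * is marked as streamed (an assumption of this sketch).
 */
static bool
sketch_process(SketchTxn *txn, bool fail)
{
    if (fail)
        return false;
    txn->already_streamed = true;
    txn->total_size = 0;
    return true;
}

/* Stream one chunk of a transaction and update the counters afterwards. */
static bool
sketch_stream_txn(SketchCounters *rb, SketchTxn *txn, bool fail)
{
    /* Capture these first: processing changes both of them. */
    bool    was_streamed = txn->already_streamed;
    int64_t stream_bytes = txn->total_size;

    if (!sketch_process(txn, fail))
        return false;           /* hard error: counters stay untouched */

    rb->streamCount += 1;
    rb->streamBytes += stream_bytes;

    /* Count each transaction only once, however often it is streamed. */
    rb->streamTxns += was_streamed ? 0 : 1;
    return true;
}
```

Streaming the same transaction twice therefore raises streamCount twice but streamTxns only once, which matches the documented meaning of stream_count and stream_txns.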
--
With Regards,
Amit Kapila.
On Tue, Oct 20, 2020 at 4:29 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
Thanks. One thing I have considered while updating this patch was to
write a test case similar to what we have for spilled stats in
test_decoding/sql/stats.sql but I decided not to do it as that doesn't
seem to add much value for the streaming case because we already have
some tests in test_decoding/sql/stream.sql which indicates that the
streaming is happening. If we could have a way to get the exact
streaming stats then it would have been better but while writing tests
for spilled stats we found that it is not possible because some
background transactions (like autovacuum) might send the stats earlier
making the actual number inconsistent. What do you think?
I agree. If the stat numbers can't be guaranteed to be consistent it's
not worth writing specific tests for this.
regards,
Ajin Cherian
Fujitsu Australia
On Wed, Oct 14, 2020 at 9:09 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
Commit 464824323e has added the support of the streaming of
in-progress transactions into the built-in logical replication. The
attached patch adds the statistics about transactions streamed to the
decoding output plugin from ReorderBuffer.
I have reviewed the attached patch and have one comment:
+ int64 streamTxns; /* number of transactions streamed to the decoding output plugin */
+ int64 streamCount; /* streaming invocation counter */
+ int64 streamBytes; /* amount of data streamed to subscriber */
I think instead of saying "amount of data streamed to subscriber" it
should be "amount of data streamed to the decoding output plugin".
--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
On Thu, Oct 22, 2020 at 11:52 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
On Wed, Oct 14, 2020 at 9:09 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
Commit 464824323e has added the support of the streaming of
in-progress transactions into the built-in logical replication. The
attached patch adds the statistics about transactions streamed to the
decoding output plugin from ReorderBuffer.
I have reviewed the attached patch, I have one comment
+ int64 streamTxns; /* number of transactions streamed to the decoding output plugin */
+ int64 streamCount; /* streaming invocation counter */
+ int64 streamBytes; /* amount of data streamed to subscriber */
I think instead of saying "amount of data streamed to subscriber" it
should be "amount of data streamed to the decoding output plugin"
Thanks, I think a similar change is required in docs as well. One more
thing I was considering is whether to change the docs to explain
stream_count and stream_txns somewhat more clearly, based on what I
have posted for spill_count and spill_txns in the other thread [1].
Do you think that patch is an improvement over what we have now? If
yes, we can adopt similar changes here as well; otherwise, we can
leave it as it is.
[1]: /messages/by-id/CAA4eK1LdPQucvp9St2D6NhO9aQ2KKr3U0yAbKDox2UC86Q+_zg@mail.gmail.com
--
With Regards,
Amit Kapila.
On Thu, Oct 22, 2020 at 2:09 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Thu, Oct 22, 2020 at 11:52 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
On Wed, Oct 14, 2020 at 9:09 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
Commit 464824323e has added the support of the streaming of
in-progress transactions into the built-in logical replication. The
attached patch adds the statistics about transactions streamed to the
decoding output plugin from ReorderBuffer.
I have reviewed the attached patch, I have one comment
+ int64 streamTxns; /* number of transactions streamed to the decoding output plugin */
+ int64 streamCount; /* streaming invocation counter */
+ int64 streamBytes; /* amount of data streamed to subscriber */
I think instead of saying "amount of data streamed to subscriber" it
should be "amount of data streamed to the decoding output plugin"
Thanks, I think a similar change is required in docs as well.
I have fixed the above comment and rebased the patch. I have changed
the docs a bit to add more explanation about the counters. Let me know
if you have any more comments. Thanks Dilip and Sawada-San for
reviewing this patch.
--
With Regards,
Amit Kapila.
Attachments:
v3-0001-Track-statistics-for-streaming-of-changes-from-Re.patch (application/octet-stream)
From 12fd4d12c30c08df3d104d3204c3f3b5dd7f35fd Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Fri, 23 Oct 2020 10:00:14 +0530
Subject: [PATCH v3] Track statistics for streaming of changes from
ReorderBuffer.
This adds the statistics about transactions streamed to the decoding
output plugin from ReorderBuffer. Users can query the
pg_stat_replication_slots view to check these stats and call
pg_stat_reset_replication_slot to reset the stats of a particular slot.
Users can pass NULL in pg_stat_reset_replication_slot to reset stats of
all the slots.
Commit 9868167500 has added the basic infrastructure to capture the stats
of slot and this commit extends the statistics collector to track
additional information about slots.
Author: Ajin Cherian and Amit Kapila
Reviewed-by: Sawada Masahiko and Dilip Kumar
Discussion: https://postgr.es/m/CAA4eK1+chpEomLzgSoky-D31qev19AmECNiEAietPQUGEFhtVA@mail.gmail.com
---
doc/src/sgml/monitoring.sgml | 39 +++++++++++++++++++
src/backend/catalog/system_views.sql | 3 ++
src/backend/postmaster/pgstat.c | 11 +++++-
src/backend/replication/logical/logical.c | 19 ++++++---
.../replication/logical/reorderbuffer.c | 20 ++++++++++
src/backend/replication/slot.c | 2 +-
src/backend/utils/adt/pgstatfuncs.c | 9 +++--
src/include/catalog/pg_proc.dat | 6 +--
src/include/pgstat.h | 8 +++-
src/include/replication/reorderbuffer.h | 5 +++
src/test/regress/expected/rules.out | 5 ++-
11 files changed, 111 insertions(+), 16 deletions(-)
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index f5cf163c8c..90a20b1e31 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2629,6 +2629,45 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
</para></entry>
</row>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>stream_txns</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of in-progress transactions streamed to the decoding output plugin
+ after the memory used by logical decoding of changes from WAL for this
+ slot exceeds <literal>logical_decoding_work_mem</literal>. Streaming only
+ works with toplevel transactions (subtransactions can't be streamed
+ independently), so the counter does not get incremented for subtransactions.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>stream_count</structfield><type>bigint</type>
+ </para>
+ <para>
+ Number of times in-progress transactions were streamed to the decoding
+ output plugin while performing decoding of changes from WAL for this
+ slot. Transactions may get streamed repeatedly, and this counter gets
+ incremented on every such invocation.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>stream_bytes</structfield><type>bigint</type>
+ </para>
+ <para>
+ Amount of decoded in-progress transaction data streamed to the decoding
+ output plugin while performing decoding of changes from WAL for this
+ slot. This and other streaming counters for this slot can be used to
+ gauge the network I/O occurred during logical decoding and can be used
+ to tune <literal>logical_decoding_work_mem</literal>.
+ </para>
+ </entry>
+ </row>
+
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 85cd147e21..2e4aa1c4b6 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -802,6 +802,9 @@ CREATE VIEW pg_stat_replication_slots AS
s.spill_txns,
s.spill_count,
s.spill_bytes,
+ s.stream_txns,
+ s.stream_count,
+ s.stream_bytes,
s.stats_reset
FROM pg_stat_get_replication_slots() AS s;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 822f0ebc62..b3513f3943 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1708,7 +1708,7 @@ pgstat_report_tempfile(size_t filesize)
*/
void
pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
- int spillbytes)
+ int spillbytes, int streamtxns, int streamcount, int streambytes)
{
PgStat_MsgReplSlot msg;
@@ -1721,6 +1721,9 @@ pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
msg.m_spill_txns = spilltxns;
msg.m_spill_count = spillcount;
msg.m_spill_bytes = spillbytes;
+ msg.m_stream_txns = streamtxns;
+ msg.m_stream_count = streamcount;
+ msg.m_stream_bytes = streambytes;
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
}
@@ -6892,6 +6895,9 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
replSlotStats[idx].spill_txns += msg->m_spill_txns;
replSlotStats[idx].spill_count += msg->m_spill_count;
replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
+ replSlotStats[idx].stream_txns += msg->m_stream_txns;
+ replSlotStats[idx].stream_count += msg->m_stream_count;
+ replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
}
}
@@ -7125,6 +7131,9 @@ pgstat_reset_replslot(int i, TimestampTz ts)
replSlotStats[i].spill_txns = 0;
replSlotStats[i].spill_count = 0;
replSlotStats[i].spill_bytes = 0;
+ replSlotStats[i].stream_txns = 0;
+ replSlotStats[i].stream_count = 0;
+ replSlotStats[i].stream_bytes = 0;
replSlotStats[i].stat_reset_timestamp = ts;
}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 8675832f4d..d5cfbeaa4a 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1471,21 +1471,28 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
ReorderBuffer *rb = ctx->reorder;
/*
- * Nothing to do if we haven't spilled anything since the last time the
- * stats has been sent.
+ * Nothing to do if we haven't spilled or streamed anything since the last
+ * time the stats have been sent.
*/
- if (rb->spillBytes <= 0)
+ if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
return;
- elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld",
+ elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
rb,
(long long) rb->spillTxns,
(long long) rb->spillCount,
- (long long) rb->spillBytes);
+ (long long) rb->spillBytes,
+ (long long) rb->streamTxns,
+ (long long) rb->streamCount,
+ (long long) rb->streamBytes);
pgstat_report_replslot(NameStr(ctx->slot->data.name),
- rb->spillTxns, rb->spillCount, rb->spillBytes);
+ rb->spillTxns, rb->spillCount, rb->spillBytes,
+ rb->streamTxns, rb->streamCount, rb->streamBytes);
rb->spillTxns = 0;
rb->spillCount = 0;
rb->spillBytes = 0;
+ rb->streamTxns = 0;
+ rb->streamCount = 0;
+ rb->streamBytes = 0;
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 7a8bf76079..c1bd68011c 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -346,6 +346,9 @@ ReorderBufferAllocate(void)
buffer->spillTxns = 0;
buffer->spillCount = 0;
buffer->spillBytes = 0;
+ buffer->streamTxns = 0;
+ buffer->streamCount = 0;
+ buffer->streamBytes = 0;
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
@@ -3482,6 +3485,8 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
Snapshot snapshot_now;
CommandId command_id;
+ Size stream_bytes;
+ bool txn_is_streamed;
/* We can never reach here for a subtransaction. */
Assert(txn->toptxn == NULL);
@@ -3562,10 +3567,25 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
txn->snapshot_now = NULL;
}
+ /*
+ * Remember this information to be used later to update stats. We can't
+ * update the stats here as an error while processing the changes would
+ * lead to the accumulation of stats even though we haven't streamed all
+ * the changes.
+ */
+ txn_is_streamed = rbtxn_is_streamed(txn);
+ stream_bytes = txn->total_size;
+
/* Process and send the changes to output plugin. */
ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
command_id, true);
+ rb->streamCount += 1;
+ rb->streamBytes += stream_bytes;
+
+ /* Don't count a transaction that has already been streamed. */
+ rb->streamTxns += (txn_is_streamed) ? 0 : 1;
+
Assert(dlist_is_empty(&txn->changes));
Assert(txn->nentries == 0);
Assert(txn->nentries_mem == 0);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 220b4cd6e9..09be1d8c48 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -320,7 +320,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
* ReplicationSlotAllocationLock.
*/
if (SlotIsLogical(slot))
- pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0);
+ pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0);
/*
* Now that the slot has been marked as in_use and active, it's safe to
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 472fa596e1..a210fc93b4 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2153,7 +2153,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
Datum
pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 5
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 8
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -2201,11 +2201,14 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
values[1] = Int64GetDatum(s->spill_txns);
values[2] = Int64GetDatum(s->spill_count);
values[3] = Int64GetDatum(s->spill_bytes);
+ values[4] = Int64GetDatum(s->stream_txns);
+ values[5] = Int64GetDatum(s->stream_count);
+ values[6] = Int64GetDatum(s->stream_bytes);
if (s->stat_reset_timestamp == 0)
- nulls[4] = true;
+ nulls[7] = true;
else
- values[4] = TimestampTzGetDatum(s->stat_reset_timestamp);
+ values[7] = TimestampTzGetDatum(s->stat_reset_timestamp);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index bbcac69d48..70fb76a7c9 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5261,9 +5261,9 @@
proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => '',
- proallargtypes => '{text,int8,int8,int8,timestamptz}',
- proargmodes => '{o,o,o,o,o}',
- proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stats_reset}',
+ proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{o,o,o,o,o,o,o,o}',
+ proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
prosrc => 'pg_stat_get_replication_slots' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index a821ff4f15..960533beb2 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -492,6 +492,9 @@ typedef struct PgStat_MsgReplSlot
PgStat_Counter m_spill_txns;
PgStat_Counter m_spill_count;
PgStat_Counter m_spill_bytes;
+ PgStat_Counter m_stream_txns;
+ PgStat_Counter m_stream_count;
+ PgStat_Counter m_stream_bytes;
} PgStat_MsgReplSlot;
@@ -823,6 +826,9 @@ typedef struct PgStat_ReplSlotStats
PgStat_Counter spill_txns;
PgStat_Counter spill_count;
PgStat_Counter spill_bytes;
+ PgStat_Counter stream_txns;
+ PgStat_Counter stream_count;
+ PgStat_Counter stream_bytes;
TimestampTz stat_reset_timestamp;
} PgStat_ReplSlotStats;
@@ -1387,7 +1393,7 @@ extern void pgstat_report_deadlock(void);
extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
extern void pgstat_report_checksum_failure(void);
extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
- int spillbytes);
+ int spillbytes, int streamtxns, int streamcount, int streambytes);
extern void pgstat_report_replslot_drop(const char *slotname);
extern void pgstat_initialize(void);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 1c77819aad..dfdda938b2 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -551,6 +551,11 @@ struct ReorderBuffer
int64 spillTxns; /* number of transactions spilled to disk */
int64 spillCount; /* spill-to-disk invocation counter */
int64 spillBytes; /* amount of data spilled to disk */
+
+ /* Statistics about transactions streamed to the decoding output plugin */
+ int64 streamTxns; /* number of transactions streamed */
+ int64 streamCount; /* streaming invocation counter */
+ int64 streamBytes; /* amount of data streamed */
};
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 492cdcf74c..097ff5d111 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2022,8 +2022,11 @@ pg_stat_replication_slots| SELECT s.slot_name,
s.spill_txns,
s.spill_count,
s.spill_bytes,
+ s.stream_txns,
+ s.stream_count,
+ s.stream_bytes,
s.stats_reset
- FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stats_reset);
+ FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset);
pg_stat_slru| SELECT s.name,
s.blks_zeroed,
s.blks_hit,
--
2.28.0.windows.1
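The flush-and-reset pattern that the patch extends in UpdateDecodingStats can be sketched in isolation. This is a minimal illustration, not PostgreSQL code: `DemoPending`, `DemoTotals`, and `demo_flush_stats` are hypothetical names, and the cumulative struct stands in for what the statistics collector would accumulate after receiving a message. It assumes, as in the patch, that a flush is skipped when neither spill nor stream bytes are pending.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for the pending counters kept in ReorderBuffer. */
typedef struct DemoPending
{
    int64_t spillTxns;
    int64_t spillCount;
    int64_t spillBytes;
    int64_t streamTxns;
    int64_t streamCount;
    int64_t streamBytes;
} DemoPending;

/* Hypothetical stand-in for the cumulative per-slot stats (same layout). */
typedef DemoPending DemoTotals;

/*
 * Add the pending counters to the totals and zero the pending counters.
 * Returns false, changing nothing, when nothing was spilled or streamed
 * since the last flush.
 */
static bool
demo_flush_stats(DemoPending *pending, DemoTotals *totals)
{
    assert(pending != NULL && totals != NULL);

    /* Mirrors the patched early-exit test on both byte counters. */
    if (pending->spillBytes <= 0 && pending->streamBytes <= 0)
        return false;

    totals->spillTxns += pending->spillTxns;
    totals->spillCount += pending->spillCount;
    totals->spillBytes += pending->spillBytes;
    totals->streamTxns += pending->streamTxns;
    totals->streamCount += pending->streamCount;
    totals->streamBytes += pending->streamBytes;

    /* Reset so the next flush reports only new activity. */
    memset(pending, 0, sizeof(*pending));
    return true;
}
```

Because the pending counters are zeroed after each flush, the receiving side can simply add each report to its running totals, which is what the `+=` lines in pgstat_recv_replslot do.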
On Fri, Oct 23, 2020 at 10:24 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Thu, Oct 22, 2020 at 2:09 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
I have fixed the above comment and rebased the patch. I have changed
the docs a bit to add more explanation about the counters. Let me know
if you have any more comments. Thanks Dilip and Sawada-San for
reviewing this patch.
Attached is an updated patch with minor changes in docs and cosmetic
changes. I am planning to push this patch tomorrow unless there are
any more comments/suggestions.
--
With Regards,
Amit Kapila.
Attachments:
v4-0001-Track-statistics-for-streaming-of-changes-from-Re.patch (application/octet-stream)
From 555af50f5aa2633564faf6ab210f34f3b034d284 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Wed, 28 Oct 2020 08:36:51 +0530
Subject: [PATCH v4] Track statistics for streaming of changes from
ReorderBuffer.
This adds statistics about transactions streamed to the decoding
output plugin from ReorderBuffer. Users can query the
pg_stat_replication_slots view to check these stats and call
pg_stat_reset_replication_slot to reset the stats of a particular slot.
Users can pass NULL to pg_stat_reset_replication_slot to reset the stats
of all slots.
Commit 9868167500 added the basic infrastructure to capture slot stats;
this commit extends the statistics collector to track additional
information about slots.
Author: Ajin Cherian and Amit Kapila
Reviewed-by: Sawada Masahiko and Dilip Kumar
Discussion: https://postgr.es/m/CAA4eK1+chpEomLzgSoky-D31qev19AmECNiEAietPQUGEFhtVA@mail.gmail.com
---
doc/src/sgml/monitoring.sgml | 38 +++++++++++++++++++++++++
src/backend/catalog/system_views.sql | 3 ++
src/backend/postmaster/pgstat.c | 11 ++++++-
src/backend/replication/logical/logical.c | 19 +++++++++----
src/backend/replication/logical/reorderbuffer.c | 20 +++++++++++++
src/backend/replication/slot.c | 2 +-
src/backend/utils/adt/pgstatfuncs.c | 9 ++++--
src/include/catalog/pg_proc.dat | 6 ++--
src/include/pgstat.h | 8 +++++-
src/include/replication/reorderbuffer.h | 5 ++++
src/test/regress/expected/rules.out | 5 +++-
11 files changed, 110 insertions(+), 16 deletions(-)
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 313e44e..98e1995 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2634,6 +2634,44 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
<row>
<entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>stream_txns</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of in-progress transactions streamed to the decoding output plugin
+ after the memory used by logical decoding of changes from WAL for this
+ slot exceeds <literal>logical_decoding_work_mem</literal>. Streaming only
+ works with top-level transactions (subtransactions can't be streamed
+ independently), so the counter does not get incremented for subtransactions.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>stream_count</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of times in-progress transactions were streamed to the decoding
+ output plugin while decoding changes from WAL for this slot. Transactions
+ may get streamed repeatedly, and this counter gets incremented on every
+ such invocation.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>stream_bytes</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Amount of decoded in-progress transaction data streamed to the decoding
+ output plugin while decoding changes from WAL for this slot. This and other
+ streaming counters for this slot can be used to gauge the network I/O that
+ occurred during logical decoding and to tune <literal>logical_decoding_work_mem</literal>.
+ </para>
+ </entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>
</para>
<para>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 85cd147..2e4aa1c 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -802,6 +802,9 @@ CREATE VIEW pg_stat_replication_slots AS
s.spill_txns,
s.spill_count,
s.spill_bytes,
+ s.stream_txns,
+ s.stream_count,
+ s.stream_bytes,
s.stats_reset
FROM pg_stat_get_replication_slots() AS s;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 822f0eb..f1dca2f 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1708,7 +1708,7 @@ pgstat_report_tempfile(size_t filesize)
*/
void
pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
- int spillbytes)
+ int spillbytes, int streamtxns, int streamcount, int streambytes)
{
PgStat_MsgReplSlot msg;
@@ -1721,6 +1721,9 @@ pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
msg.m_spill_txns = spilltxns;
msg.m_spill_count = spillcount;
msg.m_spill_bytes = spillbytes;
+ msg.m_stream_txns = streamtxns;
+ msg.m_stream_count = streamcount;
+ msg.m_stream_bytes = streambytes;
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
}
@@ -6892,6 +6895,9 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
replSlotStats[idx].spill_txns += msg->m_spill_txns;
replSlotStats[idx].spill_count += msg->m_spill_count;
replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
+ replSlotStats[idx].stream_txns += msg->m_stream_txns;
+ replSlotStats[idx].stream_count += msg->m_stream_count;
+ replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
}
}
@@ -7125,6 +7131,9 @@ pgstat_reset_replslot(int i, TimestampTz ts)
replSlotStats[i].spill_txns = 0;
replSlotStats[i].spill_count = 0;
replSlotStats[i].spill_bytes = 0;
+ replSlotStats[i].stream_txns = 0;
+ replSlotStats[i].stream_count = 0;
+ replSlotStats[i].stream_bytes = 0;
replSlotStats[i].stat_reset_timestamp = ts;
}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 8675832..d5cfbea 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1471,21 +1471,28 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
ReorderBuffer *rb = ctx->reorder;
/*
- * Nothing to do if we haven't spilled anything since the last time the
- * stats has been sent.
+ * Nothing to do if we haven't spilled or streamed anything since the last
+ * time the stats have been sent.
*/
- if (rb->spillBytes <= 0)
+ if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
return;
- elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld",
+ elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
rb,
(long long) rb->spillTxns,
(long long) rb->spillCount,
- (long long) rb->spillBytes);
+ (long long) rb->spillBytes,
+ (long long) rb->streamTxns,
+ (long long) rb->streamCount,
+ (long long) rb->streamBytes);
pgstat_report_replslot(NameStr(ctx->slot->data.name),
- rb->spillTxns, rb->spillCount, rb->spillBytes);
+ rb->spillTxns, rb->spillCount, rb->spillBytes,
+ rb->streamTxns, rb->streamCount, rb->streamBytes);
rb->spillTxns = 0;
rb->spillCount = 0;
rb->spillBytes = 0;
+ rb->streamTxns = 0;
+ rb->streamCount = 0;
+ rb->streamBytes = 0;
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 7a8bf76..c1bd680 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -346,6 +346,9 @@ ReorderBufferAllocate(void)
buffer->spillTxns = 0;
buffer->spillCount = 0;
buffer->spillBytes = 0;
+ buffer->streamTxns = 0;
+ buffer->streamCount = 0;
+ buffer->streamBytes = 0;
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
@@ -3482,6 +3485,8 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
Snapshot snapshot_now;
CommandId command_id;
+ Size stream_bytes;
+ bool txn_is_streamed;
/* We can never reach here for a subtransaction. */
Assert(txn->toptxn == NULL);
@@ -3562,10 +3567,25 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
txn->snapshot_now = NULL;
}
+ /*
+ * Remember this information to be used later to update stats. We can't
+ * update the stats here as an error while processing the changes would
+ * lead to the accumulation of stats even though we haven't streamed all
+ * the changes.
+ */
+ txn_is_streamed = rbtxn_is_streamed(txn);
+ stream_bytes = txn->total_size;
+
/* Process and send the changes to output plugin. */
ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
command_id, true);
+ rb->streamCount += 1;
+ rb->streamBytes += stream_bytes;
+
+ /* Don't count a transaction that has already been streamed. */
+ rb->streamTxns += (txn_is_streamed) ? 0 : 1;
+
Assert(dlist_is_empty(&txn->changes));
Assert(txn->nentries == 0);
Assert(txn->nentries_mem == 0);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 220b4cd..09be1d8 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -320,7 +320,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
* ReplicationSlotAllocationLock.
*/
if (SlotIsLogical(slot))
- pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0);
+ pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0);
/*
* Now that the slot has been marked as in_use and active, it's safe to
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 472fa59..a210fc9 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2153,7 +2153,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
Datum
pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 5
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 8
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -2201,11 +2201,14 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
values[1] = Int64GetDatum(s->spill_txns);
values[2] = Int64GetDatum(s->spill_count);
values[3] = Int64GetDatum(s->spill_bytes);
+ values[4] = Int64GetDatum(s->stream_txns);
+ values[5] = Int64GetDatum(s->stream_count);
+ values[6] = Int64GetDatum(s->stream_bytes);
if (s->stat_reset_timestamp == 0)
- nulls[4] = true;
+ nulls[7] = true;
else
- values[4] = TimestampTzGetDatum(s->stat_reset_timestamp);
+ values[7] = TimestampTzGetDatum(s->stat_reset_timestamp);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index bbcac69..70fb76a 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5261,9 +5261,9 @@
proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => '',
- proallargtypes => '{text,int8,int8,int8,timestamptz}',
- proargmodes => '{o,o,o,o,o}',
- proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stats_reset}',
+ proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{o,o,o,o,o,o,o,o}',
+ proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
prosrc => 'pg_stat_get_replication_slots' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index a821ff4..257e515 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -492,6 +492,9 @@ typedef struct PgStat_MsgReplSlot
PgStat_Counter m_spill_txns;
PgStat_Counter m_spill_count;
PgStat_Counter m_spill_bytes;
+ PgStat_Counter m_stream_txns;
+ PgStat_Counter m_stream_count;
+ PgStat_Counter m_stream_bytes;
} PgStat_MsgReplSlot;
@@ -823,6 +826,9 @@ typedef struct PgStat_ReplSlotStats
PgStat_Counter spill_txns;
PgStat_Counter spill_count;
PgStat_Counter spill_bytes;
+ PgStat_Counter stream_txns;
+ PgStat_Counter stream_count;
+ PgStat_Counter stream_bytes;
TimestampTz stat_reset_timestamp;
} PgStat_ReplSlotStats;
@@ -1387,7 +1393,7 @@ extern void pgstat_report_deadlock(void);
extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
extern void pgstat_report_checksum_failure(void);
extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
- int spillbytes);
+ int spillbytes, int streamtxns, int streamcount, int streambytes);
extern void pgstat_report_replslot_drop(const char *slotname);
extern void pgstat_initialize(void);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 1c77819..dfdda93 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -551,6 +551,11 @@ struct ReorderBuffer
int64 spillTxns; /* number of transactions spilled to disk */
int64 spillCount; /* spill-to-disk invocation counter */
int64 spillBytes; /* amount of data spilled to disk */
+
+ /* Statistics about transactions streamed to the decoding output plugin */
+ int64 streamTxns; /* number of transactions streamed */
+ int64 streamCount; /* streaming invocation counter */
+ int64 streamBytes; /* amount of data streamed */
};
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 492cdcf..097ff5d 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2022,8 +2022,11 @@ pg_stat_replication_slots| SELECT s.slot_name,
s.spill_txns,
s.spill_count,
s.spill_bytes,
+ s.stream_txns,
+ s.stream_count,
+ s.stream_bytes,
s.stats_reset
- FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stats_reset);
+ FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset);
pg_stat_slru| SELECT s.name,
s.blks_zeroed,
s.blks_hit,
--
1.8.3.1
On Wed, Oct 28, 2020 at 08:54:53AM +0530, Amit Kapila wrote:
On Fri, Oct 23, 2020 at 10:24 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Thu, Oct 22, 2020 at 2:09 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
I have fixed the above comment and rebased the patch. I have changed
the docs a bit to add more explanation about the counters. Let me know
if you have any more comments. Thanks Dilip and Sawada-San for
reviewing this patch.
Attached is an updated patch with minor changes in docs and cosmetic
changes. I am planning to push this patch tomorrow unless there are
any more comments/suggestions.
+1 and thanks for working on this
regards
--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
On Thu, Oct 29, 2020 at 5:16 AM Tomas Vondra
<tomas.vondra@2ndquadrant.com> wrote:
On Wed, Oct 28, 2020 at 08:54:53AM +0530, Amit Kapila wrote:
On Fri, Oct 23, 2020 at 10:24 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Thu, Oct 22, 2020 at 2:09 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
I have fixed the above comment and rebased the patch. I have changed
the docs a bit to add more explanation about the counters. Let me know
if you have any more comments. Thanks Dilip and Sawada-San for
reviewing this patch.
Attached is an updated patch with minor changes in docs and cosmetic
changes. I am planning to push this patch tomorrow unless there are
any more comments/suggestions.
+1 and thanks for working on this
Pushed!
--
With Regards,
Amit Kapila.