Per-query FDW network stat collection
Hello,
I have implemented per-query network stats collection for FDW. It is done
similarly to how buffer and WAL stats are collected, and the results can be
viewed with a new NETWORK option for the EXPLAIN command:
explain (analyze, network) insert into itrtest values (2, 'blah');
                                           QUERY PLAN
-----------------------------------------------------------------------------------------------
 Insert on itrtest  (cost=0.00..0.01 rows=0 width=0) (actual time=0.544..0.544 rows=0 loops=1)
   Network: FDW bytes sent=197 received=72, wait_time=0.689
   ->  Result  (cost=0.00..0.01 rows=1 width=36) (actual time=0.003..0.003 rows=1 loops=1)
 Planning Time: 0.025 ms
 Execution Time: 0.701 ms
(5 rows)
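For completeness, in non-text formats the same counters come out as plain
properties; with FORMAT JSON the relevant fragment would look roughly like
this (abridged; "FDW Wait Time" is only emitted when track_fdw_wait_timing
is enabled):

    "FDW Bytes Sent": 197,
    "FDW Bytes Received": 72,
    "FDW Wait Time": 0.689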
I have yet to add the corresponding columns to pg_stat_statements and to
write tests and documentation, but before I go ahead with that, I would like
to know what the community thinks about the patch.
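One wart I am already aware of: the wait-time measurement repeats the same
start/subtract/add pattern in several places in connection.c. If reviewers
prefer, it could be factored into small helpers along these lines (a sketch
only, not part of the attached patch):

static inline void
fdw_wait_start(instr_time *start)
{
    /* Hypothetical helper, not in the attached patch. */
    if (track_fdw_wait_timing)
        INSTR_TIME_SET_CURRENT(*start);
}

static inline void
fdw_wait_stop(instr_time start)
{
    /* Hypothetical helper, not in the attached patch. */
    instr_time  duration;

    if (!track_fdw_wait_timing)
        return;
    INSTR_TIME_SET_CURRENT(duration);
    INSTR_TIME_SUBTRACT(duration, start);
    INSTR_TIME_ADD(pgNetUsage.fdw_wait_time, duration);
}

For now I have kept the measurements inline.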
Regards,
Ilya Gladyshev
Attachments:
0001-adds-per-query-FDW-network-usage-stats.patch (text/x-patch)
From 3ffbe071480672189c2e03d7e54707c77ba58b0b Mon Sep 17 00:00:00 2001
From: Ilya Gladyshev <i.gladyshev@postgrespro.ru>
Date: Mon, 23 Aug 2021 21:37:31 +0300
Subject: [PATCH] adds per query FDW network usage stats
Adds means for collecting network usage stats and displaying them in
EXPLAIN with the NETWORK option. Implements network stats collection for
postgres_fdw by adding a stats collection hook to libpq.
---
contrib/postgres_fdw/connection.c | 57 ++++++++++++++++++++++++-
contrib/postgres_fdw/postgres_fdw.c | 28 +++++++++++++
src/backend/access/heap/vacuumlazy.c | 5 ++-
src/backend/access/nbtree/nbtsort.c | 14 ++++---
src/backend/commands/explain.c | 62 ++++++++++++++++++++++++++++
src/backend/executor/execParallel.c | 28 ++++++++++---
src/backend/executor/instrument.c | 57 +++++++++++++++++++++----
src/backend/utils/misc/guc.c | 10 ++++-
src/include/commands/explain.h | 1 +
src/include/executor/execParallel.h | 1 +
src/include/executor/instrument.h | 25 ++++++++---
src/interfaces/libpq/exports.txt | 1 +
src/interfaces/libpq/fe-misc.c | 2 +
src/interfaces/libpq/fe-secure.c | 4 ++
src/interfaces/libpq/libpq-fe.h | 5 ++-
15 files changed, 271 insertions(+), 29 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 82aa14a65de..3f479a74ba1 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -571,10 +571,22 @@ void
do_sql_command(PGconn *conn, const char *sql)
{
PGresult *res;
+ instr_time start, duration;
+
+ if (track_fdw_wait_timing)
+ INSTR_TIME_SET_CURRENT(start);
if (!PQsendQuery(conn, sql))
pgfdw_report_error(ERROR, NULL, conn, false, sql);
res = pgfdw_get_result(conn, sql);
+
+ if (track_fdw_wait_timing)
+ {
+ INSTR_TIME_SET_CURRENT(duration);
+ INSTR_TIME_SUBTRACT(duration, start);
+ INSTR_TIME_ADD(pgNetUsage.fdw_wait_time, duration);
+ }
+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
@@ -684,10 +696,14 @@ GetPrepStmtNumber(PGconn *conn)
PGresult *
pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
{
+ PGresult *res;
+ instr_time start, duration;
/* First, process a pending asynchronous request, if any. */
if (state && state->pendingAreq)
process_pending_request(state->pendingAreq);
+ if (track_fdw_wait_timing)
+ INSTR_TIME_SET_CURRENT(start);
/*
* Submit a query. Since we don't use non-blocking mode, this also can
* block. But its risk is relatively small, so we ignore that for now.
@@ -696,7 +712,14 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
pgfdw_report_error(ERROR, NULL, conn, false, query);
/* Wait for the result. */
- return pgfdw_get_result(conn, query);
+ res = pgfdw_get_result(conn, query);
+ if (track_fdw_wait_timing)
+ {
+ INSTR_TIME_SET_CURRENT(duration);
+ INSTR_TIME_SUBTRACT(duration, start);
+ INSTR_TIME_ADD(pgNetUsage.fdw_wait_time, duration);
+ }
+ return res;
}
/*
@@ -717,6 +740,10 @@ pgfdw_get_result(PGconn *conn, const char *query)
/* In what follows, do not leak any PGresults on an error. */
PG_TRY();
{
+ instr_time start, duration;
+ if (track_fdw_wait_timing)
+ INSTR_TIME_SET_CURRENT(start);
+
for (;;)
{
PGresult *res;
@@ -750,6 +777,13 @@ pgfdw_get_result(PGconn *conn, const char *query)
PQclear(last_res);
last_res = res;
}
+
+ if (track_fdw_wait_timing)
+ {
+ INSTR_TIME_SET_CURRENT(duration);
+ INSTR_TIME_SUBTRACT(duration, start);
+ INSTR_TIME_ADD(pgNetUsage.fdw_wait_time, duration);
+ }
}
PG_CATCH();
{
@@ -893,7 +927,18 @@ pgfdw_xact_callback(XactEvent event, void *arg)
*/
if (entry->have_prep_stmt && entry->have_error)
{
+ instr_time start, duration;
+ if (track_fdw_wait_timing)
+ INSTR_TIME_SET_CURRENT(start);
+
res = PQexec(entry->conn, "DEALLOCATE ALL");
+
+ if (track_fdw_wait_timing)
+ {
+ INSTR_TIME_SET_CURRENT(duration);
+ INSTR_TIME_SUBTRACT(duration, start);
+ INSTR_TIME_ADD(pgNetUsage.fdw_wait_time, duration);
+ }
PQclear(res);
}
entry->have_prep_stmt = false;
@@ -1329,6 +1374,10 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
/* In what follows, do not leak any PGresults on an error. */
PG_TRY();
{
+ instr_time start, duration;
+ if (track_fdw_wait_timing)
+ INSTR_TIME_SET_CURRENT(start);
+
for (;;)
{
PGresult *res;
@@ -1377,6 +1426,12 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
last_res = res;
}
exit: ;
+ if (track_fdw_wait_timing)
+ {
+ INSTR_TIME_SET_CURRENT(duration);
+ INSTR_TIME_SUBTRACT(duration, start);
+ INSTR_TIME_ADD(pgNetUsage.fdw_wait_time, duration);
+ }
}
PG_CATCH();
{
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 9d443baf02a..e511c4f8a73 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -60,6 +60,7 @@ PG_MODULE_MAGIC;
/* If no remote estimates, assume a sort costs 20% extra */
#define DEFAULT_FDW_SORT_MULTIPLIER 1.2
+static PQnetworkStats_hook_type prev_PQnetworkStats = NULL;
/*
* Indexes of FDW-private information stored in fdw_private lists.
*
@@ -313,6 +314,14 @@ typedef struct
List *already_used; /* expressions already dealt with */
} ec_member_foreign_arg;
+void _PG_init(void);
+void _PG_fini(void);
+
+/*
+ * Hooks
+ */
+static void PostgresFdw_PQnetworkStats(ssize_t bytesSent, ssize_t bytesReceived);
+
/*
* SQL functions
*/
@@ -7438,3 +7447,22 @@ get_batch_size_option(Relation rel)
return batch_size;
}
+static void
+PostgresFdw_PQnetworkStats(ssize_t bytesSent, ssize_t bytesReceived)
+{
+ /* The hook can be passed -1 on I/O error; count only actual bytes. */
+ if (bytesReceived > 0)
+ pgNetUsage.fdw_recv_bytes += bytesReceived;
+ if (bytesSent > 0)
+ pgNetUsage.fdw_sent_bytes += bytesSent;
+}
+
+void
+_PG_init(void)
+{
+ prev_PQnetworkStats = PQnetworkStats_hook;
+ PQnetworkStats_hook = PostgresFdw_PQnetworkStats;
+}
+
+void
+_PG_fini(void)
+{
+ PQnetworkStats_hook = prev_PQnetworkStats;
+}
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 334d8a2aa71..a94d61f1ec4 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -2800,7 +2800,7 @@ do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers)
WaitForParallelWorkersToFinish(lps->pcxt);
for (int i = 0; i < lps->pcxt->nworkers_launched; i++)
- InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
+ InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i], NULL);
}
/*
@@ -4243,7 +4243,8 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
- &wal_usage[ParallelWorkerNumber]);
+ &wal_usage[ParallelWorkerNumber],
+ NULL);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 54c8eb1289d..055fbd86e44 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -1663,11 +1663,11 @@ _bt_end_parallel(BTLeader *btleader)
WaitForParallelWorkersToFinish(btleader->pcxt);
/*
- * Next, accumulate WAL usage. (This must wait for the workers to finish,
- * or we might get incomplete data.)
+ * Next, accumulate WAL and buffer usage. (This must wait for the workers
+ * to finish, or we might get incomplete data.)
*/
for (i = 0; i < btleader->pcxt->nworkers_launched; i++)
- InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i]);
+ InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i], NULL);
/* Free last reference to MVCC snapshot, if one was used */
if (IsMVCCSnapshot(btleader->snapshot))
@@ -1870,11 +1870,15 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
_bt_parallel_scan_and_sort(btspool, btspool2, btshared, sharedsort,
sharedsort2, sortmem, false);
- /* Report WAL/buffer usage during parallel execution */
+ /*
+ * Report WAL/buffer usage during parallel execution. There is no need
+ * to report network usage here.
+ */
bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
- &walusage[ParallelWorkerNumber]);
+ &walusage[ParallelWorkerNumber],
+ NULL);
#ifdef BTREE_BUILD_STATS
if (log_btree_build_stats)
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 10644dfac44..617cd28acd3 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -122,6 +122,7 @@ static const char *explain_get_index_name(Oid indexId);
static void show_buffer_usage(ExplainState *es, const BufferUsage *usage,
bool planning);
static void show_wal_usage(ExplainState *es, const WalUsage *usage);
+static void show_net_usage(ExplainState *es, const NetworkUsage *usage);
static void ExplainIndexScanDetails(Oid indexid, ScanDirection indexorderdir,
ExplainState *es);
static void ExplainScanTarget(Scan *plan, ExplainState *es);
@@ -188,6 +189,8 @@ ExplainQuery(ParseState *pstate, ExplainStmt *stmt,
es->buffers = defGetBoolean(opt);
else if (strcmp(opt->defname, "wal") == 0)
es->wal = defGetBoolean(opt);
+ else if (strcmp(opt->defname, "network") == 0)
+ es->network = defGetBoolean(opt);
else if (strcmp(opt->defname, "settings") == 0)
es->settings = defGetBoolean(opt);
else if (strcmp(opt->defname, "timing") == 0)
@@ -232,6 +235,12 @@ ExplainQuery(ParseState *pstate, ExplainStmt *stmt,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("EXPLAIN option WAL requires ANALYZE")));
+ if (es->network && !es->analyze)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("EXPLAIN option NETWORK requires ANALYZE")));
+
/* if the timing was not set explicitly, set default value */
es->timing = (timing_set) ? es->timing : es->analyze;
@@ -538,6 +547,8 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
instrument_option |= INSTRUMENT_BUFFERS;
if (es->wal)
instrument_option |= INSTRUMENT_WAL;
+ if (es->network)
+ instrument_option |= INSTRUMENT_NETWORK;
/*
* We always collect timing for the entire statement, even when node-level
@@ -2048,6 +2059,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
show_buffer_usage(es, &planstate->instrument->bufusage, false);
if (es->wal && planstate->instrument)
show_wal_usage(es, &planstate->instrument->walusage);
+ if (es->network && planstate->instrument)
+ show_net_usage(es, &planstate->instrument->netusage);
/* Prepare per-worker buffer/WAL usage */
if (es->workers_state && (es->buffers || es->wal) && es->verbose)
@@ -3617,6 +3630,55 @@ show_buffer_usage(ExplainState *es, const BufferUsage *usage, bool planning)
}
}
+/*
+ * Show network usage details.
+ */
+static void
+show_net_usage(ExplainState *es, const NetworkUsage *usage)
+{
+ if (es->format == EXPLAIN_FORMAT_TEXT)
+ {
+ bool has_data = (usage->fdw_recv_bytes > 0) || (usage->fdw_sent_bytes > 0);
+ bool has_timing = !INSTR_TIME_IS_ZERO(usage->fdw_wait_time);
+
+ /* Show only positive counter values. */
+ if (has_data)
+ {
+ ExplainIndentText(es);
+ appendStringInfoString(es->str, "Network: FDW bytes");
+
+ if (usage->fdw_sent_bytes > 0)
+ appendStringInfo(es->str, " sent=" UINT64_FORMAT,
+ usage->fdw_sent_bytes);
+ if (usage->fdw_recv_bytes > 0)
+ appendStringInfo(es->str, " received=" UINT64_FORMAT,
+ usage->fdw_recv_bytes);
+ }
+ if (has_timing)
+ {
+ if (!has_data)
+ {
+ ExplainIndentText(es);
+ appendStringInfoString(es->str, "Network: FDW");
+ }
+ appendStringInfo(es->str, ", wait_time=%0.3f",
+ INSTR_TIME_GET_MILLISEC(usage->fdw_wait_time));
+ }
+ if (has_data || has_timing)
+ {
+ appendStringInfoChar(es->str, '\n');
+ }
+ }
+ else
+ {
+ ExplainPropertyInteger("FDW Bytes Sent", NULL,
+ usage->fdw_sent_bytes, es);
+ ExplainPropertyInteger("FDW Bytes Received", NULL,
+ usage->fdw_recv_bytes, es);
+ if (track_fdw_wait_timing)
+ {
+ ExplainPropertyFloat("FDW Wait Time", NULL,
+ INSTR_TIME_GET_MILLISEC(usage->fdw_wait_time),
+ 3, es);
+ }
+ }
+}
+
/*
* Show WAL usage details.
*/
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index f8a4a40e7b5..4b9603113ca 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -12,7 +12,7 @@
* workers and ensuring that their state generally matches that of the
* leader; see src/backend/access/transam/README.parallel for details.
* However, we must save and restore relevant executor state, such as
- * any ParamListInfo associated with the query, buffer/WAL usage info, and
+ * any ParamListInfo associated with the query, buffer/WAL/network usage info, and
* the actual plan to be passed down to the worker.
*
* IDENTIFICATION
@@ -66,6 +66,7 @@
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
#define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
+#define PARALLEL_KEY_NETWORK_USAGE UINT64CONST(0xE00000000000000B)
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
@@ -599,6 +600,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
char *paramlistinfo_space;
BufferUsage *bufusage_space;
WalUsage *walusage_space;
+ NetworkUsage *netusage_space;
SharedExecutorInstrumentation *instrumentation = NULL;
SharedJitInstrumentation *jit_instrumentation = NULL;
int pstmt_len;
@@ -679,6 +681,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
mul_size(sizeof(WalUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
+ /* Same for network */
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ mul_size(sizeof(NetworkUsage), pcxt->nworkers));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
/* Estimate space for tuple queues. */
shm_toc_estimate_chunk(&pcxt->estimator,
mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
@@ -767,6 +774,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
pei->wal_usage = walusage_space;
+ /* Same for network */
+ netusage_space = shm_toc_allocate(pcxt->toc,
+ mul_size(sizeof(NetworkUsage), pcxt->nworkers));
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_NETWORK_USAGE, netusage_space);
+ pei->net_usage = netusage_space;
+
/* Set up the tuple queues that the workers will write into. */
pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
@@ -1159,11 +1172,11 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
WaitForParallelWorkersToFinish(pei->pcxt);
/*
- * Next, accumulate buffer/WAL usage. (This must wait for the workers to
- * finish, or we might get incomplete data.)
+ * Next, accumulate buffer/WAL/network usage. (This must wait for the
+ * workers to finish, or we might get incomplete data.)
*/
for (i = 0; i < nworkers; i++)
- InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
+ InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i], &pei->net_usage[i]);
pei->finished = true;
}
@@ -1396,6 +1409,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
FixedParallelExecutorState *fpes;
BufferUsage *buffer_usage;
WalUsage *wal_usage;
+ NetworkUsage *net_usage;
DestReceiver *receiver;
QueryDesc *queryDesc;
SharedExecutorInstrumentation *instrumentation;
@@ -1469,11 +1483,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
/* Shut down the executor */
ExecutorFinish(queryDesc);
- /* Report buffer/WAL usage during parallel execution. */
+ /* Report buffer/WAL/network usage during parallel execution. */
buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+ net_usage = shm_toc_lookup(toc, PARALLEL_KEY_NETWORK_USAGE, false);
InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
- &wal_usage[ParallelWorkerNumber]);
+ &wal_usage[ParallelWorkerNumber],
+ &net_usage[ParallelWorkerNumber]);
/* Report instrumentation data if any instrumentation options are set. */
if (instrumentation != NULL)
diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c
index 2b106d8473c..31e897a86fe 100644
--- a/src/backend/executor/instrument.c
+++ b/src/backend/executor/instrument.c
@@ -21,10 +21,14 @@ BufferUsage pgBufferUsage;
static BufferUsage save_pgBufferUsage;
WalUsage pgWalUsage;
static WalUsage save_pgWalUsage;
+NetworkUsage pgNetUsage;
+static NetworkUsage save_pgNetUsage;
+
+bool track_fdw_wait_timing;
static void BufferUsageAdd(BufferUsage *dst, const BufferUsage *add);
static void WalUsageAdd(WalUsage *dst, WalUsage *add);
-
+static void NetUsageAdd(NetworkUsage *dst, const NetworkUsage *add);
/* Allocate new instrumentation structure(s) */
Instrumentation *
@@ -34,11 +38,13 @@ InstrAlloc(int n, int instrument_options, bool async_mode)
/* initialize all fields to zeroes, then modify as needed */
instr = palloc0(n * sizeof(Instrumentation));
- if (instrument_options & (INSTRUMENT_BUFFERS | INSTRUMENT_TIMER | INSTRUMENT_WAL))
+ if (instrument_options & (INSTRUMENT_BUFFERS | INSTRUMENT_TIMER |
+ INSTRUMENT_WAL | INSTRUMENT_NETWORK))
{
bool need_buffers = (instrument_options & INSTRUMENT_BUFFERS) != 0;
bool need_wal = (instrument_options & INSTRUMENT_WAL) != 0;
bool need_timer = (instrument_options & INSTRUMENT_TIMER) != 0;
+ bool need_network = (instrument_options & INSTRUMENT_NETWORK) != 0;
int i;
for (i = 0; i < n; i++)
@@ -46,6 +52,7 @@ InstrAlloc(int n, int instrument_options, bool async_mode)
instr[i].need_bufusage = need_buffers;
instr[i].need_walusage = need_wal;
instr[i].need_timer = need_timer;
+ instr[i].need_netusage = need_network;
instr[i].async_mode = async_mode;
}
}
@@ -61,6 +68,7 @@ InstrInit(Instrumentation *instr, int instrument_options)
instr->need_bufusage = (instrument_options & INSTRUMENT_BUFFERS) != 0;
instr->need_walusage = (instrument_options & INSTRUMENT_WAL) != 0;
instr->need_timer = (instrument_options & INSTRUMENT_TIMER) != 0;
+ instr->need_netusage = (instrument_options & INSTRUMENT_NETWORK) != 0;
}
/* Entry to a plan node */
@@ -77,6 +85,9 @@ InstrStartNode(Instrumentation *instr)
if (instr->need_walusage)
instr->walusage_start = pgWalUsage;
+
+ if (instr->need_netusage)
+ instr->netusage_start = pgNetUsage;
}
/* Exit from a plan node */
@@ -103,12 +114,14 @@ InstrStopNode(Instrumentation *instr, double nTuples)
/* Add delta of buffer usage since entry to node's totals */
if (instr->need_bufusage)
- BufferUsageAccumDiff(&instr->bufusage,
- &pgBufferUsage, &instr->bufusage_start);
+ BufferUsageAccumDiff(&instr->bufusage, &pgBufferUsage,
+ &instr->bufusage_start);
if (instr->need_walusage)
- WalUsageAccumDiff(&instr->walusage,
- &pgWalUsage, &instr->walusage_start);
+ WalUsageAccumDiff(&instr->walusage, &pgWalUsage, &instr->walusage_start);
+
+ if (instr->need_netusage)
+ NetUsageAccumDiff(&instr->netusage, &pgNetUsage, &instr->netusage_start);
/* Is this the first tuple of this cycle? */
if (!instr->running)
@@ -193,6 +206,9 @@ InstrAggNode(Instrumentation *dst, Instrumentation *add)
if (dst->need_walusage)
WalUsageAdd(&dst->walusage, &add->walusage);
+
+ if (dst->need_netusage)
+ NetUsageAdd(&dst->netusage, &add->netusage);
}
/* note current values during parallel executor startup */
@@ -201,24 +217,32 @@ InstrStartParallelQuery(void)
{
save_pgBufferUsage = pgBufferUsage;
save_pgWalUsage = pgWalUsage;
+ save_pgNetUsage = pgNetUsage;
}
/* report usage after parallel executor shutdown */
void
-InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
+InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage, NetworkUsage *netusage)
{
memset(bufusage, 0, sizeof(BufferUsage));
BufferUsageAccumDiff(bufusage, &pgBufferUsage, &save_pgBufferUsage);
memset(walusage, 0, sizeof(WalUsage));
WalUsageAccumDiff(walusage, &pgWalUsage, &save_pgWalUsage);
+ if (netusage != NULL)
+ {
+ memset(netusage, 0, sizeof(NetworkUsage));
+ NetUsageAccumDiff(netusage, &pgNetUsage, &save_pgNetUsage);
+ }
}
/* accumulate work done by workers in leader's stats */
void
-InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
+InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage, NetworkUsage *netusage)
{
BufferUsageAdd(&pgBufferUsage, bufusage);
WalUsageAdd(&pgWalUsage, walusage);
+ if (netusage != NULL)
+ NetUsageAdd(&pgNetUsage, netusage);
}
/* dst += add */
@@ -277,3 +301,20 @@ WalUsageAccumDiff(WalUsage *dst, const WalUsage *add, const WalUsage *sub)
dst->wal_records += add->wal_records - sub->wal_records;
dst->wal_fpi += add->wal_fpi - sub->wal_fpi;
}
+
+void
+NetUsageAccumDiff(NetworkUsage *dst, const NetworkUsage *add,
+ const NetworkUsage *sub)
+{
+ dst->fdw_recv_bytes += add->fdw_recv_bytes - sub->fdw_recv_bytes;
+ dst->fdw_sent_bytes += add->fdw_sent_bytes - sub->fdw_sent_bytes;
+ INSTR_TIME_ACCUM_DIFF(dst->fdw_wait_time, add->fdw_wait_time, sub->fdw_wait_time);
+}
+
+static void
+NetUsageAdd(NetworkUsage *dst, const NetworkUsage *add)
+{
+ dst->fdw_recv_bytes += add->fdw_recv_bytes;
+ dst->fdw_sent_bytes += add->fdw_sent_bytes;
+ INSTR_TIME_ADD(dst->fdw_wait_time, add->fdw_wait_time);
+}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 467b0fd6fe7..75d4f574951 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1552,7 +1552,15 @@ static struct config_bool ConfigureNamesBool[] =
false,
NULL, NULL, NULL
},
-
+ {
+ {"track_fdw_wait_timing", PGC_SUSET, STATS_COLLECTOR,
+ gettext_noop("Collects statistics for foreign source waiting time."),
+ NULL
+ },
+ &track_fdw_wait_timing,
+ false,
+ NULL, NULL, NULL
+ },
{
{"update_process_title", PGC_SUSET, PROCESS_TITLE,
gettext_noop("Updates the process title to show the active SQL command."),
diff --git a/src/include/commands/explain.h b/src/include/commands/explain.h
index e94d9e49cf6..a212d87f037 100644
--- a/src/include/commands/explain.h
+++ b/src/include/commands/explain.h
@@ -43,6 +43,7 @@ typedef struct ExplainState
bool costs; /* print estimated costs */
bool buffers; /* print buffer usage */
bool wal; /* print WAL usage */
+ bool network; /* print network usage */
bool timing; /* print detailed node timing */
bool summary; /* print total planning and execution timing */
bool settings; /* print modified settings */
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 3888175a2f4..22118628a51 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -27,6 +27,7 @@ typedef struct ParallelExecutorInfo
ParallelContext *pcxt; /* parallel context we're using */
BufferUsage *buffer_usage; /* points to bufusage area in DSM */
WalUsage *wal_usage; /* walusage area in DSM */
+ NetworkUsage *net_usage; /* netusage area in DSM */
SharedExecutorInstrumentation *instrumentation; /* optional */
struct SharedJitInstrumentation *jit_instrumentation; /* optional */
dsa_area *area; /* points to DSA area in DSM */
diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h
index 2f9905b7c8e..29c8a7c6c39 100644
--- a/src/include/executor/instrument.h
+++ b/src/include/executor/instrument.h
@@ -51,6 +51,13 @@ typedef struct WalUsage
uint64 wal_bytes; /* size of WAL records produced */
} WalUsage;
+typedef struct NetworkUsage
+{
+ uint64 fdw_recv_bytes; /* Bytes received from foreign source */
+ uint64 fdw_sent_bytes; /* Bytes sent to foreign targets */
+ instr_time fdw_wait_time; /* Time spent waiting for response from foreign source. */
+} NetworkUsage;
+
/* Flag bits included in InstrAlloc's instrument_options bitmask */
typedef enum InstrumentOption
{
@@ -58,6 +65,7 @@ typedef enum InstrumentOption
INSTRUMENT_BUFFERS = 1 << 1, /* needs buffer usage */
INSTRUMENT_ROWS = 1 << 2, /* needs row count */
INSTRUMENT_WAL = 1 << 3, /* needs WAL usage */
+ INSTRUMENT_NETWORK = 1 << 4, /* needs network usage */
INSTRUMENT_ALL = PG_INT32_MAX
} InstrumentOption;
@@ -67,6 +75,7 @@ typedef struct Instrumentation
bool need_timer; /* true if we need timer data */
bool need_bufusage; /* true if we need buffer usage data */
bool need_walusage; /* true if we need WAL usage data */
+ bool need_netusage; /* true if we need network usage data */
bool async_mode; /* true if node is in async mode */
/* Info about current plan cycle: */
bool running; /* true if we've completed first tuple */
@@ -76,6 +85,7 @@ typedef struct Instrumentation
double tuplecount; /* # of tuples emitted so far this cycle */
BufferUsage bufusage_start; /* buffer usage at start */
WalUsage walusage_start; /* WAL usage at start */
+ NetworkUsage netusage_start; /* network usage at start */
/* Accumulated statistics across all completed cycles: */
double startup; /* total startup time (in seconds) */
double total; /* total time (in seconds) */
@@ -86,6 +96,7 @@ typedef struct Instrumentation
double nfiltered2; /* # of tuples removed by "other" quals */
BufferUsage bufusage; /* total buffer usage */
WalUsage walusage; /* total WAL usage */
+ NetworkUsage netusage; /* total network usage */
} Instrumentation;
typedef struct WorkerInstrumentation
@@ -96,6 +107,9 @@ typedef struct WorkerInstrumentation
extern PGDLLIMPORT BufferUsage pgBufferUsage;
extern PGDLLIMPORT WalUsage pgWalUsage;
+extern PGDLLIMPORT NetworkUsage pgNetUsage;
+
+extern PGDLLIMPORT bool track_fdw_wait_timing;
extern Instrumentation *InstrAlloc(int n, int instrument_options,
bool async_mode);
@@ -106,11 +120,12 @@ extern void InstrUpdateTupleCount(Instrumentation *instr, double nTuples);
extern void InstrEndLoop(Instrumentation *instr);
extern void InstrAggNode(Instrumentation *dst, Instrumentation *add);
extern void InstrStartParallelQuery(void);
-extern void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage);
-extern void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage);
-extern void BufferUsageAccumDiff(BufferUsage *dst,
- const BufferUsage *add, const BufferUsage *sub);
+extern void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage, NetworkUsage *netusage);
+extern void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage, NetworkUsage *netusage);
+extern void BufferUsageAccumDiff(BufferUsage *dst, const BufferUsage *add,
+ const BufferUsage *sub);
extern void WalUsageAccumDiff(WalUsage *dst, const WalUsage *add,
const WalUsage *sub);
-
+extern void NetUsageAccumDiff(NetworkUsage *dst, const NetworkUsage *add,
+ const NetworkUsage *sub);
#endif /* INSTRUMENT_H */
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index e8bcc883709..6663b28eb8d 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -186,3 +186,4 @@ PQpipelineStatus 183
PQsetTraceFlags 184
PQmblenBounded 185
PQsendFlushRequest 186
+PQnetworkStats_hook 187
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 9a2a9702934..067c1aa51d7 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -53,6 +53,8 @@
#include "pg_config_paths.h"
#include "port/pg_bswap.h"
+PQnetworkStats_hook_type PQnetworkStats_hook = NULL;
+
static int pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
static int pqSendSome(PGconn *conn, int len);
static int pqSocketCheck(PGconn *conn, int forRead, int forWrite,
diff --git a/src/interfaces/libpq/fe-secure.c b/src/interfaces/libpq/fe-secure.c
index b15d8d137ce..c3653482e5e 100644
--- a/src/interfaces/libpq/fe-secure.c
+++ b/src/interfaces/libpq/fe-secure.c
@@ -224,6 +224,8 @@ pqsecure_read(PGconn *conn, void *ptr, size_t len)
{
n = pqsecure_raw_read(conn, ptr, len);
}
+ if (PQnetworkStats_hook)
+ PQnetworkStats_hook(0, n);
return n;
}
@@ -307,6 +309,8 @@ pqsecure_write(PGconn *conn, const void *ptr, size_t len)
n = pqsecure_raw_write(conn, ptr, len);
}
+ if (PQnetworkStats_hook)
+ PQnetworkStats_hook(n, 0);
return n;
}
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index a6fd69acebc..552a61b3c27 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -27,7 +27,7 @@ extern "C"
* such as Oid.
*/
#include "postgres_ext.h"
-
+#include "c.h"
/*
* These symbols may be used in compile-time #ifdef tests for the availability
* of newer libpq features.
@@ -646,6 +646,9 @@ extern int PQdsplen(const char *s, int encoding);
/* Get encoding id from environment variable PGCLIENTENCODING */
extern int PQenv2encoding(void);
+typedef void (*PQnetworkStats_hook_type) (ssize_t bytesSent, ssize_t bytesReceived);
+extern PGDLLEXPORT PQnetworkStats_hook_type PQnetworkStats_hook;
+
/* === in fe-auth.c === */
extern char *PQencryptPassword(const char *passwd, const char *user);
--
2.30.2
On Tue, Aug 24, 2021 at 5:12 PM Ilya Gladyshev
<i.gladyshev@postgrespro.ru> wrote:
I have implemented per-query network stats collection for FDW. It is done
similarly to how buffer and WAL stats are collected, and the results can be
viewed with a new NETWORK option for the EXPLAIN command:

explain (analyze, network) insert into itrtest values (2, 'blah');
                                           QUERY PLAN
-----------------------------------------------------------------------------------------------
 Insert on itrtest  (cost=0.00..0.01 rows=0 width=0) (actual time=0.544..0.544 rows=0 loops=1)
   Network: FDW bytes sent=197 received=72, wait_time=0.689
[...]
It sounds like a really useful metric to have.
However, I'm not sure that having a new "network" option is the best way to
do that. It seems confusing since, IIUC, it won't catch all network activity
(like fe/be traffic, or network disks...) but only FDW activity. I think it
would be better to have that information shown under the VERBOSE option
rather than under a new one.
Similarly, I'm afraid that INSTRUMENT_NETWORK could be misleading, although
I don't have any better proposal right now.
On 24.08.2021 12:19, Julien Rouhaud wrote:
However, I'm not sure that having a new "network" option is the best way to
do that. It seems confusing since, IIUC, it won't catch all network activity
(like fe/be traffic, or network disks...) but only FDW activity. I think it
would be better to have that information shown under the VERBOSE option
rather than under a new one.
Similarly, I'm afraid that INSTRUMENT_NETWORK could be misleading, although
I don't have any better proposal right now.
I am also doubtful about this naming. Initially I wanted to add fe/be
activity as one of the metrics, but then decided to restrict myself to FDW
for now. Still, I chose to leave "network" as it is, because to me it makes
sense to keep all network-related metrics under a single EXPLAIN option (and
perhaps a single instrumentation flag), in case more are added later. The
struct fields used internally for collection state explicitly that they are
meant only for FDW stats, and the EXPLAIN output likewise labels the
displayed stats as FDW network activity.