[PATCH v3] pg_progress() SQL function to monitor progression of long running SQL queries/utilities
Hello,
This is version 3 of a new SQL function written in order to monitor long
running SQL queries/utilities in Postgres backends.
Previous patches were using a command named PROGRESS. This was restricting
the use to psql only.
Rationale for the new SQL function
=========================
The purpose of the new SQL function is to monitor the progression of long
running SQL queries/utilities in backend processes. It can be used to
evaluate when a query will complete and consequently whether it will
complete within a reasonable amount of time.
The function is named pg_progress(pid, verbosity).
Arguments are:
- pid : process pid of a given backend. This pid can be found using
pg_stat_activity(). This value can be also 0, in which case progress of all
backend processes is requested, one by one. Only regular backends can be
monitored. Specific backends such as WAL writer for instance are not
concerned by the new function.
- verbosity: this value ranges from 0 to 3. Higher value provide more
details. This also controls whether the disk space resource is exhibited.
The function returns a set of rows similar to name/value pairs reflecting
the execution tree and its progression for long running nodes. Long running
nodes are for instance SeqScan, IndexScan, Sort using tapes, TupleStore (on
disks), Limit, HashJoin.
The row fields returned by the SQL function are:
- pid: process pid of the backend being monitored
- ppid: parent pid or master processs pid for parallel queries
- bid: backend Id
- lineid: line number of a given backend
- indent: indentation of the node. This field and lineid are used to order
and indent the output, in order to reflect the execution tree.
- type: 3 node types exists:
- nodes which are for instance SeqScan, HasjJoin, Sorts.
- relationship between nodes and child nodes of the execution tree
- properties exhibiting the progress execution of long running nodes
- name: the name of the node/relationship/property. This is the name in the
name/value pairs.
- value: the value of the node.
- unit: the unit for the above value field.
SQL query run in the monitoring process can be used to exhibit the
execution tree progress.
test=# SELECT pid, ppid, bid, concat(repeat(' ', 3 * indent),name),
value, unit FROM pg_progress(0,0);
indent field can be used to reflect the tree nature of the execution tree.
Use case
=======
A use case is shown in the below example based on a table named t_10m with
10 millions rows.
The table has been created with :
CREATE TABLE T_10M ( id integer, md5 text);
INSERT INTO T_10M SELECT generate_series(1,10000000) AS id,
md5(random()::text) AS md5;
ANALYSE t_10m;
ANALYSE is desired to be run in order to have statistics updated.
1/ Start a first psql session to run a long SQL query:
[pgadm@rco ~]$ psql -A -d test
psql (10devel)
Type "help" for help.
test=#
The option -A is used to allow unaligned rows in the output. Otherwise psql
might slow down progression of the query.
Redirect output to a file in order to let the query run without terminal
interaction:
test=# \o file
Start a long running query:
test=# select * from t_10M order by md5;
2/ In a second psql session, list the backend pid(s) and their matching SQL
query
[pgadm@rco ~]$ psql -d test
psql (10devel)
Type "help" for help.
test=# SELECT pid, ppid, bid, concat(repeat(' ', 3 * indent),name),
value, unit FROM pg_progress(0,0);
pid | ppid | bid | concat | value | unit
-------+------+-----+------------------+------------------+---------
14106 | 0 | 4 | status | query running |
14106 | 0 | 4 | relationship | progression |
14106 | 0 | 4 | node name | Sort |
14106 | 0 | 4 | sort status | on tapes writing |
14106 | 0 | 4 | completion | 0 | percent
14106 | 0 | 4 | relationship | Outer |
14106 | 0 | 4 | node name | Seq Scan |
14106 | 0 | 4 | scan on | t_10m |
14106 | 0 | 4 | fetched | 25049 | block
14106 | 0 | 4 | total | 83334 | block
14106 | 0 | 4 | completion | 30 | percent
(11 rows)
test=#
Chose the pid of the backend running the long SQL query to be monitored, or
use 0 as above for the pid to get all backends progress report.
The sort is done on tapes because the table has 10 millions of rows.
The output shows the progression in terms of:
- blocks with:
- already fetched blocks
- total blocks to be fetched
- percentage of completion
Each node may have different output types.
Progression is reported in terms of rows, blocks, bytes, and percentages.
Sort nodes show tapes blocks being used and step phases (merge/sort).
Parallel queries can be monitored by observing progress of both master and
worker backends. In one psql session, run:
test=# SELECT pid, ppid, bid, concat(repeat(' ', 3 * indent),name),
value, unit FROM pg_progress(0,0);
pid | ppid | bid | concat | value | unit
-------+-------+-----+---------------------+-----------------+---------
26924 | 0 | 4 | status | query running |
26924 | 0 | 4 | relationship | progression |
26924 | 0 | 4 | node name | Gather Merge |
26924 | 0 | 4 | relationship | Outer |
26924 | 0 | 4 | node name | Sort |
26924 | 0 | 4 | completion | 0 | percent
26924 | 0 | 4 | relationship | Outer |
26924 | 0 | 4 | node name | Seq Scan |
26924 | 0 | 4 | node mode | parallel |
26924 | 0 | 4 | scan on | t_10m |
26924 | 0 | 4 | scan mode | parallel |
26924 | 0 | 4 | fetched | 38231 | block
26924 | 0 | 4 | total | 132403 | block
26924 | 0 | 4 | completion | 28 | percent
26926 | 26924 | 6 | relationship | child worker |
26926 | 26924 | 6 | node name | Sort |
26926 | 26924 | 6 | completion | 0 | percent
26926 | 26924 | 6 | relationship | Outer |
26926 | 26924 | 6 | node name | Seq Scan |
26926 | 26924 | 6 | node mode | parallel |
26926 | 26924 | 6 | scan on | t_10m |
26926 | 26924 | 6 | scan mode | parallel |
26926 | 26924 | 6 | fetched | 38246 | block
26926 | 26924 | 6 | total | 132403 | block
26926 | 26924 | 6 | completion | 28 | percent
26925 | 26924 | 5 | relationship | child worker |
26925 | 26924 | 5 | node name | Sort |
26925 | 26924 | 5 | completion | 0 | percent
26925 | 26924 | 5 | relationship | Outer |
26925 | 26924 | 5 | node name | Seq Scan |
26925 | 26924 | 5 | node mode | parallel |
26925 | 26924 | 5 | scan on | t_10m |
26925 | 26924 | 5 | scan mode | parallel |
26925 | 26924 | 5 | fetched | 38283 | block
26925 | 26924 | 5 | total | 132403 | block
26925 | 26924 | 5 | completion | 28 | percent
(36 rows)
test=#
Above output shows a master backend for which pid is 26924 and worker
backends are 26926 and 26925.
Nodes types are SeqScan and Sort.
Design of the SQL function
===================
The design of the SQL function relies on a per backend shared memory
structure named ProgressCtl. This structure contains fields for progression
requests and responses.
The monitoring process fills the request fields in the per backend
ProgressCtl structure. Once the request fields are set, it sends a signal
to the monitored process and starts waiting on a per backend latch, for the
response of the monitored backend to be available.
The monitored process, upon receiving the signal stops temporarily its
execution and note the progression request. It then continues its execution
until interruption can be serviced. At this moment, it disables interrupts
and walks through its execution tree to collect entirely the progress of
each node of the execution tree. This is done in one step interrupting the
backend SQL query execution. Structure ProgressState is used to manage the
progress report. It is stored in the monitored backend process. Then, it
dumps the progress report in a per backend shared memory page and set the
latch to allow the monitoring backend to wake up and collect the report put
in shared memory. The monitored backend continues the execution of its SQL
query.
The monitoring backend collects the progress report in shared memory, and
sends it to its psql session as a list of rows.
If the one shared memory page is not enough for the whole progress report,
the progress report transfert between the 2 backends is done with a series
of request/response. Before setting the latch, the monitored backend write
the size of the data dumped in shared memory and set a status to indicate
that more data is to be sent through the shared memory page. The monitoring
backends get the result and sends an other signal, and then wait for the
latch again. The monitored backend does not collect a new progress report
but continues to dump the already collected report. And the exchange goes
on until the full progress report has been dumped.
Should the backend process disappear, the wait on latch will eventually
complete after a timeout value which is guc variable. Default is 5 seconds
which should be enough for most cases.
Should the monitoring process disappear, the monitored backed will dump
data in shared memory and continue its execution. On the next progression
request, the monitored backend starts a new progress report.
When the monitored process execute a SQL query, it collects progress of
each long running node with counters of rows, blocks. This data collection
incurs a very low overhead on the running query.
At the time a monitored backend deals with a progress report, the SQL query
must be in a consistent state without partial nodes under allocation or
freeing. This is needed to walk through the execution tree at this moment,
in the middle of the SQL query execution. This is enforced by a flag added
in the executor. The progression is only collected if the backend is in the
function of ExecutorRun() which is known based on the execution flag. If
execution tree is in a consistent state, it is collected and dumped in the
shared memory page. The dump of the progress report is only done upon
receiving the progress request which is supposed to be seldom used.
Parallel queries can also be monitored. The same mecanism is used to
monitor child workers.
Utility such as CREATE INDEX which may be long to complete are also
monitored.
Patch
=====
The patch for version 3 is attached to the current email.
The patch has been rebased on latest commits as per today.
The patch is also available at : https://github.com/colinet/pro
gress/tree/progress
The diff stat of the patch is:
[root@rco pg]# git diff --stat master..
doc/src/sgml/config.sgml | 27 +-
doc/src/sgml/logical-replication.sgml | 2 +-
doc/src/sgml/monitoring.sgml | 283 +++++++++++-
doc/src/sgml/ref/create_function.sgml | 5 +-
doc/src/sgml/ref/create_subscription.sgml | 2 +-
doc/src/sgml/ref/load.sgml | 11 +-
doc/src/sgml/ref/psql-ref.sgml | 12 +-
src/backend/catalog/index.c | 11 +
src/backend/catalog/partition.c | 2 +-
src/backend/catalog/pg_publication.c | 24 -
src/backend/commands/explain.c | 3 +-
src/backend/executor/execMain.c | 12 +-
src/backend/executor/execProcnode.c | 31 ++
src/backend/nodes/outfuncs.c | 245 ++++++++++
src/backend/optimizer/path/costsize.c | 97 ++--
src/backend/postmaster/postmaster.c | 1 +
src/backend/rewrite/rewriteDefine.c | 6 -
src/backend/storage/file/buffile.c | 73 +++
src/backend/storage/file/fd.c | 15 +
src/backend/storage/ipc/ipci.c | 4 +
src/backend/storage/ipc/procarray.c | 59 +++
src/backend/storage/ipc/procsignal.c | 5 +
src/backend/storage/lmgr/lwlock.c | 2 +-
src/backend/tcop/postgres.c | 59 ++-
src/backend/tcop/pquery.c | 7 +
src/backend/tcop/utility.c | 10 +
src/backend/utils/adt/Makefile | 2 +-
src/backend/utils/adt/pg_progress.c | 2764
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/backend/utils/adt/varlena.c | 4 +-
src/backend/utils/init/globals.c | 18 +
src/backend/utils/init/miscinit.c | 21 +-
src/backend/utils/misc/guc.c | 23 +
src/backend/utils/sort/logtape.c | 18 +
src/backend/utils/sort/tuplesort.c | 153 +++++-
src/backend/utils/sort/tuplestore.c | 75 ++-
src/bin/pg_dump/pg_backup_archiver.c | 8 +-
src/bin/pg_upgrade/check.c | 20 +-
src/bin/pg_upgrade/pg_upgrade.c | 2 +-
src/bin/pg_upgrade/pg_upgrade.h | 2 +-
src/bin/psql/describe.c | 16 +-
src/include/catalog/catversion.h | 2 +-
src/include/catalog/pg_proc.h | 6 +-
src/include/commands/explain.h | 2 +
src/include/executor/execdesc.h | 3 +
src/include/miscadmin.h | 2 +
src/include/nodes/execnodes.h | 3 +
src/include/nodes/nodes.h | 7 +
src/include/nodes/plannodes.h | 11 +
src/include/pgstat.h | 3 +-
src/include/storage/buffile.h | 11 +
src/include/storage/fd.h | 2 +
src/include/storage/lwlock.h | 1 +
src/include/storage/procarray.h | 3 +
src/include/storage/procsignal.h | 3 +
src/include/tcop/pquery.h | 1 +
src/include/utils/logtape.h | 2 +
src/include/utils/pg_progress.h | 46 ++
src/include/utils/tuplesort.h | 72 ++-
src/include/utils/tuplestore.h | 35 ++
src/test/regress/expected/join.out | 35 --
src/test/regress/expected/opr_sanity.out | 10 -
src/test/regress/expected/rules.out | 5 -
src/test/regress/sql/join.sql | 28 --
src/test/regress/sql/opr_sanity.sql | 7 -
src/test/regress/sql/rules.sql | 5 -
65 files changed, 4153 insertions(+), 286 deletions(-)
[root@rco pg]#
2 main files are created:
src/backend/utils/adt/pg_progress.c
src/include/utils/pg_progress.h
Other files are concerned by small changes in order to collect progression
of the long running nodes.
Changelog
========
v3:
Added documentation
Added guc parameters for policy change (timeout, delay before reporting).
Restrict use of pg_progress() to superuser only
Support utility progression with CREATE INDEX.
Allow multiple monitoring backends at the same time to request progress of
the backends.
Fixed design of shared memory exchange between monitoring and monitored
backends when report is more than one shm page size.
Fixed locking of backend report request.
Moved code for new SQL function in src/backend/utils/adt/pg_progress.c
Use LWLockTranche to allocate one LWlock per backend. This lock to used to
serialize progression requests on backends.
Replace PROGRESS command by a new SQL function returning rows instead of
TEXT, JSON, XML.
Improved debug support
v2:
Added JSON, XML, TEXT INLINE format output
Added time consumed to run SQL queries
Added SQL query being monitored
Added support for parallel queries
Added disk used by the queries
Rebased on 9th May 2017 commits.
v1:
Initial version with only TEXT format reporting
Any suggestion, comment or feedback is welcome.
Regards
Attachments:
progress-v3.patchtext/x-patch; charset=US-ASCII; name=progress-v3.patchDownload
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 4a6c99e..0581f1a 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -17,7 +17,7 @@ OBJS = amcmds.o aggregatecmds.o alter.o analyze.o async.o cluster.o comment.o \
dbcommands.o define.o discard.o dropcmds.o \
event_trigger.o explain.o extension.o foreigncmds.o functioncmds.o \
indexcmds.o lockcmds.o matview.o operatorcmds.o opclasscmds.o \
- policy.o portalcmds.o prepare.o proclang.o publicationcmds.o \
+ policy.o portalcmds.o prepare.o proclang.o progress.o publicationcmds.o \
schemacmds.o seclabel.o sequence.o statscmds.o subscriptioncmds.o \
tablecmds.o tablespace.o trigger.o tsearchcmds.o typecmds.o user.o \
vacuum.o vacuumlazy.o variable.o view.o
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 9359d0a..1a86b4d 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -60,7 +60,6 @@ static void ExplainOneQuery(Query *query, int cursorOptions,
static void report_triggers(ResultRelInfo *rInfo, bool show_relname,
ExplainState *es);
static double elapsed_time(instr_time *starttime);
-static bool ExplainPreScanNode(PlanState *planstate, Bitmapset **rels_used);
static void ExplainNode(PlanState *planstate, List *ancestors,
const char *relationship, const char *plan_name,
ExplainState *es);
@@ -784,7 +783,7 @@ elapsed_time(instr_time *starttime)
* This ensures that we don't confusingly assign un-suffixed aliases to RTEs
* that never appear in the EXPLAIN output (such as inheritance parents).
*/
-static bool
+bool
ExplainPreScanNode(PlanState *planstate, Bitmapset **rels_used)
{
Plan *plan = planstate->plan;
diff --git a/src/backend/commands/progress.c b/src/backend/commands/progress.c
new file mode 100644
index 0000000..a042324
--- /dev/null
+++ b/src/backend/commands/progress.c
@@ -0,0 +1,2189 @@
+/*
+ * progress.c
+ * Monitor progression of request: PROGRESS
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/commands/monitor.c
+ */
+
+#include "postgres.h"
+
+#include <signal.h>
+#include <unistd.h>
+#include <sys/stat.h>
+
+#include "nodes/nodes.h"
+#include "tcop/dest.h"
+#include "tcop/pquery.h"
+#include "catalog/pg_type.h"
+#include "nodes/extensible.h"
+#include "nodes/nodeFuncs.h"
+#include "parser/parsetree.h"
+#include "executor/progress.h"
+#include "access/xact.h"
+#include "storage/ipc.h"
+#include "storage/procarray.h"
+#include "storage/lmgr.h"
+#include "storage/latch.h"
+#include "storage/procsignal.h"
+#include "storage/backendid.h"
+#include "executor/execdesc.h"
+#include "executor/executor.h"
+#include "executor/hashjoin.h"
+#include "executor/execParallel.h"
+#include "commands/defrem.h"
+#include "access/relscan.h"
+#include "access/parallel.h"
+#include "utils/memutils.h"
+#include "utils/lsyscache.h"
+#include "utils/builtins.h"
+#include "utils/json.h"
+#include "utils/tuplesort.h"
+#include "utils/tuplestore.h"
+#include "storage/buffile.h"
+#include "utils/ruleutils.h"
+#include "postmaster/bgworker_internals.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "funcapi.h"
+#include "pgstat.h"
+
+static int log_stmt = 1; /* log query monitored */
+static int debug = 1;
+
+/*
+ * Monitoring progress waits 5secs for monitored backend response.
+ *
+ * If this timeout is too short, it may not leave enough time for monotored backend to dump its
+ * progression about the SQL query it is running.
+ *
+ * If this timeout is too long, a cancelled SQL query in a backend could block the monitoring
+ * backend too for a longi time.
+ */
+unsigned short PROGRESS_TIMEOUT = 10;
+unsigned short PROGRESS_TIMEOUT_CHILD = 5;
+char* progress_backend_timeout = "<backend timeout>";
+
+/*
+ * Backend type (single worker, parallel main worker, parallel child worker
+ */
+#define SINGLE_WORKER 0
+#define MAIN_WORKER 1
+#define CHILD_WORKER 2
+
+/*
+ * Number of colums for pg_progress SQL function
+ */
+#define PG_PROGRESS_COLS 9
+
+#define PG_PROGRESS_PID 9
+#define PG_PROGRESS_BID 9
+#define PG_PROGRESS_LINEID 9
+#define PG_PROGRESS_INDENT 9
+#define PG_PROGRESS_TYPE 65
+#define PG_PROGRESS_NAME 257
+#define PG_PROGRESS_VALUE 257
+#define PG_PROGRESS_UNIT 9
+
+/*
+ * Progress node type
+ */
+#define PROP "property"
+#define NODE "node"
+#define RELATIONSHIP "relationship"
+
+/*
+ * Units for reports
+ */
+#define NO_UNIT ""
+#define BLK_UNIT "block"
+#define ROW_UNIT "row"
+#define PERCENT_UNIT "percent"
+#define SECOND_UNIT "second"
+#define BYTE_UNIT "byte"
+#define KBYTE_UNIT "KByte"
+#define MBYTE_UNIT "MByte"
+#define GBYTE_UNIT "Gbyte"
+#define TBYTE_UNIT "TByte"
+
+/*
+ * Verbosity report level
+ */
+#define VERBOSE_DISK_USE 1
+#define VERBOSE_ROW_SCAN 2
+#define VERBOSE_INDEX_SCAN 2
+#define VERBOSE_BUFFILE 2
+#define VERBOSE_HASH_JOIN 2
+#define VERBOSE_HASH_JOIN_DETAILED 3
+#define VERBOSE_TAPES 2
+#define VERBOSE_TAPES_DETAILED 3
+#define VERBOSE_TIME_REPORT 1
+#define VERBOSE_STACK 3
+
+/*
+ * Report only SQL querries which have been running longer than this value
+ */
+int progress_time_threshold = 3;
+
+/*
+ * One ProgressCtl is allocated for each backend process which is to be potentially monitored
+ * The array of progress_ctl structures is protected by ProgressLock global lock.
+ *
+ * Only one backend can be monitored at a time. This may be improved with a finer granulary
+ * using a LWLock tranche of MAX_NR_BACKENDS locks. In which case, one backend can be monitored
+ * independantly of the otther backends.
+ *
+ * The LWLock ensure that one backend can be only monitored by one other backend at a time.
+ * Other backends trying to monitor an already monitered backend will be put in
+ * queue of the LWWlock.
+ */
+typedef struct ProgressCtl {
+ bool verbose; /* be verbose */
+
+ bool parallel; /* true if parallel query */
+ bool child; /* true if child worker, false if main worker */
+ unsigned int child_indent; /* Indentation based value for child worker */
+
+ unsigned long disk_size; /* Disk size in bytes used by the backend for sorts, stores, hashes */
+
+ char* buf; /* progress status report in shm */
+ int pid[MAX_PARALLEL_WORKER_LIMIT];
+
+ struct Latch* latch; /* Used by requestor to wait for backend to complete its report */
+} ProgressCtl;
+
+struct ProgressCtl* progress_ctl_array; /* Array of MaxBackends ProgressCtl */
+char* dump_buf_array; /* SHMEM buffers one for each backend */
+struct Latch* resp_latch_array; /* Array of MaxBackends latches to synchronize response
+ * from monitored backend to monitoring backend */
+
+typedef struct ProgressState {
+ bool parallel; /* true if parallel query */
+ bool child; /* true if parallel and child backend */
+
+ int pid; /* pid of backend of child worker if parallel */
+ int ppid; /* pid of parent worker */
+ int bid;
+
+ /*
+ * State for output formating
+ */
+ int indent; /* current indentation level */
+ int lineid; /* needed for indentation */
+ bool verbose; /* be verbose */
+ StringInfo str; /* output buffer */
+
+ List* grouping_stack; /* format-specific grouping state */
+
+ MemoryContext memcontext;
+
+ /*
+ * State related to current plan/execution tree
+ */
+ PlannedStmt* pstmt;
+ struct Plan* plan;
+ struct PlanState* planstate;
+ List* rtable;
+ List* rtable_names;
+ List* deparse_cxt; /* context list for deparsing expressions */
+ EState* es; /* Top level data */
+ Bitmapset* printed_subplans; /* ids of SubPlans we've printed */
+
+ unsigned long disk_size; /* track on disk use for sorts, stores, and hashes */
+} ProgressState;
+
+/*
+ * No progress request unless requested.
+ */
+volatile bool progress_requested = false;
+
+/*
+ * local functions
+ */
+static void ProgressPlan(QueryDesc* query, ProgressState* ps);
+static void ProgressNode(PlanState* planstate, List* ancestors,
+ const char* relationship, const char* plan_name, ProgressState* ps);
+
+static ProgressState* CreateProgressState(void);
+static void ProgressIndent(ProgressState* ps);
+static void ProgressUnindent(ProgressState* ps);
+
+static void ProgressPid(int pid, int ppid, int verbose, Tuplestorestate* tupstore,
+ TupleDesc tupdesc, char* buf);
+static void ProgressSpecialPid(int pid, int bid, Tuplestorestate* tupstore, TupleDesc tupdesc, char* buf);
+
+static bool ReportHasChildren(Plan* plan, PlanState* planstate);
+
+/*
+ * Individual nodes of interest are:
+ * - scan data: for heap or index
+ * - sort data: for any relation or tuplestore
+ * Other nodes only wait on above nodes
+ */
+static void ProgressGather(GatherState* gs, ProgressState* ps);
+static void ProgressGatherMerge(GatherMergeState* gs, ProgressState* ps);
+static void ProgressParallelExecInfo(ParallelContext* pc, ProgressState* ps);
+
+static void ProgressScanBlks(ScanState* ss, ProgressState* ps);
+static void ProgressScanRows(Scan* plan, PlanState* plantstate, ProgressState* ps);
+static void ProgressTidScan(TidScanState* ts, ProgressState* ps);
+static void ProgressCustomScan(CustomScanState* cs, ProgressState* ps);
+static void ProgressIndexScan(IndexScanState* is, ProgressState* ps);
+
+static void ProgressLimit(LimitState* ls, ProgressState* ps);
+static void ProgressModifyTable(ModifyTableState * planstate, ProgressState* ps);
+static void ProgressHashJoin(HashJoinState* planstate, ProgressState* ps);
+static void ProgressHash(HashState* planstate, ProgressState* ps);
+static void ProgressHashJoinTable(HashJoinTable hashtable, ProgressState* ps);
+static void ProgressBufFileRW(BufFile* bf, ProgressState* ps, unsigned long *reads,
+ unsigned long * writes, unsigned long *disk_size);
+static void ProgressBufFile(BufFile* bf, ProgressState* ps);
+static void ProgressMaterial(MaterialState* planstate, ProgressState* ps);
+static void ProgressTupleStore(Tuplestorestate* tss, ProgressState* ps);
+static void ProgressAgg(AggState* planstate, ProgressState* ps);
+static void ProgressSort(SortState* ss, ProgressState* ps);
+static void ProgressTupleSort(Tuplesortstate* tss, ProgressState* ps);
+static void dumpTapes(struct ts_report* tsr, ProgressState* ps);
+
+//extern void ReportText(const char* label, const char* value, ReportState* rpt);
+//extern void ReportTextNoNewLine(const char* label, const char* value, ReportState* rpt);
+
+static void ReportTime(QueryDesc* query, ProgressState* ps);
+static void ReportStack(ProgressState* ps);
+static void ReportDisk(ProgressState* ps);
+
+static void ProgressDumpRequest(int pid);
+static void ProgressResetRequest(ProgressCtl* req);
+
+static void ProgressPropLong(ProgressState* ps, const char* type,
+ const char* name, unsigned long value, const char* unit);
+static void ProgressPropText(ProgressState* ps, const char* type,
+ const char* name, const char* value);
+static void ProgressPropTextStr(StringInfo str, int pid, int bid,
+ int lineid, int indent, const char* type, const char* name, const char* value);
+
+
+
+Size ProgressShmemSize(void)
+{
+ Size size;
+
+ /* Must match ProgressShmemInit */
+ size = mul_size(MaxBackends, sizeof(ProgressCtl));
+ size = add_size(size, mul_size(MaxBackends, PROGRESS_AREA_SIZE));
+ size = add_size(size, mul_size(MaxBackends, sizeof(struct Latch)));
+
+ return size;
+}
+
+/*
+ * Initialize our shared memory area
+ */
+void ProgressShmemInit(void)
+{
+ bool found;
+ size_t size = 0;
+
+ /*
+ * Allocated shared latches for response to progress request
+ */
+ size = mul_size(MaxBackends, sizeof(struct Latch));
+ resp_latch_array = ShmemInitStruct("Progress latches", size, &found);
+ if (!found) {
+ int i;
+ struct Latch* l;
+
+ l = resp_latch_array;
+ for (i = 0; i < MaxBackends; i++) {
+ InitSharedLatch(l);
+ l++;
+ }
+ }
+
+ /*
+ * Allocate SHMEM buffers for backend communication
+ */
+ size = MaxBackends * PROGRESS_AREA_SIZE;
+ dump_buf_array = (char*) ShmemInitStruct("Backend Dump Pages", size, &found);
+ if (!found) {
+ memset(dump_buf_array, 0, size);
+ }
+
+ /*
+ * Allocate progress request meta data, one for each backend
+ */
+ size = mul_size(MaxBackends, sizeof(ProgressCtl));
+ progress_ctl_array = ShmemInitStruct("ProgressCtl array", size, &found);
+ if (!found) {
+ int i;
+ ProgressCtl* req;
+ struct Latch* latch;
+
+ req = progress_ctl_array;
+ latch = resp_latch_array;
+ for (i = 0; i < MaxBackends; i++) {
+ /* Already zeroed above */
+ memset(req, 0, sizeof(ProgressCtl));
+
+ /* set default value */
+ req->latch = latch;
+ req->buf = dump_buf_array + i * PROGRESS_AREA_SIZE;
+ req->parallel = false;
+ req->child = false;
+ req->child_indent = 0;
+ req->disk_size = 0;
+ req->verbose = 0;
+ memset(req->pid, 0, sizeof(int) * MAX_PARALLEL_WORKER_LIMIT);
+ req++;
+ latch++;
+ }
+ }
+
+ return;
+}
+
+/*
+ * Dump request management
+ */
+static
+void ProgressDumpRequest(int pid)
+{
+ int bid;
+ ProgressCtl* req;
+
+ bid = ProcPidGetBackendId(pid);
+ req = progress_ctl_array + bid;
+ elog(LOG, "backend pid=%d bid=%d verbose=%d, parallel=%d child= %d indent=%d",
+ pid, bid, req->verbose, req->parallel, req->child, req->child_indent);
+}
+
+static
+void ProgressResetRequest(ProgressCtl* req)
+{
+ elog(LOG, "reset progress request at addr %p", req);
+
+ req->parallel = false;
+ req->child = false;
+ req->child_indent = 0;
+ req->disk_size = 0;
+ req->verbose = 0;
+
+ InitSharedLatch(req->latch);
+
+ memset(req->buf, 0, PROGRESS_AREA_SIZE);
+ memset(req->pid, 0, sizeof(int) * MAX_PARALLEL_WORKER_LIMIT);
+}
+
+/*
+ * Report of rows in pg_progress tables
+ */
+static
+void ProgressPropLong(ProgressState* ps,
+ const char* type, const char* name, unsigned long value, const char* unit)
+{
+ /*
+ * Fields are: pid, lineid, indent, name, value, unit
+ */
+ char pid_str[PG_PROGRESS_PID];
+ char bid_str[PG_PROGRESS_BID];
+ char lineid_str[PG_PROGRESS_LINEID];
+ char indent_str[PG_PROGRESS_INDENT];
+ char value_str[PG_PROGRESS_VALUE];
+
+ sprintf(pid_str, "%d", ps->pid);
+ sprintf(bid_str, "%d", ps->bid);
+ sprintf(lineid_str, "%d", ps->lineid);
+ sprintf(indent_str, "%d", ps->indent);
+ sprintf(value_str, "%lu", value);
+
+ elog(LOG, "ProgressPropLong PID_STR = %s", pid_str);
+ appendStringInfo(ps->str, "%s|%s|%s|%s|%s|%s|%s|%s|",
+ pid_str, bid_str, lineid_str, indent_str, type, name, value_str, unit);
+
+ ps->lineid++;
+}
+
+static
+void ProgressPropText(ProgressState* ps,
+ const char* type, const char* name, const char* value)
+{
+ /*
+ * Fields are: pid, lineid, indent, name, value, unit
+ */
+ char pid_str[PG_PROGRESS_PID];
+ char bid_str[PG_PROGRESS_BID];
+ char lineid_str[PG_PROGRESS_LINEID];
+ char indent_str[PG_PROGRESS_INDENT];
+
+ sprintf(pid_str, "%d", ps->pid);
+ sprintf(bid_str, "%d", ps->bid);
+ sprintf(lineid_str, "%d", ps->lineid);
+ sprintf(indent_str, "%d", ps->indent);
+
+ elog(LOG, "ProgressPropText PID_STR = %s", pid_str);
+ appendStringInfo(ps->str, "%s|%s|%s|%s|%s|%s|%s||",
+ pid_str, bid_str, lineid_str, indent_str, type, name, value);
+
+ ps->lineid++;
+}
+
+static
+void ProgressPropTextStr(StringInfo str, int pid, int bid, int lineid,
+ int indent, const char* type, const char* name, const char* value)
+{
+ elog(LOG, "ProgressPropTextStr PID_STR = %d", pid);
+ appendStringInfo(str, "%d|%d|%d|%d|%s|%s|%s||",
+ pid, bid, lineid, indent, type, name, value);
+}
+
+static
+void ProgressResetReport(
+ ProgressState* ps)
+{
+ resetStringInfo(ps->str);
+}
+
+/*
+ * Colums are: pid, lineid, indent, property, value, unit
+ */
+Datum pg_progress(PG_FUNCTION_ARGS)
+{
+ int pid;
+ char* buf;
+
+ unsigned short verbose;
+
+ Datum values[PG_PROGRESS_COLS];
+ bool nulls[PG_PROGRESS_COLS];
+ TupleDesc tupdesc;
+ Tuplestorestate* tupstore;
+ ReturnSetInfo* rsinfo;
+
+ ProgressCtl* req;
+ ProgressCtl* child_req;
+ int pid_index;
+ int child_pid;
+
+ BackendId child_bid;
+
+ int num_backends;
+ int curr_backend;
+
+ MemoryContext per_query_ctx;
+ MemoryContext oldcontext;
+
+
+ if (debug)
+ elog(LOG, "Start of pg_progress");
+
+ /*
+ * pid = 0 means collect progress report for all backends
+ */
+ pid = PG_ARGISNULL(0) ? 0 : PG_GETARG_INT32(0);
+ verbose = PG_ARGISNULL(0) ? false : PG_GETARG_UINT16(1);
+ if (debug)
+ elog(LOG, "pid = %d, verbose = %d", pid, verbose);
+
+ /*
+ * Build a tuple descriptor for our result type
+ */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) {
+ elog(ERROR, "return type must be a row type");
+ }
+
+ /*
+ * Switch to query memory context
+ */
+ rsinfo = (ReturnSetInfo*) fcinfo->resultinfo;
+ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+ oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+ tupstore = tuplestore_begin_heap(true, false, work_mem);
+ rsinfo->returnMode = SFRM_Materialize;
+ rsinfo->setResult = tupstore;
+ rsinfo->setDesc = tupdesc;
+
+ MemSet(values, 0, sizeof(values));
+ MemSet(nulls, 0, sizeof(nulls));
+
+ /* Allocate buf for local work */
+ buf = palloc0(PROGRESS_AREA_SIZE);
+
+ if (pid > 0) {
+ /* Target specific pid given as SQL function argupment */
+ ProgressPid(pid, 0, verbose, tupstore, tupdesc, buf);
+ } else {
+ /* Loop over all backends */
+ num_backends = pgstat_fetch_stat_numbackends();
+ elog(LOG, "Num backends = %d", num_backends);
+ for (curr_backend = 1; curr_backend <= num_backends; curr_backend++) {
+ LocalPgBackendStatus* local_beentry;
+ PgBackendStatus* beentry;
+ TimestampTz delta;
+ BackendType backend_type;
+ long secs;
+ int usecs;
+
+ elog(LOG, "LOOP on backend %d", curr_backend);
+ local_beentry = pgstat_fetch_stat_local_beentry(curr_backend);
+ if (!local_beentry) {
+ char lbuf[] = "<backend information not available>";
+
+ ProgressSpecialPid(0, curr_backend, tupstore, tupdesc, lbuf);
+ continue;
+ }
+
+ beentry = &local_beentry->backendStatus;
+ pid = beentry->st_procpid;
+ backend_type = beentry->st_backendType;
+
+ /*
+ * Do not monitor oneself
+ */
+ if (pid == getpid()) {
+ // ProgressSpecialPid(pid, curr_backend, tupstore, tupdesc, "<self backend>");
+ continue;
+ }
+
+ if (backend_type != B_BACKEND) {
+ // ProgressSpecialPid(pid, curr_backend, tupstore, tupdesc, "<not a standard backend>");
+ continue;
+ }
+
+ /*
+ * Do not report SQL queries as long as they have not run for threshold amount of time
+ */
+ delta = GetCurrentTimestamp() - beentry->st_proc_start_timestamp;
+ TimestampDifference(beentry->st_activity_start_timestamp, GetCurrentTimestamp(), &secs, &usecs);
+ elog(LOG, "Current %lu Start %lu delta %lu secs %ld",
+ GetCurrentTimestamp(), beentry->st_proc_start_timestamp, delta, secs);
+
+ if (secs < progress_time_threshold) {
+ char lbuf[] = "<backend has not run long enough>";
+
+ ProgressSpecialPid(pid, curr_backend, tupstore, tupdesc, lbuf);
+ continue;
+ }
+
+ elog(LOG, "PROGRESS on backend pid %d bid %d", pid, curr_backend);
+
+ /*
+ * We need to make sure the buffer is wiped out.
+ * Otherwise previous content may reappear in the the rows output
+ */
+ memset(buf, 0, PROGRESS_AREA_SIZE);
+ req = progress_ctl_array + curr_backend;
+ ProgressResetRequest(req);
+ ProgressPid(pid, 0, verbose, tupstore, tupdesc, buf);
+
+ /*
+ * Check for child pid of current pid
+ */
+ pid_index = 0;
+ while (req->pid[pid_index] != 0) {
+ memset(buf, 0, PROGRESS_AREA_SIZE);
+ child_pid = req->pid[pid_index];
+ child_bid = ProcPidGetBackendId(child_pid);
+
+ child_req = progress_ctl_array + child_bid;
+ child_req->parallel = true;
+ child_req->child = true;
+ child_req->child_indent = req->child_indent;
+
+ elog(LOG, "COLLECT CHILD DATA");
+ ProgressDumpRequest(pid);
+ ProgressDumpRequest(child_pid);
+
+ ProgressPid(child_pid, pid, verbose, tupstore, tupdesc, buf);
+ ProgressResetRequest(child_req);
+
+ pid_index++;
+ }
+
+ ProgressResetRequest(req);
+ ProgressDumpRequest(pid);
+ }
+ }
+
+ tuplestore_donestoring(tupstore);
+
+ pfree(buf);
+ MemoryContextSwitchTo(oldcontext);
+
+ return (Datum) 0;
+}
+
+static
+void ProgressPid(int pid, int ppid, int verbose, Tuplestorestate* tupstore, TupleDesc tupdesc, char* buf)
+{
+ int bid;
+
+ Datum values[PG_PROGRESS_COLS];
+ bool nulls[PG_PROGRESS_COLS];
+
+ char pid_str[PG_PROGRESS_PID];
+ int pid_val;
+
+ char bid_str[PG_PROGRESS_BID];
+ int bid_val;
+
+ char lineid_str[PG_PROGRESS_LINEID];
+ int lineid_val;
+
+ char indent_str[PG_PROGRESS_INDENT];
+ int indent_val;
+
+ char type_str[PG_PROGRESS_TYPE];
+ char name_str[PG_PROGRESS_NAME];
+ char value_str[PG_PROGRESS_VALUE];
+ char unit_str[PG_PROGRESS_UNIT];
+
+ char* token_start;
+ char* token_next;
+ unsigned short token_length;
+ unsigned short total_length;
+ int i;
+
+ /* Convert pid to backend_id */
+ bid = ProcPidGetBackendId(pid);
+ if (bid == InvalidBackendId) {
+ ereport(ERROR, (
+ errcode(ERRCODE_INTERVAL_FIELD_OVERFLOW),
+ errmsg("Invalid backend process pid")));
+ }
+
+ if (pid == getpid()) {
+ ereport(ERROR, (
+ errcode(ERRCODE_INTERVAL_FIELD_OVERFLOW),
+ errmsg("Cannot request status from self")));
+ }
+
+ if (debug)
+ elog(LOG, "pid = %d, bid = %d", pid, bid);
+
+ ProgressFetchReport(pid, bid, verbose, buf);
+
+ /*
+ * Setup tuple store
+ */
+ if (debug) {
+ elog(LOG, "setting up tuplestore");
+ elog(LOG, "buffer content: %s", buf);
+ }
+
+ //ProgressTupstoreFill(buf);
+ token_start = buf;
+ token_next = buf;
+
+ token_next = strchr(token_start, '|');
+ total_length = strlen(buf);
+
+ i = 0;
+
+ while (token_start != NULL) {
+ token_length = (unsigned short)(token_next - token_start);
+
+ switch(i) {
+ case 0:
+ snprintf(pid_str, token_length + 1, "%s", token_start);
+ pid_str[token_length + 1] = '\0';
+ break;
+ case 1:
+ snprintf(bid_str, token_length + 1, "%s", token_start);
+ bid_str[token_length + 1] = '\0';
+ break;
+ case 2:
+ snprintf(lineid_str, token_length + 1, "%s", token_start);
+ lineid_str[token_length + 1] = '\0';
+ break;
+ case 3:
+ snprintf(indent_str, token_length + 1, "%s", token_start);
+ indent_str[token_length + 1] = '\0';
+ break;
+ case 4:
+ snprintf(type_str, token_length + 1, "%s", token_start);
+ type_str[token_length + 1] = '\0';
+ break;
+ case 5:
+ snprintf(name_str, token_length + 1, "%s", token_start);
+ name_str[token_length + 1] = '\0';
+ break;
+ case 6:
+ snprintf(value_str, token_length + 1, "%s", token_start);
+ value_str[token_length + 1] = '\0';
+ break;
+ case 7:
+ snprintf(unit_str, token_length + 1, "%s", token_start);
+ unit_str[token_length + 1] = '\0';
+ break;
+ };
+
+ i++;
+
+ if (i == 8) {
+ /* PK */
+ pid_val = atoi(pid_str);
+ values[0] = Int32GetDatum(pid_val);
+ nulls[0] = false;
+
+ values[1] = Int32GetDatum(ppid);
+ nulls[1] = false;
+
+ /* PK */
+ bid_val = atoi(bid_str);
+ values[2] = Int32GetDatum(bid_val);
+ nulls[2] = false;
+
+ /* PK */
+ lineid_val = atoi(lineid_str);
+ values[3] = Int32GetDatum(lineid_val);
+ nulls[3] = false;
+
+ /* PK */
+ indent_val = atoi(indent_str);
+ values[4] = Int32GetDatum(indent_val);
+ nulls[4] = false;
+
+ /* PK */
+ values[5] = CStringGetTextDatum(type_str);
+ nulls[5] = false;
+
+ /* PK */
+ values[6] = CStringGetTextDatum(name_str);
+ nulls[6] = false;
+
+ if (strlen(value_str) == 0) {
+ nulls[7] = true;
+ } else {
+ values[7] = CStringGetTextDatum(value_str);
+ nulls[7] = false;
+ }
+
+ if (strlen(unit_str) == 0) {
+ nulls[8] = true;
+ } else {
+ values[8] = CStringGetTextDatum(unit_str);
+ nulls[8] = false;
+ }
+
+ tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+ i = 0;
+ }
+
+ token_start = token_next + 1;
+ if (token_start >= buf + total_length)
+ break;
+
+ token_next = strchr(token_next + 1,'|');
+ }
+}
+
+static
+void ProgressSpecialPid(int pid, int bid, Tuplestorestate* tupstore, TupleDesc tupdesc, char* buf)
+{
+ Datum values[PG_PROGRESS_COLS];
+ bool nulls[PG_PROGRESS_COLS];
+
+ values[0] = Int32GetDatum(pid);
+ nulls[0] = false;
+
+ values[1] = Int32GetDatum(0);
+ nulls[1] = false;
+
+ values[2] = Int32GetDatum(bid);
+ nulls[2] = false;
+
+ values[3] = Int32GetDatum(0);
+ nulls[3] = false;
+
+ values[4] = Int32GetDatum(0);
+ nulls[4] = false;
+
+ values[5] = CStringGetTextDatum(PROP);
+ nulls[5] = false;
+
+ values[6] = CStringGetTextDatum(buf);
+ nulls[6] = false;
+
+ nulls[7] = true;
+ nulls[8] = true;
+
+ tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+}
+
+/*
+ * ProgressFetchReport:
+ * Log a request to a backend in order to fetch its progress log
+ */
+void ProgressFetchReport(int pid, int bid, int verbose, char* buf)
+{
+ ProgressCtl* req;
+ unsigned int buf_len = 0;
+ unsigned int str_len = 0;
+ StringInfo str;
+ unsigned short progress_did_timeout = 0;
+
+ if (debug)
+ elog(LOG, "Start of ProgressFetchReport with pid = %d, bid = %d", pid, bid);
+
+ /*
+ * Serialize signals/request to get the progress state of the query
+ */
+ LWLockAcquire(ProgressLock, LW_EXCLUSIVE);
+
+ req = progress_ctl_array + bid;
+ req->verbose = verbose;
+ ProgressDumpRequest(pid);
+
+ OwnLatch(req->latch);
+ ResetLatch(req->latch);
+
+ SendProcSignal(pid, PROCSIG_PROGRESS, bid);
+ if (debug)
+ elog(LOG, "waiting on latch");
+
+ WaitLatch(req->latch, WL_LATCH_SET | WL_TIMEOUT, PROGRESS_TIMEOUT * 1000L, WAIT_EVENT_PROGRESS);
+ DisownLatch(req->latch);
+ if (debug)
+ elog(LOG, "finish latch wait");
+
+ /* Fetch result and clear SHM buffer */
+ if (strlen(req->buf) == 0) {
+ /* We have timed out on PROGRESS_TIMEOUT */
+ progress_did_timeout = 1;
+ str = makeStringInfo();
+ ProgressPropTextStr(str, pid, bid, 0, 0, PROP, "status", progress_backend_timeout);
+ str_len = strlen(str->data);
+ memcpy(buf, str->data, str_len);
+ } else {
+ /* We have a result computed by the monitored backend */
+ buf_len = strlen(req->buf);
+ memcpy(buf, req->buf, buf_len);
+ }
+
+ if (debug)
+ elog(LOG, "buf: %s", buf);
+
+ /*
+ * Clear shm buffer
+ */
+ memset(req->buf, 0, PROGRESS_AREA_SIZE);
+ if (debug)
+ elog(LOG, "cleared shm buf");
+
+ /*
+ * End serialization
+ */
+ LWLockRelease(ProgressLock);
+
+ if (progress_did_timeout) {
+ ereport(ERROR, (
+ errcode(ERRCODE_INTERVAL_FIELD_OVERFLOW),
+ errmsg("timeout to get response")));
+ }
+}
+
+static
+ProgressState* CreateProgressState(void)
+{
+ StringInfo str;
+ ProgressState* prg;
+
+ str = makeStringInfo();
+
+ prg = (ProgressState*) palloc0(sizeof(ProgressState));
+ prg->parallel = false;
+ prg->child = false;
+ prg->str = str;
+ prg->indent = 0;
+ prg->rtable = NULL;
+ prg->plan = NULL;
+ prg->pid = 0;
+ prg->lineid = 0;
+
+ return prg;
+}
+
+static
+void ProgressIndent(ProgressState* ps)
+{
+ ps->indent++;
+}
+
+static
+void ProgressUnindent(ProgressState* ps)
+{
+ ps->indent--;
+}
+
+/*
+ * Request handling
+ */
+void HandleProgressSignal(void)
+{
+ progress_requested = true;
+ InterruptPending = true;
+}
+
+void HandleProgressRequest(void)
+{
+ ProgressCtl* req;
+ ProgressState* ps;
+
+ MemoryContext oldcontext;
+ MemoryContext progress_context;
+
+ unsigned short running = 0;
+ char* shmBufferTooShort = "shm buffer is too small";
+ bool child = false;
+
+ if (debug)
+ elog(LOG, "request received");
+
+ /*
+ * We hold interrupt here because the current SQL query could be cancelled at any time. In which
+ * case, the current backend would not call SetLatch(). Monitoring backend would wait endlessly.
+ *
+ * To avoid such situation, a further safety measure has been added: the monitoring backend waits
+ * the response for a maximum of PROGRESS_TIMEOUT time. After this timeout has expired, the monitoring
+ * backend sends back the respponse which is empty.
+ *
+ * The current backend could indeed be interrupted before the HOLD_INTERRUPTS() is reached.
+ */
+ HOLD_INTERRUPTS();
+
+ progress_context = AllocSetContextCreate(CurrentMemoryContext,
+ "ProgressState", ALLOCSET_DEFAULT_SIZES);
+ oldcontext = MemoryContextSwitchTo(progress_context);
+
+ ps = CreateProgressState();
+ ps->memcontext = progress_context; // TODO: remove this useless field
+
+ Assert(ps != NULL);
+ Assert(ps->str != NULL);
+ Assert(ps->str->data != NULL);
+
+ req = progress_ctl_array + MyBackendId;
+ memset(req->pid, 0, sizeof(int) * MAX_PARALLEL_WORKER_LIMIT);
+
+ ps->verbose = req->verbose;
+ ps->parallel = req->parallel;
+ ps->child = req->child;
+ ps->indent = req->child_indent;
+ ps->disk_size = 0;
+ ps->pid = getpid();
+ ps->bid = MyBackendId;
+
+ /* Reset the buffer */
+ memset(req->buf, 0, PROGRESS_AREA_SIZE);
+
+ /* Local params */
+ child = req->child;
+
+ if (debug)
+ ProgressDumpRequest(getpid());
+
+ /*
+ * Only clear previous content of ps->str
+ */
+ ProgressResetReport(ps);
+
+ if (MyQueryDesc == NULL) {
+ ProgressPropText(ps, PROP, "status", "<idle backend>");
+ } else if (!IsTransactionState()) {
+ ProgressPropText(ps, PROP, "status", "<out of transaction>");
+ } else if (MyQueryDesc->plannedstmt == NULL) {
+ ProgressPropText(ps, PROP, "status", "<NULL planned statement>");
+ } else if (MyQueryDesc->plannedstmt->commandType == CMD_UTILITY) {
+ ProgressPropText(ps, PROP, "status", "<utility statement>");
+ } else if (MyQueryDesc->already_executed == false) {
+ ProgressPropText(ps, PROP, "status", "<query not yet started>");
+ } else if (QueryCancelPending) {
+ ProgressPropText(ps, PROP, "status", "<query cancel pending>");
+ } else if (RecoveryConflictPending) {
+ ProgressPropText(ps, PROP, "status", "<recovery conflict pending>");
+ } else if (ProcDiePending) {
+ ProgressPropText(ps, PROP, "status", "<proc die pending>");
+ } else {
+ running = 1;
+ if (!child)
+ ProgressPropText(ps, PROP, "status", "<query running>");
+ }
+
+ if (log_stmt && !child && running) {
+ if (MyQueryDesc != NULL && MyQueryDesc->sourceText != NULL)
+ ProgressPropText(ps, PROP, "query", MyQueryDesc->sourceText);
+ }
+
+ if (running) {
+ if (!child)
+ ReportTime(MyQueryDesc, ps);
+
+ if (ps->verbose > 0) {
+ ReportStack(ps);
+ }
+
+ ProgressPlan(MyQueryDesc, ps);
+ if (!child && ps->verbose > VERBOSE_DISK_USE)
+ ReportDisk(ps); /* must come after ProgressPlan() */
+ }
+
+ /*
+ * Dump in SHM the string buffer content
+ */
+ if (strlen(ps->str->data) < PROGRESS_AREA_SIZE) {
+ /* Mind the '\0' char at the end of the string */
+ memcpy(req->buf, ps->str->data, strlen(ps->str->data) + 1);
+ } else {
+ memcpy(req->buf, shmBufferTooShort, strlen(shmBufferTooShort));
+ elog(LOG, "Needed size for buffer %d", (int) strlen(ps->str->data));
+ }
+
+ /* Dump disk size used for stores, sorts, and hashes */
+ req->disk_size = ps->disk_size;
+
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextDelete(ps->memcontext);
+
+ if (debug)
+ elog(LOG, "setting latch");
+
+ /* Notify of progress state delivery */
+ SetLatch(req->latch);
+ RESUME_INTERRUPTS();
+}
+
+static
+void ProgressPlan(
+ QueryDesc* query,
+ ProgressState* ps)
+{
+ Bitmapset* rels_used = NULL;
+ PlanState* planstate;
+
+ /*
+ * Set up ProgressState fields associated with this plan tree
+ */
+ Assert(query->plannedstmt != NULL);
+
+ /* Top level tree data */
+ if (query->plannedstmt != NULL)
+ ps->pstmt = query->plannedstmt;
+
+ if (query->plannedstmt->planTree != NULL)
+ ps->plan = query->plannedstmt->planTree;
+
+ if (query->planstate != NULL)
+ ps->planstate = query->planstate;
+
+ if (query->estate != NULL)
+ ps->es = query->estate;
+
+ if (query->plannedstmt->rtable != NULL)
+ ps->rtable = query->plannedstmt->rtable;
+
+ ExplainPreScanNode(query->planstate, &rels_used);
+
+ ps->rtable_names = select_rtable_names_for_explain(ps->rtable, rels_used);
+ ps->deparse_cxt = deparse_context_for_plan_rtable(ps->rtable, ps->rtable_names);
+ ps->printed_subplans = NULL;
+
+ planstate = query->planstate;
+ if (IsA(planstate, GatherState) && ((Gather*) planstate->plan)->invisible) {
+ planstate = outerPlanState(planstate);
+ }
+
+ if (ps->parallel && ps->child)
+ ProgressNode(planstate, NIL, "child worker", NULL, ps);
+ else
+ ProgressNode(planstate, NIL, NULL, NULL, ps);
+}
+
+/*
+ * This is the main workhorse for collecting query execution progress.
+ *
+ * planstate is the current execution state in the global execution tree
+ * relationship: describes the relationship of this plan state to its parent
+ * "outer", "inner". It is null at tol level.
+ */
+static
+void ProgressNode(
+ PlanState* planstate,
+ List* ancestors,
+ const char* relationship,
+ const char* plan_name,
+ ProgressState* ps)
+{
+ Plan* plan = planstate->plan;
+ PlanInfo info;
+ bool haschildren;
+ int ret;
+
+ if (debug)
+ elog(LOG, "=> %s", nodeToString(plan));
+
+ /*
+ * 1st step: display the node type
+ */
+ ret = planNodeInfo(plan, &info);
+ if (ret != 0) {
+ elog(LOG, "unknown node type for plan");
+ }
+
+ ProgressPropText(ps, RELATIONSHIP, "relationship", relationship != NULL ? relationship : "progression");
+ ProgressIndent(ps);
+
+ /*
+ * Report node top properties
+ */
+ if (info.pname)
+ ProgressPropText(ps, NODE, "node name", info.pname);
+
+ if (plan_name)
+ ProgressPropText(ps, NODE, "plan name", plan_name);
+
+ if (plan->parallel_aware)
+ ProgressPropText(ps, PROP, "node mode", "parallel");
+
+ /*
+ * Second step
+ */
+ switch(nodeTag(plan)) {
+ case T_SeqScan: // ScanState
+ case T_SampleScan: // ScanState
+ case T_BitmapHeapScan: // ScanState
+ case T_SubqueryScan: // ScanState
+ case T_FunctionScan: // ScanState
+ case T_ValuesScan: // ScanState
+ case T_CteScan: // ScanState
+ case T_WorkTableScan: // ScanState
+ ProgressScanRows((Scan*) plan, planstate, ps);
+ ProgressScanBlks((ScanState*) planstate, ps);
+ break;
+
+ case T_TidScan: // ScanState
+ ProgressTidScan((TidScanState*) planstate, ps);
+ ProgressScanBlks((ScanState*) planstate, ps);
+ break;
+
+ case T_Limit: // PlanState
+ ProgressLimit((LimitState*) planstate, ps);
+ break;
+
+ case T_ForeignScan: // ScanState
+ case T_CustomScan: // ScanState
+ ProgressCustomScan((CustomScanState*) planstate, ps);
+ ProgressScanRows((Scan*) plan, planstate, ps);
+ break;
+
+ case T_IndexScan: // ScanState
+ case T_IndexOnlyScan: // ScanState
+ case T_BitmapIndexScan: // ScanState
+ ProgressScanBlks((ScanState*) planstate, ps);
+ ProgressIndexScan((IndexScanState*) planstate, ps);
+ break;
+
+ case T_ModifyTable: // PlanState
+ /*
+ * Dealt below with mt_plans array of PlanState nodes
+ */
+ ProgressModifyTable((ModifyTableState *) planstate, ps);
+ break;
+
+ case T_NestLoop: // JoinState (includes a Planstate)
+ case T_MergeJoin: // JoinState (includes a Planstate)
+ /*
+ * Does not perform long ops. Only Join
+ */
+ break;
+
+ case T_HashJoin: { // JoinState (includes a Planstate)
+ /*
+ * uses a HashJoin with BufFile
+ */
+ const char* jointype;
+
+ switch (((Join*) plan)->jointype) {
+ case JOIN_INNER:
+ jointype = "Inner";
+ break;
+
+ case JOIN_LEFT:
+ jointype = "Left";
+ break;
+
+ case JOIN_FULL:
+ jointype = "Full";
+ break;
+
+ case JOIN_RIGHT:
+ jointype = "Right";
+ break;
+
+ case JOIN_SEMI:
+ jointype = "Semi";
+ break;
+
+ case JOIN_ANTI:
+ jointype = "Anti";
+ break;
+
+ default:
+ jointype = "???";
+ break;
+ }
+
+ ProgressPropText(ps, PROP, "join type", jointype);
+
+ }
+
+ ProgressHashJoin((HashJoinState*) planstate, ps);
+ break;
+
+ case T_SetOp: { // PlanState
+ /*
+ * Only uses a in memory hash table
+ */
+ const char* setopcmd;
+
+ switch (((SetOp*) plan)->cmd) {
+ case SETOPCMD_INTERSECT:
+ setopcmd = "Intersect";
+ break;
+
+ case SETOPCMD_INTERSECT_ALL:
+ setopcmd = "Intersect All";
+ break;
+
+ case SETOPCMD_EXCEPT:
+ setopcmd = "Except";
+ break;
+
+ case SETOPCMD_EXCEPT_ALL:
+ setopcmd = "Except All";
+ break;
+
+ default:
+ setopcmd = "???";
+ break;
+ }
+
+ ProgressPropText(ps, PROP, "command", setopcmd);
+
+ }
+ break;
+
+ case T_Sort: // ScanState
+ ProgressSort((SortState*) planstate, ps);
+ break;
+
+ case T_Material: // ScanState
+ /*
+ * Uses: ScanState and Tuplestorestate
+ */
+ ProgressMaterial((MaterialState*) planstate, ps);
+ ProgressScanBlks((ScanState*) planstate, ps);
+ break;
+
+ case T_Group: // ScanState
+ ProgressScanBlks((ScanState*) planstate, ps);
+ break;
+
+ case T_Agg: // ScanState
+ /*
+ * Use tuplesortstate 2 times. Not reflected in child nodes
+ */
+ ProgressAgg((AggState*) planstate, ps);
+ break;
+
+ case T_WindowAgg: // ScanState
+ /*
+ * Has a Tuplestorestate (field buffer)
+ */
+ ProgressTupleStore(((WindowAggState*) plan)->buffer, ps);
+ break;
+
+ case T_Unique: // PlanState
+ /*
+ * Does not store any tuple.
+ * Just fetch tuple and compare with previous one.
+ */
+ break;
+
+ case T_Gather: // PlanState
+ /*
+ * Does not store any tuple.
+ * Used for parallel query
+ */
+ ProgressGather((GatherState*) planstate, ps);
+ break;
+
+ case T_GatherMerge: // PlanState
+ ProgressGatherMerge((GatherMergeState*) planstate, ps);
+ break;
+
+ case T_Hash: // PlanState
+ /*
+ * Has a potential on file hash data
+ */
+ ProgressHash((HashState*) planstate, ps);
+ break;
+
+ case T_LockRows: // PlanState
+ /*
+ * Only store tuples in memory array
+ */
+ break;
+
+ default:
+ break;
+ }
+
+ /*
+ * Target list
+ */
+ //if (ps->verbose)
+ // show_plan_tlist(planstate, ancestors, ps);
+
+ /*
+ * Controls (sort, qual, ...)
+ */
+ //show_control_qual(planstate, ancestors, ps);
+
+ /*
+ * Get ready to display the child plans.
+ * Pass current PlanState as head of ancestors list for children
+ */
+ haschildren = ReportHasChildren(plan, planstate);
+ if (haschildren) {
+ ancestors = lcons(planstate, ancestors);
+ }
+
+ /*
+ * initPlan-s
+ */
+ //if (planstate->initPlan) {
+ // ReportSubPlans(planstate->initPlan, ancestors, "InitPlan", ps, ProgressNode);
+ // }
+
+ /*
+ * lefttree
+ */
+ if (outerPlanState(planstate)) {
+ ProgressNode(outerPlanState(planstate), ancestors, "Outer", NULL, ps);
+ }
+
+ /*
+ * righttree
+ */
+ if (innerPlanState(planstate)) {
+ ProgressNode(innerPlanState(planstate), ancestors, "Inner", NULL, ps);
+ }
+
+ /*
+ * special child plans
+ */
+/*
+ switch (nodeTag(plan)) {
+ case T_ModifyTable:
+ ReportMemberNodes(((ModifyTable*) plan)->plans,
+ ((ModifyTableState*) planstate)->mt_plans, ancestors, ps, ProgressNode);
+ break;
+
+ case T_Append:
+ ReportMemberNodes(((Append*) plan)->appendplans,
+ ((AppendState*) planstate)->appendplans, ancestors, ps, ProgressNode);
+ break;
+
+ case T_MergeAppend:
+ ReportMemberNodes(((MergeAppend*) plan)->mergeplans,
+ ((MergeAppendState*) planstate)->mergeplans, ancestors, ps, ProgressNode);
+ break;
+
+ case T_BitmapAnd:
+ ReportMemberNodes(((BitmapAnd*) plan)->bitmapplans,
+ ((BitmapAndState*) planstate)->bitmapplans, ancestors, ps, ProgressNode);
+ break;
+
+ case T_BitmapOr:
+ ReportMemberNodes(((BitmapOr*) plan)->bitmapplans,
+ ((BitmapOrState*) planstate)->bitmapplans, ancestors, ps, ProgressNode);
+ break;
+
+ case T_SubqueryScan:
+ ProgressNode(((SubqueryScanState*) planstate)->subplan, ancestors,
+ "Subquery", NULL, ps);
+ break;
+
+ case T_CustomScan:
+ ReportCustomChildren((CustomScanState*) planstate, ancestors, ps, ProgressNode);
+ break;
+
+ default:
+ break;
+ }
+*/
+
+ /*
+ * subPlan-s
+ */
+// if (planstate->subPlan)
+// ReportSubPlans(planstate->subPlan, ancestors, "SubPlan", ps, ProgressNode);
+
+ /*
+ * end of child plans
+ */
+ if (haschildren)
+ ancestors = list_delete_first(ancestors);
+
+ ProgressUnindent(ps);
+}
+
+static bool
+ReportHasChildren(Plan* plan, PlanState* planstate)
+{
+ bool haschildren;
+
+ haschildren = planstate->initPlan || outerPlanState(planstate)
+ || innerPlanState(planstate)
+ || IsA(plan, ModifyTable)
+ || IsA(plan, Append)
+ || IsA(plan, MergeAppend)
+ || IsA(plan, BitmapAnd)
+ || IsA(plan, BitmapOr)
+ || IsA(plan, SubqueryScan)
+ || (IsA(planstate, CustomScanState) && ((CustomScanState*) planstate)->custom_ps != NIL)
+ || planstate->subPlan;
+
+ return haschildren;
+}
+
+/**********************************************************************************
+ * Indivual Progress report functions for the different execution nodes starts here.
+ * These functions are leaf function of the Progress tree of functions to be called.
+ *
+ * For each of theses function, we need to be paranoiac because the execution tree
+ * and plan tree can be in any state. Which means, that any pointer may be NULL.
+ *
+ * Only ProgressState data is reliable. Pointers about ProgressState data can be reference
+ * without checking pointers values. All other data must be checked against NULL
+ * pointers.
+ **********************************************************************************/
+
+/*
+ * Deal with worker backends
+ */
+static
+void ProgressGather(GatherState* gs, ProgressState* ps)
+{
+ ParallelContext* pc;
+
+ pc = gs->pei->pcxt;
+ ProgressParallelExecInfo(pc, ps);
+}
+
+static
+void ProgressGatherMerge(GatherMergeState* gms, ProgressState* ps)
+{
+ ParallelContext* pc;
+
+ pc = gms->pei->pcxt;
+ ProgressParallelExecInfo(pc, ps);
+}
+
+static
+void ProgressParallelExecInfo(ParallelContext* pc, ProgressState* ps)
+{
+ ProgressCtl* req; // Current req struct for current backend
+ int pid;
+ int pid_index;
+ int i;
+
+ if (debug)
+ elog(LOG, "ProgressParallelExecInfo node");
+
+ if (pc == NULL) {
+ elog(LOG, "ParallelContext is NULL");
+ return;
+ }
+
+ ps->parallel = true;
+ ps->child = false;
+
+ req = progress_ctl_array + MyBackendId;
+ req->parallel = true;
+ req->child = false;
+ req->child_indent = ps->indent;
+
+ elog(LOG, "COLLECT MASTER PARALLEL WORKER DATA");
+ ProgressDumpRequest(getpid());
+
+ /*
+ * write pid of child worker in main req pid array
+ * note the child indentation for table indent field
+ */
+ pid_index = 0;
+ for (i = 0; i < pc->nworkers_launched; ++i) {
+ pid = pc->worker[i].pid;
+ req->pid[pid_index] = pid;
+ pid_index++;
+ }
+}
+
+static
+void ProgressScanBlks(ScanState* ss, ProgressState* ps)
+{
+ HeapScanDesc hsd;
+ ParallelHeapScanDesc phsd;
+ unsigned int nr_blks;
+
+ if (ss == NULL) {
+ elog(LOG, "SCAN ss is null");
+ return;
+ }
+
+ hsd = ss->ss_currentScanDesc;
+ if (hsd == NULL) {
+ elog(LOG, "SCAN hsd is null");
+ return;
+ }
+
+ phsd = hsd->rs_parallel;
+ if (phsd != NULL) {
+ /* Parallel query */
+ ProgressPropText(ps, PROP, "scan mode", "parallel");
+ if (phsd->phs_nblocks != 0 && phsd->phs_cblock != InvalidBlockNumber) {
+ if (phsd->phs_cblock > phsd->phs_startblock)
+ nr_blks = phsd->phs_cblock - phsd->phs_startblock;
+ else
+ nr_blks = phsd->phs_cblock + phsd->phs_nblocks - phsd->phs_startblock;
+
+ ProgressPropLong(ps, PROP, "fetched", nr_blks, BLK_UNIT);
+ ProgressPropLong(ps, PROP, "total", phsd->phs_nblocks, BLK_UNIT);
+ ProgressPropLong(ps, PROP, "completion", 100 * nr_blks/(phsd->phs_nblocks), PERCENT_UNIT);
+ } else {
+ if (phsd->phs_nblocks != 0)
+ ProgressPropLong(ps, PROP, "total", phsd->phs_nblocks, BLK_UNIT);
+
+ ProgressPropLong(ps, PROP, "completion", 100, PERCENT_UNIT);
+ }
+ } else {
+ /* Not a parallel query */
+ if (hsd->rs_nblocks != 0 && hsd->rs_cblock != InvalidBlockNumber) {
+ if (hsd->rs_cblock > hsd->rs_startblock)
+ nr_blks = hsd->rs_cblock - hsd->rs_startblock;
+ else
+ nr_blks = hsd->rs_cblock + hsd->rs_nblocks - hsd->rs_startblock;
+
+
+ ProgressPropLong(ps, PROP, "fetched", nr_blks, BLK_UNIT);
+ ProgressPropLong(ps, PROP, "total", hsd->rs_nblocks, BLK_UNIT);
+ ProgressPropLong(ps, PROP, "completion BLOCKS", 100 * nr_blks/(hsd->rs_nblocks), PERCENT_UNIT);
+ } else {
+ if (hsd->rs_nblocks != 0)
+ ProgressPropLong(ps, PROP, "total", hsd->rs_nblocks, BLK_UNIT);
+
+ ProgressPropLong(ps, PROP, "completion", 100, PERCENT_UNIT);
+ }
+ }
+}
+
+static
+void ProgressScanRows(Scan* plan, PlanState* planstate, ProgressState* ps)
+{
+ Index rti;
+ RangeTblEntry* rte;
+ char* objectname;
+
+ if (plan == NULL)
+ return;
+
+ if (planstate == NULL)
+ return;
+
+ rti = plan->scanrelid;
+ rte = rt_fetch(rti, ps->rtable);
+ objectname = get_rel_name(rte->relid);
+
+ if (objectname != NULL) {
+ ProgressPropText(ps, PROP, "scan on", quote_identifier(objectname));
+ }
+
+ if (ps->verbose >= VERBOSE_ROW_SCAN) {
+ ProgressPropLong(ps, PROP, "fetched",
+ (unsigned long) planstate->plan_rows, ROW_UNIT);
+ ProgressPropLong(ps, PROP, "total",
+ (unsigned long) plan->plan.plan_rows, ROW_UNIT);
+ ProgressPropLong(ps, PROP, "completion",
+ (unsigned short) planstate->percent_done, PERCENT_UNIT);
+ }
+}
+
+static
+void ProgressTidScan(TidScanState* ts, ProgressState* ps)
+{
+ unsigned int percent;
+
+ if (ts == NULL) {
+ return;
+ }
+
+ if (ts->tss_NumTids == 0)
+ percent = 0;
+ else
+ percent = (unsigned short)(100 * (ts->tss_TidPtr) / (ts->tss_NumTids));
+
+ ProgressPropLong(ps, PROP, "fetched", (long int) ts->tss_TidPtr, ROW_UNIT);
+ ProgressPropLong(ps, PROP, "total", (long int) ts->tss_NumTids, ROW_UNIT);
+ ProgressPropLong(ps, PROP, "completion", percent, PERCENT_UNIT);
+}
+
+static
+void ProgressLimit(LimitState* ls, ProgressState* ps)
+{
+ if (ls == NULL)
+ return;
+
+ if (ls->position == 0) {
+ ProgressPropLong(ps, PROP, "offset", 0, PERCENT_UNIT);
+ ProgressPropLong(ps, PROP, "count", 0, PERCENT_UNIT);
+ }
+
+ if (ls->position > 0 && ls->position <= ls->offset) {
+ ProgressPropLong(ps, PROP, "offset",
+ (unsigned short)(100 * (ls->position)/(ls->offset)), PERCENT_UNIT);
+ ProgressPropLong(ps, PROP, "count", 0, PERCENT_UNIT);
+ }
+
+ if (ls->position > ls->offset) {
+ ProgressPropLong(ps, PROP, "offset", 100, PERCENT_UNIT);
+ ProgressPropLong(ps, PROP, "count",
+ (unsigned short)(100 * (ls->position - ls->offset)/(ls->count)), PERCENT_UNIT);
+ }
+}
+
+static
+void ProgressCustomScan(CustomScanState* cs, ProgressState* ps)
+{
+ if (cs == NULL)
+ return;
+
+// if (cs->methods->ProgressCustomScan) {
+// cs->methods->ProgressCustomScan(cs, NULL, ps);
+// }
+}
+
+static
+void ProgressIndexScan(IndexScanState* is, ProgressState* ps)
+{
+ PlanState planstate;
+ Plan* p;
+
+ if (is == NULL) {
+ return;
+ }
+
+ planstate = is->ss.ps;
+ p = planstate.plan;
+ if (p == NULL) {
+ return;
+ }
+
+ if (ps->verbose > VERBOSE_ROW_SCAN) {
+ ProgressPropLong(ps, PROP, "fetched", (long int) planstate.plan_rows, ROW_UNIT);
+ ProgressPropLong(ps, PROP, "total", (long int) p->plan_rows, ROW_UNIT);
+ }
+
+ ProgressPropLong(ps, PROP, "completion", (unsigned short) planstate.percent_done, PERCENT_UNIT);
+}
+
+static
+void ProgressModifyTable(ModifyTableState *mts, ProgressState* ps)
+{
+ EState* es;
+
+ if (mts == NULL)
+ return;
+
+ es = mts->ps.state;
+ if (es == NULL)
+ return;
+
+ ProgressPropLong(ps, PROP, "modified", (long int) es->es_processed, ROW_UNIT);
+}
+
+static
+void ProgressHash(HashState* hs, ProgressState* ps)
+{
+ if (hs == NULL)
+ return;
+
+ ProgressHashJoinTable((HashJoinTable) hs->hashtable, ps);
+}
+
+static
+void ProgressHashJoin(HashJoinState* hjs, ProgressState* ps)
+{
+ if (hjs == NULL)
+ return;
+
+ ProgressHashJoinTable((HashJoinTable) hjs->hj_HashTable, ps);
+}
+
+/*
+ * HashJoinTable is not a node type
+ */
+static
+void ProgressHashJoinTable(HashJoinTable hashtable, ProgressState* ps)
+{
+ int i;
+ unsigned long reads;
+ unsigned long writes;
+ unsigned long disk_size;
+ unsigned long lreads;
+ unsigned long lwrites;
+ unsigned long ldisk_size;
+
+ /*
+ * Could be used but not yet allocated
+ */
+ if (hashtable == NULL)
+ return;
+
+ if (hashtable->nbatch <= 1)
+ return;
+
+ if (ps->verbose >= VERBOSE_HASH_JOIN)
+ ProgressPropLong(ps, PROP, "hashtable nbatch", hashtable->nbatch, "");
+
+ /*
+ * Display global reads and writes
+ */
+ reads = 0;
+ writes = 0;
+ disk_size = 0;
+
+ for (i = 0; i < hashtable->nbatch; i++) {
+ if (hashtable->innerBatchFile[i]) {
+ ProgressBufFileRW(hashtable->innerBatchFile[i],
+ ps, &lreads, &lwrites, &ldisk_size);
+ reads += lreads;
+ writes += lwrites;
+ disk_size += ldisk_size;
+ }
+
+ if (hashtable->outerBatchFile[i]) {
+ ProgressBufFileRW(hashtable->outerBatchFile[i],
+ ps, &lreads, &lwrites, &ldisk_size);
+ reads += lreads;
+ writes += lwrites;
+ disk_size += ldisk_size;
+ }
+ }
+
+ /*
+ * Update SQL query wide disk use
+ */
+ ps->disk_size += disk_size;
+
+ if (ps->verbose >= VERBOSE_HASH_JOIN) {
+ ProgressPropLong(ps, PROP, "read", reads/1024, KBYTE_UNIT);
+ ProgressPropLong(ps, PROP, "write", writes/1024, KBYTE_UNIT);
+ }
+
+ if (writes > 0)
+ ProgressPropLong(ps, PROP, "completion", reads/writes, PERCENT_UNIT);
+
+ if (ps->verbose >= VERBOSE_DISK_USE)
+ ProgressPropLong(ps, PROP, "disk used", disk_size/1024, KBYTE_UNIT);
+
+ /*
+ * Only display details if requested
+ */
+ if (ps->verbose < VERBOSE_HASH_JOIN_DETAILED)
+ return;
+
+ if (hashtable->nbatch == 0)
+ return;
+
+ ps->indent++;
+ for (i = 0; i < hashtable->nbatch; i++) {
+ ProgressPropLong(ps, PROP, "batch", (long int) i, "");
+
+ if (hashtable->innerBatchFile[i]) {
+ ps->indent++;
+ ProgressPropText(ps, PROP, "group", "inner");
+ ProgressBufFile(hashtable->innerBatchFile[i], ps);
+ ps->indent--;
+ }
+
+ if (hashtable->outerBatchFile[i]) {
+ ps->indent++;
+ ProgressPropText(ps, PROP, "group", "outer");
+ ProgressBufFile(hashtable->outerBatchFile[i], ps);
+ ps->indent--;
+ }
+ }
+
+ ps->indent--;
+}
+
+static
+void ProgressBufFileRW(BufFile* bf, ProgressState* ps,
+ unsigned long* reads, unsigned long* writes, unsigned long* disk_size)
+{
+ MemoryContext oldcontext;
+ struct buffile_state* bfs;
+ int i;
+
+ if (bf == NULL)
+ return;
+
+ *reads = 0;
+ *writes = 0;
+ *disk_size = 0;
+
+ oldcontext = MemoryContextSwitchTo(ps->memcontext);
+ bfs = BufFileState(bf);
+ MemoryContextSwitchTo(oldcontext);
+
+ *disk_size = bfs->disk_size;
+
+ for (i = 0; i < bfs->numFiles; i++) {
+ *reads += bfs->bytes_read[i];
+ *writes += bfs->bytes_write[i];
+ }
+}
+
+static
+void ProgressBufFile(BufFile* bf, ProgressState* ps)
+{
+ int i;
+ struct buffile_state* bfs;
+ MemoryContext oldcontext;
+
+ if (bf == NULL)
+ return;
+
+ oldcontext = MemoryContextSwitchTo(ps->memcontext);
+ bfs = BufFileState(bf);
+ MemoryContextSwitchTo(oldcontext);
+
+ if (ps->verbose < VERBOSE_BUFFILE)
+ return;
+
+ ps->indent++;
+ ProgressPropLong(ps, PROP, "buffile nr files", bfs->numFiles, "");
+
+ if (bfs->numFiles == 0)
+ return;
+
+ if (ps->verbose >= VERBOSE_DISK_USE)
+ ProgressPropLong(ps, PROP, "disk used", bfs->disk_size/1024, KBYTE_UNIT);
+
+ for (i = 0; i < bfs->numFiles; i++) {
+ ps->indent++;
+ ProgressPropLong(ps, NODE, "file", i, "");
+ ProgressPropLong(ps, PROP, "read", bfs->bytes_read[i]/1024, KBYTE_UNIT);
+ ProgressPropLong(ps, PROP, "write", bfs->bytes_write[i]/1024, KBYTE_UNIT);
+ ps->indent--;
+ }
+
+ ps->indent--;
+}
+
+static
+void ProgressMaterial(MaterialState* planstate, ProgressState* ps)
+{
+ Tuplestorestate* tss;
+
+ if (planstate == NULL)
+ return;
+
+ tss = planstate->tuplestorestate;
+ ProgressTupleStore(tss, ps);
+
+}
+/*
+ * Tuplestorestate is not a node type
+ */
+static
+void ProgressTupleStore(Tuplestorestate* tss, ProgressState* ps)
+{
+ struct tss_report tssr;
+
+ if (tss == NULL)
+ return;
+
+ tuplestore_get_state(tss, &tssr);
+
+ switch (tssr.status) {
+ case TSS_INMEM:
+ /* Add separator */
+ ProgressPropLong(ps, PROP, "memory write", (long int) tssr.memtupcount, ROW_UNIT);
+ if (tssr.memtupskipped > 0)
+ ProgressPropLong(ps, PROP, "memory skipped", (long int) tssr.memtupskipped, ROW_UNIT);
+
+ ProgressPropLong(ps, PROP, "memory read", (long int) tssr.memtupread, ROW_UNIT);
+ if (tssr.memtupdeleted)
+ ProgressPropLong(ps, PROP, "memory deleted", (long int) tssr.memtupread, ROW_UNIT);
+ break;
+
+ case TSS_WRITEFILE:
+ case TSS_READFILE:
+ if (tssr.status == TSS_WRITEFILE)
+ ProgressPropText(ps, PROP, "file store", "write");
+ else
+ ProgressPropText(ps, PROP, "file store", "read");
+
+ ProgressPropLong(ps, PROP, "readptrcount", tssr.readptrcount, "");
+ ProgressPropLong(ps, PROP, "write", (long int ) tssr.tuples_count, ROW_UNIT);
+ if (tssr.tuples_skipped)
+ ProgressPropLong(ps, PROP, "skipped", (long int) tssr.tuples_skipped, ROW_UNIT);
+
+ ProgressPropLong(ps, PROP, "read", (long int) tssr.tuples_read, ROW_UNIT);
+ if (tssr.tuples_deleted)
+ ProgressPropLong(ps, PROP, "deleted", (long int) tssr.tuples_deleted, ROW_UNIT);
+
+ ps->disk_size += tssr.disk_size;
+ if (ps->verbose >= VERBOSE_DISK_USE)
+ ProgressPropLong(ps, PROP, "disk used", tssr.disk_size/2014, KBYTE_UNIT);
+ break;
+
+ default:
+ break;
+ }
+}
+
+static
+void ProgressAgg(AggState* planstate, ProgressState* ps)
+{
+ if (planstate == NULL)
+ return;
+
+ ProgressTupleSort(planstate->sort_in, ps);
+ ProgressTupleSort(planstate->sort_out, ps);
+}
+
+static
+void ProgressSort(SortState* ss, ProgressState* ps)
+{
+ Assert(nodeTag(ss) == T_SortState);
+
+ if (ss == NULL)
+ return;
+
+ if (ss->tuplesortstate == NULL)
+ return;
+
+ ProgressTupleSort(ss->tuplesortstate, ps);
+}
+
+static
+void ProgressTupleSort(Tuplesortstate* tss, ProgressState* ps)
+{
+ struct ts_report* tsr;
+ MemoryContext oldcontext;
+ char status[] = "sort status";
+
+ if (tss == NULL)
+ return;
+
+ oldcontext = MemoryContextSwitchTo(ps->memcontext);
+ tsr = tuplesort_get_state(tss);
+ MemoryContextSwitchTo(oldcontext);
+
+ switch (tsr->status) {
+ case TSS_INITIAL: /* Loading tuples in mem still within memory limit */
+ case TSS_BOUNDED: /* Loading tuples in mem into bounded-size heap */
+ ProgressPropText(ps, PROP, status, "loading tuples in memory");
+ ProgressPropLong(ps, PROP, "tuples in memory", tsr->memtupcount, ROW_UNIT);
+ break;
+
+ case TSS_SORTEDINMEM: /* Sort completed entirely in memory */
+ ProgressPropText(ps, PROP, status, "sort completed in memory");
+ ProgressPropLong(ps, PROP, "tuples in memory", tsr->memtupcount, ROW_UNIT);
+ break;
+
+ case TSS_BUILDRUNS: /* Dumping tuples to tape */
+ switch (tsr->sub_status) {
+ case TSSS_INIT_TAPES:
+ ProgressPropText(ps, PROP, status, "on tapes initializing");
+ break;
+
+ case TSSS_DUMPING_TUPLES:
+ ProgressPropText(ps, PROP, status, "on tapes writing");
+ break;
+
+ case TSSS_SORTING_ON_TAPES:
+ ProgressPropText(ps, PROP, status, "on tapes sorting");
+ break;
+
+ case TSSS_MERGING_TAPES:
+ ProgressPropText(ps, PROP, status, "on tapes merging");
+ break;
+ default:
+ ;
+ };
+
+ dumpTapes(tsr, ps);
+ break;
+
+ case TSS_FINALMERGE: /* Performing final merge on-the-fly */
+ ProgressPropText(ps, PROP, status, "on tapes final merge");
+ dumpTapes(tsr, ps);
+ break;
+
+ case TSS_SORTEDONTAPE: /* Sort completed, final run is on tape */
+ switch (tsr->sub_status) {
+ case TSSS_FETCHING_FROM_TAPES:
+ ProgressPropText(ps, PROP, status, "fetching from sorted tapes");
+ break;
+
+ case TSSS_FETCHING_FROM_TAPES_WITH_MERGE:
+ ProgressPropText(ps, PROP, status, "fetching from sorted tapes with merge");
+ break;
+ default:
+ ;
+ };
+
+ dumpTapes(tsr, ps);
+ break;
+
+ default:
+ ProgressPropText(ps, PROP, status, "unexpected sort state");
+ };
+}
+
+static
+void dumpTapes(struct ts_report* tsr, ProgressState* ps)
+{
+ int i;
+ int percent_effective;
+
+ if (tsr == NULL)
+ return;
+
+ if (tsr->tp_write_effective > 0) {
+ percent_effective = 100 * (tsr->tp_read_effective)/(tsr->tp_write_effective);
+ } else {
+ percent_effective = 0;
+ }
+
+ if (ps->verbose >= VERBOSE_TAPES) {
+ ProgressPropLong(ps, PROP, "merge reads", tsr->tp_read_merge, ROW_UNIT);
+ ProgressPropLong(ps, PROP, "merge writes", tsr->tp_write_merge, ROW_UNIT);
+ ProgressPropLong(ps, PROP, "effective reads", tsr->tp_read_effective, ROW_UNIT);
+ ProgressPropLong(ps, PROP, "effective writes", tsr->tp_write_effective, ROW_UNIT);
+ }
+
+ ProgressPropLong(ps, PROP, "completion", percent_effective, PERCENT_UNIT);
+ if (ps->verbose >= VERBOSE_DISK_USE)
+ ProgressPropLong(ps, PROP, "tape size", tsr->blocks_alloc, BLK_UNIT);
+
+ /*
+ * Update total disk size used
+ */
+ ps->disk_size += tsr->blocks_alloc * BLCKSZ;
+
+ if (ps->verbose < VERBOSE_TAPES_DETAILED)
+ return;
+
+ /*
+ * Verbose report
+ */
+ ProgressPropLong(ps, PROP, "tapes total", tsr->maxTapes, NO_UNIT);
+ ProgressPropLong(ps, PROP, "tapes actives", tsr->activeTapes, NO_UNIT);
+
+ if (tsr->result_tape != -1)
+ ProgressPropLong(ps, PROP, "tape result", tsr->result_tape, NO_UNIT);
+
+ if (tsr->maxTapes != 0) {
+ for (i = 0; i< tsr->maxTapes; i++) {
+ ps->indent++;
+ ProgressPropLong(ps, NODE, "tape idx", i, NO_UNIT);
+
+ ps->indent++;
+ if (tsr->tp_fib != NULL)
+ ProgressPropLong(ps, PROP, "fib", tsr->tp_fib[i], NO_UNIT);
+
+ if (tsr->tp_runs != NULL)
+ ProgressPropLong(ps, PROP, "runs", tsr->tp_runs[i], NO_UNIT);
+
+ if (tsr->tp_dummy != NULL)
+ ProgressPropLong(ps, PROP, "dummy", tsr->tp_dummy[i], NO_UNIT);
+
+ if (tsr->tp_read != NULL)
+ ProgressPropLong(ps, PROP, "read", tsr->tp_read[i], ROW_UNIT);
+
+ if (tsr->tp_write)
+ ProgressPropLong(ps, PROP, "write", tsr->tp_write[i], ROW_UNIT);
+
+ ps->indent--;
+ ps->indent--;
+ }
+ }
+}
+
+static
+void ReportTime(QueryDesc* query, ProgressState* ps)
+{
+ instr_time currenttime;
+
+ if (query == NULL)
+ return;
+
+ if (query->totaltime == NULL)
+ return;
+
+ INSTR_TIME_SET_CURRENT(currenttime);
+ INSTR_TIME_SUBTRACT(currenttime, query->totaltime->starttime);
+
+ if (ps->verbose >= VERBOSE_TIME_REPORT) {
+ ProgressPropLong(ps, PROP, "time used",
+ INSTR_TIME_GET_MILLISEC(currenttime)/1000, SECOND_UNIT);
+ }
+}
+
+static
+void ReportStack(ProgressState* ps)
+{
+ unsigned long depth;
+ unsigned long max_depth;
+
+ depth = get_stack_depth();
+ max_depth = get_max_stack_depth();
+
+ if (ps->verbose >= VERBOSE_STACK) {
+ ProgressPropLong(ps, PROP, "stack depth", depth, BYTE_UNIT);
+ ProgressPropLong(ps, PROP, "max stack depth", max_depth, BYTE_UNIT);
+ }
+}
+
+static
+void ReportDisk(ProgressState* ps)
+{
+ unsigned long size;
+ char* unit;
+
+ size = ps->disk_size;
+
+ if (size < 1024) {
+ unit = BYTE_UNIT;
+ } else if (size >= 1024 && size < 1024 * 1024) {
+ unit = KBYTE_UNIT;
+ size = size / 1024;
+ } else if (size >= 1024 * 1024 && size < 1024 * 1024 * 1024) {
+ unit = MBYTE_UNIT;
+ size = size / (1024 * 1024);
+ } else {
+ unit = GBYTE_UNIT;
+ size = size / (1024 * 1024 * 1024);
+ }
+
+ if (ps->verbose >= VERBOSE_DISK_USE)
+ ProgressPropLong(ps, PROP, "disk used", size, unit);
+}
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 4a899f1..d6e926c 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -160,6 +160,13 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
Assert(queryDesc->estate == NULL);
/*
+ * Instrumentation about start time of the query
+ * This is needed for progress of long SQL queries as reported by PROGRESS command.
+ */
+ queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_TIMER);
+ InstrInit(queryDesc->totaltime, INSTRUMENT_TIMER);
+
+ /*
* If the transaction is read-only, we need to check if any writes are
* planned to non-temporary tables. EXPLAIN is considered read-only.
*
@@ -366,6 +373,7 @@ standard_ExecutorRun(QueryDesc *queryDesc,
direction,
dest,
execute_once);
+ queryDesc->query_completed = true;
}
/*
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 5469cde..b938f97 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -365,6 +365,12 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
}
/*
+ * Initialize percent done
+ */
+ result->percent_done = 0;
+ result->plan_rows = 0;
+
+ /*
* Initialize any initPlans present in this node. The planner put them in
* a separate list for us.
*/
@@ -398,6 +404,9 @@ TupleTableSlot *
ExecProcNode(PlanState *node)
{
TupleTableSlot *result;
+ double computed_rows;
+ double total_rows;
+ unsigned short new_progress; /* % of progression for next percent */
CHECK_FOR_INTERRUPTS();
@@ -577,6 +586,22 @@ ExecProcNode(PlanState *node)
break;
}
+ /*
+ * Progress Query
+ */
+ node->plan_rows++;
+ computed_rows = node->plan_rows;
+ total_rows = node->plan->plan_rows;
+ if (total_rows != 0)
+ new_progress = (100 * computed_rows) / total_rows;
+ else
+ new_progress = 0;
+
+ if (new_progress > node->percent_done) {
+ elog(DEBUG5, "ExecProcNode %d%%\n", (unsigned short) new_progress);
+ node->percent_done = new_progress;
+ }
+
if (node->instrument)
InstrStopNode(node->instrument, TupIsNull(result) ? 0.0 : 1.0);
@@ -839,6 +864,12 @@ ExecEndNode(PlanState *node)
elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
break;
}
+
+ /*
+ * Re initialize percent done
+ */
+ node->percent_done = 0;
+ node->plan_rows = 0;
}
/*
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 9189c8d..312bb6a 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -4242,3 +4242,248 @@ bmsToString(const Bitmapset *bms)
outBitmapset(&str, bms);
return str.data;
}
+
+/*
+ * nodeName -
+ * extract node name from node pointer
+ */
+int
+planNodeInfo(struct Plan* plan, struct PlanInfo* info)
+{
+ const char *pname;
+ const char *sname;
+ const char *operation;
+ const char *partialmode;
+ const char *strategy;
+ const char * custom_name;
+ int ret;
+
+ ret = 0;
+
+ switch (nodeTag(plan))
+ {
+ case T_Result:
+ pname = sname = "Result";
+ break;
+ case T_ProjectSet:
+ pname = sname = "ProjectSet";
+ break;
+ case T_ModifyTable:
+ sname = "ModifyTable";
+ switch (((ModifyTable *) plan)->operation)
+ {
+ case CMD_INSERT:
+ pname = operation = "Insert";
+ break;
+ case CMD_UPDATE:
+ pname = operation = "Update";
+ break;
+ case CMD_DELETE:
+ pname = operation = "Delete";
+ break;
+ default:
+ pname = "???";
+ ret = -1;
+ break;
+ }
+ break;
+ case T_Append:
+ pname = sname = "Append";
+ break;
+ case T_MergeAppend:
+ pname = sname = "Merge Append";
+ break;
+ case T_RecursiveUnion:
+ pname = sname = "Recursive Union";
+ break;
+ case T_BitmapAnd:
+ pname = sname = "BitmapAnd";
+ break;
+ case T_BitmapOr:
+ pname = sname = "BitmapOr";
+ break;
+ case T_NestLoop:
+ pname = sname = "Nested Loop";
+ break;
+ case T_MergeJoin:
+ pname = "Merge"; /* "Join" gets added by jointype switch */
+ sname = "Merge Join";
+ break;
+ case T_HashJoin:
+ pname = "Hash"; /* "Join" gets added by jointype switch */
+ sname = "Hash Join";
+ break;
+ case T_SeqScan:
+ pname = sname = "Seq Scan";
+ break;
+ case T_SampleScan:
+ pname = sname = "Sample Scan";
+ break;
+ case T_Gather:
+ pname = sname = "Gather";
+ break;
+ case T_GatherMerge:
+ pname = sname = "Gather Merge";
+ break;
+ case T_IndexScan:
+ pname = sname = "Index Scan";
+ break;
+ case T_IndexOnlyScan:
+ pname = sname = "Index Only Scan";
+ break;
+ case T_BitmapIndexScan:
+ pname = sname = "Bitmap Index Scan";
+ break;
+ case T_BitmapHeapScan:
+ pname = sname = "Bitmap Heap Scan";
+ break;
+ case T_TidScan:
+ pname = sname = "Tid Scan";
+ break;
+ case T_SubqueryScan:
+ pname = sname = "Subquery Scan";
+ break;
+ case T_FunctionScan:
+ pname = sname = "Function Scan";
+ break;
+ case T_ValuesScan:
+ pname = sname = "Values Scan";
+ break;
+ case T_CteScan:
+ pname = sname = "CTE Scan";
+ break;
+ case T_WorkTableScan:
+ pname = sname = "WorkTable Scan";
+ break;
+ case T_ForeignScan:
+ sname = "Foreign Scan";
+ switch (((ForeignScan *) plan)->operation)
+ {
+ case CMD_SELECT:
+ pname = "Foreign Scan";
+ operation = "Select";
+ break;
+ case CMD_INSERT:
+ pname = "Foreign Insert";
+ operation = "Insert";
+ break;
+ case CMD_UPDATE:
+ pname = "Foreign Update";
+ operation = "Update";
+ break;
+ case CMD_DELETE:
+ pname = "Foreign Delete";
+ operation = "Delete";
+ break;
+ default:
+ pname = "???";
+ ret = -1;
+ break;
+ }
+ break;
+ case T_CustomScan:
+ sname = "Custom Scan";
+ custom_name = ((CustomScan *) plan)->methods->CustomName;
+ if (custom_name)
+ pname = psprintf("Custom Scan (%s)", custom_name);
+ else
+ pname = sname;
+ break;
+ case T_Material:
+ pname = sname = "Materialize";
+ break;
+ case T_Sort:
+ pname = sname = "Sort";
+ break;
+ case T_Group:
+ pname = sname = "Group";
+ break;
+ case T_Agg:
+ {
+ Agg *agg = (Agg *) plan;
+
+ sname = "Aggregate";
+ switch (agg->aggstrategy)
+ {
+ case AGG_PLAIN:
+ pname = "Aggregate";
+ strategy = "Plain";
+ break;
+ case AGG_SORTED:
+ pname = "GroupAggregate";
+ strategy = "Sorted";
+ break;
+ case AGG_HASHED:
+ pname = "HashAggregate";
+ strategy = "Hashed";
+ break;
+ default:
+ pname = "Aggregate ???";
+ strategy = "???";
+ ret = -1;
+ break;
+ }
+
+ if (DO_AGGSPLIT_SKIPFINAL(agg->aggsplit))
+ {
+ partialmode = "Partial";
+ pname = psprintf("%s %s", partialmode, pname);
+ }
+ else if (DO_AGGSPLIT_COMBINE(agg->aggsplit))
+ {
+ partialmode = "Finalize";
+ pname = psprintf("%s %s", partialmode, pname);
+ }
+ else
+ partialmode = "Simple";
+ }
+ break;
+ case T_WindowAgg:
+ pname = sname = "WindowAgg";
+ break;
+ case T_Unique:
+ pname = sname = "Unique";
+ break;
+ case T_SetOp:
+ sname = "SetOp";
+ switch (((SetOp *) plan)->strategy)
+ {
+ case SETOP_SORTED:
+ pname = "SetOp";
+ strategy = "Sorted";
+ break;
+ case SETOP_HASHED:
+ pname = "HashSetOp";
+ strategy = "Hashed";
+ break;
+ default:
+ pname = "SetOp ???";
+ strategy = "???";
+ ret = -1;
+ break;
+ }
+ break;
+ case T_LockRows:
+ pname = sname = "LockRows";
+ break;
+ case T_Limit:
+ pname = sname = "Limit";
+ break;
+ case T_Hash:
+ pname = sname = "Hash";
+ break;
+ default:
+ pname = sname = "???";
+ elog(LOG, "HERE ???: %d", nodeTag(plan));
+ ret = -1;
+ break;
+ }
+
+ info->pname = pname;
+ info->sname = sname;
+ info->operation = operation;
+ info->partialmode = partialmode;
+ info->strategy = strategy;
+
+ return ret;
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 35b4ec8..1ff96ae 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -121,6 +121,7 @@
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
+#include "executor/progress.h"
#include "utils/builtins.h"
#include "utils/datetime.h"
#include "utils/dynamic_loader.h"
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index 4ca0ea4..32ed7ab 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -64,6 +64,12 @@ struct BufFile
off_t *offsets; /* palloc'd array with numFiles entries */
/*
+ * palloc's array of blk I/O stat
+ */
+ int *bytes_read;
+ int *bytes_write;
+
+ /*
* offsets[i] is the current seek position of files[i]. We use this to
* avoid making redundant FileSeek calls.
*/
@@ -109,8 +115,13 @@ makeBufFile(File firstfile)
file->numFiles = 1;
file->files = (File *) palloc(sizeof(File));
file->files[0] = firstfile;
+
file->offsets = (off_t *) palloc(sizeof(off_t));
file->offsets[0] = 0L;
+
+ file->bytes_read = (int*) palloc0(sizeof(int));
+ file->bytes_write = (int*) palloc0(sizeof(int));
+
file->isTemp = false;
file->isInterXact = false;
file->dirty = false;
@@ -146,6 +157,10 @@ extendBufFile(BufFile *file)
(file->numFiles + 1) * sizeof(File));
file->offsets = (off_t *) repalloc(file->offsets,
(file->numFiles + 1) * sizeof(off_t));
+
+ file->bytes_read = (int*) repalloc(file->bytes_read, (file->numFiles + 1) * sizeof(int));
+ file->bytes_write = (int*) repalloc(file->bytes_write, (file->numFiles + 1) * sizeof(int));
+
file->files[file->numFiles] = pfile;
file->offsets[file->numFiles] = 0L;
file->numFiles++;
@@ -212,6 +227,10 @@ BufFileClose(BufFile *file)
/* release the buffer space */
pfree(file->files);
pfree(file->offsets);
+
+ pfree(file->bytes_read);
+ pfree(file->bytes_write);
+
pfree(file);
}
@@ -261,6 +280,9 @@ BufFileLoadBuffer(BufFile *file)
WAIT_EVENT_BUFFILE_READ);
if (file->nbytes < 0)
file->nbytes = 0;
+
+ file->bytes_read[file->curFile] += file->nbytes;
+
file->offsets[file->curFile] += file->nbytes;
/* we choose not to advance curOffset here */
@@ -327,6 +349,9 @@ BufFileDumpBuffer(BufFile *file)
WAIT_EVENT_BUFFILE_WRITE);
if (bytestowrite <= 0)
return; /* failed to write */
+
+ file->bytes_write[file->curFile] += bytestowrite;
+
file->offsets[file->curFile] += bytestowrite;
file->curOffset += bytestowrite;
wpos += bytestowrite;
@@ -607,7 +632,55 @@ BufFileTellBlock(BufFile *file)
blknum = (file->curOffset + file->pos) / BLCKSZ;
blknum += file->curFile * BUFFILE_SEG_SIZE;
+
return blknum;
}
#endif
+
+struct buffile_state* BufFileState(BufFile *file)
+{
+ struct buffile_state* bfs;
+ int i;
+
+ if (file->numFiles == 0)
+ return NULL;
+
+ bfs = (struct buffile_state*) palloc0(sizeof(struct buffile_state));
+ bfs->numFiles = file->numFiles;
+ bfs->disk_size = BufFileGetDiskSize(file);
+
+ bfs->bytes_read = (int*) palloc0(file->numFiles * sizeof(int));
+ bfs->bytes_write = (int*) palloc0(file->numFiles * sizeof(int));
+
+ for (i = 0; i < file->numFiles; i++) {
+ bfs->bytes_read[i] = file->bytes_read[i];
+ bfs->bytes_write[i] = file->bytes_write[i];
+ }
+
+ return bfs;
+}
+
+/*
+ * Report disk use in Bytes
+ */
+int
+BufFileGetDiskSize(BufFile *file)
+{
+ int i;
+ int size;
+
+ if (file == NULL)
+ return 0;
+
+ if (file->numFiles == 0)
+ return 0;
+
+ size = 0;
+
+ for (i = 0; i < file->numFiles; i++) {
+ size += FileGetSize(file->files[i]);
+ }
+
+ return size;
+}
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 2851c5d..f5564f5 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -3213,3 +3213,18 @@ fsync_parent_path(const char *fname, int elevel)
return 0;
}
+
+int
+FileGetSize(File file)
+{
+ Vfd* vfdP;
+
+ if (!FileIsValid(file))
+ return 0;
+
+ vfdP = &VfdCache[file];
+ if (vfdP == NULL)
+ return 0;
+
+ return vfdP->fileSize;
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2d1ed14..dc5bc37 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -46,6 +46,7 @@
#include "storage/spin.h"
#include "utils/backend_random.h"
#include "utils/snapmgr.h"
+#include "executor/progress.h"
shmem_startup_hook_type shmem_startup_hook = NULL;
@@ -150,6 +151,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
size = add_size(size, BackendRandomShmemSize());
+ size = add_size(size, ProgressShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -270,6 +272,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
SyncScanShmemInit();
AsyncShmemInit();
BackendRandomShmemInit();
+ ProgressShmemInit();
#ifdef EXEC_BACKEND
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 8a71536..e60a866 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -172,6 +172,9 @@ static inline void ProcArrayEndTransactionInternal(PGPROC *proc,
PGXACT *pgxact, TransactionId latestXid);
static void ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid);
+/* Debugging primitive */
+static void dump_procs(void);
+
/*
* Report shared-memory space needed by CreateSharedProcArray.
*/
@@ -1257,6 +1260,60 @@ TransactionIdIsActive(TransactionId xid)
/*
+ * Convert process id to backend id.
+ * Needed for cmds/progress.c
+ */
+BackendId ProcPidGetBackendId(int pid)
+{
+ ProcArrayStruct *arrayP = procArray;
+ BackendId bid = InvalidBackendId;
+ int i;
+
+ //dump_procs();
+
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+ for (i = 0; i < arrayP->numProcs; i++) {
+ int pgprocno;
+ volatile PGPROC* proc;
+
+ pgprocno = arrayP->pgprocnos[i];
+ proc = &allProcs[pgprocno];
+ if (proc->pid == pid) {
+ bid = proc->backendId;
+ break;
+ }
+ }
+
+ LWLockRelease(ProcArrayLock);
+
+ return bid;
+}
+
+static void dump_procs(void)
+{
+ ProcArrayStruct *arrayP = procArray;
+ int i;
+
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+
+ for (i = 0; i < arrayP->numProcs; i++) {
+ int pgprocno;
+ volatile PGPROC* proc;
+
+ pgprocno = arrayP->pgprocnos[i];
+ proc = &allProcs[pgprocno];
+ elog(LOG, "pgprocno = %d, proc->pid = %d, proc->backendId = %d\n",
+ pgprocno,
+ proc->pid,
+ proc->backendId);
+ }
+
+ LWLockRelease(ProcArrayLock);
+}
+
+/*
* GetOldestXmin -- returns oldest transaction that was running
* when any current transaction was started.
*
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 4a21d55..31a9e26 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -26,6 +26,7 @@
#include "storage/shmem.h"
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
+#include "executor/progress.h"
/*
@@ -288,6 +289,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
+ if (CheckProcSignal(PROCSIG_PROGRESS))
+ HandleProgressSignal();
+
SetLatch(MyLatch);
latch_sigusr1_handler();
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 35536e4..ab8058e 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -494,7 +494,7 @@ RegisterLWLockTranches(void)
if (LWLockTrancheArray == NULL)
{
- LWLockTranchesAllocated = 64;
+ LWLockTranchesAllocated = 65;
LWLockTrancheArray = (char **)
MemoryContextAllocZero(TopMemoryContext,
LWLockTranchesAllocated * sizeof(char *));
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index e6025ec..fd2ef26 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -50,3 +50,4 @@ OldSnapshotTimeMapLock 42
BackendRandomLock 43
LogicalRepWorkerLock 44
CLogTruncationLock 45
+ProgressLock 46
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 75c2d9a..f4878b7 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -42,6 +42,7 @@
#include "catalog/pg_type.h"
#include "commands/async.h"
#include "commands/prepare.h"
+#include "executor/progress.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
@@ -161,7 +162,7 @@ static bool EchoQuery = false; /* -E switch */
static bool UseSemiNewlineNewline = false; /* -j switch */
/* whether or not, and why, we were canceled by conflict with recovery */
-static bool RecoveryConflictPending = false;
+bool RecoveryConflictPending = false;
static bool RecoveryConflictRetryable = true;
static ProcSignalReason RecoveryConflictReason;
@@ -2999,6 +3000,9 @@ ProcessInterrupts(void)
if (ParallelMessagePending)
HandleParallelMessages();
+
+ if (progress_requested)
+ HandleProgressRequest();
}
@@ -3115,19 +3119,9 @@ check_stack_depth(void)
bool
stack_is_too_deep(void)
{
- char stack_top_loc;
long stack_depth;
- /*
- * Compute distance from reference point to my local variables
- */
- stack_depth = (long) (stack_base_ptr - &stack_top_loc);
-
- /*
- * Take abs value, since stacks grow up on some machines, down on others
- */
- if (stack_depth < 0)
- stack_depth = -stack_depth;
+ stack_depth = get_stack_depth();
/*
* Trouble?
@@ -3150,8 +3144,6 @@ stack_is_too_deep(void)
* Note we assume that the same max_stack_depth applies to both stacks.
*/
#if defined(__ia64__) || defined(__ia64)
- stack_depth = (long) (ia64_get_bsp() - register_stack_base_ptr);
-
if (stack_depth > max_stack_depth_bytes &&
register_stack_base_ptr != NULL)
return true;
@@ -3160,6 +3152,45 @@ stack_is_too_deep(void)
return false;
}
+long
+get_stack_depth(void)
+{
+ char stack_top_loc;
+ long stack_depth;
+
+ /*
+ * Compute distance from reference point to my local variables
+ */
+ stack_depth = (long) (stack_base_ptr - &stack_top_loc);
+
+ /*
+ * Take abs value, since stacks grow up on some machines, down on others
+ */
+ if (stack_depth < 0)
+ stack_depth = -stack_depth;
+
+ /*
+ * On IA64 there is a separate "register" stack that requires its own
+ * independent check. For this, we have to measure the change in the
+ * "BSP" pointer from PostgresMain to here. Logic is just as above,
+ * except that we know IA64's register stack grows up.
+ *
+ * Note we assume that the same max_stack_depth applies to both stacks.
+ */
+#if defined(__ia64__) || defined(__ia64)
+ stack_depth = (long) (ia64_get_bsp() - register_stack_base_ptr);
+
+#endif /* IA64 */
+
+ return stack_depth;
+}
+
+long
+get_max_stack_depth(void)
+{
+ return max_stack_depth_bytes;
+}
+
/* GUC check hook for max_stack_depth */
bool
check_max_stack_depth(int *newval, void **extra, GucSource source)
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index e30aeb1..fa7e842 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -96,6 +96,10 @@ CreateQueryDesc(PlannedStmt *plannedstmt,
/* not yet executed */
qd->already_executed = false;
+ /* Track the QueryDesc from global variables */
+ MyQueryDesc = qd;
+ IsQueryDescValid = true;
+
return qd;
}
@@ -114,6 +118,9 @@ FreeQueryDesc(QueryDesc *qdesc)
/* Only the QueryDesc itself need be freed */
pfree(qdesc);
+
+ MyQueryDesc = NULL;
+ IsQueryDescValid = false;
}
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 08b6030..b4a5347 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -22,6 +22,8 @@
#include "libpq/pqcomm.h"
#include "miscadmin.h"
#include "storage/backendid.h"
+#include "executor/execdesc.h"
+#include "executor/progress.h"
ProtocolVersion FrontendProtocol;
@@ -86,6 +88,13 @@ char *DatabasePath = NULL;
pid_t PostmasterPid = 0;
/*
+ * Global QueryDesc pointer.
+ * This is needed from signal context to locate the QueryDesc we are in
+ */
+QueryDesc* MyQueryDesc;
+bool IsQueryDescValid = false;
+
+/*
* IsPostmasterEnvironment is true in a postmaster process and any postmaster
* child process; it is false in a standalone process (bootstrap or
* standalone backend). IsUnderPostmaster is true in postmaster child
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 92e1d63..2004618 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -41,6 +41,7 @@
#include "commands/vacuum.h"
#include "commands/variable.h"
#include "commands/trigger.h"
+#include "executor/progress.h"
#include "funcapi.h"
#include "libpq/auth.h"
#include "libpq/be-fsstubs.h"
@@ -2885,6 +2886,17 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"progress_time_threshold", PGC_SIGHUP, STATS_MONITORING,
+ gettext_noop("Minimum time before progression of SQL plan is collected."),
+ NULL,
+ GUC_UNIT_S
+ },
+ &progress_time_threshold,
+ 3, 3, 30,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 4557359..d916503 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -890,3 +890,21 @@ LogicalTapeSetBlocks(LogicalTapeSet *lts)
{
return lts->nBlocksAllocated;
}
+
+long
+LogicalTapeSetBlocksWritten(LogicalTapeSet *lts)
+{
+ return lts->nBlocksWritten;
+}
+
+int
+LogicalTapeGetSize(LogicalTapeSet *lts)
+{
+ if (lts == NULL)
+ return 0;
+
+ if (lts->pfile == NULL)
+ return 0;
+
+ return BufFileGetDiskSize(lts->pfile);
+}
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 8a8db0f..7d2498c 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -224,20 +224,6 @@ typedef union SlabSlot
} SlabSlot;
/*
- * Possible states of a Tuplesort object. These denote the states that
- * persist between calls of Tuplesort routines.
- */
-typedef enum
-{
- TSS_INITIAL, /* Loading tuples; still within memory limit */
- TSS_BOUNDED, /* Loading tuples into bounded-size heap */
- TSS_BUILDRUNS, /* Loading tuples; writing to tape */
- TSS_SORTEDINMEM, /* Sort completed entirely in memory */
- TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
- TSS_FINALMERGE /* Performing final merge on-the-fly */
-} TupSortStatus;
-
-/*
* Parameters for calculation of number of tapes to use --- see inittapes()
* and tuplesort_merge_order().
*
@@ -271,6 +257,7 @@ typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
struct Tuplesortstate
{
TupSortStatus status; /* enumerated value as shown above */
+ TupSortSubStatus sub_status; /* Sub status to track when creating and dumping to tape set */
int nKeys; /* number of columns in sort key */
bool randomAccess; /* did caller request random access? */
bool bounded; /* did caller specify a maximum number of
@@ -420,6 +407,20 @@ struct Tuplesortstate
int activeTapes; /* # of active input tapes in merge pass */
/*
+ * Tapeset read and write per tape
+ */
+ int *tp_read;
+ int *tp_write;
+
+ /*
+ * Total of all tuples written and read
+ */
+ int tp_read_effective;
+ int tp_write_effective;
+ int tp_read_merge;
+ int tp_write_merge;
+
+ /*
* These variables are used after completion of sorting to keep track of
* the next tuple to return. (In the tape case, the tape's current read
* position is also critical state.)
@@ -709,6 +710,7 @@ tuplesort_begin_common(int workMem, bool randomAccess)
#endif
state->status = TSS_INITIAL;
+ state->sub_status = TSSS_INVALID;
state->randomAccess = randomAccess;
state->bounded = false;
state->tuples = true;
@@ -721,6 +723,12 @@ tuplesort_begin_common(int workMem, bool randomAccess)
state->memtupcount = 0;
+ state->tp_read_effective = 0;
+ state->tp_write_effective = 0;
+
+ state->tp_read_merge = 0;
+ state->tp_write_merge = 0;
+
/*
* Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
* see comments in grow_memtuples().
@@ -1619,12 +1627,15 @@ puttuple_common(Tuplesortstate *state, SortTuple *tuple)
/*
* Nope; time to switch to tape-based operation.
*/
+ state->sub_status = TSSS_INIT_TAPES;
inittapes(state);
/*
* Dump tuples until we are back under the limit.
*/
+ state->sub_status = TSSS_DUMPING_TUPLES;
dumptuples(state, false);
+ state->sub_status = TSSS_INVALID;
break;
case TSS_BOUNDED:
@@ -1716,7 +1727,9 @@ puttuple_common(Tuplesortstate *state, SortTuple *tuple)
/*
* If we are over the memory limit, dump tuples till we're under.
*/
+ state->sub_status = TSSS_DUMPING_TUPLES;
dumptuples(state, false);
+ state->sub_status = TSSS_INVALID;
break;
default:
@@ -1788,6 +1801,7 @@ tuplesort_performsort(Tuplesortstate *state)
* We were able to accumulate all the tuples within the allowed
* amount of memory. Just qsort 'em and we're done.
*/
+ state->sub_status = TSSS_SORTING_IN_MEM;
tuplesort_sort_memtuples(state);
state->current = 0;
state->eof_reached = false;
@@ -1803,12 +1817,14 @@ tuplesort_performsort(Tuplesortstate *state)
* in memory, using a heap to eliminate excess tuples. Now we
* have to transform the heap to a properly-sorted array.
*/
+ state->sub_status = TSSS_SORTING_IN_MEM;
sort_bounded_heap(state);
state->current = 0;
state->eof_reached = false;
state->markpos_offset = 0;
state->markpos_eof = false;
state->status = TSS_SORTEDINMEM;
+ state->sub_status = TSSS_INVALID;
break;
case TSS_BUILDRUNS:
@@ -1819,12 +1835,15 @@ tuplesort_performsort(Tuplesortstate *state)
* run (or, if !randomAccess, one run per tape). Note that
* mergeruns sets the correct state->status.
*/
+ state->sub_status = TSSS_DUMPING_TUPLES;
dumptuples(state, true);
- mergeruns(state);
+ state->sub_status = TSSS_MERGING_TAPES;
+ mergeruns(state); // set TSS_SORTEDONTAPE
state->eof_reached = false;
state->markpos_block = 0L;
state->markpos_offset = 0;
state->markpos_eof = false;
+ state->sub_status = TSSS_INVALID;
break;
default:
@@ -1867,11 +1886,15 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
case TSS_SORTEDINMEM:
Assert(forward || state->randomAccess);
Assert(!state->slabAllocatorUsed);
+
+ state->sub_status = TSSS_FETCHING_FROM_MEM;
+
if (forward)
{
if (state->current < state->memtupcount)
{
*stup = state->memtuples[state->current++];
+ state->tp_read_effective++;
return true;
}
state->eof_reached = true;
@@ -1904,6 +1927,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
return false;
}
*stup = state->memtuples[state->current - 1];
+ state->tp_read_effective++;
return true;
}
break;
@@ -1912,6 +1936,8 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
Assert(forward || state->randomAccess);
Assert(state->slabAllocatorUsed);
+ state->sub_status = TSSS_FETCHING_FROM_TAPES;
+
/*
* The slot that held the tuple that we returned in previous
* gettuple call can now be reused.
@@ -1930,6 +1956,8 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
if ((tuplen = getlen(state, state->result_tape, true)) != 0)
{
READTUP(state, stup, state->result_tape, tuplen);
+ state->tp_read[state->result_tape]++;
+ state->tp_read_effective++;
/*
* Remember the tuple we return, so that we can recycle
@@ -2018,6 +2046,8 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
if (nmoved != tuplen)
elog(ERROR, "bogus tuple length in backward scan");
READTUP(state, stup, state->result_tape, tuplen);
+ state->tp_read[state->result_tape]++;
+ state->tp_read_effective++;
/*
* Remember the tuple we return, so that we can recycle its memory
@@ -2032,6 +2062,8 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
/* We are managing memory ourselves, with the slab allocator. */
Assert(state->slabAllocatorUsed);
+ state->sub_status = TSSS_FETCHING_FROM_TAPES_WITH_MERGE;
+
/*
* The slab slot holding the tuple that we returned in previous
* gettuple call can now be reused.
@@ -2051,6 +2083,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
SortTuple newtup;
*stup = state->memtuples[0];
+ state->tp_read_effective++;
/*
* Remember the tuple we return, so that we can recycle its
@@ -2412,6 +2445,9 @@ inittapes(Tuplesortstate *state)
state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
+
+ state->tp_read = (int *) palloc0(maxTapes * sizeof(int));
+ state->tp_write = (int *) palloc0(maxTapes * sizeof(int));
/*
* Give replacement selection a try based on user setting. There will be
@@ -2461,7 +2497,10 @@ inittapes(Tuplesortstate *state)
state->tp_runs[j] = 0;
state->tp_dummy[j] = 1;
state->tp_tapenum[j] = j;
+ state->tp_read[j] = 0;
+ state->tp_write[j] = 0;
}
+
state->tp_fib[state->tapeRange] = 0;
state->tp_dummy[state->tapeRange] = 0;
@@ -2814,6 +2853,8 @@ mergeonerun(Tuplesortstate *state)
/* write the tuple to destTape */
srcTape = state->memtuples[0].tupindex;
WRITETUP(state, destTape, &state->memtuples[0]);
+ state->tp_write[destTape]++;
+ state->tp_write_merge++;
/* recycle the slot of the tuple we just wrote out, for the next read */
if (state->memtuples[0].tuple)
@@ -2917,6 +2958,8 @@ mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
return false;
}
READTUP(state, stup, srcTape, tuplen);
+ state->tp_read[srcTape]++;
+ state->tp_read_merge++;
return true;
}
@@ -2960,6 +3003,8 @@ dumptuples(Tuplesortstate *state, bool alltuples)
Assert(state->memtupcount > 0);
WRITETUP(state, state->tp_tapenum[state->destTape],
&state->memtuples[0]);
+ state->tp_write[state->tp_tapenum[state->destTape]]++;
+ state->tp_write_effective++;
tuplesort_heap_delete_top(state, true);
}
else
@@ -3097,6 +3142,8 @@ dumpbatch(Tuplesortstate *state, bool alltuples)
{
WRITETUP(state, state->tp_tapenum[state->destTape],
&state->memtuples[i]);
+ state->tp_write[state->tp_tapenum[state->destTape]]++;
+ state->tp_write_effective++;
state->memtupcount--;
}
@@ -4448,3 +4495,79 @@ free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
pfree(stup->tuple);
}
+
+TupSortStatus tuplesort_status(Tuplesortstate* ts_state)
+{
+ return ts_state->status;
+}
+
+int tuplesort_memtupcount(Tuplesortstate* ts_state)
+{
+ return ts_state->memtupcount;
+}
+
+int tuplesort_memtupsize(Tuplesortstate* ts_state)
+{
+ return ts_state->memtupsize;
+}
+
+int tuplesort_sub_status(Tuplesortstate* ts_state)
+{
+ return ts_state->sub_status;
+}
+
+int tuplesort_get_max_tapes(Tuplesortstate* ts_state)
+{
+ return ts_state->maxTapes;
+}
+
+struct ts_report* tuplesort_get_state(Tuplesortstate* tss)
+{
+ int i;
+ struct ts_report* tsr;
+
+ if (tss == NULL)
+ return NULL;
+
+ tsr = (struct ts_report*) palloc0(sizeof(struct ts_report));
+
+ tsr->status = tss->status;
+ tsr->sub_status = tss->sub_status;
+
+ tsr->memtupcount = tss->memtupcount;
+ tsr->memtupsize = tss->memtupsize;
+
+ tsr->maxTapes= tss->maxTapes;
+ tsr->activeTapes = tss->activeTapes;
+ tsr->result_tape = tss->result_tape;
+
+ tsr->tp_read_effective = tss->tp_read_effective;
+ tsr->tp_write_effective = tss->tp_write_effective;
+
+ tsr->tp_read_merge = tss->tp_read_merge;
+ tsr->tp_write_merge = tss->tp_write_merge;
+
+ if (tss->maxTapes == 0)
+ return tsr;
+
+ tsr->tp_fib = (int*) palloc0(tsr->maxTapes * sizeof(int));
+ tsr->tp_runs = (int*) palloc0(tsr->maxTapes * sizeof(int));
+ tsr->tp_dummy = (int*) palloc0(tsr->maxTapes * sizeof(int));
+ tsr->tp_read = (int*) palloc0(tsr->maxTapes * sizeof(int));
+ tsr->tp_write = (int*) palloc0(tsr->maxTapes * sizeof(int));
+
+ for (i = 0; i < tss->maxTapes; i++) {
+ tsr->tp_fib[i] = tss->tp_fib[i];
+ tsr->tp_runs[i] = tss->tp_runs[i];
+ tsr->tp_dummy[i] = tss->tp_dummy[i];
+ tsr->tp_read[i] = tss->tp_read[i];
+ tsr->tp_write[i] = tss->tp_write[i];
+ }
+
+ if (tss->tapeset == NULL)
+ return tsr;
+
+ tsr->blocks_alloc = LogicalTapeSetBlocks(tss->tapeset);
+
+ return tsr;
+}
diff --git a/src/backend/utils/sort/tuplestore.c b/src/backend/utils/sort/tuplestore.c
index b3f6be7..48fd3bf 100644
--- a/src/backend/utils/sort/tuplestore.c
+++ b/src/backend/utils/sort/tuplestore.c
@@ -66,17 +66,6 @@
/*
- * Possible states of a Tuplestore object. These denote the states that
- * persist between calls of Tuplestore routines.
- */
-typedef enum
-{
- TSS_INMEM, /* Tuples still fit in memory */
- TSS_WRITEFILE, /* Writing to temp file */
- TSS_READFILE /* Reading from temp file */
-} TupStoreStatus;
-
-/*
* State for a single read pointer. If we are in state INMEM then all the
* read pointers' "current" fields denote the read positions. In state
* WRITEFILE, the file/offset fields denote the read positions. In state
@@ -158,9 +147,11 @@ struct Tuplestorestate
* includes the deleted pointers.
*/
void **memtuples; /* array of pointers to palloc'd tuples */
- int memtupdeleted; /* the first N slots are currently unused */
- int memtupcount; /* number of tuples currently present */
- int memtupsize; /* allocated length of memtuples array */
+ int memtupcount; /* number of tuples currently present */
+ int memtupskipped; /* number of tuples skipped */
+ int memtupread; /* number of tuples read */
+ int memtupdeleted; /* the first N slots are currently unused */
+ int memtupsize; /* allocated length of memtuples array */
bool growmemtuples; /* memtuples' growth still underway? */
/*
@@ -178,6 +169,11 @@ struct Tuplestorestate
int writepos_file; /* file# (valid if READFILE state) */
off_t writepos_offset; /* offset (valid if READFILE state) */
+
+ int tuples_count;
+ int tuples_skipped;
+ int tuples_read; /* may exceed tuples_count if multiple readers */
+ int tuples_deleted;
};
#define COPYTUP(state,tup) ((*(state)->copytup) (state, tup))
@@ -268,6 +264,14 @@ tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
state->memtupdeleted = 0;
state->memtupcount = 0;
+ state->memtupskipped = 0;
+ state->memtupread = 0;
+
+ state->tuples_count = 0;
+ state->tuples_read = 0;
+ state->tuples_skipped = 0;
+ state->tuples_deleted = 0;
+
state->tuples = 0;
/*
@@ -285,8 +289,7 @@ tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
state->activeptr = 0;
state->readptrcount = 1;
state->readptrsize = 8; /* arbitrary */
- state->readptrs = (TSReadPointer *)
- palloc(state->readptrsize * sizeof(TSReadPointer));
+ state->readptrs = (TSReadPointer *) palloc(state->readptrsize * sizeof(TSReadPointer));
state->readptrs[0].eflags = eflags;
state->readptrs[0].eof_reached = false;
@@ -442,6 +445,9 @@ tuplestore_clear(Tuplestorestate *state)
readptr->eof_reached = false;
readptr->current = 0;
}
+
+ state->tuples_count = 0;
+ state->tuples_read = 0;
}
/*
@@ -801,7 +807,8 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
/* Stash the tuple in the in-memory array */
state->memtuples[state->memtupcount++] = tuple;
-
+ state->tuples_count++;
+
/*
* Done if we still fit in available memory and have array slots.
*/
@@ -851,6 +858,7 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
}
WRITETUP(state, tuple);
+ state->tuples_count++;
break;
case TSS_READFILE:
@@ -884,6 +892,7 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
}
WRITETUP(state, tuple);
+ state->tuples_count++;
break;
default:
elog(ERROR, "invalid tuplestore state");
@@ -920,6 +929,7 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
if (readptr->current < state->memtupcount)
{
/* We have another tuple, so return it */
+ state->tuples_read++;
return state->memtuples[readptr->current++];
}
readptr->eof_reached = true;
@@ -950,6 +960,7 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
Assert(!state->truncated);
return NULL;
}
+ state->tuples_read++;
return state->memtuples[readptr->current - 1];
}
break;
@@ -981,6 +992,7 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
if ((tuplen = getlen(state, true)) != 0)
{
tup = READTUP(state, tuplen);
+ state->tuples_read++;
return tup;
}
else
@@ -1053,6 +1065,7 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
(errcode_for_file_access(),
errmsg("could not seek in tuplestore temporary file: %m")));
tup = READTUP(state, tuplen);
+ state->tuples_read++;
return tup;
default:
@@ -1151,6 +1164,7 @@ tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward)
if (state->memtupcount - readptr->current >= ntuples)
{
readptr->current += ntuples;
+ state->tuples_skipped++;
return true;
}
readptr->current = state->memtupcount;
@@ -1168,6 +1182,7 @@ tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward)
if (readptr->current - state->memtupdeleted > ntuples)
{
readptr->current -= ntuples;
+ state->tuples_skipped++;
return true;
}
Assert(!state->truncated);
@@ -1191,6 +1206,7 @@ tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward)
pfree(tuple);
CHECK_FOR_INTERRUPTS();
}
+ state->tuples_skipped++;
return true;
}
}
@@ -1221,6 +1237,7 @@ dumptuples(Tuplestorestate *state)
if (i >= state->memtupcount)
break;
WRITETUP(state, state->memtuples[i]);
+ state->tuples_count++;
}
state->memtupdeleted = 0;
state->memtupcount = 0;
@@ -1457,6 +1474,30 @@ tuplestore_in_memory(Tuplestorestate *state)
return (state->status == TSS_INMEM);
}
+unsigned int
+tuplestore_status(Tuplestorestate *state)
+{
+ return state->status;
+}
+
+void
+tuplestore_get_state(Tuplestorestate *state, struct tss_report* tss)
+{
+ tss->memtupcount = state->memtupcount;
+ tss->memtupskipped = state->memtupskipped;
+ tss->memtupread = state->memtupread;
+ tss->memtupdeleted = state->memtupdeleted;
+
+ tss->tuples_count = state->tuples_count;
+ tss->tuples_read = state->tuples_read;
+ tss->tuples_skipped = state->tuples_skipped;
+ tss->tuples_deleted = state->tuples_deleted;
+ tss->readptrcount = state->readptrcount;
+
+ tss->disk_size = BufFileGetDiskSize(state->myfile);
+
+ tss->status = state->status;
+}
/*
* Tape interface routines
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 460cdb9..b6b8602 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2832,6 +2832,8 @@ DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 0 f f f
DESCR("statistics: information about currently active backends");
DATA(insert OID = 3318 ( pg_stat_get_progress_info PGNSP PGUID 12 1 100 0 0 f f f f t t s r 1 0 2249 "25" "{25,23,26,26,20,20,20,20,20,20,20,20,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{cmdtype,pid,datid,relid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}" _null_ _null_ pg_stat_get_progress_info _null_ _null_ _null_ ));
DESCR("statistics: information about progress of backends running maintenance command");
+DATA(insert OID = 3409 ( pg_progress PGNSP PGUID 12 1 100 0 0 f f f f t t s r 2 0 2249 "23 23" "{23,23,23,23,23,23,23,25,25,25,25}" "{i,i,o,o,o,o,o,o,o,o,o}" "{pid,verbose,pid,ppid,bid,lineid,indent,type,name,value,unit}" _null_ _null_ pg_progress _null_ _null_ _null_ ));
+DESCR("progress: progression report about long running SQL queries");
DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,1186,1186,23,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
DESCR("statistics: information about currently active replication");
DATA(insert OID = 3317 ( pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ ));
diff --git a/src/include/commands/explain.h b/src/include/commands/explain.h
index e2a0ee0..bba0c67 100644
--- a/src/include/commands/explain.h
+++ b/src/include/commands/explain.h
@@ -68,6 +68,8 @@ extern ExplainState *NewExplainState(void);
extern TupleDesc ExplainResultDesc(ExplainStmt *stmt);
+extern bool ExplainPreScanNode(PlanState* planstate, Bitmapset** rels_used);
+
extern void ExplainOneUtility(Node *utilityStmt, IntoClause *into,
ExplainState *es, const char *queryString,
ParamListInfo params, QueryEnvironment *queryEnv);
diff --git a/src/include/executor/execdesc.h b/src/include/executor/execdesc.h
index 37de6f2..d3d9bba 100644
--- a/src/include/executor/execdesc.h
+++ b/src/include/executor/execdesc.h
@@ -50,6 +50,7 @@ typedef struct QueryDesc
/* This field is set by ExecutorRun */
bool already_executed; /* true if previously executed */
+ bool query_completed; /* true if ExecutorRun() completed */
/* This is always set NULL by the core system, but plugins can change it */
struct Instrumentation *totaltime; /* total time spent in ExecutorRun */
@@ -67,4 +68,6 @@ extern QueryDesc *CreateQueryDesc(PlannedStmt *plannedstmt,
extern void FreeQueryDesc(QueryDesc *qdesc);
+extern PGDLLIMPORT QueryDesc* MyQueryDesc;
+extern PGDLLIMPORT bool IsQueryDescValid;
#endif /* EXECDESC_H */
diff --git a/src/include/executor/progress.h b/src/include/executor/progress.h
new file mode 100644
index 0000000..506ce68
--- /dev/null
+++ b/src/include/executor/progress.h
@@ -0,0 +1,46 @@
+/*-------------------------------------------------------------------------
+ *
+ * progress.h
+ * Progress of query: PROGRESS
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/commands/progress.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef PROGRESS_H
+#define PROGRESS_H
+
+/*
+ * This is arbitratry defined
+ * TODO: Add a guc variable to enable dynamic definition
+ */
+#define PROGRESS_AREA_SIZE (4096 * 128)
+
+/*
+ * Report only SQL querries which have been running longer than this value
+ */
+extern int progress_time_threshold;
+
+/*
+ * Track when a progress report has been requested
+ */
+extern volatile bool progress_requested;
+
+/*
+ * Init and Fini functions
+ */
+extern size_t ProgressShmemSize(void);
+extern void ProgressShmemInit(void);
+
+/*
+ * external functions
+ */
+extern void ProgressFetchReport(int pid, int bid, int verbose, char* buf);
+extern void HandleProgressSignal(void);
+extern void HandleProgressRequest(void);
+
+#endif /* PROGRESS_H */
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 4c607b2..7828b20 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -271,6 +271,8 @@ typedef char *pg_stack_base_t;
extern pg_stack_base_t set_stack_base(void);
extern void restore_stack_base(pg_stack_base_t base);
extern void check_stack_depth(void);
+extern long get_stack_depth(void);
+extern long get_max_stack_depth(void);
extern bool stack_is_too_deep(void);
/* in tcop/utility.c */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index d33392f..08f646c 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -819,6 +819,9 @@ typedef struct PlanState
Instrumentation *instrument; /* Optional runtime stats for this node */
WorkerInstrumentation *worker_instrument; /* per-worker instrumentation */
+ double plan_rows; /* number of rows returned so far */
+ unsigned short percent_done; /* percentage of execution computed so far */
+
/*
* Common structural data for all Plan types. These links to subsidiary
* state trees parallel links in the associated plan tree (except for the
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 15de936..1fbb460 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -626,6 +626,13 @@ extern void *copyObjectImpl(const void *obj);
*/
extern bool equal(const void *a, const void *b);
+/*
+ * plan nodes functions
+ */
+struct PlanInfo;
+struct Plan;
+
+extern int planNodeInfo(struct Plan* plan, struct PlanInfo* info);
/*
* Typedefs for identifying qualifier selectivities and plan costs as such.
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index d84372d..4fa30bd 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -173,6 +173,17 @@ typedef struct Plan
#define innerPlan(node) (((Plan *)(node))->righttree)
#define outerPlan(node) (((Plan *)(node))->lefttree)
+/*
+ * Structure used to fetch Plan node informations in text format
+ */
+typedef struct PlanInfo {
+ const char* pname; /* node type name for text output */
+ const char *sname; /* node type name for non-text output */
+ const char *strategy;
+ const char *partialmode;
+ const char *operation;
+ const char *custom_name;
+} PlanInfo;
/* ----------------
* Result node -
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 5e029c0..9b15433 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -812,7 +812,8 @@ typedef enum
WAIT_EVENT_SAFE_SNAPSHOT,
WAIT_EVENT_SYNC_REP,
WAIT_EVENT_LOGICAL_SYNC_DATA,
- WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE
+ WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE,
+ WAIT_EVENT_PROGRESS
} WaitEventIPC;
/* ----------
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index fe00bf0..dc69ff6 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -30,6 +30,15 @@
typedef struct BufFile BufFile;
+struct buffile_state {
+ int numFiles;
+
+ int* bytes_read;
+ int* bytes_write;
+
+ unsigned long disk_size;
+};
+
/*
* prototypes for functions in buffile.c
*/
@@ -41,5 +50,7 @@ extern size_t BufFileWrite(BufFile *file, void *ptr, size_t size);
extern int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence);
extern void BufFileTell(BufFile *file, int *fileno, off_t *offset);
extern int BufFileSeekBlock(BufFile *file, long blknum);
+extern struct buffile_state* BufFileState(BufFile *file);
+extern int BufFileGetDiskSize(BufFile *file);
#endif /* BUFFILE_H */
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 0568049..4661e3e 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -123,6 +123,8 @@ extern int durable_unlink(const char *fname, int loglevel);
extern int durable_link_or_rename(const char *oldfile, const char *newfile, int loglevel);
extern void SyncDataDirectory(void);
+extern int FileGetSize(File file);
+
/* Filename components for OpenTemporaryFile */
#define PG_TEMP_FILES_DIR "pgsql_tmp"
#define PG_TEMP_FILE_PREFIX "pgsql_tmp"
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 22955a7..3dfe2d4 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -89,6 +89,9 @@ extern RunningTransactions GetRunningTransactionData(void);
extern bool TransactionIdIsInProgress(TransactionId xid);
extern bool TransactionIdIsActive(TransactionId xid);
+
+extern BackendId ProcPidGetBackendId(int pid);
+
extern TransactionId GetOldestXmin(Relation rel, int flags);
extern TransactionId GetOldestActiveTransactionId(void);
extern TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly);
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index d068dde..c0f3dbe 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -41,6 +41,9 @@ typedef enum
PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
+ /* progress monitoring */
+ PROCSIG_PROGRESS,
+
NUM_PROCSIGNALS /* Must be last! */
} ProcSignalReason;
diff --git a/src/include/tcop/pquery.h b/src/include/tcop/pquery.h
index 12ff458..664603f 100644
--- a/src/include/tcop/pquery.h
+++ b/src/include/tcop/pquery.h
@@ -17,6 +17,7 @@
#include "nodes/parsenodes.h"
#include "utils/portal.h"
+extern bool RecoveryConflictPending;
extern PGDLLIMPORT Portal ActivePortal;
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index e802c4b..ee5d0f0 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -42,5 +42,7 @@ extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
long *blocknum, int *offset);
extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
+extern long LogicalTapeSetBlocksWritten(LogicalTapeSet *lts);
+extern int LogicalTapeGetSize(LogicalTapeSet *lts);
#endif /* LOGTAPE_H */
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index 14b9026..07b8475 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -25,6 +25,32 @@
#include "fmgr.h"
#include "utils/relcache.h"
+/*
+ * Possible states of a Tuplesort object. These denote the states that
+ * persist between calls of Tuplesort routines.
+ */
+typedef enum
+{
+ TSS_INITIAL, /* Loading tuples; still within memory limit */
+ TSS_BOUNDED, /* Loading tuples into bounded-size heap */
+ TSS_BUILDRUNS, /* Loading tuples; writing to tape */
+ TSS_SORTEDINMEM, /* Sort completed entirely in memory */
+ TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
+ TSS_FINALMERGE /* Performing final merge on-the-fly */
+} TupSortStatus;
+
+typedef enum
+{
+ TSSS_INVALID, /* Invalid sub status */
+ TSSS_INIT_TAPES, /* Creating tapes */
+ TSSS_DUMPING_TUPLES, /* dumping tuples from mem to tapes */
+ TSSS_SORTING_IN_MEM,
+ TSSS_SORTING_ON_TAPES,
+ TSSS_MERGING_TAPES,
+ TSSS_FETCHING_FROM_MEM,
+ TSSS_FETCHING_FROM_TAPES,
+ TSSS_FETCHING_FROM_TAPES_WITH_MERGE
+} TupSortSubStatus;
/* Tuplesortstate is an opaque type whose details are not known outside
* tuplesort.c.
@@ -32,9 +58,46 @@
typedef struct Tuplesortstate Tuplesortstate;
/*
+ * Used to fetch state of Tuplesortstate
+ */
+struct ts_report {
+ TupSortStatus status;
+ TupSortSubStatus sub_status;
+
+ int memtupcount;
+ Size memtupsize;
+
+ int maxTapes;
+
+
+ int* tp_fib;
+ int* tp_runs;
+ int* tp_dummy;
+ int* tp_tapenum;
+ int activeTapes;
+ int result_tape;
+
+ int* tp_read;
+ int* tp_write;
+
+ /*
+ * Effective rows in/out from sort
+ */
+ int tp_read_effective;
+ int tp_write_effective;
+
+ /*
+ * Rows in/out needed to perform sort
+ */
+ int tp_read_merge;
+ int tp_write_merge;
+ int blocks_alloc;
+};
+
+/*
* We provide multiple interfaces to what is essentially the same code,
* since different callers have different data to be sorted and want to
- * specify the sort key information differently. There are two APIs for
+ * specify the sortkey information differently. There are two APIs for
* sorting HeapTuples and two more for sorting IndexTuples. Yet another
* API supports sorting bare Datums.
*
@@ -123,4 +186,11 @@ extern void tuplesort_rescan(Tuplesortstate *state);
extern void tuplesort_markpos(Tuplesortstate *state);
extern void tuplesort_restorepos(Tuplesortstate *state);
+extern TupSortStatus tuplesort_status(Tuplesortstate* state);
+extern int tuplesort_memtupcount(Tuplesortstate* state);
+extern int tuplesort_memtupsize(Tuplesortstate* state);
+extern int tuplesort_sub_status(Tuplesortstate* state);
+extern int tuplesort_get_max_tapes(Tuplesortstate* state);
+extern struct ts_report* tuplesort_get_state(Tuplesortstate* tss);
+
#endif /* TUPLESORT_H */
diff --git a/src/include/utils/tuplestore.h b/src/include/utils/tuplestore.h
index b31ede8..8ca7dde 100644
--- a/src/include/utils/tuplestore.h
+++ b/src/include/utils/tuplestore.h
@@ -33,6 +33,15 @@
#include "executor/tuptable.h"
+/*
+ * Possible states of a Tuplestore object. These denote the states that
+ * persist between calls of Tuplestore routines.
+ */
+typedef enum {
+ TSS_INMEM, /* Tuples still fit in memory */
+ TSS_WRITEFILE, /* Writing to temp file */
+ TSS_READFILE /* Reading from temp file */
+} TupStoreStatus;
/* Tuplestorestate is an opaque type whose details are not known outside
* tuplestore.c.
@@ -40,6 +49,26 @@
typedef struct Tuplestorestate Tuplestorestate;
/*
+ * Use dto fetch progress/status of Tuplestore
+ */
+struct tss_report {
+ int memtupcount;
+ int memtupskipped;
+ int memtupread;
+ int memtupdeleted;
+
+ int tuples_count;
+ int tuples_skipped;
+ int tuples_read;
+ int tuples_deleted;
+ int readptrcount;
+
+ unsigned long disk_size;
+
+ int status;
+};
+
+/*
* Currently we only need to store MinimalTuples, but it would be easy
* to support the same behavior for IndexTuples and/or bare Datums.
*/
@@ -69,6 +98,8 @@ extern void tuplestore_copy_read_pointer(Tuplestorestate *state,
extern void tuplestore_trim(Tuplestorestate *state);
extern bool tuplestore_in_memory(Tuplestorestate *state);
+extern unsigned int tuplestore_status(Tuplestorestate *state);
+
extern bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
bool copy, TupleTableSlot *slot);
@@ -88,4 +119,8 @@ extern void tuplestore_clear(Tuplestorestate *state);
extern void tuplestore_end(Tuplestorestate *state);
+extern bool tuplestore_in_memory(Tuplestorestate *state);
+extern unsigned int tuplestore_status(Tuplestorestate *state);
+extern void tuplestore_get_state(Tuplestorestate *state, struct tss_report* tss);
+
#endif /* TUPLESTORE_H */
On Wed, Jun 21, 2017 at 10:01 AM, Remi Colinet <remi.colinet@gmail.com> wrote:
test=# SELECT pid, ppid, bid, concat(repeat(' ', 3 * indent),name), value,
unit FROM pg_progress(0,0);
pid | ppid | bid | concat | value | unit
-------+------+-----+------------------+------------------+---------
14106 | 0 | 4 | status | query running |
14106 | 0 | 4 | relationship | progression |
14106 | 0 | 4 | node name | Sort |
14106 | 0 | 4 | sort status | on tapes writing |
14106 | 0 | 4 | completion | 0 | percent
14106 | 0 | 4 | relationship | Outer |
14106 | 0 | 4 | node name | Seq Scan |
14106 | 0 | 4 | scan on | t_10m |
14106 | 0 | 4 | fetched | 25049 | block
14106 | 0 | 4 | total | 83334 | block
14106 | 0 | 4 | completion | 30 | percent
(11 rows)test=#
Somehow I imagined that the output would look more like what EXPLAIN produces.
If the one shared memory page is not enough for the whole progress report,
the progress report transfert between the 2 backends is done with a series
of request/response. Before setting the latch, the monitored backend write
the size of the data dumped in shared memory and set a status to indicate
that more data is to be sent through the shared memory page. The monitoring
backends get the result and sends an other signal, and then wait for the
latch again. The monitored backend does not collect a new progress report
but continues to dump the already collected report. And the exchange goes on
until the full progress report has been dumped.
This is basically what shm_mq does. We probably don't want to
reinvent that code, as it has taken a surprising amount of debugging
to get it fully working.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
2017-07-26 15:27 GMT+02:00 Robert Haas <robertmhaas@gmail.com>:
On Wed, Jun 21, 2017 at 10:01 AM, Remi Colinet <remi.colinet@gmail.com>
wrote:test=# SELECT pid, ppid, bid, concat(repeat(' ', 3 * indent),name),
value,
unit FROM pg_progress(0,0);
pid | ppid | bid | concat | value | unit
-------+------+-----+------------------+------------------+---------
14106 | 0 | 4 | status | query running |
14106 | 0 | 4 | relationship | progression |
14106 | 0 | 4 | node name | Sort |
14106 | 0 | 4 | sort status | on tapes writing |
14106 | 0 | 4 | completion | 0 | percent
14106 | 0 | 4 | relationship | Outer |
14106 | 0 | 4 | node name | Seq Scan |
14106 | 0 | 4 | scan on | t_10m |
14106 | 0 | 4 | fetched | 25049 | block
14106 | 0 | 4 | total | 83334 | block
14106 | 0 | 4 | completion | 30 | percent
(11 rows)test=#
Somehow I imagined that the output would look more like what EXPLAIN
produces.
me too.
Regards
Pavel
Show quoted text
If the one shared memory page is not enough for the whole progress
report,
the progress report transfert between the 2 backends is done with a
series
of request/response. Before setting the latch, the monitored backend
write
the size of the data dumped in shared memory and set a status to indicate
that more data is to be sent through the shared memory page. Themonitoring
backends get the result and sends an other signal, and then wait for the
latch again. The monitored backend does not collect a new progress report
but continues to dump the already collected report. And the exchangegoes on
until the full progress report has been dumped.
This is basically what shm_mq does. We probably don't want to
reinvent that code, as it has taken a surprising amount of debugging
to get it fully working.--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
2017-07-26 15:27 GMT+02:00 Robert Haas <robertmhaas@gmail.com>:
On Wed, Jun 21, 2017 at 10:01 AM, Remi Colinet <remi.colinet@gmail.com>
wrote:test=# SELECT pid, ppid, bid, concat(repeat(' ', 3 * indent),name),
value,
unit FROM pg_progress(0,0);
pid | ppid | bid | concat | value | unit
-------+------+-----+------------------+------------------+---------
14106 | 0 | 4 | status | query running |
14106 | 0 | 4 | relationship | progression |
14106 | 0 | 4 | node name | Sort |
14106 | 0 | 4 | sort status | on tapes writing |
14106 | 0 | 4 | completion | 0 | percent
14106 | 0 | 4 | relationship | Outer |
14106 | 0 | 4 | node name | Seq Scan |
14106 | 0 | 4 | scan on | t_10m |
14106 | 0 | 4 | fetched | 25049 | block
14106 | 0 | 4 | total | 83334 | block
14106 | 0 | 4 | completion | 30 | percent
(11 rows)test=#
Somehow I imagined that the output would look more like what EXPLAIN
produces.
I had initially used the same output as for the ANALYZE command:
test=# PROGRESS 14611;
PLAN
PROGRESS
-----------------------------------------------------------------------------------------
Gather Merge
-> Sort=> dumping tuples to tapes
rows r/w merge 0/0 rows r/w effective 0/1464520 0%
Sort Key: md5
-> Parallel Seq Scan on t_10m => rows 1464520/4166700 35% blks
36011/83334 43%
(5 rows)
test=#
But this restricts the use to "human consumers". Using a table output with
name/value pairs, allows the use by utilities for instance, without
parsing. This is less handy for administrators, but far better for 3rd
party utilities. One solution is otherwise to create a PL/SQL command on
top of pg_progress() SQL function to produce an output similar to the one
of the ANALYZE command.
If the one shared memory page is not enough for the whole progress
report,
the progress report transfert between the 2 backends is done with a
series
of request/response. Before setting the latch, the monitored backend
write
the size of the data dumped in shared memory and set a status to indicate
that more data is to be sent through the shared memory page. Themonitoring
backends get the result and sends an other signal, and then wait for the
latch again. The monitored backend does not collect a new progress report
but continues to dump the already collected report. And the exchangegoes on
until the full progress report has been dumped.
This is basically what shm_mq does. We probably don't want to
reinvent that code, as it has taken a surprising amount of debugging
to get it fully working.
Yes, I had once considered this solution but then moved away as I was
unsure of the exact need for the transfert of the progress report between
the monitored and the monitoring backends.
I'am going to switch to shm_mq.
Thx & Rgds
Show quoted text
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
2017-07-26 16:24 GMT+02:00 Pavel Stehule <pavel.stehule@gmail.com>:
2017-07-26 15:27 GMT+02:00 Robert Haas <robertmhaas@gmail.com>:
On Wed, Jun 21, 2017 at 10:01 AM, Remi Colinet <remi.colinet@gmail.com>
wrote:test=# SELECT pid, ppid, bid, concat(repeat(' ', 3 * indent),name),
value,
unit FROM pg_progress(0,0);
pid | ppid | bid | concat | value | unit
-------+------+-----+------------------+------------------+---------
14106 | 0 | 4 | status | query running |
14106 | 0 | 4 | relationship | progression |
14106 | 0 | 4 | node name | Sort |
14106 | 0 | 4 | sort status | on tapes writing |
14106 | 0 | 4 | completion | 0 | percent
14106 | 0 | 4 | relationship | Outer |
14106 | 0 | 4 | node name | Seq Scan |
14106 | 0 | 4 | scan on | t_10m |
14106 | 0 | 4 | fetched | 25049 | block
14106 | 0 | 4 | total | 83334 | block
14106 | 0 | 4 | completion | 30 | percent
(11 rows)test=#
Somehow I imagined that the output would look more like what EXPLAIN
produces.me too.
Regards
Pavel
Above output is better for utilities. No need to parse the fields. But I
can also provide a second SQL function name pg_progress_admin() with an
output similar to ANALYZE command.
Then comes an other question about the format of the output which can be
TEXT, XML, JSON or YAML as for the ANALYZE command.
An other solution is also to use a PL/SQL package to transform the
pg_progress() output into an output similar to ANALYZE command and let the
use decide which format (XML, JSON, ...) to use.
Thx & Rgds
Remi
Show quoted text
If the one shared memory page is not enough for the whole progress
report,
the progress report transfert between the 2 backends is done with a
series
of request/response. Before setting the latch, the monitored backend
write
the size of the data dumped in shared memory and set a status to
indicate
that more data is to be sent through the shared memory page. The
monitoring
backends get the result and sends an other signal, and then wait for the
latch again. The monitored backend does not collect a new progressreport
but continues to dump the already collected report. And the exchange
goes on
until the full progress report has been dumped.
This is basically what shm_mq does. We probably don't want to
reinvent that code, as it has taken a surprising amount of debugging
to get it fully working.--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
2017-07-31 11:09 GMT+02:00 Remi Colinet <remi.colinet@gmail.com>:
2017-07-26 15:27 GMT+02:00 Robert Haas <robertmhaas@gmail.com>:
On Wed, Jun 21, 2017 at 10:01 AM, Remi Colinet <remi.colinet@gmail.com>
wrote:test=# SELECT pid, ppid, bid, concat(repeat(' ', 3 * indent),name),
value,
unit FROM pg_progress(0,0);
pid | ppid | bid | concat | value | unit
-------+------+-----+------------------+------------------+---------
14106 | 0 | 4 | status | query running |
14106 | 0 | 4 | relationship | progression |
14106 | 0 | 4 | node name | Sort |
14106 | 0 | 4 | sort status | on tapes writing |
14106 | 0 | 4 | completion | 0 | percent
14106 | 0 | 4 | relationship | Outer |
14106 | 0 | 4 | node name | Seq Scan |
14106 | 0 | 4 | scan on | t_10m |
14106 | 0 | 4 | fetched | 25049 | block
14106 | 0 | 4 | total | 83334 | block
14106 | 0 | 4 | completion | 30 | percent
(11 rows)test=#
Somehow I imagined that the output would look more like what EXPLAIN
produces.I had initially used the same output as for the ANALYZE command:
test=# PROGRESS 14611;
PLAN PROGRESS------------------------------------------------------------
-----------------------------
Gather Merge
-> Sort=> dumping tuples to tapes
rows r/w merge 0/0 rows r/w effective 0/1464520 0%
Sort Key: md5
-> Parallel Seq Scan on t_10m => rows 1464520/4166700 35% blks
36011/83334 43%
(5 rows)test=#
But this restricts the use to "human consumers". Using a table output with
name/value pairs, allows the use by utilities for instance, without
parsing. This is less handy for administrators, but far better for 3rd
party utilities. One solution is otherwise to create a PL/SQL command on
top of pg_progress() SQL function to produce an output similar to the one
of the ANALYZE command.
you can support XML, JSON output format like EXPLAIN does.
https://www.postgresql.org/docs/current/static/sql-explain.html
Regards
pavel
Show quoted text
If the one shared memory page is not enough for the whole progress
report,
the progress report transfert between the 2 backends is done with a
series
of request/response. Before setting the latch, the monitored backend
write
the size of the data dumped in shared memory and set a status to
indicate
that more data is to be sent through the shared memory page. The
monitoring
backends get the result and sends an other signal, and then wait for the
latch again. The monitored backend does not collect a new progressreport
but continues to dump the already collected report. And the exchange
goes on
until the full progress report has been dumped.
This is basically what shm_mq does. We probably don't want to
reinvent that code, as it has taken a surprising amount of debugging
to get it fully working.Yes, I had once considered this solution but then moved away as I was
unsure of the exact need for the transfert of the progress report between
the monitored and the monitoring backends.
I'am going to switch to shm_mq.Thx & Rgds
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Mon, Jul 31, 2017 at 6:10 AM, Pavel Stehule <pavel.stehule@gmail.com> wrote:
you can support XML, JSON output format like EXPLAIN does.
https://www.postgresql.org/docs/current/static/sql-explain.html
+1 for that approach.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
I did it in version 2 of the patch.
The patch could yield TEXT, JSON, and XML ouput.
For below query, it gives:
=> Terminal 1
test=# select * from t_10m, t_1m where t_10m.md5 like '%cb%';
=> Terminal 2
test=# \watch PROGRESS 9546;
Wed 10 May 2017 06:29:59 PM CEST (every 1s)
PLAN
PROGRESS
---------------------------------------------------------------------------------------------------------
status: <query running>
query: select * from t_10m, t_1m where t_10m.md5 like '%cb%';
time used (s): 10
Nested Loop
-> Seq Scan on t_1m => rows 7/1000000 0% => blks 8334/8334 100%
-> Materialize => file read readptrcount=1 rows write=1189285
read=6584854 disk use (bytes) 53842965
-> Seq Scan on t_10m => rows 1145596/738172 155%
Filter: (md5 ~~ '%cb%'::text)
total disk space used (MB) 51
(9 rows)
=> Terminal 2
test=# PROGRESS (FORMAT JSON) 9546;
PLAN PROGRESS
----------------------------------------------------------------------
[ +
"status": "<query running>", +
"query": "select * from t_10m, t_1m where t_10m.md5 like '%cb%';",+
"time used (s)": 0, +
"single worker": { +
"Node Type": "Nested Loop", +
"Partial Mode": "", +
"Operation": "single worker", +
"Parent Relationship": "single worker", +
"Custom Plan Provider": "(@\u0004\u0001", +
"Parallel Aware": false, +
"Outer": { +
"Node Type": "Seq Scan", +
"Strategy": "", +
"Partial Mode": "single worker", +
"Operation": "Outer", +
"Parent Relationship": "Outer", +
"Custom Plan Provider": "(@\u0004\u0001", +
"Parallel Aware": false, +
"relation": "t_1m", +
"rows fetched": 1, +
"rows total": 1000000, +
"rows percent": 0, +
"blocks fetched": 8334, +
"blocks total": 8334, +
"blocks percent": 100 +
}, +
"Inner": { +
"Node Type": "Materialize", +
"Strategy": "", +
"Partial Mode": "single worker", +
"Operation": "Inner", +
"Parent Relationship": "Inner", +
"Custom Plan Provider": "(@\u0004\u0001", +
"Parallel Aware": false, +
"file store": "write", +
"readptrcount": 1, +
"rows write": 297256, +
"rows read": 0, +
"disk use (bytes)": 11911168, +
"Outer": { +
"Node Type": "Seq Scan", +
"Strategy": "", +
"Partial Mode": "Inner", +
"Operation": "Outer", +
"Parent Relationship": "Outer", +
"Custom Plan Provider": "HtFH\b[]\u000f\u001f", +
"Parallel Aware": false, +
"relation": "t_10m", +
"rows fetched": 253566, +
"rows total": 738172, +
"rows percent": 34, +
"blocks fetched": 18436, +
"blocks total": 83334, +
"blocks percent": 22, +
"Filter": "(md5 ~~ '%cb%'::text)" +
} +
} +
}, +
"unit": "MB", +
"total disk space used": 11 +
]
(1 row)
test=#
I'am skeptical about the use of JSON, XML, and others in such output.
Does anyone use these formats (XML, JSON, YAML) for EXPLAIN output?
I suspect only TEXT format is being used.
This looks better with a SQL table made of name/value fields to be used
both by administrators and by utilities.
And this is much more inline with SQL. No need to parse JSON, XML or any
other format. Output can be controlled with WHERE clause.
Rgds
Remi
2017-07-31 12:10 GMT+02:00 Pavel Stehule <pavel.stehule@gmail.com>:
Show quoted text
2017-07-31 11:09 GMT+02:00 Remi Colinet <remi.colinet@gmail.com>:
2017-07-26 15:27 GMT+02:00 Robert Haas <robertmhaas@gmail.com>:
On Wed, Jun 21, 2017 at 10:01 AM, Remi Colinet <remi.colinet@gmail.com>
wrote:test=# SELECT pid, ppid, bid, concat(repeat(' ', 3 * indent),name),
value,
unit FROM pg_progress(0,0);
pid | ppid | bid | concat | value | unit
-------+------+-----+------------------+------------------+---------
14106 | 0 | 4 | status | query running |
14106 | 0 | 4 | relationship | progression |
14106 | 0 | 4 | node name | Sort |
14106 | 0 | 4 | sort status | on tapes writing |
14106 | 0 | 4 | completion | 0 | percent
14106 | 0 | 4 | relationship | Outer |
14106 | 0 | 4 | node name | Seq Scan |
14106 | 0 | 4 | scan on | t_10m |
14106 | 0 | 4 | fetched | 25049 | block
14106 | 0 | 4 | total | 83334 | block
14106 | 0 | 4 | completion | 30 | percent
(11 rows)test=#
Somehow I imagined that the output would look more like what EXPLAIN
produces.I had initially used the same output as for the ANALYZE command:
test=# PROGRESS 14611;
PLAN PROGRESS------------------------------------------------------------
-----------------------------
Gather Merge
-> Sort=> dumping tuples to tapes
rows r/w merge 0/0 rows r/w effective 0/1464520 0%
Sort Key: md5
-> Parallel Seq Scan on t_10m => rows 1464520/4166700 35% blks
36011/83334 43%
(5 rows)test=#
But this restricts the use to "human consumers". Using a table output
with name/value pairs, allows the use by utilities for instance, without
parsing. This is less handy for administrators, but far better for 3rd
party utilities. One solution is otherwise to create a PL/SQL command on
top of pg_progress() SQL function to produce an output similar to the one
of the ANALYZE command.you can support XML, JSON output format like EXPLAIN does.
https://www.postgresql.org/docs/current/static/sql-explain.html
Regards
pavel
If the one shared memory page is not enough for the whole progress
report,
the progress report transfert between the 2 backends is done with a
series
of request/response. Before setting the latch, the monitored backend
write
the size of the data dumped in shared memory and set a status to
indicate
that more data is to be sent through the shared memory page. The
monitoring
backends get the result and sends an other signal, and then wait for
the
latch again. The monitored backend does not collect a new progress
report
but continues to dump the already collected report. And the exchange
goes on
until the full progress report has been dumped.
This is basically what shm_mq does. We probably don't want to
reinvent that code, as it has taken a surprising amount of debugging
to get it fully working.Yes, I had once considered this solution but then moved away as I was
unsure of the exact need for the transfert of the progress report between
the monitored and the monitoring backends.
I'am going to switch to shm_mq.Thx & Rgds
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Tue, Aug 1, 2017 at 6:35 PM, Remi Colinet <remi.colinet@gmail.com> wrote:
I'am skeptical about the use of JSON, XML, and others in such output.
You should not.
Does anyone use these formats (XML, JSON, YAML) for EXPLAIN output?
I suspect only TEXT format is being used.
I think that Depesz makes use of a non-default format for its
explain.depesz.com, or he would have a hard time maintaining a
deparsing API for its application. JSON is for example easy to extract
and reformat when doing analysis of the inner planner nodes, and
Postgres has json and xml data types, which makes analysis with SQL
even easier sometimes.
--
Michael
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On 2017-08-01 19:11:55 +0200, Michael Paquier wrote:
I think that Depesz makes use of a non-default format for its
explain.depesz.com, or he would have a hard time maintaining a
deparsing API for its application.
Hm? e.d.c accepts the text explain format, so I'm unclear on what you're
saying here.
Andres
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Tue, Aug 1, 2017 at 12:35 PM, Remi Colinet <remi.colinet@gmail.com> wrote:
I did it in version 2 of the patch.
I'am skeptical about the use of JSON, XML, and others in such output.Does anyone use these formats (XML, JSON, YAML) for EXPLAIN output?
I suspect only TEXT format is being used.
Do you have any reason to suspect that others aren't being used? The
default format for anything is likely to be the most commonly-used
one, but I don't think that proves the others are unused.
Even if it were true, it wouldn't be a good justification for
inventing an entirely new machine-readable format, at least not IMHO.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Tue, Aug 1, 2017 at 7:17 PM, Andres Freund <andres@anarazel.de> wrote:
On 2017-08-01 19:11:55 +0200, Michael Paquier wrote:
I think that Depesz makes use of a non-default format for its
explain.depesz.com, or he would have a hard time maintaining a
deparsing API for its application.Hm? e.d.c accepts the text explain format, so I'm unclear on what you're
saying here.
Ah, right. I thought it did...
--
Michael
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
2017-08-01 18:35 GMT+02:00 Remi Colinet <remi.colinet@gmail.com>:
I did it in version 2 of the patch.
The patch could yield TEXT, JSON, and XML ouput.For below query, it gives:
=> Terminal 1
test=# select * from t_10m, t_1m where t_10m.md5 like '%cb%';=> Terminal 2
test=# \watch PROGRESS 9546;
Wed 10 May 2017 06:29:59 PM CEST (every 1s)PLAN
PROGRESS
------------------------------------------------------------
---------------------------------------------
status: <query running>
query: select * from t_10m, t_1m where t_10m.md5 like '%cb%';
time used (s): 10
Nested Loop
-> Seq Scan on t_1m => rows 7/1000000 0% => blks 8334/8334 100%
-> Materialize => file read readptrcount=1 rows write=1189285
read=6584854 disk use (bytes) 53842965
-> Seq Scan on t_10m => rows 1145596/738172 155%
Filter: (md5 ~~ '%cb%'::text)
total disk space used (MB) 51
(9 rows)=> Terminal 2
test=# PROGRESS (FORMAT JSON) 9546;
PLAN PROGRESS
----------------------------------------------------------------------
[ +
"status": "<query running>", +
"query": "select * from t_10m, t_1m where t_10m.md5 like '%cb%';",+
"time used (s)": 0, +
"single worker": { +
"Node Type": "Nested Loop", +
"Partial Mode": "", +
"Operation": "single worker", +
"Parent Relationship": "single worker", +
"Custom Plan Provider": "(@\u0004\u0001", +
"Parallel Aware": false, +
"Outer": { +
"Node Type": "Seq Scan", +
"Strategy": "", +
"Partial Mode": "single worker", +
"Operation": "Outer", +
"Parent Relationship": "Outer", +
"Custom Plan Provider": "(@\u0004\u0001", +
"Parallel Aware": false, +
"relation": "t_1m", +
"rows fetched": 1, +
"rows total": 1000000, +
"rows percent": 0, +
"blocks fetched": 8334, +
"blocks total": 8334, +
"blocks percent": 100 +
}, +
"Inner": { +
"Node Type": "Materialize", +
"Strategy": "", +
"Partial Mode": "single worker", +
"Operation": "Inner", +
"Parent Relationship": "Inner", +
"Custom Plan Provider": "(@\u0004\u0001", +
"Parallel Aware": false, +
"file store": "write", +
"readptrcount": 1, +
"rows write": 297256, +
"rows read": 0, +
"disk use (bytes)": 11911168, +
"Outer": { +
"Node Type": "Seq Scan", +
"Strategy": "", +
"Partial Mode": "Inner", +
"Operation": "Outer", +
"Parent Relationship": "Outer", +
"Custom Plan Provider": "HtFH\b[]\u000f\u001f", +
"Parallel Aware": false, +
"relation": "t_10m", +
"rows fetched": 253566, +
"rows total": 738172, +
"rows percent": 34, +
"blocks fetched": 18436, +
"blocks total": 83334, +
"blocks percent": 22, +
"Filter": "(md5 ~~ '%cb%'::text)" +
} +
} +
}, +
"unit": "MB", +
"total disk space used": 11 +
]
(1 row)test=#
I'am skeptical about the use of JSON, XML, and others in such output.
Does anyone use these formats (XML, JSON, YAML) for EXPLAIN output?
I suspect only TEXT format is being used.This looks better with a SQL table made of name/value fields to be used
both by administrators and by utilities.
I disagree. It should be consistent with EXPLAIN - there is zero benefit
from introduction new format.
I know few utils that uses JSON format.
Regards
Pavel
Show quoted text
And this is much more inline with SQL. No need to parse JSON, XML or any
other format. Output can be controlled with WHERE clause.Rgds
Remi2017-07-31 12:10 GMT+02:00 Pavel Stehule <pavel.stehule@gmail.com>:
2017-07-31 11:09 GMT+02:00 Remi Colinet <remi.colinet@gmail.com>:
2017-07-26 15:27 GMT+02:00 Robert Haas <robertmhaas@gmail.com>:
On Wed, Jun 21, 2017 at 10:01 AM, Remi Colinet <remi.colinet@gmail.com>
wrote:test=# SELECT pid, ppid, bid, concat(repeat(' ', 3 * indent),name),
value,
unit FROM pg_progress(0,0);
pid | ppid | bid | concat | value | unit
-------+------+-----+------------------+------------------+---------
14106 | 0 | 4 | status | query running |
14106 | 0 | 4 | relationship | progression |
14106 | 0 | 4 | node name | Sort |
14106 | 0 | 4 | sort status | on tapes writing |
14106 | 0 | 4 | completion | 0 | percent
14106 | 0 | 4 | relationship | Outer |
14106 | 0 | 4 | node name | Seq Scan |
14106 | 0 | 4 | scan on | t_10m |
14106 | 0 | 4 | fetched | 25049 | block
14106 | 0 | 4 | total | 83334 | block
14106 | 0 | 4 | completion | 30 | percent
(11 rows)test=#
Somehow I imagined that the output would look more like what EXPLAIN
produces.I had initially used the same output as for the ANALYZE command:
test=# PROGRESS 14611;
PLAN
PROGRESS
------------------------------------------------------------
-----------------------------
Gather Merge
-> Sort=> dumping tuples to tapes
rows r/w merge 0/0 rows r/w effective 0/1464520 0%
Sort Key: md5
-> Parallel Seq Scan on t_10m => rows 1464520/4166700 35% blks
36011/83334 43%
(5 rows)test=#
But this restricts the use to "human consumers". Using a table output
with name/value pairs, allows the use by utilities for instance, without
parsing. This is less handy for administrators, but far better for 3rd
party utilities. One solution is otherwise to create a PL/SQL command on
top of pg_progress() SQL function to produce an output similar to the one
of the ANALYZE command.you can support XML, JSON output format like EXPLAIN does.
https://www.postgresql.org/docs/current/static/sql-explain.html
Regards
pavel
If the one shared memory page is not enough for the whole progress
report,
the progress report transfert between the 2 backends is done with a
series
of request/response. Before setting the latch, the monitored backend
write
the size of the data dumped in shared memory and set a status to
indicate
that more data is to be sent through the shared memory page. The
monitoring
backends get the result and sends an other signal, and then wait for
the
latch again. The monitored backend does not collect a new progress
report
but continues to dump the already collected report. And the exchange
goes on
until the full progress report has been dumped.
This is basically what shm_mq does. We probably don't want to
reinvent that code, as it has taken a surprising amount of debugging
to get it fully working.Yes, I had once considered this solution but then moved away as I was
unsure of the exact need for the transfert of the progress report between
the monitored and the monitoring backends.
I'am going to switch to shm_mq.Thx & Rgds
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On 08/01/2017 06:35 PM, Remi Colinet wrote:
Does anyone use these formats (XML, JSON, YAML) for EXPLAIN output?
Yes : http://tatiyants.com/pev/#/plans :)
--
Adrien NAYRAT
2017-08-01 19:57 GMT+02:00 Robert Haas <robertmhaas@gmail.com>:
On Tue, Aug 1, 2017 at 12:35 PM, Remi Colinet <remi.colinet@gmail.com>
wrote:I did it in version 2 of the patch.
I'am skeptical about the use of JSON, XML, and others in such output.Does anyone use these formats (XML, JSON, YAML) for EXPLAIN output?
I suspect only TEXT format is being used.Do you have any reason to suspect that others aren't being used? The
default format for anything is likely to be the most commonly-used
one, but I don't think that proves the others are unused.Even if it were true, it wouldn't be a good justification for
inventing an entirely new machine-readable format, at least not IMHO.
In version 3, my idea was to use a similar output as the one used for
Ora..e database with the v$session_longops dynamic table.
May be this is not such a good idea then. Though, it seems very handy at
1st sight.
I can revert to TEXT, JSON, XML and YAML. I will need to modify EXPLAIN
code in order to share some common parts for output formatting. Basically,
this would not change the code of EXPLAIN unless than moving some functions
in a pg_report.c file and with function names starting by ReportXXX instead
of ExplainXXX . Duplicating code for such output is not an option.
Rgds
Show quoted text
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company