Pass the leader's query text to parallel workers.

Store the query string in EState, copy it (with its terminating NUL) into the
parallel DSM under PARALLEL_KEY_QUERY_TEXT, and have each worker use it for
its QueryDesc, debug_query_string and pgstat_report_activity().

diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index a666391..82da86f 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -190,6 +190,9 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
 		estate->es_param_exec_vals = (ParamExecData *)
 			palloc0(queryDesc->plannedstmt->nParamExec * sizeof(ParamExecData));
 
+	/* Remember the query text so it can be handed to parallel workers. */
+	estate->es_queryString = queryDesc->sourceText;
+
 	/*
 	 * If non-read-only query, set the command ID to mark output tuples with
 	 */
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index fe87c9a..e2c2e9c 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -37,6 +37,7 @@
 #include "utils/dsa.h"
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
+#include "pgstat.h"
 
 /*
  * Magic numbers for parallel executor communication. We use constants
@@ -51,7 +52,8 @@
 #define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000006)
+#define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000007)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
 
 /*
  * DSM structure for accumulating per-PlanState instrumentation.
  *
@@ -350,6 +352,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	int			instrumentation_len = 0;
 	int			instrument_offset = 0;
 	Size		dsa_minsize = dsa_minimum_size();
+	char	   *query_string;
+	int			query_len;
 
 	/* Allocate object for return value. */
 	pei = palloc0(sizeof(ParallelExecutorInfo));
@@ -369,6 +373,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	 * for the various things we need to store.
 	 */
 
+	/* Estimate space for query text, including the terminating NUL. */
+	query_len = strlen(estate->es_queryString) + 1;
+	shm_toc_estimate_chunk(&pcxt->estimator, query_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 	/* Estimate space for serialized PlannedStmt. */
 	pstmt_len = strlen(pstmt_data) + 1;
 	shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
@@ -433,6 +442,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	 * asked for has been allocated or initialized yet, though, so do that.
 	 */
 
+	/* Store query string; query_len covers the NUL, so workers see a C string. */
+	query_string = shm_toc_allocate(pcxt->toc, query_len);
+	memcpy(query_string, estate->es_queryString, query_len);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
+
 	/* Store serialized PlannedStmt. */
 	pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
 	memcpy(pstmt_space, pstmt_data, pstmt_len);
@@ -643,6 +657,10 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
 	char	   *paramspace;
 	PlannedStmt *pstmt;
 	ParamListInfo paramLI;
+	char	   *queryString;
+
+	/* Get the query string from shared memory */
+	queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT);
 
 	/* Reconstruct leader-supplied PlannedStmt. */
 	pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT);
@@ -661,7 +679,7 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
 	 * revising this someday.
 	 */
 	return CreateQueryDesc(pstmt,
-						   "",
+						   queryString,
 						   GetActiveSnapshot(), InvalidSnapshot,
 						   receiver, paramLI, instrument_options);
 }
@@ -775,6 +793,10 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	instrument_options = instrumentation->instrument_options;
 	queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
 
+	/* Report workers' query for monitoring purposes (single TOC lookup). */
+	debug_query_string = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT);
+	pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
 	/* Prepare to track buffer usage during query execution. */
 	InstrStartParallelQuery();
 
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index e49feff..51d231d 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -139,6 +139,7 @@ CreateExecutorState(void)
 	estate->es_epqTuple = NULL;
 	estate->es_epqTupleSet = NULL;
 	estate->es_epqScanDone = NULL;
+	estate->es_queryString = NULL;
 
 	/*
 	 * Return the executor state structure
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 42c6c58..a4046bd 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -434,6 +434,7 @@ typedef struct EState
 
 	/* The per-query shared memory area to use for parallel execution. */
 	struct dsa_area *es_query_dsa;
+	const char *es_queryString;	/* Query string for passing to workers */
 } EState;
 
 