diff --git a/src/backend/access/Makefile b/src/backend/access/Makefile index 21721b4..823d5c3 100644 --- a/src/backend/access/Makefile +++ b/src/backend/access/Makefile @@ -8,6 +8,6 @@ subdir = src/backend/access top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -SUBDIRS = brin common gin gist hash heap index nbtree rmgrdesc spgist transam +SUBDIRS = brin common gin gist hash heap index nbtree rmgrdesc shmmq spgist transam include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/shmmq/Makefile b/src/backend/access/shmmq/Makefile new file mode 100644 index 0000000..aeae8d9 --- /dev/null +++ b/src/backend/access/shmmq/Makefile @@ -0,0 +1,17 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for access/shmmq +# +# IDENTIFICATION +# src/backend/access/shmmq/Makefile +# +#------------------------------------------------------------------------- + +subdir = src/backend/access/shmmq +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = shmmqam.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/shmmq/shmmqam.c b/src/backend/access/shmmq/shmmqam.c new file mode 100644 index 0000000..7be7ba8 --- /dev/null +++ b/src/backend/access/shmmq/shmmqam.c @@ -0,0 +1,374 @@ +/*------------------------------------------------------------------------- + * + * shmmqam.c + * shared memory queue access method code + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/shmmq/shmmqam.c + * + * + * INTERFACE ROUTINES + * shm_getnext - retrieve next tuple in queue + * + * NOTES + * This file contains the shmmq_ routines which implement + * the POSTGRES shared memory access method used for all POSTGRES + * relations. 
+ * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/htup.h" +#include "access/htup_details.h" +#include "access/shmmqam.h" +#include "access/tupdesc.h" +#include "fmgr.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "utils/lsyscache.h" + + +static HeapTuple +form_result_tuple(worker_result resultState, TupleDesc tupdesc, + StringInfo msg); + +/* + * Indicate that an error came from a particular worker. + */ +static void +worker_error_callback(void *arg) +{ + pid_t pid = * (pid_t *) arg; + + errcontext("worker backend, pid %d", pid); +} + +/* + * shm_beginscan - + * Initializes the shared memory scan descriptor to retrieve tuples + * from worker backends. + */ +ShmScanDesc +shm_beginscan(int num_queues) +{ + ShmScanDesc shmscan; + + shmscan = palloc(sizeof(ShmScanDescData)); + + shmscan->num_shm_queues = num_queues; + shmscan->ss_cqueue = -1; + shmscan->shmscan_inited = false; + + return shmscan; +} + +/* + * ExecInitWorkerResult - + * Initializes the result state to retrieve tuples from worker backends. + */ +worker_result +ExecInitWorkerResult(TupleDesc tupdesc) +{ + worker_result workerResult; + int i; + int natts = tupdesc->natts; + + workerResult = palloc0(sizeof(worker_result_state)); + workerResult->receive_functions = palloc(sizeof(FmgrInfo) * natts); + workerResult->typioparams = palloc(sizeof(Oid) * natts); + + for (i = 0; i < natts; ++i) + { + Oid receive_function_id; + + getTypeBinaryInputInfo(tupdesc->attrs[i]->atttypid, + &receive_function_id, + &workerResult->typioparams[i]); + fmgr_info(receive_function_id, &workerResult->receive_functions[i]); + } + + return workerResult; +} + + +/* + * shm_getnext - + * Get the next tuple from shared memory queue. This function + * is responsible for fetching tuples from all the queues associated + * with worker backends used in parallel sequential scan. 
+ */ +HeapTuple +shm_getnext(ShmScanDesc shmScan, worker_result resultState, + shm_mq_handle **responseq, TupleDesc tupdesc) +{ + shm_mq_result res; + char msgtype; + Size nbytes; + void *data; + StringInfoData msg; + int32 pid = 1234; + int queueId = 0; + + + /*state = palloc0(sizeof(worker_result_state)); + state->receive_functions = palloc(sizeof(FmgrInfo) * natts); + state->typioparams = palloc(sizeof(Oid) * natts); + + for (i = 0; i < natts; ++i) + { + Oid receive_function_id; + + getTypeBinaryInputInfo(tupdesc->attrs[i]->atttypid, + &receive_function_id, + &state->typioparams[i]); + fmgr_info(receive_function_id, &state->receive_functions[i]); + }*/ + + /* + * calculate next starting queue used for fetching tuples + */ + if(!shmScan->shmscan_inited) + { + shmScan->shmscan_inited = true; + Assert(shmScan->num_shm_queues > 0); + queueId = 0; + --shmScan->num_shm_queues; + } + else + queueId = shmScan->ss_cqueue; + + /* Initialize message buffer. */ + initStringInfo(&msg); + + /* Read and processes messages from the shared memory queues. */ + for(;;) + { + for (;;) + { + /* + * mark current queue used for fetching tuples, this is used + * to fetch consecutive tuples from queue used in previous + * fetch. + */ + shmScan->ss_cqueue = queueId; + + /* Get next message. */ + res = shm_mq_receive(responseq[queueId], &nbytes, &data, false); + if (res != SHM_MQ_SUCCESS) + break; + + /* + * Message-parsing routines operate on a null-terminated StringInfo, + * so we must construct one. + */ + resetStringInfo(&msg); + enlargeStringInfo(&msg, nbytes); + msg.len = nbytes; + memcpy(msg.data, data, nbytes); + msg.data[nbytes] = '\0'; + msgtype = pq_getmsgbyte(&msg); + + /* Dispatch on message type. */ + switch (msgtype) + { + case 'E': + case 'N': + { + ErrorData edata; + ErrorContextCallback context; + + /* Parse ErrorResponse or NoticeResponse. */ + pq_parse_errornotice(&msg, &edata); + + /* + * Limit the maximum error level to ERROR. 
We don't want + * a FATAL inside the backend worker to kill the user + * session. + */ + if (edata.elevel > ERROR) + edata.elevel = ERROR; + + /* + * Rethrow the error with an appropriate context method. + * On error, we need to ensure that master backend stop + * all other workers before propagating the error, so + * we need to pass the pid's of all workers, so that same + * can be done in error callback. + * XXX - For now, I am just sending some random number, this + * needs to be fixed. + */ + context.callback = worker_error_callback; + context.arg = (void *) &pid; + context.previous = error_context_stack; + error_context_stack = &context; + ThrowErrorData(&edata); + error_context_stack = context.previous; + + break; + } + case 'A': + { + /* Propagate NotifyResponse. */ + pq_putmessage(msg.data[0], &msg.data[1], nbytes - 1); + break; + } + case 'T': + { + int16 natts = pq_getmsgint(&msg, 2); + int16 i; + + if (resultState->has_row_description) + elog(ERROR, "multiple RowDescription messages"); + resultState->has_row_description = true; + if (natts != tupdesc->natts) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("worker result rowtype does not match " + "the specified FROM clause rowtype"))); + + for (i = 0; i < natts; ++i) + { + Oid type_id; + + (void) pq_getmsgstring(&msg); /* name */ + (void) pq_getmsgint(&msg, 4); /* table OID */ + (void) pq_getmsgint(&msg, 2); /* table attnum */ + type_id = pq_getmsgint(&msg, 4); /* type OID */ + (void) pq_getmsgint(&msg, 2); /* type length */ + (void) pq_getmsgint(&msg, 4); /* typmod */ + (void) pq_getmsgint(&msg, 2); /* format code */ + + if (type_id != tupdesc->attrs[i]->atttypid) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + + pq_getmsgend(&msg); + + break; + } + case 'D': + { + /* Handle DataRow message. 
*/ + HeapTuple result; + + result = form_result_tuple(resultState, tupdesc, &msg); + return result; + } + case 'C': + { + /* + * Handle CommandComplete message. Ignore tags sent by + * worker backend as we are anyway going to use tag of + * master backend for sending the same to client. + */ + (void) pq_getmsgstring(&msg); + break; + } + case 'G': + case 'H': + case 'W': + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY protocol not allowed in worker"))); + } + + case 'Z': + { + /* Handle ReadyForQuery message. */ + resultState->complete = true; + break; + } + default: + elog(WARNING, "unknown message type: %c (%zu bytes)", + msg.data[0], nbytes); + break; + } + } + + /* Check whether the connection was broken prematurely. */ + if (!resultState->complete) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("lost connection to worker process with PID %d", + pid))); + + /* + * if we have exhausted data from all worker queues, then terminate + * processing data from queues. + */ + if (shmScan->num_shm_queues <=0) + break; + else + { + ++queueId; + --shmScan->num_shm_queues; + resultState->has_row_description = false; + } + } + + return NULL; +} + +/* + * form_result_tuple - + * Parse a DataRow message and form a result tuple. + */ +static HeapTuple +form_result_tuple(worker_result resultState, TupleDesc tupdesc, + StringInfo msg) +{ + /* Handle DataRow message. 
*/ + int16 natts = pq_getmsgint(msg, 2); + int16 i; + Datum *values = NULL; + bool *isnull = NULL; + StringInfoData buf; + + if (!resultState->has_row_description) + elog(ERROR, "DataRow not preceded by RowDescription"); + if (natts != tupdesc->natts) + elog(ERROR, "malformed DataRow"); + if (natts > 0) + { + values = palloc(natts * sizeof(Datum)); + isnull = palloc(natts * sizeof(bool)); + } + initStringInfo(&buf); + + for (i = 0; i < natts; ++i) + { + int32 bytes = pq_getmsgint(msg, 4); + + if (bytes < 0) + { + values[i] = ReceiveFunctionCall(&resultState->receive_functions[i], + NULL, + resultState->typioparams[i], + tupdesc->attrs[i]->atttypmod); + isnull[i] = true; + } + else + { + resetStringInfo(&buf); + appendBinaryStringInfo(&buf, pq_getmsgbytes(msg, bytes), bytes); + values[i] = ReceiveFunctionCall(&resultState->receive_functions[i], + &buf, + resultState->typioparams[i], + tupdesc->attrs[i]->atttypmod); + isnull[i] = false; + } + } + + pq_getmsgend(msg); + + return heap_form_tuple(tupdesc, values, isnull); +} \ No newline at end of file diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 332f04a..f158583 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -714,6 +714,7 @@ ExplainPreScanNode(PlanState *planstate, Bitmapset **rels_used) switch (nodeTag(plan)) { case T_SeqScan: + case T_ParallelSeqScan: case T_IndexScan: case T_IndexOnlyScan: case T_BitmapHeapScan: @@ -910,6 +911,9 @@ ExplainNode(PlanState *planstate, List *ancestors, case T_SeqScan: pname = sname = "Seq Scan"; break; + case T_ParallelSeqScan: + pname = sname = "Parallel Seq Scan"; + break; case T_IndexScan: pname = sname = "Index Scan"; break; @@ -1059,6 +1063,7 @@ ExplainNode(PlanState *planstate, List *ancestors, switch (nodeTag(plan)) { case T_SeqScan: + case T_ParallelSeqScan: case T_BitmapHeapScan: case T_TidScan: case T_SubqueryScan: @@ -1325,6 +1330,16 @@ ExplainNode(PlanState *planstate, List *ancestors, 
show_instrumentation_count("Rows Removed by Filter", 1, planstate, es); break; + case T_ParallelSeqScan: + show_scan_qual(plan->qual, "Filter", planstate, ancestors, es); + if (plan->qual) + show_instrumentation_count("Rows Removed by Filter", 1, + planstate, es); + ExplainPropertyInteger("Number of Workers", + ((ParallelSeqScan *) plan)->num_workers, es); + ExplainPropertyInteger("Number of Blocks Per Workers", + ((ParallelSeqScan *) plan)->num_blocks_per_worker, es); + break; case T_FunctionScan: if (es->verbose) { @@ -2142,6 +2157,7 @@ ExplainTargetRel(Plan *plan, Index rti, ExplainState *es) switch (nodeTag(plan)) { case T_SeqScan: + case T_ParallelSeqScan: case T_IndexScan: case T_IndexOnlyScan: case T_BitmapHeapScan: diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index af707b0..9a8ca75 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -21,7 +21,7 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execJunk.o execMain.o \ nodeLimit.o nodeLockRows.o \ nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \ nodeNestloop.o nodeFunctionscan.o nodeRecursiveunion.o nodeResult.o \ - nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \ + nodeSeqscan.o nodeParallelSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \ nodeValuesscan.o nodeCtescan.o nodeWorktablescan.o \ nodeGroup.o nodeSubplan.o nodeSubqueryscan.o nodeTidscan.o \ nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o spi.o diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index e27c062..a28e74e 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -100,6 +100,7 @@ #include "executor/nodeMergejoin.h" #include "executor/nodeModifyTable.h" #include "executor/nodeNestloop.h" +#include "executor/nodeParallelSeqscan.h" #include "executor/nodeRecursiveunion.h" #include "executor/nodeResult.h" #include "executor/nodeSeqscan.h" @@ -190,6 +191,11 @@ ExecInitNode(Plan *node, 
EState *estate, int eflags) estate, eflags); break; + case T_ParallelSeqScan: + result = (PlanState *) ExecInitParallelSeqScan((ParallelSeqScan *) node, + estate, eflags); + break; + case T_IndexScan: result = (PlanState *) ExecInitIndexScan((IndexScan *) node, estate, eflags); @@ -406,6 +412,10 @@ ExecProcNode(PlanState *node) result = ExecSeqScan((SeqScanState *) node); break; + case T_ParallelSeqScanState: + result = ExecParallelSeqScan((ParallelSeqScanState *) node); + break; + case T_IndexScanState: result = ExecIndexScan((IndexScanState *) node); break; @@ -644,6 +654,10 @@ ExecEndNode(PlanState *node) ExecEndSeqScan((SeqScanState *) node); break; + case T_ParallelSeqScanState: + ExecEndParallelSeqScan((ParallelSeqScanState *) node); + break; + case T_IndexScanState: ExecEndIndexScan((IndexScanState *) node); break; diff --git a/src/backend/executor/nodeParallelSeqscan.c b/src/backend/executor/nodeParallelSeqscan.c new file mode 100644 index 0000000..3d651b5 --- /dev/null +++ b/src/backend/executor/nodeParallelSeqscan.c @@ -0,0 +1,441 @@ +/*------------------------------------------------------------------------- + * + * nodeParallelSeqscan.c + * Support routines for parallel sequential scans of relations. + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/executor/nodeParallelSeqscan.c + * + *------------------------------------------------------------------------- + */ +/* + * INTERFACE ROUTINES + * ExecParallelSeqScan sequentially scans a relation. + * ExecSeqNext retrieve next tuple in sequential order. + * ExecInitParallelSeqScan creates and initializes a parallel seqscan node. + * ExecEndParallelSeqScan releases any storage allocated. 
+ */ +#include "postgres.h" + +#include "access/relscan.h" +#include "access/shmmqam.h" +#include "commands/dbcommands.h" +#include "executor/execdebug.h" +#include "executor/nodeSeqscan.h" +#include "executor/nodeParallelSeqscan.h" +#include "postmaster/backendworker.h" +#include "utils/rel.h" + + + +/* ---------------------------------------------------------------- + * Scan Support + * ---------------------------------------------------------------- + */ + +/* ---------------------------------------------------------------- + * SeqNext + * + * This is a workhorse for ExecParallelSeqScan + * ---------------------------------------------------------------- + */ +static TupleTableSlot * +ParallelSeqNext(ParallelSeqScanState *node) +{ + HeapTuple tuple; + HeapScanDesc scandesc; + EState *estate; + ScanDirection direction; + TupleTableSlot *slot; + + /* + * get information from the estate and scan state + */ + scandesc = node->ss.ss_currentScanDesc; + estate = node->ss.ps.state; + direction = estate->es_direction; + slot = node->ss.ss_ScanTupleSlot; + + /* + * get the next tuple from the table based on result tuple descriptor. + */ + tuple = shm_getnext(node->pss_currentShmScanDesc, node->pss_workerResult, + node->responseq, + node->ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor); + + /* + * save the tuple and the buffer returned to us by the access methods in + * our scan tuple slot and return the slot. Note: we pass 'false' because + * tuples returned by heap_getnext() are pointers onto disk pages and were + * not created with palloc() and so should not be pfree()'d. Note also + * that ExecStoreTuple will increment the refcount of the buffer; the + * refcount will not be dropped until the tuple table slot is cleared. 
+ */ + if (tuple) + ExecStoreTuple(tuple, /* tuple to store */ + slot, /* slot to store in */ + scandesc->rs_cbuf, /* buffer associated with this + * tuple */ + false); /* don't pfree this pointer */ + else + ExecClearTuple(slot); + + return slot; +} + +/* + * ParallelSeqRecheck -- access method routine to recheck a tuple in EvalPlanQual + */ +static bool +ParallelSeqRecheck(SeqScanState *node, TupleTableSlot *slot) +{ + /* + * Note that unlike IndexScan, ParallelSeqScan never use keys in + * heap_beginscan (and this is very bad) - so, here we do not check + * are keys ok or not. + */ + return true; +} + +/* ---------------------------------------------------------------- + * InitParallelScanRelation + * + * Set up to access the scan relation. + * ---------------------------------------------------------------- + */ +static void +InitParallelScanRelation(SeqScanState *node, EState *estate, int eflags) +{ + Relation currentRelation; + HeapScanDesc currentScanDesc; + + /* + * get the relation object id from the relid'th entry in the range table, + * open that relation and acquire appropriate lock on it. 
+ */ + currentRelation = ExecOpenScanRelation(estate, + ((SeqScan *) node->ps.plan)->scanrelid, + eflags); + + /* initialize a heapscan */ + currentScanDesc = heap_beginscan(currentRelation, + estate->es_snapshot, + 0, + NULL); + + node->ss_currentRelation = currentRelation; + node->ss_currentScanDesc = currentScanDesc; + + /* and report the scan tuple slot's rowtype */ + ExecAssignScanType(node, RelationGetDescr(currentRelation)); +} + + +/* ---------------------------------------------------------------- + * ExecInitParallelSeqScan + * ---------------------------------------------------------------- + */ +ParallelSeqScanState * +ExecInitParallelSeqScan(ParallelSeqScan *node, EState *estate, int eflags) +{ + ParallelSeqScanState *parallelscanstate; + ShmScanDesc currentShmScanDesc; + worker_result workerResult; + + /* + * Once upon a time it was possible to have an outerPlan of a SeqScan, but + * not any more. + */ + Assert(outerPlan(node) == NULL); + Assert(innerPlan(node) == NULL); + + /* + * create state structure + */ + parallelscanstate = makeNode(ParallelSeqScanState); + parallelscanstate->ss.ps.plan = (Plan *) node; + parallelscanstate->ss.ps.state = estate; + + /* + * Miscellaneous initialization + * + * create expression context for node + */ + ExecAssignExprContext(estate, &parallelscanstate->ss.ps); + + /* + * initialize child expressions + */ + parallelscanstate->ss.ps.targetlist = (List *) + ExecInitExpr((Expr *) node->scan.plan.targetlist, + (PlanState *) parallelscanstate); + parallelscanstate->ss.ps.qual = (List *) + ExecInitExpr((Expr *) node->scan.plan.qual, + (PlanState *) parallelscanstate); + + /* + * tuple table initialization + */ + ExecInitResultTupleSlot(estate, &parallelscanstate->ss.ps); + ExecInitScanTupleSlot(estate, &parallelscanstate->ss); + + /* + * initialize scan relation + */ + InitParallelScanRelation(&parallelscanstate->ss, estate, eflags); + + parallelscanstate->ss.ps.ps_TupFromTlist = false; + + /* + * Initialize result tuple type and 
projection info. + */ + ExecAssignResultTypeFromTL(&parallelscanstate->ss.ps); + ExecAssignScanProjectionInfo(&parallelscanstate->ss); + + /* Initialize the workers required to perform parallel scan. */ + InitiateWorkers(parallelscanstate->ss.ss_currentRelation->rd_id, + node->scan.plan.targetlist, + &parallelscanstate->responseq, + &parallelscanstate->seg, + node->num_blocks_per_worker, + node->num_workers); + + + /* + * use result tuple descriptor to fetch data from shared memory queues + * as the worker backends would have put the data after projection. + * number of queues must be equal to number of worker backends. + */ + currentShmScanDesc = shm_beginscan(node->num_workers); + workerResult = ExecInitWorkerResult(parallelscanstate->ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor); + + parallelscanstate->pss_currentShmScanDesc = currentShmScanDesc; + parallelscanstate->pss_workerResult = workerResult; + + return parallelscanstate; +} + +/* ---------------------------------------------------------------- + * ExecParallelSeqScan(node) + * + * Scans the relation sequentially from multiple workers and returns + * the next qualifying tuple. + * We call the ExecScan() routine and pass it the appropriate + * access method functions. + * ---------------------------------------------------------------- + */ +TupleTableSlot * +ExecParallelSeqScan(ParallelSeqScanState *node) +{ + return ExecScan((ScanState *) &node->ss, + (ExecScanAccessMtd) ParallelSeqNext, + (ExecScanRecheckMtd) ParallelSeqRecheck); +} + +/* ---------------------------------------------------------------- + * ExecEndParallelSeqScan + * + * frees any storage allocated through C routines. 
+ * ---------------------------------------------------------------- + */ +void +ExecEndParallelSeqScan(ParallelSeqScanState *node) +{ + Relation relation; + HeapScanDesc scanDesc; + + /* + * get information from node + */ + relation = node->ss.ss_currentRelation; + scanDesc = node->ss.ss_currentScanDesc; + + /* + * Free the exprcontext + */ + ExecFreeExprContext(&node->ss.ps); + + /* + * clean out the tuple table + */ + ExecClearTuple(node->ss.ps.ps_ResultTupleSlot); + ExecClearTuple(node->ss.ss_ScanTupleSlot); + + /* + * close heap scan + */ + heap_endscan(scanDesc); + + /* + * close the heap relation. + */ + ExecCloseScanRelation(relation); + + /* detach from dynamic shared memory. */ + dsm_detach(node->seg); +} + +/* + * EstimateScanRelationIdSpace: + * Returns the size needed to store the ScanRelationId for the current query + */ +Size +EstimateScanRelationIdSpace(Oid relId) +{ + Size size; + + size = sizeof(relId); + + return size; +} + +/* + * SerializeScanRelationId: + * Dumps the relationId onto the memory location at start_address. + */ +void +SerializeScanRelationId(Oid relId, Size maxsize, char *start_address) +{ + memcpy(start_address, &relId, maxsize); +} + +/* + * RestoreScanRelationId: + * Reads the relationId from the specified address, restore it into given + * relationId. + */ +void +RestoreScanRelationId(Oid *relId, char *start_address) +{ + memcpy(relId, start_address, sizeof(Oid)); +} + +/* + * EstimateTargetListSpace: + * Returns the size needed to store the Targetlist for the current query + */ +Size +EstimateTargetListSpace(List *targetList) +{ + Size size; + + /* Add space reqd for saving the data size of the targetlist */ + size = sizeof(Size); + + size = add_size(size, + mul_size(targetList->length, sizeof(TargetEntry))); + + /* + * For now, lets just support for Var type of nodes. + * + * FIXME - we need to traverse target list and allocate depending + * on the node type. 
+ */ + /*size = add_size(size, mul_size(targetList->length, sizeof(Expr)));*/ + size = add_size(size, mul_size(targetList->length, sizeof(Var))); + + /* + * Account for column names, we could get exact length for each column + * name, however as NAMEDATALEN is not too big, this seems okay. + */ + size = add_size(size, mul_size(targetList->length, NAMEDATALEN)); + + return size; +} + +/* + * SerializeTargetList: + * Dumps the each target entry onto the memory location at start_address. + */ +void +SerializeTargetList(List *targetList, Size maxsize, char *start_address) +{ + Size targetListSize; + char *curptr; + ListCell *l; + TargetEntry *srctargetEntry; + TargetEntry *desttargetEntry; + + targetListSize = targetList->length; + + /* copy target list size */ + memcpy(start_address, &targetListSize, sizeof(targetListSize)); + curptr = start_address + sizeof(targetListSize); + maxsize -= sizeof(targetListSize); + + /* copy each target list entry */ + foreach(l, (List *) targetList) + { + maxsize -= sizeof(TargetEntry); + if (maxsize < 0) + elog(ERROR, "not enough space to serialize given target list"); + srctargetEntry = (TargetEntry *) lfirst(l); + + desttargetEntry = (TargetEntry *)curptr; + memcpy(desttargetEntry, srctargetEntry, sizeof(TargetEntry)); + + /* + * For now, lets just support for Var type of nodes. + * + * FIXME - we need to traverse target list and serialize depending + * on the node type. 
+ */ + desttargetEntry->expr = (Expr*) ((char*) desttargetEntry + sizeof(TargetEntry)); + memcpy(desttargetEntry->expr, srctargetEntry->expr, sizeof(Var)); + desttargetEntry->resname = + (char*) ((char*) desttargetEntry + sizeof(TargetEntry) + sizeof(Var)); + memcpy(desttargetEntry->resname, (char*) srctargetEntry->resname, + strlen(srctargetEntry->resname)+1); + + curptr += sizeof(TargetEntry); + curptr += sizeof(Var); + curptr += sizeof(NAMEDATALEN); + } +} + +/* + * RestoreTargetList: + * Reads the targetlist from the specified address, restore it into given + * targetlist. + */ +void +RestoreTargetList(List **targetList, char *start_address) +{ + Size targetListSize; + char *curptr; + char *colname; + TargetEntry *srctargetEntry; + TargetEntry *desttargetEntry; + + memcpy(&targetListSize, start_address, sizeof(targetListSize)); + curptr = start_address + sizeof(targetListSize); + + while (targetListSize-- > 0) + { + desttargetEntry = makeNode(TargetEntry); + srctargetEntry = (TargetEntry *)curptr; + memcpy(desttargetEntry, srctargetEntry, sizeof(TargetEntry)); + + desttargetEntry->expr = (Expr*) copyObject((Expr*)((char*)srctargetEntry + sizeof(TargetEntry))); + + /* + * For now, lets just support for Var type of nodes. + * + * FIXME - we need to traverse target list and restore depending + * on the node type. + */ + colname = (char*)((char*)srctargetEntry + sizeof(TargetEntry) + sizeof(Var)); + + desttargetEntry->resname = colname ? pstrdup(colname) : (char*) NULL; + + *targetList = lappend(*targetList, desttargetEntry); + + curptr += sizeof(TargetEntry); + curptr += sizeof(Var); + curptr += sizeof(NAMEDATALEN); + } +} diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 53cfda5..131cfc5 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -139,6 +139,22 @@ InitScanRelation(SeqScanState *node, EState *estate, int eflags) 0, NULL); + /* + * set the scan limits, if requested by plan. 
If the end block + * is not specified, then scan all the blocks till end. + */ + if (((SeqScan *) node->ps.plan)->startblock != InvalidBlockNumber && + ((SeqScan *) node->ps.plan)->endblock != InvalidBlockNumber) + heap_setscanlimits(currentScanDesc, + ((SeqScan *) node->ps.plan)->startblock, + (((SeqScan *) node->ps.plan)->endblock - + ((SeqScan *) node->ps.plan)->startblock)); + else if (((SeqScan *) node->ps.plan)->startblock != InvalidBlockNumber) + heap_setscanlimits(currentScanDesc, + ((SeqScan *) node->ps.plan)->startblock, + (currentScanDesc->rs_nblocks - + ((SeqScan *) node->ps.plan)->startblock)); + node->ss_currentRelation = currentRelation; node->ss_currentScanDesc = currentScanDesc; diff --git a/src/backend/optimizer/path/Makefile b/src/backend/optimizer/path/Makefile index 6864a62..6e462b1 100644 --- a/src/backend/optimizer/path/Makefile +++ b/src/backend/optimizer/path/Makefile @@ -13,6 +13,6 @@ top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global OBJS = allpaths.o clausesel.o costsize.o equivclass.o indxpath.o \ - joinpath.o joinrels.o pathkeys.o tidpath.o + joinpath.o joinrels.o pathkeys.o parallelpath.o tidpath.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index c97355e..3a0583a 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -410,6 +410,9 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte) /* Consider sequential scan */ add_path(rel, create_seqscan_path(root, rel, required_outer)); + /* Consider parallel scans */ + create_parallelscan_paths(root, rel); + /* Consider index scans */ create_index_paths(root, rel); diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 659daa2..0296323 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -106,6 +106,8 @@ int effective_cache_size 
= DEFAULT_EFFECTIVE_CACHE_SIZE; Cost disable_cost = 1.0e10; +int parallel_seqscan_degree = 0; + bool enable_seqscan = true; bool enable_indexscan = true; bool enable_indexonlyscan = true; @@ -219,6 +221,63 @@ cost_seqscan(Path *path, PlannerInfo *root, } /* + * cost_parallelseqscan + * Determines and returns the cost of scanning a relation parallely. + * + * 'baserel' is the relation to be scanned + * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL + */ +void +cost_parallelseqscan(ParallelSeqPath *path, PlannerInfo *root, + RelOptInfo *baserel, ParamPathInfo *param_info, int nWorkers) +{ + Cost startup_cost = 0; + Cost run_cost = 0; + double spc_seq_page_cost; + QualCost qpqual_cost; + Cost cpu_per_tuple; + + /* Should only be applied to base relations */ + Assert(baserel->relid > 0); + Assert(baserel->rtekind == RTE_RELATION); + + /* Mark the path with the correct row estimate */ + if (param_info) + path->path.rows = param_info->ppi_rows; + else + path->path.rows = baserel->rows; + + if (!enable_seqscan) + startup_cost += disable_cost; + + /* fetch estimated page cost for tablespace containing table */ + get_tablespace_page_costs(baserel->reltablespace, + NULL, + &spc_seq_page_cost); + + /* + * disk costs + */ + run_cost += spc_seq_page_cost * baserel->pages; + + /* CPU costs */ + get_restriction_qual_cost(root, baserel, param_info, &qpqual_cost); + + startup_cost += qpqual_cost.startup; + cpu_per_tuple = cpu_tuple_cost + qpqual_cost.per_tuple; + run_cost += cpu_per_tuple * baserel->tuples; + + /* + * We simply assume that cost will be equally shared by parallel + * workers which might not be true especially for doing disk access. + * XXX - We would like to change these values based on some concrete + * tests. + */ + path->path.startup_cost = startup_cost / nWorkers; + path->path.total_cost = (startup_cost + run_cost) / nWorkers; +} + +/* * cost_index * Determines and returns the cost of scanning a relation using an index. 
* diff --git a/src/backend/optimizer/path/parallelpath.c b/src/backend/optimizer/path/parallelpath.c new file mode 100644 index 0000000..823abbe --- /dev/null +++ b/src/backend/optimizer/path/parallelpath.c @@ -0,0 +1,97 @@ +/*------------------------------------------------------------------------- + * + * parallelpath.c + * Routines to determine which conditions are usable for scanning + * a given relation, and create ParallelPaths accordingly. + * + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/optimizer/path/parallelpath.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "optimizer/cost.h" +#include "optimizer/pathnode.h" +#include "optimizer/paths.h" + + +/* + * IsTargetListContainNonVars - + * Check if target list contains non-var entries. + */ +static bool +IsTargetListContainNonVars(List *targetlist) +{ + ListCell *l; + + foreach(l, targetlist) + { + TargetEntry *te = (TargetEntry *) lfirst(l); + + if (!IsA(te, TargetEntry)) + continue; /* probably should never happen */ + if (!IsA(te->expr, Var)) + return true; + } + return false; +} + +/* + * create_parallelscan_paths + * Create paths corresponding to parallel scans of the given rel. + * Currently we only support parallel sequential scan. + * + * Candidate paths are added to the rel's pathlist (using add_path). + */ +void +create_parallelscan_paths(PlannerInfo *root, RelOptInfo *rel) +{ + int num_parallel_workers = 0; + + /* + * parallel scan is possible only if user has set + * parallel_seqscan_degree to value greater than 0. + */ + if (parallel_seqscan_degree <= 0) + return; + /* + * parallel scan is not supported for joins or queries containing quals. + * + * XXX - There is no reason for not to support quals, so it should be + * supported in future versions of this patch. 
+ */ + if (root->simple_rel_array_size > 2 || rel->baserestrictinfo != NULL) + return; + + /* parallel scan is supported only for SELECT statements. */ + if (root->parse->commandType != CMD_SELECT) + return; + + /* + * parallel scan is not supported for non-var target list. + * + * XXX - This is to keep the implementation simple, we can do this + * in future. Here we are checking by passing root->parse->targetList + * instead of rel->reltargetlist because rel->reltargetlist always contains + * Vars (refer build_base_rel_tlists). + */ + if(IsTargetListContainNonVars(root->parse->targetList)) + return; + + /* + * There should be at least one page to scan for each worker. + */ + if (parallel_seqscan_degree <= rel->pages) + num_parallel_workers = parallel_seqscan_degree; + else + num_parallel_workers = rel->pages; + + add_path(rel, (Path *) create_parallelseqscan_path(root, rel, + num_parallel_workers)); +} diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index bf8dbe0..6b54e1b 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -57,6 +57,9 @@ static Material *create_material_plan(PlannerInfo *root, MaterialPath *best_path static Plan *create_unique_plan(PlannerInfo *root, UniquePath *best_path); static SeqScan *create_seqscan_plan(PlannerInfo *root, Path *best_path, List *tlist, List *scan_clauses); +static Scan *create_parallelseqscan_plan(PlannerInfo *root, + ParallelSeqPath *best_path, + List *tlist, List *scan_clauses); static Scan *create_indexscan_plan(PlannerInfo *root, IndexPath *best_path, List *tlist, List *scan_clauses, bool indexonly); static BitmapHeapScan *create_bitmap_scan_plan(PlannerInfo *root, @@ -99,6 +102,9 @@ static List *order_qual_clauses(PlannerInfo *root, List *clauses); static void copy_path_costsize(Plan *dest, Path *src); static void copy_plan_costsize(Plan *dest, Plan *src); static SeqScan *make_seqscan(List *qptlist, List *qpqual, Index
scanrelid); +static ParallelSeqScan *make_parallelseqscan(List *qptlist, List *qpqual, + Index scanrelid, int nworkers, + BlockNumber nblocksperworker); static IndexScan *make_indexscan(List *qptlist, List *qpqual, Index scanrelid, Oid indexid, List *indexqual, List *indexqualorig, List *indexorderby, List *indexorderbyorig, @@ -227,6 +233,7 @@ create_plan_recurse(PlannerInfo *root, Path *best_path) switch (best_path->pathtype) { case T_SeqScan: + case T_ParallelSeqScan: case T_IndexScan: case T_IndexOnlyScan: case T_BitmapHeapScan: @@ -342,6 +349,13 @@ create_scan_plan(PlannerInfo *root, Path *best_path) scan_clauses); break; + case T_ParallelSeqScan: + plan = (Plan *) create_parallelseqscan_plan(root, + (ParallelSeqPath *) best_path, + tlist, + scan_clauses); + break; + case T_IndexScan: plan = (Plan *) create_indexscan_plan(root, (IndexPath *) best_path, @@ -1132,6 +1146,71 @@ create_seqscan_plan(PlannerInfo *root, Path *best_path, } /* + * create_worker_seqscan_plan + * Returns a seqscan plan for the base relation scanned by worker + * with restriction clauses 'scan_clauses' and targetlist 'tlist'. + */ +SeqScan * +create_worker_seqscan_plan(List *targetList, List *scan_clauses, + BlockNumber startBlock, BlockNumber endBlock) +{ + SeqScan *scan_plan; + + /* + * Pass scan_relid as 1, this is okay for now as sequence scan worker + * is allowed to operate on just one relation. + * XXX - we should ideally get scanrelid from master backend. + */ + scan_plan = make_seqscan(targetList, + scan_clauses, + 1); + + scan_plan->startblock = startBlock; + scan_plan->endblock = endBlock; + return scan_plan; +} + +/* + * create_parallelseqscan_plan + * Returns a seqscan plan for the base relation scanned by 'best_path' + * with restriction clauses 'scan_clauses' and targetlist 'tlist'. 
+ */ +static Scan * +create_parallelseqscan_plan(PlannerInfo *root, ParallelSeqPath *best_path, + List *tlist, List *scan_clauses) +{ + Scan *scan_plan; + Index scan_relid = best_path->path.parent->relid; + + /* it should be a base rel... */ + Assert(scan_relid > 0); + Assert(best_path->path.parent->rtekind == RTE_RELATION); + + /* Sort clauses into best execution order */ + scan_clauses = order_qual_clauses(root, scan_clauses); + + /* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */ + scan_clauses = extract_actual_clauses(scan_clauses, false); + + /* Replace any outer-relation variables with nestloop params */ + if (best_path->path.param_info) + { + scan_clauses = (List *) + replace_nestloop_params(root, (Node *) scan_clauses); + } + + scan_plan = (Scan *) make_parallelseqscan(tlist, + scan_clauses, + scan_relid, + best_path->num_workers, + best_path->num_blocks_per_worker); + + copy_path_costsize(&scan_plan->plan, &best_path->path); + + return scan_plan; +} + +/* * create_indexscan_plan * Returns an indexscan plan for the base relation scanned by 'best_path' * with restriction clauses 'scan_clauses' and targetlist 'tlist'. 
@@ -3313,6 +3392,30 @@ make_seqscan(List *qptlist, plan->lefttree = NULL; plan->righttree = NULL; node->scanrelid = scanrelid; + node->startblock = InvalidBlockNumber; + node->endblock = InvalidBlockNumber; + + return node; +} + +static ParallelSeqScan * +make_parallelseqscan(List *qptlist, + List *qpqual, + Index scanrelid, + int nworkers, + BlockNumber nblocksperworker) +{ + ParallelSeqScan *node = makeNode(ParallelSeqScan); + Plan *plan = &node->scan.plan; + + /* cost should be inserted by caller */ + plan->targetlist = qptlist; + plan->qual = qpqual; + plan->lefttree = NULL; + plan->righttree = NULL; + node->scan.scanrelid = scanrelid; + node->num_workers = nworkers; + node->num_blocks_per_worker = nblocksperworker; return node; } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index fb74d6b..49359e3 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -260,6 +260,55 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) return result; } +/* + * create_worker_seqscan_plannedstmt + * Returns a planned statement to be used by worker for execution. + * Ideally, master backend should form worker's planned statement + * and pass the same to worker, however for now master backend + * just passes the required information and PlannedStmt is then + * constructed by worker. 
+ */ +PlannedStmt * +create_worker_seqscan_plannedstmt(worker_stmt *workerstmt) +{ + AclMode required_access = ACL_SELECT; + RangeTblEntry *rte; + SeqScan *scan_plan; + PlannedStmt *result; + + rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = workerstmt->relId; + rte->relkind = 'r'; + rte->requiredPerms = required_access; + + scan_plan = create_worker_seqscan_plan(workerstmt->targetList, NIL, + workerstmt->startBlock, + workerstmt->endBlock); + + /* build the PlannedStmt result */ + result = makeNode(PlannedStmt); + + result->commandType = CMD_SELECT; + result->queryId = 0; + result->hasReturning = 0; + result->hasModifyingCTE = 0; + result->canSetTag = 1; + result->transientPlan = 0; + result->planTree = (Plan*) scan_plan; + result->rtable = list_make1(rte); + result->resultRelations = NIL; + result->utilityStmt = NULL; + result->subplans = NIL; + result->rewindPlanIDs = NULL; + result->rowMarks = NIL; + result->relationOids = lappend_oid(result->relationOids, rte->relid);; + result->invalItems = NIL; + result->nParamExec = 0; + result->hasRowSecurity = false; + + return result; +} /*-------------------- * subquery_planner diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index e630d0b..220b92b 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -436,6 +436,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) switch (nodeTag(plan)) { case T_SeqScan: + case T_ParallelSeqScan: { SeqScan *splan = (SeqScan *) plan; diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 319e8b2..ce3df40 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -706,6 +706,37 @@ create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer) } /* + * create_parallelseqscan_path + * Creates a path corresponding to a parallel sequential scan, returning the + * pathnode. 
+ */ +ParallelSeqPath * +create_parallelseqscan_path(PlannerInfo *root, RelOptInfo *rel, int nWorkers) +{ + ParallelSeqPath *pathnode = makeNode(ParallelSeqPath); + + pathnode->path.pathtype = T_ParallelSeqScan; + pathnode->path.parent = rel; + pathnode->path.param_info = get_baserel_parampathinfo(root, rel, + false); + pathnode->path.pathkeys = NIL; /* seqscan has unordered result */ + + pathnode->num_workers = nWorkers; + /* + * Divide the work equally among all the workers, for cases + * where division is not equal (example if there are total + * 10 blocks and 3 workers, then as per below calculation each + * worker will scan 3 blocks), last worker will be responsible for + * scanning remaining blocks (refer exec_worker_message). + */ + pathnode->num_blocks_per_worker = rel->pages / nWorkers; + + cost_parallelseqscan(pathnode, root, rel, pathnode->path.param_info, nWorkers); + + return pathnode; +} + +/* * create_index_path * Creates a path node for an index scan. * diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile index 71c2321..f056bd5 100644 --- a/src/backend/postmaster/Makefile +++ b/src/backend/postmaster/Makefile @@ -12,7 +12,8 @@ subdir = src/backend/postmaster top_builddir = ../../.. 
include $(top_builddir)/src/Makefile.global -OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o fork_process.o \ - pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o +OBJS = autovacuum.o backendworker.o bgworker.o bgwriter.o checkpointer.o \ + fork_process.o pgarch.o pgstat.o postmaster.o startup.o syslogger.o \ + walwriter.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/postmaster/backendworker.c b/src/backend/postmaster/backendworker.c new file mode 100644 index 0000000..3b796dd --- /dev/null +++ b/src/backend/postmaster/backendworker.c @@ -0,0 +1,579 @@ +/*------------------------------------------------------------------------- + * + * backendworker.c + * Support routines for setting up backend workers. + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/postmaster/backendworker.c + * + *------------------------------------------------------------------------- + */ +/* + * INTERFACE ROUTINES + * InitiateWorkers Setup dynamic shared memory and parallel backend workers. + */ +#include "postgres.h" + +#include "commands/dbcommands.h" +#include "commands/async.h" +#include "executor/nodeParallelSeqscan.h" +#include "miscadmin.h" +#include "nodes/parsenodes.h" +#include "postmaster/backendworker.h" +#include "storage/ipc.h" +#include "storage/procsignal.h" +#include "storage/procarray.h" +#include "storage/shm_toc.h" +#include "storage/spin.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/memutils.h" +#include "utils/resowner.h" + + +#define SHM_PARALLEL_SCAN_QUEUE_SIZE 65536 + +/* + * This structure is stored in the dynamic shared memory segment. We use + * it to determine whether all workers started up OK and successfully + * attached to their respective shared message queues. 
+ */ +typedef struct +{ + slock_t mutex; + int workers_total; + int workers_attached; + int workers_ready; +} shm_mq_header; + +/* Fixed-size data passed via our dynamic shared memory segment. */ +typedef struct worker_fixed_data +{ + Oid database_id; + Oid authenticated_user_id; + Oid current_user_id; + int sec_context; + NameData database; + NameData authenticated_user; +} worker_fixed_data; + +/* Private state maintained by the launching backend for IPC. */ +typedef struct worker_info +{ + pid_t pid; + Oid current_user_id; + dsm_segment *seg; + BackgroundWorkerHandle *handle; + shm_mq_handle *responseq; + bool consumed; +} worker_info; + +typedef struct +{ + int nworkers; + BackgroundWorkerHandle *handle[FLEXIBLE_ARRAY_MEMBER]; +} worker_state; + + +/* Table-of-contents constants for our dynamic shared memory segment. */ +#define PG_WORKER_MAGIC 0x50674267 +#define PG_WORKER_KEY_HDR_DATA 0 +#define PG_WORKER_KEY_FIXED_DATA 1 +#define PG_WORKER_KEY_RELID 2 +#define PG_WORKER_KEY_TARGETLIST 3 +#define PG_WORKER_KEY_BLOCKS 4 +#define PG_WORKER_FIXED_NKEYS 5 + +void +exec_worker_message(Datum) __attribute__((noreturn)); + +static void +setup_dynamic_shared_memory(Oid relId, List *targetList, + shm_mq_handle ***responseq, + dsm_segment **segp, shm_mq_header **hdrp, + BlockNumber numBlocksPerWorker, int nWorkers); +static worker_state *setup_backend_workers(dsm_segment *seg, int nworkers); +static void cleanup_background_workers(dsm_segment *seg, Datum arg); +static void +wait_for_workers_to_become_ready(worker_state *wstate, + volatile shm_mq_header *hdr); +static bool check_worker_status(worker_state *wstate); +static void bkworker_sigterm_handler(SIGNAL_ARGS); + + +/* + * InitiateWorkers + * It sets up the required infrastructure for backend workers to + * perform execution and return results to the main backend. 
+ */ +void +InitiateWorkers(Oid relId, List *targetList, shm_mq_handle ***responseqp, + dsm_segment **segp, BlockNumber numBlocksPerWorker, + int nWorkers) +{ + shm_mq_header *hdr; + worker_state *wstate; + int i; + + /* Create dynamic shared memory and table of contents. */ + setup_dynamic_shared_memory(relId, targetList, responseqp, + segp, &hdr, numBlocksPerWorker, nWorkers); + + /* Register backend workers. */ + wstate = setup_backend_workers(*segp, nWorkers); + + for (i = 0; i < nWorkers; ++i) + shm_mq_set_handle((*responseqp)[i], wstate->handle[i]); + + /* Wait for workers to become ready. */ + wait_for_workers_to_become_ready(wstate, hdr); +} + +/* + * Set up a dynamic shared memory segment. + * + * We set up a small control region that contains only a shm_mq_header, + * plus one region per message queue. There are as many message queues as + * the number of workers. + */ +static void +setup_dynamic_shared_memory(Oid relId, List *targetList, + shm_mq_handle ***responseqp, + dsm_segment **segp, shm_mq_header **hdrp, + BlockNumber numBlocksPerWorker, int nWorkers) +{ + Size segsize, relid_len, targetlist_len; + dsm_segment *seg; + shm_toc_estimator e; + shm_toc *toc; + worker_fixed_data *fdata; + char *relidp; + char *targetlistdata; + int i; + shm_mq *mq; + shm_mq_header *hdr; + BlockNumber *num_blocks_per_worker; + + /* Allocate memory for shared memory queue handles. */ + *responseqp = (shm_mq_handle**) palloc(nWorkers * sizeof(shm_mq_handle*)); + + /* Create dynamic shared memory and table of contents. 
*/ + shm_toc_initialize_estimator(&e); + + shm_toc_estimate_chunk(&e, sizeof(shm_mq_header)); + + shm_toc_estimate_chunk(&e, sizeof(worker_fixed_data)); + + relid_len = EstimateScanRelationIdSpace(relId); + shm_toc_estimate_chunk(&e, relid_len); + + targetlist_len = EstimateTargetListSpace(targetList); + shm_toc_estimate_chunk(&e, targetlist_len); + + shm_toc_estimate_chunk(&e, sizeof(BlockNumber)); + + for (i = 0; i < nWorkers; ++i) + shm_toc_estimate_chunk(&e, (Size) SHM_PARALLEL_SCAN_QUEUE_SIZE); + + shm_toc_estimate_keys(&e, PG_WORKER_FIXED_NKEYS + nWorkers); + + segsize = shm_toc_estimate(&e); + + seg = dsm_create(segsize); + toc = shm_toc_create(PG_WORKER_MAGIC, dsm_segment_address(seg), + segsize); + + /* Set up the header region. */ + hdr = shm_toc_allocate(toc, sizeof(shm_mq_header)); + SpinLockInit(&hdr->mutex); + hdr->workers_total = nWorkers; + hdr->workers_attached = 0; + hdr->workers_ready = 0; + shm_toc_insert(toc, PG_WORKER_KEY_HDR_DATA, hdr); + + /* Store fixed-size data in dynamic shared memory. */ + fdata = shm_toc_allocate(toc, sizeof(worker_fixed_data)); + fdata->database_id = MyDatabaseId; + fdata->authenticated_user_id = GetAuthenticatedUserId(); + GetUserIdAndSecContext(&fdata->current_user_id, &fdata->sec_context); + namestrcpy(&fdata->database, get_database_name(MyDatabaseId)); + namestrcpy(&fdata->authenticated_user, + GetUserNameFromId(fdata->authenticated_user_id)); + shm_toc_insert(toc, PG_WORKER_KEY_FIXED_DATA, fdata); + + /* Store scan relation id in dynamic shared memory. */ + relidp = shm_toc_allocate(toc, relid_len + 1); + SerializeScanRelationId(relId, relid_len, relidp); + shm_toc_insert(toc, PG_WORKER_KEY_RELID, relidp); + + /* Store target list in dynamic shared memory. 
*/ + targetlistdata = shm_toc_allocate(toc, targetlist_len); + SerializeTargetList(targetList, targetlist_len, targetlistdata); + shm_toc_insert(toc, PG_WORKER_KEY_TARGETLIST, targetlistdata); + + /* Store blocks to be scanned by each worker in dynamic shared memory. */ + num_blocks_per_worker = shm_toc_allocate(toc, sizeof(BlockNumber)); + *num_blocks_per_worker = numBlocksPerWorker; + shm_toc_insert(toc, PG_WORKER_KEY_BLOCKS, num_blocks_per_worker); + + /* Establish one message queue per worker in dynamic shared memory. */ + for (i = 1; i <= nWorkers; ++i) + { + mq = shm_mq_create(shm_toc_allocate(toc, (Size) SHM_PARALLEL_SCAN_QUEUE_SIZE), + (Size) SHM_PARALLEL_SCAN_QUEUE_SIZE); + shm_toc_insert(toc, PG_WORKER_FIXED_NKEYS + i, mq); + shm_mq_set_receiver(mq, MyProc); + + /* + * Attach the queue before launching a worker, so that we'll automatically + * detach the queue if we error out. (Otherwise, the worker might sit + * there trying to write the queue long after we've gone away.) + */ + (*responseqp)[i-1] = shm_mq_attach(mq, seg, NULL); + } + + /* Return results to caller. */ + *segp = seg; + *hdrp = hdr; +} + +/* + * Register backend workers. + */ +static worker_state * +setup_backend_workers(dsm_segment *seg, int nWorkers) +{ + MemoryContext oldcontext; + BackgroundWorker worker; + worker_state *wstate; + int i; + + /* + * We need the worker_state object and the background worker handles to + * which it points to be allocated in CurTransactionContext rather than + * ExprContext; otherwise, they'll be destroyed before the on_dsm_detach + * hooks run. + */ + oldcontext = MemoryContextSwitchTo(CurTransactionContext); + + /* Create worker state object. 
*/ + wstate = MemoryContextAlloc(TopTransactionContext, + offsetof(worker_state, handle) + + sizeof(BackgroundWorkerHandle *) * nWorkers); + wstate->nworkers = 0; + + /* + * Arrange to kill all the workers if we abort before or after all workers + * are finished hooking themselves up to the dynamic shared memory segment. + * + * XXX - For killing workers, we need to have mechanism with which it can be + * done before aborting the transaction. + */ + + on_dsm_detach(seg, cleanup_background_workers, + PointerGetDatum(wstate)); + + /* Configure a worker. */ + worker.bgw_flags = + BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + worker.bgw_restart_time = BGW_NEVER_RESTART; + worker.bgw_main = exec_worker_message; + snprintf(worker.bgw_name, BGW_MAXLEN, "backend_worker"); + worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg)); + /* set bgw_notify_pid, so we can detect if the worker stops */ + worker.bgw_notify_pid = MyProcPid; + + /* Register the workers. */ + for (i = 0; i < nWorkers; ++i) + { + if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i])) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not register background process"), + errhint("You may need to increase max_worker_processes."))); + ++wstate->nworkers; + } + + /* All done. */ + MemoryContextSwitchTo(oldcontext); + return wstate; +} + +static void +wait_for_workers_to_become_ready(worker_state *wstate, + volatile shm_mq_header *hdr) +{ + bool save_set_latch_on_sigusr1; + bool result = false; + + save_set_latch_on_sigusr1 = set_latch_on_sigusr1; + set_latch_on_sigusr1 = true; + + PG_TRY(); + { + for (;;) + { + int workers_ready; + + /* If all the workers are ready, we have succeeded. 
*/ + SpinLockAcquire(&hdr->mutex); + workers_ready = hdr->workers_ready; + SpinLockRelease(&hdr->mutex); + if (workers_ready >= wstate->nworkers) + { + result = true; + break; + } + + /* If any workers (or the postmaster) have died, we have failed. */ + if (!check_worker_status(wstate)) + { + result = false; + break; + } + + /* Wait to be signalled. */ + WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0); + + /* An interrupt may have occurred while we were waiting. */ + CHECK_FOR_INTERRUPTS(); + + /* Reset the latch so we don't spin. */ + ResetLatch(&MyProc->procLatch); + } + } + PG_CATCH(); + { + set_latch_on_sigusr1 = save_set_latch_on_sigusr1; + PG_RE_THROW(); + } + PG_END_TRY(); + + if (!result) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("one or more background workers failed to start"))); +} + +static bool +check_worker_status(worker_state *wstate) +{ + int n; + + /* If any workers (or the postmaster) have died, we have failed. */ + for (n = 0; n < wstate->nworkers; ++n) + { + BgwHandleStatus status; + pid_t pid; + + status = GetBackgroundWorkerPid(wstate->handle[n], &pid); + /*if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED)*/ + /* + * XXX - Do we need to consider BGWH_STOPPED status, if directly return + * false for BGWH_STOPPED, it could very well be possble that worker has + * exited after completing the work in which case the caller of this won't + * wait for other worker's status and main backend will lead to error + * whereas everything is normal for such a case. + */ + if (status == BGWH_POSTMASTER_DIED) + return false; + } + + /* Otherwise, things still look OK. 
*/ + return true; +} + +static void +cleanup_background_workers(dsm_segment *seg, Datum arg) +{ + worker_state *wstate = (worker_state *) arg; + + while (wstate->nworkers > 0) + { + --wstate->nworkers; + TerminateBackgroundWorker(wstate->handle[wstate->nworkers]); + } +} + + +/* + * exec_worker_message + * + * Main entry point (bgw_main) of a backend worker process + */ +void +exec_worker_message(Datum main_arg) +{ + dsm_segment *seg; + shm_toc *toc; + worker_fixed_data *fdata; + char *relidp; + char *targetlistdata; + BlockNumber *num_blocks_per_worker; + BlockNumber start_block; + BlockNumber end_block; + shm_mq *mq; + shm_mq_handle *responseq; + int myworkernumber; + volatile shm_mq_header *hdr; + Oid relId; + List *targetList = NIL; + PGPROC *registrant; + worker_stmt *workerstmt; + + /* Establish signal handlers. */ + pqsignal(SIGTERM, bkworker_sigterm_handler); + BackgroundWorkerUnblockSignals(); + + /* Set up a memory context and resource owner. */ + Assert(CurrentResourceOwner == NULL); + CurrentResourceOwner = ResourceOwnerCreate(NULL, "backend_worker"); + CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext, + "backend worker", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + /*while(1) + { + }*/ + + /* Connect to the dynamic shared memory segment. */ + seg = dsm_attach(DatumGetInt32(main_arg)); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("unable to map dynamic shared memory segment"))); + toc = shm_toc_attach(PG_WORKER_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("bad magic number in dynamic shared memory segment"))); + + /* Find data structures in dynamic shared memory.
*/ + hdr = shm_toc_lookup(toc, PG_WORKER_KEY_HDR_DATA); + fdata = shm_toc_lookup(toc, PG_WORKER_KEY_FIXED_DATA); + relidp = shm_toc_lookup(toc, PG_WORKER_KEY_RELID); + targetlistdata = shm_toc_lookup(toc, PG_WORKER_KEY_TARGETLIST); + num_blocks_per_worker = shm_toc_lookup(toc, PG_WORKER_KEY_BLOCKS); + + /* + * Acquire a worker number. + * + * Our worker number gives our identity: there may be just one + * worker involved in this parallel operation, or there may be many. + */ + SpinLockAcquire(&hdr->mutex); + myworkernumber = ++hdr->workers_attached; + SpinLockRelease(&hdr->mutex); + if (myworkernumber > hdr->workers_total) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("too many message queue testing workers already"))); + + mq = shm_toc_lookup(toc, PG_WORKER_FIXED_NKEYS + myworkernumber); + shm_mq_set_sender(mq, MyProc); + responseq = shm_mq_attach(mq, seg, NULL); + + end_block = myworkernumber * (*num_blocks_per_worker); + start_block = end_block - (*num_blocks_per_worker); + + /* + * Indicate that we're fully initialized and ready to begin the main part + * of the parallel operation. + * + * Once we signal that we're ready, the user backend is entitled to assume + * that our on_dsm_detach callbacks will fire before we disconnect from + * the shared memory segment and exit. Generally, that means we must have + * attached to all relevant dynamic shared memory data structures by now. + */ + SpinLockAcquire(&hdr->mutex); + ++hdr->workers_ready; + SpinLockRelease(&hdr->mutex); + registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid); + if (registrant == NULL) + { + elog(DEBUG1, "registrant backend has exited prematurely"); + proc_exit(1); + } + SetLatch(&registrant->procLatch); + + + /* Redirect protocol messages to responseq.
*/ + pq_redirect_to_shm_mq(mq, responseq); + + /* + * Initialize our user and database ID based on the strings version of + * the data, and then go back and check that we actually got the database + * and user ID that we intended to get. We do this because it's not + * impossible for the process that started us to die before we get here, + * and the user or database could be renamed in the meantime. We don't + * want to latch on the wrong object by accident. There should probably + * be a variant of BackgroundWorkerInitializeConnection that accepts OIDs + * rather than strings. + */ + BackgroundWorkerInitializeConnection(NameStr(fdata->database), + NameStr(fdata->authenticated_user)); + if (fdata->database_id != MyDatabaseId || + fdata->authenticated_user_id != GetAuthenticatedUserId()) + ereport(ERROR, + (errmsg("user or database renamed during backend worker startup"))); + + /* Restore RelationId and TargetList from main backend. */ + RestoreScanRelationId(&relId, relidp); + RestoreTargetList(&targetList, targetlistdata); + + /* Handle local_preload_libraries and session_preload_libraries. */ + process_session_preload_libraries(); + + /* Restore user ID and security context. */ + SetUserIdAndSecContext(fdata->current_user_id, fdata->sec_context); + + workerstmt = palloc(sizeof(worker_stmt)); + + workerstmt->relId = relId; + workerstmt->targetList = targetList; + workerstmt->startBlock = start_block; + + /* last worker should scan all the remaining blocks. */ + if (myworkernumber == hdr->workers_total) + workerstmt->endBlock = InvalidBlockNumber; + else + workerstmt->endBlock = end_block; + + /* Execute the worker command. */ + exec_worker_stmt(workerstmt); + + ProcessCompletedNotifies(); + + /* Signal that we are done. */ + ReadyForQuery(DestRemote); + + proc_exit(1); +} + +/* + * When we receive a SIGTERM, we set InterruptPending and ProcDiePending just + * like a normal backend. The next CHECK_FOR_INTERRUPTS() will do the right + * thing. 
+ */ +static void +bkworker_sigterm_handler(SIGNAL_ARGS) +{ + int save_errno = errno; + + if (MyProc) + SetLatch(&MyProc->procLatch); + + if (!proc_exit_inprogress) + { + InterruptPending = true; + ProcDiePending = true; + } + + errno = save_errno; +} \ No newline at end of file diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 6220a8e..11db15e 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -99,6 +99,7 @@ #include "miscadmin.h" #include "pg_getopt.h" #include "pgstat.h" +#include "optimizer/cost.h" #include "postmaster/autovacuum.h" #include "postmaster/bgworker_internals.h" #include "postmaster/fork_process.h" @@ -830,6 +831,12 @@ PostmasterMain(int argc, char *argv[]) ereport(ERROR, (errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\", \"hot_standby\", or \"logical\""))); + if (parallel_seqscan_degree >= MaxConnections) + { + write_stderr("%s: parallel_scan_degree must be less than max_connections\n", progname); + ExitPostmaster(1); + } + /* * Other one-time internal sanity checks can go here, if they are fast. * (Put any slow processing further down, after postmaster.pid creation.) diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index cc62b2c..7de5e0e 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -55,6 +55,7 @@ #include "pg_getopt.h" #include "postmaster/autovacuum.h" #include "postmaster/postmaster.h" +#include "postmaster/backendworker.h" #include "replication/slot.h" #include "replication/walsender.h" #include "rewrite/rewriteHandler.h" @@ -1132,6 +1133,105 @@ exec_simple_query(const char *query_string) } /* + * execute_worker_stmt + * + * Execute the plan for backend worker. 
+ */ +void +exec_worker_stmt(worker_stmt *workerstmt) +{ + Portal portal; + int16 format = 1; + DestReceiver *receiver; + bool isTopLevel = true; + PlannedStmt *planned_stmt; + MemoryContext oldcontext; + MemoryContext plancontext; + + set_ps_display("SELECT", false); + BeginCommand("SELECT", DestNone); + + /* Make sure we are in a transaction command */ + start_xact_command(); + + /* + * Unlike exec_simple_query(), in backend worker we won't allow + * transaction control statements, so we can allow plancontext + * to be created in TopTransaction context. + */ + plancontext = AllocSetContextCreate(CurrentMemoryContext, + "worker plan", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + oldcontext = MemoryContextSwitchTo(plancontext); + + planned_stmt = create_worker_seqscan_plannedstmt(workerstmt); + /* + * Create unnamed portal to run the query or queries in. If there + * already is one, silently drop it. + */ + portal = CreatePortal("", true, true); + /* Don't display the portal in pg_cursors */ + portal->visible = false; + + /* + * We don't have to copy anything into the portal, because everything + * we are passing here is in MessageContext, which will outlive the + * portal anyway. + */ + PortalDefineQuery(portal, + NULL, + "", + "", + list_make1(planned_stmt), + NULL); + + /* + * Start the portal. No parameters here. + */ + PortalStart(portal, NULL, 0, InvalidSnapshot); + + /* We always use binary format, for efficiency. */ + PortalSetResultFormat(portal, 1, &format); + + receiver = CreateDestReceiver(DestRemote); + SetRemoteDestReceiverParams(receiver, portal); + + /* + * Only once the portal and destreceiver have been established can + * we return to the transaction context. All that stuff needs to + * survive an internal commit inside PortalRun! + */ + MemoryContextSwitchTo(oldcontext); + + /* + * Run the portal to completion, and then drop it (and the receiver). 
+ */ + (void) PortalRun(portal, + FETCH_ALL, + isTopLevel, + receiver, + receiver, + NULL); + + (*receiver->rDestroy) (receiver); + + PortalDrop(portal, false); + + finish_xact_command(); + + /* + * Send appropriate CommandComplete to client. There is no + * need to send completion tag from worker as that won't be + * of any use considering the completion tag of master backend + * will be used for sending to client. + */ + EndCommand("", DestRemote); +} + +/* * exec_parse_message * * Execute a "Parse" protocol message. diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 23cbe90..69de3b8 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -619,6 +619,8 @@ const char *const config_group_names[] = gettext_noop("Statistics / Query and Index Statistics Collector"), /* AUTOVACUUM */ gettext_noop("Autovacuum"), + /* PARALLEL_QUERY */ + gettext_noop("parallel_seqscan_degree"), /* CLIENT_CONN */ gettext_noop("Client Connection Defaults"), /* CLIENT_CONN_STATEMENT */ @@ -2425,6 +2427,16 @@ static struct config_int ConfigureNamesInt[] = }, { + {"parallel_seqscan_degree", PGC_SUSET, PARALLEL_QUERY, + gettext_noop("Sets the maximum number of simultaneously running backend worker processes."), + NULL + }, + &parallel_seqscan_degree, + 0, 0, MAX_BACKENDS, + NULL, NULL, NULL + }, + + { {"autovacuum_work_mem", PGC_SIGHUP, RESOURCES_MEM, gettext_noop("Sets the maximum memory to be used by each autovacuum worker process."), NULL, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 4a89cb7..3a6b037 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -494,6 +494,11 @@ # autovacuum, -1 means use # vacuum_cost_limit +#------------------------------------------------------------------------------ +# PARALLEL_QUERY PARAMETERS +#------------------------------------------------------------------------------ +
+#parallel_seqscan_degree = 0 # max number of worker backend subprocesses #------------------------------------------------------------------------------ # CLIENT CONNECTION DEFAULTS diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index f2c7ca1..f88ef2e 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -20,7 +20,6 @@ #include "access/itup.h" #include "access/tupdesc.h" - typedef struct HeapScanDescData { /* scan parameters */ @@ -105,4 +104,13 @@ typedef struct SysScanDescData Snapshot snapshot; /* snapshot to unregister at end of scan */ } SysScanDescData; +/* struct for scanning shared memory queues */ +typedef struct ShmScanDescData +{ + /* scan current state */ + int num_shm_queues; /* number of shared memory queues used in scan. */ + int ss_cqueue; /* current queue # in scan, if any */ + bool shmscan_inited; /* false = scan not init'd yet */ +} ShmScanDescData; + #endif /* RELSCAN_H */ diff --git a/src/include/access/shmmqam.h b/src/include/access/shmmqam.h new file mode 100644 index 0000000..aa444bc --- /dev/null +++ b/src/include/access/shmmqam.h @@ -0,0 +1,39 @@ +/*------------------------------------------------------------------------- + * + * shmmqam.h + * POSTGRES shared memory queue access method definitions. + * + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/shmmqam.h + * + *------------------------------------------------------------------------- + */ +#ifndef SHMMQAM_H +#define SHMMQAM_H + +#include "access/relscan.h" +#include "libpq/pqmq.h" + + +/* Private state maintained across calls to shm_getnext. 
*/ +typedef struct worker_result_state +{ + FmgrInfo *receive_functions; + Oid *typioparams; + bool has_row_description; + bool complete; +} worker_result_state; + +typedef struct worker_result_state *worker_result; + +typedef struct ShmScanDescData *ShmScanDesc; + +extern worker_result ExecInitWorkerResult(TupleDesc tupdesc); +extern ShmScanDesc shm_beginscan(int num_queues); +extern HeapTuple shm_getnext(ShmScanDesc shmScan, worker_result resultState, + shm_mq_handle **responseq, TupleDesc tupdesc); + +#endif /* SHMMQAM_H */ diff --git a/src/include/executor/nodeParallelSeqscan.h b/src/include/executor/nodeParallelSeqscan.h new file mode 100644 index 0000000..b638a24 --- /dev/null +++ b/src/include/executor/nodeParallelSeqscan.h @@ -0,0 +1,33 @@ +/*------------------------------------------------------------------------- + * + * nodeParallelSeqscan.h + * + * + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/executor/nodeParallelSeqscan.h + * + *------------------------------------------------------------------------- + */ +#ifndef NODEPARALLELSEQSCAN_H +#define NODEPARALLELSEQSCAN_H + +#include "nodes/execnodes.h" + +extern ParallelSeqScanState *ExecInitParallelSeqScan(ParallelSeqScan *node, EState *estate, int eflags); +extern TupleTableSlot *ExecParallelSeqScan(ParallelSeqScanState *node); +extern void ExecEndParallelSeqScan(ParallelSeqScanState *node); + +extern Size EstimateScanRelationIdSpace(Oid relId); +extern void SerializeScanRelationId(Oid relId, Size maxsize, + char *start_address); +extern void RestoreScanRelationId(Oid *relId, char *start_address); + +extern Size EstimateTargetListSpace(List *targetList); +extern void SerializeTargetList(List *targetList, Size maxsize, + char *start_address); +extern void RestoreTargetList(List **targetList, char *start_address); + +#endif /* NODEPARALLELSEQSCAN_H */ diff --git
a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 41b13b2..19ec043 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -16,9 +16,11 @@ #include "access/genam.h" #include "access/heapam.h" +#include "access/shmmqam.h" #include "executor/instrument.h" #include "nodes/params.h" #include "nodes/plannodes.h" +#include "storage/shm_mq.h" #include "utils/reltrigger.h" #include "utils/sortsupport.h" #include "utils/tuplestore.h" @@ -1212,6 +1214,23 @@ typedef struct ScanState typedef ScanState SeqScanState; /* + * ParallelScanState extends ScanState by storing additional information + * related to parallel workers. + * dsm_segment dynamic shared memory segment to setup worker queues + * responseq shared memory queues to receive data from workers + */ +typedef struct ParallelScanState +{ + ScanState ss; /* its first field is NodeTag */ + dsm_segment *seg; + shm_mq_handle **responseq; + ShmScanDesc pss_currentShmScanDesc; + worker_result pss_workerResult; +} ParallelScanState; + +typedef ParallelScanState ParallelSeqScanState; + +/* * These structs store information about index quals that don't have simple * constant right-hand sides. See comments for ExecIndexBuildScanKeys() * for discussion. 
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index bc71fea..c48df6c 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -51,6 +51,7 @@ typedef enum NodeTag T_BitmapOr, T_Scan, T_SeqScan, + T_ParallelSeqScan, T_IndexScan, T_IndexOnlyScan, T_BitmapIndexScan, @@ -97,6 +98,7 @@ typedef enum NodeTag T_BitmapOrState, T_ScanState, T_SeqScanState, + T_ParallelSeqScanState, T_IndexScanState, T_IndexOnlyScanState, T_BitmapIndexScanState, @@ -217,6 +219,7 @@ typedef enum NodeTag T_IndexOptInfo, T_ParamPathInfo, T_Path, + T_ParallelSeqPath, T_IndexPath, T_BitmapHeapPath, T_BitmapAndPath, diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 3e4f815..54efdc1 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -23,6 +23,7 @@ #include "nodes/bitmapset.h" #include "nodes/primnodes.h" #include "nodes/value.h" +#include "storage/block.h" #include "utils/lockwaitpolicy.h" /* Possible sources of a Query */ @@ -156,6 +157,14 @@ typedef struct Query * depends on to be semantically valid */ } Query; +/* worker statement required for execution. 
*/ +typedef struct worker_stmt +{ + Oid relId; + List *targetList; + BlockNumber startBlock; + BlockNumber endBlock; +} worker_stmt; /**************************************************************************** * Supporting data structures for Parse Trees diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 7f9eaf0..0375ce1 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -18,6 +18,7 @@ #include "lib/stringinfo.h" #include "nodes/bitmapset.h" #include "nodes/primnodes.h" +#include "storage/block.h" #include "utils/lockwaitpolicy.h" @@ -269,6 +270,8 @@ typedef struct Scan { Plan plan; Index scanrelid; /* relid is index into the range table */ + BlockNumber startblock; /* block to start seq scan */ + BlockNumber endblock; /* block up to which scan has to be done */ } Scan; /* ---------------- @@ -278,6 +281,17 @@ typedef struct Scan typedef Scan SeqScan; /* ---------------- + * parallel sequential scan node + * ---------------- + */ +typedef struct ParallelSeqScan +{ + Scan scan; + int num_workers; + BlockNumber num_blocks_per_worker; +} ParallelSeqScan; + +/* ---------------- * index scan node * * indexqualorig is an implicitly-ANDed list of index qual expressions, each diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 810b9c8..3a38270 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -737,6 +737,13 @@ typedef struct Path /* pathkeys is a List of PathKey nodes; see above */ } Path; +typedef struct ParallelSeqPath +{ + Path path; + int num_workers; + BlockNumber num_blocks_per_worker; +} ParallelSeqPath; + /* Macro for extracting a path's parameterization relids; beware double eval */ #define PATH_REQ_OUTER(path) \ ((path)->param_info ?
(path)->param_info->ppi_req_outer : (Relids) NULL) diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 75e2afb..a738c54 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -50,6 +50,7 @@ extern PGDLLIMPORT double cpu_index_tuple_cost; extern PGDLLIMPORT double cpu_operator_cost; extern PGDLLIMPORT int effective_cache_size; extern Cost disable_cost; +extern int parallel_seqscan_degree; extern bool enable_seqscan; extern bool enable_indexscan; extern bool enable_indexonlyscan; @@ -68,6 +69,8 @@ extern double index_pages_fetched(double tuples_fetched, BlockNumber pages, double index_pages, PlannerInfo *root); extern void cost_seqscan(Path *path, PlannerInfo *root, RelOptInfo *baserel, ParamPathInfo *param_info); +extern void cost_parallelseqscan(ParallelSeqPath *path, PlannerInfo *root, + RelOptInfo *baserel, ParamPathInfo *param_info, int nWorkers); extern void cost_index(IndexPath *path, PlannerInfo *root, double loop_count); extern void cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel, diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 26b17f5..901c792 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -32,6 +32,8 @@ extern bool add_path_precheck(RelOptInfo *parent_rel, extern Path *create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer); +extern ParallelSeqPath *create_parallelseqscan_path(PlannerInfo *root, + RelOptInfo *rel, int nWorkers); extern IndexPath *create_index_path(PlannerInfo *root, IndexOptInfo *index, List *indexclauses, diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h index afa5f9b..d2a2760 100644 --- a/src/include/optimizer/paths.h +++ b/src/include/optimizer/paths.h @@ -46,6 +46,13 @@ extern void debug_print_rel(PlannerInfo *root, RelOptInfo *rel); #endif /* + * parallelpath.c + * routines to generate parallel scan paths + */ + +extern void 
create_parallelscan_paths(PlannerInfo *root, RelOptInfo *rel); + +/* * indxpath.c * routines to generate index paths */ diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h index 3fdc2cb..b382a27 100644 --- a/src/include/optimizer/planmain.h +++ b/src/include/optimizer/planmain.h @@ -41,6 +41,9 @@ extern Plan *optimize_minmax_aggregates(PlannerInfo *root, List *tlist, * prototypes for plan/createplan.c */ extern Plan *create_plan(PlannerInfo *root, Path *best_path); +extern SeqScan * +create_worker_seqscan_plan(List *targetList, List *scan_clauses, + BlockNumber startBlock, BlockNumber endBlock); extern SubqueryScan *make_subqueryscan(List *qptlist, List *qpqual, Index scanrelid, Plan *subplan); extern ForeignScan *make_foreignscan(List *qptlist, List *qpqual, diff --git a/src/include/optimizer/planner.h b/src/include/optimizer/planner.h index 1e942c5..752bd16 100644 --- a/src/include/optimizer/planner.h +++ b/src/include/optimizer/planner.h @@ -14,6 +14,7 @@ #ifndef PLANNER_H #define PLANNER_H +#include "nodes/parsenodes.h" #include "nodes/plannodes.h" #include "nodes/relation.h" @@ -29,6 +30,8 @@ extern PlannedStmt *planner(Query *parse, int cursorOptions, ParamListInfo boundParams); extern PlannedStmt *standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams); +extern PlannedStmt * +create_worker_seqscan_plannedstmt(worker_stmt *workerstmt); extern Plan *subquery_planner(PlannerGlobal *glob, Query *parse, PlannerInfo *parent_root, diff --git a/src/include/postmaster/backendworker.h b/src/include/postmaster/backendworker.h new file mode 100644 index 0000000..68f2023 --- /dev/null +++ b/src/include/postmaster/backendworker.h @@ -0,0 +1,29 @@ +/*-------------------------------------------------------------------- + * backendworker.h + * POSTGRES backend workers interface + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California 
+ * + * IDENTIFICATION + * src/include/postmaster/backendworker.h + *-------------------------------------------------------------------- + */ +#ifndef BACKENDWORKER_H +#define BACKENDWORKER_H + +/*--------------------------------------------------------------------- + * External module API. + *--------------------------------------------------------------------- + */ + +#include "libpq/pqmq.h" + +extern int parallel_seqscan_degree; +extern void InitiateWorkers(Oid relId, List *targetList, + shm_mq_handle ***responseqp, + dsm_segment **segp, + BlockNumber numBlocksPerWorker, + int nWorkers); + +#endif /* BACKENDWORKER_H */ diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h index 60f7532..6087b5e 100644 --- a/src/include/tcop/tcopprot.h +++ b/src/include/tcop/tcopprot.h @@ -83,5 +83,6 @@ extern void set_debug_options(int debug_flag, extern bool set_plan_disabling_options(const char *arg, GucContext context, GucSource source); extern const char *get_stats_option_name(const char *arg); +extern void exec_worker_stmt(worker_stmt *workerstmt); #endif /* TCOPPROT_H */ diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h index 47ff880..532d2db 100644 --- a/src/include/utils/guc_tables.h +++ b/src/include/utils/guc_tables.h @@ -85,6 +85,7 @@ enum config_group STATS_MONITORING, STATS_COLLECTOR, AUTOVACUUM, + PARALLEL_QUERY, CLIENT_CONN, CLIENT_CONN_STATEMENT, CLIENT_CONN_LOCALE,