diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c index 25a81e5..3e3aded 100644 --- a/src/backend/access/common/toast_internals.c +++ b/src/backend/access/common/toast_internals.c @@ -17,6 +17,7 @@ #include "access/genam.h" #include "access/heapam.h" #include "access/heaptoast.h" +#include "access/parallel.h" #include "access/table.h" #include "access/toast_internals.h" #include "access/xact.h" @@ -116,7 +117,7 @@ toast_save_datum(Relation rel, Datum value, TupleDesc toasttupDesc; Datum t_values[3]; bool t_isnull[3]; - CommandId mycid = GetCurrentCommandId(true); + CommandId mycid = GetCurrentCommandId(!IsParallelWorker()); struct varlena *result; struct varatt_external toast_pointer; union diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 1585861..94c8507 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2049,11 +2049,6 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, * inserts in general except for the cases where inserts generate a new * CommandId (eg. inserts into a table having a foreign key column). */ - if (IsParallelWorker()) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot insert tuples in a parallel worker"))); - tup->t_data->t_infomask &= ~(HEAP_XACT_MASK); tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK); tup->t_data->t_infomask |= HEAP_XMAX_INVALID; diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index dcaea71..f661e8c 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -822,19 +822,14 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, isdead = false; break; case HEAPTUPLE_INSERT_IN_PROGRESS: - /* * Since we hold exclusive lock on the relation, normally the * only way to see this is if it was inserted earlier in our * own transaction. 
However, it can happen in system * catalogs, since we tend to release write lock before commit - * there. Give a warning if neither case applies; but in any - * case we had better copy it. + * there. In any case we had better copy it. */ - if (!is_system_catalog && - !TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(tuple->t_data))) - elog(WARNING, "concurrent insert in progress within table \"%s\"", - RelationGetRelationName(OldHeap)); + /* treat as live */ isdead = false; break; @@ -1434,16 +1429,11 @@ heapam_index_build_range_scan(Relation heapRelation, * the only way to see this is if it was inserted earlier * in our own transaction. However, it can happen in * system catalogs, since we tend to release write lock - * before commit there. Give a warning if neither case - * applies. + * before commit there. */ xwait = HeapTupleHeaderGetXmin(heapTuple->t_data); if (!TransactionIdIsCurrentTransactionId(xwait)) { - if (!is_system_catalog) - elog(WARNING, "concurrent insert in progress within table \"%s\"", - RelationGetRelationName(heapRelation)); - /* * If we are performing uniqueness checks, indexing * such a tuple could lead to a bogus uniqueness diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index a4944fa..9d3f100 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -53,13 +53,6 @@ GetNewTransactionId(bool isSubXact) TransactionId xid; /* - * Workers synchronize transaction state at the beginning of each parallel - * operation, so we can't account for new XIDs after that point. - */ - if (IsInParallelMode()) - elog(ERROR, "cannot assign TransactionIds during a parallel operation"); - - /* * During bootstrap initialization, we return the special bootstrap * transaction id. 
*/ diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index af6afce..1442ec0 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -517,6 +517,37 @@ GetCurrentFullTransactionIdIfAny(void) } /* + * AssignFullTransactionIdForWorker + * + * Set the current transaction state's full transaction id to the given one; for parallel operations this is the leader's id, to be used by the workers. + */ +void +AssignFullTransactionIdForWorker(FullTransactionId fullTransactionId) +{ + TransactionState s = CurrentTransactionState; + + Assert((IsInParallelMode() || IsParallelWorker())); /* only valid in parallel mode or in a parallel worker */ + s->fullTransactionId = fullTransactionId; /* plain overwrite of the current state; nothing is allocated here */ +} + +/* + * AssignCommandIdForWorker + * + * Set the current command id to the given one; for parallel operations this is the leader's command id, to be used by the workers. A true used argument sets the command-id-used flag; false leaves the flag as it is. + */ +void +AssignCommandIdForWorker(CommandId commandId, bool used) +{ + Assert((IsInParallelMode() || IsParallelWorker())); /* only valid in parallel mode or in a parallel worker */ + + /* this is global to a transaction, not subtransaction-local */ + if (used) + currentCommandIdUsed = true; + + currentCommandId = commandId; +} + +/* * MarkCurrentTransactionIdLoggedIfAny * * Remember that the current xid - if it is assigned - now has been wal logged. @@ -577,13 +608,6 @@ AssignTransactionId(TransactionState s) Assert(s->state == TRANS_INPROGRESS); /* - * Workers synchronize transaction state at the beginning of each parallel - * operation, so we can't account for new XIDs at this point. - */ - if (IsInParallelMode() || IsParallelWorker()) - elog(ERROR, "cannot assign XIDs during a parallel operation"); - - /* * Ensure parent(s) have XIDs, so that a child always has an XID later * than its parent.
Mustn't recurse here, or we might get a stack * overflow if we're at the bottom of a huge stack of subtransactions none diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 2e27e26..3fac2c2 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -173,7 +173,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) * against performing unsafe operations in parallel mode, but this gives a * more user-friendly error message. */ - if ((XactReadOnly || IsInParallelMode()) && + if ((XactReadOnly || (IsInParallelMode() && queryDesc->plannedstmt->commandType != CMD_INSERT)) && !(eflags & EXEC_FLAG_EXPLAIN_ONLY)) ExecCheckXactReadOnly(queryDesc->plannedstmt); @@ -235,7 +235,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) case CMD_INSERT: case CMD_DELETE: case CMD_UPDATE: - estate->es_output_cid = GetCurrentCommandId(true); + estate->es_output_cid = GetCurrentCommandId(!IsParallelWorker()); break; default: diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 382e78f..88eb286 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -23,6 +23,7 @@ #include "postgres.h" +#include "access/xact.h" #include "executor/execParallel.h" #include "executor/executor.h" #include "executor/nodeAgg.h" @@ -77,6 +78,8 @@ typedef struct FixedParallelExecutorState dsa_pointer param_exec; int eflags; int jit_flags; + FullTransactionId full_transaction_id; + CommandId command_id; } FixedParallelExecutorState; /* @@ -173,18 +176,20 @@ ExecSerializePlan(Plan *plan, EState *estate) * PlannedStmt to start the executor. */ pstmt = makeNode(PlannedStmt); - pstmt->commandType = CMD_SELECT; + Assert(estate->es_plannedstmt->commandType == CMD_SELECT || + estate->es_plannedstmt->commandType == CMD_INSERT); + pstmt->commandType = (plan->type == T_ModifyTable) ? 
CMD_INSERT : CMD_SELECT; pstmt->queryId = UINT64CONST(0); - pstmt->hasReturning = false; - pstmt->hasModifyingCTE = false; + pstmt->hasReturning = estate->es_plannedstmt->hasReturning; + pstmt->hasModifyingCTE = estate->es_plannedstmt->hasModifyingCTE; pstmt->canSetTag = true; pstmt->transientPlan = false; pstmt->dependsOnRole = false; pstmt->parallelModeNeeded = false; pstmt->planTree = plan; pstmt->rtable = estate->es_range_table; - pstmt->resultRelations = NIL; - pstmt->rootResultRelations = NIL; + pstmt->resultRelations = estate->es_plannedstmt->resultRelations; + pstmt->rootResultRelations = estate->es_plannedstmt->rootResultRelations; pstmt->appendRelations = NIL; /* @@ -730,6 +735,16 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, fpes->param_exec = InvalidDsaPointer; fpes->eflags = estate->es_top_eflags; fpes->jit_flags = estate->es_jit_flags; + if (pcxt->nworkers > 0 && planstate->type == T_ModifyTableState) + { + fpes->full_transaction_id = GetCurrentFullTransactionId(); + fpes->command_id = GetCurrentCommandId(true); + } + else + { + fpes->full_transaction_id = InvalidFullTransactionId; + fpes->command_id = InvalidCommandId; + } shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes); /* Store query string */ @@ -1392,6 +1407,15 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Get fixed-size state. */ fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false); + if (FullTransactionIdIsValid(fpes->full_transaction_id)) + { + AssignFullTransactionIdForWorker(fpes->full_transaction_id); + } + if (fpes->command_id != InvalidCommandId) + { + AssignCommandIdForWorker(fpes->command_id, true); + } + /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. 
*/ receiver = ExecParallelGetReceiver(seg, toc); instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true); diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index a01b46a..b107944 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -60,6 +60,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags) GatherState *gatherstate; Plan *outerNode; TupleDesc tupDesc; + Index varno; /* Gather node doesn't have innerPlan node. */ Assert(innerPlan(node) == NULL); @@ -104,7 +105,9 @@ ExecInitGather(Gather *node, EState *estate, int eflags) * Initialize result type and projection. */ ExecInitResultTypeTL(&gatherstate->ps); - ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR); + varno = (outerNode->type == T_ModifyTable && ((ModifyTable *)outerNode)->returningLists != NULL) ? + ((ModifyTableState *)outerPlanState(gatherstate))->resultRelInfo->ri_RangeTableIndex : OUTER_VAR; + ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, varno); /* * Without projections result slot type is not trivially known, see diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index b399592..7e90df5 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -3903,6 +3903,36 @@ compute_parallel_worker(RelOptInfo *rel, double heap_pages, double index_pages, } /* + * Compute the number of parallel workers that should be used to insert into + * a relation: rel->rel_parallel_workers if it is not -1, else "max_workers". + * + * "max_workers" is the caller's limit on the number of workers (typically + * from a GUC); the returned count never exceeds it. + */ +int +compute_parallel_insert_worker(RelOptInfo *rel, int max_workers) +{ + int parallel_workers = 0; + + /* + * If the user has set the parallel_workers reloption, use that; otherwise + * select a default number of workers.
+ */ + if (rel->rel_parallel_workers != -1) /* NOTE(review): grouping_planner passes current_rel here, not the insert target; confirm this field is meaningful for it */ + parallel_workers = rel->rel_parallel_workers; + else + { + /* TODO: smarter estimate of the best number of workers for insert; for now use the caller's maximum */ + parallel_workers = max_workers; + } + + /* In no case use more than caller supplied maximum number of workers */ + parallel_workers = Min(parallel_workers, max_workers); + + return parallel_workers; +} + +/* * generate_partitionwise_join_paths * Create paths representing partitionwise join for given partitioned * join relation. diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index cd3716d..2a6c27a 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -179,6 +179,7 @@ static void set_rel_width(PlannerInfo *root, RelOptInfo *rel); static double relation_byte_size(double tuples, int width); static double page_size(double tuples, int width); static double get_parallel_divisor(Path *path); +static double get_modifytable_parallel_divisor(ModifyTablePath *path); /* @@ -202,6 +203,62 @@ clamp_row_est(double nrows) } +void +cost_modifytable(ModifyTablePath *path) +{ + double total_size; + double total_rows; + ListCell *lc; + + /* + * Compute cost & rowcount as sum of subpath costs & rowcounts; startup cost is taken from the first subpath only. total_size accumulates width * rows for the average-width computation at the end. + */ + path->path.startup_cost = 0; + path->path.total_cost = 0; + path->path.rows = 0; + total_size = 0; + total_rows = 0; + foreach(lc, path->subpaths) + { + Path *subpath = (Path *) lfirst(lc); + + if (lc == list_head(path->subpaths)) /* first node? */ + path->path.startup_cost = subpath->startup_cost; + path->path.total_cost += subpath->total_cost; + total_rows += subpath->rows; + total_size += subpath->pathtarget->width * subpath->rows; + } + + /* Adjust costing for parallelism, if used. */ + if (path->path.parallel_workers > 0) + { + double parallel_divisor = get_modifytable_parallel_divisor(path); + + /* The total cost is divided among all the workers.
*/ + path->path.total_cost /= parallel_divisor; + + /* + * In the case of a parallel plan, the row count needs to represent + * the number of tuples processed per worker. NOTE(review): the parallel-insert caller builds this path over partial paths; confirm the subpath rows and costs summed above are not already per-worker values (else this divides twice). + */ + path->path.rows = clamp_row_est(total_rows / parallel_divisor); + } + else + { + path->path.rows = total_rows; + } + + /* + * Set width to the average width of the subpath outputs. XXX this is + * totally wrong: we should report zero if no RETURNING, else an average + * of the RETURNING tlist widths. But it's what happened historically, + * and improving it is a task for another day. + */ + if (total_rows > 0) + total_size /= total_rows; + path->path.pathtarget->width = rint(total_size); +} + /* * cost_seqscan * Determines and returns the cost of scanning a relation sequentially. @@ -5737,6 +5794,29 @@ get_parallel_divisor(Path *path) } /* + * Divisor for ModifyTable (currently only Parallel Insert): the number of + * workers budgeted for the path, plus a leader share of 1.0 - 0.3 per worker + * (when positive) if parallel_leader_participation is on and the path has returningLists. + * TODO: Needs revising based on further experience. + */ +static double +get_modifytable_parallel_divisor(ModifyTablePath *path) +{ + double parallel_divisor = path->path.parallel_workers; + + if (parallel_leader_participation && path->returningLists != NIL) /* NOTE(review): leader share is counted only with RETURNING; confirm this is intended */ + { + double leader_contribution; + + leader_contribution = 1.0 - (0.3 * path->path.parallel_workers); + if (leader_contribution > 0) + parallel_divisor += leader_contribution; + } + + return parallel_divisor; +} + +/* * compute_bitmap_pages * * compute number of pages fetched from heap in bitmap heap scan. diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 99278ee..cb9a6ea25 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -339,7 +339,7 @@ create_plan(PlannerInfo *root, Path *best_path) * top-level tlist seen at execution time. However, ModifyTable plan * nodes don't have a tlist matching the querytree targetlist.
*/ - if (!IsA(plan, ModifyTable)) + if (!IsA(plan, ModifyTable) && !(IsA(plan, Gather) && IsA(plan->lefttree, ModifyTable))) apply_tlist_labeling(plan->targetlist, root->processed_tlist); /* diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 8007e20..982db80 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -337,7 +337,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, */ if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 && IsUnderPostmaster && - parse->commandType == CMD_SELECT && + (parse->commandType == CMD_SELECT || parse->commandType == CMD_INSERT) && !parse->hasModifyingCTE && max_parallel_workers_per_gather > 0 && !IsParallelWorker()) @@ -371,6 +371,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, * parallel-unsafe, or else the query planner itself has a bug. */ glob->parallelModeNeeded = glob->parallelModeOK && + (parse->commandType == CMD_SELECT) && (force_parallel_mode != FORCE_PARALLEL_OFF); /* Determine what fraction of the plan is likely to be scanned */ @@ -425,7 +426,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, * Optionally add a Gather node for testing purposes, provided this is * actually a safe thing to do. 
*/ - if (force_parallel_mode != FORCE_PARALLEL_OFF && top_plan->parallel_safe) + if (force_parallel_mode != FORCE_PARALLEL_OFF && parse->commandType == CMD_SELECT && top_plan->parallel_safe) { Gather *gather = makeNode(Gather); @@ -1798,7 +1799,8 @@ inheritance_planner(PlannerInfo *root) returningLists, rowMarks, NULL, - assign_special_exec_param(root))); + assign_special_exec_param(root), + 0)); } /*-------------------- @@ -1846,6 +1848,7 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, RelOptInfo *final_rel; FinalPathExtraData extra; ListCell *lc; + int parallel_insert_partial_path_count = 0; /* Tweak caller-supplied tuple_fraction if have LIMIT/OFFSET */ if (parse->limitCount || parse->limitOffset) @@ -2382,13 +2385,97 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, returningLists, rowMarks, parse->onConflict, - assign_special_exec_param(root)); + assign_special_exec_param(root), + 0); } /* And shove it into final_rel */ add_path(final_rel, path); } + /* Consider Parallel INSERT */ + if (parse->commandType == CMD_INSERT && + !inheritance_update && + final_rel->consider_parallel && + parse->rowMarks == NIL) + { + Index rootRelation; + List *withCheckOptionLists; + List *returningLists; + int parallelInsertWorkers; + + parallelInsertWorkers = compute_parallel_insert_worker(current_rel, max_parallel_workers_per_gather); + + /* + * Generate partial paths for the final_rel. Insert all surviving paths, with + * Limit, and/or ModifyTable steps added if needed. + */ + foreach(lc, current_rel->partial_pathlist) + { + Path *path = (Path *) lfirst(lc); + + /* + * If there is a LIMIT/OFFSET clause, add the LIMIT node. + */ + if (limit_needed(parse)) + { + path = (Path *) create_limit_path(root, final_rel, path, + parse->limitOffset, + parse->limitCount, + parse->limitOption, + offset_est, count_est); + } + + /* + * Add the ModifyTable node. 
+ */ + + /* + * If target is a partition root table, we need to mark the + * ModifyTable node appropriately for that. + */ + if (rt_fetch(parse->resultRelation, parse->rtable)->relkind == + RELKIND_PARTITIONED_TABLE) + rootRelation = parse->resultRelation; + else + rootRelation = 0; + + /* + * Set up the WITH CHECK OPTION and RETURNING lists-of-lists, if + * needed. + */ + if (parse->withCheckOptions) + withCheckOptionLists = list_make1(parse->withCheckOptions); + else + withCheckOptionLists = NIL; + + if (parse->returningList) + returningLists = list_make1(parse->returningList); + else + returningLists = NIL; + + path = (Path *) + create_modifytable_path(root, final_rel, + parse->commandType, + parse->canSetTag, + parse->resultRelation, + rootRelation, + false, + list_make1_int(parse->resultRelation), + list_make1(path), + list_make1(root), + withCheckOptionLists, + returningLists, + root->rowMarks, + parse->onConflict, + assign_special_exec_param(root), + parallelInsertWorkers); + + add_partial_path(final_rel, path); + parallel_insert_partial_path_count++; + } + } + /* * Generate partial paths for final_rel, too, if outer query levels might * be able to make use of them. @@ -2405,6 +2492,12 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, } } + if (parallel_insert_partial_path_count > 0) + { + final_rel->rows = current_rel->rows; /* ??? why hasn't this been set above somewhere ???? */ + generate_useful_gather_paths(root, final_rel, false); + } + extra.limit_needed = limit_needed(parse); extra.limit_tuples = limit_tuples; extra.count_est = count_est; @@ -7574,7 +7667,43 @@ apply_scanjoin_target_to_paths(PlannerInfo *root, * one of the generated paths may turn out to be the cheapest one. 
*/ if (rel->consider_parallel && !IS_OTHER_REL(rel)) - generate_useful_gather_paths(root, rel, false); + { + if (root->parse->commandType == CMD_INSERT) + { + Relation relation; + bool hasFKs; + RangeTblEntry *rte; + List *cachedFKs; + + /* Check if the target relation has foreign keys; if so, avoid + * creating a parallel Insert plan (because inserting into + * such tables would result in creation of new CommandIds, and + * this isn't supported by parallel workers). + * Similarly, avoid creating a parallel Insert plan if ON + * CONFLICT ... DO UPDATE ... has been specified, because + * parallel UPDATE is not supported. + * However, do allow any underlying query to be run by parallel + * workers in these cases. + */ + + rte = rt_fetch(root->parse->resultRelation, root->parse->rtable); + relation = table_open(rte->relid, NoLock); + cachedFKs = RelationGetFKeyList(relation); + hasFKs = cachedFKs != NIL; + table_close(relation, NoLock); + + if (hasFKs || (root->parse->onConflict != NULL && root->parse->onConflict->action == ONCONFLICT_UPDATE)) + { + generate_useful_gather_paths(root, rel, false); + /* Don't allow parallel insert */ + rel->consider_parallel = false; + } + } + else + { + generate_useful_gather_paths(root, rel, false); + } + } /* * Reassess which paths are the cheapest, now that we've potentially added diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index baefe0e..67a7cc8 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -225,6 +225,7 @@ set_plan_references(PlannerInfo *root, Plan *plan) PlannerGlobal *glob = root->glob; int rtoffset = list_length(glob->finalrtable); ListCell *lc; + Plan *finalPlan; /* * Add all the query's RTEs to the flattened rangetable. 
The live ones @@ -275,7 +276,16 @@ set_plan_references(PlannerInfo *root, Plan *plan) } /* Now fix the Plan tree */ - return set_plan_refs(root, plan, rtoffset); + finalPlan = set_plan_refs(root, plan, rtoffset); + if (finalPlan->type == T_Gather || finalPlan->type == T_GatherMerge) + { + Plan *subplan = outerPlan(finalPlan); + if (subplan->type == T_ModifyTable && ((ModifyTable *)subplan)->returningLists != NULL) + { + finalPlan->targetlist = outerPlan(finalPlan)->targetlist; + } + } + return finalPlan; } /* diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index c1fc866..4a9c3fa 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -3538,11 +3538,11 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, List *subroots, List *withCheckOptionLists, List *returningLists, List *rowMarks, OnConflictExpr *onconflict, - int epqParam) + int epqParam, + int parallel_workers) { + ListCell *lc; ModifyTablePath *pathnode = makeNode(ModifyTablePath); - double total_size; - ListCell *lc; Assert(list_length(resultRelations) == list_length(subpaths)); Assert(list_length(resultRelations) == list_length(subroots)); @@ -3557,45 +3557,22 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, pathnode->path.pathtarget = rel->reltarget; /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; - pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = false; - pathnode->path.parallel_workers = 0; - pathnode->path.pathkeys = NIL; - - /* - * Compute cost & rowcount as sum of subpath costs & rowcounts. - * - * Currently, we don't charge anything extra for the actual table - * modification work, nor for the WITH CHECK OPTIONS or RETURNING - * expressions if any. It would only be window dressing, since - * ModifyTable is always a top-level node and there is no way for the - * costs to change any higher-level planning choices. 
But we might want - * to make it look better sometime. - */ - pathnode->path.startup_cost = 0; - pathnode->path.total_cost = 0; - pathnode->path.rows = 0; - total_size = 0; - foreach(lc, subpaths) + pathnode->path.parallel_aware = parallel_workers > 0 ? true : false; + pathnode->path.parallel_safe = rel->consider_parallel; + if (rel->consider_parallel) { - Path *subpath = (Path *) lfirst(lc); - - if (lc == list_head(subpaths)) /* first node? */ - pathnode->path.startup_cost = subpath->startup_cost; - pathnode->path.total_cost += subpath->total_cost; - pathnode->path.rows += subpath->rows; - total_size += subpath->pathtarget->width * subpath->rows; + foreach (lc, subpaths) + { + Path *sp = (Path *)lfirst(lc); + if (!sp->parallel_safe) + { + pathnode->path.parallel_safe = false; + break; + } + } } - - /* - * Set width to the average width of the subpath outputs. XXX this is - * totally wrong: we should report zero if no RETURNING, else an average - * of the RETURNING tlist widths. But it's what happened historically, - * and improving it is a task for another day. 
- */ - if (pathnode->path.rows > 0) - total_size /= pathnode->path.rows; - pathnode->path.pathtarget->width = rint(total_size); + pathnode->path.parallel_workers = parallel_workers; + pathnode->path.pathkeys = NIL; pathnode->operation = operation; pathnode->canSetTag = canSetTag; @@ -3611,6 +3588,8 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, pathnode->onconflict = onconflict; pathnode->epqParam = epqParam; + cost_modifytable(pathnode); + return pathnode; } diff --git a/src/include/access/xact.h b/src/include/access/xact.h index df1b43a..71a6c9b 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -385,6 +385,8 @@ extern FullTransactionId GetTopFullTransactionId(void); extern FullTransactionId GetTopFullTransactionIdIfAny(void); extern FullTransactionId GetCurrentFullTransactionId(void); extern FullTransactionId GetCurrentFullTransactionIdIfAny(void); +extern void AssignFullTransactionIdForWorker(FullTransactionId fullTransactionId); +extern void AssignCommandIdForWorker(CommandId commandId, bool used); extern void MarkCurrentTransactionIdLoggedIfAny(void); extern bool SubTransactionIsActive(SubTransactionId subxid); extern CommandId GetCurrentCommandId(bool used); diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 6141654..fafa087 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -69,6 +69,7 @@ extern PGDLLIMPORT int constraint_exclusion; extern double index_pages_fetched(double tuples_fetched, BlockNumber pages, double index_pages, PlannerInfo *root); +extern void cost_modifytable(ModifyTablePath *path); extern void cost_seqscan(Path *path, PlannerInfo *root, RelOptInfo *baserel, ParamPathInfo *param_info); extern void cost_samplescan(Path *path, PlannerInfo *root, RelOptInfo *baserel, diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 715a24a..2d08f0c 100644 --- a/src/include/optimizer/pathnode.h +++ 
b/src/include/optimizer/pathnode.h @@ -264,7 +264,8 @@ extern ModifyTablePath *create_modifytable_path(PlannerInfo *root, List *subroots, List *withCheckOptionLists, List *returningLists, List *rowMarks, OnConflictExpr *onconflict, - int epqParam); + int epqParam, + int parallel_workers); extern LimitPath *create_limit_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, Node *limitOffset, Node *limitCount, diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h index 10b6e81..1a01dcf 100644 --- a/src/include/optimizer/paths.h +++ b/src/include/optimizer/paths.h @@ -58,6 +58,8 @@ extern void generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows); extern int compute_parallel_worker(RelOptInfo *rel, double heap_pages, double index_pages, int max_workers); +extern int compute_parallel_insert_worker(RelOptInfo *rel, + int max_workers); extern void create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel, Path *bitmapqual); extern void generate_partitionwise_join_paths(PlannerInfo *root,