From 202024c8d24ac9fa393b8e3b36bfdcc14d7bae0e Mon Sep 17 00:00:00 2001 From: "houzj.fnst" Date: Wed, 21 Apr 2021 19:21:11 +0800 Subject: [PATCH] get-parallel-safety-function provide a utility function pg_get_parallel_safety('table_name') that returns records of (objid, classid, parallel_safety) that represent the parallel safety of objects that determine the parallel safety of the specified table. The user can use this function to identify problematic objects when a parallel DML fails or is not parallelized in an expected manner. When detecting an parallel unsafe/restricted function in index(or others can have an expression), return both the function oid and the index oid. --- src/backend/optimizer/plan/planner.c | 3 +- src/backend/optimizer/util/clauses.c | 600 ++++++++++++++++++++++++++- src/backend/utils/adt/misc.c | 75 ++++ src/backend/utils/cache/typcache.c | 14 + src/include/access/xact.h | 14 + src/include/catalog/pg_proc.dat | 18 +- src/include/optimizer/clauses.h | 9 + src/include/utils/typcache.h | 2 + 8 files changed, 729 insertions(+), 6 deletions(-) diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 1868c4eff4..dbc2827d20 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -332,7 +332,8 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, */ if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 && IsUnderPostmaster && - parse->commandType == CMD_SELECT && + (parse->commandType == CMD_SELECT || + is_parallel_allowed_for_modify(parse)) && !parse->hasModifyingCTE && max_parallel_workers_per_gather > 0 && !IsParallelWorker()) diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index d9ad4efc5e..3ec0bf26cb 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -19,13 +19,20 @@ #include "postgres.h" +#include "access/amapi.h" +#include "access/genam.h" #include "access/htup_details.h" +#include "access/table.h" +#include "access/xact.h" #include "catalog/pg_aggregate.h" #include "catalog/pg_class.h" +#include "catalog/pg_constraint.h" #include "catalog/pg_language.h" #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" +#include "catalog/pg_trigger.h" #include "catalog/pg_type.h" +#include "commands/trigger.h" #include "executor/executor.h" #include "executor/functions.h" #include "funcapi.h" @@ -43,6 +50,9 @@ #include "parser/parse_agg.h" #include "parser/parse_coerce.h" #include "parser/parse_func.h" +#include "parser/parsetree.h" +#include "partitioning/partdesc.h" +#include "rewrite/rewriteHandler.h" #include "rewrite/rewriteManip.h" #include "tcop/tcopprot.h" #include "utils/acl.h" @@ -51,6 +61,8 @@ #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/partcache.h" +#include "utils/rel.h" #include "utils/syscache.h" #include "utils/typcache.h" @@ -88,6 +100,9 @@ typedef struct char max_hazard; /* worst proparallel hazard found so far */ char max_interesting; /* worst proparallel hazard of interest */ List *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */ + bool check_all; + List *func_oids; + PartitionDirectory partition_directory; } max_parallel_hazard_context; static bool contain_agg_clause_walker(Node *node, void *context); @@ -98,6 +113,20 @@ static bool contain_volatile_functions_walker(Node *node, void *context); static bool contain_volatile_functions_not_nextval_walker(Node *node, void *context); static bool max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context); +static List *target_rel_all_parallel_hazard_recurse(Relation relation, + max_parallel_hazard_context *context); +static List *target_rel_trigger_max_parallel_hazard(Relation rel, + max_parallel_hazard_context *context); +static List *index_expr_max_parallel_hazard(Relation index_rel, List *ii_Expressions, List *ii_Predicate, + bool check_all, char max_interesting, max_parallel_hazard_context *context); +static List *target_rel_index_max_parallel_hazard(Relation rel, + max_parallel_hazard_context *context); +static List *target_rel_domain_max_parallel_hazard(Oid typid, + max_parallel_hazard_context *context); +static List *target_rel_partitions_max_parallel_hazard(Relation rel, + max_parallel_hazard_context *context); +static List *target_rel_chk_constr_max_parallel_hazard(Relation rel, + max_parallel_hazard_context *context); static bool contain_nonstrict_functions_walker(Node *node, void *context); static bool contain_exec_param_walker(Node *node, List *param_ids); static bool contain_context_dependent_node(Node *clause); @@ -149,6 +178,7 @@ static Query *substitute_actual_srf_parameters(Query *expr, static Node *substitute_actual_srf_parameters_mutator(Node *node, substitute_actual_srf_parameters_context *context); +static safety_object *make_safety_object(Oid objid, Oid classid, char proparallel); /***************************************************************************** * Aggregate-function clause manipulation @@ -620,6 +650,10 @@ max_parallel_hazard(Query *parse) context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_UNSAFE; context.safe_param_ids = NIL; + context.check_all = false; + context.func_oids = NIL; + context.partition_directory = NULL; + (void) max_parallel_hazard_walker((Node *) parse, &context); return context.max_hazard; } @@ -651,6 +685,9 @@ is_parallel_safe(PlannerInfo *root, Node *node) context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_RESTRICTED; context.safe_param_ids = NIL; + context.check_all = false; + context.func_oids = NIL; + context.partition_directory = NULL; /* * The params that refer to the same or parent query level are considered @@ -682,7 +719,7 @@ max_parallel_hazard_test(char proparallel, max_parallel_hazard_context *context) break; case PROPARALLEL_RESTRICTED: /* increase max_hazard to RESTRICTED */ - Assert(context->max_hazard != PROPARALLEL_UNSAFE); + Assert(context->check_all || context->max_hazard != PROPARALLEL_UNSAFE); context->max_hazard = proparallel; /* done if we are not expecting any unsafe functions */ if (context->max_interesting == proparallel) @@ -699,6 +736,63 @@ max_parallel_hazard_test(char proparallel, max_parallel_hazard_context *context) return false; } + +static safety_object *make_safety_object(Oid objid, Oid classid, char proparallel) +{ + safety_object *object = (safety_object *) palloc(sizeof(safety_object)); + + object->objid = objid; + object->classid = classid; + object->proparallel = proparallel; + + return object; +} + +static bool +parallel_safety_checker(Oid func_id, void *context) +{ + char proparallel; + bool max_hazard_found; + max_parallel_hazard_context *cont = (max_parallel_hazard_context *) context; + + proparallel = func_parallel(func_id); + max_hazard_found = max_parallel_hazard_test(proparallel, cont); + + if ((proparallel != PROPARALLEL_SAFE && cont->check_all) || + max_hazard_found) + { + cont->func_oids = lappend(cont->func_oids, + make_safety_object(func_id, ProcedureRelationId, proparallel)); + } + + return max_hazard_found && !cont->check_all; +} + +/* Check parallel unsafe/restricted function in expression */ +static bool +parallel_safety_walker(Node *node, max_parallel_hazard_context *context) +{ + if (node == NULL) + return false; + + /* Check for hazardous functions in node itself */ + if (check_functions_in_node(node, parallel_safety_checker, + context)) + return true; + + if (IsA(node, CoerceToDomain)) + { + if (target_rel_domain_max_parallel_hazard(((CoerceToDomain *)node)->resulttype, context) != NIL && + !context->check_all) + return true; + } + + /* Recurse to check arguments */ + return expression_tree_walker(node, + parallel_safety_walker, + context); +} + /* check_functions_in_node callback */ static bool max_parallel_hazard_checker(Oid func_id, void *context) @@ -854,6 +948,510 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) context); } +List* +target_rel_max_parallel_hazard(RangeVar *relrv, bool findall, char max_interesting) +{ + max_parallel_hazard_context context; + Relation targetRel; + List *objects; + + context.check_all = findall; + context.func_oids = NIL; + context.max_hazard = PROPARALLEL_SAFE; + context.max_interesting = max_interesting; + context.safe_param_ids = NIL; + context.partition_directory = NULL; + + targetRel = table_openrv(relrv, AccessShareLock); + + objects = target_rel_all_parallel_hazard_recurse(targetRel, &context); + if (context.partition_directory) + DestroyPartitionDirectory(context.partition_directory); + + table_close(targetRel, AccessShareLock); + + return objects; +} + + +List * +target_rel_all_parallel_hazard_recurse(Relation rel, max_parallel_hazard_context *context) +{ + TupleDesc tupdesc; + int attnum; + + Assert(context != NULL && context->check_all); + + /* + * We can't support table modification in a parallel worker if it's a + * foreign table/partition (no FDW API for supporting parallel access) or + * a temporary table. + */ + if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE || + RelationUsesLocalBuffers(rel)) + { + max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context); + context->func_oids = lappend(context->func_oids, + make_safety_object(rel->rd_rel->oid, RelationRelationId, PROPARALLEL_RESTRICTED)); + } + + /* + * If a partitioned table, check that each partition is safe for + * modification in parallel-mode. + */ + (void) target_rel_partitions_max_parallel_hazard(rel, context); + + /* + * If there are any index expressions or index predicate, check that they + * are parallel-mode safe. + */ + (void) target_rel_index_max_parallel_hazard(rel, context); + + /* + * If any triggers exist, check that they are parallel-safe. + */ + (void) target_rel_trigger_max_parallel_hazard(rel, context); + + /* + * Column default expressions are only applicable to INSERT and UPDATE. + * Note that even though column defaults may be specified separately for + * each partition in a partitioned table, a partition's default value is + * not applied when inserting a tuple through a partitioned table. + */ + + tupdesc = RelationGetDescr(rel); + for (attnum = 0; attnum < tupdesc->natts; attnum++) + { + Form_pg_attribute att = TupleDescAttr(tupdesc, attnum); + + /* We don't need info for dropped or generated attributes */ + if (att->attisdropped || att->attgenerated) + continue; + + if (att->atthasdef) + { + Node *defaultexpr; + defaultexpr = build_column_default(rel, attnum); + parallel_safety_walker((Node *) defaultexpr, context); + } + + /* + * If the column is of a DOMAIN type, determine whether that + * domain has any CHECK expressions that are not parallel-mode + * safe. + */ + if (get_typtype(att->atttypid) == TYPTYPE_DOMAIN) + { + (void) target_rel_domain_max_parallel_hazard(att->atttypid, context); + } + } + + /* + * CHECK constraints are only applicable to INSERT and UPDATE. If any + * CHECK constraints exist, determine if they are parallel-safe. + */ + (void) target_rel_chk_constr_max_parallel_hazard(rel, context); + + return context->func_oids; +} + +/* + * target_rel_trigger_max_parallel_hazard + * + * Finds all the PARALLEL UNSAFE/RESTRICTED objects for the specified relation's + * trigger data. + */ +static List* +target_rel_trigger_max_parallel_hazard(Relation rel, + max_parallel_hazard_context *context) +{ + int i; + char proparallel; + + Assert(context != NULL && context->check_all); + + if (rel->trigdesc == NULL) + return context->func_oids; + + /* + * Care is needed here to avoid using the same relcache TriggerDesc field + * across other cache accesses, because relcache doesn't guarantee that it + * won't move. + */ + for (i = 0; i < rel->trigdesc->numtriggers; i++) + { + Oid tgfoid = rel->trigdesc->triggers[i].tgfoid; + Oid tgoid = rel->trigdesc->triggers[i].tgoid; + + proparallel = func_parallel(tgfoid); + + if (proparallel != PROPARALLEL_SAFE) + { + context->func_oids = lappend(context->func_oids, + make_safety_object(tgfoid, ProcedureRelationId, proparallel)); + context->func_oids = lappend(context->func_oids, + make_safety_object(tgoid, TriggerRelationId, proparallel)); + } + } + + return context->func_oids; +} + +static List* +index_expr_max_parallel_hazard(Relation index_rel, + List *ii_Expressions, List *ii_Predicate, + bool check_all, char max_interesting, + max_parallel_hazard_context *context) +{ + int indnatts; + int nsupport; + Form_pg_index indexStruct; + int i; + ListCell *index_expr_item; + + Assert(context != NULL && context->check_all); + + indexStruct = index_rel->rd_index; + index_expr_item = list_head(ii_Expressions); + + if (ii_Expressions != NIL) + { + for (i = 0; i < indexStruct->indnatts; i++) + { + int keycol = indexStruct->indkey.values[i]; + + if (keycol == 0) + { + /* Found an index expression */ + Node *index_expr; + + Assert(index_expr_item != NULL); + if (index_expr_item == NULL) /* shouldn't happen */ + elog(ERROR, "too few entries in indexprs list"); + + index_expr = (Node *) lfirst(index_expr_item); + + /* find some not safe objects */ + parallel_safety_walker(index_expr, context); + index_expr_item = lnext(ii_Expressions, index_expr_item); + } + } + } + + if (ii_Predicate != NIL) + parallel_safety_walker((Node *) ii_Predicate, context); + + /* + * Check parallel-safety of any index AM support functions. + */ + indnatts = IndexRelationGetNumberOfAttributes(index_rel); + nsupport = indnatts * index_rel->rd_indam->amsupport; + if (nsupport > 0) + { + for (i = 0; i < nsupport; i++) + { + char proparallel; + + Oid funcOid = index_rel->rd_support[i]; + if (!OidIsValid(funcOid)) + continue; + + proparallel = func_parallel(funcOid); + if (proparallel != PROPARALLEL_SAFE) + { + context->func_oids = lappend(context->func_oids, + make_safety_object(funcOid, ProcedureRelationId, proparallel)); + } + } + } + + return context->func_oids; +} + +/* + * target_rel_index_max_parallel_hazard + * + * Finds all the PARALLEL UNSAFE/RESTRICTED objects for any existing index + * expressions or index predicate of a specified relation. + */ +static List* +target_rel_index_max_parallel_hazard(Relation rel, max_parallel_hazard_context *context) +{ + List *index_oid_list; + ListCell *lc; + LOCKMODE lockmode = AccessShareLock; + + Assert(context != NULL && context->check_all); + + index_oid_list = RelationGetIndexList(rel); + foreach(lc, index_oid_list) + { + Relation index_rel; + List *ii_Expressions; + List *ii_Predicate; + List *temp_objects; + Oid index_oid = lfirst_oid(lc); + + temp_objects = context->func_oids; + context->func_oids = NIL; + context->max_hazard = PROPARALLEL_SAFE; + + index_rel = index_open(index_oid, lockmode); + + /* Check index expression */ + ii_Expressions = RelationGetIndexExpressions(index_rel); + ii_Predicate = RelationGetIndexPredicate(index_rel); + + index_expr_max_parallel_hazard(index_rel, ii_Expressions, + ii_Predicate, context->check_all, + context->max_interesting, + context); + + /* Add the index itself to the objects list */ + if (context->func_oids != NIL) + { + context->func_oids = lappend(context->func_oids, + make_safety_object(index_oid, IndexRelationId, context->max_hazard)); + } + + context->func_oids = list_concat(context->func_oids, temp_objects); + list_free(temp_objects); + index_close(index_rel, lockmode); + } + + list_free(index_oid_list); + + return context->func_oids; +} + +/* + * target_rel_domain_max_parallel_hazard + * + * Finds all the PARALLEL UNSAFE/RESTRICTED objects for the specified DOMAIN type. + * Only any CHECK expressions are examined for parallel-safety. + */ +static List* +target_rel_domain_max_parallel_hazard(Oid typid, max_parallel_hazard_context *context) +{ + ListCell *lc; + List *domain_list; + List *temp_objects; + + Assert(context != NULL && context->check_all); + + domain_list = GetDomainConstraints(typid); + + foreach(lc, domain_list) + { + DomainConstraintState *r = (DomainConstraintState *) lfirst(lc); + + temp_objects = context->func_oids; + context->func_oids = NIL; + context->max_hazard = PROPARALLEL_SAFE; + + parallel_safety_walker((Node *) r->check_expr, context); + + /* Add the Constraint itself to the objects list */ + if (context->func_oids != NIL) + { + context->func_oids = lappend(context->func_oids, + make_safety_object(get_domain_constraint_oid(typid, r->name, false), + ConstraintRelationId, + context->max_hazard)); + } + + context->func_oids = list_concat(context->func_oids, temp_objects); + list_free(temp_objects); + } + + return context->func_oids; + +} + +/* + * target_rel_partitions_max_parallel_hazard + * + * Finds all the PARALLEL UNSAFE/RESTRICTED objects for any partitions of a + * of a specified relation. + */ +static List* +target_rel_partitions_max_parallel_hazard(Relation rel, + max_parallel_hazard_context *context) +{ + int i; + PartitionDesc pdesc; + PartitionKey pkey; + ListCell *partexprs_item; + int partnatts; + List *partexprs, *qual; + + Assert(context != NULL && context->check_all); + + /* Check partition check expression */ + qual = RelationGetPartitionQual(rel); + parallel_safety_walker((Node *) qual, context); + + if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) + return context->func_oids; + + pkey = RelationGetPartitionKey(rel); + + partnatts = get_partition_natts(pkey); + partexprs = get_partition_exprs(pkey); + + partexprs_item = list_head(partexprs); + for (i = 0; i < partnatts; i++) + { + Oid funcOid = pkey->partsupfunc[i].fn_oid; + if (OidIsValid(funcOid)) + { + char proparallel = func_parallel(funcOid); + max_parallel_hazard_test(proparallel, context); + + if (proparallel != PROPARALLEL_SAFE) + context->func_oids = lappend(context->func_oids, + make_safety_object(funcOid, ProcedureRelationId, proparallel)); + } + /* Check parallel-safety of any expressions in the partition key */ + if (get_partition_col_attnum(pkey, i) == 0) + { + Node *check_expr = (Node *) lfirst(partexprs_item); + + parallel_safety_walker(check_expr, context); + partexprs_item = lnext(partexprs, partexprs_item); + } + } + + /* Recursively check each partition ... */ + + /* Create the PartitionDirectory infrastructure if we didn't already */ + if (context->partition_directory == NULL) + context->partition_directory = + CreatePartitionDirectory(CurrentMemoryContext, false); + + pdesc = PartitionDirectoryLookup(context->partition_directory, rel); + + for (i = 0; i < pdesc->nparts; i++) + { + Relation part_rel; + + part_rel = table_open(pdesc->oids[i], AccessShareLock); + (void) target_rel_all_parallel_hazard_recurse(part_rel, context); + table_close(part_rel, AccessShareLock); + } + + return context->func_oids; +} + +/* + * target_rel_chk_constr_max_parallel_hazard + * + * Finds all the PARALLEL UNSAFE/RESTRICTED objects for any CHECK expressions or + * CHECK constraints related to the specified relation. + */ +static List* +target_rel_chk_constr_max_parallel_hazard(Relation rel, + max_parallel_hazard_context *context) +{ + TupleDesc tupdesc; + List *temp_objects; + + Assert(context != NULL && context->check_all); + + tupdesc = RelationGetDescr(rel); + + /* + * Determine if there are any CHECK constraints which are not + * parallel-safe. + */ + if (tupdesc->constr != NULL && tupdesc->constr->num_check > 0) + { + int i; + + ConstrCheck *check = tupdesc->constr->check; + + for (i = 0; i < tupdesc->constr->num_check; i++) + { + Expr *check_expr = stringToNode(check[i].ccbin); + + temp_objects = context->func_oids; + context->func_oids = NIL; + context->max_hazard = PROPARALLEL_SAFE; + + parallel_safety_walker((Node *) check_expr, context); + + if (context->func_oids != NIL) + { + context->func_oids = lappend(context->func_oids, + make_safety_object(get_relation_constraint_oid(rel->rd_rel->oid, check->ccname, true), ConstraintRelationId, context->max_hazard)); + } + + context->func_oids = list_concat(context->func_oids, temp_objects); + list_free(temp_objects); + } + } + + return context->func_oids; +} + +/* + * is_parallel_allowed_for_modify + * + * Check at a high-level if parallel mode is able to be used for the specified + * table-modification statement. Currently, we support only Inserts. + * + * It's not possible in the following cases: + * + * 1) INSERT...ON CONFLICT...DO UPDATE + * 2) INSERT without SELECT + * + * (Note: we don't do in-depth parallel-safety checks here, we do only the + * cheaper tests that can quickly exclude obvious cases for which + * parallelism isn't supported, to avoid having to do further parallel-safety + * checks for these) + */ +bool +is_parallel_allowed_for_modify(Query *parse) +{ + bool hasSubQuery; + RangeTblEntry *rte; + ListCell *lc; + + if (!IsModifySupportedInParallelMode(parse->commandType)) + return false; + + /* + * UPDATE is not currently supported in parallel-mode, so prohibit + * INSERT...ON CONFLICT...DO UPDATE... + * + * In order to support update, even if only in the leader, some further + * work would need to be done. A mechanism would be needed for sharing + * combo-cids between leader and workers during parallel-mode, since for + * example, the leader might generate a combo-cid and it needs to be + * propagated to the workers. + */ + if (parse->commandType == CMD_INSERT && + parse->onConflict != NULL && + parse->onConflict->action == ONCONFLICT_UPDATE) + return false; + + /* + * If there is no underlying SELECT, a parallel insert operation is not + * desirable. + */ + hasSubQuery = false; + foreach(lc, parse->rtable) + { + rte = lfirst_node(RangeTblEntry, lc); + if (rte->rtekind == RTE_SUBQUERY) + { + hasSubQuery = true; + break; + } + } + + return hasSubQuery; +} /***************************************************************************** * Check clauses for nonstrict functions diff --git a/src/backend/utils/adt/misc.c b/src/backend/utils/adt/misc.c index 88faf4dfd7..33bc6b602d 100644 --- a/src/backend/utils/adt/misc.c +++ b/src/backend/utils/adt/misc.c @@ -23,6 +23,8 @@ #include "access/sysattr.h" #include "access/table.h" #include "catalog/catalog.h" +#include "catalog/namespace.h" +#include "catalog/pg_proc.h" #include "catalog/pg_tablespace.h" #include "catalog/pg_type.h" #include "catalog/system_fk_info.h" @@ -31,6 +33,7 @@ #include "common/keywords.h" #include "funcapi.h" #include "miscadmin.h" +#include "optimizer/clauses.h" #include "parser/scansup.h" #include "pgstat.h" #include "postmaster/syslogger.h" @@ -43,6 +46,7 @@ #include "utils/lsyscache.h" #include "utils/ruleutils.h" #include "utils/timestamp.h" +#include "utils/varlena.h" /* * Common subroutine for num_nulls() and num_nonnulls(). @@ -605,6 +609,77 @@ pg_collation_for(PG_FUNCTION_ARGS) PG_RETURN_TEXT_P(cstring_to_text(generate_collation_name(collid))); } +/* + * Determine whether the target relation is safe to execute parallel modification. + * + * Return all the PARALLEL RESTRICTED/UNSAFE objects. + */ +Datum +pg_get_parallel_safety(PG_FUNCTION_ARGS) +{ +#define PG_GET_PARALLEL_SAFETY_COLS 3 + List *objects; + ListCell *object; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + RangeVar *relvar; + text *relname_text; + ReturnSetInfo *rsinfo; + + relname_text = PG_GETARG_TEXT_PP(0); + rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + + MemoryContextSwitchTo(oldcontext); + + relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text)); + objects = target_rel_max_parallel_hazard(relvar, true, PROPARALLEL_UNSAFE); + foreach(object, objects) + { + Datum values[PG_GET_PARALLEL_SAFETY_COLS]; + bool nulls[PG_GET_PARALLEL_SAFETY_COLS]; + safety_object *sobject = (safety_object *) lfirst(object); + + memset(nulls, 0, sizeof(nulls)); + + values[0] = sobject->objid; + values[1] = sobject->classid; + values[2] = sobject->proparallel; + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} + /* * pg_relation_is_updatable - determine which update events the specified diff --git a/src/backend/utils/cache/typcache.c b/src/backend/utils/cache/typcache.c index 4915ef5934..260f5d45c8 100644 --- a/src/backend/utils/cache/typcache.c +++ b/src/backend/utils/cache/typcache.c @@ -2518,6 +2518,20 @@ compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2) return 0; } + +List *GetDomainConstraints(Oid type_id) +{ + TypeCacheEntry *typentry; + List *constraints = NIL; + + typentry = lookup_type_cache(type_id, TYPECACHE_DOMAIN_CONSTR_INFO); + + if(typentry->domainData != NULL) + constraints = typentry->domainData->constraints; + + return constraints; +} + /* * Load (or re-load) the enumData member of the typcache entry. */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index f49a57b35e..c04e6a98d7 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -467,4 +467,18 @@ extern void EnterParallelMode(void); extern void ExitParallelMode(void); extern bool IsInParallelMode(void); +/* + * IsModifySupportedInParallelMode + * + * Indicates whether execution of the specified table-modification command + * (INSERT/UPDATE/DELETE) in parallel-mode is supported, subject to certain + * parallel-safety conditions. + */ +static inline bool +IsModifySupportedInParallelMode(CmdType commandType) +{ + /* Currently only INSERT is supported */ + return (commandType == CMD_INSERT); +} + #endif /* XACT_H */ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index b62abcd22c..a511a03021 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -3765,6 +3765,16 @@ provolatile => 's', prorettype => 'regclass', proargtypes => 'regclass', prosrc => 'pg_get_replica_identity_index' }, +{ oid => '6122', + descr => 'information about the parallel unsafe or restricted objects in the target table', + proname => 'pg_get_parallel_safety', prorows => '100', + proretset => 't', provolatile => 'v', proparallel => 'u', + prorettype => 'record', proargtypes => 'text', + proallargtypes => '{text,oid,oid,char}', + proargmodes => '{i,o,o,o}', + proargnames => '{table_name, objid, classid, proparallel}', + prosrc => 'pg_get_parallel_safety' }, + # Deferrable unique constraint trigger { oid => '1250', descr => 'deferred UNIQUE constraint check', proname => 'unique_key_recheck', provolatile => 'v', prorettype => 'trigger', @@ -3772,11 +3782,11 @@ # Generic referential integrity constraint triggers { oid => '1644', descr => 'referential integrity FOREIGN KEY ... REFERENCES', - proname => 'RI_FKey_check_ins', provolatile => 'v', prorettype => 'trigger', - proargtypes => '', prosrc => 'RI_FKey_check_ins' }, + proname => 'RI_FKey_check_ins', provolatile => 'v', proparallel => 'r', + prorettype => 'trigger', proargtypes => '', prosrc => 'RI_FKey_check_ins' }, { oid => '1645', descr => 'referential integrity FOREIGN KEY ... REFERENCES', - proname => 'RI_FKey_check_upd', provolatile => 'v', prorettype => 'trigger', - proargtypes => '', prosrc => 'RI_FKey_check_upd' }, + proname => 'RI_FKey_check_upd', provolatile => 'v', proparallel => 'r', + prorettype => 'trigger', proargtypes => '', prosrc => 'RI_FKey_check_upd' }, { oid => '1646', descr => 'referential integrity ON DELETE CASCADE', proname => 'RI_FKey_cascade_del', provolatile => 'v', prorettype => 'trigger', proargtypes => '', prosrc => 'RI_FKey_cascade_del' }, diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h index 0673887a85..857d89e0d4 100644 --- a/src/include/optimizer/clauses.h +++ b/src/include/optimizer/clauses.h @@ -23,6 +23,13 @@ typedef struct List **windowFuncs; /* lists of WindowFuncs for each winref */ } WindowFuncLists; +typedef struct safety_object +{ + Oid objid; + Oid classid; + char proparallel; +} safety_object; + extern bool contain_agg_clause(Node *clause); extern bool contain_window_function(Node *clause); @@ -52,5 +59,7 @@ extern void CommuteOpExpr(OpExpr *clause); extern Query *inline_set_returning_function(PlannerInfo *root, RangeTblEntry *rte); +extern bool is_parallel_allowed_for_modify(Query *parse); +extern List *target_rel_max_parallel_hazard(RangeVar *relrv, bool findall, char max_interesting); #endif /* CLAUSES_H */ diff --git a/src/include/utils/typcache.h b/src/include/utils/typcache.h index 1d68a9a4b7..28ca7d8a6e 100644 --- a/src/include/utils/typcache.h +++ b/src/include/utils/typcache.h @@ -199,6 +199,8 @@ extern uint64 assign_record_type_identifier(Oid type_id, int32 typmod); extern int compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2); +extern List *GetDomainConstraints(Oid type_id); + extern size_t SharedRecordTypmodRegistryEstimate(void); extern void SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *, -- 2.18.4