diff --git a/doc/src/sgml/ref/create_aggregate.sgml b/doc/src/sgml/ref/create_aggregate.sgml index 7a6f8a9..32fc3ac 100644 --- a/doc/src/sgml/ref/create_aggregate.sgml +++ b/doc/src/sgml/ref/create_aggregate.sgml @@ -40,6 +40,7 @@ CREATE AGGREGATE name ( [ minitial_condition ] [ , SORTOP = sort_operator ] + [ , PARALLEL = { SAFE | RESTRICTED | UNSAFE } ] ) CREATE AGGREGATE name ( [ [ argmode ] [ argname ] arg_data_type [ , ... ] ] @@ -684,6 +685,12 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1; Currently, ordered-set aggregates do not need to support moving-aggregate mode, since they cannot be used as window functions. + + + The meaning of PARALLEL SAFE, PARALLEL RESTRICTED, + and PARALLEL UNSAFE is the same as for + . + diff --git a/src/backend/catalog/pg_aggregate.c b/src/backend/catalog/pg_aggregate.c index b420349..bcc9411 100644 --- a/src/backend/catalog/pg_aggregate.c +++ b/src/backend/catalog/pg_aggregate.c @@ -72,7 +72,8 @@ AggregateCreate(const char *aggName, Oid aggmTransType, int32 aggmTransSpace, const char *agginitval, - const char *aggminitval) + const char *aggminitval, + char proparallel) { Relation aggdesc; HeapTuple tup; @@ -622,7 +623,7 @@ AggregateCreate(const char *aggName, false, /* isStrict (not needed for agg) */ PROVOLATILE_IMMUTABLE, /* volatility (not * needed for agg) */ - PROPARALLEL_UNSAFE, + proparallel, parameterTypes, /* paramTypes */ allParameterTypes, /* allParamTypes */ parameterModes, /* parameterModes */ diff --git a/src/backend/commands/aggregatecmds.c b/src/backend/commands/aggregatecmds.c index 3424f84..b551f42 100644 --- a/src/backend/commands/aggregatecmds.c +++ b/src/backend/commands/aggregatecmds.c @@ -91,6 +91,7 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, Oid mtransTypeId = InvalidOid; char transTypeType; char mtransTypeType = 0; + char proparallel = PROPARALLEL_UNSAFE; ListCell *pl; /* Convert list of names to a name and namespace */ @@ -178,6 +179,39 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, initval = defGetString(defel); else if (pg_strcasecmp(defel->defname, "minitcond") == 0) minitval = defGetString(defel); + else if (pg_strcasecmp(defel->defname, "parallel") == 0) + { + bool ok = false; + + if (IsA(defel->arg, TypeName)) + { + TypeName *tn = (TypeName *) defel->arg; + char *parallel; + + Assert(tn->names != NULL && IsA(tn->names, List)); + + if (list_length(tn->names) == 1) + { + Assert(IsA(linitial(tn->names), String)); + parallel = strVal(linitial(tn->names)); + ok = true; + + if (pg_strcasecmp(parallel, "safe") == 0) + proparallel = PROPARALLEL_SAFE; + else if (pg_strcasecmp(parallel, "restricted") == 0) + proparallel = PROPARALLEL_RESTRICTED; + else if (pg_strcasecmp(parallel, "unsafe") == 0) + proparallel = PROPARALLEL_UNSAFE; + else + ok = false; + } + } + + if (!ok) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("parameter \"parallel\" must be SAFE, RESTRICTED, or UNSAFE"))); + } else ereport(WARNING, (errcode(ERRCODE_SYNTAX_ERROR), @@ -480,5 +514,6 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters, mtransTypeId, /* transition data type */ mtransSpace, /* transition space */ initval, /* initial condition */ - minitval); /* initial condition */ + minitval, /* initial condition */ + proparallel); /* parallel safe? */ } diff --git a/src/backend/commands/functioncmds.c b/src/backend/commands/functioncmds.c index a745d73..748c8f7 100644 --- a/src/backend/commands/functioncmds.c +++ b/src/backend/commands/functioncmds.c @@ -566,9 +566,8 @@ interpret_func_parallel(DefElem *defel) else { ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("parallel option \"%s\" not recognized", - str))); + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("parameter \"parallel\" must be SAFE, RESTRICTED, or UNSAFE"))); return PROPARALLEL_UNSAFE; /* keep compiler quiet */ } } diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index c615717..5674a73 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -1419,6 +1419,13 @@ has_parallel_hazard_walker(Node *node, has_parallel_hazard_arg *context) if (parallel_too_dangerous(func_parallel(expr->funcid), context)) return true; } + else if (IsA(node, Aggref)) + { + Aggref *aggref = (Aggref *) node; + + if (parallel_too_dangerous(func_parallel(aggref->aggfnoid), context)) + return true; + } else if (IsA(node, OpExpr)) { OpExpr *expr = (OpExpr *) node; diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h index 4205fab..4e81260 100644 --- a/src/include/catalog/pg_aggregate.h +++ b/src/include/catalog/pg_aggregate.h @@ -349,6 +349,7 @@ extern ObjectAddress AggregateCreate(const char *aggName, Oid aggmTransType, int32 aggmTransSpace, const char *agginitval, - const char *aggminitval); + const char *aggminitval, + char proparallel); #endif /* PG_AGGREGATE_H */ diff --git a/src/test/regress/expected/create_aggregate.out b/src/test/regress/expected/create_aggregate.out index 66e073d..5bbe1a5 100644 --- a/src/test/regress/expected/create_aggregate.out +++ b/src/test/regress/expected/create_aggregate.out @@ -20,9 +20,9 @@ CREATE AGGREGATE newsum ( -- zero-argument aggregate CREATE AGGREGATE newcnt (*) ( sfunc = int8inc, stype = int8, - initcond = '0' + initcond = '0', parallel = safe ); --- old-style spelling of same +-- old-style spelling of same (except without parallel-safe; that's too new) CREATE AGGREGATE oldcnt ( sfunc = int8inc, basetype = 'ANY', stype = int8, initcond = '0' @@ -119,6 +119,14 @@ WHERE aggfnoid = 'mysum'::REGPROC; (1 row) DROP AGGREGATE mysum (int); +-- invalid: bad parallel-safety marking +CREATE AGGREGATE mysum (int) +( + stype = int, + sfunc = int4pl, + parallel = pear +); +ERROR: parameter "parallel" must be SAFE, RESTRICTED, or UNSAFE -- invalid: nonstrict inverse with strict forward function CREATE FUNCTION float8mi_n(float8, float8) RETURNS float8 AS $$ SELECT $1 - $2; $$ diff --git a/src/test/regress/sql/create_aggregate.sql b/src/test/regress/sql/create_aggregate.sql index dfcbc5a..3b9d26b 100644 --- a/src/test/regress/sql/create_aggregate.sql +++ b/src/test/regress/sql/create_aggregate.sql @@ -23,10 +23,10 @@ CREATE AGGREGATE newsum ( -- zero-argument aggregate CREATE AGGREGATE newcnt (*) ( sfunc = int8inc, stype = int8, - initcond = '0' + initcond = '0', parallel = safe ); --- old-style spelling of same +-- old-style spelling of same (except without parallel-safe; that's too new) CREATE AGGREGATE oldcnt ( sfunc = int8inc, basetype = 'ANY', stype = int8, initcond = '0' @@ -132,6 +132,14 @@ WHERE aggfnoid = 'mysum'::REGPROC; DROP AGGREGATE mysum (int); +-- invalid: bad parallel-safety marking +CREATE AGGREGATE mysum (int) +( + stype = int, + sfunc = int4pl, + parallel = pear +); + -- invalid: nonstrict inverse with strict forward function CREATE FUNCTION float8mi_n(float8, float8) RETURNS float8 AS