diff --git a/doc/src/sgml/ref/create_aggregate.sgml b/doc/src/sgml/ref/create_aggregate.sgml
index 7a6f8a9..32fc3ac 100644
--- a/doc/src/sgml/ref/create_aggregate.sgml
+++ b/doc/src/sgml/ref/create_aggregate.sgml
@@ -40,6 +40,7 @@ CREATE AGGREGATE name ( [ minitial_condition ]
[ , SORTOP = sort_operator ]
+ [ , PARALLEL = { SAFE | RESTRICTED | UNSAFE } ]
)
CREATE AGGREGATE name ( [ [ argmode ] [ argname ] arg_data_type [ , ... ] ]
@@ -684,6 +685,12 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1;
Currently, ordered-set aggregates do not need to support
moving-aggregate mode, since they cannot be used as window functions.
+
+   <para>
+    The meaning of <literal>PARALLEL SAFE</>, <literal>PARALLEL RESTRICTED</>,
+    and <literal>PARALLEL UNSAFE</> is the same as for
+    <xref linkend="sql-createfunction">.
+   </para>
diff --git a/src/backend/catalog/pg_aggregate.c b/src/backend/catalog/pg_aggregate.c
index b420349..bcc9411 100644
--- a/src/backend/catalog/pg_aggregate.c
+++ b/src/backend/catalog/pg_aggregate.c
@@ -72,7 +72,8 @@ AggregateCreate(const char *aggName,
Oid aggmTransType,
int32 aggmTransSpace,
const char *agginitval,
- const char *aggminitval)
+ const char *aggminitval,
+ char proparallel)
{
Relation aggdesc;
HeapTuple tup;
@@ -622,7 +623,7 @@ AggregateCreate(const char *aggName,
false, /* isStrict (not needed for agg) */
PROVOLATILE_IMMUTABLE, /* volatility (not
* needed for agg) */
- PROPARALLEL_UNSAFE,
+ proparallel,
parameterTypes, /* paramTypes */
allParameterTypes, /* allParamTypes */
parameterModes, /* parameterModes */
diff --git a/src/backend/commands/aggregatecmds.c b/src/backend/commands/aggregatecmds.c
index 3424f84..b551f42 100644
--- a/src/backend/commands/aggregatecmds.c
+++ b/src/backend/commands/aggregatecmds.c
@@ -91,6 +91,7 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
Oid mtransTypeId = InvalidOid;
char transTypeType;
char mtransTypeType = 0;
+ char proparallel = PROPARALLEL_UNSAFE;
ListCell *pl;
/* Convert list of names to a name and namespace */
@@ -178,6 +179,39 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
initval = defGetString(defel);
else if (pg_strcasecmp(defel->defname, "minitcond") == 0)
minitval = defGetString(defel);
+ else if (pg_strcasecmp(defel->defname, "parallel") == 0)
+ {
+ bool ok = false;
+
+ if (IsA(defel->arg, TypeName))
+ {
+ TypeName *tn = (TypeName *) defel->arg;
+ char *parallel;
+
+ Assert(tn->names != NULL && IsA(tn->names, List));
+
+ if (list_length(tn->names) == 1)
+ {
+ Assert(IsA(linitial(tn->names), String));
+ parallel = strVal(linitial(tn->names));
+ ok = true;
+
+ if (pg_strcasecmp(parallel, "safe") == 0)
+ proparallel = PROPARALLEL_SAFE;
+ else if (pg_strcasecmp(parallel, "restricted") == 0)
+ proparallel = PROPARALLEL_RESTRICTED;
+ else if (pg_strcasecmp(parallel, "unsafe") == 0)
+ proparallel = PROPARALLEL_UNSAFE;
+ else
+ ok = false;
+ }
+ }
+
+ if (!ok)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("parameter \"parallel\" must be SAFE, RESTRICTED, or UNSAFE")));
+ }
else
ereport(WARNING,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -480,5 +514,6 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
mtransTypeId, /* transition data type */
mtransSpace, /* transition space */
initval, /* initial condition */
- minitval); /* initial condition */
+ minitval, /* moving-aggregate initial condition */
+ proparallel); /* parallel safe? */
}
diff --git a/src/backend/commands/functioncmds.c b/src/backend/commands/functioncmds.c
index a745d73..748c8f7 100644
--- a/src/backend/commands/functioncmds.c
+++ b/src/backend/commands/functioncmds.c
@@ -566,9 +566,8 @@ interpret_func_parallel(DefElem *defel)
else
{
ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("parallel option \"%s\" not recognized",
- str)));
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("parameter \"parallel\" must be SAFE, RESTRICTED, or UNSAFE")));
return PROPARALLEL_UNSAFE; /* keep compiler quiet */
}
}
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index c615717..5674a73 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -1419,6 +1419,13 @@ has_parallel_hazard_walker(Node *node, has_parallel_hazard_arg *context)
if (parallel_too_dangerous(func_parallel(expr->funcid), context))
return true;
}
+ else if (IsA(node, Aggref))
+ {
+ Aggref *aggref = (Aggref *) node;
+
+ if (parallel_too_dangerous(func_parallel(aggref->aggfnoid), context))
+ return true;
+ }
else if (IsA(node, OpExpr))
{
OpExpr *expr = (OpExpr *) node;
diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h
index 4205fab..4e81260 100644
--- a/src/include/catalog/pg_aggregate.h
+++ b/src/include/catalog/pg_aggregate.h
@@ -349,6 +349,7 @@ extern ObjectAddress AggregateCreate(const char *aggName,
Oid aggmTransType,
int32 aggmTransSpace,
const char *agginitval,
- const char *aggminitval);
+ const char *aggminitval,
+ char proparallel);
#endif /* PG_AGGREGATE_H */
diff --git a/src/test/regress/expected/create_aggregate.out b/src/test/regress/expected/create_aggregate.out
index 66e073d..5bbe1a5 100644
--- a/src/test/regress/expected/create_aggregate.out
+++ b/src/test/regress/expected/create_aggregate.out
@@ -20,9 +20,9 @@ CREATE AGGREGATE newsum (
-- zero-argument aggregate
CREATE AGGREGATE newcnt (*) (
sfunc = int8inc, stype = int8,
- initcond = '0'
+ initcond = '0', parallel = safe
);
--- old-style spelling of same
+-- old-style spelling of same (except without parallel-safe; that's too new)
CREATE AGGREGATE oldcnt (
sfunc = int8inc, basetype = 'ANY', stype = int8,
initcond = '0'
@@ -119,6 +119,14 @@ WHERE aggfnoid = 'mysum'::REGPROC;
(1 row)
DROP AGGREGATE mysum (int);
+-- invalid: bad parallel-safety marking
+CREATE AGGREGATE mysum (int)
+(
+ stype = int,
+ sfunc = int4pl,
+ parallel = pear
+);
+ERROR: parameter "parallel" must be SAFE, RESTRICTED, or UNSAFE
-- invalid: nonstrict inverse with strict forward function
CREATE FUNCTION float8mi_n(float8, float8) RETURNS float8 AS
$$ SELECT $1 - $2; $$
diff --git a/src/test/regress/sql/create_aggregate.sql b/src/test/regress/sql/create_aggregate.sql
index dfcbc5a..3b9d26b 100644
--- a/src/test/regress/sql/create_aggregate.sql
+++ b/src/test/regress/sql/create_aggregate.sql
@@ -23,10 +23,10 @@ CREATE AGGREGATE newsum (
-- zero-argument aggregate
CREATE AGGREGATE newcnt (*) (
sfunc = int8inc, stype = int8,
- initcond = '0'
+ initcond = '0', parallel = safe
);
--- old-style spelling of same
+-- old-style spelling of same (except without parallel-safe; that's too new)
CREATE AGGREGATE oldcnt (
sfunc = int8inc, basetype = 'ANY', stype = int8,
initcond = '0'
@@ -132,6 +132,14 @@ WHERE aggfnoid = 'mysum'::REGPROC;
DROP AGGREGATE mysum (int);
+-- invalid: bad parallel-safety marking
+CREATE AGGREGATE mysum (int)
+(
+ stype = int,
+ sfunc = int4pl,
+ parallel = pear
+);
+
-- invalid: nonstrict inverse with strict forward function
CREATE FUNCTION float8mi_n(float8, float8) RETURNS float8 AS