[PoC] Implementation of distinct in Window Aggregates: take two
Hi,
This reopens the earlier thread:
/messages/by-id/2ef6b491-1946-b606-f064-d9ea79d91463@gmail.com
This is a proof-of-concept (PoC) patch that implements DISTINCT in window
aggregates. It currently works only without ORDER BY and only for
single-column aggregation; the final version may differ with respect to
these limitations. The purpose of this PoC is to get feedback on the
approach and its implementation, including any nitpicks you consider
reasonable.
The DISTINCT handling mirrors the implementation in nodeAgg. The existing
partitioning logic determines whether a row belongs to the partition; when
DISTINCT is required, every value of the aggregated column is stored in a
tuplesort. When finalize_windowaggregate is called, the values are sorted,
duplicates are removed, and the transition function is called on each
remaining value.
When DISTINCT is not required, this process is skipped: the transition
function is called directly and nothing is inserted into the tuplesort.
Note: tuplesort_begin and tuplesort_end are called for each partition to
reset the tuplesort, so at any time the tuplesort holds at most as many
tuples as a single partition contains.
I have verified it for integer and interval column aggregates (to rule out
obvious issues related to data types).
Sample cases:
create table mytable(id int, name text);
insert into mytable values(1, 'A');
insert into mytable values(1, 'A');
insert into mytable values(5, 'B');
insert into mytable values(3, 'A');
insert into mytable values(1, 'A');
select avg(distinct id) over (partition by name) from mytable;
avg
--------------------
2.0000000000000000
2.0000000000000000
2.0000000000000000
2.0000000000000000
5.0000000000000000
select avg(id) over (partition by name) from mytable;
avg
--------------------
1.5000000000000000
1.5000000000000000
1.5000000000000000
1.5000000000000000
5.0000000000000000
select avg(distinct id) over () from mytable;
avg
--------------------
3.0000000000000000
3.0000000000000000
3.0000000000000000
3.0000000000000000
3.0000000000000000
select avg(distinct id) from mytable;
avg
--------------------
3.0000000000000000
This is my first contribution. Please let me know if anything can be
improved; I'm eager to learn.
Regards,
Ankit Kumar Pandey
Attachments:
v1-0001-Implement-distinct-in-Window-Aggregates-take-two.patch
From d7b5face40794f93c6134aedb57c85cf0f077780 Mon Sep 17 00:00:00 2001
From: Ankit Kumar Pandey <itsankitkp@gmail.com>
Date: Wed, 23 Nov 2022 00:38:01 +0530
Subject: [PATCH] 3WIP commit
allow distinct in windows function
---
src/backend/executor/nodeWindowAgg.c | 211 +++++++++++++++++++++++----
src/backend/optimizer/util/clauses.c | 2 +
src/backend/parser/parse_agg.c | 45 ++++++
src/backend/parser/parse_func.c | 20 +--
src/include/nodes/execnodes.h | 1 +
src/include/nodes/primnodes.h | 2 +
src/test/regress/expected/window.out | 16 ++
src/test/regress/sql/window.sql | 3 +
8 files changed, 260 insertions(+), 40 deletions(-)
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 310ac23e3a..b396f19e05 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -155,6 +155,13 @@ typedef struct WindowStatePerAggData
int64 transValueCount; /* number of currently-aggregated rows */
+ Datum lastdatum; /* used for single-column DISTINCT */
+ FmgrInfo equalfnOne; /* single-column comparisons */
+ Oid *eq_ops;
+ Oid *sort_ops;
+
+ bool sort_in;
+
/* Data local to eval_windowaggregates() */
bool restart; /* need to restart this agg in this cycle? */
} WindowStatePerAggData;
@@ -164,7 +171,7 @@ static void initialize_windowaggregate(WindowAggState *winstate,
WindowStatePerAgg peraggstate);
static void advance_windowaggregate(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
- WindowStatePerAgg peraggstate);
+ WindowStatePerAgg peraggstate, Datum value, bool isNull);
static bool advance_windowaggregate_base(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
WindowStatePerAgg peraggstate);
@@ -174,6 +181,9 @@ static void finalize_windowaggregate(WindowAggState *winstate,
Datum *result, bool *isnull);
static void eval_windowaggregates(WindowAggState *winstate);
+static void process_ordered_aggregate_single(WindowAggState *winstate,
+ WindowStatePerFunc perfuncstate,
+ WindowStatePerAgg peraggstate);
static void eval_windowfunction(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
Datum *result, bool *isnull);
@@ -231,6 +241,7 @@ initialize_windowaggregate(WindowAggState *winstate,
peraggstate->transValueIsNull = peraggstate->initValueIsNull;
peraggstate->transValueCount = 0;
peraggstate->resultValue = (Datum) 0;
+ peraggstate->lastdatum = (Datum) 0;
peraggstate->resultValueIsNull = true;
}
@@ -241,43 +252,21 @@ initialize_windowaggregate(WindowAggState *winstate,
static void
advance_windowaggregate(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
- WindowStatePerAgg peraggstate)
+ WindowStatePerAgg peraggstate, Datum value, bool isNull)
{
LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
- WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
int numArguments = perfuncstate->numArguments;
Datum newVal;
- ListCell *arg;
int i;
MemoryContext oldContext;
ExprContext *econtext = winstate->tmpcontext;
- ExprState *filter = wfuncstate->aggfilter;
oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
- /* Skip anything FILTERed out */
- if (filter)
- {
- bool isnull;
- Datum res = ExecEvalExpr(filter, econtext, &isnull);
-
- if (isnull || !DatumGetBool(res))
- {
- MemoryContextSwitchTo(oldContext);
- return;
- }
- }
-
/* We start from 1, since the 0th arg will be the transition value */
- i = 1;
- foreach(arg, wfuncstate->args)
- {
- ExprState *argstate = (ExprState *) lfirst(arg);
- fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
- &fcinfo->args[i].isnull);
- i++;
- }
+ fcinfo->args[1].value = value;
+ fcinfo->args[1].isnull = isNull;
if (peraggstate->transfn.fn_strict)
{
@@ -588,6 +577,10 @@ finalize_windowaggregate(WindowAggState *winstate,
oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
+ /* Run transition function for distinct agg */
+ if (perfuncstate->wfunc->aggdistinct)
+ process_ordered_aggregate_single(winstate, perfuncstate, peraggstate);
+
/*
* Apply the agg's finalfn if one is provided, else return transValue.
*/
@@ -674,6 +667,16 @@ eval_windowaggregates(WindowAggState *winstate)
TupleTableSlot *agg_row_slot;
TupleTableSlot *temp_slot;
+ ExprState *filter;
+ bool isnull;
+ WindowFuncExprState *wfuncstate;
+ ListCell *arg;
+ Datum tuple;
+ ExprContext *aggecontext;
+ ListCell *lc;
+ Oid inputTypes[FUNC_MAX_ARGS];
+ WindowStatePerFunc perfuncstate;
+
numaggs = winstate->numaggs;
if (numaggs == 0)
return; /* nothing to do */
@@ -901,6 +904,22 @@ eval_windowaggregates(WindowAggState *winstate)
}
}
+ perfuncstate = &winstate->perfunc[wfuncno];
+ if (perfuncstate->wfunc->aggdistinct)
+ {
+ i = 0;
+ foreach(lc, perfuncstate->wfunc->args)
+ {
+ inputTypes[i++] = exprType((Node *) lfirst(lc));
+ }
+ winstate->sortstates =
+ tuplesort_begin_datum(inputTypes[0],
+ peraggstate->sort_ops[0],
+ perfuncstate->wfunc->inputcollid,
+ true,
+ work_mem, NULL, TUPLESORT_NONE);
+ }
+
/*
* Non-restarted aggregates now contain the rows between aggregatedbase
* (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
@@ -935,7 +954,8 @@ eval_windowaggregates(WindowAggState *winstate)
{
if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
agg_row_slot))
- break; /* must be end of partition */
+ break; /* must be end of partition */
+
}
/*
@@ -943,14 +963,16 @@ eval_windowaggregates(WindowAggState *winstate)
* current row is not in frame but there might be more in the frame.
*/
ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot);
+
if (ret < 0)
break;
+
if (ret == 0)
goto next_tuple;
/* Set tuple context for evaluation of aggregate arguments */
winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
-
+
/* Accumulate row into the aggregates */
for (i = 0; i < numaggs; i++)
{
@@ -962,9 +984,46 @@ eval_windowaggregates(WindowAggState *winstate)
continue;
wfuncno = peraggstate->wfuncno;
- advance_windowaggregate(winstate,
- &winstate->perfunc[wfuncno],
- peraggstate);
+ perfuncstate = &winstate->perfunc[wfuncno];
+
+ aggecontext = winstate->tmpcontext;
+
+ wfuncstate = perfuncstate->wfuncstate;
+ filter = wfuncstate->aggfilter;
+
+ oldContext = MemoryContextSwitchTo(aggecontext->ecxt_per_tuple_memory);
+ /* Skip anything FILTERed out for aggregates */
+ if (perfuncstate->plain_agg && wfuncstate->aggfilter)
+ {
+ Datum res = ExecEvalExpr(filter, aggecontext, &isnull);
+
+ if (isnull || !DatumGetBool(res))
+ {
+ MemoryContextSwitchTo(oldContext);
+ continue;
+ }
+ }
+
+
+ foreach(arg, wfuncstate->args)
+ {
+
+ ExprState *argstate = (ExprState *) lfirst(arg);
+ tuple = ExecEvalExpr(argstate, aggecontext, &isnull);
+
+ if (perfuncstate->wfunc->aggdistinct)
+ {
+ tuplesort_putdatum(winstate->sortstates, tuple, isnull);
+ peraggstate->sort_in = true;
+ }
+ else
+ {
+ advance_windowaggregate(winstate, &winstate->perfunc[wfuncno],
+ peraggstate, tuple, isnull);
+ }
+
+ }
+ MemoryContextSwitchTo(oldContext);
}
next_tuple:
@@ -1020,6 +1079,67 @@ next_tuple:
}
}
+
+static void
+process_ordered_aggregate_single(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
+ WindowStatePerAgg peraggstate)
+{
+ Datum newVal;
+ bool isNull;
+ MemoryContext workcontext = winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory;
+ MemoryContext oldContext;
+ Datum oldVal = (Datum) 0;
+ bool oldIsNull = true;
+ bool haveOldVal = false;
+
+ if (peraggstate->sort_in){
+ tuplesort_performsort(winstate->sortstates);
+
+
+ while (tuplesort_getdatum(winstate->sortstates,
+ true, false, &newVal, &isNull, NULL))
+ {
+ MemoryContextReset(workcontext);
+ oldContext = MemoryContextSwitchTo(workcontext);
+
+ if (haveOldVal && DatumGetBool(FunctionCall2Coll(&peraggstate->equalfnOne,
+ perfuncstate->winCollation,
+ oldVal, newVal)))
+ {
+ MemoryContextSwitchTo(oldContext);
+ continue;
+ }
+ else
+ {
+
+ advance_windowaggregate(winstate, perfuncstate,
+ peraggstate, newVal, isNull);
+ }
+ MemoryContextSwitchTo(oldContext);
+
+ if (!peraggstate->resulttypeByVal)
+ {
+ if (!oldIsNull && false)
+ pfree(DatumGetPointer(oldVal));
+ if (!isNull)
+ oldVal = datumCopy(newVal, true,
+ peraggstate->resulttypeLen);
+ }
+ else
+ oldVal = newVal;
+
+ oldIsNull = isNull;
+ haveOldVal = true;
+ oldVal = newVal;
+ }
+
+ }
+ tuplesort_end(winstate->sortstates);
+ peraggstate->sort_in = false;
+
+}
+
+
/*
* eval_windowfunction
*
@@ -2963,6 +3083,9 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
get_typlenbyval(aggtranstype,
&peraggstate->transtypeLen,
&peraggstate->transtypeByVal);
+ get_typlenbyval(wfunc->wintype,
+ &peraggstate->inputtypeLen,
+ &peraggstate->inputtypeByVal);
/*
* initval is potentially null, so don't try to access it as a struct
@@ -3030,6 +3153,32 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
else
peraggstate->aggcontext = winstate->aggcontext;
+ /* Handle distinct operation in agg */
+ if (wfunc->aggdistinct)
+ {
+ int numDistinctCols = list_length(wfunc->distinctargs);
+ peraggstate->eq_ops = palloc(numDistinctCols * sizeof(Oid));
+ peraggstate->sort_ops = palloc(numDistinctCols * sizeof(Oid));
+ winstate->sortstates = (Tuplesortstate *)
+ palloc0(sizeof(Tuplesortstate *) * 1);
+
+ /* Initialize tuplesort to handle distinct operation */
+
+ i=0;
+ foreach(lc, wfunc->distinctargs)
+ {
+ peraggstate->eq_ops[i] = ((SortGroupClause *) lfirst(lc))->eqop;
+ peraggstate->sort_ops[i] = ((SortGroupClause *) lfirst(lc))->sortop;
+ i++;
+ }
+ fmgr_info(get_opcode(peraggstate->eq_ops[0]), &peraggstate->equalfnOne);
+ winstate->sortstates = tuplesort_begin_datum(inputTypes[0],
+ peraggstate->sort_ops[0],
+ wfunc->inputcollid,
+ true,
+ work_mem, NULL, TUPLESORT_NONE);
+ }
+
ReleaseSysCache(aggTuple);
return peraggstate;
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index da258968b8..e6302966a6 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -2482,6 +2482,8 @@ eval_const_expressions_mutator(Node *node,
newexpr->winref = expr->winref;
newexpr->winstar = expr->winstar;
newexpr->winagg = expr->winagg;
+ newexpr->aggdistinct = expr->aggdistinct;
+ newexpr->distinctargs = expr->distinctargs;
newexpr->location = expr->location;
return (Node *) newexpr;
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index 85cd47b7ae..7516fc054f 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -1047,6 +1047,51 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc,
}
}
+ if (wfunc->aggdistinct){
+ List *argtypes = NIL;
+ List *tlist = NIL;
+ List *torder = NIL;
+ List *tdistinct = NIL;
+ AttrNumber attno = 1;
+ ListCell *lc;
+
+ foreach(lc, wfunc->args)
+ {
+ Expr *arg = (Expr *) lfirst(lc);
+ TargetEntry *tle;
+
+ /* We don't bother to assign column names to the entries */
+ tle = makeTargetEntry(arg, attno++, NULL, false);
+ tlist = lappend(tlist, tle);
+ }
+ torder = transformSortClause(pstate,
+ NIL,
+ &tlist,
+ EXPR_KIND_ORDER_BY,
+ true /* force SQL99 rules */ );
+
+ tdistinct = transformDistinctClause(pstate, &tlist, torder, true);
+
+ foreach(lc, tdistinct)
+ {
+ SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
+
+ if (!OidIsValid(sortcl->sortop))
+ {
+ Node *expr = get_sortgroupclause_expr(sortcl, tlist);
+
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_FUNCTION),
+ errmsg("could not identify an ordering operator for type %s",
+ format_type_be(exprType(expr))),
+ errdetail("Aggregates with DISTINCT must be able to sort their inputs."),
+ parser_errposition(pstate, exprLocation(expr))));
+ }
+ }
+ wfunc->distinctargs = tdistinct;
+ }
+
+
pstate->p_hasWindowFuncs = true;
}
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index b3f0b6a137..d9536dcbb2 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -835,15 +835,7 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
wfunc->winagg = (fdresult == FUNCDETAIL_AGGREGATE);
wfunc->aggfilter = agg_filter;
wfunc->location = location;
-
- /*
- * agg_star is allowed for aggregate functions but distinct isn't
- */
- if (agg_distinct)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("DISTINCT is not implemented for window functions"),
- parser_errposition(pstate, location)));
+ wfunc->aggdistinct = agg_distinct;
/*
* Reject attempt to call a parameterless aggregate without (*)
@@ -856,6 +848,16 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
NameListToString(funcname)),
parser_errposition(pstate, location)));
+ /*
+ * Distinct is not implemented for aggregates with filter
+ */
+
+ if (agg_distinct && agg_filter)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("DISTINCT is not implemented for aggregate functions with FILTER"),
+ parser_errposition(pstate, location)));
+
/*
* ordered aggs not allowed in windows yet
*/
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index cb714f4a19..e067e426fb 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2545,6 +2545,7 @@ typedef struct WindowAggState
* date for current row */
bool grouptail_valid; /* true if grouptailpos is known up to
* date for current row */
+ Tuplesortstate *sortstates;
TupleTableSlot *first_part_slot; /* first tuple of current or next
* partition */
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index 60d72a876b..89f4e67ff3 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -558,6 +558,8 @@ typedef struct WindowFunc
/* true if argument list was really '*' */
bool winstar pg_node_attr(query_jumble_ignore);
+ bool aggdistinct; /* do we need distinct values for aggregation? */
+ List *distinctargs;
/* is function a simple aggregate? */
bool winagg pg_node_attr(query_jumble_ignore);
/* token location, or -1 if unknown */
int location;
diff --git a/src/test/regress/expected/window.out b/src/test/regress/expected/window.out
index 69a38df10b..cc2f593043 100644
--- a/src/test/regress/expected/window.out
+++ b/src/test/regress/expected/window.out
@@ -1775,6 +1775,22 @@ select first_value(salary) over(order by enroll_date range between unbounded pre
5000 | 4500 | 4200 | 01-01-2008
(10 rows)
+-- with DISTINCT in agg function
+select depname, count(DISTINCT enroll_date) over (PARTITION BY depname) FROM empsalary;
+ depname | count
+-----------+-------
+ develop | 4
+ develop | 4
+ develop | 4
+ develop | 4
+ develop | 4
+ personnel | 2
+ personnel | 2
+ sales | 3
+ sales | 3
+ sales | 3
+(10 rows)
+
-- RANGE offset PRECEDING/FOLLOWING with null values
select x, y,
first_value(y) over w,
diff --git a/src/test/regress/sql/window.sql b/src/test/regress/sql/window.sql
index 9113a92ae0..7e0e03d494 100644
--- a/src/test/regress/sql/window.sql
+++ b/src/test/regress/sql/window.sql
@@ -434,6 +434,9 @@ select first_value(salary) over(order by enroll_date range between unbounded pre
exclude current row),
salary, enroll_date from empsalary;
+-- with DISTINCT in agg function
+select depname, count(DISTINCT enroll_date) over (PARTITION BY depname) FROM empsalary;
+
-- RANGE offset PRECEDING/FOLLOWING with null values
select x, y,
first_value(y) over w,
--
2.37.2
Hi,
I checked cfbot, and some of the test cases are failing for this patch.
It looks like some tests are crashing:
https://api.cirrus-ci.com/v1/artifact/task/6291153444667392/crashlog/crashlog-postgres.exe_03b0_2023-11-07_10-41-39-624.txt
[10:46:56.546] Summary of Failures:
[10:46:56.546]
[10:46:56.546] 87/270 postgresql:postgres_fdw / postgres_fdw/regress ERROR 11.10s exit status 1
[10:46:56.546] 82/270 postgresql:regress / regress/regress ERROR 248.55s exit status 1
[10:46:56.546] 99/270 postgresql:recovery / recovery/027_stream_regress ERROR 161.40s exit status 29
[10:46:56.546] 98/270 postgresql:pg_upgrade / pg_upgrade/002_pg_upgrade ERROR 253.31s exit status 29
Links to the failing tests:
https://cirrus-ci.com/task/6642997165555712
https://cirrus-ci.com/task/4602303584403456
https://cirrus-ci.com/task/5728203491246080
https://cirrus-ci.com/task/5165253537824768?logs=test_world#L511
https://cirrus-ci.com/task/6291153444667392
Thanks
Shlok Kumar Kyal
On Wed, 8 Nov 2023 at 11:46, Shlok Kyal <shlok.kyal.oss@gmail.com> wrote:
> I went through the Cfbot, and some of the test cases are failing for
> this patch. [...]
The patch which you submitted has been awaiting your attention for
quite some time now. As such, we have moved it to "Returned with
Feedback" and removed it from the reviewing queue. Depending on
timing, this may be reversible. Kindly address the feedback you have
received, and resubmit the patch to the next CommitFest.
Regards,
Vignesh