[PoC] Implementation of distinct in Window Aggregates
Hi,
This is a PoC patch that implements DISTINCT in window aggregates. It
currently supports neither ORDER BY nor multi-column aggregation; the
final version may differ with respect to these limitations. The purpose of
this PoC is to get feedback on the approach and the corresponding
implementation; any nitpicking deemed reasonable is welcome.
The DISTINCT handling mirrors the implementation in nodeAgg. The existing
partitioning logic determines whether a row is in the partition. When
DISTINCT is required, all values of the aggregate column are stored in a
tuplesort. When finalize_windowaggregate is called, the values are sorted
and duplicates are removed, and the transition function is then called on
each remaining value.
When DISTINCT is not required, this process is skipped: the transition
function is called directly and nothing is inserted into the tuplesort.
Note: tuplesort_begin and tuplesort_end are invoked for each partition to
reset the tuplesort, so at any time the tuplesort holds at most the tuples
of a single partition.
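The sort-then-deduplicate flow described above can be sketched outside PostgreSQL as a small standalone C fragment. This is only an illustration under stated assumptions, not the patch's code: `AvgState`, `avg_transfn`, `cmp_int` and `avg_over_partition` are hypothetical names, a plain `int` array stands in for the tuplesort, and `qsort` plays the role of the sort operator.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for an aggregate's transition state (here: avg). */
typedef struct AvgState
{
	long		sum;
	long		count;
} AvgState;

/* Stand-in for the transition function: fold one value into the state. */
static void
avg_transfn(AvgState *state, int value)
{
	state->sum += value;
	state->count++;
}

/* qsort comparator for ints, playing the role of the sort operator. */
static int
cmp_int(const void *a, const void *b)
{
	int			x = *(const int *) a;
	int			y = *(const int *) b;

	return (x > y) - (x < y);
}

/*
 * Aggregate the values of one partition.
 *
 * With distinct = true the values are buffered and sorted first, and every
 * value equal to its predecessor is skipped, so the transition function
 * sees each distinct value exactly once.  With distinct = false the
 * transition function is called directly on every value.
 *
 * Returns false for an empty partition or on allocation failure.
 */
bool
avg_over_partition(const int *values, size_t n, bool distinct, double *result)
{
	AvgState	state = {0, 0};

	if (n == 0)
		return false;

	if (!distinct)
	{
		for (size_t i = 0; i < n; i++)
			avg_transfn(&state, values[i]);
	}
	else
	{
		int		   *sorted = malloc(n * sizeof(int));

		if (sorted == NULL)
			return false;
		memcpy(sorted, values, n * sizeof(int));
		qsort(sorted, n, sizeof(int), cmp_int);

		for (size_t i = 0; i < n; i++)
		{
			/* After sorting, duplicates are adjacent: skip repeats. */
			if (i > 0 && sorted[i] == sorted[i - 1])
				continue;
			avg_transfn(&state, sorted[i]);
		}
		free(sorted);		/* the buffer lives for one partition only */
	}

	*result = (double) state.sum / (double) state.count;
	return true;
}
```

Calling `avg_over_partition` with the ids of partition 'A' from the sample table, {1, 1, 3, 1}, and `distinct = true` reduces the input to {1, 3} before the transition function runs; with `distinct = false` all four values are accumulated. The buffer is allocated and freed per call, which corresponds to beginning and ending the tuplesort once per partition.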
I have verified it for integer and interval column aggregates (to rule out
obvious issues related to data types).
Sample cases:
create table mytable(id int, name text);
insert into mytable values(1, 'A');
insert into mytable values(1, 'A');
insert into mytable values(5, 'B');
insert into mytable values(3, 'A');
insert into mytable values(1, 'A');
select avg(distinct id) over (partition by name) from mytable;
avg
--------------------
2.0000000000000000
2.0000000000000000
2.0000000000000000
2.0000000000000000
5.0000000000000000
select avg(id) over (partition by name) from mytable;
avg
--------------------
1.5000000000000000
1.5000000000000000
1.5000000000000000
1.5000000000000000
5.0000000000000000
select avg(distinct id) over () from mytable;
avg
--------------------
3.0000000000000000
3.0000000000000000
3.0000000000000000
3.0000000000000000
3.0000000000000000
select avg(distinct id) from mytable;
avg
--------------------
3.0000000000000000
This is my first contribution. Please let me know if anything can be
improved, as I'm eager to learn.
Regards,
Ankit Kumar Pandey
Attachments:
v1-0001-Implement-distinct-in-Window-Aggregates.patch (text/x-patch; charset=US-ASCII)
From 2a3061a95988c39f4654accc06205099713af6cc Mon Sep 17 00:00:00 2001
From: Ankit Kumar Pandey <itsankitkp@gmail.com>
Date: Wed, 23 Nov 2022 00:38:01 +0530
Subject: [PATCH] Implement distinct in Window Aggregates.
---
src/backend/executor/nodeWindowAgg.c | 229 +++++++++++++++++++++++----
src/backend/optimizer/util/clauses.c | 2 +
src/backend/parser/parse_agg.c | 45 ++++++
src/backend/parser/parse_func.c | 19 +--
src/include/nodes/execnodes.h | 1 +
src/include/nodes/primnodes.h | 2 +
6 files changed, 258 insertions(+), 40 deletions(-)
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 4f4aeb2883..1d67ba2c39 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -154,6 +154,14 @@ typedef struct WindowStatePerAggData
int64 transValueCount; /* number of currently-aggregated rows */
+ /* For DISTINCT in Aggregates */
+ Datum lastdatum; /* used for single-column DISTINCT */
+ FmgrInfo equalfnOne; /* single-column comparisons*/
+
+ Oid *eq_ops; /* used for equality check in DISTINCT */
+ Oid *sort_ops; /* used for sorting distinct columns */
+ bool sort_in; /* FLAG set true if data is stored in tuplesort */
+
/* Data local to eval_windowaggregates() */
bool restart; /* need to restart this agg in this cycle? */
} WindowStatePerAggData;
@@ -163,7 +171,7 @@ static void initialize_windowaggregate(WindowAggState *winstate,
WindowStatePerAgg peraggstate);
static void advance_windowaggregate(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
- WindowStatePerAgg peraggstate);
+ WindowStatePerAgg peraggstate, Datum value, bool isNull);
static bool advance_windowaggregate_base(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
WindowStatePerAgg peraggstate);
@@ -173,6 +181,9 @@ static void finalize_windowaggregate(WindowAggState *winstate,
Datum *result, bool *isnull);
static void eval_windowaggregates(WindowAggState *winstate);
+static void process_ordered_windowaggregate_single(WindowAggState *winstate,
+ WindowStatePerFunc perfuncstate,
+ WindowStatePerAgg peraggstate);
static void eval_windowfunction(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
Datum *result, bool *isnull);
@@ -230,6 +241,7 @@ initialize_windowaggregate(WindowAggState *winstate,
peraggstate->transValueIsNull = peraggstate->initValueIsNull;
peraggstate->transValueCount = 0;
peraggstate->resultValue = (Datum) 0;
+ peraggstate->lastdatum = (Datum) 0;
peraggstate->resultValueIsNull = true;
}
@@ -240,43 +252,21 @@ initialize_windowaggregate(WindowAggState *winstate,
static void
advance_windowaggregate(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
- WindowStatePerAgg peraggstate)
+ WindowStatePerAgg peraggstate, Datum value, bool isNull)
{
LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
- WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
int numArguments = perfuncstate->numArguments;
Datum newVal;
- ListCell *arg;
int i;
MemoryContext oldContext;
ExprContext *econtext = winstate->tmpcontext;
- ExprState *filter = wfuncstate->aggfilter;
oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
- /* Skip anything FILTERed out */
- if (filter)
- {
- bool isnull;
- Datum res = ExecEvalExpr(filter, econtext, &isnull);
-
- if (isnull || !DatumGetBool(res))
- {
- MemoryContextSwitchTo(oldContext);
- return;
- }
- }
-
/* We start from 1, since the 0th arg will be the transition value */
- i = 1;
- foreach(arg, wfuncstate->args)
- {
- ExprState *argstate = (ExprState *) lfirst(arg);
- fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
- &fcinfo->args[i].isnull);
- i++;
- }
+ fcinfo->args[1].value = value;
+ fcinfo->args[1].isnull = isNull;
if (peraggstate->transfn.fn_strict)
{
@@ -585,6 +575,10 @@ finalize_windowaggregate(WindowAggState *winstate,
oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
+ /* Run transition function for distinct agg */
+ if (perfuncstate->wfunc->aggdistinct)
+ process_ordered_windowaggregate_single(winstate, perfuncstate, peraggstate);
+
/*
* Apply the agg's finalfn if one is provided, else return transValue.
*/
@@ -666,6 +660,16 @@ eval_windowaggregates(WindowAggState *winstate)
TupleTableSlot *agg_row_slot;
TupleTableSlot *temp_slot;
+ ExprState *filter;
+ bool isnull;
+ WindowFuncExprState *wfuncstate;
+ ListCell *arg;
+ Datum tuple;
+ ExprContext *aggecontext;
+ ListCell *lc;
+ Oid inputTypes[FUNC_MAX_ARGS];
+ WindowStatePerFunc perfuncstate;
+
numaggs = winstate->numaggs;
if (numaggs == 0)
return; /* nothing to do */
@@ -893,6 +897,23 @@ eval_windowaggregates(WindowAggState *winstate)
}
}
+ perfuncstate = &winstate->perfunc[wfuncno];
+ /* Initialize tuplesort for new partition */
+ if (perfuncstate->wfunc->aggdistinct)
+ {
+ i = 0;
+ foreach(lc, perfuncstate->wfunc->args)
+ {
+ inputTypes[i++] = exprType((Node *) lfirst(lc));
+ }
+ winstate->sortstates =
+ tuplesort_begin_datum(inputTypes[0],
+ peraggstate->sort_ops[0],
+ perfuncstate->wfunc->inputcollid,
+ true,
+ work_mem, NULL, TUPLESORT_NONE);
+ }
+
/*
* Non-restarted aggregates now contain the rows between aggregatedbase
* (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
@@ -927,7 +948,8 @@ eval_windowaggregates(WindowAggState *winstate)
{
if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
agg_row_slot))
- break; /* must be end of partition */
+ break; /* must be end of partition */
+
}
/*
@@ -935,14 +957,16 @@ eval_windowaggregates(WindowAggState *winstate)
* current row is not in frame but there might be more in the frame.
*/
ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot);
+
if (ret < 0)
break;
+
if (ret == 0)
goto next_tuple;
/* Set tuple context for evaluation of aggregate arguments */
winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
-
+
/* Accumulate row into the aggregates */
for (i = 0; i < numaggs; i++)
{
@@ -954,9 +978,51 @@ eval_windowaggregates(WindowAggState *winstate)
continue;
wfuncno = peraggstate->wfuncno;
- advance_windowaggregate(winstate,
- &winstate->perfunc[wfuncno],
- peraggstate);
+ perfuncstate = &winstate->perfunc[wfuncno];
+
+ aggecontext = winstate->tmpcontext;
+
+ wfuncstate = perfuncstate->wfuncstate;
+ filter = wfuncstate->aggfilter;
+
+ oldContext = MemoryContextSwitchTo(aggecontext->ecxt_per_tuple_memory);
+
+ /* Skip anything FILTERed out for aggregates */
+ if (perfuncstate->plain_agg && wfuncstate->aggfilter)
+ {
+ Datum res = ExecEvalExpr(filter, aggecontext, &isnull);
+
+ if (isnull || !DatumGetBool(res))
+ {
+ MemoryContextSwitchTo(oldContext);
+ continue;
+ }
+ }
+
+ /* Evaluate each argument and either put it in the tuplesort, so
+ * that duplicates are removed and the transition function runs
+ * later, or run the transition function right away.
+ */
+ foreach(arg, wfuncstate->args)
+ {
+
+ ExprState *argstate = (ExprState *) lfirst(arg);
+ tuple = ExecEvalExpr(argstate, aggecontext, &isnull);
+
+ /* Store in tuplesort */
+ if (perfuncstate->wfunc->aggdistinct)
+ {
+ tuplesort_putdatum(winstate->sortstates, tuple, isnull);
+ peraggstate->sort_in = true;
+ }
+ else
+ {
+ advance_windowaggregate(winstate, &winstate->perfunc[wfuncno],
+ peraggstate, tuple, isnull);
+ }
+
+ }
+ MemoryContextSwitchTo(oldContext);
}
next_tuple:
@@ -1012,6 +1078,75 @@ next_tuple:
}
}
+/*
+ * process_ordered_windowaggregate_single
+ * parallel to process_ordered_aggregate_single in nodeAgg.c
+ */
+static void
+process_ordered_windowaggregate_single(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
+ WindowStatePerAgg peraggstate)
+{
+ Datum newVal;
+ bool isNull;
+ MemoryContext workcontext = winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory;
+ MemoryContext oldContext;
+ Datum oldVal = (Datum) 0;
+ bool oldIsNull = true;
+ bool haveOldVal = false;
+
+ if (peraggstate->sort_in){
+ tuplesort_performsort(winstate->sortstates);
+
+ while (tuplesort_getdatum(winstate->sortstates,
+ true, false, &newVal, &isNull, NULL))
+ {
+ MemoryContextReset(workcontext);
+ oldContext = MemoryContextSwitchTo(workcontext);
+
+ /*
+ * Loop over all tuples in current partition
+ * and remove duplicates
+ */
+ if (haveOldVal && DatumGetBool(FunctionCall2Coll(&peraggstate->equalfnOne,
+ perfuncstate->winCollation,
+ oldVal, newVal)))
+ {
+ MemoryContextSwitchTo(oldContext);
+ continue;
+ }
+ else
+ {
+ /* Run transition function over each unique tuple */
+ advance_windowaggregate(winstate, perfuncstate,
+ peraggstate, newVal, isNull);
+ }
+ MemoryContextSwitchTo(oldContext);
+
+ if (!peraggstate->resulttypeByVal)
+ {
+ if (!oldIsNull && false)
+ pfree(DatumGetPointer(oldVal));
+ if (!isNull)
+ oldVal = datumCopy(newVal, true,
+ peraggstate->resulttypeLen);
+ }
+ else
+ oldVal = newVal;
+
+ oldIsNull = isNull;
+ haveOldVal = true;
+ oldVal = newVal;
+ }
+
+ }
+ /* Clean up the tuplesort; the next partition will
+ * use a new one. */
+ tuplesort_end(winstate->sortstates);
+ peraggstate->sort_in = false;
+
+}
+
+
/*
* eval_windowfunction
*
@@ -2947,6 +3082,9 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
get_typlenbyval(aggtranstype,
&peraggstate->transtypeLen,
&peraggstate->transtypeByVal);
+ get_typlenbyval(wfunc->wintype,
+ &peraggstate->inputtypeLen,
+ &peraggstate->inputtypeByVal);
/*
* initval is potentially null, so don't try to access it as a struct
@@ -3014,6 +3152,35 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
else
peraggstate->aggcontext = winstate->aggcontext;
+ /* Handle distinct operation in agg */
+ if (wfunc->aggdistinct)
+ {
+ int numDistinctCols = list_length(wfunc->distinctargs);
+ peraggstate->eq_ops = palloc(numDistinctCols * sizeof(Oid));
+ peraggstate->sort_ops = palloc(numDistinctCols * sizeof(Oid));
+ /* Use single tuplesort for all partitions by rinsing it again and again */
+ winstate->sortstates = (Tuplesortstate *)
+ palloc0(sizeof(Tuplesortstate *) * 1);
+
+ /* Initialize tuplesort operators namely sort operator to sort tuples
+ * before running equality op to remove/skip duplicates
+ */
+
+ i=0;
+ foreach(lc, wfunc->distinctargs)
+ {
+ peraggstate->eq_ops[i] = ((SortGroupClause *) lfirst(lc))->eqop;
+ peraggstate->sort_ops[i] = ((SortGroupClause *) lfirst(lc))->sortop;
+ i++;
+ }
+ fmgr_info(get_opcode(peraggstate->eq_ops[0]), &peraggstate->equalfnOne);
+ winstate->sortstates = tuplesort_begin_datum(inputTypes[0],
+ peraggstate->sort_ops[0],
+ wfunc->inputcollid,
+ true,
+ work_mem, NULL, TUPLESORT_NONE);
+ }
+
ReleaseSysCache(aggTuple);
return peraggstate;
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index bffc8112aa..1e2aa897df 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -2443,6 +2443,8 @@ eval_const_expressions_mutator(Node *node,
newexpr->winref = expr->winref;
newexpr->winstar = expr->winstar;
newexpr->winagg = expr->winagg;
+ newexpr->aggdistinct = expr->aggdistinct;
+ newexpr->distinctargs = expr->distinctargs;
newexpr->location = expr->location;
return (Node *) newexpr;
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index 8eec2088aa..03f21b37bf 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -1047,6 +1047,51 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc,
}
}
+ if (wfunc->aggdistinct){
+ List *argtypes = NIL;
+ List *tlist = NIL;
+ List *torder = NIL;
+ List *tdistinct = NIL;
+ AttrNumber attno = 1;
+ ListCell *lc;
+
+ foreach(lc, wfunc->args)
+ {
+ Expr *arg = (Expr *) lfirst(lc);
+ TargetEntry *tle;
+
+ /* We don't bother to assign column names to the entries */
+ tle = makeTargetEntry(arg, attno++, NULL, false);
+ tlist = lappend(tlist, tle);
+ }
+ torder = transformSortClause(pstate,
+ NIL,
+ &tlist,
+ EXPR_KIND_ORDER_BY,
+ true /* force SQL99 rules */ );
+
+ tdistinct = transformDistinctClause(pstate, &tlist, torder, true);
+
+ foreach(lc, tdistinct)
+ {
+ SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
+
+ if (!OidIsValid(sortcl->sortop))
+ {
+ Node *expr = get_sortgroupclause_expr(sortcl, tlist);
+
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_FUNCTION),
+ errmsg("could not identify an ordering operator for type %s",
+ format_type_be(exprType(expr))),
+ errdetail("Aggregates with DISTINCT must be able to sort their inputs."),
+ parser_errposition(pstate, exprLocation(expr))));
+ }
+ }
+ wfunc->distinctargs = tdistinct;
+ }
+
+
pstate->p_hasWindowFuncs = true;
}
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index 827989f379..f536a1411a 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -835,15 +835,7 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
wfunc->winagg = (fdresult == FUNCDETAIL_AGGREGATE);
wfunc->aggfilter = agg_filter;
wfunc->location = location;
-
- /*
- * agg_star is allowed for aggregate functions but distinct isn't
- */
- if (agg_distinct)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("DISTINCT is not implemented for window functions"),
- parser_errposition(pstate, location)));
+ wfunc->aggdistinct = agg_distinct;
/*
* Reject attempt to call a parameterless aggregate without (*)
@@ -856,6 +848,15 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
NameListToString(funcname)),
parser_errposition(pstate, location)));
+ /*
+ * DISTINCT is not implemented for window aggregates with ORDER BY
+ */
+ if (agg_distinct && over->orderClause)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("DISTINCT is not implemented for aggregate functions with ORDER BY"),
+ parser_errposition(pstate, location)));
+
/*
* ordered aggs not allowed in windows yet
*/
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 9a64a830a2..63188a9565 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2523,6 +2523,7 @@ typedef struct WindowAggState
* date for current row */
bool grouptail_valid; /* true if grouptailpos is known up to
* date for current row */
+ Tuplesortstate *sortstates;
TupleTableSlot *first_part_slot; /* first tuple of current or next
* partition */
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index 74f228d959..d7f84a40fd 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -495,6 +495,8 @@ typedef struct WindowFunc
Index winref; /* index of associated WindowClause */
bool winstar; /* true if argument list was really '*' */
bool winagg; /* is function a simple aggregate? */
+ bool aggdistinct; /* do we need distinct values for aggregation? */
+ List *distinctargs;
int location; /* token location, or -1 if unknown */
} WindowFunc;
--
2.37.2
On 24/12/22 18:22, Ankit Pandey wrote:
> This is a PoC patch which implements distinct operation in window
> aggregates [...]
Hi all,
I know everyone is busy with the holidays (well, Happy Holidays!), but I
would be glad if someone could take a quick look at this PoC and share
their thoughts. This is my first contribution, so I am pretty sure there
will be some very obvious feedback (which will help me move forward with
this change).
--
Regards,
Ankit Kumar Pandey
On 29/12/22 20:58, Ankit Kumar Pandey wrote:
> I know everyone is busy with holidays (well, Happy Holidays!) but I
> will be glad if someone can take a quick look at this PoC and share
> thoughts. [...]
Updated the patch against the latest master. The last patch was a year old.
--
Regards,
Ankit Kumar Pandey
Attachments:
v1-0002-Implement-distinct-in-Window-Aggregates.patch (text/x-patch; charset=UTF-8)
From cf56d545bb837fc8f1a7630ea4417680256eddd4 Mon Sep 17 00:00:00 2001
From: Ankit Kumar Pandey <itsankitkp@gmail.com>
Date: Wed, 23 Nov 2022 00:38:01 +0530
Subject: [PATCH] Implement distinct in Window Aggregates.
---
src/backend/executor/nodeWindowAgg.c | 227 +++++++++++++++++++++++----
src/backend/optimizer/util/clauses.c | 2 +
src/backend/parser/parse_agg.c | 45 ++++++
src/backend/parser/parse_func.c | 19 +--
src/include/nodes/execnodes.h | 1 +
src/include/nodes/primnodes.h | 2 +
6 files changed, 257 insertions(+), 39 deletions(-)
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index d61d57e9a8..2d685f3be4 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -154,6 +154,14 @@ typedef struct WindowStatePerAggData
int64 transValueCount; /* number of currently-aggregated rows */
+ /* For DISTINCT in Aggregates */
+ Datum lastdatum; /* used for single-column DISTINCT */
+ FmgrInfo equalfnOne; /* single-column comparisons*/
+
+ Oid *eq_ops; /* used for equality check in DISTINCT */
+ Oid *sort_ops; /* used for sorting distinct columns */
+ bool sort_in; /* FLAG set true if data is stored in tuplesort */
+
/* Data local to eval_windowaggregates() */
bool restart; /* need to restart this agg in this cycle? */
} WindowStatePerAggData;
@@ -163,7 +171,7 @@ static void initialize_windowaggregate(WindowAggState *winstate,
WindowStatePerAgg peraggstate);
static void advance_windowaggregate(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
- WindowStatePerAgg peraggstate);
+ WindowStatePerAgg peraggstate, Datum value, bool isNull);
static bool advance_windowaggregate_base(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
WindowStatePerAgg peraggstate);
@@ -173,6 +181,9 @@ static void finalize_windowaggregate(WindowAggState *winstate,
Datum *result, bool *isnull);
static void eval_windowaggregates(WindowAggState *winstate);
+static void process_ordered_windowaggregate_single(WindowAggState *winstate,
+ WindowStatePerFunc perfuncstate,
+ WindowStatePerAgg peraggstate);
static void eval_windowfunction(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
Datum *result, bool *isnull);
@@ -230,6 +241,7 @@ initialize_windowaggregate(WindowAggState *winstate,
peraggstate->transValueIsNull = peraggstate->initValueIsNull;
peraggstate->transValueCount = 0;
peraggstate->resultValue = (Datum) 0;
+ peraggstate->lastdatum = (Datum) 0;
peraggstate->resultValueIsNull = true;
}
@@ -240,43 +252,21 @@ initialize_windowaggregate(WindowAggState *winstate,
static void
advance_windowaggregate(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
- WindowStatePerAgg peraggstate)
+ WindowStatePerAgg peraggstate, Datum value, bool isNull)
{
LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
- WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
int numArguments = perfuncstate->numArguments;
Datum newVal;
- ListCell *arg;
int i;
MemoryContext oldContext;
ExprContext *econtext = winstate->tmpcontext;
- ExprState *filter = wfuncstate->aggfilter;
oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
- /* Skip anything FILTERed out */
- if (filter)
- {
- bool isnull;
- Datum res = ExecEvalExpr(filter, econtext, &isnull);
-
- if (isnull || !DatumGetBool(res))
- {
- MemoryContextSwitchTo(oldContext);
- return;
- }
- }
-
/* We start from 1, since the 0th arg will be the transition value */
- i = 1;
- foreach(arg, wfuncstate->args)
- {
- ExprState *argstate = (ExprState *) lfirst(arg);
- fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
- &fcinfo->args[i].isnull);
- i++;
- }
+ fcinfo->args[1].value = value;
+ fcinfo->args[1].isnull = isNull;
if (peraggstate->transfn.fn_strict)
{
@@ -585,6 +575,10 @@ finalize_windowaggregate(WindowAggState *winstate,
oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
+ /* Run transition function for distinct agg */
+ if (perfuncstate->wfunc->aggdistinct)
+ process_ordered_windowaggregate_single(winstate, perfuncstate, peraggstate);
+
/*
* Apply the agg's finalfn if one is provided, else return transValue.
*/
@@ -666,6 +660,16 @@ eval_windowaggregates(WindowAggState *winstate)
TupleTableSlot *agg_row_slot;
TupleTableSlot *temp_slot;
+ ExprState *filter;
+ bool isnull;
+ WindowFuncExprState *wfuncstate;
+ ListCell *arg;
+ Datum tuple;
+ ExprContext *aggecontext;
+ ListCell *lc;
+ Oid inputTypes[FUNC_MAX_ARGS];
+ WindowStatePerFunc perfuncstate;
+
numaggs = winstate->numaggs;
if (numaggs == 0)
return; /* nothing to do */
@@ -893,6 +897,23 @@ eval_windowaggregates(WindowAggState *winstate)
}
}
+ perfuncstate = &winstate->perfunc[wfuncno];
+ /* Initialize tuplesort for new partition */
+ if (perfuncstate->wfunc->aggdistinct)
+ {
+ i = 0;
+ foreach(lc, perfuncstate->wfunc->args)
+ {
+ inputTypes[i++] = exprType((Node *) lfirst(lc));
+ }
+ winstate->sortstates =
+ tuplesort_begin_datum(inputTypes[0],
+ peraggstate->sort_ops[0],
+ perfuncstate->wfunc->inputcollid,
+ true,
+ work_mem, NULL, TUPLESORT_NONE);
+ }
+
/*
* Non-restarted aggregates now contain the rows between aggregatedbase
* (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
@@ -928,6 +949,7 @@ eval_windowaggregates(WindowAggState *winstate)
if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
agg_row_slot))
break; /* must be end of partition */
+
}
/*
@@ -935,14 +957,16 @@ eval_windowaggregates(WindowAggState *winstate)
* current row is not in frame but there might be more in the frame.
*/
ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot);
+
if (ret < 0)
break;
+
if (ret == 0)
goto next_tuple;
/* Set tuple context for evaluation of aggregate arguments */
winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
-
+
/* Accumulate row into the aggregates */
for (i = 0; i < numaggs; i++)
{
@@ -954,9 +978,51 @@ eval_windowaggregates(WindowAggState *winstate)
continue;
wfuncno = peraggstate->wfuncno;
- advance_windowaggregate(winstate,
- &winstate->perfunc[wfuncno],
- peraggstate);
+ perfuncstate = &winstate->perfunc[wfuncno];
+
+ aggecontext = winstate->tmpcontext;
+
+ wfuncstate = perfuncstate->wfuncstate;
+ filter = wfuncstate->aggfilter;
+
+ oldContext = MemoryContextSwitchTo(aggecontext->ecxt_per_tuple_memory);
+
+ /* Skip anything FILTERed out for aggregates */
+ if (perfuncstate->plain_agg && wfuncstate->aggfilter)
+ {
+ Datum res = ExecEvalExpr(filter, aggecontext, &isnull);
+
+ if (isnull || !DatumGetBool(res))
+ {
+ MemoryContextSwitchTo(oldContext);
+ continue;
+ }
+ }
+
+ /* Evaluate each argument and either put it in the tuplesort, so
+ * that duplicates are removed and the transition function runs
+ * later, or run the transition function right away.
+ */
+ foreach(arg, wfuncstate->args)
+ {
+
+ ExprState *argstate = (ExprState *) lfirst(arg);
+ tuple = ExecEvalExpr(argstate, aggecontext, &isnull);
+
+ /* Store in tuplesort */
+ if (perfuncstate->wfunc->aggdistinct)
+ {
+ tuplesort_putdatum(winstate->sortstates, tuple, isnull);
+ peraggstate->sort_in = true;
+ }
+ else
+ {
+ advance_windowaggregate(winstate, &winstate->perfunc[wfuncno],
+ peraggstate, tuple, isnull);
+ }
+
+ }
+ MemoryContextSwitchTo(oldContext);
}
next_tuple:
@@ -1012,6 +1078,75 @@ next_tuple:
}
}
+/*
+ * process_ordered_windowaggregate_single
+ * parallel to process_ordered_aggregate_single in nodeAgg.c
+ */
+static void
+process_ordered_windowaggregate_single(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
+ WindowStatePerAgg peraggstate)
+{
+ Datum newVal;
+ bool isNull;
+ MemoryContext workcontext = winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory;
+ MemoryContext oldContext;
+ Datum oldVal = (Datum) 0;
+ bool oldIsNull = true;
+ bool haveOldVal = false;
+
+ if (peraggstate->sort_in){
+ tuplesort_performsort(winstate->sortstates);
+
+ while (tuplesort_getdatum(winstate->sortstates,
+ true, false, &newVal, &isNull, NULL))
+ {
+ MemoryContextReset(workcontext);
+ oldContext = MemoryContextSwitchTo(workcontext);
+
+ /*
+ * Loop over all tuples in current partition
+ * and remove duplicates
+ */
+ if (haveOldVal && DatumGetBool(FunctionCall2Coll(&peraggstate->equalfnOne,
+ perfuncstate->winCollation,
+ oldVal, newVal)))
+ {
+ MemoryContextSwitchTo(oldContext);
+ continue;
+ }
+ else
+ {
+ /* Run transition function over each unique tuple */
+ advance_windowaggregate(winstate, perfuncstate,
+ peraggstate, newVal, isNull);
+ }
+ MemoryContextSwitchTo(oldContext);
+
+ if (!peraggstate->resulttypeByVal)
+ {
+ if (!oldIsNull && false)
+ pfree(DatumGetPointer(oldVal));
+ if (!isNull)
+ oldVal = datumCopy(newVal, true,
+ peraggstate->resulttypeLen);
+ }
+ else
+ oldVal = newVal;
+
+ oldIsNull = isNull;
+ haveOldVal = true;
+ oldVal = newVal;
+ }
+
+ }
+ /* Clean up the tuplesort; the next partition will
+ * use a new one. */
+ tuplesort_end(winstate->sortstates);
+ peraggstate->sort_in = false;
+
+}
+
+
/*
* eval_windowfunction
*
@@ -2947,6 +3082,9 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
get_typlenbyval(aggtranstype,
&peraggstate->transtypeLen,
&peraggstate->transtypeByVal);
+ get_typlenbyval(wfunc->wintype,
+ &peraggstate->inputtypeLen,
+ &peraggstate->inputtypeByVal);
/*
* initval is potentially null, so don't try to access it as a struct
@@ -3014,6 +3152,35 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
else
peraggstate->aggcontext = winstate->aggcontext;
+ /* Handle distinct operation in agg */
+ if (wfunc->aggdistinct)
+ {
+ int numDistinctCols = list_length(wfunc->distinctargs);
+ peraggstate->eq_ops = palloc(numDistinctCols * sizeof(Oid));
+ peraggstate->sort_ops = palloc(numDistinctCols * sizeof(Oid));
+ /* Use single tuplesort for all partitions by rinsing it again and again */
+ winstate->sortstates = (Tuplesortstate *)
+ palloc0(sizeof(Tuplesortstate *) * 1);
+
+ /* Initialize tuplesort operators namely sort operator to sort tuples
+ * before running equality op to remove/skip duplicates
+ */
+
+ i=0;
+ foreach(lc, wfunc->distinctargs)
+ {
+ peraggstate->eq_ops[i] = ((SortGroupClause *) lfirst(lc))->eqop;
+ peraggstate->sort_ops[i] = ((SortGroupClause *) lfirst(lc))->sortop;
+ i++;
+ }
+ fmgr_info(get_opcode(peraggstate->eq_ops[0]), &peraggstate->equalfnOne);
+ winstate->sortstates = tuplesort_begin_datum(inputTypes[0],
+ peraggstate->sort_ops[0],
+ wfunc->inputcollid,
+ true,
+ work_mem, NULL, TUPLESORT_NONE);
+ }
+
ReleaseSysCache(aggTuple);
return peraggstate;
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index aa584848cf..2513dc2981 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -2443,6 +2443,8 @@ eval_const_expressions_mutator(Node *node,
newexpr->winref = expr->winref;
newexpr->winstar = expr->winstar;
newexpr->winagg = expr->winagg;
+ newexpr->aggdistinct = expr->aggdistinct;
+ newexpr->distinctargs = expr->distinctargs;
newexpr->location = expr->location;
return (Node *) newexpr;
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index f7a1046026..87e6ce1626 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -1047,6 +1047,51 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc,
}
}
+ if (wfunc->aggdistinct){
+ List *argtypes = NIL;
+ List *tlist = NIL;
+ List *torder = NIL;
+ List *tdistinct = NIL;
+ AttrNumber attno = 1;
+ ListCell *lc;
+
+ foreach(lc, wfunc->args)
+ {
+ Expr *arg = (Expr *) lfirst(lc);
+ TargetEntry *tle;
+
+ /* We don't bother to assign column names to the entries */
+ tle = makeTargetEntry(arg, attno++, NULL, false);
+ tlist = lappend(tlist, tle);
+ }
+ torder = transformSortClause(pstate,
+ NIL,
+ &tlist,
+ EXPR_KIND_ORDER_BY,
+ true /* force SQL99 rules */ );
+
+ tdistinct = transformDistinctClause(pstate, &tlist, torder, true);
+
+ foreach(lc, tdistinct)
+ {
+ SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
+
+ if (!OidIsValid(sortcl->sortop))
+ {
+ Node *expr = get_sortgroupclause_expr(sortcl, tlist);
+
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_FUNCTION),
+ errmsg("could not identify an ordering operator for type %s",
+ format_type_be(exprType(expr))),
+ errdetail("Aggregates with DISTINCT must be able to sort their inputs."),
+ parser_errposition(pstate, exprLocation(expr))));
+ }
+ }
+ wfunc->distinctargs = tdistinct;
+ }
+
+
pstate->p_hasWindowFuncs = true;
}
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index ca14f06308..5b5dc9d938 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -835,15 +835,7 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
wfunc->winagg = (fdresult == FUNCDETAIL_AGGREGATE);
wfunc->aggfilter = agg_filter;
wfunc->location = location;
-
- /*
- * agg_star is allowed for aggregate functions but distinct isn't
- */
- if (agg_distinct)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("DISTINCT is not implemented for window functions"),
- parser_errposition(pstate, location)));
+ wfunc->aggdistinct = agg_distinct;
/*
* Reject attempt to call a parameterless aggregate without (*)
@@ -856,6 +848,15 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
NameListToString(funcname)),
parser_errposition(pstate, location)));
+ /*
+ * Distinct is not implemented for window aggregates with ORDER BY
+ */
+ if (agg_distinct && over->orderClause)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("DISTINCT is not implemented for aggregate functions with ORDER BY"),
+ parser_errposition(pstate, location)));
+
/*
* ordered aggs not allowed in windows yet
*/
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 2cd0a4f472..dfcadc8693 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2523,6 +2523,7 @@ typedef struct WindowAggState
* date for current row */
bool grouptail_valid; /* true if grouptailpos is known up to
* date for current row */
+ Tuplesortstate *sortstates;
TupleTableSlot *first_part_slot; /* first tuple of current or next
* partition */
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index 83e40e56d3..2991f04d7f 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -495,6 +495,8 @@ typedef struct WindowFunc
Index winref; /* index of associated WindowClause */
bool winstar; /* true if argument list was really '*' */
bool winagg; /* is function a simple aggregate? */
+ bool aggdistinct; /* do we need distinct values for aggregation? */
+ List *distinctargs;
int location; /* token location, or -1 if unknown */
} WindowFunc;
--
2.37.2
On 04/01/23 18:10, Ankit Kumar Pandey wrote:
On 29/12/22 20:58, Ankit Kumar Pandey wrote:
On 24/12/22 18:22, Ankit Pandey wrote:
Hi,
This is a PoC patch which implements distinct operation in window
aggregates (without order by and for single column aggregation, final
version may vary wrt these limitations). Purpose of this PoC is to
get feedback on the approach used and corresponding implementation,
any nitpicking as deemed reasonable.
Distinct operation is mirrored from implementation in nodeAgg.
Existing partitioning logic determines if row is in partition and
when distinct is required, all tuples for the aggregate column are
stored in tuplesort. When finalize_windowaggregate gets called,
tuples are sorted and duplicates are removed, followed by calling the
transition function on each tuple.
When distinct is not required, the above process is skipped and the
transition function gets called directly and nothing gets inserted
into tuplesort.
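The sort-then-deduplicate flow described above can be sketched outside the executor roughly as follows. This is only an illustration under stated assumptions, not the patch's code: a plain int array stands in for the tuplesort, and AvgState, avg_trans, avg_distinct and avg_plain are hypothetical names for a toy avg() aggregate.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* qsort comparator for ints; avoids overflow from plain subtraction */
static int
cmp_int(const void *a, const void *b)
{
	int			x = *(const int *) a;
	int			y = *(const int *) b;

	return (x > y) - (x < y);
}

/* Toy stand-in for the aggregate's transition value (hypothetical) */
typedef struct AvgState
{
	long		sum;
	long		count;
} AvgState;

/* Toy transition function: fold one input value into the state */
static void
avg_trans(AvgState *state, int value)
{
	state->sum += value;
	state->count++;
}

/*
 * avg(DISTINCT x) over the buffered values of one partition: sort,
 * skip values equal to the previous one, and run the transition
 * function once per distinct value.  Sorts vals in place.  Returns 0.0
 * for an empty partition (the real avg() would return NULL).
 */
static double
avg_distinct(int *vals, size_t n)
{
	AvgState	state = {0, 0};
	int			have_old = 0;
	int			old_val = 0;
	size_t		i;

	assert(vals != NULL || n == 0);
	if (n == 0)
		return 0.0;

	qsort(vals, n, sizeof(int), cmp_int);
	for (i = 0; i < n; i++)
	{
		if (have_old && vals[i] == old_val)
			continue;			/* duplicate of the previous value */
		avg_trans(&state, vals[i]);
		old_val = vals[i];
		have_old = 1;
	}
	return (double) state.sum / (double) state.count;
}

/* Non-distinct path: transition function is called on every value */
static double
avg_plain(const int *vals, size_t n)
{
	AvgState	state = {0, 0};
	size_t		i;

	for (i = 0; i < n; i++)
		avg_trans(&state, vals[i]);
	return state.count ? (double) state.sum / (double) state.count : 0.0;
}
```

Fed the id values of partition 'A' from the sample cases (1, 1, 3, 1), avg_distinct gives 2.0 and avg_plain gives 1.5, matching the query results shown in the sample cases.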
Note: tuplesort_begin and tuplesort_end are called for each partition
to reset the tuplesort, so at any time the tuplesort holds at most the
tuples of one partition.
I have verified it for integer and interval column aggregates (to
rule out obvious issues related to data types).
Sample cases:
create table mytable(id int, name text);
insert into mytable values(1, 'A');
insert into mytable values(1, 'A');
insert into mytable values(5, 'B');
insert into mytable values(3, 'A');
insert into mytable values(1, 'A');
select avg(distinct id) over (partition by name) from mytable;
avg
--------------------
2.0000000000000000
2.0000000000000000
2.0000000000000000
2.0000000000000000
5.0000000000000000
select avg(id) over (partition by name) from mytable;
avg
--------------------
1.5000000000000000
1.5000000000000000
1.5000000000000000
1.5000000000000000
5.0000000000000000
select avg(distinct id) over () from mytable;
avg
--------------------
3.0000000000000000
3.0000000000000000
3.0000000000000000
3.0000000000000000
3.0000000000000000
select avg(distinct id) from mytable;
avg
--------------------
3.0000000000000000
This is my first-time contribution. Please let me know if anything
can be improved as I'm eager to learn.
Regards,
Ankit Kumar Pandey
Hi all,
I know everyone is busy with holidays (well, Happy Holidays!) but I
will be glad if someone can take a quick look at this PoC and share
thoughts.
This is my first-time contribution so I am pretty sure there will be
some very obvious feedback (which will help me to move forward with
this change).
Updated patch with latest master. Last patch was a year old.
Attaching patch rebased on latest HEAD.
Thanks,
Ankit
Attachments:
distinct_windows.patch (text/x-patch; charset=UTF-8)
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 9240c691c1..7c07fb0684 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -155,13 +155,6 @@ typedef struct WindowStatePerAggData
int64 transValueCount; /* number of currently-aggregated rows */
- Datum lastdatum; /* used for single-column DISTINCT */
- FmgrInfo equalfnOne; /* single-column comparisons*/
- Oid *eq_ops;
- Oid *sort_ops;
-
- bool sort_in;
-
/* Data local to eval_windowaggregates() */
bool restart; /* need to restart this agg in this cycle? */
} WindowStatePerAggData;
@@ -171,7 +164,7 @@ static void initialize_windowaggregate(WindowAggState *winstate,
WindowStatePerAgg peraggstate);
static void advance_windowaggregate(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
- WindowStatePerAgg peraggstate, Datum value, bool isNull);
+ WindowStatePerAgg peraggstate);
static bool advance_windowaggregate_base(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
WindowStatePerAgg peraggstate);
@@ -181,9 +174,6 @@ static void finalize_windowaggregate(WindowAggState *winstate,
Datum *result, bool *isnull);
static void eval_windowaggregates(WindowAggState *winstate);
-static void process_ordered_aggregate_single(WindowAggState *winstate,
- WindowStatePerFunc perfuncstate,
- WindowStatePerAgg peraggstate);
static void eval_windowfunction(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
Datum *result, bool *isnull);
@@ -241,7 +231,6 @@ initialize_windowaggregate(WindowAggState *winstate,
peraggstate->transValueIsNull = peraggstate->initValueIsNull;
peraggstate->transValueCount = 0;
peraggstate->resultValue = (Datum) 0;
- peraggstate->lastdatum = (Datum) 0;
peraggstate->resultValueIsNull = true;
}
@@ -252,21 +241,43 @@ initialize_windowaggregate(WindowAggState *winstate,
static void
advance_windowaggregate(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
- WindowStatePerAgg peraggstate, Datum value, bool isNull)
+ WindowStatePerAgg peraggstate)
{
LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
+ WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
int numArguments = perfuncstate->numArguments;
Datum newVal;
+ ListCell *arg;
int i;
MemoryContext oldContext;
ExprContext *econtext = winstate->tmpcontext;
+ ExprState *filter = wfuncstate->aggfilter;
oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
+ /* Skip anything FILTERed out */
+ if (filter)
+ {
+ bool isnull;
+ Datum res = ExecEvalExpr(filter, econtext, &isnull);
+
+ if (isnull || !DatumGetBool(res))
+ {
+ MemoryContextSwitchTo(oldContext);
+ return;
+ }
+ }
+
/* We start from 1, since the 0th arg will be the transition value */
+ i = 1;
+ foreach(arg, wfuncstate->args)
+ {
+ ExprState *argstate = (ExprState *) lfirst(arg);
- fcinfo->args[1].value = value;
- fcinfo->args[1].isnull = isNull;
+ fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
+ &fcinfo->args[i].isnull);
+ i++;
+ }
if (peraggstate->transfn.fn_strict)
{
@@ -575,10 +586,6 @@ finalize_windowaggregate(WindowAggState *winstate,
oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
- /* Run transition function for distinct agg */
- if (perfuncstate->wfunc->aggdistinct)
- process_ordered_aggregate_single(winstate, perfuncstate, peraggstate);
-
/*
* Apply the agg's finalfn if one is provided, else return transValue.
*/
@@ -660,16 +667,6 @@ eval_windowaggregates(WindowAggState *winstate)
TupleTableSlot *agg_row_slot;
TupleTableSlot *temp_slot;
- ExprState *filter;
- bool isnull;
- WindowFuncExprState *wfuncstate;
- ListCell *arg;
- Datum tuple;
- ExprContext *aggecontext;
- ListCell *lc;
- Oid inputTypes[FUNC_MAX_ARGS];
- WindowStatePerFunc perfuncstate;
-
numaggs = winstate->numaggs;
if (numaggs == 0)
return; /* nothing to do */
@@ -897,22 +894,6 @@ eval_windowaggregates(WindowAggState *winstate)
}
}
- perfuncstate = &winstate->perfunc[wfuncno];
- if (perfuncstate->wfunc->aggdistinct)
- {
- i = 0;
- foreach(lc, perfuncstate->wfunc->args)
- {
- inputTypes[i++] = exprType((Node *) lfirst(lc));
- }
- winstate->sortstates =
- tuplesort_begin_datum(inputTypes[0],
- peraggstate->sort_ops[0],
- perfuncstate->wfunc->inputcollid,
- true,
- work_mem, NULL, TUPLESORT_NONE);
- }
-
/*
* Non-restarted aggregates now contain the rows between aggregatedbase
* (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
@@ -947,8 +928,7 @@ eval_windowaggregates(WindowAggState *winstate)
{
if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
agg_row_slot))
- break; /* must be end of partition */
-
+ break; /* must be end of partition */
}
/*
@@ -956,16 +936,14 @@ eval_windowaggregates(WindowAggState *winstate)
* current row is not in frame but there might be more in the frame.
*/
ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot);
-
if (ret < 0)
break;
-
if (ret == 0)
goto next_tuple;
/* Set tuple context for evaluation of aggregate arguments */
winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
-
+
/* Accumulate row into the aggregates */
for (i = 0; i < numaggs; i++)
{
@@ -977,46 +955,9 @@ eval_windowaggregates(WindowAggState *winstate)
continue;
wfuncno = peraggstate->wfuncno;
- perfuncstate = &winstate->perfunc[wfuncno];
-
- aggecontext = winstate->tmpcontext;
-
- wfuncstate = perfuncstate->wfuncstate;
- filter = wfuncstate->aggfilter;
-
- oldContext = MemoryContextSwitchTo(aggecontext->ecxt_per_tuple_memory);
- /* Skip anything FILTERed out for aggregates */
- if (perfuncstate->plain_agg && wfuncstate->aggfilter)
- {
- Datum res = ExecEvalExpr(filter, aggecontext, &isnull);
-
- if (isnull || !DatumGetBool(res))
- {
- MemoryContextSwitchTo(oldContext);
- continue;
- }
- }
-
-
- foreach(arg, wfuncstate->args)
- {
-
- ExprState *argstate = (ExprState *) lfirst(arg);
- tuple = ExecEvalExpr(argstate, aggecontext, &isnull);
-
- if (perfuncstate->wfunc->aggdistinct)
- {
- tuplesort_putdatum(winstate->sortstates, tuple, isnull);
- peraggstate->sort_in = true;
- }
- else
- {
- advance_windowaggregate(winstate, &winstate->perfunc[wfuncno],
- peraggstate, tuple, isnull);
- }
-
- }
- MemoryContextSwitchTo(oldContext);
+ advance_windowaggregate(winstate,
+ &winstate->perfunc[wfuncno],
+ peraggstate);
}
next_tuple:
@@ -1072,67 +1013,6 @@ next_tuple:
}
}
-
-static void
-process_ordered_aggregate_single(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
- WindowStatePerAgg peraggstate)
-{
- Datum newVal;
- bool isNull;
- MemoryContext workcontext = winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory;
- MemoryContext oldContext;
- Datum oldVal = (Datum) 0;
- bool oldIsNull = true;
- bool haveOldVal = false;
-
- if (peraggstate->sort_in){
- tuplesort_performsort(winstate->sortstates);
-
-
- while (tuplesort_getdatum(winstate->sortstates,
- true, false, &newVal, &isNull, NULL))
- {
- MemoryContextReset(workcontext);
- oldContext = MemoryContextSwitchTo(workcontext);
-
- if (haveOldVal && DatumGetBool(FunctionCall2Coll(&peraggstate->equalfnOne,
- perfuncstate->winCollation,
- oldVal, newVal)))
- {
- MemoryContextSwitchTo(oldContext);
- continue;
- }
- else
- {
-
- advance_windowaggregate(winstate, perfuncstate,
- peraggstate, newVal, isNull);
- }
- MemoryContextSwitchTo(oldContext);
-
- if (!peraggstate->resulttypeByVal)
- {
- if (!oldIsNull && false)
- pfree(DatumGetPointer(oldVal));
- if (!isNull)
- oldVal = datumCopy(newVal, true,
- peraggstate->resulttypeLen);
- }
- else
- oldVal = newVal;
-
- oldIsNull = isNull;
- haveOldVal = true;
- oldVal = newVal;
- }
-
- }
- tuplesort_end(winstate->sortstates);
- peraggstate->sort_in = false;
-
-}
-
-
/*
* eval_windowfunction
*
@@ -3076,9 +2956,6 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
get_typlenbyval(aggtranstype,
&peraggstate->transtypeLen,
&peraggstate->transtypeByVal);
- get_typlenbyval(wfunc->wintype,
- &peraggstate->inputtypeLen,
- &peraggstate->inputtypeByVal);
/*
* initval is potentially null, so don't try to access it as a struct
@@ -3146,32 +3023,6 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
else
peraggstate->aggcontext = winstate->aggcontext;
- /* Handle distinct operation in agg */
- if (wfunc->aggdistinct)
- {
- int numDistinctCols = list_length(wfunc->distinctargs);
- peraggstate->eq_ops = palloc(numDistinctCols * sizeof(Oid));
- peraggstate->sort_ops = palloc(numDistinctCols * sizeof(Oid));
- winstate->sortstates = (Tuplesortstate *)
- palloc0(sizeof(Tuplesortstate *) * 1);
-
- /* Initialize tuplesort to handle distinct operation */
-
- i=0;
- foreach(lc, wfunc->distinctargs)
- {
- peraggstate->eq_ops[i] = ((SortGroupClause *) lfirst(lc))->eqop;
- peraggstate->sort_ops[i] = ((SortGroupClause *) lfirst(lc))->sortop;
- i++;
- }
- fmgr_info(get_opcode(peraggstate->eq_ops[0]), &peraggstate->equalfnOne);
- winstate->sortstates = tuplesort_begin_datum(inputTypes[0],
- peraggstate->sort_ops[0],
- wfunc->inputcollid,
- true,
- work_mem, NULL, TUPLESORT_NONE);
- }
-
ReleaseSysCache(aggTuple);
return peraggstate;
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 4a755d0604..76e25118f9 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -2445,8 +2445,6 @@ eval_const_expressions_mutator(Node *node,
newexpr->winref = expr->winref;
newexpr->winstar = expr->winstar;
newexpr->winagg = expr->winagg;
- newexpr->aggdistinct = expr->aggdistinct;
- newexpr->distinctargs = expr->distinctargs;
newexpr->location = expr->location;
return (Node *) newexpr;
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index c3c1f3f922..4fbf80c271 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -1048,51 +1048,6 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc,
}
}
- if (wfunc->aggdistinct){
- List *argtypes = NIL;
- List *tlist = NIL;
- List *torder = NIL;
- List *tdistinct = NIL;
- AttrNumber attno = 1;
- ListCell *lc;
-
- foreach(lc, wfunc->args)
- {
- Expr *arg = (Expr *) lfirst(lc);
- TargetEntry *tle;
-
- /* We don't bother to assign column names to the entries */
- tle = makeTargetEntry(arg, attno++, NULL, false);
- tlist = lappend(tlist, tle);
- }
- torder = transformSortClause(pstate,
- NIL,
- &tlist,
- EXPR_KIND_ORDER_BY,
- true /* force SQL99 rules */ );
-
- tdistinct = transformDistinctClause(pstate, &tlist, torder, true);
-
- foreach(lc, tdistinct)
- {
- SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
-
- if (!OidIsValid(sortcl->sortop))
- {
- Node *expr = get_sortgroupclause_expr(sortcl, tlist);
-
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_FUNCTION),
- errmsg("could not identify an ordering operator for type %s",
- format_type_be(exprType(expr))),
- errdetail("Aggregates with DISTINCT must be able to sort their inputs."),
- parser_errposition(pstate, exprLocation(expr))));
- }
- }
- wfunc->distinctargs = tdistinct;
- }
-
-
pstate->p_hasWindowFuncs = true;
}
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index 89a443eac5..ca14f06308 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -835,7 +835,15 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
wfunc->winagg = (fdresult == FUNCDETAIL_AGGREGATE);
wfunc->aggfilter = agg_filter;
wfunc->location = location;
- wfunc->aggdistinct = agg_distinct;
+
+ /*
+ * agg_star is allowed for aggregate functions but distinct isn't
+ */
+ if (agg_distinct)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("DISTINCT is not implemented for window functions"),
+ parser_errposition(pstate, location)));
/*
* Reject attempt to call a parameterless aggregate without (*)
@@ -848,16 +856,6 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
NameListToString(funcname)),
parser_errposition(pstate, location)));
- /*
- * Distinct is not implemented for aggregates with filter
- */
-
- if (agg_distinct && agg_filter)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("DISTINCT is not implemented for aggregate functions with FILTER"),
- parser_errposition(pstate, location)));
-
/*
* ordered aggs not allowed in windows yet
*/
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 4301db31d8..bc67cb9ed8 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2529,7 +2529,6 @@ typedef struct WindowAggState
* date for current row */
bool grouptail_valid; /* true if grouptailpos is known up to
* date for current row */
- Tuplesortstate *sortstates;
TupleTableSlot *first_part_slot; /* first tuple of current or next
* partition */
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index f5dd8f2d07..4220c63ab7 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -558,8 +558,6 @@ typedef struct WindowFunc
/* true if argument list was really '*' */
bool winstar pg_node_attr(query_jumble_ignore);
/* is function a simple aggregate? */
- bool aggdistinct; /* do we need distinct values for aggregation? */
- List *distinctargs;
bool winagg pg_node_attr(query_jumble_ignore);
/* token location, or -1 if unknown */
int location;
diff --git a/src/test/regress/expected/window.out b/src/test/regress/expected/window.out
index 04247ce4a3..747608e3c1 100644
--- a/src/test/regress/expected/window.out
+++ b/src/test/regress/expected/window.out
@@ -1775,22 +1775,6 @@ select first_value(salary) over(order by enroll_date range between unbounded pre
5000 | 4500 | 4200 | 01-01-2008
(10 rows)
--- with DISTINCT in agg functionn
-select depname, count(DISTINCT enroll_date) over (PARTITION BY depname) FROM empsalary;
- depname | count
------------+-------
- develop | 4
- develop | 4
- develop | 4
- develop | 4
- develop | 4
- personnel | 2
- personnel | 2
- sales | 3
- sales | 3
- sales | 3
-(10 rows)
-
-- RANGE offset PRECEDING/FOLLOWING with null values
select x, y,
first_value(y) over w,
diff --git a/src/test/regress/sql/window.sql b/src/test/regress/sql/window.sql
index a88d1d092c..1009b438de 100644
--- a/src/test/regress/sql/window.sql
+++ b/src/test/regress/sql/window.sql
@@ -434,9 +434,6 @@ select first_value(salary) over(order by enroll_date range between unbounded pre
exclude current row),
salary, enroll_date from empsalary;
--- with DISTINCT in agg functionn
-select depname, count(DISTINCT enroll_date) over (PARTITION BY depname) FROM empsalary;
-
-- RANGE offset PRECEDING/FOLLOWING with null values
select x, y,
first_value(y) over w,
Attaching an updated patch with a fix for an issue in window function handling.
I have also fixed the naming convention of the patch, as the last patch had an
incompatible name.
Note:
1. Pending: investigation of test case failures.
Regards,
Ankit
Attachments:
v1-0003-Implement-distinct-in-Window-Aggregates.patch (text/x-patch; charset=UTF-8)
diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 4fcfd6df72..94ddccd23a 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -1574,14 +1574,40 @@ check_tuple(HeapCheckContext *ctx)
static FullTransactionId
FullTransactionIdFromXidAndCtx(TransactionId xid, const HeapCheckContext *ctx)
{
- uint32 epoch;
+ uint64 nextfxid_i;
+ int32 diff;
+ FullTransactionId fxid;
+
+ Assert(TransactionIdIsNormal(ctx->next_xid));
+ Assert(FullTransactionIdIsNormal(ctx->next_fxid));
+ Assert(XidFromFullTransactionId(ctx->next_fxid) == ctx->next_xid);
if (!TransactionIdIsNormal(xid))
return FullTransactionIdFromEpochAndXid(0, xid);
- epoch = EpochFromFullTransactionId(ctx->next_fxid);
- if (xid > ctx->next_xid)
- epoch--;
- return FullTransactionIdFromEpochAndXid(epoch, xid);
+
+ nextfxid_i = U64FromFullTransactionId(ctx->next_fxid);
+
+ /* compute the 32bit modulo difference */
+ diff = (int32) (ctx->next_xid - xid);
+
+ /*
+ * In cases of corruption we might see a 32bit xid that is before epoch
+ * 0. We can't represent that as a 64bit xid, due to 64bit xids being
+ * unsigned integers, without the modulo arithmetic of 32bit xid. There's
+ * no really nice way to deal with that, but it works ok enough to use
+ * FirstNormalFullTransactionId in that case, as a freshly initdb'd
+ * cluster already has a newer horizon.
+ */
+ if (diff > 0 && (nextfxid_i - FirstNormalTransactionId) < (int64) diff)
+ {
+ Assert(EpochFromFullTransactionId(ctx->next_fxid) == 0);
+ fxid = FirstNormalFullTransactionId;
+ }
+ else
+ fxid = FullTransactionIdFromU64(nextfxid_i - diff);
+
+ Assert(FullTransactionIdIsNormal(fxid));
+ return fxid;
}
/*
@@ -1597,8 +1623,8 @@ update_cached_xid_range(HeapCheckContext *ctx)
LWLockRelease(XidGenLock);
/* And compute alternate versions of the same */
- ctx->oldest_fxid = FullTransactionIdFromXidAndCtx(ctx->oldest_xid, ctx);
ctx->next_xid = XidFromFullTransactionId(ctx->next_fxid);
+ ctx->oldest_fxid = FullTransactionIdFromXidAndCtx(ctx->oldest_xid, ctx);
}
/*
diff --git a/contrib/pg_trgm/expected/pg_word_trgm.out b/contrib/pg_trgm/expected/pg_word_trgm.out
index 936d489390..c66a67f30e 100644
--- a/contrib/pg_trgm/expected/pg_word_trgm.out
+++ b/contrib/pg_trgm/expected/pg_word_trgm.out
@@ -1044,3 +1044,9 @@ select t,word_similarity('Kabankala',t) as sml from test_trgm2 where t %> 'Kaban
Waikala | 0.3
(89 rows)
+-- test unsatisfiable pattern
+select * from test_trgm2 where t ~ '.*$x';
+ t
+---
+(0 rows)
+
diff --git a/contrib/pg_trgm/sql/pg_word_trgm.sql b/contrib/pg_trgm/sql/pg_word_trgm.sql
index d9fa1c55e5..d2ada49133 100644
--- a/contrib/pg_trgm/sql/pg_word_trgm.sql
+++ b/contrib/pg_trgm/sql/pg_word_trgm.sql
@@ -43,3 +43,6 @@ select t,word_similarity('Baykal',t) as sml from test_trgm2 where 'Baykal' <% t
select t,word_similarity('Kabankala',t) as sml from test_trgm2 where 'Kabankala' <% t order by sml desc, t;
select t,word_similarity('Baykal',t) as sml from test_trgm2 where t %> 'Baykal' order by sml desc, t;
select t,word_similarity('Kabankala',t) as sml from test_trgm2 where t %> 'Kabankala' order by sml desc, t;
+
+-- test unsatisfiable pattern
+select * from test_trgm2 where t ~ '.*$x';
diff --git a/contrib/pg_trgm/trgm_regexp.c b/contrib/pg_trgm/trgm_regexp.c
index 9a00564ae4..06cd3db67b 100644
--- a/contrib/pg_trgm/trgm_regexp.c
+++ b/contrib/pg_trgm/trgm_regexp.c
@@ -1947,9 +1947,7 @@ packGraph(TrgmNFA *trgmNFA, MemoryContext rcontext)
arcsCount;
HASH_SEQ_STATUS scan_status;
TrgmState *state;
- TrgmPackArcInfo *arcs,
- *p1,
- *p2;
+ TrgmPackArcInfo *arcs;
TrgmPackedArc *packedArcs;
TrgmPackedGraph *result;
int i,
@@ -2021,17 +2019,25 @@ packGraph(TrgmNFA *trgmNFA, MemoryContext rcontext)
qsort(arcs, arcIndex, sizeof(TrgmPackArcInfo), packArcInfoCmp);
/* We could have duplicates because states were merged. Remove them. */
- /* p1 is probe point, p2 is last known non-duplicate. */
- p2 = arcs;
- for (p1 = arcs + 1; p1 < arcs + arcIndex; p1++)
+ if (arcIndex > 1)
{
- if (packArcInfoCmp(p1, p2) > 0)
+ /* p1 is probe point, p2 is last known non-duplicate. */
+ TrgmPackArcInfo *p1,
+ *p2;
+
+ p2 = arcs;
+ for (p1 = arcs + 1; p1 < arcs + arcIndex; p1++)
{
- p2++;
- *p2 = *p1;
+ if (packArcInfoCmp(p1, p2) > 0)
+ {
+ p2++;
+ *p2 = *p1;
+ }
}
+ arcsCount = (p2 - arcs) + 1;
}
- arcsCount = (p2 - arcs) + 1;
+ else
+ arcsCount = arcIndex;
/* Create packed representation */
result = (TrgmPackedGraph *)
diff --git a/meson.build b/meson.build
index 2409cc2254..d4384f1bf6 100644
--- a/meson.build
+++ b/meson.build
@@ -1268,7 +1268,7 @@ if uuidopt != 'none'
elif uuidopt == 'ossp'
uuid = dependency('ossp-uuid', required: true)
uuidfunc = 'uuid_export'
- uuidheader = 'ossp/uuid.h'
+ uuidheader = 'uuid.h'
else
error('huh')
endif
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index c695aa7525..7c07fb0684 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -155,13 +155,6 @@ typedef struct WindowStatePerAggData
int64 transValueCount; /* number of currently-aggregated rows */
- Datum lastdatum; /* used for single-column DISTINCT */
- FmgrInfo equalfnOne; /* single-column comparisons*/
- Oid *eq_ops;
- Oid *sort_ops;
-
- bool sort_in;
-
/* Data local to eval_windowaggregates() */
bool restart; /* need to restart this agg in this cycle? */
} WindowStatePerAggData;
@@ -171,7 +164,7 @@ static void initialize_windowaggregate(WindowAggState *winstate,
WindowStatePerAgg peraggstate);
static void advance_windowaggregate(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
- WindowStatePerAgg peraggstate, Datum value, bool isNull);
+ WindowStatePerAgg peraggstate);
static bool advance_windowaggregate_base(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
WindowStatePerAgg peraggstate);
@@ -181,9 +174,6 @@ static void finalize_windowaggregate(WindowAggState *winstate,
Datum *result, bool *isnull);
static void eval_windowaggregates(WindowAggState *winstate);
-static void process_ordered_aggregate_single(WindowAggState *winstate,
- WindowStatePerFunc perfuncstate,
- WindowStatePerAgg peraggstate);
static void eval_windowfunction(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
Datum *result, bool *isnull);
@@ -241,7 +231,6 @@ initialize_windowaggregate(WindowAggState *winstate,
peraggstate->transValueIsNull = peraggstate->initValueIsNull;
peraggstate->transValueCount = 0;
peraggstate->resultValue = (Datum) 0;
- peraggstate->lastdatum = (Datum) 0;
peraggstate->resultValueIsNull = true;
}
@@ -252,21 +241,43 @@ initialize_windowaggregate(WindowAggState *winstate,
static void
advance_windowaggregate(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
- WindowStatePerAgg peraggstate, Datum value, bool isNull)
+ WindowStatePerAgg peraggstate)
{
LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
+ WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
int numArguments = perfuncstate->numArguments;
Datum newVal;
+ ListCell *arg;
int i;
MemoryContext oldContext;
ExprContext *econtext = winstate->tmpcontext;
+ ExprState *filter = wfuncstate->aggfilter;
oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
+ /* Skip anything FILTERed out */
+ if (filter)
+ {
+ bool isnull;
+ Datum res = ExecEvalExpr(filter, econtext, &isnull);
+
+ if (isnull || !DatumGetBool(res))
+ {
+ MemoryContextSwitchTo(oldContext);
+ return;
+ }
+ }
+
/* We start from 1, since the 0th arg will be the transition value */
+ i = 1;
+ foreach(arg, wfuncstate->args)
+ {
+ ExprState *argstate = (ExprState *) lfirst(arg);
- fcinfo->args[1].value = value;
- fcinfo->args[1].isnull = isNull;
+ fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
+ &fcinfo->args[i].isnull);
+ i++;
+ }
if (peraggstate->transfn.fn_strict)
{
@@ -575,10 +586,6 @@ finalize_windowaggregate(WindowAggState *winstate,
oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
- /* Run transition function for distinct agg */
- if (perfuncstate->wfunc->aggdistinct)
- process_ordered_aggregate_single(winstate, perfuncstate, peraggstate);
-
/*
* Apply the agg's finalfn if one is provided, else return transValue.
*/
@@ -660,16 +667,6 @@ eval_windowaggregates(WindowAggState *winstate)
TupleTableSlot *agg_row_slot;
TupleTableSlot *temp_slot;
- ExprState *filter;
- bool isnull;
- WindowFuncExprState *wfuncstate;
- ListCell *arg;
- Datum tuple;
- ExprContext *aggecontext;
- ListCell *lc;
- Oid inputTypes[FUNC_MAX_ARGS];
- WindowStatePerFunc perfuncstate;
-
numaggs = winstate->numaggs;
if (numaggs == 0)
return; /* nothing to do */
@@ -897,22 +894,6 @@ eval_windowaggregates(WindowAggState *winstate)
}
}
- perfuncstate = &winstate->perfunc[wfuncno];
- if (perfuncstate->wfunc->aggdistinct)
- {
- i = 0;
- foreach(lc, perfuncstate->wfunc->args)
- {
- inputTypes[i++] = exprType((Node *) lfirst(lc));
- }
- winstate->sortstates =
- tuplesort_begin_datum(inputTypes[0],
- peraggstate->sort_ops[0],
- perfuncstate->wfunc->inputcollid,
- true,
- work_mem, NULL, TUPLESORT_NONE);
- }
-
/*
* Non-restarted aggregates now contain the rows between aggregatedbase
* (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
@@ -947,8 +928,7 @@ eval_windowaggregates(WindowAggState *winstate)
{
if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
agg_row_slot))
- break; /* must be end of partition */
-
+ break; /* must be end of partition */
}
/*
@@ -956,16 +936,14 @@ eval_windowaggregates(WindowAggState *winstate)
* current row is not in frame but there might be more in the frame.
*/
ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot);
-
if (ret < 0)
break;
-
if (ret == 0)
goto next_tuple;
/* Set tuple context for evaluation of aggregate arguments */
winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
-
+
/* Accumulate row into the aggregates */
for (i = 0; i < numaggs; i++)
{
@@ -977,52 +955,9 @@ eval_windowaggregates(WindowAggState *winstate)
continue;
wfuncno = peraggstate->wfuncno;
- perfuncstate = &winstate->perfunc[wfuncno];
-
- aggecontext = winstate->tmpcontext;
-
- wfuncstate = perfuncstate->wfuncstate;
- filter = wfuncstate->aggfilter;
-
- oldContext = MemoryContextSwitchTo(aggecontext->ecxt_per_tuple_memory);
- /* Skip anything FILTERed out for aggregates */
- if (perfuncstate->plain_agg && wfuncstate->aggfilter)
- {
- Datum res = ExecEvalExpr(filter, aggecontext, &isnull);
-
- if (isnull || !DatumGetBool(res))
- {
- MemoryContextSwitchTo(oldContext);
- continue;
- }
- }
-
-
- foreach(arg, wfuncstate->args)
- {
-
- ExprState *argstate = (ExprState *) lfirst(arg);
- tuple = ExecEvalExpr(argstate, aggecontext, &isnull);
-
- if (perfuncstate->wfunc->aggdistinct)
- {
- tuplesort_putdatum(winstate->sortstates, tuple, isnull);
- peraggstate->sort_in = true;
- }
- else
- {
- advance_windowaggregate(winstate, &winstate->perfunc[wfuncno],
- peraggstate, tuple, isnull);
- }
-
- }
- // Eg case of count(*), How can we this in better way?
- if (wfuncstate->args == NIL)
- {
- advance_windowaggregate(winstate, &winstate->perfunc[wfuncno],
- peraggstate, tuple, isnull);
- }
- MemoryContextSwitchTo(oldContext);
+ advance_windowaggregate(winstate,
+ &winstate->perfunc[wfuncno],
+ peraggstate);
}
next_tuple:
@@ -1078,67 +1013,6 @@ next_tuple:
}
}
-
-static void
-process_ordered_aggregate_single(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
- WindowStatePerAgg peraggstate)
-{
- Datum newVal;
- bool isNull;
- MemoryContext workcontext = winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory;
- MemoryContext oldContext;
- Datum oldVal = (Datum) 0;
- bool oldIsNull = true;
- bool haveOldVal = false;
-
- if (peraggstate->sort_in){
- tuplesort_performsort(winstate->sortstates);
-
-
- while (tuplesort_getdatum(winstate->sortstates,
- true, false, &newVal, &isNull, NULL))
- {
- MemoryContextReset(workcontext);
- oldContext = MemoryContextSwitchTo(workcontext);
-
- if (haveOldVal && DatumGetBool(FunctionCall2Coll(&peraggstate->equalfnOne,
- perfuncstate->winCollation,
- oldVal, newVal)))
- {
- MemoryContextSwitchTo(oldContext);
- continue;
- }
- else
- {
-
- advance_windowaggregate(winstate, perfuncstate,
- peraggstate, newVal, isNull);
- }
- MemoryContextSwitchTo(oldContext);
-
- if (!peraggstate->resulttypeByVal)
- {
- if (!oldIsNull && false)
- pfree(DatumGetPointer(oldVal));
- if (!isNull)
- oldVal = datumCopy(newVal, true,
- peraggstate->resulttypeLen);
- }
- else
- oldVal = newVal;
-
- oldIsNull = isNull;
- haveOldVal = true;
- oldVal = newVal;
- }
-
- }
- tuplesort_end(winstate->sortstates);
- peraggstate->sort_in = false;
-
-}
-
-
/*
* eval_windowfunction
*
@@ -3082,9 +2956,6 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
get_typlenbyval(aggtranstype,
&peraggstate->transtypeLen,
&peraggstate->transtypeByVal);
- get_typlenbyval(wfunc->wintype,
- &peraggstate->inputtypeLen,
- &peraggstate->inputtypeByVal);
/*
* initval is potentially null, so don't try to access it as a struct
@@ -3152,32 +3023,6 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
else
peraggstate->aggcontext = winstate->aggcontext;
- /* Handle distinct operation in agg */
- if (wfunc->aggdistinct)
- {
- int numDistinctCols = list_length(wfunc->distinctargs);
- peraggstate->eq_ops = palloc(numDistinctCols * sizeof(Oid));
- peraggstate->sort_ops = palloc(numDistinctCols * sizeof(Oid));
- winstate->sortstates = (Tuplesortstate *)
- palloc0(sizeof(Tuplesortstate *) * 1);
-
- /* Initialize tuplesort to handle distinct operation */
-
- i=0;
- foreach(lc, wfunc->distinctargs)
- {
- peraggstate->eq_ops[i] = ((SortGroupClause *) lfirst(lc))->eqop;
- peraggstate->sort_ops[i] = ((SortGroupClause *) lfirst(lc))->sortop;
- i++;
- }
- fmgr_info(get_opcode(peraggstate->eq_ops[0]), &peraggstate->equalfnOne);
- winstate->sortstates = tuplesort_begin_datum(inputTypes[0],
- peraggstate->sort_ops[0],
- wfunc->inputcollid,
- true,
- work_mem, NULL, TUPLESORT_NONE);
- }
-
ReleaseSysCache(aggTuple);
return peraggstate;
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 4a755d0604..76e25118f9 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -2445,8 +2445,6 @@ eval_const_expressions_mutator(Node *node,
newexpr->winref = expr->winref;
newexpr->winstar = expr->winstar;
newexpr->winagg = expr->winagg;
- newexpr->aggdistinct = expr->aggdistinct;
- newexpr->distinctargs = expr->distinctargs;
newexpr->location = expr->location;
return (Node *) newexpr;
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index fb0262676b..4fbf80c271 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -1048,50 +1048,6 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc,
}
}
- if (wfunc->aggdistinct){
- List *tlist = NIL;
- List *torder = NIL;
- List *tdistinct = NIL;
- AttrNumber attno = 1;
- ListCell *lc;
-
- foreach(lc, wfunc->args)
- {
- Expr *arg = (Expr *) lfirst(lc);
- TargetEntry *tle;
-
- /* We don't bother to assign column names to the entries */
- tle = makeTargetEntry(arg, attno++, NULL, false);
- tlist = lappend(tlist, tle);
- }
- torder = transformSortClause(pstate,
- NIL,
- &tlist,
- EXPR_KIND_ORDER_BY,
- true /* force SQL99 rules */ );
-
- tdistinct = transformDistinctClause(pstate, &tlist, torder, true);
-
- foreach(lc, tdistinct)
- {
- SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
-
- if (!OidIsValid(sortcl->sortop))
- {
- Node *expr = get_sortgroupclause_expr(sortcl, tlist);
-
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_FUNCTION),
- errmsg("could not identify an ordering operator for type %s",
- format_type_be(exprType(expr))),
- errdetail("Aggregates with DISTINCT must be able to sort their inputs."),
- parser_errposition(pstate, exprLocation(expr))));
- }
- }
- wfunc->distinctargs = tdistinct;
- }
-
-
pstate->p_hasWindowFuncs = true;
}
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index 89a443eac5..ca14f06308 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -835,7 +835,15 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
wfunc->winagg = (fdresult == FUNCDETAIL_AGGREGATE);
wfunc->aggfilter = agg_filter;
wfunc->location = location;
- wfunc->aggdistinct = agg_distinct;
+
+ /*
+ * agg_star is allowed for aggregate functions but distinct isn't
+ */
+ if (agg_distinct)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("DISTINCT is not implemented for window functions"),
+ parser_errposition(pstate, location)));
/*
* Reject attempt to call a parameterless aggregate without (*)
@@ -848,16 +856,6 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
NameListToString(funcname)),
parser_errposition(pstate, location)));
- /*
- * Distinct is not implemented for aggregates with filter
- */
-
- if (agg_distinct && agg_filter)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("DISTINCT is not implemented for aggregate functions with FILTER"),
- parser_errposition(pstate, location)));
-
/*
* ordered aggs not allowed in windows yet
*/
diff --git a/src/bin/pg_amcheck/t/004_verify_heapam.pl b/src/bin/pg_amcheck/t/004_verify_heapam.pl
index 215c30eaa8..e5ae7e6aad 100644
--- a/src/bin/pg_amcheck/t/004_verify_heapam.pl
+++ b/src/bin/pg_amcheck/t/004_verify_heapam.pl
@@ -217,17 +217,17 @@ my $rel = $node->safe_psql('postgres',
my $relpath = "$pgdata/$rel";
# Insert data and freeze public.test
-use constant ROWCOUNT => 16;
+my $ROWCOUNT = 17;
$node->safe_psql(
'postgres', qq(
INSERT INTO public.test (a, b, c)
- VALUES (
+ SELECT
x'DEADF9F9DEADF9F9'::bigint,
'abcdefg',
repeat('w', 10000)
- );
- VACUUM FREEZE public.test
- )) for (1 .. ROWCOUNT);
+ FROM generate_series(1, $ROWCOUNT);
+ VACUUM FREEZE public.test;)
+);
my $relfrozenxid = $node->safe_psql('postgres',
q(select relfrozenxid from pg_class where relname = 'test'));
@@ -246,16 +246,13 @@ if ($datfrozenxid <= 3 || $datfrozenxid >= $relfrozenxid)
}
# Find where each of the tuples is located on the page.
-my @lp_off;
-for my $tup (0 .. ROWCOUNT - 1)
-{
- push(
- @lp_off,
- $node->safe_psql(
- 'postgres', qq(
-select lp_off from heap_page_items(get_raw_page('test', 'main', 0))
- offset $tup limit 1)));
-}
+my @lp_off = split '\n', $node->safe_psql(
+ 'postgres', qq(
+ select lp_off from heap_page_items(get_raw_page('test', 'main', 0))
+ where lp <= $ROWCOUNT
+ )
+);
+is(scalar @lp_off, $ROWCOUNT, "acquired row offsets");
# Sanity check that our 'test' table on disk layout matches expectations. If
# this is not so, we will have to skip the test until somebody updates the test
@@ -267,7 +264,7 @@ open($file, '+<', $relpath)
binmode $file;
my $ENDIANNESS;
-for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
+for (my $tupidx = 0; $tupidx < $ROWCOUNT; $tupidx++)
{
my $offnum = $tupidx + 1; # offnum is 1-based, not zero-based
my $offset = $lp_off[$tupidx];
@@ -345,7 +342,7 @@ open($file, '+<', $relpath)
or BAIL_OUT("open failed: $!");
binmode $file;
-for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
+for (my $tupidx = 0; $tupidx < $ROWCOUNT; $tupidx++)
{
my $offnum = $tupidx + 1; # offnum is 1-based, not zero-based
my $offset = $lp_off[$tupidx];
@@ -378,23 +375,24 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
elsif ($offnum == 3)
{
# Corruptly set xmin < datfrozenxid, further back, noting circularity
- # of xid comparison. For a new cluster with epoch = 0, the corrupt
- # xmin will be interpreted as in the future
- $tup->{t_xmin} = 4026531839;
+ # of xid comparison.
+ my $xmin = 4026531839;
+ $tup->{t_xmin} = $xmin;
$tup->{t_infomask} &= ~HEAP_XMIN_COMMITTED;
$tup->{t_infomask} &= ~HEAP_XMIN_INVALID;
push @expected,
- qr/${$header}xmin 4026531839 equals or exceeds next valid transaction ID 0:\d+/;
+ qr/${$header}xmin ${xmin} precedes oldest valid transaction ID 0:\d+/;
}
elsif ($offnum == 4)
{
# Corruptly set xmax < relminmxid;
- $tup->{t_xmax} = 4026531839;
+ my $xmax = 4026531839;
+ $tup->{t_xmax} = $xmax;
$tup->{t_infomask} &= ~HEAP_XMAX_INVALID;
push @expected,
- qr/${$header}xmax 4026531839 equals or exceeds next valid transaction ID 0:\d+/;
+ qr/${$header}xmax ${xmax} precedes oldest valid transaction ID 0:\d+/;
}
elsif ($offnum == 5)
{
@@ -502,7 +500,7 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
push @expected,
qr/${header}multitransaction ID 4 equals or exceeds next valid multitransaction ID 1/;
}
- elsif ($offnum == 15) # Last offnum must equal ROWCOUNT
+ elsif ($offnum == 15)
{
# Set both HEAP_XMAX_COMMITTED and HEAP_XMAX_IS_MULTI
$tup->{t_infomask} |= HEAP_XMAX_COMMITTED;
@@ -512,6 +510,17 @@ for (my $tupidx = 0; $tupidx < ROWCOUNT; $tupidx++)
push @expected,
qr/${header}multitransaction ID 4000000000 precedes relation minimum multitransaction ID threshold 1/;
}
+ elsif ($offnum == 16) # Last offnum must equal ROWCOUNT
+ {
+ # Corruptly set xmin > next_xid to be in the future.
+ my $xmin = 123456;
+ $tup->{t_xmin} = $xmin;
+ $tup->{t_infomask} &= ~HEAP_XMIN_COMMITTED;
+ $tup->{t_infomask} &= ~HEAP_XMIN_INVALID;
+
+ push @expected,
+ qr/${$header}xmin ${xmin} equals or exceeds next valid transaction ID 0:\d+/;
+ }
write_tuple($file, $offset, $tup);
}
close($file)
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 4301db31d8..bc67cb9ed8 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2529,7 +2529,6 @@ typedef struct WindowAggState
* date for current row */
bool grouptail_valid; /* true if grouptailpos is known up to
* date for current row */
- Tuplesortstate *sortstates;
TupleTableSlot *first_part_slot; /* first tuple of current or next
* partition */
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index f5dd8f2d07..4220c63ab7 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -558,8 +558,6 @@ typedef struct WindowFunc
/* true if argument list was really '*' */
bool winstar pg_node_attr(query_jumble_ignore);
/* is function a simple aggregate? */
- bool aggdistinct; /* do we need distinct values for aggregation? */
- List *distinctargs;
bool winagg pg_node_attr(query_jumble_ignore);
/* token location, or -1 if unknown */
int location;
diff --git a/src/test/regress/expected/window.out b/src/test/regress/expected/window.out
index 04247ce4a3..747608e3c1 100644
--- a/src/test/regress/expected/window.out
+++ b/src/test/regress/expected/window.out
@@ -1775,22 +1775,6 @@ select first_value(salary) over(order by enroll_date range between unbounded pre
5000 | 4500 | 4200 | 01-01-2008
(10 rows)
--- with DISTINCT in agg functionn
-select depname, count(DISTINCT enroll_date) over (PARTITION BY depname) FROM empsalary;
- depname | count
------------+-------
- develop | 4
- develop | 4
- develop | 4
- develop | 4
- develop | 4
- personnel | 2
- personnel | 2
- sales | 3
- sales | 3
- sales | 3
-(10 rows)
-
-- RANGE offset PRECEDING/FOLLOWING with null values
select x, y,
first_value(y) over w,
diff --git a/src/test/regress/sql/window.sql b/src/test/regress/sql/window.sql
index a88d1d092c..1009b438de 100644
--- a/src/test/regress/sql/window.sql
+++ b/src/test/regress/sql/window.sql
@@ -434,9 +434,6 @@ select first_value(salary) over(order by enroll_date range between unbounded pre
exclude current row),
salary, enroll_date from empsalary;
--- with DISTINCT in agg functionn
-select depname, count(DISTINCT enroll_date) over (PARTITION BY depname) FROM empsalary;
-
-- RANGE offset PRECEDING/FOLLOWING with null values
select x, y,
first_value(y) over w,
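For readers following the thread, the sort-then-deduplicate approach described in the first message (and visible in the process_ordered_aggregate_single hunk above) can be sketched outside PostgreSQL as follows. This is only an illustration under stated assumptions: AvgState, avg_transfn, cmp_int and avg_distinct are hypothetical names, qsort stands in for tuplesort, plain int equality stands in for the type's equality operator, and NULL inputs are not modelled.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical transition state for avg(); not PostgreSQL's real state type. */
typedef struct AvgState
{
	long		sum;
	long		count;
} AvgState;

/* Hypothetical transition function: fold one input value into the state. */
static void
avg_transfn(AvgState *state, int value)
{
	state->sum += value;
	state->count++;
}

/* qsort comparator for ints; stands in for the input type's sort operator. */
static int
cmp_int(const void *a, const void *b)
{
	int			x = *(const int *) a;
	int			y = *(const int *) b;

	return (x > y) - (x < y);
}

/*
 * Sketch of avg(DISTINCT v) over the values of one partition:
 * sort the collected values, skip each value equal to its predecessor,
 * and call the transition function once per distinct value.
 *
 * Sorts vals in place.  Returns 0.0 for an empty partition, which is an
 * assumption of this sketch (SQL would return NULL).
 */
static double
avg_distinct(int *vals, size_t n)
{
	AvgState	state = {0, 0};
	size_t		i;

	assert(vals != NULL || n == 0);
	if (n == 0)
		return 0.0;

	qsort(vals, n, sizeof(int), cmp_int);

	for (i = 0; i < n; i++)
	{
		/* After sorting, duplicates are adjacent, so one comparison suffices. */
		if (i > 0 && vals[i] == vals[i - 1])
			continue;
		avg_transfn(&state, vals[i]);
	}

	return (double) state.sum / (double) state.count;
}
```

With the sample table from the first message, the ids in partition 'A' are 1, 1, 3, 1, so avg_distinct yields 2.0 for that partition, and 3.0 when all five ids (1, 1, 5, 3, 1) form a single partition, matching the results shown there.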
On 3/12/23 09:17, Ankit Kumar Pandey wrote:
Attaching updated patch with a fix for an issue in window function.
I have also fixed naming convention of patch as last patch had
incompatible name.
Hi,
This patch does not apply to master. Could you rebase it and submit it
as one patch which applies directly to master? Maybe I am wrong but the
latest version looks like it only applies on top of one of your previous
patches which makes it hard for the reviewer.
Andreas
On 11 Jul 2023, at 01:06, Andreas Karlsson <andreas@proxel.se> wrote:
On 3/12/23 09:17, Ankit Kumar Pandey wrote:
Attaching updated patch with a fix for an issue in window function.
I have also fixed naming convention of patch as last patch had incompatible name.
Hi,
This patch does not apply to master. Could you rebase it and submit it as one patch which applies directly to master? Maybe I am wrong but the latest version looks like it only applies on top of one of your previous patches which makes it hard for the reviewer.
Since no update has been posted, the patch was considered a PoC, and the thread
has stalled, I will mark this entry as Returned with Feedback. Please feel free
to open a new CF entry when a new patch is available that addresses Andreas'
feedback on patch structure.
--
Daniel Gustafsson