parallel distinct union and aggregate support patch

Started by bucoo@sohu.com · over 5 years ago · 27 messages · pgsql-hackers
#1 bucoo@sohu.com <bucoo@sohu.com>

Hi hackers,
I wrote a patch to support parallel distinct, union, and aggregate using a batch sort.
Steps:
1. Generate a hash value from the group clause values, and use the hash value modulo the number of batches to assign each row to a batch.
2. At the end of the outer plan, wait for all the other workers to finish writing to the batches.
3. Each worker takes a unique batch number and calls tuplesort_performsort() to finish sorting that batch.
4. Return the rows of that batch.
5. If not all batches have been read, go to step 3.

The BatchSort plan makes sure that tuples with equal group clause values are returned within the same range, so a Unique (or GroupAggregate) plan above it can work.

Patch 2 adds parallel aggregate support; it is a simple use of BatchSort, but the regression tests fail because the partitionwise aggregation plan changes
from GatherMerge->Sort->Append->...
to Sort->Gather->Append->...
I have no idea how to fix it.

Using the same idea I wrote a batched shared tuple store for HashAgg in our PostgreSQL version; I will send a patch for PG14 when I finish it.

The following is the same description, originally written in Chinese (translated here):
My English is not good, so here is some Chinese; please help correct anything wrong in what I wrote above.
How BatchSort works:
1. Compute a hash value from the group clause, and place rows into different batches according to the hash value modulo the number of batches.
2. After the lower plan has returned all its rows, wait for all the other worker processes to finish.
3. Each worker process claims a unique batch and calls tuplesort_performsort() to complete the final sort.
4. Return all the rows of that batch.
5. If not all batches have been read, go back to step 3.
The BatchSort plan guarantees that identical data (by the grouping expressions) is returned within the same cycle, so the distinct- and grouping-related plans can work correctly.
The second patch supports parallel grouping: the grouping is done only once, instead of each parallel worker doing its own grouping and the leader then grouping a second time.
This patch causes the partitionwise aggregation regression test to fail, because the original execution plan has changed.
The patch implements only one simple way of using the BatchSort plan; other usages may still need to be added.

Using the same idea I wrote a HashAgg based on a shared tuple store in our AntDB version (the latest version is not yet open source); I will post it after adapting it to PG14.
A small advertisement: welcome to check out AntDB, our AsiaInfo distributed database product based on PostgreSQL, open source at https://github.com/ADBSQL/AntDB

bucoo@sohu.com

Attachments:

0001-Parallel-distinct-and-union-support.patch (+960 -15)
0002-Parallel-aggregate-support-using-batch-sort.patch (+152 -1)
#2 Thomas Munro <thomas.munro@gmail.com>
In reply to: bucoo@sohu.com (#1)
Re: parallel distinct union and aggregate support patch

On Tue, Oct 20, 2020 at 3:49 AM bucoo@sohu.com <bucoo@sohu.com> wrote:

I wrote a patch to support parallel distinct, union, and aggregate using a batch sort.
Steps:
1. Generate a hash value from the group clause values, and use the hash value modulo the number of batches to assign each row to a batch.
2. At the end of the outer plan, wait for all the other workers to finish writing to the batches.
3. Each worker takes a unique batch number and calls tuplesort_performsort() to finish sorting that batch.
4. Return the rows of that batch.
5. If not all batches have been read, go to step 3.

The BatchSort plan makes sure that tuples with equal group clause values are returned within the same range, so a Unique (or GroupAggregate) plan above it can work.

Hi!

Interesting work! In the past a few people have speculated about a
Parallel Repartition operator that could partition tuples a bit like
this, so that each process gets a different set of partitions. Here
you combine that with a sort. By doing both things in one node, you
avoid a lot of overheads (writing into a tuplestore once in the
repartitioning node, and then once again in the sort node, with tuples
being copied one-by-one between the two nodes).

If I understood correctly, the tuples emitted by Parallel Batch Sort
in each process are ordered by (hash(key, ...) % npartitions, key,
...), but the path is claiming to be ordered by (key, ...), no?
That's enough for Unique and Aggregate to give the correct answer,
because they really only require equal keys to be consecutive (and in
the same process), but maybe some other plan could break?

#3 Dilip Kumar <dilipbalaut@gmail.com>
In reply to: bucoo@sohu.com (#1)
Re: parallel distinct union and aggregate support patch

On Mon, Oct 19, 2020 at 8:19 PM bucoo@sohu.com <bucoo@sohu.com> wrote:

Hi hackers,
I wrote a patch to support parallel distinct, union, and aggregate using a batch sort.
Steps:
1. Generate a hash value from the group clause values, and use the hash value modulo the number of batches to assign each row to a batch.
2. At the end of the outer plan, wait for all the other workers to finish writing to the batches.
3. Each worker takes a unique batch number and calls tuplesort_performsort() to finish sorting that batch.
4. Return the rows of that batch.
5. If not all batches have been read, go to step 3.

The BatchSort plan makes sure that tuples with equal group clause values are returned within the same range, so a Unique (or GroupAggregate) plan above it can work.

Interesting idea. So IIUC, whenever a worker scans a tuple it will directly
put it into the respective batch (shared tuple store), based on the hash of
the grouping column, and once all the workers are done preparing the
batches, each worker will pick those batches one by one, perform the sort,
and finish the aggregation. I think there is scope for improvement here:
instead of directly putting the tuple into the batch, what if the worker
does the partial aggregation and then places the partially aggregated rows
in the shared tuple store based on the hash value, and then the workers
pick up the batches one by one. By doing it this way, we can avoid doing
large sorts. And this approach could then also be used with the hash
aggregate, I mean the data partially aggregated by the hash aggregate could
be put into the respective batch.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#4 bucoo@sohu.com <bucoo@sohu.com>
In reply to: Thomas Munro (#2)
Re: Re: parallel distinct union and aggregate support patch

If I understood correctly, the tuples emitted by Parallel Batch Sort
in each process are ordered by (hash(key, ...) % npartitions, key,
...), but the path is claiming to be ordered by (key, ...), no?
That's enough for Unique and Aggregate to give the correct answer,
because they really only require equal keys to be consecutive (and in
the same process), but maybe some other plan could break?

The path does not claim to be ordered by (key, ...); it saves the PathKeys in BatchSortPath::batchkeys, not in Path::pathkeys.
I don't understand "but maybe some other plan could break" -- do you mean some other plan using this path? No, BatchSortPath is only for some special paths (Unique, GroupAgg, ...).

bucoo@sohu.com

From: Thomas Munro
Date: 2020-10-21 12:27
To: bucoo@sohu.com
CC: pgsql-hackers
Subject: Re: parallel distinct union and aggregate support patch
On Tue, Oct 20, 2020 at 3:49 AM bucoo@sohu.com <bucoo@sohu.com> wrote:

I wrote a patch to support parallel distinct, union, and aggregate using a batch sort.
Steps:
1. Generate a hash value from the group clause values, and use the hash value modulo the number of batches to assign each row to a batch.
2. At the end of the outer plan, wait for all the other workers to finish writing to the batches.
3. Each worker takes a unique batch number and calls tuplesort_performsort() to finish sorting that batch.
4. Return the rows of that batch.
5. If not all batches have been read, go to step 3.

The BatchSort plan makes sure that tuples with equal group clause values are returned within the same range, so a Unique (or GroupAggregate) plan above it can work.

Hi!

Interesting work! In the past a few people have speculated about a
Parallel Repartition operator that could partition tuples a bit like
this, so that each process gets a different set of partitions. Here
you combine that with a sort. By doing both things in one node, you
avoid a lot of overheads (writing into a tuplestore once in the
repartitioning node, and then once again in the sort node, with tuples
being copied one-by-one between the two nodes).

If I understood correctly, the tuples emitted by Parallel Batch Sort
in each process are ordered by (hash(key, ...) % npartitions, key,
...), but the path is claiming to be ordered by (key, ...), no?
That's enough for Unique and Aggregate to give the correct answer,
because they really only require equal keys to be consecutive (and in
the same process), but maybe some other plan could break?

#5 bucoo@sohu.com <bucoo@sohu.com>
In reply to: Dilip Kumar (#3)
Re: Re: parallel distinct union and aggregate support patch

Interesting idea. So IIUC, whenever a worker scans a tuple it will directly
put it into the respective batch (shared tuple store), based on the hash of
the grouping column, and once all the workers are done preparing the
batches, each worker will pick those batches one by one, perform the sort,
and finish the aggregation. I think there is scope for improvement here:
instead of directly putting the tuple into the batch, what if the worker
does the partial aggregation and then places the partially aggregated rows
in the shared tuple store based on the hash value, and then the workers
pick up the batches one by one. By doing it this way, we can avoid doing
large sorts. And this approach could then also be used with the hash
aggregate, I mean the data partially aggregated by the hash aggregate could
be put into the respective batch.

Good idea. Batch sort is suitable when the aggregate produces many result rows;
with a large aggregate result, partial aggregation may run out of memory,
and all the aggregate functions must support partial aggregation (with batch sort this is unnecessary).

Actually I wrote a batched hash store for hash aggregate (for PG11) along these lines,
but it does not write partial aggregates to the shared tuple store; it writes the original tuple
and its hash value. However, it does not support parallel grouping sets.
I am trying to write parallel hash aggregate support using a batched shared tuple store for PG14,
and it needs to support parallel grouping sets in hash aggregate.

#6 Dilip Kumar <dilipbalaut@gmail.com>
In reply to: bucoo@sohu.com (#5)
Re: Re: parallel distinct union and aggregate support patch

On Fri, Oct 23, 2020 at 11:58 AM bucoo@sohu.com <bucoo@sohu.com> wrote:

Interesting idea. So IIUC, whenever a worker scans a tuple it will directly
put it into the respective batch (shared tuple store), based on the hash of
the grouping column, and once all the workers are done preparing the
batches, each worker will pick those batches one by one, perform the sort,
and finish the aggregation. I think there is scope for improvement here:
instead of directly putting the tuple into the batch, what if the worker
does the partial aggregation and then places the partially aggregated rows
in the shared tuple store based on the hash value, and then the workers
pick up the batches one by one. By doing it this way, we can avoid doing
large sorts. And this approach could then also be used with the hash
aggregate, I mean the data partially aggregated by the hash aggregate could
be put into the respective batch.

Good idea. Batch sort is suitable when the aggregate produces many result rows;
with a large aggregate result, partial aggregation may run out of memory,
and all the aggregate functions must support partial aggregation (with batch sort this is unnecessary).

Actually I wrote a batched hash store for hash aggregate (for PG11) along these lines,
but it does not write partial aggregates to the shared tuple store; it writes the original tuple
and its hash value. However, it does not support parallel grouping sets.
I am trying to write parallel hash aggregate support using a batched shared tuple store for PG14,
and it needs to support parallel grouping sets in hash aggregate.

I was trying to look into this patch to understand the logic in more
detail. Actually, there are no comments at all, so it's really hard to
understand what the code is trying to do.

I was reading the function below, which is the main entry point for
the batch sort.

+static TupleTableSlot *ExecBatchSortPrepare(PlanState *pstate)
+{
...
+ for (;;)
+ {
...
+ tuplesort_puttupleslot(state->batches[hash%node->numBatches], slot);
+ }
+
+ for (i=node->numBatches;i>0;)
+ tuplesort_performsort(state->batches[--i]);
+build_already_done_:
+ if (parallel)
+ {
+ for (i=node->numBatches;i>0;)
+ {
+ --i;
+ if (state->batches[i])
+ {
+ tuplesort_end(state->batches[i]);
+ state->batches[i] = NULL;
+ }
+ }

I did not understand this part: once each worker has performed its local
batch-wise sort, why are we clearing the batches? I mean, individual
workers have their own batches, so eventually they are supposed to get
merged. Can you explain this part? It would also be better if you added
comments.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#7 Robert Haas <robertmhaas@gmail.com>
In reply to: Dilip Kumar (#3)
Re: parallel distinct union and aggregate support patch

On Thu, Oct 22, 2020 at 5:08 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

Interesting idea. So IIUC, whenever a worker scans a tuple it will directly
put it into the respective batch (shared tuple store), based on the hash of
the grouping column, and once all the workers are done preparing the
batches, each worker will pick those batches one by one, perform the sort,
and finish the aggregation. I think there is scope for improvement here:
instead of directly putting the tuple into the batch, what if the worker
does the partial aggregation and then places the partially aggregated rows
in the shared tuple store based on the hash value, and then the workers
pick up the batches one by one. By doing it this way, we can avoid doing
large sorts. And this approach could then also be used with the hash
aggregate, I mean the data partially aggregated by the hash aggregate could
be put into the respective batch.

I am not sure if this would be a win if the typical group size is
small and the transition state has to be serialized/deserialized.
Possibly we need multiple strategies, but I guess we'd have to test
performance to be sure.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

#8 Dilip Kumar <dilipbalaut@gmail.com>
In reply to: Dilip Kumar (#6)
Re: Re: parallel distinct union and aggregate support patch

On Tue, Oct 27, 2020 at 3:27 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Fri, Oct 23, 2020 at 11:58 AM bucoo@sohu.com <bucoo@sohu.com> wrote:

Interesting idea. So IIUC, whenever a worker scans a tuple it will directly
put it into the respective batch (shared tuple store), based on the hash of
the grouping column, and once all the workers are done preparing the
batches, each worker will pick those batches one by one, perform the sort,
and finish the aggregation. I think there is scope for improvement here:
instead of directly putting the tuple into the batch, what if the worker
does the partial aggregation and then places the partially aggregated rows
in the shared tuple store based on the hash value, and then the workers
pick up the batches one by one. By doing it this way, we can avoid doing
large sorts. And this approach could then also be used with the hash
aggregate, I mean the data partially aggregated by the hash aggregate could
be put into the respective batch.

Good idea. Batch sort is suitable when the aggregate produces many result rows;
with a large aggregate result, partial aggregation may run out of memory,
and all the aggregate functions must support partial aggregation (with batch sort this is unnecessary).

Actually I wrote a batched hash store for hash aggregate (for PG11) along these lines,
but it does not write partial aggregates to the shared tuple store; it writes the original tuple
and its hash value. However, it does not support parallel grouping sets.
I am trying to write parallel hash aggregate support using a batched shared tuple store for PG14,
and it needs to support parallel grouping sets in hash aggregate.

I was trying to look into this patch to understand the logic in more
detail. Actually, there are no comments at all, so it's really hard to
understand what the code is trying to do.

I was reading the function below, which is the main entry point for
the batch sort.

+static TupleTableSlot *ExecBatchSortPrepare(PlanState *pstate)
+{
...
+ for (;;)
+ {
...
+ tuplesort_puttupleslot(state->batches[hash%node->numBatches], slot);
+ }
+
+ for (i=node->numBatches;i>0;)
+ tuplesort_performsort(state->batches[--i]);
+build_already_done_:
+ if (parallel)
+ {
+ for (i=node->numBatches;i>0;)
+ {
+ --i;
+ if (state->batches[i])
+ {
+ tuplesort_end(state->batches[i]);
+ state->batches[i] = NULL;
+ }
+ }

I did not understand this part: once each worker has performed its local
batch-wise sort, why are we clearing the batches? I mean, individual
workers have their own batches, so eventually they are supposed to get
merged. Can you explain this part? It would also be better if you added
comments.

I think I got it. IIUC, each worker initializes the shared sort and
performs the batch-wise sorting, and we wait on a barrier so that all the
workers can finish their sorting. Once that is done, the workers
coordinate, pick the batches one by one, and perform the final merge for
each batch.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#9 Dilip Kumar <dilipbalaut@gmail.com>
In reply to: Robert Haas (#7)
Re: parallel distinct union and aggregate support patch

On Tue, Oct 27, 2020 at 5:43 PM Robert Haas <robertmhaas@gmail.com> wrote:

On Thu, Oct 22, 2020 at 5:08 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:

Interesting idea. So IIUC, whenever a worker scans a tuple it will directly
put it into the respective batch (shared tuple store), based on the hash of
the grouping column, and once all the workers are done preparing the
batches, each worker will pick those batches one by one, perform the sort,
and finish the aggregation. I think there is scope for improvement here:
instead of directly putting the tuple into the batch, what if the worker
does the partial aggregation and then places the partially aggregated rows
in the shared tuple store based on the hash value, and then the workers
pick up the batches one by one. By doing it this way, we can avoid doing
large sorts. And this approach could then also be used with the hash
aggregate, I mean the data partially aggregated by the hash aggregate could
be put into the respective batch.

I am not sure if this would be a win if the typical group size is
small and the transition state has to be serialized/deserialized.
Possibly we need multiple strategies, but I guess we'd have to test
performance to be sure.

+1

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#10 bucoo@sohu.com <bucoo@sohu.com>
In reply to: Dilip Kumar (#8)
Re: Re: parallel distinct union and aggregate support patch

On Tue, Oct 27, 2020 at 3:27 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Fri, Oct 23, 2020 at 11:58 AM bucoo@sohu.com <bucoo@sohu.com> wrote:

Interesting idea. So IIUC, whenever a worker scans a tuple it will directly
put it into the respective batch (shared tuple store), based on the hash of
the grouping column, and once all the workers are done preparing the
batches, each worker will pick those batches one by one, perform the sort,
and finish the aggregation. I think there is scope for improvement here:
instead of directly putting the tuple into the batch, what if the worker
does the partial aggregation and then places the partially aggregated rows
in the shared tuple store based on the hash value, and then the workers
pick up the batches one by one. By doing it this way, we can avoid doing
large sorts. And this approach could then also be used with the hash
aggregate, I mean the data partially aggregated by the hash aggregate could
be put into the respective batch.

Good idea. Batch sort is suitable when the aggregate produces many result rows;
with a large aggregate result, partial aggregation may run out of memory,
and all the aggregate functions must support partial aggregation (with batch sort this is unnecessary).

Actually I wrote a batched hash store for hash aggregate (for PG11) along these lines,
but it does not write partial aggregates to the shared tuple store; it writes the original tuple
and its hash value. However, it does not support parallel grouping sets.
I am trying to write parallel hash aggregate support using a batched shared tuple store for PG14,
and it needs to support parallel grouping sets in hash aggregate.

I was trying to look into this patch to understand the logic in more
detail. Actually, there are no comments at all, so it's really hard to
understand what the code is trying to do.

I was reading the function below, which is the main entry point for
the batch sort.

+static TupleTableSlot *ExecBatchSortPrepare(PlanState *pstate)
+{
...
+ for (;;)
+ {
...
+ tuplesort_puttupleslot(state->batches[hash%node->numBatches], slot);
+ }
+
+ for (i=node->numBatches;i>0;)
+ tuplesort_performsort(state->batches[--i]);
+build_already_done_:
+ if (parallel)
+ {
+ for (i=node->numBatches;i>0;)
+ {
+ --i;
+ if (state->batches[i])
+ {
+ tuplesort_end(state->batches[i]);
+ state->batches[i] = NULL;
+ }
+ }

I did not understand this part: once each worker has performed its local
batch-wise sort, why are we clearing the batches? I mean, individual
workers have their own batches, so eventually they are supposed to get
merged. Can you explain this part? It would also be better if you added
comments.

I think I got it. IIUC, each worker initializes the shared sort and
performs the batch-wise sorting, and we wait on a barrier so that all the
workers can finish their sorting. Once that is done, the workers
coordinate, pick the batches one by one, and perform the final merge for
each batch.

Yes, that's it. Each worker opens the shared sort as a "worker" (nodeBatchSort.c:134);
after all workers have finished performing their sorts, each picks one batch and opens it as the "leader" (nodeBatchSort.c:54).

#11 bucoo@sohu.com <bucoo@sohu.com>
In reply to: bucoo@sohu.com (#1)
Re: parallel distinct union and aggregate support patch

Hi,
Here is a patch for parallel distinct, union, aggregate, and grouping sets support using batch hash aggregation.
Please review.

How to use:
set enable_batch_hashagg = on

How it works:
like batch sort, but without sorting each batch; it just saves the hash value with each row.

Unfinished work:
Rescan is not supported yet; you are welcome to add it. Actually I don't really understand how rescan works in parallel mode.

Other:
Patch 1 is based on branch master (80f8eb79e24d9b7963eaf17ce846667e2c6b6e6f).
For patches 1 and 2, see /messages/by-id/2020101922424962544053@sohu.com
Patch 3:
extends the shared tuple store and adds a batch store module.
By the way, it uses atomic operations instead of an LWLock for the shared tuple store to get the next read page.
Patch 4:
parallel support using batch hash aggregation

From: bucoo@sohu.com
Sent: 2020-10-19 22:42
To: pgsql-hackers
Subject: parallel distinct union and aggregate support patch
Hi hackers,
I wrote a patch to support parallel distinct, union, and aggregate using a batch sort.
Steps:
1. Generate a hash value from the group clause values, and use the hash value modulo the number of batches to assign each row to a batch.
2. At the end of the outer plan, wait for all the other workers to finish writing to the batches.
3. Each worker takes a unique batch number and calls tuplesort_performsort() to finish sorting that batch.
4. Return the rows of that batch.
5. If not all batches have been read, go to step 3.

The BatchSort plan makes sure that tuples with equal group clause values are returned within the same range, so a Unique (or GroupAggregate) plan above it can work.

Patch 2 adds parallel aggregate support; it is a simple use of BatchSort, but the regression tests fail because the partitionwise aggregation plan changes
from GatherMerge->Sort->Append->...
to Sort->Gather->Append->...
I have no idea how to fix it.

Using the same idea I wrote a batched shared tuple store for HashAgg in our PostgreSQL version; I will send a patch for PG14 when I finish it.

The following is the same description, originally written in Chinese (translated here):
My English is not good, so here is some Chinese; please help correct anything wrong in what I wrote above.
How BatchSort works:
1. Compute a hash value from the group clause, and place rows into different batches according to the hash value modulo the number of batches.
2. After the lower plan has returned all its rows, wait for all the other worker processes to finish.
3. Each worker process claims a unique batch and calls tuplesort_performsort() to complete the final sort.
4. Return all the rows of that batch.
5. If not all batches have been read, go back to step 3.
The BatchSort plan guarantees that identical data (by the grouping expressions) is returned within the same cycle, so the distinct- and grouping-related plans can work correctly.
The second patch supports parallel grouping: the grouping is done only once, instead of each parallel worker doing its own grouping and the leader then grouping a second time.
This patch causes the partitionwise aggregation regression test to fail, because the original execution plan has changed.
The patch implements only one simple way of using the BatchSort plan; other usages may still need to be added.

Using the same idea I wrote a HashAgg based on a shared tuple store in our AntDB version (the latest version is not yet open source); I will post it after adapting it to PG14.
A small advertisement: welcome to check out AntDB, our AsiaInfo distributed database product based on PostgreSQL, open source at https://github.com/ADBSQL/AntDB

bucoo@sohu.com

Attachments:

0003-extpand-shared-tuple-store-and-add-batch-store-modul.patch (+526 -24)
0004-Parallel-distinct-union-aggregate-and-grouping-sets-.patch (+941 -87)
#12 Tomas Vondra <tomas.vondra@2ndquadrant.com>
In reply to: bucoo@sohu.com (#11)
Re: parallel distinct union and aggregate support patch

Hi,

On Wed, Oct 28, 2020 at 05:37:40PM +0800, bucoo@sohu.com wrote:

Hi,
Here is a patch for parallel distinct, union, aggregate, and grouping sets support using batch hash aggregation.
Please review.

How to use:
set enable_batch_hashagg = on

How it works:
like batch sort, but without sorting each batch; it just saves the hash value with each row.

Unfinished work:
Rescan is not supported yet; you are welcome to add it. Actually I don't really understand how rescan works in parallel mode.

Other:
Patch 1 is based on branch master (80f8eb79e24d9b7963eaf17ce846667e2c6b6e6f).
For patches 1 and 2, see /messages/by-id/2020101922424962544053@sohu.com
Patch 3:
extends the shared tuple store and adds a batch store module.
By the way, it uses atomic operations instead of an LWLock for the shared tuple store to get the next read page.
Patch 4:
parallel support using batch hash aggregation

Thanks for the patch!

Two generic comments:

1) It's better to always include the whole patch series - including the
parts that have not changed. Otherwise people have to scavenge the
thread and search for all the pieces, which may be a source of issues.
Also, it confuses the patch tester [1] which tries to apply patches from
a single message, so it will fail for this one.

2) I suggest you try to describe the goal of these patches, using some
example queries, explain output etc. Right now the reviewers have to
reverse engineer the patches and deduce what the intention was, which
may be causing unnecessary confusion etc. If this was my patch, I'd try
to create a couple examples (CREATE TABLE + SELECT + EXPLAIN) showing
how the patch changes the query plan, showing speedup etc.

I'd like to do a review and some testing, and this would make it much
easier for me.

kind regards

--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#13 bucoo@sohu.com <bucoo@sohu.com>
In reply to: Tomas Vondra (#12)
Re: Re: parallel distinct union and aggregate support patch

1) It's better to always include the whole patch series - including the
parts that have not changed. Otherwise people have to scavenge the
thread and search for all the pieces, which may be a source of issues.
Also, it confuses the patch tester [1] which tries to apply patches from
a single message, so it will fail for this one.

Patches 3 and 4 do not rely on 1 and 2 in the code.
But applying patches 3 and 4 directly will fail, because they were written on top of 1 and 2.
I can generate a new single patch if you need it.

2) I suggest you try to describe the goal of these patches, using some
example queries, explain output etc. Right now the reviewers have to
reverse engineer the patches and deduce what the intention was, which
may be causing unnecessary confusion etc. If this was my patch, I'd try
to create a couple examples (CREATE TABLE + SELECT + EXPLAIN) showing
how the patch changes the query plan, showing speedup etc.

I have written some example queries into the regression tests, covering "unique", "union",
"group by" and "group by grouping sets".
Here are my tests; they are not in the regression tests:
```sql
begin;
create table gtest(id integer, txt text);
insert into gtest select t1.id,'txt'||t1.id from (select generate_series(1,1000*1000) id) t1,(select generate_series(1,10) id) t2;
analyze gtest;
commit;
set jit = off;
\timing on
```
normal aggregate times
```
set enable_batch_hashagg = off;
explain (costs off,analyze,verbose)
select sum(id),txt from gtest group by txt;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------
Finalize GroupAggregate (actual time=6469.279..8947.024 rows=1000000 loops=1)
Output: sum(id), txt
Group Key: gtest.txt
-> Gather Merge (actual time=6469.245..8165.930 rows=1000058 loops=1)
Output: txt, (PARTIAL sum(id))
Workers Planned: 2
Workers Launched: 2
-> Sort (actual time=6356.471..7133.832 rows=333353 loops=3)
Output: txt, (PARTIAL sum(id))
Sort Key: gtest.txt
Sort Method: external merge Disk: 11608kB
Worker 0: actual time=6447.665..7349.431 rows=317512 loops=1
Sort Method: external merge Disk: 10576kB
Worker 1: actual time=6302.882..7061.157 rows=333301 loops=1
Sort Method: external merge Disk: 11112kB
-> Partial HashAggregate (actual time=2591.487..4430.437 rows=333353 loops=3)
Output: txt, PARTIAL sum(id)
Group Key: gtest.txt
Batches: 17 Memory Usage: 4241kB Disk Usage: 113152kB
Worker 0: actual time=2584.345..4486.407 rows=317512 loops=1
Batches: 17 Memory Usage: 4241kB Disk Usage: 101392kB
Worker 1: actual time=2584.369..4393.244 rows=333301 loops=1
Batches: 17 Memory Usage: 4241kB Disk Usage: 112832kB
-> Parallel Seq Scan on public.gtest (actual time=0.691..603.990 rows=3333333 loops=3)
Output: id, txt
Worker 0: actual time=0.104..607.146 rows=3174970 loops=1
Worker 1: actual time=0.100..603.951 rows=3332785 loops=1
Planning Time: 0.226 ms
Execution Time: 9021.058 ms
(29 rows)

Time: 9022.251 ms (00:09.022)

set enable_batch_hashagg = on;
explain (costs off,analyze,verbose)
select sum(id),txt from gtest group by txt;
QUERY PLAN
-------------------------------------------------------------------------------------------------
Gather (actual time=3116.666..5740.826 rows=1000000 loops=1)
Output: (sum(id)), txt
Workers Planned: 2
Workers Launched: 2
-> Parallel BatchHashAggregate (actual time=3103.181..5464.948 rows=333333 loops=3)
Output: sum(id), txt
Group Key: gtest.txt
Worker 0: actual time=3094.550..5486.992 rows=326082 loops=1
Worker 1: actual time=3099.562..5480.111 rows=324729 loops=1
-> Parallel Seq Scan on public.gtest (actual time=0.791..656.601 rows=3333333 loops=3)
Output: id, txt
Worker 0: actual time=0.080..646.053 rows=3057680 loops=1
Worker 1: actual time=0.070..662.754 rows=3034370 loops=1
Planning Time: 0.243 ms
Execution Time: 5788.981 ms
(15 rows)

Time: 5790.143 ms (00:05.790)
```

grouping sets times
```
set enable_batch_hashagg = off;
explain (costs off,analyze,verbose)
select sum(id),txt from gtest group by grouping sets(id,txt,());
QUERY PLAN
------------------------------------------------------------------------------------------
GroupAggregate (actual time=9454.707..38921.885 rows=2000001 loops=1)
Output: sum(id), txt, id
Group Key: gtest.id
Group Key: ()
Sort Key: gtest.txt
Group Key: gtest.txt
-> Sort (actual time=9454.679..11804.071 rows=10000000 loops=1)
Output: txt, id
Sort Key: gtest.id
Sort Method: external merge Disk: 254056kB
-> Seq Scan on public.gtest (actual time=2.250..2419.031 rows=10000000 loops=1)
Output: txt, id
Planning Time: 0.230 ms
Execution Time: 39203.883 ms
(14 rows)

Time: 39205.339 ms (00:39.205)

set enable_batch_hashagg = on;
explain (costs off,analyze,verbose)
select sum(id),txt from gtest group by grouping sets(id,txt,());
QUERY PLAN
-------------------------------------------------------------------------------------------------
Gather (actual time=5931.776..14353.957 rows=2000001 loops=1)
Output: (sum(id)), txt, id
Workers Planned: 2
Workers Launched: 2
-> Parallel BatchHashAggregate (actual time=5920.963..13897.852 rows=666667 loops=3)
Output: sum(id), txt, id
Group Key: gtest.id
Group Key: ()
Group Key: gtest.txt
Worker 0: actual time=5916.370..14062.461 rows=513810 loops=1
Worker 1: actual time=5916.037..13932.847 rows=775901 loops=1
-> Parallel Seq Scan on public.gtest (actual time=0.399..688.273 rows=3333333 loops=3)
Output: id, txt
Worker 0: actual time=0.052..690.955 rows=3349990 loops=1
Worker 1: actual time=0.050..691.595 rows=3297070 loops=1
Planning Time: 0.157 ms
Execution Time: 14598.416 ms
(17 rows)

Time: 14599.437 ms (00:14.599)
```

#14 Dilip Kumar <dilipbalaut@gmail.com>
In reply to: bucoo@sohu.com (#13)
Re: Re: parallel distinct union and aggregate support patch

On Thu, Oct 29, 2020 at 12:53 PM bucoo@sohu.com <bucoo@sohu.com> wrote:

1) It's better to always include the whole patch series - including the
parts that have not changed. Otherwise people have to scavenge the
thread and search for all the pieces, which may be a source of issues.
Also, it confuses the patch tester [1] which tries to apply patches from
a single message, so it will fail for this one.

Patches 3 and 4 do not rely on 1 and 2 in code.
But applying patches 3 and 4 directly will fail, because
they were written on top of 1 and 2.
I can generate a single combined patch if you need.

2) I suggest you try to describe the goal of these patches, using some
example queries, explain output etc. Right now the reviewers have to
reverse engineer the patches and deduce what the intention was, which
may be causing unnecessary confusion etc. If this was my patch, I'd try
to create a couple examples (CREATE TABLE + SELECT + EXPLAIN) showing
how the patch changes the query plan, showing speedup etc.

I have written some example queries into the regression tests, covering
"unique", "union", "group by" and "group by grouping sets".
Here are my tests; they are not in the regression suite.
```sql
begin;
create table gtest(id integer, txt text);
insert into gtest select t1.id,'txt'||t1.id from (select generate_series(1,1000*1000) id) t1,(select generate_series(1,10) id) t2;
analyze gtest;
commit;
set jit = off;
\timing on
```
normal aggregate times
```
set enable_batch_hashagg = off;
explain (costs off,analyze,verbose)
select sum(id),txt from gtest group by txt;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------
Finalize GroupAggregate (actual time=6469.279..8947.024 rows=1000000 loops=1)
Output: sum(id), txt
Group Key: gtest.txt
-> Gather Merge (actual time=6469.245..8165.930 rows=1000058 loops=1)
Output: txt, (PARTIAL sum(id))
Workers Planned: 2
Workers Launched: 2
-> Sort (actual time=6356.471..7133.832 rows=333353 loops=3)
Output: txt, (PARTIAL sum(id))
Sort Key: gtest.txt
Sort Method: external merge Disk: 11608kB
Worker 0: actual time=6447.665..7349.431 rows=317512 loops=1
Sort Method: external merge Disk: 10576kB
Worker 1: actual time=6302.882..7061.157 rows=333301 loops=1
Sort Method: external merge Disk: 11112kB
-> Partial HashAggregate (actual time=2591.487..4430.437 rows=333353 loops=3)
Output: txt, PARTIAL sum(id)
Group Key: gtest.txt
Batches: 17 Memory Usage: 4241kB Disk Usage: 113152kB
Worker 0: actual time=2584.345..4486.407 rows=317512 loops=1
Batches: 17 Memory Usage: 4241kB Disk Usage: 101392kB
Worker 1: actual time=2584.369..4393.244 rows=333301 loops=1
Batches: 17 Memory Usage: 4241kB Disk Usage: 112832kB
-> Parallel Seq Scan on public.gtest (actual time=0.691..603.990 rows=3333333 loops=3)
Output: id, txt
Worker 0: actual time=0.104..607.146 rows=3174970 loops=1
Worker 1: actual time=0.100..603.951 rows=3332785 loops=1
Planning Time: 0.226 ms
Execution Time: 9021.058 ms
(29 rows)

Time: 9022.251 ms (00:09.022)

set enable_batch_hashagg = on;
explain (costs off,analyze,verbose)
select sum(id),txt from gtest group by txt;
QUERY PLAN
-------------------------------------------------------------------------------------------------
Gather (actual time=3116.666..5740.826 rows=1000000 loops=1)
Output: (sum(id)), txt
Workers Planned: 2
Workers Launched: 2
-> Parallel BatchHashAggregate (actual time=3103.181..5464.948 rows=333333 loops=3)
Output: sum(id), txt
Group Key: gtest.txt
Worker 0: actual time=3094.550..5486.992 rows=326082 loops=1
Worker 1: actual time=3099.562..5480.111 rows=324729 loops=1
-> Parallel Seq Scan on public.gtest (actual time=0.791..656.601 rows=3333333 loops=3)
Output: id, txt
Worker 0: actual time=0.080..646.053 rows=3057680 loops=1
Worker 1: actual time=0.070..662.754 rows=3034370 loops=1
Planning Time: 0.243 ms
Execution Time: 5788.981 ms
(15 rows)

Time: 5790.143 ms (00:05.790)
```

grouping sets times
```
set enable_batch_hashagg = off;
explain (costs off,analyze,verbose)
select sum(id),txt from gtest group by grouping sets(id,txt,());
QUERY PLAN
------------------------------------------------------------------------------------------
GroupAggregate (actual time=9454.707..38921.885 rows=2000001 loops=1)
Output: sum(id), txt, id
Group Key: gtest.id
Group Key: ()
Sort Key: gtest.txt
Group Key: gtest.txt
-> Sort (actual time=9454.679..11804.071 rows=10000000 loops=1)
Output: txt, id
Sort Key: gtest.id
Sort Method: external merge Disk: 254056kB
-> Seq Scan on public.gtest (actual time=2.250..2419.031 rows=10000000 loops=1)
Output: txt, id
Planning Time: 0.230 ms
Execution Time: 39203.883 ms
(14 rows)

Time: 39205.339 ms (00:39.205)

set enable_batch_hashagg = on;
explain (costs off,analyze,verbose)
select sum(id),txt from gtest group by grouping sets(id,txt,());
QUERY PLAN
-------------------------------------------------------------------------------------------------
Gather (actual time=5931.776..14353.957 rows=2000001 loops=1)
Output: (sum(id)), txt, id
Workers Planned: 2
Workers Launched: 2
-> Parallel BatchHashAggregate (actual time=5920.963..13897.852 rows=666667 loops=3)
Output: sum(id), txt, id
Group Key: gtest.id
Group Key: ()
Group Key: gtest.txt
Worker 0: actual time=5916.370..14062.461 rows=513810 loops=1
Worker 1: actual time=5916.037..13932.847 rows=775901 loops=1
-> Parallel Seq Scan on public.gtest (actual time=0.399..688.273 rows=3333333 loops=3)
Output: id, txt
Worker 0: actual time=0.052..690.955 rows=3349990 loops=1
Worker 1: actual time=0.050..691.595 rows=3297070 loops=1
Planning Time: 0.157 ms
Execution Time: 14598.416 ms
(17 rows)

Time: 14599.437 ms (00:14.599)
```

I have done some performance testing with TPC-H to see the impact on
the query plans. There are plan changes across many queries, but a few
of them, notably query 13 and query 17, show a noticeable gain from
these patches (I have attached the plans for these two queries).

Test details:
----------------
TPCH scale factor 50 (database size 112GB)
work_mem: 20GB, shared_buffers: 20GB, max_parallel_workers_per_gather: 4

Machine information:
Architecture: x86_64
CPU(s): 56
Thread(s) per core: 2
Core(s) per socket: 14
Socket(s): 2
NUMA node(s): 2
Model name: Intel(R) Xeon(R) CPU E5-2695 v3 @ 2.30GHz

Observation:
In the TPC-H test, I have noticed that the major gain from this patch
comes from using parallelism where we previously could not, due to the
limitations of parallel aggregate. Currently, computing the final
aggregated result breaks parallelism: each worker performs only a
partial aggregate, after which we must gather all the partially
aggregated results and perform the finalize aggregate in the leader.
With this patch, because the tuples are batched by group key, the
final aggregate can be computed within the workers themselves, which
enables parallelism in more cases.
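The property that makes this work can be illustrated with a small
simulation (plain Python, not code from the patch; all names here are
made up for illustration): because every tuple of a group hashes to the
same batch, each batch contains only whole groups, so a worker that owns
a batch can produce final aggregate values with no gather/finalize step.

```python
from collections import defaultdict

def batch_aggregate(rows, n_batches):
    # Step 1: route each (key, value) row to a batch by hash of its
    # group key.  All rows of one group land in the same batch, so a
    # batch holds complete groups and can be aggregated to a *final*
    # result on its own.
    batches = [defaultdict(int) for _ in range(n_batches)]
    for key, value in rows:
        batches[hash(key) % n_batches][key] += value
    # Step 2: each batch is processed independently (one per worker);
    # concatenating the per-batch results needs no finalize step,
    # because batches never share a group key.
    result = {}
    for b in batches:
        result.update(b)
    return result

# Sanity check against a straightforward serial aggregation.
rows = [("txt%d" % (i % 5), i) for i in range(100)]
serial = defaultdict(int)
for k, v in rows:
    serial[k] += v
assert batch_aggregate(rows, 8) == dict(serial)
```

The partial-aggregate path, by contrast, must merge per-worker partial
states for the same key, which is exactly the finalize step the batching
approach avoids.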

Example:
In the plan for query 13 on head (13.explain_head.out), the subquery
performs an aggregate and the outer query groups on the aggregated
value of the subquery. On head we do not choose a parallel plan here:
the inner aggregation has a huge number of groups, so a parallel plan
would have to transfer many tuples through the tuple queue and
serialize/deserialize that many transition values, and the outer query
needs the final aggregated results from the inner query, so
parallelism cannot be selected. With the batch
aggregate (13.explain_patch.out), the finalize aggregation is computed
within the workers themselves, which lets parallelism continue up to
the top node. The execution time for this query drops from 238s to
57s, about 4x faster.

I will perform some more tests with different scale factors and
analyze the behavior of this.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

Attachments:

13.explain_head.outapplication/octet-stream; name=13.explain_head.outDownload
17.explain_patch.outapplication/octet-stream; name=17.explain_patch.outDownload
17.explain_head.outapplication/octet-stream; name=17.explain_head.outDownload
13.explain_patch.outapplication/octet-stream; name=13.explain_patch.outDownload
13.sqlapplication/sql; name=13.sqlDownload
17.sqlapplication/sql; name=17.sqlDownload
#15Dilip Kumar
dilipbalaut@gmail.com
In reply to: Dilip Kumar (#14)
Re: Re: parallel distinct union and aggregate support patch


I have started reviewing these patches; I have a couple of review comments.

Some general comments to make the code more readable:

1. Comments are missing from the patch; there are not even function
header comments explaining the overall idea of each function.
Adding comments will make the patch easier to review.

2. The code is not written per the Postgres coding guidelines; the
common problems observed in the patch are:
a) There should be an empty line after the variable declaration section.
b) In a function definition, the return type and the function name
should not be on the same line.

Change

+static bool ExecNextParallelBatchSort(BatchSortState *state)
{
}
to
static bool
ExecNextParallelBatchSort(BatchSortState *state)
{
}

c) When typecasting variables, spacing is not used properly and
uniformly; you can refer to existing code and fix it.

*Specific comments to patch 0001*

1.
+#define BATCH_SORT_MAX_BATCHES 512

Did you decide this number based on some experiment or is there some
analysis behind selecting this number?

2.
+BatchSortState* ExecInitBatchSort(BatchSort *node, EState *estate, int eflags)
+{
+ BatchSortState *state;
+ TypeCacheEntry *typentry;
....
+ for (i=0;i<node->numGroupCols;++i)
+ {
...
+ InitFunctionCallInfoData(*fcinfo, flinfo, 1, attr->attcollation, NULL, NULL);
+ fcinfo->args[0].isnull = false;
+ state->groupFuns = lappend(state->groupFuns, fcinfo);
+ }

From the variable naming, it appears that the batch sort depends on
the grouping node. Instead of the names numGroupCols and groupFuns, I
think we should use names more relevant to the batch sort, something
like numSortKey.

3.
+ if (eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))
+ {
+ /* for now, we only using in group aggregate */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("not support execute flag(s) %d for group sort", eflags)));
+ }

Instead of ereport, you should just put an Assert for the unsupported
flag or elog.

4.
+ state = makeNode(BatchSortState);
+ state->ps.plan = (Plan*) node;
+ state->ps.state = estate;
+ state->ps.ExecProcNode = ExecBatchSortPrepare;

I think the main executor entry function should be named ExecBatchSort
instead of ExecBatchSortPrepare, it will look more consistent with the
other executor machinery.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#16Dilip Kumar
dilipbalaut@gmail.com
In reply to: Dilip Kumar (#15)
Re: Re: parallel distinct union and aggregate support patch


1.
+void cost_batchsort(Path *path, PlannerInfo *root,
+                    List *batchkeys, Cost input_cost,
+                    double tuples, int width,
+                    Cost comparison_cost, int sort_mem,
+                    uint32 numGroupCols, uint32 numBatches)
+{
+    Cost        startup_cost = input_cost;
+    Cost        run_cost = 0;
+    double        input_bytes = relation_byte_size(tuples, width);
+    double        batch_bytes = input_bytes / numBatches;
+    double        batch_tuples = tuples / numBatches;
+    long        sort_mem_bytes = sort_mem * 1024L;
+
+    if (sort_mem_bytes < (64*1024))
+        sort_mem_bytes = (64*1024);
+
+    if (!enable_batch_sort)
+        startup_cost += disable_cost;

You don't need to write a duplicate function for this; you can reuse
the cost_tuplesort function with some minor changes.
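For intuition, the comparison-cost term a batched sort changes can be
sketched as follows (plain Python; this is the standard N log N
tuplesort CPU term, not the patch's actual formula, and the function
names are made up):

```python
import math

def sort_compare_cost(tuples, comparison_cost=1.0):
    # tuplesort-style CPU cost: roughly N * log2(N) comparisons
    return tuples * math.log2(max(tuples, 2.0)) * comparison_cost

def batch_sort_compare_cost(tuples, n_batches, comparison_cost=1.0):
    # Each batch sorts only tuples/n_batches rows; sum over batches.
    per_batch = tuples / n_batches
    return n_batches * sort_compare_cost(per_batch, comparison_cost)

# Splitting 10M rows into 512 batches removes log2(512) = 9 levels of
# comparisons from every tuple, so the batched total is strictly lower.
assert batch_sort_compare_cost(1e7, 512) < sort_compare_cost(1e7)
```

The disk-spill and startup terms would of course need the same
per-batch treatment, which is why reusing cost_tuplesort per batch is
attractive.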

2. One more suggestion: currently the batches are picked up by the
workers dynamically, and the benefit is that the work distribution is
quite flexible. One downside I see is extending this parallelism to an
upper node, for example a merge join with a BatchSort node on each
side. If a worker picks batches dynamically, it must pick the same
batch on both sides, so the right-side node would need to know which
batch was picked on the left side, and we might have to introduce a
different join node, say BatchWiseMergeJoin, for that. Whereas if we
assign batches by worker number, each sort node can be processed
independently, without knowing what is happening on the other side.
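The static alternative suggested here can be sketched in a few lines
(plain Python, illustrative only): with a round-robin mapping, two
BatchSort nodes under a join derive the same batch list for a given
worker with no shared state at all.

```python
def batches_for_worker(worker_id, n_workers, n_batches):
    # Static round-robin assignment: worker w handles batches
    # w, w + n_workers, w + 2*n_workers, ...
    # Both sides of a join compute this mapping independently and
    # agree on it, since it depends only on the worker number.
    return list(range(worker_id, n_batches, n_workers))

# Every batch is claimed by exactly one worker: full coverage, no overlap.
n_workers, n_batches = 4, 512
claimed = sorted(b for w in range(n_workers)
                 for b in batches_for_worker(w, n_workers, n_batches))
assert claimed == list(range(n_batches))
```

The trade-off is the one noted above: a static mapping cannot
rebalance skewed batches across workers the way dynamic picking can.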

3. I have also done some performance tests, especially with small
group sizes: the cases where parallel aggregate is currently not
picked because the groups are small, but which become parallelizable
with the new patch.

Setup: I have used the TPC-H database at scale factor 50 and executed
an aggregation query on the ORDERS table.

Number of rows in order table: 75000000
Total table size: 18 GB

Work_mem: 10GB

postgres=# explain (analyze, verbose) select sum(o_totalprice) from
orders group by o_custkey;

QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------
HashAggregate (cost=2506201.00..2570706.04 rows=5160403 width=40)
(actual time=94002.681..98733.002 rows=4999889 loops=1)
Output: sum(o_totalprice), o_custkey
Group Key: orders.o_custkey
Batches: 1 Memory Usage: 2228241kB
-> Seq Scan on public.orders (cost=0.00..2131201.00 rows=75000000
width=16) (actual time=0.042..12930.981 rows=75000000 loops=1)
Output: o_orderkey, o_custkey, o_orderstatus, o_totalprice,
o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment
Planning Time: 0.317 ms
Execution Time: 99230.242 ms

postgres=# set enable_batch_sort = on;
SET
postgres=# explain (analyze, verbose) select sum(o_totalprice) from orders group by o_custkey;
                                                                      QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------
 Gather  (cost=1616576.00..1761358.55 rows=40316 width=40) (actual time=18516.549..28811.164 rows=4999889 loops=1)
   Output: (sum(o_totalprice)), o_custkey
   Workers Planned: 4
   Workers Launched: 4
   ->  GroupAggregate  (cost=1615576.00..1756326.99 rows=10079 width=40) (actual time=18506.051..28131.650 rows=999978 loops=5)
         Output: sum(o_totalprice), o_custkey
         Group Key: orders.o_custkey
         Worker 0:  actual time=18502.746..28406.868 rows=995092 loops=1
         Worker 1:  actual time=18502.339..28518.559 rows=1114511 loops=1
         Worker 2:  actual time=18503.233..28461.975 rows=985574 loops=1
         Worker 3:  actual time=18506.026..28409.130 rows=1005414 loops=1
         ->  Parallel BatchSort  (cost=1615576.00..1662451.00 rows=18750000 width=16) (actual time=18505.982..21839.567 rows=15000000 loops=5)
               Output: o_custkey, o_totalprice
               Sort Key: orders.o_custkey
               batches: 512
               Worker 0:  actual time=18502.666..21945.442 rows=14925544 loops=1
               Worker 1:  actual time=18502.270..21979.350 rows=16714443 loops=1
               Worker 2:  actual time=18503.144..21933.151 rows=14784292 loops=1
               Worker 3:  actual time=18505.950..21943.312 rows=15081559 loops=1
               ->  Parallel Seq Scan on public.orders  (cost=0.00..1568701.00 rows=18750000 width=16) (actual time=0.082..4662.390 rows=15000000 loops=5)
                     Output: o_custkey, o_totalprice
                     Worker 0:  actual time=0.079..4720.424 rows=15012981 loops=1
                     Worker 1:  actual time=0.083..4710.919 rows=15675399 loops=1
                     Worker 2:  actual time=0.082..4663.096 rows=14558663 loops=1
                     Worker 3:  actual time=0.104..4625.940 rows=14496910 loops=1
 Planning Time: 0.281 ms
 Execution Time: 29504.248 ms

postgres=# set enable_batch_hashagg = on;
postgres=# set enable_batch_sort = off;
postgres=# explain (analyze, verbose) select sum(o_totalprice) from orders group by o_custkey;
                                                              QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------
 Gather  (cost=1755004.00..2287170.56 rows=5160403 width=40) (actual time=12935.338..27064.962 rows=4999889 loops=1)
   Output: (sum(o_totalprice)), o_custkey
   Workers Planned: 4
   Workers Launched: 4
   ->  Parallel BatchHashAggregate  (cost=1754004.00..1770130.26 rows=1290101 width=40) (actual time=12987.830..24726.348 rows=999978 loops=5)
         Output: sum(o_totalprice), o_custkey
         Group Key: orders.o_custkey
         Worker 0:  actual time=13013.228..25078.902 rows=999277 loops=1
         Worker 1:  actual time=12917.375..25456.751 rows=1100607 loops=1
         Worker 2:  actual time=13041.088..24022.445 rows=900562 loops=1
         Worker 3:  actual time=13032.732..25230.101 rows=1001386 loops=1
         ->  Parallel Seq Scan on public.orders  (cost=0.00..1568701.00 rows=18750000 width=16) (actual time=0.059..2764.881 rows=15000000 loops=5)
               Output: o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment
               Worker 0:  actual time=0.056..2754.621 rows=14924063 loops=1
               Worker 1:  actual time=0.063..2815.688 rows=16241825 loops=1
               Worker 2:  actual time=0.067..2750.927 rows=14064529 loops=1
               Worker 3:  actual time=0.055..2753.620 rows=14699841 loops=1
 Planning Time: 0.209 ms
 Execution Time: 27728.363 ms
(19 rows)

I think both the parallel batch-wise group aggregate and the
batch-wise hash aggregate give a very large improvement when the
typical group size is small.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#17Heikki Linnakangas
heikki.linnakangas@enterprisedb.com
In reply to: Dilip Kumar (#15)
Re: parallel distinct union and aggregate support patch

I also had a quick look at the patch and the comments made so far. Summary:

1. The performance results are promising.

2. The code needs comments.

Regarding the design:

Thomas Munro mentioned the idea of a "Parallel Repartition" node that
would redistribute tuples like this. As I understand it, the difference
is that this BatchSort implementation collects all tuples in a tuplesort
or a tuplestore, while a Parallel Repartition node would just
redistribute the tuples to the workers, without buffering. The receiving
worker could put the tuples to a tuplestore or sort if needed.

I think a non-buffering Repartition node would be simpler, and thus
better. In these patches you have a BatchSort node and a batchstore,
but a simple Parallel Repartition node could do both. For example, to
implement distinct:

Gather
  -> Unique
    -> Sort
      -> Parallel Redistribute
        -> Parallel Seq Scan

And a Hash Agg would look like this:

Gather
  -> Hash Agg
    -> Parallel Redistribute
      -> Parallel Seq Scan

I'm marking this as Waiting on Author in the commitfest.

- Heikki

#18Robert Haas
robertmhaas@gmail.com
In reply to: Heikki Linnakangas (#17)
Re: parallel distinct union and aggregate support patch

On Fri, Nov 27, 2020 at 10:55 AM Heikki Linnakangas <hlinnaka@iki.fi> wrote:

I think a non-buffering Repartition node would be simpler, and thus
better. In these patches you have a BatchSort node and a batchstore,
but a simple Parallel Repartition node could do both. For example, to
implement distinct:

Gather
  -> Unique
    -> Sort
      -> Parallel Redistribute
        -> Parallel Seq Scan

And a Hash Agg would look like this:

Gather
  -> Hash Agg
    -> Parallel Redistribute
      -> Parallel Seq Scan

I'm marking this as Waiting on Author in the commitfest.

I'm also intrigued by the parallel redistribute operator -- it seems
like it might be more flexible than this approach. However, I'm
concerned that there may be deadlock risks. If there is no buffer, or
a fixed-size buffer, the buffer might be full, and a process trying to
jam tuples into the parallel redistribute would have to wait. Now if A
can wait for B and at the same time B can wait for A, deadlock will
ensue. In a naive implementation, this could happen with a single
parallel redistribute operator: worker 1 is trying to send a tuple to
worker 2, which can't receive it because it's busy sending a tuple to
worker 1. That could probably be fixed by arranging for workers to try
to receive data whenever they block in the middle of sending data.
However, in general there can be multiple nodes that cause waiting in
the tree: any number of Parallel Redistribute nodes, plus a Gather,
plus maybe other stuff. The cheap way out of that problem is to use a
buffer that can grow arbitrarily large, but that's not terribly
satisfying either.

--
Robert Haas
EDB: http://www.enterprisedb.com

#19Dilip Kumar
dilipbalaut@gmail.com
In reply to: Heikki Linnakangas (#17)
Re: parallel distinct union and aggregate support patch

On Fri, Nov 27, 2020 at 9:25 PM Heikki Linnakangas <hlinnaka@iki.fi> wrote:

I also had a quick look at the patch and the comments made so far. Summary:

1. The performance results are promising.

2. The code needs comments.

Regarding the design:

Thomas Munro mentioned the idea of a "Parallel Repartition" node that
would redistribute tuples like this. As I understand it, the difference
is that this BatchSort implementation collects all tuples in a tuplesort
or a tuplestore, while a Parallel Repartition node would just
redistribute the tuples to the workers, without buffering.

I think the advantage of the "Parallel BatchSort" is that it gives
workers the flexibility to pick batches dynamically after the
repartition. OTOH, if we distribute batches directly by worker number,
the advantage is that the operator becomes quite flexible: e.g., to
implement a merge join we can just place a "Parallel Repartition" node
above both scan nodes, and we simply get a batch-wise merge join
because each worker knows its batch. Whereas if we allow workers to
pick batches dynamically, the right-side node needs to know which
batch was picked; that is not especially difficult, since it is the
same worker, but it seems less flexible.

The receiving worker could put the tuples to a tuplestore or sort if needed.

If we are using it without buffering, then the sending worker can
directly put the tuple into the respective sort/tuplestore node.

I think a non-buffering Repartition node would be simpler, and thus
better. In these patches you have a BatchSort node and a batchstore,
but a simple Parallel Repartition node could do both. For example, to
implement distinct:

Gather
  -> Unique
    -> Sort
      -> Parallel Redistribute
        -> Parallel Seq Scan

And a Hash Agg would look like this:

Gather
  -> Hash Agg
    -> Parallel Redistribute
      -> Parallel Seq Scan

I'm marking this as Waiting on Author in the commitfest.

I agree that a simple Parallel Redistribute/Repartition node would be
flexible and could do both, but I see one problem. If we use the
common operator, the Parallel Redistribute operator first uses a
tuplestore to redistribute the data per worker, and then each worker
might hit the disk again to sort its respective data. Instead, we can
sort in parallel while redistributing, so that each worker gets its
batch in the form of sorted tapes.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

#20bucoo@sohu.com
bucoo@sohu.com
In reply to: bucoo@sohu.com (#1)
Re: Re: parallel distinct union and aggregate support patch

1.
+#define BATCH_SORT_MAX_BATCHES 512

Did you decide this number based on some experiment or is there some
analysis behind selecting this number?

When there are too few batches, a single process that works too slowly
causes an unbalanced load. When there are too many batches, files are
opened and closed frequently.

2.
+BatchSortState* ExecInitBatchSort(BatchSort *node, EState *estate, int eflags)
+{
+ BatchSortState *state;
+ TypeCacheEntry *typentry;
....
+ for (i = 0; i < node->numGroupCols; ++i)
+ {
...
+ InitFunctionCallInfoData(*fcinfo, flinfo, 1, attr->attcollation, NULL, NULL);
+ fcinfo->args[0].isnull = false;
+ state->groupFuns = lappend(state->groupFuns, fcinfo);
+ }

From the variable naming, it appeared like the batch sort is dependent
upon the grouping node. I think instead of using the name
numGroupCols and groupFuns we need to use names that are more relevant
to the batch sort something like numSortKey.

Not all data types support both sorting and hashing calculations, for
example user-defined data types. When we batch, we do not need all
columns to support hash calculation, so I used two variables.

3.
+ if (eflags & (EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))
+ {
+ /* for now, we only using in group aggregate */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("not support execute flag(s) %d for group sort", eflags)));
+ }

Instead of ereport, you should just put an Assert for the unsupported
flag or elog.

In fact, this is an unfinished feature: BatchSort should eventually support these flags too. Contributions to complete it are welcome.

4.
+ state = makeNode(BatchSortState);
+ state->ps.plan = (Plan*) node;
+ state->ps.state = estate;
+ state->ps.ExecProcNode = ExecBatchSortPrepare;

I think the main executor entry function should be named ExecBatchSort
instead of ExecBatchSortPrepare, it will look more consistent with the
other executor machinery.

The job of ExecBatchSortPrepare is to preprocess the data (batching
and pre-sorting); when that work is done, it calls
"ExecSetExecProcNode(pstate, ExecBatchSort)" so that subsequent calls
return data through ExecBatchSort. There is another advantage to
splitting it into two functions: we do not have to check on every call
whether the tuplesort is ready yet, which is a subtle performance
improvement, and I think the code is clearer this way.

#21bucoo@sohu.com
bucoo@sohu.com
In reply to: bucoo@sohu.com (#1)
#22David Steele
david@pgmasters.net
In reply to: bucoo@sohu.com (#21)
#23bucoo@sohu.com
bucoo@sohu.com
In reply to: bucoo@sohu.com (#1)
#24David Rowley
dgrowleyml@gmail.com
In reply to: bucoo@sohu.com (#23)
#25bucoo@sohu.com
bucoo@sohu.com
In reply to: bucoo@sohu.com (#1)
#26bucoo@sohu.com
bucoo@sohu.com
In reply to: bucoo@sohu.com (#1)
#27Daniel Gustafsson
daniel@yesql.se
In reply to: David Steele (#22)