FDW and parallel execution

Started by Konstantin Knizhnik about 9 years ago, 5 messages, hackers
#1Konstantin Knizhnik
k.knizhnik@postgrespro.ru

Hi hackers, and Robert personally (you are the best expert in both areas).
I want to ask one more question concerning parallel execution and FDW.
Below are two plans for the same query (TPC-H Q5): one for normal tables, the other for an FDW over a vertical representation of the same data.
The FDW supports the analyze function and is expected to produce statistics similar to those of the original tables.

Query plans are the following:

Normal tables:

Sort  (cost=2041588.48..2041588.98 rows=200 width=48)
  Sort Key: (sum((lineitem.l_extendedprice * ('1'::double precision - lineitem.l_discount)))) DESC
  ->  Finalize GroupAggregate  (cost=2041335.76..2041580.83 rows=200 width=48)
        Group Key: nation.n_name
        ->  Gather Merge  (cost=2041335.76..2041568.33 rows=1400 width=48)
              Workers Planned: 7
              ->  Partial GroupAggregate  (cost=2040335.64..2040396.71 rows=200 width=48)
                    Group Key: nation.n_name
                    ->  Sort  (cost=2040335.64..2040345.49 rows=3938 width=40)
                          Sort Key: nation.n_name
                          ->  Hash Join  (cost=605052.97..2040100.48 rows=3938 width=40)
                                Hash Cond: ((orders.o_custkey = customer.c_custkey) AND (nation.n_nationkey = customer.c_nationkey))
                                ->  Hash Join  (cost=525126.37..1951647.85 rows=98414 width=52)
                                      Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                      ->  Hash Join  (cost=3802.22..1404473.37 rows=654240 width=52)
                                            Hash Cond: (lineitem.l_suppkey = supplier.s_suppkey)
                                            ->  Parallel Seq Scan on lineitem  (cost=0.00..1361993.36 rows=8569436 width=16)
                                            ->  Hash  (cost=3705.97..3705.97 rows=7700 width=44)
                                                  ->  Hash Join  (cost=40.97..3705.97 rows=7700 width=44)
                                                        Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                                        ->  Seq Scan on supplier  (cost=0.00..3090.00 rows=100000 width=8)
                                                        ->  Hash  (cost=40.79..40.79 rows=15 width=36)
                                                              ->  Hash Join  (cost=20.05..40.79 rows=15 width=36)
                                                                    Hash Cond: (nation.n_regionkey = region.r_regionkey)
                                                                    ->  Seq Scan on nation  (cost=0.00..17.70 rows=770 width=40)
                                                                    ->  Hash  (cost=20.00..20.00 rows=4 width=4)
                                                                          ->  Seq Scan on region  (cost=0.00..20.00 rows=4 width=4)
                                                                                Filter: ((r_name)::text = 'ASIA'::text)
                                      ->  Hash  (cost=484302.37..484302.37 rows=2256542 width=8)
                                            ->  Seq Scan on orders  (cost=0.00..484302.37 rows=2256542 width=8)
                                                  Filter: ((o_orderdate >= '1996-01-01'::date) AND (o_orderdate < '1997-01-01'::date))
                                ->  Hash  (cost=51569.64..51569.64 rows=1499864 width=8)
                                      ->  Seq Scan on customer  (cost=0.00..51569.64 rows=1499864 width=8)

Plan with FDW:

Sort  (cost=2337312.28..2337312.78 rows=200 width=48)
  Sort Key: (sum((lineitem_fdw.l_extendedprice * ('1'::double precision - lineitem_fdw.l_discount)))) DESC
  ->  GroupAggregate  (cost=2336881.54..2337304.64 rows=200 width=48)
        Group Key: nation.n_name
        ->  Sort  (cost=2336881.54..2336951.73 rows=28073 width=40)
              Sort Key: nation.n_name
              ->  Hash Join  (cost=396050.65..2334807.39 rows=28073 width=40)
                    Hash Cond: ((orders_fdw.o_custkey = customer_fdw.c_custkey) AND (nation.n_nationkey = customer_fdw.c_nationkey))
                    ->  Hash Join  (cost=335084.53..2247223.46 rows=701672 width=52)
                          Hash Cond: (lineitem_fdw.l_orderkey = orders_fdw.o_orderkey)
                          ->  Hash Join  (cost=2887.07..1786058.18 rows=4607421 width=52)
                                Hash Cond: (lineitem_fdw.l_suppkey = supplier_fdw.s_suppkey)
                                ->  Foreign Scan on lineitem_fdw  (cost=0.00..1512151.52 rows=59986176 width=16)
                                ->  Hash  (cost=2790.80..2790.80 rows=7702 width=44)
                                      ->  Hash Join  (cost=40.97..2790.80 rows=7702 width=44)
                                            Hash Cond: (supplier_fdw.s_nationkey = nation.n_nationkey)
                                            ->  Foreign Scan on supplier_fdw  (cost=0.00..2174.64 rows=100032 width=8)
                                            ->  Hash  (cost=40.79..40.79 rows=15 width=36)
                                                  ->  Hash Join  (cost=20.05..40.79 rows=15 width=36)
                                                        Hash Cond: (nation.n_regionkey = region.r_regionkey)
                                                        ->  Seq Scan on nation  (cost=0.00..17.70 rows=770 width=40)
                                                        ->  Hash  (cost=20.00..20.00 rows=4 width=4)
                                                              ->  Seq Scan on region  (cost=0.00..20.00 rows=4 width=4)
                                                                    Filter: ((r_name)::text = 'ASIA'::text)
                          ->  Hash  (cost=294718.76..294718.76 rows=2284376 width=8)
                                ->  Foreign Scan on orders_fdw  (cost=0.00..294718.76 rows=2284376 width=8)
                    ->  Hash  (cost=32605.64..32605.64 rows=1500032 width=8)
                          ->  Foreign Scan on customer_fdw  (cost=0.00..32605.64 rows=1500032 width=8)

The plans look very similar, but the first one is parallel and the second is not.
My FDW provides an implementation of IsForeignScanParallelSafe which returns true.
I wonder what can prevent the optimizer from using a parallel plan in this case?

Thanks in advance,

--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

#2Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Konstantin Knizhnik (#1)
Re: FDW and parallel execution

Hi,

At Sun, 02 Apr 2017 16:30:24 +0300, Konstantin Knizhnik <k.knizhnik@postgrespro.ru> wrote in <58E0FCF0.2070603@postgrespro.ru>

> Hi hackers, and Robert personally (you are the best expert in both
> areas).
> I want to ask one more question concerning parallel execution and FDW.
> Below are two plans for the same query (TPC-H Q5): one for normal
> tables, the other for an FDW over a vertical representation of the
> same data.
> The FDW supports the analyze function and is expected to produce
> statistics similar to those of the original tables.

<big explain>

> The plans look very similar, but the first one is parallel and the
> second is not.
> My FDW provides an implementation of IsForeignScanParallelSafe which
> returns true.
> I wonder what can prevent the optimizer from using a parallel plan in
> this case?

Parallel execution requires partial paths. Creating them is the job of
your FDW's GetForeignPaths.

regards,

--
Kyotaro Horiguchi
NTT Open Source Software Center

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#3Konstantin Knizhnik
k.knizhnik@postgrespro.ru
In reply to: Kyotaro Horiguchi (#2)
Re: FDW and parallel execution

On 04.04.2017 13:29, Kyotaro HORIGUCHI wrote:

> Hi,
>
> At Sun, 02 Apr 2017 16:30:24 +0300, Konstantin Knizhnik <k.knizhnik@postgrespro.ru> wrote in <58E0FCF0.2070603@postgrespro.ru>
>
> > Hi hackers, and Robert personally (you are the best expert in both
> > areas).
> > I want to ask one more question concerning parallel execution and FDW.
> > Below are two plans for the same query (TPC-H Q5): one for normal
> > tables, the other for an FDW over a vertical representation of the
> > same data.
> > The FDW supports the analyze function and is expected to produce
> > statistics similar to those of the original tables.
>
> <big explain>
>
> > The plans look very similar, but the first one is parallel and the
> > second is not.
> > My FDW provides an implementation of IsForeignScanParallelSafe which
> > returns true.
> > I wonder what can prevent the optimizer from using a parallel plan in
> > this case?
>
> Parallel execution requires partial paths. Creating them is the job of
> your FDW's GetForeignPaths.

Thank you very much for the explanation.
But unfortunately I still do not completely understand what kinds of
queries allow parallel execution with an FDW.

The section "FDW Routines for Parallel Execution" of the FDW documentation says:

A ForeignScan node can, optionally, support parallel execution. A
parallel ForeignScan will be executed in multiple processes and should
return each row only once across all cooperating processes. To do
this, processes can coordinate through fixed size chunks of dynamic
shared memory. This shared memory is not guaranteed to be mapped at
the same address in every process, so pointers may not be used. The
following callbacks are all optional in general, but required if
parallel execution is to be supported.
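
The coordination requirement quoted above can be illustrated with a small standalone model. This is only a sketch and not PostgreSQL code: the names DemoSharedScan, demo_shared_scan_init and demo_claim_chunk are hypothetical, and the block assumes the FDW can address its data in numbered chunks. The shared state holds integers only, so it does not matter at which address each process maps it, and an atomic counter hands every chunk to exactly one participant.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical shared scan state.  It contains plain integers and no
 * pointers, so it stays valid whatever address the segment is mapped at.
 */
typedef struct DemoSharedScan
{
    uint64_t         nchunks;     /* total number of chunks to scan */
    _Atomic uint64_t next_chunk;  /* next chunk nobody has claimed yet */
} DemoSharedScan;

/* Run once by the leader, in the spirit of InitializeDSMForeignScan. */
static void
demo_shared_scan_init(DemoSharedScan *shared, uint64_t nchunks)
{
    shared->nchunks = nchunks;
    atomic_init(&shared->next_chunk, 0);
}

/*
 * Run by every participant each time it needs more work.  The atomic
 * fetch-and-add gives each chunk number to exactly one caller, so no
 * row is returned twice.  Returns false once everything is handed out.
 */
static bool
demo_claim_chunk(DemoSharedScan *shared, uint64_t *chunk)
{
    uint64_t claimed = atomic_fetch_add(&shared->next_chunk, 1);

    if (claimed >= shared->nchunks)
        return false;
    *chunk = claimed;
    return true;
}
```

In a real FDW the struct would presumably live in the memory sized by EstimateDSMForeignScan, with workers attaching to it in InitializeWorkerForeignScan; that mapping is an assumption of this sketch, not something the quoted section spells out.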

I provided IsForeignScanParallelSafe, EstimateDSMForeignScan,
InitializeDSMForeignScan and InitializeWorkerForeignScan in my FDW.
IsForeignScanParallelSafe returns true.
Also, in the GetForeignPaths function I created a path with
baserel->consider_parallel == true.
Is that enough, or should I do something else?

Unfortunately, I failed to find any query (sequential scan, grand
aggregation, aggregation with GROUP BY, joins...) for which a parallel
execution plan is used with this FDW.
Also, there are no examples of using these functions in the Postgres
distribution, and I failed to find any such examples on the Internet.

Can somebody please clarify my situation with parallel execution and FDW,
and maybe point to some examples?
Thanks in advance.

--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

#4Hans-Jürgen Schönig
postgres@cybertec.at
In reply to: Konstantin Knizhnik (#1)
Re: FDW and parallel execution

hello ...

did you check out antonin houska's patches?
we basically got code, which can do that.

many thanks,

hans


--
Hans-Jürgen Schönig
Cybertec Schönig & Schönig GmbH
Gröhrmühlgasse 26
A-2700 Wiener Neustadt
Web: http://www.postgresql-support.de, http://www.cybertec.at

#5Kyotaro Horiguchi
horikyota.ntt@gmail.com
In reply to: Konstantin Knizhnik (#1)
Re: FDW and parallel execution

Sorry for the too-brief reply.

At Tue, 11 Apr 2017 20:08:46 +0300, Konstantin Knizhnik <k.knizhnik@postgrespro.ru> wrote in <94c8692a-f299-b72b-6227-270b8a9ed7ad@postgrespro.ru>

> On 04.04.2017 13:29, Kyotaro HORIGUCHI wrote:
>
> > Hi,
> >
> > At Sun, 02 Apr 2017 16:30:24 +0300, Konstantin Knizhnik
> > <k.knizhnik@postgrespro.ru> wrote in <58E0FCF0.2070603@postgrespro.ru>
> >
> > > My FDW provides an implementation of IsForeignScanParallelSafe which
> > > returns true.
> > > I wonder what can prevent the optimizer from using a parallel plan in
> > > this case?
> >
> > Parallel execution requires partial paths. Creating them is the job of
> > your FDW's GetForeignPaths.
>
> Thank you very much for the explanation.
> But unfortunately I still do not completely understand what kinds of
> queries allow parallel execution with an FDW.

At Tue, 11 Apr 2017 19:20:04 +0200, PostgreSQL - Hans-Jürgen Schönig <postgres@cybertec.at> wrote in <0c9c101d-0fbb-1e19-f04c-7a6ec577d960@cybertec.at>

> did you check out antonin houska's patches?
> we basically got code, which can do that.

Parallel aggregation is already available. Antonin's patch is
partition-wise aggregation, which boosts the case where the partition
key is the aggregation key, I suppose. Parallel aggregation seems to
be considered when any appropriate partial path is available. (I
haven't tried anything, though.)
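
For readers unfamiliar with the Partial GroupAggregate / Finalize GroupAggregate pair in the first plan, the idea can be sketched outside PostgreSQL as follows. This is a simplified model with hypothetical names (DemoPartialSum, demo_partial_sum, demo_finalize_sum), restricted to sum() over one group: each participant aggregates only the rows its partial scan produced, and the leader combines the per-participant states.

```c
#include <stddef.h>

/* Hypothetical per-participant transition state for sum(). */
typedef struct DemoPartialSum
{
    double sum;
} DemoPartialSum;

/* "Partial" step: fold only the rows this participant's scan returned. */
static DemoPartialSum
demo_partial_sum(const double *rows, size_t nrows)
{
    DemoPartialSum state = {0.0};

    for (size_t i = 0; i < nrows; i++)
        state.sum += rows[i];
    return state;
}

/* "Finalize" step: the leader combines the states gathered from everyone. */
static double
demo_finalize_sum(const DemoPartialSum *states, size_t nstates)
{
    double total = 0.0;

    for (size_t i = 0; i < nstates; i++)
        total += states[i].sum;
    return total;
}
```

The result is only correct if the scans underneath return each row to exactly one participant, which is why the planner needs a partial path for the scan before it can consider this shape.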

set_plain_rel_pathlist() does the work for plain relations, so
what we should do in GetForeignPaths would be as follows.

- Check rel->consider_parallel (this won't be required since the FDW
knows that) and rel->lateral_relids.

- If parallel is OK, create a path with create_foreignscan_path
in the ordinary way, then change some parallel-related members as
necessary.

- Like create_plain_partial_paths(), check certain conditions and
finally add_partial_path() the created partial foreign scan path.

I haven't really done this, so I might be wrong.
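
The steps above can be modelled in a few lines of standalone C. This is a toy model and deliberately not PostgreSQL code: DemoRel, DemoPath, demo_get_foreign_paths and demo_can_use_gather are hypothetical stand-ins, and the worker count is passed in rather than computed. It only shows the control flow: the ordinary path is always added, while the partial path is added only when the relation is parallel-safe, has no lateral references, and at least one worker is worthwhile.

```c
#include <stdbool.h>

#define DEMO_MAX_PATHS 4

/* Hypothetical, heavily simplified stand-in for a planner path. */
typedef struct DemoPath
{
    bool parallel_aware;    /* does the scan divide rows among workers? */
    int  parallel_workers;  /* number of workers the path is planned for */
} DemoPath;

/* Hypothetical stand-in for the planner's per-relation state. */
typedef struct DemoRel
{
    bool     consider_parallel;  /* planner found the relation parallel-safe */
    bool     has_lateral_refs;   /* models a non-empty lateral_relids */
    int      npaths;
    DemoPath paths[DEMO_MAX_PATHS];
    int      npartial;
    DemoPath partial_paths[DEMO_MAX_PATHS];
} DemoRel;

/* Models what a GetForeignPaths callback would do under the steps above. */
static void
demo_get_foreign_paths(DemoRel *rel, int workers)
{
    /* The ordinary, non-parallel path is always offered. */
    if (rel->npaths < DEMO_MAX_PATHS)
        rel->paths[rel->npaths++] = (DemoPath){false, 0};

    /* Step 1: stop if the planner or lateral references rule it out. */
    if (!rel->consider_parallel || rel->has_lateral_refs)
        return;

    /* Step 3's "certain conditions": no useful workers, no partial path. */
    if (workers <= 0)
        return;

    /* Steps 2 and 3: build the path with parallel members set, add it. */
    if (rel->npartial < DEMO_MAX_PATHS)
        rel->partial_paths[rel->npartial++] = (DemoPath){true, workers};
}

/* Upper planning can only place a Gather on top of a partial path. */
static bool
demo_can_use_gather(const DemoRel *rel)
{
    return rel->npartial > 0;
}
```

In this model, setting consider_parallel alone never yields a Gather, which matches the symptom reported in the thread: the flag says the scan may run in a worker, but without an entry in the partial path list there is nothing for the planner to parallelize.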

> The section "FDW Routines for Parallel Execution" of the FDW
> documentation says:
>
> A ForeignScan node can, optionally, support parallel execution. A
> parallel ForeignScan will be executed in multiple processes and should
> return each row only once across all cooperating processes. To do
> this, processes can coordinate through fixed size chunks of dynamic
> shared memory. This shared memory is not guaranteed to be mapped at
> the same address in every process, so pointers may not be used. The
> following callbacks are all optional in general, but required if
> parallel execution is to be supported.
>
> I provided IsForeignScanParallelSafe, EstimateDSMForeignScan,
> InitializeDSMForeignScan and InitializeWorkerForeignScan in my FDW.
> IsForeignScanParallelSafe returns true.
> Also, in the GetForeignPaths function I created a path with
> baserel->consider_parallel == true.
> Is that enough, or should I do something else?

Creating partial paths, I think. create_grouping_paths() requires
partial_pathlist in input_rel.

The section explains the FDW routines specially provided for
parallel execution, but it doesn't seem to mention "how to run a
parallel execution" as a whole.

> Unfortunately, I failed to find any query (sequential scan, grand
> aggregation, aggregation with GROUP BY, joins...) for which a parallel
> execution plan is used with this FDW.
> Also, there are no examples of using these functions in the Postgres
> distribution, and I failed to find any such examples on the Internet.

Maybe you're the pioneer in this area.

> Can somebody please clarify my situation with parallel execution and
> FDW, and maybe point to some examples?
> Thanks in advance.

regards,

--
Kyotaro Horiguchi
NTT Open Source Software Center
