[DESIGN] ParallelAppend

Started by Kouhei Kaigai over 10 years ago · 48 messages
#1Kouhei Kaigai
kaigai@ak.jp.nec.com

Hello,

I'm recently working/investigating on ParallelAppend feature
towards the next commit fest. Below is my design proposal.

1. Concept
----------
Its concept is quite simple; anybody might have considered it more than once.
The ParallelAppend node kicks off background worker processes to execute
its child nodes in parallel / asynchronously.
It intends to improve the performance of scanning large partitioned
tables from the standpoint of overall throughput; however, latency of
the first few hundred rows is out of scope for this project.
From the standpoint of technology trends, it primarily tries to utilize
multi-core capability within a system, but it also makes it possible to
scale out to a distributed database environment using the foreign-table
inheritance feature.
Its behavior is very similar to the Funnel node except for several
points, so we can reuse the infrastructure we have discussed at length
through the v9.5 development cycle.

2. Problems to be solved
-------------------------
Typical OLAP workloads involve joins over tons of tables and scans of large
tables which are often partitioned; the KPI is query response time, while
only a very small number of sessions are active simultaneously.
So we need to run a single query as fast as possible, even if it consumes
more computing resources than typical OLTP workloads.

The current heap scan implementation is painful when we look at its
behavior from this standpoint (how many rows we can read within a
certain time), because of its synchronous manner.
In the worst case, when a SeqScan node tries to fetch the next tuple,
heap_getnext() looks up a block in shared buffers, then ReadBuffer()
calls the storage manager to read the target block from the filesystem
if it is not in the buffers. The operating system then puts the calling
process to sleep until the required I/O completes.
Most cases are resolved at an earlier stage than this worst case;
even so, the best scenario we can expect is that the next tuple already
sits at the head of the message queue (with visibility checks already
done, of course) without falling down to the buffer manager or deeper.
If we can run multiple scans in parallel / asynchronously, the operating
system can assign CPU cores to other processes while waiting; this
eventually improves the I/O density and enables higher processing
throughput.
The Append node is an ideal point to parallelize because:
- child nodes can be physically located in different tablespaces,
  so further tuning is possible according to the system landscape.
- it can control, per subplan, whether the subplan is actually executed
  on a background worker. If the subplans contain both large and small
  tables, ParallelAppend may kick off background workers to scan the
  large tables only, and scan the small tables by itself.
- like the Funnel node, we don't need to care about enhancements to
  individual node types. SeqScan, IndexScan, ForeignScan and others
  can perform as usual, but actually in parallel.

3. Implementation
------------------
* Plan & Cost

ParallelAppend shall appear wherever Append can appear, except for its
usage as a dummy. So, I'll enhance set_append_rel_pathlist() to add
both an AppendPath and a ParallelAppendPath, with a cost for each.
The cost estimation logic will need further discussion; however,
I expect logic like the following to estimate the cost of ParallelAppend.
1. Sum startup_cost and run_cost for each child pathnode, but
   distinguish between synchronous and asynchronous children.
   A pathnode whose total cost is less than
   (parallel_setup_cost + its total cost / parallel_append_degree
    + number of rows * cpu_tuple_comm_cost)
   is nonsense to run on a background worker, so it stays synchronous.
2. parallel_setup_cost * (# of asynchronous nodes) is added to the
   sum of the startup_cost of the asynchronous nodes.
3. The sum of the run_cost of the asynchronous nodes is divided by
   parallel_append_degree, then cpu_tuple_comm_cost * (total # of
   rows from asynchronous nodes) is added.
4. The synchronous and asynchronous costs are added together; that
   becomes the cost of ParallelAppend.
Obviously, this stands on the viewpoint that cost reflects the response
time of the underlying plan. So the cost of ParallelAppend can be smaller
than the sum of its underlying child nodes.
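
As a sanity check on the arithmetic above, a minimal self-contained sketch
of these four steps in C might look as follows. This is not planner code:
ChildCost and cost_parallel_append_sketch are names invented here, and the
GUC-like inputs (parallel_setup_cost, cpu_tuple_comm_cost,
parallel_append_degree) are simply passed in as parameters.

typedef struct ChildCost
{
    double      startup_cost;
    double      run_cost;
    double      rows;
} ChildCost;

static void
cost_parallel_append_sketch(const ChildCost *child, int nchildren,
                            double parallel_setup_cost,
                            double cpu_tuple_comm_cost,
                            int parallel_append_degree,
                            double *startup_cost, double *total_cost)
{
    double      sync_startup = 0.0;
    double      sync_run = 0.0;
    double      async_startup = 0.0;
    double      async_run = 0.0;
    double      async_rows = 0.0;
    int         num_async = 0;
    int         i;

    for (i = 0; i < nchildren; i++)
    {
        double      child_total = child[i].startup_cost + child[i].run_cost;

        /* Step 1: a child cheaper than the parallel overhead stays synchronous */
        double      async_estimate = parallel_setup_cost
            + child_total / parallel_append_degree
            + child[i].rows * cpu_tuple_comm_cost;

        if (child_total <= async_estimate)
        {
            sync_startup += child[i].startup_cost;
            sync_run += child[i].run_cost;
        }
        else
        {
            async_startup += child[i].startup_cost;
            async_run += child[i].run_cost;
            async_rows += child[i].rows;
            num_async++;
        }
    }

    /* Step 2: one worker-setup cost per asynchronous child */
    async_startup += parallel_setup_cost * num_async;

    /* Step 3: asynchronous children run concurrently, plus tuple transfer */
    async_run = async_run / parallel_append_degree
        + async_rows * cpu_tuple_comm_cost;

    /* Step 4: synchronous and asynchronous portions together */
    *startup_cost = sync_startup + async_startup;
    *total_cost = *startup_cost + sync_run + async_run;
}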

* Execution

Like the Funnel node, it kicks off background workers in the ExecProcNode
handler, so its startup may happen later than in Fujita-san's approach if
the first call of ParallelAppend is delayed; for example, when
ParallelAppend sits under a HashJoin whose inner Hash loads billions of
rows. Even though I expect ExecParallelAppend to use, at least, simple
round-robin scheduling like funnel_getnext(), we may prefer to pull from
the synchronous nodes rather than the asynchronous ones just after the
background workers start up.

4. Further challenges
----------------------
* Serialization of CustomScan via outfuncs.c/readfuncs.c
Because the methods field is basically a set of per-process pointers,
we need an infrastructure to reproduce the same method table on the
background worker process, identified by name.
(I will also try to design it.)

* Duplication of parallelism
If Funnel+PartialSeqScan is located under ParallelAppend, directly
or indirectly, it eventually leads a background worker process to launch
other background workers. Is that an expected usage of the current
background worker infrastructure?

* Join pushdown
Distributing nested-loop and hash joins may benefit from parallel
processing, and from a reduction of the hash table size if the CHECK()
constraints of the individual partitioned tables tell us which rows
obviously cannot be joined.
Also see the thread:
[idea] table partition + hash join: http://bit.ly/1S2xpHT
My colleague has already started to investigate / develop this feature
based on the existing Append, to reduce num_batches.

As an aside, my GpuJoin feature works most effectively if the entire inner
relation can be loaded into a hash table on GPU RAM, so such features are
very welcome.

* Sort break-down
If a merge join tries to have a ParallelAppend node on its left or right
input, we may be able to compare its cost with MergeParallelAppend + Sort
on the partial relations.

* Aggregate Push Down
It is exactly what I want to do.

Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>


#2Kyotaro HORIGUCHI
horiguchi.kyotaro@lab.ntt.co.jp
In reply to: Kouhei Kaigai (#1)
Re: [DESIGN] ParallelAppend

Hello, can I ask some questions?

I suppose we can take this as the analog of ParallelSeqScan. I
see not much distinction between Append(ParallelSeqScan) and
ParallelAppend(SeqScan). What difference is there between them?

If other nodes will have the same functionality, as you mention at
the end of this proposal, it might be better for some part of
this feature to be implemented as part of the existing executor
itself rather than as a dedicated additional node, just as my
asynchronous FDW execution patch partially does. (Although it
lacks the planner part and bgworker launching..) In that case,
it might be better to modify ExecProcNode so that it supports
both in-process and inter-bgworker cases through a single API.

What do you think about this?

regards,

--
Kyotaro Horiguchi
NTT Open Source Software Center


#3Amit Kapila
amit.kapila16@gmail.com
In reply to: Kouhei Kaigai (#1)
Re: [DESIGN] ParallelAppend

On Sun, Jul 26, 2015 at 8:43 AM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

3. Implementation
------------------
* Plan & Cost

ParallelAppend shall appear wherever Append can appear, except for its
usage as a dummy. So, I'll enhance set_append_rel_pathlist() to add
both an AppendPath and a ParallelAppendPath, with a cost for each.

Is there a real need to have a new node like ParallelAppendPath?
Can't we have a Funnel node beneath the Append node, and then each
worker will be responsible for a SeqScan on each inherited child
relation? Something like:

Append
 --> Funnel
      --> SeqScan rel1
      --> SeqScan rel2

With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

#4Kouhei Kaigai
kaigai@ak.jp.nec.com
In reply to: Kyotaro HORIGUCHI (#2)
Re: [DESIGN] ParallelAppend

Hello, can I ask some questions?

I suppose we can take this as the analog of ParallelSeqScan. I
see not much distinction between Append(ParallelSeqScan) and
ParallelAppend(SeqScan). What difference is there between them?

Append does not start executing the second or later nodes until the
first node reaches the end of its scan.
On the other hand, ParallelAppend will kick off all the child nodes
(almost) simultaneously.

If other nodes will have the same functionality, as you mention at
the end of this proposal, it might be better for some part of
this feature to be implemented as part of the existing executor
itself rather than as a dedicated additional node, just as my
asynchronous FDW execution patch partially does. (Although it
lacks the planner part and bgworker launching..) In that case,
it might be better to modify ExecProcNode so that it supports
both in-process and inter-bgworker cases through a single API.

What do you think about this?

Its downside is that we would need to adjust all the existing nodes to
follow the new executor capability. At the moment, we have 38 node
types derived from Plan. I think it is not an easy job to review a
patch that touches several dozen files.

Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>


#5Kouhei Kaigai
kaigai@ak.jp.nec.com
In reply to: Kouhei Kaigai (#4)
Re: [DESIGN] ParallelAppend


Is there a real need to have a new node like ParallelAppendPath?
Can't we have a Funnel node beneath the Append node, and then each
worker will be responsible for a SeqScan on each inherited child
relation? Something like:

Append
 --> Funnel
      --> SeqScan rel1
      --> SeqScan rel2

If Funnel can handle both horizontal and vertical parallelism, it is
a great simplification. I don't insist on a new node.

Once Funnel gets the capability to have multiple child nodes, the
Append node above will probably be gone. I expect set_append_rel_pathlist()
to add two paths based on Append and Funnel, then the planner will choose
the cheaper one according to its cost.

We will need to pay attention to other issues that arise when Funnel
kicks background workers towards asymmetric relations.

If the row counts of the individual child nodes vary, we may
want to assign 10 background workers to scan rel1 with PartialSeqScan.
On the other hand, rel2 may have a very small number of rows, so
its total_cost may be smaller than the cost to launch a worker.
In this case, Funnel has child nodes to be executed asynchronously and
synchronously.

If the cheapest path of a child relation is a pair of Funnel and
PartialSeqScan, we have to avoid stacking Funnel nodes. Probably, a
Funnel node that performs like Append needs to pull up the underlying
Funnels and assign an equivalent number of workers, as follows.

Append
 --> Funnel
      --> PartialSeqScan on rel1 (num_workers = 4)
 --> Funnel
      --> PartialSeqScan on rel2 (num_workers = 8)
 --> SeqScan on rel3

shall be rewritten to

Funnel
 --> PartialSeqScan on rel1 (num_workers = 4)
 --> PartialSeqScan on rel2 (num_workers = 8)
 --> SeqScan on rel3 (num_workers = 1)
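
For illustration only, the pull-up could be shaped roughly like the sketch
below. PathSketch, funnel_children, funnel_num_workers and
funnel_absorb_child are names invented for this sketch (the real patch may
represent the Funnel path differently); it only shows the flattening idea.

#include "postgres.h"
#include "nodes/pg_list.h"

typedef struct PathSketch
{
    bool        is_funnel;          /* is this child itself a Funnel? */
    List       *funnel_children;    /* if so, its child paths (PathSketch *) */
    List       *funnel_num_workers; /* ... and worker count per child (int) */
    int         num_workers;        /* otherwise, workers for this plain path */
} PathSketch;

/*
 * Add one child to an Append-capable parent Funnel, flattening any
 * underlying Funnel instead of stacking Funnel nodes.
 */
static void
funnel_absorb_child(PathSketch *parent, PathSketch *child)
{
    if (child->is_funnel)
    {
        parent->funnel_children =
            list_concat(parent->funnel_children, child->funnel_children);
        parent->funnel_num_workers =
            list_concat(parent->funnel_num_workers, child->funnel_num_workers);
    }
    else
    {
        parent->funnel_children = lappend(parent->funnel_children, child);
        parent->funnel_num_workers =
            lappend_int(parent->funnel_num_workers, child->num_workers);
    }
}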

We also need to consider whether Funnel should have a capability
equivalent to MergeAppend, even though parallel sorting is
a fantastic challenge.

Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>


#6Kouhei Kaigai
kaigai@ak.jp.nec.com
In reply to: Kouhei Kaigai (#5)
Re: [DESIGN] ParallelAppend

Is there a real need to have a new node like ParallelAppendPath?
Can't we have a Funnel node beneath the Append node, and then each
worker will be responsible for a SeqScan on each inherited child
relation? Something like:

Append
 --> Funnel
      --> SeqScan rel1
      --> SeqScan rel2

If Funnel can handle both horizontal and vertical parallelism, it is
a great simplification. I don't insist on a new node.

Once Funnel gets the capability to have multiple child nodes, the
Append node above will probably be gone. I expect set_append_rel_pathlist()
to add two paths based on Append and Funnel, then the planner will choose
the cheaper one according to its cost.

In the latest v16 patch, Funnel is declared as follows:

typedef struct Funnel
{
    Scan        scan;
    int         num_workers;
} Funnel;

If we try to add the Append capability here, I expect the structure would
be adjusted as follows, for example:

typedef struct Funnel
{
    Scan        scan;
    List       *funnel_plans;
    List       *funnel_num_workers;
} Funnel;

Literally, funnel_plans saves the underlying Plan nodes instead of the
lefttree. Also, funnel_num_workers saves the number of workers expected
to be assigned to the individual child plans.

Even though create_parallelscan_paths() in v16 sets num_workers no
larger than parallel_seqscan_degree, the total number of concurrent
background workers may exceed this configuration if multiple
PartialSeqScan nodes are underneath.
It is a different configuration from max_worker_processes, so it is
not a problem as long as we have another restriction.
However, how do we control the cap on the number of worker processes per
"appendable" Funnel node? For example, a parent table may have 200
child tables but max_worker_processes is configured to 50.
It is obviously impossible to launch all the background workers
simultaneously. One idea I have is to suspend the launch of some plans
until earlier ones are completed.


--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>


#7Amit Kapila
amit.kapila16@gmail.com
In reply to: Kouhei Kaigai (#6)
Re: [DESIGN] ParallelAppend

On Tue, Jul 28, 2015 at 7:59 AM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:


In the latest v16 patch, Funnel is declared as follows:

typedef struct Funnel
{
    Scan        scan;
    int         num_workers;
} Funnel;

If we try to add the Append capability here, I expect the structure would
be adjusted as follows, for example:

typedef struct Funnel
{
    Scan        scan;
    List       *funnel_plans;
    List       *funnel_num_workers;
} Funnel;

Literally, funnel_plans saves the underlying Plan nodes instead of the
lefttree. Also, funnel_num_workers saves the number of workers expected
to be assigned to the individual child plans.

Or shall we have a node like the above and name it FunnelAppend or
AppendFunnel?

Even though create_parallelscan_paths() in v16 sets num_workers no
larger than parallel_seqscan_degree, the total number of concurrent
background workers may exceed this configuration if multiple
PartialSeqScan nodes are underneath.
It is a different configuration from max_worker_processes, so it is
not a problem as long as we have another restriction.
However, how do we control the cap on the number of worker processes per
"appendable" Funnel node? For example, a parent table may have 200
child tables but max_worker_processes is configured to 50.
It is obviously impossible to launch all the background workers
simultaneously. One idea I have is to suspend the launch of some plans
until earlier ones are completed.

Okay, but I think with that idea you need to re-launch the workers for
each new set of relation scans, which could turn out to be costly. How
about designing some way for workers, after completing their assigned
work, to check for a new set of tasks (which in this case would be to
scan a new relation) and then execute those? I think in this way we can
achieve dynamic allocation of work and maximum parallelism with the
available set of workers. We have achieved this in ParallelSeqScan by
scanning at the block level: once a worker finishes a block, it checks
for a new block to scan.

We will need to pay attention to other issues that arise when Funnel
kicks background workers towards asymmetric relations.

If the row counts of the individual child nodes vary, we may
want to assign 10 background workers to scan rel1 with PartialSeqScan.
On the other hand, rel2 may have a very small number of rows, so
its total_cost may be smaller than the cost to launch a worker.
In this case, Funnel has child nodes to be executed asynchronously and
synchronously.

I think this might turn out to be slightly tricky; for example, how do
we know how many workers are sufficient for a relation of a given size?
Another way to divide the work in this case could be in terms of
chunks of blocks: once a worker finishes its current set of blocks, it
should be able to get a new set of blocks to scan. So, let us assume we
decide on a chunk size of 32 and the total number of blocks in the whole
inheritance hierarchy is 3200; then the maximum number of workers we should
allocate to this scan is 100 (3200 / 32), and if parallel_seqscan_degree is
less than that, we can use that many workers and let them scan 32 blocks
at a time.

With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

#8Amit Langote
Langote_Amit_f8@lab.ntt.co.jp
In reply to: Kouhei Kaigai (#5)
Re: [DESIGN] ParallelAppend

KaiGai-san,

On 2015-07-27 PM 11:07, Kouhei Kaigai wrote:

Append
 --> Funnel
      --> PartialSeqScan on rel1 (num_workers = 4)
 --> Funnel
      --> PartialSeqScan on rel2 (num_workers = 8)
 --> SeqScan on rel3

shall be rewritten to

Funnel
 --> PartialSeqScan on rel1 (num_workers = 4)
 --> PartialSeqScan on rel2 (num_workers = 8)
 --> SeqScan on rel3 (num_workers = 1)

In the rewritten plan, are the respective scans (PartialSeq or Seq) on rel1,
rel2 and rel3 asynchronous w.r.t. each other? Or does each one wait for the
earlier one to finish? I would think they do not wait, because otherwise it
would not be different from the former case, right? The original premise
seems to be that the partitions rel1, rel2 and rel3 may be on different
volumes, so parallelism across volumes seems like a goal of parallelizing
Append.

From my understanding of the parallel seqscan patch, each worker's
PartialSeqScan asks for a block to scan using a shared parallel heap scan
descriptor that effectively keeps track of the division of work among
PartialSeqScans in terms of blocks. What if we invent a PartialAppend
which each worker would run in case of a parallelized Append? It would use
some kind of shared descriptor to pick a relation (Append member) to scan.
The shared structure could be the list of subplans, including a mutex for
concurrency. It doesn't sound as effective as the proposed
ParallelHeapScanDescData is for PartialSeqScan, but anything more granular
might be complicated. For example, consider a (current_relation,
current_block) pair. If there are more workers than subplans/partitions,
then multiple workers might start working on the same relation after a
round-robin assignment of relations (but of course, a later worker would
start scanning from a later block in the same relation). I imagine that
might help with parallelism across volumes, if that's the case. MergeAppend
parallelization might involve a bit more complication, but may be feasible
with a PartialMergeAppend with a slightly different kind of coordination
among workers. What do you think of such an approach?
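
For illustration only, such a shared descriptor could be shaped roughly as
below; the struct and function names are invented here, and handing out one
block at a time is just one possible granularity.

#include "postgres.h"
#include "storage/block.h"
#include "storage/spin.h"

typedef struct PartialAppendDescSketch
{
    slock_t     mutex;          /* serializes workers picking up work */
    int         nrels;          /* number of Append members */
    int         current_rel;    /* member currently being handed out */
    BlockNumber next_block;     /* next block of that member to scan */
    BlockNumber rel_nblocks[FLEXIBLE_ARRAY_MEMBER]; /* size of each member */
} PartialAppendDescSketch;

/*
 * Hand out the next (member relation, block) pair; returns false once all
 * members are exhausted.
 */
static bool
partial_append_next_block(PartialAppendDescSketch *desc,
                          int *rel, BlockNumber *block)
{
    bool        found = false;

    SpinLockAcquire(&desc->mutex);
    while (desc->current_rel < desc->nrels)
    {
        if (desc->next_block < desc->rel_nblocks[desc->current_rel])
        {
            *rel = desc->current_rel;
            *block = desc->next_block++;
            found = true;
            break;
        }
        /* this member is fully dispatched; move on to the next one */
        desc->current_rel++;
        desc->next_block = 0;
    }
    SpinLockRelease(&desc->mutex);
    return found;
}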

Thanks,
Amit


#9David Rowley
david.rowley@2ndquadrant.com
In reply to: Kyotaro HORIGUCHI (#2)
Re: [DESIGN] ParallelAppend

On 27 July 2015 at 21:09, Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote:

Hello, can I ask some questions?

I suppose we can take this as the analog of ParallelSeqScan. I
see not much distinction between Append(ParallelSeqScan) and
ParallelAppend(SeqScan). What difference is there between them?

If other nodes will have the same functionality, as you mention at
the end of this proposal, it might be better for some part of
this feature to be implemented as part of the existing executor
itself rather than as a dedicated additional node, just as my
asynchronous FDW execution patch partially does. (Although it
lacks the planner part and bgworker launching..) In that case,
it might be better to modify ExecProcNode so that it supports
both in-process and inter-bgworker cases through a single API.

What do you think about this?

I have to say that I really like the thought of us having parallel enabled
stuff in Postgres, but I also have to say that I don't think inventing all
these special parallel node types is a good idea. If we think about
everything that we can parallelise...

Perhaps.... sort, hash join, seqscan, hash, bitmap heap scan, nested loop.
I don't want to debate that, but perhaps there's more, perhaps less.
Are we really going to duplicate all of the code and add in the parallel
stuff as new node types?

My other concern here is that I seldom hear people talk about the planner's
architectural lack of ability to make a good choice about how many parallel
workers to choose. Surely to properly calculate costs you need to know the
exact number of parallel workers that will be available at execution time,
but you need to know this at planning time!? I can't see how this works,
apart from just being very conservative about parallel workers, which I
think is really bad, as many databases have busy times in the day, and also
quiet times, generally quiet time is when large batch stuff gets done, and
that's the time that parallel stuff is likely most useful. Remember queries
are not always planned just before they're executed. We could have a
PREPAREd query, or we could have better plan caching in the future, or if
we build some intelligence into the planner to choose a good number of
workers based on the current server load, then what's to say that the
server will be under this load at exec time? If we plan during a quiet
time, and exec in a busy time all hell may break loose.

I really do think that existing nodes should just be initialized in a
parallel mode, and each node type can have a function to state if it
supports parallelism or not.

I'd really like to hear more opinions on the ideas I discussed here:

/messages/by-id/CAApHDvp2STf0=pQfpq+e7WA4QdYmpFM5qu_YtUpE7R0jLnH82Q@mail.gmail.com

This design makes use of the Funnel node that Amit has already made and
allows more than 1 node to be executed in parallel at once.

It appears that parallel enabling the executor node by node is
fundamentally locked into just 1 node being executed in parallel, then
perhaps a Funnel node gathering up the parallel worker buffers and
streaming those back in serial mode. I believe by design, this does not
permit a whole plan branch to execute in parallel, and I really feel
like doing things this way is going to be very hard to undo and improve
later. I might be too stupid to figure it out, but how would parallel hash
join work if it can't gather tuples from the inner and outer nodes in
parallel?

Sorry for the rant, but I just feel like we're painting ourselves into a
corner by parallel enabling the executor node by node.
Apologies if I've completely misunderstood things.

Regards

David Rowley

--
David Rowley                   http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

#10Ashutosh Bapat
ashutosh.bapat@enterprisedb.com
In reply to: David Rowley (#9)
Re: [DESIGN] ParallelAppend

On Tue, Jul 28, 2015 at 12:59 PM, David Rowley <david.rowley@2ndquadrant.com> wrote:

Sorry for the rant, but I just feel like we're painting ourselves into a
corner by parallel enabling the executor node by node.
Apologies if I've completely misunderstood things.

+1, well articulated.
--
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company

#11Kouhei Kaigai
kaigai@ak.jp.nec.com
In reply to: Amit Kapila (#7)
Re: [DESIGN] ParallelAppend

Or shall we have a node like the above and name it FunnelAppend or
AppendFunnel?

It is better to have a smaller number of node types capable of kicking off
background workers, for the sake of simpler path construction.

Let's assume the case below: the planner considers a path that appends
child scans on rel1, rel2 and rel3, but the cheapest path for rel2 is
Funnel+PartialSeqScan. We cannot put a Funnel here unless we pull up the
Funnel of rel2, can we?

(Append? or Funnel)
 --> SeqScan on rel1
 --> Funnel
      --> PartialSeqScan on rel2
 --> IndexScan on rel3

If we pull up the Funnel here, I think the plan shall be as follows:

Funnel
 --> SeqScan on rel1
 --> PartialSeqScan on rel2
 --> IndexScan on rel3

If Funnel is the only node we have to pay attention to, it makes the code
around path construction and the pull-up logic much simpler than having
multiple node types that can kick off background workers.


Okay, but I think with that idea you need to re-launch the workers for
each new set of relation scans, which could turn out to be costly. How
about designing some way for workers, after completing their assigned
work, to check for a new set of tasks (which in this case would be to
scan a new relation) and then execute those? I think in this way we can
achieve dynamic allocation of work and maximum parallelism with the
available set of workers. We have achieved this in ParallelSeqScan by
scanning at the block level: once a worker finishes a block, it checks
for a new block to scan.

It is possible to put multiple PlannedStmts on the TOC, isn't it?
If a background worker picks up an uncompleted PlannedStmt first
(probably on a round-robin basis?), it may achieve the maximum
parallelism. Yep, it seems to me a good idea which I want to try.
If (number of workers) > (number of sub-plans), some sub-plans can
have multiple workers from the beginning, and then other workers
also help to execute the heavy plans later.
It may be better to put the PlannedStmts in order of total_cost to
bias execution towards multiple workers from the beginning.

TODO: Even if a heavy query occupies most of the available worker slots,
another session may want to use parallel execution later, during the
execution of the primary query. We may need to have a 'scoreboard'
in shared memory to track how many workers are potentially needed
and how many are overused by somebody. If someone has overconsumed
background workers, it should exit first rather than pick up
the next PlannedStmt.
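
Purely as an illustration of that idea (not part of any existing patch),
the shared bookkeeping and the worker-side pick-up might be shaped like
this; every name below is invented for the sketch:

#include "postgres.h"
#include "storage/spin.h"

typedef struct ParallelAppendScoreboard
{
    slock_t     mutex;          /* protects everything below */
    int         nplans;         /* number of PlannedStmts on the TOC */
    int         next_plan;      /* round-robin cursor */
    struct
    {
        int         nworkers;       /* workers currently on this plan */
        int         max_workers;    /* cap estimated by the planner */
        bool        finished;       /* plan has reached end-of-scan */
    }           plans[FLEXIBLE_ARRAY_MEMBER];
} ParallelAppendScoreboard;

/* Pick the next unfinished plan that still wants workers; -1 if none. */
static int
choose_next_plan(ParallelAppendScoreboard *sb)
{
    int         result = -1;
    int         i;

    SpinLockAcquire(&sb->mutex);
    for (i = 0; i < sb->nplans; i++)
    {
        int         idx = (sb->next_plan + i) % sb->nplans;

        if (!sb->plans[idx].finished &&
            sb->plans[idx].nworkers < sb->plans[idx].max_workers)
        {
            sb->plans[idx].nworkers++;
            sb->next_plan = (idx + 1) % sb->nplans;
            result = idx;
            break;
        }
    }
    SpinLockRelease(&sb->mutex);
    return result;
}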


I think this might turn out to be slightly tricky; for example, how do we
know how many workers are sufficient for a relation of a given size?

I expected a comparison between the total_cost of the sub-plan and a
threshold that represents the cost to kick off a background worker.
However, I'm now inclined towards the above approach (multiple PlannedStmts
on the TOC, picked up by background workers in round-robin fashion).

Another way to look at dividing the work in this case could be in terms of
chunks of blocks: once a worker finishes its current set of blocks, it
should be able to get a new set of blocks to scan. So let us assume we
decide the chunk size as 32 and the total number of blocks in the whole
inheritance hierarchy is 3200; then the max workers we should allocate to
this scan are 100, and if we have parallel_seqscan_degree less than that,
then we can use that many workers and let them scan 32 blocks at a time.

If we use the above multi-PlannedStmt approach, the TOC also needs to have
a counter to track how many background workers are running on a particular
PlannedStmt; then, if a sufficient number of workers is already running on
that PlannedStmt, should the next available worker skip it (even under
round-robin) or just exit?
Anyway, I think an infrastructure may be needed to avoid too aggressive
parallel execution.
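
Continuing the hypothetical pappend_task sketch above, one way to answer
the skip-or-exit question is to respect the per-plan cap and have a worker
exit only when nothing at all is left for it:

/* Returns the next entry that is not done and whose worker count is
 * below its cap, or -1 when nothing is left and the worker may exit. */
static int
pappend_next_under_cap(pappend_task *tasks, int ntasks, int last)
{
    int     i;

    for (i = 1; i <= ntasks; i++)
    {
        int     j = (last + i) % ntasks;

        if (!tasks[j].done && tasks[j].nworkers < tasks[j].max_workers)
            return j;
    }
    return -1;
}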

Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>


#12Kouhei Kaigai
kaigai@ak.jp.nec.com
In reply to: Amit Langote (#8)
Re: [DESIGN] ParallelAppend

KaiGai-san,

On 2015-07-27 PM 11:07, Kouhei Kaigai wrote:

Append
 --> Funnel
      --> PartialSeqScan on rel1 (num_workers = 4)
 --> Funnel
      --> PartialSeqScan on rel2 (num_workers = 8)
 --> SeqScan on rel3

shall be rewritten to
Funnel
--> PartialSeqScan on rel1 (num_workers = 4)
--> PartialSeqScan on rel2 (num_workers = 8)
--> SeqScan on rel3 (num_workers = 1)

In the rewritten plan, are respective scans (PartialSeq or Seq) on rel1,
rel2 and rel3 asynchronous w.r.t each other? Or does each one wait for the
earlier one to finish? I would think the answer is no because then it
would not be different from the former case, right? Because the original
premise seems that (partitions) rel1, rel2, rel3 may be on different
volumes so parallelism across volumes seems like a goal of parallelizing
Append.

From my understanding of parallel seqscan patch, each worker's
PartialSeqScan asks for a block to scan using a shared parallel heap scan
descriptor that effectively keeps track of division of work among
PartialSeqScans in terms of blocks. What if we invent a PartialAppend
which each worker would run in case of a parallelized Append. It would use
some kind of shared descriptor to pick a relation (Append member) to scan.
The shared structure could be the list of subplans including the mutex for
concurrency. It doesn't sound as effective as proposed
ParallelHeapScanDescData does for PartialSeqScan but any more granular
might be complicated. For example, consider (current_relation,
current_block) pair. If there are more workers than subplans/partitions,
then multiple workers might start working on the same relation after a
round-robin assignment of relations (but of course, a later worker would
start scanning from a later block in the same relation). I imagine that
might help with parallelism across volumes if that's the case.

I initially thought ParallelAppend would kick a fixed number of background
workers towards the sub-plans, according to the cost estimated at the
planning stage. However, I'm now inclined to have a background worker pick
up an uncompleted PlannedStmt first. (For more details, please see the
reply to Amit Kapila.) It is a coarser-grained distribution of the
workers' jobs. Once the number of workers gets larger than the number of
volumes / partitions, two or more workers begin to be assigned to the same
PartialSeqScan, and then it takes the fine-grained job distribution using
the shared parallel heap scan.
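
For reference, the kind of shared descriptor Amit describes might look
roughly like the sketch below (the names are hypothetical, slock_t and
FLEXIBLE_ARRAY_MEMBER come from storage/spin.h and postgres.h, and error
handling is omitted):

/* placed in dynamic shared memory, one per parallelized Append */
typedef struct ParallelAppendDescData
{
    slock_t     mutex;          /* protects the fields below */
    int         nsubplans;      /* number of Append children */
    int         next_subplan;   /* next subplan to hand out, round-robin */
    int         nworkers[FLEXIBLE_ARRAY_MEMBER]; /* per-subplan workers */
} ParallelAppendDescData;

Each worker would take the mutex, claim next_subplan (or the subplan with
the smallest nworkers value), bump the counter, and release the lock
before it starts executing that child.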

MergeAppend
parallelization might involve a bit more complication but may be feasible
with a PartialMergeAppend with slightly different kind of coordination
among workers. What do you think of such an approach?

Do we need to have something special in ParallelMergeAppend?
If individual child nodes are designed to return sorted results,
what we have to do seems to me the same.

Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>


#13Kouhei Kaigai
kaigai@ak.jp.nec.com
In reply to: David Rowley (#9)
Re: [DESIGN] ParallelAppend

On 27 July 2015 at 21:09, Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>
wrote:

Hello, can I ask some questions?

I suppose we can take this as the analog of ParallelSeqScan. I
cannot see much distinction between Append(ParallelSeqScan) and
ParallelAppend(SeqScan). What difference is there between them?

If other nodes will have the same functionality as you mention at
the end of this proposal, it might be better that some part of
this feature is implemented as a part of the existing executor
itself, rather than as a dedicated additional node, just as my
asynchronous FDW execution patch partially does. (Although it
lacks the planner part and bgworker launching..) If that is the
case, it might be better that ExecProcNode is modified so that it
supports both the in-process and inter-bgworker cases through a
single API.

What do you think about this?

I have to say that I really like the thought of us having parallel enabled stuff
in Postgres, but I also have to say that I don't think inventing all these special
parallel node types is a good idea. If we think about everything that we can
parallelise...

Perhaps.... sort, hash join, seqscan, hash, bitmap heap scan, nested loop. I don't
want to debate that, but perhaps there's more, perhaps less.
Are we really going to duplicate all of the code and add in the parallel stuff
as new node types?

My other concern here is that I seldom hear people talk about the planner's
architectural lack of ability to make a good choice about how many parallel workers
to choose. Surely to properly calculate costs you need to know the exact number
of parallel workers that will be available at execution time, but you need to
know this at planning time!? I can't see how this works, apart from just being
very conservative about parallel workers, which I think is really bad, as many
databases have busy times in the day, and also quiet times, generally quiet time
is when large batch stuff gets done, and that's the time that parallel stuff is
likely most useful. Remember queries are not always planned just before they're
executed. We could have a PREPAREd query, or we could have better plan caching
in the future, or if we build some intelligence into the planner to choose a good
number of workers based on the current server load, then what's to say that the
server will be under this load at exec time? If we plan during a quiet time, and
exec in a busy time all hell may break loose.

Even though it is not easy to estimate the available workers at planning
time, it might be possible to define a "target" number of workers to run.
If a Funnel cannot get its target number of workers, my preference is to
tell the other workers (via a scoreboard?) not to pick up the next
PlannedStmt and to exit, when another Funnel cannot launch enough workers.

I really do think that existing nodes should just be initialized in a parallel
mode, and each node type can have a function to state if it supports parallelism
or not.

I'd really like to hear more opinions in the ideas I discussed here:

/messages/by-id/CAApHDvp2STf0=pQfpq+e7WA4QdYmpFM5qu_YtUpE7R0jLnH82Q@mail.gmail.com

This design makes use of the Funnel node that Amit has already made and allows
more than 1 node to be executed in parallel at once.

It appears that parallel enabling the executor node by node is fundamentally locked
into just 1 node being executed in parallel, then perhaps a Funnel node gathering
up the parallel worker buffers and streaming those back in serial mode. I believe
by design, this does not permit a whole plan branch from executing in parallel
and I really feel like doing things this way is going to be very hard to undo
and improve later. I might be too stupid to figure it out, but how would parallel
hash join work if it can't gather tuples from the inner and outer nodes in parallel?

Hash-Join and Nest-Loop should not have a PartialSeqScan on the inner side,
but the outer side can be a PartialSeqScan under the Funnel node.
In the case of Hash-Join, the SeqScan on the inner side loads all the
tuples (*1) into a hash table once, then the records coming from the outer
side are combined with the hash table.
Even though the inner side is read redundantly, the advantage of the
parallel join will win as long as the inner side is small enough; this
assumption holds for the usual pair of master tables (small) and a fact
table (big).

(*1) Our colleague is now working on this feature. It enables dropping
unnecessary rows under the partitioned tables, so we may not need to keep
the entire hash table in each background worker.
/messages/by-id/9A28C8860F777E439AA12E8AEA7694F8010F672B@BPXM15GP.gisp.nec.co.jp
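
For illustration only (the table names below are made up), the per-worker
plan shape I have in mind is:

Funnel
 --> Hash Join
      --> PartialSeqScan on fact_table    (outer; blocks shared among workers)
      --> Hash
           --> SeqScan on master_table    (inner; built by every worker)

so the small inner side is rebuilt per worker, while the big outer side is
divided block by block.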

Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>


#14Amit Langote
Langote_Amit_f8@lab.ntt.co.jp
In reply to: Kouhei Kaigai (#12)
Re: [DESIGN] ParallelAppend

KaiGai-san,

On 2015-07-28 PM 09:58, Kouhei Kaigai wrote:

From my understanding of parallel seqscan patch, each worker's
PartialSeqScan asks for a block to scan using a shared parallel heap scan
descriptor that effectively keeps track of division of work among
PartialSeqScans in terms of blocks. What if we invent a PartialAppend
which each worker would run in case of a parallelized Append. It would use
some kind of shared descriptor to pick a relation (Append member) to scan.
The shared structure could be the list of subplans including the mutex for
concurrency. It doesn't sound as effective as proposed
ParallelHeapScanDescData does for PartialSeqScan but any more granular
might be complicated. For example, consider (current_relation,
current_block) pair. If there are more workers than subplans/partitions,
then multiple workers might start working on the same relation after a
round-robin assignment of relations (but of course, a later worker would
start scanning from a later block in the same relation). I imagine that
might help with parallelism across volumes if that's the case.

I initially thought ParallelAppend would kick a fixed number of background
workers towards the sub-plans, according to the cost estimated at the
planning stage. However, I'm now inclined to have a background worker pick
up an uncompleted PlannedStmt first. (For more details, please see the
reply to Amit Kapila.) It is a coarser-grained distribution of the
workers' jobs. Once the number of workers gets larger than the number of
volumes / partitions, two or more workers begin to be assigned to the same
PartialSeqScan, and then it takes the fine-grained job distribution using
the shared parallel heap scan.

I like your idea of using round-robin assignment of partial/non-partial
sub-plans to workers. Do you think there are two considerations of cost
here: sub-plans themselves could have parallel paths to consider and (I
think) your proposal introduces a new consideration - a plain old
synchronous Append path vs. parallel asynchronous Append with Funnel
(below/above?) it. I guess the asynchronous version would always be
cheaper. So, even if we end up with non-parallel sub-plans do we still add
a Funnel to make Append asynchronous? Am I missing something?

MergeAppend
parallelization might involve a bit more complication but may be feasible
with a PartialMergeAppend with slightly different kind of coordination
among workers. What do you think of such an approach?

Do we need to have something special in ParallelMergeAppend?
If individual child nodes are designed to return sorted results,
what we have to do seems to me the same.

Sorry, I was wrongly worried because I did not really know that
MergeAppend uses a binaryheap to store tuples before returning.

Thanks,
Amit


#15Kouhei Kaigai
kaigai@ak.jp.nec.com
In reply to: Amit Langote (#14)
Re: [DESIGN] ParallelAppend

On 2015-07-28 PM 09:58, Kouhei Kaigai wrote:

From my understanding of parallel seqscan patch, each worker's
PartialSeqScan asks for a block to scan using a shared parallel heap scan
descriptor that effectively keeps track of division of work among
PartialSeqScans in terms of blocks. What if we invent a PartialAppend
which each worker would run in case of a parallelized Append. It would use
some kind of shared descriptor to pick a relation (Append member) to scan.
The shared structure could be the list of subplans including the mutex for
concurrency. It doesn't sound as effective as proposed
ParallelHeapScanDescData does for PartialSeqScan but any more granular
might be complicated. For example, consider (current_relation,
current_block) pair. If there are more workers than subplans/partitions,
then multiple workers might start working on the same relation after a
round-robin assignment of relations (but of course, a later worker would
start scanning from a later block in the same relation). I imagine that
might help with parallelism across volumes if that's the case.

I initially thought ParallelAppend would kick a fixed number of background
workers towards the sub-plans, according to the cost estimated at the
planning stage. However, I'm now inclined to have a background worker pick
up an uncompleted PlannedStmt first. (For more details, please see the
reply to Amit Kapila.) It is a coarser-grained distribution of the
workers' jobs. Once the number of workers gets larger than the number of
volumes / partitions, two or more workers begin to be assigned to the same
PartialSeqScan, and then it takes the fine-grained job distribution using
the shared parallel heap scan.

I like your idea of using round-robin assignment of partial/non-partial
sub-plans to workers. Do you think there are two considerations of cost
here: sub-plans themselves could have parallel paths to consider and (I
think) your proposal introduces a new consideration - a plain old
synchronous Append path vs. parallel asynchronous Append with Funnel
(below/above?) it. I guess the asynchronous version would always be
cheaper. So, even if we end up with non-parallel sub-plans do we still add
a Funnel to make Append asynchronous? Am I missing something?

I expect Funnel itself will get the Append capability but run sub-plans in
background workers, to simplify path construction. So, if a Funnel with
multiple sub-plans has a cheaper cost than Append, it will replace the
AppendPath with a FunnelPath.

Regarding the cost estimation, I don't think the parallel version is always
cheaper than the traditional Append, because of the cost to launch
background workers. It increases the startup cost to process the relation;
thus, if the upper node prefers a small startup cost (like Limit), the
traditional Append still has advantages.

Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>


#16Amit Langote
Langote_Amit_f8@lab.ntt.co.jp
In reply to: Kouhei Kaigai (#15)
Re: [DESIGN] ParallelAppend

On 2015-07-29 AM 11:02, Kouhei Kaigai wrote:

...
synchronous Append path vs. parallel asynchronous Append with Funnel
(below/above?) it. I guess the asynchronous version would always be
cheaper. So, even if we end up with non-parallel sub-plans do we still add
a Funnel to make Append asynchronous? Am I missing something?

I expect Funnel itself will get the Append capability but run sub-plans in
background workers, to simplify path construction. So, if a Funnel with
multiple sub-plans has a cheaper cost than Append, it will replace the
AppendPath with a FunnelPath.

Regarding the cost estimation, I don't think the parallel version is always
cheaper than the traditional Append, because of the cost to launch
background workers. It increases the startup cost to process the relation;
thus, if the upper node prefers a small startup cost (like Limit), the
traditional Append still has advantages.

Right, I almost forgot about the start-up cost.

Thanks,
Amit


#17Amit Kapila
amit.kapila16@gmail.com
In reply to: Kouhei Kaigai (#11)
Re: [DESIGN] ParallelAppend

On Tue, Jul 28, 2015 at 6:08 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

On Tue, Jul 28, 2015 at 7:59 AM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

-----Original Message-----
From: pgsql-hackers-owner@postgresql.org
[mailto:pgsql-hackers-owner@postgresql.org] On Behalf Of Kouhei Kaigai
Sent: Monday, July 27, 2015 11:07 PM
To: Amit Kapila

Is there a real need to have new node like ParallelAppendPath?
Can't we have Funnel node beneath AppendNode and then each
worker will be responsible to have SeqScan on each inherited child
relation. Something like

Append
 ---> Funnel
       --> SeqScan rel1
       --> SeqScan rel2

If Funnel can handle both horizontal and vertical parallelism,
it is a great simplification. I never insist on a new node.

Once Funnel gets the capability to have multiple child nodes, probably,
the Append node above will be gone. I expect set_append_rel_pathlist()
to add two paths based on Append and Funnel, then the planner will choose
the cheaper one according to its cost.

In the latest v16 patch, Funnel is declared as follows:

typedef struct Funnel
{
    Scan    scan;
    int     num_workers;
} Funnel;

If we try to add Append capability here, I expect the structure will
be adjusted as follows, for example:

typedef struct Funnel
{
    Scan    scan;
    List   *funnel_plans;
    List   *funnel_num_workers;
} Funnel;

As the names suggest, funnel_plans saves the underlying Plan nodes instead
of the lefttree. Also, funnel_num_workers saves the number of workers
expected to be assigned to the individual child plans.

or shall we have a node like above and name it as FunnelAppend or
AppendFunnel?

It is better to have a smaller number of node types which are capable of
kicking background workers, for simplification of path construction.

Let's assume the case below. When planner considers a path to append
child scans on rel1, rel2 and rel3 but the cheapest path of rel2 is
Funnel+PartialSeqScan, we cannot put Funnel here unless we pull up the
Funnel of rel2, can we?

(Append? or Funnel)
 --> SeqScan on rel1
 --> Funnel
      --> PartialSeqScan on rel2
 --> IndexScan on rel3

I am not sure, but what problem do you see in putting Funnel node
for one of the relation scans and not for the others.

If we pull Funnel here, I think the plan shall be as follows:
Funnel
--> SeqScan on rel1
--> PartialSeqScan on rel2
--> IndexScan on rel3

So if we go this route, then Funnel should have capability
to execute non-parallel part of plan as well, like in this
case it should be able to execute non-parallel IndexScan on
rel3 as well and then it might need to distinguish between
parallel and non-parallel part of plans. I think this could
make Funnel node complex.

If the Funnel node is all we have to pay attention to, it makes the code
around path construction and pull-up logic much simpler than allowing
multiple node types to kick background workers.

Okay, but I think pulling-up the Funnel node makes sense only when all
nodes beneath it need to be executed in parallel.

Even though create_parallelscan_paths() in v16 sets num_workers not
larger than parallel_seqscan_degree, the total number of concurrent
background workers may exceed this configuration if two or more
PartialSeqScan nodes are underneath.
It is a different configuration from max_worker_processes, so it is
not a problem as long as we have another restriction.
However, how do we control the cap on the number of worker processes per
"appendable" Funnel node? For example, what if a parent table has 200
child tables but max_worker_processes is configured to 50?
It is obviously impossible to launch all the background workers
simultaneously. One idea I have is to suspend the launch of some plans
until earlier ones are completed.

Okay, but I think in that idea you need to re-launch the workers again for
a new set of relation scans, which could turn out to be costly. How about
designing some way where workers, after completing their assigned work,
check for a new set of tasks (which in this case would be to scan a new
relation) and then execute the same? I think in this way we can achieve
dynamic allocation of work and achieve maximum parallelism with the
available set of workers. We have achieved this in ParallelSeqScan by
scanning at the block level; once a worker finishes a block, it checks
for a new block to scan.

It is possible to put multiple PlannedStmt nodes on the TOC, isn't it?

Yes, I don't see any problem in doing that way. So here for
each different (child) relation, you want to create a separate
PlannedStmt or do you have something else in mind?

If background worker picks up an uncompleted PlannedStmt first
(based on round-robin likely?), it may achieve the maximum
parallelism.

I think this can work well for the cases when there are insufficient
number of workers to execute the different planned statements.

Yep, it seems to me a good idea which I want to try.
If (num of worker) > (num of sub-plans), some of sub-plans can
have multiple workers from the beginning, then, other workers
also help to execute heavy plans later.
It may be better to put PlannedStmt in order of total_cost to
bias multi-workers execution from the beginning.

Yeah, that might be better, but I think for doing so you might
need to traverse each child plan and compare their costs while
constructing multiple planned statements, which might incur some
overhead when the number of plans is large; however, OTOH this cost
should be much smaller as compared to starting up workers, so
probably it should be okay.

TODO: Even if a heavy query occupies most of the available worker slots,
another session may want to use parallel execution later, during
execution of the primary query. We may need to have a 'scoreboard'
on shared memory to know how many workers are potentially needed
and how many are overused by somebody. If someone has overconsumed
background workers, it should exit first, rather than picking up
the next PlannedStmt.

Actually, distribution of workers among parallel queries is a very
tricky problem and I think we have to keep on working on it till
we get some good solution for it.

Another way to look at dividing the work in this case could be in terms of
chunks of blocks: once a worker finishes its current set of blocks, it
should be able to get a new set of blocks to scan. So let us assume we
decide the chunk size as 32 and the total number of blocks in the whole
inheritance hierarchy is 3200; then the max workers we should allocate to
this scan are 100, and if we have parallel_seqscan_degree less than that,
then we can use that many workers and let them scan 32 blocks at a time.

If we use the above multi-PlannedStmt approach, the TOC also needs to have
a counter to track how many background workers are running on a particular
PlannedStmt; then, if a sufficient number of workers is already running on
that PlannedStmt, should the next available worker skip it (even under
round-robin) or just exit?

I think for a particular PlannedStmt, number of workers must have
been decided before start of execution, so if those many workers are
available to work on that particular PlannedStmt, then next/new
worker should work on next PlannedStmt.

Anyway, I think an infrastructure may be needed to avoid too aggressive
parallel execution.

Yes, I think we need some infrastructure for workers if we have
to follow the design discussed above.

So I think we have three main parts to work for this patch.

1. Allocation of work among workers which needs some different
mechanism than ParallelSeqScan Patch.
2. Execution of work by workers and Funnel node and then pass
the results back to upper node. I think this needs some more
work in addition to ParallelSeqScan patch.
3. Generation of parallel plan for Append node needs a somewhat
different mechanism, as we might want to have some additional
logic for transformation of nodes.

With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

#18Kouhei Kaigai
kaigai@ak.jp.nec.com
In reply to: Amit Kapila (#17)
Re: [DESIGN] ParallelAppend

On Tue, Jul 28, 2015 at 6:08 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

On Tue, Jul 28, 2015 at 7:59 AM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

-----Original Message-----
From: pgsql-hackers-owner@postgresql.org
[mailto:pgsql-hackers-owner@postgresql.org] On Behalf Of Kouhei Kaigai
Sent: Monday, July 27, 2015 11:07 PM
To: Amit Kapila

Is there a real need to have new node like ParallelAppendPath?
Can't we have Funnel node beneath AppendNode and then each
worker will be responsible to have SeqScan on each inherited child
relation. Something like

Append
 ---> Funnel
       --> SeqScan rel1
       --> SeqScan rel2

If Funnel can handle both horizontal and vertical parallelism,
it is a great simplification. I never insist on a new node.

Once Funnel gets the capability to have multiple child nodes, probably,
the Append node above will be gone. I expect set_append_rel_pathlist()
to add two paths based on Append and Funnel, then the planner will choose
the cheaper one according to its cost.

In the latest v16 patch, Funnel is declared as follows:

typedef struct Funnel
{
    Scan    scan;
    int     num_workers;
} Funnel;

If we try to add Append capability here, I expect the structure will
be adjusted as follows, for example:

typedef struct Funnel
{
    Scan    scan;
    List   *funnel_plans;
    List   *funnel_num_workers;
} Funnel;

As the names suggest, funnel_plans saves the underlying Plan nodes instead
of the lefttree. Also, funnel_num_workers saves the number of workers
expected to be assigned to the individual child plans.

or shall we have a node like above and name it as FunnelAppend or
AppendFunnel?

It is better to have a smaller number of node types which are capable of
kicking background workers, for simplification of path construction.

Let's assume the case below. When planner considers a path to append
child scans on rel1, rel2 and rel3 but the cheapest path of rel2 is
Funnel+PartialSeqScan, we cannot put Funnel here unless we pull up the
Funnel of rel2, can we?

(Append? or Funnel)
 --> SeqScan on rel1
 --> Funnel
      --> PartialSeqScan on rel2
 --> IndexScan on rel3

I am not sure, but what problem do you see in putting Funnel node
for one of the relation scans and not for the others.

At this moment, I'm not certain whether a background worker can/ought
to launch other background workers.
If a sub-Funnel node is executed by 10 processes and each of them also
launches 10 processes, will 100 processes end up running in total?

If we pull Funnel here, I think the plan shall be as follows:
Funnel
--> SeqScan on rel1
--> PartialSeqScan on rel2
--> IndexScan on rel3

So if we go this route, then Funnel should have capability
to execute non-parallel part of plan as well, like in this
case it should be able to execute non-parallel IndexScan on
rel3 as well and then it might need to distinguish between
parallel and non-parallel part of plans. I think this could
make Funnel node complex.

It is different from what I plan now. In the above example, the
Funnel node has two non-parallel-aware nodes (rel1 and rel3)
and one parallel-aware node, so three PlannedStmts, one for each,
shall be put on the TOC segment. Even though the background
workers pick up a PlannedStmt from the three, only one worker
can pick up the PlannedStmt for rel1 and for rel3; however, rel2
can be executed by multiple workers simultaneously.
(Note: if the number of workers is less than three in this case, the
PlannedStmt for rel3 shall not be picked up until some other worker
completes its plan on rel1 or rel2.)

From the standpoint of the Funnel, it just kicks background
workers with:
- multiple PlannedStmt nodes
- maximum number of workers for each plan
in addition to the current form.

Then, it continues to fetch records from the shm_mq.
Probably, it does not change the current form so much.

If the Funnel node is all we have to pay attention to, it makes the code
around path construction and pull-up logic much simpler than allowing
multiple node types to kick background workers.

Okay, but I think pulling-up the Funnel node makes sense only when all
nodes beneath it need to be executed in parallel.

I think this decision should be based on the cost, which includes the
additional startup_cost to launch a background worker, as long as the
non-parallel node is also capable of running on the worker side.

Even though create_parallelscan_paths() in v16 sets num_workers not
larger than parallel_seqscan_degree, the total number of concurrent
background workers may exceed this configuration if two or more
PartialSeqScan nodes are underneath.
It is a different configuration from max_worker_processes, so it is
not a problem as long as we have another restriction.
However, how do we control the cap on the number of worker processes per
"appendable" Funnel node? For example, what if a parent table has 200
child tables but max_worker_processes is configured to 50?
It is obviously impossible to launch all the background workers
simultaneously. One idea I have is to suspend the launch of some plans
until earlier ones are completed.

Okay, but I think in that idea you need to re-launch the workers again for
a new set of relation scans, which could turn out to be costly. How about
designing some way where workers, after completing their assigned work,
check for a new set of tasks (which in this case would be to scan a new
relation) and then execute the same? I think in this way we can achieve
dynamic allocation of work and achieve maximum parallelism with the
available set of workers. We have achieved this in ParallelSeqScan by
scanning at the block level; once a worker finishes a block, it checks
for a new block to scan.

It is possible to put multiple PlannedStmt nodes on the TOC, isn't it?

Yes, I don't see any problem in doing that way. So here for
each different (child) relation, you want to create a separate
PlannedStmt or do you have something else in mind?

I plan to create a separate PlannedStmt for each sub-plan, then
a background worker will focus on a particular PlannedStmt until
it completes the current focused one.

If background worker picks up an uncompleted PlannedStmt first
(based on round-robin likely?), it may achieve the maximum
parallelism.

I think this can work well for the cases when there are insufficient
number of workers to execute the different planned statements.

Yep, it is the biggest reason why I like this design more than what
I initially proposed: a fixed number of workers for each sub-plan.

Yep, it seems to me a good idea which I want to try.
If (num of worker) > (num of sub-plans), some of sub-plans can
have multiple workers from the beginning, then, other workers
also help to execute heavy plans later.
It may be better to put PlannedStmt in order of total_cost to
bias multi-workers execution from the beginning.

Yeah, that might be better, but I think for doing so you might
need to traverse each child plan and compare their costs while
constructing multiple planned statements, which might incur some
overhead when the number of plans is large; however, OTOH this cost
should be much smaller as compared to starting up workers, so
probably it should be okay.

Yep. If we have to execute thousands of child plans, its execution
cost is relatively large, not only planning cost. :-)

TODO: Even if a heavy query occupies most of the available worker slots,
another session may want to use parallel execution later, during
execution of the primary query. We may need to have a 'scoreboard'
on shared memory to know how many workers are potentially needed
and how many are overused by somebody. If someone has overconsumed
background workers, it should exit first, rather than picking up
the next PlannedStmt.

Actually, distribution of workers among parallel queries is a very
tricky problem and I think we have to keep on working on it till
we get some good solution for it.

I agree. Even if initial version adopts simple solution, we can
improve the logic according to our experiences.

Another way to look at dividing the work in this case could be in terms of
chunks of blocks: once a worker finishes its current set of blocks, it
should be able to get a new set of blocks to scan. So let us assume we
decide the chunk size as 32 and the total number of blocks in the whole
inheritance hierarchy is 3200; then the max workers we should allocate to
this scan are 100, and if we have parallel_seqscan_degree less than that,
then we can use that many workers and let them scan 32 blocks at a time.

If we use the above multi-PlannedStmt approach, the TOC also needs to have
a counter to track how many background workers are running on a particular
PlannedStmt; then, if a sufficient number of workers is already running on
that PlannedStmt, should the next available worker skip it (even under
round-robin) or just exit?

I think for a particular PlannedStmt, number of workers must have
been decided before start of execution, so if those many workers are
available to work on that particular PlannedStmt, then next/new
worker should work on next PlannedStmt.

My concern about a pre-determined number of workers is that it depends on
the run-time circumstances of concurrent sessions. Even if the planner
wants to assign 10 workers to a particular sub-plan, only 4 workers may be
available at run time because of consumption by other sessions.
So, I expect only the maximum number of workers is a meaningful
configuration.

Anyway, I think an infrastructure may be needed to avoid too aggressive
parallel execution.

Yes, I think we need some infrastructure for workers if we have
to follow the design discussed above.

So I think we have three main parts to work for this patch.

1. Allocation of work among workers which needs some different
mechanism than ParallelSeqScan Patch.

Yes, I expect to extend the format of the TOC to store multiple PlannedStmt
nodes and state information for each node, like PartialSeqScanState.
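
To illustrate only (the key value and the helper name below are
placeholders, not an actual proposal), the master side could store the
sub-plans along the following lines, reusing the existing shm_toc APIs:

#include "postgres.h"
#include "nodes/nodes.h"          /* nodeToString() */
#include "nodes/plannodes.h"      /* PlannedStmt */
#include "storage/shm_toc.h"      /* shm_toc_allocate(), shm_toc_insert() */

#define PAPPEND_KEY_PLAN(i)   (UINT64CONST(0xFF000100) + (i))

static void
pappend_store_plans(shm_toc *toc, PlannedStmt **plans, int nplans)
{
    int     i;

    for (i = 0; i < nplans; i++)
    {
        char   *plan_str = nodeToString(plans[i]);
        char   *dest = shm_toc_allocate(toc, strlen(plan_str) + 1);

        strcpy(dest, plan_str);
        shm_toc_insert(toc, PAPPEND_KEY_PLAN(i), dest);
    }
}

Each worker would then look up one of these entries, run stringToNode() on
it, and execute the resulting plan until it is exhausted.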

2. Execution of work by workers and Funnel node and then pass
the results back to upper node. I think this needs some more
work in addition to ParallelSeqScan patch.

I expect we can utilize the existing infrastructure here. It just picks
up the records coming from the underlying workers, then raises them to
the upper node.

3. Generation of parallel plan for Append node needs a somewhat
different mechanism, as we might want to have some additional
logic for transformation of nodes.

I expect set_append_rel_pathlist() is the best location to add a
FunnelPath in addition to the AppendPath. If its cost is more attractive
than the AppendPath's, the planner will pick it up.

Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>


#19Amit Kapila
amit.kapila16@gmail.com
In reply to: Kouhei Kaigai (#18)
Re: [DESIGN] ParallelAppend

On Sat, Aug 1, 2015 at 6:39 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

On Tue, Jul 28, 2015 at 6:08 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

I am not sure, but what problem do you see in putting Funnel node
for one of the relation scans and not for the others.

At this moment, I'm not certain whether a background worker can/ought
to launch other background workers.
If a sub-Funnel node is executed by 10 processes and each of them also
launches 10 processes, will 100 processes end up running in total?

Yes, that could be more work than currently, but what I had in mind
is not that way; rather, I was thinking that the master backend will only
kick off workers for Funnel nodes in the plan.

If we pull Funnel here, I think the plan shall be as follows:
Funnel
--> SeqScan on rel1
--> PartialSeqScan on rel2
--> IndexScan on rel3

So if we go this route, then Funnel should have capability
to execute non-parallel part of plan as well, like in this
case it should be able to execute non-parallel IndexScan on
rel3 as well and then it might need to distinguish between
parallel and non-parallel part of plans. I think this could
make Funnel node complex.

It is different from what I plan now. In the above example, the
Funnel node has two non-parallel-aware nodes (rel1 and rel3)
and one parallel-aware node, so three PlannedStmts, one for each,
shall be put on the TOC segment. Even though the background
workers pick up a PlannedStmt from the three, only one worker
can pick up the PlannedStmt for rel1 and for rel3; however, rel2
can be executed by multiple workers simultaneously.

Okay, now I got your point, but I think the cost of execution
of a non-parallel node by an additional worker is not small, considering
the communication cost and setting up an additional worker for
each sub-plan (assume the case where out of 100 child nodes
only a few (2 or 3) nodes actually need multiple workers).

I think for a particular PlannedStmt, number of workers must have
been decided before start of execution, so if those many workers are
available to work on that particular PlannedStmt, then next/new
worker should work on next PlannedStmt.

My concern about a pre-determined number of workers is that it depends on
the run-time circumstances of concurrent sessions. Even if the planner
wants to assign 10 workers to a particular sub-plan, only 4 workers may be
available at run time because of consumption by other sessions.
So, I expect only the maximum number of workers is a meaningful
configuration.

In that case, there is a possibility that many of the workers are just
working on one or two of the nodes and the execution of the other nodes
might get starved. I understand it is a tricky problem to allocate the
number of workers for different nodes; however, we should try to develop
an algorithm where there is some degree of fairness in the allocation of
workers to different nodes.

2. Execution of work by workers and Funnel node and then pass
the results back to upper node. I think this needs some more
work in addition to ParallelSeqScan patch.

I expect we can utilize the existing infrastructure here. It just picks
up the records coming from the underlying workers, then raises them to
the upper node.

Sure, but still you need some work at least in the area of making
workers understand different node types; I am guessing you need
to modify readfuncs.c to support the new plan node, if any, for this
work.

With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

#20Kouhei Kaigai
kaigai@ak.jp.nec.com
In reply to: Amit Kapila (#19)
Re: [DESIGN] ParallelAppend

On Sat, Aug 1, 2015 at 6:39 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

On Tue, Jul 28, 2015 at 6:08 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

I am not sure, but what problem do you see in putting Funnel node
for one of the relation scans and not for the others.

At this moment, I'm not certain whether a background worker can/ought
to launch other background workers.
If a sub-Funnel node is executed by 10 processes and each of them also
launches 10 processes, will 100 processes end up running in total?

Yes, that could be more work than currently, but what I had in mind
is not that way; rather, I was thinking that the master backend will only
kick off workers for Funnel nodes in the plan.

I agree with it; it is a fair enough approach, and that is why I mentioned
the pull-up of the Funnel node.

If we pull Funnel here, I think the plan shall be as follows:
Funnel
--> SeqScan on rel1
--> PartialSeqScan on rel2
--> IndexScan on rel3

So if we go this route, then Funnel should have capability
to execute non-parallel part of plan as well, like in this
case it should be able to execute non-parallel IndexScan on
rel3 as well and then it might need to distinguish between
parallel and non-parallel part of plans. I think this could
make Funnel node complex.

It is different from what I plan now. In the above example, the
Funnel node has two non-parallel-aware nodes (rel1 and rel3)
and one parallel-aware node, so three PlannedStmts, one for each,
shall be put on the TOC segment. Even though the background
workers pick up a PlannedStmt from the three, only one worker
can pick up the PlannedStmt for rel1 and for rel3; however, rel2
can be executed by multiple workers simultaneously.

Okay, now I got your point, but I think the cost of execution
of a non-parallel node by an additional worker is not small, considering
the communication cost and setting up an additional worker for
each sub-plan (assume the case where out of 100 child nodes
only a few (2 or 3) nodes actually need multiple workers).

It is a competition between the traditional Append that takes Funnel
children and the new appendable Funnel that takes parallel and
non-parallel children. Probably, the key factors are cpu_tuple_comm_cost,
parallel_setup_cost and the degree of selectivity of the sub-plans.
Both cases have advantages and disadvantages depending on the query,
so we can never determine which is better without path consideration.

I think for a particular PlannedStmt, number of workers must have
been decided before start of execution, so if those many workers are
available to work on that particular PlannedStmt, then next/new
worker should work on next PlannedStmt.

My concern about a pre-determined number of workers is that it depends on
the run-time circumstances of concurrent sessions. Even if the planner
wants to assign 10 workers to a particular sub-plan, only 4 workers may be
available at run time because of consumption by other sessions.
So, I expect only the maximum number of workers is a meaningful
configuration.

In that case, there is a possibility that many of the workers are just
working on one or two of the nodes and the execution of the other nodes
might get starved. I understand it is a tricky problem to allocate the
number of workers for different nodes; however, we should try to develop
an algorithm where there is some degree of fairness in the allocation of
workers to different nodes.

I'd like to agree; however, I also want to keep the first version as
simple as we can. We can develop alternative logic to assign a suitable
number of workers later.

2. Execution of work by workers and Funnel node and then pass
the results back to upper node. I think this needs some more
work in addition to ParallelSeqScan patch.

I expect we can utilize the existing infrastructure here. It just picks
up the records coming from the underlying workers, then raises them to
the upper node.

Sure, but still you need some work at least in the area of making
workers understand different node types; I am guessing you need
to modify readfuncs.c to support the new plan node, if any, for this
work.

Yes, it was not a creative work. :-)
https://github.com/kaigai/sepgsql/blob/fappend/src/backend/nodes/readfuncs.c#L1479

Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>


#21Amit Kapila
amit.kapila16@gmail.com
In reply to: Kouhei Kaigai (#20)
Re: [DESIGN] ParallelAppend

On Fri, Aug 7, 2015 at 2:15 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

On Sat, Aug 1, 2015 at 6:39 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

If we pull Funnel here, I think the plan shall be as follows:
Funnel
--> SeqScan on rel1
--> PartialSeqScan on rel2
--> IndexScan on rel3

So if we go this route, then Funnel should have capability
to execute non-parallel part of plan as well, like in this
case it should be able to execute non-parallel IndexScan on
rel3 as well and then it might need to distinguish between
parallel and non-parallel part of plans. I think this could
make Funnel node complex.

It is different from what I plan now. In the above example, the
Funnel node has two non-parallel-aware nodes (rel1 and rel3)
and one parallel-aware node, so three PlannedStmts, one for each,
shall be put on the TOC segment. Even though the background
workers pick up a PlannedStmt from the three, only one worker
can pick up the PlannedStmt for rel1 and for rel3; however, rel2
can be executed by multiple workers simultaneously.

Okay, now I got your point, but I think the cost of execution
of a non-parallel node by an additional worker is not small, considering
the communication cost and setting up an additional worker for
each sub-plan (assume the case where out of 100 child nodes
only a few (2 or 3) nodes actually need multiple workers).

It is a competition between the traditional Append that takes Funnel
children and the new appendable Funnel that takes parallel and
non-parallel children. Probably, the key factors are cpu_tuple_comm_cost,
parallel_setup_cost and the degree of selectivity of the sub-plans.
Both cases have advantages and disadvantages depending on the query,
so we can never determine which is better without path consideration.

Sure, that is what we should do; however, the tricky part would be when
the path for doing a local scan is much cheaper than the path for a
parallel scan for one of the child nodes. For such cases, pulling up the
Funnel node can incur more cost. I think some of the other possible ways
to make this work could be to extend Funnel so that it is capable of
executing both parallel and non-parallel nodes, or to have a new
Funnel-like node which has such a capability.

With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

#22Robert Haas
robertmhaas@gmail.com
In reply to: Kouhei Kaigai (#1)
Re: [DESIGN] ParallelAppend

On Sat, Jul 25, 2015 at 11:13 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

I'm recently working/investigating on ParallelAppend feature
towards the next commit fest. Below is my design proposal.

1. Concept
----------
Its concept is quite simple anybody might consider more than once.
ParallelAppend node kicks background worker process to execute
child nodes in parallel / asynchronous.
It intends to improve the performance to scan a large partitioned
tables from standpoint of entire throughput, however, latency of
the first multi-hundred rows are not scope of this project.
From standpoint of technology trend, it primarily tries to utilize
multi-cores capability within a system, but also enables to expand
distributed database environment using foreign-tables inheritance
features.
Its behavior is very similar to Funnel node except for several
points, thus, we can reuse its infrastructure we have had long-
standing discussion through the v9.5 development cycle.

Now that I've got more of the parallel infrastructure committed, I've
been starting to have a little time to think about what we might want
to do after we get PartialSeqScan committed. I'm positive on doing
something with the Append node and parallelism, but I'm negative on
doing what you've described here.

I don't think the Append node has any business launching workers.
That's the job of Gather. Append may need to do something
parallel-aware, but starting workers is not that thing. Without
making any changes at all to Append, we can use it like this:

Gather
-> Append
-> Partial Seq Scan on p1
-> Partial Seq Scan on p2
-> Partial Seq Scan on p3

The Gather node will spin up workers and in each worker we will ask
the Append nodes for tuples. Append will ask the first
not-yet-completed child for tuples, so the workers will cooperatively
scan first p1, then p2, then p3. This is great: instead of merely
doing a parallel seq scan of a single table, we can do a parallel seq
scan of a partitioned table. However, there are two improvements we
can make. First, we can teach Append that, when running in parallel,
it should initialize a chunk of dynamic shared memory with an array
indicating how many workers are currently working on each subplan.
Each new worker should join a subplan with the minimum number of
workers, work on that one until it's complete, and then pick a new
subplan. This minimizes contention. Second, we can teach the Append
to serve as a parent not only for intrinsically parallel nodes like
Partial Seq Scan, but also for other nodes, say, an Index Scan. When
an Append is running in parallel but with non-parallel-aware children,
each such child can be claimed by at most one worker process and will
be run to completion by that worker process. For example:

Gather
-> Append
-> Index Scan on p1
-> Partial Seq Scan on p2
-> Index Scan on p3

The first worker which executes the Append should begin the index scan
on p1 and the second should begin the index scan on p3. The remaining
workers, and those two once they're finished, can work on p2.

Proceeding in this way, I don't think we need a separate Parallel
Append node. Rather, we can just give the existing Append node some
extra smarts that are used only when it's running in parallel.

We can also push other things in between the Gather and the Append,
which wouldn't be possible in your design. For example, consider a
join between a partitioned table p and an unpartitioned table q. We
could do this:

Gather
-> Nested Loop
-> Append
-> Index Scan on p1
-> Partial Seq Scan on p2
-> Index Scan on p3
-> Index Scan on q
Index Cond q.x = p.x

That's a pretty sweet plan. Assuming p1, p2, and p3 are all
reasonably large, we could probably benefit from throwing at least 3
processes at this plan tree - maybe more, if p2 is really big. Only
one process can work on each of p1 and p3, but p2, since it has a
truly parallel plan, can soak up as many as we want to throw at it
(although performance may top out at some point if we're I/O-bound).

Sorry for taking so long to give a really substantive reply on this,
but it wasn't until this last week or so that I really had time to
think about this in detail.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#23Kouhei Kaigai
kaigai@ak.jp.nec.com
In reply to: Robert Haas (#22)
Re: [DESIGN] ParallelAppend

On Sat, Jul 25, 2015 at 11:13 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

I'm recently working/investigating on ParallelAppend feature
towards the next commit fest. Below is my design proposal.

1. Concept
----------
Its concept is quite simple anybody might consider more than once.
ParallelAppend node kicks background worker process to execute
child nodes in parallel / asynchronous.
It intends to improve the performance to scan a large partitioned
tables from standpoint of entire throughput, however, latency of
the first multi-hundred rows are not scope of this project.
From standpoint of technology trend, it primarily tries to utilize
multi-cores capability within a system, but also enables to expand
distributed database environment using foreign-tables inheritance
features.
Its behavior is very similar to Funnel node except for several
points, thus, we can reuse its infrastructure we have had long-
standing discussion through the v9.5 development cycle.

Now that I've got more of the parallel infrastructure committed, I've
been starting to have a little time to think about what we might want
to do after we get PartialSeqScan committed. I'm positive on doing
something with the Append node and parallelism, but I'm negative on
doing what you've described here.

I don't think the Append node has any business launching workers.
That's the job of Gather. Append may need to do something
parallel-aware, but starting workers is not that thing. Without
making any changes at all to Append, we can use it like this:

Gather
-> Append
-> Partial Seq Scan on p1
-> Partial Seq Scan on p2
-> Partial Seq Scan on p3

The Gather node will spin up workers and in each worker we will ask
the Append nodes for tuples. Append will ask the first
not-yet-completed child for tuples, so the workers will cooperatively
scan first p1, then p2, then p3. This is great: instead of merely
doing a parallel seq scan of a single table, we can do a parallel seq
scan of a partitioned table. However, there are two improvements we
can make. First, we can teach Append that, when running in parallel,
it should initialize a chunk of dynamic shared memory with an array
indicating how many workers are currently working on each subplan.
Each new worker should join a subplan with the minimum number of
workers, work on that one until it's complete, and then pick a new
subplan. This minimizes contention. Second, we can teach the Append
to serve as a parent not only for intrinsically parallel nodes like
Partial Seq Scan, but also for other nodes, say, an Index Scan. When
an Append is running in parallel but with non-parallel-aware children,
each such child can be claimed by at most one worker process and will
be run to completion by that worker process. For example:

Gather
-> Append
-> Index Scan on p1
-> Partial Seq Scan on p2
-> Index Scan on p3

The first worker which executes the Append should begin the index scan
on p1 and the second should begin the index scan on p3. The remaining
workers, and those two once they're finished, can work on p2.

Proceeding in this way, I don't think we need a separate Parallel
Append node. Rather, we can just give the existing Append node some
extra smarts that are used only when it's running in parallel.

I entirely agree with your suggestion.

We may be able to use an analogy between PartialSeqScan and the
parallel-aware Append node.
PartialSeqScan fetches the blocks pointed to by the index on the shared
memory segment, thus multiple workers eventually co-operate to scan a
table in a round-robin manner even though an individual worker fetches
comb-shaped blocks.
If we assume individual blocks are individual sub-plans of the
parallel-aware Append, it performs very similarly. A certain number of
workers (more than zero) is launched by the Gather node, then the
parallel-aware Append node fetches one of the sub-plans, if any.

I think this design also gives additional flexibility according to
the parallelism required by the underlying sub-plans.
Please assume the "PartialSeqScan on p2" in the above example wants
3 workers to process the scan; we can expand the virtual array of
the sub-plans as follows. Then, if the Gather node kicks 5 workers,
individual workers are assigned to some of the plans. If the Gather node
could kick fewer than 5 workers, the first worker to finish picks up the
second sub-plan, and it eventually provides the best parallelism.

+---------+
|sub-plan |       * Sub-Plan 1 ... Index Scan on p1
|index on *-----> * Sub-Plan 2 ... PartialSeqScan on p2
|shared   |       * Sub-Plan 2 ... PartialSeqScan on p2
|memory   |       * Sub-Plan 2 ... PartialSeqScan on p2
+---------+       * Sub-Plan 3 ... Index Scan on p3

It does not matter even if the Append node has multiple parallel-aware
sub-plans. When a "PartialSeqScan on p4" is added, all we need to do
is expand the above virtual array of the sub-plans.

For more generic plan construction, the Plan node may have a field for
the number of "desirable" workers, even though most plan nodes are not
parallel-aware and it is not guaranteed.
In the above case, the parallel-aware Append will want 5 workers in total
(2 for the two index scans, plus 3 for the partial seq scan). It is at the
discretion of the Gather node how many workers are actually launched;
however, it will give clear information on how many workers are best.

First, we can teach Append that, when running in parallel,
it should initialize a chunk of dynamic shared memory with an array
indicating how many workers are currently working on each subplan.

Can the parallel-aware Append recognize the current mode by checking
whether MyBgworkerEntry is valid or not?

We can also push other things in between the Gather and the Append,
which wouldn't be possible in your design. For example, consider a
join between a partitioned table p and an unpartitioned table q. We
could do this:

Gather
-> Nested Loop
-> Append
-> Index Scan on p1
-> Partial Seq Scan on p2
-> Index Scan on p3
-> Index Scan on q
Index Cond q.x = p.x

That's a pretty sweet plan. Assuming p1, p2, and p3 are all
reasonably large, we could probably benefit from throwing at least 3
processes at this plan tree - maybe more, if p2 is really big. Only
one process can work on each of p1 and p3, but p2, since it has a
truly parallel plan, can soak up as many as we want to throw at it
(although performance may top out at some point if we're I/O-bound).

We can also leverage the parallel capability for hash-join and merge-join
(even though not in all cases; for example, when the Append node runs on
a partitioned table with CHECK() constraints).
That is the reason why my colleague is working on the feature for the
join before append.

/messages/by-id/12A9442FBAE80D4E8953883E0B84E088606D16@BPXM01GP.gisp.nec.co.jp

Sorry for taking so long to give a really substantive reply on this,
but it wasn't until this last week or so that I really had time to
think about this in detail.

No worries. I also couldn't make much progress, due to various jobs
beyond community work...

Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#24Robert Haas
robertmhaas@gmail.com
In reply to: Kouhei Kaigai (#23)
Re: [DESIGN] ParallelAppend

On Sun, Oct 25, 2015 at 9:23 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

I entirely agree with your suggestion.

We may be able to draw an analogy between PartialSeqScan and the
parallel-aware Append node.
PartialSeqScan fetches blocks according to a block index kept on a
shared memory segment, so multiple workers eventually cooperate to
scan a table in a round-robin manner, even though each individual
worker fetches comb-shaped blocks.
If we treat the individual sub-plans of the parallel-aware Append
like those individual blocks, it behaves very similarly: a certain
number of workers (more than zero) is launched by the Gather node,
then the parallel-aware Append node fetches one of the remaining
sub-plans, if any.

Exactly, except for the part about the blocks being "comb-shaped",
which doesn't seem to make sense.

I think this design also gives additional flexibility according to
the parallelism required by the underlying sub-plans.
Suppose the "PartialSeqScan on p2" in the above example wants
3 workers to process the scan; we can expand the virtual array of
the sub-plans as follows. Then, if the Gather node kicks off 5 workers,
the individual workers are assigned to some of the plans. If the Gather
node kicks off fewer than 5 workers, the first worker to finish its
sub-plan picks up the next one, so it eventually provides the best
parallelism.

+--------+
|sub-plan |       * Sub-Plan 1 ... Index Scan on p1
|index on *-----> * Sub-Plan 2 ... PartialSeqScan on p2
|shared   |       * Sub-Plan 2 ... PartialSeqScan on p2
|memory   |       * Sub-Plan 2 ... PartialSeqScan on p2
+---------+       * Sub-Plan 3 ... Index Scan on p3

I don't think the shared memory chunk should be indexed by worker, but
by sub-plan. So with 3 subplans, we would initially have [0,0,0].
The first worker would grab the first subplan, and we get [1,0,0].
The second worker grabs the third subplan, so now we have [1,0,1].
The remaining workers can't join the execution of those plans because
they are not truly parallel, so they all gang up on the second
subplan. At 5 workers we have [1,3,1]. Workers need not ever
decrement the array entries because they only pick a new sub-plan when
the one they picked previously is exhausted; thus, at the end of the
plan, each element in the array shows the total number of workers that
touched it at some point during its execution.
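
To make the bookkeeping concrete, a worker's choice of sub-plan under this
scheme might look roughly like the sketch below. All of the names
(ParallelAppendState, pa_workers, as_subplan_is_partial, choose_next_subplan)
are illustrative, not from any posted patch, and the sketch ignores the
bookkeeping for sub-plans that are already exhausted.

/*
 * Minimal sketch only: shared per-subplan worker counters, protected by
 * a spinlock.  All names here are hypothetical.
 */
typedef struct ParallelAppendState
{
    slock_t     mutex;                              /* protects pa_workers[] */
    int         pa_workers[FLEXIBLE_ARRAY_MEMBER];  /* one counter per sub-plan */
} ParallelAppendState;

static int
choose_next_subplan(ParallelAppendState *pstate, AppendState *node)
{
    int         result = -1;
    int         fewest = INT_MAX;
    int         i;

    SpinLockAcquire(&pstate->mutex);

    /* First preference: a non-parallel sub-plan nobody has claimed yet. */
    for (i = 0; i < node->as_nplans; i++)
    {
        if (!node->as_subplan_is_partial[i] && pstate->pa_workers[i] == 0)
        {
            result = i;
            break;
        }
    }

    /* Otherwise, gang up on the partial sub-plan with the fewest workers. */
    if (result < 0)
    {
        for (i = 0; i < node->as_nplans; i++)
        {
            if (node->as_subplan_is_partial[i] &&
                pstate->pa_workers[i] < fewest)
            {
                result = i;
                fewest = pstate->pa_workers[i];
            }
        }
    }

    /* Counters are never decremented, exactly as described above. */
    if (result >= 0)
        pstate->pa_workers[result]++;

    SpinLockRelease(&pstate->mutex);
    return result;
}

With three sub-plans of which only the second is partial, this produces
exactly the [1,0,0] -> [1,0,1] -> [1,3,1] progression described above.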

For more generic plan construction, the Plan node may have a field for
the number of "desirable" workers, even though most plan nodes are not
parallel-aware and the requested number is not guaranteed.
In the above case, the parallel-aware Append will want 5 workers in total
(2 for the two index scans, plus 3 for the partial seq scan). How many
workers are actually launched is at the discretion of the Gather node,
but this will give clear information about how many workers are best.

Yeah, maybe. I haven't thought about this deeply just yet, but I
agree it needs more consideration.

First, we can teach Append that, when running in parallel,
it should initialize a chunk of dynamic shared memory with an array
indicating how many workers are currently working on each subplan.

Can the parallel-aware Append recognize the current mode by checking
whether MyBgworkerEntry is valid or not?

No - that would be quite wrong. What it needs to do is define
ExecAppendEstimate and ExecAppendInitializeDSM and call those
functions from ExecParallelEstimate and ExecParallelInitializeDSM. It
also needs to define a third callback ExecAppendInitializeWorker which
will be called from the ExecParallelInitializeWorker function added by
the latest partial seq scan patch. ExecAppendEstimate must estimate
required shared memory usage for the shared memory state;
ExecAppendInitializeDSM must initialize that state, store a pointer to
it in the planstate node, and add a TOC entry; ExecAppendInitializeWorker will
run in the worker and should look up the TOC entry and store the
result in the same planstate node that ExecAppendInitializeDSM
populated in the leader. Then ExecAppend can decide what to do based
on whether that pointer is set, and based on the data to which it
points.
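
In rough outline, those three callbacks might look like the sketch below.
This is only a sketch: ParallelAppendState, pstate_len, as_pstate and
pa_workers are invented names, error handling is omitted, and keying the
TOC entry by the plan node ID just follows the pattern used by other
parallel-aware executor nodes.

void
ExecAppendEstimate(AppendState *node, ParallelContext *pcxt)
{
    /* fixed header plus one worker counter per sub-plan */
    node->pstate_len = add_size(offsetof(ParallelAppendState, pa_workers),
                                mul_size(sizeof(int), node->as_nplans));
    shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}

void
ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt)
{
    ParallelAppendState *pstate;

    pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
    SpinLockInit(&pstate->mutex);
    memset(pstate->pa_workers, 0, sizeof(int) * node->as_nplans);

    /* keyed by plan node ID so the workers can find the same chunk */
    shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
    node->as_pstate = pstate;
}

void
ExecAppendInitializeWorker(AppendState *node, shm_toc *toc)
{
    /* look up the leader's state; ExecAppend checks whether this is set */
    node->as_pstate = shm_toc_lookup(toc, node->ps.plan->plan_node_id);
}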

Are you going to look at implementing this?

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#25Kouhei Kaigai
kaigai@ak.jp.nec.com
In reply to: Robert Haas (#24)
Re: [DESIGN] ParallelAppend

-----Original Message-----
From: pgsql-hackers-owner@postgresql.org
[mailto:pgsql-hackers-owner@postgresql.org] On Behalf Of Robert Haas
Sent: Monday, October 26, 2015 8:53 PM
To: Kaigai Kouhei(海外 浩平)
Cc: pgsql-hackers@postgresql.org; Amit Kapila; Kyotaro HORIGUCHI
Subject: Re: [HACKERS] [DESIGN] ParallelAppend

On Sun, Oct 25, 2015 at 9:23 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

I entirely agree with your suggestion.

We may be able to draw an analogy between PartialSeqScan and the
parallel-aware Append node.
PartialSeqScan fetches blocks according to a block index kept on a
shared memory segment, so multiple workers eventually cooperate to
scan a table in a round-robin manner, even though each individual
worker fetches comb-shaped blocks.
If we treat the individual sub-plans of the parallel-aware Append
like those individual blocks, it behaves very similarly: a certain
number of workers (more than zero) is launched by the Gather node,
then the parallel-aware Append node fetches one of the remaining
sub-plans, if any.

Exactly, except for the part about the blocks being "comb-shaped",
which doesn't seem to make sense.

I think this design also gives additional flexibility according to
the parallelism required by the underlying sub-plans.
Suppose the "PartialSeqScan on p2" in the above example wants
3 workers to process the scan; we can expand the virtual array of
the sub-plans as follows. Then, if the Gather node kicks off 5 workers,
the individual workers are assigned to some of the plans. If the Gather
node kicks off fewer than 5 workers, the first worker to finish its
sub-plan picks up the next one, so it eventually provides the best
parallelism.

+--------+
|sub-plan |       * Sub-Plan 1 ... Index Scan on p1
|index on *-----> * Sub-Plan 2 ... PartialSeqScan on p2
|shared   |       * Sub-Plan 2 ... PartialSeqScan on p2
|memory   |       * Sub-Plan 2 ... PartialSeqScan on p2
+---------+       * Sub-Plan 3 ... Index Scan on p3

I don't think the shared memory chunk should be indexed by worker, but
by sub-plan. So with 3 subplans, we would initially have [0,0,0].
The first worker would grab the first subplan, and we get [1,0,0].
The second worker grabs the third subplan, so now we have [1,0,1].
The remaining workers can't join the execution of those plans because
they are not truly parallel, so they all gang up on the second
subplan. At 5 workers we have [1,3,1]. Workers need not ever
decrement the array entries because they only pick a new sub-plan when
the one they picked previously is exhausted; thus, at the end of the
plan, each element in the array shows the total number of workers that
touched it at some point during its execution.

Sorry, I could not get the point of the above explanation.
The 1st worker would grab the first subplan, giving [1,0,0]. That's OK.
The 2nd worker would grab the last subplan, giving [1,0,1]. That's
understandable, even though I'm uncertain why it does not pick up
the 2nd one.
Why do the remaining workers have to gang up on the 2nd sub-plan
(the PartialSeqScan, which is a parallel-aware execution node)?

Even if only one worker is assigned to the PartialSeqScan, it scans
the relation sequentially (because there are no other parallel workers).
Then, once another worker is additionally launched to scan the same
relation, both workers begin to cooperate using a common block index
kept in shared memory.
So, do we need to wait for completion of the non-parallel-aware nodes here?

I assume it is better to launch the PartialSeqScan even with only one
worker, rather than waiting, because other workers can join the execution
later.
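
(For reference, the reason a lone worker can simply start scanning is that
a partial scan just draws block numbers from a shared counter. Conceptually
it is something like the simplified sketch below; this is not the actual
parallel heap-scan code, and the names are illustrative.)

typedef struct SharedScanState
{
    slock_t     mutex;
    BlockNumber nblocks;        /* total blocks in the relation */
    BlockNumber next_block;     /* next block nobody has claimed yet */
} SharedScanState;

static BlockNumber
shared_scan_next_block(SharedScanState *sss)
{
    BlockNumber blkno = InvalidBlockNumber;

    SpinLockAcquire(&sss->mutex);
    if (sss->next_block < sss->nblocks)
        blkno = sss->next_block++;
    SpinLockRelease(&sss->mutex);

    /*
     * With one worker, that worker simply claims every block in turn; a
     * worker that joins later starts claiming blocks from wherever the
     * shared counter currently points.
     */
    return blkno;
}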

For more generic plan construction, the Plan node may have a field for
the number of "desirable" workers, even though most plan nodes are not
parallel-aware and the requested number is not guaranteed.
In the above case, the parallel-aware Append will want 5 workers in total
(2 for the two index scans, plus 3 for the partial seq scan). How many
workers are actually launched is at the discretion of the Gather node,
but this will give clear information about how many workers are best.

Yeah, maybe. I haven't thought about this deeply just yet, but I
agree it needs more consideration.

OK, I'll build a patch including the concept.

First, we can teach Append that, when running in parallel,
it should initialize a chunk of dynamic shared memory with an array
indicating how many workers are currently working on each subplan.

Can the parallel-aware Append recognize the current mode by checking
whether MyBgworkerEntry is valid or not?

No - that would be quite wrong. What it needs to do is define
ExecAppendEstimate and ExecAppendInitializeDSM and call those
functions from ExecParallelEstimate and ExecParallelInitializeDSM. It
also needs to define a third callback ExecAppendInitializeWorker which
will be called from the ExecParallelInitializeWorker function added by
the latest partial seq scan patch. ExecAppendEstimate must estimate
required shared memory usage for the shared memory state;
ExecAppendInitializeDSM must initialize that state, store a pointer to
it in the planstate node, and add a TOC entry; ExecAppendInitializeWorker will
run in the worker and should look up the TOC entry and store the
result in the same planstate node that ExecAppendInitializeDSM
populated in the leader. Then ExecAppend can decide what to do based
on whether that pointer is set, and based on the data to which it
points.

Thanks for the clear direction.

Are you going to look at implementing this?

I feel the scale of the implementation is not large, as long as the Append
node itself does not have to launch new worker processes. Let me try it.

Best regards,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#26Kouhei Kaigai
kaigai@ak.jp.nec.com
In reply to: Kouhei Kaigai (#25)
Re: [DESIGN] ParallelAppend

At PGconf.EU, I had a chance to talk with Robert about this topic,
and it became clear that we have the same idea.

+--------+
|sub-plan |       * Sub-Plan 1 ... Index Scan on p1
|index on *-----> * Sub-Plan 2 ... PartialSeqScan on p2
|shared   |       * Sub-Plan 2 ... PartialSeqScan on p2
|memory   |       * Sub-Plan 2 ... PartialSeqScan on p2
+---------+       * Sub-Plan 3 ... Index Scan on p3

In the above example, I make a non-parallel sub-plan use only
1 slot of the array, whereas a PartialSeqScan takes 3 slots.
It is a strict rule: a non-parallel-aware sub-plan can be picked
up only once.
The index into the sub-plan array is initialized to 0, then incremented
up to 5 by the workers as they process the parallel-aware Append.
So, once a worker takes a non-parallel sub-plan, no other worker can
ever take the same slot again; thus, no duplicated rows will be
produced by a non-parallel sub-plan under the parallel-aware Append.
This array structure also prevents too many workers from picking up
a particular parallel-aware sub-plan, because the PartialSeqScan
occupies 3 slots; that means at most three workers can pick up this
sub-plan. If the 1st worker took the IndexScan on p1, and the 2nd-4th
workers took the PartialSeqScan on p2, then the 5th worker (if any)
will pick up the IndexScan on p3 even if the PartialSeqScan on p2 is
not yet complete.
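
A minimal sketch of the slot-array scheme just described, with invented
names: a parallel-aware sub-plan is listed in as many slots as the number
of workers it wants, a non-parallel sub-plan appears exactly once, and each
worker atomically claims the next unclaimed slot.

typedef struct SubPlanSlots
{
    pg_atomic_uint32 next_slot;     /* shared claim counter, starts at 0 */
    int         nslots;             /* 5 in the example above */
    int         subplan_of[FLEXIBLE_ARRAY_MEMBER];  /* e.g. {1, 2, 2, 2, 3} */
} SubPlanSlots;

static int
claim_next_subplan(SubPlanSlots *slots)
{
    uint32      slot = pg_atomic_fetch_add_u32(&slots->next_slot, 1);

    if (slot >= (uint32) slots->nslots)
        return -1;                  /* every slot has already been claimed */
    return slots->subplan_of[slot]; /* non-parallel plans appear only once */
}

With the layout {1, 2, 2, 2, 3} above, at most three workers ever reach
Sub-Plan 2, and Sub-Plans 1 and 3 are claimed exactly once.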

As another thought experiment, consider what happens if a parallel-aware
Append underlies another parallel-aware Append.
By definition, a parallel-aware Append is itself parallel-aware, thus it
can occupy multiple slots in the array of sub-plans, like:

subplans[0] ... SeqScan on p1
subplans[1] ... Parallel Append on p2+p3+p4
subplans[2] ... Parallel Append on p2+p3+p4
subplans[3] ... Parallel Append on p2+p3+p4
subplans[4] ... Parallel Append on p2+p3+p4
subplans[5] ... IndexScan on p5

Also, assume the child parallel-aware Append has the following
array:
subplans[0] ... SeqScan on p2
subplans[1] ... PartialSeqScan on p3
subplans[2] ... PartialSeqScan on p3
subplans[3] ... SeqScan on p4

The Gather node located on top of the upper Append node will
launch (at most) 6 workers, according to the requirement of
the upper Append node.
Each worker picks up a particular sub-plan in the array of the
upper Append node, so some of them (4 workers at most) will
execute the child Append node.
Then, each of these 4 workers will also pick up a particular
sub-plan in the array of the child Append node.
It will work even if the number of workers is less than optimal,
so I believe the overall design is reasonable.

Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#27Robert Haas
robertmhaas@gmail.com
In reply to: Kouhei Kaigai (#26)
Re: [DESIGN] ParallelAppend

On Wed, Oct 28, 2015 at 3:55 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

At PGconf.EU, I had a chance to talk with Robert about this topic,
and it became clear that we have the same idea.

+--------+
|sub-plan |       * Sub-Plan 1 ... Index Scan on p1
|index on *-----> * Sub-Plan 2 ... PartialSeqScan on p2
|shared   |       * Sub-Plan 2 ... PartialSeqScan on p2
|memory   |       * Sub-Plan 2 ... PartialSeqScan on p2
+---------+       * Sub-Plan 3 ... Index Scan on p3

In the above example, I make a non-parallel sub-plan use only
1 slot of the array, whereas a PartialSeqScan takes 3 slots.
It is a strict rule: a non-parallel-aware sub-plan can be picked
up only once.
The index into the sub-plan array is initialized to 0, then incremented
up to 5 by the workers as they process the parallel-aware Append.
So, once a worker takes a non-parallel sub-plan, no other worker can
ever take the same slot again; thus, no duplicated rows will be
produced by a non-parallel sub-plan under the parallel-aware Append.
This array structure also prevents too many workers from picking up
a particular parallel-aware sub-plan, because the PartialSeqScan
occupies 3 slots; that means at most three workers can pick up this
sub-plan. If the 1st worker took the IndexScan on p1, and the 2nd-4th
workers took the PartialSeqScan on p2, then the 5th worker (if any)
will pick up the IndexScan on p3 even if the PartialSeqScan on p2 is
not yet complete.

Actually, this is not exactly what I had in mind. I was thinking that
we'd have a single array whose length is equal to the number of Append
subplans, and each element of the array would be a count of the number
of workers executing that subplan. So there wouldn't be multiple
entries for the same subplan, as you propose here. To distinguish
between parallel-aware and non-parallel-aware plans, I plan to put a
Boolean flag in the plan itself.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#28Kouhei Kaigai
kaigai@ak.jp.nec.com
In reply to: Robert Haas (#27)
Re: [DESIGN] ParallelAppend

On Wed, Oct 28, 2015 at 3:55 PM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

At PGconf.EU, I had a chance to talk with Robert about this topic,
and it became clear that we have the same idea.

+--------+
|sub-plan |       * Sub-Plan 1 ... Index Scan on p1
|index on *-----> * Sub-Plan 2 ... PartialSeqScan on p2
|shared   |       * Sub-Plan 2 ... PartialSeqScan on p2
|memory   |       * Sub-Plan 2 ... PartialSeqScan on p2
+---------+       * Sub-Plan 3 ... Index Scan on p3

In the above example, I make a non-parallel sub-plan use only
1 slot of the array, whereas a PartialSeqScan takes 3 slots.
It is a strict rule: a non-parallel-aware sub-plan can be picked
up only once.
The index into the sub-plan array is initialized to 0, then incremented
up to 5 by the workers as they process the parallel-aware Append.
So, once a worker takes a non-parallel sub-plan, no other worker can
ever take the same slot again; thus, no duplicated rows will be
produced by a non-parallel sub-plan under the parallel-aware Append.
This array structure also prevents too many workers from picking up
a particular parallel-aware sub-plan, because the PartialSeqScan
occupies 3 slots; that means at most three workers can pick up this
sub-plan. If the 1st worker took the IndexScan on p1, and the 2nd-4th
workers took the PartialSeqScan on p2, then the 5th worker (if any)
will pick up the IndexScan on p3 even if the PartialSeqScan on p2 is
not yet complete.

Actually, this is not exactly what I had in mind. I was thinking that
we'd have a single array whose length is equal to the number of Append
subplans, and each element of the array would be a count of the number
of workers executing that subplan. So there wouldn't be multiple
entries for the same subplan, as you propose here. To distinguish
between parallel-aware and non-parallel-aware plans, I plan to put a
Boolean flag in the plan itself.

I don't have a strong preference here. Both designs can implement the
requirement: non-parallel sub-plans are never picked up twice, and
parallel-aware sub-plans can be picked up multiple times.
So, I'll start with your suggestion above.

Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#29Kouhei Kaigai
kaigai@ak.jp.nec.com
In reply to: Kouhei Kaigai (#28)
Re: [DESIGN] ParallelAppend

I'm now designing the parallel feature of Append...

Here is one challenge. How do we determine whether each sub-plan
allows execution in the background worker context?

Commit f0661c4e8c44c0ec7acd4ea7c82e85b265447398 added a
'parallel_aware' flag to the Path and Plan structures.
It tells us whether the plan/path node can be executed by multiple
background worker processes concurrently, but it tells us nothing
about whether the plan/path node is safe to run in a background
worker process context at all.

From the standpoint of parallel execution, I understand there are
three types of plan/path nodes.

Type-A) It can be performed on background worker context and
picked up by multiple worker processes concurrently.
(e.g: Parallel SeqScan)
Type-B) It can be performed on background worker context but
cannot be picked up by multiple worker processes.
(e.g: non-parallel aware node)
Type-C) It should not be performed on background worker context.
(e.g: plan/path node with volatile functions)

The 'parallel_aware' flag allows us to distinguish type-A nodes from
the others; however, we cannot distinguish type-B from type-C.
From the standpoint of parallel Append, it makes sense to launch
background workers even if all the sub-plans are type-B, with no
type-A node.

Sorry for the delay. I'd like to suggest having 'int num_workers'
in the Path and Plan nodes as a common field.
We give this field the following three meanings:
- If num_workers > 1, it is a type-A node, so the parallel-aware
Append node may assign more than one worker to this node.
- If num_workers == 1, it is a type-B node, so more than one
background worker process shall never be assigned to it.
- If num_workers == 0, it is a type-C node, so the planner shall
give up running this node in a background worker context.

The num_workers value shall propagate to the upper node.
For example, I expect a HashJoin node that takes a Parallel
SeqScan with num_workers == 4 as its outer input will also have
num_workers == 4, as long as the join clauses are safe to run on
the background worker side.
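
To restate the proposed rule in code form: num_workers is the suggested
new common field, not something that exists today, and the helper below is
just a sketch of how an upper join path might inherit it.

/*
 * Sketch of the proposed semantics (num_workers is the suggested new
 * common Path field):
 *   > 1   type-A: parallel-aware, may be picked up by several workers
 *   == 1  type-B: worker-safe, but by at most one worker
 *   == 0  type-C: must not run in a background worker at all
 */
static int
joinpath_num_workers(Path *outerpath, bool joinquals_parallel_safe)
{
    if (!joinquals_parallel_safe || outerpath->num_workers == 0)
        return 0;                   /* type-C: keep it in the leader */
    return outerpath->num_workers;  /* inherit the outer side's parallelism */
}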

How about the idea?

Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#30Amit Langote
Langote_Amit_f8@lab.ntt.co.jp
In reply to: Kouhei Kaigai (#29)
Re: [DESIGN] ParallelAppend

On 2015/11/12 14:09, Kouhei Kaigai wrote:

I'm now designing the parallel feature of Append...

Here is one challenge. How do we determine whether each sub-plan
allows execution in the background worker context?

Commit f0661c4e8c44c0ec7acd4ea7c82e85b265447398 added a
'parallel_aware' flag to the Path and Plan structures.
It tells us whether the plan/path node can be executed by multiple
background worker processes concurrently, but it tells us nothing
about whether the plan/path node is safe to run in a background
worker process context at all.

When I was looking at the recent parallelism related commits, I noticed a
RelOptInfo.consider_parallel flag. That and the function
set_rel_consider_parallel() may be of interest in this regard.
set_append_rel_size() passes the parent's state of this flag down to child
relations but I guess that's not what you're after.

Thanks,
Amit

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#31Kouhei Kaigai
kaigai@ak.jp.nec.com
In reply to: Amit Langote (#30)
Re: [DESIGN] ParallelAppend

On 2015/11/12 14:09, Kouhei Kaigai wrote:

I'm now designing the parallel feature of Append...

Here is one challenge. How do we determine whether each sub-plan
allows execution in the background worker context?

Commit f0661c4e8c44c0ec7acd4ea7c82e85b265447398 added a
'parallel_aware' flag to the Path and Plan structures.
It tells us whether the plan/path node can be executed by multiple
background worker processes concurrently, but it tells us nothing
about whether the plan/path node is safe to run in a background
worker process context at all.

When I was looking at the recent parallelism related commits, I noticed a
RelOptInfo.consider_parallel flag. That and the function
set_rel_consider_parallel() may be of interest in this regard.
set_append_rel_size() passes the parent's state of this flag down to child
relations but I guess that's not what you're after.

Thanks for this information. Indeed, it tells us which base
relations are valid for parallel execution.
In the case of parallel Append, we can give up parallelism if any of
the underlying base relations is not parallel-aware. We can
potentially use the same logic for join relations as well.

A challenge is how to count the optimal number of background worker
processes. I assume the number of workers for parallel Append shall be
the sum of the numbers of workers required by the sub-plans, as long as
it does not exceed max_parallel_degree.
Probably, we need a pathnode_tree_walker() to count the number of workers
required by the sub-plans.
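
A sketch of that estimate, under the assumption of the proposed num_workers
field; since there is no pathnode_tree_walker() today, this only inspects
the Append path's direct sub-paths:

/*
 * Sum the workers requested by each sub-path and clamp to
 * max_parallel_degree (sketch only; num_workers is the proposed field).
 */
static int
append_desired_workers(AppendPath *apath)
{
    int         total = 0;
    ListCell   *lc;

    foreach(lc, apath->subpaths)
    {
        Path       *subpath = (Path *) lfirst(lc);

        total += subpath->num_workers;  /* 0 for leader-only sub-plans */
    }

    return Min(total, max_parallel_degree);
}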

BTW, is the idea of the consider_parallel flag in RelOptInfo workable
for join relations as well? In the case of A LEFT JOIN B, for example,
the join can be parallel-aware if the join path has A as its outer input,
but it cannot be if B is the outer input.
I suspect this kind of information belongs to the Path, not the RelOptInfo.

Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#32Robert Haas
robertmhaas@gmail.com
In reply to: Kouhei Kaigai (#29)
1 attachment(s)
Re: [DESIGN] ParallelAppend

On Thu, Nov 12, 2015 at 12:09 AM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

I'm now designing the parallel feature of Append...

Here is one challenge. How do we determine whether each sub-plan
allows execution in the background worker context?

I've been thinking about these questions for a bit now, and I think we
can work on improving Append in multiple phases. The attached patch
shows what I have in mind for phase 1. Currently, if you set up an
inheritance hierarchy, you might get an Append some of whose children
are Gather nodes with Parallel Seq Scans under them, like this:

Append
-> Gather
   -> Parallel Seq Scan
-> Gather
   -> Parallel Seq Scan
-> Seq Scan

This is a crappy plan. Each Gather node will try to launch its own
bunch of workers, which sucks. The attached patch lets you get this
plan instead:

Gather
-> Append
   -> Partial Seq Scan
   -> Partial Seq Scan
   -> Partial Seq Scan

That's a much better plan.

To make this work, this patch introduces a couple of new concepts.
Each RelOptInfo gets a new partial_pathlist field, which stores paths
that need to be gathered to produce a workable plan. Once we've
populated the partial_pathlist with whatever partial paths we know how
to generate, we can either push a Gather node on top of one of those
partial paths to create a real path; or we can use those partial paths
to build more partial paths. The current patch handles only the
simplest case: we can build a partial path for an appendrel by
appending a partial path for each member rel. But eventually I hope
to handle joinrels this way as well: you can join a partial path with
an ordinary path to form a partial path for the joinrel.

This requires some way of figuring out how many workers to request for
the append-path, so this patch also adds a parallel_degree field to
the path object, which is 0 for non-parallel things and the number of
workers that the path believes to be ideal otherwise. Right now, it
just percolates the highest worker count of any child up to the
appendrel, which might not be right, especially once the append node
knows how to balance workers, but it seems like a reasonable place to
start.

Type-A) It can be performed on background worker context and
picked up by multiple worker processes concurrently.
(e.g: Parallel SeqScan)
Type-B) It can be performed on background worker context but
cannot be picked up by multiple worker processes.
(e.g: non-parallel aware node)
Type-C) It should not be performed on background worker context.
(e.g: plan/path node with volatile functions)

At the time that we're forming an AppendPath, we can identify what
you're calling type-A paths very easily: they're the ones that are
coming from the partial_pathlist. We can distinguish between type-B
and type-C paths coming from the childrel's pathlist based on the
childrel's consider_parallel flag: if it's false, it's type-C, else
type-B. At some point, we might need a per-path flag to distinguish
cases where a particular path is type-C even though some other plan
for that relation might be type-B. But right now that case doesn't
arise.

Now, of course, it's not good enough to have this information
available when we're generating the AppendPath; it has to survive
until execution time. But that doesn't mean the paths need to be
self-identifying. We could, of course, decide to make them so, and
maybe that's the best design, but I'm thinking about another approach:
suppose the append node itself, instead of having one list of child
plans, has a list of type-A plans, a list of type-B plans, and a list
of type-C plans. This actually seems quite convenient, because at
execution time, you presumably want the leader to prioritize type-C
plans over any of the others, since it's the only one that can execute
them, and the workers to prioritize type-B plans, since they can only
take one worker each and are thus prone to be the limiting factor on
finishing the whole Append. Having the plans divided in advance
between these three lists (say, restricted_plans, safe_plans,
parallel_plans) makes that easy to implement.
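
In rough outline, that three-list layout might look like the sketch below;
only the list names come from the paragraph above, the rest is invented for
illustration.

/*
 * Sketch only: the Append plan node's single list of children replaced by
 * three lists.  The leader would drain restricted_plans first, since no
 * one else can run them; workers would prefer safe_plans, which can each
 * absorb only one process; everyone falls back to parallel_plans.
 */
typedef struct Append
{
    Plan        plan;
    List       *restricted_plans;   /* type-C: leader only */
    List       *safe_plans;         /* type-B: any single process */
    List       *parallel_plans;     /* type-A: any number of processes */
} Append;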

Incidentally, I think it's subtly wrong to think of the parallel_aware
flag as telling you whether the plan can absorb multiple workers.
That's not really what it's for. It's to tell you whether the plan is
doing *something* parallel aware - that is, whether its Estimate,
InitializeDSM, and InitializeWorker callbacks should do anything. For
SeqScan, flipping parallel_aware actually does split the input among
all the workers, but for Append it probably just load-balances, and
for other nodes it might be something else again. The term I'm using
to indicate a path/plan that returns only a subset of the results in
each worker is "partial". Whether or not a path is partial is, in the
design embodied in this patch, indicated both by whether
path->parallel_degree > 0 and whether the path is in rel->pathlist or
rel->partial_pathlist.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

Attachments:

gather-append-pushdown-v1.patch (text/x-diff; charset=US-ASCII)
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 012c14b..fe07176 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -1588,6 +1588,7 @@ _outPathInfo(StringInfo str, const Path *node)
 	else
 		_outBitmapset(str, NULL);
 	WRITE_BOOL_FIELD(parallel_aware);
+	WRITE_INT_FIELD(parallel_degree);
 	WRITE_FLOAT_FIELD(rows, "%.0f");
 	WRITE_FLOAT_FIELD(startup_cost, "%.2f");
 	WRITE_FLOAT_FIELD(total_cost, "%.2f");
@@ -1764,7 +1765,6 @@ _outGatherPath(StringInfo str, const GatherPath *node)
 	_outPathInfo(str, (const Path *) node);
 
 	WRITE_NODE_FIELD(subpath);
-	WRITE_INT_FIELD(num_workers);
 	WRITE_BOOL_FIELD(single_copy);
 }
 
@@ -1887,6 +1887,7 @@ _outRelOptInfo(StringInfo str, const RelOptInfo *node)
 	WRITE_NODE_FIELD(reltargetlist);
 	WRITE_NODE_FIELD(pathlist);
 	WRITE_NODE_FIELD(ppilist);
+	WRITE_NODE_FIELD(partial_pathlist);
 	WRITE_NODE_FIELD(cheapest_startup_path);
 	WRITE_NODE_FIELD(cheapest_total_path);
 	WRITE_NODE_FIELD(cheapest_unique_path);
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 1fdcae5..fdbe13f 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -72,6 +72,7 @@ static void set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 				 Index rti, RangeTblEntry *rte);
 static void set_plain_rel_size(PlannerInfo *root, RelOptInfo *rel,
 				   RangeTblEntry *rte);
+static void create_parallel_paths(PlannerInfo *root, RelOptInfo *rel);
 static void set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,
 						  RangeTblEntry *rte);
 static bool function_rte_parallel_ok(RangeTblEntry *rte);
@@ -107,6 +108,7 @@ static void set_cte_pathlist(PlannerInfo *root, RelOptInfo *rel,
 				 RangeTblEntry *rte);
 static void set_worktable_pathlist(PlannerInfo *root, RelOptInfo *rel,
 					   RangeTblEntry *rte);
+static void generate_gather_paths(PlannerInfo *root, RelOptInfo *rel);
 static RelOptInfo *make_rel_from_joinlist(PlannerInfo *root, List *joinlist);
 static bool subquery_is_pushdown_safe(Query *subquery, Query *topquery,
 						  pushdown_safety_info *safetyInfo);
@@ -612,7 +614,6 @@ static void
 set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
 {
 	Relids		required_outer;
-	int			parallel_threshold = 1000;
 
 	/*
 	 * We don't support pushing join clauses into the quals of a seqscan, but
@@ -624,39 +625,9 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
 	/* Consider sequential scan */
 	add_path(rel, create_seqscan_path(root, rel, required_outer, 0));
 
-	/* Consider parallel sequential scan */
-	if (rel->consider_parallel && rel->pages > parallel_threshold &&
-		required_outer == NULL)
-	{
-		Path *path;
-		int parallel_degree = 1;
-
-		/*
-		 * Limit the degree of parallelism logarithmically based on the size
-		 * of the relation.  This probably needs to be a good deal more
-		 * sophisticated, but we need something here for now.
-		 */
-		while (rel->pages > parallel_threshold * 3 &&
-			   parallel_degree < max_parallel_degree)
-		{
-			parallel_degree++;
-			parallel_threshold *= 3;
-			if (parallel_threshold >= PG_INT32_MAX / 3)
-				break;
-		}
-
-		/*
-		 * Ideally we should consider postponing the gather operation until
-		 * much later, after we've pushed joins and so on atop the parallel
-		 * sequential scan path.  But we don't have the infrastructure for
-		 * that yet, so just do this for now.
-		 */
-		path = create_seqscan_path(root, rel, required_outer, parallel_degree);
-		path = (Path *)
-			create_gather_path(root, rel, path, required_outer,
-							   parallel_degree);
-		add_path(rel, path);
-	}
+	/* If appropriate, consider parallel sequential scan */
+	if (rel->consider_parallel && required_outer == NULL)
+		create_parallel_paths(root, rel);
 
 	/* Consider index scans */
 	create_index_paths(root, rel);
@@ -666,6 +637,54 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
 }
 
 /*
+ * create_parallel_paths
+ *	  Build parallel access paths for a plain relation
+ */
+static void
+create_parallel_paths(PlannerInfo *root, RelOptInfo *rel)
+{
+	int		parallel_threshold = 1000;
+	int		parallel_degree = 1;
+
+	/*
+	 * If this relation is too small to be worth a parallel scan, just return
+	 * without doing anything ... unless it's an inheritance child.  In that case,
+	 * we want to generate a parallel path here anyway.  It might not be worthwhile
+	 * just for this relation, but when combined with all of its inheritance siblings
+	 * it may well pay off.
+	 */
+	if (rel->pages < parallel_threshold && rel->reloptkind == RELOPT_BASEREL)
+		return;
+
+	/*
+	 * Limit the degree of parallelism logarithmically based on the size of the
+	 * relation.  This probably needs to be a good deal more sophisticated, but we
+	 * need something here for now.
+	 */
+	while (rel->pages > parallel_threshold * 3 &&
+		   parallel_degree < max_parallel_degree)
+	{
+		parallel_degree++;
+		parallel_threshold *= 3;
+		if (parallel_threshold >= PG_INT32_MAX / 3)
+			break;
+	}
+
+	/* Add an unordered partial path based on a parallel sequential scan. */
+	add_partial_path(rel, create_seqscan_path(root, rel, NULL, parallel_degree));
+
+	/*
+	 * If this is a baserel, consider gathering any partial paths we may have
+	 * just created.  If we gathered an inheritance child, we could end up with
+	 * a very large number of gather nodes, each trying to grab its own pool of
+	 * workers, so don't do this in that case.  Instead, we'll consider gathering
+	 * partial paths for the appendrel.
+	 */
+	if (rel->reloptkind == RELOPT_BASEREL)
+		generate_gather_paths(root, rel);
+}
+
+/*
  * set_tablesample_rel_size
  *	  Set size estimates for a sampled relation
  */
@@ -1039,6 +1058,8 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 	List	   *live_childrels = NIL;
 	List	   *subpaths = NIL;
 	bool		subpaths_valid = true;
+	List	   *partial_subpaths = NIL;
+	bool		partial_subpaths_valid = true;
 	List	   *all_child_pathkeys = NIL;
 	List	   *all_child_outers = NIL;
 	ListCell   *l;
@@ -1093,6 +1114,13 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 		else
 			subpaths_valid = false;
 
+		/* Same idea, but for a partial plan. */
+		if (childrel->partial_pathlist != NIL)
+			partial_subpaths = accumulate_append_subpath(partial_subpaths,
+										linitial(childrel->partial_pathlist));
+		else
+			partial_subpaths_valid = false;
+
 		/*
 		 * Collect lists of all the available path orderings and
 		 * parameterizations for all the children.  We use these as a
@@ -1164,7 +1192,38 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 	 * if we have zero or one live subpath due to constraint exclusion.)
 	 */
 	if (subpaths_valid)
-		add_path(rel, (Path *) create_append_path(rel, subpaths, NULL));
+		add_path(rel, (Path *) create_append_path(rel, subpaths, NULL, 0));
+
+	/*
+	 * Consider an append of partial unordered, unparameterized partial paths.
+	 */
+	if (partial_subpaths_valid)
+	{
+		AppendPath *appendpath;
+		ListCell *lc;
+		int parallel_degree = 0;
+
+		/*
+		 * Decide what parallel degree to request for this append path.  For
+		 * now, we just use the maximum parallel degree of any member.  It
+		 * might be useful to use a higher number if the Append node were smart
+		 * enough to spread out the workers, but it currently isn't.
+		 */
+		foreach (lc, partial_subpaths)
+		{
+			Path *path = lfirst(lc);
+			parallel_degree = Max(parallel_degree, path->parallel_degree);
+		}
+		Assert(parallel_degree > 0);
+
+		/* Generate a partial append path. */
+		appendpath = create_append_path(rel, partial_subpaths, NULL,
+										parallel_degree);
+		add_partial_path(rel, (Path *) appendpath);
+
+		/* Consider gathering it. */
+		generate_gather_paths(root, rel);
+	}
 
 	/*
 	 * Also build unparameterized MergeAppend paths based on the collected
@@ -1214,7 +1273,7 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 
 		if (subpaths_valid)
 			add_path(rel, (Path *)
-					 create_append_path(rel, subpaths, required_outer));
+					 create_append_path(rel, subpaths, required_outer, 0));
 	}
 }
 
@@ -1440,8 +1499,9 @@ set_dummy_rel_pathlist(RelOptInfo *rel)
 
 	/* Discard any pre-existing paths; no further need for them */
 	rel->pathlist = NIL;
+	rel->partial_pathlist = NIL;
 
-	add_path(rel, (Path *) create_append_path(rel, NIL, NULL));
+	add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0));
 
 	/*
 	 * We set the cheapest path immediately, to ensure that IS_DUMMY_REL()
@@ -1844,6 +1904,35 @@ set_worktable_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
 }
 
 /*
+ * generate_gather_paths
+ *		Generate parallel access paths for a relation by pushing a Gather on
+ *		top of a partial path.
+ */
+static void
+generate_gather_paths(PlannerInfo *root, RelOptInfo *rel)
+{
+	Path   *cheapest_partial_path;
+	Path   *simple_gather_path;
+
+	/* If there are no partial paths, there's nothing to do here. */
+	if (rel->partial_pathlist == NIL)
+		return;
+
+	/*
+	 * The output of Gather is currently always unsorted, so there's only one
+	 * partial path of interest: the cheapest one.
+	 *
+	 * Eventually, we should have a Gather Merge operation that can merge multiple
+	 * tuple streams together while preserving their ordering.  We could usefully
+	 * generate such a path from each partial path that has non-NIL pathkeys.
+	 */
+	cheapest_partial_path = linitial(rel->partial_pathlist);
+	simple_gather_path = (Path *)
+		create_gather_path(root, rel, cheapest_partial_path, NULL);
+	add_path(rel, simple_gather_path);
+}
+
+/*
  * make_rel_from_joinlist
  *	  Build access paths using a "joinlist" to guide the join path search.
  *
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 990486c..9d65be9 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -186,8 +186,7 @@ clamp_row_est(double nrows)
  */
 void
 cost_seqscan(Path *path, PlannerInfo *root,
-			 RelOptInfo *baserel, ParamPathInfo *param_info,
-			 int nworkers)
+			 RelOptInfo *baserel, ParamPathInfo *param_info)
 {
 	Cost		startup_cost = 0;
 	Cost		run_cost = 0;
@@ -232,8 +231,8 @@ cost_seqscan(Path *path, PlannerInfo *root,
 	 * This is almost certainly not exactly the right way to model this, so
 	 * this will probably need to be changed at some point...
 	 */
-	if (nworkers > 0)
-		run_cost = run_cost / (nworkers + 0.5);
+	if (path->parallel_degree > 0)
+		run_cost = run_cost / (path->parallel_degree + 0.5);
 
 	path->startup_cost = startup_cost;
 	path->total_cost = startup_cost + run_cost;
diff --git a/src/backend/optimizer/path/joinrels.c b/src/backend/optimizer/path/joinrels.c
index b2cc9f0..9b2b0b4 100644
--- a/src/backend/optimizer/path/joinrels.c
+++ b/src/backend/optimizer/path/joinrels.c
@@ -1069,9 +1069,10 @@ mark_dummy_rel(RelOptInfo *rel)
 
 	/* Evict any previously chosen paths */
 	rel->pathlist = NIL;
+	rel->partial_pathlist = NIL;
 
 	/* Set up the dummy path */
-	add_path(rel, (Path *) create_append_path(rel, NIL, NULL));
+	add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0));
 
 	/* Set or update cheapest_total_path and related fields */
 	set_cheapest(rel);
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 411b36c..95d95f1 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1125,7 +1125,7 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path)
 
 	gather_plan = make_gather(subplan->targetlist,
 							  NIL,
-							  best_path->num_workers,
+							  best_path->path.parallel_degree,
 							  best_path->single_copy,
 							  subplan);
 
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 09c3244..166a41b 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -685,6 +685,151 @@ add_path_precheck(RelOptInfo *parent_rel,
 	return true;
 }
 
+/*
+ * add_partial_path
+ *	  Like add_path, our goal here is to consider whether a path is worthy
+ *	  of being kept around, but the considerations here are a bit different.
+ *	  A partial path is one which can be executed in any number of workers in
+ *	  parallel such that each worker will generate a subset of the path's
+ *	  overall result.
+ *
+ *	  We don't generate parameterized partial paths because they seem unlikely
+ *	  ever to be worthwhile.  The only way we could ever use such a path is
+ *	  by executing a nested loop with a complete path on the outer side - thus,
+ *	  each worker would scan the entire outer relation - and the partial path
+ *	  on the inner side - thus, each worker would scan only part of the inner
+ *	  relation.  This is silly: a parameterized path is generally going to be
+ *	  based on an index scan, and we can't generate a partial path for that.
+ *	  And it's generally only going to produce a few rows, so splitting them
+ *	  up between workers doesn't really make sense.  It would be better to
+ *	  use an unparameterized partial path on the outer side of the join with
+ *	  a parameterized complete path on the inner side in virtually every case.
+ *
+ *	  Because we don't need to consider parameterized paths here, we also don't
+ *	  need to consider the row counts as a measure of quality: every path will
+ *	  produce the same number of rows.  Neither do we need to consider startup
+ *	  costs: parallelism is only used for plans that will be run to completion.
+ *    Therefore, this routine is much simpler than add_path: it needs to
+ *    consider only pathkeys and total cost.
+ */
+void
+add_partial_path(RelOptInfo *parent_rel, Path *new_path)
+{
+	bool		accept_new = true;		/* unless we find a superior old path */
+	ListCell   *insert_after = NULL;	/* where to insert new item */
+	ListCell   *p1;
+	ListCell   *p1_prev;
+	ListCell   *p1_next;
+
+	/* Check for query cancel. */
+	CHECK_FOR_INTERRUPTS();
+
+	/*
+	 * As in add_path, throw out any paths which are dominated by the new path,
+	 * but throw out the new path if some existing path dominates it.
+	 */
+	p1_prev = NULL;
+	for (p1 = list_head(parent_rel->partial_pathlist); p1 != NULL;
+		 p1 = p1_next)
+	{
+		Path	   *old_path = (Path *) lfirst(p1);
+		bool		remove_old = false; /* unless new proves superior */
+		PathKeysComparison keyscmp;
+
+		p1_next = lnext(p1);
+
+		/* Compare pathkeys. */
+		keyscmp = compare_pathkeys(new_path->pathkeys, old_path->pathkeys);
+
+		/* Unless pathkeys are incompatible, keep just one of the two paths. */
+		if (keyscmp != PATHKEYS_DIFFERENT)
+		{
+			if (new_path->total_cost > old_path->total_cost * STD_FUZZ_FACTOR)
+			{
+				/* New path costs more; keep it only if pathkeys are better. */
+				if (keyscmp != PATHKEYS_BETTER1)
+					accept_new = false;
+			}
+			else if (old_path->total_cost > new_path->total_cost
+						* STD_FUZZ_FACTOR)
+			{
+				/* Old path costs more; keep it only if pathkeys are better. */
+				if (keyscmp != PATHKEYS_BETTER2)
+					remove_old = true;
+			}
+			else if (keyscmp == PATHKEYS_BETTER1)
+			{
+				/* Costs are about the same, new path has better pathkeys. */
+				remove_old = true;
+			}
+			else if (keyscmp == PATHKEYS_BETTER2)
+			{
+				/* Costs are about the same, old path has better pathkeys. */
+				accept_new = false;
+			}
+			else if (old_path->total_cost > new_path->total_cost * 1.0000000001)
+			{
+				/* Pathkeys are the same, and the old path costs more. */
+				remove_old = true;
+			}
+			else
+			{
+				/*
+				 * Pathkeys are the same, and new path isn't materially
+				 * cheaper.
+				 */
+				accept_new = false;
+			}
+		}
+
+		/*
+		 * Remove current element from partial_pathlist if dominated by new.
+		 */
+		if (remove_old)
+		{
+			parent_rel->partial_pathlist =
+				list_delete_cell(parent_rel->partial_pathlist, p1, p1_prev);
+			/* add_path has a special case for IndexPath; we don't need it */
+			Assert(!IsA(old_path, IndexPath));
+			pfree(old_path);
+			/* p1_prev does not advance */
+		}
+		else
+		{
+			/* new belongs after this old path if it has cost >= old's */
+			if (new_path->total_cost >= old_path->total_cost)
+				insert_after = p1;
+			/* p1_prev advances */
+			p1_prev = p1;
+		}
+
+		/*
+		 * If we found an old path that dominates new_path, we can quit
+		 * scanning the partial_pathlist; we will not add new_path, and we
+		 * assume new_path cannot dominate any later path.
+		 */
+		if (!accept_new)
+			break;
+	}
+
+	if (accept_new)
+	{
+		/* Accept the new path: insert it at proper place */
+		if (insert_after)
+			lappend_cell(parent_rel->partial_pathlist, insert_after, new_path);
+		else
+			parent_rel->partial_pathlist =
+				lcons(new_path, parent_rel->partial_pathlist);
+	}
+	else
+	{
+		/* add_path has a special case for IndexPath; we don't need it */
+		Assert(!IsA(new_path, IndexPath));
+		/* Reject and recycle the new path */
+		pfree(new_path);
+	}
+}
+
 
 /*****************************************************************************
  *		PATH NODE CREATION ROUTINES
@@ -697,7 +842,7 @@ add_path_precheck(RelOptInfo *parent_rel,
  */
 Path *
 create_seqscan_path(PlannerInfo *root, RelOptInfo *rel,
-					Relids required_outer, int nworkers)
+					Relids required_outer, int parallel_degree)
 {
 	Path	   *pathnode = makeNode(Path);
 
@@ -705,10 +850,11 @@ create_seqscan_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->parent = rel;
 	pathnode->param_info = get_baserel_parampathinfo(root, rel,
 													 required_outer);
-	pathnode->parallel_aware = nworkers > 0 ? true : false;
+	pathnode->parallel_aware = parallel_degree > 0 ? true : false;
+	pathnode->parallel_degree = parallel_degree;
 	pathnode->pathkeys = NIL;	/* seqscan has unordered result */
 
-	cost_seqscan(pathnode, root, rel, pathnode->param_info, nworkers);
+	cost_seqscan(pathnode, root, rel, pathnode->param_info);
 
 	return pathnode;
 }
@@ -727,6 +873,7 @@ create_samplescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer
 	pathnode->param_info = get_baserel_parampathinfo(root, rel,
 													 required_outer);
 	pathnode->parallel_aware = false;
+	pathnode->parallel_degree = 0;
 	pathnode->pathkeys = NIL;	/* samplescan has unordered result */
 
 	cost_samplescan(pathnode, root, rel, pathnode->param_info);
@@ -781,6 +928,7 @@ create_index_path(PlannerInfo *root,
 	pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
 														  required_outer);
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.pathkeys = pathkeys;
 
 	/* Convert clauses to indexquals the executor can handle */
@@ -827,6 +975,7 @@ create_bitmap_heap_path(PlannerInfo *root,
 	pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
 														  required_outer);
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.pathkeys = NIL;		/* always unordered */
 
 	pathnode->bitmapqual = bitmapqual;
@@ -853,6 +1002,7 @@ create_bitmap_and_path(PlannerInfo *root,
 	pathnode->path.parent = rel;
 	pathnode->path.param_info = NULL;	/* not used in bitmap trees */
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.pathkeys = NIL;		/* always unordered */
 
 	pathnode->bitmapquals = bitmapquals;
@@ -878,6 +1028,7 @@ create_bitmap_or_path(PlannerInfo *root,
 	pathnode->path.parent = rel;
 	pathnode->path.param_info = NULL;	/* not used in bitmap trees */
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.pathkeys = NIL;		/* always unordered */
 
 	pathnode->bitmapquals = bitmapquals;
@@ -903,6 +1054,7 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
 	pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
 														  required_outer);
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.pathkeys = NIL;		/* always unordered */
 
 	pathnode->tidquals = tidquals;
@@ -921,7 +1073,8 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
  * Note that we must handle subpaths = NIL, representing a dummy access path.
  */
 AppendPath *
-create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer)
+create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer,
+				   int parallel_degree)
 {
 	AppendPath *pathnode = makeNode(AppendPath);
 	ListCell   *l;
@@ -931,6 +1084,7 @@ create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer)
 	pathnode->path.param_info = get_appendrel_parampathinfo(rel,
 															required_outer);
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = parallel_degree;
 	pathnode->path.pathkeys = NIL;		/* result is always considered
 										 * unsorted */
 	pathnode->subpaths = subpaths;
@@ -985,6 +1139,7 @@ create_merge_append_path(PlannerInfo *root,
 	pathnode->path.param_info = get_appendrel_parampathinfo(rel,
 															required_outer);
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.pathkeys = pathkeys;
 	pathnode->subpaths = subpaths;
 
@@ -1060,6 +1215,7 @@ create_result_path(List *quals)
 	pathnode->path.parent = NULL;
 	pathnode->path.param_info = NULL;	/* there are no other rels... */
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.pathkeys = NIL;
 	pathnode->quals = quals;
 
@@ -1094,6 +1250,7 @@ create_material_path(RelOptInfo *rel, Path *subpath)
 	pathnode->path.parent = rel;
 	pathnode->path.param_info = subpath->param_info;
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.pathkeys = subpath->pathkeys;
 
 	pathnode->subpath = subpath;
@@ -1155,6 +1312,7 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 	pathnode->path.parent = rel;
 	pathnode->path.param_info = subpath->param_info;
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 
 	/*
 	 * Assume the output is unsorted, since we don't necessarily have pathkeys
@@ -1328,7 +1486,7 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
  */
 GatherPath *
 create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
-				   Relids required_outer, int nworkers)
+				   Relids required_outer)
 {
 	GatherPath *pathnode = makeNode(GatherPath);
 
@@ -1336,11 +1494,18 @@ create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 	pathnode->path.parent = rel;
 	pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
 														  required_outer);
-	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = subpath->parallel_degree;
 	pathnode->path.pathkeys = NIL;		/* Gather has unordered result */
 
 	pathnode->subpath = subpath;
-	pathnode->num_workers = nworkers;
+	pathnode->single_copy = false;
+
+	if (pathnode->path.parallel_degree == 0)
+	{
+		pathnode->path.parallel_degree = 1;
+		pathnode->path.pathkeys = subpath->pathkeys;
+		pathnode->single_copy = true;
+	}
 
 	cost_gather(pathnode, root, rel, pathnode->path.param_info);
 
@@ -1393,6 +1558,7 @@ create_subqueryscan_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->param_info = get_baserel_parampathinfo(root, rel,
 													 required_outer);
 	pathnode->parallel_aware = false;
+	pathnode->parallel_degree = 0;
 	pathnode->pathkeys = pathkeys;
 
 	cost_subqueryscan(pathnode, root, rel, pathnode->param_info);
@@ -1416,6 +1582,7 @@ create_functionscan_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->param_info = get_baserel_parampathinfo(root, rel,
 													 required_outer);
 	pathnode->parallel_aware = false;
+	pathnode->parallel_degree = 0;
 	pathnode->pathkeys = pathkeys;
 
 	cost_functionscan(pathnode, root, rel, pathnode->param_info);
@@ -1439,6 +1606,7 @@ create_valuesscan_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->param_info = get_baserel_parampathinfo(root, rel,
 													 required_outer);
 	pathnode->parallel_aware = false;
+	pathnode->parallel_degree = 0;
 	pathnode->pathkeys = NIL;	/* result is always unordered */
 
 	cost_valuesscan(pathnode, root, rel, pathnode->param_info);
@@ -1461,6 +1629,7 @@ create_ctescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer)
 	pathnode->param_info = get_baserel_parampathinfo(root, rel,
 													 required_outer);
 	pathnode->parallel_aware = false;
+	pathnode->parallel_degree = 0;
 	pathnode->pathkeys = NIL;	/* XXX for now, result is always unordered */
 
 	cost_ctescan(pathnode, root, rel, pathnode->param_info);
@@ -1484,6 +1653,7 @@ create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->param_info = get_baserel_parampathinfo(root, rel,
 													 required_outer);
 	pathnode->parallel_aware = false;
+	pathnode->parallel_degree = 0;
 	pathnode->pathkeys = NIL;	/* result is always unordered */
 
 	/* Cost is the same as for a regular CTE scan */
@@ -1516,6 +1686,7 @@ create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
 														  required_outer);
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.rows = rows;
 	pathnode->path.startup_cost = startup_cost;
 	pathnode->path.total_cost = total_cost;
@@ -1651,6 +1822,7 @@ create_nestloop_path(PlannerInfo *root,
 								  required_outer,
 								  &restrict_clauses);
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.pathkeys = pathkeys;
 	pathnode->jointype = jointype;
 	pathnode->outerjoinpath = outer_path;
@@ -1709,6 +1881,7 @@ create_mergejoin_path(PlannerInfo *root,
 								  required_outer,
 								  &restrict_clauses);
 	pathnode->jpath.path.parallel_aware = false;
+	pathnode->jpath.path.parallel_degree = 0;
 	pathnode->jpath.path.pathkeys = pathkeys;
 	pathnode->jpath.jointype = jointype;
 	pathnode->jpath.outerjoinpath = outer_path;
@@ -1766,6 +1939,7 @@ create_hashjoin_path(PlannerInfo *root,
 								  required_outer,
 								  &restrict_clauses);
 	pathnode->jpath.path.parallel_aware = false;
+	pathnode->jpath.path.parallel_degree = 0;
 
 	/*
 	 * A hashjoin never has pathkeys, since its output ordering is
diff --git a/src/backend/optimizer/util/relnode.c b/src/backend/optimizer/util/relnode.c
index 996b7fe..8d7ac48 100644
--- a/src/backend/optimizer/util/relnode.c
+++ b/src/backend/optimizer/util/relnode.c
@@ -107,6 +107,7 @@ build_simple_rel(PlannerInfo *root, int relid, RelOptKind reloptkind)
 	rel->reltargetlist = NIL;
 	rel->pathlist = NIL;
 	rel->ppilist = NIL;
+	rel->partial_pathlist = NIL;
 	rel->cheapest_startup_path = NULL;
 	rel->cheapest_total_path = NULL;
 	rel->cheapest_unique_path = NULL;
@@ -369,6 +370,7 @@ build_join_rel(PlannerInfo *root,
 	joinrel->reltargetlist = NIL;
 	joinrel->pathlist = NIL;
 	joinrel->ppilist = NIL;
+	joinrel->partial_pathlist = NIL;
 	joinrel->cheapest_startup_path = NULL;
 	joinrel->cheapest_total_path = NULL;
 	joinrel->cheapest_unique_path = NULL;
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index 9a0dd28..bdf4c53 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -458,6 +458,7 @@ typedef struct RelOptInfo
 	List	   *reltargetlist;	/* Vars to be output by scan of relation */
 	List	   *pathlist;		/* Path structures */
 	List	   *ppilist;		/* ParamPathInfos used in pathlist */
+	List	   *partial_pathlist;	/* partial Paths */
 	struct Path *cheapest_startup_path;
 	struct Path *cheapest_total_path;
 	struct Path *cheapest_unique_path;
@@ -755,6 +756,7 @@ typedef struct Path
 	RelOptInfo *parent;			/* the relation this path can build */
 	ParamPathInfo *param_info;	/* parameterization info, or NULL if none */
 	bool		parallel_aware; /* engage parallel-aware logic? */
+	int			parallel_degree; /* desired parallel degree; 0 = not parallel */
 
 	/* estimated size/costs for path (see costsize.c for more info) */
 	double		rows;			/* estimated number of result tuples */
@@ -1057,7 +1059,6 @@ typedef struct GatherPath
 {
 	Path		path;
 	Path	   *subpath;		/* path for each worker */
-	int			num_workers;	/* number of workers sought to help */
 	bool		single_copy;	/* path must not be executed >1x */
 } GatherPath;
 
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index ac21a3a..25a7303 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -72,7 +72,7 @@ extern double clamp_row_est(double nrows);
 extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
 					double index_pages, PlannerInfo *root);
 extern void cost_seqscan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
-			 ParamPathInfo *param_info, int nworkers);
+			 ParamPathInfo *param_info);
 extern void cost_samplescan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
 				ParamPathInfo *param_info);
 extern void cost_index(IndexPath *path, PlannerInfo *root,
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index f28b4e2..38d4859 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -29,9 +29,10 @@ extern void add_path(RelOptInfo *parent_rel, Path *new_path);
 extern bool add_path_precheck(RelOptInfo *parent_rel,
 				  Cost startup_cost, Cost total_cost,
 				  List *pathkeys, Relids required_outer);
+extern void add_partial_path(RelOptInfo *parent_rel, Path *new_path);
 
 extern Path *create_seqscan_path(PlannerInfo *root, RelOptInfo *rel,
-					Relids required_outer, int nworkers);
+					Relids required_outer, int parallel_degree);
 extern Path *create_samplescan_path(PlannerInfo *root, RelOptInfo *rel,
 					   Relids required_outer);
 extern IndexPath *create_index_path(PlannerInfo *root,
@@ -59,7 +60,7 @@ extern BitmapOrPath *create_bitmap_or_path(PlannerInfo *root,
 extern TidPath *create_tidscan_path(PlannerInfo *root, RelOptInfo *rel,
 					List *tidquals, Relids required_outer);
 extern AppendPath *create_append_path(RelOptInfo *rel, List *subpaths,
-				   Relids required_outer);
+				   Relids required_outer, int parallel_degree);
 extern MergeAppendPath *create_merge_append_path(PlannerInfo *root,
 						 RelOptInfo *rel,
 						 List *subpaths,
@@ -70,8 +71,7 @@ extern MaterialPath *create_material_path(RelOptInfo *rel, Path *subpath);
 extern UniquePath *create_unique_path(PlannerInfo *root, RelOptInfo *rel,
 				   Path *subpath, SpecialJoinInfo *sjinfo);
 extern GatherPath *create_gather_path(PlannerInfo *root,
-				   RelOptInfo *rel, Path *subpath, Relids required_outer,
-				   int nworkers);
+				   RelOptInfo *rel, Path *subpath, Relids required_outer);
 extern Path *create_subqueryscan_path(PlannerInfo *root, RelOptInfo *rel,
 						 List *pathkeys, Relids required_outer);
 extern Path *create_functionscan_path(PlannerInfo *root, RelOptInfo *rel,
#33 Kouhei Kaigai
kaigai@ak.jp.nec.com
In reply to: Robert Haas (#32)
Re: [DESIGN] ParallelAppend

On Thu, Nov 12, 2015 at 12:09 AM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

I'm now designing the parallel feature of Append...

Here is one challenge. How do we determine whether each sub-plan
allows execution in the background worker context?

I've been thinking about these questions for a bit now, and I think we
can work on improving Append in multiple phases. The attached patch
shows what I have in mind for phase 1. Currently, if you set up an
inheritance hierarchy, you might get an Append some of whose children
are Gather nodes with Parallel Seq Scans under them, like this:

Append
-> Gather
   -> Parallel Seq Scan
-> Gather
   -> Parallel Seq Scan
-> Seq Scan

This is a crappy plan. Each Gather node will try to launch its own
bunch of workers, which sucks. The attached patch lets you get this
plan instead:

Gather
-> Append
   -> Partial Seq Scan
   -> Partial Seq Scan
   -> Partial Seq Scan

That's a much better plan.

To make this work, this plan introduces a couple of new concepts.
Each RelOptInfo gets a new partial_pathlist field, which stores paths
that need to be gathered to produce a workable plan. Once we've
populated the partial_pathlist with whatever partial paths we know how
to generate, we can either push a Gather node on top of one of those
partial paths to create a real path; or we can use those partial paths
to build more partial paths. The current patch handles only the
simplest case: we can build a partial path for an appendrel by
appending a partial path for each member rel. But eventually I hope
to handle joinrels this way as well: you can join a partial path with
an ordinary path to form a partial path for the joinrel.

This idea will solve my concern gracefully.
The new partial_pathlist keeps candidates of path-nodes to be gathered
at this level or above. Unlike the path-nodes already in the pathlist, we
don't need to rip off a GatherPath later.

Can we expect that path-nodes in the partial_pathlist never contain an
underlying GatherPath, even when we apply this design to joinrels as well?

If we can ensure that the path-nodes in the partial_pathlist are safe to
run under a Gather node - that they never contain a Gather themselves, or
anything else that must not run in the background worker context - it will
make path consideration much simpler than I expected.

This requires some way of figuring out how many workers to request for
the append-path, so this patch also adds a parallel_degree field to
the path object, which is 0 for non-parallel things and the number of
workers that the path believes to be ideal otherwise. Right now, it
just percolates the highest worker count of any child up to the
appendrel, which might not be right, especially once the append node
knows how to balance workers, but it seems like a reasonable place to
start.

I agree. The new parallel_degree will give useful information,
and one worker per child relation is a good start.

Type-A) It can be performed on background worker context and
picked up by multiple worker processes concurrently.
(e.g: Parallel SeqScan)
Type-B) It can be performed on background worker context but
cannot be picked up by multiple worker processes.
(e.g: non-parallel aware node)
Type-C) It should not be performed on background worker context.
(e.g: plan/path node with volatile functions)

At the time that we're forming an AppendPath, we can identify what
you're calling type-A paths very easily: they're the ones that are
coming from the partial_pathlist. We can distinguish between type-B
and type-C paths coming from the childrel's pathlist based on the
childrel's consider_parallel flag: if it's false, it's type-C, else
type-B. At some point, we might need a per-path flag to distinguish
cases where a particular path is type-C even though some other plan
for that relation might be type-B. But right now that case doesn't
arise.

I also think we will eventually need a per-path flag when we support
parallel capability on joinrels, although it is not an immediate action.
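
For concreteness, a rough sketch of that classification as it could look
while forming the AppendPath - the three list variables are invented here
for illustration and are not in the posted patch:

    /* Sketch only: sort each child's best path into the three groups. */
    ListCell   *lc;

    foreach(lc, live_childrels)
    {
        RelOptInfo *childrel = (RelOptInfo *) lfirst(lc);

        if (childrel->partial_pathlist != NIL)          /* Type-A */
            parallel_subpaths = lappend(parallel_subpaths,
                                    linitial(childrel->partial_pathlist));
        else if (childrel->consider_parallel)           /* Type-B */
            safe_subpaths = lappend(safe_subpaths,
                                    childrel->cheapest_total_path);
        else                                            /* Type-C */
            restricted_subpaths = lappend(restricted_subpaths,
                                    childrel->cheapest_total_path);
    }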

Now, of course, it's not good enough to have this information
available when we're generating the AppendPath; it has to survive
until execution time. But that doesn't mean the paths need to be
self-identifying. We could, of course, decide to make them so, and
maybe that's the best design, but I'm thinking about another approach:
suppose the append node itself, instead of having one list of child
plans, has a list of type-A plans, a list of type-B plans, and a list
of type-C plans. This actually seems quite convenient, because at
execution time, you presumably want the leader to prioritize type-C
plans over any of the others, since it's the only one that can execute
them, and the workers to prioritize type-B plans, since they can only
take one worker each and are thus prone to be the limiting factor on
finishing the whole Append. Having the plans divided in advance
between these three lists (say, restricted_plans, safe_plans,
parallel_plans) makes that easy to implement.

I'd like to agree with this idea. If Append can handle restricted_plans
concurrently with safe_plans and parallel_plans, we don't need to give
up parallelism even if some child relation has neither safe- nor
parallel-plans.
One thing we need to pay attention to is that we have to inform the Gather
node to kick off local sub-plans if the underlying Append node has any
restricted plans. It also needs to distinguish this from the case where the
Gather node cannot launch any background workers, because the first case
runs only the type-C plans locally, while the second case has to run all
the sub-plans in the local context.
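
As a sketch of what that could mean for the plan node itself (field names
are invented here; the Append node today carries just a single appendplans
list):

    typedef struct Append
    {
        Plan        plan;
        List       *restricted_plans;   /* Type-C: leader only */
        List       *safe_plans;         /* Type-B: any one process, once */
        List       *parallel_plans;     /* Type-A: shared among participants */
    } Append;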

Incidentally, I think it's subtly wrong to think of the parallel_aware
flag as telling you whether the plan can absorb multiple workers.
That's not really what it's for. It's to tell you whether the plan is
doing *something* parallel aware - that is, whether its Estimate,
InitializeDSM, and InitializeWorker callbacks should do anything. For
SeqScan, flipping parallel_aware actually does split the input among
all the workers, but for Append it probably just load-balances, and
for other nodes it might be something else again. The term I'm using
to indicate a path/plan that returns only a subset of the results in
each worker is "partial".

Therefore, a NestLoop that takes an underlying ParallelSeqScan and IndexScan
may not be parallel-aware by itself; however, it is exactly partial.
This NestLoop will likely have a parallel_degree larger than "1", won't it?

It seems to me that "partial" is the clearer concept for describing how a
sub-plan will perform.

Whether or not a path is partial is, in the
design embodied in this patch, indicated both by whether
path->parallel_degree > 0 and whether the path is in rel->pathlist or
rel->partial_pathlist.

Should we have an Assert to detect paths with parallel_degree==0 that are
in rel->partial_pathlist, or with parallel_degree > 1 that do not appear
in rel->partial_pathlist?

Thanks,
--
NEC Business Creation Division / PG-Strom Project
KaiGai Kohei <kaigai@ak.jp.nec.com>


#34 Robert Haas
robertmhaas@gmail.com
In reply to: Kouhei Kaigai (#33)
Re: [DESIGN] ParallelAppend

On Mon, Nov 16, 2015 at 10:10 AM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

This idea will solve my concern gracefully.
The new partial_pathlist keeps candidates of path-nodes to be gathered
at this level or above. Unlike the path-nodes already in the pathlist, we
don't need to rip off a GatherPath later.

Cool, yes.

Can we expect that path-nodes in the partial_pathlist never contain an
underlying GatherPath, even when we apply this design to joinrels as well?

Yes. A path that's already been gathered is not partial any more.
However, to create a partial path for a joinrel, we must join a
partial path to a complete path. The complete path mustn't be one
which internally contains a Gather. This is where we need another
per-path flag, I think.
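
Purely as a sketch of that future joinrel case (contains_gather() is an
assumed helper standing in for the per-path flag discussed here; the
attached patch does not build partial join paths yet):

    if (outer_partial_path != NULL && !contains_gather(inner_path))
    {
        Path   *pjoinpath;

        pjoinpath = (Path *) create_nestloop_path(root, joinrel, jointype,
                                                  workspace, sjinfo,
                                                  semifactors,
                                                  outer_partial_path,
                                                  inner_path,
                                                  restrict_clauses, pathkeys,
                                                  required_outer);
        /* partial: each worker emits a subset of the join's result */
        add_partial_path(joinrel, pjoinpath);
    }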

I'd like to agree with this idea. If Append can handle restricted_plans
concurrently with safe_plans and parallel_plans, we don't need to give
up parallelism even if some child relation has neither safe- nor
parallel-plans.

Right.

One thing we need to pay attention to is that we have to inform the Gather
node to kick off local sub-plans if the underlying Append node has any
restricted plans. It also needs to distinguish this from the case where the
Gather node cannot launch any background workers, because the first case
runs only the type-C plans locally, while the second case has to run all
the sub-plans in the local context.

I don't think that Gather needs to know anything about what's under
the Append. What I think we want is that when we execute the Append:

(1) If we're the leader or not in parallel mode, run restricted plans,
then parallel plans, then safe plans.
(2) If we're a worker, run safe plans, then parallel plans.
(3) Either way, never run a safe plan if the leader or some other
worker has already begun to execute it.

The reason to have the leader prefer parallel plans to safe plans is
that it is more likely to become a bottleneck than the workers. Thus
it should prefer to do work which can be split up rather than claiming
a whole plan for itself. But in the case of restricted plans it has
no choice, since no one else can execute those, and it should do them
first, since they may be the limiting factor in finishing the whole
plan.
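
A sketch of that policy in executor terms - the function and helper names
are invented, and claiming a plan would have to go through shared state in
the DSM so that each safe plan is run by at most one process:

    static int
    exec_append_choose_plan(AppendState *node, bool am_leader)
    {
        if (am_leader)
        {
            /* Leader: restricted first, then parallel, then safe. */
            if (have_unclaimed(node->restricted_plans))
                return claim_next(node->restricted_plans);
            if (have_unclaimed(node->parallel_plans))
                return claim_next(node->parallel_plans);
            return claim_next(node->safe_plans);
        }

        /* Worker: safe first, then parallel. */
        if (have_unclaimed(node->safe_plans))
            return claim_next(node->safe_plans);
        return claim_next(node->parallel_plans);
    }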

Incidentally, I think it's subtly wrong to think of the parallel_aware
flag as telling you whether the plan can absorb multiple workers.
That's not really what it's for. It's to tell you whether the plan is
doing *something* parallel aware - that is, whether its Estimate,
InitializeDSM, and InitializeWorker callbacks should do anything. For
SeqScan, flipping parallel_aware actually does split the input among
all the workers, but for Append it probably just load-balances, and
for other nodes it might be something else again. The term I'm using
to indicate a path/plan that returns only a subset of the results in
each worker is "partial".

Therefore, a NestLoop that takes an underlying ParallelSeqScan and IndexScan
may not be parallel-aware by itself; however, it is exactly partial.

Right.

This NestLoop will likely have a parallel_degree larger than "1", won't it?

Larger than 0.

It seems to me that "partial" is the clearer concept for describing how a
sub-plan will perform.

Good.

Whether or not a path is partial is, in the
design embodied in this patch, indicated both by whether
path->parallel_degree > 0 and whether the path is in rel->pathlist or
rel->partial_pathlist.

Should we have an Assert to detect paths with parallel_degree==0 that are
in rel->partial_pathlist, or with parallel_degree > 1 that do not appear
in rel->partial_pathlist?

parallel_degree==0 in the partial_pathlist is bad, but
parallel_degree>0 in the regular pathlist is OK, at least if it's a
Gather node.
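
(For what it's worth, the partial_pathlist half of that could be a one-line
check in add_partial_path, which the attached patch doesn't have yet:

    Assert(new_path->parallel_degree > 0);

The regular pathlist side is harder to assert centrally, precisely because
a Gather path legitimately carries parallel_degree > 0 there.)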

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#35 Thom Brown
thom@linux.com
In reply to: Robert Haas (#32)
Re: [DESIGN] ParallelAppend

On 13 November 2015 at 22:09, Robert Haas <robertmhaas@gmail.com> wrote:

On Thu, Nov 12, 2015 at 12:09 AM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

I'm now designing the parallel feature of Append...

Here is one challenge. How do we determine whether each sub-plan
allows execution in the background worker context?

I've been thinking about these questions for a bit now, and I think we
can work on improving Append in multiple phases. The attached patch
shows what I have in mind for phase 1. Currently, if you set up an
inheritance hierarchy, you might get an Append some of whose children
are Gather nodes with Parallel Seq Scans under them, like this:

Append
-> Gather
   -> Parallel Seq Scan
-> Gather
   -> Parallel Seq Scan
-> Seq Scan

This is a crappy plan. Each Gather node will try to launch its own
bunch of workers, which sucks. The attached patch lets you get this
plan instead:

Gather
-> Append
   -> Partial Seq Scan
   -> Partial Seq Scan
   -> Partial Seq Scan

That's a much better plan.

To make this work, this plan introduces a couple of new concepts.
Each RelOptInfo gets a new partial_pathlist field, which stores paths
that need to be gathered to produce a workable plan. Once we've
populated the partial_pathlist with whatever partial paths we know how
to generate, we can either push a Gather node on top of one of those
partial paths to create a real path; or we can use those partial paths
to build more partial paths. The current patch handles only the
simplest case: we can build a partial path for an appendrel by
appending a partial path for each member rel. But eventually I hope
to handle joinrels this way as well: you can join a partial path with
an ordinary path to form a partial path for the joinrel.

This requires some way of figuring out how many workers to request for
the append-path, so this patch also adds a parallel_degree field to
the path object, which is 0 for non-parallel things and the number of
workers that the path believes to be ideal otherwise. Right now, it
just percolates the highest worker count of any child up to the
appendrel, which might not be right, especially once the append node
knows how to balance workers, but it seems like a reasonable place to
start.

Type-A) It can be performed on background worker context and
picked up by multiple worker processes concurrently.
(e.g: Parallel SeqScan)
Type-B) It can be performed on background worker context but
cannot be picked up by multiple worker processes.
(e.g: non-parallel aware node)
Type-C) It should not be performed on background worker context.
(e.g: plan/path node with volatile functions)

At the time that we're forming an AppendPath, we can identify what
you're calling type-A paths very easily: they're the ones that are
coming from the partial_pathlist. We can distinguish between type-B
and type-C paths coming from the childrel's pathlist based on the
childrel's consider_parallel flag: if it's false, it's type-C, else
type-B. At some point, we might need a per-path flag to distinguish
cases where a particular path is type-C even though some other plan
for that relation might be type-B. But right now that case doesn't
arise.

Now, of course, it's not good enough to have this information
available when we're generating the AppendPath; it has to survive
until execution time. But that doesn't mean the paths need to be
self-identifying. We could, of course, decide to make them so, and
maybe that's the best design, but I'm thinking about another approach:
suppose the append node itself, instead of having one list of child
plans, has a list of type-A plans, a list of type-B plans, and a list
of type-C plans. This actually seems quite convenient, because at
execution time, you presumably want the leader to prioritize type-C
plans over any of the others, since it's the only one that can execute
them, and the workers to prioritize type-B plans, since they can only
take one worker each and are thus prone to be the limiting factor on
finishing the whole Append. Having the plans divided in advance
between these three lists (say, restricted_plans, safe_plans,
parallel_plans) makes that easy to implement.

Incidentally, I think it's subtly wrong to think of the parallel_aware
flag as telling you whether the plan can absorb multiple workers.
That's not really what it's for. It's to tell you whether the plan is
doing *something* parallel aware - that is, whether its Estimate,
InitializeDSM, and InitializeWorker callbacks should do anything. For
SeqScan, flipping parallel_aware actually does split the input among
all the workers, but for Append it probably just load-balances, and
for other nodes it might be something else again. The term I'm using
to indicate a path/plan that returns only a subset of the results in
each worker is "partial". Whether or not a path is partial is, in the
design embodied in this patch, indicated both by whether
path->parallel_degree > 0 and whether the path is in rel->pathlist or
rel->partial_pathlist.

Okay, I've tried this patch. I created a database with
pgbench_accounts -s 300, and partitioned the pgbench_accounts table
into 300 different children based on "bid".

# explain analyse select count(*) from pgbench_accounts;

QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------
Aggregate (cost=634889.14..634889.15 rows=1 width=0) (actual time=14868.918..14868.918 rows=1 loops=1)
  -> Gather (cost=1000.00..559784.13 rows=30042001 width=0) (actual time=7.015..12319.699 rows=30000000 loops=1)
       Number of Workers: 2
       -> Append (cost=0.00..528742.13 rows=30042001 width=0) (actual time=0.019..24531.096 rows=59094295 loops=1)
             -> Parallel Seq Scan on pgbench_accounts (cost=0.00..0.00 rows=1 width=0) (actual time=0.001..0.006 rows=0 loops=1)
             -> Parallel Seq Scan on pgbench_accounts_1 (cost=0.00..1711.60 rows=100000 width=0) (actual time=0.017..44.586 rows=170314 loops=1)
             -> Parallel Seq Scan on pgbench_accounts_2 (cost=0.00..1711.60 rows=100000 width=0) (actual time=0.438..49.974 rows=198923 loops=1)
             -> Parallel Seq Scan on pgbench_accounts_3 (cost=0.00..1711.60 rows=100000 width=0) (actual time=0.350..42.909 rows=198496 loops=1)
             -> Parallel Seq Scan on pgbench_accounts_4 (cost=0.00..1711.60 rows=100000 width=0) (actual time=0.656..37.556 rows=198780 loops=1)
             -> Parallel Seq Scan on pgbench_accounts_5 (cost=0.00..1711.60 rows=100000 width=0) (actual time=4.510..90.154 rows=193799 loops=1)
             -> Parallel Seq Scan on pgbench_accounts_6 (cost=0.00..1711.60 rows=100000 width=0) (actual time=4.326..76.018 rows=192680 loops=1)

--snip--

Yes, it's working!

However, the first parallel seq scan shows it getting 170314 rows.
Another run shows it getting 194165 rows. The final result is
correct, but as you can see from the rows on the Append node (59094295
rows), it doesn't match the number of rows on the Gather node
(30000000).

And also, for some reason, I can no longer get this using more than 2
workers, even with max_worker_processes = 16 and max_parallel_degree =
12. I don't know if that's anything to do with this patch though.

Thom


#36 Robert Haas
robertmhaas@gmail.com
In reply to: Thom Brown (#35)
Re: [DESIGN] ParallelAppend

On Tue, Nov 17, 2015 at 4:26 AM, Thom Brown <thom@linux.com> wrote:

Okay, I've tried this patch.

Thanks!

Yes, it's working!

Woohoo.

However, the first parallel seq scan shows it getting 170314 rows.
Another run shows it getting 194165 rows. The final result is
correct, but as you can see from the rows on the Append node (59094295
rows), it doesn't match the number of rows on the Gather node
(30000000).

Is this the same issue reported in
/messages/by-id/CAFj8pRBF-i=qDg9b5nZrXYfChzBEZWmthxYPhidQvwoMOjHtzg@mail.gmail.com
and not yet fixed? I am inclined to think it probably is.

And also, for some reason, I can no longer get this using more than 2
workers, even with max_worker_processes = 16 and max_parallel_degree =
12. I don't know if that's anything to do with this patch though.

The number of workers is limited based on the size of the largest
table involved in the Append. That probably needs considerable
improvement, of course, but this patch is still a step forward over
not-this-patch.
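
For illustration, the degree-selection loop in create_parallel_paths starts
from a 1000-page threshold that triples for each additional worker, so -
assuming max_parallel_degree permits - a largest child of up to 3,000 pages
yields parallel_degree 1, above 3,000 pages yields 2, above 9,000 yields 3,
above 27,000 yields 4, and so on; the appendrel then simply takes the
maximum degree across its children.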

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#37 Thom Brown
thom@linux.com
In reply to: Robert Haas (#36)
Re: [DESIGN] ParallelAppend

On 17 November 2015 at 20:08, Robert Haas <robertmhaas@gmail.com> wrote:

On Tue, Nov 17, 2015 at 4:26 AM, Thom Brown <thom@linux.com> wrote:

However, the first parallel seq scan shows it getting 170314 rows.
Another run shows it getting 194165 rows. The final result is
correct, but as you can see from the rows on the Append node (59094295
rows), it doesn't match the number of rows on the Gather node
(30000000).

Is this the same issue reported in
/messages/by-id/CAFj8pRBF-i=qDg9b5nZrXYfChzBEZWmthxYPhidQvwoMOjHtzg@mail.gmail.com
and not yet fixed? I am inclined to think it probably is.

Yes, that seems to be the same issue.

And also, for some reason, I can no longer get this using more than 2
workers, even with max_worker_processes = 16 and max_parallel_degree =
12. I don't know if that's anything to do with this patch though.

The number of workers is limited based on the size of the largest
table involved in the Append. That probably needs considerable
improvement, of course, but this patch is still a step forward over
not-this-patch.

Ah, okay. I wasn't aware of this. I'll bear that in mind in future.

Thom


#38 Amit Kapila
amit.kapila16@gmail.com
In reply to: Robert Haas (#32)
Re: [DESIGN] ParallelAppend

On Sat, Nov 14, 2015 at 3:39 AM, Robert Haas <robertmhaas@gmail.com> wrote:

On Thu, Nov 12, 2015 at 12:09 AM, Kouhei Kaigai <kaigai@ak.jp.nec.com> wrote:

I'm now designing the parallel feature of Append...

Here is one challenge. How do we determine whether each sub-plan
allows execution in the background worker context?

I've been thinking about these questions for a bit now, and I think we
can work on improving Append in multiple phases. The attached patch
shows what I have in mind for phase 1.

A couple of comments and questions regarding this patch:

1.
+/*
+ * add_partial_path
..
+ *  produce the same number of rows.  Neither do we need to consider startup
+ *  costs: parallelism is only used for plans that will be run to completion.

A.
Don't we need the startup cost in case we need to build partial paths for
joinpaths like mergepath?
Also, I think there are other cases for a single relation scan where startup
cost can matter, like when there are pseudoconstants in the qualification
(refer cost_qual_eval_walker()) or, let us say, if someone has disabled
seq scan (disable_cost is considered as startup cost).

B. I think partial path is an important concept and deserves some
explanation in src/backend/optimizer/README.
There is already a good explanation about Paths, so it seems better to
add an explanation about partial paths.

2.
+ *  costs: parallelism is only used for plans that will be run to completion.
+ *    Therefore, this routine is much simpler than add_path: it needs to
+ *    consider only pathkeys and total cost.

There seems to be a spacing issue in the last two lines.

3.
+static void
+create_parallel_paths(PlannerInfo *root, RelOptInfo *rel)
+{
+ int parallel_threshold = 1000;
+ int parallel_degree = 1;
+
+ /*
+ * If this relation is too small to be worth a parallel scan, just return
+ * without doing anything ... unless it's an inheritance child.  In that case,
+ * we want to generate a parallel path here anyway.  It might not be worthwhile
+ * just for this relation, but when combined with all of its inheritance siblings
+ * it may well pay off.
+ */
+ if (rel->pages < parallel_threshold && rel->reloptkind == RELOPT_BASEREL)
+ return;

A.
This means that for inheritance child relations for which rel pages are
less than parallel_threshold, it will always consider the cost shared
between 1 worker and leader as per below calc in cost_seqscan:
if (path->parallel_degree > 0)
run_cost = run_cost / (path->parallel_degree + 0.5);

I think this might not be the appropriate cost model even for
non-inheritance relations which have more pages than parallel_threshold,
but it seems even worse for inheritance children which have fewer pages
than parallel_threshold.

B.
If none of the inheritance child rels (or very few of them) are big enough
for a parallel scan, is considering an Append node for parallelism of any
use? In other words, won't it be better to generate the plan as it is done
now, without this patch, for such cases, given the current execution model
of the Gather node?
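
To make point A concrete (the numbers are purely illustrative): a 500-page
inheritance child is below parallel_threshold, so the degree loop never runs
and its partial path still gets parallel_degree = 1; cost_seqscan then divides
the run cost by (1 + 0.5), so a scan otherwise costed at about 600 is charged
roughly 400 in the partial path, even though, with hundreds of such small
children, one extra worker is unlikely to make each of them 1.5x faster.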

With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

#39 Robert Haas
robertmhaas@gmail.com
In reply to: Thom Brown (#37)
Re: [DESIGN] ParallelAppend

On Tue, Nov 17, 2015 at 4:59 PM, Thom Brown <thom@linux.com> wrote:

On 17 November 2015 at 20:08, Robert Haas <robertmhaas@gmail.com> wrote:

On Tue, Nov 17, 2015 at 4:26 AM, Thom Brown <thom@linux.com> wrote:

However, the first parallel seq scan shows it getting 170314 rows.
Another run shows it getting 194165 rows. The final result is
correct, but as you can see from the rows on the Append node (59094295
rows), it doesn't match the number of rows on the Gather node
(30000000).

Is this the same issue reported in
/messages/by-id/CAFj8pRBF-i=qDg9b5nZrXYfChzBEZWmthxYPhidQvwoMOjHtzg@mail.gmail.com
and not yet fixed? I am inclined to think it probably is.

Yes, that seems to be the same issue.

I've committed a fix for that issue now, so you shouldn't see it any
more if you retest this patch.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#40 Robert Haas
robertmhaas@gmail.com
In reply to: Amit Kapila (#38)
1 attachment(s)
Re: [DESIGN] ParallelAppend

On Wed, Nov 18, 2015 at 7:25 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:

Don't we need the startup cost in case we need to build partial paths for
joinpaths like mergepath?
Also, I think there are other cases for a single relation scan where startup
cost can matter, like when there are pseudoconstants in the qualification
(refer cost_qual_eval_walker()) or, let us say, if someone has disabled
seq scan (disable_cost is considered as startup cost).

I'm not saying that we don't need to compute it. I'm saying we don't
need to take it into consideration when deciding which paths have
merit. Note that consider_startup is set this way:

rel->consider_startup = (root->tuple_fraction > 0);

And for a joinrel:

joinrel->consider_startup = (root->tuple_fraction > 0);

root->tuple_fraction is 0 when we expect all tuples to be retrieved,
and parallel query can currently only be used when all tuples will be
retrieved.

B. I think partial path is an important concept and deserves some
explanation in src/backend/optimizer/README.
There is already a good explanation about Paths, so it seems better to
add an explanation about partial paths.

Good idea. In the attached, revised version of the patch, I've added
a large new section to that README.

2.
+ *  costs: parallelism is only used for plans that will be run to completion.
+ *    Therefore, this routine is much simpler than add_path: it needs to
+ *    consider only pathkeys and total cost.

There seems to be a spacing issue in the last two lines.

Fixed.

A.
This means that for inheritance child relations for which rel pages are
less than parallel_threshold, it will always consider the cost shared
between 1 worker and leader as per below calc in cost_seqscan:
if (path->parallel_degree > 0)
run_cost = run_cost / (path->parallel_degree + 0.5);

I think this might not be the appropriate cost model even for
non-inheritance relations which have more pages than parallel_threshold,
but it seems even worse for inheritance children which have fewer pages
than parallel_threshold.

Why? I'm certainly open to patches to improve the cost model, but I
don't see why this patch needs to do that.

B.
If none of the inheritance child rels (or very few of them) are big enough
for a parallel scan, is considering an Append node for parallelism of any
use? In other words, won't it be better to generate the plan as it is done
now, without this patch, for such cases, given the current execution model
of the Gather node?

I think we should instead extend the Append node as suggested by
KaiGai, so that it can have both partial and non-partial children. I
think we can leave that to another patch, though. Aside from
questions of what the fastest plan is, it's very bad to have Gather
nodes under the Append, because the Append could have many children
and we could end up with a ton of Gather nodes, each using a DSM and a
bunch of workers. That's important to avoid.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

Attachments:

gather-append-pushdown-v2.patchtext/x-diff; charset=US-ASCII; name=gather-append-pushdown-v2.patchDownload
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 012c14b..fe07176 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -1588,6 +1588,7 @@ _outPathInfo(StringInfo str, const Path *node)
 	else
 		_outBitmapset(str, NULL);
 	WRITE_BOOL_FIELD(parallel_aware);
+	WRITE_INT_FIELD(parallel_degree);
 	WRITE_FLOAT_FIELD(rows, "%.0f");
 	WRITE_FLOAT_FIELD(startup_cost, "%.2f");
 	WRITE_FLOAT_FIELD(total_cost, "%.2f");
@@ -1764,7 +1765,6 @@ _outGatherPath(StringInfo str, const GatherPath *node)
 	_outPathInfo(str, (const Path *) node);
 
 	WRITE_NODE_FIELD(subpath);
-	WRITE_INT_FIELD(num_workers);
 	WRITE_BOOL_FIELD(single_copy);
 }
 
@@ -1887,6 +1887,7 @@ _outRelOptInfo(StringInfo str, const RelOptInfo *node)
 	WRITE_NODE_FIELD(reltargetlist);
 	WRITE_NODE_FIELD(pathlist);
 	WRITE_NODE_FIELD(ppilist);
+	WRITE_NODE_FIELD(partial_pathlist);
 	WRITE_NODE_FIELD(cheapest_startup_path);
 	WRITE_NODE_FIELD(cheapest_total_path);
 	WRITE_NODE_FIELD(cheapest_unique_path);
diff --git a/src/backend/optimizer/README b/src/backend/optimizer/README
index 916a518..5019804 100644
--- a/src/backend/optimizer/README
+++ b/src/backend/optimizer/README
@@ -851,4 +851,57 @@ lateral reference.  (Perhaps now that that stuff works, we could relax the
 pullup restriction?)
 
 
--- bjm & tgl
+Parallel Query and Partial Paths
+--------------------------------
+
+Parallel query involves dividing up the work that needs to be performed
+either by an entire query or some portion of the query in such a way that
+some of that work can be done by one or more worker processes, which are
+called parallel workers.  Parallel workers are a subtype of dynamic
+background workers; see src/backend/access/transam/README.parallel for a
+fuller description.  Academic literature on parallel query suggests that
+that parallel execution strategies can be divided into essentially two
+categories: pipelined parallelism, where the execution of the query is
+divided into multiple stages and each stage is handled by a separate
+process; and partitioning parallelism, where the data is split between
+multiple processes and each process handles a subset of it.  The
+literature, however, suggests that gains from pipeline parallelism are
+often very limited due to the difficulty of avoiding pipeline stalls.
+Consequently, we do not currently attempt to generate query plans that
+use this technique.
+
+Instead, we focus on partitioning parallelism, which does not require
+that the underlying table be partitioned.  It only requires that (1)
+there is some method of dividing the data from at least one of the base
+tables involved in the relation across multiple processes, (2) allowing
+each process to handle its own portion of the data, and then (3)
+collecting the results.  Requirements (2) and (3) are satisfied by the
+executor node Gather, which launches any number of worker processes and
+executes its single child plan in all of them (and perhaps in the leader
+also, if the children aren't generating enough data to keep the leader
+busy).  Requirement (1) is handled by the SeqScan node: when invoked
+with parallel_aware = true, this node will, in effect, partition the
+table on a block by block basis, returning a subset of the tuples from
+the relation in each worker where that SeqScan is executed.  A similar
+scheme could be (and probably should be) implemented for bitmap heap
+scans.
+
+Just as we do for non-parallel access methods, we build Paths to
+represent access strategies that can be used in a parallel plan.  These
+are, in essence, the same strategies that are available in the
+non-parallel plan, but there is an important difference: a path that
+will run beneath a Gather node returns only a subset of the query
+results in each worker, not all of them.  To form a path that can
+actually be executed, the (rather large) cost of the Gather node must be
+accounted for.  For this reason among others, paths intended to run
+beneath a Gather node - which we call "partial" paths since they return
+only a subset of the results in each worker - must be kept separate from
+ordinary paths (see RelOptInfo's partial_pathlist and the function
+add_partial_path).
+
+One of the keys to making parallel query effective is to run as much of
+the query in parallel as possible.  Therefore, we expect it to generally
+be desirable to postpone the Gather stage until as near to the top of the
+plan as possible.  Expanding the range of cases in which more work can be
+pushed below the Gather (and costing them accurately) is likely to keep us
+busy for a long time to come.
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 1fdcae5..fdbe13f 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -72,6 +72,7 @@ static void set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 				 Index rti, RangeTblEntry *rte);
 static void set_plain_rel_size(PlannerInfo *root, RelOptInfo *rel,
 				   RangeTblEntry *rte);
+static void create_parallel_paths(PlannerInfo *root, RelOptInfo *rel);
 static void set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,
 						  RangeTblEntry *rte);
 static bool function_rte_parallel_ok(RangeTblEntry *rte);
@@ -107,6 +108,7 @@ static void set_cte_pathlist(PlannerInfo *root, RelOptInfo *rel,
 				 RangeTblEntry *rte);
 static void set_worktable_pathlist(PlannerInfo *root, RelOptInfo *rel,
 					   RangeTblEntry *rte);
+static void generate_gather_paths(PlannerInfo *root, RelOptInfo *rel);
 static RelOptInfo *make_rel_from_joinlist(PlannerInfo *root, List *joinlist);
 static bool subquery_is_pushdown_safe(Query *subquery, Query *topquery,
 						  pushdown_safety_info *safetyInfo);
@@ -612,7 +614,6 @@ static void
 set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
 {
 	Relids		required_outer;
-	int			parallel_threshold = 1000;
 
 	/*
 	 * We don't support pushing join clauses into the quals of a seqscan, but
@@ -624,39 +625,9 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
 	/* Consider sequential scan */
 	add_path(rel, create_seqscan_path(root, rel, required_outer, 0));
 
-	/* Consider parallel sequential scan */
-	if (rel->consider_parallel && rel->pages > parallel_threshold &&
-		required_outer == NULL)
-	{
-		Path *path;
-		int parallel_degree = 1;
-
-		/*
-		 * Limit the degree of parallelism logarithmically based on the size
-		 * of the relation.  This probably needs to be a good deal more
-		 * sophisticated, but we need something here for now.
-		 */
-		while (rel->pages > parallel_threshold * 3 &&
-			   parallel_degree < max_parallel_degree)
-		{
-			parallel_degree++;
-			parallel_threshold *= 3;
-			if (parallel_threshold >= PG_INT32_MAX / 3)
-				break;
-		}
-
-		/*
-		 * Ideally we should consider postponing the gather operation until
-		 * much later, after we've pushed joins and so on atop the parallel
-		 * sequential scan path.  But we don't have the infrastructure for
-		 * that yet, so just do this for now.
-		 */
-		path = create_seqscan_path(root, rel, required_outer, parallel_degree);
-		path = (Path *)
-			create_gather_path(root, rel, path, required_outer,
-							   parallel_degree);
-		add_path(rel, path);
-	}
+	/* If appropriate, consider parallel sequential scan */
+	if (rel->consider_parallel && required_outer == NULL)
+		create_parallel_paths(root, rel);
 
 	/* Consider index scans */
 	create_index_paths(root, rel);
@@ -666,6 +637,54 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
 }
 
 /*
+ * create_parallel_paths
+ *	  Build parallel access paths for a plain relation
+ */
+static void
+create_parallel_paths(PlannerInfo *root, RelOptInfo *rel)
+{
+	int		parallel_threshold = 1000;
+	int		parallel_degree = 1;
+
+	/*
+	 * If this relation is too small to be worth a parallel scan, just return
+	 * without doing anything ... unless it's an inheritance child.  In that case,
+	 * we want to generate a parallel path here anyway.  It might not be worthwhile
+	 * just for this relation, but when combined with all of its inheritance siblings
+	 * it may well pay off.
+	 */
+	if (rel->pages < parallel_threshold && rel->reloptkind == RELOPT_BASEREL)
+		return;
+
+	/*
+	 * Limit the degree of parallelism logarithmically based on the size of the
+	 * relation.  This probably needs to be a good deal more sophisticated, but we
+	 * need something here for now.
+	 */
+	while (rel->pages > parallel_threshold * 3 &&
+		   parallel_degree < max_parallel_degree)
+	{
+		parallel_degree++;
+		parallel_threshold *= 3;
+		if (parallel_threshold >= PG_INT32_MAX / 3)
+			break;
+	}
+
+	/* Add an unordered partial path based on a parallel sequential scan. */
+	add_partial_path(rel, create_seqscan_path(root, rel, NULL, parallel_degree));
+
+	/*
+	 * If this is a baserel, consider gathering any partial paths we may have
+	 * just created.  If we gathered an inheritance child, we could end up with
+	 * a very large number of gather nodes, each trying to grab its own pool of
+	 * workers, so don't do this in that case.  Instead, we'll consider gathering
+	 * partial paths for the appendrel.
+	 */
+	if (rel->reloptkind == RELOPT_BASEREL)
+		generate_gather_paths(root, rel);
+}
+
+/*
  * set_tablesample_rel_size
  *	  Set size estimates for a sampled relation
  */
@@ -1039,6 +1058,8 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 	List	   *live_childrels = NIL;
 	List	   *subpaths = NIL;
 	bool		subpaths_valid = true;
+	List	   *partial_subpaths = NIL;
+	bool		partial_subpaths_valid = true;
 	List	   *all_child_pathkeys = NIL;
 	List	   *all_child_outers = NIL;
 	ListCell   *l;
@@ -1093,6 +1114,13 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 		else
 			subpaths_valid = false;
 
+		/* Same idea, but for a partial plan. */
+		if (childrel->partial_pathlist != NIL)
+			partial_subpaths = accumulate_append_subpath(partial_subpaths,
+										linitial(childrel->partial_pathlist));
+		else
+			partial_subpaths_valid = false;
+
 		/*
 		 * Collect lists of all the available path orderings and
 		 * parameterizations for all the children.  We use these as a
@@ -1164,7 +1192,38 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 	 * if we have zero or one live subpath due to constraint exclusion.)
 	 */
 	if (subpaths_valid)
-		add_path(rel, (Path *) create_append_path(rel, subpaths, NULL));
+		add_path(rel, (Path *) create_append_path(rel, subpaths, NULL, 0));
+
+	/*
+	 * Consider an append of partial unordered, unparameterized partial paths.
+	 */
+	if (partial_subpaths_valid)
+	{
+		AppendPath *appendpath;
+		ListCell *lc;
+		int parallel_degree = 0;
+
+		/*
+		 * Decide what parallel degree to request for this append path.  For
+		 * now, we just use the maximum parallel degree of any member.  It
+		 * might be useful to use a higher number if the Append node were smart
+		 * enough to spread out the workers, but it currently isn't.
+		 */
+		foreach (lc, partial_subpaths)
+		{
+			Path *path = lfirst(lc);
+			parallel_degree = Max(parallel_degree, path->parallel_degree);
+		}
+		Assert(parallel_degree > 0);
+
+		/* Generate a partial append path. */
+		appendpath = create_append_path(rel, partial_subpaths, NULL,
+										parallel_degree);
+		add_partial_path(rel, (Path *) appendpath);
+
+		/* Consider gathering it. */
+		generate_gather_paths(root, rel);
+	}
 
 	/*
 	 * Also build unparameterized MergeAppend paths based on the collected
@@ -1214,7 +1273,7 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 
 		if (subpaths_valid)
 			add_path(rel, (Path *)
-					 create_append_path(rel, subpaths, required_outer));
+					 create_append_path(rel, subpaths, required_outer, 0));
 	}
 }
 
@@ -1440,8 +1499,9 @@ set_dummy_rel_pathlist(RelOptInfo *rel)
 
 	/* Discard any pre-existing paths; no further need for them */
 	rel->pathlist = NIL;
+	rel->partial_pathlist = NIL;
 
-	add_path(rel, (Path *) create_append_path(rel, NIL, NULL));
+	add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0));
 
 	/*
 	 * We set the cheapest path immediately, to ensure that IS_DUMMY_REL()
@@ -1844,6 +1904,35 @@ set_worktable_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
 }
 
 /*
+ * generate_gather_paths
+ *		Generate parallel access paths for a relation by pushing a Gather on
+ *		top of a partial path.
+ */
+static void
+generate_gather_paths(PlannerInfo *root, RelOptInfo *rel)
+{
+	Path   *cheapest_partial_path;
+	Path   *simple_gather_path;
+
+	/* If there are no partial paths, there's nothing to do here. */
+	if (rel->partial_pathlist == NIL)
+		return;
+
+	/*
+	 * The output of Gather is currently always unsorted, so there's only one
+	 * partial path of interest: the cheapest one.
+	 *
+	 * Eventually, we should have a Gather Merge operation that can merge multiple
+	 * tuple streams together while preserving their ordering.  We could usefully
+	 * generate such a path from each partial path that has non-NIL pathkeys.
+	 */
+	cheapest_partial_path = linitial(rel->partial_pathlist);
+	simple_gather_path = (Path *)
+		create_gather_path(root, rel, cheapest_partial_path, NULL);
+	add_path(rel, simple_gather_path);
+}
+
+/*
  * make_rel_from_joinlist
  *	  Build access paths using a "joinlist" to guide the join path search.
  *
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 990486c..9d65be9 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -186,8 +186,7 @@ clamp_row_est(double nrows)
  */
 void
 cost_seqscan(Path *path, PlannerInfo *root,
-			 RelOptInfo *baserel, ParamPathInfo *param_info,
-			 int nworkers)
+			 RelOptInfo *baserel, ParamPathInfo *param_info)
 {
 	Cost		startup_cost = 0;
 	Cost		run_cost = 0;
@@ -232,8 +231,8 @@ cost_seqscan(Path *path, PlannerInfo *root,
 	 * This is almost certainly not exactly the right way to model this, so
 	 * this will probably need to be changed at some point...
 	 */
-	if (nworkers > 0)
-		run_cost = run_cost / (nworkers + 0.5);
+	if (path->parallel_degree > 0)
+		run_cost = run_cost / (path->parallel_degree + 0.5);
 
 	path->startup_cost = startup_cost;
 	path->total_cost = startup_cost + run_cost;
diff --git a/src/backend/optimizer/path/joinrels.c b/src/backend/optimizer/path/joinrels.c
index b2cc9f0..9b2b0b4 100644
--- a/src/backend/optimizer/path/joinrels.c
+++ b/src/backend/optimizer/path/joinrels.c
@@ -1069,9 +1069,10 @@ mark_dummy_rel(RelOptInfo *rel)
 
 	/* Evict any previously chosen paths */
 	rel->pathlist = NIL;
+	rel->partial_pathlist = NIL;
 
 	/* Set up the dummy path */
-	add_path(rel, (Path *) create_append_path(rel, NIL, NULL));
+	add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0));
 
 	/* Set or update cheapest_total_path and related fields */
 	set_cheapest(rel);
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 411b36c..95d95f1 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1125,7 +1125,7 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path)
 
 	gather_plan = make_gather(subplan->targetlist,
 							  NIL,
-							  best_path->num_workers,
+							  best_path->path.parallel_degree,
 							  best_path->single_copy,
 							  subplan);
 
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 09c3244..354a9e9 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -685,6 +685,151 @@ add_path_precheck(RelOptInfo *parent_rel,
 	return true;
 }
 
+/*
+ * add_partial_path
+ *	  Like add_path, our goal here is to consider whether a path is worthy
+ *	  of being kept around, but the considerations here are a bit different.
+ *	  A partial path is one which can be executed in any number of workers in
+ *	  parallel such that each worker will generate a subset of the path's
+ *	  overall result.
+ *
+ *	  We don't generate parameterized partial paths because they seem unlikely
+ *	  ever to be worthwhile.  The only way we could ever use such a path is
+ *	  by executing a nested loop with a complete path on the outer side - thus,
+ *	  each worker would scan the entire outer relation - and the partial path
+ *	  on the inner side - thus, each worker would scan only part of the inner
+ *	  relation.  This is silly: a parameterized path is generally going to be
+ *	  based on an index scan, and we can't generate a partial path for that.
+ *	  And it's generally only going to produce a few rows, so splitting them
+ *	  up between workers doesn't really make sense.  It would be better to
+ *	  use an unparameterized partial path on the outer side of the join with
+ *	  a parameterized complete path on the inner side in virtually every case.
+ *
+ *	  Because we don't need to consider parameterized paths here, we also don't
+ *	  need to consider the row counts as a measure of quality: every path will
+ *	  produce the same number of rows.  Neither do we need to consider startup
+ *	  costs: parallelism is only used for plans that will be run to completion.
+ *	  Therefore, this routine is much simpler than add_path: it needs to
+ *	  consider only pathkeys and total cost.
+ */
+void
+add_partial_path(RelOptInfo *parent_rel, Path *new_path)
+{
+	bool		accept_new = true;		/* unless we find a superior old path */
+	ListCell   *insert_after = NULL;	/* where to insert new item */
+	ListCell   *p1;
+	ListCell   *p1_prev;
+	ListCell   *p1_next;
+
+	/* Check for query cancel. */
+	CHECK_FOR_INTERRUPTS();
+
+	/*
+	 * As in add_path, throw out any paths which are dominated by the new path,
+	 * but throw out the new path if some existing path dominates it.
+	 */
+	p1_prev = NULL;
+	for (p1 = list_head(parent_rel->partial_pathlist); p1 != NULL;
+		 p1 = p1_next)
+	{
+		Path	   *old_path = (Path *) lfirst(p1);
+		bool		remove_old = false; /* unless new proves superior */
+		PathKeysComparison keyscmp;
+
+		p1_next = lnext(p1);
+
+		/* Compare pathkeys. */
+		keyscmp = compare_pathkeys(new_path->pathkeys, old_path->pathkeys);
+
+		/* Unless pathkeys are incompable, keep just one of the two paths. */
+		if (keyscmp != PATHKEYS_DIFFERENT)
+		{
+			if (new_path->total_cost > old_path->total_cost * STD_FUZZ_FACTOR)
+			{
+				/* New path costs more; keep it only if pathkeys are better. */
+				if (keyscmp != PATHKEYS_BETTER1)
+					accept_new = false;
+			}
+			else if (old_path->total_cost > new_path->total_cost
+						* STD_FUZZ_FACTOR)
+			{
+				/* Old path costs more; keep it only if pathkeys are better. */
+				if (keyscmp != PATHKEYS_BETTER2)
+					remove_old = true;
+			}
+			else if (keyscmp == PATHKEYS_BETTER1)
+			{
+				/* Costs are about the same, new path has better pathkeys. */
+				remove_old = true;
+			}
+			else if (keyscmp == PATHKEYS_BETTER2)
+			{
+				/* Costs are about the same, old path has better pathkeys. */
+				accept_new = false;
+			}
+			else if (old_path->total_cost > new_path->total_cost * 1.0000000001)
+			{
+				/* Pathkeys are the same, and the old path costs more. */
+				remove_old = true;
+			}
+			else
+			{
+				/*
+				 * Pathkeys are the same, and new path isn't materially
+				 * cheaper.
+				 */
+				accept_new = false;
+			}
+		}
+
+		/*
+		 * Remove current element from partial_pathlist if dominated by new.
+		 */
+		if (remove_old)
+		{
+			parent_rel->partial_pathlist =
+				list_delete_cell(parent_rel->partial_pathlist, p1, p1_prev);
+			/* add_path has a special case for IndexPath; we don't need it */
+			Assert(!IsA(old_path, IndexPath));
+			pfree(old_path);
+			/* p1_prev does not advance */
+		}
+		else
+		{
+			/* new belongs after this old path if it has cost >= old's */
+			if (new_path->total_cost >= old_path->total_cost)
+				insert_after = p1;
+			/* p1_prev advances */
+			p1_prev = p1;
+		}
+
+		/*
+		 * If we found an old path that dominates new_path, we can quit
+		 * scanning the partial_pathlist; we will not add new_path, and we
+		 * assume new_path cannot dominate any later path.
+		 */
+		if (!accept_new)
+			break;
+	}
+
+	if (accept_new)
+	{
+		/* Accept the new path: insert it at proper place */
+		if (insert_after)
+			lappend_cell(parent_rel->partial_pathlist, insert_after, new_path);
+		else
+			parent_rel->partial_pathlist =
+				lcons(new_path, parent_rel->partial_pathlist);
+	}
+	else
+	{
+		/* add_path has a special case for IndexPath; we don't need it */
+		Assert(!IsA(new_path, IndexPath));
+		/* Reject and recycle the new path */
+		pfree(new_path);
+	}
+}
+
 
 /*****************************************************************************
  *		PATH NODE CREATION ROUTINES
@@ -697,7 +842,7 @@ add_path_precheck(RelOptInfo *parent_rel,
  */
 Path *
 create_seqscan_path(PlannerInfo *root, RelOptInfo *rel,
-					Relids required_outer, int nworkers)
+					Relids required_outer, int parallel_degree)
 {
 	Path	   *pathnode = makeNode(Path);
 
@@ -705,10 +850,11 @@ create_seqscan_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->parent = rel;
 	pathnode->param_info = get_baserel_parampathinfo(root, rel,
 													 required_outer);
-	pathnode->parallel_aware = nworkers > 0 ? true : false;
+	pathnode->parallel_aware = parallel_degree > 0 ? true : false;
+	pathnode->parallel_degree = parallel_degree;
 	pathnode->pathkeys = NIL;	/* seqscan has unordered result */
 
-	cost_seqscan(pathnode, root, rel, pathnode->param_info, nworkers);
+	cost_seqscan(pathnode, root, rel, pathnode->param_info);
 
 	return pathnode;
 }
@@ -727,6 +873,7 @@ create_samplescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer
 	pathnode->param_info = get_baserel_parampathinfo(root, rel,
 													 required_outer);
 	pathnode->parallel_aware = false;
+	pathnode->parallel_degree = 0;
 	pathnode->pathkeys = NIL;	/* samplescan has unordered result */
 
 	cost_samplescan(pathnode, root, rel, pathnode->param_info);
@@ -781,6 +928,7 @@ create_index_path(PlannerInfo *root,
 	pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
 														  required_outer);
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.pathkeys = pathkeys;
 
 	/* Convert clauses to indexquals the executor can handle */
@@ -827,6 +975,7 @@ create_bitmap_heap_path(PlannerInfo *root,
 	pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
 														  required_outer);
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.pathkeys = NIL;		/* always unordered */
 
 	pathnode->bitmapqual = bitmapqual;
@@ -853,6 +1002,7 @@ create_bitmap_and_path(PlannerInfo *root,
 	pathnode->path.parent = rel;
 	pathnode->path.param_info = NULL;	/* not used in bitmap trees */
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.pathkeys = NIL;		/* always unordered */
 
 	pathnode->bitmapquals = bitmapquals;
@@ -878,6 +1028,7 @@ create_bitmap_or_path(PlannerInfo *root,
 	pathnode->path.parent = rel;
 	pathnode->path.param_info = NULL;	/* not used in bitmap trees */
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.pathkeys = NIL;		/* always unordered */
 
 	pathnode->bitmapquals = bitmapquals;
@@ -903,6 +1054,7 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
 	pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
 														  required_outer);
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.pathkeys = NIL;		/* always unordered */
 
 	pathnode->tidquals = tidquals;
@@ -921,7 +1073,8 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
  * Note that we must handle subpaths = NIL, representing a dummy access path.
  */
 AppendPath *
-create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer)
+create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer,
+				   int parallel_degree)
 {
 	AppendPath *pathnode = makeNode(AppendPath);
 	ListCell   *l;
@@ -931,6 +1084,7 @@ create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer)
 	pathnode->path.param_info = get_appendrel_parampathinfo(rel,
 															required_outer);
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = parallel_degree;
 	pathnode->path.pathkeys = NIL;		/* result is always considered
 										 * unsorted */
 	pathnode->subpaths = subpaths;
@@ -985,6 +1139,7 @@ create_merge_append_path(PlannerInfo *root,
 	pathnode->path.param_info = get_appendrel_parampathinfo(rel,
 															required_outer);
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.pathkeys = pathkeys;
 	pathnode->subpaths = subpaths;
 
@@ -1060,6 +1215,7 @@ create_result_path(List *quals)
 	pathnode->path.parent = NULL;
 	pathnode->path.param_info = NULL;	/* there are no other rels... */
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.pathkeys = NIL;
 	pathnode->quals = quals;
 
@@ -1094,6 +1250,7 @@ create_material_path(RelOptInfo *rel, Path *subpath)
 	pathnode->path.parent = rel;
 	pathnode->path.param_info = subpath->param_info;
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.pathkeys = subpath->pathkeys;
 
 	pathnode->subpath = subpath;
@@ -1155,6 +1312,7 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 	pathnode->path.parent = rel;
 	pathnode->path.param_info = subpath->param_info;
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 
 	/*
 	 * Assume the output is unsorted, since we don't necessarily have pathkeys
@@ -1328,7 +1486,7 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
  */
 GatherPath *
 create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
-				   Relids required_outer, int nworkers)
+				   Relids required_outer)
 {
 	GatherPath *pathnode = makeNode(GatherPath);
 
@@ -1336,11 +1494,18 @@ create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 	pathnode->path.parent = rel;
 	pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
 														  required_outer);
-	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = subpath->parallel_degree;
 	pathnode->path.pathkeys = NIL;		/* Gather has unordered result */
 
 	pathnode->subpath = subpath;
-	pathnode->num_workers = nworkers;
+	pathnode->single_copy = false;
+
+	if (pathnode->path.parallel_degree == 0)
+	{
+		pathnode->path.parallel_degree = 1;
+		pathnode->path.pathkeys = subpath->pathkeys;
+		pathnode->single_copy = true;
+	}
 
 	cost_gather(pathnode, root, rel, pathnode->path.param_info);
 
@@ -1393,6 +1558,7 @@ create_subqueryscan_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->param_info = get_baserel_parampathinfo(root, rel,
 													 required_outer);
 	pathnode->parallel_aware = false;
+	pathnode->parallel_degree = 0;
 	pathnode->pathkeys = pathkeys;
 
 	cost_subqueryscan(pathnode, root, rel, pathnode->param_info);
@@ -1416,6 +1582,7 @@ create_functionscan_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->param_info = get_baserel_parampathinfo(root, rel,
 													 required_outer);
 	pathnode->parallel_aware = false;
+	pathnode->parallel_degree = 0;
 	pathnode->pathkeys = pathkeys;
 
 	cost_functionscan(pathnode, root, rel, pathnode->param_info);
@@ -1439,6 +1606,7 @@ create_valuesscan_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->param_info = get_baserel_parampathinfo(root, rel,
 													 required_outer);
 	pathnode->parallel_aware = false;
+	pathnode->parallel_degree = 0;
 	pathnode->pathkeys = NIL;	/* result is always unordered */
 
 	cost_valuesscan(pathnode, root, rel, pathnode->param_info);
@@ -1461,6 +1629,7 @@ create_ctescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer)
 	pathnode->param_info = get_baserel_parampathinfo(root, rel,
 													 required_outer);
 	pathnode->parallel_aware = false;
+	pathnode->parallel_degree = 0;
 	pathnode->pathkeys = NIL;	/* XXX for now, result is always unordered */
 
 	cost_ctescan(pathnode, root, rel, pathnode->param_info);
@@ -1484,6 +1653,7 @@ create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->param_info = get_baserel_parampathinfo(root, rel,
 													 required_outer);
 	pathnode->parallel_aware = false;
+	pathnode->parallel_degree = 0;
 	pathnode->pathkeys = NIL;	/* result is always unordered */
 
 	/* Cost is the same as for a regular CTE scan */
@@ -1516,6 +1686,7 @@ create_foreignscan_path(PlannerInfo *root, RelOptInfo *rel,
 	pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
 														  required_outer);
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.rows = rows;
 	pathnode->path.startup_cost = startup_cost;
 	pathnode->path.total_cost = total_cost;
@@ -1651,6 +1822,7 @@ create_nestloop_path(PlannerInfo *root,
 								  required_outer,
 								  &restrict_clauses);
 	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_degree = 0;
 	pathnode->path.pathkeys = pathkeys;
 	pathnode->jointype = jointype;
 	pathnode->outerjoinpath = outer_path;
@@ -1709,6 +1881,7 @@ create_mergejoin_path(PlannerInfo *root,
 								  required_outer,
 								  &restrict_clauses);
 	pathnode->jpath.path.parallel_aware = false;
+	pathnode->jpath.path.parallel_degree = 0;
 	pathnode->jpath.path.pathkeys = pathkeys;
 	pathnode->jpath.jointype = jointype;
 	pathnode->jpath.outerjoinpath = outer_path;
@@ -1766,6 +1939,7 @@ create_hashjoin_path(PlannerInfo *root,
 								  required_outer,
 								  &restrict_clauses);
 	pathnode->jpath.path.parallel_aware = false;
+	pathnode->jpath.path.parallel_degree = 0;
 
 	/*
 	 * A hashjoin never has pathkeys, since its output ordering is
diff --git a/src/backend/optimizer/util/relnode.c b/src/backend/optimizer/util/relnode.c
index 996b7fe..8d7ac48 100644
--- a/src/backend/optimizer/util/relnode.c
+++ b/src/backend/optimizer/util/relnode.c
@@ -107,6 +107,7 @@ build_simple_rel(PlannerInfo *root, int relid, RelOptKind reloptkind)
 	rel->reltargetlist = NIL;
 	rel->pathlist = NIL;
 	rel->ppilist = NIL;
+	rel->partial_pathlist = NIL;
 	rel->cheapest_startup_path = NULL;
 	rel->cheapest_total_path = NULL;
 	rel->cheapest_unique_path = NULL;
@@ -369,6 +370,7 @@ build_join_rel(PlannerInfo *root,
 	joinrel->reltargetlist = NIL;
 	joinrel->pathlist = NIL;
 	joinrel->ppilist = NIL;
+	joinrel->partial_pathlist = NIL;
 	joinrel->cheapest_startup_path = NULL;
 	joinrel->cheapest_total_path = NULL;
 	joinrel->cheapest_unique_path = NULL;
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index 9a0dd28..bdf4c53 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -458,6 +458,7 @@ typedef struct RelOptInfo
 	List	   *reltargetlist;	/* Vars to be output by scan of relation */
 	List	   *pathlist;		/* Path structures */
 	List	   *ppilist;		/* ParamPathInfos used in pathlist */
+	List	   *partial_pathlist;	/* partial Paths */
 	struct Path *cheapest_startup_path;
 	struct Path *cheapest_total_path;
 	struct Path *cheapest_unique_path;
@@ -755,6 +756,7 @@ typedef struct Path
 	RelOptInfo *parent;			/* the relation this path can build */
 	ParamPathInfo *param_info;	/* parameterization info, or NULL if none */
 	bool		parallel_aware; /* engage parallel-aware logic? */
+	int			parallel_degree; /* desired parallel degree; 0 = not parallel */
 
 	/* estimated size/costs for path (see costsize.c for more info) */
 	double		rows;			/* estimated number of result tuples */
@@ -1057,7 +1059,6 @@ typedef struct GatherPath
 {
 	Path		path;
 	Path	   *subpath;		/* path for each worker */
-	int			num_workers;	/* number of workers sought to help */
 	bool		single_copy;	/* path must not be executed >1x */
 } GatherPath;
 
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index ac21a3a..25a7303 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -72,7 +72,7 @@ extern double clamp_row_est(double nrows);
 extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
 					double index_pages, PlannerInfo *root);
 extern void cost_seqscan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
-			 ParamPathInfo *param_info, int nworkers);
+			 ParamPathInfo *param_info);
 extern void cost_samplescan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
 				ParamPathInfo *param_info);
 extern void cost_index(IndexPath *path, PlannerInfo *root,
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index f28b4e2..38d4859 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -29,9 +29,10 @@ extern void add_path(RelOptInfo *parent_rel, Path *new_path);
 extern bool add_path_precheck(RelOptInfo *parent_rel,
 				  Cost startup_cost, Cost total_cost,
 				  List *pathkeys, Relids required_outer);
+extern void add_partial_path(RelOptInfo *parent_rel, Path *new_path);
 
 extern Path *create_seqscan_path(PlannerInfo *root, RelOptInfo *rel,
-					Relids required_outer, int nworkers);
+					Relids required_outer, int parallel_degree);
 extern Path *create_samplescan_path(PlannerInfo *root, RelOptInfo *rel,
 					   Relids required_outer);
 extern IndexPath *create_index_path(PlannerInfo *root,
@@ -59,7 +60,7 @@ extern BitmapOrPath *create_bitmap_or_path(PlannerInfo *root,
 extern TidPath *create_tidscan_path(PlannerInfo *root, RelOptInfo *rel,
 					List *tidquals, Relids required_outer);
 extern AppendPath *create_append_path(RelOptInfo *rel, List *subpaths,
-				   Relids required_outer);
+				   Relids required_outer, int parallel_degree);
 extern MergeAppendPath *create_merge_append_path(PlannerInfo *root,
 						 RelOptInfo *rel,
 						 List *subpaths,
@@ -70,8 +71,7 @@ extern MaterialPath *create_material_path(RelOptInfo *rel, Path *subpath);
 extern UniquePath *create_unique_path(PlannerInfo *root, RelOptInfo *rel,
 				   Path *subpath, SpecialJoinInfo *sjinfo);
 extern GatherPath *create_gather_path(PlannerInfo *root,
-				   RelOptInfo *rel, Path *subpath, Relids required_outer,
-				   int nworkers);
+				   RelOptInfo *rel, Path *subpath, Relids required_outer);
 extern Path *create_subqueryscan_path(PlannerInfo *root, RelOptInfo *rel,
 						 List *pathkeys, Relids required_outer);
 extern Path *create_functionscan_path(PlannerInfo *root, RelOptInfo *rel,
#41Amit Kapila
amit.kapila16@gmail.com
In reply to: Robert Haas (#40)
Re: [DESIGN] ParallelAppend

On Thu, Nov 19, 2015 at 12:27 AM, Robert Haas <robertmhaas@gmail.com> wrote:

On Wed, Nov 18, 2015 at 7:25 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:

Don't we need the startup cost in case we need to build partial paths for
joinpaths like mergepath?
Also, I think there are other cases for single relation scan where startup
cost can matter, like when there are pseudoconstants in qualification
(refer cost_qual_eval_walker()) or, let us say, if someone has disabled
seq scan (disable_cost is considered as startup cost).

I'm not saying that we don't need to compute it. I'm saying we don't
need to take it into consideration when deciding which paths have
merit. Note that consider_startup is set this way:

rel->consider_startup = (root->tuple_fraction > 0);

Even when consider_startup is false, startup_cost is still used in the cost
calculation. Maybe ignoring it is okay for partial paths, but it still seems
worth thinking about why that is acceptable for partial paths even though it
is used in add_path().
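
(For reference, the disable_cost case mentioned above looks roughly like the
fragment below; this is written from memory as an illustration and is not
part of the posted patch.)

	/*
	 * Sketch from memory, not part of this patch: cost_seqscan() adds the
	 * disable_cost penalty to the *startup* cost when sequential scans are
	 * disabled, so startup cost is not meaningless even for a plan that is
	 * always run to completion.
	 */
	if (!enable_seqscan)
		startup_cost += disable_cost;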

+ *  We don't generate parameterized partial paths because they seem unlikely
+ *  ever to be worthwhile.  The only way we could ever use such a path is
+ *  by executing a nested loop with a complete path on the outer side - thus,
+ *  each worker would scan the entire outer relation - and the partial path
+ *  on the inner side - thus, each worker would scan only part of the inner
+ *  relation.  This is silly: a parameterized path is generally going to be
+ *  based on an index scan, and we can't generate a partial path for that.

Won't it be useful to consider parameterized paths for the below kind of
plan, where we can push the join tree to the workers and each worker can
scan the complete outer relation A while the rest of the work is divided
among the workers (of course there can be other ways to parallelize such
joins, but the way described also seems possible)?

NestLoop
-> Seq Scan on A
Hash Join
Join Condition: B.Y = C.W
-> Seq Scan on B
-> Index Scan using C_Z_IDX on C
Index Condition: C.Z = A.X

-
Is the main reason to have add_partial_path() that it has somewhat fewer
checks, or is it that the current add_path() will give wrong answers in
some cases?

If there is no case where add_path() can't work, then there is some
advantage in retaining add_path(), at least in terms of maintaining
the code.

+void
+add_partial_path(RelOptInfo *parent_rel, Path *new_path)
{
..
+ /* Unless pathkeys are incompable, keep just one of the two paths. */
..

typo - 'incompable'

A.
This means that for inheritance child relations whose rel pages are
less than parallel_threshold, it will always consider the cost as shared
between 1 worker and the leader, as per the below calc in cost_seqscan:
if (path->parallel_degree > 0)
    run_cost = run_cost / (path->parallel_degree + 0.5);

I think this might not be the appropriate cost model even for
non-inheritance relations which have pages more than parallel_threshold,
but it seems to be even worse for inheritance children which have
pages less than parallel_threshold.

Why?

Because, the way the code is written, I think it assumes that for each
inheritance-child relation which has fewer pages than the threshold, half
the work will be done by the master backend, which doesn't seem to be the
right distribution. Consider a case where there are three such children,
each costing 100 to scan; now it will cost them as
100/1.5 + 100/1.5 + 100/1.5, which means that, per child scan, it is
counting 0.5 of the master backend's work, which seems to be wrong.

I think for the Append case, we should consider this cost during Append
path creation in create_append_path(). Basically, we can make cost_seqscan
ignore the cost reduction due to parallel_degree for inheritance relations,
and then during Append path creation we can consider it and also count the
master backend's work unit as 0.5 with respect to the overall work.
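
(To make the arithmetic above concrete, here is a tiny standalone
illustration, not part of the patch, of the estimate being questioned: each
child discounts its run cost by (parallel_degree + 0.5), so an Append over
three such children implicitly credits the leader with half a worker's
effort three separate times.)

#include <stdio.h>

int
main(void)
{
	double	child_run_cost = 100.0;		/* per-child sequential run cost */
	int		parallel_degree = 1;		/* one worker per child */
	double	per_child;
	double	append_total;

	/* the discount applied per child by cost_seqscan() in the patch */
	per_child = child_run_cost / (parallel_degree + 0.5);
	append_total = 3 * per_child;

	printf("per child: %.1f, Append total: %.1f\n", per_child, append_total);
	/* prints: per child: 66.7, Append total: 200.0 */
	return 0;
}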

-
--- a/src/backend/optimizer/README
+++ b/src/backend/optimizer/README
+plan as possible.  Expanding the range of cases in which more work can be
+pushed below the Gather (and costly them accurately) is likely to keep us
+busy for a long time to come.

Seems there is a typo in the above text:
/costly/cost

With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

#42Amit Kapila
amit.kapila16@gmail.com
In reply to: Amit Kapila (#41)
Re: [DESIGN] ParallelAppend

On Thu, Nov 19, 2015 at 1:29 PM, Amit Kapila <amit.kapila16@gmail.com>
wrote:

On Thu, Nov 19, 2015 at 12:27 AM, Robert Haas <robertmhaas@gmail.com>

wrote:

-
Is the main reason to have add_partial_path() that it has somewhat fewer
checks, or is it that the current add_path() will give wrong answers in
some cases?

If there is no case where add_path() can't work, then there is some
advantage in retaining add_path(), at least in terms of maintaining
the code.

To be specific, I am referring to the logic of add_path().

With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

#43Robert Haas
robertmhaas@gmail.com
In reply to: Amit Kapila (#41)
Re: [DESIGN] ParallelAppend

On Thu, Nov 19, 2015 at 2:59 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:

On Thu, Nov 19, 2015 at 12:27 AM, Robert Haas <robertmhaas@gmail.com> wrote:

On Wed, Nov 18, 2015 at 7:25 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:

Don't we need the startup cost in case we need to build partial paths for
joinpaths like mergepath?
Also, I think there are other cases for single relation scan where startup
cost can matter, like when there are pseudoconstants in qualification
(refer cost_qual_eval_walker()) or, let us say, if someone has disabled
seq scan (disable_cost is considered as startup cost).

I'm not saying that we don't need to compute it. I'm saying we don't
need to take it into consideration when deciding which paths have
merit. Note that consider_startup is set this way:

rel->consider_startup = (root->tuple_fraction > 0);

Even when consider_startup is false, startup_cost is still used in the cost
calculation. Maybe ignoring it is okay for partial paths, but it still seems
worth thinking about why that is acceptable for partial paths even though it
is used in add_path().

That is explained in the comments, and I just explained it again in my
previous email. I'm not sure how much clearer I can make it. For a
regular path, it might sometimes be useful to pick a path with a
higher total cost if it has a lower startup cost. The reason this
could be useful is because we might not run the resulting plan to
completion. However, parallel queries are always run to completion,
so a lower startup cost isn't useful. We just want the path with the
lowest total cost. I don't know what else to say here unless you can
ask a more specific question.

Won't it be useful to consider parameterized paths for the below kind of
plan, where we can push the join tree to the workers and each worker can
scan the complete outer relation A while the rest of the work is divided
among the workers (of course there can be other ways to parallelize such
joins, but the way described also seems possible)?

NestLoop
-> Seq Scan on A
Hash Join
Join Condition: B.Y = C.W
-> Seq Scan on B
-> Index Scan using C_Z_IDX on C
Index Condition: C.Z = A.X

I had thought that this sort of plan wouldn't actually occur in real
life, but it seems that it does. What you've written here is a little
muddled - the hash join has no hash underneath, for example, and
there'd have to be some sort of join order restriction in order to
consider a plan of this type. However, somewhat to my surprise, I was
able to get a plan much like this by doing this:

rhaas=# create table a (x int);
CREATE TABLE
rhaas=# insert into a values (1);
INSERT 0 1
rhaas=# create table b (y int, filler text);
CREATE TABLE
rhaas=# insert into b select g,
random()::text||random()::text||random()::text||random()::text from
generate_series(1,1000000) g;
INSERT 0 1000000
rhaas=# create table c (z int, w int, filler text);
CREATE TABLE
rhaas=# insert into c select g, g,
random()::text||random()::text||random()::text||random()::text from
generate_series(1,1000000) g;
INSERT 0 1000000
rhaas=# create index c_z_idx on c (z);
CREATE INDEX
rhaas=# vacuum analyze;
VACUUM
rhaas=# explain analyze select * from A LEFT JOIN (B INNER JOIN C ON
B.Y = C.W) ON C.Z = A.x;
                                                          QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------
 Nested Loop Left Join  (cost=8.46..26810.48 rows=1 width=152) (actual time=0.076..166.946 rows=1 loops=1)
   ->  Seq Scan on a  (cost=0.00..1.01 rows=1 width=4) (actual time=0.015..0.016 rows=1 loops=1)
   ->  Hash Join  (cost=8.46..26809.47 rows=1 width=148) (actual time=0.057..166.925 rows=1 loops=1)
         Hash Cond: (b.y = c.w)
         ->  Seq Scan on b  (cost=0.00..23051.00 rows=1000000 width=72) (actual time=0.012..89.013 rows=1000000 loops=1)
         ->  Hash  (cost=8.44..8.44 rows=1 width=76) (actual time=0.035..0.035 rows=1 loops=1)
               Buckets: 1024  Batches: 1  Memory Usage: 9kB
               ->  Index Scan using c_z_idx on c  (cost=0.42..8.44 rows=1 width=76) (actual time=0.031..0.032 rows=1 loops=1)
                     Index Cond: (z = a.x)
 Planning time: 0.394 ms
 Execution time: 167.015 ms
(11 rows)

This is extremely narrow. If you have more than one row in A, the
planner doesn't pick a nested loop. And if you're actually going to
be running queries like this frequently, then you should create an
index on B (Y), which causes you to get an execution time of ~ 0.2 ms
instead of 167 ms, because we generate a parameterized nestloop over
two index scans. But if you have no index on B (Y) and a does contain
precisely one row, then you can get this sort of plan, and hey, the
runtime is even long enough for parallelism to potentially be useful.

But after thinking about it for a while, I realized that even if you
think it's important to cater to that case, you still can't just
switch the sequential scan on B out for a parallel sequential scan and
stick a Gather node on top. It will not work. The problem is that,
although we would probably only pick this plan if A contains <= 1 row,
it might turn out at execution time that a second row has been
inserted since statistics were analyzed. Then, we'd need to rescan B.
But it isn't necessarily the case that every worker would finish the
first scan of B at exactly the same time. The first worker to finish
scanning B would punch the rescan button, and now chaos ensues,
because all the other workers now start seeing - for the same row in A
- rows from B that have already been processed. Oops. To make it
safe, you'd need some kind of synchronization that would guarantee
that nobody tries to rescan B until everybody has finished the initial
scan. We do not have such a mechanism today, so this kind of plan is
simply unsafe.

There's another problem, too. Suppose there were also a volatile
filter condition on A. It's possible that scans in two different
workers could arrive at two different conclusions about which tuple
from A to perform the B/C join for first. So now we've got each of
them receiving part of the B/C rows and joining them against two
different A tuples, which is nonsense. We could handle this problem
by not generating this sort of plan when there are any volatile filter
conditions on A, but that sounds pretty fragile. We could also handle
it by sticking the Gather node on top of the Hash Join instead of at
the top of the plan tree, which sounds a lot safer but would require
executor infrastructure we don't have - specifically, the
parameter-passing stuff.

So, all in all, I think this isn't a very promising type of plan -
both because we haven't got the infrastructure to make it safe to
execute today, and because even if we did have that infrastructure it
wouldn't be the right choice except in narrow circumstances. We can
of course revise that decision in the future if things look different
then.

Is the main reason to have add_partial_path() that it has somewhat fewer
checks, or is it that the current add_path() will give wrong answers in
some cases?

The main reason is that it adds things to the partial_pathlist rather
than the pathlist, but the fact that it has fewer checks is a very
nice bonus. add_path() is performance-critical, and I'd rather not
complicate it further with more if statements, especially when a much
much simpler version will do for partial paths.

typo - 'incompable'

OK, I can fix that.

A.
This means that for inheritance child relations whose rel pages are
less than parallel_threshold, it will always consider the cost as shared
between 1 worker and the leader, as per the below calc in cost_seqscan:
if (path->parallel_degree > 0)
    run_cost = run_cost / (path->parallel_degree + 0.5);

I think this might not be the appropriate cost model even for
non-inheritance relations which have pages more than parallel_threshold,
but it seems to be even worse for inheritance children which have
pages less than parallel_threshold.

Why?

Because, the way the code is written, I think it assumes that for each
inheritance-child relation which has fewer pages than the threshold, half
the work will be done by the master backend, which doesn't seem to be the
right distribution. Consider a case where there are three such children,
each costing 100 to scan; now it will cost them as
100/1.5 + 100/1.5 + 100/1.5, which means that, per child scan, it is
counting 0.5 of the master backend's work, which seems to be wrong.

I think for the Append case, we should consider this cost during Append
path creation in create_append_path(). Basically, we can make cost_seqscan
ignore the cost reduction due to parallel_degree for inheritance relations,
and then during Append path creation we can consider it and also count the
master backend's work unit as 0.5 with respect to the overall work.

No, I don't think that's right. It's true that the way we're
calculating parallel_degree for each relation is unprincipled right
now, and we need to improve that. But if it were correct, then what
we're doing here would also be correct. If the number of workers
chosen for each child plan reflected the maximum number that could be
used effectively by that child plan, then any extras wouldn't speed
things up even if they were present, so the Append's cost calculation
would be right.

-
--- a/src/backend/optimizer/README
+++ b/src/backend/optimizer/README
+plan as possible.  Expanding the range of cases in which more work can be
+pushed below the Gather (and costly them accurately) is likely to keep us
+busy for a long time to come.

Seems there is a typo in the above text:
/costly/cost

OK.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#44Amit Kapila
amit.kapila16@gmail.com
In reply to: Robert Haas (#43)
Re: [DESIGN] ParallelAppend

On Fri, Nov 20, 2015 at 1:25 AM, Robert Haas <robertmhaas@gmail.com> wrote:

On Thu, Nov 19, 2015 at 2:59 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:

Won't it be useful to consider parameterized paths for the below kind of
plan, where we can push the join tree to the workers and each worker can
scan the complete outer relation A while the rest of the work is divided
among the workers (of course there can be other ways to parallelize such
joins, but the way described also seems possible)?

NestLoop
-> Seq Scan on A
Hash Join
Join Condition: B.Y = C.W
-> Seq Scan on B
-> Index Scan using C_Z_IDX on C
Index Condition: C.Z = A.X

I had thought that this sort of plan wouldn't actually occur in real
life, but it seems that it does. What you've written here is a little
muddled - the hash join has no hash underneath, for example, and
there'd have to be some sort of join order restriction in order to
consider a plan of this type. However, somewhat to my surprise, I was
able to get a plan much like this by doing this:

..

So, all in all, I think this isn't a very promising type of plan -
both because we haven't got the infrastructure to make it safe to
execute today, and because even if we did have that infrastructure it
wouldn't be the right choice except in narrow circumstances.

I think it would be helpful to parallelize not only the above type of plan
but some other forms of joins as well (refer to the "Parameterized Paths"
section in optimizer/README), where the parameterized-path concept will be
required. I am not sure we can say that such cases will be narrow and so
leave them, but surely we don't have enough infrastructure at the moment
to parallelize them.

We can
of course revise that decision in the future if things look different
then.

No issues. The main reason I brought up this discussion is to see whether
the logic of add_partial_path() and add_path() could be kept the same, so
that it is easier to maintain. There is no correctness issue here, so I
defer it to you.

Because, the way the code is written, I think it assumes that for each
inheritance-child relation which has fewer pages than the threshold, half
the work will be done by the master backend, which doesn't seem to be the
right distribution. Consider a case where there are three such children,
each costing 100 to scan; now it will cost them as
100/1.5 + 100/1.5 + 100/1.5, which means that, per child scan, it is
counting 0.5 of the master backend's work, which seems to be wrong.

I think for the Append case, we should consider this cost during Append
path creation in create_append_path(). Basically, we can make cost_seqscan
ignore the cost reduction due to parallel_degree for inheritance relations,
and then during Append path creation we can consider it and also count the
master backend's work unit as 0.5 with respect to the overall work.

No, I don't think that's right. It's true that the way we're
calculating parallel_degree for each relation is unprincipled right
now, and we need to improve that. But if it were correct, then what
we're doing here would also be correct. If the number of workers
chosen for each child plan reflected the maximum number that could be
used effectively by that child plan, then any extras wouldn't speed
things up even if they were present,

Okay, but I think that's not what I am talking about. I am talking about
the below code in cost_seqscan:

- if (nworkers > 0)
-     run_cost = run_cost / (nworkers + 0.5);
+ if (path->parallel_degree > 0)
+     run_cost = run_cost / (path->parallel_degree + 0.5);

It will consider 50% of the master backend's effort for the scan of each
child relation; does that look correct to you? Wouldn't 50% of the master
backend's effort be considered for scanning all the child relations
together?

With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

#45Robert Haas
robertmhaas@gmail.com
In reply to: Amit Kapila (#44)
Re: [DESIGN] ParallelAppend

On Fri, Nov 20, 2015 at 12:45 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:

Okay, but I think that's not what I am talking about. I am talking about
the below code in cost_seqscan:

- if (nworkers > 0)
-     run_cost = run_cost / (nworkers + 0.5);
+ if (path->parallel_degree > 0)
+     run_cost = run_cost / (path->parallel_degree + 0.5);

It will consider 50% of the master backend's effort for the scan of each
child relation; does that look correct to you? Wouldn't 50% of the master
backend's effort be considered for scanning all the child relations
together?

In the code you originally wrote, you were adding 1 there rather than
0.5. That meant you were expecting the leader to do as much work as
each of its workers, which is clearly a bad estimate, because the
leader also has to do the work of gathering tuples from the workers.
0.5 might not be the right value, but it's surely better than 1.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#46Amit Kapila
amit.kapila16@gmail.com
In reply to: Robert Haas (#45)
Re: [DESIGN] ParallelAppend

On Fri, Nov 20, 2015 at 7:06 PM, Robert Haas <robertmhaas@gmail.com> wrote:

On Fri, Nov 20, 2015 at 12:45 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:

Okay, but I think that's not what I am talking about. I am talking about
the below code in cost_seqscan:

- if (nworkers > 0)
-     run_cost = run_cost / (nworkers + 0.5);
+ if (path->parallel_degree > 0)
+     run_cost = run_cost / (path->parallel_degree + 0.5);

It will consider 50% of the master backend's effort for the scan of each
child relation; does that look correct to you? Wouldn't 50% of the master
backend's effort be considered for scanning all the child relations
together?

In the code you originally wrote, you were adding 1 there rather than
0.5. That meant you were expecting the leader to do as much work as
each of its workers, which is clearly a bad estimate, because the
leader also has to do the work of gathering tuples from the workers.
0.5 might not be the right value, but it's surely better than 1.

Without this patch, that 0.5 (or 50% of the leader's effort) is considered
for the Gather node irrespective of the number of workers or other factors,
but I think with the patch that is no longer true, and that's what I am
worrying about.

With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

#47Robert Haas
robertmhaas@gmail.com
In reply to: Amit Kapila (#46)
Re: [DESIGN] ParallelAppend

On Mon, Nov 23, 2015 at 7:45 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:

Without this patch, that 0.5 (or 50% of the leader's effort) is considered
for the Gather node irrespective of the number of workers or other factors,
but I think with the patch that is no longer true, and that's what I am
worrying about.

Nope, that patch does not change that at all. We probably should, but
this patch does not.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#48Amit Kapila
amit.kapila16@gmail.com
In reply to: Robert Haas (#47)
3 attachment(s)
Re: [DESIGN] ParallelAppend

On Mon, Nov 23, 2015 at 10:39 PM, Robert Haas <robertmhaas@gmail.com> wrote:

On Mon, Nov 23, 2015 at 7:45 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:

Without this patch, that 0.5 (or 50% of the leader's effort) is considered
for the Gather node irrespective of the number of workers or other factors,
but I think with the patch that is no longer true, and that's what I am
worrying about.

Nope, that patch does not change that at all. We probably should, but
this patch does not.

I have taken some performance data with this patch.

- Select data from inheritance hierarchy with very few tuples.

Create table parent_rel(c1 int, c2 text);
Create table child1_rel () Inherits (parent_rel);
Create table child2_rel () Inherits (parent_rel);

insert into parent_rel values(generate_series(1,15), 'aaaa');
insert into child1_rel values(generate_series(10,20),'aaa');
insert into child2_rel values(generate_series(20,30),'aaa');

Analyze parent_rel;
Analyze child1_rel;
Analyze child2_rel;

set max_parallel_degree=4;
set parallel_setup_cost=0;
set parallel_tuple_cost=0.01;

postgres=# explain select count(*) from parent_rel;
                                   QUERY PLAN
--------------------------------------------------------------------------------------
 Aggregate  (cost=2.71..2.72 rows=1 width=0)
   ->  Gather  (cost=0.00..2.62 rows=37 width=0)
         Number of Workers: 1
         ->  Append  (cost=0.00..2.25 rows=37 width=0)
               ->  Parallel Seq Scan on parent_rel  (cost=0.00..0.77 rows=15 width=0)
               ->  Parallel Seq Scan on child1_rel  (cost=0.00..0.74 rows=11 width=0)
               ->  Parallel Seq Scan on child2_rel  (cost=0.00..0.74 rows=11 width=0)

I have changed parallel_setup_cost and parallel_tuple_cost, so it is
selecting the Gather path even for a small relation. However, the same
won't be true for a non-inheritance relation: if the number of pages in
the relation is below the threshold (1000), it won't select a parallel
path. Now we might want a similar restriction for an Append relation as
well, i.e. if combining all the child subpaths doesn't give more than the
threshold number of pages, then don't try to build the parallel path (see
the sketch below).
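
(A rough sketch of the kind of check being suggested, only an illustration
and not code from the patch; the helper name, its call site, and passing
the threshold in as a parameter are all assumptions.)

/*
 * Return true if a parallel Append path looks worth building: sum the heap
 * pages of all live child relations and compare against the same threshold
 * used when deciding whether to build partial paths for a plain relation.
 */
static bool
append_rel_worth_parallel_scan(List *live_childrels,
							   BlockNumber parallel_threshold)
{
	BlockNumber total_pages = 0;
	ListCell   *lc;

	foreach(lc, live_childrels)
	{
		RelOptInfo *childrel = (RelOptInfo *) lfirst(lc);

		total_pages += childrel->pages;
	}

	return total_pages >= parallel_threshold;
}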

- Choose the data set that fits in shared_buffers and then run statements
with different selectivity and max_parallel_degree

Test setup
----------------
1. Use pgbench -i -s 100 <db_name> to create the initial data.
2. Use the attached pgbench_partitions.sql to create 10 partitions with equal
data.
3. Use parallel_append.sh to execute statements with different selectivity
and max_parallel_degree (changed parallel_tuple_cost to 0.001).

Selection criteria: 1% of rows will be selected, with a costly function
evaluation performed for each row.

Head

max_parallel_degree    exec_time (ms)    workers_used
0                      76202             0
2                      28556             2
4                      21620             3
8                      21693             3
16                     21654             3
32                     21579             3
64                     21474             3

Patch

max_parallel_degree    exec_time (ms)    workers_used
0                      77027             0
2                      27088             2
4                      16648             4
8                      13730             5
16                     13787             5
32                     13794             5
64                     13872             5

So here we can see that with the patch, performance is better, but I think
that is mainly due to the number of workers working on the plan. It is not
clear whether allowing more workers at a higher max_parallel_degree would
give us any substantial benefit, but anyway I think that is a generic
worker-allocation improvement which is not directly related to this patch.
The data at different selectivities can be found in the attached document;
more or less it shows a similar trend. Apart from this, I have tried with a
data set which doesn't fit in shared buffers but fits in RAM, and that also
shows a similar trend.

The patch looks good, apart from the worker-allocation stuff, but I think
we can deal with that separately.

With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com

Attachments:

pgbench_partitions.sql
parallel_append.sh
parallel_append_data.ods