Parallel Seq Scan
As per discussion on another thread about using custom scan nodes
for a prototype of parallel sequential scan, I have developed the
same, but directly by adding new nodes for parallel sequential scan.
There might be some advantages to developing this as a contrib
module using custom scan nodes; however, I think we might get
stuck at some point due to the limits of the custom scan node
capability, as pointed out by Andres.
The basic idea is that while evaluating the cheapest path for a
scan, the optimizer will also evaluate whether it can use a parallel
seq scan path. Currently I have kept a very simple model to
calculate the cost of the parallel seq scan path: divide the CPU
and disk cost by the available number of worker backends. (We can
enhance it based on further experiments and discussion; we need to
consider worker startup and dynamic shared memory setup cost as
well.) The work, i.e. the scan of blocks, is divided equally among
all workers, except for corner cases where the blocks can't be
divided equally, in which case the last worker is responsible for
scanning the remaining blocks.
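To illustrate, here is a standalone sketch of that naive model
(names are illustrative, not the patch's actual code):

/*
 * Naive parallel seq scan costing: divide the total CPU and disk
 * cost of a plain seq scan by the number of worker backends.
 * Worker startup and dynamic shared memory setup costs are
 * deliberately ignored, as noted above.
 */
static double
parallel_seqscan_cost(double pages, double tuples, int num_workers)
{
    double seq_page_cost = 1.0;     /* default value of the GUC */
    double cpu_tuple_cost = 0.01;   /* default value of the GUC */

    return (seq_page_cost * pages + cpu_tuple_cost * tuples) / num_workers;
}

For the 100-page, 100-row table used in the EXPLAIN examples below,
this gives (100 * 1.0 + 100 * 0.01) / 4 = 25.25, matching the total
cost shown in the parallel plan.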
The number of worker backends that can be used for a parallel seq
scan can be configured using a new GUC, parallel_seqscan_degree.
Its default value is zero, which means a parallel seq scan will not
be considered unless the user configures this value.
In the ExecutorStart phase, we initiate the required number of
workers as per the parallel seq scan plan, set up dynamic shared
memory, and share the information required for the workers to
execute the scan. Currently I have just shared the relId,
targetlist and number of blocks to be scanned by each worker;
however, I think we might want to generate a plan for each of the
workers in the master backend and then share it with the individual
workers.
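As a rough sketch, the information placed in the dynamic shared
memory segment could look like this (the struct and field names are
illustrative only, not the patch's actual layout):

typedef unsigned int Oid;           /* stand-in for the PG typedef */
typedef unsigned int BlockNumber;   /* stand-in for the PG typedef */

/* Per-worker scan information shared through dynamic shared memory. */
typedef struct ParallelScanInfo
{
    Oid         relid;          /* relation to scan */
    BlockNumber start_block;    /* first block assigned to this worker */
    BlockNumber num_blocks;     /* how many blocks it should scan */
    char        targetlist[1];  /* flattened target list, variable length */
} ParallelScanInfo;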
Now, to fetch the data from the multiple queues corresponding to
the workers, a simple mechanism is used: fetch from the first queue
until all its data is consumed, then fetch from the second queue,
and so on. Also, here the master backend is responsible only for
getting the data from the workers and passing it back to the
client. I am sure we can improve this strategy in many ways, for
example by making the master backend also perform the scan for some
of the blocks rather than just collecting data from the workers, or
by using a better strategy to fetch the data from the multiple
queues.
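In pseudo-C, the current strategy is roughly the following
(receive_from_queue() and send_to_client() are stand-ins for the
real shared memory queue receive and DestReceiver calls):

/* Drain each worker's queue completely before moving to the next
 * one; a NULL return means the worker has finished its range. */
for (i = 0; i < num_workers; i++)
{
    HeapTuple tup;

    while ((tup = receive_from_queue(worker_queue[i])) != NULL)
        send_to_client(tup);
}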
The worker backend receives the scan-related information from the
master backend, generates a plan from it and executes that plan.
The work to scan the data after generating the plan is thus very
similar to exec_simple_query() (i.e. create a portal and run it
based on the planned statement), except that worker backends
initialize the block range they want to scan in the executor
initialization phase (ExecInitSeqScan()).
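For reference, the worker-side flow is roughly the following,
modeled on exec_simple_query() (heavily simplified, with error
handling omitted; planned_stmt is the statement the worker rebuilt
from the shared information, and receiver is the DestReceiver that
writes into the worker's message queue):

portal = CreatePortal("parallel_worker", true, true);
PortalDefineQuery(portal, NULL, query_string, "SELECT",
                  list_make1(planned_stmt), NULL);
PortalStart(portal, NULL, 0, InvalidSnapshot);
(void) PortalRun(portal, FETCH_ALL, true, receiver, receiver, NULL);
PortalDrop(portal, false);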
Workers exit after sending the data to the master backend, which
essentially means that for each execution we need to initiate the
workers afresh. I think we can improve this by handing control of
the workers to the postmaster so that we don't need to initialize
them on each execution; however, that can be a totally separate
optimization, better done independently of this patch.
As we currently don't have a mechanism to share transaction state,
I have used a separate transaction in the worker backends to
execute the plan.
Any error in the master backend, whether raised via a worker
backend or due to some other issue in the master backend itself,
should terminate all the workers before the transaction aborts.
We can't do this with the error context callback mechanism
(error_context_stack) which we use at other places in the code,
because in this case we need the callback from the time the workers
are started until the execution is complete (error_context_stack
can get reset once control leaves the function that set it).
One way would be to maintain the callback information in
TransactionState and use it to kill the workers before aborting the
transaction in the main backend. Another would be to have a
variable similar to error_context_stack (used specifically for
storing the workers' state) and kill the workers in errfinish via a
callback. Currently I have handled it at the time of detaching from
shared memory.
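A sketch of that approach (on_dsm_detach() and
TerminateBackgroundWorker() are the existing APIs; ParallelScanState
is a hypothetical structure holding the worker handles):

/*
 * Terminate any still-running workers when the master detaches from
 * the dynamic shared memory segment; detach happens during resource
 * cleanup, so this runs on both normal completion and error abort.
 */
static void
terminate_workers_on_detach(dsm_segment *seg, Datum arg)
{
    ParallelScanState *pss = (ParallelScanState *) DatumGetPointer(arg);
    int i;

    for (i = 0; i < pss->num_workers; i++)
        TerminateBackgroundWorker(pss->worker_handles[i]);
}

/* registered once, right after the workers have been launched: */
on_dsm_detach(seg, terminate_workers_on_detach, PointerGetDatum(pss));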
Another point that needs care in the worker backend is that if any
error occurs there, we should *not* abort the transaction, as the
transaction state is shared across all workers.
Currently, a parallel seq scan will not be considered for
statements other than SELECT, or if there is a join in the
statement, or if the statement contains quals, or if the target
list contains non-Var fields. We can definitely support simple
quals and target lists containing non-Vars. By simple, I mean that
a qual should not contain functions or other constructs that can't
be pushed down to a worker backend.
The behaviour of some simple statements with the patch is shown below:
postgres=# create table t1(c1 int, c2 char(500)) with (fillfactor=10);
CREATE TABLE
postgres=# insert into t1 values(generate_series(1,100),'amit');
INSERT 0 100
postgres=# explain select c1 from t1;
QUERY PLAN
------------------------------------------------------
Seq Scan on t1 (cost=0.00..101.00 rows=100 width=4)
(1 row)
postgres=# set parallel_seqscan_degree=4;
SET
postgres=# explain select c1 from t1;
QUERY PLAN
--------------------------------------------------------------
Parallel Seq Scan on t1 (cost=0.00..25.25 rows=100 width=4)
Number of Workers: 4
Number of Blocks Per Workers: 25
(3 rows)
postgres=# explain select Distinct(c1) from t1;
QUERY PLAN
--------------------------------------------------------------------
HashAggregate (cost=25.50..26.50 rows=100 width=4)
Group Key: c1
-> Parallel Seq Scan on t1 (cost=0.00..25.25 rows=100 width=4)
Number of Workers: 4
Number of Blocks Per Workers: 25
(5 rows)
The attached patch is just to facilitate discussion about parallel
seq scan and some other dependent tasks, like sharing various
states (combocid, snapshot) with parallel workers. It is by no
means ready for any complex test; of course, I will work towards
making it more robust, both in terms of adding more stuff and doing
performance optimizations.
Thoughts/Suggestions?
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
Attachments:
parallel_seqscan_v1.patch (application/octet-stream)
On 12/04/2014 07:35 AM, Amit Kapila wrote:
[snip]
The number of worker backends that can be used for
parallel seq scan can be configured by using a new GUC
parallel_seqscan_degree, the default value of which is zero
and it means parallel seq scan will not be considered unless
user configures this value.
The number of parallel workers should be capped (of course!) at the
maximum number of "processors" (cores/vCores, threads/hyperthreads)
available.
Moreover, when load goes up, the relative cost of working in parallel
should go up as well.
Something like:
p = number of cores
l = 1-minute load
additional_cost = tuple_estimate * cpu_tuple_cost * (l+1)/(c-1)
(for c > 1, of course)
In ExecutorStart phase, initiate the required number of workers
as per parallel seq scan plan and setup dynamic shared memory and
share the information required for worker to execute the scan.
Currently I have just shared the relId, targetlist and number
of blocks to be scanned by worker, however I think we might want
to generate a plan for each of the workers in master backend and
then share the same to individual worker.
[snip]
Attached patch is just to facilitate the discussion about the
parallel seq scan and may be some other dependent tasks like
sharing of various states like combocid, snapshot with parallel
workers. It is by no means ready to do any complex test, ofcourse
I will work towards making it more robust both in terms of adding
more stuff and doing performance optimizations.
Thoughts/Suggestions?
Not directly (I haven't had the time to read the code yet), but I'm
thinking about the ability to simply *replace* executor methods from an
extension.
This could be an alternative to providing additional nodes that the
planner can include in the final plan tree, ready to be executed.
The parallel seq scan nodes are definitely the best approach for
"parallel query", since the planner can optimize them based on cost.
I'm wondering about the ability to modify the implementation of some
methods themselves once at execution time: given a previously planned
query, chances are that, at execution time (I'm specifically thinking
about prepared statements here), a different implementation of the same
"node" might be more suitable and could be used instead while the
condition holds.
If this latter line of thinking is too off-topic within this thread
and there is any interest, we can move the comments to another
thread and I'd begin work on a PoC patch. It might as well make
sense to implement the executor overloading mechanism alongside the
custom plan API, though.
Any comments appreciated.
Thank you for your work, Amit
Regards,
/ J.L.
José,
* José Luis Tallón (jltallon@adv-solutions.net) wrote:
On 12/04/2014 07:35 AM, Amit Kapila wrote:
The number of worker backends that can be used for
parallel seq scan can be configured by using a new GUC
parallel_seqscan_degree, the default value of which is zero
and it means parallel seq scan will not be considered unless
user configures this value.
The number of parallel workers should be capped (of course!) at the
maximum number of "processors" (cores/vCores, threads/hyperthreads)
available.
Moreover, when load goes up, the relative cost of working in parallel
should go up as well.
Something like:
p = number of cores
l = 1-minute load
additional_cost = tuple_estimate * cpu_tuple_cost * (l+1)/(c-1)
(for c > 1, of course)
While I agree in general that we'll need to come up with appropriate
acceptance criteria, etc, I don't think we want to complicate this patch
with that initially. A SUSET GUC which caps the parallel GUC would be
enough for an initial implementation, imv.
Not directly (I haven't had the time to read the code yet), but I'm
thinking about the ability to simply *replace* executor methods from
an extension.
You probably want to look at the CustomScan thread+patch directly then..
Thanks,
Stephen
Amit,
* Amit Kapila (amit.kapila16@gmail.com) wrote:
postgres=# explain select c1 from t1;
QUERY PLAN
------------------------------------------------------
Seq Scan on t1 (cost=0.00..101.00 rows=100 width=4)
(1 row)
postgres=# set parallel_seqscan_degree=4;
SET
postgres=# explain select c1 from t1;
QUERY PLAN
--------------------------------------------------------------
Parallel Seq Scan on t1 (cost=0.00..25.25 rows=100 width=4)
Number of Workers: 4
Number of Blocks Per Workers: 25
(3 rows)
This is all great and interesting, but I feel like folks might be
waiting to see just what kind of performance results come from this (and
what kind of hardware is needed to see gains..). There are likely
to be situations where this change is an improvement, as well as
cases where it makes things worse.
One really interesting case would be parallel seq scans which are
executing against foreign tables/FDWs..
Thanks!
Stephen
On 12/5/14, 9:08 AM, José Luis Tallón wrote:
Moreover, when load goes up, the relative cost of working in parallel
should go up as well.
Something like:
p = number of cores
l = 1-minute load
additional_cost = tuple_estimate * cpu_tuple_cost * (l+1)/(c-1)
(for c > 1, of course)
...
The parallel seq scan nodes are definitely the best approach for "parallel query", since the planner can optimize them based on cost.
I'm wondering about the ability to modify the implementation of some methods themselves once at execution time: given a previously planned query, chances are that, at execution time (I'm specifically thinking about prepared statements here), a different implementation of the same "node" might be more suitable and could be used instead while the condition holds.
These comments got me wondering... would it be better to decide on parallelism during execution instead of at plan time? That would allow us to dynamically scale parallelism based on system load. If we don't even consider parallelism until we've pulled some number of tuples/pages from a relation, this would also eliminate all parallel overhead on small relations.
--
Jim Nasby, Data Architect, Blue Treble Consulting
Data in Trouble? Get it in Treble! http://BlueTreble.com
On Fri, Dec 5, 2014 at 8:38 PM, José Luis Tallón <jltallon@adv-solutions.net>
wrote:
On 12/04/2014 07:35 AM, Amit Kapila wrote:
[snip]
The number of worker backends that can be used for
parallel seq scan can be configured by using a new GUC
parallel_seqscan_degree, the default value of which is zero
and it means parallel seq scan will not be considered unless
user configures this value.
The number of parallel workers should be capped (of course!) at the
maximum number of "processors" (cores/vCores, threads/hyperthreads)
available.
Also, it should take into account the MaxConnections configured by the user.
Moreover, when load goes up, the relative cost of working in parallel
should go up as well.
Something like:
p = number of cores
l = 1-minute load
additional_cost = tuple_estimate * cpu_tuple_cost * (l+1)/(c-1)
(for c > 1, of course)
How will you identify load in the above formula, and what exactly
is 'c' (is it the number of parallel workers involved)?
For now, I have managed this simply by having a configuration
variable, and it seems to me that should be good enough for a first
version. We can definitely enhance it in a future version by
dynamically allocating the number of workers based on their
availability and the needs of the query, but I think let's leave
that for another day.
In ExecutorStart phase, initiate the required number of workers
as per parallel seq scan plan and setup dynamic shared memory and
share the information required for worker to execute the scan.
Currently I have just shared the relId, targetlist and number
of blocks to be scanned by worker, however I think we might want
to generate a plan for each of the workers in master backend and
then share the same to individual worker.
[snip]
Attached patch is just to facilitate the discussion about the
parallel seq scan and may be some other dependent tasks like
sharing of various states like combocid, snapshot with parallel
workers. It is by no means ready to do any complex test, ofcourse
I will work towards making it more robust both in terms of adding
more stuff and doing performance optimizations.
Thoughts/Suggestions?
Not directly (I haven't had the time to read the code yet), but I'm
thinking about the ability to simply *replace* executor methods from an
extension.
This could be an alternative to providing additional nodes that the
planner can include in the final plan tree, ready to be executed.
The parallel seq scan nodes are definitely the best approach for
"parallel query", since the planner can optimize them based on cost.
I'm wondering about the ability to modify the implementation of some
methods themselves once at execution time: given a previously planned
query, chances are that, at execution time (I'm specifically thinking about
prepared statements here), a different implementation of the same "node"
might be more suitable and could be used instead while the condition holds.
The idea sounds interesting, and I think in some cases a different
implementation of the same node might indeed help. But at this
stage, if we focus on one kind of implementation (one which is a
win for a reasonable number of cases) and make it successful, then
doing alternative implementations will be comparatively easier and
have a better chance of success.
If this latter line of thinking is too off-topic within this thread and
there is any interest, we can move the comments to another thread and I'd
begin work on a PoC patch. It might as well make sense to implement the
executor overloading mechanism alongside the custom plan API, though.
Sure, please go ahead whichever way you would like to proceed.
If you want to contribute in this area/patch, then you are
welcome.
Any comments appreciated.
Thank you for your work, Amit
Many thanks to you as well for showing interest.
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On 4 December 2014 at 19:35, Amit Kapila <amit.kapila16@gmail.com> wrote:
Attached patch is just to facilitate the discussion about the
parallel seq scan and may be some other dependent tasks like
sharing of various states like combocid, snapshot with parallel
workers. It is by no means ready to do any complex test, ofcourse
I will work towards making it more robust both in terms of adding
more stuff and doing performance optimizations.
Thoughts/Suggestions?
This is good news!
I've not gotten to look at the patch yet, but I thought you may be able to
make use of the attached at some point.
It's bare-bones core support for allowing aggregate states to be merged
together with another aggregate state. I would imagine that if a query such
as:
SELECT MAX(value) FROM bigtable;
was run, then a series of parallel workers could go off and each find the
max value from their portion of the table and then perhaps some other node
type would then take all the intermediate results from the workers, once
they're finished, and join all of the aggregate states into one and return
that. Naturally, you'd need to check that all aggregates used in the
targetlist had a merge function first.
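To make the merge step concrete, here is a toy illustration for
MAX() in plain C (not the patch's API): each worker returns the max
of its portion, and merging the states is just another max over the
partial results.

static int
merge_max_states(const int *partial_max, int nworkers)
{
    int result = partial_max[0];
    int i;

    /* merging MAX states is simply MAX over the partial results */
    for (i = 1; i < nworkers; i++)
        if (partial_max[i] > result)
            result = partial_max[i];
    return result;
}

For AVG() or COUNT(), the state would instead be (sum, count) pairs
that are added together, which is why those need new functions.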
This is just a few hours of work. I've not really tested the pg_dump
support or anything yet. I've also not added any new functions to allow
AVG() or COUNT() to work, I've really just re-used existing functions where
I could, as things like MAX() and BOOL_OR() can just make use of the
existing transition function. I thought that this might be enough for early
tests.
I'd imagine such a workload, ignoring IO overhead, should scale pretty much
linearly with the number of worker processes. Of course, if there was a
GROUP BY clause then the merger code would have to perform more work.
If you think you might be able to make use of this, then I'm willing to go
off and write all the other merge functions required for the other
aggregates.
Regards
David Rowley
Attachments:
merge_aggregate_state_v1.patch (application/octet-stream)
On Fri, Dec 5, 2014 at 8:46 PM, Stephen Frost <sfrost@snowman.net> wrote:
Amit,
* Amit Kapila (amit.kapila16@gmail.com) wrote:
postgres=# explain select c1 from t1;
QUERY PLAN
------------------------------------------------------
Seq Scan on t1 (cost=0.00..101.00 rows=100 width=4)
(1 row)
postgres=# set parallel_seqscan_degree=4;
SET
postgres=# explain select c1 from t1;
QUERY PLAN
--------------------------------------------------------------
Parallel Seq Scan on t1 (cost=0.00..25.25 rows=100 width=4)
Number of Workers: 4
Number of Blocks Per Workers: 25
(3 rows)
This is all great and interesting, but I feel like folks might be
waiting to see just what kind of performance results come from this (and
what kind of hardware is needed to see gains..).
Initially I was thinking that first we should discuss whether the
design and idea used in the patch are sane, but now that you have
asked (and Robert has asked the same off-list), I will take the
performance data next week. (Another reason why I have not taken
any data is that the work to push qualification down to the workers
is still left, which I feel is quite important.) However, I still
think it would be good to get some feedback on some of the basic
things below.
1. As the patch currently stands, it just shares the relevant
data (like relid, target list, block range each worker should
work on, etc.) with the worker; the worker then receives that
data, forms the planned statement, executes it and sends the
results back to the master backend. So the question here is: do
you think that is reasonable, or should we try to form the complete
plan for each worker and then share that, and maybe other
information as well, like the range table entries that are
required? My personal gut feeling in this matter is that for the
long term it might be better to form the complete plan of each
worker in the master and share that; however, I think the current
way as done in the patch (okay, that needs some improvement) is
also not bad and quite a bit easier to implement.
2. The next question, related to the above, is what should be the
output of Explain Plan. As currently each worker is responsible
for forming its own plan, Explain Plan is not able to show the
detailed plan for each worker; is that okay?
3. Some places where optimizations are possible:
- Currently, after getting a tuple from the heap, it is deformed by
the worker and sent via the message queue to the master backend;
the master backend then forms the tuple and sends it to the upper
layer, which deforms it again via slot_getallattrs(slot) before
sending it to the frontend.
- The master backend currently receives the data from multiple
workers serially. We can optimize this so that it checks the other
queues if there is no data in the current queue.
- The master backend is just responsible for coordination among
workers: it shares the required information with the workers and
then fetches the data processed by each worker. With some more
logic, we might be able to make the master backend also fetch data
from the heap rather than doing just coordination among workers.
I think in all the above places we can do some optimization;
however, we can do that later as well, unless they hurt performance
badly for the cases people care about most.
4. Should the parallel_seqscan_degree value be dependent on other
backend process counts like MaxConnections, max_worker_processes
and autovacuum_max_workers, or should it be independent like
max_wal_senders?
I think it is better to keep it dependent on other backend process
counts; however, for simplicity, I have kept it similar to
max_wal_senders for now.
There are likely to be
situations where this change is an improvement, as well as cases
where it makes things worse.
Agreed, and I think that will become clearer after doing some
performance tests.
One really interesting case would be parallel seq scans which are
executing against foreign tables/FDWs..
Sure.
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Fri, Dec 5, 2014 at 8:43 PM, Stephen Frost <sfrost@snowman.net> wrote:
José,
* José Luis Tallón (jltallon@adv-solutions.net) wrote:
On 12/04/2014 07:35 AM, Amit Kapila wrote:
The number of worker backends that can be used for
parallel seq scan can be configured by using a new GUC
parallel_seqscan_degree, the default value of which is zero
and it means parallel seq scan will not be considered unless
user configures this value.
The number of parallel workers should be capped (of course!) at the
maximum number of "processors" (cores/vCores, threads/hyperthreads)
available.
Moreover, when load goes up, the relative cost of working in parallel
should go up as well.
Something like:
p = number of cores
l = 1-minute load
additional_cost = tuple_estimate * cpu_tuple_cost * (l+1)/(c-1)
(for c > 1, of course)
While I agree in general that we'll need to come up with appropriate
acceptance criteria, etc, I don't think we want to complicate this patch
with that initially.
A SUSET GUC which caps the parallel GUC would be
enough for an initial implementation, imv.
This is exactly what I have done in the patch.
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Sat, Dec 6, 2014 at 12:27 AM, Jim Nasby <Jim.Nasby@bluetreble.com> wrote:
On 12/5/14, 9:08 AM, José Luis Tallón wrote:
Moreover, when load goes up, the relative cost of working in parallel
should go up as well.
Something like:
p = number of cores
l = 1-minute load
additional_cost = tuple_estimate * cpu_tuple_cost * (l+1)/(c-1)
(for c > 1, of course)
...
The parallel seq scan nodes are definitely the best approach for
"parallel query", since the planner can optimize them based on cost.
I'm wondering about the ability to modify the implementation of some
methods themselves once at execution time: given a previously planned
query, chances are that, at execution time (I'm specifically thinking about
prepared statements here), a different implementation of the same "node"
might be more suitable and could be used instead while the condition holds.
These comments got me wondering... would it be better to decide on
parallelism during execution instead of at plan time? That would allow us
to dynamically scale parallelism based on system load. If we don't even
consider parallelism until we've pulled some number of tuples/pages
from a relation, this would also eliminate all parallel overhead on
small relations.
I think we have access to this information in the planner
(RelOptInfo->pages); if we want, we can use that to exclude small
relations from parallelism. But the question is how big a relation
we want to consider for parallelism. One way is to check via tests,
which I am planning to do; do you think we have any heuristic we
can use to decide how big a relation should be to be considered for
parallelism?
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Sat, Dec 6, 2014 at 10:43 AM, David Rowley <dgrowleyml@gmail.com> wrote:
On 4 December 2014 at 19:35, Amit Kapila <amit.kapila16@gmail.com> wrote:
Attached patch is just to facilitate the discussion about the
parallel seq scan and may be some other dependent tasks like
sharing of various states like combocid, snapshot with parallel
workers. It is by no means ready to do any complex test, ofcourse
I will work towards making it more robust both in terms of adding
more stuff and doing performance optimizations.
Thoughts/Suggestions?
This is good news!
Thanks.
I've not gotten to look at the patch yet, but I thought you may be able
to make use of the attached at some point.
I also think so; it can be used in the near future to enhance and
provide more value to the parallel scan feature. Thanks for taking
the initiative to do the leg-work for supporting aggregates.
It's bare-bones core support for allowing aggregate states to be merged
together with another aggregate state. I would imagine that if a query such
as:
SELECT MAX(value) FROM bigtable;
was run, then a series of parallel workers could go off and each find the
max value from their portion of the table and then perhaps some other node
type would then take all the intermediate results from the workers, once
they're finished, and join all of the aggregate states into one and return
that. Naturally, you'd need to check that all aggregates used in the
targetlist had a merge function first.
The direction sounds right.
This is just a few hours of work. I've not really tested the pg_dump
support or anything yet. I've also not added any new functions to allow
AVG() or COUNT() to work, I've really just re-used existing functions where
I could, as things like MAX() and BOOL_OR() can just make use of the
existing transition function. I thought that this might be enough for early
tests.
I'd imagine such a workload, ignoring IO overhead, should scale pretty
much linearly with the number of worker processes. Of course, if there was
a GROUP BY clause then the merger code would have to perform more work.
Agreed.
If you think you might be able to make use of this, then I'm willing to
go off and write all the other merge functions required for the other
aggregates.
Don't you think we should first stabilize the basic parallel scan
(target list and quals that can be independently evaluated by
workers) and then jump to such enhancements?
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
* Amit Kapila (amit.kapila16@gmail.com) wrote:
1. As the patch currently stands, it just shares the relevant
data (like relid, target list, block range each worker should
perform on etc.) to the worker and then worker receives that
data and form the planned statement which it will execute and
send the results back to master backend. So the question
here is do you think it is reasonable or should we try to form
the complete plan for each worker and then share the same
and may be other information as well like range table entries
which are required. My personal gut feeling in this matter
is that for long term it might be better to form the complete
plan of each worker in master and share the same, however
I think the current way as done in patch (okay that needs
some improvement) is also not bad and quite easier to implement.
For my 2c, I'd like to see it support exactly what the SeqScan node
supports and then also what Foreign Scan supports. That would mean we'd
then be able to push filtering down to the workers which would be great.
Even better would be figuring out how to parallelize an Append node
(perhaps only possible when the nodes underneath are all SeqScan or
ForeignScan nodes) since that would allow us to then parallelize the
work across multiple tables and remote servers.
One of the big reasons why I was asking about performance data is that,
today, we can't easily split a single relation across multiple i/o
channels. Sure, we can use RAID and get the i/o channel that the table
sits on faster than a single disk and possibly fast enough that a single
CPU can't keep up, but that's not quite the same. The historical
recommendation for Hadoop nodes is around one CPU per drive (of course,
it'll depend on workload, etc, etc, but still) and while there's still a
lot of testing, etc, to be done before we can be sure about the 'right'
answer for PG (and it'll also vary based on workload, etc), that strikes
me as a pretty reasonable rule-of-thumb to go on.
Of course, I'm aware that this won't be as easy to implement..
2. Next question related to above is what should be the
output of ExplainPlan, as currently worker is responsible
for forming its own plan, Explain Plan is not able to show
the detailed plan for each worker, is that okay?
I'm not entirely following this. How can the worker be responsible for
its own "plan" when the information passed to it (per the above
paragraph..) is pretty minimal? In general, I don't think we need to
have specifics like "this worker is going to do exactly X" because we
will eventually need some communication to happen between the worker and
the master process where the worker can ask for more work because it's
finished what it was tasked with and the master will need to give it
another chunk of work to do. I don't think we want exactly what each
worker process will do to be fully formed at the outset because, even
with the best information available, given concurrent load on the
system, it's not going to be perfect and we'll end up starving workers.
The plan, as formed by the master, should be more along the lines of
"this is what I'm gonna have my workers do" along w/ how many workers,
etc, and then it goes and does it. Perhaps for an 'explain analyze' we
return information about what workers actually *did* what, but that's a
whole different discussion.
3. Some places where optimizations are possible:
- Currently after getting the tuple from heap, it is deformed by
worker and sent via message queue to master backend, master
backend then forms the tuple and send it to upper layer which
before sending it to frontend again deforms it via slot_getallattrs(slot).
If this is done as I was proposing above, we might be able to avoid
this, but I don't know that it's a huge issue either way.. The bigger
issue is getting the filtering pushed down.
- Master backend currently receives the data from multiple workers
serially. We can optimize in a way that it can check other queues,
if there is no data in current queue.
Yes, this is pretty critical. In fact, it's one of the recommendations
I made previously about how to change the Append node to parallelize
Foreign Scan node work.
- Master backend is just responsible for coordination among workers
It shares the required information to workers and then fetch the
data processed by each worker, by using some more logic, we might
be able to make master backend also fetch data from heap rather than
doing just co-ordination among workers.
I don't think this is really necessary...
I think in all above places we can do some optimisation, however
we can do that later as well, unless they hit the performance badly for
cases which people care most.
I agree that we can improve the performance through various
optimizations later, but it's important to get the general structure and
design right or we'll end up having to reimplement a lot of it.
4. Should parallel_seqscan_degree value be dependent on other
backend processes like MaxConnections, max_worker_processes,
autovacuum_max_workers do or should it be independent like
max_wal_senders?
Well, we're not going to be able to spin off more workers than we have
process slots, but I'm not sure we need anything more than that? In any
case, this is definitely an area we can work on improving later and I
don't think it really impacts the rest of the design.
Thanks,
Stephen
On Sat, Dec 6, 2014 at 5:37 PM, Stephen Frost <sfrost@snowman.net> wrote:
* Amit Kapila (amit.kapila16@gmail.com) wrote:
1. As the patch currently stands, it just shares the relevant
data (like relid, target list, block range each worker should
perform on etc.) to the worker and then worker receives that
data and form the planned statement which it will execute and
send the results back to master backend. So the question
here is do you think it is reasonable or should we try to form
the complete plan for each worker and then share the same
and may be other information as well like range table entries
which are required. My personal gut feeling in this matter
is that for long term it might be better to form the complete
plan of each worker in master and share the same, however
I think the current way as done in patch (okay that needs
some improvement) is also not bad and quite easier to implement.
For my 2c, I'd like to see it support exactly what the SeqScan node
supports and then also what Foreign Scan supports. That would mean we'd
then be able to push filtering down to the workers which would be great.
Even better would be figuring out how to parallelize an Append node
(perhaps only possible when the nodes underneath are all SeqScan or
ForeignScan nodes) since that would allow us to then parallelize the
work across multiple tables and remote servers.
One of the big reasons why I was asking about performance data is that,
today, we can't easily split a single relation across multiple i/o
channels. Sure, we can use RAID and get the i/o channel that the table
sits on faster than a single disk and possibly fast enough that a single
CPU can't keep up, but that's not quite the same. The historical
recommendations for Hadoop nodes is around one CPU per drive (of course,
it'll depend on workload, etc, etc, but still) and while there's still a
lot of testing, etc, to be done before we can be sure about the 'right'
answer for PG (and it'll also vary based on workload, etc), that strikes
me as a pretty reasonable rule-of-thumb to go on.
Of course, I'm aware that this won't be as easy to implement..
2. Next question related to above is what should be the
output of ExplainPlan, as currently worker is responsible
for forming its own plan, Explain Plan is not able to show
the detailed plan for each worker, is that okay?
I'm not entirely following this. How can the worker be responsible for
its own "plan" when the information passed to it (per the above
paragraph..) is pretty minimal?
Because for a simple sequential scan that much information is
sufficient: basically, if we have the scanrelid, target list, qual
and the RTE (primarily the relOid), then the worker can form and
perform the sequential scan.
In general, I don't think we need to
have specifics like "this worker is going to do exactly X" because we
will eventually need some communication to happen between the worker and
the master process where the worker can ask for more work because it's
finished what it was tasked with and the master will need to give it
another chunk of work to do. I don't think we want exactly what each
worker process will do to be fully formed at the outset because, even
with the best information available, given concurrent load on the
system, it's not going to be perfect and we'll end up starving workers.
The plan, as formed by the master, should be more along the lines of
"this is what I'm gonna have my workers do" along w/ how many workers,
etc, and then it goes and does it.
I think here you are saying that work allocation for the workers
should be dynamic rather than fixed, which I think makes sense;
however, we can try such an optimization after gathering some
initial performance data.
Perhaps for an 'explain analyze' we
return information about what workers actually *did* what, but that's a
whole different discussion.
Agreed.
3. Some places where optimizations are possible:
- Currently after getting the tuple from heap, it is deformed by
worker and sent via message queue to master backend, master
backend then forms the tuple and send it to upper layer which
before sending it to frontend again deforms it via
slot_getallattrs(slot).
If this is done as I was proposing above, we might be able to avoid
this, but I don't know that it's a huge issue either way.. The bigger
issue is getting the filtering pushed down.
- Master backend currently receives the data from multiple workers
serially. We can optimize in a way that it can check other queues,
if there is no data in current queue.
Yes, this is pretty critical. In fact, it's one of the recommendations
I made previously about how to change the Append node to parallelize
Foreign Scan node work.
- Master backend is just responsible for coordination among workers
It shares the required information to workers and then fetch the
data processed by each worker, by using some more logic, we might
be able to make master backend also fetch data from heap rather than
doing just co-ordination among workers.
I don't think this is really necessary...
I think in all above places we can do some optimisation, however
we can do that later as well, unless they hit the performance badly for
cases which people care most.
I agree that we can improve the performance through various
optimizations later, but it's important to get the general structure and
design right or we'll end up having to reimplement a lot of it.
So to summarize my understanding, below is the set of things
I should work on, in the order they are listed.
1. Push down qualification
2. Performance Data
3. Improve the way to push down the information related to worker.
4. Dynamic allocation of work for workers.
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Sat, Dec 6, 2014 at 12:13 AM, David Rowley <dgrowleyml@gmail.com> wrote:
It's bare-bones core support for allowing aggregate states to be merged
together with another aggregate state. I would imagine that if a query such
as:
SELECT MAX(value) FROM bigtable;
was run, then a series of parallel workers could go off and each find the
max value from their portion of the table and then perhaps some other node
type would then take all the intermediate results from the workers, once
they're finished, and join all of the aggregate states into one and return
that. Naturally, you'd need to check that all aggregates used in the
targetlist had a merge function first.
I think this is great infrastructure and could also be useful for
pushing down aggregates in cases involving foreign data wrappers. But
I suggest we discuss it on a separate thread because it's not related
to parallel seq scan per se.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Sat, Dec 6, 2014 at 1:50 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
I think we have access to this information in planner (RelOptInfo -> pages),
if we want, we can use that to eliminate the small relations from
parallelism, but question is how big relations do we want to consider
for parallelism, one way is to check via tests which I am planning to
follow, do you think we have any heuristic which we can use to decide
how big relations should be considered for parallelism?
Surely the Path machinery needs to decide this in particular cases
based on cost. We should assign some cost to starting a parallel
worker via some new GUC, like parallel_startup_cost = 100,000. And
then we should also assign a cost to the act of relaying a tuple from
the parallel worker to the master, maybe cpu_tuple_cost (or some new
GUC). For a small relation, or a query with a LIMIT clause, the
parallel startup cost will make starting a lot of workers look
unattractive, but for bigger relations it will make sense from a cost
perspective, which is exactly what we want.
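In rough terms, the cost shape being proposed here would be
(parallel_startup_cost is the proposed GUC; the per-tuple relay
cost would be cpu_tuple_cost or a new GUC):

/* startup is charged per worker; relaying is charged per tuple */
total_cost = (seq_scan_cost / num_workers)
           + parallel_startup_cost * num_workers
           + per_tuple_relay_cost * tuples_returned;

For a small relation the startup term dominates and the serial plan
wins; for a big relation the divided scan cost dominates, which is
exactly the behavior we want.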
There are probably other important considerations based on goals for
overall resource utilization, and also because at a certain point
adding more workers won't help because the disk will be saturated. I
don't know exactly what we should do about those issues yet, but the
steps described in the previous paragraph seem like a good place to
start anyway.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Sat, Dec 6, 2014 at 7:07 AM, Stephen Frost <sfrost@snowman.net> wrote:
For my 2c, I'd like to see it support exactly what the SeqScan node
supports and then also what Foreign Scan supports. That would mean we'd
then be able to push filtering down to the workers which would be great.
Even better would be figuring out how to parallelize an Append node
(perhaps only possible when the nodes underneath are all SeqScan or
ForeignScan nodes) since that would allow us to then parallelize the
work across multiple tables and remote servers.
I don't see how we can support the stuff ForeignScan does; presumably
any parallelism there is up to the FDW to implement, using whatever
in-core tools we provide. I do agree that parallelizing Append nodes
is useful; but let's get one thing done first before we start trying
to do thing #2.
I'm not entirely following this. How can the worker be responsible for
its own "plan" when the information passed to it (per the above
paragraph..) is pretty minimal? In general, I don't think we need to
have specifics like "this worker is going to do exactly X" because we
will eventually need some communication to happen between the worker and
the master process where the worker can ask for more work because it's
finished what it was tasked with and the master will need to give it
another chunk of work to do. I don't think we want exactly what each
worker process will do to be fully formed at the outset because, even
with the best information available, given concurrent load on the
system, it's not going to be perfect and we'll end up starving workers.
The plan, as formed by the master, should be more along the lines of
"this is what I'm gonna have my workers do" along w/ how many workers,
etc, and then it goes and does it. Perhaps for an 'explain analyze' we
return information about what workers actually *did* what, but that's a
whole different discussion.
I agree with this. For a first version, I think it's OK to start a
worker up for a particular sequential scan and have it help with that
sequential scan until the scan is completed, and then exit. It should
not, as the present version of the patch does, assign a fixed block
range to each worker; instead, workers should allocate a block or
chunk of blocks to work on until no blocks remain. That way, even if
every worker but one gets stuck, the rest of the scan can still
finish.
Eventually, we will want to be smarter about sharing workers between
multiple parts of the plan, but I think it is just fine to leave that
as a future enhancement for now.
- Master backend is just responsible for coordination among workers
It shares the required information to workers and then fetch the
data processed by each worker, by using some more logic, we might
be able to make master backend also fetch data from heap rather than
doing just co-ordination among workers.
I don't think this is really necessary...
I think it would be an awfully good idea to make this work. The
master thread may be significantly faster than any of the others
because it has no IPC costs. We don't want to leave our best resource
sitting on the bench.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Mon, Dec 8, 2014 at 11:21 PM, Robert Haas <robertmhaas@gmail.com> wrote:
On Sat, Dec 6, 2014 at 1:50 AM, Amit Kapila <amit.kapila16@gmail.com>
wrote:
I think we have access to this information in planner (RelOptInfo ->
pages), if we want, we can use that to eliminate the small relations
from parallelism, but question is how big relations do we want to
consider for parallelism, one way is to check via tests which I am
planning to follow, do you think we have any heuristic which we can
use to decide how big relations should be considered for parallelism?
Surely the Path machinery needs to decide this in particular cases
based on cost. We should assign some cost to starting a parallel
worker via some new GUC, like parallel_startup_cost = 100,000. And
then we should also assign a cost to the act of relaying a tuple from
the parallel worker to the master, maybe cpu_tuple_cost (or some new
GUC). For a small relation, or a query with a LIMIT clause, the
parallel startup cost will make starting a lot of workers look
unattractive, but for bigger relations it will make sense from a cost
perspective, which is exactly what we want.
Sounds sensible. cpu_tuple_cost is already used for another
purpose, so I am not sure it is the right thing to overload that
parameter; how about cpu_tuple_communication_cost or
cpu_tuple_comm_cost?
There are probably other important considerations based on goals for
overall resource utilization, and also because at a certain point
adding more workers won't help because the disk will be saturated. I
don't know exactly what we should do about those issues yet, but the
steps described in the previous paragraph seem like a good place to
start anyway.
Agreed.
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Mon, Dec 8, 2014 at 11:27 PM, Robert Haas <robertmhaas@gmail.com> wrote:
On Sat, Dec 6, 2014 at 7:07 AM, Stephen Frost <sfrost@snowman.net> wrote:
For my 2c, I'd like to see it support exactly what the SeqScan node
supports and then also what Foreign Scan supports. That would mean we'd
then be able to push filtering down to the workers which would be great.
Even better would be figuring out how to parallelize an Append node
(perhaps only possible when the nodes underneath are all SeqScan or
ForeignScan nodes) since that would allow us to then parallelize the
work across multiple tables and remote servers.
I don't see how we can support the stuff ForeignScan does; presumably
any parallelism there is up to the FDW to implement, using whatever
in-core tools we provide. I do agree that parallelizing Append nodes
is useful; but let's get one thing done first before we start trying
to do thing #2.I'm not entirely following this. How can the worker be responsible for
its own "plan" when the information passed to it (per the above
paragraph..) is pretty minimal? In general, I don't think we need to
have specifics like "this worker is going to do exactly X" because we
will eventually need some communication to happen between the worker and
the master process where the worker can ask for more work because it's
finished what it was tasked with and the master will need to give it
another chunk of work to do. I don't think we want exactly what each
worker process will do to be fully formed at the outset because, even
with the best information available, given concurrent load on the
system, it's not going to be perfect and we'll end up starving workers.
The plan, as formed by the master, should be more along the lines of
"this is what I'm gonna have my workers do" along w/ how many workers,
etc, and then it goes and does it. Perhaps for an 'explain analyze' we
return information about what workers actually *did* what, but that's a
whole different discussion.
I agree with this. For a first version, I think it's OK to start a
worker up for a particular sequential scan and have it help with that
sequential scan until the scan is completed, and then exit. It should
not, as the present version of the patch does, assign a fixed block
range to each worker; instead, workers should allocate a block or
chunk of blocks to work on until no blocks remain. That way, even if
every worker but one gets stuck, the rest of the scan can still
finish.
I will check on this point and see if it is feasible to do
something along those lines. Basically, currently at the Executor
initialization phase we set the scan limits, and then during the
Executor run phase we use heap_getnext to fetch the tuples
accordingly; doing it dynamically means that at the ExecutorRun
phase we need to reset the scan limit for which page/pages to scan.
I still have to check if there is any problem with such an idea.
Do you have any different idea in mind?
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
On Tue, Dec 9, 2014 at 12:46 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:
I agree with this. For a first version, I think it's OK to start a
worker up for a particular sequential scan and have it help with that
sequential scan until the scan is completed, and then exit. It should
not, as the present version of the patch does, assign a fixed block
range to each worker; instead, workers should allocate a block or
chunk of blocks to work on until no blocks remain. That way, even if
every worker but one gets stuck, the rest of the scan can still
finish.
I will check on this point and see if it is feasible to do something on
those lines, basically currently at Executor initialization phase, we
set the scan limits and then during Executor Run phase use
heap_getnext to fetch the tuples accordingly, but doing it dynamically
means at ExecutorRun phase we need to reset the scan limit for
which page/pages to scan, still I have to check if there is any problem
with such an idea. Do you any different idea in mind?
Hmm. Well, it looks like there are basically two choices: you can
either (as you propose) deal with this above the level of the
heap_beginscan/heap_getnext API by scanning one or a few pages at a
time and then resetting the scan to a new starting page via
heap_setscanlimits; or alternatively, you can add a callback to
HeapScanDescData that, if non-NULL, will be invoked to get the next
block number to scan. I'm not entirely sure which is better.
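A sketch of the second option (the typedef, callback and structure
names here are hypothetical, not existing code):

typedef unsigned int BlockNumber;
#define InvalidBlockNumber ((BlockNumber) 0xFFFFFFFF)

/* If non-NULL in HeapScanDescData, consulted for the next block to
 * scan instead of advancing serially. */
typedef BlockNumber (*NextBlockCallback) (void *cb_arg);

typedef struct SharedBlockAllocator
{
    unsigned int next_block;    /* atomically incremented by workers */
    unsigned int total_blocks;  /* blocks in the relation */
} SharedBlockAllocator;

static BlockNumber
parallel_next_block(void *cb_arg)
{
    SharedBlockAllocator *alloc = (SharedBlockAllocator *) cb_arg;
    unsigned int blkno = __sync_fetch_and_add(&alloc->next_block, 1);

    /* InvalidBlockNumber tells the scan machinery that no blocks remain */
    return (blkno < alloc->total_blocks) ? blkno : InvalidBlockNumber;
}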
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Mon, Dec 8, 2014 at 10:40 AM, Amit Kapila <amit.kapila16@gmail.com>
wrote:
On Sat, Dec 6, 2014 at 5:37 PM, Stephen Frost <sfrost@snowman.net> wrote:
So to summarize my understanding, below are the set of things
which I should work on and in the order they are listed.
1. Push down qualification
2. Performance Data
3. Improve the way to push down the information related to worker.
4. Dynamic allocation of work for workers.
I have worked on the patch to accomplish the above-mentioned points
1, 2 and partly 3, and would like to share the progress with the
community.
If the statement contains quals that don't have volatile functions,
they will be pushed down and the parallel scan will be considered
for cost evaluation. I think eventually we might need some better
way to decide which kinds of functions are okay to be pushed down.
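For reference, the check is roughly the following
(contain_volatile_functions() is the existing planner helper;
parallel_ok is just a local flag here):

ListCell   *lc;
bool        parallel_ok = true;

foreach(lc, baserel->baserestrictinfo)
{
    RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);

    /* volatile functions cannot safely run in a worker backend */
    if (contain_volatile_functions((Node *) rinfo->clause))
    {
        parallel_ok = false;
        break;
    }
}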
I have also unified the way information is passed from the master
backend to the worker backends: each node that has to be passed is
converted to a string, and the workers later convert the string
back to a node. This has simplified the related code.
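That is, roughly (nodeToString() and stringToNode() are the
existing helpers):

/* master side: flatten the node tree into the shared memory segment */
char *tlist_str = nodeToString(targetlist);

/* worker side: rebuild the node tree from the shared string */
List *tlist = (List *) stringToNode(tlist_str);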
I have taken performance data for different selectivities and
complexities of qual expressions. I understand that there will be
other kinds of scenarios which we need to consider; however, I
think the current set of tests is a good place to start. Please
feel free to comment on the kinds of scenarios you want me to
check.
Performance Data
------------------------------
m/c details:
IBM POWER-8: 24 cores, 192 hardware threads
RAM = 492GB

non-default settings in postgresql.conf:
max_connections = 300
shared_buffers = 8GB
checkpoint_segments = 300
checkpoint_timeout = 30min
max_worker_processes = 100
create table tbl_perf(c1 int, c2 char(1000));
30 million rows
------------------------
insert into tbl_perf values(generate_series(1,10000000),'aaaaa');
insert into tbl_perf values(generate_series(10000000,30000000),'aaaaa');
Function used in quals
-----------------------------------
A simple function that performs some calculation and returns the
value passed in, so that it can be used in a qual condition.
create or replace function calc_factorial(a integer, fact_val integer)
returns integer
as $$
begin
perform (fact_val)!;
return a;
end;
$$ language plpgsql STABLE;
In the data below:
num_workers - number of parallel workers configured using
parallel_seqscan_degree; 0 means a plain sequential scan is
executed, and greater than 0 means a parallel sequential scan.
exec_time - execution time reported by the Explain Analyze
statement.
Tests having quals containing function evaluation in the qual
expressions:

Test-1
Query - Explain analyze select c1 from tbl_perf where
        c1 > calc_factorial(29700000,10) and c2 like '%aa%';
Selection criteria - 1% of rows will be selected

  num_workers   exec_time (ms)
            0           229534
            2           121741
            4            67051
            8            35607
           16            24743

Test-2
Query - Explain analyze select c1 from tbl_perf where
        c1 > calc_factorial(27000000,10) and c2 like '%aa%';
Selection criteria - 10% of rows will be selected

  num_workers   exec_time (ms)
            0           226671
            2           151587
            4            93648
            8            70540
           16            55466

Test-3
Query - Explain analyze select c1 from tbl_perf where
        c1 > calc_factorial(22500000,10) and c2 like '%aa%';
Selection criteria - 25% of rows will be selected

  num_workers   exec_time (ms)
            0           232673
            2           197609
            4           142686
            8           111664
           16            98097

Tests having quals containing simple expressions in the qual:

Test-4
Query - Explain analyze select c1 from tbl_perf where
        c1 > 29700000 and c2 like '%aa%';
Selection criteria - 1% of rows will be selected

  num_workers   exec_time (ms)
            0            15505
            2             9155
            4             6030
            8             4523
           16             4459
           32             8259
           64            13388

Test-5
Query - Explain analyze select c1 from tbl_perf where
        c1 > 28500000 and c2 like '%aa%';
Selection criteria - 5% of rows will be selected

  num_workers   exec_time (ms)
            0            18906
            2            13446
            4             8970
            8             7887
           16            10403

Test-6
Query - Explain analyze select c1 from tbl_perf where
        c1 > 27000000 and c2 like '%aa%';
Selection criteria - 10% of rows will be selected

  num_workers   exec_time (ms)
            0            16132
            2            23780
            4            20275
            8            11390
           16            11418
Conclusion
------------------
1. Parallel workers help a lot when there is an expensive
qualification to evaluate; the more expensive the qualification,
the better the results.
2. It works well for low-selectivity quals; as the selectivity
increases, the benefit tends to go down due to the additional tuple
communication cost between the workers and the master backend.
3. After a certain point, having more workers won't help and rather
has a negative impact; refer to Test-4.
I think, as discussed previously, we need to introduce 2 additional
cost variables (parallel_startup_cost, cpu_tuple_communication_cost)
to estimate the parallel seq scan cost, so that when tables are
small or selectivity is high, the cost of the parallel plan
increases.
Thoughts and feedback for the current state of patch is welcome.
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com