Parallel Inserts in CREATE TABLE AS
Hi,
The idea of this patch is to allow the leader and each worker to insert
the tuples in parallel if the SELECT part of the CTAS is parallelizable.
Along with the parallel inserts, if the CTAS code path is allowed to do
table_multi_insert() [1], then the gain we achieve is as follows:
For a table with 2 integer columns and 100 million tuples (more testing
results are at [2]), the exec time on HEAD is *120sec*, whereas with the
parallelism patch proposed here and the multi insert patch [1], with 3
workers and leader participation, the exec time is *22sec (5.45X)*. With
the current CTAS code, which does single tuple inserts (see
intorel_receive()), the performance gain is limited to ~1.7X with
parallelism. This is due to the fact that the workers contend more for
locks on buffer pages while extending the table. So, the maximum benefit
we can get for CTAS is with both parallelism and multi tuple inserts.
The design:
Let the planner know that the SELECT is from CTAS in createas.c so that
it can set the number of tuples transferred from the workers to the
Gather node to 0. With this change, there are chances that the planner
may choose the parallel plan. After the planning, check if the upper
plan node is Gather in createas.c and mark a parallelism flag in the
CTAS dest receiver. Pass the into clause, object id, and command id from
the leader to the workers, so that each worker can create its own CTAS
dest receiver. The leader inserts its share of tuples if instructed to
do so, and so do the workers. Each worker atomically writes its number
of inserted tuples into a shared memory variable; the leader combines
this with its own number of inserted tuples and reports the total to the
client.
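For reference, the createas.c part of this boils down to the following
check from the attached patch (is_parallel is the new flag in the
DR_intorel dest receiver):

/*
 * The SELECT part of the CTAS is parallelizable, so we can make each
 * parallel worker insert the tuples it produces into the target table.
 */
if (!is_matview &&
    IsA(plan->planTree, Gather))
    ((DR_intorel *) dest)->is_parallel = true;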
Below things are still pending. Thoughts are most welcome:
1. How can we better lift the "cannot insert tuples in a parallel
worker" restriction in heap_prepare_insert() for only CTAS cases, or for
that matter parallel copy? How about having a variable in any of the
worker global contexts and using that? Of course, we can remove this
restriction entirely in case we fully allow parallelism for INSERT INTO
SELECT, CTAS, and COPY. (The check in question is quoted just after this
list.)
2. How to represent the parallel insert for CTAS in explain plans? The
explain CTAS shows the plan for only the SELECT part. How about having some
textual info along with the Gather node?
-----------------------------------------------------------------------------
Gather (cost=1000.00..108738.90 rows=0 width=8)
  Workers Planned: 2
  -> Parallel Seq Scan on t_test (cost=0.00..106748.00 rows=4954 width=8)
       Filter: (many < 10000)
-----------------------------------------------------------------------------
3. Need to restrict parallel inserts if CTAS tries to create temp/global
tables, as the workers will not have access to those tables. Need to
analyze whether to allow parallelism if CTAS has prepared statements or
WITH NO DATA.
4. Need to stop unnecessary parallel shared state, such as the tuple
queues, from being created and shared with the workers.
5. Addition of new test cases. Testing with more scenarios and different
data sets, sizes, tablespaces, SELECT INTO. Analysis of the 2 mismatches
in the write_parallel.sql regression test.
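For item 1, the check in question is roughly the following, from
heap_prepare_insert() in heapam.c:

if (IsParallelWorker())
    ereport(ERROR,
            (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
             errmsg("cannot insert tuples in a parallel worker")));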
Thoughts?
Credits:
1. Thanks to Dilip Kumar for the main design idea and the discussions.
Thanks to Vignesh for the discussions.
2. Patch development, testing is by me.
3. Thanks to the authors of the table_multi_insert() in CTAS patch [1].
[1]: For table_multi_insert() in CTAS, I used an in-progress patch
available at
/messages/by-id/CAEET0ZG31mD5SWjTYsAt0JTLReOejPvusJorZ3kGZ1=N1AC-Fw@mail.gmail.com
[2]: Table with 2 integer columns, 100 million tuples, with leader
participation, with the default postgresql.conf file. All readings are
triplets of the form (workers, exec time in sec, improvement).
case 1: no multi inserts -
(0,120,1X),(1,91,1.32X),(2,75,1.6X),(3,67,1.79X),(4,72,1.66X),(5,77,1.56X),(6,83,1.44X)
case 2: with multi inserts -
(0,59,1X),(1,32,1.84X),(2,28,2.1X),(3,25,2.36X),(4,23,2.56X),(5,22,2.68X),(6,22,2.68X)
case 3: same table but unlogged, with multi inserts -
(0,50,1X),(1,28,1.78X),(2,25,2X),(3,22,2.27X),(4,21,2.38X),(5,21,2.38X),(6,20,2.5X)
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
Attachments:
v1-0001-Parallel-Inserts-in-CREATE-TABLE-AS.patch
I feel this enhancement could give good improvement, +1 for this.
Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com
Hi,
On 2020-09-23 17:20:20 +0530, Bharath Rupireddy wrote:
The idea of this patch is to allow the leader and each worker to insert
the tuples in parallel if the SELECT part of the CTAS is parallelizable.
Cool!
The design:
I think it'd be good if you could explain a bit more why you think this
is safe to do in the way you have done it.
E.g. from a quick scroll through the patch, there's not even a comment
explaining that the only reason there doesn't need to be code dealing
with xid assignment is because we already did the catalog changes to
create the table. But how does that work for SELECT INTO? Are you
prohibiting that? ...
Pass the into clause, object id, command id from the leader to
workers, so that each worker can create its own CTAS dest
receiver. The leader inserts its share of tuples if instructed to do so,
and so do the workers. Each worker atomically writes its number of
inserted tuples into a shared memory variable; the leader combines this
with its own number of inserted tuples and reports the total to the
client.

Below things are still pending. Thoughts are most welcome:
1. How can we better lift the "cannot insert tuples in a parallel
worker" restriction in heap_prepare_insert() for only CTAS cases, or for
that matter parallel copy? How about having a variable in any of the
worker global contexts and using that? Of course, we can remove this
restriction entirely in case we fully allow parallelism for INSERT INTO
SELECT, CTAS, and COPY.
I have mentioned before that I think it'd be good if we changed the
insert APIs to have a more 'scan' like structure. I am thinking of
something like
TableInsertScan* table_begin_insert(Relation);
table_tuple_insert(TableInsertScan *is, other, args);
table_multi_insert(TableInsertScan *is, other, args);
table_end_insert(TableInsertScan *);
that'd then replace the BulkInsertStateData logic we have right now. But
more importantly it'd allow an AM to optimize operations across multiple
inserts, which is important for column stores.
And for the purpose of your question, we could then have a
table_insert_allow_parallel(TableInsertScan *);
or an additional arg to table_begin_insert().
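To make that concrete, a rough sketch of how a dest receiver might drive
this API (none of these functions exist today; the names follow the
sketch above, and the tuple-fetching call is only a placeholder):

TableInsertScan *iscan = table_begin_insert(intoRelationDesc);

table_insert_allow_parallel(iscan);     /* opt in to parallel inserts */

while ((slot = GetNextTupleSomehow()) != NULL)   /* placeholder */
    table_tuple_insert(iscan, slot, mycid, 0);   /* or table_multi_insert() */

table_end_insert(iscan);    /* flush any buffered tuples and clean up */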
3. Need to restrict parallel inserts if CTAS tries to create temp/global
tables, as the workers will not have access to those tables. Need to
analyze whether to allow parallelism if CTAS has prepared statements or
WITH NO DATA.
In which case does CTAS not create a table? You definitely need to
ensure that the table is created before your workers are started, and it
needs to be in a different CommandId.
Greetings,
Andres Freund
Thanks Andres for the comments.
On Thu, Sep 24, 2020 at 8:11 AM Andres Freund <andres@anarazel.de> wrote:
The design:
I think it'd be good if you could explain a bit more why you think this
is safe to do in the way you have done it.

E.g. from a quick scroll through the patch, there's not even a comment
explaining that the only reason there doesn't need to be code dealing
with xid assignment is because we already did the catalog changes to
create the table.
Yes, we do a bunch of catalog changes related to the newly created table.
We will have both the txn id and command id assigned when the catalog
changes are being made. But, right after the table is created in the
leader, the command id is incremented (CommandCounterIncrement() is
called from create_ctas_internal()) whereas the txn id remains the same.
The new command id is marked with GetCurrentCommandId(true) in
intorel_startup, then the parallel mode is entered. The txn id and
command id are serialized into the parallel DSM, and they are then
available to all parallel workers. This is discussed in [1].
A few changes I have to make in the parallel worker code: set
currentCommandIdUsed = true, maybe via a common API
SetCurrentCommandIdUsedForWorker() proposed in [1], and remove the extra
command id sharing from the leader to the workers.
I will add a few comments in the upcoming patch related to the above info.
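For reference, the helper proposed in [1] is roughly the following (not
yet in core, so treat it as a sketch):

void
SetCurrentCommandIdUsedForWorker(void)
{
    Assert(IsParallelWorker() && !currentCommandIdUsed &&
           currentCommandId != InvalidCommandId);

    /* The worker must use the command id it received from the leader. */
    currentCommandIdUsed = true;
}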
But how does that work for SELECT INTO? Are you prohibiting
that? ...
In the case of SELECT INTO, a new table gets created as well; I'm not
prohibiting the parallel inserts there, and I think we don't need to.
Thoughts?
Below things are still pending. Thoughts are most welcome:
1. How can we better lift the "cannot insert tuples in a parallel
worker" restriction in heap_prepare_insert() for only CTAS cases, or for
that matter parallel copy? How about having a variable in any of the
worker global contexts and using that? Of course, we can remove this
restriction entirely in case we fully allow parallelism for INSERT INTO
SELECT, CTAS, and COPY.

And for the purpose of your question, we could then have a
table_insert_allow_parallel(TableInsertScan *);
or an additional arg to table_begin_insert().
Removing "cannot insert tuples in a parallel worker" restriction from
heap_prepare_insert() is a common problem for parallel inserts in
general, i.e. parallel inserts in CTAS, parallel INSERT INTO
SELECTs[1]/messages/by-id/CAJcOf-fn1nhEtaU91NvRuA3EbvbJGACMd4_c+Uu3XU5VMv37Aw@mail.gmail.com and parallel copy[2]/messages/by-id/CAA4eK1+kpddvvLxWm4BuG_AhVvYz8mKAEa7osxp_X0d4ZEiV=g@mail.gmail.com. It will be good if a common solution
is agreed.
3. Need to restrict parallel inserts if CTAS tries to create temp/global
tables, as the workers will not have access to those tables. Need to
analyze whether to allow parallelism if CTAS has prepared statements or
WITH NO DATA.

In which case does CTAS not create a table?
AFAICS, the table gets created in all cases, but the insertion of the
data gets skipped if the user specifies the WITH NO DATA option, in
which case the select part is not even planned, and so parallelism will
also not be picked.
You definitely need to
ensure that the table is created before your workers are started, and it
needs to be in a different CommandId.

Yeah, this is already being done. The table gets created in the leader
in intorel_startup(), which gets called from dest->rStartup() in
standard_ExecutorRun(), before entering the parallel mode.
[1]: /messages/by-id/CAJcOf-fn1nhEtaU91NvRuA3EbvbJGACMd4_c+Uu3XU5VMv37Aw@mail.gmail.com
[2]: /messages/by-id/CAA4eK1+kpddvvLxWm4BuG_AhVvYz8mKAEa7osxp_X0d4ZEiV=g@mail.gmail.com
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
On Mon, Sep 28, 2020 at 3:58 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
Thanks Andres for the comments.
On Thu, Sep 24, 2020 at 8:11 AM Andres Freund <andres@anarazel.de> wrote:
The design:
I think it'd be good if you could explain a bit more why you think this
is safe to do in the way you have done it.

E.g. from a quick scroll through the patch, there's not even a comment
explaining that the only reason there doesn't need to be code dealing
with xid assignment is because we already did the catalog changes to
create the table.

Yes, we do a bunch of catalog changes related to the newly created table.
We will have both the txn id and command id assigned when the catalog
changes are being made. But, right after the table is created in the
leader, the command id is incremented (CommandCounterIncrement() is
called from create_ctas_internal()) whereas the txn id remains the same.
The new command id is marked with GetCurrentCommandId(true) in
intorel_startup, then the parallel mode is entered. The txn id and
command id are serialized into the parallel DSM, and they are then
available to all parallel workers. This is discussed in [1].

A few changes I have to make in the parallel worker code: set
currentCommandIdUsed = true, maybe via a common API
SetCurrentCommandIdUsedForWorker() proposed in [1], and remove the extra
command id sharing from the leader to the workers.

I will add a few comments in the upcoming patch related to the above
info.
Yes, that would be good.
But how does that work for SELECT INTO? Are you prohibiting that? ...

In the case of SELECT INTO, a new table gets created as well; I'm not
prohibiting the parallel inserts there, and I think we don't need to.

So, in this case also, do we ensure that the table is created before we
launch the workers? If so, I think you can explain in comments about it
and what you need to do to ensure the same.
While skimming through the patch, a small thing I noticed:
+ /*
+ * SELECT part of the CTAS is parallelizable, so we can make
+ * each parallel worker insert the tuples that are resulted
+ * in it's execution into the target table.
+ */
+ if (!is_matview &&
+ IsA(plan->planTree, Gather))
+ ((DR_intorel *) dest)->is_parallel = true;
+
I am not sure at this stage if this is the best way to make CTAS
parallel, but if so, then probably you can expand the comments a bit to
say why you consider only the Gather node (and that too only when it is
the top-most node) and why not another parallel node like GatherMerge.
Thoughts?
Below things are still pending. Thoughts are most welcome:
1. How can we better lift the "cannot insert tuples in a parallel
worker" restriction in heap_prepare_insert() for only CTAS cases, or for
that matter parallel copy? How about having a variable in any of the
worker global contexts and using that? Of course, we can remove this
restriction entirely in case we fully allow parallelism for INSERT INTO
SELECT, CTAS, and COPY.

And for the purpose of your question, we could then have a
table_insert_allow_parallel(TableInsertScan *);
or an additional arg to table_begin_insert().

Removing the "cannot insert tuples in a parallel worker" restriction
from heap_prepare_insert() is a common problem for parallel inserts in
general, i.e. parallel inserts in CTAS, parallel INSERT INTO SELECT [1],
and parallel copy [2]. It will be good if a common solution is agreed
upon.
Right, for now, I think you can simply remove that check from the code
instead of just commenting it. We will see if there is a better
check/Assert we can add there.
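For example, something like the below might eventually work there
(parallel_insert_allowed is a purely hypothetical flag, named only for
illustration; it does not exist anywhere yet):

/*
 * Hypothetical replacement for the ereport(), once only
 * parallel-insert-safe paths can reach here.
 */
Assert(!IsParallelWorker() || parallel_insert_allowed);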
--
With Regards,
Amit Kapila.
On Tue, Oct 6, 2020 at 10:58 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
Yes, we do a bunch of catalog changes related to the newly created table.
We will have both the txn id and command id assigned when the catalog
changes are being made. But, right after the table is created in the
leader, the command id is incremented (CommandCounterIncrement() is
called from create_ctas_internal()) whereas the txn id remains the same.
The new command id is marked with GetCurrentCommandId(true) in
intorel_startup, then the parallel mode is entered. The txn id and
command id are serialized into the parallel DSM, and they are then
available to all parallel workers. This is discussed in [1].

A few changes I have to make in the parallel worker code: set
currentCommandIdUsed = true, maybe via a common API
SetCurrentCommandIdUsedForWorker() proposed in [1], and remove the extra
command id sharing from the leader to the workers.

I will add a few comments in the upcoming patch related to the above
info.
Yes, that would be good.
Added comments.
But how does that work for SELECT INTO? Are you prohibiting that? ...

In the case of SELECT INTO, a new table gets created as well; I'm not
prohibiting the parallel inserts there, and I think we don't need to.

So, in this case also, do we ensure that the table is created before we
launch the workers? If so, I think you can explain in comments about it
and what you need to do to ensure the same.
For SELECT INTO, the table gets created by the leader in
create_ctas_internal(), then ExecInitParallelPlan() gets called, which
launches the workers, and then the leader (if asked to do so) and the
workers insert the rows. So, we don't need to do any extra work to
ensure the table gets created before the workers start inserting
tuples.
While skimming through the patch, a small thing I noticed:

+ /*
+  * SELECT part of the CTAS is parallelizable, so we can make
+  * each parallel worker insert the tuples that are resulted
+  * in it's execution into the target table.
+  */
+ if (!is_matview &&
+     IsA(plan->planTree, Gather))
+     ((DR_intorel *) dest)->is_parallel = true;
+

I am not sure at this stage if this is the best way to make CTAS
parallel, but if so, then probably you can expand the comments a bit to
say why you consider only the Gather node (and that too only when it is
the top-most node) and why not another parallel node like GatherMerge.
If somebody expects to preserve the order of the tuples coming from a
GatherMerge node of the select part in CTAS or SELECT INTO while
inserting, then with parallelism allowed that may not be the case, i.e.
the order of insertion of tuples may vary. I'm not quite sure if someone
would use ORDER BY in the select part of CTAS or SELECT INTO in a real
world use case. Thoughts?
Right, for now, I think you can simply remove that check from the code
instead of just commenting it. We will see if there is a better
check/Assert we can add there.
Done.
I also worked on some of the open points I listed earlier in my mail.
3. Need to restrict parallel inserts if CTAS tries to create temp/global tables, as the workers will not have access to those tables.
Done.
Need to analyze whether to allow parallelism if CTAS has prepared statements or with no data.
For prepared statements, parallelism will not be picked, and so neither
will parallel insertion.
For the CTAS WITH NO DATA case, the select part is not even planned, and
so parallelism will also not be picked.
4. Need to stop unnecessary parallel shared state, such as the tuple queues, from being created and shared with the workers.
Done.
I'm listing the things that are still pending.
1. How to represent the parallel insert for CTAS in explain plans? The
explain CTAS shows the plan for only the SELECT part. How about having
some textual info along with the Gather node? I'm not quite sure on
this point, any suggestions are welcome.
2. Addition of new test cases. Testing with more scenarios and
different data sets, sizes, tablespaces, select into. Analysis on the
2 mismatches in write_parallel.sql regression test.
Attaching v2 patch, thoughts and comments are welcome.
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
Attachments:
v2-0001-Parallel-Inserts-in-CREATE-TABLE-AS.patch
On Wed, Oct 14, 2020 at 2:46 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
On Tue, Oct 6, 2020 at 10:58 AM Amit Kapila <amit.kapila16@gmail.com> wrote:
While skimming through the patch, a small thing I noticed:

+ /*
+  * SELECT part of the CTAS is parallelizable, so we can make
+  * each parallel worker insert the tuples that are resulted
+  * in it's execution into the target table.
+  */
+ if (!is_matview &&
+     IsA(plan->planTree, Gather))
+     ((DR_intorel *) dest)->is_parallel = true;
+

I am not sure at this stage if this is the best way to make CTAS
parallel, but if so, then probably you can expand the comments a bit to
say why you consider only the Gather node (and that too only when it is
the top-most node) and why not another parallel node like GatherMerge.

If somebody expects to preserve the order of the tuples coming from a
GatherMerge node of the select part in CTAS or SELECT INTO while
inserting, then with parallelism allowed that may not be the case, i.e.
the order of insertion of tuples may vary. I'm not quite sure if someone
would use ORDER BY in the select part of CTAS or SELECT INTO in a real
world use case. Thoughts?
I think there is no reason why one can't use ORDER BY in the statements
we are talking about here. But, I think the reason we can't enable
parallelism for GatherMerge is that for that node we always need to
fetch the data in the leader backend to perform the final merge phase.
So, I was expecting a small comment saying something on those lines.
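Something on these lines, say (the wording is just a sketch):

/*
 * Allow parallel inserts only when the top node is Gather.  For
 * GatherMerge, the leader must gather all the tuples to perform the
 * final merge, so the workers cannot insert the tuples into the
 * target table themselves.
 */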
Need to analyze whether to allow parallelism if CTAS has prepared statements or with no data.
For prepared statements, parallelism will not be picked, and so neither
will parallel insertion.

Hmm, I am not sure what makes you say this. Parallelism is enabled for
prepared statements since commit 57a6a72b6b.
I'm listing the things that are still pending.
1. How to represent the parallel insert for CTAS in explain plans? The
explain CTAS shows the plan for only the SELECT part. How about having
some textual info along with the Gather node? I'm not quite sure on
this point, any suggestions are welcome.
I am also not sure about this point because we don't display anything
for the DDL part in explain. Can you propose something by showing an
example of what you have in mind?
--
With Regards,
Amit Kapila.
On Wed, Oct 14, 2020 at 6:16 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
If somebody expects to preserve the order of the tuples coming from a
GatherMerge node of the select part in CTAS or SELECT INTO while
inserting, then with parallelism allowed that may not be the case, i.e.
the order of insertion of tuples may vary. I'm not quite sure if someone
would use ORDER BY in the select part of CTAS or SELECT INTO in a real
world use case. Thoughts?

I think there is no reason why one can't use ORDER BY in the statements
we are talking about here. But, I think the reason we can't enable
parallelism for GatherMerge is that for that node we always need to
fetch the data in the leader backend to perform the final merge phase.
So, I was expecting a small comment saying something on those lines.
Sure, I will add comments in the upcoming patch.
For prepared statements, parallelism will not be picked, and so neither
will parallel insertion.

Hmm, I am not sure what makes you say this. Parallelism is enabled for
prepared statements since commit 57a6a72b6b.
Thanks for letting me know this. I misunderstood the parallelism for
prepared statements. Now, I verified with a proper use case (see below),
where I had a prepared statement and a CTAS having EXECUTE; in this case
too, parallelism is picked and parallel insertion happens with the patch
proposed in this thread. Do we have any problems if we allow parallel
insertion for these cases?
PREPARE myselect AS SELECT * FROM t1;
EXPLAIN ANALYZE CREATE TABLE t1_test AS EXECUTE myselect;
I think commit 57a6a72b6b did not add any test cases; isn't it good to
add one in prepare.sql or select_parallel.sql?
1. How to represent the parallel insert for CTAS in explain plans? The
explain CTAS shows the plan for only the SELECT part. How about having
some textual info along with the Gather node? I'm not quite sure on
this point, any suggestions are welcome.

I am also not sure about this point because we don't display anything
for the DDL part in explain. Can you propose something by showing an
example of what you have in mind?
I thought we could have something like this.
-----------------------------------------------------------------------------
Gather (cost=1000.00..108738.90 rows=0 width=8)
  Workers Planned: 2
  Parallel Insert on t_test1
  -> Parallel Seq Scan on t_test (cost=0.00..106748.00 rows=4954 width=8)
       Filter: (many < 10000)
-----------------------------------------------------------------------------
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
On Thu, Oct 15, 2020 at 9:14 AM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
On Wed, Oct 14, 2020 at 6:16 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
For prepared statements, parallelism will not be picked, and so neither
will parallel insertion.

Hmm, I am not sure what makes you say this. Parallelism is enabled for
prepared statements since commit 57a6a72b6b.

Thanks for letting me know this. I misunderstood the parallelism for
prepared statements. Now, I verified with a proper use case (see below),
where I had a prepared statement and a CTAS having EXECUTE; in this case
too, parallelism is picked and parallel insertion happens with the patch
proposed in this thread. Do we have any problems if we allow parallel
insertion for these cases?

PREPARE myselect AS SELECT * FROM t1;
EXPLAIN ANALYZE CREATE TABLE t1_test AS EXECUTE myselect;

I think commit 57a6a72b6b did not add any test cases; isn't it good to
add one in prepare.sql or select_parallel.sql?
I am not sure if it is worth it, as this is not functionality which is
too complex or that has many chances of getting broken.
1. How to represent the parallel insert for CTAS in explain plans? The
explain CTAS shows the plan for only the SELECT part. How about having
some textual info along with the Gather node? I'm not quite sure on
this point, any suggestions are welcome.

I am also not sure about this point because we don't display anything
for the DDL part in explain. Can you propose something by showing an
example of what you have in mind?

I thought we could have something like this.
-----------------------------------------------------------------------------
Gather (cost=1000.00..108738.90 rows=0 width=8)
  Workers Planned: 2
  Parallel Insert on t_test1
  -> Parallel Seq Scan on t_test (cost=0.00..106748.00 rows=4954 width=8)
       Filter: (many < 10000)
-----------------------------------------------------------------------------
maybe something like below:
Gather (cost=1000.00..108738.90 rows=0 width=8)
-> Create t_test1
-> Parallel Seq Scan on t_test
I don't know what the best thing to do here is. I think for the
temporary purpose you can keep something like the above; once the patch
has matured, we can take a separate opinion on this.
--
With Regards,
Amit Kapila.
On 14.10.20 11:16, Bharath Rupireddy wrote:
Attaching v2 patch, thoughts and comments are welcome.
Hi,
Really looking forward to this ending up in postgres as I think it's a
very nice improvement.
Whilst reviewing your patch I was wondering: is there a reason you did
not introduce a batch insert in the destreceiver for the CTAS? For me
this makes a huge difference in ingest speed, as otherwise the inserts
do not really scale so well once lock contention starts to be a big
problem. If you like I can make a patch to introduce this on top?
Kind regards,
Luc
Swarm64
On Fri, Oct 16, 2020 at 11:33 AM Luc Vlaming <luc@swarm64.com> wrote:
Really looking forward to this ending up in postgres as I think it's a
very nice improvement.

Whilst reviewing your patch I was wondering: is there a reason you did
not introduce a batch insert in the destreceiver for the CTAS? For me
this makes a huge difference in ingest speed, as otherwise the inserts
do not really scale so well once lock contention starts to be a big
problem. If you like I can make a patch to introduce this on top?
Thanks for your interest. You are right, we can get the maximum
improvement if we have multi inserts in the destreceiver for CTAS along
similar lines to the COPY FROM command. I specified this point in my
first mail [1]. You may want to take a look at an already existing patch
[2] for multi inserts; I think there are some review comments to be
addressed in that patch. I would love to see the multi insert patch
revived.
[1]: /messages/by-id/CALj2ACWFq6Z4_jd9RPByURB8-Y8wccQWzLf+0-Jg+KYT7ZO-Ug@mail.gmail.com
[2]: /messages/by-id/CAEET0ZG31mD5SWjTYsAt0JTLReOejPvusJorZ3kGZ1=N1AC-Fw@mail.gmail.com
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
On 16.10.20 08:23, Bharath Rupireddy wrote:
Thanks for your interest. You are right, we can get the maximum
improvement if we have multi inserts in the destreceiver for CTAS along
similar lines to the COPY FROM command. I specified this point in my
first mail [1]. You may want to take a look at an already existing patch
[2] for multi inserts; I think there are some review comments to be
addressed in that patch. I would love to see the multi insert patch
revived.
Sorry, I had not seen that pointer in your first email.
I'll first finish some other patches I'm working on and then I'll try to
revive that patch. Thanks for the pointers.
Kind regards,
Luc
Swarm64
On Thu, Oct 15, 2020 at 3:18 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
1. How to represent the parallel insert for CTAS in explain plans? The
explain CTAS shows the plan for only the SELECT part. How about having
some textual info along with the Gather node? I'm not quite sure on
this point, any suggestions are welcome.

I am also not sure about this point because we don't display anything
for the DDL part in explain. Can you propose something by showing an
example of what you have in mind?

I thought we could have something like this.
-----------------------------------------------------------------------------
Gather (cost=1000.00..108738.90 rows=0 width=8)
  Workers Planned: 2
  Parallel Insert on t_test1
  -> Parallel Seq Scan on t_test (cost=0.00..106748.00 rows=4954 width=8)
       Filter: (many < 10000)
-----------------------------------------------------------------------------

maybe something like below:
Gather (cost=1000.00..108738.90 rows=0 width=8)
  -> Create t_test1
  -> Parallel Seq Scan on t_test

I don't know what the best thing to do here is. I think for the
temporary purpose you can keep something like the above; once the patch
has matured, we can take a separate opinion on this.
Agreed. Here's a snapshot of explain with the suggested change.

postgres=# EXPLAIN (ANALYZE, COSTS OFF) CREATE TABLE t1_test AS SELECT * FROM t1;
                                   QUERY PLAN
---------------------------------------------------------------------------------
 Gather (actual time=970.524..972.913 rows=0 loops=1)
   -> Create t1_test
   Workers Planned: 2
   Workers Launched: 2
   -> Parallel Seq Scan on t1 (actual time=0.028..86.623 rows=333333 loops=3)
 Planning Time: 0.049 ms
 Execution Time: 973.733 ms
I think there is no reason why one can't use ORDER BY in the statements
we are talking about here. But, I think the reason we can't enable
parallelism for GatherMerge is that for that node we always need to
fetch the data in the leader backend to perform the final merge phase.
So, I was expecting a small comment saying something on those lines.
Added comments.
2. Addition of new test cases.
Added new test cases.
Analysis on the 2 mismatches in write_parallel.sql regression test.
Done. It needed a small code change in costsize.c. Now, both make check
and make check-world pass.
Apart from the above, a couple of other things I have finished in the v3
patch:
1. Both make check and make check-world pass with force_parallel_mode =
regress.
2. I enabled parallel inserts in the case of materialized views. Hope
that's fine.
Attaching v3 patch herewith.
I'm done with all the open points in my list. Please review the v3 patch
and provide comments.
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
Attachments:
v3-0001-Parallel-Inserts-in-CREATE-TABLE-AS.patch
On Mon, Oct 19, 2020 at 10:47 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
Attaching v3 patch herewith.
I'm done with all the open points in my list. Please review the v3 patch and provide comments.
Attaching v4 patch, rebased on the latest master 68b1a4877e. Also,
added this feature to commitfest -
https://commitfest.postgresql.org/31/2841/
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
Attachments:
v4-0001-Parallel-Inserts-in-CREATE-TABLE-AS.patch
Hi,
I'm very interested in this feature,
and I'm looking at the patch. Here are some comments.
1.
+ if (!TupIsNull(outerTupleSlot))
+ {
+ (void) node->ps.dest->receiveSlot(outerTupleSlot, node->ps.dest);
+ node->ps.state->es_processed++;
+ }
+
+ if(TupIsNull(outerTupleSlot))
+ break;
+ }
How about the following style:

if (TupIsNull(outerTupleSlot))
    break;

(void) node->ps.dest->receiveSlot(outerTupleSlot, node->ps.dest);
node->ps.state->es_processed++;

which looks cleaner.
2.
+
+ if (into != NULL &&
+ IsA(into, IntoClause))
+ {
The check can be replaced by ISCTAS(into).
3.
+ /*
+ * For parallelizing inserts in CTAS i.e. making each
+ * parallel worker inerst it's tuples, we must send
+ * information such as intoclause(for each worker
'inerst' looks like a typo (insert).
4.
+ /* Estimate space for into clause for CTAS. */
+ if (ISCTAS(planstate->intoclause))
+ {
+ intoclausestr = nodeToString(planstate->intoclause);
+ shm_toc_estimate_chunk(&pcxt->estimator, strlen(intoclausestr) + 1);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
...
+ if (intoclausestr != NULL)
+ {
+ char *shmptr = (char *)shm_toc_allocate(pcxt->toc,
+ strlen(intoclausestr) + 1);
+ strcpy(shmptr, intoclausestr);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_INTO_CLAUSE, shmptr);
+ }
The code here calls strlen(intoclausestr) two times. The existing code
in ExecInitParallelPlan stores the strlen in a variable.

So how about the following style:

intoclause_len = strlen(intoclausestr);
...
/* Store serialized intoclause. */
intoclause_space = shm_toc_allocate(pcxt->toc, intoclause_len + 1);
memcpy(intoclause_space, intoclausestr, intoclause_len + 1);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_INTO_CLAUSE, intoclause_space);
5.
+ if (intoclausestr != NULL)
+ {
+ char *shmptr = (char *)shm_toc_allocate(pcxt->toc,
+ strlen(intoclausestr) + 1);
+ strcpy(shmptr, intoclausestr);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_INTO_CLAUSE, shmptr);
+ }
+
/* Set up the tuple queues that the workers will write into. */
- pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
+ if (intoclausestr == NULL)
+ pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
The two checks on intoclausestr can be combined like:
if (intoclausestr != NULL)
{
...
}
else
{
...
}
Best regards,
houzj
On Tue, Nov 24, 2020 at 4:43 PM Hou, Zhijie <houzj.fnst@cn.fujitsu.com> wrote:
I'm very interested in this feature,
and I'm looking at the patch, here are some comments.
Thanks for the review.
How about the following style:

if (TupIsNull(outerTupleSlot))
    break;

(void) node->ps.dest->receiveSlot(outerTupleSlot, node->ps.dest);
node->ps.state->es_processed++;

which looks cleaner.
Done.
The check can be replaced by ISCTAS(into).
Done.
'inerst' looks like a typo (insert).
Corrected.
The code here calls strlen(intoclausestr) two times. The existing code
in ExecInitParallelPlan stores the strlen in a variable.

So how about the following style:

intoclause_len = strlen(intoclausestr);
...
/* Store serialized intoclause. */
intoclause_space = shm_toc_allocate(pcxt->toc, intoclause_len + 1);
memcpy(intoclause_space, intoclausestr, intoclause_len + 1);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_INTO_CLAUSE, intoclause_space);
Done.
The two checks on intoclausestr can be combined like:
if (intoclausestr != NULL)
{
...
}
else
{
...
}
Done.
Attaching v5 patch. Please consider it for further review.
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
Attachments:
v5-0001-Parallel-Inserts-in-CREATE-TABLE-AS.patch
Hi,
I have an issue about the following code:
econtext = node->ps.ps_ExprContext;
ResetExprContext(econtext);
+ if (ISCTAS(node->ps.intoclause))
+ {
+ ExecParallelInsertInCTAS(node);
+ return NULL;
+ }
/* If no projection is required, we're done. */
if (node->ps.ps_ProjInfo == NULL)
return slot;
/*
* Form the result tuple using ExecProject(), and return it.
*/
econtext->ecxt_outertuple = slot;
return ExecProject(node->ps.ps_ProjInfo);
It seems the projection will be skipped.
Is this because projection is not required in this case?
(I'm not very familiar with where the projection will happen.)
If projection is not required here, shall we add some comments here?
Best regards,
houzj
On Thu, Nov 26, 2020 at 7:47 AM Hou, Zhijie <houzj.fnst@cn.fujitsu.com> wrote:
Hi,
I have an issue about the following code:

econtext = node->ps.ps_ExprContext;
ResetExprContext(econtext);

+ if (ISCTAS(node->ps.intoclause))
+ {
+     ExecParallelInsertInCTAS(node);
+     return NULL;
+ }

/* If no projection is required, we're done. */
if (node->ps.ps_ProjInfo == NULL)
    return slot;

/*
 * Form the result tuple using ExecProject(), and return it.
 */
econtext->ecxt_outertuple = slot;
return ExecProject(node->ps.ps_ProjInfo);

It seems the projection will be skipped.
Is this because projection is not required in this case?
(I'm not very familiar with where the projection will happen.)
For parallel inserts in CTAS, I don't think we need to project the
tuples being returned from the underlying plan nodes, and also we have
nothing to project from the Gather node further up. The required
projection will happen while the tuples are being returned from the
underlying nodes, and the projected tuples are directly fed to CTAS's
dest receiver intorel_receive(), and from there into the created table.
We don't need ExecProject again in ExecParallelInsertInCTAS().
For instance, projection will always be done when the tuple is being
returned from an underlying sequential scan node (see ExecScan() -->
ExecProject()), and this is true for both the leader and the workers.
In both the leader and the workers, we are just calling CTAS's dest
receiver intorel_receive().
Thoughts?
If projection is not required here, shall we add some comments here?
If the above point looks okay, I can add a comment.
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
Hi,

For parallel inserts in CTAS, I don't think we need to project the
tuples being returned from the underlying plan nodes, and also we have
nothing to project from the Gather node further up. The required
projection will happen while the tuples are being returned from the
underlying nodes, and the projected tuples are directly fed to CTAS's
dest receiver intorel_receive(), and from there into the created table.
We don't need ExecProject again in ExecParallelInsertInCTAS().

For instance, projection will always be done when the tuple is being
returned from an underlying sequential scan node (see ExecScan() -->
ExecProject()), and this is true for both the leader and the workers.
In both the leader and the workers, we are just calling CTAS's dest
receiver intorel_receive().

Thoughts?
I took a deep look at the projection logic.
In most cases, you are right that the Gather node does not need
projection.
In some rare cases, such as a SubPlan (or an InitPlan, I guess), the
projection will happen in the Gather node.
The example:
Create table test(i int);
Create table test2(a int, b int);
insert into test values(generate_series(1,10000000,1));
insert into test2 values(generate_series(1,1000,1), generate_series(1,1000,1));
postgres=# explain(verbose, costs off) select test.i,(select i from (select * from test2) as tt limit 1) from test where test.i < 2000;
QUERY PLAN
----------------------------------------
Gather
Output: test.i, (SubPlan 1)
Workers Planned: 2
-> Parallel Seq Scan on public.test
Output: test.i
Filter: (test.i < 2000)
SubPlan 1
-> Limit
Output: (test.i)
-> Seq Scan on public.test2
Output: test.i
In this case, projection is necessary, because the subplan will be
executed during projection.
If it is skipped, the created table will lose some data.
Best regards,
houzj
On Thu, Nov 26, 2020 at 12:15 PM Hou, Zhijie <houzj.fnst@cn.fujitsu.com> wrote:
I took a deep look at the projection logic.
In most cases, you are right that the Gather node does not need
projection.

In some rare cases, such as a SubPlan (or an InitPlan, I guess), the
projection will happen in the Gather node.

The example:
Create table test(i int);
Create table test2(a int, b int);
insert into test values(generate_series(1,10000000,1));
insert into test2 values(generate_series(1,1000,1), generate_series(1,1000,1));

postgres=# explain(verbose, costs off) select test.i,(select i from (select * from test2) as tt limit 1) from test where test.i < 2000;
               QUERY PLAN
----------------------------------------
 Gather
   Output: test.i, (SubPlan 1)
   Workers Planned: 2
   -> Parallel Seq Scan on public.test
        Output: test.i
        Filter: (test.i < 2000)
   SubPlan 1
     -> Limit
          Output: (test.i)
          -> Seq Scan on public.test2
               Output: test.i

In this case, projection is necessary, because the subplan will be
executed during projection.

If it is skipped, the created table will lose some data.
Thanks a lot for the use case. Yes, with the current patch the table
will lose the data related to the subplan. On analyzing further, I think
we cannot allow parallel inserts in the cases where the Gather node has
some projection to do, because the workers cannot perform that
projection. So, having ps_ProjInfo in the Gather node is an indication
for us to disable parallel inserts; only the leader can do the
insertions after the Gather node does the required projections, as in
the sketch below.
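A minimal sketch of that check in ExecGather(), building on the patch's
existing ISCTAS() test:

/*
 * Take the parallel-insert path only when the Gather node has no
 * projection to do; otherwise fall through, so that the leader
 * projects the tuples and inserts them via the dest receiver.
 */
if (ISCTAS(node->ps.intoclause) &&
    node->ps.ps_ProjInfo == NULL)
{
    ExecParallelInsertInCTAS(node);
    return NULL;
}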
Thoughts?
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com