Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE

Started by Rafia Sabih · over 8 years ago · 17 messages
#1 Rafia Sabih
rafia.sabih@enterprisedb.com
2 attachment(s)

Hello everybody,

Here is something I observed in my recent experimentation: on changing
the value of PARALLEL_TUPLE_QUEUE_SIZE to 6553600, the performance of
a TPC-H query improved by more than 50%.

Specifically, with this change, q12 completes in 14 seconds which was
taking 45 seconds on head. There wasn't any change in the plan
structure, just the time at gather-merge reduced which gave this
improvement.

This clearly shows that the current value of PARALLEL_TUPLE_QUEUE_SIZE
is not the best one for all queries; rather, some modification to it is
very likely to improve performance significantly. One way to do this is
to expose this parameter as another GUC, just like
min_parallel_table_scan_size, etc.
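
Just to illustrate, exposing it could look roughly like the sketch
below, modelled on the existing min_parallel_table_scan_size entry in
guc.c; the GUC name, variable, default, and limits here are only
placeholders, not a concrete proposal:

	/* hypothetical GUC variable, in kB; name and limits are placeholders */
	int		parallel_tuple_queue_size = 64;

	{
		{"parallel_tuple_queue_size", PGC_USERSET, RESOURCES_MEM,
			gettext_noop("Sets the size of each parallel worker's tuple queue."),
			NULL,
			GUC_UNIT_KB
		},
		&parallel_tuple_queue_size,
		64, 64, MAX_KILOBYTES,
		NULL, NULL, NULL
	},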

The attached .txt file gives the plan at head and with this patch;
additionally, a patch setting PARALLEL_TUPLE_QUEUE_SIZE to 6553600 is
attached as well.

Thoughts?
--
Regards,
Rafia Sabih
EnterpriseDB: http://www.enterprisedb.com/

Attachments:

q12_ptqs.txt (text/plain; charset=US-ASCII)
change_parallel_tuple_q_size.patch (application/octet-stream)
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 0610180016..d66be0b301 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -55,7 +55,7 @@
 #define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000006)
 #define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000007)
 
-#define PARALLEL_TUPLE_QUEUE_SIZE		65536
+#define PARALLEL_TUPLE_QUEUE_SIZE		6553600
 
 /*
  * DSM structure for accumulating per-PlanState instrumentation.
#2 Ashutosh Bapat
ashutosh.bapat@enterprisedb.com
In reply to: Rafia Sabih (#1)
Re: Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE

On Tue, May 30, 2017 at 5:28 AM, Rafia Sabih
<rafia.sabih@enterprisedb.com> wrote:

Hello everybody,

Here is something I observed in my recent experimentation: on changing
the value of PARALLEL_TUPLE_QUEUE_SIZE to 6553600, the performance of
a TPC-H query improved by more than 50%.

How many tuples are being gathered? This could happen if the workers
are waiting for the leader to make space in the queue after it fills
up. By increasing the queue size we might be reducing the waiting time
for the workers. In that case, it may be better to check why the leader
is not pulling rows faster. How does the performance vary with
different values of PARALLEL_TUPLE_QUEUE_SIZE?

Specifically, with this change, q12 completes in 14 seconds which was
taking 45 seconds on head. There wasn't any change in the plan
structure, just the time at gather-merge reduced which gave this
improvement.

This clearly shows that the current value of PARALLEL_TUPLE_QUEUE_SIZE
is not the best one for all queries; rather, some modification to it is
very likely to improve performance significantly. One way to do this is
to expose this parameter as another GUC, just like
min_parallel_table_scan_size, etc.

GUC may help.

The attached .txt file gives the plan at head and with this patch;
additionally, a patch setting PARALLEL_TUPLE_QUEUE_SIZE to 6553600 is
attached as well.

Increasing that number would require increased DSM, which may not be
available. Also, I don't see any analysis as to why 6553600 was chosen.
Is it optimal? Does it work for all kinds of workloads?

--
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company


#3 Robert Haas
robertmhaas@gmail.com
In reply to: Ashutosh Bapat (#2)
Re: Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE

On Tue, May 30, 2017 at 6:50 AM, Ashutosh Bapat
<ashutosh.bapat@enterprisedb.com> wrote:

Increasing that number would require increased DSM, which may not be
available. Also, I don't see any analysis as to why 6553600 was chosen.
Is it optimal? Does it work for all kinds of workloads?

Picky, picky. The point is that Rafia has discovered that a large
increase can sometimes significantly improve performance. I don't
think she's necessarily proposing that (or anything else) as a final
value that we should definitely use, just getting the conversation
started.

I did a little bit of brief experimentation on this same topic a long
time ago and didn't see an improvement from boosting the queue size
beyond 64k but Rafia is testing Gather rather than Gather Merge and,
as I say, my test was very brief. I think it would be a good idea to
try to get a complete picture here. Does this help on any query that
returns many tuples through the Gather? Only the ones that use Gather
Merge? Some queries but not others with no obvious pattern? Only
this query?

Blindly adding a GUC because we found one query that would be faster
with a different value is not the right solution. If we don't even
know why a larger value is needed here and (maybe) not elsewhere, then
how will any user possibly know how to tune the GUC? And do we really
want the user to have to keep adjusting a GUC before each query to get
maximum performance? I think we need to understand the whole picture
here, and then decide what to do. Ideally this would auto-tune, but
we can't write code for that without a more complete picture of the
behavior.

BTW, there are a couple of reasons I originally picked 64k here. One
is that making it smaller was very noticeably terrible in my testing,
while making it bigger didn't help much. The other is that I figured
64k was small enough that nobody would care about the memory
utilization. I'm not sure we can assume the same thing if we make
this bigger. It's probably fine to use a 6.4M tuple queue for each
worker if work_mem is set to something big, but maybe not if work_mem
is set to the default of 4MB.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#4 Andres Freund
andres@anarazel.de
In reply to: Robert Haas (#3)
Re: Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE

On 2017-05-30 07:27:12 -0400, Robert Haas wrote:

The other is that I figured 64k was small enough that nobody would
care about the memory utilization. I'm not sure we can assume the
same thing if we make this bigger. It's probably fine to use a 6.4M
tuple queue for each worker if work_mem is set to something big, but
maybe not if work_mem is set to the default of 4MB.

Probably not. It might also end up being detrimental performance-wise,
because we start touching more memory. I guess it'd make sense to set
it in the planner, based on a) the size of work_mem and b) the number
of expected tuples.
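
Very roughly something like the sketch below, though none of it is
thought through; the clamp values and the fraction of work_mem used
here are completely made up:

	/* sketch only: derive a queue size from work_mem and expected rows */
	static Size
	choose_tuple_queue_size(double expected_rows, Size tuple_width,
							int work_mem_kb)
	{
		/* bytes needed to buffer the whole expected result */
		Size		wanted = (Size) (expected_rows * (tuple_width + sizeof(Size)));
		/* don't let one queue eat more than a quarter of work_mem */
		Size		cap = (Size) work_mem_kb * 1024 / 4;

		if (wanted < 65536)
			wanted = 65536;		/* never below the current default */
		if (wanted > cap)
			wanted = cap;
		return MAXALIGN_DOWN(wanted);
	}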

I do wonder whether the larger size fixes some scheduling issue
(i.e. while some backend is scheduled out, the other side of the queue
can continue), or whether it's largely triggered by fixable contention
inside the queue. I'd guess it's a bit of both. It should be
measurable in some cases, by comparing the amount of time blocking on
reading the queue (or continuing because the queue is empty), writing
to the queue (should always result in blocking) and time spent waiting
for the spinlock.

- Andres


#5 Ashutosh Bapat
ashutosh.bapat@enterprisedb.com
In reply to: Robert Haas (#3)
Re: Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE

I did a little bit of brief experimentation on this same topic a long
time ago and didn't see an improvement from boosting the queue size
beyond 64k but Rafia is testing Gather rather than Gather Merge and,
as I say, my test was very brief. I think it would be a good idea to
try to get a complete picture here. Does this help on any query that
returns many tuples through the Gather? Only the ones that use Gather
Merge? Some queries but not others with no obvious pattern? Only
this query?

Yes, we need to get answers to those questions. I guess performance
measurements varying one parameter at a time would help us make the
right decision. Some of the relevant parameters I can quickly think of
are: the number of tuples received by Gather, the size of the tuples,
Gather vs. Gather Merge, and the number of workers. There may be more.
I am guessing that the number of tuples that can fit in the queue =
(size of queue - headers) / (size of tuple + per-tuple header). The
larger the tuples, the fewer of them a worker can queue up, and so the
higher the chances that it will wait for the leader to empty the queue.
For Gather Merge that varies a lot depending upon how the data is
distributed across workers, and it's probably more susceptible to
variations in the number of tuples that fit in the queue. The more
workers there are, the busier the leader will be, and thus the greater
the chance of workers waiting for the leader to empty the queue. But in
that case a balancing effect is that each worker will queue fewer rows.
Measurements would help us see how these balancing factors actually
play out.
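
For instance, a trivial back-of-the-envelope program like the one below
(the 64-byte queue-header allowance and the 8-byte per-message length
word are my assumptions, just to get an order of magnitude) shows how
few wide tuples fit in a 64kB queue:

	#include <stdio.h>

	/* assumed bookkeeping sizes, only for a rough estimate */
	#define QUEUE_HEADER_BYTES	64
	#define PER_TUPLE_HEADER	8

	static long
	tuples_per_queue(long queue_size, long avg_tuple_size)
	{
		return (queue_size - QUEUE_HEADER_BYTES) /
			(avg_tuple_size + PER_TUPLE_HEADER);
	}

	int
	main(void)
	{
		/* e.g. 129-byte tuples */
		printf("64kB queue : ~%ld tuples\n", tuples_per_queue(65536, 129));
		printf("6.4MB queue: ~%ld tuples\n", tuples_per_queue(6553600, 129));
		return 0;
	}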

Blindly adding a GUC because we found one query that would be faster
with a different value is not the right solution. If we don't even
know why a larger value is needed here and (maybe) not elsewhere, then
how will any user possibly know how to tune the GUC? And do we really
want the user to have to keep adjusting a GUC before each query to get
maximum performance? I think we need to understand the whole picture
here, and then decide what to do. Ideally this would auto-tune, but
we can't write code for that without a more complete picture of the
behavior.

We will need a way for the user to cap the memory allocated, and a GUC
looks like a good way to do that. I agree that the GUC as a tuning
parameter will be much less useful.

BTW, there are a couple of reasons I originally picked 64k here. One
is that making it smaller was very noticeably terrible in my testing,
while making it bigger didn't help much. The other is that I figured
64k was small enough that nobody would care about the memory
utilization. I'm not sure we can assume the same thing if we make
this bigger. It's probably fine to use a 6.4M tuple queue for each
worker if work_mem is set to something big, but maybe not if work_mem
is set to the default of 4MB.

AFAIK, work_mem comes from memory private to the process whereas this
memory will come from the shared memory pool. There are different
OS-level settings for those, and thus linking the size of the parallel
tuple queue with work_mem may not always work. But I agree that the
size of work_mem is indicative of the size of data that needs to be
processed in general, and hence can be used as a good estimate of the
required size of the queue.

--
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company


#6 Robert Haas
robertmhaas@gmail.com
In reply to: Ashutosh Bapat (#5)
Re: Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE

On Wed, May 31, 2017 at 2:35 AM, Ashutosh Bapat
<ashutosh.bapat@enterprisedb.com> wrote:

AFAIK, work_mem comes from memory private to the process whereas this
memory will come from the shared memory pool.

I don't think that really matters. The point of limits like work_mem
is to avoid driving the machine into swap. Allocating shared memory
might error out rather than causing swapping in some cases on some
systems, but any difference between private and shared memory is not
the real issue here. The issue is overall memory consumption.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#7 Rafia Sabih
rafia.sabih@enterprisedb.com
In reply to: Robert Haas (#3)
Re: Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE

On Tue, May 30, 2017 at 4:57 PM, Robert Haas <robertmhaas@gmail.com> wrote:

I did a little bit of brief experimentation on this same topic a long
time ago and didn't see an improvement from boosting the queue size
beyond 64k but Rafia is testing Gather rather than Gather Merge and,
as I say, my test was very brief. I think it would be a good idea to
try to get a complete picture here. Does this help on any query that
returns many tuples through the Gather? Only the ones that use Gather
Merge? Some queries but not others with no obvious pattern? Only
this query?

I did further exploration, trying other values of
PARALLEL_TUPLE_QUEUE_SIZE and different queries, and here are my
findings:
- even on setting PARALLEL_TUPLE_QUEUE_SIZE to 655360, there isn't
much improvement in q12 itself.
- no other TPC-H query shows significant improvement at 6553600. There
is a small improvement in q3, which also uses gather-merge.
- as per the perf analysis of q12 on head and with the patch, the
percentage of time in ExecGatherMerge is 18% with the patch and 98% on
head, and similarly for gather_merge_readnext and gather_merge_writenext.

As per my understanding, it looks like this increase in tuple queue
size is helping only gather-merge, particularly in cases where there is
a lot of stalling by the master in gather-merge because it is
maintaining the sort order. In q12 the index is unclustered and
gather-merge is just above a parallel index scan; thus, to maintain the
order, a worker may have to wait a long time before its next
in-sequence tuple is taken by the master. Something like this might be
happening: the master takes one tuple from worker 1, then the next,
say, 10 tuples from worker 2, and so on, and only then returns to
worker 1; so once worker 1 has produced enough tuples to fill its
queue, it sits idle. Hence, increasing the tuple queue size helps the
workers keep working for longer, and this improves performance.

In other cases like q3, q18, etc., gather-merge is above a sort,
partial group aggregate, etc.; there the chances of stalls are
comparatively lower, since the scan of the relation uses the primary
key and hence the tuples in the blocks are likely to be in order
already. The case was similar for many other TPC-H queries. Another
thing is that in the TPC-H benchmark queries the number of tuples at
gather-merge is mostly fairly low, so I'll try to test this on some
custom queries which exhibit the aforementioned case.

Blindly adding a GUC because we found one query that would be faster
with a different value is not the right solution. If we don't even
know why a larger value is needed here and (maybe) not elsewhere, then
how will any user possibly know how to tune the GUC? And do we really
want the user to have to keep adjusting a GUC before each query to get
maximum performance? I think we need to understand the whole picture
here, and then decide what to do. Ideally this would auto-tune, but
we can't write code for that without a more complete picture of the
behavior.

Yeah, maybe for the scenario discussed above a GUC is not the best
idea, but it might help if the planner could use something that relates
the ordering of the index to the physical ordering of the tuples, along
with the number of tuples, etc., to decide the value of
PARALLEL_TUPLE_QUEUE_SIZE. E.g. if the index is the primary key then
the physical order is the same as the index order, and if this is also
the sort key then gather-merge would stall less; but if it is an
unclustered index then the physical order is very different from the
index order, and the workers are likely to stall more, so a higher
value of PARALLEL_TUPLE_QUEUE_SIZE could be used, based on the number
of tuples.
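
To make that concrete, something very roughly like the sketch below is
what I have in mind; the correlation threshold, the sizes, and the
function itself are made up for illustration, and nothing here actually
reads the catalogs:

	/* illustrative only: pick a queue size from index/heap correlation */
	static Size
	sketch_tuple_queue_size(double index_correlation, double expected_rows,
							Size tuple_width)
	{
		/* well-correlated index: master rarely stalls, default is enough */
		if (fabs(index_correlation) > 0.9)
			return 65536;

		/* poorly correlated index: leave room for a chunk of the result */
		return Min((Size) (expected_rows * tuple_width / 4), 100 * 65536);
	}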

Again, I am not concluding anything yet, as this is too little
experimentation to ascertain much; I'll continue the experiments and
would be grateful for more suggestions.

--
Regards,
Rafia Sabih
EnterpriseDB: http://www.enterprisedb.com/


#8 Andres Freund
andres@anarazel.de
In reply to: Rafia Sabih (#7)
Re: Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE

On 2017-06-01 18:41:20 +0530, Rafia Sabih wrote:

As per my understanding, it looks like this increase in tuple queue
size is helping only gather-merge, particularly in cases where there is
a lot of stalling by the master in gather-merge because it is
maintaining the sort order. In q12 the index is unclustered and
gather-merge is just above a parallel index scan; thus, to maintain the
order, a worker may have to wait a long time before its next
in-sequence tuple is taken by the master.

I wonder if there's some way we could make this problem a bit less bad.
One underlying problem is that we don't know what the current boundary
on each worker is, unless it returns a tuple. I.e. even if some worker
is guaranteed to not return any further tuples below another worker's
last tuple, gather-merge won't know about that until it finds another
matching tuple. Perhaps, for some subsets, we could make the workers
update that boundary without producing a tuple that gather will actually
return? In the, probably reasonably common, case of having merge-joins
below the gather, it shouldn't be very hard to do so. Imagine e.g. that
every worker gets a "slot" in a dsm where it can point to a tuple
(managed by dsa.c to deal with variable-length keys) that contains the
current boundary. For a merge-join it'd not be troublesome to
occasionally - although what constitutes that isn't easy, perhaps the
master signals the worker? - put a new boundary tuple there, even if it
doesn't find a match. It's probably harder for cases where most of the
filtering happens far below the top-level worker node.
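
To make the shape of that a bit more concrete, something like the
per-worker slot below is what I have in mind; this is only a sketch,
the struct and field names are invented, and when/how a worker updates
its slot is entirely unresolved:

	/* sketch: one slot per worker advertising its current lower bound */
	typedef struct GMBoundarySlot
	{
		slock_t		mutex;			/* protects the fields below */
		uint64		generation;		/* bumped on every boundary update */
		dsa_pointer	boundary_tuple;	/* dsa.c-managed copy of the boundary */
	} GMBoundarySlot;

	typedef struct GMBoundaryState
	{
		int			nworkers;
		GMBoundarySlot slots[FLEXIBLE_ARRAY_MEMBER];	/* one per worker */
	} GMBoundaryState;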

- Andres


#9 Amit Kapila
amit.kapila16@gmail.com
In reply to: Rafia Sabih (#7)
Re: Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE

On Thu, Jun 1, 2017 at 6:41 PM, Rafia Sabih
<rafia.sabih@enterprisedb.com> wrote:

On Tue, May 30, 2017 at 4:57 PM, Robert Haas <robertmhaas@gmail.com> wrote:

I did a little bit of brief experimentation on this same topic a long
time ago and didn't see an improvement from boosting the queue size
beyond 64k but Rafia is testing Gather rather than Gather Merge and,
as I say, my test was very brief. I think it would be a good idea to
try to get a complete picture here. Does this help on any query that
returns many tuples through the Gather? Only the ones that use Gather
Merge? Some queries but not others with no obvious pattern? Only
this query?

I did further exploration, trying other values of
PARALLEL_TUPLE_QUEUE_SIZE and different queries, and here are my
findings:
- even on setting PARALLEL_TUPLE_QUEUE_SIZE to 655360, there isn't
much improvement in q12 itself.
- no other TPC-H query shows significant improvement at 6553600. There
is a small improvement in q3, which also uses gather-merge.
- as per the perf analysis of q12 on head and with the patch, the
percentage of time in ExecGatherMerge is 18% with the patch and 98% on
head, and similarly for gather_merge_readnext and gather_merge_writenext.

As per my understanding, it looks like this increase in tuple queue
size is helping only gather-merge, particularly in cases where there is
a lot of stalling by the master in gather-merge because it is
maintaining the sort order. In q12 the index is unclustered and
gather-merge is just above a parallel index scan; thus, to maintain the
order, a worker may have to wait a long time before its next
in-sequence tuple is taken by the master. Something like this might be
happening: the master takes one tuple from worker 1, then the next,
say, 10 tuples from worker 2, and so on, and only then returns to
worker 1; so once worker 1 has produced enough tuples to fill its
queue, it sits idle. Hence, increasing the tuple queue size helps the
workers keep working for longer, and this improves performance.

Your reasoning sounds sensible to me. I think the other way to attack
this problem is that we can maintain some local queue in each of the
workers when the shared memory queue becomes full. Basically, we can
extend your "Faster processing at Gather node" patch [1] such that
instead of fixed sized local queue, we can extend it when the shm
queue become full. I think that way we can handle both the problems
(worker won't stall if shm queues are full and workers can do batched
writes in shm queue to avoid the shm queue communication overhead) in
a similar way.

[1]: /messages/by-id/CAOGQiiMwhOd5-iKZnizn+EdzZmB0bc3xa6rKXQgvhbnQ29zCJg@mail.gmail.com

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com


#10 Robert Haas
robertmhaas@gmail.com
In reply to: Amit Kapila (#9)
Re: Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE

On Fri, Jun 2, 2017 at 9:01 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:

Your reasoning sounds sensible to me. I think the other way to attack
this problem is that we can maintain some local queue in each of the
workers when the shared memory queue becomes full. Basically, we can
extend your "Faster processing at Gather node" patch [1] such that
instead of fixed sized local queue, we can extend it when the shm
queue become full. I think that way we can handle both the problems
(worker won't stall if shm queues are full and workers can do batched
writes in shm queue to avoid the shm queue communication overhead) in
a similar way.

We still have to bound the amount of memory that we use for queueing
data in some way.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#11 Amit Kapila
amit.kapila16@gmail.com
In reply to: Robert Haas (#10)
Re: Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE

On Fri, Jun 2, 2017 at 6:38 PM, Robert Haas <robertmhaas@gmail.com> wrote:

On Fri, Jun 2, 2017 at 9:01 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:

Your reasoning sounds sensible to me. I think the other way to attack
this problem is that we can maintain some local queue in each of the
workers when the shared memory queue becomes full. Basically, we can
extend your "Faster processing at Gather node" patch [1] such that
instead of fixed sized local queue, we can extend it when the shm
queue become full. I think that way we can handle both the problems
(worker won't stall if shm queues are full and workers can do batched
writes in shm queue to avoid the shm queue communication overhead) in
a similar way.

We still have to bound the amount of memory that we use for queueing
data in some way.

Yeah, probably till work_mem (or some percentage of work_mem). If we
want to have some extendable solution then we might want to back it up
with some file, however, we might not need to go that far. I think we
can do some experiments to see how much additional memory is
sufficient to give us maximum benefit.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com


#12 Robert Haas
robertmhaas@gmail.com
In reply to: Amit Kapila (#11)
Re: Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE

On Fri, Jun 2, 2017 at 9:15 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:

On Fri, Jun 2, 2017 at 6:38 PM, Robert Haas <robertmhaas@gmail.com> wrote:

On Fri, Jun 2, 2017 at 9:01 AM, Amit Kapila <amit.kapila16@gmail.com> wrote:

Your reasoning sounds sensible to me. I think the other way to attack
this problem is that we can maintain some local queue in each of the
workers when the shared memory queue becomes full. Basically, we can
extend your "Faster processing at Gather node" patch [1] such that
instead of fixed sized local queue, we can extend it when the shm
queue become full. I think that way we can handle both the problems
(worker won't stall if shm queues are full and workers can do batched
writes in shm queue to avoid the shm queue communication overhead) in
a similar way.

We still have to bound the amount of memory that we use for queueing
data in some way.

Yeah, probably till work_mem (or some percentage of work_mem). If we
want to have some extendable solution then we might want to back it up
with some file, however, we might not need to go that far. I think we
can do some experiments to see how much additional memory is
sufficient to give us maximum benefit.

Yes, I think that's important. Also, I think we still need a better
understanding of the cases in which the benefit shows up.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#13 Rafia Sabih
rafia.sabih@enterprisedb.com
In reply to: Amit Kapila (#9)
1 attachment(s)
Re: Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE

On Fri, Jun 2, 2017 at 6:31 PM, Amit Kapila <amit.kapila16@gmail.com> wrote:

Your reasoning sounds sensible to me. I think the other way to attack
this problem is that we can maintain some local queue in each of the
workers when the shared memory queue becomes full. Basically, we can
extend your "Faster processing at Gather node" patch [1] such that
instead of fixed sized local queue, we can extend it when the shm
queue become full. I think that way we can handle both the problems
(worker won't stall if shm queues are full and workers can do batched
writes in shm queue to avoid the shm queue communication overhead) in
a similar way.

[1] - /messages/by-id/CAOGQiiMwhOd5-iKZnizn+EdzZmB0bc3xa6rKXQgvhbnQ29zCJg@mail.gmail.com

I worked on this idea of using a local queue as a temporary buffer to
write the tuples to when the master is busy and the shared queue is
full, and it gives quite some improvement in query performance.

Design:
At a basic level, the design of this patch can be explained as follows:
similar to shm_mq, there is a new structure local_mq which is private
to each worker. Once the shared queue is full, we write the tuple into
the local queue. Since the local queue is never shared, we do not need
any locking for writing into it; hence writing into the local queue is
a cheap operation.

Once the local queue is at least 5% full (I've kept this threshold for
this version, but we might need to modify it), we copy the data from
the local to the shared queue. In case both queues are full, we wait
till the master reads from the shared queue, then copy some data from
the local to the shared queue until the required space is available,
and subsequently write the tuple to the local queue. If at any instant
the local queue becomes empty, then we write the tuple into the shared
queue itself, provided there is space. At worker shutdown we copy all
remaining data from the local queue to the shared queue.

For this version of the patch I have kept the size of the local queue
at 100 * PARALLEL_TUPLE_QUEUE_SIZE = 6553600, which might not be the
best value, and I am open to reasons for modifying it. It is kept that
way for the scenarios where the gather/gather-merge node is slow: I
expect that when the master is busy it might be busy for a long time,
or the amount of data to be processed is high, and we would not want
our workers to wait that long.

Performance:
These experiments are on TPC-H scale factor 20. The patch gives around
a 20-30% performance improvement in queries with selectivity of around
20-30%.

Head:
Default plan
explain analyse select * from lineitem where l_extendedprice < 50000
and l_orderkey < 15000000;

QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------
Index Scan using idx_lineitem_orderkey on lineitem
(cost=0.57..334367.85 rows=10313822 width=129) (actual
time=0.057..26389.587 rows=10258702 loops=1)
Index Cond: (l_orderkey < 15000000)
Filter: (l_extendedprice < '50000'::numeric)
Rows Removed by Filter: 4737888
Planning time: 1.686 ms
Execution time: 27402.801 ms
(6 rows)

Force parallelism plan
explain analyse select * from lineitem where l_extendedprice < 50000
and l_orderkey < 15000000;

QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------
Gather (cost=0.57..193789.78 rows=10313822 width=129) (actual
time=0.354..41153.916 rows=10258702 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Parallel Index Scan using idx_lineitem_orderkey on lineitem
(cost=0.57..193789.78 rows=2578456 width=129) (actual
time=0.062..6530.167 rows=2051740 loops=5)
Index Cond: (l_orderkey < 15000000)
Filter: (l_extendedprice < '50000'::numeric)
Rows Removed by Filter: 947578
Planning time: 0.383 ms
Execution time: 42027.645 ms
(9 rows)

Patch:
Force parallelism plan

explain analyse select * from lineitem where l_extendedprice < 50000
and l_orderkey < 15000000;

QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------
Gather (cost=0.57..193789.78 rows=10313822 width=129) (actual
time=0.413..16690.294 rows=10258702 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Parallel Index Scan using idx_lineitem_orderkey on lineitem
(cost=0.57..193789.78 rows=2578456 width=129) (actual
time=0.047..6185.527 rows=2051740 loops=5)
Index Cond: (l_orderkey < 15000000)
Filter: (l_extendedprice < '50000'::numeric)
Rows Removed by Filter: 947578
Planning time: 0.406 ms
Execution time: 17616.750 ms
(9 rows)

Head:
Default plan
explain analyse select * from lineitem where l_extendedprice < 50000
and l_orderkey < 30000000;

QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------
Index Scan using idx_lineitem_orderkey on lineitem
(cost=0.57..684102.33 rows=21101661 width=129) (actual
time=0.131..55532.251 rows=20519918 loops=1)
Index Cond: (l_orderkey < 30000000)
Filter: (l_extendedprice < '50000'::numeric)
Rows Removed by Filter: 9479875
Planning time: 0.318 ms
Execution time: 57436.251 ms
(6 rows)

Force parallelism plan
explain analyse select * from lineitem where l_extendedprice < 50000
and l_orderkey < 30000000;

QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
Gather (cost=0.57..396485.31 rows=21101661 width=129) (actual
time=0.557..69190.640 rows=20519918 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Parallel Index Scan using idx_lineitem_orderkey on lineitem
(cost=0.57..396485.31 rows=5275415 width=129) (actual
time=0.106..12797.711 rows=4103984 loops=5)
Index Cond: (l_orderkey < 30000000)
Filter: (l_extendedprice < '50000'::numeric)
Rows Removed by Filter: 1895975
Planning time: 0.393 ms
Execution time: 70924.801 ms
(9 rows)

Patch:
Force parallelism plan:
explain analyse select * from lineitem where l_extendedprice < 50000
and l_orderkey < 30000000;

QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
Gather (cost=0.57..396485.31 rows=21101661 width=129) (actual
time=0.424..31677.524 rows=20519918 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Parallel Index Scan using idx_lineitem_orderkey on lineitem
(cost=0.57..396485.31 rows=5275415 width=129) (actual
time=0.075..12811.910 rows=4103984 loops=5)
Index Cond: (l_orderkey < 30000000)
Filter: (l_extendedprice < '50000'::numeric)
Rows Removed by Filter: 1895975
Planning time: 0.462 ms
Execution time: 33440.322 ms
(9 rows)

Head:
Default plan
explain analyse select * from lineitem where l_extendedprice < 50000
and l_orderkey < 60000000;

QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------
Index Scan using idx_lineitem_orderkey on lineitem
(cost=0.57..1337265.07 rows=41248987 width=129) (actual
time=0.070..107944.729 rows=41035759 loops=1)
Index Cond: (l_orderkey < 60000000)
Filter: (l_extendedprice < '50000'::numeric)
Rows Removed by Filter: 18950286
Planning time: 2.021 ms
Execution time: 111963.420 ms
(6 rows)

Force parallelism plan
explain analyse select * from lineitem where l_extendedprice < 50000
and l_orderkey < 60000000;
QUERY
PLAN
-----------------------------------------------------------------------------------------------------------------------------------------
Gather (cost=0.00..692896.08 rows=41248987 width=129) (actual
time=0.354..141432.886 rows=41035759 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Parallel Seq Scan on lineitem (cost=0.00..692896.08
rows=10312247 width=129) (actual time=0.029..31678.105 rows=8207152
loops=5)
Filter: ((l_extendedprice < '50000'::numeric) AND (l_orderkey
< 60000000))
Rows Removed by Filter: 15791770
Planning time: 1.883 ms
Execution time: 144859.515 ms
(8 rows)

Patch:
Force parallelism plan
explain analyse select * from lineitem where l_extendedprice < 50000
and l_orderkey < 60000000;
QUERY
PLAN
-----------------------------------------------------------------------------------------------------------------------------------------
Gather (cost=0.00..692896.08 rows=41248987 width=129) (actual
time=0.350..78312.666 rows=41035759 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Parallel Seq Scan on lineitem (cost=0.00..692896.08
rows=10312247 width=129) (actual time=0.027..31867.170 rows=8207152
loops=5)
Filter: ((l_extendedprice < '50000'::numeric) AND (l_orderkey
< 60000000))
Rows Removed by Filter: 15791770
Planning time: 0.439 ms
Execution time: 82057.225 ms
(8 rows)

Apart from these, Q12 from the benchmark queries shows good
improvement with this patch.

Head:
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit (cost=1001.19..426457.34 rows=1 width=27) (actual
time=42770.491..42770.491 rows=1 loops=1)
-> GroupAggregate (cost=1001.19..2979194.24 rows=7 width=27)
(actual time=42770.489..42770.489 rows=1 loops=1)
Group Key: lineitem.l_shipmode
-> Gather Merge (cost=1001.19..2969127.63 rows=575231
width=27) (actual time=11.355..42224.843 rows=311095 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Nested Loop (cost=1.13..2899612.01 rows=143808
width=27) (actual time=0.346..10385.472 rows=62906 loops=5)
-> Parallel Index Scan using idx_l_shipmode on
lineitem (cost=0.57..2796168.46 rows=143808 width=19) (actual
time=0.280..9004.095 rows=62906 loops=5)
Index Cond: (l_shipmode = ANY ('{"REG
AIR",RAIL}'::bpchar[]))
Filter: ((l_commitdate < l_receiptdate) AND
(l_shipdate < l_commitdate) AND (l_receiptdate >= '1995-01-01'::date)
AND (l_receiptdate < '1996-01-01 00:00:00'::timestamp without time
zone))
Rows Removed by Filter: 3402367
-> Index Scan using orders_pkey on orders
(cost=0.56..0.72 rows=1 width=20) (actual time=0.020..0.020 rows=1
loops=314530)
Index Cond: (o_orderkey = lineitem.l_orderkey)
Planning time: 1.202 ms
Execution time: 42841.895 ms
(15 rows)

Patch:
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit (cost=1001.19..426457.34 rows=1 width=27) (actual
time=19461.653..19461.654 rows=1 loops=1)
-> GroupAggregate (cost=1001.19..2979194.24 rows=7 width=27)
(actual time=19461.651..19461.651 rows=1 loops=1)
Group Key: lineitem.l_shipmode
-> Gather Merge (cost=1001.19..2969127.63 rows=575231
width=27) (actual time=10.239..18783.386 rows=311095 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Nested Loop (cost=1.13..2899612.01 rows=143808
width=27) (actual time=0.376..19109.107 rows=66104 loops=5)
-> Parallel Index Scan using idx_l_shipmode on
lineitem (cost=0.57..2796168.46 rows=143808 width=19) (actual
time=0.310..16615.236 rows=66104 loops=5)
Index Cond: (l_shipmode = ANY ('{"REG
AIR",RAIL}'::bpchar[]))
Filter: ((l_commitdate < l_receiptdate) AND
(l_shipdate < l_commitdate) AND (l_receiptdate >= '1995-01-01'::date)
AND (l_receiptdate < '1996-01-01 00:00:00'::timestamp without time
zone))
Rows Removed by Filter: 3574492
-> Index Scan using orders_pkey on orders
(cost=0.56..0.72 rows=1 width=20) (actual time=0.034..0.034 rows=1
loops=330519)
Index Cond: (o_orderkey = lineitem.l_orderkey)
Planning time: 3.498 ms
Execution time: 19661.054 ms
(15 rows)

This suggests that with such an idea, the range of selectivities for
which parallelism improves query performance can be extended.

Credits:
I would like to extend thanks to my colleagues Dilip Kumar, Amit
Kapila, and Robert Haas for their discussions and words of
encouragement throughout the development of this patch.

Feedback and suggestions are welcome.

--
Regards,
Rafia Sabih
EnterpriseDB: http://www.enterprisedb.com/

Attachments:

faster_gather_v1.patch (application/octet-stream)
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index a4cfe96..2c176cd 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -578,6 +578,7 @@ tqueueShutdownReceiver(DestReceiver *self)
 {
 	TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
 
+	empty_queue(tqueue->queue);
 	shm_mq_detach(shm_mq_get_queue(tqueue->queue));
 }
 
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index f45a67c..0ad3ddb 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -79,6 +79,24 @@ struct shm_mq
 	char		mq_ring[FLEXIBLE_ARRAY_MEMBER];
 };
 
+/* This is the structure for local queue where a worker can write
+ * tuples when it's shared queue is full.
+ *
+ * Each worker has it's own local queue where it can store tuples
+ * when master is busy and worker's shared queue gets full. Tuples
+ * are copied into shared queue via single memcpy equal to the space
+ * available in shared queue. Since, local queue is never shared with
+ * the master, we do not require any locking mechanism to write tuples
+ * in it, hence writing in local queue is a cheap operation.
+ */
+struct local_mq
+{
+	uint64		mq_bytes_read;
+	uint64		mq_bytes_written;
+	Size		mq_ring_size;
+	uint8		mq_ring_offset;
+	char		mq_ring[FLEXIBLE_ARRAY_MEMBER];
+};
 /*
  * This structure is a backend-private handle for access to a queue.
  *
@@ -126,7 +144,9 @@ struct shm_mq
  */
 struct shm_mq_handle
 {
+	bool		mqh_local;
 	shm_mq	   *mqh_queue;
+	local_mq	*mqh_local_queue;
 	dsm_segment *mqh_segment;
 	BackgroundWorkerHandle *mqh_handle;
 	char	   *mqh_buffer;
@@ -147,12 +167,23 @@ static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
 						 BackgroundWorkerHandle *handle);
 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
 					 BackgroundWorkerHandle *handle);
+static bool shm_mq_is_detached(volatile shm_mq *mq);
 static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
 static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n);
 static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
 static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n);
 static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
+static uint64 space_in_shm(shm_mq *mq);
+
+/* Routines required for local queue */
+static local_mq *local_mq_create(void *address, Size size);
+static shm_mq_handle *local_mq_attach(shm_mq_handle *mqh);
+static uint64 space_in_local(local_mq *lq, Size tuple_size);
+static bool read_local_queue(local_mq *lq, bool shm_mq_full);
+static shm_mq_result write_in_local_queue(local_mq *mq, shm_mq_iovec *iov);
+static void local_mq_send_bytes(local_mq *mq, Size nbytes, const void *data, Size *bytes_written);
+static shm_mq_result copy_local_to_shared(local_mq *lq, shm_mq_handle *mqh, bool read_anyway);
 
 /* Minimum queue size is enough for header and at least one chunk of data. */
 const Size	shm_mq_minimum_size =
@@ -286,6 +317,8 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 	shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
 
 	Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
+	mqh->mqh_local = false;
+	mqh->mqh_local_queue = NULL;
 	mqh->mqh_queue = mq;
 	mqh->mqh_segment = seg;
 	mqh->mqh_buffer = NULL;
@@ -315,17 +348,129 @@ shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
 }
 
 /*
- * Write a message into a shared message queue.
+ * Write a message into a shared or local message queue, as per the space
+ * availability in these queues. If space is available in shared queue then
+ * we simply write the message there and return. Else we write it in local
+ * queue. Once both the queues are full, we wait till some of the data in
+ * shared queue is read and then copy the data from local to shared queue
+ * and continue writing in local queue. After writing in local queue we
+ * check if there is space available in shared queue and we copy the data
+ * from local to shared queue then itself.
  */
 shm_mq_result
 shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
 {
 	shm_mq_iovec iov;
+	local_mq *lq;
+	shm_mq_result res;
+	Size tuple_size;
+	uint64 local_space, shm_space;
 
 	iov.data = data;
 	iov.len = nbytes;
+	/* this is actual size for this tuple which will be written in queue */
+	tuple_size = MAXALIGN(sizeof(Size)) + MAXALIGN(iov.len);
+
+	/* create and attach a local queue, if it is not yet created */
+	if (mqh->mqh_local_queue == NULL)
+		mqh = local_mq_attach(mqh);
+
+	lq = mqh->mqh_local_queue;
+
+	local_space = space_in_local(lq, tuple_size);
+
+	/*
+	 * if never been to local_queue yet or local_queue is empty, then check
+	 * space in shared queue
+	 */
+	if (!mqh->mqh_local || local_space == 0)
+		shm_space = space_in_shm(mqh->mqh_queue);
+
+	/*
+	 * if there is enough space in shared_queue then write the tuple in it.
+	 */
+	if(shm_space > tuple_size && !mqh->mqh_local)
+		res = shm_mq_sendv(mqh, &iov, 1, nowait);
+	else
+	{
+		/*
+		 * once started with local queue, the tuples will flow from local to
+		 * shared queue untill local queue is empty
+		 */
+		mqh->mqh_local = true;
+		if (shm_mq_is_detached(mqh->mqh_queue))
+				return SHM_MQ_DETACHED;
+
+		local_space = space_in_local(lq, tuple_size);
+
+		/* write in local queue if there is enough space*/
+		if (local_space > tuple_size)
+		{
+			res = write_in_local_queue(lq, &iov);
+
+			/* check is there is required space in shared queue */
+			shm_space = space_in_shm(mqh->mqh_queue);
 
-	return shm_mq_sendv(mqh, &iov, 1, nowait);
+			/*
+			 * if we have some data in local queue and enough space
+			 * is available in shared queue then copy it to shared queue
+			 */
+			if (read_local_queue(lq, false) && shm_space > 0)
+				copy_local_to_shared(lq, mqh, false);
+
+			if (shm_mq_is_detached(mqh->mqh_queue))
+				return SHM_MQ_DETACHED;
+		}
+		else
+		{
+			/*
+			 * if local queue is full, then copy some data to shared queue till enough
+			 * space becomes available in local queue
+			 */
+			do
+			{
+				if (shm_mq_is_detached(mqh->mqh_queue))
+					return SHM_MQ_DETACHED;
+
+				shm_space = space_in_shm(mqh->mqh_queue);
+
+				/*
+				 * cannot send data to shared queue, unless there is required
+				 * space, so keep on trying till we get some space, since we
+				 * cannot write anymore in local queue as of now
+				 */
+				while(shm_space <= 0)
+				{
+					if (shm_mq_is_detached(mqh->mqh_queue))
+						return SHM_MQ_DETACHED;
+
+					shm_space = space_in_shm(mqh->mqh_queue);
+				}
+				if (read_local_queue(lq, true) && shm_space > 0)
+					copy_local_to_shared(lq, mqh, false);
+
+				local_space = space_in_local(lq, tuple_size);
+
+				if (shm_mq_is_detached(mqh->mqh_queue))
+					return SHM_MQ_DETACHED;
+
+			}while (local_space <= tuple_size);
+
+			/*
+			 * once space is available in local queue, write the tuple appropriately.
+			 * Write tuple in local queue unless it has become empty, then write in
+			 * shared queue itself.
+			 */
+			if (local_space > 0)
+				res = write_in_local_queue(lq, &iov);
+			else
+			{
+				mqh->mqh_local = false;
+				res = shm_mq_sendv(mqh, &iov, 1, nowait);
+			}
+		}
+	}
+	return res;
 }
 
 /*
@@ -1102,7 +1247,18 @@ shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
 
 	return result;
 }
-
+/*
+ * Get if the shm_mq is deatched.
+ */
+static bool
+shm_mq_is_detached(volatile shm_mq *mq)
+{
+	bool ret;
+	SpinLockAcquire(&mq->mq_mutex);
+	ret = mq->mq_detached;
+	SpinLockRelease(&mq->mq_mutex);
+	return ret;
+}
 /*
  * Get the number of bytes read.  The receiver need not use this to access
  * the count of bytes read, but the sender must.
@@ -1195,3 +1351,265 @@ shm_mq_detach_callback(dsm_segment *seg, Datum arg)
 
 	shm_mq_detach(mq);
 }
+
+/* Routines required for local queue */
+
+/*
+ * Initialize a new local message queue, this is kept quite similar to shm_mq_create.
+ */
+static local_mq *
+local_mq_create(void *address, Size size)
+{
+	local_mq	   *mq = address;
+	Size		data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
+
+	/* If the size isn't MAXALIGN'd, just discard the odd bytes. */
+	size = MAXALIGN_DOWN(size);
+
+	/* Queue size must be large enough to hold some data. */
+	Assert(size > data_offset);
+
+	/* Initialize queue header. */
+	mq->mq_bytes_read = 0;
+	mq->mq_bytes_written = 0;
+	mq->mq_ring_size = size - data_offset;
+	mq->mq_ring_offset = data_offset - offsetof(local_mq, mq_ring);
+	return mq;
+}
+/* routine to create and attach local_mq to the shm_mq_handle */
+static shm_mq_handle *
+local_mq_attach(shm_mq_handle *mqh)
+{
+	/*
+	 * create a local queue, the size of this queue should be way higher
+	 * than PARALLEL_TUPLE_QUEUE_SIZE
+	 */
+	char *mq;
+	Size len;
+
+	len = 6553600;
+	mq = palloc0(len);
+	mqh->mqh_local_queue = local_mq_create(mq, len);
+
+	return mqh;
+}
+
+/* check the space availability in local queue */
+static uint64
+space_in_local(local_mq *lq, Size tuple_size)
+{
+		uint64 read, written, used, available, ringsize, writer_offset, reader_offset;
+
+		ringsize = lq->mq_ring_size;
+		read = lq->mq_bytes_read;
+		written = lq->mq_bytes_written;
+		used = written - read;
+		available = ringsize - used;
+
+		ringsize = lq->mq_ring_size;
+		writer_offset = lq->mq_bytes_written % ringsize;
+		reader_offset = lq->mq_bytes_read % ringsize;
+
+		if (writer_offset + tuple_size < ringsize && reader_offset < writer_offset)
+			available = (ringsize - writer_offset);
+
+			return available;
+}
+/* routine to check if there is enough space in shared_queue */
+static uint64
+space_in_shm(shm_mq *mq)
+{
+		uint64 read, written, used, available, ringsize;
+		bool detached;
+
+		detached = false;
+		ringsize = mq->mq_ring_size;
+		read = shm_mq_get_bytes_read(mq, &detached);
+		written = shm_mq_get_bytes_written(mq, &detached);
+
+		used = written - read;
+		available = ringsize - used;
+
+		return available;
+}
+
+/*
+ * Routine to check if reading from local queue is possible. If local
+ * queue is atleast 5% used or shm_mq is full then we allow reading
+ * from local queue
+ */
+static bool
+read_local_queue(local_mq *lq, bool shm_mq_full)
+{
+	uint64 written, read;
+
+	written = lq->mq_bytes_written;
+	read = lq->mq_bytes_read;
+
+	if (shm_mq_full || (written - read) >= .05 * lq->mq_ring_size)
+		return true;
+
+	else
+		return true;
+}
+
+/* Routine to write tuple in local queue. */
+static shm_mq_result
+write_in_local_queue(local_mq *lq, shm_mq_iovec *iov)
+{
+	uint64 bytes_written, nbytes, tuple_size;
+	Size chunksize;
+	int i;
+
+	tuple_size = sizeof(Size) + iov->len;
+	nbytes = 0;
+	bytes_written = 0;
+
+	/* Compute total size of write. */
+	for (i = 0; i < 1; ++i)
+		nbytes += iov[i].len;
+
+	local_mq_send_bytes(lq, sizeof(Size), ((char *) &nbytes), &bytes_written);
+
+	chunksize = iov[0].len;
+	local_mq_send_bytes(lq, chunksize, &iov[0].data[0], &bytes_written);
+
+	Assert(bytes_written > 0);
+	Assert(bytes_written == tuple_size);
+	return SHM_MQ_SUCCESS;
+}
+
+/* Routine to pass a batch of tuples from local to shared queue in one go */
+static shm_mq_result
+copy_local_to_shared(local_mq *lq, shm_mq_handle *mqh, bool nowait)
+{
+	uint64 to_read, bytes_read, read_offset, available, used;
+	char *data;
+	shm_mq_result res;
+
+	bytes_read = 0;
+
+	used = lq->mq_bytes_written - lq->mq_bytes_read;
+	Assert(used <= lq->mq_ring_size);
+	Assert(lq->mq_bytes_read <= lq->mq_bytes_written);
+	read_offset = lq->mq_bytes_read % lq->mq_ring_size;
+	available = space_in_shm(mqh->mqh_queue);
+
+	/* always read data in the aligned form */
+	to_read = MAXALIGN_DOWN(Min(used, available));
+
+	/*
+	 * if the amount of data to be send from local queue involves wrapping of
+	 * local queue, then send only the data till the end of queue right now
+	 * and rest later.
+	 */
+	if (lq->mq_bytes_read % lq->mq_ring_size + to_read > lq->mq_ring_size)
+		to_read = lq->mq_ring_size - (lq->mq_bytes_read % lq->mq_ring_size);
+
+	if (shm_mq_is_detached(mqh->mqh_queue))
+		return SHM_MQ_DETACHED;
+
+	Assert(to_read > 0);
+
+	data = &(lq->mq_ring[lq->mq_ring_offset + read_offset]);
+
+	res = shm_mq_send_bytes(mqh, to_read, data, nowait, &bytes_read);
+
+	if(res == SHM_MQ_DETACHED)
+		return SHM_MQ_DETACHED;
+
+	if (res != SHM_MQ_SUCCESS)
+		return res;
+
+	Assert(bytes_read == to_read);
+	lq->mq_bytes_read += bytes_read;
+	shm_mq_notify_receiver(mqh->mqh_queue);
+
+	return res;
+}
+/*
+ * This is the function which actually writes the tuple in the local_queue,
+ * it is same as shm_mq_send_bytes is for shm_mq.
+ */
+static void
+local_mq_send_bytes(local_mq *mq, Size nbytes, const void *data, Size *bytes_written)
+{
+	uint64		used;
+	Size		ringsize = mq->mq_ring_size;
+	Size		available, sent = 0, sendnow;
+
+	uint64		rb;
+
+	while(sent < nbytes)
+	{
+		/* Compute number of ring buffer bytes used and available. */
+		rb = mq->mq_bytes_read;
+		Assert (mq->mq_bytes_written >= rb);
+		used = mq->mq_bytes_written - rb;
+		Assert(used <= ringsize);
+		available = Min(ringsize - used, nbytes - sent);
+
+		if(available == 0)
+			elog(ERROR,"local queue full, this should never be reached");
+
+		else
+		{
+			Size		offset = mq->mq_bytes_written % (uint64) ringsize;
+			sendnow = Min(available, ringsize - offset);
+
+			/* Write as much data as we can via a single memcpy(). */
+			memcpy(&mq->mq_ring[mq->mq_ring_offset + offset], (char *) data + sent, sendnow);
+			sent += sendnow;
+			/*
+			 * Update count of bytes written, with alignment padding.  Note
+			 * that this will never actually insert any padding except at the
+			 * end of a run of bytes, because the buffer size is a multiple of
+			 * MAXIMUM_ALIGNOF, and each read is as well.
+			 */
+			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
+			mq->mq_bytes_written += MAXALIGN(sendnow);
+		}
+	}
+	*bytes_written += sent;
+}
+/*
+ * Empty the local queue by copying all the data from local to shared queue.
+ * This is required before shutdown of worker.
+ */
+void
+empty_queue(shm_mq_handle *mqh)
+{
+	shm_mq_result res;
+	local_mq *lq;
+	uint64 shm_space;
+
+	lq = mqh->mqh_local_queue;
+
+	if (lq == NULL || lq->mq_bytes_written == 0 || shm_mq_is_detached(mqh->mqh_queue))
+		return;
+
+	while(lq->mq_bytes_written > lq->mq_bytes_read)
+	{
+		shm_space = space_in_shm(mqh->mqh_queue);
+		/*
+		 * cannot send data to shared queue, unless there is required
+		 * space, so keep on trying till we get some space
+		 */
+		while(shm_space <= 0)
+		{
+			if (shm_mq_is_detached(mqh->mqh_queue))
+				return;
+
+			shm_space = space_in_shm(mqh->mqh_queue);
+		}
+
+		if (read_local_queue(lq, true))
+			res = copy_local_to_shared(lq, mqh, false);
+
+		if (res == SHM_MQ_DETACHED)
+			return;
+	}
+	/* this local queue is not required anymore, hence free the space. */
+	pfree(mqh->mqh_local_queue);
+	return;
+}
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index 02a93e0..6ba3cb3 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -37,7 +37,7 @@ typedef enum
 {
 	SHM_MQ_SUCCESS,				/* Sent or received a message. */
 	SHM_MQ_WOULD_BLOCK,			/* Not completed; retry later. */
-	SHM_MQ_DETACHED				/* Other process has detached queue. */
+	SHM_MQ_DETACHED			/* Other process has detached queue. */
 } shm_mq_result;
 
 /*
@@ -82,4 +82,8 @@ extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
 /* Smallest possible queue. */
 extern PGDLLIMPORT const Size shm_mq_minimum_size;
 
+/* Routines and structures required for local and shared queue type architecture */
+extern void empty_queue(shm_mq_handle *mqh);
+struct local_mq;
+typedef struct local_mq local_mq;
 #endif							/* SHM_MQ_H */
#14 Dilip Kumar
dilipbalaut@gmail.com
In reply to: Rafia Sabih (#13)
Re: Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE

On Wed, Sep 6, 2017 at 4:14 PM, Rafia Sabih
<rafia.sabih@enterprisedb.com> wrote:

I worked on this idea of using a local queue as a temporary buffer to
write the tuples to when the master is busy and the shared queue is
full, and it gives quite some improvement in query performance.

I have done some initial review of this patch and I have some comments.

/* this is actual size for this tuple which will be written in queue */
+ tuple_size = MAXALIGN(sizeof(Size)) + MAXALIGN(iov.len);
+
+ /* create and attach a local queue, if it is not yet created */
+ if (mqh->mqh_local_queue == NULL)
+ mqh = local_mq_attach(mqh);

I think we can create the local queue the first time we need it. So
basically you can move this code inside the else part, where we first
identify that there is no space in the shared queue.

------
+ /* write in local queue if there is enough space*/
+ if (local_space > tuple_size)

I think the condition should be if (local_space >= tuple_size)

------
+ while(shm_space <= 0)
+ {
+ if (shm_mq_is_detached(mqh->mqh_queue))
+ return SHM_MQ_DETACHED;
+
+ shm_space = space_in_shm(mqh->mqh_queue);
+ }

Instead of waiting in a CPU-intensive while loop, we should wait on
some latch. Why can't we use the wait latch of the shared queue?
Whenever some space frees up, the latch will be set; then we can
recheck the space and, if it's enough, write to the shared queue,
otherwise wait on the latch again.

Check other similar occurrences.
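
Something along these lines (completely untested, just to show the
shape of it):

	/* wait for the receiver instead of spinning */
	while ((shm_space = space_in_shm(mqh->mqh_queue)) <= 0)
	{
		if (shm_mq_is_detached(mqh->mqh_queue))
			return SHM_MQ_DETACHED;

		/* the receiver sets our latch after reading from the queue */
		WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
		ResetLatch(MyLatch);
		CHECK_FOR_INTERRUPTS();
	}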
---------

+ if (read_local_queue(lq, true) && shm_space > 0)
+ copy_local_to_shared(lq, mqh, false);
+

Instead of adding shm_space > 0 to the check, it can be an Assert or
nothing, because the above loop will only break if shm_space > 0.
----

+ /*
+ * create a local queue, the size of this queue should be way higher
+ * than PARALLEL_TUPLE_QUEUE_SIZE
+ */
+ char *mq;
+ Size len;
+
+ len = 6553600;

Create some macro which is a multiple of PARALLEL_TUPLE_QUEUE_SIZE.

-------

+ /* this local queue is not required anymore, hence free the space. */
+ pfree(mqh->mqh_local_queue);
+ return;
+}

I think you can remove the return at the end of the void function.
-----

In the empty_queue(shm_mq_handle *mqh) function, I see that only the
last exit path frees the local queue, but not all exit paths do, even
though the local queue has been created.
----

Other cosmetic issues.
-----------------------------
+/* check the space availability in local queue */
+static uint64
+space_in_local(local_mq *lq, Size tuple_size)
+{
+ uint64 read, written, used, available, ringsize, writer_offset, reader_offset;
+
+ ringsize = lq->mq_ring_size;
+ read = lq->mq_bytes_read;
+ written = lq->mq_bytes_written;
+ used = written - read;
+ available = ringsize - used;
+
Alignment is not correct, I think you can run pgindent once.
----

+ /* check is there is required space in shared queue */
The comment needs rewording: "check if there is required space in shared queue"?

-----

+ /* write in local queue if there is enough space*/
space missing before comment end.

-----

+
+/* Routines required for local queue */
+
+/*
+ * Initialize a new local message queue, this is kept quite similar
to shm_mq_create.
+ */

The header comment formatting is not in sync with the other functions.

-----

+}
+/* routine to create and attach local_mq to the shm_mq_handle */

one blank line between two functions.

-----

+ bool detached;
+
+ detached = false;

a better way is bool detached = false;

-----

Compilation warning
--------------------
shm_mq.c: In function ‘write_in_local_queue’:
shm_mq.c:1489:32: warning: variable ‘tuple_size’ set but not used
[-Wunused-but-set-variable]
uint64 bytes_written, nbytes, tuple_size;

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com


#15Rafia Sabih
rafia.sabih@enterprisedb.com
In reply to: Dilip Kumar (#14)
1 attachment(s)
Re: Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE

On Sun, Sep 17, 2017 at 9:10 PM, Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Wed, Sep 6, 2017 at 4:14 PM, Rafia Sabih
<rafia.sabih@enterprisedb.com> wrote:

I worked on this idea of using local queue as a temporary buffer to
write the tuples when master is busy and shared queue is full, and it
gives quite some improvement in the query performance.

I have done some initial review of this patch and I have some comments.

Thanks for the review.

/* this is actual size for this tuple which will be written in queue */
+ tuple_size = MAXALIGN(sizeof(Size)) + MAXALIGN(iov.len);
+
+ /* create and attach a local queue, if it is not yet created */
+ if (mqh->mqh_local_queue == NULL)
+ mqh = local_mq_attach(mqh);

I think we can create the local queue the first time we actually need it. So
basically you can move this code inside the else part, where we first identify
that there is no space in the shared queue.

Done.

------
+ /* write in local queue if there is enough space*/
+ if (local_space > tuple_size)

I think the condition should be if (local_space >= tuple_size)

I did this to be on the safer side; anyhow, modified.

------
+ while(shm_space <= 0)
+ {
+ if (shm_mq_is_detached(mqh->mqh_queue))
+ return SHM_MQ_DETACHED;
+
+ shm_space = space_in_shm(mqh->mqh_queue);
+ }

Instead of waiting in a CPU-intensive while loop, we should wait on some
latch. Why can't we use the wait latch of the shared queue? Whenever some
space is freed, the latch will be set; then we can recheck the space and, if
it is enough, write to the shared queue, otherwise wait on the latch again.

Check other similar occurrences.

Done.

---------

+ if (read_local_queue(lq, true) && shm_space > 0)
+ copy_local_to_shared(lq, mqh, false);
+

Instead of adding shm_space > 0 to the check, it can be an Assert (or nothing),
because the loop above only breaks when shm_space > 0.
----

Done.

+ /*
+ * create a local queue, the size of this queue should be way higher
+ * than PARALLEL_TUPLE_QUEUE_SIZE
+ */
+ char *mq;
+ Size len;
+
+ len = 6553600;

Create some macro which is a multiple of PARALLEL_TUPLE_QUEUE_SIZE.

Done.

-------

+ /* this local queue is not required anymore, hence free the space. */
+ pfree(mqh->mqh_local_queue);
+ return;
+}

I think you can remove the return at the end of the void function.
-----

Done.

In the empty_queue(shm_mq_handle *mqh) function I see that only the last exit
path frees the local queue, but not all of them, even though the local queue
has been created.
----

Modified.

Other cosmetic issues.
-----------------------------
+/* check the space availability in local queue */
+static uint64
+space_in_local(local_mq *lq, Size tuple_size)
+{
+ uint64 read, written, used, available, ringsize, writer_offset, reader_offset;
+
+ ringsize = lq->mq_ring_size;
+ read = lq->mq_bytes_read;
+ written = lq->mq_bytes_written;
+ used = written - read;
+ available = ringsize - used;
+
Alignment is not correct, I think you can run pgindent once.
----

+ /* check is there is required space in shared queue */
The comment needs rewording: "check if there is required space in shared queue"?

-----

+ /* write in local queue if there is enough space*/
space missing before comment end.

-----

+
+/* Routines required for local queue */
+
+/*
+ * Initialize a new local message queue, this is kept quite similar
to shm_mq_create.
+ */

The header comment formatting is not in sync with the other functions.

-----

+}
+/* routine to create and attach local_mq to the shm_mq_handle */

one blank line between two functions.

-----

Ran pgindent for these issues.

+ bool detached;
+
+ detached = false;

a better way is bool detached = false;

-----

Done.

Compilation warning
--------------------
shm_mq.c: In function ‘write_in_local_queue’:
shm_mq.c:1489:32: warning: variable ‘tuple_size’ set but not used
[-Wunused-but-set-variable]
uint64 bytes_written, nbytes, tuple_size;

That might be the case when the build is not configured with cassert; however,
the variable is removed in the current version anyway.

Please find the attached file for the revised version.

--
Regards,
Rafia Sabih
EnterpriseDB: http://www.enterprisedb.com/

Attachments:

faster_gather_v2.patchapplication/octet-stream; name=faster_gather_v2.patchDownload
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index e9a5d5a1a5..b4f8386899 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -90,7 +90,10 @@ tqueueShutdownReceiver(DestReceiver *self)
 	TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
 
 	if (tqueue->queue != NULL)
+	{
+		empty_queue(tqueue->queue);
 		shm_mq_detach(tqueue->queue);
+	}
 	tqueue->queue = NULL;
 }
 
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 770559a03e..75273ba51c 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -25,6 +25,8 @@
 #include "storage/shm_mq.h"
 #include "storage/spin.h"
 
+#define LOCAL_TUPLE_QUEUE_SIZE		6553600
+
 /*
  * This structure represents the actual queue, stored in shared memory.
  *
@@ -79,6 +81,25 @@ struct shm_mq
 	char		mq_ring[FLEXIBLE_ARRAY_MEMBER];
 };
 
+/* This is the structure for the local queue where a worker can write
+ * tuples when its shared queue is full.
+ *
+ * Each worker has its own local queue where it can store tuples
+ * when the master is busy and the worker's shared queue gets full. Tuples
+ * are copied into the shared queue via a single memcpy equal to the space
+ * available in the shared queue. Since the local queue is never shared with
+ * the master, we do not require any locking mechanism to write tuples
+ * in it, hence writing to the local queue is a cheap operation.
+ */
+struct local_mq
+{
+	uint64		mq_bytes_read;
+	uint64		mq_bytes_written;
+	Size		mq_ring_size;
+	uint8		mq_ring_offset;
+	char		mq_ring[FLEXIBLE_ARRAY_MEMBER];
+};
+
 /*
  * This structure is a backend-private handle for access to a queue.
  *
@@ -128,7 +149,9 @@ struct shm_mq
  */
 struct shm_mq_handle
 {
+	bool		mqh_local;
 	shm_mq	   *mqh_queue;
+	local_mq   *mqh_local_queue;
 	dsm_segment *mqh_segment;
 	BackgroundWorkerHandle *mqh_handle;
 	char	   *mqh_buffer;
@@ -150,12 +173,23 @@ static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
 						 BackgroundWorkerHandle *handle);
 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
 					 BackgroundWorkerHandle *handle);
+static bool shm_mq_is_detached(volatile shm_mq *mq);
 static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
 static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n);
 static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
 static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n);
 static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
+static uint64 space_in_shm(shm_mq *mq);
+
+/* Routines required for local queue */
+static local_mq * local_mq_create(void *address, Size size);
+static shm_mq_handle *local_mq_attach(shm_mq_handle *mqh);
+static uint64 space_in_local(local_mq * lq, Size tuple_size);
+static bool read_local_queue(local_mq * lq);
+static shm_mq_result write_in_local_queue(local_mq * mq, shm_mq_iovec *iov);
+static void local_mq_send_bytes(local_mq * mq, Size nbytes, const void *data, Size *bytes_written);
+static shm_mq_result copy_local_to_shared(local_mq * lq, shm_mq_handle *mqh, bool read_anyway);
 
 /* Minimum queue size is enough for header and at least one chunk of data. */
 const Size	shm_mq_minimum_size =
@@ -289,6 +323,8 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 	shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
 
 	Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
+	mqh->mqh_local = false;
+	mqh->mqh_local_queue = NULL;
 	mqh->mqh_queue = mq;
 	mqh->mqh_segment = seg;
 	mqh->mqh_handle = handle;
@@ -319,17 +355,117 @@ shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
 }
 
 /*
- * Write a message into a shared message queue.
+ * Write a message into a shared or local message queue, as per the space
+ * availability in these queues. If space is available in shared queue then
+ * we simply write the message there and return. Else we write it in local
+ * queue. Once both the queues are full, we wait till some of the data in
+ * shared queue is read and then copy the data from local to shared queue
+ * and continue writing in local queue. After writing in local queue we
+ * check if there is space available in shared queue and we copy the data
+ * from local to shared queue then itself.
  */
 shm_mq_result
 shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
 {
 	shm_mq_iovec iov;
+	local_mq   *lq;
+	shm_mq_result res;
+	Size		tuple_size;
+	uint64		local_space;
 
 	iov.data = data;
 	iov.len = nbytes;
+	/* this is actual size for this tuple which will be written in queue */
+	tuple_size = MAXALIGN(sizeof(Size)) + MAXALIGN(iov.len);
+
+	/*
+	 * if there is enough space in shared_queue and never been to local queue
+	 * then write the tuple in shared queue.
+	 */
+	if (space_in_shm(mqh->mqh_queue) > tuple_size && !mqh->mqh_local)
+		res = shm_mq_sendv(mqh, &iov, 1, nowait);
+
+	else
+	{
+		/* if queue is detached for some reason, nothing to do */
+		if (shm_mq_is_detached(mqh->mqh_queue))
+			return SHM_MQ_DETACHED;
+
+		/*
+		 * once started with local queue, the tuples will flow from local to
+		 * shared queue until the local queue is empty
+		 */
+		mqh->mqh_local = true;
+
+		/* create and attach a local queue, if it is not yet created */
+		if (mqh->mqh_local_queue == NULL)
+			mqh = local_mq_attach(mqh);
 
-	return shm_mq_sendv(mqh, &iov, 1, nowait);
+		lq = mqh->mqh_local_queue;
+		local_space = space_in_local(lq, tuple_size);
+
+		/* write in local queue if there is enough space */
+		if (local_space >= tuple_size)
+		{
+			res = write_in_local_queue(lq, &iov);
+
+			/*
+			 * if we have some data in local queue and some space in shared
+			 * queue then copy it to shared queue
+			 */
+			if (read_local_queue(lq) && space_in_shm(mqh->mqh_queue) > 0)
+				copy_local_to_shared(lq, mqh, false);
+		}
+		else
+		{
+			/*
+			 * if local queue is full, then copy some data to shared queue
+			 * till enough space becomes available in local queue
+			 */
+			do
+			{
+				while (space_in_shm(mqh->mqh_queue) < tuple_size)
+				{
+					/*
+					 * cannot send data to shared queue, unless there is
+					 * required space, so wait till we get some space, since
+					 * we cannot write anymore in local queue as of now
+					 */
+					WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
+
+					/* Reset the latch so we don't spin. */
+					ResetLatch(MyLatch);
+
+					/* An interrupt may have occurred while we were waiting. */
+					CHECK_FOR_INTERRUPTS();
+
+					/* if queue is detached then nothing to do */
+					if (shm_mq_is_detached(mqh->mqh_queue))
+						return SHM_MQ_DETACHED;
+				}
+				if (read_local_queue(lq))
+					copy_local_to_shared(lq, mqh, false);
+
+				local_space = space_in_local(lq, tuple_size);
+
+			} while (local_space <= tuple_size);
+
+			/*
+			 * once space is available in local queue, write the tuple
+			 * appropriately. If local queue has become empty, then write the
+			 * tuple in shared queue itself, otherwise continue with local
+			 * queue itself.
+			 */
+			if (local_space > 0)
+				res = write_in_local_queue(lq, &iov);
+			else
+			{
+				mqh->mqh_local = false;
+				res = shm_mq_sendv(mqh, &iov, 1, nowait);
+			}
+		}
+	}
+	return res;
 }
 
 /*
@@ -1133,6 +1269,20 @@ shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
 }
 
 /*
+ * Check whether the shm_mq is detached.
+ */
+static bool
+shm_mq_is_detached(volatile shm_mq *mq)
+{
+	bool		ret;
+
+	SpinLockAcquire(&mq->mq_mutex);
+	ret = mq->mq_detached;
+	SpinLockRelease(&mq->mq_mutex);
+	return ret;
+}
+
+/*
  * Get the number of bytes read.  The receiver need not use this to access
  * the count of bytes read, but the sender must.
  */
@@ -1224,3 +1374,268 @@ shm_mq_detach_callback(dsm_segment *seg, Datum arg)
 
 	shm_mq_detach_internal(mq);
 }
+
+/* Routines required for local queue */
+
+/*
+ * Initialize a new local message queue, this is kept quite similar to shm_mq_create.
+ */
+static local_mq *
+local_mq_create(void *address, Size size)
+{
+	local_mq   *mq = address;
+	Size		data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
+
+	/* If the size isn't MAXALIGN'd, just discard the odd bytes. */
+	size = MAXALIGN_DOWN(size);
+
+	/* Queue size must be large enough to hold some data. */
+	Assert(size > data_offset);
+
+	/* Initialize queue header. */
+	mq->mq_bytes_read = 0;
+	mq->mq_bytes_written = 0;
+	mq->mq_ring_size = size - data_offset;
+	mq->mq_ring_offset = data_offset - offsetof(local_mq, mq_ring);
+	return mq;
+}
+
+/* routine to create and attach local_mq to the shm_mq_handle */
+static shm_mq_handle *
+local_mq_attach(shm_mq_handle *mqh)
+{
+	/*
+	 * create a local queue, the size of this queue should be way higher than
+	 * PARALLEL_TUPLE_QUEUE_SIZE
+	 */
+	char	   *mq;
+	Size		len;
+
+	len = LOCAL_TUPLE_QUEUE_SIZE;
+	mq = palloc0(len);
+	mqh->mqh_local_queue = local_mq_create(mq, len);
+
+	return mqh;
+}
+
+/* check the space availability in local queue */
+static uint64
+space_in_local(local_mq * lq, Size tuple_size)
+{
+	uint64		read,
+				written,
+				used,
+				available,
+				ringsize,
+				writer_offset,
+				reader_offset;
+
+	ringsize = lq->mq_ring_size;
+	read = lq->mq_bytes_read;
+	written = lq->mq_bytes_written;
+	used = written - read;
+	available = ringsize - used;
+
+	ringsize = lq->mq_ring_size;
+	writer_offset = lq->mq_bytes_written % ringsize;
+	reader_offset = lq->mq_bytes_read % ringsize;
+
+	if (writer_offset + tuple_size < ringsize && reader_offset < writer_offset)
+		available = (ringsize - writer_offset);
+
+	return available;
+}
+
+/* routine to check if there is enough space in shared_queue */
+static uint64
+space_in_shm(shm_mq *mq)
+{
+	uint64		read,
+				written,
+				used,
+				available,
+				ringsize;
+	bool		detached = false;
+
+	ringsize = mq->mq_ring_size;
+	read = shm_mq_get_bytes_read(mq, &detached);
+	written = shm_mq_get_bytes_written(mq, &detached);
+
+	used = written - read;
+	available = ringsize - used;
+
+	return available;
+}
+
+/*
+ * Routine to check if reading from the local queue is possible. If the local
+ * queue is at least 5% used then we allow reading from the local queue.
+ */
+static bool
+read_local_queue(local_mq * lq)
+{
+	uint64		written,
+				read;
+
+	written = lq->mq_bytes_written;
+	read = lq->mq_bytes_read;
+
+	if ((written - read) >= .05 * lq->mq_ring_size)
+		return true;
+	else
+		return false;
+}
+
+/* Routine to write tuple in local queue. */
+static shm_mq_result
+write_in_local_queue(local_mq * lq, shm_mq_iovec *iov)
+{
+	uint64		bytes_written,
+				nbytes,
+				tuple_size;
+	Size		chunksize;
+	int			i;
+
+	tuple_size = sizeof(Size) + iov->len;
+	nbytes = 0;
+	bytes_written = 0;
+
+	/* Compute total size of write. */
+	for (i = 0; i < 1; ++i)
+		nbytes += iov[i].len;
+
+	local_mq_send_bytes(lq, sizeof(Size), ((char *) &nbytes), &bytes_written);
+
+	chunksize = iov[0].len;
+	local_mq_send_bytes(lq, chunksize, &iov[0].data[0], &bytes_written);
+
+	Assert(bytes_written > 0);
+	Assert(bytes_written == tuple_size);
+	return SHM_MQ_SUCCESS;
+}
+
+/* Routine to pass a batch of tuples from local to shared queue in one go */
+static shm_mq_result
+copy_local_to_shared(local_mq * lq, shm_mq_handle *mqh, bool nowait)
+{
+	uint64		to_read,
+				bytes_read,
+				read_offset,
+				available,
+				used;
+	char	   *data;
+	shm_mq_result res;
+
+	bytes_read = 0;
+
+	if (shm_mq_is_detached(mqh->mqh_queue))
+		return SHM_MQ_DETACHED;
+
+	used = lq->mq_bytes_written - lq->mq_bytes_read;
+	Assert(used <= lq->mq_ring_size);
+	Assert(lq->mq_bytes_read <= lq->mq_bytes_written);
+	read_offset = lq->mq_bytes_read % lq->mq_ring_size;
+	available = space_in_shm(mqh->mqh_queue);
+
+	/* always read data in aligned form */
+	to_read = MAXALIGN_DOWN(Min(used, available));
+
+	/*
+	 * if the amount of data to be sent from the local queue involves wrapping of
+	 * local queue, then send only the data till the end of queue right now
+	 * and rest later.
+	 */
+	if (lq->mq_bytes_read % lq->mq_ring_size + to_read > lq->mq_ring_size)
+		to_read = lq->mq_ring_size - (lq->mq_bytes_read % lq->mq_ring_size);
+
+	data = &(lq->mq_ring[lq->mq_ring_offset + read_offset]);
+	res = shm_mq_send_bytes(mqh, to_read, data, nowait, &bytes_read);
+
+	if (res != SHM_MQ_SUCCESS)
+		return res;
+
+	Assert(bytes_read == to_read);
+	lq->mq_bytes_read += bytes_read;
+	shm_mq_notify_receiver(mqh->mqh_queue);
+
+	return res;
+}
+
+/*
+ * This is the function which actually writes the tuple in the local_queue,
+ * it is same as shm_mq_send_bytes is for shm_mq.
+ */
+static void
+local_mq_send_bytes(local_mq * mq, Size nbytes, const void *data, Size *bytes_written)
+{
+	uint64		used;
+	Size		ringsize = mq->mq_ring_size;
+	Size		available,
+				sent = 0,
+				sendnow;
+
+	uint64		rb;
+
+	while (sent < nbytes)
+	{
+		/* Compute number of ring buffer bytes used and available. */
+		rb = mq->mq_bytes_read;
+		Assert(mq->mq_bytes_written >= rb);
+		used = mq->mq_bytes_written - rb;
+		Assert(used <= ringsize);
+		available = Min(ringsize - used, nbytes - sent);
+
+		if (available == 0)
+			elog(ERROR, "local queue full, this should never be reached");
+
+		else
+		{
+			Size		offset = mq->mq_bytes_written % (uint64) ringsize;
+
+			sendnow = Min(available, ringsize - offset);
+
+			/* Write as much data as we can via a single memcpy(). */
+			memcpy(&mq->mq_ring[mq->mq_ring_offset + offset], (char *) data + sent, sendnow);
+			sent += sendnow;
+
+			/*
+			 * Update count of bytes written, with alignment padding.  Note
+			 * that this will never actually insert any padding except at the
+			 * end of a run of bytes, because the buffer size is a multiple of
+			 * MAXIMUM_ALIGNOF, and each read is as well.
+			 */
+			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
+			mq->mq_bytes_written += MAXALIGN(sendnow);
+		}
+	}
+	*bytes_written += sent;
+}
+
+/*
+ * Empty the local queue by copying all the data from local to shared queue.
+ * This is required before shutdown of worker.
+ */
+void
+empty_queue(shm_mq_handle *mqh)
+{
+	local_mq   *lq;
+	uint64		read,
+				written;
+
+	lq = mqh->mqh_local_queue;
+
+	if (lq == NULL || lq->mq_bytes_written == 0)
+		return;
+
+	read = lq->mq_bytes_read;
+	written = lq->mq_bytes_written;
+
+	while (written > read && !shm_mq_is_detached(mqh->mqh_queue))
+	{
+		copy_local_to_shared(lq, mqh, false);
+		read = lq->mq_bytes_read;
+		written = lq->mq_bytes_written;
+	}
+	/* this local queue is not required anymore, hence free the space. */
+	pfree(mqh->mqh_local_queue);
+}
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index 7709efcc48..80afb435e0 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -82,4 +82,8 @@ extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
 /* Smallest possible queue. */
 extern PGDLLIMPORT const Size shm_mq_minimum_size;
 
+/* Routines and structures required for local and shared queue type architecture */
+extern void empty_queue(shm_mq_handle *mqh);
+struct local_mq;
+typedef struct local_mq local_mq;
 #endif							/* SHM_MQ_H */
#16Dilip Kumar
dilipbalaut@gmail.com
In reply to: Rafia Sabih (#15)
Re: Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE

On Thu, Sep 21, 2017 at 4:50 PM, Rafia Sabih
<rafia.sabih@enterprisedb.com> wrote:

On Sun, Sep 17, 2017 at 9:10 PM, Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Wed, Sep 6, 2017 at 4:14 PM, Rafia Sabih
<rafia.sabih@enterprisedb.com> wrote:

Please find the attached file for the revised version.

Thanks for the updated patch, I have some more comments.

+static shm_mq_handle *local_mq_attach(shm_mq_handle *mqh);
+static uint64 space_in_local(local_mq * lq, Size tuple_size);
+static bool read_local_queue(local_mq * lq, bool shm_mq_full);

local_mq * lq -> local_mq *lq
same for other places as well.

---

+static uint64 space_in_shm(shm_mq *mq);
+
+static uint64 space_in_local(local_mq * lq, Size tuple_size);

we better use Size here instead of uint64

---

+ available = ringsize - used;
+
+ ringsize = lq->mq_ring_size;
+ writer_offset = lq->mq_bytes_written % ringsize;
+ reader_offset = lq->mq_bytes_read % ringsize;
+
+ if (writer_offset + tuple_size < ringsize && reader_offset < writer_offset)
+ available = (ringsize - writer_offset);

Even though there is space in the queue, if the tuple needs rotation
(wrap-around) then we are not allowing it to be written into the local queue.
If there is some strong reason behind that, please add a comment to explain it.
---

+ if (shm_mq_full || (written - read) >= .05 * lq->mq_ring_size)
+ return true;
+
+ else
+ return true;
+}

Seems like you want to return 'false' in the else case.

----

+ read_offset = lq->mq_bytes_read % lq->mq_ring_size;
+ available = space_in_shm(mqh->mqh_queue);
+
+ /* always read data in the aligned form */
+ to_read = MAXALIGN_DOWN(Min(used, available));
+
+ /*
+ * if the amount of data to be send from local queue involves wrapping of
+ * local queue, then send only the data till the end of queue right now
+ * and rest later.
+ */
+ if (lq->mq_bytes_read % lq->mq_ring_size + to_read > lq->mq_ring_size)

You can directly use "read_offset" instead of recalculating
lq->mq_bytes_read % lq->mq_ring_size.
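
That is, roughly (same variables as in the quoted hunk):

	if (read_offset + to_read > lq->mq_ring_size)
		to_read = lq->mq_ring_size - read_offset;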

----
+ do
+ {
+ if (shm_mq_is_detached(mqh->mqh_queue))
+ return SHM_MQ_DETACHED;
+
+ shm_space = space_in_shm(mqh->mqh_queue);
+
+ /*
+ * cannot send data to shared queue, unless there is required
+ * space, so wait till we get some space, since we cannot
+ * write anymore in local queue as of now
+ */
+ WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
+
+ /* Reset the latch so we don't spin. */
+ ResetLatch(MyLatch);
+
+ /* An interrupt may have occurred while we were waiting. */
+ CHECK_FOR_INTERRUPTS();
+
+ shm_space = space_in_shm(mqh->mqh_queue);
+
+ if (read_local_queue(lq, true) && shm_space > 0)
+ copy_local_to_shared(lq, mqh, false);
+
+ local_space = space_in_local(lq, tuple_size);
+
+ } while (local_space <= tuple_size);
+

1. Just after getting shm_space, you are calling WaitLatch without even
checking whether that space is sufficient to send the tuple.
2. Every time the latch is set (maybe because some space was freed in the
shared queue) you are calling copy_local_to_shared to send as much data as
possible from the local to the shared queue, but that doesn't even guarantee
that we will have sufficient space in the local queue to accommodate the
current tuple.

Instead of calling copy_local_to_shared multiple times (which will internally
acquire the mutex), after the latch is set you can check the shared queue
space and not attempt copy_local_to_shared unless shm_space >= tuple_size.
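
Something like this, perhaps (untested sketch, keeping the helper names and
variables from the quoted code):

	do
	{
		while ((shm_space = space_in_shm(mqh->mqh_queue)) < tuple_size)
		{
			if (shm_mq_is_detached(mqh->mqh_queue))
				return SHM_MQ_DETACHED;

			/* wait until the receiver frees enough space for this tuple */
			WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		/* here shm_space >= tuple_size, so one copy attempt is enough */
		if (read_local_queue(lq, true))
			copy_local_to_shared(lq, mqh, false);

		local_space = space_in_local(lq, tuple_size);
	} while (local_space <= tuple_size);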

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com


#17Rafia Sabih
rafia.sabih@enterprisedb.com
In reply to: Dilip Kumar (#16)
1 attachment(s)
Re: Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE

On Thu, Sep 21, 2017 at 10:34 PM, Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Thu, Sep 21, 2017 at 4:50 PM, Rafia Sabih
<rafia.sabih@enterprisedb.com> wrote:

On Sun, Sep 17, 2017 at 9:10 PM, Dilip Kumar <dilipbalaut@gmail.com> wrote:

On Wed, Sep 6, 2017 at 4:14 PM, Rafia Sabih
<rafia.sabih@enterprisedb.com> wrote:

Please find the attached file for the revised version.

Thanks for the updated patch, I have some more comments.

Again, thanks for the review. Please find the attached file for the updated patch.

+static shm_mq_handle *local_mq_attach(shm_mq_handle *mqh);
+static uint64 space_in_local(local_mq * lq, Size tuple_size);
+static bool read_local_queue(local_mq * lq, bool shm_mq_full);

local_mq * lq -> local_mq *lq
same for other places as well.

---

Done. This is something pgindent does; anyhow, I corrected it.

+static uint64 space_in_shm(shm_mq *mq);
+
+static uint64 space_in_local(local_mq * lq, Size tuple_size);

we better use Size here instead of uint64

---

Done.

+ available = ringsize - used;
+
+ ringsize = lq->mq_ring_size;
+ writer_offset = lq->mq_bytes_written % ringsize;
+ reader_offset = lq->mq_bytes_read % ringsize;
+
+ if (writer_offset + tuple_size < ringsize && reader_offset < writer_offset)
+ available = (ringsize - writer_offset);

Even though there is space in the queue, if the tuple needs rotation
(wrap-around) then we are not allowing it to be written into the local queue.
If there is some strong reason behind that, please add a comment to explain it.
---

Corrected; it will just return the available space now.

+ if (shm_mq_full || (written - read) >= .05 * lq->mq_ring_size)
+ return true;
+
+ else
+ return true;
+}

Seems like you want to return 'false' in the else case.

----

Yes and done.

+ read_offset = lq->mq_bytes_read % lq->mq_ring_size;
+ available = space_in_shm(mqh->mqh_queue);
+
+ /* always read data in the aligned form */
+ to_read = MAXALIGN_DOWN(Min(used, available));
+
+ /*
+ * if the amount of data to be send from local queue involves wrapping of
+ * local queue, then send only the data till the end of queue right now
+ * and rest later.
+ */
+ if (lq->mq_bytes_read % lq->mq_ring_size + to_read > lq->mq_ring_size)

You can directly use "read_offset" instead of recalculating
lq->mq_bytes_read % lq->mq_ring_size.

----

Done.

+ do
+ {
+ if (shm_mq_is_detached(mqh->mqh_queue))
+ return SHM_MQ_DETACHED;
+
+ shm_space = space_in_shm(mqh->mqh_queue);
+
+ /*
+ * cannot send data to shared queue, unless there is required
+ * space, so wait till we get some space, since we cannot
+ * write anymore in local queue as of now
+ */
+ WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
+
+ /* Reset the latch so we don't spin. */
+ ResetLatch(MyLatch);
+
+ /* An interrupt may have occurred while we were waiting. */
+ CHECK_FOR_INTERRUPTS();
+
+ shm_space = space_in_shm(mqh->mqh_queue);
+
+ if (read_local_queue(lq, true) && shm_space > 0)
+ copy_local_to_shared(lq, mqh, false);
+
+ local_space = space_in_local(lq, tuple_size);
+
+ } while (local_space <= tuple_size);
+

1. Just after getting shm_space, you are calling WaitLatch without even
checking whether that space is sufficient to send the tuple.
2. Every time the latch is set (maybe because some space was freed in the
shared queue) you are calling copy_local_to_shared to send as much data as
possible from the local to the shared queue, but that doesn't even guarantee
that we will have sufficient space in the local queue to accommodate the
current tuple.

Instead of calling copy_local_to_shared multiple times (which will internally
acquire the mutex), after the latch is set you can check the shared queue
space and not attempt copy_local_to_shared unless shm_space >= tuple_size.

Done.


--
Regards,
Rafia Sabih
EnterpriseDB: http://www.enterprisedb.com/

Attachments:

faster_gather_v3.patchapplication/octet-stream; name=faster_gather_v3.patchDownload
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index e9a5d5a1a5..b4f8386899 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -90,7 +90,10 @@ tqueueShutdownReceiver(DestReceiver *self)
 	TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
 
 	if (tqueue->queue != NULL)
+	{
+		empty_queue(tqueue->queue);
 		shm_mq_detach(tqueue->queue);
+	}
 	tqueue->queue = NULL;
 }
 
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 770559a03e..fae7eb9c98 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -25,6 +25,8 @@
 #include "storage/shm_mq.h"
 #include "storage/spin.h"
 
+#define LOCAL_TUPLE_QUEUE_SIZE		6553600
+
 /*
  * This structure represents the actual queue, stored in shared memory.
  *
@@ -79,6 +81,25 @@ struct shm_mq
 	char		mq_ring[FLEXIBLE_ARRAY_MEMBER];
 };
 
+/* This is the structure for the local queue where a worker can write
+ * tuples when its shared queue is full.
+ *
+ * Each worker has its own local queue where it can store tuples
+ * when the master is busy and the worker's shared queue gets full. Tuples
+ * are copied into the shared queue via a single memcpy equal to the space
+ * available in the shared queue. Since the local queue is never shared with
+ * the master, we do not require any locking mechanism to write tuples
+ * in it, hence writing to the local queue is a cheap operation.
+ */
+struct local_mq
+{
+	uint64		mq_bytes_read;
+	uint64		mq_bytes_written;
+	Size		mq_ring_size;
+	uint8		mq_ring_offset;
+	char		mq_ring[FLEXIBLE_ARRAY_MEMBER];
+};
+
 /*
  * This structure is a backend-private handle for access to a queue.
  *
@@ -128,7 +149,9 @@ struct shm_mq
  */
 struct shm_mq_handle
 {
+	bool		mqh_local;
 	shm_mq	   *mqh_queue;
+	local_mq   *mqh_local_queue;
 	dsm_segment *mqh_segment;
 	BackgroundWorkerHandle *mqh_handle;
 	char	   *mqh_buffer;
@@ -150,12 +173,23 @@ static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
 						 BackgroundWorkerHandle *handle);
 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
 					 BackgroundWorkerHandle *handle);
+static bool shm_mq_is_detached(volatile shm_mq *mq);
 static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
 static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n);
 static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
 static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n);
 static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
+static Size space_in_shm(shm_mq *mq);
+
+/* Routines required for local queue */
+static local_mq * local_mq_create(void *address, Size size);
+static shm_mq_handle * local_mq_attach(shm_mq_handle *mqh);
+static Size space_in_local(local_mq *lq, Size tuple_size);
+static bool read_local_queue(local_mq *lq);
+static shm_mq_result write_in_local_queue(local_mq *mq, shm_mq_iovec *iov);
+static void local_mq_send_bytes(local_mq *mq, Size nbytes, const void *data, Size *bytes_written);
+static shm_mq_result copy_local_to_shared(local_mq *lq, shm_mq_handle *mqh, bool read_anyway);
 
 /* Minimum queue size is enough for header and at least one chunk of data. */
 const Size	shm_mq_minimum_size =
@@ -289,6 +323,8 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 	shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
 
 	Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
+	mqh->mqh_local = false;
+	mqh->mqh_local_queue = NULL;
 	mqh->mqh_queue = mq;
 	mqh->mqh_segment = seg;
 	mqh->mqh_handle = handle;
@@ -319,17 +355,120 @@ shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
 }
 
 /*
- * Write a message into a shared message queue.
+ * Write a message into a shared or local message queue, as per the space
+ * availability in these queues. If space is available in shared queue then
+ * we simply write the message there and return. Else we write it in local
+ * queue. Once both the queues are full, we wait till some of the data in
+ * shared queue is read and then copy the data from local to shared queue
+ * and continue writing in local queue. After writing in local queue we
+ * check if there is space available in shared queue and we copy the data
+ * from local to shared queue then itself.
  */
 shm_mq_result
 shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
 {
 	shm_mq_iovec iov;
+	local_mq   *lq;
+	shm_mq_result res;
+	Size		tuple_size,
+				local_space,
+				shm_space;
 
 	iov.data = data;
 	iov.len = nbytes;
+	shm_space = space_in_shm(mqh->mqh_queue);
+
+	/* this is actual size for this tuple which will be written in queue */
+	tuple_size = MAXALIGN(sizeof(Size)) + MAXALIGN(iov.len);
+
+	/*
+	 * if there is enough space in shared_queue and never been to local queue
+	 * then write the tuple in shared queue.
+	 */
+	if (shm_space > tuple_size && !mqh->mqh_local)
+		res = shm_mq_sendv(mqh, &iov, 1, nowait);
+
+	else
+	{
+		/* if queue is detached for some reason, nothing to do */
+		if (shm_mq_is_detached(mqh->mqh_queue))
+			return SHM_MQ_DETACHED;
+
+		/*
+		 * once started with local queue, the tuples will flow from local to
+		 * shared queue until the local queue is empty
+		 */
+		mqh->mqh_local = true;
 
-	return shm_mq_sendv(mqh, &iov, 1, nowait);
+		/* create and attach a local queue, if it is not yet created */
+		if (mqh->mqh_local_queue == NULL)
+			mqh = local_mq_attach(mqh);
+
+		lq = mqh->mqh_local_queue;
+		local_space = space_in_local(lq, tuple_size);
+
+		/* write in local queue if there is enough space */
+		if (local_space >= tuple_size)
+		{
+			res = write_in_local_queue(lq, &iov);
+
+			/*
+			 * if we have some data in local queue and some space in shared
+			 * queue then copy it to shared queue
+			 */
+			if (read_local_queue(lq) && shm_space > 0)
+				copy_local_to_shared(lq, mqh, false);
+		}
+		else
+		{
+			/*
+			 * if local queue is full, then copy some data to shared queue
+			 * till enough space becomes available in local queue
+			 */
+			do
+			{
+				while (shm_space < tuple_size)
+				{
+					/*
+					 * cannot send data to shared queue, unless there is
+					 * required space, so wait till we get some space, since
+					 * we cannot write anymore in local queue as of now
+					 */
+					WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
+
+					/* Reset the latch so we don't spin. */
+					ResetLatch(MyLatch);
+
+					/* An interrupt may have occurred while we were waiting. */
+					CHECK_FOR_INTERRUPTS();
+
+					/* if queue is detached then nothing to do */
+					if (shm_mq_is_detached(mqh->mqh_queue))
+						return SHM_MQ_DETACHED;
+				}
+				if (read_local_queue(lq) && shm_space >= tuple_size)
+					copy_local_to_shared(lq, mqh, false);
+
+				local_space = space_in_local(lq, tuple_size);
+
+			} while (local_space <= tuple_size);
+
+			/*
+			 * once space is available in local queue, write the tuple
+			 * appropriately. If local queue has become empty, then write the
+			 * tuple in shared queue itself, otherwise continue with local
+			 * queue itself.
+			 */
+			if (local_space > 0)
+				res = write_in_local_queue(lq, &iov);
+			else
+			{
+				mqh->mqh_local = false;
+				res = shm_mq_sendv(mqh, &iov, 1, nowait);
+			}
+		}
+	}
+	return res;
 }
 
 /*
@@ -1133,6 +1272,20 @@ shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
 }
 
 /*
+ * Check whether the shm_mq is detached.
+ */
+static bool
+shm_mq_is_detached(volatile shm_mq *mq)
+{
+	bool		ret;
+
+	SpinLockAcquire(&mq->mq_mutex);
+	ret = mq->mq_detached;
+	SpinLockRelease(&mq->mq_mutex);
+	return ret;
+}
+
+/*
  * Get the number of bytes read.  The receiver need not use this to access
  * the count of bytes read, but the sender must.
  */
@@ -1224,3 +1377,259 @@ shm_mq_detach_callback(dsm_segment *seg, Datum arg)
 
 	shm_mq_detach_internal(mq);
 }
+
+/* Routines required for local queue */
+
+/*
+ * Initialize a new local message queue, this is kept quite similar to shm_mq_create.
+ */
+static local_mq *
+local_mq_create(void *address, Size size)
+{
+	local_mq   *mq = address;
+	Size		data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
+
+	/* If the size isn't MAXALIGN'd, just discard the odd bytes. */
+	size = MAXALIGN_DOWN(size);
+
+	/* Queue size must be large enough to hold some data. */
+	Assert(size > data_offset);
+
+	/* Initialize queue header. */
+	mq->mq_bytes_read = 0;
+	mq->mq_bytes_written = 0;
+	mq->mq_ring_size = size - data_offset;
+	mq->mq_ring_offset = data_offset - offsetof(local_mq, mq_ring);
+	return mq;
+}
+
+/* routine to create and attach local_mq to the shm_mq_handle */
+static shm_mq_handle *
+local_mq_attach(shm_mq_handle *mqh)
+{
+	/*
+	 * create a local queue, the size of this queue should be way higher than
+	 * PARALLEL_TUPLE_QUEUE_SIZE
+	 */
+	char	   *mq;
+	Size		len;
+
+	len = LOCAL_TUPLE_QUEUE_SIZE;
+	mq = palloc0(len);
+	mqh->mqh_local_queue = local_mq_create(mq, len);
+
+	return mqh;
+}
+
+/* check the space availability in local queue */
+static Size
+space_in_local(local_mq *lq, Size tuple_size)
+{
+	uint64		read,
+				written;
+	Size		used,
+				available,
+				ringsize;
+
+	ringsize = lq->mq_ring_size;
+	read = lq->mq_bytes_read;
+	written = lq->mq_bytes_written;
+	used = written - read;
+	available = ringsize - used;
+
+	return available;
+}
+
+/* routine to check if there is enough space in shared_queue */
+static Size
+space_in_shm(shm_mq *mq)
+{
+	uint64		read,
+				written;
+	Size		used,
+				available,
+				ringsize;
+	bool		detached = false;
+
+	ringsize = mq->mq_ring_size;
+	read = shm_mq_get_bytes_read(mq, &detached);
+	written = shm_mq_get_bytes_written(mq, &detached);
+
+	used = written - read;
+	available = ringsize - used;
+
+	return available;
+}
+
+/*
+ * Routine to check if reading from the local queue is possible. If the local
+ * queue is at least 5% used then we allow reading from the local queue.
+ */
+static bool
+read_local_queue(local_mq *lq)
+{
+	uint64		written,
+				read;
+
+	written = lq->mq_bytes_written;
+	read = lq->mq_bytes_read;
+
+	if ((written - read) >= .05 * lq->mq_ring_size)
+		return true;
+	else
+		return false;
+}
+
+/* Routine to write tuple in local queue. */
+static shm_mq_result
+write_in_local_queue(local_mq *lq, shm_mq_iovec *iov)
+{
+	uint64		bytes_written,
+				nbytes,
+				tuple_size;
+	Size		chunksize;
+	int			i;
+
+	tuple_size = sizeof(Size) + iov->len;
+	nbytes = 0;
+	bytes_written = 0;
+
+	/* Compute total size of write. */
+	for (i = 0; i < 1; ++i)
+		nbytes += iov[i].len;
+
+	local_mq_send_bytes(lq, sizeof(Size), ((char *) &nbytes), &bytes_written);
+
+	chunksize = iov[0].len;
+	local_mq_send_bytes(lq, chunksize, &iov[0].data[0], &bytes_written);
+
+	Assert(bytes_written > 0);
+	Assert(bytes_written == tuple_size);
+	return SHM_MQ_SUCCESS;
+}
+
+/* Routine to pass a batch of tuples from local to shared queue in one go */
+static shm_mq_result
+copy_local_to_shared(local_mq *lq, shm_mq_handle *mqh, bool nowait)
+{
+	uint64		to_read,
+				bytes_read;
+	Size		read_offset,
+				available,
+				used;
+	char	   *data;
+	shm_mq_result res;
+
+	bytes_read = 0;
+
+	if (shm_mq_is_detached(mqh->mqh_queue))
+		return SHM_MQ_DETACHED;
+
+	used = lq->mq_bytes_written - lq->mq_bytes_read;
+	Assert(used <= lq->mq_ring_size);
+	Assert(lq->mq_bytes_read <= lq->mq_bytes_written);
+	read_offset = lq->mq_bytes_read % lq->mq_ring_size;
+	available = space_in_shm(mqh->mqh_queue);
+
+	/* always read data in aligned form */
+	to_read = MAXALIGN_DOWN(Min(used, available));
+
+	/*
+	 * if the amount of data to be sent from the local queue involves wrapping of
+	 * local queue, then send only the data till the end of queue right now
+	 * and rest later.
+	 */
+	if (read_offset + to_read > lq->mq_ring_size)
+		to_read = lq->mq_ring_size - read_offset;
+
+	data = &(lq->mq_ring[lq->mq_ring_offset + read_offset]);
+	res = shm_mq_send_bytes(mqh, to_read, data, nowait, &bytes_read);
+
+	if (res != SHM_MQ_SUCCESS)
+		return res;
+
+	Assert(bytes_read == to_read);
+	lq->mq_bytes_read += bytes_read;
+	shm_mq_notify_receiver(mqh->mqh_queue);
+
+	return res;
+}
+
+/*
+ * This is the function which actually writes the tuple in the local_queue,
+ * it is same as shm_mq_send_bytes is for shm_mq.
+ */
+static void
+local_mq_send_bytes(local_mq *mq, Size nbytes, const void *data, Size *bytes_written)
+{
+	uint64		used;
+	Size		ringsize = mq->mq_ring_size;
+	Size		available,
+				sent = 0,
+				sendnow;
+
+	uint64		rb;
+
+	while (sent < nbytes)
+	{
+		/* Compute number of ring buffer bytes used and available. */
+		rb = mq->mq_bytes_read;
+		Assert(mq->mq_bytes_written >= rb);
+		used = mq->mq_bytes_written - rb;
+		Assert(used <= ringsize);
+		available = Min(ringsize - used, nbytes - sent);
+
+		if (available == 0)
+			elog(ERROR, "local queue full, this should never be reached");
+
+		else
+		{
+			Size		offset = mq->mq_bytes_written % (uint64) ringsize;
+
+			sendnow = Min(available, ringsize - offset);
+
+			/* Write as much data as we can via a single memcpy(). */
+			memcpy(&mq->mq_ring[mq->mq_ring_offset + offset], (char *) data + sent, sendnow);
+			sent += sendnow;
+
+			/*
+			 * Update count of bytes written, with alignment padding.  Note
+			 * that this will never actually insert any padding except at the
+			 * end of a run of bytes, because the buffer size is a multiple of
+			 * MAXIMUM_ALIGNOF, and each read is as well.
+			 */
+			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
+			mq->mq_bytes_written += MAXALIGN(sendnow);
+		}
+	}
+	*bytes_written += sent;
+}
+
+/*
+ * Empty the local queue by copying all the data from local to shared queue.
+ * This is required before shutdown of worker.
+ */
+void
+empty_queue(shm_mq_handle *mqh)
+{
+	local_mq   *lq;
+	uint64		read,
+				written;
+
+	lq = mqh->mqh_local_queue;
+
+	if (lq == NULL || lq->mq_bytes_written == 0)
+		return;
+
+	read = lq->mq_bytes_read;
+	written = lq->mq_bytes_written;
+
+	while (written > read && !shm_mq_is_detached(mqh->mqh_queue))
+	{
+		copy_local_to_shared(lq, mqh, false);
+		read = lq->mq_bytes_read;
+		written = lq->mq_bytes_written;
+	}
+	/* this local queue is not required anymore, hence free the space. */
+	pfree(mqh->mqh_local_queue);
+}
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index 7709efcc48..80afb435e0 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -82,4 +82,8 @@ extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
 /* Smallest possible queue. */
 extern PGDLLIMPORT const Size shm_mq_minimum_size;
 
+/* Routines and structures required for local and shared queue type architecture */
+extern void empty_queue(shm_mq_handle *mqh);
+struct local_mq;
+typedef struct local_mq local_mq;
 #endif							/* SHM_MQ_H */