Does TupleQueueReaderNext() really need to copy its result?
Hi,
A comment in tqueue.c says that the bytes returned by shm_mq_receive()
"had better be sufficiently aligned", before assigning the pointer to
htup.t_data. Then it copies htup and returns the copy (and it did so
in the earlier version that had all the remapping stuff, too, but
sometimes it deformed it directly so it really did need to be suitably
aligned in that case IIUC).
Given that shm_mq.c proudly documents that it avoids copying the data
on the receiving side (unless it has to reconstruct a message that was
split up), and given that it promises that the pointed-to data remains
valid until your next call, it seems that it should be safe to return
a pointer to the same HeapTupleData object every time (perhaps a
member of the TupleQueueReader struct) and just adjust its t_data and
t_len members every time, so that the gather node emits tuples
directly from the shared memory queue (and then of course tell the
slot not to pfree()). Alternatively, if the value returned by
shm_mq_receive() is not really suitably aligned, then the comment is a
bit misleading.
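A minimal sketch of the idea above, assuming hypothetical stand-in types (DemoTuple, DemoReader) rather than the real HeapTupleData and shm_mq APIs: the reader owns one tuple header, repoints it on every call, and a caller that wants to keep a tuple across calls makes its own copy.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for HeapTupleData: a header pointing at the data. */
typedef struct DemoTuple
{
    size_t      len;    /* plays the role of t_len */
    const char *data;   /* plays the role of t_data */
} DemoTuple;

/* Hypothetical reader: one reusable buffer and one reusable tuple header. */
typedef struct DemoReader
{
    const char *const *msgs;    /* pretend queue contents */
    int         nmsgs;
    int         next;
    char        buf[32];        /* stands in for queue memory, reused per call */
    DemoTuple   tup;            /* the same header is returned every time */
} DemoReader;

/* Return the next tuple, valid only until the following call; NULL at end. */
static DemoTuple *
demo_reader_next(DemoReader *r)
{
    size_t      len;

    if (r->next >= r->nmsgs)
        return NULL;
    len = strlen(r->msgs[r->next]);
    assert(len < sizeof(r->buf));
    memcpy(r->buf, r->msgs[r->next], len + 1);  /* simulate a receive */
    r->next++;
    r->tup.len = len;
    r->tup.data = r->buf;
    return &r->tup;
}

/* A caller that holds a tuple across calls must copy it first. */
static char *
demo_copy_tuple(const DemoTuple *tup)
{
    char       *copy = malloc(tup->len + 1);

    if (copy != NULL)
        memcpy(copy, tup->data, tup->len + 1);
    return copy;
}
```

A consumer that emits each tuple immediately can use the returned pointer as is. One that buffers tuples would call something like demo_copy_tuple() before asking for the next one.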
--
Thomas Munro
https://enterprisedb.com
On Thu, Aug 22, 2019 at 12:08 PM Thomas Munro <thomas.munro@gmail.com> wrote:
> Given that shm_mq.c proudly documents that it avoids copying the data
> on the receiving side (unless it has to reconstruct a message that was
> split up), and given that it promises that the pointed-to data remains
> valid until your next call, it seems that it should be safe to return
> a pointer to the same HeapTupleData object every time (perhaps a
> member of the TupleQueueReader struct) and just adjust its t_data and
> t_len members every time, so that the gather node emits tuples
> directly from the shared memory queue (and then of course tell the
> slot not to pfree()). Alternatively, if the value returned by
> shm_mq_receive() is not really suitably aligned, then the comment is a
> bit misleading.
Couldn't resist trying this, and it seems to work. Based on the
comment "the buffer size is a multiple of MAXIMUM_ALIGNOF, and each
read and write is as well", it should always work (though I wish
shm_mq_receive_bytes()'s documentation would discuss message alignment
explicitly if that's true). On the other hand, I doubt it makes a
difference, so this is more of a question: is this the way it was
supposed to work?
(It was ~40% faster at shunting a large SELECT * through the queue
with asserts enabled, which made me happy for a moment, but it was only
an illusion and not measurable in the noise without asserts).
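To illustrate why the quoted comment implies alignment, here is a simplified model, not the shm_mq implementation: if every write is padded to a multiple of the maximum alignment, every payload begins at an aligned offset. DEMO_ALIGNOF is an assumed stand-in for MAXIMUM_ALIGNOF, and the layout (padded length word, then padded payload) is an assumption made for the sketch.

```c
#include <assert.h>
#include <stddef.h>

/* Assumed stand-in for MAXIMUM_ALIGNOF; the real value is platform-dependent. */
#define DEMO_ALIGNOF 8

/* Round len up to the next multiple of DEMO_ALIGNOF (a power of two). */
#define DEMO_MAXALIGN(len) \
    (((size_t) (len) + (DEMO_ALIGNOF - 1)) & ~((size_t) (DEMO_ALIGNOF - 1)))

/*
 * Simplified queue model: each message is a padded length word followed by
 * a padded payload.  Returns 1 if every payload starts at an aligned offset.
 */
static int
demo_payloads_aligned(const size_t *lens, int n)
{
    size_t      offset = 0;

    /* The rounding macro only works for power-of-two alignments. */
    assert((DEMO_ALIGNOF & (DEMO_ALIGNOF - 1)) == 0);

    for (int i = 0; i < n; i++)
    {
        offset += DEMO_MAXALIGN(sizeof(size_t));    /* length word */
        if (offset % DEMO_ALIGNOF != 0)
            return 0;
        offset += DEMO_MAXALIGN(lens[i]);           /* payload plus padding */
    }
    return 1;
}
```

The model ignores ring wraparound, where a split message has to be reassembled in a separate buffer.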
--
Thomas Munro
https://enterprisedb.com
Attachments:
0001-Avoid-unnecessary-copying-in-tqueue.c.patch (application/octet-stream)
From 014b5e85761550df78897b18e52493fb7cc07a7d Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 24 Aug 2019 14:09:00 +1200
Subject: [PATCH] Avoid unnecessary copying in tqueue.c.
When receiving a tuple, don't make a new copy. Instead, reuse
the same HeapTupleData object and have it point directly to the
data returned by shm_mq_receive_bytes(), which in turn is valid
until the next call. Client code that wants the tuple to live
longer than the next call to TupleQueueReaderNext() should make
its own copy.
Gather can emit each tuple it reads directly. GatherMerge
needs to make an extra copy because it reads ahead, invalidating
pointers it has buffered.
Discussion: https://postgr.es/m/CA%2BhUKG%2B8T_ggoUTAE-U%3DA%2BOcPc4%3DB0nPPHcSfffuQhvXXjML6w%40mail.gmail.com
---
src/backend/executor/nodeGather.c | 2 +-
src/backend/executor/nodeGatherMerge.c | 6 +++++-
src/backend/executor/tqueue.c | 22 +++++++++++-----------
3 files changed, 17 insertions(+), 13 deletions(-)
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 69d5a1f239..41a7d28e1c 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -280,7 +280,7 @@ gather_getnext(GatherState *gatherstate)
{
ExecStoreHeapTuple(tup, /* tuple to store */
fslot, /* slot to store the tuple */
- true); /* pfree tuple when done with it */
+ false); /* don't pfree tuple when done */
return fslot;
}
}
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 6ef128e2ab..77f09d0a8a 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -732,7 +732,11 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
reader = gm_state->reader[nreader - 1];
tup = TupleQueueReaderNext(reader, nowait, done);
- return tup;
+ /*
+ * Since we'll be holding onto these, we need make a copy. The tuple we
+ * just read is only valid until the next read.
+ */
+ return heap_copytuple(tup);
}
/*
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 2b27b41850..adf82e2141 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -43,6 +43,7 @@ typedef struct TQueueDestReceiver
struct TupleQueueReader
{
shm_mq_handle *queue; /* shm_mq to receive from */
+ HeapTupleData htup; /* used for returned tuples */
};
/*
@@ -140,6 +141,8 @@ CreateTupleQueueReader(shm_mq_handle *handle)
{
TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
+ ItemPointerSetInvalid(&reader->htup.t_self);
+ reader->htup.t_tableOid = InvalidOid;
reader->queue = handle;
return reader;
@@ -164,9 +167,9 @@ DestroyTupleQueueReader(TupleQueueReader *reader)
* nowait = true and no tuple is ready to return. *done, if not NULL,
* is set to true when there are no remaining tuples and otherwise to false.
*
- * The returned tuple, if any, is allocated in CurrentMemoryContext.
- * Note that this routine must not leak memory! (We used to allow that,
- * but not any more.)
+ * The returned tuple, if any, is either in shared memory or a private buffer
+ * and remains valid until the next call. It should not be freed by the
+ * caller.
*
* Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
* accumulate bytes from a partially-read message, so it's useful to call
@@ -175,7 +178,6 @@ DestroyTupleQueueReader(TupleQueueReader *reader)
HeapTuple
TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
{
- HeapTupleData htup;
shm_mq_result result;
Size nbytes;
void *data;
@@ -200,13 +202,11 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
Assert(result == SHM_MQ_SUCCESS);
/*
- * Set up a dummy HeapTupleData pointing to the data from the shm_mq
- * (which had better be sufficiently aligned).
+ * Point our HeapTupleData to the data from the shm_mq (which had better
+ * be sufficiently aligned).
*/
- ItemPointerSetInvalid(&htup.t_self);
- htup.t_tableOid = InvalidOid;
- htup.t_len = nbytes;
- htup.t_data = data;
+ reader->htup.t_len = nbytes;
+ reader->htup.t_data = data;
- return heap_copytuple(&htup);
+ return &reader->htup;
}
--
2.22.1
On Fri, Aug 23, 2019 at 10:22 PM Thomas Munro <thomas.munro@gmail.com> wrote:
> Couldn't resist trying this, and it seems to work. Based on the
> comment "the buffer size is a multiple of MAXIMUM_ALIGNOF, and each
> read and write is as well", it should always work (though I wish
> shm_mq_receive_bytes()'s documentation would discuss message alignment
> explicitly if that's true). On the other hand, I doubt it makes a
> difference, so this is more of a question: is this the way it was
> supposed to work?
There's a comment in htup.h which says:
* * Separately allocated tuple: t_data points to a palloc'd chunk that
* is not adjacent to the HeapTupleData. (This case is deprecated since
* it's difficult to tell apart from case #1. It should be used only in
* limited contexts where the code knows that case #1 will never apply.)
I got scared and ran away.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
Hi,
On 2019-08-26 14:09:45 -0400, Robert Haas wrote:
> On Fri, Aug 23, 2019 at 10:22 PM Thomas Munro <thomas.munro@gmail.com> wrote:
> > Couldn't resist trying this, and it seems to work. Based on the
> > comment "the buffer size is a multiple of MAXIMUM_ALIGNOF, and each
> > read and write is as well", it should always work (though I wish
> > shm_mq_receive_bytes()'s documentation would discuss message alignment
> > explicitly if that's true). On the other hand, I doubt it makes a
> > difference, so this is more of a question: is this the way it was
> > supposed to work?
> There's a comment in htup.h which says:
> * * Separately allocated tuple: t_data points to a palloc'd chunk that
> * is not adjacent to the HeapTupleData. (This case is deprecated since
> * it's difficult to tell apart from case #1. It should be used only in
> * limited contexts where the code knows that case #1 will never apply.)
> I got scared and ran away.
Perhaps this could be sidestepped by funneling through MinimalTuples
instead of HeapTuples. Afaict that should always be sufficient, because
all system column accesses ought to happen below (including being
projected into a separate column, if needed above). With the added
benefit of needing less space, of course.
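A rough sketch of why a self-describing tuple format suits a message queue, using a hypothetical DemoMinimalTuple rather than PostgreSQL's MinimalTupleData: when the length is the first field of the message itself, the receiver can interpret the received bytes in place, with no separate header to fill in.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical self-describing tuple: the length is stored in the message. */
typedef struct DemoMinimalTuple
{
    uint32_t    t_len;      /* total size in bytes, header included */
    char        payload[];  /* flexible array member holding the data */
} DemoMinimalTuple;

/* Sender side: build one contiguous chunk that can be sent as is. */
static DemoMinimalTuple *
demo_form_tuple(const char *bytes, uint32_t nbytes)
{
    uint32_t    total = (uint32_t) sizeof(DemoMinimalTuple) + nbytes;
    DemoMinimalTuple *tup = malloc(total);

    if (tup == NULL)
        return NULL;
    tup->t_len = total;
    memcpy(tup->payload, bytes, nbytes);
    return tup;
}

/*
 * Receiver side: treat the received bytes as a tuple without copying.
 * The data pointer must be suitably aligned for DemoMinimalTuple.
 */
static const DemoMinimalTuple *
demo_receive(const void *data, size_t nbytes)
{
    const DemoMinimalTuple *tup = data;

    assert(tup->t_len == nbytes);   /* message size must match the header */
    return tup;
}
```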
Greetings,
Andres Freund
On Tue, Aug 27, 2019 at 6:35 AM Andres Freund <andres@anarazel.de> wrote:
> On 2019-08-26 14:09:45 -0400, Robert Haas wrote:
> > There's a comment in htup.h which says:
> > * * Separately allocated tuple: t_data points to a palloc'd chunk that
> > * is not adjacent to the HeapTupleData. (This case is deprecated since
> > * it's difficult to tell apart from case #1. It should be used only in
> > * limited contexts where the code knows that case #1 will never apply.)
> > I got scared and ran away.
> Perhaps this could be sidestepped by funneling through MinimalTuples
> instead of HeapTuples. Afaict that should always be sufficient, because
> all system column accesses ought to happen below (including being
> projected into a separate column, if needed above). With the added
> benefit of needing less space, of course.
I tried that out (attached). That makes various simple tests like
this go 10%+ faster on my development machine:
create table s as select generate_series(1, 50000000)::int i,
'hello world' a,
'this is a message' b,
42 c;
select pg_prewarm('s');
set force_parallel_mode = on;
explain analyze select * from s;
PS It looks like the following load of mq_ring_size might be running
a little hot due to false sharing with the atomic counters:
if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
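For readers unfamiliar with false sharing, here is a hedged sketch of one common mitigation, not a proposed change to shm_mq: place the frequently updated atomic counters and the read-mostly size field on separate cache lines. The DemoQueue layout and the 64-byte line size are assumptions made for illustration.

```c
#include <assert.h>
#include <stdalign.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed cache line size; real hardware varies. */
#define DEMO_CACHE_LINE 64

/* Hypothetical queue header with each hot field on its own cache line. */
typedef struct DemoQueue
{
    alignas(DEMO_CACHE_LINE) _Atomic uint64_t bytes_read;      /* receiver writes */
    alignas(DEMO_CACHE_LINE) _Atomic uint64_t bytes_written;   /* sender writes */
    alignas(DEMO_CACHE_LINE) size_t ring_size;                 /* read-mostly */
} DemoQueue;

/* The struct occupies a whole number of cache lines. */
static_assert(sizeof(DemoQueue) % DEMO_CACHE_LINE == 0,
              "DemoQueue should be padded to whole cache lines");

/* Returns 1 if two byte offsets fall on the same cache line. */
static int
demo_same_cache_line(size_t off_a, size_t off_b)
{
    return off_a / DEMO_CACHE_LINE == off_b / DEMO_CACHE_LINE;
}
```

With this layout, loading ring_size does not contend with stores to either counter, at the cost of a larger header.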
Attachments:
0001-Use-MinimalTuple-for-tuple-queues.patch (text/x-patch)
From 0ba21ee67c9e8f2be404fa7e16d30a815310c52a Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Thu, 14 May 2020 19:08:50 +1200
Subject: [PATCH] Use MinimalTuple for tuple queues.
This saves 8 bytes per tuple in the queue, and avoids the need to copy
and free tuples on the receiving side.
Gather can emit the returned MinimalTuple directly, but GatherMerge now
needs to make an explicit copy because it buffers multiple tuples at a
time.
Discussion: https://postgr.es/m/CA%2BhUKG%2B8T_ggoUTAE-U%3DA%2BOcPc4%3DB0nPPHcSfffuQhvXXjML6w%40mail.gmail.com
---
src/backend/executor/nodeGather.c | 16 +++++------
src/backend/executor/nodeGatherMerge.c | 40 ++++++++++++++------------
src/backend/executor/tqueue.c | 30 +++++++++----------
src/include/executor/tqueue.h | 4 +--
4 files changed, 46 insertions(+), 44 deletions(-)
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 6b8ed867d5..a01b46af14 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -46,7 +46,7 @@
static TupleTableSlot *ExecGather(PlanState *pstate);
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
-static HeapTuple gather_readnext(GatherState *gatherstate);
+static MinimalTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);
@@ -120,7 +120,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
* Initialize funnel slot to same tuple descriptor as outer plan.
*/
gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
- &TTSOpsHeapTuple);
+ &TTSOpsMinimalTuple);
/*
* Gather doesn't support checking a qual (it's always more efficient to
@@ -266,7 +266,7 @@ gather_getnext(GatherState *gatherstate)
PlanState *outerPlan = outerPlanState(gatherstate);
TupleTableSlot *outerTupleSlot;
TupleTableSlot *fslot = gatherstate->funnel_slot;
- HeapTuple tup;
+ MinimalTuple tup;
while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
{
@@ -278,9 +278,9 @@ gather_getnext(GatherState *gatherstate)
if (HeapTupleIsValid(tup))
{
- ExecStoreHeapTuple(tup, /* tuple to store */
- fslot, /* slot to store the tuple */
- true); /* pfree tuple when done with it */
+ ExecStoreMinimalTuple(tup, /* tuple to store */
+ fslot, /* slot to store the tuple */
+ false); /* don't pfree tuple */
return fslot;
}
}
@@ -308,7 +308,7 @@ gather_getnext(GatherState *gatherstate)
/*
* Attempt to read a tuple from one of our parallel workers.
*/
-static HeapTuple
+static MinimalTuple
gather_readnext(GatherState *gatherstate)
{
int nvisited = 0;
@@ -316,7 +316,7 @@ gather_readnext(GatherState *gatherstate)
for (;;)
{
TupleQueueReader *reader;
- HeapTuple tup;
+ MinimalTuple tup;
bool readerdone;
/* Check for async events, particularly messages from workers. */
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 317ddb4ae2..47129344f3 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -45,7 +45,7 @@
*/
typedef struct GMReaderTupleBuffer
{
- HeapTuple *tuple; /* array of length MAX_TUPLE_STORE */
+ MinimalTuple *tuple; /* array of length MAX_TUPLE_STORE */
int nTuples; /* number of tuples currently stored */
int readCounter; /* index of next tuple to extract */
bool done; /* true if reader is known exhausted */
@@ -54,8 +54,8 @@ typedef struct GMReaderTupleBuffer
static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
static int32 heap_compare_slots(Datum a, Datum b, void *arg);
static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
-static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
- bool nowait, bool *done);
+static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
+ bool nowait, bool *done);
static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
static void gather_merge_setup(GatherMergeState *gm_state);
static void gather_merge_init(GatherMergeState *gm_state);
@@ -419,12 +419,12 @@ gather_merge_setup(GatherMergeState *gm_state)
{
/* Allocate the tuple array with length MAX_TUPLE_STORE */
gm_state->gm_tuple_buffers[i].tuple =
- (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
+ (MinimalTuple *) palloc0(sizeof(MinimalTuple) * MAX_TUPLE_STORE);
/* Initialize tuple slot for worker */
gm_state->gm_slots[i + 1] =
ExecInitExtraTupleSlot(gm_state->ps.state, gm_state->tupDesc,
- &TTSOpsHeapTuple);
+ &TTSOpsMinimalTuple);
}
/* Allocate the resources for the merge */
@@ -533,7 +533,7 @@ gather_merge_clear_tuples(GatherMergeState *gm_state)
GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];
while (tuple_buffer->readCounter < tuple_buffer->nTuples)
- heap_freetuple(tuple_buffer->tuple[tuple_buffer->readCounter++]);
+ pfree(tuple_buffer->tuple[tuple_buffer->readCounter++]);
ExecClearTuple(gm_state->gm_slots[i + 1]);
}
@@ -613,13 +613,13 @@ load_tuple_array(GatherMergeState *gm_state, int reader)
/* Try to fill additional slots in the array. */
for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
{
- HeapTuple tuple;
+ MinimalTuple tuple;
tuple = gm_readnext_tuple(gm_state,
reader,
true,
&tuple_buffer->done);
- if (!HeapTupleIsValid(tuple))
+ if (!tuple)
break;
tuple_buffer->tuple[i] = tuple;
tuple_buffer->nTuples++;
@@ -637,7 +637,7 @@ static bool
gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
{
GMReaderTupleBuffer *tuple_buffer;
- HeapTuple tup;
+ MinimalTuple tup;
/*
* If we're being asked to generate a tuple from the leader, then we just
@@ -687,7 +687,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
reader,
nowait,
&tuple_buffer->done);
- if (!HeapTupleIsValid(tup))
+ if (!tup)
return false;
/*
@@ -697,13 +697,13 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
load_tuple_array(gm_state, reader);
}
- Assert(HeapTupleIsValid(tup));
+ Assert(tup);
/* Build the TupleTableSlot for the given tuple */
- ExecStoreHeapTuple(tup, /* tuple to store */
- gm_state->gm_slots[reader], /* slot in which to store
- * the tuple */
- true); /* pfree tuple when done with it */
+ ExecStoreMinimalTuple(tup, /* tuple to store */
+ gm_state->gm_slots[reader], /* slot in which to store
+ * the tuple */
+ true); /* pfree tuple when done with it */
return true;
}
@@ -711,12 +711,12 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
/*
* Attempt to read a tuple from given worker.
*/
-static HeapTuple
+static MinimalTuple
gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
bool *done)
{
TupleQueueReader *reader;
- HeapTuple tup;
+ MinimalTuple tup;
/* Check for async events, particularly messages from workers. */
CHECK_FOR_INTERRUPTS();
@@ -732,7 +732,11 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
reader = gm_state->reader[nreader - 1];
tup = TupleQueueReaderNext(reader, nowait, done);
- return tup;
+ /*
+ * Since we'll be buffering these across multiple calls, we need to make a
+ * copy.
+ */
+ return tup ? heap_copy_minimal_tuple(tup) : NULL;
}
/*
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index e5656fbfac..30a264ebea 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -54,16 +54,16 @@ static bool
tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
{
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
- HeapTuple tuple;
+ MinimalTuple tuple;
shm_mq_result result;
bool should_free;
/* Send the tuple itself. */
- tuple = ExecFetchSlotHeapTuple(slot, true, &should_free);
- result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
+ tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
+ result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false);
if (should_free)
- heap_freetuple(tuple);
+ pfree(tuple);
/* Check for failure. */
if (result == SHM_MQ_DETACHED)
@@ -164,18 +164,18 @@ DestroyTupleQueueReader(TupleQueueReader *reader)
* nowait = true and no tuple is ready to return. *done, if not NULL,
* is set to true when there are no remaining tuples and otherwise to false.
*
- * The returned tuple, if any, is allocated in CurrentMemoryContext.
- * Note that this routine must not leak memory! (We used to allow that,
- * but not any more.)
+ * The returned tuple, if any, is either in shared memory or a private buffer
+ * and should not be freed. The pointer is invalid after the next call to
+ * TupleQueueReaderNext().
*
* Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
* accumulate bytes from a partially-read message, so it's useful to call
* this with nowait = true even if nothing is returned.
*/
-HeapTuple
+MinimalTuple
TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
{
- HeapTupleData htup;
+ MinimalTuple tuple;
shm_mq_result result;
Size nbytes;
void *data;
@@ -200,13 +200,11 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
Assert(result == SHM_MQ_SUCCESS);
/*
- * Set up a dummy HeapTupleData pointing to the data from the shm_mq
- * (which had better be sufficiently aligned).
+ * Return a pointer to the queue memory directly (which had better be
+ * sufficiently aligned).
*/
- ItemPointerSetInvalid(&htup.t_self);
- htup.t_tableOid = InvalidOid;
- htup.t_len = nbytes;
- htup.t_data = data;
+ tuple = (MinimalTuple) data;
+ Assert(tuple->t_len == nbytes);
- return heap_copytuple(&htup);
+ return tuple;
}
diff --git a/src/include/executor/tqueue.h b/src/include/executor/tqueue.h
index 93655ef6bd..264eb56641 100644
--- a/src/include/executor/tqueue.h
+++ b/src/include/executor/tqueue.h
@@ -26,7 +26,7 @@ extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle);
/* Use these to receive tuples from a shm_mq. */
extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle);
extern void DestroyTupleQueueReader(TupleQueueReader *reader);
-extern HeapTuple TupleQueueReaderNext(TupleQueueReader *reader,
- bool nowait, bool *done);
+extern MinimalTuple TupleQueueReaderNext(TupleQueueReader *reader,
+ bool nowait, bool *done);
#endif /* TQUEUE_H */
--
2.20.1
On Thu, May 14, 2020 at 10:55 PM Thomas Munro <thomas.munro@gmail.com> wrote:
> On Tue, Aug 27, 2019 at 6:35 AM Andres Freund <andres@anarazel.de> wrote:
> > Perhaps this could be sidestepped by funneling through MinimalTuples
> > instead of HeapTuples. Afaict that should always be sufficient, because
> > all system column accesses ought to happen below (including being
> > projected into a separate column, if needed above). With the added
> > benefit of needing less space, of course.
Right, create_gather[_merge]_plan() does create_plan_recurse(...,
CP_EXACT_TLIST). Here's a new version that updates the comment there
to note that this is not merely a good idea but a requirement, due to
the MinimalTuple conveyance. (I think there may be another reason the
system columns are projected away even without that, but saying so
explicitly and documenting it seems useful either way).
> I tried that out (attached). That makes various simple tests like
> this go 10%+ faster on my development machine:
I registered this patch as https://commitfest.postgresql.org/28/2560/
in case someone would like to review it.
Attachments:
v2-0001-Use-MinimalTuple-for-tuple-queues.patch (text/x-patch)
From 103c74c34f0d0d97d162c38d34cd720774325b25 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Thu, 14 May 2020 19:08:50 +1200
Subject: [PATCH v2] Use MinimalTuple for tuple queues.
This saves 8 bytes per tuple in the queue, and avoids the need to copy
and free tuples on the receiving side.
Gather can emit the returned MinimalTuple directly, but GatherMerge now
needs to make an explicit copy because it buffers multiple tuples at a
time.
Discussion: https://postgr.es/m/CA%2BhUKG%2B8T_ggoUTAE-U%3DA%2BOcPc4%3DB0nPPHcSfffuQhvXXjML6w%40mail.gmail.com
---
src/backend/executor/nodeGather.c | 16 +++++-----
src/backend/executor/nodeGatherMerge.c | 40 ++++++++++++++-----------
src/backend/executor/tqueue.c | 30 +++++++++----------
src/backend/optimizer/plan/createplan.c | 8 +++--
src/include/executor/tqueue.h | 4 +--
5 files changed, 51 insertions(+), 47 deletions(-)
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 6b8ed867d5..a01b46af14 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -46,7 +46,7 @@
static TupleTableSlot *ExecGather(PlanState *pstate);
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
-static HeapTuple gather_readnext(GatherState *gatherstate);
+static MinimalTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);
@@ -120,7 +120,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
* Initialize funnel slot to same tuple descriptor as outer plan.
*/
gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
- &TTSOpsHeapTuple);
+ &TTSOpsMinimalTuple);
/*
* Gather doesn't support checking a qual (it's always more efficient to
@@ -266,7 +266,7 @@ gather_getnext(GatherState *gatherstate)
PlanState *outerPlan = outerPlanState(gatherstate);
TupleTableSlot *outerTupleSlot;
TupleTableSlot *fslot = gatherstate->funnel_slot;
- HeapTuple tup;
+ MinimalTuple tup;
while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
{
@@ -278,9 +278,9 @@ gather_getnext(GatherState *gatherstate)
if (HeapTupleIsValid(tup))
{
- ExecStoreHeapTuple(tup, /* tuple to store */
- fslot, /* slot to store the tuple */
- true); /* pfree tuple when done with it */
+ ExecStoreMinimalTuple(tup, /* tuple to store */
+ fslot, /* slot to store the tuple */
+ false); /* don't pfree tuple */
return fslot;
}
}
@@ -308,7 +308,7 @@ gather_getnext(GatherState *gatherstate)
/*
* Attempt to read a tuple from one of our parallel workers.
*/
-static HeapTuple
+static MinimalTuple
gather_readnext(GatherState *gatherstate)
{
int nvisited = 0;
@@ -316,7 +316,7 @@ gather_readnext(GatherState *gatherstate)
for (;;)
{
TupleQueueReader *reader;
- HeapTuple tup;
+ MinimalTuple tup;
bool readerdone;
/* Check for async events, particularly messages from workers. */
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 317ddb4ae2..47129344f3 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -45,7 +45,7 @@
*/
typedef struct GMReaderTupleBuffer
{
- HeapTuple *tuple; /* array of length MAX_TUPLE_STORE */
+ MinimalTuple *tuple; /* array of length MAX_TUPLE_STORE */
int nTuples; /* number of tuples currently stored */
int readCounter; /* index of next tuple to extract */
bool done; /* true if reader is known exhausted */
@@ -54,8 +54,8 @@ typedef struct GMReaderTupleBuffer
static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
static int32 heap_compare_slots(Datum a, Datum b, void *arg);
static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
-static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
- bool nowait, bool *done);
+static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
+ bool nowait, bool *done);
static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
static void gather_merge_setup(GatherMergeState *gm_state);
static void gather_merge_init(GatherMergeState *gm_state);
@@ -419,12 +419,12 @@ gather_merge_setup(GatherMergeState *gm_state)
{
/* Allocate the tuple array with length MAX_TUPLE_STORE */
gm_state->gm_tuple_buffers[i].tuple =
- (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
+ (MinimalTuple *) palloc0(sizeof(MinimalTuple) * MAX_TUPLE_STORE);
/* Initialize tuple slot for worker */
gm_state->gm_slots[i + 1] =
ExecInitExtraTupleSlot(gm_state->ps.state, gm_state->tupDesc,
- &TTSOpsHeapTuple);
+ &TTSOpsMinimalTuple);
}
/* Allocate the resources for the merge */
@@ -533,7 +533,7 @@ gather_merge_clear_tuples(GatherMergeState *gm_state)
GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];
while (tuple_buffer->readCounter < tuple_buffer->nTuples)
- heap_freetuple(tuple_buffer->tuple[tuple_buffer->readCounter++]);
+ pfree(tuple_buffer->tuple[tuple_buffer->readCounter++]);
ExecClearTuple(gm_state->gm_slots[i + 1]);
}
@@ -613,13 +613,13 @@ load_tuple_array(GatherMergeState *gm_state, int reader)
/* Try to fill additional slots in the array. */
for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
{
- HeapTuple tuple;
+ MinimalTuple tuple;
tuple = gm_readnext_tuple(gm_state,
reader,
true,
&tuple_buffer->done);
- if (!HeapTupleIsValid(tuple))
+ if (!tuple)
break;
tuple_buffer->tuple[i] = tuple;
tuple_buffer->nTuples++;
@@ -637,7 +637,7 @@ static bool
gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
{
GMReaderTupleBuffer *tuple_buffer;
- HeapTuple tup;
+ MinimalTuple tup;
/*
* If we're being asked to generate a tuple from the leader, then we just
@@ -687,7 +687,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
reader,
nowait,
&tuple_buffer->done);
- if (!HeapTupleIsValid(tup))
+ if (!tup)
return false;
/*
@@ -697,13 +697,13 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
load_tuple_array(gm_state, reader);
}
- Assert(HeapTupleIsValid(tup));
+ Assert(tup);
/* Build the TupleTableSlot for the given tuple */
- ExecStoreHeapTuple(tup, /* tuple to store */
- gm_state->gm_slots[reader], /* slot in which to store
- * the tuple */
- true); /* pfree tuple when done with it */
+ ExecStoreMinimalTuple(tup, /* tuple to store */
+ gm_state->gm_slots[reader], /* slot in which to store
+ * the tuple */
+ true); /* pfree tuple when done with it */
return true;
}
@@ -711,12 +711,12 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
/*
* Attempt to read a tuple from given worker.
*/
-static HeapTuple
+static MinimalTuple
gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
bool *done)
{
TupleQueueReader *reader;
- HeapTuple tup;
+ MinimalTuple tup;
/* Check for async events, particularly messages from workers. */
CHECK_FOR_INTERRUPTS();
@@ -732,7 +732,11 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
reader = gm_state->reader[nreader - 1];
tup = TupleQueueReaderNext(reader, nowait, done);
- return tup;
+ /*
+ * Since we'll be buffering these across multiple calls, we need to make a
+ * copy.
+ */
+ return tup ? heap_copy_minimal_tuple(tup) : NULL;
}
/*
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index e5656fbfac..30a264ebea 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -54,16 +54,16 @@ static bool
tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
{
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
- HeapTuple tuple;
+ MinimalTuple tuple;
shm_mq_result result;
bool should_free;
/* Send the tuple itself. */
- tuple = ExecFetchSlotHeapTuple(slot, true, &should_free);
- result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
+ tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
+ result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false);
if (should_free)
- heap_freetuple(tuple);
+ pfree(tuple);
/* Check for failure. */
if (result == SHM_MQ_DETACHED)
@@ -164,18 +164,18 @@ DestroyTupleQueueReader(TupleQueueReader *reader)
* nowait = true and no tuple is ready to return. *done, if not NULL,
* is set to true when there are no remaining tuples and otherwise to false.
*
- * The returned tuple, if any, is allocated in CurrentMemoryContext.
- * Note that this routine must not leak memory! (We used to allow that,
- * but not any more.)
+ * The returned tuple, if any, is either in shared memory or a private buffer
+ * and should not be freed. The pointer is invalid after the next call to
+ * TupleQueueReaderNext().
*
* Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
* accumulate bytes from a partially-read message, so it's useful to call
* this with nowait = true even if nothing is returned.
*/
-HeapTuple
+MinimalTuple
TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
{
- HeapTupleData htup;
+ MinimalTuple tuple;
shm_mq_result result;
Size nbytes;
void *data;
@@ -200,13 +200,11 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
Assert(result == SHM_MQ_SUCCESS);
/*
- * Set up a dummy HeapTupleData pointing to the data from the shm_mq
- * (which had better be sufficiently aligned).
+ * Return a pointer to the queue memory directly (which had better be
+ * sufficiently aligned).
*/
- ItemPointerSetInvalid(&htup.t_self);
- htup.t_tableOid = InvalidOid;
- htup.t_len = nbytes;
- htup.t_data = data;
+ tuple = (MinimalTuple) data;
+ Assert(tuple->t_len == nbytes);
- return heap_copytuple(&htup);
+ return tuple;
}
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index eb9543f6ad..c0be65409e 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1730,8 +1730,10 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path)
List *tlist;
/*
- * Although the Gather node can project, we prefer to push down such work
- * to its child node, so demand an exact tlist from the child.
+ * Push projection down to the child node. That way, the projection work
+ * is parallelized, and there can be no system columns in the result (they
+ * can't travel through a tuple queue because it uses MinimalTuple
+ * representation).
*/
subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST);
@@ -1766,7 +1768,7 @@ create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path)
List *pathkeys = best_path->path.pathkeys;
List *tlist = build_path_tlist(root, &best_path->path);
- /* As with Gather, it's best to project away columns in the workers. */
+ /* As with Gather, project away columns in the workers. */
subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST);
/* Create a shell for a GatherMerge plan. */
diff --git a/src/include/executor/tqueue.h b/src/include/executor/tqueue.h
index 93655ef6bd..264eb56641 100644
--- a/src/include/executor/tqueue.h
+++ b/src/include/executor/tqueue.h
@@ -26,7 +26,7 @@ extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle);
/* Use these to receive tuples from a shm_mq. */
extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle);
extern void DestroyTupleQueueReader(TupleQueueReader *reader);
-extern HeapTuple TupleQueueReaderNext(TupleQueueReader *reader,
- bool nowait, bool *done);
+extern MinimalTuple TupleQueueReaderNext(TupleQueueReader *reader,
+ bool nowait, bool *done);
#endif /* TQUEUE_H */
--
2.20.1
Hi Thomas,
+1 to the idea! I ran some experiments on both of your patches.
I could reproduce the speed gain that you saw for a plan with a simple
parallel sequential scan. However, I got no gain at all for a parallel
hash join and parallel agg query.
-----------------------------------------------------------------------
select pg_prewarm('lineitem');
-- lineitem is 17G. (TPCH scale = 20). shared_buffers = 30G
explain analyze select * from lineitem;
[w/o any patch] 99s
[w/ first patch] 89s
[w/ last minimal tuple patch] 79s
-----------------------------------------------------------------------
select pg_prewarm('lineitem');
-- lineitem is 17G. (TPCH scale = 20). shared_buffers = 30G
explain analyze select count(*) from lineitem;
[w/o any patch] 10s
[w/ first patch] 10s
[w/ last minimal tuple patch] 10s
-----------------------------------------------------------------------
select pg_prewarm('lineitem');
select pg_prewarm('orders');
-- lineitem is 17G, orders is 4G. (TPCH scale = 20). shared_buffers = 30G
explain analyze select count(*)
from lineitem
join orders on l_orderkey = o_orderkey
where o_totalprice > 5.00;
[w/o any patch] 54s
[w/ first patch] 53s
[w/ last minimal tuple patch] 56s
-----------------------------------------------------------------------
Maybe I'm missing something, since there should be improvements with
anything that has a gather?
As for gather merge, is it possible to have a situation where the slot
input to tqueueReceiveSlot() is a heap slot (as would be the case for a
simple select *)? If yes, in those scenarios, we would be incurring an
extra call to minimal_tuple_from_heap_tuple() because of the extra call
to ExecFetchSlotMinimalTuple() inside tqueueReceiveSlot() in your patch.
And since, in a gather merge, we can't avoid the copy on the leader side
(heap_copy_minimal_tuple() inside gm_readnext_tuple()), we would be
doing extra work in that scenario. I couldn't come up with a plan that
creates a scenario like this however.
Regards,
Soumyadeep (VMware)
On Sat, Jul 11, 2020 at 1:37 PM Soumyadeep Chakraborty
<soumyadeep2007@gmail.com> wrote:
+1 to the idea! I ran some experiments on both of your patches.
Hi Soumyadeep,
Thanks for testing!
I could reproduce the speed gain that you saw for a plan with a simple
parallel sequential scan. However, I got no gain at all for a parallel
hash join and parallel agg query.
Right, it's not going to make a difference when you only send one
tuple through the queue, like COUNT(*) does.
As for gather merge, is it possible to have a situation where the slot
input to tqueueReceiveSlot() is a heap slot (as would be the case for a
simple select *)? If yes, in those scenarios, we would be incurring an
extra call to minimal_tuple_from_heap_tuple() because of the extra call
to ExecFetchSlotMinimalTuple() inside tqueueReceiveSlot() in your patch.
And since, in a gather merge, we can't avoid the copy on the leader side
(heap_copy_minimal_tuple() inside gm_readnext_tuple()), we would be
doing extra work in that scenario. I couldn't come up with a plan that
creates a scenario like this however.
Hmm. I wish we had a way to do an "in-place" copy-to-minimal-tuple
where the caller supplies the memory, with some fast protocol to get
the size right. We could use that for copying tuples into shm queues,
hash join tables etc without an extra palloc()/pfree() and double
copy.
Hey Thomas,
On Fri, Jul 10, 2020 at 7:30 PM Thomas Munro <thomas.munro@gmail.com> wrote:
I could reproduce the speed gain that you saw for a plan with a simple
parallel sequential scan. However, I got no gain at all for a parallel
hash join and parallel agg query.
Right, it's not going to make a difference when you only send one
tuple through the queue, like COUNT(*) does.
How silly of me! I should have paid more attention to the rows output
from each worker and that there was a select count(*) on the join query.
Anyway, here is a new set of results:
-----------------------------------------------------------------------
select pg_prewarm('lineitem');
select pg_prewarm('orders');
-- lineitem is 17G, orders is 4G. (TPCH scale = 20). shared_buffers = 30G
explain analyze select *
from lineitem
join orders on l_orderkey = o_orderkey
where o_totalprice > 5.00;
[w/o any patch] 637s
[w/ first patch] 635s
[w/ last minimal tuple patch] 568s
-----------------------------------------------------------------------
We do indeed get the speedup.
As for gather merge, is it possible to have a situation where the slot
input to tqueueReceiveSlot() is a heap slot (as would be the case for a
simple select *)? If yes, in those scenarios, we would be incurring an
extra call to minimal_tuple_from_heap_tuple() because of the extra call
to ExecFetchSlotMinimalTuple() inside tqueueReceiveSlot() in your patch.
And since, in a gather merge, we can't avoid the copy on the leader side
(heap_copy_minimal_tuple() inside gm_readnext_tuple()), we would be
doing extra work in that scenario. I couldn't come up with a plan that
creates a scenario like this however.
Hmm. I wish we had a way to do an "in-place" copy-to-minimal-tuple
where the caller supplies the memory, with some fast protocol to get
the size right. We could use that for copying tuples into shm queues,
hash join tables etc without an extra palloc()/pfree() and double
copy.
Do you mean that we should have an implementation for
get_minimal_tuple() for the heap AM and have it return a pointer to the
minimal tuple from the MINIMAL_TUPLE_OFFSET? And then a caller such as
tqueueReceiveSlot() will ensure that the heap tuple from which it wants
to extract the minimal tuple was allocated in the tuple queue in the
first place? If we consider that the node directly below a gather is a
SeqScan, then we could possibly, in ExecInitSeqScan() set-up the
ss_ScanTupleSlot to point to memory in the shared tuple queue?
Similarly, other ExecInit*() methods can do the same for other executor
nodes that involve parallelism? Of course, things would be slightly
different for the other use cases you mentioned (such as hash table
population).
All things considered, I think the patch in its current form should go
in. The in-place copy could be done as a separate patch. Do you
agree?
Regards,
Soumyadeep (VMware)
On Sun, Jul 12, 2020 at 7:25 AM Soumyadeep Chakraborty
<soumyadeep2007@gmail.com> wrote:
Do you mean that we should have an implementation for
get_minimal_tuple() for the heap AM and have it return a pointer to the
minimal tuple from the MINIMAL_TUPLE_OFFSET? And then a caller such as
tqueueReceiveSlot() will ensure that the heap tuple from which it wants
to extract the minimal tuple was allocated in the tuple queue in the
first place? If we consider that the node directly below a gather is a
SeqScan, then we could possibly, in ExecInitSeqScan() set-up the
ss_ScanTupleSlot to point to memory in the shared tuple queue?
Similarly, other ExecInit*() methods can do the same for other executor
nodes that involve parallelism? Of course, things would be slightly
different for
the other use cases you mentioned (such as hash table population)
What I mean is that where ExecHashTableInsert() and
tqueueReceiveSlot() do ExecFetchSlotMinimalTuple(), you usually get a
freshly allocated copy, and then you copy that again, and free it.
There may be something similar going on in tuplestore and sort code.
Perhaps we could have something like
ExecFetchSlotMinimalTupleInPlace(slot, output_buffer,
output_buffer_size) that returns a value that indicates either success
or hey-that-buffer's-too-small-I-need-N-bytes, or something like that.
That silly extra copy is something Andres pointed out to me in some
perf results involving TPCH hash joins, a couple of years ago.
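The "success or I-need-N-bytes" protocol described above can be sketched in isolation. The following is only a toy illustration, not PostgreSQL code: ToySlot, ToyTupleHeader and toy_fetch_tuple_in_place() are hypothetical names, and the "tuple" is just a length word followed by a payload. It shows the caller supplying the memory and the callee either forming the tuple there or reporting how many bytes it would need.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Toy stand-in for a slot (hypothetical; NOT PostgreSQL's TupleTableSlot). */
typedef struct ToySlot
{
    const char *payload;        /* the column data we want to ship */
    size_t      payload_len;
} ToySlot;

/* Toy tuple header (hypothetical): a length word in front of the payload. */
typedef struct ToyTupleHeader
{
    uint32_t    t_len;          /* total size, header included */
} ToyTupleHeader;

/*
 * Hypothetical in-place fetch.  Forms the tuple directly in the caller's
 * buffer when it is large enough and returns 0.  Otherwise nothing is
 * written and the number of bytes required is returned, so the caller can
 * retry with a bigger buffer.
 */
static size_t
toy_fetch_tuple_in_place(const ToySlot *slot, void *out, size_t out_size)
{
    size_t          needed = sizeof(ToyTupleHeader) + slot->payload_len;
    ToyTupleHeader  hdr;

    if (out == NULL || out_size < needed)
        return needed;          /* "that buffer's too small, I need N bytes" */

    hdr.t_len = (uint32_t) needed;
    memcpy(out, &hdr, sizeof(hdr));
    memcpy((char *) out + sizeof(hdr), slot->payload, slot->payload_len);
    return 0;                   /* success: no palloc()/pfree(), one copy */
}
```

A caller would try its existing buffer first and grow it only when a nonzero size comes back. Whether that second round trip is cheap enough is exactly the open question in this part of the thread.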
All things considered, I think the patch in its current form should go
in.
Thanks for the testing and review! Pushed.
Having the in-place copy, could be done as a separate patch? Do you
agree?
Yeah.
Hi,
I am starting a new thread that continues with the following point
that was discussed in [1].
On Fri, 17 Jul 2020 at 09:05, Thomas Munro <thomas.munro@gmail.com> wrote:
On Sun, Jul 12, 2020 at 7:25 AM Soumyadeep Chakraborty
<soumyadeep2007@gmail.com> wrote:
Do you mean that we should have an implementation for
get_minimal_tuple() for the heap AM and have it return a pointer to the
minimal tuple from the MINIMAL_TUPLE_OFFSET? And then a caller such as
tqueueReceiveSlot() will ensure that the heap tuple from which it wants
to extract the minimal tuple was allocated in the tuple queue in the
first place? If we consider that the node directly below a gather is a
SeqScan, then we could possibly, in ExecInitSeqScan() set-up the
ss_ScanTupleSlot to point to memory in the shared tuple queue?
Similarly, other ExecInit*() methods can do the same for other executor
nodes that involve parallelism? Of course, things would be slightly
different for
the other use cases you mentioned (such as hash table population)
What I mean is that where ExecHashTableInsert() and
tqueueReceiveSlot() do ExecFetchSlotMinimalTuple(), you usually get a
freshly allocated copy, and then you copy that again, and free it.
There may be something similar going on in tuplestore and sort code.
Perhaps we could have something like
ExecFetchSlotMinimalTupleInPlace(slot, output_buffer,
output_buffer_size) that returns a value that indicates either success
or hey-that-buffer's-too-small-I-need-N-bytes, or something like that.
That silly extra copy is something Andres pointed out to me in some
perf results involving TPCH hash joins, a couple of years ago.
I went ahead and tried doing this. I chose an approach where we can
return the pointer to the in-place minimal tuple data if it's a
heap/buffer/minimal tuple slot. A new function
ExecFetchSlotMinimalTupleData() returns in-place minimal tuple data.
If the slot is none of heap, buffer, or minimal tuple, it returns a copy
as usual. The receiver should not assume the data was taken directly
from a MinimalTupleData, so it should set its t_len to the number of
bytes returned. Patch attached
(0001-Avoid-redundant-tuple-copy-while-sending-tuples-to-G.patch)
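To make the pointer arithmetic concrete, here is a small stand-alone sketch of the idea. It is an illustration under stated assumptions, not the patch itself: ToyHeapHeader and ToyMinimalHeader are simplified mock layouts loosely modelled on the heap and minimal tuple headers, TOY_MAXALIGN is assumed to be 8, and all toy_* names are hypothetical. The sender hands out a pointer into the heap tuple instead of a copy, and the receiver overwrites the length word, because the bytes at that position are leftover heap header bytes rather than a valid t_len.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define TOY_MAXALIGN 8          /* assumption: 8-byte maximum alignment */

/* Mock heap tuple header (simplified, hypothetical layout). */
typedef struct ToyHeapHeader
{
    uint32_t    t_choice[3];    /* 12 bytes of transaction info */
    uint16_t    t_ctid[3];      /* 6 bytes of item pointer */
    uint16_t    t_infomask2;
    uint16_t    t_infomask;
    uint8_t     t_hoff;
} ToyHeapHeader;

/* Where a minimal tuple would start if overlaid on the heap header. */
#define TOY_MIN_OFFSET \
    ((offsetof(ToyHeapHeader, t_infomask2) - sizeof(uint32_t)) \
     / TOY_MAXALIGN * TOY_MAXALIGN)
#define TOY_MIN_PADDING \
    ((offsetof(ToyHeapHeader, t_infomask2) - sizeof(uint32_t)) \
     % TOY_MAXALIGN)

/* Mock minimal tuple header: length, padding, then the shared fields. */
typedef struct ToyMinimalHeader
{
    uint32_t    t_len;
    char        mt_padding[TOY_MIN_PADDING];
    uint16_t    t_infomask2;
    uint16_t    t_infomask;
    uint8_t     t_hoff;
} ToyMinimalHeader;

/* Sender side: return a pointer into the heap tuple, no copy made. */
static const char *
toy_minimal_data_from_heap(const char *heap_data, uint32_t heap_len,
                           uint32_t *len)
{
    *len = heap_len - (uint32_t) TOY_MIN_OFFSET;
    return heap_data + TOY_MIN_OFFSET;
}

/* Receiver side: the length word holds stale heap bytes, so set it. */
static void
toy_fix_length(char *received, uint32_t nbytes)
{
    memcpy(received, &nbytes, sizeof(nbytes));
}

/* Sanity check: the shared fields line up in both mock layouts. */
static int
toy_layouts_overlay(void)
{
    return TOY_MIN_OFFSET + offsetof(ToyMinimalHeader, t_infomask2)
        == offsetof(ToyHeapHeader, t_infomask2);
}
```

The write in toy_fix_length() is the step that later replies object to: on the real receiving side, that buffer is queue memory in shared memory.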
Thomas, I guess you had a different approach in mind when you talked
about "returning either success or
hey-that-buffer's-too-small-I-need-N-bytes". But what looks clear to
me is that avoiding the copy shows consistent improvement of 4 to 10%
for simple parallel table scans. I tried my patch on both x86_64 and
arm64, and observed this speedup on both.
create table tab as select generate_series(1, 20000000) id,
'abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz' v;
select pg_prewarm('tab'::regclass);
explain analyze select * from tab where id %2 = 0;
Times in milliseconds:
HEAD : 1833.119 1816.522 1875.648 1872.153 1834.383
Patch'ed : 1763.786 1721.160 1714.665 1719.738 1704.478
This was with the default 2 parallel workers. With 3 or 4 workers, for
the above testcase I didn't see a noticeable difference. I think, if I
double the number of rows, the difference will be noticeable. In any
case, the gain should shrink as the number of workers grows,
because the tuple copy also gets parallelized. In some scenarios,
parallel_leader_participation=off amplifies the difference.
Haven't had a chance to see if this helps any of the TPC-H queries.
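As a quick check of the figures above, the five HEAD and five patched timings can be averaged and compared. The helper names below (mean_ms, percent_reduction, measured_gain_percent) are made up for this sketch; the numbers are the ones quoted in this message. The reduction in mean execution time comes out at roughly 6.6%, inside the 4 to 10% range mentioned.

```c
#include <assert.h>
#include <stddef.h>

/* Timings in milliseconds, copied from the message above. */
static const double head_ms[] =
    {1833.119, 1816.522, 1875.648, 1872.153, 1834.383};
static const double patched_ms[] =
    {1763.786, 1721.160, 1714.665, 1719.738, 1704.478};

/* Arithmetic mean of n runs. */
static double
mean_ms(const double *runs, size_t n)
{
    double      sum = 0.0;
    size_t      i;

    assert(n > 0);
    for (i = 0; i < n; i++)
        sum += runs[i];
    return sum / (double) n;
}

/* Relative reduction in execution time, as a percentage of "before". */
static double
percent_reduction(double before, double after)
{
    return 100.0 * (before - after) / before;
}

/* Mean HEAD time versus mean patched time for the runs listed above. */
static double
measured_gain_percent(void)
{
    return percent_reduction(mean_ms(head_ms, 5), mean_ms(patched_ms, 5));
}
```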
Also attached is a patch guc_for_testing.patch that I used for testing
the gain. This patch is only for testing. Without it, comparing the
performance figures requires a server restart, and the figures shift
back and forth by 5-15 percent after each restart anyway, which
creates a lot of noise when comparing figures with and without the
fix. Use the GUC enable_fix to enable/disable the fix.
[1]: /messages/by-id/CA+hUKGLrN2M18-hACEJbNoj2sn_WoUj9rkkBeoPK7SY427pAnA@mail.gmail.com
--
Thanks,
-Amit Khandekar
Huawei Technologies
Attachments:
guc_for_testing.patch (text/x-patch; charset=US-ASCII)
From 5d19626e35e50a5630e5f1a042f7ecee6acb7c70 Mon Sep 17 00:00:00 2001
From: Amit Khandekar <amitdkhan.pg@gmail.com>
Date: Wed, 9 Sep 2020 12:03:01 +0800
Subject: [PATCH 2/2] Add guc for ease of testing speedup.
This is only for testing the performance gain. Otherwise, to compare
the performance figures, it requires server restart, and the figures
anyway shift back and forth by 5-15 percent after each restart, which
creates lot of noise when comparing figures with and without fix.
With this, we can easily see at least 4-10% difference in execution
times by setting/unsetting the GUC enable_fix.
---
src/backend/executor/tqueue.c | 12 ++++++++++++
src/backend/utils/misc/guc.c | 12 ++++++++++++
2 files changed, 24 insertions(+)
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index c70ee0f39a..7dd60b5c7e 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -45,6 +45,7 @@ struct TupleQueueReader
shm_mq_handle *queue; /* shm_mq to receive from */
};
+extern bool enable_fix;
/*
* Receive a tuple from a query, and send it to the designated shm_mq.
*
@@ -60,12 +61,23 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
bool should_free;
/* Send the minimal tuple data. */
+ if (enable_fix)
+ {
minimal_tuple_data = ExecFetchSlotMinimalTupleData(slot, &len, &should_free);
result = shm_mq_send(tqueue->queue, len, minimal_tuple_data, false);
if (should_free)
pfree(minimal_tuple_data);
+ }
+ else
+ {
+ MinimalTuple tuple;
+ tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
+ result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false);
+ if (should_free)
+ pfree(tuple);
+ }
/* Check for failure. */
if (result == SHM_MQ_DETACHED)
return false;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index de87ad6ef7..8b3f1339cb 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -138,6 +138,8 @@ extern bool trace_syncscan;
extern bool optimize_bounded_sort;
#endif
+bool enable_fix = true;
+
static int GUC_check_errcode_value;
/* global variables for check hook support */
@@ -2036,6 +2038,16 @@ static struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
+ {
+ {"enable_fix", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
+ gettext_noop("dummy"),
+ gettext_noop("dummy"),
+ GUC_EXPLAIN
+ },
+ &enable_fix,
+ true,
+ NULL, NULL, NULL
+ },
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
--
2.17.1
0001-Avoid-redundant-tuple-copy-while-sending-tuples-to-G.patch (text/x-patch; charset=US-ASCII)
From 286310e6a52af7883d20899163aaa349b479d614 Mon Sep 17 00:00:00 2001
From: Amit Khandekar <amitdkhan.pg@gmail.com>
Date: Wed, 9 Sep 2020 11:52:59 +0800
Subject: [PATCH 1/2] Avoid redundant tuple copy while sending tuples to Gather
If the tuple to be sent is a heap tuple, get the pointer to the minimal
tuple data from the heap tuple, and use this as the source data for
shm_mq_send(). This allows us to prevent a tuple copy when the slot
is a heap tuple slot.
Devise a new function ExecFetchSlotMinimalTupleData() that tries to
send an in-place minimal tuple data or returns a copy only if the
slot is something other than a heap tuple or minimal tuple slot.
This shows between 4 to 10% speed up for simple non-aggregate parallel
queries.
---
src/backend/executor/execTuples.c | 59 +++++++++++++++++++++++++++++++
src/backend/executor/tqueue.c | 17 +++++----
src/include/executor/tuptable.h | 2 ++
3 files changed, 71 insertions(+), 7 deletions(-)
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 4c90ac5236..55a6652b4e 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -1682,6 +1682,65 @@ ExecFetchSlotMinimalTuple(TupleTableSlot *slot,
}
}
+/* --------------------------------
+ * ExecFetchSlotMinimalTupleData
+ * Return minimal tuple data, or the data belonging to minimal tuple
+ * section if it's a heap tuple slot.
+ *
+ * This function is similar to ExecFetchSlotMinimalTuple(), but it goes a
+ * step further in trying to avoid redundant tuple copy. It does this by
+ * returning the in-place minimal tuple data region if it's a heap tuple.
+ * If the slot is neither a minimal tuple nor heap tuple slot, a minimal
+ * tuple copy is returned, and should_free is set to true. Callers can use
+ * this if they only want the underlying minimal tuple data.
+ * One use is where minimal tuple data is sent through the gather queues.
+ * The receiver end can then treat the data as a MinimalTupleData, but
+ * they should update it's t_len field, because the data might have
+ * originally belonged to a heap tuple rather than minimal tuple.
+ */
+char *
+ExecFetchSlotMinimalTupleData(TupleTableSlot *slot, uint32 *len,
+ bool *shouldFree)
+{
+ /*
+ * sanity checks
+ */
+ Assert(slot != NULL);
+ Assert(!TTS_EMPTY(slot));
+
+ if (slot->tts_ops->get_heap_tuple)
+ {
+ HeapTuple htuple;
+
+ if (shouldFree)
+ *shouldFree = false;
+ htuple = slot->tts_ops->get_heap_tuple(slot);
+ *len = htuple->t_len - MINIMAL_TUPLE_OFFSET;
+
+ return (char*) htuple->t_data + MINIMAL_TUPLE_OFFSET;
+ }
+ else
+ {
+ MinimalTuple mtuple;
+
+ if (slot->tts_ops->get_minimal_tuple)
+ {
+ if (shouldFree)
+ *shouldFree = false;
+ mtuple = slot->tts_ops->get_minimal_tuple(slot);
+ }
+ else
+ {
+ if (shouldFree)
+ *shouldFree = true;
+ mtuple = slot->tts_ops->copy_minimal_tuple(slot);
+ }
+ *len = mtuple->t_len;
+
+ return (char *) mtuple;
+ }
+}
+
/* --------------------------------
* ExecFetchSlotHeapTupleDatum
* Fetch the slot's tuple as a composite-type Datum.
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 30a264ebea..c70ee0f39a 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -54,16 +54,17 @@ static bool
tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
{
TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
- MinimalTuple tuple;
+ char *minimal_tuple_data;
+ uint32 len;
shm_mq_result result;
bool should_free;
- /* Send the tuple itself. */
- tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
- result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false);
+ /* Send the minimal tuple data. */
+ minimal_tuple_data = ExecFetchSlotMinimalTupleData(slot, &len, &should_free);
+ result = shm_mq_send(tqueue->queue, len, minimal_tuple_data, false);
if (should_free)
- pfree(tuple);
+ pfree(minimal_tuple_data);
/* Check for failure. */
if (result == SHM_MQ_DETACHED)
@@ -201,10 +202,12 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
/*
* Return a pointer to the queue memory directly (which had better be
- * sufficiently aligned).
+ * sufficiently aligned). Also, the sender might not have updated the t_len
+ * if the data belonged to a heap tuple rather than a minimal tuple. So
+ * update it now.
*/
tuple = (MinimalTuple) data;
- Assert(tuple->t_len == nbytes);
+ tuple->t_len = nbytes;
return tuple;
}
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index f7df70b5ab..da78929035 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -325,6 +325,8 @@ extern void ExecStoreHeapTupleDatum(Datum data, TupleTableSlot *slot);
extern HeapTuple ExecFetchSlotHeapTuple(TupleTableSlot *slot, bool materialize, bool *shouldFree);
extern MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot,
bool *shouldFree);
+extern char *ExecFetchSlotMinimalTupleData(TupleTableSlot *slot, uint32 *len,
+ bool *shouldFree);
extern Datum ExecFetchSlotHeapTupleDatum(TupleTableSlot *slot);
extern void slot_getmissingattrs(TupleTableSlot *slot, int startAttNum,
int lastAttNum);
--
2.17.1
On Wed, Sep 9, 2020 at 5:23 PM Amit Khandekar <amitdkhan.pg@gmail.com> wrote:
I went ahead and tried doing this. I chose an approach where we can
return the pointer to the in-place minimal tuple data if it's a
heap/buffer/minimal tuple slot. A new function
ExecFetchSlotMinimalTupleData() returns in-place minimal tuple data.
If the slot is none of heap, buffer, or minimal tuple, it returns a copy
as usual. The receiver should not assume the data was taken directly
from a MinimalTupleData, so it should set its t_len to the number of bytes
returned. Patch attached
(0001-Avoid-redundant-tuple-copy-while-sending-tuples-to-G.patch)
+char *
+ExecFetchSlotMinimalTupleData(TupleTableSlot *slot, uint32 *len,
+ bool *shouldFree)
Interesting approach. It's a bit of a weird interface, returning a
pointer to a non-valid MinimalTuple that requires extra tweaking after
you copy it to make it a valid one and that you're not allowed to
tweak in-place. I'd probably make that return "const void *" just for
a bit of extra documentation. I wonder if there is a way we could
make "Minimal Tuples but with the length travelling separately (and
perhaps chopped off?)" into a first-class concept... It's also a
shame to be schlepping a bunch of padding bytes around.
tuple = (MinimalTuple) data;
- Assert(tuple->t_len == nbytes);
+ tuple->t_len = nbytes;
Hmm, so you have to scribble on shared memory on the receiving side.
I wondered about a couple of different ways to share the length field
with the shm_mq envelope, but that all seems a bit too weird...
Thomas, I guess you had a different approach in mind when you said
about "returning either success or
hey-that-buffer's-too-small-I-need-N-bytes". But what looks clear to
Yeah I tried some things like that, but I wasn't satisfied with any of
them; basically the extra work involved in negotiating the size was a
bit too high. On the other hand, because my interface was "please
write a MinimalTuple here!", it had the option to *form* a
MinimalTuple directly in place, whereas your approach can only avoid
creating and destroying a temporary tuple when the source is a heap
tuple.
me is that avoiding the copy shows consistent improvement of 4 to 10%
for simple parallel table scans. I tried my patch on both x86_64 and
arm64, and observed this speedup on both.
I think that's a great validation of the goal but I hope we can figure
out a way that avoids the temporary tuple for more cases. FWIW I saw
hash self joins running a couple of percent faster with one of my
abandoned patches.
Hi,
On 2020-09-17 14:20:50 +1200, Thomas Munro wrote:
I wonder if there is a way we could make "Minimal Tuples but with the
length travelling separately (and perhaps chopped off?)" into a
first-class concept... It's also a shame to be schlepping a bunch of
padding bytes around.
There really is no justification for having MinimalTuples, as we have
them today at least, anymore. We used to rely on being able to construct
pointers to MinimalTuples that are mostly compatible with HeapTuple. But
I think there's none of those left since PG 12.
I think it'd make a bit more sense to do some steps towards having a
more suitable "minimal" tuple representation, rather than doing these
local, pretty ugly, hacks. A good way would be to just start
removing the padding, unnecessary fields etc. from MinimalTuple.
I also think that it'd be good to look at a few of the other places that
are heavily bottlenecked by MinimalTuple overhead before designing new
API around this. IIRC it's e.g. very easy to see hash joins spending a
lot of time doing MinimalTuple copies & conversions.
tuple = (MinimalTuple) data;
- Assert(tuple->t_len == nbytes);
+ tuple->t_len = nbytes;
Hmm, so you have to scribble on shared memory on the receiving side.
Ick, I would really like to avoid this.
Thomas, I guess you had a different approach in mind when you said
about "returning either success or
hey-that-buffer's-too-small-I-need-N-bytes". But what looks clear to
Yeah I tried some things like that, but I wasn't satisfied with any of
them; basically the extra work involved in negotiating the size was a
bit too high. On the other hand, because my interface was "please
write a MinimalTuple here!", it had the option to *form* a
MinimalTuple directly in place, whereas your approach can only avoid
creating and destroying a temporary tuple when the source is a heap
tuple.
There's a lot of cases where the source is a virtual slot (since we'll
often project stuff below Gather). So it'd be quite advantageous to
avoid building an unnecessary HeapTuple in those cases.
I wonder if it would be sensible to build minimal tuples using
tts_values/isnull in some cases. This might even be advantageous in
case of heap / minimal tuples, because IIRC right now the code will
materialize the slot unnecessarily. Not sure how common that is.
Greetings,
Andres Freund
On Thu, 17 Sep 2020 at 08:55, Andres Freund <andres@anarazel.de> wrote:
Hi,
On 2020-09-17 14:20:50 +1200, Thomas Munro wrote:
I wonder if there is a way we could make "Minimal Tuples but with the
length travelling separately (and perhaps chopped off?)" into a
first-class concept... It's also a shame to be schlepping a bunch of
padding bytes around.
Yeah, I think we can pass the length separately, but since the
receiver already assumes that the received data is a minimal tuple,
I thought why not skip passing this redundant component. But anyway,
if you and Andres are suggesting that being able to skip the copy is
important for virtual tuples as well, then I think the approach you
suggested (supplying allocated memory to the tuple API for
conversion) would be one of the better options available to us, if
not the only good option. Maybe I will try looking into how shm_mq
works to see if we can come up with a good solution.
There really is no justification for having MinimalTuples, as we have
them today at least, anymore. We used to rely on being able to construct
pointers to MinimalTuples that are mostly compatible with HeapTuple. But
I think there's none of those left since PG 12.
Ah ok.
I think it'd make a bit more sense to do some steps towards having a
more suitable "minimal" tuple representation, rather than doing these
local, pretty ugly, hacks. A good way would be to just start
removing the padding, unnecessary fields etc. from MinimalTuple.
So there are two things we wish to do:
1. Prevent an extra tuple-forming step before sending minimal tuple
data. Possibly devise an shm_mq API to get memory to write a tuple of a
given length, and devise something like
FormMinimalTupleDataInHere(memory_allocated_by_shm_mq) which will
write minimal tuple data.
2. Shrink the MinimalTupleData structure, because it no longer needs
the current padding etc., and substitute this new MinimalTuple
structure for the current one all over the code wherever it is
currently being used.
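Point 1 above can be sketched as a reserve/form/commit sequence. This is a toy, single-process model only: ToyQueue, toy_queue_reserve(), toy_queue_commit() and toy_form_tuple_in_here() are hypothetical names, the queue has no wraparound or locking, and nothing here is an existing shm_mq function. The point it illustrates is that the values are written once, straight into queue memory, with the length kept in the queue's own envelope.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define TOY_QUEUE_SIZE 256

/* Toy byte queue (hypothetical; no wraparound, no locking; NOT shm_mq). */
typedef struct ToyQueue
{
    char        buf[TOY_QUEUE_SIZE];
    size_t      used;           /* bytes committed so far */
} ToyQueue;

/*
 * Hypothetical reserve step: return a pointer to len writable bytes that
 * sit just after the slot for the length word, or NULL if they don't fit.
 */
static void *
toy_queue_reserve(ToyQueue *q, uint32_t len)
{
    if (q->used + sizeof(uint32_t) + len > TOY_QUEUE_SIZE)
        return NULL;
    return q->buf + q->used + sizeof(uint32_t);
}

/* Hypothetical commit step: write the length word and publish the bytes. */
static void
toy_queue_commit(ToyQueue *q, uint32_t len)
{
    memcpy(q->buf + q->used, &len, sizeof(len));
    q->used += sizeof(uint32_t) + len;
}

/* Stand-in for FormMinimalTupleDataInHere(): form the data at dest. */
static void
toy_form_tuple_in_here(void *dest, const int32_t *values, uint32_t nvalues)
{
    memcpy(dest, values, nvalues * sizeof(int32_t));
}

/* Send values with a single copy and no temporary tuple; 1 on success. */
static int
toy_send_values(ToyQueue *q, const int32_t *values, uint32_t nvalues)
{
    uint32_t    len = nvalues * (uint32_t) sizeof(int32_t);
    void       *dest = toy_queue_reserve(q, len);

    if (dest == NULL)
        return 0;               /* queue full: caller would wait or retry */
    toy_form_tuple_in_here(dest, values, nvalues);
    toy_queue_commit(q, len);
    return 1;
}
```

In this model the sender needs to know the tuple's size before reserving, which is the size-negotiation cost raised earlier in the thread.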
If we remove the unnecessary fields from the tuple data being sent to
the Gather node, then we need to form a MinimalTuple again at the
receiving end, which adds an extra tuple-forming step. So, as I
understand it, that's why you are saying we should shrink the
MinimalTupleData structure itself, in which case we will continue to
use the received new MinimalTupleData as an already-formed tuple, as
we do now.
Now, the above two things (1. and 2.) look independent to me. Suppose
we first do 1., i.e. we come up with a good way to form an in-place
MinimalTuple at the sender's end, without any change to the
MinimalTupleData. Then, when we do 2., i.e. shrink the
MinimalTupleData, we won't require any change in the
in-place tuple-forming API we wrote in 1. Just the existing
underlying function heap_form_minimal_tuple() or something similar
might need to be changed. At least that's what I feel right now.
I also think that it'd be good to look at a few of the other places that
are heavily bottlenecked by MinimalTuple overhead before designing new
API around this. IIRC it's e.g. very easy to see hash joins spending a
lot of time doing MinimalTuple copies & conversions.
Yeah, makes sense. The above FormMinimalTupleDataInHere() should be
able to be used for these other places as well. Will keep that in
mind.
Thomas, I guess you had a different approach in mind when you said
about "returning either success or
hey-that-buffer's-too-small-I-need-N-bytes". But what looks clear to
Yeah I tried some things like that, but I wasn't satisfied with any of
them; basically the extra work involved in negotiating the size was a
bit too high.
Hmm, ok. Let me see if there is any way around this.
On the other hand, because my interface was "please
write a MinimalTuple here!", it had the option to *form* a
MinimalTuple directly in place, whereas your approach can only avoid
creating and destroying a temporary tuple when the source is a heap
tuple.
True.
There's a lot of cases where the source is a virtual slot (since we'll
often project stuff below Gather). So it'd be quite advantageous to
avoid building an unnecessary HeapTuple in those cases.
Yeah right.