Multi Inserts in CREATE TABLE AS - revived patch
Hi,
I would like to propose an updated patch for multi/bulk inserts in CTAS [1]
that tries to address the review comments raised in [1]. One of the main
review comments was to calculate/estimate the tuple size to decide when to
flush. I tried to address this with a new function, GetTupleSize() (see the
patch for the implementation).
I did some testing with a custom configuration [2].
Use case 1- 100mn tuples, 2 integer columns, exec time in sec:
HEAD: *131.507* when the select part is not parallel, 128.832 when the
select part is parallel
Patch: 98.925 when the select part is not parallel, *52.901* when the
select part is parallel
Use case 2- 10mn tuples, 4 integer and 6 text columns, exec time in sec:
HEAD: *76.801* when the select part is not parallel, 66.074 when the select
part is parallel
Patch: 74.083 when the select part is not parallel, *57.739* when the
select part is parallel
Thoughts?
If the approach followed in the patch looks okay, I can work on a separate
patch for multi inserts in refresh materialized view cases.
I thank Simon Riggs for the offlist discussion.
PS: I chose to start a new thread as the previous thread [1] was closed in
the CF. I hope that's not a problem.
[1]: /messages/by-id/CAEET0ZHRWxbRUgwzUK_tOFDWx7VE2-P=xMBT6-N+gAa9WQ=xxA@mail.gmail.com
[2]: The postgresql.conf used:
shared_buffers = 40GB
synchronous_commit = off
checkpoint_timeout = 1d
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
Attachments:
v1-0001-Multi-Inserts-in-Create-Table-As.patch (application/octet-stream)
From 602e8e4c7ba80f3ebfa5870bb5887a32aa4dbc8b Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Tue, 3 Nov 2020 11:20:59 +0530
Subject: [PATCH v1] Multi Inserts in Create Table As.
This could improve performance and also benefits Create
Materialized View, which uses the Create Table As code path.
---
src/backend/commands/copy.c | 26 ++-----
src/backend/commands/createas.c | 112 ++++++++++++++++++++++++++----
src/backend/executor/execTuples.c | 66 ++++++++++++++++++
src/include/access/tableam.h | 15 ++++
src/include/executor/tuptable.h | 2 +-
5 files changed, 187 insertions(+), 34 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 115860a9d4..29fd28051e 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -240,18 +240,6 @@ typedef struct
uint64 processed; /* # of tuples processed */
} DR_copy;
-
-/*
- * No more than this many tuples per CopyMultiInsertBuffer
- *
- * Caution: Don't make this too big, as we could end up with this many
- * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
- * multiInsertBuffers list. Increasing this can cause quadratic growth in
- * memory requirements during copies into partitioned tables with a large
- * number of partitions.
- */
-#define MAX_BUFFERED_TUPLES 1000
-
/*
* Flush buffers if there are >= this many bytes, as counted by the input
* size, of tuples stored.
@@ -264,11 +252,11 @@ typedef struct
/* Stores multi-insert data related to a single relation in CopyFrom. */
typedef struct CopyMultiInsertBuffer
{
- TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+ TupleTableSlot *slots[MAX_MULTI_INSERT_TUPLES]; /* Array to store tuples */
ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
BulkInsertState bistate; /* BulkInsertState for this rel */
int nused; /* number of 'slots' containing tuples */
- uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
+ uint64 linenos[MAX_MULTI_INSERT_TUPLES]; /* Line # of tuple in copy
* stream */
} CopyMultiInsertBuffer;
@@ -2396,7 +2384,7 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
CopyMultiInsertBuffer *buffer;
buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
- memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+ memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_MULTI_INSERT_TUPLES);
buffer->resultRelInfo = rri;
buffer->bistate = GetBulkInsertState();
buffer->nused = 0;
@@ -2455,8 +2443,8 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
static inline bool
CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
{
- if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
- miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
+ if (miinfo->bufferedTuples >= MAX_MULTI_INSERT_TUPLES ||
+ miinfo->bufferedBytes >= MAX_MULTI_INSERT_BUFFERED_BYTES)
return true;
return false;
}
@@ -2574,7 +2562,7 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
FreeBulkInsertState(buffer->bistate);
/* Since we only create slots on demand, just drop the non-null ones. */
- for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+ for (i = 0; i < MAX_MULTI_INSERT_TUPLES && buffer->slots[i] != NULL; i++)
ExecDropSingleTupleTableSlot(buffer->slots[i]);
table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
@@ -2666,7 +2654,7 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
int nused = buffer->nused;
Assert(buffer != NULL);
- Assert(nused < MAX_BUFFERED_TUPLES);
+ Assert(nused < MAX_MULTI_INSERT_TUPLES);
if (buffer->slots[nused] == NULL)
buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index d53ec952d0..c4f91c8b5d 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -61,6 +61,13 @@ typedef struct
CommandId output_cid; /* cmin to insert in output tuples */
int ti_options; /* table_tuple_insert performance options */
BulkInsertState bistate; /* bulk insert state */
+ MemoryContext mi_context; /* A temporary memory context for multi insert */
+ /* Buffered slots for a multi insert batch. */
+ TupleTableSlot *mi_slots[MAX_MULTI_INSERT_TUPLES];
+ /* Number of current buffered slots for a multi insert batch. */
+ int mi_slots_num;
+ /* Total tuple size for a multi insert batch. */
+ int mi_slots_size;
} DR_intorel;
/* utility functions for CTAS definition creation */
@@ -553,6 +560,17 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
myState->output_cid = GetCurrentCommandId(true);
myState->ti_options = TABLE_INSERT_SKIP_FSM;
myState->bistate = GetBulkInsertState();
+ memset(myState->mi_slots, 0, sizeof(TupleTableSlot *) * MAX_MULTI_INSERT_TUPLES);
+ myState->mi_slots_num = 0;
+ myState->mi_slots_size = 0;
+
+ /*
+ * Create a temporary memory context so that we can reset once per
+ * multi insert batch.
+ */
+ myState->mi_context = AllocSetContextCreate(CurrentMemoryContext,
+ "intorel_multi_insert",
+ ALLOCSET_DEFAULT_SIZES);
/*
* Valid smgr_targblock implies something already wrote to the relation.
@@ -562,28 +580,82 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
}
/*
- * intorel_receive --- receive one tuple
+ * intorel_flush_multi_insert --- insert multiple tuples
*/
-static bool
-intorel_receive(TupleTableSlot *slot, DestReceiver *self)
+static void
+intorel_flush_multi_insert(DR_intorel *myState)
{
- DR_intorel *myState = (DR_intorel *) self;
+ MemoryContext oldcontext;
+ int i;
- /*
- * Note that the input slot might not be of the type of the target
- * relation. That's supported by table_tuple_insert(), but slightly less
- * efficient than inserting with the right slot - but the alternative
- * would be to copy into a slot of the right type, which would not be
- * cheap either. This also doesn't allow accessing per-AM data (say a
- * tuple's xmin), but since we don't do that here...
- */
+ oldcontext = MemoryContextSwitchTo(myState->mi_context);
- table_tuple_insert(myState->rel,
- slot,
+ table_multi_insert(myState->rel,
+ myState->mi_slots,
+ myState->mi_slots_num,
myState->output_cid,
myState->ti_options,
myState->bistate);
+ MemoryContextReset(myState->mi_context);
+ MemoryContextSwitchTo(oldcontext);
+
+ for (i = 0; i < myState->mi_slots_num; i++)
+ ExecClearTuple(myState->mi_slots[i]);
+
+ myState->mi_slots_num = 0;
+ myState->mi_slots_size = 0;
+}
+
+/*
+ * intorel_receive --- receive one tuple
+ */
+static bool
+intorel_receive(TupleTableSlot *slot, DestReceiver *self)
+{
+ DR_intorel *myState = (DR_intorel *) self;
+ TupleTableSlot *batchslot;
+ Size sz = 0;
+
+ sz = GetTupleSize(slot, MAX_MULTI_INSERT_BUFFERED_BYTES);
+
+ /* In case the computed tuple size is 0, we go for single inserts. */
+ if (sz != 0)
+ {
+ if (myState->mi_slots[myState->mi_slots_num] == NULL)
+ {
+ batchslot = table_slot_create(myState->rel, NULL);
+ myState->mi_slots[myState->mi_slots_num] = batchslot;
+ }
+ else
+ batchslot = myState->mi_slots[myState->mi_slots_num];
+
+ ExecCopySlot(batchslot, slot);
+
+ myState->mi_slots_num++;
+ myState->mi_slots_size += sz;
+
+ if (myState->mi_slots_num >= MAX_MULTI_INSERT_TUPLES ||
+ myState->mi_slots_size >= MAX_MULTI_INSERT_BUFFERED_BYTES)
+ intorel_flush_multi_insert(myState);
+ }
+ else
+ {
+ /*
+ * Note that the input slot might not be of the type of the target
+ * relation. That's supported by table_tuple_insert(), but slightly
+ * less efficient than inserting with the right slot - but the
+ * alternative would be to copy into a slot of the right type, which
+ * would not be cheap either. This also doesn't allow accessing per-AM
+ * data (say a tuple's xmin), but since we don't do that here...
+ */
+ table_tuple_insert(myState->rel,
+ slot,
+ myState->output_cid,
+ myState->ti_options,
+ myState->bistate);
+ }
+
/* We know this is a newly created relation, so there are no indexes */
return true;
@@ -596,11 +668,23 @@ static void
intorel_shutdown(DestReceiver *self)
{
DR_intorel *myState = (DR_intorel *) self;
+ int i;
+
+ if (myState->mi_slots_num != 0)
+ intorel_flush_multi_insert(myState);
+
+ for (i = 0; i < MAX_MULTI_INSERT_TUPLES && myState->mi_slots[i] != NULL; i++)
+ ExecDropSingleTupleTableSlot(myState->mi_slots[i]);
FreeBulkInsertState(myState->bistate);
table_finish_bulk_insert(myState->rel, myState->ti_options);
+ if (myState->mi_context)
+ MemoryContextDelete(myState->mi_context);
+
+ myState->mi_context = NULL;
+
/* close rel, but keep lock until commit */
table_close(myState->rel, NoLock);
myState->rel = NULL;
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 4c90ac5236..5ee4135151 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -2318,3 +2318,69 @@ end_tup_output(TupOutputState *tstate)
ExecDropSingleTupleTableSlot(tstate->slot);
pfree(tstate);
}
+
+/*
+ * GetTupleSize - Compute the tuple size given a table slot.
+ *
+ * For heap tuple, buffer tuple and minimal tuple slot types return the actual
+ * tuple size that exists. For virtual tuple, the size is calculated as the
+ * slot does not have the tuple size. If the computed size exceeds the given
+ * maxsize for the virtual tuple, this function exits, not investing time in
+ * further unnecessary calculation.
+ */
+inline Size
+GetTupleSize(TupleTableSlot *slot, Size maxsize)
+{
+ Size sz = 0;
+ HeapTuple tuple = NULL;
+
+ if (TTS_IS_HEAPTUPLE(slot))
+ tuple = ((HeapTupleTableSlot *) slot)->tuple;
+ else if(TTS_IS_BUFFERTUPLE(slot))
+ tuple = ((BufferHeapTupleTableSlot *) slot)->base.tuple;
+ else if(TTS_IS_MINIMALTUPLE(slot))
+ tuple = ((MinimalTupleTableSlot *) slot)->tuple;
+ else if(TTS_IS_VIRTUAL(slot))
+ {
+ /* Size calculation is inspired from tts_virtual_materialize(). */
+ TupleDesc desc = slot->tts_tupleDescriptor;
+
+ for (int natt = 0; natt < desc->natts; natt++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, natt);
+ Datum val;
+
+ if (att->attbyval)
+ sz += att->attlen;
+
+ if (slot->tts_isnull[natt])
+ continue;
+
+ val = slot->tts_values[natt];
+
+ if (att->attlen == -1 &&
+ VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(val)))
+ {
+ sz = att_align_nominal(sz, att->attalign);
+ sz += EOH_get_flat_size(DatumGetEOHP(val));
+ }
+ else
+ {
+ sz = att_align_nominal(sz, att->attalign);
+ sz = att_addlength_datum(sz, att->attlen, val);
+ }
+
+ /*
+ * We are not interested in proceeding further if the computed size
+ * crosses maxsize limit that we are looking for.
+ */
+ if (maxsize != 0 && sz >= maxsize)
+ break;
+ }
+ }
+
+ if (tuple != NULL && !TTS_IS_VIRTUAL(slot))
+ sz = tuple->t_len;
+
+ return sz;
+}
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 387eb34a61..087aabe880 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -140,6 +140,21 @@ typedef struct TM_FailureData
/* Follow update chain and lock latest version of tuple */
#define TUPLE_LOCK_FLAG_FIND_LAST_VERSION (1 << 1)
+/*
+ * No more than this many tuples per multi insert buffer
+ *
+ * Caution: Don't make this too big. We could end up with this many multi
+ * insert buffer items stored as a list. Increasing this can cause quadratic
+ * growth in memory requirements during copies into partitioned tables with a
+ * large number of partitions.
+ */
+#define MAX_MULTI_INSERT_TUPLES 1000
+
+/*
+ * Flush buffers if there are >= this many bytes, as counted by the input
+ * size of the tuples stored.
+ */
+#define MAX_MULTI_INSERT_BUFFERED_BYTES 65535
/* Typedef for callback function for table_index_build_scan */
typedef void (*IndexBuildCallback) (Relation index,
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index f7df70b5ab..4336cc8ec8 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -329,7 +329,7 @@ extern Datum ExecFetchSlotHeapTupleDatum(TupleTableSlot *slot);
extern void slot_getmissingattrs(TupleTableSlot *slot, int startAttNum,
int lastAttNum);
extern void slot_getsomeattrs_int(TupleTableSlot *slot, int attnum);
-
+extern Size GetTupleSize(TupleTableSlot *slot, Size maxsize);
#ifndef FRONTEND
--
2.25.1
On Tue, Nov 3, 2020 at 4:54 PM Bharath Rupireddy <
bharath.rupireddyforpostgres@gmail.com> wrote:
Use case 1- 100mn tuples, 2 integer columns, exec time in sec:
HEAD: 131.507 when the select part is not parallel, 128.832 when the
select part is parallel
Patch: 98.925 when the select part is not parallel, 52.901 when the
select part is parallel
Use case 2- 10mn tuples, 4 integer and 6 text columns, exec time in sec:
HEAD: 76.801 when the select part is not parallel, 66.074 when the select
part is parallel
Patch: 74.083 when the select part is not parallel, 57.739 when the
select part is parallel
I did some more testing with the v1 patch: execution time is in seconds, each
test is run 2 times, with the custom configuration [1].
Use case 3: 1 int and 1 text column. each row size 129 bytes, size of 1
text column 101 bytes, number of rows 100million, size of heap file 12.9GB.
HEAD: 253.227, 259.575
Patch: 177.921, 174.196
We get better performance 1.4X with the patch.
Use case 4: 1 int and 30 text columns. each row size 28108 bytes, size of 1
text column 932 bytes, number of rows 10000, size of heap file 281.08MB.
HEAD: 222.812, 218.837
Patch: 222.492, 222.295
We don't see much difference with and without the patch. Each time only 2
tuples (2*28108 = 56216 bytes < MAX_MULTI_INSERT_BUFFERED_BYTES (65535
bytes)) are buffered and flushed.
Use case 5: 1 int and 75 text columns. each row size 70228 bytes, size of 1
text column 932 bytes, number of rows 10000, size of heap file 702.28MB.
HEAD: 554.709, 562.745
Patch: 553.378, 560.370
We don't see much difference with and without the patch. Since each row
size (70228 bytes) is bigger than MAX_MULTI_INSERT_BUFFERED_BYTES (65535
bytes), the multi insert code path is not taken; each row is inserted
singly with table_tuple_insert() itself.
Use case 6: 1 int and 1 text column. each row size 9205 bytes, size of 1
text column 9173 bytes, number of rows 10000, size of heap file 92.05MB.
HEAD: 70.583, 70.251
Patch: 72.633, 73.521
We see 2-3 seconds more with the patch. When I intentionally set the computed
tuple size to 0 (sz = 0) after GetTupleSize(), so that single inserts
happen, the results are 70.364, 70.406. It looks like the extra 2-3 seconds
are due to the multi insert code, and this happens only for this use case.
I think this should not be a problem as the difference is not huge.
+ sz = GetTupleSize(slot, MAX_MULTI_INSERT_BUFFERED_BYTES);
+
+ sz = 0;    /* forced to 0 for this experiment */
+
+ /* In case the computed tuple size is 0, we go for single inserts. */
+ if (sz != 0)
+ {
[1]: The postgresql.conf used:
shared_buffers = 40GB
synchronous_commit = off
checkpoint_timeout = 1d
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
On Tue, Nov 3, 2020 at 4:54 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
If the approach followed in the patch looks okay, I can work on a separate patch for multi inserts in refresh materialized view cases.
Hi, I'm attaching a v2 patch that adds multi inserts for CTAS as well
as REFRESH MATERIALIZED VIEW.
I did some testing: exec time in seconds.
Use case 1: 1 int and 1 text column. each row size 129 bytes, size of
1 text column 101 bytes, number of rows 100million, size of heap file
12.9GB.
HEAD: 220.733, 220.428
Patch: 151.923, 152.484
Thoughts?
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
Attachments:
v2-0001-Multi-Inserts-in-CTAS-Refresh-Materialized-View.patch (application/octet-stream)
From d62023fd55756e4914fefd412d9380dadb7b3524 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Mon, 9 Nov 2020 15:35:01 +0530
Subject: [PATCH v2] Multi Inserts in CTAS & Refresh Materialized View.
This patch adds multi inserts to:
1) Create Table As and Create Materialized View
2) Refresh Materialized View
---
src/backend/commands/copy.c | 26 ++-----
src/backend/commands/createas.c | 112 ++++++++++++++++++++++++++----
src/backend/commands/matview.c | 96 +++++++++++++++++++++++--
src/backend/executor/execTuples.c | 66 ++++++++++++++++++
src/include/access/tableam.h | 15 ++++
src/include/executor/tuptable.h | 2 +-
6 files changed, 278 insertions(+), 39 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 115860a9d4..29fd28051e 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -240,18 +240,6 @@ typedef struct
uint64 processed; /* # of tuples processed */
} DR_copy;
-
-/*
- * No more than this many tuples per CopyMultiInsertBuffer
- *
- * Caution: Don't make this too big, as we could end up with this many
- * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
- * multiInsertBuffers list. Increasing this can cause quadratic growth in
- * memory requirements during copies into partitioned tables with a large
- * number of partitions.
- */
-#define MAX_BUFFERED_TUPLES 1000
-
/*
* Flush buffers if there are >= this many bytes, as counted by the input
* size, of tuples stored.
@@ -264,11 +252,11 @@ typedef struct
/* Stores multi-insert data related to a single relation in CopyFrom. */
typedef struct CopyMultiInsertBuffer
{
- TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+ TupleTableSlot *slots[MAX_MULTI_INSERT_TUPLES]; /* Array to store tuples */
ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
BulkInsertState bistate; /* BulkInsertState for this rel */
int nused; /* number of 'slots' containing tuples */
- uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
+ uint64 linenos[MAX_MULTI_INSERT_TUPLES]; /* Line # of tuple in copy
* stream */
} CopyMultiInsertBuffer;
@@ -2396,7 +2384,7 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
CopyMultiInsertBuffer *buffer;
buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
- memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+ memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_MULTI_INSERT_TUPLES);
buffer->resultRelInfo = rri;
buffer->bistate = GetBulkInsertState();
buffer->nused = 0;
@@ -2455,8 +2443,8 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
static inline bool
CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
{
- if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
- miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
+ if (miinfo->bufferedTuples >= MAX_MULTI_INSERT_TUPLES ||
+ miinfo->bufferedBytes >= MAX_MULTI_INSERT_BUFFERED_BYTES)
return true;
return false;
}
@@ -2574,7 +2562,7 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
FreeBulkInsertState(buffer->bistate);
/* Since we only create slots on demand, just drop the non-null ones. */
- for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+ for (i = 0; i < MAX_MULTI_INSERT_TUPLES && buffer->slots[i] != NULL; i++)
ExecDropSingleTupleTableSlot(buffer->slots[i]);
table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
@@ -2666,7 +2654,7 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
int nused = buffer->nused;
Assert(buffer != NULL);
- Assert(nused < MAX_BUFFERED_TUPLES);
+ Assert(nused < MAX_MULTI_INSERT_TUPLES);
if (buffer->slots[nused] == NULL)
buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index d53ec952d0..c4f91c8b5d 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -61,6 +61,13 @@ typedef struct
CommandId output_cid; /* cmin to insert in output tuples */
int ti_options; /* table_tuple_insert performance options */
BulkInsertState bistate; /* bulk insert state */
+ MemoryContext mi_context; /* A temporary memory context for multi insert */
+ /* Buffered slots for a multi insert batch. */
+ TupleTableSlot *mi_slots[MAX_MULTI_INSERT_TUPLES];
+ /* Number of current buffered slots for a multi insert batch. */
+ int mi_slots_num;
+ /* Total tuple size for a multi insert batch. */
+ int mi_slots_size;
} DR_intorel;
/* utility functions for CTAS definition creation */
@@ -553,6 +560,17 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
myState->output_cid = GetCurrentCommandId(true);
myState->ti_options = TABLE_INSERT_SKIP_FSM;
myState->bistate = GetBulkInsertState();
+ memset(myState->mi_slots, 0, sizeof(TupleTableSlot *) * MAX_MULTI_INSERT_TUPLES);
+ myState->mi_slots_num = 0;
+ myState->mi_slots_size = 0;
+
+ /*
+ * Create a temporary memory context so that we can reset once per
+ * multi insert batch.
+ */
+ myState->mi_context = AllocSetContextCreate(CurrentMemoryContext,
+ "intorel_multi_insert",
+ ALLOCSET_DEFAULT_SIZES);
/*
* Valid smgr_targblock implies something already wrote to the relation.
@@ -562,28 +580,82 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
}
/*
- * intorel_receive --- receive one tuple
+ * intorel_flush_multi_insert --- insert multiple tuples
*/
-static bool
-intorel_receive(TupleTableSlot *slot, DestReceiver *self)
+static void
+intorel_flush_multi_insert(DR_intorel *myState)
{
- DR_intorel *myState = (DR_intorel *) self;
+ MemoryContext oldcontext;
+ int i;
- /*
- * Note that the input slot might not be of the type of the target
- * relation. That's supported by table_tuple_insert(), but slightly less
- * efficient than inserting with the right slot - but the alternative
- * would be to copy into a slot of the right type, which would not be
- * cheap either. This also doesn't allow accessing per-AM data (say a
- * tuple's xmin), but since we don't do that here...
- */
+ oldcontext = MemoryContextSwitchTo(myState->mi_context);
- table_tuple_insert(myState->rel,
- slot,
+ table_multi_insert(myState->rel,
+ myState->mi_slots,
+ myState->mi_slots_num,
myState->output_cid,
myState->ti_options,
myState->bistate);
+ MemoryContextReset(myState->mi_context);
+ MemoryContextSwitchTo(oldcontext);
+
+ for (i = 0; i < myState->mi_slots_num; i++)
+ ExecClearTuple(myState->mi_slots[i]);
+
+ myState->mi_slots_num = 0;
+ myState->mi_slots_size = 0;
+}
+
+/*
+ * intorel_receive --- receive one tuple
+ */
+static bool
+intorel_receive(TupleTableSlot *slot, DestReceiver *self)
+{
+ DR_intorel *myState = (DR_intorel *) self;
+ TupleTableSlot *batchslot;
+ Size sz = 0;
+
+ sz = GetTupleSize(slot, MAX_MULTI_INSERT_BUFFERED_BYTES);
+
+ /* In case the computed tuple size is 0, we go for single inserts. */
+ if (sz != 0)
+ {
+ if (myState->mi_slots[myState->mi_slots_num] == NULL)
+ {
+ batchslot = table_slot_create(myState->rel, NULL);
+ myState->mi_slots[myState->mi_slots_num] = batchslot;
+ }
+ else
+ batchslot = myState->mi_slots[myState->mi_slots_num];
+
+ ExecCopySlot(batchslot, slot);
+
+ myState->mi_slots_num++;
+ myState->mi_slots_size += sz;
+
+ if (myState->mi_slots_num >= MAX_MULTI_INSERT_TUPLES ||
+ myState->mi_slots_size >= MAX_MULTI_INSERT_BUFFERED_BYTES)
+ intorel_flush_multi_insert(myState);
+ }
+ else
+ {
+ /*
+ * Note that the input slot might not be of the type of the target
+ * relation. That's supported by table_tuple_insert(), but slightly
+ * less efficient than inserting with the right slot - but the
+ * alternative would be to copy into a slot of the right type, which
+ * would not be cheap either. This also doesn't allow accessing per-AM
+ * data (say a tuple's xmin), but since we don't do that here...
+ */
+ table_tuple_insert(myState->rel,
+ slot,
+ myState->output_cid,
+ myState->ti_options,
+ myState->bistate);
+ }
+
/* We know this is a newly created relation, so there are no indexes */
return true;
@@ -596,11 +668,23 @@ static void
intorel_shutdown(DestReceiver *self)
{
DR_intorel *myState = (DR_intorel *) self;
+ int i;
+
+ if (myState->mi_slots_num != 0)
+ intorel_flush_multi_insert(myState);
+
+ for (i = 0; i < MAX_MULTI_INSERT_TUPLES && myState->mi_slots[i] != NULL; i++)
+ ExecDropSingleTupleTableSlot(myState->mi_slots[i]);
FreeBulkInsertState(myState->bistate);
table_finish_bulk_insert(myState->rel, myState->ti_options);
+ if (myState->mi_context)
+ MemoryContextDelete(myState->mi_context);
+
+ myState->mi_context = NULL;
+
/* close rel, but keep lock until commit */
table_close(myState->rel, NoLock);
myState->rel = NULL;
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index f80a9e96a9..b05bf357ea 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -56,6 +56,13 @@ typedef struct
CommandId output_cid; /* cmin to insert in output tuples */
int ti_options; /* table_tuple_insert performance options */
BulkInsertState bistate; /* bulk insert state */
+ MemoryContext mi_context; /* A temporary memory context for multi insert */
+ /* Buffered slots for a multi insert batch. */
+ TupleTableSlot *mi_slots[MAX_MULTI_INSERT_TUPLES];
+ /* Number of current buffered slots for a multi insert batch. */
+ int mi_slots_num;
+ /* Total tuple size for a multi insert batch. */
+ int mi_slots_size;
} DR_transientrel;
static int matview_maintenance_depth = 0;
@@ -460,6 +467,18 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
myState->bistate = GetBulkInsertState();
+ memset(myState->mi_slots, 0, sizeof(TupleTableSlot *) * MAX_MULTI_INSERT_TUPLES);
+ myState->mi_slots_num = 0;
+ myState->mi_slots_size = 0;
+
+ /*
+ * Create a temporary memory context so that we can reset once per
+ * multi insert batch.
+ */
+ myState->mi_context = AllocSetContextCreate(CurrentMemoryContext,
+ "transientrel_multi_insert",
+ ALLOCSET_DEFAULT_SIZES);
+
/*
* Valid smgr_targblock implies something already wrote to the relation.
* This may be harmless, but this function hasn't planned for it.
@@ -467,6 +486,34 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
}
+/*
+ * transientrel_flush_multi_insert --- insert multiple tuples
+ */
+static void
+transientrel_flush_multi_insert(DR_transientrel *myState)
+{
+ MemoryContext oldcontext;
+ int i;
+
+ oldcontext = MemoryContextSwitchTo(myState->mi_context);
+
+ table_multi_insert(myState->transientrel,
+ myState->mi_slots,
+ myState->mi_slots_num,
+ myState->output_cid,
+ myState->ti_options,
+ myState->bistate);
+
+ MemoryContextReset(myState->mi_context);
+ MemoryContextSwitchTo(oldcontext);
+
+ for (i = 0; i < myState->mi_slots_num; i++)
+ ExecClearTuple(myState->mi_slots[i]);
+
+ myState->mi_slots_num = 0;
+ myState->mi_slots_size = 0;
+}
+
/*
* transientrel_receive --- receive one tuple
*/
@@ -474,7 +521,33 @@ static bool
transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_transientrel *myState = (DR_transientrel *) self;
+ TupleTableSlot *batchslot;
+ Size sz = 0;
+
+ sz = GetTupleSize(slot, MAX_MULTI_INSERT_BUFFERED_BYTES);
+ /* In case the computed tuple size is 0, we go for single inserts. */
+ if (sz != 0)
+ {
+ if (myState->mi_slots[myState->mi_slots_num] == NULL)
+ {
+ batchslot = table_slot_create(myState->transientrel, NULL);
+ myState->mi_slots[myState->mi_slots_num] = batchslot;
+ }
+ else
+ batchslot = myState->mi_slots[myState->mi_slots_num];
+
+ ExecCopySlot(batchslot, slot);
+
+ myState->mi_slots_num++;
+ myState->mi_slots_size += sz;
+
+ if (myState->mi_slots_num >= MAX_MULTI_INSERT_TUPLES ||
+ myState->mi_slots_size >= MAX_MULTI_INSERT_BUFFERED_BYTES)
+ transientrel_flush_multi_insert(myState);
+ }
+ else
+ {
/*
* Note that the input slot might not be of the type of the target
* relation. That's supported by table_tuple_insert(), but slightly less
@@ -484,11 +557,12 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
* tuple's xmin), but since we don't do that here...
*/
- table_tuple_insert(myState->transientrel,
- slot,
- myState->output_cid,
- myState->ti_options,
- myState->bistate);
+ table_tuple_insert(myState->transientrel,
+ slot,
+ myState->output_cid,
+ myState->ti_options,
+ myState->bistate);
+ }
/* We know this is a newly created relation, so there are no indexes */
@@ -502,11 +576,23 @@ static void
transientrel_shutdown(DestReceiver *self)
{
DR_transientrel *myState = (DR_transientrel *) self;
+ int i;
+
+ if (myState->mi_slots_num != 0)
+ transientrel_flush_multi_insert(myState);
+
+ for (i = 0; i < MAX_MULTI_INSERT_TUPLES && myState->mi_slots[i] != NULL; i++)
+ ExecDropSingleTupleTableSlot(myState->mi_slots[i]);
FreeBulkInsertState(myState->bistate);
table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+ if (myState->mi_context)
+ MemoryContextDelete(myState->mi_context);
+
+ myState->mi_context = NULL;
+
/* close transientrel, but keep lock until commit */
table_close(myState->transientrel, NoLock);
myState->transientrel = NULL;
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 4c90ac5236..5ee4135151 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -2318,3 +2318,69 @@ end_tup_output(TupOutputState *tstate)
ExecDropSingleTupleTableSlot(tstate->slot);
pfree(tstate);
}
+
+/*
+ * GetTupleSize - Compute the tuple size given a table slot.
+ *
+ * For heap tuple, buffer tuple and minimal tuple slot types return the actual
+ * tuple size that exists. For virtual tuple, the size is calculated as the
+ * slot does not have the tuple size. If the computed size exceeds the given
+ * maxsize for the virtual tuple, this function exits, not investing time in
+ * further unnecessary calculation.
+ */
+inline Size
+GetTupleSize(TupleTableSlot *slot, Size maxsize)
+{
+ Size sz = 0;
+ HeapTuple tuple = NULL;
+
+ if (TTS_IS_HEAPTUPLE(slot))
+ tuple = ((HeapTupleTableSlot *) slot)->tuple;
+ else if(TTS_IS_BUFFERTUPLE(slot))
+ tuple = ((BufferHeapTupleTableSlot *) slot)->base.tuple;
+ else if(TTS_IS_MINIMALTUPLE(slot))
+ tuple = ((MinimalTupleTableSlot *) slot)->tuple;
+ else if(TTS_IS_VIRTUAL(slot))
+ {
+ /* Size calculation is inspired from tts_virtual_materialize(). */
+ TupleDesc desc = slot->tts_tupleDescriptor;
+
+ for (int natt = 0; natt < desc->natts; natt++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, natt);
+ Datum val;
+
+ if (att->attbyval)
+ sz += att->attlen;
+
+ if (slot->tts_isnull[natt])
+ continue;
+
+ val = slot->tts_values[natt];
+
+ if (att->attlen == -1 &&
+ VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(val)))
+ {
+ sz = att_align_nominal(sz, att->attalign);
+ sz += EOH_get_flat_size(DatumGetEOHP(val));
+ }
+ else
+ {
+ sz = att_align_nominal(sz, att->attalign);
+ sz = att_addlength_datum(sz, att->attlen, val);
+ }
+
+ /*
+ * We are not interested in proceeding further if the computed size
+ * crosses maxsize limit that we are looking for.
+ */
+ if (maxsize != 0 && sz >= maxsize)
+ break;
+ }
+ }
+
+ if (tuple != NULL && !TTS_IS_VIRTUAL(slot))
+ sz = tuple->t_len;
+
+ return sz;
+}
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 387eb34a61..087aabe880 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -140,6 +140,21 @@ typedef struct TM_FailureData
/* Follow update chain and lock latest version of tuple */
#define TUPLE_LOCK_FLAG_FIND_LAST_VERSION (1 << 1)
+/*
+ * No more than this many tuples per multi insert buffer
+ *
+ * Caution: Don't make this too big. We could end up with this many multi
+ * insert buffer items stored as a list. Increasing this can cause quadratic
+ * growth in memory requirements during copies into partitioned tables with a
+ * large number of partitions.
+ */
+#define MAX_MULTI_INSERT_TUPLES 1000
+
+/*
+ * Flush buffers if there are >= this many bytes, as counted by the input
+ * size of the tuples stored.
+ */
+#define MAX_MULTI_INSERT_BUFFERED_BYTES 65535
/* Typedef for callback function for table_index_build_scan */
typedef void (*IndexBuildCallback) (Relation index,
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index f7df70b5ab..4336cc8ec8 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -329,7 +329,7 @@ extern Datum ExecFetchSlotHeapTupleDatum(TupleTableSlot *slot);
extern void slot_getmissingattrs(TupleTableSlot *slot, int startAttNum,
int lastAttNum);
extern void slot_getsomeattrs_int(TupleTableSlot *slot, int attnum);
-
+extern Size GetTupleSize(TupleTableSlot *slot, Size maxsize);
#ifndef FRONTEND
--
2.25.1
On Nov 9, 2020, at 6:41 PM, Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> wrote:
On Tue, Nov 3, 2020 at 4:54 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
If the approach followed in the patch looks okay, I can work on a separate patch for multi inserts in refresh materialized view cases.
Hi, I'm attaching a v2 patch that has multi inserts for CTAS as well as REFRESH MATERIALIZED VIEW.
I did some testing; exec time in seconds.
Use case 1: 1 int and 1 text column; each row is 129 bytes, the text column alone is 101 bytes, 100 million rows, heap file size 12.9GB.
HEAD: 220.733, 220.428
Patch: 151.923, 152.484
Thoughts?
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
<v2-0001-Multi-Inserts-in-CTAS-Refresh-Materialized-View.patch>
Thanks for doing this. There might be another solution: use the raw insert interfaces (i.e. raw_heap_insert()).
Attached is a test (not formal) patch that verifies this idea. raw_heap_insert() writes pages into the
table files directly and also writes an FPI xlog record when the tuples fill up a whole page. This seems to be
more efficient.
In addition, those raw write interfaces call smgrimmedsync() when they finish raw inserting. This is because
the writes bypass shared buffers, so a CHECKPOINT plus a crash could cause data corruption: some FPI
xlog records cannot be replayed, and those table files were not fsync-ed before the crash. It seems that a sync
request could instead be forwarded to the checkpointer for each table segment file, and then we would not need to call
smgrimmedsync(). If that theory is correct, it should be a separate patch. Anyway, I tested this idea
as well by simply commenting out the smgrimmedsync() call in heap_raw_insert_end() (a new function in
the attached patch), since forwarding an fsync request is lightweight.
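To make the forwarding idea concrete, here is a rough sketch of what the end of heap_raw_insert_end() might look like. This is only an illustration under assumptions: it borrows the pattern of register_dirty_segment() in md.c (RegisterSyncRequest() with a fallback to an immediate sync when the checkpointer's request queue is full), and INIT_MD_FILETAG is currently private to md.c, so a real patch would need an exported helper for building the FileTag. It is not the attached patch's actual code.

```c
/*
 * Hypothetical replacement for the smgrimmedsync() call at the end of
 * heap_raw_insert_end(): forward one sync request per written segment
 * to the checkpointer, mirroring register_dirty_segment() in md.c.
 * If the checkpointer's request queue is full, fall back to syncing
 * the fork immediately.
 */
if (RelationNeedsWAL(state->rs_new_rel))
{
    BlockNumber nblocks = state->rs_blockno + 1;
    BlockNumber segno;

    for (segno = 0; segno <= (nblocks - 1) / RELSEG_SIZE; segno++)
    {
        FileTag     tag;

        /* INIT_MD_FILETAG is md.c-private; assume an exported variant */
        INIT_MD_FILETAG(tag, state->rs_new_rel->rd_node, MAIN_FORKNUM, segno);

        if (!RegisterSyncRequest(&tag, SYNC_REQUEST, false /* retryOnError */))
            smgrimmedsync(state->rs_new_rel->rd_smgr, MAIN_FORKNUM);
    }
}
```

The design point is the same one md.c relies on for ordinary writes: the checkpointer absorbs the fsync before the checkpoint completes, so crash recovery never depends on un-fsynced pages written before that checkpoint.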
I did some quick and simple testing. The test environment is a CentOS 6 VM with 7GB of memory on my Mac laptop,
built with the gcc -O3 option and shared_buffers set to 2GB. I did not check whether parallel scanning is triggered
by the test query; the data volume is small, so the test runs are short.
Here is the test script:
create table t1 (a int, b int, c int, d int);
insert into t1 select i,i,i,i from generate_series(1,10000000) i;
show shared_buffers;
\timing on
create table t2 as select * from t1;
\timing off
Here are the results:
HEAD (37d2ff380312):
Time: 5143.041 ms (00:05.143)
Multi insert patch:
Time: 4456.461 ms (00:04.456)
Raw insert (attached):
Time: 2317.453 ms (00:02.317)
Raw insert + no smgrimmedsync():
Time: 2103.610 ms (00:02.104)
From the above data, raw insert is better; forwarding sync should improve things further.
(Note that my laptop has an SSD; on machines with SATA/SAS disks, I believe forwarding sync
would help even more.)
I also tested removing smgrimmedsync() in the VACUUM FULL code, which uses raw insert as well. FYI:
HEAD:
Time: 3567.036 ms (00:03.567)
no smgrimmedsync:
Time: 3023.487 ms (00:03.023)
Raw insert could be used for CTAS and CREATE MATERIALIZED VIEW. For REFRESH MATERIALIZED VIEW the code is a bit
different; I did not spend more time on this, so I am not sure raw insert could be used there.
But I think the previous multi insert work could still be used in at least "INSERT tbl SELECT…" (if the INSERT
is a simple one, e.g. no triggers, no indexes, etc.).
Regards,
Paul
Attachments:
v1-0001-ctas-using-raw-insert.patch (application/octet-stream)
From 6252155dcdd0a21e15fec3d8e8fe77a9b2f2b29a Mon Sep 17 00:00:00 2001
From: Paul Guo <guopa@vmware.com>
Date: Tue, 10 Nov 2020 10:26:38 +0800
Subject: [PATCH v1] ctas using raw insert
---
src/backend/access/heap/heapam_handler.c | 5 ++
src/backend/access/heap/rewriteheap.c | 84 ++++++++++++++++++++++++++++++++
src/backend/commands/createas.c | 10 +++-
src/include/access/heapam.h | 4 ++
src/include/access/rewriteheap.h | 2 +
src/include/access/tableam.h | 22 +++++++++
6 files changed, 126 insertions(+), 1 deletion(-)
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index dcaea7135f..8caa5dfef9 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2529,6 +2529,11 @@ static const TableAmRoutine heapam_methods = {
.tuple_insert_speculative = heapam_tuple_insert_speculative,
.tuple_complete_speculative = heapam_tuple_complete_speculative,
.multi_insert = heap_multi_insert,
+
+ .tuple_raw_insert_begin = heap_raw_insert_begin,
+ .tuple_raw_insert = heap_raw_insert,
+ .tuple_raw_insert_end = heap_raw_insert_end,
+
.tuple_delete = heapam_tuple_delete,
.tuple_update = heapam_tuple_update,
.tuple_lock = heapam_tuple_lock,
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 39e33763df..47f99ffa74 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -292,6 +292,90 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm
return state;
}
+#if 0
+typedef struct RawWriteStateData
+{
+ Relation rs_new_rel; /* destination heap */
+ Page rs_buffer; /* page currently being built */
+ BlockNumber rs_blockno; /* block where page will go */
+ bool rs_buffer_valid; /* T if any tuples in buffer */
+ MemoryContext rs_cxt;
+} *RawWriteState;
+#endif
+
+void heap_raw_insert(RewriteState state, TupleTableSlot *slot)
+{
+ HeapTuple tuple;
+ bool shouldFree;
+
+ tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+ raw_heap_insert(state, tuple);
+
+ if (shouldFree)
+ pfree(tuple);
+}
+
+RewriteState
+heap_raw_insert_begin(Relation new_heap)
+{
+ RewriteState state;
+ MemoryContext rw_cxt;
+ MemoryContext old_cxt;
+
+ rw_cxt = AllocSetContextCreate(CurrentMemoryContext,
+ "Heap raw write",
+ ALLOCSET_DEFAULT_SIZES);
+ old_cxt = MemoryContextSwitchTo(rw_cxt);
+
+ /* Create and fill in the state struct */
+ state = palloc0(sizeof(RewriteStateData));
+
+ state->rs_new_rel = new_heap;
+ state->rs_buffer = (Page) palloc(BLCKSZ);
+ /* new_heap needn't be empty, just locked */
+ state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
+ state->rs_buffer_valid = false;
+ state->rs_cxt = rw_cxt;
+
+ MemoryContextSwitchTo(old_cxt);
+
+ return state;
+}
+
+void
+heap_raw_insert_end(RewriteState state)
+{
+ /* Write the last page, if any */
+ if (state->rs_buffer_valid)
+ {
+ if (RelationNeedsWAL(state->rs_new_rel))
+ log_newpage(&state->rs_new_rel->rd_node,
+ MAIN_FORKNUM,
+ state->rs_blockno,
+ state->rs_buffer,
+ true);
+ RelationOpenSmgr(state->rs_new_rel);
+
+ PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
+
+ smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM, state->rs_blockno,
+ (char *) state->rs_buffer, true);
+ }
+
+ /*
+ * When we WAL-logged rel pages, we must nonetheless fsync them. The
+ * reason is the same as in storage.c's RelationCopyStorage(): we're
+ * writing data that's not in shared buffers, and so a CHECKPOINT
+ * occurring during the rewriteheap operation won't have fsync'd data we
+ * wrote before the checkpoint.
+ */
+ if (RelationNeedsWAL(state->rs_new_rel))
+ smgrimmedsync(state->rs_new_rel->rd_smgr, MAIN_FORKNUM); /* test the time */
+
+ /* Deleting the context frees everything */
+ MemoryContextDelete(state->rs_cxt);
+}
+
/*
* End a rewrite.
*
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index d53ec952d0..723226f029 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -61,6 +61,7 @@ typedef struct
CommandId output_cid; /* cmin to insert in output tuples */
int ti_options; /* table_tuple_insert performance options */
BulkInsertState bistate; /* bulk insert state */
+ RewriteState state;
} DR_intorel;
/* utility functions for CTAS definition creation */
@@ -549,11 +550,13 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
* Fill private fields of myState for use by later routines
*/
myState->rel = intoRelationDesc;
+ myState->state = table_tuple_raw_insert_begin(intoRelationDesc);
myState->reladdr = intoRelationAddr;
myState->output_cid = GetCurrentCommandId(true);
myState->ti_options = TABLE_INSERT_SKIP_FSM;
myState->bistate = GetBulkInsertState();
+
/*
* Valid smgr_targblock implies something already wrote to the relation.
* This may be harmless, but this function hasn't planned for it.
@@ -578,12 +581,14 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
* tuple's xmin), but since we don't do that here...
*/
+ table_tuple_raw_insert(myState->rel, myState->state, slot);
+#if 0
table_tuple_insert(myState->rel,
slot,
myState->output_cid,
myState->ti_options,
myState->bistate);
-
+#endif
/* We know this is a newly created relation, so there are no indexes */
return true;
@@ -599,7 +604,10 @@ intorel_shutdown(DestReceiver *self)
FreeBulkInsertState(myState->bistate);
+#if 0
table_finish_bulk_insert(myState->rel, myState->ti_options);
+#endif
+ table_tuple_raw_insert_end(myState->rel, myState->state);
/* close rel, but keep lock until commit */
table_close(myState->rel, NoLock);
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 92b19dba32..4b05d963c2 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -135,6 +135,10 @@ extern BulkInsertState GetBulkInsertState(void);
extern void FreeBulkInsertState(BulkInsertState);
extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
+extern void heap_raw_insert(RewriteState state, TupleTableSlot *slot);
+extern RewriteState heap_raw_insert_begin(Relation new_heap);
+extern void heap_raw_insert_end(RewriteState state);
+
extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
int options, BulkInsertState bistate);
extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h
index e6d7fa1e65..17fa31f066 100644
--- a/src/include/access/rewriteheap.h
+++ b/src/include/access/rewriteheap.h
@@ -25,6 +25,8 @@ extern RewriteState begin_heap_rewrite(Relation OldHeap, Relation NewHeap,
TransactionId OldestXmin, TransactionId FreezeXid,
MultiXactId MultiXactCutoff);
extern void end_heap_rewrite(RewriteState state);
+extern RewriteState heap_raw_insert_begin(Relation new_heap);
+extern void heap_raw_insert_end(RewriteState state);
extern void rewrite_heap_tuple(RewriteState state, HeapTuple oldTuple,
HeapTuple newTuple);
extern bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple oldTuple);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 387eb34a61..75e245bd56 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -23,6 +23,7 @@
#include "utils/guc.h"
#include "utils/rel.h"
#include "utils/snapshot.h"
+#include "access/rewriteheap.h"
#define DEFAULT_TABLE_ACCESS_METHOD "heap"
@@ -352,6 +353,9 @@ typedef struct TableAmRoutine
* Manipulations of physical tuples.
* ------------------------------------------------------------------------
*/
+ RewriteState (*tuple_raw_insert_begin)(Relation new_heap);
+ void (*tuple_raw_insert_end) (RewriteState state);
+ void (*tuple_raw_insert)(RewriteState state, TupleTableSlot *slot);
/* see table_tuple_insert() for reference about parameters */
void (*tuple_insert) (Relation rel, TupleTableSlot *slot,
@@ -1140,6 +1144,24 @@ table_compute_xid_horizon_for_tuples(Relation rel,
* ----------------------------------------------------------------------------
*/
+static inline RewriteState
+table_tuple_raw_insert_begin(Relation rel)
+{
+ return rel->rd_tableam->tuple_raw_insert_begin(rel);
+}
+
+static inline void
+table_tuple_raw_insert(Relation rel, RewriteState state, TupleTableSlot *slot)
+{
+ rel->rd_tableam->tuple_raw_insert(state, slot);
+}
+
+static inline void
+table_tuple_raw_insert_end(Relation rel, RewriteState state)
+{
+ rel->rd_tableam->tuple_raw_insert_end(state);
+}
+
/*
* Insert a tuple from a slot into table AM routine.
*
--
2.14.3
On Tue, Nov 10, 2020 at 10:17:15AM +0000, Paul Guo wrote:
Raw insert could be used on CTAS & Create MatView. For Refresh MatView the code is a bit
different. I did not spend more time on this so not sure raw insert could be used for that.
But I think the previous multi insert work could be still used in at least "INSERT tbl SELECT…" (if the INSERT
is a simple one, e.g. no trigger, no index, etc).
Note that I've started that idea on another thread:
https://commitfest.postgresql.org/30/2553/
- should INSERT SELECT use a BulkInsertState? (and multi_insert)
There's also this one:
https://commitfest.postgresql.org/30/2818/
- split copy.c, Heikki
--
Justin
On Tue, Nov 10, 2020 at 3:47 PM Paul Guo <guopa@vmware.com> wrote:
Thanks for doing this. There might be another solution - use raw insert interfaces (i.e. raw_heap_insert()).
Attached is the test (not formal) patch that verifies this idea. raw_heap_insert() writes the page into the
table files directly and also write the FPI xlog when the tuples filled up the whole page. This seems be
more efficient.
Thanks. Will the new raw_heap_insert() APIs scale well (i.e. extend
the table in parallel) with parallelism? The existing
table_multi_insert() API scales well; see, for instance, the benefit
with parallel copy [1] and parallel multi inserts in CTAS [2].
[1]: /messages/by-id/CALj2ACWeQVd-xoQZHGT01_33St4xPoZQibWz46o7jW1PE3XOqQ@mail.gmail.com
[2]: /messages/by-id/CALj2ACWFq6Z4_jd9RPByURB8-Y8wccQWzLf+0-Jg+KYT7ZO-Ug@mail.gmail.com
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
On Nov 13, 2020, at 7:21 PM, Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> wrote:
On Tue, Nov 10, 2020 at 3:47 PM Paul Guo <guopa@vmware.com> wrote:
Thanks for doing this. There might be another solution - use raw insert interfaces (i.e. raw_heap_insert()).
Attached is the test (not formal) patch that verifies this idea. raw_heap_insert() writes the page into the
table files directly and also write the FPI xlog when the tuples filled up the whole page. This seems be
more efficient.
Thanks. Will the new raw_heap_insert() APIs scale well (i.e. extend
the table parallelly) with parallelism? The existing
table_multi_insert() API scales well, see, for instance, the benefit
with parallel copy[1] and parallel multi inserts in CTAS[2].
Yes, definitely some work needs to be done to make the raw heap insert interfaces fit the parallel work, but
it seems there are no hard blocking issues for this?
[1] - /messages/by-id/CALj2ACWeQVd-xoQZHGT01_33St4xPoZQibWz46o7jW1PE3XOqQ@mail.gmail.com
[2] - /messages/by-id/CALj2ACWFq6Z4_jd9RPByURB8-Y8wccQWzLf+0-Jg+KYT7ZO-Ug@mail.gmail.com
On Mon, Nov 16, 2020 at 8:02 PM Paul Guo <guopa@vmware.com> wrote:
On Nov 13, 2020, at 7:21 PM, Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> wrote:
On Tue, Nov 10, 2020 at 3:47 PM Paul Guo <guopa@vmware.com> wrote:
Thanks for doing this. There might be another solution - use raw insert interfaces (i.e. raw_heap_insert()).
Attached is the test (not formal) patch that verifies this idea. raw_heap_insert() writes the page into the
table files directly and also write the FPI xlog when the tuples filled up the whole page. This seems be
more efficient.Thanks. Will the new raw_heap_insert() APIs scale well (i.e. extend
the table parallelly) with parallelism? The existing
table_multi_insert() API scales well, see, for instance, the benefit
with parallel copy[1] and parallel multi inserts in CTAS[2].
Yes definitely some work needs to be done to make raw heap insert interfaces fit the parallel work, but
it seems that there is no hard blocking issues for this?
I may be wrong here. If we were to allow raw heap insert APIs to
handle parallelism, shouldn't we need some sort of shared memory to
allow coordination among workers? If we do so, at the end, aren't
these raw insert APIs equivalent to the current table_multi_insert()
API, which uses a separate shared ring buffer (bulk insert state) for
insertions?
And can we think of these raw insert APIs as similar in behaviour to
the table_multi_insert() API for unlogged tables?
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
Attaching v2 patch, rebased on the latest master 17958972.
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
Attachments:
v2-0001-Multi-Inserts-in-Create-Table-As.patch (application/octet-stream)
From 9c123880b2221079037d14688f31c556a1c38a5a Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Mon, 23 Nov 2020 14:03:05 +0530
Subject: [PATCH v2] Multi Inserts in Create Table As.
This could improve the performance and also benefits Create
Materialized View as it uses the code of Create Table As.
---
src/backend/commands/copy.c | 26 +++-----
src/backend/commands/createas.c | 100 ++++++++++++++++++++++++++++--
src/backend/executor/execTuples.c | 66 ++++++++++++++++++++
src/include/access/tableam.h | 15 +++++
src/include/executor/tuptable.h | 2 +-
5 files changed, 185 insertions(+), 24 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 115860a9d4..29fd28051e 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -240,18 +240,6 @@ typedef struct
uint64 processed; /* # of tuples processed */
} DR_copy;
-
-/*
- * No more than this many tuples per CopyMultiInsertBuffer
- *
- * Caution: Don't make this too big, as we could end up with this many
- * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
- * multiInsertBuffers list. Increasing this can cause quadratic growth in
- * memory requirements during copies into partitioned tables with a large
- * number of partitions.
- */
-#define MAX_BUFFERED_TUPLES 1000
-
/*
* Flush buffers if there are >= this many bytes, as counted by the input
* size, of tuples stored.
@@ -264,11 +252,11 @@ typedef struct
/* Stores multi-insert data related to a single relation in CopyFrom. */
typedef struct CopyMultiInsertBuffer
{
- TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+ TupleTableSlot *slots[MAX_MULTI_INSERT_TUPLES]; /* Array to store tuples */
ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
BulkInsertState bistate; /* BulkInsertState for this rel */
int nused; /* number of 'slots' containing tuples */
- uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
+ uint64 linenos[MAX_MULTI_INSERT_TUPLES]; /* Line # of tuple in copy
* stream */
} CopyMultiInsertBuffer;
@@ -2396,7 +2384,7 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
CopyMultiInsertBuffer *buffer;
buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
- memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+ memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_MULTI_INSERT_TUPLES);
buffer->resultRelInfo = rri;
buffer->bistate = GetBulkInsertState();
buffer->nused = 0;
@@ -2455,8 +2443,8 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
static inline bool
CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
{
- if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
- miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
+ if (miinfo->bufferedTuples >= MAX_MULTI_INSERT_TUPLES ||
+ miinfo->bufferedBytes >= MAX_MULTI_INSERT_BUFFERED_BYTES)
return true;
return false;
}
@@ -2574,7 +2562,7 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
FreeBulkInsertState(buffer->bistate);
/* Since we only create slots on demand, just drop the non-null ones. */
- for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+ for (i = 0; i < MAX_MULTI_INSERT_TUPLES && buffer->slots[i] != NULL; i++)
ExecDropSingleTupleTableSlot(buffer->slots[i]);
table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
@@ -2666,7 +2654,7 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
int nused = buffer->nused;
Assert(buffer != NULL);
- Assert(nused < MAX_BUFFERED_TUPLES);
+ Assert(nused < MAX_MULTI_INSERT_TUPLES);
if (buffer->slots[nused] == NULL)
buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 6bf6c5a310..06d67ba7e4 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -61,6 +61,13 @@ typedef struct
CommandId output_cid; /* cmin to insert in output tuples */
int ti_options; /* table_tuple_insert performance options */
BulkInsertState bistate; /* bulk insert state */
+ MemoryContext mi_context; /* A temporary memory context for multi insert */
+ /* Buffered slots for a multi insert batch. */
+ TupleTableSlot *mi_slots[MAX_MULTI_INSERT_TUPLES];
+ /* Number of current buffered slots for a multi insert batch. */
+ int mi_slots_num;
+ /* Total tuple size for a multi insert batch. */
+ int mi_slots_size;
} DR_intorel;
/* utility functions for CTAS definition creation */
@@ -530,15 +537,33 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
myState->reladdr = intoRelationAddr;
myState->output_cid = GetCurrentCommandId(true);
myState->ti_options = TABLE_INSERT_SKIP_FSM;
+ memset(myState->mi_slots, 0,
+ sizeof(TupleTableSlot *) * MAX_MULTI_INSERT_TUPLES);
+ myState->mi_slots_num = 0;
+ myState->mi_slots_size = 0;
/*
* If WITH NO DATA is specified, there is no need to set up the state for
- * bulk inserts as there are no tuples to insert.
+ * bulk inserts and multi inserts memory context as there are no tuples to
+ * insert.
*/
if (!into->skipData)
+ {
myState->bistate = GetBulkInsertState();
+
+ /*
+ * Create a temporary memory context so that we can reset once per
+ * multi insert batch.
+ */
+ myState->mi_context = AllocSetContextCreate(CurrentMemoryContext,
+ "intorel_multi_insert",
+ ALLOCSET_DEFAULT_SIZES);
+ }
else
+ {
myState->bistate = NULL;
+ myState->mi_context = NULL;
+ }
/*
* Valid smgr_targblock implies something already wrote to the relation.
@@ -547,6 +572,34 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
Assert(RelationGetTargetBlock(intoRelationDesc) == InvalidBlockNumber);
}
+/*
+ * intorel_flush_multi_insert --- insert multiple tuples
+ */
+static void
+intorel_flush_multi_insert(DR_intorel *myState)
+{
+ MemoryContext oldcontext;
+ int i;
+
+ oldcontext = MemoryContextSwitchTo(myState->mi_context);
+
+ table_multi_insert(myState->rel,
+ myState->mi_slots,
+ myState->mi_slots_num,
+ myState->output_cid,
+ myState->ti_options,
+ myState->bistate);
+
+ MemoryContextReset(myState->mi_context);
+ MemoryContextSwitchTo(oldcontext);
+
+ for (i = 0; i < myState->mi_slots_num; i++)
+ ExecClearTuple(myState->mi_slots[i]);
+
+ myState->mi_slots_num = 0;
+ myState->mi_slots_size = 0;
+}
+
/*
* intorel_receive --- receive one tuple
*/
@@ -554,9 +607,36 @@ static bool
intorel_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_intorel *myState = (DR_intorel *) self;
+ TupleTableSlot *batchslot;
+ Size sz = 0;
/* Nothing to insert if WITH NO DATA is specified. */
- if (!myState->into->skipData)
+ if (myState->into->skipData)
+ return true;
+
+ sz = GetTupleSize(slot, MAX_MULTI_INSERT_BUFFERED_BYTES);
+
+ /* In case the computed tuple size is 0, we go for single inserts. */
+ if (sz != 0)
+ {
+ if (myState->mi_slots[myState->mi_slots_num] == NULL)
+ {
+ batchslot = table_slot_create(myState->rel, NULL);
+ myState->mi_slots[myState->mi_slots_num] = batchslot;
+ }
+ else
+ batchslot = myState->mi_slots[myState->mi_slots_num];
+
+ ExecCopySlot(batchslot, slot);
+
+ myState->mi_slots_num++;
+ myState->mi_slots_size += sz;
+
+ if (myState->mi_slots_num >= MAX_MULTI_INSERT_TUPLES ||
+ myState->mi_slots_size >= MAX_MULTI_INSERT_BUFFERED_BYTES)
+ intorel_flush_multi_insert(myState);
+ }
+ else
{
/*
* Note that the input slot might not be of the type of the target
@@ -585,12 +665,24 @@ static void
intorel_shutdown(DestReceiver *self)
{
DR_intorel *myState = (DR_intorel *) self;
- IntoClause *into = myState->into;
- if (!into->skipData)
+ if (!myState->into->skipData)
{
+ int i;
+
+ if (myState->mi_slots_num != 0)
+ intorel_flush_multi_insert(myState);
+
+ for (i = 0; i < MAX_MULTI_INSERT_TUPLES && myState->mi_slots[i] != NULL; i++)
+ ExecDropSingleTupleTableSlot(myState->mi_slots[i]);
+
FreeBulkInsertState(myState->bistate);
table_finish_bulk_insert(myState->rel, myState->ti_options);
+
+ if (myState->mi_context)
+ MemoryContextDelete(myState->mi_context);
+
+ myState->mi_context = NULL;
}
/* close rel, but keep lock until commit */
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 4c90ac5236..5ee4135151 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -2318,3 +2318,69 @@ end_tup_output(TupOutputState *tstate)
ExecDropSingleTupleTableSlot(tstate->slot);
pfree(tstate);
}
+
+/*
+ * GetTupleSize - Compute the tuple size given a table slot.
+ *
+ * For heap tuple, buffer tuple and minimal tuple slot types return the actual
+ * tuple size that exists. For virtual tuple, the size is calculated as the
+ * slot does not have the tuple size. If the computed size exceeds the given
+ * maxsize for the virtual tuple, this function exits, not investing time in
+ * further unnecessary calculation.
+ */
+inline Size
+GetTupleSize(TupleTableSlot *slot, Size maxsize)
+{
+ Size sz = 0;
+ HeapTuple tuple = NULL;
+
+ if (TTS_IS_HEAPTUPLE(slot))
+ tuple = ((HeapTupleTableSlot *) slot)->tuple;
+ else if(TTS_IS_BUFFERTUPLE(slot))
+ tuple = ((BufferHeapTupleTableSlot *) slot)->base.tuple;
+ else if(TTS_IS_MINIMALTUPLE(slot))
+ tuple = ((MinimalTupleTableSlot *) slot)->tuple;
+ else if(TTS_IS_VIRTUAL(slot))
+ {
+ /* Size calculation is inspired from tts_virtual_materialize(). */
+ TupleDesc desc = slot->tts_tupleDescriptor;
+
+ for (int natt = 0; natt < desc->natts; natt++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, natt);
+ Datum val;
+
+ if (att->attbyval)
+ sz += att->attlen;
+
+ if (slot->tts_isnull[natt])
+ continue;
+
+ val = slot->tts_values[natt];
+
+ if (att->attlen == -1 &&
+ VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(val)))
+ {
+ sz = att_align_nominal(sz, att->attalign);
+ sz += EOH_get_flat_size(DatumGetEOHP(val));
+ }
+ else
+ {
+ sz = att_align_nominal(sz, att->attalign);
+ sz = att_addlength_datum(sz, att->attlen, val);
+ }
+
+ /*
+ * We are not interested in proceeding further if the computed size
+ * crosses maxsize limit that we are looking for.
+ */
+ if (maxsize != 0 && sz >= maxsize)
+ break;
+ }
+ }
+
+ if (tuple != NULL && !TTS_IS_VIRTUAL(slot))
+ sz = tuple->t_len;
+
+ return sz;
+}
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 387eb34a61..087aabe880 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -140,6 +140,21 @@ typedef struct TM_FailureData
/* Follow update chain and lock latest version of tuple */
#define TUPLE_LOCK_FLAG_FIND_LAST_VERSION (1 << 1)
+/*
+ * No more than this many tuples per multi insert buffer
+ *
+ * Caution: Don't make this too big. We could end up with this many multi
+ * insert buffer items stored as a list. Increasing this can cause quadratic
+ * growth in memory requirements during copies into partitioned tables with a
+ * large number of partitions.
+ */
+#define MAX_MULTI_INSERT_TUPLES 1000
+
+/*
+ * Flush buffers if there are >= this many bytes, as counted by the input
+ * size of the tuples stored.
+ */
+#define MAX_MULTI_INSERT_BUFFERED_BYTES 65535
/* Typedef for callback function for table_index_build_scan */
typedef void (*IndexBuildCallback) (Relation index,
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index f7df70b5ab..4336cc8ec8 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -329,7 +329,7 @@ extern Datum ExecFetchSlotHeapTupleDatum(TupleTableSlot *slot);
extern void slot_getmissingattrs(TupleTableSlot *slot, int startAttNum,
int lastAttNum);
extern void slot_getsomeattrs_int(TupleTableSlot *slot, int attnum);
-
+extern Size GetTupleSize(TupleTableSlot *slot, Size maxsize);
#ifndef FRONTEND
--
2.25.1
On 23/11/2020 11:15, Bharath Rupireddy wrote:
Attaching v2 patch, rebased on the latest master 17958972.
I just broke this again with commit c532d15ddd to split up copy.c.
Here's another rebased version.
- Heikki
Attachments:
v3-0001-Multi-Inserts-in-Create-Table-As.patch (text/x-patch)
From dca55175c590914f6a802ec3d36e20db55a3e3c7 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Mon, 23 Nov 2020 14:03:05 +0530
Subject: [PATCH v3 1/1] Multi Inserts in Create Table As.
This could improve the performance and also benefits Create
Materialized View as it uses the code of Create Table As.
---
src/backend/commands/copyfrom.c | 31 +++------
src/backend/commands/createas.c | 100 ++++++++++++++++++++++++++++--
src/backend/executor/execTuples.c | 66 ++++++++++++++++++++
src/include/access/tableam.h | 15 +++++
src/include/executor/tuptable.h | 2 +-
5 files changed, 185 insertions(+), 29 deletions(-)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 1b14e9a6eb0..a721600ade4 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -44,34 +44,17 @@
#include "utils/rel.h"
#include "utils/snapmgr.h"
-/*
- * No more than this many tuples per CopyMultiInsertBuffer
- *
- * Caution: Don't make this too big, as we could end up with this many
- * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
- * multiInsertBuffers list. Increasing this can cause quadratic growth in
- * memory requirements during copies into partitioned tables with a large
- * number of partitions.
- */
-#define MAX_BUFFERED_TUPLES 1000
-
-/*
- * Flush buffers if there are >= this many bytes, as counted by the input
- * size, of tuples stored.
- */
-#define MAX_BUFFERED_BYTES 65535
-
/* Trim the list of buffers back down to this number after flushing */
#define MAX_PARTITION_BUFFERS 32
/* Stores multi-insert data related to a single relation in CopyFrom. */
typedef struct CopyMultiInsertBuffer
{
- TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+ TupleTableSlot *slots[MAX_MULTI_INSERT_TUPLES]; /* Array to store tuples */
ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
BulkInsertState bistate; /* BulkInsertState for this rel */
int nused; /* number of 'slots' containing tuples */
- uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
+ uint64 linenos[MAX_MULTI_INSERT_TUPLES]; /* Line # of tuple in copy
* stream */
} CopyMultiInsertBuffer;
@@ -214,7 +197,7 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
CopyMultiInsertBuffer *buffer;
buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
- memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+ memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_MULTI_INSERT_TUPLES);
buffer->resultRelInfo = rri;
buffer->bistate = GetBulkInsertState();
buffer->nused = 0;
@@ -273,8 +256,8 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
static inline bool
CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
{
- if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
- miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
+ if (miinfo->bufferedTuples >= MAX_MULTI_INSERT_TUPLES ||
+ miinfo->bufferedBytes >= MAX_MULTI_INSERT_BUFFERED_BYTES)
return true;
return false;
}
@@ -392,7 +375,7 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
FreeBulkInsertState(buffer->bistate);
/* Since we only create slots on demand, just drop the non-null ones. */
- for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+ for (i = 0; i < MAX_MULTI_INSERT_TUPLES && buffer->slots[i] != NULL; i++)
ExecDropSingleTupleTableSlot(buffer->slots[i]);
table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
@@ -484,7 +467,7 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
int nused = buffer->nused;
Assert(buffer != NULL);
- Assert(nused < MAX_BUFFERED_TUPLES);
+ Assert(nused < MAX_MULTI_INSERT_TUPLES);
if (buffer->slots[nused] == NULL)
buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 6bf6c5a3106..06d67ba7e4b 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -61,6 +61,13 @@ typedef struct
CommandId output_cid; /* cmin to insert in output tuples */
int ti_options; /* table_tuple_insert performance options */
BulkInsertState bistate; /* bulk insert state */
+ MemoryContext mi_context; /* A temporary memory context for multi insert */
+ /* Buffered slots for a multi insert batch. */
+ TupleTableSlot *mi_slots[MAX_MULTI_INSERT_TUPLES];
+ /* Number of current buffered slots for a multi insert batch. */
+ int mi_slots_num;
+ /* Total tuple size for a multi insert batch. */
+ int mi_slots_size;
} DR_intorel;
/* utility functions for CTAS definition creation */
@@ -530,15 +537,33 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
myState->reladdr = intoRelationAddr;
myState->output_cid = GetCurrentCommandId(true);
myState->ti_options = TABLE_INSERT_SKIP_FSM;
+ memset(myState->mi_slots, 0,
+ sizeof(TupleTableSlot *) * MAX_MULTI_INSERT_TUPLES);
+ myState->mi_slots_num = 0;
+ myState->mi_slots_size = 0;
/*
* If WITH NO DATA is specified, there is no need to set up the state for
- * bulk inserts as there are no tuples to insert.
+ * bulk inserts and multi inserts memory context as there are no tuples to
+ * insert.
*/
if (!into->skipData)
+ {
myState->bistate = GetBulkInsertState();
+
+ /*
+ * Create a temporary memory context so that we can reset once per
+ * multi insert batch.
+ */
+ myState->mi_context = AllocSetContextCreate(CurrentMemoryContext,
+ "intorel_multi_insert",
+ ALLOCSET_DEFAULT_SIZES);
+ }
else
+ {
myState->bistate = NULL;
+ myState->mi_context = NULL;
+ }
/*
* Valid smgr_targblock implies something already wrote to the relation.
@@ -547,6 +572,34 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
Assert(RelationGetTargetBlock(intoRelationDesc) == InvalidBlockNumber);
}
+/*
+ * intorel_flush_multi_insert --- insert multiple tuples
+ */
+static void
+intorel_flush_multi_insert(DR_intorel *myState)
+{
+ MemoryContext oldcontext;
+ int i;
+
+ oldcontext = MemoryContextSwitchTo(myState->mi_context);
+
+ table_multi_insert(myState->rel,
+ myState->mi_slots,
+ myState->mi_slots_num,
+ myState->output_cid,
+ myState->ti_options,
+ myState->bistate);
+
+ MemoryContextReset(myState->mi_context);
+ MemoryContextSwitchTo(oldcontext);
+
+ for (i = 0; i < myState->mi_slots_num; i++)
+ ExecClearTuple(myState->mi_slots[i]);
+
+ myState->mi_slots_num = 0;
+ myState->mi_slots_size = 0;
+}
+
/*
* intorel_receive --- receive one tuple
*/
@@ -554,9 +607,36 @@ static bool
intorel_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_intorel *myState = (DR_intorel *) self;
+ TupleTableSlot *batchslot;
+ Size sz = 0;
/* Nothing to insert if WITH NO DATA is specified. */
- if (!myState->into->skipData)
+ if (myState->into->skipData)
+ return true;
+
+ sz = GetTupleSize(slot, MAX_MULTI_INSERT_BUFFERED_BYTES);
+
+ /* In case the computed tuple size is 0, we go for single inserts. */
+ if (sz != 0)
+ {
+ if (myState->mi_slots[myState->mi_slots_num] == NULL)
+ {
+ batchslot = table_slot_create(myState->rel, NULL);
+ myState->mi_slots[myState->mi_slots_num] = batchslot;
+ }
+ else
+ batchslot = myState->mi_slots[myState->mi_slots_num];
+
+ ExecCopySlot(batchslot, slot);
+
+ myState->mi_slots_num++;
+ myState->mi_slots_size += sz;
+
+ if (myState->mi_slots_num >= MAX_MULTI_INSERT_TUPLES ||
+ myState->mi_slots_size >= MAX_MULTI_INSERT_BUFFERED_BYTES)
+ intorel_flush_multi_insert(myState);
+ }
+ else
{
/*
* Note that the input slot might not be of the type of the target
@@ -585,12 +665,24 @@ static void
intorel_shutdown(DestReceiver *self)
{
DR_intorel *myState = (DR_intorel *) self;
- IntoClause *into = myState->into;
- if (!into->skipData)
+ if (!myState->into->skipData)
{
+ int i;
+
+ if (myState->mi_slots_num != 0)
+ intorel_flush_multi_insert(myState);
+
+ for (i = 0; i < MAX_MULTI_INSERT_TUPLES && myState->mi_slots[i] != NULL; i++)
+ ExecDropSingleTupleTableSlot(myState->mi_slots[i]);
+
FreeBulkInsertState(myState->bistate);
table_finish_bulk_insert(myState->rel, myState->ti_options);
+
+ if (myState->mi_context)
+ MemoryContextDelete(myState->mi_context);
+
+ myState->mi_context = NULL;
}
/* close rel, but keep lock until commit */
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 4c90ac5236f..5ee41351512 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -2318,3 +2318,69 @@ end_tup_output(TupOutputState *tstate)
ExecDropSingleTupleTableSlot(tstate->slot);
pfree(tstate);
}
+
+/*
+ * GetTupleSize - Compute the tuple size given a table slot.
+ *
+ * For heap tuple, buffer tuple and minimal tuple slot types return the actual
+ * tuple size that exists. For virtual tuple, the size is calculated as the
+ * slot does not have the tuple size. If the computed size exceeds the given
+ * maxsize for the virtual tuple, this function exits, not investing time in
+ * further unnecessary calculation.
+ */
+inline Size
+GetTupleSize(TupleTableSlot *slot, Size maxsize)
+{
+ Size sz = 0;
+ HeapTuple tuple = NULL;
+
+ if (TTS_IS_HEAPTUPLE(slot))
+ tuple = ((HeapTupleTableSlot *) slot)->tuple;
+ else if(TTS_IS_BUFFERTUPLE(slot))
+ tuple = ((BufferHeapTupleTableSlot *) slot)->base.tuple;
+ else if(TTS_IS_MINIMALTUPLE(slot))
+ tuple = ((MinimalTupleTableSlot *) slot)->tuple;
+ else if(TTS_IS_VIRTUAL(slot))
+ {
+ /* Size calculation is inspired from tts_virtual_materialize(). */
+ TupleDesc desc = slot->tts_tupleDescriptor;
+
+ for (int natt = 0; natt < desc->natts; natt++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, natt);
+ Datum val;
+
+ if (att->attbyval)
+ sz += att->attlen;
+
+ if (slot->tts_isnull[natt])
+ continue;
+
+ val = slot->tts_values[natt];
+
+ if (att->attlen == -1 &&
+ VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(val)))
+ {
+ sz = att_align_nominal(sz, att->attalign);
+ sz += EOH_get_flat_size(DatumGetEOHP(val));
+ }
+ else
+ {
+ sz = att_align_nominal(sz, att->attalign);
+ sz = att_addlength_datum(sz, att->attlen, val);
+ }
+
+ /*
+ * We are not interested in proceeding further if the computed size
+ * crosses maxsize limit that we are looking for.
+ */
+ if (maxsize != 0 && sz >= maxsize)
+ break;
+ }
+ }
+
+ if (tuple != NULL && !TTS_IS_VIRTUAL(slot))
+ sz = tuple->t_len;
+
+ return sz;
+}
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 387eb34a61a..087aabe880b 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -140,6 +140,21 @@ typedef struct TM_FailureData
/* Follow update chain and lock latest version of tuple */
#define TUPLE_LOCK_FLAG_FIND_LAST_VERSION (1 << 1)
+/*
+ * No more than this many tuples per multi insert buffer
+ *
+ * Caution: Don't make this too big. We could end up with this many multi
+ * insert buffer items stored as a list. Increasing this can cause quadratic
+ * growth in memory requirements during copies into partitioned tables with a
+ * large number of partitions.
+ */
+#define MAX_MULTI_INSERT_TUPLES 1000
+
+/*
+ * Flush buffers if there are >= this many bytes, as counted by the input
+ * size of the tuples stored.
+ */
+#define MAX_MULTI_INSERT_BUFFERED_BYTES 65535
/* Typedef for callback function for table_index_build_scan */
typedef void (*IndexBuildCallback) (Relation index,
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index f7df70b5abd..4336cc8ec83 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -329,7 +329,7 @@ extern Datum ExecFetchSlotHeapTupleDatum(TupleTableSlot *slot);
extern void slot_getmissingattrs(TupleTableSlot *slot, int startAttNum,
int lastAttNum);
extern void slot_getsomeattrs_int(TupleTableSlot *slot, int attnum);
-
+extern Size GetTupleSize(TupleTableSlot *slot, Size maxsize);
#ifndef FRONTEND
--
2.20.1
On Mon, Nov 23, 2020 at 3:26 PM Heikki Linnakangas <hlinnaka@iki.fi> wrote:
On 23/11/2020 11:15, Bharath Rupireddy wrote:
Attaching v2 patch, rebased on the latest master 17958972.
I just broke this again with commit c532d15ddd to split up copy.c.
Here's another rebased version.
Thanks! I noticed that and am about to post a new patch. Anyways,
thanks for the rebased v3 patch. Attaching here v3 again for
visibility.
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
Attachments:
v3-0001-Multi-Inserts-in-Create-Table-As.patch (application/x-patch)
On 23-11-2020 11:23, Bharath Rupireddy wrote:
On Mon, Nov 23, 2020 at 3:26 PM Heikki Linnakangas <hlinnaka@iki.fi> wrote:
On 23/11/2020 11:15, Bharath Rupireddy wrote:
Attaching v2 patch, rebased on the latest master 17958972.
I just broke this again with commit c532d15ddd to split up copy.c.
Here's another rebased version.
Thanks! I noticed that and am about to post a new patch. Anyways,
thanks for the rebased v3 patch. Attaching here v3 again for
visibility.
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
Hi,
Thanks for reviving the patch! I did unfortunately have to shift my
priorities somewhat and did not find much time to work on open source
things the last week(s).
I'm wondering about the use of the GetTupleSize function. As far as I
understand the idea is to limit the amount of buffered data, presumably
to not write too much data at once for intorel_flush_multi_insert.
If I understood correctly how it all works, the table slot can however
be of a different type than the source slot, which means that the call to
CopySlot() potentially stores a different amount of data than computed
by GetTupleSize(). Not sure if this is a big problem, as an estimate
might be good enough?
Some other solutions/implementations would be:
- compute the size after doing CopySlot. Maybe the relation never wants
a virtual tuple and then you can also simplify GetTupleSize?
- after CopySlot ask for the memory consumed in the slot using
MemoryContextMemAllocated.
Some small things to maybe change are:
===========
+ if (myState->mi_slots[myState->mi_slots_num] == NULL)
+ {
+ batchslot = table_slot_create(myState->rel, NULL);
+ myState->mi_slots[myState->mi_slots_num] = batchslot;
+ }
+ else
+ batchslot = myState->mi_slots[myState->mi_slots_num];
Alternative:
+ if (myState->mi_slots[myState->mi_slots_num] == NULL)
+ myState->mi_slots[myState->mi_slots_num] =
table_slot_create(myState->rel, NULL);
+ batchslot = myState->mi_slots[myState->mi_slots_num];
==============
+ sz = att_align_nominal(sz, att->attalign);
This could be moved out of the if statement?
==============
Regards,
Luc
Swarm64
On Wed, Nov 25, 2020 at 2:11 PM Luc Vlaming <luc@swarm64.com> wrote:
Thanks for reviving the patch! I did unfortunately have to shift my
priorities somewhat and did not find much time to work on open source
things the last week(s).
Thanks for the comments.
I'm wondering about the use of the GetTupleSize function. As far as I
understand the idea is to limit the amount of buffered data, presumably
to not write too much data at once for intorel_flush_multi_insert.
If I understood correctly how it all works, the table slot can however
be of a different type than the source slot, which means that the call to
CopySlot() potentially stores a different amount of data than computed
by GetTupleSize(). Not sure if this is a big problem, as an estimate
might be good enough?
Yeah. The tuple size may change after ExecCopySlot(). For instance, consider
CREATE TABLE t2 AS SELECT a1 FROM t1; where t1 has two integer columns a1 and
b1. Creating t2 with the single column a1 from t1 makes the source slot
virtual.
The source slot is virtual, and the size calculated with GetTupleSize() is 8
bytes:
(gdb) p *slot
$18 = {type = T_TupleTableSlot, tts_flags = 16, tts_nvalid = 1,
tts_ops = 0x562c592652c0 <TTSOpsVirtual>,
tts_tupleDescriptor = 0x562c5a0409f0, tts_values = 0x562c5a040b50,
tts_isnull = 0x562c5a040b58, tts_mcxt = 0x562c5a040320, tts_tid = {
ip_blkid = {bi_hi = 65535, bi_lo = 65535}, ip_posid = 0}, tts_tableOid
= 0}
(gdb) call GetTupleSize(slot, 65535)
$24 = 8
After ExecCopySlot(batchslot, slot), the destination slot becomes a
TTSOpsBufferHeapTuple slot, and GetTupleSize() now gives 28 bytes.
(gdb) p *batchslot
$19 = {type = T_TupleTableSlot, tts_flags = 20, tts_nvalid = 0,
tts_ops = 0x562c592653e0 <TTSOpsBufferHeapTuple>,
tts_tupleDescriptor = 0x7f063fbeecd0, tts_values = 0x562c5a05daa8,
tts_isnull = 0x562c5a05dab0, tts_mcxt = 0x562c5a040320, tts_tid = {
ip_blkid = {bi_hi = 65535, bi_lo = 65535}, ip_posid = 0}, tts_tableOid
= 0}
(gdb) call GetTupleSize(batchslot, 65535)
$25 = 28
I think your suggestion to call GetTupleSize() on the destination slot
after ExecCopySlot() is right. I changed it in the v4 patch.
Some other solutions/implementations would be:
- compute the size after doing CopySlot. Maybe the relation never wants
a virtual tuple and then you can also simplify GetTupleSize?
I think we need to keep the TTSOpsVirtual code in GetTupleSize() because
table_slot_create(), which gets called before ExecCopySlot(), may create
virtual slots for cases such as views and partitioned tables. Though we
cannot insert into views or partitioned tables using CTAS, I want
GetTupleSize() to be a generic function. Right now, I cannot find other
use cases where GetTupleSize() can be used.
- after CopySlot ask for the memory consumed in the slot using
MemoryContextMemAllocated.
MemoryContextMemAllocated of the slot's tts_mcxt will always include extra
bytes, and those extra bytes are far larger than the actual tuple bytes.
Also, most of the time the destination slot shares the same tts_mcxt as the
source slot after ExecCopySlot(). For instance, for a single row with a
single 8-byte integer column, mem_allocated is 49232 bytes. This is the
reason we cannot rely on mem_allocated.
(gdb) p slot->tts_mcxt -----> source slot
$22 = (MemoryContext) 0x562c5a040320
(gdb) p *slot->tts_mcxt
$20 = {type = T_AllocSetContext, isReset = false, allowInCritSection =
false,
mem_allocated = 49232, methods = 0x562c5926d560 <AllocSetMethods>,
parent = 0x562c59f97820, firstchild = 0x562c5a042330, prevchild = 0x0,
nextchild = 0x0, name = 0x562c590d3554 "ExecutorState", ident = 0x0,
reset_cbs = 0x0}
(gdb) p batchslot->tts_mcxt -----> destination slot after
ExecCopySlot().
$23 = (MemoryContext) 0x562c5a040320
(gdb) p *batchslot->tts_mcxt
$21 = {type = T_AllocSetContext, isReset = false, allowInCritSection =
false,
mem_allocated = 49232, methods = 0x562c5926d560 <AllocSetMethods>,
parent = 0x562c59f97820, firstchild = 0x562c5a042330, prevchild = 0x0,
nextchild = 0x0, name = 0x562c590d3554 "ExecutorState", ident = 0x0,
reset_cbs = 0x0}
Some small things to maybe change are:
===========
+ if (myState->mi_slots[myState->mi_slots_num] == NULL)
+ {
+     batchslot = table_slot_create(myState->rel, NULL);
+     myState->mi_slots[myState->mi_slots_num] = batchslot;
+ }
+ else
+     batchslot = myState->mi_slots[myState->mi_slots_num];
Alternative:
+ if (myState->mi_slots[myState->mi_slots_num] == NULL)
+     myState->mi_slots[myState->mi_slots_num] = table_slot_create(myState->rel, NULL);
+ batchslot = myState->mi_slots[myState->mi_slots_num];
Changed.
==============
+ sz = att_align_nominal(sz, att->attalign);
This could be moved out of the if statement?
==============
I don't think we can change it. If we were to move it, then
sz = att_addlength_datum(sz, att->attlen, val), which takes the aligned sz,
may have problems like the one below:
Say att_align_nominal sets sz to 4 bytes; att_addlength_datum then takes
those 4 bytes and adds attlen to them. If we moved att_align_nominal(sz,
att->attalign) out, then att_addlength_datum(sz, att->attlen, val) would not
account for the alignment bytes. We might have to add up the alignment bytes
separately for the else case. Also note that this code is derived from
tts_virtual_materialize(), where att_align_nominal appears inside both the
if and else blocks. I may be wrong here.
Attaching v4 patch. Consider it for further review.
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
Attachments:
v4-0001-Multi-Inserts-in-Create-Table-As.patch (application/octet-stream)
From 12b5ed7319a0c44eab16beb7f116a4d77902ed4f Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Thu, 26 Nov 2020 06:39:51 +0530
Subject: [PATCH v4] Multi Inserts in Create Table As.
This could improve the performance and also benefits Create
Materialized View as it uses the code of Create Table As.
---
src/backend/commands/copyfrom.c | 31 ++-------
src/backend/commands/createas.c | 102 ++++++++++++++++++++++++++++--
src/backend/executor/execTuples.c | 66 +++++++++++++++++++
src/include/access/tableam.h | 15 +++++
src/include/executor/tuptable.h | 2 +-
5 files changed, 187 insertions(+), 29 deletions(-)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 1b14e9a6eb..a721600ade 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -44,34 +44,17 @@
#include "utils/rel.h"
#include "utils/snapmgr.h"
-/*
- * No more than this many tuples per CopyMultiInsertBuffer
- *
- * Caution: Don't make this too big, as we could end up with this many
- * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
- * multiInsertBuffers list. Increasing this can cause quadratic growth in
- * memory requirements during copies into partitioned tables with a large
- * number of partitions.
- */
-#define MAX_BUFFERED_TUPLES 1000
-
-/*
- * Flush buffers if there are >= this many bytes, as counted by the input
- * size, of tuples stored.
- */
-#define MAX_BUFFERED_BYTES 65535
-
/* Trim the list of buffers back down to this number after flushing */
#define MAX_PARTITION_BUFFERS 32
/* Stores multi-insert data related to a single relation in CopyFrom. */
typedef struct CopyMultiInsertBuffer
{
- TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+ TupleTableSlot *slots[MAX_MULTI_INSERT_TUPLES]; /* Array to store tuples */
ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
BulkInsertState bistate; /* BulkInsertState for this rel */
int nused; /* number of 'slots' containing tuples */
- uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
+ uint64 linenos[MAX_MULTI_INSERT_TUPLES]; /* Line # of tuple in copy
* stream */
} CopyMultiInsertBuffer;
@@ -214,7 +197,7 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
CopyMultiInsertBuffer *buffer;
buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
- memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+ memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_MULTI_INSERT_TUPLES);
buffer->resultRelInfo = rri;
buffer->bistate = GetBulkInsertState();
buffer->nused = 0;
@@ -273,8 +256,8 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
static inline bool
CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
{
- if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
- miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
+ if (miinfo->bufferedTuples >= MAX_MULTI_INSERT_TUPLES ||
+ miinfo->bufferedBytes >= MAX_MULTI_INSERT_BUFFERED_BYTES)
return true;
return false;
}
@@ -392,7 +375,7 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
FreeBulkInsertState(buffer->bistate);
/* Since we only create slots on demand, just drop the non-null ones. */
- for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+ for (i = 0; i < MAX_MULTI_INSERT_TUPLES && buffer->slots[i] != NULL; i++)
ExecDropSingleTupleTableSlot(buffer->slots[i]);
table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
@@ -484,7 +467,7 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
int nused = buffer->nused;
Assert(buffer != NULL);
- Assert(nused < MAX_BUFFERED_TUPLES);
+ Assert(nused < MAX_MULTI_INSERT_TUPLES);
if (buffer->slots[nused] == NULL)
buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 6bf6c5a310..d0480f1990 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -61,6 +61,13 @@ typedef struct
CommandId output_cid; /* cmin to insert in output tuples */
int ti_options; /* table_tuple_insert performance options */
BulkInsertState bistate; /* bulk insert state */
+ MemoryContext mi_context; /* A temporary memory context for multi insert */
+ /* Buffered slots for a multi insert batch. */
+ TupleTableSlot *mi_slots[MAX_MULTI_INSERT_TUPLES];
+ /* Number of current buffered slots for a multi insert batch. */
+ int mi_slots_num;
+ /* Total tuple size for a multi insert batch. */
+ int mi_slots_size;
} DR_intorel;
/* utility functions for CTAS definition creation */
@@ -530,15 +537,33 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
myState->reladdr = intoRelationAddr;
myState->output_cid = GetCurrentCommandId(true);
myState->ti_options = TABLE_INSERT_SKIP_FSM;
+ memset(myState->mi_slots, 0,
+ sizeof(TupleTableSlot *) * MAX_MULTI_INSERT_TUPLES);
+ myState->mi_slots_num = 0;
+ myState->mi_slots_size = 0;
/*
* If WITH NO DATA is specified, there is no need to set up the state for
- * bulk inserts as there are no tuples to insert.
+ * bulk inserts and multi inserts memory context as there are no tuples to
+ * insert.
*/
if (!into->skipData)
+ {
myState->bistate = GetBulkInsertState();
+
+ /*
+ * Create a temporary memory context so that we can reset once per
+ * multi insert batch.
+ */
+ myState->mi_context = AllocSetContextCreate(CurrentMemoryContext,
+ "intorel_multi_insert",
+ ALLOCSET_DEFAULT_SIZES);
+ }
else
+ {
myState->bistate = NULL;
+ myState->mi_context = NULL;
+ }
/*
* Valid smgr_targblock implies something already wrote to the relation.
@@ -547,6 +572,34 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
Assert(RelationGetTargetBlock(intoRelationDesc) == InvalidBlockNumber);
}
+/*
+ * intorel_flush_multi_insert --- insert multiple tuples
+ */
+static void
+intorel_flush_multi_insert(DR_intorel *myState)
+{
+ MemoryContext oldcontext;
+ int i;
+
+ oldcontext = MemoryContextSwitchTo(myState->mi_context);
+
+ table_multi_insert(myState->rel,
+ myState->mi_slots,
+ myState->mi_slots_num,
+ myState->output_cid,
+ myState->ti_options,
+ myState->bistate);
+
+ MemoryContextReset(myState->mi_context);
+ MemoryContextSwitchTo(oldcontext);
+
+ for (i = 0; i < myState->mi_slots_num; i++)
+ ExecClearTuple(myState->mi_slots[i]);
+
+ myState->mi_slots_num = 0;
+ myState->mi_slots_size = 0;
+}
+
/*
* intorel_receive --- receive one tuple
*/
@@ -554,9 +607,38 @@ static bool
intorel_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_intorel *myState = (DR_intorel *) self;
+ TupleTableSlot *batchslot;
+ Size sz = 0;
/* Nothing to insert if WITH NO DATA is specified. */
- if (!myState->into->skipData)
+ if (myState->into->skipData)
+ return true;
+
+ if (myState->mi_slots[myState->mi_slots_num] == NULL)
+ myState->mi_slots[myState->mi_slots_num] =
+ table_slot_create(myState->rel, NULL);
+
+ batchslot = myState->mi_slots[myState->mi_slots_num];
+
+ ExecCopySlot(batchslot, slot);
+
+ /*
+ * Calculate the tuple size after the original slot is copied. Because the
+ * copied slot type and the tuple size may change.
+ */
+ sz = GetTupleSize(batchslot, MAX_MULTI_INSERT_BUFFERED_BYTES);
+
+ /* In case the computed tuple size is 0, we go for single inserts. */
+ if (sz != 0)
+ {
+ myState->mi_slots_num++;
+ myState->mi_slots_size += sz;
+
+ if (myState->mi_slots_num >= MAX_MULTI_INSERT_TUPLES ||
+ myState->mi_slots_size >= MAX_MULTI_INSERT_BUFFERED_BYTES)
+ intorel_flush_multi_insert(myState);
+ }
+ else
{
/*
* Note that the input slot might not be of the type of the target
@@ -585,12 +667,24 @@ static void
intorel_shutdown(DestReceiver *self)
{
DR_intorel *myState = (DR_intorel *) self;
- IntoClause *into = myState->into;
- if (!into->skipData)
+ if (!myState->into->skipData)
{
+ int i;
+
+ if (myState->mi_slots_num != 0)
+ intorel_flush_multi_insert(myState);
+
+ for (i = 0; i < MAX_MULTI_INSERT_TUPLES && myState->mi_slots[i] != NULL; i++)
+ ExecDropSingleTupleTableSlot(myState->mi_slots[i]);
+
FreeBulkInsertState(myState->bistate);
table_finish_bulk_insert(myState->rel, myState->ti_options);
+
+ if (myState->mi_context)
+ MemoryContextDelete(myState->mi_context);
+
+ myState->mi_context = NULL;
}
/* close rel, but keep lock until commit */
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 4c90ac5236..5ee4135151 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -2318,3 +2318,69 @@ end_tup_output(TupOutputState *tstate)
ExecDropSingleTupleTableSlot(tstate->slot);
pfree(tstate);
}
+
+/*
+ * GetTupleSize - Compute the tuple size given a table slot.
+ *
+ * For heap tuple, buffer tuple and minimal tuple slot types return the actual
+ * tuple size that exists. For virtual tuple, the size is calculated as the
+ * slot does not have the tuple size. If the computed size exceeds the given
+ * maxsize for the virtual tuple, this function exits, not investing time in
+ * further unnecessary calculation.
+ */
+inline Size
+GetTupleSize(TupleTableSlot *slot, Size maxsize)
+{
+ Size sz = 0;
+ HeapTuple tuple = NULL;
+
+ if (TTS_IS_HEAPTUPLE(slot))
+ tuple = ((HeapTupleTableSlot *) slot)->tuple;
+ else if(TTS_IS_BUFFERTUPLE(slot))
+ tuple = ((BufferHeapTupleTableSlot *) slot)->base.tuple;
+ else if(TTS_IS_MINIMALTUPLE(slot))
+ tuple = ((MinimalTupleTableSlot *) slot)->tuple;
+ else if(TTS_IS_VIRTUAL(slot))
+ {
+ /* Size calculation is inspired from tts_virtual_materialize(). */
+ TupleDesc desc = slot->tts_tupleDescriptor;
+
+ for (int natt = 0; natt < desc->natts; natt++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, natt);
+ Datum val;
+
+ if (att->attbyval)
+ sz += att->attlen;
+
+ if (slot->tts_isnull[natt])
+ continue;
+
+ val = slot->tts_values[natt];
+
+ if (att->attlen == -1 &&
+ VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(val)))
+ {
+ sz = att_align_nominal(sz, att->attalign);
+ sz += EOH_get_flat_size(DatumGetEOHP(val));
+ }
+ else
+ {
+ sz = att_align_nominal(sz, att->attalign);
+ sz = att_addlength_datum(sz, att->attlen, val);
+ }
+
+ /*
+ * We are not interested in proceeding further if the computed size
+ * crosses maxsize limit that we are looking for.
+ */
+ if (maxsize != 0 && sz >= maxsize)
+ break;
+ }
+ }
+
+ if (tuple != NULL && !TTS_IS_VIRTUAL(slot))
+ sz = tuple->t_len;
+
+ return sz;
+}
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 387eb34a61..087aabe880 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -140,6 +140,21 @@ typedef struct TM_FailureData
/* Follow update chain and lock latest version of tuple */
#define TUPLE_LOCK_FLAG_FIND_LAST_VERSION (1 << 1)
+/*
+ * No more than this many tuples per multi insert buffer
+ *
+ * Caution: Don't make this too big. We could end up with this many multi
+ * insert buffer items stored as a list. Increasing this can cause quadratic
+ * growth in memory requirements during copies into partitioned tables with a
+ * large number of partitions.
+ */
+#define MAX_MULTI_INSERT_TUPLES 1000
+
+/*
+ * Flush buffers if there are >= this many bytes, as counted by the input
+ * size of the tuples stored.
+ */
+#define MAX_MULTI_INSERT_BUFFERED_BYTES 65535
/* Typedef for callback function for table_index_build_scan */
typedef void (*IndexBuildCallback) (Relation index,
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index f7df70b5ab..4336cc8ec8 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -329,7 +329,7 @@ extern Datum ExecFetchSlotHeapTupleDatum(TupleTableSlot *slot);
extern void slot_getmissingattrs(TupleTableSlot *slot, int startAttNum,
int lastAttNum);
extern void slot_getsomeattrs_int(TupleTableSlot *slot, int attnum);
-
+extern Size GetTupleSize(TupleTableSlot *slot, Size maxsize);
#ifndef FRONTEND
--
2.25.1
On Thu, Nov 26, 2020 at 07:24:01AM +0530, Bharath Rupireddy wrote:
Yeah. The tuple size may change after ExecCopySlot(). For instance, take
create table t2 as select a1 from t1; where t1 has two integer columns a1
and b1. Creating t2 with the single column a1 from t1 makes the source
slot virtual.
+inline Size
+GetTupleSize(TupleTableSlot *slot, Size maxsize)
+{
+ Size sz = 0;
+ HeapTuple tuple = NULL;
+
+ if (TTS_IS_HEAPTUPLE(slot))
+ tuple = ((HeapTupleTableSlot *) slot)->tuple;
+ else if(TTS_IS_BUFFERTUPLE(slot))
+ tuple = ((BufferHeapTupleTableSlot *) slot)->base.tuple;
+ else if(TTS_IS_MINIMALTUPLE(slot))
+ tuple = ((MinimalTupleTableSlot *) slot)->tuple;
There have been various talks about the methods we could use to
evaluate the threshold in bytes when evaluating that a flush can
happen, including the use of memory contexts, or even estimate the
size of the number of tuples. This one looks promising because it
seems exact, however for virtual slots I don't like much the fact that
you basically just extracted the parts of tts_virtual_materialize()
and stuck them in this routine. That's a recipe for future bugs if
the materialization logic changes. In short, I am surprised that this
calculation is not directly part of TupleTableSlotOps. What we'd want
is to get this information depending on the slot type dealt with, and
with your patch you would miss to handle any new slot type
introduced.
--
Michael
On Thu, Nov 26, 2020 at 9:55 AM Michael Paquier <michael@paquier.xyz> wrote:
+inline Size
+GetTupleSize(TupleTableSlot *slot, Size maxsize)
+{
+    Size sz = 0;
+    HeapTuple tuple = NULL;
+
+    if (TTS_IS_HEAPTUPLE(slot))
+        tuple = ((HeapTupleTableSlot *) slot)->tuple;
+    else if(TTS_IS_BUFFERTUPLE(slot))
+        tuple = ((BufferHeapTupleTableSlot *) slot)->base.tuple;
+    else if(TTS_IS_MINIMALTUPLE(slot))
+        tuple = ((MinimalTupleTableSlot *) slot)->tuple;

There have been various talks about the methods we could use to
evaluate the threshold in bytes when evaluating that a flush can
happen, including the use of memory contexts, or even estimate the
size of the number of tuples. This one looks promising because it
seems exact, however for virtual slots I don't like much the fact that
you basically just extracted the parts of tts_virtual_materialize()
and stuck them in this routine. That's a recipe for future bugs if
the materialization logic changes. In short, I am surprised that this
calculation is not directly part of TupleTableSlotOps. What we'd want
is to get this information depending on the slot type dealt with, and
with your patch you would miss to handle any new slot type
introduced.
Yes for virtual slots, I reused the code from
tts_virtual_materialize() in GetTupleSize(). I can think of below
options:
1) Make the size calculation code for virtual slots, a macro or a
static inline function and use that in tts_virtual_materialize() and
GetTupleSize().
2) Add comments in both the places, such as "if any code is changed
here, consider changing it in tts_virtual_materialize() /
GetTupleSize()"
3) Add a size variable to TupleTableSlotOps structure.
4) Add a new API to TupleTableSlotOps structure say get_slot_size().
5) For new slot types, maybe we can have comments in tuptable.h to
consider having equivalent change in GetTupleSize().
If we go with 3 and 4, will it be acceptable to add the extra code in
generic structure which gets used in most of the code base and use
that new code only in limited places (for multi inserts in CTAS and
Refresh Mat View)? I think we can go ahead with 2 and 5. Thoughts?
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
On 26-11-2020 07:31, Bharath Rupireddy wrote:
On Thu, Nov 26, 2020 at 9:55 AM Michael Paquier <michael@paquier.xyz> wrote:
+inline Size
+GetTupleSize(TupleTableSlot *slot, Size maxsize)
+{
+    Size sz = 0;
+    HeapTuple tuple = NULL;
+
+    if (TTS_IS_HEAPTUPLE(slot))
+        tuple = ((HeapTupleTableSlot *) slot)->tuple;
+    else if(TTS_IS_BUFFERTUPLE(slot))
+        tuple = ((BufferHeapTupleTableSlot *) slot)->base.tuple;
+    else if(TTS_IS_MINIMALTUPLE(slot))
+        tuple = ((MinimalTupleTableSlot *) slot)->tuple;

There have been various talks about the methods we could use to
evaluate the threshold in bytes when evaluating that a flush can
happen, including the use of memory contexts, or even estimate the
size of the number of tuples. This one looks promising because it
seems exact, however for virtual slots I don't like much the fact that
you basically just extracted the parts of tts_virtual_materialize()
and stuck them in this routine. That's a recipe for future bugs if
the materialization logic changes. In short, I am surprised that this
calculation is not directly part of TupleTableSlotOps. What we'd want
is to get this information depending on the slot type dealt with, and
with your patch you would miss to handle any new slot type
introduced.

Yes for virtual slots, I reused the code from
tts_virtual_materialize() in GetTupleSize(). I can think of below
options:

1) Make the size calculation code for virtual slots, a macro or a
static inline function and use that in tts_virtual_materialize() and
GetTupleSize().
2) Add comments in both the places, such as "if any code is changed
here, consider changing it in tts_virtual_materialize() /
GetTupleSize()"
3) Add a size variable to TupleTableSlotOps structure.
4) Add a new API to TupleTableSlotOps structure say get_slot_size().
5) For new slot types, maybe we can have comments in tuptable.h to
consider having equivalent change in GetTupleSize().

If we go with 3 and 4, will it be acceptable to add the extra code in
generic structure which gets used in most of the code base and use
that new code only in limited places (for multi inserts in CTAS and
Refresh Mat View)? I think we can go ahead with 2 and 5. Thoughts?

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
What I'm wondering about is the reason for wanting a cap on data volume.
When doing some local (highly concurrent) ingest speed tests a few weeks
ago it seemed to mostly matter how many pages were being written and the
resulting pressure on locks, etc. and not necessarily so much the actual
memory usage. I didn't collect proof on that though (yet). There was
however a very clearly observable contention point where with bigger
buffers the performance would not only stagnate but actually drop.
So what I'm kinda wondering is if we should worry more about the amount
of pages that are going to be written and maybe not so much about the
memory usage?
If this were to be the case then maybe we can consider improving the
current design, potentially in a follow-up patch? The problem I see is
that generically each tableam will have different choices to make on how
to buffer and flush multiple rows, given that a storage engine might
have more or less write amplification, a different way of extending a
relation, fsm use, etc.
Assuming we indeed want a per-tableam implementation, we could either:
- make multi_insert buffer the tuples itself and add a flush_multi_insert.
- add a new function called create_multi_insert which returns something
like a MultiInsertState, which, like a destreceiver, has a set of
callbacks to start, shutdown and insert.
With both solutions one part that to me seems appealing is that we
buffer the data in something that likely resembles the disk format very
much. Thoughts?
Regards,
Luc
Swarm64
On Thu, Nov 26, 2020 at 12:25 PM Luc Vlaming <luc@swarm64.com> wrote:
What I'm wondering about is the reason for wanting a cap on data volume.
When doing some local (highly concurrent) ingest speed tests a few weeks
ago it seemed to mostly matter how many pages were being written and the
resulting pressure on locks, etc. and not necessarily so much the actual
memory usage. I didn't collect proof on that though (yet). There was
however a very clearly observable contention point where with bigger
buffers the performance would not only stagnate but actually drop.

So what I'm kinda wondering is if we should worry more about the amount
of pages that are going to be written and maybe not so much about the
memory usage?

If this were to be the case then maybe we can consider improving the
current design, potentially in a follow-up patch? The problem I see is
that generically each tableam will have different choices to make on how
to buffer and flush multiple rows, given that a storage engine might
have more or less write amplification, a different way of extending a
relation, fsm use, etc.
Assuming we indeed want a per-tableam implementation, we could either:
- make multi_insert buffer the tuples itself and add a flush_multi_insert.
- add a new function called create_multi_insert which returns something
like a MultiInsertState, which, like a destreceiver, has a set of
callbacks to start, shutdown and insert.

With both solutions one part that to me seems appealing is that we
buffer the data in something that likely resembles the disk format very
much. Thoughts?
IMHO, I would like to go with your option 1 i.e. add a few APIs to the
TableAmRoutine structure. Advantage is that we could use these APIs in
at least 3 places, without much code duplication: 1) COPY 2) CTAS and
3) Refresh Materialized View. I could roughly sketch the APIs in below
way:
typedef struct MultiInsertStateData
{
MemoryContext micontext; /* A temporary memory context for
multi insert. */
BulkInsertStateData *bistate; /* Bulk insert state. */
TupleTableSlot **mislots; /* Array of buffered slots. */
uint32 nslots; /* Total number of buffered slots. */
uint64 nbytes; /* Flush buffers if the total tuple
size >= nbytes. */
int32 nused; /* Number of current buffered slots for
a multi insert batch. */
int64 nsize; /* Total tuple size for a multi insert
batch. */
} MultiInsertStateData;
/* Creates a temporary memory context, allocates the
MultiInsertStateData, BulkInsertStateData and initializes other
members. */
void (*begin_multi_insert) (Relation rel,
MultiInsertStateData **mistate, uint32 nslots, uint64 nbytes);
/* Buffers the input slot into mistate slots, computes the size of the
tuple, and adds it to the total tuple size of the buffered tuples, if
this size crosses mistate->nbytes, flush the buffered tuples into
table. For heapam, existing heap_multi_insert can be used. Once the
buffer is flushed, then micontext can be reset and buffered slots can
be cleared. */
void (*do_multi_insert) (Relation rel, MultiInsertStateData
*mistate, struct TupleTableSlot *slot, CommandId cid, int options);
/* Flush the buffered tuples if any. For heapam, existing
heap_multi_insert can be used. Deletes temporary memory context and
deallocates mistate. */
void (*end_multi_insert) (Relation rel,
MultiInsertStateData *mistate, CommandId cid, int options);
Thoughts?
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
Few things:
IIUC Andres mentioned similar kinds of APIs earlier in [1].

[1]: /messages/by-id/20200924024128.kyk3r5g7dnu3fxxx@alap3.anarazel.de
I would like to add some more info to one of the APIs:
typedef struct MultiInsertStateData
{
MemoryContext micontext; /* A temporary memory context for
multi insert. */
BulkInsertStateData *bistate; /* Bulk insert state. */
TupleTableSlot **mislots; /* Array of buffered slots. */
uint32 nslots; /* Total number of buffered slots. */
int64 nbytes; /* Flush buffers if the total tuple size >=
nbytes. */
int32 nused; /* Number of current buffered slots for a
multi insert batch. */
int64 nsize; /* Total tuple size for a multi insert batch.
*/
} MultiInsertStateData;
/* Creates a temporary memory context, allocates the MultiInsertStateData,
BulkInsertStateData and initializes other members. */
void (*begin_multi_insert) (Relation rel, MultiInsertStateData
**mistate, uint32 nslots, uint64 nbytes);
/* Buffers the input slot into mistate slots, computes the size of the
tuple, and adds it to the total buffered tuple size; if this size crosses
mistate->nbytes, flushes the buffered tuples into the table. For heapam,
the existing heap_multi_insert can be used. Once the buffer is flushed,
the micontext can be reset and the buffered slots cleared. *If nbytes,
i.e. the total tuple size of the batch, is not given, the tuple size is
not calculated; tuples are buffered until all the nslots are filled and
then flushed.* */
void (*do_multi_insert) (Relation rel, MultiInsertStateData
*mistate, struct TupleTableSlot *slot, CommandId cid, int options);
/* Flush the buffered tuples if any. For heapam, existing heap_multi_insert
can be used. Deletes temporary memory context and deallocates mistate. */
void (*end_multi_insert) (Relation rel, MultiInsertStateData
*mistate, CommandId cid, int options);
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
On 26-11-2020 12:36, Bharath Rupireddy wrote:
Few things:
IIUC Andres mentioned similar kinds of APIs earlier in [1].
[1]: /messages/by-id/20200924024128.kyk3r5g7dnu3fxxx@alap3.anarazel.de

I would like to add some more info to one of the APIs:
typedef struct MultiInsertStateData
{
MemoryContext micontext; /* A temporary memory context for
multi insert. */
BulkInsertStateData *bistate; /* Bulk insert state. */
TupleTableSlot **mislots; /* Array of buffered slots. */
uint32 nslots; /* Total number of buffered slots. */
int64 nbytes; /* Flush buffers if the total tuple size >= nbytes. */
int32 nused; /* Number of current buffered slots for a
multi insert batch. */
int64 nsize; /* Total tuple size for a multi insert
batch. */
} MultiInsertStateData;

/* Creates a temporary memory context, allocates the
MultiInsertStateData, BulkInsertStateData and initializes other members. */
void (*begin_multi_insert) (Relation rel,
MultiInsertStateData **mistate, uint32 nslots, uint64 nbytes);

/* Buffers the input slot into mistate slots, computes the size of the
tuple, and adds it total buffer tuple size, if this size crosses
mistate->nbytes, flush the buffered tuples into table. For heapam,
existing heap_multi_insert can be used. Once the buffer is flushed, then
the micontext can be reset and buffered slots can be cleared. *If nbytes
i.e. total tuple size of the batch is not given, tuple size is not
calculated, tuples are buffered until all the nslots are filled and then
flushed.* */
void (*do_multi_insert) (Relation rel, MultiInsertStateData
*mistate, struct TupleTableSlot *slot, CommandId cid, int options);

/* Flush the buffered tuples if any. For heapam, existing
heap_multi_insert can be used. Deletes temporary memory context and
deallocates mistate. */
void (*end_multi_insert) (Relation rel, MultiInsertStateData
*mistate, CommandId cid, int options);

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com <http://www.enterprisedb.com>
Looks all good to me, except for the nbytes part.
Could you explain to me what use case that supports? IMHO the tableam
can best decide itself that its time to flush, based on its
implementation that e.g. considers how many pages to flush at a time and
such, etc? This means also that most of the fields of
MultiInsertStateData can be private as each tableam would return a
derivative of that struct (like with the destreceivers).
One thing I'm wondering is in which memory context the slots end up
being allocated. I'd assume we would want to keep the slots around
between flushes. If they are in the temporary context this might prove
problematic however?
Regards,
Luc
On Thu, Nov 26, 2020 at 5:34 PM Luc Vlaming <luc@swarm64.com> wrote:
On 26-11-2020 12:36, Bharath Rupireddy wrote:
Few things:
IIUC Andres mentioned similar kinds of APIs earlier in [1].
[1]: /messages/by-id/20200924024128.kyk3r5g7dnu3fxxx@alap3.anarazel.de

I would like to add some more info to one of the APIs:
typedef struct MultiInsertStateData
{
MemoryContext micontext; /* A temporary memory context for
multi insert. */
BulkInsertStateData *bistate; /* Bulk insert state. */
TupleTableSlot **mislots; /* Array of buffered slots. */
uint32 nslots; /* Total number of buffered slots. */
int64 nbytes; /* Flush buffers if the total tuple size >= nbytes. */
int32 nused; /* Number of current buffered slots for a
multi insert batch. */
int64 nsize; /* Total tuple size for a multi insert
batch. */
} MultiInsertStateData;

/* Creates a temporary memory context, allocates the
MultiInsertStateData, BulkInsertStateData and initializes other members. */
void (*begin_multi_insert) (Relation rel,
MultiInsertStateData **mistate, uint32 nslots, uint64 nbytes);

/* Buffers the input slot into mistate slots, computes the size of the
tuple, and adds it total buffer tuple size, if this size crosses
mistate->nbytes, flush the buffered tuples into table. For heapam,
existing heap_multi_insert can be used. Once the buffer is flushed, then
the micontext can be reset and buffered slots can be cleared. *If nbytes
i.e. total tuple size of the batch is not given, tuple size is not
calculated, tuples are buffered until all the nslots are filled and then
flushed.* */
void (*do_multi_insert) (Relation rel, MultiInsertStateData
*mistate, struct TupleTableSlot *slot, CommandId cid, int options);

/* Flush the buffered tuples if any. For heapam, existing
heap_multi_insert can be used. Deletes temporary memory context and
deallocates mistate. */
void (*end_multi_insert) (Relation rel, MultiInsertStateData
*mistate, CommandId cid, int options);

Looks all good to me, except for the nbytes part.
Could you explain to me what use case that supports? IMHO the tableam
can best decide itself that its time to flush, based on its
implementation that e.g. considers how many pages to flush at a time and
such, etc? This means also that most of the fields of
MultiInsertStateData can be private as each tableam would return a
derivative of that struct (like with the destreceivers).
nbytes is basically to support the following case: say the number of
tuples to buffer is 1000, and all the tuples are toasted, each a few
hundred MB or even a GB in size. Do we want to wait until 1000 tuples are
buffered, in which case a single query holds 1000 * the toasted tuple size
in memory? A memory limit gives us flexibility there; whether to use it is
up to the table AM implementation. Note also that the existing copy code
(since it knows the tuple size after parsing the input data) uses this
mechanism to decide when to flush.
If nbytes is not used by a table AM, then multi insert can simply wait
until all the tuples are buffered, however much memory they occupy.
IMO, we can retain nbytes for now to decide on when to flush. Thoughts?
I wonder how the do_multi_insert() API would decide when to flush based on
the number of pages. Would we need to pass the maximum number of pages the
buffered tuples may occupy, and track the pages the currently buffered
tuples occupy, to decide when to flush? Or is that something the existing
table AM infrastructure already supports? And if we use the number of
pages to decide when to flush, how well does that work with parallel
inserts?
One thing I'm wondering is in which memory context the slots end up
being allocated. I'd assume we would want to keep the slots around
between flushes. If they are in the temporary context this might prove
problematic however?
I should not have used the word temporary; it actually is not
temporary. This memory context will be created in begin_multi_insert(),
all the buffered tuples are copied using this context, and it will be
reset at the end of each flush and reused. It can get destroyed at the
end in end_multi_insert(). I think we should do the same in the new
APIs' implementation.
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
On 26-11-2020 14:45, Bharath Rupireddy wrote:
On Thu, Nov 26, 2020 at 5:34 PM Luc Vlaming <luc@swarm64.com> wrote:
On 26-11-2020 12:36, Bharath Rupireddy wrote:
Few things:
IIUC Andres mentioned similar kinds of APIs earlier in [1].
[1] -
/messages/by-id/20200924024128.kyk3r5g7dnu3fxxx@alap3.anarazel.de
I would like to add some more info to one of the APIs:
typedef struct MultiInsertStateData
{
MemoryContext micontext; /* A temporary memory context for
multi insert. */
BulkInsertStateData *bistate; /* Bulk insert state. */
TupleTableSlot **mislots; /* Array of buffered slots. */
uint32 nslots; /* Total number of buffered slots. */
int64 nbytes; /* Flush buffers if the total tuple size >= nbytes. */
int32 nused; /* Number of current buffered slots for a
multi insert batch. */
int64 nsize; /* Total tuple size for a multi insert
batch. */
} MultiInsertStateData;

/* Creates a temporary memory context, allocates the
MultiInsertStateData, BulkInsertStateData and initializes other members. */
void (*begin_multi_insert) (Relation rel,
MultiInsertStateData **mistate, uint32 nslots, uint64 nbytes);

/* Buffers the input slot into mistate slots, computes the size of the
tuple, and adds it to the total buffered tuple size. If this size crosses
mistate->nbytes, flush the buffered tuples into the table. For heapam,
existing heap_multi_insert can be used. Once the buffer is flushed, the
micontext can be reset and the buffered slots can be cleared. If nbytes,
i.e. the total tuple size of the batch, is not given, the tuple size is
not calculated; tuples are buffered until all the nslots are filled and
then flushed. */
void (*do_multi_insert) (Relation rel, MultiInsertStateData
*mistate, struct TupleTableSlot *slot, CommandId cid, int options);

/* Flush the buffered tuples if any. For heapam, existing
heap_multi_insert can be used. Deletes temporary memory context and
deallocates mistate. */
void (*end_multi_insert) (Relation rel, MultiInsertStateData
*mistate, CommandId cid, int options);

Looks all good to me, except for the nbytes part.
Could you explain to me what use case that supports? IMHO the tableam
can best decide itself that it's time to flush, based on its
implementation that e.g. considers how many pages to flush at a time
and such. This also means that most of the fields of
MultiInsertStateData can be private, as each tableam would return a
derivative of that struct (like with the destreceivers).

nbytes is basically to support the following case: say the number of
tuples to buffer is 1000, and all the tuples are toasted, with sizes of
a few hundred MB or even GB each. Do we want to wait until 1000 tuples
are buffered, in which case one query occupies 1000 * toasted tuple
size of memory? So, having a memory limit gives flexibility. Whether to
use it or not is up to the table AM implementation. Also, the existing
copy code (since it can know the tuple size after parsing input data)
uses this mechanism to decide when to flush.

If nbytes is not used in a table AM, then the multi insert can wait
until the total number of tuples are buffered, however much memory they
occupy.

IMO, we can retain nbytes for now to decide on when to flush. Thoughts?
I'm very sorry, I had not realized at all that the toasted data would be
kept in memory until written out. I guess I'm not familiar enough with
that part yet. I assumed this would be toasted beforehand and be tableam
agnostic, and that any decision from the tableam to flush would happen
way before a lot of memory would have accumulated, which is a bit naive
in hindsight.
I wonder, how can the do_multi_insert() API decide on when to flush, I
mean, based on the number of pages to flush? Do we need to pass the
maximum number of pages the buffered tuples can occupy, and track the
pages the currently buffered tuples occupy, to decide when to flush? Or
is it something that the existing table AM infrastructure already
supports? If we use the number of pages to decide on when to flush, how
well does it work with parallel inserts?
I was assuming each tableam would use its own logic, based on its needs
and the tradeoffs a storage engine might want to provide. This does not
mean it should not consider outside parameters, like the aforementioned
memory usage.
I think it would imply that each tableam implements its own tracking
mechanism for how much has accumulated, how, and when to flush, because
they might track different statistics. IMHO, given that each tableam
would anyway want to implement its own logic on how to store a slot into
a page, the logic for tracking these statistics seemed minor to me.
Maybe I missed some parts that should be extracted out to a generic
interface however?
Some examples of why a tableam could decide on its own when to flush:
- the current heap implementation could accumulate a few pages (say up
to 64) and thereby limit the number of calls to write() and the
accompanying blocks/context switches. This would also make the writes
more sequential wrt the processes, which can help with the flushing I
presume, like how the sequential scan was optimized to process a
consecutive set of blocks per worker (see
table_block_parallelscan_nextpage).
- something like zheap could accumulate data based on the number of
columns so that a page with column data is completely filled, thereby
limiting the write amplification.
- something that would implement an lsm storage might accumulate a full
in-memory level before flushing it out.
One thing I'm wondering is in which memory context the slots end up
being allocated. I'd assume we would want to keep the slots around
between flushes. If they are in the temporary context this might prove
problematic however?

I should not have used the word temporary; it actually is not
temporary. This memory context will be created in begin_multi_insert(),
all the buffered tuples are copied using this context, and it will be
reset at the end of each flush and reused. It can get destroyed at the
end in end_multi_insert(). I think we should do the same in the new
APIs' implementation.

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
Okay. But in which context are the slots themselves allocated then?
Because if we allocate the slots themselves in the context and then
reset with each flush we would have to also re-allocate the slots every
flush, which seems wasteful to me?
Regards,
Luc
Swarm64
On Fri, Nov 27, 2020 at 12:22 PM Luc Vlaming <luc@swarm64.com> wrote:
I wonder, how can the do_multi_insert() API decide on when to flush, I
mean, based on the number of pages to flush? Do we need to pass the
maximum number of pages the buffered tuples can occupy, and track the
pages the currently buffered tuples occupy, to decide when to flush? Or
is it something that the existing table AM infrastructure already
supports? If we use the number of pages to decide on when to flush, how
well does it work with parallel inserts?

I was assuming each tableam would use its own logic, based on its needs and
the tradeoffs a storage engine might want to provide. This does not mean
it should not consider outside parameters, like the aforementioned
memory usage.
I think it would imply that each tableam implements its own tracking
mechanism for how much has accumulated, how, and when to flush, because
they might track different statistics. IMHO, given that each tableam
would anyway want to implement its own logic on how to store a slot into
a page, the logic for tracking these statistics seemed minor to me.
Maybe I missed some parts that should be extracted out to a generic
interface however?
I agree with you that tracking page-level or other such info is
dependent on the table AM implementation.
Some examples of why a tableam could decide on its own when to flush:
- the current heap implementation could accumulate a few pages (say up
to 64) and thereby limit the number of calls to write() and the
accompanying blocks/context switches. This would also make the writes
more sequential wrt the processes, which can help with the flushing I
presume, like how the sequential scan was optimized to process a
consecutive set of blocks per worker (see
table_block_parallelscan_nextpage).
- something like zheap could accumulate data based on the number of
columns so that a page with column data is completely filled, thereby
limiting the write amplification.
- something that would implement an lsm storage might accumulate a full
in-memory level before flushing it out.
Thanks for the details.
One thing I'm wondering is in which memory context the slots end up
being allocated. I'd assume we would want to keep the slots around
between flushes. If they are in the temporary context this might prove
problematic however?

I should not have used the word temporary; it actually is not
temporary. This memory context will be created in begin_multi_insert(),
all the buffered tuples are copied using this context, and it will be
reset at the end of each flush and reused. It can get destroyed at the
end in end_multi_insert(). I think we should do the same in the new
APIs' implementation.

Okay. But in which context are the slots themselves allocated then?
Because if we allocate the slots themselves in the context and then
reset with each flush, we would have to re-allocate the slots every
flush, which seems wasteful to me?
The buffer slots are allocated in the memory context in which the new
APIs get called. We don't have to re-allocate the slots every time after
flushing; we just have to clear them using ExecClearTuple() and reuse
them. The memory context I specified in the MultiInsertStateData
structure is for using table_multi_insert() inside the new
do_multi_insert API after we decide to flush. There's a comment in the
existing table_multi_insert() usage in the copy code, which says that
table_multi_insert() may leak memory. For that reason we need the
temporary memory context, which gets set just before
table_multi_insert() and reset after it. This happens for each batch of
tuples. In the end this context can be deleted in the end_multi_insert
API.
Hope this helps.
I'm planning to summarize and post the new APIs description here again
for other opinions.
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
Hi,
Currently, the logic required for multi inserts (such as buffer slot
allocation, flushing, tuple size calculation to decide when to flush,
cleanup and so on) is handled outside of the existing tableam APIs. And
there are a good number of cases where multi inserts can be used, such
as the existing COPY, or CTAS, CREATE/REFRESH MATERIALIZED VIEW
[proposed in this thread], and INSERT INTO SELECTs [here
<https://www.postgresql.org/list/pgsql-hackers/since/202009240000/>]
which are currently under discussion. Handling the same multi inserts
logic in many places is error prone and duplicates most of the code. To
avoid this, proposed here are generic tableam APIs that can be used in
all these cases and that also give tableam developers the flexibility
to implement multi insert logic dependent on the underlying storage
engine [1].
I would like to seek thoughts/opinions on the proposed new APIs. Once
reviewed, I will start implementing them.
[1]: /messages/by-id/ca3dd08f-4ce0-01df-ba30-e9981bb0d54e@swarm64.com
Below are the proposed structures and APIs:
/* Holds the multi insert related information. */
typedef struct MultiInsertStateData
{
/* A temporary memory context for multi insert. */
MemoryContext micontext;
/* Bulk insert state. */
BulkInsertStateData *bistate;
/* Array of buffered slots. */
TupleTableSlot **mislots;
/* Maximum number of slots that can be buffered. */
int32 nslots;
/* Number of slots that are currently buffered. */
int32 nused;
/*
* Maximum total tuple size that can be buffered in
* a single batch. Flush the buffered tuples if the
* current total tuple size, nsize >= nbytes.
*/
int64 nbytes;
/*
* Total tuple size in bytes of the slots that are
* currently buffered.
*/
int64 nsize;
/*
* Whether to clear the buffered slots' contents
* after the flush. If the relation has indexes
* or after row triggers, the buffered slots are
* required outside do_multi_insert(); the caller
* then clears them using ExecClearTuple() outside
* the do_multi_insert API. If true,
* do_multi_insert() can clear the slots.
*/
bool clearslots;
/*
* If true, do_multi_insert will flush the buffered
* slots, if any, bypassing the slot count and total
* tuple size checks. This can be useful in cases
* where one of the partitions cannot use multi
* inserts but others can, and they have buffered a
* few slots so far, which need to be flushed for
* visibility before the partition that doesn't
* support multi inserts can proceed with single
* inserts.
*/
bool forceflush;
} MultiInsertStateData;
/*
* Allocates and initializes the MultiInsertStateData. Creates a temporary
* memory context for multi inserts, allocates BulkInsertStateData.
*/
void (*begin_multi_insert) (Relation rel,
MultiInsertStateData **mistate,
uint32 nslots,
uint64 nbytes);
/*
 * Buffers the input slot into mistate slots. Computes the size of the
 * tuple, and adds it to the total size of the buffered tuples. If this
 * size crosses nbytes, flush the buffered tuples into the table. Clear
 * the buffered slots' contents if clearslots is true. If nbytes, i.e.
 * the maximum total tuple size of the buffered tuples, is not given,
 * the tuple size is not calculated; tuples are buffered until all the
 * nslots are filled and then flushed.
 *
 * For heapam, existing heap_multi_insert can be called using
 * rel->rd_tableam->multi_insert() for flushing.
 */
void (*do_multi_insert) (Relation rel,
struct MultiInsertStateData *mistate,
struct TupleTableSlot *slot,
CommandId cid,
int options);
/*
* Flush the buffered tuples if any. Clear the buffered slots content if
* clearslots is true. Deletes temporary memory context and deallocates
* mistate.
*/
void (*end_multi_insert) (Relation rel,
struct MultiInsertStateData *mistate,
CommandId cid,
int options);
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
On Mon, Nov 30, 2020 at 10:49 AM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
Hi,
Currently, required logic for multi inserts (such as buffer slots allocation, flushing, tuple size calculation to decide when to flush, cleanup and so on) is being handled outside of the existing tableam APIs. And there are a good number of cases where multi inserts can be used, such as for existing COPY or for CTAS, CREATE/REFRESH MATERIALIZED VIEW [proposed in this thread], and INSERT INTO SELECTs [here] which are currently under discussion. Handling the same multi inserts logic in many places is error prone and duplicates most of the code. To avoid this, proposing here are generic tableam APIs, that can be used in all the cases and which also gives the flexibility to tableam developers in implementing multi inserts logic dependent on the underlying storage engine[1].
I would like to seek thoughts/opinions on the proposed new APIs. Once reviewed, I will start implementing them.
IMHO, if we think that something is really specific to the tableam,
then it makes sense to move it there. But just to avoid duplicating the
code, it might not be the best idea. Instead, you can write some common
functions and we can call them from different places. So if something
is very much common and will not vary based on the storage type, we can
keep it outside the tableam interface; however, we can move it into
some common functions to avoid duplication.
--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
On Thu, Dec 3, 2020 at 1:38 PM Dilip Kumar <dilipbalaut@gmail.com> wrote:
On Mon, Nov 30, 2020 at 10:49 AM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:

Currently, required logic for multi inserts (such as buffer slots allocation, flushing, tuple size calculation to decide when to flush, cleanup and so on) is being handled outside of the existing tableam APIs. And there are a good number of cases where multi inserts can be used, such as for existing COPY or for CTAS, CREATE/REFRESH MATERIALIZED VIEW [proposed in this thread], and INSERT INTO SELECTs [here] which are currently under discussion. Handling the same multi inserts logic in many places is error prone and duplicates most of the code. To avoid this, proposing here are generic tableam APIs, that can be used in all the cases and which also gives the flexibility to tableam developers in implementing multi inserts logic dependent on the underlying storage engine[1].
I would like to seek thoughts/opinions on the proposed new APIs. Once reviewed, I will start implementing them.
IMHO, if we think that something is really specific to the tableam,
then it makes sense to move it there. But just to avoid duplicating the
code, it might not be the best idea. Instead, you can write some common
functions and we can call them from different places. So if something
is very much common and will not vary based on the storage type, we can
keep it outside the tableam interface; however, we can move it into
some common functions to avoid duplication.
Thanks for the response. The main design goal of the new APIs is to
give tableam developers the flexibility to implement multi insert logic
dependent on the underlying storage engine. Currently, for all the
underlying storage engines, we follow the same multi insert logic, such
as when and how to flush the buffered tuples and how the tuple size is
calculated, and this logic doesn't take the underlying storage engine's
capabilities into account. Please have a look at [1], where this point
was brought up by @Luc Vlaming. The subsequent discussion went on to
some level of agreement on the proposed APIs.
I want to clarify that avoiding duplicate multi insert code (for COPY,
CTAS, CREATE/REFRESH MAT VIEW and INSERT SELECTs) is a byproduct (not a
main design goal) if we implement the new APIs for heap AM. I feel
sorry for projecting the goal as avoiding duplicate code earlier.
I also want to mention that @Andres Freund visualized similar kinds of
APIs in [2].
I tried to keep the API as generic as possible; please have a look at
the new structure and APIs [3].
Thoughts?
[1]: /messages/by-id/ca3dd08f-4ce0-01df-ba30-e9981bb0d54e@swarm64.com
[2]: /messages/by-id/20200924024128.kyk3r5g7dnu3fxxx@alap3.anarazel.de
[3]: /messages/by-id/CALj2ACV8_O651C2zUqrVSRFDJkp8=TMwSdG9+mDGL+vF6CD+AQ@mail.gmail.com
With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
On Mon, 16 Nov 2020 at 15:32, Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
On Mon, Nov 16, 2020 at 8:02 PM Paul Guo <guopa@vmware.com> wrote:
On Nov 13, 2020, at 7:21 PM, Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> wrote:
On Tue, Nov 10, 2020 at 3:47 PM Paul Guo <guopa@vmware.com> wrote:
Thanks for doing this. There might be another solution - use raw insert
interfaces (i.e. raw_heap_insert()). Attached is the test (not formal)
patch that verifies this idea. raw_heap_insert() writes the page into
the table files directly and also writes the FPI xlog when the tuples
fill up the whole page. This seems to be more efficient.

Thanks. Will the new raw_heap_insert() APIs scale well (i.e. extend the
table parallelly) with parallelism? The existing table_multi_insert()
API scales well; see, for instance, the benefit with parallel copy [1]
and parallel multi inserts in CTAS [2].

Yes, definitely some work needs to be done to make the raw heap insert
interfaces fit the parallel work, but it seems that there are no hard
blocking issues for this?

I may be wrong here. If we were to allow the raw heap insert APIs to
handle parallelism, shouldn't we need some sort of shared memory to
allow coordination among workers? If we do so, in the end, aren't these
raw insert APIs equivalent to the current table_multi_insert() API,
which uses a separate shared ring buffer (bulk insert state) for
insertions?

And can we think of these raw insert APIs as similar to the behaviour
of the table_multi_insert() API for unlogged tables?
I found the additional performance of Paul Guo's work to be compelling
and the idea workable for very large loads.
Surely LockRelationForExtension() is all the inter-process
coordination we need to make this work for parallel loads?
--
Simon Riggs http://www.EnterpriseDB.com/