From 6252155dcdd0a21e15fec3d8e8fe77a9b2f2b29a Mon Sep 17 00:00:00 2001 From: Paul Guo Date: Tue, 10 Nov 2020 10:26:38 +0800 Subject: [PATCH v1] ctas using raw insert --- src/backend/access/heap/heapam_handler.c | 5 ++ src/backend/access/heap/rewriteheap.c | 84 ++++++++++++++++++++++++++++++++ src/backend/commands/createas.c | 10 +++- src/include/access/heapam.h | 4 ++ src/include/access/rewriteheap.h | 2 + src/include/access/tableam.h | 22 +++++++++ 6 files changed, 126 insertions(+), 1 deletion(-) diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index dcaea7135f..8caa5dfef9 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2529,6 +2529,11 @@ static const TableAmRoutine heapam_methods = { .tuple_insert_speculative = heapam_tuple_insert_speculative, .tuple_complete_speculative = heapam_tuple_complete_speculative, .multi_insert = heap_multi_insert, + + .tuple_raw_insert_begin = heap_raw_insert_begin, + .tuple_raw_insert = heap_raw_insert, + .tuple_raw_insert_end = heap_raw_insert_end, + .tuple_delete = heapam_tuple_delete, .tuple_update = heapam_tuple_update, .tuple_lock = heapam_tuple_lock, diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index 39e33763df..47f99ffa74 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -292,6 +292,90 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm return state; } +#if 0 +typedef struct RawWriteStateData +{ + Relation rs_new_rel; /* destination heap */ + Page rs_buffer; /* page currently being built */ + BlockNumber rs_blockno; /* block where page will go */ + bool rs_buffer_valid; /* T if any tuples in buffer */ + MemoryContext rs_cxt; +} *RawWriteState; +#endif + +void heap_raw_insert(RewriteState state, TupleTableSlot *slot) +{ + HeapTuple tuple; + bool shouldFree; + + tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); + raw_heap_insert(state, tuple); + + if (shouldFree) + pfree(tuple); +} + +RewriteState +heap_raw_insert_begin(Relation new_heap) +{ + RewriteState state; + MemoryContext rw_cxt; + MemoryContext old_cxt; + + rw_cxt = AllocSetContextCreate(CurrentMemoryContext, + "Heap raw write", + ALLOCSET_DEFAULT_SIZES); + old_cxt = MemoryContextSwitchTo(rw_cxt); + + /* Create and fill in the state struct */ + state = palloc0(sizeof(RewriteStateData)); + + state->rs_new_rel = new_heap; + state->rs_buffer = (Page) palloc(BLCKSZ); + /* new_heap needn't be empty, just locked */ + state->rs_blockno = RelationGetNumberOfBlocks(new_heap); + state->rs_buffer_valid = false; + state->rs_cxt = rw_cxt; + + MemoryContextSwitchTo(old_cxt); + + return state; +} + +void +heap_raw_insert_end(RewriteState state) +{ + /* Write the last page, if any */ + if (state->rs_buffer_valid) + { + if (RelationNeedsWAL(state->rs_new_rel)) + log_newpage(&state->rs_new_rel->rd_node, + MAIN_FORKNUM, + state->rs_blockno, + state->rs_buffer, + true); + RelationOpenSmgr(state->rs_new_rel); + + PageSetChecksumInplace(state->rs_buffer, state->rs_blockno); + + smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM, state->rs_blockno, + (char *) state->rs_buffer, true); + } + + /* + * When we WAL-logged rel pages, we must nonetheless fsync them. The + * reason is the same as in storage.c's RelationCopyStorage(): we're + * writing data that's not in shared buffers, and so a CHECKPOINT + * occurring during the rewriteheap operation won't have fsync'd data we + * wrote before the checkpoint. + */ + if (RelationNeedsWAL(state->rs_new_rel)) + smgrimmedsync(state->rs_new_rel->rd_smgr, MAIN_FORKNUM); /* test the time */ + + /* Deleting the context frees everything */ + MemoryContextDelete(state->rs_cxt); +} + /* * End a rewrite. * diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index d53ec952d0..723226f029 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -61,6 +61,7 @@ typedef struct CommandId output_cid; /* cmin to insert in output tuples */ int ti_options; /* table_tuple_insert performance options */ BulkInsertState bistate; /* bulk insert state */ + RewriteState state; } DR_intorel; /* utility functions for CTAS definition creation */ @@ -549,11 +550,13 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) * Fill private fields of myState for use by later routines */ myState->rel = intoRelationDesc; + myState->state = table_tuple_raw_insert_begin(intoRelationDesc); myState->reladdr = intoRelationAddr; myState->output_cid = GetCurrentCommandId(true); myState->ti_options = TABLE_INSERT_SKIP_FSM; myState->bistate = GetBulkInsertState(); + /* * Valid smgr_targblock implies something already wrote to the relation. * This may be harmless, but this function hasn't planned for it. @@ -578,12 +581,14 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self) * tuple's xmin), but since we don't do that here... */ + table_tuple_raw_insert(myState->rel, myState->state, slot); +#if 0 table_tuple_insert(myState->rel, slot, myState->output_cid, myState->ti_options, myState->bistate); - +#endif /* We know this is a newly created relation, so there are no indexes */ return true; @@ -599,7 +604,10 @@ intorel_shutdown(DestReceiver *self) FreeBulkInsertState(myState->bistate); +#if 0 table_finish_bulk_insert(myState->rel, myState->ti_options); +#endif + table_tuple_raw_insert_end(myState->rel, myState->state); /* close rel, but keep lock until commit */ table_close(myState->rel, NoLock); diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 92b19dba32..4b05d963c2 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -135,6 +135,10 @@ extern BulkInsertState GetBulkInsertState(void); extern void FreeBulkInsertState(BulkInsertState); extern void ReleaseBulkInsertStatePin(BulkInsertState bistate); +extern void heap_raw_insert(RewriteState state, TupleTableSlot *slot); +extern RewriteState heap_raw_insert_begin(Relation new_heap); +extern void heap_raw_insert_end(RewriteState state); + extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, int options, BulkInsertState bistate); extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h index e6d7fa1e65..17fa31f066 100644 --- a/src/include/access/rewriteheap.h +++ b/src/include/access/rewriteheap.h @@ -25,6 +25,8 @@ extern RewriteState begin_heap_rewrite(Relation OldHeap, Relation NewHeap, TransactionId OldestXmin, TransactionId FreezeXid, MultiXactId MultiXactCutoff); extern void end_heap_rewrite(RewriteState state); +extern RewriteState heap_raw_insert_begin(Relation new_heap); +extern void heap_raw_insert_end(RewriteState state); extern void rewrite_heap_tuple(RewriteState state, HeapTuple oldTuple, HeapTuple newTuple); extern bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple oldTuple); diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 387eb34a61..75e245bd56 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -23,6 +23,7 @@ #include "utils/guc.h" #include "utils/rel.h" #include "utils/snapshot.h" +#include "access/rewriteheap.h" #define DEFAULT_TABLE_ACCESS_METHOD "heap" @@ -352,6 +353,9 @@ typedef struct TableAmRoutine * Manipulations of physical tuples. * ------------------------------------------------------------------------ */ + RewriteState (*tuple_raw_insert_begin)(Relation new_heap); + void (*tuple_raw_insert_end) (RewriteState state); + void (*tuple_raw_insert)(RewriteState state, TupleTableSlot *slot); /* see table_tuple_insert() for reference about parameters */ void (*tuple_insert) (Relation rel, TupleTableSlot *slot, @@ -1140,6 +1144,24 @@ table_compute_xid_horizon_for_tuples(Relation rel, * ---------------------------------------------------------------------------- */ +static inline RewriteState +table_tuple_raw_insert_begin(Relation rel) +{ + return rel->rd_tableam->tuple_raw_insert_begin(rel); +} + +static inline void +table_tuple_raw_insert(Relation rel, RewriteState state, TupleTableSlot *slot) +{ + rel->rd_tableam->tuple_raw_insert(state, slot); +} + +static inline void +table_tuple_raw_insert_end(Relation rel, RewriteState state) +{ + rel->rd_tableam->tuple_raw_insert_end(state); +} + /* * Insert a tuple from a slot into table AM routine. * -- 2.14.3