From 63ee9cfc5076c1fe0bc36b1d619f20f8133fb7a2 Mon Sep 17 00:00:00 2001 From: Hari Date: Thu, 1 Jun 2017 15:41:01 +1000 Subject: [PATCH 04/10] Heap storage handler Add the storage AM handler function to the relation and add the other function hooks required to support heap storage --- src/backend/access/heap/heapam.c | 510 +++++++++++++++++++++++++++++++++---- src/backend/utils/cache/relcache.c | 117 ++++++++- src/include/access/heapam.h | 43 +--- src/include/catalog/pg_proc.h | 5 + src/include/utils/rel.h | 12 + src/include/utils/relcache.h | 2 + 6 files changed, 595 insertions(+), 94 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index e890e08..fe8f91f 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -45,6 +45,7 @@ #include "access/multixact.h" #include "access/parallel.h" #include "access/relscan.h" +#include "access/storageamapi.h" #include "access/sysattr.h" #include "access/transam.h" #include "access/tuptoaster.h" @@ -66,6 +67,7 @@ #include "storage/smgr.h" #include "storage/spin.h" #include "storage/standby.h" +#include "utils/builtins.h" #include "utils/datum.h" #include "utils/inval.h" #include "utils/lsyscache.h" @@ -74,11 +76,21 @@ #include "utils/syscache.h" #include "utils/tqual.h" - /* GUC variable */ bool synchronize_seqscans = true; - +static void heap_setscanlimits(HeapScanDesc sscan, BlockNumber startBlk, BlockNumber numBlks); +static HeapScanDesc heap_beginscan(Relation relation, Snapshot snapshot, + int nkeys, ScanKey key); +static HeapScanDesc heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key); +static HeapScanDesc heap_beginscan_strat(Relation relation, Snapshot snapshot, + int nkeys, ScanKey key, + bool allow_strat, bool allow_sync); +static HeapScanDesc heap_beginscan_bm(Relation relation, Snapshot snapshot, + int nkeys, ScanKey key); +static HeapScanDesc heap_beginscan_sampling(Relation relation, Snapshot snapshot, + int nkeys, ScanKey key, + bool 
allow_strat, bool allow_sync, bool allow_pagemode); static HeapScanDesc heap_beginscan_internal(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, @@ -89,7 +101,36 @@ static HeapScanDesc heap_beginscan_internal(Relation relation, bool is_bitmapscan, bool is_samplescan, bool temp_snap); +static StorageTuple heap_getnext(HeapScanDesc sscan, ScanDirection direction); +static void heap_endscan(HeapScanDesc sscan); + +static void heapgetpage(HeapScanDesc scan, BlockNumber page); +static void heap_rescan(HeapScanDesc scan, ScanKey key); +static void heap_rescan_set_params(HeapScanDesc scan, ScanKey key, + bool allow_strat, bool allow_sync, bool allow_pagemode); +static void heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot); + +static bool heap_fetch(Relation relation, + ItemPointer tid, + Snapshot snapshot, + StorageTuple *stuple, + Buffer *userbuf, + bool keep_buf, + Relation stats_relation); + static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan); + +static HTSU_Result heap_delete(Relation relation, ItemPointer tid, + CommandId cid, Snapshot crosscheck, bool wait, + HeapUpdateFailureData *hufd); +static HTSU_Result heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, + CommandId cid, Snapshot crosscheck, bool wait, + HeapUpdateFailureData *hufd, LockTupleMode *lockmode); +static HTSU_Result heap_lock_tuple(Relation relation, ItemPointer tid, StorageTuple stuple, + CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, + bool follow_updates, + Buffer *buffer, HeapUpdateFailureData *hufd); + static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options); static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, @@ -124,6 +165,9 @@ static bool ConditionalMultiXactIdWait(MultiXactId multi, MultiXactStatus status static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup); static HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup, bool 
key_modified, bool *copy); +static void heapam_finish_speculative(Relation relation, TupleTableSlot *slot); +static void heapam_abort_speculative(Relation relation, TupleTableSlot *slot); +static void heap_finish_speculative(Relation relation, HeapTuple tuple); /* @@ -202,6 +246,358 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] = (MultiXactStatusLock[(status)]) /* ---------------------------------------------------------------- + * storage AM support routines for heapam + * ---------------------------------------------------------------- + */ + +/* + * Insert a heap tuple from a slot, which may contain an OID and speculative + * insertion token. + */ +static Oid +heapam_heap_insert(Relation relation, TupleTableSlot *slot, CommandId cid, + int options, BulkInsertState bistate, + ItemPointer iptr) +{ + Oid oid; + HeapTuple tuple = NULL; + + if (slot->tts_storage) + { + HeapamTuple *htuple = slot->tts_storage; + tuple = htuple->hst_heaptuple; + + if (relation->rd_rel->relhasoids) + HeapTupleSetOid(tuple, InvalidOid); + } + else + { + /* + * Obtain the physical tuple to insert, building from the slot values. + * XXX: maybe the slot already contains a physical tuple in the right + * format? In fact, if the slot isn't fully deformed, this is completely + * bogus ... 
+ */ + tuple = heap_form_tuple(slot->tts_tupleDescriptor, + slot->tts_values, + slot->tts_isnull); + } + + /* Set the OID, if the slot has one */ + if (slot->tts_tupleOid != InvalidOid) + HeapTupleHeaderSetOid(tuple->t_data, slot->tts_tupleOid); + + /* Update the tuple with table oid */ + if (slot->tts_tableOid != InvalidOid) + tuple->t_tableOid = slot->tts_tableOid; + + /* Set the speculative insertion token, if the slot has one */ + if ((options & HEAP_INSERT_SPECULATIVE) && slot->tts_speculativeToken) + HeapTupleHeaderSetSpeculativeToken(tuple->t_data, slot->tts_speculativeToken); + + /* Perform the insertion, and copy the resulting ItemPointer */ + oid = heap_insert(relation, tuple, cid, options, bistate); + ItemPointerCopy(&tuple->t_self, iptr); + + if (slot->tts_storage == NULL) + ExecStoreTuple(tuple, slot, InvalidBuffer, true); + + return oid; +} + +static HTSU_Result +heapam_heap_delete(Relation relation, ItemPointer tid, CommandId cid, + Snapshot crosscheck, bool wait, + HeapUpdateFailureData *hufd) +{ + return heap_delete(relation, tid, cid, crosscheck, wait, hufd); +} + +static HTSU_Result +heapam_heap_update(Relation relation, ItemPointer otid, TupleTableSlot *slot, + CommandId cid, Snapshot crosscheck, bool wait, + HeapUpdateFailureData *hufd, LockTupleMode *lockmode, + ItemPointer iptr) +{ + HeapTuple tuple; + HTSU_Result result; + + if (slot->tts_storage) + { + HeapamTuple *htuple = slot->tts_storage; + tuple = htuple->hst_heaptuple; + } + else + { + tuple = heap_form_tuple(slot->tts_tupleDescriptor, + slot->tts_values, + slot->tts_isnull); + } + + /* Set the OID, if the slot has one */ + if (slot->tts_tupleOid != InvalidOid) + HeapTupleHeaderSetOid(tuple->t_data, slot->tts_tupleOid); + + /* Update the tuple with table oid */ + if (slot->tts_tableOid != InvalidOid) + tuple->t_tableOid = slot->tts_tableOid; + + result = heap_update(relation, otid, tuple, cid, crosscheck, wait, + hufd, lockmode); + ItemPointerCopy(&tuple->t_self, iptr); + + if 
(slot->tts_storage == NULL) + ExecStoreTuple(tuple, slot, InvalidBuffer, true); + + return result; +} + +static void +heapam_finish_speculative(Relation relation, TupleTableSlot *slot) +{ + HeapamTuple *stuple = (HeapamTuple *) slot->tts_storage; + + Assert(slot->tts_speculativeToken != 0); + + heap_finish_speculative(relation, stuple->hst_heaptuple); + slot->tts_speculativeToken = 0; +} + +static void +heapam_abort_speculative(Relation relation, TupleTableSlot *slot) +{ + HeapamTuple *stuple = (HeapamTuple *) slot->tts_storage; + + Assert(slot->tts_speculativeToken != 0); + + heap_abort_speculative(relation, stuple->hst_heaptuple); + slot->tts_speculativeToken = 0; +} + +static TransactionId +heapam_get_xmin(StorageTuple tuple) +{ + TransactionId xmin; + + xmin = HeapTupleHeaderGetXmin(((HeapTuple)tuple)->t_data); + return xmin; +} + +static TransactionId +heapam_get_updated_xid(StorageTuple tuple) +{ + TransactionId xid; + + xid = HeapTupleHeaderGetUpdateXid(((HeapTuple)tuple)->t_data); + return xid; +} + +static CommandId +heapam_get_cmin(StorageTuple tuple) +{ + return HeapTupleHeaderGetCmin(((HeapTuple)tuple)->t_data); +} + +static ItemPointer +heapam_get_itempointer(StorageTuple tuple) +{ + return &(((HeapTuple)tuple)->t_self); +} + +static ItemPointerData +heapam_get_ctid(StorageTuple tuple) +{ + return ((HeapTuple)tuple)->t_data->t_ctid; +} + +static bool +heapam_tuple_is_heaopnly(StorageTuple tuple) +{ + return HeapTupleIsHeapOnly((HeapTuple)tuple); +} + +static StorageTuple +heapam_form_tuple_by_datum(Datum data, Oid tableoid) +{ + HeapTuple newTuple; + HeapTupleHeader td; + + td = DatumGetHeapTupleHeader(data); + + newTuple = (HeapTuple) palloc(HEAPTUPLESIZE + HeapTupleHeaderGetDatumLength(td)); + newTuple->t_len = HeapTupleHeaderGetDatumLength(td); + newTuple->t_self = td->t_ctid; + newTuple->t_tableOid = tableoid; + newTuple->t_data = (HeapTupleHeader) ((char *) newTuple + HEAPTUPLESIZE); + memcpy((char *) newTuple->t_data, (char *) td, 
newTuple->t_len); + + return newTuple; +} + +static HeapTuple +heapam_get_tuple(TupleTableSlot *slot) +{ + HeapamTuple *stuple = (HeapamTuple *)slot->tts_storage; + + return stuple->hst_heaptuple; +} + +static bool +heapam_is_physical_tuple(TupleTableSlot *slot) +{ + HeapamTuple *stuple = (HeapamTuple *)slot->tts_storage; + + return (stuple->hst_heaptuple != &slot->tts_minhdr); +} + +static void +slot_update_tuple_tableoid(TupleTableSlot *slot, Oid tableoid) +{ + HeapTuple tuple; + + tuple = ExecHeapifySlot(slot); + tuple->t_tableOid = tableoid; +} + +static void +slot_set_tuple_oid(TupleTableSlot *slot, Oid tupleoid) +{ + HeapTuple tuple; + + tuple = ExecHeapifySlot(slot); + HeapTupleSetOid(tuple, tupleoid); +} + +static void +heapam_slot_store_tuple(TupleTableSlot *slot, StorageTuple tuple, bool shouldFree) +{ + HeapamTuple *stuple; + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(slot->tts_mcxt); + + stuple = (HeapamTuple *)palloc(sizeof(HeapamTuple)); + stuple->hst_heaptuple = tuple; + stuple->hst_shouldFree = shouldFree; + stuple->hst_slow = false; + stuple->hst_off = 0; + + MemoryContextSwitchTo(oldcontext); + + slot->tts_storage = stuple; +} + +static void +heapam_slot_clear_tuple(TupleTableSlot *slot) +{ + HeapamTuple *stuple; + + /* XXX should this be an Assert() instead? 
*/ + if (slot->tts_isempty) + return; + + stuple = slot->tts_storage; + if (stuple == NULL) + return; + + if (stuple->hst_shouldFree) + heap_freetuple(stuple->hst_heaptuple); + if (slot->tts_shouldFreeMin) + heap_free_minimal_tuple(slot->tts_mintuple); + + slot->tts_shouldFreeMin = false; + + pfree(stuple); + slot->tts_storage = NULL; +} + +static HeapTuple +heapam_copy_tuple(TupleTableSlot *slot) +{ + HeapamTuple *stuple = (HeapamTuple *)slot->tts_storage; + return heap_copytuple(stuple->hst_heaptuple); +} + +static MinimalTuple +minimal_tuple_from_heapam_tuple(TupleTableSlot *slot) +{ + HeapamTuple *stuple = (HeapamTuple *)slot->tts_storage; + return minimal_tuple_from_heap_tuple(stuple->hst_heaptuple); +} + +static void +heapam_tuple_set_speculative_token(TupleTableSlot *slot, uint32 specToken) +{ + HeapTuple tuple; + + /* + * This implementation violates the contract of ExecFetchSlotTuple, which + * states that the returned tuple must be treated as read-only. Probably + * the way to fix this is to set the speculative token in the slot rather + * than the heaptuple directly, so that it can correctly be set on the + * tuple once it is materialized. 
+ */ + tuple = ExecFetchSlotTuple(slot); + HeapTupleHeaderSetSpeculativeToken(tuple->t_data, specToken); +} + +Datum +heapam_storage_handler(PG_FUNCTION_ARGS) +{ + StorageAmRoutine *amroutine = makeNode(StorageAmRoutine); + + amroutine->scan_begin = heap_beginscan; + amroutine->scan_begin_catalog = heap_beginscan_catalog; + amroutine->scan_begin_strat = heap_beginscan_strat; + amroutine->scan_begin_bm = heap_beginscan_bm; + amroutine->scan_begin_sampling = heap_beginscan_sampling; + amroutine->scansetlimits = heap_setscanlimits; + amroutine->scan_getnext = heap_getnext; + amroutine->scan_end = heap_endscan; + //amroutine->scan_getpage = ; + amroutine->scan_rescan = heap_rescan; + amroutine->scan_rescan_set_params = heap_rescan_set_params; + amroutine->scan_update_snapshot = heap_update_snapshot; + + amroutine->tuple_fetch = heap_fetch; + + amroutine->tuple_insert = heapam_heap_insert; + amroutine->tuple_delete = heapam_heap_delete; + amroutine->tuple_update = heapam_heap_update; + amroutine->tuple_lock = heap_lock_tuple; + + amroutine->tuple_get_cmin = heapam_get_cmin; + amroutine->tuple_get_xmin = heapam_get_xmin; + amroutine->tuple_get_updated_xid = heapam_get_updated_xid; + amroutine->tuple_get_itempointer = heapam_get_itempointer; + amroutine->tuple_get_ctid = heapam_get_ctid; + amroutine->tuple_is_heaponly = heapam_tuple_is_heaopnly; + amroutine->tuple_from_datum = heapam_form_tuple_by_datum; + + amroutine->slot_store_tuple = heapam_slot_store_tuple; + amroutine->slot_getattr = slot_getattr; + amroutine->slot_getallattrs = slot_getallattrs; + amroutine->slot_getsomeattrs = slot_getsomeattrs; + amroutine->slot_attisnull = slot_attisnull; + amroutine->slot_clear_tuple = heapam_slot_clear_tuple; + amroutine->slot_copy_tuple = heapam_copy_tuple; + amroutine->slot_get_tuple = heapam_get_tuple; + amroutine->slot_copy_min_tuple = minimal_tuple_from_heapam_tuple; + amroutine->slot_is_physical_tuple = heapam_is_physical_tuple; + amroutine->slot_update_tableoid = 
slot_update_tuple_tableoid; + amroutine->slot_set_tuple_oid = slot_set_tuple_oid; + + + amroutine->tuple_set_speculative_token = heapam_tuple_set_speculative_token; + amroutine->speculative_finish = heapam_finish_speculative; + amroutine->speculative_abort = heapam_abort_speculative; + + /* XXX more? */ + + PG_RETURN_POINTER(amroutine); +} + +/* ---------------------------------------------------------------- * heap support routines * ---------------------------------------------------------------- */ @@ -321,9 +717,11 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * startBlk is the page to start at * numBlks is number of pages to scan (InvalidBlockNumber means "all") */ -void -heap_setscanlimits(HeapScanDesc scan, BlockNumber startBlk, BlockNumber numBlks) +static void +heap_setscanlimits(HeapScanDesc sscan, BlockNumber startBlk, BlockNumber numBlks) { + HeapScanDesc scan = (HeapScanDesc) sscan; + Assert(!scan->rs_inited); /* else too late to change */ Assert(!scan->rs_syncscan); /* else rs_startblock is significant */ @@ -1387,7 +1785,7 @@ heap_openrv_extended(const RangeVar *relation, LOCKMODE lockmode, * also allows control of whether page-mode visibility checking is used. 
* ---------------- */ -HeapScanDesc +static HeapScanDesc heap_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key) { @@ -1395,7 +1793,7 @@ heap_beginscan(Relation relation, Snapshot snapshot, true, true, true, false, false, false); } -HeapScanDesc +static HeapScanDesc heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key) { Oid relid = RelationGetRelid(relation); @@ -1405,7 +1803,7 @@ heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key) true, true, true, false, false, true); } -HeapScanDesc +static HeapScanDesc heap_beginscan_strat(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, bool allow_strat, bool allow_sync) @@ -1415,7 +1813,7 @@ heap_beginscan_strat(Relation relation, Snapshot snapshot, false, false, false); } -HeapScanDesc +static HeapScanDesc heap_beginscan_bm(Relation relation, Snapshot snapshot, int nkeys, ScanKey key) { @@ -1423,7 +1821,7 @@ heap_beginscan_bm(Relation relation, Snapshot snapshot, false, false, true, true, false, false); } -HeapScanDesc +static HeapScanDesc heap_beginscan_sampling(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, bool allow_strat, bool allow_sync, bool allow_pagemode) @@ -1574,7 +1972,7 @@ heap_rescan_set_params(HeapScanDesc scan, ScanKey key, * Check handling if reldesc caching. 
* ---------------- */ -void +static void heap_endscan(HeapScanDesc scan) { /* Note: no locking manipulations needed */ @@ -1790,9 +2188,11 @@ heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot) #endif /* !defined(HEAPDEBUGALL) */ -HeapTuple -heap_getnext(HeapScanDesc scan, ScanDirection direction) +static StorageTuple +heap_getnext(HeapScanDesc sscan, ScanDirection direction) { + HeapScanDesc scan = (HeapScanDesc) sscan; + /* Note: no locking manipulations needed */ HEAPDEBUG_1; /* heap_getnext( info ) */ @@ -1858,20 +2258,21 @@ heap_getnext(HeapScanDesc scan, ScanDirection direction) * truncate off the destination page without having killed the referencing * tuple first), but the item number might well not be good. */ -bool +static bool heap_fetch(Relation relation, + ItemPointer tid, Snapshot snapshot, - HeapTuple tuple, + StorageTuple *stuple, Buffer *userbuf, bool keep_buf, Relation stats_relation) { - ItemPointer tid = &(tuple->t_self); ItemId lp; Buffer buffer; Page page; OffsetNumber offnum; bool valid; + HeapTupleData tuple; /* * Fetch and pin the appropriate page of the relation. 
@@ -1900,7 +2301,7 @@ heap_fetch(Relation relation, ReleaseBuffer(buffer); *userbuf = InvalidBuffer; } - tuple->t_data = NULL; + *stuple = NULL; return false; } @@ -1922,26 +2323,27 @@ heap_fetch(Relation relation, ReleaseBuffer(buffer); *userbuf = InvalidBuffer; } - tuple->t_data = NULL; + *stuple = NULL; return false; } /* - * fill in *tuple fields + * fill in tuple fields and place it in stuple */ - tuple->t_data = (HeapTupleHeader) PageGetItem(page, lp); - tuple->t_len = ItemIdGetLength(lp); - tuple->t_tableOid = RelationGetRelid(relation); + ItemPointerCopy(tid, &(tuple.t_self)); + tuple.t_data = (HeapTupleHeader) PageGetItem(page, lp); + tuple.t_len = ItemIdGetLength(lp); + tuple.t_tableOid = RelationGetRelid(relation); /* * check time qualification of tuple, then release lock */ - valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer); + valid = HeapTupleSatisfiesVisibility(&tuple, snapshot, buffer); if (valid) - PredicateLockTuple(relation, tuple, snapshot); + PredicateLockTuple(relation, &tuple, snapshot); - CheckForSerializableConflictOut(valid, relation, tuple, buffer, snapshot); + CheckForSerializableConflictOut(valid, relation, &tuple, buffer, snapshot); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); @@ -1957,6 +2359,7 @@ heap_fetch(Relation relation, if (stats_relation != NULL) pgstat_count_heap_fetch(stats_relation); + *stuple = heap_copytuple(&tuple); return true; } @@ -3007,7 +3410,7 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask) * cannot obtain cmax from a combocid generated by another transaction). * See comments for struct HeapUpdateFailureData for additional info. */ -HTSU_Result +static HTSU_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, HeapUpdateFailureData *hufd) @@ -3458,7 +3861,7 @@ simple_heap_delete(Relation relation, ItemPointer tid) * cannot obtain cmax from a combocid generated by another transaction). 
* See comments for struct HeapUpdateFailureData for additional info. */ -HTSU_Result +static HTSU_Result heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, CommandId cid, Snapshot crosscheck, bool wait, HeapUpdateFailureData *hufd, LockTupleMode *lockmode) @@ -4536,14 +4939,13 @@ get_mxact_status_for_lock(LockTupleMode mode, bool is_update) * * See README.tuplock for a thorough explanation of this mechanism. */ -HTSU_Result -heap_lock_tuple(Relation relation, HeapTuple tuple, +static HTSU_Result +heap_lock_tuple(Relation relation, ItemPointer tid, StorageTuple stuple, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, bool follow_updates, Buffer *buffer, HeapUpdateFailureData *hufd) { HTSU_Result result; - ItemPointer tid = &(tuple->t_self); ItemId lp; Page page; Buffer vmbuffer = InvalidBuffer; @@ -4556,6 +4958,9 @@ heap_lock_tuple(Relation relation, HeapTuple tuple, bool first_time = true; bool have_tuple_lock = false; bool cleared_all_frozen = false; + HeapTuple tuple = (HeapTuple)stuple; + + Assert(tuple != NULL); *buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); block = ItemPointerGetBlockNumber(tid); @@ -5621,7 +6026,7 @@ heap_lock_updated_tuple_rec(Relation rel, ItemPointer tid, TransactionId xid, { HTSU_Result result; ItemPointerData tupid; - HeapTupleData mytup; + HeapTuple mytup; Buffer buf; uint16 new_infomask, new_infomask2, @@ -5641,9 +6046,8 @@ heap_lock_updated_tuple_rec(Relation rel, ItemPointer tid, TransactionId xid, new_infomask = 0; new_xmax = InvalidTransactionId; block = ItemPointerGetBlockNumber(&tupid); - ItemPointerCopy(&tupid, &(mytup.t_self)); - if (!heap_fetch(rel, SnapshotAny, &mytup, &buf, false, NULL)) + if (!heap_fetch(rel, &tupid, SnapshotAny, (StorageTuple *)&mytup, &buf, false, NULL)) { /* * if we fail to find the updated version of the tuple, it's @@ -5689,7 +6093,7 @@ l4: * end of the chain, we're done, so return success. 
*/ if (TransactionIdIsValid(priorXmax) && - !TransactionIdEquals(HeapTupleHeaderGetXmin(mytup.t_data), + !TransactionIdEquals(HeapTupleHeaderGetXmin(mytup->t_data), priorXmax)) { result = HeapTupleMayBeUpdated; @@ -5701,15 +6105,15 @@ l4: * (sub)transaction, then we already locked the last live one in the * chain, thus we're done, so return success. */ - if (TransactionIdDidAbort(HeapTupleHeaderGetXmin(mytup.t_data))) + if (TransactionIdDidAbort(HeapTupleHeaderGetXmin(mytup->t_data))) { UnlockReleaseBuffer(buf); return HeapTupleMayBeUpdated; } - old_infomask = mytup.t_data->t_infomask; - old_infomask2 = mytup.t_data->t_infomask2; - xmax = HeapTupleHeaderGetRawXmax(mytup.t_data); + old_infomask = mytup->t_data->t_infomask; + old_infomask2 = mytup->t_data->t_infomask2; + xmax = HeapTupleHeaderGetRawXmax(mytup->t_data); /* * If this tuple version has been updated or locked by some concurrent @@ -5722,7 +6126,7 @@ l4: TransactionId rawxmax; bool needwait; - rawxmax = HeapTupleHeaderGetRawXmax(mytup.t_data); + rawxmax = HeapTupleHeaderGetRawXmax(mytup->t_data); if (old_infomask & HEAP_XMAX_IS_MULTI) { int nmembers; @@ -5752,7 +6156,7 @@ l4: { LockBuffer(buf, BUFFER_LOCK_UNLOCK); XactLockTableWait(members[i].xid, rel, - &mytup.t_self, + &mytup->t_self, XLTW_LockUpdated); pfree(members); goto l4; @@ -5811,7 +6215,7 @@ l4: if (needwait) { LockBuffer(buf, BUFFER_LOCK_UNLOCK); - XactLockTableWait(rawxmax, rel, &mytup.t_self, + XactLockTableWait(rawxmax, rel, &mytup->t_self, XLTW_LockUpdated); goto l4; } @@ -5823,7 +6227,7 @@ l4: } /* compute the new Xmax and infomask values for the tuple ... */ - compute_new_xmax_infomask(xmax, old_infomask, mytup.t_data->t_infomask2, + compute_new_xmax_infomask(xmax, old_infomask, mytup->t_data->t_infomask2, xid, mode, false, &new_xmax, &new_infomask, &new_infomask2); @@ -5835,11 +6239,11 @@ l4: START_CRIT_SECTION(); /* ... 
and set them */ - HeapTupleHeaderSetXmax(mytup.t_data, new_xmax); - mytup.t_data->t_infomask &= ~HEAP_XMAX_BITS; - mytup.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED; - mytup.t_data->t_infomask |= new_infomask; - mytup.t_data->t_infomask2 |= new_infomask2; + HeapTupleHeaderSetXmax(mytup->t_data, new_xmax); + mytup->t_data->t_infomask &= ~HEAP_XMAX_BITS; + mytup->t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED; + mytup->t_data->t_infomask |= new_infomask; + mytup->t_data->t_infomask2 |= new_infomask2; MarkBufferDirty(buf); @@ -5853,7 +6257,7 @@ l4: XLogBeginInsert(); XLogRegisterBuffer(0, buf, REGBUF_STANDARD); - xlrec.offnum = ItemPointerGetOffsetNumber(&mytup.t_self); + xlrec.offnum = ItemPointerGetOffsetNumber(&mytup->t_self); xlrec.xmax = new_xmax; xlrec.infobits_set = compute_infobits(new_infomask, new_infomask2); xlrec.flags = @@ -5869,17 +6273,17 @@ l4: END_CRIT_SECTION(); /* if we find the end of update chain, we're done. */ - if (mytup.t_data->t_infomask & HEAP_XMAX_INVALID || - ItemPointerEquals(&mytup.t_self, &mytup.t_data->t_ctid) || - HeapTupleHeaderIsOnlyLocked(mytup.t_data)) + if (mytup->t_data->t_infomask & HEAP_XMAX_INVALID || + ItemPointerEquals(&mytup->t_self, &mytup->t_data->t_ctid) || + HeapTupleHeaderIsOnlyLocked(mytup->t_data)) { result = HeapTupleMayBeUpdated; goto out_locked; } /* tail recursion */ - priorXmax = HeapTupleHeaderGetUpdateXid(mytup.t_data); - ItemPointerCopy(&(mytup.t_data->t_ctid), &tupid); + priorXmax = HeapTupleHeaderGetUpdateXid(mytup->t_data); + ItemPointerCopy(&(mytup->t_data->t_ctid), &tupid); UnlockReleaseBuffer(buf); if (vmbuffer != InvalidBuffer) ReleaseBuffer(vmbuffer); @@ -5959,7 +6363,7 @@ heap_lock_updated_tuple(Relation rel, HeapTuple tuple, ItemPointer ctid, * but clearing the token at completion isn't very expensive either. * An explicit confirmation WAL record also makes logical decoding simpler. 
*/ -void +static void heap_finish_speculative(Relation relation, HeapTuple tuple) { Buffer buffer; diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index c2e8361..98dcc85 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -34,6 +34,7 @@ #include "access/multixact.h" #include "access/nbtree.h" #include "access/reloptions.h" +#include "access/storageamapi.h" #include "access/sysattr.h" #include "access/xact.h" #include "access/xlog.h" @@ -1360,10 +1361,26 @@ RelationBuildDesc(Oid targetRelId, bool insertIt) } /* - * if it's an index, initialize index-related information + * initialize access method information */ - if (OidIsValid(relation->rd_rel->relam)) - RelationInitIndexAccessInfo(relation); + switch (relation->rd_rel->relkind) + { + case RELKIND_INDEX: + Assert(relation->rd_rel->relkind != InvalidOid); + RelationInitIndexAccessInfo(relation); + break; + case RELKIND_RELATION: + case RELKIND_SEQUENCE: + case RELKIND_TOASTVALUE: + case RELKIND_VIEW: /* Not exactly the storage, but underlying tuple access, it is required */ + case RELKIND_MATVIEW: + case RELKIND_PARTITIONED_TABLE: + RelationInitStorageAccessInfo(relation); + break; + default: + /* nothing to do in other cases */ + break; + } /* extract reloptions if any */ RelationParseRelOptions(relation, pg_class_tuple); @@ -1860,6 +1877,71 @@ LookupOpclassInfo(Oid operatorClassOid, return opcentry; } +/* + * Fill in the StorageAmRoutine for a relation + * + * relation's rd_amhandler and rd_indexcxt (XXX?) must be valid already. + */ +static void +InitStorageAmRoutine(Relation relation) +{ + StorageAmRoutine *cached, + *tmp; + + /* + * Call the amhandler in current, short-lived memory context, just in case + * it leaks anything (it probably won't, but let's be paranoid). + */ + tmp = GetStorageAmRoutine(relation->rd_amhandler); + + /* XXX do we need a separate memory context for this? 
*/ + /* OK, now transfer the data into cache context */ + cached = (StorageAmRoutine *) MemoryContextAlloc(CacheMemoryContext, + sizeof(StorageAmRoutine)); + memcpy(cached, tmp, sizeof(StorageAmRoutine)); + relation->rd_stamroutine = cached; + + pfree(tmp); +} + +/* + * Initialize storage-access-method support data for a heap relation + */ +void +RelationInitStorageAccessInfo(Relation relation) +{ + HeapTuple tuple; + Form_pg_am aform; + + /* + * Relations that don't have a catalogued storage access method use the + * standard heapam module; otherwise a catalog lookup is in order. + */ + if (!OidIsValid(relation->rd_rel->relam)) + { + relation->rd_amhandler = HEAPAM_STORAGE_AM_HANDLER_OID; + } + else + { + /* + * Look up the storage access method, save the OID of its handler + * function. + */ + tuple = SearchSysCache1(AMOID, + ObjectIdGetDatum(relation->rd_rel->relam)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for access method %u", + relation->rd_rel->relam); + aform = (Form_pg_am) GETSTRUCT(tuple); + relation->rd_amhandler = aform->amhandler; + ReleaseSysCache(tuple); + } + + /* + * Now we can fetch the storage AM's API struct + */ + InitStorageAmRoutine(relation); +} /* * formrdesc @@ -2019,6 +2101,11 @@ formrdesc(const char *relationName, Oid relationReltype, RelationInitPhysicalAddr(relation); /* + * initialize the storage am handler + */ + relation->rd_stamroutine = GetHeapamStorageAmRoutine(); + + /* * initialize the rel-has-index flag, using hardwired knowledge */ if (IsBootstrapProcessingMode()) @@ -2346,6 +2433,8 @@ RelationDestroyRelation(Relation relation, bool remember_tupdesc) pfree(relation->rd_pubactions); if (relation->rd_options) pfree(relation->rd_options); + if (relation->rd_stamroutine) + pfree(relation->rd_stamroutine); if (relation->rd_indextuple) pfree(relation->rd_indextuple); if (relation->rd_indexcxt) @@ -3357,6 +3446,13 @@ RelationBuildLocalRelation(const char *relname, RelationInitPhysicalAddr(rel); + if 
(relkind == RELKIND_RELATION || + relkind == RELKIND_MATVIEW || + relkind == RELKIND_VIEW || /* Not exactly the storage, but underlying tuple access, it is required */ + relkind == RELKIND_PARTITIONED_TABLE || + relkind == RELKIND_TOASTVALUE) + RelationInitStorageAccessInfo(rel); + /* * Okay to insert into the relcache hash table. * @@ -3878,6 +3974,18 @@ RelationCacheInitializePhase3(void) restart = true; } + if (relation->rd_stamroutine == NULL && + (relation->rd_rel->relkind == RELKIND_RELATION || + relation->rd_rel->relkind == RELKIND_MATVIEW || + relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE || + relation->rd_rel->relkind == RELKIND_TOASTVALUE)) + { + RelationInitStorageAccessInfo(relation); + Assert (relation->rd_stamroutine != NULL); + + restart = true; + } + /* Release hold on the relation */ RelationDecrementReferenceCount(relation); @@ -5593,6 +5701,9 @@ load_relcache_init_file(bool shared) if (rel->rd_isnailed) nailed_rels++; + /* Load storage AM stuff */ + RelationInitStorageAccessInfo(rel); + Assert(rel->rd_index == NULL); Assert(rel->rd_indextuple == NULL); Assert(rel->rd_indexcxt == NULL); diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 7e85510..962291e 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -100,41 +100,21 @@ extern Relation heap_openrv_extended(const RangeVar *relation, typedef struct HeapScanDescData *HeapScanDesc; typedef struct ParallelHeapScanDescData *ParallelHeapScanDesc; +/* struct definitions appear in storageamapi.h */ +typedef void *StorageScanDesc; +typedef void *StorageTuple; + /* * HeapScanIsValid * True iff the heap scan is valid. 
*/ #define HeapScanIsValid(scan) PointerIsValid(scan) -extern HeapScanDesc heap_beginscan(Relation relation, Snapshot snapshot, - int nkeys, ScanKey key); -extern HeapScanDesc heap_beginscan_catalog(Relation relation, int nkeys, - ScanKey key); -extern HeapScanDesc heap_beginscan_strat(Relation relation, Snapshot snapshot, - int nkeys, ScanKey key, - bool allow_strat, bool allow_sync); -extern HeapScanDesc heap_beginscan_bm(Relation relation, Snapshot snapshot, - int nkeys, ScanKey key); -extern HeapScanDesc heap_beginscan_sampling(Relation relation, - Snapshot snapshot, int nkeys, ScanKey key, - bool allow_strat, bool allow_sync, bool allow_pagemode); -extern void heap_setscanlimits(HeapScanDesc scan, BlockNumber startBlk, - BlockNumber endBlk); -extern void heapgetpage(HeapScanDesc scan, BlockNumber page); -extern void heap_rescan(HeapScanDesc scan, ScanKey key); -extern void heap_rescan_set_params(HeapScanDesc scan, ScanKey key, - bool allow_strat, bool allow_sync, bool allow_pagemode); -extern void heap_endscan(HeapScanDesc scan); -extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction); - extern Size heap_parallelscan_estimate(Snapshot snapshot); extern void heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation, Snapshot snapshot); extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc); -extern bool heap_fetch(Relation relation, Snapshot snapshot, - HeapTuple tuple, Buffer *userbuf, bool keep_buf, - Relation stats_relation); extern bool heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer, Snapshot snapshot, HeapTuple heapTuple, bool *all_dead, bool first_call); @@ -153,19 +133,6 @@ extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid, int options, BulkInsertState bistate); extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, CommandId cid, int options, BulkInsertState bistate); -extern HTSU_Result heap_delete(Relation relation, 
ItemPointer tid, - CommandId cid, Snapshot crosscheck, bool wait, - HeapUpdateFailureData *hufd); -extern void heap_finish_speculative(Relation relation, HeapTuple tuple); -extern void heap_abort_speculative(Relation relation, HeapTuple tuple); -extern HTSU_Result heap_update(Relation relation, ItemPointer otid, - HeapTuple newtup, - CommandId cid, Snapshot crosscheck, bool wait, - HeapUpdateFailureData *hufd, LockTupleMode *lockmode); -extern HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tuple, - CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, - bool follow_update, - Buffer *buffer, HeapUpdateFailureData *hufd); extern void heap_inplace_update(Relation relation, HeapTuple tuple); extern bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId cutoff_xid, TransactionId cutoff_multi); @@ -179,7 +146,7 @@ extern void simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup); extern void heap_sync(Relation relation); -extern void heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot); + /* in heap/pruneheap.c */ extern void heap_page_prune_opt(Relation relation, Buffer buffer); diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index e232006..f6b899f 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -549,6 +549,11 @@ DESCR("convert int4 to float4"); DATA(insert OID = 319 ( int4 PGNSP PGUID 12 1 0 0 0 f f f f t f i s 1 0 23 "700" _null_ _null_ _null_ _null_ _null_ ftoi4 _null_ _null_ _null_ )); DESCR("convert float4 to int4"); +/* Storage access method handlers */ +DATA(insert OID = 425 ( heapam_storage_handler PGNSP PGUID 12 1 0 0 0 f f f f t f v s 1 0 3998 "2281" _null_ _null_ _null_ _null_ _null_ heapam_storage_handler _null_ _null_ _null_ )); +DESCR("row-oriented storage access method handler"); +#define HEAPAM_STORAGE_AM_HANDLER_OID 425 + /* Index access method handlers */ DATA(insert OID = 330 ( bthandler PGNSP PGUID 12 1 0 0 0 f f f f t f v s 1 0 325 "2281" 
_null_ _null_ _null_ _null_ _null_ bthandler _null_ _null_ _null_ )); DESCR("btree index access method handler"); diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index 8476896..c187c65 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -161,6 +161,12 @@ typedef struct RelationData struct HeapTupleData *rd_indextuple; /* all of pg_index tuple */ /* + * Underlying storage support + */ + Oid rd_storageam; /* OID of storage AM handler function */ + struct StorageAmRoutine *rd_stamroutine; /* storage AM's API struct */ + + /* * index access support info (used only for an index relation) * * Note: only default support procs for each opclass are cached, namely @@ -429,6 +435,12 @@ typedef struct ViewOptions #define RelationGetDescr(relation) ((relation)->rd_att) /* + * RelationGetStorageRoutine + * Returns the storage AM routine for a relation. + */ +#define RelationGetStorageRoutine(relation) ((relation)->rd_stamroutine) + +/* * RelationGetRelationName * Returns the rel's name. * diff --git a/src/include/utils/relcache.h b/src/include/utils/relcache.h index 81af3ae..807345a 100644 --- a/src/include/utils/relcache.h +++ b/src/include/utils/relcache.h @@ -71,6 +71,8 @@ extern void RelationInitIndexAccessInfo(Relation relation); struct PublicationActions; extern struct PublicationActions *GetRelationPublicationActions(Relation relation); +extern void RelationInitStorageAccessInfo(Relation relation); + /* * Routines to support ereport() reports of relation-related errors */ -- 2.7.4.windows.1