diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c index 9fd7b4e019b..b72f5363045 100644 --- a/src/backend/access/common/tupdesc.c +++ b/src/backend/access/common/tupdesc.c @@ -343,11 +343,68 @@ DecrTupleDescRefCount(TupleDesc tupdesc) } /* - * Compare two TupleDesc structures for logical equality + * Compare two TupleDesc attributes for logical equality * * Note: we deliberately do not check the attrelid and tdtypmod fields. * This allows typcache.c to use this routine to see if a cached record type * matches a requested type, and is harmless for relcache.c's uses. + */ +bool +equalTupleDescAttrs(Form_pg_attribute attr1, Form_pg_attribute attr2) +{ + /* + * We do not need to check every single field here: we can disregard + * attrelid and attnum (which were used to place the row in the attrs + * array in the first place). It might look like we could dispense + * with checking attlen/attbyval/attalign, since these are derived + * from atttypid; but in the case of dropped columns we must check + * them (since atttypid will be zero for all dropped columns) and in + * general it seems safer to check them always. + * + * attcacheoff must NOT be checked since it's possibly not set in both + * copies.
+ */ + if (strcmp(NameStr(attr1->attname), NameStr(attr2->attname)) != 0) + return false; + if (attr1->atttypid != attr2->atttypid) + return false; + if (attr1->attstattarget != attr2->attstattarget) + return false; + if (attr1->attlen != attr2->attlen) + return false; + if (attr1->attndims != attr2->attndims) + return false; + if (attr1->atttypmod != attr2->atttypmod) + return false; + if (attr1->attbyval != attr2->attbyval) + return false; + if (attr1->attstorage != attr2->attstorage) + return false; + if (attr1->attalign != attr2->attalign) + return false; + if (attr1->attnotnull != attr2->attnotnull) + return false; + if (attr1->atthasdef != attr2->atthasdef) + return false; + if (attr1->attidentity != attr2->attidentity) + return false; + if (attr1->attisdropped != attr2->attisdropped) + return false; + if (attr1->attislocal != attr2->attislocal) + return false; + if (attr1->attinhcount != attr2->attinhcount) + return false; + if (attr1->attcollation != attr2->attcollation) + return false; + /* attacl, attoptions and attfdwoptions are not even present... */ + + return true; +} + +/* + * Compare two TupleDesc structures for logical equality + * + * Note: see equalTupleDescAttrs for the note on fields that we don't compare. * We don't compare tdrefcount, either. */ bool @@ -369,51 +426,8 @@ equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2) Form_pg_attribute attr1 = tupdesc1->attrs[i]; Form_pg_attribute attr2 = tupdesc2->attrs[i]; - /* - * We do not need to check every single field here: we can disregard - * attrelid and attnum (which were used to place the row in the attrs - * array in the first place). It might look like we could dispense - * with checking attlen/attbyval/attalign, since these are derived - * from atttypid; but in the case of dropped columns we must check - * them (since atttypid will be zero for all dropped columns) and in - * general it seems safer to check them always.
- * - * attcacheoff must NOT be checked since it's possibly not set in both - * copies. - */ - if (strcmp(NameStr(attr1->attname), NameStr(attr2->attname)) != 0) - return false; - if (attr1->atttypid != attr2->atttypid) - return false; - if (attr1->attstattarget != attr2->attstattarget) - return false; - if (attr1->attlen != attr2->attlen) - return false; - if (attr1->attndims != attr2->attndims) - return false; - if (attr1->atttypmod != attr2->atttypmod) - return false; - if (attr1->attbyval != attr2->attbyval) - return false; - if (attr1->attstorage != attr2->attstorage) - return false; - if (attr1->attalign != attr2->attalign) - return false; - if (attr1->attnotnull != attr2->attnotnull) - return false; - if (attr1->atthasdef != attr2->atthasdef) - return false; - if (attr1->attidentity != attr2->attidentity) - return false; - if (attr1->attisdropped != attr2->attisdropped) - return false; - if (attr1->attislocal != attr2->attislocal) - return false; - if (attr1->attinhcount != attr2->attinhcount) - return false; - if (attr1->attcollation != attr2->attcollation) + if (!equalTupleDescAttrs(attr1, attr2)) return false; - /* attacl, attoptions and attfdwoptions are not even present... */ } if (tupdesc1->constr != NULL) diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 2dad3e8a655..54acd989101 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -36,6 +36,7 @@ #include "utils/memutils.h" #include "utils/resowner.h" #include "utils/snapmgr.h" +#include "utils/typcache.h" /* @@ -47,6 +48,14 @@ */ #define PARALLEL_ERROR_QUEUE_SIZE 16384 +/* + * We want to create a DSA area to store shared state that has the same extent + * as a parallel context, to hold the record type registry. We don't want it + * to have to create any DSM segments just yet in common cases, so we'll give + * it enough space to hold the empty hash tables of a SharedRecordTypmodRegistry.
+ */ +#define PARALLEL_CONTEXT_DSA_SIZE 0x30000 + /* Magic number for parallel context TOC. */ #define PARALLEL_MAGIC 0x50477c7c @@ -63,6 +72,9 @@ #define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007) #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008) #define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009) +#define PARALLEL_KEY_EXTENSION_TRAMPOLINE UINT64CONST(0xFFFFFFFFFFFF000A) +#define PARALLEL_KEY_CONTEXT_DSA UINT64CONST(0xFFFFFFFFFFFF000B) +#define PARALLEL_KEY_RECORD_TYPMOD_REGISTRY UINT64CONST(0xFFFFFFFFFFFF000C) /* Fixed-size parallel state. */ typedef struct FixedParallelState @@ -191,6 +203,7 @@ InitializeParallelDSM(ParallelContext *pcxt) Size library_len = 0; Size guc_len = 0; Size combocidlen = 0; + Size typmod_registry_size = 0; Size tsnaplen = 0; Size asnaplen = 0; Size tstatelen = 0; @@ -226,8 +239,11 @@ InitializeParallelDSM(ParallelContext *pcxt) shm_toc_estimate_chunk(&pcxt->estimator, asnaplen); tstatelen = EstimateTransactionStateSpace(); shm_toc_estimate_chunk(&pcxt->estimator, tstatelen); + shm_toc_estimate_chunk(&pcxt->estimator, PARALLEL_CONTEXT_DSA_SIZE); + typmod_registry_size = SharedRecordTypmodRegistryEstimate(); + shm_toc_estimate_chunk(&pcxt->estimator, typmod_registry_size); /* If you add more chunks here, you probably need to add keys. */ - shm_toc_estimate_keys(&pcxt->estimator, 6); + shm_toc_estimate_keys(&pcxt->estimator, 8); /* Estimate space need for error queues.
*/ StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) == @@ -295,6 +311,8 @@ InitializeParallelDSM(ParallelContext *pcxt) char *asnapspace; char *tstatespace; char *error_queue_space; + char *typemod_registry_space; + char *context_dsa_space; char *entrypointstate; Size lnamelen; @@ -313,6 +331,27 @@ InitializeParallelDSM(ParallelContext *pcxt) SerializeComboCIDState(combocidlen, combocidspace); shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace); + /* + * Make a DSA area for dynamically-sized shared state that has the + * same scope as this ParallelContext. + */ + context_dsa_space = shm_toc_allocate(pcxt->toc, + PARALLEL_CONTEXT_DSA_SIZE); + pcxt->context_dsa = dsa_create_in_place(context_dsa_space, + PARALLEL_CONTEXT_DSA_SIZE, + LWTRANCHE_PARALLEL_CONTEXT_DSA, + pcxt->seg); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_CONTEXT_DSA, context_dsa_space); + + /* Set up shared record type registry. */ + typemod_registry_space = shm_toc_allocate(pcxt->toc, + typmod_registry_size); + SharedRecordTypmodRegistryInit((SharedRecordTypmodRegistry *) + typemod_registry_space, + pcxt->context_dsa, pcxt->seg); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_RECORD_TYPMOD_REGISTRY, + typemod_registry_space); + /* Serialize transaction snapshot and active snapshot. */ tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen); SerializeSnapshot(transaction_snapshot, tsnapspace); @@ -618,6 +657,13 @@ DestroyParallelContext(ParallelContext *pcxt) } } + /* Detach from the context-scoped DSA area, if there is one. */ + if (pcxt->context_dsa != NULL) + { + dsa_detach(pcxt->context_dsa); + pcxt->context_dsa = NULL; + } + /* * If we have allocated a shared memory segment, detach it.
This will * implicitly detach the error queues, and any other shared memory queues, @@ -938,6 +984,9 @@ ParallelWorkerMain(Datum main_arg) char *asnapspace; char *tstatespace; StringInfoData msgbuf; + char *typmod_registry_space; + char *context_dsa_space; + dsa_area *context_dsa; /* Set flag to indicate that we're initializing a parallel worker. */ InitializingParallelWorker = true; @@ -1069,6 +1118,20 @@ ParallelWorkerMain(Datum main_arg) Assert(combocidspace != NULL); RestoreComboCIDState(combocidspace); + /* Attach to the context-scoped DSA area. */ + context_dsa_space = shm_toc_lookup(toc, PARALLEL_KEY_CONTEXT_DSA); + Assert(context_dsa_space != NULL); + context_dsa = dsa_attach_in_place(context_dsa_space, seg); + + /* Attach to shared record type registry. */ + typmod_registry_space = + shm_toc_lookup(toc, PARALLEL_KEY_RECORD_TYPMOD_REGISTRY); + Assert(typmod_registry_space != NULL); + SharedRecordTypmodRegistryAttach((SharedRecordTypmodRegistry *) + typmod_registry_space, + context_dsa, + seg); + /* Restore transaction snapshot. */ tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT); Assert(tsnapspace != NULL); @@ -1114,6 +1177,9 @@ ParallelWorkerMain(Datum main_arg) /* Must pop active snapshot so resowner.c doesn't complain. */ PopActiveSnapshot(); + /* Detach from context-scoped DSA area. */ + dsa_detach(context_dsa); + /* Shut down the parallel-worker transaction.
*/ EndParallelWorkerTransaction(); diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 35536e47894..0d7996b5205 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -494,7 +494,7 @@ RegisterLWLockTranches(void) if (LWLockTrancheArray == NULL) { - LWLockTranchesAllocated = 64; + LWLockTranchesAllocated = 128; LWLockTrancheArray = (char **) MemoryContextAllocZero(TopMemoryContext, LWLockTranchesAllocated * sizeof(char *)); @@ -510,7 +510,13 @@ RegisterLWLockTranches(void) "predicate_lock_manager"); LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA, "parallel_query_dsa"); + LWLockRegisterTranche(LWTRANCHE_PARALLEL_CONTEXT_DSA, + "parallel_context_dsa"); LWLockRegisterTranche(LWTRANCHE_TBM, "tbm"); + LWLockRegisterTranche(LWTRANCHE_SHARED_RECORD_ATTS_INDEX, + "shared_record_atts_index"); + LWLockRegisterTranche(LWTRANCHE_SHARED_RECORD_TYPMOD_INDEX, + "shared_record_typmod_index"); /* Register named tranches. */ for (i = 0; i < NamedLWLockTrancheRequests; i++) diff --git a/src/backend/utils/cache/typcache.c b/src/backend/utils/cache/typcache.c index 0cf5001a758..f26b0a7a58f 100644 --- a/src/backend/utils/cache/typcache.c +++ b/src/backend/utils/cache/typcache.c @@ -55,7 +55,9 @@ #include "catalog/pg_type.h" #include "commands/defrem.h" #include "executor/executor.h" +#include "lib/dht.h" #include "optimizer/planner.h" +#include "storage/lwlock.h" #include "utils/builtins.h" #include "utils/catcache.h" #include "utils/fmgroids.h" @@ -67,7 +69,6 @@ #include "utils/syscache.h" #include "utils/typcache.h" - /* The main type cache hashtable searched by lookup_type_cache */ static HTAB *TypeCacheHash = NULL; @@ -148,12 +149,95 @@ typedef struct RecordCacheEntry List *tupdescs; } RecordCacheEntry; +/* + * A mechanism for sharing record typmods between backends.
+ */ +struct SharedRecordTypmodRegistry +{ + dht_hash_table_handle atts_index_handle; + dht_hash_table_handle typmod_index_handle; + pg_atomic_uint32 next_typmod; +}; + +/* + * A flattened/serialized representation of a TupleDesc for use in shared + * memory. Can be converted to and from regular TupleDesc format. Doesn't + * support constraints and doesn't store the actual type OID, because this is + * only for use with RECORD types as created by CreateTupleDesc(). These are + * arranged into a linked list, in the hash table entry corresponding to the + * OIDs of the first REC_HASH_KEYS attributes, so we'd expect to get more than one entry + * in the list when names or other properties differ. + */ +typedef struct SerializedTupleDesc +{ + dsa_pointer next; /* next with the same leading attribute OIDs */ + int natts; /* number of attributes in the tuple */ + int32 typmod; /* typmod for tuple type */ + bool hasoid; /* tuple has oid attribute in its header */ + + /* + * The attributes follow. We only ever access the first + * ATTRIBUTE_FIXED_PART_SIZE bytes of each element, like the code in + * tupdesc.c. + */ + FormData_pg_attribute attributes[FLEXIBLE_ARRAY_MEMBER]; +} SerializedTupleDesc; + +/* + * An entry in SharedRecordTypmodRegistry's attribute index. The key is the + * first REC_HASH_KEYS attribute OIDs. That means that collisions are + * possible, but that's OK because SerializedTupleDesc objects are arranged + * into a list. + */ +typedef struct SRTRAttsIndexEntry +{ + Oid leading_attr_oids[REC_HASH_KEYS]; + dsa_pointer serialized_tupdesc; +} SRTRAttsIndexEntry; + +/* + * An entry in SharedRecordTypmodRegistry's typmod index. Points to a single + * SerializedTupleDesc in shared memory. + */ +typedef struct SRTRTypmodIndexEntry +{ + uint32 typmod; + dsa_pointer serialized_tupdesc; +} SRTRTypmodIndexEntry; + +/* Parameters for SharedRecordTypmodRegistry's attributes hash table.
*/ +static const dht_parameters srtr_atts_index_params = { + sizeof(Oid) * REC_HASH_KEYS, + sizeof(SRTRAttsIndexEntry), + memcmp, + tag_hash, + LWTRANCHE_SHARED_RECORD_ATTS_INDEX +}; + +/* Parameters for SharedRecordTypmodRegistry's typmod hash table. */ +static const dht_parameters srtr_typmod_index_params = { + sizeof(uint32), + sizeof(SRTRTypmodIndexEntry), + memcmp, + tag_hash, + LWTRANCHE_SHARED_RECORD_TYPMOD_INDEX +}; + static HTAB *RecordCacheHash = NULL; static TupleDesc *RecordCacheArray = NULL; static int32 RecordCacheArrayLen = 0; /* allocated length of array */ static int32 NextRecordTypmod = 0; /* number of entries used */ +/* Current SharedRecordTypmodRegistry, if attached. */ +static struct +{ + SharedRecordTypmodRegistry *shared; + dht_hash_table *atts_index; + dht_hash_table *typmod_index; + dsa_area *area; +} CurrentSharedRecordTypmodRegistry; + static void load_typcache_tupdesc(TypeCacheEntry *typentry); static void load_rangetype_info(TypeCacheEntry *typentry); static void load_domaintype_info(TypeCacheEntry *typentry); @@ -174,6 +258,13 @@ static void TypeCacheConstrCallback(Datum arg, int cacheid, uint32 hashvalue); static void load_enum_cache_data(TypeCacheEntry *tcache); static EnumItem *find_enumitem(TypeCacheEnumData *enumdata, Oid arg); static int enum_oid_cmp(const void *left, const void *right); +static void shared_record_typmod_registry_detach(dsm_segment *segment, + Datum datum); +static int32 find_or_allocate_shared_record_typmod(TupleDesc tupdesc); +static TupleDesc deserialize_tupledesc(const SerializedTupleDesc *serialized); +static dsa_pointer serialize_tupledesc(dsa_area *area, + const TupleDesc tupdesc); + /* @@ -1199,6 +1290,33 @@ cache_record_field_properties(TypeCacheEntry *typentry) typentry->flags |= TCFLAGS_CHECKED_FIELD_PROPERTIES; } +/* + * Make sure that RecordCacheArray is large enough to store 'typmod'.
+ */ +static void +ensure_record_cache_typmod_slot_exists(int32 typmod) +{ + if (RecordCacheArray == NULL) + { + RecordCacheArray = (TupleDesc *) + MemoryContextAllocZero(CacheMemoryContext, 64 * sizeof(TupleDesc)); + RecordCacheArrayLen = 64; + } + + if (typmod >= RecordCacheArrayLen) + { + int32 newlen = RecordCacheArrayLen * 2; + + while (typmod >= newlen) + newlen *= 2; + + RecordCacheArray = (TupleDesc *) repalloc(RecordCacheArray, + newlen * sizeof(TupleDesc)); + memset(RecordCacheArray + RecordCacheArrayLen, 0, + (newlen - RecordCacheArrayLen) * sizeof(TupleDesc)); + RecordCacheArrayLen = newlen; + } +} /* * lookup_rowtype_tupdesc_internal --- internal routine to lookup a rowtype @@ -1229,15 +1347,49 @@ lookup_rowtype_tupdesc_internal(Oid type_id, int32 typmod, bool noError) /* * It's a transient record type, so look in our record-type table. */ - if (typmod < 0 || typmod >= NextRecordTypmod) + if (typmod >= 0) { - if (!noError) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("record type has not been registered"))); - return NULL; + /* Is it already in our local cache? */ + if (typmod < RecordCacheArrayLen && + RecordCacheArray[typmod] != NULL) + return RecordCacheArray[typmod]; + + /* Are we attached to a SharedRecordTypmodRegistry? */ + if (CurrentSharedRecordTypmodRegistry.shared != NULL) + { + SRTRTypmodIndexEntry *entry; + + /* Try to find it in the shared typmod index. */ + entry = dht_find(CurrentSharedRecordTypmodRegistry.typmod_index, + &typmod, false); + if (entry != NULL) + { + SerializedTupleDesc *serialized; + + serialized = (SerializedTupleDesc *) + dsa_get_address(CurrentSharedRecordTypmodRegistry.area, + entry->serialized_tupdesc); + Assert(typmod == serialized->typmod); + + /* We may need to extend the local RecordCacheArray. */ + ensure_record_cache_typmod_slot_exists(typmod); + + /* Produce and cache a TupleDesc.
*/ + RecordCacheArray[typmod] = + deserialize_tupledesc(serialized); + dht_release(CurrentSharedRecordTypmodRegistry.typmod_index, + entry); + + return RecordCacheArray[typmod]; + } + } } - return RecordCacheArray[typmod]; + + if (!noError) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("record type has not been registered"))); + return NULL; } } @@ -1362,30 +1514,27 @@ assign_record_type_typmod(TupleDesc tupDesc) } } + /* Find or allocate a typmod in the SharedRecordTypmodRegistry, if attached */ + newtypmod = find_or_allocate_shared_record_typmod(tupDesc); + /* Not present, so need to manufacture an entry */ oldcxt = MemoryContextSwitchTo(CacheMemoryContext); - if (RecordCacheArray == NULL) - { - RecordCacheArray = (TupleDesc *) palloc(64 * sizeof(TupleDesc)); - RecordCacheArrayLen = 64; - } - else if (NextRecordTypmod >= RecordCacheArrayLen) - { - int32 newlen = RecordCacheArrayLen * 2; - - RecordCacheArray = (TupleDesc *) repalloc(RecordCacheArray, - newlen * sizeof(TupleDesc)); - RecordCacheArrayLen = newlen; - } + /* + * Whether we just got a new typmod from a SharedRecordTypmodRegistry or + * we're allocating one locally, make sure the RecordCacheArray is big + * enough. + */ + ensure_record_cache_typmod_slot_exists(Max(NextRecordTypmod, newtypmod)); /* if fail in subrs, no damage except possibly some wasted memory... */ entDesc = CreateTupleDescCopy(tupDesc); recentry->tupdescs = lcons(entDesc, recentry->tupdescs); /* mark it as a reference-counted tupdesc */ entDesc->tdrefcount = 1; - /* now it's safe to advance NextRecordTypmod */ - newtypmod = NextRecordTypmod++; + /* now it's safe to advance NextRecordTypmod, if allocating locally */ + if (newtypmod == -1) + newtypmod = NextRecordTypmod++; entDesc->tdtypmod = newtypmod; RecordCacheArray[newtypmod] = entDesc; @@ -1396,6 +1545,176 @@ assign_record_type_typmod(TupleDesc tupDesc) } /* + * Return the amount of shmem required to hold a SharedRecordTypmodRegistry.
+ * This exists only to avoid exposing private innards of + * SharedRecordTypmodRegistry in a header. + */ +size_t +SharedRecordTypmodRegistryEstimate(void) +{ + return sizeof(SharedRecordTypmodRegistry); +} + +/* + * Initialize 'registry' in a pre-existing shared memory region, which must be + * maximally aligned and have space for SharedRecordTypmodRegistryEstimate() + * bytes. + * + * 'area' will be used to allocate shared memory space as required for the + * typmod registration. The current process, expected to be a leader process + * in a parallel query, will be attached automatically and its current record + * types will be loaded into the registry. While attached, all calls to + * assign_record_type_typmod will use the shared registry. Other backends + * will need to attach explicitly. + * + * An on-detach callback will be installed for 'segment', so that normal + * private record type cache behavior can be restored when the DSM segment + * goes away. + */ +void +SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *registry, + dsa_area *area, + dsm_segment *segment) +{ + dht_hash_table *atts_index; + dht_hash_table *typmod_index; + int32 typmod; + + /* We can't already be attached to a shared registry. */ + Assert(CurrentSharedRecordTypmodRegistry.shared == NULL); + Assert(CurrentSharedRecordTypmodRegistry.atts_index == NULL); + Assert(CurrentSharedRecordTypmodRegistry.typmod_index == NULL); + Assert(CurrentSharedRecordTypmodRegistry.area == NULL); + + /* Create the hash table indexed by attribute OIDs. */ + atts_index = dht_create(area, &srtr_atts_index_params); + registry->atts_index_handle = dht_get_hash_table_handle(atts_index); + + /* Create the hash table indexed by typmod. */ + typmod_index = dht_create(area, &srtr_typmod_index_params); + registry->typmod_index_handle = dht_get_hash_table_handle(typmod_index); + + /* Initialize the 'next typmod' to this backend's next value.
*/ + pg_atomic_init_u32(&registry->next_typmod, NextRecordTypmod); + + /* + * Copy all entries from this backend's private registry into the shared + * registry. NextRecordTypmod may exceed RecordCacheArrayLen after a detach. + */ + for (typmod = 0; typmod < Min(NextRecordTypmod, RecordCacheArrayLen); ++typmod) + { + SRTRTypmodIndexEntry *typmod_index_entry; + SRTRAttsIndexEntry *atts_index_entry; + SerializedTupleDesc *serialized; + dsa_pointer serialized_dp; + TupleDesc tupdesc; + Oid atts_key[REC_HASH_KEYS]; + bool found; + int i; + + tupdesc = RecordCacheArray[typmod]; + if (tupdesc == NULL) + continue; + + /* Serialize the TupleDesc into shared memory. */ + serialized_dp = serialize_tupledesc(area, tupdesc); + + /* Insert into the typmod index. */ + typmod_index_entry = dht_find_or_insert(typmod_index, + &tupdesc->tdtypmod, + &found); + if (found) + elog(ERROR, "cannot create duplicate shared record typmod"); + typmod_index_entry->typmod = tupdesc->tdtypmod; + typmod_index_entry->serialized_tupdesc = serialized_dp; + dht_release(typmod_index, typmod_index_entry); + + /* Insert into the attributes index. */ + memset(atts_key, 0, sizeof(atts_key)); + for (i = 0; i < Min(tupdesc->natts, REC_HASH_KEYS); ++i) + atts_key[i] = tupdesc->attrs[i]->atttypid; + atts_index_entry = dht_find_or_insert(atts_index, &atts_key, &found); + if (!found) + { + memcpy(atts_index_entry->leading_attr_oids, + atts_key, + sizeof(atts_key)); + atts_index_entry->serialized_tupdesc = InvalidDsaPointer; + } + + /* Push onto list. */ + serialized = (SerializedTupleDesc *) + dsa_get_address(area, serialized_dp); + serialized->next = atts_index_entry->serialized_tupdesc; + atts_index_entry->serialized_tupdesc = serialized_dp; + dht_release(atts_index, atts_index_entry); + } + + /* Set up our detach hook so that we can return to private cache mode. */ + on_dsm_detach(segment, shared_record_typmod_registry_detach, + PointerGetDatum(registry)); + + /* + * Set up the global state that will tell assign_record_type_typmod and + * lookup_rowtype_tupdesc_internal about the shared registry.
+ */ + CurrentSharedRecordTypmodRegistry.shared = registry; + CurrentSharedRecordTypmodRegistry.atts_index = atts_index; + CurrentSharedRecordTypmodRegistry.typmod_index = typmod_index; + CurrentSharedRecordTypmodRegistry.area = area; +} + +/* + * Attach to 'registry', which must have been initialized already by another + * backend. Future calls to assign_record_type_typmod and + * lookup_rowtype_tupdesc_internal will use the shared registry, until + * 'segment' is detached. + */ +void +SharedRecordTypmodRegistryAttach(SharedRecordTypmodRegistry *registry, + dsa_area *area, + dsm_segment *segment) +{ + dht_hash_table *atts_index; + dht_hash_table *typmod_index; + + /* We can't already be attached to a shared registry. */ + Assert(CurrentSharedRecordTypmodRegistry.shared == NULL); + Assert(CurrentSharedRecordTypmodRegistry.atts_index == NULL); + Assert(CurrentSharedRecordTypmodRegistry.typmod_index == NULL); + Assert(CurrentSharedRecordTypmodRegistry.area == NULL); + + /* + * We can't already have typmods in our local cache, because they'd clash + * with those imported by SharedRecordTypmodRegistryInit. This should be a + * freshly started parallel worker. If we ever support worker recycling, + * a worker would need to zap its local cache in between servicing + * different queries, in order to be able to call this and synchronize + * typmods with a new leader. + */ + Assert(NextRecordTypmod == 0); + + /* Attach to the two hash tables. */ + atts_index = dht_attach(area, &srtr_atts_index_params, + registry->atts_index_handle); + typmod_index = dht_attach(area, &srtr_typmod_index_params, + registry->typmod_index_handle); + + /* Set up our detach hook so that we can return to private cache mode. */ + on_dsm_detach(segment, shared_record_typmod_registry_detach, + PointerGetDatum(registry)); + + /* + * Set up the global state that will tell assign_record_type_typmod and + * lookup_rowtype_tupdesc_internal about the shared registry.
+ */ + CurrentSharedRecordTypmodRegistry.shared = registry; + CurrentSharedRecordTypmodRegistry.atts_index = atts_index; + CurrentSharedRecordTypmodRegistry.typmod_index = typmod_index; + CurrentSharedRecordTypmodRegistry.area = area; +} + +/* * TypeCacheRelCallback * Relcache inval callback function * @@ -1809,3 +2128,225 @@ enum_oid_cmp(const void *left, const void *right) else return 0; } + +/* + * Serialize a TupleDesc into a SerializedTupleDesc in DSA area 'area', and + * return a dsa_pointer. The caller is responsible for setting 'next'. + */ +static dsa_pointer +serialize_tupledesc(dsa_area *area, const TupleDesc tupdesc) +{ + SerializedTupleDesc *serialized; + dsa_pointer serialized_dp; + size_t size; + int i; + + size = offsetof(SerializedTupleDesc, attributes) + + sizeof(FormData_pg_attribute) * tupdesc->natts; + serialized_dp = dsa_allocate(area, size); + serialized = (SerializedTupleDesc *) dsa_get_address(area, serialized_dp); + + serialized->natts = tupdesc->natts; + serialized->typmod = tupdesc->tdtypmod; + serialized->hasoid = tupdesc->tdhasoid; + for (i = 0; i < tupdesc->natts; ++i) + memcpy(&serialized->attributes[i], tupdesc->attrs[i], + ATTRIBUTE_FIXED_PART_SIZE); + + return serialized_dp; +} + +/* + * Deserialize a SerializedTupleDesc to produce a TupleDesc. The result is + * allocated in CacheMemoryContext and has a refcount of 1. + */ +static TupleDesc +deserialize_tupledesc(const SerializedTupleDesc *serialized) +{ + Form_pg_attribute *attributes; + MemoryContext oldctxt; + TupleDesc tupdesc; + int i; + + /* + * We have an array of FormData_pg_attribute but we need an array of + * pointers to FormData_pg_attribute.
+ */ + oldctxt = MemoryContextSwitchTo(CacheMemoryContext); + attributes = palloc(sizeof(Form_pg_attribute) * serialized->natts); + for (i = 0; i < serialized->natts; ++i) + { + attributes[i] = palloc(ATTRIBUTE_FIXED_PART_SIZE); + memcpy(attributes[i], &serialized->attributes[i], + ATTRIBUTE_FIXED_PART_SIZE); + } + tupdesc = + CreateTupleDesc(serialized->natts, serialized->hasoid, attributes); + tupdesc->tdtypmod = serialized->typmod; + tupdesc->tdrefcount = 1; + MemoryContextSwitchTo(oldctxt); + + return tupdesc; +} + +/* + * We can't use equalTupleDescs to compare a SerializedTupleDesc with a + * TupleDesc, but we don't want to allocate memory just to compare. This + * function compares natts, hasoid and each attribute without deserializing. + */ +static bool +serialized_tupledesc_matches(SerializedTupleDesc *serialized, + TupleDesc tupdesc) +{ + int i; + + if (serialized->natts != tupdesc->natts || + serialized->hasoid != tupdesc->tdhasoid || + tupdesc->constr != NULL) + return false; + + for (i = 0; i < serialized->natts; ++i) + { + if (!equalTupleDescAttrs(&serialized->attributes[i], + tupdesc->attrs[i])) + return false; + } + + return true; +} + +/* + * If we are attached to a SharedRecordTypmodRegistry, find or create a + * SerializedTupleDesc that matches 'tupdesc', and return its typmod. + * Otherwise return -1. + */ +static int32 +find_or_allocate_shared_record_typmod(TupleDesc tupdesc) +{ + SRTRAttsIndexEntry *atts_index_entry; + SRTRTypmodIndexEntry *typmod_index_entry; + SerializedTupleDesc *serialized; + dsa_pointer serialized_dp; + Oid hashkey[REC_HASH_KEYS]; + bool found; + int32 typmod; + int i; + + /* If not even attached, nothing to do. */ + if (CurrentSharedRecordTypmodRegistry.shared == NULL) + return -1; + + /* Try to find a match.
*/ + memset(hashkey, 0, sizeof(hashkey)); + for (i = 0; i < Min(tupdesc->natts, REC_HASH_KEYS); ++i) + hashkey[i] = tupdesc->attrs[i]->atttypid; + atts_index_entry = (SRTRAttsIndexEntry *) + dht_find_or_insert(CurrentSharedRecordTypmodRegistry.atts_index, + hashkey, + &found); + if (!found) + { + /* Making a new entry. */ + memcpy(atts_index_entry->leading_attr_oids, + hashkey, + sizeof(hashkey)); + atts_index_entry->serialized_tupdesc = InvalidDsaPointer; + } + + /* Scan the list we found for a matching serialized one. */ + serialized_dp = atts_index_entry->serialized_tupdesc; + while (DsaPointerIsValid(serialized_dp)) + { + serialized = + dsa_get_address(CurrentSharedRecordTypmodRegistry.area, + serialized_dp); + if (serialized_tupledesc_matches(serialized, tupdesc)) + { + /* Found a match, we are finished. */ + typmod = serialized->typmod; + dht_release(CurrentSharedRecordTypmodRegistry.atts_index, + atts_index_entry); + return typmod; + } + serialized_dp = serialized->next; + } + + /* We didn't find a matching entry, so let's allocate a new one. */ + typmod = (int) + pg_atomic_fetch_add_u32(&CurrentSharedRecordTypmodRegistry.shared->next_typmod, + 1); + + /* Allocate shared memory and serialize the TupleDesc. */ + serialized_dp = serialize_tupledesc(CurrentSharedRecordTypmodRegistry.area, + tupdesc); + serialized = (SerializedTupleDesc *) + dsa_get_address(CurrentSharedRecordTypmodRegistry.area, serialized_dp); + serialized->typmod = typmod; + + /* + * While we still hold the atts_index entry locked, add this to + * typmod_index. That's important because we don't want anyone to be able + * to find a typmod via the former that can't yet be looked up in the + * latter.
+ */ + typmod_index_entry = + dht_find_or_insert(CurrentSharedRecordTypmodRegistry.typmod_index, + &typmod, &found); + if (found) + elog(ERROR, "cannot create duplicate shared record typmod"); + typmod_index_entry->typmod = typmod; + typmod_index_entry->serialized_tupdesc = serialized_dp; + dht_release(CurrentSharedRecordTypmodRegistry.typmod_index, + typmod_index_entry); + + /* Push onto the front of list in atts_index_entry. */ + serialized->next = atts_index_entry->serialized_tupdesc; + atts_index_entry->serialized_tupdesc = serialized_dp; + + dht_release(CurrentSharedRecordTypmodRegistry.atts_index, + atts_index_entry); + + return typmod; +} + +/* + * DSM segment detach hook used to disconnect this backend's record typmod + * cache from the shared registry. Detaching from the + * SharedRecordTypmodRegistry returns this backend to local typmod cache mode + * until such time as another parallel query runs. + */ +static void +shared_record_typmod_registry_detach(dsm_segment *segment, Datum datum) +{ + SharedRecordTypmodRegistry *shared; + + shared = (SharedRecordTypmodRegistry *) DatumGetPointer(datum); + + /* + * XXX Should we now copy all entries from shared memory into the + * backend's local cache? That depends on whether you think that there is + * any chance this backend could see any shared tuples created by other + * backends after this detach operation. If tuples somehow survived from + * query to query, that would be true. But presently I don't think they + * do, if we assume that all mechanisms that allow us to receive tuples + * from other backends are linked to DSM segment mapping lifetime (tuple + * queues, shared hash tables, shared temporary files). + * + * The only thing we need to synchronize to return to local-typmod-cache + * mode is NextRecordTypmod. That means that we can resume generating new + * backend-local entries that don't clash.
+ */ + NextRecordTypmod = pg_atomic_read_u32(&shared->next_typmod); + + /* + * We don't free the SharedRecordTypmodRegistry's DSA memory, though we + * could do so using a reference counting scheme if we wanted to. There doesn't + * seem to be any point because the whole DSA area will be going away + * automatically when the DSM segment containing it is destroyed, + * conceptually like a MemoryContext. + */ + CurrentSharedRecordTypmodRegistry.shared = NULL; + CurrentSharedRecordTypmodRegistry.atts_index = NULL; + CurrentSharedRecordTypmodRegistry.typmod_index = NULL; + CurrentSharedRecordTypmodRegistry.area = NULL; +} diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index 590e27a4845..b5781c28693 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -19,6 +19,7 @@ #include "postmaster/bgworker.h" #include "storage/shm_mq.h" #include "storage/shm_toc.h" +#include "utils/dsa.h" typedef void (*parallel_worker_main_type) (dsm_segment *seg, shm_toc *toc); @@ -40,6 +41,7 @@ typedef struct ParallelContext ErrorContextCallback *error_context_stack; shm_toc_estimator estimator; dsm_segment *seg; + dsa_area *context_dsa; void *private_memory; shm_toc *toc; ParallelWorkerInfo *worker; diff --git a/src/include/access/tupdesc.h b/src/include/access/tupdesc.h index b48f839028b..97a73b1483e 100644 --- a/src/include/access/tupdesc.h +++ b/src/include/access/tupdesc.h @@ -110,6 +110,9 @@ extern void DecrTupleDescRefCount(TupleDesc tupdesc); DecrTupleDescRefCount(tupdesc); \ } while (0) +extern bool equalTupleDescAttrs(Form_pg_attribute attr1, + Form_pg_attribute attr2); + extern bool equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2); extern void TupleDescInitEntry(TupleDesc desc, diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 0cd45bb6d8e..c0117f7bafd 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -212,7 +212,10 @@ typedef enum BuiltinTrancheIds
LWTRANCHE_LOCK_MANAGER, LWTRANCHE_PREDICATE_LOCK_MANAGER, LWTRANCHE_PARALLEL_QUERY_DSA, + LWTRANCHE_PARALLEL_CONTEXT_DSA, LWTRANCHE_TBM, + LWTRANCHE_SHARED_RECORD_ATTS_INDEX, + LWTRANCHE_SHARED_RECORD_TYPMOD_INDEX, LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds; diff --git a/src/include/utils/typcache.h b/src/include/utils/typcache.h index 1bf94e2548d..861610a2835 100644 --- a/src/include/utils/typcache.h +++ b/src/include/utils/typcache.h @@ -18,6 +18,8 @@ #include "access/tupdesc.h" #include "fmgr.h" +#include "storage/dsm.h" +#include "utils/dsa.h" /* DomainConstraintCache is an opaque struct known only within typcache.c */ @@ -139,6 +141,7 @@ typedef struct DomainConstraintRef MemoryContextCallback callback; /* used to release refcount when done */ } DomainConstraintRef; +typedef struct SharedRecordTypmodRegistry SharedRecordTypmodRegistry; extern TypeCacheEntry *lookup_type_cache(Oid type_id, int flags); @@ -160,4 +163,12 @@ extern void assign_record_type_typmod(TupleDesc tupDesc); extern int compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2); +extern size_t SharedRecordTypmodRegistryEstimate(void); +extern void SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *, + dsa_area *area, + dsm_segment *seg); +extern void SharedRecordTypmodRegistryAttach(SharedRecordTypmodRegistry *, + dsa_area *area, + dsm_segment *seg); + #endif /* TYPCACHE_H */