POC: Sharing record typmods between backends
Hi hackers,
Tuples can have type RECORDOID and a typmod number that identifies a
"blessed" TupleDesc in a backend-private cache. To support the
sharing of such tuples through shared memory and temporary files, I
think we need a typmod registry in shared memory. Here's a
proof-of-concept patch for discussion. I'd be grateful for any
feedback and/or flames.
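For anyone who hasn't bumped into this area before, here's a minimal
sketch of where a transient record's identity comes from today. This
is ordinary backend code against the existing tupdesc/funcapi APIs,
not part of the patch, and the function name is mine:

#include "postgres.h"
#include "access/htup_details.h"
#include "catalog/pg_type.h"
#include "funcapi.h"

static Datum
make_transient_record(void)
{
    TupleDesc   tupdesc;
    Datum       values[1] = {Int32GetDatum(42)};
    bool        isnull[1] = {false};
    HeapTuple   tuple;

    tupdesc = CreateTemplateTupleDesc(1, false);
    TupleDescInitEntry(tupdesc, (AttrNumber) 1, "x", INT4OID, -1, 0);

    /*
     * BlessTupleDesc() hands out the next free typmod from this
     * backend's private RecordCacheArray, so two backends can assign
     * different typmods to the same shape.
     */
    tupdesc = BlessTupleDesc(tupdesc);

    tuple = heap_form_tuple(tupdesc, values, isnull);

    /* heap_form_tuple stamped the header from the blessed tupdesc. */
    Assert(HeapTupleHeaderGetTypeId(tuple->t_data) == RECORDOID);
    elog(DEBUG1, "this backend's typmod for (x int4) is %d",
         HeapTupleHeaderGetTypMod(tuple->t_data));

    return HeapTupleGetDatum(tuple);
}

Ship such a datum to another backend and its typmod is meaningless
there, which is the problem this patch is trying to solve.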
This is a problem I ran into in my parallel hash join project. Robert
pointed it out to me and told me to go read tqueue.c for details, and
my first reaction was: I'll code around this by teaching the planner
to avoid sharing tuples from paths that produce transient record types
based on tlist analysis[1]. Aside from being a cop-out, that approach
doesn't work because the planner doesn't actually know what types the
executor might come up with since some amount of substitution for
structurally-similar records seems to be allowed[2] (though I'm not
sure I can explain that). So... we're gonna need a bigger boat.
The patch still uses typcache.c's backend-private cache, but if the
backend is currently "attached" to a shared registry then it functions
as a write-through cache. There is no cache-invalidation problem
because registered typmods are never unregistered. parallel.c exports
the leader's existing record typmods into a shared registry, and
attaches to it in workers. A DSM detach hook returns backends to
private cache mode when parallelism ends.
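To spell out the read path: on a local cache miss while attached, we
probe the shared typmod index and then fill the private cache so the
next lookup takes the fast path. Here's a condensed sketch of what
lookup_rowtype_tupdesc_internal does with the patch applied (noError
handling and the named-composite branch elided; the function name is
mine):

static TupleDesc
lookup_shared_record_tupdesc_sketch(int32 typmod)
{
    SRTRTypmodIndexEntry *entry;
    SerializedTupleDesc *serialized;

    /* Fast path: backend-private cache. */
    if (typmod < RecordCacheArrayLen && RecordCacheArray[typmod] != NULL)
        return RecordCacheArray[typmod];

    /* Not attached to a shared registry?  Then the typmod is unknown. */
    if (CurrentSharedRecordTypmodRegistry.shared == NULL)
        return NULL;

    /* Slow path: look it up in the shared typmod index. */
    entry = dht_find(CurrentSharedRecordTypmodRegistry.typmod_index,
                     &typmod, false);
    if (entry == NULL)
        return NULL;
    serialized = (SerializedTupleDesc *)
        dsa_get_address(CurrentSharedRecordTypmodRegistry.area,
                        entry->serialized_tupdesc);

    /* Write through: cache a backend-local TupleDesc for next time. */
    ensure_record_cache_typmod_slot_exists(typmod);
    RecordCacheArray[typmod] = deserialize_tupledesc(serialized);
    dht_release(CurrentSharedRecordTypmodRegistry.typmod_index, entry);

    return RecordCacheArray[typmod];
}

Since nothing is ever unregistered, the locally cached copy can never
become stale.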
Some thoughts:
* Maybe it would be better to have just one DSA area, rather than the
one controlled by execParallel.c (for executor nodes to use) and this
new one controlled by parallel.c (for the ParallelContext). Those
scopes are approximately the same at least in the parallel query case,
but...
* It would be nice for the SharedRecordTypmodRegistry to be able to
survive longer than a single parallel query, perhaps in a per-session
DSM segment. Perhaps eventually we will want to consider a
query-scoped area, a transaction-scoped area and a session-scoped
area? I didn't investigate that for this POC.
* It seemed to be a reasonable goal to avoid allocating an extra DSM
segment for every parallel query, so the new DSA area is created
in-place. 192KB turns out to be enough to hold an empty
SharedRecordTypmodRegistry due to dsa.c's superblock allocation scheme
(that's two 64KB size class superblocks + some DSA control
information). It'll create a new DSM segment as soon as you start
using blessed records, and will do so for every parallel query you
start from then on with the same backend. Erm, maybe adding 192KB to
every parallel query DSM segment won't be popular... (A trimmed
excerpt showing the in-place setup appears after this list.)
* Perhaps simplehash + an LWLock would be better than dht, but I
haven't looked into that. Can it be convinced to work in DSA memory
and to grow on demand?
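As promised above, here's a trimmed excerpt of the parallel.c changes
showing the in-place DSA setup (full context in the attached patch):

/* In InitializeParallelDSM(): build the DSA area directly inside a
 * fixed-size chunk of the parallel DSM's table of contents, so no
 * extra DSM segment is needed until the area outgrows 192KB. */
context_dsa_space = shm_toc_allocate(pcxt->toc, PARALLEL_CONTEXT_DSA_SIZE);
pcxt->context_dsa = dsa_create_in_place(context_dsa_space,
                                        PARALLEL_CONTEXT_DSA_SIZE,
                                        LWTRANCHE_PARALLEL_CONTEXT_DSA,
                                        pcxt->seg);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_CONTEXT_DSA, context_dsa_space);

/* In ParallelWorkerMain(): attach to the same chunk from the worker. */
context_dsa_space = shm_toc_lookup(toc, PARALLEL_KEY_CONTEXT_DSA);
context_dsa = dsa_attach_in_place(context_dsa_space, seg);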
Here's one way to hit the new code path, so that record types blessed
in a worker are accessed from the leader:
CREATE TABLE foo AS SELECT generate_series(1, 10) AS x;
CREATE OR REPLACE FUNCTION make_record(n int)
RETURNS RECORD LANGUAGE plpgsql PARALLEL SAFE AS
$$
BEGIN
RETURN CASE n
WHEN 1 THEN ROW(1)
WHEN 2 THEN ROW(1, 2)
WHEN 3 THEN ROW(1, 2, 3)
WHEN 4 THEN ROW(1, 2, 3, 4)
ELSE ROW(1, 2, 3, 4, 5)
END;
END;
$$;
SET force_parallel_mode = 1;
SELECT make_record(x) FROM foo;
PATCHES
1. Apply dht-v3.patch[3].
2. Apply shared-record-typmod-registry-v1.patch.
3. Apply rip-out-tqueue-remapping-v1.patch.
[1]: /messages/by-id/CAEepm=2+zf7L_-eZ5hPW5=US+utdo=9tMVD4wt7ZSM-uOoSxWg@mail.gmail.com
[2]: /messages/by-id/CA+TgmoZMH6mJyXX=YLSOvJ8jULFqGgXWZCr_rbkc1nJ+177VSQ@mail.gmail.com
[3]: /messages/by-id/CAEepm=3d8o8XdVwYT6O=bHKsKAM2pu2D6sV1S_=4d+jStVCE7w@mail.gmail.com
--
Thomas Munro
http://www.enterprisedb.com
Attachments:
rip-out-tqueue-remapping-v1.patch
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 8d7e711b3bc..d7bfc3deb21 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -3,25 +3,10 @@
* tqueue.c
* Use shm_mq to send & receive tuples between parallel backends
*
- * Most of the complexity in this module arises from transient RECORD types,
- * which all have type RECORDOID and are distinguished by typmod numbers
- * that are managed per-backend (see src/backend/utils/cache/typcache.c).
- * The sender's set of RECORD typmod assignments probably doesn't match the
- * receiver's. To deal with this, we make the sender send a description
- * of each transient RECORD type appearing in the data it sends. The
- * receiver finds or creates a matching type in its own typcache, and then
- * maps the sender's typmod for that type to its own typmod.
- *
* A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
- * under the hood, writes tuples from the executor to a shm_mq. If
- * necessary, it also writes control messages describing transient
- * record types used within the tuple.
+ * under the hood, writes tuples from the executor to a shm_mq.
*
- * A TupleQueueReader reads tuples, and control messages if any are sent,
- * from a shm_mq and returns the tuples. If transient record types are
- * in use, it registers those types locally based on the control messages
- * and rewrites the typmods sent by the remote side to the corresponding
- * local record typmods.
+ * A TupleQueueReader reads tuples from a shm_mq and returns the tuples.
*
* Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
@@ -35,186 +20,33 @@
#include "postgres.h"
#include "access/htup_details.h"
-#include "catalog/pg_type.h"
#include "executor/tqueue.h"
-#include "funcapi.h"
-#include "lib/stringinfo.h"
-#include "miscadmin.h"
-#include "utils/array.h"
-#include "utils/lsyscache.h"
-#include "utils/memutils.h"
-#include "utils/rangetypes.h"
-#include "utils/syscache.h"
-#include "utils/typcache.h"
-
-
-/*
- * The data transferred through the shm_mq is divided into messages.
- * One-byte messages are mode-switch messages, telling the receiver to switch
- * between "control" and "data" modes. (We always start up in "data" mode.)
- * Otherwise, when in "data" mode, each message is a tuple. When in "control"
- * mode, each message defines one transient-typmod-to-tupledesc mapping to
- * let us interpret future tuples. Both of those cases certainly require
- * more than one byte, so no confusion is possible.
- */
-#define TUPLE_QUEUE_MODE_CONTROL 'c' /* mode-switch message contents */
-#define TUPLE_QUEUE_MODE_DATA 'd'
-
-/*
- * Both the sender and receiver build trees of TupleRemapInfo nodes to help
- * them identify which (sub) fields of transmitted tuples are composite and
- * may thus need remap processing. We might need to look within arrays and
- * ranges, not only composites, to find composite sub-fields. A NULL
- * TupleRemapInfo pointer indicates that it is known that the described field
- * is not composite and has no composite substructure.
- *
- * Note that we currently have to look at each composite field at runtime,
- * even if we believe it's of a named composite type (i.e., not RECORD).
- * This is because we allow the actual value to be a compatible transient
- * RECORD type. That's grossly inefficient, and it would be good to get
- * rid of the requirement, but it's not clear what would need to change.
- *
- * Also, we allow the top-level tuple structure, as well as the actual
- * structure of composite subfields, to change from one tuple to the next
- * at runtime. This may well be entirely historical, but it's mostly free
- * to support given the previous requirement; and other places in the system
- * also permit this, so it's not entirely clear if we could drop it.
- */
-
-typedef enum
-{
- TQUEUE_REMAP_ARRAY, /* array */
- TQUEUE_REMAP_RANGE, /* range */
- TQUEUE_REMAP_RECORD /* composite type, named or transient */
-} TupleRemapClass;
-
-typedef struct TupleRemapInfo TupleRemapInfo;
-
-typedef struct ArrayRemapInfo
-{
- int16 typlen; /* array element type's storage properties */
- bool typbyval;
- char typalign;
- TupleRemapInfo *element_remap; /* array element type's remap info */
-} ArrayRemapInfo;
-
-typedef struct RangeRemapInfo
-{
- TypeCacheEntry *typcache; /* range type's typcache entry */
- TupleRemapInfo *bound_remap; /* range bound type's remap info */
-} RangeRemapInfo;
-
-typedef struct RecordRemapInfo
-{
- /* Original (remote) type ID info last seen for this composite field */
- Oid rectypid;
- int32 rectypmod;
- /* Local RECORD typmod, or -1 if unset; not used on sender side */
- int32 localtypmod;
- /* If no fields of the record require remapping, these are NULL: */
- TupleDesc tupledesc; /* copy of record's tupdesc */
- TupleRemapInfo **field_remap; /* each field's remap info */
-} RecordRemapInfo;
-
-struct TupleRemapInfo
-{
- TupleRemapClass remapclass;
- union
- {
- ArrayRemapInfo arr;
- RangeRemapInfo rng;
- RecordRemapInfo rec;
- } u;
-};
/*
* DestReceiver object's private contents
*
* queue and tupledesc are pointers to data supplied by DestReceiver's caller.
- * The recordhtab and remap info are owned by the DestReceiver and are kept
- * in mycontext. tmpcontext is a tuple-lifespan context to hold cruft
- * created while traversing each tuple to find record subfields.
*/
typedef struct TQueueDestReceiver
{
DestReceiver pub; /* public fields */
shm_mq_handle *queue; /* shm_mq to send to */
- MemoryContext mycontext; /* context containing TQueueDestReceiver */
- MemoryContext tmpcontext; /* per-tuple context, if needed */
- HTAB *recordhtab; /* table of transmitted typmods, if needed */
- char mode; /* current message mode */
TupleDesc tupledesc; /* current top-level tuple descriptor */
- TupleRemapInfo **field_remapinfo; /* current top-level remap info */
} TQueueDestReceiver;
/*
- * Hash table entries for mapping remote to local typmods.
- */
-typedef struct RecordTypmodMap
-{
- int32 remotetypmod; /* hash key (must be first!) */
- int32 localtypmod;
-} RecordTypmodMap;
-
-/*
* TupleQueueReader object's private contents
*
* queue and tupledesc are pointers to data supplied by reader's caller.
- * The typmodmap and remap info are owned by the TupleQueueReader and
- * are kept in mycontext.
*
* "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
*/
struct TupleQueueReader
{
shm_mq_handle *queue; /* shm_mq to receive from */
- MemoryContext mycontext; /* context containing TupleQueueReader */
- HTAB *typmodmap; /* RecordTypmodMap hashtable, if needed */
- char mode; /* current message mode */
TupleDesc tupledesc; /* current top-level tuple descriptor */
- TupleRemapInfo **field_remapinfo; /* current top-level remap info */
};
-/* Local function prototypes */
-static void TQExamine(TQueueDestReceiver *tqueue,
- TupleRemapInfo *remapinfo,
- Datum value);
-static void TQExamineArray(TQueueDestReceiver *tqueue,
- ArrayRemapInfo *remapinfo,
- Datum value);
-static void TQExamineRange(TQueueDestReceiver *tqueue,
- RangeRemapInfo *remapinfo,
- Datum value);
-static void TQExamineRecord(TQueueDestReceiver *tqueue,
- RecordRemapInfo *remapinfo,
- Datum value);
-static void TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod,
- TupleDesc tupledesc);
-static void TupleQueueHandleControlMessage(TupleQueueReader *reader,
- Size nbytes, char *data);
-static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader,
- Size nbytes, HeapTupleHeader data);
-static HeapTuple TQRemapTuple(TupleQueueReader *reader,
- TupleDesc tupledesc,
- TupleRemapInfo **field_remapinfo,
- HeapTuple tuple);
-static Datum TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo,
- Datum value, bool *changed);
-static Datum TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo,
- Datum value, bool *changed);
-static Datum TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo,
- Datum value, bool *changed);
-static Datum TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo,
- Datum value, bool *changed);
-static TupleRemapInfo *BuildTupleRemapInfo(Oid typid, MemoryContext mycontext);
-static TupleRemapInfo *BuildArrayRemapInfo(Oid elemtypid,
- MemoryContext mycontext);
-static TupleRemapInfo *BuildRangeRemapInfo(Oid rngtypid,
- MemoryContext mycontext);
-static TupleRemapInfo **BuildFieldRemapInfo(TupleDesc tupledesc,
- MemoryContext mycontext);
-
-
/*
* Receive a tuple from a query, and send it to the designated shm_mq.
*
@@ -228,81 +60,8 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
HeapTuple tuple;
shm_mq_result result;
- /*
- * If first time through, compute remapping info for the top-level fields.
- * On later calls, if the tupledesc has changed, set up for the new
- * tupledesc. (This is a strange test both because the executor really
- * shouldn't change the tupledesc, and also because it would be unsafe if
- * the old tupledesc could be freed and a new one allocated at the same
- * address. But since some very old code in printtup.c uses a similar
- * approach, we adopt it here as well.)
- *
- * Here and elsewhere in this module, when replacing remapping info we
- * pfree the top-level object because that's easy, but we don't bother to
- * recursively free any substructure. This would lead to query-lifespan
- * memory leaks if the mapping info actually changed frequently, but since
- * we don't expect that to happen, it doesn't seem worth expending code to
- * prevent it.
- */
if (tqueue->tupledesc != tupledesc)
- {
- /* Is it worth trying to free substructure of the remap tree? */
- if (tqueue->field_remapinfo != NULL)
- pfree(tqueue->field_remapinfo);
- tqueue->field_remapinfo = BuildFieldRemapInfo(tupledesc,
- tqueue->mycontext);
tqueue->tupledesc = tupledesc;
- }
-
- /*
- * When, because of the types being transmitted, no record typmod mapping
- * can be needed, we can skip a good deal of work.
- */
- if (tqueue->field_remapinfo != NULL)
- {
- TupleRemapInfo **remapinfo = tqueue->field_remapinfo;
- int i;
- MemoryContext oldcontext = NULL;
-
- /* Deform the tuple so we can examine fields, if not done already. */
- slot_getallattrs(slot);
-
- /* Iterate over each attribute and search it for transient typmods. */
- for (i = 0; i < tupledesc->natts; i++)
- {
- /* Ignore nulls and types that don't need special handling. */
- if (slot->tts_isnull[i] || remapinfo[i] == NULL)
- continue;
-
- /* Switch to temporary memory context to avoid leaking. */
- if (oldcontext == NULL)
- {
- if (tqueue->tmpcontext == NULL)
- tqueue->tmpcontext =
- AllocSetContextCreate(tqueue->mycontext,
- "tqueue sender temp context",
- ALLOCSET_DEFAULT_SIZES);
- oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext);
- }
-
- /* Examine the value. */
- TQExamine(tqueue, remapinfo[i], slot->tts_values[i]);
- }
-
- /* If we used the temp context, reset it and restore prior context. */
- if (oldcontext != NULL)
- {
- MemoryContextSwitchTo(oldcontext);
- MemoryContextReset(tqueue->tmpcontext);
- }
-
- /* If we entered control mode, switch back to data mode. */
- if (tqueue->mode != TUPLE_QUEUE_MODE_DATA)
- {
- tqueue->mode = TUPLE_QUEUE_MODE_DATA;
- shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
- }
- }
/* Send the tuple itself. */
tuple = ExecMaterializeSlot(slot);
@@ -320,248 +79,6 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
}
/*
- * Examine the given datum and send any necessary control messages for
- * transient record types contained in it.
- *
- * remapinfo is previously-computed remapping info about the datum's type.
- *
- * This function just dispatches based on the remap class.
- */
-static void
-TQExamine(TQueueDestReceiver *tqueue, TupleRemapInfo *remapinfo, Datum value)
-{
- /* This is recursive, so it could be driven to stack overflow. */
- check_stack_depth();
-
- switch (remapinfo->remapclass)
- {
- case TQUEUE_REMAP_ARRAY:
- TQExamineArray(tqueue, &remapinfo->u.arr, value);
- break;
- case TQUEUE_REMAP_RANGE:
- TQExamineRange(tqueue, &remapinfo->u.rng, value);
- break;
- case TQUEUE_REMAP_RECORD:
- TQExamineRecord(tqueue, &remapinfo->u.rec, value);
- break;
- }
-}
-
-/*
- * Examine a record datum and send any necessary control messages for
- * transient record types contained in it.
- */
-static void
-TQExamineRecord(TQueueDestReceiver *tqueue, RecordRemapInfo *remapinfo,
- Datum value)
-{
- HeapTupleHeader tup;
- Oid typid;
- int32 typmod;
- TupleDesc tupledesc;
-
- /* Extract type OID and typmod from tuple. */
- tup = DatumGetHeapTupleHeader(value);
- typid = HeapTupleHeaderGetTypeId(tup);
- typmod = HeapTupleHeaderGetTypMod(tup);
-
- /*
- * If first time through, or if this isn't the same composite type as last
- * time, consider sending a control message, and then look up the
- * necessary information for examining the fields.
- */
- if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
- {
- /* Free any old data. */
- if (remapinfo->tupledesc != NULL)
- FreeTupleDesc(remapinfo->tupledesc);
- /* Is it worth trying to free substructure of the remap tree? */
- if (remapinfo->field_remap != NULL)
- pfree(remapinfo->field_remap);
-
- /* Look up tuple descriptor in typcache. */
- tupledesc = lookup_rowtype_tupdesc(typid, typmod);
-
- /*
- * If this is a transient record type, send the tupledesc in a control
- * message. (TQSendRecordInfo is smart enough to do this only once
- * per typmod.)
- */
- if (typid == RECORDOID)
- TQSendRecordInfo(tqueue, typmod, tupledesc);
-
- /* Figure out whether fields need recursive processing. */
- remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
- tqueue->mycontext);
- if (remapinfo->field_remap != NULL)
- {
- /*
- * We need to inspect the record contents, so save a copy of the
- * tupdesc. (We could possibly just reference the typcache's
- * copy, but then it's problematic when to release the refcount.)
- */
- MemoryContext oldcontext = MemoryContextSwitchTo(tqueue->mycontext);
-
- remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
- MemoryContextSwitchTo(oldcontext);
- }
- else
- {
- /* No fields of the record require remapping. */
- remapinfo->tupledesc = NULL;
- }
- remapinfo->rectypid = typid;
- remapinfo->rectypmod = typmod;
-
- /* Release reference count acquired by lookup_rowtype_tupdesc. */
- DecrTupleDescRefCount(tupledesc);
- }
-
- /*
- * If field remapping is required, deform the tuple and examine each
- * field.
- */
- if (remapinfo->field_remap != NULL)
- {
- Datum *values;
- bool *isnull;
- HeapTupleData tdata;
- int i;
-
- /* Deform the tuple so we can check each column within. */
- tupledesc = remapinfo->tupledesc;
- values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
- isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
- tdata.t_len = HeapTupleHeaderGetDatumLength(tup);
- ItemPointerSetInvalid(&(tdata.t_self));
- tdata.t_tableOid = InvalidOid;
- tdata.t_data = tup;
- heap_deform_tuple(&tdata, tupledesc, values, isnull);
-
- /* Recursively check each interesting non-NULL attribute. */
- for (i = 0; i < tupledesc->natts; i++)
- {
- if (!isnull[i] && remapinfo->field_remap[i])
- TQExamine(tqueue, remapinfo->field_remap[i], values[i]);
- }
-
- /* Need not clean up, since we're in a short-lived context. */
- }
-}
-
-/*
- * Examine an array datum and send any necessary control messages for
- * transient record types contained in it.
- */
-static void
-TQExamineArray(TQueueDestReceiver *tqueue, ArrayRemapInfo *remapinfo,
- Datum value)
-{
- ArrayType *arr = DatumGetArrayTypeP(value);
- Oid typid = ARR_ELEMTYPE(arr);
- Datum *elem_values;
- bool *elem_nulls;
- int num_elems;
- int i;
-
- /* Deconstruct the array. */
- deconstruct_array(arr, typid, remapinfo->typlen,
- remapinfo->typbyval, remapinfo->typalign,
- &elem_values, &elem_nulls, &num_elems);
-
- /* Examine each element. */
- for (i = 0; i < num_elems; i++)
- {
- if (!elem_nulls[i])
- TQExamine(tqueue, remapinfo->element_remap, elem_values[i]);
- }
-}
-
-/*
- * Examine a range datum and send any necessary control messages for
- * transient record types contained in it.
- */
-static void
-TQExamineRange(TQueueDestReceiver *tqueue, RangeRemapInfo *remapinfo,
- Datum value)
-{
- RangeType *range = DatumGetRangeType(value);
- RangeBound lower;
- RangeBound upper;
- bool empty;
-
- /* Extract the lower and upper bounds. */
- range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
-
- /* Nothing to do for an empty range. */
- if (empty)
- return;
-
- /* Examine each bound, if present. */
- if (!upper.infinite)
- TQExamine(tqueue, remapinfo->bound_remap, upper.val);
- if (!lower.infinite)
- TQExamine(tqueue, remapinfo->bound_remap, lower.val);
-}
-
-/*
- * Send tuple descriptor information for a transient typmod, unless we've
- * already done so previously.
- */
-static void
-TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod, TupleDesc tupledesc)
-{
- StringInfoData buf;
- bool found;
- int i;
-
- /* Initialize hash table if not done yet. */
- if (tqueue->recordhtab == NULL)
- {
- HASHCTL ctl;
-
- MemSet(&ctl, 0, sizeof(ctl));
- /* Hash table entries are just typmods */
- ctl.keysize = sizeof(int32);
- ctl.entrysize = sizeof(int32);
- ctl.hcxt = tqueue->mycontext;
- tqueue->recordhtab = hash_create("tqueue sender record type hashtable",
- 100, &ctl,
- HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
- }
-
- /* Have we already seen this record type? If not, must report it. */
- hash_search(tqueue->recordhtab, &typmod, HASH_ENTER, &found);
- if (found)
- return;
-
- elog(DEBUG3, "sending tqueue control message for record typmod %d", typmod);
-
- /* If message queue is in data mode, switch to control mode. */
- if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL)
- {
- tqueue->mode = TUPLE_QUEUE_MODE_CONTROL;
- shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
- }
-
- /* Assemble a control message. */
- initStringInfo(&buf);
- appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int32));
- appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int));
- appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid, sizeof(bool));
- for (i = 0; i < tupledesc->natts; i++)
- {
- appendBinaryStringInfo(&buf, (char *) tupledesc->attrs[i],
- sizeof(FormData_pg_attribute));
- }
-
- /* Send control message. */
- shm_mq_send(tqueue->queue, buf.len, buf.data, false);
-
- /* We assume it's OK to leak buf because we're in a short-lived context. */
-}
-
-/*
* Prepare to receive tuples from executor.
*/
static void
@@ -587,15 +104,6 @@ tqueueShutdownReceiver(DestReceiver *self)
static void
tqueueDestroyReceiver(DestReceiver *self)
{
- TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
-
- if (tqueue->tmpcontext != NULL)
- MemoryContextDelete(tqueue->tmpcontext);
- if (tqueue->recordhtab != NULL)
- hash_destroy(tqueue->recordhtab);
- /* Is it worth trying to free substructure of the remap tree? */
- if (tqueue->field_remapinfo != NULL)
- pfree(tqueue->field_remapinfo);
pfree(self);
}
@@ -615,13 +123,8 @@ CreateTupleQueueDestReceiver(shm_mq_handle *handle)
self->pub.rDestroy = tqueueDestroyReceiver;
self->pub.mydest = DestTupleQueue;
self->queue = handle;
- self->mycontext = CurrentMemoryContext;
- self->tmpcontext = NULL;
- self->recordhtab = NULL;
- self->mode = TUPLE_QUEUE_MODE_DATA;
/* Top-level tupledesc is not known yet */
self->tupledesc = NULL;
- self->field_remapinfo = NULL;
return (DestReceiver *) self;
}
@@ -635,11 +138,7 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
reader->queue = handle;
- reader->mycontext = CurrentMemoryContext;
- reader->typmodmap = NULL;
- reader->mode = TUPLE_QUEUE_MODE_DATA;
reader->tupledesc = tupledesc;
- reader->field_remapinfo = BuildFieldRemapInfo(tupledesc, reader->mycontext);
return reader;
}
@@ -651,11 +150,6 @@ void
DestroyTupleQueueReader(TupleQueueReader *reader)
{
shm_mq_detach(shm_mq_get_queue(reader->queue));
- if (reader->typmodmap != NULL)
- hash_destroy(reader->typmodmap);
- /* Is it worth trying to free substructure of the remap tree? */
- if (reader->field_remapinfo != NULL)
- pfree(reader->field_remapinfo);
pfree(reader);
}
@@ -667,9 +161,6 @@ DestroyTupleQueueReader(TupleQueueReader *reader)
* is set to true when there are no remaining tuples and otherwise to false.
*
* The returned tuple, if any, is allocated in CurrentMemoryContext.
- * That should be a short-lived (tuple-lifespan) context, because we are
- * pretty cavalier about leaking memory in that context if we have to do
- * tuple remapping.
*
* Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
* accumulate bytes from a partially-read message, so it's useful to call
@@ -678,64 +169,29 @@ DestroyTupleQueueReader(TupleQueueReader *reader)
HeapTuple
TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
{
+ HeapTupleData htup;
shm_mq_result result;
+ Size nbytes;
+ void *data;
if (done != NULL)
*done = false;
- for (;;)
- {
- Size nbytes;
- void *data;
-
- /* Attempt to read a message. */
- result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
-
- /* If queue is detached, set *done and return NULL. */
- if (result == SHM_MQ_DETACHED)
- {
- if (done != NULL)
- *done = true;
- return NULL;
- }
-
- /* In non-blocking mode, bail out if no message ready yet. */
- if (result == SHM_MQ_WOULD_BLOCK)
- return NULL;
- Assert(result == SHM_MQ_SUCCESS);
+ /* Attempt to read a message. */
+ result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
- /*
- * We got a message (see message spec at top of file). Process it.
- */
- if (nbytes == 1)
- {
- /* Mode switch message. */
- reader->mode = ((char *) data)[0];
- }
- else if (reader->mode == TUPLE_QUEUE_MODE_DATA)
- {
- /* Tuple data. */
- return TupleQueueHandleDataMessage(reader, nbytes, data);
- }
- else if (reader->mode == TUPLE_QUEUE_MODE_CONTROL)
- {
- /* Control message, describing a transient record type. */
- TupleQueueHandleControlMessage(reader, nbytes, data);
- }
- else
- elog(ERROR, "unrecognized tqueue mode: %d", (int) reader->mode);
+ /* If queue is detached, set *done and return NULL. */
+ if (result == SHM_MQ_DETACHED)
+ {
+ if (done != NULL)
+ *done = true;
+ return NULL;
}
-}
-/*
- * Handle a data message - that is, a tuple - from the remote side.
- */
-static HeapTuple
-TupleQueueHandleDataMessage(TupleQueueReader *reader,
- Size nbytes,
- HeapTupleHeader data)
-{
- HeapTupleData htup;
+ /* In non-blocking mode, bail out if no message ready yet. */
+ if (result == SHM_MQ_WOULD_BLOCK)
+ return NULL;
+ Assert(result == SHM_MQ_SUCCESS);
/*
* Set up a dummy HeapTupleData pointing to the data from the shm_mq
@@ -746,531 +202,5 @@ TupleQueueHandleDataMessage(TupleQueueReader *reader,
htup.t_len = nbytes;
htup.t_data = data;
- /*
- * Either just copy the data into a regular palloc'd tuple, or remap it,
- * as required.
- */
- return TQRemapTuple(reader,
- reader->tupledesc,
- reader->field_remapinfo,
- &htup);
-}
-
-/*
- * Copy the given tuple, remapping any transient typmods contained in it.
- */
-static HeapTuple
-TQRemapTuple(TupleQueueReader *reader,
- TupleDesc tupledesc,
- TupleRemapInfo **field_remapinfo,
- HeapTuple tuple)
-{
- Datum *values;
- bool *isnull;
- bool changed = false;
- int i;
-
- /*
- * If no remapping is necessary, just copy the tuple into a single
- * palloc'd chunk, as caller will expect.
- */
- if (field_remapinfo == NULL)
- return heap_copytuple(tuple);
-
- /* Deform tuple so we can remap record typmods for individual attrs. */
- values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
- isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
- heap_deform_tuple(tuple, tupledesc, values, isnull);
-
- /* Recursively process each interesting non-NULL attribute. */
- for (i = 0; i < tupledesc->natts; i++)
- {
- if (isnull[i] || field_remapinfo[i] == NULL)
- continue;
- values[i] = TQRemap(reader, field_remapinfo[i], values[i], &changed);
- }
-
- /* Reconstruct the modified tuple, if anything was modified. */
- if (changed)
- return heap_form_tuple(tupledesc, values, isnull);
- else
- return heap_copytuple(tuple);
-}
-
-/*
- * Process the given datum and replace any transient record typmods
- * contained in it. Set *changed to TRUE if we actually changed the datum.
- *
- * remapinfo is previously-computed remapping info about the datum's type.
- *
- * This function just dispatches based on the remap class.
- */
-static Datum
-TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo,
- Datum value, bool *changed)
-{
- /* This is recursive, so it could be driven to stack overflow. */
- check_stack_depth();
-
- switch (remapinfo->remapclass)
- {
- case TQUEUE_REMAP_ARRAY:
- return TQRemapArray(reader, &remapinfo->u.arr, value, changed);
-
- case TQUEUE_REMAP_RANGE:
- return TQRemapRange(reader, &remapinfo->u.rng, value, changed);
-
- case TQUEUE_REMAP_RECORD:
- return TQRemapRecord(reader, &remapinfo->u.rec, value, changed);
- }
-
- elog(ERROR, "unrecognized tqueue remap class: %d",
- (int) remapinfo->remapclass);
- return (Datum) 0;
-}
-
-/*
- * Process the given array datum and replace any transient record typmods
- * contained in it. Set *changed to TRUE if we actually changed the datum.
- */
-static Datum
-TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo,
- Datum value, bool *changed)
-{
- ArrayType *arr = DatumGetArrayTypeP(value);
- Oid typid = ARR_ELEMTYPE(arr);
- bool element_changed = false;
- Datum *elem_values;
- bool *elem_nulls;
- int num_elems;
- int i;
-
- /* Deconstruct the array. */
- deconstruct_array(arr, typid, remapinfo->typlen,
- remapinfo->typbyval, remapinfo->typalign,
- &elem_values, &elem_nulls, &num_elems);
-
- /* Remap each element. */
- for (i = 0; i < num_elems; i++)
- {
- if (!elem_nulls[i])
- elem_values[i] = TQRemap(reader,
- remapinfo->element_remap,
- elem_values[i],
- &element_changed);
- }
-
- if (element_changed)
- {
- /* Reconstruct and return the array. */
- *changed = true;
- arr = construct_md_array(elem_values, elem_nulls,
- ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
- typid, remapinfo->typlen,
- remapinfo->typbyval, remapinfo->typalign);
- return PointerGetDatum(arr);
- }
-
- /* Else just return the value as-is. */
- return value;
-}
-
-/*
- * Process the given range datum and replace any transient record typmods
- * contained in it. Set *changed to TRUE if we actually changed the datum.
- */
-static Datum
-TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo,
- Datum value, bool *changed)
-{
- RangeType *range = DatumGetRangeType(value);
- bool bound_changed = false;
- RangeBound lower;
- RangeBound upper;
- bool empty;
-
- /* Extract the lower and upper bounds. */
- range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
-
- /* Nothing to do for an empty range. */
- if (empty)
- return value;
-
- /* Remap each bound, if present. */
- if (!upper.infinite)
- upper.val = TQRemap(reader, remapinfo->bound_remap,
- upper.val, &bound_changed);
- if (!lower.infinite)
- lower.val = TQRemap(reader, remapinfo->bound_remap,
- lower.val, &bound_changed);
-
- if (bound_changed)
- {
- /* Reserialize. */
- *changed = true;
- range = range_serialize(remapinfo->typcache, &lower, &upper, empty);
- return RangeTypeGetDatum(range);
- }
-
- /* Else just return the value as-is. */
- return value;
-}
-
-/*
- * Process the given record datum and replace any transient record typmods
- * contained in it. Set *changed to TRUE if we actually changed the datum.
- */
-static Datum
-TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo,
- Datum value, bool *changed)
-{
- HeapTupleHeader tup;
- Oid typid;
- int32 typmod;
- bool changed_typmod;
- TupleDesc tupledesc;
-
- /* Extract type OID and typmod from tuple. */
- tup = DatumGetHeapTupleHeader(value);
- typid = HeapTupleHeaderGetTypeId(tup);
- typmod = HeapTupleHeaderGetTypMod(tup);
-
- /*
- * If first time through, or if this isn't the same composite type as last
- * time, identify the required typmod mapping, and then look up the
- * necessary information for processing the fields.
- */
- if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
- {
- /* Free any old data. */
- if (remapinfo->tupledesc != NULL)
- FreeTupleDesc(remapinfo->tupledesc);
- /* Is it worth trying to free substructure of the remap tree? */
- if (remapinfo->field_remap != NULL)
- pfree(remapinfo->field_remap);
-
- /* If transient record type, look up matching local typmod. */
- if (typid == RECORDOID)
- {
- RecordTypmodMap *mapent;
-
- Assert(reader->typmodmap != NULL);
- mapent = hash_search(reader->typmodmap, &typmod,
- HASH_FIND, NULL);
- if (mapent == NULL)
- elog(ERROR, "tqueue received unrecognized remote typmod %d",
- typmod);
- remapinfo->localtypmod = mapent->localtypmod;
- }
- else
- remapinfo->localtypmod = -1;
-
- /* Look up tuple descriptor in typcache. */
- tupledesc = lookup_rowtype_tupdesc(typid, remapinfo->localtypmod);
-
- /* Figure out whether fields need recursive processing. */
- remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
- reader->mycontext);
- if (remapinfo->field_remap != NULL)
- {
- /*
- * We need to inspect the record contents, so save a copy of the
- * tupdesc. (We could possibly just reference the typcache's
- * copy, but then it's problematic when to release the refcount.)
- */
- MemoryContext oldcontext = MemoryContextSwitchTo(reader->mycontext);
-
- remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
- MemoryContextSwitchTo(oldcontext);
- }
- else
- {
- /* No fields of the record require remapping. */
- remapinfo->tupledesc = NULL;
- }
- remapinfo->rectypid = typid;
- remapinfo->rectypmod = typmod;
-
- /* Release reference count acquired by lookup_rowtype_tupdesc. */
- DecrTupleDescRefCount(tupledesc);
- }
-
- /* If transient record, replace remote typmod with local typmod. */
- if (typid == RECORDOID && typmod != remapinfo->localtypmod)
- {
- typmod = remapinfo->localtypmod;
- changed_typmod = true;
- }
- else
- changed_typmod = false;
-
- /*
- * If we need to change the typmod, or if there are any potentially
- * remappable fields, replace the tuple.
- */
- if (changed_typmod || remapinfo->field_remap != NULL)
- {
- HeapTupleData htup;
- HeapTuple atup;
-
- /* For now, assume we always need to change the tuple in this case. */
- *changed = true;
-
- /* Copy tuple, possibly remapping contained fields. */
- ItemPointerSetInvalid(&htup.t_self);
- htup.t_tableOid = InvalidOid;
- htup.t_len = HeapTupleHeaderGetDatumLength(tup);
- htup.t_data = tup;
- atup = TQRemapTuple(reader,
- remapinfo->tupledesc,
- remapinfo->field_remap,
- &htup);
-
- /* Apply the correct labeling for a local Datum. */
- HeapTupleHeaderSetTypeId(atup->t_data, typid);
- HeapTupleHeaderSetTypMod(atup->t_data, typmod);
- HeapTupleHeaderSetDatumLength(atup->t_data, htup.t_len);
-
- /* And return the results. */
- return HeapTupleHeaderGetDatum(atup->t_data);
- }
-
- /* Else just return the value as-is. */
- return value;
-}
-
-/*
- * Handle a control message from the tuple queue reader.
- *
- * Control messages are sent when the remote side is sending tuples that
- * contain transient record types. We need to arrange to bless those
- * record types locally and translate between remote and local typmods.
- */
-static void
-TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes,
- char *data)
-{
- int32 remotetypmod;
- int natts;
- bool hasoid;
- Size offset = 0;
- Form_pg_attribute *attrs;
- TupleDesc tupledesc;
- RecordTypmodMap *mapent;
- bool found;
- int i;
-
- /* Extract remote typmod. */
- memcpy(&remotetypmod, &data[offset], sizeof(int32));
- offset += sizeof(int32);
-
- /* Extract attribute count. */
- memcpy(&natts, &data[offset], sizeof(int));
- offset += sizeof(int);
-
- /* Extract hasoid flag. */
- memcpy(&hasoid, &data[offset], sizeof(bool));
- offset += sizeof(bool);
-
- /* Extract attribute details. The tupledesc made here is just transient. */
- attrs = palloc(natts * sizeof(Form_pg_attribute));
- for (i = 0; i < natts; i++)
- {
- attrs[i] = palloc(sizeof(FormData_pg_attribute));
- memcpy(attrs[i], &data[offset], sizeof(FormData_pg_attribute));
- offset += sizeof(FormData_pg_attribute);
- }
-
- /* We should have read the whole message. */
- Assert(offset == nbytes);
-
- /* Construct TupleDesc, and assign a local typmod. */
- tupledesc = CreateTupleDesc(natts, hasoid, attrs);
- tupledesc = BlessTupleDesc(tupledesc);
-
- /* Create mapping hashtable if it doesn't exist already. */
- if (reader->typmodmap == NULL)
- {
- HASHCTL ctl;
-
- MemSet(&ctl, 0, sizeof(ctl));
- ctl.keysize = sizeof(int32);
- ctl.entrysize = sizeof(RecordTypmodMap);
- ctl.hcxt = reader->mycontext;
- reader->typmodmap = hash_create("tqueue receiver record type hashtable",
- 100, &ctl,
- HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
- }
-
- /* Create map entry. */
- mapent = hash_search(reader->typmodmap, &remotetypmod, HASH_ENTER,
- &found);
- if (found)
- elog(ERROR, "duplicate tqueue control message for typmod %d",
- remotetypmod);
- mapent->localtypmod = tupledesc->tdtypmod;
-
- elog(DEBUG3, "tqueue mapping remote typmod %d to local typmod %d",
- remotetypmod, mapent->localtypmod);
-}
-
-/*
- * Build remap info for the specified data type, storing it in mycontext.
- * Returns NULL if neither the type nor any subtype could require remapping.
- */
-static TupleRemapInfo *
-BuildTupleRemapInfo(Oid typid, MemoryContext mycontext)
-{
- HeapTuple tup;
- Form_pg_type typ;
-
- /* This is recursive, so it could be driven to stack overflow. */
- check_stack_depth();
-
-restart:
- tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
- if (!HeapTupleIsValid(tup))
- elog(ERROR, "cache lookup failed for type %u", typid);
- typ = (Form_pg_type) GETSTRUCT(tup);
-
- /* Look through domains to underlying base type. */
- if (typ->typtype == TYPTYPE_DOMAIN)
- {
- typid = typ->typbasetype;
- ReleaseSysCache(tup);
- goto restart;
- }
-
- /* If it's a true array type, deal with it that way. */
- if (OidIsValid(typ->typelem) && typ->typlen == -1)
- {
- typid = typ->typelem;
- ReleaseSysCache(tup);
- return BuildArrayRemapInfo(typid, mycontext);
- }
-
- /* Similarly, deal with ranges appropriately. */
- if (typ->typtype == TYPTYPE_RANGE)
- {
- ReleaseSysCache(tup);
- return BuildRangeRemapInfo(typid, mycontext);
- }
-
- /*
- * If it's a composite type (including RECORD), set up for remapping. We
- * don't attempt to determine the status of subfields here, since we do
- * not have enough information yet; just mark everything invalid.
- */
- if (typ->typtype == TYPTYPE_COMPOSITE || typid == RECORDOID)
- {
- TupleRemapInfo *remapinfo;
-
- remapinfo = (TupleRemapInfo *)
- MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
- remapinfo->remapclass = TQUEUE_REMAP_RECORD;
- remapinfo->u.rec.rectypid = InvalidOid;
- remapinfo->u.rec.rectypmod = -1;
- remapinfo->u.rec.localtypmod = -1;
- remapinfo->u.rec.tupledesc = NULL;
- remapinfo->u.rec.field_remap = NULL;
- ReleaseSysCache(tup);
- return remapinfo;
- }
-
- /* Nothing else can possibly need remapping attention. */
- ReleaseSysCache(tup);
- return NULL;
-}
-
-static TupleRemapInfo *
-BuildArrayRemapInfo(Oid elemtypid, MemoryContext mycontext)
-{
- TupleRemapInfo *remapinfo;
- TupleRemapInfo *element_remapinfo;
-
- /* See if element type requires remapping. */
- element_remapinfo = BuildTupleRemapInfo(elemtypid, mycontext);
- /* If not, the array doesn't either. */
- if (element_remapinfo == NULL)
- return NULL;
- /* OK, set up to remap the array. */
- remapinfo = (TupleRemapInfo *)
- MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
- remapinfo->remapclass = TQUEUE_REMAP_ARRAY;
- get_typlenbyvalalign(elemtypid,
- &remapinfo->u.arr.typlen,
- &remapinfo->u.arr.typbyval,
- &remapinfo->u.arr.typalign);
- remapinfo->u.arr.element_remap = element_remapinfo;
- return remapinfo;
-}
-
-static TupleRemapInfo *
-BuildRangeRemapInfo(Oid rngtypid, MemoryContext mycontext)
-{
- TupleRemapInfo *remapinfo;
- TupleRemapInfo *bound_remapinfo;
- TypeCacheEntry *typcache;
-
- /*
- * Get range info from the typcache. We assume this pointer will stay
- * valid for the duration of the query.
- */
- typcache = lookup_type_cache(rngtypid, TYPECACHE_RANGE_INFO);
- if (typcache->rngelemtype == NULL)
- elog(ERROR, "type %u is not a range type", rngtypid);
-
- /* See if range bound type requires remapping. */
- bound_remapinfo = BuildTupleRemapInfo(typcache->rngelemtype->type_id,
- mycontext);
- /* If not, the range doesn't either. */
- if (bound_remapinfo == NULL)
- return NULL;
- /* OK, set up to remap the range. */
- remapinfo = (TupleRemapInfo *)
- MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
- remapinfo->remapclass = TQUEUE_REMAP_RANGE;
- remapinfo->u.rng.typcache = typcache;
- remapinfo->u.rng.bound_remap = bound_remapinfo;
- return remapinfo;
-}
-
-/*
- * Build remap info for fields of the type described by the given tupdesc.
- * Returns an array of TupleRemapInfo pointers, or NULL if no field
- * requires remapping. Data is allocated in mycontext.
- */
-static TupleRemapInfo **
-BuildFieldRemapInfo(TupleDesc tupledesc, MemoryContext mycontext)
-{
- TupleRemapInfo **remapinfo;
- bool noop = true;
- int i;
-
- /* Recursively determine the remapping status of each field. */
- remapinfo = (TupleRemapInfo **)
- MemoryContextAlloc(mycontext,
- tupledesc->natts * sizeof(TupleRemapInfo *));
- for (i = 0; i < tupledesc->natts; i++)
- {
- Form_pg_attribute attr = tupledesc->attrs[i];
-
- if (attr->attisdropped)
- {
- remapinfo[i] = NULL;
- continue;
- }
- remapinfo[i] = BuildTupleRemapInfo(attr->atttypid, mycontext);
- if (remapinfo[i] != NULL)
- noop = false;
- }
-
- /* If no fields require remapping, report that by returning NULL. */
- if (noop)
- {
- pfree(remapinfo);
- remapinfo = NULL;
- }
-
- return remapinfo;
+ return heap_copytuple(&htup);
}
shared-record-typmod-registry-v1.patch
diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c
index 9fd7b4e019b..b72f5363045 100644
--- a/src/backend/access/common/tupdesc.c
+++ b/src/backend/access/common/tupdesc.c
@@ -343,11 +343,68 @@ DecrTupleDescRefCount(TupleDesc tupdesc)
}
/*
- * Compare two TupleDesc structures for logical equality
+ * Compare two TupleDesc attributes for logical equality
*
* Note: we deliberately do not check the attrelid and tdtypmod fields.
* This allows typcache.c to use this routine to see if a cached record type
* matches a requested type, and is harmless for relcache.c's uses.
+ */
+bool
+equalTupleDescAttrs(Form_pg_attribute attr1, Form_pg_attribute attr2)
+{
+ /*
+ * We do not need to check every single field here: we can disregard
+ * attrelid and attnum (which were used to place the row in the attrs
+ * array in the first place). It might look like we could dispense
+ * with checking attlen/attbyval/attalign, since these are derived
+ * from atttypid; but in the case of dropped columns we must check
+ * them (since atttypid will be zero for all dropped columns) and in
+ * general it seems safer to check them always.
+ *
+ * attcacheoff must NOT be checked since it's possibly not set in both
+ * copies.
+ */
+ if (strcmp(NameStr(attr1->attname), NameStr(attr2->attname)) != 0)
+ return false;
+ if (attr1->atttypid != attr2->atttypid)
+ return false;
+ if (attr1->attstattarget != attr2->attstattarget)
+ return false;
+ if (attr1->attlen != attr2->attlen)
+ return false;
+ if (attr1->attndims != attr2->attndims)
+ return false;
+ if (attr1->atttypmod != attr2->atttypmod)
+ return false;
+ if (attr1->attbyval != attr2->attbyval)
+ return false;
+ if (attr1->attstorage != attr2->attstorage)
+ return false;
+ if (attr1->attalign != attr2->attalign)
+ return false;
+ if (attr1->attnotnull != attr2->attnotnull)
+ return false;
+ if (attr1->atthasdef != attr2->atthasdef)
+ return false;
+ if (attr1->attidentity != attr2->attidentity)
+ return false;
+ if (attr1->attisdropped != attr2->attisdropped)
+ return false;
+ if (attr1->attislocal != attr2->attislocal)
+ return false;
+ if (attr1->attinhcount != attr2->attinhcount)
+ return false;
+ if (attr1->attcollation != attr2->attcollation)
+ return false;
+ /* attacl, attoptions and attfdwoptions are not even present... */
+
+ return true;
+}
+
+/*
+ * Compare two TupleDesc structures for logical equality
+ *
+ * Note: see equalTupleDescAttrs for the note on fields that we don't compare.
* We don't compare tdrefcount, either.
*/
bool
@@ -369,51 +426,8 @@ equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
Form_pg_attribute attr1 = tupdesc1->attrs[i];
Form_pg_attribute attr2 = tupdesc2->attrs[i];
- /*
- * We do not need to check every single field here: we can disregard
- * attrelid and attnum (which were used to place the row in the attrs
- * array in the first place). It might look like we could dispense
- * with checking attlen/attbyval/attalign, since these are derived
- * from atttypid; but in the case of dropped columns we must check
- * them (since atttypid will be zero for all dropped columns) and in
- * general it seems safer to check them always.
- *
- * attcacheoff must NOT be checked since it's possibly not set in both
- * copies.
- */
- if (strcmp(NameStr(attr1->attname), NameStr(attr2->attname)) != 0)
- return false;
- if (attr1->atttypid != attr2->atttypid)
- return false;
- if (attr1->attstattarget != attr2->attstattarget)
- return false;
- if (attr1->attlen != attr2->attlen)
- return false;
- if (attr1->attndims != attr2->attndims)
- return false;
- if (attr1->atttypmod != attr2->atttypmod)
- return false;
- if (attr1->attbyval != attr2->attbyval)
- return false;
- if (attr1->attstorage != attr2->attstorage)
- return false;
- if (attr1->attalign != attr2->attalign)
- return false;
- if (attr1->attnotnull != attr2->attnotnull)
- return false;
- if (attr1->atthasdef != attr2->atthasdef)
- return false;
- if (attr1->attidentity != attr2->attidentity)
- return false;
- if (attr1->attisdropped != attr2->attisdropped)
- return false;
- if (attr1->attislocal != attr2->attislocal)
- return false;
- if (attr1->attinhcount != attr2->attinhcount)
- return false;
- if (attr1->attcollation != attr2->attcollation)
+ if (!equalTupleDescAttrs(attr1, attr2))
return false;
- /* attacl, attoptions and attfdwoptions are not even present... */
}
if (tupdesc1->constr != NULL)
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index b3d3853fbc2..29532cf379c 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -35,6 +35,7 @@
#include "utils/memutils.h"
#include "utils/resowner.h"
#include "utils/snapmgr.h"
+#include "utils/typcache.h"
/*
@@ -46,6 +47,14 @@
*/
#define PARALLEL_ERROR_QUEUE_SIZE 16384
+/*
+ * We want to create a DSA area to store shared state that has the same extent
+ * as a parallel context, to hold the record type registry. We don't want it
+ * to have to create any DSM segments just yet in common cases, so we'll give
+ * it enough space to hold an empty SharedRecordTypmodRegistry.
+ */
+#define PARALLEL_CONTEXT_DSA_SIZE 0x30000
+
/* Magic number for parallel context TOC. */
#define PARALLEL_MAGIC 0x50477c7c
@@ -62,6 +71,8 @@
#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_EXTENSION_TRAMPOLINE UINT64CONST(0xFFFFFFFFFFFF0009)
+#define PARALLEL_KEY_CONTEXT_DSA UINT64CONST(0xFFFFFFFFFFFF000A)
+#define PARALLEL_KEY_RECORD_TYPMOD_REGISTRY UINT64CONST(0xFFFFFFFFFFFF000B)
/* Fixed-size parallel state. */
typedef struct FixedParallelState
@@ -202,6 +213,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
Size library_len = 0;
Size guc_len = 0;
Size combocidlen = 0;
+ Size typmod_registry_size = 0;
Size tsnaplen = 0;
Size asnaplen = 0;
Size tstatelen = 0;
@@ -237,8 +249,11 @@ InitializeParallelDSM(ParallelContext *pcxt)
shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
tstatelen = EstimateTransactionStateSpace();
shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
+ shm_toc_estimate_chunk(&pcxt->estimator, PARALLEL_CONTEXT_DSA_SIZE);
+ typmod_registry_size = SharedRecordTypmodRegistryEstimate();
+ shm_toc_estimate_chunk(&pcxt->estimator, typmod_registry_size);
/* If you add more chunks here, you probably need to add keys. */
- shm_toc_estimate_keys(&pcxt->estimator, 6);
+ shm_toc_estimate_keys(&pcxt->estimator, 8);
/* Estimate space need for error queues. */
StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
@@ -312,6 +327,8 @@ InitializeParallelDSM(ParallelContext *pcxt)
char *asnapspace;
char *tstatespace;
char *error_queue_space;
+ char *typemod_registry_space;
+ char *context_dsa_space;
/* Serialize shared libraries we have loaded. */
libraryspace = shm_toc_allocate(pcxt->toc, library_len);
@@ -328,6 +345,27 @@ InitializeParallelDSM(ParallelContext *pcxt)
SerializeComboCIDState(combocidlen, combocidspace);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
+ /*
+ * Make a DSA area for dynamically-sized shared state that has the
+ * same scope as this ParallelContext.
+ */
+ context_dsa_space = shm_toc_allocate(pcxt->toc,
+ PARALLEL_CONTEXT_DSA_SIZE);
+ pcxt->context_dsa = dsa_create_in_place(context_dsa_space,
+ PARALLEL_CONTEXT_DSA_SIZE,
+ LWTRANCHE_PARALLEL_CONTEXT_DSA,
+ pcxt->seg);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_CONTEXT_DSA, context_dsa_space);
+
+ /* Set up shared record type registry. */
+ typemod_registry_space = shm_toc_allocate(pcxt->toc,
+ typmod_registry_size);
+ SharedRecordTypmodRegistryInit((SharedRecordTypmodRegistry *)
+ typemod_registry_space,
+ pcxt->context_dsa, pcxt->seg);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_RECORD_TYPMOD_REGISTRY,
+ typemod_registry_space);
+
/* Serialize transaction snapshot and active snapshot. */
tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
SerializeSnapshot(transaction_snapshot, tsnapspace);
@@ -633,6 +671,13 @@ DestroyParallelContext(ParallelContext *pcxt)
}
}
+ /* Detach from the context-scope DSA area, if there is one. */
+ if (pcxt->context_dsa != NULL)
+ {
+ dsa_detach(pcxt->context_dsa);
+ pcxt->context_dsa = NULL;
+ }
+
/*
* If we have allocated a shared memory segment, detach it. This will
* implicitly detach the error queues, and any other shared memory queues,
@@ -947,6 +992,9 @@ ParallelWorkerMain(Datum main_arg)
char *asnapspace;
char *tstatespace;
StringInfoData msgbuf;
+ char *typmod_registry_space;
+ char *context_dsa_space;
+ dsa_area *context_dsa;
/* Set flag to indicate that we're initializing a parallel worker. */
InitializingParallelWorker = true;
@@ -1066,6 +1114,20 @@ ParallelWorkerMain(Datum main_arg)
Assert(combocidspace != NULL);
RestoreComboCIDState(combocidspace);
+ /* Attach to the DSA area. */
+ context_dsa_space = shm_toc_lookup(toc, PARALLEL_KEY_CONTEXT_DSA);
+ Assert(context_dsa_space != NULL);
+ context_dsa = dsa_attach_in_place(context_dsa_space, seg);
+
+ /* Attach to shared record type registry. */
+ typmod_registry_space =
+ shm_toc_lookup(toc, PARALLEL_KEY_RECORD_TYPMOD_REGISTRY);
+ Assert(typmod_registry_space != NULL);
+ SharedRecordTypmodRegistryAttach((SharedRecordTypmodRegistry *)
+ typmod_registry_space,
+ context_dsa,
+ seg);
+
/* Restore transaction snapshot. */
tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT);
Assert(tsnapspace != NULL);
@@ -1114,6 +1176,9 @@ ParallelWorkerMain(Datum main_arg)
/* Must pop active snapshot so resowner.c doesn't complain. */
PopActiveSnapshot();
+ /* Detach from context-scoped DSA area. */
+ dsa_detach(context_dsa);
+
/* Shut down the parallel-worker transaction. */
EndParallelWorkerTransaction();
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 3e133941f47..76e813d59b7 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -494,7 +494,7 @@ RegisterLWLockTranches(void)
if (LWLockTrancheArray == NULL)
{
- LWLockTranchesAllocated = 64;
+ LWLockTranchesAllocated = 128;
LWLockTrancheArray = (char **)
MemoryContextAllocZero(TopMemoryContext,
LWLockTranchesAllocated * sizeof(char *));
@@ -510,7 +510,13 @@ RegisterLWLockTranches(void)
"predicate_lock_manager");
LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA,
"parallel_query_dsa");
+ LWLockRegisterTranche(LWTRANCHE_PARALLEL_CONTEXT_DSA,
+ "parallel_context_dsa");
LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
+ LWLockRegisterTranche(LWTRANCHE_SHARED_RECORD_ATTS_INDEX,
+ "shared_record_atts_index");
+ LWLockRegisterTranche(LWTRANCHE_SHARED_RECORD_TYPMOD_INDEX,
+ "shared_record_typmods_index");
/* Register named tranches. */
for (i = 0; i < NamedLWLockTrancheRequests; i++)
diff --git a/src/backend/utils/cache/typcache.c b/src/backend/utils/cache/typcache.c
index 0cf5001a758..f26b0a7a58f 100644
--- a/src/backend/utils/cache/typcache.c
+++ b/src/backend/utils/cache/typcache.c
@@ -55,7 +55,9 @@
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "executor/executor.h"
+#include "lib/dht.h"
#include "optimizer/planner.h"
+#include "storage/lwlock.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/fmgroids.h"
@@ -67,7 +69,6 @@
#include "utils/syscache.h"
#include "utils/typcache.h"
-
/* The main type cache hashtable searched by lookup_type_cache */
static HTAB *TypeCacheHash = NULL;
@@ -148,12 +149,95 @@ typedef struct RecordCacheEntry
List *tupdescs;
} RecordCacheEntry;
+/*
+ * A mechanism for sharing record typmods between backends.
+ */
+struct SharedRecordTypmodRegistry
+{
+ dht_hash_table_handle atts_index_handle;
+ dht_hash_table_handle typmod_index_handle;
+ pg_atomic_uint32 next_typmod;
+};
+
+/*
+ * A flattened/serialized representation of a TupleDesc for use in shared
+ * memory. Can be converted to and from regular TupleDesc format. Doesn't
+ * support constraints and doesn't store the actual type OID, because this is
+ * only for use with RECORD types as created by CreateTupleDesc(). These are
+ * arranged into a linked list, in the hash table entry corresponding to the
+ * OIDs of the first 16 attributes, so we'd expect to get more than one entry
+ * in the list when named and other properties differ.
+ */
+typedef struct SerializedTupleDesc
+{
+ dsa_pointer next; /* next with the same attribute OIDs */
+ int natts; /* number of attributes in the tuple */
+ int32 typmod; /* typmod for tuple type */
+ bool hasoid; /* tuple has oid attribute in its header */
+
+ /*
+ * The attributes follow. We only ever access the first
+ * ATTRIBUTE_FIXED_PART_SIZE bytes of each element, like the code in
+ * tupdesc.c.
+ */
+ FormData_pg_attribute attributes[FLEXIBLE_ARRAY_MEMBER];
+} SerializedTupleDesc;
+
+/*
+ * An entry in SharedRecordTypmodRegistry's attribute index. The key is the
+ * first REC_HASH_KEYS attribute OIDs. That means that collisions are
+ * possible, but that's OK because SerializedTupleDesc objects are arranged
+ * into a list.
+ */
+typedef struct SRTRAttsIndexEntry
+{
+ Oid leading_attr_oids[REC_HASH_KEYS];
+ dsa_pointer serialized_tupdesc;
+} SRTRAttsIndexEntry;
+
+/*
+ * An entry in SharedRecordTypmodRegistry's typmod index. Points to a single
+ * SerializedTupleDesc in shared memory.
+ */
+typedef struct SRTRTypmodIndexEntry
+{
+ uint32 typmod;
+ dsa_pointer serialized_tupdesc;
+} SRTRTypmodIndexEntry;
+
+/* Parameters for SharedRecordTypmodRegistry's attributes hash table. */
+const static dht_parameters srtr_atts_index_params = {
+ sizeof(Oid) * REC_HASH_KEYS,
+ sizeof(SRTRAttsIndexEntry),
+ memcmp,
+ tag_hash,
+ LWTRANCHE_SHARED_RECORD_ATTS_INDEX
+};
+
+/* Parameters for SharedRecordTypmodRegistry's typmod hash table. */
+const static dht_parameters srtr_typmod_index_params = {
+ sizeof(uint32),
+ sizeof(SRTRTypmodIndexEntry),
+ memcmp,
+ tag_hash,
+ LWTRANCHE_SHARED_RECORD_TYPMOD_INDEX
+};
+
static HTAB *RecordCacheHash = NULL;
static TupleDesc *RecordCacheArray = NULL;
static int32 RecordCacheArrayLen = 0; /* allocated length of array */
static int32 NextRecordTypmod = 0; /* number of entries used */
+/* Current SharedRecordTypmodRegistry, if attached. */
+static struct
+{
+ SharedRecordTypmodRegistry *shared;
+ dht_hash_table *atts_index;
+ dht_hash_table *typmod_index;
+ dsa_area *area;
+} CurrentSharedRecordTypmodRegistry;
+
static void load_typcache_tupdesc(TypeCacheEntry *typentry);
static void load_rangetype_info(TypeCacheEntry *typentry);
static void load_domaintype_info(TypeCacheEntry *typentry);
@@ -174,6 +258,13 @@ static void TypeCacheConstrCallback(Datum arg, int cacheid, uint32 hashvalue);
static void load_enum_cache_data(TypeCacheEntry *tcache);
static EnumItem *find_enumitem(TypeCacheEnumData *enumdata, Oid arg);
static int enum_oid_cmp(const void *left, const void *right);
+static void shared_record_typmod_registry_detach(dsm_segment *segment,
+ Datum datum);
+static int32 find_or_allocate_shared_record_typmod(TupleDesc tupdesc);
+static TupleDesc deserialize_tupledesc(const SerializedTupleDesc *serialized);
+static dsa_pointer serialize_tupledesc(dsa_area *area,
+ const TupleDesc tupdesc);
+
/*
@@ -1199,6 +1290,33 @@ cache_record_field_properties(TypeCacheEntry *typentry)
typentry->flags |= TCFLAGS_CHECKED_FIELD_PROPERTIES;
}
+/*
+ * Make sure that RecordCacheArray is large enough to store 'typmod'.
+ */
+static void
+ensure_record_cache_typmod_slot_exists(int32 typmod)
+{
+ if (RecordCacheArray == NULL)
+ {
+ RecordCacheArray = (TupleDesc *)
+ MemoryContextAllocZero(CacheMemoryContext, 64 * sizeof(TupleDesc));
+ RecordCacheArrayLen = 64;
+ }
+
+ if (typmod >= RecordCacheArrayLen)
+ {
+ int32 newlen = RecordCacheArrayLen * 2;
+
+ while (typmod >= newlen)
+ newlen *= 2;
+
+ RecordCacheArray = (TupleDesc *) repalloc(RecordCacheArray,
+ newlen * sizeof(TupleDesc));
+ memset(RecordCacheArray + RecordCacheArrayLen, 0,
+ (newlen - RecordCacheArrayLen) * sizeof(TupleDesc *));
+ RecordCacheArrayLen = newlen;
+ }
+}
/*
* lookup_rowtype_tupdesc_internal --- internal routine to lookup a rowtype
@@ -1229,15 +1347,49 @@ lookup_rowtype_tupdesc_internal(Oid type_id, int32 typmod, bool noError)
/*
* It's a transient record type, so look in our record-type table.
*/
- if (typmod < 0 || typmod >= NextRecordTypmod)
+ if (typmod >= 0)
{
- if (!noError)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("record type has not been registered")));
- return NULL;
+ /* Is it already in our local cache? */
+ if (typmod < RecordCacheArrayLen &&
+ RecordCacheArray[typmod] != NULL)
+ return RecordCacheArray[typmod];
+
+ /* Are we attached to a SharedRecordTypmodRegistry? */
+ if (CurrentSharedRecordTypmodRegistry.shared != NULL)
+ {
+ SRTRTypmodIndexEntry *entry;
+
+ /* Try to find it in the shared typmod index. */
+ entry = dht_find(CurrentSharedRecordTypmodRegistry.typmod_index,
+ &typmod, false);
+ if (entry != NULL)
+ {
+ SerializedTupleDesc *serialized;
+
+ serialized = (SerializedTupleDesc *)
+ dsa_get_address(CurrentSharedRecordTypmodRegistry.area,
+ entry->serialized_tupdesc);
+ Assert(typmod == serialized->typmod);
+
+ /* We may need to extend the local RecordCacheArray. */
+ ensure_record_cache_typmod_slot_exists(typmod);
+
+ /* Produce and cache a TupleDesc. */
+ RecordCacheArray[typmod] =
+ deserialize_tupledesc(serialized);
+ dht_release(CurrentSharedRecordTypmodRegistry.typmod_index,
+ entry);
+
+ return RecordCacheArray[typmod];
+ }
+ }
}
- return RecordCacheArray[typmod];
+
+ if (!noError)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("record type has not been registered")));
+ return NULL;
}
}
@@ -1362,30 +1514,27 @@ assign_record_type_typmod(TupleDesc tupDesc)
}
}
+ /* Look in the SharedRecordTypmodRegistry, if attached */
+ newtypmod = find_or_allocate_shared_record_typmod(tupDesc);
+
/* Not present, so need to manufacture an entry */
oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
- if (RecordCacheArray == NULL)
- {
- RecordCacheArray = (TupleDesc *) palloc(64 * sizeof(TupleDesc));
- RecordCacheArrayLen = 64;
- }
- else if (NextRecordTypmod >= RecordCacheArrayLen)
- {
- int32 newlen = RecordCacheArrayLen * 2;
-
- RecordCacheArray = (TupleDesc *) repalloc(RecordCacheArray,
- newlen * sizeof(TupleDesc));
- RecordCacheArrayLen = newlen;
- }
+ /*
+ * Whether we just got a new typmod from a SharedRecordTypmodRegistry or
+ * we're allocating one locally, make sure the RecordCacheArray is big
+ * enough.
+ */
+ ensure_record_cache_typmod_slot_exists(Max(NextRecordTypmod, newtypmod));
/* if fail in subrs, no damage except possibly some wasted memory... */
entDesc = CreateTupleDescCopy(tupDesc);
recentry->tupdescs = lcons(entDesc, recentry->tupdescs);
/* mark it as a reference-counted tupdesc */
entDesc->tdrefcount = 1;
- /* now it's safe to advance NextRecordTypmod */
- newtypmod = NextRecordTypmod++;
+ /* now it's safe to advance NextRecordTypmod, if allocating locally */
+ if (newtypmod == -1)
+ newtypmod = NextRecordTypmod++;
entDesc->tdtypmod = newtypmod;
RecordCacheArray[newtypmod] = entDesc;
@@ -1396,6 +1545,176 @@ assign_record_type_typmod(TupleDesc tupDesc)
}
/*
+ * Return the amount of shmem required to hold a SharedRecordTypmodRegistry.
+ * This exists only to avoid exposing private innards of
+ * SharedRecordTypmodRegistry in a header.
+ */
+size_t
+SharedRecordTypmodRegistryEstimate(void)
+{
+ return sizeof(SharedRecordTypmodRegistry);
+}
+
+/*
+ * Initialize 'registry' in a pre-existing shared memory region, which must be
+ * maximally aligned and have space for SharedRecordTypmodRegistryEstimate()
+ * bytes.
+ *
+ * 'area' will be used to allocate shared memory space as required for the
+ * typmod registration. The current process, expected to be a leader process
+ * in a parallel query, will be attached automatically and its current record
+ * types will be loaded into the registry. While attached, all calls to
+ * assign_record_type_typmod will use the shared registry. Other backends
+ * will need to attach explicitly.
+ *
+ * An on-detach callback will be installed for 'segment', so that normal
+ * private record type cache behavior can be restored when the DSM segment
+ * goes away.
+ */
+void
+SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *registry,
+ dsa_area *area,
+ dsm_segment *segment)
+{
+ dht_hash_table *atts_index;
+ dht_hash_table *typmod_index;
+ int32 typmod;
+
+ /* We can't already be attached to a shared registry. */
+ Assert(CurrentSharedRecordTypmodRegistry.shared == NULL);
+ Assert(CurrentSharedRecordTypmodRegistry.atts_index == NULL);
+ Assert(CurrentSharedRecordTypmodRegistry.typmod_index == NULL);
+ Assert(CurrentSharedRecordTypmodRegistry.area == NULL);
+
+ /* Create the hash table indexed by attribute OIDs. */
+ atts_index = dht_create(area, &srtr_atts_index_params);
+ registry->atts_index_handle = dht_get_hash_table_handle(atts_index);
+
+ /* Create the hash table indexed by typmod. */
+ typmod_index = dht_create(area, &srtr_typmod_index_params);
+ registry->typmod_index_handle = dht_get_hash_table_handle(typmod_index);
+
+ /* Initialize the 'next typmod' to this backend's next value. */
+ pg_atomic_init_u32(&registry->next_typmod, NextRecordTypmod);
+
+ /*
+ * Copy all entries from this backend's private registry into the shared
+ * registry.
+ */
+ for (typmod = 0; typmod < NextRecordTypmod; ++typmod)
+ {
+ SRTRTypmodIndexEntry *typmod_index_entry;
+ SRTRAttsIndexEntry *atts_index_entry;
+ SerializedTupleDesc *serialized;
+ dsa_pointer serialized_dp;
+ TupleDesc tupdesc;
+ Oid atts_key[REC_HASH_KEYS];
+ bool found;
+ int i;
+
+ tupdesc = RecordCacheArray[typmod];
+ if (tupdesc == NULL)
+ continue;
+
+ /* Serialize the TupleDesc into shared memory. */
+ serialized_dp = serialize_tupledesc(area, tupdesc);
+
+ /* Insert into the typmod index. */
+ typmod_index_entry = dht_find_or_insert(typmod_index,
+ &tupdesc->tdtypmod,
+ &found);
+ if (found)
+ elog(ERROR, "cannot create duplicate shared record typmod");
+ typmod_index_entry->typmod = tupdesc->tdtypmod;
+ typmod_index_entry->serialized_tupdesc = serialized_dp;
+ dht_release(typmod_index, typmod_index_entry);
+
+ /* Insert into the attributes index. */
+ memset(atts_key, 0, sizeof(atts_key));
+ for (i = 0; i < Min(tupdesc->natts, REC_HASH_KEYS); ++i)
+ atts_key[i] = tupdesc->attrs[i]->atttypid;
+ atts_index_entry = dht_find_or_insert(atts_index, &atts_key, &found);
+ if (!found)
+ {
+ memcpy(atts_index_entry->leading_attr_oids,
+ atts_key,
+ sizeof(atts_key));
+ atts_index_entry->serialized_tupdesc = InvalidDsaPointer;
+ }
+
+ /* Push onto list. */
+ serialized = (SerializedTupleDesc *)
+ dsa_get_address(area, serialized_dp);
+ serialized->next = atts_index_entry->serialized_tupdesc;
+ atts_index_entry->serialized_tupdesc = serialized_dp;
+ dht_release(atts_index, atts_index_entry);
+ }
+
+ /* Set up our detach hook so that we can return to private cache mode. */
+ on_dsm_detach(segment, shared_record_typmod_registry_detach,
+ PointerGetDatum(registry));
+
+ /*
+ * Set up the global state that will tell assign_record_type_typmod and
+ * lookup_rowtype_tupdesc_internal about the shared registry.
+ */
+ CurrentSharedRecordTypmodRegistry.shared = registry;
+ CurrentSharedRecordTypmodRegistry.atts_index = atts_index;
+ CurrentSharedRecordTypmodRegistry.typmod_index = typmod_index;
+ CurrentSharedRecordTypmodRegistry.area = area;
+}
+
+/*
+ * Attach to 'registry', which must have been initialized already by another
+ * backend. Future calls to assign_record_type_typmod and
+ * lookup_rowtype_tupdesc_internal will use the shared registry, until
+ * 'segment' is detached.
+ */
+void
+SharedRecordTypmodRegistryAttach(SharedRecordTypmodRegistry *registry,
+ dsa_area *area,
+ dsm_segment *segment)
+{
+ dht_hash_table *atts_index;
+ dht_hash_table *typmod_index;
+
+ /* We can't already be attached to a shared registry. */
+ Assert(CurrentSharedRecordTypmodRegistry.shared == NULL);
+ Assert(CurrentSharedRecordTypmodRegistry.atts_index == NULL);
+ Assert(CurrentSharedRecordTypmodRegistry.typmod_index == NULL);
+ Assert(CurrentSharedRecordTypmodRegistry.area == NULL);
+
+ /*
+ * We can't already have typmods in our local cache, because they'd clash
+ * with those imported by SharedRecordTypmodRegistryInit. This should be a
+ * freshly started parallel worker. If we ever support worker recycling,
+ * a worker would need to zap its local cache in between servicing
+ * different queries, in order to be able to call this and synchronize
+ * typmods with a new leader.
+ */
+ Assert(NextRecordTypmod == 0);
+
+ /* Attach to the two hash tables. */
+ atts_index = dht_attach(area, &srtr_atts_index_params,
+ registry->atts_index_handle);
+ typmod_index = dht_attach(area, &srtr_typmod_index_params,
+ registry->typmod_index_handle);
+
+ /* Set up our detach hook so that we can return to private cache mode. */
+ on_dsm_detach(segment, shared_record_typmod_registry_detach,
+ PointerGetDatum(registry));
+
+ /*
+ * Set up the global state that will tell assign_record_type_typmod and
+ * lookup_rowtype_tupdesc_internal about the shared registry.
+ */
+ CurrentSharedRecordTypmodRegistry.shared = registry;
+ CurrentSharedRecordTypmodRegistry.atts_index = atts_index;
+ CurrentSharedRecordTypmodRegistry.typmod_index = typmod_index;
+ CurrentSharedRecordTypmodRegistry.area = area;
+}
+
+/*
* TypeCacheRelCallback
* Relcache inval callback function
*
@@ -1809,3 +2128,225 @@ enum_oid_cmp(const void *left, const void *right)
else
return 0;
}
+
+/*
+ * Serialize a TupleDesc into a SerializedTupleDesc in DSA area 'area', and
+ * return a dsa_pointer.
+ */
+static dsa_pointer
+serialize_tupledesc(dsa_area *area, const TupleDesc tupdesc)
+{
+ SerializedTupleDesc *serialized;
+ dsa_pointer serialized_dp;
+ size_t size;
+ int i;
+
+ size = offsetof(SerializedTupleDesc, attributes) +
+ sizeof(FormData_pg_attribute) * tupdesc->natts;
+ serialized_dp = dsa_allocate(area, size);
+ serialized = (SerializedTupleDesc *) dsa_get_address(area, serialized_dp);
+
+ serialized->natts = tupdesc->natts;
+ serialized->typmod = tupdesc->tdtypmod;
+ serialized->hasoid = tupdesc->tdhasoid;
+ for (i = 0; i < tupdesc->natts; ++i)
+ memcpy(&serialized->attributes[i], tupdesc->attrs[i],
+ ATTRIBUTE_FIXED_PART_SIZE);
+
+ return serialized_dp;
+}
+
+/*
+ * Deserialize a SerializedTupleDesc to produce a TupleDesc. The result is
+ * allocated in CacheMemoryContext and has a refcount of 1.
+ */
+static TupleDesc
+deserialize_tupledesc(const SerializedTupleDesc *serialized)
+{
+ Form_pg_attribute *attributes;
+ MemoryContext oldctxt;
+ TupleDesc tupdesc;
+ int i;
+
+ /*
+ * We have an array of FormData_pg_attribute but we need an array of
+ * pointers to FormData_pg_attribute.
+ */
+ oldctxt = MemoryContextSwitchTo(CacheMemoryContext);
+ attributes = palloc(sizeof(Form_pg_attribute) * serialized->natts);
+ for (i = 0; i < serialized->natts; ++i)
+ {
+ attributes[i] = palloc(ATTRIBUTE_FIXED_PART_SIZE);
+ memcpy(attributes[i], &serialized->attributes[i],
+ ATTRIBUTE_FIXED_PART_SIZE);
+ }
+ tupdesc =
+ CreateTupleDesc(serialized->natts, serialized->hasoid, attributes);
+ tupdesc->tdtypmod = serialized->typmod;
+ tupdesc->tdrefcount = 1;
+ MemoryContextSwitchTo(oldctxt);
+
+ return tupdesc;
+}
+
+/*
+ * We can't use equalTupleDescs to compare a SerializedTupleDesc with a
+ * TupleDesc, but we don't want to allocate memory just to compare. This
+ * function produces the same result without deserializing first.
+ */
+static bool
+serialized_tupledesc_matches(SerializedTupleDesc *serialized,
+ TupleDesc tupdesc)
+{
+ int i;
+
+ if (serialized->natts != tupdesc->natts ||
+ serialized->hasoid != tupdesc->tdhasoid ||
+ tupdesc->constr != NULL)
+ return false;
+
+ for (i = 0; i < serialized->natts; ++i)
+ {
+ if (!equalTupleDescAttrs(&serialized->attributes[i],
+ tupdesc->attrs[i]))
+ return false;
+ }
+
+ return true;
+}
+
+/*
+ * If we are attached to a SharedRecordTypmodRegistry, find or create a
+ * SerializedTupleDesc that matches 'tupdesc', and return its typmod.
+ * Otherwise return -1.
+ */
+static int32
+find_or_allocate_shared_record_typmod(TupleDesc tupdesc)
+{
+ SRTRAttsIndexEntry *atts_index_entry;
+ SRTRTypmodIndexEntry *typmod_index_entry;
+ SerializedTupleDesc *serialized;
+ dsa_pointer serialized_dp;
+ Oid hashkey[REC_HASH_KEYS];
+ bool found;
+ int32 typmod;
+ int i;
+
+ /* If not even attached, nothing to do. */
+ if (CurrentSharedRecordTypmodRegistry.shared == NULL)
+ return -1;
+
+ /* Try to find a match. */
+ memset(hashkey, 0, sizeof(hashkey));
+ for (i = 0; i < tupdesc->natts; ++i)
+ hashkey[i] = tupdesc->attrs[i]->atttypid;
+ atts_index_entry = (SRTRAttsIndexEntry *)
+ dht_find_or_insert(CurrentSharedRecordTypmodRegistry.atts_index,
+ hashkey,
+ &found);
+ if (!found)
+ {
+ /* Making a new entry. */
+ memcpy(atts_index_entry->leading_attr_oids,
+ hashkey,
+ sizeof(hashkey));
+ atts_index_entry->serialized_tupdesc = InvalidDsaPointer;
+ }
+
+ /* Scan the list we found for a matching serialized one. */
+ serialized_dp = atts_index_entry->serialized_tupdesc;
+ while (DsaPointerIsValid(serialized_dp))
+ {
+ serialized =
+ dsa_get_address(CurrentSharedRecordTypmodRegistry.area,
+ serialized_dp);
+ if (serialized_tupledesc_matches(serialized, tupdesc))
+ {
+ /* Found a match, we are finished. */
+ typmod = serialized->typmod;
+ dht_release(CurrentSharedRecordTypmodRegistry.atts_index,
+ atts_index_entry);
+ return typmod;
+ }
+ serialized_dp = serialized->next;
+ }
+
+ /* We didn't find a matching entry, so let's allocate a new one. */
+ typmod = (int)
+ pg_atomic_fetch_add_u32(&CurrentSharedRecordTypmodRegistry.shared->next_typmod,
+ 1);
+
+ /* Allocate shared memory and serialize the TupleDesc. */
+ serialized_dp = serialize_tupledesc(CurrentSharedRecordTypmodRegistry.area,
+ tupdesc);
+ serialized = (SerializedTupleDesc *)
+ dsa_get_address(CurrentSharedRecordTypmodRegistry.area, serialized_dp);
+ serialized->typmod = typmod;
+
+ /*
+ * While we still hold the atts_index entry locked, add this to
+ * typmod_index. That's important because we don't want anyone to be able
+ * to find a typmod via the former that can't yet be looked up in the
+ * latter.
+ */
+ typmod_index_entry =
+ dht_find_or_insert(CurrentSharedRecordTypmodRegistry.typmod_index,
+ &typmod, &found);
+ if (found)
+ elog(ERROR, "cannot create duplicate shared record typmod");
+ typmod_index_entry->typmod = typmod;
+ typmod_index_entry->serialized_tupdesc = serialized_dp;
+ dht_release(CurrentSharedRecordTypmodRegistry.typmod_index,
+ typmod_index_entry);
+
+ /* Push onto the front of list in atts_index_entry. */
+ serialized->next = atts_index_entry->serialized_tupdesc;
+ atts_index_entry->serialized_tupdesc = serialized_dp;
+
+ dht_release(CurrentSharedRecordTypmodRegistry.atts_index,
+ atts_index_entry);
+
+ return typmod;
+}
+
+/*
+ * DSM segment detach hook used to disconnect this backend's record typmod
+ * cache from the shared registry. Detaching from the
+ * SharedRecordTypmodRegistry returns this backend to local typmod cache mode
+ * until such time as another parallel query runs.
+ */
+static void
+shared_record_typmod_registry_detach(dsm_segment *segment, Datum datum)
+{
+ SharedRecordTypmodRegistry *shared;
+
+ shared = (SharedRecordTypmodRegistry *) DatumGetPointer(datum);
+
+ /*
+ * XXX Should we now copy all entries from shared memory into the
+ * backend's local cache? That depends on whether you think that there is
+ * any chance this backend could see any shared tuples created by other
+ * backends after this detach operation. If tuples somehow survived from
+ * query to query, that would be true. But presently I don't think they
+ * do, if we assume that all mechanisms that allow us to receive tuples
+ * from other backends are linked to DSM segment mapping lifetime (tuple
+ * queues, shared hash tables, shared temporary files).
+ *
+ * The only thing we need to synchronize to return to local-typmod-cache
+ * mode is NextRecordTypmod. That means that we can resume generating new
+ * backend-local entries that don't clash.
+ */
+ NextRecordTypmod = pg_atomic_read_u32(&shared->next_typmod);
+
+ /*
+ * We don't free the SharedRecordTypmodRegistry's DSM memory, though we
+ * could using a reference counting scheme if we wanted to. There doesn't
+ * seem to be any point because the whole DSA area will be going away
+ * automatically when the DSM segment containing it is destroyed,
+ * conceptually like a MemoryContext.
+ */
+ CurrentSharedRecordTypmodRegistry.shared = NULL;
+ CurrentSharedRecordTypmodRegistry.atts_index = NULL;
+ CurrentSharedRecordTypmodRegistry.typmod_index = NULL;
+ CurrentSharedRecordTypmodRegistry.area = NULL;
+}
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 5065a3830cf..a0c3b8eb955 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -19,6 +19,7 @@
#include "postmaster/bgworker.h"
#include "storage/shm_mq.h"
#include "storage/shm_toc.h"
+#include "utils/dsa.h"
typedef void (*parallel_worker_main_type) (dsm_segment *seg, shm_toc *toc);
@@ -41,6 +42,7 @@ typedef struct ParallelContext
ErrorContextCallback *error_context_stack;
shm_toc_estimator estimator;
dsm_segment *seg;
+ dsa_area *context_dsa;
void *private_memory;
shm_toc *toc;
ParallelWorkerInfo *worker;
diff --git a/src/include/access/tupdesc.h b/src/include/access/tupdesc.h
index b48f839028b..97a73b1483e 100644
--- a/src/include/access/tupdesc.h
+++ b/src/include/access/tupdesc.h
@@ -110,6 +110,9 @@ extern void DecrTupleDescRefCount(TupleDesc tupdesc);
DecrTupleDescRefCount(tupdesc); \
} while (0)
+extern bool equalTupleDescAttrs(Form_pg_attribute attr1,
+ Form_pg_attribute attr2);
+
extern bool equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
extern void TupleDescInitEntry(TupleDesc desc,
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 0cd45bb6d8e..c0117f7bafd 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -212,7 +212,10 @@ typedef enum BuiltinTrancheIds
LWTRANCHE_LOCK_MANAGER,
LWTRANCHE_PREDICATE_LOCK_MANAGER,
LWTRANCHE_PARALLEL_QUERY_DSA,
+ LWTRANCHE_PARALLEL_CONTEXT_DSA,
LWTRANCHE_TBM,
+ LWTRANCHE_SHARED_RECORD_ATTS_INDEX,
+ LWTRANCHE_SHARED_RECORD_TYPMOD_INDEX,
LWTRANCHE_FIRST_USER_DEFINED
} BuiltinTrancheIds;
diff --git a/src/include/utils/typcache.h b/src/include/utils/typcache.h
index 1bf94e2548d..861610a2835 100644
--- a/src/include/utils/typcache.h
+++ b/src/include/utils/typcache.h
@@ -18,6 +18,8 @@
#include "access/tupdesc.h"
#include "fmgr.h"
+#include "storage/dsm.h"
+#include "utils/dsa.h"
/* DomainConstraintCache is an opaque struct known only within typcache.c */
@@ -139,6 +141,7 @@ typedef struct DomainConstraintRef
MemoryContextCallback callback; /* used to release refcount when done */
} DomainConstraintRef;
+typedef struct SharedRecordTypmodRegistry SharedRecordTypmodRegistry;
extern TypeCacheEntry *lookup_type_cache(Oid type_id, int flags);
@@ -160,4 +163,12 @@ extern void assign_record_type_typmod(TupleDesc tupDesc);
extern int compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2);
+extern size_t SharedRecordTypmodRegistryEstimate(void);
+extern void SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *,
+ dsa_area *area,
+ dsm_segment *seg);
+extern void SharedRecordTypmodRegistryAttach(SharedRecordTypmodRegistry *,
+ dsa_area *area,
+ dsm_segment *seg);
+
#endif /* TYPCACHE_H */
On Fri, Apr 7, 2017 at 5:21 PM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
* It would be nice for the SharedRecordTypeRegistry to be able to
survive longer than a single parallel query, perhaps in a per-session
DSM segment. Perhaps eventually we will want to consider a
query-scoped area, a transaction-scoped area and a session-scoped
area? I didn't investigate that for this POC.
This seems like the right way to go. I think there should be one
extra patch in this patch stack, to create a per-session DSA area (and
perhaps a "SharedSessionState" struct?) that worker backends can
attach to. It could be created when you first run a parallel query,
and then reused for all parallel queries for the rest of your session.
So, after you've run one parallel query, all future record typmod
registrations would get pushed (write-through style) into shmem, for
use by other backends that you might start in future parallel queries.
That will avoid having to copy the leader's registered record typmods
into shmem for every query going forward (the behaviour of the current
POC patch).
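To make that concrete, here's a rough sketch of how the leader side
might look with a session-scoped area (every Session*/session_* name
below is a hypothetical placeholder for machinery that doesn't exist
yet; only SharedRecordTypmodRegistryInit/Attach are in the POC patch):

static void
share_session_typmod_registry(ParallelContext *pcxt)
{
	/* Hypothetical: created on first use, cached for the whole session. */
	dsa_area   *session_area = GetSessionDsaArea();
	dsm_segment *session_seg = GetSessionDsmSegment();
	SharedRecordTypmodRegistry *registry = GetSessionTypmodRegistry();

	if (registry == NULL)
	{
		/*
		 * First parallel query in this session: create the registry and
		 * copy in the typmods blessed so far.  The detach hook hangs off
		 * the session segment rather than the per-query one, so this
		 * backend stays in write-through mode after the query finishes.
		 */
		registry = AllocateSessionTypmodRegistry(session_area);
		SharedRecordTypmodRegistryInit(registry, session_area, session_seg);
	}

	/*
	 * Later queries skip the copying step: new typmods were already
	 * written through as they were assigned.  Workers attach with
	 * SharedRecordTypmodRegistryAttach() exactly as in the POC patch.
	 */
	PassSessionRegistryHandleToWorkers(pcxt, registry);
}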
* Perhaps simplehash + an LWLock would be better than dht, but I
haven't looked into that. Can it be convinced to work in DSA memory
and to grow on demand?
Any views on this?
1. Apply dht-v3.patch[3].
2. Apply shared-record-typmod-registry-v1.patch.
3. Apply rip-out-tqueue-remapping-v1.patch.
Here's a rebased version of the second patch (the other two still
apply). It's still POC code only and still uses a
per-parallel-context DSA area for space, not the per-session one I am
now proposing we develop, if people are in favour of the approach.
In case it wasn't clear from my earlier description, a nice side
effect of using a shared typmod registry is that you can delete 85% of
tqueue.c (see patch #3), so if you don't count the hash table
implementation we come out about even in terms of lines of code.
--
Thomas Munro
http://www.enterprisedb.com
Attachments:
shared-record-typmod-registry-v2.patch
diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c
index 9fd7b4e019b..b72f5363045 100644
--- a/src/backend/access/common/tupdesc.c
+++ b/src/backend/access/common/tupdesc.c
@@ -343,11 +343,68 @@ DecrTupleDescRefCount(TupleDesc tupdesc)
}
/*
- * Compare two TupleDesc structures for logical equality
+ * Compare two TupleDesc attributes for logical equality
*
* Note: we deliberately do not check the attrelid and tdtypmod fields.
* This allows typcache.c to use this routine to see if a cached record type
* matches a requested type, and is harmless for relcache.c's uses.
+ */
+bool
+equalTupleDescAttrs(Form_pg_attribute attr1, Form_pg_attribute attr2)
+{
+ /*
+ * We do not need to check every single field here: we can disregard
+ * attrelid and attnum (which were used to place the row in the attrs
+ * array in the first place). It might look like we could dispense
+ * with checking attlen/attbyval/attalign, since these are derived
+ * from atttypid; but in the case of dropped columns we must check
+ * them (since atttypid will be zero for all dropped columns) and in
+ * general it seems safer to check them always.
+ *
+ * attcacheoff must NOT be checked since it's possibly not set in both
+ * copies.
+ */
+ if (strcmp(NameStr(attr1->attname), NameStr(attr2->attname)) != 0)
+ return false;
+ if (attr1->atttypid != attr2->atttypid)
+ return false;
+ if (attr1->attstattarget != attr2->attstattarget)
+ return false;
+ if (attr1->attlen != attr2->attlen)
+ return false;
+ if (attr1->attndims != attr2->attndims)
+ return false;
+ if (attr1->atttypmod != attr2->atttypmod)
+ return false;
+ if (attr1->attbyval != attr2->attbyval)
+ return false;
+ if (attr1->attstorage != attr2->attstorage)
+ return false;
+ if (attr1->attalign != attr2->attalign)
+ return false;
+ if (attr1->attnotnull != attr2->attnotnull)
+ return false;
+ if (attr1->atthasdef != attr2->atthasdef)
+ return false;
+ if (attr1->attidentity != attr2->attidentity)
+ return false;
+ if (attr1->attisdropped != attr2->attisdropped)
+ return false;
+ if (attr1->attislocal != attr2->attislocal)
+ return false;
+ if (attr1->attinhcount != attr2->attinhcount)
+ return false;
+ if (attr1->attcollation != attr2->attcollation)
+ return false;
+ /* attacl, attoptions and attfdwoptions are not even present... */
+
+ return true;
+}
+
+/*
+ * Compare two TupleDesc structures for logical equality
+ *
+ * Note: see equalTupleDescAttrs for the note on fields that we don't compare.
* We don't compare tdrefcount, either.
*/
bool
@@ -369,51 +426,8 @@ equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
Form_pg_attribute attr1 = tupdesc1->attrs[i];
Form_pg_attribute attr2 = tupdesc2->attrs[i];
- /*
- * We do not need to check every single field here: we can disregard
- * attrelid and attnum (which were used to place the row in the attrs
- * array in the first place). It might look like we could dispense
- * with checking attlen/attbyval/attalign, since these are derived
- * from atttypid; but in the case of dropped columns we must check
- * them (since atttypid will be zero for all dropped columns) and in
- * general it seems safer to check them always.
- *
- * attcacheoff must NOT be checked since it's possibly not set in both
- * copies.
- */
- if (strcmp(NameStr(attr1->attname), NameStr(attr2->attname)) != 0)
- return false;
- if (attr1->atttypid != attr2->atttypid)
- return false;
- if (attr1->attstattarget != attr2->attstattarget)
- return false;
- if (attr1->attlen != attr2->attlen)
- return false;
- if (attr1->attndims != attr2->attndims)
- return false;
- if (attr1->atttypmod != attr2->atttypmod)
- return false;
- if (attr1->attbyval != attr2->attbyval)
- return false;
- if (attr1->attstorage != attr2->attstorage)
- return false;
- if (attr1->attalign != attr2->attalign)
- return false;
- if (attr1->attnotnull != attr2->attnotnull)
- return false;
- if (attr1->atthasdef != attr2->atthasdef)
- return false;
- if (attr1->attidentity != attr2->attidentity)
- return false;
- if (attr1->attisdropped != attr2->attisdropped)
- return false;
- if (attr1->attislocal != attr2->attislocal)
- return false;
- if (attr1->attinhcount != attr2->attinhcount)
- return false;
- if (attr1->attcollation != attr2->attcollation)
+ if (!equalTupleDescAttrs(attr1, attr2))
return false;
- /* attacl, attoptions and attfdwoptions are not even present... */
}
if (tupdesc1->constr != NULL)
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 2dad3e8a655..54acd989101 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -36,6 +36,7 @@
#include "utils/memutils.h"
#include "utils/resowner.h"
#include "utils/snapmgr.h"
+#include "utils/typcache.h"
/*
@@ -47,6 +48,14 @@
*/
#define PARALLEL_ERROR_QUEUE_SIZE 16384
+/*
+ * We want to create a DSA area to store shared state that has the same extent
+ * as a parallel context, to hold the record type registry. We don't want it
+ * to have to create any DSM segments just yet in common cases, so we'll give
+ * it enough space to hold an empty SharedRecordTypmodRegistry.
+ */
+#define PARALLEL_CONTEXT_DSA_SIZE 0x30000
+
/* Magic number for parallel context TOC. */
#define PARALLEL_MAGIC 0x50477c7c
@@ -63,6 +72,9 @@
#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
+#define PARALLEL_KEY_EXTENSION_TRAMPOLINE UINT64CONST(0xFFFFFFFFFFFF000A)
+#define PARALLEL_KEY_CONTEXT_DSA UINT64CONST(0xFFFFFFFFFFFF000B)
+#define PARALLEL_KEY_RECORD_TYPMOD_REGISTRY UINT64CONST(0xFFFFFFFFFFFF000C)
/* Fixed-size parallel state. */
typedef struct FixedParallelState
@@ -191,6 +203,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
Size library_len = 0;
Size guc_len = 0;
Size combocidlen = 0;
+ Size typmod_registry_size = 0;
Size tsnaplen = 0;
Size asnaplen = 0;
Size tstatelen = 0;
@@ -226,8 +239,11 @@ InitializeParallelDSM(ParallelContext *pcxt)
shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
tstatelen = EstimateTransactionStateSpace();
shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
+ shm_toc_estimate_chunk(&pcxt->estimator, PARALLEL_CONTEXT_DSA_SIZE);
+ typmod_registry_size = SharedRecordTypmodRegistryEstimate();
+ shm_toc_estimate_chunk(&pcxt->estimator, typmod_registry_size);
/* If you add more chunks here, you probably need to add keys. */
- shm_toc_estimate_keys(&pcxt->estimator, 6);
+ shm_toc_estimate_keys(&pcxt->estimator, 8);
/* Estimate space need for error queues. */
StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
@@ -295,6 +311,8 @@ InitializeParallelDSM(ParallelContext *pcxt)
char *asnapspace;
char *tstatespace;
char *error_queue_space;
+ char *typemod_registry_space;
+ char *context_dsa_space;
char *entrypointstate;
Size lnamelen;
@@ -313,6 +331,27 @@ InitializeParallelDSM(ParallelContext *pcxt)
SerializeComboCIDState(combocidlen, combocidspace);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
+ /*
+ * Make a DSA area for dynamically-sized shared state that has the
+ * same scope as this ParallelContext.
+ */
+ context_dsa_space = shm_toc_allocate(pcxt->toc,
+ PARALLEL_CONTEXT_DSA_SIZE);
+ pcxt->context_dsa = dsa_create_in_place(context_dsa_space,
+ PARALLEL_CONTEXT_DSA_SIZE,
+ LWTRANCHE_PARALLEL_CONTEXT_DSA,
+ pcxt->seg);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_CONTEXT_DSA, context_dsa_space);
+
+ /* Set up shared record type registry. */
+ typemod_registry_space = shm_toc_allocate(pcxt->toc,
+ typmod_registry_size);
+ SharedRecordTypmodRegistryInit((SharedRecordTypmodRegistry *)
+ typemod_registry_space,
+ pcxt->context_dsa, pcxt->seg);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_RECORD_TYPMOD_REGISTRY,
+ typemod_registry_space);
+
/* Serialize transaction snapshot and active snapshot. */
tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
SerializeSnapshot(transaction_snapshot, tsnapspace);
@@ -618,6 +657,13 @@ DestroyParallelContext(ParallelContext *pcxt)
}
}
+ /* Detach from the context-scope DSA area, if there is one. */
+ if (pcxt->context_dsa != NULL)
+ {
+ dsa_detach(pcxt->context_dsa);
+ pcxt->context_dsa = NULL;
+ }
+
/*
* If we have allocated a shared memory segment, detach it. This will
* implicitly detach the error queues, and any other shared memory queues,
@@ -938,6 +984,9 @@ ParallelWorkerMain(Datum main_arg)
char *asnapspace;
char *tstatespace;
StringInfoData msgbuf;
+ char *typmod_registry_space;
+ char *context_dsa_space;
+ dsa_area *context_dsa;
/* Set flag to indicate that we're initializing a parallel worker. */
InitializingParallelWorker = true;
@@ -1069,6 +1118,20 @@ ParallelWorkerMain(Datum main_arg)
Assert(combocidspace != NULL);
RestoreComboCIDState(combocidspace);
+ /* Attach to the DSA area. */
+ context_dsa_space = shm_toc_lookup(toc, PARALLEL_KEY_CONTEXT_DSA);
+ Assert(context_dsa_space != NULL);
+ context_dsa = dsa_attach_in_place(context_dsa_space, seg);
+
+ /* Attach to shared record type registry. */
+ typmod_registry_space =
+ shm_toc_lookup(toc, PARALLEL_KEY_RECORD_TYPMOD_REGISTRY);
+ Assert(typmod_registry_space != NULL);
+ SharedRecordTypmodRegistryAttach((SharedRecordTypmodRegistry *)
+ typmod_registry_space,
+ context_dsa,
+ seg);
+
/* Restore transaction snapshot. */
tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT);
Assert(tsnapspace != NULL);
@@ -1114,6 +1177,9 @@ ParallelWorkerMain(Datum main_arg)
/* Must pop active snapshot so resowner.c doesn't complain. */
PopActiveSnapshot();
+ /* Detach from context-scoped DSA area. */
+ dsa_detach(context_dsa);
+
/* Shut down the parallel-worker transaction. */
EndParallelWorkerTransaction();
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 35536e47894..0d7996b5205 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -494,7 +494,7 @@ RegisterLWLockTranches(void)
if (LWLockTrancheArray == NULL)
{
- LWLockTranchesAllocated = 64;
+ LWLockTranchesAllocated = 128;
LWLockTrancheArray = (char **)
MemoryContextAllocZero(TopMemoryContext,
LWLockTranchesAllocated * sizeof(char *));
@@ -510,7 +510,13 @@ RegisterLWLockTranches(void)
"predicate_lock_manager");
LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA,
"parallel_query_dsa");
+ LWLockRegisterTranche(LWTRANCHE_PARALLEL_CONTEXT_DSA,
+ "parallel_context_dsa");
LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
+ LWLockRegisterTranche(LWTRANCHE_SHARED_RECORD_ATTS_INDEX,
+ "shared_record_atts_index");
+ LWLockRegisterTranche(LWTRANCHE_SHARED_RECORD_TYPMOD_INDEX,
+ "shared_record_typmods_index");
/* Register named tranches. */
for (i = 0; i < NamedLWLockTrancheRequests; i++)
diff --git a/src/backend/utils/cache/typcache.c b/src/backend/utils/cache/typcache.c
index 0cf5001a758..f26b0a7a58f 100644
--- a/src/backend/utils/cache/typcache.c
+++ b/src/backend/utils/cache/typcache.c
@@ -55,7 +55,9 @@
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "executor/executor.h"
+#include "lib/dht.h"
#include "optimizer/planner.h"
+#include "storage/lwlock.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/fmgroids.h"
@@ -67,7 +69,6 @@
#include "utils/syscache.h"
#include "utils/typcache.h"
-
/* The main type cache hashtable searched by lookup_type_cache */
static HTAB *TypeCacheHash = NULL;
@@ -148,12 +149,95 @@ typedef struct RecordCacheEntry
List *tupdescs;
} RecordCacheEntry;
+/*
+ * A mechanism for sharing record typmods between backends.
+ */
+struct SharedRecordTypmodRegistry
+{
+ dht_hash_table_handle atts_index_handle;
+ dht_hash_table_handle typmod_index_handle;
+ pg_atomic_uint32 next_typmod;
+};
+
+/*
+ * A flattened/serialized representation of a TupleDesc for use in shared
+ * memory. Can be converted to and from regular TupleDesc format. Doesn't
+ * support constraints and doesn't store the actual type OID, because this is
+ * only for use with RECORD types as created by CreateTupleDesc(). These are
+ * arranged into a linked list, in the hash table entry corresponding to the
+ * OIDs of the first 16 attributes, so we'd expect to get more than one entry
+ * in the list when names and other properties differ.
+ */
+typedef struct SerializedTupleDesc
+{
+ dsa_pointer next; /* next with the same attribute OIDs */
+ int natts; /* number of attributes in the tuple */
+ int32 typmod; /* typmod for tuple type */
+ bool hasoid; /* tuple has oid attribute in its header */
+
+ /*
+ * The attributes follow. We only ever access the first
+ * ATTRIBUTE_FIXED_PART_SIZE bytes of each element, like the code in
+ * tupdesc.c.
+ */
+ FormData_pg_attribute attributes[FLEXIBLE_ARRAY_MEMBER];
+} SerializedTupleDesc;
+
+/*
+ * An entry in SharedRecordTypmodRegistry's attribute index. The key is the
+ * first REC_HASH_KEYS attribute OIDs. That means that collisions are
+ * possible, but that's OK because SerializedTupleDesc objects are arranged
+ * into a list.
+ */
+typedef struct SRTRAttsIndexEntry
+{
+ Oid leading_attr_oids[REC_HASH_KEYS];
+ dsa_pointer serialized_tupdesc;
+} SRTRAttsIndexEntry;
+
+/*
+ * An entry in SharedRecordTypmodRegistry's typmod index. Points to a single
+ * SerializedTupleDesc in shared memory.
+ */
+typedef struct SRTRTypmodIndexEntry
+{
+ uint32 typmod;
+ dsa_pointer serialized_tupdesc;
+} SRTRTypmodIndexEntry;
+
+/* Parameters for SharedRecordTypmodRegistry's attributes hash table. */
+static const dht_parameters srtr_atts_index_params = {
+ sizeof(Oid) * REC_HASH_KEYS,
+ sizeof(SRTRAttsIndexEntry),
+ memcmp,
+ tag_hash,
+ LWTRANCHE_SHARED_RECORD_ATTS_INDEX
+};
+
+/* Parameters for SharedRecordTypmodRegistry's typmod hash table. */
+static const dht_parameters srtr_typmod_index_params = {
+ sizeof(uint32),
+ sizeof(SRTRTypmodIndexEntry),
+ memcmp,
+ tag_hash,
+ LWTRANCHE_SHARED_RECORD_TYPMOD_INDEX
+};
+
static HTAB *RecordCacheHash = NULL;
static TupleDesc *RecordCacheArray = NULL;
static int32 RecordCacheArrayLen = 0; /* allocated length of array */
static int32 NextRecordTypmod = 0; /* number of entries used */
+/* Current SharedRecordTypmodRegistry, if attached. */
+static struct
+{
+ SharedRecordTypmodRegistry *shared;
+ dht_hash_table *atts_index;
+ dht_hash_table *typmod_index;
+ dsa_area *area;
+} CurrentSharedRecordTypmodRegistry;
+
static void load_typcache_tupdesc(TypeCacheEntry *typentry);
static void load_rangetype_info(TypeCacheEntry *typentry);
static void load_domaintype_info(TypeCacheEntry *typentry);
@@ -174,6 +258,13 @@ static void TypeCacheConstrCallback(Datum arg, int cacheid, uint32 hashvalue);
static void load_enum_cache_data(TypeCacheEntry *tcache);
static EnumItem *find_enumitem(TypeCacheEnumData *enumdata, Oid arg);
static int enum_oid_cmp(const void *left, const void *right);
+static void shared_record_typmod_registry_detach(dsm_segment *segment,
+ Datum datum);
+static int32 find_or_allocate_shared_record_typmod(TupleDesc tupdesc);
+static TupleDesc deserialize_tupledesc(const SerializedTupleDesc *serialized);
+static dsa_pointer serialize_tupledesc(dsa_area *area,
+ const TupleDesc tupdesc);
+
/*
@@ -1199,6 +1290,33 @@ cache_record_field_properties(TypeCacheEntry *typentry)
typentry->flags |= TCFLAGS_CHECKED_FIELD_PROPERTIES;
}
+/*
+ * Make sure that RecordCacheArray is large enough to store 'typmod'.
+ */
+static void
+ensure_record_cache_typmod_slot_exists(int32 typmod)
+{
+ if (RecordCacheArray == NULL)
+ {
+ RecordCacheArray = (TupleDesc *)
+ MemoryContextAllocZero(CacheMemoryContext, 64 * sizeof(TupleDesc));
+ RecordCacheArrayLen = 64;
+ }
+
+ if (typmod >= RecordCacheArrayLen)
+ {
+ int32 newlen = RecordCacheArrayLen * 2;
+
+ while (typmod >= newlen)
+ newlen *= 2;
+
+ RecordCacheArray = (TupleDesc *) repalloc(RecordCacheArray,
+ newlen * sizeof(TupleDesc));
+ memset(RecordCacheArray + RecordCacheArrayLen, 0,
+ (newlen - RecordCacheArrayLen) * sizeof(TupleDesc *));
+ RecordCacheArrayLen = newlen;
+ }
+}
/*
* lookup_rowtype_tupdesc_internal --- internal routine to lookup a rowtype
@@ -1229,15 +1347,49 @@ lookup_rowtype_tupdesc_internal(Oid type_id, int32 typmod, bool noError)
/*
* It's a transient record type, so look in our record-type table.
*/
- if (typmod < 0 || typmod >= NextRecordTypmod)
+ if (typmod >= 0)
{
- if (!noError)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("record type has not been registered")));
- return NULL;
+ /* Is it already in our local cache? */
+ if (typmod < RecordCacheArrayLen &&
+ RecordCacheArray[typmod] != NULL)
+ return RecordCacheArray[typmod];
+
+ /* Are we attached to a SharedRecordTypmodRegistry? */
+ if (CurrentSharedRecordTypmodRegistry.shared != NULL)
+ {
+ SRTRTypmodIndexEntry *entry;
+
+ /* Try to find it in the shared typmod index. */
+ entry = dht_find(CurrentSharedRecordTypmodRegistry.typmod_index,
+ &typmod, false);
+ if (entry != NULL)
+ {
+ SerializedTupleDesc *serialized;
+
+ serialized = (SerializedTupleDesc *)
+ dsa_get_address(CurrentSharedRecordTypmodRegistry.area,
+ entry->serialized_tupdesc);
+ Assert(typmod == serialized->typmod);
+
+ /* We may need to extend the local RecordCacheArray. */
+ ensure_record_cache_typmod_slot_exists(typmod);
+
+ /* Produce and cache a TupleDesc. */
+ RecordCacheArray[typmod] =
+ deserialize_tupledesc(serialized);
+ dht_release(CurrentSharedRecordTypmodRegistry.typmod_index,
+ entry);
+
+ return RecordCacheArray[typmod];
+ }
+ }
}
- return RecordCacheArray[typmod];
+
+ if (!noError)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("record type has not been registered")));
+ return NULL;
}
}
@@ -1362,30 +1514,27 @@ assign_record_type_typmod(TupleDesc tupDesc)
}
}
+ /* Look in the SharedRecordTypmodRegistry, if attached */
+ newtypmod = find_or_allocate_shared_record_typmod(tupDesc);
+
/* Not present, so need to manufacture an entry */
oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
- if (RecordCacheArray == NULL)
- {
- RecordCacheArray = (TupleDesc *) palloc(64 * sizeof(TupleDesc));
- RecordCacheArrayLen = 64;
- }
- else if (NextRecordTypmod >= RecordCacheArrayLen)
- {
- int32 newlen = RecordCacheArrayLen * 2;
-
- RecordCacheArray = (TupleDesc *) repalloc(RecordCacheArray,
- newlen * sizeof(TupleDesc));
- RecordCacheArrayLen = newlen;
- }
+ /*
+ * Whether we just got a new typmod from a SharedRecordTypmodRegistry or
+ * we're allocating one locally, make sure the RecordCacheArray is big
+ * enough.
+ */
+ ensure_record_cache_typmod_slot_exists(Max(NextRecordTypmod, newtypmod));
/* if fail in subrs, no damage except possibly some wasted memory... */
entDesc = CreateTupleDescCopy(tupDesc);
recentry->tupdescs = lcons(entDesc, recentry->tupdescs);
/* mark it as a reference-counted tupdesc */
entDesc->tdrefcount = 1;
- /* now it's safe to advance NextRecordTypmod */
- newtypmod = NextRecordTypmod++;
+ /* now it's safe to advance NextRecordTypmod, if allocating locally */
+ if (newtypmod == -1)
+ newtypmod = NextRecordTypmod++;
entDesc->tdtypmod = newtypmod;
RecordCacheArray[newtypmod] = entDesc;
@@ -1396,6 +1545,176 @@ assign_record_type_typmod(TupleDesc tupDesc)
}
/*
+ * Return the amount of shmem required to hold a SharedRecordTypmodRegistry.
+ * This exists only to avoid exposing private innards of
+ * SharedRecordTypmodRegistry in a header.
+ */
+size_t
+SharedRecordTypmodRegistryEstimate(void)
+{
+ return sizeof(SharedRecordTypmodRegistry);
+}
+
+/*
+ * Initialize 'registry' in a pre-existing shared memory region, which must be
+ * maximally aligned and have space for SharedRecordTypmodRegistryEstimate()
+ * bytes.
+ *
+ * 'area' will be used to allocate shared memory space as required for the
+ * typmod registration. The current process, expected to be a leader process
+ * in a parallel query, will be attached automatically and its current record
+ * types will be loaded into the registry. While attached, all calls to
+ * assign_record_type_typmod will use the shared registry. Other backends
+ * will need to attach explicitly.
+ *
+ * An on-detach callback will be installed for 'segment', so that normal
+ * private record type cache behavior can be restored when the DSM segment
+ * goes away.
+ */
+void
+SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *registry,
+ dsa_area *area,
+ dsm_segment *segment)
+{
+ dht_hash_table *atts_index;
+ dht_hash_table *typmod_index;
+ int32 typmod;
+
+ /* We can't already be attached to a shared registry. */
+ Assert(CurrentSharedRecordTypmodRegistry.shared == NULL);
+ Assert(CurrentSharedRecordTypmodRegistry.atts_index == NULL);
+ Assert(CurrentSharedRecordTypmodRegistry.typmod_index == NULL);
+ Assert(CurrentSharedRecordTypmodRegistry.area == NULL);
+
+ /* Create the hash table indexed by attribute OIDs. */
+ atts_index = dht_create(area, &srtr_atts_index_params);
+ registry->atts_index_handle = dht_get_hash_table_handle(atts_index);
+
+ /* Create the hash table indexed by typmod. */
+ typmod_index = dht_create(area, &srtr_typmod_index_params);
+ registry->typmod_index_handle = dht_get_hash_table_handle(typmod_index);
+
+ /* Initialize the 'next typmod' to this backend's next value. */
+ pg_atomic_init_u32(&registry->next_typmod, NextRecordTypmod);
+
+ /*
+ * Copy all entries from this backend's private registry into the shared
+ * registry.
+ */
+ for (typmod = 0; typmod < NextRecordTypmod; ++typmod)
+ {
+ SRTRTypmodIndexEntry *typmod_index_entry;
+ SRTRAttsIndexEntry *atts_index_entry;
+ SerializedTupleDesc *serialized;
+ dsa_pointer serialized_dp;
+ TupleDesc tupdesc;
+ Oid atts_key[REC_HASH_KEYS];
+ bool found;
+ int i;
+
+ tupdesc = RecordCacheArray[typmod];
+ if (tupdesc == NULL)
+ continue;
+
+ /* Serialize the TupleDesc into shared memory. */
+ serialized_dp = serialize_tupledesc(area, tupdesc);
+
+ /* Insert into the typmod index. */
+ typmod_index_entry = dht_find_or_insert(typmod_index,
+ &tupdesc->tdtypmod,
+ &found);
+ if (found)
+ elog(ERROR, "cannot create duplicate shared record typmod");
+ typmod_index_entry->typmod = tupdesc->tdtypmod;
+ typmod_index_entry->serialized_tupdesc = serialized_dp;
+ dht_release(typmod_index, typmod_index_entry);
+
+ /* Insert into the attributes index. */
+ memset(atts_key, 0, sizeof(atts_key));
+ for (i = 0; i < Min(tupdesc->natts, REC_HASH_KEYS); ++i)
+ atts_key[i] = tupdesc->attrs[i]->atttypid;
+ atts_index_entry = dht_find_or_insert(atts_index, &atts_key, &found);
+ if (!found)
+ {
+ memcpy(atts_index_entry->leading_attr_oids,
+ atts_key,
+ sizeof(atts_key));
+ atts_index_entry->serialized_tupdesc = InvalidDsaPointer;
+ }
+
+ /* Push onto list. */
+ serialized = (SerializedTupleDesc *)
+ dsa_get_address(area, serialized_dp);
+ serialized->next = atts_index_entry->serialized_tupdesc;
+ atts_index_entry->serialized_tupdesc = serialized_dp;
+ dht_release(atts_index, atts_index_entry);
+ }
+
+ /* Set up our detach hook so that we can return to private cache mode. */
+ on_dsm_detach(segment, shared_record_typmod_registry_detach,
+ PointerGetDatum(registry));
+
+ /*
+ * Set up the global state that will tell assign_record_type_typmod and
+ * lookup_rowtype_tupdesc_internal about the shared registry.
+ */
+ CurrentSharedRecordTypmodRegistry.shared = registry;
+ CurrentSharedRecordTypmodRegistry.atts_index = atts_index;
+ CurrentSharedRecordTypmodRegistry.typmod_index = typmod_index;
+ CurrentSharedRecordTypmodRegistry.area = area;
+}
+
+/*
+ * Attach to 'registry', which must have been initialized already by another
+ * backend. Future calls to assign_record_type_typmod and
+ * lookup_rowtype_tupdesc_internal will use the shared registry, until
+ * 'segment' is detached.
+ */
+void
+SharedRecordTypmodRegistryAttach(SharedRecordTypmodRegistry *registry,
+ dsa_area *area,
+ dsm_segment *segment)
+{
+ dht_hash_table *atts_index;
+ dht_hash_table *typmod_index;
+
+ /* We can't already be attached to a shared registry. */
+ Assert(CurrentSharedRecordTypmodRegistry.shared == NULL);
+ Assert(CurrentSharedRecordTypmodRegistry.atts_index == NULL);
+ Assert(CurrentSharedRecordTypmodRegistry.typmod_index == NULL);
+ Assert(CurrentSharedRecordTypmodRegistry.area == NULL);
+
+ /*
+ * We can't already have typmods in our local cache, because they'd clash
+ * with those imported by SharedRecordTypmodRegistryInit. This should be a
+ * freshly started parallel worker. If we ever support worker recycling,
+ * a worker would need to zap its local cache in between servicing
+ * different queries, in order to be able to call this and synchronize
+ * typmods with a new leader.
+ */
+ Assert(NextRecordTypmod == 0);
+
+ /* Attach to the two hash tables. */
+ atts_index = dht_attach(area, &srtr_atts_index_params,
+ registry->atts_index_handle);
+ typmod_index = dht_attach(area, &srtr_typmod_index_params,
+ registry->typmod_index_handle);
+
+ /* Set up our detach hook so that we can return to private cache mode. */
+ on_dsm_detach(segment, shared_record_typmod_registry_detach,
+ PointerGetDatum(registry));
+
+ /*
+ * Set up the global state that will tell assign_record_type_typmod and
+ * lookup_rowtype_tupdesc_internal about the shared registry.
+ */
+ CurrentSharedRecordTypmodRegistry.shared = registry;
+ CurrentSharedRecordTypmodRegistry.atts_index = atts_index;
+ CurrentSharedRecordTypmodRegistry.typmod_index = typmod_index;
+ CurrentSharedRecordTypmodRegistry.area = area;
+}
+
+/*
* TypeCacheRelCallback
* Relcache inval callback function
*
@@ -1809,3 +2128,225 @@ enum_oid_cmp(const void *left, const void *right)
else
return 0;
}
+
+/*
+ * Serialize a TupleDesc into a SerializedTupleDesc in DSA area 'area', and
+ * return a dsa_pointer.
+ */
+static dsa_pointer
+serialize_tupledesc(dsa_area *area, const TupleDesc tupdesc)
+{
+ SerializedTupleDesc *serialized;
+ dsa_pointer serialized_dp;
+ size_t size;
+ int i;
+
+ size = offsetof(SerializedTupleDesc, attributes) +
+ sizeof(FormData_pg_attribute) * tupdesc->natts;
+ serialized_dp = dsa_allocate(area, size);
+ serialized = (SerializedTupleDesc *) dsa_get_address(area, serialized_dp);
+
+ serialized->natts = tupdesc->natts;
+ serialized->typmod = tupdesc->tdtypmod;
+ serialized->hasoid = tupdesc->tdhasoid;
+ for (i = 0; i < tupdesc->natts; ++i)
+ memcpy(&serialized->attributes[i], tupdesc->attrs[i],
+ ATTRIBUTE_FIXED_PART_SIZE);
+
+ return serialized_dp;
+}
+
+/*
+ * Deserialize a SerializedTupleDesc to produce a TupleDesc. The result is
+ * allocated in CacheMemoryContext and has a refcount of 1.
+ */
+static TupleDesc
+deserialize_tupledesc(const SerializedTupleDesc *serialized)
+{
+ Form_pg_attribute *attributes;
+ MemoryContext oldctxt;
+ TupleDesc tupdesc;
+ int i;
+
+ /*
+ * We have an array of FormData_pg_attribute but we need an array of
+ * pointers to FormData_pg_attribute.
+ */
+ oldctxt = MemoryContextSwitchTo(CacheMemoryContext);
+ attributes = palloc(sizeof(Form_pg_attribute) * serialized->natts);
+ for (i = 0; i < serialized->natts; ++i)
+ {
+ attributes[i] = palloc(ATTRIBUTE_FIXED_PART_SIZE);
+ memcpy(attributes[i], &serialized->attributes[i],
+ ATTRIBUTE_FIXED_PART_SIZE);
+ }
+ tupdesc =
+ CreateTupleDesc(serialized->natts, serialized->hasoid, attributes);
+ tupdesc->tdtypmod = serialized->typmod;
+ tupdesc->tdrefcount = 1;
+ MemoryContextSwitchTo(oldctxt);
+
+ return tupdesc;
+}
+
+/*
+ * We can't use equalTupleDescs to compare a SerializedTupleDesc with a
+ * TupleDesc, but we don't want to allocate memory just to compare. This
+ * function produces the same result without deserializing first.
+ */
+static bool
+serialized_tupledesc_matches(SerializedTupleDesc *serialized,
+ TupleDesc tupdesc)
+{
+ int i;
+
+ if (serialized->natts != tupdesc->natts ||
+ serialized->hasoid != tupdesc->tdhasoid ||
+ tupdesc->constr != NULL)
+ return false;
+
+ for (i = 0; i < serialized->natts; ++i)
+ {
+ if (!equalTupleDescAttrs(&serialized->attributes[i],
+ tupdesc->attrs[i]))
+ return false;
+ }
+
+ return true;
+}
+
+/*
+ * If we are attached to a SharedRecordTypmodRegistry, find or create a
+ * SerializedTupleDesc that matches 'tupdesc', and return its typmod.
+ * Otherwise return -1.
+ */
+static int32
+find_or_allocate_shared_record_typmod(TupleDesc tupdesc)
+{
+ SRTRAttsIndexEntry *atts_index_entry;
+ SRTRTypmodIndexEntry *typmod_index_entry;
+ SerializedTupleDesc *serialized;
+ dsa_pointer serialized_dp;
+ Oid hashkey[REC_HASH_KEYS];
+ bool found;
+ int32 typmod;
+ int i;
+
+ /* If not even attached, nothing to do. */
+ if (CurrentSharedRecordTypmodRegistry.shared == NULL)
+ return -1;
+
+ /* Try to find a match. */
+ memset(hashkey, 0, sizeof(hashkey));
+ for (i = 0; i < tupdesc->natts; ++i)
+ hashkey[i] = tupdesc->attrs[i]->atttypid;
+ atts_index_entry = (SRTRAttsIndexEntry *)
+ dht_find_or_insert(CurrentSharedRecordTypmodRegistry.atts_index,
+ hashkey,
+ &found);
+ if (!found)
+ {
+ /* Making a new entry. */
+ memcpy(atts_index_entry->leading_attr_oids,
+ hashkey,
+ sizeof(hashkey));
+ atts_index_entry->serialized_tupdesc = InvalidDsaPointer;
+ }
+
+ /* Scan the list we found for a matching serialized one. */
+ serialized_dp = atts_index_entry->serialized_tupdesc;
+ while (DsaPointerIsValid(serialized_dp))
+ {
+ serialized =
+ dsa_get_address(CurrentSharedRecordTypmodRegistry.area,
+ serialized_dp);
+ if (serialized_tupledesc_matches(serialized, tupdesc))
+ {
+ /* Found a match, we are finished. */
+ typmod = serialized->typmod;
+ dht_release(CurrentSharedRecordTypmodRegistry.atts_index,
+ atts_index_entry);
+ return typmod;
+ }
+ serialized_dp = serialized->next;
+ }
+
+ /* We didn't find a matching entry, so let's allocate a new one. */
+ typmod = (int)
+ pg_atomic_fetch_add_u32(&CurrentSharedRecordTypmodRegistry.shared->next_typmod,
+ 1);
+
+ /* Allocate shared memory and serialize the TupleDesc. */
+ serialized_dp = serialize_tupledesc(CurrentSharedRecordTypmodRegistry.area,
+ tupdesc);
+ serialized = (SerializedTupleDesc *)
+ dsa_get_address(CurrentSharedRecordTypmodRegistry.area, serialized_dp);
+ serialized->typmod = typmod;
+
+ /*
+ * While we still hold the atts_index entry locked, add this to
+ * typmod_index. That's important because we don't want anyone to be able
+ * to find a typmod via the former that can't yet be looked up in the
+ * latter.
+ */
+ typmod_index_entry =
+ dht_find_or_insert(CurrentSharedRecordTypmodRegistry.typmod_index,
+ &typmod, &found);
+ if (found)
+ elog(ERROR, "cannot create duplicate shared record typmod");
+ typmod_index_entry->typmod = typmod;
+ typmod_index_entry->serialized_tupdesc = serialized_dp;
+ dht_release(CurrentSharedRecordTypmodRegistry.typmod_index,
+ typmod_index_entry);
+
+ /* Push onto the front of list in atts_index_entry. */
+ serialized->next = atts_index_entry->serialized_tupdesc;
+ atts_index_entry->serialized_tupdesc = serialized_dp;
+
+ dht_release(CurrentSharedRecordTypmodRegistry.atts_index,
+ atts_index_entry);
+
+ return typmod;
+}
+
+/*
+ * DSM segment detach hook used to disconnect this backend's record typmod
+ * cache from the shared registry. Detaching from the
+ * SharedRecordTypmodRegistry returns this backend to local typmod cache mode
+ * until such time as another parallel query runs.
+ */
+static void
+shared_record_typmod_registry_detach(dsm_segment *segment, Datum datum)
+{
+ SharedRecordTypmodRegistry *shared;
+
+ shared = (SharedRecordTypmodRegistry *) DatumGetPointer(datum);
+
+ /*
+ * XXX Should we now copy all entries from shared memory into the
+ * backend's local cache? That depends on whether you think that there is
+ * any chance this backend could see any shared tuples created by other
+ * backends after this detach operation. If tuples somehow survived from
+ * query to query, that would be true. But presently I don't think they
+ * do, if we assume that all mechanisms that allow us to receive tuples
+ * from other backends are linked to DSM segment mapping lifetime (tuple
+ * queues, shared hash tables, shared temporary files).
+ *
+ * The only thing we need to synchronize to return to local-typmod-cache
+ * mode is NextRecordTypmod. That means that we can resume generating new
+ * backend-local entries that don't clash.
+ */
+ NextRecordTypmod = pg_atomic_read_u32(&shared->next_typmod);
+
+ /*
+ * We don't free the SharedRecordTypmodRegistry's DSM memory, though we
+ * could using a reference counting scheme if we wanted to. There doesn't
+ * seem to be any point because the whole DSA area will be going away
+ * automatically when the DSM segment containing it is destroyed,
+ * conceptually like a MemoryContext.
+ */
+ CurrentSharedRecordTypmodRegistry.shared = NULL;
+ CurrentSharedRecordTypmodRegistry.atts_index = NULL;
+ CurrentSharedRecordTypmodRegistry.typmod_index = NULL;
+ CurrentSharedRecordTypmodRegistry.area = NULL;
+}
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 590e27a4845..b5781c28693 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -19,6 +19,7 @@
#include "postmaster/bgworker.h"
#include "storage/shm_mq.h"
#include "storage/shm_toc.h"
+#include "utils/dsa.h"
typedef void (*parallel_worker_main_type) (dsm_segment *seg, shm_toc *toc);
@@ -40,6 +41,7 @@ typedef struct ParallelContext
ErrorContextCallback *error_context_stack;
shm_toc_estimator estimator;
dsm_segment *seg;
+ dsa_area *context_dsa;
void *private_memory;
shm_toc *toc;
ParallelWorkerInfo *worker;
diff --git a/src/include/access/tupdesc.h b/src/include/access/tupdesc.h
index b48f839028b..97a73b1483e 100644
--- a/src/include/access/tupdesc.h
+++ b/src/include/access/tupdesc.h
@@ -110,6 +110,9 @@ extern void DecrTupleDescRefCount(TupleDesc tupdesc);
DecrTupleDescRefCount(tupdesc); \
} while (0)
+extern bool equalTupleDescAttrs(Form_pg_attribute attr1,
+ Form_pg_attribute attr2);
+
extern bool equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
extern void TupleDescInitEntry(TupleDesc desc,
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 0cd45bb6d8e..c0117f7bafd 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -212,7 +212,10 @@ typedef enum BuiltinTrancheIds
LWTRANCHE_LOCK_MANAGER,
LWTRANCHE_PREDICATE_LOCK_MANAGER,
LWTRANCHE_PARALLEL_QUERY_DSA,
+ LWTRANCHE_PARALLEL_CONTEXT_DSA,
LWTRANCHE_TBM,
+ LWTRANCHE_SHARED_RECORD_ATTS_INDEX,
+ LWTRANCHE_SHARED_RECORD_TYPMOD_INDEX,
LWTRANCHE_FIRST_USER_DEFINED
} BuiltinTrancheIds;
diff --git a/src/include/utils/typcache.h b/src/include/utils/typcache.h
index 1bf94e2548d..861610a2835 100644
--- a/src/include/utils/typcache.h
+++ b/src/include/utils/typcache.h
@@ -18,6 +18,8 @@
#include "access/tupdesc.h"
#include "fmgr.h"
+#include "storage/dsm.h"
+#include "utils/dsa.h"
/* DomainConstraintCache is an opaque struct known only within typcache.c */
@@ -139,6 +141,7 @@ typedef struct DomainConstraintRef
MemoryContextCallback callback; /* used to release refcount when done */
} DomainConstraintRef;
+typedef struct SharedRecordTypmodRegistry SharedRecordTypmodRegistry;
extern TypeCacheEntry *lookup_type_cache(Oid type_id, int flags);
@@ -160,4 +163,12 @@ extern void assign_record_type_typmod(TupleDesc tupDesc);
extern int compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2);
+extern size_t SharedRecordTypmodRegistryEstimate(void);
+extern void SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *,
+ dsa_area *area,
+ dsm_segment *seg);
+extern void SharedRecordTypmodRegistryAttach(SharedRecordTypmodRegistry *,
+ dsa_area *area,
+ dsm_segment *seg);
+
#endif /* TYPCACHE_H */
On Tue, May 30, 2017 at 1:09 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
* Perhaps simplehash + an LWLock would be better than dht, but I
haven't looked into that. Can it be convinced to work in DSA memory
and to grow on demand?
Simplehash provides an option to supply your own allocator function.
So in the allocator function, you can allocate memory from DSA.
After the table reaches some threshold it expands (doubles) in size and
will again call the allocator function to allocate the bigger chunk of
memory. You can refer to pagetable_allocate in tidbitmap.c.
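Something like this, roughly (a sketch only to show the shape; my_hash
and MySharedState are made-up names, not code from tidbitmap.c):
/*
 * Sketch of a simplehash allocator that carves its space out of a DSA
 * area, in the style of pagetable_allocate().  It gets wired up by
 * defining SH_USE_NONDEFAULT_ALLOCATOR before including simplehash.h.
 */
static inline void *
my_hash_allocate(my_hash_hash *hashp, Size size)
{
    MySharedState *state = (MySharedState *) hashp->private_data;

    /* Remember the dsa_pointer so the space can be freed later. */
    state->table_dp = dsa_allocate_extended(state->area, size,
                                            DSA_ALLOC_ZERO | DSA_ALLOC_HUGE);

    /* simplehash itself only ever sees a backend-local address. */
    return dsa_get_address(state->area, state->table_dp);
}

static inline void
my_hash_free(my_hash_hash *hashp, void *pointer)
{
    MySharedState *state = (MySharedState *) hashp->private_data;

    dsa_free(state->area, state->table_dp);
}
The trick is that simplehash only works with backend-local pointers,
and the dsa_pointer is remembered on the side.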
--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
On Tue, May 30, 2017 at 2:45 AM, Dilip Kumar <dilipbalaut@gmail.com> wrote:
On Tue, May 30, 2017 at 1:09 AM, Thomas Munro
<thomas.munro@enterprisedb.com> wrote:
* Perhaps simplehash + an LWLock would be better than dht, but I
haven't looked into that. Can it be convinced to work in DSA memory
and to grow on demand?
Simplehash provides an option to provide your own allocator function
to it. So in the allocator function, you can allocate memory from DSA.
After it reaches some threshold it expands the size (double) and it
will again call the allocator function to allocate the bigger memory.
You can refer pagetable_allocate in tidbitmap.c.
That only allows the pagetable to be shared, not the hash table itself.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Wed, May 31, 2017 at 10:57 AM, Robert Haas <robertmhaas@gmail.com> wrote:
Simplehash provides an option to provide your own allocator function
to it. So in the allocator function, you can allocate memory from DSA.
After it reaches some threshold it expands the size (double) and it
will again call the allocator function to allocate the bigger memory.
You can refer pagetable_allocate in tidbitmap.c.
That only allows the pagetable to be shared, not the hash table itself.
I agree with you. But, if I understand the use case correctly, we need
to store the TupleDesc for the RECORD in a shared hash so that it can be
shared across multiple processes. I think this can be achieved with
simplehash as well.
For getting this done, we need some fixed shared memory for holding the
static members of SH_TYPE, and the process which creates the simplehash
will be responsible for copying these static members to the shared
location so that other processes can access the SH_TYPE. And the
dynamic part (the actual hash entries) can be allocated using DSA by
registering an SH_ALLOCATE function.
--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
On Wed, May 31, 2017 at 11:16 AM, Dilip Kumar <dilipbalaut@gmail.com> wrote:
I agree with you. But, if I understand the use case correctly we need
to store the TupleDesc for the RECORD in shared hash so that it can be
shared across multiple processes. I think this can be achieved with
the simplehash as well.
For getting this done, we need some fixed shared memory for holding
static members of SH_TYPE and the process which creates the simplehash
will be responsible for copying these static members to the shared
location so that other processes can access the SH_TYPE. And, the
dynamic part (the actual hash entries) can be allocated using DSA by
registering SH_ALLOCATE function.
Well, SH_TYPE's members SH_ELEMENT_TYPE *data and void *private_data
are not going to work in DSM, because they are pointers. You can
doubtless come up with a way around that problem, but I guess the
question is whether that's actually any better than just using DHT.
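To spell that out with a sketch (example_hash is invented here, not
simplehash's actual declaration), the control struct holds backend-local
addresses, which are meaningless to another process that mapped the
segment at a different address:
/* Sketch only -- not the real simplehash control struct. */
typedef struct example_hash
{
    uint64      size;           /* plain data: fine in shared memory */
    ExampleEntry *data;         /* process-local address: breaks in DSM */
    void       *private_data;   /* likewise */
} example_hash;

/* A DSA-friendly variant would have to store offsets instead: */
typedef struct example_shared_hash
{
    uint64      size;
    dsa_pointer data;           /* translated per-process by dsa_get_address() */
} example_shared_hash;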
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On Wed, May 31, 2017 at 12:53 PM, Robert Haas <robertmhaas@gmail.com> wrote:
Well, SH_TYPE's members SH_ELEMENT_TYPE *data and void *private_data
are not going to work in DSM, because they are pointers. You can
doubtless come up with a way around that problem, but I guess the
question is whether that's actually any better than just using DHT.
Probably I misunderstood the question. I assumed that we need to bring
in DHT only for achieving this goal. But, if the question is simply
the comparison of DHT vs simplehash for this particular case then I
agree that DHT is a more appropriate choice.
--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
On 2017-05-31 13:27:28 -0400, Dilip Kumar wrote:
On Wed, May 31, 2017 at 12:53 PM, Robert Haas <robertmhaas@gmail.com> wrote:
Well, SH_TYPE's members SH_ELEMENT_TYPE *data and void *private_data
are not going to work in DSM, because they are pointers. You can
doubtless come up with a way around that problem, but I guess the
question is whether that's actually any better than just using DHT.
Probably I misunderstood the question. I assumed that we need to bring
in DHT only for achieving this goal. But, if the question is simply
the comparison of DHT vs simplehash for this particular case then I
agree that DHT is a more appropriate choice.
Yea, I don't think simplehash is the best choice here. It's worthwhile
to use it for performance critical bits, but using it for everything
would just increase code size without much benefit. I'd tentatively
assume that anonymous record type aren't going to be super common, and
that this is going to be the biggest bottleneck if you use them.
- Andres
On Wed, May 31, 2017 at 1:46 PM, Andres Freund <andres@anarazel.de> wrote:
On 2017-05-31 13:27:28 -0400, Dilip Kumar wrote:
On Wed, May 31, 2017 at 12:53 PM, Robert Haas <robertmhaas@gmail.com> wrote:
Well, SH_TYPE's members SH_ELEMENT_TYPE *data and void *private_data
are not going to work in DSM, because they are pointers. You can
doubtless come up with a way around that problem, but I guess the
question is whether that's actually any better than just using DHT.
Probably I misunderstood the question. I assumed that we need to bring
in DHT only for achieving this goal. But, if the question is simply
the comparison of DHT vs simplehash for this particular case then I
agree that DHT is a more appropriate choice.
Yea, I don't think simplehash is the best choice here. It's worthwhile
to use it for performance critical bits, but using it for everything
would just increase code size without much benefit. I'd tentatively
assume that anonymous record type aren't going to be super common, and
that this is going to be the biggest bottleneck if you use them.
Did you mean "not going to be"?
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
On May 31, 2017 11:28:18 AM PDT, Robert Haas <robertmhaas@gmail.com> wrote:
On Wed, May 31, 2017 at 1:46 PM, Andres Freund <andres@anarazel.de> wrote:
On 2017-05-31 13:27:28 -0400, Dilip Kumar wrote:
On Wed, May 31, 2017 at 12:53 PM, Robert Haas <robertmhaas@gmail.com> wrote:
Well, SH_TYPE's members SH_ELEMENT_TYPE *data and void *private_data
are not going to work in DSM, because they are pointers. You can
doubtless come up with a way around that problem, but I guess the
question is whether that's actually any better than just using DHT.
Probably I misunderstood the question. I assumed that we need to bring
in DHT only for achieving this goal. But, if the question is simply
the comparison of DHT vs simplehash for this particular case then I
agree that DHT is a more appropriate choice.
Yea, I don't think simplehash is the best choice here. It's worthwhile
to use it for performance critical bits, but using it for everything
would just increase code size without much benefit. I'd tentatively
assume that anonymous record type aren't going to be super common, and
that this is going to be the biggest bottleneck if you use them.
Did you mean "not going to be"?
Err, yes. Thanks
On Thu, Jun 1, 2017 at 6:29 AM, Andres Freund <andres@anarazel.de> wrote:
On May 31, 2017 11:28:18 AM PDT, Robert Haas <robertmhaas@gmail.com> wrote:
On 2017-05-31 13:27:28 -0400, Dilip Kumar wrote:
[ ... various discussion in support of using DHT ... ]
Ok, good.
Here's a new version that introduces a per-session DSM segment to hold
the shared record typmod registry (and maybe more things later). The
per-session segment is created the first time you run a parallel query
(though there is handling for failure to allocate that allows the
parallel query to continue with no workers) and lives until your
leader backend exits. When parallel workers start up, they see its
handle in the per-query segment and attach to it, which puts
typcache.c into write-through cache mode so their idea of record
typmods stays in sync with the leader (and each other).
I also noticed that I could delete even more of tqueue.c than before:
it doesn't seem to have any remaining reason to need to know the
TupleDesc.
One way to test this code is to apply just
0003-rip-out-tqueue-remapping-v3.patch and then try the example from
the first message in this thread to see it break, and then try again
with the other two patches applied. By adding debugging trace you can
see that the worker pushes a bunch of TupleDescs into shmem, they get
pulled out by the leader when it sees the tuples, and then on a second
invocation the (new) worker can reuse them: it finds matches already
in shmem from the first invocation.
I used a DSM segment with a TOC and a DSA area inside that, like the
existing per-query DSM segment, but obviously you could spin it
various different ways. One example: just have a DSA area and make a
new kind of TOC thing that deals in dsa_pointers. Better ideas?
I believe combo CIDs should also go in there, to enable parallel
write, but I'm not 100% sure: that's neither per-session nor per-query
data, that's per-transaction. So perhaps the per-session DSM could
hold a per-session DSA and a per-transaction DSA, where the latter is
reset for each transaction, just like TopTransactionContext (though
dsa.c doesn't have a 'reset thyself' function currently). That seems
like a good place to store a shared combo CID hash table using DHT.
Thoughts?
--
Thomas Munro
http://www.enterprisedb.com
On 2017-07-10 21:39:09 +1200, Thomas Munro wrote:
Here's a new version that introduces a per-session DSM segment to hold
the shared record typmod registry (and maybe more things later).
You like to switch it up. *.patchset.tgz??? ;)
It does concern me that we're growing yet another somewhat different
hashtable implementation. Yet I don't quite see how we could avoid
it. dynahash relies on proper pointers, simplehash doesn't do locking
(and shouldn't) and also relies on pointers, although to a much lesser
degree. All the open coded tables aren't a good match either. So I
don't quite see an alternative, but I'd love one.
Regards,
Andres
diff --git a/src/backend/lib/dht.c b/src/backend/lib/dht.c
new file mode 100644
index 00000000000..2fec70f7742
--- /dev/null
+++ b/src/backend/lib/dht.c
FWIW, not a big fan of dht as a filename (nor of dsa.c). For one DHT
usually refers to distributed hash tables, which this is not, and for
another the abbreviation is so short it's not immediately
understandable, and likely to conflict further. I think it'd possibly
be ok to have dht as symbol prefixes, but rename the file to be longer.
+ * To deal with currency, it has a fixed size set of partitions, each of which
+ * is independently locked.
s/currency/concurrency/ I presume.
+ * Each bucket maps to a partition; so insert, find
+ * and iterate operations normally only acquire one lock. Therefore, good
+ * concurrency is achieved whenever they don't collide at the lock partition
s/they/operations/?
+ * level. However, when a resize operation begins, all partition locks must
+ * be acquired simultaneously for a brief period. This is only expected to
+ * happen a small number of times until a stable size is found, since growth is
+ * geometric.
I'm a bit doubtful that we need partitioning at this point, and that it
doesn't actually *degrade* performance for your typmod case.
+ * Resizing is done incrementally so that no individual insert operation pays
+ * for the potentially large cost of splitting all buckets.
I'm not sure this is a reasonable tradeoff for the use-case suggested so
far, it doesn't exactly make things simpler. We're not going to grow
much.
+/* The opaque type used for tracking iterator state. */
+struct dht_iterator;
+typedef struct dht_iterator dht_iterator;
Isn't it actually the iterator state? Rather than tracking it? Also, why
is it opaque given you're actually defining it below? Guess you'd moved
it at some point.
+/*
+ * The set of parameters needed to create or attach to a hash table. The
+ * members tranche_id and tranche_name do not need to be initialized when
+ * attaching to an existing hash table.
+ */
+typedef struct
+{
+ Size key_size; /* Size of the key (initial bytes of entry) */
+ Size entry_size; /* Total size of entry */
Let's use size_t, like we kind of concluded in the thread you started:
http://archives.postgresql.org/message-id/25076.1489699457%40sss.pgh.pa.us
:)
+ dht_compare_function compare_function; /* Compare function */
+ dht_hash_function hash_function; /* Hash function */
Might be worth explaining that these need to be provided when attaching
because they're possibly process local. Did you test this with
EXEC_BACKEND?
+ int tranche_id; /* The tranche ID to use for locks. */
+} dht_parameters;
+struct dht_iterator
+{
+ dht_hash_table *hash_table; /* The hash table we are iterating over. */
+ bool exclusive; /* Whether to lock buckets exclusively. */
+ Size partition; /* The index of the next partition to visit. */
+ Size bucket; /* The index of the next bucket to visit. */
+ dht_hash_table_item *item; /* The most recently returned item. */
+ dsa_pointer last_item_pointer; /* The last item visited. */
+ Size table_size_log2; /* The table size when we started iterating. */
+ bool locked; /* Whether the current partition is locked. */
Haven't gotten to the actual code yet, but this kinda suggest we leave a
partition locked when iterating? Hm, that seems likely to result in a
fair bit of pain...
+/* Iterating over the whole hash table. */
+extern void dht_iterate_begin(dht_hash_table *hash_table,
+ dht_iterator *iterator, bool exclusive);
+extern void *dht_iterate_next(dht_iterator *iterator);
+extern void dht_iterate_delete(dht_iterator *iterator);
s/delete/delete_current/? Otherwise it looks like it's part of
manipulating just the iterator.
+extern void dht_iterate_release(dht_iterator *iterator);
I'd add lock to the name.
+/*
+ * An item in the hash table. This wraps the user's entry object in an
+ * envelop that holds a pointer back to the bucket and a pointer to the next
+ * item in the bucket.
+ */
+struct dht_hash_table_item
+{
+ /* The hashed key, to avoid having to recompute it. */
+ dht_hash hash;
+ /* The next item in the same bucket. */
+ dsa_pointer next;
+ /* The user's entry object follows here. */
+ char entry[FLEXIBLE_ARRAY_MEMBER];
What's the point of using FLEXIBLE_ARRAY_MEMBER here? And isn't using a
char going to lead to alignment problems?
+/* The number of partitions for locking purposes. */
+#define DHT_NUM_PARTITIONS_LOG2 7
Could use some justification.
+/*
+ * The head object for a hash table. This will be stored in dynamic shared
+ * memory.
+ */
+typedef struct
+{
Why anonymous? Not that it hurts much, but seems weird to deviate just
here.
+/*
+ * Create a new hash table backed by the given dynamic shared area, with the
+ * given parameters.
+ */
+dht_hash_table *
+dht_create(dsa_area *area, const dht_parameters *params)
+{
+ dht_hash_table *hash_table;
+ dsa_pointer control;
+
+ /* Allocate the backend-local object representing the hash table. */
+ hash_table = palloc(sizeof(dht_hash_table));
Should be documented that this uses caller's MemoryContext.
+ /* Set up the array of lock partitions. */
+ {
+ int i;
+
+ for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+ {
+ LWLockInitialize(PARTITION_LOCK(hash_table, i),
+ hash_table->control->lwlock_tranche_id);
+ hash_table->control->partitions[i].count = 0;
+ }
I'd store hash_table->control->lwlock_tranche_id and partitions[i] in
local vars. Possibly hash_table->control too.
+/*
+ * Detach from a hash table. This frees backend-local resources associated
+ * with the hash table, but the hash table will continue to exist until it is
+ * either explicitly destroyed (by a backend that is still attached to it), or
+ * the area that backs it is returned to the operating system.
+ */
+void
+dht_detach(dht_hash_table *hash_table)
+{
+ /* The hash table may have been destroyed. Just free local memory. */
+ pfree(hash_table);
+}
Somewhat inclined to add debugging refcount - seems like bugs around
that might be annoying to find. Maybe also add an assert ensuring that
no locks are held?
+/*
+ * Look up an entry, given a key. Returns a pointer to an entry if one can be
+ * found with the given key. Returns NULL if the key is not found. If a
+ * non-NULL value is returned, the entry is locked and must be released by
+ * calling dht_release. If an error is raised before dht_release is called,
+ * the lock will be released automatically, but the caller must take care to
+ * ensure that the entry is not left corrupted. The lock mode is either
+ * shared or exclusive depending on 'exclusive'.
This API seems a bit fragile.
+/*
+ * Returns a pointer to an exclusively locked item which must be released with
+ * dht_release. If the key is found in the hash table, 'found' is set to true
+ * and a pointer to the existing entry is returned. If the key is not found,
+ * 'found' is set to false, and a pointer to a newly created entry is related.
"is related"?
+ */
+void *
+dht_find_or_insert(dht_hash_table *hash_table,
+ const void *key,
+ bool *found)
+{
+ size_t hash;
+ size_t partition_index;
+ dht_partition *partition;
+ dht_hash_table_item *item;
+
+ hash = hash_table->params.hash_function(key, hash_table->params.key_size);
+ partition_index = PARTITION_FOR_HASH(hash);
+ partition = &hash_table->control->partitions[partition_index];
+
+ Assert(hash_table->control->magic == DHT_MAGIC);
+ Assert(!hash_table->exclusively_locked);
Why just exclusively locked? Why'd shared be ok?
+/*
+ * Unlock an entry which was locked by dht_find or dht_find_or_insert.
+ */
+void
+dht_release(dht_hash_table *hash_table, void *entry)
+{
+ dht_hash_table_item *item = ITEM_FROM_ENTRY(entry);
+ size_t partition_index = PARTITION_FOR_HASH(item->hash);
+ bool deferred_resize_work = false;
+
+ Assert(hash_table->control->magic == DHT_MAGIC);
Assert lock held (LWLockHeldByMe())
+/*
+ * Begin iterating through the whole hash table. The caller must supply a
+ * dht_iterator object, which can then be used to call dht_iterate_next to get
+ * values until the end is reached.
+ */
+void
+dht_iterate_begin(dht_hash_table *hash_table,
+ dht_iterator *iterator,
+ bool exclusive)
+{
+ Assert(hash_table->control->magic == DHT_MAGIC);
+
+ iterator->hash_table = hash_table;
+ iterator->exclusive = exclusive;
+ iterator->partition = 0;
+ iterator->bucket = 0;
+ iterator->item = NULL;
+ iterator->last_item_pointer = InvalidDsaPointer;
+ iterator->locked = false;
+
+ /* Snapshot the size (arbitrary lock to prevent size changing). */
+ LWLockAcquire(PARTITION_LOCK(hash_table, 0), LW_SHARED);
+ ensure_valid_bucket_pointers(hash_table);
+ iterator->table_size_log2 = hash_table->size_log2;
+ LWLockRelease(PARTITION_LOCK(hash_table, 0));
Hm. So we're introducing some additional contention on partition 0 -
probably ok.
+/*
+ * Move to the next item in the hash table. Returns a pointer to an entry, or
+ * NULL if the end of the hash table has been reached. The item is locked in
+ * exclusive or shared mode depending on the argument given to
+ * dht_iterate_begin. The caller can optionally release the lock by calling
+ * dht_iterate_release, and then call dht_iterate_next again to move to the
+ * next entry. If the iteration is in exclusive mode, client code can also
+ * call dht_iterate_delete. When the end of the hash table is reached, or at
+ * any time, the client may call dht_iterate_end to abandon iteration.
+ */
I'd just shorten the end to "at any time the client may call
dht_iterate_end to ..."
+/*
+ * Release the most recently obtained lock. This can optionally be called in
+ * between calls to dht_iterator_next to allow other processes to access the
+ * same partition of the hash table.
+ */
+void
+dht_iterate_release(dht_iterator *iterator)
+{
+ Assert(iterator->locked);
+ LWLockRelease(PARTITION_LOCK(iterator->hash_table, iterator->partition));
+ iterator->locked = false;
+}
+
+/*
+ * Terminate iteration. This must be called after iteration completes,
+ * whether or not the end was reached. The iterator object may then be reused
+ * for another iteration.
+ */
+void
+dht_iterate_end(dht_iterator *iterator)
+{
+ Assert(iterator->hash_table->control->magic == DHT_MAGIC);
+ if (iterator->locked)
+ LWLockRelease(PARTITION_LOCK(iterator->hash_table,
+ iterator->partition));
+}
+
+/*
+ * Print out debugging information about the internal state of the hash table.
+ */
+void
+dht_dump(dht_hash_table *hash_table)
+{
+ size_t i;
+ size_t j;
+
+ Assert(hash_table->control->magic == DHT_MAGIC);
+
+ for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+ LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_SHARED);
Should probably assert & document that no locks are held - otherwise
there's going to be ugly deadlocks. And that's an unlikely thing to try.
+ ensure_valid_bucket_pointers(hash_table);
+
+ fprintf(stderr,
+ "hash table size = %zu\n", (size_t) 1 << hash_table->size_log2);
+ for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+ {
+ dht_partition *partition = &hash_table->control->partitions[i];
+ size_t begin = BUCKET_INDEX_FOR_PARTITION(i, hash_table->size_log2);
+ size_t end = BUCKET_INDEX_FOR_PARTITION(i + 1, hash_table->size_log2);
+
+ fprintf(stderr, " partition %zu\n", i);
+ fprintf(stderr,
+ " active buckets (key count = %zu)\n", partition->count);
+
+ for (j = begin; j < end; ++j)
+ {
+ size_t count = 0;
+ dsa_pointer bucket = hash_table->buckets[j];
+
+ while (DsaPointerIsValid(bucket))
+ {
+ dht_hash_table_item *item;
+
+ item = dsa_get_address(hash_table->area, bucket);
+
+ bucket = item->next;
+ ++count;
+ }
+ fprintf(stderr, " bucket %zu (key count = %zu)\n", j, count);
+ }
+ if (RESIZE_IN_PROGRESS(hash_table))
+ {
+ size_t begin;
+ size_t end;
+
+ begin = BUCKET_INDEX_FOR_PARTITION(i, hash_table->size_log2 - 1);
+ end = BUCKET_INDEX_FOR_PARTITION(i + 1,
+ hash_table->size_log2 - 1);
+
+ fprintf(stderr, " old buckets (key count = %zu)\n",
+ partition->old_count);
+
+ for (j = begin; j < end; ++j)
+ {
+ size_t count = 0;
+ dsa_pointer bucket = hash_table->old_buckets[j];
+
+ while (DsaPointerIsValid(bucket))
+ {
+ dht_hash_table_item *item;
+
+ item = dsa_get_address(hash_table->area, bucket);
+
+ bucket = item->next;
+ ++count;
+ }
+ fprintf(stderr,
+ " bucket %zu (key count = %zu)\n", j, count);
+ }
+ }
+ }
+
+ for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+ LWLockRelease(PARTITION_LOCK(hash_table, i));
+}
I'd put this below actual "production" code.
- Andres
Hi,
diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c
index 9fd7b4e019b..97c0125a4ba 100644
--- a/src/backend/access/common/tupdesc.c
+++ b/src/backend/access/common/tupdesc.c
@@ -337,17 +337,75 @@ DecrTupleDescRefCount(TupleDesc tupdesc)
{
Assert(tupdesc->tdrefcount > 0);
- ResourceOwnerForgetTupleDesc(CurrentResourceOwner, tupdesc);
+ if (CurrentResourceOwner != NULL)
+ ResourceOwnerForgetTupleDesc(CurrentResourceOwner, tupdesc);
if (--tupdesc->tdrefcount == 0)
FreeTupleDesc(tupdesc);
}
What's this about? CurrentResourceOwner should always be valid here, no?
If so, why did that change? I don't think it's good to detach this from
the resowner infrastructure...
/*
- * Compare two TupleDesc structures for logical equality
+ * Compare two TupleDescs' attributes for logical equality
*
* Note: we deliberately do not check the attrelid and tdtypmod fields.
* This allows typcache.c to use this routine to see if a cached record type
* matches a requested type, and is harmless for relcache.c's uses.
+ */
+bool
+equalTupleDescAttrs(Form_pg_attribute attr1, Form_pg_attribute attr2)
+{
comment not really accurate, this routine afaik isn't used by
typcache.c?
/*
- * Magic numbers for parallel state sharing. Higher-level code should use
- * smaller values, leaving these very large ones for use by this module.
+ * Magic numbers for per-context parallel state sharing. Higher-level code
+ * should use smaller values, leaving these very large ones for use by this
+ * module.
*/
#define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001)
#define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002)
@@ -63,6 +74,16 @@
#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
+#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
+
+/* Magic number for per-session DSM TOC. */
+#define PARALLEL_SESSION_MAGIC 0xabb0fbc9
+
+/*
+ * Magic numbers for parallel state sharing in the per-session DSM area.
+ */
+#define PARALLEL_KEY_SESSION_DSA UINT64CONST(0xFFFFFFFFFFFF0001)
+#define PARALLEL_KEY_RECORD_TYPMOD_REGISTRY UINT64CONST(0xFFFFFFFFFFFF0002)
Not this patch's fault, but this infrastructure really isn't great. We
should really replace it with a shmem.h style infrastructure, using a
dht hashtable as backing...
+/* The current per-session DSM segment, if attached. */
+static dsm_segment *current_session_segment = NULL;
+
I think it'd be better if we had a proper 'SessionState' and
'BackendSessionState' infrastructure that then contains the dsm segment
etc. I think we'll otherwise just end up with a bunch of parallel
infrastructures.
+/*
+ * A mechanism for sharing record typmods between backends.
+ */
+struct SharedRecordTypmodRegistry
+{
+ dht_hash_table_handle atts_index_handle;
+ dht_hash_table_handle typmod_index_handle;
+ pg_atomic_uint32 next_typmod;
+};
+
I think the code needs to explain better how these are intended to be
used. IIUC, atts_index is used to find typmods by "identity", and
typmod_index by the typmod, right? And we need both to avoid
all workers generating different tupledescs, right? Kinda guessable by
reading typcache.c, but that shouldn't be needed.
+/*
+ * A flattened/serialized representation of a TupleDesc for use in shared
+ * memory. Can be converted to and from regular TupleDesc format. Doesn't
+ * support constraints and doesn't store the actual type OID, because this is
+ * only for use with RECORD types as created by CreateTupleDesc(). These are
+ * arranged into a linked list, in the hash table entry corresponding to the
+ * OIDs of the first 16 attributes, so we'd expect to get more than one entry
+ * in the list when named and other properties differ.
+ */
+typedef struct SerializedTupleDesc
+{
+ dsa_pointer next; /* next with the same same attribute OIDs */
+ int natts; /* number of attributes in the tuple */
+ int32 typmod; /* typmod for tuple type */
+ bool hasoid; /* tuple has oid attribute in its header */
+
+ /*
+ * The attributes follow. We only ever access the first
+ * ATTRIBUTE_FIXED_PART_SIZE bytes of each element, like the code in
+ * tupdesc.c.
+ */
+ FormData_pg_attribute attributes[FLEXIBLE_ARRAY_MEMBER];
+} SerializedTupleDesc;
Not a fan of a separate tupledesc representation, that's just going to
lead to divergence over time. I think we should rather change the normal
tupledesc representation to be compatible with this, and 'just' have a
wrapper struct for the parallel case (with next and such).
+/*
+ * An entry in SharedRecordTypmodRegistry's attribute index. The key is the
+ * first REC_HASH_KEYS attribute OIDs. That means that collisions are
+ * possible, but that's OK because SerializedTupleDesc objects are arranged
+ * into a list.
+ */
+/* Parameters for SharedRecordTypmodRegistry's attributes hash table. */
+const static dht_parameters srtr_atts_index_params = {
+ sizeof(Oid) * REC_HASH_KEYS,
+ sizeof(SRTRAttsIndexEntry),
+ memcmp,
+ tag_hash,
+ LWTRANCHE_SHARED_RECORD_ATTS_INDEX
+};
+
+/* Parameters for SharedRecordTypmodRegistry's typmod hash table. */
+const static dht_parameters srtr_typmod_index_params = {
+ sizeof(uint32),
+ sizeof(SRTRTypmodIndexEntry),
+ memcmp,
+ tag_hash,
+ LWTRANCHE_SHARED_RECORD_TYPMOD_INDEX
+};
+
I'm very much not a fan of this representation. I know you copied the
logic, but I think it's a bad idea. I think the key should just be a
dsa_pointer, and then we can have a proper tag_hash that hashes the
whole thing, and a proper comparator too. Just have
/*
* Combine two hash values, resulting in another hash value, with decent bit
* mixing.
*
* Similar to boost's hash_combine().
*/
static inline uint32
hash_combine(uint32 a, uint32 b)
{
a ^= b + 0x9e3779b9 + (a << 6) + (a >> 2);
return a;
}
and then hash everything.
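For example, something along these lines (a sketch only;
shared_tupledesc_hash is not a function in your patch):
/* Hash an entire TupleDesc instead of just the leading attribute OIDs. */
static uint32
shared_tupledesc_hash(TupleDesc tupdesc)
{
    uint32      hash;
    int         i;

    hash = DatumGetUInt32(hash_uint32((uint32) tupdesc->natts));
    for (i = 0; i < tupdesc->natts; ++i)
    {
        Form_pg_attribute attr = tupdesc->attrs[i];

        hash = hash_combine(hash,
                            DatumGetUInt32(hash_uint32((uint32) attr->atttypid)));
        hash = hash_combine(hash,
                            DatumGetUInt32(hash_uint32((uint32) attr->atttypmod)));
    }
    return hash;
}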
+/*
+ * Make sure that RecordCacheArray is large enough to store 'typmod'.
+ */
+static void
+ensure_record_cache_typmod_slot_exists(int32 typmod)
+{
+ if (RecordCacheArray == NULL)
+ {
+ RecordCacheArray = (TupleDesc *)
+ MemoryContextAllocZero(CacheMemoryContext, 64 * sizeof(TupleDesc));
+ RecordCacheArrayLen = 64;
+ }
+
+ if (typmod >= RecordCacheArrayLen)
+ {
+ int32 newlen = RecordCacheArrayLen * 2;
+
+ while (typmod >= newlen)
+ newlen *= 2;
+
+ RecordCacheArray = (TupleDesc *) repalloc(RecordCacheArray,
+ newlen * sizeof(TupleDesc));
+ memset(RecordCacheArray + RecordCacheArrayLen, 0,
+ (newlen - RecordCacheArrayLen) * sizeof(TupleDesc *));
+ RecordCacheArrayLen = newlen;
+ }
+}
Do we really want to keep this? Could just have an equivalent dynahash
for the non-parallel case?
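I.e. something like this (sketch; the entry layout and table name are
invented for illustration):
typedef struct RecordCacheEntry
{
    int32       typmod;         /* hash key */
    TupleDesc   tupdesc;
} RecordCacheEntry;

static HTAB *RecordCacheHash = NULL;

static void
create_record_cache_hash(void)
{
    HASHCTL     ctl;

    MemSet(&ctl, 0, sizeof(ctl));
    ctl.keysize = sizeof(int32);
    ctl.entrysize = sizeof(RecordCacheEntry);
    ctl.hcxt = CacheMemoryContext;
    RecordCacheHash = hash_create("Record typmod cache", 64, &ctl,
                                  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
}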
/*
* lookup_rowtype_tupdesc_internal --- internal routine to lookup a rowtype
@@ -1229,15 +1347,49 @@ lookup_rowtype_tupdesc_internal(Oid type_id, int32 typmod, bool noError)
/*
* It's a transient record type, so look in our record-type table.
*/
- if (typmod < 0 || typmod >= NextRecordTypmod)
+ if (typmod >= 0)
{
- if (!noError)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("record type has not been registered")));
- return NULL;
+ /* It is already in our local cache? */
+ if (typmod < RecordCacheArrayLen &&
+ RecordCacheArray[typmod] != NULL)
+ return RecordCacheArray[typmod];
+
+ /* Are we attached to a SharedRecordTypmodRegistry? */
+ if (CurrentSharedRecordTypmodRegistry.shared != NULL)
Why do we want to do lookups in both? I don't think it's a good idea to
have a chance that you could have the same typmod in both the local
registry (because it'd been created before the shared one) and in the
shared (because it was created in a worker). Ah, that's for caching
purposes? If so, see my above point that we shouldn't have a serialized
version of typdesc (yesyes, constraints will be a bit ugly).
+/*
+ * If we are attached to a SharedRecordTypmodRegistry, find or create a
+ * SerializedTupleDesc that matches 'tupdesc', and return its typmod.
+ * Otherwise return -1.
+ */
+static int32
+find_or_allocate_shared_record_typmod(TupleDesc tupdesc)
+{
+ SRTRAttsIndexEntry *atts_index_entry;
+ SRTRTypmodIndexEntry *typmod_index_entry;
+ SerializedTupleDesc *serialized;
+ dsa_pointer serialized_dp;
+ Oid hashkey[REC_HASH_KEYS];
+ bool found;
+ int32 typmod;
+ int i;
+
+ /* If not even attached, nothing to do. */
+ if (CurrentSharedRecordTypmodRegistry.shared == NULL)
+ return -1;
+
+ /* Try to find a match. */
+ memset(hashkey, 0, sizeof(hashkey));
+ for (i = 0; i < tupdesc->natts; ++i)
+ hashkey[i] = tupdesc->attrs[i]->atttypid;
+ atts_index_entry = (SRTRAttsIndexEntry *)
+ dht_find_or_insert(CurrentSharedRecordTypmodRegistry.atts_index,
+ hashkey,
+ &found);
+ if (!found)
+ {
+ /* Making a new entry. */
+ memcpy(atts_index_entry->leading_attr_oids,
+ hashkey,
+ sizeof(hashkey));
+ atts_index_entry->serialized_tupdesc = InvalidDsaPointer;
+ }
+
+ /* Scan the list we found for a matching serialized one. */
+ serialized_dp = atts_index_entry->serialized_tupdesc;
+ while (DsaPointerIsValid(serialized_dp))
+ {
+ serialized =
+ dsa_get_address(CurrentSharedRecordTypmodRegistry.area,
+ serialized_dp);
+ if (serialized_tupledesc_matches(serialized, tupdesc))
+ {
+ /* Found a match, we are finished. */
+ typmod = serialized->typmod;
+ dht_release(CurrentSharedRecordTypmodRegistry.atts_index,
+ atts_index_entry);
+ return typmod;
+ }
+ serialized_dp = serialized->next;
+ }
+
+ /* We didn't find a matching entry, so let's allocate a new one. */
+ typmod = (int)
+ pg_atomic_fetch_add_u32(&CurrentSharedRecordTypmodRegistry.shared->next_typmod,
+ 1);
+
+ /* Allocate shared memory and serialize the TupleDesc. */
+ serialized_dp = serialize_tupledesc(CurrentSharedRecordTypmodRegistry.area,
+ tupdesc);
+ serialized = (SerializedTupleDesc *)
+ dsa_get_address(CurrentSharedRecordTypmodRegistry.area, serialized_dp);
+ serialized->typmod = typmod;
+
+ /*
+ * While we still hold the atts_index entry locked, add this to
+ * typmod_index. That's important because we don't want anyone to be able
+ * to find a typmod via the former that can't yet be looked up in the
+ * latter.
+ */
+ typmod_index_entry =
+ dht_find_or_insert(CurrentSharedRecordTypmodRegistry.typmod_index,
+ &typmod, &found);
+ if (found)
+ elog(ERROR, "cannot create duplicate shared record typmod");
+ typmod_index_entry->typmod = typmod;
+ typmod_index_entry->serialized_tupdesc = serialized_dp;
+ dht_release(CurrentSharedRecordTypmodRegistry.typmod_index,
+ typmod_index_entry);
What if we fail to allocate memory for the entry in typmod_index?
- Andres
Hi,
Please find attached a new patch series. I apologise in advance for
0001 and note that the patchset now weighs in at ~75kB compressed.
Here are my in-line replies to your two reviews:
On Tue, Jul 25, 2017 at 10:09 PM, Andres Freund <andres@anarazel.de> wrote:
It does concern me that we're growing yet another somewhat different
hashtable implementation. Yet I don't quite see how we could avoid
it. dynahash relies on proper pointers, simplehash doesn't do locking
(and shouldn't) and also relies on pointers, although to a much lesser
degree. All the open coded tables aren't a good match either. So I
don't quite see an alternative, but I'd love one.
Yeah, I agree. To deal with data structures with different pointer
types, locking policy, inlined hash/eq functions etc, perhaps there is
a way we could eventually do 'policy based design' using the kind of
macro trickery you started where we generate N different hash table
variations but only have to maintain source code for one chaining hash
table implementation? Or perl scripts that effectively behave as a
cfront^H^H^H nevermind. I'm not planning to investigate that for this
cycle.
diff --git a/src/backend/lib/dht.c b/src/backend/lib/dht.c
new file mode 100644
index 00000000000..2fec70f7742
--- /dev/null
+++ b/src/backend/lib/dht.c
FWIW, not a big fan of dht as a filename (nor of dsa.c). For one DHT
usually refers to distributed hash tables, which this is not, and for
another the abbreviation is so short it's not immediately
understandable, and likely to conflict further. I think it'd possibly
be ok to have dht as symbol prefixes, but rename the file to be longer.
OK. Now it's ds_hash_table.{c,h}, where "ds" stands for "dynamic
shared". Better? If we were to do other data structures in DSA
memory they could follow that style: ds_red_black_tree.c, ds_vector.c,
ds_deque.c etc and their identifier prefix would be drbt_, dv_, dd_
etc.
Do you want to see a separate patch to rename dsa.c? Got a better
name? You could have spoken up earlier :-) It does sound like a bit
like the thing from crypto or perhaps a scary secret government
department.
+ * To deal with currency, it has a fixed size set of partitions, each of which
+ * is independently locked.
s/currency/concurrency/ I presume.
Fixed.
+ * Each bucket maps to a partition; so insert, find
+ * and iterate operations normally only acquire one lock. Therefore, good
+ * concurrency is achieved whenever they don't collide at the lock partition
s/they/operations/?
Fixed.
+ * level. However, when a resize operation begins, all partition locks must
+ * be acquired simultaneously for a brief period. This is only expected to
+ * happen a small number of times until a stable size is found, since growth is
+ * geometric.
I'm a bit doubtful that we need partitioning at this point, and that it
doesn't actually *degrade* performance for your typmod case.
Yeah, partitioning not needed for this case, but this is supposed to
be more generally useful. I thought about making the number of
partitions a construction parameter, but it doesn't really hurt does
it?
+ * Resizing is done incrementally so that no individual insert operation pays
+ * for the potentially large cost of splitting all buckets.
I'm not sure this is a reasonable tradeoff for the use-case suggested so
far, it doesn't exactly make things simpler. We're not going to grow
much.
Yeah, designed to be more generally useful. Are you saying you would
prefer to see the DHT patch split into an initial submission that does
the simplest thing possible, so that the unlucky guy who causes the
hash table to grow has to do all the work of moving buckets to a
bigger hash table? Then we could move the more complicated
incremental growth stuff to a later patch.
+/* The opaque type used for tracking iterator state. */
+struct dht_iterator;
+typedef struct dht_iterator dht_iterator;
Isn't it actually the iterator state? Rather than tracking it? Also, why
is it opaque given you're actually defining it below? Guess you'd moved
it at some point.
Improved comment. The iterator state is defined below in the .h, but
with a warning that client code mustn't access it; it exists in the
header only because it's very useful to be able to put a dht_iterator on
the stack, which requires the client code to have its definition, but I
want to reserve the right to change it arbitrarily in future.
+/*
+ * The set of parameters needed to create or attach to a hash table. The
+ * members tranche_id and tranche_name do not need to be initialized when
+ * attaching to an existing hash table.
+ */
+typedef struct
+{
+ Size key_size; /* Size of the key (initial bytes of entry) */
+ Size entry_size; /* Total size of entry */
Let's use size_t, like we kind of concluded in the thread you started:
http://archives.postgresql.org/message-id/25076.1489699457%40sss.pgh.pa.us
:)
Sold.
+ dht_compare_function compare_function; /* Compare function */
+ dht_hash_function hash_function; /* Hash function */
Might be worth explaining that these need to be provided when attaching
because they're possibly process local. Did you test this with
EXEC_BACKEND?
Added explanation. I haven't personally tested with EXEC_BACKEND but
I believe one of my colleagues had something else that uses DHT
running on a Windows box and didn't shout at me, and I see no reason
to think it shouldn't work: as explained in the new comment, every
attacher has to supply the function pointers from their own process
space (and standard footgun rules apply if you don't supply compatible
functions).
+ int tranche_id; /* The tranche ID to use for locks. */
+} dht_parameters;
+struct dht_iterator
+{
+ dht_hash_table *hash_table; /* The hash table we are iterating over. */
+ bool exclusive; /* Whether to lock buckets exclusively. */
+ Size partition; /* The index of the next partition to visit. */
+ Size bucket; /* The index of the next bucket to visit. */
+ dht_hash_table_item *item; /* The most recently returned item. */
+ dsa_pointer last_item_pointer; /* The last item visited. */
+ Size table_size_log2; /* The table size when we started iterating. */
+ bool locked; /* Whether the current partition is locked. */
Haven't gotten to the actual code yet, but this kinda suggest we leave a
partition locked when iterating? Hm, that seems likely to result in a
fair bit of pain...
By default yes, but you can release the lock with
dht_iterate_release_lock() and it'll be reacquired when you call
dht_iterate_next(). If you do that, then you'll continue iterating
after where you left off without visiting any item that you've already
visited, because the pointers are stored in pointer order (even though
the most recently visited item may have been freed rendering the
pointer invalid, we can still use its pointer to skip everything
already visited by numerical comparison without dereferencing it, and
it's indeterminate whether anything added while you were unlocked is
visible to you).
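So a caller that wants to be polite about lock hold times can do
something like this (usage sketch, with the renamed
dht_iterate_release_lock(); do_something_with() stands in for arbitrary
caller code):
dht_iterator iterator;
void       *entry;

dht_iterate_begin(hash_table, &iterator, false);    /* shared lock mode */
while ((entry = dht_iterate_next(&iterator)) != NULL)
{
    do_something_with(entry);   /* the entry's partition is locked here */
    dht_iterate_release_lock(&iterator);    /* let others into the partition */
    /* dht_iterate_next() reacquires the lock and skips visited items */
}
dht_iterate_end(&iterator);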
Maintaining linked lists in a certain order sucks, but DHT doesn't
allow duplicate keys and grows when load factor exceeds X so unless
your hash function is busted...
This is complicated, and in the category that I would normally want a
stack of heavy unit tests for. If you don't feel like making
decisions about this now, perhaps iteration (and incremental resize?)
could be removed, leaving only the most primitive get/put hash table
facilities -- just enough for this purpose? Then a later patch could
add them back, with a set of really convincing unit tests...
+/* Iterating over the whole hash table. */
+extern void dht_iterate_begin(dht_hash_table *hash_table,
+ dht_iterator *iterator, bool exclusive);
+extern void *dht_iterate_next(dht_iterator *iterator);
+extern void dht_iterate_delete(dht_iterator *iterator);
s/delete/delete_current/? Otherwise it looks like it's part of
manipulating just the iterator.
Done.
+extern void dht_iterate_release(dht_iterator *iterator);
I'd add lock to the name.
Done.
+/*
+ * An item in the hash table. This wraps the user's entry object in an
+ * envelop that holds a pointer back to the bucket and a pointer to the next
+ * item in the bucket.
+ */
+struct dht_hash_table_item
+{
+ /* The hashed key, to avoid having to recompute it. */
+ dht_hash hash;
+ /* The next item in the same bucket. */
+ dsa_pointer next;
+ /* The user's entry object follows here. */
+ char entry[FLEXIBLE_ARRAY_MEMBER];
What's the point of using FLEXIBLE_ARRAY_MEMBER here? And isn't using a
char going to lead to alignment problems?
Fixed. No longer using a member 'entry', just a comment that user
data follows and a macro to find it based on
MAXALIGN(sizeof(dht_hash_table_item)).
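I.e. roughly this (quoting from memory, the exact macro name may
differ):
#define ENTRY_FROM_ITEM(item) \
    ((void *) ((char *) (item) + MAXALIGN(sizeof(dht_hash_table_item))))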
+/* The number of partitions for locking purposes. */
+#define DHT_NUM_PARTITIONS_LOG2 7
Could use some justification.
Added. Short version: if it's good enough for the buffer pool...
+/*
+ * The head object for a hash table. This will be stored in dynamic shared
+ * memory.
+ */
+typedef struct
+{
Why anonymous? Not that it hurts much, but seems weird to deviate just
here.
Fixed.
+/*
+ * Create a new hash table backed by the given dynamic shared area, with the
+ * given parameters.
+ */
+dht_hash_table *
+dht_create(dsa_area *area, const dht_parameters *params)
+{
+ dht_hash_table *hash_table;
+ dsa_pointer control;
+
+ /* Allocate the backend-local object representing the hash table. */
+ hash_table = palloc(sizeof(dht_hash_table));
Should be documented that this uses caller's MemoryContext.
Done.
+ /* Set up the array of lock partitions. */
+ {
+ int i;
+
+ for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+ {
+ LWLockInitialize(PARTITION_LOCK(hash_table, i),
+ hash_table->control->lwlock_tranche_id);
+ hash_table->control->partitions[i].count = 0;
+ }
I'd store hash_table->control->lwlock_tranche_id and partitions[i] in
local vars. Possibly hash_table->control too.
Tidied up. I made local vars for partitions and tranche_id.
+/*
+ * Detach from a hash table. This frees backend-local resources associated
+ * with the hash table, but the hash table will continue to exist until it is
+ * either explicitly destroyed (by a backend that is still attached to it), or
+ * the area that backs it is returned to the operating system.
+ */
+void
+dht_detach(dht_hash_table *hash_table)
+{
+ /* The hash table may have been destroyed. Just free local memory. */
+ pfree(hash_table);
+}
Somewhat inclined to add debugging refcount - seems like bugs around
that might be annoying to find. Maybe also add an assert ensuring that
no locks are held?
Added an assert that no locks are held.
In an earlier version I had reference counts. Then I realised that it
wasn't really helping anything. The state of being 'attached' to a
dht_hash_table isn't really the same as holding a heavyweight resource
like a DSM segment or a file which is backed by kernel resources.
'Attaching' is just something you have to do to get a backend-local
palloc()-ated object required to interact with the hash table, and
since it's just a bit of memory there is no strict requirement to
detach from it, if you're happy to let MemoryContext do that for you.
To put it in GC terms, there is no important finalizer here. Here I
am making the same distinction that we make between stuff managed by
resowner.c (files etc) and stuff managed by MemoryContext (memory); in
the former case it's an elog()-gable offence not to close things
explicitly in non-error paths, but in the latter you're free to do
that, or pfree earlier. If in future we create more things that can
live in DSA memory, I'd like them to be similarly free-and-easy. Make
sense?
In any case this user of DHT remains attached for the backend's lifetime.
+/*
+ * Look up an entry, given a key. Returns a pointer to an entry if one can be
+ * found with the given key. Returns NULL if the key is not found. If a
+ * non-NULL value is returned, the entry is locked and must be released by
+ * calling dht_release. If an error is raised before dht_release is called,
+ * the lock will be released automatically, but the caller must take care to
+ * ensure that the entry is not left corrupted. The lock mode is either
+ * shared or exclusive depending on 'exclusive'.
This API seems a bit fragile.
Do you mean "... the caller must take care to ensure that the entry is
not left corrupted"? This is the same as anything protected by
LWLocks including shared buffers. If you error out, locks are
released and you had better not have left things in a bad state. I
guess this comment is really just about what C++ people call "basic
exception safety".
Or something else?
+/*
+ * Returns a pointer to an exclusively locked item which must be released with
+ * dht_release. If the key is found in the hash table, 'found' is set to true
+ * and a pointer to the existing entry is returned. If the key is not found,
+ * 'found' is set to false, and a pointer to a newly created entry is related.
"is related"?
Fixed.
+ */
+void *
+dht_find_or_insert(dht_hash_table *hash_table,
+ const void *key,
+ bool *found)
+{
+ size_t hash;
+ size_t partition_index;
+ dht_partition *partition;
+ dht_hash_table_item *item;
+
+ hash = hash_table->params.hash_function(key, hash_table->params.key_size);
+ partition_index = PARTITION_FOR_HASH(hash);
+ partition = &hash_table->control->partitions[partition_index];
+
+ Assert(hash_table->control->magic == DHT_MAGIC);
+ Assert(!hash_table->exclusively_locked);
Why just exclusively locked? Why'd shared be ok?
It wouldn't be OK. I just didn't have the state required to assert
that. Fixed.
I think in future it should be allowed to lock more than one partition
(conceptually more than one entry) at a time, but only after figuring
out a decent API to support doing that in deadlock-avoiding order. I
don't have a need or a plan for that yet. For the same reason it's
not OK to use dht_find[_or_insert] while any iterator has locked a
partition, which wasn't documented (is now) and isn't currently
assertable.
+/*
+ * Unlock an entry which was locked by dht_find or dht_find_or_insert.
+ */
+void
+dht_release(dht_hash_table *hash_table, void *entry)
+{
+ dht_hash_table_item *item = ITEM_FROM_ENTRY(entry);
+ size_t partition_index = PARTITION_FOR_HASH(item->hash);
+ bool deferred_resize_work = false;
+
+ Assert(hash_table->control->magic == DHT_MAGIC);
Assert lock held (LWLockHeldByMe())
Added this, and a couple more.
+/*
+ * Begin iterating through the whole hash table. The caller must supply a
+ * dht_iterator object, which can then be used to call dht_iterate_next to get
+ * values until the end is reached.
+ */
+void
+dht_iterate_begin(dht_hash_table *hash_table,
+ dht_iterator *iterator,
+ bool exclusive)
+{
+ Assert(hash_table->control->magic == DHT_MAGIC);
+
+ iterator->hash_table = hash_table;
+ iterator->exclusive = exclusive;
+ iterator->partition = 0;
+ iterator->bucket = 0;
+ iterator->item = NULL;
+ iterator->last_item_pointer = InvalidDsaPointer;
+ iterator->locked = false;
+
+ /* Snapshot the size (arbitrary lock to prevent size changing). */
+ LWLockAcquire(PARTITION_LOCK(hash_table, 0), LW_SHARED);
+ ensure_valid_bucket_pointers(hash_table);
+ iterator->table_size_log2 = hash_table->size_log2;
+ LWLockRelease(PARTITION_LOCK(hash_table, 0));
Hm. So we're introducing some additional contention on partition 0 -
probably ok.
It would be cute to use MyProcPid % DHT_NUM_PARTITIONS, but that might
be a deadlock hazard if you have multiple iterators on the go at once.
Otherwise iterators only ever lock partitions in order.
+/*
+ * Move to the next item in the hash table. Returns a pointer to an entry, or
+ * NULL if the end of the hash table has been reached. The item is locked in
+ * exclusive or shared mode depending on the argument given to
+ * dht_iterate_begin. The caller can optionally release the lock by calling
+ * dht_iterate_release, and then call dht_iterate_next again to move to the
+ * next entry. If the iteration is in exclusive mode, client code can also
+ * call dht_iterate_delete. When the end of the hash table is reached, or at
+ * any time, the client may call dht_iterate_end to abandon iteration.
+ */
I'd just shorten the end to "at any time the client may call
dht_iterate_end to ..."
Done.
[snip]
+
+/*
+ * Print out debugging information about the internal state of the hash table.
+ */
+void
+dht_dump(dht_hash_table *hash_table)
+{
+ size_t i;
+ size_t j;
+
+ Assert(hash_table->control->magic == DHT_MAGIC);
+
+ for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+ LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_SHARED);
Should probably assert & document that no locks are held - otherwise
there's going to be ugly deadlocks. And that's an unlikely thing to try.
OK.
[snip]
+}
I'd put this below actual "production" code.
Done.
On Tue, Aug 1, 2017 at 9:08 AM, Andres Freund <andres@anarazel.de> wrote:
Hi,
diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c
index 9fd7b4e019b..97c0125a4ba 100644
--- a/src/backend/access/common/tupdesc.c
+++ b/src/backend/access/common/tupdesc.c
@@ -337,17 +337,75 @@ DecrTupleDescRefCount(TupleDesc tupdesc)
{
Assert(tupdesc->tdrefcount > 0);
- ResourceOwnerForgetTupleDesc(CurrentResourceOwner, tupdesc);
+ if (CurrentResourceOwner != NULL)
+ ResourceOwnerForgetTupleDesc(CurrentResourceOwner, tupdesc);
if (--tupdesc->tdrefcount == 0)
FreeTupleDesc(tupdesc);
}
What's this about? CurrentResourceOwner should always be valid here, no?
If so, why did that change? I don't think it's good to detach this from
the resowner infrastructure...
The reason is that I install a detach hook
shared_record_typmod_registry_detach() in worker processes to clear
out their typmod registry. It runs at a time when there is no
CurrentResourceOwner. It's a theoretical concern only today, because
workers are not reused. If a worker lingered in a waiting room and
then attached to a new session DSM from a different leader, it would
need to remember nothing of the previous leader's typmods.
/*
- * Compare two TupleDesc structures for logical equality
+ * Compare two TupleDescs' attributes for logical equality
 *
 * Note: we deliberately do not check the attrelid and tdtypmod fields.
 * This allows typcache.c to use this routine to see if a cached record type
 * matches a requested type, and is harmless for relcache.c's uses.
+ */
+bool
+equalTupleDescAttrs(Form_pg_attribute attr1, Form_pg_attribute attr2)
+{
comment not really accurate, this routine afaik isn't used by
typcache.c?
I removed this whole hunk and left equalTupleDescs() alone, because I
no longer needed to make that change in this new version. See below.
/*
- * Magic numbers for parallel state sharing. Higher-level code should use
- * smaller values, leaving these very large ones for use by this module.
+ * Magic numbers for per-context parallel state sharing. Higher-level code
+ * should use smaller values, leaving these very large ones for use by this
+ * module.
 */
#define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001)
#define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002)
@@ -63,6 +74,16 @@
#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
+#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
+
+/* Magic number for per-session DSM TOC. */
+#define PARALLEL_SESSION_MAGIC 0xabb0fbc9
+
+/*
+ * Magic numbers for parallel state sharing in the per-session DSM area.
+ */
+#define PARALLEL_KEY_SESSION_DSA UINT64CONST(0xFFFFFFFFFFFF0001)
+#define PARALLEL_KEY_RECORD_TYPMOD_REGISTRY UINT64CONST(0xFFFFFFFFFFFF0002)
Not this patch's fault, but this infrastructure really isn't great. We
should really replace it with a shmem.h style infrastructure, using a
dht hashtable as backing...
Well, I am trying to use the established programming style. We
already have a per-query DSM with a TOC indexed by magic numbers (and
executor node IDs). I add a per-session DSM with a TOC indexed by a
different set of magic numbers. We could always come up with
something better and fix it in both places later?
+/* The current per-session DSM segment, if attached. */
+static dsm_segment *current_session_segment = NULL;
+
I think it'd be better if we had a proper 'SessionState' and
'BackendSessionState' infrastructure that then contains the dsm segment
etc. I think we'll otherwise just end up with a bunch of parallel
infrastructures.
I'll have to come back to you on this one.
+/*
+ * A mechanism for sharing record typmods between backends.
+ */
+struct SharedRecordTypmodRegistry
+{
+ dht_hash_table_handle atts_index_handle;
+ dht_hash_table_handle typmod_index_handle;
+ pg_atomic_uint32 next_typmod;
+};
+
I think the code needs to explain better how these are intended to be
used. IIUC, atts_index is used to find typmods by "identity", and
typmod_index by the typmod, right? And we need both to avoid
all workers generating different tupledescs, right? Kinda guessable by
reading typcache.c, but that shouldn't be needed.
Fixed.
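Concretely, the new comments say roughly this (my paraphrase of the
intended roles, not the patch's verbatim text):

struct SharedRecordTypmodRegistry
{
	/*
	 * Find an existing typmod by the TupleDesc's "identity" (attribute
	 * OIDs etc.), so that all participating backends hand out the same
	 * typmod for the same record shape instead of inventing their own.
	 */
	dht_hash_table_handle atts_index_handle;

	/*
	 * Find the TupleDesc for a given typmod, for backends that receive
	 * a tuple carrying a typmod they never blessed themselves.
	 */
	dht_hash_table_handle typmod_index_handle;

	/* Source of new typmods, allocated by atomic fetch-and-add. */
	pg_atomic_uint32 next_typmod;
};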
+/*
+ * A flattened/serialized representation of a TupleDesc for use in shared
+ * memory.  Can be converted to and from regular TupleDesc format.  Doesn't
+ * support constraints and doesn't store the actual type OID, because this
+ * is only for use with RECORD types as created by CreateTupleDesc().  These
+ * are arranged into a linked list, in the hash table entry corresponding to
+ * the OIDs of the first 16 attributes, so we'd expect to get more than one
+ * entry in the list when names and other properties differ.
+ */
+typedef struct SerializedTupleDesc
+{
+	dsa_pointer next;		/* next with the same attribute OIDs */
+	int			natts;		/* number of attributes in the tuple */
+	int32		typmod;		/* typmod for tuple type */
+	bool		hasoid;		/* tuple has oid attribute in its header */
+
+	/*
+	 * The attributes follow.  We only ever access the first
+	 * ATTRIBUTE_FIXED_PART_SIZE bytes of each element, like the code in
+	 * tupdesc.c.
+	 */
+	FormData_pg_attribute attributes[FLEXIBLE_ARRAY_MEMBER];
+} SerializedTupleDesc;

Not a fan of a separate tupledesc representation, that's just going to
lead to divergence over time. I think we should rather change the normal
tupledesc representation to be compatible with this, and 'just' have a
wrapper struct for the parallel case (with next and such).
OK. I killed this. Instead I flattened tupleDesc to make it usable
directly in shared memory as long as there are no constraints. There
is still a small wrapper SharedTupleDesc, but that's just to bolt a
'next' pointer to them so we can chain together TupleDescs with the
same OIDs.
The new 0001 patch replaces tupdesc->attrs[i]->foo with
TupleDescAttr(tupdesc, i)->foo everywhere in the tree, so that the
change from attrs[i] to &attrs[i] can be hidden.
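In other words, the accessor macro is what keeps the representation
change invisible to callers; schematically (the post-flattening
definition is my reading of the description above):

/* Before: attrs is an array of pointers, Form_pg_attribute *attrs. */
#define TupleDescAttr(tupdesc, i) ((tupdesc)->attrs[(i)])

/*
 * After flattening: attrs is an in-line array of FormData_pg_attribute,
 * so taking the element's address reproduces the old pointer value and
 * the attrs[i] -> &attrs[i] change stays hidden behind the macro.
 */
#define TupleDescAttr(tupdesc, i) (&(tupdesc)->attrs[(i)])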
+/*
+ * An entry in SharedRecordTypmodRegistry's attribute index.  The key is
+ * the first REC_HASH_KEYS attribute OIDs.  That means that collisions are
+ * possible, but that's OK because SerializedTupleDesc objects are arranged
+ * into a list.
+ */

+/* Parameters for SharedRecordTypmodRegistry's attributes hash table. */
+const static dht_parameters srtr_atts_index_params = {
+	sizeof(Oid) * REC_HASH_KEYS,
+	sizeof(SRTRAttsIndexEntry),
+	memcmp,
+	tag_hash,
+	LWTRANCHE_SHARED_RECORD_ATTS_INDEX
+};
+
+/* Parameters for SharedRecordTypmodRegistry's typmod hash table. */
+const static dht_parameters srtr_typmod_index_params = {
+	sizeof(uint32),
+	sizeof(SRTRTypmodIndexEntry),
+	memcmp,
+	tag_hash,
+	LWTRANCHE_SHARED_RECORD_TYPMOD_INDEX
+};
+

I'm very much not a fan of this representation. I know you copied the
logic, but I think it's a bad idea. I think the key should just be a
dsa_pointer, and then we can have a proper tag_hash that hashes the
whole thing, and a proper comparator too. Just have

/*
 * Combine two hash values, resulting in another hash value, with decent bit
 * mixing.
 *
 * Similar to boost's hash_combine().
 */
static inline uint32
hash_combine(uint32 a, uint32 b)
{
	a ^= b + 0x9e3779b9 + (a << 6) + (a >> 2);
	return a;
}

and then hash everything.
Hmm. I'm not sure I understand. I know what hash_combine is for but
what do you mean when you say the key should just be a dsa_pointer?
What's wrong with providing the key size, whole entry size, compare
function and hash function like this?
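In case it helps the discussion, here's one way I could imagine hashing
the whole thing with hash_combine(), if that's the direction you mean
(record_type_hash() is an invented name; hash_uint32() is the existing
hash support routine):

#include "postgres.h"
#include "access/hash.h"		/* hash_uint32() */
#include "access/tupdesc.h"

/*
 * Invented sketch: hash every attribute's type OID, not just the first
 * REC_HASH_KEYS of them, mixing with hash_combine() as suggested.
 */
static uint32
record_type_hash(TupleDesc tupdesc)
{
	uint32		h = DatumGetUInt32(hash_uint32((uint32) tupdesc->natts));
	int			i;

	for (i = 0; i < tupdesc->natts; i++)
	{
		uint32		oid_hash;

		oid_hash = DatumGetUInt32(
			hash_uint32((uint32) TupleDescAttr(tupdesc, i)->atttypid));
		h = hash_combine(h, oid_hash);
	}

	return h;
}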
+/*
+ * Make sure that RecordCacheArray is large enough to store 'typmod'.
+ */
+static void
+ensure_record_cache_typmod_slot_exists(int32 typmod)
+{
+	if (RecordCacheArray == NULL)
+	{
+		RecordCacheArray = (TupleDesc *)
+			MemoryContextAllocZero(CacheMemoryContext, 64 * sizeof(TupleDesc));
+		RecordCacheArrayLen = 64;
+	}
+
+	if (typmod >= RecordCacheArrayLen)
+	{
+		int32		newlen = RecordCacheArrayLen * 2;
+
+		while (typmod >= newlen)
+			newlen *= 2;
+
+		RecordCacheArray = (TupleDesc *) repalloc(RecordCacheArray,
+												  newlen * sizeof(TupleDesc));
+		memset(RecordCacheArray + RecordCacheArrayLen, 0,
+			   (newlen - RecordCacheArrayLen) * sizeof(TupleDesc *));
+		RecordCacheArrayLen = newlen;
+	}
+}

Do we really want to keep this? Could just have an equivalent dynahash
for the non-parallel case?
Hmm. Well the plain old array makes a lot of sense in the
non-parallel case, since we allocate typmods starting from zero. What
don't you like about it? The reason for using an array for
backend-local lookup (aside from "that's how it is already") is that
it's actually the best data structure for the job; the reason for
using a hash table in the shared case is that it gives you locking and
coordinates growth for free. (For the OID index it has to be a hash
table in both cases.)
 /*
  * lookup_rowtype_tupdesc_internal --- internal routine to lookup a rowtype
@@ -1229,15 +1347,49 @@ lookup_rowtype_tupdesc_internal(Oid type_id, int32 typmod, bool noError)
 	/*
 	 * It's a transient record type, so look in our record-type table.
 	 */
-	if (typmod < 0 || typmod >= NextRecordTypmod)
+	if (typmod >= 0)
 	{
-		if (!noError)
-			ereport(ERROR,
-					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-					 errmsg("record type has not been registered")));
-		return NULL;
+		/* Is it already in our local cache? */
+		if (typmod < RecordCacheArrayLen &&
+			RecordCacheArray[typmod] != NULL)
+			return RecordCacheArray[typmod];
+
+		/* Are we attached to a SharedRecordTypmodRegistry? */
+		if (CurrentSharedRecordTypmodRegistry.shared != NULL)

Why do we want to do lookups in both? I don't think it's a good idea to
have a chance that you could have the same typmod in both the local
registry (because it'd been created before the shared one) and in the
shared (because it was created in a worker). Ah, that's for caching
purposes? If so, see my above point that we shouldn't have a serialized
version of tupdesc (yesyes, constraints will be a bit ugly).
Right, that's what I've now done. It's basically a write-through
cache: we'll try to find it in the backend local structures and then
fall back to the shared one. But if we find it in shared memory,
we'll just copy the pointer into our local data structures.
In the last version I'd build a new TupleDesc from the serialized
form, but now there is no serialized form, just TupleDesc objects
which are now shmem-friendly (except for constraints, which do not
survive the matter transfer into shmem; see TupleDescCopy).
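So the lookup path now amounts to something like this (a sketch:
RecordCacheArray, ensure_record_cache_typmod_slot_exists() and the
registry test appear in the excerpts above, while
find_in_shared_registry() is an invented stand-in for the shared-memory
lookup):

static TupleDesc
lookup_transient_record_tupdesc(int32 typmod)
{
	TupleDesc	tupdesc;

	/* Fast path: backend-local cache. */
	if (typmod < RecordCacheArrayLen && RecordCacheArray[typmod] != NULL)
		return RecordCacheArray[typmod];

	/* Slow path: consult the shared registry, if we're attached to one. */
	if (CurrentSharedRecordTypmodRegistry.shared == NULL)
		return NULL;
	tupdesc = find_in_shared_registry(typmod);	/* hypothetical helper */

	/* Write through: remember the shared pointer locally. */
	if (tupdesc != NULL)
	{
		ensure_record_cache_typmod_slot_exists(typmod);
		RecordCacheArray[typmod] = tupdesc;
	}

	return tupdesc;
}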
+	/*
+	 * While we still hold the atts_index entry locked, add this to
+	 * typmod_index.  That's important because we don't want anyone to be
+	 * able to find a typmod via the former that can't yet be looked up in
+	 * the latter.
+	 */
+	typmod_index_entry =
+		dht_find_or_insert(CurrentSharedRecordTypmodRegistry.typmod_index,
+						   &typmod, &found);
+	if (found)
+		elog(ERROR, "cannot create duplicate shared record typmod");
+	typmod_index_entry->typmod = typmod;
+	typmod_index_entry->serialized_tupdesc = serialized_dp;
+	dht_release(CurrentSharedRecordTypmodRegistry.typmod_index,
+				typmod_index_entry);

What if we fail to allocate memory for the entry in typmod_index?
Well, I was careful to make sure that it wasn't pushed onto the list
in the atts_index entry until after we'd successfully allocated
entries in both indexes, so there was no way to exit from this
function leaving a TupleDesc in one index but not the other. In other
words, it might have created an atts_index entry, but that entry
would have an empty list. But yeah, on reflection we shouldn't leak
shared_dp in that case. In this version I added a PG_CATCH() block to
dsa_free() it and PG_RE_THROW().
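Roughly like this (a sketch of the cleanup; variable names follow the
excerpt quoted above, and 'area' stands for the registry's DSA area):

	PG_TRY();
	{
		/* Insert into typmod_index, as in the excerpt quoted above. */
		typmod_index_entry =
			dht_find_or_insert(CurrentSharedRecordTypmodRegistry.typmod_index,
							   &typmod, &found);
		if (found)
			elog(ERROR, "cannot create duplicate shared record typmod");
		typmod_index_entry->typmod = typmod;
		typmod_index_entry->serialized_tupdesc = serialized_dp;
		dht_release(CurrentSharedRecordTypmodRegistry.typmod_index,
					typmod_index_entry);
	}
	PG_CATCH();
	{
		/* Don't leak the shared TupleDesc if the insertion itself threw. */
		dsa_free(area, serialized_dp);
		PG_RE_THROW();
	}
	PG_END_TRY();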
More testing and review needed.
--
Thomas Munro
http://www.enterprisedb.com
Attachments:
shared-record-typmods-v4.patchset.tgz (application/x-gzip)