make Gather node projection-capable

Started by Robert Haas about 10 years ago · 8 messages

#1 Robert Haas
robertmhaas@gmail.com
2 attachment(s)

The Gather node, as currently committed, is neither projection-capable
nor listed as an exception in is_projection_capable_plan. Amit
discovered this in testing, and I hit it in my testing as well. We
could just mark it as being not projection-capable, but I think it
might be better to go the other way and give it projection
capabilities. Otherwise, we're going to start generating lots of
plans like this:

Result
  ->  Gather
        ->  Partial Seq Scan

While that's not the end of the world, it seems to needlessly fly in
the face of the general principle that nodes should generally try to
support projection. So attached is a patch to make Gather
projection-capable (gather-project.patch). It has a slight dependency
on my patch to fix up the tqueue machinery for record types, so I've
attached that patch here as well (tqueue-record-types.patch).

Comments? Reviews?

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

Attachments:

tqueue-record-types.patch (application/x-patch)
commit d17bac203638c0d74696602069693aa41dea1894
Author: Robert Haas <rhaas@postgresql.org>
Date:   Wed Oct 7 12:43:22 2015 -0400

    Modify tqueue infrastructure to support transient record types.
    
    Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
    mechanism, failed to account for the fact that the RECORD pseudo-type
    uses transient typmods that are only meaningful within a single
    backend.  Transferring such tuples without modification between two
    cooperating backends does not work.  This commit installs a system
    for passing the tuple descriptors over the same shm_mq being used to
    send the tuples themselves.  The two sides might not assign the same
    transient typmod to any given tuple descriptor, so we must also
    substitute the appropriate receiver-side typmod for the one used by
    the sender.  That adds some CPU overhead, but still seems better than
    being unable to pass records between cooperating parallel processes.

diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 69df9e3..4791320 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -221,6 +221,7 @@ gather_getnext(GatherState *gatherstate)
 
 			/* wait only if local scan is done */
 			tup = TupleQueueFunnelNext(gatherstate->funnel,
+									   slot->tts_tupleDescriptor,
 									   gatherstate->need_to_scan_locally,
 									   &done);
 			if (done)
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 67143d3..53b69e0 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -21,23 +21,55 @@
 #include "postgres.h"
 
 #include "access/htup_details.h"
+#include "catalog/pg_type.h"
 #include "executor/tqueue.h"
+#include "funcapi.h"
+#include "lib/stringinfo.h"
 #include "miscadmin.h"
+#include "utils/array.h"
+#include "utils/memutils.h"
+#include "utils/typcache.h"
 
 typedef struct
 {
 	DestReceiver pub;
 	shm_mq_handle *handle;
+	MemoryContext	tmpcontext;
+	HTAB	   *recordhtab;
+	char		mode;
 }	TQueueDestReceiver;
 
+typedef struct RecordTypemodMap
+{
+	int			remotetypmod;
+	int			localtypmod;
+} RecordTypemodMap;
+
 struct TupleQueueFunnel
 {
 	int			nqueues;
 	int			maxqueues;
 	int			nextqueue;
 	shm_mq_handle **queue;
+	char	   *mode;
+	HTAB	   *typmodmap;
 };
 
+#define		TUPLE_QUEUE_MODE_CONTROL			'c'
+#define		TUPLE_QUEUE_MODE_DATA				'd'
+
+static void tqueueWalkRecord(TQueueDestReceiver *tqueue, Datum value);
+static void tqueueWalkRecordArray(TQueueDestReceiver *tqueue, Datum value);
+static void TupleQueueHandleControlMessage(TupleQueueFunnel *funnel,
+			Size nbytes, char *data);
+static HeapTuple TupleQueueHandleDataMessage(TupleQueueFunnel *funnel,
+							TupleDesc tupledesc, Size nbytes,
+							HeapTupleHeader data);
+static HeapTuple TupleQueueRemapTuple(TupleQueueFunnel *funnel,
+					 TupleDesc tupledesc, HeapTuple tuple);
+static Datum TupleQueueRemapRecord(TupleQueueFunnel *funnel, Datum value);
+static Datum TupleQueueRemapRecordArray(TupleQueueFunnel *funnel, Datum value);
+
 /*
  * Receive a tuple.
  */
@@ -46,12 +78,178 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 {
 	TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
 	HeapTuple	tuple;
+	HeapTupleHeader tup;
+	AttrNumber	i;
 
 	tuple = ExecMaterializeSlot(slot);
+	tup = tuple->t_data;
+
+	/*
+	 * If any of the columns that we're sending back are records, special
+	 * handling is required, because the tuple descriptors are stored in a
+	 * backend-local cache, and the backend receiving data from us need not
+	 * have the same cache contents we do.  We grovel through the tuple,
+	 * find all the transient record types contained therein, and send
+	 * special control messages through the queue so that the receiving
+	 * process can interpret them correctly.
+	 */
+	for (i = 0; i < slot->tts_tupleDescriptor->natts; ++i)
+	{
+		Form_pg_attribute attr = slot->tts_tupleDescriptor->attrs[i];
+		MemoryContext	oldcontext;
+
+		/* Ignore nulls and non-records. */
+		if (slot->tts_isnull[i] || (attr->atttypid != RECORDOID
+			&& attr->atttypid != RECORDARRAYOID))
+			continue;
+
+		/*
+		 * OK, we're going to need to examine this attribute.  We could
+		 * use heap_deform_tuple here, but there's a possibility that the
+		 * slot already contains the deconstructed tuple, in which case
+		 * deforming it again would be needlessly inefficient.
+		 */
+		slot_getallattrs(slot);
+
+		/* Switch to temporary memory context to avoid leaking. */
+		if (tqueue->tmpcontext == NULL)
+			tqueue->tmpcontext =
+				AllocSetContextCreate(TopTransactionContext,
+									  "tqueue temporary context",
+									  ALLOCSET_DEFAULT_MINSIZE,
+									  ALLOCSET_DEFAULT_INITSIZE,
+									  ALLOCSET_DEFAULT_MAXSIZE);
+		oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext);
+		if (attr->atttypid == RECORDOID)
+			tqueueWalkRecord(tqueue, slot->tts_values[i]);
+		else
+			tqueueWalkRecordArray(tqueue, slot->tts_values[i]);
+		MemoryContextSwitchTo(oldcontext);
+
+		/* Clean up any memory we allocated. */
+		MemoryContextReset(tqueue->tmpcontext);
+	}
+
+	/* If we entered control mode, switch back to data mode. */
+	if (tqueue->mode != TUPLE_QUEUE_MODE_DATA)
+	{
+		tqueue->mode = TUPLE_QUEUE_MODE_DATA;
+		shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false);
+	}
+
+	/* Send the tuple itself. */
 	shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
 }
 
 /*
+ * Walk a record and send control messages for transient record types
+ * contained therein.
+ */
+static void
+tqueueWalkRecord(TQueueDestReceiver *tqueue, Datum value)
+{
+	HeapTupleHeader	tup;
+	int			typmod;
+	bool		found;
+	TupleDesc	tupledesc;
+	Datum	   *values;
+	bool	   *isnull;
+	HeapTupleData	tdata;
+	AttrNumber	i;
+
+	/* Extract typmod from tuple. */
+	tup = DatumGetHeapTupleHeader(value);
+	typmod = HeapTupleHeaderGetTypMod(tup);
+
+	/* Look up tuple descriptor in typecache. */
+	tupledesc = lookup_rowtype_tupdesc(RECORDOID, typmod);
+
+	/* Initialize hash table if not done yet. */
+	if (tqueue->recordhtab == NULL)
+	{
+		HASHCTL		ctl;
+
+		ctl.keysize = sizeof(int);
+		ctl.entrysize = sizeof(int);
+		ctl.hcxt = TopMemoryContext;
+		tqueue->recordhtab = hash_create("tqueue record hashtable",
+										 100, &ctl, HASH_ELEM | HASH_CONTEXT);
+	}
+
+	/* Have we already seen this record type?  If not, must report it. */
+	hash_search(tqueue->recordhtab, &typmod, HASH_ENTER, &found);
+	if (!found)
+	{
+		StringInfoData	buf;
+
+		/* If message queue is in data mode, switch to control mode. */
+		if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL)
+		{
+			tqueue->mode = TUPLE_QUEUE_MODE_CONTROL;
+			shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false);
+		}
+
+		/* Assemble a control message. */
+		initStringInfo(&buf);
+		appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int));
+		appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int));
+		appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid,
+							   sizeof(bool));
+		for (i = 0; i < tupledesc->natts; ++i)
+			appendBinaryStringInfo(&buf, (char *) tupledesc->attrs[i],
+								   sizeof(FormData_pg_attribute));
+
+		/* Send control message. */
+		shm_mq_send(tqueue->handle, buf.len, buf.data, false);
+	}
+
+	/* Deform the tuple so we can check each column within. */
+	values = palloc(tupledesc->natts * sizeof(Datum));
+	isnull = palloc(tupledesc->natts * sizeof(bool));
+	tdata.t_len = HeapTupleHeaderGetDatumLength(tup);
+	ItemPointerSetInvalid(&(tdata.t_self));
+	tdata.t_tableOid = InvalidOid;
+	tdata.t_data = tup;
+	heap_deform_tuple(&tdata, tupledesc, values, isnull);
+
+	/* Recursively check each non-NULL attribute. */
+	for (i = 0; i < tupledesc->natts; ++i)
+	{
+		Form_pg_attribute attr = tupledesc->attrs[i];
+		if (isnull[i])
+			continue;
+		if (attr->atttypid == RECORDOID)
+			tqueueWalkRecord(tqueue, values[i]);
+		if (attr->atttypid == RECORDARRAYOID)
+			tqueueWalkRecordArray(tqueue, values[i]);
+	}
+
+	/* Release reference count acquired by lookup_rowtype_tupdesc. */
+	DecrTupleDescRefCount(tupledesc);
+}
+
+/*
+ * Walk a record and send control messages for transient record types
+ * contained therein.
+ */
+static void
+tqueueWalkRecordArray(TQueueDestReceiver *tqueue, Datum value)
+{
+	ArrayType  *arr = DatumGetArrayTypeP(value);
+	Datum	   *elem_values;
+	bool	   *elem_nulls;
+	int			num_elems;
+	int			i;
+
+	Assert(ARR_ELEMTYPE(arr) == RECORDOID);
+	deconstruct_array(arr, RECORDOID, -1, false, 'd',
+					  &elem_values, &elem_nulls, &num_elems);
+	for (i = 0; i < num_elems; ++i)
+		if (!elem_nulls[i])
+			tqueueWalkRecord(tqueue, elem_values[i]);
+}
+
+/*
  * Prepare to receive tuples from executor.
  */
 static void
@@ -77,6 +275,12 @@ tqueueShutdownReceiver(DestReceiver *self)
 static void
 tqueueDestroyReceiver(DestReceiver *self)
 {
+	TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
+
+	if (tqueue->tmpcontext != NULL)
+		MemoryContextDelete(tqueue->tmpcontext);
+	if (tqueue->recordhtab != NULL)
+		hash_destroy(tqueue->recordhtab);
 	pfree(self);
 }
 
@@ -96,6 +300,9 @@ CreateTupleQueueDestReceiver(shm_mq_handle *handle)
 	self->pub.rDestroy = tqueueDestroyReceiver;
 	self->pub.mydest = DestTupleQueue;
 	self->handle = handle;
+	self->tmpcontext = NULL;
+	self->recordhtab = NULL;
+	self->mode = TUPLE_QUEUE_MODE_DATA;
 
 	return (DestReceiver *) self;
 }
@@ -110,6 +317,7 @@ CreateTupleQueueFunnel(void)
 
 	funnel->maxqueues = 8;
 	funnel->queue = palloc(funnel->maxqueues * sizeof(shm_mq_handle *));
+	funnel->mode = palloc(funnel->maxqueues * sizeof(char));
 
 	return funnel;
 }
@@ -125,6 +333,9 @@ DestroyTupleQueueFunnel(TupleQueueFunnel *funnel)
 	for (i = 0; i < funnel->nqueues; i++)
 		shm_mq_detach(shm_mq_get_queue(funnel->queue[i]));
 	pfree(funnel->queue);
+	pfree(funnel->mode);
+	if (funnel->typmodmap != NULL)
+		hash_destroy(funnel->typmodmap);
 	pfree(funnel);
 }
 
@@ -134,12 +345,6 @@ DestroyTupleQueueFunnel(TupleQueueFunnel *funnel)
 void
 RegisterTupleQueueOnFunnel(TupleQueueFunnel *funnel, shm_mq_handle *handle)
 {
-	if (funnel->nqueues < funnel->maxqueues)
-	{
-		funnel->queue[funnel->nqueues++] = handle;
-		return;
-	}
-
 	if (funnel->nqueues >= funnel->maxqueues)
 	{
 		int			newsize = funnel->nqueues * 2;
@@ -148,10 +353,12 @@ RegisterTupleQueueOnFunnel(TupleQueueFunnel *funnel, shm_mq_handle *handle)
 
 		funnel->queue = repalloc(funnel->queue,
 								 newsize * sizeof(shm_mq_handle *));
+		funnel->mode = repalloc(funnel->mode, newsize * sizeof(char));
 		funnel->maxqueues = newsize;
 	}
 
-	funnel->queue[funnel->nqueues++] = handle;
+	funnel->queue[funnel->nqueues] = handle;
+	funnel->mode[funnel->nqueues++] = TUPLE_QUEUE_MODE_DATA;
 }
 
 /*
@@ -172,7 +379,8 @@ RegisterTupleQueueOnFunnel(TupleQueueFunnel *funnel, shm_mq_handle *handle)
  * any other case.
  */
 HeapTuple
-TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done)
+TupleQueueFunnelNext(TupleQueueFunnel *funnel, TupleDesc tupledesc,
+					 bool nowait, bool *done)
 {
 	int			waitpos = funnel->nextqueue;
 
@@ -190,6 +398,7 @@ TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done)
 	for (;;)
 	{
 		shm_mq_handle *mqh = funnel->queue[funnel->nextqueue];
+		char	   *modep = &funnel->mode[funnel->nextqueue];
 		shm_mq_result result;
 		Size		nbytes;
 		void	   *data;
@@ -198,15 +407,10 @@ TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done)
 		result = shm_mq_receive(mqh, &nbytes, &data, true);
 
 		/*
-		 * Normally, we advance funnel->nextqueue to the next queue at this
-		 * point, but if we're pointing to a queue that we've just discovered
-		 * is detached, then forget that queue and leave the pointer where it
-		 * is until the number of remaining queues fall below that pointer and
-		 * at that point make the pointer point to the first queue.
+		 * If this queue has been detached, forget about it and shift the
+		 * remaining queues downward in the array.
 		 */
-		if (result != SHM_MQ_DETACHED)
-			funnel->nextqueue = (funnel->nextqueue + 1) % funnel->nqueues;
-		else
+		if (result == SHM_MQ_DETACHED)
 		{
 			--funnel->nqueues;
 			if (funnel->nqueues == 0)
@@ -230,21 +434,32 @@ TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done)
 			continue;
 		}
 
+		/* Advance nextqueue pointer to next queue in round-robin fashion. */
+		funnel->nextqueue = (funnel->nextqueue + 1) % funnel->nqueues;
+
 		/* If we got a message, return it. */
 		if (result == SHM_MQ_SUCCESS)
 		{
-			HeapTupleData htup;
-
-			/*
-			 * The tuple data we just read from the queue is only valid until
-			 * we again attempt to read from it.  Copy the tuple into a single
-			 * palloc'd chunk as callers will expect.
-			 */
-			ItemPointerSetInvalid(&htup.t_self);
-			htup.t_tableOid = InvalidOid;
-			htup.t_len = nbytes;
-			htup.t_data = data;
-			return heap_copytuple(&htup);
+			if (nbytes == 1)
+			{
+				/* Mode switch message. */
+				*modep = ((char *) data)[0];
+				continue;
+			}
+			else if (*modep == TUPLE_QUEUE_MODE_DATA)
+			{
+				/* Tuple data. */
+				return TupleQueueHandleDataMessage(funnel, tupledesc,
+												   nbytes, data);
+			}
+			else if (*modep == TUPLE_QUEUE_MODE_CONTROL)
+			{
+				/* Control message, describing a transient record type. */
+				TupleQueueHandleControlMessage(funnel, nbytes, data);
+				continue;
+			}
+			else
+				elog(ERROR, "invalid mode: %d", (int) *modep);
 		}
 
 		/*
@@ -262,3 +477,224 @@ TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done)
 		}
 	}
 }
+
+/*
+ * Handle a data message - that is, a tuple - from the tuple queue funnel.
+ */
+static HeapTuple
+TupleQueueHandleDataMessage(TupleQueueFunnel *funnel, TupleDesc tupledesc,
+							Size nbytes, HeapTupleHeader data)
+{
+	HeapTupleData htup;
+
+	ItemPointerSetInvalid(&htup.t_self);
+	htup.t_tableOid = InvalidOid;
+	htup.t_len = nbytes;
+	htup.t_data = data;
+
+	/* If necessary, remap record typmods. */
+	if (funnel->typmodmap != NULL)
+	{
+		HeapTuple	newtuple;
+
+		newtuple = TupleQueueRemapTuple(funnel, tupledesc, &htup);
+		if (newtuple != NULL)
+			return newtuple;
+	}
+
+	/*
+	 * Otherwise, just copy the tuple into a single palloc'd chunk, as
+	 * callers will expect.
+	 */
+	return heap_copytuple(&htup);
+}
+
+/*
+ * Remap tuple typmods per control information received from remote side.
+ */
+static HeapTuple
+TupleQueueRemapTuple(TupleQueueFunnel *funnel, TupleDesc tupledesc,
+					 HeapTuple tuple)
+{
+	Datum	   *values;
+	bool	   *isnull;
+	bool		dirty = false;
+	int			i;
+
+	/* Deform tuple so we can remap record typmods for individual attrs. */
+	values = palloc(tupledesc->natts * sizeof(Datum));
+	isnull = palloc(tupledesc->natts * sizeof(bool));
+	heap_deform_tuple(tuple, tupledesc, values, isnull);
+
+	/* Recursively check each non-NULL attribute. */
+	for (i = 0; i < tupledesc->natts; ++i)
+	{
+		Form_pg_attribute attr = tupledesc->attrs[i];
+
+		if (isnull[i])
+			continue;
+
+		if (attr->atttypid == RECORDOID)
+		{
+			values[i] = TupleQueueRemapRecord(funnel, values[i]);
+			dirty = true;
+		}
+
+
+		if (attr->atttypid == RECORDARRAYOID)
+		{
+			values[i] = TupleQueueRemapRecordArray(funnel, values[i]);
+			dirty = true;
+		}
+	}
+
+	/* If we didn't need to change anything, just return NULL. */
+	if (!dirty)
+		return NULL;
+
+	/* Reform the modified tuple. */
+	return heap_form_tuple(tupledesc, values, isnull);
+}
+
+static Datum
+TupleQueueRemapRecord(TupleQueueFunnel *funnel, Datum value)
+{
+	HeapTupleHeader	tup;
+	int				remotetypmod;
+	RecordTypemodMap *mapent;
+	TupleDesc		atupledesc;
+	HeapTupleData	htup;
+	HeapTuple		atup;
+
+	tup = DatumGetHeapTupleHeader(value);
+
+	/* Map remote typmod to local typmod and get tupledesc. */
+	remotetypmod = HeapTupleHeaderGetTypMod(tup);
+	Assert(funnel->typmodmap != NULL);
+	mapent = hash_search(funnel->typmodmap, &remotetypmod,
+						 HASH_FIND, NULL);
+	if (mapent == NULL)
+		elog(ERROR, "found unrecognized remote typmod %d",
+			 remotetypmod);
+	atupledesc = lookup_rowtype_tupdesc(RECORDOID, mapent->localtypmod);
+
+	/* Recursively process contents of record. */
+	ItemPointerSetInvalid(&htup.t_self);
+	htup.t_tableOid = InvalidOid;
+	htup.t_len = HeapTupleHeaderGetDatumLength(tup);
+	htup.t_data = tup;
+	atup = TupleQueueRemapTuple(funnel, atupledesc, &htup);
+
+	/* Release reference count acquired by lookup_rowtype_tupdesc. */
+	DecrTupleDescRefCount(atupledesc);
+
+	/*
+	 * Even if none of the attributes inside this tuple are records that
+	 * require typmod remapping, we still need to change the typmod on
+	 * the record itself.  However, we can do that by copying the tuple
+	 * rather than reforming it.
+	 */
+	if (atup == NULL)
+	{
+		atup = heap_copytuple(&htup);
+		HeapTupleHeaderSetTypMod(atup->t_data, mapent->localtypmod);
+	}
+
+	return HeapTupleHeaderGetDatum(atup->t_data);
+}
+
+static Datum
+TupleQueueRemapRecordArray(TupleQueueFunnel *funnel, Datum value)
+{
+	ArrayType  *arr = DatumGetArrayTypeP(value);
+	Datum	   *elem_values;
+	bool	   *elem_nulls;
+	int			num_elems;
+	int			i;
+
+	Assert(ARR_ELEMTYPE(arr) == RECORDOID);
+	deconstruct_array(arr, RECORDOID, -1, false, 'd',
+					  &elem_values, &elem_nulls, &num_elems);
+	for (i = 0; i < num_elems; ++i)
+		if (!elem_nulls[i])
+			elem_values[i] = TupleQueueRemapRecord(funnel, elem_values[i]);
+	arr = construct_md_array(elem_values, elem_nulls,
+							 ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
+							 RECORDOID,
+							 -1, false, 'd');
+	return PointerGetDatum(arr);
+}
+
+/*
+ * Handle a control message from the tuple queue funnel.
+ *
+ * Control messages are sent when the remote side is sending tuples that
+ * contain transient record types.  We need to arrange to bless those
+ * record types locally and translate between remote and local typmods.
+ */
+static void
+TupleQueueHandleControlMessage(TupleQueueFunnel *funnel, Size nbytes,
+							   char *data)
+{
+	int		natts;
+	int		remotetypmod;
+	bool	hasoid;
+	char   *buf = data;
+	int		rc = 0;
+	int		i;
+	Form_pg_attribute *attrs;
+	MemoryContext	oldcontext;
+	TupleDesc	tupledesc;
+	RecordTypemodMap *mapent;
+	bool	found;
+
+	/* Extract remote typmod. */
+	memcpy(&remotetypmod, &buf[rc], sizeof(int));
+	rc += sizeof(int);
+
+	/* Extract attribute count. */
+	memcpy(&natts, &buf[rc], sizeof(int));
+	rc += sizeof(int);
+
+	/* Extract hasoid flag. */
+	memcpy(&hasoid, &buf[rc], sizeof(bool));
+	rc += sizeof(bool);
+
+	/* Extract attribute details. */
+	oldcontext = MemoryContextSwitchTo(CurTransactionContext);
+	attrs = palloc(natts * sizeof(Form_pg_attribute));
+	for (i = 0; i < natts; ++i)
+	{
+		attrs[i] = palloc(sizeof(FormData_pg_attribute));
+		memcpy(attrs[i], &buf[rc], sizeof(FormData_pg_attribute));
+		rc += sizeof(FormData_pg_attribute);
+	}
+	MemoryContextSwitchTo(oldcontext);
+
+	/* We should have read the whole message. */
+	Assert(rc == nbytes);
+
+	/* Construct TupleDesc. */
+	tupledesc = CreateTupleDesc(natts, hasoid, attrs);
+	tupledesc = BlessTupleDesc(tupledesc);
+
+	/* Create map if it doesn't exist already. */
+	if (funnel->typmodmap == NULL)
+	{
+		HASHCTL		ctl;
+
+		ctl.keysize = sizeof(int);
+		ctl.entrysize = sizeof(RecordTypemodMap);
+		ctl.hcxt = CurTransactionContext;
+		funnel->typmodmap = hash_create("typmodmap hashtable",
+							 100, &ctl, HASH_ELEM | HASH_CONTEXT);
+	}
+
+	/* Create map entry. */
+	mapent = hash_search(funnel->typmodmap, &remotetypmod, HASH_ENTER,
+						 &found);
+	if (found)
+		elog(ERROR, "duplicate message for typmod %d",
+			 remotetypmod);
+	mapent->localtypmod = tupledesc->tdtypmod;
+}
diff --git a/src/include/executor/tqueue.h b/src/include/executor/tqueue.h
index 6f8eb73..59f35c7 100644
--- a/src/include/executor/tqueue.h
+++ b/src/include/executor/tqueue.h
@@ -25,7 +25,7 @@ typedef struct TupleQueueFunnel TupleQueueFunnel;
 extern TupleQueueFunnel *CreateTupleQueueFunnel(void);
 extern void DestroyTupleQueueFunnel(TupleQueueFunnel *funnel);
 extern void RegisterTupleQueueOnFunnel(TupleQueueFunnel *, shm_mq_handle *);
-extern HeapTuple TupleQueueFunnelNext(TupleQueueFunnel *, bool nowait,
-					 bool *done);
+extern HeapTuple TupleQueueFunnelNext(TupleQueueFunnel *, TupleDesc tupledesc,
+					 bool nowait, bool *done);
 
 #endif   /* TQUEUE_H */
gather-project.patch (application/x-patch)
commit da52bc825554ea7937398b4b296f3ecd6e6822af
Author: Robert Haas <rhaas@postgresql.org>
Date:   Tue Oct 20 21:47:18 2015 -0400

    Make Gather node projection-capable.
    
    The original Gather code failed to mark the Gather node as unable to
    do projection even though it couldn't, despite initializing its
    projection info via ExecAssignProjectionInfo.  There doesn't seem to
    be any good reason for this node not to have projection capability,
    so clean things up so that it does.  Without this, plans using Gather
    nodes need to carry extra Result nodes to do projection.

diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 4791320..48d6c31 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -36,6 +36,7 @@
 #include "executor/nodeGather.h"
 #include "executor/nodeSubplan.h"
 #include "executor/tqueue.h"
+#include "utils/memutils.h"
 #include "utils/rel.h"
 
 
@@ -50,6 +51,9 @@ GatherState *
 ExecInitGather(Gather *node, EState *estate, int eflags)
 {
 	GatherState *gatherstate;
+	Plan	   *outerNode;
+	bool		hasoid;
+	TupleDesc	tupDesc;
 
 	/* Gather node doesn't have innerPlan node. */
 	Assert(innerPlan(node) == NULL);
@@ -82,13 +86,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	/*
 	 * tuple table initialization
 	 */
+	gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate);
 	ExecInitResultTupleSlot(estate, &gatherstate->ps);
 
 	/*
 	 * now initialize outer plan
 	 */
-	outerPlanState(gatherstate) = ExecInitNode(outerPlan(node), estate, eflags);
-
+	outerNode = outerPlan(node);
+	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
 
 	gatherstate->ps.ps_TupFromTlist = false;
 
@@ -98,6 +103,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	ExecAssignResultTypeFromTL(&gatherstate->ps);
 	ExecAssignProjectionInfo(&gatherstate->ps, NULL);
 
+	/*
+	 * Initialize funnel slot to same tuple descriptor as outer plan.
+	 */
+	if (!ExecContextForcesOids(&gatherstate->ps, &hasoid))
+		hasoid = false;
+	tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
+	ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
+
 	return gatherstate;
 }
 
@@ -113,6 +126,9 @@ ExecGather(GatherState *node)
 {
 	int			i;
 	TupleTableSlot *slot;
+	TupleTableSlot *resultSlot;
+	ExprDoneCond isDone;
+	ExprContext *econtext;
 
 	/*
 	 * Initialize the parallel context and workers on first execution. We do
@@ -169,7 +185,53 @@ ExecGather(GatherState *node)
 		node->initialized = true;
 	}
 
-	slot = gather_getnext(node);
+	/*
+	 * Check to see if we're still projecting out tuples from a previous scan
+	 * tuple (because there is a function-returning-set in the projection
+	 * expressions).  If so, try to project another one.
+	 */
+	if (node->ps.ps_TupFromTlist)
+	{
+		resultSlot = ExecProject(node->ps.ps_ProjInfo, &isDone);
+		if (isDone == ExprMultipleResult)
+			return resultSlot;
+		/* Done with that source tuple... */
+		node->ps.ps_TupFromTlist = false;
+	}
+
+	/*
+	 * Reset per-tuple memory context to free any expression evaluation
+	 * storage allocated in the previous tuple cycle.  Note we can't do this
+	 * until we're done projecting.
+	 */
+	econtext = node->ps.ps_ExprContext;
+	ResetExprContext(econtext);
+
+	/* Get and return the next tuple, projecting if necessary. */
+	for (;;)
+	{
+		/*
+		 * Get next tuple, either from one of our workers, or by running the
+		 * plan ourselves.
+		 */
+		slot = gather_getnext(node);
+		if (TupIsNull(slot))
+			return NULL;
+
+		/*
+		 * form the result tuple using ExecProject(), and return it --- unless
+		 * the projection produces an empty set, in which case we must loop
+		 * back around for another tuple
+		 */
+		econtext->ecxt_outertuple = slot;
+		resultSlot = ExecProject(node->ps.ps_ProjInfo, &isDone);
+
+		if (isDone != ExprEndResult)
+		{
+			node->ps.ps_TupFromTlist = (isDone == ExprMultipleResult);
+			return resultSlot;
+		}
+	}
 
 	return slot;
 }
@@ -201,18 +263,11 @@ ExecEndGather(GatherState *node)
 static TupleTableSlot *
 gather_getnext(GatherState *gatherstate)
 {
-	PlanState  *outerPlan;
+	PlanState  *outerPlan = outerPlanState(gatherstate);
 	TupleTableSlot *outerTupleSlot;
-	TupleTableSlot *slot;
+	TupleTableSlot *fslot = gatherstate->funnel_slot;
 	HeapTuple	tup;
 
-	/*
-	 * We can use projection info of Gather for the tuples received from
-	 * worker backends as currently for all cases worker backends sends the
-	 * projected tuple as required by Gather node.
-	 */
-	slot = gatherstate->ps.ps_ProjInfo->pi_slot;
-
 	while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally)
 	{
 		if (gatherstate->funnel != NULL)
@@ -221,7 +276,7 @@ gather_getnext(GatherState *gatherstate)
 
 			/* wait only if local scan is done */
 			tup = TupleQueueFunnelNext(gatherstate->funnel,
-									   slot->tts_tupleDescriptor,
+									   fslot->tts_tupleDescriptor,
 									   gatherstate->need_to_scan_locally,
 									   &done);
 			if (done)
@@ -230,19 +285,17 @@ gather_getnext(GatherState *gatherstate)
 			if (HeapTupleIsValid(tup))
 			{
 				ExecStoreTuple(tup,		/* tuple to store */
-							   slot,	/* slot to store in */
+							   fslot,	/* slot in which to store the tuple */
 							   InvalidBuffer,	/* buffer associated with this
 												 * tuple */
 							   true);	/* pfree this pointer if not from heap */
 
-				return slot;
+				return fslot;
 			}
 		}
 
 		if (gatherstate->need_to_scan_locally)
 		{
-			outerPlan = outerPlanState(gatherstate);
-
 			outerTupleSlot = ExecProcNode(outerPlan);
 
 			if (!TupIsNull(outerTupleSlot))
@@ -252,7 +305,7 @@ gather_getnext(GatherState *gatherstate)
 		}
 	}
 
-	return ExecClearTuple(slot);
+	return ExecClearTuple(fslot);
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index 8c6c571..48d6e6f 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -602,12 +602,15 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
 			set_join_references(root, (Join *) plan, rtoffset);
 			break;
 
+		case T_Gather:
+			set_upper_references(root, plan, rtoffset);
+			break;
+
 		case T_Hash:
 		case T_Material:
 		case T_Sort:
 		case T_Unique:
 		case T_SetOp:
-		case T_Gather:
 
 			/*
 			 * These plan types don't actually bother to evaluate their
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 4fcdcc4..939bc0e 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1964,6 +1964,7 @@ typedef struct GatherState
 	bool		initialized;
 	struct ParallelExecutorInfo *pei;
 	struct TupleQueueFunnel *funnel;
+	TupleTableSlot *funnel_slot;
 	bool		need_to_scan_locally;
 } GatherState;
 
#2 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Robert Haas (#1)
Re: make Gather node projection-capable

Robert Haas <robertmhaas@gmail.com> writes:
> The Gather node, as currently committed, is neither projection-capable
> nor listed as an exception in is_projection_capable_plan. Amit
> discovered this in testing, and I hit it in my testing as well. We
> could just mark it as being not projection-capable, but I think it
> might be better to go the other way and give it projection
> capabilities.

Um ... why would you not want the projections to happen in the child
nodes, where they could be parallelized? Or am I missing something?

> While that's not the end of the world, it seems to needlessly fly in
> the face of the general principle that nodes should generally try to
> support projection.

I'm not sure there is any such principle.

regards, tom lane

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

#3 Robert Haas
robertmhaas@gmail.com
In reply to: Tom Lane (#2)
Re: make Gather node projection-capable

On Thu, Oct 22, 2015 at 1:38 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> Robert Haas <robertmhaas@gmail.com> writes:
>> The Gather node, as currently committed, is neither projection-capable
>> nor listed as an exception in is_projection_capable_plan. Amit
>> discovered this in testing, and I hit it in my testing as well. We
>> could just mark it as being not projection-capable, but I think it
>> might be better to go the other way and give it projection
>> capabilities.
>
> Um ... why would you not want the projections to happen in the child
> nodes, where they could be parallelized? Or am I missing something?

You probably would, but sometimes that might not be possible; for
example, the tlist might contain a parallel-restricted function (which
therefore has to run in the leader).

>> While that's not the end of the world, it seems to needlessly fly in
>> the face of the general principle that nodes should generally try to
>> support projection.
>
> I'm not sure there is any such principle.

I just inferred that this was the principle from reading the code; it
doesn't seem to be documented anywhere. In fact, what projection
actually means doesn't seem to be documented anywhere. Feel free to
set me straight. That having been said, I hope there's SOME principle
other than "whatever we happened to implement". All of our scan and
join nodes seem to have projection capability - I assume that's not
an accident. It would simplify the executor code if we ripped all of
that out and instead had a separate Project node (or used Result), but
for some reason we have not.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#4 Alvaro Herrera
alvherre@2ndquadrant.com
In reply to: Robert Haas (#3)
Re: make Gather node projection-capable

Robert Haas wrote:
> On Thu, Oct 22, 2015 at 1:38 PM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>> Robert Haas <robertmhaas@gmail.com> writes:
>>> The Gather node, as currently committed, is neither projection-capable
>>> nor listed as an exception in is_projection_capable_plan. Amit
>>> discovered this in testing, and I hit it in my testing as well. We
>>> could just mark it as being not projection-capable, but I think it
>>> might be better to go the other way and give it projection
>>> capabilities.
>>
>> Um ... why would you not want the projections to happen in the child
>> nodes, where they could be parallelized? Or am I missing something?
>
> You probably would, but sometimes that might not be possible; for
> example, the tlist might contain a parallel-restricted function (which
> therefore has to run in the leader).

I don't understand your reply. Failing to parallelize the child node
does not prevent it from doing the projection itself, does it?

That said, I don't understand Tom's comment either. Surely the planner
is going to choose to do the projection in the innermost node possible,
so that the child nodes are going to do the projections most of the
time. But if for whatever reason this fails to happen, wouldn't it make
more sense to do it at Gather than having to put a Result on top?

>>> While that's not the end of the world, it seems to needlessly fly in
>>> the face of the general principle that nodes should generally try to
>>> support projection.
>>
>> I'm not sure there is any such principle.
>
> I just inferred that this was the principle from reading the code; it
> doesn't seem to be documented anywhere. In fact, what projection
> actually means doesn't seem to be documented anywhere. Feel free to
> set me straight. That having been said, I hope there's SOME principle
> other than "whatever we happened to implement". All of our scan and
> join nodes seem to have projection capability - I assume that's not
> an accident. It would simplify the executor code if we ripped all of
> that out and instead had a separate Project node (or used Result), but
> for some reason we have not.

Projections are a weird construct as implemented, yeah. I imagine it's
because of performance reasons, because having separate Result nodes
everywhere would be a lot slower, wouldn't it?

--
Álvaro Herrera http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services


#5Robert Haas
robertmhaas@gmail.com
In reply to: Alvaro Herrera (#4)
Re: make Gather node projection-capable

On Thu, Oct 22, 2015 at 3:18 PM, Alvaro Herrera
<alvherre@2ndquadrant.com> wrote:

You probably would, but sometimes that might not be possible; for
example, the tlist might contain a parallel-restricted function (which
therefore has to run in the leader).

I don't understand your reply. Failing to parallelize the child node
does not prevent it from doing the projection itself, does it?

OK, now I'm confused. If you can't parallelize the child, there's no
Gather node, and this discussion is irrelevant. The case that matters
is something like:

SELECT pg_backend_pid(), a, b, c FROM foo WHERE x = 1;

What happens today is that the system first produces a SeqScan plan
with an all-Var tlist, which may either be a physical tlist or the
exact columns needed. Then, after join planning, which is trivial
here, we substitute for that tlist the output tlist of the plan.
Since SeqScan is projection-capable, we just emit a single-node plan
tree: SeqScan. But say we choose a PartialSeqScan plan. The tlist
can't be pushed down because pg_backend_pid() must run in the leader.
So, if Gather can't do projection, we'll end up with
Result->Gather->PartialSeqScan. If we make Gather projection-capable,
we can just end up with Gather->PartialSeqScan.

I prefer the second outcome. TBH, the major reason is that I've just
been experimenting with injecting single-copy Gather nodes into Plan
trees above the join nest and below any upper plan nodes that get
stuck on top and seeing which regression tests fail. Since we do a
lot of EXPLAIN-ing in the regression tests, I hacked EXPLAIN not to show the
Gather nodes. But it does show the extra Result nodes which get
generated because Gather isn't projection-capable, and that causes a
huge pile of spurious test failures. Even with the patch I posted
applied, there are some residual failures that don't look simple to
resolve, because sometimes an initplan or subplan attaches to the
Gather node since something is being projected there. So if you just
have EXPLAIN look through those nodes and show their children instead,
you still don't get the same plans you would without the test code in
all cases, but it helps a lot.

That said, I don't understand Tom's comment either. Surely the planner
is going to choose to do the projection in the innermost node possible,
so that the children nodes are going to do the projections most of the
time. But if for whatever reason this fails to happen, wouldn't it make
more sense to do it at Gather than having to put a Result on top?

The planner doesn't seem to choose to do projection in the innermost
node possible. The final tlist only gets projected at the top of the
join tree. Beneath that, it seems like we project in order to avoid
carrying Vars through nodes where that would be a needless expense,
but that's just dropping columns, not computing anything. That having
been said, I don't think that takes anything away from your chain of
reasoning here, and I agree with your conclusion. There seems to be
little reason to force a Result node atop a Gather node when we don't
do that for most other node types.

Also, the patch I'm proposing I think actually makes things quite a
bit cleaner than the status quo, because the current code initializes
the projection info and then ignores the projection info itself, while
using the slot that gets set up by initializing the projection info.
That just seems goofy. If there's some reason not to do what I'm
proposing here, I'm happy to do whatever we agree is better, but I
don't think leaving it the way it is now makes any sense.

Projections are a weird construct as implemented, yeah. I imagine it's
because of performance reasons, because having separate Result nodes
everywhere would be a lot slower, wouldn't it?

I'm not sure. It'd be my guess that this is why it wasn't made a
separate node to begin with, but I don't know.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#6Robert Haas
robertmhaas@gmail.com
In reply to: Robert Haas (#3)
Re: make Gather node projection-capable

On Thu, Oct 22, 2015 at 2:49 PM, Robert Haas <robertmhaas@gmail.com> wrote:

You probably would, but sometimes that might not be possible; for
example, the tlist might contain a parallel-restricted function (which
therefore has to run in the leader).

Oh, also: pushing down the tlist is actually sorta non-trivial at the
moment. I can stick a GatherPath on top of whatever the join planner
kicks out (although I don't have a cost model for this yet, so right
now it's just a hard-coded test), but the upper planner substitutes
the tlist into whatever the topmost plan node is - and that's the
Gather, not whatever's under it. Maybe I should have a special case
for this: if the node into which we would insert the final tlist is a
Gather, see whether it's parallel-safe, and if so, peel the Gather
node off, apply the tlist to whatever's left (adding a gating Result
node if need be) and put the Gather back on. This seems less
important than a few other things I need to get done, but certainly
worth doing. But do you know whether the upper planner path-ification
work will be likely to render whatever code I might write here
obsolete?

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company


#7Simon Riggs
simon@2ndQuadrant.com
In reply to: Robert Haas (#5)
Re: make Gather node projection-capable

On 22 October 2015 at 16:01, Robert Haas <robertmhaas@gmail.com> wrote:

If we make Gather projection-capable,
we can just end up with Gather->PartialSeqScan.

Is there a reason not to do projection in the Gather node? I don't see one.

That said, I don't understand Tom's comment either. Surely the planner
is going to choose to do the projection in the innermost node possible,
so that the children nodes are going to do the projections most of the
time. But if for whatever reason this fails to happen, wouldn't it make
more sense to do it at Gather than having to put a Result on top?

The planner doesn't seem to choose to do projection in the innermost
node possible. The final tlist only gets projected at the top of the
join tree. Beneath that, it seems like we project in order to avoid
carrying Vars through nodes where that would be a needless expense,
but that's just dropping columns, not computing anything. That having
been said, I don't think that takes anything away from your chain of
reasoning here, and I agree with your conclusion. There seems to be
little reason to force a Result node atop a Gather node when we don't
do that for most other node types.

Presumably this is a performance issue then? If we are calculating
something *after* a join which increases rows then the query will be slower
than need be.

I agree the rule should be to project as early as possible.

--
Simon Riggs http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

#8Robert Haas
robertmhaas@gmail.com
In reply to: Simon Riggs (#7)
Re: make Gather node projection-capable

On Sun, Oct 25, 2015 at 11:59 AM, Simon Riggs <simon@2ndquadrant.com> wrote:

On 22 October 2015 at 16:01, Robert Haas <robertmhaas@gmail.com> wrote:

If we make Gather projection-capable,
we can just end up with Gather->PartialSeqScan.

Is there a reason not to do projection in the Gather node? I don't see one.

I don't see one either. There may be some work that needs to be done
to get the projection to happen in the Gather node in all of the cases
where we want it to happen in the Gather node, but that's not an
argument against having the capability.

That said, I don't understand Tom's comment either. Surely the planner
is going to choose to do the projection in the innermost node possible,
so that the children nodes are going to do the projections most of the
time. But if for whatever reason this fails to happen, wouldn't it make
more sense to do it at Gather than having to put a Result on top?

The planner doesn't seem to choose to do projection in the innermost
node possible. The final tlist only gets projected at the top of the
join tree. Beneath that, it seems like we project in order to avoid
carrying Vars through nodes where that would be a needless expense,
but that's just dropping columns, not computing anything. That having
been said, I don't think that takes anything away from your chain of
reasoning here, and I agree with your conclusion. There seems to be
little reason to force a Result node atop a Gather node when we don't
do that for most other node types.

Presumably this is a performance issue then? If we are calculating something
*after* a join which increases rows then the query will be slower than need
be.

I don't think there will be a performance issue in most cases because
in most cases the node immediately beneath the Gather node will be
able to do projection, which in most cases is in fact better, because
then the work gets done in the workers. However, there may be some
cases where it is useful. After having mulled it over, I think it's
likely that the reason why we didn't introduce a separate node for
projection is that you generally want to project to remove unnecessary
columns at the earliest stage that doesn't lose performance. So if we
didn't have projection capabilities built into the individual nodes,
then you'd end up with things like Aggregate -> Project -> Join ->
Project -> Scan, which would start to get silly, and likely
inefficient.

I agree the rule should be to project as early as possible.

Cool.

I'm not sure Tom was really disagreeing with the idea of making Gather
projection-capable ... it seems like he may have just been saying that
there wasn't as much of a rule as I was alleging. Which is fine: we
can decide what is best here, and I still think this is it. Barring
further objections, I'm going to commit this, because (1) the status
quo is definitely weird because Gather is abusing the projection stuff
to come up with an extra slot, so doing nothing seems unappealing
(2) I need to make other changes that touch the same areas of the
code, and I want to get this stuff done quickly so that we get a
user-visible feature people can test without writing C code in the
near future.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
