From 24017512fbf34efaf48961fae3798407ac59a390 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sun, 3 Jul 2016 15:10:13 -0700
Subject: [PATCH 04/20] WIP: Optimize slot_deform_tuple() significantly.

Todo:
* benchmark computed goto vs. switch
* if computed goto comes ahead, only optionally use it
---
 src/backend/access/common/heaptuple.c | 562 ++++++++++++++++++++++++++++++++--
 src/backend/executor/execMain.c       |   3 +
 src/backend/executor/execTuples.c     |  21 ++
 src/include/executor/tuptable.h       |  54 ++++
 4 files changed, 614 insertions(+), 26 deletions(-)

diff --git a/src/backend/access/common/heaptuple.c b/src/backend/access/common/heaptuple.c
index 6d0f3f3..256243a 100644
--- a/src/backend/access/common/heaptuple.c
+++ b/src/backend/access/common/heaptuple.c
@@ -70,6 +70,8 @@
 #define VARLENA_ATT_IS_PACKABLE(att) \
 	((att)->attstorage != 'p')
 
+static void slot_deform_tuple(TupleTableSlot *slot, int natts);
+
 
 /* ----------------------------------------------------------------
  *						misc support routines
@@ -953,6 +955,192 @@ heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc,
 	}
 }
 
+/* Append a copy of *dp to slot->tts_dp, enlarging the array when it is full. */
+static void
+slot_push_deform_step(TupleTableSlot *slot, DeparseSlotState *dp)
+{
+	if (slot->tts_dp_alloc == 0)
+	{
+		/* first step: zeroed room for 16 entries, allocated in slot->tts_mcxt */
+		slot->tts_dp = MemoryContextAllocZero(slot->tts_mcxt, sizeof(DeparseSlotState) * 16);
+		slot->tts_dp_alloc = 16;
+	}
+	else if (slot->tts_dp_len == slot->tts_dp_alloc)
+	{
+		slot->tts_dp = repalloc(slot->tts_dp, sizeof(DeparseSlotState) * slot->tts_dp_alloc * 2);
+		slot->tts_dp_alloc *= 2;	/* capacity doubled */
+	}
+
+	slot->tts_dp[slot->tts_dp_len++] = *dp;
+}
+
+/*
+ * slot_prepare_deform
+ *		Build the deform steps (slot->tts_dp) for the slot's tuple descriptor:
+ *		one step per attribute, followed by a DO_DONE terminator.
+ *
+ *		attcuralign tracks the attribute's offset in the tuple data area while
+ *		it is known from the descriptor alone; it is -1 once a preceding
+ *		nullable or variable-length attribute has made the offset unknown.
+ */
+void
+slot_prepare_deform(TupleTableSlot *slot)
+{
+	DeparseSlotState curdp;
+	TupleDesc desc = slot->tts_tupleDescriptor;
+	int natt;
+	int attcuralign = 0;
+
+	for (natt = 0; natt < desc->natts; natt++)
+	{
+		Form_pg_attribute att = desc->attrs[natt];
+
+		/* reset */
+		memset(&curdp, 0, sizeof(curdp));
+
+		curdp.attlen = att->attlen;
+		curdp.attbyval = att->attbyval;
+
+		if (att->attbyval)
+		{
+			switch (att->attlen)
+			{
+				case 1:
+					Assert(att->attalign == 'c');
+					curdp.attopcode = DO_BYVAL_1;
+					curdp.alignto = 1;
+					break;
+				case 2:
+					Assert(att->attalign == 's');
+					curdp.attopcode = DO_BYVAL_2;
+					curdp.alignto = ALIGNOF_SHORT;
+					break;
+				case 4:
+					Assert(att->attalign == 'i');
+					curdp.attopcode = DO_BYVAL_4;
+					curdp.alignto = ALIGNOF_INT;
+					break;
+				case 8:
+					Assert(att->attalign == 'd');
+					curdp.attopcode = DO_BYVAL_8;
+					curdp.alignto = ALIGNOF_DOUBLE;
+					break;
+				default:
+					Assert(false);
+			}
+
+			/* if not guaranteed to be correctly aligned, include alignment code */
+			if (attcuralign < 0)
+				curdp.attopcode += DO_ALIGN_IND_FIXED_LENGTH;
+			else if (attcuralign != TYPEALIGN(curdp.alignto, attcuralign))
+			{
+				curdp.attopcode += DO_ALIGN_IND_FIXED_LENGTH;
+				attcuralign = TYPEALIGN(curdp.alignto, attcuralign);
+			}
+
+			if (attcuralign >= 0)
+				attcuralign += att->attlen;
+		}
+		else
+		{
+			/* compute alignment */
+			switch (att->attalign)
+			{
+				case 'i':
+					curdp.alignto = ALIGNOF_INT;
+					break;
+				case 'c':
+					curdp.alignto = 1;
+					break;
+				case 'd':
+					curdp.alignto = ALIGNOF_DOUBLE;
+					break;
+				case 's':
+					curdp.alignto = ALIGNOF_SHORT;
+					break;
+				default:
+					Assert(false);
+			}
+
+			/* compute opcode */
+			if (att->attlen >= 0)
+				curdp.attopcode = DO_IND_FIXED_LENGTH;
+			else if (att->attlen == -1)
+				curdp.attopcode = DO_VARLENA;
+			else if (att->attlen == -2)
+				curdp.attopcode = DO_CSTRING;
+			else
+				Assert(false);
+
+			/* if not guaranteed to be correctly aligned, include alignment code */
+			if (attcuralign < 0)		/* offset unknown: it must stay unknown */
+				curdp.attopcode += DO_ALIGN_IND_FIXED_LENGTH;
+			else if (attcuralign != TYPEALIGN(curdp.alignto, attcuralign))
+			{
+				curdp.attopcode += DO_ALIGN_IND_FIXED_LENGTH;
+				attcuralign = TYPEALIGN(curdp.alignto, attcuralign);
+			}
+
+			if (attcuralign >= 0 && att->attlen >= 0)
+				attcuralign += att->attlen;
+
+			if (att->attlen <= 0)
+				attcuralign = -1;
+		}
+
+		/* nullable: switch to the null-checking opcode; later offsets are unknown */
+		if (!att->attnotnull)
+		{
+			curdp.attopcode += DO_NULLABLE_IND_FIXED_LENGTH;
+			attcuralign = -1;
+		}
+
+		slot_push_deform_step(slot, &curdp);
+	}
+
+	memset(&curdp, 0, sizeof(curdp));	/* loop may not have run (natts == 0) */
+	curdp.attopcode = DO_DONE;
+	slot_push_deform_step(slot, &curdp);
+}
+
+/* FIXME: use computed goto on gcc / clang */
+// #define USE_COMPUTED_GOTO
+/* Dispatch glue for slot_deform_tuple: indirect goto via a label table, or a plain switch. */
+#ifdef USE_COMPUTED_GOTO
+#define SD_SWITCH(d)	/* expands to nothing: the cases become plain labels */
+#define SD_DISPATCH() \
+	do \
+	{ \
+		attnum++; \
+		if (attnum >= natts) \
+			goto out; \
+		dp++; \
+		goto *(((void**)dispatch_table)[dp->attopcode]); \
+	} \
+	while (0)
+#define SD_CASE(name) CASE_##name
+#else
+#define SD_SWITCH(d) switch (d)
+#define SD_DISPATCH() \
+	do \
+	{ \
+		attnum++; \
+		dp++; \
+		goto starteval; \
+	} \
+	while (0)
+#define SD_CASE(name) case name
+
+#endif /* USE_COMPUTED_GOTO */
+/* If the current attribute is NULL, store a NULL datum and move on to the next step. */
+#define SD_CHECKNULL \
+	if (hasnulls && att_isnull(attnum, bp)) \
+	{ \
+		isnull[attnum] = true; \
+		values[attnum] = (Datum) 0; \
+		SD_DISPATCH(); \
+	}
+
 /*
  * slot_deform_tuple
  *		Given a TupleTableSlot, extract data from the slot's physical tuple
@@ -963,11 +1151,343 @@ heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc,
  *		on each call we extract attributes up to the one needed, without
  *		re-computing information about previously extracted attributes.
  *		slot->tts_nvalid is the number of attributes already extracted.
+ *
+ *		NB: This assumes that columns declared NOT NULL never contain NULLs,
+ *		which makes this unsuitable for e.g. constraint evaluation.
  */
 static void
 slot_deform_tuple(TupleTableSlot *slot, int natts)
 {
 	HeapTuple	tuple = slot->tts_tuple;
+	Datum	   *values = slot->tts_values;
+	bool	   *isnull = slot->tts_isnull;
+	HeapTupleHeader tup = tuple->t_data;
+	bool		hasnulls = HeapTupleHasNulls(tuple);
+	int			attnum;
+	char	   *tpc;				/* ptr to tuple data */
+	bits8	   *bp = tup->t_bits;		/* ptr to null bitmap in tuple */
+	DeparseSlotState *dp = slot->tts_dp;	/* current deform step */
+	int			oldop;			/* opcode temporarily replaced by DO_DONE */
+
+#ifdef USE_COMPUTED_GOTO
+	static const void* dispatch_table[] = {
+		&&CASE_DO_IND_FIXED_LENGTH,
+		&&CASE_DO_VARLENA,
+		&&CASE_DO_CSTRING,
+		&&CASE_DO_BYVAL_1,
+		&&CASE_DO_BYVAL_2,
+		&&CASE_DO_BYVAL_4,
+		&&CASE_DO_BYVAL_8,
+
+		&&CASE_DO_NULLABLE_IND_FIXED_LENGTH,
+		&&CASE_DO_NULLABLE_VARLENA,
+		&&CASE_DO_NULLABLE_CSTRING,
+		&&CASE_DO_NULLABLE_BYVAL_1,
+		&&CASE_DO_NULLABLE_BYVAL_2,
+		&&CASE_DO_NULLABLE_BYVAL_4,
+		&&CASE_DO_NULLABLE_BYVAL_8,
+
+		&&CASE_DO_ALIGN_IND_FIXED_LENGTH,
+		&&CASE_DO_ALIGN_VARLENA,
+		&&CASE_DO_ALIGN_CSTRING,
+		&&CASE_DO_ALIGN_BYVAL_1,
+		&&CASE_DO_ALIGN_BYVAL_2,
+		&&CASE_DO_ALIGN_BYVAL_4,
+		&&CASE_DO_ALIGN_BYVAL_8,
+
+		&&CASE_DO_ALIGN_NULLABLE_IND_FIXED_LENGTH,
+		&&CASE_DO_ALIGN_NULLABLE_VARLENA,
+		&&CASE_DO_ALIGN_NULLABLE_CSTRING,
+		&&CASE_DO_ALIGN_NULLABLE_BYVAL_1,
+		&&CASE_DO_ALIGN_NULLABLE_BYVAL_2,
+		&&CASE_DO_ALIGN_NULLABLE_BYVAL_4,
+		&&CASE_DO_ALIGN_NULLABLE_BYVAL_8,
+		&&CASE_DO_DONE
+	};
+#endif
+
+	Assert(slot->tts_dp != NULL);
+
+	tpc = (char *) tup + tup->t_hoff;
+
+	/*
+	 * Check whether this is the first call for this tuple, and initialize or
+	 * restore loop state.
+	 */
+	attnum = slot->tts_nvalid;
+	if (attnum > 0)
+	{
+		/* Restore state from previous execution */
+		tpc += slot->tts_off;
+		dp += attnum;
+	}
+
+	/*
+	 * To avoid checking "progress" in every loop iteration, temporarily set
+	 * the last step + 1 to DONE. That'll cause the loop below to exit.
+	 */
+
+	oldop = slot->tts_dp[natts].attopcode;
+	slot->tts_dp[natts].attopcode = DO_DONE;
+
+	/*
+	 * FIXME: Instead of checking for attnum in SD_DISPATCH, temporarily set
+	 * the opcode of the relevant step to a new 'DONE' opcode.
+	 */
+#ifdef USE_COMPUTED_GOTO
+	goto *(((void**)dispatch_table)[dp->attopcode]);
+#else
+starteval:
+#endif
+	SD_SWITCH(dp->attopcode)
+	{
+	SD_CASE(DO_IND_FIXED_LENGTH):
+		Assert(tpc == (char *) TYPEALIGN(dp->alignto, tpc));
+		isnull[attnum] = false;
+		values[attnum] = PointerGetDatum((char *) tpc);
+		tpc += dp->attlen;
+		SD_DISPATCH();
+	SD_CASE(DO_VARLENA):
+		if (!VARATT_NOT_PAD_BYTE(tpc))
+		{
+			Assert(tpc == (char *) TYPEALIGN(dp->alignto, tpc));
+		}
+		isnull[attnum] = false;
+		values[attnum] = PointerGetDatum((char *) tpc);
+		tpc += VARSIZE_ANY(tpc);
+		SD_DISPATCH();
+	SD_CASE(DO_CSTRING):
+		Assert(tpc == (char *) TYPEALIGN(dp->alignto, tpc));
+		isnull[attnum] = false;
+		values[attnum] = PointerGetDatum((char *) tpc);
+		tpc += strlen(tpc) + 1;
+		SD_DISPATCH();
+	SD_CASE(DO_BYVAL_1):
+		Assert(dp->alignto == 1);
+		isnull[attnum] = false;
+		values[attnum] = CharGetDatum(*(char *) tpc);
+		tpc += sizeof(char);
+		SD_DISPATCH();
+	SD_CASE(DO_BYVAL_2):
+		Assert(dp->alignto == ALIGNOF_SHORT);
+		Assert(tpc == (char *) TYPEALIGN(ALIGNOF_SHORT, tpc));
+		isnull[attnum] = false;
+		values[attnum] = Int16GetDatum(*(int16 *) tpc);
+		tpc += sizeof(int16);
+		SD_DISPATCH();
+	SD_CASE(DO_BYVAL_4):
+		Assert(dp->alignto == ALIGNOF_INT);
+		Assert(tpc == (char *) TYPEALIGN(ALIGNOF_INT, tpc));
+		isnull[attnum] = false;
+		values[attnum] = Int32GetDatum(*(int32 *) tpc);
+		tpc += sizeof(int32);
+		SD_DISPATCH();
+	SD_CASE(DO_BYVAL_8):
+		Assert(SIZEOF_DATUM == 8);
+		Assert(dp->alignto == ALIGNOF_DOUBLE);
+		Assert(tpc == (char *) TYPEALIGN(ALIGNOF_DOUBLE, tpc));
+		isnull[attnum] = false;
+		values[attnum] = *(Datum *) tpc;
+		tpc += sizeof(Datum);
+		SD_DISPATCH();
+	/* nullable steps: consult the null bitmap first, then align and fetch */
+	SD_CASE(DO_NULLABLE_IND_FIXED_LENGTH):
+		SD_CHECKNULL;
+		tpc = (char *) TYPEALIGN(dp->alignto, tpc);
+		isnull[attnum] = false;
+		values[attnum] = PointerGetDatum((char *) tpc);
+		tpc += dp->attlen;
+		SD_DISPATCH();
+	SD_CASE(DO_NULLABLE_VARLENA):
+		SD_CHECKNULL;
+		if (!VARATT_NOT_PAD_BYTE(tpc))
+			tpc = (char *) TYPEALIGN(dp->alignto, tpc);
+		isnull[attnum] = false;
+		values[attnum] = PointerGetDatum((char *) tpc);
+		tpc += VARSIZE_ANY(tpc);
+		SD_DISPATCH();
+	SD_CASE(DO_NULLABLE_CSTRING):
+		SD_CHECKNULL;
+		tpc = (char *) TYPEALIGN(dp->alignto, tpc);
+		isnull[attnum] = false;
+		values[attnum] = PointerGetDatum((char *) tpc);
+		tpc += strlen(tpc) + 1;
+		SD_DISPATCH();
+	SD_CASE(DO_NULLABLE_BYVAL_1):
+		SD_CHECKNULL;
+		Assert(dp->alignto == 1);
+		tpc = (char *) TYPEALIGN(1, tpc);
+		isnull[attnum] = false;
+		values[attnum] = CharGetDatum(*(char *) tpc);
+		tpc += sizeof(char);
+		SD_DISPATCH();
+	SD_CASE(DO_NULLABLE_BYVAL_2):
+		SD_CHECKNULL;
+		Assert(dp->alignto == ALIGNOF_SHORT);
+		tpc = (char *) TYPEALIGN(ALIGNOF_SHORT, tpc);
+		isnull[attnum] = false;
+		values[attnum] = Int16GetDatum(*(int16 *) tpc);
+		tpc += sizeof(int16);
+		SD_DISPATCH();
+	SD_CASE(DO_NULLABLE_BYVAL_4):
+		SD_CHECKNULL;
+		Assert(dp->alignto == ALIGNOF_INT);
+		tpc = (char *) TYPEALIGN(ALIGNOF_INT, tpc);
+		isnull[attnum] = false;
+		values[attnum] = Int32GetDatum(*(int32 *) tpc);
+		tpc += sizeof(int32);
+		SD_DISPATCH();
+	SD_CASE(DO_NULLABLE_BYVAL_8):
+		SD_CHECKNULL;
+		Assert(SIZEOF_DATUM == 8);
+		Assert(dp->alignto == ALIGNOF_DOUBLE);
+		tpc = (char *) TYPEALIGN(ALIGNOF_DOUBLE, tpc);
+		isnull[attnum] = false;
+		values[attnum] = *(Datum *) tpc;
+		tpc += sizeof(Datum);
+		SD_DISPATCH();
+	/* not-null steps whose offset is not known to be aligned: align, then fetch */
+	SD_CASE(DO_ALIGN_IND_FIXED_LENGTH):
+		tpc = (char *) TYPEALIGN(dp->alignto, tpc);
+		isnull[attnum] = false;
+		values[attnum] = PointerGetDatum((char *) tpc);
+		tpc += dp->attlen;
+		SD_DISPATCH();
+	SD_CASE(DO_ALIGN_VARLENA):
+		if (!VARATT_NOT_PAD_BYTE(tpc))
+			tpc = (char *) TYPEALIGN(dp->alignto, tpc);
+		isnull[attnum] = false;
+		values[attnum] = PointerGetDatum((char *) tpc);
+		tpc += VARSIZE_ANY(tpc);
+		SD_DISPATCH();
+	SD_CASE(DO_ALIGN_CSTRING):
+		tpc = (char *) TYPEALIGN(dp->alignto, tpc);
+		isnull[attnum] = false;
+		values[attnum] = PointerGetDatum((char *) tpc);
+		tpc += strlen(tpc) + 1;
+		SD_DISPATCH();
+	SD_CASE(DO_ALIGN_BYVAL_1):
+		Assert(dp->alignto == 1);
+		tpc = (char *) TYPEALIGN(1, tpc);
+		isnull[attnum] = false;
+		values[attnum] = CharGetDatum(*(char *) tpc);
+		tpc += sizeof(char);
+		SD_DISPATCH();
+	SD_CASE(DO_ALIGN_BYVAL_2):
+		Assert(dp->alignto == ALIGNOF_SHORT);
+		tpc = (char *) TYPEALIGN(ALIGNOF_SHORT, tpc);
+		isnull[attnum] = false;
+		values[attnum] = Int16GetDatum(*(int16 *) tpc);
+		tpc += sizeof(int16);
+		SD_DISPATCH();
+	SD_CASE(DO_ALIGN_BYVAL_4):
+		Assert(dp->alignto == ALIGNOF_INT);
+		tpc = (char *) TYPEALIGN(ALIGNOF_INT, tpc);
+		isnull[attnum] = false;
+		values[attnum] = Int32GetDatum(*(int32 *) tpc);
+		tpc += sizeof(int32);
+		SD_DISPATCH();
+	SD_CASE(DO_ALIGN_BYVAL_8):
+		Assert(SIZEOF_DATUM == 8);
+		Assert(dp->alignto == ALIGNOF_DOUBLE);
+		tpc = (char *) TYPEALIGN(ALIGNOF_DOUBLE, tpc);
+		isnull[attnum] = false;
+		values[attnum] = *(Datum *) tpc;
+		tpc += sizeof(Datum);
+		SD_DISPATCH();
+	/* nullable steps whose offset is not known to be aligned */
+	SD_CASE(DO_ALIGN_NULLABLE_IND_FIXED_LENGTH):
+		SD_CHECKNULL;
+		tpc = (char *) TYPEALIGN(dp->alignto, tpc);
+		isnull[attnum] = false;
+		values[attnum] = PointerGetDatum((char *) tpc);
+		tpc += dp->attlen;
+		SD_DISPATCH();
+	SD_CASE(DO_ALIGN_NULLABLE_VARLENA):
+		SD_CHECKNULL;
+		if (!VARATT_NOT_PAD_BYTE(tpc))
+			tpc = (char *) TYPEALIGN(dp->alignto, tpc);
+		isnull[attnum] = false;
+		values[attnum] = PointerGetDatum((char *) tpc);
+		tpc += VARSIZE_ANY(tpc);
+		SD_DISPATCH();
+	SD_CASE(DO_ALIGN_NULLABLE_CSTRING):
+		SD_CHECKNULL;
+		tpc = (char *) TYPEALIGN(dp->alignto, tpc);
+		isnull[attnum] = false;
+		values[attnum] = PointerGetDatum((char *) tpc);
+		tpc += strlen(tpc) + 1;
+		SD_DISPATCH();
+	SD_CASE(DO_ALIGN_NULLABLE_BYVAL_1):
+		SD_CHECKNULL;
+		Assert(dp->alignto == 1);
+		tpc = (char *) TYPEALIGN(1, tpc);
+		isnull[attnum] = false;
+		values[attnum] = CharGetDatum(*(char *) tpc);
+		tpc += sizeof(char);
+		SD_DISPATCH();
+	SD_CASE(DO_ALIGN_NULLABLE_BYVAL_2):
+		SD_CHECKNULL;
+		Assert(dp->alignto == ALIGNOF_SHORT);
+		tpc = (char *) TYPEALIGN(ALIGNOF_SHORT, tpc);
+		isnull[attnum] = false;
+		values[attnum] = Int16GetDatum(*(int16 *) tpc);
+		tpc += sizeof(int16);
+		SD_DISPATCH();
+	SD_CASE(DO_ALIGN_NULLABLE_BYVAL_4):
+		SD_CHECKNULL;
+		Assert(dp->alignto == ALIGNOF_INT);
+		tpc = (char *) TYPEALIGN(ALIGNOF_INT, tpc);
+		isnull[attnum] = false;
+		values[attnum] = Int32GetDatum(*(int32 *) tpc);
+		tpc += sizeof(int32);
+		SD_DISPATCH();
+	SD_CASE(DO_ALIGN_NULLABLE_BYVAL_8):
+		SD_CHECKNULL;
+		Assert(SIZEOF_DATUM == 8);
+		Assert(dp->alignto == ALIGNOF_DOUBLE);
+		tpc = (char *) TYPEALIGN(ALIGNOF_DOUBLE, tpc);
+		isnull[attnum] = false;
+		values[attnum] = *(Datum *) tpc;
+		tpc += sizeof(Datum);
+		SD_DISPATCH();
+	SD_CASE(DO_DONE):
+		goto out;
+	}
+out:
+
+	/* reset end marker */
+	slot->tts_dp[natts].attopcode = oldop;
+
+	/*
+	 * Save state for next execution
+	 */
+	slot->tts_nvalid = attnum;
+	slot->tts_off = tpc - ((char *) tup + tup->t_hoff);
+	slot->tts_slow = true;
+
+}
+
+#undef SD_CHECKNULL
+#undef SD_SWITCH
+#undef SD_DISPATCH
+#undef SD_CASE
+
+/*
+ * slot_deform_tuple_safe
+ *		Given a TupleTableSlot, extract data from the slot's physical tuple
+ *		into its Datum/isnull arrays.  Data is extracted up through the
+ *		natts'th column (caller must ensure this is a legal column number).
+ *
+ *		This is essentially an incremental version of heap_deform_tuple:
+ *		on each call we extract attributes up to the one needed, without
+ *		re-computing information about previously extracted attributes.
+ *		slot->tts_nvalid is the number of attributes already extracted.
+ */
+void
+slot_deform_tuple_safe(TupleTableSlot *slot, int natts)
+{
+	HeapTuple	tuple = slot->tts_tuple;
 	TupleDesc	tupleDesc = slot->tts_tupleDescriptor;
 	Datum	   *values = slot->tts_values;
 	bool	   *isnull = slot->tts_isnull;
@@ -980,23 +1500,11 @@ slot_deform_tuple(TupleTableSlot *slot, int natts)
 	bits8	   *bp = tup->t_bits;		/* ptr to null bitmap in tuple */
 	bool		slow;			/* can we use/set attcacheoff? */
 
-	/*
-	 * Check whether the first call for this tuple, and initialize or restore
-	 * loop state.
-	 */
-	attnum = slot->tts_nvalid;
-	if (attnum == 0)
-	{
-		/* Start from the first attribute */
-		off = 0;
-		slow = false;
-	}
-	else
-	{
-		/* Restore state from previous execution */
-		off = slot->tts_off;
-		slow = slot->tts_slow;
-	}
+	slot->tts_nvalid = 0;
+	/* always start from the first attribute */
+	off = 0;
+	slow = false;
+	attnum = 0;
 
 	tp = (char *) tup + tup->t_hoff;
 
@@ -1059,6 +1567,7 @@ slot_deform_tuple(TupleTableSlot *slot, int natts)
 	slot->tts_slow = slow;
 }
 
+
 /*
  * slot_getattr
  *		This function fetches an attribute of the slot's current tuple.
@@ -1076,16 +1585,16 @@ slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
 {
 	HeapTuple	tuple = slot->tts_tuple;
 	TupleDesc	tupleDesc = slot->tts_tupleDescriptor;
-	HeapTupleHeader tup;
+//	HeapTupleHeader tup;
 
 	/*
 	 * system attributes are handled by heap_getsysattr
 	 */
-	if (attnum <= 0)
+	if (unlikely(attnum <= 0))
 	{
-		if (tuple == NULL)		/* internal error */
+		if (unlikely(tuple == NULL))		/* internal error */
 			elog(ERROR, "cannot extract system attribute from virtual tuple");
-		if (tuple == &(slot->tts_minhdr))		/* internal error */
+		if (unlikely(tuple == &(slot->tts_minhdr)))		/* internal error */
 			elog(ERROR, "cannot extract system attribute from minimal tuple");
 		return heap_getsysattr(tuple, attnum, tupleDesc, isnull);
 	}
@@ -1093,7 +1602,7 @@ slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
 	/*
 	 * fast path if desired attribute already cached
 	 */
-	if (attnum <= slot->tts_nvalid)
+	if (likely(attnum <= slot->tts_nvalid))
 	{
 		*isnull = slot->tts_isnull[attnum - 1];
 		return slot->tts_values[attnum - 1];
@@ -1102,7 +1611,7 @@ slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
 	/*
 	 * return NULL if attnum is out of range according to the tupdesc
 	 */
-	if (attnum > tupleDesc->natts)
+	if (unlikely(attnum > tupleDesc->natts))
 	{
 		*isnull = true;
 		return (Datum) 0;
@@ -1112,9 +1621,9 @@ slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
 	 * otherwise we had better have a physical tuple (tts_nvalid should equal
 	 * natts in all virtual-tuple cases)
 	 */
-	if (tuple == NULL)			/* internal error */
-		elog(ERROR, "cannot extract attribute from empty tuple slot");
+	Assert(tuple != NULL);
 
+#if 0
 	/*
 	 * return NULL if attnum is out of range according to the tuple
 	 *
@@ -1123,7 +1632,7 @@ slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
 	 * than the tupdesc.)
 	 */
 	tup = tuple->t_data;
-	if (attnum > HeapTupleHeaderGetNatts(tup))
+	if (unlikely(attnum > HeapTupleHeaderGetNatts(tup)))
 	{
 		*isnull = true;
 		return (Datum) 0;
@@ -1137,6 +1646,7 @@ slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
 		*isnull = true;
 		return (Datum) 0;
 	}
+#endif
 
 	/*
 	 * If the attribute's column has been dropped, we force a NULL result.
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 66003c9..7443bab 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1707,6 +1707,9 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
 		int			natts = tupdesc->natts;
 		int			attrChk;
 
+		/* re-parse, disregarding NOT NULL constraints */
+		slot_deform_tuple_safe(slot, slot->tts_tupleDescriptor->natts);
+
 		for (attrChk = 1; attrChk <= natts; attrChk++)
 		{
 			if (tupdesc->attrs[attrChk - 1]->attnotnull &&
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 533050d..9e00649 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -123,6 +123,7 @@ MakeTupleTableSlot(void)
 	slot->tts_values = NULL;
 	slot->tts_isnull = NULL;
 	slot->tts_mintuple = NULL;
+	slot->tts_off = 0;
 
 	return slot;
 }
@@ -173,6 +174,13 @@ ExecResetTupleTable(List *tupleTable,	/* tuple table */
 			slot->tts_tupleDescriptor = NULL;
 		}
 
+		if (slot->tts_dp)
+		{
+			pfree(slot->tts_dp);
+			slot->tts_dp = NULL;
+			slot->tts_dp_alloc = slot->tts_dp_len = 0;
+		}
+
 		/* If shouldFree, release memory occupied by the slot itself */
 		if (shouldFree)
 		{
@@ -222,7 +230,10 @@ ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
 	Assert(IsA(slot, TupleTableSlot));
 	ExecClearTuple(slot);
 	if (slot->tts_tupleDescriptor)
+	{
 		ReleaseTupleDesc(slot->tts_tupleDescriptor);
+		pfree(slot->tts_dp);
+	}
 	if (slot->tts_values)
 		pfree(slot->tts_values);
 	if (slot->tts_isnull)
@@ -258,7 +269,12 @@ ExecSetSlotDescriptor(TupleTableSlot *slot,		/* slot to change */
 	 * present (we don't bother to check if they could be re-used).
 	 */
 	if (slot->tts_tupleDescriptor)
+	{
 		ReleaseTupleDesc(slot->tts_tupleDescriptor);
+		pfree(slot->tts_dp);
+		slot->tts_dp = NULL;
+		slot->tts_dp_alloc = slot->tts_dp_len = 0;
+	}
 
 	if (slot->tts_values)
 		pfree(slot->tts_values);
@@ -270,6 +286,7 @@ ExecSetSlotDescriptor(TupleTableSlot *slot,		/* slot to change */
 	 */
 	slot->tts_tupleDescriptor = tupdesc;
 	PinTupleDesc(tupdesc);
+	slot_prepare_deform(slot);
 
 	/*
 	 * Allocate Datum/isnull arrays of the appropriate size.  These must have
@@ -353,6 +370,7 @@ ExecStoreTuple(HeapTuple tuple,
 
 	/* Mark extracted state invalid */
 	slot->tts_nvalid = 0;
+	slot->tts_off = 0;
 
 	/*
 	 * If tuple is on a disk page, keep the page pinned as long as we hold a
@@ -426,6 +444,7 @@ ExecStoreMinimalTuple(MinimalTuple mtup,
 
 	/* Mark extracted state invalid */
 	slot->tts_nvalid = 0;
+	slot->tts_off = 0;
 
 	return slot;
 }
@@ -472,6 +491,7 @@ ExecClearTuple(TupleTableSlot *slot)	/* slot in which to store tuple */
 	 */
 	slot->tts_isempty = true;
 	slot->tts_nvalid = 0;
+	slot->tts_off = 0;
 
 	return slot;
 }
@@ -499,6 +519,7 @@ ExecStoreVirtualTuple(TupleTableSlot *slot)
 
 	slot->tts_isempty = false;
 	slot->tts_nvalid = slot->tts_tupleDescriptor->natts;
+	slot->tts_off = 0;
 
 	return slot;
 }
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index 5ac0b6a..98993d9 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -18,6 +18,54 @@
 #include "access/tupdesc.h"
 #include "storage/buf.h"
 
+typedef enum DeparseOpcode
+{
+	/* not nullable, offset known to be aligned: no null check, no alignment step */
+	DO_IND_FIXED_LENGTH = 0,
+	DO_VARLENA,
+	DO_CSTRING,
+	DO_BYVAL_1,
+	DO_BYVAL_2,
+	DO_BYVAL_4,
+	DO_BYVAL_8,
+
+	/* nullable: slot_deform_tuple checks the null bitmap, then aligns and fetches */
+	DO_NULLABLE_IND_FIXED_LENGTH,
+	DO_NULLABLE_VARLENA,
+	DO_NULLABLE_CSTRING,
+	DO_NULLABLE_BYVAL_1,
+	DO_NULLABLE_BYVAL_2,
+	DO_NULLABLE_BYVAL_4,
+	DO_NULLABLE_BYVAL_8,
+
+	/* not nullable, offset possibly unaligned: align before fetching */
+	DO_ALIGN_IND_FIXED_LENGTH,
+	DO_ALIGN_VARLENA,
+	DO_ALIGN_CSTRING,
+	DO_ALIGN_BYVAL_1,
+	DO_ALIGN_BYVAL_2,
+	DO_ALIGN_BYVAL_4,
+	DO_ALIGN_BYVAL_8,
+
+	/* nullable, offset possibly unaligned: null check, then align and fetch */
+	DO_ALIGN_NULLABLE_IND_FIXED_LENGTH,
+	DO_ALIGN_NULLABLE_VARLENA,
+	DO_ALIGN_NULLABLE_CSTRING,
+	DO_ALIGN_NULLABLE_BYVAL_1,
+	DO_ALIGN_NULLABLE_BYVAL_2,
+	DO_ALIGN_NULLABLE_BYVAL_4,
+	DO_ALIGN_NULLABLE_BYVAL_8,
+	DO_DONE						/* terminator: stops slot_deform_tuple */
+} DeparseOpcode;	/* group order is relied on by slot_prepare_deform's opcode arithmetic */
+
+typedef struct DeparseSlotState
+{
+	int attlen;					/* copy of the attribute's attlen */
+	DeparseOpcode attopcode;	/* handler slot_deform_tuple runs for this step */
+	bool attbyval;				/* copy of the attribute's attbyval */
+	uintptr_t alignto;			/* alignment boundary passed to TYPEALIGN */
+} DeparseSlotState;
+
 /*----------
  * The executor stores tuples in a "tuple table" which is a List of
  * independent TupleTableSlots.  There are several cases we need to handle:
@@ -124,6 +172,9 @@ typedef struct TupleTableSlot
 	int			tts_nvalid;		/* # of valid values in tts_values */
 	Datum	   *tts_values;		/* current per-attribute values */
 	bool	   *tts_isnull;		/* current per-attribute isnull flags */
+	DeparseSlotState *tts_dp;
+	int			tts_dp_len;
+	int			tts_dp_alloc;
 	MinimalTuple tts_mintuple;	/* minimal tuple, or NULL if none */
 	HeapTupleData tts_minhdr;	/* workspace for minimal-tuple-only case */
 	long		tts_off;		/* saved state for slot_deform_tuple */
@@ -169,5 +220,8 @@ extern Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull);
 extern void slot_getallattrs(TupleTableSlot *slot);
 extern void slot_getsomeattrs(TupleTableSlot *slot, int attnum);
 extern bool slot_attisnull(TupleTableSlot *slot, int attnum);
+extern void slot_deform_tuple_safe(TupleTableSlot *slot, int natts);
+
+extern void slot_prepare_deform(TupleTableSlot *slot);
 
 #endif   /* TUPTABLE_H */
-- 
2.8.1

