>From a7676fcd38e3bc95e77cb751962909c98e5fe6cd Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sun, 17 Feb 2013 01:38:17 +0100
Subject: [PATCH 2/2] Add support for multiple kinds of external toast datums
 & different toast compression algorithms

There are several use cases where our current representation of external toast
datums is limiting:
* adding new compression schemes
* avoidance of repeated detoasting
* externally decoded toast tuples

To that end, support 'tags' on external (varattrib_1b_e) varlenas, which
repurpose the current va_len_1be field to store the tag (or type) of a varlena.
To determine the actual length, a macro VARTAG_SIZE(tag) is added which can be
used to map from a tag to the actual length.

This patch adds support for 'indirect' tuples which point to some externally
allocated memory containing a toast tuple. It also implements the stub for a
different compression algorithm.

Transparently add capability to compress with snappy
---
 src/backend/access/common/indextuple.c |    2 +-
 src/backend/access/heap/tuptoaster.c   |  341 +++++++++++++++++++++++++++-----
 src/backend/utils/misc/guc.c           |   11 ++
 src/include/access/tuptoaster.h        |    3 +-
 src/include/c.h                        |    2 +
 src/include/postgres.h                 |  119 ++++++-----
 src/include/utils/pg_lzcompress.h      |    4 +-
 7 files changed, 381 insertions(+), 101 deletions(-)

diff --git a/src/backend/access/common/indextuple.c b/src/backend/access/common/indextuple.c
index 35cc55f..3ff2974 100644
--- a/src/backend/access/common/indextuple.c
+++ b/src/backend/access/common/indextuple.c
@@ -89,7 +89,7 @@ index_form_tuple(TupleDesc tupleDescriptor,
 		VARSIZE(DatumGetPointer(untoasted_values[i])) > TOAST_INDEX_TARGET &&
 			(att->attstorage == 'x' || att->attstorage == 'm'))
 		{
-			Datum		cvalue = toast_compress_datum(untoasted_values[i]);
+			Datum		cvalue = toast_compress_datum(untoasted_values[i], true /* inline */);
 
 			if (DatumGetPointer(cvalue) != NULL)
 			{
diff --git a/src/backend/access/heap/tuptoaster.c b/src/backend/access/heap/tuptoaster.c
index fc37ceb..aacb1ba 100644
--- a/src/backend/access/heap/tuptoaster.c
+++ b/src/backend/access/heap/tuptoaster.c
@@ -34,6 +34,7 @@
 #include "access/heapam.h"
 #include "access/tuptoaster.h"
 #include "access/xact.h"
+#include "common/snappy/snappy.h"
 #include "catalog/catalog.h"
 #include "utils/fmgroids.h"
 #include "utils/pg_lzcompress.h"
@@ -72,6 +73,7 @@ do { \
 	memcpy(&(toast_pointer), VARDATA_EXTERNAL(attre), sizeof(toast_pointer)); \
 } while (0)
 
+int toast_compression_algo = 0;
 
 static void toast_delete_datum(Relation rel, Datum value);
 static Datum toast_save_datum(Relation rel, Datum value,
@@ -81,7 +83,8 @@ static bool toastid_valueid_exists(Oid toastrelid, Oid valueid);
 static struct varlena *toast_fetch_datum(struct varlena * attr);
 static struct varlena *toast_fetch_datum_slice(struct varlena * attr,
 						int32 sliceoffset, int32 length);
-
+static Datum toast_uncompress_datum(Datum attr);
+static Size toast_uncompressed_length(struct varlena *attr);
 
 /* ----------
  * heap_tuple_fetch_attr -
@@ -128,7 +131,7 @@ heap_tuple_fetch_attr(struct varlena * attr)
 struct varlena *
 heap_tuple_untoast_attr(struct varlena * attr)
 {
-	if (VARATT_IS_EXTERNAL(attr))
+	if (VARATT_IS_EXTERNAL_ONDISK(attr))
 	{
 		/*
 		 * This is an externally stored datum --- fetch it back from there
@@ -137,24 +140,32 @@ heap_tuple_untoast_attr(struct varlena * attr)
 		/* If it's compressed, decompress it */
 		if (VARATT_IS_COMPRESSED(attr))
 		{
-			PGLZ_Header *tmp = (PGLZ_Header *) attr;
+			struct varlena *tmp = attr;
+
+			attr = (struct varlena *) DatumGetPointer(
+				toast_uncompress_datum(PointerGetDatum(attr)));
 
-			attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
-			SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
-			pglz_decompress(tmp, VARDATA(attr));
-			pfree(tmp);
+			/* don't free tuple if its coming from memory, not disk */
+			if (!VARATT_IS_EXTERNAL_INDIRECT(tmp))
+				pfree(tmp);
 		}
 	}
+	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
+	{
+		struct varatt_indirect redirect;
+		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
+		attr = (struct varlena *)redirect.pointer;
+		Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
+
+		attr = heap_tuple_untoast_attr(attr);
+	}
 	else if (VARATT_IS_COMPRESSED(attr))
 	{
 		/*
 		 * This is a compressed value inside of the main tuple
 		 */
-		PGLZ_Header *tmp = (PGLZ_Header *) attr;
-
-		attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
-		SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
-		pglz_decompress(tmp, VARDATA(attr));
+		attr = (struct varlena *) DatumGetPointer(
+			toast_uncompress_datum(PointerGetDatum(attr)));
 	}
 	else if (VARATT_IS_SHORT(attr))
 	{
@@ -191,7 +202,7 @@ heap_tuple_untoast_attr_slice(struct varlena * attr,
 	char	   *attrdata;
 	int32		attrsize;
 
-	if (VARATT_IS_EXTERNAL(attr))
+	if (VARATT_IS_EXTERNAL_ONDISK(attr))
 	{
 		struct varatt_external toast_pointer;
 
@@ -204,20 +215,20 @@ heap_tuple_untoast_attr_slice(struct varlena * attr,
 		/* fetch it back (compressed marker will get set automatically) */
 		preslice = toast_fetch_datum(attr);
 	}
+	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
+	{
+		struct varatt_indirect redirect;
+		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
+		return heap_tuple_untoast_attr_slice(redirect.pointer,
+											 sliceoffset, slicelength);
+	}
 	else
 		preslice = attr;
 
 	if (VARATT_IS_COMPRESSED(preslice))
 	{
-		PGLZ_Header *tmp = (PGLZ_Header *) preslice;
-		Size		size = PGLZ_RAW_SIZE(tmp) + VARHDRSZ;
-
-		preslice = (struct varlena *) palloc(size);
-		SET_VARSIZE(preslice, size);
-		pglz_decompress(tmp, VARDATA(preslice));
-
-		if (tmp != (PGLZ_Header *) attr)
-			pfree(tmp);
+		preslice = (struct varlena *) DatumGetPointer(
+			toast_uncompress_datum(PointerGetDatum(preslice)));
 	}
 
 	if (VARATT_IS_SHORT(preslice))
@@ -267,7 +278,7 @@ toast_raw_datum_size(Datum value)
 	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
 	Size		result;
 
-	if (VARATT_IS_EXTERNAL(attr))
+	if (VARATT_IS_EXTERNAL_ONDISK(attr))
 	{
 		/* va_rawsize is the size of the original datum -- including header */
 		struct varatt_external toast_pointer;
@@ -275,10 +286,16 @@ toast_raw_datum_size(Datum value)
 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
 		result = toast_pointer.va_rawsize;
 	}
+	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
+	{
+		struct varatt_indirect toast_pointer;
+
+		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
+		return toast_raw_datum_size(PointerGetDatum(toast_pointer.pointer));
+	}
 	else if (VARATT_IS_COMPRESSED(attr))
 	{
-		/* here, va_rawsize is just the payload size */
-		result = VARRAWSIZE_4B_C(attr) + VARHDRSZ;
+		result = toast_uncompressed_length(attr);
 	}
 	else if (VARATT_IS_SHORT(attr))
 	{
@@ -308,7 +325,7 @@ toast_datum_size(Datum value)
 	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
 	Size		result;
 
-	if (VARATT_IS_EXTERNAL(attr))
+	if (VARATT_IS_EXTERNAL_ONDISK(attr))
 	{
 		/*
 		 * Attribute is stored externally - return the extsize whether
@@ -320,6 +337,13 @@ toast_datum_size(Datum value)
 		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
 		result = toast_pointer.va_extsize;
 	}
+	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
+	{
+		struct varatt_indirect toast_pointer;
+
+		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
+		return toast_datum_size(PointerGetDatum(toast_pointer.pointer));
+	}
 	else if (VARATT_IS_SHORT(attr))
 	{
 		result = VARSIZE_SHORT(attr);
@@ -387,12 +411,56 @@ toast_delete(Relation rel, HeapTuple oldtup)
 		{
 			Datum		value = toast_values[i];
 
-			if (!toast_isnull[i] && VARATT_IS_EXTERNAL(PointerGetDatum(value)))
+			if (toast_isnull[i])
+				continue;
+			else if (VARATT_IS_EXTERNAL_ONDISK(PointerGetDatum(value)))
 				toast_delete_datum(rel, value);
+			else if (VARATT_IS_EXTERNAL_INDIRECT(PointerGetDatum(value)))
+				elog(ERROR, "cannot delete tuples with indirect toast tuples for now");
 		}
 	}
 }
 
+/* ----------
+ * toast_datum_differs -
+ *
+ *  Return true if the two external toast datums differ, false if they are
+ *  the same and don't have to be stored again.
+ * ----------
+ */
+static bool
+toast_datum_differs(struct varlena *old_value, struct varlena *new_value)
+{
+	Assert(VARATT_IS_EXTERNAL(old_value));
+	Assert(VARATT_IS_EXTERNAL(new_value));
+
+	/* fast path for the common case where we have the toast oid available */
+	if (VARATT_IS_EXTERNAL_ONDISK(old_value) &&
+		VARATT_IS_EXTERNAL_ONDISK(new_value))
+		return memcmp((char *) old_value, (char *) new_value,
+					  VARSIZE_EXTERNAL(old_value)) != 0;
+
+	/*
+	 * compare size of tuples, so we don't uselessly detoast/decompress tuples
+	 * if they can't be the same anyway: different raw sizes mean they differ.
+	 */
+	if (toast_raw_datum_size(PointerGetDatum(old_value)) !=
+		toast_raw_datum_size(PointerGetDatum(new_value)))
+		return true;
+
+	old_value = heap_tuple_untoast_attr(old_value);
+	new_value = heap_tuple_untoast_attr(new_value);
+
+	Assert(!VARATT_IS_EXTERNAL(old_value));
+	Assert(!VARATT_IS_EXTERNAL(new_value));
+	Assert(!VARATT_IS_COMPRESSED(old_value));
+	Assert(!VARATT_IS_COMPRESSED(new_value));
+	Assert(VARSIZE_ANY_EXHDR(old_value) == VARSIZE_ANY_EXHDR(new_value));
+
+	/* compare payload, we're fine with unaligned data */
+	return memcmp(VARDATA_ANY(old_value), VARDATA_ANY(new_value),
+				  VARSIZE_ANY_EXHDR(old_value)) != 0;
+}
 
 /* ----------
  * toast_insert_or_update -
@@ -497,8 +565,7 @@ toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
 				VARATT_IS_EXTERNAL(old_value))
 			{
 				if (toast_isnull[i] || !VARATT_IS_EXTERNAL(new_value) ||
-					memcmp((char *) old_value, (char *) new_value,
-						   VARSIZE_EXTERNAL(old_value)) != 0)
+					toast_datum_differs(old_value, new_value))
 				{
 					/*
 					 * The old external stored value isn't needed any more
@@ -645,7 +712,7 @@ toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
 		if (att[i]->attstorage == 'x')
 		{
 			old_value = toast_values[i];
-			new_value = toast_compress_datum(old_value);
+			new_value = toast_compress_datum(old_value, true /* inline */);
 
 			if (DatumGetPointer(new_value) != NULL)
 			{
@@ -784,7 +851,7 @@ toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
 		 */
 		i = biggest_attno;
 		old_value = toast_values[i];
-		new_value = toast_compress_datum(old_value);
+		new_value = toast_compress_datum(old_value, true /* inline */);
 
 		if (DatumGetPointer(new_value) != NULL)
 		{
@@ -1177,10 +1244,13 @@ toast_flatten_tuple_attribute(Datum value,
  * ----------
  */
 Datum
-toast_compress_datum(Datum value)
+toast_compress_datum(Datum value, bool is_inline)
 {
-	struct varlena *tmp;
+	struct varlena *tmp = NULL;
 	int32		valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
+	int32		compressed_size;
+	Size		buffer_size;
+	int			ret;
 
 	Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value)));
 	Assert(!VARATT_IS_COMPRESSED(DatumGetPointer(value)));
@@ -1188,36 +1258,195 @@ toast_compress_datum(Datum value)
 	/*
 	 * No point in wasting a palloc cycle if value size is out of the allowed
 	 * range for compression
+	 *
+	 * XXX: Generalize concept, without referring to PGLZ
 	 */
 	if (valsize < PGLZ_strategy_default->min_input_size ||
 		valsize > PGLZ_strategy_default->max_input_size)
 		return PointerGetDatum(NULL);
 
-	tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize));
+	/*
+	 * choose compressor based on toast_compression_algo GUC.
+	 * XXX: We probably rather want a storage attribute for that.
+	 */
+	/* compress with pglz */
+	if (toast_compression_algo == 0)
+	{
+		tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize));
+
+		if (!pglz_compress(VARDATA_ANY(DatumGetPointer(value)), valsize,
+						   (PGLZ_Header *) tmp, PGLZ_strategy_default))
+			goto incompressible;
+		/* pglz_compress sets rawsize internally */
+	}
+	/* compress with snappy */
+	else if (toast_compression_algo == 1)
+	{
+		static struct snappy_env *snappy_env = NULL;
+		if (snappy_env == NULL)
+		{
+			snappy_env = malloc(sizeof(struct snappy_env));
+			snappy_init_env(snappy_env);
+		}
+		/* ask compressor about buffer size */
+		buffer_size = snappy_max_compressed_length(valsize);
+		tmp = (struct varlena *) palloc(buffer_size	+ VARHDRSZ + 4);
+
+		ret = snappy_compress(snappy_env, VARDATA_ANY(DatumGetPointer(value)),
+							  (size_t)valsize, ((char *)VARDATA(tmp)) + 4,
+							  &buffer_size);
+		/* EIO is returned for incompressible data */
+		if (ret == EIO)
+			goto incompressible;
+		else if (ret != 0)
+			elog(ERROR, "compression failed: %d", ret);
+
+		/* encode compression algorithm in size */
+		*((uint32 *)VARDATA(tmp)) = 1 << 30 | valsize;
+		SET_VARSIZE_COMPRESSED(tmp, buffer_size + VARHDRSZ + 4);
+	}
+	else
+		elog(ERROR, "huh? There's not much between 1 and zero");
+
+	compressed_size = VARSIZE(tmp);
 
 	/*
-	 * We recheck the actual size even if pglz_compress() reports success,
-	 * because it might be satisfied with having saved as little as one byte
-	 * in the compressed data --- which could turn into a net loss once you
-	 * consider header and alignment padding.  Worst case, the compressed
-	 * format might require three padding bytes (plus header, which is
-	 * included in VARSIZE(tmp)), whereas the uncompressed format would take
-	 * only one header byte and no padding if the value is short enough.  So
-	 * we insist on a savings of more than 2 bytes to ensure we have a gain.
+	 * Check whether the compression was sufficiently effective. Some of the
+	 * compression methods check for blowing up to a larger amount of data than
+	 * the source, some don't. Even if they do, like pglz_compress(), they
+	 * might reports success, having saved as little as one byte in the
+	 * compressed data --- which could turn into a net loss once you consider
+	 * header and alignment padding.  Worst case, the compressed format might
+	 * require three padding bytes (plus header, which is included in
+	 * VARSIZE(tmp)), whereas the uncompressed format would take only one
+	 * header byte and no padding if the value is short enough.  So we insist
+	 * on a savings of more than 2 bytes to ensure we have a gain.
 	 */
-	if (pglz_compress(VARDATA_ANY(DatumGetPointer(value)), valsize,
-					  (PGLZ_Header *) tmp, PGLZ_strategy_default) &&
-		VARSIZE(tmp) < valsize - 2)
+	if (compressed_size < valsize - 2)
 	{
 		/* successful compression */
 		return PointerGetDatum(tmp);
 	}
+
+	/* incompressible data */
+incompressible:
+	if (tmp != NULL)
+		pfree(tmp);
+	return PointerGetDatum(NULL);
+}
+
+static Size
+toast_uncompressed_length(struct varlena *attr)
+{
+#ifdef USE_ASSERT_CHECKING
+	uint8 compression_type;
+#endif
+	Size result;
+
+	Assert(VARATT_IS_COMPRESSED(attr));
+
+	/*
+	 * The raw size is kept in the low 30 bits of the first payload word (the
+	 * 2 high bits select the compression method). Crosscheck with the method.
+	 */
+#ifdef USE_ASSERT_CHECKING
+	compression_type = (*(uint32 *) (VARDATA(attr))) >> 30;
+
+	/* pglz stores the size as uint32 at the beginning */
+	if (compression_type == 0)
+	{
+		/* here, rawsize is just the payload size */
+		result = PGLZ_RAW_SIZE((PGLZ_Header *)attr);
+	}
+
+	/* snappy encodes the length as a varint */
+	else if (compression_type == 1)
+	{
+		if (!snappy_uncompressed_length(((char *)VARDATA(attr)) + 4,
+									   VARSIZE_ANY_EXHDR(attr) - 4,
+									   &result))
+			elog(ERROR, "could not read uncompressed size");
+	}
 	else
 	{
-		/* incompressible data */
-		pfree(tmp);
-		return PointerGetDatum(NULL);
+		elog(ERROR, "unknown compression method %u", (uint32)compression_type);
+	}
+	Assert(((*(uint32 *) (VARDATA(attr))) & 0x3fffffff) == result);
+#endif
+
+	result = (*(uint32 *) (VARDATA(attr))) & 0x3fffffff;
+
+	/* add the varlena header so the result includes it */
+	result += VARHDRSZ;
+	return result;
+}
+
+static Datum
+toast_uncompress_datum(Datum value)
+{
+	struct varlena *attr =  (struct varlena *) DatumGetPointer(value);
+	uint8 compression_type;
+
+	Assert(VARATT_IS_4B_C(value));
+
+	/* ----
+	 * Disambiguate between compression strategies:
+	 *
+	 * In PGLZ - the formerly only compression method - the first 4 bytes are
+	 * used to store the raw size of the datum as a signed integer. Since that
+	 * cannot be more than 1GB due to toast limitations we have the 2 high bits
+	 * to disambiguate whether it's pglz or something more modern. We cannot
+	 * change the meaning of Datums with the first 2 bits unset since we need
+	 * to support the old ondisk format.
+	 *
+	 * For now those 2 high bits directly hold the compression method: 0 is
+	 * pglz, 1 is snappy, and the low 30 bits hold the raw size. That leaves
+	 * room for just two more methods, so a wider encoding may be needed
+	 * later to avoid painting ourselves in a corner again.
+	 * ----
+	 */
+	compression_type = (*(uint32 *) VARDATA(value)) >> 30;
+
+	if (compression_type == 0)
+	{
+		PGLZ_Header *tmp = (PGLZ_Header *) DatumGetPointer(value);
+		attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
+		SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
+		pglz_decompress(tmp, VARDATA(attr));
 	}
+	else if (compression_type == 1)
+	{
+		void *compressed_data;
+		Size compressed_length;
+		int ret;
+		Size uncompressed_length;
+
+		compressed_data = ((char *)VARDATA(attr)) + 4;
+		compressed_length = VARSIZE_ANY_EXHDR(attr) - 4;
+
+		ret = snappy_uncompressed_length(compressed_data,
+										 compressed_length,
+										 &uncompressed_length);
+		if (!ret)
+			elog(ERROR, "failed to determine compression length");
+		if (uncompressed_length != ((*(uint32 *) (VARDATA(attr))) & 0x3fffffff))
+			elog(ERROR, "compression size mismatch");
+
+		attr = (struct varlena *) palloc(uncompressed_length + VARHDRSZ);
+		SET_VARSIZE(attr, uncompressed_length + VARHDRSZ);
+
+		ret = snappy_uncompress(compressed_data,
+								compressed_length,
+								VARDATA(attr));
+		if (ret != 0)
+			elog(ERROR, "decompression failed: %d", ret);
+	}
+	else
+	{
+		elog(ERROR, "unknown extended compression method %u",
+			 (uint32) compression_type);
+	}
+	return PointerGetDatum(attr);
 }
 
 
@@ -1258,6 +1487,8 @@ toast_save_datum(Relation rel, Datum value,
 	int32		data_todo;
 	Pointer		dval = DatumGetPointer(value);
 
+	Assert(!VARATT_IS_EXTERNAL(value));
+
 	/*
 	 * Open the toast relation and its index.  We can use the index to check
 	 * uniqueness of the OID we assign to the toasted item, even though it has
@@ -1284,10 +1515,11 @@ toast_save_datum(Relation rel, Datum value,
 	}
 	else if (VARATT_IS_COMPRESSED(dval))
 	{
+		struct varlena *dval_a = (struct varlena *) dval;
 		data_p = VARDATA(dval);
 		data_todo = VARSIZE(dval) - VARHDRSZ;
 		/* rawsize in a compressed datum is just the size of the payload */
-		toast_pointer.va_rawsize = VARRAWSIZE_4B_C(dval) + VARHDRSZ;
+		toast_pointer.va_rawsize = toast_uncompressed_length(dval_a);
 		toast_pointer.va_extsize = data_todo;
 		/* Assert that the numbers look like it's compressed */
 		Assert(VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
@@ -1341,7 +1573,7 @@ toast_save_datum(Relation rel, Datum value,
 		{
 			struct varatt_external old_toast_pointer;
 
-			Assert(VARATT_IS_EXTERNAL(oldexternal));
+			Assert(VARATT_IS_EXTERNAL_ONDISK(oldexternal));
 			/* Must copy to access aligned fields */
 			VARATT_EXTERNAL_GET_POINTER(old_toast_pointer, oldexternal);
 			if (old_toast_pointer.va_toastrelid == rel->rd_toastoid)
@@ -1456,7 +1688,7 @@ toast_save_datum(Relation rel, Datum value,
 	 * Create the TOAST pointer value that we'll return
 	 */
 	result = (struct varlena *) palloc(TOAST_POINTER_SIZE);
-	SET_VARSIZE_EXTERNAL(result, TOAST_POINTER_SIZE);
+	SET_VARTAG_EXTERNAL(result, VARTAG_ONDISK);
 	memcpy(VARDATA_EXTERNAL(result), &toast_pointer, sizeof(toast_pointer));
 
 	return PointerGetDatum(result);
@@ -1483,6 +1715,8 @@ toast_delete_datum(Relation rel, Datum value)
 	if (!VARATT_IS_EXTERNAL(attr))
 		return;
 
+	Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
+
 	/* Must copy to access aligned fields */
 	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
 
@@ -1608,6 +1842,9 @@ toast_fetch_datum(struct varlena * attr)
 	char	   *chunkdata;
 	int32		chunksize;
 
+	if (VARATT_IS_EXTERNAL_INDIRECT(attr))
+		elog(ERROR, "shouldn't be called this way");
+
 	/* Must copy to access aligned fields */
 	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
 
@@ -1775,7 +2012,7 @@ toast_fetch_datum_slice(struct varlena * attr, int32 sliceoffset, int32 length)
 	int32		chcpystrt;
 	int32		chcpyend;
 
-	Assert(VARATT_IS_EXTERNAL(attr));
+	Assert(VARATT_IS_EXTERNAL_ONDISK(attr));
 
 	/* Must copy to access aligned fields */
 	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index ea16c64..5f3b4f5 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -28,6 +28,7 @@
 
 #include "access/gin.h"
 #include "access/transam.h"
+#include "access/tuptoaster.h"
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "catalog/namespace.h"
@@ -1889,6 +1890,16 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"toast_compression_algo", PGC_USERSET, CLIENT_CONN_STATEMENT,
+			gettext_noop("chooses the compression algo: 0 pglz, 1: snappy."),
+			NULL
+		},
+		&toast_compression_algo,
+		0, 0, 1,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"vacuum_freeze_min_age", PGC_USERSET, CLIENT_CONN_STATEMENT,
 			gettext_noop("Minimum age at which VACUUM should freeze a table row."),
 			NULL
diff --git a/src/include/access/tuptoaster.h b/src/include/access/tuptoaster.h
index 6f4fc45..1db9073 100644
--- a/src/include/access/tuptoaster.h
+++ b/src/include/access/tuptoaster.h
@@ -94,6 +94,7 @@
 	 sizeof(int32) -									\
 	 VARHDRSZ)
 
+extern int toast_compression_algo;
 
 /* ----------
  * toast_insert_or_update -
@@ -170,7 +171,7 @@ extern Datum toast_flatten_tuple_attribute(Datum value,
  *	Create a compressed version of a varlena datum, if possible
  * ----------
  */
-extern Datum toast_compress_datum(Datum value);
+extern Datum toast_compress_datum(Datum value, bool is_inline);
 
 /* ----------
  * toast_raw_datum_size -
diff --git a/src/include/c.h b/src/include/c.h
index f2c9e12..7193af6 100644
--- a/src/include/c.h
+++ b/src/include/c.h
@@ -573,6 +573,8 @@ typedef NameData *Name;
 #define AssertMacro(condition)	((void)true)
 #define AssertArg(condition)
 #define AssertState(condition)
+#define TrapMacro(condition, errorType)	(true)
+
 #elif defined(FRONTEND)
 
 #include <assert.h>
diff --git a/src/include/postgres.h b/src/include/postgres.h
index 30e1dee..8b0fc7c 100644
--- a/src/include/postgres.h
+++ b/src/include/postgres.h
@@ -54,23 +54,52 @@
  */
 
 /*
- * struct varatt_external is a "TOAST pointer", that is, the information
- * needed to fetch a stored-out-of-line Datum.	The data is compressed
- * if and only if va_extsize < va_rawsize - VARHDRSZ.  This struct must not
- * contain any padding, because we sometimes compare pointers using memcmp.
+ * struct varatt_external is a "TOAST pointer", that is, the information needed
+ * to fetch a Datum stored in an out-of-line on-disk Datum. The data is
+ * compressed if and only if va_extsize < va_rawsize - VARHDRSZ.  This struct
+ * must not contain any padding, because we sometimes compare pointers using
+ * memcmp.
  *
  * Note that this information is stored unaligned within actual tuples, so
  * you need to memcpy from the tuple into a local struct variable before
  * you can look at these fields!  (The reason we use memcmp is to avoid
  * having to do that just to detect equality of two TOAST pointers...)
  */
-struct varatt_external
+typedef struct varatt_external
 {
 	int32		va_rawsize;		/* Original data size (includes header) */
 	int32		va_extsize;		/* External saved size (doesn't) */
 	Oid			va_valueid;		/* Unique ID of value within TOAST table */
 	Oid			va_toastrelid;	/* RelID of TOAST table containing it */
-};
+} varatt_external;
+
+/*
+ * Out-of-line Datum that's stored in memory, in contrast to varatt_external
+ * pointers which point to data in an external toast relation.
+ *
+ * Note that, just like varatt_external, this is stored unaligned within the
+ * tuple.
+ */
+typedef struct varatt_indirect
+{
+	struct varlena *pointer;	/* Pointer to in-memory varlena */
+} varatt_indirect;
+
+
+/*
+ * Type of external toast datum stored. The peculiar value for VARTAG_ONDISK
+ * comes from the requirement for on-disk compatibility with the older
+ * definitions of varattrib_1b_e where va_tag was named va_len_1be...
+ */
+typedef enum vartag_external {
+	VARTAG_INDIRECT = 1,
+	VARTAG_ONDISK = 18
+} vartag_external;
+
+#define VARTAG_SIZE(tag) \
+	((tag) == VARTAG_INDIRECT ? sizeof(varatt_indirect) :		\
+	 (tag) == VARTAG_ONDISK ? sizeof(varatt_external) : \
+	 TrapMacro(false, "unknown vartag"))
 
 /*
  * These structs describe the header of a varlena object that may have been
@@ -81,32 +110,27 @@ struct varatt_external
  * compiler might otherwise think it could generate code that assumes
  * alignment while touching fields of a 1-byte-header varlena.
  */
-typedef union
+
+/* Normal and inline compressed varlena (4-byte length) */
+typedef struct
 {
-	struct						/* Normal varlena (4-byte length) */
-	{
-		uint32		va_header;
-		char		va_data[1];
-	}			va_4byte;
-	struct						/* Compressed-in-line format */
-	{
-		uint32		va_header;
-		uint32		va_rawsize; /* Original data size (excludes header) */
-		char		va_data[1]; /* Compressed data */
-	}			va_compressed;
+	uint32		va_header;
+	char		va_data[1];
 } varattrib_4b;
 
+/* short inline uncompressed varlena (1-byte length) */
 typedef struct
 {
 	uint8		va_header;
 	char		va_data[1];		/* Data begins here */
 } varattrib_1b;
 
+/* inline portion of a short varlena pointing to an external resource */
 typedef struct
 {
 	uint8		va_header;		/* Always 0x80 or 0x01 */
-	uint8		va_len_1be;		/* Physical length of datum */
-	char		va_data[1];		/* Data (for now always a TOAST pointer) */
+	uint8		va_tag;			/* Type of datum */
+	char		va_data[1];		/* Data (of the type indicated by va_tag) */
 } varattrib_1b_e;
 
 /*
@@ -130,6 +154,9 @@ typedef struct
  * first byte.	Also, it is not possible for a 1-byte length word to be zero;
  * this lets us disambiguate alignment padding bytes from the start of an
  * unaligned datum.  (We now *require* pad bytes to be filled with zero!)
+ *
+ * In TOAST datums the tag field in varattrib_1b_e is used to discern whether
+ * it's an indirection pointer or, more commonly, an on-disk tuple.
  */
 
 /*
@@ -158,21 +185,21 @@ typedef struct
 
 /* VARSIZE_4B() should only be used on known-aligned data */
 #define VARSIZE_4B(PTR) \
-	(((varattrib_4b *) (PTR))->va_4byte.va_header & 0x3FFFFFFF)
+	(((varattrib_4b *) (PTR))->va_header & 0x3FFFFFFF)
 #define VARSIZE_1B(PTR) \
 	(((varattrib_1b *) (PTR))->va_header & 0x7F)
-#define VARSIZE_1B_E(PTR) \
-	(((varattrib_1b_e *) (PTR))->va_len_1be)
+#define VARTAG_1B_E(PTR) \
+	(((varattrib_1b_e *) (PTR))->va_tag)
 
 #define SET_VARSIZE_4B(PTR,len) \
-	(((varattrib_4b *) (PTR))->va_4byte.va_header = (len) & 0x3FFFFFFF)
+	(((varattrib_4b *) (PTR))->va_header = (len) & 0x3FFFFFFF)
 #define SET_VARSIZE_4B_C(PTR,len) \
-	(((varattrib_4b *) (PTR))->va_4byte.va_header = ((len) & 0x3FFFFFFF) | 0x40000000)
+	(((varattrib_4b *) (PTR))->va_header = ((len) & 0x3FFFFFFF) | 0x40000000)
 #define SET_VARSIZE_1B(PTR,len) \
 	(((varattrib_1b *) (PTR))->va_header = (len) | 0x80)
-#define SET_VARSIZE_1B_E(PTR,len) \
+#define SET_VARTAG_1B_E(PTR,tag) \
 	(((varattrib_1b_e *) (PTR))->va_header = 0x80, \
-	 ((varattrib_1b_e *) (PTR))->va_len_1be = (len))
+	 ((varattrib_1b_e *) (PTR))->va_tag = (tag))
 #else							/* !WORDS_BIGENDIAN */
 
 #define VARATT_IS_4B(PTR) \
@@ -190,24 +217,24 @@ typedef struct
 
 /* VARSIZE_4B() should only be used on known-aligned data */
 #define VARSIZE_4B(PTR) \
-	((((varattrib_4b *) (PTR))->va_4byte.va_header >> 2) & 0x3FFFFFFF)
+	((((varattrib_4b *) (PTR))->va_header >> 2) & 0x3FFFFFFF)
 #define VARSIZE_1B(PTR) \
 	((((varattrib_1b *) (PTR))->va_header >> 1) & 0x7F)
-#define VARSIZE_1B_E(PTR) \
-	(((varattrib_1b_e *) (PTR))->va_len_1be)
+#define VARTAG_1B_E(PTR) \
+	(((varattrib_1b_e *) (PTR))->va_tag)
 
 #define SET_VARSIZE_4B(PTR,len) \
-	(((varattrib_4b *) (PTR))->va_4byte.va_header = (((uint32) (len)) << 2))
+	(((varattrib_4b *) (PTR))->va_header = (((uint32) (len)) << 2))
 #define SET_VARSIZE_4B_C(PTR,len) \
-	(((varattrib_4b *) (PTR))->va_4byte.va_header = (((uint32) (len)) << 2) | 0x02)
+	(((varattrib_4b *) (PTR))->va_header = (((uint32) (len)) << 2) | 0x02)
 #define SET_VARSIZE_1B(PTR,len) \
 	(((varattrib_1b *) (PTR))->va_header = (((uint8) (len)) << 1) | 0x01)
-#define SET_VARSIZE_1B_E(PTR,len) \
+#define SET_VARTAG_1B_E(PTR,tag) \
 	(((varattrib_1b_e *) (PTR))->va_header = 0x01, \
-	 ((varattrib_1b_e *) (PTR))->va_len_1be = (len))
+	 ((varattrib_1b_e *) (PTR))->va_tag = (tag))
 #endif   /* WORDS_BIGENDIAN */
 
-#define VARHDRSZ_SHORT			1
+#define VARHDRSZ_SHORT			offsetof(varattrib_1b, va_data)
 #define VARATT_SHORT_MAX		0x7F
 #define VARATT_CAN_MAKE_SHORT(PTR) \
 	(VARATT_IS_4B_U(PTR) && \
@@ -215,16 +242,12 @@ typedef struct
 #define VARATT_CONVERTED_SHORT_SIZE(PTR) \
 	(VARSIZE(PTR) - VARHDRSZ + VARHDRSZ_SHORT)
 
-#define VARHDRSZ_EXTERNAL		2
+#define VARHDRSZ_EXTERNAL		offsetof(varattrib_1b_e, va_data)
 
-#define VARDATA_4B(PTR)		(((varattrib_4b *) (PTR))->va_4byte.va_data)
-#define VARDATA_4B_C(PTR)	(((varattrib_4b *) (PTR))->va_compressed.va_data)
+#define VARDATA_4B(PTR)		(((varattrib_4b *) (PTR))->va_data)
 #define VARDATA_1B(PTR)		(((varattrib_1b *) (PTR))->va_data)
 #define VARDATA_1B_E(PTR)	(((varattrib_1b_e *) (PTR))->va_data)
 
-#define VARRAWSIZE_4B_C(PTR) \
-	(((varattrib_4b *) (PTR))->va_compressed.va_rawsize)
-
 /* Externally visible macros */
 
 /*
@@ -249,26 +272,32 @@ typedef struct
 #define VARSIZE_SHORT(PTR)					VARSIZE_1B(PTR)
 #define VARDATA_SHORT(PTR)					VARDATA_1B(PTR)
 
-#define VARSIZE_EXTERNAL(PTR)				VARSIZE_1B_E(PTR)
+#define VARTAG_EXTERNAL(PTR)				VARTAG_1B_E(PTR)
+#define VARSIZE_EXTERNAL(PTR)				(VARHDRSZ_EXTERNAL + VARTAG_SIZE(VARTAG_EXTERNAL(PTR)))
 #define VARDATA_EXTERNAL(PTR)				VARDATA_1B_E(PTR)
 
 #define VARATT_IS_COMPRESSED(PTR)			VARATT_IS_4B_C(PTR)
 #define VARATT_IS_EXTERNAL(PTR)				VARATT_IS_1B_E(PTR)
+#define VARATT_IS_EXTERNAL_ONDISK(PTR) \
+	(VARATT_IS_EXTERNAL(PTR) && VARTAG_EXTERNAL(PTR) == VARTAG_ONDISK)
+#define VARATT_IS_EXTERNAL_INDIRECT(PTR) \
+	(VARATT_IS_EXTERNAL(PTR) && VARTAG_EXTERNAL(PTR) == VARTAG_INDIRECT)
 #define VARATT_IS_SHORT(PTR)				VARATT_IS_1B(PTR)
 #define VARATT_IS_EXTENDED(PTR)				(!VARATT_IS_4B_U(PTR))
 
 #define SET_VARSIZE(PTR, len)				SET_VARSIZE_4B(PTR, len)
 #define SET_VARSIZE_SHORT(PTR, len)			SET_VARSIZE_1B(PTR, len)
 #define SET_VARSIZE_COMPRESSED(PTR, len)	SET_VARSIZE_4B_C(PTR, len)
-#define SET_VARSIZE_EXTERNAL(PTR, len)		SET_VARSIZE_1B_E(PTR, len)
+
+#define SET_VARTAG_EXTERNAL(PTR, tag)		SET_VARTAG_1B_E(PTR, tag)
 
 #define VARSIZE_ANY(PTR) \
-	(VARATT_IS_1B_E(PTR) ? VARSIZE_1B_E(PTR) : \
+	(VARATT_IS_1B_E(PTR) ? VARSIZE_EXTERNAL(PTR) : \
 	 (VARATT_IS_1B(PTR) ? VARSIZE_1B(PTR) : \
 	  VARSIZE_4B(PTR)))
 
 #define VARSIZE_ANY_EXHDR(PTR) \
-	(VARATT_IS_1B_E(PTR) ? VARSIZE_1B_E(PTR)-VARHDRSZ_EXTERNAL : \
+	(VARATT_IS_1B_E(PTR) ? VARSIZE_EXTERNAL(PTR)-VARHDRSZ_EXTERNAL : \
 	 (VARATT_IS_1B(PTR) ? VARSIZE_1B(PTR)-VARHDRSZ_SHORT : \
 	  VARSIZE_4B(PTR)-VARHDRSZ))
 
diff --git a/src/include/utils/pg_lzcompress.h b/src/include/utils/pg_lzcompress.h
index 4af24a3..4ee7308 100644
--- a/src/include/utils/pg_lzcompress.h
+++ b/src/include/utils/pg_lzcompress.h
@@ -19,8 +19,8 @@
  */
 typedef struct PGLZ_Header
 {
-	int32		vl_len_;		/* varlena header (do not touch directly!) */
-	int32		rawsize;
+	uint32		vl_len_;		/* varlena header (do not touch directly!) */
+	uint32		rawsize;
 } PGLZ_Header;
 
 
-- 
1.7.10.4

