>From dda7b9aba8a6314e3beb773f140bb733ebcdaa53 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 11 Jun 2013 12:23:51 +0200
Subject: [PATCH 2/2] pluggable compression

---
 src/backend/access/heap/tuptoaster.c | 231 +++++++++++++++++++++++++++++------
 src/backend/utils/misc/guc.c         |  11 ++
 src/include/access/tuptoaster.h      |   1 +
 src/include/postgres.h               |  36 ++----
 src/include/utils/pg_lzcompress.h    |   4 +-
 5 files changed, 220 insertions(+), 63 deletions(-)

diff --git a/src/backend/access/heap/tuptoaster.c b/src/backend/access/heap/tuptoaster.c
index fc37ceb..12e3659 100644
--- a/src/backend/access/heap/tuptoaster.c
+++ b/src/backend/access/heap/tuptoaster.c
@@ -34,6 +34,7 @@
 #include "access/heapam.h"
 #include "access/tuptoaster.h"
 #include "access/xact.h"
+#include "common/snappy/snappy.h"
 #include "catalog/catalog.h"
 #include "utils/fmgroids.h"
 #include "utils/pg_lzcompress.h"
@@ -72,6 +73,7 @@ do { \
 	memcpy(&(toast_pointer), VARDATA_EXTERNAL(attre), sizeof(toast_pointer)); \
 } while (0)
 
+int toast_compression_algo = 0;
 
 static void toast_delete_datum(Relation rel, Datum value);
 static Datum toast_save_datum(Relation rel, Datum value,
@@ -81,7 +83,8 @@ static bool toastid_valueid_exists(Oid toastrelid, Oid valueid);
 static struct varlena *toast_fetch_datum(struct varlena * attr);
 static struct varlena *toast_fetch_datum_slice(struct varlena * attr,
 						int32 sliceoffset, int32 length);
-
+static Datum toast_uncompress_datum(Datum attr);
+static Size toast_uncompressed_length(struct varlena *attr);
 
 /* ----------
  * heap_tuple_fetch_attr -
@@ -137,11 +140,11 @@ heap_tuple_untoast_attr(struct varlena * attr)
 		/* If it's compressed, decompress it */
 		if (VARATT_IS_COMPRESSED(attr))
 		{
-			PGLZ_Header *tmp = (PGLZ_Header *) attr;
+			struct varlena *tmp = attr;
+
+			attr = (struct varlena *) DatumGetPointer(
+				toast_uncompress_datum(PointerGetDatum(attr)));
 
-			attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
-			SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
-			pglz_decompress(tmp, VARDATA(attr));
 			pfree(tmp);
 		}
 	}
@@ -150,11 +153,8 @@ heap_tuple_untoast_attr(struct varlena * attr)
 		/*
 		 * This is a compressed value inside of the main tuple
 		 */
-		PGLZ_Header *tmp = (PGLZ_Header *) attr;
-
-		attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
-		SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
-		pglz_decompress(tmp, VARDATA(attr));
+		attr = (struct varlena *) DatumGetPointer(
+			toast_uncompress_datum(PointerGetDatum(attr)));
 	}
 	else if (VARATT_IS_SHORT(attr))
 	{
@@ -209,15 +209,8 @@ heap_tuple_untoast_attr_slice(struct varlena * attr,
 
 	if (VARATT_IS_COMPRESSED(preslice))
 	{
-		PGLZ_Header *tmp = (PGLZ_Header *) preslice;
-		Size		size = PGLZ_RAW_SIZE(tmp) + VARHDRSZ;
-
-		preslice = (struct varlena *) palloc(size);
-		SET_VARSIZE(preslice, size);
-		pglz_decompress(tmp, VARDATA(preslice));
-
-		if (tmp != (PGLZ_Header *) attr)
-			pfree(tmp);
+		preslice = (struct varlena *) DatumGetPointer(
+			toast_uncompress_datum(PointerGetDatum(preslice)));
 	}
 
 	if (VARATT_IS_SHORT(preslice))
@@ -277,8 +270,7 @@ toast_raw_datum_size(Datum value)
 	}
 	else if (VARATT_IS_COMPRESSED(attr))
 	{
-		/* here, va_rawsize is just the payload size */
-		result = VARRAWSIZE_4B_C(attr) + VARHDRSZ;
+		result = toast_uncompressed_length(attr);
 	}
 	else if (VARATT_IS_SHORT(attr))
 	{
@@ -1179,8 +1171,11 @@ toast_flatten_tuple_attribute(Datum value,
 Datum
 toast_compress_datum(Datum value)
 {
-	struct varlena *tmp;
+	struct varlena *tmp = NULL;
 	int32		valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
+	int32		compressed_size;
+	Size		buffer_size;
+	int			ret;
 
 	Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value)));
 	Assert(!VARATT_IS_COMPRESSED(DatumGetPointer(value)));
@@ -1188,36 +1183,195 @@ toast_compress_datum(Datum value)
 	/*
 	 * No point in wasting a palloc cycle if value size is out of the allowed
 	 * range for compression
+	 *
+	 * XXX: Generalize concept, without referring to PGLZ
 	 */
 	if (valsize < PGLZ_strategy_default->min_input_size ||
 		valsize > PGLZ_strategy_default->max_input_size)
 		return PointerGetDatum(NULL);
 
-	tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize));
+	/*
+	 * choose compressor based on toast_compression_algo GUC.
+	 * XXX: We probably rather want a storage attribute for that.
+	 */
+	/* compress with pglz */
+	if (toast_compression_algo == 0)
+	{
+		tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize));
+
+		if (!pglz_compress(VARDATA_ANY(DatumGetPointer(value)), valsize,
+						   (PGLZ_Header *) tmp, PGLZ_strategy_default))
+			goto incompressible;
+		/* pglz_compress sets rawsize internally */
+	}
+	/* compress with snappy */
+	else if (toast_compression_algo == 1)
+	{
+		static struct snappy_env *snappy_env = NULL;
+		if (snappy_env == NULL)
+		{
+			snappy_env = malloc(sizeof(struct snappy_env));
+			if (snappy_env == NULL)
+				elog(ERROR, "out of memory");
+			snappy_init_env(snappy_env);	/* XXX: result unchecked */
+		}
+		/* worst-case output size; we add room for header and size word */
+		buffer_size = snappy_max_compressed_length(valsize);
+		tmp = (struct varlena *) palloc(buffer_size + VARHDRSZ + 4);
+
+		ret = snappy_compress(snappy_env, VARDATA_ANY(DatumGetPointer(value)),
+							  (size_t)valsize, ((char *)VARDATA(tmp)) + 4,
+							  &buffer_size);
+		/* EIO is returned for incompressible data */
+		if (ret == EIO)
+			goto incompressible;
+		else if (ret != 0)
+			elog(ERROR, "compression failed: %d", ret);
+
+		/* method id 1 goes in the 2 high bits, raw size in the low 30 */
+		*((uint32 *)VARDATA(tmp)) = 1 << 30 | valsize;
+		SET_VARSIZE_COMPRESSED(tmp, buffer_size + VARHDRSZ + 4);
+	}
+	else
+		elog(ERROR, "huh? There's not much between 1 and zero");
+
+	compressed_size = VARSIZE(tmp);
 
 	/*
-	 * We recheck the actual size even if pglz_compress() reports success,
-	 * because it might be satisfied with having saved as little as one byte
-	 * in the compressed data --- which could turn into a net loss once you
-	 * consider header and alignment padding.  Worst case, the compressed
-	 * format might require three padding bytes (plus header, which is
-	 * included in VARSIZE(tmp)), whereas the uncompressed format would take
-	 * only one header byte and no padding if the value is short enough.  So
-	 * we insist on a savings of more than 2 bytes to ensure we have a gain.
+	 * Check whether the compression was sufficiently effective. Not every
+	 * compression method rejects output larger than its input, and those that
+	 * do, like pglz_compress(), might report success having saved as little
+	 * as one byte --- which could turn into a net loss once you consider header
+	 * and alignment padding.  Worst case, the compressed format might require
+	 * three padding bytes (plus header, which is included in VARSIZE(tmp)),
+	 * whereas the uncompressed format would take only one header byte and no
+	 * padding if short enough.  So we insist on saving more than 2 bytes.
 	 */
-	if (pglz_compress(VARDATA_ANY(DatumGetPointer(value)), valsize,
-					  (PGLZ_Header *) tmp, PGLZ_strategy_default) &&
-		VARSIZE(tmp) < valsize - 2)
+	if (compressed_size < valsize - 2)
 	{
 		/* successful compression */
 		return PointerGetDatum(tmp);
 	}
+
+	/* incompressible data */
+incompressible:
+	if (tmp != NULL)
+		pfree(tmp);
+	return PointerGetDatum(NULL);
+}
+
+/*
+ * toast_uncompressed_length -
+ *	Return the raw size of a compressed datum, including VARHDRSZ. The low
+ *	30 bits of the word after the varlena header hold the raw payload size;
+ *	the 2 high bits identify the compression method (0 pglz, 1 snappy).
+ */
+static Size
+toast_uncompressed_length(struct varlena *attr)
+{
+#ifdef USE_ASSERT_CHECKING
+	uint8 compression_type;
+#endif
+	Size result;
+
+	Assert(VARATT_IS_COMPRESSED(attr));
+
+#ifdef USE_ASSERT_CHECKING
+	/* Crosscheck against what the compression method itself recorded. */
+	compression_type = (*(uint32 *) (VARDATA(attr))) >> 30;
+	if (compression_type == 0)
+	{
+		/* pglz: rawsize is just the payload size */
+		result = PGLZ_RAW_SIZE((PGLZ_Header *)attr);
+	}
+	else if (compression_type == 1)
+	{
+		/* snappy: ask the stream that follows our 4 byte size word */
+		if (!snappy_uncompressed_length(((char *)VARDATA(attr)) + 4,
+									   VARSIZE_ANY_EXHDR(attr) - 4,
+									   &result))
+			elog(ERROR, "could not read uncompressed size");
+	}
 	else
 	{
-		/* incompressible data */
-		pfree(tmp);
-		return PointerGetDatum(NULL);
+		elog(ERROR, "unknown compression method %u", (uint32)compression_type);
+	}
+	Assert(((*(uint32 *) (VARDATA(attr))) & 0x3fffffff) == result);
+#endif
+	/* mask off the 2 method bits; the remaining 30 bits are the raw size */
+	result = (*(uint32 *) (VARDATA(attr))) & 0x3fffffff;
+
+	/* varlena overhead */
+	result += VARHDRSZ;
+	return result;
+}
+
+/*
+ * toast_uncompress_datum -
+ *	Decompress an inline-compressed varlena and return the result as a
+ *	freshly palloc'd plain varlena. The input datum is not freed.
+ *
+ * In PGLZ - the formerly only compression method - the 4 bytes following
+ * the varlena header store the raw size of the datum. Since that cannot be
+ * more than 1GB due to toast limitations, its 2 high bits were always zero;
+ * we use them as the compression method id (0 pglz, 1 snappy) and keep the
+ * low 30 bits as the raw size. Method 0 has to stay pglz since we need to
+ * support the old on-disk format.
+ */
+static Datum
+toast_uncompress_datum(Datum value)
+{
+	struct varlena *attr =  (struct varlena *) DatumGetPointer(value);
+	uint8 compression_type;
+
+	Assert(VARATT_IS_4B_C(value));
+
+	/* the method id lives in the 2 high bits of the first payload word */
+	compression_type = (*(uint32 *) VARDATA(value)) >> 30;
+
+	if (compression_type == 0)
+	{
+		/* pglz: the whole datum is laid out as a PGLZ_Header */
+		PGLZ_Header *tmp = (PGLZ_Header *) DatumGetPointer(value);
+		attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
+		SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
+		pglz_decompress(tmp, VARDATA(attr));
+	}
+	else if (compression_type == 1)
+	{
+		void *compressed_data;
+		Size compressed_length;
+		int ret;
+		Size uncompressed_length;
+
+		/* the snappy stream starts after our 4 byte method/size word */
+		compressed_data = ((char *)VARDATA(attr)) + 4;
+		compressed_length = VARSIZE_ANY_EXHDR(attr) - 4;
+
+		ret = snappy_uncompressed_length(compressed_data,
+										 compressed_length,
+										 &uncompressed_length);
+		if (!ret)
+			elog(ERROR, "failed to determine compression length");
+		/* crosscheck with the 30-bit raw size stored at compression time */
+		if (uncompressed_length != ((*(uint32 *) (VARDATA(attr))) & 0x3fffffff))
+			elog(ERROR, "compression size mismatch");
+
+		attr = (struct varlena *) palloc(uncompressed_length + VARHDRSZ);
+		SET_VARSIZE(attr, uncompressed_length + VARHDRSZ);
+
+		ret = snappy_uncompress(compressed_data,
+								compressed_length,
+								VARDATA(attr));
+		if (ret != 0)
+			elog(ERROR, "decompression failed: %d", ret);
+	}
+	else
+	{
+		elog(ERROR, "unknown extended compression method %u",
+			 (uint32) compression_type);
 	}
+	return PointerGetDatum(attr);
 }
 
 
@@ -1284,10 +1438,11 @@ toast_save_datum(Relation rel, Datum value,
 	}
 	else if (VARATT_IS_COMPRESSED(dval))
 	{
+		struct varlena *dval_a = (struct varlena *) dval;
 		data_p = VARDATA(dval);
 		data_todo = VARSIZE(dval) - VARHDRSZ;
 		/* rawsize in a compressed datum is just the size of the payload */
-		toast_pointer.va_rawsize = VARRAWSIZE_4B_C(dval) + VARHDRSZ;
+		toast_pointer.va_rawsize = toast_uncompressed_length(dval_a);
 		toast_pointer.va_extsize = data_todo;
 		/* Assert that the numbers look like it's compressed */
 		Assert(VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index ea16c64..5f3b4f5 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -28,6 +28,7 @@
 
 #include "access/gin.h"
 #include "access/transam.h"
+#include "access/tuptoaster.h"
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "catalog/namespace.h"
@@ -1889,6 +1890,16 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"toast_compression_algo", PGC_USERSET, CLIENT_CONN_STATEMENT,
+			gettext_noop("chooses the compression algo: 0 pglz, 1: snappy."),
+			NULL
+		},
+		&toast_compression_algo,
+		0, 0, 1,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"vacuum_freeze_min_age", PGC_USERSET, CLIENT_CONN_STATEMENT,
 			gettext_noop("Minimum age at which VACUUM should freeze a table row."),
 			NULL
diff --git a/src/include/access/tuptoaster.h b/src/include/access/tuptoaster.h
index 6f4fc45..97334fd 100644
--- a/src/include/access/tuptoaster.h
+++ b/src/include/access/tuptoaster.h
@@ -94,6 +94,7 @@
 	 sizeof(int32) -									\
 	 VARHDRSZ)
 
+extern int toast_compression_algo;
 
 /* ----------
  * toast_insert_or_update -
diff --git a/src/include/postgres.h b/src/include/postgres.h
index 30e1dee..6fd70c6 100644
--- a/src/include/postgres.h
+++ b/src/include/postgres.h
@@ -81,21 +81,15 @@ struct varatt_external
  * compiler might otherwise think it could generate code that assumes
  * alignment while touching fields of a 1-byte-header varlena.
  */
-typedef union
+
+/* Normal and inline compressed varlena (4-byte length) */
+typedef struct
 {
-	struct						/* Normal varlena (4-byte length) */
-	{
-		uint32		va_header;
-		char		va_data[1];
-	}			va_4byte;
-	struct						/* Compressed-in-line format */
-	{
-		uint32		va_header;
-		uint32		va_rawsize; /* Original data size (excludes header) */
-		char		va_data[1]; /* Compressed data */
-	}			va_compressed;
+	uint32		va_header;
+	char		va_data[1];
 } varattrib_4b;
 
+/* short inline uncompressed varlena (1-byte length) */
 typedef struct
 {
 	uint8		va_header;
@@ -158,16 +152,16 @@ typedef struct
 
 /* VARSIZE_4B() should only be used on known-aligned data */
 #define VARSIZE_4B(PTR) \
-	(((varattrib_4b *) (PTR))->va_4byte.va_header & 0x3FFFFFFF)
+	(((varattrib_4b *) (PTR))->va_header & 0x3FFFFFFF)
 #define VARSIZE_1B(PTR) \
 	(((varattrib_1b *) (PTR))->va_header & 0x7F)
 #define VARSIZE_1B_E(PTR) \
 	(((varattrib_1b_e *) (PTR))->va_len_1be)
 
 #define SET_VARSIZE_4B(PTR,len) \
-	(((varattrib_4b *) (PTR))->va_4byte.va_header = (len) & 0x3FFFFFFF)
+	(((varattrib_4b *) (PTR))->va_header = (len) & 0x3FFFFFFF)
 #define SET_VARSIZE_4B_C(PTR,len) \
-	(((varattrib_4b *) (PTR))->va_4byte.va_header = ((len) & 0x3FFFFFFF) | 0x40000000)
+	(((varattrib_4b *) (PTR))->va_header = ((len) & 0x3FFFFFFF) | 0x40000000)
 #define SET_VARSIZE_1B(PTR,len) \
 	(((varattrib_1b *) (PTR))->va_header = (len) | 0x80)
 #define SET_VARSIZE_1B_E(PTR,len) \
@@ -190,16 +184,16 @@ typedef struct
 
 /* VARSIZE_4B() should only be used on known-aligned data */
 #define VARSIZE_4B(PTR) \
-	((((varattrib_4b *) (PTR))->va_4byte.va_header >> 2) & 0x3FFFFFFF)
+	((((varattrib_4b *) (PTR))->va_header >> 2) & 0x3FFFFFFF)
 #define VARSIZE_1B(PTR) \
 	((((varattrib_1b *) (PTR))->va_header >> 1) & 0x7F)
 #define VARSIZE_1B_E(PTR) \
 	(((varattrib_1b_e *) (PTR))->va_len_1be)
 
 #define SET_VARSIZE_4B(PTR,len) \
-	(((varattrib_4b *) (PTR))->va_4byte.va_header = (((uint32) (len)) << 2))
+	(((varattrib_4b *) (PTR))->va_header = (((uint32) (len)) << 2))
 #define SET_VARSIZE_4B_C(PTR,len) \
-	(((varattrib_4b *) (PTR))->va_4byte.va_header = (((uint32) (len)) << 2) | 0x02)
+	(((varattrib_4b *) (PTR))->va_header = (((uint32) (len)) << 2) | 0x02)
 #define SET_VARSIZE_1B(PTR,len) \
 	(((varattrib_1b *) (PTR))->va_header = (((uint8) (len)) << 1) | 0x01)
 #define SET_VARSIZE_1B_E(PTR,len) \
@@ -217,14 +211,10 @@ typedef struct
 
 #define VARHDRSZ_EXTERNAL		2
 
-#define VARDATA_4B(PTR)		(((varattrib_4b *) (PTR))->va_4byte.va_data)
-#define VARDATA_4B_C(PTR)	(((varattrib_4b *) (PTR))->va_compressed.va_data)
+#define VARDATA_4B(PTR)		(((varattrib_4b *) (PTR))->va_data)
 #define VARDATA_1B(PTR)		(((varattrib_1b *) (PTR))->va_data)
 #define VARDATA_1B_E(PTR)	(((varattrib_1b_e *) (PTR))->va_data)
 
-#define VARRAWSIZE_4B_C(PTR) \
-	(((varattrib_4b *) (PTR))->va_compressed.va_rawsize)
-
 /* Externally visible macros */
 
 /*
diff --git a/src/include/utils/pg_lzcompress.h b/src/include/utils/pg_lzcompress.h
index 4af24a3..4ee7308 100644
--- a/src/include/utils/pg_lzcompress.h
+++ b/src/include/utils/pg_lzcompress.h
@@ -19,8 +19,8 @@
  */
 typedef struct PGLZ_Header
 {
-	int32		vl_len_;		/* varlena header (do not touch directly!) */
-	int32		rawsize;
+	uint32		vl_len_;		/* varlena header (do not touch directly!) */
+	uint32		rawsize;		/* raw size; toast code reads 2 high bits == 0 as pglz */
 } PGLZ_Header;
 
 
-- 
1.8.2.rc2.4.g7799588.dirty

